112 lines
3.8 KiB
Python
112 lines
3.8 KiB
Python
"""메일 파싱: RFC2047 디코딩, 한글, 첨부파일"""
|
|
import email
import email.header
import email.message
import email.utils
import re
from typing import Optional

import chardet
|
|
|
|
|
|
def _safe(s: str) -> str:
|
|
"""surrogate 문자 제거 → JSON 직렬화 안전"""
|
|
try:
|
|
return s.encode('utf-8', errors='replace').decode('utf-8', errors='replace')
|
|
except Exception:
|
|
return ""
|
|
|
|
|
|
def decode_str(raw: Optional[str]) -> str:
    """Decode an RFC 2047 encoded header value into a plain string.

    Args:
        raw: Header value, e.g. as returned by ``Message.get``. Falsy
            values (``None``, ``""``) produce ``""``.

    Returns:
        The decoded text with surrounding whitespace stripped and passed
        through ``_safe``. If decoding fails entirely, the undecoded
        value is returned via ``_safe`` instead of raising.
    """
    if not raw:
        return ""
    try:
        decoded = []
        for part, charset in email.header.decode_header(raw):
            if not isinstance(part, bytes):
                decoded.append(str(part))
                continue
            # No declared charset: let chardet guess, defaulting to UTF-8.
            cs = charset or chardet.detect(part).get('encoding') or 'utf-8'
            try:
                decoded.append(part.decode(cs, errors='replace'))
            except LookupError:
                # Python has no codec under this name (e.g. "unknown-8bit").
                # Decode just this chunk as UTF-8 so the rest of the header
                # is still returned decoded rather than falling back to the
                # raw encoded-word form.
                decoded.append(part.decode('utf-8', errors='replace'))
        return _safe("".join(decoded).strip())
    except Exception:
        # Deliberate best effort: a malformed header must not abort parsing.
        return _safe(raw)
|
|
|
|
|
|
def extract_addr(raw: str) -> tuple[str, str]:
    """Split an address header into ``(display_name, address)``.

    Example: ``'Hong Gildong <hong@example.com>'`` becomes
    ``('Hong Gildong', 'hong@example.com')``.

    The header is parsed *before* RFC 2047 decoding. Decoding first lets
    characters hidden inside an encoded display name (``,``, ``<``, ``"``)
    be interpreted as address syntax, which misparses the header and
    allows a crafted name to masquerade as the address.

    Args:
        raw: The raw header value; falsy values are treated as ``""``.

    Returns:
        A ``(name, addr)`` tuple. ``name`` falls back to ``addr`` when the
        header carries no display name.
    """
    text = str(raw) if raw else ""
    name, addr = email.utils.parseaddr(text)
    if "@" in addr:
        # Structure is already resolved; only the display name needs decoding.
        name = decode_str(name)
    else:
        # No usable address found in the raw form, e.g. the whole header
        # (address included) was wrapped in one encoded word. Fall back to
        # the previous decode-then-parse behaviour.
        name, addr = email.utils.parseaddr(decode_str(raw))
    return name or addr, addr
|
|
|
|
|
|
def decode_payload(part) -> str:
    """Return the text content of a message part as a string.

    The transfer-decoded payload is decoded with the part's declared
    charset. When no charset is declared, or Python has no codec for the
    declared name, chardet's guess is used, and finally UTF-8.

    Args:
        part: An object exposing ``get_payload(decode=True)`` and
            ``get_content_charset()`` (e.g. ``email.message.Message``).

    Returns:
        The decoded text passed through ``_safe``, or ``""`` when the part
        has no payload.
    """
    raw = part.get_payload(decode=True)
    if not raw:
        return ""

    charset = part.get_content_charset()
    if charset:
        try:
            return _safe(raw.decode(charset, errors='replace'))
        except LookupError:
            # Declared charset is not a codec Python knows; do not let one
            # bad label abort parsing, fall through to detection.
            pass

    guessed = chardet.detect(raw).get('encoding') or 'utf-8'
    try:
        text = raw.decode(guessed, errors='replace')
    except LookupError:
        # Detector returned a name Python cannot resolve.
        text = raw.decode('utf-8', errors='replace')
    return _safe(text)
|
|
|
|
|
|
def sanitize_html(html: str) -> str:
    """Strip ``<script>`` blocks and inline ``on*=`` event handlers from *html*.

    This is a basic server-side pass built on regular expressions. It is
    not a complete XSS defence: it does not touch ``javascript:`` URLs,
    ``<iframe>``/``<object>`` elements, or CSS.

    Args:
        html: HTML markup to clean.

    Returns:
        The markup with script elements and event-handler attributes
        removed; everything else is left as is.
    """
    # <script ...> ... </script>, tolerating whitespace before the final ">".
    html = re.sub(
        r'<script\b[^>]*>.*?</script\s*>',
        '',
        html,
        flags=re.DOTALL | re.IGNORECASE,
    )
    # Event-handler attributes with double-quoted, single-quoted or unquoted
    # values. The lookbehind requires the name to start at an attribute
    # boundary (start of text, whitespace, quote or "/"), so ordinary
    # attributes that merely contain "on", such as content="...", are kept.
    html = re.sub(
        r"""(?<![^\s"'/])on\w+\s*=\s*(?:"[^"]*"|'[^']*'|[^\s>"']+)""",
        '',
        html,
        flags=re.IGNORECASE,
    )
    return html
|
|
|
|
|
|
def parse_message(msg: email.message.Message) -> dict:
    """Flatten a parsed e-mail message into a plain dictionary.

    For multipart messages every part from ``msg.walk()`` is examined:
    parts with a filename or an ``attachment`` disposition are collected
    as attachments; the first ``text/plain`` and first ``text/html`` parts
    become the bodies. Non-multipart messages are treated as a single
    body (HTML when the content type is ``text/html``, text otherwise).

    Args:
        msg: The parsed message.

    Returns:
        A dict with keys ``subject``, ``sender``, ``sender_addr``, ``to``,
        ``cc``, ``date``, ``body_text``, ``body_html`` (sanitized),
        ``attachments`` (metadata only), ``preview`` (first 100 characters
        of the text body, or of the HTML body with tags removed) and
        ``_attachments_data`` (``part_id`` mapped to the raw bytes).
    """
    body_text = ""
    body_html = ""
    attachments = []

    if msg.is_multipart():
        for part in msg.walk():
            ct = part.get_content_type()
            cd = str(part.get('Content-Disposition', ''))
            fn = part.get_filename()

            # Disposition tokens are case-insensitive, so compare lowercased.
            if fn or 'attachment' in cd.lower():
                raw = part.get_payload(decode=True) or b""
                attachments.append({
                    # Prefer the Content-ID; otherwise number the attachment.
                    "part_id": part.get('Content-ID', f"part_{len(attachments)}").strip('<>'),
                    "filename": decode_str(fn or "unnamed"),
                    "content_type": ct,
                    "size": len(raw),
                    "_data": raw,
                })
            elif ct == 'text/plain' and not body_text:
                body_text = decode_payload(part)
            elif ct == 'text/html' and not body_html:
                body_html = sanitize_html(decode_payload(part))
    else:
        ct = msg.get_content_type()
        if ct == 'text/html':
            body_html = sanitize_html(decode_payload(msg))
        else:
            body_text = decode_payload(msg)

    # Use the placeholder for a missing Subject header and also for one that
    # is present but empty.
    subject = decode_str(msg.get('Subject')) or '(제목 없음)'
    sender_name, sender_addr = extract_addr(msg.get('From', ''))

    preview = (body_text or re.sub('<[^>]+>', '', body_html))[:100].strip()

    return {
        "subject": subject,
        "sender": sender_name or sender_addr,
        "sender_addr": sender_addr,
        "to": decode_str(msg.get('To', '')),
        "cc": decode_str(msg.get('Cc', '')),
        "date": msg.get('Date', ''),
        "body_text": body_text,
        "body_html": body_html,
        # Public attachment list carries metadata only; bytes are kept apart.
        "attachments": [
            {
                "part_id": a["part_id"],
                "filename": a["filename"],
                "content_type": a["content_type"],
                "size": a["size"],
            }
            for a in attachments
        ],
        "preview": preview,
        "_attachments_data": {a["part_id"]: a["_data"] for a in attachments},
    }
|