zioinfo-mail/backend/mail_parser.py

112 lines
3.8 KiB
Python

"""메일 파싱: RFC2047 디코딩, 한글, 첨부파일"""
import email
import email.header
import email.message
import email.utils
import re
from typing import Optional

import chardet
def _safe(s: str) -> str:
    """Return *s* in a form that can be serialized as JSON without errors.

    The text is round-tripped through UTF-8 with ``errors='replace'``, so
    characters that cannot be encoded (for example lone surrogates) come
    back as ``?``. If the round trip raises for any reason (such as a
    non-string argument), an empty string is returned.
    """
    try:
        utf8_bytes = s.encode('utf-8', errors='replace')
        return utf8_bytes.decode('utf-8', errors='replace')
    except Exception:
        return ""
def decode_str(raw: Optional[str]) -> str:
    """Decode an RFC 2047 encoded header value into plain text.

    Each fragment returned by ``email.header.decode_header`` is decoded with
    its declared charset; when no charset is declared, chardet's guess is
    used, and UTF-8 is the last resort. Undecodable bytes are replaced
    rather than raising.

    Args:
        raw: The header value as found in the message, or ``None``.

    Returns:
        The decoded, whitespace-stripped text passed through ``_safe``.
        An empty string is returned for empty/``None`` input. If decoding
        fails unexpectedly, the raw value passed through ``_safe`` is
        returned instead (best effort).
    """
    if not raw:
        return ""
    try:
        fragments = []
        for part, charset in email.header.decode_header(raw):
            if not isinstance(part, bytes):
                fragments.append(str(part))
                continue
            cs = charset or chardet.detect(part).get('encoding') or 'utf-8'
            try:
                fragments.append(part.decode(cs, errors='replace'))
            except (LookupError, ValueError):
                # Unknown or malformed charset label (e.g. 'unknown-8bit'):
                # degrade only this fragment instead of discarding the
                # decoding of the whole header.
                fragments.append(part.decode('utf-8', errors='replace'))
        return _safe("".join(fragments).strip())
    except Exception:
        # Best effort: never let a malformed header break message parsing.
        return _safe(raw)
def extract_addr(raw: str) -> tuple[str, str]:
    """Split an address header into ``(display_name, address)``.

    Example: ``'Hong Gildong <hong@example.com>'`` ->
    ``('Hong Gildong', 'hong@example.com')``.

    The raw header is parsed *before* RFC 2047 decoding. Decoding first
    lets characters hidden inside an encoded display name (such as ``,`` or
    ``<``) be interpreted as address syntax, which yields the wrong address
    for names like "Kim, Chulsoo".

    Args:
        raw: The header value as found in the message.

    Returns:
        A ``(name, addr)`` tuple. When no display name is present, the
        address is used as the name. Both are empty strings when nothing
        could be parsed.
    """
    if isinstance(raw, str):
        name, addr = email.utils.parseaddr(raw)
        # An address that still contains an encoded-word marker means the
        # sender encoded the whole header; use the decode-first path below.
        if addr and "=?" not in addr:
            name = decode_str(name)
            # decode_str leaves a plain address unchanged apart from
            # stripping and sanitizing it.
            addr = decode_str(addr)
            return name or addr, addr
    # Fallback (non-string header values, fully encoded headers, or nothing
    # parsed): decode first, then parse.
    name, addr = email.utils.parseaddr(decode_str(raw))
    return name or addr, addr
def decode_payload(part) -> str:
    """Decode the body of a message part into text.

    The transfer encoding is undone via ``get_payload(decode=True)``; the
    resulting bytes are decoded with the part's declared charset, or with
    chardet's guess when none is declared, or with UTF-8 as the last
    resort. Undecodable bytes are replaced rather than raising.

    Args:
        part: A message (part) object exposing ``get_payload`` and
            ``get_content_charset``.

    Returns:
        The decoded text passed through ``_safe``; an empty string when the
        part has no payload.
    """
    raw = part.get_payload(decode=True)
    if not raw:
        return ""
    charset = part.get_content_charset()
    if not charset:
        charset = chardet.detect(raw).get('encoding') or 'utf-8'
    try:
        text = raw.decode(charset, errors='replace')
    except (LookupError, ValueError):
        # The declared/detected charset is not a codec Python knows; fall
        # back to UTF-8 so one bad label does not abort the whole parse.
        text = raw.decode('utf-8', errors='replace')
    return _safe(text)
# <script ...> ... </script>, tolerating whitespace before the closing '>'.
_SCRIPT_BLOCK_RE = re.compile(
    r'<script[^>]*>.*?</script\s*>', re.DOTALL | re.IGNORECASE
)
# Quoted inline event handlers (onclick="...", onerror = '...').  The
# look-behind keeps the match from starting in the middle of another
# attribute name such as content=, contenteditable= or data-config=.
_EVENT_ATTR_RE = re.compile(
    r'''(?<![\w-])on\w+\s*=\s*(?:"[^"]*"|'[^']*')''', re.IGNORECASE
)


def sanitize_html(html: str) -> str:
    """Strip ``<script>`` blocks and quoted inline event-handler attributes.

    This is a basic server-side pass only.

    NOTE(review): this is not a complete HTML sanitizer. Unquoted handler
    values and ``javascript:`` URLs, among other vectors, are not handled
    here, so the result should still be treated as untrusted markup by
    whatever renders it.

    Args:
        html: The HTML markup to clean.

    Returns:
        The markup with matching script blocks and handler attributes
        removed; everything else is left as-is.
    """
    html = _SCRIPT_BLOCK_RE.sub('', html)
    return _EVENT_ATTR_RE.sub('', html)
def parse_message(msg: email.message.Message) -> dict:
    """Convert a parsed e-mail message into a plain dictionary.

    For multipart messages every part is visited: parts with a filename or
    an ``attachment`` disposition are collected as attachments, and the
    first ``text/plain`` / ``text/html`` parts become the bodies. For
    single-part messages the payload becomes the HTML body when the content
    type is ``text/html`` and the text body otherwise. HTML bodies are
    passed through ``sanitize_html``.

    Args:
        msg: The message to convert.

    Returns:
        A dict with the keys ``subject``, ``sender``, ``sender_addr``,
        ``to``, ``cc``, ``date``, ``body_text``, ``body_html``,
        ``attachments`` (metadata only: ``part_id``, ``filename``,
        ``content_type``, ``size``), ``preview`` (first 100 characters of
        the text body, or of the tag-stripped HTML body) and
        ``_attachments_data`` (``part_id`` -> raw bytes).
    """
    body_text = ""
    body_html = ""
    attachments = []
    used_ids = set()

    if msg.is_multipart():
        for part in msg.walk():
            ct = part.get_content_type()
            cd = str(part.get('Content-Disposition', ''))
            fn = part.get_filename()
            # Disposition values are case-insensitive ("Attachment").
            if fn or 'attachment' in cd.lower():
                raw = part.get_payload(decode=True) or b""
                fallback_id = f"part_{len(attachments)}"
                base_id = (
                    str(part.get('Content-ID') or fallback_id)
                    .strip()
                    .strip('<>')
                    or fallback_id
                )
                # part_id keys _attachments_data, so a repeated Content-ID
                # must not overwrite an earlier attachment's bytes.
                part_id = base_id
                suffix = 1
                while part_id in used_ids:
                    part_id = f"{base_id}_{suffix}"
                    suffix += 1
                used_ids.add(part_id)
                attachments.append({
                    "part_id": part_id,
                    "filename": decode_str(fn or "unnamed"),
                    "content_type": ct,
                    "size": len(raw),
                    "_data": raw,
                })
            elif ct == 'text/plain' and not body_text:
                body_text = decode_payload(part)
            elif ct == 'text/html' and not body_html:
                body_html = sanitize_html(decode_payload(part))
    else:
        ct = msg.get_content_type()
        if ct == 'text/html':
            body_html = sanitize_html(decode_payload(msg))
        else:
            body_text = decode_payload(msg)

    # Apply the placeholder to a present-but-empty Subject header as well.
    subject = decode_str(msg.get('Subject')) or '(제목 없음)'
    # extract_addr already substitutes the address for a missing name.
    sender_name, sender_addr = extract_addr(msg.get('From', ''))
    preview = (body_text or re.sub('<[^>]+>', '', body_html))[:100].strip()

    return {
        "subject": subject,
        "sender": sender_name,
        "sender_addr": sender_addr,
        "to": decode_str(msg.get('To', '')),
        "cc": decode_str(msg.get('Cc', '')),
        # Routed through decode_str so the value is always a sanitized str.
        "date": decode_str(msg.get('Date', '')),
        "body_text": body_text,
        "body_html": body_html,
        "attachments": [
            {
                "part_id": a["part_id"],
                "filename": a["filename"],
                "content_type": a["content_type"],
                "size": a["size"],
            }
            for a in attachments
        ],
        "preview": preview,
        "_attachments_data": {a["part_id"]: a["_data"] for a in attachments},
    }