feat: add backend/mail_parser.py
This commit is contained in:
parent
e54926909f
commit
ce3f55ccdf
111
backend/mail_parser.py
Normal file
111
backend/mail_parser.py
Normal file
@ -0,0 +1,111 @@
|
||||
"""메일 파싱: RFC2047 디코딩, 한글, 첨부파일"""
|
||||
import email
import email.header
import email.message
import email.utils
import re
from typing import Optional

import chardet
|
||||
|
||||
|
||||
def _safe(s: str) -> str:
|
||||
"""surrogate 문자 제거 → JSON 직렬화 안전"""
|
||||
try:
|
||||
return s.encode('utf-8', errors='replace').decode('utf-8', errors='replace')
|
||||
except Exception:
|
||||
return ""
|
||||
|
||||
|
||||
def decode_str(raw: Optional[str]) -> str:
    """Decode an RFC 2047 encoded header value into plain text.

    Each chunk returned by ``email.header.decode_header`` is decoded with its
    declared charset.  When no charset is declared, or the declared label is
    not a codec Python knows (for example ``unknown-8bit``), the charset is
    guessed with ``chardet`` and finally falls back to UTF-8, so one bad
    label no longer discards the whole header.

    Args:
        raw: The header value as returned by ``Message.get``; may be ``None``
            or empty.

    Returns:
        The decoded, stripped text passed through ``_safe``; ``""`` for empty
        input.  If decoding fails entirely, the original value passed through
        ``_safe`` is returned.
    """
    if not raw:
        return ""
    try:
        pieces = []
        for part, charset in email.header.decode_header(raw):
            if not isinstance(part, bytes):
                pieces.append(str(part))
                continue

            text = None
            if charset:
                try:
                    text = part.decode(charset, errors='replace')
                except LookupError:
                    # Unknown charset label: fall through to detection.
                    text = None
            if text is None:
                guessed = chardet.detect(part).get('encoding') or 'utf-8'
                try:
                    text = part.decode(guessed, errors='replace')
                except LookupError:
                    # Detector named a codec Python does not ship.
                    text = part.decode('utf-8', errors='replace')
            pieces.append(text)
        return _safe("".join(pieces).strip())
    except Exception:
        # Malformed header: best effort, hand back the undecoded value.
        return _safe(raw) if raw else ""
|
||||
|
||||
|
||||
def extract_addr(raw: str) -> tuple[str, str]:
    """Split an address header into ``(display_name, address)``.

    Example: ``'홍길동 <hong@example.com>'`` -> ``('홍길동', 'hong@example.com')``.

    The header is parsed *before* RFC 2047 encoded-words are decoded.  Decoding
    first would let a decoded display name that contains ``,``, ``<`` or ``"``
    change how the address is parsed, yielding a wrong or spoofed address.

    Args:
        raw: The raw header value (for example the ``From`` header).

    Returns:
        A ``(name, addr)`` tuple.  ``name`` falls back to ``addr`` when the
        header carries no display name; both are ``""`` when nothing parses.
    """
    if not isinstance(raw, str):
        # parseaddr only understands str; flatten header objects/None first.
        raw = decode_str(raw)
    name, addr = email.utils.parseaddr(raw)
    name = decode_str(name)
    addr = _safe(addr)
    return name or addr, addr
|
||||
|
||||
|
||||
def decode_payload(part) -> str:
    """Return the text content of a message part as a JSON-safe string.

    The transfer-decoded payload is decoded with the part's declared charset.
    If no charset is declared, or the declared label is not a codec Python
    knows, the charset is guessed with ``chardet`` and finally falls back to
    UTF-8, so a bad label cannot abort parsing of the whole message.

    Args:
        part: A message (or sub-part) object exposing ``get_payload`` and
            ``get_content_charset``.

    Returns:
        The decoded text passed through ``_safe``, or ``""`` when the part has
        no payload.
    """
    raw = part.get_payload(decode=True)
    if not raw:
        return ""

    declared = part.get_content_charset()
    if declared:
        try:
            return _safe(raw.decode(declared, errors='replace'))
        except LookupError:
            # Unknown charset label: fall through to detection below.
            pass

    guessed = chardet.detect(raw).get('encoding') or 'utf-8'
    try:
        text = raw.decode(guessed, errors='replace')
    except LookupError:
        # Detector named a codec Python does not ship.
        text = raw.decode('utf-8', errors='replace')
    return _safe(text)
|
||||
|
||||
|
||||
def sanitize_html(html: str) -> str:
    """Strip ``<script>`` elements and inline ``on*`` event handlers.

    This is a basic server-side pass, not a complete HTML sanitizer: it does
    not touch ``javascript:`` URLs, ``<iframe>``/``<object>`` elements or CSS.

    Args:
        html: The HTML body to clean.

    Returns:
        The HTML with script elements and event-handler attributes removed.
    """
    # Remove script elements.  Tolerate whitespace in the closing tag and drop
    # everything up to end of input when the closing tag is missing.
    html = re.sub(
        r'<script\b[^>]*>.*?(?:</script\s*>|\Z)',
        '',
        html,
        flags=re.DOTALL | re.IGNORECASE,
    )
    # Remove event-handler attributes with double-quoted, single-quoted or
    # unquoted values.  The look-behind keeps names that merely contain "on"
    # (content=, data-onfoo=) intact.
    html = re.sub(
        r'''(?<![\w-])on\w+\s*=\s*(?:"[^"]*"|'[^']*'|[^\s>"']+)''',
        '',
        html,
        flags=re.IGNORECASE,
    )
    return html
|
||||
|
||||
|
||||
def parse_message(msg: email.message.Message) -> dict:
    """Convert a parsed e-mail message into a plain dictionary.

    For multipart messages every part is visited: parts with a filename or an
    ``attachment`` disposition are collected as attachments, and the first
    ``text/plain`` / ``text/html`` parts become the bodies.  A single-part
    message is treated as HTML when its type is ``text/html`` and as plain
    text otherwise.

    Args:
        msg: The message to convert.

    Returns:
        A dict with the keys ``subject``, ``sender``, ``sender_addr``, ``to``,
        ``cc``, ``date``, ``body_text``, ``body_html``, ``attachments``
        (metadata only), ``preview`` (first 100 characters of the body) and
        ``_attachments_data`` (``part_id`` -> raw bytes).
    """
    body_text = ""
    body_html = ""
    attachments = []
    used_ids = set()

    if msg.is_multipart():
        for part in msg.walk():
            ct = part.get_content_type()
            # Lower-cased so "ATTACHMENT" / "Attachment" are recognised too.
            cd = str(part.get('Content-Disposition', '')).lower()
            fn = part.get_filename()

            if fn or 'attachment' in cd:
                raw = part.get_payload(decode=True) or b""
                # str() guards against non-str header objects.
                part_id = str(part.get('Content-ID') or '').strip().strip('<>')
                if not part_id:
                    part_id = f"part_{len(attachments)}"
                # Keep ids unique so a later attachment cannot overwrite an
                # earlier one in the part_id -> data mapping.
                suffix = len(attachments)
                while part_id in used_ids:
                    part_id = f"part_{suffix}"
                    suffix += 1
                used_ids.add(part_id)
                attachments.append({
                    "part_id": part_id,
                    "filename": decode_str(fn or "unnamed"),
                    "content_type": ct,
                    "size": len(raw),
                    "_data": raw,
                })
            elif ct == 'text/plain' and not body_text:
                body_text = decode_payload(part)
            elif ct == 'text/html' and not body_html:
                body_html = sanitize_html(decode_payload(part))
    else:
        ct = msg.get_content_type()
        if ct == 'text/html':
            body_html = sanitize_html(decode_payload(msg))
        else:
            body_text = decode_payload(msg)

    # A present-but-empty Subject gets the same placeholder as a missing one.
    subject = decode_str(msg.get('Subject')) or '(제목 없음)'
    sender_name, sender_addr = extract_addr(msg.get('From', ''))

    # Prefer the plain-text body; otherwise strip tags from the HTML body.
    preview = (body_text or re.sub('<[^>]+>', '', body_html))[:100].strip()

    return {
        "subject": subject,
        "sender": sender_name or sender_addr,
        "sender_addr": sender_addr,
        "to": decode_str(msg.get('To', '')),
        "cc": decode_str(msg.get('Cc', '')),
        # Decoded like the other headers so the value is always a plain str.
        "date": decode_str(msg.get('Date', '')),
        "body_text": body_text,
        "body_html": body_html,
        "attachments": [
            {
                "part_id": a["part_id"],
                "filename": a["filename"],
                "content_type": a["content_type"],
                "size": a["size"],
            }
            for a in attachments
        ],
        "preview": preview,
        "_attachments_data": {a["part_id"]: a["_data"] for a in attachments},
    }
|
||||
Loading…
Reference in New Issue
Block a user