"""Mail parsing: RFC 2047 header decoding, Korean text, and attachments."""
import email
import email.header
import email.message
import email.utils
import re
from typing import Optional

try:
    import chardet
except ImportError:  # charset sniffing is optional; see _detect_charset
    chardet = None


# <script ...> ... </script> blocks, including their contents.
_SCRIPT_BLOCK_RE = re.compile(
    r'<script\b[^>]*>.*?</script\s*>', re.DOTALL | re.IGNORECASE
)
# Stray opening/closing script tags left behind by unbalanced markup.
_SCRIPT_TAG_RE = re.compile(r'</?script\b[^>]*>', re.IGNORECASE)
# Inline event handlers: on*= followed by a double-quoted, single-quoted or
# unquoted value.  The look-behind keeps names that merely contain "on..."
# (e.g. data-condition="x") from being truncated.
_EVENT_ATTR_RE = re.compile(
    r'''(?<![\w-])on\w+\s*=\s*(?:"[^"]*"|'[^']*'|[^\s>]+)''', re.IGNORECASE
)
# Any tag; used only to build the plain-text preview from an HTML body.
_TAG_RE = re.compile(r'<[^>]+>')


def _safe(s: str) -> str:
    """Return *s* with unencodable characters (lone surrogates) replaced.

    Round-tripping through UTF-8 with ``errors='replace'`` turns such
    characters into ``?`` so the result can be JSON-serialized.  Returns
    ``""`` when *s* is not a string.
    """
    try:
        return s.encode('utf-8', errors='replace').decode('utf-8', errors='replace')
    except (AttributeError, TypeError, UnicodeError):
        return ""


def _detect_charset(data: bytes) -> str:
    """Guess the charset of *data* with chardet, defaulting to UTF-8."""
    if chardet is None:
        return 'utf-8'
    return chardet.detect(data).get('encoding') or 'utf-8'


def _decode_bytes(data: bytes, charset: Optional[str] = None) -> str:
    """Decode *data* trying the declared charset, a guess, then UTF-8.

    Undecodable bytes are replaced rather than raising, and a charset label
    Python does not know never propagates as ``LookupError``.
    """
    if charset:
        try:
            return data.decode(charset, errors='replace')
        except (LookupError, ValueError):
            pass  # unknown or malformed label: fall through to detection
    try:
        return data.decode(_detect_charset(data), errors='replace')
    except (LookupError, ValueError):
        return data.decode('utf-8', errors='replace')


def decode_str(raw: Optional[str]) -> str:
    """Decode an RFC 2047 encoded header value into a JSON-safe string.

    Returns ``""`` for empty input.  If the header cannot be parsed at all,
    the raw value is returned (made JSON-safe) instead of raising.
    """
    if not raw:
        return ""
    try:
        chunks = email.header.decode_header(raw)
    except Exception:
        # Deliberate best effort: a malformed header must not abort parsing.
        return _safe(str(raw))
    pieces = [
        _decode_bytes(chunk, charset) if isinstance(chunk, bytes) else str(chunk)
        for chunk, charset in chunks
    ]
    return _safe("".join(pieces).strip())


def extract_addr(raw: Optional[str]) -> tuple[str, str]:
    """Split an address header into ``(display_name, address)``.

    The header is parsed *before* RFC 2047 decoding so that special
    characters inside an encoded display name (for example a comma) cannot
    confuse the address parser.  When there is no display name, the address
    is returned in its place.
    """
    text = raw if isinstance(raw, str) else str(raw or "")
    name, addr = email.utils.parseaddr(text)
    if '@' not in addr:
        # The whole header may be wrapped in a single encoded word; retry on
        # the decoded form.
        name, addr = email.utils.parseaddr(decode_str(text))
    name = decode_str(name)
    addr = _safe(addr)
    return name or addr, addr


def decode_payload(part) -> str:
    """Return the transfer-decoded payload of *part* as JSON-safe text.

    Uses the part's declared charset when present and falls back to charset
    detection / UTF-8.  Returns ``""`` when the part has no payload.
    """
    raw = part.get_payload(decode=True)
    if not raw:
        return ""
    return _safe(_decode_bytes(raw, part.get_content_charset()))


def sanitize_html(html: str) -> str:
    """Strip script elements and inline ``on*`` event handlers from *html*.

    This is a basic, regex-based server-side pass, not a complete HTML
    sanitizer; other vectors (e.g. ``javascript:`` URLs) are left untouched.
    """
    html = _SCRIPT_BLOCK_RE.sub('', html)
    html = _SCRIPT_TAG_RE.sub('', html)
    return _EVENT_ATTR_RE.sub('', html)


def _attachment_entry(part, fallback_index: int, used_ids: set) -> dict:
    """Build the attachment record for *part* with a part_id unique in *used_ids*."""
    data = part.get_payload(decode=True) or b""
    content_id = str(part.get('Content-ID') or '').strip().strip('<>').strip()
    base_id = content_id or f"part_{fallback_index}"
    # Disambiguate repeated ids so no attachment overwrites another when the
    # records are later keyed by part_id.
    part_id, suffix = base_id, 1
    while part_id in used_ids:
        suffix += 1
        part_id = f"{base_id}_{suffix}"
    used_ids.add(part_id)
    return {
        "part_id": part_id,
        "filename": decode_str(part.get_filename() or "unnamed"),
        "content_type": part.get_content_type(),
        "size": len(data),
        "_data": data,
    }


def parse_message(msg: email.message.Message) -> dict:
    """Convert a parsed e-mail message into a plain dict.

    The result holds decoded headers, the first text/plain and first
    text/html bodies (HTML passed through ``sanitize_html``), attachment
    metadata, a short preview, and the raw attachment bytes under
    ``_attachments_data`` keyed by ``part_id``.
    """
    body_text = ""
    body_html = ""
    attachments = []
    used_ids = set()

    if msg.is_multipart():
        for part in msg.walk():
            ct = part.get_content_type()
            disposition = str(part.get('Content-Disposition', '')).lower()

            if part.get_filename() or 'attachment' in disposition:
                attachments.append(
                    _attachment_entry(part, len(attachments), used_ids)
                )
            elif ct == 'text/plain' and not body_text:
                body_text = decode_payload(part)
            elif ct == 'text/html' and not body_html:
                body_html = sanitize_html(decode_payload(part))
    elif msg.get_content_type() == 'text/html':
        body_html = sanitize_html(decode_payload(msg))
    else:
        body_text = decode_payload(msg)

    sender_name, sender_addr = extract_addr(msg.get('From', ''))

    # Strip before slicing so leading whitespace does not eat the budget.
    preview_source = body_text or _TAG_RE.sub('', body_html)
    preview = preview_source.strip()[:100].strip()

    return {
        "subject": decode_str(msg.get('Subject', '(제목 없음)')),
        "sender": sender_name,
        "sender_addr": sender_addr,
        "to": decode_str(msg.get('To', '')),
        "cc": decode_str(msg.get('Cc', '')),
        "date": _safe(str(msg.get('Date', ''))),
        "body_text": body_text,
        "body_html": body_html,
        "attachments": [
            {
                "part_id": a["part_id"],
                "filename": a["filename"],
                "content_type": a["content_type"],
                "size": a["size"],
            }
            for a in attachments
        ],
        "preview": preview,
        "_attachments_data": {a["part_id"]: a["_data"] for a in attachments},
    }