"""Mail parsing: RFC 2047 header decoding, Korean text, attachments."""

import email
import email.header
import email.message
import email.utils
import re
from typing import Optional

try:
    import chardet
except ImportError:
    # chardet only guesses charsets that a message failed to declare, so a
    # missing install degrades to the UTF-8 default instead of breaking import.
    chardet = None

# <script ...> ... </script>; an unterminated script runs to the end of input.
_SCRIPT_RE = re.compile(
    r'<script\b[^>]*>.*?(?:</script\s*>|\Z)',
    re.DOTALL | re.IGNORECASE,
)
# Inline event handlers (onclick=, onerror=, ...), quoted or unquoted.
# The leading whitespace keeps attributes such as content="..." intact.
_EVENT_ATTR_RE = re.compile(
    r'''\s+on\w+\s*=\s*(?:"[^"]*"|'[^']*'|[^\s>]+)''',
    re.IGNORECASE,
)


def _safe(s: str) -> str:
    """Return *s* with lone surrogates replaced so it can be JSON-serialized.

    Anything that is not a ``str`` yields an empty string.
    """
    if not isinstance(s, str):
        return ""
    return s.encode('utf-8', errors='replace').decode('utf-8', errors='replace')


def _detect_charset(data: bytes) -> str:
    """Guess the charset of *data*, defaulting to UTF-8."""
    if chardet is None:
        return 'utf-8'
    return chardet.detect(data).get('encoding') or 'utf-8'


def _decode_bytes(data: bytes, charset: Optional[str] = None) -> str:
    """Decode *data* with *charset*, tolerating missing or unknown labels.

    Falls back to a detected charset and finally to UTF-8; undecodable bytes
    are replaced rather than raising.
    """
    if charset:
        try:
            return data.decode(charset, errors='replace')
        except (LookupError, ValueError):
            # Unknown or malformed charset label: fall through to guessing.
            pass
    try:
        return data.decode(_detect_charset(data), errors='replace')
    except (LookupError, ValueError):
        return data.decode('utf-8', errors='replace')


def decode_str(raw: Optional[str]) -> str:
    """Decode an RFC 2047 encoded header value into plain text.

    Returns an empty string for empty input. If the header cannot be parsed
    at all, the raw value is returned (surrogate-cleaned) instead of raising.
    """
    if not raw:
        return ""
    try:
        chunks = email.header.decode_header(raw)
    except Exception:
        # Deliberately best-effort: a malformed header must not abort parsing.
        return _safe(str(raw))
    decoded = [
        _decode_bytes(chunk, charset) if isinstance(chunk, bytes) else str(chunk)
        for chunk, charset in chunks
    ]
    return _safe("".join(decoded).strip())


def extract_addr(raw: str) -> tuple[str, str]:
    """Split an address header into ``(display_name, address)``.

    Example: ``'=?UTF-8?B?7ZmN6ri464+Z?= <hong@example.com>'`` becomes
    ``('홍길동', 'hong@example.com')``. When there is no display name the
    address is used as the name.
    """
    if not raw:
        return "", ""
    # Parse before decoding: a decoded display name may contain ',' or '<',
    # which would otherwise be read as address-list syntax.
    name, addr = email.utils.parseaddr(str(raw))
    if "@" in addr:
        name, addr = decode_str(name), _safe(addr)
    else:
        # No usable address found (e.g. the whole value was one encoded
        # word): fall back to decoding first.
        name, addr = email.utils.parseaddr(decode_str(raw))
    return name or addr, addr


def decode_payload(part) -> str:
    """Return the decoded text payload of a message part.

    Uses the part's declared charset when valid, otherwise a guessed one.
    Returns an empty string when the part has no payload.
    """
    raw = part.get_payload(decode=True)
    if not raw:
        return ""
    return _safe(_decode_bytes(raw, part.get_content_charset()))


def sanitize_html(html: str) -> str:
    """Strip ``<script>`` elements and inline ``on*`` event-handler attributes.

    This is a basic server-side pass built on regular expressions, not a
    complete HTML sanitizer.
    """
    html = _SCRIPT_RE.sub('', html)
    return _EVENT_ATTR_RE.sub('', html)


def _unique_part_id(part, index: int, taken: set) -> str:
    """Return an attachment id from Content-ID, or a positional fallback.

    Empty or already-used ids fall back to ``part_<n>`` so that ids can be
    used as dictionary keys without one attachment overwriting another.
    """
    candidate = str(part.get('Content-ID', '')).strip().strip('<>')
    if not candidate:
        candidate = f"part_{index}"
    suffix = index
    while candidate in taken:
        candidate = f"part_{suffix}"
        suffix += 1
    return candidate


def parse_message(msg: email.message.Message) -> dict:
    """Parse *msg* into a dictionary of headers, bodies and attachments.

    The result holds ``subject``, ``sender``, ``sender_addr``, ``to``, ``cc``,
    ``date``, ``body_text``, ``body_html`` (sanitized), ``attachments``
    (metadata only), ``preview`` (first 100 characters of the body) and
    ``_attachments_data`` (part id -> raw bytes).
    """
    body_text = ""
    body_html = ""
    attachments = []
    taken_ids = set()

    if msg.is_multipart():
        for part in msg.walk():
            ct = part.get_content_type()
            # Header tokens are case-insensitive.
            cd = str(part.get('Content-Disposition', '')).lower()
            fn = part.get_filename()
            if fn or 'attachment' in cd:
                raw = part.get_payload(decode=True) or b""
                part_id = _unique_part_id(part, len(attachments), taken_ids)
                taken_ids.add(part_id)
                attachments.append({
                    "part_id": part_id,
                    "filename": decode_str(fn or "unnamed"),
                    "content_type": ct,
                    "size": len(raw),
                    "_data": raw,
                })
            elif ct == 'text/plain' and not body_text:
                body_text = decode_payload(part)
            elif ct == 'text/html' and not body_html:
                body_html = sanitize_html(decode_payload(part))
    elif msg.get_content_type() == 'text/html':
        body_html = sanitize_html(decode_payload(msg))
    else:
        body_text = decode_payload(msg)

    # Use the placeholder for a missing Subject and for an empty one.
    subject = decode_str(msg.get('Subject')) or '(제목 없음)'
    sender_name, sender_addr = extract_addr(msg.get('From', ''))
    preview = (body_text or re.sub('<[^>]+>', '', body_html))[:100].strip()

    return {
        "subject": subject,
        "sender": sender_name or sender_addr,
        "sender_addr": sender_addr,
        "to": decode_str(msg.get('To', '')),
        "cc": decode_str(msg.get('Cc', '')),
        "date": msg.get('Date', ''),
        "body_text": body_text,
        "body_html": body_html,
        "attachments": [
            {
                "part_id": a["part_id"],
                "filename": a["filename"],
                "content_type": a["content_type"],
                "size": a["size"],
            }
            for a in attachments
        ],
        "preview": preview,
        "_attachments_data": {a["part_id"]: a["_data"] for a in attachments},
    }