From e54926909ff040f734df0b20c4c1ff609e0be308 Mon Sep 17 00:00:00 2001 From: zio Date: Mon, 1 Jun 2026 22:12:53 +0900 Subject: [PATCH] feat: add backend/imap_client.py --- backend/imap_client.py | 285 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 285 insertions(+) create mode 100644 backend/imap_client.py diff --git a/backend/imap_client.py b/backend/imap_client.py new file mode 100644 index 00000000..579e3a8e --- /dev/null +++ b/backend/imap_client.py @@ -0,0 +1,285 @@ +"""IMAP 클라이언트: aioimaplib 기반 메일 조회""" +import asyncio, ssl, email, imaplib +from typing import Optional +from .mail_parser import parse_message, _safe + +IMAP_HOST = "localhost" +IMAP_PORT = 993 + +FOLDER_MAP = { + "INBOX": "받은메함", + "Sent": "보낸메함", "Sent Messages": "보낸메함", + "Drafts": "임시보관함", "Draft": "임시보관함", + "Trash": "휴지통", "Deleted Messages": "휴지통", + "Junk": "스팸", "Spam": "스팸", +} +FOLDER_ORDER = ["INBOX", "Sent", "Drafts", "Trash", "Junk"] + + +def _ssl_ctx(): + ctx = ssl.create_default_context() + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + return ctx + + +def _imap_user(u: str) -> str: + return u.split("@")[0] + + +def _sync_connect(username: str, password: str) -> imaplib.IMAP4_SSL: + """동기 imaplib 연결 (aioimaplib 파싱 문제 회피)""" + M = imaplib.IMAP4_SSL(IMAP_HOST, IMAP_PORT, ssl_context=_ssl_ctx()) + M.login(_imap_user(username), password) + return M + + +async def list_folders(username: str, password: str) -> list[dict]: + import re as _re + + def _do(): + M = _sync_connect(username, password) + try: + res, lines = M.list() + folders = [] + seen: set = set() + DEFAULT = ["Sent", "Drafts", "Trash", "Junk"] + + for line in (lines or []): + raw = line.decode('utf-8','replace') if isinstance(line, bytes) else str(line) + # (\HasNoChildren) "." INBOX + m = _re.match(r'\s*\([^)]*\)\s+"[./]"\s+"?([^"]+)"?\s*$', raw.strip()) + if not m: + continue + name = m.group(1).strip().strip('"') + if not name or name in seen: + continue + if not _re.match(r'^[\w\-. /]+$', name): + continue + seen.add(name) + unread = total = 0 + try: + r2, d2 = M.status(f'"{name}"', '(UNSEEN MESSAGES)') + if r2 == "OK" and d2: + s = d2[0].decode('utf-8','replace') if isinstance(d2[0], bytes) else str(d2[0]) + mu = _re.search(r'UNSEEN (\d+)', s) + mt = _re.search(r'MESSAGES (\d+)', s) + if mu: unread = int(mu.group(1)) + if mt: total = int(mt.group(1)) + except Exception: + pass + folders.append({"name": name, "display": FOLDER_MAP.get(name, name), + "unread": unread, "total": total}) + + # 기본 폴더 없으면 생성 + existing = {f["name"] for f in folders} + for d in DEFAULT: + if d not in existing: + try: M.create(d) + except Exception: pass + folders.append({"name": d, "display": FOLDER_MAP[d], "unread": 0, "total": 0}) + + def sort_key(f): + try: return FOLDER_ORDER.index(f["name"]) + except ValueError: return 99 + return sorted(folders, key=sort_key) + finally: + try: M.logout() + except Exception: pass + + return await asyncio.get_event_loop().run_in_executor(None, _do) + + +async def list_messages(username: str, password: str, folder: str = "INBOX", + page: int = 1, per_page: int = 50, + search: Optional[str] = None) -> dict: + import re as _re + + def _do(): + M = _sync_connect(username, password) + try: + res, _ = M.select(f'"{folder}"') + if res != "OK": + return {"messages": [], "total": 0, "page": page, "per_page": per_page} + + criteria = ['TEXT', search] if search else ['ALL'] + res2, data2 = M.search(None, *criteria) + if res2 != "OK" or not data2 or not data2[0]: + return {"messages": [], "total": 0, "page": page, "per_page": per_page} + + all_ids = data2[0].split() + total = len(all_ids) + # 최신순 (역순) + all_ids = list(reversed(all_ids)) + start = (page - 1) * per_page + page_ids = all_ids[start: start + per_page] + if not page_ids: + return {"messages": [], "total": total, "page": page, "per_page": per_page} + + messages = [] + for uid_b in page_ids: + uid = uid_b.decode() if isinstance(uid_b, bytes) else str(uid_b) + try: + # FLAGS + rf, df = M.fetch(uid, '(FLAGS RFC822.SIZE BODY.PEEK[HEADER.FIELDS (SUBJECT FROM TO DATE)])') + if rf != "OK" or not df: + continue + + flags_str = "" + header_raw = b"" + size = 0 + + for item in df: + if isinstance(item, tuple): + meta = item[0].decode('utf-8','replace') if isinstance(item[0], bytes) else str(item[0]) + header_raw = item[1] if isinstance(item[1], bytes) else b"" + fm = _re.search(r'RFC822\.SIZE (\d+)', meta) + if fm: size = int(fm.group(1)) + flags_str = meta + elif isinstance(item, bytes): + s = item.decode('utf-8','replace') + if 'FLAGS' in s: flags_str = s + + flags = _re.findall(r'\\(\w+)', flags_str) + is_read = 'Seen' in flags + + parsed = {} + if header_raw: + try: + msg = email.message_from_bytes(header_raw) + parsed = parse_message(msg) + except Exception: + pass + + messages.append({ + "uid": uid, + "subject": _safe(parsed.get("subject") or "(제목 없음)"), + "sender": _safe(parsed.get("sender") or ""), + "sender_addr": _safe(parsed.get("sender_addr") or ""), + "to": _safe(parsed.get("to") or ""), + "date": _safe(parsed.get("date") or ""), + "is_read": is_read, + "has_attachment": bool(parsed.get("attachments")), + "size": size, + "preview": _safe(parsed.get("preview") or ""), + }) + except Exception: + continue + + return {"messages": messages, "total": total, "page": page, "per_page": per_page} + finally: + try: M.logout() + except Exception: pass + + return await asyncio.get_event_loop().run_in_executor(None, _do) + + +async def get_message(username: str, password: str, uid: str, folder: str = "INBOX") -> dict: + def _do(): + M = _sync_connect(username, password) + try: + M.select(f'"{folder}"') + M.store(uid, '+FLAGS', '(\\Seen)') + res, data = M.fetch(uid, '(RFC822 FLAGS)') + if res != "OK" or not data: + raise ValueError("메일을 찾을 수 없습니다") + raw = b"" + flags_str = "" + for item in data: + if isinstance(item, tuple): + raw = item[1] if isinstance(item[1], bytes) else b"" + flags_str = item[0].decode('utf-8','replace') if isinstance(item[0], bytes) else "" + if not raw: + raise ValueError("메일 내용을 읽을 수 없습니다") + import re as _re + flags = _re.findall(r'\\(\w+)', flags_str) + msg = email.message_from_bytes(raw) + parsed = parse_message(msg) + parsed["uid"] = uid + parsed["is_read"] = "Seen" in flags + return parsed + finally: + try: M.logout() + except Exception: pass + + return await asyncio.get_event_loop().run_in_executor(None, _do) + + +async def mark_read(username: str, password: str, uid: str, folder: str, read: bool = True): + def _do(): + M = _sync_connect(username, password) + try: + M.select(f'"{folder}"') + flag = '+FLAGS' if read else '-FLAGS' + M.store(uid, flag, '(\\Seen)') + finally: + try: M.logout() + except Exception: pass + await asyncio.get_event_loop().run_in_executor(None, _do) + + +async def move_message(username: str, password: str, uid: str, src: str, dst: str): + def _do(): + M = _sync_connect(username, password) + try: + M.select(f'"{src}"') + M.copy(uid, f'"{dst}"') + M.store(uid, '+FLAGS', '(\\Deleted)') + M.expunge() + finally: + try: M.logout() + except Exception: pass + await asyncio.get_event_loop().run_in_executor(None, _do) + + +async def delete_message(username: str, password: str, uid: str, folder: str): + trash = "Trash" + if folder == trash: + def _do(): + M = _sync_connect(username, password) + try: + M.select(f'"{folder}"') + M.store(uid, '+FLAGS', '(\\Deleted)') + M.expunge() + finally: + try: M.logout() + except Exception: pass + await asyncio.get_event_loop().run_in_executor(None, _do) + else: + await move_message(username, password, uid, folder, trash) + + +async def append_to_sent(username: str, password: str, raw_message: bytes): + """발송된 메일을 Sent 폴더에 IMAP APPEND""" + def _do(): + M = _sync_connect(username, password) + try: + M.append("Sent", '(\\Seen)', None, raw_message) + except Exception: + pass # Sent 저장 실패는 무시 (발송은 이미 성공) + finally: + try: M.logout() + except Exception: pass + await asyncio.get_event_loop().run_in_executor(None, _do) + + +async def get_attachment(username: str, password: str, uid: str, part_id: str, folder: str) -> tuple: + def _do(): + M = _sync_connect(username, password) + try: + M.select(f'"{folder}"') + res, data = M.fetch(uid, '(RFC822)') + raw = b"" + for item in data: + if isinstance(item, tuple): raw = item[1]; break + msg = email.message_from_bytes(raw) + parsed = parse_message(msg) + att_data = parsed["_attachments_data"].get(part_id) + for att in parsed["attachments"]: + if att["part_id"] == part_id: + return att_data or b"", att["content_type"], att["filename"] + raise ValueError("첨부파일을 찾을 수 없습니다") + finally: + try: M.logout() + except Exception: pass + return await asyncio.get_event_loop().run_in_executor(None, _do)