"""IMAP 클라이언트: aioimaplib 기반 메일 조회""" import asyncio, ssl, email, imaplib from typing import Optional from .mail_parser import parse_message, _safe IMAP_HOST = "localhost" IMAP_PORT = 993 FOLDER_MAP = { "INBOX": "받은메함", "Sent": "보낸메함", "Sent Messages": "보낸메함", "Drafts": "임시보관함", "Draft": "임시보관함", "Trash": "휴지통", "Deleted Messages": "휴지통", "Junk": "스팸", "Spam": "스팸", } FOLDER_ORDER = ["INBOX", "Sent", "Drafts", "Trash", "Junk"] def _ssl_ctx(): ctx = ssl.create_default_context() ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE return ctx def _imap_user(u: str) -> str: return u.split("@")[0] def _sync_connect(username: str, password: str) -> imaplib.IMAP4_SSL: """동기 imaplib 연결 (aioimaplib 파싱 문제 회피)""" M = imaplib.IMAP4_SSL(IMAP_HOST, IMAP_PORT, ssl_context=_ssl_ctx()) M.login(_imap_user(username), password) return M async def list_folders(username: str, password: str) -> list[dict]: import re as _re def _do(): M = _sync_connect(username, password) try: res, lines = M.list() folders = [] seen: set = set() DEFAULT = ["Sent", "Drafts", "Trash", "Junk"] for line in (lines or []): raw = line.decode('utf-8','replace') if isinstance(line, bytes) else str(line) # (\HasNoChildren) "." INBOX m = _re.match(r'\s*\([^)]*\)\s+"[./]"\s+"?([^"]+)"?\s*$', raw.strip()) if not m: continue name = m.group(1).strip().strip('"') if not name or name in seen: continue if not _re.match(r'^[\w\-. /]+$', name): continue seen.add(name) unread = total = 0 try: r2, d2 = M.status(f'"{name}"', '(UNSEEN MESSAGES)') if r2 == "OK" and d2: s = d2[0].decode('utf-8','replace') if isinstance(d2[0], bytes) else str(d2[0]) mu = _re.search(r'UNSEEN (\d+)', s) mt = _re.search(r'MESSAGES (\d+)', s) if mu: unread = int(mu.group(1)) if mt: total = int(mt.group(1)) except Exception: pass folders.append({"name": name, "display": FOLDER_MAP.get(name, name), "unread": unread, "total": total}) # 기본 폴더 없으면 생성 existing = {f["name"] for f in folders} for d in DEFAULT: if d not in existing: try: M.create(d) except Exception: pass folders.append({"name": d, "display": FOLDER_MAP[d], "unread": 0, "total": 0}) def sort_key(f): try: return FOLDER_ORDER.index(f["name"]) except ValueError: return 99 return sorted(folders, key=sort_key) finally: try: M.logout() except Exception: pass return await asyncio.get_event_loop().run_in_executor(None, _do) async def list_messages(username: str, password: str, folder: str = "INBOX", page: int = 1, per_page: int = 50, search: Optional[str] = None) -> dict: import re as _re def _do(): M = _sync_connect(username, password) try: res, _ = M.select(f'"{folder}"') if res != "OK": return {"messages": [], "total": 0, "page": page, "per_page": per_page} criteria = ['TEXT', search] if search else ['ALL'] res2, data2 = M.search(None, *criteria) if res2 != "OK" or not data2 or not data2[0]: return {"messages": [], "total": 0, "page": page, "per_page": per_page} all_ids = data2[0].split() total = len(all_ids) # 최신순 (역순) all_ids = list(reversed(all_ids)) start = (page - 1) * per_page page_ids = all_ids[start: start + per_page] if not page_ids: return {"messages": [], "total": total, "page": page, "per_page": per_page} messages = [] for uid_b in page_ids: uid = uid_b.decode() if isinstance(uid_b, bytes) else str(uid_b) try: # FLAGS rf, df = M.fetch(uid, '(FLAGS RFC822.SIZE BODY.PEEK[HEADER.FIELDS (SUBJECT FROM TO DATE)])') if rf != "OK" or not df: continue flags_str = "" header_raw = b"" size = 0 for item in df: if isinstance(item, tuple): meta = item[0].decode('utf-8','replace') if isinstance(item[0], bytes) else str(item[0]) header_raw = item[1] if isinstance(item[1], bytes) else b"" fm = _re.search(r'RFC822\.SIZE (\d+)', meta) if fm: size = int(fm.group(1)) flags_str = meta elif isinstance(item, bytes): s = item.decode('utf-8','replace') if 'FLAGS' in s: flags_str = s flags = _re.findall(r'\\(\w+)', flags_str) is_read = 'Seen' in flags parsed = {} if header_raw: try: msg = email.message_from_bytes(header_raw) parsed = parse_message(msg) except Exception: pass messages.append({ "uid": uid, "subject": _safe(parsed.get("subject") or "(제목 없음)"), "sender": _safe(parsed.get("sender") or ""), "sender_addr": _safe(parsed.get("sender_addr") or ""), "to": _safe(parsed.get("to") or ""), "date": _safe(parsed.get("date") or ""), "is_read": is_read, "has_attachment": bool(parsed.get("attachments")), "size": size, "preview": _safe(parsed.get("preview") or ""), }) except Exception: continue return {"messages": messages, "total": total, "page": page, "per_page": per_page} finally: try: M.logout() except Exception: pass return await asyncio.get_event_loop().run_in_executor(None, _do) async def get_message(username: str, password: str, uid: str, folder: str = "INBOX") -> dict: def _do(): M = _sync_connect(username, password) try: M.select(f'"{folder}"') M.store(uid, '+FLAGS', '(\\Seen)') res, data = M.fetch(uid, '(RFC822 FLAGS)') if res != "OK" or not data: raise ValueError("메일을 찾을 수 없습니다") raw = b"" flags_str = "" for item in data: if isinstance(item, tuple): raw = item[1] if isinstance(item[1], bytes) else b"" flags_str = item[0].decode('utf-8','replace') if isinstance(item[0], bytes) else "" if not raw: raise ValueError("메일 내용을 읽을 수 없습니다") import re as _re flags = _re.findall(r'\\(\w+)', flags_str) msg = email.message_from_bytes(raw) parsed = parse_message(msg) parsed["uid"] = uid parsed["is_read"] = "Seen" in flags return parsed finally: try: M.logout() except Exception: pass return await asyncio.get_event_loop().run_in_executor(None, _do) async def mark_read(username: str, password: str, uid: str, folder: str, read: bool = True): def _do(): M = _sync_connect(username, password) try: M.select(f'"{folder}"') flag = '+FLAGS' if read else '-FLAGS' M.store(uid, flag, '(\\Seen)') finally: try: M.logout() except Exception: pass await asyncio.get_event_loop().run_in_executor(None, _do) async def move_message(username: str, password: str, uid: str, src: str, dst: str): def _do(): M = _sync_connect(username, password) try: M.select(f'"{src}"') M.copy(uid, f'"{dst}"') M.store(uid, '+FLAGS', '(\\Deleted)') M.expunge() finally: try: M.logout() except Exception: pass await asyncio.get_event_loop().run_in_executor(None, _do) async def delete_message(username: str, password: str, uid: str, folder: str): trash = "Trash" if folder == trash: def _do(): M = _sync_connect(username, password) try: M.select(f'"{folder}"') M.store(uid, '+FLAGS', '(\\Deleted)') M.expunge() finally: try: M.logout() except Exception: pass await asyncio.get_event_loop().run_in_executor(None, _do) else: await move_message(username, password, uid, folder, trash) async def append_to_sent(username: str, password: str, raw_message: bytes): """발송된 메일을 Sent 폴더에 IMAP APPEND""" def _do(): M = _sync_connect(username, password) try: M.append("Sent", '(\\Seen)', None, raw_message) except Exception: pass # Sent 저장 실패는 무시 (발송은 이미 성공) finally: try: M.logout() except Exception: pass await asyncio.get_event_loop().run_in_executor(None, _do) async def get_attachment(username: str, password: str, uid: str, part_id: str, folder: str) -> tuple: def _do(): M = _sync_connect(username, password) try: M.select(f'"{folder}"') res, data = M.fetch(uid, '(RFC822)') raw = b"" for item in data: if isinstance(item, tuple): raw = item[1]; break msg = email.message_from_bytes(raw) parsed = parse_message(msg) att_data = parsed["_attachments_data"].get(part_id) for att in parsed["attachments"]: if att["part_id"] == part_id: return att_data or b"", att["content_type"], att["filename"] raise ValueError("첨부파일을 찾을 수 없습니다") finally: try: M.logout() except Exception: pass return await asyncio.get_event_loop().run_in_executor(None, _do)