fix(zioinfo-mail): fix inbox/sent parsing + surrogate encoding + Sent IMAP APPEND

- mail_parser: _safe() surrogate 문자 제거 → JSON 직렬화 오류 수정
- imap_client: aioimaplib → 동기 imaplib으로 전환 (파싱 안정성)
- smtp_client: aiosmtplib → 동기 smtplib으로 전환 + raw bytes 반환
- main.py: 발송 후 append_to_sent() → Sent 폴더 자동 저장
- MailList: Sent 폴더에서 받는사람 표시 (→ info@...)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
DESKTOP-TKLFCPR\ython 2026-06-01 21:50:47 +09:00
parent d6a251e489
commit 3d5a125b04
5 changed files with 260 additions and 216 deletions

View File

@ -1,7 +1,7 @@
"""IMAP 클라이언트: aioimaplib 기반 메일 조회"""
import asyncio, ssl, email, aioimaplib
import asyncio, ssl, email, imaplib
from typing import Optional
from .mail_parser import parse_message
from .mail_parser import parse_message, _safe
IMAP_HOST = "localhost"
IMAP_PORT = 993
@ -27,236 +27,259 @@ def _imap_user(u: str) -> str:
return u.split("@")[0]
async def _connect(username: str, password: str) -> aioimaplib.IMAP4_SSL:
imap = aioimaplib.IMAP4_SSL(host=IMAP_HOST, port=IMAP_PORT, ssl_context=_ssl_ctx())
await imap.wait_hello_from_server()
res, _ = await imap.login(_imap_user(username), password)
if res != "OK":
raise ValueError("IMAP 인증 실패")
return imap
def _sync_connect(username: str, password: str) -> imaplib.IMAP4_SSL:
"""동기 imaplib 연결 (aioimaplib 파싱 문제 회피)"""
M = imaplib.IMAP4_SSL(IMAP_HOST, IMAP_PORT, ssl_context=_ssl_ctx())
M.login(_imap_user(username), password)
return M
async def list_folders(username: str, password: str) -> list[dict]:
import re as _re
imap = await _connect(username, password)
try:
res, lines = await imap.list('""', '*')
folders = []
seen: set = set()
for line in lines:
if not line:
continue
raw = line.decode('utf-8', 'replace') if isinstance(line, bytes) else str(line)
# IMAP LIST 형식: (\Flags) "sep" FolderName
m = _re.match(r'\s*\([^)]*\)\s+"[^"]+"\s+"?([^"\s]+)"?\s*$', raw.strip())
if not m:
# 대안: 마지막 따옴표 없는 토큰
m2 = _re.search(r'\)\s+"\."\s+(\S+)$', raw)
name = m2.group(1).strip() if m2 else ''
else:
name = m.group(1).strip()
# 유효하지 않은 이름 필터
if not name or name in seen:
continue
if not _re.match(r'^[\w\-. /]+$', name):
continue
seen.add(name)
def _do():
M = _sync_connect(username, password)
try:
res, lines = M.list()
folders = []
seen: set = set()
DEFAULT = ["Sent", "Drafts", "Trash", "Junk"]
unread = total = 0
try:
r2, data = await imap.status(f'"{name}"', '(UNSEEN MESSAGES)')
if r2 == "OK" and data:
s = data[0].decode('utf-8','replace') if isinstance(data[0], bytes) else str(data[0])
mu = _re.search(r'UNSEEN (\d+)', s)
mt = _re.search(r'MESSAGES (\d+)', s)
if mu: unread = int(mu.group(1))
if mt: total = int(mt.group(1))
except Exception:
pass
folders.append({
"name": name,
"display": FOLDER_MAP.get(name, name),
"unread": unread,
"total": total,
})
# 기본 폴더가 없으면 생성 후 목록에 추가
existing_names = {f["name"] for f in folders}
for default in ["Sent", "Drafts", "Trash", "Junk"]:
if default not in existing_names:
for line in (lines or []):
raw = line.decode('utf-8','replace') if isinstance(line, bytes) else str(line)
# (\HasNoChildren) "." INBOX
m = _re.match(r'\s*\([^)]*\)\s+"[./]"\s+"?([^"]+)"?\s*$', raw.strip())
if not m:
continue
name = m.group(1).strip().strip('"')
if not name or name in seen:
continue
if not _re.match(r'^[\w\-. /]+$', name):
continue
seen.add(name)
unread = total = 0
try:
await imap.create(f'"{default}"')
r2, d2 = M.status(f'"{name}"', '(UNSEEN MESSAGES)')
if r2 == "OK" and d2:
s = d2[0].decode('utf-8','replace') if isinstance(d2[0], bytes) else str(d2[0])
mu = _re.search(r'UNSEEN (\d+)', s)
mt = _re.search(r'MESSAGES (\d+)', s)
if mu: unread = int(mu.group(1))
if mt: total = int(mt.group(1))
except Exception:
pass
folders.append({"name": default, "display": FOLDER_MAP[default], "unread": 0, "total": 0})
folders.append({"name": name, "display": FOLDER_MAP.get(name, name),
"unread": unread, "total": total})
def sort_key(f):
try: return FOLDER_ORDER.index(f["name"])
except ValueError: return 99
# 기본 폴더 없으면 생성
existing = {f["name"] for f in folders}
for d in DEFAULT:
if d not in existing:
try: M.create(d)
except Exception: pass
folders.append({"name": d, "display": FOLDER_MAP[d], "unread": 0, "total": 0})
return sorted(folders, key=sort_key)
finally:
try: await imap.logout()
except Exception: pass
def sort_key(f):
try: return FOLDER_ORDER.index(f["name"])
except ValueError: return 99
return sorted(folders, key=sort_key)
finally:
try: M.logout()
except Exception: pass
return await asyncio.get_event_loop().run_in_executor(None, _do)
async def list_messages(username: str, password: str, folder: str = "INBOX",
page: int = 1, per_page: int = 50,
search: Optional[str] = None) -> dict:
imap = await _connect(username, password)
try:
res, _ = await imap.select(f'"{folder}"')
if res != "OK":
return {"messages": [], "total": 0, "page": page, "per_page": per_page}
import re as _re
criteria = f'TEXT "{search}"' if search else "ALL"
res, data = await imap.search(criteria)
if res != "OK" or not data or not data[0]:
return {"messages": [], "total": 0, "page": page, "per_page": per_page}
def _do():
M = _sync_connect(username, password)
try:
res, _ = M.select(f'"{folder}"')
if res != "OK":
return {"messages": [], "total": 0, "page": page, "per_page": per_page}
raw = data[0].decode() if isinstance(data[0], bytes) else str(data[0])
uids = [u for u in raw.split() if u.strip()]
total = len(uids)
# 최신순 정렬
uids = list(reversed(uids))
start = (page - 1) * per_page
page_uids = uids[start: start + per_page]
if not page_uids:
return {"messages": [], "total": total, "page": page, "per_page": per_page}
criteria = ['TEXT', search] if search else ['ALL']
res2, data2 = M.search(None, *criteria)
if res2 != "OK" or not data2 or not data2[0]:
return {"messages": [], "total": 0, "page": page, "per_page": per_page}
uid_set = ",".join(page_uids)
res, fetch_data = await imap.fetch(uid_set,
"(UID FLAGS BODY.PEEK[HEADER.FIELDS (SUBJECT FROM TO DATE)] RFC822.SIZE)")
all_ids = data2[0].split()
total = len(all_ids)
# 최신순 (역순)
all_ids = list(reversed(all_ids))
start = (page - 1) * per_page
page_ids = all_ids[start: start + per_page]
if not page_ids:
return {"messages": [], "total": total, "page": page, "per_page": per_page}
messages = []
i = 0
while i < len(fetch_data):
line = fetch_data[i]
if not isinstance(line, (bytes, str)) or not str(line).strip():
i += 1; continue
s = line.decode() if isinstance(line, bytes) else str(line)
if 'FETCH' not in s and s.strip() not in (')', ''):
i += 1; continue
# UID
import re
uid_m = re.search(r'UID (\d+)', s)
uid = uid_m.group(1) if uid_m else page_uids[len(messages)] if len(messages) < len(page_uids) else "?"
flags = re.findall(r'\\(\w+)', s)
is_read = 'Seen' in flags
size_m = re.search(r'RFC822\.SIZE (\d+)', s)
size = int(size_m.group(1)) if size_m else 0
messages = []
for uid_b in page_ids:
uid = uid_b.decode() if isinstance(uid_b, bytes) else str(uid_b)
try:
# FLAGS
rf, df = M.fetch(uid, '(FLAGS RFC822.SIZE BODY.PEEK[HEADER.FIELDS (SUBJECT FROM TO DATE)])')
if rf != "OK" or not df:
continue
# 헤더
header_raw = b""
if i + 1 < len(fetch_data) and isinstance(fetch_data[i + 1], bytes):
header_raw = fetch_data[i + 1]; i += 1
flags_str = ""
header_raw = b""
size = 0
msg = email.message_from_bytes(header_raw) if header_raw else None
parsed = parse_message(msg) if msg else {}
for item in df:
if isinstance(item, tuple):
meta = item[0].decode('utf-8','replace') if isinstance(item[0], bytes) else str(item[0])
header_raw = item[1] if isinstance(item[1], bytes) else b""
fm = _re.search(r'RFC822\.SIZE (\d+)', meta)
if fm: size = int(fm.group(1))
flags_str = meta
elif isinstance(item, bytes):
s = item.decode('utf-8','replace')
if 'FLAGS' in s: flags_str = s
messages.append({
"uid": uid,
"subject": parsed.get("subject", "(제목 없음)"),
"sender": parsed.get("sender", ""),
"sender_addr": parsed.get("sender_addr", ""),
"date": parsed.get("date", ""),
"is_read": is_read,
"has_attachment": False,
"size": size,
"preview": parsed.get("preview", ""),
})
i += 1
flags = _re.findall(r'\\(\w+)', flags_str)
is_read = 'Seen' in flags
return {"messages": messages, "total": total, "page": page, "per_page": per_page}
finally:
try: await imap.logout()
except Exception: pass
parsed = {}
if header_raw:
try:
msg = email.message_from_bytes(header_raw)
parsed = parse_message(msg)
except Exception:
pass
messages.append({
"uid": uid,
"subject": _safe(parsed.get("subject") or "(제목 없음)"),
"sender": _safe(parsed.get("sender") or ""),
"sender_addr": _safe(parsed.get("sender_addr") or ""),
"to": _safe(parsed.get("to") or ""),
"date": _safe(parsed.get("date") or ""),
"is_read": is_read,
"has_attachment": bool(parsed.get("attachments")),
"size": size,
"preview": _safe(parsed.get("preview") or ""),
})
except Exception:
continue
return {"messages": messages, "total": total, "page": page, "per_page": per_page}
finally:
try: M.logout()
except Exception: pass
return await asyncio.get_event_loop().run_in_executor(None, _do)
async def get_message(username: str, password: str, uid: str, folder: str = "INBOX") -> dict:
imap = await _connect(username, password)
try:
await imap.select(f'"{folder}"')
# 읽음 처리
await imap.store(uid, '+FLAGS', '(\\Seen)')
res, data = await imap.fetch(uid, "(UID FLAGS RFC822)")
if res != "OK" or not data:
raise ValueError("메일을 찾을 수 없습니다")
raw = b""
for item in data:
if isinstance(item, bytes) and len(item) > 200:
raw = item; break
msg = email.message_from_bytes(raw)
parsed = parse_message(msg)
import re
flags_line = str(data[0]) if data else ""
flags = re.findall(r'\\(\w+)', flags_line)
parsed["uid"] = uid
parsed["is_read"] = "Seen" in flags
return parsed
finally:
try: await imap.logout()
except Exception: pass
def _do():
M = _sync_connect(username, password)
try:
M.select(f'"{folder}"')
M.store(uid, '+FLAGS', '(\\Seen)')
res, data = M.fetch(uid, '(RFC822 FLAGS)')
if res != "OK" or not data:
raise ValueError("메일을 찾을 수 없습니다")
raw = b""
flags_str = ""
for item in data:
if isinstance(item, tuple):
raw = item[1] if isinstance(item[1], bytes) else b""
flags_str = item[0].decode('utf-8','replace') if isinstance(item[0], bytes) else ""
if not raw:
raise ValueError("메일 내용을 읽을 수 없습니다")
import re as _re
flags = _re.findall(r'\\(\w+)', flags_str)
msg = email.message_from_bytes(raw)
parsed = parse_message(msg)
parsed["uid"] = uid
parsed["is_read"] = "Seen" in flags
return parsed
finally:
try: M.logout()
except Exception: pass
return await asyncio.get_event_loop().run_in_executor(None, _do)
async def mark_read(username: str, password: str, uid: str, folder: str, read: bool = True):
imap = await _connect(username, password)
try:
await imap.select(f'"{folder}"')
flag = '+FLAGS' if read else '-FLAGS'
await imap.store(uid, flag, '(\\Seen)')
finally:
try: await imap.logout()
except Exception: pass
def _do():
M = _sync_connect(username, password)
try:
M.select(f'"{folder}"')
flag = '+FLAGS' if read else '-FLAGS'
M.store(uid, flag, '(\\Seen)')
finally:
try: M.logout()
except Exception: pass
await asyncio.get_event_loop().run_in_executor(None, _do)
async def move_message(username: str, password: str, uid: str, src: str, dst: str):
imap = await _connect(username, password)
try:
await imap.select(f'"{src}"')
await imap.copy(uid, f'"{dst}"')
await imap.store(uid, '+FLAGS', '(\\Deleted)')
await imap.expunge()
finally:
try: await imap.logout()
except Exception: pass
def _do():
M = _sync_connect(username, password)
try:
M.select(f'"{src}"')
M.copy(uid, f'"{dst}"')
M.store(uid, '+FLAGS', '(\\Deleted)')
M.expunge()
finally:
try: M.logout()
except Exception: pass
await asyncio.get_event_loop().run_in_executor(None, _do)
async def delete_message(username: str, password: str, uid: str, folder: str):
trash = "Trash"
if folder == trash:
# 영구 삭제
imap = await _connect(username, password)
try:
await imap.select(f'"{folder}"')
await imap.store(uid, '+FLAGS', '(\\Deleted)')
await imap.expunge()
finally:
try: await imap.logout()
except Exception: pass
def _do():
M = _sync_connect(username, password)
try:
M.select(f'"{folder}"')
M.store(uid, '+FLAGS', '(\\Deleted)')
M.expunge()
finally:
try: M.logout()
except Exception: pass
await asyncio.get_event_loop().run_in_executor(None, _do)
else:
await move_message(username, password, uid, folder, trash)
async def get_attachment(username: str, password: str, uid: str, part_id: str, folder: str) -> tuple[bytes, str, str]:
imap = await _connect(username, password)
try:
await imap.select(f'"{folder}"')
res, data = await imap.fetch(uid, "(RFC822)")
raw = b""
for item in data:
if isinstance(item, bytes) and len(item) > 200:
raw = item; break
msg = email.message_from_bytes(raw)
parsed = parse_message(msg)
att_data = parsed["_attachments_data"].get(part_id)
# 첨부파일 정보
for att in parsed["attachments"]:
if att["part_id"] == part_id:
return att_data or b"", att["content_type"], att["filename"]
raise ValueError("첨부파일을 찾을 수 없습니다")
finally:
try: await imap.logout()
except Exception: pass
async def append_to_sent(username: str, password: str, raw_message: bytes):
"""발송된 메일을 Sent 폴더에 IMAP APPEND"""
def _do():
M = _sync_connect(username, password)
try:
M.append("Sent", '(\\Seen)', None, raw_message)
except Exception:
pass # Sent 저장 실패는 무시 (발송은 이미 성공)
finally:
try: M.logout()
except Exception: pass
await asyncio.get_event_loop().run_in_executor(None, _do)
async def get_attachment(username: str, password: str, uid: str, part_id: str, folder: str) -> tuple:
def _do():
M = _sync_connect(username, password)
try:
M.select(f'"{folder}"')
res, data = M.fetch(uid, '(RFC822)')
raw = b""
for item in data:
if isinstance(item, tuple): raw = item[1]; break
msg = email.message_from_bytes(raw)
parsed = parse_message(msg)
att_data = parsed["_attachments_data"].get(part_id)
for att in parsed["attachments"]:
if att["part_id"] == part_id:
return att_data or b"", att["content_type"], att["filename"]
raise ValueError("첨부파일을 찾을 수 없습니다")
finally:
try: M.logout()
except Exception: pass
return await asyncio.get_event_loop().run_in_executor(None, _do)

View File

@ -4,6 +4,14 @@ import chardet, re
from typing import Optional
def _safe(s: str) -> str:
"""surrogate 문자 제거 → JSON 직렬화 안전"""
try:
return s.encode('utf-8', errors='replace').decode('utf-8', errors='replace')
except Exception:
return ""
def decode_str(raw: Optional[str]) -> str:
if not raw:
return ""
@ -16,9 +24,9 @@ def decode_str(raw: Optional[str]) -> str:
result.append(part.decode(cs, errors='replace'))
else:
result.append(str(part))
return "".join(result).strip()
return _safe("".join(result).strip())
except Exception:
return raw or ""
return _safe(raw) if raw else ""
def extract_addr(raw: str) -> tuple[str, str]:
@ -35,7 +43,7 @@ def decode_payload(part) -> str:
if not charset:
detected = chardet.detect(raw)
charset = detected.get('encoding') or 'utf-8'
return raw.decode(charset, errors='replace')
return _safe(raw.decode(charset, errors='replace'))
def sanitize_html(html: str) -> str:

View File

@ -6,7 +6,7 @@ import io
from .auth import verify_imap, create_token, current_user
from .imap_client import (
list_folders, list_messages, get_message,
mark_read, move_message, delete_message, get_attachment,
mark_read, move_message, delete_message, get_attachment, append_to_sent,
)
from .smtp_client import send_mail
from .models import (
@ -138,11 +138,13 @@ async def attachment(
@app.post("/api/mail/send")
async def send(req: SendRequest, user=Depends(current_user)):
username, password = user
await send_mail(
raw_bytes = await send_mail(
username, password,
req.to, req.subject, req.body,
req.cc, req.bcc, req.is_html,
)
# 발송 성공 후 Sent 폴더에 저장
await append_to_sent(username, password, raw_bytes)
return {"ok": True}

View File

@ -1,9 +1,10 @@
"""SMTP 발송: aiosmtplib + STARTTLS"""
import aiosmtplib, ssl
"""SMTP 발송: smtplib STARTTLS + Sent 폴더 자동 저장"""
import smtplib, ssl, asyncio
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
from email.utils import formatdate
from typing import Optional
SMTP_HOST = "localhost"
@ -16,17 +17,16 @@ async def send_mail(
cc: Optional[str] = None, bcc: Optional[str] = None,
is_html: bool = False,
attachments: Optional[list] = None,
) -> bool:
) -> bytes:
"""발송 성공 시 raw 메시지 바이트 반환 (Sent 폴더 저장용)"""
msg = MIMEMultipart("alternative" if is_html else "mixed")
msg["From"] = username
msg["To"] = to
msg["Subject"] = subject
msg["Date"] = formatdate(localtime=True)
if cc: msg["Cc"] = cc
if is_html:
msg.attach(MIMEText(body, "html", "utf-8"))
else:
msg.attach(MIMEText(body, "plain", "utf-8"))
msg.attach(MIMEText(body, "html" if is_html else "plain", "utf-8"))
if attachments:
for name, data, ctype in attachments:
@ -40,14 +40,20 @@ async def send_mail(
if cc: recipients += [r.strip() for r in cc.split(",")]
if bcc: recipients += [r.strip() for r in bcc.split(",")]
ssl_ctx = ssl.create_default_context()
ssl_ctx.check_hostname = False
ssl_ctx.verify_mode = ssl.CERT_NONE
raw_bytes = msg.as_bytes()
user_short = username.split("@")[0]
smtp = aiosmtplib.SMTP(hostname=SMTP_HOST, port=SMTP_PORT,
start_tls=True, tls_context=ssl_ctx)
await smtp.connect()
await smtp.login(username.split("@")[0], password) # Postfix SASL: username only
await smtp.sendmail(username, recipients, msg.as_string())
await smtp.quit()
return True
def _do():
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
s = smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=15)
s.ehlo()
s.starttls(context=ctx)
s.ehlo()
s.login(user_short, password)
s.sendmail(username, recipients, raw_bytes)
s.quit()
await asyncio.get_event_loop().run_in_executor(None, _do)
return raw_bytes

View File

@ -24,6 +24,9 @@ export default function MailList({ onRefresh }: Props) {
selectedMail, setSelectedMail, setLoading, setMessages,
markRead, removeMessage } = useMailStore()
// Sent 폴더에서는 받는사람 표시
const isSent = currentFolder === 'Sent'
const select = async (uid: string) => {
setLoading(true)
try {
@ -76,7 +79,9 @@ export default function MailList({ onRefresh }: Props) {
onClick={() => select(m.uid)}
>
<div className="mail-item-top">
<span className="mail-sender">{m.sender || m.sender_addr}</span>
<span className="mail-sender">
{isSent ? `${(m as any).to || m.sender_addr}` : (m.sender || m.sender_addr)}
</span>
<span className="mail-date">{fmtDate(m.date)}</span>
</div>
<div className="mail-item-subject">{m.subject}</div>