zioinfo-mail/.claude/skills/zioinfo-mail-orchestrator/references/backend-template.py
DESKTOP-TKLFCPR\ython 60be2f9375 feat(harness): zioinfo-mail webmail harness — backend/frontend/infra agents + orchestrator
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-01 21:19:51 +09:00

145 lines
6.1 KiB
Python

"""
zioinfo-mail FastAPI 백엔드 템플릿
mail-backend-dev 에이전트가 이 파일을 기반으로 확장 구현한다.
"""
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import Optional
import asyncio, aioimaplib, aiosmtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
import email.header, chardet, ssl
from jose import jwt
from cryptography.fernet import Fernet
import os, json, base64, hashlib
from datetime import datetime, timedelta
# ASGI application object. The CORS middleware allows credentialed requests
# from the two listed origins only, with any method and any header.
app = FastAPI(title="zioinfo-mail API", version="1.0.0")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://mail.zioinfo.co.kr", "http://localhost:5173"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# ── 설정 ────────────────────────────────────────────────────
# Mail server endpoints.
IMAP_HOST = "localhost"
IMAP_PORT = 993
SMTP_HOST = "localhost"
SMTP_PORT = 587

# JWT signing secret and token lifetime.
# NOTE(review): the fallback secret is a placeholder; set MAIL_JWT_SECRET in
# any real deployment.
JWT_SECRET = os.getenv("MAIL_JWT_SECRET", "change-me-in-production")
JWT_EXPIRE_HOURS = 8

# Key used to encrypt IMAP credentials. When MAIL_FERNET_KEY is not set, the
# key is derived from JWT_SECRET: its 32-byte SHA-256 digest, url-safe base64
# encoded, which is the key format Fernet expects.
FERNET_KEY = os.getenv(
    "MAIL_FERNET_KEY",
    base64.urlsafe_b64encode(hashlib.sha256(JWT_SECRET.encode()).digest()),
)
fernet = Fernet(FERNET_KEY)
# ── 모델 ────────────────────────────────────────────────────
class LoginRequest(BaseModel):
    """Request body carrying the credentials for a login attempt."""

    # Mailbox address, e.g. user@zioinfo.co.kr
    username: str
    password: str
class SendRequest(BaseModel):
    """Request body describing an outgoing message.

    Only ``to``, ``subject`` and ``body`` are required; every other field
    has a default.
    """

    to: str
    cc: Optional[str] = None
    bcc: Optional[str] = None
    subject: str
    body: str
    # presumably selects an HTML body instead of plain text -- no consumer of
    # this flag is visible in this file; confirm against the send handler
    html: bool = False
    # presumably the UID of the message being replied to -- TODO confirm
    reply_to_uid: Optional[str] = None
# ── 인증 ────────────────────────────────────────────────────
async def verify_imap(username: str, password: str) -> bool:
    """Validate credentials by attempting an IMAP login.

    Opens an IMAP-over-TLS connection to ``IMAP_HOST:IMAP_PORT``, issues
    LOGIN, then LOGOUT.

    Returns:
        True only when the LOGIN response status is ``"OK"``. Any exception
        raised along the way is swallowed and reported as False.
    """
    try:
        # NOTE(review): hostname checking and certificate verification are
        # both switched off here, so the server identity is not validated.
        tls_context = ssl.create_default_context()
        tls_context.check_hostname = False
        tls_context.verify_mode = ssl.CERT_NONE

        client = aioimaplib.IMAP4_SSL(IMAP_HOST, IMAP_PORT, ssl_context=tls_context)
        await client.wait_hello_from_server()
        login_status, _ = await client.login(username, password)
        # NOTE(review): if login() raises, this logout is skipped and the
        # connection is left to be cleaned up by the library/GC.
        await client.logout()
    except Exception:
        return False
    else:
        return login_status == "OK"
def create_token(username: str, password: str) -> str:
    """Build a signed JWT that carries the user's encrypted IMAP password.

    The password is encrypted with the module-level ``fernet`` instance and
    stored in the ``pw`` claim; ``sub`` holds the username. The token is
    signed with HS256 using ``JWT_SECRET`` and expires ``JWT_EXPIRE_HOURS``
    hours after creation.

    Args:
        username: Value for the ``sub`` claim.
        password: Plain-text password to encrypt into the ``pw`` claim.

    Returns:
        The encoded JWT string.
    """
    # Function-local import: the file header only imports datetime/timedelta.
    from datetime import timezone

    encrypted_password = fernet.encrypt(password.encode()).decode()
    # Timezone-aware "now": datetime.utcnow() is deprecated since Python 3.12
    # and returns a naive value that is easy to misinterpret as local time.
    expires_at = datetime.now(timezone.utc) + timedelta(hours=JWT_EXPIRE_HOURS)
    claims = {
        "sub": username,
        "pw": encrypted_password,
        "exp": expires_at,
    }
    return jwt.encode(claims, JWT_SECRET, algorithm="HS256")
from fastapi import Header  # needed at definition time by _bearer_token's default


def _bearer_token(authorization: Optional[str] = Header(None)) -> Optional[str]:
    """Extract the token from an ``Authorization: Bearer <token>`` header.

    Returns None when the header is missing, uses a different scheme, or
    carries an empty token.
    """
    if not authorization:
        return None
    scheme, _, value = authorization.partition(" ")
    value = value.strip()
    if scheme.lower() != "bearer" or not value:
        return None
    return value


def get_credentials(token: str = Depends(_bearer_token)) -> tuple[str, str]:
    """FastAPI dependency: recover ``(username, password)`` from the JWT.

    The token is read from the request's ``Authorization: Bearer`` header.
    Its signature is checked with ``JWT_SECRET`` (HS256), the username is
    taken from the ``sub`` claim, and the password is obtained by decrypting
    the ``pw`` claim with the module-level ``fernet`` instance.

    Args:
        token: Encoded JWT; injected by FastAPI, or passed directly when the
            function is called outside of dependency injection.

    Returns:
        A ``(username, password)`` tuple.

    Raises:
        HTTPException: 401 when the token is missing, cannot be decoded or
            verified, lacks a required claim, or the password cannot be
            decrypted.
    """
    if not token:
        raise HTTPException(status.HTTP_401_UNAUTHORIZED)
    try:
        payload = jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
        username = payload["sub"]
        password = fernet.decrypt(payload["pw"].encode()).decode()
    except Exception:
        # Any failure here means the token is unusable; do not leak details.
        raise HTTPException(status.HTTP_401_UNAUTHORIZED) from None
    return username, password
# ── 엔드포인트 ───────────────────────────────────────────────
@app.get("/health")
async def health():
    """Return a static payload identifying the service and reporting "ok"."""
    return dict(status="ok", service="zioinfo-mail")
@app.post("/auth/login")
async def login(req: LoginRequest):
    """Authenticate against IMAP and issue a bearer token.

    Raises:
        HTTPException: 401 when ``verify_imap`` rejects the credentials.
    """
    credentials_ok = await verify_imap(req.username, req.password)
    if not credentials_ok:
        raise HTTPException(status.HTTP_401_UNAUTHORIZED, "인증 실패")
    return {
        "access_token": create_token(req.username, req.password),
        "token_type": "bearer",
        "username": req.username,
    }
# ── 메일 파싱 유틸 ───────────────────────────────────────────
def decode_header_str(raw: str) -> str:
    """Decode an RFC 2047 encoded header value (including Korean text).

    Each chunk returned by ``email.header.decode_header`` is handled on its
    own: ``str`` chunks are kept as-is, ``bytes`` chunks are decoded with
    their declared charset, or a chardet guess when none is declared.

    Args:
        raw: Header value; ``None`` or an empty string yields ``""``.

    Returns:
        The decoded header text. Undecodable bytes become U+FFFD.
    """
    decoded = []
    for chunk, charset in email.header.decode_header(raw or ""):
        if not isinstance(chunk, bytes):
            decoded.append(chunk)
            continue
        charset = charset or chardet.detect(chunk).get('encoding', 'utf-8') or 'utf-8'
        try:
            decoded.append(chunk.decode(charset, errors='replace'))
        except LookupError:
            # The header names a charset Python has no codec for (e.g.
            # "unknown-8bit"); errors='replace' does not cover that case.
            decoded.append(chunk.decode('utf-8', errors='replace'))
    return "".join(decoded)
def parse_message(msg) -> dict:
    """Convert an ``email.message.Message`` into a plain dict.

    For multipart messages every part from ``msg.walk()`` is inspected:
    non-attachment ``text/plain`` / ``text/html`` parts become the text /
    HTML body, and parts that are attachments or carry a filename are listed
    under ``attachments`` (metadata only, no content). Non-multipart
    messages are treated as a single HTML or text body.

    Args:
        msg: The parsed message.

    Returns:
        A dict with the keys ``subject``, ``from``, ``to``, ``cc``, ``date``,
        ``body_text``, ``body_html`` and ``attachments`` (a list of dicts
        with ``filename``, ``content_type`` and ``size`` in bytes).
    """
    body_text = body_html = ""
    attachments = []
    if msg.is_multipart():
        for part in msg.walk():
            content_type = part.get_content_type()
            # get_content_disposition() yields only the lowercased
            # disposition token, so "Attachment" is recognised and a
            # filename that merely contains "attachment" no longer matches.
            is_attachment = part.get_content_disposition() == 'attachment'
            # NOTE(review): when several body parts of the same type exist,
            # the last one walked overwrites the earlier ones.
            if content_type == 'text/plain' and not is_attachment:
                body_text = _decode_payload(part)
            elif content_type == 'text/html' and not is_attachment:
                body_html = _decode_payload(part)
            elif is_attachment or part.get_filename():
                attachments.append({
                    "filename": decode_header_str(part.get_filename() or "unnamed"),
                    "content_type": content_type,
                    "size": len(part.get_payload(decode=True) or b""),
                })
    else:
        # Single-part message: anything that is not HTML is treated as text.
        if msg.get_content_type() == 'text/html':
            body_html = _decode_payload(msg)
        else:
            body_text = _decode_payload(msg)
    return {
        "subject": decode_header_str(msg.get("Subject", "")),
        "from": decode_header_str(msg.get("From", "")),
        "to": decode_header_str(msg.get("To", "")),
        "cc": decode_header_str(msg.get("Cc", "")),
        "date": msg.get("Date", ""),
        "body_text": body_text,
        "body_html": body_html,
        "attachments": attachments,
    }
def _decode_payload(part) -> str:
raw = part.get_payload(decode=True) or b""
charset = part.get_content_charset() or chardet.detect(raw).get('encoding', 'utf-8') or 'utf-8'
return raw.decode(charset, errors='replace')