145 lines
6.1 KiB
Python
145 lines
6.1 KiB
Python
"""
|
|
zioinfo-mail FastAPI 백엔드 템플릿
|
|
mail-backend-dev 에이전트가 이 파일을 기반으로 확장 구현한다.
|
|
"""
|
|
from fastapi import FastAPI, Depends, HTTPException, status
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.responses import StreamingResponse
|
|
from pydantic import BaseModel
|
|
from typing import Optional
|
|
import asyncio, aioimaplib, aiosmtplib
|
|
from email.mime.multipart import MIMEMultipart
|
|
from email.mime.text import MIMEText
|
|
from email.mime.base import MIMEBase
|
|
from email import encoders
|
|
import email.header, chardet, ssl
|
|
from jose import jwt
|
|
from cryptography.fernet import Fernet
|
|
import os, json, base64, hashlib
|
|
from datetime import datetime, timedelta
|
|
|
|
# Application instance, with CORS opened up for the listed web-client origins.
app = FastAPI(title="zioinfo-mail API", version="1.0.0")
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "https://mail.zioinfo.co.kr",
        "http://localhost:5173",
    ],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
# ── 설정 ────────────────────────────────────────────────────
|
|
# Mail server endpoints.
IMAP_HOST = "localhost"
IMAP_PORT = 993
SMTP_HOST = "localhost"
SMTP_PORT = 587

# JWT signing secret and token lifetime.
# NOTE(review): the fallback secret is a hard-coded placeholder; make sure
# MAIL_JWT_SECRET is set in any real deployment.
JWT_SECRET = os.environ.get("MAIL_JWT_SECRET", "change-me-in-production")
JWT_EXPIRE_HOURS = 8

# Encryption key for IMAP credentials: MAIL_FERNET_KEY when set, otherwise the
# urlsafe-base64 encoding of the 32-byte SHA-256 digest of JWT_SECRET.
FERNET_KEY = os.environ.get(
    "MAIL_FERNET_KEY",
    base64.urlsafe_b64encode(hashlib.sha256(JWT_SECRET.encode()).digest()),
)
fernet = Fernet(FERNET_KEY)
|
|
|
|
# ── 모델 ────────────────────────────────────────────────────
|
|
# Request body accepted by POST /auth/login.
class LoginRequest(BaseModel):
    # Mailbox login name, e.g. user@zioinfo.co.kr
    username: str
    # Plain-text password; login() checks it with an IMAP login attempt.
    password: str
|
|
|
|
# Request body describing an outgoing message.
# NOTE(review): the handler that consumes this model is not visible here;
# the field notes below are inferred from the names and should be confirmed.
class SendRequest(BaseModel):
    # Recipient fields; cc and bcc are optional.
    to: str
    cc: Optional[str] = None
    bcc: Optional[str] = None
    # Message content; `html` presumably marks `body` as HTML - TODO confirm.
    subject: str
    body: str
    html: bool = False
    # Presumably the UID of the message being replied to - TODO confirm.
    reply_to_uid: Optional[str] = None
|
|
|
|
# ── 인증 ────────────────────────────────────────────────────
|
|
async def verify_imap(username: str, password: str) -> bool:
    """Validate credentials by attempting an IMAP login.

    Connects to IMAP_HOST:IMAP_PORT over TLS, logs in, logs out again and
    reports whether the login result was "OK".

    Args:
        username: Account name passed to the IMAP login call.
        password: Password passed to the IMAP login call.

    Returns:
        True when the login result equals "OK"; False otherwise, including
        when any exception is raised along the way.
    """
    try:
        # Certificate and hostname verification are switched off for this
        # connection.
        tls_context = ssl.create_default_context()
        tls_context.check_hostname = False
        tls_context.verify_mode = ssl.CERT_NONE

        client = aioimaplib.IMAP4_SSL(
            IMAP_HOST, IMAP_PORT, ssl_context=tls_context
        )
        await client.wait_hello_from_server()
        login_result, _ = await client.login(username, password)
        await client.logout()
        return login_result == "OK"
    except Exception:
        # Best-effort check: any failure counts as "not verified".
        return False
|
|
|
|
def create_token(username: str, password: str) -> str:
    """Issue a signed JWT carrying the user name and the encrypted password.

    Args:
        username: Stored in the ``sub`` claim.
        password: Encrypted with the module-level ``fernet`` instance and
            stored (as text) in the ``pw`` claim.

    Returns:
        The HS256-signed token produced by ``jwt.encode``; it expires
        JWT_EXPIRE_HOURS hours after creation.
    """
    # Local import: the file header only brings in datetime and timedelta.
    from datetime import timezone

    # Use a timezone-aware "now": datetime.utcnow() is deprecated and returns
    # a naive value that is easy to misread as local time.
    expires_at = datetime.now(timezone.utc) + timedelta(hours=JWT_EXPIRE_HOURS)
    claims = {
        "sub": username,
        "pw": fernet.encrypt(password.encode()).decode(),
        "exp": expires_at,
    }
    return jwt.encode(claims, JWT_SECRET, algorithm="HS256")
|
|
|
|
from fastapi import Header


def _bearer_token(authorization: Optional[str] = Header(None)) -> Optional[str]:
    """Return the token from an ``Authorization: Bearer <token>`` header.

    Returns None when the header is missing, uses another scheme, or carries
    no token.
    """
    if not authorization:
        return None
    scheme, _, value = authorization.partition(" ")
    if scheme.lower() != "bearer":
        return None
    return value.strip() or None


def get_credentials(token: str = Depends(_bearer_token)) -> tuple[str, str]:
    """Resolve the caller's mailbox credentials from the bearer JWT.

    Intended for use as a FastAPI dependency. The token is read from the
    ``Authorization`` header, verified with JWT_SECRET, and the ``pw`` claim
    is decrypted with the module-level ``fernet`` instance.

    Args:
        token: The encoded JWT; injected from the request header by FastAPI,
            or passed explicitly when called directly.

    Returns:
        A ``(username, password)`` tuple.

    Raises:
        HTTPException: 401 when the token is missing, invalid, expired, or
            its claims cannot be read or decrypted.
    """
    if not token:
        raise HTTPException(status.HTTP_401_UNAUTHORIZED)
    try:
        claims = jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
        username = claims["sub"]
        password = fernet.decrypt(claims["pw"].encode()).decode()
    except Exception:
        # Any decoding/decryption failure is an authentication failure; the
        # underlying reason is deliberately not exposed to the client.
        raise HTTPException(status.HTTP_401_UNAUTHORIZED) from None
    return username, password
|
|
|
|
# ── 엔드포인트 ───────────────────────────────────────────────
|
|
@app.get("/health")
async def health():
    # Health-check endpoint: always answers with the same fixed payload.
    payload = {"status": "ok", "service": "zioinfo-mail"}
    return payload
|
|
|
|
@app.post("/auth/login")
async def login(req: LoginRequest):
    # Check the credentials with an IMAP login; on success return a bearer
    # token, otherwise answer 401.
    authenticated = await verify_imap(req.username, req.password)
    if not authenticated:
        raise HTTPException(status.HTTP_401_UNAUTHORIZED, "인증 실패")
    return {
        "access_token": create_token(req.username, req.password),
        "token_type": "bearer",
        "username": req.username,
    }
|
|
|
|
# ── 메일 파싱 유틸 ───────────────────────────────────────────
|
|
def decode_header_str(raw: str) -> str:
    """Decode an RFC 2047 encoded header value (Korean text included).

    Args:
        raw: The raw header value; ``None`` or an empty string yields ``""``.

    Returns:
        The decoded text. Undecodable bytes are replaced rather than raising.
    """
    pieces = []
    for part, charset in email.header.decode_header(raw or ""):
        if not isinstance(part, bytes):
            # Already text (header contained no encoded words).
            pieces.append(part)
            continue
        # No declared charset: let chardet guess, defaulting to UTF-8.
        charset = charset or chardet.detect(part).get('encoding') or 'utf-8'
        try:
            pieces.append(part.decode(charset, errors='replace'))
        except LookupError:
            # The header names a charset Python does not know (e.g.
            # "unknown-8bit"); fall back to UTF-8 instead of failing.
            pieces.append(part.decode('utf-8', errors='replace'))
    return "".join(pieces)
|
|
|
|
def parse_message(msg) -> dict:
    """Convert an ``email.message.Message`` into a plain dict.

    For multipart messages every part is visited with ``msg.walk()``:
    the first non-attachment ``text/plain`` part becomes ``body_text``, the
    first non-attachment ``text/html`` part becomes ``body_html``, and parts
    that are attachments or carry a filename are listed in ``attachments``.
    Non-multipart messages put their single payload in ``body_html`` when the
    content type is ``text/html`` and in ``body_text`` otherwise.

    Args:
        msg: The parsed message.

    Returns:
        A dict with the keys ``subject``, ``from``, ``to``, ``cc``, ``date``,
        ``body_text``, ``body_html`` and ``attachments`` (a list of dicts
        with ``filename``, ``content_type`` and ``size``).
    """
    body_text = body_html = ""
    attachments = []
    if msg.is_multipart():
        for part in msg.walk():
            ct = part.get_content_type()
            # get_content_disposition() returns the lower-cased disposition
            # (or None), so "Attachment" / "ATTACHMENT" are recognised too.
            is_attachment = part.get_content_disposition() == 'attachment'
            if ct == 'text/plain' and not is_attachment:
                # Keep the first body found: walk() also descends into
                # attached messages and trailing footer parts, which must not
                # overwrite the main body.
                if not body_text:
                    body_text = _decode_payload(part)
            elif ct == 'text/html' and not is_attachment:
                if not body_html:
                    body_html = _decode_payload(part)
            elif is_attachment or part.get_filename():
                attachments.append({
                    "filename": decode_header_str(part.get_filename() or "unnamed"),
                    "content_type": ct,
                    # Container parts have no decodable payload -> size 0.
                    "size": len(part.get_payload(decode=True) or b""),
                })
    elif msg.get_content_type() == 'text/html':
        body_html = _decode_payload(msg)
    else:
        body_text = _decode_payload(msg)
    return {
        "subject": decode_header_str(msg.get("Subject", "")),
        "from": decode_header_str(msg.get("From", "")),
        "to": decode_header_str(msg.get("To", "")),
        "cc": decode_header_str(msg.get("Cc", "")),
        "date": msg.get("Date", ""),
        "body_text": body_text,
        "body_html": body_html,
        "attachments": attachments,
    }
|
|
|
|
def _decode_payload(part) -> str:
    """Return the transfer-decoded payload of a message part as text.

    The charset declared on the part is used when present; otherwise chardet
    guesses one, with UTF-8 as the last resort. Undecodable bytes are
    replaced rather than raising.

    Args:
        part: A message (part) object providing ``get_payload`` and
            ``get_content_charset``.

    Returns:
        The decoded text; ``""`` when the part has no payload.
    """
    raw = part.get_payload(decode=True) or b""
    charset = (
        part.get_content_charset()
        or chardet.detect(raw).get('encoding')
        or 'utf-8'
    )
    try:
        return raw.decode(charset, errors='replace')
    except LookupError:
        # The part declares a charset Python does not know; fall back to
        # UTF-8 instead of failing the whole message parse.
        return raw.decode('utf-8', errors='replace')
|