feat: add backend/auth.py
This commit is contained in:
parent
14af58d776
commit
5f22f7b6d1
71
backend/auth.py
Normal file
71
backend/auth.py
Normal file
@ -0,0 +1,71 @@
|
||||
"""JWT 인증: IMAP 자격증명 암호화 포함"""
|
||||
import os, ssl, base64, hashlib
|
||||
from datetime import datetime, timedelta
|
||||
from fastapi import HTTPException, status, Depends
|
||||
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
||||
from jose import jwt, JWTError
|
||||
from cryptography.fernet import Fernet
|
||||
import aioimaplib
|
||||
|
||||
SECRET = os.getenv("MAIL_JWT_SECRET", "zioinfo-mail-jwt-2026-secret-key!!")
|
||||
_key_bytes = hashlib.sha256(SECRET.encode()).digest()
|
||||
FERNET_KEY = base64.urlsafe_b64encode(_key_bytes)
|
||||
_fernet = Fernet(FERNET_KEY)
|
||||
EXPIRE_H = 8
|
||||
|
||||
IMAP_HOST = "localhost"
|
||||
IMAP_PORT = 993
|
||||
|
||||
bearer = HTTPBearer(auto_error=False)
|
||||
|
||||
|
||||
def _ssl_ctx():
|
||||
ctx = ssl.create_default_context()
|
||||
ctx.check_hostname = False
|
||||
ctx.verify_mode = ssl.CERT_NONE
|
||||
return ctx
|
||||
|
||||
|
||||
def _imap_user(username: str) -> str:
|
||||
"""Dovecot PAM은 username만 사용 (@ 이후 제거)"""
|
||||
return username.split("@")[0]
|
||||
|
||||
|
||||
async def verify_imap(username: str, password: str) -> bool:
|
||||
try:
|
||||
imap = aioimaplib.IMAP4_SSL(host=IMAP_HOST, port=IMAP_PORT, ssl_context=_ssl_ctx())
|
||||
await imap.wait_hello_from_server()
|
||||
res, _ = await imap.login(_imap_user(username), password)
|
||||
try: await imap.logout()
|
||||
except Exception: pass
|
||||
return res == "OK"
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def create_token(username: str, password: str) -> str:
|
||||
enc = _fernet.encrypt(password.encode()).decode()
|
||||
payload = {
|
||||
"sub": username,
|
||||
"pw": enc,
|
||||
"exp": datetime.utcnow() + timedelta(hours=EXPIRE_H),
|
||||
}
|
||||
return jwt.encode(payload, SECRET, algorithm="HS256")
|
||||
|
||||
|
||||
def decode_token(token: str) -> tuple[str, str]:
|
||||
try:
|
||||
payload = jwt.decode(token, SECRET, algorithms=["HS256"])
|
||||
username: str = payload["sub"]
|
||||
password: str = _fernet.decrypt(payload["pw"].encode()).decode()
|
||||
return username, password
|
||||
except JWTError:
|
||||
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "토큰이 유효하지 않습니다")
|
||||
|
||||
|
||||
async def current_user(
|
||||
creds: HTTPAuthorizationCredentials = Depends(bearer),
|
||||
) -> tuple[str, str]:
|
||||
if not creds:
|
||||
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "로그인이 필요합니다")
|
||||
return decode_token(creds.credentials)
|
||||
Loading…
Reference in New Issue
Block a user