"""JWT + PBKDF2-SHA256 인증 유틸리티 — GUARDiA ITSM. Python 3.14 완전 호환 — 외부 bcrypt 의존성 없음. 패스워드는 PBKDF2-HMAC-SHA256 + 32바이트 salt, 반복 260000회. """ import base64 import hashlib import hmac import os import secrets from datetime import datetime, timedelta, timezone from typing import Optional # 계정 잠금 정책 MAX_FAILED_ATTEMPTS = 5 # 연속 실패 허용 횟수 LOCKOUT_MINUTES = 30 # 잠금 지속 시간(분) from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer from jose import JWTError, jwt from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from database import get_db # ── 설정 ───────────────────────────────────────────────────────────────────── SECRET_KEY = os.environ.get("GUARDIA_JWT_SECRET", "guardia-jwt-secret-2026-change-me!") ALGORITHM = "HS256" TOKEN_EXPIRE_MINUTES = 480 # 8시간 _ITER = 260_000 # PBKDF2 반복 횟수 (NIST 권장) _SALT_LEN = 32 # 바이트 _KEY_LEN = 32 # 바이트 oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login", auto_error=False) # ── 패스워드 ────────────────────────────────────────────────────────────────── def hash_password(plain: str) -> str: """PBKDF2-HMAC-SHA256으로 패스워드를 해시합니다.""" salt = secrets.token_bytes(_SALT_LEN) key = hashlib.pbkdf2_hmac("sha256", plain.encode(), salt, _ITER, _KEY_LEN) return base64.b64encode(salt + key).decode() def verify_password(plain: str, hashed: str) -> bool: """저장된 해시와 평문 패스워드를 안전하게 비교합니다.""" try: data = base64.b64decode(hashed.encode()) salt = data[:_SALT_LEN] stored_key = data[_SALT_LEN:] key = hashlib.pbkdf2_hmac("sha256", plain.encode(), salt, _ITER, _KEY_LEN) return hmac.compare_digest(key, stored_key) except Exception: return False # ── JWT ─────────────────────────────────────────────────────────────────────── def create_access_token(data: dict) -> str: payload = data.copy() payload["exp"] = datetime.now(timezone.utc) + timedelta(minutes=TOKEN_EXPIRE_MINUTES) return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM) # ── 사용자 조회 Dependencies ────────────────────────────────────────────────── async def get_optional_user( token: Optional[str] = Depends(oauth2_scheme), db: AsyncSession = Depends(get_db), ): """토큰이 없거나 유효하지 않으면 None 반환 (예외 없음).""" if not token: return None try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username = payload.get("sub") if not username: return None except JWTError: return None from models import User result = await db.execute(select(User).where(User.username == username)) user = result.scalars().first() return user if (user and user.is_active) else None async def get_current_user( user=Depends(get_optional_user), ): """토큰 없거나 유효하지 않으면 401 반환.""" if user is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="인증이 필요합니다", headers={"WWW-Authenticate": "Bearer"}, ) return user # ── Priority 4: RBAC 세분화 ────────────────────────────────────────────────── def require_deploy_role(current_user=Depends(get_current_user)): """ 배포 권한 확인 — ADMIN, PM, DEPLOY_ENGINEER 만 허용. 배포 트리거 및 배치 즉시 실행 엔드포인트에서 사용한다. 권한 매핑: ADMIN : 전체 권한 PM : 배포 승인 및 트리거 DEPLOY_ENGINEER : 배포 트리거 + 배치 즉시 실행 ENGINEER : 조회만 가능 CUSTOMER : 전체 차단 """ from models import UserRole DEPLOY_ROLES = {UserRole.ADMIN, UserRole.PM, UserRole.DEPLOY_ENGINEER} if current_user.role not in DEPLOY_ROLES: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="배포 권한이 없습니다 (ADMIN, PM, DEPLOY_ENGINEER 필요)", ) return current_user def require_admin_role(current_user=Depends(get_current_user)): """ADMIN 전용 엔드포인트 권한 확인.""" from models import UserRole if current_user.role != UserRole.ADMIN: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="ADMIN 권한이 필요합니다", ) return current_user