"""
Open-network external API security — API key issuance, verification, and audit.

- API Key: stored as a SHA-256 hash; the plaintext is exposed only once, at issuance
- Permission scopes: read | write | admin | webhook
- IP whitelist (optional)
- Per-request audit logging
"""
import hashlib
import secrets
import time
from datetime import datetime, timezone
from typing import Optional

from fastapi import Depends, Header, HTTPException, Request, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from database import get_db
from models import APIKey, AuditLog


# ── API key generation ───────────────────────────────────────────────────────
def generate_api_key() -> tuple[str, str]:
    """Create a new API key.

    Returns:
        A ``(plain, hashed)`` tuple. ``plain`` is ``"grd_"`` followed by a
        URL-safe random token; ``hashed`` is its SHA-256 hex digest. The
        plaintext is meant to be shown only once, at issuance.
    """
    plain = "grd_" + secrets.token_urlsafe(32)
    # Reuse hash_key so issuance and verification can never drift apart.
    return plain, hash_key(plain)


def hash_key(plain: str) -> str:
    """Return the SHA-256 hex digest of the given plaintext API key."""
    return hashlib.sha256(plain.encode()).hexdigest()


# ── Internal helpers ─────────────────────────────────────────────────────────
def _utcnow_naive() -> datetime:
    """Return the current UTC time as a naive ``datetime`` (tzinfo stripped)."""
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _is_expired(expires_at: Optional[datetime]) -> bool:
    """Return True when ``expires_at`` is set and lies in the past (UTC)."""
    if not expires_at:
        return False
    if expires_at.tzinfo is not None:
        # Normalize aware values so the comparison below cannot raise
        # TypeError (naive vs. aware datetimes are not comparable).
        expires_at = expires_at.astimezone(timezone.utc).replace(tzinfo=None)
    return expires_at < _utcnow_naive()


def _unauthorized(detail: str) -> HTTPException:
    """Build a 401 response carrying the ``WWW-Authenticate`` challenge."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers={"WWW-Authenticate": "ApiKey"},
    )


# ── API key verification dependency ──────────────────────────────────────────
async def verify_api_key(
    request: Request,
    x_api_key: Optional[str] = Header(None, alias="X-API-Key"),
    db: AsyncSession = Depends(get_db),
) -> "APIKey":
    """Authenticate an external API call by its API key.

    Header: ``X-API-Key: grd_xxxxx``

    Args:
        request: Incoming request; used to read the client address when the
            key has an IP whitelist.
        x_api_key: Plaintext key taken from the ``X-API-Key`` header.
        db: Async database session.

    Returns:
        The matching active ``APIKey`` row, after its ``use_count`` and
        ``last_used_at`` have been updated and committed.

    Raises:
        HTTPException: 401 when the header is missing, the key is unknown or
            inactive, or the key has expired; 403 when the key has an IP
            whitelist and the client address is not in it (or is unknown).
    """
    if not x_api_key:
        raise _unauthorized("X-API-Key 헤더가 필요합니다.")

    result = await db.execute(
        select(APIKey).where(
            APIKey.key_hash == hash_key(x_api_key),
            # SQL expression, not a Python comparison — `== True` is intended.
            APIKey.is_active == True,  # noqa: E712
        )
    )
    api_key = result.scalar_one_or_none()
    if not api_key:
        raise _unauthorized("유효하지 않은 API Key입니다.")

    # Expiry check.
    if _is_expired(api_key.expires_at):
        raise _unauthorized("만료된 API Key입니다.")

    # IP whitelist check (only when the key defines one).
    if api_key.allowed_ips:
        # request.client may be None (e.g. some test clients / transports);
        # fail closed instead of crashing with AttributeError.
        client_ip = request.client.host if request.client else None
        allowed = {ip.strip() for ip in api_key.allowed_ips.split(",")}
        if client_ip not in allowed:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="허용되지 않은 IP입니다.",
            )

    # Usage bookkeeping.
    # NOTE(review): this read-modify-write can lose increments under
    # concurrent requests for the same key — confirm whether that matters.
    api_key.use_count = (api_key.use_count or 0) + 1
    api_key.last_used_at = _utcnow_naive()
    await db.commit()
    return api_key


def require_scope(scope: str):
    """Build a dependency that requires a given scope.

    Args:
        scope: Required scope name (read / write / admin / webhook).

    Returns:
        An async dependency that resolves to the authenticated ``APIKey`` and
        raises a 403 ``HTTPException`` unless the key's comma-separated
        ``scopes`` contain ``scope`` or ``"admin"``.
    """

    async def _check(api_key: "APIKey" = Depends(verify_api_key)):
        # Strip each entry so values stored as "read, write" still match,
        # mirroring how allowed_ips is parsed.
        granted = {item.strip() for item in (api_key.scopes or "").split(",")}
        if scope not in granted and "admin" not in granted:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"'{scope}' 권한이 필요합니다.",
            )
        return api_key

    return _check