""" GUARDiA ITSM — MFA (TOTP) 유틸리티 (Enhancement D-2) 기능: 1. TOTP 시크릿 생성 (RFC 6238, pyotp) 2. Provisioning URI 생성 (QR 코드용) 3. QR 코드 PNG → base64 생성 (qrcode[pil] 필요) 4. TOTP 코드 검증 (±1 윈도우 허용, 30초 유효) 5. AES-256-GCM으로 TOTP 시크릿 암호화/복호화 6. MFA 대기 JWT 토큰 발급/검증 (1단계 로그인 성공 후 2단계 전용) MFA 로그인 흐름: 1. POST /api/auth/login → mfa_enabled=True → { mfa_required: true, mfa_token: "..." } 2. POST /api/auth/login/mfa { mfa_token, totp_code } → { access_token, ... } MFA 등록 흐름: 1. POST /api/auth/mfa/setup → { secret, qr_uri, qr_base64 } (임시 저장, 미활성) 2. POST /api/auth/mfa/enable { totp_code } → MFA 활성화 """ from __future__ import annotations import base64 import hashlib import logging import os import struct from datetime import datetime, timedelta, timezone from typing import Optional, Tuple logger = logging.getLogger(__name__) # ── AES-GCM 암호화 키 (환경변수에서 파생) ──────────────────────────────────── _SECRET = os.environ.get("GUARDIA_SECRET_KEY", os.environ.get("GUARDIA_JWT_SECRET", "guardia-mfa-secret-change-me!")) def _derive_key() -> bytes: """SECRET_KEY에서 32바이트 AES 키 파생 (SHA-256).""" return hashlib.sha256(_SECRET.encode()).digest() def _encrypt_secret(plaintext: str) -> str: """AES-256-GCM으로 TOTP 시크릿 암호화 → base64 문자열 반환.""" from cryptography.hazmat.primitives.ciphers.aead import AESGCM import secrets as _secrets key = _derive_key() nonce = _secrets.token_bytes(12) # 96-bit nonce aesgcm = AESGCM(key) ct = aesgcm.encrypt(nonce, plaintext.encode(), None) # nonce(12) + ciphertext+tag 를 base64으로 인코딩 return base64.b64encode(nonce + ct).decode() def _decrypt_secret(encrypted: str) -> str: """AES-256-GCM 복호화 → 평문 TOTP 시크릿 반환.""" from cryptography.hazmat.primitives.ciphers.aead import AESGCM key = _derive_key() data = base64.b64decode(encrypted.encode()) nonce = data[:12] ct = data[12:] aesgcm = AESGCM(key) return aesgcm.decrypt(nonce, ct, None).decode() # ── TOTP 핵심 함수 ──────────────────────────────────────────────────────────── def generate_totp_secret() -> str: """새 TOTP 시크릿 생성 (Base32, 32자 = 160bit).""" try: import pyotp return pyotp.random_base32() except ImportError: # pyotp 미설치 시 표준 라이브러리로 대체 (Base32 인코딩) import secrets as _secrets raw = _secrets.token_bytes(20) # 160bit return base64.b32encode(raw).decode().rstrip("=") def get_totp_uri(username: str, secret: str, issuer: str = "GUARDiA ITSM") -> str: """Google Authenticator 호환 Provisioning URI 생성.""" try: import pyotp totp = pyotp.TOTP(secret) return totp.provisioning_uri(name=username, issuer_name=issuer) except ImportError: # RFC 6238 URI 직접 구성 from urllib.parse import quote label = quote(f"{issuer}:{username}") return ( f"otpauth://totp/{label}" f"?secret={secret}" f"&issuer={quote(issuer)}" f"&algorithm=SHA1" f"&digits=6" f"&period=30" ) def verify_totp(secret: str, code: str, valid_window: int = 1) -> bool: """ TOTP 코드 검증. Args: secret: Base32 TOTP 시크릿 code: 사용자 입력 6자리 코드 valid_window: 허용 윈도우 수 (기본 ±1 = 90초 허용) Returns: True: 유효한 코드 """ if not code or len(code) != 6 or not code.isdigit(): return False try: import pyotp totp = pyotp.TOTP(secret) return totp.verify(code, valid_window=valid_window) except ImportError: # pyotp 미설치 시 직접 TOTP 검증 return _verify_totp_fallback(secret, code, valid_window) except Exception: return False def _verify_totp_fallback(secret: str, code: str, valid_window: int) -> bool: """pyotp 없이 TOTP 검증 (표준 라이브러리만 사용).""" import hmac as _hmac import time try: key = base64.b32decode(secret.upper()) now = int(time.time()) for offset in range(-valid_window, valid_window + 1): counter = (now + offset * 30) // 30 counter_bytes = struct.pack(">Q", counter) mac = _hmac.new(key, counter_bytes, hashlib.sha1).digest() offset_val = mac[-1] & 0x0F trunc = struct.unpack(">I", mac[offset_val:offset_val + 4])[0] & 0x7FFFFFFF totp_val = trunc % 1_000_000 if f"{totp_val:06d}" == code: return True return False except Exception: return False def generate_qr_base64(uri: str) -> Optional[str]: """ Provisioning URI → QR 코드 PNG → base64 인코딩. qrcode[pil] 미설치 시 None 반환 (URI 직접 표시 방식으로 폴백). """ try: import qrcode from io import BytesIO from PIL import Image qr = qrcode.QRCode( version=1, error_correction=qrcode.constants.ERROR_CORRECT_L, box_size=10, border=4, ) qr.add_data(uri) qr.make(fit=True) img = qr.make_image(fill_color="black", back_color="white") buf = BytesIO() img.save(buf, format="PNG") return base64.b64encode(buf.getvalue()).decode() except ImportError: logger.info("qrcode 또는 Pillow 미설치 — QR 이미지 생성 스킵") return None except Exception as exc: logger.warning("QR 코드 생성 실패: %s", exc) return None # ── MFA 대기 JWT 토큰 (1단계 인증 후 2단계 전용) ───────────────────────────── _MFA_PENDING_EXPIRE_MINUTES = 5 # 5분 유효 def create_mfa_pending_token(username: str) -> str: """ MFA 1단계(비밀번호) 인증 성공 후 발급하는 임시 토큰. 2단계(TOTP) 인증 엔드포인트에서만 사용 가능. """ from core.auth import SECRET_KEY, ALGORITHM from jose import jwt payload = { "sub": username, "mfa_pending": True, "exp": datetime.now(timezone.utc) + timedelta(minutes=_MFA_PENDING_EXPIRE_MINUTES), } return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM) def verify_mfa_pending_token(token: str) -> Optional[str]: """ MFA 대기 토큰 검증. Returns: username 문자열 (유효한 경우), None (무효) """ from core.auth import SECRET_KEY, ALGORITHM from jose import JWTError, jwt try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) if not payload.get("mfa_pending"): return None return payload.get("sub") except JWTError: return None # ── 사용자 TOTP 시크릿 저장/조회 헬퍼 ─────────────────────────────────────── def encrypt_totp_secret(secret: str) -> str: """DB 저장 전 TOTP 시크릿 암호화.""" return _encrypt_secret(secret) def decrypt_totp_secret(encrypted: str) -> str: """DB에서 조회한 암호화 시크릿 복호화.""" return _decrypt_secret(encrypted)