226 lines
7.6 KiB
Python
226 lines
7.6 KiB
Python
"""
|
|
GUARDiA ITSM — MFA (TOTP) 유틸리티 (Enhancement D-2)
|
|
|
|
기능:
|
|
1. TOTP 시크릿 생성 (RFC 6238, pyotp)
|
|
2. Provisioning URI 생성 (QR 코드용)
|
|
3. QR 코드 PNG → base64 생성 (qrcode[pil] 필요)
|
|
4. TOTP 코드 검증 (±1 윈도우 허용, 30초 유효)
|
|
5. AES-256-GCM으로 TOTP 시크릿 암호화/복호화
|
|
6. MFA 대기 JWT 토큰 발급/검증 (1단계 로그인 성공 후 2단계 전용)
|
|
|
|
MFA 로그인 흐름:
|
|
1. POST /api/auth/login → mfa_enabled=True → { mfa_required: true, mfa_token: "..." }
|
|
2. POST /api/auth/login/mfa { mfa_token, totp_code } → { access_token, ... }
|
|
|
|
MFA 등록 흐름:
|
|
1. POST /api/auth/mfa/setup → { secret, qr_uri, qr_base64 } (임시 저장, 미활성)
|
|
2. POST /api/auth/mfa/enable { totp_code } → MFA 활성화
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import hashlib
|
|
import logging
|
|
import os
|
|
import struct
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Optional, Tuple
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# ── AES-GCM 암호화 키 (환경변수에서 파생) ────────────────────────────────────
|
|
_SECRET = os.environ.get("GUARDIA_SECRET_KEY", os.environ.get("GUARDIA_JWT_SECRET", "guardia-mfa-secret-change-me!"))
|
|
|
|
def _derive_key() -> bytes:
|
|
"""SECRET_KEY에서 32바이트 AES 키 파생 (SHA-256)."""
|
|
return hashlib.sha256(_SECRET.encode()).digest()
|
|
|
|
|
|
def _encrypt_secret(plaintext: str) -> str:
|
|
"""AES-256-GCM으로 TOTP 시크릿 암호화 → base64 문자열 반환."""
|
|
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
|
import secrets as _secrets
|
|
|
|
key = _derive_key()
|
|
nonce = _secrets.token_bytes(12) # 96-bit nonce
|
|
aesgcm = AESGCM(key)
|
|
ct = aesgcm.encrypt(nonce, plaintext.encode(), None)
|
|
# nonce(12) + ciphertext+tag 를 base64으로 인코딩
|
|
return base64.b64encode(nonce + ct).decode()
|
|
|
|
|
|
def _decrypt_secret(encrypted: str) -> str:
|
|
"""AES-256-GCM 복호화 → 평문 TOTP 시크릿 반환."""
|
|
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
|
|
|
key = _derive_key()
|
|
data = base64.b64decode(encrypted.encode())
|
|
nonce = data[:12]
|
|
ct = data[12:]
|
|
aesgcm = AESGCM(key)
|
|
return aesgcm.decrypt(nonce, ct, None).decode()
|
|
|
|
|
|
# ── TOTP 핵심 함수 ────────────────────────────────────────────────────────────
|
|
|
|
def generate_totp_secret() -> str:
|
|
"""새 TOTP 시크릿 생성 (Base32, 32자 = 160bit)."""
|
|
try:
|
|
import pyotp
|
|
return pyotp.random_base32()
|
|
except ImportError:
|
|
# pyotp 미설치 시 표준 라이브러리로 대체 (Base32 인코딩)
|
|
import secrets as _secrets
|
|
raw = _secrets.token_bytes(20) # 160bit
|
|
return base64.b32encode(raw).decode().rstrip("=")
|
|
|
|
|
|
def get_totp_uri(username: str, secret: str, issuer: str = "GUARDiA ITSM") -> str:
|
|
"""Google Authenticator 호환 Provisioning URI 생성."""
|
|
try:
|
|
import pyotp
|
|
totp = pyotp.TOTP(secret)
|
|
return totp.provisioning_uri(name=username, issuer_name=issuer)
|
|
except ImportError:
|
|
# RFC 6238 URI 직접 구성
|
|
from urllib.parse import quote
|
|
label = quote(f"{issuer}:{username}")
|
|
return (
|
|
f"otpauth://totp/{label}"
|
|
f"?secret={secret}"
|
|
f"&issuer={quote(issuer)}"
|
|
f"&algorithm=SHA1"
|
|
f"&digits=6"
|
|
f"&period=30"
|
|
)
|
|
|
|
|
|
def verify_totp(secret: str, code: str, valid_window: int = 1) -> bool:
|
|
"""
|
|
TOTP 코드 검증.
|
|
|
|
Args:
|
|
secret: Base32 TOTP 시크릿
|
|
code: 사용자 입력 6자리 코드
|
|
valid_window: 허용 윈도우 수 (기본 ±1 = 90초 허용)
|
|
|
|
Returns:
|
|
True: 유효한 코드
|
|
"""
|
|
if not code or len(code) != 6 or not code.isdigit():
|
|
return False
|
|
try:
|
|
import pyotp
|
|
totp = pyotp.TOTP(secret)
|
|
return totp.verify(code, valid_window=valid_window)
|
|
except ImportError:
|
|
# pyotp 미설치 시 직접 TOTP 검증
|
|
return _verify_totp_fallback(secret, code, valid_window)
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def _verify_totp_fallback(secret: str, code: str, valid_window: int) -> bool:
|
|
"""pyotp 없이 TOTP 검증 (표준 라이브러리만 사용)."""
|
|
import hmac as _hmac
|
|
import time
|
|
|
|
try:
|
|
key = base64.b32decode(secret.upper())
|
|
now = int(time.time())
|
|
for offset in range(-valid_window, valid_window + 1):
|
|
counter = (now + offset * 30) // 30
|
|
counter_bytes = struct.pack(">Q", counter)
|
|
mac = _hmac.new(key, counter_bytes, hashlib.sha1).digest()
|
|
offset_val = mac[-1] & 0x0F
|
|
trunc = struct.unpack(">I", mac[offset_val:offset_val + 4])[0] & 0x7FFFFFFF
|
|
totp_val = trunc % 1_000_000
|
|
if f"{totp_val:06d}" == code:
|
|
return True
|
|
return False
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def generate_qr_base64(uri: str) -> Optional[str]:
|
|
"""
|
|
Provisioning URI → QR 코드 PNG → base64 인코딩.
|
|
qrcode[pil] 미설치 시 None 반환 (URI 직접 표시 방식으로 폴백).
|
|
"""
|
|
try:
|
|
import qrcode
|
|
from io import BytesIO
|
|
from PIL import Image
|
|
|
|
qr = qrcode.QRCode(
|
|
version=1,
|
|
error_correction=qrcode.constants.ERROR_CORRECT_L,
|
|
box_size=10,
|
|
border=4,
|
|
)
|
|
qr.add_data(uri)
|
|
qr.make(fit=True)
|
|
img = qr.make_image(fill_color="black", back_color="white")
|
|
buf = BytesIO()
|
|
img.save(buf, format="PNG")
|
|
return base64.b64encode(buf.getvalue()).decode()
|
|
except ImportError:
|
|
logger.info("qrcode 또는 Pillow 미설치 — QR 이미지 생성 스킵")
|
|
return None
|
|
except Exception as exc:
|
|
logger.warning("QR 코드 생성 실패: %s", exc)
|
|
return None
|
|
|
|
|
|
# ── MFA 대기 JWT 토큰 (1단계 인증 후 2단계 전용) ─────────────────────────────
|
|
|
|
_MFA_PENDING_EXPIRE_MINUTES = 5 # 5분 유효
|
|
|
|
|
|
def create_mfa_pending_token(username: str) -> str:
|
|
"""
|
|
MFA 1단계(비밀번호) 인증 성공 후 발급하는 임시 토큰.
|
|
2단계(TOTP) 인증 엔드포인트에서만 사용 가능.
|
|
"""
|
|
from core.auth import SECRET_KEY, ALGORITHM
|
|
from jose import jwt
|
|
|
|
payload = {
|
|
"sub": username,
|
|
"mfa_pending": True,
|
|
"exp": datetime.now(timezone.utc) + timedelta(minutes=_MFA_PENDING_EXPIRE_MINUTES),
|
|
}
|
|
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
|
|
|
|
|
|
def verify_mfa_pending_token(token: str) -> Optional[str]:
|
|
"""
|
|
MFA 대기 토큰 검증.
|
|
|
|
Returns:
|
|
username 문자열 (유효한 경우), None (무효)
|
|
"""
|
|
from core.auth import SECRET_KEY, ALGORITHM
|
|
from jose import JWTError, jwt
|
|
|
|
try:
|
|
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
|
|
if not payload.get("mfa_pending"):
|
|
return None
|
|
return payload.get("sub")
|
|
except JWTError:
|
|
return None
|
|
|
|
|
|
# ── 사용자 TOTP 시크릿 저장/조회 헬퍼 ───────────────────────────────────────
|
|
|
|
def encrypt_totp_secret(secret: str) -> str:
|
|
"""DB 저장 전 TOTP 시크릿 암호화."""
|
|
return _encrypt_secret(secret)
|
|
|
|
|
|
def decrypt_totp_secret(encrypted: str) -> str:
|
|
"""DB에서 조회한 암호화 시크릿 복호화."""
|
|
return _decrypt_secret(encrypted)
|