zioinfo-mail/itsm/core/mfa.py
DESKTOP-TKLFCPR\ython e228faabf5 feat(itsm): G-1~G-12 확장 기능 + 하네스/봇/설치스크립트 구현
G-1: 메신저 Webhook Relay + _send_to_room 실제 httpx 호출 구현
G-2: POST /api/tasks/bulk SR 대량작업 엔드포인트 (최대 100건)
G-3: 라이선스 만료 알림 스케줄러 (매일 09:00 KST)
G-4: 체험판 upgrade_banner 필드 + license.py 배너 로직
G-5: core/auto_rca.py + incidents/problem auto-rca 엔드포인트
G-6: core/deploy_impact.py + vibe impact-analysis 엔드포인트
G-7: core/ticket_classifier.py + SR 생성 시 AI 분류 + ai-suggestion API
G-8: VulnPatchRecord 모델 + vuln_scan 패치추적 4개 엔드포인트
G-9: core/jira_sync.py + gateway Jira/Confluence 연동 엔드포인트
G-10: core/push_notify.py + routers/push.py + PushSubscription 모델
G-11: approvals 다중승인 (위임/서명/기한초과/마감연장)
G-12: alembic.ini + migrations/ + cicd/migrate_to_postgres.sh

하네스: guardia-orchestrator 확장기능 Phase 반영
봇명령어: /sr /status /license /bulk 슬래시 명령어 추가
설치스크립트: setup/ (Ubuntu, CentOS, RHEL, Windows) --test 옵션 포함

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-29 18:18:52 +09:00

226 lines
7.6 KiB
Python

"""
GUARDiA ITSM — MFA (TOTP) 유틸리티 (Enhancement D-2)
기능:
1. TOTP 시크릿 생성 (RFC 6238, pyotp)
2. Provisioning URI 생성 (QR 코드용)
3. QR 코드 PNG → base64 생성 (qrcode[pil] 필요)
4. TOTP 코드 검증 (±1 윈도우 허용, 30초 유효)
5. AES-256-GCM으로 TOTP 시크릿 암호화/복호화
6. MFA 대기 JWT 토큰 발급/검증 (1단계 로그인 성공 후 2단계 전용)
MFA 로그인 흐름:
1. POST /api/auth/login → mfa_enabled=True → { mfa_required: true, mfa_token: "..." }
2. POST /api/auth/login/mfa { mfa_token, totp_code } → { access_token, ... }
MFA 등록 흐름:
1. POST /api/auth/mfa/setup → { secret, qr_uri, qr_base64 } (임시 저장, 미활성)
2. POST /api/auth/mfa/enable { totp_code } → MFA 활성화
"""
from __future__ import annotations
import base64
import hashlib
import logging
import os
import struct
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple
logger = logging.getLogger(__name__)
# ── AES-GCM 암호화 키 (환경변수에서 파생) ────────────────────────────────────
_SECRET = os.environ.get("GUARDIA_SECRET_KEY", os.environ.get("GUARDIA_JWT_SECRET", "guardia-mfa-secret-change-me!"))
def _derive_key() -> bytes:
"""SECRET_KEY에서 32바이트 AES 키 파생 (SHA-256)."""
return hashlib.sha256(_SECRET.encode()).digest()
def _encrypt_secret(plaintext: str) -> str:
"""AES-256-GCM으로 TOTP 시크릿 암호화 → base64 문자열 반환."""
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
import secrets as _secrets
key = _derive_key()
nonce = _secrets.token_bytes(12) # 96-bit nonce
aesgcm = AESGCM(key)
ct = aesgcm.encrypt(nonce, plaintext.encode(), None)
# nonce(12) + ciphertext+tag 를 base64으로 인코딩
return base64.b64encode(nonce + ct).decode()
def _decrypt_secret(encrypted: str) -> str:
"""AES-256-GCM 복호화 → 평문 TOTP 시크릿 반환."""
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
key = _derive_key()
data = base64.b64decode(encrypted.encode())
nonce = data[:12]
ct = data[12:]
aesgcm = AESGCM(key)
return aesgcm.decrypt(nonce, ct, None).decode()
# ── TOTP 핵심 함수 ────────────────────────────────────────────────────────────
def generate_totp_secret() -> str:
"""새 TOTP 시크릿 생성 (Base32, 32자 = 160bit)."""
try:
import pyotp
return pyotp.random_base32()
except ImportError:
# pyotp 미설치 시 표준 라이브러리로 대체 (Base32 인코딩)
import secrets as _secrets
raw = _secrets.token_bytes(20) # 160bit
return base64.b32encode(raw).decode().rstrip("=")
def get_totp_uri(username: str, secret: str, issuer: str = "GUARDiA ITSM") -> str:
"""Google Authenticator 호환 Provisioning URI 생성."""
try:
import pyotp
totp = pyotp.TOTP(secret)
return totp.provisioning_uri(name=username, issuer_name=issuer)
except ImportError:
# RFC 6238 URI 직접 구성
from urllib.parse import quote
label = quote(f"{issuer}:{username}")
return (
f"otpauth://totp/{label}"
f"?secret={secret}"
f"&issuer={quote(issuer)}"
f"&algorithm=SHA1"
f"&digits=6"
f"&period=30"
)
def verify_totp(secret: str, code: str, valid_window: int = 1) -> bool:
"""
TOTP 코드 검증.
Args:
secret: Base32 TOTP 시크릿
code: 사용자 입력 6자리 코드
valid_window: 허용 윈도우 수 (기본 ±1 = 90초 허용)
Returns:
True: 유효한 코드
"""
if not code or len(code) != 6 or not code.isdigit():
return False
try:
import pyotp
totp = pyotp.TOTP(secret)
return totp.verify(code, valid_window=valid_window)
except ImportError:
# pyotp 미설치 시 직접 TOTP 검증
return _verify_totp_fallback(secret, code, valid_window)
except Exception:
return False
def _verify_totp_fallback(secret: str, code: str, valid_window: int) -> bool:
"""pyotp 없이 TOTP 검증 (표준 라이브러리만 사용)."""
import hmac as _hmac
import time
try:
key = base64.b32decode(secret.upper())
now = int(time.time())
for offset in range(-valid_window, valid_window + 1):
counter = (now + offset * 30) // 30
counter_bytes = struct.pack(">Q", counter)
mac = _hmac.new(key, counter_bytes, hashlib.sha1).digest()
offset_val = mac[-1] & 0x0F
trunc = struct.unpack(">I", mac[offset_val:offset_val + 4])[0] & 0x7FFFFFFF
totp_val = trunc % 1_000_000
if f"{totp_val:06d}" == code:
return True
return False
except Exception:
return False
def generate_qr_base64(uri: str) -> Optional[str]:
"""
Provisioning URI → QR 코드 PNG → base64 인코딩.
qrcode[pil] 미설치 시 None 반환 (URI 직접 표시 방식으로 폴백).
"""
try:
import qrcode
from io import BytesIO
from PIL import Image
qr = qrcode.QRCode(
version=1,
error_correction=qrcode.constants.ERROR_CORRECT_L,
box_size=10,
border=4,
)
qr.add_data(uri)
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
buf = BytesIO()
img.save(buf, format="PNG")
return base64.b64encode(buf.getvalue()).decode()
except ImportError:
logger.info("qrcode 또는 Pillow 미설치 — QR 이미지 생성 스킵")
return None
except Exception as exc:
logger.warning("QR 코드 생성 실패: %s", exc)
return None
# ── MFA 대기 JWT 토큰 (1단계 인증 후 2단계 전용) ─────────────────────────────
_MFA_PENDING_EXPIRE_MINUTES = 5 # 5분 유효
def create_mfa_pending_token(username: str) -> str:
"""
MFA 1단계(비밀번호) 인증 성공 후 발급하는 임시 토큰.
2단계(TOTP) 인증 엔드포인트에서만 사용 가능.
"""
from core.auth import SECRET_KEY, ALGORITHM
from jose import jwt
payload = {
"sub": username,
"mfa_pending": True,
"exp": datetime.now(timezone.utc) + timedelta(minutes=_MFA_PENDING_EXPIRE_MINUTES),
}
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
def verify_mfa_pending_token(token: str) -> Optional[str]:
"""
MFA 대기 토큰 검증.
Returns:
username 문자열 (유효한 경우), None (무효)
"""
from core.auth import SECRET_KEY, ALGORITHM
from jose import JWTError, jwt
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
if not payload.get("mfa_pending"):
return None
return payload.get("sub")
except JWTError:
return None
# ── 사용자 TOTP 시크릿 저장/조회 헬퍼 ───────────────────────────────────────
def encrypt_totp_secret(secret: str) -> str:
"""DB 저장 전 TOTP 시크릿 암호화."""
return _encrypt_secret(secret)
def decrypt_totp_secret(encrypted: str) -> str:
"""DB에서 조회한 암호화 시크릿 복호화."""
return _decrypt_secret(encrypted)