guardia-itsm/core/auth.py
DESKTOP-TKLFCPRython 64c27c3509 feat(itsm): G-1~G-12 확장 기능 + 하네스/봇/설치스크립트 구현
G-1: 메신저 Webhook Relay + _send_to_room 실제 httpx 호출 구현
G-2: POST /api/tasks/bulk SR 대량작업 엔드포인트 (최대 100건)
G-3: 라이선스 만료 알림 스케줄러 (매일 09:00 KST)
G-4: 체험판 upgrade_banner 필드 + license.py 배너 로직
G-5: core/auto_rca.py + incidents/problem auto-rca 엔드포인트
G-6: core/deploy_impact.py + vibe impact-analysis 엔드포인트
G-7: core/ticket_classifier.py + SR 생성 시 AI 분류 + ai-suggestion API
G-8: VulnPatchRecord 모델 + vuln_scan 패치추적 4개 엔드포인트
G-9: core/jira_sync.py + gateway Jira/Confluence 연동 엔드포인트
G-10: core/push_notify.py + routers/push.py + PushSubscription 모델
G-11: approvals 다중승인 (위임/서명/기한초과/마감연장)
G-12: alembic.ini + migrations/ + cicd/migrate_to_postgres.sh

하네스: guardia-orchestrator 확장기능 Phase 반영
봇명령어: /sr /status /license /bulk 슬래시 명령어 추가
설치스크립트: setup/ (Ubuntu, CentOS, RHEL, Windows) --test 옵션 포함

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-29 18:18:52 +09:00

136 lines
5.3 KiB
Python

"""JWT + PBKDF2-SHA256 인증 유틸리티 — GUARDiA ITSM.
Python 3.14 완전 호환 — 외부 bcrypt 의존성 없음.
패스워드는 PBKDF2-HMAC-SHA256 + 32바이트 salt, 반복 260000회.
"""
import base64
import hashlib
import hmac
import os
import secrets
from datetime import datetime, timedelta, timezone
from typing import Optional
# 계정 잠금 정책
MAX_FAILED_ATTEMPTS = 5 # 연속 실패 허용 횟수
LOCKOUT_MINUTES = 30 # 잠금 지속 시간(분)
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from database import get_db
# ── 설정 ─────────────────────────────────────────────────────────────────────
SECRET_KEY = os.environ.get("GUARDIA_JWT_SECRET", "guardia-jwt-secret-2026-change-me!")
ALGORITHM = "HS256"
TOKEN_EXPIRE_MINUTES = 480 # 8시간
_ITER = 260_000 # PBKDF2 반복 횟수 (NIST 권장)
_SALT_LEN = 32 # 바이트
_KEY_LEN = 32 # 바이트
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login", auto_error=False)
# ── 패스워드 ──────────────────────────────────────────────────────────────────
def hash_password(plain: str) -> str:
"""PBKDF2-HMAC-SHA256으로 패스워드를 해시합니다."""
salt = secrets.token_bytes(_SALT_LEN)
key = hashlib.pbkdf2_hmac("sha256", plain.encode(), salt, _ITER, _KEY_LEN)
return base64.b64encode(salt + key).decode()
def verify_password(plain: str, hashed: str) -> bool:
"""저장된 해시와 평문 패스워드를 안전하게 비교합니다."""
try:
data = base64.b64decode(hashed.encode())
salt = data[:_SALT_LEN]
stored_key = data[_SALT_LEN:]
key = hashlib.pbkdf2_hmac("sha256", plain.encode(), salt, _ITER, _KEY_LEN)
return hmac.compare_digest(key, stored_key)
except Exception:
return False
# ── JWT ───────────────────────────────────────────────────────────────────────
def create_access_token(data: dict) -> str:
payload = data.copy()
payload["exp"] = datetime.now(timezone.utc) + timedelta(minutes=TOKEN_EXPIRE_MINUTES)
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
# ── 사용자 조회 Dependencies ──────────────────────────────────────────────────
async def get_optional_user(
token: Optional[str] = Depends(oauth2_scheme),
db: AsyncSession = Depends(get_db),
):
"""토큰이 없거나 유효하지 않으면 None 반환 (예외 없음)."""
if not token:
return None
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username = payload.get("sub")
if not username:
return None
except JWTError:
return None
from models import User
result = await db.execute(select(User).where(User.username == username))
user = result.scalars().first()
return user if (user and user.is_active) else None
async def get_current_user(
user=Depends(get_optional_user),
):
"""토큰 없거나 유효하지 않으면 401 반환."""
if user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="인증이 필요합니다",
headers={"WWW-Authenticate": "Bearer"},
)
return user
# ── Priority 4: RBAC 세분화 ──────────────────────────────────────────────────
def require_deploy_role(current_user=Depends(get_current_user)):
"""
배포 권한 확인 — ADMIN, PM, DEPLOY_ENGINEER 만 허용.
배포 트리거 및 배치 즉시 실행 엔드포인트에서 사용한다.
권한 매핑:
ADMIN : 전체 권한
PM : 배포 승인 및 트리거
DEPLOY_ENGINEER : 배포 트리거 + 배치 즉시 실행
ENGINEER : 조회만 가능
CUSTOMER : 전체 차단
"""
from models import UserRole
DEPLOY_ROLES = {UserRole.ADMIN, UserRole.PM, UserRole.DEPLOY_ENGINEER}
if current_user.role not in DEPLOY_ROLES:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="배포 권한이 없습니다 (ADMIN, PM, DEPLOY_ENGINEER 필요)",
)
return current_user
def require_admin_role(current_user=Depends(get_current_user)):
"""ADMIN 전용 엔드포인트 권한 확인."""
from models import UserRole
if current_user.role != UserRole.ADMIN:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="ADMIN 권한이 필요합니다",
)
return current_user