zioinfo-mail/itsm/routers/auth.py
DESKTOP-TKLFCPR\ython 82a4c72080 feat(itsm): PMS/준수성/JMeter/공공기관 기능 + Nifty UI + 로고 Copyright
[PMS 완성]
- core/si_report.py: 일/주/월 보고서 (Excel/HTML/PDF/DOCX/PPTX)
- routers/si_report.py: daily|weekly|monthly + 메신저 발송
- routers/deliverables.py: 산출물 CRUD + 제출/검토
- si_issues.py: 이슈→SR 자동 연결
- scheduler.py: 일일 18:00 + 주간 금 17:00 자동 보고서
- models.py: Deliverable 모델

[준수성 자동 점검]
- core/compliance_check.py: SC-8개/WA-7개/PI-6개 규칙
- routers/compliance.py: 스캔 + HTML/Excel 보고서

[JMeter 성능 테스트]
- routers/jmeter.py: JTL 업로드 + 내장 부하 테스트 + 보고서

[공공기관 필수 기능]
- routers/public_checklist.py: 행안부 기준 19개 항목

[UI/브랜드]
- 로고(ziologo.png) + Copyright 2026 All Rights Reserved
- Nifty 계층형 사이드바 (PMS 서브메뉴)
- X-Powered-By + X-Copyright 응답 헤더
- manual/15_UI_Nifty_가이드.md

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-29 22:50:29 +09:00

546 lines
20 KiB
Python

"""로그인 / 비밀번호 변경 / 내 정보 / MFA (TOTP) 엔드포인트."""
from datetime import datetime, timedelta
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import (create_access_token, get_current_user, hash_password,
verify_password, MAX_FAILED_ATTEMPTS, LOCKOUT_MINUTES)
from database import get_db
from models import User
router = APIRouter(prefix="/api/auth", tags=["auth"])
# ── 스키마 ──────────────────────────────────────────────────────────────────
class LoginRequest(BaseModel):
username: str
password: str
class ChangePasswordRequest(BaseModel):
current_password: str
new_password: str
class TokenResponse(BaseModel):
access_token: str
token_type: str = "bearer"
must_change_pw: bool
username: str
display_name: str
role: str
mfa_enabled: bool = False
class MfaPendingResponse(BaseModel):
"""비밀번호 인증 성공 후 MFA 2단계 필요 시 반환."""
mfa_required: bool = True
mfa_token: str # 5분 유효 임시 토큰
class MfaVerifyRequest(BaseModel):
"""MFA 2단계: mfa_token + TOTP 코드."""
mfa_token: str
totp_code: str
class MfaSetupRequest(BaseModel):
"""MFA 등록 활성화 요청."""
totp_code: str # 등록 확인용 코드 (앱에서 생성된 첫 코드)
class MfaDisableRequest(BaseModel):
"""MFA 비활성화 요청."""
password: str # 본인 확인용 현재 비밀번호
totp_code: str # MFA 비활성화를 위한 TOTP 코드 (추가 검증)
# ── 엔드포인트 ───────────────────────────────────────────────────────────────
@router.post("/login")
async def login(body: LoginRequest, db: AsyncSession = Depends(get_db)):
"""
1단계 로그인 (비밀번호).
보안 강화:
- 연속 실패 5회 → 계정 30분 잠금
- 잠금 상태에서는 올바른 비밀번호를 입력해도 차단
- 성공 시 실패 횟수 초기화
"""
result = await db.execute(select(User).where(User.username == body.username))
user = result.scalars().first()
# 계정 존재 여부와 무관하게 동일한 오류 메시지 반환 (사용자 열거 방지)
_INVALID_MSG = "아이디 또는 비밀번호가 올바르지 않습니다"
if not user or not user.is_active:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=_INVALID_MSG)
# 계정 잠금 확인
now = datetime.now()
if user.locked_until and user.locked_until > now:
remaining = int((user.locked_until - now).total_seconds() / 60) + 1
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"계정이 잠겨 있습니다. {remaining}분 후 다시 시도하세요.",
)
# 잠금 기간 만료 시 자동 초기화
if user.locked_until and user.locked_until <= now:
user.failed_login_count = 0
user.locked_until = None
# 비밀번호 검증
if not verify_password(body.password, user.hashed_pw):
user.failed_login_count = (user.failed_login_count or 0) + 1
if user.failed_login_count >= MAX_FAILED_ATTEMPTS:
user.locked_until = now + timedelta(minutes=LOCKOUT_MINUTES)
await db.commit()
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"로그인 실패 {MAX_FAILED_ATTEMPTS}회 초과로 계정이 {LOCKOUT_MINUTES}분간 잠겼습니다.",
)
await db.commit()
remaining_attempts = MAX_FAILED_ATTEMPTS - user.failed_login_count
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"{_INVALID_MSG} (남은 시도: {remaining_attempts}회)",
)
# 로그인 성공 — 실패 횟수 초기화
user.failed_login_count = 0
user.locked_until = None
# MFA 활성 사용자 → 임시 토큰 반환
if user.mfa_enabled:
await db.commit()
from core.mfa import create_mfa_pending_token
mfa_token = create_mfa_pending_token(user.username)
return MfaPendingResponse(mfa_required=True, mfa_token=mfa_token)
user.last_login_at = datetime.now()
await db.commit()
token = create_access_token({"sub": user.username, "role": user.role})
return TokenResponse(
access_token=token,
must_change_pw=user.must_change_pw,
username=user.username,
display_name=user.display_name or user.username,
role=user.role,
mfa_enabled=False,
)
@router.post("/login/mfa", response_model=TokenResponse)
async def login_mfa(body: MfaVerifyRequest, db: AsyncSession = Depends(get_db)):
"""
2단계 MFA 인증 (TOTP).
mfa_token (POST /api/auth/login 에서 발급된 임시 토큰) +
totp_code (앱에서 생성된 6자리 코드) 를 검증하여 access_token 발급.
"""
from core.mfa import verify_mfa_pending_token, decrypt_totp_secret, verify_totp
username = verify_mfa_pending_token(body.mfa_token)
if not username:
raise HTTPException(status_code=401, detail="MFA 토큰이 유효하지 않거나 만료되었습니다.")
result = await db.execute(select(User).where(User.username == username))
user = result.scalars().first()
if not user or not user.is_active or not user.mfa_enabled:
raise HTTPException(status_code=401, detail="사용자를 찾을 수 없거나 MFA가 비활성화되어 있습니다.")
# TOTP 코드 검증
try:
secret = decrypt_totp_secret(user.totp_secret_enc)
if not verify_totp(secret, body.totp_code):
raise HTTPException(status_code=401, detail="TOTP 코드가 올바르지 않습니다.")
except Exception as e:
if isinstance(e, HTTPException):
raise
raise HTTPException(status_code=500, detail="MFA 검증 중 오류가 발생했습니다.")
# 로그인 완료
user.last_login_at = datetime.now()
await db.commit()
token = create_access_token({"sub": user.username, "role": user.role})
return TokenResponse(
access_token=token,
must_change_pw=user.must_change_pw,
username=user.username,
display_name=user.display_name or user.username,
role=user.role,
mfa_enabled=True,
)
@router.post("/change-password")
async def change_password(
body: ChangePasswordRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
if not verify_password(body.current_password, current_user.hashed_pw):
raise HTTPException(status_code=400, detail="현재 비밀번호가 올바르지 않습니다")
if body.new_password == body.current_password:
raise HTTPException(status_code=400, detail="새 비밀번호는 현재 비밀번호와 달라야 합니다")
# 비밀번호 복잡도 검증 (상용화 보안 정책)
from core.license import check_password_complexity
ok, msg = check_password_complexity(body.new_password)
if not ok:
raise HTTPException(status_code=400, detail=msg)
# DB 재조회 후 업데이트
result = await db.execute(select(User).where(User.id == current_user.id))
user = result.scalars().first()
user.hashed_pw = hash_password(body.new_password)
user.must_change_pw = False
await db.commit()
return {"message": "비밀번호가 변경되었습니다"}
@router.get("/me")
async def me(current_user: User = Depends(get_current_user)):
return {
"id": current_user.id,
"username": current_user.username,
"display_name": current_user.display_name or current_user.username,
"role": current_user.role,
"must_change_pw": current_user.must_change_pw,
"inst_code": current_user.inst_code,
}
@router.post("/logout")
async def logout():
"""JWT는 stateless — 클라이언트에서 토큰 삭제."""
return {"message": "로그아웃되었습니다"}
# ── A-D2: MFA (TOTP) 설정 엔드포인트 ─────────────────────────────────────────
@router.get("/mfa/status")
async def mfa_status(current_user: User = Depends(get_current_user)):
"""현재 사용자의 MFA 활성화 여부 조회."""
return {
"username": current_user.username,
"mfa_enabled": current_user.mfa_enabled,
}
@router.post("/mfa/setup")
async def mfa_setup(
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""
MFA 등록 시작 — TOTP 시크릿 + QR 코드 생성.
새 시크릿을 생성하여 DB에 (미활성 상태로) 저장.
반환된 QR 코드를 앱으로 스캔 후 /mfa/enable 로 활성화.
"""
from core.mfa import (
generate_totp_secret, get_totp_uri,
generate_qr_base64, encrypt_totp_secret,
)
secret = generate_totp_secret()
uri = get_totp_uri(current_user.username, secret)
qr_b64 = generate_qr_base64(uri)
# 시크릿 암호화 후 DB 저장 (mfa_enabled는 아직 False)
result = await db.execute(select(User).where(User.id == current_user.id))
user = result.scalars().first()
user.totp_secret_enc = encrypt_totp_secret(secret)
await db.commit()
return {
"message": "QR 코드를 앱(Google Authenticator 등)으로 스캔하세요.",
"qr_uri": uri,
"qr_base64": qr_b64, # None 이면 qr_uri 직접 입력
"secret": secret, # 앱 수동 입력용 (화면 표시 후 숨김)
}
@router.post("/mfa/enable")
async def mfa_enable(
body: MfaSetupRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""
MFA 활성화 — 앱에서 생성된 TOTP 코드로 설정 확인.
/mfa/setup 후 앱에서 생성된 코드를 입력하여 MFA를 활성화한다.
이미 MFA가 활성화된 경우 400 반환.
"""
from core.mfa import decrypt_totp_secret, verify_totp
if current_user.mfa_enabled:
raise HTTPException(400, "MFA가 이미 활성화되어 있습니다.")
if not current_user.totp_secret_enc:
raise HTTPException(400, "MFA 시크릿이 없습니다. 먼저 /mfa/setup을 호출하세요.")
try:
secret = decrypt_totp_secret(current_user.totp_secret_enc)
except Exception:
raise HTTPException(500, "MFA 시크릿 복호화 실패.")
if not verify_totp(secret, body.totp_code):
raise HTTPException(400, "TOTP 코드가 올바르지 않습니다. 앱을 확인하세요.")
result = await db.execute(select(User).where(User.id == current_user.id))
user = result.scalars().first()
user.mfa_enabled = True
await db.commit()
return {"message": "MFA가 활성화되었습니다.", "mfa_enabled": True}
@router.post("/mfa/disable")
async def mfa_disable(
body: MfaDisableRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""
MFA 비활성화 — 비밀번호 + TOTP 코드 이중 확인.
ADMIN은 자신의 MFA만 비활성화 가능.
타 사용자의 MFA 강제 비활성화는 /admin/users/{id}/mfa-reset 사용.
"""
from core.mfa import decrypt_totp_secret, verify_totp
if not current_user.mfa_enabled:
raise HTTPException(400, "MFA가 활성화되어 있지 않습니다.")
if not verify_password(body.password, current_user.hashed_pw):
raise HTTPException(401, "비밀번호가 올바르지 않습니다.")
try:
secret = decrypt_totp_secret(current_user.totp_secret_enc)
if not verify_totp(secret, body.totp_code):
raise HTTPException(400, "TOTP 코드가 올바르지 않습니다.")
except HTTPException:
raise
except Exception:
raise HTTPException(500, "MFA 검증 중 오류가 발생했습니다.")
result = await db.execute(select(User).where(User.id == current_user.id))
user = result.scalars().first()
user.mfa_enabled = False
user.totp_secret_enc = None
await db.commit()
return {"message": "MFA가 비활성화되었습니다.", "mfa_enabled": False}
@router.post("/admin/users/{user_id}/mfa-reset", status_code=200)
async def admin_mfa_reset(
user_id: int,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""
관리자 전용: 특정 사용자의 MFA 강제 초기화.
ADMIN 권한 필요 (본인 제외).
사용자가 MFA 앱을 분실했거나 접근 불가 시 사용.
"""
if current_user.role != "ADMIN":
raise HTTPException(403, "ADMIN 권한이 필요합니다.")
result = await db.execute(select(User).where(User.id == user_id))
target = result.scalars().first()
if not target:
raise HTTPException(404, "사용자를 찾을 수 없습니다.")
if target.id == current_user.id:
raise HTTPException(400, "자신의 MFA는 /mfa/disable 을 사용하세요.")
target.mfa_enabled = False
target.totp_secret_enc = None
await db.commit()
return {
"message": f"{target.username}의 MFA가 초기화되었습니다.",
"reset_by": current_user.username,
"target_user": target.username,
}
@router.get("/users")
async def list_users(
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""관리자 전용: 전체 사용자 목록."""
if current_user.role != "ADMIN":
raise HTTPException(status_code=403, detail="관리자만 접근 가능합니다")
result = await db.execute(select(User).order_by(User.id))
users = result.scalars().all()
return [
{
"id": u.id,
"username": u.username,
"display_name": u.display_name,
"role": u.role,
"is_active": u.is_active,
"must_change_pw": u.must_change_pw,
"last_login_at": u.last_login_at.isoformat() if u.last_login_at else None,
}
for u in users
]
@router.post("/admin/users/{user_id}/unlock", status_code=200)
async def admin_unlock_user(
user_id: int,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""관리자 전용: 잠긴 계정 강제 해제 (ADMIN 권한 필요)."""
if current_user.role != "ADMIN":
raise HTTPException(403, "ADMIN 권한이 필요합니다.")
result = await db.execute(select(User).where(User.id == user_id))
target = result.scalars().first()
if not target:
raise HTTPException(404, "사용자를 찾을 수 없습니다.")
target.failed_login_count = 0
target.locked_until = None
await db.commit()
return {
"message": f"{target.username} 계정 잠금이 해제되었습니다.",
"unlocked_by": current_user.username,
"target_user": target.username,
}
@router.get("/admin/users/{user_id}/lock-status")
async def admin_user_lock_status(
user_id: int,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""관리자 전용: 사용자 계정 잠금 상태 조회 (ADMIN 권한 필요)."""
if current_user.role != "ADMIN":
raise HTTPException(403, "ADMIN 권한이 필요합니다.")
result = await db.execute(select(User).where(User.id == user_id))
target = result.scalars().first()
if not target:
raise HTTPException(404, "사용자를 찾을 수 없습니다.")
from datetime import datetime as dt
is_locked = bool(target.locked_until and target.locked_until > dt.now())
remaining_min = 0
if is_locked:
remaining_min = int((target.locked_until - dt.now()).total_seconds() / 60) + 1
return {
"username": target.username,
"is_locked": is_locked,
"failed_login_count": target.failed_login_count or 0,
"locked_until": target.locked_until.isoformat() if target.locked_until else None,
"remaining_minutes": remaining_min,
}
# ── OAuth2 소셜 로그인 ────────────────────────────────────────────────────────
@router.get("/oauth/providers")
async def oauth_providers():
"""활성화된 OAuth 제공자 목록 반환 (로그인 페이지에서 버튼 표시 여부 결정)."""
from core.oauth import get_enabled_providers
return {"providers": get_enabled_providers()}
@router.get("/oauth/{provider}/start")
async def oauth_start(provider: str):
"""OAuth 인증 흐름 시작 — 제공자 인증 페이지로 리디렉트."""
from core.oauth import build_auth_url, PROVIDERS
from fastapi.responses import RedirectResponse
if provider not in PROVIDERS or not PROVIDERS[provider]["enabled"]:
raise HTTPException(400, f"OAuth 제공자 '{provider}'가 설정되지 않았습니다.")
url = build_auth_url(provider)
return RedirectResponse(url)
@router.get("/oauth/{provider}/callback")
async def oauth_callback(
provider: str,
code: str = "",
state: str = "",
error: str = "",
db: AsyncSession = Depends(get_db),
):
"""
OAuth 콜백 처리 — 인가 코드로 토큰 교환 후 GUARDiA 계정과 연결.
- 이메일 기반으로 기존 계정 조회 또는 신규 계정 자동 생성 (auth_type=oauth)
- 성공 시 /login 페이지로 리디렉트하며 JWT 토큰을 URL 파라미터로 전달
"""
from core.oauth import verify_state, exchange_code, extract_email, extract_name, PROVIDERS
from fastapi.responses import RedirectResponse
LOGIN_FAIL_URL = "/login?error=oauth_failed"
if error:
return RedirectResponse(f"/login?error={error}")
if not verify_state(state):
return RedirectResponse(f"{LOGIN_FAIL_URL}&reason=state_mismatch")
if provider not in PROVIDERS or not PROVIDERS[provider]["enabled"]:
return RedirectResponse(f"{LOGIN_FAIL_URL}&reason=unknown_provider")
userinfo = await exchange_code(provider, code)
if not userinfo:
return RedirectResponse(f"{LOGIN_FAIL_URL}&reason=token_exchange")
email = extract_email(provider, userinfo)
if not email:
return RedirectResponse(f"{LOGIN_FAIL_URL}&reason=no_email")
# 기존 계정 조회 (이메일 기준)
result = await db.execute(select(User).where(User.email == email, User.is_active == True))
user = result.scalars().first()
if not user:
# 자동 계정 생성 (CUSTOMER 역할, 비밀번호 없음)
display = extract_name(provider, userinfo)
username = email.split("@")[0].replace(".", "_")[:30]
# username 중복 처리
existing = (await db.execute(select(User).where(User.username == username))).scalars().first()
if existing:
username = f"{username}_{provider[:3]}"
user = User(
username = username,
display_name = display or username,
email = email,
hashed_pw = "", # OAuth 계정은 비밀번호 없음
role = "CUSTOMER",
is_active = True,
must_change_pw= False,
auth_type = f"oauth:{provider}",
)
db.add(user)
await db.flush()
user.last_login_at = datetime.now()
await db.commit()
token = create_access_token({"sub": user.username, "role": user.role})
# 토큰을 URL 파라미터로 전달 → 로그인 페이지 JS에서 처리
return RedirectResponse(f"/login?oauth_token={token}&username={user.username}&role={user.role}")