""" 라이선스 관리 API — GUARDiA ITSM. 엔드포인트: POST /api/license/trial — 7일 무료 체험 라이선스 발급 (ADMIN 전용, 설치당 1회) POST /api/license/activate — 라이선스 키 등록 (ADMIN 전용) GET /api/license/status — 현재 라이선스 상태 조회 POST /api/license/verify — 라이선스 키 검증만 (등록 없음, ADMIN 전용) DELETE /api/license — 라이선스 비활성화 (ADMIN 전용) GET /api/license/history — 라이선스 등록 이력 (ADMIN 전용) """ from datetime import datetime from typing import Optional from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from core.auth import get_current_user, require_admin_role from core.license import ( validate_license, generate_trial_key, invalidate_license_cache, set_cached_license, get_cached_license, LICENSE_MASTER_KEY, TRIAL_DURATION_DAYS, ) from database import get_db from models import LicenseRecord router = APIRouter(prefix="/api/license", tags=["license"]) # ── 스키마 ────────────────────────────────────────────────────────────────────── class ActivateRequest(BaseModel): license_key: str class TrialRequest(BaseModel): customer: str = "GUARDiA 체험판" class LicenseStatusOut(BaseModel): activated: bool valid: bool expired: bool expiry_warning: bool is_trial: bool = False license_id: Optional[str] = None edition: Optional[str] = None customer: Optional[str] = None issued_at: Optional[str] = None expires_at: Optional[str] = None days_remaining: Optional[int] = None limits: Optional[dict] = None message: str upgrade_banner: Optional[dict] = None # ── 내부 헬퍼 ─────────────────────────────────────────────────────────────────── async def _load_active_license(db: AsyncSession) -> Optional[LicenseRecord]: result = await db.execute( select(LicenseRecord) .where(LicenseRecord.is_active == True) .order_by(LicenseRecord.activated_at.desc()) .limit(1) ) return result.scalars().first() async def get_license_status(db: AsyncSession) -> dict: """DB에서 활성 라이선스 조회 후 검증 결과 반환. 캐시 우선.""" cached = get_cached_license() if cached is not None: return cached record = await _load_active_license(db) if not record: status = { "activated": False, "valid": False, "expired": False, "expiry_warning": False, "message": "활성 라이선스가 없습니다.", } set_cached_license(status) return status status = validate_license(record.license_key) status["activated"] = True status["is_trial"] = bool(record.is_trial) if status.get("valid"): edition_label = status["edition"] trial_label = " [체험판]" if status.get("is_trial") else "" status["message"] = f"{edition_label}{trial_label} 라이선스 활성 ({status['days_remaining']}일 남음)" elif status.get("expired"): status["message"] = f"라이선스가 만료되었습니다 (만료일: {status.get('expires_at', '')})" else: status["message"] = status.get("error", "라이선스 오류") # G-4: 체험판 업그레이드 배너 계산 banner = None if status.get("is_trial") and status.get("valid"): days = status.get("days_remaining") or 0 if days <= 3: if days <= 1: urgency = "critical" elif days <= 2: urgency = "urgent" else: urgency = "warn" banner = { "show": True, "urgency": urgency, "days_remaining": days, "message": f"체험판이 {days}일 후 만료됩니다. 정식 라이선스를 구매하세요.", "cta_url": "/license", } status["upgrade_banner"] = banner set_cached_license(status) return status # ── 공개 API: 라이선스 상태 조회 ────────────────────────────────────────────── @router.get("/status", response_model=LicenseStatusOut) async def license_status( db: AsyncSession = Depends(get_db), current_user = Depends(get_current_user), ): """현재 라이선스 상태 조회 (로그인 사용자 전용).""" status = await get_license_status(db) return LicenseStatusOut( activated = status.get("activated", False), valid = status.get("valid", False), expired = status.get("expired", False), expiry_warning = status.get("expiry_warning", False), is_trial = status.get("is_trial", False), license_id = status.get("license_id"), edition = status.get("edition"), customer = status.get("customer"), issued_at = status.get("issued_at"), expires_at = status.get("expires_at"), days_remaining = status.get("days_remaining"), limits = status.get("limits"), message = status.get("message", ""), upgrade_banner = status.get("upgrade_banner"), ) # ── ADMIN 전용: 무료 체험 라이선스 발급 ────────────────────────────────────── @router.post("/trial", status_code=200) async def activate_trial_license( body: TrialRequest, db: AsyncSession = Depends(get_db), current_user = Depends(require_admin_role), ): """ 7일 무료 체험 라이선스 발급 및 즉시 활성화 (ADMIN 전용). - GUARDIA_LICENSE_KEY 환경변수 불필요 (내장 Trial 마스터 키 사용) - 설치당 1회만 허용 (이전 체험 이력이 있으면 거부) - TRIAL 에디션: 기관 1, 사용자 10, 서버 20 (Community 동일) """ # 이전 체험 이력 확인 (설치당 1회 제한) prev = await db.execute( select(LicenseRecord).where(LicenseRecord.is_trial == True) ) if prev.scalars().first(): raise HTTPException( status_code=409, detail=( f"무료 체험은 설치당 1회만 가능합니다. " f"이전 체험이 이미 사용되었습니다. " f"정식 라이선스를 구매하세요." ), ) # 현재 활성 라이선스가 있으면 비활성화 existing = await _load_active_license(db) if existing: existing.is_active = False # 체험 라이선스 생성 trial_key = generate_trial_key(body.customer) status = validate_license(trial_key) record = LicenseRecord( license_key = trial_key, license_id = status["license_id"], edition = status["edition"], customer = status["customer"], issued_at = datetime.fromisoformat(status["issued_at"]), expires_at = datetime.fromisoformat(status["expires_at"]), limits = status["limits"], is_active = True, is_trial = True, activated_by = current_user.username, ) db.add(record) await db.commit() invalidate_license_cache() return { "message": f"🎁 {TRIAL_DURATION_DAYS}일 무료 체험이 시작되었습니다!", "license_id": status["license_id"], "edition": status["edition"], "customer": status["customer"], "expires_at": status["expires_at"], "days_remaining": status["days_remaining"], "valid": status["valid"], "is_trial": True, "license_key": trial_key, } # ── ADMIN 전용: 라이선스 활성화 ─────────────────────────────────────────────── @router.post("/activate", status_code=200) async def activate_license( body: ActivateRequest, db: AsyncSession = Depends(get_db), current_user = Depends(require_admin_role), ): """ 라이선스 키 등록 및 활성화 (ADMIN 전용). - 키 유효성 검증 후 DB에 저장 - 기존 활성 라이선스는 비활성화 - 만료된 키도 등록 가능 (경고 표시) """ if not LICENSE_MASTER_KEY: raise HTTPException( status_code=500, detail="서버에 GUARDIA_LICENSE_KEY가 설정되지 않았습니다. 관리자에게 문의하세요.", ) key = body.license_key.strip() status = validate_license(key) if "error" in status and not status.get("expired"): raise HTTPException(status_code=400, detail=f"라이선스 키 오류: {status['error']}") # 기존 활성 라이선스 비활성화 existing = await _load_active_license(db) if existing: existing.is_active = False record = LicenseRecord( license_key = key, license_id = status["license_id"], edition = status["edition"], customer = status["customer"], issued_at = datetime.fromisoformat(status["issued_at"]), expires_at = datetime.fromisoformat(status["expires_at"]), limits = status["limits"], is_active = True, is_trial = False, activated_by = current_user.username, ) db.add(record) await db.commit() invalidate_license_cache() msg = ( f"{status['edition']} 라이선스가 활성화되었습니다 " f"({status['days_remaining']}일 남음)." ) if status.get("expired"): msg = f"경고: 만료된 라이선스입니다 (만료일: {status['expires_at']}). 갱신이 필요합니다." return { "message": msg, "license_id": status["license_id"], "edition": status["edition"], "customer": status["customer"], "expires_at": status["expires_at"], "days_remaining": status["days_remaining"], "valid": status["valid"], } # ── ADMIN 전용: 라이선스 키 검증 (등록 없음) ────────────────────────────────── @router.post("/verify") async def verify_license_key( body: ActivateRequest, current_user = Depends(require_admin_role), ): """라이선스 키 내용 확인 (등록하지 않고 검증만, ADMIN 전용).""" if not LICENSE_MASTER_KEY: raise HTTPException(status_code=500, detail="GUARDIA_LICENSE_KEY 미설정") status = validate_license(body.license_key.strip()) if "error" in status and not status.get("expired"): raise HTTPException(status_code=400, detail=status["error"]) return status # ── ADMIN 전용: 라이선스 비활성화 ──────────────────────────────────────────── @router.delete("/", status_code=200) async def deactivate_license( db: AsyncSession = Depends(get_db), current_user = Depends(require_admin_role), ): """활성 라이선스 비활성화 (ADMIN 전용). 시스템이 제한 모드로 전환됩니다.""" record = await _load_active_license(db) if not record: raise HTTPException(status_code=404, detail="활성 라이선스가 없습니다.") record.is_active = False await db.commit() invalidate_license_cache() return {"message": "라이선스가 비활성화되었습니다. 시스템이 제한 모드로 전환됩니다."} # ── 라이선스 이력 조회 ───────────────────────────────────────────────────────── @router.get("/history") async def license_history( db: AsyncSession = Depends(get_db), current_user = Depends(require_admin_role), ): """등록된 모든 라이선스 이력 조회 (ADMIN 전용).""" result = await db.execute( select(LicenseRecord).order_by(LicenseRecord.activated_at.desc()) ) records = result.scalars().all() return [ { "id": r.id, "license_id": r.license_id, "edition": r.edition, "customer": r.customer, "issued_at": r.issued_at.isoformat() if r.issued_at else None, "expires_at": r.expires_at.isoformat() if r.expires_at else None, "is_active": r.is_active, "is_trial": bool(r.is_trial), "activated_by": r.activated_by, "activated_at": r.activated_at.isoformat() if r.activated_at else None, } for r in records ]