guardia-itsm/routers/license.py
DESKTOP-TKLFCPRython 64c27c3509 feat(itsm): G-1~G-12 확장 기능 + 하네스/봇/설치스크립트 구현
G-1: 메신저 Webhook Relay + _send_to_room 실제 httpx 호출 구현
G-2: POST /api/tasks/bulk SR 대량작업 엔드포인트 (최대 100건)
G-3: 라이선스 만료 알림 스케줄러 (매일 09:00 KST)
G-4: 체험판 upgrade_banner 필드 + license.py 배너 로직
G-5: core/auto_rca.py + incidents/problem auto-rca 엔드포인트
G-6: core/deploy_impact.py + vibe impact-analysis 엔드포인트
G-7: core/ticket_classifier.py + SR 생성 시 AI 분류 + ai-suggestion API
G-8: VulnPatchRecord 모델 + vuln_scan 패치추적 4개 엔드포인트
G-9: core/jira_sync.py + gateway Jira/Confluence 연동 엔드포인트
G-10: core/push_notify.py + routers/push.py + PushSubscription 모델
G-11: approvals 다중승인 (위임/서명/기한초과/마감연장)
G-12: alembic.ini + migrations/ + cicd/migrate_to_postgres.sh

하네스: guardia-orchestrator 확장기능 Phase 반영
봇명령어: /sr /status /license /bulk 슬래시 명령어 추가
설치스크립트: setup/ (Ubuntu, CentOS, RHEL, Windows) --test 옵션 포함

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-29 18:18:52 +09:00

351 lines
13 KiB
Python

"""
라이선스 관리 API — GUARDiA ITSM.
엔드포인트:
POST /api/license/trial — 7일 무료 체험 라이선스 발급 (ADMIN 전용, 설치당 1회)
POST /api/license/activate — 라이선스 키 등록 (ADMIN 전용)
GET /api/license/status — 현재 라이선스 상태 조회
POST /api/license/verify — 라이선스 키 검증만 (등록 없음, ADMIN 전용)
DELETE /api/license — 라이선스 비활성화 (ADMIN 전용)
GET /api/license/history — 라이선스 등록 이력 (ADMIN 전용)
"""
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user, require_admin_role
from core.license import (
validate_license,
generate_trial_key,
invalidate_license_cache,
set_cached_license,
get_cached_license,
LICENSE_MASTER_KEY,
TRIAL_DURATION_DAYS,
)
from database import get_db
from models import LicenseRecord
router = APIRouter(prefix="/api/license", tags=["license"])
# ── 스키마 ──────────────────────────────────────────────────────────────────────
class ActivateRequest(BaseModel):
license_key: str
class TrialRequest(BaseModel):
customer: str = "GUARDiA 체험판"
class LicenseStatusOut(BaseModel):
activated: bool
valid: bool
expired: bool
expiry_warning: bool
is_trial: bool = False
license_id: Optional[str] = None
edition: Optional[str] = None
customer: Optional[str] = None
issued_at: Optional[str] = None
expires_at: Optional[str] = None
days_remaining: Optional[int] = None
limits: Optional[dict] = None
message: str
upgrade_banner: Optional[dict] = None
# ── 내부 헬퍼 ───────────────────────────────────────────────────────────────────
async def _load_active_license(db: AsyncSession) -> Optional[LicenseRecord]:
result = await db.execute(
select(LicenseRecord)
.where(LicenseRecord.is_active == True)
.order_by(LicenseRecord.activated_at.desc())
.limit(1)
)
return result.scalars().first()
async def get_license_status(db: AsyncSession) -> dict:
"""DB에서 활성 라이선스 조회 후 검증 결과 반환. 캐시 우선."""
cached = get_cached_license()
if cached is not None:
return cached
record = await _load_active_license(db)
if not record:
status = {
"activated": False, "valid": False, "expired": False,
"expiry_warning": False, "message": "활성 라이선스가 없습니다.",
}
set_cached_license(status)
return status
status = validate_license(record.license_key)
status["activated"] = True
status["is_trial"] = bool(record.is_trial)
if status.get("valid"):
edition_label = status["edition"]
trial_label = " [체험판]" if status.get("is_trial") else ""
status["message"] = f"{edition_label}{trial_label} 라이선스 활성 ({status['days_remaining']}일 남음)"
elif status.get("expired"):
status["message"] = f"라이선스가 만료되었습니다 (만료일: {status.get('expires_at', '')})"
else:
status["message"] = status.get("error", "라이선스 오류")
# G-4: 체험판 업그레이드 배너 계산
banner = None
if status.get("is_trial") and status.get("valid"):
days = status.get("days_remaining") or 0
if days <= 3:
if days <= 1:
urgency = "critical"
elif days <= 2:
urgency = "urgent"
else:
urgency = "warn"
banner = {
"show": True,
"urgency": urgency,
"days_remaining": days,
"message": f"체험판이 {days}일 후 만료됩니다. 정식 라이선스를 구매하세요.",
"cta_url": "/license",
}
status["upgrade_banner"] = banner
set_cached_license(status)
return status
# ── 공개 API: 라이선스 상태 조회 ──────────────────────────────────────────────
@router.get("/status", response_model=LicenseStatusOut)
async def license_status(
db: AsyncSession = Depends(get_db),
current_user = Depends(get_current_user),
):
"""현재 라이선스 상태 조회 (로그인 사용자 전용)."""
status = await get_license_status(db)
return LicenseStatusOut(
activated = status.get("activated", False),
valid = status.get("valid", False),
expired = status.get("expired", False),
expiry_warning = status.get("expiry_warning", False),
is_trial = status.get("is_trial", False),
license_id = status.get("license_id"),
edition = status.get("edition"),
customer = status.get("customer"),
issued_at = status.get("issued_at"),
expires_at = status.get("expires_at"),
days_remaining = status.get("days_remaining"),
limits = status.get("limits"),
message = status.get("message", ""),
upgrade_banner = status.get("upgrade_banner"),
)
# ── ADMIN 전용: 무료 체험 라이선스 발급 ──────────────────────────────────────
@router.post("/trial", status_code=200)
async def activate_trial_license(
body: TrialRequest,
db: AsyncSession = Depends(get_db),
current_user = Depends(require_admin_role),
):
"""
7일 무료 체험 라이선스 발급 및 즉시 활성화 (ADMIN 전용).
- GUARDIA_LICENSE_KEY 환경변수 불필요 (내장 Trial 마스터 키 사용)
- 설치당 1회만 허용 (이전 체험 이력이 있으면 거부)
- TRIAL 에디션: 기관 1, 사용자 10, 서버 20 (Community 동일)
"""
# 이전 체험 이력 확인 (설치당 1회 제한)
prev = await db.execute(
select(LicenseRecord).where(LicenseRecord.is_trial == True)
)
if prev.scalars().first():
raise HTTPException(
status_code=409,
detail=(
f"무료 체험은 설치당 1회만 가능합니다. "
f"이전 체험이 이미 사용되었습니다. "
f"정식 라이선스를 구매하세요."
),
)
# 현재 활성 라이선스가 있으면 비활성화
existing = await _load_active_license(db)
if existing:
existing.is_active = False
# 체험 라이선스 생성
trial_key = generate_trial_key(body.customer)
status = validate_license(trial_key)
record = LicenseRecord(
license_key = trial_key,
license_id = status["license_id"],
edition = status["edition"],
customer = status["customer"],
issued_at = datetime.fromisoformat(status["issued_at"]),
expires_at = datetime.fromisoformat(status["expires_at"]),
limits = status["limits"],
is_active = True,
is_trial = True,
activated_by = current_user.username,
)
db.add(record)
await db.commit()
invalidate_license_cache()
return {
"message": f"🎁 {TRIAL_DURATION_DAYS}일 무료 체험이 시작되었습니다!",
"license_id": status["license_id"],
"edition": status["edition"],
"customer": status["customer"],
"expires_at": status["expires_at"],
"days_remaining": status["days_remaining"],
"valid": status["valid"],
"is_trial": True,
"license_key": trial_key,
}
# ── ADMIN 전용: 라이선스 활성화 ───────────────────────────────────────────────
@router.post("/activate", status_code=200)
async def activate_license(
body: ActivateRequest,
db: AsyncSession = Depends(get_db),
current_user = Depends(require_admin_role),
):
"""
라이선스 키 등록 및 활성화 (ADMIN 전용).
- 키 유효성 검증 후 DB에 저장
- 기존 활성 라이선스는 비활성화
- 만료된 키도 등록 가능 (경고 표시)
"""
if not LICENSE_MASTER_KEY:
raise HTTPException(
status_code=500,
detail="서버에 GUARDIA_LICENSE_KEY가 설정되지 않았습니다. 관리자에게 문의하세요.",
)
key = body.license_key.strip()
status = validate_license(key)
if "error" in status and not status.get("expired"):
raise HTTPException(status_code=400, detail=f"라이선스 키 오류: {status['error']}")
# 기존 활성 라이선스 비활성화
existing = await _load_active_license(db)
if existing:
existing.is_active = False
record = LicenseRecord(
license_key = key,
license_id = status["license_id"],
edition = status["edition"],
customer = status["customer"],
issued_at = datetime.fromisoformat(status["issued_at"]),
expires_at = datetime.fromisoformat(status["expires_at"]),
limits = status["limits"],
is_active = True,
is_trial = False,
activated_by = current_user.username,
)
db.add(record)
await db.commit()
invalidate_license_cache()
msg = (
f"{status['edition']} 라이선스가 활성화되었습니다 "
f"({status['days_remaining']}일 남음)."
)
if status.get("expired"):
msg = f"경고: 만료된 라이선스입니다 (만료일: {status['expires_at']}). 갱신이 필요합니다."
return {
"message": msg,
"license_id": status["license_id"],
"edition": status["edition"],
"customer": status["customer"],
"expires_at": status["expires_at"],
"days_remaining": status["days_remaining"],
"valid": status["valid"],
}
# ── ADMIN 전용: 라이선스 키 검증 (등록 없음) ──────────────────────────────────
@router.post("/verify")
async def verify_license_key(
body: ActivateRequest,
current_user = Depends(require_admin_role),
):
"""라이선스 키 내용 확인 (등록하지 않고 검증만, ADMIN 전용)."""
if not LICENSE_MASTER_KEY:
raise HTTPException(status_code=500, detail="GUARDIA_LICENSE_KEY 미설정")
status = validate_license(body.license_key.strip())
if "error" in status and not status.get("expired"):
raise HTTPException(status_code=400, detail=status["error"])
return status
# ── ADMIN 전용: 라이선스 비활성화 ────────────────────────────────────────────
@router.delete("/", status_code=200)
async def deactivate_license(
db: AsyncSession = Depends(get_db),
current_user = Depends(require_admin_role),
):
"""활성 라이선스 비활성화 (ADMIN 전용). 시스템이 제한 모드로 전환됩니다."""
record = await _load_active_license(db)
if not record:
raise HTTPException(status_code=404, detail="활성 라이선스가 없습니다.")
record.is_active = False
await db.commit()
invalidate_license_cache()
return {"message": "라이선스가 비활성화되었습니다. 시스템이 제한 모드로 전환됩니다."}
# ── 라이선스 이력 조회 ─────────────────────────────────────────────────────────
@router.get("/history")
async def license_history(
db: AsyncSession = Depends(get_db),
current_user = Depends(require_admin_role),
):
"""등록된 모든 라이선스 이력 조회 (ADMIN 전용)."""
result = await db.execute(
select(LicenseRecord).order_by(LicenseRecord.activated_at.desc())
)
records = result.scalars().all()
return [
{
"id": r.id,
"license_id": r.license_id,
"edition": r.edition,
"customer": r.customer,
"issued_at": r.issued_at.isoformat() if r.issued_at else None,
"expires_at": r.expires_at.isoformat() if r.expires_at else None,
"is_active": r.is_active,
"is_trial": bool(r.is_trial),
"activated_by": r.activated_by,
"activated_at": r.activated_at.isoformat() if r.activated_at else None,
}
for r in records
]