G-1: 메신저 Webhook Relay + _send_to_room 실제 httpx 호출 구현 G-2: POST /api/tasks/bulk SR 대량작업 엔드포인트 (최대 100건) G-3: 라이선스 만료 알림 스케줄러 (매일 09:00 KST) G-4: 체험판 upgrade_banner 필드 + license.py 배너 로직 G-5: core/auto_rca.py + incidents/problem auto-rca 엔드포인트 G-6: core/deploy_impact.py + vibe impact-analysis 엔드포인트 G-7: core/ticket_classifier.py + SR 생성 시 AI 분류 + ai-suggestion API G-8: VulnPatchRecord 모델 + vuln_scan 패치추적 4개 엔드포인트 G-9: core/jira_sync.py + gateway Jira/Confluence 연동 엔드포인트 G-10: core/push_notify.py + routers/push.py + PushSubscription 모델 G-11: approvals 다중승인 (위임/서명/기한초과/마감연장) G-12: alembic.ini + migrations/ + cicd/migrate_to_postgres.sh 하네스: guardia-orchestrator 확장기능 Phase 반영 봇명령어: /sr /status /license /bulk 슬래시 명령어 추가 설치스크립트: setup/ (Ubuntu, CentOS, RHEL, Windows) --test 옵션 포함 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
449 lines
16 KiB
Python
449 lines
16 KiB
Python
"""
|
|
GUARDiA 라이선스 엔진 — 생성/검증/관리.
|
|
|
|
라이선스 키 구조:
|
|
GRD-{base64url(iv[12] + ciphertext + gcm_tag[16] + hmac_sig[8])}
|
|
|
|
보안 설계:
|
|
- AES-256-GCM 암호화 (iv 12B + tag 16B)
|
|
- HMAC-SHA256 서명 (8B prefix — 위변조 즉시 탐지)
|
|
- 완전 오프라인 검증 (외부 API 호출 없음)
|
|
- 환경변수 GUARDIA_LICENSE_KEY (64 hex chars = 32 bytes) 필수
|
|
|
|
라이선스 에디션:
|
|
TRIAL — 7일 무료 체험: 기관 1개, 사용자 10명, 서버 20대 (설치당 1회)
|
|
COMMUNITY — 무료: 기관 1개, 사용자 10명
|
|
STANDARD — 유료: 기관 50개, 사용자 200명
|
|
ENTERPRISE — 유료: 무제한
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import hashlib
|
|
import hmac as _hmac
|
|
import json
|
|
import os
|
|
import secrets
|
|
from datetime import datetime, timedelta, timezone
|
|
from enum import Enum
|
|
from typing import Optional
|
|
|
|
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
|
|
|
# ── 환경 설정 ──────────────────────────────────────────────────────────────────
|
|
|
|
LICENSE_MASTER_KEY: str = os.environ.get("GUARDIA_LICENSE_KEY", "")
|
|
_HMAC_SUFFIX = b"guardia-license-hmac-v1"
|
|
_IV_LEN = 12
|
|
_SIG_LEN = 8 # HMAC-SHA256 중 앞 8바이트
|
|
|
|
# 체험판 전용 내장 마스터 키 (프로덕션 키와 완전히 분리)
|
|
# TRIAL 에디션 키만 발급 가능 — 설치당 1회 제한으로 남용 방지
|
|
_TRIAL_MASTER_KEY = "477561524469415f545249414c5f4b65795f4e6f745f466f725f50726f647563"
|
|
|
|
TRIAL_DURATION_DAYS = 7
|
|
|
|
|
|
# ── 에디션 정의 ────────────────────────────────────────────────────────────────
|
|
|
|
class LicenseEdition(str, Enum):
|
|
TRIAL = "TRIAL"
|
|
COMMUNITY = "COMMUNITY"
|
|
STANDARD = "STANDARD"
|
|
ENTERPRISE = "ENTERPRISE"
|
|
|
|
|
|
EDITION_LIMITS: dict[str, dict] = {
|
|
LicenseEdition.TRIAL: {
|
|
"max_institutions": 1,
|
|
"max_users": 10,
|
|
"max_servers": 20,
|
|
"features": ["MFA"],
|
|
"trial": True,
|
|
},
|
|
LicenseEdition.COMMUNITY: {
|
|
"max_institutions": 1,
|
|
"max_users": 10,
|
|
"max_servers": 20,
|
|
"features": ["MFA"],
|
|
},
|
|
LicenseEdition.STANDARD: {
|
|
"max_institutions": 50,
|
|
"max_users": 200,
|
|
"max_servers": 500,
|
|
"features": ["MFA", "LDAP", "PAM", "AI_AGENTS"],
|
|
},
|
|
LicenseEdition.ENTERPRISE: {
|
|
"max_institutions": -1, # -1 = 무제한
|
|
"max_users": -1,
|
|
"max_servers": -1,
|
|
"features": ["MFA", "LDAP", "PAM", "AI_AGENTS",
|
|
"VULN_SCAN", "CICD", "ANALYTICS", "FINOPS"],
|
|
},
|
|
}
|
|
|
|
# 만료 경고 기준 (일)
|
|
EXPIRY_WARN_DAYS = 30
|
|
|
|
# ── 내부 유틸 ──────────────────────────────────────────────────────────────────
|
|
|
|
def _require_master_key(override: Optional[str] = None) -> str:
|
|
mk = override or LICENSE_MASTER_KEY
|
|
if not mk:
|
|
raise RuntimeError(
|
|
"GUARDIA_LICENSE_KEY 환경변수가 설정되지 않았습니다. "
|
|
".env 파일에 64자리 hex 값(32바이트)을 설정하세요."
|
|
)
|
|
if len(mk) != 64:
|
|
raise ValueError(f"GUARDIA_LICENSE_KEY는 64자리 hex여야 합니다 (현재: {len(mk)}자)")
|
|
return mk
|
|
|
|
|
|
def _derive_keys(master_hex: str) -> tuple[bytes, bytes]:
|
|
"""master_hex → (aes_key: 32B, hmac_key: 32B)."""
|
|
master = bytes.fromhex(master_hex)
|
|
aes_key = hashlib.sha256(master + b"guardia-aes-v1").digest()
|
|
hmac_key = hashlib.sha256(master + _HMAC_SUFFIX).digest()
|
|
return aes_key, hmac_key
|
|
|
|
|
|
# ── 라이선스 생성 ───────────────────────────────────────────────────────────────
|
|
|
|
def generate_license_key(
|
|
customer: str,
|
|
edition: LicenseEdition,
|
|
expires_at: datetime,
|
|
license_id: Optional[str] = None,
|
|
custom_limits: Optional[dict] = None,
|
|
master_key_hex: Optional[str] = None,
|
|
) -> str:
|
|
"""
|
|
라이선스 키 생성 (벤더 내부 도구 전용).
|
|
|
|
Args:
|
|
customer: 고객/기관 이름
|
|
edition: COMMUNITY | STANDARD | ENTERPRISE
|
|
expires_at: 만료일시 (UTC)
|
|
license_id: 지정 라이선스 ID (미지정 시 자동 생성)
|
|
custom_limits: 에디션 기본 제한 재정의 (선택)
|
|
master_key_hex: 마스터 키 (미지정 시 환경변수 사용)
|
|
|
|
Returns:
|
|
"GRD-{base64url}" 형태의 라이선스 키 문자열
|
|
"""
|
|
mk = _require_master_key(master_key_hex)
|
|
lid = license_id or f"GRD-{secrets.token_hex(6).upper()}"
|
|
limits = custom_limits or EDITION_LIMITS[edition]
|
|
|
|
if expires_at.tzinfo is None:
|
|
expires_at = expires_at.replace(tzinfo=timezone.utc)
|
|
|
|
payload = json.dumps({
|
|
"license_id": lid,
|
|
"edition": edition.value,
|
|
"customer": customer,
|
|
"issued_at": datetime.now(timezone.utc).isoformat(),
|
|
"expires_at": expires_at.isoformat(),
|
|
"limits": limits,
|
|
}, ensure_ascii=False, separators=(",", ":"))
|
|
|
|
aes_key, hmac_key = _derive_keys(mk)
|
|
iv = secrets.token_bytes(_IV_LEN)
|
|
ct = AESGCM(aes_key).encrypt(iv, payload.encode(), None) # ct includes 16B GCM tag
|
|
|
|
blob = iv + ct
|
|
sig = _hmac.new(hmac_key, blob, hashlib.sha256).digest()[:_SIG_LEN]
|
|
|
|
encoded = base64.urlsafe_b64encode(blob + sig).decode().rstrip("=")
|
|
return f"GRD-{encoded}"
|
|
|
|
|
|
# ── 체험판 라이선스 생성 ────────────────────────────────────────────────────────
|
|
|
|
def generate_trial_key(customer: str, license_id: Optional[str] = None) -> str:
|
|
"""
|
|
7일 무료 체험 라이선스 키 생성.
|
|
|
|
- 내장 Trial 마스터 키 사용 (GUARDIA_LICENSE_KEY 불필요)
|
|
- TRIAL 에디션 고정 (COMMUNITY 동일 제한)
|
|
- 만료: 생성 시점 + 7일
|
|
|
|
Returns:
|
|
"GRD-{base64url}" 형태의 체험 라이선스 키
|
|
"""
|
|
expires_at = datetime.now(timezone.utc) + timedelta(days=TRIAL_DURATION_DAYS)
|
|
lid = license_id or f"TRL-{secrets.token_hex(6).upper()}"
|
|
limits = EDITION_LIMITS[LicenseEdition.TRIAL]
|
|
|
|
payload = json.dumps({
|
|
"license_id": lid,
|
|
"edition": LicenseEdition.TRIAL.value,
|
|
"customer": customer,
|
|
"issued_at": datetime.now(timezone.utc).isoformat(),
|
|
"expires_at": expires_at.isoformat(),
|
|
"limits": limits,
|
|
}, ensure_ascii=False, separators=(",", ":"))
|
|
|
|
aes_key, hmac_key = _derive_keys(_TRIAL_MASTER_KEY)
|
|
iv = secrets.token_bytes(_IV_LEN)
|
|
ct = AESGCM(aes_key).encrypt(iv, payload.encode(), None)
|
|
|
|
blob = iv + ct
|
|
sig = _hmac.new(hmac_key, blob, hashlib.sha256).digest()[:_SIG_LEN]
|
|
encoded = base64.urlsafe_b64encode(blob + sig).decode().rstrip("=")
|
|
return f"GRD-{encoded}"
|
|
|
|
|
|
def decode_trial_key(key: str) -> dict:
|
|
"""체험판 키 전용 복호화 (내장 Trial 마스터 키 사용)."""
|
|
if not key.startswith("GRD-"):
|
|
raise ValueError("잘못된 라이선스 키 형식")
|
|
try:
|
|
raw = base64.urlsafe_b64decode(key[4:] + "==")
|
|
except Exception:
|
|
raise ValueError("라이선스 키 base64 디코딩 실패")
|
|
|
|
min_len = _IV_LEN + 16 + _SIG_LEN
|
|
if len(raw) < min_len:
|
|
raise ValueError("라이선스 키 길이 오류")
|
|
|
|
blob, sig = raw[:-_SIG_LEN], raw[-_SIG_LEN:]
|
|
aes_key, hmac_key = _derive_keys(_TRIAL_MASTER_KEY)
|
|
|
|
expected_sig = _hmac.new(hmac_key, blob, hashlib.sha256).digest()[:_SIG_LEN]
|
|
if not _hmac.compare_digest(sig, expected_sig):
|
|
raise ValueError("체험 라이선스 서명 검증 실패")
|
|
|
|
iv, ct = blob[:_IV_LEN], blob[_IV_LEN:]
|
|
try:
|
|
plaintext = AESGCM(aes_key).decrypt(iv, ct, None)
|
|
except Exception:
|
|
raise ValueError("체험 라이선스 복호화 실패")
|
|
|
|
return json.loads(plaintext)
|
|
|
|
|
|
# ── 라이선스 복호화 ─────────────────────────────────────────────────────────────
|
|
|
|
def decode_license_key(key: str, master_key_hex: Optional[str] = None) -> dict:
|
|
"""
|
|
라이선스 키 복호화 및 서명 검증.
|
|
|
|
Returns:
|
|
payload dict (license_id, edition, customer, issued_at, expires_at, limits)
|
|
|
|
Raises:
|
|
ValueError: 키 형식/서명/복호화 오류 시
|
|
"""
|
|
mk = _require_master_key(master_key_hex)
|
|
|
|
if not key.startswith("GRD-"):
|
|
raise ValueError("잘못된 라이선스 키 형식 (GRD- 접두사 필요)")
|
|
|
|
try:
|
|
raw = base64.urlsafe_b64decode(key[4:] + "==")
|
|
except Exception:
|
|
raise ValueError("라이선스 키 base64 디코딩 실패")
|
|
|
|
min_len = _IV_LEN + 16 + _SIG_LEN # iv + gcm_tag + sig
|
|
if len(raw) < min_len:
|
|
raise ValueError("라이선스 키 길이 오류")
|
|
|
|
blob, sig = raw[:-_SIG_LEN], raw[-_SIG_LEN:]
|
|
aes_key, hmac_key = _derive_keys(mk)
|
|
|
|
expected_sig = _hmac.new(hmac_key, blob, hashlib.sha256).digest()[:_SIG_LEN]
|
|
if not _hmac.compare_digest(sig, expected_sig):
|
|
raise ValueError("라이선스 서명 검증 실패 — 위변조 또는 잘못된 키")
|
|
|
|
iv, ct = blob[:_IV_LEN], blob[_IV_LEN:]
|
|
try:
|
|
plaintext = AESGCM(aes_key).decrypt(iv, ct, None)
|
|
except Exception:
|
|
raise ValueError("라이선스 복호화 실패 — 마스터 키 불일치")
|
|
|
|
try:
|
|
return json.loads(plaintext)
|
|
except json.JSONDecodeError:
|
|
raise ValueError("라이선스 페이로드 파싱 실패")
|
|
|
|
|
|
# ── 라이선스 검증 ───────────────────────────────────────────────────────────────
|
|
|
|
def validate_license(key: str, master_key_hex: Optional[str] = None) -> dict:
|
|
"""
|
|
라이선스 키 검증 후 상태 dict 반환.
|
|
|
|
Returns: {
|
|
"valid": bool, # True = 유효하고 미만료
|
|
"expired": bool,
|
|
"expiry_warning": bool, # 만료 30일 이내 경고
|
|
"license_id": str,
|
|
"edition": str,
|
|
"customer": str,
|
|
"issued_at": str,
|
|
"expires_at": str,
|
|
"limits": dict,
|
|
"days_remaining": int,
|
|
"error": str, # 오류 시에만
|
|
}
|
|
"""
|
|
# TRIAL 키는 내장 Trial 마스터 키로 먼저 시도, 실패 시 일반 마스터 키로 재시도
|
|
try:
|
|
info = decode_trial_key(key)
|
|
if info.get("edition") != LicenseEdition.TRIAL.value:
|
|
raise ValueError("not trial")
|
|
except Exception:
|
|
try:
|
|
info = decode_license_key(key, master_key_hex)
|
|
except Exception as e:
|
|
return {"valid": False, "expired": False, "error": str(e)}
|
|
|
|
now = datetime.now(timezone.utc)
|
|
expires = datetime.fromisoformat(info["expires_at"])
|
|
if expires.tzinfo is None:
|
|
expires = expires.replace(tzinfo=timezone.utc)
|
|
|
|
expired = now > expires
|
|
days_remaining = max(0, (expires - now).days)
|
|
expiry_warning = not expired and days_remaining <= EXPIRY_WARN_DAYS
|
|
|
|
is_trial = info.get("edition") == LicenseEdition.TRIAL.value
|
|
|
|
return {
|
|
"valid": not expired,
|
|
"expired": expired,
|
|
"expiry_warning": expiry_warning,
|
|
"is_trial": is_trial,
|
|
"license_id": info["license_id"],
|
|
"edition": info["edition"],
|
|
"customer": info["customer"],
|
|
"issued_at": info["issued_at"],
|
|
"expires_at": info["expires_at"],
|
|
"limits": info["limits"],
|
|
"days_remaining": days_remaining,
|
|
}
|
|
|
|
|
|
# ── 인메모리 캐시 ───────────────────────────────────────────────────────────────
|
|
|
|
_license_cache: Optional[dict] = None
|
|
_cache_refreshed_at: Optional[datetime] = None
|
|
_CACHE_TTL_SECONDS = 3600 # 1시간
|
|
|
|
|
|
def get_cached_license() -> Optional[dict]:
|
|
"""캐시된 라이선스 상태 반환 (TTL 초과 시 None)."""
|
|
global _license_cache, _cache_refreshed_at
|
|
if _license_cache is None or _cache_refreshed_at is None:
|
|
return None
|
|
age = (datetime.now(timezone.utc) - _cache_refreshed_at).total_seconds()
|
|
if age > _CACHE_TTL_SECONDS:
|
|
return None
|
|
return _license_cache
|
|
|
|
|
|
def set_cached_license(status: dict) -> None:
|
|
global _license_cache, _cache_refreshed_at
|
|
_license_cache = status
|
|
_cache_refreshed_at = datetime.now(timezone.utc)
|
|
|
|
|
|
def invalidate_license_cache() -> None:
|
|
global _license_cache, _cache_refreshed_at
|
|
_license_cache = None
|
|
_cache_refreshed_at = None
|
|
|
|
|
|
# ── 비밀번호 복잡도 검증 ────────────────────────────────────────────────────────
|
|
|
|
class PasswordStrength(str, Enum):
|
|
WEAK = "WEAK"
|
|
MEDIUM = "MEDIUM"
|
|
STRONG = "STRONG"
|
|
|
|
|
|
def check_password_complexity(password: str) -> tuple[bool, str]:
|
|
"""
|
|
비밀번호 복잡도 검증.
|
|
|
|
규칙:
|
|
- 최소 8자
|
|
- 대문자 1개 이상
|
|
- 소문자 1개 이상
|
|
- 숫자 1개 이상
|
|
- 특수문자(!@#$%^&*...) 1개 이상
|
|
|
|
Returns:
|
|
(ok: bool, message: str)
|
|
"""
|
|
errors = []
|
|
if len(password) < 8:
|
|
errors.append("최소 8자 이상")
|
|
if not any(c.isupper() for c in password):
|
|
errors.append("대문자 1자 이상")
|
|
if not any(c.islower() for c in password):
|
|
errors.append("소문자 1자 이상")
|
|
if not any(c.isdigit() for c in password):
|
|
errors.append("숫자 1자 이상")
|
|
specials = set("!@#$%^&*()-_=+[]{}|;:',.<>?/`~")
|
|
if not any(c in specials for c in password):
|
|
errors.append("특수문자(!@#$%^&* 등) 1자 이상")
|
|
|
|
if errors:
|
|
return False, "비밀번호 복잡도 미충족: " + ", ".join(errors)
|
|
return True, "OK"
|
|
|
|
|
|
# ── CLI 진입점 (python -m core.license) ────────────────────────────────────────
|
|
|
|
if __name__ == "__main__":
|
|
import argparse
|
|
import sys
|
|
|
|
parser = argparse.ArgumentParser(
|
|
description="GUARDiA 라이선스 키 생성 도구",
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
epilog="""
|
|
사용 예:
|
|
python -m core.license --customer "서울시청" --edition ENTERPRISE --days 365
|
|
python -m core.license --customer "테스트기관" --edition COMMUNITY --days 30 --key 64자리hex
|
|
"""
|
|
)
|
|
parser.add_argument("--customer", required=True, help="고객/기관 이름")
|
|
parser.add_argument("--edition", required=True,
|
|
choices=["COMMUNITY", "STANDARD", "ENTERPRISE"],
|
|
help="라이선스 에디션")
|
|
parser.add_argument("--days", type=int, required=True, help="유효 기간(일)")
|
|
parser.add_argument("--lid", default=None, help="라이선스 ID (기본: 자동생성)")
|
|
parser.add_argument("--key", default=None,
|
|
help="마스터 키 (기본: GUARDIA_LICENSE_KEY 환경변수)")
|
|
|
|
args = parser.parse_args()
|
|
|
|
expires = datetime.now(timezone.utc) + timedelta(days=args.days)
|
|
try:
|
|
lic_key = generate_license_key(
|
|
customer = args.customer,
|
|
edition = LicenseEdition(args.edition),
|
|
expires_at = expires,
|
|
license_id = args.lid,
|
|
master_key_hex = args.key,
|
|
)
|
|
except Exception as e:
|
|
print(f"[오류] {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
info = validate_license(lic_key, master_key_hex=args.key)
|
|
print("\n" + "=" * 60)
|
|
print(" GUARDiA 라이선스 키 생성 완료")
|
|
print("=" * 60)
|
|
print(f" 고객명 : {info['customer']}")
|
|
print(f" 에디션 : {info['edition']}")
|
|
print(f" 라이선스ID: {info['license_id']}")
|
|
print(f" 발급일시 : {info['issued_at']}")
|
|
print(f" 만료일시 : {info['expires_at']}")
|
|
print(f" 유효기간 : {args.days}일")
|
|
print("=" * 60)
|
|
print(f"\n라이선스 키:\n{lic_key}\n")
|