"""Manager Gen6 — 고급 보안 관리: ZeroTrust UI·위협헌팅·SOC·취약점 관제""" import uuid from datetime import datetime from typing import Any, Dict, List, Optional from fastapi import APIRouter, HTTPException, Query from pydantic import BaseModel router = APIRouter(prefix="/api/adv-security", tags=["Advanced Security Manager"]) _threats: Dict[str, Dict] = {} _hunts: Dict[str, Dict] = {} _soc_incidents: Dict[str, Dict] = {} class ThreatHunt(BaseModel): name: str; hypothesis: str; ioc_list: List[str] = [] scope: str = "all" # all|network|endpoint|app created_by: str = "analyst" class SOCIncident(BaseModel): title: str; severity: str = "medium" # low|medium|high|critical source: str = "SIEM"; affected_assets: List[str] = [] class ZTPolicy(BaseModel): name: str; subject: str; resource: str conditions: Dict[str, Any] = {}; action: str = "allow" # ── ZeroTrust 정책 UI ─────────────────────────────────────────────────────── _zt_policies: Dict[str, Dict] = {} @router.post("/zt-policies") async def create_zt_policy(policy: ZTPolicy): pid = f"ZTP-{uuid.uuid4().hex[:8].upper()}" _zt_policies[pid] = {**policy.model_dump(), "id": pid, "status": "active", "created_at": datetime.utcnow().isoformat(), "hits": 0} return _zt_policies[pid] @router.get("/zt-policies") async def list_zt_policies(): default = [ {"id": "ZTP-DEFAULT01", "name": "DevOps 접근 정책", "subject": "devops", "resource": "ITSM API", "action": "allow", "status": "active", "hits": 1240}, {"id": "ZTP-DEFAULT02", "name": "외부망 차단", "subject": "*", "resource": "Internal DB", "action": "deny", "status": "active", "hits": 320}, ] custom = list(_zt_policies.values()) return {"policies": default + custom, "total": len(default) + len(custom)} @router.delete("/zt-policies/{pid}") async def revoke_zt_policy(pid: str): _zt_policies.pop(pid, None) return {"revoked": pid} @router.get("/zt-score") async def zt_score_dashboard(): return {"overall_score": 82.4, "grade": "B+", "dimensions": {"identity": 88.0, "device": 79.5, "network": 83.1, "workload": 80.2, "data": 85.0}, "trend": "+3.2% vs last month", "ts": datetime.utcnow().isoformat()} # ── 위협 헌팅 ────────────────────────────────────────────────────────────── @router.post("/threat-hunts") async def start_threat_hunt(hunt: ThreatHunt): hid = f"HUNT-{uuid.uuid4().hex[:8].upper()}" _hunts[hid] = {**hunt.model_dump(), "id": hid, "status": "running", "findings": 0, "started_at": datetime.utcnow().isoformat()} # 시뮬레이션: 즉시 완료 _hunts[hid]["status"] = "completed" _hunts[hid]["findings"] = len(hunt.ioc_list) // 2 _hunts[hid]["completed_at"] = datetime.utcnow().isoformat() return _hunts[hid] @router.get("/threat-hunts") async def list_hunts(): return {"hunts": list(_hunts.values()), "total": len(_hunts)} @router.get("/threat-hunts/{hid}/results") async def hunt_results(hid: str): h = _hunts.get(hid) if not h: raise HTTPException(404) return {**h, "matched_iocs": h["ioc_list"][:h.get("findings", 0)], "affected_hosts": ["app-01"] if h.get("findings") else [], "recommendation": "경보 규칙 업데이트 권장" if h.get("findings") else "이상 없음"} # ── SOC 통합 대시보드 ───────────────────────────────────────────────────── @router.post("/soc/incidents") async def create_soc_incident(inc: SOCIncident): iid = f"SOC-{uuid.uuid4().hex[:8].upper()}" _soc_incidents[iid] = {**inc.model_dump(), "id": iid, "status": "open", "created_at": datetime.utcnow().isoformat(), "assignee": None} return _soc_incidents[iid] @router.get("/soc/incidents") async def list_soc_incidents(status: Optional[str] = None, severity: Optional[str] = None): items = list(_soc_incidents.values()) if status: items = [i for i in items if i["status"] == status] if severity: items = [i for i in items if i["severity"] == severity] return {"incidents": items, "total": len(items)} @router.patch("/soc/incidents/{iid}") async def update_soc_incident(iid: str, status: str = Query(...)): inc = _soc_incidents.get(iid) if not inc: raise HTTPException(404) inc["status"] = status; inc["updated_at"] = datetime.utcnow().isoformat() return inc @router.get("/soc/dashboard") async def soc_dashboard(): return {"summary": {"open": 3, "in_progress": 1, "resolved_today": 5}, "severity_breakdown": {"critical": 0, "high": 1, "medium": 2, "low": 3}, "mean_time_to_detect_min": 12.4, "mean_time_to_respond_min": 38.7, "top_sources": ["SIEM", "IDS", "위협인텔"], "ts": datetime.utcnow().isoformat()} # ── 취약점 관제 ──────────────────────────────────────────────────────────── @router.get("/vulnerabilities") async def list_vulnerabilities(severity: Optional[str] = None): vulns = [ {"cve": "CVE-2023-44487", "severity": "high", "asset": "app-01", "status": "patched"}, {"cve": "CVE-2024-3094", "severity": "critical", "asset": "db-01", "status": "pending"}, ] if severity: vulns = [v for v in vulns if v["severity"] == severity] return {"vulnerabilities": vulns, "total": len(vulns), "unpatched": sum(1 for v in vulns if v["status"] == "pending")} @router.get("/health") async def health(): return {"status": "ok", "zt_policies": len(_zt_policies), "hunts": len(_hunts), "soc_incidents": len(_soc_incidents)}