guardia-manager/backend/routers/adv_security_mgr.py
2026-06-07 08:13:48 +09:00

124 lines
5.9 KiB
Python

"""Manager Gen6 — 고급 보안 관리: ZeroTrust UI·위협헌팅·SOC·취약점 관제"""
import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel
# Router that exposes every endpoint of this module under /api/adv-security.
router = APIRouter(
    prefix="/api/adv-security",
    tags=["Advanced Security Manager"],
)

# Process-local, in-memory stores (plain dicts; nothing is persisted).
# NOTE(review): _threats is not referenced in the visible part of this file —
# confirm whether another module uses it.
_threats: Dict[str, Dict] = {}
# Threat-hunt records keyed by their "HUNT-XXXXXXXX" id.
_hunts: Dict[str, Dict] = {}
# SOC incident records keyed by their "SOC-XXXXXXXX" id.
_soc_incidents: Dict[str, Dict] = {}
class ThreatHunt(BaseModel):
    """Request body for starting a threat hunt."""

    # Display name of the hunt.
    name: str
    # Hypothesis the analyst wants to test.
    hypothesis: str
    # Indicators of compromise to look for; defaults to an empty list.
    ioc_list: List[str] = []
    # Hunt scope; the original comment lists: all|network|endpoint|app
    # (not enforced by validation here).
    scope: str = "all"
    # Name of the requester.
    created_by: str = "analyst"
class SOCIncident(BaseModel):
    """Request body for opening a SOC incident."""

    # Short incident title.
    title: str
    # Severity label; the original comment lists: low|medium|high|critical
    # (not enforced by validation here).
    severity: str = "medium"
    # System that reported the incident.
    source: str = "SIEM"
    # Identifiers of the assets involved; defaults to an empty list.
    affected_assets: List[str] = []
class ZTPolicy(BaseModel):
    """Request body for creating a ZeroTrust access policy."""

    # Display name of the policy.
    name: str
    # Who the policy applies to (the built-in samples use "devops" and "*").
    subject: str
    # What the policy protects (the built-in samples use "ITSM API", "Internal DB").
    resource: str
    # Free-form condition map; defaults to an empty dict.
    conditions: Dict[str, Any] = {}
    # Decision taken when the policy matches; the built-in samples use
    # "allow" and "deny" (not enforced by validation here).
    action: str = "allow"
# ── ZeroTrust policy UI ─────────────────────────────────────────────────────
# In-memory store of user-created ZeroTrust policies, keyed by their
# "ZTP-XXXXXXXX" id. The built-in sample policies returned by the list
# endpoint are NOT kept here.
_zt_policies: Dict[str, Dict] = {}
@router.post("/zt-policies")
async def create_zt_policy(policy: ZTPolicy):
    """Register a new ZeroTrust policy in the in-memory store.

    A random ``ZTP-XXXXXXXX`` identifier is generated and the submitted
    fields are stored together with server-side bookkeeping fields.

    Args:
        policy: Validated request body describing the policy.

    Returns:
        The stored policy record (the same dict kept in ``_zt_policies``).
    """
    policy_id = "ZTP-" + uuid.uuid4().hex[:8].upper()
    record = policy.model_dump()
    # Bookkeeping fields are applied after the payload so they take precedence.
    record.update(
        id=policy_id,
        status="active",
        created_at=datetime.utcnow().isoformat(),  # naive UTC timestamp
        hits=0,
    )
    _zt_policies[policy_id] = record
    return record
@router.get("/zt-policies")
async def list_zt_policies():
    """List the built-in sample policies followed by user-created ones.

    Returns:
        A dict with ``policies`` (built-in entries first, then the contents
        of ``_zt_policies``) and ``total`` (the length of that list).
    """
    # Hard-coded sample policies; they are rebuilt on every call and are not
    # part of the _zt_policies store.
    builtin = [
        {
            "id": "ZTP-DEFAULT01",
            "name": "DevOps 접근 정책",
            "subject": "devops",
            "resource": "ITSM API",
            "action": "allow",
            "status": "active",
            "hits": 1240,
        },
        {
            "id": "ZTP-DEFAULT02",
            "name": "외부망 차단",
            "subject": "*",
            "resource": "Internal DB",
            "action": "deny",
            "status": "active",
            "hits": 320,
        },
    ]
    combined = [*builtin, *_zt_policies.values()]
    return {"policies": combined, "total": len(combined)}
@router.delete("/zt-policies/{pid}")
async def revoke_zt_policy(pid: str):
    """Revoke (delete) a user-created ZeroTrust policy.

    Args:
        pid: Identifier of the policy to remove, e.g. ``ZTP-1A2B3C4D``.

    Returns:
        ``{"revoked": pid}`` once the policy has been removed from the store.

    Raises:
        HTTPException: 404 when no stored policy has this id. This includes
            the built-in ``ZTP-DEFAULT..`` sample policies, which are
            generated by the list endpoint and never live in ``_zt_policies``.
    """
    # Previously an unknown id was silently reported as revoked; answer 404
    # instead, matching the other lookup-by-id endpoints in this module.
    if _zt_policies.pop(pid, None) is None:
        raise HTTPException(404)
    return {"revoked": pid}
@router.get("/zt-score")
async def zt_score_dashboard():
    """Return the ZeroTrust score dashboard payload.

    All scores are fixed literal values; only ``ts`` (the current naive UTC
    time in ISO format) changes between calls.
    """
    per_dimension = {
        "identity": 88.0,
        "device": 79.5,
        "network": 83.1,
        "workload": 80.2,
        "data": 85.0,
    }
    payload = {
        "overall_score": 82.4,
        "grade": "B+",
        "dimensions": per_dimension,
        "trend": "+3.2% vs last month",
        "ts": datetime.utcnow().isoformat(),
    }
    return payload
# ── Threat hunting ─────────────────────────────────────────────────────────
@router.post("/threat-hunts")
async def start_threat_hunt(hunt: ThreatHunt):
    """Create a threat hunt and immediately mark it as completed.

    The hunt is simulated: no scanning takes place, and the number of
    findings is simply half the number of submitted IOCs (rounded down).

    Args:
        hunt: Validated request body describing the hunt.

    Returns:
        The stored hunt record (the same dict kept in ``_hunts``).
    """
    hunt_id = "HUNT-" + uuid.uuid4().hex[:8].upper()
    record = hunt.model_dump()
    record.update(
        id=hunt_id,
        status="running",
        findings=0,
        started_at=datetime.utcnow().isoformat(),
    )
    _hunts[hunt_id] = record

    # Simulation: complete immediately.
    record.update(
        status="completed",
        findings=len(hunt.ioc_list) // 2,
        completed_at=datetime.utcnow().isoformat(),
    )
    return record
@router.get("/threat-hunts")
async def list_hunts():
    """Return every stored threat hunt together with the number of hunts."""
    stored = [*_hunts.values()]
    return {"hunts": stored, "total": len(stored)}
@router.get("/threat-hunts/{hid}/results")
async def hunt_results(hid: str):
    """Return a stored hunt enriched with simulated result details.

    Args:
        hid: Identifier of the hunt, e.g. ``HUNT-1A2B3C4D``.

    Returns:
        The hunt record plus ``matched_iocs`` (the first ``findings`` entries
        of ``ioc_list``), ``affected_hosts`` and ``recommendation``.

    Raises:
        HTTPException: 404 when the hunt id is not in the store.
    """
    hunt = _hunts.get(hid)
    if not hunt:
        raise HTTPException(404)

    finding_count = hunt.get("findings", 0)
    if finding_count:
        # Any finding yields the same fixed host and advice.
        hosts = ["app-01"]
        advice = "경보 규칙 업데이트 권장"
    else:
        hosts = []
        advice = "이상 없음"

    return {
        **hunt,
        "matched_iocs": hunt["ioc_list"][:finding_count],
        "affected_hosts": hosts,
        "recommendation": advice,
    }
# ── SOC integrated dashboard ──────────────────────────────────────────────
@router.post("/soc/incidents")
async def create_soc_incident(inc: SOCIncident):
    """Open a new SOC incident in the in-memory store.

    Args:
        inc: Validated request body describing the incident.

    Returns:
        The stored incident record (the same dict kept in ``_soc_incidents``),
        created with status ``open`` and no assignee.
    """
    incident_id = "SOC-" + uuid.uuid4().hex[:8].upper()
    record = inc.model_dump()
    record.update(
        id=incident_id,
        status="open",
        created_at=datetime.utcnow().isoformat(),  # naive UTC timestamp
        assignee=None,
    )
    _soc_incidents[incident_id] = record
    return record
@router.get("/soc/incidents")
async def list_soc_incidents(status: Optional[str] = None, severity: Optional[str] = None):
    """List SOC incidents, optionally filtered by status and/or severity.

    Args:
        status: When non-empty, keep only incidents with exactly this status.
        severity: When non-empty, keep only incidents with exactly this severity.

    Returns:
        A dict with ``incidents`` (the matching records) and ``total``.
    """

    def _keep(incident: Dict) -> bool:
        # A missing or empty filter value means "do not filter on this field".
        if status and incident["status"] != status:
            return False
        if severity and incident["severity"] != severity:
            return False
        return True

    selected = [incident for incident in _soc_incidents.values() if _keep(incident)]
    return {"incidents": selected, "total": len(selected)}
@router.patch("/soc/incidents/{iid}")
async def update_soc_incident(iid: str, status: str = Query(...)):
    """Set the status of an existing SOC incident.

    Args:
        iid: Identifier of the incident, e.g. ``SOC-1A2B3C4D``.
        status: New status value (required query parameter); it is stored
            as given, without validation.

    Returns:
        The updated incident record, including a fresh ``updated_at`` stamp.

    Raises:
        HTTPException: 404 when the incident id is not in the store.
    """
    incident = _soc_incidents.get(iid)
    if not incident:
        raise HTTPException(404)
    incident.update(
        status=status,
        updated_at=datetime.utcnow().isoformat(),  # naive UTC timestamp
    )
    return incident
@router.get("/soc/dashboard")
async def soc_dashboard():
    """Return the SOC dashboard payload.

    Every figure is a fixed literal value and is not derived from
    ``_soc_incidents``; only ``ts`` (current naive UTC time) varies.
    """
    status_counts = {"open": 3, "in_progress": 1, "resolved_today": 5}
    by_severity = {"critical": 0, "high": 1, "medium": 2, "low": 3}
    payload = {
        "summary": status_counts,
        "severity_breakdown": by_severity,
        "mean_time_to_detect_min": 12.4,
        "mean_time_to_respond_min": 38.7,
        "top_sources": ["SIEM", "IDS", "위협인텔"],
        "ts": datetime.utcnow().isoformat(),
    }
    return payload
# ── Vulnerability monitoring ──────────────────────────────────────────────
@router.get("/vulnerabilities")
async def list_vulnerabilities(severity: Optional[str] = None):
    """List the known vulnerabilities, optionally filtered by severity.

    The catalogue is a fixed, hard-coded list of two sample entries.

    Args:
        severity: When non-empty, keep only entries with exactly this severity.

    Returns:
        A dict with ``vulnerabilities`` (matching entries), ``total`` and
        ``unpatched`` (number of matching entries whose status is ``pending``).
    """
    catalog = [
        {"cve": "CVE-2023-44487", "severity": "high", "asset": "app-01", "status": "patched"},
        {"cve": "CVE-2024-3094", "severity": "critical", "asset": "db-01", "status": "pending"},
    ]
    selected = [
        entry for entry in catalog
        if not severity or entry["severity"] == severity
    ]
    pending = [entry for entry in selected if entry["status"] == "pending"]
    return {
        "vulnerabilities": selected,
        "total": len(selected),
        "unpatched": len(pending),
    }
@router.get("/health")
async def health():
    """Report liveness plus the current size of each in-memory store."""
    stores = {
        "zt_policies": _zt_policies,
        "hunts": _hunts,
        "soc_incidents": _soc_incidents,
    }
    report = {"status": "ok"}
    for label, store in stores.items():
        report[label] = len(store)
    return report