124 lines
5.9 KiB
Python
124 lines
5.9 KiB
Python
"""Manager Gen6 — 고급 보안 관리: ZeroTrust UI·위협헌팅·SOC·취약점 관제"""
|
|
import uuid
|
|
from datetime import datetime
|
|
from typing import Any, Dict, List, Optional
|
|
from fastapi import APIRouter, HTTPException, Query
|
|
from pydantic import BaseModel
|
|
|
|
# Router that every endpoint in this module registers on; all paths are
# served under the /api/adv-security prefix.
router = APIRouter(
    prefix="/api/adv-security",
    tags=["Advanced Security Manager"],
)

# Process-local, in-memory stores (plain dicts of record dicts).
# NOTE(review): _threats is not referenced anywhere in the visible part of
# this module — confirm whether it is still needed.
_threats: Dict[str, Dict] = {}
# Threat-hunt records keyed by their generated "HUNT-…" id.
_hunts: Dict[str, Dict] = {}
# SOC incident records keyed by their generated "SOC-…" id.
_soc_incidents: Dict[str, Dict] = {}
|
|
|
|
class ThreatHunt(BaseModel):
    """Request body for starting a threat hunt."""

    name: str
    hypothesis: str
    # Indicators of compromise to look for; defaults to an empty list.
    ioc_list: List[str] = []
    # Intended values: all | network | endpoint | app (plain str, not enforced).
    scope: str = "all"
    created_by: str = "analyst"
|
|
|
|
class SOCIncident(BaseModel):
    """Request body for opening a SOC incident."""

    title: str
    # Intended values: low | medium | high | critical (plain str, not enforced).
    severity: str = "medium"
    source: str = "SIEM"
    affected_assets: List[str] = []
|
|
|
|
class ZTPolicy(BaseModel):
    """Request body describing a ZeroTrust access policy."""

    name: str
    subject: str
    resource: str
    # Free-form condition map; defaults to empty.
    conditions: Dict[str, Any] = {}
    action: str = "allow"
|
|
|
|
# ── ZeroTrust policy UI ────────────────────────────────────────────────────
|
|
# In-memory store of custom ZeroTrust policy records, keyed by the generated
# "ZTP-…" policy id.
_zt_policies: Dict[str, Dict] = {}
|
|
|
|
@router.post("/zt-policies")
async def create_zt_policy(policy: ZTPolicy):
    """Store a new ZeroTrust policy and return the stored record.

    The record is the submitted policy's fields plus a generated id of the
    form ``ZTP-`` followed by 8 uppercase hex characters, ``status`` set to
    ``"active"``, a ``created_at`` ISO-8601 timestamp taken from
    ``datetime.utcnow()``, and a ``hits`` counter starting at 0.
    """
    policy_id = f"ZTP-{uuid.uuid4().hex[:8].upper()}"
    record = policy.model_dump()
    record.update(
        id=policy_id,
        status="active",
        created_at=datetime.utcnow().isoformat(),
        hits=0,
    )
    _zt_policies[policy_id] = record
    return record
|
|
|
|
@router.get("/zt-policies")
async def list_zt_policies():
    """List the two built-in sample policies followed by all stored ones.

    Returns a dict with ``policies`` (built-ins first, then custom policies
    in insertion order) and ``total`` (the combined count).
    """
    policies = [
        {
            "id": "ZTP-DEFAULT01",
            "name": "DevOps 접근 정책",
            "subject": "devops",
            "resource": "ITSM API",
            "action": "allow",
            "status": "active",
            "hits": 1240,
        },
        {
            "id": "ZTP-DEFAULT02",
            "name": "외부망 차단",
            "subject": "*",
            "resource": "Internal DB",
            "action": "deny",
            "status": "active",
            "hits": 320,
        },
    ]
    # Custom policies are appended after the built-in entries.
    policies.extend(_zt_policies.values())
    return {"policies": policies, "total": len(policies)}
|
|
|
|
@router.delete("/zt-policies/{pid}")
async def revoke_zt_policy(pid: str):
    """Remove the stored policy ``pid`` if it exists and echo the id back.

    An unknown id is not treated as an error: the response is the same
    whether or not a policy was actually removed.
    """
    if pid in _zt_policies:
        del _zt_policies[pid]
    return {"revoked": pid}
|
|
|
|
@router.get("/zt-score")
async def zt_score_dashboard():
    """Return the ZeroTrust score summary.

    All scores, the grade and the trend are fixed values in this function;
    only ``ts`` (ISO-8601 string from ``datetime.utcnow()``) varies per call.
    """
    dimension_scores = {
        "identity": 88.0,
        "device": 79.5,
        "network": 83.1,
        "workload": 80.2,
        "data": 85.0,
    }
    return {
        "overall_score": 82.4,
        "grade": "B+",
        "dimensions": dimension_scores,
        "trend": "+3.2% vs last month",
        "ts": datetime.utcnow().isoformat(),
    }
|
|
|
|
# ── Threat hunting ─────────────────────────────────────────────────────────
|
|
@router.post("/threat-hunts")
async def start_threat_hunt(hunt: ThreatHunt):
    """Register a threat hunt and return its (already completed) record.

    The record holds the submitted fields plus a generated id of the form
    ``HUNT-`` followed by 8 uppercase hex characters, ``started_at`` and
    ``completed_at`` timestamps, and ``findings`` set to half the number of
    submitted IOCs (integer division).
    """
    hunt_id = f"HUNT-{uuid.uuid4().hex[:8].upper()}"
    record = hunt.model_dump()
    record.update(
        id=hunt_id,
        status="running",
        findings=0,
        started_at=datetime.utcnow().isoformat(),
    )
    _hunts[hunt_id] = record

    # Simulation: the hunt is marked as completed immediately.
    record["status"] = "completed"
    record["findings"] = len(hunt.ioc_list) // 2
    record["completed_at"] = datetime.utcnow().isoformat()
    return record
|
|
|
|
@router.get("/threat-hunts")
async def list_hunts():
    """Return every stored threat-hunt record together with the count."""
    hunts = list(_hunts.values())
    return {"hunts": hunts, "total": len(hunts)}
|
|
|
|
@router.get("/threat-hunts/{hid}/results")
async def hunt_results(hid: str):
    """Return the stored hunt ``hid`` enriched with result details.

    Adds ``matched_iocs`` (the first ``findings`` entries of the hunt's IOC
    list), ``affected_hosts`` and a ``recommendation`` message to a copy of
    the stored record.

    Raises:
        HTTPException: 404 when no hunt is stored under ``hid``.
    """
    record = _hunts.get(hid)
    if not record:
        raise HTTPException(404)

    finding_count = record.get("findings", 0)
    has_findings = bool(finding_count)

    # Work on a shallow copy so the stored record is left untouched.
    result = dict(record)
    result["matched_iocs"] = record["ioc_list"][:finding_count]
    result["affected_hosts"] = ["app-01"] if has_findings else []
    if has_findings:
        result["recommendation"] = "경보 규칙 업데이트 권장"
    else:
        result["recommendation"] = "이상 없음"
    return result
|
|
|
|
# ── SOC integrated dashboard ──────────────────────────────────────────────
|
|
@router.post("/soc/incidents")
async def create_soc_incident(inc: SOCIncident):
    """Open a new SOC incident and return the stored record.

    The record is the submitted incident's fields plus a generated id of the
    form ``SOC-`` followed by 8 uppercase hex characters, ``status`` set to
    ``"open"``, a ``created_at`` timestamp and ``assignee`` set to ``None``.
    """
    incident_id = f"SOC-{uuid.uuid4().hex[:8].upper()}"
    record = inc.model_dump()
    record.update(
        id=incident_id,
        status="open",
        created_at=datetime.utcnow().isoformat(),
        assignee=None,
    )
    _soc_incidents[incident_id] = record
    return record
|
|
|
|
@router.get("/soc/incidents")
async def list_soc_incidents(status: Optional[str] = None, severity: Optional[str] = None):
    """List stored SOC incidents, optionally filtered by exact match.

    A filter that is omitted or empty is not applied. Returns a dict with
    the matching ``incidents`` and their count in ``total``.
    """
    matches = []
    for incident in _soc_incidents.values():
        if status and incident["status"] != status:
            continue
        if severity and incident["severity"] != severity:
            continue
        matches.append(incident)
    return {"incidents": matches, "total": len(matches)}
|
|
|
|
@router.patch("/soc/incidents/{iid}")
async def update_soc_incident(iid: str, status: str = Query(...)):
    """Set the status of incident ``iid`` and stamp ``updated_at``.

    The required ``status`` query value is stored as given (no validation
    against a fixed set). Returns the updated stored record.

    Raises:
        HTTPException: 404 when no incident is stored under ``iid``.
    """
    incident = _soc_incidents.get(iid)
    if not incident:
        raise HTTPException(404)

    incident.update(
        status=status,
        updated_at=datetime.utcnow().isoformat(),
    )
    return incident
|
|
|
|
@router.get("/soc/dashboard")
async def soc_dashboard():
    """Return the SOC dashboard summary.

    The counts, severity breakdown, timing metrics and source list are fixed
    values in this function (they are not derived from the incident store);
    only ``ts`` varies per call.
    """
    summary = {"open": 3, "in_progress": 1, "resolved_today": 5}
    severity_breakdown = {"critical": 0, "high": 1, "medium": 2, "low": 3}
    return {
        "summary": summary,
        "severity_breakdown": severity_breakdown,
        "mean_time_to_detect_min": 12.4,
        "mean_time_to_respond_min": 38.7,
        "top_sources": ["SIEM", "IDS", "위협인텔"],
        "ts": datetime.utcnow().isoformat(),
    }
|
|
|
|
# ── Vulnerability monitoring ──────────────────────────────────────────────
|
|
@router.get("/vulnerabilities")
async def list_vulnerabilities(severity: Optional[str] = None):
    """List the known vulnerabilities, optionally filtered by severity.

    The catalogue is a fixed two-entry list defined in this function.
    Returns the selected entries, their count in ``total`` and, in
    ``unpatched``, how many of them have status ``"pending"``.
    """
    catalog = [
        {"cve": "CVE-2023-44487", "severity": "high", "asset": "app-01", "status": "patched"},
        {"cve": "CVE-2024-3094", "severity": "critical", "asset": "db-01", "status": "pending"},
    ]
    # An omitted or empty severity selects the whole catalogue.
    selected = [v for v in catalog if not severity or v["severity"] == severity]
    pending = [v for v in selected if v["status"] == "pending"]
    return {
        "vulnerabilities": selected,
        "total": len(selected),
        "unpatched": len(pending),
    }
|
|
|
|
@router.get("/health")
async def health():
    """Report liveness plus the current size of each in-memory store."""
    store_sizes = {
        "zt_policies": len(_zt_policies),
        "hunts": len(_hunts),
        "soc_incidents": len(_soc_incidents),
    }
    return {"status": "ok", **store_sizes}
|