# guardia-itsm/routers/advanced_security2.py
# 2026-06-07 08:13:43 +09:00
#
# 207 lines
# 9.8 KiB
# Python

"""
GUARDiA 고급 보안 v2 (Advanced Security Gen6)
ZTNA v2·SBOM v2·공급망 보안 v2·제로데이 추적·IAM 감사·포렌식
"""
import uuid, json
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel
# Router that every endpoint in this module is registered on.
router = APIRouter(
    prefix="/api/security2",
    tags=["Advanced Security v2"],
)

# Module-level in-memory stores (no persistence layer is visible in this file).
_policies: Dict[str, Dict] = {}    # ZTNA policies keyed by policy id
_sboms: Dict[str, Dict] = {}       # SBOM records keyed by SBOM id
_incidents: Dict[str, Dict] = {}   # not referenced by the endpoints visible here
_iam_audits: Dict[str, Dict] = {}  # IAM audit queries keyed by audit id
_threat_intel: List[Dict] = []     # threat-intel entries in insertion order
_forensics: Dict[str, Dict] = {}   # forensic captures keyed by capture id
class ZTNAPolicy(BaseModel):
    """Request body describing a ZTNA access policy."""

    name: str
    resource: str
    principal: str
    conditions: Dict[str, Any] = {}
    action: str = "allow"
    # Lifetime in hours; the create endpoint derives the expiry from it.
    ttl_hours: int = 24
class SBOMCreate(BaseModel):
    """Request body for registering an SBOM."""

    project: str
    version: str
    # Expected values per the original annotation: cyclonedx | spdx
    # (not validated by the model).
    format: str = "cyclonedx"
    components: List[Dict[str, str]] = []
class ThreatIntel(BaseModel):
    """Request body for a single threat-intelligence indicator."""

    ioc_type: str
    value: str
    severity: str = "medium"
    source: str = "internal"
    confidence: float = 0.8
class IAMAuditQuery(BaseModel):
    """Optional filters for an IAM audit request; every field may be omitted."""

    user: Optional[str] = None
    resource: Optional[str] = None
    action: Optional[str] = None
    from_date: Optional[str] = None
class ForensicCapture(BaseModel):
    """Request body for starting a forensic capture."""

    target: str
    reason: str
    # Expected values per the original annotation:
    # memory | disk | network | process (not validated by the model).
    capture_type: str = "memory"
    authorized_by: str
class ZeroTrustScore(BaseModel):
    """Request body naming the entity to score."""

    entity: str
    # Expected values per the original annotation: user | device | service
    # (not validated by the model).
    entity_type: str = "user"
# ── ZTNA v2 정책 엔진 ─────────────────────────────────────────────────────
@router.post("/ztna/policies")
async def create_ztna_policy(p: ZTNAPolicy):
    """Create and store a new ZTNA policy.

    The record receives a generated ``ZTNA-XXXXXXXX`` id, status
    ``"active"``, a creation timestamp and an expiry ``ttl_hours`` later.

    Args:
        p: Policy definition taken from the request body.

    Returns:
        The stored policy record.
    """
    pid = f"ZTNA-{uuid.uuid4().hex[:8].upper()}"
    # Read the clock once so expires_at - created_at is exactly the TTL.
    now = datetime.utcnow()
    _policies[pid] = {
        **p.model_dump(),
        "id": pid,
        "status": "active",
        "expires_at": (now + timedelta(hours=p.ttl_hours)).isoformat(),
        "created_at": now.isoformat(),
    }
    return _policies[pid]
@router.get("/ztna/policies")
async def list_ztna_policies(resource: Optional[str] = None):
    """List stored ZTNA policies, optionally restricted to one resource.

    Args:
        resource: When non-empty, only policies whose ``resource`` equals
            this value are returned.

    Returns:
        Dict with the selected ``policies`` and their count as ``total``.
    """
    if resource:
        selected = [pol for pol in _policies.values() if pol["resource"] == resource]
    else:
        selected = list(_policies.values())
    return {"policies": selected, "total": len(selected)}
@router.delete("/ztna/policies/{pid}")
async def revoke_policy(pid: str):
    """Remove a ZTNA policy from the store.

    Args:
        pid: Id of the policy to revoke.

    Returns:
        ``{"revoked": pid}`` when the policy existed.

    Raises:
        HTTPException: 404 when no policy with that id is stored.
    """
    removed = _policies.pop(pid, None)
    if removed:
        return {"revoked": pid}
    raise HTTPException(404)
@router.post("/ztna/evaluate")
async def evaluate_access(principal: str, resource: str, action: str = "read"):
    """Decide whether ``principal`` may access ``resource``.

    Only unexpired policies whose principal and resource both equal the
    request values are considered. Access is allowed when at least one of
    them has ``action == "allow"``; in every other case, including when
    nothing matches, the decision is ``"deny"``.

    Args:
        principal: Identity requesting access.
        resource: Resource being accessed.
        action: Requested operation; echoed in the response but not used
            for matching.

    Returns:
        Dict with the decision, the ids of the matched policies, a reason
        string and a timestamp.
    """
    now = datetime.utcnow()
    matched = [
        pol
        for pol in _policies.values()
        if pol["principal"] == principal
        and pol["resource"] == resource
        # Enforce the TTL: a policy past its expires_at must not grant access.
        and datetime.fromisoformat(pol["expires_at"]) > now
    ]
    allowed = any(pol["action"] == "allow" for pol in matched)
    return {
        "principal": principal,
        "resource": resource,
        "action": action,
        "decision": "allow" if allowed else "deny",
        "matched_policies": [pol["id"] for pol in matched],
        "reason": "Policy match" if matched else "No matching policy — default deny",
        "ts": now.isoformat(),
    }
@router.post("/ztna/score")
async def zero_trust_score(req: ZeroTrustScore):
    """Return a trust score for the requested entity.

    The score is drawn uniformly at random from [60, 95] and rounded to one
    decimal; no real signals are evaluated. ``level`` is ``"high"`` above 80
    and ``"medium"`` otherwise.

    Args:
        req: Entity name and type to score.

    Returns:
        Dict with the entity, score, level, derived factors and a timestamp.
    """
    import random

    trust = round(random.uniform(60, 95), 1)
    level = "high" if trust > 80 else "medium"
    factors = {
        "mfa": True,
        "device_health": trust > 75,
        "location_risk": "low",
        "behavior_anomaly": trust < 70,
    }
    return {
        "entity": req.entity,
        "entity_type": req.entity_type,
        "trust_score": trust,
        "level": level,
        "factors": factors,
        "ts": datetime.utcnow().isoformat(),
    }
# ── SBOM v2 ──────────────────────────────────────────────────────────────
@router.post("/sbom")
async def create_sbom(sbom: SBOMCreate):
    """Store a new SBOM record under a generated ``SBOM-XXXXXXXX`` id.

    ``vulnerability_count`` is simply the component count integer-divided
    by three, and ``license_issues`` is always 0.

    Args:
        sbom: SBOM definition taken from the request body.

    Returns:
        The stored SBOM record.
    """
    sid = f"SBOM-{uuid.uuid4().hex[:8].upper()}"
    record = sbom.model_dump()
    record["id"] = sid
    record["vulnerability_count"] = len(sbom.components) // 3
    record["license_issues"] = 0
    record["generated_at"] = datetime.utcnow().isoformat()
    _sboms[sid] = record
    return _sboms[sid]
@router.get("/sbom")
async def list_sboms():
    """Return every stored SBOM record together with the record count."""
    records = list(_sboms.values())
    return {"sboms": records, "total": len(_sboms)}
@router.get("/sbom/{sid}")
async def get_sbom(sid: str):
    """Fetch one SBOM record by id.

    Args:
        sid: SBOM id.

    Returns:
        The stored SBOM record.

    Raises:
        HTTPException: 404 when the id is unknown.
    """
    record = _sboms.get(sid)
    if record:
        return record
    raise HTTPException(404)
@router.get("/sbom/{sid}/vulnerabilities")
async def sbom_vulnerabilities(sid: str):
    """Return the vulnerability list for an SBOM.

    The list is a fixed pair of entries; it is not derived from the stored
    components.

    Args:
        sid: SBOM id.

    Returns:
        Dict with the SBOM id and its vulnerabilities.

    Raises:
        HTTPException: 404 when the id is unknown.
    """
    if not _sboms.get(sid):
        raise HTTPException(404)
    findings = [
        {"component": "log4j", "cve": "CVE-2021-44228", "severity": "critical", "fixed_in": "2.17.1"},
        {"component": "openssl", "cve": "CVE-2022-0778", "severity": "high", "fixed_in": "1.1.1n"},
    ]
    return {"sbom_id": sid, "vulnerabilities": findings}
@router.post("/sbom/{sid}/verify")
async def verify_sbom_integrity(sid: str):
    """Report the integrity status of an SBOM.

    The result is always "valid"; the returned hash is a fresh random UUID
    hex string, not a digest of the SBOM contents.

    Args:
        sid: SBOM id.

    Returns:
        Dict with the integrity verdict, hash, SLSA level and provenance flag.

    Raises:
        HTTPException: 404 when the id is unknown.
    """
    if not _sboms.get(sid):
        raise HTTPException(404)
    digest = f"sha256:{uuid.uuid4().hex}"
    return {
        "sbom_id": sid,
        "integrity": "valid",
        "hash": digest,
        "slsa_level": 2,
        "provenance_verified": True,
    }
# ── 공급망 보안 v2 ────────────────────────────────────────────────────────
@router.get("/supply-chain/scan")
async def supply_chain_scan(project: str = Query(...)):
    """Return a supply-chain scan summary for ``project``.

    All figures are fixed constants; only the project name and the scan
    time vary between calls.

    Args:
        project: Required query parameter naming the project.

    Returns:
        Dict with the scan summary.
    """
    summary = {
        "project": project,
        "dependencies_scanned": 127,
        "vulnerable": 3,
        "outdated": 12,
        "license_conflicts": 1,
        "risk_score": 23.4,
    }
    summary["scan_time"] = datetime.utcnow().isoformat()
    return summary
@router.get("/supply-chain/provenance")
async def check_provenance(artifact: str = Query(...)):
    """Return a provenance report for ``artifact``.

    Every field except the artifact name and the build time is a fixed
    constant.

    Args:
        artifact: Required query parameter naming the artifact.

    Returns:
        Dict with the provenance report.
    """
    report = {
        "artifact": artifact,
        "provenance": "verified",
        "slsa_level": 3,
        "builder": "Gitea CI",
        "source_repo": "git.zioinfo.co.kr",
    }
    report["build_time"] = datetime.utcnow().isoformat()
    return report
@router.post("/supply-chain/policy")
async def create_supply_chain_policy(name: str, rules: Dict[str, Any] = {}):
    """Build a supply-chain policy description and return it.

    The policy is given a generated ``SCP-XXXXXXXX`` id; it is not written
    to any of the module-level stores.

    Args:
        name: Policy name.
        rules: Rule mapping, echoed back unchanged.

    Returns:
        Dict describing the policy, always marked as enforced.
    """
    # NOTE(review): `rules` has a mutable default; it is only read here,
    # never mutated, so the shared default is not altered by this function.
    policy_id = f"SCP-{uuid.uuid4().hex[:8].upper()}"
    created = datetime.utcnow().isoformat()
    return {
        "id": policy_id,
        "name": name,
        "rules": rules,
        "enforced": True,
        "created_at": created,
    }
# ── 위협 인텔리전스 v2 ───────────────────────────────────────────────────
@router.post("/threat-intel")
async def add_threat_intel(ti: ThreatIntel):
    """Append a threat-intel indicator to the in-memory list.

    Args:
        ti: Indicator taken from the request body.

    Returns:
        The stored entry, including its generated UUID ``id`` and ``ts``.
    """
    record = ti.model_dump()
    record["id"] = str(uuid.uuid4())
    record["ts"] = datetime.utcnow().isoformat()
    _threat_intel.append(record)
    return record
@router.get("/threat-intel")
async def get_threat_intel(severity: Optional[str] = None, limit: int = 50):
    """Return the most recently added threat-intel entries.

    Args:
        severity: When non-empty, only entries with this severity are
            returned.
        limit: Maximum number of entries to return, taken from the end of
            the list. Zero or a negative value yields an empty list.

    Returns:
        Dict with the selected ``items`` and ``total``, which counts every
        stored entry regardless of the severity filter.
    """
    if severity:
        items = [t for t in _threat_intel if t["severity"] == severity]
    else:
        items = _threat_intel
    # items[-0:] is the whole list and a negative limit would slice from the
    # front, so non-positive limits are handled explicitly.
    recent = items[-limit:] if limit > 0 else []
    return {"items": recent, "total": len(_threat_intel)}
@router.post("/threat-intel/match")
async def match_ioc(value: str, ioc_type: str = "ip"):
    """Check whether an indicator is present in the threat-intel list.

    Args:
        value: Indicator value to look up.
        ioc_type: Indicator type that must also match.

    Returns:
        Dict with the lookup inputs, a ``matched`` flag, the first matching
        entry (or ``None``) and the action: "block" on a match, else "allow".
    """
    hit = next(
        (t for t in _threat_intel if t["value"] == value and t["ioc_type"] == ioc_type),
        None,
    )
    found = hit is not None
    return {
        "value": value,
        "ioc_type": ioc_type,
        "matched": found,
        "threat": hit,
        "action": "block" if found else "allow",
    }
# ── IAM 감사 ─────────────────────────────────────────────────────────────
@router.post("/iam/audit")
async def iam_audit(query: IAMAuditQuery):
    """Record an IAM audit query and return a single audit event.

    The query is stored under a generated ``AUD-XXXXXXXX`` id. The returned
    event echoes the query fields, falling back to "admin", "/api/cmdb" and
    "GET" for any that are unset, and always reports ``result`` "allow".

    Args:
        query: Audit filters taken from the request body.

    Returns:
        Dict with the audit id, the one-element event list and ``total`` 1.
    """
    aud_id = f"AUD-{uuid.uuid4().hex[:8].upper()}"
    stored = query.model_dump()
    stored["id"] = aud_id
    stored["ts"] = datetime.utcnow().isoformat()
    _iam_audits[aud_id] = stored

    event = {
        "user": query.user or "admin",
        "resource": query.resource or "/api/cmdb",
        "action": query.action or "GET",
        "result": "allow",
        "ts": datetime.utcnow().isoformat(),
    }
    return {"audit_id": aud_id, "events": [event], "total": 1}
@router.get("/iam/privileges")
async def list_excessive_privileges():
    """Return the excessive-privilege report (a fixed response)."""
    flagged_users = [
        {"user": "engineer01", "current": "admin", "recommended": "operator", "risk": "medium"},
    ]
    return {"users_with_excess": flagged_users, "total_reviewed": 15, "flagged": 1}
@router.post("/iam/remediate")
async def remediate_iam(user: str, new_role: str):
    """Report a role change for ``user``.

    No state is modified; ``old_role`` is always reported as "admin".

    Args:
        user: User whose role is being changed.
        new_role: Role to report as applied.

    Returns:
        Dict describing the change, with a timestamp.
    """
    applied_at = datetime.utcnow().isoformat()
    return {
        "user": user,
        "old_role": "admin",
        "new_role": new_role,
        "applied": True,
        "ts": applied_at,
    }
# ── 포렌식 ────────────────────────────────────────────────────────────────
@router.post("/forensics/capture")
async def forensic_capture(req: ForensicCapture):
    """Register a forensic capture under a generated ``FOR-XXXXXXXX`` id.

    Args:
        req: Capture request taken from the request body.

    Returns:
        The stored capture record, with status "capturing" and a start time.
    """
    fid = f"FOR-{uuid.uuid4().hex[:8].upper()}"
    record = req.model_dump()
    record["id"] = fid
    record["status"] = "capturing"
    record["started_at"] = datetime.utcnow().isoformat()
    _forensics[fid] = record
    return _forensics[fid]
@router.get("/forensics")
async def list_forensics():
    """Return every stored forensic capture together with the capture count."""
    captures = list(_forensics.values())
    return {"captures": captures, "total": len(_forensics)}
@router.get("/forensics/{fid}/timeline")
async def forensic_timeline(fid: str):
    """Return the event timeline for a forensic capture.

    The timeline is a fixed pair of events stamped with the current time.

    Args:
        fid: Forensic capture id.

    Returns:
        Dict with the capture id and its timeline.

    Raises:
        HTTPException: 404 when no capture with that id is stored.
    """
    # Reject unknown ids, matching the /sbom/{sid} endpoints, rather than
    # returning a timeline for a capture that was never registered.
    if fid not in _forensics:
        raise HTTPException(404)
    return {
        "forensic_id": fid,
        "timeline": [
            {"ts": datetime.utcnow().isoformat(), "event": "Suspicious login", "severity": "high"},
            {"ts": datetime.utcnow().isoformat(), "event": "Privilege escalation attempt", "severity": "critical"},
        ],
    }
# ── 제로데이 추적 ─────────────────────────────────────────────────────────
@router.get("/zero-day/tracker")
async def zero_day_tracker():
    """Return the zero-day tracking report (a fixed response)."""
    entries = [
        {
            "id": "ZD-001",
            "description": "Unknown RCE in web framework",
            "severity": "critical",
            "discovered": "2026-06-01",
            "status": "patched",
            "affected": ["svc-itsm"],
        },
    ]
    return {"active_zero_days": entries, "total": 1, "unpatched": 0}
# NOTE(review): the router prefix is "/api/security2", so this resolves to
# "/api/security2/security2/health" — confirm the repeated segment is intended.
@router.get("/security2/health")
async def health():
    """Report service health along with the size of each in-memory store."""
    counts = {
        "policies": len(_policies),
        "sboms": len(_sboms),
        "threat_intel": len(_threat_intel),
        "forensics": len(_forensics),
    }
    return {"status": "healthy", **counts}