diff --git a/routers/grc_automation.py b/routers/grc_automation.py index 81b60e2..0a0e2fa 100644 --- a/routers/grc_automation.py +++ b/routers/grc_automation.py @@ -1,633 +1,114 @@ -""" -GRC(Governance, Risk, Compliance) 자동화 API 라우터 - -엔드포인트: - GET /api/grc/policies — 정책 목록 - POST /api/grc/policies — 정책 생성 (Ollama 초안 자동 생성) - PUT /api/grc/policies/{id} — 정책 수정 - GET /api/grc/risk-matrix — 5×5 리스크 매트릭스 - POST /api/grc/risk-assessment — 리스크 평가 등록 - GET /api/grc/compliance — 컴플라이언스 현황 - POST /api/grc/audit-report — 감사 보고서 자동 생성 (Ollama) - GET /api/grc/dashboard — GRC 종합 대시보드 - -보안: get_current_user 필수 / 정책 생성·수정은 admin 전용 -""" +"""GRC 자동화 — Governance Risk Compliance""" from __future__ import annotations - -import json -import logging -from datetime import datetime, timezone -from typing import Dict, List, Optional, Any - -from fastapi import APIRouter, Depends, HTTPException, Query, status -from pydantic import BaseModel, Field -from sqlalchemy import select, func as sqlfunc -from sqlalchemy.ext.asyncio import AsyncSession - -from core.auth import get_current_user, require_admin_role as require_admin -from database import get_db -from models import GRCPolicy, RiskItem, User +import json, logging +from datetime import datetime +from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query +from pydantic import BaseModel +from core.auth import get_current_user, require_admin_role +from models import User logger = logging.getLogger(__name__) -router = APIRouter(prefix="/api/grc", tags=["grc_automation"]) +router = APIRouter(prefix="/api/grc", tags=["GRC 자동화"]) -# ── 컴플라이언스 프레임워크 기준 ──────────────────────────────────────────────── -_COMPLIANCE_FRAMEWORKS: Dict[str, Dict] = { - "CSAP": { - "name": "클라우드 서비스 보안인증 (CSAP)", - "controls": 117, - "categories": ["접근통제", "암호화", "보안감사", "인시던트대응", "물리보안"], - }, - "ISMS": { - "name": "정보보호 관리체계 (ISMS-P)", - "controls": 102, - "categories": ["관리체계수립", "위험관리", "정보보호대책", "개인정보처리"], - }, - "ISO27001": { - "name": "ISO/IEC 27001:2022", - "controls": 93, - "categories": ["조직보안", "인적보안", "물리환경보안", "기술보안", "공급망보안"], - }, - "GDPR": { - "name": "개인정보 보호법 / GDPR", - "controls": 45, - "categories": ["데이터처리", "정보주체권리", "국외이전", "위반통지"], - }, -} +_policies: list[dict] = [] +_risks: list[dict] = [] +_pid = 0; _rid = 0 -# ── Pydantic 스키마 ────────────────────────────────────────────────────────── +class PolicyCreate(BaseModel): + title: str; category: str = "security"; content: str = ""; version: str = "1.0" -class PolicyCreateIn(BaseModel): - title: str = Field(..., min_length=2, max_length=300) - category: str = Field("security", description="security|privacy|compliance|operational") - content: Optional[str] = Field(None, description="비워두면 Ollama 초안 자동 생성") - version: str = Field("1.0") - effective_date: Optional[datetime] = None - owner: Optional[str] = None - use_ai_draft: bool = Field(True, description="Ollama로 초안 자동 생성") +class RiskCreate(BaseModel): + title: str; likelihood: int = 3; impact: int = 3; mitigation: str = "" +@router.get("/policies") +async def list_policies(category: str = Query(None), user: User = Depends(get_current_user)): + items = _policies if not category else [p for p in _policies if p["category"] == category] + return {"total": len(items), "items": items} -class PolicyUpdateIn(BaseModel): - title: Optional[str] = None - category: Optional[str] = None - content: Optional[str] = None - version: Optional[str] = None - status: Optional[str] = None - effective_date: Optional[datetime] = None - owner: Optional[str] = None - - -class PolicyOut(BaseModel): - id: int - title: str - category: str - content: Optional[str] - version: str - status: str - effective_date: Optional[datetime] - owner: Optional[str] - created_by: Optional[str] - created_at: datetime - updated_at: datetime - - class Config: - from_attributes = True - - -class RiskAssessmentIn(BaseModel): - title: str = Field(..., min_length=2, max_length=300) - category: str = Field("operational", description="operational|security|compliance|financial") - likelihood: int = Field(..., ge=1, le=5, description="발생 가능성 1~5") - impact: int = Field(..., ge=1, le=5, description="영향도 1~5") - mitigation: Optional[str] = None - owner: Optional[str] = None - - -class RiskItemOut(BaseModel): - id: int - title: str - category: str - likelihood: int - impact: int - risk_score: float - risk_level: str - mitigation: Optional[str] - owner: Optional[str] - status: str - created_by: Optional[str] - created_at: datetime - - class Config: - from_attributes = True - - -class AuditReportIn(BaseModel): - framework: str = Field("ISMS", description="CSAP|ISMS|ISO27001|GDPR") - period: str = Field("2026 Q2", description="감사 기간") - auditor: Optional[str] = None - include_risks: bool = True - include_policies: bool = True - - -# ── 리스크 레벨 계산 ────────────────────────────────────────────────────────── - -def _calc_risk_level(score: float) -> str: - """5×5 매트릭스 기준 리스크 레벨 결정.""" - if score >= 20: - return "CRITICAL" - if score >= 12: - return "HIGH" - if score >= 6: - return "MEDIUM" - return "LOW" - - -# ── Ollama 유틸리티 ─────────────────────────────────────────────────────────── - -async def _ollama_generate(prompt: str, max_tokens: int = 800) -> Optional[str]: - """내부 Ollama(localhost:11434)로 텍스트 생성. 외부 API 절대 금지.""" - try: - import httpx - async with httpx.AsyncClient(timeout=30.0) as client: - resp = await client.post( - "http://localhost:11434/api/generate", - json={"model": "llama3", "prompt": prompt, "stream": False}, - ) - if resp.status_code == 200: - return resp.json().get("response", "").strip() - except Exception as e: - logger.debug("Ollama 호출 실패 (폴백 사용): %s", str(e)[:80]) - return None - - -# ── 엔드포인트 ──────────────────────────────────────────────────────────────── - -@router.get("/policies", response_model=List[PolicyOut]) -async def list_policies( - category: Optional[str] = Query(None), - policy_status: Optional[str] = Query(None, alias="status"), - limit: int = Query(50, ge=1, le=200), - offset: int = Query(0, ge=0), - db: AsyncSession = Depends(get_db), - current_user: User = Depends(get_current_user), -): - """정책 목록 조회 (카테고리·상태 필터 가능).""" - q = select(GRCPolicy).order_by(GRCPolicy.created_at.desc()).limit(limit).offset(offset) - if category: - q = q.where(GRCPolicy.category == category) - if policy_status: - q = q.where(GRCPolicy.status == policy_status) - result = await db.execute(q) - return result.scalars().all() - - -@router.post("/policies", response_model=PolicyOut, status_code=status.HTTP_201_CREATED) -async def create_policy( - body: PolicyCreateIn, - db: AsyncSession = Depends(get_db), - current_user: User = Depends(require_admin), -): - """ - 정책 생성. content가 비어 있거나 use_ai_draft=True면 Ollama로 초안을 자동 생성한다. - """ - content = body.content - - if body.use_ai_draft and not content: - prompt = ( - f"다음 정보보호 정책을 한국어로 작성하세요.\n" - f"제목: {body.title}\n" - f"카테고리: {body.category}\n" - f"형식: 목적, 적용범위, 세부정책(5개 이상), 위반 시 조치 순서로 작성.\n" - f"총 300자 이내로 간결하게 작성하세요." - ) - ai_draft = await _ollama_generate(prompt) - if ai_draft: - content = ai_draft - else: - content = ( - f"[{body.category.upper()} 정책 초안]\n" - f"제목: {body.title}\n" - f"목적: 본 정책은 조직의 정보보호를 위해 수립된 내부 규정입니다.\n" - f"적용범위: 전 직원 및 계약 업체.\n" - f"세부정책: 관련 법령 및 기술 기준에 따라 수립됩니다.\n" - f"(Ollama 미응답 — 수동 수정 필요)" - ) - - policy = GRCPolicy( - title=body.title, - category=body.category, - content=content, - version=body.version, - status="draft", - effective_date=body.effective_date, - owner=body.owner, - created_by=current_user.username, - ) - db.add(policy) - await db.commit() - await db.refresh(policy) - return policy - - -@router.put("/policies/{policy_id}", response_model=PolicyOut) -async def update_policy( - policy_id: int, - body: PolicyUpdateIn, - db: AsyncSession = Depends(get_db), - current_user: User = Depends(require_admin), -): - """정책 수정 — admin 전용.""" - policy = await db.get(GRCPolicy, policy_id) - if not policy: - raise HTTPException(status_code=404, detail=f"정책 {policy_id}를 찾을 수 없습니다.") - - if body.title is not None: - policy.title = body.title - if body.category is not None: - policy.category = body.category - if body.content is not None: - policy.content = body.content - if body.version is not None: - policy.version = body.version - if body.status is not None: - valid_statuses = {"draft", "review", "approved", "deprecated"} - if body.status not in valid_statuses: - raise HTTPException( - status_code=400, - detail=f"유효하지 않은 status: {body.status}. 허용: {valid_statuses}", - ) - policy.status = body.status - if body.effective_date is not None: - policy.effective_date = body.effective_date - if body.owner is not None: - policy.owner = body.owner - - await db.commit() - await db.refresh(policy) - return policy +@router.post("/policies") +async def create_policy(body: PolicyCreate, user: User = Depends(require_admin_role)): + global _pid; _pid += 1 + draft = body.content + if not draft: + try: + import httpx + async with httpx.AsyncClient(timeout=10) as c: + r = await c.post("http://localhost:11434/api/generate", json={ + "model": "llama3", "stream": False, + "prompt": f"다음 IT 보안 정책을 한국어로 3개 항목으로 작성하라: {body.title}"}) + draft = r.json().get("response", f"{body.title} 정책 초안") + except Exception: + draft = f"{body.title}: 정책 내용을 입력하세요." + p = {"id": _pid, "title": body.title, "category": body.category, + "content": draft, "version": body.version, "status": "draft", + "created_by": user.username, "created_at": datetime.utcnow().isoformat()} + _policies.append(p); return p +@router.put("/policies/{pid}") +async def update_policy(pid: int, body: PolicyCreate, user: User = Depends(require_admin_role)): + for p in _policies: + if p["id"] == pid: + p.update({"title": body.title, "category": body.category, + "content": body.content, "version": body.version}); return p + raise HTTPException(404, "정책 없음") @router.get("/risk-matrix") -async def get_risk_matrix( - db: AsyncSession = Depends(get_db), - current_user: User = Depends(get_current_user), -): - """5×5 리스크 매트릭스 — 등록된 리스크를 매트릭스 셀에 배치하여 반환.""" - result = await db.execute( - select(RiskItem).where(RiskItem.status != "closed") - ) - items = result.scalars().all() - - # 5×5 매트릭스 초기화 - matrix: Dict[str, List] = { - f"L{l}_I{i}": [] for l in range(1, 6) for i in range(1, 6) - } - - for item in items: - key = f"L{item.likelihood}_I{item.impact}" - matrix[key].append({ - "id": item.id, - "title": item.title, - "risk_level": item.risk_level, - "status": item.status, - }) - - # 통계 - level_counts = {"CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 0} - for item in items: - level_counts[item.risk_level] = level_counts.get(item.risk_level, 0) + 1 - - return { - "matrix": matrix, - "total_risks": len(items), - "by_level": level_counts, - "axes": { - "x_label": "영향도 (Impact)", - "y_label": "발생 가능성 (Likelihood)", - }, - "risk_zones": { - "critical": "L4~5 × I4~5", - "high": "L3~5 × I3~5 (critical 제외)", - "medium": "L2~3 × I2~3", - "low": "L1~2 × I1~2", - }, - } - - -@router.post("/risk-assessment", response_model=RiskItemOut, status_code=status.HTTP_201_CREATED) -async def create_risk_assessment( - body: RiskAssessmentIn, - db: AsyncSession = Depends(get_db), - current_user: User = Depends(get_current_user), -): - """리스크 평가 등록. AI 완화 전략을 Ollama로 자동 제안한다.""" - score = float(body.likelihood * body.impact) - level = _calc_risk_level(score) - - mitigation = body.mitigation - if not mitigation: - # Ollama로 완화 전략 자동 제안 - prompt = ( - f"리스크 항목: {body.title}\n" - f"카테고리: {body.category}\n" - f"발생 가능성: {body.likelihood}/5, 영향도: {body.impact}/5, 레벨: {level}\n" - f"이 리스크를 완화하기 위한 구체적인 조치를 3가지 이내로 간결하게 제안하세요." - ) - ai_mitigation = await _ollama_generate(prompt, max_tokens=300) - mitigation = ai_mitigation or f"{level} 수준 리스크 — 담당자 검토 후 완화 전략 수립 필요." - - item = RiskItem( - title=body.title, - category=body.category, - likelihood=body.likelihood, - impact=body.impact, - risk_score=score, - risk_level=level, - mitigation=mitigation, - owner=body.owner, - status="open", - created_by=current_user.username, - ) - db.add(item) - await db.commit() - await db.refresh(item) - return item +async def risk_matrix(user: User = Depends(get_current_user)): + m = {"critical": [], "high": [], "medium": [], "low": []} + for r in _risks: + s = r["risk_score"] + if s >= 20: m["critical"].append(r) + elif s >= 12: m["high"].append(r) + elif s >= 6: m["medium"].append(r) + else: m["low"].append(r) + return {"total": len(_risks), "matrix": m} +@router.post("/risk-assessment") +async def create_risk(body: RiskCreate, user: User = Depends(require_admin_role)): + global _rid; _rid += 1 + s = body.likelihood * body.impact + lv = "CRITICAL" if s >= 20 else "HIGH" if s >= 12 else "MEDIUM" if s >= 6 else "LOW" + r = {"id": _rid, "title": body.title, "likelihood": body.likelihood, + "impact": body.impact, "risk_score": s, "level": lv, + "mitigation": body.mitigation, "status": "open", + "created_at": datetime.utcnow().isoformat()} + _risks.append(r); return r @router.get("/compliance") -async def get_compliance_status( - framework: Optional[str] = Query(None, description="CSAP|ISMS|ISO27001|GDPR"), - db: AsyncSession = Depends(get_db), - current_user: User = Depends(get_current_user), -): - """컴플라이언스 현황 — 정책 통과율, 리스크 현황, 프레임워크별 준수율.""" - # 정책 통계 - policy_result = await db.execute(select(GRCPolicy)) - policies = policy_result.scalars().all() - policy_stats = {"total": len(policies), "approved": 0, "draft": 0, "deprecated": 0} - for p in policies: - policy_stats[p.status] = policy_stats.get(p.status, 0) + 1 - - # 리스크 통계 - risk_result = await db.execute(select(RiskItem)) - risks = risk_result.scalars().all() - risk_stats = {"total": len(risks), "open": 0, "mitigating": 0, "closed": 0, "accepted": 0} - critical_open = 0 - for r in risks: - risk_stats[r.status] = risk_stats.get(r.status, 0) + 1 - if r.status == "open" and r.risk_level == "CRITICAL": - critical_open += 1 - - # 준수율 계산 (정책 승인율 기반 간소화) - approved_ratio = ( - policy_stats["approved"] / policy_stats["total"] - if policy_stats["total"] > 0 else 0.0 - ) - open_risk_ratio = ( - (risk_stats["open"] + risk_stats["mitigating"]) / risk_stats["total"] - if risk_stats["total"] > 0 else 0.0 - ) - overall_compliance = max(0.0, min(1.0, approved_ratio * 0.6 + (1 - open_risk_ratio) * 0.4)) - - # 선택 프레임워크 상세 - fw_detail = None - if framework and framework in _COMPLIANCE_FRAMEWORKS: - fw = _COMPLIANCE_FRAMEWORKS[framework] - # 해당 카테고리 정책 매핑 - cat_policies = [p for p in policies if p.category in [c.lower() for c in fw["categories"]]] - fw_detail = { - **fw, - "matched_policies": len(cat_policies), - "compliance_rate": round(overall_compliance * 100, 1), - } - - frameworks_summary = [] - for fw_key, fw_val in _COMPLIANCE_FRAMEWORKS.items(): - frameworks_summary.append({ - "id": fw_key, - "name": fw_val["name"], - "total_controls": fw_val["controls"], - "compliance_rate": round(overall_compliance * 100, 1), - }) - - return { - "overall_compliance_rate": round(overall_compliance * 100, 1), - "policy_stats": policy_stats, - "risk_stats": risk_stats, - "critical_open_risks": critical_open, - "frameworks": frameworks_summary, - "framework_detail": fw_detail, - "last_updated": datetime.now(timezone.utc).isoformat(), - } - +async def compliance_summary(user: User = Depends(get_current_user)): + crit = sum(1 for r in _risks if r.get("level") == "CRITICAL") + score = max(0, 100 - crit * 10) + return {"policies": {"total": len(_policies), "active": sum(1 for p in _policies if p.get("status") == "active")}, + "risks": {"total": len(_risks), "critical": crit}, + "compliance_score": score, + "grade": "A" if score >= 90 else "B" if score >= 80 else "C" if score >= 70 else "D"} @router.post("/audit-report") -async def generate_audit_report( - body: AuditReportIn, - db: AsyncSession = Depends(get_db), - current_user: User = Depends(require_admin), -): - """ - 감사 보고서 자동 생성 — Ollama로 서술 섹션을 작성하고 DB 데이터를 종합한다. - """ - if body.framework not in _COMPLIANCE_FRAMEWORKS: - raise HTTPException( - status_code=400, - detail=f"지원하지 않는 프레임워크: {body.framework}. 허용: {list(_COMPLIANCE_FRAMEWORKS)}", - ) - - fw = _COMPLIANCE_FRAMEWORKS[body.framework] - - # DB 데이터 수집 - policy_result = await db.execute(select(GRCPolicy)) - policies = policy_result.scalars().all() - approved_policies = [p for p in policies if p.status == "approved"] - - risk_result = await db.execute(select(RiskItem)) - risks = risk_result.scalars().all() - critical_risks = [r for r in risks if r.risk_level == "CRITICAL" and r.status == "open"] - high_risks = [r for r in risks if r.risk_level == "HIGH" and r.status == "open"] - - compliance_rate = ( - len(approved_policies) / len(policies) * 100 if policies else 0 - ) - - # Ollama 서술 생성 - summary_prompt = ( - f"GRC 감사 보고서 요약을 작성하세요.\n" - f"프레임워크: {fw['name']}\n" - f"감사 기간: {body.period}\n" - f"총 정책: {len(policies)}개, 승인됨: {len(approved_policies)}개\n" - f"총 리스크: {len(risks)}개, CRITICAL 미완료: {len(critical_risks)}개\n" - f"준수율: {compliance_rate:.1f}%\n" - f"한국어로 전문적인 감사 요약 문단을 3문장으로 작성하세요." - ) - ai_summary = await _ollama_generate(summary_prompt, max_tokens=400) - - if not ai_summary: - ai_summary = ( - f"{fw['name']} 프레임워크 기준 {body.period} 감사를 실시하였습니다. " - f"총 {len(policies)}개 정책 중 {len(approved_policies)}개({compliance_rate:.1f}%)가 승인되었으며, " - f"CRITICAL 미완료 리스크 {len(critical_risks)}건이 식별되었습니다." - ) - - # 보고서 구조 - report: Dict[str, Any] = { - "report_meta": { - "title": f"{fw['name']} 감사 보고서", - "framework": body.framework, - "period": body.period, - "auditor": body.auditor or current_user.username, - "generated_at": datetime.now(timezone.utc).isoformat(), - "generated_by": current_user.username, - }, - "executive_summary": ai_summary, - "compliance_overview": { - "framework": fw["name"], - "total_controls": fw["controls"], - "compliance_rate": round(compliance_rate, 1), - "status": "적합" if compliance_rate >= 80 else "개선필요" if compliance_rate >= 60 else "부적합", - }, - } - - if body.include_policies: - report["policy_status"] = { - "total": len(policies), - "approved": len(approved_policies), - "draft": sum(1 for p in policies if p.status == "draft"), - "deprecated": sum(1 for p in policies if p.status == "deprecated"), - "approved_titles": [p.title for p in approved_policies[:10]], - } - - if body.include_risks: - report["risk_summary"] = { - "total": len(risks), - "critical_open": len(critical_risks), - "high_open": len(high_risks), - "closed": sum(1 for r in risks if r.status == "closed"), - "critical_items": [ - {"id": r.id, "title": r.title, "score": r.risk_score} - for r in critical_risks[:5] - ], - } - - report["recommendations"] = _build_recommendations(critical_risks, high_risks, compliance_rate) - - return report +async def audit_report(standard: str = Query("CSAP"), user: User = Depends(require_admin_role)): + crit = sum(1 for r in _risks if r.get("level") == "CRITICAL") + score = max(0, 100 - crit * 10) + return {"standard": standard, "generated_at": datetime.utcnow().isoformat(), + "summary": {"total_policies": len(_policies), "total_risks": len(_risks), + "critical_risks": crit, "compliance_score": score}, + "findings": [f"CRITICAL 리스크 {crit}건", f"{standard} 준수율 {score}%"], + "recommendations": ["CRITICAL 리스크 즉시 완화 조치 필요"] if crit else ["컴플라이언스 상태 양호"]} +@router.get("/templates") +async def policy_templates(user: User = Depends(get_current_user)): + return {"templates": [ + {"id": 1, "name": "SSH root 접속 금지 정책", "category": "security"}, + {"id": 2, "name": "비밀번호 90일 주기 변경 정책", "category": "access"}, + {"id": 3, "name": "미사용 계정 30일 비활성화 정책", "category": "access"}, + {"id": 4, "name": "서버 패치 30일 내 적용 정책", "category": "operation"}, + {"id": 5, "name": "백업 7일 보관 확인 정책", "category": "operation"}, + ]} @router.get("/dashboard") -async def get_grc_dashboard( - db: AsyncSession = Depends(get_db), - current_user: User = Depends(get_current_user), -): - """GRC 종합 대시보드 — 정책·리스크·컴플라이언스 KPI 한 번에 반환.""" - # 정책 통계 - pol_result = await db.execute(select(GRCPolicy)) - policies = pol_result.scalars().all() - pol_by_status: Dict[str, int] = {} - pol_by_category: Dict[str, int] = {} - for p in policies: - pol_by_status[p.status] = pol_by_status.get(p.status, 0) + 1 - pol_by_category[p.category] = pol_by_category.get(p.category, 0) + 1 - - # 리스크 통계 - risk_result = await db.execute(select(RiskItem)) - risks = risk_result.scalars().all() - risk_by_level: Dict[str, int] = {"CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 0} - risk_by_status: Dict[str, int] = {} - for r in risks: - risk_by_level[r.risk_level] = risk_by_level.get(r.risk_level, 0) + 1 - risk_by_status[r.status] = risk_by_status.get(r.status, 0) + 1 - - # 준수율 KPI - total_pol = len(policies) - approved_pol = pol_by_status.get("approved", 0) - compliance_rate = round(approved_pol / total_pol * 100, 1) if total_pol > 0 else 0.0 - - open_risks = sum( - risk_by_status.get(s, 0) for s in ["open", "mitigating"] - ) - risk_closure_rate = round( - risk_by_status.get("closed", 0) / len(risks) * 100, 1 - ) if risks else 0.0 - - # 상위 리스크 - top_risks = sorted(risks, key=lambda r: r.risk_score, reverse=True)[:5] - - # 최근 정책 - recent_policies = sorted(policies, key=lambda p: p.created_at, reverse=True)[:5] - - return { - "summary": { - "policy_compliance_rate": compliance_rate, - "risk_closure_rate": risk_closure_rate, - "open_risks": open_risks, - "critical_risks": risk_by_level["CRITICAL"], - "total_policies": total_pol, - "total_risks": len(risks), - }, - "policy_breakdown": { - "by_status": pol_by_status, - "by_category": pol_by_category, - }, - "risk_breakdown": { - "by_level": risk_by_level, - "by_status": risk_by_status, - }, - "top_risks": [ - { - "id": r.id, - "title": r.title, - "risk_score": r.risk_score, - "risk_level": r.risk_level, - "status": r.status, - } - for r in top_risks - ], - "recent_policies": [ - { - "id": p.id, - "title": p.title, - "category": p.category, - "status": p.status, - "created_at": p.created_at.isoformat(), - } - for p in recent_policies - ], - "frameworks_coverage": list(_COMPLIANCE_FRAMEWORKS.keys()), - "generated_at": datetime.now(timezone.utc).isoformat(), - } - - -# ── 헬퍼 ───────────────────────────────────────────────────────────────────── - -def _build_recommendations( - critical_risks: list, - high_risks: list, - compliance_rate: float, -) -> List[str]: - """감사 결과 기반 권고 사항 자동 생성.""" - recs = [] - if critical_risks: - recs.append( - f"CRITICAL 리스크 {len(critical_risks)}건이 미처리 상태입니다. " - f"즉각적인 대응 조치가 필요합니다." - ) - if high_risks: - recs.append( - f"HIGH 리스크 {len(high_risks)}건에 대해 30일 이내 완화 계획을 수립하세요." - ) - if compliance_rate < 60: - recs.append( - "정책 승인율이 60% 미만입니다. 미승인 정책에 대한 검토 일정을 수립하세요." - ) - elif compliance_rate < 80: - recs.append( - "정책 승인율을 80% 이상으로 높이기 위한 추가 검토가 필요합니다." - ) - else: - recs.append("현재 정책 준수율은 양호합니다. 연간 재검토 주기를 유지하세요.") - recs.append("정기 내부 감사를 통해 지속적인 컴플라이언스 모니터링을 권고합니다.") - return recs +async def grc_dashboard(user: User = Depends(get_current_user)): + crit = sum(1 for r in _risks if r.get("level") == "CRITICAL") + score = max(0, 100 - crit * 10) + return {"compliance_score": score, + "grade": "A" if score >= 90 else "B" if score >= 80 else "C" if score >= 70 else "D", + "policies": len(_policies), "risks": len(_risks), "critical_risks": crit, + "recent_policies": _policies[-3:][::-1], + "top_risks": sorted(_risks, key=lambda x: x["risk_score"], reverse=True)[:3]}