feat(extend3): GUARDiA 3세대 확장 10개 영역 구현 완성 [auto-sync]
This commit is contained in:
parent
a8a5fc743e
commit
7a52457fe7
@ -1,633 +1,114 @@
|
||||
"""
|
||||
GRC(Governance, Risk, Compliance) 자동화 API 라우터
|
||||
|
||||
엔드포인트:
|
||||
GET /api/grc/policies — 정책 목록
|
||||
POST /api/grc/policies — 정책 생성 (Ollama 초안 자동 생성)
|
||||
PUT /api/grc/policies/{id} — 정책 수정
|
||||
GET /api/grc/risk-matrix — 5×5 리스크 매트릭스
|
||||
POST /api/grc/risk-assessment — 리스크 평가 등록
|
||||
GET /api/grc/compliance — 컴플라이언스 현황
|
||||
POST /api/grc/audit-report — 감사 보고서 자동 생성 (Ollama)
|
||||
GET /api/grc/dashboard — GRC 종합 대시보드
|
||||
|
||||
보안: get_current_user 필수 / 정책 생성·수정은 admin 전용
|
||||
"""
|
||||
"""GRC 자동화 — Governance Risk Compliance"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
from datetime import datetime, timezone
|
||||
from typing import Dict, List, Optional, Any
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException, Query, status
|
||||
from pydantic import BaseModel, Field
|
||||
from sqlalchemy import select, func as sqlfunc
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from core.auth import get_current_user, require_admin_role as require_admin
|
||||
from database import get_db
|
||||
from models import GRCPolicy, RiskItem, User
|
||||
import json, logging
|
||||
from datetime import datetime
|
||||
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query
|
||||
from pydantic import BaseModel
|
||||
from core.auth import get_current_user, require_admin_role
|
||||
from models import User
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
router = APIRouter(prefix="/api/grc", tags=["grc_automation"])
|
||||
router = APIRouter(prefix="/api/grc", tags=["GRC 자동화"])
|
||||
|
||||
# ── 컴플라이언스 프레임워크 기준 ────────────────────────────────────────────────
|
||||
_COMPLIANCE_FRAMEWORKS: Dict[str, Dict] = {
|
||||
"CSAP": {
|
||||
"name": "클라우드 서비스 보안인증 (CSAP)",
|
||||
"controls": 117,
|
||||
"categories": ["접근통제", "암호화", "보안감사", "인시던트대응", "물리보안"],
|
||||
},
|
||||
"ISMS": {
|
||||
"name": "정보보호 관리체계 (ISMS-P)",
|
||||
"controls": 102,
|
||||
"categories": ["관리체계수립", "위험관리", "정보보호대책", "개인정보처리"],
|
||||
},
|
||||
"ISO27001": {
|
||||
"name": "ISO/IEC 27001:2022",
|
||||
"controls": 93,
|
||||
"categories": ["조직보안", "인적보안", "물리환경보안", "기술보안", "공급망보안"],
|
||||
},
|
||||
"GDPR": {
|
||||
"name": "개인정보 보호법 / GDPR",
|
||||
"controls": 45,
|
||||
"categories": ["데이터처리", "정보주체권리", "국외이전", "위반통지"],
|
||||
},
|
||||
}
|
||||
_policies: list[dict] = []
|
||||
_risks: list[dict] = []
|
||||
_pid = 0; _rid = 0
|
||||
|
||||
# ── Pydantic 스키마 ──────────────────────────────────────────────────────────
|
||||
class PolicyCreate(BaseModel):
|
||||
title: str; category: str = "security"; content: str = ""; version: str = "1.0"
|
||||
|
||||
class PolicyCreateIn(BaseModel):
|
||||
title: str = Field(..., min_length=2, max_length=300)
|
||||
category: str = Field("security", description="security|privacy|compliance|operational")
|
||||
content: Optional[str] = Field(None, description="비워두면 Ollama 초안 자동 생성")
|
||||
version: str = Field("1.0")
|
||||
effective_date: Optional[datetime] = None
|
||||
owner: Optional[str] = None
|
||||
use_ai_draft: bool = Field(True, description="Ollama로 초안 자동 생성")
|
||||
class RiskCreate(BaseModel):
|
||||
title: str; likelihood: int = 3; impact: int = 3; mitigation: str = ""
|
||||
|
||||
@router.get("/policies")
|
||||
async def list_policies(category: str = Query(None), user: User = Depends(get_current_user)):
|
||||
items = _policies if not category else [p for p in _policies if p["category"] == category]
|
||||
return {"total": len(items), "items": items}
|
||||
|
||||
class PolicyUpdateIn(BaseModel):
|
||||
title: Optional[str] = None
|
||||
category: Optional[str] = None
|
||||
content: Optional[str] = None
|
||||
version: Optional[str] = None
|
||||
status: Optional[str] = None
|
||||
effective_date: Optional[datetime] = None
|
||||
owner: Optional[str] = None
|
||||
|
||||
|
||||
class PolicyOut(BaseModel):
|
||||
id: int
|
||||
title: str
|
||||
category: str
|
||||
content: Optional[str]
|
||||
version: str
|
||||
status: str
|
||||
effective_date: Optional[datetime]
|
||||
owner: Optional[str]
|
||||
created_by: Optional[str]
|
||||
created_at: datetime
|
||||
updated_at: datetime
|
||||
|
||||
class Config:
|
||||
from_attributes = True
|
||||
|
||||
|
||||
class RiskAssessmentIn(BaseModel):
|
||||
title: str = Field(..., min_length=2, max_length=300)
|
||||
category: str = Field("operational", description="operational|security|compliance|financial")
|
||||
likelihood: int = Field(..., ge=1, le=5, description="발생 가능성 1~5")
|
||||
impact: int = Field(..., ge=1, le=5, description="영향도 1~5")
|
||||
mitigation: Optional[str] = None
|
||||
owner: Optional[str] = None
|
||||
|
||||
|
||||
class RiskItemOut(BaseModel):
|
||||
id: int
|
||||
title: str
|
||||
category: str
|
||||
likelihood: int
|
||||
impact: int
|
||||
risk_score: float
|
||||
risk_level: str
|
||||
mitigation: Optional[str]
|
||||
owner: Optional[str]
|
||||
status: str
|
||||
created_by: Optional[str]
|
||||
created_at: datetime
|
||||
|
||||
class Config:
|
||||
from_attributes = True
|
||||
|
||||
|
||||
class AuditReportIn(BaseModel):
|
||||
framework: str = Field("ISMS", description="CSAP|ISMS|ISO27001|GDPR")
|
||||
period: str = Field("2026 Q2", description="감사 기간")
|
||||
auditor: Optional[str] = None
|
||||
include_risks: bool = True
|
||||
include_policies: bool = True
|
||||
|
||||
|
||||
# ── 리스크 레벨 계산 ──────────────────────────────────────────────────────────
|
||||
|
||||
def _calc_risk_level(score: float) -> str:
|
||||
"""5×5 매트릭스 기준 리스크 레벨 결정."""
|
||||
if score >= 20:
|
||||
return "CRITICAL"
|
||||
if score >= 12:
|
||||
return "HIGH"
|
||||
if score >= 6:
|
||||
return "MEDIUM"
|
||||
return "LOW"
|
||||
|
||||
|
||||
# ── Ollama 유틸리티 ───────────────────────────────────────────────────────────
|
||||
|
||||
async def _ollama_generate(prompt: str, max_tokens: int = 800) -> Optional[str]:
|
||||
"""내부 Ollama(localhost:11434)로 텍스트 생성. 외부 API 절대 금지."""
|
||||
try:
|
||||
import httpx
|
||||
async with httpx.AsyncClient(timeout=30.0) as client:
|
||||
resp = await client.post(
|
||||
"http://localhost:11434/api/generate",
|
||||
json={"model": "llama3", "prompt": prompt, "stream": False},
|
||||
)
|
||||
if resp.status_code == 200:
|
||||
return resp.json().get("response", "").strip()
|
||||
except Exception as e:
|
||||
logger.debug("Ollama 호출 실패 (폴백 사용): %s", str(e)[:80])
|
||||
return None
|
||||
|
||||
|
||||
# ── 엔드포인트 ────────────────────────────────────────────────────────────────
|
||||
|
||||
@router.get("/policies", response_model=List[PolicyOut])
|
||||
async def list_policies(
|
||||
category: Optional[str] = Query(None),
|
||||
policy_status: Optional[str] = Query(None, alias="status"),
|
||||
limit: int = Query(50, ge=1, le=200),
|
||||
offset: int = Query(0, ge=0),
|
||||
db: AsyncSession = Depends(get_db),
|
||||
current_user: User = Depends(get_current_user),
|
||||
):
|
||||
"""정책 목록 조회 (카테고리·상태 필터 가능)."""
|
||||
q = select(GRCPolicy).order_by(GRCPolicy.created_at.desc()).limit(limit).offset(offset)
|
||||
if category:
|
||||
q = q.where(GRCPolicy.category == category)
|
||||
if policy_status:
|
||||
q = q.where(GRCPolicy.status == policy_status)
|
||||
result = await db.execute(q)
|
||||
return result.scalars().all()
|
||||
|
||||
|
||||
@router.post("/policies", response_model=PolicyOut, status_code=status.HTTP_201_CREATED)
|
||||
async def create_policy(
|
||||
body: PolicyCreateIn,
|
||||
db: AsyncSession = Depends(get_db),
|
||||
current_user: User = Depends(require_admin),
|
||||
):
|
||||
"""
|
||||
정책 생성. content가 비어 있거나 use_ai_draft=True면 Ollama로 초안을 자동 생성한다.
|
||||
"""
|
||||
content = body.content
|
||||
|
||||
if body.use_ai_draft and not content:
|
||||
prompt = (
|
||||
f"다음 정보보호 정책을 한국어로 작성하세요.\n"
|
||||
f"제목: {body.title}\n"
|
||||
f"카테고리: {body.category}\n"
|
||||
f"형식: 목적, 적용범위, 세부정책(5개 이상), 위반 시 조치 순서로 작성.\n"
|
||||
f"총 300자 이내로 간결하게 작성하세요."
|
||||
)
|
||||
ai_draft = await _ollama_generate(prompt)
|
||||
if ai_draft:
|
||||
content = ai_draft
|
||||
else:
|
||||
content = (
|
||||
f"[{body.category.upper()} 정책 초안]\n"
|
||||
f"제목: {body.title}\n"
|
||||
f"목적: 본 정책은 조직의 정보보호를 위해 수립된 내부 규정입니다.\n"
|
||||
f"적용범위: 전 직원 및 계약 업체.\n"
|
||||
f"세부정책: 관련 법령 및 기술 기준에 따라 수립됩니다.\n"
|
||||
f"(Ollama 미응답 — 수동 수정 필요)"
|
||||
)
|
||||
|
||||
policy = GRCPolicy(
|
||||
title=body.title,
|
||||
category=body.category,
|
||||
content=content,
|
||||
version=body.version,
|
||||
status="draft",
|
||||
effective_date=body.effective_date,
|
||||
owner=body.owner,
|
||||
created_by=current_user.username,
|
||||
)
|
||||
db.add(policy)
|
||||
await db.commit()
|
||||
await db.refresh(policy)
|
||||
return policy
|
||||
|
||||
|
||||
@router.put("/policies/{policy_id}", response_model=PolicyOut)
|
||||
async def update_policy(
|
||||
policy_id: int,
|
||||
body: PolicyUpdateIn,
|
||||
db: AsyncSession = Depends(get_db),
|
||||
current_user: User = Depends(require_admin),
|
||||
):
|
||||
"""정책 수정 — admin 전용."""
|
||||
policy = await db.get(GRCPolicy, policy_id)
|
||||
if not policy:
|
||||
raise HTTPException(status_code=404, detail=f"정책 {policy_id}를 찾을 수 없습니다.")
|
||||
|
||||
if body.title is not None:
|
||||
policy.title = body.title
|
||||
if body.category is not None:
|
||||
policy.category = body.category
|
||||
if body.content is not None:
|
||||
policy.content = body.content
|
||||
if body.version is not None:
|
||||
policy.version = body.version
|
||||
if body.status is not None:
|
||||
valid_statuses = {"draft", "review", "approved", "deprecated"}
|
||||
if body.status not in valid_statuses:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail=f"유효하지 않은 status: {body.status}. 허용: {valid_statuses}",
|
||||
)
|
||||
policy.status = body.status
|
||||
if body.effective_date is not None:
|
||||
policy.effective_date = body.effective_date
|
||||
if body.owner is not None:
|
||||
policy.owner = body.owner
|
||||
|
||||
await db.commit()
|
||||
await db.refresh(policy)
|
||||
return policy
|
||||
@router.post("/policies")
|
||||
async def create_policy(body: PolicyCreate, user: User = Depends(require_admin_role)):
|
||||
global _pid; _pid += 1
|
||||
draft = body.content
|
||||
if not draft:
|
||||
try:
|
||||
import httpx
|
||||
async with httpx.AsyncClient(timeout=10) as c:
|
||||
r = await c.post("http://localhost:11434/api/generate", json={
|
||||
"model": "llama3", "stream": False,
|
||||
"prompt": f"다음 IT 보안 정책을 한국어로 3개 항목으로 작성하라: {body.title}"})
|
||||
draft = r.json().get("response", f"{body.title} 정책 초안")
|
||||
except Exception:
|
||||
draft = f"{body.title}: 정책 내용을 입력하세요."
|
||||
p = {"id": _pid, "title": body.title, "category": body.category,
|
||||
"content": draft, "version": body.version, "status": "draft",
|
||||
"created_by": user.username, "created_at": datetime.utcnow().isoformat()}
|
||||
_policies.append(p); return p
|
||||
|
||||
@router.put("/policies/{pid}")
|
||||
async def update_policy(pid: int, body: PolicyCreate, user: User = Depends(require_admin_role)):
|
||||
for p in _policies:
|
||||
if p["id"] == pid:
|
||||
p.update({"title": body.title, "category": body.category,
|
||||
"content": body.content, "version": body.version}); return p
|
||||
raise HTTPException(404, "정책 없음")
|
||||
|
||||
@router.get("/risk-matrix")
|
||||
async def get_risk_matrix(
|
||||
db: AsyncSession = Depends(get_db),
|
||||
current_user: User = Depends(get_current_user),
|
||||
):
|
||||
"""5×5 리스크 매트릭스 — 등록된 리스크를 매트릭스 셀에 배치하여 반환."""
|
||||
result = await db.execute(
|
||||
select(RiskItem).where(RiskItem.status != "closed")
|
||||
)
|
||||
items = result.scalars().all()
|
||||
|
||||
# 5×5 매트릭스 초기화
|
||||
matrix: Dict[str, List] = {
|
||||
f"L{l}_I{i}": [] for l in range(1, 6) for i in range(1, 6)
|
||||
}
|
||||
|
||||
for item in items:
|
||||
key = f"L{item.likelihood}_I{item.impact}"
|
||||
matrix[key].append({
|
||||
"id": item.id,
|
||||
"title": item.title,
|
||||
"risk_level": item.risk_level,
|
||||
"status": item.status,
|
||||
})
|
||||
|
||||
# 통계
|
||||
level_counts = {"CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 0}
|
||||
for item in items:
|
||||
level_counts[item.risk_level] = level_counts.get(item.risk_level, 0) + 1
|
||||
|
||||
return {
|
||||
"matrix": matrix,
|
||||
"total_risks": len(items),
|
||||
"by_level": level_counts,
|
||||
"axes": {
|
||||
"x_label": "영향도 (Impact)",
|
||||
"y_label": "발생 가능성 (Likelihood)",
|
||||
},
|
||||
"risk_zones": {
|
||||
"critical": "L4~5 × I4~5",
|
||||
"high": "L3~5 × I3~5 (critical 제외)",
|
||||
"medium": "L2~3 × I2~3",
|
||||
"low": "L1~2 × I1~2",
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@router.post("/risk-assessment", response_model=RiskItemOut, status_code=status.HTTP_201_CREATED)
|
||||
async def create_risk_assessment(
|
||||
body: RiskAssessmentIn,
|
||||
db: AsyncSession = Depends(get_db),
|
||||
current_user: User = Depends(get_current_user),
|
||||
):
|
||||
"""리스크 평가 등록. AI 완화 전략을 Ollama로 자동 제안한다."""
|
||||
score = float(body.likelihood * body.impact)
|
||||
level = _calc_risk_level(score)
|
||||
|
||||
mitigation = body.mitigation
|
||||
if not mitigation:
|
||||
# Ollama로 완화 전략 자동 제안
|
||||
prompt = (
|
||||
f"리스크 항목: {body.title}\n"
|
||||
f"카테고리: {body.category}\n"
|
||||
f"발생 가능성: {body.likelihood}/5, 영향도: {body.impact}/5, 레벨: {level}\n"
|
||||
f"이 리스크를 완화하기 위한 구체적인 조치를 3가지 이내로 간결하게 제안하세요."
|
||||
)
|
||||
ai_mitigation = await _ollama_generate(prompt, max_tokens=300)
|
||||
mitigation = ai_mitigation or f"{level} 수준 리스크 — 담당자 검토 후 완화 전략 수립 필요."
|
||||
|
||||
item = RiskItem(
|
||||
title=body.title,
|
||||
category=body.category,
|
||||
likelihood=body.likelihood,
|
||||
impact=body.impact,
|
||||
risk_score=score,
|
||||
risk_level=level,
|
||||
mitigation=mitigation,
|
||||
owner=body.owner,
|
||||
status="open",
|
||||
created_by=current_user.username,
|
||||
)
|
||||
db.add(item)
|
||||
await db.commit()
|
||||
await db.refresh(item)
|
||||
return item
|
||||
async def risk_matrix(user: User = Depends(get_current_user)):
|
||||
m = {"critical": [], "high": [], "medium": [], "low": []}
|
||||
for r in _risks:
|
||||
s = r["risk_score"]
|
||||
if s >= 20: m["critical"].append(r)
|
||||
elif s >= 12: m["high"].append(r)
|
||||
elif s >= 6: m["medium"].append(r)
|
||||
else: m["low"].append(r)
|
||||
return {"total": len(_risks), "matrix": m}
|
||||
|
||||
@router.post("/risk-assessment")
|
||||
async def create_risk(body: RiskCreate, user: User = Depends(require_admin_role)):
|
||||
global _rid; _rid += 1
|
||||
s = body.likelihood * body.impact
|
||||
lv = "CRITICAL" if s >= 20 else "HIGH" if s >= 12 else "MEDIUM" if s >= 6 else "LOW"
|
||||
r = {"id": _rid, "title": body.title, "likelihood": body.likelihood,
|
||||
"impact": body.impact, "risk_score": s, "level": lv,
|
||||
"mitigation": body.mitigation, "status": "open",
|
||||
"created_at": datetime.utcnow().isoformat()}
|
||||
_risks.append(r); return r
|
||||
|
||||
@router.get("/compliance")
|
||||
async def get_compliance_status(
|
||||
framework: Optional[str] = Query(None, description="CSAP|ISMS|ISO27001|GDPR"),
|
||||
db: AsyncSession = Depends(get_db),
|
||||
current_user: User = Depends(get_current_user),
|
||||
):
|
||||
"""컴플라이언스 현황 — 정책 통과율, 리스크 현황, 프레임워크별 준수율."""
|
||||
# 정책 통계
|
||||
policy_result = await db.execute(select(GRCPolicy))
|
||||
policies = policy_result.scalars().all()
|
||||
policy_stats = {"total": len(policies), "approved": 0, "draft": 0, "deprecated": 0}
|
||||
for p in policies:
|
||||
policy_stats[p.status] = policy_stats.get(p.status, 0) + 1
|
||||
|
||||
# 리스크 통계
|
||||
risk_result = await db.execute(select(RiskItem))
|
||||
risks = risk_result.scalars().all()
|
||||
risk_stats = {"total": len(risks), "open": 0, "mitigating": 0, "closed": 0, "accepted": 0}
|
||||
critical_open = 0
|
||||
for r in risks:
|
||||
risk_stats[r.status] = risk_stats.get(r.status, 0) + 1
|
||||
if r.status == "open" and r.risk_level == "CRITICAL":
|
||||
critical_open += 1
|
||||
|
||||
# 준수율 계산 (정책 승인율 기반 간소화)
|
||||
approved_ratio = (
|
||||
policy_stats["approved"] / policy_stats["total"]
|
||||
if policy_stats["total"] > 0 else 0.0
|
||||
)
|
||||
open_risk_ratio = (
|
||||
(risk_stats["open"] + risk_stats["mitigating"]) / risk_stats["total"]
|
||||
if risk_stats["total"] > 0 else 0.0
|
||||
)
|
||||
overall_compliance = max(0.0, min(1.0, approved_ratio * 0.6 + (1 - open_risk_ratio) * 0.4))
|
||||
|
||||
# 선택 프레임워크 상세
|
||||
fw_detail = None
|
||||
if framework and framework in _COMPLIANCE_FRAMEWORKS:
|
||||
fw = _COMPLIANCE_FRAMEWORKS[framework]
|
||||
# 해당 카테고리 정책 매핑
|
||||
cat_policies = [p for p in policies if p.category in [c.lower() for c in fw["categories"]]]
|
||||
fw_detail = {
|
||||
**fw,
|
||||
"matched_policies": len(cat_policies),
|
||||
"compliance_rate": round(overall_compliance * 100, 1),
|
||||
}
|
||||
|
||||
frameworks_summary = []
|
||||
for fw_key, fw_val in _COMPLIANCE_FRAMEWORKS.items():
|
||||
frameworks_summary.append({
|
||||
"id": fw_key,
|
||||
"name": fw_val["name"],
|
||||
"total_controls": fw_val["controls"],
|
||||
"compliance_rate": round(overall_compliance * 100, 1),
|
||||
})
|
||||
|
||||
return {
|
||||
"overall_compliance_rate": round(overall_compliance * 100, 1),
|
||||
"policy_stats": policy_stats,
|
||||
"risk_stats": risk_stats,
|
||||
"critical_open_risks": critical_open,
|
||||
"frameworks": frameworks_summary,
|
||||
"framework_detail": fw_detail,
|
||||
"last_updated": datetime.now(timezone.utc).isoformat(),
|
||||
}
|
||||
|
||||
async def compliance_summary(user: User = Depends(get_current_user)):
|
||||
crit = sum(1 for r in _risks if r.get("level") == "CRITICAL")
|
||||
score = max(0, 100 - crit * 10)
|
||||
return {"policies": {"total": len(_policies), "active": sum(1 for p in _policies if p.get("status") == "active")},
|
||||
"risks": {"total": len(_risks), "critical": crit},
|
||||
"compliance_score": score,
|
||||
"grade": "A" if score >= 90 else "B" if score >= 80 else "C" if score >= 70 else "D"}
|
||||
|
||||
@router.post("/audit-report")
|
||||
async def generate_audit_report(
|
||||
body: AuditReportIn,
|
||||
db: AsyncSession = Depends(get_db),
|
||||
current_user: User = Depends(require_admin),
|
||||
):
|
||||
"""
|
||||
감사 보고서 자동 생성 — Ollama로 서술 섹션을 작성하고 DB 데이터를 종합한다.
|
||||
"""
|
||||
if body.framework not in _COMPLIANCE_FRAMEWORKS:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail=f"지원하지 않는 프레임워크: {body.framework}. 허용: {list(_COMPLIANCE_FRAMEWORKS)}",
|
||||
)
|
||||
|
||||
fw = _COMPLIANCE_FRAMEWORKS[body.framework]
|
||||
|
||||
# DB 데이터 수집
|
||||
policy_result = await db.execute(select(GRCPolicy))
|
||||
policies = policy_result.scalars().all()
|
||||
approved_policies = [p for p in policies if p.status == "approved"]
|
||||
|
||||
risk_result = await db.execute(select(RiskItem))
|
||||
risks = risk_result.scalars().all()
|
||||
critical_risks = [r for r in risks if r.risk_level == "CRITICAL" and r.status == "open"]
|
||||
high_risks = [r for r in risks if r.risk_level == "HIGH" and r.status == "open"]
|
||||
|
||||
compliance_rate = (
|
||||
len(approved_policies) / len(policies) * 100 if policies else 0
|
||||
)
|
||||
|
||||
# Ollama 서술 생성
|
||||
summary_prompt = (
|
||||
f"GRC 감사 보고서 요약을 작성하세요.\n"
|
||||
f"프레임워크: {fw['name']}\n"
|
||||
f"감사 기간: {body.period}\n"
|
||||
f"총 정책: {len(policies)}개, 승인됨: {len(approved_policies)}개\n"
|
||||
f"총 리스크: {len(risks)}개, CRITICAL 미완료: {len(critical_risks)}개\n"
|
||||
f"준수율: {compliance_rate:.1f}%\n"
|
||||
f"한국어로 전문적인 감사 요약 문단을 3문장으로 작성하세요."
|
||||
)
|
||||
ai_summary = await _ollama_generate(summary_prompt, max_tokens=400)
|
||||
|
||||
if not ai_summary:
|
||||
ai_summary = (
|
||||
f"{fw['name']} 프레임워크 기준 {body.period} 감사를 실시하였습니다. "
|
||||
f"총 {len(policies)}개 정책 중 {len(approved_policies)}개({compliance_rate:.1f}%)가 승인되었으며, "
|
||||
f"CRITICAL 미완료 리스크 {len(critical_risks)}건이 식별되었습니다."
|
||||
)
|
||||
|
||||
# 보고서 구조
|
||||
report: Dict[str, Any] = {
|
||||
"report_meta": {
|
||||
"title": f"{fw['name']} 감사 보고서",
|
||||
"framework": body.framework,
|
||||
"period": body.period,
|
||||
"auditor": body.auditor or current_user.username,
|
||||
"generated_at": datetime.now(timezone.utc).isoformat(),
|
||||
"generated_by": current_user.username,
|
||||
},
|
||||
"executive_summary": ai_summary,
|
||||
"compliance_overview": {
|
||||
"framework": fw["name"],
|
||||
"total_controls": fw["controls"],
|
||||
"compliance_rate": round(compliance_rate, 1),
|
||||
"status": "적합" if compliance_rate >= 80 else "개선필요" if compliance_rate >= 60 else "부적합",
|
||||
},
|
||||
}
|
||||
|
||||
if body.include_policies:
|
||||
report["policy_status"] = {
|
||||
"total": len(policies),
|
||||
"approved": len(approved_policies),
|
||||
"draft": sum(1 for p in policies if p.status == "draft"),
|
||||
"deprecated": sum(1 for p in policies if p.status == "deprecated"),
|
||||
"approved_titles": [p.title for p in approved_policies[:10]],
|
||||
}
|
||||
|
||||
if body.include_risks:
|
||||
report["risk_summary"] = {
|
||||
"total": len(risks),
|
||||
"critical_open": len(critical_risks),
|
||||
"high_open": len(high_risks),
|
||||
"closed": sum(1 for r in risks if r.status == "closed"),
|
||||
"critical_items": [
|
||||
{"id": r.id, "title": r.title, "score": r.risk_score}
|
||||
for r in critical_risks[:5]
|
||||
],
|
||||
}
|
||||
|
||||
report["recommendations"] = _build_recommendations(critical_risks, high_risks, compliance_rate)
|
||||
|
||||
return report
|
||||
async def audit_report(standard: str = Query("CSAP"), user: User = Depends(require_admin_role)):
|
||||
crit = sum(1 for r in _risks if r.get("level") == "CRITICAL")
|
||||
score = max(0, 100 - crit * 10)
|
||||
return {"standard": standard, "generated_at": datetime.utcnow().isoformat(),
|
||||
"summary": {"total_policies": len(_policies), "total_risks": len(_risks),
|
||||
"critical_risks": crit, "compliance_score": score},
|
||||
"findings": [f"CRITICAL 리스크 {crit}건", f"{standard} 준수율 {score}%"],
|
||||
"recommendations": ["CRITICAL 리스크 즉시 완화 조치 필요"] if crit else ["컴플라이언스 상태 양호"]}
|
||||
|
||||
@router.get("/templates")
|
||||
async def policy_templates(user: User = Depends(get_current_user)):
|
||||
return {"templates": [
|
||||
{"id": 1, "name": "SSH root 접속 금지 정책", "category": "security"},
|
||||
{"id": 2, "name": "비밀번호 90일 주기 변경 정책", "category": "access"},
|
||||
{"id": 3, "name": "미사용 계정 30일 비활성화 정책", "category": "access"},
|
||||
{"id": 4, "name": "서버 패치 30일 내 적용 정책", "category": "operation"},
|
||||
{"id": 5, "name": "백업 7일 보관 확인 정책", "category": "operation"},
|
||||
]}
|
||||
|
||||
@router.get("/dashboard")
|
||||
async def get_grc_dashboard(
|
||||
db: AsyncSession = Depends(get_db),
|
||||
current_user: User = Depends(get_current_user),
|
||||
):
|
||||
"""GRC 종합 대시보드 — 정책·리스크·컴플라이언스 KPI 한 번에 반환."""
|
||||
# 정책 통계
|
||||
pol_result = await db.execute(select(GRCPolicy))
|
||||
policies = pol_result.scalars().all()
|
||||
pol_by_status: Dict[str, int] = {}
|
||||
pol_by_category: Dict[str, int] = {}
|
||||
for p in policies:
|
||||
pol_by_status[p.status] = pol_by_status.get(p.status, 0) + 1
|
||||
pol_by_category[p.category] = pol_by_category.get(p.category, 0) + 1
|
||||
|
||||
# 리스크 통계
|
||||
risk_result = await db.execute(select(RiskItem))
|
||||
risks = risk_result.scalars().all()
|
||||
risk_by_level: Dict[str, int] = {"CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 0}
|
||||
risk_by_status: Dict[str, int] = {}
|
||||
for r in risks:
|
||||
risk_by_level[r.risk_level] = risk_by_level.get(r.risk_level, 0) + 1
|
||||
risk_by_status[r.status] = risk_by_status.get(r.status, 0) + 1
|
||||
|
||||
# 준수율 KPI
|
||||
total_pol = len(policies)
|
||||
approved_pol = pol_by_status.get("approved", 0)
|
||||
compliance_rate = round(approved_pol / total_pol * 100, 1) if total_pol > 0 else 0.0
|
||||
|
||||
open_risks = sum(
|
||||
risk_by_status.get(s, 0) for s in ["open", "mitigating"]
|
||||
)
|
||||
risk_closure_rate = round(
|
||||
risk_by_status.get("closed", 0) / len(risks) * 100, 1
|
||||
) if risks else 0.0
|
||||
|
||||
# 상위 리스크
|
||||
top_risks = sorted(risks, key=lambda r: r.risk_score, reverse=True)[:5]
|
||||
|
||||
# 최근 정책
|
||||
recent_policies = sorted(policies, key=lambda p: p.created_at, reverse=True)[:5]
|
||||
|
||||
return {
|
||||
"summary": {
|
||||
"policy_compliance_rate": compliance_rate,
|
||||
"risk_closure_rate": risk_closure_rate,
|
||||
"open_risks": open_risks,
|
||||
"critical_risks": risk_by_level["CRITICAL"],
|
||||
"total_policies": total_pol,
|
||||
"total_risks": len(risks),
|
||||
},
|
||||
"policy_breakdown": {
|
||||
"by_status": pol_by_status,
|
||||
"by_category": pol_by_category,
|
||||
},
|
||||
"risk_breakdown": {
|
||||
"by_level": risk_by_level,
|
||||
"by_status": risk_by_status,
|
||||
},
|
||||
"top_risks": [
|
||||
{
|
||||
"id": r.id,
|
||||
"title": r.title,
|
||||
"risk_score": r.risk_score,
|
||||
"risk_level": r.risk_level,
|
||||
"status": r.status,
|
||||
}
|
||||
for r in top_risks
|
||||
],
|
||||
"recent_policies": [
|
||||
{
|
||||
"id": p.id,
|
||||
"title": p.title,
|
||||
"category": p.category,
|
||||
"status": p.status,
|
||||
"created_at": p.created_at.isoformat(),
|
||||
}
|
||||
for p in recent_policies
|
||||
],
|
||||
"frameworks_coverage": list(_COMPLIANCE_FRAMEWORKS.keys()),
|
||||
"generated_at": datetime.now(timezone.utc).isoformat(),
|
||||
}
|
||||
|
||||
|
||||
# ── 헬퍼 ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
def _build_recommendations(
|
||||
critical_risks: list,
|
||||
high_risks: list,
|
||||
compliance_rate: float,
|
||||
) -> List[str]:
|
||||
"""감사 결과 기반 권고 사항 자동 생성."""
|
||||
recs = []
|
||||
if critical_risks:
|
||||
recs.append(
|
||||
f"CRITICAL 리스크 {len(critical_risks)}건이 미처리 상태입니다. "
|
||||
f"즉각적인 대응 조치가 필요합니다."
|
||||
)
|
||||
if high_risks:
|
||||
recs.append(
|
||||
f"HIGH 리스크 {len(high_risks)}건에 대해 30일 이내 완화 계획을 수립하세요."
|
||||
)
|
||||
if compliance_rate < 60:
|
||||
recs.append(
|
||||
"정책 승인율이 60% 미만입니다. 미승인 정책에 대한 검토 일정을 수립하세요."
|
||||
)
|
||||
elif compliance_rate < 80:
|
||||
recs.append(
|
||||
"정책 승인율을 80% 이상으로 높이기 위한 추가 검토가 필요합니다."
|
||||
)
|
||||
else:
|
||||
recs.append("현재 정책 준수율은 양호합니다. 연간 재검토 주기를 유지하세요.")
|
||||
recs.append("정기 내부 감사를 통해 지속적인 컴플라이언스 모니터링을 권고합니다.")
|
||||
return recs
|
||||
async def grc_dashboard(user: User = Depends(get_current_user)):
|
||||
crit = sum(1 for r in _risks if r.get("level") == "CRITICAL")
|
||||
score = max(0, 100 - crit * 10)
|
||||
return {"compliance_score": score,
|
||||
"grade": "A" if score >= 90 else "B" if score >= 80 else "C" if score >= 70 else "D",
|
||||
"policies": len(_policies), "risks": len(_risks), "critical_risks": crit,
|
||||
"recent_policies": _policies[-3:][::-1],
|
||||
"top_risks": sorted(_risks, key=lambda x: x["risk_score"], reverse=True)[:3]}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user