""" 준수성 자동 점검 API (시큐어코딩 / 웹 접근성 / 개인정보보호법 / CSAP) 엔드포인트: POST /api/compliance/scan — 전체 프로젝트 스캔 (ADMIN 전용) GET /api/compliance/results — 최근 스캔 결과 조회 GET /api/compliance/rules — 점검 규칙 목록 POST /api/compliance/scan/file — 파일 텍스트 단건 점검 GET /api/compliance/report/html — HTML 점검 보고서 GET /api/compliance/report/excel — Excel 점검 보고서 [CSAP 공공기관 보안 자동 점검] POST /api/compliance/csap/scan — CSAP 전체 자동 점검 (ADMIN 전용) GET /api/compliance/csap/items — 점검 항목 목록 GET /api/compliance/csap/results — 최근 점검 결과 요약 GET /api/compliance/csap/results/{id} — 배치 상세 결과 POST /api/compliance/csap/evidence/{id} — 수동 증적 업로드 GET /api/compliance/csap/report/html — HTML 보고서 GET /api/compliance/csap/report/excel — Excel 보고서 GET /api/compliance/csap/dashboard — 기관별 준수율 대시보드 """ from __future__ import annotations import logging from datetime import datetime from typing import Optional from fastapi import APIRouter, Depends, HTTPException from fastapi.responses import HTMLResponse, Response from pydantic import BaseModel from sqlalchemy.ext.asyncio import AsyncSession from core.auth import get_current_user, require_admin_role from database import get_db from models import User, CSAPCheckResult logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/compliance", tags=["compliance"]) # 최근 스캔 결과 캐시 (메모리) _last_result: dict = {} class FileScanRequest(BaseModel): filename: str content: str # ── 전체 스캔 ───────────────────────────────────────────────────────────────── @router.post("/scan") async def run_scan( current_user: User = Depends(require_admin_role), ): """GUARDiA 소스 전체 준수성 점검 (ADMIN 전용).""" global _last_result from core.compliance_check import scan_project try: result = await scan_project() _last_result = result return { "message": f"점검 완료 — {result['scanned_files']}개 파일, {result['total_findings']}건 발견", "risk_level": result["risk_level"], "total_findings": result["total_findings"], "by_severity": result["by_severity"], "summary": result["summary"], } except Exception as e: raise HTTPException(500, f"점검 오류: {str(e)[:200]}") # ── 결과 조회 ───────────────────────────────────────────────────────────────── @router.get("/results") async def get_results( category: Optional[str] = None, severity: Optional[str] = None, limit: int = 50, current_user: User = Depends(get_current_user), ): """최근 스캔 결과 조회.""" if not _last_result: return {"message": "스캔 결과 없음 — POST /api/compliance/scan 실행 필요", "findings": []} findings = _last_result.get("findings", []) if category: findings = [f for f in findings if category.lower() in f["category"].lower()] if severity: findings = [f for f in findings if f["severity"].upper() == severity.upper()] return { "scan_time": _last_result.get("scan_time"), "risk_level": _last_result.get("risk_level"), "total_findings": _last_result.get("total_findings", 0), "by_severity": _last_result.get("by_severity", {}), "by_category": _last_result.get("by_category", {}), "summary": _last_result.get("summary", {}), "findings": findings[:limit], } # ── 규칙 목록 ───────────────────────────────────────────────────────────────── @router.get("/rules") async def list_rules( _u: User = Depends(get_current_user), ): """점검 규칙 목록 (패턴 제외).""" from core.compliance_check import SECURE_CODING_RULES, ACCESSIBILITY_RULES, PIPA_RULES return { "secure_coding": [{"id": r["id"], "category": r["category"], "severity": r["severity"], "message": r["message"]} for r in SECURE_CODING_RULES], "accessibility": [{"id": r["id"], "category": r["category"], "level": r["level"], "message": r["message"]} for r in ACCESSIBILITY_RULES], "privacy": [{"id": r["id"], "category": r["category"], "severity": r["severity"], "message": r["message"]} for r in PIPA_RULES], } # ── 단건 파일 점검 ──────────────────────────────────────────────────────────── @router.post("/scan/file") async def scan_single_file( body: FileScanRequest, _u: User = Depends(get_current_user), ): """파일 텍스트 단건 점검 (개발 중 빠른 검토용).""" from core.compliance_check import scan_file findings = scan_file(body.filename, body.content) return { "filename": body.filename, "findings": findings, "total": len(findings), "by_severity": { sev: sum(1 for f in findings if f["severity"] == sev) for sev in ["CRITICAL", "HIGH", "MEDIUM", "LOW"] if any(f["severity"] == sev for f in findings) } } # ── HTML 보고서 ────────────────────────────────────────────────────────────── @router.get("/report/html", response_class=HTMLResponse) async def compliance_html_report( _u: User = Depends(get_current_user), ): """준수성 점검 HTML 보고서.""" if not _last_result: return HTMLResponse("

스캔 결과 없음

POST /api/compliance/scan 을 먼저 실행하세요.

") findings = _last_result.get("findings", []) sev_color = {"CRITICAL": "#fed7d7", "HIGH": "#feebc8", "MEDIUM": "#fefcbf", "LOW": "#e6fffa"} def _row(f): bg = sev_color.get(f["severity"], "#fff") return ( f"" f"{f['rule_id']}{f['category']}" f"{f['severity']}" f"{f['file']}:{f['line']}" f"{f['message']}" f"{f.get('snippet','')[:60]}" ) rows = "".join(_row(f) for f in findings[:100]) risk = _last_result.get("risk_level", "?") risk_badge_color = {"CRITICAL": "#c53030", "HIGH": "#c05621", "MEDIUM": "#744210", "LOW": "#276749"}.get(risk, "#666") html = f""" GUARDiA 준수성 점검 보고서

GUARDiA 준수성 점검 보고서

점검일시: {_last_result.get('scan_time','')} | 스캔 파일: {_last_result.get('scanned_files',0)}개

종합 위험도: {risk} | 총 발견: {_last_result.get('total_findings',0)}건 | 시큐어코딩: {_last_result.get('summary',{}).get('secure_coding',0)}건 | 웹접근성: {_last_result.get('summary',{}).get('accessibility',0)}건 | 개인정보: {_last_result.get('summary',{}).get('privacy',0)}건

점검 결과 (상위 100건)

{rows}
규칙ID분류심각도위치내용코드

Copyright © 2026 GUARDiA All Rights Reserved. | 이 보고서는 자동 생성되었습니다.

""" return HTMLResponse(html) # ── Excel 보고서 ───────────────────────────────────────────────────────────── @router.get("/report/excel") async def compliance_excel_report( _u: User = Depends(get_current_user), ): """준수성 점검 Excel 보고서.""" if not _last_result: raise HTTPException(404, "스캔 결과 없음 — POST /api/compliance/scan 먼저 실행") try: import io, openpyxl from openpyxl.styles import Font, PatternFill, Alignment except ImportError: raise HTTPException(500, "openpyxl 미설치") wb = openpyxl.Workbook() ws = wb.active ws.title = "준수성 점검 결과" headers = ["규칙 ID", "분류", "심각도", "파일", "라인", "내용", "코드 스니펫"] for col, h in enumerate(headers, 1): c = ws.cell(row=1, column=col, value=h) c.font = Font(bold=True, color="FFFFFF") c.fill = PatternFill("solid", fgColor="1a365d") sev_colors = {"CRITICAL": "FED7D7", "HIGH": "FEEBC8", "MEDIUM": "FEFCBF", "LOW": "E6FFFA"} for row, f in enumerate(_last_result.get("findings", []), 2): color = sev_colors.get(f["severity"], "FFFFFF") vals = [f["rule_id"], f["category"], f["severity"], f["file"], f["line"], f["message"], f.get("snippet", "")] for col, val in enumerate(vals, 1): c = ws.cell(row=row, column=col, value=val) c.fill = PatternFill("solid", fgColor=color) for col_idx, width in enumerate([10, 18, 12, 40, 7, 50, 40], 1): ws.column_dimensions[ws.cell(1, col_idx).column_letter].width = width buf = io.BytesIO() wb.save(buf) today = datetime.utcnow().strftime("%Y%m%d") return Response( content=buf.getvalue(), media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", headers={"Content-Disposition": f'attachment; filename="GUARDiA_compliance_{today}.xlsx"'}, ) # ════════════════════════════════════════════════════════════════════════════════ # CSAP 공공기관 보안 자동 점검 # ════════════════════════════════════════════════════════════════════════════════ class CSAPScanRequest(BaseModel): inst_id: int class EvidenceUpload(BaseModel): item_id: str inst_id: int finding: Optional[str] = None evidence_note: str status: str = "PASS" # PASS | PARTIAL @router.post("/csap/scan") async def csap_scan( body: CSAPScanRequest, db: AsyncSession = Depends(get_db), current_user: User = Depends(require_admin_role), ): """CSAP 공공기관 보안 전체 자동 점검 (ADMIN 전용).""" from core.csap_checker import CSAPChecker try: result = await CSAPChecker().run_scan(db, body.inst_id, current_user.username) return result except Exception as e: raise HTTPException(500, f"CSAP 점검 오류: {str(e)[:200]}") @router.get("/csap/items") async def csap_items( category: Optional[str] = None, auto_only: bool = False, _u: User = Depends(get_current_user), ): """CSAP 점검 항목 목록.""" from core.csap_checker import CSAP_ITEMS items = CSAP_ITEMS if category: items = [i for i in items if i["cat"] == category] if auto_only: items = [i for i in items if i["auto"]] return { "total": len(items), "categories": list({i["cat"] for i in CSAP_ITEMS}), "items": items, } @router.get("/csap/results") async def csap_results( inst_id: Optional[int] = None, limit: int = 10, db: AsyncSession = Depends(get_db), _u: User = Depends(get_current_user), ): """최근 CSAP 점검 결과 요약 (배치별).""" from sqlalchemy import select, distinct, desc, func as sqlfunc q = select( CSAPCheckResult.scan_id, CSAPCheckResult.inst_id, sqlfunc.count(CSAPCheckResult.id).label("total"), sqlfunc.sum( (CSAPCheckResult.status == "PASS").cast(Integer) ).label("pass_count"), sqlfunc.max(CSAPCheckResult.scanned_at).label("scanned_at"), ).group_by(CSAPCheckResult.scan_id, CSAPCheckResult.inst_id) if inst_id: q = q.where(CSAPCheckResult.inst_id == inst_id) q = q.order_by(desc("scanned_at")).limit(limit) from sqlalchemy import Integer result = await db.execute(q) rows = result.all() return { "count": len(rows), "scans": [ { "scan_id": r.scan_id, "inst_id": r.inst_id, "total": r.total, "pass_count": r.pass_count or 0, "scanned_at": r.scanned_at.isoformat() if r.scanned_at else None, } for r in rows ], } @router.get("/csap/results/{scan_id}") async def csap_result_detail( scan_id: str, db: AsyncSession = Depends(get_db), _u: User = Depends(get_current_user), ): """배치별 CSAP 점검 상세 결과.""" from sqlalchemy import select q = await db.execute( select(CSAPCheckResult).where(CSAPCheckResult.scan_id == scan_id) .order_by(CSAPCheckResult.item_id) ) items = q.scalars().all() if not items: raise HTTPException(404, "점검 결과를 찾을 수 없습니다.") pass_c = sum(1 for i in items if i.status == "PASS") fail_c = sum(1 for i in items if i.status == "FAIL") auto_total = sum(1 for i in items if i.status != "MANUAL_REQUIRED") rate = round((pass_c / auto_total * 100), 1) if auto_total else 0 return { "scan_id": scan_id, "total": len(items), "pass": pass_c, "fail": fail_c, "compliance_rate": rate, "results": [ { "item_id": i.item_id, "category": i.category, "item_name": i.item_name, "severity": i.severity, "status": i.status, "finding": i.finding, "recommendation": i.recommendation, } for i in items ], } @router.post("/csap/evidence/{item_id}") async def csap_upload_evidence( item_id: str, body: EvidenceUpload, db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """수동 확인 항목 증적 업로드 (MANUAL_REQUIRED → PASS/PARTIAL).""" from sqlalchemy import select, update from core.csap_checker import CSAP_ITEMS item_def = next((i for i in CSAP_ITEMS if i["id"] == item_id), None) if not item_def: raise HTTPException(404, f"항목 {item_id}를 찾을 수 없습니다.") # 가장 최근 MANUAL_REQUIRED 결과 업데이트 q = await db.execute( select(CSAPCheckResult) .where(CSAPCheckResult.item_id == item_id, CSAPCheckResult.inst_id == body.inst_id, CSAPCheckResult.status == "MANUAL_REQUIRED") .order_by(CSAPCheckResult.scanned_at.desc()) .limit(1) ) rec = q.scalar_one_or_none() if not rec: # 신규 등록 rec = CSAPCheckResult( scan_id=f"MANUAL-{datetime.now().strftime('%Y%m%d')}", inst_id=body.inst_id, item_id=item_id, category=item_def["cat"], item_name=item_def["name"], severity=item_def["sev"], status=body.status, finding=body.finding, evidence={"note": body.evidence_note, "uploaded_by": current_user.username}, recommendation="", ) db.add(rec) else: rec.status = body.status rec.finding = body.finding or rec.finding rec.evidence = {"note": body.evidence_note, "uploaded_by": current_user.username} await db.commit() return {"message": f"{item_id} 증적 등록 완료", "status": body.status} @router.get("/csap/report/html", response_class=HTMLResponse) async def csap_html_report( scan_id: str, db: AsyncSession = Depends(get_db), _u: User = Depends(get_current_user), ): """CSAP HTML 보고서 (인쇄·공문 첨부용).""" from sqlalchemy import select q = await db.execute( select(CSAPCheckResult).where(CSAPCheckResult.scan_id == scan_id) .order_by(CSAPCheckResult.item_id) ) items = q.scalars().all() if not items: raise HTTPException(404, "점검 결과를 찾을 수 없습니다.") from core.csap_checker import CSAPChecker checker = CSAPChecker() pass_c = sum(1 for i in items if i.status == "PASS") auto_total = sum(1 for i in items if i.status != "MANUAL_REQUIRED") rate = round((pass_c / auto_total * 100), 1) if auto_total else 0 grade = "A" if rate >= 90 else ("B" if rate >= 70 else ("C" if rate >= 50 else "D")) summary = {"compliance_rate": rate, "grade": grade} html = checker.generate_html_report(items, scan_id, "기관", summary) return HTMLResponse(html) @router.get("/csap/report/excel") async def csap_excel_report( scan_id: str, db: AsyncSession = Depends(get_db), _u: User = Depends(get_current_user), ): """CSAP Excel 보고서.""" from sqlalchemy import select q = await db.execute( select(CSAPCheckResult).where(CSAPCheckResult.scan_id == scan_id) .order_by(CSAPCheckResult.item_id) ) items = q.scalars().all() if not items: raise HTTPException(404, "점검 결과를 찾을 수 없습니다.") from core.csap_checker import CSAPChecker xlsx_bytes = CSAPChecker().generate_excel_report(items, "기관", scan_id) today = datetime.utcnow().strftime("%Y%m%d") return Response( content=xlsx_bytes, media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", headers={"Content-Disposition": f'attachment; filename="CSAP_{scan_id}_{today}.xlsx"'}, ) @router.get("/csap/dashboard") async def csap_dashboard( db: AsyncSession = Depends(get_db), _u: User = Depends(get_current_user), ): """기관별 최근 CSAP 준수율 대시보드.""" from sqlalchemy import select, func as sqlfunc, Integer q = await db.execute( select( CSAPCheckResult.inst_id, CSAPCheckResult.scan_id, sqlfunc.count(CSAPCheckResult.id).label("total"), sqlfunc.sum( (CSAPCheckResult.status == "PASS").cast(Integer) ).label("pass_count"), sqlfunc.max(CSAPCheckResult.scanned_at).label("scanned_at"), ) .group_by(CSAPCheckResult.inst_id, CSAPCheckResult.scan_id) .order_by(CSAPCheckResult.inst_id, sqlfunc.max(CSAPCheckResult.scanned_at).desc()) ) rows = q.all() # 기관별 최근 1건만 seen = set() dashboard = [] for r in rows: if r.inst_id in seen: continue seen.add(r.inst_id) total = r.total or 1 pass_c = r.pass_count or 0 rate = round(pass_c / total * 100, 1) grade = "A" if rate >= 90 else ("B" if rate >= 70 else ("C" if rate >= 50 else "D")) dashboard.append({ "inst_id": r.inst_id, "scan_id": r.scan_id, "compliance_rate": rate, "grade": grade, "pass_count": pass_c, "total": total, "scanned_at": r.scanned_at.isoformat() if r.scanned_at else None, }) return {"count": len(dashboard), "institutions": dashboard}