zioinfo-mail/itsm/routers/compliance.py
DESKTOP-TKLFCPR\ython fc756a493e feat(itsm): DR 자동화 · 네트워크 장비 관리 · CSAP 자동 점검 3종 추가
## 구현 내용

### DR 자동화 (routers/dr.py, core/dr_engine.py)
- DR 시나리오 등록/관리 (SERVER_FAILURE | SITE_FAILURE | DATA_CORRUPTION)
- 복구 테스트 자동화 (SSH 기반 단계별 실행 + 헬스체크)
- 백업 무결성 검증 (SSH → SHA-256 해시 검증)
- RTO/RPO 목표 대비 실적 대시보드
- Failover 실행 API (ADMIN 전용)

### 네트워크 장비 관리 (routers/network_devices.py, core/network_scanner.py)
- 스위치/라우터/방화벽/L4 장비 인벤토리 (CRUD)
- 벤더별 SSH 설정 백업 (Cisco IOS / Huawei VRP / Junos / Linux)
- 이전 백업과 unified diff 변경 감지
- 위험 명령어 차단 (write erase, factory-reset 등)
- 토폴로지 조회 API

### CSAP 공공기관 보안 자동 점검 (routers/compliance.py 확장, core/csap_checker.py)
- CSAP/ISMS-P 기반 25개 항목 자동 점검
- 기술적/운영 보안 자동 검증 (SSH, DB 직접 확인)
- 수동 항목 증적 업로드
- Excel/HTML 보고서 자동 생성
- 기관별 준수율 대시보드 (A~D 등급)

### DB 모델 추가 (models.py)
- DRScenario, DRTest
- NetworkDevice, NetworkConfigBackup
- CSAPCheckResult

### 하네스 확장
- 에이전트: dr-coordinator, network-guardian, csap-auditor
- 스킬: dr-automation, network-devices, csap-compliance
- guardia-orchestrator description에 DR/네트워크/CSAP 트리거 추가

### 매뉴얼
- 39_DR_네트워크장비_CSAP_운영가이드.md 신규 작성
- 16_API_명세서.md v2.1.0 업데이트 (617개 라우트, 섹션 21~23 추가)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-31 09:24:51 +09:00

523 lines
20 KiB
Python

"""
준수성 자동 점검 API (시큐어코딩 / 웹 접근성 / 개인정보보호법 / CSAP)
엔드포인트:
POST /api/compliance/scan — 전체 프로젝트 스캔 (ADMIN 전용)
GET /api/compliance/results — 최근 스캔 결과 조회
GET /api/compliance/rules — 점검 규칙 목록
POST /api/compliance/scan/file — 파일 텍스트 단건 점검
GET /api/compliance/report/html — HTML 점검 보고서
GET /api/compliance/report/excel — Excel 점검 보고서
[CSAP 공공기관 보안 자동 점검]
POST /api/compliance/csap/scan — CSAP 전체 자동 점검 (ADMIN 전용)
GET /api/compliance/csap/items — 점검 항목 목록
GET /api/compliance/csap/results — 최근 점검 결과 요약
GET /api/compliance/csap/results/{id} — 배치 상세 결과
POST /api/compliance/csap/evidence/{id} — 수동 증적 업로드
GET /api/compliance/csap/report/html — HTML 보고서
GET /api/compliance/csap/report/excel — Excel 보고서
GET /api/compliance/csap/dashboard — 기관별 준수율 대시보드
"""
from __future__ import annotations
import logging
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import HTMLResponse, Response
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user, require_admin_role
from database import get_db
from models import User, CSAPCheckResult
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/compliance", tags=["compliance"])
# 최근 스캔 결과 캐시 (메모리)
_last_result: dict = {}
class FileScanRequest(BaseModel):
    """Request body for the single-file scan endpoint."""

    # Name of the file being checked; passed through to the scanner and
    # echoed back in the response.
    filename: str

    # Raw text of the file to scan.
    content: str
# ── 전체 스캔 ─────────────────────────────────────────────────────────────────
@router.post("/scan")
async def run_scan(
    current_user: User = Depends(require_admin_role),
):
    """Run the compliance scan over the whole project (ADMIN only).

    The full scan result is stored in the module-level ``_last_result`` cache
    so that the result and report endpoints can serve it afterwards.

    Returns:
        A summary dict with ``message``, ``risk_level``, ``total_findings``,
        ``by_severity`` and ``summary``.

    Raises:
        HTTPException: propagated unchanged when the scanner raises one;
            otherwise a 500 carrying the first 200 characters of the error.
    """
    global _last_result
    from core.compliance_check import scan_project

    try:
        result = await scan_project()
        _last_result = result
        return {
            "message": f"점검 완료 — {result['scanned_files']}개 파일, {result['total_findings']}건 발견",
            "risk_level": result["risk_level"],
            "total_findings": result["total_findings"],
            "by_severity": result["by_severity"],
            "summary": result["summary"],
        }
    except HTTPException:
        # HTTPException is an Exception subclass; without this clause a
        # deliberate status code would be rewritten into a generic 500.
        raise
    except Exception as e:
        # Keep the traceback in the server log; the client only gets a
        # truncated message.
        logger.exception("compliance scan failed")
        raise HTTPException(500, f"점검 오류: {str(e)[:200]}") from e
# ── 결과 조회 ─────────────────────────────────────────────────────────────────
@router.get("/results")
async def get_results(
    category: Optional[str] = None,
    severity: Optional[str] = None,
    limit: int = 50,
    current_user: User = Depends(get_current_user),
):
    """Return the most recent cached scan result, optionally filtered.

    Args:
        category: Case-insensitive substring matched against each finding's
            ``category``.
        severity: Case-insensitive exact match against each finding's
            ``severity``.
        limit: Maximum number of findings to return. Negative values are
            treated as 0.

    Returns:
        The cached scan metadata plus the filtered, truncated ``findings``
        list, or a placeholder message with an empty list when no scan has
        been run yet.
    """
    if not _last_result:
        return {"message": "스캔 결과 없음 — POST /api/compliance/scan 실행 필요", "findings": []}

    findings = _last_result.get("findings", [])
    if category:
        wanted_category = category.lower()
        findings = [f for f in findings if wanted_category in f["category"].lower()]
    if severity:
        wanted_severity = severity.upper()
        findings = [f for f in findings if f["severity"].upper() == wanted_severity]

    # A negative slice bound would drop items from the tail instead of
    # limiting the result, so clamp it.
    limit = max(limit, 0)

    return {
        "scan_time": _last_result.get("scan_time"),
        "risk_level": _last_result.get("risk_level"),
        "total_findings": _last_result.get("total_findings", 0),
        "by_severity": _last_result.get("by_severity", {}),
        "by_category": _last_result.get("by_category", {}),
        "summary": _last_result.get("summary", {}),
        "findings": findings[:limit],
    }
# ── 규칙 목록 ─────────────────────────────────────────────────────────────────
@router.get("/rules")
async def list_rules(
    _u: User = Depends(get_current_user),
):
    """List the check rules of every rule set, leaving out the match patterns."""
    from core.compliance_check import SECURE_CODING_RULES, ACCESSIBILITY_RULES, PIPA_RULES

    def _project(rules, fields):
        # Copy only the listed fields, in the listed order, from each rule.
        return [{name: rule[name] for name in fields} for rule in rules]

    severity_fields = ("id", "category", "severity", "message")
    level_fields = ("id", "category", "level", "message")

    payload = {}
    payload["secure_coding"] = _project(SECURE_CODING_RULES, severity_fields)
    payload["accessibility"] = _project(ACCESSIBILITY_RULES, level_fields)
    payload["privacy"] = _project(PIPA_RULES, severity_fields)
    return payload
# ── 단건 파일 점검 ────────────────────────────────────────────────────────────
@router.post("/scan/file")
async def scan_single_file(
    body: FileScanRequest,
    _u: User = Depends(get_current_user),
):
    """Check a single file's text (quick review during development).

    Returns the findings, their count, and a per-severity tally that only
    contains the severities that actually occurred.
    """
    from core.compliance_check import scan_file

    findings = scan_file(body.filename, body.content)

    # Tally in fixed severity order; levels with no hits are left out.
    severity_tally = {}
    for level in ("CRITICAL", "HIGH", "MEDIUM", "LOW"):
        hits = sum(1 for item in findings if item["severity"] == level)
        if hits:
            severity_tally[level] = hits

    return {
        "filename": body.filename,
        "findings": findings,
        "total": len(findings),
        "by_severity": severity_tally,
    }
# ── HTML 보고서 ──────────────────────────────────────────────────────────────
@router.get("/report/html", response_class=HTMLResponse)
async def compliance_html_report(
    _u: User = Depends(get_current_user),
):
    """Render the most recent cached scan result as an HTML report.

    Only the first 100 findings are listed. Every value taken from the scan
    result is HTML-escaped before being placed in the page: file names,
    messages and code snippets come from scanned source and may contain
    markup characters.

    Returns:
        The report page, or a short placeholder page when no scan has been
        run yet.
    """
    # Imported by function name because the module name ``html`` is easy to
    # confuse with a page-text variable.
    from html import escape

    if not _last_result:
        return HTMLResponse("<h2>스캔 결과 없음</h2><p>POST /api/compliance/scan 을 먼저 실행하세요.</p>")

    def _esc(value) -> str:
        # Coerce to text first: line numbers and counters are not strings.
        return escape(str(value))

    findings = _last_result.get("findings", [])
    sev_color = {"CRITICAL": "#fed7d7", "HIGH": "#feebc8", "MEDIUM": "#fefcbf", "LOW": "#e6fffa"}

    def _row(f):
        # The background colour comes from the fixed table above, so it
        # needs no escaping.
        bg = sev_color.get(f["severity"], "#fff")
        # Truncate before escaping so an entity is never cut in half.
        snippet = (f.get("snippet") or "")[:60]
        return (
            f"<tr style='background:{bg}'>"
            f"<td>{_esc(f['rule_id'])}</td><td>{_esc(f['category'])}</td>"
            f"<td><b>{_esc(f['severity'])}</b></td>"
            f"<td>{_esc(f['file'])}:{_esc(f['line'])}</td>"
            f"<td style='font-size:12px'>{_esc(f['message'])}</td>"
            f"<td><code style='font-size:11px'>{_esc(snippet)}</code></td></tr>"
        )

    rows = "".join(_row(f) for f in findings[:100])

    risk = _last_result.get("risk_level", "?")
    risk_badge_color = {"CRITICAL": "#c53030", "HIGH": "#c05621", "MEDIUM": "#744210", "LOW": "#276749"}.get(risk, "#666")

    summary = _last_result.get("summary", {})
    scan_time = _esc(_last_result.get("scan_time", ""))
    scanned_files = _esc(_last_result.get("scanned_files", 0))
    total_findings = _esc(_last_result.get("total_findings", 0))
    secure_coding = _esc(summary.get("secure_coding", 0))
    accessibility = _esc(summary.get("accessibility", 0))
    privacy = _esc(summary.get("privacy", 0))
    risk_text = _esc(risk)

    page = f"""<!DOCTYPE html><html lang="ko"><head><meta charset="UTF-8">
<title>GUARDiA 준수성 점검 보고서</title>
<style>
body{{font-family:Arial,sans-serif;margin:30px;color:#333;font-size:13px}}
h1{{color:#1a365d}} h2{{color:#2c5282;margin-top:20px}}
table{{border-collapse:collapse;width:100%}} th{{background:#2d3748;color:#fff;padding:8px;text-align:left}}
td{{padding:6px 8px;border-bottom:1px solid #e2e8f0}}
.badge{{padding:3px 10px;border-radius:12px;font-size:11px;font-weight:bold;color:white;background:{risk_badge_color}}}
</style></head><body>
<h1>GUARDiA 준수성 점검 보고서</h1>
<p>점검일시: {scan_time} | 스캔 파일: {scanned_files}개</p>
<p>종합 위험도: <span class="badge">{risk_text}</span>
| 총 발견: {total_findings}
| 시큐어코딩: {secure_coding}
| 웹접근성: {accessibility}
| 개인정보: {privacy}
</p>
<h2>점검 결과 (상위 100건)</h2>
<table>
<tr><th>규칙ID</th><th>분류</th><th>심각도</th><th>위치</th><th>내용</th><th>코드</th></tr>
{rows}
</table>
<p style="margin-top:24px;color:#718096;font-size:11px">
Copyright &copy; 2026 GUARDiA All Rights Reserved. | 이 보고서는 자동 생성되었습니다.
</p></body></html>"""
    return HTMLResponse(page)
# ── Excel 보고서 ─────────────────────────────────────────────────────────────
@router.get("/report/excel")
async def compliance_excel_report(
    _u: User = Depends(get_current_user),
):
    """Export the most recent cached scan result as an .xlsx workbook.

    Returns:
        A download response whose file name carries today's UTC date.

    Raises:
        HTTPException: 404 when no scan has been run yet; 500 when
            ``openpyxl`` is not installed.
    """
    if not _last_result:
        raise HTTPException(404, "스캔 결과 없음 — POST /api/compliance/scan 먼저 실행")

    import io
    from datetime import timezone

    try:
        import openpyxl
        from openpyxl.styles import Font, PatternFill
    except ImportError as exc:
        raise HTTPException(500, "openpyxl 미설치") from exc

    wb = openpyxl.Workbook()
    ws = wb.active
    ws.title = "준수성 점검 결과"

    # Header row: bold white text on a dark fill.
    headers = ["규칙 ID", "분류", "심각도", "파일", "라인", "내용", "코드 스니펫"]
    for col, h in enumerate(headers, 1):
        c = ws.cell(row=1, column=col, value=h)
        c.font = Font(bold=True, color="FFFFFF")
        c.fill = PatternFill("solid", fgColor="1a365d")

    # One data row per finding, tinted by severity (white when unknown).
    sev_colors = {"CRITICAL": "FED7D7", "HIGH": "FEEBC8", "MEDIUM": "FEFCBF", "LOW": "E6FFFA"}
    for row, f in enumerate(_last_result.get("findings", []), 2):
        # The fill is the same for the whole row, so build it once per row.
        row_fill = PatternFill("solid", fgColor=sev_colors.get(f["severity"], "FFFFFF"))
        vals = [f["rule_id"], f["category"], f["severity"], f["file"], f["line"], f["message"], f.get("snippet", "")]
        for col, val in enumerate(vals, 1):
            c = ws.cell(row=row, column=col, value=val)
            c.fill = row_fill

    for col_idx, width in enumerate([10, 18, 12, 40, 7, 50, 40], 1):
        ws.column_dimensions[ws.cell(1, col_idx).column_letter].width = width

    buf = io.BytesIO()
    wb.save(buf)

    # datetime.utcnow() is deprecated; an aware UTC "now" gives the same date.
    today = datetime.now(timezone.utc).strftime("%Y%m%d")
    return Response(
        content=buf.getvalue(),
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        headers={"Content-Disposition": f'attachment; filename="GUARDiA_compliance_{today}.xlsx"'},
    )
# ════════════════════════════════════════════════════════════════════════════════
# CSAP 공공기관 보안 자동 점검
# ════════════════════════════════════════════════════════════════════════════════
class CSAPScanRequest(BaseModel):
    """Request body for starting a CSAP scan."""

    # Identifier of the institution to scan; handed to the CSAP checker.
    inst_id: int
class EvidenceUpload(BaseModel):
    """Request body for registering evidence on a manually verified CSAP item."""

    # Check-item identifier carried in the body.
    item_id: str

    # Institution the evidence belongs to.
    inst_id: int

    # Optional finding text to store with the result.
    finding: Optional[str] = None

    # Free-text note describing the evidence.
    evidence_note: str

    # Resulting status: PASS | PARTIAL
    status: str = "PASS"
@router.post("/csap/scan")
async def csap_scan(
    body: CSAPScanRequest,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(require_admin_role),
):
    """Run the full automatic CSAP security check for one institution (ADMIN only).

    Returns:
        Whatever ``CSAPChecker.run_scan`` returns for the given institution.

    Raises:
        HTTPException: propagated unchanged when the checker raises one;
            otherwise a 500 carrying the first 200 characters of the error.
    """
    from core.csap_checker import CSAPChecker

    try:
        return await CSAPChecker().run_scan(db, body.inst_id, current_user.username)
    except HTTPException:
        # Do not turn a deliberate status code into a generic 500.
        raise
    except Exception as e:
        # Keep the traceback in the server log; the client only gets a
        # truncated message.
        logger.exception("CSAP scan failed for inst_id=%s", body.inst_id)
        raise HTTPException(500, f"CSAP 점검 오류: {str(e)[:200]}") from e
@router.get("/csap/items")
async def csap_items(
    category: Optional[str] = None,
    auto_only: bool = False,
    _u: User = Depends(get_current_user),
):
    """List the CSAP check items, optionally filtered.

    Args:
        category: Keep only items whose ``cat`` equals this value.
        auto_only: Keep only items whose ``auto`` flag is truthy.

    Returns:
        ``total`` (count after filtering), ``categories`` (every category in
        the unfiltered item list, in first-appearance order) and ``items``.
    """
    from core.csap_checker import CSAP_ITEMS

    items = CSAP_ITEMS
    if category:
        items = [i for i in items if i["cat"] == category]
    if auto_only:
        items = [i for i in items if i["auto"]]

    # A set has no stable order across processes; dict.fromkeys removes
    # duplicates while keeping the order in which categories are defined.
    categories = list(dict.fromkeys(i["cat"] for i in CSAP_ITEMS))

    return {
        "total": len(items),
        "categories": categories,
        "items": items,
    }
@router.get("/csap/results")
async def csap_results(
    inst_id: Optional[int] = None,
    limit: int = 10,
    db: AsyncSession = Depends(get_db),
    _u: User = Depends(get_current_user),
):
    """Summarise recent CSAP scans, one row per scan batch, newest first.

    Args:
        inst_id: When given, only scans of this institution are returned.
        limit: Maximum number of batches to return. Negative values are
            treated as 0.

    Returns:
        ``count`` and ``scans``; each scan has ``scan_id``, ``inst_id``,
        ``total``, ``pass_count`` and ``scanned_at`` (ISO string or None).
    """
    # Every name used by the query is imported before the query is built.
    # Importing ``Integer`` after its first use makes it an unbound local
    # and the handler fails on every call.
    from sqlalchemy import Integer, desc, select
    from sqlalchemy import func as sqlfunc

    q = select(
        CSAPCheckResult.scan_id,
        CSAPCheckResult.inst_id,
        sqlfunc.count(CSAPCheckResult.id).label("total"),
        # Boolean cast to integer so SUM counts the PASS rows.
        sqlfunc.sum(
            (CSAPCheckResult.status == "PASS").cast(Integer)
        ).label("pass_count"),
        sqlfunc.max(CSAPCheckResult.scanned_at).label("scanned_at"),
    ).group_by(CSAPCheckResult.scan_id, CSAPCheckResult.inst_id)

    # ``is not None`` so that an institution id of 0 is still a filter.
    if inst_id is not None:
        q = q.where(CSAPCheckResult.inst_id == inst_id)

    q = q.order_by(desc("scanned_at")).limit(max(limit, 0))

    result = await db.execute(q)
    rows = result.all()
    return {
        "count": len(rows),
        "scans": [
            {
                "scan_id": r.scan_id,
                "inst_id": r.inst_id,
                "total": r.total,
                # SUM yields NULL when no row contributed a value.
                "pass_count": r.pass_count or 0,
                "scanned_at": r.scanned_at.isoformat() if r.scanned_at else None,
            }
            for r in rows
        ],
    }
@router.get("/csap/results/{scan_id}")
async def csap_result_detail(
    scan_id: str,
    db: AsyncSession = Depends(get_db),
    _u: User = Depends(get_current_user),
):
    """Return every item result of one CSAP scan batch plus its compliance rate.

    The rate is PASS count over the number of items that are not
    MANUAL_REQUIRED, as a percentage rounded to one decimal (0 when there
    are no such items). Responds 404 when the batch has no rows.
    """
    from sqlalchemy import select

    stmt = (
        select(CSAPCheckResult)
        .where(CSAPCheckResult.scan_id == scan_id)
        .order_by(CSAPCheckResult.item_id)
    )
    records = (await db.execute(stmt)).scalars().all()
    if not records:
        raise HTTPException(404, "점검 결과를 찾을 수 없습니다.")

    # Single pass over the rows instead of one pass per counter.
    passed = failed = manual = 0
    for rec in records:
        if rec.status == "PASS":
            passed += 1
        elif rec.status == "FAIL":
            failed += 1
        elif rec.status == "MANUAL_REQUIRED":
            manual += 1
    auto_total = len(records) - manual

    if auto_total:
        rate = round((passed / auto_total * 100), 1)
    else:
        rate = 0

    details = []
    for rec in records:
        details.append({
            "item_id": rec.item_id,
            "category": rec.category,
            "item_name": rec.item_name,
            "severity": rec.severity,
            "status": rec.status,
            "finding": rec.finding,
            "recommendation": rec.recommendation,
        })

    return {
        "scan_id": scan_id,
        "total": len(records),
        "pass": passed,
        "fail": failed,
        "compliance_rate": rate,
        "results": details,
    }
@router.post("/csap/evidence/{item_id}")
async def csap_upload_evidence(
    item_id: str,
    body: EvidenceUpload,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Register evidence for a manually verified item (MANUAL_REQUIRED → PASS/PARTIAL).

    The newest MANUAL_REQUIRED result for the item and institution is
    updated; when none exists a new result row is created under a
    ``MANUAL-<date>`` scan id. The item is identified by the path
    parameter; ``body.item_id`` is not consulted.

    Raises:
        HTTPException: 404 when ``item_id`` is not a known CSAP item; 400
            when ``body.status`` is neither PASS nor PARTIAL.
    """
    from sqlalchemy import select
    from core.csap_checker import CSAP_ITEMS

    item_def = next((i for i in CSAP_ITEMS if i["id"] == item_id), None)
    if not item_def:
        raise HTTPException(404, f"항목 {item_id}를 찾을 수 없습니다.")

    # Evidence may only move an item to PASS or PARTIAL; without this check
    # any string from the request would be stored as the result status.
    if body.status not in ("PASS", "PARTIAL"):
        raise HTTPException(400, "status는 PASS 또는 PARTIAL만 허용됩니다.")

    evidence = {"note": body.evidence_note, "uploaded_by": current_user.username}

    # Look up the most recent MANUAL_REQUIRED result for this item.
    q = await db.execute(
        select(CSAPCheckResult)
        .where(CSAPCheckResult.item_id == item_id,
               CSAPCheckResult.inst_id == body.inst_id,
               CSAPCheckResult.status == "MANUAL_REQUIRED")
        .order_by(CSAPCheckResult.scanned_at.desc())
        .limit(1)
    )
    rec = q.scalar_one_or_none()

    if not rec:
        # No pending manual result: record the evidence as a new row.
        rec = CSAPCheckResult(
            scan_id=f"MANUAL-{datetime.now().strftime('%Y%m%d')}",
            inst_id=body.inst_id,
            item_id=item_id,
            category=item_def["cat"],
            item_name=item_def["name"],
            severity=item_def["sev"],
            status=body.status,
            finding=body.finding,
            evidence=evidence,
            recommendation="",
        )
        db.add(rec)
    else:
        rec.status = body.status
        # Keep the existing finding when the request does not supply one.
        rec.finding = body.finding or rec.finding
        rec.evidence = evidence

    await db.commit()
    return {"message": f"{item_id} 증적 등록 완료", "status": body.status}
@router.get("/csap/report/html", response_class=HTMLResponse)
async def csap_html_report(
    scan_id: str,
    db: AsyncSession = Depends(get_db),
    _u: User = Depends(get_current_user),
):
    """Build the CSAP HTML report for one scan batch (for printing / attaching to official documents).

    Computes the compliance rate (PASS over items that are not
    MANUAL_REQUIRED) and an A-D grade, then delegates rendering to
    ``CSAPChecker.generate_html_report``. Responds 404 when the batch has
    no rows.
    """
    from sqlalchemy import select

    stmt = (
        select(CSAPCheckResult)
        .where(CSAPCheckResult.scan_id == scan_id)
        .order_by(CSAPCheckResult.item_id)
    )
    records = (await db.execute(stmt)).scalars().all()
    if not records:
        raise HTTPException(404, "점검 결과를 찾을 수 없습니다.")

    from core.csap_checker import CSAPChecker
    checker = CSAPChecker()

    passed = len([rec for rec in records if rec.status == "PASS"])
    auto_total = len([rec for rec in records if rec.status != "MANUAL_REQUIRED"])
    if auto_total:
        rate = round((passed / auto_total * 100), 1)
    else:
        rate = 0

    # Grade thresholds: 90 / 70 / 50.
    if rate >= 90:
        grade = "A"
    elif rate >= 70:
        grade = "B"
    elif rate >= 50:
        grade = "C"
    else:
        grade = "D"

    summary = {"compliance_rate": rate, "grade": grade}
    return HTMLResponse(checker.generate_html_report(records, scan_id, "기관", summary))
@router.get("/csap/report/excel")
async def csap_excel_report(
    scan_id: str,
    db: AsyncSession = Depends(get_db),
    _u: User = Depends(get_current_user),
):
    """Export one CSAP scan batch as an .xlsx download.

    The workbook bytes come from ``CSAPChecker.generate_excel_report``; the
    file name carries the scan id and today's UTC date.

    Raises:
        HTTPException: 404 when the batch has no rows.
    """
    from datetime import timezone
    from sqlalchemy import select

    q = await db.execute(
        select(CSAPCheckResult).where(CSAPCheckResult.scan_id == scan_id)
        .order_by(CSAPCheckResult.item_id)
    )
    items = q.scalars().all()
    if not items:
        raise HTTPException(404, "점검 결과를 찾을 수 없습니다.")

    from core.csap_checker import CSAPChecker
    xlsx_bytes = CSAPChecker().generate_excel_report(items, "기관", scan_id)

    # datetime.utcnow() is deprecated; an aware UTC "now" gives the same date.
    today = datetime.now(timezone.utc).strftime("%Y%m%d")
    return Response(
        content=xlsx_bytes,
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        headers={"Content-Disposition": f'attachment; filename="CSAP_{scan_id}_{today}.xlsx"'},
    )
@router.get("/csap/dashboard")
async def csap_dashboard(
    db: AsyncSession = Depends(get_db),
    _u: User = Depends(get_current_user),
):
    """Return the latest CSAP compliance rate and grade for each institution.

    Scans are aggregated per (institution, scan batch) and only the newest
    batch of each institution is reported. The compliance rate uses the
    same formula as the batch detail and HTML report endpoints: PASS count
    over the items that are not MANUAL_REQUIRED, so one scan gets the same
    rate and grade everywhere.

    Returns:
        ``count`` and ``institutions``; each entry has ``inst_id``,
        ``scan_id``, ``compliance_rate``, ``grade``, ``pass_count``,
        ``auto_total``, ``total`` and ``scanned_at`` (ISO string or None).
    """
    from sqlalchemy import Integer, select
    from sqlalchemy import func as sqlfunc

    latest_scan = sqlfunc.max(CSAPCheckResult.scanned_at)
    q = await db.execute(
        select(
            CSAPCheckResult.inst_id,
            CSAPCheckResult.scan_id,
            sqlfunc.count(CSAPCheckResult.id).label("total"),
            sqlfunc.sum(
                (CSAPCheckResult.status == "PASS").cast(Integer)
            ).label("pass_count"),
            # Denominator of the rate: items that do not need manual review.
            sqlfunc.sum(
                (CSAPCheckResult.status != "MANUAL_REQUIRED").cast(Integer)
            ).label("auto_total"),
            latest_scan.label("scanned_at"),
        )
        .group_by(CSAPCheckResult.inst_id, CSAPCheckResult.scan_id)
        .order_by(CSAPCheckResult.inst_id, latest_scan.desc())
    )
    rows = q.all()

    # Rows arrive newest-first within each institution, so the first row
    # seen for an institution is its latest scan.
    seen = set()
    dashboard = []
    for r in rows:
        if r.inst_id in seen:
            continue
        seen.add(r.inst_id)

        total = r.total or 1
        pass_c = r.pass_count or 0
        auto_total = r.auto_total or 0
        rate = round(pass_c / auto_total * 100, 1) if auto_total else 0
        grade = "A" if rate >= 90 else ("B" if rate >= 70 else ("C" if rate >= 50 else "D"))

        dashboard.append({
            "inst_id": r.inst_id,
            "scan_id": r.scan_id,
            "compliance_rate": rate,
            "grade": grade,
            "pass_count": pass_c,
            "auto_total": auto_total,
            "total": total,
            "scanned_at": r.scanned_at.isoformat() if r.scanned_at else None,
        })

    return {"count": len(dashboard), "institutions": dashboard}