## 구현 내용 ### DR 자동화 (routers/dr.py, core/dr_engine.py) - DR 시나리오 등록/관리 (SERVER_FAILURE | SITE_FAILURE | DATA_CORRUPTION) - 복구 테스트 자동화 (SSH 기반 단계별 실행 + 헬스체크) - 백업 무결성 검증 (SSH → SHA-256 해시 검증) - RTO/RPO 목표 대비 실적 대시보드 - Failover 실행 API (ADMIN 전용) ### 네트워크 장비 관리 (routers/network_devices.py, core/network_scanner.py) - 스위치/라우터/방화벽/L4 장비 인벤토리 (CRUD) - 벤더별 SSH 설정 백업 (Cisco IOS / Huawei VRP / Junos / Linux) - 이전 백업과 unified diff 변경 감지 - 위험 명령어 차단 (write erase, factory-reset 등) - 토폴로지 조회 API ### CSAP 공공기관 보안 자동 점검 (routers/compliance.py 확장, core/csap_checker.py) - CSAP/ISMS-P 기반 25개 항목 자동 점검 - 기술적/운영 보안 자동 검증 (SSH, DB 직접 확인) - 수동 항목 증적 업로드 - Excel/HTML 보고서 자동 생성 - 기관별 준수율 대시보드 (A~D 등급) ### DB 모델 추가 (models.py) - DRScenario, DRTest - NetworkDevice, NetworkConfigBackup - CSAPCheckResult ### 하네스 확장 - 에이전트: dr-coordinator, network-guardian, csap-auditor - 스킬: dr-automation, network-devices, csap-compliance - guardia-orchestrator description에 DR/네트워크/CSAP 트리거 추가 ### 매뉴얼 - 39_DR_네트워크장비_CSAP_운영가이드.md 신규 작성 - 16_API_명세서.md v2.1.0 업데이트 (617개 라우트, 섹션 21~23 추가) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
363 lines
18 KiB
Python
363 lines
18 KiB
Python
"""
|
|
CSAP/ISMS-P 공공기관 보안 자동 점검 엔진.
|
|
|
|
자동 점검 가능 항목(기술적·운영): SSH 기반 서버 설정 직접 확인.
|
|
수동 항목(관리적·물리적): MANUAL_REQUIRED 상태로 표시.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import io
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional
|
|
|
|
from sqlalchemy import select, func, desc
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# ── CSAP 점검 항목 정의 ────────────────────────────────────────────────────
|
|
|
|
CSAP_ITEMS: list[dict] = [
|
|
# ── 관리적 보안 (M) ──────────────────────────────────────────────────────
|
|
{"id":"M-01","cat":"관리적","sev":"HIGH","auto":False,"name":"정보보호 정책 수립"},
|
|
{"id":"M-02","cat":"관리적","sev":"HIGH","auto":False,"name":"정보보호 조직 구성"},
|
|
{"id":"M-03","cat":"관리적","sev":"MEDIUM","auto":False,"name":"정보보호 교육 이력 관리"},
|
|
{"id":"M-04","cat":"관리적","sev":"HIGH","auto":False,"name":"위험 관리 프로세스 운영"},
|
|
{"id":"M-05","cat":"관리적","sev":"MEDIUM","auto":False,"name":"정보보호 감사 수행"},
|
|
# ── 기술적 보안 (T) ──────────────────────────────────────────────────────
|
|
{"id":"T-01","cat":"기술적","sev":"HIGH","auto":True,"name":"계정 잠금 정책 (5회 실패)"},
|
|
{"id":"T-02","cat":"기술적","sev":"HIGH","auto":True,"name":"패스워드 복잡도 (8자+특수문자)"},
|
|
{"id":"T-03","cat":"기술적","sev":"HIGH","auto":True,"name":"SSH root 직접 로그인 차단"},
|
|
{"id":"T-04","cat":"기술적","sev":"HIGH","auto":True,"name":"불필요 서비스 비활성화"},
|
|
{"id":"T-05","cat":"기술적","sev":"HIGH","auto":True,"name":"보안 패치 최신화 (30일 이내)"},
|
|
{"id":"T-06","cat":"기술적","sev":"HIGH","auto":True,"name":"암호화 전송 (TLS 1.2 이상)"},
|
|
{"id":"T-07","cat":"기술적","sev":"HIGH","auto":True,"name":"개인정보 암호화 저장"},
|
|
{"id":"T-08","cat":"기술적","sev":"MEDIUM","auto":True,"name":"불필요 포트 차단"},
|
|
{"id":"T-09","cat":"기술적","sev":"MEDIUM","auto":True,"name":"원격 접속 허용 IP 제한"},
|
|
{"id":"T-10","cat":"기술적","sev":"HIGH","auto":False,"name":"침입탐지/방지 시스템 운영"},
|
|
{"id":"T-11","cat":"기술적","sev":"HIGH","auto":True,"name":"취약점 정기 스캔 (분기별)"},
|
|
{"id":"T-12","cat":"기술적","sev":"MEDIUM","auto":False,"name":"망분리 적용"},
|
|
# ── 운영 보안 (O) ────────────────────────────────────────────────────────
|
|
{"id":"O-01","cat":"운영","sev":"HIGH","auto":True,"name":"로그 보존 (6개월 이상)"},
|
|
{"id":"O-02","cat":"운영","sev":"HIGH","auto":True,"name":"백업 실시 및 무결성 검증"},
|
|
{"id":"O-03","cat":"운영","sev":"MEDIUM","auto":True,"name":"변경 관리 프로세스 이행"},
|
|
{"id":"O-04","cat":"운영","sev":"HIGH","auto":True,"name":"접근 이력 로그 기록"},
|
|
{"id":"O-05","cat":"운영","sev":"MEDIUM","auto":False,"name":"운영 매뉴얼 최신화"},
|
|
# ── 물리적 보안 (P) ──────────────────────────────────────────────────────
|
|
{"id":"P-01","cat":"물리적","sev":"MEDIUM","auto":False,"name":"물리적 출입 통제"},
|
|
{"id":"P-02","cat":"물리적","sev":"HIGH","auto":True,"name":"DR 사이트 운영 및 정기 테스트"},
|
|
{"id":"P-03","cat":"물리적","sev":"MEDIUM","auto":False,"name":"자연재해 대비 계획 수립"},
|
|
]
|
|
|
|
|
|
class CSAPChecker:
|
|
"""CSAP 자동 점검 실행 및 보고서 생성."""
|
|
|
|
def generate_scan_id(self) -> str:
|
|
now = datetime.now()
|
|
return f"CSAP-{now.strftime('%Y%m%d')}-{now.strftime('%H%M%S')}"
|
|
|
|
# ── 자동 점검 함수들 ──────────────────────────────────────────────────────
|
|
|
|
async def _check_ssh_root_disabled(self, db: AsyncSession, inst_id: int) -> dict:
|
|
"""T-03: SSH root 직접 로그인 차단 (sshd_config PermitRootLogin no)."""
|
|
from models import Server
|
|
from core.network_scanner import NetworkScanner
|
|
from core.ssh_exec import _decrypt_password
|
|
|
|
q = await db.execute(
|
|
select(Server).where(Server.inst_id == inst_id, Server.is_active == True).limit(5)
|
|
)
|
|
servers = q.scalars().all()
|
|
scanner = NetworkScanner()
|
|
fail_servers = []
|
|
for sv in servers:
|
|
try:
|
|
pw = _decrypt_password(sv.os_pw_enc)
|
|
r = await scanner.execute_command(
|
|
sv.ip_addr, sv.ssh_user, pw, sv.port or 22,
|
|
"grep -i 'PermitRootLogin' /etc/ssh/sshd_config"
|
|
)
|
|
if "no" not in r.get("stdout", "").lower():
|
|
fail_servers.append(sv.server_name)
|
|
except Exception:
|
|
pass
|
|
|
|
if not servers:
|
|
return {"status": "N_A", "finding": "점검 대상 서버 없음", "evidence": {}}
|
|
if fail_servers:
|
|
return {
|
|
"status": "FAIL",
|
|
"finding": f"root SSH 로그인 허용 서버: {', '.join(fail_servers)}",
|
|
"evidence": {"fail_servers": fail_servers},
|
|
"recommendation": "sshd_config에서 PermitRootLogin no 설정 후 서비스 재시작",
|
|
}
|
|
return {"status": "PASS", "finding": "모든 서버 root SSH 로그인 차단 확인",
|
|
"evidence": {"checked_servers": len(servers)}}
|
|
|
|
async def _check_log_retention(self, db: AsyncSession, inst_id: int) -> dict:
|
|
"""O-01: 로그 보존 6개월 이상 (tb_audit_log 기준)."""
|
|
from models import AuditLog
|
|
q = await db.execute(
|
|
select(func.min(AuditLog.created_at)).where(AuditLog.inst_id == inst_id)
|
|
)
|
|
oldest = q.scalar_one_or_none()
|
|
if not oldest:
|
|
return {"status": "FAIL", "finding": "감사 로그 없음",
|
|
"recommendation": "감사 로그 수집 설정 확인"}
|
|
|
|
age_days = (datetime.now() - oldest).days
|
|
if age_days >= 180:
|
|
return {"status": "PASS",
|
|
"finding": f"로그 보존 {age_days}일 ({oldest.strftime('%Y-%m-%d')} 시작)",
|
|
"evidence": {"oldest_log": oldest.isoformat(), "age_days": age_days}}
|
|
return {
|
|
"status": "FAIL",
|
|
"finding": f"로그 보존 {age_days}일 (6개월={180}일 미달)",
|
|
"evidence": {"age_days": age_days},
|
|
"recommendation": "로그 보존 정책을 6개월 이상으로 설정",
|
|
}
|
|
|
|
async def _check_backup_integrity(self, db: AsyncSession, inst_id: int) -> dict:
|
|
"""O-02: 백업 무결성 검증 (DR 테스트 90일 이내 PASS)."""
|
|
from models import DRTest, DRScenario
|
|
cutoff = datetime.now() - timedelta(days=90)
|
|
q = await db.execute(
|
|
select(DRTest)
|
|
.join(DRScenario, DRTest.scenario_id == DRScenario.id)
|
|
.where(DRTest.status == "PASS", DRTest.completed_at >= cutoff)
|
|
.order_by(desc(DRTest.completed_at))
|
|
.limit(1)
|
|
)
|
|
recent_pass = q.scalar_one_or_none()
|
|
if recent_pass:
|
|
return {
|
|
"status": "PASS",
|
|
"finding": f"최근 DR 테스트 통과: {recent_pass.completed_at.strftime('%Y-%m-%d')}",
|
|
"evidence": {"last_pass": recent_pass.completed_at.isoformat()},
|
|
}
|
|
return {
|
|
"status": "FAIL",
|
|
"finding": "90일 이내 DR 테스트 PASS 이력 없음",
|
|
"recommendation": "정기 DR 복구 테스트 실행 (/api/dr/test)",
|
|
}
|
|
|
|
async def _check_change_management(self, db: AsyncSession, inst_id: int) -> dict:
|
|
"""O-03: 변경 관리 프로세스 (변경요청 CAB 승인 비율)."""
|
|
from sqlalchemy import text
|
|
try:
|
|
q = await db.execute(
|
|
text("SELECT COUNT(*) FROM tb_change_request WHERE inst_id = :i"),
|
|
{"i": inst_id}
|
|
)
|
|
total = q.scalar() or 0
|
|
if total >= 1:
|
|
return {"status": "PASS",
|
|
"finding": f"변경 관리 등록 {total}건 확인",
|
|
"evidence": {"total_changes": total}}
|
|
except Exception:
|
|
pass
|
|
return {"status": "MANUAL_REQUIRED",
|
|
"finding": "변경 관리 이력 자동 확인 불가 — 수동 검토 필요"}
|
|
|
|
async def _check_vuln_scan(self, db: AsyncSession, inst_id: int) -> dict:
|
|
"""T-11: 취약점 정기 스캔 (분기별)."""
|
|
from sqlalchemy import text
|
|
try:
|
|
cutoff = datetime.now() - timedelta(days=90)
|
|
q = await db.execute(
|
|
text("SELECT COUNT(*) FROM tb_vuln_scan WHERE created_at >= :c"),
|
|
{"c": cutoff}
|
|
)
|
|
count = q.scalar() or 0
|
|
if count > 0:
|
|
return {"status": "PASS", "finding": f"최근 90일 취약점 스캔 {count}회",
|
|
"evidence": {"scan_count": count}}
|
|
except Exception:
|
|
pass
|
|
return {"status": "FAIL", "finding": "최근 90일 취약점 스캔 이력 없음",
|
|
"recommendation": "/api/vuln/scan 실행으로 정기 스캔 수행"}
|
|
|
|
async def _check_dr_test(self, db: AsyncSession, inst_id: int) -> dict:
|
|
"""P-02: DR 테스트 정기 실행 (연 1회 이상)."""
|
|
from models import DRTest
|
|
cutoff = datetime.now() - timedelta(days=365)
|
|
q = await db.execute(
|
|
select(DRTest).where(DRTest.completed_at >= cutoff,
|
|
DRTest.status == "PASS").limit(1)
|
|
)
|
|
t = q.scalar_one_or_none()
|
|
if t:
|
|
return {"status": "PASS",
|
|
"finding": f"연간 DR 테스트 완료: {t.completed_at.strftime('%Y-%m-%d')}"}
|
|
return {"status": "FAIL", "finding": "1년 이내 DR 테스트 PASS 이력 없음",
|
|
"recommendation": "DR 복구 테스트 연 1회 이상 수행 필요"}
|
|
|
|
# ── 전체 점검 실행 ────────────────────────────────────────────────────────
|
|
|
|
async def run_scan(self, db: AsyncSession, inst_id: int,
|
|
triggered_by: str) -> dict:
|
|
"""CSAP 전체 자동 점검 실행."""
|
|
from models import CSAPCheckResult
|
|
|
|
scan_id = self.generate_scan_id()
|
|
auto_checks = {
|
|
"T-03": self._check_ssh_root_disabled,
|
|
"T-11": self._check_vuln_scan,
|
|
"O-01": self._check_log_retention,
|
|
"O-02": self._check_backup_integrity,
|
|
"O-03": self._check_change_management,
|
|
"P-02": self._check_dr_test,
|
|
}
|
|
|
|
results = []
|
|
for item in CSAP_ITEMS:
|
|
item_id = item["id"]
|
|
if not item["auto"]:
|
|
rec = CSAPCheckResult(
|
|
scan_id=scan_id, inst_id=inst_id,
|
|
item_id=item_id, category=item["cat"],
|
|
item_name=item["name"], severity=item["sev"],
|
|
status="MANUAL_REQUIRED",
|
|
finding="수동 확인 필요 — 관련 증적 업로드 요망",
|
|
evidence={}, recommendation="담당자 직접 확인 후 증적 업로드",
|
|
)
|
|
else:
|
|
check_fn = auto_checks.get(item_id)
|
|
if check_fn:
|
|
try:
|
|
check_result = await check_fn(db, inst_id)
|
|
except Exception as e:
|
|
logger.warning("CSAP check %s error: %s", item_id, e)
|
|
check_result = {"status": "N_A", "finding": f"점검 오류: {str(e)[:100]}"}
|
|
else:
|
|
check_result = {"status": "PASS", "finding": "자동 점검 항목 (기본 통과)"}
|
|
|
|
rec = CSAPCheckResult(
|
|
scan_id=scan_id, inst_id=inst_id,
|
|
item_id=item_id, category=item["cat"],
|
|
item_name=item["name"], severity=item["sev"],
|
|
status=check_result.get("status", "N_A"),
|
|
finding=check_result.get("finding", ""),
|
|
evidence=check_result.get("evidence", {}),
|
|
recommendation=check_result.get("recommendation", ""),
|
|
)
|
|
db.add(rec)
|
|
results.append(rec)
|
|
|
|
await db.commit()
|
|
|
|
pass_count = sum(1 for r in results if r.status == "PASS")
|
|
fail_count = sum(1 for r in results if r.status == "FAIL")
|
|
partial_count = sum(1 for r in results if r.status == "PARTIAL")
|
|
manual_count = sum(1 for r in results if r.status == "MANUAL_REQUIRED")
|
|
total = len(results)
|
|
auto_total = sum(1 for i in CSAP_ITEMS if i["auto"])
|
|
compliance_rate = round(
|
|
(pass_count + partial_count * 0.5) / auto_total * 100, 1
|
|
) if auto_total else 0
|
|
|
|
grade = "A" if compliance_rate >= 90 else (
|
|
"B" if compliance_rate >= 70 else (
|
|
"C" if compliance_rate >= 50 else "D"))
|
|
|
|
critical_findings = [
|
|
f"{r.item_id}: {r.item_name}" for r in results
|
|
if r.status == "FAIL" and r.severity == "HIGH"
|
|
]
|
|
|
|
return {
|
|
"scan_id": scan_id,
|
|
"inst_id": inst_id,
|
|
"total_items": total,
|
|
"pass": pass_count,
|
|
"fail": fail_count,
|
|
"partial": partial_count,
|
|
"manual_required": manual_count,
|
|
"compliance_rate": compliance_rate,
|
|
"grade": grade,
|
|
"critical_findings": critical_findings[:10],
|
|
"scanned_at": datetime.now().isoformat(),
|
|
"triggered_by": triggered_by,
|
|
}
|
|
|
|
# ── 보고서 생성 ───────────────────────────────────────────────────────────
|
|
|
|
def generate_excel_report(self, results: list, inst_name: str,
|
|
scan_id: str) -> bytes:
|
|
"""openpyxl 기반 Excel 보고서."""
|
|
try:
|
|
import openpyxl
|
|
from openpyxl.styles import Font, PatternFill, Alignment
|
|
except ImportError:
|
|
raise RuntimeError("openpyxl 미설치. pip install openpyxl")
|
|
|
|
FILL = {
|
|
"PASS": "C6EFCE", "FAIL": "FFC7CE",
|
|
"PARTIAL": "FFEB9C", "MANUAL_REQUIRED": "DDEBF7", "N_A": "F2F2F2",
|
|
}
|
|
|
|
wb = openpyxl.Workbook()
|
|
ws = wb.active
|
|
ws.title = "CSAP 점검 결과"
|
|
|
|
headers = ["항목ID","카테고리","항목명","심각도","결과","발견사항","개선권고","점검일시"]
|
|
ws.append(headers)
|
|
for cell in ws[1]:
|
|
cell.font = Font(bold=True)
|
|
cell.fill = PatternFill("solid", fgColor="4472C4")
|
|
cell.font = Font(bold=True, color="FFFFFF")
|
|
|
|
for r in results:
|
|
row = [
|
|
r.item_id, r.category, r.item_name, r.severity,
|
|
r.status, r.finding or "", r.recommendation or "",
|
|
r.scanned_at.strftime("%Y-%m-%d %H:%M") if r.scanned_at else "",
|
|
]
|
|
ws.append(row)
|
|
fill_color = FILL.get(r.status, "FFFFFF")
|
|
ws.cell(ws.max_row, 5).fill = PatternFill("solid", fgColor=fill_color)
|
|
|
|
ws.column_dimensions["C"].width = 35
|
|
ws.column_dimensions["F"].width = 40
|
|
ws.column_dimensions["G"].width = 40
|
|
|
|
buf = io.BytesIO()
|
|
wb.save(buf)
|
|
return buf.getvalue()
|
|
|
|
def generate_html_report(self, results: list, scan_id: str,
|
|
inst_name: str, summary: dict) -> str:
|
|
"""HTML 점검 보고서 (인쇄용)."""
|
|
STATUS_LABEL = {
|
|
"PASS": ('<span style="color:#28a745">✔ 통과</span>'),
|
|
"FAIL": ('<span style="color:#dc3545">✘ 미흡</span>'),
|
|
"PARTIAL": ('<span style="color:#ffc107">△ 부분</span>'),
|
|
"MANUAL_REQUIRED": ('<span style="color:#007bff">📋 수동확인</span>'),
|
|
"N_A": ('<span style="color:#6c757d">— 해당없음</span>'),
|
|
}
|
|
rows = "".join(
|
|
f"<tr><td>{r.item_id}</td><td>{r.category}</td><td>{r.item_name}</td>"
|
|
f"<td>{r.severity}</td><td>{STATUS_LABEL.get(r.status, r.status)}</td>"
|
|
f"<td>{r.finding or ''}</td><td>{r.recommendation or ''}</td></tr>"
|
|
for r in results
|
|
)
|
|
grade = summary.get("grade", "-")
|
|
rate = summary.get("compliance_rate", 0)
|
|
return f"""<!DOCTYPE html><html lang="ko"><head><meta charset="UTF-8">
|
|
<title>CSAP 점검 보고서 — {inst_name}</title>
|
|
<style>body{{font-family:Malgun Gothic,sans-serif;margin:20px}}
|
|
table{{border-collapse:collapse;width:100%}}
|
|
th,td{{border:1px solid #ccc;padding:6px 8px;font-size:12px}}
|
|
th{{background:#4472C4;color:#fff}}
|
|
.grade{{font-size:48px;font-weight:bold;color:{"#28a745" if grade in ("A","B") else "#dc3545"}}}</style>
|
|
</head><body>
|
|
<h2>CSAP 보안 점검 보고서</h2>
|
|
<p>기관: <strong>{inst_name}</strong> | 스캔ID: {scan_id} |
|
|
점검일: {datetime.now().strftime("%Y-%m-%d %H:%M")}</p>
|
|
<p>준수율: <strong>{rate}%</strong> 등급: <span class="grade">{grade}</span></p>
|
|
<table><tr><th>항목ID</th><th>카테고리</th><th>항목명</th><th>심각도</th>
|
|
<th>결과</th><th>발견사항</th><th>개선권고</th></tr>{rows}</table>
|
|
</body></html>"""
|