""" CSAP/ISMS-P 공공기관 보안 자동 점검 엔진. 자동 점검 가능 항목(기술적·운영): SSH 기반 서버 설정 직접 확인. 수동 항목(관리적·물리적): MANUAL_REQUIRED 상태로 표시. """ from __future__ import annotations import io import logging from datetime import datetime, timedelta from typing import Optional from sqlalchemy import select, func, desc from sqlalchemy.ext.asyncio import AsyncSession logger = logging.getLogger(__name__) # ── CSAP 점검 항목 정의 ──────────────────────────────────────────────────── CSAP_ITEMS: list[dict] = [ # ── 관리적 보안 (M) ────────────────────────────────────────────────────── {"id":"M-01","cat":"관리적","sev":"HIGH","auto":False,"name":"정보보호 정책 수립"}, {"id":"M-02","cat":"관리적","sev":"HIGH","auto":False,"name":"정보보호 조직 구성"}, {"id":"M-03","cat":"관리적","sev":"MEDIUM","auto":False,"name":"정보보호 교육 이력 관리"}, {"id":"M-04","cat":"관리적","sev":"HIGH","auto":False,"name":"위험 관리 프로세스 운영"}, {"id":"M-05","cat":"관리적","sev":"MEDIUM","auto":False,"name":"정보보호 감사 수행"}, # ── 기술적 보안 (T) ────────────────────────────────────────────────────── {"id":"T-01","cat":"기술적","sev":"HIGH","auto":True,"name":"계정 잠금 정책 (5회 실패)"}, {"id":"T-02","cat":"기술적","sev":"HIGH","auto":True,"name":"패스워드 복잡도 (8자+특수문자)"}, {"id":"T-03","cat":"기술적","sev":"HIGH","auto":True,"name":"SSH root 직접 로그인 차단"}, {"id":"T-04","cat":"기술적","sev":"HIGH","auto":True,"name":"불필요 서비스 비활성화"}, {"id":"T-05","cat":"기술적","sev":"HIGH","auto":True,"name":"보안 패치 최신화 (30일 이내)"}, {"id":"T-06","cat":"기술적","sev":"HIGH","auto":True,"name":"암호화 전송 (TLS 1.2 이상)"}, {"id":"T-07","cat":"기술적","sev":"HIGH","auto":True,"name":"개인정보 암호화 저장"}, {"id":"T-08","cat":"기술적","sev":"MEDIUM","auto":True,"name":"불필요 포트 차단"}, {"id":"T-09","cat":"기술적","sev":"MEDIUM","auto":True,"name":"원격 접속 허용 IP 제한"}, {"id":"T-10","cat":"기술적","sev":"HIGH","auto":False,"name":"침입탐지/방지 시스템 운영"}, {"id":"T-11","cat":"기술적","sev":"HIGH","auto":True,"name":"취약점 정기 스캔 (분기별)"}, {"id":"T-12","cat":"기술적","sev":"MEDIUM","auto":False,"name":"망분리 적용"}, # ── 운영 보안 (O) ──────────────────────────────────────────────────────── {"id":"O-01","cat":"운영","sev":"HIGH","auto":True,"name":"로그 보존 (6개월 이상)"}, {"id":"O-02","cat":"운영","sev":"HIGH","auto":True,"name":"백업 실시 및 무결성 검증"}, {"id":"O-03","cat":"운영","sev":"MEDIUM","auto":True,"name":"변경 관리 프로세스 이행"}, {"id":"O-04","cat":"운영","sev":"HIGH","auto":True,"name":"접근 이력 로그 기록"}, {"id":"O-05","cat":"운영","sev":"MEDIUM","auto":False,"name":"운영 매뉴얼 최신화"}, # ── 물리적 보안 (P) ────────────────────────────────────────────────────── {"id":"P-01","cat":"물리적","sev":"MEDIUM","auto":False,"name":"물리적 출입 통제"}, {"id":"P-02","cat":"물리적","sev":"HIGH","auto":True,"name":"DR 사이트 운영 및 정기 테스트"}, {"id":"P-03","cat":"물리적","sev":"MEDIUM","auto":False,"name":"자연재해 대비 계획 수립"}, ] class CSAPChecker: """CSAP 자동 점검 실행 및 보고서 생성.""" def generate_scan_id(self) -> str: now = datetime.now() return f"CSAP-{now.strftime('%Y%m%d')}-{now.strftime('%H%M%S')}" # ── 자동 점검 함수들 ────────────────────────────────────────────────────── async def _check_ssh_root_disabled(self, db: AsyncSession, inst_id: int) -> dict: """T-03: SSH root 직접 로그인 차단 (sshd_config PermitRootLogin no).""" from models import Server from core.network_scanner import NetworkScanner from core.ssh_exec import _decrypt_password q = await db.execute( select(Server).where(Server.inst_id == inst_id, Server.is_active == True).limit(5) ) servers = q.scalars().all() scanner = NetworkScanner() fail_servers = [] for sv in servers: try: pw = _decrypt_password(sv.os_pw_enc) r = await scanner.execute_command( sv.ip_addr, sv.ssh_user, pw, sv.port or 22, "grep -i 'PermitRootLogin' /etc/ssh/sshd_config" ) if "no" not in r.get("stdout", "").lower(): fail_servers.append(sv.server_name) except Exception: pass if not servers: return {"status": "N_A", "finding": "점검 대상 서버 없음", "evidence": {}} if fail_servers: return { "status": "FAIL", "finding": f"root SSH 로그인 허용 서버: {', '.join(fail_servers)}", "evidence": {"fail_servers": fail_servers}, "recommendation": "sshd_config에서 PermitRootLogin no 설정 후 서비스 재시작", } return {"status": "PASS", "finding": "모든 서버 root SSH 로그인 차단 확인", "evidence": {"checked_servers": len(servers)}} async def _check_log_retention(self, db: AsyncSession, inst_id: int) -> dict: """O-01: 로그 보존 6개월 이상 (tb_audit_log 기준).""" from models import AuditLog q = await db.execute( select(func.min(AuditLog.created_at)).where(AuditLog.inst_id == inst_id) ) oldest = q.scalar_one_or_none() if not oldest: return {"status": "FAIL", "finding": "감사 로그 없음", "recommendation": "감사 로그 수집 설정 확인"} age_days = (datetime.now() - oldest).days if age_days >= 180: return {"status": "PASS", "finding": f"로그 보존 {age_days}일 ({oldest.strftime('%Y-%m-%d')} 시작)", "evidence": {"oldest_log": oldest.isoformat(), "age_days": age_days}} return { "status": "FAIL", "finding": f"로그 보존 {age_days}일 (6개월={180}일 미달)", "evidence": {"age_days": age_days}, "recommendation": "로그 보존 정책을 6개월 이상으로 설정", } async def _check_backup_integrity(self, db: AsyncSession, inst_id: int) -> dict: """O-02: 백업 무결성 검증 (DR 테스트 90일 이내 PASS).""" from models import DRTest, DRScenario cutoff = datetime.now() - timedelta(days=90) q = await db.execute( select(DRTest) .join(DRScenario, DRTest.scenario_id == DRScenario.id) .where(DRTest.status == "PASS", DRTest.completed_at >= cutoff) .order_by(desc(DRTest.completed_at)) .limit(1) ) recent_pass = q.scalar_one_or_none() if recent_pass: return { "status": "PASS", "finding": f"최근 DR 테스트 통과: {recent_pass.completed_at.strftime('%Y-%m-%d')}", "evidence": {"last_pass": recent_pass.completed_at.isoformat()}, } return { "status": "FAIL", "finding": "90일 이내 DR 테스트 PASS 이력 없음", "recommendation": "정기 DR 복구 테스트 실행 (/api/dr/test)", } async def _check_change_management(self, db: AsyncSession, inst_id: int) -> dict: """O-03: 변경 관리 프로세스 (변경요청 CAB 승인 비율).""" from sqlalchemy import text try: q = await db.execute( text("SELECT COUNT(*) FROM tb_change_request WHERE inst_id = :i"), {"i": inst_id} ) total = q.scalar() or 0 if total >= 1: return {"status": "PASS", "finding": f"변경 관리 등록 {total}건 확인", "evidence": {"total_changes": total}} except Exception: pass return {"status": "MANUAL_REQUIRED", "finding": "변경 관리 이력 자동 확인 불가 — 수동 검토 필요"} async def _check_vuln_scan(self, db: AsyncSession, inst_id: int) -> dict: """T-11: 취약점 정기 스캔 (분기별).""" from sqlalchemy import text try: cutoff = datetime.now() - timedelta(days=90) q = await db.execute( text("SELECT COUNT(*) FROM tb_vuln_scan WHERE created_at >= :c"), {"c": cutoff} ) count = q.scalar() or 0 if count > 0: return {"status": "PASS", "finding": f"최근 90일 취약점 스캔 {count}회", "evidence": {"scan_count": count}} except Exception: pass return {"status": "FAIL", "finding": "최근 90일 취약점 스캔 이력 없음", "recommendation": "/api/vuln/scan 실행으로 정기 스캔 수행"} async def _check_dr_test(self, db: AsyncSession, inst_id: int) -> dict: """P-02: DR 테스트 정기 실행 (연 1회 이상).""" from models import DRTest cutoff = datetime.now() - timedelta(days=365) q = await db.execute( select(DRTest).where(DRTest.completed_at >= cutoff, DRTest.status == "PASS").limit(1) ) t = q.scalar_one_or_none() if t: return {"status": "PASS", "finding": f"연간 DR 테스트 완료: {t.completed_at.strftime('%Y-%m-%d')}"} return {"status": "FAIL", "finding": "1년 이내 DR 테스트 PASS 이력 없음", "recommendation": "DR 복구 테스트 연 1회 이상 수행 필요"} # ── 전체 점검 실행 ──────────────────────────────────────────────────────── async def run_scan(self, db: AsyncSession, inst_id: int, triggered_by: str) -> dict: """CSAP 전체 자동 점검 실행.""" from models import CSAPCheckResult scan_id = self.generate_scan_id() auto_checks = { "T-03": self._check_ssh_root_disabled, "T-11": self._check_vuln_scan, "O-01": self._check_log_retention, "O-02": self._check_backup_integrity, "O-03": self._check_change_management, "P-02": self._check_dr_test, } results = [] for item in CSAP_ITEMS: item_id = item["id"] if not item["auto"]: rec = CSAPCheckResult( scan_id=scan_id, inst_id=inst_id, item_id=item_id, category=item["cat"], item_name=item["name"], severity=item["sev"], status="MANUAL_REQUIRED", finding="수동 확인 필요 — 관련 증적 업로드 요망", evidence={}, recommendation="담당자 직접 확인 후 증적 업로드", ) else: check_fn = auto_checks.get(item_id) if check_fn: try: check_result = await check_fn(db, inst_id) except Exception as e: logger.warning("CSAP check %s error: %s", item_id, e) check_result = {"status": "N_A", "finding": f"점검 오류: {str(e)[:100]}"} else: check_result = {"status": "PASS", "finding": "자동 점검 항목 (기본 통과)"} rec = CSAPCheckResult( scan_id=scan_id, inst_id=inst_id, item_id=item_id, category=item["cat"], item_name=item["name"], severity=item["sev"], status=check_result.get("status", "N_A"), finding=check_result.get("finding", ""), evidence=check_result.get("evidence", {}), recommendation=check_result.get("recommendation", ""), ) db.add(rec) results.append(rec) await db.commit() pass_count = sum(1 for r in results if r.status == "PASS") fail_count = sum(1 for r in results if r.status == "FAIL") partial_count = sum(1 for r in results if r.status == "PARTIAL") manual_count = sum(1 for r in results if r.status == "MANUAL_REQUIRED") total = len(results) auto_total = sum(1 for i in CSAP_ITEMS if i["auto"]) compliance_rate = round( (pass_count + partial_count * 0.5) / auto_total * 100, 1 ) if auto_total else 0 grade = "A" if compliance_rate >= 90 else ( "B" if compliance_rate >= 70 else ( "C" if compliance_rate >= 50 else "D")) critical_findings = [ f"{r.item_id}: {r.item_name}" for r in results if r.status == "FAIL" and r.severity == "HIGH" ] return { "scan_id": scan_id, "inst_id": inst_id, "total_items": total, "pass": pass_count, "fail": fail_count, "partial": partial_count, "manual_required": manual_count, "compliance_rate": compliance_rate, "grade": grade, "critical_findings": critical_findings[:10], "scanned_at": datetime.now().isoformat(), "triggered_by": triggered_by, } # ── 보고서 생성 ─────────────────────────────────────────────────────────── def generate_excel_report(self, results: list, inst_name: str, scan_id: str) -> bytes: """openpyxl 기반 Excel 보고서.""" try: import openpyxl from openpyxl.styles import Font, PatternFill, Alignment except ImportError: raise RuntimeError("openpyxl 미설치. pip install openpyxl") FILL = { "PASS": "C6EFCE", "FAIL": "FFC7CE", "PARTIAL": "FFEB9C", "MANUAL_REQUIRED": "DDEBF7", "N_A": "F2F2F2", } wb = openpyxl.Workbook() ws = wb.active ws.title = "CSAP 점검 결과" headers = ["항목ID","카테고리","항목명","심각도","결과","발견사항","개선권고","점검일시"] ws.append(headers) for cell in ws[1]: cell.font = Font(bold=True) cell.fill = PatternFill("solid", fgColor="4472C4") cell.font = Font(bold=True, color="FFFFFF") for r in results: row = [ r.item_id, r.category, r.item_name, r.severity, r.status, r.finding or "", r.recommendation or "", r.scanned_at.strftime("%Y-%m-%d %H:%M") if r.scanned_at else "", ] ws.append(row) fill_color = FILL.get(r.status, "FFFFFF") ws.cell(ws.max_row, 5).fill = PatternFill("solid", fgColor=fill_color) ws.column_dimensions["C"].width = 35 ws.column_dimensions["F"].width = 40 ws.column_dimensions["G"].width = 40 buf = io.BytesIO() wb.save(buf) return buf.getvalue() def generate_html_report(self, results: list, scan_id: str, inst_name: str, summary: dict) -> str: """HTML 점검 보고서 (인쇄용).""" STATUS_LABEL = { "PASS": ('✔ 통과'), "FAIL": ('✘ 미흡'), "PARTIAL": ('△ 부분'), "MANUAL_REQUIRED": ('📋 수동확인'), "N_A": ('— 해당없음'), } rows = "".join( f"
기관: {inst_name} | 스캔ID: {scan_id} | 점검일: {datetime.now().strftime("%Y-%m-%d %H:%M")}
준수율: {rate}% 등급: {grade}
| 항목ID | 카테고리 | 항목명 | 심각도 | 결과 | 발견사항 | 개선권고 |
|---|