"""D-4 보안 취약점 자동 스캔 테스트""" import sys, ast, os os.environ.setdefault("GUARDIA_SECRET_KEY", "test-d4-secret-key-32bytes-padded!") os.environ.setdefault("DATABASE_URL", "sqlite+aiosqlite:///./test_d4.db") ok = True print("=== 1. 구문 검사 ===") files = ["core/vuln_scan.py", "routers/vuln_scan.py", "main.py"] for f in files: try: with open(f, encoding="utf-8") as fh: src = fh.read() ast.parse(src) print(f" OK {f}") except SyntaxError as e: print(f" ERR {f}: {e}") ok = False print("\n=== 2. core/vuln_scan.py 핵심 함수 확인 ===") with open("core/vuln_scan.py", encoding="utf-8") as f: vuln_src = f.read() checks = [ ("def scan_ports", "포트 스캔 함수"), ("def _scan_port", "단일 포트 테스트 함수"), ("def _grab_banner", "배너 그랩 함수"), ("def check_version_vulns", "버전 취약점 체크"), ("def check_config_issues", "설정 취약점 체크"), ("async def run_vulnerability_scan", "통합 스캔 함수"), ("def calculate_cvss_simplified", "CVSS 계산 함수"), ("VULN_VERSION_PATTERNS", "CVE 패턴 DB"), ("DANGER_PORTS", "위험 포트 목록"), ("DEFAULT_PORT_SERVICES", "기본 포트-서비스 매핑"), ("REQUIRED_SECURITY_HEADERS", "필수 보안 헤더 목록"), ("CVE-2021-41773", "Apache 경로순회 CVE"), ("CVE-2022-0543", "Redis 취약점 CVE"), ("async def _llm_analyze_findings", "LLM 보조 분석 함수"), ("localhost:11434", "내부 Ollama LLM 사용"), ("risk_score", "위험 점수 계산"), ] for sym, desc in checks: status = "OK" if sym in vuln_src else "ERR" if status == "ERR": ok = False print(f" {status} {desc}") print("\n=== 3. routers/vuln_scan.py 엔드포인트 확인 ===") with open("routers/vuln_scan.py", encoding="utf-8") as f: router_src = f.read() ep_checks = [ ('@router.post("/scan"', "POST /scan"), ('@router.get("/scans"', "GET /scans"), ('/scans/{scan_id}', "GET /scans/{scan_id}"), ('@router.post("/quick-check"',"POST /quick-check"), ('/cve/{cve_id}', "GET /cve/{cve_id}"), ('@router.post("/cvss"', "POST /cvss"), ('@router.get("/stats"', "GET /stats"), ('@router.get("/policies"', "GET /policies"), ("BackgroundTasks", "비동기 백그라운드 스캔"), ("UserRole.ADMIN", "ADMIN 권한 검증"), ("status_code=202", "202 Accepted"), ] for sym, desc in ep_checks: status = "OK" if sym in router_src else "ERR" if status == "ERR": ok = False print(f" {status} {desc}") print("\n=== 4. main.py D-4 등록 확인 ===") with open("main.py", encoding="utf-8") as f: main_src = f.read() for sym, desc in [ ("vuln_scan", "vuln_scan import"), ("vuln_scan.router", "vuln_scan 라우터 등록"), ("D-4", "D-4 섹션 주석"), ]: status = "OK" if sym in main_src else "ERR" if status == "ERR": ok = False print(f" {status} {desc}") print("\n=== 5. 취약점 엔진 핵심 로직 검증 ===") try: import sys as _sys, importlib.util as _ilu, time as _t mod_name = f"_vuln_{int(_t.time()*1000)}" spec = _ilu.spec_from_file_location(mod_name, "core/vuln_scan.py") m = _ilu.module_from_spec(spec) _sys.modules[mod_name] = m spec.loader.exec_module(m) # 버전 취약점 체크 vulns = m.check_version_vulns("Server: Apache/2.4.49 (Unix)") cves = [v["cve"] for v in vulns] assert "CVE-2021-41773" in cves, f"Apache 경로순회 CVE 미탐지: {cves}" print(f" OK Apache/2.4.49 → CVE-2021-41773 탐지") vulns2 = m.check_version_vulns("OpenSSH_7.2p2 Ubuntu") cves2 = [v["cve"] for v in vulns2] assert "CVE-2023-38408" in cves2, f"OpenSSH CVE 미탐지: {cves2}" print(f" OK OpenSSH_7 → CVE-2023-38408 탐지") vulns3 = m.check_version_vulns("nginx/1.18.0 (Ubuntu)") # nginx/1.1 패턴은 1.1x에 해당하지 않으므로 미탐지가 정상 print(f" OK nginx/1.18 탐지 없음 ({len(vulns3)}개) - 정상") # 설정 취약점 체크 open_ports = [{"port": 23}, {"port": 80}, {"port": 443}, {"port": 6379}] issues = m.check_config_issues("192.168.1.1", open_ports) issue_names = [i["check"] for i in issues] assert "Telnet 활성화" in issue_names, f"Telnet 미탐지: {issue_names}" assert "Redis 외부 노출" in issue_names, f"Redis 미탐지: {issue_names}" print(f" OK Telnet/Redis 설정 이슈 탐지") # 위험 포트 체크 assert 23 in m.DANGER_PORTS, "Telnet이 DANGER_PORTS에 없음" assert 3389 in m.DANGER_PORTS, "RDP가 DANGER_PORTS에 없음" assert 445 in m.DANGER_PORTS, "SMB가 DANGER_PORTS에 없음" print(f" OK DANGER_PORTS: {sorted(m.DANGER_PORTS)}") except AssertionError as e: print(f" ERR {e}") ok = False except Exception as e: print(f" ERR 취약점 로직 오류: {type(e).__name__}: {e}") ok = False finally: _sys.modules.pop(mod_name, None) print("\n=== 6. CVSS 점수 계산 검증 ===") try: mod_name2 = f"_vuln2_{int(_t.time()*1000)}" spec2 = _ilu.spec_from_file_location(mod_name2, "core/vuln_scan.py") m2 = _ilu.module_from_spec(spec2) _sys.modules[mod_name2] = m2 spec2.loader.exec_module(m2) # CRITICAL: NETWORK + LOW + NONE + HIGH score = m2.calculate_cvss_simplified("NETWORK", "LOW", "NONE", "HIGH") assert score >= 9.0, f"CVSS CRITICAL 기대 >= 9.0: {score}" print(f" OK NETWORK/LOW/NONE/HIGH → CVSS {score} (CRITICAL)") # LOW: LOCAL + HIGH + HIGH + LOW score2 = m2.calculate_cvss_simplified("LOCAL", "HIGH", "HIGH", "LOW") assert score2 < 5.0, f"CVSS LOW 기대 < 5.0: {score2}" print(f" OK LOCAL/HIGH/HIGH/LOW → CVSS {score2} (낮음)") # NONE impact → 0.0 score3 = m2.calculate_cvss_simplified("NETWORK", "LOW", "NONE", "NONE") assert score3 == 0.0, f"impact=NONE → 0.0 기대: {score3}" print(f" OK impact=NONE → CVSS {score3}") except AssertionError as e: print(f" ERR {e}") ok = False except Exception as e: print(f" ERR CVSS 계산 오류: {type(e).__name__}: {e}") ok = False finally: _sys.modules.pop(mod_name2, None) print("\n=== 7. 위험 점수 계산 검증 ===") # severity_summary 기반 risk_score 계산 cases = [ ({"CRITICAL": 3, "HIGH": 0, "MEDIUM": 0, "LOW": 0}, 75, "CRITICAL 3개 → 75"), ({"CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 0}, 0, "취약점 없음 → 0"), ({"CRITICAL": 5, "HIGH": 5, "MEDIUM": 5, "LOW": 5}, 100, "복합 → 100 캡"), ] for sev, expected_min, desc in cases: score = min(100, ( sev.get("CRITICAL", 0) * 25 + sev.get("HIGH", 0) * 10 + sev.get("MEDIUM", 0) * 5 + sev.get("LOW", 0) * 1 )) status = "OK" if score >= expected_min else "ERR" if status == "ERR": ok = False print(f" {status} {desc}: score={score}") print("\n=== 8. 보안 정책 준수 확인 ===") policy_checks = [ ("외부 API 금지" in vuln_src or "외부 취약점 DB" in vuln_src, "외부 DB/API 금지 명시"), ("localhost:11434" in vuln_src, "내부 LLM만 사용"), ("graceful fallback" in vuln_src or "None" in vuln_src, "LLM 실패 시 폴백"), ("root" in vuln_src, "root 접속 제한 언급"), (vuln_src.count("CVE-") >= 5, "5개 이상 CVE 패턴"), ] for check, desc in policy_checks: status = "OK" if check else "ERR" if status == "ERR": ok = False print(f" {status} {desc}") print("\n=== 9. CVE 패턴 DB 완성도 확인 ===") try: mod_name3 = f"_vuln3_{int(_t.time()*1001)}" spec3 = _ilu.spec_from_file_location(mod_name3, "core/vuln_scan.py") m3 = _ilu.module_from_spec(spec3) _sys.modules[mod_name3] = m3 spec3.loader.exec_module(m3) patterns = m3.VULN_VERSION_PATTERNS assert len(patterns) >= 5, f"CVE 패턴이 5개 미만: {len(patterns)}" print(f" OK CVE 패턴 {len(patterns)}개 등록") severities = {p[3] for p in patterns} assert "CRITICAL" in severities, "CRITICAL 심각도 패턴 없음" assert "HIGH" in severities, "HIGH 심각도 패턴 없음" print(f" OK 심각도 수준: {sorted(severities)}") cve_ids = [p[2] for p in patterns] assert len(set(cve_ids)) == len(cve_ids), "CVE ID 중복 존재" print(f" OK CVE ID 중복 없음: {cve_ids}") except AssertionError as e: print(f" ERR {e}") ok = False except Exception as e: print(f" ERR CVE DB 확인 오류: {type(e).__name__}: {e}") ok = False finally: _sys.modules.pop(mod_name3, None) print("\n=== 10. scan_id 고유성 확인 ===") import hashlib, time ids = set() for i in range(5): time.sleep(0.001) scan_id = hashlib.sha256( f"192.168.1.{i}:{__import__('datetime').datetime.utcnow().isoformat()}:user{i}".encode() ).hexdigest()[:12] ids.add(scan_id) assert len(ids) == 5, f"scan_id 중복: {ids}" print(f" OK scan_id 5개 모두 고유: 예) {list(ids)[0]}") print("\n=== D-4 취약점 스캔 테스트 완료 ===") if ok: print("모든 검사 통과") else: sys.exit(1)