""" D-4: 보안 취약점 자동 스캔 엔진 기능: 1. 포트 스캔 (열린 포트 및 서비스 감지) 2. 기본 자격증명 체크 (기본 SSH 계정/패스워드 시도 금지 — 감지만) 3. 웹 취약점 패턴 스캔 (응답 헤더, 버전 노출 등) 4. 설정 취약점 감지 (root SSH 허용 여부, 평문 전송 등) 5. CVE 매핑 (서비스 버전 → 알려진 취약점) 6. Ollama sLLM 보조 분석 (내부 LLM — 외부 API 금지) 보안 제약: - 인가된 대상에만 스캔 수행 - root 계정 활용 금지 (opsagent만) - 스택트레이스 API 노출 금지 - 외부 취약점 DB 조회 금지 (내부망 전용) """ from __future__ import annotations import asyncio import hashlib import logging import socket from datetime import datetime from typing import Dict, List, Optional, Tuple logger = logging.getLogger(__name__) # ── 취약점 심각도 ───────────────────────────────────────────────────────────── SEVERITY = { "CRITICAL": 4, "HIGH": 3, "MEDIUM": 2, "LOW": 1, "INFO": 0, } # ── 기본 포트 서비스 매핑 ────────────────────────────────────────────────────── DEFAULT_PORT_SERVICES = { 21: "FTP", 22: "SSH", 23: "Telnet", 25: "SMTP", 53: "DNS", 80: "HTTP", 110: "POP3", 143: "IMAP", 443: "HTTPS", 445: "SMB", 1433: "MSSQL", 1521: "Oracle", 3306: "MySQL", 3389: "RDP", 5432: "PostgreSQL", 5900: "VNC", 6379: "Redis", 8080: "HTTP-Alt", 8443: "HTTPS-Alt", 27017: "MongoDB", } # ── 위험 포트 (즉시 경보) ────────────────────────────────────────────────────── DANGER_PORTS = {23, 445, 3389, 5900} # Telnet, SMB, RDP, VNC # ── 취약 서비스 버전 패턴 (내부 DB) ─────────────────────────────────────────── VULN_VERSION_PATTERNS = [ # (service, version_pattern, cve_id, severity, description) ("OpenSSH", "OpenSSH_7", "CVE-2023-38408", "HIGH", "ssh-add 원격 코드 실행"), ("OpenSSH", "OpenSSH_6", "CVE-2016-0778", "HIGH", "버퍼 오버플로"), ("Apache", "Apache/2.2", "CVE-2017-7679", "CRITICAL","mod_mime 버퍼 오버리드"), ("Apache", "Apache/2.4.49","CVE-2021-41773", "CRITICAL","경로 순회 취약점"), ("nginx", "nginx/1.1", "CVE-2013-2028", "HIGH", "청크 인코딩 스택 버퍼 오버플로"), ("Tomcat", "Apache Tomcat/8","CVE-2020-1938", "CRITICAL","AJP 파일 포함 취약점"), ("MySQL", "5.5.", "CVE-2016-6662", "HIGH", "설정 파일 덮어쓰기"), ("vsftpd", "vsftpd 2.3.4", "CVE-2011-2523", "CRITICAL","백도어"), ("Redis", "Redis", "CVE-2022-0543", "CRITICAL","샌드박스 탈출"), ] # ── 보안 헤더 체크리스트 ────────────────────────────────────────────────────── REQUIRED_SECURITY_HEADERS = [ "Strict-Transport-Security", "X-Frame-Options", "X-Content-Type-Options", "Content-Security-Policy", "X-XSS-Protection", ] def _scan_port(host: str, port: int, timeout: float = 1.0) -> bool: """단일 포트 연결 테스트.""" try: with socket.create_connection((host, port), timeout=timeout): return True except (socket.timeout, ConnectionRefusedError, OSError): return False def _grab_banner(host: str, port: int, timeout: float = 2.0) -> Optional[str]: """서비스 배너 그랩.""" try: with socket.create_connection((host, port), timeout=timeout) as s: s.settimeout(timeout) try: banner = s.recv(1024).decode("utf-8", errors="replace").strip() return banner[:200] if banner else None except Exception: # HTTP 서비스는 먼저 요청 전송 필요 if port in (80, 8080, 443, 8443): s.send(b"GET / HTTP/1.0\r\nHost: " + host.encode() + b"\r\n\r\n") resp = s.recv(2048).decode("utf-8", errors="replace") return resp[:200] return None except Exception: return None def scan_ports( host: str, ports: Optional[List[int]] = None, timeout: float = 1.0, ) -> List[Dict]: """ 포트 스캔 수행. Returns: 열린 포트 정보 리스트 """ if ports is None: ports = list(DEFAULT_PORT_SERVICES.keys()) open_ports = [] for port in ports: if _scan_port(host, port, timeout): service = DEFAULT_PORT_SERVICES.get(port, "Unknown") banner = _grab_banner(host, port, timeout) finding = { "port": port, "service": service, "state": "open", "banner": banner, } # 위험 포트 플래그 if port in DANGER_PORTS: finding["risk"] = "HIGH" finding["note"] = f"{service} 서비스 노출 — 즉시 차단 권고" else: finding["risk"] = "LOW" open_ports.append(finding) return open_ports def check_version_vulns(banner: str, service: str = "") -> List[Dict]: """ 서비스 배너/버전에서 알려진 취약점 패턴 매핑. """ findings = [] text = f"{service} {banner}".lower() for svc, pattern, cve, severity, desc in VULN_VERSION_PATTERNS: if pattern.lower() in text: findings.append({ "cve": cve, "severity": severity, "service": svc, "pattern": pattern, "description": desc, "banner": banner[:100], }) return findings def check_config_issues(host: str, open_ports: List[Dict]) -> List[Dict]: """ 설정 취약점 감지 (포트 조합 기반). """ issues = [] port_set = {p["port"] for p in open_ports} # Telnet 오픈 (평문 전송) if 23 in port_set: issues.append({ "check": "Telnet 활성화", "severity": "HIGH", "detail": "포트 23(Telnet)이 열려 있습니다. 평문 전송으로 자격증명 노출 위험.", "recommend": "Telnet 비활성화 후 SSH로 대체", }) # HTTP + HTTPS 동시 오픈 (HTTP→HTTPS 리다이렉트 미설정 가능) if 80 in port_set and 443 in port_set: issues.append({ "check": "HTTP/HTTPS 동시 노출", "severity": "MEDIUM", "detail": "HTTP(80)와 HTTPS(443)가 동시에 열려 있습니다. HTTP→HTTPS 강제 리다이렉트 확인 필요.", "recommend": "HTTP 접근 시 301 리다이렉트 적용", }) # RDP 오픈 if 3389 in port_set: issues.append({ "check": "RDP 외부 노출", "severity": "CRITICAL", "detail": "포트 3389(RDP)가 열려 있습니다. 브루트포스 및 BlueKeep 공격 벡터.", "recommend": "VPN 뒤로 이동 또는 방화벽 차단, NLA 강제 적용", }) # Redis 오픈 (인증 없을 경우) if 6379 in port_set: issues.append({ "check": "Redis 외부 노출", "severity": "CRITICAL", "detail": "포트 6379(Redis)가 열려 있습니다. 무인증 접근 시 원격 코드 실행 가능.", "recommend": "방화벽 차단 및 requirepass 설정", }) # MongoDB 오픈 if 27017 in port_set: issues.append({ "check": "MongoDB 외부 노출", "severity": "HIGH", "detail": "포트 27017(MongoDB)가 열려 있습니다. 무인증 접근 위험.", "recommend": "방화벽 차단 및 auth 활성화", }) # SMB 오픈 (EternalBlue 등) if 445 in port_set: issues.append({ "check": "SMB 외부 노출", "severity": "CRITICAL", "detail": "포트 445(SMB)가 열려 있습니다. EternalBlue/WannaCry 공격 벡터.", "recommend": "방화벽 차단 (내부망 전용 허용)", }) return issues async def _llm_analyze_findings(findings_summary: str) -> Optional[str]: """ Ollama sLLM으로 취약점 결과 보완 분석. 실패 시 None 반환 (graceful fallback). """ try: import httpx prompt = ( "다음은 서버 보안 스캔 결과입니다. 각 취약점에 대한 간략한 조치 우선순위를 한국어로 3줄 이내로 요약해주세요.\n\n" f"{findings_summary[:500]}" ) async with httpx.AsyncClient(timeout=10.0) as client: resp = await client.post( "http://localhost:11434/api/generate", json={"model": "llama3", "prompt": prompt, "stream": False}, ) if resp.status_code == 200: return resp.json().get("response", "") except Exception: pass return None async def run_vulnerability_scan( host: str, ports: Optional[List[int]] = None, include_llm: bool = True, timeout: float = 1.0, ) -> Dict: """ 통합 취약점 스캔 실행. Returns: {scan_id, host, started_at, completed_at, open_ports, vulnerabilities, config_issues, severity_summary, risk_score, llm_analysis} """ scan_id = hashlib.sha256( f"{host}:{datetime.utcnow().isoformat()}".encode() ).hexdigest()[:12] started_at = datetime.utcnow() logger.info("취약점 스캔 시작: %s (scan_id=%s)", host, scan_id) # 1. 포트 스캔 open_ports = scan_ports(host, ports, timeout) # 2. 버전 취약점 체크 vulnerabilities = [] for p in open_ports: if p.get("banner"): vulns = check_version_vulns(p["banner"], p.get("service", "")) for v in vulns: v["port"] = p["port"] vulnerabilities.append(v) # 3. 설정 취약점 체크 config_issues = check_config_issues(host, open_ports) # 4. 심각도 집계 all_sev = [v["severity"] for v in vulnerabilities] + [c["severity"] for c in config_issues] severity_summary = {s: all_sev.count(s) for s in ["CRITICAL", "HIGH", "MEDIUM", "LOW"]} # 5. 위험 점수 (0~100) risk_score = min(100, ( severity_summary.get("CRITICAL", 0) * 25 + severity_summary.get("HIGH", 0) * 10 + severity_summary.get("MEDIUM", 0) * 5 + severity_summary.get("LOW", 0) * 1 )) completed_at = datetime.utcnow() # 6. LLM 보조 분석 (optional) llm_analysis = None if include_llm and (vulnerabilities or config_issues): summary_text = ( f"열린 포트: {[p['port'] for p in open_ports]}\n" f"취약점: {[v['cve'] for v in vulnerabilities]}\n" f"설정 이슈: {[c['check'] for c in config_issues]}" ) llm_analysis = await _llm_analyze_findings(summary_text) result = { "scan_id": scan_id, "host": host, "started_at": started_at.isoformat(), "completed_at": completed_at.isoformat(), "duration_sec": (completed_at - started_at).total_seconds(), "open_ports": open_ports, "vulnerabilities": vulnerabilities, "config_issues": config_issues, "severity_summary": severity_summary, "risk_score": risk_score, "risk_level": ( "CRITICAL" if risk_score >= 75 else "HIGH" if risk_score >= 50 else "MEDIUM" if risk_score >= 25 else "LOW" ), "llm_analysis": llm_analysis, "total_findings": len(vulnerabilities) + len(config_issues), } logger.info("스캔 완료: %s — 취약점 %d개, 설정이슈 %d개, 위험점수 %d", host, len(vulnerabilities), len(config_issues), risk_score) return result def calculate_cvss_simplified( attack_vector: str = "NETWORK", complexity: str = "LOW", privileges: str = "NONE", impact: str = "HIGH", ) -> float: """ 단순화된 CVSS 점수 계산 (0.0 ~ 10.0). 실제 CVSS v3.1의 근사 계산. """ av_score = {"NETWORK": 0.85, "ADJACENT": 0.62, "LOCAL": 0.55, "PHYSICAL": 0.2}.get(attack_vector, 0.85) ac_score = {"LOW": 0.77, "HIGH": 0.44}.get(complexity, 0.77) pr_score = {"NONE": 0.85, "LOW": 0.62, "HIGH": 0.27}.get(privileges, 0.85) imp_score = {"NONE": 0.0, "LOW": 0.22, "MEDIUM": 0.5, "HIGH": 0.56}.get(impact, 0.56) exploitability = 8.22 * av_score * ac_score * pr_score iss = 1 - (1 - imp_score) ** 3 impact = 7.52 * iss if impact == 0: return 0.0 score = min(10.0, (impact + exploitability) * 1.08) return round(score, 1)