""" 네트워크 장비 SSH 접속 및 설정 관리 엔진. 벤더별(Cisco/Huawei/Juniper/Linux) 표준 명령어 추상화. 설정 백업 → SHA-256 해시 → diff 변경 감지 → 알림. """ from __future__ import annotations import asyncio import difflib import hashlib import logging from datetime import datetime from typing import Optional import paramiko from sqlalchemy import select, desc from sqlalchemy.ext.asyncio import AsyncSession logger = logging.getLogger(__name__) # ── 벤더별 표준 명령어 ───────────────────────────────────────────────────── DEVICE_COMMANDS: dict[str, dict[str, str]] = { "cisco_ios": { "get_config": "show running-config", "get_version": "show version", "get_interfaces": "show interfaces status", "get_vlan": "show vlan brief", "get_arp": "show arp", "get_route": "show ip route", }, "huawei_vrp": { "get_config": "display current-configuration", "get_version": "display version", "get_interfaces": "display interface brief", "get_vlan": "display vlan", "get_arp": "display arp all", "get_route": "display ip routing-table", }, "junos": { "get_config": "show configuration | display set", "get_version": "show version", "get_interfaces": "show interfaces terse", "get_route": "show route", }, "linux": { "get_config": "iptables-save 2>/dev/null || cat /etc/fw/rules.conf 2>/dev/null", "get_version": "cat /etc/os-release", "get_interfaces": "ip addr show", "get_route": "ip route show", }, } # 위험 명령어 패턴 — 설정 변경/초기화/재부팅 방지 _BLOCKED_PATTERNS = [ "write erase", "factory-reset", "factory reset", "reload", "reboot", "shutdown", "rm -rf", "mkfs", "fdisk", "format flash", "no service", "delete flash:", "erase startup", ] class NetworkScanner: """네트워크 장비 SSH 접속 및 설정 관리.""" # ── 보안 검증 ─────────────────────────────────────────────────────────── def is_command_safe(self, command: str) -> bool: """위험 명령어 차단.""" cmd_lower = command.lower().strip() return not any(p in cmd_lower for p in _BLOCKED_PATTERNS) # ── SSH 명령 실행 ─────────────────────────────────────────────────────── async def execute_command(self, ip: str, user: str, password: str, port: int, command: str, timeout: int = 30) -> dict: """SSH 명령 실행. IP/계정 정보는 반환값에 포함하지 않는다.""" if not self.is_command_safe(command): return {"success": False, "stdout": "", "stderr": "차단된 명령어입니다.", "exit_code": -1} try: result = await asyncio.get_event_loop().run_in_executor( None, self._sync_ssh_exec, ip, user, password, port, command, timeout ) return result except Exception as e: logger.error("SSH exec error: %s", e) return {"success": False, "stdout": "", "stderr": str(e)[:200], "exit_code": -1} def _sync_ssh_exec(self, ip: str, user: str, password: str, port: int, command: str, timeout: int) -> dict: client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: client.connect(ip, port=port, username=user, password=password, timeout=15, allow_agent=False, look_for_keys=False) _, stdout, stderr = client.exec_command(command, timeout=timeout) exit_code = stdout.channel.recv_exit_status() return { "success": exit_code == 0, "stdout": stdout.read().decode(errors="replace"), "stderr": stderr.read().decode(errors="replace"), "exit_code": exit_code, } finally: client.close() # ── 설정 백업 ─────────────────────────────────────────────────────────── async def backup_config(self, db: AsyncSession, device_id: int, backup_type: str, backed_up_by: str) -> dict: """ 장비 설정 백업 실행. 이전 백업과 diff를 비교하여 변경 감지. """ from models import NetworkDevice, NetworkConfigBackup from core.ssh_exec import _decrypt_password q = await db.execute( select(NetworkDevice).where(NetworkDevice.id == device_id, NetworkDevice.is_active == True) ) device = q.scalar_one_or_none() if not device: return {"success": False, "error": "장비를 찾을 수 없습니다."} try: password = _decrypt_password(device.ssh_pw_enc) except Exception as e: return {"success": False, "error": "자격증명 복호화 실패"} get_config_cmd = DEVICE_COMMANDS.get(device.os_type or "linux", {}).get("get_config", "") if not get_config_cmd: return {"success": False, "error": f"지원하지 않는 OS 타입: {device.os_type}"} exec_result = await self.execute_command( device.ip_addr, device.ssh_user, password, device.ssh_port or 22, get_config_cmd, timeout=60 ) if not exec_result["success"]: return {"success": False, "error": exec_result["stderr"][:200]} config_text = exec_result["stdout"] config_hash = hashlib.sha256(config_text.encode()).hexdigest() # 이전 백업과 diff prev_q = await db.execute( select(NetworkConfigBackup) .where(NetworkConfigBackup.device_id == device_id) .order_by(desc(NetworkConfigBackup.backed_up_at)) .limit(1) ) prev_backup = prev_q.scalar_one_or_none() changed_lines = 0 diff_summary = [] if prev_backup and prev_backup.config_hash != config_hash: diff = self.diff_configs(prev_backup.config_text or "", config_text) changed_lines = sum(1 for line in diff if line.startswith(("+", "-")) and not line.startswith(("+++", "---"))) diff_summary = diff[:50] # 최대 50줄만 저장 backup = NetworkConfigBackup( device_id=device_id, config_text=config_text, config_hash=config_hash, backup_type=backup_type, backed_up_by=backed_up_by, backed_up_at=datetime.now(), ) db.add(backup) # 장비 최종 백업 시각 갱신 device.last_backup_at = datetime.now() await db.commit() await db.refresh(backup) return { "success": True, "backup_id": backup.id, "device_name": device.device_name, "config_hash": config_hash, "changed_lines": changed_lines, "diff_summary": diff_summary if changed_lines > 0 else [], "backed_up_at": backup.backed_up_at.isoformat(), } # ── 설정 비교 ─────────────────────────────────────────────────────────── def diff_configs(self, old: str, new: str) -> list[str]: """unified diff 형식으로 설정 변경 사항 반환.""" return list(difflib.unified_diff( old.splitlines(), new.splitlines(), fromfile="이전 설정", tofile="현재 설정", lineterm="", n=3, )) async def get_config_diff(self, db: AsyncSession, device_id: int, backup_id_old: Optional[int] = None, backup_id_new: Optional[int] = None) -> dict: """두 백업 간 설정 차이 반환. ID 미지정 시 최근 2개 비교.""" from models import NetworkConfigBackup if backup_id_old and backup_id_new: q_old = await db.execute( select(NetworkConfigBackup).where( NetworkConfigBackup.id == backup_id_old, NetworkConfigBackup.device_id == device_id, ) ) q_new = await db.execute( select(NetworkConfigBackup).where( NetworkConfigBackup.id == backup_id_new, NetworkConfigBackup.device_id == device_id, ) ) old_b = q_old.scalar_one_or_none() new_b = q_new.scalar_one_or_none() else: q = await db.execute( select(NetworkConfigBackup) .where(NetworkConfigBackup.device_id == device_id) .order_by(desc(NetworkConfigBackup.backed_up_at)) .limit(2) ) backups = q.scalars().all() if len(backups) < 2: return {"success": False, "error": "비교할 백업이 2개 미만입니다."} new_b, old_b = backups[0], backups[1] if not old_b or not new_b: return {"success": False, "error": "백업을 찾을 수 없습니다."} diff = self.diff_configs(old_b.config_text or "", new_b.config_text or "") added = [l for l in diff if l.startswith("+") and not l.startswith("+++")] removed = [l for l in diff if l.startswith("-") and not l.startswith("---")] return { "success": True, "device_id": device_id, "old_backup_id": old_b.id, "new_backup_id": new_b.id, "old_backed_up_at": old_b.backed_up_at.isoformat(), "new_backed_up_at": new_b.backed_up_at.isoformat(), "changed": len(added) + len(removed) > 0, "added_lines": len(added), "removed_lines": len(removed), "diff": diff[:200], # 최대 200줄 }