--- name: network-devices description: "GUARDiA 네트워크 장비 관리 구현 스킬. 스위치/라우터/방화벽의 SSH 기반 설정 백업, 변경 감지, 명령 실행, 토폴로지 관리를 FastAPI + paramiko 패턴으로 구현한다. 다음 상황에서 반드시 사용: (1) '네트워크 장비', '스위치', '라우터', '방화벽' 관리 구현 요청; (2) network_devices.py 라우터 또는 core/network_scanner.py 작업; (3) 장비 설정 백업/비교/변경감지 구현; (4) 네트워크 토폴로지 구현; (5) 다시 실행, 업데이트, 보완 요청." --- # 네트워크 장비 관리 구현 스킬 ## 구현 대상 파일 - `itsm/core/network_scanner.py` — 장비 접속/명령 실행/백업 로직 - `itsm/routers/network_devices.py` — FastAPI 라우터 ## DB 모델 (models.py에 추가) ```python class NetworkDevice(Base): __tablename__ = "tb_network_device" id = Column(Integer, primary_key=True) device_name = Column(String(100), nullable=False) device_type = Column(String(30)) # SWITCH | ROUTER | FIREWALL | LOAD_BALANCER vendor = Column(String(30)) # CISCO | HUAWEI | JUNIPER | PIOLINK | SECUI | RADWARE model = Column(String(100)) os_type = Column(String(30)) # cisco_ios | huawei_vrp | junos | linux ip_addr = Column(String(45)) # NOT exposed in API ssh_user = Column(String(50)) # NOT exposed ssh_pw_enc = Column(Text) # AES-256, NEVER exposed ssh_port = Column(Integer, default=22) location = Column(String(200)) inst_id = Column(Integer, ForeignKey("tb_inst_meta.id")) is_active = Column(Boolean, default=True) last_backup_at = Column(DateTime) created_at = Column(DateTime, default=func.now()) backups = relationship("NetworkConfigBackup", back_populates="device", cascade="all, delete-orphan") class NetworkConfigBackup(Base): __tablename__ = "tb_network_backup" id = Column(Integer, primary_key=True) device_id = Column(Integer, ForeignKey("tb_network_device.id")) config_text = Column(Text) # 설정 전문 (암호화 선택) config_hash = Column(String(64)) # SHA-256 backup_type = Column(String(20)) # SCHEDULED | MANUAL | PRE_CHANGE backed_up_at = Column(DateTime, default=func.now()) backed_up_by = Column(String(100)) device = relationship("NetworkDevice", back_populates="backups") ``` ## 벤더별 표준 명령어 매핑 ```python DEVICE_COMMANDS = { "cisco_ios": { "get_config": "show running-config", "get_version": "show version", "get_interfaces": "show interfaces status", "get_vlan": "show vlan brief", "get_arp": "show arp", "get_route": "show ip route", "save_config": "write memory", }, "huawei_vrp": { "get_config": "display current-configuration", "get_version": "display version", "get_interfaces": "display interface brief", "get_vlan": "display vlan", "get_arp": "display arp all", "save_config": "save force", }, "junos": { "get_config": "show configuration | display set", "get_version": "show version", "get_interfaces": "show interfaces terse", "get_route": "show route", }, "linux": { # PIOLINK, SECUI 방화벽 (Linux 기반) "get_config": "cat /etc/fw/rules.conf 2>/dev/null || iptables-save", "get_version": "cat /etc/os-release", "get_interfaces": "ip addr show", "get_route": "ip route show", }, } # 위험 명령어 차단 목록 (실행 전 검증) BLOCKED_COMMANDS = [ "write erase", "factory-reset", "reload", "reboot", "rm -rf", "mkfs", "fdisk", "format", "no service", "delete flash:", ] ``` ## core/network_scanner.py 구현 패턴 ```python import asyncio, difflib, hashlib import paramiko from sqlalchemy.ext.asyncio import AsyncSession class NetworkScanner: def _is_command_safe(self, command: str) -> bool: """위험 명령어 차단.""" cmd_lower = command.lower() return not any(blocked in cmd_lower for blocked in BLOCKED_COMMANDS) async def execute_command(self, device: NetworkDevice, command: str, decrypt_fn) -> dict: """SSH 명령 실행 (벤더 무관 인터페이스).""" if not self._is_command_safe(command): return {"success": False, "error": "차단된 명령어입니다."} # paramiko SSH 접속 → 명령 실행 → stdout 반환 ... async def backup_config(self, db: AsyncSession, device: NetworkDevice, backup_type: str, user: str) -> NetworkConfigBackup: """설정 백업: 표준 명령 실행 → DB 저장.""" config_cmd = DEVICE_COMMANDS.get(device.os_type, {}).get("get_config", "") result = await self.execute_command(device, config_cmd, decrypt_fn) config_text = result["stdout"] config_hash = hashlib.sha256(config_text.encode()).hexdigest() backup = NetworkConfigBackup( device_id=device.id, config_text=config_text, config_hash=config_hash, backup_type=backup_type, backed_up_by=user, ) db.add(backup) await db.commit() return backup def diff_configs(self, old: str, new: str) -> list[str]: """unified diff 형식으로 설정 변경 사항 반환.""" return list(difflib.unified_diff( old.splitlines(), new.splitlines(), lineterm="", n=3, )) ``` ## API 응답에서 민감 정보 제외 ```python class NetworkDeviceOut(BaseModel): id: int device_name: str device_type: str vendor: str model: Optional[str] os_type: str # ip_addr, ssh_user, ssh_pw_enc 절대 포함 금지 location: Optional[str] inst_id: Optional[int] is_active: bool last_backup_at: Optional[datetime] ``` ## 설정 차이 탐지 및 알림 - 스케줄 백업 시 이전 백업과 diff → 변경 감지 시 SSE 이벤트 발행 - diff 결과가 있으면 tb_audit_log에 "설정 변경 감지" 기록 - 변경된 라인 수가 10줄 이상이면 MEDIUM 알림, 50줄 이상이면 HIGH 알림