"""SNMP-based automatic discovery of network devices.

Discovers switches, routers and firewalls over SNMP v2c/v3 and registers them
in the CMDB. Agentless: uses only nmap and the SNMP protocol.

Endpoints:
    POST /api/snmp/scan              -- run an SNMP scan
    GET  /api/snmp/devices           -- list discovered devices
    POST /api/snmp/poll/{device_id}  -- poll a single device over SNMP
    GET  /api/snmp/configs           -- list SNMP configurations
    POST /api/snmp/configs           -- register an SNMP community configuration
"""
from __future__ import annotations

import asyncio
import ipaddress
import logging
import re
import subprocess
from datetime import datetime
from itertools import islice
from typing import Optional

from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException
from pydantic import BaseModel, Field
from sqlalchemy import select, desc
from sqlalchemy.ext.asyncio import AsyncSession

from core.auth import get_current_user, require_admin_role
from database import get_db
from models import User, SNMPConfig, SNMPDevice

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/api/snmp", tags=["SNMP 자동 발견"])

# SNMP OID constants (RFC 1213 MIB-II)
OID_SYSNAME = "1.3.6.1.2.1.1.5.0"
OID_SYSDESCR = "1.3.6.1.2.1.1.1.0"
OID_SYSLOCATION = "1.3.6.1.2.1.1.6.0"
OID_SYSCONTACT = "1.3.6.1.2.1.1.4.0"
OID_IFNUMBER = "1.3.6.1.2.1.2.1.0"

# Upper bound on the number of hosts probed by a single scan.
_MAX_SCAN_HOSTS = 254

# Matches the "TYPE: value" right-hand side that snmpget prints by default,
# e.g. "STRING: core-sw-01" or "INTEGER: 24". DOTALL because values such as
# sysDescr may span several lines.
_TYPE_TAG_RE = re.compile(r"^[A-Za-z][A-Za-z0-9-]*:\s?(.*)$", re.DOTALL)

# Replies that snmpget prints (with exit status 0 under v2c) when the agent
# answered but has no value for the requested OID.
_NO_VALUE_PREFIXES = ("No Such Object", "No Such Instance", "No more variables")

# Ordered (device_type, keywords) table; the first matching row wins.
_DEVICE_TYPE_KEYWORDS = (
    ("NETWORK", ("cisco", "juniper", "arista", "switch", "router")),
    ("SERVER", ("linux", "windows", "unix")),
    ("UPS", ("ups", "power")),
    ("PRINTER", ("printer", "print")),
)


class SNMPScanRequest(BaseModel):
    """Request body for ``POST /api/snmp/scan``."""

    network_range: str
    community: str = Field("public", description="SNMP v2c 커뮤니티 문자열")
    version: int = Field(2, ge=1, le=3)
    timeout_sec: int = Field(3, ge=1, le=10)
    port: int = 161


class SNMPConfigCreate(BaseModel):
    """Request body for ``POST /api/snmp/configs``."""

    name: str
    community: str
    version: int = 2
    ip_ranges: list[str] = Field(default_factory=list)


def _parse_snmpget_output(stdout: str) -> Optional[str]:
    """Extract the bare value from one line of ``snmpget`` output.

    Handles both symbolic (``SNMPv2-MIB::sysName.0 = STRING: host``) and
    numeric (``.1.3.6.1.2.1.1.5.0 = STRING: host``) OID renderings, values
    that themselves contain ``:``, and quoted strings.

    Returns:
        The value with the type tag and surrounding quotes removed, or
        ``None`` when the agent reported that the OID has no value.
    """
    text = stdout.strip()
    _, sep, rhs = text.partition(" = ")
    if not sep:
        # Not the usual "OID = value" shape; hand back what we got.
        return text
    rhs = rhs.strip()
    if rhs.startswith(_NO_VALUE_PREFIXES):
        return None
    tagged = _TYPE_TAG_RE.match(rhs)
    value = tagged.group(1) if tagged else rhs
    return value.strip().strip('"')


def _snmp_target(ip: str, port: int) -> str:
    """Build the snmpget agent specifier for *ip* and *port*.

    A bare ``addr:port`` is ambiguous for IPv6 literals, so those are written
    with an explicit ``udp6:`` transport and bracketed address.
    """
    if ":" in ip:
        return f"udp6:[{ip}]:{port}"
    return f"{ip}:{port}"


def _run_snmpget(ip: str, community: str, oid: str, port: int, timeout: int) -> Optional[str]:
    """Run one blocking ``snmpget -v2c`` call and return the parsed value.

    Always queries with ``-v2c``. Returns ``None`` when the command is
    missing, cannot be started, times out, exits non-zero or prints nothing.
    """
    try:
        completed = subprocess.run(
            [
                "snmpget", "-v2c", f"-c{community}",
                "-t", str(timeout), "-r1",
                _snmp_target(ip, port), oid,
            ],
            capture_output=True,
            text=True,
            errors="replace",
            # "-r1" means one retry, so snmpget itself may take up to
            # 2 * timeout; allow that plus a second of slack before killing it.
            timeout=timeout * 2 + 1,
        )
    except (subprocess.TimeoutExpired, OSError):
        return None
    if completed.returncode != 0 or not completed.stdout:
        return None
    return _parse_snmpget_output(completed.stdout)


async def _snmp_get(ip: str, community: str, oid: str,
                    port: int = 161, timeout: int = 3) -> Optional[str]:
    """Perform an SNMP GET via the ``snmpget`` command.

    The blocking subprocess call runs in a worker thread so the event loop
    stays responsive and concurrent callers actually overlap.

    Args:
        ip: Target address.
        community: SNMP community string.
        oid: OID to query.
        port: UDP port of the agent.
        timeout: Per-attempt timeout in seconds passed to ``snmpget -t``.

    Returns:
        The value as a string, or ``None`` if no usable answer was obtained.
    """
    return await asyncio.to_thread(_run_snmpget, ip, community, oid, port, timeout)


async def _scan_snmp_host(ip: str, community: str, port: int, timeout: int) -> Optional[dict]:
    """Collect basic SNMP system information from a single host.

    Returns:
        A dict with keys ``ip``, ``sysname``, ``description``, ``location``,
        ``contact``, ``interface_count`` and ``device_type``; ``None`` when
        the host did not return a sysName.
    """
    sysname = await _snmp_get(ip, community, OID_SYSNAME, port, timeout)
    if not sysname:
        return None  # no SNMP response

    descr = await _snmp_get(ip, community, OID_SYSDESCR, port, timeout) or ""
    location = await _snmp_get(ip, community, OID_SYSLOCATION, port, timeout) or ""
    contact = await _snmp_get(ip, community, OID_SYSCONTACT, port, timeout) or ""
    ifcount = await _snmp_get(ip, community, OID_IFNUMBER, port, timeout) or "0"

    # Guess the device type from keywords in sysDescr.
    descr_lower = descr.lower()
    device_type = next(
        (
            kind
            for kind, keywords in _DEVICE_TYPE_KEYWORDS
            if any(word in descr_lower for word in keywords)
        ),
        "UNKNOWN",
    )

    return {
        "ip": ip,
        "sysname": sysname,
        "description": descr[:200],
        "location": location,
        "contact": contact,
        "interface_count": int(ifcount) if ifcount.isdigit() else 0,
        "device_type": device_type,
    }


async def _run_snmp_scan(config_id: int, req: SNMPScanRequest, user_id: int, db: AsyncSession):
    """Background SNMP scan: probe the requested range and store responders.

    Probes at most ``_MAX_SCAN_HOSTS`` addresses of ``req.network_range`` and
    adds one ``SNMPDevice`` row per host that answered. Any failure is logged
    and swallowed because nothing awaits this task's result.

    NOTE(review): ``db`` is the session handed over by the request handler.
    Depending on how ``get_db`` and the FastAPI version manage dependency
    teardown, that session may already be closed when this runs -- confirm,
    and consider opening a dedicated session inside the task.
    """
    try:
        network = ipaddress.ip_network(req.network_range, strict=False)
        # islice avoids materialising every address of a large range just to
        # keep the first few hundred.
        targets = islice(network.hosts(), _MAX_SCAN_HOSTS)
        results = await asyncio.gather(
            *(
                _scan_snmp_host(str(host), req.community, req.port, req.timeout_sec)
                for host in targets
            ),
            return_exceptions=True,
        )
        for result in results:
            if isinstance(result, BaseException):
                logger.warning("SNMP host scan error: %r", result)
                continue
            if not isinstance(result, dict):
                continue
            db.add(
                SNMPDevice(
                    config_id=config_id,
                    ip_addr=result["ip"],
                    sysname=result["sysname"],
                    description=result["description"],
                    device_type=result["device_type"],
                    location=result["location"],
                    interface_count=result["interface_count"],
                    discovered_at=datetime.utcnow(),
                )
            )
        await db.commit()
    except Exception as e:
        logger.exception("SNMP 스캔 실패: %s", e)


@router.post("/configs")
async def create_snmp_config(
    req: SNMPConfigCreate,
    db: AsyncSession = Depends(get_db),
    user: User = Depends(require_admin_role),
):
    """Register an SNMP community configuration for the caller's tenant."""
    cfg = SNMPConfig(
        tenant_id=user.tenant_id,
        name=req.name,
        community_enc=req.community,  # TODO: AES-256-GCM encryption
        version=req.version,
        ip_ranges=req.ip_ranges,
        is_active=True,
        created_at=datetime.utcnow(),
    )
    db.add(cfg)
    await db.commit()
    await db.refresh(cfg)
    return {"ok": True, "id": cfg.id}


@router.get("/configs")
async def list_snmp_configs(
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """List the SNMP configurations of the caller's tenant.

    The community string is deliberately not part of the response.
    """
    rows = await db.execute(
        select(SNMPConfig).where(SNMPConfig.tenant_id == user.tenant_id)
    )
    return [
        {
            "id": cfg.id,
            "name": cfg.name,
            "version": cfg.version,
            "ip_ranges": cfg.ip_ranges,
            "is_active": cfg.is_active,
        }
        for cfg in rows.scalars().all()
    ]


@router.post("/scan")
async def start_snmp_scan(
    req: SNMPScanRequest,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db),
    user: User = Depends(require_admin_role),
):
    """Create a scan configuration and schedule the scan in the background.

    Raises:
        HTTPException: 400 when ``network_range`` is not a valid IP network.
    """
    # Reject a malformed range up front instead of persisting a config row
    # and only failing later inside the background task's log.
    try:
        ipaddress.ip_network(req.network_range, strict=False)
    except ValueError as exc:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid network_range: {req.network_range}",
        ) from exc

    # Create a default SNMP configuration for this scan.
    cfg = SNMPConfig(
        tenant_id=user.tenant_id,
        name=f"scan_{datetime.utcnow().strftime('%Y%m%d%H%M')}",
        community_enc=req.community,
        version=req.version,
        is_active=True,
        created_at=datetime.utcnow(),
    )
    db.add(cfg)
    await db.commit()
    await db.refresh(cfg)

    background_tasks.add_task(_run_snmp_scan, cfg.id, req, user.id, db)
    return {"ok": True, "config_id": cfg.id, "network": req.network_range}


@router.get("/devices")
async def list_snmp_devices(
    limit: int = 100,
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """List devices discovered for the caller's tenant, newest first."""
    rows = await db.execute(
        select(SNMPDevice, SNMPConfig.tenant_id)
        .join(SNMPConfig, SNMPDevice.config_id == SNMPConfig.id)
        .where(SNMPConfig.tenant_id == user.tenant_id)
        .order_by(desc(SNMPDevice.discovered_at))
        .limit(limit)
    )
    return [
        {
            "id": r.SNMPDevice.id,
            "ip": r.SNMPDevice.ip_addr,
            "name": r.SNMPDevice.sysname,
            "type": r.SNMPDevice.device_type,
            "location": r.SNMPDevice.location,
            "interfaces": r.SNMPDevice.interface_count,
            "discovered_at": r.SNMPDevice.discovered_at,
        }
        for r in rows.all()
    ]