zioinfo-mail/workspace/guardia-itsm/routers/network_devices.py
DESKTOP-TKLFCPR\ython cfe2901a55 refactor(structure): consolidate all projects under workspace/
- itsm/    -> workspace/guardia-itsm/
- manager/ -> workspace/guardia-manager/
- app/     -> workspace/guardia-messenger/
- manual/  -> workspace/guardia-docs/

workspace/zioinfo-web/ unchanged.
git mv preserves full commit history.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-31 23:50:56 +09:00

321 lines
11 KiB
Python

"""
Network device management API.

Endpoints:
    GET    /api/network/devices                 List devices
    POST   /api/network/devices                 Register a device (ADMIN)
    GET    /api/network/devices/{id}            Device detail
    PUT    /api/network/devices/{id}            Update a device (ADMIN)
    DELETE /api/network/devices/{id}            Deactivate a device (ADMIN)
    POST   /api/network/devices/{id}/backup     Run a configuration backup
    GET    /api/network/devices/{id}/backups    Backup history
    GET    /api/network/devices/{id}/diff       Compare configuration changes
    POST   /api/network/devices/{id}/command    Execute an SSH command
    GET    /api/network/topology                Network topology
"""
from __future__ import annotations
import logging
from datetime import datetime
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy import select, desc
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user, require_admin_role
from database import get_db
from models import NetworkDevice, NetworkConfigBackup, User, UserRole
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/network", tags=["network"])
# ── 권한 ─────────────────────────────────────────────────────────────────────
def _require_ops(current_user: User = Depends(get_current_user)) -> User:
    """Dependency that admits only roles allowed to manage network devices.

    Returns the authenticated user when their role is ADMIN, PM, or ENGINEER.

    Raises:
        HTTPException: 403 when the user's role is not one of those three.
    """
    permitted_roles = (UserRole.ADMIN, UserRole.PM, UserRole.ENGINEER)
    if current_user.role in permitted_roles:
        return current_user
    raise HTTPException(403, "네트워크 관리 권한이 없습니다.")
# ── Pydantic 스키마 ──────────────────────────────────────────────────────────
class DeviceCreate(BaseModel):
    """Request payload used to register a device and to fully update one."""

    device_name: str
    device_type: str  # SWITCH | ROUTER | FIREWALL | LOAD_BALANCER
    vendor: str  # CISCO | HUAWEI | JUNIPER | PIOLINK | SECUI | RADWARE
    model: Optional[str] = None
    os_type: str = "cisco_ios"  # cisco_ios | huawei_vrp | junos | linux
    ip_addr: str  # stored only (not included in API responses)
    ssh_user: str  # stored only (not included in API responses)
    ssh_password: str  # encrypted before storage (AES-256 per the original note)
    ssh_port: int = 22
    location: Optional[str] = None
    inst_id: Optional[int] = None
class DeviceOut(BaseModel):
    """Response shape for a device; connection credentials are left out."""

    id: int
    device_name: str
    device_type: str
    vendor: str
    model: Optional[str]
    os_type: str
    # ip_addr, ssh_user, and ssh_pw_enc must never be included here.
    location: Optional[str]
    inst_id: Optional[int]
    is_active: bool
    last_backup_at: Optional[datetime]
    # Handlers return NetworkDevice objects directly, so fields are read
    # from object attributes.
    model_config = {"from_attributes": True}
class BackupOut(BaseModel):
    """Response shape for one configuration backup record (metadata only)."""

    id: int
    device_id: int
    config_hash: str
    backup_type: str
    backed_up_at: datetime
    backed_up_by: Optional[str]
    # config_text is omitted (large payload).
    # Handlers return NetworkConfigBackup objects directly, so fields are
    # read from object attributes.
    model_config = {"from_attributes": True}
class CommandRequest(BaseModel):
    """Request payload for running a single command on a device over SSH."""

    command: str
    # Passed through unchanged to the scanner's execute_command call.
    # NOTE(review): presumably seconds, and no bounds are enforced — confirm.
    timeout: int = 30
# ── 엔드포인트 ───────────────────────────────────────────────────────────────
@router.get("/devices", response_model=List[DeviceOut])
async def list_devices(
    inst_id: Optional[int] = None,
    device_type: Optional[str] = None,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(_require_ops),
):
    """List active network devices, ordered by device name.

    Args:
        inst_id: When given, only devices with this ``inst_id`` are returned.
        device_type: When non-empty, only devices of this type are returned.
        db: Async database session.
        current_user: Authenticated user that passed the ops-role check.

    Returns:
        The matching ``NetworkDevice`` rows, serialized as ``DeviceOut``.
    """
    # `== True` builds a SQL expression here; it is not a Python comparison.
    query = select(NetworkDevice).where(NetworkDevice.is_active == True)  # noqa: E712
    # Compare against None so an explicit inst_id of 0 still applies the
    # filter instead of silently returning every device.
    if inst_id is not None:
        query = query.where(NetworkDevice.inst_id == inst_id)
    if device_type:
        query = query.where(NetworkDevice.device_type == device_type)
    rows = await db.execute(query.order_by(NetworkDevice.device_name))
    return rows.scalars().all()
@router.post("/devices", response_model=DeviceOut, status_code=201)
async def create_device(
    body: DeviceCreate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(require_admin_role),
):
    """Register a network device (ADMIN only).

    The SSH password is encrypted before it is stored; the plaintext is never
    persisted.

    Args:
        body: Device attributes and connection credentials.
        db: Async database session.
        current_user: Authenticated user that passed the admin-role check.

    Returns:
        The newly created ``NetworkDevice`` row, serialized as ``DeviceOut``.

    Raises:
        HTTPException: 500 when the password cannot be encrypted.
    """
    from core.ssh_exec import _encrypt_password

    try:
        pw_enc = _encrypt_password(body.ssh_password)
    except Exception as exc:
        # Record the root cause server-side; the client only sees a generic
        # message. The password itself is deliberately not logged.
        logger.exception(
            "Failed to encrypt SSH credentials for device %s", body.device_name
        )
        raise HTTPException(500, "자격증명 암호화 실패") from exc

    device = NetworkDevice(
        device_name=body.device_name,
        device_type=body.device_type,
        vendor=body.vendor,
        model=body.model,
        os_type=body.os_type,
        ip_addr=body.ip_addr,
        ssh_user=body.ssh_user,
        ssh_pw_enc=pw_enc,
        ssh_port=body.ssh_port,
        location=body.location,
        inst_id=body.inst_id,
    )
    db.add(device)
    await db.commit()
    # Reload so database-generated values (e.g. the id) are populated.
    await db.refresh(device)
    return device
@router.get("/devices/{device_id}", response_model=DeviceOut)
async def get_device(
    device_id: int,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(_require_ops),
):
    """Return a single device by id.

    The lookup does not filter on ``is_active``, so deactivated devices are
    returned as well.

    Raises:
        HTTPException: 404 when no device has the given id.
    """
    stmt = select(NetworkDevice).where(NetworkDevice.id == device_id)
    device = (await db.execute(stmt)).scalar_one_or_none()
    if device:
        return device
    raise HTTPException(404, "장비를 찾을 수 없습니다.")
@router.put("/devices/{device_id}", response_model=DeviceOut)
async def update_device(
    device_id: int,
    body: DeviceCreate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(require_admin_role),
):
    """Replace all editable attributes of a device (ADMIN only).

    Every field of ``body`` overwrites the stored value, including the SSH
    password, which is re-encrypted on each update.

    Args:
        device_id: Id of the device to update.
        body: Full set of new device attributes and credentials.
        db: Async database session.
        current_user: Authenticated user that passed the admin-role check.

    Returns:
        The updated ``NetworkDevice`` row, serialized as ``DeviceOut``.

    Raises:
        HTTPException: 404 when the device does not exist; 500 when the
            password cannot be encrypted.
    """
    from core.ssh_exec import _encrypt_password

    q = await db.execute(select(NetworkDevice).where(NetworkDevice.id == device_id))
    d = q.scalar_one_or_none()
    if not d:
        raise HTTPException(404, "장비를 찾을 수 없습니다.")

    # Encrypt before touching the row so a failure leaves it unmodified, and
    # report it the same way create_device does.
    try:
        pw_enc = _encrypt_password(body.ssh_password)
    except Exception as exc:
        logger.exception(
            "Failed to encrypt SSH credentials for device id=%s", device_id
        )
        raise HTTPException(500, "자격증명 암호화 실패") from exc

    d.device_name = body.device_name
    d.device_type = body.device_type
    d.vendor = body.vendor
    d.model = body.model
    d.os_type = body.os_type
    d.ip_addr = body.ip_addr
    d.ssh_user = body.ssh_user
    d.ssh_pw_enc = pw_enc
    d.ssh_port = body.ssh_port
    d.location = body.location
    d.inst_id = body.inst_id
    await db.commit()
    await db.refresh(d)
    return d
@router.delete("/devices/{device_id}", status_code=204)
async def deactivate_device(
    device_id: int,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(require_admin_role),
):
    """Deactivate a device (soft delete; the row is kept).

    Sets ``is_active`` to False and commits. Responds with 204 and no body.

    Raises:
        HTTPException: 404 when no device has the given id.
    """
    stmt = select(NetworkDevice).where(NetworkDevice.id == device_id)
    lookup = await db.execute(stmt)
    device = lookup.scalar_one_or_none()
    if device:
        device.is_active = False
        await db.commit()
    else:
        raise HTTPException(404, "장비를 찾을 수 없습니다.")
@router.post("/devices/{device_id}/backup")
async def backup_device_config(
    device_id: int,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(_require_ops),
):
    """Run a manual configuration backup for a device.

    Delegates to ``NetworkScanner.backup_config`` with ``backup_type="MANUAL"``
    and the caller's username, and returns the scanner's result unchanged.
    Per the original note, the result includes a diff against the previous
    configuration.

    Raises:
        HTTPException: 400 when the scanner reports ``success`` as falsy.
    """
    from core.network_scanner import NetworkScanner

    scanner = NetworkScanner()
    outcome = await scanner.backup_config(
        db, device_id, backup_type="MANUAL", backed_up_by=current_user.username
    )
    if outcome["success"]:
        return outcome
    raise HTTPException(400, outcome.get("error", "백업 실패"))
@router.get("/devices/{device_id}/backups", response_model=List[BackupOut])
async def list_device_backups(
    device_id: int,
    limit: int = 20,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(_require_ops),
):
    """List backup history for a device, newest first (config text excluded).

    Args:
        device_id: Id of the device whose backups are listed.
        limit: Maximum number of records to return; must not be negative.
        db: Async database session.
        current_user: Authenticated user that passed the ops-role check.

    Returns:
        Up to ``limit`` ``NetworkConfigBackup`` rows, serialized as
        ``BackupOut``.

    Raises:
        HTTPException: 400 when ``limit`` is negative.
    """
    # A negative LIMIT is a database error on some backends and "no limit" on
    # others; reject it up front instead of passing it into the query.
    if limit < 0:
        raise HTTPException(400, "limit은 0 이상이어야 합니다.")
    q = await db.execute(
        select(NetworkConfigBackup)
        .where(NetworkConfigBackup.device_id == device_id)
        .order_by(desc(NetworkConfigBackup.backed_up_at))
        .limit(limit)
    )
    return q.scalars().all()
@router.get("/devices/{device_id}/diff")
async def get_config_diff(
    device_id: int,
    old_id: Optional[int] = None,
    new_id: Optional[int] = None,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(_require_ops),
):
    """Compare two configuration backups of a device.

    Delegates to ``NetworkScanner.get_config_diff`` and returns its result
    unchanged. Per the original note, the two most recent backups are
    compared when ``old_id`` and ``new_id`` are omitted.

    Raises:
        HTTPException: 400 when the scanner reports ``success`` as falsy.
    """
    from core.network_scanner import NetworkScanner

    scanner = NetworkScanner()
    comparison = await scanner.get_config_diff(db, device_id, old_id, new_id)
    if comparison["success"]:
        return comparison
    raise HTTPException(400, comparison.get("error", "비교 실패"))
@router.post("/devices/{device_id}/command")
async def execute_device_command(
    device_id: int,
    body: CommandRequest,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(_require_ops),
):
    """Run a command on an active device over SSH (safe commands only).

    The command is first checked with ``NetworkScanner.is_command_safe``. The
    stored password is decrypted only after that check passes.

    Args:
        device_id: Id of the target device; it must be active.
        body: Command text and timeout.
        db: Async database session.
        current_user: Authenticated user that passed the ops-role check.

    Returns:
        A dict with ``device_name``, ``command``, ``success``, ``stdout``
        (truncated to 5000 characters), ``stderr`` (truncated to 500
        characters) and ``exit_code``.

    Raises:
        HTTPException: 404 when no active device has the given id; 400 when
            the command is not allowed; 500 when the stored password cannot
            be decrypted.
    """
    from core.network_scanner import NetworkScanner
    from core.ssh_exec import _decrypt_password

    q = await db.execute(
        select(NetworkDevice).where(
            NetworkDevice.id == device_id,
            NetworkDevice.is_active == True,  # noqa: E712 - SQL expression
        )
    )
    device = q.scalar_one_or_none()
    if not device:
        raise HTTPException(404, "장비를 찾을 수 없습니다.")

    scanner = NetworkScanner()
    if not scanner.is_command_safe(body.command):
        raise HTTPException(400, "허용되지 않는 명령어입니다.")

    try:
        pw = _decrypt_password(device.ssh_pw_enc)
    except Exception as exc:
        # Keep the root cause in the server log; the client gets a generic
        # message only.
        logger.exception(
            "Failed to decrypt SSH credentials for device id=%s", device_id
        )
        raise HTTPException(500, "자격증명 복호화 실패") from exc

    result = await scanner.execute_command(
        device.ip_addr, device.ssh_user, pw,
        device.ssh_port or 22, body.command, body.timeout
    )
    # NOTE(review): the scanner's result shape is not visible from this file.
    # Missing or None output fields are tolerated so that a failed run is
    # reported to the caller instead of surfacing as an unhandled 500.
    stdout = result.get("stdout") or ""
    stderr = result.get("stderr") or ""
    return {
        "device_name": device.device_name,
        "command": body.command,
        "success": result.get("success", False),
        "stdout": stdout[:5000],  # at most 5000 characters
        "stderr": stderr[:500],
        "exit_code": result.get("exit_code"),
    }
@router.get("/topology")
async def get_topology(
    inst_id: Optional[int] = None,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(_require_ops),
):
    """Return the network topology: active devices plus a grouping by type.

    Args:
        inst_id: When given, only devices with this ``inst_id`` are included.
        db: Async database session.
        current_user: Authenticated user that passed the ops-role check.

    Returns:
        A dict with ``total`` (number of devices) and ``topology``, which
        holds ``nodes`` (one dict per device) and ``by_type`` (the same node
        dicts grouped under their ``device_type``).
    """
    q = select(NetworkDevice).where(NetworkDevice.is_active == True)  # noqa: E712
    # Compare against None so an explicit inst_id of 0 still applies the
    # filter instead of silently returning every device.
    if inst_id is not None:
        q = q.where(NetworkDevice.inst_id == inst_id)
    result = await db.execute(q)
    devices = result.scalars().all()

    topology: dict = {"nodes": [], "by_type": {}}
    for d in devices:
        node = {
            "id": d.id,
            "name": d.device_name,
            "type": d.device_type,
            "vendor": d.vendor,
            "location": d.location,
            "inst_id": d.inst_id,
            "last_backup_at": d.last_backup_at.isoformat() if d.last_backup_at else None,
        }
        topology["nodes"].append(node)
        # The same node dict is shared between "nodes" and its type bucket.
        topology["by_type"].setdefault(d.device_type, []).append(node)
    return {
        "total": len(devices),
        "topology": topology,
    }