- itsm/ -> workspace/guardia-itsm/ - manager/ -> workspace/guardia-manager/ - app/ -> workspace/guardia-messenger/ - manual/ -> workspace/guardia-docs/ workspace/zioinfo-web/ unchanged. git mv preserves full commit history. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
321 lines
11 KiB
Python
321 lines
11 KiB
Python
"""
Network device management API.

Endpoints:
    GET    /api/network/devices                 List devices
    POST   /api/network/devices                 Register a device (ADMIN)
    GET    /api/network/devices/{id}            Device details
    PUT    /api/network/devices/{id}            Update a device (ADMIN)
    DELETE /api/network/devices/{id}            Deactivate a device (ADMIN)
    POST   /api/network/devices/{id}/backup     Run a configuration backup
    GET    /api/network/devices/{id}/backups    Backup history
    GET    /api/network/devices/{id}/diff       Compare configuration changes
    POST   /api/network/devices/{id}/command    Execute an SSH command
    GET    /api/network/topology                Network topology
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from datetime import datetime
|
|
from typing import List, Optional
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException
|
|
from pydantic import BaseModel
|
|
from sqlalchemy import select, desc
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from core.auth import get_current_user, require_admin_role
|
|
from database import get_db
|
|
from models import NetworkDevice, NetworkConfigBackup, User, UserRole
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
# Router for this module's endpoints: URL prefix /api/network, tag "network".
router = APIRouter(prefix="/api/network", tags=["network"])
|
|
|
|
|
|
# ── Permissions ──────────────────────────────────────────────────────────────
|
|
|
|
def _require_ops(current_user: User = Depends(get_current_user)) -> User:
    """Dependency that admits only users with an operations-capable role.

    Args:
        current_user: The authenticated user, resolved by ``get_current_user``.

    Returns:
        The same user when their role is ADMIN, PM or ENGINEER.

    Raises:
        HTTPException: 403 when the user's role is none of the above.
    """
    permitted_roles = (UserRole.ADMIN, UserRole.PM, UserRole.ENGINEER)
    if current_user.role in permitted_roles:
        return current_user
    raise HTTPException(403, "네트워크 관리 권한이 없습니다.")
|
|
|
|
|
|
# ── Pydantic schemas ─────────────────────────────────────────────────────────
|
|
|
|
class DeviceCreate(BaseModel):
    """Request body used to register a device and to overwrite an existing one.

    The connection fields (``ip_addr``, ``ssh_user``, ``ssh_password``) are
    accepted here but are not part of the ``DeviceOut`` response schema.
    """

    device_name: str
    # One of: SWITCH | ROUTER | FIREWALL | LOAD_BALANCER
    device_type: str
    # One of: CISCO | HUAWEI | JUNIPER | PIOLINK | SECUI | RADWARE
    vendor: str
    model: Optional[str] = None
    # One of: cisco_ios | huawei_vrp | junos | linux
    os_type: str = "cisco_ios"
    # Stored only; never included in API responses.
    ip_addr: str
    # Stored only; never included in API responses.
    ssh_user: str
    # Encrypted (AES-256) before it is stored.
    ssh_password: str
    ssh_port: int = 22
    location: Optional[str] = None
    inst_id: Optional[int] = None
|
|
|
|
|
|
class DeviceOut(BaseModel):
    """Response schema for a network device.

    Deliberately omits ``ip_addr``, ``ssh_user`` and ``ssh_pw_enc`` so that
    connection details and credentials are never serialized to clients.
    """

    # Allow building this schema directly from ORM attribute access.
    model_config = {"from_attributes": True}

    id: int
    device_name: str
    device_type: str
    vendor: str
    model: Optional[str]
    os_type: str
    # NOTE: ip_addr, ssh_user and ssh_pw_enc must never be added here.
    location: Optional[str]
    inst_id: Optional[int]
    is_active: bool
    last_backup_at: Optional[datetime]
|
|
|
|
|
|
class BackupOut(BaseModel):
    """Response schema for one configuration-backup record.

    The backed-up configuration text itself is left out because it can be
    large; only the metadata below is returned.
    """

    # Allow building this schema directly from ORM attribute access.
    model_config = {"from_attributes": True}

    id: int
    device_id: int
    config_hash: str
    backup_type: str
    backed_up_at: datetime
    backed_up_by: Optional[str]
    # config_text is intentionally not exposed (large payload).
|
|
|
|
|
|
class CommandRequest(BaseModel):
    """Request body for running a single command on a device over SSH."""

    # Command line to run; the endpoint checks it against the scanner's
    # safety filter before executing it.
    command: str
    # Forwarded unchanged to the scanner's execute_command call
    # (presumably seconds; confirm against NetworkScanner).
    timeout: int = 30
|
|
|
|
|
|
# ── Endpoints ────────────────────────────────────────────────────────────────
|
|
|
|
@router.get("/devices", response_model=List[DeviceOut])
async def list_devices(
    inst_id: Optional[int] = None,
    device_type: Optional[str] = None,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(_require_ops),
):
    """List active network devices, ordered by device name.

    Args:
        inst_id: When provided, only devices with this ``inst_id`` are returned.
        device_type: When non-empty, only devices of this type are returned.
        db: Async database session (injected).
        current_user: Caller, already authorized by ``_require_ops``.

    Returns:
        The matching ``NetworkDevice`` rows, serialized through ``DeviceOut``.
    """
    # ``== True`` is required here: it builds a SQL expression, not a Python bool.
    q = select(NetworkDevice).where(NetworkDevice.is_active == True)  # noqa: E712
    # Compare against None explicitly so that inst_id=0 is treated as a real
    # filter value instead of silently returning every device.
    if inst_id is not None:
        q = q.where(NetworkDevice.inst_id == inst_id)
    # An empty device_type string is treated the same as "no filter".
    if device_type:
        q = q.where(NetworkDevice.device_type == device_type)
    result = await db.execute(q.order_by(NetworkDevice.device_name))
    return result.scalars().all()
|
|
|
|
|
|
@router.post("/devices", response_model=DeviceOut, status_code=201)
async def create_device(
    body: DeviceCreate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(require_admin_role),
):
    """Register a network device (guarded by the admin-role dependency).

    The SSH password is encrypted with ``core.ssh_exec._encrypt_password``
    before it is stored (AES-256-GCM according to the original author's note);
    the plaintext is never written to the database.

    Args:
        body: Device attributes and SSH credentials.
        db: Async database session (injected).
        current_user: Caller, already authorized by ``require_admin_role``.

    Returns:
        The newly created device, refreshed from the database and serialized
        through ``DeviceOut``.

    Raises:
        HTTPException: 500 when the password cannot be encrypted.
    """
    from core.ssh_exec import _encrypt_password

    try:
        pw_enc = _encrypt_password(body.ssh_password)
    except Exception as exc:
        # Record the underlying cause server-side; the client only ever sees
        # the generic message below.
        logger.exception(
            "Failed to encrypt SSH credentials for device %r", body.device_name
        )
        raise HTTPException(500, "자격증명 암호화 실패") from exc

    device = NetworkDevice(
        device_name=body.device_name,
        device_type=body.device_type,
        vendor=body.vendor,
        model=body.model,
        os_type=body.os_type,
        ip_addr=body.ip_addr,
        ssh_user=body.ssh_user,
        ssh_pw_enc=pw_enc,
        ssh_port=body.ssh_port,
        location=body.location,
        inst_id=body.inst_id,
    )
    db.add(device)
    await db.commit()
    # Reload so database-generated columns are populated.
    await db.refresh(device)
    return device
|
|
|
|
|
|
@router.get("/devices/{device_id}", response_model=DeviceOut)
async def get_device(
    device_id: int,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(_require_ops),
):
    """Return a single device by primary key.

    The lookup does not filter on ``is_active``, so deactivated devices are
    returned as well.

    Raises:
        HTTPException: 404 when no device has the given id.
    """
    stmt = select(NetworkDevice).where(NetworkDevice.id == device_id)
    device = (await db.execute(stmt)).scalar_one_or_none()
    if device:
        return device
    raise HTTPException(404, "장비를 찾을 수 없습니다.")
|
|
|
|
|
|
@router.put("/devices/{device_id}", response_model=DeviceOut)
async def update_device(
    device_id: int,
    body: DeviceCreate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(require_admin_role),
):
    """Overwrite every editable field of an existing device.

    This is a full replacement: all fields of ``body`` are written, and the
    SSH password is re-encrypted on every call.

    Args:
        device_id: Primary key of the device to update.
        body: New device attributes and SSH credentials.
        db: Async database session (injected).
        current_user: Caller, already authorized by ``require_admin_role``.

    Returns:
        The updated device, refreshed from the database and serialized
        through ``DeviceOut``.

    Raises:
        HTTPException: 404 when the device does not exist; 500 when the
            password cannot be encrypted.
    """
    from core.ssh_exec import _encrypt_password

    q = await db.execute(select(NetworkDevice).where(NetworkDevice.id == device_id))
    d = q.scalar_one_or_none()
    if not d:
        raise HTTPException(404, "장비를 찾을 수 없습니다.")

    # Encrypt before touching the ORM object so that a failure leaves the
    # loaded row unmodified, and report it the same way create_device does.
    try:
        pw_enc = _encrypt_password(body.ssh_password)
    except Exception as exc:
        logger.exception(
            "Failed to encrypt SSH credentials for device id %s", device_id
        )
        raise HTTPException(500, "자격증명 암호화 실패") from exc

    d.device_name = body.device_name
    d.device_type = body.device_type
    d.vendor = body.vendor
    d.model = body.model
    d.os_type = body.os_type
    d.ip_addr = body.ip_addr
    d.ssh_user = body.ssh_user
    d.ssh_pw_enc = pw_enc
    d.ssh_port = body.ssh_port
    d.location = body.location
    d.inst_id = body.inst_id
    await db.commit()
    await db.refresh(d)
    return d
|
|
|
|
|
|
@router.delete("/devices/{device_id}", status_code=204)
async def deactivate_device(
    device_id: int,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(require_admin_role),
):
    """Soft-delete a device by clearing its ``is_active`` flag.

    The row itself is kept; nothing is removed from the database.

    Raises:
        HTTPException: 404 when no device has the given id.
    """
    stmt = select(NetworkDevice).where(NetworkDevice.id == device_id)
    device = (await db.execute(stmt)).scalar_one_or_none()
    if not device:
        raise HTTPException(404, "장비를 찾을 수 없습니다.")

    device.is_active = False
    await db.commit()
|
|
|
|
|
|
@router.post("/devices/{device_id}/backup")
async def backup_device_config(
    device_id: int,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(_require_ops),
):
    """Trigger a manual configuration backup for one device.

    The work is delegated to ``NetworkScanner.backup_config``; its result
    dict is returned unchanged on success (per the original author's note it
    includes a diff against the previous configuration).

    Raises:
        HTTPException: 400 carrying the scanner's error message when the
            backup is reported as unsuccessful.
    """
    from core.network_scanner import NetworkScanner

    scanner = NetworkScanner()
    outcome = await scanner.backup_config(
        db,
        device_id,
        backup_type="MANUAL",
        backed_up_by=current_user.username,
    )
    if outcome["success"]:
        return outcome
    raise HTTPException(400, outcome.get("error", "백업 실패"))
|
|
|
|
|
|
@router.get("/devices/{device_id}/backups", response_model=List[BackupOut])
async def list_device_backups(
    device_id: int,
    limit: int = 20,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(_require_ops),
):
    """List backup records for a device, newest first.

    The response schema (``BackupOut``) leaves out the configuration text.

    Args:
        device_id: Device whose backups are listed.
        limit: Maximum number of records to return; must not be negative.
        db: Async database session (injected).
        current_user: Caller, already authorized by ``_require_ops``.

    Returns:
        Up to ``limit`` ``NetworkConfigBackup`` rows ordered by
        ``backed_up_at`` descending.

    Raises:
        HTTPException: 400 when ``limit`` is negative.
    """
    # A negative LIMIT is a database error on some backends and means
    # "no limit" on others; reject it up front instead of passing it through.
    if limit < 0:
        raise HTTPException(400, "limit은 0 이상이어야 합니다.")

    q = await db.execute(
        select(NetworkConfigBackup)
        .where(NetworkConfigBackup.device_id == device_id)
        .order_by(desc(NetworkConfigBackup.backed_up_at))
        .limit(limit)
    )
    return q.scalars().all()
|
|
|
|
|
|
@router.get("/devices/{device_id}/diff")
async def get_config_diff(
    device_id: int,
    old_id: Optional[int] = None,
    new_id: Optional[int] = None,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(_require_ops),
):
    """Compare two configuration backups of a device.

    ``old_id`` and ``new_id`` are forwarded to
    ``NetworkScanner.get_config_diff``; per the original author's note, the
    two most recent backups are compared when they are omitted.

    Raises:
        HTTPException: 400 carrying the scanner's error message when the
            comparison is reported as unsuccessful.
    """
    from core.network_scanner import NetworkScanner

    scanner = NetworkScanner()
    outcome = await scanner.get_config_diff(db, device_id, old_id, new_id)
    if outcome["success"]:
        return outcome
    raise HTTPException(400, outcome.get("error", "비교 실패"))
|
|
|
|
|
|
@router.post("/devices/{device_id}/command")
async def execute_device_command(
    device_id: int,
    body: CommandRequest,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(_require_ops),
):
    """Run a single command on an active device over SSH.

    Only commands accepted by ``NetworkScanner.is_command_safe`` are executed.

    Args:
        device_id: Primary key of the target device.
        body: Command line and timeout.
        db: Async database session (injected).
        current_user: Caller, already authorized by ``_require_ops``.

    Returns:
        A dict with ``device_name``, ``command``, ``success``, ``stdout``
        (truncated to 5000 characters), ``stderr`` (truncated to 500) and
        ``exit_code``.

    Raises:
        HTTPException: 404 when no active device has the given id; 400 when
            the command is rejected by the safety check; 500 when the stored
            credentials cannot be decrypted.
    """
    from core.network_scanner import NetworkScanner
    from core.ssh_exec import _decrypt_password

    q = await db.execute(
        select(NetworkDevice).where(
            NetworkDevice.id == device_id,
            NetworkDevice.is_active == True,  # noqa: E712 - SQL expression
        )
    )
    device = q.scalar_one_or_none()
    if not device:
        raise HTTPException(404, "장비를 찾을 수 없습니다.")

    scanner = NetworkScanner()
    if not scanner.is_command_safe(body.command):
        raise HTTPException(400, "허용되지 않는 명령어입니다.")

    try:
        pw = _decrypt_password(device.ssh_pw_enc)
    except Exception as exc:
        # Keep the cause in the server log; the client only gets the generic
        # message below.
        logger.exception(
            "Failed to decrypt SSH credentials for device id %s", device_id
        )
        raise HTTPException(500, "자격증명 복호화 실패") from exc

    result = await scanner.execute_command(
        device.ip_addr,
        device.ssh_user,
        pw,
        device.ssh_port or 22,  # fall back to the default SSH port when unset
        body.command,
        body.timeout,
    )

    # Tolerate a result whose output fields are missing or None (for example
    # when the connection failed before anything ran) instead of crashing
    # with KeyError/TypeError while building the response.
    stdout = result.get("stdout") or ""
    stderr = result.get("stderr") or ""
    return {
        "device_name": device.device_name,
        "command": body.command,
        "success": result.get("success", False),
        "stdout": stdout[:5000],  # cap the payload at 5000 characters
        "stderr": stderr[:500],
        "exit_code": result.get("exit_code"),
    }
|
|
|
|
|
|
@router.get("/topology")
async def get_topology(
    inst_id: Optional[int] = None,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(_require_ops),
):
    """Return the active devices as topology nodes, also grouped by type.

    Args:
        inst_id: When provided, only devices with this ``inst_id`` are included.
        db: Async database session (injected).
        current_user: Caller, already authorized by ``_require_ops``.

    Returns:
        ``{"total": <device count>, "topology": {"nodes": [...],
        "by_type": {<device_type>: [...]}}}``. Each node carries ``id``,
        ``name``, ``type``, ``vendor``, ``location``, ``inst_id`` and
        ``last_backup_at`` (ISO-8601 string or ``None``). The same node dict
        appears in both ``nodes`` and its ``by_type`` bucket.
    """
    # ``== True`` is required here: it builds a SQL expression, not a Python bool.
    q = select(NetworkDevice).where(NetworkDevice.is_active == True)  # noqa: E712
    # Compare against None explicitly so that inst_id=0 is treated as a real
    # filter value instead of silently returning every device.
    if inst_id is not None:
        q = q.where(NetworkDevice.inst_id == inst_id)
    result = await db.execute(q)
    devices = result.scalars().all()

    topology: dict = {"nodes": [], "by_type": {}}
    for d in devices:
        node = {
            "id": d.id,
            "name": d.device_name,
            "type": d.device_type,
            "vendor": d.vendor,
            "location": d.location,
            "inst_id": d.inst_id,
            "last_backup_at": d.last_backup_at.isoformat() if d.last_backup_at else None,
        }
        topology["nodes"].append(node)
        topology["by_type"].setdefault(d.device_type, []).append(node)

    return {
        "total": len(devices),
        "topology": topology,
    }
|