## 구현 내용 ### DR 자동화 (routers/dr.py, core/dr_engine.py) - DR 시나리오 등록/관리 (SERVER_FAILURE | SITE_FAILURE | DATA_CORRUPTION) - 복구 테스트 자동화 (SSH 기반 단계별 실행 + 헬스체크) - 백업 무결성 검증 (SSH → SHA-256 해시 검증) - RTO/RPO 목표 대비 실적 대시보드 - Failover 실행 API (ADMIN 전용) ### 네트워크 장비 관리 (routers/network_devices.py, core/network_scanner.py) - 스위치/라우터/방화벽/L4 장비 인벤토리 (CRUD) - 벤더별 SSH 설정 백업 (Cisco IOS / Huawei VRP / Junos / Linux) - 이전 백업과 unified diff 변경 감지 - 위험 명령어 차단 (write erase, factory-reset 등) - 토폴로지 조회 API ### CSAP 공공기관 보안 자동 점검 (routers/compliance.py 확장, core/csap_checker.py) - CSAP/ISMS-P 기반 25개 항목 자동 점검 - 기술적/운영 보안 자동 검증 (SSH, DB 직접 확인) - 수동 항목 증적 업로드 - Excel/HTML 보고서 자동 생성 - 기관별 준수율 대시보드 (A~D 등급) ### DB 모델 추가 (models.py) - DRScenario, DRTest - NetworkDevice, NetworkConfigBackup - CSAPCheckResult ### 하네스 확장 - 에이전트: dr-coordinator, network-guardian, csap-auditor - 스킬: dr-automation, network-devices, csap-compliance - guardia-orchestrator description에 DR/네트워크/CSAP 트리거 추가 ### 매뉴얼 - 39_DR_네트워크장비_CSAP_운영가이드.md 신규 작성 - 16_API_명세서.md v2.1.0 업데이트 (617개 라우트, 섹션 21~23 추가) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
321 lines
11 KiB
Python
321 lines
11 KiB
Python
"""
|
|
네트워크 장비 관리 API.
|
|
|
|
엔드포인트:
|
|
GET /api/network/devices 장비 목록
|
|
POST /api/network/devices 장비 등록 (ADMIN)
|
|
GET /api/network/devices/{id} 장비 상세
|
|
PUT /api/network/devices/{id} 장비 수정 (ADMIN)
|
|
DELETE /api/network/devices/{id} 장비 비활성화 (ADMIN)
|
|
POST /api/network/devices/{id}/backup 설정 백업 실행
|
|
GET /api/network/devices/{id}/backups 백업 이력
|
|
GET /api/network/devices/{id}/diff 설정 변경 비교
|
|
POST /api/network/devices/{id}/command SSH 명령 실행
|
|
GET /api/network/topology 네트워크 토폴로지
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from datetime import datetime
|
|
from typing import List, Optional
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException
|
|
from pydantic import BaseModel
|
|
from sqlalchemy import select, desc
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from core.auth import get_current_user, require_admin_role
|
|
from database import get_db
|
|
from models import NetworkDevice, NetworkConfigBackup, User, UserRole
|
|
|
|
logger = logging.getLogger(__name__)
|
|
router = APIRouter(prefix="/api/network", tags=["network"])
|
|
|
|
|
|
# ── 권한 ─────────────────────────────────────────────────────────────────────
|
|
|
|
def _require_ops(current_user: User = Depends(get_current_user)) -> User:
|
|
if current_user.role not in (UserRole.ADMIN, UserRole.PM, UserRole.ENGINEER):
|
|
raise HTTPException(403, "네트워크 관리 권한이 없습니다.")
|
|
return current_user
|
|
|
|
|
|
# ── Pydantic 스키마 ──────────────────────────────────────────────────────────
|
|
|
|
class DeviceCreate(BaseModel):
|
|
device_name: str
|
|
device_type: str # SWITCH | ROUTER | FIREWALL | LOAD_BALANCER
|
|
vendor: str # CISCO | HUAWEI | JUNIPER | PIOLINK | SECUI | RADWARE
|
|
model: Optional[str] = None
|
|
os_type: str = "cisco_ios" # cisco_ios | huawei_vrp | junos | linux
|
|
ip_addr: str # 저장용 (API 응답 미포함)
|
|
ssh_user: str # 저장용 (API 응답 미포함)
|
|
ssh_password: str # 저장 전 AES-256 암호화
|
|
ssh_port: int = 22
|
|
location: Optional[str] = None
|
|
inst_id: Optional[int] = None
|
|
|
|
|
|
class DeviceOut(BaseModel):
|
|
id: int
|
|
device_name: str
|
|
device_type: str
|
|
vendor: str
|
|
model: Optional[str]
|
|
os_type: str
|
|
# ip_addr, ssh_user, ssh_pw_enc 절대 미포함
|
|
location: Optional[str]
|
|
inst_id: Optional[int]
|
|
is_active: bool
|
|
last_backup_at: Optional[datetime]
|
|
|
|
model_config = {"from_attributes": True}
|
|
|
|
|
|
class BackupOut(BaseModel):
|
|
id: int
|
|
device_id: int
|
|
config_hash: str
|
|
backup_type: str
|
|
backed_up_at: datetime
|
|
backed_up_by: Optional[str]
|
|
# config_text 미포함 (대용량)
|
|
|
|
model_config = {"from_attributes": True}
|
|
|
|
|
|
class CommandRequest(BaseModel):
|
|
command: str
|
|
timeout: int = 30
|
|
|
|
|
|
# ── 엔드포인트 ───────────────────────────────────────────────────────────────
|
|
|
|
@router.get("/devices", response_model=List[DeviceOut])
|
|
async def list_devices(
|
|
inst_id: Optional[int] = None,
|
|
device_type: Optional[str] = None,
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(_require_ops),
|
|
):
|
|
"""네트워크 장비 목록."""
|
|
q = select(NetworkDevice).where(NetworkDevice.is_active == True)
|
|
if inst_id:
|
|
q = q.where(NetworkDevice.inst_id == inst_id)
|
|
if device_type:
|
|
q = q.where(NetworkDevice.device_type == device_type)
|
|
result = await db.execute(q.order_by(NetworkDevice.device_name))
|
|
return result.scalars().all()
|
|
|
|
|
|
@router.post("/devices", response_model=DeviceOut, status_code=201)
|
|
async def create_device(
|
|
body: DeviceCreate,
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(require_admin_role),
|
|
):
|
|
"""네트워크 장비 등록 (ADMIN 전용). 비밀번호는 AES-256-GCM 암호화 저장."""
|
|
from core.ssh_exec import _encrypt_password
|
|
|
|
try:
|
|
pw_enc = _encrypt_password(body.ssh_password)
|
|
except Exception:
|
|
raise HTTPException(500, "자격증명 암호화 실패")
|
|
|
|
device = NetworkDevice(
|
|
device_name=body.device_name,
|
|
device_type=body.device_type,
|
|
vendor=body.vendor,
|
|
model=body.model,
|
|
os_type=body.os_type,
|
|
ip_addr=body.ip_addr,
|
|
ssh_user=body.ssh_user,
|
|
ssh_pw_enc=pw_enc,
|
|
ssh_port=body.ssh_port,
|
|
location=body.location,
|
|
inst_id=body.inst_id,
|
|
)
|
|
db.add(device)
|
|
await db.commit()
|
|
await db.refresh(device)
|
|
return device
|
|
|
|
|
|
@router.get("/devices/{device_id}", response_model=DeviceOut)
|
|
async def get_device(
|
|
device_id: int,
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(_require_ops),
|
|
):
|
|
q = await db.execute(select(NetworkDevice).where(NetworkDevice.id == device_id))
|
|
d = q.scalar_one_or_none()
|
|
if not d:
|
|
raise HTTPException(404, "장비를 찾을 수 없습니다.")
|
|
return d
|
|
|
|
|
|
@router.put("/devices/{device_id}", response_model=DeviceOut)
|
|
async def update_device(
|
|
device_id: int,
|
|
body: DeviceCreate,
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(require_admin_role),
|
|
):
|
|
from core.ssh_exec import _encrypt_password
|
|
q = await db.execute(select(NetworkDevice).where(NetworkDevice.id == device_id))
|
|
d = q.scalar_one_or_none()
|
|
if not d:
|
|
raise HTTPException(404, "장비를 찾을 수 없습니다.")
|
|
|
|
d.device_name = body.device_name
|
|
d.device_type = body.device_type
|
|
d.vendor = body.vendor
|
|
d.model = body.model
|
|
d.os_type = body.os_type
|
|
d.ip_addr = body.ip_addr
|
|
d.ssh_user = body.ssh_user
|
|
d.ssh_pw_enc = _encrypt_password(body.ssh_password)
|
|
d.ssh_port = body.ssh_port
|
|
d.location = body.location
|
|
d.inst_id = body.inst_id
|
|
await db.commit()
|
|
await db.refresh(d)
|
|
return d
|
|
|
|
|
|
@router.delete("/devices/{device_id}", status_code=204)
|
|
async def deactivate_device(
|
|
device_id: int,
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(require_admin_role),
|
|
):
|
|
"""장비 비활성화 (삭제 아님)."""
|
|
q = await db.execute(select(NetworkDevice).where(NetworkDevice.id == device_id))
|
|
d = q.scalar_one_or_none()
|
|
if not d:
|
|
raise HTTPException(404, "장비를 찾을 수 없습니다.")
|
|
d.is_active = False
|
|
await db.commit()
|
|
|
|
|
|
@router.post("/devices/{device_id}/backup")
|
|
async def backup_device_config(
|
|
device_id: int,
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(_require_ops),
|
|
):
|
|
"""장비 설정 백업 실행. 이전 설정과 diff 비교 결과 포함."""
|
|
from core.network_scanner import NetworkScanner
|
|
result = await NetworkScanner().backup_config(
|
|
db, device_id, backup_type="MANUAL", backed_up_by=current_user.username
|
|
)
|
|
if not result["success"]:
|
|
raise HTTPException(400, result.get("error", "백업 실패"))
|
|
return result
|
|
|
|
|
|
@router.get("/devices/{device_id}/backups", response_model=List[BackupOut])
|
|
async def list_device_backups(
|
|
device_id: int,
|
|
limit: int = 20,
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(_require_ops),
|
|
):
|
|
"""백업 이력 목록 (설정 내용 제외)."""
|
|
q = await db.execute(
|
|
select(NetworkConfigBackup)
|
|
.where(NetworkConfigBackup.device_id == device_id)
|
|
.order_by(desc(NetworkConfigBackup.backed_up_at))
|
|
.limit(limit)
|
|
)
|
|
return q.scalars().all()
|
|
|
|
|
|
@router.get("/devices/{device_id}/diff")
|
|
async def get_config_diff(
|
|
device_id: int,
|
|
old_id: Optional[int] = None,
|
|
new_id: Optional[int] = None,
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(_require_ops),
|
|
):
|
|
"""설정 변경 비교. 파라미터 없으면 최근 2개 비교."""
|
|
from core.network_scanner import NetworkScanner
|
|
result = await NetworkScanner().get_config_diff(db, device_id, old_id, new_id)
|
|
if not result["success"]:
|
|
raise HTTPException(400, result.get("error", "비교 실패"))
|
|
return result
|
|
|
|
|
|
@router.post("/devices/{device_id}/command")
|
|
async def execute_device_command(
|
|
device_id: int,
|
|
body: CommandRequest,
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(_require_ops),
|
|
):
|
|
"""SSH 명령 실행 (안전 명령만 허용)."""
|
|
from core.network_scanner import NetworkScanner
|
|
from core.ssh_exec import _decrypt_password
|
|
|
|
q = await db.execute(select(NetworkDevice).where(NetworkDevice.id == device_id,
|
|
NetworkDevice.is_active == True))
|
|
device = q.scalar_one_or_none()
|
|
if not device:
|
|
raise HTTPException(404, "장비를 찾을 수 없습니다.")
|
|
|
|
scanner = NetworkScanner()
|
|
if not scanner.is_command_safe(body.command):
|
|
raise HTTPException(400, "허용되지 않는 명령어입니다.")
|
|
|
|
try:
|
|
pw = _decrypt_password(device.ssh_pw_enc)
|
|
except Exception:
|
|
raise HTTPException(500, "자격증명 복호화 실패")
|
|
|
|
result = await scanner.execute_command(
|
|
device.ip_addr, device.ssh_user, pw,
|
|
device.ssh_port or 22, body.command, body.timeout
|
|
)
|
|
return {
|
|
"device_name": device.device_name,
|
|
"command": body.command,
|
|
"success": result["success"],
|
|
"stdout": result["stdout"][:5000], # 최대 5000자
|
|
"stderr": result["stderr"][:500],
|
|
"exit_code": result["exit_code"],
|
|
}
|
|
|
|
|
|
@router.get("/topology")
|
|
async def get_topology(
|
|
inst_id: Optional[int] = None,
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(_require_ops),
|
|
):
|
|
"""네트워크 토폴로지 (장비 목록 + 타입별 분류)."""
|
|
q = select(NetworkDevice).where(NetworkDevice.is_active == True)
|
|
if inst_id:
|
|
q = q.where(NetworkDevice.inst_id == inst_id)
|
|
result = await db.execute(q)
|
|
devices = result.scalars().all()
|
|
|
|
topology: dict = {"nodes": [], "by_type": {}}
|
|
for d in devices:
|
|
node = {
|
|
"id": d.id,
|
|
"name": d.device_name,
|
|
"type": d.device_type,
|
|
"vendor": d.vendor,
|
|
"location": d.location,
|
|
"inst_id": d.inst_id,
|
|
"last_backup_at": d.last_backup_at.isoformat() if d.last_backup_at else None,
|
|
}
|
|
topology["nodes"].append(node)
|
|
topology["by_type"].setdefault(d.device_type, []).append(node)
|
|
|
|
return {
|
|
"total": len(devices),
|
|
"topology": topology,
|
|
}
|