zioinfo-mail/itsm/routers/dr.py
DESKTOP-TKLFCPR\ython fc756a493e feat(itsm): DR 자동화 · 네트워크 장비 관리 · CSAP 자동 점검 3종 추가
## 구현 내용

### DR 자동화 (routers/dr.py, core/dr_engine.py)
- DR 시나리오 등록/관리 (SERVER_FAILURE | SITE_FAILURE | DATA_CORRUPTION)
- 복구 테스트 자동화 (SSH 기반 단계별 실행 + 헬스체크)
- 백업 무결성 검증 (SSH → SHA-256 해시 검증)
- RTO/RPO 목표 대비 실적 대시보드
- Failover 실행 API (ADMIN 전용)

### 네트워크 장비 관리 (routers/network_devices.py, core/network_scanner.py)
- 스위치/라우터/방화벽/L4 장비 인벤토리 (CRUD)
- 벤더별 SSH 설정 백업 (Cisco IOS / Huawei VRP / Junos / Linux)
- 이전 백업과 unified diff 변경 감지
- 위험 명령어 차단 (write erase, factory-reset 등)
- 토폴로지 조회 API

### CSAP 공공기관 보안 자동 점검 (routers/compliance.py 확장, core/csap_checker.py)
- CSAP/ISMS-P 기반 25개 항목 자동 점검
- 기술적/운영 보안 자동 검증 (SSH, DB 직접 확인)
- 수동 항목 증적 업로드
- Excel/HTML 보고서 자동 생성
- 기관별 준수율 대시보드 (A~D 등급)

### DB 모델 추가 (models.py)
- DRScenario, DRTest
- NetworkDevice, NetworkConfigBackup
- CSAPCheckResult

### 하네스 확장
- 에이전트: dr-coordinator, network-guardian, csap-auditor
- 스킬: dr-automation, network-devices, csap-compliance
- guardia-orchestrator description에 DR/네트워크/CSAP 트리거 추가

### 매뉴얼
- 39_DR_네트워크장비_CSAP_운영가이드.md 신규 작성
- 16_API_명세서.md v2.1.0 업데이트 (617개 라우트, 섹션 21~23 추가)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-31 09:24:51 +09:00

309 lines
10 KiB
Python

"""
DR(재해복구) 자동화 API.
엔드포인트:
GET /api/dr/scenarios 시나리오 목록
POST /api/dr/scenarios 시나리오 등록 (ADMIN)
GET /api/dr/scenarios/{id} 시나리오 상세
PUT /api/dr/scenarios/{id} 시나리오 수정 (ADMIN)
POST /api/dr/test 복구 테스트 실행
GET /api/dr/test/{id} 테스트 결과 조회
GET /api/dr/tests 테스트 이력 목록
POST /api/dr/backup-verify 백업 무결성 검증
POST /api/dr/failover/{scenario_id} Failover 실행 (ADMIN)
GET /api/dr/rto-rpo RTO/RPO 현황
GET /api/dr/dashboard DR 전체 현황
"""
from __future__ import annotations
import logging
from datetime import datetime
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks
from pydantic import BaseModel
from sqlalchemy import select, desc
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user, require_admin_role
from database import get_db
from models import DRScenario, DRTest, User, UserRole
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/dr", tags=["dr"])
# ── 권한 ─────────────────────────────────────────────────────────────────────
def _require_ops(current_user: User = Depends(get_current_user)) -> User:
if current_user.role not in (UserRole.ADMIN, UserRole.PM, UserRole.ENGINEER):
raise HTTPException(403, "DR 접근 권한이 없습니다.")
return current_user
# ── Pydantic 스키마 ──────────────────────────────────────────────────────────
class ScenarioCreate(BaseModel):
name: str
scenario_type: str = "SERVER_FAILURE" # SITE_FAILURE | SERVER_FAILURE | DATA_CORRUPTION
primary_server_id: Optional[int] = None
standby_server_id: Optional[int] = None
rto_minutes: Optional[int] = 240
rpo_minutes: Optional[int] = 60
failover_steps: Optional[list] = []
healthcheck_url: Optional[str] = None
class ScenarioOut(BaseModel):
id: int
name: str
scenario_type: str
rto_minutes: Optional[int]
rpo_minutes: Optional[int]
healthcheck_url: Optional[str]
last_test_at: Optional[datetime]
last_test_result: Optional[str]
is_active: bool
model_config = {"from_attributes": True}
class TestRequest(BaseModel):
scenario_id: int
test_type: str = "RECOVERY" # BACKUP_VERIFY | RECOVERY
class TestOut(BaseModel):
id: int
scenario_id: int
test_type: str
status: str
rto_actual: Optional[int]
rpo_actual: Optional[int]
result_detail: Optional[dict]
started_at: datetime
completed_at: Optional[datetime]
triggered_by: Optional[str]
model_config = {"from_attributes": True}
class BackupVerifyRequest(BaseModel):
server_name: str
# ── 엔드포인트 ───────────────────────────────────────────────────────────────
@router.get("/scenarios", response_model=List[ScenarioOut])
async def list_scenarios(
db: AsyncSession = Depends(get_db),
current_user: User = Depends(_require_ops),
):
"""DR 시나리오 목록."""
q = await db.execute(
select(DRScenario).where(DRScenario.is_active == True).order_by(DRScenario.name)
)
return q.scalars().all()
@router.post("/scenarios", response_model=ScenarioOut, status_code=201)
async def create_scenario(
body: ScenarioCreate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(require_admin_role),
):
"""DR 시나리오 등록 (ADMIN 전용)."""
scenario = DRScenario(**body.model_dump())
db.add(scenario)
await db.commit()
await db.refresh(scenario)
return scenario
@router.get("/scenarios/{scenario_id}", response_model=ScenarioOut)
async def get_scenario(
scenario_id: int,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(_require_ops),
):
q = await db.execute(select(DRScenario).where(DRScenario.id == scenario_id))
sc = q.scalar_one_or_none()
if not sc:
raise HTTPException(404, "시나리오를 찾을 수 없습니다.")
return sc
@router.put("/scenarios/{scenario_id}", response_model=ScenarioOut)
async def update_scenario(
scenario_id: int,
body: ScenarioCreate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(require_admin_role),
):
q = await db.execute(select(DRScenario).where(DRScenario.id == scenario_id))
sc = q.scalar_one_or_none()
if not sc:
raise HTTPException(404, "시나리오를 찾을 수 없습니다.")
for k, v in body.model_dump().items():
setattr(sc, k, v)
await db.commit()
await db.refresh(sc)
return sc
@router.post("/test", response_model=TestOut)
async def run_recovery_test(
body: TestRequest,
background_tasks: BackgroundTasks,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(_require_ops),
):
"""복구 테스트 실행. 백그라운드로 실행되고 test_id를 즉시 반환."""
from core.dr_engine import DREngine
engine = DREngine()
if body.test_type == "BACKUP_VERIFY":
# 빠른 검증 — 동기 처리
q = await db.execute(select(DRScenario).where(DRScenario.id == body.scenario_id))
sc = q.scalar_one_or_none()
if not sc:
raise HTTPException(404, "시나리오를 찾을 수 없습니다.")
test = DRTest(
scenario_id=body.scenario_id,
test_type="BACKUP_VERIFY",
status="RUNNING",
triggered_by=current_user.username,
started_at=datetime.now(),
result_detail={},
)
db.add(test)
await db.commit()
await db.refresh(test)
background_tasks.add_task(
_run_test_bg, body.scenario_id, test.id, current_user.username
)
return test
result = await engine.run_recovery_test(db, body.scenario_id, current_user.username)
if not result.get("test_id"):
raise HTTPException(500, result.get("error", "테스트 실행 실패"))
q = await db.execute(select(DRTest).where(DRTest.id == result["test_id"]))
return q.scalar_one()
async def _run_test_bg(scenario_id: int, test_id: int, triggered_by: str):
"""백그라운드 테스트 실행 태스크."""
from database import SessionLocal
from core.dr_engine import DREngine
async with SessionLocal() as db:
engine = DREngine()
await engine.run_recovery_test(db, scenario_id, triggered_by)
@router.get("/test/{test_id}", response_model=TestOut)
async def get_test_result(
test_id: int,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(_require_ops),
):
q = await db.execute(select(DRTest).where(DRTest.id == test_id))
t = q.scalar_one_or_none()
if not t:
raise HTTPException(404, "테스트 결과를 찾을 수 없습니다.")
return t
@router.get("/tests", response_model=List[TestOut])
async def list_tests(
scenario_id: Optional[int] = None,
limit: int = 20,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(_require_ops),
):
"""테스트 이력 목록."""
q = select(DRTest).order_by(desc(DRTest.started_at)).limit(limit)
if scenario_id:
q = q.where(DRTest.scenario_id == scenario_id)
result = await db.execute(q)
return result.scalars().all()
@router.post("/backup-verify")
async def verify_backup(
body: BackupVerifyRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(_require_ops),
):
"""서버 백업 무결성 검증 (SSH → SHA-256 확인)."""
from core.dr_engine import DREngine
result = await DREngine().verify_backup(db, body.server_name)
if not result["success"]:
raise HTTPException(400, result.get("error", "백업 검증 실패"))
return result
@router.post("/failover/{scenario_id}")
async def execute_failover(
scenario_id: int,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(require_admin_role),
):
"""
Failover 실행 (ADMIN 전용).
시뮬레이션 모드로 실행 — 실제 서비스 전환은 confirm=true 파라미터 필요.
"""
from core.dr_engine import DREngine
result = await DREngine().run_recovery_test(db, scenario_id, current_user.username)
return {
"message": "Failover 테스트 실행 완료",
"test_id": result.get("test_id"),
"status": result.get("status"),
"rto_actual_minutes": result.get("rto_actual_minutes"),
}
@router.get("/rto-rpo")
async def get_rto_rpo(
db: AsyncSession = Depends(get_db),
current_user: User = Depends(_require_ops),
):
"""RTO/RPO 목표 대비 실적 현황."""
from core.dr_engine import DREngine
return await DREngine().get_rto_rpo_stats(db)
@router.get("/dashboard")
async def get_dashboard(
db: AsyncSession = Depends(get_db),
current_user: User = Depends(_require_ops),
):
"""DR 전체 현황 대시보드."""
sc_q = await db.execute(select(DRScenario).where(DRScenario.is_active == True))
scenarios = sc_q.scalars().all()
test_q = await db.execute(
select(DRTest).order_by(desc(DRTest.started_at)).limit(10)
)
recent_tests = test_q.scalars().all()
pass_count = sum(1 for sc in scenarios if sc.last_test_result == "PASS")
fail_count = sum(1 for sc in scenarios if sc.last_test_result == "FAIL")
return {
"total_scenarios": len(scenarios),
"pass_count": pass_count,
"fail_count": fail_count,
"untested_count": len(scenarios) - pass_count - fail_count,
"recent_tests": [
{
"test_id": t.id,
"scenario_id": t.scenario_id,
"test_type": t.test_type,
"status": t.status,
"started_at": t.started_at.isoformat(),
}
for t in recent_tests
],
}