- itsm/ -> workspace/guardia-itsm/ - manager/ -> workspace/guardia-manager/ - app/ -> workspace/guardia-messenger/ - manual/ -> workspace/guardia-docs/ workspace/zioinfo-web/ unchanged. git mv preserves full commit history. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
254 lines
10 KiB
Python
254 lines
10 KiB
Python
"""
|
|
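DR (disaster recovery) automation engine.

Failover sequence: snapshot → activate standby server → health check → complete/rollback
Backup integrity: SSH → SHA-256 check of the newest file under backup_path
RTO/RPO: average/recent figures computed from the test history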
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import hashlib
|
|
import logging
|
|
import time
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
import httpx
|
|
import paramiko
|
|
from sqlalchemy import select, desc
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class DREngine:
    """Business logic for DR (disaster recovery) automation.

    Provides three operations:

    * ``verify_backup`` - connect to a server over SSH and hash the newest
      entry under its configured ``backup_path``.
    * ``run_recovery_test`` - run the steps of a DR scenario, record a
      ``DRTest`` row and update the scenario's last-test fields.
    * ``get_rto_rpo_stats`` - compare each active scenario's RTO target with
      the average RTO of its most recent passing tests.
    """

    # ── Backup integrity verification ───────────────────────────────────────

    async def verify_backup(self, db: "AsyncSession", server_name: str) -> dict:
        """Hash the newest backup file of an active server over SSH.

        Args:
            db: Async SQLAlchemy session used to look up the server row.
            server_name: ``Server.server_name`` of the target server.

        Returns:
            A dict with ``success``, ``server_name`` and, when the SSH check
            ran, ``latest_file``, ``file_size_mb``, ``sha256``,
            ``modified_at`` and ``error``. ``success`` is True only when a
            file was found *and* its SHA-256 digest could be computed.
            IP addresses and account details are never part of the result.
        """
        from models import Server
        from core.ssh_exec import _decrypt_password

        result = await db.execute(
            select(Server).where(
                Server.server_name == server_name,
                Server.is_active == True,  # noqa: E712 - SQL expression, not a Python comparison
            )
        )
        server = result.scalar_one_or_none()
        if not server:
            return {"success": False, "error": "서버를 찾을 수 없습니다.", "server_name": server_name}
        if not server.backup_path:
            return {"success": False, "error": "backup_path 미설정", "server_name": server_name}

        try:
            password = _decrypt_password(server.os_pw_enc)
            # paramiko is blocking, so run it in the default thread pool.
            check_result = await asyncio.get_running_loop().run_in_executor(
                None, self._ssh_verify_backup, server.ip_addr, server.ssh_user,
                password, server.port, server.backup_path,
            )
        except Exception as e:
            # Full details (which may include the host address) go to the log
            # only; the caller gets just the exception type so the
            # "no IP/account info in the result" contract holds.
            logger.exception("backup verify error for %s", server_name)
            return {
                "success": False,
                "server_name": server_name,
                "error": f"백업 검증 실패 ({type(e).__name__})",
            }

        sha256 = check_result.get("sha256")
        return {
            # A file without a digest has not been verified.
            "success": bool(check_result["found"] and sha256 is not None),
            "server_name": server_name,
            "latest_file": check_result.get("latest_file"),
            "file_size_mb": check_result.get("file_size_mb"),
            "sha256": sha256,
            "modified_at": check_result.get("modified_at"),
            "error": check_result.get("error"),
        }

    @staticmethod
    def _parse_ls_entry(line: str) -> tuple:
        """Split one ``ls -l`` line into ``(filename, modified_at)``.

        The name is taken as everything after the eighth column so that file
        names containing spaces survive; for symlinks the ``-> target`` suffix
        is dropped. ``modified_at`` is the three date/time columns joined by a
        space, or None when the line has fewer than eight columns.
        """
        parts = line.split(None, 8)
        name = parts[8] if len(parts) >= 9 else parts[-1]
        if line.startswith("l") and " -> " in name:
            name = name.split(" -> ", 1)[0]
        modified_at = " ".join(parts[5:8]) if len(parts) >= 8 else None
        return name, modified_at

    def _ssh_verify_backup(self, ip: str, user: str, password: str,
                           port: int, backup_path: str) -> dict:
        """Blocking SSH worker: find the newest entry in *backup_path* and hash it.

        Returns:
            ``{"found": False, "error": ...}`` when the listing is empty,
            otherwise a dict with ``found=True``, ``latest_file``, ``sha256``
            (None if ``sha256sum`` printed nothing), ``file_size_mb``,
            ``modified_at`` and, when hashing failed, ``error``.

        Raises:
            Whatever paramiko / the socket layer raises on connection or
            command failure; the connection is always closed.
        """
        import shlex

        client = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy trusts any unknown host key, which leaves
        # the password login open to man-in-the-middle interception. Consider
        # loading known hosts and using RejectPolicy.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            client.connect(ip, port=port, username=user, password=password, timeout=15)

            # Newest entry first; after dropping the "total" line the first
            # remaining line is the latest file. Quoting disables shell
            # expansion, so backup_path must be a literal path (no "~"/globs).
            list_cmd = (
                f"ls -lt -- {shlex.quote(backup_path)} | grep -v '^total' | head -n 1"
            )
            _, stdout, _ = client.exec_command(list_cmd, timeout=30)
            line = stdout.read().decode().strip()
            if not line:
                return {"found": False, "error": "백업 파일 없음"}

            filename, modified_at = self._parse_ls_entry(line)
            filepath = f"{backup_path}/{filename}"
            # The file name comes from the remote listing; quote it so a
            # crafted name cannot inject shell commands.
            quoted_path = shlex.quote(filepath)

            # SHA-256 digest.
            # NOTE(review): 60 s may be too short for very large backups.
            _, sha_out, _ = client.exec_command(f"sha256sum -- {quoted_path}", timeout=60)
            sha_line = sha_out.read().decode().strip()
            sha256 = sha_line.split()[0] if sha_line else None

            # Size in MiB as reported by du.
            _, size_out, _ = client.exec_command(
                f"du -m -- {quoted_path} | cut -f1", timeout=30
            )
            size_mb = size_out.read().decode().strip()

            outcome = {
                "found": True,
                "latest_file": filename,
                "sha256": sha256,
                "file_size_mb": int(size_mb) if size_mb.isdigit() else None,
                "modified_at": modified_at,
            }
            if sha256 is None:
                outcome["error"] = "SHA-256 계산 실패"
            return outcome
        finally:
            client.close()

    # ── Recovery test ───────────────────────────────────────────────────────

    @staticmethod
    def _final_status(steps_log: list) -> str:
        """Derive PASS / PARTIAL / FAIL from the recorded step entries.

        PASS needs at least one entry and every entry "OK" (the health check,
        when configured, is one of the entries). PARTIAL means some but not
        all entries are "OK". Anything else, including an empty log, is FAIL.
        """
        statuses = [entry["status"] for entry in steps_log]
        if statuses and all(status == "OK" for status in statuses):
            return "PASS"
        if "OK" in statuses:
            return "PARTIAL"
        return "FAIL"

    async def run_recovery_test(self, db: "AsyncSession", scenario_id: int,
                                triggered_by: str) -> dict:
        """Run a recovery test for a DR scenario and persist the outcome.

        Each step result is accumulated and stored in ``DRTest.result_detail``.

        Args:
            db: Async SQLAlchemy session.
            scenario_id: Primary key of an active ``DRScenario``.
            triggered_by: Stored on the ``DRTest`` row as the initiator.

        Returns:
            On success ``{"test_id", "status", "rto_actual_minutes", "steps"}``;
            ``{"success": False, "error": ...}`` if the scenario is missing;
            ``{"test_id", "status": "FAIL", "error"}`` if execution raised.
        """
        from models import DRScenario, DRTest

        result = await db.execute(
            select(DRScenario).where(
                DRScenario.id == scenario_id,
                DRScenario.is_active == True,  # noqa: E712 - SQL expression
            )
        )
        scenario = result.scalar_one_or_none()
        if not scenario:
            return {"success": False, "error": "시나리오를 찾을 수 없습니다."}

        # Read scenario data before the first commit: if the session expires
        # objects on commit (SQLAlchemy's default), later attribute reads
        # would need implicit I/O, which an AsyncSession cannot perform.
        steps = scenario.failover_steps or []
        healthcheck_url = scenario.healthcheck_url

        test = DRTest(
            scenario_id=scenario_id,
            test_type="RECOVERY",
            status="RUNNING",
            triggered_by=triggered_by,
            started_at=datetime.now(),
            result_detail={"steps": []},
        )
        db.add(test)
        await db.commit()
        await db.refresh(test)
        test_id = test.id  # keep a plain value; see the expiry note above

        start_time = time.monotonic()
        steps_log = []

        try:
            for i, step in enumerate(steps, 1):
                step_start = time.monotonic()
                step_result = await self._execute_step(step, scenario)
                elapsed = round(time.monotonic() - step_start, 2)
                steps_log.append({
                    "step": i,
                    "name": step.get("name", f"Step {i}"),
                    "status": "OK" if step_result["success"] else "FAIL",
                    "elapsed_sec": elapsed,
                    "message": step_result.get("message", ""),
                })
                # Steps abort the run on failure unless they opt out.
                if not step_result["success"] and step.get("abort_on_fail", True):
                    break

            # Health check is optional; when configured it is logged as one
            # more step and therefore takes part in the final status.
            if healthcheck_url:
                health_start = time.monotonic()
                health_ok = await self._check_health(healthcheck_url)
                steps_log.append({
                    "step": len(steps) + 1,
                    "name": "헬스체크",
                    "status": "OK" if health_ok else "FAIL",
                    "elapsed_sec": round(time.monotonic() - health_start, 2),
                    "message": healthcheck_url,
                })

            total_min = round((time.monotonic() - start_time) / 60, 1)
            final_status = self._final_status(steps_log)
            # Whole minutes, rounded up so a sub-minute run counts as 1.
            rto_actual = int(total_min) + 1

            test.status = final_status
            test.rto_actual = rto_actual
            test.completed_at = datetime.now()
            test.result_detail = {"steps": steps_log, "total_minutes": total_min}

            # Update the scenario's latest test result.
            scenario.last_test_at = datetime.now()
            scenario.last_test_result = final_status
            await db.commit()

            return {
                "test_id": test_id,
                "status": final_status,
                "rto_actual_minutes": rto_actual,
                "steps": steps_log,
            }

        except Exception as e:
            logger.exception("DR test error scenario=%d", scenario_id)
            # If the failure came from the session itself it must be rolled
            # back before it can be used to record the FAIL outcome.
            await db.rollback()
            test.status = "FAIL"
            test.completed_at = datetime.now()
            test.result_detail = {"error": str(e)[:500], "steps": steps_log}
            await db.commit()
            return {"test_id": test_id, "status": "FAIL", "error": str(e)[:200]}

    async def _execute_step(self, step: dict, scenario) -> dict:
        """Execute one scenario step (HTTP call, or SSH placeholder).

        Args:
            step: Step definition; ``type`` defaults to ``"ssh"``, HTTP steps
                read ``url``.
            scenario: The owning scenario (currently unused).

        Returns:
            ``{"success": bool, "message": str}``.
        """
        step_type = step.get("type", "ssh")
        if step_type == "http":
            url = step.get("url", "")
            try:
                # NOTE(review): verify=False disables TLS certificate checks.
                async with httpx.AsyncClient(verify=False, timeout=15) as client:
                    resp = await client.get(url)
                return {"success": resp.status_code < 400,
                        "message": f"HTTP {resp.status_code}"}
            except Exception as e:
                # Best effort: any transport error is reported as a failed step.
                return {"success": False, "message": str(e)[:100]}
        # NOTE(review): SSH (and any unknown) step types are NOT executed here;
        # this placeholder always reports success, so recovery tests made of
        # SSH steps pass without running anything. TODO: implement real
        # execution following the backup-verification SSH pattern.
        return {"success": True, "message": "단계 실행 완료"}

    async def _check_health(self, url: str, timeout: int = 15) -> bool:
        """Return True when a GET to *url* answers with a status below 400.

        Any exception (connection error, timeout, ...) counts as unhealthy.
        """
        try:
            # NOTE(review): verify=False disables TLS certificate checks.
            async with httpx.AsyncClient(verify=False, timeout=timeout) as client:
                resp = await client.get(url)
            return resp.status_code < 400
        except Exception:
            return False

    # ── RTO/RPO statistics ──────────────────────────────────────────────────

    @staticmethod
    def _average_rto(values) -> Optional[float]:
        """Mean of the non-None values rounded to one decimal, or None if none."""
        measured = [value for value in values if value is not None]
        if not measured:
            return None
        return round(sum(measured) / len(measured), 1)

    @staticmethod
    def _rto_met(avg_rto, rto_target) -> Optional[bool]:
        """Whether the measured average meets the target.

        Returns None when there is no target or no measurement, because an
        unmeasured scenario cannot be said to meet its RTO.
        """
        if not rto_target or avg_rto is None:
            return None
        return avg_rto <= rto_target

    async def get_rto_rpo_stats(self, db: "AsyncSession") -> dict:
        """Compare RTO target and measured RTO for every active scenario.

        The measured value is the average ``rto_actual`` of up to five most
        recently completed tests with status "PASS". Only RTO figures are
        produced by this method.

        Returns:
            ``{"scenarios": [...], "generated_at": iso_timestamp}`` where each
            scenario entry holds ``scenario_id``, ``scenario_name``,
            ``rto_target``, ``rto_actual_avg``, ``rto_met``, ``last_test_at``,
            ``last_test_result`` and ``test_count_recent``.
        """
        from models import DRScenario, DRTest

        scenarios_result = await db.execute(
            select(DRScenario).where(DRScenario.is_active == True)  # noqa: E712
        )
        scenarios = scenarios_result.scalars().all()

        stats = []
        for sc in scenarios:
            # One query per scenario; fine for a small number of scenarios.
            recent = await db.execute(
                select(DRTest)
                .where(DRTest.scenario_id == sc.id, DRTest.status == "PASS")
                .order_by(desc(DRTest.completed_at))
                .limit(5)
            )
            tests = recent.scalars().all()
            avg_rto = self._average_rto(t.rto_actual for t in tests)
            stats.append({
                "scenario_id": sc.id,
                "scenario_name": sc.name,
                "rto_target": sc.rto_minutes,
                "rto_actual_avg": avg_rto,
                "rto_met": self._rto_met(avg_rto, sc.rto_minutes),
                "last_test_at": sc.last_test_at.isoformat() if sc.last_test_at else None,
                "last_test_result": sc.last_test_result,
                "test_count_recent": len(tests),
            })
        return {"scenarios": stats, "generated_at": datetime.now().isoformat()}
|