zioinfo-mail/workspace/guardia-itsm/routers/drift_detection.py
DESKTOP-TKLFCPR\ython b8faec44e0 feat(advanced): GUARDiA 고급 확장 구현 — 20 routers + 754 endpoints
CMDB 자동 발견 (4개):
- autodiscovery.py: SSH 네트워크 스캔 + CMDB 자동 등록
- snmp_discovery.py: SNMP v2c/v3 장비 자동 발견
- dependency_map.py: 서비스 의존성 자동 매핑 (netstat)
- config_inventory.py: 서버 인벤토리 자동 수집 (SSH)

NL 쿼리 엔진 (3개):
- nlquery.py: Text-to-SQL (SELECT 전용, DML 차단)
- op_assistant.py: Multi-turn 대화형 운영 어시스턴트
- query_history.py: 쿼리 이력·즐겨찾기·공유

구성 드리프트 (3개):
- drift_detection.py: 골든 구성 vs 실제 비교·SR 자동 생성
- golden_config.py: 내장 CSAP 템플릿 + 버전 관리
- auto_remediation.py: 승인 기반 자동 교정 + 롤백

멀티클라우드 (4개):
- multicloud.py: 통합 관제 (NCloud+AWS+KT)
- aws_connector.py: AWS SigV4 직접 서명 연동
- cost_optimizer.py: AI 비용 최적화 권고
- cloud_migration.py: On-prem→K-Cloud 체크리스트

공공기관 특화 (6개):
- narasajang.py: 나라장터 OpenAPI 연동
- public_api_hub.py: data.go.kr KISA·기상청 허브
- isp_support.py: ISP 수립 지원 + AI 보고서
- network_zone.py: 행정망/인터넷망 분리 관리
- k_cloud.py: 정부 K-Cloud 전환 자동화
- e_procurement.py: 전자조달 계약·검수·납품

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-02 14:33:41 +09:00

277 lines
9.5 KiB
Python

"""
구성 드리프트 감지 + 자동 교정
골든 구성과 실제 서버 환경을 비교하여 이탈(드리프트) 감지.
드리프트 발견 시 SR 자동 생성 + 승인 기반 자동 교정.
엔드포인트:
POST /api/drift/scan/{server_id} — 단일 서버 드리프트 스캔
POST /api/drift/scan-all — 전체 서버 스캔
GET /api/drift/results — 드리프트 결과 목록
GET /api/drift/results/{server_id} — 서버별 드리프트 상세
GET /api/drift/summary — 전체 준수율 요약
POST /api/drift/remediate/{result_id} — 자동 교정 요청 (승인 필요)
"""
from __future__ import annotations
import json
import logging
import re
from datetime import datetime
from typing import Optional
import paramiko
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException
from sqlalchemy import select, func, desc
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user, require_admin_role
from core.ssh_exec import _decrypt_password as decrypt_password
from database import get_db
from models import (
User, Server, GoldenConfig, DriftResult,
SRRequest, SRStatus, AutoRemediationJob,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/drift", tags=["구성 드리프트"])
async def _check_item(server: Server, item: dict) -> dict:
"""단일 구성 항목 체크."""
try:
pw = decrypt_password(server.os_pw_enc)
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(server.ip_addr, username=server.ssh_user, password=pw, timeout=8)
_, stdout, _ = ssh.exec_command(item["cmd"], timeout=8)
actual = stdout.read().decode('utf-8', 'replace').strip()
ssh.close()
# 기대값 비교
expected = item.get("expected")
expected_regex = item.get("expected_regex")
expected_contains = item.get("expected_contains")
expected_not_contains = item.get("expected_not_contains")
compliant = True
if expected is not None:
compliant = actual == expected
elif expected_regex:
compliant = bool(re.search(expected_regex, actual, re.IGNORECASE))
elif expected_contains:
compliant = expected_contains.lower() in actual.lower()
elif expected_not_contains:
compliant = expected_not_contains.lower() not in actual.lower()
return {
"key": item["key"],
"description": item.get("description", ""),
"severity": item.get("severity", "MEDIUM"),
"compliant": compliant,
"actual": actual[:200],
"expected": expected or expected_regex or expected_contains or "",
"auto_fix": item.get("auto_fix"),
}
except Exception as e:
return {
"key": item["key"],
"description": item.get("description", ""),
"severity": item.get("severity", "MEDIUM"),
"compliant": None, # 체크 불가
"actual": f"ERROR: {str(e)[:100]}",
"expected": "",
"auto_fix": None,
}
async def _do_scan(server_id: int, config_id: Optional[int], db: AsyncSession):
    """Scan one server for configuration drift and persist the outcome.

    Runs as a background task. Loads the server and a golden config, runs
    every config item through ``_check_item``, stores a ``DriftResult`` and,
    when at least one item is non-compliant, also creates an ``SRRequest``.

    Args:
        server_id: Primary key of the server to scan.
        config_id: Golden config to compare against. When falsy, one active
            golden config is used instead.
        db: Session used for all reads and the final commit.

    Returns:
        None. The function returns early, writing nothing, when the server or
        the golden config cannot be found or the config items cannot be parsed.
    """
    server = (
        await db.execute(select(Server).where(Server.id == server_id))
    ).scalar_one_or_none()
    if not server:
        logger.warning("drift scan skipped: server %s not found", server_id)
        return

    # Pick the requested golden config, otherwise fall back to one active
    # config. NOTE(review): the fallback does not filter by server type and
    # has no ordering, so which active config is chosen is up to the database.
    if config_id:
        config_stmt = select(GoldenConfig).where(GoldenConfig.id == config_id)
    else:
        config_stmt = select(GoldenConfig).where(
            GoldenConfig.is_active == True,  # noqa: E712 - SQL expression
        ).limit(1)
    config = (await db.execute(config_stmt)).scalar_one_or_none()
    if not config:
        logger.warning(
            "drift scan skipped: no golden config for server %s (config_id=%s)",
            server_id, config_id,
        )
        return

    try:
        items = json.loads(config.items_json or "[]")
    except (TypeError, ValueError):
        # A malformed config must not die silently inside a background task.
        logger.exception(
            "drift scan skipped: golden config %s has invalid items_json", config.id
        )
        return

    # Items are checked one after another; each check opens its own SSH session.
    results = [await _check_item(server, item) for item in items]

    # "compliant" is tri-state: True / False / None (check could not run).
    non_compliant = [r for r in results if r["compliant"] is False]
    total = len(results)
    compliant_count = sum(1 for r in results if r["compliant"] is True)

    db.add(DriftResult(
        server_id=server_id,
        config_id=config.id,
        total_checks=total,
        compliant_count=compliant_count,
        non_compliant_count=len(non_compliant),
        compliance_pct=round(compliant_count / total * 100, 1) if total else 0,
        results_json=json.dumps(results, ensure_ascii=False),
        scanned_at=datetime.utcnow(),
    ))

    # Open a service request automatically when drift was found.
    if non_compliant:
        has_high = any(r["severity"] == "HIGH" for r in non_compliant)
        db.add(SRRequest(
            title=f"[드리프트] {server.hostname}: {len(non_compliant)}개 구성 이탈",
            description=f"골든 구성 '{config.name}' 대비 이탈 항목:\n" + "\n".join(
                f"- [{r['severity']}] {r['description']}: 실제={r['actual'][:50]}"
                for r in non_compliant[:5]  # only the first five are listed
            ),
            category="CONFIG_DRIFT",
            priority="HIGH" if has_high else "MEDIUM",
            status=SRStatus.OPEN,
            created_at=datetime.utcnow(),
        ))

    try:
        await db.commit()
    except Exception:
        # Leave the session usable for any task that runs after this one.
        logger.exception("drift scan for server %s could not be saved", server_id)
        await db.rollback()
        raise
    logger.info(
        "서버 %s 드리프트 스캔 완료: %s/%s 이탈", server_id, len(non_compliant), total
    )
@router.post("/scan/{server_id}")
async def scan_server(
    server_id: int,
    config_id: Optional[int] = None,
    background_tasks: BackgroundTasks = ...,
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Queue a drift scan for a single server.

    Args:
        server_id: Server to scan.
        config_id: Optional golden config to compare against.

    Returns:
        ``{"ok": True, "server_id": ..., "queued": True}`` once the scan has
        been handed to the background task runner.

    Raises:
        HTTPException: 404 when no server with *server_id* exists.
    """
    # Reject unknown ids up front; otherwise the caller is told the scan was
    # queued while the background task just returns without doing anything.
    found = await db.execute(select(Server.id).where(Server.id == server_id))
    if found.scalar_one_or_none() is None:
        raise HTTPException(404, "서버를 찾을 수 없습니다")

    # NOTE(review): `db` is the request-scoped session. Whether it is still
    # open when the background task runs depends on how get_db manages its
    # lifetime - confirm.
    background_tasks.add_task(_do_scan, server_id, config_id, db)
    return {"ok": True, "server_id": server_id, "queued": True}
@router.post("/scan-all")
async def scan_all_servers(
    config_id: Optional[int] = None,
    background_tasks: BackgroundTasks = ...,
    db: AsyncSession = Depends(get_db),
    user: User = Depends(require_admin_role),
):
    """Queue a drift scan for up to 100 servers (admin only).

    Args:
        config_id: Optional golden config applied to every queued scan.

    Returns:
        ``{"ok": True, "queued": <number of scans queued>}``.
    """
    # Only the ids are needed here (_do_scan reloads each server), and the
    # ordering makes the 100-server cap select a stable subset.
    id_rows = await db.execute(select(Server.id).order_by(Server.id).limit(100))
    server_ids = id_rows.scalars().all()
    # NOTE(review): every task shares the request-scoped session `db`;
    # confirm it is still open when the background tasks run.
    for sid in server_ids:
        background_tasks.add_task(_do_scan, sid, config_id, db)
    return {"ok": True, "queued": len(server_ids)}
@router.get("/results")
async def list_drift_results(
    limit: int = 50,
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """List the most recent drift scan results, newest first.

    Args:
        limit: Maximum number of results to return.

    Returns:
        A list of dicts, one per scan result, carrying the result id, the
        server hostname and IP, the compliance percentage, the non-compliant
        and total check counts, and the scan timestamp.
    """
    stmt = (
        select(DriftResult, Server.hostname, Server.ip_addr)
        .join(Server, DriftResult.server_id == Server.id)
        .order_by(desc(DriftResult.scanned_at))
        .limit(limit)
    )
    fetched = await db.execute(stmt)

    payload = []
    # Each row is (DriftResult, hostname, ip_addr), in the select() order.
    for drift, hostname, ip_addr in fetched.all():
        payload.append({
            "id": drift.id,
            "server": hostname,
            "ip": ip_addr,
            "compliance_pct": drift.compliance_pct,
            "non_compliant": drift.non_compliant_count,
            "total": drift.total_checks,
            "scanned_at": drift.scanned_at,
        })
    return payload
@router.get("/results/{server_id}")
async def get_server_drift(
    server_id: int,
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Return the latest drift scan of one server, including per-item details.

    Args:
        server_id: Server whose most recent scan result is requested.

    Returns:
        A dict with the result id, compliance percentage, non-compliant and
        total check counts, the decoded per-item results and the scan time.

    Raises:
        HTTPException: 404 when the server has no scan result yet.
    """
    latest_stmt = (
        select(DriftResult)
        .where(DriftResult.server_id == server_id)
        .order_by(desc(DriftResult.scanned_at))
        .limit(1)
    )
    latest = (await db.execute(latest_stmt)).scalar_one_or_none()
    if not latest:
        raise HTTPException(404, "스캔 결과 없음 — 먼저 스캔하세요")

    # results_json holds the serialized item list written by the scan.
    item_details = json.loads(latest.results_json or "[]")
    return {
        "id": latest.id,
        "compliance_pct": latest.compliance_pct,
        "non_compliant": latest.non_compliant_count,
        "total": latest.total_checks,
        "items": item_details,
        "scanned_at": latest.scanned_at,
    }
@router.get("/summary")
async def drift_summary(
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Summarize compliance across all stored drift results.

    Returns:
        A dict with:
        - ``avg_compliance_pct``: average compliance over all results,
          rounded to one decimal (0 when there are no results);
        - ``total_scanned``: number of stored drift results;
        - ``critical_servers``: number of drift results below 70 %;
        - ``status``: ``"CRITICAL"`` below 70, ``"WARNING"`` below 90,
          otherwise ``"GOOD"`` (also ``"GOOD"`` when nothing was scanned yet).
    """
    avg_result = await db.execute(
        select(func.avg(DriftResult.compliance_pct)).where(
            DriftResult.compliance_pct.isnot(None)
        )
    )
    # Read each scalar exactly once: scalar() consumes and closes the result,
    # so it must not be called again on the same object.
    avg_value = avg_result.scalar()
    total_scanned = (
        await db.execute(select(func.count(DriftResult.id)))
    ).scalar() or 0
    critical_count = (
        await db.execute(
            select(func.count(DriftResult.id)).where(DriftResult.compliance_pct < 70)
        )
    ).scalar() or 0

    if avg_value is None:
        # No scan data yet: report 0 % but do not raise an alarm.
        avg_pct = 0.0
        status = "GOOD"
    else:
        # Test against None rather than truthiness, so a real average of 0 is
        # classified as CRITICAL instead of falling back to "GOOD".
        avg_pct = float(avg_value)
        if avg_pct < 70:
            status = "CRITICAL"
        elif avg_pct < 90:
            status = "WARNING"
        else:
            status = "GOOD"

    return {
        "avg_compliance_pct": round(avg_pct, 1),
        "total_scanned": total_scanned,
        "critical_servers": critical_count,
        "status": status,
    }
@router.post("/remediate/{result_id}")
async def request_remediation(
    result_id: int,
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Request automatic remediation for a drift result (admin approval needed).

    Creates one ``AutoRemediationJob`` in status ``PENDING_APPROVAL`` for every
    stored item that is not compliant and carries an ``auto_fix`` command.

    Args:
        result_id: Drift result whose fixable items should be queued.

    Returns:
        ``{"ok": True, "jobs_created": <count>, "status": "PENDING_APPROVAL"}``.

    Raises:
        HTTPException: 404 when the drift result does not exist.
    """
    lookup = await db.execute(select(DriftResult).where(DriftResult.id == result_id))
    drift = lookup.scalar_one_or_none()
    if not drift:
        raise HTTPException(404)

    stored_items = json.loads(drift.results_json or "[]")
    # Keep only the items that are not compliant and have a fix command.
    fixable = [
        entry for entry in stored_items
        if not entry.get("compliant") and entry.get("auto_fix")
    ]

    for entry in fixable:
        db.add(AutoRemediationJob(
            drift_result_id=result_id,
            server_id=drift.server_id,
            item_key=entry["key"],
            fix_cmd=entry["auto_fix"],
            status="PENDING_APPROVAL",
            requested_by=user.id,
            created_at=datetime.utcnow(),
        ))

    await db.commit()
    return {"ok": True, "jobs_created": len(fixable), "status": "PENDING_APPROVAL"}