guardia-itsm/routers/predictive.py
DESKTOP-TKLFCPRython 64c27c3509 feat(itsm): G-1~G-12 확장 기능 + 하네스/봇/설치스크립트 구현
G-1: 메신저 Webhook Relay + _send_to_room 실제 httpx 호출 구현
G-2: POST /api/tasks/bulk SR 대량작업 엔드포인트 (최대 100건)
G-3: 라이선스 만료 알림 스케줄러 (매일 09:00 KST)
G-4: 체험판 upgrade_banner 필드 + license.py 배너 로직
G-5: core/auto_rca.py + incidents/problem auto-rca 엔드포인트
G-6: core/deploy_impact.py + vibe impact-analysis 엔드포인트
G-7: core/ticket_classifier.py + SR 생성 시 AI 분류 + ai-suggestion API
G-8: VulnPatchRecord 모델 + vuln_scan 패치추적 4개 엔드포인트
G-9: core/jira_sync.py + gateway Jira/Confluence 연동 엔드포인트
G-10: core/push_notify.py + routers/push.py + PushSubscription 모델
G-11: approvals 다중승인 (위임/서명/기한초과/마감연장)
G-12: alembic.ini + migrations/ + cicd/migrate_to_postgres.sh

하네스: guardia-orchestrator 확장기능 Phase 반영
봇명령어: /sr /status /license /bulk 슬래시 명령어 추가
설치스크립트: setup/ (Ubuntu, CentOS, RHEL, Windows) --test 옵션 포함

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-29 18:18:52 +09:00

315 lines
11 KiB
Python

"""
B-6: 예측 유지보수 API 라우터
엔드포인트:
POST /api/predictive/analyze/{source} — 단일 서버 예측 분석
POST /api/predictive/batch — 전체 서버 배치 예측
GET /api/predictive/health/{source} — 서버 종합 건강도
GET /api/predictive/lifecycle — 장비 수명 주기 분석
GET /api/predictive/lifecycle/{source} — 단일 장비 수명 평가
GET /api/predictive/thresholds — 예측 임계값 조회
PUT /api/predictive/thresholds/{metric} — 예측 임계값 수정
GET /api/predictive/stats — 예측 통계
"""
from __future__ import annotations
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query, Body
from pydantic import BaseModel
from sqlalchemy import select, desc, func
from sqlalchemy.ext.asyncio import AsyncSession
from database import get_db
from core.predictive import (
predict_metric_trend,
analyze_server_health,
run_predictive_batch,
run_lifecycle_analysis,
assess_equipment_lifecycle,
PREDICTION_THRESHOLDS,
EQUIPMENT_LIFESPAN,
linear_regression,
moving_average,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/predictive", tags=["predictive"])
# ── Pydantic 스키마 ───────────────────────────────────────────────────────────
class ThresholdUpdateRequest(BaseModel):
warning: Optional[float] = None
critical: Optional[float] = None
horizon_hours: Optional[int] = None
class BatchRunRequest(BaseModel):
auto_create_sr: bool = True
ttr_threshold_hours: float = 48.0
max_sources: int = 50
# ── 단일 서버 예측 ────────────────────────────────────────────────────────────
@router.post("/analyze/{source}")
async def analyze_source(
source: str,
metric_type: str = Query("CPU_USAGE"),
horizon_hours: int = Query(24, ge=1, le=720),
hours_back: int = Query(72, ge=6, le=720),
db: AsyncSession = Depends(get_db),
):
"""
단일 서버/소스에 대한 메트릭 트렌드 예측.
- 선형 회귀로 향후 horizon_hours 후 값 예측
- 임계값 도달 시간(TTR) 계산
"""
result = await predict_metric_trend(
db, source, metric_type.upper(),
horizon_hours=horizon_hours,
hours_back=hours_back,
)
return result
@router.get("/health/{source}")
async def server_health(
source: str,
metric_types: Optional[str] = Query(None, description="쉼표 구분 메트릭 목록"),
db: AsyncSession = Depends(get_db),
):
"""
서버 종합 건강도 분석 (여러 메트릭 종합 점수).
health_score: 0~100 (높을수록 양호)
risk_level: LOW / MEDIUM / HIGH / CRITICAL
"""
mt_list = None
if metric_types:
mt_list = [m.strip().upper() for m in metric_types.split(",") if m.strip()]
result = await analyze_server_health(db, source, mt_list)
return result
# ── 배치 예측 ────────────────────────────────────────────────────────────────
@router.post("/batch")
async def run_batch(
body: BatchRunRequest = Body(...),
db: AsyncSession = Depends(get_db),
):
"""
모든 활성 서버 예측 배치 실행.
- 최근 24시간 내 메트릭이 있는 소스 자동 감지
- TTR < threshold_hours 이면 경고 발생
- auto_create_sr=true이면 예방 SR 자동 생성
"""
result = await run_predictive_batch(
db,
auto_create_sr = body.auto_create_sr,
ttr_threshold_hours = body.ttr_threshold_hours,
max_sources = body.max_sources,
)
return result
# ── 수명 주기 분석 ────────────────────────────────────────────────────────────
@router.get("/lifecycle")
async def lifecycle_overview(
equipment_type: Optional[str] = Query(None),
max_items: int = Query(100, ge=1, le=500),
db: AsyncSession = Depends(get_db),
):
"""
CMDB 전체 장비 수명 주기 분석.
EOL / CRITICAL / WARNING / HEALTHY 분류 및 교체 권고.
"""
result = await run_lifecycle_analysis(db, equipment_type, max_items)
return result
@router.get("/lifecycle/{source}")
async def lifecycle_single(
source: str,
install_date: Optional[str] = Query(None, description="설치일 (YYYY-MM-DD)"),
equipment_type: str = Query("SERVER"),
db: AsyncSession = Depends(get_db),
):
"""단일 장비 수명 주기 평가."""
if install_date:
try:
install_dt = datetime.strptime(install_date, "%Y-%m-%d")
except ValueError:
raise HTTPException(400, "install_date 형식 오류 (YYYY-MM-DD)")
else:
# DB에서 서버 정보 조회 시도
try:
from models import Server
srv = (await db.execute(
select(Server).where(Server.hostname == source)
)).scalars().first()
if srv and hasattr(srv, "install_date") and srv.install_date:
install_dt = srv.install_date
equipment_type = getattr(srv, "server_type", equipment_type) or equipment_type
else:
# 기본값: 3년 전
install_dt = datetime.utcnow() - timedelta(days=365 * 3)
except Exception:
install_dt = datetime.utcnow() - timedelta(days=365 * 3)
result = assess_equipment_lifecycle(
equipment_type = equipment_type,
install_date = install_dt,
)
result["source"] = source
return result
# ── 임계값 관리 ───────────────────────────────────────────────────────────────
@router.get("/thresholds")
async def get_thresholds():
"""예측 임계값 조회 (런타임 설정)."""
return {
"thresholds": PREDICTION_THRESHOLDS,
"equipment_lifespan": EQUIPMENT_LIFESPAN,
}
@router.put("/thresholds/{metric_type}")
async def update_threshold(
metric_type: str,
body: ThresholdUpdateRequest,
):
"""예측 임계값 런타임 수정 (재시작 시 초기화)."""
mt = metric_type.upper()
if mt not in PREDICTION_THRESHOLDS:
raise HTTPException(404, f"알 수 없는 메트릭: {mt}")
if body.warning is not None:
PREDICTION_THRESHOLDS[mt]["warning"] = body.warning
if body.critical is not None:
PREDICTION_THRESHOLDS[mt]["critical"] = body.critical
if body.horizon_hours is not None:
PREDICTION_THRESHOLDS[mt]["horizon_hours"] = body.horizon_hours
return {
"metric_type": mt,
"updated": PREDICTION_THRESHOLDS[mt],
}
# ── 통계 ─────────────────────────────────────────────────────────────────────
@router.get("/stats")
async def get_predictive_stats(
hours: int = Query(24, ge=1, le=720),
db: AsyncSession = Depends(get_db),
):
"""예측 유지보수 통계 (데이터 소스 수, 메트릭 포인트 수 등)."""
try:
from models import MetricSnapshot, SRRequest
since = datetime.utcnow() - timedelta(hours=hours)
# 메트릭 소스 수
sources_count = (await db.execute(
select(func.count(func.distinct(MetricSnapshot.source)))
.where(MetricSnapshot.measured_at >= since)
)).scalar() or 0
# 메트릭 포인트 수
points_count = (await db.execute(
select(func.count())
.select_from(MetricSnapshot)
.where(MetricSnapshot.measured_at >= since)
)).scalar() or 0
# 예방 SR 수
pm_sr_count = (await db.execute(
select(func.count())
.select_from(SRRequest)
.where(
SRRequest.title.like("%[예방]%"),
SRRequest.created_at >= since,
)
)).scalar() or 0
except Exception as e:
logger.debug("예측 통계 조회 오류: %s", e)
sources_count = 0
points_count = 0
pm_sr_count = 0
return {
"period_hours": hours,
"monitored_sources": sources_count,
"metric_data_points": points_count,
"preventive_srs_created": pm_sr_count,
"supported_metrics": list(PREDICTION_THRESHOLDS.keys()),
"supported_equipment_types": list(EQUIPMENT_LIFESPAN.keys()),
}
# ── 시뮬레이션 / 테스트 ────────────────────────────────────────────────────────
@router.post("/simulate")
async def simulate_prediction(
metric_type: str = Query("CPU_USAGE"),
baseline: float = Query(60.0, description="기준선 값"),
slope_per_hour: float = Query(0.5, description="시간당 증가율"),
data_points: int = Query(48, ge=5, le=200),
horizon_hours: int = Query(24, ge=1, le=168),
):
"""
예측 로직 시뮬레이션 (실제 DB 없이 테스트용).
지정된 기준선과 기울기로 가상 데이터를 생성하고 예측 결과 반환.
"""
import random
random.seed(42)
# 가상 시계열 생성
x_vals = list(range(data_points))
y_vals = [
baseline + slope_per_hour * x + random.gauss(0, 2)
for x in x_vals
]
slope, intercept, r_sq = linear_regression(
[float(x) for x in x_vals],
y_vals,
)
predicted = slope * (data_points - 1 + horizon_hours) + intercept
cfg = PREDICTION_THRESHOLDS.get(metric_type.upper(), {})
warn_th = cfg.get("warning")
crit_th = cfg.get("critical")
from core.predictive import time_to_reach
current_x = float(data_points - 1)
ttr_warn = time_to_reach(slope, intercept, current_x, warn_th) if warn_th else None
ttr_crit = time_to_reach(slope, intercept, current_x, crit_th) if crit_th else None
ma = moving_average(y_vals, 5)
return {
"simulation": True,
"metric_type": metric_type.upper(),
"baseline": baseline,
"input_slope": slope_per_hour,
"detected_slope": round(slope, 4),
"r_squared": round(r_sq, 4),
"current_value": round(y_vals[-1], 2),
"predicted_value": round(predicted, 2),
"horizon_hours": horizon_hours,
"ttr_warning_hours": round(ttr_warn, 1) if ttr_warn else None,
"ttr_critical_hours": round(ttr_crit, 1) if ttr_crit else None,
"moving_avg_last5": [round(v, 2) for v in ma[-5:]],
"data_points": data_points,
}