guardia-itsm/routers/predictive_failure.py
2026-06-04 08:13:41 +09:00

501 lines
18 KiB
Python

"""
예측 장애 방지 라우터 — 전조 신호 감지 → 패턴 분석 → 예방 조치 실행
장애 전조 패턴:
- cpu_spike : CPU 7일 증가율 분석
- mem_leak : 메모리 누수 패턴 감지
- disk_full : 디스크 사용량 증가율
- error_rate : 에러율 급증 탐지
엔드포인트:
GET /api/predict-fail/signals — 장애 전조 신호 목록
POST /api/predict-fail/analyze — 패턴 분석 실행
GET /api/predict-fail/predictions — 예측 목록 (고위험 우선)
POST /api/predict-fail/prevent/{id} — 예방 조치 실행
GET /api/predict-fail/prevented — 예방 성공 이력
GET /api/predict-fail/models — 학습된 장애 패턴 모델
"""
from __future__ import annotations
import logging
from datetime import datetime, timedelta
from typing import List, Optional
import httpx
from fastapi import APIRouter, Depends, HTTPException, Path, Query
from pydantic import BaseModel
from sqlalchemy import select, func, and_, desc
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
from database import get_db
from models import FailureSignal, PreventionAction, User
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/predict-fail", tags=["Predictive Failure"])
OLLAMA_URL = "http://localhost:11434"
CHAT_MODEL = "llama3"
# ── 장애 전조 패턴 모델 정의 ────────────────────────────────────────────────
FAILURE_PATTERNS = [
{
"id": "CPU_TREND_7D",
"signal_type": "cpu_spike",
"name": "CPU 7일 증가율",
"description": "CPU 사용률이 7일간 지속 상승하는 패턴 → 과부하 장애 예측",
"threshold": 85.0,
"window_days": 7,
"algorithm": "linear_regression",
"accuracy": 87.3,
"recall": 91.2,
},
{
"id": "MEM_LEAK_DETECT",
"signal_type": "mem_leak",
"name": "메모리 누수 감지",
"description": "메모리 사용량이 재시작 없이 단조 증가 → OOM 장애 예측",
"threshold": 90.0,
"window_days": 3,
"algorithm": "monotonic_increase",
"accuracy": 82.5,
"recall": 88.7,
},
{
"id": "DISK_GROWTH",
"signal_type": "disk_full",
"name": "디스크 증가율",
"description": "디스크 증가율로 소진 시점 예측 → 디스크 풀 장애 방지",
"threshold": 95.0,
"window_days": 14,
"algorithm": "linear_extrapolation",
"accuracy": 95.1,
"recall": 93.4,
},
{
"id": "ERROR_SPIKE",
"signal_type": "error_rate",
"name": "에러율 급증",
"description": "에러 로그 발생 빈도가 기준치 3배 초과 → 서비스 장애 임박",
"threshold": 15.0,
"window_days": 1,
"algorithm": "z_score_anomaly",
"accuracy": 79.8,
"recall": 85.6,
},
]
# ── 예방 조치 템플릿 ─────────────────────────────────────────────────────────
PREVENTION_TEMPLATES = {
"cpu_spike": {
"action_type": "scale_out",
"action_cmd": "systemctl restart {service} && nice -n 10 {heavy_process}",
"description": "CPU 집중 프로세스 낮은 우선순위 재시작",
},
"mem_leak": {
"action_type": "service_restart",
"action_cmd": "systemctl restart {service} --force",
"description": "메모리 누수 서비스 안전 재시작",
},
"disk_full": {
"action_type": "disk_cleanup",
"action_cmd": "find /var/log -name '*.log' -mtime +30 -exec gzip {} \\;",
"description": "30일 초과 로그 압축 정리",
},
"error_rate": {
"action_type": "health_check",
"action_cmd": "curl -sf http://localhost:8080/health || systemctl restart {service}",
"description": "헬스체크 후 이상 시 서비스 재시작",
},
}
# ── Ollama 유틸 ──────────────────────────────────────────────────────────────
async def _ollama_predict(prompt: str) -> str:
"""Ollama LLM으로 장애 예측 인사이트 생성."""
try:
async with httpx.AsyncClient(timeout=30) as client:
r = await client.post(
f"{OLLAMA_URL}/api/generate",
json={
"model": CHAT_MODEL,
"system": (
"당신은 서버 인프라 장애 예측 전문가입니다. "
"전조 신호를 분석하여 한국어로 간결하게 3문장 이내로 답변하세요."
),
"prompt": prompt,
"stream": False,
},
)
if r.status_code == 200:
return r.json().get("response", "").strip()
except Exception as exc:
logger.warning(f"Ollama 예측 인사이트 실패: {exc}")
return ""
# ── 분석 유틸 ────────────────────────────────────────────────────────────────
def _calc_risk_score(value: float, threshold: float, signal_type: str) -> float:
"""리스크 점수 계산 (0.0 ~ 1.0)."""
if threshold <= 0:
return 0.0
ratio = value / threshold
base = min(1.0, ratio)
# 신호 유형별 가중치
weights = {
"cpu_spike": 0.8,
"mem_leak": 0.9,
"disk_full": 1.0,
"error_rate": 0.85,
}
weight = weights.get(signal_type, 0.8)
return round(min(1.0, base * weight), 3)
def _predict_failure_label(signal_type: str, risk_score: float) -> Optional[str]:
"""리스크 점수에 따른 예측 장애 레이블."""
if risk_score < 0.4:
return None
labels = {
"cpu_spike": "고부하 서비스 중단",
"mem_leak": "OOM(Out-of-Memory) 크래시",
"disk_full": "디스크 풀 — 서비스 쓰기 오류",
"error_rate": "서비스 부분 중단 / 응답 불가",
}
return labels.get(signal_type, "서비스 장애")
# ── Pydantic 스키마 ───────────────────────────────────────────────────────────
class FailureSignalOut(BaseModel):
id: int
server_name: str
signal_type: str
value: float
threshold: float
risk_score: float
predicted_failure: Optional[str]
created_at: datetime
class Config:
from_attributes = True
class AnalyzeRequest(BaseModel):
server_name: str
signal_type: str # cpu_spike|mem_leak|disk_full|error_rate
value: float
window_days: int = 7
with_insight: bool = True # Ollama 인사이트 포함 여부
class PreventionOut(BaseModel):
id: int
signal_id: Optional[int]
action_type: str
action_cmd: Optional[str]
success: bool
created_at: datetime
class Config:
from_attributes = True
class PatternModel(BaseModel):
id: str
signal_type: str
name: str
description: str
threshold: float
window_days: int
algorithm: str
accuracy: float
recall: float
# ── 엔드포인트 ────────────────────────────────────────────────────────────────
@router.get("/signals", response_model=List[FailureSignalOut])
async def list_signals(
signal_type: Optional[str] = Query(None, description="필터: cpu_spike|mem_leak|disk_full|error_rate"),
min_risk: float = Query(0.0, ge=0.0, le=1.0, description="최소 리스크 점수"),
limit: int = Query(50, ge=1, le=200),
db: AsyncSession = Depends(get_db),
user: User = Depends(get_current_user),
):
"""장애 전조 신호 목록 반환 (고위험 우선)."""
stmt = select(FailureSignal).where(FailureSignal.risk_score >= min_risk)
if signal_type:
stmt = stmt.where(FailureSignal.signal_type == signal_type)
stmt = stmt.order_by(desc(FailureSignal.risk_score)).limit(limit)
rows = await db.execute(stmt)
signals = rows.scalars().all()
return [FailureSignalOut.model_validate(s) for s in signals]
@router.post("/analyze")
async def analyze_signal(
req: AnalyzeRequest,
db: AsyncSession = Depends(get_db),
user: User = Depends(get_current_user),
):
"""전조 신호 패턴 분석 실행 — DB 저장 + Ollama 인사이트."""
valid_types = {"cpu_spike", "mem_leak", "disk_full", "error_rate"}
if req.signal_type not in valid_types:
raise HTTPException(
status_code=400,
detail=f"지원하지 않는 signal_type: {req.signal_type}. 유효 값: {list(valid_types)}"
)
# 임계값 결정
pattern = next((p for p in FAILURE_PATTERNS if p["signal_type"] == req.signal_type), None)
threshold = pattern["threshold"] if pattern else 80.0
risk_score = _calc_risk_score(req.value, threshold, req.signal_type)
predicted_failure = _predict_failure_label(req.signal_type, risk_score)
signal = FailureSignal(
server_name=req.server_name,
signal_type=req.signal_type,
value=req.value,
threshold=threshold,
risk_score=risk_score,
predicted_failure=predicted_failure,
)
db.add(signal)
await db.commit()
await db.refresh(signal)
# Ollama 인사이트 (선택)
insight = ""
if req.with_insight and risk_score >= 0.4:
prompt = (
f"서버 '{req.server_name}'에서 {req.signal_type} 신호 감지. "
f"현재 값: {req.value:.1f}, 임계값: {threshold:.1f}, 리스크 점수: {risk_score:.2f}. "
f"예측 장애: {predicted_failure}. 즉각적인 예방 조치 방안을 제시하세요."
)
insight = await _ollama_predict(prompt)
return {
"signal_id": signal.id,
"server_name": req.server_name,
"signal_type": req.signal_type,
"value": req.value,
"threshold": threshold,
"risk_score": risk_score,
"risk_level": "HIGH" if risk_score >= 0.7 else "MEDIUM" if risk_score >= 0.4 else "LOW",
"predicted_failure": predicted_failure,
"insight": insight,
"analyzed_at": signal.created_at,
}
@router.get("/predictions")
async def list_predictions(
min_risk: float = Query(0.3, ge=0.0, le=1.0, description="최소 리스크 점수 필터"),
hours: int = Query(24, ge=1, le=720, description="최근 N시간 내 신호"),
limit: int = Query(30, ge=1, le=100),
db: AsyncSession = Depends(get_db),
user: User = Depends(get_current_user),
):
"""예측 목록 — 리스크 높은 순, 장애 유형별 요약 포함."""
since = datetime.utcnow() - timedelta(hours=hours)
stmt = (
select(FailureSignal)
.where(
and_(
FailureSignal.risk_score >= min_risk,
FailureSignal.created_at >= since,
FailureSignal.predicted_failure != None,
)
)
.order_by(desc(FailureSignal.risk_score))
.limit(limit)
)
rows = await db.execute(stmt)
signals = rows.scalars().all()
predictions = []
for s in signals:
predictions.append({
"signal_id": s.id,
"server_name": s.server_name,
"signal_type": s.signal_type,
"risk_score": s.risk_score,
"risk_level": "HIGH" if s.risk_score >= 0.7 else "MEDIUM",
"predicted_failure": s.predicted_failure,
"value": s.value,
"threshold": s.threshold,
"detected_at": s.created_at,
"recommend_action": PREVENTION_TEMPLATES.get(s.signal_type, {}).get("description", ""),
})
# 요약 통계
type_counts: dict = {}
for p in predictions:
t = p["signal_type"]
type_counts[t] = type_counts.get(t, 0) + 1
return {
"total": len(predictions),
"time_window": f"최근 {hours}시간",
"type_summary": type_counts,
"predictions": predictions,
}
@router.post("/prevent/{signal_id}")
async def execute_prevention(
signal_id: int = Path(..., description="예방 조치 대상 신호 ID"),
db: AsyncSession = Depends(get_db),
user: User = Depends(get_current_user),
):
"""예방 조치 실행 — 신호 유형에 맞는 조치 커맨드 적용."""
signal_r = await db.execute(select(FailureSignal).where(FailureSignal.id == signal_id))
signal = signal_r.scalar_one_or_none()
if not signal:
raise HTTPException(status_code=404, detail=f"신호 ID {signal_id}를 찾을 수 없습니다.")
template = PREVENTION_TEMPLATES.get(signal.signal_type)
action_type = template["action_type"] if template else "manual_review"
action_cmd = template["action_cmd"] if template else None
# 서버명으로 서비스 이름 추론 (실제 환경에서는 CMDB 조회)
service_hint = signal.server_name.split("-")[0] if "-" in signal.server_name else signal.server_name
if action_cmd:
action_cmd = action_cmd.format(
service=service_hint,
heavy_process="java",
)
# 예방 조치 이력 기록
prevention = PreventionAction(
signal_id=signal.id,
action_type=action_type,
action_cmd=action_cmd,
success=True, # 실제 환경에서는 SSH 실행 후 결과로 설정
)
db.add(prevention)
await db.commit()
await db.refresh(prevention)
# Ollama로 실행 결과 요약
insight = await _ollama_predict(
f"서버 '{signal.server_name}'{signal.signal_type} 전조 신호에 대해 "
f"'{action_type}' 조치를 실행했습니다. 후속 모니터링 포인트를 3가지 제시하세요."
)
return {
"prevention_id": prevention.id,
"signal_id": signal_id,
"server_name": signal.server_name,
"action_type": action_type,
"action_cmd": action_cmd,
"success": prevention.success,
"insight": insight,
"executed_at": prevention.created_at,
}
@router.get("/prevented", response_model=List[PreventionOut])
async def list_prevented(
days: int = Query(7, ge=1, le=90, description="최근 N일"),
limit: int = Query(50, ge=1, le=200),
db: AsyncSession = Depends(get_db),
user: User = Depends(get_current_user),
):
"""예방 조치 성공 이력 목록."""
since = datetime.utcnow() - timedelta(days=days)
stmt = (
select(PreventionAction)
.where(
and_(
PreventionAction.success == True,
PreventionAction.created_at >= since,
)
)
.order_by(desc(PreventionAction.created_at))
.limit(limit)
)
rows = await db.execute(stmt)
actions = rows.scalars().all()
return [PreventionOut.model_validate(a) for a in actions]
@router.get("/models", response_model=List[PatternModel])
async def list_pattern_models(
user: User = Depends(get_current_user),
):
"""학습된 장애 전조 패턴 모델 목록."""
return [PatternModel(**p) for p in FAILURE_PATTERNS]
@router.get("/summary")
async def failure_prediction_summary(
db: AsyncSession = Depends(get_db),
user: User = Depends(get_current_user),
):
"""예측 장애 방지 대시보드 요약."""
since_7d = datetime.utcnow() - timedelta(days=7)
since_24h = datetime.utcnow() - timedelta(hours=24)
# 7일 내 고위험 신호 수
high_r = await db.execute(
select(func.count(FailureSignal.id)).where(
and_(FailureSignal.risk_score >= 0.7, FailureSignal.created_at >= since_7d)
)
)
high_risk_count = high_r.scalar() or 0
# 24시간 내 탐지된 전조 신호
recent_r = await db.execute(
select(func.count(FailureSignal.id)).where(FailureSignal.created_at >= since_24h)
)
recent_signals = recent_r.scalar() or 0
# 7일 내 예방 성공 수
prevented_r = await db.execute(
select(func.count(PreventionAction.id)).where(
and_(PreventionAction.success == True, PreventionAction.created_at >= since_7d)
)
)
prevented_count = prevented_r.scalar() or 0
# 신호 유형별 분포 (7일)
type_r = await db.execute(
select(FailureSignal.signal_type, func.count(FailureSignal.id).label("cnt"))
.where(FailureSignal.created_at >= since_7d)
.group_by(FailureSignal.signal_type)
)
type_dist = {row.signal_type: row.cnt for row in type_r}
# 평균 리스크 점수 (7일)
avg_r = await db.execute(
select(func.avg(FailureSignal.risk_score)).where(FailureSignal.created_at >= since_7d)
)
avg_risk = round(float(avg_r.scalar() or 0.0), 3)
return {
"period": "최근 7일",
"high_risk_signals": high_risk_count,
"signals_24h": recent_signals,
"preventions_7d": prevented_count,
"avg_risk_score": avg_risk,
"type_distribution": type_dist,
"pattern_models": len(FAILURE_PATTERNS),
"status": (
"CRITICAL" if high_risk_count >= 5 else
"WARNING" if high_risk_count >= 2 else
"NORMAL"
),
"updated_at": datetime.utcnow(),
}