guardia-itsm/routers/cost_optimizer_ai.py

691 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
자율 비용 최적화 (AutonomousCostOps)
기능:
1. 비용 AI 분석 현황 조회
2. Ollama sLLM 기반 비용 분석 실행 → CostRecommendation 자동 생성
3. 비용 예측 (30/60/90일) — 선형 회귀 기반 + AI 보정
4. 최적화 권고 목록 조회
5. 권고 자동 적용 (승인 후) / 반려
6. 낭비 리소스 감지 (CPU < 10%, 메모리 < 20%, 30일 이상 SR 없는 서버)
7. 절감 실적 리포트
엔드포인트:
GET /api/cost-ai/analysis
POST /api/cost-ai/analyze
GET /api/cost-ai/forecast/{days}
GET /api/cost-ai/recommendations
POST /api/cost-ai/recommendations/{id}/apply
POST /api/cost-ai/recommendations/{id}/reject
GET /api/cost-ai/waste
GET /api/cost-ai/savings-report
"""
from __future__ import annotations
import json
import logging
import math
from datetime import datetime, timedelta
from typing import List, Optional
import httpx
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import select, func, text
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user, require_admin_role
from database import get_db
from models import (
CostAIAnalysis,
CostForecast,
CostRecommendation,
MetricSnapshot,
SRRequest,
Server,
User,
UserRole,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/cost-ai", tags=["cost-ai"])
# ── 상수 ──────────────────────────────────────────────────────────────────────
_OLLAMA_URL = "http://localhost:11434/api/generate"
_OLLAMA_MODEL = "llama3"
# 낭비 기준
_WASTE_CPU_THRESHOLD = 10.0 # CPU 7일 평균 (%)
_WASTE_MEM_THRESHOLD = 20.0 # 메모리 사용률 (%)
_WASTE_SR_DAYS = 30 # SR 미발생 일수
# 절감 단가 (만원/월) — 유형별 기본 추산
_SAVING_UNIT = {
"server": 50, # 서버 1대 유휴 절감 추산
"license": 20, # 라이선스 1건 해지
"cloud": 30, # 클라우드 리소스 최적화
}
# ── Pydantic 스키마 ───────────────────────────────────────────────────────────
class RecommendationOut(BaseModel):
id: int
category: str
title: str
description: Optional[str] = None
estimated_saving: float
risk_level: str
auto_applicable: bool
status: str
created_at: datetime
model_config = {"from_attributes": True}
class AnalysisOut(BaseModel):
id: int
period: str
total_cost: float
ai_insights: Optional[str] = None
waste_detected: Optional[str] = None
created_at: datetime
model_config = {"from_attributes": True}
class ForecastOut(BaseModel):
id: int
forecast_date: datetime
predicted_cost: float
confidence: float
factors: Optional[str] = None
created_at: datetime
model_config = {"from_attributes": True}
# ── Ollama 호출 헬퍼 ──────────────────────────────────────────────────────────
async def _call_ollama(prompt: str, timeout: float = 30.0) -> Optional[str]:
"""Ollama sLLM 호출. 실패 시 None 반환."""
try:
async with httpx.AsyncClient(timeout=timeout) as client:
resp = await client.post(
_OLLAMA_URL,
json={"model": _OLLAMA_MODEL, "prompt": prompt, "stream": False},
)
if resp.status_code == 200:
return resp.json().get("response", "").strip()
except Exception as exc:
logger.warning("Ollama 호출 실패: %s", exc)
return None
# ── AI 비용 분석 핵심 로직 ────────────────────────────────────────────────────
async def _collect_cost_snapshot(db: AsyncSession) -> dict:
"""FinOps 비용 기반 현황 요약을 수집한다."""
now = datetime.utcnow()
period = f"{now.year}-{now.month:02d}"
# 서버 수
server_count = (await db.execute(select(func.count(Server.id)))).scalar() or 0
# 최근 MetricSnapshot 집계 (CPU, 메모리)
# 7일치 스냅샷을 가져와 평균 계산
seven_days_ago = now - timedelta(days=7)
snapshots = (
await db.execute(
select(MetricSnapshot).where(MetricSnapshot.ts >= seven_days_ago)
)
).scalars().all()
avg_cpu = 0.0
avg_mem = 0.0
if snapshots:
avg_cpu = sum(s.cpu_pct for s in snapshots if s.cpu_pct is not None) / len(snapshots)
avg_mem = sum(s.mem_pct for s in snapshots if s.mem_pct is not None) / len(snapshots)
# 서버당 월 운영비 추산 (단순 계산: 서버 수 × 50만원)
estimated_monthly = server_count * 50.0 # 만원
return {
"period": period,
"server_count": server_count,
"avg_cpu_pct": round(avg_cpu, 1),
"avg_mem_pct": round(avg_mem, 1),
"estimated_monthly": estimated_monthly,
"snapshot_count": len(snapshots),
}
async def _detect_waste(db: AsyncSession) -> List[dict]:
"""낭비 리소스 감지 — 3가지 기준."""
now = datetime.utcnow()
seven_days_ago = now - timedelta(days=7)
thirty_days_ago = now - timedelta(days=_WASTE_SR_DAYS)
waste_items = []
# 모든 서버 조회
servers = (await db.execute(select(Server))).scalars().all()
for srv in servers:
reasons = []
# 1. CPU 7일 평균 < 10%
cpu_snaps = (
await db.execute(
select(MetricSnapshot).where(
MetricSnapshot.server_id == srv.id,
MetricSnapshot.ts >= seven_days_ago,
)
)
).scalars().all()
if cpu_snaps:
avg_cpu = sum(s.cpu_pct for s in cpu_snaps if s.cpu_pct is not None) / len(cpu_snaps)
if avg_cpu < _WASTE_CPU_THRESHOLD:
reasons.append(f"CPU 7일 평균 {avg_cpu:.1f}% (기준 {_WASTE_CPU_THRESHOLD}% 미만)")
# 2. 메모리 사용률 < 20%
avg_mem = sum(s.mem_pct for s in cpu_snaps if s.mem_pct is not None) / len(cpu_snaps)
if avg_mem < _WASTE_MEM_THRESHOLD:
reasons.append(f"메모리 사용률 {avg_mem:.1f}% (기준 {_WASTE_MEM_THRESHOLD}% 미만)")
# 3. 30일 이상 SR 없는 서버
sr_count = (
await db.execute(
select(func.count(SRRequest.id)).where(
SRRequest.server_id == srv.id,
SRRequest.created_at >= thirty_days_ago,
)
)
).scalar() or 0
if sr_count == 0:
reasons.append(f"{_WASTE_SR_DAYS}일 이상 SR 발생 없음")
if reasons:
waste_items.append({
"server_id": srv.id,
"server_name": srv.server_name,
"server_role": srv.server_role,
"reasons": reasons,
"waste_score": len(reasons), # 많을수록 낭비 심각
"est_monthly_saving": _SAVING_UNIT["server"],
})
waste_items.sort(key=lambda x: x["waste_score"], reverse=True)
return waste_items
async def _build_recommendations_from_ai(
ai_text: str, db: AsyncSession
) -> List[CostRecommendation]:
"""Ollama 응답 텍스트를 파싱하여 CostRecommendation 레코드 생성."""
recs = []
# 번호 목록 패턴 파싱: "1. ...", "2. ..." 등
lines = [l.strip() for l in ai_text.split("\n") if l.strip()]
current_title = ""
current_desc_parts: List[str] = []
idx = 0
for line in lines:
# "숫자. " 로 시작하는 행 = 새 권고 항목
if len(line) > 2 and line[0].isdigit() and line[1] in (".", ")"):
# 이전 항목 저장
if current_title:
rec = CostRecommendation(
category="cloud",
title=current_title[:300],
description="\n".join(current_desc_parts) or None,
estimated_saving=float(_SAVING_UNIT["cloud"]),
risk_level="LOW",
auto_applicable=False,
status="pending",
)
recs.append(rec)
idx += 1
current_title = line[2:].strip()
current_desc_parts = []
else:
current_desc_parts.append(line)
# 마지막 항목 저장
if current_title:
rec = CostRecommendation(
category="cloud",
title=current_title[:300],
description="\n".join(current_desc_parts) or None,
estimated_saving=float(_SAVING_UNIT["cloud"]),
risk_level="LOW",
auto_applicable=False,
status="pending",
)
recs.append(rec)
# 최대 5개 제한
return recs[:5]
# ── 엔드포인트 ────────────────────────────────────────────────────────────────
@router.get("/analysis")
async def get_analysis_status(
limit: int = Query(10, ge=1, le=50),
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""비용 AI 분석 현황 조회 — 최근 분석 이력을 반환한다."""
rows = (
await db.execute(
select(CostAIAnalysis)
.order_by(CostAIAnalysis.created_at.desc())
.limit(limit)
)
).scalars().all()
pending_recs = (
await db.execute(
select(func.count(CostRecommendation.id)).where(
CostRecommendation.status == "pending"
)
)
).scalar() or 0
applied_recs = (
await db.execute(
select(func.count(CostRecommendation.id)).where(
CostRecommendation.status == "applied"
)
)
).scalar() or 0
total_saved = (
await db.execute(
select(func.coalesce(func.sum(CostRecommendation.estimated_saving), 0.0)).where(
CostRecommendation.status == "applied"
)
)
).scalar() or 0.0
return {
"analysis_count": len(rows),
"pending_recs": pending_recs,
"applied_recs": applied_recs,
"total_saved_manwon": round(total_saved, 1),
"latest_analysis": [AnalysisOut.model_validate(r) for r in rows],
}
@router.post("/analyze", status_code=201)
async def run_analysis(
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""AI 비용 분석 실행 — Ollama 기반 절감 권고를 자동 생성한다.
분석 흐름:
1. 비용 현황 스냅샷 수집
2. 낭비 리소스 감지
3. Ollama sLLM에 분석 요청
4. 응답 파싱 → CostRecommendation 자동 생성
5. CostAIAnalysis 기록 저장
"""
snapshot = await _collect_cost_snapshot(db)
waste_items = await _detect_waste(db)
# Ollama 프롬프트 조합
waste_summary = (
f"\n낭비 감지 서버 {len(waste_items)}대:\n" +
"\n".join(
f" - {w['server_name']}: {', '.join(w['reasons'])}"
for w in waste_items[:5]
)
if waste_items else "\n낭비 감지 서버 없음"
)
prompt = (
"다음 IT 인프라 비용 현황을 분석하여 절감 기회 3가지를 한국어로 제안해줘:\n\n"
f"분석 기간: {snapshot['period']}\n"
f"서버 수: {snapshot['server_count']}\n"
f"7일 평균 CPU: {snapshot['avg_cpu_pct']}%\n"
f"7일 평균 메모리: {snapshot['avg_mem_pct']}%\n"
f"월 추산 운영비: {snapshot['estimated_monthly']:.0f}만원"
f"{waste_summary}\n\n"
"각 항목은 '번호. 제목' 형식으로 시작하고 2~3줄 설명을 덧붙여줘."
)
ai_text = await _call_ollama(prompt, timeout=30.0)
# 폴백: 규칙 기반 인사이트
if not ai_text:
ai_text = (
"1. 유휴 서버 통합 가상화\n"
" CPU/메모리 사용률이 낮은 서버를 가상화하여 물리 서버 수를 줄이세요.\n"
"2. 미사용 라이선스 정기 감사\n"
" 분기마다 소프트웨어 라이선스 사용 현황을 점검하고 불필요한 계약을 해지하세요.\n"
"3. 네트워크 대역폭 최적화\n"
" 실제 사용량 대비 과잉 할당된 회선을 축소하여 통신비를 절감하세요."
)
# CostRecommendation 자동 생성 (낭비 서버 권고 포함)
new_recs: List[CostRecommendation] = []
# 낭비 서버 권고
for w in waste_items[:3]:
rec = CostRecommendation(
category="server",
title=f"[유휴 서버 절감] {w['server_name']}{w['reasons'][0]}",
description="서버 통합·가상화 또는 하드웨어 반납을 검토하세요.",
estimated_saving=float(w["est_monthly_saving"]),
risk_level="LOW" if w["waste_score"] == 1 else "MEDIUM",
auto_applicable=False,
status="pending",
)
new_recs.append(rec)
# AI 텍스트 파싱 권고
ai_recs = await _build_recommendations_from_ai(ai_text, db)
new_recs.extend(ai_recs)
for rec in new_recs:
db.add(rec)
# 분석 결과 저장
analysis = CostAIAnalysis(
period=snapshot["period"],
total_cost=snapshot["estimated_monthly"],
breakdown=json.dumps(snapshot, ensure_ascii=False),
ai_insights=ai_text,
waste_detected=json.dumps(waste_items[:10], ensure_ascii=False),
)
db.add(analysis)
await db.commit()
await db.refresh(analysis)
logger.info("비용 AI 분석 완료: period=%s recs=%d", snapshot["period"], len(new_recs))
return {
"analysis_id": analysis.id,
"period": analysis.period,
"total_cost_manwon": analysis.total_cost,
"waste_count": len(waste_items),
"recommendations_created": len(new_recs),
"ai_insights": ai_text,
"ollama_used": ai_text != "" and "유휴 서버 통합" not in ai_text,
}
@router.get("/forecast/{days}")
async def get_forecast(
days: int,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""비용 예측 — 30/60/90일 선형 추세 기반.
과거 분석 이력에서 monthly_cost 시계열을 추출하여
단순 선형 회귀로 미래 비용을 예측한다.
"""
if days not in (30, 60, 90):
raise HTTPException(400, "days는 30, 60, 90 중 하나여야 합니다.")
# 과거 분석 이력 수집
rows = (
await db.execute(
select(CostAIAnalysis)
.order_by(CostAIAnalysis.created_at.asc())
.limit(12)
)
).scalars().all()
now = datetime.utcnow()
# 데이터 부족 시 기본 추산
if len(rows) < 2:
base_cost = rows[0].total_cost if rows else 500.0 # 만원
trend_rate = 0.02 # 월 2% 성장 가정
else:
costs = [r.total_cost for r in rows]
n = len(costs)
x_mean = (n - 1) / 2.0
y_mean = sum(costs) / n
numerator = sum((i - x_mean) * (costs[i] - y_mean) for i in range(n))
denominator = sum((i - x_mean) ** 2 for i in range(n))
slope = numerator / denominator if denominator > 0 else 0.0
base_cost = costs[-1]
# 월 환산 추세율
trend_rate = slope / base_cost if base_cost > 0 else 0.02
# 예측 포인트 생성 (월 단위)
months_ahead = days // 30
forecasts_saved = []
for m in range(1, months_ahead + 1):
target_date = now + timedelta(days=m * 30)
predicted = base_cost * ((1 + trend_rate) ** m)
# 신뢰도: 데이터 적을수록, 예측 기간 길수록 낮아짐
confidence = max(0.3, min(0.95, 0.95 - 0.1 * m - (0.05 if len(rows) < 4 else 0)))
factors_obj = {
"trend_rate_pct": round(trend_rate * 100, 2),
"base_cost": round(base_cost, 1),
"month_offset": m,
"history_points": len(rows),
}
fc = CostForecast(
forecast_date=target_date,
predicted_cost=round(predicted, 1),
confidence=round(confidence, 2),
factors=json.dumps(factors_obj, ensure_ascii=False),
)
db.add(fc)
forecasts_saved.append(fc)
await db.commit()
for fc in forecasts_saved:
await db.refresh(fc)
total_predicted = sum(fc.predicted_cost for fc in forecasts_saved)
delta_pct = round((total_predicted / (base_cost * months_ahead) - 1) * 100, 1) if base_cost > 0 else 0.0
return {
"days": days,
"base_period_cost": round(base_cost, 1),
"trend_rate_pct": round(trend_rate * 100, 2),
"history_points": len(rows),
"total_predicted": round(total_predicted, 1),
"delta_vs_flat_pct": delta_pct,
"forecasts": [ForecastOut.model_validate(fc) for fc in forecasts_saved],
"disclaimer": "예측은 과거 추세 기반 참고값입니다. 실제와 다를 수 있습니다.",
}
@router.get("/recommendations")
async def list_recommendations(
status: Optional[str] = Query(None, description="pending|applied|rejected"),
category: Optional[str] = Query(None, description="server|license|cloud"),
limit: int = Query(20, ge=1, le=100),
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""최적화 권고 목록 조회."""
q = select(CostRecommendation).order_by(
CostRecommendation.estimated_saving.desc(),
CostRecommendation.created_at.desc(),
)
if status:
q = q.where(CostRecommendation.status == status)
if category:
q = q.where(CostRecommendation.category == category)
q = q.limit(limit)
rows = (await db.execute(q)).scalars().all()
total_saving = sum(r.estimated_saving for r in rows)
return {
"total": len(rows),
"total_saving_manwon": round(total_saving, 1),
"recommendations": [RecommendationOut.model_validate(r) for r in rows],
}
@router.post("/recommendations/{rec_id}/apply")
async def apply_recommendation(
rec_id: int,
current_user: User = Depends(require_admin_role),
db: AsyncSession = Depends(get_db),
):
"""권고 자동 적용 — ADMIN 승인 후 상태를 applied로 전환한다.
실제 자동화 액션(서버 셧다운 등)은 별도 SSH 실행 레이어가 담당한다.
여기서는 상태 전환 + 감사 기록만 처리한다.
"""
rec = (
await db.execute(select(CostRecommendation).where(CostRecommendation.id == rec_id))
).scalar_one_or_none()
if not rec:
raise HTTPException(404, f"권고 ID {rec_id} 를 찾을 수 없습니다.")
if rec.status != "pending":
raise HTTPException(400, f"이미 처리된 권고입니다 (현재 상태: {rec.status}).")
if not rec.auto_applicable:
raise HTTPException(
400,
"이 권고는 자동 적용이 불가합니다. 수동으로 조치 후 상태를 업데이트하세요.",
)
rec.status = "applied"
await db.commit()
await db.refresh(rec)
logger.info("비용 권고 적용: id=%d title=%s by=%s", rec.id, rec.title, current_user.username)
return {
"message": "권고가 적용되었습니다.",
"recommendation": RecommendationOut.model_validate(rec),
"applied_by": current_user.username,
"applied_at": datetime.utcnow().isoformat(),
}
@router.post("/recommendations/{rec_id}/reject")
async def reject_recommendation(
rec_id: int,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""권고 반려 — 불필요한 권고를 rejected 상태로 전환한다."""
rec = (
await db.execute(select(CostRecommendation).where(CostRecommendation.id == rec_id))
).scalar_one_or_none()
if not rec:
raise HTTPException(404, f"권고 ID {rec_id} 를 찾을 수 없습니다.")
if rec.status != "pending":
raise HTTPException(400, f"이미 처리된 권고입니다 (현재 상태: {rec.status}).")
rec.status = "rejected"
await db.commit()
await db.refresh(rec)
logger.info("비용 권고 반려: id=%d by=%s", rec.id, current_user.username)
return {
"message": "권고가 반려되었습니다.",
"recommendation": RecommendationOut.model_validate(rec),
}
@router.get("/waste")
async def detect_waste_resources(
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""낭비 리소스 감지.
기준:
- 서버 CPU 7일 평균 < 10%
- 메모리 사용률 < 20%
- 30일 이상 SR 없는 서버
"""
waste_items = await _detect_waste(db)
total_saving = sum(w["est_monthly_saving"] for w in waste_items)
return {
"waste_count": len(waste_items),
"total_saving_manwon": total_saving,
"cpu_threshold_pct": _WASTE_CPU_THRESHOLD,
"mem_threshold_pct": _WASTE_MEM_THRESHOLD,
"sr_inactive_days": _WASTE_SR_DAYS,
"waste_resources": waste_items,
"detection_at": datetime.utcnow().isoformat(),
}
@router.get("/savings-report")
async def savings_report(
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""절감 실적 리포트 — 적용된 권고 기반 누적 절감 효과를 리포트한다."""
# 상태별 집계
status_counts: dict = {}
for status_val in ("pending", "applied", "rejected"):
cnt = (
await db.execute(
select(func.count(CostRecommendation.id)).where(
CostRecommendation.status == status_val
)
)
).scalar() or 0
status_counts[status_val] = cnt
# 카테고리별 절감액 (적용된 항목만)
applied_rows = (
await db.execute(
select(CostRecommendation).where(CostRecommendation.status == "applied")
)
).scalars().all()
by_category: dict = {}
for r in applied_rows:
by_category.setdefault(r.category, {"count": 0, "saving": 0.0})
by_category[r.category]["count"] += 1
by_category[r.category]["saving"] += r.estimated_saving
total_applied_saving = sum(r.estimated_saving for r in applied_rows)
# 최근 분석 이력
latest_analysis = (
await db.execute(
select(CostAIAnalysis)
.order_by(CostAIAnalysis.created_at.desc())
.limit(1)
)
).scalar_one_or_none()
# 12개월 누적 추산 (월 절감 × 12)
annual_projected = total_applied_saving * 12
return {
"report_date": datetime.utcnow().isoformat(),
"recommendation_status": status_counts,
"total_applied_saving_manwon": round(total_applied_saving, 1),
"annual_projected_manwon": round(annual_projected, 1),
"by_category": {
k: {"count": v["count"], "saving_manwon": round(v["saving"], 1)}
for k, v in by_category.items()
},
"latest_analysis_period": latest_analysis.period if latest_analysis else None,
"total_analyses": (
await db.execute(select(func.count(CostAIAnalysis.id)))
).scalar() or 0,
"roi_note": (
f"현재까지 월 {total_applied_saving:.0f}만원 절감 권고 적용 완료. "
f"연 환산 약 {annual_projected:.0f}만원 절감 예상."
),
}