""" GUARDiA ITSM — Analytics API Enhancement A-5: E-2 배포 성공률 트렌드 + E-3 엔지니어 워크로드 분석 엔드포인트: GET /api/analytics/deploy/trend — 기간별 배포 성공률 트렌드 GET /api/analytics/deploy/summary — 배포 전체 요약 통계 GET /api/analytics/deploy/by-project — 프로젝트별 배포 성공률 GET /api/analytics/engineer/workload — 엔지니어별 SR 처리 워크로드 GET /api/analytics/engineer/overview — 엔지니어 전체 개요 GET /api/analytics/sr/resolution-time — SR 해결 시간 분포 GET /api/analytics/sr/trend — SR 유형/상태별 트렌드 """ from __future__ import annotations from datetime import date, datetime, timedelta from typing import List, Optional from fastapi import APIRouter, Depends, HTTPException, Query from sqlalchemy import select, func, and_, case from sqlalchemy.ext.asyncio import AsyncSession from core.auth import get_current_user from database import get_db from models import User, UserRole, VibeSession, VibeSessionStatus, SRRequest, SRStatus # ── F-2/F-3: 캐시 & Rate Limit 관리 엔드포인트 ────────────────────────────── # (analytics 라우터 아래 /api/analytics/admin/* 으로 노출) router = APIRouter(prefix="/api/analytics", tags=["analytics"]) # ── 헬퍼 함수 ──────────────────────────────────────────────────────────────── def _date_range(days: int, offset: int = 0) -> tuple[date, date]: """(start_date, end_date) 반환.""" end = date.today() - timedelta(days=offset) start = end - timedelta(days=days - 1) return start, end def _require_non_customer(current_user: User) -> None: """CUSTOMER 역할 접근 차단.""" if current_user.role == UserRole.CUSTOMER: raise HTTPException(403, "권한이 없습니다.") # ═══════════════════════════════════════════════════════════════════════════════ # ── E-2: 배포 성공률 트렌드 ─────────────────────────────────────────────────── # ═══════════════════════════════════════════════════════════════════════════════ @router.get("/deploy/trend") async def deploy_trend( days: int = Query(30, ge=7, le=365, description="분석 기간(일)"), granularity: str = Query("day", pattern="^(day|week|month)$", description="집계 단위: day/week/month"), db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """ 기간별 배포 성공률 트렌드. VibeSession 기준: - 성공 = status == COMPLETED - 실패 = status in (FAILED, CANCELLED) - 진행중 = 그 외 Returns: [{ "period": "2026-05", "total": 12, "success": 10, "failed": 2, "success_rate": 83.3, "avg_duration_min": 14.5 }] """ _require_non_customer(current_user) start, end = _date_range(days) rows = (await db.execute( select(VibeSession).where( and_( func.date(VibeSession.started_at) >= start, func.date(VibeSession.started_at) <= end, VibeSession.status.notin_([VibeSessionStatus.PENDING, VibeSessionStatus.CODING]), ) ).order_by(VibeSession.started_at) )).scalars().all() # 집계 buckets: dict[str, dict] = {} for s in rows: dt: datetime = s.started_at if granularity == "day": key = dt.strftime("%Y-%m-%d") elif granularity == "week": monday = dt - timedelta(days=dt.weekday()) key = monday.strftime("%Y-%m-%d") else: # month key = dt.strftime("%Y-%m") if key not in buckets: buckets[key] = {"total": 0, "success": 0, "failed": 0, "duration_sum": 0, "duration_cnt": 0} b = buckets[key] b["total"] += 1 if s.status == VibeSessionStatus.COMPLETED: b["success"] += 1 elif s.status in (VibeSessionStatus.FAILED, VibeSessionStatus.CANCELLED): b["failed"] += 1 # 배포 소요 시간 (started_at → deployed_at) if s.deployed_at and s.started_at: duration = (s.deployed_at - s.started_at).total_seconds() / 60 b["duration_sum"] += duration b["duration_cnt"] += 1 result = [] for period in sorted(buckets.keys()): b = buckets[period] total = b["total"] success_rate = round(b["success"] / total * 100, 1) if total > 0 else 0 avg_dur = round(b["duration_sum"] / b["duration_cnt"], 1) if b["duration_cnt"] > 0 else None result.append({ "period": period, "total": total, "success": b["success"], "failed": b["failed"], "in_progress": total - b["success"] - b["failed"], "success_rate": success_rate, "avg_duration_min": avg_dur, }) return { "granularity": granularity, "from": start.isoformat(), "to": end.isoformat(), "data": result, } @router.get("/deploy/summary") async def deploy_summary( days: int = Query(30, ge=1, le=365), db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """배포 전체 요약 통계.""" _require_non_customer(current_user) start, end = _date_range(days) rows = (await db.execute( select(VibeSession).where( func.date(VibeSession.started_at) >= start, func.date(VibeSession.started_at) <= end, ) )).scalars().all() total = len(rows) completed = sum(1 for r in rows if r.status == VibeSessionStatus.COMPLETED) failed = sum(1 for r in rows if r.status == VibeSessionStatus.FAILED) cancelled = sum(1 for r in rows if r.status == VibeSessionStatus.CANCELLED) building = sum(1 for r in rows if r.status == VibeSessionStatus.BUILDING) # 성공 시 소요 시간 (분) durations = [ (r.deployed_at - r.started_at).total_seconds() / 60 for r in rows if r.status == VibeSessionStatus.COMPLETED and r.deployed_at and r.started_at ] avg_duration = round(sum(durations) / len(durations), 1) if durations else None min_duration = round(min(durations), 1) if durations else None max_duration = round(max(durations), 1) if durations else None active_rate = round(completed / total * 100, 1) if total > 0 else 0 return { "period_days": days, "from": start.isoformat(), "to": end.isoformat(), "total": total, "completed": completed, "failed": failed, "cancelled": cancelled, "in_progress": building, "success_rate_pct": active_rate, "duration_min": { "avg": avg_duration, "min": min_duration, "max": max_duration, }, } @router.get("/deploy/by-project") async def deploy_by_project( days: int = Query(30, ge=1, le=365), limit: int = Query(20, ge=1, le=100), db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """프로젝트별 배포 성공률 순위.""" _require_non_customer(current_user) start, end = _date_range(days) from models import Project rows = (await db.execute( select(VibeSession, Project).join( Project, VibeSession.project_id == Project.id, isouter=True ).where( func.date(VibeSession.started_at) >= start, func.date(VibeSession.started_at) <= end, ) )).all() # 프로젝트별 집계 projects: dict[str, dict] = {} for session, project in rows: key = project.project_name if project else "(프로젝트 미지정)" if key not in projects: projects[key] = {"total": 0, "success": 0, "failed": 0} projects[key]["total"] += 1 if session.status == VibeSessionStatus.COMPLETED: projects[key]["success"] += 1 elif session.status in (VibeSessionStatus.FAILED, VibeSessionStatus.CANCELLED): projects[key]["failed"] += 1 result = [ { "project": name, "total": v["total"], "success": v["success"], "failed": v["failed"], "success_rate": round(v["success"] / v["total"] * 100, 1) if v["total"] > 0 else 0, } for name, v in sorted(projects.items(), key=lambda x: -x[1]["total"]) ][:limit] return {"period_days": days, "data": result} # ═══════════════════════════════════════════════════════════════════════════════ # ── E-3: 엔지니어 워크로드 분석 ────────────────────────────────────────────── # ═══════════════════════════════════════════════════════════════════════════════ @router.get("/engineer/workload") async def engineer_workload( days: int = Query(30, ge=1, le=365), db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """ 엔지니어별 SR 처리 워크로드 분석. 통계: - 담당 SR 수 (전체/완료/진행중) - 평균 해결 시간 - 우선순위별 분포 - SLA 위반 비율 """ _require_non_customer(current_user) start, end = _date_range(days) rows = (await db.execute( select(SRRequest).where( func.date(SRRequest.created_at) >= start, func.date(SRRequest.created_at) <= end, SRRequest.assigned_to.isnot(None), ) )).scalars().all() # 엔지니어별 집계 engineers: dict[str, dict] = {} terminal = {SRStatus.COMPLETED, SRStatus.REJECTED, SRStatus.FAILED_ROLLBACK} for sr in rows: eng = sr.assigned_to if eng not in engineers: engineers[eng] = { "total": 0, "completed": 0, "in_progress": 0, "sla_breached": 0, "priority": {"CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 0}, "resolution_times": [], } e = engineers[eng] e["total"] += 1 if SRStatus(sr.status) in terminal: e["completed"] += 1 # 해결 시간 계산 (created_at → updated_at) if sr.updated_at and sr.created_at: dur = (sr.updated_at - sr.created_at).total_seconds() / 3600 e["resolution_times"].append(round(dur, 2)) else: e["in_progress"] += 1 if sr.sla_breached: e["sla_breached"] += 1 prio = sr.priority.upper() if sr.priority else "MEDIUM" if prio in e["priority"]: e["priority"][prio] += 1 result = [] for eng_name, data in sorted(engineers.items(), key=lambda x: -x[1]["total"]): res_times = data["resolution_times"] avg_res = round(sum(res_times) / len(res_times), 1) if res_times else None sla_breach_rate = round(data["sla_breached"] / data["total"] * 100, 1) if data["total"] > 0 else 0 result.append({ "engineer": eng_name, "total_sr": data["total"], "completed_sr": data["completed"], "in_progress_sr": data["in_progress"], "sla_breached": data["sla_breached"], "sla_breach_rate_pct": sla_breach_rate, "avg_resolution_hours": avg_res, "priority_distribution": data["priority"], }) return { "period_days": days, "from": start.isoformat(), "to": end.isoformat(), "data": result, } @router.get("/engineer/overview") async def engineer_overview( db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """ 엔지니어 현재 상태 개요 (실시간). 현재 진행 중인 SR 수 기준 워크로드 순위. """ _require_non_customer(current_user) active_statuses = [ SRStatus.RECEIVED, SRStatus.PARSED, SRStatus.PENDING_APPROVAL, SRStatus.APPROVED, SRStatus.IN_PROGRESS, SRStatus.PENDING_PM_VALIDATION, ] rows = (await db.execute( select(SRRequest).where( SRRequest.assigned_to.isnot(None), SRRequest.status.in_([s.value for s in active_statuses]), ) )).scalars().all() engineers: dict[str, dict] = {} for sr in rows: eng = sr.assigned_to if eng not in engineers: engineers[eng] = {"active": 0, "critical": 0, "sla_overdue": 0} engineers[eng]["active"] += 1 if sr.priority and sr.priority.upper() == "CRITICAL": engineers[eng]["critical"] += 1 if sr.sla_breached: engineers[eng]["sla_overdue"] += 1 result = [ { "engineer": eng, "active_sr": d["active"], "critical_sr": d["critical"], "sla_overdue": d["sla_overdue"], } for eng, d in sorted(engineers.items(), key=lambda x: (-x[1]["active"], x[0])) ] return { "total_engineers_on_duty": len(result), "data": result, } # ═══════════════════════════════════════════════════════════════════════════════ # ── SR 트렌드 / 해결 시간 ───────────────────────────────────────────────────── # ═══════════════════════════════════════════════════════════════════════════════ @router.get("/sr/trend") async def sr_trend( days: int = Query(30, ge=7, le=365), granularity: str = Query("day", pattern="^(day|week|month)$"), db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """SR 유형/상태별 트렌드 (생성 기준).""" _require_non_customer(current_user) start, end = _date_range(days) rows = (await db.execute( select(SRRequest).where( func.date(SRRequest.created_at) >= start, func.date(SRRequest.created_at) <= end, ).order_by(SRRequest.created_at) )).scalars().all() buckets: dict[str, dict] = {} for sr in rows: dt: datetime = sr.created_at if granularity == "day": key = dt.strftime("%Y-%m-%d") elif granularity == "week": monday = dt - timedelta(days=dt.weekday()) key = monday.strftime("%Y-%m-%d") else: key = dt.strftime("%Y-%m") if key not in buckets: buckets[key] = {"created": 0, "completed": 0, "rejected": 0, "by_type": {}} b = buckets[key] b["created"] += 1 if sr.status == SRStatus.COMPLETED: b["completed"] += 1 elif sr.status == SRStatus.REJECTED: b["rejected"] += 1 sr_type = sr.sr_type or "UNKNOWN" b["by_type"][sr_type] = b["by_type"].get(sr_type, 0) + 1 result = [ { "period": period, "created": b["created"], "completed": b["completed"], "rejected": b["rejected"], "by_type": b["by_type"], } for period, b in sorted(buckets.items()) ] return {"granularity": granularity, "from": start.isoformat(), "to": end.isoformat(), "data": result} @router.get("/sr/resolution-time") async def sr_resolution_time( days: int = Query(30, ge=1, le=365), db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """SR 해결 시간 분포 통계 (완료된 SR 기준).""" _require_non_customer(current_user) start, end = _date_range(days) rows = (await db.execute( select(SRRequest).where( func.date(SRRequest.created_at) >= start, func.date(SRRequest.created_at) <= end, SRRequest.status == SRStatus.COMPLETED, SRRequest.updated_at.isnot(None), ) )).scalars().all() durations_h = [ (r.updated_at - r.created_at).total_seconds() / 3600 for r in rows if r.updated_at and r.created_at ] if not durations_h: return {"period_days": days, "count": 0, "stats": None} durations_h.sort() n = len(durations_h) avg = round(sum(durations_h) / n, 2) p50 = round(durations_h[n // 2], 2) p90 = round(durations_h[int(n * 0.9)], 2) p99 = round(durations_h[int(n * 0.99)], 2) # 분포 버킷 (0-4h / 4-8h / 8-24h / 24-72h / 72h+) buckets_dist = {"0-4h": 0, "4-8h": 0, "8-24h": 0, "24-72h": 0, "72h+": 0} for d in durations_h: if d < 4: buckets_dist["0-4h"] += 1 elif d < 8: buckets_dist["4-8h"] += 1 elif d < 24: buckets_dist["8-24h"] += 1 elif d < 72: buckets_dist["24-72h"] += 1 else: buckets_dist["72h+"] += 1 # 우선순위별 평균 by_priority: dict[str, list] = {} for r in rows: prio = r.priority or "MEDIUM" if prio not in by_priority: by_priority[prio] = [] dur = (r.updated_at - r.created_at).total_seconds() / 3600 by_priority[prio].append(dur) avg_by_priority = { p: round(sum(v) / len(v), 2) for p, v in by_priority.items() } return { "period_days": days, "count": n, "stats": { "avg_hours": avg, "p50_hours": p50, "p90_hours": p90, "p99_hours": p99, "min_hours": round(durations_h[0], 2), "max_hours": round(durations_h[-1], 2), }, "distribution": buckets_dist, "avg_by_priority": avg_by_priority, } # ═══════════════════════════════════════════════════════════════════════════════ # ── F-2/F-3: 캐시 & Rate Limit 관리 (ADMIN 전용) ───────────────────────────── # ═══════════════════════════════════════════════════════════════════════════════ @router.get("/admin/cache/info") async def cache_info_endpoint( current_user: User = Depends(get_current_user), ): """캐시 상태 조회 (ADMIN 전용).""" if current_user.role != UserRole.ADMIN: raise HTTPException(403, "ADMIN 권한이 필요합니다.") from core.cache import cache_info return await cache_info() @router.post("/admin/cache/flush", status_code=200) async def cache_flush_endpoint( prefix: Optional[str] = Query(None, description="특정 접두어만 삭제 (없으면 전체)"), current_user: User = Depends(get_current_user), ): """캐시 초기화 (ADMIN 전용).""" if current_user.role != UserRole.ADMIN: raise HTTPException(403, "ADMIN 권한이 필요합니다.") from core.cache import cache_flush_all, cache_invalidate_prefix if prefix: count = await cache_invalidate_prefix(prefix) return {"deleted": count, "prefix": prefix} else: count = await cache_flush_all() return {"deleted": count, "message": "전체 캐시 초기화 완료"} @router.get("/admin/ratelimit/info") async def ratelimit_info_endpoint( current_user: User = Depends(get_current_user), ): """Rate Limit 설정 조회 (ADMIN 전용).""" if current_user.role != UserRole.ADMIN: raise HTTPException(403, "ADMIN 권한이 필요합니다.") from core.ratelimit import ( get_rate_limit_status, DEFAULT_LIMIT, STRICT_LIMIT, LOGIN_LIMIT, AI_LIMIT, UPLOAD_LIMIT, ADMIN_LIMIT, ) return { "limits": { "default": DEFAULT_LIMIT, "strict_ip": STRICT_LIMIT, "login": LOGIN_LIMIT, "ai": AI_LIMIT, "upload": UPLOAD_LIMIT, "admin": ADMIN_LIMIT, }, "description": { "default": "인증된 사용자 일반 API", "strict_ip": "IP 기반 기본 제한", "login": "로그인 엔드포인트 (브루트포스 방지)", "ai": "LLM/AI 엔드포인트 (비용 보호)", "upload": "파일 업로드", "admin": "ADMIN 사용자 완화 제한", }, }