zioinfo-mail/workspace/guardia-itsm/routers/dashboard.py
DESKTOP-TKLFCPR\ython cfe2901a55 refactor(structure): consolidate all projects under workspace/
- itsm/    -> workspace/guardia-itsm/
- manager/ -> workspace/guardia-manager/
- app/     -> workspace/guardia-messenger/
- manual/  -> workspace/guardia-docs/

workspace/zioinfo-web/ unchanged.
git mv preserves full commit history.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-31 23:50:56 +09:00

494 lines
19 KiB
Python

"""
역할 기반 개인화 대시보드 — GET /api/dashboard/me.
ADMIN : 시스템 전체 현황 + 엔지니어 워크로드 + 승인 대기 목록 + 7일 추이
ENGINEER : 내 담당 SR + 처리 대기 + 이번 달 완료 통계
PM : 승인 대기 큐 + 담당 기관 현황 + 엔지니어 워크로드
CUSTOMER : 내 기관 SR 현황 + 진행 중 + 완료 + 평균 별점
실시간:
GET /api/dashboard/events — SSE 이벤트 스트림 (token 쿼리 파라미터)
GET /api/dashboard/stats/trend — 최근 7일 SR 생성/완료 추이
"""
import asyncio
from datetime import date, datetime, timedelta
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import StreamingResponse
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
from database import get_db
from models import (
ApprovalFlow, ApprovalResult, EngineerProfile, Institution,
Rating, SRRequest, SRStatus, User, UserRole,
)
# All endpoints in this module are registered under the /api/dashboard prefix.
router = APIRouter(prefix="/api/dashboard", tags=["dashboard"])

# Status values that the dashboards count as "active" (not yet finished) work.
_ACTIVE = [
    status.value
    for status in (
        SRStatus.RECEIVED,
        SRStatus.PARSED,
        SRStatus.PENDING_APPROVAL,
        SRStatus.APPROVED,
        SRStatus.IN_PROGRESS,
        SRStatus.PENDING_PM_VALIDATION,
    )
]

# Korean display labels keyed by priority code.
PRIORITY_LABEL = {
    "CRITICAL": "긴급",
    "HIGH": "높음",
    "MEDIUM": "보통",
    "LOW": "낮음",
}

# Korean display labels keyed by SR status code.
STATUS_LABEL = {
    "RECEIVED": "접수",
    "PARSED": "파싱 완료",
    "PENDING_APPROVAL": "승인 대기",
    "APPROVED": "승인됨",
    "IN_PROGRESS": "진행 중",
    "PENDING_PM_VALIDATION": "PM 검증 대기",
    "COMPLETED": "완료",
    "FAILED_ROLLBACK": "롤백 실패",
    "REJECTED": "반려",
}

# Korean display labels keyed by SR type code.
TYPE_LABEL = {
    "DEPLOY": "배포",
    "RESTART": "재기동",
    "LOG": "로그",
    "INQUIRY": "문의",
    "OTHER": "기타",
}
def _sr_dict(sr: SRRequest) -> dict:
    """Serialize one SR row into the plain dict shape shared by all dashboards.

    Scalar attributes are copied as-is; ``created_at`` / ``updated_at`` are
    rendered with ``isoformat()`` and become ``None`` when the value is falsy.
    """
    copied_fields = (
        "sr_id",
        "title",
        "sr_type",
        "status",
        "priority",
        "requested_by",
        "assigned_to",
        "target_server",
    )
    payload = {}
    for field_name in copied_fields:
        payload[field_name] = getattr(sr, field_name)

    # Timestamps go last so the key order of the payload stays unchanged.
    for stamp_name in ("created_at", "updated_at"):
        stamp = getattr(sr, stamp_name)
        payload[stamp_name] = stamp.isoformat() if stamp else None
    return payload
# ── ADMIN ──────────────────────────────────────────────────────────────────────
async def _admin_dashboard(db: AsyncSession, user: User) -> dict:
    """Build the ADMIN dashboard payload.

    Contains system-wide KPI counters, per-engineer workload, the ten oldest
    SRs waiting for approval and the eight most recently created SRs.

    Args:
        db: Async session used for every read query.
        user: Authenticated user; only used to build the greeting.

    Returns:
        Dict with the keys ``role``, ``greeting``, ``kpi``, ``workload``,
        ``pending_srs`` and ``recent_srs``.
    """
    # Function-scope import, kept where the original placed it.
    from models import WorkLog

    async def _count(*conditions) -> int:
        """Count SR rows matching every condition (all rows when none given)."""
        stmt = select(func.count(SRRequest.id))
        for condition in conditions:
            stmt = stmt.where(condition)
        return (await db.execute(stmt)).scalar() or 0

    async def _count_by_assignee(condition) -> dict:
        """Return ``{assigned_to: SR count}`` for rows matching ``condition``."""
        rows = await db.execute(
            select(SRRequest.assigned_to, func.count(SRRequest.id))
            .where(condition)
            .group_by(SRRequest.assigned_to)
        )
        return {assignee: count for assignee, count in rows.all()}

    # Naive local midnight; "completed today" is measured from here.
    today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)

    total = await _count()
    pending = await _count(SRRequest.status == SRStatus.PENDING_APPROVAL)
    in_prog = await _count(SRRequest.status == SRStatus.IN_PROGRESS)
    done_today = await _count(
        SRRequest.status == SRStatus.COMPLETED,
        SRRequest.updated_at >= today_start,
    )
    failed = await _count(SRRequest.status == SRStatus.FAILED_ROLLBACK)
    completed_total = await _count(SRRequest.status == SRStatus.COMPLETED)

    # AI auto-handling rate: distinct SRs with a work log written by
    # "GUARDiA-AI" divided by all completed SRs.
    auto_cnt = (await db.execute(
        select(func.count(func.distinct(WorkLog.sr_id)))
        .where(WorkLog.engineer == "GUARDiA-AI")
    )).scalar() or 0
    # The numerator is not restricted to completed SRs, so the raw ratio can
    # exceed 100 %; clamp it so the KPI stays a valid percentage.
    auto_rate = min(round(auto_cnt / max(completed_total, 1) * 100, 1), 100.0)

    # Engineer workload: two grouped queries instead of two queries per engineer.
    eps = (await db.execute(select(EngineerProfile))).scalars().all()
    active_by_user = await _count_by_assignee(SRRequest.status.in_(_ACTIVE))
    done_by_user = await _count_by_assignee(
        SRRequest.status == SRStatus.COMPLETED.value
    )
    workload = []
    for ep in eps:
        active = active_by_user.get(ep.username, 0)
        workload.append({
            "username": ep.username,
            "display_name": ep.display_name or ep.username,
            "skill_types": ep.skill_types or "",
            "active": active,
            "max_workload": ep.max_workload,
            "completed": done_by_user.get(ep.username, 0),
            # A missing or zero max_workload yields 0 instead of dividing by zero.
            "utilization": round(active / ep.max_workload * 100) if ep.max_workload else 0,
            "is_available": ep.is_available,
        })

    # SRs waiting for approval, oldest first.
    pend_res = await db.execute(
        select(SRRequest)
        .where(SRRequest.status == SRStatus.PENDING_APPROVAL)
        .order_by(SRRequest.created_at)
        .limit(10)
    )
    pending_srs = [_sr_dict(sr) for sr in pend_res.scalars().all()]

    # Most recently created SRs.
    recent_res = await db.execute(
        select(SRRequest).order_by(SRRequest.created_at.desc()).limit(8)
    )
    recent_srs = [_sr_dict(sr) for sr in recent_res.scalars().all()]

    return {
        "role": "ADMIN",
        "greeting": f"안녕하세요, {user.display_name or user.username}님 👋",
        "kpi": {
            "total": total,
            "pending_approval": pending,
            "in_progress": in_prog,
            "completed_today": done_today,
            "failed": failed,
            "auto_rate": auto_rate,
        },
        "workload": workload,
        "pending_srs": pending_srs,
        "recent_srs": recent_srs,
    }
# ── ENGINEER ───────────────────────────────────────────────────────────────────
async def _engineer_dashboard(db: AsyncSession, user: User) -> dict:
    """Build the ENGINEER dashboard payload for the SRs assigned to ``user``.

    Args:
        db: Async session used for every read query.
        user: Authenticated engineer; ``user.username`` is matched against
            ``SRRequest.assigned_to``.

    Returns:
        Dict with the keys ``role``, ``greeting``, ``my_workload``,
        ``action_required``, ``in_progress``, ``recent_completed`` and
        ``all_active``.
    """
    from sqlalchemy import case

    uname = user.username
    # Naive local start of the current month, for the monthly completion count.
    mo_start = datetime.now().replace(day=1, hour=0, minute=0, second=0, microsecond=0)

    # Sorting on the raw priority column orders the codes alphabetically
    # (LOW before MEDIUM), so rank them by severity explicitly.
    # Assumes SRRequest.priority stores the codes used as PRIORITY_LABEL keys
    # — TODO confirm against the model.
    priority_rank = case(
        {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3},
        value=SRRequest.priority,
        else_=4,  # unknown or NULL priorities sort last
    )

    async def _count_mine(*conditions) -> int:
        """Count SRs assigned to this engineer that match every condition."""
        stmt = select(func.count(SRRequest.id)).where(SRRequest.assigned_to == uname)
        for condition in conditions:
            stmt = stmt.where(condition)
        return (await db.execute(stmt)).scalar() or 0

    done_month = await _count_mine(
        SRRequest.status == SRStatus.COMPLETED.value,
        SRRequest.updated_at >= mo_start,
    )
    done_total = await _count_mine(SRRequest.status == SRStatus.COMPLETED.value)

    ep_res = await db.execute(
        select(EngineerProfile).where(EngineerProfile.username == uname)
    )
    ep = ep_res.scalars().first()
    # Engineers without a profile row fall back to a capacity of 5.
    max_wl = ep.max_workload if ep else 5

    # Ready to execute: APPROVED SRs, most severe and oldest first.
    ready_res = await db.execute(
        select(SRRequest)
        .where(SRRequest.assigned_to == uname)
        .where(SRRequest.status == SRStatus.APPROVED)
        .order_by(priority_rank, SRRequest.created_at)
        .limit(5)
    )
    # Currently in progress.
    inprog_res = await db.execute(
        select(SRRequest)
        .where(SRRequest.assigned_to == uname)
        .where(SRRequest.status == SRStatus.IN_PROGRESS)
    )
    # Most recently completed.
    done_res = await db.execute(
        select(SRRequest)
        .where(SRRequest.assigned_to == uname)
        .where(SRRequest.status == SRStatus.COMPLETED.value)
        .order_by(SRRequest.updated_at.desc())
        .limit(5)
    )
    # Every active SR assigned to this engineer, newest first.
    all_active_res = await db.execute(
        select(SRRequest)
        .where(SRRequest.assigned_to == uname)
        .where(SRRequest.status.in_(_ACTIVE))
        .order_by(SRRequest.created_at.desc())
    )
    all_active = all_active_res.scalars().all()
    # The list above is unbounded and uses the same filter as the former
    # count query, so its length is the active count.
    active_cnt = len(all_active)

    return {
        "role": "ENGINEER",
        "greeting": f"안녕하세요, {user.display_name or user.username}님 👷",
        "my_workload": {
            "active": active_cnt,
            "max": max_wl,
            "completed_month": done_month,
            "completed_total": done_total,
            # A missing or zero capacity yields 0 instead of dividing by zero.
            "utilization": round(active_cnt / max_wl * 100) if max_wl else 0,
            "skills": ep.skill_types if ep else "",
        },
        "action_required": [_sr_dict(sr) for sr in ready_res.scalars().all()],
        "in_progress": [_sr_dict(sr) for sr in inprog_res.scalars().all()],
        "recent_completed": [_sr_dict(sr) for sr in done_res.scalars().all()],
        "all_active": [_sr_dict(sr) for sr in all_active],
    }
# ── PM ─────────────────────────────────────────────────────────────────────────
async def _pm_dashboard(db: AsyncSession, user: User) -> dict:
    """Build the PM dashboard payload.

    Contains the approval queue, statistics for the institutions whose
    ``contact_pm`` contains this PM's name, engineer workload and the PM's five
    most recent approval decisions.

    Args:
        db: Async session used for every read query.
        user: Authenticated PM; ``display_name`` (or ``username``) is matched
            against ``Institution.contact_pm`` and ``ApprovalFlow.approver``.

    Returns:
        Dict with the keys ``role``, ``greeting``, ``pending_count``,
        ``pending_srs``, ``inst_stats``, ``workload`` and ``recent_decisions``.
    """
    from sqlalchemy import case

    # Institutions this PM is responsible for.
    # NOTE(review): this is a substring match, so it may also pick up
    # institutions of a PM whose name merely contains this one — confirm.
    pm_name = user.display_name or user.username
    inst_res = await db.execute(
        select(Institution).where(Institution.contact_pm.contains(pm_name))
    )
    my_insts = inst_res.scalars().all()
    my_inst_ids = [inst.id for inst in my_insts]

    # Sorting on the raw priority column orders the codes alphabetically
    # (LOW before MEDIUM), so rank them by severity explicitly.
    # Assumes SRRequest.priority stores the codes used as PRIORITY_LABEL keys
    # — TODO confirm against the model.
    priority_rank = case(
        {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3},
        value=SRRequest.priority,
        else_=4,  # unknown or NULL priorities sort last
    )

    # Approval queue across all institutions. The list is capped at 15 rows,
    # so the total is counted separately instead of using len() of the list.
    pending_total = (await db.execute(
        select(func.count(SRRequest.id))
        .where(SRRequest.status == SRStatus.PENDING_APPROVAL)
    )).scalar() or 0
    pend_res = await db.execute(
        select(SRRequest)
        .where(SRRequest.status == SRStatus.PENDING_APPROVAL)
        .order_by(priority_rank, SRRequest.created_at)
        .limit(15)
    )
    pending = [_sr_dict(sr) for sr in pend_res.scalars().all()]

    async def _count_by_inst(*conditions) -> dict:
        """Return ``{inst_id: SR count}`` for this PM's institutions."""
        if not my_inst_ids:
            return {}
        stmt = (
            select(SRRequest.inst_id, func.count(SRRequest.id))
            .where(SRRequest.inst_id.in_(my_inst_ids))
        )
        for condition in conditions:
            stmt = stmt.where(condition)
        rows = await db.execute(stmt.group_by(SRRequest.inst_id))
        return {inst_id: count for inst_id, count in rows.all()}

    # Per-institution statistics: three grouped queries instead of three per
    # institution.
    total_by_inst = await _count_by_inst()
    active_by_inst = await _count_by_inst(SRRequest.status.in_(_ACTIVE))
    done_by_inst = await _count_by_inst(
        SRRequest.status == SRStatus.COMPLETED.value
    )
    inst_stats = [
        {
            "inst_code": inst.inst_code,
            "inst_name": inst.inst_name,
            "total": total_by_inst.get(inst.id, 0),
            "active": active_by_inst.get(inst.id, 0),
            "completed": done_by_inst.get(inst.id, 0),
        }
        for inst in my_insts
    ]

    # Engineer workload: one grouped query instead of one query per engineer.
    eps = (await db.execute(select(EngineerProfile))).scalars().all()
    active_rows = await db.execute(
        select(SRRequest.assigned_to, func.count(SRRequest.id))
        .where(SRRequest.status.in_(_ACTIVE))
        .group_by(SRRequest.assigned_to)
    )
    active_by_user = {assignee: count for assignee, count in active_rows.all()}
    workload = []
    for ep in eps:
        active = active_by_user.get(ep.username, 0)
        workload.append({
            "username": ep.username,
            "display_name": ep.display_name or ep.username,
            "active": active,
            "max_workload": ep.max_workload,
            # A missing or zero max_workload yields 0 instead of dividing by zero.
            "utilization": round(active / ep.max_workload * 100) if ep.max_workload else 0,
        })

    # This PM's most recent approval decisions (anything no longer PENDING).
    my_approvals_res = await db.execute(
        select(ApprovalFlow)
        .where(ApprovalFlow.approver.contains(pm_name))
        .where(ApprovalFlow.result != ApprovalResult.PENDING)
        .order_by(ApprovalFlow.decided_at.desc())
        .limit(5)
    )
    recent_decisions = [
        {
            "sr_id": a.sr_id,
            "approver": a.approver,
            "result": a.result,
            "comment": a.comment,
            "decided_at": a.decided_at.isoformat() if a.decided_at else None,
        }
        for a in my_approvals_res.scalars().all()
    ]

    return {
        "role": "PM",
        "greeting": f"안녕하세요, {user.display_name or user.username}",
        "pending_count": pending_total,
        "pending_srs": pending,
        "inst_stats": inst_stats,
        "workload": workload,
        "recent_decisions": recent_decisions,
    }
# ── CUSTOMER ───────────────────────────────────────────────────────────────────
async def _customer_dashboard(db: AsyncSession, user: User) -> dict:
    """Build the CUSTOMER dashboard payload for the user's own institution.

    Args:
        db: Async session used for every read query.
        user: Authenticated customer; ``user.inst_code`` selects the institution.

    Returns:
        Dict with the keys ``role``, ``greeting``, ``inst_code``, ``inst_name``,
        ``stats``, ``active_srs``, ``recent_completed``, ``avg_rating``
        (``None`` when there are no ratings) and ``rating_count``.
    """
    from sqlalchemy import case

    inst_code = user.inst_code or ""
    inst_r = await db.execute(
        select(Institution).where(Institution.inst_code == inst_code)
    )
    inst = inst_r.scalars().first()
    inst_name = inst.inst_name if inst else inst_code
    # -1 is used as a sentinel id when the institution is unknown, so every
    # query below then runs against that id.
    inst_id = inst.id if inst else -1

    # Sorting on the raw priority column orders the codes alphabetically
    # (LOW before MEDIUM), so rank them by severity explicitly.
    # Assumes SRRequest.priority stores the codes used as PRIORITY_LABEL keys
    # — TODO confirm against the model.
    priority_rank = case(
        {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3},
        value=SRRequest.priority,
        else_=4,  # unknown or NULL priorities sort last
    )

    async def _count(*conditions) -> int:
        """Count this institution's SRs that match every condition."""
        stmt = select(func.count(SRRequest.id)).where(SRRequest.inst_id == inst_id)
        for condition in conditions:
            stmt = stmt.where(condition)
        return (await db.execute(stmt)).scalar() or 0

    total_c = await _count()
    active_c = await _count(SRRequest.status.in_(_ACTIVE))
    done_c = await _count(SRRequest.status == SRStatus.COMPLETED.value)
    pend_c = await _count(SRRequest.status == SRStatus.PENDING_APPROVAL)

    # Active SRs: most severe first, then newest first.
    active_res = await db.execute(
        select(SRRequest)
        .where(SRRequest.inst_id == inst_id)
        .where(SRRequest.status.in_(_ACTIVE))
        .order_by(priority_rank, SRRequest.created_at.desc())
        .limit(10)
    )
    # Most recently completed.
    done_res = await db.execute(
        select(SRRequest)
        .where(SRRequest.inst_id == inst_id)
        .where(SRRequest.status == SRStatus.COMPLETED.value)
        .order_by(SRRequest.updated_at.desc())
        .limit(5)
    )

    # Average star rating, aggregated in the database instead of loading every
    # Rating row into memory.
    rating_row = (await db.execute(
        select(func.avg(Rating.stars), func.count(Rating.stars))
        .select_from(Rating)
        .join(SRRequest, Rating.sr_id == SRRequest.sr_id)
        .where(SRRequest.inst_id == inst_id)
    )).one()
    avg_stars, rating_count = rating_row
    rating_count = rating_count or 0
    # AVG may come back as Decimal depending on the backend; normalise to float.
    avg_rating = round(float(avg_stars), 1) if rating_count else None

    return {
        "role": "CUSTOMER",
        "greeting": f"안녕하세요, {user.display_name or user.username}",
        "inst_code": inst_code,
        "inst_name": inst_name,
        "stats": {
            "total": total_c,
            "active": active_c,
            "completed": done_c,
            "pending_approval": pend_c,
        },
        "active_srs": [_sr_dict(sr) for sr in active_res.scalars().all()],
        "recent_completed": [_sr_dict(sr) for sr in done_res.scalars().all()],
        "avg_rating": avg_rating,
        "rating_count": rating_count,
    }
# ── Endpoint ───────────────────────────────────────────────────────────────────
@router.get("/me")
async def get_my_dashboard(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return the dashboard payload that matches the signed-in user's role."""
    role = current_user.role
    # Ordered (role, builder) pairs, compared with == in the same order as
    # the branches they replace.
    builders = (
        (UserRole.ADMIN, _admin_dashboard),
        (UserRole.ENGINEER, _engineer_dashboard),
        (UserRole.PM, _pm_dashboard),
        (UserRole.CUSTOMER, _customer_dashboard),
    )
    for candidate, build in builders:
        if role == candidate:
            return await build(db, current_user)
    # Any other role has no dedicated dashboard.
    return {"role": role, "message": "대시보드 없음"}
# ── 7-day trend ──────────────────────────────────────────────────────────────
@router.get("/stats/trend")
async def get_trend(
    db: AsyncSession = Depends(get_db),
    _: User = Depends(get_current_user),
):
    """Return SR created / completed counts for each of the last 7 days.

    Returns:
        A list of 7 dicts, oldest day first, each with ``date`` (ISO date),
        ``label`` (``MM/DD``), ``created`` and ``completed``. ``completed``
        counts SRs whose status is COMPLETED and whose ``updated_at`` falls on
        that day.
    """
    today = date.today()
    one_day = timedelta(days=1)
    result = []
    for days_ago in range(6, -1, -1):
        day = today - timedelta(days=days_ago)
        # Half-open window [start, next_start): an inclusive 23:59:59 upper
        # bound would drop rows stamped in the last second of the day with
        # non-zero microseconds.
        start = datetime(day.year, day.month, day.day)
        next_start = start + one_day

        created = (await db.execute(
            select(func.count(SRRequest.id))
            .where(SRRequest.created_at >= start)
            .where(SRRequest.created_at < next_start)
        )).scalar() or 0
        completed = (await db.execute(
            select(func.count(SRRequest.id))
            .where(SRRequest.status == SRStatus.COMPLETED.value)
            .where(SRRequest.updated_at >= start)
            .where(SRRequest.updated_at < next_start)
        )).scalar() or 0

        result.append({
            "date": day.isoformat(),
            "label": day.strftime("%m/%d"),
            "created": created,
            "completed": completed,
        })
    return result
# ── SSE real-time event stream ────────────────────────────────────────────────
@router.get("/events")
async def dashboard_events(
    token: str = Query(..., description="JWT Bearer 토큰 (EventSource는 헤더 미지원)"),
    db: AsyncSession = Depends(get_db),
):
    """Server-Sent Events stream for dashboard updates.

    The browser EventSource API cannot send an Authorization header, so the
    JWT is received through the ``token`` query parameter and validated here.

    Args:
        token: JWT whose ``sub`` claim carries the username.
        db: Async session used to look the user up.

    Returns:
        A ``text/event-stream`` StreamingResponse that emits one ``data:``
        frame per published message and a comment-only heartbeat after 20
        seconds without a message.

    Raises:
        HTTPException: 401 when the token cannot be decoded, has no ``sub``
            claim, or the user does not exist or is inactive.
    """
    import logging

    from jose import JWTError, jwt as jose_jwt
    from core.auth import SECRET_KEY, ALGORITHM
    from core.events import subscribe, unsubscribe

    # ── Token validation ────────────────────────────────────────────
    try:
        payload = jose_jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as exc:
        raise HTTPException(401, "유효하지 않은 토큰") from exc
    username = payload.get("sub")
    if not username:
        raise HTTPException(401, "유효하지 않은 토큰")

    user_r = await db.execute(select(User).where(User.username == username))
    user = user_r.scalars().first()
    if not user or not user.is_active:
        raise HTTPException(401, "사용자를 찾을 수 없습니다")

    # ── SSE stream ──────────────────────────────────────────────────
    async def generator():
        # Subscribe inside the generator so the subscription is only created
        # once the stream is actually consumed, and is always released by the
        # ``finally`` below.
        queue = subscribe()
        try:
            while True:
                try:
                    msg = await asyncio.wait_for(queue.get(), timeout=20.0)
                except asyncio.TimeoutError:
                    # Comment-only frame that keeps the connection alive.
                    yield ": heartbeat\n\n"
                    continue
                yield f"data: {msg}\n\n"
        except Exception:
            # End the stream on unexpected errors, but leave a trace instead
            # of swallowing them silently.
            logging.getLogger(__name__).exception("dashboard SSE stream aborted")
        finally:
            # Also runs on client disconnect: CancelledError / GeneratorExit
            # are left to propagate rather than being suppressed.
            unsubscribe(queue)

    return StreamingResponse(
        generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # disable nginx response buffering
        },
    )