""" 고객사 셀프서비스 포털 API 기능: - SR 접수 / 상태 조회 / 이력 조회 - AI FAQ 자가해결 추천 (SR 접수 전) - 서비스 카탈로그 셀프 주문 - 만족도 평가 - 공지사항 조회 엔드포인트: POST /api/portal/sr — SR 접수 GET /api/portal/sr — 내 SR 목록 GET /api/portal/sr/{sr_id} — SR 상세 + 처리 이력 POST /api/portal/sr/{sr_id}/rate — 처리 만족도 평가 POST /api/portal/faq/suggest — AI 자가해결 추천 GET /api/portal/catalog — 서비스 카탈로그 (고객용) GET /api/portal/announcements — 공지사항 GET /api/portal/stats — 내 기관 통계 """ from __future__ import annotations import logging from datetime import datetime from typing import List, Optional from fastapi import APIRouter, Depends, HTTPException, Query from pydantic import BaseModel from sqlalchemy import select, func from sqlalchemy.ext.asyncio import AsyncSession from core.auth import get_current_user from database import get_db from models import ( SRRequest, SRStatus, SRCreate, AuditLog, Institution, Rating, User, UserRole, ) logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/portal", tags=["portal"]) def _require_customer(user: User) -> User: """고객 포털은 CUSTOMER 또는 모든 역할 허용 (기관 필터링으로 접근 제어).""" return user # ── SR 접수 ─────────────────────────────────────────────────────────────────── class PortalSRCreate(BaseModel): title: str description: Optional[str] = None sr_type: str = "INQUIRY" priority: str = "MEDIUM" category: Optional[str] = None # 카테고리 선택 @router.post("/sr", status_code=201) async def portal_create_sr( body: PortalSRCreate, db: AsyncSession = Depends(get_db), cu: User = Depends(get_current_user), ): """고객 포털 SR 접수.""" from uuid import uuid4 sr_id = f"SR-{datetime.now().strftime('%Y%m%d')}-{str(uuid4())[:6].upper()}" # 기관 조회 inst_id = None if cu.inst_code: inst = (await db.execute( select(Institution).where(Institution.inst_code == cu.inst_code) )).scalars().first() if inst: inst_id = inst.id sr = SRRequest( sr_id = sr_id, inst_id = inst_id, sr_type = body.sr_type, title = body.title, description = body.description, status = SRStatus.RECEIVED, priority = body.priority, requested_by= cu.username, ) db.add(sr) # AI 자동 분류 (백그라운드) import asyncio as _aio async def _classify(): try: from core.ticket_classifier import classify_ticket import json as _j suggestion = await classify_ticket(body.title, body.description or "") async with (await db.connection()).begin(): sr.ai_suggestion = _j.dumps(suggestion, ensure_ascii=False) except Exception: pass await db.commit() await db.refresh(sr) _aio.create_task(_classify()) # SLA 설정 from core.sla import set_sla_on_create async with db.begin(): await set_sla_on_create(sr.sr_id, db) return { "sr_id": sr.sr_id, "status": sr.status, "message": f"서비스 요청이 접수되었습니다. 담당자가 검토 후 연락드리겠습니다.", "created_at": sr.created_at.isoformat() if sr.created_at else None, "sla_deadline": sr.sla_deadline.isoformat() if sr.sla_deadline else None, } # ── SR 목록 (내 기관) ───────────────────────────────────────────────────────── @router.get("/sr") async def portal_list_sr( status: Optional[str] = None, limit: int = 20, skip: int = 0, db: AsyncSession = Depends(get_db), cu: User = Depends(get_current_user), ): """고객 포털 내 SR 목록 (기관 필터 자동 적용).""" q = select(SRRequest).order_by(SRRequest.created_at.desc()) if cu.role == UserRole.CUSTOMER and cu.inst_code: inst = (await db.execute( select(Institution).where(Institution.inst_code == cu.inst_code) )).scalars().first() if inst: q = q.where(SRRequest.inst_id == inst.id) else: q = q.where(SRRequest.requested_by == cu.username) else: q = q.where(SRRequest.requested_by == cu.username) if status: q = q.where(SRRequest.status == status) rows = (await db.execute(q.offset(skip).limit(limit))).scalars().all() return [ { "sr_id": r.sr_id, "title": r.title, "status": r.status, "priority": r.priority, "created_at": r.created_at.isoformat() if r.created_at else None, "sla_deadline":r.sla_deadline.isoformat() if r.sla_deadline else None, "sla_breached":r.sla_breached, "assigned_to": r.assigned_to, } for r in rows ] # ── SR 상세 + 처리 이력 ─────────────────────────────────────────────────────── @router.get("/sr/{sr_id}") async def portal_sr_detail( sr_id: str, db: AsyncSession = Depends(get_db), cu: User = Depends(get_current_user), ): """SR 상세 + 감사 이력 (타임라인 형식).""" sr = (await db.execute( select(SRRequest).where(SRRequest.sr_id == sr_id) )).scalars().first() if not sr: raise HTTPException(404, "SR을 찾을 수 없습니다.") # 감사 이력 logs = (await db.execute( select(AuditLog).where(AuditLog.sr_id == sr_id) .order_by(AuditLog.id.asc()) )).scalars().all() timeline = [ { "action": l.action, "actor": l.actor, "detail": l.detail, "created_at": l.created_at.isoformat() if l.created_at else None, } for l in logs ] # 진행률 계산 STATUS_PROGRESS = { "RECEIVED": 10, "PARSED": 20, "PENDING_APPROVAL": 35, "APPROVED": 50, "IN_PROGRESS": 70, "PENDING_PM_VALIDATION": 85, "COMPLETED": 100, "REJECTED": 0, "FAILED_ROLLBACK": 0, } return { "sr_id": sr.sr_id, "title": sr.title, "description": sr.description, "status": sr.status, "priority": sr.priority, "sr_type": sr.sr_type, "requested_by":sr.requested_by, "assigned_to": sr.assigned_to, "target_server":sr.target_server, "created_at": sr.created_at.isoformat() if sr.created_at else None, "updated_at": sr.updated_at.isoformat() if sr.updated_at else None, "sla_deadline":sr.sla_deadline.isoformat() if sr.sla_deadline else None, "sla_breached":sr.sla_breached, "progress_pct":STATUS_PROGRESS.get(sr.status, 0), "timeline": timeline, } # ── 만족도 평가 ─────────────────────────────────────────────────────────────── class RatingRequest(BaseModel): score: int # 1-5 comment: Optional[str] = None @router.post("/sr/{sr_id}/rate") async def portal_rate_sr( sr_id: str, body: RatingRequest, db: AsyncSession = Depends(get_db), cu: User = Depends(get_current_user), ): """SR 처리 만족도 평가 (1~5점).""" if not 1 <= body.score <= 5: raise HTTPException(400, "점수는 1~5 사이여야 합니다.") sr = (await db.execute(select(SRRequest).where(SRRequest.sr_id == sr_id))).scalars().first() if not sr: raise HTTPException(404, "SR을 찾을 수 없습니다.") if sr.status != SRStatus.COMPLETED: raise HTTPException(400, "완료된 SR에만 평가할 수 있습니다.") # 기존 평가 확인 existing = (await db.execute( select(Rating).where(Rating.sr_id == sr_id) )).scalars().first() if existing: existing.score = body.score existing.comment = body.comment existing.rated_at = datetime.now() else: db.add(Rating( sr_id = sr_id, score = body.score, comment = body.comment, rated_by= cu.username, )) await db.commit() return {"message": f"평가가 등록되었습니다. (점수: {body.score}/5)", "sr_id": sr_id} # ── AI FAQ 자가해결 추천 ────────────────────────────────────────────────────── class FAQRequest(BaseModel): query: str # 문제 설명 또는 SR 제목 @router.post("/faq/suggest") async def faq_suggest( body: FAQRequest, db: AsyncSession = Depends(get_db), _u: User = Depends(get_current_user), ): """SR 접수 전 AI가 유사 KB 문서를 추천하여 자가해결 유도.""" from models import KBDocument from sqlalchemy import select, or_ # 1. 키워드 기반 KB 검색 keywords = body.query.replace(".", " ").replace(",", " ").split()[:5] conditions = [KBDocument.title.contains(kw) for kw in keywords if len(kw) >= 2] conditions += [KBDocument.solution.contains(kw) for kw in keywords if len(kw) >= 2] conditions += [KBDocument.symptoms.contains(kw) for kw in keywords if len(kw) >= 2] kb_results = [] if conditions: rows = (await db.execute( select(KBDocument).where(or_(*conditions)).limit(5) )).scalars().all() kb_results = [ { "kb_id": r.id, "title": r.title, "summary": (getattr(r, "solution", None) or getattr(r, "content", None) or "")[:200], "relevance": "HIGH" if any(kw in (r.title or "").lower() for kw in keywords) else "MEDIUM", } for r in rows ] # 2. Ollama LLM으로 자가해결 방법 생성 llm_answer = None if not kb_results: try: from core.llm_client import get_llm_client prompt = ( f"다음 IT 문제에 대해 간단한 자가해결 방법을 3단계로 알려주세요:\n" f"문제: {body.query}\n" f"형식: 1. 첫 번째 단계 2. 두 번째 단계 3. 세 번째 단계" ) client = get_llm_client() resp = await client.chat(prompt) llm_answer = resp.content.strip()[:500] except Exception: pass # 3. 유사 SR 이력 조회 past_srs = (await db.execute( select(SRRequest) .where( SRRequest.status == SRStatus.COMPLETED, or_(*[SRRequest.title.contains(kw) for kw in keywords if len(kw) >= 2]) ) .limit(3) )).scalars().all() if keywords else [] similar_srs = [ { "sr_id": s.sr_id, "title": s.title, "summary": "이미 해결된 유사 사례입니다. 담당자에게 동일 조치를 요청하세요.", } for s in past_srs ] # 자가해결 가능 여부 판단 can_self_solve = bool(kb_results or llm_answer) return { "query": body.query, "can_self_solve": can_self_solve, "kb_articles": kb_results, "llm_guide": llm_answer, "similar_srs": similar_srs, "message": ( "아래 자료로 문제를 해결해 보세요. 해결되지 않으면 SR을 접수하세요." if can_self_solve else "유사한 해결 사례를 찾지 못했습니다. SR을 접수해 주세요." ), } # ── 서비스 카탈로그 (고객용) ────────────────────────────────────────────────── @router.get("/catalog") async def portal_catalog( db: AsyncSession = Depends(get_db), _u: User = Depends(get_current_user), ): """고객사 이용 가능한 서비스 카탈로그.""" from models import ServiceItem rows = (await db.execute( select(ServiceItem).where(ServiceItem.status == "ACTIVE") .order_by(ServiceItem.category, ServiceItem.name) )).scalars().all() by_cat: dict = {} for r in rows: cat = r.category or "기타" by_cat.setdefault(cat, []).append({ "service_id": r.service_id, "name": r.name, "description": r.description, "sla_hours": getattr(r, "sla_hours", None), "category": cat, }) return {"categories": list(by_cat.keys()), "by_category": by_cat} # ── 공지사항 ────────────────────────────────────────────────────────────────── _ANNOUNCEMENTS = [ { "id": 1, "title": "GUARDiA ITSM 2.0 업데이트 안내", "content": "PMS 기능 및 AI 보고서 자동 생성 기능이 추가되었습니다.", "category": "업데이트", "published": "2026-05-29", "pinned": True, }, { "id": 2, "title": "SR 처리 시간 단축 안내", "content": "AI 자동 분류 적용으로 평균 처리 시간이 30% 단축되었습니다.", "category": "운영", "published": "2026-05-28", "pinned": False, }, ] @router.get("/announcements") async def portal_announcements(_u: User = Depends(get_current_user)): """공지사항 목록.""" return {"announcements": sorted(_ANNOUNCEMENTS, key=lambda x: (not x["pinned"], x["published"]), reverse=False)} # ── 내 기관 통계 ────────────────────────────────────────────────────────────── @router.get("/stats") async def portal_stats( db: AsyncSession = Depends(get_db), cu: User = Depends(get_current_user), ): """고객 포털 — 내 기관 SR 통계.""" q_base = select(SRRequest) if cu.role == UserRole.CUSTOMER and cu.inst_code: inst = (await db.execute( select(Institution).where(Institution.inst_code == cu.inst_code) )).scalars().first() if inst: q_base = q_base.where(SRRequest.inst_id == inst.id) else: q_base = q_base.where(SRRequest.requested_by == cu.username) rows = (await db.execute(q_base)).scalars().all() total = len(rows) completed = sum(1 for r in rows if r.status == SRStatus.COMPLETED) in_prog = sum(1 for r in rows if r.status == SRStatus.IN_PROGRESS) breached = sum(1 for r in rows if r.sla_breached) # 평균 평점 rated = [r for r in rows if r.sr_id] ratings = (await db.execute( select(Rating).where(Rating.sr_id.in_([r.sr_id for r in rated[:50]])) )).scalars().all() avg_score = round(sum(r.score for r in ratings) / len(ratings), 1) if ratings else None return { "total": total, "completed": completed, "in_progress": in_prog, "sla_breached": breached, "completion_rate":round(completed / total * 100, 1) if total else 0.0, "avg_satisfaction":avg_score, "by_status": { s: sum(1 for r in rows if r.status == s) for s in ["RECEIVED", "IN_PROGRESS", "COMPLETED", "REJECTED"] }, }