zioinfo-mail/itsm/routers/tasks.py
DESKTOP-TKLFCPR\ython 79061ee89c feat(itsm): Jira-like ITSM 시스템 구현
- FastAPI + SQLAlchemy(aiosqlite) 기반 SR 상태 머신
  (RECEIVED → PARSED → PENDING_APPROVAL → APPROVED → IN_PROGRESS
   → PENDING_PM_VALIDATION → COMPLETED / FAILED_ROLLBACK)
- PM 승인 워크플로우 (ApprovalFlow 테이블)
- SHA-256 해시 체인 감사 로그 (위변조 방지)
- AES-256-GCM 서버 자격증명 암호화 (IP/PW API 미노출)
- CMDB: 기관(MOF/MOIS/MSS) + 서버 정보 관리
- 더미 데이터 자동 시딩 (6개 SR, 3개 기관, 6개 서버)
- Dark-theme SPA: 대시보드 / 칸반 보드 / SR 목록 / 감사 로그 / CMDB

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-24 19:31:09 +09:00

157 lines
5.5 KiB
Python

"""SR / Task CRUD + status transition endpoints."""
from datetime import datetime
from typing import List, Optional
from uuid import uuid4
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from database import get_db
from models import (
AuditLog, Institution, SRCreate, SROut, SRRequest, SRStatus,
SRStatusUpdate, SRType, compute_log_hash
)
router = APIRouter(prefix="/api/tasks", tags=["tasks"])
# Valid state transitions
_TRANSITIONS: dict[str, list[str]] = {
SRStatus.RECEIVED: [SRStatus.PARSED, SRStatus.REJECTED],
SRStatus.PARSED: [SRStatus.PENDING_APPROVAL, SRStatus.REJECTED],
SRStatus.PENDING_APPROVAL: [SRStatus.APPROVED, SRStatus.REJECTED],
SRStatus.APPROVED: [SRStatus.IN_PROGRESS, SRStatus.REJECTED],
SRStatus.IN_PROGRESS: [SRStatus.PENDING_PM_VALIDATION, SRStatus.FAILED_ROLLBACK],
SRStatus.PENDING_PM_VALIDATION:[SRStatus.COMPLETED, SRStatus.FAILED_ROLLBACK],
SRStatus.COMPLETED: [],
SRStatus.FAILED_ROLLBACK: [],
SRStatus.REJECTED: [],
}
def _new_sr_id() -> str:
return f"SR-{datetime.now().strftime('%Y%m%d')}-{str(uuid4())[:6].upper()}"
async def _write_audit(db: AsyncSession, sr_id: str, actor: str,
action: str, detail: str) -> None:
from sqlalchemy import select as sel
result = await db.execute(
sel(AuditLog).where(AuditLog.sr_id == sr_id)
.order_by(AuditLog.id.desc()).limit(1)
)
last = result.scalars().first()
prev_hash = last.log_hash if last else None
ts = datetime.now().isoformat()
log_hash = compute_log_hash(prev_hash, actor, action, detail, ts)
db.add(AuditLog(
sr_id=sr_id, actor=actor, action=action, detail=detail,
prev_hash=prev_hash, log_hash=log_hash
))
@router.get("", response_model=List[SROut])
async def list_tasks(
status: Optional[str] = Query(None),
sr_type: Optional[str] = Query(None),
priority: Optional[str]= Query(None),
keyword: Optional[str] = Query(None),
skip: int = 0,
limit: int = 50,
db: AsyncSession = Depends(get_db),
):
q = select(SRRequest).order_by(SRRequest.created_at.desc())
if status:
q = q.where(SRRequest.status == status)
if sr_type:
q = q.where(SRRequest.sr_type == sr_type)
if priority:
q = q.where(SRRequest.priority == priority)
if keyword:
q = q.where(SRRequest.title.contains(keyword))
q = q.offset(skip).limit(limit)
result = await db.execute(q)
return result.scalars().all()
@router.get("/stats")
async def get_stats(db: AsyncSession = Depends(get_db)):
total = (await db.execute(select(func.count(SRRequest.id)))).scalar()
by_status: dict[str, int] = {}
for s in SRStatus:
cnt = (await db.execute(
select(func.count(SRRequest.id)).where(SRRequest.status == s)
)).scalar()
if cnt:
by_status[s.value] = cnt
by_type: dict[str, int] = {}
for t in SRType:
cnt = (await db.execute(
select(func.count(SRRequest.id)).where(SRRequest.sr_type == t)
)).scalar()
if cnt:
by_type[t.value] = cnt
return {"total": total, "by_status": by_status, "by_type": by_type}
@router.get("/{sr_id}", response_model=SROut)
async def get_task(sr_id: str, db: AsyncSession = Depends(get_db)):
result = await db.execute(select(SRRequest).where(SRRequest.sr_id == sr_id))
sr = result.scalars().first()
if not sr:
raise HTTPException(404, detail="SR을 찾을 수 없습니다.")
return sr
@router.post("", response_model=SROut, status_code=201)
async def create_task(payload: SRCreate, db: AsyncSession = Depends(get_db)):
inst_id = None
if payload.inst_code:
r = await db.execute(
select(Institution).where(Institution.inst_code == payload.inst_code)
)
inst = r.scalars().first()
if inst:
inst_id = inst.id
sr = SRRequest(
sr_id=_new_sr_id(),
inst_id=inst_id,
sr_type=payload.sr_type,
title=payload.title,
description=payload.description,
status=SRStatus.RECEIVED,
priority=payload.priority,
requested_by=payload.requested_by,
assigned_to=payload.assigned_to,
target_server=payload.target_server,
)
db.add(sr)
await db.flush()
await _write_audit(db, sr.sr_id, payload.requested_by, "SR_CREATED", f"SR 생성: {payload.title}")
await db.commit()
await db.refresh(sr)
return sr
@router.patch("/{sr_id}/status", response_model=SROut)
async def update_status(sr_id: str, payload: SRStatusUpdate,
db: AsyncSession = Depends(get_db)):
result = await db.execute(select(SRRequest).where(SRRequest.sr_id == sr_id))
sr = result.scalars().first()
if not sr:
raise HTTPException(404, detail="SR을 찾을 수 없습니다.")
allowed = _TRANSITIONS.get(SRStatus(sr.status), [])
if payload.status not in allowed:
raise HTTPException(400, detail=f"'{sr.status}''{payload.status}' 전이는 허용되지 않습니다.")
old_status = sr.status
sr.status = payload.status
sr.updated_at = datetime.now()
detail = payload.comment or f"상태 변경: {old_status}{payload.status}"
await _write_audit(db, sr_id, payload.actor, "STATUS_CHANGED", detail)
await db.commit()
await db.refresh(sr)
return sr