zioinfo-mail/workspace/guardia-itsm/routers/approvals.py
DESKTOP-TKLFCPR\ython cfe2901a55 refactor(structure): consolidate all projects under workspace/
- itsm/    -> workspace/guardia-itsm/
- manager/ -> workspace/guardia-manager/
- app/     -> workspace/guardia-messenger/
- manual/  -> workspace/guardia-docs/

workspace/zioinfo-web/ unchanged.
git mv preserves full commit history.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-31 23:50:56 +09:00

248 lines
9.0 KiB
Python

"""PM Approval workflow endpoints."""
import hashlib
from datetime import datetime
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
from core.events import broadcast
from database import get_db
from models import (
ApprovalCreate, ApprovalFlow, ApprovalOut, ApprovalResult,
SRRequest, SRStatus, AuditLog, User, compute_log_hash
)
router = APIRouter(prefix="/api/approvals", tags=["approvals"])
async def _write_audit(db: AsyncSession, sr_id: str, actor: str, action: str, detail: str) -> None:
result = await db.execute(
select(AuditLog).where(AuditLog.sr_id == sr_id)
.order_by(AuditLog.id.desc()).limit(1)
)
last = result.scalars().first()
prev_hash = last.log_hash if last else None
ts = datetime.now().isoformat()
log_hash = compute_log_hash(prev_hash, actor, action, detail, ts)
db.add(AuditLog(
sr_id=sr_id, actor=actor, action=action, detail=detail,
prev_hash=prev_hash, log_hash=log_hash
))
@router.get("/{sr_id}", response_model=List[ApprovalOut])
async def list_approvals(sr_id: str, db: AsyncSession = Depends(get_db),
_u: User = Depends(get_current_user)):
result = await db.execute(
select(ApprovalFlow).where(ApprovalFlow.sr_id == sr_id)
.order_by(ApprovalFlow.created_at)
)
return result.scalars().all()
@router.post("/{sr_id}", response_model=ApprovalOut, status_code=201)
async def decide_approval(sr_id: str, payload: ApprovalCreate,
db: AsyncSession = Depends(get_db),
_u: User = Depends(get_current_user)):
r = await db.execute(select(SRRequest).where(SRRequest.sr_id == sr_id))
sr = r.scalars().first()
if not sr:
raise HTTPException(404, detail="SR을 찾을 수 없습니다.")
if sr.status != SRStatus.PENDING_APPROVAL:
raise HTTPException(400, detail="승인 대기 상태의 SR만 처리할 수 있습니다.")
apv = ApprovalFlow(
sr_id=sr_id,
approver=payload.approver,
result=payload.result,
comment=payload.comment,
decided_at=datetime.now(),
)
db.add(apv)
if payload.result == ApprovalResult.APPROVED:
sr.status = SRStatus.APPROVED
action, detail = "SR_APPROVED", f"{payload.approver} 승인"
else:
sr.status = SRStatus.REJECTED
action, detail = "SR_REJECTED", f"{payload.approver} 반려: {payload.comment or ''}"
sr.updated_at = datetime.now()
await _write_audit(db, sr_id, payload.approver, action, detail)
await db.commit()
await db.refresh(apv)
# 실시간 이벤트 브로드캐스트
await broadcast("approval_done", {
"sr_id": sr_id,
"title": sr.title,
"result": payload.result,
"approver": payload.approver,
"new_status": sr.status,
})
return apv
# ── G-11: 다중 승인 체계 ─────────────────────────────────────────────────────
class DelegateRequest(BaseModel):
delegate_to: int # User.id
until: str # ISO datetime
reason: Optional[str] = None
class SignatureRequest(BaseModel):
signature: str # 전자서명 해시 (비밀번호 해시 or 서명 데이터)
class ExtendDeadlineRequest(BaseModel):
new_deadline: str # ISO datetime
reason: Optional[str] = None
@router.post("/{approval_id}/delegate")
async def delegate_approval(
approval_id: int,
body: DelegateRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""결재 위임 — 현재 결재자만 가능."""
apv = await db.get(ApprovalFlow, approval_id)
if not apv:
raise HTTPException(404, "승인 레코드를 찾을 수 없습니다.")
if apv.approver != current_user.username:
raise HTTPException(403, "본인의 결재만 위임할 수 있습니다.")
if apv.result != ApprovalResult.PENDING:
raise HTTPException(400, "이미 처리된 결재는 위임할 수 없습니다.")
try:
delegate_until = datetime.fromisoformat(body.until)
except ValueError:
raise HTTPException(400, "until 필드는 ISO 날짜시간 형식이어야 합니다.")
# 위임 대상 사용자 확인
delegate_user = await db.get(User, body.delegate_to)
if not delegate_user:
raise HTTPException(404, f"위임 대상 사용자 ID {body.delegate_to}를 찾을 수 없습니다.")
apv.delegate_to = body.delegate_to
apv.delegate_until = delegate_until
await _write_audit(
db, apv.sr_id, current_user.username, "APPROVAL_DELEGATED",
f"결재 위임 → {delegate_user.username} (until {body.until}) | 사유: {body.reason or ''}"
)
await db.commit()
return {
"approval_id": approval_id,
"delegated_to": delegate_user.username,
"delegate_until": body.until,
"message": f"{delegate_user.username}에게 결재가 위임되었습니다.",
}
@router.post("/{approval_id}/sign")
async def sign_approval(
approval_id: int,
body: SignatureRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""전자서명 등록 — 승인 결재자 또는 위임받은 사용자만 가능."""
apv = await db.get(ApprovalFlow, approval_id)
if not apv:
raise HTTPException(404, "승인 레코드를 찾을 수 없습니다.")
if apv.result != ApprovalResult.PENDING:
raise HTTPException(400, "대기 중인 결재에만 서명할 수 있습니다.")
# 서명자 검증
is_delegate = (
apv.delegate_to == current_user.id
and apv.delegate_until
and apv.delegate_until >= datetime.now()
)
if apv.approver != current_user.username and not is_delegate:
raise HTTPException(403, "결재 권한이 없습니다.")
# 서명 해시 저장 (평문 서명 데이터는 저장 금지)
sig_hash = hashlib.sha256(body.signature.encode()).hexdigest()
apv.signature = sig_hash
await _write_audit(
db, apv.sr_id, current_user.username, "APPROVAL_SIGNED",
f"전자서명 등록 (hash={sig_hash[:8]}...)"
)
await db.commit()
return {"approval_id": approval_id, "signed": True, "signer": current_user.username}
@router.get("/pending/overdue")
async def overdue_approvals(
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""기한 초과 대기 승인 목록 — PM/ADMIN만 접근 가능."""
from models import UserRole
if current_user.role not in (UserRole.ADMIN, UserRole.PM):
raise HTTPException(403, "PM 또는 ADMIN만 접근할 수 있습니다.")
now = datetime.now()
rows = (await db.execute(
select(ApprovalFlow).where(
ApprovalFlow.result == ApprovalResult.PENDING,
ApprovalFlow.deadline_at.isnot(None),
ApprovalFlow.deadline_at < now,
).order_by(ApprovalFlow.deadline_at)
)).scalars().all()
return [
{
"id": r.id,
"sr_id": r.sr_id,
"approver": r.approver,
"deadline_at": r.deadline_at.isoformat(),
"overdue_hours": round((now - r.deadline_at).total_seconds() / 3600, 1),
"delegate_to": r.delegate_to,
}
for r in rows
]
@router.post("/{approval_id}/extend-deadline")
async def extend_deadline(
approval_id: int,
body: ExtendDeadlineRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""승인 마감 시간 연장 (PM/ADMIN 전용)."""
from models import UserRole
if current_user.role not in (UserRole.ADMIN, UserRole.PM):
raise HTTPException(403, "PM 또는 ADMIN만 마감을 연장할 수 있습니다.")
apv = await db.get(ApprovalFlow, approval_id)
if not apv:
raise HTTPException(404, "승인 레코드를 찾을 수 없습니다.")
if apv.result != ApprovalResult.PENDING:
raise HTTPException(400, "대기 중인 결재만 마감 연장 가능합니다.")
try:
new_deadline = datetime.fromisoformat(body.new_deadline)
except ValueError:
raise HTTPException(400, "new_deadline 필드는 ISO 날짜시간 형식이어야 합니다.")
old_deadline = apv.deadline_at
apv.deadline_at = new_deadline
await _write_audit(
db, apv.sr_id, current_user.username, "APPROVAL_DEADLINE_EXTENDED",
f"마감 연장: {old_deadline}{new_deadline} | 사유: {body.reason or ''}"
)
await db.commit()
return {
"approval_id": approval_id,
"new_deadline": body.new_deadline,
"message": "승인 마감이 연장되었습니다.",
}