guardia-itsm/routers/approvals.py
DESKTOP-TKLFCPRython 64c27c3509 feat(itsm): G-1~G-12 확장 기능 + 하네스/봇/설치스크립트 구현
G-1: 메신저 Webhook Relay + _send_to_room 실제 httpx 호출 구현
G-2: POST /api/tasks/bulk SR 대량작업 엔드포인트 (최대 100건)
G-3: 라이선스 만료 알림 스케줄러 (매일 09:00 KST)
G-4: 체험판 upgrade_banner 필드 + license.py 배너 로직
G-5: core/auto_rca.py + incidents/problem auto-rca 엔드포인트
G-6: core/deploy_impact.py + vibe impact-analysis 엔드포인트
G-7: core/ticket_classifier.py + SR 생성 시 AI 분류 + ai-suggestion API
G-8: VulnPatchRecord 모델 + vuln_scan 패치추적 4개 엔드포인트
G-9: core/jira_sync.py + gateway Jira/Confluence 연동 엔드포인트
G-10: core/push_notify.py + routers/push.py + PushSubscription 모델
G-11: approvals 다중승인 (위임/서명/기한초과/마감연장)
G-12: alembic.ini + migrations/ + cicd/migrate_to_postgres.sh

하네스: guardia-orchestrator 확장기능 Phase 반영
봇명령어: /sr /status /license /bulk 슬래시 명령어 추가
설치스크립트: setup/ (Ubuntu, CentOS, RHEL, Windows) --test 옵션 포함

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-29 18:18:52 +09:00

248 lines
9.0 KiB
Python

"""PM Approval workflow endpoints."""
import hashlib
from datetime import datetime
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
from core.events import broadcast
from database import get_db
from models import (
ApprovalCreate, ApprovalFlow, ApprovalOut, ApprovalResult,
SRRequest, SRStatus, AuditLog, User, compute_log_hash
)
router = APIRouter(prefix="/api/approvals", tags=["approvals"])
async def _write_audit(db: AsyncSession, sr_id: str, actor: str, action: str, detail: str) -> None:
result = await db.execute(
select(AuditLog).where(AuditLog.sr_id == sr_id)
.order_by(AuditLog.id.desc()).limit(1)
)
last = result.scalars().first()
prev_hash = last.log_hash if last else None
ts = datetime.now().isoformat()
log_hash = compute_log_hash(prev_hash, actor, action, detail, ts)
db.add(AuditLog(
sr_id=sr_id, actor=actor, action=action, detail=detail,
prev_hash=prev_hash, log_hash=log_hash
))
@router.get("/{sr_id}", response_model=List[ApprovalOut])
async def list_approvals(sr_id: str, db: AsyncSession = Depends(get_db),
_u: User = Depends(get_current_user)):
result = await db.execute(
select(ApprovalFlow).where(ApprovalFlow.sr_id == sr_id)
.order_by(ApprovalFlow.created_at)
)
return result.scalars().all()
@router.post("/{sr_id}", response_model=ApprovalOut, status_code=201)
async def decide_approval(sr_id: str, payload: ApprovalCreate,
db: AsyncSession = Depends(get_db),
_u: User = Depends(get_current_user)):
r = await db.execute(select(SRRequest).where(SRRequest.sr_id == sr_id))
sr = r.scalars().first()
if not sr:
raise HTTPException(404, detail="SR을 찾을 수 없습니다.")
if sr.status != SRStatus.PENDING_APPROVAL:
raise HTTPException(400, detail="승인 대기 상태의 SR만 처리할 수 있습니다.")
apv = ApprovalFlow(
sr_id=sr_id,
approver=payload.approver,
result=payload.result,
comment=payload.comment,
decided_at=datetime.now(),
)
db.add(apv)
if payload.result == ApprovalResult.APPROVED:
sr.status = SRStatus.APPROVED
action, detail = "SR_APPROVED", f"{payload.approver} 승인"
else:
sr.status = SRStatus.REJECTED
action, detail = "SR_REJECTED", f"{payload.approver} 반려: {payload.comment or ''}"
sr.updated_at = datetime.now()
await _write_audit(db, sr_id, payload.approver, action, detail)
await db.commit()
await db.refresh(apv)
# 실시간 이벤트 브로드캐스트
await broadcast("approval_done", {
"sr_id": sr_id,
"title": sr.title,
"result": payload.result,
"approver": payload.approver,
"new_status": sr.status,
})
return apv
# ── G-11: 다중 승인 체계 ─────────────────────────────────────────────────────
class DelegateRequest(BaseModel):
delegate_to: int # User.id
until: str # ISO datetime
reason: Optional[str] = None
class SignatureRequest(BaseModel):
signature: str # 전자서명 해시 (비밀번호 해시 or 서명 데이터)
class ExtendDeadlineRequest(BaseModel):
new_deadline: str # ISO datetime
reason: Optional[str] = None
@router.post("/{approval_id}/delegate")
async def delegate_approval(
approval_id: int,
body: DelegateRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""결재 위임 — 현재 결재자만 가능."""
apv = await db.get(ApprovalFlow, approval_id)
if not apv:
raise HTTPException(404, "승인 레코드를 찾을 수 없습니다.")
if apv.approver != current_user.username:
raise HTTPException(403, "본인의 결재만 위임할 수 있습니다.")
if apv.result != ApprovalResult.PENDING:
raise HTTPException(400, "이미 처리된 결재는 위임할 수 없습니다.")
try:
delegate_until = datetime.fromisoformat(body.until)
except ValueError:
raise HTTPException(400, "until 필드는 ISO 날짜시간 형식이어야 합니다.")
# 위임 대상 사용자 확인
delegate_user = await db.get(User, body.delegate_to)
if not delegate_user:
raise HTTPException(404, f"위임 대상 사용자 ID {body.delegate_to}를 찾을 수 없습니다.")
apv.delegate_to = body.delegate_to
apv.delegate_until = delegate_until
await _write_audit(
db, apv.sr_id, current_user.username, "APPROVAL_DELEGATED",
f"결재 위임 → {delegate_user.username} (until {body.until}) | 사유: {body.reason or ''}"
)
await db.commit()
return {
"approval_id": approval_id,
"delegated_to": delegate_user.username,
"delegate_until": body.until,
"message": f"{delegate_user.username}에게 결재가 위임되었습니다.",
}
@router.post("/{approval_id}/sign")
async def sign_approval(
approval_id: int,
body: SignatureRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""전자서명 등록 — 승인 결재자 또는 위임받은 사용자만 가능."""
apv = await db.get(ApprovalFlow, approval_id)
if not apv:
raise HTTPException(404, "승인 레코드를 찾을 수 없습니다.")
if apv.result != ApprovalResult.PENDING:
raise HTTPException(400, "대기 중인 결재에만 서명할 수 있습니다.")
# 서명자 검증
is_delegate = (
apv.delegate_to == current_user.id
and apv.delegate_until
and apv.delegate_until >= datetime.now()
)
if apv.approver != current_user.username and not is_delegate:
raise HTTPException(403, "결재 권한이 없습니다.")
# 서명 해시 저장 (평문 서명 데이터는 저장 금지)
sig_hash = hashlib.sha256(body.signature.encode()).hexdigest()
apv.signature = sig_hash
await _write_audit(
db, apv.sr_id, current_user.username, "APPROVAL_SIGNED",
f"전자서명 등록 (hash={sig_hash[:8]}...)"
)
await db.commit()
return {"approval_id": approval_id, "signed": True, "signer": current_user.username}
@router.get("/pending/overdue")
async def overdue_approvals(
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""기한 초과 대기 승인 목록 — PM/ADMIN만 접근 가능."""
from models import UserRole
if current_user.role not in (UserRole.ADMIN, UserRole.PM):
raise HTTPException(403, "PM 또는 ADMIN만 접근할 수 있습니다.")
now = datetime.now()
rows = (await db.execute(
select(ApprovalFlow).where(
ApprovalFlow.result == ApprovalResult.PENDING,
ApprovalFlow.deadline_at.isnot(None),
ApprovalFlow.deadline_at < now,
).order_by(ApprovalFlow.deadline_at)
)).scalars().all()
return [
{
"id": r.id,
"sr_id": r.sr_id,
"approver": r.approver,
"deadline_at": r.deadline_at.isoformat(),
"overdue_hours": round((now - r.deadline_at).total_seconds() / 3600, 1),
"delegate_to": r.delegate_to,
}
for r in rows
]
@router.post("/{approval_id}/extend-deadline")
async def extend_deadline(
approval_id: int,
body: ExtendDeadlineRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""승인 마감 시간 연장 (PM/ADMIN 전용)."""
from models import UserRole
if current_user.role not in (UserRole.ADMIN, UserRole.PM):
raise HTTPException(403, "PM 또는 ADMIN만 마감을 연장할 수 있습니다.")
apv = await db.get(ApprovalFlow, approval_id)
if not apv:
raise HTTPException(404, "승인 레코드를 찾을 수 없습니다.")
if apv.result != ApprovalResult.PENDING:
raise HTTPException(400, "대기 중인 결재만 마감 연장 가능합니다.")
try:
new_deadline = datetime.fromisoformat(body.new_deadline)
except ValueError:
raise HTTPException(400, "new_deadline 필드는 ISO 날짜시간 형식이어야 합니다.")
old_deadline = apv.deadline_at
apv.deadline_at = new_deadline
await _write_audit(
db, apv.sr_id, current_user.username, "APPROVAL_DEADLINE_EXTENDED",
f"마감 연장: {old_deadline}{new_deadline} | 사유: {body.reason or ''}"
)
await db.commit()
return {
"approval_id": approval_id,
"new_deadline": body.new_deadline,
"message": "승인 마감이 연장되었습니다.",
}