""" B-5: 멀티 에이전트 협업 오케스트레이션 API 라우터 엔드포인트: POST /api/orchestrator/workflows — 워크플로우 생성 및 실행 GET /api/orchestrator/workflows — 워크플로우 목록 GET /api/orchestrator/workflows/{id} — 워크플로우 상세 (단계 포함) POST /api/orchestrator/workflows/{id}/retry — 실패한 워크플로우 재시도 DELETE /api/orchestrator/workflows/{id} — 워크플로우 취소 GET /api/orchestrator/templates — 사용 가능한 워크플로우 템플릿 GET /api/orchestrator/stats — 오케스트레이터 통계 """ from __future__ import annotations import json import logging from datetime import datetime, timedelta from typing import List, Optional from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query from sqlalchemy import desc, func, select from sqlalchemy.ext.asyncio import AsyncSession from database import get_db, SessionLocal from models import ( WorkflowInstance, WorkflowInstanceOut, WorkflowStep, WorkflowStepOut, WorkflowCreateRequest, ) from core.orchestrator import ( create_workflow_instance, execute_workflow, WORKFLOW_TEMPLATES, ) logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/orchestrator", tags=["orchestrator"]) # ── 워크플로우 생성 및 실행 ────────────────────────────────────────────────── @router.post("/workflows", response_model=WorkflowInstanceOut, status_code=202) async def create_workflow( body: WorkflowCreateRequest, background_tasks: BackgroundTasks, db: AsyncSession = Depends(get_db), ): """ 워크플로우 생성 및 비동기 실행. - 인스턴스 + 단계 레코드 즉시 생성 → 202 반환 - 백그라운드에서 단계 순차 실행 """ # 커스텀 워크플로우 외에는 템플릿 필수 if body.workflow_type != "CUSTOM" and body.workflow_type not in WORKFLOW_TEMPLATES: raise HTTPException( 400, f"알 수 없는 워크플로우 유형: {body.workflow_type}. " f"사용 가능: {list(WORKFLOW_TEMPLATES.keys())}" ) instance = await create_workflow_instance( db = db, workflow_type= body.workflow_type, title = body.title, sr_id = body.sr_id, project_id = body.project_id, context = body.context, triggered_by = "api", ) steps = WORKFLOW_TEMPLATES.get(body.workflow_type, []) ctx = json.loads(instance.context_json) if instance.context_json else {} # 백그라운드 실행 background_tasks.add_task( execute_workflow, instance.id, steps, ctx, SessionLocal, ) return instance @router.get("/workflows", response_model=List[WorkflowInstanceOut]) async def list_workflows( status: Optional[str] = Query(None), workflow_type: Optional[str] = Query(None), hours: int = Query(72, ge=1, le=720), limit: int = Query(20, ge=1, le=100), offset: int = Query(0, ge=0), db: AsyncSession = Depends(get_db), ): """워크플로우 인스턴스 목록.""" since = datetime.utcnow() - timedelta(hours=hours) conditions = [WorkflowInstance.created_at >= since] if status: conditions.append(WorkflowInstance.status == status.upper()) if workflow_type: conditions.append(WorkflowInstance.workflow_type == workflow_type.upper()) from sqlalchemy import and_ q = ( select(WorkflowInstance) .where(and_(*conditions)) .order_by(desc(WorkflowInstance.created_at)) .limit(limit) .offset(offset) ) rows = (await db.execute(q)).scalars().all() return rows @router.get("/workflows/{instance_id}", response_model=WorkflowInstanceOut) async def get_workflow( instance_id: int, db: AsyncSession = Depends(get_db), ): """워크플로우 상세 (단계 목록 포함).""" from sqlalchemy.orm import selectinload q = ( select(WorkflowInstance) .options(selectinload(WorkflowInstance.steps)) .where(WorkflowInstance.id == instance_id) ) inst = (await db.execute(q)).scalars().first() if not inst: raise HTTPException(404, f"워크플로우 {instance_id}를 찾을 수 없습니다.") return inst @router.post("/workflows/{instance_id}/retry", status_code=202) async def retry_workflow( instance_id: int, background_tasks: BackgroundTasks, db: AsyncSession = Depends(get_db), ): """실패한 워크플로우 재시도.""" inst = (await db.execute( select(WorkflowInstance).where(WorkflowInstance.id == instance_id) )).scalars().first() if not inst: raise HTTPException(404, f"워크플로우 {instance_id}를 찾을 수 없습니다.") if inst.status not in ("FAILED", "CANCELLED"): raise HTTPException(400, f"FAILED/CANCELLED 상태가 아닙니다: {inst.status}") # 상태 초기화 inst.status = "PENDING" inst.error_msg = None inst.progress_pct = 0 inst.current_step = 0 inst.started_at = None inst.completed_at = None # 모든 단계 초기화 steps_rows = (await db.execute( select(WorkflowStep).where(WorkflowStep.instance_id == instance_id) )).scalars().all() for s in steps_rows: s.status = "PENDING" s.output_json = None s.error_msg = None s.started_at = None s.completed_at = None await db.commit() # 재실행 steps = WORKFLOW_TEMPLATES.get(inst.workflow_type, []) ctx = json.loads(inst.context_json) if inst.context_json else {} background_tasks.add_task(execute_workflow, instance_id, steps, ctx, SessionLocal) return {"instance_id": instance_id, "status": "retry_queued"} @router.delete("/workflows/{instance_id}", status_code=204) async def cancel_workflow( instance_id: int, db: AsyncSession = Depends(get_db), ): """워크플로우 취소 (실행 중인 것은 취소 표시만).""" inst = (await db.execute( select(WorkflowInstance).where(WorkflowInstance.id == instance_id) )).scalars().first() if not inst: raise HTTPException(404, f"워크플로우 {instance_id}를 찾을 수 없습니다.") if inst.status == "COMPLETED": raise HTTPException(400, "이미 완료된 워크플로우입니다.") inst.status = "CANCELLED" inst.completed_at = datetime.utcnow() await db.commit() # ── 워크플로우 템플릿 ───────────────────────────────────────────────────────── @router.get("/templates") async def list_templates(): """사용 가능한 워크플로우 템플릿 목록.""" templates = [] for wf_type, steps in WORKFLOW_TEMPLATES.items(): templates.append({ "workflow_type": wf_type, "step_count": len(steps), "agents": list({s["agent_name"] for s in steps}), "steps": [ { "order": s["order"], "agent": s["agent_name"], "action": s["action"], "description": s.get("description", ""), "requires_llm": s.get("requires_llm", False), } for s in steps ], }) return {"templates": templates} # ── 통계 ───────────────────────────────────────────────────────────────────── @router.get("/stats") async def get_orchestrator_stats( db: AsyncSession = Depends(get_db), ): """오케스트레이터 통계.""" total = (await db.execute( select(func.count()).select_from(WorkflowInstance) )).scalar() or 0 by_status = {} rows = (await db.execute( select(WorkflowInstance.status, func.count()) .group_by(WorkflowInstance.status) )).all() for status, cnt in rows: by_status[status] = cnt by_type = {} rows2 = (await db.execute( select(WorkflowInstance.workflow_type, func.count()) .group_by(WorkflowInstance.workflow_type) )).all() for wf_type, cnt in rows2: by_type[wf_type] = cnt # 평균 소요 시간 (완료된 것) from sqlalchemy import extract avg_rows = (await db.execute( select(WorkflowInstance) .where(WorkflowInstance.status == "COMPLETED") .order_by(desc(WorkflowInstance.completed_at)) .limit(100) )).scalars().all() avg_duration = 0.0 if avg_rows: durations = [ (r.completed_at - r.started_at).total_seconds() for r in avg_rows if r.started_at and r.completed_at ] avg_duration = round(sum(durations) / len(durations), 1) if durations else 0.0 return { "total_workflows": total, "by_status": by_status, "by_type": by_type, "avg_duration_sec": avg_duration, "available_templates": list(WORKFLOW_TEMPLATES.keys()), }