""" 자율 워크플로우 엔진 — 조건 기반 자동 작업 흐름 기존 autonomous.py의 단순 자동 승인 큐를 넘어 규칙 기반 자동화 워크플로우를 정의하고 실행한다. 기능: - 워크플로우 규칙 정의 (트리거 + 조건 + 액션 시퀀스) - 트리거: SR_CREATED, ANOMALY_DETECTED, CRON, INCIDENT_CREATED - 액션: AUTO_ASSIGN, NOTIFY, HEALTH_CHECK, ESCALATE, SR_CREATE - 실행 이력 조회 - 최대 자동 실행 횟수 제한 (무한 루프 방지) 엔드포인트: GET /api/workflow/rules — 워크플로우 규칙 목록 POST /api/workflow/rules — 규칙 생성 PUT /api/workflow/rules/{id} — 규칙 수정 DELETE /api/workflow/rules/{id} — 규칙 삭제 POST /api/workflow/rules/{id}/run — 규칙 수동 실행 (테스트) GET /api/workflow/history — 실행 이력 POST /api/workflow/trigger — 이벤트 트리거 (내부 시스템용) """ from __future__ import annotations import json import logging from datetime import datetime from typing import Any, Dict, List, Optional from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException from pydantic import BaseModel, Field from sqlalchemy import select, desc from sqlalchemy.ext.asyncio import AsyncSession from core.auth import get_current_user, require_admin_role from database import get_db from models import ( User, SRRequest, SRStatus, AutoWorkflowRule, AutoWorkflowRun, # 신규 모델 ) logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/workflow", tags=["Autonomous Workflow"]) # 지원 트리거 유형 TRIGGER_TYPES = ["SR_CREATED", "ANOMALY_DETECTED", "CRON", "INCIDENT_CREATED", "SR_STATUS_CHANGED"] # 지원 액션 유형 ACTION_TYPES = ["AUTO_ASSIGN", "NOTIFY_MESSENGER", "HEALTH_CHECK", "ESCALATE", "SR_CREATE", "DELAY"] # ── Pydantic 스키마 ────────────────────────────────────────────────────────── class WorkflowAction(BaseModel): type: str = Field(..., description="AUTO_ASSIGN | NOTIFY_MESSENGER | HEALTH_CHECK | ESCALATE | SR_CREATE | DELAY") params: Dict[str, Any] = Field(default_factory=dict) class WorkflowRuleCreate(BaseModel): name: str = Field(..., max_length=200) description: Optional[str] = None trigger_type: str = Field(..., description="SR_CREATED | ANOMALY_DETECTED | CRON | ...") conditions: Dict[str, Any] = Field( default_factory=dict, description='예: {"priority": "HIGH", "category": "MONITORING"}' ) actions: List[WorkflowAction] = Field(..., min_length=1) approval_required: bool = False max_daily_runs: int = Field(100, ge=1, le=1000) is_active: bool = True cron_expr: Optional[str] = Field(None, description="CRON 트리거 시 cron 표현식") class WorkflowRuleOut(BaseModel): id: int name: str description: Optional[str] trigger_type: str conditions: dict actions: list approval_required: bool max_daily_runs: int is_active: bool run_count_today: int last_run_at: Optional[datetime] created_at: datetime class TriggerRequest(BaseModel): event: str payload: Dict[str, Any] = Field(default_factory=dict) # ── 조건 평가 ──────────────────────────────────────────────────────────────── def _evaluate_condition(condition: dict, payload: dict) -> bool: """간단한 조건 평가 (AND 조합).""" for key, expected in condition.items(): actual = payload.get(key) if isinstance(expected, list): if actual not in expected: return False elif actual != expected: return False return True # ── 액션 실행 ──────────────────────────────────────────────────────────────── async def _execute_action(action: WorkflowAction, payload: dict, db: AsyncSession) -> dict: """단일 액션 실행.""" params = action.params if action.type == "AUTO_ASSIGN": # SR 자동 배정 sr_id = payload.get("sr_id") assignee_id = params.get("assignee_id") if sr_id and assignee_id: sr_row = await db.execute(select(SRRequest).where(SRRequest.id == sr_id)) sr = sr_row.scalar_one_or_none() if sr: sr.assignee_id = assignee_id sr.status = SRStatus.IN_PROGRESS await db.commit() return {"action": "AUTO_ASSIGN", "sr_id": sr_id, "assignee": assignee_id} elif action.type == "NOTIFY_MESSENGER": # ITSM 메신저 알림 import httpx msg = params.get("message", "자동화 워크플로우 알림").format(**payload) room = params.get("room", "ops") try: async with httpx.AsyncClient(timeout=5) as client: await client.post( "http://localhost:9001/api/messenger/webhook", json={"event": "workflow_notify", "room": room, "message": msg}, ) except Exception as e: logger.warning(f"메신저 알림 실패: {e}") return {"action": "NOTIFY_MESSENGER", "room": room} elif action.type == "HEALTH_CHECK": # 대상 서버 헬스체크 트리거 server_id = payload.get("server_id") or params.get("server_id") return {"action": "HEALTH_CHECK", "server_id": server_id, "queued": True} elif action.type == "ESCALATE": # SR 에스컬레이션 sr_id = payload.get("sr_id") if sr_id: sr_row = await db.execute(select(SRRequest).where(SRRequest.id == sr_id)) sr = sr_row.scalar_one_or_none() if sr: sr.priority = "HIGH" await db.commit() return {"action": "ESCALATE", "sr_id": sr_id} elif action.type == "SR_CREATE": # SR 자동 생성 new_sr = SRRequest( title=params.get("title", "자동 생성 SR").format(**payload), description=params.get("description", "워크플로우에 의해 자동 생성"), category=params.get("category", "AUTO"), priority=params.get("priority", "MEDIUM"), status=SRStatus.OPEN, created_at=datetime.utcnow(), ) db.add(new_sr) await db.commit() await db.refresh(new_sr) return {"action": "SR_CREATE", "sr_id": new_sr.id} elif action.type == "DELAY": import asyncio seconds = params.get("seconds", 5) await asyncio.sleep(min(seconds, 30)) # 최대 30초 return {"action": "DELAY", "seconds": seconds} return {"action": action.type, "skipped": True} # ── 워크플로우 실행 ────────────────────────────────────────────────────────── async def _run_workflow(rule: AutoWorkflowRule, payload: dict, db: AsyncSession): """워크플로우 규칙 실행 (비동기 백그라운드).""" run = AutoWorkflowRun( rule_id=rule.id, trigger_payload=json.dumps(payload), status="RUNNING", started_at=datetime.utcnow(), ) db.add(run) await db.commit() results = [] try: actions = json.loads(rule.actions_json) if isinstance(rule.actions_json, str) else rule.actions_json for action_data in actions: action = WorkflowAction(**action_data) result = await _execute_action(action, payload, db) results.append(result) run.status = "SUCCESS" except Exception as e: run.status = "FAILED" run.error_message = str(e)[:500] logger.error(f"워크플로우 실행 실패 (rule={rule.id}): {e}") finally: run.finished_at = datetime.utcnow() run.result_json = json.dumps(results) rule.last_run_at = datetime.utcnow() await db.commit() # ── 엔드포인트 ─────────────────────────────────────────────────────────────── @router.get("/rules", response_model=List[WorkflowRuleOut]) async def list_rules( db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """워크플로우 규칙 목록.""" rows = await db.execute( select(AutoWorkflowRule).order_by(desc(AutoWorkflowRule.created_at)) ) rules = rows.scalars().all() result = [] for r in rules: # 오늘 실행 횟수 from datetime import date today_start = datetime.combine(date.today(), datetime.min.time()) run_today = await db.execute( select(func_count := __import__('sqlalchemy', fromlist=['func']).func.count(AutoWorkflowRun.id)).where( AutoWorkflowRun.rule_id == r.id, AutoWorkflowRun.started_at >= today_start, ) ) result.append(WorkflowRuleOut( id=r.id, name=r.name, description=r.description, trigger_type=r.trigger_type, conditions=json.loads(r.conditions_json) if r.conditions_json else {}, actions=json.loads(r.actions_json) if r.actions_json else [], approval_required=r.approval_required, max_daily_runs=r.max_daily_runs, is_active=r.is_active, run_count_today=run_today.scalar() or 0, last_run_at=r.last_run_at, created_at=r.created_at, )) return result @router.post("/rules") async def create_rule( req: WorkflowRuleCreate, db: AsyncSession = Depends(get_db), user: User = Depends(require_admin_role), ): """워크플로우 규칙 생성.""" if req.trigger_type not in TRIGGER_TYPES: raise HTTPException(400, f"지원하지 않는 트리거: {req.trigger_type}. 지원: {TRIGGER_TYPES}") rule = AutoWorkflowRule( name=req.name, description=req.description, trigger_type=req.trigger_type, conditions_json=json.dumps(req.conditions), actions_json=json.dumps([a.model_dump() for a in req.actions]), approval_required=req.approval_required, max_daily_runs=req.max_daily_runs, cron_expr=req.cron_expr, is_active=req.is_active, created_by=user.id, created_at=datetime.utcnow(), ) db.add(rule) await db.commit() await db.refresh(rule) return {"ok": True, "id": rule.id, "name": rule.name} @router.put("/rules/{rule_id}") async def update_rule( rule_id: int, req: WorkflowRuleCreate, db: AsyncSession = Depends(get_db), user: User = Depends(require_admin_role), ): """워크플로우 규칙 수정.""" row = await db.execute(select(AutoWorkflowRule).where(AutoWorkflowRule.id == rule_id)) rule = row.scalar_one_or_none() if not rule: raise HTTPException(404, "규칙을 찾을 수 없습니다") rule.name = req.name rule.description = req.description rule.trigger_type = req.trigger_type rule.conditions_json = json.dumps(req.conditions) rule.actions_json = json.dumps([a.model_dump() for a in req.actions]) rule.approval_required = req.approval_required rule.max_daily_runs = req.max_daily_runs rule.is_active = req.is_active await db.commit() return {"ok": True} @router.delete("/rules/{rule_id}") async def delete_rule( rule_id: int, db: AsyncSession = Depends(get_db), user: User = Depends(require_admin_role), ): """워크플로우 규칙 삭제.""" row = await db.execute(select(AutoWorkflowRule).where(AutoWorkflowRule.id == rule_id)) rule = row.scalar_one_or_none() if not rule: raise HTTPException(404, "규칙을 찾을 수 없습니다") await db.delete(rule) await db.commit() return {"ok": True} @router.post("/rules/{rule_id}/run") async def run_rule_manually( rule_id: int, payload: dict = {}, background_tasks: BackgroundTasks = None, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """규칙 수동 실행 (테스트용).""" row = await db.execute(select(AutoWorkflowRule).where(AutoWorkflowRule.id == rule_id)) rule = row.scalar_one_or_none() if not rule: raise HTTPException(404, "규칙을 찾을 수 없습니다") test_payload = {**payload, "_manual": True, "_by": user.email} if background_tasks: background_tasks.add_task(_run_workflow, rule, test_payload, db) else: await _run_workflow(rule, test_payload, db) return {"ok": True, "rule_id": rule_id, "queued": True} @router.post("/trigger") async def trigger_event( req: TriggerRequest, background_tasks: BackgroundTasks, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """이벤트 발생 → 매칭 규칙 자동 실행.""" rows = await db.execute( select(AutoWorkflowRule).where( AutoWorkflowRule.trigger_type == req.event, AutoWorkflowRule.is_active == True, ) ) rules = rows.scalars().all() triggered = [] for rule in rules: conditions = json.loads(rule.conditions_json) if rule.conditions_json else {} if _evaluate_condition(conditions, req.payload): background_tasks.add_task(_run_workflow, rule, req.payload, db) triggered.append(rule.id) return {"event": req.event, "triggered_rules": triggered, "count": len(triggered)} @router.get("/history") async def workflow_history( limit: int = 50, rule_id: Optional[int] = None, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """워크플로우 실행 이력.""" q = select(AutoWorkflowRun, AutoWorkflowRule.name.label("rule_name")).join( AutoWorkflowRule, AutoWorkflowRun.rule_id == AutoWorkflowRule.id ).order_by(desc(AutoWorkflowRun.started_at)).limit(limit) if rule_id: q = q.where(AutoWorkflowRun.rule_id == rule_id) rows = await db.execute(q) return [ { "id": r.AutoWorkflowRun.id, "rule_id": r.AutoWorkflowRun.rule_id, "rule_name": r.rule_name, "status": r.AutoWorkflowRun.status, "started_at": r.AutoWorkflowRun.started_at, "finished_at": r.AutoWorkflowRun.finished_at, "error": r.AutoWorkflowRun.error_message, } for r in rows.all() ]