"""
RPA (Robotic Process Automation) router.

  - Validation learning: models.py AST + routers/ scan
  - Rule persistence: rpa_rules.json
  - RPA task create / update / delete / execute + cron scheduler integration
  - Execution history lookup
"""
from __future__ import annotations

import json
import os
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession

from core.auth import get_current_user
from database import get_db
from models import User
from core.rpa_engine import (
    ValidationLearner, RPAValidator, RPAExecutor,
    load_rules, save_rules, TASK_ENDPOINT_MAP,
)

router = APIRouter(prefix="/api/rpa", tags=["rpa"])

# ── In-memory stores (rules are restored from rpa_rules.json on restart) ──────
_validation_rules: Dict[str, List[Dict]] = {}   # endpoint → rules (runtime)
_rpa_tasks: Dict[int, Dict] = {}
_rpa_executions: List[Dict] = []
_task_id_seq = 1

ITSM_BASE = os.getenv("ITSM_BASE_URL", "http://127.0.0.1:9001")


def _init_rules_from_file() -> None:
    """Restore validation rules via ``load_rules()`` at service start-up.

    Merges whatever ``load_rules()`` returns into ``_validation_rules`` and
    prints a one-line summary. Does nothing when ``load_rules()`` returns a
    falsy value.
    """
    # The dict is only mutated in place, never rebound, so no ``global`` needed.
    loaded = load_rules()
    if loaded:
        _validation_rules.update(loaded)
        total = sum(len(v) for v in loaded.values())
        print(f"[RPA] 저장된 validation 규칙 복구: {len(loaded)}개 엔드포인트, {total}개 규칙")


def auto_learn() -> Dict:
    """Learn validation rules from source and replace the in-memory rule set.

    Runs ``ValidationLearner.learn_from_source()``, clears
    ``_validation_rules`` and re-fills it with the learned rules, keeping only
    the first rule seen for each ``(endpoint, field_name)`` pair.

    Returns:
        The raw result dict produced by the learner.

    Raises:
        Whatever ``learn_from_source()`` raises, and ``KeyError`` when the
        result has no ``"rules"`` entry; in both cases the existing rules are
        left untouched because clearing happens afterwards.
    """
    # NOTE(review): ``save_rules`` is imported but not called in the visible
    # code, so learned rules do not appear to be written back to
    # rpa_rules.json here — confirm whether persistence is expected.
    learner = ValidationLearner()
    result = learner.learn_from_source()
    rules = result["rules"]

    _validation_rules.clear()
    # Track field names per endpoint so each duplicate check is O(1) instead
    # of rescanning the endpoint's rule list for every incoming rule.
    seen_fields: Dict[str, set] = {}
    for rule in rules:
        ep = rule["endpoint"]
        bucket = _validation_rules.setdefault(ep, [])
        names = seen_fields.setdefault(ep, set())
        field_name = rule["field_name"]
        if field_name not in names:
            names.add(field_name)
            bucket.append(rule)
    return result


# ── Initialisation: restore rules from file; learn immediately if none ───────
_init_rules_from_file()
if not _validation_rules:
    try:
        auto_learn()
        print("[RPA] 초기 Validation 자동 학습 완료")
    except Exception as e:
        # Best-effort at import time: the router must still load so that the
        # learn endpoint can be called manually afterwards.
        print(f"[RPA] 초기 학습 실패 (수동으로 POST /api/rpa/validations/learn 호출): {e}")


# ── Schemas ──────────────────────────────────────────────────────────────────
class LearnRequest(BaseModel):
    """Request body for ``POST /api/rpa/validations/learn``."""

    # Not read by the learn handler in this module; kept for API compatibility.
    endpoints: str = "all"
    # When true, previously learned rules are discarded before merging.
    overwrite: bool = True


class RPATaskCreate(BaseModel):
    """Request body used both to create and to fully update an RPA task."""

    task_name: str
    task_type: str
    schedule: Optional[str] = None
    payload_template: Dict[str, Any] = {}
    is_active: bool = True
    description: Optional[str] = None


class RPATaskOut(BaseModel):
    """Response model for a stored RPA task."""

    id: int
    task_name: str
    task_type: str
    schedule: Optional[str]
    payload_template: Dict[str, Any]
    is_active: bool
    description: Optional[str]
    created_at: str
    last_run: Optional[str]


class ExecuteRequest(BaseModel):
    """Request body for a one-off RPA execution."""

    task_type: str
    payload: Dict[str, Any]
    dry_run: bool = False


# ── Validation learning ──────────────────────────────────────────────────────

@router.post("/validations/learn")
async def learn_validations(
    req: LearnRequest,
    current_user: User = Depends(get_current_user),
):
    """Learn validation rules by analysing models.py + routers/ sources.

    With ``req.overwrite`` the existing rules are cleared first; otherwise new
    rules are merged in and any ``(endpoint, field_name)`` pair that already
    exists is kept as-is.

    Returns:
        A summary dict: number of newly added rules, the learner's schemas,
        endpoint/rule totals and per-endpoint counts for the first 10 endpoints.

    Raises:
        HTTPException(500): when the learner fails.
    """
    learner = ValidationLearner()
    try:
        result = learner.learn_from_source()
    except Exception as e:
        raise HTTPException(500, f"소스 파싱 실패: {e}") from e

    rules = result["rules"]
    if req.overwrite:
        _validation_rules.clear()

    # Seed the per-endpoint "already known" sets once instead of rebuilding a
    # set for every incoming rule.
    seen_fields: Dict[str, set] = {
        ep: {x["field_name"] for x in existing}
        for ep, existing in _validation_rules.items()
    }
    learned = 0
    for rule in rules:
        ep = rule["endpoint"]
        bucket = _validation_rules.setdefault(ep, [])
        names = seen_fields.setdefault(ep, set())
        field_name = rule["field_name"]
        if field_name not in names:
            names.add(field_name)
            bucket.append(rule)
            learned += 1

    return {
        "learned": learned,
        "schemas": result["schemas"],
        "endpoints_mapped": len(_validation_rules),
        "total_rules": sum(len(v) for v in _validation_rules.values()),
        "summary": {ep: len(rs) for ep, rs in list(_validation_rules.items())[:10]},
    }


@router.get("/validations")
async def get_validations(
    endpoint: Optional[str] = Query(None),
    schema: Optional[str] = Query(None),
    current_user: User = Depends(get_current_user),
):
    """Return learned validation rules.

    With ``endpoint`` the rules of that endpoint are returned, optionally
    filtered by ``schema`` (matched against each rule's ``schema_class``).
    Without ``endpoint`` only totals and the endpoint list are returned; the
    ``schema`` filter is not applied in that case.
    """
    if endpoint:
        rules = _validation_rules.get(endpoint, [])
        if schema:
            rules = [r for r in rules if r.get("schema_class") == schema]
        return {"endpoint": endpoint, "rule_count": len(rules), "rules": rules}
    return {
        "total_endpoints": len(_validation_rules),
        "total_rules": sum(len(v) for v in _validation_rules.values()),
        "endpoints": list(_validation_rules.keys()),
    }


@router.get("/validations/schemas")
async def list_schemas(current_user: User = Depends(get_current_user)):
    """Return the learned schema classes with the number of rules for each."""
    schema_map: Dict[str, int] = {}
    for rules in _validation_rules.values():
        for r in rules:
            sc = r.get("schema_class", "")
            schema_map[sc] = schema_map.get(sc, 0) + 1
    return {"schemas": schema_map}


# ── RPA task management ──────────────────────────────────────────────────────

@router.post("/tasks", response_model=RPATaskOut)
async def create_rpa_task(
    body: RPATaskCreate,
    current_user: User = Depends(get_current_user),
):
    """Create an RPA task and, if it is active with a schedule, register its cron job.

    Raises:
        HTTPException(400): when ``task_type`` is not in ``TASK_ENDPOINT_MAP``.
    """
    global _task_id_seq
    if body.task_type not in TASK_ENDPOINT_MAP:
        raise HTTPException(400, f"지원하지 않는 task_type. 허용값: {list(TASK_ENDPOINT_MAP.keys())}")

    task = {
        "id": _task_id_seq,
        "task_name": body.task_name,
        "task_type": body.task_type,
        "schedule": body.schedule,
        "payload_template": body.payload_template,
        "is_active": body.is_active,
        "description": body.description,
        "created_at": datetime.now().isoformat(),
        "last_run": None,
        "created_by": current_user.username,
    }
    _rpa_tasks[_task_id_seq] = task

    # Register the cron job with APScheduler.
    if body.schedule and body.is_active:
        _register_cron(task)

    _task_id_seq += 1
    return task


@router.get("/tasks", response_model=List[RPATaskOut])
async def list_rpa_tasks(
    is_active: Optional[bool] = Query(None),
    task_type: Optional[str] = Query(None),
    current_user: User = Depends(get_current_user),
):
    """List RPA tasks, optionally filtered by active flag and/or task type."""
    tasks = list(_rpa_tasks.values())
    if is_active is not None:
        tasks = [t for t in tasks if t["is_active"] == is_active]
    if task_type:
        tasks = [t for t in tasks if t["task_type"] == task_type]
    return tasks


@router.get("/tasks/{task_id}", response_model=RPATaskOut)
async def get_rpa_task(task_id: int, current_user: User = Depends(get_current_user)):
    """Return one RPA task, or 404 when the id is unknown."""
    task = _rpa_tasks.get(task_id)
    if not task:
        raise HTTPException(404, "RPA 작업을 찾을 수 없습니다.")
    return task


@router.put("/tasks/{task_id}", response_model=RPATaskOut)
async def update_rpa_task(
    task_id: int,
    body: RPATaskCreate,
    current_user: User = Depends(get_current_user),
):
    """Replace the editable fields of an RPA task and re-sync its cron job.

    Raises:
        HTTPException(404): when the task does not exist.
        HTTPException(400): when ``task_type`` is not in ``TASK_ENDPOINT_MAP``
            (same rule as task creation).
    """
    task = _rpa_tasks.get(task_id)
    if not task:
        raise HTTPException(404, "RPA 작업을 찾을 수 없습니다.")
    # Validate before touching the scheduler so that a rejected update leaves
    # the existing task and its cron job exactly as they were.
    if body.task_type not in TASK_ENDPOINT_MAP:
        raise HTTPException(400, f"지원하지 않는 task_type. 허용값: {list(TASK_ENDPOINT_MAP.keys())}")

    # Drop the existing cron job; it is re-registered below if still wanted.
    _unregister_cron(task_id)

    task.update({
        "task_name": body.task_name,
        "task_type": body.task_type,
        "schedule": body.schedule,
        "payload_template": body.payload_template,
        "is_active": body.is_active,
        "description": body.description,
    })

    if body.schedule and body.is_active:
        _register_cron(task)

    return task


@router.patch("/tasks/{task_id}/toggle")
async def toggle_rpa_task(task_id: int, current_user: User = Depends(get_current_user)):
    """Flip a task's active flag and register/unregister its cron job to match."""
    task = _rpa_tasks.get(task_id)
    if not task:
        raise HTTPException(404, "RPA 작업을 찾을 수 없습니다.")
    task["is_active"] = not task["is_active"]
    if task["is_active"] and task.get("schedule"):
        _register_cron(task)
    else:
        _unregister_cron(task_id)
    return {"id": task_id, "is_active": task["is_active"]}


@router.delete("/tasks/{task_id}")
async def delete_rpa_task(task_id: int, current_user: User = Depends(get_current_user)):
    """Delete a task and its cron job; 404 when the id is unknown."""
    if task_id not in _rpa_tasks:
        raise HTTPException(404, "RPA 작업을 찾을 수 없습니다.")
    _unregister_cron(task_id)
    del _rpa_tasks[task_id]
    return {"deleted": task_id}


# ── RPA execution ────────────────────────────────────────────────────────────

# Last execution id handed out by ``_next_execution_id``.
_execution_id_seq = 0


def _next_execution_id() -> int:
    """Hand out the next execution id.

    Ids are allocated from a counter rather than ``len(_rpa_executions) + 1``
    because a record is appended only after the (awaited) execution finishes;
    two overlapping executions would otherwise receive the same id. The
    ``max`` keeps the numbering identical to the length-based scheme when runs
    do not overlap.
    """
    # NOTE(review): not lock-protected; if scheduler jobs run on other threads
    # a lock may be warranted — confirm against the scheduler configuration.
    global _execution_id_seq
    _execution_id_seq = max(_execution_id_seq, len(_rpa_executions)) + 1
    return _execution_id_seq


def _rules_for_task_type(task_type: str) -> List[Dict]:
    """Find the learned validation rules for a task type.

    Keys are tried in order, returning the first non-empty rule list:

    1. the exact ``"METHOD /path"`` key from ``TASK_ENDPOINT_MAP``;
    2. the key with ``{placeholder}`` path segments removed
       (``PATCH /api/tasks/{sr_id}/status`` → ``PATCH /api/tasks/status``);
    3. the key cut at the first placeholder
       (``PATCH /api/tasks/{sr_id}/status`` → ``PATCH /api/tasks``).

    Returns an empty list for an unknown task type or when nothing matches.
    """
    method_path = TASK_ENDPOINT_MAP.get(task_type)
    if not method_path:
        return []
    method, path = method_path[0], method_path[1]
    exact_key = f"{method} {path}"
    without_placeholders = "/".join(
        seg for seg in path.split("/")
        if not (seg.startswith("{") and seg.endswith("}"))
    )
    candidates = (
        exact_key,
        f"{method} {without_placeholders}",
        exact_key.split("{")[0].rstrip("/"),
    )
    for key in candidates:
        rules = _validation_rules.get(key)
        if rules:
            return rules
    return []


@router.post("/execute")
async def execute_rpa(
    body: ExecuteRequest,
    current_user: User = Depends(get_current_user),
):
    """Run a one-off RPA execution (validate → execute → record history).

    The returned record is also appended to ``_rpa_executions``. Its
    ``status`` is ``VALIDATION_FAILED`` when the payload violates the learned
    rules, ``DRY_RUN_OK`` when ``dry_run`` is set and validation passed, and
    otherwise whatever the executor reports (``FAILED`` if it raises).

    Raises:
        HTTPException(400): when ``task_type`` is not in ``TASK_ENDPOINT_MAP``.
    """
    if not TASK_ENDPOINT_MAP.get(body.task_type):
        raise HTTPException(400, f"알 수 없는 task_type: {body.task_type}. 허용값: {list(TASK_ENDPOINT_MAP.keys())}")

    exec_id = _next_execution_id()
    started = datetime.now().isoformat()

    # Validation against the rules learned for this task type's endpoint.
    rules = _rules_for_task_type(body.task_type)
    validator = RPAValidator(rules)
    errors = validator.validate(body.payload)

    record: Dict[str, Any] = {
        "execution_id": exec_id,
        "task_type": body.task_type,
        "dry_run": body.dry_run,
        "validation_errors": errors,
        "started_at": started,
        "actor": current_user.username,
    }

    if errors:
        record.update(status="VALIDATION_FAILED", error=f"{len(errors)}개 validation 오류",
                      result=None, completed_at=datetime.now().isoformat())
        _rpa_executions.append(record)
        return record

    if body.dry_run:
        record.update(status="DRY_RUN_OK",
                      result={"message": "Validation 통과. dry_run=true이므로 실제 실행 생략."},
                      error=None, completed_at=datetime.now().isoformat())
        _rpa_executions.append(record)
        return record

    # Real execution.
    executor = RPAExecutor(base_url=ITSM_BASE, token=_get_service_token(current_user))
    try:
        result = await executor.execute(body.task_type, body.payload, dry_run=False)
    except Exception as e:
        # Record the failure in the history instead of surfacing a 500.
        result = {"status": "FAILED", "error": str(e)}

    record.update(
        status=result.get("status", "FAILED"),
        result=result.get("response"),
        error=result.get("error"),
        completed_at=datetime.now().isoformat(),
    )
    _rpa_executions.append(record)
    return record


@router.post("/tasks/{task_id}/run")
async def run_rpa_task(
    task_id: int,
    dry_run: bool = Query(False),
    current_user: User = Depends(get_current_user),
):
    """Run a registered RPA task immediately using its payload template.

    Raises:
        HTTPException(404): when the task does not exist.
        HTTPException(400): when the task is inactive.
    """
    task = _rpa_tasks.get(task_id)
    if not task:
        raise HTTPException(404, "RPA 작업을 찾을 수 없습니다.")
    if not task["is_active"]:
        raise HTTPException(400, "비활성 작업입니다. 먼저 활성화하세요.")

    req = ExecuteRequest(task_type=task["task_type"], payload=task["payload_template"], dry_run=dry_run)
    result = await execute_rpa(req, current_user)
    # Write through the dict we already hold: the task may have been deleted
    # from ``_rpa_tasks`` while the execution was awaited.
    task["last_run"] = datetime.now().isoformat()
    return result


# ── Execution history ────────────────────────────────────────────────────────

@router.get("/executions")
async def list_executions(
    status: Optional[str] = Query(None),
    task_type: Optional[str] = Query(None),
    page: int = Query(1, ge=1),
    size: int = Query(20, ge=1, le=100),
    current_user: User = Depends(get_current_user),
):
    """Return one page of execution records, most recently appended first."""
    execs = list(_rpa_executions)
    if status:
        execs = [e for e in execs if e.get("status") == status]
    if task_type:
        execs = [e for e in execs if e.get("task_type") == task_type]
    total = len(execs)
    start = (page - 1) * size
    return {"total": total, "page": page, "size": size,
            "items": list(reversed(execs))[start:start + size]}


@router.get("/executions/{execution_id}")
async def get_execution(execution_id: int, current_user: User = Depends(get_current_user)):
    """Return one execution record by id, or 404 when it does not exist."""
    for e in _rpa_executions:
        if e["execution_id"] == execution_id:
            return e
    raise HTTPException(404, "실행 이력을 찾을 수 없습니다.")


@router.get("/status")
async def rpa_status(current_user: User = Depends(get_current_user)):
    """Return summary counters for rules, tasks and executions."""
    return {
        "validation_endpoints": len(_validation_rules),
        "validation_rules": sum(len(v) for v in _validation_rules.values()),
        "tasks_total": len(_rpa_tasks),
        "tasks_active": sum(1 for t in _rpa_tasks.values() if t["is_active"]),
        "executions_total": len(_rpa_executions),
        "executions_success": sum(1 for e in _rpa_executions if e.get("status") == "SUCCESS"),
        "executions_failed": sum(1 for e in _rpa_executions if e.get("status") in ("FAILED", "VALIDATION_FAILED")),
        "supported_task_types": list(TASK_ENDPOINT_MAP.keys()),
    }


# ── APScheduler integration ──────────────────────────────────────────────────

def _register_cron(task: Dict) -> None:
    """Register (or replace) the cron job ``rpa_task_<id>`` for *task*.

    The first five whitespace-separated fields of ``task["schedule"]`` are
    passed as minute / hour / day / month / day_of_week. Best-effort: every
    failure is printed and swallowed so task CRUD never fails because of the
    scheduler.
    """
    try:
        from core.scheduler import scheduler
        cron = task.get("schedule", "")
        if not cron:
            return
        parts = cron.split()
        if len(parts) < 5:
            # Previously skipped silently, leaving a task that looked scheduled
            # but never ran; make the reason visible.
            print(f"[RPA] 크론 등록 건너뜀 (task_id={task['id']}): 잘못된 크론 표현식 '{cron}'")
            return
        minute, hour, day, month, day_of_week = parts[:5]
        job_id = f"rpa_task_{task['id']}"
        scheduler.add_job(
            _run_task_background,
            trigger="cron",
            id=job_id,
            replace_existing=True,
            minute=minute, hour=hour, day=day, month=month, day_of_week=day_of_week,
            args=[task["id"]],
        )
        print(f"[RPA] 크론 등록: {job_id} ({cron})")
    except Exception as e:
        print(f"[RPA] 크론 등록 실패 (task_id={task['id']}): {e}")


def _unregister_cron(task_id: int) -> None:
    """Remove the cron job ``rpa_task_<task_id>`` if it exists (best-effort)."""
    try:
        from core.scheduler import scheduler
        job_id = f"rpa_task_{task_id}"
        if scheduler.get_job(job_id):
            scheduler.remove_job(job_id)
    except Exception as e:
        # Still best-effort, but no longer silent: a job that could not be
        # removed would keep firing for an updated or deleted task.
        print(f"[RPA] 크론 해제 실패 (task_id={task_id}): {e}")


def _run_task_background(task_id: int) -> None:
    """Execute an RPA task from a cron job and record the outcome.

    Does nothing when the task no longer exists or is inactive. Otherwise the
    payload template is validated with the same rule lookup as ``execute_rpa``
    and, if valid, executed; the record is appended to ``_rpa_executions``
    with actor ``"rpa-scheduler"``.
    """
    import asyncio

    task = _rpa_tasks.get(task_id)
    if not task or not task["is_active"]:
        return

    exec_id = _next_execution_id()
    started = datetime.now().isoformat()

    rules = _rules_for_task_type(task["task_type"])
    validator = RPAValidator(rules)
    errors = validator.validate(task["payload_template"])

    record: Dict[str, Any] = {
        "execution_id": exec_id,
        "task_type": task["task_type"],
        "dry_run": False,
        "validation_errors": errors,
        "started_at": started,
        "actor": "rpa-scheduler",
    }

    if errors:
        record.update(status="VALIDATION_FAILED", error=f"{len(errors)}개 validation 오류",
                      result=None, completed_at=datetime.now().isoformat())
    else:
        async def _run():
            # NOTE(review): scheduled runs pass an empty token, unlike
            # execute_rpa which mints one for the caller — confirm the target
            # API accepts unauthenticated calls from the scheduler.
            executor = RPAExecutor(base_url=ITSM_BASE, token="")
            return await executor.execute(task["task_type"], task["payload_template"])

        try:
            # asyncio.run creates a fresh loop and always closes it, including
            # when the coroutine raises (the manual loop used before leaked).
            result = asyncio.run(_run())
        except Exception as e:
            result = {"status": "FAILED", "error": str(e)}

        record.update(
            status=result.get("status", "FAILED"),
            result=result.get("response"),
            error=result.get("error"),
            completed_at=datetime.now().isoformat(),
        )

    _rpa_executions.append(record)
    # Use the dict we hold; the task may have been deleted during the run.
    task["last_run"] = datetime.now().isoformat()
    print(f"[RPA Scheduler] task_id={task_id} status={record['status']}")


def _get_service_token(user: User) -> str:
    """Create an access token for internal API calls on behalf of *user*.

    The token payload carries the user's ``username`` as ``sub`` and the
    user's ``role``.
    """
    from core.auth import create_access_token
    return create_access_token({"sub": user.username, "role": user.role})