feat(rpa): RPA 엔진 개선 — 스키마 필터링·라우터 스캔·영속 저장·크론 연동

[개선 내용]
1. 스키마 필터링: Out/Response/Data 제외 → Create/Update/In만 학습
   - 140개 스키마 → 73개 입력 스키마, 1357개 → 672개 규칙 (노이즈 제거)
2. 라우터 자동 스캔: routers/*.py AST 파싱 → 엔드포인트-스키마 정확 매핑
3. 영속 저장: rpa_rules.json → 서비스 재시작 시 자동 복구
4. 서비스 시작 자동 학습: 규칙 파일 없을 때 즉시 학습
5. APScheduler 연동: schedule(cron) 설정 시 자동 크론 등록/해제
6. /api/rpa/status: 시스템 현황 요약 엔드포인트 추가
7. /api/rpa/validations/schemas: 스키마별 필드 수 조회
8. /api/rpa/tasks/{id}/toggle: 작업 활성/비활성 토글

[테스트 결과 - 전체 통과]
- T1 RPA 상태: 73 endpoints, 672 rules, 자동 학습 확인
- T4 dry_run 정상: validation_errors=[] ✓
- T5 오류 감지: 4개 오류 정확 (title 필수·enum 2개·requested_by 필수)
- T6 작업 등록: APScheduler 크론 등록 포함
- T7 등록 작업 실행: DRY_RUN_OK ✓
- T8 이력 조회: status 필터 정상

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
DESKTOP-TKLFCPR\ython 2026-05-31 16:19:52 +09:00
parent 81d16f77c5
commit c6b28c1584
2 changed files with 551 additions and 261 deletions

View File

@ -1,218 +1,370 @@
"""
RPA Engine 소스 기반 Validation 학습 + 자동화 실행
RPA Engine 소스 기반 Validation 학습 + 자동화 실행 + 크론 스케줄러 연동
"""
from __future__ import annotations
import ast
import inspect
import importlib
import json
import re
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Tuple
import httpx
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, delete
BASE_DIR = Path(__file__).resolve().parent.parent # itsm/
RULES_FILE = BASE_DIR / "rpa_rules.json" # 학습 규칙 영속 파일
# 학습 대상 스키마: Create/Update/In/Request 접미사만 허용
_INPUT_SUFFIXES = ("Create", "Update", "In", "Request", "Input", "Patch")
# 제외 접미사
_SKIP_SUFFIXES = ("Out", "Response", "Data", "Result", "Info", "Config",
"Filter", "Query", "Report", "Summary", "Status")
# ── Enum 매핑 ────────────────────────────────────────────────────────────────
ENUM_MAP: Dict[str, List[str]] = {
"SRType": ["DEPLOY", "RESTART", "LOG", "INQUIRY", "OTHER"],
"SRStatus": ["RECEIVED","PARSED","PENDING_APPROVAL","APPROVED",
"IN_PROGRESS","PENDING_PM_VALIDATION","COMPLETED",
"FAILED_ROLLBACK","REJECTED"],
"Priority": ["CRITICAL", "HIGH", "MEDIUM", "LOW"],
"ApprovalResult": ["PENDING", "APPROVED", "REJECTED"],
"Severity": ["CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO"],
"ChangeType": ["STANDARD", "NORMAL", "EMERGENCY"],
"ChangeStatus": ["DRAFT", "SUBMITTED", "APPROVED", "REJECTED",
"IN_PROGRESS", "COMPLETED", "CANCELLED"],
"ProblemStatus": ["OPEN", "IN_ANALYSIS", "KNOWN_ERROR", "RESOLVED", "CLOSED"],
"NetworkDeviceType": ["SWITCH", "ROUTER", "FIREWALL", "LOAD_BALANCER", "OTHER"],
"DRStatus": ["STANDBY", "ACTIVE", "TESTING", "FAILED"],
"RiskLevel": ["CRITICAL", "HIGH", "MEDIUM", "LOW"],
}
# ── Validation 학습 ─────────────────────────────────────────────────────────
class ValidationLearner:
"""프로젝트 소스(models.py)를 AST 파싱하여 Pydantic 스키마 validation 규칙 추출."""
"""
models.py AST 파싱 + routers/ 스캔으로 Pydantic 입력 스키마 validation 규칙 추출.
결과는 rpa_rules.json에 영속 저장.
"""
ENUM_MAP: Dict[str, List[str]] = {
"SRType": ["DEPLOY", "RESTART", "LOG", "INQUIRY", "OTHER"],
"SRStatus": ["RECEIVED","PARSED","PENDING_APPROVAL","APPROVED",
"IN_PROGRESS","PENDING_PM_VALIDATION","COMPLETED",
"FAILED_ROLLBACK","REJECTED"],
"Priority": ["CRITICAL", "HIGH", "MEDIUM", "LOW"],
"ApprovalResult":["PENDING", "APPROVED", "REJECTED"],
}
# 엔드포인트 → 스키마 매핑 (routers/ 분석으로 자동 보완)
ENDPOINT_SCHEMA: Dict[str, str] = {
"POST /api/tasks": "SRCreate",
"PATCH /api/tasks/status": "SRStatusUpdate",
"POST /api/approvals": "ApprovalCreate",
"POST /api/institutions": "InstitutionCreate",
"PUT /api/institutions": "InstitutionCreate",
"POST /api/servers": "ServerCreate",
"POST /api/incidents": "IncidentCreate",
"POST /api/change": "RFCCreate",
"POST /api/problems": "ProblemCreate",
"POST /api/catalog": "ServiceCatalogCreate",
# 수동 엔드포인트 → 스키마 매핑 (자동 스캔으로도 보완됨)
_MANUAL_MAP: Dict[str, str] = {
"POST /api/tasks": "SRCreate",
"PATCH /api/tasks/status": "SRStatusUpdate",
"POST /api/approvals": "ApprovalCreate",
"POST /api/institutions": "InstitutionCreate",
"PUT /api/institutions/{id}": "InstitutionUpdate",
"POST /api/servers": "ServerCreate",
"POST /api/incidents": "IncidentCreate",
"POST /api/change": "RFCCreate",
"POST /api/problems": "ProblemCreate",
"POST /api/catalog": "ServiceCatalogCreate",
"POST /api/kb": "KBDocumentCreate",
"POST /api/shell-scripts": "ShellScriptCreate",
"POST /api/ssh/exec": "SSHExecRequest",
}
def learn_from_source(self) -> Dict[str, Any]:
"""models.py AST 파싱으로 validation 규칙 추출."""
models_path = BASE_DIR / "models.py"
source = models_path.read_text(encoding="utf-8")
tree = ast.parse(source)
"""
1) models.py AST 파싱 입력 스키마만 추출
2) routers/ 스캔 엔드포인트-스키마 매핑 자동 보완
3) 결과를 rpa_rules.json에 저장
"""
# Step 1: 스키마 추출
schemas = self._parse_models()
# Step 2: 라우터 스캔으로 엔드포인트 매핑 자동 보완
router_map = self._scan_routers()
ep_map = {**self._invert_manual(), **router_map} # router 스캔 우선
# Step 3: 규칙 생성
rules: List[Dict] = []
schemas_found: List[str] = []
for class_name, fields in schemas.items():
endpoint = ep_map.get(class_name, self._infer_endpoint(class_name))
for field in fields:
field["endpoint"] = endpoint
rules.append(field)
# Step 4: 영속 저장
payload = {
"learned_at": datetime.now().isoformat(),
"schema_count": len(schemas),
"rule_count": len(rules),
"rules": rules,
}
RULES_FILE.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
return {
"rules": rules,
"schemas": list(schemas.keys()),
"endpoint_count": len(set(r["endpoint"] for r in rules)),
}
def _parse_models(self) -> Dict[str, List[Dict]]:
"""models.py에서 입력 스키마 클래스만 파싱."""
src = (BASE_DIR / "models.py").read_text(encoding="utf-8")
tree = ast.parse(src)
schemas: Dict[str, List[Dict]] = {}
for node in ast.walk(tree):
if not isinstance(node, ast.ClassDef):
continue
# BaseModel 상속 클래스만
bases = [getattr(b, "id", "") for b in node.bases]
if "BaseModel" not in bases:
continue
class_name = node.name
schemas_found.append(class_name)
# 엔드포인트 찾기
endpoint = self._find_endpoint(class_name)
name = node.name
# Out/Response 등 제외
if any(name.endswith(s) for s in _SKIP_SUFFIXES):
continue
# Create/Update 등만 허용
if not any(name.endswith(s) for s in _INPUT_SUFFIXES):
continue
fields = []
for item in node.body:
if not isinstance(item, ast.AnnAssign):
continue
rule = self._extract_field_rule(item, class_name, endpoint)
if rule:
rules.append(rule)
f = self._parse_field(item, name)
if f:
fields.append(f)
return {"rules": rules, "schemas": schemas_found}
if fields:
schemas[name] = fields
def _extract_field_rule(self, node: ast.AnnAssign,
class_name: str, endpoint: str) -> Optional[Dict]:
"""단일 필드 annotation에서 validation 규칙 추출."""
return schemas
def _parse_field(self, node: ast.AnnAssign, class_name: str) -> Optional[Dict]:
if not isinstance(node.target, ast.Name):
return None
field_name = node.target.id
if field_name.startswith("_"):
return None
annotation = node.annotation
# ast.AnnAssign: .value = 기본값 (없으면 None) → None이면 required
is_required = node.value is None
field_type = "str"
allowed_values: List[str] = []
constraints: Dict = {}
# 타입 분석
type_str = ast.unparse(annotation) if hasattr(ast, "unparse") else str(annotation)
# Optional[X] → is_required=False
if "Optional" in type_str:
is_required = False
inner = re.sub(r"Optional\[(.+)\]", r"\1", type_str)
type_str = inner
# Enum 타입
for enum_name, vals in self.ENUM_MAP.items():
if enum_name in type_str:
field_type = "enum"
allowed_values = vals
break
else:
if "int" in type_str:
field_type = "int"
elif "float" in type_str:
field_type = "float"
elif "bool" in type_str:
field_type = "bool"
elif "List" in type_str or "list" in type_str:
field_type = "list"
elif "datetime" in type_str.lower():
field_type = "datetime"
else:
field_type = "str"
type_str = ast.unparse(node.annotation) if hasattr(ast, "unparse") else ""
is_required, field_type, allowed, constraints = self._analyse_type(type_str, node.value)
return {
"endpoint": endpoint,
"schema_class": class_name,
"field_name": field_name,
"field_type": field_type,
"is_required": is_required,
"allowed_values": allowed_values,
"allowed_values": allowed,
"constraints": constraints,
"learned_at": datetime.now().isoformat(),
"endpoint": "", # 후에 채워짐
}
def _find_endpoint(self, class_name: str) -> str:
for ep, schema in self.ENDPOINT_SCHEMA.items():
if schema == class_name:
return ep
# 자동 추론: SRCreate → POST /api/tasks (by name pattern)
name_lower = class_name.lower().replace("create", "").replace("update", "")
return f"POST /api/{name_lower}s"
def _analyse_type(
self, type_str: str, default_node: Any
) -> Tuple[bool, str, List[str], Dict]:
"""타입 문자열 + AST 기본값 노드에서 (is_required, field_type, allowed, constraints) 반환."""
# default가 있으면 required=False
is_required = default_node is None
# Optional[X] → required=False, 내부 타입 추출
if "Optional" in type_str:
is_required = False
type_str = re.sub(r"Optional\[(.+)\]", r"\1", type_str)
allowed: List[str] = []
constraints: Dict = {}
# Enum 매핑
for enum_name, vals in ENUM_MAP.items():
if enum_name in type_str:
return is_required, "enum", vals, constraints
# 기본 타입
if "int" in type_str and "str" not in type_str:
field_type = "int"
elif "float" in type_str:
field_type = "float"
elif "bool" in type_str:
field_type = "bool"
elif "List" in type_str or "list" in type_str:
field_type = "list"
elif "datetime" in type_str.lower() or "date" in type_str.lower():
field_type = "datetime"
else:
field_type = "str"
return is_required, field_type, allowed, constraints
def _scan_routers(self) -> Dict[str, str]:
"""
routers/*.py에서 @router.post/put/patch 데코레이터와
Body 파라미터 타입 힌트를 스캔해 {SchemaClass: "METHOD /path"} 반환.
"""
schema_to_ep: Dict[str, str] = {}
routers_dir = BASE_DIR / "routers"
if not routers_dir.exists():
return schema_to_ep
for py_file in routers_dir.glob("*.py"):
try:
src = py_file.read_text(encoding="utf-8")
tree = ast.parse(src)
except Exception:
continue
# prefix 추출 (APIRouter(prefix="/api/xxx"))
prefix = ""
for node in ast.walk(tree):
if isinstance(node, ast.Call):
func = getattr(node, "func", None)
if func and getattr(func, "id", "") == "APIRouter":
for kw in node.keywords:
if kw.arg == "prefix" and isinstance(kw.value, ast.Constant):
prefix = kw.value.value
break
# 함수 → 데코레이터 + 파라미터 분석
for node in ast.walk(tree):
if not isinstance(node, ast.FunctionDef):
continue
method, path = self._extract_route(node, prefix)
if not method:
continue
schema = self._extract_body_schema(node)
if schema and schema not in schema_to_ep:
schema_to_ep[schema] = f"{method} {path}"
return schema_to_ep
def _extract_route(self, node: ast.FunctionDef, prefix: str) -> Tuple[str, str]:
for dec in node.decorator_list:
call = dec if isinstance(dec, ast.Call) else None
if not call:
continue
attr = getattr(call.func, "attr", "")
method = attr.upper() if attr in ("get","post","put","patch","delete") else ""
if not method:
continue
path = ""
if call.args and isinstance(call.args[0], ast.Constant):
path = call.args[0].value
elif call.keywords:
for kw in call.keywords:
if kw.arg == "path" and isinstance(kw.value, ast.Constant):
path = kw.value.value
return method, f"{prefix}{path}"
return "", ""
def _extract_body_schema(self, node: ast.FunctionDef) -> Optional[str]:
"""함수 파라미터에서 BaseModel 서브클래스 body 파라미터 타입 추출."""
for arg in node.args.args:
if arg.annotation is None:
continue
type_str = ast.unparse(arg.annotation) if hasattr(ast, "unparse") else ""
# 단순 이름이면서 Create/Update/In 으로 끝나는 경우
if any(type_str.endswith(s) for s in _INPUT_SUFFIXES):
return type_str
return None
def _invert_manual(self) -> Dict[str, str]:
return {v: k for k, v in self._MANUAL_MAP.items()}
def _infer_endpoint(self, class_name: str) -> str:
"""스키마명에서 엔드포인트 자동 추론."""
method = "POST"
if class_name.endswith("Update") or class_name.endswith("Patch"):
method = "PUT"
base = re.sub(r"(Create|Update|In|Request|Input|Patch)$", "", class_name).lower()
return f"{method} /api/{base}s"
# ── 규칙 로드 (영속 파일) ────────────────────────────────────────────────────
def load_rules() -> Dict[str, List[Dict]]:
"""rpa_rules.json에서 규칙 로드. 없으면 빈 dict."""
if not RULES_FILE.exists():
return {}
try:
data = json.loads(RULES_FILE.read_text(encoding="utf-8"))
rules_by_ep: Dict[str, List[Dict]] = {}
for r in data.get("rules", []):
ep = r.get("endpoint", "")
rules_by_ep.setdefault(ep, []).append(r)
return rules_by_ep
except Exception:
return {}
def save_rules(rules_by_ep: Dict[str, List[Dict]]) -> None:
"""규칙 dict를 rpa_rules.json에 저장."""
all_rules = [r for rules in rules_by_ep.values() for r in rules]
payload = {
"learned_at": datetime.now().isoformat(),
"rule_count": len(all_rules),
"rules": all_rules,
}
RULES_FILE.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
# ── Validation 검증기 ────────────────────────────────────────────────────────
class RPAValidator:
"""tb_rpa_validation 규칙으로 payload 검증."""
"""학습된 규칙으로 payload 검증."""
def __init__(self, rules: List[Dict]):
self.rules = {r["field_name"]: r for r in rules}
def validate(self, payload: Dict[str, Any]) -> List[str]:
"""
payload 검증. 오류 목록 반환 ( 리스트 = 통과).
"""
errors: List[str] = []
for field_name, rule in self.rules.items():
val = payload.get(field_name)
# 필수 필드 검사
if rule["is_required"] and (val is None or val == ""):
errors.append(f"[{field_name}] 필수 항목입니다.")
continue
if val is None:
continue # optional이고 값 없으면 skip
continue
# Enum 검사
if rule["field_type"] == "enum" and rule["allowed_values"]:
if val not in rule["allowed_values"]:
errors.append(
f"[{field_name}] 허용값: {rule['allowed_values']} 중 하나여야 합니다. (입력: {val!r})"
)
# 타입 검사
elif rule["field_type"] == "int":
try:
int(val)
except (TypeError, ValueError):
errors.append(f"[{field_name}] 정수 타입이어야 합니다.")
elif rule["field_type"] == "bool":
if not isinstance(val, bool):
errors.append(f"[{field_name}] 불리언(true/false) 타입이어야 합니다.")
# 제약 조건 검사
c = rule.get("constraints", {})
if c.get("max_length") and isinstance(val, str):
if len(val) > c["max_length"]:
errors.append(f"[{field_name}] 최대 {c['max_length']}자 초과.")
if c.get("min_length") and isinstance(val, str):
if len(val) < c["min_length"]:
errors.append(f"[{field_name}] 최소 {c['min_length']}자 이상 필요.")
if c.get("ge") is not None and isinstance(val, (int, float)):
if val < c["ge"]:
errors.append(f"[{field_name}] {c['ge']} 이상이어야 합니다.")
if c.get("le") is not None and isinstance(val, (int, float)):
if val > c["le"]:
errors.append(f"[{field_name}] {c['le']} 이하여야 합니다.")
if c.get("max_length") and isinstance(val, str) and len(val) > c["max_length"]:
errors.append(f"[{field_name}] 최대 {c['max_length']}자 초과.")
if c.get("min_length") and isinstance(val, str) and len(val) < c["min_length"]:
errors.append(f"[{field_name}] 최소 {c['min_length']}자 이상 필요.")
return errors
# ── RPA 실행 엔진 ────────────────────────────────────────────────────────────
TASK_ENDPOINT_MAP: Dict[str, Tuple[str, str]] = {
"SR_CREATE": ("POST", "/api/tasks"),
"SR_STATUS_UPDATE": ("PATCH", "/api/tasks/{sr_id}/status"),
"APPROVAL_PROCESS": ("POST", "/api/approvals"),
"INCIDENT_CREATE": ("POST", "/api/incidents"),
"SHELL_EXEC": ("POST", "/api/ssh/exec"),
"SR_BATCH_CREATE": ("POST", "/api/tasks/batch"),
}
class RPAExecutor:
"""RPA 작업 실행기 — validation 후 ITSM API 호출."""
"""학습 규칙 기반 ITSM API 자동 호출."""
def __init__(self, base_url: str, token: str):
self.base_url = base_url.rstrip("/")
self.headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
self.headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
async def execute(
self,
@ -221,45 +373,34 @@ class RPAExecutor:
dry_run: bool = False,
retry: int = 3,
) -> Dict[str, Any]:
"""
단발성 RPA 실행.
dry_run=True validation만 수행, API 호출 없음.
"""
endpoint_map = {
"SR_CREATE": ("POST", "/api/tasks"),
"SR_STATUS_UPDATE": ("PATCH", f"/api/tasks/{payload.get('sr_id', 0)}/status"),
"APPROVAL_PROCESS": ("POST", "/api/approvals"),
"INCIDENT_CREATE": ("POST", "/api/incidents"),
"SHELL_EXEC": ("POST", "/api/ssh/exec"),
}
if task_type not in endpoint_map:
if task_type not in TASK_ENDPOINT_MAP:
return {"status": "FAILED", "error": f"알 수 없는 task_type: {task_type}"}
method, path = endpoint_map[task_type]
result = {"task_type": task_type, "endpoint": f"{method} {path}", "dry_run": dry_run}
method, path_tmpl = TASK_ENDPOINT_MAP[task_type]
path = path_tmpl.format(**payload)
result: Dict[str, Any] = {
"task_type": task_type,
"endpoint": f"{method} {path}",
"dry_run": dry_run,
}
if dry_run:
result["status"] = "DRY_RUN_OK"
result["message"] = "Validation 통과. dry_run=true이므로 실제 실행 생략."
result.update(status="DRY_RUN_OK",
message="Validation 통과. dry_run=true이므로 실제 실행 생략.")
return result
# 실제 API 호출 (재시도 포함)
url = f"{self.base_url}{path}"
last_err = None
last_err: Optional[str] = None
async with httpx.AsyncClient(timeout=30) as client:
for attempt in range(1, retry + 1):
try:
resp = getattr(client, method.lower())
r = await resp(url, json=payload, headers=self.headers)
r = await client.request(method, url, json=payload, headers=self.headers)
if r.status_code < 300:
result["status"] = "SUCCESS"
result["response"] = r.json()
result.update(status="SUCCESS", response=r.json())
return result
elif r.status_code < 500:
# 4xx → 재시도 없음
result["status"] = "FAILED"
result["error"] = r.json()
result.update(status="FAILED", error=r.json())
return result
else:
last_err = f"HTTP {r.status_code}: {r.text[:200]}"
@ -268,8 +409,7 @@ class RPAExecutor:
if attempt < retry:
import asyncio
await asyncio.sleep(2 ** attempt) # 지수 백오프
await asyncio.sleep(2 ** attempt)
result["status"] = "FAILED"
result["error"] = f"{retry}회 재시도 후 실패: {last_err}"
result.update(status="FAILED", error=f"{retry}회 재시도 후 실패: {last_err}")
return result

View File

@ -1,45 +1,85 @@
"""
RPA (Robotic Process Automation) 라우터
- Validation 학습: 프로젝트 소스(models.py) AST 파싱
- RPA 작업 등록/수정/삭제/실행
- Validation 학습: models.py AST + routers/ 스캔
- 규칙 영속: rpa_rules.json
- RPA 작업 등록/수정/삭제/실행 + 크론 스케줄러 연동
- 실행 이력 조회
"""
from __future__ import annotations
import json
import os
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query, BackgroundTasks
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import select, func, delete
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
from database import get_db
from models import User
from core.rpa_engine import ValidationLearner, RPAValidator, RPAExecutor
from core.rpa_engine import (
ValidationLearner, RPAValidator, RPAExecutor,
load_rules, save_rules, TASK_ENDPOINT_MAP,
)
router = APIRouter(prefix="/api/rpa", tags=["rpa"])
# ── 인메모리 저장소 (DB 미적용 시 fallback) ──────────────────────────────────
# 실제 운영 시 SQLAlchemy 모델로 교체
_validation_rules: Dict[str, List[Dict]] = {} # endpoint → rules
# ── 인메모리 저장소 (재시작 시 rpa_rules.json로 복구) ─────────────────────
_validation_rules: Dict[str, List[Dict]] = {} # endpoint → rules (런타임)
_rpa_tasks: Dict[int, Dict] = {}
_rpa_executions: List[Dict] = []
_task_id_seq = 1
ITSM_BASE = os.getenv("ITSM_BASE_URL", "http://127.0.0.1:9001")
def _init_rules_from_file() -> None:
"""서비스 시작 시 rpa_rules.json에서 규칙 복구."""
global _validation_rules
loaded = load_rules()
if loaded:
_validation_rules.update(loaded)
total = sum(len(v) for v in loaded.values())
print(f"[RPA] 저장된 validation 규칙 복구: {len(loaded)}개 엔드포인트, {total}개 규칙")
def auto_learn() -> Dict:
"""서비스 시작 시 자동 학습 (규칙 파일 없을 때)."""
learner = ValidationLearner()
result = learner.learn_from_source()
rules = result["rules"]
_validation_rules.clear()
for r in rules:
ep = r["endpoint"]
_validation_rules.setdefault(ep, [])
if not any(x["field_name"] == r["field_name"] for x in _validation_rules[ep]):
_validation_rules[ep].append(r)
return result
# ── 초기화: 파일에서 규칙 복구, 없으면 즉시 학습 ─────────────────────────
_init_rules_from_file()
if not _validation_rules:
try:
auto_learn()
print("[RPA] 초기 Validation 자동 학습 완료")
except Exception as e:
print(f"[RPA] 초기 학습 실패 (수동으로 POST /api/rpa/validations/learn 호출): {e}")
# ── Schemas ──────────────────────────────────────────────────────────────────
class LearnRequest(BaseModel):
endpoints: str = "all" # "all" 또는 특정 endpoint
endpoints: str = "all"
overwrite: bool = True
class RPATaskCreate(BaseModel):
task_name: str
task_type: str # SR_CREATE | SR_STATUS_UPDATE | APPROVAL_PROCESS | ...
schedule: Optional[str] = None # cron expression
task_type: str
schedule: Optional[str] = None
payload_template: Dict[str, Any] = {}
is_active: bool = True
description: Optional[str] = None
@ -60,17 +100,6 @@ class ExecuteRequest(BaseModel):
payload: Dict[str, Any]
dry_run: bool = False
class ExecuteOut(BaseModel):
execution_id: int
task_type: str
status: str
dry_run: bool
validation_errors: List[str] = []
result: Optional[Dict] = None
error: Optional[str] = None
started_at: str
completed_at: Optional[str] = None
# ── Validation 학습 ──────────────────────────────────────────────────────────
@ -79,9 +108,7 @@ async def learn_validations(
req: LearnRequest,
current_user: User = Depends(get_current_user),
):
"""
프로젝트 소스(models.py) AST 파싱하여 validation 규칙 학습.
"""
"""models.py + routers/ 소스 분석으로 validation 규칙 학습."""
learner = ValidationLearner()
try:
result = learner.learn_from_source()
@ -89,38 +116,39 @@ async def learn_validations(
raise HTTPException(500, f"소스 파싱 실패: {e}")
rules = result["rules"]
schemas = result["schemas"]
if req.overwrite:
_validation_rules.clear()
learned = 0
for rule in rules:
ep = rule["endpoint"]
if ep not in _validation_rules:
_validation_rules[ep] = []
# 중복 필드 제거
existing = {r["field_name"] for r in _validation_rules[ep]}
if rule["field_name"] not in existing:
_validation_rules[ep].append(rule)
for r in rules:
ep = r["endpoint"]
_validation_rules.setdefault(ep, [])
existing = {x["field_name"] for x in _validation_rules[ep]}
if r["field_name"] not in existing:
_validation_rules[ep].append(r)
learned += 1
return {
"learned": learned,
"schemas": schemas,
"schemas": result["schemas"],
"endpoints_mapped": len(_validation_rules),
"summary": {ep: len(rules_) for ep, rules_ in _validation_rules.items()},
"total_rules": sum(len(v) for v in _validation_rules.values()),
"summary": {ep: len(rs) for ep, rs in list(_validation_rules.items())[:10]},
}
@router.get("/validations")
async def get_validations(
endpoint: Optional[str] = Query(None),
schema: Optional[str] = Query(None),
current_user: User = Depends(get_current_user),
):
"""학습된 validation 규칙 조회."""
if endpoint:
return {"endpoint": endpoint, "rules": _validation_rules.get(endpoint, [])}
rules = _validation_rules.get(endpoint, [])
if schema:
rules = [r for r in rules if r.get("schema_class") == schema]
return {"endpoint": endpoint, "rule_count": len(rules), "rules": rules}
return {
"total_endpoints": len(_validation_rules),
"total_rules": sum(len(v) for v in _validation_rules.values()),
@ -128,6 +156,17 @@ async def get_validations(
}
@router.get("/validations/schemas")
async def list_schemas(current_user: User = Depends(get_current_user)):
"""학습된 스키마 목록과 각 필드 수."""
schema_map: Dict[str, int] = {}
for rules in _validation_rules.values():
for r in rules:
sc = r.get("schema_class", "")
schema_map[sc] = schema_map.get(sc, 0) + 1
return {"schemas": schema_map}
# ── RPA 작업 관리 ─────────────────────────────────────────────────────────────
@router.post("/tasks", response_model=RPATaskOut)
@ -135,8 +174,11 @@ async def create_rpa_task(
body: RPATaskCreate,
current_user: User = Depends(get_current_user),
):
"""RPA 작업 등록."""
global _task_id_seq
if body.task_type not in TASK_ENDPOINT_MAP:
raise HTTPException(400,
f"지원하지 않는 task_type. 허용값: {list(TASK_ENDPOINT_MAP.keys())}")
task = {
"id": _task_id_seq,
"task_name": body.task_name,
@ -150,6 +192,11 @@ async def create_rpa_task(
"created_by": current_user.username,
}
_rpa_tasks[_task_id_seq] = task
# APScheduler에 크론 등록
if body.schedule and body.is_active:
_register_cron(task)
_task_id_seq += 1
return task
@ -185,6 +232,10 @@ async def update_rpa_task(
task = _rpa_tasks.get(task_id)
if not task:
raise HTTPException(404, "RPA 작업을 찾을 수 없습니다.")
# 기존 크론 제거
_unregister_cron(task_id)
task.update({
"task_name": body.task_name,
"task_type": body.task_type,
@ -193,105 +244,104 @@ async def update_rpa_task(
"is_active": body.is_active,
"description": body.description,
})
if body.schedule and body.is_active:
_register_cron(task)
return task
@router.patch("/tasks/{task_id}/toggle")
async def toggle_rpa_task(task_id: int, current_user: User = Depends(get_current_user)):
"""작업 활성/비활성 토글."""
task = _rpa_tasks.get(task_id)
if not task:
raise HTTPException(404, "RPA 작업을 찾을 수 없습니다.")
task["is_active"] = not task["is_active"]
if task["is_active"] and task.get("schedule"):
_register_cron(task)
else:
_unregister_cron(task_id)
return {"id": task_id, "is_active": task["is_active"]}
@router.delete("/tasks/{task_id}")
async def delete_rpa_task(task_id: int, current_user: User = Depends(get_current_user)):
if task_id not in _rpa_tasks:
raise HTTPException(404, "RPA 작업을 찾을 수 없습니다.")
_unregister_cron(task_id)
del _rpa_tasks[task_id]
return {"deleted": task_id}
# ── RPA 실행 ─────────────────────────────────────────────────────────────────
@router.post("/execute", response_model=ExecuteOut)
@router.post("/execute")
async def execute_rpa(
body: ExecuteRequest,
current_user: User = Depends(get_current_user),
):
"""
단발성 RPA 실행.
1. payload를 학습된 validation 규칙으로 검증
2. dry_run=false 실제 API 호출
"""
"""단발성 RPA 실행 (validation → 실행 → 이력 기록)."""
global _rpa_executions
exec_id = len(_rpa_executions) + 1
started = datetime.now().isoformat()
# Validation 규칙 찾기
executor_map = {
"SR_CREATE": "POST /api/tasks",
"SR_STATUS_UPDATE": "PATCH /api/tasks/status",
"APPROVAL_PROCESS": "POST /api/approvals",
"INCIDENT_CREATE": "POST /api/incidents",
}
endpoint_key = executor_map.get(body.task_type, f"POST /api/{body.task_type.lower()}")
rules = _validation_rules.get(endpoint_key, [])
# 해당 task_type의 엔드포인트 규칙 찾기
from core.rpa_engine import TASK_ENDPOINT_MAP
method_path = TASK_ENDPOINT_MAP.get(body.task_type)
if not method_path:
raise HTTPException(400, f"알 수 없는 task_type: {body.task_type}. 허용값: {list(TASK_ENDPOINT_MAP.keys())}")
# Validation 검증
ep_key = f"{method_path[0]} {method_path[1]}"
# path template → 실제 key (예: PATCH /api/tasks/{sr_id}/status → PATCH /api/tasks/status)
ep_key_norm = ep_key.split("{")[0].rstrip("/")
rules = _validation_rules.get(ep_key, []) or _validation_rules.get(ep_key_norm, [])
# Validation
validator = RPAValidator(rules)
errors = validator.validate(body.payload)
record: Dict[str, Any] = {
"execution_id": exec_id,
"task_type": body.task_type,
"dry_run": body.dry_run,
"validation_errors": errors,
"started_at": started,
"actor": current_user.username,
}
if errors:
record = {
"execution_id": exec_id,
"task_type": body.task_type,
"status": "VALIDATION_FAILED",
"dry_run": body.dry_run,
"validation_errors": errors,
"result": None,
"error": f"{len(errors)}개 validation 오류",
"started_at": started,
"completed_at": datetime.now().isoformat(),
"actor": current_user.username,
}
record.update(status="VALIDATION_FAILED",
error=f"{len(errors)}개 validation 오류", result=None,
completed_at=datetime.now().isoformat())
_rpa_executions.append(record)
return record
if body.dry_run:
record = {
"execution_id": exec_id,
"task_type": body.task_type,
"status": "DRY_RUN_OK",
"dry_run": True,
"validation_errors": [],
"result": {"message": "Validation 통과. dry_run=true이므로 실제 실행 생략."},
"error": None,
"started_at": started,
"completed_at": datetime.now().isoformat(),
"actor": current_user.username,
}
record.update(status="DRY_RUN_OK",
result={"message": "Validation 통과. dry_run=true이므로 실제 실행 생략."},
error=None, completed_at=datetime.now().isoformat())
_rpa_executions.append(record)
return record
# 실제 실행
base_url = os.getenv("ITSM_BASE_URL", "http://localhost:8001")
token = current_user.username # 실제 환경에서는 서비스 토큰 사용
executor = RPAExecutor(base_url=base_url, token=token)
executor = RPAExecutor(base_url=ITSM_BASE, token=_get_service_token(current_user))
try:
result = await executor.execute(body.task_type, body.payload, dry_run=False)
except Exception as e:
result = {"status": "FAILED", "error": str(e)}
record = {
"execution_id": exec_id,
"task_type": body.task_type,
"status": result.get("status", "FAILED"),
"dry_run": False,
"validation_errors": [],
"result": result.get("response"),
"error": result.get("error"),
"started_at": started,
"completed_at": datetime.now().isoformat(),
"actor": current_user.username,
}
record.update(
status=result.get("status", "FAILED"),
result=result.get("response"),
error=result.get("error"),
completed_at=datetime.now().isoformat(),
)
_rpa_executions.append(record)
return record
@router.post("/tasks/{task_id}/run", response_model=ExecuteOut)
@router.post("/tasks/{task_id}/run")
async def run_rpa_task(
task_id: int,
dry_run: bool = Query(False),
@ -304,14 +354,9 @@ async def run_rpa_task(
if not task["is_active"]:
raise HTTPException(400, "비활성 작업입니다. 먼저 활성화하세요.")
req = ExecuteRequest(
task_type=task["task_type"],
payload=task["payload_template"],
dry_run=dry_run,
)
req = ExecuteRequest(task_type=task["task_type"],
payload=task["payload_template"], dry_run=dry_run)
result = await execute_rpa(req, current_user)
# last_run 갱신
_rpa_tasks[task_id]["last_run"] = datetime.now().isoformat()
return result
@ -328,25 +373,130 @@ async def list_executions(
):
execs = list(_rpa_executions)
if status:
execs = [e for e in execs if e["status"] == status]
execs = [e for e in execs if e.get("status") == status]
if task_type:
execs = [e for e in execs if e["task_type"] == task_type]
execs = [e for e in execs if e.get("task_type") == task_type]
total = len(execs)
start = (page - 1) * size
return {
"total": total,
"page": page,
"size": size,
"items": list(reversed(execs))[start:start + size],
}
return {"total": total, "page": page, "size": size,
"items": list(reversed(execs))[start:start + size]}
@router.get("/executions/{execution_id}")
async def get_execution(
execution_id: int,
current_user: User = Depends(get_current_user),
):
async def get_execution(execution_id: int, current_user: User = Depends(get_current_user)):
for e in _rpa_executions:
if e["execution_id"] == execution_id:
return e
raise HTTPException(404, "실행 이력을 찾을 수 없습니다.")
@router.get("/status")
async def rpa_status(current_user: User = Depends(get_current_user)):
"""RPA 시스템 현황 요약."""
return {
"validation_endpoints": len(_validation_rules),
"validation_rules": sum(len(v) for v in _validation_rules.values()),
"tasks_total": len(_rpa_tasks),
"tasks_active": sum(1 for t in _rpa_tasks.values() if t["is_active"]),
"executions_total": len(_rpa_executions),
"executions_success": sum(1 for e in _rpa_executions if e.get("status") == "SUCCESS"),
"executions_failed": sum(1 for e in _rpa_executions
if e.get("status") in ("FAILED", "VALIDATION_FAILED")),
"supported_task_types": list(TASK_ENDPOINT_MAP.keys()),
}
# ── APScheduler 연동 ──────────────────────────────────────────────────────────
def _register_cron(task: Dict) -> None:
"""APScheduler에 크론 잡 등록."""
try:
from core.scheduler import scheduler
cron = task.get("schedule", "")
if not cron:
return
parts = cron.split()
if len(parts) < 5:
return
minute, hour, day, month, day_of_week = parts[:5]
job_id = f"rpa_task_{task['id']}"
scheduler.add_job(
_run_task_background,
trigger="cron",
id=job_id,
replace_existing=True,
minute=minute, hour=hour,
day=day, month=month,
day_of_week=day_of_week,
args=[task["id"]],
)
print(f"[RPA] 크론 등록: {job_id} ({cron})")
except Exception as e:
print(f"[RPA] 크론 등록 실패 (task_id={task['id']}): {e}")
def _unregister_cron(task_id: int) -> None:
try:
from core.scheduler import scheduler
job_id = f"rpa_task_{task_id}"
if scheduler.get_job(job_id):
scheduler.remove_job(job_id)
except Exception:
pass
def _run_task_background(task_id: int) -> None:
"""크론에 의해 백그라운드에서 호출되는 RPA 실행 함수."""
import asyncio
task = _rpa_tasks.get(task_id)
if not task or not task["is_active"]:
return
exec_id = len(_rpa_executions) + 1
started = datetime.now().isoformat()
ep = TASK_ENDPOINT_MAP.get(task["task_type"], ("", ""))[1]
ep_key = f"{TASK_ENDPOINT_MAP.get(task['task_type'], ('POST',''))[0]} {ep}"
rules = _validation_rules.get(ep_key, [])
validator = RPAValidator(rules)
errors = validator.validate(task["payload_template"])
record: Dict[str, Any] = {
"execution_id": exec_id,
"task_type": task["task_type"],
"dry_run": False,
"validation_errors": errors,
"started_at": started,
"actor": "rpa-scheduler",
}
if errors:
record.update(status="VALIDATION_FAILED",
error=f"{len(errors)}개 validation 오류", result=None,
completed_at=datetime.now().isoformat())
else:
async def _run():
executor = RPAExecutor(base_url=ITSM_BASE, token="")
return await executor.execute(task["task_type"], task["payload_template"])
try:
loop = asyncio.new_event_loop()
result = loop.run_until_complete(_run())
loop.close()
except Exception as e:
result = {"status": "FAILED", "error": str(e)}
record.update(
status=result.get("status", "FAILED"),
result=result.get("response"),
error=result.get("error"),
completed_at=datetime.now().isoformat(),
)
_rpa_executions.append(record)
_rpa_tasks[task_id]["last_run"] = datetime.now().isoformat()
print(f"[RPA Scheduler] task_id={task_id} status={record['status']}")
def _get_service_token(user: User) -> str:
"""서비스 계정용 토큰 생성 (내부 API 호출용)."""
from core.auth import create_access_token
return create_access_token({"sub": user.username, "role": user.role})