[개선 내용]
1. 스키마 필터링: Out/Response/Data 제외 → Create/Update/In만 학습
- 140개 스키마 → 73개 입력 스키마, 1357개 → 672개 규칙 (노이즈 제거)
2. 라우터 자동 스캔: routers/*.py AST 파싱 → 엔드포인트-스키마 정확 매핑
3. 영속 저장: rpa_rules.json → 서비스 재시작 시 자동 복구
4. 서비스 시작 자동 학습: 규칙 파일 없을 때 즉시 학습
5. APScheduler 연동: schedule(cron) 설정 시 자동 크론 등록/해제
6. /api/rpa/status: 시스템 현황 요약 엔드포인트 추가
7. /api/rpa/validations/schemas: 스키마별 필드 수 조회
8. /api/rpa/tasks/{id}/toggle: 작업 활성/비활성 토글
[테스트 결과 - 전체 통과]
- T1 RPA 상태: 73 endpoints, 672 rules, 자동 학습 확인
- T4 dry_run 정상: validation_errors=[] ✓
- T5 오류 감지: 4개 오류 정확 (title 필수·enum 2개·requested_by 필수)
- T6 작업 등록: APScheduler 크론 등록 포함
- T7 등록 작업 실행: DRY_RUN_OK ✓
- T8 이력 조회: status 필터 정상
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
416 lines
17 KiB
Python
416 lines
17 KiB
Python
"""
|
|
RPA Engine — 소스 기반 Validation 학습 + 자동화 실행 + 크론 스케줄러 연동
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import ast
|
|
import json
|
|
import re
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
import httpx
|
|
|
|
BASE_DIR = Path(__file__).resolve().parent.parent # itsm/
|
|
RULES_FILE = BASE_DIR / "rpa_rules.json" # 학습 규칙 영속 파일
|
|
|
|
# 학습 대상 스키마: Create/Update/In/Request 접미사만 허용
|
|
_INPUT_SUFFIXES = ("Create", "Update", "In", "Request", "Input", "Patch")
|
|
# 제외 접미사
|
|
_SKIP_SUFFIXES = ("Out", "Response", "Data", "Result", "Info", "Config",
|
|
"Filter", "Query", "Report", "Summary", "Status")
|
|
|
|
|
|
# ── Enum 매핑 ────────────────────────────────────────────────────────────────
|
|
|
|
ENUM_MAP: Dict[str, List[str]] = {
|
|
"SRType": ["DEPLOY", "RESTART", "LOG", "INQUIRY", "OTHER"],
|
|
"SRStatus": ["RECEIVED","PARSED","PENDING_APPROVAL","APPROVED",
|
|
"IN_PROGRESS","PENDING_PM_VALIDATION","COMPLETED",
|
|
"FAILED_ROLLBACK","REJECTED"],
|
|
"Priority": ["CRITICAL", "HIGH", "MEDIUM", "LOW"],
|
|
"ApprovalResult": ["PENDING", "APPROVED", "REJECTED"],
|
|
"Severity": ["CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO"],
|
|
"ChangeType": ["STANDARD", "NORMAL", "EMERGENCY"],
|
|
"ChangeStatus": ["DRAFT", "SUBMITTED", "APPROVED", "REJECTED",
|
|
"IN_PROGRESS", "COMPLETED", "CANCELLED"],
|
|
"ProblemStatus": ["OPEN", "IN_ANALYSIS", "KNOWN_ERROR", "RESOLVED", "CLOSED"],
|
|
"NetworkDeviceType": ["SWITCH", "ROUTER", "FIREWALL", "LOAD_BALANCER", "OTHER"],
|
|
"DRStatus": ["STANDBY", "ACTIVE", "TESTING", "FAILED"],
|
|
"RiskLevel": ["CRITICAL", "HIGH", "MEDIUM", "LOW"],
|
|
}
|
|
|
|
|
|
# ── Validation 학습 ─────────────────────────────────────────────────────────
|
|
|
|
class ValidationLearner:
|
|
"""
|
|
models.py AST 파싱 + routers/ 스캔으로 Pydantic 입력 스키마 validation 규칙 추출.
|
|
결과는 rpa_rules.json에 영속 저장.
|
|
"""
|
|
|
|
# 수동 엔드포인트 → 스키마 매핑 (자동 스캔으로도 보완됨)
|
|
_MANUAL_MAP: Dict[str, str] = {
|
|
"POST /api/tasks": "SRCreate",
|
|
"PATCH /api/tasks/status": "SRStatusUpdate",
|
|
"POST /api/approvals": "ApprovalCreate",
|
|
"POST /api/institutions": "InstitutionCreate",
|
|
"PUT /api/institutions/{id}": "InstitutionUpdate",
|
|
"POST /api/servers": "ServerCreate",
|
|
"POST /api/incidents": "IncidentCreate",
|
|
"POST /api/change": "RFCCreate",
|
|
"POST /api/problems": "ProblemCreate",
|
|
"POST /api/catalog": "ServiceCatalogCreate",
|
|
"POST /api/kb": "KBDocumentCreate",
|
|
"POST /api/shell-scripts": "ShellScriptCreate",
|
|
"POST /api/ssh/exec": "SSHExecRequest",
|
|
}
|
|
|
|
def learn_from_source(self) -> Dict[str, Any]:
|
|
"""
|
|
1) models.py AST 파싱 → 입력 스키마만 추출
|
|
2) routers/ 스캔 → 엔드포인트-스키마 매핑 자동 보완
|
|
3) 결과를 rpa_rules.json에 저장
|
|
"""
|
|
# Step 1: 스키마 추출
|
|
schemas = self._parse_models()
|
|
|
|
# Step 2: 라우터 스캔으로 엔드포인트 매핑 자동 보완
|
|
router_map = self._scan_routers()
|
|
ep_map = {**self._invert_manual(), **router_map} # router 스캔 우선
|
|
|
|
# Step 3: 규칙 생성
|
|
rules: List[Dict] = []
|
|
for class_name, fields in schemas.items():
|
|
endpoint = ep_map.get(class_name, self._infer_endpoint(class_name))
|
|
for field in fields:
|
|
field["endpoint"] = endpoint
|
|
rules.append(field)
|
|
|
|
# Step 4: 영속 저장
|
|
payload = {
|
|
"learned_at": datetime.now().isoformat(),
|
|
"schema_count": len(schemas),
|
|
"rule_count": len(rules),
|
|
"rules": rules,
|
|
}
|
|
RULES_FILE.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
|
|
|
|
return {
|
|
"rules": rules,
|
|
"schemas": list(schemas.keys()),
|
|
"endpoint_count": len(set(r["endpoint"] for r in rules)),
|
|
}
|
|
|
|
def _parse_models(self) -> Dict[str, List[Dict]]:
|
|
"""models.py에서 입력 스키마 클래스만 파싱."""
|
|
src = (BASE_DIR / "models.py").read_text(encoding="utf-8")
|
|
tree = ast.parse(src)
|
|
schemas: Dict[str, List[Dict]] = {}
|
|
|
|
for node in ast.walk(tree):
|
|
if not isinstance(node, ast.ClassDef):
|
|
continue
|
|
bases = [getattr(b, "id", "") for b in node.bases]
|
|
if "BaseModel" not in bases:
|
|
continue
|
|
|
|
name = node.name
|
|
# Out/Response 등 제외
|
|
if any(name.endswith(s) for s in _SKIP_SUFFIXES):
|
|
continue
|
|
# Create/Update 등만 허용
|
|
if not any(name.endswith(s) for s in _INPUT_SUFFIXES):
|
|
continue
|
|
|
|
fields = []
|
|
for item in node.body:
|
|
if not isinstance(item, ast.AnnAssign):
|
|
continue
|
|
f = self._parse_field(item, name)
|
|
if f:
|
|
fields.append(f)
|
|
|
|
if fields:
|
|
schemas[name] = fields
|
|
|
|
return schemas
|
|
|
|
def _parse_field(self, node: ast.AnnAssign, class_name: str) -> Optional[Dict]:
|
|
if not isinstance(node.target, ast.Name):
|
|
return None
|
|
field_name = node.target.id
|
|
if field_name.startswith("_"):
|
|
return None
|
|
|
|
type_str = ast.unparse(node.annotation) if hasattr(ast, "unparse") else ""
|
|
is_required, field_type, allowed, constraints = self._analyse_type(type_str, node.value)
|
|
|
|
return {
|
|
"schema_class": class_name,
|
|
"field_name": field_name,
|
|
"field_type": field_type,
|
|
"is_required": is_required,
|
|
"allowed_values": allowed,
|
|
"constraints": constraints,
|
|
"learned_at": datetime.now().isoformat(),
|
|
"endpoint": "", # 후에 채워짐
|
|
}
|
|
|
|
def _analyse_type(
|
|
self, type_str: str, default_node: Any
|
|
) -> Tuple[bool, str, List[str], Dict]:
|
|
"""타입 문자열 + AST 기본값 노드에서 (is_required, field_type, allowed, constraints) 반환."""
|
|
# default가 있으면 required=False
|
|
is_required = default_node is None
|
|
# Optional[X] → required=False, 내부 타입 추출
|
|
if "Optional" in type_str:
|
|
is_required = False
|
|
type_str = re.sub(r"Optional\[(.+)\]", r"\1", type_str)
|
|
|
|
allowed: List[str] = []
|
|
constraints: Dict = {}
|
|
|
|
# Enum 매핑
|
|
for enum_name, vals in ENUM_MAP.items():
|
|
if enum_name in type_str:
|
|
return is_required, "enum", vals, constraints
|
|
|
|
# 기본 타입
|
|
if "int" in type_str and "str" not in type_str:
|
|
field_type = "int"
|
|
elif "float" in type_str:
|
|
field_type = "float"
|
|
elif "bool" in type_str:
|
|
field_type = "bool"
|
|
elif "List" in type_str or "list" in type_str:
|
|
field_type = "list"
|
|
elif "datetime" in type_str.lower() or "date" in type_str.lower():
|
|
field_type = "datetime"
|
|
else:
|
|
field_type = "str"
|
|
|
|
return is_required, field_type, allowed, constraints
|
|
|
|
def _scan_routers(self) -> Dict[str, str]:
|
|
"""
|
|
routers/*.py에서 @router.post/put/patch 데코레이터와
|
|
Body 파라미터 타입 힌트를 스캔해 {SchemaClass: "METHOD /path"} 반환.
|
|
"""
|
|
schema_to_ep: Dict[str, str] = {}
|
|
routers_dir = BASE_DIR / "routers"
|
|
if not routers_dir.exists():
|
|
return schema_to_ep
|
|
|
|
for py_file in routers_dir.glob("*.py"):
|
|
try:
|
|
src = py_file.read_text(encoding="utf-8")
|
|
tree = ast.parse(src)
|
|
except Exception:
|
|
continue
|
|
|
|
# prefix 추출 (APIRouter(prefix="/api/xxx"))
|
|
prefix = ""
|
|
for node in ast.walk(tree):
|
|
if isinstance(node, ast.Call):
|
|
func = getattr(node, "func", None)
|
|
if func and getattr(func, "id", "") == "APIRouter":
|
|
for kw in node.keywords:
|
|
if kw.arg == "prefix" and isinstance(kw.value, ast.Constant):
|
|
prefix = kw.value.value
|
|
break
|
|
|
|
# 함수 → 데코레이터 + 파라미터 분석
|
|
for node in ast.walk(tree):
|
|
if not isinstance(node, ast.FunctionDef):
|
|
continue
|
|
method, path = self._extract_route(node, prefix)
|
|
if not method:
|
|
continue
|
|
schema = self._extract_body_schema(node)
|
|
if schema and schema not in schema_to_ep:
|
|
schema_to_ep[schema] = f"{method} {path}"
|
|
|
|
return schema_to_ep
|
|
|
|
def _extract_route(self, node: ast.FunctionDef, prefix: str) -> Tuple[str, str]:
|
|
for dec in node.decorator_list:
|
|
call = dec if isinstance(dec, ast.Call) else None
|
|
if not call:
|
|
continue
|
|
attr = getattr(call.func, "attr", "")
|
|
method = attr.upper() if attr in ("get","post","put","patch","delete") else ""
|
|
if not method:
|
|
continue
|
|
path = ""
|
|
if call.args and isinstance(call.args[0], ast.Constant):
|
|
path = call.args[0].value
|
|
elif call.keywords:
|
|
for kw in call.keywords:
|
|
if kw.arg == "path" and isinstance(kw.value, ast.Constant):
|
|
path = kw.value.value
|
|
return method, f"{prefix}{path}"
|
|
return "", ""
|
|
|
|
def _extract_body_schema(self, node: ast.FunctionDef) -> Optional[str]:
|
|
"""함수 파라미터에서 BaseModel 서브클래스 body 파라미터 타입 추출."""
|
|
for arg in node.args.args:
|
|
if arg.annotation is None:
|
|
continue
|
|
type_str = ast.unparse(arg.annotation) if hasattr(ast, "unparse") else ""
|
|
# 단순 이름이면서 Create/Update/In 으로 끝나는 경우
|
|
if any(type_str.endswith(s) for s in _INPUT_SUFFIXES):
|
|
return type_str
|
|
return None
|
|
|
|
def _invert_manual(self) -> Dict[str, str]:
|
|
return {v: k for k, v in self._MANUAL_MAP.items()}
|
|
|
|
def _infer_endpoint(self, class_name: str) -> str:
|
|
"""스키마명에서 엔드포인트 자동 추론."""
|
|
method = "POST"
|
|
if class_name.endswith("Update") or class_name.endswith("Patch"):
|
|
method = "PUT"
|
|
base = re.sub(r"(Create|Update|In|Request|Input|Patch)$", "", class_name).lower()
|
|
return f"{method} /api/{base}s"
|
|
|
|
|
|
# ── 규칙 로드 (영속 파일) ────────────────────────────────────────────────────
|
|
|
|
def load_rules() -> Dict[str, List[Dict]]:
|
|
"""rpa_rules.json에서 규칙 로드. 없으면 빈 dict."""
|
|
if not RULES_FILE.exists():
|
|
return {}
|
|
try:
|
|
data = json.loads(RULES_FILE.read_text(encoding="utf-8"))
|
|
rules_by_ep: Dict[str, List[Dict]] = {}
|
|
for r in data.get("rules", []):
|
|
ep = r.get("endpoint", "")
|
|
rules_by_ep.setdefault(ep, []).append(r)
|
|
return rules_by_ep
|
|
except Exception:
|
|
return {}
|
|
|
|
|
|
def save_rules(rules_by_ep: Dict[str, List[Dict]]) -> None:
|
|
"""규칙 dict를 rpa_rules.json에 저장."""
|
|
all_rules = [r for rules in rules_by_ep.values() for r in rules]
|
|
payload = {
|
|
"learned_at": datetime.now().isoformat(),
|
|
"rule_count": len(all_rules),
|
|
"rules": all_rules,
|
|
}
|
|
RULES_FILE.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
|
|
|
|
|
|
# ── Validation 검증기 ────────────────────────────────────────────────────────
|
|
|
|
class RPAValidator:
|
|
"""학습된 규칙으로 payload 검증."""
|
|
|
|
def __init__(self, rules: List[Dict]):
|
|
self.rules = {r["field_name"]: r for r in rules}
|
|
|
|
def validate(self, payload: Dict[str, Any]) -> List[str]:
|
|
errors: List[str] = []
|
|
for field_name, rule in self.rules.items():
|
|
val = payload.get(field_name)
|
|
|
|
if rule["is_required"] and (val is None or val == ""):
|
|
errors.append(f"[{field_name}] 필수 항목입니다.")
|
|
continue
|
|
|
|
if val is None:
|
|
continue
|
|
|
|
if rule["field_type"] == "enum" and rule["allowed_values"]:
|
|
if val not in rule["allowed_values"]:
|
|
errors.append(
|
|
f"[{field_name}] 허용값: {rule['allowed_values']} 중 하나여야 합니다. (입력: {val!r})"
|
|
)
|
|
elif rule["field_type"] == "int":
|
|
try:
|
|
int(val)
|
|
except (TypeError, ValueError):
|
|
errors.append(f"[{field_name}] 정수 타입이어야 합니다.")
|
|
elif rule["field_type"] == "bool":
|
|
if not isinstance(val, bool):
|
|
errors.append(f"[{field_name}] 불리언(true/false) 타입이어야 합니다.")
|
|
|
|
c = rule.get("constraints", {})
|
|
if c.get("max_length") and isinstance(val, str) and len(val) > c["max_length"]:
|
|
errors.append(f"[{field_name}] 최대 {c['max_length']}자 초과.")
|
|
if c.get("min_length") and isinstance(val, str) and len(val) < c["min_length"]:
|
|
errors.append(f"[{field_name}] 최소 {c['min_length']}자 이상 필요.")
|
|
|
|
return errors
|
|
|
|
|
|
# ── RPA 실행 엔진 ────────────────────────────────────────────────────────────
|
|
|
|
TASK_ENDPOINT_MAP: Dict[str, Tuple[str, str]] = {
|
|
"SR_CREATE": ("POST", "/api/tasks"),
|
|
"SR_STATUS_UPDATE": ("PATCH", "/api/tasks/{sr_id}/status"),
|
|
"APPROVAL_PROCESS": ("POST", "/api/approvals"),
|
|
"INCIDENT_CREATE": ("POST", "/api/incidents"),
|
|
"SHELL_EXEC": ("POST", "/api/ssh/exec"),
|
|
"SR_BATCH_CREATE": ("POST", "/api/tasks/batch"),
|
|
}
|
|
|
|
|
|
class RPAExecutor:
|
|
"""학습 규칙 기반 ITSM API 자동 호출."""
|
|
|
|
def __init__(self, base_url: str, token: str):
|
|
self.base_url = base_url.rstrip("/")
|
|
self.headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
|
|
|
|
async def execute(
|
|
self,
|
|
task_type: str,
|
|
payload: Dict[str, Any],
|
|
dry_run: bool = False,
|
|
retry: int = 3,
|
|
) -> Dict[str, Any]:
|
|
if task_type not in TASK_ENDPOINT_MAP:
|
|
return {"status": "FAILED", "error": f"알 수 없는 task_type: {task_type}"}
|
|
|
|
method, path_tmpl = TASK_ENDPOINT_MAP[task_type]
|
|
path = path_tmpl.format(**payload)
|
|
result: Dict[str, Any] = {
|
|
"task_type": task_type,
|
|
"endpoint": f"{method} {path}",
|
|
"dry_run": dry_run,
|
|
}
|
|
|
|
if dry_run:
|
|
result.update(status="DRY_RUN_OK",
|
|
message="Validation 통과. dry_run=true이므로 실제 실행 생략.")
|
|
return result
|
|
|
|
url = f"{self.base_url}{path}"
|
|
last_err: Optional[str] = None
|
|
|
|
async with httpx.AsyncClient(timeout=30) as client:
|
|
for attempt in range(1, retry + 1):
|
|
try:
|
|
r = await client.request(method, url, json=payload, headers=self.headers)
|
|
if r.status_code < 300:
|
|
result.update(status="SUCCESS", response=r.json())
|
|
return result
|
|
elif r.status_code < 500:
|
|
result.update(status="FAILED", error=r.json())
|
|
return result
|
|
else:
|
|
last_err = f"HTTP {r.status_code}: {r.text[:200]}"
|
|
except Exception as e:
|
|
last_err = str(e)
|
|
|
|
if attempt < retry:
|
|
import asyncio
|
|
await asyncio.sleep(2 ** attempt)
|
|
|
|
result.update(status="FAILED", error=f"{retry}회 재시도 후 실패: {last_err}")
|
|
return result
|