zioinfo-mail/workspace/guardia-itsm/core/rpa_engine.py
DESKTOP-TKLFCPR\ython cfe2901a55 refactor(structure): consolidate all projects under workspace/
- itsm/    -> workspace/guardia-itsm/
- manager/ -> workspace/guardia-manager/
- app/     -> workspace/guardia-messenger/
- manual/  -> workspace/guardia-docs/

workspace/zioinfo-web/ unchanged.
git mv preserves full commit history.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-31 23:50:56 +09:00

416 lines
17 KiB
Python

"""
RPA Engine — 소스 기반 Validation 학습 + 자동화 실행 + 크론 스케줄러 연동
"""
from __future__ import annotations
import ast
import json
import re
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import httpx
# Project root (two levels above this file); models.py and routers/ are
# looked up relative to it.
BASE_DIR = Path(__file__).resolve().parents[1]
# File the learned rules are persisted to.
RULES_FILE = BASE_DIR.joinpath("rpa_rules.json")

# Class-name suffixes that mark a schema as an input model to learn from.
_INPUT_SUFFIXES = (
    "Create",
    "Update",
    "In",
    "Request",
    "Input",
    "Patch",
)
# Class-name suffixes of schemas that are always skipped.
_SKIP_SUFFIXES = (
    "Out",
    "Response",
    "Data",
    "Result",
    "Info",
    "Config",
    "Filter",
    "Query",
    "Report",
    "Summary",
    "Status",
)
# ── Enum 매핑 ────────────────────────────────────────────────────────────────
# Enum type name -> allowed member values; a field whose annotation mentions
# one of these names is learned as an "enum" field with these values.
ENUM_MAP: Dict[str, List[str]] = {
    "SRType": ["DEPLOY", "RESTART", "LOG", "INQUIRY", "OTHER"],
    "SRStatus": [
        "RECEIVED",
        "PARSED",
        "PENDING_APPROVAL",
        "APPROVED",
        "IN_PROGRESS",
        "PENDING_PM_VALIDATION",
        "COMPLETED",
        "FAILED_ROLLBACK",
        "REJECTED",
    ],
    "Priority": ["CRITICAL", "HIGH", "MEDIUM", "LOW"],
    "ApprovalResult": ["PENDING", "APPROVED", "REJECTED"],
    "Severity": ["CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO"],
    "ChangeType": ["STANDARD", "NORMAL", "EMERGENCY"],
    "ChangeStatus": [
        "DRAFT",
        "SUBMITTED",
        "APPROVED",
        "REJECTED",
        "IN_PROGRESS",
        "COMPLETED",
        "CANCELLED",
    ],
    "ProblemStatus": ["OPEN", "IN_ANALYSIS", "KNOWN_ERROR", "RESOLVED", "CLOSED"],
    "NetworkDeviceType": ["SWITCH", "ROUTER", "FIREWALL", "LOAD_BALANCER", "OTHER"],
    "DRStatus": ["STANDBY", "ACTIVE", "TESTING", "FAILED"],
    "RiskLevel": ["CRITICAL", "HIGH", "MEDIUM", "LOW"],
}
# ── Validation 학습 ─────────────────────────────────────────────────────────
class ValidationLearner:
    """Learn validation rules for Pydantic input schemas from project source.

    ``models.py`` is parsed with :mod:`ast` to collect the annotated fields of
    every input schema, ``routers/*.py`` is scanned to find the endpoint each
    schema is sent to, and the resulting rules are written to ``RULES_FILE``.
    """

    # Hand-maintained "METHOD /path" -> schema-class map.  The router scan
    # supplements it and wins when both know the same schema.
    _MANUAL_MAP: Dict[str, str] = {
        "POST /api/tasks": "SRCreate",
        "PATCH /api/tasks/status": "SRStatusUpdate",
        "POST /api/approvals": "ApprovalCreate",
        "POST /api/institutions": "InstitutionCreate",
        "PUT /api/institutions/{id}": "InstitutionUpdate",
        "POST /api/servers": "ServerCreate",
        "POST /api/incidents": "IncidentCreate",
        "POST /api/change": "RFCCreate",
        "POST /api/problems": "ProblemCreate",
        "POST /api/catalog": "ServiceCatalogCreate",
        "POST /api/kb": "KBDocumentCreate",
        "POST /api/shell-scripts": "ShellScriptCreate",
        "POST /api/ssh/exec": "SSHExecRequest",
    }

    # Decorator attribute names recognised as route registrations.
    _HTTP_METHODS = ("get", "post", "put", "patch", "delete")

    # ``Field(...)`` keyword arguments copied into a rule's ``constraints``.
    _CONSTRAINT_KEYS = ("min_length", "max_length")

    def learn_from_source(self) -> Dict[str, Any]:
        """Extract rules from source and persist them to ``RULES_FILE``.

        Steps: parse input schemas from ``models.py``, build the
        schema -> endpoint map (manual map overridden by the router scan,
        falling back to a name-based guess), then write the rules as JSON.

        Returns:
            A dict with the flat ``rules`` list, the learned ``schemas`` names
            and ``endpoint_count`` (number of distinct endpoints in the rules).

        Raises:
            OSError: ``models.py`` cannot be read or the rules file cannot be
                written.
            SyntaxError: ``models.py`` is not valid Python.
        """
        schemas = self._parse_models()
        # Later entries win, so router-scan results override the manual map.
        ep_map = {**self._invert_manual(), **self._scan_routers()}

        rules: List[Dict] = []
        for class_name, fields in schemas.items():
            if class_name in ep_map:
                endpoint = ep_map[class_name]
            else:
                endpoint = self._infer_endpoint(class_name)
            for field in fields:
                field["endpoint"] = endpoint
                rules.append(field)

        payload = {
            "learned_at": datetime.now().isoformat(),
            "schema_count": len(schemas),
            "rule_count": len(rules),
            "rules": rules,
        }
        RULES_FILE.write_text(
            json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8"
        )
        return {
            "rules": rules,
            "schemas": list(schemas.keys()),
            "endpoint_count": len({r["endpoint"] for r in rules}),
        }

    def _parse_models(self) -> Dict[str, List[Dict]]:
        """Return ``{schema class name: [field rule, ...]}`` for input schemas.

        Only classes that list ``BaseModel`` (bare or dotted, e.g.
        ``pydantic.BaseModel``) as a direct base are considered.  Names ending
        in a skip suffix are dropped, names must end in an input suffix, and
        schemas without any usable annotated field are omitted.
        """
        src = (BASE_DIR / "models.py").read_text(encoding="utf-8")
        schemas: Dict[str, List[Dict]] = {}
        for node in ast.walk(ast.parse(src)):
            if not isinstance(node, ast.ClassDef):
                continue
            bases = {
                getattr(b, "id", None) or getattr(b, "attr", "")
                for b in node.bases
            }
            if "BaseModel" not in bases:
                continue
            name = node.name
            if name.endswith(_SKIP_SUFFIXES) or not name.endswith(_INPUT_SUFFIXES):
                continue
            fields = []
            for item in node.body:
                if not isinstance(item, ast.AnnAssign):
                    continue
                rule = self._parse_field(item, name)
                if rule:
                    fields.append(rule)
            if fields:
                schemas[name] = fields
        return schemas

    @staticmethod
    def _unparse(node: ast.AST) -> str:
        """Return source text for *node*, or "" where ``ast.unparse`` is missing."""
        return ast.unparse(node) if hasattr(ast, "unparse") else ""

    def _parse_field(self, node: ast.AnnAssign, class_name: str) -> Optional[Dict]:
        """Build the rule dict for one annotated class attribute.

        Returns ``None`` for targets that are not plain names and for names
        starting with an underscore.
        """
        if not isinstance(node.target, ast.Name):
            return None
        field_name = node.target.id
        if field_name.startswith("_"):
            return None
        type_str = self._unparse(node.annotation)
        is_required, field_type, allowed, constraints = self._analyse_type(
            type_str, node.value
        )
        return {
            "schema_class": class_name,
            "field_name": field_name,
            "field_type": field_type,
            "is_required": is_required,
            "allowed_values": allowed,
            "constraints": constraints,
            "learned_at": datetime.now().isoformat(),
            "endpoint": "",  # filled in by learn_from_source
        }

    @staticmethod
    def _is_ellipsis(node: Any) -> bool:
        """Return True when *node* is the literal ``...``."""
        return isinstance(node, ast.Constant) and node.value is Ellipsis

    def _analyse_default(self, default_node: Any) -> Tuple[bool, Dict]:
        """Inspect a field's default expression.

        Returns:
            ``(has_default, constraints)``.  ``...`` and ``Field(...)`` /
            ``Field()`` without ``default``/``default_factory`` mean "no
            default" (the field stays required).  ``constraints`` holds the
            integer ``min_length`` / ``max_length`` keywords of a ``Field``
            call, if any.
        """
        if default_node is None or self._is_ellipsis(default_node):
            return False, {}
        if not isinstance(default_node, ast.Call):
            return True, {}
        func = default_node.func
        func_name = getattr(func, "id", None) or getattr(func, "attr", "")
        if func_name != "Field":
            return True, {}

        keywords = {kw.arg: kw.value for kw in default_node.keywords if kw.arg}
        default = default_node.args[0] if default_node.args else keywords.get("default")
        has_default = "default_factory" in keywords or (
            default is not None and not self._is_ellipsis(default)
        )
        constraints: Dict = {}
        for key in self._CONSTRAINT_KEYS:
            value = keywords.get(key)
            if (
                isinstance(value, ast.Constant)
                and isinstance(value.value, int)
                and not isinstance(value.value, bool)
            ):
                constraints[key] = value.value
        return has_default, constraints

    def _analyse_type(
        self, type_str: str, default_node: Any
    ) -> Tuple[bool, str, List[str], Dict]:
        """Derive ``(is_required, field_type, allowed, constraints)``.

        Args:
            type_str: Source text of the field annotation.
            default_node: AST node of the default value, or ``None``.
        """
        has_default, constraints = self._analyse_default(default_node)
        is_required = not has_default
        # Optional[X] is never required; continue with the inner type.
        if "Optional" in type_str:
            is_required = False
            type_str = re.sub(r"Optional\[(.+)\]", r"\1", type_str)
        allowed: List[str] = []

        # Decide on the outermost type first so that List[int] / List[SRType]
        # are learned as lists, not as a scalar int or a single enum value.
        outer = type_str.split("[", 1)[0].strip().rsplit(".", 1)[-1]
        if outer in ("List", "list"):
            return is_required, "list", allowed, constraints

        for enum_name, vals in ENUM_MAP.items():
            if enum_name in type_str:
                return is_required, "enum", vals, constraints

        lowered = type_str.lower()
        if "int" in type_str and "str" not in type_str:
            field_type = "int"
        elif "float" in type_str:
            field_type = "float"
        elif "bool" in type_str:
            field_type = "bool"
        elif "List" in type_str or "list" in type_str:
            field_type = "list"
        elif "datetime" in lowered or "date" in lowered:
            field_type = "datetime"
        else:
            field_type = "str"
        return is_required, field_type, allowed, constraints

    def _router_prefix(self, tree: ast.AST) -> str:
        """Return the constant ``prefix=`` of the last ``APIRouter(...)`` call, or ""."""
        prefix = ""
        for node in ast.walk(tree):
            if not isinstance(node, ast.Call):
                continue
            func_name = getattr(node.func, "id", None) or getattr(node.func, "attr", "")
            if func_name != "APIRouter":
                continue
            for kw in node.keywords:
                if kw.arg == "prefix" and isinstance(kw.value, ast.Constant):
                    prefix = kw.value.value
                    break
        return prefix

    def _scan_routers(self) -> Dict[str, str]:
        """Map schema class names to ``"METHOD /path"`` by scanning ``routers/*.py``.

        Both ``def`` and ``async def`` handlers are inspected.  The first
        endpoint found for a schema wins; files are visited in sorted order so
        the outcome does not depend on directory listing order.  Files that
        cannot be read or parsed are skipped.
        """
        schema_to_ep: Dict[str, str] = {}
        routers_dir = BASE_DIR / "routers"
        if not routers_dir.exists():
            return schema_to_ep
        for py_file in sorted(routers_dir.glob("*.py")):
            try:
                tree = ast.parse(py_file.read_text(encoding="utf-8"))
            except (OSError, SyntaxError, ValueError):
                # Best effort: an unreadable/unparsable router is ignored.
                continue
            prefix = self._router_prefix(tree)
            for node in ast.walk(tree):
                if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                    continue
                method, path = self._extract_route(node, prefix)
                if not method:
                    continue
                schema = self._extract_body_schema(node)
                if schema and schema not in schema_to_ep:
                    schema_to_ep[schema] = f"{method} {path}"
        return schema_to_ep

    def _extract_route(self, node: ast.AST, prefix: str) -> Tuple[str, str]:
        """Return ``(METHOD, prefix + path)`` from the first route decorator.

        A route decorator is a call whose attribute name is an HTTP method,
        e.g. ``@router.post("/x")``.  Returns ``("", "")`` when there is none.
        """
        for dec in node.decorator_list:
            if not isinstance(dec, ast.Call):
                continue
            attr = getattr(dec.func, "attr", "")
            if attr not in self._HTTP_METHODS:
                continue
            path = ""
            if dec.args and isinstance(dec.args[0], ast.Constant):
                path = dec.args[0].value
            else:
                for kw in dec.keywords:
                    if kw.arg == "path" and isinstance(kw.value, ast.Constant):
                        path = kw.value.value
            return attr.upper(), f"{prefix}{path}"
        return "", ""

    def _extract_body_schema(self, node: ast.AST) -> Optional[str]:
        """Return the class name of the first parameter typed as an input schema.

        A dotted annotation (``schemas.SRCreate``) is reduced to its last
        component so it matches the class name learned from ``models.py``.
        An annotation that *is* a bare suffix (e.g. ``Request``) is not a
        schema and is skipped, so it cannot hide a real body parameter.
        """
        for arg in [*node.args.args, *node.args.kwonlyargs]:
            if arg.annotation is None:
                continue
            name = self._unparse(arg.annotation).rsplit(".", 1)[-1]
            if any(name.endswith(s) and name != s for s in _INPUT_SUFFIXES):
                return name
        return None

    def _invert_manual(self) -> Dict[str, str]:
        """Return ``_MANUAL_MAP`` inverted to ``{schema class: "METHOD /path"}``."""
        return {schema: endpoint for endpoint, schema in self._MANUAL_MAP.items()}

    def _infer_endpoint(self, class_name: str) -> str:
        """Guess ``"METHOD /api/<name>s"`` from a schema class name.

        ``Update``/``Patch`` schemas map to PUT, everything else to POST; the
        input suffix is stripped and the remainder lower-cased.
        """
        method = "PUT" if class_name.endswith(("Update", "Patch")) else "POST"
        base = re.sub(r"(Create|Update|In|Request|Input|Patch)$", "", class_name).lower()
        return f"{method} /api/{base}s"
# ── 규칙 로드 (영속 파일) ────────────────────────────────────────────────────
def load_rules() -> Dict[str, List[Dict]]:
    """Read ``RULES_FILE`` and group its rules by their ``endpoint`` value.

    Returns an empty dict when the file does not exist or when anything goes
    wrong while reading or grouping it.  Rules without an ``endpoint`` key are
    grouped under the empty string.
    """
    grouped: Dict[str, List[Dict]] = {}
    if RULES_FILE.exists():
        try:
            stored = json.loads(RULES_FILE.read_text(encoding="utf-8"))
            for rule in stored.get("rules", []):
                key = rule.get("endpoint", "")
                if key not in grouped:
                    grouped[key] = []
                grouped[key].append(rule)
        except Exception:
            # Any failure discards partial results, as a missing file would.
            return {}
    return grouped
def save_rules(rules_by_ep: Dict[str, List[Dict]]) -> None:
    """Flatten the per-endpoint rule lists and write them to ``RULES_FILE``.

    The file receives a JSON object with the current timestamp
    (``learned_at``), the total ``rule_count`` and the flat ``rules`` list.
    """
    flattened: List[Dict] = []
    for endpoint_rules in rules_by_ep.values():
        flattened.extend(endpoint_rules)
    document = {
        "learned_at": datetime.now().isoformat(),
        "rule_count": len(flattened),
        "rules": flattened,
    }
    serialized = json.dumps(document, ensure_ascii=False, indent=2)
    RULES_FILE.write_text(serialized, encoding="utf-8")
# ── Validation 검증기 ────────────────────────────────────────────────────────
class RPAValidator:
    """Check a payload against a list of learned field rules."""

    def __init__(self, rules: List[Dict]):
        """Index *rules* by ``field_name``; a later rule replaces an earlier one."""
        indexed: Dict[str, Dict] = {}
        for rule in rules:
            indexed[rule["field_name"]] = rule
        self.rules = indexed

    def validate(self, payload: Dict[str, Any]) -> List[str]:
        """Return the list of validation messages for *payload* (empty if valid).

        Per field: a required value that is ``None`` or ``""`` yields one
        "required" message and nothing else; absent optional values are
        skipped; otherwise the enum / int / bool check for the field type runs,
        followed by the string length constraints.
        """
        problems: List[str] = []
        for name, spec in self.rules.items():
            value = payload.get(name)
            if spec["is_required"] and (value is None or value == ""):
                problems.append(f"[{name}] 필수 항목입니다.")
                continue
            if value is None:
                continue

            if spec["field_type"] == "enum" and spec["allowed_values"]:
                permitted = spec["allowed_values"]
                if value not in permitted:
                    problems.append(
                        f"[{name}] 허용값: {spec['allowed_values']} 중 하나여야 합니다. (입력: {value!r})"
                    )
            elif spec["field_type"] == "int":
                try:
                    int(value)
                except (TypeError, ValueError):
                    problems.append(f"[{name}] 정수 타입이어야 합니다.")
            elif spec["field_type"] == "bool":
                if not isinstance(value, bool):
                    problems.append(f"[{name}] 불리언(true/false) 타입이어야 합니다.")

            # Length limits only apply to string values; a falsy limit is off.
            limits = spec.get("constraints", {})
            upper = limits.get("max_length")
            if upper and isinstance(value, str) and len(value) > upper:
                problems.append(f"[{name}] 최대 {upper}자 초과.")
            lower = limits.get("min_length")
            if lower and isinstance(value, str) and len(value) < lower:
                problems.append(f"[{name}] 최소 {lower}자 이상 필요.")
        return problems
# ── RPA 실행 엔진 ────────────────────────────────────────────────────────────
# RPA task type -> (HTTP method, path template).  ``{name}`` placeholders in
# a template are filled from the task payload.
TASK_ENDPOINT_MAP: Dict[str, Tuple[str, str]] = dict(
    SR_CREATE=("POST", "/api/tasks"),
    SR_STATUS_UPDATE=("PATCH", "/api/tasks/{sr_id}/status"),
    APPROVAL_PROCESS=("POST", "/api/approvals"),
    INCIDENT_CREATE=("POST", "/api/incidents"),
    SHELL_EXEC=("POST", "/api/ssh/exec"),
    SR_BATCH_CREATE=("POST", "/api/tasks/batch"),
)
class RPAExecutor:
    """Call the ITSM API endpoint registered for a task in ``TASK_ENDPOINT_MAP``."""

    def __init__(self, base_url: str, token: str):
        """Store the API root (trailing slashes removed) and bearer-token headers."""
        self.base_url = base_url.rstrip("/")
        self.headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}

    @staticmethod
    def _decode_body(response: Any) -> Any:
        """Return the JSON body of *response*, or its raw text if it is not JSON."""
        try:
            return response.json()
        except ValueError:
            # Empty or non-JSON body (e.g. 204 No Content).
            return response.text

    async def execute(
        self,
        task_type: str,
        payload: Dict[str, Any],
        dry_run: bool = False,
        retry: int = 3,
    ) -> Dict[str, Any]:
        """Run one task and report the outcome as a dict; never raises on HTTP errors.

        Args:
            task_type: Key of ``TASK_ENDPOINT_MAP``.
            payload: JSON body; also supplies the path template placeholders.
            dry_run: When true, resolve the endpoint but send nothing.
            retry: Maximum number of attempts for 5xx / transport failures.

        Returns:
            A dict whose ``status`` is ``SUCCESS`` (status code < 300, body in
            ``response``), ``DRY_RUN_OK``, or ``FAILED`` (details in
            ``error``).  Responses below 500 are final and never retried.
        """
        # Kept local, as in the original code, but hoisted out of the loop.
        import asyncio

        if task_type not in TASK_ENDPOINT_MAP:
            return {"status": "FAILED", "error": f"알 수 없는 task_type: {task_type}"}
        method, path_tmpl = TASK_ENDPOINT_MAP[task_type]
        try:
            path = path_tmpl.format(**payload)
        except (KeyError, IndexError) as exc:
            # A placeholder such as {sr_id} has no value in the payload.
            return {
                "task_type": task_type,
                "status": "FAILED",
                "error": f"경로 파라미터 누락: {exc}",
            }

        result: Dict[str, Any] = {
            "task_type": task_type,
            "endpoint": f"{method} {path}",
            "dry_run": dry_run,
        }
        if dry_run:
            result.update(status="DRY_RUN_OK",
                          message="Validation 통과. dry_run=true이므로 실제 실행 생략.")
            return result

        url = f"{self.base_url}{path}"
        last_err: Optional[str] = None
        async with httpx.AsyncClient(timeout=30) as client:
            for attempt in range(1, retry + 1):
                try:
                    r = await client.request(method, url, json=payload, headers=self.headers)
                except Exception as exc:
                    # Broad on purpose: callers rely on a result dict, not an exception.
                    last_err = str(exc)
                else:
                    # Body decoding happens outside the try block so that an
                    # undecodable body can no longer trigger a re-send of a
                    # request the server has already processed.
                    if r.status_code < 300:
                        result.update(status="SUCCESS", response=self._decode_body(r))
                        return result
                    if r.status_code < 500:
                        result.update(status="FAILED", error=self._decode_body(r))
                        return result
                    last_err = f"HTTP {r.status_code}: {r.text[:200]}"
                if attempt < retry:
                    # Exponential backoff: 2s, 4s, ...
                    await asyncio.sleep(2 ** attempt)
        result.update(status="FAILED", error=f"{retry}회 재시도 후 실패: {last_err}")
        return result