""" RPA Engine — 소스 기반 Validation 학습 + 자동화 실행 + 크론 스케줄러 연동 """ from __future__ import annotations import ast import json import re from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional, Tuple import httpx BASE_DIR = Path(__file__).resolve().parent.parent # itsm/ RULES_FILE = BASE_DIR / "rpa_rules.json" # 학습 규칙 영속 파일 # 학습 대상 스키마: Create/Update/In/Request 접미사만 허용 _INPUT_SUFFIXES = ("Create", "Update", "In", "Request", "Input", "Patch") # 제외 접미사 _SKIP_SUFFIXES = ("Out", "Response", "Data", "Result", "Info", "Config", "Filter", "Query", "Report", "Summary", "Status") # ── Enum 매핑 ──────────────────────────────────────────────────────────────── ENUM_MAP: Dict[str, List[str]] = { "SRType": ["DEPLOY", "RESTART", "LOG", "INQUIRY", "OTHER"], "SRStatus": ["RECEIVED","PARSED","PENDING_APPROVAL","APPROVED", "IN_PROGRESS","PENDING_PM_VALIDATION","COMPLETED", "FAILED_ROLLBACK","REJECTED"], "Priority": ["CRITICAL", "HIGH", "MEDIUM", "LOW"], "ApprovalResult": ["PENDING", "APPROVED", "REJECTED"], "Severity": ["CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO"], "ChangeType": ["STANDARD", "NORMAL", "EMERGENCY"], "ChangeStatus": ["DRAFT", "SUBMITTED", "APPROVED", "REJECTED", "IN_PROGRESS", "COMPLETED", "CANCELLED"], "ProblemStatus": ["OPEN", "IN_ANALYSIS", "KNOWN_ERROR", "RESOLVED", "CLOSED"], "NetworkDeviceType": ["SWITCH", "ROUTER", "FIREWALL", "LOAD_BALANCER", "OTHER"], "DRStatus": ["STANDBY", "ACTIVE", "TESTING", "FAILED"], "RiskLevel": ["CRITICAL", "HIGH", "MEDIUM", "LOW"], } # ── Validation 학습 ───────────────────────────────────────────────────────── class ValidationLearner: """ models.py AST 파싱 + routers/ 스캔으로 Pydantic 입력 스키마 validation 규칙 추출. 결과는 rpa_rules.json에 영속 저장. """ # 수동 엔드포인트 → 스키마 매핑 (자동 스캔으로도 보완됨) _MANUAL_MAP: Dict[str, str] = { "POST /api/tasks": "SRCreate", "PATCH /api/tasks/status": "SRStatusUpdate", "POST /api/approvals": "ApprovalCreate", "POST /api/institutions": "InstitutionCreate", "PUT /api/institutions/{id}": "InstitutionUpdate", "POST /api/servers": "ServerCreate", "POST /api/incidents": "IncidentCreate", "POST /api/change": "RFCCreate", "POST /api/problems": "ProblemCreate", "POST /api/catalog": "ServiceCatalogCreate", "POST /api/kb": "KBDocumentCreate", "POST /api/shell-scripts": "ShellScriptCreate", "POST /api/ssh/exec": "SSHExecRequest", } def learn_from_source(self) -> Dict[str, Any]: """ 1) models.py AST 파싱 → 입력 스키마만 추출 2) routers/ 스캔 → 엔드포인트-스키마 매핑 자동 보완 3) 결과를 rpa_rules.json에 저장 """ # Step 1: 스키마 추출 schemas = self._parse_models() # Step 2: 라우터 스캔으로 엔드포인트 매핑 자동 보완 router_map = self._scan_routers() ep_map = {**self._invert_manual(), **router_map} # router 스캔 우선 # Step 3: 규칙 생성 rules: List[Dict] = [] for class_name, fields in schemas.items(): endpoint = ep_map.get(class_name, self._infer_endpoint(class_name)) for field in fields: field["endpoint"] = endpoint rules.append(field) # Step 4: 영속 저장 payload = { "learned_at": datetime.now().isoformat(), "schema_count": len(schemas), "rule_count": len(rules), "rules": rules, } RULES_FILE.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") return { "rules": rules, "schemas": list(schemas.keys()), "endpoint_count": len(set(r["endpoint"] for r in rules)), } def _parse_models(self) -> Dict[str, List[Dict]]: """models.py에서 입력 스키마 클래스만 파싱.""" src = (BASE_DIR / "models.py").read_text(encoding="utf-8") tree = ast.parse(src) schemas: Dict[str, List[Dict]] = {} for node in ast.walk(tree): if not isinstance(node, ast.ClassDef): continue bases = [getattr(b, "id", "") for b in node.bases] if "BaseModel" not in bases: continue name = node.name # Out/Response 등 제외 if any(name.endswith(s) for s in _SKIP_SUFFIXES): continue # Create/Update 등만 허용 if not any(name.endswith(s) for s in _INPUT_SUFFIXES): continue fields = [] for item in node.body: if not isinstance(item, ast.AnnAssign): continue f = self._parse_field(item, name) if f: fields.append(f) if fields: schemas[name] = fields return schemas def _parse_field(self, node: ast.AnnAssign, class_name: str) -> Optional[Dict]: if not isinstance(node.target, ast.Name): return None field_name = node.target.id if field_name.startswith("_"): return None type_str = ast.unparse(node.annotation) if hasattr(ast, "unparse") else "" is_required, field_type, allowed, constraints = self._analyse_type(type_str, node.value) return { "schema_class": class_name, "field_name": field_name, "field_type": field_type, "is_required": is_required, "allowed_values": allowed, "constraints": constraints, "learned_at": datetime.now().isoformat(), "endpoint": "", # 후에 채워짐 } def _analyse_type( self, type_str: str, default_node: Any ) -> Tuple[bool, str, List[str], Dict]: """타입 문자열 + AST 기본값 노드에서 (is_required, field_type, allowed, constraints) 반환.""" # default가 있으면 required=False is_required = default_node is None # Optional[X] → required=False, 내부 타입 추출 if "Optional" in type_str: is_required = False type_str = re.sub(r"Optional\[(.+)\]", r"\1", type_str) allowed: List[str] = [] constraints: Dict = {} # Enum 매핑 for enum_name, vals in ENUM_MAP.items(): if enum_name in type_str: return is_required, "enum", vals, constraints # 기본 타입 if "int" in type_str and "str" not in type_str: field_type = "int" elif "float" in type_str: field_type = "float" elif "bool" in type_str: field_type = "bool" elif "List" in type_str or "list" in type_str: field_type = "list" elif "datetime" in type_str.lower() or "date" in type_str.lower(): field_type = "datetime" else: field_type = "str" return is_required, field_type, allowed, constraints def _scan_routers(self) -> Dict[str, str]: """ routers/*.py에서 @router.post/put/patch 데코레이터와 Body 파라미터 타입 힌트를 스캔해 {SchemaClass: "METHOD /path"} 반환. """ schema_to_ep: Dict[str, str] = {} routers_dir = BASE_DIR / "routers" if not routers_dir.exists(): return schema_to_ep for py_file in routers_dir.glob("*.py"): try: src = py_file.read_text(encoding="utf-8") tree = ast.parse(src) except Exception: continue # prefix 추출 (APIRouter(prefix="/api/xxx")) prefix = "" for node in ast.walk(tree): if isinstance(node, ast.Call): func = getattr(node, "func", None) if func and getattr(func, "id", "") == "APIRouter": for kw in node.keywords: if kw.arg == "prefix" and isinstance(kw.value, ast.Constant): prefix = kw.value.value break # 함수 → 데코레이터 + 파라미터 분석 for node in ast.walk(tree): if not isinstance(node, ast.FunctionDef): continue method, path = self._extract_route(node, prefix) if not method: continue schema = self._extract_body_schema(node) if schema and schema not in schema_to_ep: schema_to_ep[schema] = f"{method} {path}" return schema_to_ep def _extract_route(self, node: ast.FunctionDef, prefix: str) -> Tuple[str, str]: for dec in node.decorator_list: call = dec if isinstance(dec, ast.Call) else None if not call: continue attr = getattr(call.func, "attr", "") method = attr.upper() if attr in ("get","post","put","patch","delete") else "" if not method: continue path = "" if call.args and isinstance(call.args[0], ast.Constant): path = call.args[0].value elif call.keywords: for kw in call.keywords: if kw.arg == "path" and isinstance(kw.value, ast.Constant): path = kw.value.value return method, f"{prefix}{path}" return "", "" def _extract_body_schema(self, node: ast.FunctionDef) -> Optional[str]: """함수 파라미터에서 BaseModel 서브클래스 body 파라미터 타입 추출.""" for arg in node.args.args: if arg.annotation is None: continue type_str = ast.unparse(arg.annotation) if hasattr(ast, "unparse") else "" # 단순 이름이면서 Create/Update/In 으로 끝나는 경우 if any(type_str.endswith(s) for s in _INPUT_SUFFIXES): return type_str return None def _invert_manual(self) -> Dict[str, str]: return {v: k for k, v in self._MANUAL_MAP.items()} def _infer_endpoint(self, class_name: str) -> str: """스키마명에서 엔드포인트 자동 추론.""" method = "POST" if class_name.endswith("Update") or class_name.endswith("Patch"): method = "PUT" base = re.sub(r"(Create|Update|In|Request|Input|Patch)$", "", class_name).lower() return f"{method} /api/{base}s" # ── 규칙 로드 (영속 파일) ──────────────────────────────────────────────────── def load_rules() -> Dict[str, List[Dict]]: """rpa_rules.json에서 규칙 로드. 없으면 빈 dict.""" if not RULES_FILE.exists(): return {} try: data = json.loads(RULES_FILE.read_text(encoding="utf-8")) rules_by_ep: Dict[str, List[Dict]] = {} for r in data.get("rules", []): ep = r.get("endpoint", "") rules_by_ep.setdefault(ep, []).append(r) return rules_by_ep except Exception: return {} def save_rules(rules_by_ep: Dict[str, List[Dict]]) -> None: """규칙 dict를 rpa_rules.json에 저장.""" all_rules = [r for rules in rules_by_ep.values() for r in rules] payload = { "learned_at": datetime.now().isoformat(), "rule_count": len(all_rules), "rules": all_rules, } RULES_FILE.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") # ── Validation 검증기 ──────────────────────────────────────────────────────── class RPAValidator: """학습된 규칙으로 payload 검증.""" def __init__(self, rules: List[Dict]): self.rules = {r["field_name"]: r for r in rules} def validate(self, payload: Dict[str, Any]) -> List[str]: errors: List[str] = [] for field_name, rule in self.rules.items(): val = payload.get(field_name) if rule["is_required"] and (val is None or val == ""): errors.append(f"[{field_name}] 필수 항목입니다.") continue if val is None: continue if rule["field_type"] == "enum" and rule["allowed_values"]: if val not in rule["allowed_values"]: errors.append( f"[{field_name}] 허용값: {rule['allowed_values']} 중 하나여야 합니다. (입력: {val!r})" ) elif rule["field_type"] == "int": try: int(val) except (TypeError, ValueError): errors.append(f"[{field_name}] 정수 타입이어야 합니다.") elif rule["field_type"] == "bool": if not isinstance(val, bool): errors.append(f"[{field_name}] 불리언(true/false) 타입이어야 합니다.") c = rule.get("constraints", {}) if c.get("max_length") and isinstance(val, str) and len(val) > c["max_length"]: errors.append(f"[{field_name}] 최대 {c['max_length']}자 초과.") if c.get("min_length") and isinstance(val, str) and len(val) < c["min_length"]: errors.append(f"[{field_name}] 최소 {c['min_length']}자 이상 필요.") return errors # ── RPA 실행 엔진 ──────────────────────────────────────────────────────────── TASK_ENDPOINT_MAP: Dict[str, Tuple[str, str]] = { "SR_CREATE": ("POST", "/api/tasks"), "SR_STATUS_UPDATE": ("PATCH", "/api/tasks/{sr_id}/status"), "APPROVAL_PROCESS": ("POST", "/api/approvals"), "INCIDENT_CREATE": ("POST", "/api/incidents"), "SHELL_EXEC": ("POST", "/api/ssh/exec"), "SR_BATCH_CREATE": ("POST", "/api/tasks/batch"), } class RPAExecutor: """학습 규칙 기반 ITSM API 자동 호출.""" def __init__(self, base_url: str, token: str): self.base_url = base_url.rstrip("/") self.headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} async def execute( self, task_type: str, payload: Dict[str, Any], dry_run: bool = False, retry: int = 3, ) -> Dict[str, Any]: if task_type not in TASK_ENDPOINT_MAP: return {"status": "FAILED", "error": f"알 수 없는 task_type: {task_type}"} method, path_tmpl = TASK_ENDPOINT_MAP[task_type] path = path_tmpl.format(**payload) result: Dict[str, Any] = { "task_type": task_type, "endpoint": f"{method} {path}", "dry_run": dry_run, } if dry_run: result.update(status="DRY_RUN_OK", message="Validation 통과. dry_run=true이므로 실제 실행 생략.") return result url = f"{self.base_url}{path}" last_err: Optional[str] = None async with httpx.AsyncClient(timeout=30) as client: for attempt in range(1, retry + 1): try: r = await client.request(method, url, json=payload, headers=self.headers) if r.status_code < 300: result.update(status="SUCCESS", response=r.json()) return result elif r.status_code < 500: result.update(status="FAILED", error=r.json()) return result else: last_err = f"HTTP {r.status_code}: {r.text[:200]}" except Exception as e: last_err = str(e) if attempt < retry: import asyncio await asyncio.sleep(2 ** attempt) result.update(status="FAILED", error=f"{retry}회 재시도 후 실패: {last_err}") return result