diff --git a/itsm/core/rpa_engine.py b/itsm/core/rpa_engine.py index a6247eb3..3db7884a 100644 --- a/itsm/core/rpa_engine.py +++ b/itsm/core/rpa_engine.py @@ -1,218 +1,370 @@ """ -RPA Engine — 소스 기반 Validation 학습 + 자동화 실행 +RPA Engine — 소스 기반 Validation 학습 + 자동화 실행 + 크론 스케줄러 연동 """ from __future__ import annotations import ast -import inspect -import importlib +import json import re from datetime import datetime from pathlib import Path -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Tuple import httpx -from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy import select, delete BASE_DIR = Path(__file__).resolve().parent.parent # itsm/ +RULES_FILE = BASE_DIR / "rpa_rules.json" # 학습 규칙 영속 파일 + +# 학습 대상 스키마: Create/Update/In/Request 접미사만 허용 +_INPUT_SUFFIXES = ("Create", "Update", "In", "Request", "Input", "Patch") +# 제외 접미사 +_SKIP_SUFFIXES = ("Out", "Response", "Data", "Result", "Info", "Config", + "Filter", "Query", "Report", "Summary", "Status") + + +# ── Enum 매핑 ──────────────────────────────────────────────────────────────── + +ENUM_MAP: Dict[str, List[str]] = { + "SRType": ["DEPLOY", "RESTART", "LOG", "INQUIRY", "OTHER"], + "SRStatus": ["RECEIVED","PARSED","PENDING_APPROVAL","APPROVED", + "IN_PROGRESS","PENDING_PM_VALIDATION","COMPLETED", + "FAILED_ROLLBACK","REJECTED"], + "Priority": ["CRITICAL", "HIGH", "MEDIUM", "LOW"], + "ApprovalResult": ["PENDING", "APPROVED", "REJECTED"], + "Severity": ["CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO"], + "ChangeType": ["STANDARD", "NORMAL", "EMERGENCY"], + "ChangeStatus": ["DRAFT", "SUBMITTED", "APPROVED", "REJECTED", + "IN_PROGRESS", "COMPLETED", "CANCELLED"], + "ProblemStatus": ["OPEN", "IN_ANALYSIS", "KNOWN_ERROR", "RESOLVED", "CLOSED"], + "NetworkDeviceType": ["SWITCH", "ROUTER", "FIREWALL", "LOAD_BALANCER", "OTHER"], + "DRStatus": ["STANDBY", "ACTIVE", "TESTING", "FAILED"], + "RiskLevel": ["CRITICAL", "HIGH", "MEDIUM", "LOW"], +} # ── Validation 학습 ───────────────────────────────────────────────────────── class ValidationLearner: - """프로젝트 소스(models.py)를 AST 파싱하여 Pydantic 스키마 validation 규칙 추출.""" + """ + models.py AST 파싱 + routers/ 스캔으로 Pydantic 입력 스키마 validation 규칙 추출. + 결과는 rpa_rules.json에 영속 저장. + """ - ENUM_MAP: Dict[str, List[str]] = { - "SRType": ["DEPLOY", "RESTART", "LOG", "INQUIRY", "OTHER"], - "SRStatus": ["RECEIVED","PARSED","PENDING_APPROVAL","APPROVED", - "IN_PROGRESS","PENDING_PM_VALIDATION","COMPLETED", - "FAILED_ROLLBACK","REJECTED"], - "Priority": ["CRITICAL", "HIGH", "MEDIUM", "LOW"], - "ApprovalResult":["PENDING", "APPROVED", "REJECTED"], - } - - # 엔드포인트 → 스키마 매핑 (routers/ 분석으로 자동 보완) - ENDPOINT_SCHEMA: Dict[str, str] = { - "POST /api/tasks": "SRCreate", - "PATCH /api/tasks/status": "SRStatusUpdate", - "POST /api/approvals": "ApprovalCreate", - "POST /api/institutions": "InstitutionCreate", - "PUT /api/institutions": "InstitutionCreate", - "POST /api/servers": "ServerCreate", - "POST /api/incidents": "IncidentCreate", - "POST /api/change": "RFCCreate", - "POST /api/problems": "ProblemCreate", - "POST /api/catalog": "ServiceCatalogCreate", + # 수동 엔드포인트 → 스키마 매핑 (자동 스캔으로도 보완됨) + _MANUAL_MAP: Dict[str, str] = { + "POST /api/tasks": "SRCreate", + "PATCH /api/tasks/status": "SRStatusUpdate", + "POST /api/approvals": "ApprovalCreate", + "POST /api/institutions": "InstitutionCreate", + "PUT /api/institutions/{id}": "InstitutionUpdate", + "POST /api/servers": "ServerCreate", + "POST /api/incidents": "IncidentCreate", + "POST /api/change": "RFCCreate", + "POST /api/problems": "ProblemCreate", + "POST /api/catalog": "ServiceCatalogCreate", + "POST /api/kb": "KBDocumentCreate", + "POST /api/shell-scripts": "ShellScriptCreate", + "POST /api/ssh/exec": "SSHExecRequest", } def learn_from_source(self) -> Dict[str, Any]: - """models.py AST 파싱으로 validation 규칙 추출.""" - models_path = BASE_DIR / "models.py" - source = models_path.read_text(encoding="utf-8") - tree = ast.parse(source) + """ + 1) models.py AST 파싱 → 입력 스키마만 추출 + 2) routers/ 스캔 → 엔드포인트-스키마 매핑 자동 보완 + 3) 결과를 rpa_rules.json에 저장 + """ + # Step 1: 스키마 추출 + schemas = self._parse_models() + # Step 2: 라우터 스캔으로 엔드포인트 매핑 자동 보완 + router_map = self._scan_routers() + ep_map = {**self._invert_manual(), **router_map} # router 스캔 우선 + + # Step 3: 규칙 생성 rules: List[Dict] = [] - schemas_found: List[str] = [] + for class_name, fields in schemas.items(): + endpoint = ep_map.get(class_name, self._infer_endpoint(class_name)) + for field in fields: + field["endpoint"] = endpoint + rules.append(field) + + # Step 4: 영속 저장 + payload = { + "learned_at": datetime.now().isoformat(), + "schema_count": len(schemas), + "rule_count": len(rules), + "rules": rules, + } + RULES_FILE.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") + + return { + "rules": rules, + "schemas": list(schemas.keys()), + "endpoint_count": len(set(r["endpoint"] for r in rules)), + } + + def _parse_models(self) -> Dict[str, List[Dict]]: + """models.py에서 입력 스키마 클래스만 파싱.""" + src = (BASE_DIR / "models.py").read_text(encoding="utf-8") + tree = ast.parse(src) + schemas: Dict[str, List[Dict]] = {} for node in ast.walk(tree): if not isinstance(node, ast.ClassDef): continue - # BaseModel 상속 클래스만 bases = [getattr(b, "id", "") for b in node.bases] if "BaseModel" not in bases: continue - class_name = node.name - schemas_found.append(class_name) - - # 엔드포인트 찾기 - endpoint = self._find_endpoint(class_name) + name = node.name + # Out/Response 등 제외 + if any(name.endswith(s) for s in _SKIP_SUFFIXES): + continue + # Create/Update 등만 허용 + if not any(name.endswith(s) for s in _INPUT_SUFFIXES): + continue + fields = [] for item in node.body: if not isinstance(item, ast.AnnAssign): continue - rule = self._extract_field_rule(item, class_name, endpoint) - if rule: - rules.append(rule) + f = self._parse_field(item, name) + if f: + fields.append(f) - return {"rules": rules, "schemas": schemas_found} + if fields: + schemas[name] = fields - def _extract_field_rule(self, node: ast.AnnAssign, - class_name: str, endpoint: str) -> Optional[Dict]: - """단일 필드 annotation에서 validation 규칙 추출.""" + return schemas + + def _parse_field(self, node: ast.AnnAssign, class_name: str) -> Optional[Dict]: if not isinstance(node.target, ast.Name): return None - field_name = node.target.id if field_name.startswith("_"): return None - annotation = node.annotation - # ast.AnnAssign: .value = 기본값 (없으면 None) → None이면 required - is_required = node.value is None - field_type = "str" - allowed_values: List[str] = [] - constraints: Dict = {} - - # 타입 분석 - type_str = ast.unparse(annotation) if hasattr(ast, "unparse") else str(annotation) - - # Optional[X] → is_required=False - if "Optional" in type_str: - is_required = False - inner = re.sub(r"Optional\[(.+)\]", r"\1", type_str) - type_str = inner - - # Enum 타입 - for enum_name, vals in self.ENUM_MAP.items(): - if enum_name in type_str: - field_type = "enum" - allowed_values = vals - break - else: - if "int" in type_str: - field_type = "int" - elif "float" in type_str: - field_type = "float" - elif "bool" in type_str: - field_type = "bool" - elif "List" in type_str or "list" in type_str: - field_type = "list" - elif "datetime" in type_str.lower(): - field_type = "datetime" - else: - field_type = "str" + type_str = ast.unparse(node.annotation) if hasattr(ast, "unparse") else "" + is_required, field_type, allowed, constraints = self._analyse_type(type_str, node.value) return { - "endpoint": endpoint, "schema_class": class_name, "field_name": field_name, "field_type": field_type, "is_required": is_required, - "allowed_values": allowed_values, + "allowed_values": allowed, "constraints": constraints, "learned_at": datetime.now().isoformat(), + "endpoint": "", # 후에 채워짐 } - def _find_endpoint(self, class_name: str) -> str: - for ep, schema in self.ENDPOINT_SCHEMA.items(): - if schema == class_name: - return ep - # 자동 추론: SRCreate → POST /api/tasks (by name pattern) - name_lower = class_name.lower().replace("create", "").replace("update", "") - return f"POST /api/{name_lower}s" + def _analyse_type( + self, type_str: str, default_node: Any + ) -> Tuple[bool, str, List[str], Dict]: + """타입 문자열 + AST 기본값 노드에서 (is_required, field_type, allowed, constraints) 반환.""" + # default가 있으면 required=False + is_required = default_node is None + # Optional[X] → required=False, 내부 타입 추출 + if "Optional" in type_str: + is_required = False + type_str = re.sub(r"Optional\[(.+)\]", r"\1", type_str) + + allowed: List[str] = [] + constraints: Dict = {} + + # Enum 매핑 + for enum_name, vals in ENUM_MAP.items(): + if enum_name in type_str: + return is_required, "enum", vals, constraints + + # 기본 타입 + if "int" in type_str and "str" not in type_str: + field_type = "int" + elif "float" in type_str: + field_type = "float" + elif "bool" in type_str: + field_type = "bool" + elif "List" in type_str or "list" in type_str: + field_type = "list" + elif "datetime" in type_str.lower() or "date" in type_str.lower(): + field_type = "datetime" + else: + field_type = "str" + + return is_required, field_type, allowed, constraints + + def _scan_routers(self) -> Dict[str, str]: + """ + routers/*.py에서 @router.post/put/patch 데코레이터와 + Body 파라미터 타입 힌트를 스캔해 {SchemaClass: "METHOD /path"} 반환. + """ + schema_to_ep: Dict[str, str] = {} + routers_dir = BASE_DIR / "routers" + if not routers_dir.exists(): + return schema_to_ep + + for py_file in routers_dir.glob("*.py"): + try: + src = py_file.read_text(encoding="utf-8") + tree = ast.parse(src) + except Exception: + continue + + # prefix 추출 (APIRouter(prefix="/api/xxx")) + prefix = "" + for node in ast.walk(tree): + if isinstance(node, ast.Call): + func = getattr(node, "func", None) + if func and getattr(func, "id", "") == "APIRouter": + for kw in node.keywords: + if kw.arg == "prefix" and isinstance(kw.value, ast.Constant): + prefix = kw.value.value + break + + # 함수 → 데코레이터 + 파라미터 분석 + for node in ast.walk(tree): + if not isinstance(node, ast.FunctionDef): + continue + method, path = self._extract_route(node, prefix) + if not method: + continue + schema = self._extract_body_schema(node) + if schema and schema not in schema_to_ep: + schema_to_ep[schema] = f"{method} {path}" + + return schema_to_ep + + def _extract_route(self, node: ast.FunctionDef, prefix: str) -> Tuple[str, str]: + for dec in node.decorator_list: + call = dec if isinstance(dec, ast.Call) else None + if not call: + continue + attr = getattr(call.func, "attr", "") + method = attr.upper() if attr in ("get","post","put","patch","delete") else "" + if not method: + continue + path = "" + if call.args and isinstance(call.args[0], ast.Constant): + path = call.args[0].value + elif call.keywords: + for kw in call.keywords: + if kw.arg == "path" and isinstance(kw.value, ast.Constant): + path = kw.value.value + return method, f"{prefix}{path}" + return "", "" + + def _extract_body_schema(self, node: ast.FunctionDef) -> Optional[str]: + """함수 파라미터에서 BaseModel 서브클래스 body 파라미터 타입 추출.""" + for arg in node.args.args: + if arg.annotation is None: + continue + type_str = ast.unparse(arg.annotation) if hasattr(ast, "unparse") else "" + # 단순 이름이면서 Create/Update/In 으로 끝나는 경우 + if any(type_str.endswith(s) for s in _INPUT_SUFFIXES): + return type_str + return None + + def _invert_manual(self) -> Dict[str, str]: + return {v: k for k, v in self._MANUAL_MAP.items()} + + def _infer_endpoint(self, class_name: str) -> str: + """스키마명에서 엔드포인트 자동 추론.""" + method = "POST" + if class_name.endswith("Update") or class_name.endswith("Patch"): + method = "PUT" + base = re.sub(r"(Create|Update|In|Request|Input|Patch)$", "", class_name).lower() + return f"{method} /api/{base}s" + + +# ── 규칙 로드 (영속 파일) ──────────────────────────────────────────────────── + +def load_rules() -> Dict[str, List[Dict]]: + """rpa_rules.json에서 규칙 로드. 없으면 빈 dict.""" + if not RULES_FILE.exists(): + return {} + try: + data = json.loads(RULES_FILE.read_text(encoding="utf-8")) + rules_by_ep: Dict[str, List[Dict]] = {} + for r in data.get("rules", []): + ep = r.get("endpoint", "") + rules_by_ep.setdefault(ep, []).append(r) + return rules_by_ep + except Exception: + return {} + + +def save_rules(rules_by_ep: Dict[str, List[Dict]]) -> None: + """규칙 dict를 rpa_rules.json에 저장.""" + all_rules = [r for rules in rules_by_ep.values() for r in rules] + payload = { + "learned_at": datetime.now().isoformat(), + "rule_count": len(all_rules), + "rules": all_rules, + } + RULES_FILE.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") # ── Validation 검증기 ──────────────────────────────────────────────────────── class RPAValidator: - """tb_rpa_validation 규칙으로 payload 검증.""" + """학습된 규칙으로 payload 검증.""" def __init__(self, rules: List[Dict]): self.rules = {r["field_name"]: r for r in rules} def validate(self, payload: Dict[str, Any]) -> List[str]: - """ - payload 검증. 오류 목록 반환 (빈 리스트 = 통과). - """ errors: List[str] = [] - for field_name, rule in self.rules.items(): val = payload.get(field_name) - # 필수 필드 검사 if rule["is_required"] and (val is None or val == ""): errors.append(f"[{field_name}] 필수 항목입니다.") continue if val is None: - continue # optional이고 값 없으면 skip + continue - # Enum 검사 if rule["field_type"] == "enum" and rule["allowed_values"]: if val not in rule["allowed_values"]: errors.append( f"[{field_name}] 허용값: {rule['allowed_values']} 중 하나여야 합니다. (입력: {val!r})" ) - - # 타입 검사 elif rule["field_type"] == "int": try: int(val) except (TypeError, ValueError): errors.append(f"[{field_name}] 정수 타입이어야 합니다.") - elif rule["field_type"] == "bool": if not isinstance(val, bool): errors.append(f"[{field_name}] 불리언(true/false) 타입이어야 합니다.") - # 제약 조건 검사 c = rule.get("constraints", {}) - if c.get("max_length") and isinstance(val, str): - if len(val) > c["max_length"]: - errors.append(f"[{field_name}] 최대 {c['max_length']}자 초과.") - if c.get("min_length") and isinstance(val, str): - if len(val) < c["min_length"]: - errors.append(f"[{field_name}] 최소 {c['min_length']}자 이상 필요.") - if c.get("ge") is not None and isinstance(val, (int, float)): - if val < c["ge"]: - errors.append(f"[{field_name}] {c['ge']} 이상이어야 합니다.") - if c.get("le") is not None and isinstance(val, (int, float)): - if val > c["le"]: - errors.append(f"[{field_name}] {c['le']} 이하여야 합니다.") + if c.get("max_length") and isinstance(val, str) and len(val) > c["max_length"]: + errors.append(f"[{field_name}] 최대 {c['max_length']}자 초과.") + if c.get("min_length") and isinstance(val, str) and len(val) < c["min_length"]: + errors.append(f"[{field_name}] 최소 {c['min_length']}자 이상 필요.") return errors # ── RPA 실행 엔진 ──────────────────────────────────────────────────────────── +TASK_ENDPOINT_MAP: Dict[str, Tuple[str, str]] = { + "SR_CREATE": ("POST", "/api/tasks"), + "SR_STATUS_UPDATE": ("PATCH", "/api/tasks/{sr_id}/status"), + "APPROVAL_PROCESS": ("POST", "/api/approvals"), + "INCIDENT_CREATE": ("POST", "/api/incidents"), + "SHELL_EXEC": ("POST", "/api/ssh/exec"), + "SR_BATCH_CREATE": ("POST", "/api/tasks/batch"), +} + + class RPAExecutor: - """RPA 작업 실행기 — validation 후 ITSM API 호출.""" + """학습 규칙 기반 ITSM API 자동 호출.""" def __init__(self, base_url: str, token: str): self.base_url = base_url.rstrip("/") - self.headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} + self.headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} async def execute( self, @@ -221,45 +373,34 @@ class RPAExecutor: dry_run: bool = False, retry: int = 3, ) -> Dict[str, Any]: - """ - 단발성 RPA 실행. - dry_run=True → validation만 수행, API 호출 없음. - """ - endpoint_map = { - "SR_CREATE": ("POST", "/api/tasks"), - "SR_STATUS_UPDATE": ("PATCH", f"/api/tasks/{payload.get('sr_id', 0)}/status"), - "APPROVAL_PROCESS": ("POST", "/api/approvals"), - "INCIDENT_CREATE": ("POST", "/api/incidents"), - "SHELL_EXEC": ("POST", "/api/ssh/exec"), - } - - if task_type not in endpoint_map: + if task_type not in TASK_ENDPOINT_MAP: return {"status": "FAILED", "error": f"알 수 없는 task_type: {task_type}"} - method, path = endpoint_map[task_type] - result = {"task_type": task_type, "endpoint": f"{method} {path}", "dry_run": dry_run} + method, path_tmpl = TASK_ENDPOINT_MAP[task_type] + path = path_tmpl.format(**payload) + result: Dict[str, Any] = { + "task_type": task_type, + "endpoint": f"{method} {path}", + "dry_run": dry_run, + } if dry_run: - result["status"] = "DRY_RUN_OK" - result["message"] = "Validation 통과. dry_run=true이므로 실제 실행 생략." + result.update(status="DRY_RUN_OK", + message="Validation 통과. dry_run=true이므로 실제 실행 생략.") return result - # 실제 API 호출 (재시도 포함) url = f"{self.base_url}{path}" - last_err = None + last_err: Optional[str] = None + async with httpx.AsyncClient(timeout=30) as client: for attempt in range(1, retry + 1): try: - resp = getattr(client, method.lower()) - r = await resp(url, json=payload, headers=self.headers) + r = await client.request(method, url, json=payload, headers=self.headers) if r.status_code < 300: - result["status"] = "SUCCESS" - result["response"] = r.json() + result.update(status="SUCCESS", response=r.json()) return result elif r.status_code < 500: - # 4xx → 재시도 없음 - result["status"] = "FAILED" - result["error"] = r.json() + result.update(status="FAILED", error=r.json()) return result else: last_err = f"HTTP {r.status_code}: {r.text[:200]}" @@ -268,8 +409,7 @@ class RPAExecutor: if attempt < retry: import asyncio - await asyncio.sleep(2 ** attempt) # 지수 백오프 + await asyncio.sleep(2 ** attempt) - result["status"] = "FAILED" - result["error"] = f"{retry}회 재시도 후 실패: {last_err}" + result.update(status="FAILED", error=f"{retry}회 재시도 후 실패: {last_err}") return result diff --git a/itsm/routers/rpa.py b/itsm/routers/rpa.py index 16d962ee..02974e7b 100644 --- a/itsm/routers/rpa.py +++ b/itsm/routers/rpa.py @@ -1,45 +1,85 @@ """ RPA (Robotic Process Automation) 라우터 -- Validation 학습: 프로젝트 소스(models.py) AST 파싱 -- RPA 작업 등록/수정/삭제/실행 +- Validation 학습: models.py AST + routers/ 스캔 +- 규칙 영속: rpa_rules.json +- RPA 작업 등록/수정/삭제/실행 + 크론 스케줄러 연동 - 실행 이력 조회 """ from __future__ import annotations +import json import os from datetime import datetime +from pathlib import Path from typing import Any, Dict, List, Optional -from fastapi import APIRouter, Depends, HTTPException, Query, BackgroundTasks +from fastapi import APIRouter, Depends, HTTPException, Query from pydantic import BaseModel -from sqlalchemy import select, func, delete from sqlalchemy.ext.asyncio import AsyncSession from core.auth import get_current_user from database import get_db from models import User -from core.rpa_engine import ValidationLearner, RPAValidator, RPAExecutor +from core.rpa_engine import ( + ValidationLearner, RPAValidator, RPAExecutor, + load_rules, save_rules, TASK_ENDPOINT_MAP, +) router = APIRouter(prefix="/api/rpa", tags=["rpa"]) -# ── 인메모리 저장소 (DB 미적용 시 fallback) ────────────────────────────────── -# 실제 운영 시 SQLAlchemy 모델로 교체 -_validation_rules: Dict[str, List[Dict]] = {} # endpoint → rules +# ── 인메모리 저장소 (재시작 시 rpa_rules.json로 복구) ───────────────────── +_validation_rules: Dict[str, List[Dict]] = {} # endpoint → rules (런타임) _rpa_tasks: Dict[int, Dict] = {} _rpa_executions: List[Dict] = [] _task_id_seq = 1 +ITSM_BASE = os.getenv("ITSM_BASE_URL", "http://127.0.0.1:9001") + + +def _init_rules_from_file() -> None: + """서비스 시작 시 rpa_rules.json에서 규칙 복구.""" + global _validation_rules + loaded = load_rules() + if loaded: + _validation_rules.update(loaded) + total = sum(len(v) for v in loaded.values()) + print(f"[RPA] 저장된 validation 규칙 복구: {len(loaded)}개 엔드포인트, {total}개 규칙") + + +def auto_learn() -> Dict: + """서비스 시작 시 자동 학습 (규칙 파일 없을 때).""" + learner = ValidationLearner() + result = learner.learn_from_source() + rules = result["rules"] + _validation_rules.clear() + for r in rules: + ep = r["endpoint"] + _validation_rules.setdefault(ep, []) + if not any(x["field_name"] == r["field_name"] for x in _validation_rules[ep]): + _validation_rules[ep].append(r) + return result + + +# ── 초기화: 파일에서 규칙 복구, 없으면 즉시 학습 ───────────────────────── +_init_rules_from_file() +if not _validation_rules: + try: + auto_learn() + print("[RPA] 초기 Validation 자동 학습 완료") + except Exception as e: + print(f"[RPA] 초기 학습 실패 (수동으로 POST /api/rpa/validations/learn 호출): {e}") + # ── Schemas ────────────────────────────────────────────────────────────────── class LearnRequest(BaseModel): - endpoints: str = "all" # "all" 또는 특정 endpoint + endpoints: str = "all" overwrite: bool = True class RPATaskCreate(BaseModel): task_name: str - task_type: str # SR_CREATE | SR_STATUS_UPDATE | APPROVAL_PROCESS | ... - schedule: Optional[str] = None # cron expression + task_type: str + schedule: Optional[str] = None payload_template: Dict[str, Any] = {} is_active: bool = True description: Optional[str] = None @@ -60,17 +100,6 @@ class ExecuteRequest(BaseModel): payload: Dict[str, Any] dry_run: bool = False -class ExecuteOut(BaseModel): - execution_id: int - task_type: str - status: str - dry_run: bool - validation_errors: List[str] = [] - result: Optional[Dict] = None - error: Optional[str] = None - started_at: str - completed_at: Optional[str] = None - # ── Validation 학습 ────────────────────────────────────────────────────────── @@ -79,9 +108,7 @@ async def learn_validations( req: LearnRequest, current_user: User = Depends(get_current_user), ): - """ - 프로젝트 소스(models.py)를 AST 파싱하여 validation 규칙 학습. - """ + """models.py + routers/ 소스 분석으로 validation 규칙 학습.""" learner = ValidationLearner() try: result = learner.learn_from_source() @@ -89,38 +116,39 @@ async def learn_validations( raise HTTPException(500, f"소스 파싱 실패: {e}") rules = result["rules"] - schemas = result["schemas"] - if req.overwrite: _validation_rules.clear() learned = 0 - for rule in rules: - ep = rule["endpoint"] - if ep not in _validation_rules: - _validation_rules[ep] = [] - # 중복 필드 제거 - existing = {r["field_name"] for r in _validation_rules[ep]} - if rule["field_name"] not in existing: - _validation_rules[ep].append(rule) + for r in rules: + ep = r["endpoint"] + _validation_rules.setdefault(ep, []) + existing = {x["field_name"] for x in _validation_rules[ep]} + if r["field_name"] not in existing: + _validation_rules[ep].append(r) learned += 1 return { "learned": learned, - "schemas": schemas, + "schemas": result["schemas"], "endpoints_mapped": len(_validation_rules), - "summary": {ep: len(rules_) for ep, rules_ in _validation_rules.items()}, + "total_rules": sum(len(v) for v in _validation_rules.values()), + "summary": {ep: len(rs) for ep, rs in list(_validation_rules.items())[:10]}, } @router.get("/validations") async def get_validations( endpoint: Optional[str] = Query(None), + schema: Optional[str] = Query(None), current_user: User = Depends(get_current_user), ): """학습된 validation 규칙 조회.""" if endpoint: - return {"endpoint": endpoint, "rules": _validation_rules.get(endpoint, [])} + rules = _validation_rules.get(endpoint, []) + if schema: + rules = [r for r in rules if r.get("schema_class") == schema] + return {"endpoint": endpoint, "rule_count": len(rules), "rules": rules} return { "total_endpoints": len(_validation_rules), "total_rules": sum(len(v) for v in _validation_rules.values()), @@ -128,6 +156,17 @@ async def get_validations( } +@router.get("/validations/schemas") +async def list_schemas(current_user: User = Depends(get_current_user)): + """학습된 스키마 목록과 각 필드 수.""" + schema_map: Dict[str, int] = {} + for rules in _validation_rules.values(): + for r in rules: + sc = r.get("schema_class", "") + schema_map[sc] = schema_map.get(sc, 0) + 1 + return {"schemas": schema_map} + + # ── RPA 작업 관리 ───────────────────────────────────────────────────────────── @router.post("/tasks", response_model=RPATaskOut) @@ -135,8 +174,11 @@ async def create_rpa_task( body: RPATaskCreate, current_user: User = Depends(get_current_user), ): - """RPA 작업 등록.""" global _task_id_seq + if body.task_type not in TASK_ENDPOINT_MAP: + raise HTTPException(400, + f"지원하지 않는 task_type. 허용값: {list(TASK_ENDPOINT_MAP.keys())}") + task = { "id": _task_id_seq, "task_name": body.task_name, @@ -150,6 +192,11 @@ async def create_rpa_task( "created_by": current_user.username, } _rpa_tasks[_task_id_seq] = task + + # APScheduler에 크론 등록 + if body.schedule and body.is_active: + _register_cron(task) + _task_id_seq += 1 return task @@ -185,6 +232,10 @@ async def update_rpa_task( task = _rpa_tasks.get(task_id) if not task: raise HTTPException(404, "RPA 작업을 찾을 수 없습니다.") + + # 기존 크론 제거 + _unregister_cron(task_id) + task.update({ "task_name": body.task_name, "task_type": body.task_type, @@ -193,105 +244,104 @@ async def update_rpa_task( "is_active": body.is_active, "description": body.description, }) + + if body.schedule and body.is_active: + _register_cron(task) + return task +@router.patch("/tasks/{task_id}/toggle") +async def toggle_rpa_task(task_id: int, current_user: User = Depends(get_current_user)): + """작업 활성/비활성 토글.""" + task = _rpa_tasks.get(task_id) + if not task: + raise HTTPException(404, "RPA 작업을 찾을 수 없습니다.") + task["is_active"] = not task["is_active"] + if task["is_active"] and task.get("schedule"): + _register_cron(task) + else: + _unregister_cron(task_id) + return {"id": task_id, "is_active": task["is_active"]} + + @router.delete("/tasks/{task_id}") async def delete_rpa_task(task_id: int, current_user: User = Depends(get_current_user)): if task_id not in _rpa_tasks: raise HTTPException(404, "RPA 작업을 찾을 수 없습니다.") + _unregister_cron(task_id) del _rpa_tasks[task_id] return {"deleted": task_id} # ── RPA 실행 ───────────────────────────────────────────────────────────────── -@router.post("/execute", response_model=ExecuteOut) +@router.post("/execute") async def execute_rpa( body: ExecuteRequest, current_user: User = Depends(get_current_user), ): - """ - 단발성 RPA 실행. - 1. payload를 학습된 validation 규칙으로 검증 - 2. dry_run=false 시 실제 API 호출 - """ + """단발성 RPA 실행 (validation → 실행 → 이력 기록).""" + global _rpa_executions exec_id = len(_rpa_executions) + 1 started = datetime.now().isoformat() - # Validation 규칙 찾기 - executor_map = { - "SR_CREATE": "POST /api/tasks", - "SR_STATUS_UPDATE": "PATCH /api/tasks/status", - "APPROVAL_PROCESS": "POST /api/approvals", - "INCIDENT_CREATE": "POST /api/incidents", - } - endpoint_key = executor_map.get(body.task_type, f"POST /api/{body.task_type.lower()}") - rules = _validation_rules.get(endpoint_key, []) + # 해당 task_type의 엔드포인트 규칙 찾기 + from core.rpa_engine import TASK_ENDPOINT_MAP + method_path = TASK_ENDPOINT_MAP.get(body.task_type) + if not method_path: + raise HTTPException(400, f"알 수 없는 task_type: {body.task_type}. 허용값: {list(TASK_ENDPOINT_MAP.keys())}") - # Validation 검증 + ep_key = f"{method_path[0]} {method_path[1]}" + # path template → 실제 key (예: PATCH /api/tasks/{sr_id}/status → PATCH /api/tasks/status) + ep_key_norm = ep_key.split("{")[0].rstrip("/") + rules = _validation_rules.get(ep_key, []) or _validation_rules.get(ep_key_norm, []) + + # Validation validator = RPAValidator(rules) errors = validator.validate(body.payload) + record: Dict[str, Any] = { + "execution_id": exec_id, + "task_type": body.task_type, + "dry_run": body.dry_run, + "validation_errors": errors, + "started_at": started, + "actor": current_user.username, + } + if errors: - record = { - "execution_id": exec_id, - "task_type": body.task_type, - "status": "VALIDATION_FAILED", - "dry_run": body.dry_run, - "validation_errors": errors, - "result": None, - "error": f"{len(errors)}개 validation 오류", - "started_at": started, - "completed_at": datetime.now().isoformat(), - "actor": current_user.username, - } + record.update(status="VALIDATION_FAILED", + error=f"{len(errors)}개 validation 오류", result=None, + completed_at=datetime.now().isoformat()) _rpa_executions.append(record) return record if body.dry_run: - record = { - "execution_id": exec_id, - "task_type": body.task_type, - "status": "DRY_RUN_OK", - "dry_run": True, - "validation_errors": [], - "result": {"message": "Validation 통과. dry_run=true이므로 실제 실행 생략."}, - "error": None, - "started_at": started, - "completed_at": datetime.now().isoformat(), - "actor": current_user.username, - } + record.update(status="DRY_RUN_OK", + result={"message": "Validation 통과. dry_run=true이므로 실제 실행 생략."}, + error=None, completed_at=datetime.now().isoformat()) _rpa_executions.append(record) return record # 실제 실행 - base_url = os.getenv("ITSM_BASE_URL", "http://localhost:8001") - token = current_user.username # 실제 환경에서는 서비스 토큰 사용 - executor = RPAExecutor(base_url=base_url, token=token) - + executor = RPAExecutor(base_url=ITSM_BASE, token=_get_service_token(current_user)) try: result = await executor.execute(body.task_type, body.payload, dry_run=False) except Exception as e: result = {"status": "FAILED", "error": str(e)} - record = { - "execution_id": exec_id, - "task_type": body.task_type, - "status": result.get("status", "FAILED"), - "dry_run": False, - "validation_errors": [], - "result": result.get("response"), - "error": result.get("error"), - "started_at": started, - "completed_at": datetime.now().isoformat(), - "actor": current_user.username, - } + record.update( + status=result.get("status", "FAILED"), + result=result.get("response"), + error=result.get("error"), + completed_at=datetime.now().isoformat(), + ) _rpa_executions.append(record) return record -@router.post("/tasks/{task_id}/run", response_model=ExecuteOut) +@router.post("/tasks/{task_id}/run") async def run_rpa_task( task_id: int, dry_run: bool = Query(False), @@ -304,14 +354,9 @@ async def run_rpa_task( if not task["is_active"]: raise HTTPException(400, "비활성 작업입니다. 먼저 활성화하세요.") - req = ExecuteRequest( - task_type=task["task_type"], - payload=task["payload_template"], - dry_run=dry_run, - ) + req = ExecuteRequest(task_type=task["task_type"], + payload=task["payload_template"], dry_run=dry_run) result = await execute_rpa(req, current_user) - - # last_run 갱신 _rpa_tasks[task_id]["last_run"] = datetime.now().isoformat() return result @@ -328,25 +373,130 @@ async def list_executions( ): execs = list(_rpa_executions) if status: - execs = [e for e in execs if e["status"] == status] + execs = [e for e in execs if e.get("status") == status] if task_type: - execs = [e for e in execs if e["task_type"] == task_type] + execs = [e for e in execs if e.get("task_type") == task_type] total = len(execs) start = (page - 1) * size - return { - "total": total, - "page": page, - "size": size, - "items": list(reversed(execs))[start:start + size], - } + return {"total": total, "page": page, "size": size, + "items": list(reversed(execs))[start:start + size]} @router.get("/executions/{execution_id}") -async def get_execution( - execution_id: int, - current_user: User = Depends(get_current_user), -): +async def get_execution(execution_id: int, current_user: User = Depends(get_current_user)): for e in _rpa_executions: if e["execution_id"] == execution_id: return e raise HTTPException(404, "실행 이력을 찾을 수 없습니다.") + + +@router.get("/status") +async def rpa_status(current_user: User = Depends(get_current_user)): + """RPA 시스템 현황 요약.""" + return { + "validation_endpoints": len(_validation_rules), + "validation_rules": sum(len(v) for v in _validation_rules.values()), + "tasks_total": len(_rpa_tasks), + "tasks_active": sum(1 for t in _rpa_tasks.values() if t["is_active"]), + "executions_total": len(_rpa_executions), + "executions_success": sum(1 for e in _rpa_executions if e.get("status") == "SUCCESS"), + "executions_failed": sum(1 for e in _rpa_executions + if e.get("status") in ("FAILED", "VALIDATION_FAILED")), + "supported_task_types": list(TASK_ENDPOINT_MAP.keys()), + } + + +# ── APScheduler 연동 ────────────────────────────────────────────────────────── + +def _register_cron(task: Dict) -> None: + """APScheduler에 크론 잡 등록.""" + try: + from core.scheduler import scheduler + cron = task.get("schedule", "") + if not cron: + return + parts = cron.split() + if len(parts) < 5: + return + minute, hour, day, month, day_of_week = parts[:5] + job_id = f"rpa_task_{task['id']}" + scheduler.add_job( + _run_task_background, + trigger="cron", + id=job_id, + replace_existing=True, + minute=minute, hour=hour, + day=day, month=month, + day_of_week=day_of_week, + args=[task["id"]], + ) + print(f"[RPA] 크론 등록: {job_id} ({cron})") + except Exception as e: + print(f"[RPA] 크론 등록 실패 (task_id={task['id']}): {e}") + + +def _unregister_cron(task_id: int) -> None: + try: + from core.scheduler import scheduler + job_id = f"rpa_task_{task_id}" + if scheduler.get_job(job_id): + scheduler.remove_job(job_id) + except Exception: + pass + + +def _run_task_background(task_id: int) -> None: + """크론에 의해 백그라운드에서 호출되는 RPA 실행 함수.""" + import asyncio + task = _rpa_tasks.get(task_id) + if not task or not task["is_active"]: + return + + exec_id = len(_rpa_executions) + 1 + started = datetime.now().isoformat() + ep = TASK_ENDPOINT_MAP.get(task["task_type"], ("", ""))[1] + ep_key = f"{TASK_ENDPOINT_MAP.get(task['task_type'], ('POST',''))[0]} {ep}" + rules = _validation_rules.get(ep_key, []) + validator = RPAValidator(rules) + errors = validator.validate(task["payload_template"]) + + record: Dict[str, Any] = { + "execution_id": exec_id, + "task_type": task["task_type"], + "dry_run": False, + "validation_errors": errors, + "started_at": started, + "actor": "rpa-scheduler", + } + + if errors: + record.update(status="VALIDATION_FAILED", + error=f"{len(errors)}개 validation 오류", result=None, + completed_at=datetime.now().isoformat()) + else: + async def _run(): + executor = RPAExecutor(base_url=ITSM_BASE, token="") + return await executor.execute(task["task_type"], task["payload_template"]) + try: + loop = asyncio.new_event_loop() + result = loop.run_until_complete(_run()) + loop.close() + except Exception as e: + result = {"status": "FAILED", "error": str(e)} + + record.update( + status=result.get("status", "FAILED"), + result=result.get("response"), + error=result.get("error"), + completed_at=datetime.now().isoformat(), + ) + + _rpa_executions.append(record) + _rpa_tasks[task_id]["last_run"] = datetime.now().isoformat() + print(f"[RPA Scheduler] task_id={task_id} status={record['status']}") + + +def _get_service_token(user: User) -> str: + """서비스 계정용 토큰 생성 (내부 API 호출용).""" + from core.auth import create_access_token + return create_access_token({"sub": user.username, "role": user.role})