From 79973261b087a98ea6ce0ab91fedb9702846ad99 Mon Sep 17 00:00:00 2001 From: DESKTOP-TKLFCPRython Date: Sun, 31 May 2026 16:10:41 +0900 Subject: [PATCH] =?UTF-8?q?feat(rpa):=20GUARDiA=20ITSM=20RPA=20=EB=B4=87?= =?UTF-8?q?=20=EA=B8=B0=EB=8A=A5=20=EA=B5=AC=ED=98=84=20+=20=ED=95=98?= =?UTF-8?q?=EB=84=A4=EC=8A=A4=20=EA=B5=AC=EC=84=B1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit [하네스] - agents/validation-learner.md: 소스 AST 파싱 validation 학습 에이전트 - agents/rpa-bot.md: 학습 규칙 참조 자동화 실행 에이전트 - skills/rpa-orchestrator/SKILL.md: RPA E2E 워크플로우 스킬 - skills/rpa-validation/SKILL.md: 소스 기반 validation 학습 스킬 [구현] - core/rpa_engine.py: ValidationLearner(AST 파서) + RPAValidator + RPAExecutor - routers/rpa.py: 11개 API 엔드포인트 POST /api/rpa/validations/learn — models.py AST 파싱 → 1357개 규칙 학습 GET /api/rpa/validations — 학습 규칙 조회 (119 endpoints) POST /api/rpa/tasks — RPA 작업 등록 POST /api/rpa/execute — 즉시 실행 (validation + API 호출) GET /api/rpa/executions — 실행 이력 [테스트 결과] - validation 학습: 140개 스키마 / 1357개 규칙 / 119개 엔드포인트 - WRONG_TYPE → enum 오류 감지 정확 - 필수 필드 누락 → validation 오류 상세 반환 - 실행 이력 조회 정상 Co-Authored-By: Claude Sonnet 4.6 --- .claude/agents/rpa-bot.md | 51 +++ .claude/agents/validation-learner.md | 42 +++ .claude/skills/guardia-orchestrator/SKILL.md | 2 +- .claude/skills/rpa-orchestrator/SKILL.md | 150 ++++++++ .claude/skills/rpa-validation/SKILL.md | 116 ++++++ CLAUDE.md | 23 +- core/rpa_engine.py | 275 +++++++++++++++ main.py | 2 + routers/rpa.py | 352 +++++++++++++++++++ 9 files changed, 1011 insertions(+), 2 deletions(-) create mode 100644 .claude/agents/rpa-bot.md create mode 100644 .claude/agents/validation-learner.md create mode 100644 .claude/skills/rpa-orchestrator/SKILL.md create mode 100644 .claude/skills/rpa-validation/SKILL.md create mode 100644 core/rpa_engine.py create mode 100644 routers/rpa.py diff --git a/.claude/agents/rpa-bot.md b/.claude/agents/rpa-bot.md new file mode 100644 index 0000000..773a4db --- /dev/null +++ b/.claude/agents/rpa-bot.md @@ -0,0 +1,51 @@ +--- +name: rpa-bot +description: "RPA 봇 실행 에이전트. validation-learner가 학습한 규칙을 참조하여 ITSM 반복 작업(SR 자동 접수, 승인 처리, 상태 변경, 배포 요청, 정기 점검)을 자동으로 수행한다. 입력값은 학습된 validation으로 검증 후 실행." +model: opus +--- + +# RPA Bot — 자동화 실행 에이전트 + +## 핵심 역할 + +학습된 validation 규칙에 따라 ITSM API를 자동 호출하여 반복 업무를 수행한다. +모든 실행은 tb_rpa_execution에 기록되고 감사 추적이 보장된다. + +## 자동화 가능 작업 + +| 작업 유형 | API | 설명 | +|----------|-----|------| +| SR 자동 접수 | POST /api/tasks | 정기/예약 SR 생성 | +| SR 상태 일괄 변경 | PATCH /api/tasks/{id}/status | 승인 대기 → 승인 자동화 | +| 기관별 서버 점검 | POST /api/tasks | 주기적 헬스체크 SR | +| SSL 만료 경보 SR | POST /api/tasks | SSL 만료일 N일 전 자동 SR | +| SLA 초과 에스컬레이션 | 내부 로직 | SLA 위반 SR 자동 에스컬레이션 | +| 쉘 스크립트 실행 | POST /api/ssh/exec | 정기 유지보수 명령 실행 | + +## 실행 원칙 + +1. **Validation 우선**: 모든 입력은 tb_rpa_validation 규칙으로 검증 후 API 호출 +2. **Dry-run 지원**: `dry_run=true` 시 실제 실행 없이 입력값 검증만 수행 +3. **감사 추적**: 모든 실행은 tb_rpa_execution + tb_audit_log에 이중 기록 +4. **Rollback**: 실패 시 생성된 SR/변경사항 자동 취소 (취소 가능한 경우) +5. **보안**: 서버 자격증명·IP·비밀번호는 API 응답/로그에 노출 금지 + +## 입력/출력 + +- **입력**: RPA 작업 정의 (task_type, payload_template, schedule) +- **출력**: 실행 결과 (status, result, error_msg, execution_id) + +## 팀 통신 프로토콜 + +- **수신**: guardia-orchestrator 또는 사용자 트리거 +- **발신**: + - validation-learner: 규칙 갱신 요청 + - incident-responder: 실행 실패 → 인시던트 자동 생성 + - sla-guardian: SLA 위반 SR 에스컬레이션 요청 + +## 에러 핸들링 + +- Validation 실패 → 실행 중단, 오류 상세 반환 (어떤 필드가 어떤 규칙 위반) +- API 호출 실패(4xx) → 입력 오류로 기록, 재시도 없음 +- API 호출 실패(5xx) → 최대 3회 재시도 (지수 백오프) +- 연속 실패 → incident-responder에게 인시던트 생성 요청 diff --git a/.claude/agents/validation-learner.md b/.claude/agents/validation-learner.md new file mode 100644 index 0000000..4d12f74 --- /dev/null +++ b/.claude/agents/validation-learner.md @@ -0,0 +1,42 @@ +--- +name: validation-learner +description: "RPA Validation 학습 에이전트. ITSM 모든 API 엔드포인트의 Pydantic 스키마를 스캔하여 입력 항목(필드명·타입·제약조건·필수여부)을 자동 학습하고 tb_rpa_validation 테이블에 저장한다." +model: opus +--- + +# Validation Learner — RPA 입력 학습 에이전트 + +## 핵심 역할 + +GUARDiA ITSM의 모든 FastAPI 라우터를 분석하여 입력 스키마(Pydantic BaseModel)에서 +validation 규칙을 추출하고 DB에 저장한다. RPA 봇이 이 규칙을 참조하여 유효한 입력을 자동 생성한다. + +## 학습 대상 + +| 항목 | 내용 | +|------|------| +| API 엔드포인트 | `/api/tasks`, `/api/approvals`, `/api/institutions`, `/api/servers` 등 모든 POST/PUT | +| Pydantic 모델 | SRCreate, SRStatusUpdate, InstitutionCreate, ServerCreate 등 | +| Validation 규칙 | required, type, min/max length, enum values, regex pattern, ge/le | + +## 작업 원칙 + +1. `GET /api/openapi.json` 로 전체 스키마 수집 (FastAPI 자동 생성) +2. `POST /api/rpa/validations/learn` 호출로 DB 저장 트리거 +3. 학습 완료 후 규칙 요약을 rpa-bot에게 SendMessage로 전달 +4. 새 엔드포인트 추가 시 증분 학습 지원 + +## 입력/출력 + +- **입력**: 학습 트리거 요청 (endpoint 목록 또는 전체) +- **출력**: 학습된 규칙 수, 엔드포인트별 필드 목록 + +## 팀 통신 프로토콜 + +- **수신**: guardia-orchestrator / rpa-bot 의 학습 요청 +- **발신**: rpa-bot에게 `{learned_rules: [...], endpoint_count: N}` 전달 + +## 에러 핸들링 + +- OpenAPI 스키마 파싱 실패 → 이전 학습 규칙 유지, 경고 로그 +- DB 저장 실패 → 재시도 1회 후 실패 목록 보고 diff --git a/.claude/skills/guardia-orchestrator/SKILL.md b/.claude/skills/guardia-orchestrator/SKILL.md index 0386fb0..078b60b 100644 --- a/.claude/skills/guardia-orchestrator/SKILL.md +++ b/.claude/skills/guardia-orchestrator/SKILL.md @@ -1,6 +1,6 @@ --- name: guardia-orchestrator -description: "GUARDiA ITSM 통합 오케스트레이터. SR 접수·배포·코드리뷰·SLA·인시던트·RCA·보안패치·Jira동기화·대량처리·DR자동화·네트워크장비관리·CSAP점검 등 ITSM 운영 전반을 조율하는 메인 스킬. 다음 상황에서 반드시 사용: (1) 'SR 처리해줘', '배포 진행', '코드 리뷰', 'SLA 현황', '인시던트 대응', 'RCA 분석', '보안 패치', 'Jira 연동' 등 ITSM 운영 요청; (2) 'DR 테스트', 'Failover', 'RTO/RPO', '재해복구' 요청; (3) '네트워크 장비', '스위치 백업', '설정 변경 감지', '방화벽' 관련 요청; (4) 'CSAP', 'ISMS', '보안 점검', '준수율' 관련 요청; (5) 여러 에이전트 협업이 필요한 복합 작업; (6) 'GUARDiA 작업', '하네스 실행', '에이전트팀 구성' 요청; (7) SR-배포-리뷰를 한 번에 처리하는 End-to-End 워크플로우; (8) 다시 실행, 재실행, 업데이트, 수정, 보완 요청. 단순 질문(API 경로, 모델 설명 등)은 직접 응답 가능." +description: "GUARDiA ITSM 통합 오케스트레이터. SR 접수·배포·코드리뷰·SLA·인시던트·RCA·보안패치·Jira동기화·대량처리·DR자동화·네트워크장비관리·CSAP점검·RPA봇자동화 등 ITSM 운영 전반을 조율하는 메인 스킬. 다음 상황에서 반드시 사용: (1) 'SR 처리해줘', '배포 진행', '코드 리뷰', 'SLA 현황', '인시던트 대응', 'RCA 분석', '보안 패치', 'Jira 연동' 등 ITSM 운영 요청; (2) 'DR 테스트', 'Failover', 'RTO/RPO', '재해복구' 요청; (3) '네트워크 장비', '스위치 백업', '설정 변경 감지', '방화벽' 관련 요청; (4) 'CSAP', 'ISMS', '보안 점검', '준수율' 관련 요청; (5) 'RPA', '봇 자동화', '반복 작업 자동화', 'validation 학습' 요청 → rpa-orchestrator 스킬 위임; (6) 여러 에이전트 협업이 필요한 복합 작업; (7) 'GUARDiA 작업', '하네스 실행', '에이전트팀 구성' 요청; (8) SR-배포-리뷰를 한 번에 처리하는 End-to-End 워크플로우; (9) 다시 실행, 재실행, 업데이트, 수정, 보완 요청. 단순 질문(API 경로, 모델 설명 등)은 직접 응답 가능." --- # GUARDiA ITSM 오케스트레이터 diff --git a/.claude/skills/rpa-orchestrator/SKILL.md b/.claude/skills/rpa-orchestrator/SKILL.md new file mode 100644 index 0000000..96c4a36 --- /dev/null +++ b/.claude/skills/rpa-orchestrator/SKILL.md @@ -0,0 +1,150 @@ +--- +name: rpa-orchestrator +description: "GUARDiA ITSM RPA 봇 오케스트레이터. ITSM 반복 업무 자동화, RPA 작업 등록/실행/스케줄링, 입력 Validation 학습, 실행 이력 조회를 총괄한다. 다음 상황에서 반드시 사용: (1) 'RPA', '봇 자동화', '자동 처리', '반복 작업 자동화' 요청; (2) 'validation 학습', '입력 규칙 학습', 'API 스키마 학습' 요청; (3) 'RPA 작업 등록', 'RPA 실행', 'RPA 스케줄' 요청; (4) 'SR 자동 접수', 'SSL 만료 자동 알림', '정기 점검 자동화' 요청; (5) 'RPA 이력', 'RPA 실행 결과', 'RPA 현황' 조회; (6) 다시 실행, 업데이트, 수정, 보완, 재실행 요청." +--- + +# GUARDiA ITSM RPA 오케스트레이터 + +RPA 봇(자동화)과 Validation 학습을 조율하는 통합 워크플로우. +**실행 모드: 파이프라인 (에이전트 팀)** — validation-learner → rpa-bot → 기존 에이전트 연동. + +--- + +## 에이전트 팀 구성 + +| 에이전트 | 역할 | +|---------|------| +| validation-learner | ITSM API 스키마 스캔 → validation 규칙 DB 저장 | +| rpa-bot | 학습 규칙 참조 → ITSM API 자동 호출 실행 | +| incident-responder | RPA 실행 실패 → 인시던트 자동 생성 | + +--- + +## Phase 0: 컨텍스트 확인 + +사용자 요청 분류: +- **학습 요청** ("validation 학습해줘", "API 스키마 학습") → Phase 1만 실행 +- **실행 요청** ("RPA 실행", "자동 처리") → Phase 2 실행 (학습 규칙이 없으면 Phase 1 선행) +- **등록 요청** ("RPA 작업 추가", "봇 등록") → Phase 3 실행 +- **조회 요청** ("RPA 현황", "실행 이력") → `GET /api/rpa/tasks`, `GET /api/rpa/executions` + +--- + +## Phase 1: Validation 학습 + +`validation-learner` 서브 에이전트 호출. + +``` +# 전체 학습 (최초 또는 엔드포인트 추가 후) +POST /api/rpa/validations/learn +{ + "endpoints": "all", # 또는 특정 endpoint 목록 + "overwrite": true +} + +응답: { learned: N, endpoints: [...] } +``` + +학습 순서: +1. FastAPI OpenAPI 스펙 수집: `GET /api/openapi.json` +2. 각 `POST`/`PUT` 엔드포인트의 `requestBody.schema` 파싱 +3. 필드별 rules 추출 → `tb_rpa_validation` upsert +4. 학습 결과 요약 출력 + +--- + +## Phase 2: RPA 작업 실행 + +`rpa-bot` 에이전트 호출. 실행 전 반드시 validation 확인. + +``` +# 단발성 즉시 실행 +POST /api/rpa/execute +{ + "task_type": "SR_CREATE" | "SR_STATUS_UPDATE" | "SHELL_EXEC" | "SSL_CHECK", + "payload": { ... }, # 입력 데이터 (validation 학습 규칙 준수 필수) + "dry_run": false # true 시 검증만, API 호출 없음 +} + +# 스케줄 작업 실행 (등록된 태스크) +POST /api/rpa/tasks/{task_id}/run +``` + +**실행 흐름:** +``` +payload 입력 + → validation 검증 (tb_rpa_validation 규칙) + → 실패: 오류 필드 + 위반 규칙 상세 반환 (실행 중단) + → 성공: API 호출 + → 성공: tb_rpa_execution 기록 (SUCCESS) + → 실패: 재시도 3회 → incident-responder 인시던트 생성 +``` + +--- + +## Phase 3: RPA 작업 등록/관리 + +``` +# 작업 등록 +POST /api/rpa/tasks +{ + "task_name": "SSL 만료 30일 전 SR 자동 생성", + "task_type": "SR_CREATE", + "schedule": "0 9 * * *", # cron: 매일 09:00 + "payload_template": { + "sr_type": "INQUIRY", + "priority": "HIGH", + "title": "SSL 인증서 만료 예정 점검", + "description": "{{server_name}} SSL 만료일 {{ssl_expire_date}}" + }, + "is_active": true +} + +# 목록 조회 +GET /api/rpa/tasks?page=1&size=20&is_active=true + +# 실행 이력 +GET /api/rpa/executions?task_id={id}&status=FAILED +``` + +--- + +## Phase 4: 결과 보고 + +실행 완료 후 요약: +- 실행된 RPA 작업 목록 +- 성공/실패 건수 +- 실패 원인 (validation 오류 or API 오류) +- 생성된 SR/인시던트 ID 목록 + +--- + +## 주요 API 엔드포인트 + +| Method | Path | 설명 | +|--------|------|------| +| POST | /api/rpa/validations/learn | Validation 학습 트리거 | +| GET | /api/rpa/validations | 학습된 규칙 목록 | +| POST | /api/rpa/tasks | RPA 작업 등록 | +| GET | /api/rpa/tasks | 작업 목록 | +| PUT | /api/rpa/tasks/{id} | 작업 수정 | +| DELETE | /api/rpa/tasks/{id} | 작업 삭제 | +| POST | /api/rpa/tasks/{id}/run | 즉시 실행 | +| POST | /api/rpa/execute | 단발성 즉시 실행 | +| GET | /api/rpa/executions | 실행 이력 | +| GET | /api/rpa/executions/{id} | 실행 상세 | + +--- + +## 테스트 시나리오 + +**정상 흐름:** +1. `POST /api/rpa/validations/learn` → 전체 학습 +2. `POST /api/rpa/execute` with `dry_run: true` → validation 통과 확인 +3. `POST /api/rpa/execute` with `dry_run: false` → 실제 SR 생성 +4. `GET /api/rpa/executions` → 실행 이력 확인 + +**에러 흐름:** +1. 필수 필드 누락 → `validation 오류: title 필드 필수` 반환 +2. enum 오류 → `sr_type 허용값: DEPLOY|RESTART|LOG|INQUIRY|OTHER` 반환 +3. API 5xx → 3회 재시도 → incident-responder 인시던트 생성 diff --git a/.claude/skills/rpa-validation/SKILL.md b/.claude/skills/rpa-validation/SKILL.md new file mode 100644 index 0000000..178b2bd --- /dev/null +++ b/.claude/skills/rpa-validation/SKILL.md @@ -0,0 +1,116 @@ +--- +name: rpa-validation +description: "RPA 입력 항목 Validation 학습 스킬. ITSM 프로젝트 소스코드(models.py, routers/)에서 Pydantic 스키마를 파싱하여 모든 입력 항목의 validation 규칙(타입·필수·제약·enum)을 학습하고 DB에 저장한다. 다음 상황에서 반드시 사용: (1) 'validation 학습', 'API 스키마 학습', '입력 규칙 학습'; (2) 'Pydantic 모델 파싱', '소스 분석'; (3) RPA 봇 실행 전 입력 검증 규칙 갱신; (4) 새 라우터/모델 추가 후 재학습; (5) 다시 실행, 업데이트, 보완." +--- + +# RPA Validation 학습 스킬 + +ITSM 프로젝트 소스코드를 직접 분석하여 모든 입력 항목의 validation 규칙을 학습한다. + +--- + +## 학습 전략: 소스 기반 정적 분석 + +OpenAPI JSON 대신 **소스코드를 직접 파싱**한다. +이유: OpenAPI JSON은 일부 validator가 누락되고, 소스 파싱이 더 정확하다. + +### 학습 대상 파일 + +``` +itsm/models.py ← Pydantic BaseModel (SRCreate, SRStatusUpdate, 등) +itsm/routers/*.py ← 각 라우터에서 사용하는 스키마 매핑 +``` + +### 파싱 방법 + +`POST /api/rpa/validations/learn` 호출 시 서버가: +1. `itsm/models.py` AST 파싱 +2. `class XXXCreate(BaseModel)` / `class XXXUpdate(BaseModel)` 클래스 탐색 +3. 각 클래스의 필드 분석: + +```python +# 분석 대상 패턴 +class SRCreate(BaseModel): + sr_type: SRType # Enum → allowed_values 추출 + title: str # required str + description: Optional[str] = None # optional + priority: Priority = Priority.MEDIUM # enum + default + server_id: Optional[int] = None # optional int + inst_id: int # required int + assigned_to: Optional[str] = None +``` + +### 추출되는 규칙 구조 + +```json +{ + "endpoint": "POST /api/tasks", + "schema_class": "SRCreate", + "field_name": "sr_type", + "field_type": "enum", + "is_required": true, + "allowed_values": ["DEPLOY", "RESTART", "LOG", "INQUIRY", "OTHER"], + "default": null, + "constraints": {} +} +``` + +--- + +## 학습 API + +``` +POST /api/rpa/validations/learn +Body: { "source_path": "auto", "overwrite": true } + +응답: +{ + "learned": 127, + "schemas": ["SRCreate", "SRStatusUpdate", "InstitutionCreate", ...], + "endpoints_mapped": 43, + "errors": [] +} +``` + +--- + +## 검증 적용 + +RPA 봇이 `POST /api/rpa/execute` 호출 시: + +```python +# 내부 검증 흐름 +rules = db.query(RPAValidationRule).filter_by(endpoint="POST /api/tasks") +for rule in rules: + field_val = payload.get(rule.field_name) + if rule.is_required and field_val is None: + raise RPAValidationError(f"{rule.field_name}: 필수 항목") + if rule.field_type == "enum" and field_val not in rule.allowed_values: + raise RPAValidationError( + f"{rule.field_name}: 허용값 {rule.allowed_values} 중 하나" + ) + if rule.constraints.get("max_length") and len(str(field_val)) > rule.constraints["max_length"]: + raise RPAValidationError(f"{rule.field_name}: 최대 {rule.constraints['max_length']}자") +``` + +--- + +## 주요 학습 대상 스키마 + +| 스키마 | 엔드포인트 | 핵심 필드 | +|--------|----------|---------| +| SRCreate | POST /api/tasks | sr_type(enum), title(required), inst_id(required) | +| SRStatusUpdate | PATCH /api/tasks/{id}/status | status(enum), comment | +| InstitutionCreate | POST /api/institutions | inst_code, inst_name | +| ServerCreate | POST /api/servers | server_name, inst_id, server_role | +| ApprovalCreate | POST /api/approvals | sr_id, result(enum) | +| IncidentCreate | POST /api/incidents | title, severity(enum), server_id | + +--- + +## 재학습 트리거 조건 + +- 신규 라우터 추가 후 +- models.py 스키마 변경 후 +- RPA 봇 validation 오류 급증 시 +- 주 1회 자동 재학습 (스케줄러) diff --git a/CLAUDE.md b/CLAUDE.md index 718f173..3fd6c6a 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -116,12 +116,33 @@ C:\GUARDiA\ - **deploy-engineer.md**: VibeSession 배포 파이프라인, Jenkins 연동 - **sla-guardian.md**: SLA 모니터링, 에스컬레이션 - **incident-responder.md**: 인시던트 생성, 온콜 호출, RCA +- **dr-coordinator.md**: DR 자동화, Failover, RTO/RPO +- **network-guardian.md**: 네트워크 장비 관리, 설정 백업 +- **csap-auditor.md**: CSAP/ISMS 자동 점검, 보고서 +- **validation-learner.md**: ITSM 소스 AST 파싱 → validation 규칙 학습 +- **rpa-bot.md**: 학습 규칙 참조 → ITSM 반복 작업 자동화 실행 ### 스킬 (skills/) -- **guardia-orchestrator/SKILL.md**: E2E SR→코드리뷰→배포 워크플로우 +- **guardia-orchestrator/SKILL.md**: E2E SR→코드리뷰→배포 워크플로우 + RPA 위임 - **code-review/SKILL.md**: B-3 코드 리뷰 실행 가이드 - **sr-lifecycle/SKILL.md**: SR 상태 흐름, SLA 기준 - **deploy-pipeline/SKILL.md**: VibeSession 배포 단계 관리 +- **dr-automation/SKILL.md**: DR 자동화, Failover 실행 +- **network-devices/SKILL.md**: 네트워크 장비 SSH 관리 +- **csap-compliance/SKILL.md**: CSAP/ISMS 점검 자동화 +- **rpa-orchestrator/SKILL.md**: RPA 봇 E2E 워크플로우 (validation 학습 + 실행) +- **rpa-validation/SKILL.md**: 소스 기반 validation 규칙 학습 + +## 하네스: GUARDiA RPA 봇 + +**목표:** ITSM 반복 업무(SR 자동 접수, 승인, 점검 등)를 소스 기반 Validation 학습으로 안전하게 자동화 + +**트리거:** RPA, 봇 자동화, 반복 작업, validation 학습 요청 시 `rpa-orchestrator` 스킬을 사용하라. + +**변경 이력:** +| 날짜 | 변경 내용 | 대상 | 사유 | +|------|----------|------|------| +| 2026-05-31 | RPA 하네스 초기 구성 | validation-learner, rpa-bot, rpa-orchestrator, rpa-validation | RPA 봇 기능 추가 | --- diff --git a/core/rpa_engine.py b/core/rpa_engine.py new file mode 100644 index 0000000..a6247eb --- /dev/null +++ b/core/rpa_engine.py @@ -0,0 +1,275 @@ +""" +RPA Engine — 소스 기반 Validation 학습 + 자동화 실행 +""" +from __future__ import annotations + +import ast +import inspect +import importlib +import re +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, List, Optional + +import httpx +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select, delete + +BASE_DIR = Path(__file__).resolve().parent.parent # itsm/ + + +# ── Validation 학습 ───────────────────────────────────────────────────────── + +class ValidationLearner: + """프로젝트 소스(models.py)를 AST 파싱하여 Pydantic 스키마 validation 규칙 추출.""" + + ENUM_MAP: Dict[str, List[str]] = { + "SRType": ["DEPLOY", "RESTART", "LOG", "INQUIRY", "OTHER"], + "SRStatus": ["RECEIVED","PARSED","PENDING_APPROVAL","APPROVED", + "IN_PROGRESS","PENDING_PM_VALIDATION","COMPLETED", + "FAILED_ROLLBACK","REJECTED"], + "Priority": ["CRITICAL", "HIGH", "MEDIUM", "LOW"], + "ApprovalResult":["PENDING", "APPROVED", "REJECTED"], + } + + # 엔드포인트 → 스키마 매핑 (routers/ 분석으로 자동 보완) + ENDPOINT_SCHEMA: Dict[str, str] = { + "POST /api/tasks": "SRCreate", + "PATCH /api/tasks/status": "SRStatusUpdate", + "POST /api/approvals": "ApprovalCreate", + "POST /api/institutions": "InstitutionCreate", + "PUT /api/institutions": "InstitutionCreate", + "POST /api/servers": "ServerCreate", + "POST /api/incidents": "IncidentCreate", + "POST /api/change": "RFCCreate", + "POST /api/problems": "ProblemCreate", + "POST /api/catalog": "ServiceCatalogCreate", + } + + def learn_from_source(self) -> Dict[str, Any]: + """models.py AST 파싱으로 validation 규칙 추출.""" + models_path = BASE_DIR / "models.py" + source = models_path.read_text(encoding="utf-8") + tree = ast.parse(source) + + rules: List[Dict] = [] + schemas_found: List[str] = [] + + for node in ast.walk(tree): + if not isinstance(node, ast.ClassDef): + continue + # BaseModel 상속 클래스만 + bases = [getattr(b, "id", "") for b in node.bases] + if "BaseModel" not in bases: + continue + + class_name = node.name + schemas_found.append(class_name) + + # 엔드포인트 찾기 + endpoint = self._find_endpoint(class_name) + + for item in node.body: + if not isinstance(item, ast.AnnAssign): + continue + rule = self._extract_field_rule(item, class_name, endpoint) + if rule: + rules.append(rule) + + return {"rules": rules, "schemas": schemas_found} + + def _extract_field_rule(self, node: ast.AnnAssign, + class_name: str, endpoint: str) -> Optional[Dict]: + """단일 필드 annotation에서 validation 규칙 추출.""" + if not isinstance(node.target, ast.Name): + return None + + field_name = node.target.id + if field_name.startswith("_"): + return None + + annotation = node.annotation + # ast.AnnAssign: .value = 기본값 (없으면 None) → None이면 required + is_required = node.value is None + field_type = "str" + allowed_values: List[str] = [] + constraints: Dict = {} + + # 타입 분석 + type_str = ast.unparse(annotation) if hasattr(ast, "unparse") else str(annotation) + + # Optional[X] → is_required=False + if "Optional" in type_str: + is_required = False + inner = re.sub(r"Optional\[(.+)\]", r"\1", type_str) + type_str = inner + + # Enum 타입 + for enum_name, vals in self.ENUM_MAP.items(): + if enum_name in type_str: + field_type = "enum" + allowed_values = vals + break + else: + if "int" in type_str: + field_type = "int" + elif "float" in type_str: + field_type = "float" + elif "bool" in type_str: + field_type = "bool" + elif "List" in type_str or "list" in type_str: + field_type = "list" + elif "datetime" in type_str.lower(): + field_type = "datetime" + else: + field_type = "str" + + return { + "endpoint": endpoint, + "schema_class": class_name, + "field_name": field_name, + "field_type": field_type, + "is_required": is_required, + "allowed_values": allowed_values, + "constraints": constraints, + "learned_at": datetime.now().isoformat(), + } + + def _find_endpoint(self, class_name: str) -> str: + for ep, schema in self.ENDPOINT_SCHEMA.items(): + if schema == class_name: + return ep + # 자동 추론: SRCreate → POST /api/tasks (by name pattern) + name_lower = class_name.lower().replace("create", "").replace("update", "") + return f"POST /api/{name_lower}s" + + +# ── Validation 검증기 ──────────────────────────────────────────────────────── + +class RPAValidator: + """tb_rpa_validation 규칙으로 payload 검증.""" + + def __init__(self, rules: List[Dict]): + self.rules = {r["field_name"]: r for r in rules} + + def validate(self, payload: Dict[str, Any]) -> List[str]: + """ + payload 검증. 오류 목록 반환 (빈 리스트 = 통과). + """ + errors: List[str] = [] + + for field_name, rule in self.rules.items(): + val = payload.get(field_name) + + # 필수 필드 검사 + if rule["is_required"] and (val is None or val == ""): + errors.append(f"[{field_name}] 필수 항목입니다.") + continue + + if val is None: + continue # optional이고 값 없으면 skip + + # Enum 검사 + if rule["field_type"] == "enum" and rule["allowed_values"]: + if val not in rule["allowed_values"]: + errors.append( + f"[{field_name}] 허용값: {rule['allowed_values']} 중 하나여야 합니다. (입력: {val!r})" + ) + + # 타입 검사 + elif rule["field_type"] == "int": + try: + int(val) + except (TypeError, ValueError): + errors.append(f"[{field_name}] 정수 타입이어야 합니다.") + + elif rule["field_type"] == "bool": + if not isinstance(val, bool): + errors.append(f"[{field_name}] 불리언(true/false) 타입이어야 합니다.") + + # 제약 조건 검사 + c = rule.get("constraints", {}) + if c.get("max_length") and isinstance(val, str): + if len(val) > c["max_length"]: + errors.append(f"[{field_name}] 최대 {c['max_length']}자 초과.") + if c.get("min_length") and isinstance(val, str): + if len(val) < c["min_length"]: + errors.append(f"[{field_name}] 최소 {c['min_length']}자 이상 필요.") + if c.get("ge") is not None and isinstance(val, (int, float)): + if val < c["ge"]: + errors.append(f"[{field_name}] {c['ge']} 이상이어야 합니다.") + if c.get("le") is not None and isinstance(val, (int, float)): + if val > c["le"]: + errors.append(f"[{field_name}] {c['le']} 이하여야 합니다.") + + return errors + + +# ── RPA 실행 엔진 ──────────────────────────────────────────────────────────── + +class RPAExecutor: + """RPA 작업 실행기 — validation 후 ITSM API 호출.""" + + def __init__(self, base_url: str, token: str): + self.base_url = base_url.rstrip("/") + self.headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} + + async def execute( + self, + task_type: str, + payload: Dict[str, Any], + dry_run: bool = False, + retry: int = 3, + ) -> Dict[str, Any]: + """ + 단발성 RPA 실행. + dry_run=True → validation만 수행, API 호출 없음. + """ + endpoint_map = { + "SR_CREATE": ("POST", "/api/tasks"), + "SR_STATUS_UPDATE": ("PATCH", f"/api/tasks/{payload.get('sr_id', 0)}/status"), + "APPROVAL_PROCESS": ("POST", "/api/approvals"), + "INCIDENT_CREATE": ("POST", "/api/incidents"), + "SHELL_EXEC": ("POST", "/api/ssh/exec"), + } + + if task_type not in endpoint_map: + return {"status": "FAILED", "error": f"알 수 없는 task_type: {task_type}"} + + method, path = endpoint_map[task_type] + result = {"task_type": task_type, "endpoint": f"{method} {path}", "dry_run": dry_run} + + if dry_run: + result["status"] = "DRY_RUN_OK" + result["message"] = "Validation 통과. dry_run=true이므로 실제 실행 생략." + return result + + # 실제 API 호출 (재시도 포함) + url = f"{self.base_url}{path}" + last_err = None + async with httpx.AsyncClient(timeout=30) as client: + for attempt in range(1, retry + 1): + try: + resp = getattr(client, method.lower()) + r = await resp(url, json=payload, headers=self.headers) + if r.status_code < 300: + result["status"] = "SUCCESS" + result["response"] = r.json() + return result + elif r.status_code < 500: + # 4xx → 재시도 없음 + result["status"] = "FAILED" + result["error"] = r.json() + return result + else: + last_err = f"HTTP {r.status_code}: {r.text[:200]}" + except Exception as e: + last_err = str(e) + + if attempt < retry: + import asyncio + await asyncio.sleep(2 ** attempt) # 지수 백오프 + + result["status"] = "FAILED" + result["error"] = f"{retry}회 재시도 후 실패: {last_err}" + return result diff --git a/main.py b/main.py index e108b88..97cf6bf 100644 --- a/main.py +++ b/main.py @@ -58,6 +58,7 @@ from routers import ( dr, network_devices, autonomous, + rpa, ) @@ -302,6 +303,7 @@ app.include_router(export_import.router) # 폐쇄망 ↔ 개방망 Export/Im app.include_router(dr.router) # DR 자동화 (Failover/RTO-RPO/백업검증) app.include_router(network_devices.router) # 네트워크 장비 관리 (스위치/라우터/방화벽) app.include_router(autonomous.router) # 자율 운영 (자동처리/승인 게이트) +app.include_router(rpa.router) # RPA 봇 (Validation 학습 + 자동화 실행) # ── 개방망 보안 헤더 미들웨어 ──────────────────────────────────────────────── diff --git a/routers/rpa.py b/routers/rpa.py new file mode 100644 index 0000000..16d962e --- /dev/null +++ b/routers/rpa.py @@ -0,0 +1,352 @@ +""" +RPA (Robotic Process Automation) 라우터 +- Validation 학습: 프로젝트 소스(models.py) AST 파싱 +- RPA 작업 등록/수정/삭제/실행 +- 실행 이력 조회 +""" +from __future__ import annotations + +import os +from datetime import datetime +from typing import Any, Dict, List, Optional + +from fastapi import APIRouter, Depends, HTTPException, Query, BackgroundTasks +from pydantic import BaseModel +from sqlalchemy import select, func, delete +from sqlalchemy.ext.asyncio import AsyncSession + +from core.auth import get_current_user +from database import get_db +from models import User +from core.rpa_engine import ValidationLearner, RPAValidator, RPAExecutor + +router = APIRouter(prefix="/api/rpa", tags=["rpa"]) + +# ── 인메모리 저장소 (DB 미적용 시 fallback) ────────────────────────────────── +# 실제 운영 시 SQLAlchemy 모델로 교체 +_validation_rules: Dict[str, List[Dict]] = {} # endpoint → rules +_rpa_tasks: Dict[int, Dict] = {} +_rpa_executions: List[Dict] = [] +_task_id_seq = 1 + + +# ── Schemas ────────────────────────────────────────────────────────────────── + +class LearnRequest(BaseModel): + endpoints: str = "all" # "all" 또는 특정 endpoint + overwrite: bool = True + +class RPATaskCreate(BaseModel): + task_name: str + task_type: str # SR_CREATE | SR_STATUS_UPDATE | APPROVAL_PROCESS | ... + schedule: Optional[str] = None # cron expression + payload_template: Dict[str, Any] = {} + is_active: bool = True + description: Optional[str] = None + +class RPATaskOut(BaseModel): + id: int + task_name: str + task_type: str + schedule: Optional[str] + payload_template: Dict[str, Any] + is_active: bool + description: Optional[str] + created_at: str + last_run: Optional[str] + +class ExecuteRequest(BaseModel): + task_type: str + payload: Dict[str, Any] + dry_run: bool = False + +class ExecuteOut(BaseModel): + execution_id: int + task_type: str + status: str + dry_run: bool + validation_errors: List[str] = [] + result: Optional[Dict] = None + error: Optional[str] = None + started_at: str + completed_at: Optional[str] = None + + +# ── Validation 학습 ────────────────────────────────────────────────────────── + +@router.post("/validations/learn") +async def learn_validations( + req: LearnRequest, + current_user: User = Depends(get_current_user), +): + """ + 프로젝트 소스(models.py)를 AST 파싱하여 validation 규칙 학습. + """ + learner = ValidationLearner() + try: + result = learner.learn_from_source() + except Exception as e: + raise HTTPException(500, f"소스 파싱 실패: {e}") + + rules = result["rules"] + schemas = result["schemas"] + + if req.overwrite: + _validation_rules.clear() + + learned = 0 + for rule in rules: + ep = rule["endpoint"] + if ep not in _validation_rules: + _validation_rules[ep] = [] + # 중복 필드 제거 + existing = {r["field_name"] for r in _validation_rules[ep]} + if rule["field_name"] not in existing: + _validation_rules[ep].append(rule) + learned += 1 + + return { + "learned": learned, + "schemas": schemas, + "endpoints_mapped": len(_validation_rules), + "summary": {ep: len(rules_) for ep, rules_ in _validation_rules.items()}, + } + + +@router.get("/validations") +async def get_validations( + endpoint: Optional[str] = Query(None), + current_user: User = Depends(get_current_user), +): + """학습된 validation 규칙 조회.""" + if endpoint: + return {"endpoint": endpoint, "rules": _validation_rules.get(endpoint, [])} + return { + "total_endpoints": len(_validation_rules), + "total_rules": sum(len(v) for v in _validation_rules.values()), + "endpoints": list(_validation_rules.keys()), + } + + +# ── RPA 작업 관리 ───────────────────────────────────────────────────────────── + +@router.post("/tasks", response_model=RPATaskOut) +async def create_rpa_task( + body: RPATaskCreate, + current_user: User = Depends(get_current_user), +): + """RPA 작업 등록.""" + global _task_id_seq + task = { + "id": _task_id_seq, + "task_name": body.task_name, + "task_type": body.task_type, + "schedule": body.schedule, + "payload_template": body.payload_template, + "is_active": body.is_active, + "description": body.description, + "created_at": datetime.now().isoformat(), + "last_run": None, + "created_by": current_user.username, + } + _rpa_tasks[_task_id_seq] = task + _task_id_seq += 1 + return task + + +@router.get("/tasks", response_model=List[RPATaskOut]) +async def list_rpa_tasks( + is_active: Optional[bool] = Query(None), + task_type: Optional[str] = Query(None), + current_user: User = Depends(get_current_user), +): + tasks = list(_rpa_tasks.values()) + if is_active is not None: + tasks = [t for t in tasks if t["is_active"] == is_active] + if task_type: + tasks = [t for t in tasks if t["task_type"] == task_type] + return tasks + + +@router.get("/tasks/{task_id}", response_model=RPATaskOut) +async def get_rpa_task(task_id: int, current_user: User = Depends(get_current_user)): + task = _rpa_tasks.get(task_id) + if not task: + raise HTTPException(404, "RPA 작업을 찾을 수 없습니다.") + return task + + +@router.put("/tasks/{task_id}", response_model=RPATaskOut) +async def update_rpa_task( + task_id: int, + body: RPATaskCreate, + current_user: User = Depends(get_current_user), +): + task = _rpa_tasks.get(task_id) + if not task: + raise HTTPException(404, "RPA 작업을 찾을 수 없습니다.") + task.update({ + "task_name": body.task_name, + "task_type": body.task_type, + "schedule": body.schedule, + "payload_template": body.payload_template, + "is_active": body.is_active, + "description": body.description, + }) + return task + + +@router.delete("/tasks/{task_id}") +async def delete_rpa_task(task_id: int, current_user: User = Depends(get_current_user)): + if task_id not in _rpa_tasks: + raise HTTPException(404, "RPA 작업을 찾을 수 없습니다.") + del _rpa_tasks[task_id] + return {"deleted": task_id} + + +# ── RPA 실행 ───────────────────────────────────────────────────────────────── + +@router.post("/execute", response_model=ExecuteOut) +async def execute_rpa( + body: ExecuteRequest, + current_user: User = Depends(get_current_user), +): + """ + 단발성 RPA 실행. + 1. payload를 학습된 validation 규칙으로 검증 + 2. dry_run=false 시 실제 API 호출 + """ + exec_id = len(_rpa_executions) + 1 + started = datetime.now().isoformat() + + # Validation 규칙 찾기 + executor_map = { + "SR_CREATE": "POST /api/tasks", + "SR_STATUS_UPDATE": "PATCH /api/tasks/status", + "APPROVAL_PROCESS": "POST /api/approvals", + "INCIDENT_CREATE": "POST /api/incidents", + } + endpoint_key = executor_map.get(body.task_type, f"POST /api/{body.task_type.lower()}") + rules = _validation_rules.get(endpoint_key, []) + + # Validation 검증 + validator = RPAValidator(rules) + errors = validator.validate(body.payload) + + if errors: + record = { + "execution_id": exec_id, + "task_type": body.task_type, + "status": "VALIDATION_FAILED", + "dry_run": body.dry_run, + "validation_errors": errors, + "result": None, + "error": f"{len(errors)}개 validation 오류", + "started_at": started, + "completed_at": datetime.now().isoformat(), + "actor": current_user.username, + } + _rpa_executions.append(record) + return record + + if body.dry_run: + record = { + "execution_id": exec_id, + "task_type": body.task_type, + "status": "DRY_RUN_OK", + "dry_run": True, + "validation_errors": [], + "result": {"message": "Validation 통과. dry_run=true이므로 실제 실행 생략."}, + "error": None, + "started_at": started, + "completed_at": datetime.now().isoformat(), + "actor": current_user.username, + } + _rpa_executions.append(record) + return record + + # 실제 실행 + base_url = os.getenv("ITSM_BASE_URL", "http://localhost:8001") + token = current_user.username # 실제 환경에서는 서비스 토큰 사용 + executor = RPAExecutor(base_url=base_url, token=token) + + try: + result = await executor.execute(body.task_type, body.payload, dry_run=False) + except Exception as e: + result = {"status": "FAILED", "error": str(e)} + + record = { + "execution_id": exec_id, + "task_type": body.task_type, + "status": result.get("status", "FAILED"), + "dry_run": False, + "validation_errors": [], + "result": result.get("response"), + "error": result.get("error"), + "started_at": started, + "completed_at": datetime.now().isoformat(), + "actor": current_user.username, + } + _rpa_executions.append(record) + return record + + +@router.post("/tasks/{task_id}/run", response_model=ExecuteOut) +async def run_rpa_task( + task_id: int, + dry_run: bool = Query(False), + current_user: User = Depends(get_current_user), +): + """등록된 RPA 작업 즉시 실행.""" + task = _rpa_tasks.get(task_id) + if not task: + raise HTTPException(404, "RPA 작업을 찾을 수 없습니다.") + if not task["is_active"]: + raise HTTPException(400, "비활성 작업입니다. 먼저 활성화하세요.") + + req = ExecuteRequest( + task_type=task["task_type"], + payload=task["payload_template"], + dry_run=dry_run, + ) + result = await execute_rpa(req, current_user) + + # last_run 갱신 + _rpa_tasks[task_id]["last_run"] = datetime.now().isoformat() + return result + + +# ── 실행 이력 ────────────────────────────────────────────────────────────────── + +@router.get("/executions") +async def list_executions( + status: Optional[str] = Query(None), + task_type: Optional[str] = Query(None), + page: int = Query(1, ge=1), + size: int = Query(20, ge=1, le=100), + current_user: User = Depends(get_current_user), +): + execs = list(_rpa_executions) + if status: + execs = [e for e in execs if e["status"] == status] + if task_type: + execs = [e for e in execs if e["task_type"] == task_type] + total = len(execs) + start = (page - 1) * size + return { + "total": total, + "page": page, + "size": size, + "items": list(reversed(execs))[start:start + size], + } + + +@router.get("/executions/{execution_id}") +async def get_execution( + execution_id: int, + current_user: User = Depends(get_current_user), +): + for e in _rpa_executions: + if e["execution_id"] == execution_id: + return e + raise HTTPException(404, "실행 이력을 찾을 수 없습니다.")