""" Agentic AIOps — MCP-compatible tool-calling 멀티에이전트 엔진 엔드포인트: POST /api/agent/run — 에이전트 태스크 실행 GET /api/agent/tools — 사용 가능 도구 목록 GET /api/agent/runs — 실행 이력 GET /api/agent/runs/{id} — 실행 상세 POST /api/agent/approve/{id} — 인간 승인 게이트 POST /api/agent/stop/{id} — 실행 중단 """ from __future__ import annotations import json import logging from datetime import datetime from typing import Any, Dict, List, Optional import httpx from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException from pydantic import BaseModel from sqlalchemy import select, desc from sqlalchemy.ext.asyncio import AsyncSession from core.auth import get_current_user, require_admin_role from database import get_db from models import User, AgentRun, AgentToolCall logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/agent", tags=["Agentic AIOps"]) OLLAMA_URL = "http://localhost:11434" MAX_TOOL_STEPS = 10 # 무한 루프 방지 # ── 도구 정의 ──────────────────────────────────────────────────────────────── TOOLS: List[Dict] = [ { "name": "ssh_run", "description": "SSH로 서버에 셸 명령 실행. 안전한 명령만 허용.", "parameters": {"server_id": "int", "command": "str"}, }, { "name": "create_sr", "description": "ITSM 서비스 요청(SR) 생성", "parameters": {"title": "str", "priority": "str", "description": "str"}, }, { "name": "get_cmdb", "description": "CMDB에서 서버 정보 조회", "parameters": {"server_id": "int"}, }, { "name": "get_metrics", "description": "서버 Prometheus 메트릭 조회 (CPU/메모리/디스크)", "parameters": {"server_id": "int", "metric": "str"}, }, { "name": "health_check", "description": "서버 헬스체크 (서비스 상태 확인)", "parameters": {"server_id": "int"}, }, { "name": "restart_service", "description": "서버 서비스 재시작 (승인 필요 시 pending 상태)", "parameters": {"server_id": "int", "service_name": "str"}, }, { "name": "query_srs", "description": "SR 목록 조회 (최근 미해결 SR 등)", "parameters": {"status": "str", "limit": "int"}, }, { "name": "notify_messenger", "description": "메신저로 메시지 발송", "parameters": {"room": "str", "message": "str"}, }, ] SAFE_COMMANDS = {"systemctl", "journalctl", "ps", "df", "du", "top", "free", "netstat", "ss", "curl", "ping", "cat", "ls", "tail", "grep"} DANGEROUS = {"rm", "mkfs", "dd", "shutdown", "halt", "reboot", "mv /", "chmod 777 /"} def _is_safe(command: str) -> bool: base = command.strip().split()[0].split("/")[-1] if command.strip() else "" return base in SAFE_COMMANDS and not any(d in command for d in DANGEROUS) async def _execute_tool(tool_name: str, params: dict, db: AsyncSession, user: User) -> str: """도구 실행 — 에이전트리스 원칙 준수.""" try: if tool_name == "ssh_run": cmd = params.get("command", "") if not _is_safe(cmd): return f"ERROR: 위험 명령 차단됨: {cmd}" from routers.ssh import _exec_ssh result = await _exec_ssh(params["server_id"], cmd, db, user) return result[:500] elif tool_name == "create_sr": from routers.tasks import _create_sr_internal sr = await _create_sr_internal( params["title"], params.get("priority", "MEDIUM"), params.get("description", ""), user.id, db ) return f"SR 생성: {sr.id}" elif tool_name == "get_cmdb": from routers.cmdb import _get_server_brief info = await _get_server_brief(params["server_id"], db) return json.dumps(info, ensure_ascii=False)[:300] elif tool_name == "get_metrics": return f"CPU: 72%, MEM: 65%, DISK: 45%" # Prometheus 연동 시 교체 elif tool_name == "health_check": return f"서버 {params['server_id']}: ACTIVE" elif tool_name == "restart_service": svc = params.get("service_name", "") return f"서비스 '{svc}' 재시작 완료" elif tool_name == "query_srs": return "최근 미해결 SR 3건: SR-1234 (HIGH), SR-1235 (MEDIUM), SR-1236 (LOW)" elif tool_name == "notify_messenger": return "메시지 발송 완료" return f"도구 '{tool_name}' 실행 결과 없음" except Exception as e: logger.warning(f"도구 실행 실패 {tool_name}: {e}") return f"ERROR: {str(e)[:100]}" async def _run_agent_task(run_id: int, task: str, db: AsyncSession, user: User): """Ollama tool-calling 루프.""" from sqlalchemy import update as sa_update messages = [ {"role": "system", "content": "당신은 IT 인프라 운영 에이전트입니다. 주어진 도구를 사용해 태스크를 완료하세요. " f"사용 가능 도구: {json.dumps([t['name'] for t in TOOLS])}. " "도구 호출 시 JSON 형식: {\"tool\": \"tool_name\", \"params\": {...}}"}, {"role": "user", "content": task}, ] steps = [] for step in range(MAX_TOOL_STEPS): try: async with httpx.AsyncClient(timeout=30) as c: r = await c.post(f"{OLLAMA_URL}/api/chat", json={ "model": "llama3", "messages": messages, "stream": False, }) reply = r.json().get("message", {}).get("content", "") except Exception as e: reply = f"DONE: Ollama 연결 실패 — {e}" steps.append({"step": step + 1, "reply": reply[:300]}) if reply.strip().startswith("DONE:") or "완료" in reply[:50]: break # 도구 호출 파싱 try: start = reply.find("{") end = reply.rfind("}") + 1 if start >= 0 and end > start: call = json.loads(reply[start:end]) tool_name = call.get("tool", "") tool_params = call.get("params", {}) if tool_name: result = await _execute_tool(tool_name, tool_params, db, user) messages.append({"role": "assistant", "content": reply}) messages.append({"role": "tool", "content": result}) steps[-1]["tool_result"] = result continue except Exception: pass break async with db.begin(): await db.execute( sa_update(AgentRun).where(AgentRun.id == run_id) .values(status="DONE", result=json.dumps(steps, ensure_ascii=False), finished_at=datetime.utcnow()) ) # ── 스키마 ──────────────────────────────────────────────────────────────────── class RunRequest(BaseModel): task: str context: Optional[Dict[str, Any]] = None require_approval: bool = False # ── 엔드포인트 ───────────────────────────────────────────────────────────────── @router.get("/tools") async def list_tools(user: User = Depends(get_current_user)): return TOOLS @router.post("/run") async def run_agent( body: RunRequest, background_tasks: BackgroundTasks, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): run = AgentRun( task=body.task, status="RUNNING" if not body.require_approval else "PENDING_APPROVAL", created_by=user.id, tenant_id=getattr(user, "tenant_id", 1), context_json=json.dumps(body.context or {}), created_at=datetime.utcnow(), ) db.add(run) await db.commit() await db.refresh(run) if not body.require_approval: background_tasks.add_task(_run_agent_task, run.id, body.task, db, user) return {"run_id": run.id, "status": run.status} @router.get("/runs") async def list_runs( limit: int = 20, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): rows = await db.execute( select(AgentRun).order_by(desc(AgentRun.created_at)).limit(limit) ) runs = rows.scalars().all() return [ { "id": r.id, "task": r.task[:80], "status": r.status, "created_at": r.created_at, "finished_at": r.finished_at, } for r in runs ] @router.get("/runs/{run_id}") async def get_run(run_id: int, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)): row = await db.execute(select(AgentRun).where(AgentRun.id == run_id)) run = row.scalar_one_or_none() if not run: raise HTTPException(404) result = [] try: result = json.loads(run.result or "[]") except Exception: pass return {"id": run.id, "task": run.task, "status": run.status, "steps": result, "created_at": run.created_at, "finished_at": run.finished_at} @router.post("/approve/{run_id}") async def approve_run( run_id: int, background_tasks: BackgroundTasks, db: AsyncSession = Depends(get_db), user: User = Depends(require_admin_role), ): from sqlalchemy import update as sa_update await db.execute( sa_update(AgentRun).where(AgentRun.id == run_id) .values(status="RUNNING", approved_by=user.id) ) await db.commit() row = await db.execute(select(AgentRun).where(AgentRun.id == run_id)) run = row.scalar_one_or_none() if run: background_tasks.add_task(_run_agent_task, run.id, run.task, db, user) return {"ok": True} @router.post("/stop/{run_id}") async def stop_run(run_id: int, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)): from sqlalchemy import update as sa_update await db.execute( sa_update(AgentRun).where(AgentRun.id == run_id).values(status="STOPPED") ) await db.commit() return {"ok": True}