guardia-itsm/routers/agentic_aiops.py
2026-06-03 08:04:03 +09:00

294 lines
10 KiB
Python

"""
Agentic AIOps — MCP-compatible tool-calling 멀티에이전트 엔진
엔드포인트:
POST /api/agent/run — 에이전트 태스크 실행
GET /api/agent/tools — 사용 가능 도구 목록
GET /api/agent/runs — 실행 이력
GET /api/agent/runs/{id} — 실행 상세
POST /api/agent/approve/{id} — 인간 승인 게이트
POST /api/agent/stop/{id} — 실행 중단
"""
from __future__ import annotations

import json
import logging
import os
from datetime import datetime
from typing import Any, Dict, List, Optional

import httpx
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy import select, desc
from sqlalchemy.ext.asyncio import AsyncSession

from core.auth import get_current_user, require_admin_role
from database import get_db
from models import User, AgentRun, AgentToolCall
# Module-level logger (used e.g. for tool-execution failures).
logger = logging.getLogger(__name__)
# All endpoints in this module are mounted under /api/agent.
router = APIRouter(prefix="/api/agent", tags=["Agentic AIOps"])
# Base URL of the Ollama server used for tool-calling. Override it with the
# OLLAMA_URL environment variable (e.g. when Ollama runs in another container);
# the default keeps the previous localhost behaviour. A trailing slash is
# stripped because callers append "/api/chat" to this value.
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://localhost:11434").rstrip("/")
MAX_TOOL_STEPS = 10  # upper bound on LLM round-trips per run (prevents infinite loops)
# ── 도구 정의 ────────────────────────────────────────────────────────────────
# Tool catalogue exposed to the agent.
# - list_tools returns this list as-is;
# - _run_agent_task puts the "name" values into the system prompt;
# - _execute_tool dispatches on "name".
# "parameters" maps argument name -> informal type name (a plain string such as
# "int"/"str", not a JSON Schema).
TOOLS: List[Dict] = [
    {
        "name": "ssh_run",
        # The command is screened by _is_safe before it is executed.
        "description": "SSH로 서버에 셸 명령 실행. 안전한 명령만 허용.",
        "parameters": {"server_id": "int", "command": "str"},
    },
    {
        "name": "create_sr",
        "description": "ITSM 서비스 요청(SR) 생성",
        "parameters": {"title": "str", "priority": "str", "description": "str"},
    },
    {
        "name": "get_cmdb",
        "description": "CMDB에서 서버 정보 조회",
        "parameters": {"server_id": "int"},
    },
    {
        "name": "get_metrics",
        # NOTE(review): _execute_tool currently returns fixed placeholder values
        # for this tool (no Prometheus query yet).
        "description": "서버 Prometheus 메트릭 조회 (CPU/메모리/디스크)",
        "parameters": {"server_id": "int", "metric": "str"},
    },
    {
        "name": "health_check",
        # NOTE(review): currently a stub in _execute_tool that always reports ACTIVE.
        "description": "서버 헬스체크 (서비스 상태 확인)",
        "parameters": {"server_id": "int"},
    },
    {
        "name": "restart_service",
        # NOTE(review): the description promises a pending/approval state, but
        # _execute_tool currently returns a canned "restart completed" message
        # without performing a restart or an approval step.
        "description": "서버 서비스 재시작 (승인 필요 시 pending 상태)",
        "parameters": {"server_id": "int", "service_name": "str"},
    },
    {
        "name": "query_srs",
        # NOTE(review): currently a stub in _execute_tool returning a fixed SR list.
        "description": "SR 목록 조회 (최근 미해결 SR 등)",
        "parameters": {"status": "str", "limit": "int"},
    },
    {
        "name": "notify_messenger",
        # NOTE(review): currently a stub in _execute_tool; no message is sent.
        "description": "메신저로 메시지 발송",
        "parameters": {"room": "str", "message": "str"},
    },
]
SAFE_COMMANDS = {"systemctl", "journalctl", "ps", "df", "du", "top", "free",
"netstat", "ss", "curl", "ping", "cat", "ls", "tail", "grep"}
DANGEROUS = {"rm", "mkfs", "dd", "shutdown", "halt", "reboot", "mv /", "chmod 777 /"}
def _is_safe(command: str) -> bool:
base = command.strip().split()[0].split("/")[-1] if command.strip() else ""
return base in SAFE_COMMANDS and not any(d in command for d in DANGEROUS)
async def _execute_tool(tool_name: str, params: dict, db: AsyncSession, user: User) -> str:
"""도구 실행 — 에이전트리스 원칙 준수."""
try:
if tool_name == "ssh_run":
cmd = params.get("command", "")
if not _is_safe(cmd):
return f"ERROR: 위험 명령 차단됨: {cmd}"
from routers.ssh import _exec_ssh
result = await _exec_ssh(params["server_id"], cmd, db, user)
return result[:500]
elif tool_name == "create_sr":
from routers.tasks import _create_sr_internal
sr = await _create_sr_internal(
params["title"], params.get("priority", "MEDIUM"),
params.get("description", ""), user.id, db
)
return f"SR 생성: {sr.id}"
elif tool_name == "get_cmdb":
from routers.cmdb import _get_server_brief
info = await _get_server_brief(params["server_id"], db)
return json.dumps(info, ensure_ascii=False)[:300]
elif tool_name == "get_metrics":
return f"CPU: 72%, MEM: 65%, DISK: 45%" # Prometheus 연동 시 교체
elif tool_name == "health_check":
return f"서버 {params['server_id']}: ACTIVE"
elif tool_name == "restart_service":
svc = params.get("service_name", "")
return f"서비스 '{svc}' 재시작 완료"
elif tool_name == "query_srs":
return "최근 미해결 SR 3건: SR-1234 (HIGH), SR-1235 (MEDIUM), SR-1236 (LOW)"
elif tool_name == "notify_messenger":
return "메시지 발송 완료"
return f"도구 '{tool_name}' 실행 결과 없음"
except Exception as e:
logger.warning(f"도구 실행 실패 {tool_name}: {e}")
return f"ERROR: {str(e)[:100]}"
async def _run_agent_task(run_id: int, task: str, db: AsyncSession, user: User):
    """Run the Ollama tool-calling loop for one AgentRun and persist its trace.

    Each iteration does the following:

    1. Send the conversation to Ollama's /api/chat.
    2. Record the reply (first 300 chars) as a step.
    3. If the reply contains a ``{"tool": ..., "params": ...}`` JSON object,
       execute the tool via ``_execute_tool`` and feed the result back as a
       "tool" message.

    The loop ends on any of these conditions:

    - the reply starts with "DONE:" or has "완료" in its first 50 characters;
    - a reply carries no parsable tool call;
    - MAX_TOOL_STEPS iterations have run;
    - the run's status is STOPPED (checked before every step).

    On exit, the step trace and finished_at are always written. Status becomes
    DONE unless the run was STOPPED in the meantime, so a stop request is never
    overwritten.

    Args:
        run_id: primary key of the AgentRun row to update.
        task: task text sent as the first user message.
        db: session used for status checks, tool calls and the final update.
            NOTE(review): callers pass the request-scoped session from get_db.
            Confirm it stays usable after the response is sent (newer FastAPI
            versions finish yield-dependencies before background tasks run).
        user: user forwarded to the tool implementations.
    """
    from sqlalchemy import update as sa_update

    messages = [
        {"role": "system", "content":
            "당신은 IT 인프라 운영 에이전트입니다. 주어진 도구를 사용해 태스크를 완료하세요. "
            f"사용 가능 도구: {json.dumps([t['name'] for t in TOOLS])}. "
            "도구 호출 시 JSON 형식: {\"tool\": \"tool_name\", \"params\": {...}}"},
        {"role": "user", "content": task},
    ]
    steps: List[Dict[str, Any]] = []
    try:
        # One HTTP client for the whole run instead of a new pool per step.
        async with httpx.AsyncClient(timeout=30) as client:
            for step in range(MAX_TOOL_STEPS):
                # Honour POST /stop/{id}.
                # NOTE(review): under REPEATABLE READ (e.g. the MySQL default), a
                # long-lived transaction may not see a concurrent STOPPED update;
                # confirm the isolation level.
                status_row = await db.execute(
                    select(AgentRun.status).where(AgentRun.id == run_id)
                )
                if status_row.scalar_one_or_none() == "STOPPED":
                    break
                try:
                    r = await client.post(f"{OLLAMA_URL}/api/chat", json={
                        "model": "llama3",
                        "messages": messages,
                        "stream": False,
                    })
                    # Surface HTTP errors (e.g. unknown model) instead of an empty reply.
                    r.raise_for_status()
                    reply = (r.json().get("message") or {}).get("content") or ""
                except Exception as e:
                    reply = f"DONE: Ollama 연결 실패 — {e}"
                steps.append({"step": step + 1, "reply": reply[:300]})
                if reply.strip().startswith("DONE:") or "완료" in reply[:50]:
                    break
                # Tool-call parsing: take the outermost {...} span of the reply.
                start = reply.find("{")
                end = reply.rfind("}") + 1
                if start < 0 or end <= start:
                    break  # plain-text reply: treat it as the final answer
                try:
                    call = json.loads(reply[start:end])
                except ValueError:
                    break  # braces present but not valid JSON
                tool_name = call.get("tool", "") if isinstance(call, dict) else ""
                if not tool_name:
                    break
                tool_params = call.get("params", {})
                result = await _execute_tool(tool_name, tool_params, db, user)
                messages.append({"role": "assistant", "content": reply})
                messages.append({"role": "tool", "content": result})
                steps[-1]["tool_result"] = result
    except Exception:
        # Record the failure and still finalise the run below, so it does not
        # stay RUNNING forever.
        logger.exception("에이전트 실행 실패 run_id=%s", run_id)
        steps.append({"step": len(steps) + 1, "error": "에이전트 루프 예외로 중단됨"})
        await db.rollback()

    # Use execute() + commit() rather than `async with db.begin()`: the session
    # may already be inside an autobegun transaction (status checks, tool
    # queries, or the caller's refresh), where begin() raises.
    await db.execute(
        sa_update(AgentRun).where(AgentRun.id == run_id)
        .values(result=json.dumps(steps, ensure_ascii=False), finished_at=datetime.utcnow())
    )
    await db.execute(
        sa_update(AgentRun)
        .where(AgentRun.id == run_id, AgentRun.status != "STOPPED")
        .values(status="DONE")
    )
    await db.commit()
# ── 스키마 ────────────────────────────────────────────────────────────────────
class RunRequest(BaseModel):
    """Request body for POST /api/agent/run.

    Attributes:
        task: free-text task. run_agent stores it on the AgentRun and passes it
            to the agent loop as the user message.
        context: optional extra data. run_agent stores it as JSON in
            AgentRun.context_json (``{}`` when omitted).
            NOTE(review): it is stored but not passed to the agent loop in this module.
        require_approval: when true, the run is created as PENDING_APPROVAL and
            only starts after POST /approve/{id}. Otherwise it is scheduled
            immediately as RUNNING.
            NOTE(review): the requester chooses this flag, so the human approval
            gate is opt-in; confirm that is intended.
    """
    task: str
    context: Optional[Dict[str, Any]] = None
    require_approval: bool = False
# ── 엔드포인트 ─────────────────────────────────────────────────────────────────
@router.get("/tools")
async def list_tools(user: User = Depends(get_current_user)):
    """Return the agent's tool catalogue (the module-level TOOLS list).

    ``user`` is not used in the body; the dependency only enforces that the
    caller is authenticated.
    """
    catalogue = TOOLS
    return catalogue
@router.post("/run")
async def run_agent(
    body: RunRequest,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Create an AgentRun and, unless approval is requested, start it in the background.

    The run is created as PENDING_APPROVAL when ``body.require_approval`` is
    set; otherwise it is created as RUNNING and ``_run_agent_task`` is
    scheduled with the same session and user.

    Returns:
        ``{"run_id": <id>, "status": <initial status>}``.
    """
    needs_approval = body.require_approval
    initial_status = "PENDING_APPROVAL" if needs_approval else "RUNNING"
    new_run = AgentRun(
        task=body.task,
        status=initial_status,
        created_by=user.id,
        # Users without a tenant_id attribute fall back to tenant 1.
        tenant_id=getattr(user, "tenant_id", 1),
        context_json=json.dumps(body.context or {}),
        created_at=datetime.utcnow(),
    )
    db.add(new_run)
    await db.commit()
    await db.refresh(new_run)
    if needs_approval:
        # Execution waits for POST /approve/{id}.
        return {"run_id": new_run.id, "status": new_run.status}
    background_tasks.add_task(_run_agent_task, new_run.id, body.task, db, user)
    return {"run_id": new_run.id, "status": new_run.status}
@router.get("/runs")
async def list_runs(
    limit: int = 20,
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """List the caller's tenant's most recent agent runs, newest first.

    Args:
        limit: maximum number of runs to return. It is clamped to 1..100, so a
            huge value cannot dump the whole table and a negative value (which
            some backends, e.g. SQLite, treat as "no limit") cannot bypass it.

    Returns:
        A list of summaries with id, task (first 80 chars), status, created_at
        and finished_at.
    """
    limit = max(1, min(limit, 100))
    rows = await db.execute(
        select(AgentRun)
        # Scope to the caller's tenant, using the same fallback run_agent uses.
        .where(AgentRun.tenant_id == getattr(user, "tenant_id", 1))
        .order_by(desc(AgentRun.created_at))
        .limit(limit)
    )
    runs = rows.scalars().all()
    return [
        {
            "id": r.id, "task": (r.task or "")[:80], "status": r.status,
            "created_at": r.created_at, "finished_at": r.finished_at,
        }
        for r in runs
    ]
@router.get("/runs/{run_id}")
async def get_run(run_id: int, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)):
    """Return one run of the caller's tenant with its parsed step trace.

    Raises:
        HTTPException(404): no run with this id in the caller's tenant.
    """
    row = await db.execute(
        select(AgentRun).where(
            AgentRun.id == run_id,
            # Same tenant fallback that run_agent uses when creating runs.
            AgentRun.tenant_id == getattr(user, "tenant_id", 1),
        )
    )
    run = row.scalar_one_or_none()
    if run is None:
        raise HTTPException(404)
    try:
        steps = json.loads(run.result or "[]")
    except (TypeError, ValueError):
        # A corrupt or non-JSON trace is shown as "no steps" instead of a 500.
        steps = []
    return {"id": run.id, "task": run.task, "status": run.status, "steps": steps,
            "created_at": run.created_at, "finished_at": run.finished_at}
@router.post("/approve/{run_id}")
async def approve_run(
    run_id: int,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db),
    user: User = Depends(require_admin_role),
):
    """Human approval gate: move a PENDING_APPROVAL run to RUNNING and start it.

    The status change is a conditional UPDATE, so approving twice (or
    concurrently) cannot start the same run more than once, and finished or
    stopped runs cannot be re-executed by "approving" them.

    NOTE(review): the background task receives the approving admin as ``user``,
    so tool actions are attributed to the approver rather than the run's
    creator; confirm this is intended.

    Raises:
        HTTPException(404): no run with this id.
        HTTPException(409): the run is not awaiting approval.
    """
    from sqlalchemy import update as sa_update

    row = await db.execute(select(AgentRun).where(AgentRun.id == run_id))
    run = row.scalar_one_or_none()
    if run is None:
        raise HTTPException(404)
    task = run.task  # read before commit, which may expire ORM attributes
    res = await db.execute(
        sa_update(AgentRun)
        .where(AgentRun.id == run_id, AgentRun.status == "PENDING_APPROVAL")
        .values(status="RUNNING", approved_by=user.id)
    )
    await db.commit()
    if res.rowcount == 0:
        raise HTTPException(409, "승인 대기(PENDING_APPROVAL) 상태의 실행이 아닙니다")
    background_tasks.add_task(_run_agent_task, run_id, task, db, user)
    return {"ok": True}
@router.post("/stop/{run_id}")
async def stop_run(run_id: int, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)):
    """Mark a RUNNING or PENDING_APPROVAL run of the caller's tenant as STOPPED.

    Stopping an already-STOPPED run is a no-op success. A finished run (e.g.
    DONE) is left untouched so its recorded outcome is not falsified. A
    background loop that is already executing only notices the new status if
    it re-reads it between steps.

    Raises:
        HTTPException(404): no run with this id in the caller's tenant.
        HTTPException(409): the run is neither running nor awaiting approval.
    """
    from sqlalchemy import update as sa_update

    stoppable = ("RUNNING", "PENDING_APPROVAL")
    # Same tenant fallback that run_agent uses when creating runs.
    tenant_id = getattr(user, "tenant_id", 1)
    row = await db.execute(
        select(AgentRun.status).where(AgentRun.id == run_id, AgentRun.tenant_id == tenant_id)
    )
    current = row.first()
    if current is None:
        raise HTTPException(404)
    if current[0] == "STOPPED":
        return {"ok": True}
    # Conditional UPDATE: if the run finished after the read above, nothing changes.
    res = await db.execute(
        sa_update(AgentRun)
        .where(AgentRun.id == run_id, AgentRun.status.in_(stoppable))
        .values(status="STOPPED")
    )
    await db.commit()
    if res.rowcount == 0:
        raise HTTPException(409, "실행 중이거나 승인 대기 중인 실행만 중단할 수 있습니다")
    return {"ok": True}