zioinfo-mail/workspace/guardia-itsm/routers/agents.py
DESKTOP-TKLFCPR\ython cfe2901a55 refactor(structure): consolidate all projects under workspace/
- itsm/    -> workspace/guardia-itsm/
- manager/ -> workspace/guardia-manager/
- app/     -> workspace/guardia-messenger/
- manual/  -> workspace/guardia-docs/

workspace/zioinfo-web/ unchanged.
git mv preserves full commit history.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-31 23:50:56 +09:00

591 lines
21 KiB
Python

"""
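GUARDiA ITSM — AI agent management API

Endpoints:
    GET    /api/agents                          List agents
    POST   /api/agents                          Create an agent
    GET    /api/agents/{id}                     Agent detail
    PATCH  /api/agents/{id}                     Update an agent
    DELETE /api/agents/{id}                     Delete an agent
    POST   /api/agents/{id}/heartbeat           Run a heartbeat manually
    POST   /api/agents/{id}/pause               Pause an agent
    POST   /api/agents/{id}/resume              Resume an agent
    GET    /api/agents/{id}/tasks               List an agent's tasks
    POST   /api/agents/{id}/tasks               Create a task directly (for Developer agents)
    GET    /api/agents/{id}/messages            List messages received by an agent
    POST   /api/agents/{id}/messages            Send a message to an agent
    PATCH  /api/agents/messages/{id}/read       Mark a message as read
    GET    /api/agents/approvals                List approvals (pending only by default)
    PATCH  /api/agents/approvals/{id}/review    Approve or reject
    GET    /api/agents/stats                    Aggregate agent statistics
    GET    /api/agents/orgchart                 Organisation chart (for the Phase 4 dashboard)
    GET    /api/agents/llm/health               Ollama health check
    GET    /api/agents/llm/models               List installed models
    POST   /api/agents/llm/pull                 Start a model download
    POST   /api/agents/finetune/start           Start fine-tuning (ADMIN only)
    GET    /api/agents/finetune/status          Fine-tuning status

RBAC: the CUSTOMER role is blocked from every endpoint.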
"""
from __future__ import annotations
import logging
from datetime import datetime, date
from typing import Optional, List
from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks
from sqlalchemy import select, func, and_
from sqlalchemy.ext.asyncio import AsyncSession
from database import get_db
from models import (
User, UserRole,
AgentConfig, AgentTask, AgentApproval,
AgentRole, AgentStatus, LLMProvider, AgentTaskStatus, AgentApprovalStatus,
AgentConfigOut, AgentConfigCreate, AgentConfigUpdate,
AgentTaskOut, AgentTaskCreate,
AgentApprovalOut, AgentApprovalReview,
AgentStatsOut, AgentOrgNode,
)
from routers.auth import get_current_user
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Router that owns every endpoint below; all paths are prefixed with /api/agents.
router = APIRouter(prefix="/api/agents", tags=["agents"])
# ── 공통 헬퍼 ────────────────────────────────────────────────────────────────
def _check_access(user: User) -> None:
    """Block callers whose role is CUSTOMER.

    Args:
        user: The authenticated user making the request.

    Raises:
        HTTPException: 403 when ``user.role`` equals ``UserRole.CUSTOMER``.
    """
    is_customer = user.role == UserRole.CUSTOMER
    if not is_customer:
        return
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="에이전트 관리는 운영 계정만 접근 가능합니다.",
    )
async def _get_agent_or_404(agent_id: int, db: AsyncSession) -> AgentConfig:
    """Fetch the agent with the given id, or fail with HTTP 404.

    Args:
        agent_id: Primary key of the ``AgentConfig`` row to load.
        db: Async database session used for the lookup.

    Returns:
        The matching ``AgentConfig`` instance.

    Raises:
        HTTPException: 404 when no row matches ``agent_id``.
    """
    stmt = select(AgentConfig).where(AgentConfig.id == agent_id)
    result = await db.execute(stmt)
    found = result.scalars().first()
    if found:
        return found
    raise HTTPException(status_code=404, detail=f"에이전트 ID {agent_id} 없음")
# ── 에이전트 CRUD ────────────────────────────────────────────────────────────
@router.get("", response_model=List[AgentConfigOut])
async def list_agents(
    role: Optional[str] = None,
    active_only: bool = False,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return the agents, ordered by role and then by name.

    Args:
        role: When given (and non-empty), only agents with this role are returned.
        active_only: When true, only agents whose ``is_active`` flag is set.
        db: Async database session.
        current_user: Authenticated caller; CUSTOMER accounts are rejected.
    """
    _check_access(current_user)

    # Collect the optional filters first, then apply them in the same order.
    conditions = []
    if role:
        conditions.append(AgentConfig.role == role)
    if active_only:
        conditions.append(AgentConfig.is_active.is_(True))

    stmt = select(AgentConfig)
    for condition in conditions:
        stmt = stmt.where(condition)
    stmt = stmt.order_by(AgentConfig.role, AgentConfig.name)

    result = await db.execute(stmt)
    return result.scalars().all()
@router.post("", response_model=AgentConfigOut, status_code=status.HTTP_201_CREATED)
async def create_agent(
    body: AgentConfigCreate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Register a new agent and, if it has a cron expression, schedule its heartbeat.

    Args:
        body: Creation payload; its fields are copied onto the new row.
        db: Async database session.
        current_user: Authenticated caller; must hold the ADMIN or PM role.

    Returns:
        The persisted agent, refreshed after commit.

    Raises:
        HTTPException: 403 for CUSTOMER accounts and for any role other than
            ADMIN/PM.
    """
    _check_access(current_user)
    may_create = current_user.role in (UserRole.ADMIN, UserRole.PM)
    if not may_create:
        raise HTTPException(status_code=403, detail="에이전트 생성은 ADMIN/PM만 가능합니다.")

    # Fields taken verbatim from the request body.
    copied_fields = (
        "name",
        "role",
        "description",
        "llm_provider",
        "llm_model",
        "system_prompt",
        "heartbeat_cron",
        "is_active",
    )
    new_agent = AgentConfig(
        **{field: getattr(body, field) for field in copied_fields},
        status=AgentStatus.IDLE,
        created_by=current_user.id,
    )
    db.add(new_agent)
    await db.commit()
    await db.refresh(new_agent)

    # Register the heartbeat cron job with the scheduler (APScheduler).
    if new_agent.heartbeat_cron:
        _register_heartbeat(new_agent)

    logger.info(
        "에이전트 생성: id=%d role=%s user=%s",
        new_agent.id,
        new_agent.role,
        current_user.username,
    )
    return new_agent
@router.get("/stats", response_model=AgentStatsOut)
async def get_stats(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Aggregate agent statistics for the Phase 4 dashboard.

    Reports total and active agent counts, the number of tasks and the sum of
    tokens for tasks created since local midnight, the number of pending
    approvals, and whether the LLM client's health check succeeds.
    """
    _check_access(current_user)

    async def _scalar_or_zero(stmt):
        # Run a single-value query; a NULL/empty result is reported as 0.
        return (await db.execute(stmt)).scalar() or 0

    agent_count = select(func.count()).select_from(AgentConfig)
    total = await _scalar_or_zero(agent_count)
    active = await _scalar_or_zero(
        agent_count.where(AgentConfig.is_active.is_(True))
    )

    # "Today" starts at 00:00 of the server's local date (naive datetime).
    midnight = datetime.combine(date.today(), datetime.min.time())
    tasks_today = await _scalar_or_zero(
        select(func.count())
        .select_from(AgentTask)
        .where(AgentTask.created_at >= midnight)
    )
    tokens_today = await _scalar_or_zero(
        select(func.coalesce(func.sum(AgentTask.tokens_used), 0))
        .where(AgentTask.created_at >= midnight)
    )
    pending_approvals = await _scalar_or_zero(
        select(func.count())
        .select_from(AgentApproval)
        .where(AgentApproval.status == AgentApprovalStatus.PENDING)
    )

    from core.llm_client import get_llm_client

    llm_online = await get_llm_client().health_check()

    return AgentStatsOut(
        total_agents=total,
        active_agents=active,
        total_tasks_today=tasks_today,
        total_tokens_today=tokens_today,
        pending_approvals=pending_approvals,
        llm_online=llm_online,
    )
@router.get("/orgchart", response_model=List[AgentOrgNode])
async def get_orgchart(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return the organisation chart of active agents for the Phase 4 dashboard.

    The tree is rooted at every active CEO agent. Each CEO node lists the
    active agents in the roles that report to the CEO, and each CTO node lists
    the active DEVELOPER and QA agents. All other roles are leaves.

    Returns:
        One ``AgentOrgNode`` tree per active CEO agent. When no CEO agent is
        active, a single placeholder node with ``id=0`` is returned.

    Raises:
        HTTPException: 403 for CUSTOMER accounts.
    """
    _check_access(current_user)
    agents = (await db.execute(
        select(AgentConfig).where(AgentConfig.is_active.is_(True))
    )).scalars().all()

    def role_key(role):
        # Compare roles by their plain value so enum members and raw strings
        # loaded from the database match each other.
        return getattr(role, "value", role)

    # Reporting lines: CEO -> CTO / PM_AGENT / specialist agents, CTO -> DEV / QA.
    # Previously the CEO entry was empty and the CEO child list was never used,
    # so every CEO node was returned without children.
    hierarchy = {
        AgentRole.CEO: [
            AgentRole.CTO,
            AgentRole.PM_AGENT,
            AgentRole.INCIDENT_TRIAGE,
            AgentRole.KB_CURATOR,
            AgentRole.SSL_WATCHER,
            AgentRole.WBS_MONITOR,
            AgentRole.PM_SUGGESTER,
        ],
        AgentRole.CTO: [AgentRole.DEVELOPER, AgentRole.QA],
    }
    child_roles_by_parent = {
        role_key(parent): {role_key(child) for child in children}
        for parent, children in hierarchy.items()
    }

    def build_node(agent: AgentConfig) -> AgentOrgNode:
        # Roles missing from the mapping are leaves (empty child set).
        child_roles = child_roles_by_parent.get(role_key(agent.role), set())
        child_agents = [x for x in agents if role_key(x.role) in child_roles]
        return AgentOrgNode(
            id=agent.id,
            name=agent.name,
            role=agent.role,
            status=agent.status,
            children=[build_node(child) for child in child_agents],
        )

    ceo_key = role_key(AgentRole.CEO)
    ceo_agents = [a for a in agents if role_key(a.role) == ceo_key]
    if not ceo_agents:
        # No CEO registered: return a placeholder root so the client still
        # receives a non-empty list.
        return [AgentOrgNode(id=0, name="(CEO 미등록)", role="CEO", status="IDLE")]
    return [build_node(a) for a in ceo_agents]
@router.get("/approvals", response_model=List[AgentApprovalOut])
async def list_approvals(
    pending_only: bool = True,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """List approval requests, newest first, capped at 50 rows.

    Args:
        pending_only: When true (the default), only PENDING approvals are returned.
        db: Async database session.
        current_user: Authenticated caller; CUSTOMER accounts are rejected.
    """
    _check_access(current_user)

    stmt = select(AgentApproval)
    if pending_only:
        stmt = stmt.where(AgentApproval.status == AgentApprovalStatus.PENDING)
    stmt = stmt.order_by(AgentApproval.requested_at.desc()).limit(50)

    result = await db.execute(stmt)
    return result.scalars().all()
@router.patch("/approvals/{approval_id}/review", response_model=AgentApprovalOut)
async def review_approval(
    approval_id: int,
    body: AgentApprovalReview,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Approve or reject a pending approval request.

    Args:
        approval_id: Primary key of the approval to review.
        body: Review decision (``approved``) and optional ``notes``.
        db: Async database session.
        current_user: Authenticated caller, recorded as the reviewer.

    Returns:
        The updated approval row.

    Raises:
        HTTPException: 403 for CUSTOMER accounts, 404 when the approval does
            not exist, 400 when it is no longer PENDING.
    """
    _check_access(current_user)

    lookup = select(AgentApproval).where(AgentApproval.id == approval_id)
    row = (await db.execute(lookup)).scalars().first()
    if not row:
        raise HTTPException(status_code=404, detail="승인 항목 없음")
    if row.status != AgentApprovalStatus.PENDING:
        raise HTTPException(status_code=400, detail="이미 처리된 항목입니다.")

    if body.approved:
        row.status = AgentApprovalStatus.APPROVED
    else:
        row.status = AgentApprovalStatus.REJECTED
    row.notes = body.notes
    row.reviewed_by = current_user.id
    row.reviewed_at = datetime.now()

    await db.commit()
    await db.refresh(row)
    logger.info(
        "승인 처리: approval_id=%d status=%s user=%s",
        approval_id, row.status, current_user.username,
    )
    return row
# ── LLM 헬스·모델 관리 ────────────────────────────────────────────────────
@router.get("/llm/health")
async def llm_health(current_user: User = Depends(get_current_user)):
    """Report whether the Ollama server is reachable and which models it has.

    Returns:
        A dict with ``online``, the client's ``base_url`` and ``models``
        (name plus size in GB rounded to one decimal). The model list is
        empty, and not queried, when the health check fails.
    """
    _check_access(current_user)
    from core.llm_client import get_llm_client

    llm = get_llm_client()
    is_online = await llm.health_check()
    if is_online:
        installed = await llm.list_models()
    else:
        installed = []

    model_rows = []
    for entry in installed:
        model_rows.append({"name": entry.name, "size_gb": round(entry.size / 1e9, 1)})

    return {
        "online": is_online,
        "base_url": llm.base_url,
        "models": model_rows,
    }
@router.post("/llm/pull")
async def pull_model(
    model: str,
    background_tasks: BackgroundTasks,
    current_user: User = Depends(get_current_user),
):
    """Queue an Ollama model download as a background task (ADMIN only).

    Args:
        model: Name of the model to pull.
        background_tasks: FastAPI background task queue.
        current_user: Authenticated caller; must be ADMIN.

    Raises:
        HTTPException: 403 when the caller is not ADMIN.
    """
    _check_access(current_user)
    is_admin = current_user.role == UserRole.ADMIN
    if not is_admin:
        raise HTTPException(status_code=403, detail="모델 다운로드는 ADMIN만 가능합니다.")

    from core.llm_client import get_llm_client

    llm = get_llm_client()
    background_tasks.add_task(llm.pull_model, model)
    response = {"message": f"모델 '{model}' 다운로드 시작 (백그라운드)", "model": model}
    return response
# ── 에이전트 상세·수정·삭제 ───────────────────────────────────────────────
@router.get("/{agent_id}", response_model=AgentConfigOut)
async def get_agent(
    agent_id: int,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return a single agent by id.

    Raises:
        HTTPException: 403 for CUSTOMER accounts, 404 when the agent is missing.
    """
    _check_access(current_user)
    agent = await _get_agent_or_404(agent_id, db)
    return agent
@router.patch("/{agent_id}", response_model=AgentConfigOut)
async def update_agent(
    agent_id: int,
    body: AgentConfigUpdate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Apply a partial update to an agent.

    Only fields whose value in ``body`` is not ``None`` are written, so a
    field cannot be reset to ``None`` through this endpoint.

    Raises:
        HTTPException: 403 for CUSTOMER accounts, 404 when the agent is missing.
    """
    _check_access(current_user)
    target = await _get_agent_or_404(agent_id, db)

    changes = body.model_dump(exclude_none=True)
    for field_name, new_value in changes.items():
        setattr(target, field_name, new_value)

    await db.commit()
    await db.refresh(target)
    return target
@router.delete("/{agent_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_agent(
    agent_id: int,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Delete an agent (ADMIN only); responds with 204 and no body.

    Raises:
        HTTPException: 403 when the caller is not ADMIN, 404 when the agent
            is missing.
    """
    is_admin = current_user.role == UserRole.ADMIN
    if not is_admin:
        raise HTTPException(status_code=403, detail="ADMIN만 에이전트 삭제 가능합니다.")

    doomed = await _get_agent_or_404(agent_id, db)
    await db.delete(doomed)
    await db.commit()
# ── 하트비트·제어 ────────────────────────────────────────────────────────────
@router.post("/{agent_id}/heartbeat")
async def manual_heartbeat(
    agent_id: int,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Trigger an agent heartbeat on demand; it runs as a background task.

    Raises:
        HTTPException: 403 for CUSTOMER accounts, 404 when the agent is
            missing, 400 when the agent is inactive.
    """
    _check_access(current_user)
    target = await _get_agent_or_404(agent_id, db)
    if not target.is_active:
        raise HTTPException(status_code=400, detail="비활성 에이전트입니다.")

    from core.agents import get_agent_engine

    engine = get_agent_engine()
    background_tasks.add_task(engine.run_heartbeat, agent_id)
    return {
        "message": f"에이전트 '{target.name}' 하트비트 시작",
        "agent_id": agent_id,
    }
@router.post("/{agent_id}/pause")
async def pause_agent(
    agent_id: int,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Pause an agent: clear ``is_active`` and set its status to PAUSED.

    Raises:
        HTTPException: 403 for CUSTOMER accounts, 404 when the agent is missing.
    """
    _check_access(current_user)
    target = await _get_agent_or_404(agent_id, db)

    target.is_active = False
    target.status = AgentStatus.PAUSED
    await db.commit()

    notice = f"에이전트 '{target.name}' 일시 중지"
    return {"message": notice, "status": "PAUSED"}
@router.post("/{agent_id}/resume")
async def resume_agent(
    agent_id: int,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Resume an agent: set ``is_active`` and put its status back to IDLE.

    Raises:
        HTTPException: 403 for CUSTOMER accounts, 404 when the agent is missing.
    """
    _check_access(current_user)
    target = await _get_agent_or_404(agent_id, db)

    target.is_active = True
    target.status = AgentStatus.IDLE
    await db.commit()

    notice = f"에이전트 '{target.name}' 재개"
    return {"message": notice, "status": "IDLE"}
# ── 태스크 ───────────────────────────────────────────────────────────────────
@router.get("/{agent_id}/tasks", response_model=List[AgentTaskOut])
async def list_tasks(
    agent_id: int,
    limit: int = 20,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """List the most recently created tasks of an agent.

    The agent's existence is not checked here; the query simply filters tasks
    by ``agent_id``.

    Args:
        agent_id: Agent whose tasks are listed.
        limit: Maximum number of rows to return (default 20).
        db: Async database session.
        current_user: Authenticated caller; CUSTOMER accounts are rejected.
    """
    _check_access(current_user)

    stmt = select(AgentTask).where(AgentTask.agent_id == agent_id)
    stmt = stmt.order_by(AgentTask.created_at.desc())
    stmt = stmt.limit(limit)

    result = await db.execute(stmt)
    return result.scalars().all()
@router.post(
    "/{agent_id}/tasks",
    response_model=AgentTaskOut,
    status_code=status.HTTP_201_CREATED,
)
async def create_task(
    agent_id: int,
    body: AgentTaskCreate,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Create a PENDING task for an agent.

    When the agent is an active DEVELOPER agent, a heartbeat is also queued
    as a background task so the new task is picked up right away.

    Raises:
        HTTPException: 403 for CUSTOMER accounts, 404 when the agent is missing.
    """
    _check_access(current_user)
    owner = await _get_agent_or_404(agent_id, db)

    new_task = AgentTask(
        agent_id=agent_id,
        title=body.title,
        description=body.description,
        status=AgentTaskStatus.PENDING,
        input_data=body.input_data,
        created_at=datetime.now(),
    )
    db.add(new_task)
    await db.commit()
    await db.refresh(new_task)

    # Developer agents start working immediately via an extra heartbeat.
    is_active_developer = owner.role == AgentRole.DEVELOPER and owner.is_active
    if is_active_developer:
        from core.agents import get_agent_engine

        engine = get_agent_engine()
        background_tasks.add_task(engine.run_heartbeat, agent_id)

    return new_task
# ── 스케줄러 등록 헬퍼 ────────────────────────────────────────────────────
def _register_heartbeat(agent: AgentConfig) -> None:
    """Register (or replace) the agent's heartbeat cron job in APScheduler.

    Best-effort: any failure, including a failing import, is logged as a
    warning and swallowed. Nothing is registered when the scheduler is
    missing or not running.

    Args:
        agent: Agent whose ``heartbeat_cron`` expression defines the schedule.
    """
    try:
        from core.scheduler import get_scheduler
        from apscheduler.triggers.cron import CronTrigger

        sched = get_scheduler()
        scheduler_ready = sched is not None and sched.running
        if not scheduler_ready:
            return

        from core.agents import get_agent_engine

        heartbeat_job_id = f"agent_heartbeat_{agent.id}"
        heartbeat_fn = get_agent_engine().run_heartbeat
        cron_trigger = CronTrigger.from_crontab(
            agent.heartbeat_cron, timezone="Asia/Seoul"
        )
        sched.add_job(
            heartbeat_fn,
            cron_trigger,
            args=[agent.id],
            id=heartbeat_job_id,
            name=f"에이전트 하트비트: {agent.name}",
            # Re-registering the same agent overwrites its existing job.
            replace_existing=True,
            misfire_grace_time=300,
        )
        logger.info(
            "에이전트 하트비트 등록: job_id=%s cron=%s",
            heartbeat_job_id,
            agent.heartbeat_cron,
        )
    except Exception as exc:
        logger.warning("에이전트 하트비트 스케줄 등록 실패: %s", exc)
# ── 에이전트 간 메시지 API (Priority 3) ───────────────────────────────────────
@router.get("/{agent_id}/messages", summary="수신 메시지 목록")
async def get_agent_messages(
    agent_id: int,
    unread_only: bool = False,
    limit: int = 50,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return the messages addressed to an agent, newest first.

    Args:
        agent_id: Receiving agent (matched against ``to_agent_id``).
        unread_only: When true, only messages with ``is_read`` false.
        limit: Maximum number of messages to return (default 50).
        db: Async database session.
        current_user: Authenticated caller; CUSTOMER accounts are rejected.
    """
    _check_access(current_user)
    from models import AgentMessage, AgentMessageOut

    stmt = select(AgentMessage).where(AgentMessage.to_agent_id == agent_id)
    if unread_only:
        stmt = stmt.where(AgentMessage.is_read.is_(False))
    stmt = stmt.order_by(AgentMessage.created_at.desc()).limit(limit)

    rows = (await db.execute(stmt)).scalars().all()
    serialized = []
    for row in rows:
        serialized.append(AgentMessageOut.model_validate(row))
    return serialized
@router.post("/{agent_id}/messages", status_code=201, summary="메시지 전송")
async def send_agent_message(
    agent_id: int,
    payload: dict,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """
    Send a message to an agent.

    Body: {
        "from_agent_id": int | null,
        "message_type": "TASK_DELEGATION|STATUS_UPDATE|ESCALATION",
        "subject": "string",
        "body": "string",
        "metadata_json": "string"
    }

    Missing keys fall back to: ``message_type`` "STATUS_UPDATE", ``subject``
    "", and ``None`` for the rest. The receiving agent's existence is not
    checked here.
    """
    _check_access(current_user)
    from models import AgentMessage, AgentMessageOut

    message_fields = {
        "from_agent_id": payload.get("from_agent_id"),
        "to_agent_id": agent_id,
        "message_type": payload.get("message_type", "STATUS_UPDATE"),
        "subject": payload.get("subject", ""),
        "body": payload.get("body"),
        "metadata_json": payload.get("metadata_json"),
    }
    message = AgentMessage(**message_fields)

    db.add(message)
    await db.commit()
    await db.refresh(message)
    return AgentMessageOut.model_validate(message)
@router.patch("/messages/{msg_id}/read", summary="메시지 읽음 처리")
async def mark_message_read(
    msg_id: int,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Mark a message as read and stamp its ``read_at`` time.

    Raises:
        HTTPException: 403 for CUSTOMER accounts, 404 when the message is missing.
    """
    _check_access(current_user)
    from models import AgentMessage, AgentMessageOut

    message = await db.get(AgentMessage, msg_id)
    if not message:
        raise HTTPException(404, "메시지를 찾을 수 없습니다.")

    message.is_read = True
    # NOTE(review): this stores naive UTC, while other handlers in this module
    # stamp rows with datetime.now() (local time) — confirm which is intended.
    message.read_at = datetime.utcnow()

    await db.commit()
    await db.refresh(message)
    return AgentMessageOut.model_validate(message)
# ── 파인튜닝 API (Priority 3) ─────────────────────────────────────────────────
@router.post("/finetune/start", summary="파인튜닝 시작 (ADMIN only)")
async def start_finetune(
    background_tasks: BackgroundTasks,
    payload: Optional[dict] = None,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """
    Start fine-tuning an Ollama custom model from tb_agent_task(COMPLETED) data.

    Body: { "model_name": "guardia-agent-v2", "limit": 1000 }

    The dataset export and the fine-tune itself run as a background task; the
    response only confirms that the job was queued.

    Args:
        background_tasks: FastAPI background task queue.
        payload: Optional JSON body; missing keys use the defaults shown above.
        db: Async database session (not used by this handler).
        current_user: Authenticated caller; must be ADMIN.

    Returns:
        A dict with ``status`` "started", the ``model_name`` and the
        ``dataset_path`` the export will be written to.

    Raises:
        HTTPException: 403 when the caller is not ADMIN; 400 when
            ``model_name`` is not a safe model identifier or ``limit`` is not
            a positive integer.
    """
    if current_user.role != UserRole.ADMIN:
        raise HTTPException(403, "ADMIN 권한이 필요합니다.")

    import re

    options = payload or {}

    # model_name is interpolated into a filesystem path below, so reject
    # anything that could escape the dataset directory (path separators,
    # leading dots) or that is not a string.
    model_name = options.get("model_name", "guardia-agent-v2")
    if not isinstance(model_name, str) or not re.fullmatch(
        r"[A-Za-z0-9][A-Za-z0-9._:-]{0,127}", model_name
    ):
        raise HTTPException(400, "model_name 형식이 올바르지 않습니다.")

    # A malformed limit is a client error, not a server error.
    try:
        limit = int(options.get("limit", 1000))
    except (TypeError, ValueError):
        raise HTTPException(400, "limit은 1 이상의 정수여야 합니다.") from None
    if limit < 1:
        raise HTTPException(400, "limit은 1 이상의 정수여야 합니다.")

    dataset_path = f"/opt/guardia/finetune/{model_name}.jsonl"

    async def _run_finetune():
        # Export the training data first; skip the fine-tune when it is empty.
        from core.llm_client import get_llm_client

        llm = get_llm_client()
        count = await llm.export_finetune_dataset(dataset_path, limit=limit)
        if count == 0:
            logger.error("[finetune] 데이터셋 내보내기 실패 또는 데이터 없음")
            return
        ok = await llm.fine_tune(dataset_path, model_name)
        if ok:
            logger.info("[finetune] 파인튜닝 완료: model=%s", model_name)
        else:
            logger.error("[finetune] 파인튜닝 실패: model=%s", model_name)

    background_tasks.add_task(_run_finetune)
    return {"status": "started", "model_name": model_name, "dataset_path": dataset_path}
@router.get("/finetune/status", summary="파인튜닝 진행 상태 조회")
async def get_finetune_status(
    current_user: User = Depends(get_current_user),
):
    """Report fine-tuning status as the list of models the LLM client knows.

    Returns:
        A dict with a ``models`` list of ``{"name", "size"}`` entries.
    """
    _check_access(current_user)
    from core.llm_client import get_llm_client

    client = get_llm_client()
    available = await client.list_models()

    summary = []
    for entry in available:
        summary.append({"name": entry.name, "size": entry.size})
    return {"models": summary}