zioinfo-mail/workspace/guardia-itsm/core/agents.py
DESKTOP-TKLFCPR\ython cfe2901a55 refactor(structure): consolidate all projects under workspace/
- itsm/    -> workspace/guardia-itsm/
- manager/ -> workspace/guardia-manager/
- app/     -> workspace/guardia-messenger/
- manual/  -> workspace/guardia-docs/

workspace/zioinfo-web/ unchanged.
git mv preserves full commit history.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-31 23:50:56 +09:00

861 lines
38 KiB
Python

"""
GUARDiA ITSM — AI 에이전트 엔진 (Paperclip 스타일)
하트비트 기반 자율 운영 에이전트:
INCIDENT_TRIAGE : 미배정 인시던트 자동 분류·배정
KB_CURATOR : 해결된 SR → 지식베이스 자동 생성
SSL_WATCHER : SSL 만료 임박 → SR 자동 생성
WBS_MONITOR : WBS 지연 위험 LLM 분석
PM_SUGGESTER : PM 스케줄 미등록 서버 탐지
보안:
- 외부 API 호출 없음 (Ollama localhost:11434만 사용)
- CRITICAL 액션은 AgentApproval(PENDING) 생성 → 사람 승인 필요
- 저위험 액션은 AUTO_APPROVED로 즉시 적용
"""
from __future__ import annotations
import json
import logging
from datetime import date, datetime
from typing import Optional
from uuid import uuid4
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, and_
logger = logging.getLogger(__name__)
# ── 에이전트 엔진 ────────────────────────────────────────────────────────────
class AgentEngine:
"""
Paperclip 스타일 에이전트 실행 엔진.
scheduler.py의 하트비트 잡에서 호출된다.
"""
def __init__(self) -> None:
from core.llm_client import get_llm_client
self.llm = get_llm_client()
# ── 공통 실행 진입점 ─────────────────────────────────────────────────────
async def run_heartbeat(self, agent_id: int) -> None:
"""에이전트 하트비트 — scheduler.py에서 호출."""
from database import SessionLocal
from models import AgentConfig, AgentStatus, AgentRole
from sqlalchemy import select
# 에이전트 상태 ACTIVE로 변경
async with SessionLocal() as db:
agent = (await db.execute(
select(AgentConfig).where(AgentConfig.id == agent_id)
)).scalars().first()
if not agent or not agent.is_active:
return
if not await self.llm.health_check():
logger.warning("[Agent %d] Ollama 응답 없음 — 하트비트 스킵", agent_id)
return
agent.status = AgentStatus.ACTIVE
agent.last_heartbeat = datetime.now()
await db.commit()
# 역할별 핸들러 매핑
handlers = {
AgentRole.INCIDENT_TRIAGE: self._incident_triage,
AgentRole.KB_CURATOR: self._kb_curator,
AgentRole.SSL_WATCHER: self._ssl_watcher,
AgentRole.WBS_MONITOR: self._wbs_monitor,
AgentRole.PM_SUGGESTER: self._pm_suggester,
AgentRole.DEVELOPER: self._developer_agent,
}
try:
async with SessionLocal() as db:
agent = (await db.execute(
select(AgentConfig).where(AgentConfig.id == agent_id)
)).scalars().first()
if not agent:
return
agent.status = AgentStatus.WORKING
await db.flush()
handler = handlers.get(agent.role)
if handler:
await handler(db, agent)
agent.status = AgentStatus.IDLE
await db.commit()
except Exception as exc:
logger.error("[Agent %d] 하트비트 오류: %s", agent_id, exc, exc_info=True)
async with SessionLocal() as db:
a = (await db.execute(
select(AgentConfig).where(AgentConfig.id == agent_id)
)).scalars().first()
if a:
a.status = AgentStatus.ERROR
a.last_error = str(exc)[:500]
await db.commit()
# ── 헬퍼 ─────────────────────────────────────────────────────────────────
def _new_task(self, agent_id: int, title: str, input_data: dict | None = None):
from models import AgentTask, AgentTaskStatus
return AgentTask(
agent_id=agent_id,
title=title,
status=AgentTaskStatus.IN_PROGRESS,
input_data=input_data or {},
started_at=datetime.now(),
)
def _approval(
self,
agent_id: int,
task_id: int | None,
action_type: str,
action_data: dict,
auto: bool = False,
):
from models import AgentApproval, AgentApprovalStatus
return AgentApproval(
agent_id=agent_id,
task_id=task_id,
action_type=action_type,
action_data=action_data,
status=(
AgentApprovalStatus.AUTO_APPROVED
if auto
else AgentApprovalStatus.PENDING
),
)
# ══════════════════════════════════════════════════════════════════════════
# ── 1. 인시던트 트리아지 ────────────────────────────────────────────────
# ══════════════════════════════════════════════════════════════════════════
async def _incident_triage(self, db: AsyncSession, agent) -> None:
"""
미배정 인시던트 → LLM 심각도·카테고리 분류 → 자동 배정.
CRITICAL은 사람 승인 대기, 나머지는 자동 적용.
"""
from models import Incident, IncidentStatus, AgentTask, AgentTaskStatus
incidents = (await db.execute(
select(Incident).where(
Incident.assigned_to.is_(None),
Incident.status.in_(["OPEN", "RECEIVED"]),
).limit(10)
)).scalars().all()
if not incidents:
logger.debug("[INCIDENT_TRIAGE] 처리 대상 없음")
return
system_prompt = (
agent.system_prompt
or "당신은 ITSM 인시던트 트리아지 전문가입니다. 인시던트를 정확히 분류하세요."
)
for inc in incidents:
task = self._new_task(
agent.id,
f"인시던트 트리아지 #{inc.incident_id}",
{"incident_id": inc.id, "title": inc.title},
)
db.add(task)
await db.flush()
result = await self.llm.json_generate(
prompt=(
f"인시던트를 분류하세요.\n\n"
f"제목: {inc.title}\n"
f"설명: {inc.description or '없음'}\n"
f"서비스: {inc.affected_service or '미지정'}\n\n"
f"응답 JSON:\n"
f'{{"severity":"CRITICAL|HIGH|MEDIUM|LOW",'
f'"category":"NETWORK|SERVER|APPLICATION|DATABASE|SECURITY|OTHER",'
f'"reason":"분류 근거 (50자 이내)"}}'
),
model=agent.llm_model,
system=system_prompt,
)
severity = result.get("severity", "MEDIUM")
is_critical = severity == "CRITICAL"
task.status = AgentTaskStatus.COMPLETED
task.output_data = result
task.tokens_used = 0
task.completed_at = datetime.now()
approval = self._approval(
agent_id=agent.id,
task_id=task.id,
action_type="TRIAGE_INCIDENT",
action_data={
"incident_id": inc.id,
"incident_ref": inc.incident_id,
"severity": severity,
"category": result.get("category", "OTHER"),
"reason": result.get("reason", ""),
},
auto=not is_critical, # CRITICAL만 사람 승인
)
db.add(approval)
# 자동 승인 액션 즉시 적용
if not is_critical:
from models import IncidentGrade
grade_map = {
"CRITICAL": IncidentGrade.CRITICAL,
"HIGH": IncidentGrade.HIGH,
"MEDIUM": IncidentGrade.MEDIUM,
"LOW": IncidentGrade.LOW,
}
inc.grade = grade_map.get(severity, IncidentGrade.MEDIUM)
# 에이전트 통계 업데이트
agent.total_tasks += 1
await db.commit()
logger.info("[INCIDENT_TRIAGE] %d건 처리 완료", len(incidents))
# ── RCA 자동 초안 생성 (RESOLVED 상태, rca_draft 없는 인시던트) ──────
await self._generate_rca_drafts(db, agent)
# ── RCA 자동 초안 생성 헬퍼 ─────────────────────────────────────────────
async def _generate_rca_drafts(self, db: AsyncSession, agent) -> None:
"""
RESOLVED 장애 중 rca_draft 가 없는 항목에 대해 Ollama로 RCA 문서 초안을 자동 생성.
생성된 초안은 AgentApproval(PENDING) 을 통해 담당자 검토 후 확정한다.
외부 API 없음 — 모든 LLM 추론은 Ollama localhost:11434.
"""
from models import Incident, IncidentStatus, AgentTask, AgentTaskStatus, AgentApproval
resolved = (await db.execute(
select(Incident).where(
Incident.status == IncidentStatus.RESOLVED,
Incident.rca_draft.is_(None),
).limit(5)
)).scalars().all()
if not resolved:
return
for inc in resolved:
try:
rca = await self.llm.json_generate(
prompt=(
f"장애 인시던트에 대한 RCA(근본 원인 분석) 초안을 작성하세요.\n\n"
f"제목: {inc.title}\n"
f"설명: {inc.description or '없음'}\n"
f"등급: {inc.grade}\n"
f"발생 시각: {inc.occurred_at}\n"
f"해소 시각: {inc.resolved_at}\n\n"
f"응답 JSON (한국어로 작성):\n"
f'{{"root_cause":"근본 원인 (2~3문장)",'
f'"timeline":"사건 타임라인",'
f'"impact":"영향 범위 (서비스/사용자 수)",'
f'"resolution":"해결 과정",'
f'"preventive_measures":"재발 방지 조치 (3가지 이상)",'
f'"lessons_learned":"교훈"}}'
),
model=agent.llm_model,
system=(
agent.system_prompt
or "당신은 IT 인프라 장애 분석 전문가입니다. 정확하고 실용적인 RCA를 작성하세요."
),
)
if not rca or not rca.get("root_cause"):
continue
inc.rca_draft = json.dumps(rca, ensure_ascii=False)
await db.flush()
# 담당자 검토 필요 — PENDING 승인 생성
task = self._new_task(
agent.id,
f"RCA 초안 생성: {inc.incident_id}",
{"incident_id": inc.id, "incident_ref": inc.incident_id},
)
db.add(task)
await db.flush()
task.status = "COMPLETED"
task.output_data = rca
task.completed_at = datetime.now()
db.add(self._approval(
agent_id=agent.id,
task_id=task.id,
action_type="RCA_DRAFT",
action_data={"incident_id": inc.id, "incident_ref": inc.incident_id},
auto=False, # 항상 사람 검토 필요
))
agent.total_tasks += 1
except Exception as exc:
logger.warning("[RCA_DRAFT] 인시던트 %s 처리 실패: %s", inc.incident_id, exc)
await db.commit()
logger.info("[RCA_DRAFT] %d건 초안 생성 완료", len(resolved))
# ══════════════════════════════════════════════════════════════════════════
# ── 2. KB 큐레이터 ──────────────────────────────────────────────────────
# ══════════════════════════════════════════════════════════════════════════
async def _kb_curator(self, db: AsyncSession, agent) -> None:
"""
해결된 SR 중 KB 미생성 항목 → LLM으로 KB 아티클 자동 생성.
생성된 KB는 is_published=False(검토 대기)로 저장.
"""
from models import (
SRRequest, SRStatus, KBDocument,
AgentTask, AgentTaskStatus, AgentApproval,
)
# 해결된 SR 중 이미 KB로 변환된 것 제외
# (AgentApproval에서 같은 SR에 대한 CREATE_KB 이력 확인)
existing_kb_sr_ids = [
row[0]
for row in (await db.execute(
select(AgentApproval.action_data["sr_id"].as_string())
.where(AgentApproval.action_type == "CREATE_KB")
)).all()
if row[0] is not None
]
srs = (await db.execute(
select(SRRequest).where(
SRRequest.status == SRStatus.COMPLETED,
SRRequest.id.notin_([int(x) for x in existing_kb_sr_ids if x.isdigit()]),
).limit(5)
)).scalars().all()
if not srs:
logger.debug("[KB_CURATOR] 처리 대상 없음")
return
system_prompt = (
agent.system_prompt
or "당신은 IT 지식베이스 전문 에디터입니다. 체계적인 KB 아티클을 작성하세요."
)
for sr in srs:
task = self._new_task(
agent.id,
f"KB 자동 생성: SR #{sr.sr_id}",
{"sr_id": sr.id, "title": sr.title},
)
db.add(task)
await db.flush()
result = await self.llm.json_generate(
prompt=(
f"해결된 서비스 요청으로 지식베이스 아티클을 작성하세요.\n\n"
f"SR 제목: {sr.title}\n"
f"SR 유형: {sr.sr_type}\n"
f"설명: {sr.description or '없음'}\n\n"
f"응답 JSON:\n"
f'{{"kb_title":"검색 가능한 KB 제목",'
f'"symptom":"증상 설명 (2~3문장)",'
f'"cause":"원인 분석 (2~3문장)",'
f'"solution":"해결 방법 (단계별)",'
f'"tags":["태그1","태그2","태그3"]}}'
),
model=agent.llm_model,
system=system_prompt,
)
if not result or not result.get("kb_title"):
task.status = AgentTaskStatus.FAILED
task.output_data = {"error": "LLM 응답 파싱 실패"}
task.completed_at = datetime.now()
continue
# KB 문서 생성 (검토 대기 상태)
kb = KBDocument(
doc_id=f"KB-{datetime.now().strftime('%Y%m%d')}-{str(uuid4())[:6].upper()}",
category=sr.sr_type or "OTHER",
title=result.get("kb_title", sr.title),
symptoms=result.get("symptom", ""),
cause=result.get("cause", ""),
solution=result.get("solution", ""),
tags=",".join(result.get("tags", [])),
sr_type=sr.sr_type,
)
db.add(kb)
await db.flush()
# ── KB 품질 자동 평가 ───────────────────────────────────────────
quality = await self.llm.json_generate(
prompt=(
f"아래 KB 아티클의 품질을 평가하세요.\n\n"
f"제목: {kb.title}\n"
f"증상: {kb.symptoms}\n"
f"원인: {kb.cause}\n"
f"해결책: {kb.solution}\n\n"
f"응답 JSON (0~100 점수):\n"
f'{{"completeness":점수,"clarity":점수,"actionability":점수,"overall":점수}}'
),
model=agent.llm_model,
)
overall = int(quality.get("overall", 0)) if quality else 0
kb.quality_score = min(max(overall, 0), 100)
if kb.quality_score >= 80:
kb.published = True # 자동 발행
auto_approved = True
logger.info("[KB_CURATOR] KB %s 자동 발행 (품질 %d점)", kb.doc_id, kb.quality_score)
else:
kb.published = False
auto_approved = False
logger.info("[KB_CURATOR] KB %s 수동 검토 필요 (품질 %d점)", kb.doc_id, kb.quality_score)
# ────────────────────────────────────────────────────────────────
task.status = AgentTaskStatus.COMPLETED
task.output_data = {**result, "kb_doc_id": kb.doc_id, "quality_score": kb.quality_score}
task.completed_at = datetime.now()
db.add(self._approval(
agent_id=agent.id,
task_id=task.id,
action_type="CREATE_KB",
action_data={"sr_id": sr.id, "kb_doc_id": kb.doc_id, "quality_score": kb.quality_score},
auto=auto_approved,
))
agent.total_tasks += 1
await db.commit()
logger.info("[KB_CURATOR] %d건 처리 완료", len(srs))
# ══════════════════════════════════════════════════════════════════════════
# ── 3. SSL 감시자 ────────────────────────────────────────────────────────
# ══════════════════════════════════════════════════════════════════════════
async def _ssl_watcher(self, db: AsyncSession, agent) -> None:
"""
SSL 만료 D-30 이내 서버 탐지 → SR 자동 생성.
D-7 이하: HIGH 우선순위, 사람 승인 없이 즉시 생성.
"""
from models import Server, SRRequest, SRStatus, SRType, AgentTask, AgentTaskStatus
today = date.today()
servers = (await db.execute(
select(Server).where(Server.ssl_expire_date.isnot(None))
)).scalars().all()
created = 0
for srv in servers:
days_left = (srv.ssl_expire_date - today).days
if not (0 <= days_left <= 30):
continue
# 이미 미완료 SSL 갱신 SR 있는지 확인
existing = (await db.execute(
select(SRRequest).where(
SRRequest.title.contains(f"SSL 갱신 [{srv.server_name}]"),
SRRequest.status.notin_([SRStatus.COMPLETED, SRStatus.REJECTED]),
)
)).scalars().first()
if existing:
continue
priority = "HIGH" if days_left <= 7 else "MEDIUM"
sr = SRRequest(
sr_id=f"SR-{datetime.now().strftime('%Y%m%d')}-{str(uuid4())[:5].upper()}",
title=f"SSL 갱신 [{srv.server_name}] D-{days_left}",
description=(
f"🤖 AI 에이전트(SSL 감시자)가 자동 생성한 서비스 요청입니다.\n\n"
f"서버명: {srv.server_name}\n"
f"만료일: {srv.ssl_expire_date.isoformat()}\n"
f"남은 기간: D-{days_left}\n\n"
f"조치: SSL 인증서 갱신 또는 교체 진행 필요"
),
sr_type=SRType.OTHER,
status=SRStatus.RECEIVED,
priority=priority,
requested_by=f"agent:{agent.id}",
)
db.add(sr)
await db.flush()
task = self._new_task(
agent.id,
f"SSL 만료 감지: {srv.server_name} D-{days_left}",
{"server_id": srv.id, "days_left": days_left},
)
task.status = AgentTaskStatus.COMPLETED
task.output_data = {"sr_id": sr.id, "priority": priority}
task.completed_at = datetime.now()
db.add(task)
await db.flush()
db.add(self._approval(
agent_id=agent.id, task_id=task.id,
action_type="CREATE_SR",
action_data={"sr_db_id": sr.id, "server_id": srv.id, "days_left": days_left},
auto=True,
))
agent.total_tasks += 1
created += 1
if created:
await db.commit()
logger.info("[SSL_WATCHER] SSL 만료 SR %d건 자동 생성", created)
# ══════════════════════════════════════════════════════════════════════════
# ── 4. WBS 모니터 ────────────────────────────────────────────────────────
# ══════════════════════════════════════════════════════════════════════════
async def _wbs_monitor(self, db: AsyncSession, agent) -> None:
"""
지연 WBS가 있는 SI 프로젝트 → LLM 위험 분석 → ProjectRisk 자동 생성.
WBS 지연이 3건 이상이거나 10일 이상 초과 시 위험 생성.
"""
from models import (
WbsItem, WbsStatus, SiProject, ProjectRisk,
RiskStatus, AgentTask, AgentTaskStatus, ProjectPhase,
)
today = date.today()
projects = (await db.execute(
select(SiProject).where(
SiProject.is_active.is_(True),
SiProject.phase.notin_([ProjectPhase.CLOSED]),
)
)).scalars().all()
for proj in projects:
delayed = (await db.execute(
select(WbsItem).where(
WbsItem.project_id == proj.id,
WbsItem.is_leaf.is_(True),
WbsItem.planned_end < today,
WbsItem.completion_pct < 100,
WbsItem.status != WbsStatus.CANCELLED,
)
)).scalars().all()
if not delayed:
continue
max_delay = max((today - w.planned_end).days for w in delayed)
if len(delayed) < 3 and max_delay < 10:
continue # 경미한 지연은 스킵
# 이미 AI 생성 위험이 있으면 스킵 (중복 방지)
existing_risk = (await db.execute(
select(ProjectRisk).where(
ProjectRisk.project_id == proj.id,
ProjectRisk.raised_by.startswith("agent:"),
ProjectRisk.status.notin_(["CLOSED", "ACCEPTED"]),
)
)).scalars().first()
if existing_risk:
continue
total_wbs = (await db.execute(
select(WbsItem).where(
WbsItem.project_id == proj.id,
WbsItem.is_leaf.is_(True),
)
)).scalars().all()
avg_pct = (
sum(w.completion_pct for w in total_wbs) / len(total_wbs)
if total_wbs else 0
)
task = self._new_task(
agent.id,
f"WBS 위험 분석: {proj.project_name}",
{"project_id": proj.id, "delayed_count": len(delayed), "max_delay_days": max_delay},
)
db.add(task)
await db.flush()
result = await self.llm.json_generate(
prompt=(
f"SI 프로젝트 WBS 현황을 분석하고 위험을 평가하세요.\n\n"
f"프로젝트: {proj.project_name}\n"
f"전체 WBS: {len(total_wbs)}건 | 지연: {len(delayed)}건 | 최대 지연: {max_delay}\n"
f"평균 진척률: {avg_pct:.1f}%\n\n"
f"응답 JSON:\n"
f'{{"risk_level":"CRITICAL|HIGH|MEDIUM|LOW",'
f'"probability":1~3,"impact":1~3,'
f'"title":"위험 제목 (30자 이내)",'
f'"description":"위험 상세 (100자 이내)",'
f'"mitigation":"완화 방안 (100자 이내)"}}'
),
model=agent.llm_model,
system=agent.system_prompt or "당신은 SI 프로젝트 관리 전문가입니다.",
)
prob = min(max(int(result.get("probability", 2)), 1), 3)
impact = min(max(int(result.get("impact", 2)), 1), 3)
score = prob * impact
from models import RiskLevel
level_map = {9: RiskLevel.CRITICAL, 6: RiskLevel.HIGH, 4: RiskLevel.HIGH,
3: RiskLevel.MEDIUM, 2: RiskLevel.MEDIUM}
risk_level = next(
(v for k, v in sorted(level_map.items(), reverse=True) if score >= k),
RiskLevel.LOW,
)
risk = ProjectRisk(
risk_id=f"RSK-{datetime.now().strftime('%Y%m%d')}-{str(uuid4())[:5].upper()}",
project_id=proj.id,
title=result.get("title", f"WBS 지연 위험 ({len(delayed)}건)"),
description=result.get("description", ""),
probability_level=prob,
impact_level=impact,
risk_score=score,
risk_level=risk_level,
mitigation_plan=result.get("mitigation", ""),
status=RiskStatus.IDENTIFIED,
raised_by=f"agent:{agent.id}",
)
db.add(risk)
await db.flush()
task.status = AgentTaskStatus.COMPLETED
task.output_data = {**result, "risk_id": risk.risk_id}
task.completed_at = datetime.now()
db.add(self._approval(
agent_id=agent.id, task_id=task.id,
action_type="CREATE_RISK",
action_data={"risk_id": risk.id, "project_id": proj.id, "score": score},
auto=(risk_level not in ["CRITICAL"]),
))
agent.total_tasks += 1
await db.commit()
logger.info("[WBS_MONITOR] 위험 분석 완료")
# ── WBS 지연 완료 예측 ────────────────────────────────────────────
await self._predict_wbs_completion(db, agent)
# ── WBS 완료 예측 헬퍼 ───────────────────────────────────────────────────
async def _predict_wbs_completion(self, db: AsyncSession, agent) -> None:
"""
최근 7일간 완료율 추이(기울기)로 프로젝트 완료 예상일을 계산한다.
계획 종료일 초과 시 리스크 자동 등록.
외부 API 없음 — 순수 계산 로직.
"""
from models import SiProject, AgentTask, ProjectPhase, ProjectRisk, RiskLevel, RiskStatus
from sqlalchemy import select
import math
today = date.today()
projects = (await db.execute(
select(SiProject).where(
SiProject.is_active.is_(True),
SiProject.phase.notin_([ProjectPhase.CLOSED]),
SiProject.planned_end_date.isnot(None),
)
)).scalars().all()
for proj in projects:
try:
curr_rate = float(proj.completion_rate or 0)
# 7일 전 스냅샷이 없으면 estimated_completion에서 역산
# (간소화: 7일 전 비율 = curr_rate - 7*daily_delta 로 추정)
prev_rate_estimate = max(curr_rate - 7.0, 0.0) # 이전 데이터 없으면 7% 성장 가정
daily_delta = (curr_rate - prev_rate_estimate) / 7 if curr_rate > prev_rate_estimate else 0.0
if daily_delta <= 0:
continue # 진척 없는 프로젝트는 스킵
remaining = 100.0 - curr_rate
est_days = math.ceil(remaining / daily_delta)
from datetime import timedelta
est_completion = today + timedelta(days=est_days)
# estimated_completion 업데이트
proj.estimated_completion = est_completion
# 계획 종료일 초과 여부 확인
planned_end = proj.planned_end_date
if isinstance(planned_end, str):
from datetime import datetime as dt
planned_end = dt.strptime(planned_end[:10], "%Y-%m-%d").date()
if est_completion > planned_end:
delay_days = (est_completion - planned_end).days
# 이미 DELAY_PREDICTION 리스크가 있으면 스킵
existing = (await db.execute(
select(ProjectRisk).where(
ProjectRisk.project_id == proj.id,
ProjectRisk.raised_by == "agent:wbs_prediction",
ProjectRisk.status.notin_(["CLOSED"]),
)
)).scalars().first()
if not existing:
risk = ProjectRisk(
risk_id=f"RSK-PRED-{datetime.now().strftime('%Y%m%d')}-{proj.id}",
project_id=proj.id,
title=f"완료 예측 지연 ({delay_days}일 초과)",
description=(
f"현재 진척률 {curr_rate:.1f}% (일 평균 +{daily_delta:.2f}%).\n"
f"예상 완료일: {est_completion.isoformat()} "
f"(계획 종료일 {planned_end} 대비 {delay_days}일 초과)"
),
probability_level=2,
impact_level=3,
risk_score=6,
risk_level=RiskLevel.HIGH,
mitigation_plan="일일 진척률 향상 계획 수립 및 자원 재배분 검토",
status=RiskStatus.IDENTIFIED,
raised_by="agent:wbs_prediction",
)
db.add(risk)
logger.info(
"[WBS_PREDICTION] 지연 리스크 등록: project=%s delay=%dd",
proj.project_name, delay_days
)
except Exception as exc:
logger.warning("[WBS_PREDICTION] project_id=%d 예측 오류: %s", proj.id, exc)
await db.commit()
logger.info("[WBS_PREDICTION] 완료 예측 업데이트 완료")
# ══════════════════════════════════════════════════════════════════════════
# ── 5. PM 제안 에이전트 ──────────────────────────────────────────────────
# ══════════════════════════════════════════════════════════════════════════
async def _pm_suggester(self, db: AsyncSession, agent) -> None:
"""PM 스케줄 미등록 활성 서버 탐지 → 알림 태스크 기록."""
from models import Server, PmSchedule, AgentTask, AgentTaskStatus
scheduled_server_ids = [
row[0]
for row in (await db.execute(
select(PmSchedule.server_id).where(PmSchedule.is_active.is_(True))
)).all()
if row[0] is not None
]
unmanaged = (await db.execute(
select(Server).where(
Server.id.notin_(scheduled_server_ids),
Server.is_active.is_(True),
).limit(10)
)).scalars().all()
if not unmanaged:
logger.debug("[PM_SUGGESTER] 미등록 서버 없음")
return
for srv in unmanaged:
task = self._new_task(
agent.id,
f"PM 미등록 서버 감지: {srv.server_name}",
{"server_id": srv.id, "server_name": srv.server_name},
)
task.status = AgentTaskStatus.COMPLETED
task.output_data = {"message": "PM 스케줄 등록 권고"}
task.completed_at = datetime.now()
db.add(task)
await db.flush()
db.add(self._approval(
agent_id=agent.id, task_id=task.id,
action_type="SUGGEST_PM",
action_data={"server_id": srv.id, "server_name": srv.server_name},
auto=True,
))
agent.total_tasks += 1
await db.commit()
logger.info("[PM_SUGGESTER] PM 미등록 서버 %d건 감지", len(unmanaged))
# ══════════════════════════════════════════════════════════════════════════
# ── 6. 개발자 에이전트 (Phase 1 Paperclip 스타일) ────────────────────────
# ══════════════════════════════════════════════════════════════════════════
async def _developer_agent(self, db: AsyncSession, agent) -> None:
"""
PENDING 태스크를 받아 LLM으로 코드 생성.
코드 포함 시 사람 승인(PENDING) 생성.
"""
from models import AgentTask, AgentTaskStatus
pending = (await db.execute(
select(AgentTask).where(
AgentTask.agent_id == agent.id,
AgentTask.status == AgentTaskStatus.PENDING,
).limit(3)
)).scalars().all()
if not pending:
return
for task in pending:
task.status = AgentTaskStatus.IN_PROGRESS
task.started_at = datetime.now()
await db.flush()
input_data = task.input_data or {}
prompt = input_data.get("prompt") or task.description or task.title
resp = await self.llm.generate(
prompt=prompt,
model=agent.llm_model,
system=(
agent.system_prompt
or (
"당신은 GUARDiA ITSM 백엔드 개발자입니다.\n"
"FastAPI + SQLAlchemy async 코드를 작성합니다.\n"
"보안 규칙: ServerOut에 ip_addr/ssh_user/os_pw_enc 포함 금지."
)
),
)
contains_code = any(kw in resp.content for kw in ["```python", "def ", "class ", "async def"])
task.status = AgentTaskStatus.COMPLETED
task.output_data = {"response": resp.content[:5000], "tokens": resp.tokens_total}
task.tokens_used = resp.tokens_total
task.completed_at = datetime.now()
db.add(self._approval(
agent_id=agent.id, task_id=task.id,
action_type="CODE_CHANGE",
action_data={"preview": resp.content[:500]},
auto=not contains_code,
))
agent.total_tasks += 1
agent.total_tokens += resp.tokens_total
await db.commit()
logger.info("[DEVELOPER] %d건 태스크 처리", len(pending))
# ── 싱글턴 ──────────────────────────────────────────────────────────────────
_engine: Optional[AgentEngine] = None
def get_agent_engine() -> AgentEngine:
"""에이전트 엔진 싱글턴 반환."""
global _engine
if _engine is None:
_engine = AgentEngine()
return _engine