zioinfo-mail/itsm/core/auto_processor.py
DESKTOP-TKLFCPR\ython 27e46ba535 feat(autonomous): 자율 운영 자동처리 + 승인 게이트 시스템 구현
## 자동처리 엔진 (core/auto_processor.py)
위험도 평가 함수 assess_risk():
  LOW/MEDIUM → 즉시 자동 처리
  HIGH       → 승인 요청 발송 후 대기
  CRITICAL   → 관리자 승인 필수

자동 처리 항목:
  - SR 자동 분류·배정 (키워드/우선순위 추론)
  - INQUIRY SR → KB 검색 후 자동 답변 (신뢰도 75% 이상)
  - SLA 임박(30분) → 자동 에스컬레이션
  - 이상 감지(HIGH+) → 인시던트 자동 생성
  - 완료 SR → KB 아티클 초안 자동 생성

## 자율 운영 API (routers/autonomous.py)
  GET  /api/auto/status          오늘 자동처리 통계
  POST /api/auto/run             사이클 즉시 실행 (ADMIN)
  GET  /api/auto/queue           승인 대기 작업 목록
  POST /api/auto/queue           작업 등록 → 위험도 평가 후 분기
  POST /api/auto/approve/{id}    승인 (HIGH=ENGINEER+, CRITICAL=ADMIN)
  POST /api/auto/reject/{id}     거부
  GET  /api/auto/history         처리 이력

## 스케줄러 (core/scheduler.py)
  5분마다 _auto_processing_cycle() 실행
  - 신규 SR 자동 분류·배정
  - INQUIRY SR KB 자동 답변
  - SLA 에스컬레이션
  - 완료 SR KB 초안 생성

## 봇 명령어 (routers/messenger.py)
  /autoq              승인 대기 큐 조회
  /approve <ID> [의견] 승인
  /reject  <ID> [사유] 거부

## DB 모델 (models.py)
  AutoAction: 자동처리 이력 + 승인 큐
  AutoActionStatus: AUTO_DONE|PENDING_APPROVAL|APPROVED|REJECTED|EXPIRED

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-31 13:03:41 +09:00

450 lines
16 KiB
Python

"""
자율 운영 자동처리 엔진.
위험도(RiskLevel) 기반 자동/승인 분기:
LOW → 즉시 자동 처리 + 감사 기록
MEDIUM → 자동 처리 + 운영팀 알림
HIGH → 승인 요청 메시지 발송 후 대기
CRITICAL → 차단 + 관리자 승인 필수
자동 처리 항목:
- SR 자동 분류·배정 (키워드/ML)
- INQUIRY SR → KB 검색 후 자동 응답
- 헬스체크 이상 → 인시던트 자동 생성
- SLA 위반 임박 → 자동 에스컬레이션
- 취약점 스캔 결과 → 보안 SR 자동 생성
- KB 아티클 자동 초안 생성 (SR/인시던트 완료 후)
- 배치 실패 → 알림 + 재시도
승인 필요 항목:
- 서버 재시작 / 서비스 중단
- 운영(PRD) 환경 배포
- DR Failover 실행
- 대량 SR 상태 일괄 변경
- 사용자 계정 비활성화
- 보안 정책 변경
"""
from __future__ import annotations
import logging
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional
from sqlalchemy import select, and_, func as sqlfunc
from sqlalchemy.ext.asyncio import AsyncSession
logger = logging.getLogger(__name__)
class RiskLevel(str, Enum):
LOW = "LOW" # 자동 처리
MEDIUM = "MEDIUM" # 자동 처리 + 알림
HIGH = "HIGH" # 승인 필요
CRITICAL = "CRITICAL" # 관리자 승인 필수
# ── 위험도 평가 ──────────────────────────────────────────────────────────────
def assess_risk(action_type: str, context: dict) -> RiskLevel:
"""
작업 유형과 컨텍스트 기반 위험도 평가.
환경(PRD/STG/DEV), 대상 서버 수, 작업 종류를 고려.
"""
env = str(context.get("environment", "")).upper()
target_count = int(context.get("target_count", 1))
sr_priority = str(context.get("priority", "MEDIUM")).upper()
AUTO_ACTIONS = {
"sr_classify", "sr_assign", "kb_answer", "kb_draft",
"health_notify", "sla_escalate", "vuln_notify",
"batch_retry_notify", "report_generate", "anomaly_notify",
}
MEDIUM_ACTIONS = {
"sr_auto_close", "log_collect", "ssl_notify", "perf_report",
}
HIGH_ACTIONS = {
"server_restart", "service_stop", "deploy_stg",
"bulk_sr_update", "account_disable", "script_exec",
}
CRITICAL_ACTIONS = {
"deploy_prd", "dr_failover", "db_schema_change",
"security_policy_change", "bulk_delete", "network_change",
}
if action_type in AUTO_ACTIONS:
return RiskLevel.LOW
if action_type in MEDIUM_ACTIONS:
return RiskLevel.MEDIUM
if action_type in HIGH_ACTIONS:
# 운영 환경이면 CRITICAL로 격상
if env == "PRD" or target_count > 5:
return RiskLevel.CRITICAL
return RiskLevel.HIGH
if action_type in CRITICAL_ACTIONS:
return RiskLevel.CRITICAL
# 미분류 → 안전하게 HIGH 처리
return RiskLevel.HIGH
# ── SR 자동 분류 ──────────────────────────────────────────────────────────────
async def auto_classify_sr(db: AsyncSession, sr_id: int) -> dict:
"""
SR 자동 분류 + 담당자 배정.
키워드 매핑 → sr_type, priority 갱신, 담당자 자동 배정.
"""
from models import SRRequest, Priority, SRType
from routers.assign import auto_assign_engine
q = await db.execute(select(SRRequest).where(SRRequest.id == sr_id))
sr = q.scalar_one_or_none()
if not sr:
return {"success": False, "error": "SR 없음"}
text = f"{sr.title} {sr.description or ''}".lower()
# 타입 추론
type_map = {
SRType.DEPLOY: ["배포", "deploy", "릴리즈", "release", "업데이트", "update"],
SRType.RESTART: ["재시작", "restart", "재구동", "중단", "stop", "기동", "start"],
SRType.LOG: ["로그", "log", "오류", "error", "에러", "확인"],
SRType.INQUIRY: ["문의", "질문", "어떻게", "방법", "how", "what", "?"],
}
inferred_type = SRType.OTHER
for sr_type, keywords in type_map.items():
if any(kw in text for kw in keywords):
inferred_type = sr_type
break
# 우선순위 추론
priority_map = {
Priority.CRITICAL: ["긴급", "장애", "critical", "emergency", "불가", "서비스 중단"],
Priority.HIGH: ["높음", "high", "빠른", "즉시", "soon"],
Priority.LOW: ["낮음", "low", "여유", "천천히"],
}
inferred_priority = Priority.MEDIUM
for prio, keywords in priority_map.items():
if any(kw in text for kw in keywords):
inferred_priority = prio
break
changed = []
if sr.sr_type != inferred_type:
sr.sr_type = inferred_type
changed.append(f"타입: {inferred_type}")
if sr.priority != inferred_priority:
sr.priority = inferred_priority
changed.append(f"우선순위: {inferred_priority}")
# 담당자 자동 배정
assigned = await auto_assign_engine(db, sr)
if assigned:
changed.append(f"담당자: {sr.assigned_to}")
await db.commit()
return {
"success": True,
"sr_id": sr.sr_id,
"changes": changed,
"auto_action": "sr_classify",
}
# ── INQUIRY SR → KB 자동 응답 ────────────────────────────────────────────────
async def auto_answer_inquiry(db: AsyncSession, sr_id: int) -> dict:
"""
문의형(INQUIRY) SR에 KB 검색 결과를 자동 댓글로 답변.
신뢰도 80% 이상이면 자동 답변 + SR COMPLETED 처리.
"""
from models import SRRequest, SRStatus, SRType
from core.kb_agent import search_kb_for_query
q = await db.execute(select(SRRequest).where(SRRequest.id == sr_id))
sr = q.scalar_one_or_none()
if not sr or sr.sr_type != SRType.INQUIRY:
return {"success": False, "skip": True, "reason": "INQUIRY 타입 아님"}
query = f"{sr.title} {sr.description or ''}"
try:
kb_result = await search_kb_for_query(query, limit=1)
except Exception as e:
return {"success": False, "error": str(e)[:100]}
if not kb_result or kb_result[0].get("score", 0) < 0.75:
return {"success": False, "skip": True, "reason": "KB 관련 문서 없음 (신뢰도 부족)"}
top = kb_result[0]
answer = (
f"[자동 답변 — GUARDiA AI]\n\n"
f"관련 KB 문서를 찾았습니다:\n\n"
f"**{top.get('title', '')}**\n"
f"{top.get('summary', '')[:500]}\n\n"
f"도움이 되셨으면 이 SR을 완료 처리합니다.\n"
f"추가 문의가 있으시면 새 SR을 등록해 주세요."
)
sr.status = SRStatus.COMPLETED
sr.description = (sr.description or "") + f"\n\n---\n{answer}"
await db.commit()
return {
"success": True,
"sr_id": sr.sr_id,
"kb_title": top.get("title"),
"auto_action": "kb_answer",
}
# ── SLA 임박 자동 에스컬레이션 ──────────────────────────────────────────────
async def auto_escalate_sla(db: AsyncSession) -> list[dict]:
"""
SLA 마감 30분 이내 미완료 SR → 자동 에스컬레이션.
이미 에스컬레이션된 SR은 건너뜀.
"""
from models import SRRequest, SRStatus
threshold = datetime.now() + timedelta(minutes=30)
q = await db.execute(
select(SRRequest).where(
and_(
SRRequest.sla_deadline <= threshold,
SRRequest.sla_deadline >= datetime.now(),
SRRequest.status.not_in([SRStatus.COMPLETED, SRStatus.REJECTED]),
SRRequest.escalated_at.is_(None),
SRRequest.sla_breached == False,
)
).limit(20)
)
srs = q.scalars().all()
escalated = []
for sr in srs:
sr.escalated_at = datetime.now()
sr.escalated_to = "ops-team"
escalated.append({
"sr_id": sr.sr_id,
"title": sr.title,
"deadline": sr.sla_deadline.isoformat() if sr.sla_deadline else None,
"auto_action": "sla_escalate",
})
if escalated:
await db.commit()
return escalated
# ── 이상 감지 → 인시던트 자동 생성 ─────────────────────────────────────────
async def auto_create_incident_from_anomaly(db: AsyncSession,
anomaly: dict) -> dict:
"""
AI 이상 탐지 결과를 기반으로 인시던트 자동 생성.
심각도 HIGH 이상만 자동 생성.
"""
from models import SRRequest, SRStatus, SRType, Priority
import uuid
severity = anomaly.get("severity", "LOW")
if severity not in ("HIGH", "CRITICAL"):
return {"success": False, "skip": True, "reason": f"심각도 {severity} — 자동생성 기준 미달"}
sr_id = f"INC-{datetime.now().strftime('%Y%m%d')}-{uuid.uuid4().hex[:4].upper()}"
inc = SRRequest(
sr_id=sr_id,
sr_type=SRType.OTHER,
title=f"[자동감지] {anomaly.get('description', '이상 감지')}",
description=(
f"AI 이상 탐지 자동 인시던트\n\n"
f"서버: {anomaly.get('server', 'N/A')}\n"
f"지표: {anomaly.get('metric', 'N/A')}\n"
f"값: {anomaly.get('value', 'N/A')}\n"
f"임계값: {anomaly.get('threshold', 'N/A')}\n"
f"감지시각: {datetime.now().isoformat()}"
),
priority=Priority.CRITICAL if severity == "CRITICAL" else Priority.HIGH,
status=SRStatus.RECEIVED,
requested_by="AUTO-SYSTEM",
)
db.add(inc)
await db.commit()
await db.refresh(inc)
return {
"success": True,
"sr_id": sr_id,
"incident_id": inc.id,
"auto_action": "anomaly_notify",
}
# ── 완료 SR/인시던트 → KB 아티클 초안 생성 ─────────────────────────────────
async def auto_draft_kb_article(db: AsyncSession, sr_id: int) -> dict:
"""
완료된 SR/인시던트에서 KB 아티클 초안 자동 생성 (Ollama).
초안 상태로 저장 — KB 담당자가 검토 후 게시.
"""
from models import SRRequest, SRStatus
from core.llm_client import call_llm
q = await db.execute(
select(SRRequest).where(
SRRequest.id == sr_id,
SRRequest.status == SRStatus.COMPLETED,
)
)
sr = q.scalar_one_or_none()
if not sr:
return {"success": False, "skip": True}
prompt = (
f"다음 SR/인시던트 해결 내용을 바탕으로 KB 아티클 초안을 작성해줘:\n\n"
f"제목: {sr.title}\n"
f"내용: {(sr.description or '')[:500]}\n\n"
f"형식: 문제 설명 / 원인 / 해결 방법 / 예방 조치 (각 섹션 2~3줄)"
)
try:
draft = await call_llm(prompt, max_tokens=400)
except Exception as e:
return {"success": False, "error": str(e)[:100]}
if not draft:
return {"success": False, "error": "LLM 응답 없음"}
# KnowledgeBase 모델에 초안 저장
try:
from models import KnowledgeBase
kb = KnowledgeBase(
title=f"[초안] {sr.title}",
content=draft,
category="자동생성",
tags="auto,draft",
is_draft=True,
created_by="AUTO-SYSTEM",
source_sr_id=sr.sr_id,
)
db.add(kb)
await db.commit()
return {
"success": True,
"sr_id": sr.sr_id,
"kb_title": kb.title,
"auto_action": "kb_draft",
}
except Exception as e:
# KB 모델이 없거나 필드 불일치 시 스킵
logger.warning("KB draft skip: %s", e)
return {"success": False, "skip": True, "reason": str(e)[:80]}
# ── 승인 요청 메시지 생성 ────────────────────────────────────────────────────
def build_approval_message(action: dict) -> str:
"""
승인이 필요한 작업에 대한 메신저 봇 승인 요청 메시지 생성.
"""
action_id = action.get("action_id", "N/A")
action_type = action.get("action_type", "N/A")
description = action.get("description", "")
risk = action.get("risk", "HIGH")
target = action.get("target", "N/A")
requested_by = action.get("requested_by", "SYSTEM")
icon = {"HIGH": "⚠️", "CRITICAL": "🚨"}.get(risk, "")
return (
f"{icon} [승인 요청] {action_type}\n"
f"━━━━━━━━━━━━━━━━━━━━\n"
f"요청 ID: {action_id}\n"
f"작업: {description}\n"
f"대상: {target}\n"
f"위험도: {risk}\n"
f"요청자: {requested_by}\n"
f"━━━━━━━━━━━━━━━━━━━━\n"
f"✅ 승인: /approve {action_id}\n"
f"❌ 거부: /reject {action_id} [사유]\n"
f"⏰ 미응답 시 30분 후 자동 에스컬레이션"
)
# ── 자율 처리 메인 루프 ──────────────────────────────────────────────────────
async def run_auto_processing_cycle(db: AsyncSession) -> dict:
"""
5분마다 스케줄러에서 호출되는 자동 처리 사이클.
Returns: 처리 결과 요약
"""
results = {
"auto_processed": [],
"approval_requested": [],
"skipped": [],
"errors": [],
"ran_at": datetime.now().isoformat(),
}
# 1. 신규 RECEIVED SR 자동 분류·배정
from models import SRRequest, SRStatus, SRType
q = await db.execute(
select(SRRequest).where(
SRRequest.status == SRStatus.RECEIVED,
SRRequest.assigned_to.is_(None),
).limit(20)
)
new_srs = q.scalars().all()
for sr in new_srs:
try:
r = await auto_classify_sr(db, sr.id)
if r["success"]:
results["auto_processed"].append(r)
# INQUIRY 타입이면 KB 자동 답변 시도
if sr.sr_type == SRType.INQUIRY:
r2 = await auto_answer_inquiry(db, sr.id)
if r2.get("success"):
results["auto_processed"].append(r2)
elif not r2.get("skip"):
results["errors"].append(r2)
except Exception as e:
results["errors"].append({"sr_id": getattr(sr, "sr_id", "?"), "error": str(e)[:80]})
# 2. SLA 임박 SR 자동 에스컬레이션
try:
escalated = await auto_escalate_sla(db)
results["auto_processed"].extend(escalated)
except Exception as e:
results["errors"].append({"action": "sla_escalate", "error": str(e)[:80]})
# 3. 완료된 SR 중 KB 초안 미생성 항목 처리 (최근 1시간 이내 완료)
try:
cutoff = datetime.now() - timedelta(hours=1)
q2 = await db.execute(
select(SRRequest).where(
SRRequest.status == SRStatus.COMPLETED,
SRRequest.updated_at >= cutoff,
).limit(5)
)
recent_done = q2.scalars().all()
for sr in recent_done:
r3 = await auto_draft_kb_article(db, sr.id)
if r3.get("success"):
results["auto_processed"].append(r3)
except Exception as e:
results["errors"].append({"action": "kb_draft", "error": str(e)[:80]})
# 결과 요약 로깅
logger.info(
"[AutoProcessor] 자동처리 %d건, 승인요청 %d건, 오류 %d",
len(results["auto_processed"]),
len(results["approval_requested"]),
len(results["errors"]),
)
return results