## 자동처리 엔진 (core/auto_processor.py)
위험도 평가 함수 assess_risk():
LOW/MEDIUM → 즉시 자동 처리
HIGH → 승인 요청 발송 후 대기
CRITICAL → 관리자 승인 필수
자동 처리 항목:
- SR 자동 분류·배정 (키워드/우선순위 추론)
- INQUIRY SR → KB 검색 후 자동 답변 (신뢰도 75% 이상)
- SLA 임박(30분) → 자동 에스컬레이션
- 이상 감지(HIGH+) → 인시던트 자동 생성
- 완료 SR → KB 아티클 초안 자동 생성
## 자율 운영 API (routers/autonomous.py)
GET /api/auto/status 오늘 자동처리 통계
POST /api/auto/run 사이클 즉시 실행 (ADMIN)
GET /api/auto/queue 승인 대기 작업 목록
POST /api/auto/queue 작업 등록 → 위험도 평가 후 분기
POST /api/auto/approve/{id} 승인 (HIGH=ENGINEER+, CRITICAL=ADMIN)
POST /api/auto/reject/{id} 거부
GET /api/auto/history 처리 이력
## 스케줄러 (core/scheduler.py)
5분마다 _auto_processing_cycle() 실행
- 신규 SR 자동 분류·배정
- INQUIRY SR KB 자동 답변
- SLA 에스컬레이션
- 완료 SR KB 초안 생성
## 봇 명령어 (routers/messenger.py)
/autoq 승인 대기 큐 조회
/approve <ID> [의견] 승인
/reject <ID> [사유] 거부
## DB 모델 (models.py)
AutoAction: 자동처리 이력 + 승인 큐
AutoActionStatus: AUTO_DONE|PENDING_APPROVAL|APPROVED|REJECTED|EXPIRED
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
450 lines
16 KiB
Python
450 lines
16 KiB
Python
"""
|
|
자율 운영 자동처리 엔진.
|
|
|
|
위험도(RiskLevel) 기반 자동/승인 분기:
|
|
LOW → 즉시 자동 처리 + 감사 기록
|
|
MEDIUM → 자동 처리 + 운영팀 알림
|
|
HIGH → 승인 요청 메시지 발송 후 대기
|
|
CRITICAL → 차단 + 관리자 승인 필수
|
|
|
|
자동 처리 항목:
|
|
- SR 자동 분류·배정 (키워드/ML)
|
|
- INQUIRY SR → KB 검색 후 자동 응답
|
|
- 헬스체크 이상 → 인시던트 자동 생성
|
|
- SLA 위반 임박 → 자동 에스컬레이션
|
|
- 취약점 스캔 결과 → 보안 SR 자동 생성
|
|
- KB 아티클 자동 초안 생성 (SR/인시던트 완료 후)
|
|
- 배치 실패 → 알림 + 재시도
|
|
|
|
승인 필요 항목:
|
|
- 서버 재시작 / 서비스 중단
|
|
- 운영(PRD) 환경 배포
|
|
- DR Failover 실행
|
|
- 대량 SR 상태 일괄 변경
|
|
- 사용자 계정 비활성화
|
|
- 보안 정책 변경
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
from enum import Enum
|
|
from typing import Optional
|
|
|
|
from sqlalchemy import select, and_, func as sqlfunc
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class RiskLevel(str, Enum):
|
|
LOW = "LOW" # 자동 처리
|
|
MEDIUM = "MEDIUM" # 자동 처리 + 알림
|
|
HIGH = "HIGH" # 승인 필요
|
|
CRITICAL = "CRITICAL" # 관리자 승인 필수
|
|
|
|
|
|
# ── 위험도 평가 ──────────────────────────────────────────────────────────────
|
|
|
|
def assess_risk(action_type: str, context: dict) -> RiskLevel:
|
|
"""
|
|
작업 유형과 컨텍스트 기반 위험도 평가.
|
|
환경(PRD/STG/DEV), 대상 서버 수, 작업 종류를 고려.
|
|
"""
|
|
env = str(context.get("environment", "")).upper()
|
|
target_count = int(context.get("target_count", 1))
|
|
sr_priority = str(context.get("priority", "MEDIUM")).upper()
|
|
|
|
AUTO_ACTIONS = {
|
|
"sr_classify", "sr_assign", "kb_answer", "kb_draft",
|
|
"health_notify", "sla_escalate", "vuln_notify",
|
|
"batch_retry_notify", "report_generate", "anomaly_notify",
|
|
}
|
|
MEDIUM_ACTIONS = {
|
|
"sr_auto_close", "log_collect", "ssl_notify", "perf_report",
|
|
}
|
|
HIGH_ACTIONS = {
|
|
"server_restart", "service_stop", "deploy_stg",
|
|
"bulk_sr_update", "account_disable", "script_exec",
|
|
}
|
|
CRITICAL_ACTIONS = {
|
|
"deploy_prd", "dr_failover", "db_schema_change",
|
|
"security_policy_change", "bulk_delete", "network_change",
|
|
}
|
|
|
|
if action_type in AUTO_ACTIONS:
|
|
return RiskLevel.LOW
|
|
|
|
if action_type in MEDIUM_ACTIONS:
|
|
return RiskLevel.MEDIUM
|
|
|
|
if action_type in HIGH_ACTIONS:
|
|
# 운영 환경이면 CRITICAL로 격상
|
|
if env == "PRD" or target_count > 5:
|
|
return RiskLevel.CRITICAL
|
|
return RiskLevel.HIGH
|
|
|
|
if action_type in CRITICAL_ACTIONS:
|
|
return RiskLevel.CRITICAL
|
|
|
|
# 미분류 → 안전하게 HIGH 처리
|
|
return RiskLevel.HIGH
|
|
|
|
|
|
# ── SR 자동 분류 ──────────────────────────────────────────────────────────────
|
|
|
|
async def auto_classify_sr(db: AsyncSession, sr_id: int) -> dict:
|
|
"""
|
|
SR 자동 분류 + 담당자 배정.
|
|
키워드 매핑 → sr_type, priority 갱신, 담당자 자동 배정.
|
|
"""
|
|
from models import SRRequest, Priority, SRType
|
|
from routers.assign import auto_assign_engine
|
|
|
|
q = await db.execute(select(SRRequest).where(SRRequest.id == sr_id))
|
|
sr = q.scalar_one_or_none()
|
|
if not sr:
|
|
return {"success": False, "error": "SR 없음"}
|
|
|
|
text = f"{sr.title} {sr.description or ''}".lower()
|
|
|
|
# 타입 추론
|
|
type_map = {
|
|
SRType.DEPLOY: ["배포", "deploy", "릴리즈", "release", "업데이트", "update"],
|
|
SRType.RESTART: ["재시작", "restart", "재구동", "중단", "stop", "기동", "start"],
|
|
SRType.LOG: ["로그", "log", "오류", "error", "에러", "확인"],
|
|
SRType.INQUIRY: ["문의", "질문", "어떻게", "방법", "how", "what", "?"],
|
|
}
|
|
inferred_type = SRType.OTHER
|
|
for sr_type, keywords in type_map.items():
|
|
if any(kw in text for kw in keywords):
|
|
inferred_type = sr_type
|
|
break
|
|
|
|
# 우선순위 추론
|
|
priority_map = {
|
|
Priority.CRITICAL: ["긴급", "장애", "critical", "emergency", "불가", "서비스 중단"],
|
|
Priority.HIGH: ["높음", "high", "빠른", "즉시", "soon"],
|
|
Priority.LOW: ["낮음", "low", "여유", "천천히"],
|
|
}
|
|
inferred_priority = Priority.MEDIUM
|
|
for prio, keywords in priority_map.items():
|
|
if any(kw in text for kw in keywords):
|
|
inferred_priority = prio
|
|
break
|
|
|
|
changed = []
|
|
if sr.sr_type != inferred_type:
|
|
sr.sr_type = inferred_type
|
|
changed.append(f"타입: {inferred_type}")
|
|
if sr.priority != inferred_priority:
|
|
sr.priority = inferred_priority
|
|
changed.append(f"우선순위: {inferred_priority}")
|
|
|
|
# 담당자 자동 배정
|
|
assigned = await auto_assign_engine(db, sr)
|
|
if assigned:
|
|
changed.append(f"담당자: {sr.assigned_to}")
|
|
|
|
await db.commit()
|
|
return {
|
|
"success": True,
|
|
"sr_id": sr.sr_id,
|
|
"changes": changed,
|
|
"auto_action": "sr_classify",
|
|
}
|
|
|
|
|
|
# ── INQUIRY SR → KB 자동 응답 ────────────────────────────────────────────────
|
|
|
|
async def auto_answer_inquiry(db: AsyncSession, sr_id: int) -> dict:
|
|
"""
|
|
문의형(INQUIRY) SR에 KB 검색 결과를 자동 댓글로 답변.
|
|
신뢰도 80% 이상이면 자동 답변 + SR COMPLETED 처리.
|
|
"""
|
|
from models import SRRequest, SRStatus, SRType
|
|
from core.kb_agent import search_kb_for_query
|
|
|
|
q = await db.execute(select(SRRequest).where(SRRequest.id == sr_id))
|
|
sr = q.scalar_one_or_none()
|
|
if not sr or sr.sr_type != SRType.INQUIRY:
|
|
return {"success": False, "skip": True, "reason": "INQUIRY 타입 아님"}
|
|
|
|
query = f"{sr.title} {sr.description or ''}"
|
|
try:
|
|
kb_result = await search_kb_for_query(query, limit=1)
|
|
except Exception as e:
|
|
return {"success": False, "error": str(e)[:100]}
|
|
|
|
if not kb_result or kb_result[0].get("score", 0) < 0.75:
|
|
return {"success": False, "skip": True, "reason": "KB 관련 문서 없음 (신뢰도 부족)"}
|
|
|
|
top = kb_result[0]
|
|
answer = (
|
|
f"[자동 답변 — GUARDiA AI]\n\n"
|
|
f"관련 KB 문서를 찾았습니다:\n\n"
|
|
f"**{top.get('title', '')}**\n"
|
|
f"{top.get('summary', '')[:500]}\n\n"
|
|
f"도움이 되셨으면 이 SR을 완료 처리합니다.\n"
|
|
f"추가 문의가 있으시면 새 SR을 등록해 주세요."
|
|
)
|
|
|
|
sr.status = SRStatus.COMPLETED
|
|
sr.description = (sr.description or "") + f"\n\n---\n{answer}"
|
|
await db.commit()
|
|
|
|
return {
|
|
"success": True,
|
|
"sr_id": sr.sr_id,
|
|
"kb_title": top.get("title"),
|
|
"auto_action": "kb_answer",
|
|
}
|
|
|
|
|
|
# ── SLA 임박 자동 에스컬레이션 ──────────────────────────────────────────────
|
|
|
|
async def auto_escalate_sla(db: AsyncSession) -> list[dict]:
|
|
"""
|
|
SLA 마감 30분 이내 미완료 SR → 자동 에스컬레이션.
|
|
이미 에스컬레이션된 SR은 건너뜀.
|
|
"""
|
|
from models import SRRequest, SRStatus
|
|
|
|
threshold = datetime.now() + timedelta(minutes=30)
|
|
q = await db.execute(
|
|
select(SRRequest).where(
|
|
and_(
|
|
SRRequest.sla_deadline <= threshold,
|
|
SRRequest.sla_deadline >= datetime.now(),
|
|
SRRequest.status.not_in([SRStatus.COMPLETED, SRStatus.REJECTED]),
|
|
SRRequest.escalated_at.is_(None),
|
|
SRRequest.sla_breached == False,
|
|
)
|
|
).limit(20)
|
|
)
|
|
srs = q.scalars().all()
|
|
|
|
escalated = []
|
|
for sr in srs:
|
|
sr.escalated_at = datetime.now()
|
|
sr.escalated_to = "ops-team"
|
|
escalated.append({
|
|
"sr_id": sr.sr_id,
|
|
"title": sr.title,
|
|
"deadline": sr.sla_deadline.isoformat() if sr.sla_deadline else None,
|
|
"auto_action": "sla_escalate",
|
|
})
|
|
|
|
if escalated:
|
|
await db.commit()
|
|
|
|
return escalated
|
|
|
|
|
|
# ── 이상 감지 → 인시던트 자동 생성 ─────────────────────────────────────────
|
|
|
|
async def auto_create_incident_from_anomaly(db: AsyncSession,
|
|
anomaly: dict) -> dict:
|
|
"""
|
|
AI 이상 탐지 결과를 기반으로 인시던트 자동 생성.
|
|
심각도 HIGH 이상만 자동 생성.
|
|
"""
|
|
from models import SRRequest, SRStatus, SRType, Priority
|
|
import uuid
|
|
|
|
severity = anomaly.get("severity", "LOW")
|
|
if severity not in ("HIGH", "CRITICAL"):
|
|
return {"success": False, "skip": True, "reason": f"심각도 {severity} — 자동생성 기준 미달"}
|
|
|
|
sr_id = f"INC-{datetime.now().strftime('%Y%m%d')}-{uuid.uuid4().hex[:4].upper()}"
|
|
inc = SRRequest(
|
|
sr_id=sr_id,
|
|
sr_type=SRType.OTHER,
|
|
title=f"[자동감지] {anomaly.get('description', '이상 감지')}",
|
|
description=(
|
|
f"AI 이상 탐지 자동 인시던트\n\n"
|
|
f"서버: {anomaly.get('server', 'N/A')}\n"
|
|
f"지표: {anomaly.get('metric', 'N/A')}\n"
|
|
f"값: {anomaly.get('value', 'N/A')}\n"
|
|
f"임계값: {anomaly.get('threshold', 'N/A')}\n"
|
|
f"감지시각: {datetime.now().isoformat()}"
|
|
),
|
|
priority=Priority.CRITICAL if severity == "CRITICAL" else Priority.HIGH,
|
|
status=SRStatus.RECEIVED,
|
|
requested_by="AUTO-SYSTEM",
|
|
)
|
|
db.add(inc)
|
|
await db.commit()
|
|
await db.refresh(inc)
|
|
|
|
return {
|
|
"success": True,
|
|
"sr_id": sr_id,
|
|
"incident_id": inc.id,
|
|
"auto_action": "anomaly_notify",
|
|
}
|
|
|
|
|
|
# ── 완료 SR/인시던트 → KB 아티클 초안 생성 ─────────────────────────────────
|
|
|
|
async def auto_draft_kb_article(db: AsyncSession, sr_id: int) -> dict:
|
|
"""
|
|
완료된 SR/인시던트에서 KB 아티클 초안 자동 생성 (Ollama).
|
|
초안 상태로 저장 — KB 담당자가 검토 후 게시.
|
|
"""
|
|
from models import SRRequest, SRStatus
|
|
from core.llm_client import call_llm
|
|
|
|
q = await db.execute(
|
|
select(SRRequest).where(
|
|
SRRequest.id == sr_id,
|
|
SRRequest.status == SRStatus.COMPLETED,
|
|
)
|
|
)
|
|
sr = q.scalar_one_or_none()
|
|
if not sr:
|
|
return {"success": False, "skip": True}
|
|
|
|
prompt = (
|
|
f"다음 SR/인시던트 해결 내용을 바탕으로 KB 아티클 초안을 작성해줘:\n\n"
|
|
f"제목: {sr.title}\n"
|
|
f"내용: {(sr.description or '')[:500]}\n\n"
|
|
f"형식: 문제 설명 / 원인 / 해결 방법 / 예방 조치 (각 섹션 2~3줄)"
|
|
)
|
|
try:
|
|
draft = await call_llm(prompt, max_tokens=400)
|
|
except Exception as e:
|
|
return {"success": False, "error": str(e)[:100]}
|
|
|
|
if not draft:
|
|
return {"success": False, "error": "LLM 응답 없음"}
|
|
|
|
# KnowledgeBase 모델에 초안 저장
|
|
try:
|
|
from models import KnowledgeBase
|
|
kb = KnowledgeBase(
|
|
title=f"[초안] {sr.title}",
|
|
content=draft,
|
|
category="자동생성",
|
|
tags="auto,draft",
|
|
is_draft=True,
|
|
created_by="AUTO-SYSTEM",
|
|
source_sr_id=sr.sr_id,
|
|
)
|
|
db.add(kb)
|
|
await db.commit()
|
|
return {
|
|
"success": True,
|
|
"sr_id": sr.sr_id,
|
|
"kb_title": kb.title,
|
|
"auto_action": "kb_draft",
|
|
}
|
|
except Exception as e:
|
|
# KB 모델이 없거나 필드 불일치 시 스킵
|
|
logger.warning("KB draft skip: %s", e)
|
|
return {"success": False, "skip": True, "reason": str(e)[:80]}
|
|
|
|
|
|
# ── 승인 요청 메시지 생성 ────────────────────────────────────────────────────
|
|
|
|
def build_approval_message(action: dict) -> str:
|
|
"""
|
|
승인이 필요한 작업에 대한 메신저 봇 승인 요청 메시지 생성.
|
|
"""
|
|
action_id = action.get("action_id", "N/A")
|
|
action_type = action.get("action_type", "N/A")
|
|
description = action.get("description", "")
|
|
risk = action.get("risk", "HIGH")
|
|
target = action.get("target", "N/A")
|
|
requested_by = action.get("requested_by", "SYSTEM")
|
|
|
|
icon = {"HIGH": "⚠️", "CRITICAL": "🚨"}.get(risk, "❓")
|
|
|
|
return (
|
|
f"{icon} [승인 요청] {action_type}\n"
|
|
f"━━━━━━━━━━━━━━━━━━━━\n"
|
|
f"요청 ID: {action_id}\n"
|
|
f"작업: {description}\n"
|
|
f"대상: {target}\n"
|
|
f"위험도: {risk}\n"
|
|
f"요청자: {requested_by}\n"
|
|
f"━━━━━━━━━━━━━━━━━━━━\n"
|
|
f"✅ 승인: /approve {action_id}\n"
|
|
f"❌ 거부: /reject {action_id} [사유]\n"
|
|
f"⏰ 미응답 시 30분 후 자동 에스컬레이션"
|
|
)
|
|
|
|
|
|
# ── 자율 처리 메인 루프 ──────────────────────────────────────────────────────
|
|
|
|
async def run_auto_processing_cycle(db: AsyncSession) -> dict:
|
|
"""
|
|
5분마다 스케줄러에서 호출되는 자동 처리 사이클.
|
|
Returns: 처리 결과 요약
|
|
"""
|
|
results = {
|
|
"auto_processed": [],
|
|
"approval_requested": [],
|
|
"skipped": [],
|
|
"errors": [],
|
|
"ran_at": datetime.now().isoformat(),
|
|
}
|
|
|
|
# 1. 신규 RECEIVED SR 자동 분류·배정
|
|
from models import SRRequest, SRStatus, SRType
|
|
q = await db.execute(
|
|
select(SRRequest).where(
|
|
SRRequest.status == SRStatus.RECEIVED,
|
|
SRRequest.assigned_to.is_(None),
|
|
).limit(20)
|
|
)
|
|
new_srs = q.scalars().all()
|
|
|
|
for sr in new_srs:
|
|
try:
|
|
r = await auto_classify_sr(db, sr.id)
|
|
if r["success"]:
|
|
results["auto_processed"].append(r)
|
|
# INQUIRY 타입이면 KB 자동 답변 시도
|
|
if sr.sr_type == SRType.INQUIRY:
|
|
r2 = await auto_answer_inquiry(db, sr.id)
|
|
if r2.get("success"):
|
|
results["auto_processed"].append(r2)
|
|
elif not r2.get("skip"):
|
|
results["errors"].append(r2)
|
|
except Exception as e:
|
|
results["errors"].append({"sr_id": getattr(sr, "sr_id", "?"), "error": str(e)[:80]})
|
|
|
|
# 2. SLA 임박 SR 자동 에스컬레이션
|
|
try:
|
|
escalated = await auto_escalate_sla(db)
|
|
results["auto_processed"].extend(escalated)
|
|
except Exception as e:
|
|
results["errors"].append({"action": "sla_escalate", "error": str(e)[:80]})
|
|
|
|
# 3. 완료된 SR 중 KB 초안 미생성 항목 처리 (최근 1시간 이내 완료)
|
|
try:
|
|
cutoff = datetime.now() - timedelta(hours=1)
|
|
q2 = await db.execute(
|
|
select(SRRequest).where(
|
|
SRRequest.status == SRStatus.COMPLETED,
|
|
SRRequest.updated_at >= cutoff,
|
|
).limit(5)
|
|
)
|
|
recent_done = q2.scalars().all()
|
|
for sr in recent_done:
|
|
r3 = await auto_draft_kb_article(db, sr.id)
|
|
if r3.get("success"):
|
|
results["auto_processed"].append(r3)
|
|
except Exception as e:
|
|
results["errors"].append({"action": "kb_draft", "error": str(e)[:80]})
|
|
|
|
# 결과 요약 로깅
|
|
logger.info(
|
|
"[AutoProcessor] 자동처리 %d건, 승인요청 %d건, 오류 %d건",
|
|
len(results["auto_processed"]),
|
|
len(results["approval_requested"]),
|
|
len(results["errors"]),
|
|
)
|
|
return results
|