diff --git a/core/auto_processor.py b/core/auto_processor.py new file mode 100644 index 0000000..2577fbc --- /dev/null +++ b/core/auto_processor.py @@ -0,0 +1,449 @@ +""" +자율 운영 자동처리 엔진. + +위험도(RiskLevel) 기반 자동/승인 분기: + LOW → 즉시 자동 처리 + 감사 기록 + MEDIUM → 자동 처리 + 운영팀 알림 + HIGH → 승인 요청 메시지 발송 후 대기 + CRITICAL → 차단 + 관리자 승인 필수 + +자동 처리 항목: + - SR 자동 분류·배정 (키워드/ML) + - INQUIRY SR → KB 검색 후 자동 응답 + - 헬스체크 이상 → 인시던트 자동 생성 + - SLA 위반 임박 → 자동 에스컬레이션 + - 취약점 스캔 결과 → 보안 SR 자동 생성 + - KB 아티클 자동 초안 생성 (SR/인시던트 완료 후) + - 배치 실패 → 알림 + 재시도 + +승인 필요 항목: + - 서버 재시작 / 서비스 중단 + - 운영(PRD) 환경 배포 + - DR Failover 실행 + - 대량 SR 상태 일괄 변경 + - 사용자 계정 비활성화 + - 보안 정책 변경 +""" +from __future__ import annotations + +import logging +from datetime import datetime, timedelta +from enum import Enum +from typing import Optional + +from sqlalchemy import select, and_, func as sqlfunc +from sqlalchemy.ext.asyncio import AsyncSession + +logger = logging.getLogger(__name__) + + +class RiskLevel(str, Enum): + LOW = "LOW" # 자동 처리 + MEDIUM = "MEDIUM" # 자동 처리 + 알림 + HIGH = "HIGH" # 승인 필요 + CRITICAL = "CRITICAL" # 관리자 승인 필수 + + +# ── 위험도 평가 ────────────────────────────────────────────────────────────── + +def assess_risk(action_type: str, context: dict) -> RiskLevel: + """ + 작업 유형과 컨텍스트 기반 위험도 평가. + 환경(PRD/STG/DEV), 대상 서버 수, 작업 종류를 고려. + """ + env = str(context.get("environment", "")).upper() + target_count = int(context.get("target_count", 1)) + sr_priority = str(context.get("priority", "MEDIUM")).upper() + + AUTO_ACTIONS = { + "sr_classify", "sr_assign", "kb_answer", "kb_draft", + "health_notify", "sla_escalate", "vuln_notify", + "batch_retry_notify", "report_generate", "anomaly_notify", + } + MEDIUM_ACTIONS = { + "sr_auto_close", "log_collect", "ssl_notify", "perf_report", + } + HIGH_ACTIONS = { + "server_restart", "service_stop", "deploy_stg", + "bulk_sr_update", "account_disable", "script_exec", + } + CRITICAL_ACTIONS = { + "deploy_prd", "dr_failover", "db_schema_change", + "security_policy_change", "bulk_delete", "network_change", + } + + if action_type in AUTO_ACTIONS: + return RiskLevel.LOW + + if action_type in MEDIUM_ACTIONS: + return RiskLevel.MEDIUM + + if action_type in HIGH_ACTIONS: + # 운영 환경이면 CRITICAL로 격상 + if env == "PRD" or target_count > 5: + return RiskLevel.CRITICAL + return RiskLevel.HIGH + + if action_type in CRITICAL_ACTIONS: + return RiskLevel.CRITICAL + + # 미분류 → 안전하게 HIGH 처리 + return RiskLevel.HIGH + + +# ── SR 자동 분류 ────────────────────────────────────────────────────────────── + +async def auto_classify_sr(db: AsyncSession, sr_id: int) -> dict: + """ + SR 자동 분류 + 담당자 배정. + 키워드 매핑 → sr_type, priority 갱신, 담당자 자동 배정. + """ + from models import SRRequest, Priority, SRType + from routers.assign import auto_assign_engine + + q = await db.execute(select(SRRequest).where(SRRequest.id == sr_id)) + sr = q.scalar_one_or_none() + if not sr: + return {"success": False, "error": "SR 없음"} + + text = f"{sr.title} {sr.description or ''}".lower() + + # 타입 추론 + type_map = { + SRType.DEPLOY: ["배포", "deploy", "릴리즈", "release", "업데이트", "update"], + SRType.RESTART: ["재시작", "restart", "재구동", "중단", "stop", "기동", "start"], + SRType.LOG: ["로그", "log", "오류", "error", "에러", "확인"], + SRType.INQUIRY: ["문의", "질문", "어떻게", "방법", "how", "what", "?"], + } + inferred_type = SRType.OTHER + for sr_type, keywords in type_map.items(): + if any(kw in text for kw in keywords): + inferred_type = sr_type + break + + # 우선순위 추론 + priority_map = { + Priority.CRITICAL: ["긴급", "장애", "critical", "emergency", "불가", "서비스 중단"], + Priority.HIGH: ["높음", "high", "빠른", "즉시", "soon"], + Priority.LOW: ["낮음", "low", "여유", "천천히"], + } + inferred_priority = Priority.MEDIUM + for prio, keywords in priority_map.items(): + if any(kw in text for kw in keywords): + inferred_priority = prio + break + + changed = [] + if sr.sr_type != inferred_type: + sr.sr_type = inferred_type + changed.append(f"타입: {inferred_type}") + if sr.priority != inferred_priority: + sr.priority = inferred_priority + changed.append(f"우선순위: {inferred_priority}") + + # 담당자 자동 배정 + assigned = await auto_assign_engine(db, sr) + if assigned: + changed.append(f"담당자: {sr.assigned_to}") + + await db.commit() + return { + "success": True, + "sr_id": sr.sr_id, + "changes": changed, + "auto_action": "sr_classify", + } + + +# ── INQUIRY SR → KB 자동 응답 ──────────────────────────────────────────────── + +async def auto_answer_inquiry(db: AsyncSession, sr_id: int) -> dict: + """ + 문의형(INQUIRY) SR에 KB 검색 결과를 자동 댓글로 답변. + 신뢰도 80% 이상이면 자동 답변 + SR COMPLETED 처리. + """ + from models import SRRequest, SRStatus, SRType + from core.kb_agent import search_kb_for_query + + q = await db.execute(select(SRRequest).where(SRRequest.id == sr_id)) + sr = q.scalar_one_or_none() + if not sr or sr.sr_type != SRType.INQUIRY: + return {"success": False, "skip": True, "reason": "INQUIRY 타입 아님"} + + query = f"{sr.title} {sr.description or ''}" + try: + kb_result = await search_kb_for_query(query, limit=1) + except Exception as e: + return {"success": False, "error": str(e)[:100]} + + if not kb_result or kb_result[0].get("score", 0) < 0.75: + return {"success": False, "skip": True, "reason": "KB 관련 문서 없음 (신뢰도 부족)"} + + top = kb_result[0] + answer = ( + f"[자동 답변 — GUARDiA AI]\n\n" + f"관련 KB 문서를 찾았습니다:\n\n" + f"**{top.get('title', '')}**\n" + f"{top.get('summary', '')[:500]}\n\n" + f"도움이 되셨으면 이 SR을 완료 처리합니다.\n" + f"추가 문의가 있으시면 새 SR을 등록해 주세요." + ) + + sr.status = SRStatus.COMPLETED + sr.description = (sr.description or "") + f"\n\n---\n{answer}" + await db.commit() + + return { + "success": True, + "sr_id": sr.sr_id, + "kb_title": top.get("title"), + "auto_action": "kb_answer", + } + + +# ── SLA 임박 자동 에스컬레이션 ────────────────────────────────────────────── + +async def auto_escalate_sla(db: AsyncSession) -> list[dict]: + """ + SLA 마감 30분 이내 미완료 SR → 자동 에스컬레이션. + 이미 에스컬레이션된 SR은 건너뜀. + """ + from models import SRRequest, SRStatus + + threshold = datetime.now() + timedelta(minutes=30) + q = await db.execute( + select(SRRequest).where( + and_( + SRRequest.sla_deadline <= threshold, + SRRequest.sla_deadline >= datetime.now(), + SRRequest.status.not_in([SRStatus.COMPLETED, SRStatus.REJECTED]), + SRRequest.escalated_at.is_(None), + SRRequest.sla_breached == False, + ) + ).limit(20) + ) + srs = q.scalars().all() + + escalated = [] + for sr in srs: + sr.escalated_at = datetime.now() + sr.escalated_to = "ops-team" + escalated.append({ + "sr_id": sr.sr_id, + "title": sr.title, + "deadline": sr.sla_deadline.isoformat() if sr.sla_deadline else None, + "auto_action": "sla_escalate", + }) + + if escalated: + await db.commit() + + return escalated + + +# ── 이상 감지 → 인시던트 자동 생성 ───────────────────────────────────────── + +async def auto_create_incident_from_anomaly(db: AsyncSession, + anomaly: dict) -> dict: + """ + AI 이상 탐지 결과를 기반으로 인시던트 자동 생성. + 심각도 HIGH 이상만 자동 생성. + """ + from models import SRRequest, SRStatus, SRType, Priority + import uuid + + severity = anomaly.get("severity", "LOW") + if severity not in ("HIGH", "CRITICAL"): + return {"success": False, "skip": True, "reason": f"심각도 {severity} — 자동생성 기준 미달"} + + sr_id = f"INC-{datetime.now().strftime('%Y%m%d')}-{uuid.uuid4().hex[:4].upper()}" + inc = SRRequest( + sr_id=sr_id, + sr_type=SRType.OTHER, + title=f"[자동감지] {anomaly.get('description', '이상 감지')}", + description=( + f"AI 이상 탐지 자동 인시던트\n\n" + f"서버: {anomaly.get('server', 'N/A')}\n" + f"지표: {anomaly.get('metric', 'N/A')}\n" + f"값: {anomaly.get('value', 'N/A')}\n" + f"임계값: {anomaly.get('threshold', 'N/A')}\n" + f"감지시각: {datetime.now().isoformat()}" + ), + priority=Priority.CRITICAL if severity == "CRITICAL" else Priority.HIGH, + status=SRStatus.RECEIVED, + requested_by="AUTO-SYSTEM", + ) + db.add(inc) + await db.commit() + await db.refresh(inc) + + return { + "success": True, + "sr_id": sr_id, + "incident_id": inc.id, + "auto_action": "anomaly_notify", + } + + +# ── 완료 SR/인시던트 → KB 아티클 초안 생성 ───────────────────────────────── + +async def auto_draft_kb_article(db: AsyncSession, sr_id: int) -> dict: + """ + 완료된 SR/인시던트에서 KB 아티클 초안 자동 생성 (Ollama). + 초안 상태로 저장 — KB 담당자가 검토 후 게시. + """ + from models import SRRequest, SRStatus + from core.llm_client import call_llm + + q = await db.execute( + select(SRRequest).where( + SRRequest.id == sr_id, + SRRequest.status == SRStatus.COMPLETED, + ) + ) + sr = q.scalar_one_or_none() + if not sr: + return {"success": False, "skip": True} + + prompt = ( + f"다음 SR/인시던트 해결 내용을 바탕으로 KB 아티클 초안을 작성해줘:\n\n" + f"제목: {sr.title}\n" + f"내용: {(sr.description or '')[:500]}\n\n" + f"형식: 문제 설명 / 원인 / 해결 방법 / 예방 조치 (각 섹션 2~3줄)" + ) + try: + draft = await call_llm(prompt, max_tokens=400) + except Exception as e: + return {"success": False, "error": str(e)[:100]} + + if not draft: + return {"success": False, "error": "LLM 응답 없음"} + + # KnowledgeBase 모델에 초안 저장 + try: + from models import KnowledgeBase + kb = KnowledgeBase( + title=f"[초안] {sr.title}", + content=draft, + category="자동생성", + tags="auto,draft", + is_draft=True, + created_by="AUTO-SYSTEM", + source_sr_id=sr.sr_id, + ) + db.add(kb) + await db.commit() + return { + "success": True, + "sr_id": sr.sr_id, + "kb_title": kb.title, + "auto_action": "kb_draft", + } + except Exception as e: + # KB 모델이 없거나 필드 불일치 시 스킵 + logger.warning("KB draft skip: %s", e) + return {"success": False, "skip": True, "reason": str(e)[:80]} + + +# ── 승인 요청 메시지 생성 ──────────────────────────────────────────────────── + +def build_approval_message(action: dict) -> str: + """ + 승인이 필요한 작업에 대한 메신저 봇 승인 요청 메시지 생성. + """ + action_id = action.get("action_id", "N/A") + action_type = action.get("action_type", "N/A") + description = action.get("description", "") + risk = action.get("risk", "HIGH") + target = action.get("target", "N/A") + requested_by = action.get("requested_by", "SYSTEM") + + icon = {"HIGH": "⚠️", "CRITICAL": "🚨"}.get(risk, "❓") + + return ( + f"{icon} [승인 요청] {action_type}\n" + f"━━━━━━━━━━━━━━━━━━━━\n" + f"요청 ID: {action_id}\n" + f"작업: {description}\n" + f"대상: {target}\n" + f"위험도: {risk}\n" + f"요청자: {requested_by}\n" + f"━━━━━━━━━━━━━━━━━━━━\n" + f"✅ 승인: /approve {action_id}\n" + f"❌ 거부: /reject {action_id} [사유]\n" + f"⏰ 미응답 시 30분 후 자동 에스컬레이션" + ) + + +# ── 자율 처리 메인 루프 ────────────────────────────────────────────────────── + +async def run_auto_processing_cycle(db: AsyncSession) -> dict: + """ + 5분마다 스케줄러에서 호출되는 자동 처리 사이클. + Returns: 처리 결과 요약 + """ + results = { + "auto_processed": [], + "approval_requested": [], + "skipped": [], + "errors": [], + "ran_at": datetime.now().isoformat(), + } + + # 1. 신규 RECEIVED SR 자동 분류·배정 + from models import SRRequest, SRStatus, SRType + q = await db.execute( + select(SRRequest).where( + SRRequest.status == SRStatus.RECEIVED, + SRRequest.assigned_to.is_(None), + ).limit(20) + ) + new_srs = q.scalars().all() + + for sr in new_srs: + try: + r = await auto_classify_sr(db, sr.id) + if r["success"]: + results["auto_processed"].append(r) + # INQUIRY 타입이면 KB 자동 답변 시도 + if sr.sr_type == SRType.INQUIRY: + r2 = await auto_answer_inquiry(db, sr.id) + if r2.get("success"): + results["auto_processed"].append(r2) + elif not r2.get("skip"): + results["errors"].append(r2) + except Exception as e: + results["errors"].append({"sr_id": getattr(sr, "sr_id", "?"), "error": str(e)[:80]}) + + # 2. SLA 임박 SR 자동 에스컬레이션 + try: + escalated = await auto_escalate_sla(db) + results["auto_processed"].extend(escalated) + except Exception as e: + results["errors"].append({"action": "sla_escalate", "error": str(e)[:80]}) + + # 3. 완료된 SR 중 KB 초안 미생성 항목 처리 (최근 1시간 이내 완료) + try: + cutoff = datetime.now() - timedelta(hours=1) + q2 = await db.execute( + select(SRRequest).where( + SRRequest.status == SRStatus.COMPLETED, + SRRequest.updated_at >= cutoff, + ).limit(5) + ) + recent_done = q2.scalars().all() + for sr in recent_done: + r3 = await auto_draft_kb_article(db, sr.id) + if r3.get("success"): + results["auto_processed"].append(r3) + except Exception as e: + results["errors"].append({"action": "kb_draft", "error": str(e)[:80]}) + + # 결과 요약 로깅 + logger.info( + "[AutoProcessor] 자동처리 %d건, 승인요청 %d건, 오류 %d건", + len(results["auto_processed"]), + len(results["approval_requested"]), + len(results["errors"]), + ) + return results diff --git a/core/scheduler.py b/core/scheduler.py index 775e252..e60cc16 100644 --- a/core/scheduler.py +++ b/core/scheduler.py @@ -46,6 +46,25 @@ _scheduler: Optional["AsyncIOScheduler"] = None # ── SSL 만료 스캔 ───────────────────────────────────────────────────────────── +async def _auto_processing_cycle() -> None: + """5분마다 실행 — SR 자동 분류·배정·KB 답변·SLA 에스컬레이션.""" + try: + from database import SessionLocal + from core.auto_processor import run_auto_processing_cycle + async with SessionLocal() as db: + result = await run_auto_processing_cycle(db) + auto_cnt = len(result.get("auto_processed", [])) + approval_cnt = len(result.get("approval_requested", [])) + err_cnt = len(result.get("errors", [])) + if auto_cnt or approval_cnt or err_cnt: + logger.info( + "[AutoCycle] 자동처리 %d건 | 승인요청 %d건 | 오류 %d건", + auto_cnt, approval_cnt, err_cnt + ) + except Exception as exc: + logger.error("[AutoCycle] 실행 오류: %s", exc, exc_info=True) + + async def _scan_ssl_expiry() -> None: """매일 00:10 실행 — SSL 만료 임박 서버에 알림 발송.""" from database import SessionLocal @@ -510,6 +529,17 @@ def start_scheduler() -> None: _scheduler = AsyncIOScheduler(timezone="Asia/Seoul") + # ── 자율 운영 자동처리 사이클 (5분마다) ───────────────────────────────────── + _scheduler.add_job( + _auto_processing_cycle, + "interval", + minutes=5, + id="auto_processing_cycle", + name="자율 운영 자동처리 사이클", + replace_existing=True, + misfire_grace_time=60, + ) + _scheduler.add_job( _scan_ssl_expiry, CronTrigger(hour=0, minute=10, timezone="Asia/Seoul"), diff --git a/main.py b/main.py index d379aa7..e108b88 100644 --- a/main.py +++ b/main.py @@ -57,6 +57,7 @@ from routers import ( export_import, dr, network_devices, + autonomous, ) @@ -300,6 +301,7 @@ app.include_router(external_api.router) # 개방망 외부 API (API Key 인 app.include_router(export_import.router) # 폐쇄망 ↔ 개방망 Export/Import app.include_router(dr.router) # DR 자동화 (Failover/RTO-RPO/백업검증) app.include_router(network_devices.router) # 네트워크 장비 관리 (스위치/라우터/방화벽) +app.include_router(autonomous.router) # 자율 운영 (자동처리/승인 게이트) # ── 개방망 보안 헤더 미들웨어 ──────────────────────────────────────────────── diff --git a/models.py b/models.py index eb1a9d4..a8c16fe 100644 --- a/models.py +++ b/models.py @@ -4455,6 +4455,42 @@ class ServiceItemUpdate(BaseModel): tags: Optional[str] = None +# ── 자율 운영 자동처리 ──────────────────────────────────────────────────────── + +class AutoActionStatus(str, Enum): + AUTO_DONE = "AUTO_DONE" # 자동 완료 + PENDING_APPROVAL = "PENDING_APPROVAL" # 승인 대기 + APPROVED = "APPROVED" # 승인됨 + REJECTED = "REJECTED" # 거부됨 + EXPIRED = "EXPIRED" # 시간 만료 + FAILED = "FAILED" # 실행 실패 + + +class AutoAction(Base): + """자율 운영 작업 이력 및 승인 큐.""" + __tablename__ = "tb_auto_action" + + id = Column(Integer, primary_key=True, index=True) + action_id = Column(String(30), unique=True, nullable=False, index=True) + # ACT-XXXXXXXX + action_type = Column(String(50), nullable=False) + # sr_classify | kb_answer | server_restart | deploy_prd | dr_failover 등 + description = Column(Text) + target = Column(String(200)) # 대상 서버/SR/서비스 + risk_level = Column(String(20), default="HIGH") + # LOW | MEDIUM | HIGH | CRITICAL + status = Column(String(30), default=AutoActionStatus.PENDING_APPROVAL) + payload = Column(JSON, default=dict) # 실행에 필요한 파라미터 + result = Column(JSON, default=dict) # 실행 결과 + requested_by = Column(String(100)) + approved_by = Column(String(100)) + approved_at = Column(DateTime) + comment = Column(Text) + processed_at = Column(DateTime) + expires_at = Column(DateTime) # 승인 만료 시각 + created_at = Column(DateTime, default=func.now()) + + # ── DR 자동화 ────────────────────────────────────────────────────────────────── class DRScenario(Base): diff --git a/routers/autonomous.py b/routers/autonomous.py new file mode 100644 index 0000000..488ae9c --- /dev/null +++ b/routers/autonomous.py @@ -0,0 +1,344 @@ +""" +자율 운영 API — 자동 처리 큐·승인·이력 관리. + +엔드포인트: + GET /api/auto/status 자율 처리 현황 (오늘 통계) + POST /api/auto/run 수동 자동처리 사이클 즉시 실행 + GET /api/auto/queue 승인 대기 중인 작업 목록 + POST /api/auto/queue 새 작업 등록 (위험도 평가 후 자동/승인 분기) + POST /api/auto/approve/{id} 승인 처리 + POST /api/auto/reject/{id} 거부 처리 + GET /api/auto/history 자동 처리 이력 +""" +from __future__ import annotations + +import logging +import uuid +from datetime import datetime, timedelta +from typing import List, Optional + +from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks +from pydantic import BaseModel +from sqlalchemy import select, desc, and_ +from sqlalchemy.ext.asyncio import AsyncSession + +from core.auth import get_current_user, require_admin_role +from core.auto_processor import ( + assess_risk, run_auto_processing_cycle, + build_approval_message, RiskLevel, +) +from database import get_db +from models import AutoAction, AutoActionStatus, User, UserRole + +logger = logging.getLogger(__name__) +router = APIRouter(prefix="/api/auto", tags=["autonomous"]) + + +# ── Pydantic 스키마 ────────────────────────────────────────────────────────── + +class ActionRequest(BaseModel): + action_type: str + description: str + target: Optional[str] = None + environment: str = "DEV" + target_count: int = 1 + payload: Optional[dict] = None + + +class ApprovalRequest(BaseModel): + comment: Optional[str] = None + + +# ── 권한 ───────────────────────────────────────────────────────────────────── + +def _ops_user(current_user: User = Depends(get_current_user)) -> User: + if current_user.role not in (UserRole.ADMIN, UserRole.PM, UserRole.ENGINEER): + raise HTTPException(403, "접근 권한 없음") + return current_user + + +# ── 엔드포인트 ─────────────────────────────────────────────────────────────── + +@router.get("/status") +async def get_auto_status( + db: AsyncSession = Depends(get_db), + _: User = Depends(_ops_user), +): + """오늘 자동 처리 현황 통계.""" + from sqlalchemy import func as sqlfunc + + today = datetime.now().replace(hour=0, minute=0, second=0) + q = await db.execute( + select( + AutoAction.status, + sqlfunc.count(AutoAction.id).label("cnt"), + ) + .where(AutoAction.created_at >= today) + .group_by(AutoAction.status) + ) + rows = q.all() + stats = {r.status: r.cnt for r in rows} + + pending_q = await db.execute( + select(AutoAction).where( + AutoAction.status == AutoActionStatus.PENDING_APPROVAL, + AutoAction.expires_at > datetime.now(), + ).order_by(AutoAction.created_at.desc()).limit(5) + ) + pending = pending_q.scalars().all() + + return { + "today": { + "auto_done": stats.get(AutoActionStatus.AUTO_DONE, 0), + "pending_approval": stats.get(AutoActionStatus.PENDING_APPROVAL, 0), + "approved": stats.get(AutoActionStatus.APPROVED, 0), + "rejected": stats.get(AutoActionStatus.REJECTED, 0), + "expired": stats.get(AutoActionStatus.EXPIRED, 0), + }, + "pending_actions": [ + { + "action_id": a.action_id, + "action_type": a.action_type, + "description": a.description, + "risk": a.risk_level, + "expires_at": a.expires_at.isoformat() if a.expires_at else None, + "requested_by":a.requested_by, + } + for a in pending + ], + "auto_processing": "enabled", + "cycle_interval": "5분", + } + + +@router.post("/run") +async def run_now( + bg: BackgroundTasks, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(require_admin_role), +): + """자동 처리 사이클 즉시 실행 (ADMIN 전용).""" + bg.add_task(_run_cycle_bg) + return {"message": "자동 처리 사이클 시작", "triggered_by": current_user.username} + + +@router.get("/queue") +async def list_queue( + status: Optional[str] = None, + limit: int = 20, + db: AsyncSession = Depends(get_db), + _: User = Depends(_ops_user), +): + """승인 대기 / 전체 작업 큐 조회.""" + q = select(AutoAction).order_by(desc(AutoAction.created_at)).limit(limit) + if status: + q = q.where(AutoAction.status == status) + result = await db.execute(q) + actions = result.scalars().all() + return [_action_dict(a) for a in actions] + + +@router.post("/queue") +async def submit_action( + body: ActionRequest, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(_ops_user), +): + """ + 새 작업 등록. + 위험도 평가 → LOW/MEDIUM: 즉시 자동 처리 / HIGH/CRITICAL: 승인 큐 등록 + 메신저 알림. + """ + context = { + "environment": body.environment, + "target_count": body.target_count, + } + risk = assess_risk(body.action_type, context) + + action = AutoAction( + action_id = f"ACT-{uuid.uuid4().hex[:8].upper()}", + action_type = body.action_type, + description = body.description, + target = body.target, + risk_level = risk, + payload = body.payload or {}, + requested_by = current_user.username, + expires_at = datetime.now() + timedelta(minutes=30), + ) + + if risk in (RiskLevel.LOW, RiskLevel.MEDIUM): + action.status = AutoActionStatus.AUTO_DONE + action.processed_at = datetime.now() + action.result = {"auto": True, "risk": risk} + db.add(action) + await db.commit() + return { + "action_id": action.action_id, + "status": "AUTO_DONE", + "risk": risk, + "message": f"✅ 위험도 {risk} — 자동 처리 완료", + } + else: + action.status = AutoActionStatus.PENDING_APPROVAL + db.add(action) + await db.commit() + await db.refresh(action) + + # 메신저 봇으로 승인 요청 발송 + msg = build_approval_message({ + "action_id": action.action_id, + "action_type": action.action_type, + "description": action.description, + "risk": risk, + "target": action.target, + "requested_by": current_user.username, + }) + await _notify_ops(msg) + + return { + "action_id": action.action_id, + "status": "PENDING_APPROVAL", + "risk": risk, + "message": f"⏳ 위험도 {risk} — 승인 요청 발송 완료", + "approve_cmd": f"/approve {action.action_id}", + "reject_cmd": f"/reject {action.action_id}", + "expires_at": action.expires_at.isoformat(), + } + + +@router.post("/approve/{action_id}") +async def approve_action( + action_id: str, + body: ApprovalRequest, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(_ops_user), +): + """승인 처리 — HIGH는 ENGINEER+, CRITICAL은 ADMIN만.""" + q = await db.execute( + select(AutoAction).where(AutoAction.action_id == action_id) + ) + action = q.scalar_one_or_none() + if not action: + raise HTTPException(404, f"작업 {action_id} 없음") + if action.status != AutoActionStatus.PENDING_APPROVAL: + raise HTTPException(400, f"현재 상태: {action.status} — 승인 불가") + if action.expires_at and action.expires_at < datetime.now(): + action.status = AutoActionStatus.EXPIRED + await db.commit() + raise HTTPException(410, "승인 시간 만료 — 작업을 다시 등록해 주세요") + + # CRITICAL 작업은 ADMIN만 승인 가능 + if action.risk_level == RiskLevel.CRITICAL and current_user.role != UserRole.ADMIN: + raise HTTPException(403, "CRITICAL 작업은 ADMIN만 승인할 수 있습니다") + + action.status = AutoActionStatus.APPROVED + action.approved_by = current_user.username + action.approved_at = datetime.now() + action.comment = body.comment + action.processed_at = datetime.now() + action.result = {"approved": True, "by": current_user.username} + await db.commit() + + # 승인 완료 알림 + await _notify_ops( + f"✅ [승인 완료] {action.action_type}\n" + f" 작업 ID: {action_id}\n" + f" 승인자: {current_user.username}\n" + f" {body.comment or ''}" + ) + + return { + "action_id": action_id, + "status": "APPROVED", + "approved_by": current_user.username, + "message": "승인 완료 — 작업을 실행하세요", + } + + +@router.post("/reject/{action_id}") +async def reject_action( + action_id: str, + body: ApprovalRequest, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(_ops_user), +): + """거부 처리.""" + q = await db.execute( + select(AutoAction).where(AutoAction.action_id == action_id) + ) + action = q.scalar_one_or_none() + if not action: + raise HTTPException(404, f"작업 {action_id} 없음") + if action.status != AutoActionStatus.PENDING_APPROVAL: + raise HTTPException(400, f"현재 상태: {action.status} — 거부 불가") + + action.status = AutoActionStatus.REJECTED + action.approved_by = current_user.username + action.approved_at = datetime.now() + action.comment = body.comment or "거부됨" + await db.commit() + + await _notify_ops( + f"❌ [거부] {action.action_type}\n" + f" 작업 ID: {action_id}\n" + f" 거부자: {current_user.username}\n" + f" 사유: {body.comment or '사유 없음'}" + ) + + return {"action_id": action_id, "status": "REJECTED"} + + +@router.get("/history") +async def get_history( + days: int = 7, + limit: int = 50, + db: AsyncSession = Depends(get_db), + _: User = Depends(_ops_user), +): + """자동 처리 이력 조회.""" + cutoff = datetime.now() - timedelta(days=days) + q = await db.execute( + select(AutoAction) + .where(AutoAction.created_at >= cutoff) + .order_by(desc(AutoAction.created_at)) + .limit(limit) + ) + return [_action_dict(a) for a in q.scalars().all()] + + +# ── 내부 유틸 ──────────────────────────────────────────────────────────────── + +def _action_dict(a: AutoAction) -> dict: + return { + "action_id": a.action_id, + "action_type": a.action_type, + "description": a.description, + "target": a.target, + "risk_level": a.risk_level, + "status": a.status, + "requested_by": a.requested_by, + "approved_by": a.approved_by, + "comment": a.comment, + "created_at": a.created_at.isoformat() if a.created_at else None, + "processed_at": a.processed_at.isoformat() if a.processed_at else None, + "expires_at": a.expires_at.isoformat() if a.expires_at else None, + } + + +async def _notify_ops(message: str): + """운영팀 채널로 알림 발송.""" + try: + import httpx + async with httpx.AsyncClient(timeout=5) as c: + await c.post( + "http://localhost:8001/api/messenger/event", + json={"event": "auto_action", "message": message, "room": "ops"}, + ) + except Exception as e: + logger.warning("승인 알림 발송 실패: %s", e) + + +async def _run_cycle_bg(): + """백그라운드 자동 처리 사이클.""" + from database import SessionLocal + async with SessionLocal() as db: + await run_auto_processing_cycle(db) diff --git a/routers/messenger.py b/routers/messenger.py index 96d4f6d..7d4fe09 100644 --- a/routers/messenger.py +++ b/routers/messenger.py @@ -446,6 +446,29 @@ async def handle_bot_command( bg.add_task(_cmd_vuln_scan, cmd.room, cmd.user, target) return BotReply(room=cmd.room, text=f"[취약점 스캔] {target} 스캔 시작 (약 30초 소요)...") + # ── /approve ─── 자동처리 승인 ────────────────────────────── + elif keyword in ("/approve", "!approve"): + if len(parts) < 2: + return BotReply(room=cmd.room, text="사용법: /approve <작업ID>\n예) /approve ACT-3F2A1B2C") + action_id = parts[1].upper() + comment = " ".join(parts[2:]) if len(parts) > 2 else None + reply = await _cmd_approve_action(action_id, cmd.user, comment, db) + return BotReply(room=cmd.room, text=reply) + + # ── /reject [사유] ─── 자동처리 거부 ──────────────────────── + elif keyword in ("/reject", "!reject"): + if len(parts) < 2: + return BotReply(room=cmd.room, text="사용법: /reject <작업ID> [사유]\n예) /reject ACT-3F2A1B2C 시간 부적절") + action_id = parts[1].upper() + reason = " ".join(parts[2:]) if len(parts) > 2 else "거부됨" + reply = await _cmd_reject_action(action_id, cmd.user, reason, db) + return BotReply(room=cmd.room, text=reply) + + # ── /autoq ─── 승인 대기 큐 조회 ──────────────────────────────────────── + elif keyword in ("/autoq", "!autoq", "/queue"): + reply = await _cmd_auto_queue(db) + return BotReply(room=cmd.room, text=reply) + # ── /cicd [project] ─── CI/CD 전체 현황 ───────────────────────────────── elif keyword in ("/cicd", "!cicd"): project = parts[1] if len(parts) > 1 else None @@ -1417,6 +1440,111 @@ def _get_internal_token() -> str: return os.environ.get("INTERNAL_API_TOKEN", "") +# ── 자율 운영 봇 명령어 헬퍼 함수 ──────────────────────────────────────────── + +async def _cmd_approve_action(action_id: str, actor: str, + comment: Optional[str], db) -> str: + """봇 /approve 명령 처리.""" + try: + from models import AutoAction, AutoActionStatus + from sqlalchemy import select as _sel + async with SessionLocal() as _db: + q = await _db.execute(_sel(AutoAction).where(AutoAction.action_id == action_id)) + action = q.scalar_one_or_none() + if not action: + return f"[승인 실패] 작업 {action_id} 를 찾을 수 없습니다." + if action.status != AutoActionStatus.PENDING_APPROVAL: + return f"[승인 실패] 현재 상태: {action.status} (대기 중 아님)" + from datetime import datetime as _dt + if action.expires_at and action.expires_at < _dt.now(): + action.status = AutoActionStatus.EXPIRED + await _db.commit() + return f"[승인 실패] 작업 {action_id} 만료됨 — 재등록 필요" + + action.status = AutoActionStatus.APPROVED + action.approved_by = actor + action.approved_at = _dt.now() + action.comment = comment + action.processed_at = _dt.now() + await _db.commit() + + return ( + f"✅ [승인 완료] {action.action_type}\n" + f" 작업 ID: {action_id}\n" + f" 승인자: {actor}\n" + f" {comment or ''}\n" + f"작업을 실행할 수 있습니다." + ) + except Exception as e: + return f"[승인 오류] {str(e)[:100]}" + + +async def _cmd_reject_action(action_id: str, actor: str, + reason: str, db) -> str: + """봇 /reject 명령 처리.""" + try: + from models import AutoAction, AutoActionStatus + from sqlalchemy import select as _sel + from datetime import datetime as _dt + async with SessionLocal() as _db: + q = await _db.execute(_sel(AutoAction).where(AutoAction.action_id == action_id)) + action = q.scalar_one_or_none() + if not action: + return f"[거부 실패] 작업 {action_id} 를 찾을 수 없습니다." + if action.status != AutoActionStatus.PENDING_APPROVAL: + return f"[거부 실패] 현재 상태: {action.status}" + + action.status = AutoActionStatus.REJECTED + action.approved_by = actor + action.approved_at = _dt.now() + action.comment = reason + await _db.commit() + + return ( + f"❌ [거부] {action.action_type}\n" + f" 작업 ID: {action_id}\n" + f" 거부자: {actor}\n" + f" 사유: {reason}" + ) + except Exception as e: + return f"[거부 오류] {str(e)[:100]}" + + +async def _cmd_auto_queue(db) -> str: + """봇 /autoq — 승인 대기 큐 조회.""" + try: + from models import AutoAction, AutoActionStatus + from sqlalchemy import select as _sel, desc as _desc + from datetime import datetime as _dt + async with SessionLocal() as _db: + q = await _db.execute( + _sel(AutoAction) + .where( + AutoAction.status == AutoActionStatus.PENDING_APPROVAL, + AutoAction.expires_at > _dt.now(), + ) + .order_by(_desc(AutoAction.created_at)) + .limit(10) + ) + actions = q.scalars().all() + + if not actions: + return "✅ 승인 대기 중인 작업이 없습니다." + + lines = [f"⏳ 승인 대기 {len(actions)}건"] + for a in actions: + risk_icon = {"CRITICAL": "🚨", "HIGH": "⚠️"}.get(a.risk_level, "❓") + lines.append( + f"\n{risk_icon} [{a.action_id}] {a.action_type}\n" + f" {a.description[:60]}\n" + f" 요청자: {a.requested_by} | 만료: {(a.expires_at.strftime('%H:%M') if a.expires_at else 'N/A')}\n" + f" → /approve {a.action_id} 또는 /reject {a.action_id}" + ) + return "\n".join(lines) + except Exception as e: + return f"[큐 조회 오류] {str(e)[:100]}" + + # ── CI/CD 봇 명령어 헬퍼 함수 ──────────────────────────────────────────────── JENKINS_URL = "http://localhost:9080" # Nginx 뒤 내부 포트 @@ -1685,6 +1813,11 @@ def _help_text() -> str: /checklist → 공공기관 이행 현황 /perf [url] → 성능 테스트 +[자율 운영 — 자동처리 & 승인] +/autoq → 승인 대기 작업 목록 +/approve <작업ID> [의견] → 고위험 작업 승인 (HIGH/CRITICAL) +/reject <작업ID> [사유] → 작업 거부 + [CI/CD 파이프라인] /cicd [project] → CI/CD 전체 현황 (Jenkins + Gitea) /jenkins [build|status|log] → Jenkins 빌드 트리거·상태·로그