"""
Autonomous-operations API — auto-processing queue, approvals, and history.

Endpoints:
    GET  /api/auto/status        Auto-processing overview (today's statistics)
    POST /api/auto/run           Trigger an auto-processing cycle immediately
    GET  /api/auto/queue         List queued actions (e.g. those awaiting approval)
    POST /api/auto/queue         Register a new action (risk assessment decides
                                 between automatic handling and approval)
    POST /api/auto/approve/{id}  Approve an action
    POST /api/auto/reject/{id}   Reject an action
    GET  /api/auto/history       Auto-processing history
"""
from __future__ import annotations

import logging
import uuid
from datetime import datetime, timedelta
from typing import List, Optional

from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks
from pydantic import BaseModel
from sqlalchemy import select, desc, and_, func
from sqlalchemy.ext.asyncio import AsyncSession

from core.auth import get_current_user, require_admin_role
from core.auto_processor import (
    assess_risk,
    run_auto_processing_cycle,
    build_approval_message,
    RiskLevel,
)
from database import get_db
from models import AutoAction, AutoActionStatus, User, UserRole

logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/auto", tags=["autonomous"])

# How long an action that needs approval stays approvable after registration.
_APPROVAL_TTL = timedelta(minutes=30)
# Messenger endpoint that receives ops notifications, and its request timeout.
_MESSENGER_EVENT_URL = "http://localhost:8001/api/messenger/event"
_NOTIFY_TIMEOUT_SECONDS = 5


# ── Pydantic schemas ─────────────────────────────────────────────────────────

class ActionRequest(BaseModel):
    """Request body for registering a new action via POST /queue."""

    action_type: str
    description: str
    target: Optional[str] = None
    environment: str = "DEV"
    target_count: int = 1
    payload: Optional[dict] = None


class ApprovalRequest(BaseModel):
    """Request body for approve/reject calls; the comment is optional."""

    comment: Optional[str] = None


# ── Authorization ────────────────────────────────────────────────────────────

def _ops_user(current_user: User = Depends(get_current_user)) -> User:
    """Dependency that admits only ADMIN, PM, or ENGINEER users.

    Raises:
        HTTPException: 403 when the user's role is not one of those three.
    """
    if current_user.role not in (UserRole.ADMIN, UserRole.PM, UserRole.ENGINEER):
        raise HTTPException(403, "접근 권한 없음")
    return current_user


# ── Endpoints ────────────────────────────────────────────────────────────────

@router.get("/status")
async def get_auto_status(
    db: AsyncSession = Depends(get_db),
    _: User = Depends(_ops_user),
):
    """Return per-status counts for actions created today plus pending items.

    The response carries today's count for each status and up to five of the
    most recent actions that are still awaiting approval and not yet expired.
    """
    # NOTE(review): timestamps are naive local time, matching how rows are
    # stamped elsewhere in this module — confirm against the column types.
    now = datetime.now()
    # microsecond must be zeroed too; otherwise rows created in the first
    # fraction of a second after midnight fall before the cutoff.
    today = now.replace(hour=0, minute=0, second=0, microsecond=0)

    counts_q = await db.execute(
        select(
            AutoAction.status,
            func.count(AutoAction.id).label("cnt"),
        )
        .where(AutoAction.created_at >= today)
        .group_by(AutoAction.status)
    )
    stats = {row.status: row.cnt for row in counts_q.all()}

    pending_q = await db.execute(
        select(AutoAction)
        .where(
            AutoAction.status == AutoActionStatus.PENDING_APPROVAL,
            AutoAction.expires_at > now,
        )
        .order_by(AutoAction.created_at.desc())
        .limit(5)
    )
    pending = pending_q.scalars().all()

    return {
        "today": {
            "auto_done": stats.get(AutoActionStatus.AUTO_DONE, 0),
            "pending_approval": stats.get(AutoActionStatus.PENDING_APPROVAL, 0),
            "approved": stats.get(AutoActionStatus.APPROVED, 0),
            "rejected": stats.get(AutoActionStatus.REJECTED, 0),
            "expired": stats.get(AutoActionStatus.EXPIRED, 0),
        },
        "pending_actions": [
            {
                "action_id": a.action_id,
                "action_type": a.action_type,
                "description": a.description,
                "risk": a.risk_level,
                "expires_at": a.expires_at.isoformat() if a.expires_at else None,
                "requested_by": a.requested_by,
            }
            for a in pending
        ],
        "auto_processing": "enabled",
        "cycle_interval": "5분",
    }


@router.post("/run")
async def run_now(
    bg: BackgroundTasks,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(require_admin_role),
):
    """Schedule one auto-processing cycle as a background task.

    Access is gated by the ``require_admin_role`` dependency. The cycle runs
    after the response is sent, on its own session (see ``_run_cycle_bg``).
    """
    # `db` is not used here; it is kept so the endpoint signature is unchanged.
    bg.add_task(_run_cycle_bg)
    return {"message": "자동 처리 사이클 시작", "triggered_by": current_user.username}


@router.get("/queue")
async def list_queue(
    status: Optional[str] = None,
    limit: int = 20,
    db: AsyncSession = Depends(get_db),
    _: User = Depends(_ops_user),
):
    """List actions newest-first, optionally filtered by ``status``.

    Args:
        status: When given, only actions with this status are returned.
        limit: Maximum number of rows to return.
    """
    stmt = select(AutoAction)
    if status:
        stmt = stmt.where(AutoAction.status == status)
    stmt = stmt.order_by(desc(AutoAction.created_at)).limit(limit)

    result = await db.execute(stmt)
    return [_action_dict(a) for a in result.scalars().all()]


@router.post("/queue")
async def submit_action(
    body: ActionRequest,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(_ops_user),
):
    """Register a new action and route it by assessed risk.

    LOW/MEDIUM risk: the action is stored as AUTO_DONE right away.
    Any other risk level: the action is stored as PENDING_APPROVAL and an
    approval request is sent to the ops messenger channel.
    """
    risk = assess_risk(
        body.action_type,
        {
            "environment": body.environment,
            "target_count": body.target_count,
        },
    )

    now = datetime.now()
    action = AutoAction(
        action_id=f"ACT-{uuid.uuid4().hex[:8].upper()}",
        action_type=body.action_type,
        description=body.description,
        target=body.target,
        risk_level=risk,
        payload=body.payload or {},
        requested_by=current_user.username,
        expires_at=now + _APPROVAL_TTL,
    )

    if risk in (RiskLevel.LOW, RiskLevel.MEDIUM):
        action.status = AutoActionStatus.AUTO_DONE
        action.processed_at = now
        action.result = {"auto": True, "risk": risk}
        db.add(action)
        await db.commit()
        return {
            "action_id": action.action_id,
            "status": "AUTO_DONE",
            "risk": risk,
            "message": f"✅ 위험도 {risk} — 자동 처리 완료",
        }

    action.status = AutoActionStatus.PENDING_APPROVAL
    db.add(action)
    await db.commit()
    # Reload so attributes read below are populated after the commit.
    await db.refresh(action)

    # Send the approval request through the messenger bot.
    msg = build_approval_message({
        "action_id": action.action_id,
        "action_type": action.action_type,
        "description": action.description,
        "risk": risk,
        "target": action.target,
        "requested_by": current_user.username,
    })
    await _notify_ops(msg)

    return {
        "action_id": action.action_id,
        "status": "PENDING_APPROVAL",
        "risk": risk,
        "message": f"⏳ 위험도 {risk} — 승인 요청 발송 완료",
        "approve_cmd": f"/approve {action.action_id}",
        "reject_cmd": f"/reject {action.action_id}",
        "expires_at": action.expires_at.isoformat(),
    }


@router.post("/approve/{action_id}")
async def approve_action(
    action_id: str,
    body: ApprovalRequest,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(_ops_user),
):
    """Approve a pending action.

    Any user admitted by ``_ops_user`` may approve, except that CRITICAL
    actions can only be approved by an ADMIN.

    Raises:
        HTTPException: 404 if the action does not exist, 400 if it is not
            pending approval, 410 if its approval window has passed (the
            action is marked EXPIRED first), 403 if a non-ADMIN tries to
            approve a CRITICAL action.
    """
    action = await _get_pending_action(db, action_id, "승인")

    # One timestamp for the expiry check and both audit fields.
    now = datetime.now()
    if action.expires_at and action.expires_at < now:
        action.status = AutoActionStatus.EXPIRED
        await db.commit()
        raise HTTPException(410, "승인 시간 만료 — 작업을 다시 등록해 주세요")

    # CRITICAL actions may only be approved by an ADMIN.
    if action.risk_level == RiskLevel.CRITICAL and current_user.role != UserRole.ADMIN:
        raise HTTPException(403, "CRITICAL 작업은 ADMIN만 승인할 수 있습니다")

    action.status = AutoActionStatus.APPROVED
    action.approved_by = current_user.username
    action.approved_at = now
    action.comment = body.comment
    action.processed_at = now
    action.result = {"approved": True, "by": current_user.username}
    await db.commit()

    # Announce the approval on the ops channel.
    await _notify_ops(
        f"✅ [승인 완료] {action.action_type}\n"
        f" 작업 ID: {action_id}\n"
        f" 승인자: {current_user.username}\n"
        f" {body.comment or ''}"
    )

    return {
        "action_id": action_id,
        "status": "APPROVED",
        "approved_by": current_user.username,
        "message": "승인 완료 — 작업을 실행하세요",
    }


@router.post("/reject/{action_id}")
async def reject_action(
    action_id: str,
    body: ApprovalRequest,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(_ops_user),
):
    """Reject a pending action and notify the ops channel.

    The rejecting user and time are stored in ``approved_by`` /
    ``approved_at``; a missing comment is stored as "거부됨".

    Raises:
        HTTPException: 404 if the action does not exist, 400 if it is not
            pending approval.
    """
    action = await _get_pending_action(db, action_id, "거부")

    action.status = AutoActionStatus.REJECTED
    action.approved_by = current_user.username
    action.approved_at = datetime.now()
    action.comment = body.comment or "거부됨"
    await db.commit()

    await _notify_ops(
        f"❌ [거부] {action.action_type}\n"
        f" 작업 ID: {action_id}\n"
        f" 거부자: {current_user.username}\n"
        f" 사유: {body.comment or '사유 없음'}"
    )

    return {"action_id": action_id, "status": "REJECTED"}


@router.get("/history")
async def get_history(
    days: int = 7,
    limit: int = 50,
    db: AsyncSession = Depends(get_db),
    _: User = Depends(_ops_user),
):
    """Return actions created within the last ``days`` days, newest first.

    Args:
        days: Size of the look-back window, in days.
        limit: Maximum number of rows to return.
    """
    cutoff = datetime.now() - timedelta(days=days)
    result = await db.execute(
        select(AutoAction)
        .where(AutoAction.created_at >= cutoff)
        .order_by(desc(AutoAction.created_at))
        .limit(limit)
    )
    return [_action_dict(a) for a in result.scalars().all()]


# ── Internal helpers ─────────────────────────────────────────────────────────

async def _get_pending_action(db: AsyncSession, action_id: str, verb: str) -> AutoAction:
    """Load the action with ``action_id`` and require it to be pending approval.

    Args:
        db: Session used for the lookup.
        action_id: Public action identifier (the ``action_id`` column).
        verb: Operation name inserted into the 400 error message.

    Raises:
        HTTPException: 404 if no such action exists, 400 if its status is not
            PENDING_APPROVAL.
    """
    found = await db.execute(
        select(AutoAction).where(AutoAction.action_id == action_id)
    )
    action = found.scalar_one_or_none()
    if not action:
        raise HTTPException(404, f"작업 {action_id} 없음")
    if action.status != AutoActionStatus.PENDING_APPROVAL:
        raise HTTPException(400, f"현재 상태: {action.status} — {verb} 불가")
    return action


def _action_dict(a: AutoAction) -> dict:
    """Serialize an action row to a plain dict; datetimes become ISO strings."""
    return {
        "action_id": a.action_id,
        "action_type": a.action_type,
        "description": a.description,
        "target": a.target,
        "risk_level": a.risk_level,
        "status": a.status,
        "requested_by": a.requested_by,
        "approved_by": a.approved_by,
        "comment": a.comment,
        "created_at": a.created_at.isoformat() if a.created_at else None,
        "processed_at": a.processed_at.isoformat() if a.processed_at else None,
        "expires_at": a.expires_at.isoformat() if a.expires_at else None,
    }


async def _notify_ops(message: str) -> None:
    """Post ``message`` to the ops room via the messenger event endpoint.

    Best-effort: any failure (httpx missing, connection error, timeout, or an
    error status from the messenger) is logged as a warning and never raised,
    so a notification problem cannot fail the request that triggered it.
    """
    try:
        # Imported lazily so a missing httpx only disables notifications.
        import httpx

        async with httpx.AsyncClient(timeout=_NOTIFY_TIMEOUT_SECONDS) as client:
            response = await client.post(
                _MESSENGER_EVENT_URL,
                json={"event": "auto_action", "message": message, "room": "ops"},
            )
            # Without this an error response would pass as a successful send.
            response.raise_for_status()
    except Exception as e:
        logger.warning("승인 알림 발송 실패: %s", e)


async def _run_cycle_bg():
    """Run one auto-processing cycle on a freshly opened session."""
    from database import SessionLocal

    async with SessionLocal() as db:
        await run_auto_processing_cycle(db)