"""Manager Gen6 — 크로스 시스템 연동: ITSM EventBus 구독·데이터 동기화·상태 집계""" import httpx import uuid from datetime import datetime from typing import Any, Dict, List, Optional from fastapi import APIRouter, HTTPException, Query from fastapi.responses import StreamingResponse from pydantic import BaseModel import asyncio import json router = APIRouter(prefix="/api/cross", tags=["Cross System"]) ITSM = "http://localhost:8001" _subscriptions: Dict[str, Dict] = {} _sync_state: Dict[str, Any] = {"last_sync": None, "sr_count": 0, "server_count": 0} _event_buffer: List[Dict] = [] class SubscribeRequest(BaseModel): channels: List[str] # sr|alert|deploy|server|approval|metric|chat|incident|audit|system subscriber: str; callback_url: str = "" # ── ITSM EventBus 구독 ─────────────────────────────────────────────────────── @router.post("/subscribe") async def subscribe_channels(req: SubscribeRequest): sid = f"SUB-{uuid.uuid4().hex[:8].upper()}" _subscriptions[sid] = {**req.model_dump(), "id": sid, "active": True, "created_at": datetime.utcnow().isoformat(), "events_received": 0} return _subscriptions[sid] @router.get("/subscriptions") async def list_subscriptions(): return {"subscriptions": list(_subscriptions.values()), "total": len(_subscriptions)} @router.delete("/subscriptions/{sid}") async def unsubscribe(sid: str): _subscriptions.pop(sid, None) return {"unsubscribed": sid} # ── ITSM 데이터 스냅샷 ───────────────────────────────────────────────────── @router.get("/snapshot/sr") async def snapshot_sr(): try: async with httpx.AsyncClient(timeout=10.0) as c: r = await c.get(f"{ITSM}/api/tasks", headers={"X-Internal": "manager"}) if r.status_code == 200: data = r.json() _sync_state["sr_count"] = data.get("total", 0) _sync_state["last_sync"] = datetime.utcnow().isoformat() return {"source": "itsm", "sr": data, "synced_at": _sync_state["last_sync"]} except Exception: pass return {"source": "itsm", "sr": {"items": [], "total": 0}, "error": "ITSM 연결 불가"} @router.get("/snapshot/servers") async def snapshot_servers(): try: async with httpx.AsyncClient(timeout=10.0) as c: r = await c.get(f"{ITSM}/api/cmdb/servers", headers={"X-Internal": "manager"}) if r.status_code == 200: data = r.json() _sync_state["server_count"] = len(data.get("servers", [])) return {"source": "itsm", "servers": data, "synced_at": datetime.utcnow().isoformat()} except Exception: pass return {"source": "itsm", "servers": {"servers": [], "total": 0}, "error": "ITSM 연결 불가"} @router.get("/snapshot/alerts") async def snapshot_alerts(): try: async with httpx.AsyncClient(timeout=10.0) as c: r = await c.get(f"{ITSM}/api/alert-rules/active", headers={"X-Internal": "manager"}) if r.status_code == 200: return {"source": "itsm", "alerts": r.json(), "synced_at": datetime.utcnow().isoformat()} except Exception: pass return {"source": "itsm", "alerts": [], "error": "ITSM 연결 불가"} # ── 통합 현황 집계 ───────────────────────────────────────────────────────── @router.get("/aggregate") async def aggregate_status(): return {"itsm": {"url": ITSM, "sr_count": _sync_state["sr_count"], "server_count": _sync_state["server_count"], "last_sync": _sync_state["last_sync"]}, "manager": {"subscriptions": len(_subscriptions), "events_buffered": len(_event_buffer)}, "messenger": {"connected": False, "push_enabled": False}, "ts": datetime.utcnow().isoformat()} # ── Manager → ITSM 이벤트 발행 ──────────────────────────────────────────── @router.post("/publish") async def publish_to_itsm(channel: str, payload: Dict[str, Any]): try: async with httpx.AsyncClient(timeout=10.0) as c: r = await c.post(f"{ITSM}/api/sync/publish/{channel}", json=payload, headers={"X-Internal": "manager"}) if r.status_code == 200: return {"published": True, "channel": channel, "payload": payload} except Exception as e: pass event = {"channel": channel, "payload": payload, "ts": datetime.utcnow().isoformat(), "buffered": True} _event_buffer.append(event) return event # ── SSE 이벤트 스트림 (Manager용 ITSM 구독) ───────────────────────────────── @router.get("/stream") async def manager_event_stream(channels: str = "sr,alert"): channel_list = [c.strip() for c in channels.split(",")] async def generate(): while True: for event in _event_buffer[-5:]: if event.get("channel") in channel_list: yield f"data: {json.dumps(event)}\n\n" await asyncio.sleep(2) return StreamingResponse(generate(), media_type="text/event-stream") @router.get("/health") async def health(): return {"status": "ok", "subscriptions": len(_subscriptions), "event_buffer": len(_event_buffer), "sync_state": _sync_state}