guardia-itsm/core/events.py
DESKTOP-TKLFCPRython 64c27c3509 feat(itsm): G-1~G-12 확장 기능 + 하네스/봇/설치스크립트 구현
G-1: 메신저 Webhook Relay + _send_to_room 실제 httpx 호출 구현
G-2: POST /api/tasks/bulk SR 대량작업 엔드포인트 (최대 100건)
G-3: 라이선스 만료 알림 스케줄러 (매일 09:00 KST)
G-4: 체험판 upgrade_banner 필드 + license.py 배너 로직
G-5: core/auto_rca.py + incidents/problem auto-rca 엔드포인트
G-6: core/deploy_impact.py + vibe impact-analysis 엔드포인트
G-7: core/ticket_classifier.py + SR 생성 시 AI 분류 + ai-suggestion API
G-8: VulnPatchRecord 모델 + vuln_scan 패치추적 4개 엔드포인트
G-9: core/jira_sync.py + gateway Jira/Confluence 연동 엔드포인트
G-10: core/push_notify.py + routers/push.py + PushSubscription 모델
G-11: approvals 다중승인 (위임/서명/기한초과/마감연장)
G-12: alembic.ini + migrations/ + cicd/migrate_to_postgres.sh

하네스: guardia-orchestrator 확장기능 Phase 반영
봇명령어: /sr /status /license /bulk 슬래시 명령어 추가
설치스크립트: setup/ (Ubuntu, CentOS, RHEL, Windows) --test 옵션 포함

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-29 18:18:52 +09:00

144 lines
4.7 KiB
Python

"""경량 인메모리 이벤트 버스 — SSE 브로드캐스트용.
단일 워커 환경(개발/온프레미스 데모)에서 모든 SSE 구독자에게
실시간 이벤트를 전달합니다.
사용법:
from core.events import broadcast, subscribe, unsubscribe
# 이벤트 발행 (router 내 async 함수에서)
await broadcast("sr_updated", {"sr_id": "SR-...", "new_status": "APPROVED"})
# SSE 엔드포인트에서 구독
queue = subscribe()
try:
msg = await asyncio.wait_for(queue.get(), timeout=20.0)
finally:
unsubscribe(queue)
"""
import asyncio
import json
from typing import Any
# 구독 중인 클라이언트 큐 목록
_subscribers: list[asyncio.Queue] = []
async def broadcast(event_type: str, data: dict[str, Any] | None = None) -> None:
"""모든 SSE 구독자에게 이벤트를 전송합니다.
큐가 가득 찬(오프라인) 구독자는 자동으로 제거됩니다.
"""
msg = json.dumps({"type": event_type, **(data or {})}, ensure_ascii=False)
dead: list[asyncio.Queue] = []
for q in list(_subscribers):
try:
q.put_nowait(msg)
except asyncio.QueueFull:
dead.append(q)
for q in dead:
unsubscribe(q)
def subscribe() -> asyncio.Queue:
"""새 구독 큐를 생성하고 등록합니다."""
q: asyncio.Queue = asyncio.Queue(maxsize=100)
_subscribers.append(q)
return q
def unsubscribe(q: asyncio.Queue) -> None:
"""구독을 해제합니다."""
try:
_subscribers.remove(q)
except ValueError:
pass
def subscriber_count() -> int:
"""현재 활성 구독자 수를 반환합니다."""
return len(_subscribers)
# ── 배치 실행 로그 SSE 스트리밍 ────────────────────────────────────────────────
async def stream_batch_log(run_id: int):
"""
배치 실행 로그를 SSE 형식으로 스트리밍합니다.
BatchRun.stdout_tail 을 주기적으로 폴링하여 새 내용을 yield.
result 가 RUNNING 이 아닌 상태가 되면 마지막 로그를 전송 후 종료.
Usage (FastAPI StreamingResponse):
return StreamingResponse(
stream_batch_log(run_id),
media_type="text/event-stream",
)
Yields:
SSE 형식 문자열: "data: {json}\\n\\n"
"""
import asyncio
from database import SessionLocal
from models import BatchRun, BatchRunResult
POLL_INTERVAL = 2.0 # 2초 폴링
MAX_WAIT_SEC = 3600 # 최대 1시간 대기
elapsed = 0.0
sent_lines = 0 # 이미 전송한 줄 수 (중복 방지)
while elapsed < MAX_WAIT_SEC:
async with SessionLocal() as db:
from sqlalchemy import select
row = (await db.execute(
select(BatchRun).where(BatchRun.id == run_id)
)).scalars().first()
if not row:
yield f"data: {json.dumps({'type': 'error', 'message': '실행 이력을 찾을 수 없습니다.'}, ensure_ascii=False)}\n\n"
return
# stdout_tail 에서 새 줄 추출
stdout = row.stdout_tail or ""
all_lines = stdout.splitlines()
new_lines = all_lines[sent_lines:]
if new_lines:
for line in new_lines:
payload = json.dumps(
{"type": "log", "run_id": run_id, "line": line},
ensure_ascii=False,
)
yield f"data: {payload}\n\n"
sent_lines += len(new_lines)
# 완료/실패/타임아웃 → 종료 이벤트 전송
if row.result != BatchRunResult.RUNNING:
result_payload = json.dumps({
"type": "done",
"run_id": run_id,
"result": row.result,
"exit_code": row.exit_code,
"ended_at": row.ended_at.isoformat() if row.ended_at else None,
"error_msg": row.error_msg,
}, ensure_ascii=False)
yield f"data: {result_payload}\n\n"
return
# 아직 실행 중 — heartbeat 전송 후 대기
heartbeat = json.dumps(
{"type": "heartbeat", "run_id": run_id, "elapsed": int(elapsed)},
ensure_ascii=False,
)
yield f"data: {heartbeat}\n\n"
await asyncio.sleep(POLL_INTERVAL)
elapsed += POLL_INTERVAL
# 타임아웃 — 스트리밍 강제 종료
timeout_payload = json.dumps(
{"type": "timeout", "run_id": run_id, "message": "스트리밍 최대 대기 시간 초과"},
ensure_ascii=False,
)
yield f"data: {timeout_payload}\n\n"