zioinfo-mail/itsm/routers/siem.py
DESKTOP-TKLFCPR\ython 25d02183e3 feat(itsm): 추가 기능 7개 + API 명세서 완성
[고객 셀프서비스 포털]
- routers/customer_portal.py: SR접수/추적/AI FAQ자가해결/카탈로그/만족도/통계
  POST /api/portal/faq/suggest — KB+Ollama 기반 SR 접수 전 자가해결 유도

[그룹웨어 전자결재 연동]
- routers/groupware.py: 카카오워크/네이버웍스/한컴/Custom 웹훅
  POST /api/groupware/send-approval → 결재 발송
  POST /api/groupware/callback → 승인/반려 콜백 → SR 상태 자동 갱신

[SIEM 보안 이벤트 연동]
- routers/siem.py: Elasticsearch/Splunk HEC/OpenSearch
  POST /api/siem/alert/receive → SIEM 경보 → 인시던트 자동 생성

[네트워크 토폴로지 시각화]
- routers/topology.py: CMDB CI 의존관계 D3.js 인터랙티브 그래프
  GET /api/topology/page — 드래그/줌/헬스오버레이 뷰어

[포트폴리오 + 리소스/인력 관리]
- routers/portfolio.py: 다중 프로젝트 포트폴리오 대시보드
  + 인원 배치(M/M) + 역량 매핑

[Zero Trust + Kubernetes + ERP]
- routers/infra_ext.py:
  - Zero Trust 세션 재검증 (위험점수 70 이상 → 강제 재인증)
  - K8s pods/services/nodes API 연동
  - ERP 예산 동기화

[API 명세서]
- manual/16_API_명세서.md: 전체 588개 라우트 도메인별 정리

[버그 수정]
- customer_portal.py: ServiceCatalog→ServiceItem, KBDocument.content→solution/symptoms
- customer_portal.py: catalog is_active→status="ACTIVE"

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-30 07:37:52 +09:00

301 lines
10 KiB
Python

"""
SIEM 연동 API (ELK/Splunk/OpenSearch)
기능:
1. GUARDiA 보안 이벤트 → SIEM 실시간 전송
2. SIEM 경보 → GUARDiA 인시던트 자동 생성 (역방향)
3. 이벤트 조회 / 통계
환경변수:
SIEM_TYPE = elastic|splunk|opensearch|custom
ELASTIC_URL = http://elasticsearch:9200
ELASTIC_INDEX = guardia-events
ELASTIC_API_KEY = ...
SPLUNK_HEC_URL = http://splunk:8088/services/collector
SPLUNK_HEC_TOKEN = ...
OPENSEARCH_URL = http://opensearch:9200
"""
from __future__ import annotations
import json
import logging
import os
from datetime import datetime
from typing import Any, Optional
import httpx
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Request
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
from database import get_db
from models import User
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Every route declared in this file is registered under the /api/siem prefix.
router = APIRouter(prefix="/api/siem", tags=["siem"])
# Backend settings are read from the environment once, at import time.
# The sender helpers treat an empty URL/token as "backend not configured".
SIEM_TYPE = os.getenv("SIEM_TYPE", "")
ELASTIC_URL = os.getenv("ELASTIC_URL", "")
ELASTIC_INDEX = os.getenv("ELASTIC_INDEX", "guardia-events")
ELASTIC_API_KEY = os.getenv("ELASTIC_API_KEY", "")
SPLUNK_HEC_URL = os.getenv("SPLUNK_HEC_URL", "")
SPLUNK_HEC_TOKEN= os.getenv("SPLUNK_HEC_TOKEN", "")
OPENSEARCH_URL = os.getenv("OPENSEARCH_URL", "")
# In-memory event buffer (switch to a Redis queue for production use).
_event_buffer: list[dict] = []
# Maximum number of events the push endpoints keep in _event_buffer.
MAX_BUFFER = 1000
# ── 이벤트 스키마 ────────────────────────────────────────────────────────────
class SecurityEvent(BaseModel):
    # Request model for a security event that is forwarded to the SIEM.

    # Event category, e.g. LOGIN_FAIL, PRIVILEGE_ESCALATION, VULN_DETECTED.
    event_type: str
    # Expected values: INFO, LOW, MEDIUM, HIGH, CRITICAL (not validated here).
    severity: str = "INFO"
    # Name of the system that produced the event.
    source: str = "GUARDiA"
    # Optional context; _build_event substitutes the caller for a missing user.
    user: Optional[str] = None
    resource: Optional[str] = None
    action: Optional[str] = None
    description: Optional[str] = None
    # Free-form extra attributes; _build_event substitutes {} when missing.
    metadata: Optional[dict] = None
class SIEMAlertRequest(BaseModel):
    """Alert pushed back from the SIEM (reverse direction)."""

    # Required fields: alert identity, triggering rule, severity and text.
    alert_id: str
    rule_name: str
    severity: str
    description: str

    # Optional context supplied by the SIEM.
    source_ip: Optional[str] = None
    affected: Optional[str] = None
    metadata: Optional[dict] = None
# ── SIEM 별 전송 ─────────────────────────────────────────────────────────────
async def _send_elastic(events: list[dict]) -> bool:
    """Index *events* into Elasticsearch through the ``_bulk`` endpoint.

    Args:
        events: Event documents to index into ``ELASTIC_INDEX``.

    Returns:
        ``True`` when there was nothing to send or every document was
        accepted. ``False`` when ``ELASTIC_URL`` is unset, the request failed,
        the HTTP status was not 200/201, or the bulk response reported
        per-item errors.
    """
    if not ELASTIC_URL:
        return False
    if not events:
        # Nothing to index; avoid posting an empty bulk body.
        return True

    # NDJSON body: one action line followed by one document line per event,
    # every line newline-terminated.
    action_line = json.dumps({"index": {"_index": ELASTIC_INDEX}})
    bulk_body = "".join(f"{action_line}\n{json.dumps(ev)}\n" for ev in events)

    headers: dict = {"Content-Type": "application/x-ndjson"}
    if ELASTIC_API_KEY:
        headers["Authorization"] = f"ApiKey {ELASTIC_API_KEY}"
    # Tolerate a trailing slash in the configured base URL.
    bulk_url = ELASTIC_URL.rstrip("/") + "/_bulk"

    try:
        async with httpx.AsyncClient(timeout=10.0) as c:
            r = await c.post(bulk_url, content=bulk_body, headers=headers)
        if r.status_code not in (200, 201):
            logger.warning("Elasticsearch 전송 실패: %s", f"HTTP {r.status_code}")
            return False
        # The bulk API reports per-item failures through the "errors" flag of
        # the response body while still answering with a success status.
        if r.json().get("errors"):
            logger.warning("Elasticsearch 전송 실패: %s", "bulk response reported item errors")
            return False
        return True
    except Exception as e:
        # Best effort: delivery problems are logged, never raised to callers.
        logger.warning("Elasticsearch 전송 실패: %s", e)
        return False
async def _send_splunk(events: list[dict]) -> bool:
    """Post *events* to the Splunk HTTP Event Collector.

    Args:
        events: Event documents; each one is wrapped in an envelope with
            ``sourcetype`` set to ``guardia``.

    Returns:
        ``True`` only when the collector answers with HTTP 200. ``False`` when
        the URL or token is not configured, or when the request fails.
    """
    if not (SPLUNK_HEC_URL and SPLUNK_HEC_TOKEN):
        return False

    # One JSON envelope per line.
    envelopes = [
        json.dumps({"event": item, "sourcetype": "guardia"}) for item in events
    ]
    payload = "\n".join(envelopes)
    hec_headers = {
        "Authorization": f"Splunk {SPLUNK_HEC_TOKEN}",
        "Content-Type": "application/json",
    }

    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.post(
                SPLUNK_HEC_URL, content=payload, headers=hec_headers
            )
            return response.status_code == 200
    except Exception as exc:
        # Delivery is best effort: log the failure and report it as False.
        logger.warning("Splunk HEC 전송 실패: %s", exc)
        return False
async def _send_opensearch(events: list[dict]) -> bool:
    """Index *events* into OpenSearch through the ``_bulk`` endpoint.

    The target index is ``ELASTIC_INDEX``; this module defines no separate
    OpenSearch index setting.

    Args:
        events: Event documents to index.

    Returns:
        ``True`` when there was nothing to send or every document was
        accepted. ``False`` when ``OPENSEARCH_URL`` is unset, the request
        failed, the HTTP status was not 200/201, or the bulk response
        reported per-item errors.
    """
    if not OPENSEARCH_URL:
        return False
    if not events:
        # Nothing to index; avoid posting an empty bulk body.
        return True

    # NDJSON body: one action line followed by one document line per event,
    # every line newline-terminated.
    action_line = json.dumps({"index": {"_index": ELASTIC_INDEX}})
    bulk_body = "".join(f"{action_line}\n{json.dumps(ev)}\n" for ev in events)
    # Tolerate a trailing slash in the configured base URL.
    bulk_url = OPENSEARCH_URL.rstrip("/") + "/_bulk"

    try:
        async with httpx.AsyncClient(timeout=10.0) as c:
            r = await c.post(
                bulk_url,
                content=bulk_body,
                headers={"Content-Type": "application/x-ndjson"},
            )
        if r.status_code not in (200, 201):
            logger.warning("OpenSearch 전송 실패: %s", f"HTTP {r.status_code}")
            return False
        # The bulk API reports per-item failures through the "errors" flag of
        # the response body while still answering with a success status.
        if r.json().get("errors"):
            logger.warning("OpenSearch 전송 실패: %s", "bulk response reported item errors")
            return False
        return True
    except Exception as e:
        # Best effort: delivery problems are logged, never raised to callers.
        logger.warning("OpenSearch 전송 실패: %s", e)
        return False
async def send_to_siem(events: list[dict]) -> bool:
    """Forward *events* to the configured SIEM backend.

    An explicit ``SIEM_TYPE`` of ``elastic``, ``splunk`` or ``opensearch``
    (case-insensitive) selects the backend. Any other value falls back to the
    first backend whose URL is configured, checked in the order
    Elasticsearch, Splunk, OpenSearch.

    Returns:
        The selected sender's result, or ``False`` when no backend is
        configured at all.
    """
    by_type = {
        "elastic": _send_elastic,
        "splunk": _send_splunk,
        "opensearch": _send_opensearch,
    }
    sender = by_type.get(SIEM_TYPE.lower())

    if sender is None:
        # No recognised explicit type: pick the first backend with a URL.
        fallbacks = (
            (ELASTIC_URL, _send_elastic),
            (SPLUNK_HEC_URL, _send_splunk),
            (OPENSEARCH_URL, _send_opensearch),
        )
        for configured_url, candidate in fallbacks:
            if configured_url:
                sender = candidate
                break

    if sender is None:
        logger.debug("SIEM 미설정 — 이벤트 버퍼에만 저장")
        return False
    return await sender(events)
def _build_event(ev: SecurityEvent, actor: str = "system") -> dict:
    """Convert a ``SecurityEvent`` into the document sent to the SIEM.

    Args:
        ev: The incoming event.
        actor: Name used for ``user`` when the event does not carry one.

    Returns:
        A dict with an ``@timestamp`` (current UTC time, ISO-8601 with a
        trailing ``Z``), the event's fields, and the fixed tags
        ``["guardia", "itsm"]``. A missing ``metadata`` becomes ``{}``.
    """
    # Local import: the module header only imports the ``datetime`` class.
    from datetime import timezone

    # datetime.utcnow() is deprecated; take an aware UTC time and drop the
    # tzinfo so the rendered string keeps the original "<iso>Z" shape.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    return {
        "@timestamp": now_utc.isoformat() + "Z",
        "event_type": ev.event_type,
        "severity": ev.severity,
        "source": ev.source,
        "user": ev.user or actor,
        "resource": ev.resource,
        "action": ev.action,
        "description": ev.description,
        "metadata": ev.metadata or {},
        "tags": ["guardia", "itsm"],
    }
# ── 이벤트 발송 API ───────────────────────────────────────────────────────────
@router.post("/events")
async def push_event(
    body: SecurityEvent,
    bg: BackgroundTasks,
    _u: User = Depends(get_current_user),
):
    """Record one security event and forward it to the SIEM.

    The event is stamped with the authenticated user's name, appended to the
    in-memory buffer, and handed to ``send_to_siem`` as a background task.
    """
    event = _build_event(body, _u.username)

    # Keep a local copy; drop the oldest entry once the cap is exceeded.
    _event_buffer.append(event)
    over_capacity = len(_event_buffer) > MAX_BUFFER
    if over_capacity:
        del _event_buffer[0]

    # Deliver in the background.
    bg.add_task(send_to_siem, [event])

    response = {
        "message": "이벤트가 전송되었습니다.",
        "event_type": event["event_type"],
        "severity": event["severity"],
    }
    return response
@router.post("/events/batch")
async def push_events_batch(
    events: list[SecurityEvent],
    bg: BackgroundTasks,
    _u: User = Depends(get_current_user),
):
    """Record several security events and forward them in one batch.

    Args:
        events: At most 100 events per request.

    Returns:
        A dict with a summary ``message`` and the number of accepted events.

    Raises:
        HTTPException: 400 when more than 100 events are submitted.
    """
    if len(events) > 100:
        raise HTTPException(400, "한 번에 최대 100개까지 전송 가능합니다.")

    evs = [_build_event(e, _u.username) for e in events]
    if evs:
        # Only the newest MAX_BUFFER events are retained locally.
        _event_buffer.extend(evs[-MAX_BUFFER:])
        if len(_event_buffer) > MAX_BUFFER:
            _event_buffer[:] = _event_buffer[-MAX_BUFFER:]
        # An empty batch is not forwarded: there is nothing to deliver and the
        # senders would otherwise post an empty request body.
        bg.add_task(send_to_siem, evs)
    return {"message": f"{len(evs)}개 이벤트 전송", "count": len(evs)}
# ── SIEM 역방향 경보 수신 ────────────────────────────────────────────────────
@router.post("/alert/receive")
async def receive_siem_alert(
    body: SIEMAlertRequest,
    db: AsyncSession = Depends(get_db),
):
    """Create an incident from an alert pushed by the SIEM (webhook).

    The alert severity is mapped to an incident grade (CRITICAL→P1, HIGH→P2,
    MEDIUM→P3, LOW→P4, anything else→P3). P1/P2 incidents additionally
    trigger a best-effort messenger notification.

    NOTE(review): this route declares no authentication dependency, so any
    client that can reach it can open incidents and trigger notifications.
    Consider a shared-secret header or network-level restriction.

    Returns:
        A dict with a confirmation ``message``, the new ``incident_id`` and
        the assigned ``grade``.
    """
    from models import Incident, IncidentGrade, IncidentStatus
    from uuid import uuid4

    grade_map = {
        "CRITICAL": IncidentGrade.P1,
        "HIGH": IncidentGrade.P2,
        "MEDIUM": IncidentGrade.P3,
        "LOW": IncidentGrade.P4,
    }
    grade = grade_map.get(body.severity.upper(), IncidentGrade.P3)

    # Take the clock once so the ID date and occurred_at cannot disagree.
    now = datetime.now()
    # Keep the ID in a local: depending on session configuration, ORM
    # attributes may be expired by commit() and re-reading them afterwards
    # would need another database round trip.
    incident_id = f"INC-{now.strftime('%Y%m%d')}-{str(uuid4())[:6].upper()}"

    incident = Incident(
        incident_id=incident_id,
        title=f"[SIEM] {body.rule_name}",
        description=(
            f"SIEM 경보 자동 수신\n"
            f"규칙: {body.rule_name}\n"
            f"설명: {body.description}\n"
            f"소스 IP: {body.source_ip or '미상'}\n"
            f"영향 자산: {body.affected or '미상'}"
        ),
        grade=grade,
        status=IncidentStatus.OPEN,
        occurred_at=now,
        affected_service=body.affected,
        reported_by=f"SIEM:{body.alert_id}",
    )
    db.add(incident)
    await db.commit()

    # Notify the operations room immediately for P1/P2.
    if grade in (IncidentGrade.P1, IncidentGrade.P2):
        try:
            from core.notify import send_messenger

            await send_messenger(
                os.getenv("MESSENGER_OPS_ROOM", "ops"),
                {"type": "text", "text": f"🚨 SIEM 경보 [{grade}]\n{body.rule_name}\n{body.description[:200]}"},
            )
        except Exception as e:
            # The incident is already committed; a failed notification must
            # not fail the webhook, but it should not vanish silently either.
            logger.warning("SIEM 경보 알림 전송 실패: %s", e)

    return {
        "message": f"인시던트 {incident_id} 자동 생성",
        "incident_id": incident_id,
        "grade": grade,
    }
# ── 이벤트 조회 / 통계 ───────────────────────────────────────────────────────
@router.get("/events")
async def list_events(
    limit: int = 50,
    severity: Optional[str] = None,
    _u: User = Depends(get_current_user),
):
    """Return the most recent buffered security events, newest first.

    Args:
        limit: Maximum number of events to return; values below 1 yield an
            empty list.
        severity: When given, only events with this severity
            (case-insensitive) are returned.

    Returns:
        A dict with ``total`` (size of the whole buffer, ignoring the filter)
        and ``events`` (up to ``limit`` matching events, newest first).
    """
    total = len(_event_buffer)
    if limit <= 0:
        # A negative limit would otherwise turn into a nonsensical slice.
        return {"total": total, "events": []}

    if severity:
        wanted = severity.upper()
        # Filter before applying the limit so that matching events older than
        # the last `limit` entries are not dropped.
        matching = [
            e for e in _event_buffer if e.get("severity", "").upper() == wanted
        ]
    else:
        matching = _event_buffer

    newest_first = matching[-limit:][::-1]
    return {"total": total, "events": newest_first}
@router.get("/stats")
async def siem_stats(_u: User = Depends(get_current_user)):
    """Return SIEM integration status and counts over the buffered events.

    Returns:
        A dict with the configured ``siem_type``, shortened backend URLs, the
        number of buffered events, per-severity and per-type counts, and a
        ``configured`` flag that is true when any backend URL is set.
    """
    # Local import: Counter is not imported at module level.
    from collections import Counter

    def _preview(url: str) -> str:
        # Show at most 30 characters; add the ellipsis only when the URL was
        # actually cut, so short URLs are reported verbatim.
        return url if len(url) <= 30 else url[:30] + "..."

    by_sev = Counter(ev.get("severity", "INFO") for ev in _event_buffer)
    by_type = Counter(ev.get("event_type", "UNKNOWN") for ev in _event_buffer)

    return {
        "siem_type": SIEM_TYPE or "미설정",
        "elastic_url": _preview(ELASTIC_URL),
        "splunk_url": _preview(SPLUNK_HEC_URL),
        # Reported alongside the other backends since it also counts toward
        # the `configured` flag below.
        "opensearch_url": _preview(OPENSEARCH_URL),
        "total_events": len(_event_buffer),
        "by_severity": dict(by_sev),
        "by_type": dict(by_type),
        "configured": bool(ELASTIC_URL or SPLUNK_HEC_URL or OPENSEARCH_URL),
    }