[고객 셀프서비스 포털] - routers/customer_portal.py: SR접수/추적/AI FAQ자가해결/카탈로그/만족도/통계 POST /api/portal/faq/suggest — KB+Ollama 기반 SR 접수 전 자가해결 유도 [그룹웨어 전자결재 연동] - routers/groupware.py: 카카오워크/네이버웍스/한컴/Custom 웹훅 POST /api/groupware/send-approval → 결재 발송 POST /api/groupware/callback → 승인/반려 콜백 → SR 상태 자동 갱신 [SIEM 보안 이벤트 연동] - routers/siem.py: Elasticsearch/Splunk HEC/OpenSearch POST /api/siem/alert/receive → SIEM 경보 → 인시던트 자동 생성 [네트워크 토폴로지 시각화] - routers/topology.py: CMDB CI 의존관계 D3.js 인터랙티브 그래프 GET /api/topology/page — 드래그/줌/헬스오버레이 뷰어 [포트폴리오 + 리소스/인력 관리] - routers/portfolio.py: 다중 프로젝트 포트폴리오 대시보드 + 인원 배치(M/M) + 역량 매핑 [Zero Trust + Kubernetes + ERP] - routers/infra_ext.py: - Zero Trust 세션 재검증 (위험점수 70 이상 → 강제 재인증) - K8s pods/services/nodes API 연동 - ERP 예산 동기화 [API 명세서] - manual/16_API_명세서.md: 전체 588개 라우트 도메인별 정리 [버그 수정] - customer_portal.py: ServiceCatalog→ServiceItem, KBDocument.content→solution/symptoms - customer_portal.py: catalog is_active→status="ACTIVE" Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
301 lines
10 KiB
Python
301 lines
10 KiB
Python
"""
|
|
SIEM 연동 API (ELK/Splunk/OpenSearch)
|
|
|
|
기능:
|
|
1. GUARDiA 보안 이벤트 → SIEM 실시간 전송
|
|
2. SIEM 경보 → GUARDiA 인시던트 자동 생성 (역방향)
|
|
3. 이벤트 조회 / 통계
|
|
|
|
환경변수:
|
|
SIEM_TYPE = elastic|splunk|opensearch|custom
|
|
ELASTIC_URL = http://elasticsearch:9200
|
|
ELASTIC_INDEX = guardia-events
|
|
ELASTIC_API_KEY = ...
|
|
SPLUNK_HEC_URL = http://splunk:8088/services/collector
|
|
SPLUNK_HEC_TOKEN = ...
|
|
OPENSEARCH_URL = http://opensearch:9200
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
from datetime import datetime
|
|
from typing import Any, Optional
|
|
|
|
import httpx
|
|
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Request
|
|
from pydantic import BaseModel
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from core.auth import get_current_user
|
|
from database import get_db
|
|
from models import User
|
|
|
|
logger = logging.getLogger(__name__)
|
|
router = APIRouter(prefix="/api/siem", tags=["siem"])
|
|
|
|
SIEM_TYPE = os.getenv("SIEM_TYPE", "")
|
|
ELASTIC_URL = os.getenv("ELASTIC_URL", "")
|
|
ELASTIC_INDEX = os.getenv("ELASTIC_INDEX", "guardia-events")
|
|
ELASTIC_API_KEY = os.getenv("ELASTIC_API_KEY", "")
|
|
SPLUNK_HEC_URL = os.getenv("SPLUNK_HEC_URL", "")
|
|
SPLUNK_HEC_TOKEN= os.getenv("SPLUNK_HEC_TOKEN", "")
|
|
OPENSEARCH_URL = os.getenv("OPENSEARCH_URL", "")
|
|
|
|
# 이벤트 버퍼 (운영 시 Redis Queue로 전환)
|
|
_event_buffer: list[dict] = []
|
|
MAX_BUFFER = 1000
|
|
|
|
|
|
# ── 이벤트 스키마 ────────────────────────────────────────────────────────────
|
|
|
|
class SecurityEvent(BaseModel):
|
|
event_type: str # LOGIN_FAIL | PRIVILEGE_ESCALATION | VULN_DETECTED | etc
|
|
severity: str = "INFO" # INFO | LOW | MEDIUM | HIGH | CRITICAL
|
|
source: str = "GUARDiA"
|
|
user: Optional[str] = None
|
|
resource: Optional[str] = None
|
|
action: Optional[str] = None
|
|
description: Optional[str] = None
|
|
metadata: Optional[dict] = None
|
|
|
|
|
|
class SIEMAlertRequest(BaseModel):
|
|
"""SIEM에서 역방향으로 보내는 경보."""
|
|
alert_id: str
|
|
rule_name: str
|
|
severity: str
|
|
description: str
|
|
source_ip: Optional[str] = None
|
|
affected: Optional[str] = None
|
|
metadata: Optional[dict] = None
|
|
|
|
|
|
# ── SIEM 별 전송 ─────────────────────────────────────────────────────────────
|
|
|
|
async def _send_elastic(events: list[dict]) -> bool:
|
|
if not ELASTIC_URL:
|
|
return False
|
|
bulk_body = ""
|
|
for ev in events:
|
|
bulk_body += json.dumps({"index": {"_index": ELASTIC_INDEX}}) + "\n"
|
|
bulk_body += json.dumps(ev) + "\n"
|
|
try:
|
|
headers: dict = {"Content-Type": "application/x-ndjson"}
|
|
if ELASTIC_API_KEY:
|
|
headers["Authorization"] = f"ApiKey {ELASTIC_API_KEY}"
|
|
async with httpx.AsyncClient(timeout=10.0) as c:
|
|
r = await c.post(f"{ELASTIC_URL}/_bulk", content=bulk_body, headers=headers)
|
|
return r.status_code in (200, 201)
|
|
except Exception as e:
|
|
logger.warning("Elasticsearch 전송 실패: %s", e)
|
|
return False
|
|
|
|
|
|
async def _send_splunk(events: list[dict]) -> bool:
|
|
if not SPLUNK_HEC_URL or not SPLUNK_HEC_TOKEN:
|
|
return False
|
|
payload = "\n".join(json.dumps({"event": ev, "sourcetype": "guardia"}) for ev in events)
|
|
try:
|
|
async with httpx.AsyncClient(timeout=10.0) as c:
|
|
r = await c.post(
|
|
SPLUNK_HEC_URL,
|
|
content=payload,
|
|
headers={"Authorization": f"Splunk {SPLUNK_HEC_TOKEN}", "Content-Type": "application/json"},
|
|
)
|
|
return r.status_code == 200
|
|
except Exception as e:
|
|
logger.warning("Splunk HEC 전송 실패: %s", e)
|
|
return False
|
|
|
|
|
|
async def _send_opensearch(events: list[dict]) -> bool:
|
|
if not OPENSEARCH_URL:
|
|
return False
|
|
bulk_body = ""
|
|
for ev in events:
|
|
bulk_body += json.dumps({"index": {"_index": ELASTIC_INDEX}}) + "\n"
|
|
bulk_body += json.dumps(ev) + "\n"
|
|
try:
|
|
async with httpx.AsyncClient(timeout=10.0) as c:
|
|
r = await c.post(
|
|
f"{OPENSEARCH_URL}/_bulk",
|
|
content=bulk_body,
|
|
headers={"Content-Type": "application/x-ndjson"},
|
|
)
|
|
return r.status_code in (200, 201)
|
|
except Exception as e:
|
|
logger.warning("OpenSearch 전송 실패: %s", e)
|
|
return False
|
|
|
|
|
|
async def send_to_siem(events: list[dict]) -> bool:
|
|
"""SIEM 유형에 따라 이벤트 전송."""
|
|
t = SIEM_TYPE.lower()
|
|
if t == "elastic":
|
|
return await _send_elastic(events)
|
|
elif t == "splunk":
|
|
return await _send_splunk(events)
|
|
elif t == "opensearch":
|
|
return await _send_opensearch(events)
|
|
elif ELASTIC_URL:
|
|
return await _send_elastic(events)
|
|
elif SPLUNK_HEC_URL:
|
|
return await _send_splunk(events)
|
|
elif OPENSEARCH_URL:
|
|
return await _send_opensearch(events)
|
|
else:
|
|
logger.debug("SIEM 미설정 — 이벤트 버퍼에만 저장")
|
|
return False
|
|
|
|
|
|
def _build_event(ev: SecurityEvent, actor: str = "system") -> dict:
|
|
return {
|
|
"@timestamp": datetime.utcnow().isoformat() + "Z",
|
|
"event_type": ev.event_type,
|
|
"severity": ev.severity,
|
|
"source": ev.source,
|
|
"user": ev.user or actor,
|
|
"resource": ev.resource,
|
|
"action": ev.action,
|
|
"description": ev.description,
|
|
"metadata": ev.metadata or {},
|
|
"tags": ["guardia", "itsm"],
|
|
}
|
|
|
|
|
|
# ── 이벤트 발송 API ───────────────────────────────────────────────────────────
|
|
|
|
@router.post("/events")
|
|
async def push_event(
|
|
body: SecurityEvent,
|
|
bg: BackgroundTasks,
|
|
_u: User = Depends(get_current_user),
|
|
):
|
|
"""보안 이벤트를 SIEM으로 전송."""
|
|
ev = _build_event(body, _u.username)
|
|
|
|
# 버퍼에 저장
|
|
_event_buffer.append(ev)
|
|
if len(_event_buffer) > MAX_BUFFER:
|
|
_event_buffer.pop(0)
|
|
|
|
# SIEM 전송 (백그라운드)
|
|
bg.add_task(send_to_siem, [ev])
|
|
|
|
return {
|
|
"message": "이벤트가 전송되었습니다.",
|
|
"event_type": ev["event_type"],
|
|
"severity": ev["severity"],
|
|
}
|
|
|
|
|
|
@router.post("/events/batch")
|
|
async def push_events_batch(
|
|
events: list[SecurityEvent],
|
|
bg: BackgroundTasks,
|
|
_u: User = Depends(get_current_user),
|
|
):
|
|
"""여러 이벤트를 일괄 전송."""
|
|
if len(events) > 100:
|
|
raise HTTPException(400, "한 번에 최대 100개까지 전송 가능합니다.")
|
|
|
|
evs = [_build_event(e, _u.username) for e in events]
|
|
_event_buffer.extend(evs[-MAX_BUFFER:])
|
|
if len(_event_buffer) > MAX_BUFFER:
|
|
_event_buffer[:] = _event_buffer[-MAX_BUFFER:]
|
|
|
|
bg.add_task(send_to_siem, evs)
|
|
return {"message": f"{len(evs)}개 이벤트 전송", "count": len(evs)}
|
|
|
|
|
|
# ── SIEM 역방향 경보 수신 ────────────────────────────────────────────────────
|
|
|
|
@router.post("/alert/receive")
|
|
async def receive_siem_alert(
|
|
body: SIEMAlertRequest,
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
"""SIEM 경보 수신 → GUARDiA 인시던트 자동 생성 (ADMIN 인증 불필요 — webhook)."""
|
|
from models import Incident, IncidentGrade, IncidentStatus
|
|
from uuid import uuid4
|
|
|
|
grade_map = {"CRITICAL": IncidentGrade.P1, "HIGH": IncidentGrade.P2,
|
|
"MEDIUM": IncidentGrade.P3, "LOW": IncidentGrade.P4}
|
|
grade = grade_map.get(body.severity.upper(), IncidentGrade.P3)
|
|
|
|
incident = Incident(
|
|
incident_id = f"INC-{datetime.now().strftime('%Y%m%d')}-{str(uuid4())[:6].upper()}",
|
|
title = f"[SIEM] {body.rule_name}",
|
|
description = (
|
|
f"SIEM 경보 자동 수신\n"
|
|
f"규칙: {body.rule_name}\n"
|
|
f"설명: {body.description}\n"
|
|
f"소스 IP: {body.source_ip or '미상'}\n"
|
|
f"영향 자산: {body.affected or '미상'}"
|
|
),
|
|
grade = grade,
|
|
status = IncidentStatus.OPEN,
|
|
occurred_at = datetime.now(),
|
|
affected_service = body.affected,
|
|
reported_by = f"SIEM:{body.alert_id}",
|
|
)
|
|
db.add(incident)
|
|
await db.commit()
|
|
|
|
# P1/P2 즉시 알림
|
|
if grade in (IncidentGrade.P1, IncidentGrade.P2):
|
|
try:
|
|
from core.notify import send_messenger
|
|
import os as _os
|
|
await send_messenger(
|
|
_os.getenv("MESSENGER_OPS_ROOM", "ops"),
|
|
{"type": "text", "text": f"🚨 SIEM 경보 [{grade}]\n{body.rule_name}\n{body.description[:200]}"}
|
|
)
|
|
except Exception:
|
|
pass
|
|
|
|
return {
|
|
"message": f"인시던트 {incident.incident_id} 자동 생성",
|
|
"incident_id": incident.incident_id,
|
|
"grade": grade,
|
|
}
|
|
|
|
|
|
# ── 이벤트 조회 / 통계 ───────────────────────────────────────────────────────
|
|
|
|
@router.get("/events")
|
|
async def list_events(
|
|
limit: int = 50,
|
|
severity: Optional[str] = None,
|
|
_u: User = Depends(get_current_user),
|
|
):
|
|
"""최근 보안 이벤트 목록 (버퍼에서 조회)."""
|
|
events = _event_buffer[-limit:][::-1]
|
|
if severity:
|
|
events = [e for e in events if e.get("severity", "").upper() == severity.upper()]
|
|
return {"total": len(_event_buffer), "events": events[:limit]}
|
|
|
|
|
|
@router.get("/stats")
|
|
async def siem_stats(_u: User = Depends(get_current_user)):
|
|
"""SIEM 연동 현황 통계."""
|
|
by_sev: dict = {}
|
|
by_type: dict = {}
|
|
for ev in _event_buffer:
|
|
sev = ev.get("severity", "INFO")
|
|
etype= ev.get("event_type", "UNKNOWN")
|
|
by_sev[sev] = by_sev.get(sev, 0) + 1
|
|
by_type[etype] = by_type.get(etype, 0) + 1
|
|
|
|
return {
|
|
"siem_type": SIEM_TYPE or "미설정",
|
|
"elastic_url": ELASTIC_URL[:30] + "..." if ELASTIC_URL else "",
|
|
"splunk_url": SPLUNK_HEC_URL[:30] + "..." if SPLUNK_HEC_URL else "",
|
|
"total_events": len(_event_buffer),
|
|
"by_severity": by_sev,
|
|
"by_type": by_type,
|
|
"configured": bool(ELASTIC_URL or SPLUNK_HEC_URL or OPENSEARCH_URL),
|
|
}
|