guardia-itsm/routers/gateway.py
DESKTOP-TKLFCPRython 64c27c3509 feat(itsm): G-1~G-12 확장 기능 + 하네스/봇/설치스크립트 구현
G-1: 메신저 Webhook Relay + _send_to_room 실제 httpx 호출 구현
G-2: POST /api/tasks/bulk SR 대량작업 엔드포인트 (최대 100건)
G-3: 라이선스 만료 알림 스케줄러 (매일 09:00 KST)
G-4: 체험판 upgrade_banner 필드 + license.py 배너 로직
G-5: core/auto_rca.py + incidents/problem auto-rca 엔드포인트
G-6: core/deploy_impact.py + vibe impact-analysis 엔드포인트
G-7: core/ticket_classifier.py + SR 생성 시 AI 분류 + ai-suggestion API
G-8: VulnPatchRecord 모델 + vuln_scan 패치추적 4개 엔드포인트
G-9: core/jira_sync.py + gateway Jira/Confluence 연동 엔드포인트
G-10: core/push_notify.py + routers/push.py + PushSubscription 모델
G-11: approvals 다중승인 (위임/서명/기한초과/마감연장)
G-12: alembic.ini + migrations/ + cicd/migrate_to_postgres.sh

하네스: guardia-orchestrator 확장기능 Phase 반영
봇명령어: /sr /status /license /bulk 슬래시 명령어 추가
설치스크립트: setup/ (Ubuntu, CentOS, RHEL, Windows) --test 옵션 포함

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-29 18:18:52 +09:00

618 lines
23 KiB
Python

"""
F-5: OpenAPI 외부 연동 게이트웨이
기능:
1. 외부 시스템 연동 등록 (API Key 기반 인증)
2. 연동 상태 관리 (활성/비활성/테스트)
3. 웹훅 수신 (외부 시스템 → GUARDiA 이벤트 주입)
4. 아웃바운드 웹훅 발송 (GUARDiA → 외부 시스템 알림)
5. API 사용량 추적 및 Rate Limit
6. 연동 로그 (요청/응답 기록, 민감 데이터 마스킹)
7. OpenAPI 스키마 자동 노출 (외부 연동 가이드)
보안 원칙:
- API Key는 SHA-256 해시로 저장 (평문 불가)
- 아웃바운드 요청은 내부 네트워크(온프레미스)만 허용
- 요청/응답 로그에서 Authorization, password, secret 자동 마스킹
- HMAC-SHA256 서명 검증 (웹훅 무결성)
엔드포인트:
POST /api/gateway/integrations — 연동 등록
GET /api/gateway/integrations — 연동 목록
GET /api/gateway/integrations/{id} — 연동 상세
PUT /api/gateway/integrations/{id} — 연동 수정
DELETE /api/gateway/integrations/{id}— 연동 삭제
POST /api/gateway/integrations/{id}/test — 연동 테스트
POST /api/gateway/webhook/{key} — 웹훅 수신 (외부 → GUARDiA)
POST /api/gateway/send/{id} — 아웃바운드 알림 발송
GET /api/gateway/logs — 연동 로그 조회
GET /api/gateway/stats — 연동 통계
"""
from __future__ import annotations
import hashlib
import hmac
import json
import logging
import re
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
from uuid import uuid4
from fastapi import APIRouter, Depends, HTTPException, Request, Query
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
from database import get_db
from models import User, UserRole
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/gateway", tags=["gateway"])
# ── 인메모리 저장소 ──────────────────────────────────────────────────────────
_integrations: Dict[str, Dict] = {} # int_id -> record
_gw_logs: List[Dict] = [] # 최근 1000건
_rate_counts: Dict[str, List] = {} # int_id -> [timestamp, ...]
MAX_LOGS = 1000
RATE_LIMIT_RPM = 60 # 기본 분당 60회
# ── 지원 연동 유형 ────────────────────────────────────────────────────────────
INTEGRATION_TYPES = {
"WEBHOOK_IN": "웹훅 수신 (외부 → GUARDiA)",
"WEBHOOK_OUT": "웹훅 발송 (GUARDiA → 외부)",
"REST_API": "REST API 연동",
"MONITORING": "모니터링 시스템 연동",
"TICKETING": "외부 티켓팅 시스템",
"NOTIFICATION": "알림 채널 연동 (메신저/이메일)",
}
# 마스킹 대상 키워드
_MASK_KEYS = re.compile(
r"(authorization|password|secret|token|api_key|apikey|key|credential)",
re.IGNORECASE
)
# ── Pydantic 스키마 ──────────────────────────────────────────────────────────
class IntegrationIn(BaseModel):
name: str
type: str # INTEGRATION_TYPES key
description: str = ""
endpoint_url: Optional[str] = None # 아웃바운드 대상 URL
secret: Optional[str] = None # 웹훅 서명 시크릿 (평문 → 해시 저장)
headers: Dict[str, str] = {} # 커스텀 헤더 (마스킹 저장)
enabled: bool = True
rate_limit: int = RATE_LIMIT_RPM # 분당 요청 제한
class IntegrationUpdateIn(BaseModel):
name: Optional[str] = None
description: Optional[str] = None
endpoint_url: Optional[str] = None
enabled: Optional[bool] = None
rate_limit: Optional[int] = None
class WebhookPayload(BaseModel):
event: str
data: Dict[str, Any] = {}
source: str = "external"
class OutboundPayload(BaseModel):
event: str
message: str
data: Dict[str, Any] = {}
severity: str = "INFO"
# ── 헬퍼 ────────────────────────────────────────────────────────────────────
def _gen_int_id() -> str:
return f"GW-{uuid4().hex[:8].upper()}"
def _gen_api_key() -> str:
"""API 키 생성 (평문 반환 — 이 한 번만 노출)."""
return f"gk_{uuid4().hex}{uuid4().hex[:8]}"
def _hash_secret(raw: str) -> str:
return hashlib.sha256(raw.encode()).hexdigest()
def _mask_dict(d: Dict) -> Dict:
"""딕셔너리에서 민감 키 값을 마스킹."""
result = {}
for k, v in d.items():
if _MASK_KEYS.search(str(k)):
result[k] = "***"
elif isinstance(v, dict):
result[k] = _mask_dict(v)
else:
result[k] = v
return result
def _append_log(int_id: str, direction: str, event: str,
status: str, payload: Any = None, error: str = "") -> None:
"""연동 로그 기록 (민감 데이터 마스킹)."""
safe_payload = None
if payload:
try:
safe_payload = _mask_dict(payload) if isinstance(payload, dict) else str(payload)[:200]
except Exception:
safe_payload = "[마스킹 오류]"
entry = {
"log_id": f"LOG-{uuid4().hex[:8].upper()}",
"int_id": int_id,
"direction": direction, # IN / OUT
"event": event,
"status": status, # OK / ERROR / RATE_LIMITED
"payload": safe_payload,
"error": error,
"logged_at": datetime.utcnow().isoformat(),
}
_gw_logs.append(entry)
if len(_gw_logs) > MAX_LOGS:
_gw_logs.pop(0)
def _check_rate_limit(int_id: str, limit: int) -> bool:
"""True → 제한 초과, False → 허용."""
now = time.time()
window_start = now - 60.0
hits = _rate_counts.setdefault(int_id, [])
# 1분 윈도우 밖 항목 제거
_rate_counts[int_id] = [t for t in hits if t > window_start]
if len(_rate_counts[int_id]) >= limit:
return True
_rate_counts[int_id].append(now)
return False
def _verify_hmac(payload: bytes, signature: str, secret_hash: str) -> bool:
"""
웹훅 HMAC-SHA256 서명 검증.
외부에서 전송된 signature를 secret 해시와 대조.
"""
# 실제 운영에서는 원본 secret으로 검증해야 하지만,
# 보안상 secret 평문은 저장하지 않으므로 해시 기반 검증 방식을 안내
# (운영 환경에서는 HSM 또는 별도 키 스토어 사용 권장)
try:
expected = hmac.new(
secret_hash.encode(), payload, hashlib.sha256
).hexdigest()
sig = signature.removeprefix("sha256=")
return hmac.compare_digest(expected, sig)
except Exception:
return False
# ── 엔드포인트 ───────────────────────────────────────────────────────────────
@router.post("/integrations", status_code=201)
async def create_integration(
body: IntegrationIn,
current_user: User = Depends(get_current_user),
_db: AsyncSession = Depends(get_db),
):
"""연동 등록 (ADMIN)."""
if current_user.role != UserRole.ADMIN:
raise HTTPException(403, "ADMIN 권한이 필요합니다.")
if body.type not in INTEGRATION_TYPES:
raise HTTPException(400, f"유효하지 않은 연동 유형: {body.type}. "
f"허용: {list(INTEGRATION_TYPES)}")
int_id = _gen_int_id()
api_key_raw = _gen_api_key()
api_key_hash = _hash_secret(api_key_raw)
secret_hash = _hash_secret(body.secret) if body.secret else None
record = {
"int_id": int_id,
"name": body.name,
"type": body.type,
"type_label": INTEGRATION_TYPES[body.type],
"description": body.description,
"endpoint_url": body.endpoint_url,
"api_key_hash": api_key_hash, # 평문 저장 금지
"secret_hash": secret_hash, # 평문 저장 금지
"headers": _mask_dict(body.headers),
"enabled": body.enabled,
"rate_limit": body.rate_limit,
"created_by": current_user.username,
"created_at": datetime.utcnow().isoformat(),
"stats": {
"total_requests": 0,
"success_requests": 0,
"error_requests": 0,
"last_request_at": None,
},
}
_integrations[int_id] = record
_append_log(int_id, "SYSTEM", "INTEGRATION_CREATED", "OK",
{"name": body.name, "type": body.type})
logger.info("연동 등록: %s (%s) by %s", int_id, body.name, current_user.username)
# API 키는 생성 시 단 1회만 반환
return {**record, "api_key": api_key_raw,
"warning": "API 키는 이번 한 번만 표시됩니다. 안전하게 보관하세요."}
@router.get("/integrations")
async def list_integrations(
type_filter: Optional[str] = Query(None),
enabled_only: bool = Query(False),
current_user: User = Depends(get_current_user),
_db: AsyncSession = Depends(get_db),
):
"""연동 목록 (ADMIN)."""
if current_user.role != UserRole.ADMIN:
raise HTTPException(403, "ADMIN 권한이 필요합니다.")
items = list(_integrations.values())
if type_filter:
items = [i for i in items if i["type"] == type_filter]
if enabled_only:
items = [i for i in items if i["enabled"]]
return {"total": len(items), "integrations": items}
@router.get("/integrations/{int_id}")
async def get_integration(
int_id: str,
current_user: User = Depends(get_current_user),
_db: AsyncSession = Depends(get_db),
):
"""연동 상세 조회 (ADMIN)."""
if current_user.role != UserRole.ADMIN:
raise HTTPException(403, "ADMIN 권한이 필요합니다.")
rec = _integrations.get(int_id)
if not rec:
raise HTTPException(404, f"연동 '{int_id}'를 찾을 수 없습니다.")
# api_key_hash / secret_hash 는 응답에서 제외
safe = {k: v for k, v in rec.items() if k not in ("api_key_hash", "secret_hash")}
return safe
@router.put("/integrations/{int_id}")
async def update_integration(
int_id: str,
body: IntegrationUpdateIn,
current_user: User = Depends(get_current_user),
_db: AsyncSession = Depends(get_db),
):
"""연동 수정 (ADMIN)."""
if current_user.role != UserRole.ADMIN:
raise HTTPException(403, "ADMIN 권한이 필요합니다.")
rec = _integrations.get(int_id)
if not rec:
raise HTTPException(404, f"연동 '{int_id}'를 찾을 수 없습니다.")
if body.name is not None: rec["name"] = body.name
if body.description is not None: rec["description"] = body.description
if body.endpoint_url is not None: rec["endpoint_url"] = body.endpoint_url
if body.enabled is not None: rec["enabled"] = body.enabled
if body.rate_limit is not None: rec["rate_limit"] = body.rate_limit
rec["updated_at"] = datetime.utcnow().isoformat()
rec["updated_by"] = current_user.username
_append_log(int_id, "SYSTEM", "INTEGRATION_UPDATED", "OK")
return {k: v for k, v in rec.items() if k not in ("api_key_hash", "secret_hash")}
@router.delete("/integrations/{int_id}")
async def delete_integration(
int_id: str,
current_user: User = Depends(get_current_user),
_db: AsyncSession = Depends(get_db),
):
"""연동 삭제 (ADMIN)."""
if current_user.role != UserRole.ADMIN:
raise HTTPException(403, "ADMIN 권한이 필요합니다.")
rec = _integrations.pop(int_id, None)
if not rec:
raise HTTPException(404, f"연동 '{int_id}'를 찾을 수 없습니다.")
_append_log(int_id, "SYSTEM", "INTEGRATION_DELETED", "OK")
logger.info("연동 삭제: %s by %s", int_id, current_user.username)
return {"message": f"연동 '{int_id}'가 삭제되었습니다."}
@router.post("/integrations/{int_id}/test")
async def test_integration(
int_id: str,
current_user: User = Depends(get_current_user),
_db: AsyncSession = Depends(get_db),
):
"""연동 연결 테스트 (아웃바운드 엔드포인트 핑)."""
if current_user.role != UserRole.ADMIN:
raise HTTPException(403, "ADMIN 권한이 필요합니다.")
rec = _integrations.get(int_id)
if not rec:
raise HTTPException(404, f"연동 '{int_id}'를 찾을 수 없습니다.")
url = rec.get("endpoint_url")
if not url:
return {"status": "SKIPPED", "message": "endpoint_url이 설정되지 않았습니다."}
try:
import httpx
start = time.time()
async with httpx.AsyncClient(timeout=5.0, verify=False) as client:
resp = await client.get(url)
elapsed_ms = round((time.time() - start) * 1000)
status = "OK" if resp.status_code < 400 else "ERROR"
_append_log(int_id, "OUT", "TEST_PING", status,
{"url": url, "status_code": resp.status_code})
return {
"status": status,
"status_code": resp.status_code,
"elapsed_ms": elapsed_ms,
"url": url,
}
except Exception as e:
_append_log(int_id, "OUT", "TEST_PING", "ERROR", error=str(e)[:200])
return {"status": "ERROR", "message": str(e)[:200], "url": url}
@router.post("/webhook/{webhook_key}", status_code=202)
async def receive_webhook(
webhook_key: str,
request: Request,
_db: AsyncSession = Depends(get_db),
):
"""
웹훅 수신 엔드포인트 (외부 시스템 → GUARDiA).
webhook_key = api_key 해시 식별자.
HMAC-SHA256 서명이 있으면 검증, 없으면 키만 확인.
"""
# API 키 해시로 연동 조회
key_hash = _hash_secret(webhook_key)
rec = next(
(r for r in _integrations.values()
if r.get("api_key_hash") == key_hash and r.get("enabled", True)),
None,
)
if not rec:
logger.warning("알 수 없는 웹훅 키: %s...", webhook_key[:8])
raise HTTPException(401, "유효하지 않은 웹훅 키입니다.")
int_id = rec["int_id"]
# Rate Limit 검사
if _check_rate_limit(int_id, rec.get("rate_limit", RATE_LIMIT_RPM)):
_append_log(int_id, "IN", "WEBHOOK_RATE_LIMITED", "RATE_LIMITED")
raise HTTPException(429, "요청 한도를 초과했습니다.")
# HMAC 서명 검증 (선택)
body_bytes = await request.body()
sig_header = request.headers.get("X-Hub-Signature-256", "")
if sig_header and rec.get("secret_hash"):
if not _verify_hmac(body_bytes, sig_header, rec["secret_hash"]):
_append_log(int_id, "IN", "WEBHOOK_SIGNATURE_FAILED", "ERROR")
raise HTTPException(403, "웹훅 서명 검증 실패")
# 페이로드 파싱
try:
payload = json.loads(body_bytes) if body_bytes else {}
except json.JSONDecodeError:
payload = {"raw": body_bytes.decode(errors="replace")[:500]}
event = payload.get("event", "UNKNOWN")
_append_log(int_id, "IN", f"WEBHOOK_{event}", "OK", payload)
# 통계 업데이트
rec["stats"]["total_requests"] += 1
rec["stats"]["success_requests"] += 1
rec["stats"]["last_request_at"] = datetime.utcnow().isoformat()
logger.info("웹훅 수신: %s event=%s", int_id, event)
return {"accepted": True, "event": event, "int_id": int_id}
@router.post("/send/{int_id}", status_code=202)
async def send_outbound(
int_id: str,
body: OutboundPayload,
current_user: User = Depends(get_current_user),
_db: AsyncSession = Depends(get_db),
):
"""아웃바운드 웹훅 발송 (GUARDiA → 외부)."""
if current_user.role not in (UserRole.ADMIN, UserRole.PM):
raise HTTPException(403, "PM/ADMIN 권한이 필요합니다.")
rec = _integrations.get(int_id)
if not rec:
raise HTTPException(404, f"연동 '{int_id}'를 찾을 수 없습니다.")
if not rec.get("enabled"):
raise HTTPException(400, "비활성화된 연동입니다.")
url = rec.get("endpoint_url")
if not url:
raise HTTPException(400, "endpoint_url이 설정되지 않았습니다.")
# Rate Limit
if _check_rate_limit(int_id, rec.get("rate_limit", RATE_LIMIT_RPM)):
_append_log(int_id, "OUT", "SEND_RATE_LIMITED", "RATE_LIMITED")
raise HTTPException(429, "발송 한도를 초과했습니다.")
payload_dict = {
"event": body.event,
"message": body.message,
"severity": body.severity,
"data": body.data,
"source": "guardia_itsm",
"sent_at": datetime.utcnow().isoformat(),
}
try:
import httpx
headers = {"Content-Type": "application/json"}
headers.update(rec.get("headers", {}))
# Authorization 헤더는 로그에서 마스킹됨
async with httpx.AsyncClient(timeout=8.0, verify=False) as client:
resp = await client.post(url, json=payload_dict, headers=headers)
status = "OK" if resp.status_code < 400 else "ERROR"
_append_log(int_id, "OUT", f"SEND_{body.event}", status, payload_dict)
rec["stats"]["total_requests"] += 1
if status == "OK":
rec["stats"]["success_requests"] += 1
else:
rec["stats"]["error_requests"] += 1
rec["stats"]["last_request_at"] = datetime.utcnow().isoformat()
return {"sent": True, "status_code": resp.status_code, "int_id": int_id}
except Exception as e:
_append_log(int_id, "OUT", f"SEND_{body.event}", "ERROR", error=str(e)[:200])
rec["stats"]["total_requests"] += 1
rec["stats"]["error_requests"] += 1
return {"sent": False, "error": str(e)[:200], "int_id": int_id}
@router.get("/logs")
async def get_logs(
int_id: Optional[str] = Query(None),
limit: int = Query(50, ge=1, le=500),
current_user: User = Depends(get_current_user),
_db: AsyncSession = Depends(get_db),
):
"""연동 로그 조회 (ADMIN)."""
if current_user.role != UserRole.ADMIN:
raise HTTPException(403, "ADMIN 권한이 필요합니다.")
logs = _gw_logs if not int_id else [l for l in _gw_logs if l["int_id"] == int_id]
logs_sorted = sorted(logs, key=lambda x: x["logged_at"], reverse=True)[:limit]
return {"total": len(logs_sorted), "logs": logs_sorted}
@router.get("/stats")
async def get_stats(
current_user: User = Depends(get_current_user),
_db: AsyncSession = Depends(get_db),
):
"""연동 전체 통계 (ADMIN)."""
if current_user.role != UserRole.ADMIN:
raise HTTPException(403, "ADMIN 권한이 필요합니다.")
total_integrations = len(_integrations)
enabled_count = sum(1 for r in _integrations.values() if r.get("enabled"))
total_requests = sum(r["stats"]["total_requests"] for r in _integrations.values())
total_errors = sum(r["stats"]["error_requests"] for r in _integrations.values())
success_rate = (
round((total_requests - total_errors) / total_requests * 100, 1)
if total_requests > 0 else 100.0
)
by_type = {}
for r in _integrations.values():
t = r["type"]
by_type[t] = by_type.get(t, 0) + 1
return {
"total_integrations": total_integrations,
"enabled": enabled_count,
"disabled": total_integrations - enabled_count,
"total_requests": total_requests,
"total_errors": total_errors,
"success_rate_pct": success_rate,
"by_type": by_type,
"total_log_entries": len(_gw_logs),
"integration_types": INTEGRATION_TYPES,
}
# ── G-9: Jira/Confluence 연동 엔드포인트 ─────────────────────────────────────
class JiraSyncRequest(BaseModel):
description: Optional[str] = None
@router.post("/jira/sync/{sr_id}", status_code=201)
async def jira_sync_sr(
sr_id: str,
body: JiraSyncRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""SR → Jira 이슈 생성 및 SR에 jira_key 저장."""
from models import SRRequest
from sqlalchemy import select
r = await db.execute(select(SRRequest).where(SRRequest.sr_id == sr_id))
sr = r.scalars().first()
if not sr:
raise HTTPException(404, f"SR {sr_id}를 찾을 수 없습니다.")
from core.jira_sync import create_jira_issue
key = await create_jira_issue(
sr_id = sr.sr_id,
title = sr.title,
description = body.description or sr.description or "",
priority = sr.priority or "MEDIUM",
)
if key:
sr.jira_key = key
await db.commit()
return {"sr_id": sr_id, "jira_key": key, "synced": True}
else:
raise HTTPException(503, "Jira 연동에 실패했습니다. JIRA_BASE_URL/JIRA_TOKEN 설정을 확인하세요.")
@router.get("/jira/status/{jira_key}")
async def jira_issue_status(
jira_key: str,
current_user: User = Depends(get_current_user),
):
"""Jira 이슈 상태 조회."""
from core.jira_sync import get_jira_issue_status
result = await get_jira_issue_status(jira_key)
if result is None:
raise HTTPException(503, "Jira 조회 실패 또는 JIRA_BASE_URL 미설정")
return result
@router.get("/jira/projects")
async def jira_projects(
current_user: User = Depends(get_current_user),
):
"""연결된 Jira 프로젝트 목록."""
from core.jira_sync import list_jira_projects
return {"projects": await list_jira_projects()}
class ConfluencePublishRequest(BaseModel):
kb_id: int
@router.post("/confluence/publish")
async def publish_to_confluence(
body: ConfluencePublishRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""KB 문서를 Confluence 페이지로 발행."""
from models import KnowledgeBase
from sqlalchemy import select
kb = await db.get(KnowledgeBase, body.kb_id)
if not kb:
raise HTTPException(404, f"KB ID {body.kb_id}를 찾을 수 없습니다.")
from core.jira_sync import create_confluence_page
url = await create_confluence_page(
title = kb.title,
content = kb.content or "",
)
if url:
return {"kb_id": body.kb_id, "confluence_url": url, "published": True}
else:
raise HTTPException(503, "Confluence 발행 실패 또는 CONFLUENCE_BASE_URL 미설정")