""" F-5: OpenAPI 외부 연동 게이트웨이 기능: 1. 외부 시스템 연동 등록 (API Key 기반 인증) 2. 연동 상태 관리 (활성/비활성/테스트) 3. 웹훅 수신 (외부 시스템 → GUARDiA 이벤트 주입) 4. 아웃바운드 웹훅 발송 (GUARDiA → 외부 시스템 알림) 5. API 사용량 추적 및 Rate Limit 6. 연동 로그 (요청/응답 기록, 민감 데이터 마스킹) 7. OpenAPI 스키마 자동 노출 (외부 연동 가이드) 보안 원칙: - API Key는 SHA-256 해시로 저장 (평문 불가) - 아웃바운드 요청은 내부 네트워크(온프레미스)만 허용 - 요청/응답 로그에서 Authorization, password, secret 자동 마스킹 - HMAC-SHA256 서명 검증 (웹훅 무결성) 엔드포인트: POST /api/gateway/integrations — 연동 등록 GET /api/gateway/integrations — 연동 목록 GET /api/gateway/integrations/{id} — 연동 상세 PUT /api/gateway/integrations/{id} — 연동 수정 DELETE /api/gateway/integrations/{id}— 연동 삭제 POST /api/gateway/integrations/{id}/test — 연동 테스트 POST /api/gateway/webhook/{key} — 웹훅 수신 (외부 → GUARDiA) POST /api/gateway/send/{id} — 아웃바운드 알림 발송 GET /api/gateway/logs — 연동 로그 조회 GET /api/gateway/stats — 연동 통계 """ from __future__ import annotations import hashlib import hmac import json import logging import re import time from datetime import datetime, timedelta from typing import Any, Dict, List, Optional from uuid import uuid4 from fastapi import APIRouter, Depends, HTTPException, Request, Query from fastapi.responses import JSONResponse from pydantic import BaseModel, Field from sqlalchemy.ext.asyncio import AsyncSession from core.auth import get_current_user from database import get_db from models import User, UserRole logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/gateway", tags=["gateway"]) # ── 인메모리 저장소 ────────────────────────────────────────────────────────── _integrations: Dict[str, Dict] = {} # int_id -> record _gw_logs: List[Dict] = [] # 최근 1000건 _rate_counts: Dict[str, List] = {} # int_id -> [timestamp, ...] MAX_LOGS = 1000 RATE_LIMIT_RPM = 60 # 기본 분당 60회 # ── 지원 연동 유형 ──────────────────────────────────────────────────────────── INTEGRATION_TYPES = { "WEBHOOK_IN": "웹훅 수신 (외부 → GUARDiA)", "WEBHOOK_OUT": "웹훅 발송 (GUARDiA → 외부)", "REST_API": "REST API 연동", "MONITORING": "모니터링 시스템 연동", "TICKETING": "외부 티켓팅 시스템", "NOTIFICATION": "알림 채널 연동 (메신저/이메일)", } # 마스킹 대상 키워드 _MASK_KEYS = re.compile( r"(authorization|password|secret|token|api_key|apikey|key|credential)", re.IGNORECASE ) # ── Pydantic 스키마 ────────────────────────────────────────────────────────── class IntegrationIn(BaseModel): name: str type: str # INTEGRATION_TYPES key description: str = "" endpoint_url: Optional[str] = None # 아웃바운드 대상 URL secret: Optional[str] = None # 웹훅 서명 시크릿 (평문 → 해시 저장) headers: Dict[str, str] = {} # 커스텀 헤더 (마스킹 저장) enabled: bool = True rate_limit: int = RATE_LIMIT_RPM # 분당 요청 제한 class IntegrationUpdateIn(BaseModel): name: Optional[str] = None description: Optional[str] = None endpoint_url: Optional[str] = None enabled: Optional[bool] = None rate_limit: Optional[int] = None class WebhookPayload(BaseModel): event: str data: Dict[str, Any] = {} source: str = "external" class OutboundPayload(BaseModel): event: str message: str data: Dict[str, Any] = {} severity: str = "INFO" # ── 헬퍼 ──────────────────────────────────────────────────────────────────── def _gen_int_id() -> str: return f"GW-{uuid4().hex[:8].upper()}" def _gen_api_key() -> str: """API 키 생성 (평문 반환 — 이 한 번만 노출).""" return f"gk_{uuid4().hex}{uuid4().hex[:8]}" def _hash_secret(raw: str) -> str: return hashlib.sha256(raw.encode()).hexdigest() def _mask_dict(d: Dict) -> Dict: """딕셔너리에서 민감 키 값을 마스킹.""" result = {} for k, v in d.items(): if _MASK_KEYS.search(str(k)): result[k] = "***" elif isinstance(v, dict): result[k] = _mask_dict(v) else: result[k] = v return result def _append_log(int_id: str, direction: str, event: str, status: str, payload: Any = None, error: str = "") -> None: """연동 로그 기록 (민감 데이터 마스킹).""" safe_payload = None if payload: try: safe_payload = _mask_dict(payload) if isinstance(payload, dict) else str(payload)[:200] except Exception: safe_payload = "[마스킹 오류]" entry = { "log_id": f"LOG-{uuid4().hex[:8].upper()}", "int_id": int_id, "direction": direction, # IN / OUT "event": event, "status": status, # OK / ERROR / RATE_LIMITED "payload": safe_payload, "error": error, "logged_at": datetime.utcnow().isoformat(), } _gw_logs.append(entry) if len(_gw_logs) > MAX_LOGS: _gw_logs.pop(0) def _check_rate_limit(int_id: str, limit: int) -> bool: """True → 제한 초과, False → 허용.""" now = time.time() window_start = now - 60.0 hits = _rate_counts.setdefault(int_id, []) # 1분 윈도우 밖 항목 제거 _rate_counts[int_id] = [t for t in hits if t > window_start] if len(_rate_counts[int_id]) >= limit: return True _rate_counts[int_id].append(now) return False def _verify_hmac(payload: bytes, signature: str, secret_hash: str) -> bool: """ 웹훅 HMAC-SHA256 서명 검증. 외부에서 전송된 signature를 secret 해시와 대조. """ # 실제 운영에서는 원본 secret으로 검증해야 하지만, # 보안상 secret 평문은 저장하지 않으므로 해시 기반 검증 방식을 안내 # (운영 환경에서는 HSM 또는 별도 키 스토어 사용 권장) try: expected = hmac.new( secret_hash.encode(), payload, hashlib.sha256 ).hexdigest() sig = signature.removeprefix("sha256=") return hmac.compare_digest(expected, sig) except Exception: return False # ── 엔드포인트 ─────────────────────────────────────────────────────────────── @router.post("/integrations", status_code=201) async def create_integration( body: IntegrationIn, current_user: User = Depends(get_current_user), _db: AsyncSession = Depends(get_db), ): """연동 등록 (ADMIN).""" if current_user.role != UserRole.ADMIN: raise HTTPException(403, "ADMIN 권한이 필요합니다.") if body.type not in INTEGRATION_TYPES: raise HTTPException(400, f"유효하지 않은 연동 유형: {body.type}. " f"허용: {list(INTEGRATION_TYPES)}") int_id = _gen_int_id() api_key_raw = _gen_api_key() api_key_hash = _hash_secret(api_key_raw) secret_hash = _hash_secret(body.secret) if body.secret else None record = { "int_id": int_id, "name": body.name, "type": body.type, "type_label": INTEGRATION_TYPES[body.type], "description": body.description, "endpoint_url": body.endpoint_url, "api_key_hash": api_key_hash, # 평문 저장 금지 "secret_hash": secret_hash, # 평문 저장 금지 "headers": _mask_dict(body.headers), "enabled": body.enabled, "rate_limit": body.rate_limit, "created_by": current_user.username, "created_at": datetime.utcnow().isoformat(), "stats": { "total_requests": 0, "success_requests": 0, "error_requests": 0, "last_request_at": None, }, } _integrations[int_id] = record _append_log(int_id, "SYSTEM", "INTEGRATION_CREATED", "OK", {"name": body.name, "type": body.type}) logger.info("연동 등록: %s (%s) by %s", int_id, body.name, current_user.username) # API 키는 생성 시 단 1회만 반환 return {**record, "api_key": api_key_raw, "warning": "API 키는 이번 한 번만 표시됩니다. 안전하게 보관하세요."} @router.get("/integrations") async def list_integrations( type_filter: Optional[str] = Query(None), enabled_only: bool = Query(False), current_user: User = Depends(get_current_user), _db: AsyncSession = Depends(get_db), ): """연동 목록 (ADMIN).""" if current_user.role != UserRole.ADMIN: raise HTTPException(403, "ADMIN 권한이 필요합니다.") items = list(_integrations.values()) if type_filter: items = [i for i in items if i["type"] == type_filter] if enabled_only: items = [i for i in items if i["enabled"]] return {"total": len(items), "integrations": items} @router.get("/integrations/{int_id}") async def get_integration( int_id: str, current_user: User = Depends(get_current_user), _db: AsyncSession = Depends(get_db), ): """연동 상세 조회 (ADMIN).""" if current_user.role != UserRole.ADMIN: raise HTTPException(403, "ADMIN 권한이 필요합니다.") rec = _integrations.get(int_id) if not rec: raise HTTPException(404, f"연동 '{int_id}'를 찾을 수 없습니다.") # api_key_hash / secret_hash 는 응답에서 제외 safe = {k: v for k, v in rec.items() if k not in ("api_key_hash", "secret_hash")} return safe @router.put("/integrations/{int_id}") async def update_integration( int_id: str, body: IntegrationUpdateIn, current_user: User = Depends(get_current_user), _db: AsyncSession = Depends(get_db), ): """연동 수정 (ADMIN).""" if current_user.role != UserRole.ADMIN: raise HTTPException(403, "ADMIN 권한이 필요합니다.") rec = _integrations.get(int_id) if not rec: raise HTTPException(404, f"연동 '{int_id}'를 찾을 수 없습니다.") if body.name is not None: rec["name"] = body.name if body.description is not None: rec["description"] = body.description if body.endpoint_url is not None: rec["endpoint_url"] = body.endpoint_url if body.enabled is not None: rec["enabled"] = body.enabled if body.rate_limit is not None: rec["rate_limit"] = body.rate_limit rec["updated_at"] = datetime.utcnow().isoformat() rec["updated_by"] = current_user.username _append_log(int_id, "SYSTEM", "INTEGRATION_UPDATED", "OK") return {k: v for k, v in rec.items() if k not in ("api_key_hash", "secret_hash")} @router.delete("/integrations/{int_id}") async def delete_integration( int_id: str, current_user: User = Depends(get_current_user), _db: AsyncSession = Depends(get_db), ): """연동 삭제 (ADMIN).""" if current_user.role != UserRole.ADMIN: raise HTTPException(403, "ADMIN 권한이 필요합니다.") rec = _integrations.pop(int_id, None) if not rec: raise HTTPException(404, f"연동 '{int_id}'를 찾을 수 없습니다.") _append_log(int_id, "SYSTEM", "INTEGRATION_DELETED", "OK") logger.info("연동 삭제: %s by %s", int_id, current_user.username) return {"message": f"연동 '{int_id}'가 삭제되었습니다."} @router.post("/integrations/{int_id}/test") async def test_integration( int_id: str, current_user: User = Depends(get_current_user), _db: AsyncSession = Depends(get_db), ): """연동 연결 테스트 (아웃바운드 엔드포인트 핑).""" if current_user.role != UserRole.ADMIN: raise HTTPException(403, "ADMIN 권한이 필요합니다.") rec = _integrations.get(int_id) if not rec: raise HTTPException(404, f"연동 '{int_id}'를 찾을 수 없습니다.") url = rec.get("endpoint_url") if not url: return {"status": "SKIPPED", "message": "endpoint_url이 설정되지 않았습니다."} try: import httpx start = time.time() async with httpx.AsyncClient(timeout=5.0, verify=False) as client: resp = await client.get(url) elapsed_ms = round((time.time() - start) * 1000) status = "OK" if resp.status_code < 400 else "ERROR" _append_log(int_id, "OUT", "TEST_PING", status, {"url": url, "status_code": resp.status_code}) return { "status": status, "status_code": resp.status_code, "elapsed_ms": elapsed_ms, "url": url, } except Exception as e: _append_log(int_id, "OUT", "TEST_PING", "ERROR", error=str(e)[:200]) return {"status": "ERROR", "message": str(e)[:200], "url": url} @router.post("/webhook/{webhook_key}", status_code=202) async def receive_webhook( webhook_key: str, request: Request, _db: AsyncSession = Depends(get_db), ): """ 웹훅 수신 엔드포인트 (외부 시스템 → GUARDiA). webhook_key = api_key 해시 식별자. HMAC-SHA256 서명이 있으면 검증, 없으면 키만 확인. """ # API 키 해시로 연동 조회 key_hash = _hash_secret(webhook_key) rec = next( (r for r in _integrations.values() if r.get("api_key_hash") == key_hash and r.get("enabled", True)), None, ) if not rec: logger.warning("알 수 없는 웹훅 키: %s...", webhook_key[:8]) raise HTTPException(401, "유효하지 않은 웹훅 키입니다.") int_id = rec["int_id"] # Rate Limit 검사 if _check_rate_limit(int_id, rec.get("rate_limit", RATE_LIMIT_RPM)): _append_log(int_id, "IN", "WEBHOOK_RATE_LIMITED", "RATE_LIMITED") raise HTTPException(429, "요청 한도를 초과했습니다.") # HMAC 서명 검증 (선택) body_bytes = await request.body() sig_header = request.headers.get("X-Hub-Signature-256", "") if sig_header and rec.get("secret_hash"): if not _verify_hmac(body_bytes, sig_header, rec["secret_hash"]): _append_log(int_id, "IN", "WEBHOOK_SIGNATURE_FAILED", "ERROR") raise HTTPException(403, "웹훅 서명 검증 실패") # 페이로드 파싱 try: payload = json.loads(body_bytes) if body_bytes else {} except json.JSONDecodeError: payload = {"raw": body_bytes.decode(errors="replace")[:500]} event = payload.get("event", "UNKNOWN") _append_log(int_id, "IN", f"WEBHOOK_{event}", "OK", payload) # 통계 업데이트 rec["stats"]["total_requests"] += 1 rec["stats"]["success_requests"] += 1 rec["stats"]["last_request_at"] = datetime.utcnow().isoformat() logger.info("웹훅 수신: %s event=%s", int_id, event) return {"accepted": True, "event": event, "int_id": int_id} @router.post("/send/{int_id}", status_code=202) async def send_outbound( int_id: str, body: OutboundPayload, current_user: User = Depends(get_current_user), _db: AsyncSession = Depends(get_db), ): """아웃바운드 웹훅 발송 (GUARDiA → 외부).""" if current_user.role not in (UserRole.ADMIN, UserRole.PM): raise HTTPException(403, "PM/ADMIN 권한이 필요합니다.") rec = _integrations.get(int_id) if not rec: raise HTTPException(404, f"연동 '{int_id}'를 찾을 수 없습니다.") if not rec.get("enabled"): raise HTTPException(400, "비활성화된 연동입니다.") url = rec.get("endpoint_url") if not url: raise HTTPException(400, "endpoint_url이 설정되지 않았습니다.") # Rate Limit if _check_rate_limit(int_id, rec.get("rate_limit", RATE_LIMIT_RPM)): _append_log(int_id, "OUT", "SEND_RATE_LIMITED", "RATE_LIMITED") raise HTTPException(429, "발송 한도를 초과했습니다.") payload_dict = { "event": body.event, "message": body.message, "severity": body.severity, "data": body.data, "source": "guardia_itsm", "sent_at": datetime.utcnow().isoformat(), } try: import httpx headers = {"Content-Type": "application/json"} headers.update(rec.get("headers", {})) # Authorization 헤더는 로그에서 마스킹됨 async with httpx.AsyncClient(timeout=8.0, verify=False) as client: resp = await client.post(url, json=payload_dict, headers=headers) status = "OK" if resp.status_code < 400 else "ERROR" _append_log(int_id, "OUT", f"SEND_{body.event}", status, payload_dict) rec["stats"]["total_requests"] += 1 if status == "OK": rec["stats"]["success_requests"] += 1 else: rec["stats"]["error_requests"] += 1 rec["stats"]["last_request_at"] = datetime.utcnow().isoformat() return {"sent": True, "status_code": resp.status_code, "int_id": int_id} except Exception as e: _append_log(int_id, "OUT", f"SEND_{body.event}", "ERROR", error=str(e)[:200]) rec["stats"]["total_requests"] += 1 rec["stats"]["error_requests"] += 1 return {"sent": False, "error": str(e)[:200], "int_id": int_id} @router.get("/logs") async def get_logs( int_id: Optional[str] = Query(None), limit: int = Query(50, ge=1, le=500), current_user: User = Depends(get_current_user), _db: AsyncSession = Depends(get_db), ): """연동 로그 조회 (ADMIN).""" if current_user.role != UserRole.ADMIN: raise HTTPException(403, "ADMIN 권한이 필요합니다.") logs = _gw_logs if not int_id else [l for l in _gw_logs if l["int_id"] == int_id] logs_sorted = sorted(logs, key=lambda x: x["logged_at"], reverse=True)[:limit] return {"total": len(logs_sorted), "logs": logs_sorted} @router.get("/stats") async def get_stats( current_user: User = Depends(get_current_user), _db: AsyncSession = Depends(get_db), ): """연동 전체 통계 (ADMIN).""" if current_user.role != UserRole.ADMIN: raise HTTPException(403, "ADMIN 권한이 필요합니다.") total_integrations = len(_integrations) enabled_count = sum(1 for r in _integrations.values() if r.get("enabled")) total_requests = sum(r["stats"]["total_requests"] for r in _integrations.values()) total_errors = sum(r["stats"]["error_requests"] for r in _integrations.values()) success_rate = ( round((total_requests - total_errors) / total_requests * 100, 1) if total_requests > 0 else 100.0 ) by_type = {} for r in _integrations.values(): t = r["type"] by_type[t] = by_type.get(t, 0) + 1 return { "total_integrations": total_integrations, "enabled": enabled_count, "disabled": total_integrations - enabled_count, "total_requests": total_requests, "total_errors": total_errors, "success_rate_pct": success_rate, "by_type": by_type, "total_log_entries": len(_gw_logs), "integration_types": INTEGRATION_TYPES, } # ── G-9: Jira/Confluence 연동 엔드포인트 ───────────────────────────────────── class JiraSyncRequest(BaseModel): description: Optional[str] = None @router.post("/jira/sync/{sr_id}", status_code=201) async def jira_sync_sr( sr_id: str, body: JiraSyncRequest, db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """SR → Jira 이슈 생성 및 SR에 jira_key 저장.""" from models import SRRequest from sqlalchemy import select r = await db.execute(select(SRRequest).where(SRRequest.sr_id == sr_id)) sr = r.scalars().first() if not sr: raise HTTPException(404, f"SR {sr_id}를 찾을 수 없습니다.") from core.jira_sync import create_jira_issue key = await create_jira_issue( sr_id = sr.sr_id, title = sr.title, description = body.description or sr.description or "", priority = sr.priority or "MEDIUM", ) if key: sr.jira_key = key await db.commit() return {"sr_id": sr_id, "jira_key": key, "synced": True} else: raise HTTPException(503, "Jira 연동에 실패했습니다. JIRA_BASE_URL/JIRA_TOKEN 설정을 확인하세요.") @router.get("/jira/status/{jira_key}") async def jira_issue_status( jira_key: str, current_user: User = Depends(get_current_user), ): """Jira 이슈 상태 조회.""" from core.jira_sync import get_jira_issue_status result = await get_jira_issue_status(jira_key) if result is None: raise HTTPException(503, "Jira 조회 실패 또는 JIRA_BASE_URL 미설정") return result @router.get("/jira/projects") async def jira_projects( current_user: User = Depends(get_current_user), ): """연결된 Jira 프로젝트 목록.""" from core.jira_sync import list_jira_projects return {"projects": await list_jira_projects()} class ConfluencePublishRequest(BaseModel): kb_id: int @router.post("/confluence/publish") async def publish_to_confluence( body: ConfluencePublishRequest, db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """KB 문서를 Confluence 페이지로 발행.""" from models import KnowledgeBase from sqlalchemy import select kb = await db.get(KnowledgeBase, body.kb_id) if not kb: raise HTTPException(404, f"KB ID {body.kb_id}를 찾을 수 없습니다.") from core.jira_sync import create_confluence_page url = await create_confluence_page( title = kb.title, content = kb.content or "", ) if url: return {"kb_id": body.kb_id, "confluence_url": url, "published": True} else: raise HTTPException(503, "Confluence 발행 실패 또는 CONFLUENCE_BASE_URL 미설정")