guardia-itsm/middleware/tenant.py
DESKTOP-TKLFCPRython 64c27c3509 feat(itsm): G-1~G-12 확장 기능 + 하네스/봇/설치스크립트 구현
G-1: 메신저 Webhook Relay + _send_to_room 실제 httpx 호출 구현
G-2: POST /api/tasks/bulk SR 대량작업 엔드포인트 (최대 100건)
G-3: 라이선스 만료 알림 스케줄러 (매일 09:00 KST)
G-4: 체험판 upgrade_banner 필드 + license.py 배너 로직
G-5: core/auto_rca.py + incidents/problem auto-rca 엔드포인트
G-6: core/deploy_impact.py + vibe impact-analysis 엔드포인트
G-7: core/ticket_classifier.py + SR 생성 시 AI 분류 + ai-suggestion API
G-8: VulnPatchRecord 모델 + vuln_scan 패치추적 4개 엔드포인트
G-9: core/jira_sync.py + gateway Jira/Confluence 연동 엔드포인트
G-10: core/push_notify.py + routers/push.py + PushSubscription 모델
G-11: approvals 다중승인 (위임/서명/기한초과/마감연장)
G-12: alembic.ini + migrations/ + cicd/migrate_to_postgres.sh

하네스: guardia-orchestrator 확장기능 Phase 반영
봇명령어: /sr /status /license /bulk 슬래시 명령어 추가
설치스크립트: setup/ (Ubuntu, CentOS, RHEL, Windows) --test 옵션 포함

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-29 18:18:52 +09:00

256 lines
9.0 KiB
Python

"""
F-1: 멀티테넌트 데이터 격리
설계 원칙:
- 테넌트 식별: X-Tenant-ID 헤더 > JWT claim > 기본값(DEFAULT)
- ContextVar 기반 async-safe 테넌트 컨텍스트
- 모든 DB 쿼리에 tenant_id 필터 자동 주입 가능
- 테넌트별 설정(rate limit, storage quota) 지원
- ADMIN은 모든 테넌트 데이터 열람 가능 (X-Tenant-Override)
사용법:
# main.py에서 미들웨어 등록
from middleware.tenant import TenantMiddleware
app.add_middleware(TenantMiddleware)
# 라우터에서 현재 테넌트 조회
from middleware.tenant import get_current_tenant_id, require_tenant
tenant_id = get_current_tenant_id() # 현재 요청의 테넌트 ID
# SQLAlchemy 쿼리에 테넌트 필터 적용
from middleware.tenant import apply_tenant_filter
stmt = apply_tenant_filter(select(SRRequest), SRRequest)
엔드포인트 (routers/tenant_mgmt.py):
POST /api/tenants — 테넌트 생성 (SUPER_ADMIN)
GET /api/tenants — 테넌트 목록
GET /api/tenants/{tid} — 테넌트 상세
PUT /api/tenants/{tid} — 테넌트 수정
DELETE /api/tenants/{tid} — 테넌트 비활성화
GET /api/tenants/current — 현재 요청의 테넌트 정보
POST /api/tenants/{tid}/quota — 쿼터 설정
"""
from __future__ import annotations
import logging
from contextvars import ContextVar
from datetime import datetime
from typing import Any, Callable, Dict, Optional
from uuid import uuid4
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import JSONResponse, Response
from starlette.types import ASGIApp
logger = logging.getLogger(__name__)
# ── 테넌트 컨텍스트 (async-safe ContextVar) ──────────────────────────────────
_tenant_id_ctx: ContextVar[str] = ContextVar("tenant_id", default="DEFAULT")
_tenant_ctx: ContextVar[Optional[Dict]] = ContextVar("tenant_obj", default=None)
# ── 테넌트 레지스트리 (인메모리 — 프로덕션에서는 DB 사용) ────────────────────
_tenants: Dict[str, Dict] = {
"DEFAULT": {
"tenant_id": "DEFAULT",
"name": "기본 테넌트",
"code": "DEFAULT",
"is_active": True,
"is_system": True,
"plan": "ENTERPRISE",
"quota": {
"max_users": 1000,
"max_servers": 500,
"max_sr_per_month": 10000,
"storage_gb": 100,
},
"settings": {
"rate_limit_rpm": 600,
"allow_external": False,
},
"created_at": datetime.utcnow().isoformat(),
"created_by": "system",
}
}
# ── 공개 헬퍼 함수 ───────────────────────────────────────────────────────────
def get_current_tenant_id() -> str:
"""현재 요청의 테넌트 ID 반환."""
return _tenant_id_ctx.get()
def get_current_tenant() -> Optional[Dict]:
"""현재 요청의 테넌트 객체 반환."""
return _tenant_ctx.get()
def set_tenant(tenant_id: str) -> None:
"""테넌트 컨텍스트 수동 설정 (테스트용)."""
_tenant_id_ctx.set(tenant_id)
_tenant_ctx.set(_tenants.get(tenant_id))
def get_tenant(tenant_id: str) -> Optional[Dict]:
"""테넌트 ID로 테넌트 조회."""
return _tenants.get(tenant_id)
def list_tenants() -> list:
"""전체 테넌트 목록."""
return list(_tenants.values())
def apply_tenant_filter(stmt: Any, model: Any,
tenant_field: str = "tenant_id") -> Any:
"""
SQLAlchemy SELECT 구문에 테넌트 필터를 적용합니다.
사용 예:
stmt = apply_tenant_filter(select(SRRequest), SRRequest)
results = await db.execute(stmt)
"""
tenant_id = get_current_tenant_id()
if tenant_id == "DEFAULT":
return stmt # DEFAULT 테넌트는 모든 데이터 접근
col = getattr(model, tenant_field, None)
if col is None:
# 모델에 tenant_id 컬럼이 없으면 필터 미적용 (하위 호환)
return stmt
return stmt.where(col == tenant_id)
def require_tenant(tenant_id: str) -> None:
"""
특정 테넌트가 활성 상태인지 검증합니다.
비활성 테넌트면 ValueError를 발생시킵니다.
"""
t = _tenants.get(tenant_id)
if not t:
raise ValueError(f"테넌트 '{tenant_id}'가 존재하지 않습니다.")
if not t.get("is_active", False):
raise ValueError(f"테넌트 '{tenant_id}'가 비활성 상태입니다.")
def register_tenant(
tenant_id: str,
name: str,
code: str,
plan: str = "STANDARD",
created_by: str = "system",
) -> Dict:
"""테넌트를 레지스트리에 등록합니다."""
if tenant_id in _tenants:
raise ValueError(f"테넌트 ID '{tenant_id}'가 이미 존재합니다.")
record = {
"tenant_id": tenant_id,
"name": name,
"code": code,
"is_active": True,
"is_system": False,
"plan": plan,
"quota": {
"max_users": 100 if plan == "STANDARD" else 1000,
"max_servers": 50 if plan == "STANDARD" else 500,
"max_sr_per_month": 1000 if plan == "STANDARD" else 10000,
"storage_gb": 10 if plan == "STANDARD" else 100,
},
"settings": {
"rate_limit_rpm": 120 if plan == "STANDARD" else 600,
"allow_external": False,
},
"created_at": datetime.utcnow().isoformat(),
"created_by": created_by,
}
_tenants[tenant_id] = record
logger.info("테넌트 등록: %s (%s)", tenant_id, name)
return record
# ── Starlette/FastAPI 미들웨어 ────────────────────────────────────────────────
TENANT_HEADER = "X-Tenant-ID"
OVERRIDE_HEADER = "X-Tenant-Override" # ADMIN 전용 오버라이드
TENANT_EXEMPT_PATHS = {
"/", "/login", "/customer", "/change-password",
"/api/auth/login", "/api/auth/token",
"/health", "/api/metrics/health",
"/docs", "/openapi.json", "/redoc",
"/static",
}
class TenantMiddleware(BaseHTTPMiddleware):
"""
멀티테넌트 격리 미들웨어.
처리 순서:
1. 면제 경로 → 테넌트 컨텍스트 없이 통과
2. X-Tenant-ID 헤더에서 테넌트 추출
3. JWT 토큰의 tenant_id 클레임 추출 (헤더 없을 때)
4. 기본값: "DEFAULT"
5. 테넌트 활성 여부 검증
6. ContextVar에 테넌트 설정 → downstream 핸들러에서 사용
"""
def __init__(self, app: ASGIApp, default_tenant: str = "DEFAULT") -> None:
super().__init__(app)
self.default_tenant = default_tenant
async def dispatch(self, request: Request, call_next: Callable) -> Response:
# 면제 경로 처리
path = request.url.path
if any(path == ep or path.startswith(ep + "/") for ep in TENANT_EXEMPT_PATHS):
return await call_next(request)
# 테넌트 ID 추출 (우선순위: 헤더 > JWT > 기본값)
tenant_id = (
request.headers.get(TENANT_HEADER)
or self._extract_from_jwt(request)
or self.default_tenant
)
# 테넌트 존재·활성 검증
tenant = _tenants.get(tenant_id)
if tenant_id != self.default_tenant and not tenant:
return JSONResponse(
status_code=404,
content={"detail": f"테넌트 '{tenant_id}'를 찾을 수 없습니다."},
)
if tenant and not tenant.get("is_active", True):
return JSONResponse(
status_code=403,
content={"detail": f"테넌트 '{tenant_id}'가 비활성 상태입니다."},
)
# ContextVar 설정
token_tid = _tenant_id_ctx.set(tenant_id)
token_t = _tenant_ctx.set(tenant or _tenants.get(self.default_tenant))
try:
response = await call_next(request)
# 응답 헤더에 테넌트 ID 반영
response.headers["X-Tenant-ID"] = tenant_id
return response
finally:
_tenant_id_ctx.reset(token_tid)
_tenant_ctx.reset(token_t)
@staticmethod
def _extract_from_jwt(request: Request) -> Optional[str]:
"""Authorization 헤더에서 JWT tenant_id 클레임 추출."""
try:
auth = request.headers.get("Authorization", "")
if not auth.startswith("Bearer "):
return None
token = auth[7:]
import jwt as pyjwt
import os
secret = os.getenv("GUARDIA_SECRET_KEY", "")
payload = pyjwt.decode(token, secret, algorithms=["HS256"])
return payload.get("tenant_id")
except Exception:
return None