- itsm/ -> workspace/guardia-itsm/ - manager/ -> workspace/guardia-manager/ - app/ -> workspace/guardia-messenger/ - manual/ -> workspace/guardia-docs/ workspace/zioinfo-web/ unchanged. git mv preserves full commit history. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
256 lines
9.0 KiB
Python
256 lines
9.0 KiB
Python
"""
|
|
F-1: 멀티테넌트 데이터 격리
|
|
|
|
설계 원칙:
|
|
- 테넌트 식별: X-Tenant-ID 헤더 > JWT claim > 기본값(DEFAULT)
|
|
- ContextVar 기반 async-safe 테넌트 컨텍스트
|
|
- 모든 DB 쿼리에 tenant_id 필터 자동 주입 가능
|
|
- 테넌트별 설정(rate limit, storage quota) 지원
|
|
- ADMIN은 모든 테넌트 데이터 열람 가능 (X-Tenant-Override)
|
|
|
|
사용법:
|
|
# main.py에서 미들웨어 등록
|
|
from middleware.tenant import TenantMiddleware
|
|
app.add_middleware(TenantMiddleware)
|
|
|
|
# 라우터에서 현재 테넌트 조회
|
|
from middleware.tenant import get_current_tenant_id, require_tenant
|
|
tenant_id = get_current_tenant_id() # 현재 요청의 테넌트 ID
|
|
|
|
# SQLAlchemy 쿼리에 테넌트 필터 적용
|
|
from middleware.tenant import apply_tenant_filter
|
|
stmt = apply_tenant_filter(select(SRRequest), SRRequest)
|
|
|
|
엔드포인트 (routers/tenant_mgmt.py):
|
|
POST /api/tenants — 테넌트 생성 (SUPER_ADMIN)
|
|
GET /api/tenants — 테넌트 목록
|
|
GET /api/tenants/{tid} — 테넌트 상세
|
|
PUT /api/tenants/{tid} — 테넌트 수정
|
|
DELETE /api/tenants/{tid} — 테넌트 비활성화
|
|
GET /api/tenants/current — 현재 요청의 테넌트 정보
|
|
POST /api/tenants/{tid}/quota — 쿼터 설정
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from contextvars import ContextVar
|
|
from datetime import datetime
|
|
from typing import Any, Callable, Dict, Optional
|
|
from uuid import uuid4
|
|
|
|
from starlette.middleware.base import BaseHTTPMiddleware
|
|
from starlette.requests import Request
|
|
from starlette.responses import JSONResponse, Response
|
|
from starlette.types import ASGIApp
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# ── 테넌트 컨텍스트 (async-safe ContextVar) ──────────────────────────────────
|
|
_tenant_id_ctx: ContextVar[str] = ContextVar("tenant_id", default="DEFAULT")
|
|
_tenant_ctx: ContextVar[Optional[Dict]] = ContextVar("tenant_obj", default=None)
|
|
|
|
# ── 테넌트 레지스트리 (인메모리 — 프로덕션에서는 DB 사용) ────────────────────
|
|
_tenants: Dict[str, Dict] = {
|
|
"DEFAULT": {
|
|
"tenant_id": "DEFAULT",
|
|
"name": "기본 테넌트",
|
|
"code": "DEFAULT",
|
|
"is_active": True,
|
|
"is_system": True,
|
|
"plan": "ENTERPRISE",
|
|
"quota": {
|
|
"max_users": 1000,
|
|
"max_servers": 500,
|
|
"max_sr_per_month": 10000,
|
|
"storage_gb": 100,
|
|
},
|
|
"settings": {
|
|
"rate_limit_rpm": 600,
|
|
"allow_external": False,
|
|
},
|
|
"created_at": datetime.utcnow().isoformat(),
|
|
"created_by": "system",
|
|
}
|
|
}
|
|
|
|
# ── 공개 헬퍼 함수 ───────────────────────────────────────────────────────────
|
|
|
|
def get_current_tenant_id() -> str:
|
|
"""현재 요청의 테넌트 ID 반환."""
|
|
return _tenant_id_ctx.get()
|
|
|
|
|
|
def get_current_tenant() -> Optional[Dict]:
|
|
"""현재 요청의 테넌트 객체 반환."""
|
|
return _tenant_ctx.get()
|
|
|
|
|
|
def set_tenant(tenant_id: str) -> None:
|
|
"""테넌트 컨텍스트 수동 설정 (테스트용)."""
|
|
_tenant_id_ctx.set(tenant_id)
|
|
_tenant_ctx.set(_tenants.get(tenant_id))
|
|
|
|
|
|
def get_tenant(tenant_id: str) -> Optional[Dict]:
|
|
"""테넌트 ID로 테넌트 조회."""
|
|
return _tenants.get(tenant_id)
|
|
|
|
|
|
def list_tenants() -> list:
|
|
"""전체 테넌트 목록."""
|
|
return list(_tenants.values())
|
|
|
|
|
|
def apply_tenant_filter(stmt: Any, model: Any,
|
|
tenant_field: str = "tenant_id") -> Any:
|
|
"""
|
|
SQLAlchemy SELECT 구문에 테넌트 필터를 적용합니다.
|
|
|
|
사용 예:
|
|
stmt = apply_tenant_filter(select(SRRequest), SRRequest)
|
|
results = await db.execute(stmt)
|
|
"""
|
|
tenant_id = get_current_tenant_id()
|
|
if tenant_id == "DEFAULT":
|
|
return stmt # DEFAULT 테넌트는 모든 데이터 접근
|
|
|
|
col = getattr(model, tenant_field, None)
|
|
if col is None:
|
|
# 모델에 tenant_id 컬럼이 없으면 필터 미적용 (하위 호환)
|
|
return stmt
|
|
|
|
return stmt.where(col == tenant_id)
|
|
|
|
|
|
def require_tenant(tenant_id: str) -> None:
|
|
"""
|
|
특정 테넌트가 활성 상태인지 검증합니다.
|
|
비활성 테넌트면 ValueError를 발생시킵니다.
|
|
"""
|
|
t = _tenants.get(tenant_id)
|
|
if not t:
|
|
raise ValueError(f"테넌트 '{tenant_id}'가 존재하지 않습니다.")
|
|
if not t.get("is_active", False):
|
|
raise ValueError(f"테넌트 '{tenant_id}'가 비활성 상태입니다.")
|
|
|
|
|
|
def register_tenant(
|
|
tenant_id: str,
|
|
name: str,
|
|
code: str,
|
|
plan: str = "STANDARD",
|
|
created_by: str = "system",
|
|
) -> Dict:
|
|
"""테넌트를 레지스트리에 등록합니다."""
|
|
if tenant_id in _tenants:
|
|
raise ValueError(f"테넌트 ID '{tenant_id}'가 이미 존재합니다.")
|
|
record = {
|
|
"tenant_id": tenant_id,
|
|
"name": name,
|
|
"code": code,
|
|
"is_active": True,
|
|
"is_system": False,
|
|
"plan": plan,
|
|
"quota": {
|
|
"max_users": 100 if plan == "STANDARD" else 1000,
|
|
"max_servers": 50 if plan == "STANDARD" else 500,
|
|
"max_sr_per_month": 1000 if plan == "STANDARD" else 10000,
|
|
"storage_gb": 10 if plan == "STANDARD" else 100,
|
|
},
|
|
"settings": {
|
|
"rate_limit_rpm": 120 if plan == "STANDARD" else 600,
|
|
"allow_external": False,
|
|
},
|
|
"created_at": datetime.utcnow().isoformat(),
|
|
"created_by": created_by,
|
|
}
|
|
_tenants[tenant_id] = record
|
|
logger.info("테넌트 등록: %s (%s)", tenant_id, name)
|
|
return record
|
|
|
|
|
|
# ── Starlette/FastAPI 미들웨어 ────────────────────────────────────────────────
|
|
|
|
TENANT_HEADER = "X-Tenant-ID"
|
|
OVERRIDE_HEADER = "X-Tenant-Override" # ADMIN 전용 오버라이드
|
|
TENANT_EXEMPT_PATHS = {
|
|
"/", "/login", "/customer", "/change-password",
|
|
"/api/auth/login", "/api/auth/token",
|
|
"/health", "/api/metrics/health",
|
|
"/docs", "/openapi.json", "/redoc",
|
|
"/static",
|
|
}
|
|
|
|
|
|
class TenantMiddleware(BaseHTTPMiddleware):
|
|
"""
|
|
멀티테넌트 격리 미들웨어.
|
|
|
|
처리 순서:
|
|
1. 면제 경로 → 테넌트 컨텍스트 없이 통과
|
|
2. X-Tenant-ID 헤더에서 테넌트 추출
|
|
3. JWT 토큰의 tenant_id 클레임 추출 (헤더 없을 때)
|
|
4. 기본값: "DEFAULT"
|
|
5. 테넌트 활성 여부 검증
|
|
6. ContextVar에 테넌트 설정 → downstream 핸들러에서 사용
|
|
"""
|
|
|
|
def __init__(self, app: ASGIApp, default_tenant: str = "DEFAULT") -> None:
|
|
super().__init__(app)
|
|
self.default_tenant = default_tenant
|
|
|
|
async def dispatch(self, request: Request, call_next: Callable) -> Response:
|
|
# 면제 경로 처리
|
|
path = request.url.path
|
|
if any(path == ep or path.startswith(ep + "/") for ep in TENANT_EXEMPT_PATHS):
|
|
return await call_next(request)
|
|
|
|
# 테넌트 ID 추출 (우선순위: 헤더 > JWT > 기본값)
|
|
tenant_id = (
|
|
request.headers.get(TENANT_HEADER)
|
|
or self._extract_from_jwt(request)
|
|
or self.default_tenant
|
|
)
|
|
|
|
# 테넌트 존재·활성 검증
|
|
tenant = _tenants.get(tenant_id)
|
|
if tenant_id != self.default_tenant and not tenant:
|
|
return JSONResponse(
|
|
status_code=404,
|
|
content={"detail": f"테넌트 '{tenant_id}'를 찾을 수 없습니다."},
|
|
)
|
|
if tenant and not tenant.get("is_active", True):
|
|
return JSONResponse(
|
|
status_code=403,
|
|
content={"detail": f"테넌트 '{tenant_id}'가 비활성 상태입니다."},
|
|
)
|
|
|
|
# ContextVar 설정
|
|
token_tid = _tenant_id_ctx.set(tenant_id)
|
|
token_t = _tenant_ctx.set(tenant or _tenants.get(self.default_tenant))
|
|
|
|
try:
|
|
response = await call_next(request)
|
|
# 응답 헤더에 테넌트 ID 반영
|
|
response.headers["X-Tenant-ID"] = tenant_id
|
|
return response
|
|
finally:
|
|
_tenant_id_ctx.reset(token_tid)
|
|
_tenant_ctx.reset(token_t)
|
|
|
|
@staticmethod
|
|
def _extract_from_jwt(request: Request) -> Optional[str]:
|
|
"""Authorization 헤더에서 JWT tenant_id 클레임 추출."""
|
|
try:
|
|
auth = request.headers.get("Authorization", "")
|
|
if not auth.startswith("Bearer "):
|
|
return None
|
|
token = auth[7:]
|
|
import jwt as pyjwt
|
|
import os
|
|
secret = os.getenv("GUARDIA_SECRET_KEY", "")
|
|
payload = pyjwt.decode(token, secret, algorithms=["HS256"])
|
|
return payload.get("tenant_id")
|
|
except Exception:
|
|
return None
|