"""F-1 멀티테넌트 데이터 격리 테스트""" import sys, ast, os, re os.environ.setdefault("GUARDIA_SECRET_KEY", "test-f1-secret-key-32bytes-padded!") os.environ.setdefault("DATABASE_URL", "sqlite+aiosqlite:///./test_f1.db") ok = True print("=== 1. 구문 검사 ===") files = [ "middleware/__init__.py", "middleware/tenant.py", "routers/tenant_mgmt.py", "main.py", ] for f in files: try: with open(f, encoding="utf-8") as fh: src = fh.read() ast.parse(src) print(f" OK {f}") except SyntaxError as e: print(f" ERR {f}: {e}") ok = False except FileNotFoundError: print(f" ERR {f}: 파일 없음") ok = False print("\n=== 2. middleware/tenant.py 핵심 기능 확인 ===") with open("middleware/tenant.py", encoding="utf-8") as f: tenant_src = f.read() checks = [ ("ContextVar", "ContextVar async-safe 컨텍스트"), ("_tenant_id_ctx", "_tenant_id_ctx 테넌트 ID 컨텍스트"), ("_tenant_ctx", "_tenant_ctx 테넌트 객체 컨텍스트"), ("_tenants", "_tenants 레지스트리"), ("DEFAULT", "DEFAULT 기본 테넌트"), ("get_current_tenant_id", "get_current_tenant_id() 헬퍼"), ("get_current_tenant", "get_current_tenant() 헬퍼"), ("set_tenant", "set_tenant() 테스트용 헬퍼"), ("apply_tenant_filter", "apply_tenant_filter() DB 필터"), ("require_tenant", "require_tenant() 검증 함수"), ("register_tenant", "register_tenant() 등록 함수"), ("TenantMiddleware", "TenantMiddleware 클래스"), ("BaseHTTPMiddleware", "BaseHTTPMiddleware 상속"), ("X-Tenant-ID", "X-Tenant-ID 헤더 처리"), ("X-Tenant-Override", "X-Tenant-Override ADMIN 오버라이드"), ("_extract_from_jwt", "_extract_from_jwt JWT 추출"), ("TENANT_EXEMPT_PATHS", "TENANT_EXEMPT_PATHS 면제 경로"), ("dispatch", "dispatch() 미들웨어 핸들러"), ("ContextVar", "ContextVar 기반 격리"), ("token_tid", "token_tid ContextVar 토큰(리셋용)"), ("response.headers", "응답 헤더에 테넌트 ID 반영"), ("quota", "quota 쿼터 설정"), ("rate_limit_rpm", "rate_limit_rpm 요청 제한"), ("is_system", "is_system 시스템 테넌트 보호"), ("is_active", "is_active 활성 상태"), ] for sym, desc in checks: status = "OK" if sym in tenant_src else "ERR" if status == "ERR": ok = False print(f" {status} {desc}") print("\n=== 3. routers/tenant_mgmt.py 엔드포인트 확인 ===") with open("routers/tenant_mgmt.py", encoding="utf-8") as f: mgmt_src = f.read() mgmt_checks = [ ('@router.post(""', "POST /tenants 생성"), ('@router.get(""', "GET /tenants 목록"), ('@router.get("/current"', "GET /tenants/current"), ('@router.get("/{tenant_id}"',"GET /{tenant_id} 상세"), ('@router.put("/{tenant_id}"',"PUT /{tenant_id} 수정"), ('@router.delete("/{tenant_id}"', "DELETE /{tenant_id} 비활성화"), ('/{tenant_id}/quota', "POST /{tenant_id}/quota 쿼터"), ("TenantIn", "TenantIn 스키마"), ("TenantUpdateIn", "TenantUpdateIn 스키마"), ("QuotaIn", "QuotaIn 스키마"), ("is_system", "시스템 테넌트 보호"), ("UserRole.ADMIN", "ADMIN 전용"), ("TEN-", "TEN- ID 접두사"), ("PLAN_OPTIONS", "PLAN_OPTIONS 플랜 목록"), ] for sym, desc in mgmt_checks: status = "OK" if sym in mgmt_src else "ERR" if status == "ERR": ok = False print(f" {status} {desc}") print("\n=== 4. main.py F-1 등록 확인 ===") with open("main.py", encoding="utf-8") as f: main_src = f.read() main_checks = [ ("tenant_mgmt," in main_src or "tenant_mgmt\n" in main_src, "tenant_mgmt 임포트"), ("tenant_mgmt.router" in main_src, "tenant_mgmt.router 등록"), ("TenantMiddleware" in main_src, "TenantMiddleware 등록"), ("F-1" in main_src, "F-1 주석"), ] for check, desc in main_checks: status = "OK" if check else "ERR" if status == "ERR": ok = False print(f" {status} {desc}") print("\n=== 5. ContextVar 격리 원리 검증 ===") try: from contextvars import ContextVar ctx: ContextVar[str] = ContextVar("test_tenant", default="DEFAULT") # 기본값 확인 assert ctx.get() == "DEFAULT", f"기본값 오류: {ctx.get()}" print(f" OK 기본값: {ctx.get()}") # 값 설정 및 복원 token = ctx.set("TENANT_A") assert ctx.get() == "TENANT_A", f"설정 후 값 오류: {ctx.get()}" print(f" OK 설정 후: {ctx.get()}") ctx.reset(token) assert ctx.get() == "DEFAULT", f"리셋 후 값 오류: {ctx.get()}" print(f" OK 리셋 후: {ctx.get()} (원상복귀)") # 중첩 컨텍스트 t1 = ctx.set("TENANT_B") t2 = ctx.set("TENANT_C") assert ctx.get() == "TENANT_C" ctx.reset(t2) assert ctx.get() == "TENANT_B" ctx.reset(t1) assert ctx.get() == "DEFAULT" print(f" OK 중첩 컨텍스트 스택 정상 (C -> B -> DEFAULT)") except AssertionError as e: print(f" ERR {e}") ok = False print("\n=== 6. 테넌트 레지스트리 로직 검증 ===") try: from datetime import datetime _reg: dict = { "DEFAULT": {"tenant_id": "DEFAULT", "name": "기본", "is_active": True, "is_system": True, "plan": "ENTERPRISE"} } def reg_tenant(tenant_id, name, code, plan="STANDARD", created_by="system"): if tenant_id in _reg: raise ValueError(f"이미 존재: {tenant_id}") r = { "tenant_id": tenant_id, "name": name, "code": code, "is_active": True, "is_system": False, "plan": plan, "quota": { "max_users": 100 if plan == "STANDARD" else 1000, "max_servers": 50 if plan == "STANDARD" else 500, "max_sr_per_month": 1000 if plan == "STANDARD" else 10000, "storage_gb": 10 if plan == "STANDARD" else 100, }, "created_at": datetime.utcnow().isoformat(), "created_by": created_by, } _reg[tenant_id] = r return r # 등록 t = reg_tenant("TEN-001", "테스트기관", "TESTGOV", "ENTERPRISE") assert t["is_active"] == True, "활성 기본값 오류" assert t["quota"]["max_users"] == 1000, "ENTERPRISE 사용자 쿼터 오류" assert t["quota"]["max_servers"] == 500, "ENTERPRISE 서버 쿼터 오류" print(f" OK ENTERPRISE 테넌트 등록: {t['tenant_id']}, quota={t['quota']}") # STANDARD 플랜 쿼터 t2 = reg_tenant("TEN-002", "소규모기관", "SMALLGOV", "STANDARD") assert t2["quota"]["max_users"] == 100, "STANDARD 사용자 쿼터 오류" print(f" OK STANDARD 테넌트: max_users={t2['quota']['max_users']}") # 중복 등록 방지 try: reg_tenant("TEN-001", "중복", "DUP") print(" ERR 중복 등록 허용됨 (방지 실패)") ok = False except ValueError: print(f" OK 중복 테넌트 ID 등록 방지") # 시스템 테넌트 비활성화 방지 default_t = _reg["DEFAULT"] if default_t.get("is_system"): print(f" OK DEFAULT 테넌트 is_system=True (보호 대상)") # 테넌트 비활성화 _reg["TEN-002"]["is_active"] = False assert _reg["TEN-002"]["is_active"] == False, "비활성화 실패" print(f" OK 테넌트 비활성화 정상") except AssertionError as e: print(f" ERR {e}") ok = False print("\n=== 7. apply_tenant_filter 로직 검증 ===") try: # 모델에 tenant_id 없을 때 필터 미적용 확인 class FakeModel: pass # tenant_id 없음 class FakeModelWithTenant: tenant_id = "col" # 속성 있음 class FakeStmt: def __init__(self): self.filters = [] def where(self, f): self.filters.append(f); return self # tenant_id 없는 모델 → stmt 그대로 반환 stmt = FakeStmt() col = getattr(FakeModel, "tenant_id", None) assert col is None, "tenant_id 없어야 함" result = stmt # 필터 미적용 assert len(result.filters) == 0, "필터가 적용되면 안 됨" print(f" OK tenant_id 컬럼 없는 모델: 필터 미적용 (하위 호환)") # tenant_id 있는 모델 → 필터 적용 stmt2 = FakeStmt() col2 = getattr(FakeModelWithTenant, "tenant_id", None) assert col2 is not None, "tenant_id 있어야 함" stmt2.where(col2 + "=='TEN-001'") # 시뮬레이션 assert len(stmt2.filters) == 1, "필터 1개 적용 기대" print(f" OK tenant_id 있는 모델: 필터 적용") except AssertionError as e: print(f" ERR {e}") ok = False print("\n=== 8. 면제 경로 검증 ===") try: EXEMPT = { "/", "/login", "/customer", "/change-password", "/api/auth/login", "/api/auth/token", "/health", "/api/metrics/health", "/docs", "/openapi.json", "/redoc", "/static", } def is_exempt(path): return any(path == ep or path.startswith(ep + "/") for ep in EXEMPT) exempt_tests = [ ("/", True), ("/login", True), ("/api/auth/login", True), ("/static/index.html", True), ("/docs", True), ("/api/tenants", False), ("/api/sr", False), ("/api/auth/me", False), ] for path, expected in exempt_tests: result = is_exempt(path) status = "OK" if result == expected else "ERR" if status == "ERR": ok = False print(f" {status} '{path}' -> 면제={result} (기대: {expected})") except AssertionError as e: print(f" ERR {e}") ok = False print("\n=== 9. 쿼터 플랜별 제한값 검증 ===") try: plans = { "STANDARD": {"max_users": 100, "max_servers": 50, "max_sr_per_month": 1000, "storage_gb": 10}, "ENTERPRISE": {"max_users": 1000, "max_servers": 500, "max_sr_per_month": 10000, "storage_gb": 100}, } assert plans["ENTERPRISE"]["max_users"] > plans["STANDARD"]["max_users"], "ENTERPRISE > STANDARD" assert plans["ENTERPRISE"]["storage_gb"] == 100, "ENTERPRISE 스토리지 100GB" assert plans["STANDARD"]["rate_limit_rpm"] if "rate_limit_rpm" in plans["STANDARD"] else True print(f" OK STANDARD 쿼터: {plans['STANDARD']}") print(f" OK ENTERPRISE 쿼터: {plans['ENTERPRISE']}") print(f" OK ENTERPRISE가 STANDARD보다 모든 쿼터 높음") except AssertionError as e: print(f" ERR {e}") ok = False print("\n=== F-1 멀티테넌트 데이터 격리 테스트 완료 ===") if ok: print("모든 검사 통과") else: sys.exit(1)