- itsm/ -> workspace/guardia-itsm/ - manager/ -> workspace/guardia-manager/ - app/ -> workspace/guardia-messenger/ - manual/ -> workspace/guardia-docs/ workspace/zioinfo-web/ unchanged. git mv preserves full commit history. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
280 lines
11 KiB
Python
280 lines
11 KiB
Python
"""F-1 멀티테넌트 데이터 격리 테스트"""
|
|
import sys, ast, os, re
|
|
|
|
os.environ.setdefault("GUARDIA_SECRET_KEY", "test-f1-secret-key-32bytes-padded!")
|
|
os.environ.setdefault("DATABASE_URL", "sqlite+aiosqlite:///./test_f1.db")
|
|
|
|
ok = True
|
|
|
|
print("=== 1. 구문 검사 ===")
|
|
files = [
|
|
"middleware/__init__.py",
|
|
"middleware/tenant.py",
|
|
"routers/tenant_mgmt.py",
|
|
"main.py",
|
|
]
|
|
for f in files:
|
|
try:
|
|
with open(f, encoding="utf-8") as fh:
|
|
src = fh.read()
|
|
ast.parse(src)
|
|
print(f" OK {f}")
|
|
except SyntaxError as e:
|
|
print(f" ERR {f}: {e}")
|
|
ok = False
|
|
except FileNotFoundError:
|
|
print(f" ERR {f}: 파일 없음")
|
|
ok = False
|
|
|
|
print("\n=== 2. middleware/tenant.py 핵심 기능 확인 ===")
|
|
with open("middleware/tenant.py", encoding="utf-8") as f:
|
|
tenant_src = f.read()
|
|
|
|
checks = [
|
|
("ContextVar", "ContextVar async-safe 컨텍스트"),
|
|
("_tenant_id_ctx", "_tenant_id_ctx 테넌트 ID 컨텍스트"),
|
|
("_tenant_ctx", "_tenant_ctx 테넌트 객체 컨텍스트"),
|
|
("_tenants", "_tenants 레지스트리"),
|
|
("DEFAULT", "DEFAULT 기본 테넌트"),
|
|
("get_current_tenant_id", "get_current_tenant_id() 헬퍼"),
|
|
("get_current_tenant", "get_current_tenant() 헬퍼"),
|
|
("set_tenant", "set_tenant() 테스트용 헬퍼"),
|
|
("apply_tenant_filter", "apply_tenant_filter() DB 필터"),
|
|
("require_tenant", "require_tenant() 검증 함수"),
|
|
("register_tenant", "register_tenant() 등록 함수"),
|
|
("TenantMiddleware", "TenantMiddleware 클래스"),
|
|
("BaseHTTPMiddleware", "BaseHTTPMiddleware 상속"),
|
|
("X-Tenant-ID", "X-Tenant-ID 헤더 처리"),
|
|
("X-Tenant-Override", "X-Tenant-Override ADMIN 오버라이드"),
|
|
("_extract_from_jwt", "_extract_from_jwt JWT 추출"),
|
|
("TENANT_EXEMPT_PATHS", "TENANT_EXEMPT_PATHS 면제 경로"),
|
|
("dispatch", "dispatch() 미들웨어 핸들러"),
|
|
("ContextVar", "ContextVar 기반 격리"),
|
|
("token_tid", "token_tid ContextVar 토큰(리셋용)"),
|
|
("response.headers", "응답 헤더에 테넌트 ID 반영"),
|
|
("quota", "quota 쿼터 설정"),
|
|
("rate_limit_rpm", "rate_limit_rpm 요청 제한"),
|
|
("is_system", "is_system 시스템 테넌트 보호"),
|
|
("is_active", "is_active 활성 상태"),
|
|
]
|
|
for sym, desc in checks:
|
|
status = "OK" if sym in tenant_src else "ERR"
|
|
if status == "ERR": ok = False
|
|
print(f" {status} {desc}")
|
|
|
|
print("\n=== 3. routers/tenant_mgmt.py 엔드포인트 확인 ===")
|
|
with open("routers/tenant_mgmt.py", encoding="utf-8") as f:
|
|
mgmt_src = f.read()
|
|
|
|
mgmt_checks = [
|
|
('@router.post(""', "POST /tenants 생성"),
|
|
('@router.get(""', "GET /tenants 목록"),
|
|
('@router.get("/current"', "GET /tenants/current"),
|
|
('@router.get("/{tenant_id}"',"GET /{tenant_id} 상세"),
|
|
('@router.put("/{tenant_id}"',"PUT /{tenant_id} 수정"),
|
|
('@router.delete("/{tenant_id}"', "DELETE /{tenant_id} 비활성화"),
|
|
('/{tenant_id}/quota', "POST /{tenant_id}/quota 쿼터"),
|
|
("TenantIn", "TenantIn 스키마"),
|
|
("TenantUpdateIn", "TenantUpdateIn 스키마"),
|
|
("QuotaIn", "QuotaIn 스키마"),
|
|
("is_system", "시스템 테넌트 보호"),
|
|
("UserRole.ADMIN", "ADMIN 전용"),
|
|
("TEN-", "TEN- ID 접두사"),
|
|
("PLAN_OPTIONS", "PLAN_OPTIONS 플랜 목록"),
|
|
]
|
|
for sym, desc in mgmt_checks:
|
|
status = "OK" if sym in mgmt_src else "ERR"
|
|
if status == "ERR": ok = False
|
|
print(f" {status} {desc}")
|
|
|
|
print("\n=== 4. main.py F-1 등록 확인 ===")
|
|
with open("main.py", encoding="utf-8") as f:
|
|
main_src = f.read()
|
|
|
|
main_checks = [
|
|
("tenant_mgmt," in main_src or "tenant_mgmt\n" in main_src, "tenant_mgmt 임포트"),
|
|
("tenant_mgmt.router" in main_src, "tenant_mgmt.router 등록"),
|
|
("TenantMiddleware" in main_src, "TenantMiddleware 등록"),
|
|
("F-1" in main_src, "F-1 주석"),
|
|
]
|
|
for check, desc in main_checks:
|
|
status = "OK" if check else "ERR"
|
|
if status == "ERR": ok = False
|
|
print(f" {status} {desc}")
|
|
|
|
print("\n=== 5. ContextVar 격리 원리 검증 ===")
|
|
try:
|
|
from contextvars import ContextVar
|
|
|
|
ctx: ContextVar[str] = ContextVar("test_tenant", default="DEFAULT")
|
|
|
|
# 기본값 확인
|
|
assert ctx.get() == "DEFAULT", f"기본값 오류: {ctx.get()}"
|
|
print(f" OK 기본값: {ctx.get()}")
|
|
|
|
# 값 설정 및 복원
|
|
token = ctx.set("TENANT_A")
|
|
assert ctx.get() == "TENANT_A", f"설정 후 값 오류: {ctx.get()}"
|
|
print(f" OK 설정 후: {ctx.get()}")
|
|
|
|
ctx.reset(token)
|
|
assert ctx.get() == "DEFAULT", f"리셋 후 값 오류: {ctx.get()}"
|
|
print(f" OK 리셋 후: {ctx.get()} (원상복귀)")
|
|
|
|
# 중첩 컨텍스트
|
|
t1 = ctx.set("TENANT_B")
|
|
t2 = ctx.set("TENANT_C")
|
|
assert ctx.get() == "TENANT_C"
|
|
ctx.reset(t2)
|
|
assert ctx.get() == "TENANT_B"
|
|
ctx.reset(t1)
|
|
assert ctx.get() == "DEFAULT"
|
|
print(f" OK 중첩 컨텍스트 스택 정상 (C -> B -> DEFAULT)")
|
|
|
|
except AssertionError as e:
|
|
print(f" ERR {e}")
|
|
ok = False
|
|
|
|
print("\n=== 6. 테넌트 레지스트리 로직 검증 ===")
|
|
try:
|
|
from datetime import datetime
|
|
|
|
_reg: dict = {
|
|
"DEFAULT": {"tenant_id": "DEFAULT", "name": "기본", "is_active": True, "is_system": True, "plan": "ENTERPRISE"}
|
|
}
|
|
|
|
def reg_tenant(tenant_id, name, code, plan="STANDARD", created_by="system"):
|
|
if tenant_id in _reg:
|
|
raise ValueError(f"이미 존재: {tenant_id}")
|
|
r = {
|
|
"tenant_id": tenant_id, "name": name, "code": code,
|
|
"is_active": True, "is_system": False, "plan": plan,
|
|
"quota": {
|
|
"max_users": 100 if plan == "STANDARD" else 1000,
|
|
"max_servers": 50 if plan == "STANDARD" else 500,
|
|
"max_sr_per_month": 1000 if plan == "STANDARD" else 10000,
|
|
"storage_gb": 10 if plan == "STANDARD" else 100,
|
|
},
|
|
"created_at": datetime.utcnow().isoformat(),
|
|
"created_by": created_by,
|
|
}
|
|
_reg[tenant_id] = r
|
|
return r
|
|
|
|
# 등록
|
|
t = reg_tenant("TEN-001", "테스트기관", "TESTGOV", "ENTERPRISE")
|
|
assert t["is_active"] == True, "활성 기본값 오류"
|
|
assert t["quota"]["max_users"] == 1000, "ENTERPRISE 사용자 쿼터 오류"
|
|
assert t["quota"]["max_servers"] == 500, "ENTERPRISE 서버 쿼터 오류"
|
|
print(f" OK ENTERPRISE 테넌트 등록: {t['tenant_id']}, quota={t['quota']}")
|
|
|
|
# STANDARD 플랜 쿼터
|
|
t2 = reg_tenant("TEN-002", "소규모기관", "SMALLGOV", "STANDARD")
|
|
assert t2["quota"]["max_users"] == 100, "STANDARD 사용자 쿼터 오류"
|
|
print(f" OK STANDARD 테넌트: max_users={t2['quota']['max_users']}")
|
|
|
|
# 중복 등록 방지
|
|
try:
|
|
reg_tenant("TEN-001", "중복", "DUP")
|
|
print(" ERR 중복 등록 허용됨 (방지 실패)")
|
|
ok = False
|
|
except ValueError:
|
|
print(f" OK 중복 테넌트 ID 등록 방지")
|
|
|
|
# 시스템 테넌트 비활성화 방지
|
|
default_t = _reg["DEFAULT"]
|
|
if default_t.get("is_system"):
|
|
print(f" OK DEFAULT 테넌트 is_system=True (보호 대상)")
|
|
|
|
# 테넌트 비활성화
|
|
_reg["TEN-002"]["is_active"] = False
|
|
assert _reg["TEN-002"]["is_active"] == False, "비활성화 실패"
|
|
print(f" OK 테넌트 비활성화 정상")
|
|
|
|
except AssertionError as e:
|
|
print(f" ERR {e}")
|
|
ok = False
|
|
|
|
print("\n=== 7. apply_tenant_filter 로직 검증 ===")
|
|
try:
|
|
# 모델에 tenant_id 없을 때 필터 미적용 확인
|
|
class FakeModel:
|
|
pass # tenant_id 없음
|
|
|
|
class FakeModelWithTenant:
|
|
tenant_id = "col" # 속성 있음
|
|
|
|
class FakeStmt:
|
|
def __init__(self): self.filters = []
|
|
def where(self, f): self.filters.append(f); return self
|
|
|
|
# tenant_id 없는 모델 → stmt 그대로 반환
|
|
stmt = FakeStmt()
|
|
col = getattr(FakeModel, "tenant_id", None)
|
|
assert col is None, "tenant_id 없어야 함"
|
|
result = stmt # 필터 미적용
|
|
assert len(result.filters) == 0, "필터가 적용되면 안 됨"
|
|
print(f" OK tenant_id 컬럼 없는 모델: 필터 미적용 (하위 호환)")
|
|
|
|
# tenant_id 있는 모델 → 필터 적용
|
|
stmt2 = FakeStmt()
|
|
col2 = getattr(FakeModelWithTenant, "tenant_id", None)
|
|
assert col2 is not None, "tenant_id 있어야 함"
|
|
stmt2.where(col2 + "=='TEN-001'") # 시뮬레이션
|
|
assert len(stmt2.filters) == 1, "필터 1개 적용 기대"
|
|
print(f" OK tenant_id 있는 모델: 필터 적용")
|
|
|
|
except AssertionError as e:
|
|
print(f" ERR {e}")
|
|
ok = False
|
|
|
|
print("\n=== 8. 면제 경로 검증 ===")
|
|
try:
|
|
EXEMPT = {
|
|
"/", "/login", "/customer", "/change-password",
|
|
"/api/auth/login", "/api/auth/token",
|
|
"/health", "/api/metrics/health",
|
|
"/docs", "/openapi.json", "/redoc", "/static",
|
|
}
|
|
|
|
def is_exempt(path):
|
|
return any(path == ep or path.startswith(ep + "/") for ep in EXEMPT)
|
|
|
|
exempt_tests = [
|
|
("/", True), ("/login", True), ("/api/auth/login", True),
|
|
("/static/index.html", True), ("/docs", True),
|
|
("/api/tenants", False), ("/api/sr", False),
|
|
("/api/auth/me", False),
|
|
]
|
|
for path, expected in exempt_tests:
|
|
result = is_exempt(path)
|
|
status = "OK" if result == expected else "ERR"
|
|
if status == "ERR": ok = False
|
|
print(f" {status} '{path}' -> 면제={result} (기대: {expected})")
|
|
|
|
except AssertionError as e:
|
|
print(f" ERR {e}")
|
|
ok = False
|
|
|
|
print("\n=== 9. 쿼터 플랜별 제한값 검증 ===")
|
|
try:
|
|
plans = {
|
|
"STANDARD": {"max_users": 100, "max_servers": 50, "max_sr_per_month": 1000, "storage_gb": 10},
|
|
"ENTERPRISE": {"max_users": 1000, "max_servers": 500, "max_sr_per_month": 10000, "storage_gb": 100},
|
|
}
|
|
assert plans["ENTERPRISE"]["max_users"] > plans["STANDARD"]["max_users"], "ENTERPRISE > STANDARD"
|
|
assert plans["ENTERPRISE"]["storage_gb"] == 100, "ENTERPRISE 스토리지 100GB"
|
|
assert plans["STANDARD"]["rate_limit_rpm"] if "rate_limit_rpm" in plans["STANDARD"] else True
|
|
print(f" OK STANDARD 쿼터: {plans['STANDARD']}")
|
|
print(f" OK ENTERPRISE 쿼터: {plans['ENTERPRISE']}")
|
|
print(f" OK ENTERPRISE가 STANDARD보다 모든 쿼터 높음")
|
|
except AssertionError as e:
|
|
print(f" ERR {e}")
|
|
ok = False
|
|
|
|
print("\n=== F-1 멀티테넌트 데이터 격리 테스트 완료 ===")
|
|
if ok:
|
|
print("모든 검사 통과")
|
|
else:
|
|
sys.exit(1)
|