- itsm/ -> workspace/guardia-itsm/ - manager/ -> workspace/guardia-manager/ - app/ -> workspace/guardia-messenger/ - manual/ -> workspace/guardia-docs/ workspace/zioinfo-web/ unchanged. git mv preserves full commit history. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
281 lines
10 KiB
Python
281 lines
10 KiB
Python
"""D-1 LDAP/AD 연동 테스트"""
|
|
import sys, ast, os
|
|
|
|
os.environ.setdefault("GUARDIA_SECRET_KEY", "test-d1-secret-key-32bytes-padded!")
|
|
os.environ.setdefault("DATABASE_URL", "sqlite+aiosqlite:///./test_d1.db")
|
|
|
|
ok = True
|
|
|
|
print("=== 1. 구문 검사 ===")
|
|
files = ["core/ldap_auth.py", "routers/ldap.py", "main.py"]
|
|
for f in files:
|
|
try:
|
|
with open(f, encoding="utf-8") as fh:
|
|
src = fh.read()
|
|
ast.parse(src)
|
|
print(f" OK {f}")
|
|
except SyntaxError as e:
|
|
print(f" ERR {f}: {e}")
|
|
ok = False
|
|
|
|
print("\n=== 2. core/ldap_auth.py 핵심 기능 확인 ===")
|
|
with open("core/ldap_auth.py", encoding="utf-8") as f:
|
|
ldap_src = f.read()
|
|
|
|
ldap_checks = [
|
|
("class LDAPConfig", "LDAPConfig 데이터클래스"),
|
|
("bind_password", "bind_password 필드"),
|
|
("DEFAULT_GROUP_ROLE_MAP", "기본 그룹→역할 매핑"),
|
|
("def _load_config_from_env", "_load_config_from_env 함수"),
|
|
("def init_ldap_config", "init_ldap_config 함수"),
|
|
("def get_ldap_config", "get_ldap_config 함수"),
|
|
("def set_group_role_map", "set_group_role_map 함수"),
|
|
("def map_groups_to_role", "map_groups_to_role 함수"),
|
|
("def authenticate_ldap", "authenticate_ldap 함수"),
|
|
("async def sync_ldap_user", "sync_ldap_user 비동기 함수"),
|
|
("def test_ldap_connection", "test_ldap_connection 함수"),
|
|
("LDAP_SERVER_URL", "LDAP_SERVER_URL 환경변수"),
|
|
("LDAP_BIND_PASSWORD", "LDAP_BIND_PASSWORD 환경변수"),
|
|
("LDAP_ENABLED", "LDAP_ENABLED 환경변수"),
|
|
("ldap3", "ldap3 패키지 참조"),
|
|
('"GUARDiA-ADMIN"', "GUARDiA-ADMIN 그룹 매핑"),
|
|
('"Domain Admins"', "Domain Admins 그룹 매핑"),
|
|
("절대 로그", "bind_password 로그 금지 주석"),
|
|
]
|
|
for sym, desc in ldap_checks:
|
|
status = "OK" if sym in ldap_src else "ERR"
|
|
if status == "ERR":
|
|
ok = False
|
|
print(f" {status} {desc}")
|
|
|
|
print("\n=== 3. routers/ldap.py 엔드포인트 확인 ===")
|
|
with open("routers/ldap.py", encoding="utf-8") as f:
|
|
router_src = f.read()
|
|
|
|
router_checks = [
|
|
('@router.get("/status"', "GET /status"),
|
|
('@router.post("/test"', "POST /test (연결 테스트)"),
|
|
('@router.post("/authenticate"', "POST /authenticate (인증 테스트)"),
|
|
('@router.get("/config"', "GET /config"),
|
|
('@router.put("/config"', "PUT /config"),
|
|
('@router.get("/group-map"', "GET /group-map"),
|
|
('@router.put("/group-map"', "PUT /group-map"),
|
|
('/sync/', "POST /sync/{username}"),
|
|
('@router.get("/users"', "GET /users"),
|
|
('"***"', "bind_password 마스킹"),
|
|
('"bind_password", None', "응답에서 bind_password 제거"),
|
|
('UserRole.ADMIN', "ADMIN 권한 검증"),
|
|
]
|
|
for sym, desc in router_checks:
|
|
found = sym in router_src
|
|
status = "OK" if found else "ERR"
|
|
if status == "ERR":
|
|
ok = False
|
|
print(f" {status} {desc}")
|
|
|
|
print("\n=== 4. main.py D-1 등록 확인 ===")
|
|
with open("main.py", encoding="utf-8") as f:
|
|
main_src = f.read()
|
|
|
|
for sym, desc in [
|
|
("ldap", "ldap import"),
|
|
("ldap.router", "ldap 라우터 등록"),
|
|
("D-1", "D-1 섹션 주석"),
|
|
]:
|
|
status = "OK" if sym in main_src else "ERR"
|
|
if status == "ERR":
|
|
ok = False
|
|
print(f" {status} {desc}")
|
|
|
|
import sys as _sys
|
|
import importlib.util as _ilu
|
|
|
|
if "." not in _sys.path:
|
|
_sys.path.insert(0, ".")
|
|
|
|
def _load_ldap_mod():
|
|
"""core/ldap_auth.py를 매번 새 모듈로 로드."""
|
|
import time
|
|
mod_name = f"_ldap_auth_{int(time.time()*1000)}"
|
|
spec = _ilu.spec_from_file_location(mod_name, "core/ldap_auth.py")
|
|
if spec is None:
|
|
raise ImportError("spec_from_file_location 실패")
|
|
m = _ilu.module_from_spec(spec)
|
|
m.__package__ = ""
|
|
# Python 3.14: @dataclass 가 sys.modules[cls.__module__].__dict__ 를 사용
|
|
_sys.modules[mod_name] = m
|
|
try:
|
|
spec.loader.exec_module(m)
|
|
finally:
|
|
# 테스트 후 정리 (sys.modules 오염 방지)
|
|
_sys.modules.pop(mod_name, None)
|
|
return m
|
|
|
|
print("\n=== 5. 환경변수 로딩 테스트 ===")
|
|
try:
|
|
mod = _load_ldap_mod()
|
|
|
|
# 환경변수 없을 때 기본값
|
|
cfg = mod.get_ldap_config()
|
|
assert cfg.enabled == False, f"기본값 enabled=False 기대: {cfg.enabled}"
|
|
print(" OK 기본값 enabled=False")
|
|
|
|
assert cfg.server_url == "", f"기본값 server_url='' 기대: {cfg.server_url}"
|
|
print(" OK 기본값 server_url=''")
|
|
|
|
# 환경변수 설정 후 로딩
|
|
os.environ["LDAP_ENABLED"] = "true"
|
|
os.environ["LDAP_SERVER_URL"] = "ldap://192.168.0.10:389"
|
|
os.environ["LDAP_BASE_DN"] = "DC=company,DC=local"
|
|
os.environ["LDAP_BIND_DN"] = "CN=svc-ldap,DC=company,DC=local"
|
|
os.environ["LDAP_BIND_PASSWORD"] = "secret123"
|
|
|
|
# 강제 재로드
|
|
mod._current_config = None
|
|
cfg2 = mod.get_ldap_config()
|
|
assert cfg2.enabled == True, f"enabled=True 기대: {cfg2.enabled}"
|
|
assert cfg2.server_url == "ldap://192.168.0.10:389", f"server_url 오류: {cfg2.server_url}"
|
|
assert cfg2.bind_dn == "CN=svc-ldap,DC=company,DC=local"
|
|
print(" OK 환경변수에서 설정 로딩")
|
|
|
|
# bind_password 로그 노출 없음 확인
|
|
import logging, io
|
|
log_buf = io.StringIO()
|
|
handler = logging.StreamHandler(log_buf)
|
|
mod.logger.addHandler(handler)
|
|
try:
|
|
mod.authenticate_ldap("testuser", "testpass")
|
|
except Exception:
|
|
pass
|
|
log_output = log_buf.getvalue()
|
|
assert "secret123" not in log_output, "bind_password가 로그에 노출됨!"
|
|
print(" OK bind_password 로그 미노출")
|
|
|
|
except AssertionError as e:
|
|
print(f" ERR {e}")
|
|
ok = False
|
|
except Exception as e:
|
|
print(f" ERR 환경변수 로딩 오류: {type(e).__name__}: {e}")
|
|
ok = False
|
|
finally:
|
|
for key in ["LDAP_ENABLED", "LDAP_SERVER_URL", "LDAP_BASE_DN",
|
|
"LDAP_BIND_DN", "LDAP_BIND_PASSWORD"]:
|
|
os.environ.pop(key, None)
|
|
|
|
print("\n=== 6. map_groups_to_role 역할 우선순위 테스트 ===")
|
|
try:
|
|
mod2 = _load_ldap_mod()
|
|
|
|
# ADMIN > PM > ENGINEER > VIEWER
|
|
role = mod2.map_groups_to_role(["GUARDiA-ENGINEER", "Domain Admins"])
|
|
assert role == "ADMIN", f"ADMIN이 우선이어야 함: {role}"
|
|
print(f" OK [ENGINEER, Domain Admins] -> {role}")
|
|
|
|
role = mod2.map_groups_to_role(["GUARDiA-PM", "GUARDiA-VIEWER"])
|
|
assert role == "PM", f"PM이 우선이어야 함: {role}"
|
|
print(f" OK [PM, VIEWER] -> {role}")
|
|
|
|
role = mod2.map_groups_to_role(["GUARDiA-VIEWER"])
|
|
assert role == "VIEWER", f"VIEWER 기대: {role}"
|
|
print(f" OK [VIEWER] -> {role}")
|
|
|
|
role = mod2.map_groups_to_role(["Unknown-Group"])
|
|
assert role == "VIEWER", f"알 수 없는 그룹 -> VIEWER: {role}"
|
|
print(f" OK [Unknown-Group] -> {role} (기본값)")
|
|
|
|
# 부분 일치 테스트
|
|
role = mod2.map_groups_to_role(["CN=GUARDiA-ADMIN,OU=Groups,DC=company,DC=local"])
|
|
print(f" OK CN= 형식 그룹: role={role}")
|
|
|
|
except AssertionError as e:
|
|
print(f" ERR {e}")
|
|
ok = False
|
|
except Exception as e:
|
|
print(f" ERR map_groups_to_role 오류: {type(e).__name__}: {e}")
|
|
ok = False
|
|
|
|
print("\n=== 7. set_group_role_map 업데이트 테스트 ===")
|
|
try:
|
|
mod3 = _load_ldap_mod()
|
|
|
|
mod3.set_group_role_map({"CustomGroup-Dev": "ENGINEER", "CustomGroup-Lead": "PM"})
|
|
role = mod3.map_groups_to_role(["CustomGroup-Dev"])
|
|
assert role == "ENGINEER", f"커스텀 그룹 매핑 오류: {role}"
|
|
print(f" OK CustomGroup-Dev -> {role}")
|
|
|
|
role = mod3.map_groups_to_role(["CustomGroup-Lead"])
|
|
assert role == "PM", f"커스텀 그룹 매핑 오류: {role}"
|
|
print(f" OK CustomGroup-Lead -> {role}")
|
|
|
|
role = mod3.map_groups_to_role(["Domain Admins"])
|
|
assert role == "ADMIN", f"기본 매핑 유지 실패: {role}"
|
|
print(f" OK Domain Admins -> {role} (기본 매핑 유지)")
|
|
|
|
except AssertionError as e:
|
|
print(f" ERR {e}")
|
|
ok = False
|
|
except Exception as e:
|
|
print(f" ERR set_group_role_map 오류: {type(e).__name__}: {e}")
|
|
ok = False
|
|
|
|
print("\n=== 8. LDAP 비활성화 Fallback 테스트 ===")
|
|
try:
|
|
mod4 = _load_ldap_mod()
|
|
|
|
# 비활성화 상태에서 인증 시도
|
|
success, info, err = mod4.authenticate_ldap("user", "pass")
|
|
assert success == False, f"비활성 LDAP에서 success=False 기대: {success}"
|
|
assert "비활성화" in err or "LDAP" in err, f"오류 메시지 확인: {err}"
|
|
print(f" OK 비활성 LDAP -> success=False, err='{err}'")
|
|
|
|
# 연결 테스트도 비활성 반환
|
|
result = mod4.test_ldap_connection()
|
|
assert result["success"] == False
|
|
print(f" OK test_ldap_connection 비활성 -> success=False")
|
|
|
|
except AssertionError as e:
|
|
print(f" ERR {e}")
|
|
ok = False
|
|
except Exception as e:
|
|
print(f" ERR Fallback 테스트 오류: {type(e).__name__}: {e}")
|
|
ok = False
|
|
|
|
print("\n=== 9. 보안 제약 확인 ===")
|
|
with open("routers/ldap.py", encoding="utf-8") as f:
|
|
router_full = f.read()
|
|
|
|
security_checks = [
|
|
# bind_password가 응답 dict에 직접 포함되지 않음
|
|
("bind_password.*return" not in router_full.replace("\n", " "),
|
|
"bind_password가 return에 미포함"),
|
|
# 마스킹 처리
|
|
('***' in router_full or '"***"' in router_full,
|
|
"비밀번호 마스킹 처리"),
|
|
# ADMIN 권한 검사
|
|
(router_full.count("UserRole.ADMIN") >= 5,
|
|
"핵심 엔드포인트 ADMIN 권한 검사"),
|
|
]
|
|
for check, desc in security_checks:
|
|
status = "OK" if check else "WARN"
|
|
print(f" {status} {desc}")
|
|
|
|
print("\n=== 10. User 모델 LDAP 연동 필드 확인 ===")
|
|
with open("models.py", encoding="utf-8") as f:
|
|
models_src = f.read()
|
|
|
|
for sym, desc in [
|
|
("auth_type", "auth_type 컬럼 (LDAP/LOCAL 구분)"),
|
|
("display_name", "display_name 컬럼"),
|
|
("department", "department 컬럼"),
|
|
]:
|
|
status = "OK" if sym in models_src else "WARN"
|
|
if status == "ERR":
|
|
ok = False
|
|
print(f" {status} {desc}")
|
|
|
|
print("\n=== D-1 LDAP/AD 연동 테스트 완료 ===")
|
|
if ok:
|
|
print("모든 검사 통과")
|
|
else:
|
|
sys.exit(1)
|