guardia-itsm/test_d1_ldap.py
DESKTOP-TKLFCPRython 64c27c3509 feat(itsm): G-1~G-12 확장 기능 + 하네스/봇/설치스크립트 구현
G-1: 메신저 Webhook Relay + _send_to_room 실제 httpx 호출 구현
G-2: POST /api/tasks/bulk SR 대량작업 엔드포인트 (최대 100건)
G-3: 라이선스 만료 알림 스케줄러 (매일 09:00 KST)
G-4: 체험판 upgrade_banner 필드 + license.py 배너 로직
G-5: core/auto_rca.py + incidents/problem auto-rca 엔드포인트
G-6: core/deploy_impact.py + vibe impact-analysis 엔드포인트
G-7: core/ticket_classifier.py + SR 생성 시 AI 분류 + ai-suggestion API
G-8: VulnPatchRecord 모델 + vuln_scan 패치추적 4개 엔드포인트
G-9: core/jira_sync.py + gateway Jira/Confluence 연동 엔드포인트
G-10: core/push_notify.py + routers/push.py + PushSubscription 모델
G-11: approvals 다중승인 (위임/서명/기한초과/마감연장)
G-12: alembic.ini + migrations/ + cicd/migrate_to_postgres.sh

하네스: guardia-orchestrator 확장기능 Phase 반영
봇명령어: /sr /status /license /bulk 슬래시 명령어 추가
설치스크립트: setup/ (Ubuntu, CentOS, RHEL, Windows) --test 옵션 포함

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-29 18:18:52 +09:00

281 lines
10 KiB
Python

"""D-1 LDAP/AD 연동 테스트"""
import sys, ast, os
os.environ.setdefault("GUARDIA_SECRET_KEY", "test-d1-secret-key-32bytes-padded!")
os.environ.setdefault("DATABASE_URL", "sqlite+aiosqlite:///./test_d1.db")
ok = True
print("=== 1. 구문 검사 ===")
files = ["core/ldap_auth.py", "routers/ldap.py", "main.py"]
for f in files:
try:
with open(f, encoding="utf-8") as fh:
src = fh.read()
ast.parse(src)
print(f" OK {f}")
except SyntaxError as e:
print(f" ERR {f}: {e}")
ok = False
print("\n=== 2. core/ldap_auth.py 핵심 기능 확인 ===")
with open("core/ldap_auth.py", encoding="utf-8") as f:
ldap_src = f.read()
ldap_checks = [
("class LDAPConfig", "LDAPConfig 데이터클래스"),
("bind_password", "bind_password 필드"),
("DEFAULT_GROUP_ROLE_MAP", "기본 그룹→역할 매핑"),
("def _load_config_from_env", "_load_config_from_env 함수"),
("def init_ldap_config", "init_ldap_config 함수"),
("def get_ldap_config", "get_ldap_config 함수"),
("def set_group_role_map", "set_group_role_map 함수"),
("def map_groups_to_role", "map_groups_to_role 함수"),
("def authenticate_ldap", "authenticate_ldap 함수"),
("async def sync_ldap_user", "sync_ldap_user 비동기 함수"),
("def test_ldap_connection", "test_ldap_connection 함수"),
("LDAP_SERVER_URL", "LDAP_SERVER_URL 환경변수"),
("LDAP_BIND_PASSWORD", "LDAP_BIND_PASSWORD 환경변수"),
("LDAP_ENABLED", "LDAP_ENABLED 환경변수"),
("ldap3", "ldap3 패키지 참조"),
('"GUARDiA-ADMIN"', "GUARDiA-ADMIN 그룹 매핑"),
('"Domain Admins"', "Domain Admins 그룹 매핑"),
("절대 로그", "bind_password 로그 금지 주석"),
]
for sym, desc in ldap_checks:
status = "OK" if sym in ldap_src else "ERR"
if status == "ERR":
ok = False
print(f" {status} {desc}")
print("\n=== 3. routers/ldap.py 엔드포인트 확인 ===")
with open("routers/ldap.py", encoding="utf-8") as f:
router_src = f.read()
router_checks = [
('@router.get("/status"', "GET /status"),
('@router.post("/test"', "POST /test (연결 테스트)"),
('@router.post("/authenticate"', "POST /authenticate (인증 테스트)"),
('@router.get("/config"', "GET /config"),
('@router.put("/config"', "PUT /config"),
('@router.get("/group-map"', "GET /group-map"),
('@router.put("/group-map"', "PUT /group-map"),
('/sync/', "POST /sync/{username}"),
('@router.get("/users"', "GET /users"),
('"***"', "bind_password 마스킹"),
('"bind_password", None', "응답에서 bind_password 제거"),
('UserRole.ADMIN', "ADMIN 권한 검증"),
]
for sym, desc in router_checks:
found = sym in router_src
status = "OK" if found else "ERR"
if status == "ERR":
ok = False
print(f" {status} {desc}")
print("\n=== 4. main.py D-1 등록 확인 ===")
with open("main.py", encoding="utf-8") as f:
main_src = f.read()
for sym, desc in [
("ldap", "ldap import"),
("ldap.router", "ldap 라우터 등록"),
("D-1", "D-1 섹션 주석"),
]:
status = "OK" if sym in main_src else "ERR"
if status == "ERR":
ok = False
print(f" {status} {desc}")
import sys as _sys
import importlib.util as _ilu
if "." not in _sys.path:
_sys.path.insert(0, ".")
def _load_ldap_mod():
"""core/ldap_auth.py를 매번 새 모듈로 로드."""
import time
mod_name = f"_ldap_auth_{int(time.time()*1000)}"
spec = _ilu.spec_from_file_location(mod_name, "core/ldap_auth.py")
if spec is None:
raise ImportError("spec_from_file_location 실패")
m = _ilu.module_from_spec(spec)
m.__package__ = ""
# Python 3.14: @dataclass 가 sys.modules[cls.__module__].__dict__ 를 사용
_sys.modules[mod_name] = m
try:
spec.loader.exec_module(m)
finally:
# 테스트 후 정리 (sys.modules 오염 방지)
_sys.modules.pop(mod_name, None)
return m
print("\n=== 5. 환경변수 로딩 테스트 ===")
try:
mod = _load_ldap_mod()
# 환경변수 없을 때 기본값
cfg = mod.get_ldap_config()
assert cfg.enabled == False, f"기본값 enabled=False 기대: {cfg.enabled}"
print(" OK 기본값 enabled=False")
assert cfg.server_url == "", f"기본값 server_url='' 기대: {cfg.server_url}"
print(" OK 기본값 server_url=''")
# 환경변수 설정 후 로딩
os.environ["LDAP_ENABLED"] = "true"
os.environ["LDAP_SERVER_URL"] = "ldap://192.168.0.10:389"
os.environ["LDAP_BASE_DN"] = "DC=company,DC=local"
os.environ["LDAP_BIND_DN"] = "CN=svc-ldap,DC=company,DC=local"
os.environ["LDAP_BIND_PASSWORD"] = "secret123"
# 강제 재로드
mod._current_config = None
cfg2 = mod.get_ldap_config()
assert cfg2.enabled == True, f"enabled=True 기대: {cfg2.enabled}"
assert cfg2.server_url == "ldap://192.168.0.10:389", f"server_url 오류: {cfg2.server_url}"
assert cfg2.bind_dn == "CN=svc-ldap,DC=company,DC=local"
print(" OK 환경변수에서 설정 로딩")
# bind_password 로그 노출 없음 확인
import logging, io
log_buf = io.StringIO()
handler = logging.StreamHandler(log_buf)
mod.logger.addHandler(handler)
try:
mod.authenticate_ldap("testuser", "testpass")
except Exception:
pass
log_output = log_buf.getvalue()
assert "secret123" not in log_output, "bind_password가 로그에 노출됨!"
print(" OK bind_password 로그 미노출")
except AssertionError as e:
print(f" ERR {e}")
ok = False
except Exception as e:
print(f" ERR 환경변수 로딩 오류: {type(e).__name__}: {e}")
ok = False
finally:
for key in ["LDAP_ENABLED", "LDAP_SERVER_URL", "LDAP_BASE_DN",
"LDAP_BIND_DN", "LDAP_BIND_PASSWORD"]:
os.environ.pop(key, None)
print("\n=== 6. map_groups_to_role 역할 우선순위 테스트 ===")
try:
mod2 = _load_ldap_mod()
# ADMIN > PM > ENGINEER > VIEWER
role = mod2.map_groups_to_role(["GUARDiA-ENGINEER", "Domain Admins"])
assert role == "ADMIN", f"ADMIN이 우선이어야 함: {role}"
print(f" OK [ENGINEER, Domain Admins] -> {role}")
role = mod2.map_groups_to_role(["GUARDiA-PM", "GUARDiA-VIEWER"])
assert role == "PM", f"PM이 우선이어야 함: {role}"
print(f" OK [PM, VIEWER] -> {role}")
role = mod2.map_groups_to_role(["GUARDiA-VIEWER"])
assert role == "VIEWER", f"VIEWER 기대: {role}"
print(f" OK [VIEWER] -> {role}")
role = mod2.map_groups_to_role(["Unknown-Group"])
assert role == "VIEWER", f"알 수 없는 그룹 -> VIEWER: {role}"
print(f" OK [Unknown-Group] -> {role} (기본값)")
# 부분 일치 테스트
role = mod2.map_groups_to_role(["CN=GUARDiA-ADMIN,OU=Groups,DC=company,DC=local"])
print(f" OK CN= 형식 그룹: role={role}")
except AssertionError as e:
print(f" ERR {e}")
ok = False
except Exception as e:
print(f" ERR map_groups_to_role 오류: {type(e).__name__}: {e}")
ok = False
print("\n=== 7. set_group_role_map 업데이트 테스트 ===")
try:
mod3 = _load_ldap_mod()
mod3.set_group_role_map({"CustomGroup-Dev": "ENGINEER", "CustomGroup-Lead": "PM"})
role = mod3.map_groups_to_role(["CustomGroup-Dev"])
assert role == "ENGINEER", f"커스텀 그룹 매핑 오류: {role}"
print(f" OK CustomGroup-Dev -> {role}")
role = mod3.map_groups_to_role(["CustomGroup-Lead"])
assert role == "PM", f"커스텀 그룹 매핑 오류: {role}"
print(f" OK CustomGroup-Lead -> {role}")
role = mod3.map_groups_to_role(["Domain Admins"])
assert role == "ADMIN", f"기본 매핑 유지 실패: {role}"
print(f" OK Domain Admins -> {role} (기본 매핑 유지)")
except AssertionError as e:
print(f" ERR {e}")
ok = False
except Exception as e:
print(f" ERR set_group_role_map 오류: {type(e).__name__}: {e}")
ok = False
print("\n=== 8. LDAP 비활성화 Fallback 테스트 ===")
try:
mod4 = _load_ldap_mod()
# 비활성화 상태에서 인증 시도
success, info, err = mod4.authenticate_ldap("user", "pass")
assert success == False, f"비활성 LDAP에서 success=False 기대: {success}"
assert "비활성화" in err or "LDAP" in err, f"오류 메시지 확인: {err}"
print(f" OK 비활성 LDAP -> success=False, err='{err}'")
# 연결 테스트도 비활성 반환
result = mod4.test_ldap_connection()
assert result["success"] == False
print(f" OK test_ldap_connection 비활성 -> success=False")
except AssertionError as e:
print(f" ERR {e}")
ok = False
except Exception as e:
print(f" ERR Fallback 테스트 오류: {type(e).__name__}: {e}")
ok = False
print("\n=== 9. 보안 제약 확인 ===")
with open("routers/ldap.py", encoding="utf-8") as f:
router_full = f.read()
security_checks = [
# bind_password가 응답 dict에 직접 포함되지 않음
("bind_password.*return" not in router_full.replace("\n", " "),
"bind_password가 return에 미포함"),
# 마스킹 처리
('***' in router_full or '"***"' in router_full,
"비밀번호 마스킹 처리"),
# ADMIN 권한 검사
(router_full.count("UserRole.ADMIN") >= 5,
"핵심 엔드포인트 ADMIN 권한 검사"),
]
for check, desc in security_checks:
status = "OK" if check else "WARN"
print(f" {status} {desc}")
print("\n=== 10. User 모델 LDAP 연동 필드 확인 ===")
with open("models.py", encoding="utf-8") as f:
models_src = f.read()
for sym, desc in [
("auth_type", "auth_type 컬럼 (LDAP/LOCAL 구분)"),
("display_name", "display_name 컬럼"),
("department", "department 컬럼"),
]:
status = "OK" if sym in models_src else "WARN"
if status == "ERR":
ok = False
print(f" {status} {desc}")
print("\n=== D-1 LDAP/AD 연동 테스트 완료 ===")
if ok:
print("모든 검사 통과")
else:
sys.exit(1)