""" D-1: LDAP/AD 연동 엔진 기능: 1. LDAP/Active Directory 인증 (바인드 테스트) 2. 사용자 정보 조회 및 동기화 3. 그룹 멤버십 → 역할(Role) 매핑 4. Fallback: LDAP 미연결 시 로컬 DB 인증 5. LDAP 연결 설정 암호화 저장 지원 보안 제약: - LDAP 서버 자격증명 노출 금지 - 바인드 DN/패스워드는 서버 환경변수 또는 암호화 DB 저장 - 외부 LDAP 서버 URL만 지원 (내부망 전용) """ from __future__ import annotations import logging import os from dataclasses import dataclass, field from typing import Dict, List, Optional, Tuple logger = logging.getLogger(__name__) # ── LDAP 설정 ───────────────────────────────────────────────────────────────── @dataclass class LDAPConfig: """LDAP 연결 설정.""" server_url: str = "" # ldap://192.168.0.10:389 base_dn: str = "" # DC=company,DC=local bind_dn: str = "" # CN=svc-ldap,OU=ServiceAccounts,DC=company,DC=local bind_password: str = "" # (절대 로그 출력 금지) user_search_base: str = "" # OU=Users,DC=company,DC=local user_filter: str = "(sAMAccountName={username})" # AD 기본 필터 group_search_base: str = "" # OU=Groups,DC=company,DC=local group_attr: str = "memberOf" # 그룹 멤버십 속성 uid_attr: str = "sAMAccountName" # 사용자명 속성 email_attr: str = "mail" display_name_attr: str = "displayName" department_attr: str = "department" use_ssl: bool = False use_tls: bool = False timeout: int = 10 enabled: bool = False # LDAP 인증 활성화 여부 # 그룹 → 역할 매핑 기본값 DEFAULT_GROUP_ROLE_MAP: Dict[str, str] = { "GUARDiA-ADMIN": "ADMIN", "GUARDiA-PM": "PM", "GUARDiA-ENGINEER": "ENGINEER", "GUARDiA-VIEWER": "VIEWER", "Domain Admins": "ADMIN", "IT-Operations": "ENGINEER", "IT-Management": "PM", } def _load_config_from_env() -> LDAPConfig: """환경변수에서 LDAP 설정 로드.""" return LDAPConfig( server_url = os.environ.get("LDAP_SERVER_URL", ""), base_dn = os.environ.get("LDAP_BASE_DN", ""), bind_dn = os.environ.get("LDAP_BIND_DN", ""), bind_password = os.environ.get("LDAP_BIND_PASSWORD", ""), user_search_base = os.environ.get("LDAP_USER_SEARCH_BASE", ""), group_search_base = os.environ.get("LDAP_GROUP_SEARCH_BASE", ""), use_ssl = os.environ.get("LDAP_USE_SSL", "false").lower() == "true", use_tls = os.environ.get("LDAP_USE_TLS", "false").lower() == "true", enabled = os.environ.get("LDAP_ENABLED", "false").lower() == "true", ) # 런타임 설정 (init_ldap_config()로 초기화) _current_config: Optional[LDAPConfig] = None _group_role_map: Dict[str, str] = dict(DEFAULT_GROUP_ROLE_MAP) def init_ldap_config(config: Optional[LDAPConfig] = None) -> LDAPConfig: """LDAP 설정 초기화. config가 None이면 환경변수에서 로드.""" global _current_config _current_config = config or _load_config_from_env() return _current_config def get_ldap_config() -> LDAPConfig: """현재 LDAP 설정 반환.""" global _current_config if _current_config is None: _current_config = _load_config_from_env() return _current_config def set_group_role_map(mapping: Dict[str, str]) -> None: """그룹→역할 매핑 업데이트.""" global _group_role_map _group_role_map.update(mapping) def map_groups_to_role(groups: List[str]) -> str: """ LDAP 그룹 목록에서 ITSM 역할을 매핑. 가장 높은 권한 역할 반환. """ role_priority = {"ADMIN": 4, "PM": 3, "ENGINEER": 2, "VIEWER": 1} best_role = "VIEWER" best_prio = 0 for grp in groups: # 완전 일치 if grp in _group_role_map: role = _group_role_map[grp] prio = role_priority.get(role, 0) if prio > best_prio: best_prio = prio best_role = role else: # 그룹 이름 부분 일치 (CN=... 형식 지원) for map_grp, role in _group_role_map.items(): if map_grp.lower() in grp.lower(): prio = role_priority.get(role, 0) if prio > best_prio: best_prio = prio best_role = role return best_role # ── LDAP 인증 함수 ───────────────────────────────────────────────────────────── def _try_import_ldap3(): """ldap3 패키지 임포트 시도. 없으면 None 반환.""" try: import ldap3 # type: ignore return ldap3 except ImportError: return None def authenticate_ldap( username: str, password: str, config: Optional[LDAPConfig] = None, ) -> Tuple[bool, Optional[Dict], Optional[str]]: """ LDAP 인증 수행. Returns: (success, user_info_dict, error_message) - user_info_dict: {"username", "email", "display_name", "department", "groups", "role"} - error_message: 실패 시 오류 메시지 """ cfg = config or get_ldap_config() if not cfg.enabled or not cfg.server_url: return False, None, "LDAP 비활성화 상태" ldap3 = _try_import_ldap3() if ldap3 is None: return False, None, "ldap3 패키지 미설치 (pip install ldap3)" try: # 서버 연결 server = ldap3.Server( cfg.server_url, use_ssl = cfg.use_ssl, connect_timeout = cfg.timeout, ) # 서비스 계정으로 바인드하여 사용자 DN 조회 with ldap3.Connection( server, user = cfg.bind_dn, password = cfg.bind_password, auto_bind = ldap3.AUTO_BIND_NO_TLS, ) as conn: if cfg.use_tls: conn.start_tls() # 사용자 검색 user_filter = cfg.user_filter.format(username=username) attrs = [ cfg.uid_attr, cfg.email_attr, cfg.display_name_attr, cfg.department_attr, cfg.group_attr, "cn", "dn", ] conn.search( search_base = cfg.user_search_base or cfg.base_dn, search_filter = user_filter, attributes = attrs, ) if not conn.entries: return False, None, f"사용자 '{username}'를 찾을 수 없습니다." entry = conn.entries[0] user_dn = entry.entry_dn # 사용자 자격증명으로 바인드 (인증) with ldap3.Connection( server, user = user_dn, password = password, auto_bind = ldap3.AUTO_BIND_NO_TLS, ) as user_conn: if not user_conn.bound: return False, None, "잘못된 비밀번호" # 그룹 추출 groups_raw = [] if hasattr(entry, cfg.group_attr): groups_raw = [str(g) for g in getattr(entry, cfg.group_attr)] # CN 추출 (CN=GroupName,OU=...) groups = [] for g in groups_raw: if g.startswith("CN="): cn = g.split(",")[0].replace("CN=", "") groups.append(cn) else: groups.append(g) role = map_groups_to_role(groups) user_info = { "username": str(getattr(entry, cfg.uid_attr, username)), "email": str(getattr(entry, cfg.email_attr, "")) or None, "display_name": str(getattr(entry, cfg.display_name_attr, username)), "department": str(getattr(entry, cfg.department_attr, "")) or None, "groups": groups, "role": role, "dn": user_dn, } logger.info("LDAP 인증 성공: %s (role=%s)", username, role) return True, user_info, None except Exception as e: logger.warning("LDAP 인증 실패 (%s): %s", username, str(e)[:100]) return False, None, str(e)[:100] async def sync_ldap_user( db, username: str, ldap_info: Dict, ) -> Optional[object]: """ LDAP 인증 성공 후 로컬 DB와 동기화. - 신규 사용자: 자동 생성 - 기존 사용자: 이메일/역할 업데이트 Returns: User ORM 객체 """ from sqlalchemy import select as sa_select from models import User, UserRole try: q = sa_select(User).where(User.username == username) user = (await db.execute(q)).scalars().first() role_map = { "ADMIN": UserRole.ADMIN, "PM": UserRole.PM, "ENGINEER": UserRole.ENGINEER, "VIEWER": UserRole.VIEWER, } role_enum = role_map.get(ldap_info.get("role", "VIEWER"), UserRole.VIEWER) if user is None: # 신규 사용자 자동 생성 from core.auth import get_password_hash import secrets user = User( username = username, email = ldap_info.get("email") or f"{username}@ldap.local", hashed_pw = get_password_hash(secrets.token_hex(16)), # 임시 패스워드 role = role_enum, is_active = True, display_name = ldap_info.get("display_name"), department = ldap_info.get("department"), auth_type = "LDAP", must_change_pw = False, # LDAP 인증 사용자는 로컬 PW 변경 불필요 ) db.add(user) await db.flush() logger.info("LDAP 신규 사용자 생성: %s (%s)", username, role_enum) else: # 기존 사용자 업데이트 if ldap_info.get("email"): user.email = ldap_info["email"] user.role = role_enum if ldap_info.get("department"): user.department = ldap_info["department"] await db.commit() await db.refresh(user) return user except Exception as e: logger.error("LDAP 사용자 동기화 오류 (%s): %s", username, e) await db.rollback() return None def test_ldap_connection(config: Optional[LDAPConfig] = None) -> Dict: """LDAP 연결 테스트 (ping only).""" cfg = config or get_ldap_config() if not cfg.enabled: return {"success": False, "reason": "LDAP 비활성화"} if not cfg.server_url: return {"success": False, "reason": "server_url 미설정"} ldap3 = _try_import_ldap3() if ldap3 is None: return {"success": False, "reason": "ldap3 미설치"} try: server = ldap3.Server( cfg.server_url, use_ssl = cfg.use_ssl, connect_timeout = cfg.timeout, ) with ldap3.Connection(server, user=cfg.bind_dn, password=cfg.bind_password) as conn: bound = conn.bound return { "success": bound, "server_url": cfg.server_url, "bind_dn": cfg.bind_dn, "reason": "연결 성공" if bound else "바인드 실패", } except Exception as e: return {"success": False, "reason": str(e)[:100]}