zioinfo-mail/itsm/core/ldap_auth.py
DESKTOP-TKLFCPR\ython e228faabf5 feat(itsm): G-1~G-12 확장 기능 + 하네스/봇/설치스크립트 구현
G-1: 메신저 Webhook Relay + _send_to_room 실제 httpx 호출 구현
G-2: POST /api/tasks/bulk SR 대량작업 엔드포인트 (최대 100건)
G-3: 라이선스 만료 알림 스케줄러 (매일 09:00 KST)
G-4: 체험판 upgrade_banner 필드 + license.py 배너 로직
G-5: core/auto_rca.py + incidents/problem auto-rca 엔드포인트
G-6: core/deploy_impact.py + vibe impact-analysis 엔드포인트
G-7: core/ticket_classifier.py + SR 생성 시 AI 분류 + ai-suggestion API
G-8: VulnPatchRecord 모델 + vuln_scan 패치추적 4개 엔드포인트
G-9: core/jira_sync.py + gateway Jira/Confluence 연동 엔드포인트
G-10: core/push_notify.py + routers/push.py + PushSubscription 모델
G-11: approvals 다중승인 (위임/서명/기한초과/마감연장)
G-12: alembic.ini + migrations/ + cicd/migrate_to_postgres.sh

하네스: guardia-orchestrator 확장기능 Phase 반영
봇명령어: /sr /status /license /bulk 슬래시 명령어 추가
설치스크립트: setup/ (Ubuntu, CentOS, RHEL, Windows) --test 옵션 포함

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-29 18:18:52 +09:00

333 lines
11 KiB
Python

"""
D-1: LDAP/AD 연동 엔진
기능:
1. LDAP/Active Directory 인증 (바인드 테스트)
2. 사용자 정보 조회 및 동기화
3. 그룹 멤버십 → 역할(Role) 매핑
4. Fallback: LDAP 미연결 시 로컬 DB 인증
5. LDAP 연결 설정 암호화 저장 지원
보안 제약:
- LDAP 서버 자격증명 노출 금지
- 바인드 DN/패스워드는 서버 환경변수 또는 암호화 DB 저장
- 외부 LDAP 서버 URL만 지원 (내부망 전용)
"""
from __future__ import annotations
import logging
import os
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
# ── LDAP 설정 ─────────────────────────────────────────────────────────────────
@dataclass
class LDAPConfig:
"""LDAP 연결 설정."""
server_url: str = "" # ldap://192.168.0.10:389
base_dn: str = "" # DC=company,DC=local
bind_dn: str = "" # CN=svc-ldap,OU=ServiceAccounts,DC=company,DC=local
bind_password: str = "" # (절대 로그 출력 금지)
user_search_base: str = "" # OU=Users,DC=company,DC=local
user_filter: str = "(sAMAccountName={username})" # AD 기본 필터
group_search_base: str = "" # OU=Groups,DC=company,DC=local
group_attr: str = "memberOf" # 그룹 멤버십 속성
uid_attr: str = "sAMAccountName" # 사용자명 속성
email_attr: str = "mail"
display_name_attr: str = "displayName"
department_attr: str = "department"
use_ssl: bool = False
use_tls: bool = False
timeout: int = 10
enabled: bool = False # LDAP 인증 활성화 여부
# 그룹 → 역할 매핑 기본값
DEFAULT_GROUP_ROLE_MAP: Dict[str, str] = {
"GUARDiA-ADMIN": "ADMIN",
"GUARDiA-PM": "PM",
"GUARDiA-ENGINEER": "ENGINEER",
"GUARDiA-VIEWER": "VIEWER",
"Domain Admins": "ADMIN",
"IT-Operations": "ENGINEER",
"IT-Management": "PM",
}
def _load_config_from_env() -> LDAPConfig:
"""환경변수에서 LDAP 설정 로드."""
return LDAPConfig(
server_url = os.environ.get("LDAP_SERVER_URL", ""),
base_dn = os.environ.get("LDAP_BASE_DN", ""),
bind_dn = os.environ.get("LDAP_BIND_DN", ""),
bind_password = os.environ.get("LDAP_BIND_PASSWORD", ""),
user_search_base = os.environ.get("LDAP_USER_SEARCH_BASE", ""),
group_search_base = os.environ.get("LDAP_GROUP_SEARCH_BASE", ""),
use_ssl = os.environ.get("LDAP_USE_SSL", "false").lower() == "true",
use_tls = os.environ.get("LDAP_USE_TLS", "false").lower() == "true",
enabled = os.environ.get("LDAP_ENABLED", "false").lower() == "true",
)
# 런타임 설정 (init_ldap_config()로 초기화)
_current_config: Optional[LDAPConfig] = None
_group_role_map: Dict[str, str] = dict(DEFAULT_GROUP_ROLE_MAP)
def init_ldap_config(config: Optional[LDAPConfig] = None) -> LDAPConfig:
"""LDAP 설정 초기화. config가 None이면 환경변수에서 로드."""
global _current_config
_current_config = config or _load_config_from_env()
return _current_config
def get_ldap_config() -> LDAPConfig:
"""현재 LDAP 설정 반환."""
global _current_config
if _current_config is None:
_current_config = _load_config_from_env()
return _current_config
def set_group_role_map(mapping: Dict[str, str]) -> None:
"""그룹→역할 매핑 업데이트."""
global _group_role_map
_group_role_map.update(mapping)
def map_groups_to_role(groups: List[str]) -> str:
"""
LDAP 그룹 목록에서 ITSM 역할을 매핑.
가장 높은 권한 역할 반환.
"""
role_priority = {"ADMIN": 4, "PM": 3, "ENGINEER": 2, "VIEWER": 1}
best_role = "VIEWER"
best_prio = 0
for grp in groups:
# 완전 일치
if grp in _group_role_map:
role = _group_role_map[grp]
prio = role_priority.get(role, 0)
if prio > best_prio:
best_prio = prio
best_role = role
else:
# 그룹 이름 부분 일치 (CN=... 형식 지원)
for map_grp, role in _group_role_map.items():
if map_grp.lower() in grp.lower():
prio = role_priority.get(role, 0)
if prio > best_prio:
best_prio = prio
best_role = role
return best_role
# ── LDAP 인증 함수 ─────────────────────────────────────────────────────────────
def _try_import_ldap3():
"""ldap3 패키지 임포트 시도. 없으면 None 반환."""
try:
import ldap3 # type: ignore
return ldap3
except ImportError:
return None
def authenticate_ldap(
username: str,
password: str,
config: Optional[LDAPConfig] = None,
) -> Tuple[bool, Optional[Dict], Optional[str]]:
"""
LDAP 인증 수행.
Returns: (success, user_info_dict, error_message)
- user_info_dict: {"username", "email", "display_name", "department", "groups", "role"}
- error_message: 실패 시 오류 메시지
"""
cfg = config or get_ldap_config()
if not cfg.enabled or not cfg.server_url:
return False, None, "LDAP 비활성화 상태"
ldap3 = _try_import_ldap3()
if ldap3 is None:
return False, None, "ldap3 패키지 미설치 (pip install ldap3)"
try:
# 서버 연결
server = ldap3.Server(
cfg.server_url,
use_ssl = cfg.use_ssl,
connect_timeout = cfg.timeout,
)
# 서비스 계정으로 바인드하여 사용자 DN 조회
with ldap3.Connection(
server,
user = cfg.bind_dn,
password = cfg.bind_password,
auto_bind = ldap3.AUTO_BIND_NO_TLS,
) as conn:
if cfg.use_tls:
conn.start_tls()
# 사용자 검색
user_filter = cfg.user_filter.format(username=username)
attrs = [
cfg.uid_attr, cfg.email_attr,
cfg.display_name_attr, cfg.department_attr,
cfg.group_attr, "cn", "dn",
]
conn.search(
search_base = cfg.user_search_base or cfg.base_dn,
search_filter = user_filter,
attributes = attrs,
)
if not conn.entries:
return False, None, f"사용자 '{username}'를 찾을 수 없습니다."
entry = conn.entries[0]
user_dn = entry.entry_dn
# 사용자 자격증명으로 바인드 (인증)
with ldap3.Connection(
server,
user = user_dn,
password = password,
auto_bind = ldap3.AUTO_BIND_NO_TLS,
) as user_conn:
if not user_conn.bound:
return False, None, "잘못된 비밀번호"
# 그룹 추출
groups_raw = []
if hasattr(entry, cfg.group_attr):
groups_raw = [str(g) for g in getattr(entry, cfg.group_attr)]
# CN 추출 (CN=GroupName,OU=...)
groups = []
for g in groups_raw:
if g.startswith("CN="):
cn = g.split(",")[0].replace("CN=", "")
groups.append(cn)
else:
groups.append(g)
role = map_groups_to_role(groups)
user_info = {
"username": str(getattr(entry, cfg.uid_attr, username)),
"email": str(getattr(entry, cfg.email_attr, "")) or None,
"display_name": str(getattr(entry, cfg.display_name_attr, username)),
"department": str(getattr(entry, cfg.department_attr, "")) or None,
"groups": groups,
"role": role,
"dn": user_dn,
}
logger.info("LDAP 인증 성공: %s (role=%s)", username, role)
return True, user_info, None
except Exception as e:
logger.warning("LDAP 인증 실패 (%s): %s", username, str(e)[:100])
return False, None, str(e)[:100]
async def sync_ldap_user(
db,
username: str,
ldap_info: Dict,
) -> Optional[object]:
"""
LDAP 인증 성공 후 로컬 DB와 동기화.
- 신규 사용자: 자동 생성
- 기존 사용자: 이메일/역할 업데이트
Returns: User ORM 객체
"""
from sqlalchemy import select as sa_select
from models import User, UserRole
try:
q = sa_select(User).where(User.username == username)
user = (await db.execute(q)).scalars().first()
role_map = {
"ADMIN": UserRole.ADMIN,
"PM": UserRole.PM,
"ENGINEER": UserRole.ENGINEER,
"VIEWER": UserRole.VIEWER,
}
role_enum = role_map.get(ldap_info.get("role", "VIEWER"), UserRole.VIEWER)
if user is None:
# 신규 사용자 자동 생성
from core.auth import get_password_hash
import secrets
user = User(
username = username,
email = ldap_info.get("email") or f"{username}@ldap.local",
hashed_pw = get_password_hash(secrets.token_hex(16)), # 임시 패스워드
role = role_enum,
is_active = True,
display_name = ldap_info.get("display_name"),
department = ldap_info.get("department"),
auth_type = "LDAP",
must_change_pw = False, # LDAP 인증 사용자는 로컬 PW 변경 불필요
)
db.add(user)
await db.flush()
logger.info("LDAP 신규 사용자 생성: %s (%s)", username, role_enum)
else:
# 기존 사용자 업데이트
if ldap_info.get("email"):
user.email = ldap_info["email"]
user.role = role_enum
if ldap_info.get("department"):
user.department = ldap_info["department"]
await db.commit()
await db.refresh(user)
return user
except Exception as e:
logger.error("LDAP 사용자 동기화 오류 (%s): %s", username, e)
await db.rollback()
return None
def test_ldap_connection(config: Optional[LDAPConfig] = None) -> Dict:
"""LDAP 연결 테스트 (ping only)."""
cfg = config or get_ldap_config()
if not cfg.enabled:
return {"success": False, "reason": "LDAP 비활성화"}
if not cfg.server_url:
return {"success": False, "reason": "server_url 미설정"}
ldap3 = _try_import_ldap3()
if ldap3 is None:
return {"success": False, "reason": "ldap3 미설치"}
try:
server = ldap3.Server(
cfg.server_url,
use_ssl = cfg.use_ssl,
connect_timeout = cfg.timeout,
)
with ldap3.Connection(server, user=cfg.bind_dn, password=cfg.bind_password) as conn:
bound = conn.bound
return {
"success": bound,
"server_url": cfg.server_url,
"bind_dn": cfg.bind_dn,
"reason": "연결 성공" if bound else "바인드 실패",
}
except Exception as e:
return {"success": False, "reason": str(e)[:100]}