zioinfo-mail/itsm/routers/ldap.py
DESKTOP-TKLFCPR\ython e228faabf5 feat(itsm): G-1~G-12 확장 기능 + 하네스/봇/설치스크립트 구현
G-1: 메신저 Webhook Relay + _send_to_room 실제 httpx 호출 구현
G-2: POST /api/tasks/bulk SR 대량작업 엔드포인트 (최대 100건)
G-3: 라이선스 만료 알림 스케줄러 (매일 09:00 KST)
G-4: 체험판 upgrade_banner 필드 + license.py 배너 로직
G-5: core/auto_rca.py + incidents/problem auto-rca 엔드포인트
G-6: core/deploy_impact.py + vibe impact-analysis 엔드포인트
G-7: core/ticket_classifier.py + SR 생성 시 AI 분류 + ai-suggestion API
G-8: VulnPatchRecord 모델 + vuln_scan 패치추적 4개 엔드포인트
G-9: core/jira_sync.py + gateway Jira/Confluence 연동 엔드포인트
G-10: core/push_notify.py + routers/push.py + PushSubscription 모델
G-11: approvals 다중승인 (위임/서명/기한초과/마감연장)
G-12: alembic.ini + migrations/ + cicd/migrate_to_postgres.sh

하네스: guardia-orchestrator 확장기능 Phase 반영
봇명령어: /sr /status /license /bulk 슬래시 명령어 추가
설치스크립트: setup/ (Ubuntu, CentOS, RHEL, Windows) --test 옵션 포함

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-29 18:18:52 +09:00

334 lines
12 KiB
Python

"""
D-1: LDAP/AD 연동 API 라우터
엔드포인트:
GET /api/ldap/status — LDAP 연결 상태 조회
POST /api/ldap/test — LDAP 연결 테스트 (ping)
POST /api/ldap/authenticate — LDAP 인증 테스트 (관리자용)
GET /api/ldap/config — 현재 LDAP 설정 조회 (민감정보 마스킹)
PUT /api/ldap/config — LDAP 설정 업데이트
GET /api/ldap/group-map — 그룹→역할 매핑 조회
PUT /api/ldap/group-map — 그룹→역할 매핑 업데이트
POST /api/ldap/sync/{username} — 특정 사용자 LDAP 동기화
GET /api/ldap/users — LDAP 연동 사용자 목록
"""
from __future__ import annotations
import logging
import os
from typing import Dict, Optional
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
from database import get_db
from models import User, UserRole
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/ldap", tags=["ldap"])
# ── Pydantic 스키마 ──────────────────────────────────────────────────────────
class LDAPConfigIn(BaseModel):
"""LDAP 설정 업데이트 요청."""
server_url: Optional[str] = None
base_dn: Optional[str] = None
bind_dn: Optional[str] = None
bind_password: Optional[str] = None # 입력만 허용, 출력 절대 불가
user_search_base: Optional[str] = None
group_search_base: Optional[str] = None
user_filter: Optional[str] = None
use_ssl: Optional[bool] = None
use_tls: Optional[bool] = None
timeout: Optional[int] = None
enabled: Optional[bool] = None
class LDAPAuthTestIn(BaseModel):
"""LDAP 인증 테스트 요청."""
username: str
password: str
class GroupRoleMapIn(BaseModel):
"""그룹→역할 매핑 업데이트."""
mapping: Dict[str, str] # {"GroupName": "ADMIN|PM|ENGINEER|VIEWER"}
# ── 임시 설정 저장소 (실운영에서는 암호화 DB 컬럼 사용) ────────────────────────────
_runtime_override: Dict = {}
def _get_masked_config() -> Dict:
"""LDAP 설정 반환 (bind_password 마스킹)."""
from core.ldap_auth import get_ldap_config
cfg = get_ldap_config()
return {
"server_url": cfg.server_url,
"base_dn": cfg.base_dn,
"bind_dn": cfg.bind_dn,
"bind_password": "***" if cfg.bind_password else "", # 절대 노출 금지
"user_search_base": cfg.user_search_base,
"group_search_base": cfg.group_search_base,
"user_filter": cfg.user_filter,
"group_attr": cfg.group_attr,
"uid_attr": cfg.uid_attr,
"email_attr": cfg.email_attr,
"display_name_attr": cfg.display_name_attr,
"department_attr": cfg.department_attr,
"use_ssl": cfg.use_ssl,
"use_tls": cfg.use_tls,
"timeout": cfg.timeout,
"enabled": cfg.enabled,
}
# ── 엔드포인트 ───────────────────────────────────────────────────────────────
@router.get("/status")
async def ldap_status(
_u: User = Depends(get_current_user),
):
"""LDAP 연결 상태 및 설정 요약."""
from core.ldap_auth import get_ldap_config
cfg = get_ldap_config()
return {
"enabled": cfg.enabled,
"server_url": cfg.server_url if cfg.server_url else "(미설정)",
"base_dn": cfg.base_dn if cfg.base_dn else "(미설정)",
"bind_dn": cfg.bind_dn if cfg.bind_dn else "(미설정)",
"use_ssl": cfg.use_ssl,
"use_tls": cfg.use_tls,
"configured": bool(cfg.server_url and cfg.bind_dn),
}
@router.post("/test")
async def test_connection(
current_user: User = Depends(get_current_user),
):
"""LDAP 서비스 계정으로 연결 테스트."""
if current_user.role not in (UserRole.ADMIN,):
raise HTTPException(403, "ADMIN 권한이 필요합니다.")
from core.ldap_auth import test_ldap_connection
result = test_ldap_connection()
# bind_dn만 노출, bind_password 절대 미포함
result.pop("bind_password", None)
return result
@router.post("/authenticate")
async def test_authenticate(
body: LDAPAuthTestIn,
current_user: User = Depends(get_current_user),
):
"""LDAP 사용자 인증 테스트 (관리자용)."""
if current_user.role not in (UserRole.ADMIN,):
raise HTTPException(403, "ADMIN 권한이 필요합니다.")
from core.ldap_auth import authenticate_ldap
success, user_info, error = authenticate_ldap(body.username, body.password)
if not success:
return {"success": False, "error": error}
# 민감정보 제외하고 반환
safe_info = {k: v for k, v in (user_info or {}).items() if k != "dn"}
return {"success": True, "user_info": safe_info}
@router.get("/config")
async def get_config(
current_user: User = Depends(get_current_user),
):
"""현재 LDAP 설정 조회 (비밀번호 마스킹)."""
if current_user.role not in (UserRole.ADMIN,):
raise HTTPException(403, "ADMIN 권한이 필요합니다.")
return _get_masked_config()
@router.put("/config")
async def update_config(
body: LDAPConfigIn,
current_user: User = Depends(get_current_user),
):
"""LDAP 설정 업데이트 (환경변수 재적용)."""
if current_user.role not in (UserRole.ADMIN,):
raise HTTPException(403, "ADMIN 권한이 필요합니다.")
from core.ldap_auth import get_ldap_config, init_ldap_config, LDAPConfig
import dataclasses
cfg = get_ldap_config()
cfg_dict = dataclasses.asdict(cfg)
# 요청값으로 덮어쓰기 (비밀번호는 제공된 경우만)
update_data = body.model_dump(exclude_none=True)
cfg_dict.update(update_data)
new_cfg = LDAPConfig(**cfg_dict)
init_ldap_config(new_cfg)
logger.info("LDAP 설정 업데이트 by %s", current_user.username)
# 비밀번호 마스킹 후 반환
result = _get_masked_config()
return {"message": "LDAP 설정이 업데이트되었습니다.", "config": result}
@router.get("/group-map")
async def get_group_map(
current_user: User = Depends(get_current_user),
):
"""그룹→역할 매핑 조회."""
if current_user.role not in (UserRole.ADMIN, UserRole.PM):
raise HTTPException(403, "PM/ADMIN 권한이 필요합니다.")
from core.ldap_auth import _group_role_map, DEFAULT_GROUP_ROLE_MAP
return {
"current_mapping": dict(_group_role_map),
"default_mapping": DEFAULT_GROUP_ROLE_MAP,
}
@router.put("/group-map")
async def update_group_map(
body: GroupRoleMapIn,
current_user: User = Depends(get_current_user),
):
"""그룹→역할 매핑 업데이트."""
if current_user.role not in (UserRole.ADMIN,):
raise HTTPException(403, "ADMIN 권한이 필요합니다.")
valid_roles = {"ADMIN", "PM", "ENGINEER", "VIEWER"}
for grp, role in body.mapping.items():
if role.upper() not in valid_roles:
raise HTTPException(400, f"유효하지 않은 역할: {role}. 허용값: {valid_roles}")
from core.ldap_auth import set_group_role_map
normalized = {k: v.upper() for k, v in body.mapping.items()}
set_group_role_map(normalized)
logger.info("그룹→역할 매핑 업데이트 by %s: %d", current_user.username, len(normalized))
return {"message": f"{len(normalized)}개 그룹 매핑이 업데이트되었습니다.", "mapping": normalized}
@router.post("/sync/{username}")
async def sync_user(
username: str,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""특정 사용자를 LDAP에서 조회하여 로컬 DB와 동기화."""
if current_user.role not in (UserRole.ADMIN,):
raise HTTPException(403, "ADMIN 권한이 필요합니다.")
from core.ldap_auth import get_ldap_config, authenticate_ldap
cfg = get_ldap_config()
if not cfg.enabled:
raise HTTPException(503, "LDAP가 비활성화 상태입니다.")
# 서비스 계정으로 사용자 정보만 조회 (인증 없이)
# authenticate_ldap은 인증도 포함하므로 여기서는 직접 조회 로직 구현
ldap3 = None
try:
import ldap3 as _ldap3 # type: ignore
ldap3 = _ldap3
except ImportError:
raise HTTPException(503, "ldap3 패키지가 설치되지 않았습니다.")
try:
server = ldap3.Server(cfg.server_url, use_ssl=cfg.use_ssl, connect_timeout=cfg.timeout)
with ldap3.Connection(server, user=cfg.bind_dn, password=cfg.bind_password,
auto_bind=ldap3.AUTO_BIND_NO_TLS) as conn:
if cfg.use_tls:
conn.start_tls()
user_filter = cfg.user_filter.format(username=username)
attrs = [cfg.uid_attr, cfg.email_attr, cfg.display_name_attr,
cfg.department_attr, cfg.group_attr]
conn.search(
search_base=cfg.user_search_base or cfg.base_dn,
search_filter=user_filter,
attributes=attrs,
)
if not conn.entries:
raise HTTPException(404, f"LDAP에서 사용자 '{username}'를 찾을 수 없습니다.")
entry = conn.entries[0]
groups_raw = []
if hasattr(entry, cfg.group_attr):
groups_raw = [str(g) for g in getattr(entry, cfg.group_attr)]
from core.ldap_auth import map_groups_to_role
groups = []
for g in groups_raw:
if g.startswith("CN="):
cn = g.split(",")[0].replace("CN=", "")
groups.append(cn)
else:
groups.append(g)
role = map_groups_to_role(groups)
ldap_info = {
"username": str(getattr(entry, cfg.uid_attr, username)),
"email": str(getattr(entry, cfg.email_attr, "")) or None,
"display_name": str(getattr(entry, cfg.display_name_attr, username)),
"department": str(getattr(entry, cfg.department_attr, "")) or None,
"groups": groups,
"role": role,
"dn": entry.entry_dn,
}
from core.ldap_auth import sync_ldap_user
user = await sync_ldap_user(db, username, ldap_info)
if user is None:
raise HTTPException(500, "사용자 동기화 중 오류가 발생했습니다.")
return {
"synced": True,
"username": user.username,
"email": user.email,
"role": user.role.value if hasattr(user.role, "value") else str(user.role),
"auth_type": getattr(user, "auth_type", "LDAP"),
"groups": groups,
}
except HTTPException:
raise
except Exception as e:
logger.error("LDAP 사용자 동기화 실패 (%s): %s", username, str(e)[:100])
raise HTTPException(500, f"LDAP 조회 오류: {str(e)[:80]}")
@router.get("/users")
async def list_ldap_users(
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""LDAP 인증 사용자 목록 조회."""
if current_user.role not in (UserRole.ADMIN, UserRole.PM):
raise HTTPException(403, "PM/ADMIN 권한이 필요합니다.")
q = select(User).where(
User.auth_type == "LDAP" # type: ignore[attr-defined]
).order_by(User.username)
users = (await db.execute(q)).scalars().all()
return [
{
"username": u.username,
"email": u.email,
"display_name": getattr(u, "display_name", None),
"department": getattr(u, "department", None),
"role": u.role.value if hasattr(u.role, "value") else str(u.role),
"is_active": u.is_active,
"auth_type": getattr(u, "auth_type", "LDAP"),
}
for u in users
]