- itsm/ -> workspace/guardia-itsm/ - manager/ -> workspace/guardia-manager/ - app/ -> workspace/guardia-messenger/ - manual/ -> workspace/guardia-docs/ workspace/zioinfo-web/ unchanged. git mv preserves full commit history. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
334 lines
12 KiB
Python
334 lines
12 KiB
Python
"""
|
|
D-1: LDAP/AD 연동 API 라우터
|
|
|
|
엔드포인트:
|
|
GET /api/ldap/status — LDAP 연결 상태 조회
|
|
POST /api/ldap/test — LDAP 연결 테스트 (ping)
|
|
POST /api/ldap/authenticate — LDAP 인증 테스트 (관리자용)
|
|
GET /api/ldap/config — 현재 LDAP 설정 조회 (민감정보 마스킹)
|
|
PUT /api/ldap/config — LDAP 설정 업데이트
|
|
GET /api/ldap/group-map — 그룹→역할 매핑 조회
|
|
PUT /api/ldap/group-map — 그룹→역할 매핑 업데이트
|
|
POST /api/ldap/sync/{username} — 특정 사용자 LDAP 동기화
|
|
GET /api/ldap/users — LDAP 연동 사용자 목록
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
from typing import Dict, Optional
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException
|
|
from pydantic import BaseModel
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from core.auth import get_current_user
|
|
from database import get_db
|
|
from models import User, UserRole
|
|
|
|
logger = logging.getLogger(__name__)
|
|
router = APIRouter(prefix="/api/ldap", tags=["ldap"])
|
|
|
|
|
|
# ── Pydantic 스키마 ──────────────────────────────────────────────────────────
|
|
|
|
class LDAPConfigIn(BaseModel):
|
|
"""LDAP 설정 업데이트 요청."""
|
|
server_url: Optional[str] = None
|
|
base_dn: Optional[str] = None
|
|
bind_dn: Optional[str] = None
|
|
bind_password: Optional[str] = None # 입력만 허용, 출력 절대 불가
|
|
user_search_base: Optional[str] = None
|
|
group_search_base: Optional[str] = None
|
|
user_filter: Optional[str] = None
|
|
use_ssl: Optional[bool] = None
|
|
use_tls: Optional[bool] = None
|
|
timeout: Optional[int] = None
|
|
enabled: Optional[bool] = None
|
|
|
|
|
|
class LDAPAuthTestIn(BaseModel):
|
|
"""LDAP 인증 테스트 요청."""
|
|
username: str
|
|
password: str
|
|
|
|
|
|
class GroupRoleMapIn(BaseModel):
|
|
"""그룹→역할 매핑 업데이트."""
|
|
mapping: Dict[str, str] # {"GroupName": "ADMIN|PM|ENGINEER|VIEWER"}
|
|
|
|
|
|
# ── 임시 설정 저장소 (실운영에서는 암호화 DB 컬럼 사용) ────────────────────────────
|
|
_runtime_override: Dict = {}
|
|
|
|
|
|
def _get_masked_config() -> Dict:
|
|
"""LDAP 설정 반환 (bind_password 마스킹)."""
|
|
from core.ldap_auth import get_ldap_config
|
|
cfg = get_ldap_config()
|
|
return {
|
|
"server_url": cfg.server_url,
|
|
"base_dn": cfg.base_dn,
|
|
"bind_dn": cfg.bind_dn,
|
|
"bind_password": "***" if cfg.bind_password else "", # 절대 노출 금지
|
|
"user_search_base": cfg.user_search_base,
|
|
"group_search_base": cfg.group_search_base,
|
|
"user_filter": cfg.user_filter,
|
|
"group_attr": cfg.group_attr,
|
|
"uid_attr": cfg.uid_attr,
|
|
"email_attr": cfg.email_attr,
|
|
"display_name_attr": cfg.display_name_attr,
|
|
"department_attr": cfg.department_attr,
|
|
"use_ssl": cfg.use_ssl,
|
|
"use_tls": cfg.use_tls,
|
|
"timeout": cfg.timeout,
|
|
"enabled": cfg.enabled,
|
|
}
|
|
|
|
|
|
# ── 엔드포인트 ───────────────────────────────────────────────────────────────
|
|
|
|
@router.get("/status")
|
|
async def ldap_status(
|
|
_u: User = Depends(get_current_user),
|
|
):
|
|
"""LDAP 연결 상태 및 설정 요약."""
|
|
from core.ldap_auth import get_ldap_config
|
|
cfg = get_ldap_config()
|
|
return {
|
|
"enabled": cfg.enabled,
|
|
"server_url": cfg.server_url if cfg.server_url else "(미설정)",
|
|
"base_dn": cfg.base_dn if cfg.base_dn else "(미설정)",
|
|
"bind_dn": cfg.bind_dn if cfg.bind_dn else "(미설정)",
|
|
"use_ssl": cfg.use_ssl,
|
|
"use_tls": cfg.use_tls,
|
|
"configured": bool(cfg.server_url and cfg.bind_dn),
|
|
}
|
|
|
|
|
|
@router.post("/test")
|
|
async def test_connection(
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""LDAP 서비스 계정으로 연결 테스트."""
|
|
if current_user.role not in (UserRole.ADMIN,):
|
|
raise HTTPException(403, "ADMIN 권한이 필요합니다.")
|
|
|
|
from core.ldap_auth import test_ldap_connection
|
|
result = test_ldap_connection()
|
|
# bind_dn만 노출, bind_password 절대 미포함
|
|
result.pop("bind_password", None)
|
|
return result
|
|
|
|
|
|
@router.post("/authenticate")
|
|
async def test_authenticate(
|
|
body: LDAPAuthTestIn,
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""LDAP 사용자 인증 테스트 (관리자용)."""
|
|
if current_user.role not in (UserRole.ADMIN,):
|
|
raise HTTPException(403, "ADMIN 권한이 필요합니다.")
|
|
|
|
from core.ldap_auth import authenticate_ldap
|
|
success, user_info, error = authenticate_ldap(body.username, body.password)
|
|
|
|
if not success:
|
|
return {"success": False, "error": error}
|
|
|
|
# 민감정보 제외하고 반환
|
|
safe_info = {k: v for k, v in (user_info or {}).items() if k != "dn"}
|
|
return {"success": True, "user_info": safe_info}
|
|
|
|
|
|
@router.get("/config")
|
|
async def get_config(
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""현재 LDAP 설정 조회 (비밀번호 마스킹)."""
|
|
if current_user.role not in (UserRole.ADMIN,):
|
|
raise HTTPException(403, "ADMIN 권한이 필요합니다.")
|
|
return _get_masked_config()
|
|
|
|
|
|
@router.put("/config")
|
|
async def update_config(
|
|
body: LDAPConfigIn,
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""LDAP 설정 업데이트 (환경변수 재적용)."""
|
|
if current_user.role not in (UserRole.ADMIN,):
|
|
raise HTTPException(403, "ADMIN 권한이 필요합니다.")
|
|
|
|
from core.ldap_auth import get_ldap_config, init_ldap_config, LDAPConfig
|
|
import dataclasses
|
|
|
|
cfg = get_ldap_config()
|
|
cfg_dict = dataclasses.asdict(cfg)
|
|
|
|
# 요청값으로 덮어쓰기 (비밀번호는 제공된 경우만)
|
|
update_data = body.model_dump(exclude_none=True)
|
|
cfg_dict.update(update_data)
|
|
|
|
new_cfg = LDAPConfig(**cfg_dict)
|
|
init_ldap_config(new_cfg)
|
|
|
|
logger.info("LDAP 설정 업데이트 by %s", current_user.username)
|
|
# 비밀번호 마스킹 후 반환
|
|
result = _get_masked_config()
|
|
return {"message": "LDAP 설정이 업데이트되었습니다.", "config": result}
|
|
|
|
|
|
@router.get("/group-map")
|
|
async def get_group_map(
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""그룹→역할 매핑 조회."""
|
|
if current_user.role not in (UserRole.ADMIN, UserRole.PM):
|
|
raise HTTPException(403, "PM/ADMIN 권한이 필요합니다.")
|
|
|
|
from core.ldap_auth import _group_role_map, DEFAULT_GROUP_ROLE_MAP
|
|
return {
|
|
"current_mapping": dict(_group_role_map),
|
|
"default_mapping": DEFAULT_GROUP_ROLE_MAP,
|
|
}
|
|
|
|
|
|
@router.put("/group-map")
|
|
async def update_group_map(
|
|
body: GroupRoleMapIn,
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""그룹→역할 매핑 업데이트."""
|
|
if current_user.role not in (UserRole.ADMIN,):
|
|
raise HTTPException(403, "ADMIN 권한이 필요합니다.")
|
|
|
|
valid_roles = {"ADMIN", "PM", "ENGINEER", "VIEWER"}
|
|
for grp, role in body.mapping.items():
|
|
if role.upper() not in valid_roles:
|
|
raise HTTPException(400, f"유효하지 않은 역할: {role}. 허용값: {valid_roles}")
|
|
|
|
from core.ldap_auth import set_group_role_map
|
|
normalized = {k: v.upper() for k, v in body.mapping.items()}
|
|
set_group_role_map(normalized)
|
|
|
|
logger.info("그룹→역할 매핑 업데이트 by %s: %d 개", current_user.username, len(normalized))
|
|
return {"message": f"{len(normalized)}개 그룹 매핑이 업데이트되었습니다.", "mapping": normalized}
|
|
|
|
|
|
@router.post("/sync/{username}")
|
|
async def sync_user(
|
|
username: str,
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""특정 사용자를 LDAP에서 조회하여 로컬 DB와 동기화."""
|
|
if current_user.role not in (UserRole.ADMIN,):
|
|
raise HTTPException(403, "ADMIN 권한이 필요합니다.")
|
|
|
|
from core.ldap_auth import get_ldap_config, authenticate_ldap
|
|
cfg = get_ldap_config()
|
|
|
|
if not cfg.enabled:
|
|
raise HTTPException(503, "LDAP가 비활성화 상태입니다.")
|
|
|
|
# 서비스 계정으로 사용자 정보만 조회 (인증 없이)
|
|
# authenticate_ldap은 인증도 포함하므로 여기서는 직접 조회 로직 구현
|
|
ldap3 = None
|
|
try:
|
|
import ldap3 as _ldap3 # type: ignore
|
|
ldap3 = _ldap3
|
|
except ImportError:
|
|
raise HTTPException(503, "ldap3 패키지가 설치되지 않았습니다.")
|
|
|
|
try:
|
|
server = ldap3.Server(cfg.server_url, use_ssl=cfg.use_ssl, connect_timeout=cfg.timeout)
|
|
with ldap3.Connection(server, user=cfg.bind_dn, password=cfg.bind_password,
|
|
auto_bind=ldap3.AUTO_BIND_NO_TLS) as conn:
|
|
if cfg.use_tls:
|
|
conn.start_tls()
|
|
user_filter = cfg.user_filter.format(username=username)
|
|
attrs = [cfg.uid_attr, cfg.email_attr, cfg.display_name_attr,
|
|
cfg.department_attr, cfg.group_attr]
|
|
conn.search(
|
|
search_base=cfg.user_search_base or cfg.base_dn,
|
|
search_filter=user_filter,
|
|
attributes=attrs,
|
|
)
|
|
if not conn.entries:
|
|
raise HTTPException(404, f"LDAP에서 사용자 '{username}'를 찾을 수 없습니다.")
|
|
|
|
entry = conn.entries[0]
|
|
groups_raw = []
|
|
if hasattr(entry, cfg.group_attr):
|
|
groups_raw = [str(g) for g in getattr(entry, cfg.group_attr)]
|
|
|
|
from core.ldap_auth import map_groups_to_role
|
|
groups = []
|
|
for g in groups_raw:
|
|
if g.startswith("CN="):
|
|
cn = g.split(",")[0].replace("CN=", "")
|
|
groups.append(cn)
|
|
else:
|
|
groups.append(g)
|
|
|
|
role = map_groups_to_role(groups)
|
|
ldap_info = {
|
|
"username": str(getattr(entry, cfg.uid_attr, username)),
|
|
"email": str(getattr(entry, cfg.email_attr, "")) or None,
|
|
"display_name": str(getattr(entry, cfg.display_name_attr, username)),
|
|
"department": str(getattr(entry, cfg.department_attr, "")) or None,
|
|
"groups": groups,
|
|
"role": role,
|
|
"dn": entry.entry_dn,
|
|
}
|
|
|
|
from core.ldap_auth import sync_ldap_user
|
|
user = await sync_ldap_user(db, username, ldap_info)
|
|
if user is None:
|
|
raise HTTPException(500, "사용자 동기화 중 오류가 발생했습니다.")
|
|
|
|
return {
|
|
"synced": True,
|
|
"username": user.username,
|
|
"email": user.email,
|
|
"role": user.role.value if hasattr(user.role, "value") else str(user.role),
|
|
"auth_type": getattr(user, "auth_type", "LDAP"),
|
|
"groups": groups,
|
|
}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error("LDAP 사용자 동기화 실패 (%s): %s", username, str(e)[:100])
|
|
raise HTTPException(500, f"LDAP 조회 오류: {str(e)[:80]}")
|
|
|
|
|
|
@router.get("/users")
|
|
async def list_ldap_users(
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""LDAP 인증 사용자 목록 조회."""
|
|
if current_user.role not in (UserRole.ADMIN, UserRole.PM):
|
|
raise HTTPException(403, "PM/ADMIN 권한이 필요합니다.")
|
|
|
|
q = select(User).where(
|
|
User.auth_type == "LDAP" # type: ignore[attr-defined]
|
|
).order_by(User.username)
|
|
|
|
users = (await db.execute(q)).scalars().all()
|
|
return [
|
|
{
|
|
"username": u.username,
|
|
"email": u.email,
|
|
"display_name": getattr(u, "display_name", None),
|
|
"department": getattr(u, "department", None),
|
|
"role": u.role.value if hasattr(u.role, "value") else str(u.role),
|
|
"is_active": u.is_active,
|
|
"auth_type": getattr(u, "auth_type", "LDAP"),
|
|
}
|
|
for u in users
|
|
]
|