""" 테넌트 셀프서비스 포털 — 기관 관리자가 직접 설정 관리 기능: - 기관 관리자가 직접 사용자 등록/삭제/역할 변경 - 서버 자산 셀프 등록 - 알림 수신자·임계값 설정 - 비밀번호 정책 설정 - 기관 정보 조회 및 수정 - 사용량 현황 (쿼터 대비 사용률) 엔드포인트: GET /api/portal/me — 내 기관 정보 요약 GET /api/portal/users — 기관 내 사용자 목록 POST /api/portal/users — 사용자 초대/등록 PUT /api/portal/users/{id}/role — 역할 변경 DELETE /api/portal/users/{id} — 사용자 비활성화 GET /api/portal/quota — 쿼터 사용량 PUT /api/portal/settings — 기관 알림·정책 설정 GET /api/portal/activity — 최근 활동 로그 """ from __future__ import annotations import logging import secrets from datetime import datetime from typing import List, Optional from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel, Field from sqlalchemy import select, func, desc from sqlalchemy.ext.asyncio import AsyncSession from core.auth import get_current_user, require_admin_role from database import get_db from models import User, UserRole, AuditLog, Server, SRRequest logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/portal", tags=["Tenant Portal"]) # ── Pydantic 스키마 ────────────────────────────────────────────────────────── class PortalUserInvite(BaseModel): name: str = Field(..., max_length=100) email: str = Field(..., pattern=r'^[^@]+@[^@]+\.[^@]+$') role: UserRole = UserRole.ENGINEER department: Optional[str] = None class PortalUserOut(BaseModel): id: int name: str email: str role: str department: Optional[str] is_active: bool last_login_at: Optional[datetime] created_at: datetime class RoleUpdateRequest(BaseModel): role: UserRole class PortalSettings(BaseModel): notify_emails: List[str] = [] sr_alert_threshold: int = Field(10, ge=1, description="SR 적체 경고 임계값") sla_breach_alert: bool = True incident_notify: bool = True weekly_report_email: Optional[str] = None password_min_length: int = Field(8, ge=6, le=32) password_expires_days: int = Field(90, ge=30, le=365) mfa_required: bool = False class QuotaInfo(BaseModel): plan: str servers_used: int servers_limit: int users_used: int users_limit: int sr_this_month: int storage_mb: int # ── 엔드포인트 ─────────────────────────────────────────────────────────────── @router.get("/me") async def my_tenant_info( db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """내 기관 정보 + 현황 요약.""" # 사용자 수 user_count = await db.execute( select(func.count(User.id)).where( User.tenant_id == user.tenant_id, User.is_active == True, ) ) # 서버 수 server_count = await db.execute( select(func.count(Server.id)).where(Server.institution_id == user.tenant_id) ) # 이번 달 SR 수 from datetime import date month_start = date.today().replace(day=1) sr_count = await db.execute( select(func.count(SRRequest.id)).where( SRRequest.created_at >= month_start ) ) # 미처리 SR open_sr = await db.execute( select(func.count(SRRequest.id)).where( SRRequest.status.in_(["OPEN", "IN_PROGRESS"]) ) ) return { "tenant_id": user.tenant_id, "organization": getattr(user, "institution_name", "기관명 미설정"), "plan": "STANDARD", # 실제: subscription 테이블 참조 "stats": { "users": user_count.scalar() or 0, "servers": server_count.scalar() or 0, "sr_this_month": sr_count.scalar() or 0, "open_sr": open_sr.scalar() or 0, }, "my_role": user.role.value if hasattr(user.role, 'value') else str(user.role), } @router.get("/users", response_model=List[PortalUserOut]) async def list_portal_users( db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """기관 내 사용자 목록.""" rows = await db.execute( select(User).where( User.tenant_id == user.tenant_id, ).order_by(desc(User.created_at)) ) users = rows.scalars().all() return [ PortalUserOut( id=u.id, name=u.name, email=u.email, role=u.role.value if hasattr(u.role, 'value') else str(u.role), department=getattr(u, 'department', None), is_active=u.is_active, last_login_at=getattr(u, 'last_login_at', None), created_at=u.created_at, ) for u in users ] @router.post("/users") async def invite_user( req: PortalUserInvite, db: AsyncSession = Depends(get_db), user: User = Depends(require_admin_role), ): """사용자 초대 (기관 관리자 전용).""" # 중복 이메일 확인 existing = await db.execute(select(User).where(User.email == req.email)) if existing.scalar_one_or_none(): raise HTTPException(409, "이미 등록된 이메일입니다") # 임시 비밀번호 생성 temp_pw = secrets.token_urlsafe(12) from passlib.context import CryptContext pwd_ctx = CryptContext(schemes=["bcrypt"], deprecated="auto") new_user = User( name=req.name, email=req.email, hashed_password=pwd_ctx.hash(temp_pw), role=req.role, tenant_id=user.tenant_id, is_active=True, created_at=datetime.utcnow(), ) db.add(new_user) # 감사 로그 log = AuditLog( user_id=user.id, action="USER_INVITED", detail=f"신규 사용자 초대: {req.email} ({req.role})", created_at=datetime.utcnow(), ) db.add(log) await db.commit() await db.refresh(new_user) logger.info(f"사용자 초대: {req.email} by admin {user.email}") return { "ok": True, "user_id": new_user.id, "temp_password": temp_pw, "message": f"{req.email}에 임시 비밀번호를 발급했습니다. 최초 로그인 시 변경 필요.", } @router.put("/users/{target_id}/role") async def update_user_role( target_id: int, req: RoleUpdateRequest, db: AsyncSession = Depends(get_db), user: User = Depends(require_admin_role), ): """사용자 역할 변경.""" target = await db.execute( select(User).where( User.id == target_id, User.tenant_id == user.tenant_id, ) ) target_user = target.scalar_one_or_none() if not target_user: raise HTTPException(404, "사용자를 찾을 수 없습니다") old_role = target_user.role target_user.role = req.role log = AuditLog( user_id=user.id, action="ROLE_CHANGED", detail=f"역할 변경: {target_user.email} {old_role} → {req.role}", created_at=datetime.utcnow(), ) db.add(log) await db.commit() return {"ok": True, "user_id": target_id, "new_role": req.role} @router.delete("/users/{target_id}") async def deactivate_user( target_id: int, db: AsyncSession = Depends(get_db), user: User = Depends(require_admin_role), ): """사용자 비활성화 (삭제 대신 비활성화로 감사 추적 유지).""" if target_id == user.id: raise HTTPException(400, "자기 자신을 비활성화할 수 없습니다") target = await db.execute( select(User).where( User.id == target_id, User.tenant_id == user.tenant_id, ) ) target_user = target.scalar_one_or_none() if not target_user: raise HTTPException(404, "사용자를 찾을 수 없습니다") target_user.is_active = False log = AuditLog( user_id=user.id, action="USER_DEACTIVATED", detail=f"사용자 비활성화: {target_user.email}", created_at=datetime.utcnow(), ) db.add(log) await db.commit() return {"ok": True} @router.get("/quota", response_model=QuotaInfo) async def get_quota( db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """현재 쿼터 사용량 조회.""" # STANDARD 플랜 기본값 (실제: subscription 테이블 참조) PLAN_LIMITS = {"STANDARD": {"servers": 200, "users": 100}} limits = PLAN_LIMITS.get("STANDARD", {"servers": 20, "users": 10}) server_used = (await db.execute( select(func.count(Server.id)).where(Server.institution_id == user.tenant_id) )).scalar() or 0 user_used = (await db.execute( select(func.count(User.id)).where( User.tenant_id == user.tenant_id, User.is_active == True ) )).scalar() or 0 from datetime import date sr_this_month = (await db.execute( select(func.count(SRRequest.id)).where( SRRequest.created_at >= date.today().replace(day=1) ) )).scalar() or 0 return QuotaInfo( plan="STANDARD", servers_used=server_used, servers_limit=limits["servers"], users_used=user_used, users_limit=limits["users"], sr_this_month=sr_this_month, storage_mb=0, # 추후 구현 ) @router.get("/activity") async def recent_activity( limit: int = 20, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """기관 내 최근 활동 로그.""" rows = await db.execute( select(AuditLog, User.name.label("actor_name")).join( User, AuditLog.user_id == User.id, isouter=True ).where( User.tenant_id == user.tenant_id, ).order_by(desc(AuditLog.created_at)).limit(limit) ) return [ { "id": row.AuditLog.id, "action": row.AuditLog.action, "detail": row.AuditLog.detail, "actor": row.actor_name, "created_at": row.AuditLog.created_at, } for row in rows.all() ]