guardia-itsm/core/ratelimit.py
DESKTOP-TKLFCPRython 64c27c3509 feat(itsm): G-1~G-12 확장 기능 + 하네스/봇/설치스크립트 구현
G-1: 메신저 Webhook Relay + _send_to_room 실제 httpx 호출 구현
G-2: POST /api/tasks/bulk SR 대량작업 엔드포인트 (최대 100건)
G-3: 라이선스 만료 알림 스케줄러 (매일 09:00 KST)
G-4: 체험판 upgrade_banner 필드 + license.py 배너 로직
G-5: core/auto_rca.py + incidents/problem auto-rca 엔드포인트
G-6: core/deploy_impact.py + vibe impact-analysis 엔드포인트
G-7: core/ticket_classifier.py + SR 생성 시 AI 분류 + ai-suggestion API
G-8: VulnPatchRecord 모델 + vuln_scan 패치추적 4개 엔드포인트
G-9: core/jira_sync.py + gateway Jira/Confluence 연동 엔드포인트
G-10: core/push_notify.py + routers/push.py + PushSubscription 모델
G-11: approvals 다중승인 (위임/서명/기한초과/마감연장)
G-12: alembic.ini + migrations/ + cicd/migrate_to_postgres.sh

하네스: guardia-orchestrator 확장기능 Phase 반영
봇명령어: /sr /status /license /bulk 슬래시 명령어 추가
설치스크립트: setup/ (Ubuntu, CentOS, RHEL, Windows) --test 옵션 포함

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-29 18:18:52 +09:00

177 lines
6.4 KiB
Python

"""
GUARDiA ITSM — API Rate Limiting (Enhancement F-3)
기능:
1. SlowAPI 기반 FastAPI 미들웨어 통합
2. IP 기반 기본 제한 + JWT 사용자 식별 제한
3. 엔드포인트별 차등 제한 설정
4. Redis 백엔드 (미설치 시 메모리 폴백)
5. X-RateLimit-* 응답 헤더 자동 추가
6. 429 Too Many Requests 시 재시도 안내
제한 설정:
- 기본: 60 req/min per IP
- 로그인: 10 req/min per IP (brute-force 방지)
- 일반 API: 120 req/min per user
- AI/LLM 엔드포인트: 10 req/min per user (비용 보호)
- 파일 업로드: 20 req/min per user
사용 예::
from core.ratelimit import limiter, DEFAULT_LIMIT
@router.get("/items")
@limiter.limit(DEFAULT_LIMIT)
async def list_items(request: Request, ...):
...
"""
from __future__ import annotations
import logging
import os
from typing import Callable, Optional
logger = logging.getLogger(__name__)
# ── 제한 상수 ─────────────────────────────────────────────────────────────────
DEFAULT_LIMIT = "120/minute" # 일반 API (인증된 사용자)
STRICT_LIMIT = "60/minute" # 기본 IP 제한
LOGIN_LIMIT = "10/minute" # 로그인 엔드포인트
AI_LIMIT = "10/minute" # LLM/AI 엔드포인트
UPLOAD_LIMIT = "20/minute" # 파일 업로드
ADMIN_LIMIT = "300/minute" # ADMIN 사용자 (완화)
# ── SlowAPI 초기화 ────────────────────────────────────────────────────────────
_limiter: Optional[object] = None
def _get_real_ip(request: object) -> str:
"""
클라이언트 실제 IP 추출.
리버스 프록시(X-Forwarded-For) 우선, 없으면 직접 IP.
"""
forwarded = getattr(getattr(request, "headers", None), "get", lambda k: None)("X-Forwarded-For")
if forwarded:
return forwarded.split(",")[0].strip()
client = getattr(request, "client", None)
if client:
return getattr(client, "host", "unknown")
return "unknown"
def _get_user_key(request: object) -> str:
"""
Rate limit 키 결정.
Authorization 헤더에서 JWT sub(사용자명)을 추출하여 사용.
없으면 IP 주소로 폴백.
"""
auth = getattr(getattr(request, "headers", None), "get", lambda k: None)("Authorization")
if auth and auth.startswith("Bearer "):
try:
from jose import jwt
from core.auth import SECRET_KEY, ALGORITHM
payload = jwt.decode(auth[7:], SECRET_KEY, algorithms=[ALGORITHM])
sub = payload.get("sub")
if sub:
return f"user:{sub}"
except Exception:
pass
return f"ip:{_get_real_ip(request)}"
def create_limiter():
"""SlowAPI 리미터 생성. slowapi 미설치 시 더미 리미터 반환."""
global _limiter
if _limiter is not None:
return _limiter
try:
from slowapi import Limiter
from slowapi.util import get_remote_address
# Redis 사용 시 slowapi storage URL 설정
redis_url = os.environ.get("REDIS_URL", "")
storage_uri = redis_url if redis_url and "redis" in redis_url else "memory://"
_limiter = Limiter(
key_func=_get_user_key,
default_limits=[STRICT_LIMIT],
storage_uri=storage_uri,
strategy="fixed-window",
)
logger.info("Rate limiter 초기화: storage=%s", storage_uri)
return _limiter
except ImportError:
logger.info("slowapi 미설치 — Rate limiting 비활성화 (더미 리미터 사용)")
_limiter = _DummyLimiter()
return _limiter
class _DummyLimiter:
"""slowapi 미설치 시 사용하는 no-op 리미터."""
def limit(self, limit_str: str, *args, **kwargs):
"""데코레이터 호출 시 그냥 원본 함수 반환."""
def decorator(func: Callable) -> Callable:
return func
return decorator
def shared_limit(self, *args, **kwargs):
def decorator(func: Callable) -> Callable:
return func
return decorator
# ── 싱글톤 접근 ───────────────────────────────────────────────────────────────
limiter = create_limiter()
# ── FastAPI 앱 통합 헬퍼 ──────────────────────────────────────────────────────
def setup_rate_limiting(app) -> None:
"""
FastAPI 앱에 SlowAPI 미들웨어 및 예외 핸들러 등록.
main.py 의 앱 생성 직후 호출::
from core.ratelimit import setup_rate_limiting
setup_rate_limiting(app)
"""
try:
from slowapi import _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.middleware import SlowAPIMiddleware
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
app.add_middleware(SlowAPIMiddleware)
logger.info("SlowAPI Rate Limiting 미들웨어 등록 완료")
except ImportError:
logger.info("slowapi 미설치 — Rate Limiting 미들웨어 스킵")
except Exception as exc:
logger.warning("Rate Limiting 설정 실패 (무시): %s", exc)
# ── 관리자 API: 현재 제한 상태 조회 ──────────────────────────────────────────
async def get_rate_limit_status(user_key: str) -> dict:
"""특정 사용자/IP의 현재 Rate Limit 상태 조회 (관리자 전용)."""
try:
from slowapi import Limiter
if isinstance(limiter, _DummyLimiter):
return {"status": "disabled", "message": "slowapi 미설치"}
return {
"status": "enabled",
"key": user_key,
"limits": {
"default": DEFAULT_LIMIT,
"login": LOGIN_LIMIT,
"ai": AI_LIMIT,
"upload": UPLOAD_LIMIT,
},
}
except Exception:
return {"status": "unknown"}