""" GUARDiA ITSM — 캐시 레이어 (Enhancement F-2) Redis 우선, 미설치/연결 실패 시 인메모리 LRU 캐시로 자동 폴백. 기능: 1. 비동기 get/set/delete/invalidate 2. 접두어 기반 패턴 삭제 (invalidate_prefix) 3. 함수 결과 캐싱 데코레이터 (cached) 4. 캐시 통계 (히트/미스/에러 카운터) 5. 상태 조회 엔드포인트용 헬퍼 주요 캐시 키: itsm:tasks:list:{hash} — SR 목록 (30초) itsm:stats:summary — 대시보드 통계 (60초) itsm:server:list — 서버 목록 (120초) itsm:oncall:today — 오늘 당직자 (300초) """ from __future__ import annotations import asyncio import hashlib import json import logging import os import time from collections import OrderedDict from functools import wraps from typing import Any, Callable, Optional logger = logging.getLogger(__name__) # ── 설정 ───────────────────────────────────────────────────────────────────── REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379/0") CACHE_ENABLED = os.environ.get("CACHE_ENABLED", "true").lower() == "true" CACHE_KEY_PREFIX = "itsm:" _DEFAULT_TTL_SEC = 60 _MEMORY_MAX_ITEMS = 1000 # ── 통계 카운터 ─────────────────────────────────────────────────────────────── _stats = {"hits": 0, "misses": 0, "errors": 0, "sets": 0} # ═══════════════════════════════════════════════════════════════════════════════ # ── 인메모리 LRU 캐시 (Redis 폴백) ───────────────────────────────────────────── # ═══════════════════════════════════════════════════════════════════════════════ class _MemoryCache: """TTL 지원 인메모리 LRU 캐시.""" def __init__(self, maxsize: int = _MEMORY_MAX_ITEMS): self._store: OrderedDict[str, tuple[Any, float]] = OrderedDict() self._maxsize = maxsize def get(self, key: str) -> Optional[Any]: if key not in self._store: return None value, expires_at = self._store[key] if time.monotonic() > expires_at: del self._store[key] return None self._store.move_to_end(key) return value def set(self, key: str, value: Any, ttl: int = _DEFAULT_TTL_SEC) -> None: if key in self._store: self._store.move_to_end(key) self._store[key] = (value, time.monotonic() + ttl) if len(self._store) > self._maxsize: self._store.popitem(last=False) def delete(self, key: str) -> None: self._store.pop(key, None) def keys_with_prefix(self, prefix: str) -> list[str]: return [k for k in self._store if k.startswith(prefix)] def flush(self) -> int: n = len(self._store) self._store.clear() return n def info(self) -> dict: now = time.monotonic() live = sum(1 for _, (_, exp) in self._store.items() if exp > now) return {"backend": "memory", "items_total": len(self._store), "items_live": live} _mem_cache = _MemoryCache() _redis_client: Optional[Any] = None # aioredis / redis.asyncio 클라이언트 # ── Redis 연결 ──────────────────────────────────────────────────────────────── async def _get_redis(): """Redis 클라이언트 싱글톤 (연결 실패 시 None).""" global _redis_client if _redis_client is not None: return _redis_client try: import redis.asyncio as aioredis client = aioredis.from_url(REDIS_URL, decode_responses=True, socket_timeout=1.0) await client.ping() _redis_client = client logger.info("Redis 캐시 연결 성공: %s", REDIS_URL) return _redis_client except Exception as exc: logger.info("Redis 미연결, 인메모리 캐시 사용: %s", exc) return None async def close_redis() -> None: """앱 종료 시 Redis 연결 해제.""" global _redis_client if _redis_client: await _redis_client.aclose() _redis_client = None # ═══════════════════════════════════════════════════════════════════════════════ # ── 공개 API ────────────────────────────────────────────────────────────────── # ═══════════════════════════════════════════════════════════════════════════════ def _full_key(key: str) -> str: return f"{CACHE_KEY_PREFIX}{key}" async def cache_get(key: str) -> Optional[Any]: """캐시에서 값 조회. 없으면 None.""" if not CACHE_ENABLED: return None full_key = _full_key(key) try: redis = await _get_redis() if redis: raw = await redis.get(full_key) if raw is None: _stats["misses"] += 1 return None _stats["hits"] += 1 return json.loads(raw) else: val = _mem_cache.get(full_key) if val is None: _stats["misses"] += 1 else: _stats["hits"] += 1 return val except Exception as exc: _stats["errors"] += 1 logger.debug("cache_get 오류: key=%s err=%s", key, exc) return None async def cache_set(key: str, value: Any, ttl: int = _DEFAULT_TTL_SEC) -> bool: """캐시에 값 저장.""" if not CACHE_ENABLED: return False full_key = _full_key(key) try: redis = await _get_redis() if redis: await redis.setex(full_key, ttl, json.dumps(value, default=str)) else: _mem_cache.set(full_key, value, ttl) _stats["sets"] += 1 return True except Exception as exc: _stats["errors"] += 1 logger.debug("cache_set 오류: key=%s err=%s", key, exc) return False async def cache_delete(key: str) -> bool: """캐시에서 특정 키 삭제.""" if not CACHE_ENABLED: return True full_key = _full_key(key) try: redis = await _get_redis() if redis: await redis.delete(full_key) else: _mem_cache.delete(full_key) return True except Exception as exc: _stats["errors"] += 1 logger.debug("cache_delete 오류: key=%s err=%s", key, exc) return False async def cache_invalidate_prefix(prefix: str) -> int: """접두어로 시작하는 모든 캐시 키 삭제.""" if not CACHE_ENABLED: return 0 full_prefix = _full_key(prefix) count = 0 try: redis = await _get_redis() if redis: keys = await redis.keys(f"{full_prefix}*") if keys: count = await redis.delete(*keys) else: keys = _mem_cache.keys_with_prefix(full_prefix) for k in keys: _mem_cache.delete(k) count = len(keys) except Exception as exc: _stats["errors"] += 1 logger.debug("cache_invalidate_prefix 오류: prefix=%s err=%s", prefix, exc) return count def make_cache_key(*args, **kwargs) -> str: """인자 목록으로 캐시 키 생성 (MD5 해시).""" raw = json.dumps({"args": list(args), "kwargs": kwargs}, sort_keys=True, default=str) return hashlib.md5(raw.encode()).hexdigest()[:16] # ── 캐시 데코레이터 ─────────────────────────────────────────────────────────── def cached(key_prefix: str, ttl: int = _DEFAULT_TTL_SEC): """ 비동기 함수 결과 캐싱 데코레이터. 사용 예:: @cached("tasks:list", ttl=30) async def get_tasks(...): ... """ def decorator(func: Callable): @wraps(func) async def wrapper(*args, **kwargs): cache_key = f"{key_prefix}:{make_cache_key(*args, **kwargs)}" cached_val = await cache_get(cache_key) if cached_val is not None: return cached_val result = await func(*args, **kwargs) await cache_set(cache_key, result, ttl) return result return wrapper return decorator # ── 캐시 상태 조회 ──────────────────────────────────────────────────────────── async def cache_info() -> dict: """캐시 상태 정보 반환 (관리자 대시보드용).""" try: redis = await _get_redis() if redis: info = await redis.info("stats") backend_info = { "backend": "redis", "url": REDIS_URL.split("@")[-1], # 비밀번호 마스킹 "keyspace_hits": info.get("keyspace_hits", 0), "keyspace_misses": info.get("keyspace_misses", 0), } else: backend_info = _mem_cache.info() except Exception: backend_info = {"backend": "memory (redis unavailable)", **_mem_cache.info()} return { "enabled": CACHE_ENABLED, "backend": backend_info, "app_stats": dict(_stats), } async def cache_flush_all() -> int: """전체 캐시 초기화 (테스트/관리자 전용).""" try: redis = await _get_redis() if redis: keys = await redis.keys(f"{CACHE_KEY_PREFIX}*") if keys: return await redis.delete(*keys) return 0 else: return _mem_cache.flush() except Exception as exc: logger.warning("cache_flush_all 오류: %s", exc) return 0