guardia-itsm/test_b1_anomaly.py
DESKTOP-TKLFCPRython 64c27c3509 feat(itsm): G-1~G-12 확장 기능 + 하네스/봇/설치스크립트 구현
G-1: 메신저 Webhook Relay + _send_to_room 실제 httpx 호출 구현
G-2: POST /api/tasks/bulk SR 대량작업 엔드포인트 (최대 100건)
G-3: 라이선스 만료 알림 스케줄러 (매일 09:00 KST)
G-4: 체험판 upgrade_banner 필드 + license.py 배너 로직
G-5: core/auto_rca.py + incidents/problem auto-rca 엔드포인트
G-6: core/deploy_impact.py + vibe impact-analysis 엔드포인트
G-7: core/ticket_classifier.py + SR 생성 시 AI 분류 + ai-suggestion API
G-8: VulnPatchRecord 모델 + vuln_scan 패치추적 4개 엔드포인트
G-9: core/jira_sync.py + gateway Jira/Confluence 연동 엔드포인트
G-10: core/push_notify.py + routers/push.py + PushSubscription 모델
G-11: approvals 다중승인 (위임/서명/기한초과/마감연장)
G-12: alembic.ini + migrations/ + cicd/migrate_to_postgres.sh

하네스: guardia-orchestrator 확장기능 Phase 반영
봇명령어: /sr /status /license /bulk 슬래시 명령어 추가
설치스크립트: setup/ (Ubuntu, CentOS, RHEL, Windows) --test 옵션 포함

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-29 18:18:52 +09:00

265 lines
11 KiB
Python

"""B-1 AI 이상 탐지 에이전트 테스트"""
import sys, ast, asyncio, os
os.environ.setdefault("GUARDIA_SECRET_KEY", "test-b1-secret-key-32bytes-padded!")
os.environ.setdefault("DATABASE_URL", "sqlite+aiosqlite:///./test_b1.db")
os.environ.setdefault("GUARDIA_PROJECTS_ROOT", r"C:\GUARDiA\projects")
ok = True
print("=== 1. 구문 검사 ===")
files = ["core/anomaly.py", "routers/anomaly.py", "main.py"]
for f in files:
try:
with open(f, encoding="utf-8") as fh:
src = fh.read()
ast.parse(src)
print(f" OK {f}")
except SyntaxError as e:
print(f" ERR {f}: {e}")
ok = False
print("\n=== 2. models.py AnomalyEvent 모델 확인 ===")
with open("models.py", encoding="utf-8") as f:
models_src = f.read()
checks = [
("class AnomalySeverity(str, Enum):", "AnomalySeverity Enum"),
("class AnomalyStatus(str, Enum):", "AnomalyStatus Enum"),
("class MetricType(str, Enum):", "MetricType Enum"),
("class MetricSnapshot(Base):", "MetricSnapshot DB 모델"),
("class AnomalyEvent(Base):", "AnomalyEvent DB 모델"),
("class AnomalyRule(Base):", "AnomalyRule DB 모델"),
("class MetricSnapshotIn(BaseModel):", "MetricSnapshotIn Pydantic"),
("class AnomalyEventOut(BaseModel):", "AnomalyEventOut Pydantic"),
("class AnomalyRuleCreate(BaseModel):", "AnomalyRuleCreate Pydantic"),
("class SimulateMetricIn(BaseModel):", "SimulateMetricIn Pydantic"),
("Float", "Float 타입 임포트"),
("tb_metric_snapshot", "tb_metric_snapshot 테이블명"),
("tb_anomaly_event", "tb_anomaly_event 테이블명"),
("tb_anomaly_rule", "tb_anomaly_rule 테이블명"),
]
for sym, desc in checks:
status = "OK" if sym in models_src else "ERR"
if status == "ERR":
ok = False
print(f" {status} {desc}")
print("\n=== 3. core/anomaly.py 함수 확인 ===")
with open("core/anomaly.py", encoding="utf-8") as f:
anom_src = f.read()
fn_checks = [
("def detect_zscore(", "Z-score 탐지 함수"),
("def detect_iqr(", "IQR 탐지 함수"),
("def detect_threshold(", "임계값 탐지 함수"),
("def detect_trend(", "추세 탐지 함수"),
("def run_detection(", "통합 탐지 실행 함수"),
("def build_event_title(", "이벤트 제목 생성"),
("def build_event_description(", "이벤트 설명 생성"),
("async def _call_ollama_analysis(", "Ollama LLM 분석"),
("async def fetch_recent_values(", "히스토리 조회"),
("async def run_rules_on_metric(", "룰 기반 탐지 실행"),
("def generate_simulation_data(", "시뮬레이션 데이터 생성"),
("DEFAULT_THRESHOLDS", "기본 임계값 테이블"),
("METRIC_UNITS", "메트릭 단위 맵"),
("OLLAMA_URL", "Ollama URL 설정"),
]
for sym, desc in fn_checks:
status = "OK" if sym in anom_src else "ERR"
if status == "ERR":
ok = False
print(f" {status} {desc}")
print("\n=== 4. routers/anomaly.py 엔드포인트 확인 ===")
with open("routers/anomaly.py", encoding="utf-8") as f:
router_src = f.read()
endpoint_checks = [
('@router.post("/metrics"', "POST /api/anomaly/metrics (메트릭 수집)"),
('@router.post("/metrics/batch"', "POST /api/anomaly/metrics/batch (일괄 수집)"),
('@router.get("/metrics/{source}"', "GET /api/anomaly/metrics/{source}"),
('@router.post("/detect")', "POST /api/anomaly/detect (단순 탐지)"),
('@router.get("/events"', "GET /api/anomaly/events"),
('@router.get("/events/{event_id}"', "GET /api/anomaly/events/{id}"),
('@router.patch("/events/{event_id}/acknowledge")', "PATCH acknowledge"),
('@router.patch("/events/{event_id}/resolve")', "PATCH resolve"),
('@router.post("/rules"', "POST /api/anomaly/rules"),
('@router.get("/rules"', "GET /api/anomaly/rules"),
('@router.get("/summary")', "GET /api/anomaly/summary"),
('@router.post("/simulate")', "POST /api/anomaly/simulate"),
("BackgroundTasks", "비동기 백그라운드 탐지"),
("_detect_background", "백그라운드 탐지 함수"),
]
for sym, desc in endpoint_checks:
status = "OK" if sym in router_src else "ERR"
if status == "ERR":
ok = False
print(f" {status} {desc}")
print("\n=== 5. main.py 등록 확인 ===")
with open("main.py", encoding="utf-8") as f:
main_src = f.read()
main_checks = [
("anomaly", "anomaly 라우터 임포트"),
("anomaly.router", "anomaly 라우터 등록"),
]
for sym, desc in main_checks:
status = "OK" if sym in main_src else "ERR"
if status == "ERR":
ok = False
print(f" {status} {desc}")
print("\n=== 6. detect_zscore 단위 테스트 ===")
try:
import importlib.util
spec = importlib.util.spec_from_file_location("anomaly_mod", "core/anomaly.py")
anom_mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(anom_mod)
# 정상 케이스 (Z-score < 3.0)
normal_vals = [40.0 + i * 0.1 for i in range(30)] # 40~43 범위
is_anom, mean, std, z = anom_mod.detect_zscore(normal_vals, 42.0, z_threshold=3.0)
assert not is_anom, f"정상 값이 이상으로 탐지됨: z={z}"
print(f" OK 정상 케이스 (z={z:.2f}, 이상아님)")
# 이상 케이스 (Z-score > 3.0)
is_anom2, mean2, std2, z2 = anom_mod.detect_zscore(normal_vals, 95.0, z_threshold=3.0)
assert is_anom2, f"이상 값이 정상으로 탐지됨: z={z2}"
print(f" OK 이상 케이스 (z={z2:.2f}, 이상탐지)")
# 최소 샘플 미달
is_anom3, _, _, _ = anom_mod.detect_zscore([40.0, 41.0], 95.0, min_samples=10)
assert not is_anom3, "샘플 부족 시 이상 탐지 안 되어야 함"
print(f" OK 최소 샘플 미달 (이상 탐지 안 함)")
except Exception as e:
print(f" ERR detect_zscore 오류: {type(e).__name__}: {e}")
ok = False
print("\n=== 7. detect_iqr 단위 테스트 ===")
try:
vals = [20, 25, 30, 35, 40, 45, 50, 55, 60] # Q1=30, Q3=52.5, IQR=22.5
is_anom, lower, upper = anom_mod.detect_iqr(vals, 40.0, iqr_factor=1.5, min_samples=5)
assert not is_anom, f"정상 값이 IQR 이상으로 탐지됨"
print(f" OK IQR 정상 케이스 (범위={lower:.1f}~{upper:.1f})")
is_anom2, lower2, upper2 = anom_mod.detect_iqr(vals, 200.0, iqr_factor=1.5, min_samples=5)
assert is_anom2, f"이상 값이 IQR 정상으로 탐지됨"
print(f" OK IQR 이상 케이스 (200.0 > 상한 {upper2:.1f})")
except Exception as e:
print(f" ERR detect_iqr 오류: {type(e).__name__}: {e}")
ok = False
print("\n=== 8. detect_threshold 단위 테스트 ===")
try:
assert anom_mod.detect_threshold(95.0, 90.0) == True
assert anom_mod.detect_threshold(80.0, 90.0) == False
assert anom_mod.detect_threshold(90.0, 90.0, "gte") == True
assert anom_mod.detect_threshold(89.9, 90.0, "lt") == True
print(" OK 임계값 탐지 4개 케이스 모두 통과")
except AssertionError as e:
print(f" ERR 임계값 탐지 오류: {e}")
ok = False
print("\n=== 9. detect_trend 단위 테스트 ===")
try:
# 연속 상승 추세 (20% 이상 상승)
rising = [40, 45, 52, 60, 71, 90] # 40→90 = +125%
is_anom, direction = anom_mod.detect_trend(rising, window=5, deviation_pct=20.0)
assert is_anom and direction == "RISING", f"상승 추세 탐지 실패: is_anom={is_anom}, dir={direction}"
print(f" OK 상승 추세 탐지 (direction={direction})")
# 안정 추세
stable = [40, 41, 39, 42, 40, 41]
is_anom2, direction2 = anom_mod.detect_trend(stable, window=5)
assert not is_anom2, f"안정 값이 추세 이상으로 탐지됨"
print(f" OK 안정 추세 (이상 없음)")
except AssertionError as e:
print(f" ERR 추세 탐지 오류: {e}")
ok = False
except Exception as e:
print(f" ERR 추세 탐지 예외: {type(e).__name__}: {e}")
ok = False
print("\n=== 10. run_detection 통합 테스트 ===")
try:
import statistics as stat
vals = [40.0 + i * 0.2 for i in range(50)] # 40.0~49.8 평균 ≈ 45
mean_v = stat.mean(vals)
std_v = stat.stdev(vals)
# ZSCORE 이상
result = anom_mod.run_detection("CPU_USAGE", 95.0, vals, method="ZSCORE")
assert result["is_anomaly"], f"ZSCORE: 이상 값(95.0) 탐지 실패"
assert result["method"] == "ZSCORE"
print(f" OK ZSCORE 이상 탐지 (z={result['z_score']:.2f})")
# THRESHOLD 이상 (CPU > 90%)
result2 = anom_mod.run_detection("CPU_USAGE", 92.0, vals, method="THRESHOLD", threshold=90.0)
assert result2["is_anomaly"], "THRESHOLD: 이상 값(92.0) 탐지 실패"
print(f" OK THRESHOLD 이상 탐지")
# THRESHOLD 정상 (CPU = 80%)
result3 = anom_mod.run_detection("CPU_USAGE", 80.0, vals, method="THRESHOLD", threshold=90.0)
assert not result3["is_anomaly"], "THRESHOLD: 정상 값(80.0) 이상 탐지됨"
print(f" OK THRESHOLD 정상 케이스")
except AssertionError as e:
print(f" ERR run_detection 오류: {e}")
ok = False
except Exception as e:
print(f" ERR run_detection 예외: {type(e).__name__}: {e}")
ok = False
print("\n=== 11. generate_simulation_data 테스트 ===")
try:
normal_vals, anomaly_val = anom_mod.generate_simulation_data(
normal_count=50, baseline_mean=40.0, baseline_std=10.0, anomaly_value=95.0
)
assert len(normal_vals) == 50, f"정상 데이터 수: {len(normal_vals)}"
assert anomaly_val == 95.0, f"이상 값: {anomaly_val}"
assert all(0 <= v <= 100 for v in normal_vals), "정상 데이터 범위 초과"
# 이상 값이 정상 분포에서 이상으로 탐지되는지 확인
det = anom_mod.run_detection("CPU_USAGE", anomaly_val, normal_vals, method="ZSCORE", min_samples=5)
assert det["is_anomaly"], f"시뮬레이션 이상 값이 탐지되지 않음: z={det.get('z_score')}"
print(f" OK 시뮬레이션 데이터 생성 및 탐지 성공 (z={det['z_score']:.2f})")
except AssertionError as e:
print(f" ERR 시뮬레이션 오류: {e}")
ok = False
except Exception as e:
print(f" ERR 시뮬레이션 예외: {type(e).__name__}: {e}")
ok = False
print("\n=== 12. Ollama 연결 없는 폴백 테스트 ===")
async def test_ollama_fallback():
try:
result = await anom_mod._call_ollama_analysis(
source="test-server",
metric_type="CPU_USAGE",
current_value=95.0,
baseline_mean=40.0,
z_score=5.5,
detect_detail="Z-score=5.50 초과",
model="llama3",
timeout=2,
)
# Ollama 연결 실패 시 빈 문자열 반환 (오류 없이)
assert isinstance(result, str), "반환값이 str이어야 함"
print(f" OK Ollama 폴백 (연결 없음 → 빈 문자열 반환): '{result[:30]}'")
except Exception as e:
print(f" ERR Ollama 폴백 실패: {type(e).__name__}: {e}")
asyncio.run(test_ollama_fallback())
print("\n=== B-1 AI 이상 탐지 테스트 완료 ===")
if ok:
print("모든 검사 통과")
else:
sys.exit(1)