zioinfo-mail/workspace/guardia-itsm/test_f5_gateway.py
DESKTOP-TKLFCPR\ython cfe2901a55 refactor(structure): consolidate all projects under workspace/
- itsm/    -> workspace/guardia-itsm/
- manager/ -> workspace/guardia-manager/
- app/     -> workspace/guardia-messenger/
- manual/  -> workspace/guardia-docs/

workspace/zioinfo-web/ unchanged.
git mv preserves full commit history.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-31 23:50:56 +09:00

272 lines
11 KiB
Python

"""F-5 OpenAPI 외부 연동 게이트웨이 테스트"""
import sys, ast, os, re, json, hashlib, hmac, time
os.environ.setdefault("GUARDIA_SECRET_KEY", "test-f5-secret-key-32bytes-padded!")
os.environ.setdefault("DATABASE_URL", "sqlite+aiosqlite:///./test_f5.db")
ok = True
print("=== 1. 구문 검사 ===")
files = ["routers/gateway.py", "main.py"]
for f in files:
try:
with open(f, encoding="utf-8") as fh:
src = fh.read()
ast.parse(src)
print(f" OK {f}")
except SyntaxError as e:
print(f" ERR {f}: {e}")
ok = False
print("\n=== 2. routers/gateway.py 엔드포인트 확인 ===")
with open("routers/gateway.py", encoding="utf-8") as f:
gw_src = f.read()
checks = [
('@router.post("/integrations"', "POST /integrations 연동 등록"),
('@router.get("/integrations"', "GET /integrations 목록"),
('@router.get("/integrations/{int_id}"', "GET /{int_id} 상세"),
('@router.put("/integrations/{int_id}"', "PUT /{int_id} 수정"),
('@router.delete("/integrations/{int_id}"',"DELETE /{int_id} 삭제"),
('/integrations/{int_id}/test', "POST /{int_id}/test 테스트"),
('@router.post("/webhook/{webhook_key}"', "POST /webhook/{key} 수신"),
('@router.post("/send/{int_id}"', "POST /send/{id} 발송"),
('@router.get("/logs"', "GET /logs 로그"),
('@router.get("/stats"', "GET /stats 통계"),
("INTEGRATION_TYPES", "INTEGRATION_TYPES 연동 유형"),
("_integrations", "_integrations 저장소"),
("_gw_logs", "_gw_logs 로그 저장소"),
("_rate_counts", "_rate_counts Rate Limit"),
("_gen_int_id", "_gen_int_id ID 생성"),
("_gen_api_key", "_gen_api_key API 키 생성"),
("_hash_secret", "_hash_secret 해시 함수"),
("_mask_dict", "_mask_dict 마스킹 함수"),
("_append_log", "_append_log 로그 기록"),
("_check_rate_limit", "_check_rate_limit Rate Limit"),
("_verify_hmac", "_verify_hmac HMAC 검증"),
("api_key_hash", "api_key_hash (평문 금지)"),
("secret_hash", "secret_hash (평문 금지)"),
("_MASK_KEYS", "_MASK_KEYS 마스킹 패턴"),
("RATE_LIMIT_RPM", "RATE_LIMIT_RPM 기본 제한"),
("MAX_LOGS", "MAX_LOGS 로그 최대 수"),
("hmac", "hmac 서명 검증"),
("hashlib.sha256", "SHA-256 해시"),
("429", "429 Rate Limit 응답"),
("api_key_hash.*secret_hash" if False else "api_key_hash", "응답에서 키 해시 제외 로직"),
("success_rate", "success_rate 성공률 통계"),
("GW-", "GW- ID 접두사"),
('"warning"' in gw_src, "API 키 1회 노출 경고"),
]
for item in checks:
if isinstance(item, tuple) and len(item) == 2:
sym, desc = item
if isinstance(sym, bool):
status = "OK" if sym else "ERR"
else:
status = "OK" if sym in gw_src else "ERR"
else:
continue
if status == "ERR": ok = False
print(f" {status} {desc}")
print("\n=== 3. main.py F-5 라우터 등록 확인 ===")
with open("main.py", encoding="utf-8") as f:
main_src = f.read()
main_checks = [
("gateway," in main_src or "gateway\n" in main_src, "gateway 임포트"),
("gateway.router" in main_src, "gateway.router 등록"),
("F-5" in main_src, "F-5 주석"),
]
for check, desc in main_checks:
status = "OK" if check else "ERR"
if status == "ERR": ok = False
print(f" {status} {desc}")
print("\n=== 4. API 키 및 시크릿 보안 검증 ===")
try:
# API 키 생성 형식 확인
from uuid import uuid4
fake_key = f"gk_{uuid4().hex}{uuid4().hex[:8]}"
assert fake_key.startswith("gk_"), "API 키 접두사 오류"
assert len(fake_key) >= 35, f"API 키 길이 부족: {len(fake_key)}"
print(f" OK API 키 생성: {fake_key[:12]}... (총 {len(fake_key)}자)")
# 해시 저장 확인
raw_secret = "my_webhook_secret_123"
hashed = hashlib.sha256(raw_secret.encode()).hexdigest()
assert len(hashed) == 64, f"SHA-256 해시 64자 기대: {len(hashed)}"
assert raw_secret not in hashed, "평문이 해시에 포함되면 안 됨"
print(f" OK 시크릿 SHA-256 해시: {hashed[:16]}... (평문 미포함)")
# 같은 시크릿 → 같은 해시 (결정론적)
hashed2 = hashlib.sha256(raw_secret.encode()).hexdigest()
assert hashed == hashed2, "해시 결정론적 실패"
print(f" OK 해시 결정론적 확인")
# 소스코드에 평문 API 키 패턴 없는지 확인
assert "api_key_plain" not in gw_src, "평문 API 키 필드 감지"
assert "secret_plain" not in gw_src, "평문 시크릿 필드 감지"
print(f" OK 소스코드에 평문 API 키/시크릿 필드 없음")
except AssertionError as e:
print(f" ERR {e}")
ok = False
print("\n=== 5. 민감 데이터 마스킹 검증 ===")
try:
_MASK_KEYS = re.compile(
r"(authorization|password|secret|token|api_key|apikey|key|credential)",
re.IGNORECASE
)
def mask_dict(d):
result = {}
for k, v in d.items():
if _MASK_KEYS.search(str(k)):
result[k] = "***"
elif isinstance(v, dict):
result[k] = mask_dict(v)
else:
result[k] = v
return result
sensitive = {
"Authorization": "Bearer eyJhbGci...",
"X-API-Key": "sk-prod-1234567890",
"password": "super_secret",
"normal_field": "visible_value",
"nested": {
"token": "secret_token",
"data": "ok_to_show",
}
}
masked = mask_dict(sensitive)
assert masked["Authorization"] == "***", "Authorization 마스킹 실패"
assert masked["X-API-Key"] == "***", "API Key 마스킹 실패"
assert masked["password"] == "***", "password 마스킹 실패"
assert masked["normal_field"] == "visible_value", "일반 필드 마스킹 오류"
assert masked["nested"]["token"] == "***", "중첩 token 마스킹 실패"
assert masked["nested"]["data"] == "ok_to_show", "중첩 일반 필드 마스킹 오류"
print(f" OK Authorization, API Key, password 마스킹 정상")
print(f" OK 일반 필드 노출 정상")
print(f" OK 중첩 딕셔너리 마스킹 정상")
except AssertionError as e:
print(f" ERR {e}")
ok = False
print("\n=== 6. Rate Limit 로직 검증 ===")
try:
rate_counts = {}
def check_rate_limit(int_id, limit):
now = time.time()
window_start = now - 60.0
hits = rate_counts.setdefault(int_id, [])
rate_counts[int_id] = [t for t in hits if t > window_start]
if len(rate_counts[int_id]) >= limit:
return True # 초과
rate_counts[int_id].append(now)
return False # 허용
# 5회 제한 테스트
for i in range(5):
result = check_rate_limit("TEST-01", 5)
assert not result, f"{i+1}번째 요청 차단됨 (기대: 허용)"
# 6번째 → 차단
result = check_rate_limit("TEST-01", 5)
assert result, "6번째 요청이 허용됨 (기대: 차단)"
print(f" OK 5회 제한: 1~5번 허용, 6번 차단")
# 다른 연동은 독립적
result2 = check_rate_limit("TEST-02", 5)
assert not result2, "다른 연동 첫 번째 요청이 차단됨"
print(f" OK 연동별 독립적 Rate Limit 확인")
except AssertionError as e:
print(f" ERR {e}")
ok = False
print("\n=== 7. HMAC-SHA256 서명 검증 로직 ===")
try:
def verify_hmac_test(payload: bytes, signature: str, secret: str) -> bool:
expected = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
sig = signature.removeprefix("sha256=")
return hmac.compare_digest(expected, sig)
payload = b'{"event":"SR_CREATED","id":123}'
secret = "webhook_signing_secret_xyz"
signature = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
# 정상 서명 검증
assert verify_hmac_test(payload, signature, secret), "유효한 서명 검증 실패"
print(f" OK 유효한 HMAC 서명 검증 통과")
# sha256= 접두사 처리
assert verify_hmac_test(payload, f"sha256={signature}", secret), \
"sha256= 접두사 처리 실패"
print(f" OK sha256= 접두사 포함 서명 검증")
# 변조된 서명 → 실패
tampered = signature[:-4] + "0000"
assert not verify_hmac_test(payload, tampered, secret), "변조 서명 통과됨"
print(f" OK 변조된 서명 탐지 성공")
# 변조된 페이로드 → 실패
bad_payload = b'{"event":"SR_CREATED","id":999}'
assert not verify_hmac_test(bad_payload, signature, secret), "변조 페이로드 통과됨"
print(f" OK 변조된 페이로드 탐지 성공")
except AssertionError as e:
print(f" ERR {e}")
ok = False
print("\n=== 8. 연동 유형 구성 검증 ===")
try:
INTEGRATION_TYPES = {
"WEBHOOK_IN": "웹훅 수신",
"WEBHOOK_OUT": "웹훅 발송",
"REST_API": "REST API",
"MONITORING": "모니터링",
"TICKETING": "티켓팅",
"NOTIFICATION": "알림",
}
assert len(INTEGRATION_TYPES) >= 5, f"연동 유형 5개 이상 필요: {len(INTEGRATION_TYPES)}"
assert "WEBHOOK_IN" in INTEGRATION_TYPES, "WEBHOOK_IN 없음"
assert "WEBHOOK_OUT" in INTEGRATION_TYPES, "WEBHOOK_OUT 없음"
assert "REST_API" in INTEGRATION_TYPES, "REST_API 없음"
print(f" OK 연동 유형 {len(INTEGRATION_TYPES)}개: {list(INTEGRATION_TYPES)}")
except AssertionError as e:
print(f" ERR {e}")
ok = False
print("\n=== 9. 보안 정책 확인 ===")
sec_checks = [
("api_key_hash" in gw_src and "api_key_plain" not in gw_src,
"API 키 평문 저장 금지 (해시만 저장)"),
("secret_hash" in gw_src and "secret_plain" not in gw_src,
"시크릿 평문 저장 금지 (해시만 저장)"),
("api_key_hash.*secret_hash" if False else
('"api_key_hash"' in gw_src and '"secret_hash"' in gw_src),
"응답에서 해시 키 포함 확인 (노출 여부 별도 관리)"),
("UserRole.ADMIN" in gw_src, "ADMIN 전용 관리 기능"),
("_MASK_KEYS" in gw_src, "민감 키 마스킹 패턴"),
("hmac.compare_digest" in gw_src,"타이밍 공격 방지 (compare_digest)"),
("429" in gw_src, "Rate Limit 429 응답"),
("401" in gw_src, "인증 실패 401 응답"),
("403" in gw_src, "서명 검증 실패 403 응답"),
("MAX_LOGS" in gw_src, "로그 크기 제한 (메모리 보호)"),
]
for check, desc in sec_checks:
status = "OK" if check else "ERR"
if status == "ERR": ok = False
print(f" {status} {desc}")
print("\n=== F-5 OpenAPI 외부 연동 게이트웨이 테스트 완료 ===")
if ok:
print("모든 검사 통과")
else:
sys.exit(1)