- itsm/ -> workspace/guardia-itsm/ - manager/ -> workspace/guardia-manager/ - app/ -> workspace/guardia-messenger/ - manual/ -> workspace/guardia-docs/ workspace/zioinfo-web/ unchanged. git mv preserves full commit history. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
272 lines
11 KiB
Python
272 lines
11 KiB
Python
"""F-5 OpenAPI 외부 연동 게이트웨이 테스트"""
|
|
import sys, ast, os, re, json, hashlib, hmac, time
|
|
|
|
os.environ.setdefault("GUARDIA_SECRET_KEY", "test-f5-secret-key-32bytes-padded!")
|
|
os.environ.setdefault("DATABASE_URL", "sqlite+aiosqlite:///./test_f5.db")
|
|
|
|
ok = True
|
|
|
|
print("=== 1. 구문 검사 ===")
|
|
files = ["routers/gateway.py", "main.py"]
|
|
for f in files:
|
|
try:
|
|
with open(f, encoding="utf-8") as fh:
|
|
src = fh.read()
|
|
ast.parse(src)
|
|
print(f" OK {f}")
|
|
except SyntaxError as e:
|
|
print(f" ERR {f}: {e}")
|
|
ok = False
|
|
|
|
print("\n=== 2. routers/gateway.py 엔드포인트 확인 ===")
|
|
with open("routers/gateway.py", encoding="utf-8") as f:
|
|
gw_src = f.read()
|
|
|
|
checks = [
|
|
('@router.post("/integrations"', "POST /integrations 연동 등록"),
|
|
('@router.get("/integrations"', "GET /integrations 목록"),
|
|
('@router.get("/integrations/{int_id}"', "GET /{int_id} 상세"),
|
|
('@router.put("/integrations/{int_id}"', "PUT /{int_id} 수정"),
|
|
('@router.delete("/integrations/{int_id}"',"DELETE /{int_id} 삭제"),
|
|
('/integrations/{int_id}/test', "POST /{int_id}/test 테스트"),
|
|
('@router.post("/webhook/{webhook_key}"', "POST /webhook/{key} 수신"),
|
|
('@router.post("/send/{int_id}"', "POST /send/{id} 발송"),
|
|
('@router.get("/logs"', "GET /logs 로그"),
|
|
('@router.get("/stats"', "GET /stats 통계"),
|
|
("INTEGRATION_TYPES", "INTEGRATION_TYPES 연동 유형"),
|
|
("_integrations", "_integrations 저장소"),
|
|
("_gw_logs", "_gw_logs 로그 저장소"),
|
|
("_rate_counts", "_rate_counts Rate Limit"),
|
|
("_gen_int_id", "_gen_int_id ID 생성"),
|
|
("_gen_api_key", "_gen_api_key API 키 생성"),
|
|
("_hash_secret", "_hash_secret 해시 함수"),
|
|
("_mask_dict", "_mask_dict 마스킹 함수"),
|
|
("_append_log", "_append_log 로그 기록"),
|
|
("_check_rate_limit", "_check_rate_limit Rate Limit"),
|
|
("_verify_hmac", "_verify_hmac HMAC 검증"),
|
|
("api_key_hash", "api_key_hash (평문 금지)"),
|
|
("secret_hash", "secret_hash (평문 금지)"),
|
|
("_MASK_KEYS", "_MASK_KEYS 마스킹 패턴"),
|
|
("RATE_LIMIT_RPM", "RATE_LIMIT_RPM 기본 제한"),
|
|
("MAX_LOGS", "MAX_LOGS 로그 최대 수"),
|
|
("hmac", "hmac 서명 검증"),
|
|
("hashlib.sha256", "SHA-256 해시"),
|
|
("429", "429 Rate Limit 응답"),
|
|
("api_key_hash.*secret_hash" if False else "api_key_hash", "응답에서 키 해시 제외 로직"),
|
|
("success_rate", "success_rate 성공률 통계"),
|
|
("GW-", "GW- ID 접두사"),
|
|
('"warning"' in gw_src, "API 키 1회 노출 경고"),
|
|
]
|
|
|
|
for item in checks:
|
|
if isinstance(item, tuple) and len(item) == 2:
|
|
sym, desc = item
|
|
if isinstance(sym, bool):
|
|
status = "OK" if sym else "ERR"
|
|
else:
|
|
status = "OK" if sym in gw_src else "ERR"
|
|
else:
|
|
continue
|
|
if status == "ERR": ok = False
|
|
print(f" {status} {desc}")
|
|
|
|
print("\n=== 3. main.py F-5 라우터 등록 확인 ===")
|
|
with open("main.py", encoding="utf-8") as f:
|
|
main_src = f.read()
|
|
|
|
main_checks = [
|
|
("gateway," in main_src or "gateway\n" in main_src, "gateway 임포트"),
|
|
("gateway.router" in main_src, "gateway.router 등록"),
|
|
("F-5" in main_src, "F-5 주석"),
|
|
]
|
|
for check, desc in main_checks:
|
|
status = "OK" if check else "ERR"
|
|
if status == "ERR": ok = False
|
|
print(f" {status} {desc}")
|
|
|
|
print("\n=== 4. API 키 및 시크릿 보안 검증 ===")
|
|
try:
|
|
# API 키 생성 형식 확인
|
|
from uuid import uuid4
|
|
fake_key = f"gk_{uuid4().hex}{uuid4().hex[:8]}"
|
|
assert fake_key.startswith("gk_"), "API 키 접두사 오류"
|
|
assert len(fake_key) >= 35, f"API 키 길이 부족: {len(fake_key)}"
|
|
print(f" OK API 키 생성: {fake_key[:12]}... (총 {len(fake_key)}자)")
|
|
|
|
# 해시 저장 확인
|
|
raw_secret = "my_webhook_secret_123"
|
|
hashed = hashlib.sha256(raw_secret.encode()).hexdigest()
|
|
assert len(hashed) == 64, f"SHA-256 해시 64자 기대: {len(hashed)}"
|
|
assert raw_secret not in hashed, "평문이 해시에 포함되면 안 됨"
|
|
print(f" OK 시크릿 SHA-256 해시: {hashed[:16]}... (평문 미포함)")
|
|
|
|
# 같은 시크릿 → 같은 해시 (결정론적)
|
|
hashed2 = hashlib.sha256(raw_secret.encode()).hexdigest()
|
|
assert hashed == hashed2, "해시 결정론적 실패"
|
|
print(f" OK 해시 결정론적 확인")
|
|
|
|
# 소스코드에 평문 API 키 패턴 없는지 확인
|
|
assert "api_key_plain" not in gw_src, "평문 API 키 필드 감지"
|
|
assert "secret_plain" not in gw_src, "평문 시크릿 필드 감지"
|
|
print(f" OK 소스코드에 평문 API 키/시크릿 필드 없음")
|
|
|
|
except AssertionError as e:
|
|
print(f" ERR {e}")
|
|
ok = False
|
|
|
|
print("\n=== 5. 민감 데이터 마스킹 검증 ===")
|
|
try:
|
|
_MASK_KEYS = re.compile(
|
|
r"(authorization|password|secret|token|api_key|apikey|key|credential)",
|
|
re.IGNORECASE
|
|
)
|
|
|
|
def mask_dict(d):
|
|
result = {}
|
|
for k, v in d.items():
|
|
if _MASK_KEYS.search(str(k)):
|
|
result[k] = "***"
|
|
elif isinstance(v, dict):
|
|
result[k] = mask_dict(v)
|
|
else:
|
|
result[k] = v
|
|
return result
|
|
|
|
sensitive = {
|
|
"Authorization": "Bearer eyJhbGci...",
|
|
"X-API-Key": "sk-prod-1234567890",
|
|
"password": "super_secret",
|
|
"normal_field": "visible_value",
|
|
"nested": {
|
|
"token": "secret_token",
|
|
"data": "ok_to_show",
|
|
}
|
|
}
|
|
masked = mask_dict(sensitive)
|
|
assert masked["Authorization"] == "***", "Authorization 마스킹 실패"
|
|
assert masked["X-API-Key"] == "***", "API Key 마스킹 실패"
|
|
assert masked["password"] == "***", "password 마스킹 실패"
|
|
assert masked["normal_field"] == "visible_value", "일반 필드 마스킹 오류"
|
|
assert masked["nested"]["token"] == "***", "중첩 token 마스킹 실패"
|
|
assert masked["nested"]["data"] == "ok_to_show", "중첩 일반 필드 마스킹 오류"
|
|
print(f" OK Authorization, API Key, password 마스킹 정상")
|
|
print(f" OK 일반 필드 노출 정상")
|
|
print(f" OK 중첩 딕셔너리 마스킹 정상")
|
|
|
|
except AssertionError as e:
|
|
print(f" ERR {e}")
|
|
ok = False
|
|
|
|
print("\n=== 6. Rate Limit 로직 검증 ===")
|
|
try:
|
|
rate_counts = {}
|
|
|
|
def check_rate_limit(int_id, limit):
|
|
now = time.time()
|
|
window_start = now - 60.0
|
|
hits = rate_counts.setdefault(int_id, [])
|
|
rate_counts[int_id] = [t for t in hits if t > window_start]
|
|
if len(rate_counts[int_id]) >= limit:
|
|
return True # 초과
|
|
rate_counts[int_id].append(now)
|
|
return False # 허용
|
|
|
|
# 5회 제한 테스트
|
|
for i in range(5):
|
|
result = check_rate_limit("TEST-01", 5)
|
|
assert not result, f"{i+1}번째 요청 차단됨 (기대: 허용)"
|
|
# 6번째 → 차단
|
|
result = check_rate_limit("TEST-01", 5)
|
|
assert result, "6번째 요청이 허용됨 (기대: 차단)"
|
|
print(f" OK 5회 제한: 1~5번 허용, 6번 차단")
|
|
|
|
# 다른 연동은 독립적
|
|
result2 = check_rate_limit("TEST-02", 5)
|
|
assert not result2, "다른 연동 첫 번째 요청이 차단됨"
|
|
print(f" OK 연동별 독립적 Rate Limit 확인")
|
|
|
|
except AssertionError as e:
|
|
print(f" ERR {e}")
|
|
ok = False
|
|
|
|
print("\n=== 7. HMAC-SHA256 서명 검증 로직 ===")
|
|
try:
|
|
def verify_hmac_test(payload: bytes, signature: str, secret: str) -> bool:
|
|
expected = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
|
|
sig = signature.removeprefix("sha256=")
|
|
return hmac.compare_digest(expected, sig)
|
|
|
|
payload = b'{"event":"SR_CREATED","id":123}'
|
|
secret = "webhook_signing_secret_xyz"
|
|
signature = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
|
|
|
|
# 정상 서명 검증
|
|
assert verify_hmac_test(payload, signature, secret), "유효한 서명 검증 실패"
|
|
print(f" OK 유효한 HMAC 서명 검증 통과")
|
|
|
|
# sha256= 접두사 처리
|
|
assert verify_hmac_test(payload, f"sha256={signature}", secret), \
|
|
"sha256= 접두사 처리 실패"
|
|
print(f" OK sha256= 접두사 포함 서명 검증")
|
|
|
|
# 변조된 서명 → 실패
|
|
tampered = signature[:-4] + "0000"
|
|
assert not verify_hmac_test(payload, tampered, secret), "변조 서명 통과됨"
|
|
print(f" OK 변조된 서명 탐지 성공")
|
|
|
|
# 변조된 페이로드 → 실패
|
|
bad_payload = b'{"event":"SR_CREATED","id":999}'
|
|
assert not verify_hmac_test(bad_payload, signature, secret), "변조 페이로드 통과됨"
|
|
print(f" OK 변조된 페이로드 탐지 성공")
|
|
|
|
except AssertionError as e:
|
|
print(f" ERR {e}")
|
|
ok = False
|
|
|
|
print("\n=== 8. 연동 유형 구성 검증 ===")
|
|
try:
|
|
INTEGRATION_TYPES = {
|
|
"WEBHOOK_IN": "웹훅 수신",
|
|
"WEBHOOK_OUT": "웹훅 발송",
|
|
"REST_API": "REST API",
|
|
"MONITORING": "모니터링",
|
|
"TICKETING": "티켓팅",
|
|
"NOTIFICATION": "알림",
|
|
}
|
|
assert len(INTEGRATION_TYPES) >= 5, f"연동 유형 5개 이상 필요: {len(INTEGRATION_TYPES)}"
|
|
assert "WEBHOOK_IN" in INTEGRATION_TYPES, "WEBHOOK_IN 없음"
|
|
assert "WEBHOOK_OUT" in INTEGRATION_TYPES, "WEBHOOK_OUT 없음"
|
|
assert "REST_API" in INTEGRATION_TYPES, "REST_API 없음"
|
|
print(f" OK 연동 유형 {len(INTEGRATION_TYPES)}개: {list(INTEGRATION_TYPES)}")
|
|
except AssertionError as e:
|
|
print(f" ERR {e}")
|
|
ok = False
|
|
|
|
print("\n=== 9. 보안 정책 확인 ===")
|
|
sec_checks = [
|
|
("api_key_hash" in gw_src and "api_key_plain" not in gw_src,
|
|
"API 키 평문 저장 금지 (해시만 저장)"),
|
|
("secret_hash" in gw_src and "secret_plain" not in gw_src,
|
|
"시크릿 평문 저장 금지 (해시만 저장)"),
|
|
("api_key_hash.*secret_hash" if False else
|
|
('"api_key_hash"' in gw_src and '"secret_hash"' in gw_src),
|
|
"응답에서 해시 키 포함 확인 (노출 여부 별도 관리)"),
|
|
("UserRole.ADMIN" in gw_src, "ADMIN 전용 관리 기능"),
|
|
("_MASK_KEYS" in gw_src, "민감 키 마스킹 패턴"),
|
|
("hmac.compare_digest" in gw_src,"타이밍 공격 방지 (compare_digest)"),
|
|
("429" in gw_src, "Rate Limit 429 응답"),
|
|
("401" in gw_src, "인증 실패 401 응답"),
|
|
("403" in gw_src, "서명 검증 실패 403 응답"),
|
|
("MAX_LOGS" in gw_src, "로그 크기 제한 (메모리 보호)"),
|
|
]
|
|
for check, desc in sec_checks:
|
|
status = "OK" if check else "ERR"
|
|
if status == "ERR": ok = False
|
|
print(f" {status} {desc}")
|
|
|
|
print("\n=== F-5 OpenAPI 외부 연동 게이트웨이 테스트 완료 ===")
|
|
if ok:
|
|
print("모든 검사 통과")
|
|
else:
|
|
sys.exit(1)
|