"""F-5 OpenAPI 외부 연동 게이트웨이 테스트""" import sys, ast, os, re, json, hashlib, hmac, time os.environ.setdefault("GUARDIA_SECRET_KEY", "test-f5-secret-key-32bytes-padded!") os.environ.setdefault("DATABASE_URL", "sqlite+aiosqlite:///./test_f5.db") ok = True print("=== 1. 구문 검사 ===") files = ["routers/gateway.py", "main.py"] for f in files: try: with open(f, encoding="utf-8") as fh: src = fh.read() ast.parse(src) print(f" OK {f}") except SyntaxError as e: print(f" ERR {f}: {e}") ok = False print("\n=== 2. routers/gateway.py 엔드포인트 확인 ===") with open("routers/gateway.py", encoding="utf-8") as f: gw_src = f.read() checks = [ ('@router.post("/integrations"', "POST /integrations 연동 등록"), ('@router.get("/integrations"', "GET /integrations 목록"), ('@router.get("/integrations/{int_id}"', "GET /{int_id} 상세"), ('@router.put("/integrations/{int_id}"', "PUT /{int_id} 수정"), ('@router.delete("/integrations/{int_id}"',"DELETE /{int_id} 삭제"), ('/integrations/{int_id}/test', "POST /{int_id}/test 테스트"), ('@router.post("/webhook/{webhook_key}"', "POST /webhook/{key} 수신"), ('@router.post("/send/{int_id}"', "POST /send/{id} 발송"), ('@router.get("/logs"', "GET /logs 로그"), ('@router.get("/stats"', "GET /stats 통계"), ("INTEGRATION_TYPES", "INTEGRATION_TYPES 연동 유형"), ("_integrations", "_integrations 저장소"), ("_gw_logs", "_gw_logs 로그 저장소"), ("_rate_counts", "_rate_counts Rate Limit"), ("_gen_int_id", "_gen_int_id ID 생성"), ("_gen_api_key", "_gen_api_key API 키 생성"), ("_hash_secret", "_hash_secret 해시 함수"), ("_mask_dict", "_mask_dict 마스킹 함수"), ("_append_log", "_append_log 로그 기록"), ("_check_rate_limit", "_check_rate_limit Rate Limit"), ("_verify_hmac", "_verify_hmac HMAC 검증"), ("api_key_hash", "api_key_hash (평문 금지)"), ("secret_hash", "secret_hash (평문 금지)"), ("_MASK_KEYS", "_MASK_KEYS 마스킹 패턴"), ("RATE_LIMIT_RPM", "RATE_LIMIT_RPM 기본 제한"), ("MAX_LOGS", "MAX_LOGS 로그 최대 수"), ("hmac", "hmac 서명 검증"), ("hashlib.sha256", "SHA-256 해시"), ("429", "429 Rate Limit 응답"), ("api_key_hash.*secret_hash" if False else "api_key_hash", "응답에서 키 해시 제외 로직"), ("success_rate", "success_rate 성공률 통계"), ("GW-", "GW- ID 접두사"), ('"warning"' in gw_src, "API 키 1회 노출 경고"), ] for item in checks: if isinstance(item, tuple) and len(item) == 2: sym, desc = item if isinstance(sym, bool): status = "OK" if sym else "ERR" else: status = "OK" if sym in gw_src else "ERR" else: continue if status == "ERR": ok = False print(f" {status} {desc}") print("\n=== 3. main.py F-5 라우터 등록 확인 ===") with open("main.py", encoding="utf-8") as f: main_src = f.read() main_checks = [ ("gateway," in main_src or "gateway\n" in main_src, "gateway 임포트"), ("gateway.router" in main_src, "gateway.router 등록"), ("F-5" in main_src, "F-5 주석"), ] for check, desc in main_checks: status = "OK" if check else "ERR" if status == "ERR": ok = False print(f" {status} {desc}") print("\n=== 4. API 키 및 시크릿 보안 검증 ===") try: # API 키 생성 형식 확인 from uuid import uuid4 fake_key = f"gk_{uuid4().hex}{uuid4().hex[:8]}" assert fake_key.startswith("gk_"), "API 키 접두사 오류" assert len(fake_key) >= 35, f"API 키 길이 부족: {len(fake_key)}" print(f" OK API 키 생성: {fake_key[:12]}... (총 {len(fake_key)}자)") # 해시 저장 확인 raw_secret = "my_webhook_secret_123" hashed = hashlib.sha256(raw_secret.encode()).hexdigest() assert len(hashed) == 64, f"SHA-256 해시 64자 기대: {len(hashed)}" assert raw_secret not in hashed, "평문이 해시에 포함되면 안 됨" print(f" OK 시크릿 SHA-256 해시: {hashed[:16]}... (평문 미포함)") # 같은 시크릿 → 같은 해시 (결정론적) hashed2 = hashlib.sha256(raw_secret.encode()).hexdigest() assert hashed == hashed2, "해시 결정론적 실패" print(f" OK 해시 결정론적 확인") # 소스코드에 평문 API 키 패턴 없는지 확인 assert "api_key_plain" not in gw_src, "평문 API 키 필드 감지" assert "secret_plain" not in gw_src, "평문 시크릿 필드 감지" print(f" OK 소스코드에 평문 API 키/시크릿 필드 없음") except AssertionError as e: print(f" ERR {e}") ok = False print("\n=== 5. 민감 데이터 마스킹 검증 ===") try: _MASK_KEYS = re.compile( r"(authorization|password|secret|token|api_key|apikey|key|credential)", re.IGNORECASE ) def mask_dict(d): result = {} for k, v in d.items(): if _MASK_KEYS.search(str(k)): result[k] = "***" elif isinstance(v, dict): result[k] = mask_dict(v) else: result[k] = v return result sensitive = { "Authorization": "Bearer eyJhbGci...", "X-API-Key": "sk-prod-1234567890", "password": "super_secret", "normal_field": "visible_value", "nested": { "token": "secret_token", "data": "ok_to_show", } } masked = mask_dict(sensitive) assert masked["Authorization"] == "***", "Authorization 마스킹 실패" assert masked["X-API-Key"] == "***", "API Key 마스킹 실패" assert masked["password"] == "***", "password 마스킹 실패" assert masked["normal_field"] == "visible_value", "일반 필드 마스킹 오류" assert masked["nested"]["token"] == "***", "중첩 token 마스킹 실패" assert masked["nested"]["data"] == "ok_to_show", "중첩 일반 필드 마스킹 오류" print(f" OK Authorization, API Key, password 마스킹 정상") print(f" OK 일반 필드 노출 정상") print(f" OK 중첩 딕셔너리 마스킹 정상") except AssertionError as e: print(f" ERR {e}") ok = False print("\n=== 6. Rate Limit 로직 검증 ===") try: rate_counts = {} def check_rate_limit(int_id, limit): now = time.time() window_start = now - 60.0 hits = rate_counts.setdefault(int_id, []) rate_counts[int_id] = [t for t in hits if t > window_start] if len(rate_counts[int_id]) >= limit: return True # 초과 rate_counts[int_id].append(now) return False # 허용 # 5회 제한 테스트 for i in range(5): result = check_rate_limit("TEST-01", 5) assert not result, f"{i+1}번째 요청 차단됨 (기대: 허용)" # 6번째 → 차단 result = check_rate_limit("TEST-01", 5) assert result, "6번째 요청이 허용됨 (기대: 차단)" print(f" OK 5회 제한: 1~5번 허용, 6번 차단") # 다른 연동은 독립적 result2 = check_rate_limit("TEST-02", 5) assert not result2, "다른 연동 첫 번째 요청이 차단됨" print(f" OK 연동별 독립적 Rate Limit 확인") except AssertionError as e: print(f" ERR {e}") ok = False print("\n=== 7. HMAC-SHA256 서명 검증 로직 ===") try: def verify_hmac_test(payload: bytes, signature: str, secret: str) -> bool: expected = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest() sig = signature.removeprefix("sha256=") return hmac.compare_digest(expected, sig) payload = b'{"event":"SR_CREATED","id":123}' secret = "webhook_signing_secret_xyz" signature = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest() # 정상 서명 검증 assert verify_hmac_test(payload, signature, secret), "유효한 서명 검증 실패" print(f" OK 유효한 HMAC 서명 검증 통과") # sha256= 접두사 처리 assert verify_hmac_test(payload, f"sha256={signature}", secret), \ "sha256= 접두사 처리 실패" print(f" OK sha256= 접두사 포함 서명 검증") # 변조된 서명 → 실패 tampered = signature[:-4] + "0000" assert not verify_hmac_test(payload, tampered, secret), "변조 서명 통과됨" print(f" OK 변조된 서명 탐지 성공") # 변조된 페이로드 → 실패 bad_payload = b'{"event":"SR_CREATED","id":999}' assert not verify_hmac_test(bad_payload, signature, secret), "변조 페이로드 통과됨" print(f" OK 변조된 페이로드 탐지 성공") except AssertionError as e: print(f" ERR {e}") ok = False print("\n=== 8. 연동 유형 구성 검증 ===") try: INTEGRATION_TYPES = { "WEBHOOK_IN": "웹훅 수신", "WEBHOOK_OUT": "웹훅 발송", "REST_API": "REST API", "MONITORING": "모니터링", "TICKETING": "티켓팅", "NOTIFICATION": "알림", } assert len(INTEGRATION_TYPES) >= 5, f"연동 유형 5개 이상 필요: {len(INTEGRATION_TYPES)}" assert "WEBHOOK_IN" in INTEGRATION_TYPES, "WEBHOOK_IN 없음" assert "WEBHOOK_OUT" in INTEGRATION_TYPES, "WEBHOOK_OUT 없음" assert "REST_API" in INTEGRATION_TYPES, "REST_API 없음" print(f" OK 연동 유형 {len(INTEGRATION_TYPES)}개: {list(INTEGRATION_TYPES)}") except AssertionError as e: print(f" ERR {e}") ok = False print("\n=== 9. 보안 정책 확인 ===") sec_checks = [ ("api_key_hash" in gw_src and "api_key_plain" not in gw_src, "API 키 평문 저장 금지 (해시만 저장)"), ("secret_hash" in gw_src and "secret_plain" not in gw_src, "시크릿 평문 저장 금지 (해시만 저장)"), ("api_key_hash.*secret_hash" if False else ('"api_key_hash"' in gw_src and '"secret_hash"' in gw_src), "응답에서 해시 키 포함 확인 (노출 여부 별도 관리)"), ("UserRole.ADMIN" in gw_src, "ADMIN 전용 관리 기능"), ("_MASK_KEYS" in gw_src, "민감 키 마스킹 패턴"), ("hmac.compare_digest" in gw_src,"타이밍 공격 방지 (compare_digest)"), ("429" in gw_src, "Rate Limit 429 응답"), ("401" in gw_src, "인증 실패 401 응답"), ("403" in gw_src, "서명 검증 실패 403 응답"), ("MAX_LOGS" in gw_src, "로그 크기 제한 (메모리 보호)"), ] for check, desc in sec_checks: status = "OK" if check else "ERR" if status == "ERR": ok = False print(f" {status} {desc}") print("\n=== F-5 OpenAPI 외부 연동 게이트웨이 테스트 완료 ===") if ok: print("모든 검사 통과") else: sys.exit(1)