"""D-5 불변 감사 로그 Hash Chain 테스트""" import sys, ast, os, json, hashlib os.environ.setdefault("GUARDIA_SECRET_KEY", "test-d5-secret-key-32bytes-padded!") os.environ.setdefault("DATABASE_URL", "sqlite+aiosqlite:///./test_d5.db") ok = True print("=== 1. 구문 검사 ===") files = ["routers/audit.py", "models.py"] for f in files: try: with open(f, encoding="utf-8") as fh: src = fh.read() ast.parse(src) print(f" OK {f}") except SyntaxError as e: print(f" ERR {f}: {e}") ok = False print("\n=== 2. routers/audit.py 엔드포인트 및 기능 확인 ===") with open("routers/audit.py", encoding="utf-8") as f: audit_src = f.read() checks = [ ('@router.get("",', "GET /audit 목록"), ('@router.post("/record"', "POST /record 이벤트 기록"), ('@router.get("/verify"', "GET /verify 체인 검증"), ('/verify/{from_id}/{to_id}', "GET /verify 범위 검증"), ('@router.get("/stats"', "GET /stats 통계"), ('@router.get("/export"', "GET /export 내보내기"), ('/entity/{entity_type}/{entity_id}', "GET /entity 엔티티별 이력"), ('@router.get("/{log_id}"', "GET /{log_id} 상세"), ("async def append_audit_log", "append_audit_log 함수"), ("async def _get_last_hash", "_get_last_hash 함수"), ("compute_log_hash", "compute_log_hash 사용"), ("hashlib.sha256", "SHA-256 해시"), ("prev_hash", "prev_hash 체인 연결"), ("ip_addr_hash", "IP 해시 저장"), ("client_ip", "클라이언트 IP 처리"), ("StreamingResponse", "StreamingResponse (CSV/JSON 내보내기)"), ("csv.writer", "CSV 내보내기"), ("UserRole.ADMIN", "ADMIN 전용 내보내기"), ("SEVERITY_LEVELS", "심각도 레벨 정의"), ("entity_type", "엔티티 유형 필드"), ] for sym, desc in checks: status = "OK" if sym in audit_src else "ERR" if status == "ERR": ok = False print(f" {status} {desc}") print("\n=== 3. models.py AuditLog D-5 확장 필드 확인 ===") with open("models.py", encoding="utf-8") as f: models_src = f.read() model_checks = [ ("entity_type", "entity_type 컬럼"), ("entity_id", "entity_id 컬럼"), ("ip_addr_hash", "IP 해시 컬럼 (원본 저장 금지)"), ("severity", "severity 컬럼"), ("prev_hash", "prev_hash 체인 컬럼"), ("compute_log_hash", "compute_log_hash 함수"), ] for sym, desc in model_checks: status = "OK" if sym in models_src else "ERR" if status == "ERR": ok = False print(f" {status} {desc}") print("\n=== 4. SHA-256 해시 체인 수학적 검증 ===") try: # compute_log_hash 로직 직접 테스트 def compute_log_hash_local(prev_hash, actor, action, detail, ts): payload = json.dumps( {"prev": prev_hash or "", "actor": actor, "action": action, "detail": detail, "ts": ts}, ensure_ascii=False, sort_keys=True ) return hashlib.sha256(payload.encode()).hexdigest() # 로그 체인 시뮬레이션 chain = [] prev = None events = [ ("admin", "USER_LOGIN", "관리자 로그인"), ("admin", "SR_CREATE", "SR-20260526-0001 생성"), ("admin", "SR_ASSIGN", "엔지니어 배정"), ("eng1", "SR_RESOLVE", "SR 해결"), ("admin", "SR_CLOSE", "SR 종료"), ] for actor, action, detail in events: ts = f"2026-05-26T{len(chain):02d}:00:00" h = compute_log_hash_local(prev, actor, action, detail, ts) chain.append({"prev": prev, "hash": h, "actor": actor, "action": action}) prev = h print(f" OK 체인 {len(chain)}개 생성 완료") print(f" OK 체인 예시: {chain[0]['hash'][:16]}...") # 체인 무결성 검증 broken = None for i, log in enumerate(chain): if i == 0: exp_prev = None else: exp_prev = chain[i-1]["hash"] if log["prev"] != exp_prev: broken = i break h = compute_log_hash_local(log["prev"], log["actor"], log["action"], events[i][2], f"2026-05-26T{i:02d}:00:00") if h != log["hash"]: broken = i break assert broken is None, f"정상 체인 검증 실패 at {broken}" print(f" OK 정상 체인 무결성 검증 통과") # 변조 시 탐지 chain_tampered = [dict(e) for e in chain] chain_tampered[2]["action"] = "TAMPERED_ACTION" # 중간 항목 변조 broken_tampered = None for i, log in enumerate(chain_tampered): h = compute_log_hash_local(log["prev"], log["actor"], log["action"], events[i][2], f"2026-05-26T{i:02d}:00:00") if h != log["hash"]: broken_tampered = i break assert broken_tampered == 2, f"변조 탐지 실패: {broken_tampered}" print(f" OK 변조 탐지: idx={broken_tampered}에서 무결성 위반 감지") except AssertionError as e: print(f" ERR {e}") ok = False except Exception as e: print(f" ERR 해시 체인 검증 오류: {type(e).__name__}: {e}") ok = False print("\n=== 5. SHA-256 결정론적 해시 검증 ===") try: # 동일 입력 → 동일 해시 (결정론적) def _h(prev, actor, action, detail, ts): payload = json.dumps( {"prev": prev or "", "actor": actor, "action": action, "detail": detail, "ts": ts}, ensure_ascii=False, sort_keys=True ) return hashlib.sha256(payload.encode()).hexdigest() h1 = _h(None, "admin", "LOGIN", "로그인 성공", "2026-01-01T00:00:00") h2 = _h(None, "admin", "LOGIN", "로그인 성공", "2026-01-01T00:00:00") assert h1 == h2, "동일 입력 다른 해시 — 결정론적 실패" assert len(h1) == 64, f"SHA-256 출력 64자 기대: {len(h1)}" print(f" OK 결정론적: 동일 입력 = {h1[:16]}...") print(f" OK 해시 길이: {len(h1)}자") # 다른 입력 → 다른 해시 h3 = _h(None, "admin", "LOGIN", "다른 내용", "2026-01-01T00:00:00") assert h1 != h3, "다른 입력이 동일 해시 — 충돌 위험" print(f" OK 다른 입력 = 다른 해시: {h3[:16]}...") # prev_hash 체인 효과 h4 = _h("abc123", "admin", "LOGIN", "로그인 성공", "2026-01-01T00:00:00") assert h1 != h4, "prev_hash가 해시에 영향을 미치지 않음" print(f" OK prev_hash 체인 효과 확인") except AssertionError as e: print(f" ERR {e}") ok = False except Exception as e: print(f" ERR SHA-256 검증 오류: {type(e).__name__}: {e}") ok = False print("\n=== 6. IP 주소 해시 처리 검증 ===") try: # IP는 SHA-256으로만 저장해야 함 ip = "192.168.1.100" ip_hash = hashlib.sha256(ip.encode()).hexdigest() assert len(ip_hash) == 64, "IP 해시 길이 오류" assert ip not in ip_hash, "IP 원문이 해시에 포함되면 안 됨" print(f" OK IP {ip} -> 해시 {ip_hash[:16]}... (원본 미포함)") # 동일 IP → 동일 해시 (추적 가능) ip_hash2 = hashlib.sha256(ip.encode()).hexdigest() assert ip_hash == ip_hash2, "IP 해시 결정론적이어야 함" print(f" OK IP 해시 결정론적 (추적 가능)") # 코드에서 IP 원본 저장 금지 확인 assert "ip_addr_hash" in audit_src and "client_ip.encode" in audit_src, \ "IP 해시 저장 로직 없음" assert "ip_addr" not in audit_src.replace("ip_addr_hash", ""), \ "ip_addr 원본 저장 시도 감지" print(f" OK audit.py에서 IP 원본 저장 안 함 (ip_addr_hash만 사용)") except AssertionError as e: print(f" ERR {e}") ok = False except Exception as e: print(f" ERR IP 해시 검증 오류: {type(e).__name__}: {e}") ok = False print("\n=== 7. 심각도 레벨 확인 ===") severity_checks = [ ("INFO", "INFO 심각도"), ("WARN", "WARN 심각도"), ("ERROR", "ERROR 심각도"), ("CRITICAL", "CRITICAL 심각도"), ] for sym, desc in severity_checks: status = "OK" if f'"{sym}"' in audit_src or f"'{sym}'" in audit_src else "ERR" if status == "ERR": ok = False print(f" {status} {desc}") print("\n=== 8. 내보내기 보안 정책 확인 ===") export_checks = [ ("ADMIN 전용" in audit_src or "UserRole.ADMIN" in audit_src, "ADMIN 전용 내보내기"), ("csv" in audit_src and "json" in audit_src, "CSV/JSON 포맷 지원"), ("filename=audit_log" in audit_src, "다운로드 파일명 설정"), ("Content-Disposition" in audit_src, "Content-Disposition 헤더"), ("10000" in audit_src, "내보내기 최대 10000건 제한"), ] for check, desc in export_checks: status = "OK" if check else "ERR" if status == "ERR": ok = False print(f" {status} {desc}") print("\n=== 9. 체인 검증 로직 완성도 ===") chain_checks = [ ("broken_at", "변조 탐지 시 broken_at 반환"), ("intact", "무결성 여부 반환"), ("prev_hash_expected", "순차 prev_hash 검증"), ("compute_log_hash", "재계산으로 검증"), ("chain_start", "체인 시작 ID"), ("chain_end", "체인 종료 ID"), ("verified_at", "검증 시각"), ] for sym, desc in chain_checks: status = "OK" if sym in audit_src else "ERR" if status == "ERR": ok = False print(f" {status} {desc}") print("\n=== 10. AuditLogOut 스키마 확인 ===") pydantic_checks = [ ("prev_hash", "prev_hash 노출 (체인 검증용)"), ("entity_type", "entity_type 필드"), ("entity_id", "entity_id 필드"), ("severity", "severity 필드"), ("log_hash", "log_hash 필드"), ] for sym, desc in pydantic_checks: # AuditLogOut 클래스 내에서만 검색 import re block = re.search(r'class AuditLogOut.*?(?=\nclass |\Z)', models_src, re.DOTALL) found = sym in (block.group(0) if block else "") status = "OK" if found else "ERR" if status == "ERR": ok = False print(f" {status} {desc}") print("\n=== D-5 불변 감사 로그 Hash Chain 테스트 완료 ===") if ok: print("모든 검사 통과") else: sys.exit(1)