- itsm/ -> workspace/guardia-itsm/ - manager/ -> workspace/guardia-manager/ - app/ -> workspace/guardia-messenger/ - manual/ -> workspace/guardia-docs/ workspace/zioinfo-web/ unchanged. git mv preserves full commit history. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
266 lines
10 KiB
Python
266 lines
10 KiB
Python
"""D-5 불변 감사 로그 Hash Chain 테스트"""
|
|
import sys, ast, os, json, hashlib
|
|
|
|
os.environ.setdefault("GUARDIA_SECRET_KEY", "test-d5-secret-key-32bytes-padded!")
|
|
os.environ.setdefault("DATABASE_URL", "sqlite+aiosqlite:///./test_d5.db")
|
|
|
|
ok = True
|
|
|
|
print("=== 1. 구문 검사 ===")
|
|
files = ["routers/audit.py", "models.py"]
|
|
for f in files:
|
|
try:
|
|
with open(f, encoding="utf-8") as fh:
|
|
src = fh.read()
|
|
ast.parse(src)
|
|
print(f" OK {f}")
|
|
except SyntaxError as e:
|
|
print(f" ERR {f}: {e}")
|
|
ok = False
|
|
|
|
print("\n=== 2. routers/audit.py 엔드포인트 및 기능 확인 ===")
|
|
with open("routers/audit.py", encoding="utf-8") as f:
|
|
audit_src = f.read()
|
|
|
|
checks = [
|
|
('@router.get("",', "GET /audit 목록"),
|
|
('@router.post("/record"', "POST /record 이벤트 기록"),
|
|
('@router.get("/verify"', "GET /verify 체인 검증"),
|
|
('/verify/{from_id}/{to_id}', "GET /verify 범위 검증"),
|
|
('@router.get("/stats"', "GET /stats 통계"),
|
|
('@router.get("/export"', "GET /export 내보내기"),
|
|
('/entity/{entity_type}/{entity_id}', "GET /entity 엔티티별 이력"),
|
|
('@router.get("/{log_id}"', "GET /{log_id} 상세"),
|
|
("async def append_audit_log", "append_audit_log 함수"),
|
|
("async def _get_last_hash", "_get_last_hash 함수"),
|
|
("compute_log_hash", "compute_log_hash 사용"),
|
|
("hashlib.sha256", "SHA-256 해시"),
|
|
("prev_hash", "prev_hash 체인 연결"),
|
|
("ip_addr_hash", "IP 해시 저장"),
|
|
("client_ip", "클라이언트 IP 처리"),
|
|
("StreamingResponse", "StreamingResponse (CSV/JSON 내보내기)"),
|
|
("csv.writer", "CSV 내보내기"),
|
|
("UserRole.ADMIN", "ADMIN 전용 내보내기"),
|
|
("SEVERITY_LEVELS", "심각도 레벨 정의"),
|
|
("entity_type", "엔티티 유형 필드"),
|
|
]
|
|
for sym, desc in checks:
|
|
status = "OK" if sym in audit_src else "ERR"
|
|
if status == "ERR": ok = False
|
|
print(f" {status} {desc}")
|
|
|
|
print("\n=== 3. models.py AuditLog D-5 확장 필드 확인 ===")
|
|
with open("models.py", encoding="utf-8") as f:
|
|
models_src = f.read()
|
|
|
|
model_checks = [
|
|
("entity_type", "entity_type 컬럼"),
|
|
("entity_id", "entity_id 컬럼"),
|
|
("ip_addr_hash", "IP 해시 컬럼 (원본 저장 금지)"),
|
|
("severity", "severity 컬럼"),
|
|
("prev_hash", "prev_hash 체인 컬럼"),
|
|
("compute_log_hash", "compute_log_hash 함수"),
|
|
]
|
|
for sym, desc in model_checks:
|
|
status = "OK" if sym in models_src else "ERR"
|
|
if status == "ERR": ok = False
|
|
print(f" {status} {desc}")
|
|
|
|
print("\n=== 4. SHA-256 해시 체인 수학적 검증 ===")
|
|
try:
|
|
# compute_log_hash 로직 직접 테스트
|
|
def compute_log_hash_local(prev_hash, actor, action, detail, ts):
|
|
payload = json.dumps(
|
|
{"prev": prev_hash or "", "actor": actor, "action": action,
|
|
"detail": detail, "ts": ts},
|
|
ensure_ascii=False, sort_keys=True
|
|
)
|
|
return hashlib.sha256(payload.encode()).hexdigest()
|
|
|
|
# 로그 체인 시뮬레이션
|
|
chain = []
|
|
prev = None
|
|
events = [
|
|
("admin", "USER_LOGIN", "관리자 로그인"),
|
|
("admin", "SR_CREATE", "SR-20260526-0001 생성"),
|
|
("admin", "SR_ASSIGN", "엔지니어 배정"),
|
|
("eng1", "SR_RESOLVE", "SR 해결"),
|
|
("admin", "SR_CLOSE", "SR 종료"),
|
|
]
|
|
|
|
for actor, action, detail in events:
|
|
ts = f"2026-05-26T{len(chain):02d}:00:00"
|
|
h = compute_log_hash_local(prev, actor, action, detail, ts)
|
|
chain.append({"prev": prev, "hash": h, "actor": actor, "action": action})
|
|
prev = h
|
|
|
|
print(f" OK 체인 {len(chain)}개 생성 완료")
|
|
print(f" OK 체인 예시: {chain[0]['hash'][:16]}...")
|
|
|
|
# 체인 무결성 검증
|
|
broken = None
|
|
for i, log in enumerate(chain):
|
|
if i == 0:
|
|
exp_prev = None
|
|
else:
|
|
exp_prev = chain[i-1]["hash"]
|
|
if log["prev"] != exp_prev:
|
|
broken = i
|
|
break
|
|
h = compute_log_hash_local(log["prev"], log["actor"], log["action"],
|
|
events[i][2], f"2026-05-26T{i:02d}:00:00")
|
|
if h != log["hash"]:
|
|
broken = i
|
|
break
|
|
|
|
assert broken is None, f"정상 체인 검증 실패 at {broken}"
|
|
print(f" OK 정상 체인 무결성 검증 통과")
|
|
|
|
# 변조 시 탐지
|
|
chain_tampered = [dict(e) for e in chain]
|
|
chain_tampered[2]["action"] = "TAMPERED_ACTION" # 중간 항목 변조
|
|
|
|
broken_tampered = None
|
|
for i, log in enumerate(chain_tampered):
|
|
h = compute_log_hash_local(log["prev"], log["actor"], log["action"],
|
|
events[i][2], f"2026-05-26T{i:02d}:00:00")
|
|
if h != log["hash"]:
|
|
broken_tampered = i
|
|
break
|
|
|
|
assert broken_tampered == 2, f"변조 탐지 실패: {broken_tampered}"
|
|
print(f" OK 변조 탐지: idx={broken_tampered}에서 무결성 위반 감지")
|
|
|
|
except AssertionError as e:
|
|
print(f" ERR {e}")
|
|
ok = False
|
|
except Exception as e:
|
|
print(f" ERR 해시 체인 검증 오류: {type(e).__name__}: {e}")
|
|
ok = False
|
|
|
|
print("\n=== 5. SHA-256 결정론적 해시 검증 ===")
|
|
try:
|
|
# 동일 입력 → 동일 해시 (결정론적)
|
|
def _h(prev, actor, action, detail, ts):
|
|
payload = json.dumps(
|
|
{"prev": prev or "", "actor": actor, "action": action,
|
|
"detail": detail, "ts": ts},
|
|
ensure_ascii=False, sort_keys=True
|
|
)
|
|
return hashlib.sha256(payload.encode()).hexdigest()
|
|
|
|
h1 = _h(None, "admin", "LOGIN", "로그인 성공", "2026-01-01T00:00:00")
|
|
h2 = _h(None, "admin", "LOGIN", "로그인 성공", "2026-01-01T00:00:00")
|
|
assert h1 == h2, "동일 입력 다른 해시 — 결정론적 실패"
|
|
assert len(h1) == 64, f"SHA-256 출력 64자 기대: {len(h1)}"
|
|
print(f" OK 결정론적: 동일 입력 = {h1[:16]}...")
|
|
print(f" OK 해시 길이: {len(h1)}자")
|
|
|
|
# 다른 입력 → 다른 해시
|
|
h3 = _h(None, "admin", "LOGIN", "다른 내용", "2026-01-01T00:00:00")
|
|
assert h1 != h3, "다른 입력이 동일 해시 — 충돌 위험"
|
|
print(f" OK 다른 입력 = 다른 해시: {h3[:16]}...")
|
|
|
|
# prev_hash 체인 효과
|
|
h4 = _h("abc123", "admin", "LOGIN", "로그인 성공", "2026-01-01T00:00:00")
|
|
assert h1 != h4, "prev_hash가 해시에 영향을 미치지 않음"
|
|
print(f" OK prev_hash 체인 효과 확인")
|
|
|
|
except AssertionError as e:
|
|
print(f" ERR {e}")
|
|
ok = False
|
|
except Exception as e:
|
|
print(f" ERR SHA-256 검증 오류: {type(e).__name__}: {e}")
|
|
ok = False
|
|
|
|
print("\n=== 6. IP 주소 해시 처리 검증 ===")
|
|
try:
|
|
# IP는 SHA-256으로만 저장해야 함
|
|
ip = "192.168.1.100"
|
|
ip_hash = hashlib.sha256(ip.encode()).hexdigest()
|
|
assert len(ip_hash) == 64, "IP 해시 길이 오류"
|
|
assert ip not in ip_hash, "IP 원문이 해시에 포함되면 안 됨"
|
|
print(f" OK IP {ip} -> 해시 {ip_hash[:16]}... (원본 미포함)")
|
|
|
|
# 동일 IP → 동일 해시 (추적 가능)
|
|
ip_hash2 = hashlib.sha256(ip.encode()).hexdigest()
|
|
assert ip_hash == ip_hash2, "IP 해시 결정론적이어야 함"
|
|
print(f" OK IP 해시 결정론적 (추적 가능)")
|
|
|
|
# 코드에서 IP 원본 저장 금지 확인
|
|
assert "ip_addr_hash" in audit_src and "client_ip.encode" in audit_src, \
|
|
"IP 해시 저장 로직 없음"
|
|
assert "ip_addr" not in audit_src.replace("ip_addr_hash", ""), \
|
|
"ip_addr 원본 저장 시도 감지"
|
|
print(f" OK audit.py에서 IP 원본 저장 안 함 (ip_addr_hash만 사용)")
|
|
|
|
except AssertionError as e:
|
|
print(f" ERR {e}")
|
|
ok = False
|
|
except Exception as e:
|
|
print(f" ERR IP 해시 검증 오류: {type(e).__name__}: {e}")
|
|
ok = False
|
|
|
|
print("\n=== 7. 심각도 레벨 확인 ===")
|
|
severity_checks = [
|
|
("INFO", "INFO 심각도"),
|
|
("WARN", "WARN 심각도"),
|
|
("ERROR", "ERROR 심각도"),
|
|
("CRITICAL", "CRITICAL 심각도"),
|
|
]
|
|
for sym, desc in severity_checks:
|
|
status = "OK" if f'"{sym}"' in audit_src or f"'{sym}'" in audit_src else "ERR"
|
|
if status == "ERR": ok = False
|
|
print(f" {status} {desc}")
|
|
|
|
print("\n=== 8. 내보내기 보안 정책 확인 ===")
|
|
export_checks = [
|
|
("ADMIN 전용" in audit_src or "UserRole.ADMIN" in audit_src, "ADMIN 전용 내보내기"),
|
|
("csv" in audit_src and "json" in audit_src, "CSV/JSON 포맷 지원"),
|
|
("filename=audit_log" in audit_src, "다운로드 파일명 설정"),
|
|
("Content-Disposition" in audit_src, "Content-Disposition 헤더"),
|
|
("10000" in audit_src, "내보내기 최대 10000건 제한"),
|
|
]
|
|
for check, desc in export_checks:
|
|
status = "OK" if check else "ERR"
|
|
if status == "ERR": ok = False
|
|
print(f" {status} {desc}")
|
|
|
|
print("\n=== 9. 체인 검증 로직 완성도 ===")
|
|
chain_checks = [
|
|
("broken_at", "변조 탐지 시 broken_at 반환"),
|
|
("intact", "무결성 여부 반환"),
|
|
("prev_hash_expected", "순차 prev_hash 검증"),
|
|
("compute_log_hash", "재계산으로 검증"),
|
|
("chain_start", "체인 시작 ID"),
|
|
("chain_end", "체인 종료 ID"),
|
|
("verified_at", "검증 시각"),
|
|
]
|
|
for sym, desc in chain_checks:
|
|
status = "OK" if sym in audit_src else "ERR"
|
|
if status == "ERR": ok = False
|
|
print(f" {status} {desc}")
|
|
|
|
print("\n=== 10. AuditLogOut 스키마 확인 ===")
|
|
pydantic_checks = [
|
|
("prev_hash", "prev_hash 노출 (체인 검증용)"),
|
|
("entity_type", "entity_type 필드"),
|
|
("entity_id", "entity_id 필드"),
|
|
("severity", "severity 필드"),
|
|
("log_hash", "log_hash 필드"),
|
|
]
|
|
for sym, desc in pydantic_checks:
|
|
# AuditLogOut 클래스 내에서만 검색
|
|
import re
|
|
block = re.search(r'class AuditLogOut.*?(?=\nclass |\Z)', models_src, re.DOTALL)
|
|
found = sym in (block.group(0) if block else "")
|
|
status = "OK" if found else "ERR"
|
|
if status == "ERR": ok = False
|
|
print(f" {status} {desc}")
|
|
|
|
print("\n=== D-5 불변 감사 로그 Hash Chain 테스트 완료 ===")
|
|
if ok:
|
|
print("모든 검사 통과")
|
|
else:
|
|
sys.exit(1)
|