zioinfo-mail/itsm/test_d5_audit.py
DESKTOP-TKLFCPR\ython e228faabf5 feat(itsm): G-1~G-12 확장 기능 + 하네스/봇/설치스크립트 구현
G-1: 메신저 Webhook Relay + _send_to_room 실제 httpx 호출 구현
G-2: POST /api/tasks/bulk SR 대량작업 엔드포인트 (최대 100건)
G-3: 라이선스 만료 알림 스케줄러 (매일 09:00 KST)
G-4: 체험판 upgrade_banner 필드 + license.py 배너 로직
G-5: core/auto_rca.py + incidents/problem auto-rca 엔드포인트
G-6: core/deploy_impact.py + vibe impact-analysis 엔드포인트
G-7: core/ticket_classifier.py + SR 생성 시 AI 분류 + ai-suggestion API
G-8: VulnPatchRecord 모델 + vuln_scan 패치추적 4개 엔드포인트
G-9: core/jira_sync.py + gateway Jira/Confluence 연동 엔드포인트
G-10: core/push_notify.py + routers/push.py + PushSubscription 모델
G-11: approvals 다중승인 (위임/서명/기한초과/마감연장)
G-12: alembic.ini + migrations/ + cicd/migrate_to_postgres.sh

하네스: guardia-orchestrator 확장기능 Phase 반영
봇명령어: /sr /status /license /bulk 슬래시 명령어 추가
설치스크립트: setup/ (Ubuntu, CentOS, RHEL, Windows) --test 옵션 포함

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-29 18:18:52 +09:00

266 lines
10 KiB
Python

"""D-5 불변 감사 로그 Hash Chain 테스트"""
import sys, ast, os, json, hashlib
os.environ.setdefault("GUARDIA_SECRET_KEY", "test-d5-secret-key-32bytes-padded!")
os.environ.setdefault("DATABASE_URL", "sqlite+aiosqlite:///./test_d5.db")
ok = True
print("=== 1. 구문 검사 ===")
files = ["routers/audit.py", "models.py"]
for f in files:
try:
with open(f, encoding="utf-8") as fh:
src = fh.read()
ast.parse(src)
print(f" OK {f}")
except SyntaxError as e:
print(f" ERR {f}: {e}")
ok = False
print("\n=== 2. routers/audit.py 엔드포인트 및 기능 확인 ===")
with open("routers/audit.py", encoding="utf-8") as f:
audit_src = f.read()
checks = [
('@router.get("",', "GET /audit 목록"),
('@router.post("/record"', "POST /record 이벤트 기록"),
('@router.get("/verify"', "GET /verify 체인 검증"),
('/verify/{from_id}/{to_id}', "GET /verify 범위 검증"),
('@router.get("/stats"', "GET /stats 통계"),
('@router.get("/export"', "GET /export 내보내기"),
('/entity/{entity_type}/{entity_id}', "GET /entity 엔티티별 이력"),
('@router.get("/{log_id}"', "GET /{log_id} 상세"),
("async def append_audit_log", "append_audit_log 함수"),
("async def _get_last_hash", "_get_last_hash 함수"),
("compute_log_hash", "compute_log_hash 사용"),
("hashlib.sha256", "SHA-256 해시"),
("prev_hash", "prev_hash 체인 연결"),
("ip_addr_hash", "IP 해시 저장"),
("client_ip", "클라이언트 IP 처리"),
("StreamingResponse", "StreamingResponse (CSV/JSON 내보내기)"),
("csv.writer", "CSV 내보내기"),
("UserRole.ADMIN", "ADMIN 전용 내보내기"),
("SEVERITY_LEVELS", "심각도 레벨 정의"),
("entity_type", "엔티티 유형 필드"),
]
for sym, desc in checks:
status = "OK" if sym in audit_src else "ERR"
if status == "ERR": ok = False
print(f" {status} {desc}")
print("\n=== 3. models.py AuditLog D-5 확장 필드 확인 ===")
with open("models.py", encoding="utf-8") as f:
models_src = f.read()
model_checks = [
("entity_type", "entity_type 컬럼"),
("entity_id", "entity_id 컬럼"),
("ip_addr_hash", "IP 해시 컬럼 (원본 저장 금지)"),
("severity", "severity 컬럼"),
("prev_hash", "prev_hash 체인 컬럼"),
("compute_log_hash", "compute_log_hash 함수"),
]
for sym, desc in model_checks:
status = "OK" if sym in models_src else "ERR"
if status == "ERR": ok = False
print(f" {status} {desc}")
print("\n=== 4. SHA-256 해시 체인 수학적 검증 ===")
try:
# compute_log_hash 로직 직접 테스트
def compute_log_hash_local(prev_hash, actor, action, detail, ts):
payload = json.dumps(
{"prev": prev_hash or "", "actor": actor, "action": action,
"detail": detail, "ts": ts},
ensure_ascii=False, sort_keys=True
)
return hashlib.sha256(payload.encode()).hexdigest()
# 로그 체인 시뮬레이션
chain = []
prev = None
events = [
("admin", "USER_LOGIN", "관리자 로그인"),
("admin", "SR_CREATE", "SR-20260526-0001 생성"),
("admin", "SR_ASSIGN", "엔지니어 배정"),
("eng1", "SR_RESOLVE", "SR 해결"),
("admin", "SR_CLOSE", "SR 종료"),
]
for actor, action, detail in events:
ts = f"2026-05-26T{len(chain):02d}:00:00"
h = compute_log_hash_local(prev, actor, action, detail, ts)
chain.append({"prev": prev, "hash": h, "actor": actor, "action": action})
prev = h
print(f" OK 체인 {len(chain)}개 생성 완료")
print(f" OK 체인 예시: {chain[0]['hash'][:16]}...")
# 체인 무결성 검증
broken = None
for i, log in enumerate(chain):
if i == 0:
exp_prev = None
else:
exp_prev = chain[i-1]["hash"]
if log["prev"] != exp_prev:
broken = i
break
h = compute_log_hash_local(log["prev"], log["actor"], log["action"],
events[i][2], f"2026-05-26T{i:02d}:00:00")
if h != log["hash"]:
broken = i
break
assert broken is None, f"정상 체인 검증 실패 at {broken}"
print(f" OK 정상 체인 무결성 검증 통과")
# 변조 시 탐지
chain_tampered = [dict(e) for e in chain]
chain_tampered[2]["action"] = "TAMPERED_ACTION" # 중간 항목 변조
broken_tampered = None
for i, log in enumerate(chain_tampered):
h = compute_log_hash_local(log["prev"], log["actor"], log["action"],
events[i][2], f"2026-05-26T{i:02d}:00:00")
if h != log["hash"]:
broken_tampered = i
break
assert broken_tampered == 2, f"변조 탐지 실패: {broken_tampered}"
print(f" OK 변조 탐지: idx={broken_tampered}에서 무결성 위반 감지")
except AssertionError as e:
print(f" ERR {e}")
ok = False
except Exception as e:
print(f" ERR 해시 체인 검증 오류: {type(e).__name__}: {e}")
ok = False
print("\n=== 5. SHA-256 결정론적 해시 검증 ===")
try:
# 동일 입력 → 동일 해시 (결정론적)
def _h(prev, actor, action, detail, ts):
payload = json.dumps(
{"prev": prev or "", "actor": actor, "action": action,
"detail": detail, "ts": ts},
ensure_ascii=False, sort_keys=True
)
return hashlib.sha256(payload.encode()).hexdigest()
h1 = _h(None, "admin", "LOGIN", "로그인 성공", "2026-01-01T00:00:00")
h2 = _h(None, "admin", "LOGIN", "로그인 성공", "2026-01-01T00:00:00")
assert h1 == h2, "동일 입력 다른 해시 — 결정론적 실패"
assert len(h1) == 64, f"SHA-256 출력 64자 기대: {len(h1)}"
print(f" OK 결정론적: 동일 입력 = {h1[:16]}...")
print(f" OK 해시 길이: {len(h1)}")
# 다른 입력 → 다른 해시
h3 = _h(None, "admin", "LOGIN", "다른 내용", "2026-01-01T00:00:00")
assert h1 != h3, "다른 입력이 동일 해시 — 충돌 위험"
print(f" OK 다른 입력 = 다른 해시: {h3[:16]}...")
# prev_hash 체인 효과
h4 = _h("abc123", "admin", "LOGIN", "로그인 성공", "2026-01-01T00:00:00")
assert h1 != h4, "prev_hash가 해시에 영향을 미치지 않음"
print(f" OK prev_hash 체인 효과 확인")
except AssertionError as e:
print(f" ERR {e}")
ok = False
except Exception as e:
print(f" ERR SHA-256 검증 오류: {type(e).__name__}: {e}")
ok = False
print("\n=== 6. IP 주소 해시 처리 검증 ===")
try:
# IP는 SHA-256으로만 저장해야 함
ip = "192.168.1.100"
ip_hash = hashlib.sha256(ip.encode()).hexdigest()
assert len(ip_hash) == 64, "IP 해시 길이 오류"
assert ip not in ip_hash, "IP 원문이 해시에 포함되면 안 됨"
print(f" OK IP {ip} -> 해시 {ip_hash[:16]}... (원본 미포함)")
# 동일 IP → 동일 해시 (추적 가능)
ip_hash2 = hashlib.sha256(ip.encode()).hexdigest()
assert ip_hash == ip_hash2, "IP 해시 결정론적이어야 함"
print(f" OK IP 해시 결정론적 (추적 가능)")
# 코드에서 IP 원본 저장 금지 확인
assert "ip_addr_hash" in audit_src and "client_ip.encode" in audit_src, \
"IP 해시 저장 로직 없음"
assert "ip_addr" not in audit_src.replace("ip_addr_hash", ""), \
"ip_addr 원본 저장 시도 감지"
print(f" OK audit.py에서 IP 원본 저장 안 함 (ip_addr_hash만 사용)")
except AssertionError as e:
print(f" ERR {e}")
ok = False
except Exception as e:
print(f" ERR IP 해시 검증 오류: {type(e).__name__}: {e}")
ok = False
print("\n=== 7. 심각도 레벨 확인 ===")
severity_checks = [
("INFO", "INFO 심각도"),
("WARN", "WARN 심각도"),
("ERROR", "ERROR 심각도"),
("CRITICAL", "CRITICAL 심각도"),
]
for sym, desc in severity_checks:
status = "OK" if f'"{sym}"' in audit_src or f"'{sym}'" in audit_src else "ERR"
if status == "ERR": ok = False
print(f" {status} {desc}")
print("\n=== 8. 내보내기 보안 정책 확인 ===")
export_checks = [
("ADMIN 전용" in audit_src or "UserRole.ADMIN" in audit_src, "ADMIN 전용 내보내기"),
("csv" in audit_src and "json" in audit_src, "CSV/JSON 포맷 지원"),
("filename=audit_log" in audit_src, "다운로드 파일명 설정"),
("Content-Disposition" in audit_src, "Content-Disposition 헤더"),
("10000" in audit_src, "내보내기 최대 10000건 제한"),
]
for check, desc in export_checks:
status = "OK" if check else "ERR"
if status == "ERR": ok = False
print(f" {status} {desc}")
print("\n=== 9. 체인 검증 로직 완성도 ===")
chain_checks = [
("broken_at", "변조 탐지 시 broken_at 반환"),
("intact", "무결성 여부 반환"),
("prev_hash_expected", "순차 prev_hash 검증"),
("compute_log_hash", "재계산으로 검증"),
("chain_start", "체인 시작 ID"),
("chain_end", "체인 종료 ID"),
("verified_at", "검증 시각"),
]
for sym, desc in chain_checks:
status = "OK" if sym in audit_src else "ERR"
if status == "ERR": ok = False
print(f" {status} {desc}")
print("\n=== 10. AuditLogOut 스키마 확인 ===")
pydantic_checks = [
("prev_hash", "prev_hash 노출 (체인 검증용)"),
("entity_type", "entity_type 필드"),
("entity_id", "entity_id 필드"),
("severity", "severity 필드"),
("log_hash", "log_hash 필드"),
]
for sym, desc in pydantic_checks:
# AuditLogOut 클래스 내에서만 검색
import re
block = re.search(r'class AuditLogOut.*?(?=\nclass |\Z)', models_src, re.DOTALL)
found = sym in (block.group(0) if block else "")
status = "OK" if found else "ERR"
if status == "ERR": ok = False
print(f" {status} {desc}")
print("\n=== D-5 불변 감사 로그 Hash Chain 테스트 완료 ===")
if ok:
print("모든 검사 통과")
else:
sys.exit(1)