"""Audit log endpoints with hash-chain verification.""" import hashlib import json from typing import List, Optional from fastapi import APIRouter, Depends, Query from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from database import get_db from models import AuditLog, AuditLogOut router = APIRouter(prefix="/api/audit", tags=["audit"]) @router.get("", response_model=List[AuditLogOut]) async def list_audit_logs( sr_id: Optional[str] = Query(None), skip: int = 0, limit: int = 100, db: AsyncSession = Depends(get_db), ): q = select(AuditLog).order_by(AuditLog.created_at.desc()) if sr_id: q = q.where(AuditLog.sr_id == sr_id) q = q.offset(skip).limit(limit) result = await db.execute(q) return result.scalars().all() @router.get("/verify") async def verify_chain(db: AsyncSession = Depends(get_db)): """Verify SHA-256 hash chain integrity.""" result = await db.execute(select(AuditLog).order_by(AuditLog.id)) logs = result.scalars().all() broken_at: Optional[int] = None for log in logs: payload = json.dumps( {"prev": log.prev_hash or "", "actor": log.actor or "", "action": log.action, "detail": log.detail or "", "ts": log.created_at.isoformat() if log.created_at else ""}, ensure_ascii=False, sort_keys=True ) expected = hashlib.sha256(payload.encode()).hexdigest() if expected != log.log_hash: broken_at = log.id break return { "total": len(logs), "intact": broken_at is None, "broken_at_id": broken_at, }