130 lines
5.0 KiB
Python
130 lines
5.0 KiB
Python
"""OpenTelemetry 분산 트레이싱 수집·조회"""
|
|
from __future__ import annotations
|
|
import json, logging
|
|
from datetime import datetime
|
|
from typing import List, Optional
|
|
from fastapi import APIRouter, Depends, HTTPException, Query
|
|
from pydantic import BaseModel
|
|
from sqlalchemy import select, desc, func
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from core.auth import get_current_user
|
|
from database import get_db
|
|
from models import User, OtelTrace, OtelSpan
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(name=__name__)

# Router for the tracing endpoints; every path below is mounted under
# /api/tracing and grouped under the "distributed tracing" OpenAPI tag.
router = APIRouter(tags=["분산 트레이싱"], prefix="/api/tracing")
|
|
|
|
|
|
class SpanIn(BaseModel):
    """One span as submitted to the ingest endpoint.

    All timestamps are epoch milliseconds.
    """

    # Identity and position in the span tree (None parent => root span).
    trace_id: str
    span_id: str
    parent_span_id: Optional[str] = None

    # Emitting service and the operation the span covers.
    service: str
    operation: str

    # Start / end instants, epoch ms.
    start_time: int
    end_time: int

    # Outcome and payload. NOTE(review): pydantic documents that mutable
    # defaults are copied per instance, so these literals are not shared —
    # confirm for the pydantic version in use. `events` is accepted here but
    # the ingest endpoint in this module does not persist it.
    status: str = "OK"
    attributes: dict = {}
    events: List[dict] = []
|
|
|
|
|
|
class OtlpIngest(BaseModel):
    """Request body of the ingest endpoint: a batch of spans."""

    # Spans may belong to any number of traces and arrive in any order.
    spans: List[SpanIn]
|
|
|
|
|
|
def _utc_naive(epoch_ms: Optional[int] = None) -> datetime:
    """Return a naive UTC datetime for *epoch_ms*, or for "now" when omitted.

    Produces the same naive-UTC values as the deprecated
    ``datetime.utcfromtimestamp(ms / 1000)`` / ``datetime.utcnow()``.
    """
    from datetime import timezone  # local import keeps the module import block unchanged

    if epoch_ms is None:
        aware = datetime.now(timezone.utc)
    else:
        aware = datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc)
    return aware.replace(tzinfo=None)


@router.post("/ingest", status_code=202)
async def ingest_spans(body: OtlpIngest, db: AsyncSession = Depends(get_db)):
    """OTLP HTTP ingestion endpoint (no authentication — internal network only).

    For every trace_id in the batch that has no OtelTrace row yet, a trace row
    is created from the earliest-starting span of that trace in the batch
    (its service and start_time). Then every span is stored as an OtelSpan.
    Both steps run in one transaction, so a failure leaves no partial batch.

    NOTE(review): there is no auth dependency on this route; make sure the
    "internal network only" restriction is enforced at the network/proxy layer.
    ``SpanIn.events`` is not persisted.

    Returns:
        ``{"ok": True, "ingested": <number of spans in the request>}``.
    """
    if not body.spans:
        return {"ok": True, "ingested": 0}

    # Earliest span of each trace in this batch (normally the root span);
    # it supplies the service and start_time of a newly created trace row.
    first_span = {}
    for sp in body.spans:
        seen = first_span.get(sp.trace_id)
        if seen is None or sp.start_time < seen.start_time:
            first_span[sp.trace_id] = sp

    # Single round-trip instead of one SELECT per trace_id.
    result = await db.execute(
        select(OtelTrace.trace_id).where(OtelTrace.trace_id.in_(list(first_span)))
    )
    existing = set(result.scalars().all())

    now = _utc_naive()
    for trace_id, sp in first_span.items():
        if trace_id not in existing:
            db.add(OtelTrace(
                trace_id=trace_id,
                service=sp.service,
                start_time=_utc_naive(sp.start_time),
                created_at=now,
            ))

    # Flush (not commit): the trace rows are INSERTed before the span rows that
    # reference them (FK), while the whole batch stays in one transaction.
    await db.flush()

    db.add_all([
        OtelSpan(
            trace_id=sp.trace_id, span_id=sp.span_id, parent_span_id=sp.parent_span_id,
            service=sp.service, operation=sp.operation,
            start_time=_utc_naive(sp.start_time),
            end_time=_utc_naive(sp.end_time),
            duration_ms=sp.end_time - sp.start_time,
            status=sp.status, attributes=json.dumps(sp.attributes),
        )
        for sp in body.spans
    ])
    await db.commit()
    return {"ok": True, "ingested": len(body.spans)}
|
|
|
|
|
|
@router.get("/traces")
async def list_traces(
    service: Optional[str] = None,
    limit: int = Query(50, ge=1, le=200),
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """List the most recent traces, newest first.

    Args:
        service: optional exact-match filter on the trace's service.
        limit: maximum number of traces, 1..200. The lower bound rejects 0 or
            negative values, which would otherwise reach SQL LIMIT (an error
            on some databases, "no limit" on others).

    Returns:
        A list of ``{"trace_id", "service", "start_time"}`` dicts.
    """
    q = select(OtelTrace)
    if service:
        q = q.where(OtelTrace.service == service)
    q = q.order_by(desc(OtelTrace.start_time)).limit(limit)

    traces = (await db.execute(q)).scalars().all()
    return [{"trace_id": t.trace_id, "service": t.service, "start_time": t.start_time} for t in traces]
|
|
|
|
|
|
@router.get("/traces/{trace_id}")
async def get_trace(
    trace_id: str,
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Return all stored spans of one trace, ordered by start time.

    ``user`` is not read in the body; presumably ``get_current_user`` makes
    the route require an authenticated caller — confirm.

    Raises:
        HTTPException(404): no span row carries this trace_id.
    """
    stmt = (
        select(OtelSpan)
        .where(OtelSpan.trace_id == trace_id)
        .order_by(OtelSpan.start_time)
    )
    span_rows = (await db.execute(stmt)).scalars().all()
    if not span_rows:
        raise HTTPException(404)

    serialized = [
        {
            "span_id": span.span_id,
            "parent_span_id": span.parent_span_id,
            "service": span.service,
            "operation": span.operation,
            "start_time": span.start_time,
            "end_time": span.end_time,
            "duration_ms": span.duration_ms,
            "status": span.status,
        }
        for span in span_rows
    ]
    return {"trace_id": trace_id, "spans": serialized}
|
|
|
|
|
|
@router.get("/services")
async def list_services(db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)):
    """Return the distinct service names found on stored traces."""
    stmt = select(OtelTrace.service).distinct()
    result = await db.execute(stmt)
    # Single-column select: scalars() yields the service values directly.
    return list(result.scalars().all())
|
|
|
|
|
|
@router.get("/deps")
async def service_deps(db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)):
    """Service-to-service call dependency map.

    A span whose parent span (same trace, ``span_id == parent_span_id``)
    belongs to a different service is a cross-service call. Each distinct
    (caller service, callee service, callee operation) becomes one edge.
    Calls whose parent span was never ingested cannot be attributed and are
    omitted.

    Returns:
        ``{"edges": [{"from": caller_service, "to": callee_service,
        "call": callee_operation}, ...]}``, at most 100 edges, sorted.
    """
    from sqlalchemy import and_
    from sqlalchemy.orm import aliased

    parent = aliased(OtelSpan)
    child = aliased(OtelSpan)
    rows = await db.execute(
        select(parent.service, child.service, child.operation)
        .select_from(child)
        .join(parent, and_(parent.trace_id == child.trace_id,
                           parent.span_id == child.parent_span_id))
        .where(parent.service != child.service)  # same-service nesting is not a dependency
        .distinct()
        .order_by(parent.service, child.service, child.operation)
        .limit(100)
    )
    edges = [{"from": caller, "to": callee, "call": op} for caller, callee, op in rows.all()]
    return {"edges": edges}
|
|
|
|
|
|
class TraceSearch(BaseModel):
    """Optional span-search filters; fields left unset do not filter."""

    # Exact service name.
    service: Optional[str] = None
    # Substring of the operation name.
    operation: Optional[str] = None
    # Lower bound on span duration, milliseconds.
    min_duration_ms: Optional[int] = None
    # Exact span status, e.g. "OK".
    status: Optional[str] = None
|
|
|
|
|
|
@router.post("/search")
async def search_traces(body: TraceSearch, db: AsyncSession = Depends(get_db),
                        user: User = Depends(get_current_user)):
    """Search spans with optional AND-ed filters; newest 100 matches.

    Filters:
        service: exact match (ignored when empty).
        operation: literal substring match. ``%`` and ``_`` in the input are
            escaped, so they do not act as LIKE wildcards.
        min_duration_ms: ``duration_ms >= value``. Applied whenever provided,
            including 0.
        status: exact match (ignored when empty).

    Returns:
        A list of ``{"trace_id", "span_id", "service", "operation",
        "duration_ms", "status"}`` dicts, newest span first.
    """
    q = select(OtelSpan)
    if body.service:
        q = q.where(OtelSpan.service == body.service)
    if body.operation:
        # autoescape=True: user text is matched literally, not as a LIKE pattern.
        q = q.where(OtelSpan.operation.contains(body.operation, autoescape=True))
    if body.min_duration_ms is not None:
        q = q.where(OtelSpan.duration_ms >= body.min_duration_ms)
    if body.status:
        q = q.where(OtelSpan.status == body.status)
    q = q.order_by(desc(OtelSpan.start_time)).limit(100)

    rows = await db.execute(q)
    return [{"trace_id": s.trace_id, "span_id": s.span_id, "service": s.service,
             "operation": s.operation, "duration_ms": s.duration_ms, "status": s.status}
            for s in rows.scalars().all()]
|