126 lines
5.3 KiB
Python
126 lines
5.3 KiB
Python
"""Edge/IoT 디바이스 모니터링"""
|
|
from __future__ import annotations
|
|
import json, logging
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
from fastapi import APIRouter, Depends, HTTPException, Request
|
|
from pydantic import BaseModel
|
|
from sqlalchemy import select, desc
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from core.auth import get_current_user
|
|
from database import get_db
|
|
from models import User, EdgeDevice, EdgeMetric, EdgeAlert
|
|
|
|
logger = logging.getLogger(__name__)
|
|
router = APIRouter(prefix="/api/edge", tags=["Edge/IoT"])
|
|
|
|
DEVICE_TYPES = ["SERVER_EDGE", "NETWORK_EDGE", "IOT_SENSOR", "CCTV", "KIOSK", "GATEWAY"]
|
|
|
|
|
|
class DeviceRegister(BaseModel):
|
|
name: str; device_type: str; location: str = ""
|
|
ip_hint: str = ""; protocol: str = "HTTP" # HTTP|SNMP|MQTT
|
|
metadata: dict = {}
|
|
|
|
|
|
class TelemetryIn(BaseModel):
|
|
device_token: str; metrics: dict # {"cpu":70,"memory":60,"temp":45}
|
|
timestamp: Optional[int] = None # epoch ms
|
|
|
|
|
|
class AlertCreate(BaseModel):
|
|
device_id: int; alert_type: str; message: str; severity: str = "WARNING"
|
|
|
|
|
|
@router.get("/devices")
|
|
async def list_devices(db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)):
|
|
rows = await db.execute(select(EdgeDevice).order_by(EdgeDevice.name))
|
|
devices = rows.scalars().all()
|
|
return [{"id":d.id,"name":d.name,"device_type":d.device_type,"location":d.location,
|
|
"status":d.status,"last_seen":d.last_seen} for d in devices]
|
|
|
|
|
|
@router.post("/devices", status_code=201)
|
|
async def register_device(body: DeviceRegister, db: AsyncSession = Depends(get_db),
|
|
user: User = Depends(get_current_user)):
|
|
import uuid
|
|
d = EdgeDevice(
|
|
name=body.name, device_type=body.device_type, location=body.location,
|
|
protocol=body.protocol, device_token=str(uuid.uuid4()),
|
|
meta_json=json.dumps(body.metadata), status="REGISTERED",
|
|
registered_by=user.id, created_at=datetime.utcnow()
|
|
)
|
|
db.add(d); await db.commit(); await db.refresh(d)
|
|
return {"id": d.id, "device_token": d.device_token}
|
|
|
|
|
|
@router.get("/devices/{device_id}")
|
|
async def get_device(device_id: int, db: AsyncSession = Depends(get_db),
|
|
user: User = Depends(get_current_user)):
|
|
row = await db.execute(select(EdgeDevice).where(EdgeDevice.id == device_id))
|
|
d = row.scalar_one_or_none()
|
|
if not d: raise HTTPException(404)
|
|
return {"id":d.id,"name":d.name,"device_type":d.device_type,"location":d.location,
|
|
"protocol":d.protocol,"status":d.status,"last_seen":d.last_seen,
|
|
"meta": json.loads(d.meta_json or "{}")}
|
|
|
|
|
|
@router.get("/devices/{device_id}/metrics")
|
|
async def get_metrics(device_id: int, limit: int = 100, db: AsyncSession = Depends(get_db),
|
|
user: User = Depends(get_current_user)):
|
|
rows = await db.execute(
|
|
select(EdgeMetric).where(EdgeMetric.device_id == device_id)
|
|
.order_by(desc(EdgeMetric.recorded_at)).limit(limit)
|
|
)
|
|
metrics = rows.scalars().all()
|
|
return [{"id":m.id,"metrics":json.loads(m.metrics_json or "{}"),
|
|
"recorded_at":m.recorded_at} for m in metrics]
|
|
|
|
|
|
@router.get("/devices/{device_id}/alerts")
|
|
async def get_alerts(device_id: int, db: AsyncSession = Depends(get_db),
|
|
user: User = Depends(get_current_user)):
|
|
rows = await db.execute(
|
|
select(EdgeAlert).where(EdgeAlert.device_id == device_id)
|
|
.order_by(desc(EdgeAlert.created_at)).limit(50)
|
|
)
|
|
return [{"id":a.id,"alert_type":a.alert_type,"message":a.message,
|
|
"severity":a.severity,"created_at":a.created_at}
|
|
for a in rows.scalars().all()]
|
|
|
|
|
|
@router.post("/telemetry", status_code=202)
|
|
async def ingest_telemetry(body: TelemetryIn, db: AsyncSession = Depends(get_db)):
|
|
"""디바이스 텔레메트리 수집 (Push 방식, 인증 불필요 — 내부망 전용)."""
|
|
row = await db.execute(select(EdgeDevice).where(EdgeDevice.device_token == body.device_token))
|
|
device = row.scalar_one_or_none()
|
|
if not device:
|
|
raise HTTPException(401, "등록되지 않은 디바이스")
|
|
|
|
ts = datetime.utcfromtimestamp(body.timestamp / 1000) if body.timestamp else datetime.utcnow()
|
|
db.add(EdgeMetric(device_id=device.id, metrics_json=json.dumps(body.metrics), recorded_at=ts))
|
|
|
|
# 이상 감지 — 임계값 초과 시 알림
|
|
cpu = body.metrics.get("cpu", 0)
|
|
temp = body.metrics.get("temp", 0)
|
|
if cpu > 90:
|
|
db.add(EdgeAlert(device_id=device.id, alert_type="HIGH_CPU",
|
|
message=f"CPU {cpu}% 초과", severity="WARNING", created_at=datetime.utcnow()))
|
|
if temp > 80:
|
|
db.add(EdgeAlert(device_id=device.id, alert_type="HIGH_TEMP",
|
|
message=f"온도 {temp}°C 초과", severity="CRITICAL", created_at=datetime.utcnow()))
|
|
|
|
device.last_seen = datetime.utcnow()
|
|
device.status = "ONLINE"
|
|
await db.commit()
|
|
return {"ok": True}
|
|
|
|
|
|
@router.get("/topology")
|
|
async def get_topology(db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)):
|
|
rows = await db.execute(select(EdgeDevice).order_by(EdgeDevice.device_type))
|
|
devices = rows.scalars().all()
|
|
nodes = [{"id": d.id, "name": d.name, "type": d.device_type,
|
|
"location": d.location, "status": d.status} for d in devices]
|
|
return {"nodes": nodes, "edge_count": len(nodes)}
|