"""운영 지식 그래프 — 서버-장애-해결책 시간적 관계 그래프""" from __future__ import annotations import json, logging from datetime import datetime from typing import Optional from fastapi import APIRouter, Depends, HTTPException, Query from pydantic import BaseModel from sqlalchemy import select, desc, func from sqlalchemy.ext.asyncio import AsyncSession from core.auth import get_current_user from database import get_db from models import User, KGNode, KGEdge logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/kg", tags=["지식 그래프"]) NODE_TYPES = ["SERVER", "INCIDENT", "SOLUTION", "INSTITUTION", "COMPONENT", "PATTERN"] EDGE_TYPES = ["CAUSED_BY", "SOLVED_BY", "AFFECTS", "RECURS_WITH", "SIMILAR_TO", "PART_OF"] class NodeIn(BaseModel): node_type: str; name: str; properties: dict = {} class EdgeIn(BaseModel): from_node_id: int; to_node_id: int; edge_type: str weight: float = 1.0; valid_until: Optional[datetime] = None confidence: float = 0.5; notes: str = "" @router.post("/node", status_code=201) async def add_node(body: NodeIn, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)): # 중복 확인 existing = await db.execute( select(KGNode).where(KGNode.name == body.name, KGNode.node_type == body.node_type) ) node = existing.scalar_one_or_none() if node: return {"node_id": node.id, "existed": True} node = KGNode(node_type=body.node_type, name=body.name, properties_json=json.dumps(body.properties), created_at=datetime.utcnow()) db.add(node); await db.commit(); await db.refresh(node) return {"node_id": node.id, "existed": False} @router.post("/edge", status_code=201) async def add_edge(body: EdgeIn, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)): edge = KGEdge( from_node_id=body.from_node_id, to_node_id=body.to_node_id, edge_type=body.edge_type, weight=body.weight, valid_from=datetime.utcnow(), valid_until=body.valid_until, confidence=body.confidence, notes=body.notes, ) db.add(edge); await db.commit(); await db.refresh(edge) return {"edge_id": edge.id} @router.get("/query") async def query_graph( node_name: Optional[str] = None, node_type: Optional[str] = None, edge_type: Optional[str] = None, limit: int = Query(20, le=100), db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): stmt = select(KGNode).limit(limit) if node_name: stmt = stmt.where(KGNode.name.contains(node_name)) if node_type: stmt = stmt.where(KGNode.node_type == node_type) rows = await db.execute(stmt) nodes = rows.scalars().all() result_nodes = [] for n in nodes: # 해당 노드의 엣지 조회 edges_out = await db.execute( select(KGEdge).where(KGEdge.from_node_id == n.id).limit(5) ) edges = [{"to": e.to_node_id, "type": e.edge_type, "confidence": e.confidence} for e in edges_out.scalars().all()] result_nodes.append({ "id": n.id, "type": n.node_type, "name": n.name, "properties": json.loads(n.properties_json or "{}"), "edges": edges, "created_at": n.created_at, }) return result_nodes @router.get("/server/{server_id}/history") async def server_history(server_id: int, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)): """서버별 장애 이력 그래프 조회.""" server_nodes = await db.execute( select(KGNode).where(KGNode.node_type == "SERVER") .where(KGNode.properties_json.contains(str(server_id))).limit(5) ) nodes = server_nodes.scalars().all() history = [] for n in nodes: edges = await db.execute( select(KGEdge).where(KGEdge.from_node_id == n.id).order_by(desc(KGEdge.valid_from)) ) for e in edges.scalars().all(): target = await db.execute(select(KGNode).where(KGNode.id == e.to_node_id)) t = target.scalar_one_or_none() history.append({"from": n.name, "edge": e.edge_type, "to": t.name if t else "?", "confidence": e.confidence, "when": e.valid_from}) return {"server_id": server_id, "history": history} @router.get("/pattern") async def detect_patterns(db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)): """반복 패턴 탐지 — 동일 엣지 타입 빈도 분석.""" rows = await db.execute( select(KGEdge.edge_type, func.count(KGEdge.id).label("cnt")) .group_by(KGEdge.edge_type).order_by(desc("cnt")) ) patterns = [{"edge_type": r[0], "count": r[1]} for r in rows.all()] return {"patterns": patterns, "total_edges": sum(p["count"] for p in patterns)} @router.get("/visualization") async def visualization_data(db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)): """D3.js 시각화용 노드·링크 데이터.""" nodes = (await db.execute(select(KGNode).limit(50))).scalars().all() edges = (await db.execute(select(KGEdge).limit(100))).scalars().all() return { "nodes": [{"id": n.id, "label": n.name, "type": n.node_type} for n in nodes], "links": [{"source": e.from_node_id, "target": e.to_node_id, "type": e.edge_type, "weight": e.weight} for e in edges], } @router.post("/auto-record") async def auto_record_sr( sr_id: int, server_id: int, problem: str, solution: str, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """SR 해결 후 지식 그래프 자동 기록.""" # 서버 노드 sv = await db.execute(select(KGNode).where(KGNode.name == f"server-{server_id}", KGNode.node_type == "SERVER")) sv_node = sv.scalar_one_or_none() if not sv_node: sv_node = KGNode(node_type="SERVER", name=f"server-{server_id}", properties_json=json.dumps({"server_id": server_id}), created_at=datetime.utcnow()) db.add(sv_node); await db.flush() # 장애 노드 inc = KGNode(node_type="INCIDENT", name=f"SR-{sr_id}: {problem[:50]}", properties_json=json.dumps({"sr_id": sr_id}), created_at=datetime.utcnow()) db.add(inc); await db.flush() # 해결 노드 sol = KGNode(node_type="SOLUTION", name=solution[:100], properties_json=json.dumps({"sr_id": sr_id}), created_at=datetime.utcnow()) db.add(sol); await db.flush() # 엣지 연결 db.add(KGEdge(from_node_id=sv_node.id, to_node_id=inc.id, edge_type="AFFECTS", weight=1.0, valid_from=datetime.utcnow(), confidence=0.8)) db.add(KGEdge(from_node_id=inc.id, to_node_id=sol.id, edge_type="SOLVED_BY", weight=1.0, valid_from=datetime.utcnow(), confidence=0.9)) await db.commit() return {"ok": True, "incident_node": inc.id, "solution_node": sol.id}