"""GUARDiA 스킬 레지스트리 — 운영 스킬 CRUD + 검색 + 실행""" from __future__ import annotations import json, logging, time from datetime import datetime from typing import Optional from fastapi import APIRouter, Depends, HTTPException, Query from pydantic import BaseModel from sqlalchemy import select, desc, func from sqlalchemy.ext.asyncio import AsyncSession from core.auth import get_current_user from database import get_db from models import User, Skill, SkillExecution logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/skills", tags=["스킬 레지스트리"]) BUILTIN_SKILLS = [ {"name": "disk_check", "description": "서버 디스크 사용량 확인", "category": "DIAGNOSTIC", "code_template": "df -h / && du -sh /var/log/*", "parameters_schema": '{"server_id":"int"}', "created_from": "BUILTIN"}, {"name": "service_status", "description": "주요 서비스 상태 확인", "category": "DIAGNOSTIC", "code_template": "systemctl status {service_name}", "parameters_schema": '{"server_id":"int","service_name":"str"}', "created_from": "BUILTIN"}, {"name": "memory_check", "description": "메모리 사용량 상세 확인", "category": "DIAGNOSTIC", "code_template": "free -h && ps aux --sort=-%mem | head -10", "parameters_schema": '{"server_id":"int"}', "created_from": "BUILTIN"}, {"name": "log_errors", "description": "최근 에러 로그 수집", "category": "DIAGNOSTIC", "code_template": "journalctl -p err --no-pager -n 20", "parameters_schema": '{"server_id":"int"}', "created_from": "BUILTIN"}, {"name": "network_check", "description": "네트워크 연결 상태 확인", "category": "NETWORK", "code_template": "ss -tlnp && ping -c 3 {target}", "parameters_schema": '{"server_id":"int","target":"str"}', "created_from": "BUILTIN"}, ] class SkillCreate(BaseModel): name: str; description: str = ""; category: str = "GENERAL" code_template: str = ""; parameters_schema: str = "{}" created_from: str = "MANUAL" class SkillExecuteIn(BaseModel): server_id: int; params: dict = {} @router.get("") async def list_skills( category: Optional[str] = None, q: Optional[str] = None, limit: int = Query(50, le=200), db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): # 내장 스킬 포함 builtins = [{"id": f"builtin-{i}", **s, "is_builtin": True, "success_count": 0, "fail_count": 0} for i, s in enumerate(BUILTIN_SKILLS) if (not category or s["category"] == category) and (not q or q.lower() in s["name"].lower() or q.lower() in s["description"].lower())] stmt = select(Skill).where(Skill.is_active == True).order_by(desc(Skill.success_count)).limit(limit) if category: stmt = stmt.where(Skill.category == category) if q: stmt = stmt.where(Skill.name.contains(q) | Skill.description.contains(q)) rows = await db.execute(stmt) custom = [{"id": s.id, "name": s.name, "description": s.description, "category": s.category, "created_from": s.created_from, "success_count": s.success_count, "fail_count": s.fail_count, "is_builtin": False} for s in rows.scalars().all()] return builtins + custom @router.post("", status_code=201) async def create_skill(body: SkillCreate, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)): skill = Skill( name=body.name, description=body.description, category=body.category, code_template=body.code_template, parameters_schema=body.parameters_schema, created_from=body.created_from, success_count=0, fail_count=0, is_active=True, created_at=datetime.utcnow(), ) db.add(skill); await db.commit(); await db.refresh(skill) return {"id": skill.id} @router.get("/search") async def search_skills_early(q: str = "", db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)): rows = await db.execute( select(Skill).where(Skill.name.contains(q) | Skill.description.contains(q)).limit(10) ) return [{"id": s.id, "name": s.name, "category": s.category} for s in rows.scalars().all()] @router.get("/recommend") async def recommend_skills_early(context: str = "", db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)): keywords = context.lower().split() candidates = [] for s in BUILTIN_SKILLS: score = sum(1 for k in keywords if k in s["name"] or k in s["description"].lower()) if score > 0: candidates.append({"skill": s["name"], "score": score, "desc": s["description"]}) candidates.sort(key=lambda x: x["score"], reverse=True) return candidates[:5] @router.get("/stats") async def skill_stats_early(db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)): total = (await db.execute(select(func.count(Skill.id)))).scalar() or 0 executions = (await db.execute(select(func.count(SkillExecution.id)))).scalar() or 0 return {"total_skills": total + len(BUILTIN_SKILLS), "custom_skills": total, "builtin_skills": len(BUILTIN_SKILLS), "total_executions": executions} @router.get("/{skill_id}") async def get_skill(skill_id, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)): if str(skill_id).startswith("builtin-"): idx = int(str(skill_id).replace("builtin-", "")) if idx < len(BUILTIN_SKILLS): return {**BUILTIN_SKILLS[idx], "id": skill_id, "is_builtin": True} raise HTTPException(404) row = await db.execute(select(Skill).where(Skill.id == int(skill_id))) s = row.scalar_one_or_none() if not s: raise HTTPException(404) return {"id": s.id, "name": s.name, "description": s.description, "category": s.category, "code_template": s.code_template, "parameters_schema": s.parameters_schema, "success_count": s.success_count} @router.post("/{skill_id}/execute") async def execute_skill(skill_id, body: SkillExecuteIn, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)): """스킬 실행 — SSH 에이전트리스.""" # 스킬 조회 if str(skill_id).startswith("builtin-"): idx = int(str(skill_id).replace("builtin-", "")) code = BUILTIN_SKILLS[idx]["code_template"] if idx < len(BUILTIN_SKILLS) else "" skill_db_id = None else: row = await db.execute(select(Skill).where(Skill.id == int(skill_id))) s = row.scalar_one_or_none() if not s: raise HTTPException(404) code = s.code_template; skill_db_id = s.id # 파라미터 치환 for k, v in body.params.items(): code = code.replace(f"{{{k}}}", str(v)) # SSH 실행 (에이전트리스) start = time.time() result = ""; success = False try: from routers.ssh import _exec_ssh_simple result = await _exec_ssh_simple(body.server_id, code, db, user) success = True except Exception as e: result = str(e) elapsed = int((time.time() - start) * 1000) # 실행 로그 저장 exec_log = SkillExecution( skill_id=skill_db_id, input_params=json.dumps(body.params), result=result[:500], success=success, duration_ms=elapsed, executed_by=user.id, executed_at=datetime.utcnow(), ) db.add(exec_log) # 성공/실패 카운트 업데이트 if skill_db_id: row = await db.execute(select(Skill).where(Skill.id == skill_db_id)) s = row.scalar_one_or_none() if s: if success: s.success_count = (s.success_count or 0) + 1 else: s.fail_count = (s.fail_count or 0) + 1 await db.commit() return {"success": success, "result": result[:300], "duration_ms": elapsed} @router.get("/search") async def search_skills(q: str = "", db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)): rows = await db.execute( select(Skill).where(Skill.name.contains(q) | Skill.description.contains(q)).limit(10) ) return [{"id": s.id, "name": s.name, "category": s.category} for s in rows.scalars().all()] @router.get("/recommend") async def recommend_skills(context: str = "", db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)): """컨텍스트 기반 스킬 추천.""" keywords = context.lower().split() candidates = [] for s in BUILTIN_SKILLS: score = sum(1 for k in keywords if k in s["name"] or k in s["description"].lower()) if score > 0: candidates.append({"skill": s["name"], "score": score, "desc": s["description"]}) candidates.sort(key=lambda x: x["score"], reverse=True) return candidates[:5] @router.post("/{skill_id}/feedback") async def skill_feedback(skill_id: int, success: bool, notes: str = "", db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)): row = await db.execute(select(Skill).where(Skill.id == skill_id)) s = row.scalar_one_or_none() if not s: raise HTTPException(404) if success: s.success_count = (s.success_count or 0) + 1 else: s.fail_count = (s.fail_count or 0) + 1 await db.commit() return {"ok": True} @router.get("/stats") async def skill_stats(db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)): total = (await db.execute(select(func.count(Skill.id)))).scalar() or 0 executions = (await db.execute(select(func.count(SkillExecution.id)))).scalar() or 0 return {"total_skills": total + len(BUILTIN_SKILLS), "custom_skills": total, "builtin_skills": len(BUILTIN_SKILLS), "total_executions": executions}