""" Kubernetes 클러스터 관리 — 에이전트리스 (SSH 경유 kubectl) 기존 SSH 인프라(routers/ssh.py, routers/infra_ext.py)를 재사용하여 대상 서버에 소프트웨어 설치 없이 kubectl 명령을 SSH 경유로 실행. 엔드포인트: GET /api/k8s/clusters — 등록된 클러스터 목록 POST /api/k8s/clusters — 클러스터 등록 DELETE /api/k8s/clusters/{id} — 클러스터 삭제 GET /api/k8s/clusters/{id}/nodes — 노드 목록 + 상태 GET /api/k8s/clusters/{id}/pods — Pod 목록 (네임스페이스별) GET /api/k8s/clusters/{id}/deploys — Deployment 목록 POST /api/k8s/clusters/{id}/rollout — Deployment 롤링 업데이트 GET /api/k8s/clusters/{id}/events — 클러스터 이벤트 (WARNING 필터) GET /api/k8s/clusters/{id}/metrics — 노드 리소스 사용률 POST /api/k8s/clusters/{id}/sr — 이상 감지 → SR 자동 생성 """ from __future__ import annotations import json import logging from datetime import datetime from typing import List, Optional import paramiko from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException from pydantic import BaseModel, Field from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from core.auth import get_current_user, require_admin_role from database import get_db from models import User, Server, SRRequest, SRStatus, K8sCluster # 신규 모델 logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/k8s", tags=["Kubernetes"]) # ── Pydantic 스키마 ────────────────────────────────────────────────────────── class ClusterCreate(BaseModel): name: str = Field(..., max_length=100) description: Optional[str] = None ssh_server_id: int = Field(..., description="SSH 경유할 마스터 노드 서버 ID") namespace: str = Field("default", max_length=100) kubeconfig_path: str = Field("/root/.kube/config", description="마스터 노드의 kubeconfig 경로") class RolloutRequest(BaseModel): namespace: str = "default" deployment: str image: Optional[str] = None # 특정 이미지로 업데이트 (None=재시작만) # ── SSH 경유 kubectl 실행 ───────────────────────────────────────────────────── async def _kubectl(server: Server, cmd: str, kubeconfig: str = "/root/.kube/config") -> dict: """ SSH 경유 kubectl 실행 (에이전트리스 원칙). server: tb_server 레코드 (ip, ssh_user, os_pw_enc) """ from core.crypto import decrypt_password # AES-256-GCM 복호화 try: pw = decrypt_password(server.os_pw_enc) ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect( server.ip_addr, username=server.ssh_user, password=pw, timeout=15, ) full_cmd = f"KUBECONFIG={kubeconfig} kubectl {cmd} 2>&1" _, stdout, stderr = ssh.exec_command(full_cmd, timeout=30) out = stdout.read().decode('utf-8', 'replace').strip() ssh.close() return {"ok": True, "output": out} except Exception as e: logger.error(f"kubectl 실행 실패: {e}") return {"ok": False, "error": str(e)} async def _kubectl_json(server: Server, cmd: str, kubeconfig: str) -> Optional[dict]: """kubectl -o json 결과를 dict로 파싱.""" result = await _kubectl(server, f"{cmd} -o json", kubeconfig) if result["ok"]: try: return json.loads(result["output"]) except json.JSONDecodeError: return None return None # ── 엔드포인트 ─────────────────────────────────────────────────────────────── @router.get("/clusters") async def list_clusters( db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): rows = await db.execute( select(K8sCluster).where(K8sCluster.tenant_id == user.tenant_id) ) clusters = rows.scalars().all() return [ { "id": c.id, "name": c.name, "description": c.description, "namespace": c.namespace, "ssh_server_id": c.ssh_server_id, "is_active": c.is_active, "created_at": c.created_at, } for c in clusters ] @router.post("/clusters") async def create_cluster( req: ClusterCreate, db: AsyncSession = Depends(get_db), user: User = Depends(require_admin_role), ): """클러스터 등록. SSH 서버 존재 여부 확인 후 연결 테스트.""" srv_row = await db.execute(select(Server).where(Server.id == req.ssh_server_id)) server = srv_row.scalar_one_or_none() if not server: raise HTTPException(404, "SSH 서버를 찾을 수 없습니다") # 연결 테스트 result = await _kubectl(server, "version --client --short", req.kubeconfig_path) if not result["ok"]: raise HTTPException(400, f"kubectl 연결 실패: {result.get('error', '')}") cluster = K8sCluster( tenant_id=user.tenant_id, name=req.name, description=req.description, ssh_server_id=req.ssh_server_id, namespace=req.namespace, kubeconfig_path=req.kubeconfig_path, is_active=True, created_at=datetime.utcnow(), ) db.add(cluster) await db.commit() await db.refresh(cluster) return {"ok": True, "id": cluster.id, "kubectl_version": result["output"][:100]} @router.delete("/clusters/{cluster_id}") async def delete_cluster( cluster_id: int, db: AsyncSession = Depends(get_db), user: User = Depends(require_admin_role), ): row = await db.execute( select(K8sCluster).where(K8sCluster.id == cluster_id, K8sCluster.tenant_id == user.tenant_id) ) cluster = row.scalar_one_or_none() if not cluster: raise HTTPException(404, "클러스터를 찾을 수 없습니다") await db.delete(cluster) await db.commit() return {"ok": True} async def _get_cluster_server(cluster_id: int, tenant_id: int, db: AsyncSession): row = await db.execute( select(K8sCluster).where(K8sCluster.id == cluster_id, K8sCluster.tenant_id == tenant_id) ) cluster = row.scalar_one_or_none() if not cluster: raise HTTPException(404, "클러스터를 찾을 수 없습니다") srv_row = await db.execute(select(Server).where(Server.id == cluster.ssh_server_id)) server = srv_row.scalar_one_or_none() if not server: raise HTTPException(404, "SSH 서버를 찾을 수 없습니다") return cluster, server @router.get("/clusters/{cluster_id}/nodes") async def list_nodes( cluster_id: int, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """노드 목록 + 상태 (Ready/NotReady/SchedulingDisabled).""" cluster, server = await _get_cluster_server(cluster_id, user.tenant_id, db) data = await _kubectl_json(server, "get nodes", cluster.kubeconfig_path) if not data: raise HTTPException(500, "노드 목록 조회 실패") nodes = [] for item in data.get("items", []): name = item["metadata"]["name"] conditions = item["status"].get("conditions", []) ready = next((c for c in conditions if c["type"] == "Ready"), {}) capacity = item["status"].get("capacity", {}) allocatable = item["status"].get("allocatable", {}) nodes.append({ "name": name, "status": "Ready" if ready.get("status") == "True" else "NotReady", "roles": ",".join( k.split("/")[-1] for k in item["metadata"].get("labels", {}) if "node-role.kubernetes.io" in k ) or "worker", "age": item["metadata"].get("creationTimestamp", ""), "version": item["status"].get("nodeInfo", {}).get("kubeletVersion", ""), "cpu_capacity": capacity.get("cpu", "?"), "memory_capacity": capacity.get("memory", "?"), "cpu_allocatable": allocatable.get("cpu", "?"), "memory_allocatable": allocatable.get("memory", "?"), }) return {"cluster": cluster.name, "nodes": nodes, "count": len(nodes)} @router.get("/clusters/{cluster_id}/pods") async def list_pods( cluster_id: int, namespace: str = "default", db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """Pod 목록 (네임스페이스별, Running/Pending/Error 상태 포함).""" cluster, server = await _get_cluster_server(cluster_id, user.tenant_id, db) ns_flag = f"-n {namespace}" if namespace != "all" else "--all-namespaces" data = await _kubectl_json(server, f"get pods {ns_flag}", cluster.kubeconfig_path) if not data: raise HTTPException(500, "Pod 목록 조회 실패") pods = [] for item in data.get("items", []): meta = item["metadata"] status = item["status"] containers = status.get("containerStatuses", []) ready_count = sum(1 for c in containers if c.get("ready")) pods.append({ "name": meta["name"], "namespace": meta.get("namespace", namespace), "status": status.get("phase", "Unknown"), "ready": f"{ready_count}/{len(containers)}", "restarts": sum(c.get("restartCount", 0) for c in containers), "age": meta.get("creationTimestamp", ""), "node": item["spec"].get("nodeName", ""), "image": containers[0].get("image", "") if containers else "", }) return {"cluster": cluster.name, "namespace": namespace, "pods": pods, "count": len(pods)} @router.get("/clusters/{cluster_id}/deploys") async def list_deployments( cluster_id: int, namespace: str = "default", db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """Deployment 목록 + 롤아웃 상태.""" cluster, server = await _get_cluster_server(cluster_id, user.tenant_id, db) data = await _kubectl_json(server, f"get deployments -n {namespace}", cluster.kubeconfig_path) if not data: raise HTTPException(500, "Deployment 목록 조회 실패") deploys = [] for item in data.get("items", []): meta = item["metadata"] spec = item["spec"] status = item["status"] deploys.append({ "name": meta["name"], "namespace": meta.get("namespace", namespace), "desired": spec.get("replicas", 0), "ready": status.get("readyReplicas", 0), "available": status.get("availableReplicas", 0), "updated": status.get("updatedReplicas", 0), "age": meta.get("creationTimestamp", ""), }) return {"cluster": cluster.name, "deployments": deploys, "count": len(deploys)} @router.post("/clusters/{cluster_id}/rollout") async def rollout_deployment( cluster_id: int, req: RolloutRequest, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """Deployment 롤링 재시작 또는 이미지 업데이트.""" cluster, server = await _get_cluster_server(cluster_id, user.tenant_id, db) if req.image: # 이미지 업데이트 containers_cmd = f"get deployment {req.deployment} -n {req.namespace} -o jsonpath='{{.spec.template.spec.containers[0].name}}'" container_result = await _kubectl(server, containers_cmd, cluster.kubeconfig_path) container_name = container_result.get("output", "app").strip() cmd = f"set image deployment/{req.deployment} {container_name}={req.image} -n {req.namespace}" else: # 재시작만 cmd = f"rollout restart deployment/{req.deployment} -n {req.namespace}" result = await _kubectl(server, cmd, cluster.kubeconfig_path) if not result["ok"]: raise HTTPException(500, f"롤아웃 실패: {result.get('error', '')}") return { "ok": True, "deployment": req.deployment, "namespace": req.namespace, "action": "image_update" if req.image else "restart", "output": result["output"][:200], } @router.get("/clusters/{cluster_id}/events") async def cluster_events( cluster_id: int, namespace: str = "default", level: str = "Warning", db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """클러스터 이벤트 (Warning 이상 필터링).""" cluster, server = await _get_cluster_server(cluster_id, user.tenant_id, db) data = await _kubectl_json(server, f"get events -n {namespace}", cluster.kubeconfig_path) if not data: return {"events": []} events = [] for item in data.get("items", []): if level == "Warning" and item.get("type") != "Warning": continue events.append({ "type": item.get("type", "Normal"), "reason": item.get("reason", ""), "message": item.get("message", "")[:200], "object": f"{item['involvedObject'].get('kind','')}/{item['involvedObject'].get('name','')}", "count": item.get("count", 1), "first_time": item.get("firstTimestamp", ""), "last_time": item.get("lastTimestamp", ""), }) events.sort(key=lambda x: x.get("last_time", ""), reverse=True) return {"cluster": cluster.name, "namespace": namespace, "events": events[:50]} @router.post("/clusters/{cluster_id}/sr") async def create_sr_from_k8s( cluster_id: int, background_tasks: BackgroundTasks, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """K8s Warning 이벤트 → SR 자동 생성.""" cluster, server = await _get_cluster_server(cluster_id, user.tenant_id, db) events_data = await cluster_events(cluster_id, "default", "Warning", db, user) warnings = events_data.get("events", []) created = [] for evt in warnings[:5]: # 최대 5개 SR 생성 sr = SRRequest( title=f"[K8s] {evt['reason']}: {evt['object']}", description=f"클러스터: {cluster.name}\n이벤트: {evt['message']}\n발생횟수: {evt['count']}", category="MONITORING", priority="HIGH" if evt["count"] > 5 else "MEDIUM", status=SRStatus.OPEN, created_at=datetime.utcnow(), ) db.add(sr) await db.commit() await db.refresh(sr) created.append(sr.id) return {"ok": True, "sr_created": created, "count": len(created)}