라우터 (611개 엔드포인트, P1+P2 75개 신규): - kubernetes.py: K8s 에이전트리스 관리 (SSH kubectl) - sso_provider.py: SAML 2.0 / OIDC / OAuth2 통합 인증 - predictive_ops.py: SLA위반·SR급증·서버장애 예측 + Ollama 인사이트 - slack_connector.py: Slack Incoming Webhook + Slash Commands - white_label.py: 기관별 브랜딩 + CSS 변수 동적 생성 DB 모델 (5개 신규): tb_k8s_cluster, tb_sso_config, tb_sso_session, tb_slack_config, tb_tenant_branding 수정: K8sCluster ForeignKey tb_server → tb_server_info Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
370 lines
14 KiB
Python
370 lines
14 KiB
Python
"""
|
|
Kubernetes 클러스터 관리 — 에이전트리스 (SSH 경유 kubectl)
|
|
|
|
기존 SSH 인프라(routers/ssh.py, routers/infra_ext.py)를 재사용하여
|
|
대상 서버에 소프트웨어 설치 없이 kubectl 명령을 SSH 경유로 실행.
|
|
|
|
엔드포인트:
|
|
GET /api/k8s/clusters — 등록된 클러스터 목록
|
|
POST /api/k8s/clusters — 클러스터 등록
|
|
DELETE /api/k8s/clusters/{id} — 클러스터 삭제
|
|
GET /api/k8s/clusters/{id}/nodes — 노드 목록 + 상태
|
|
GET /api/k8s/clusters/{id}/pods — Pod 목록 (네임스페이스별)
|
|
GET /api/k8s/clusters/{id}/deploys — Deployment 목록
|
|
POST /api/k8s/clusters/{id}/rollout — Deployment 롤링 업데이트
|
|
GET /api/k8s/clusters/{id}/events — 클러스터 이벤트 (WARNING 필터)
|
|
GET /api/k8s/clusters/{id}/metrics — 노드 리소스 사용률
|
|
POST /api/k8s/clusters/{id}/sr — 이상 감지 → SR 자동 생성
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
from datetime import datetime
|
|
from typing import List, Optional
|
|
|
|
import paramiko
|
|
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException
|
|
from pydantic import BaseModel, Field
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from core.auth import get_current_user, require_admin_role
|
|
from database import get_db
|
|
from models import User, Server, SRRequest, SRStatus, K8sCluster # 신규 모델
|
|
|
|
logger = logging.getLogger(__name__)
|
|
router = APIRouter(prefix="/api/k8s", tags=["Kubernetes"])
|
|
|
|
|
|
# ── Pydantic 스키마 ──────────────────────────────────────────────────────────
|
|
|
|
class ClusterCreate(BaseModel):
    """Request body for registering a Kubernetes cluster reached through SSH."""

    # Display name of the cluster (at most 100 characters).
    name: str = Field(
        ...,
        max_length=100,
    )
    # Optional free-form description.
    description: Optional[str] = None
    # ID of the Server row used as the SSH hop (the master node).
    ssh_server_id: int = Field(
        ...,
        description="SSH 경유할 마스터 노드 서버 ID",
    )
    # Namespace stored with the cluster; defaults to "default".
    namespace: str = Field(
        "default",
        max_length=100,
    )
    # Location of the kubeconfig file on the master node.
    kubeconfig_path: str = Field(
        "/root/.kube/config",
        description="마스터 노드의 kubeconfig 경로",
    )
|
|
|
|
class RolloutRequest(BaseModel):
    """Request body for a rolling restart or an image update of a Deployment."""

    # Namespace that contains the Deployment.
    namespace: str = "default"

    # Name of the Deployment to act on.
    deployment: str

    # Image to roll out; None means "restart only, keep the current image".
    image: Optional[str] = None
|
|
|
|
|
|
# ── SSH 경유 kubectl 실행 ─────────────────────────────────────────────────────
|
|
|
|
async def _kubectl(server: Server, cmd: str, kubeconfig: str = "/root/.kube/config") -> dict:
    """
    Run a kubectl command on a remote host over SSH (agentless).

    Args:
        server: Server record; ``ip_addr``, ``ssh_user`` and ``os_pw_enc`` are read.
        cmd: Everything that follows ``kubectl`` on the command line. It is handed
            to the remote shell verbatim, so callers must quote untrusted parts.
        kubeconfig: Path of the kubeconfig file on the remote host.

    Returns:
        ``{"ok": True, "output": <text>}`` when kubectl exits with status 0.
        ``{"ok": False, "error": <text>, "output": <text>}`` when kubectl exits
        with a non-zero status, and ``{"ok": False, "error": <text>}`` when the
        SSH session itself fails. This function never raises.
    """
    import asyncio

    from core.crypto import decrypt_password  # decrypts the stored SSH password

    def _run_blocking() -> dict:
        """Open the SSH session, run the command and always close the client."""
        pw = decrypt_password(server.os_pw_enc)
        ssh = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts unknown host keys without
        # verification; consider a known_hosts based policy.
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            ssh.connect(
                server.ip_addr, username=server.ssh_user,
                password=pw, timeout=15,
            )
            # NOTE(review): kubeconfig is interpolated unquoted so that shell
            # expansions such as "~" keep working; it must come from a trusted source.
            full_cmd = f"KUBECONFIG={kubeconfig} kubectl {cmd} 2>&1"
            _, stdout, _ = ssh.exec_command(full_cmd, timeout=30)
            # Drain stdout before asking for the exit status to avoid blocking
            # on a full channel buffer.
            out = stdout.read().decode('utf-8', 'replace').strip()
            exit_code = stdout.channel.recv_exit_status()
        finally:
            # Close the connection on every path, including connect/exec errors.
            ssh.close()

        if exit_code != 0:
            # stderr is merged into stdout (2>&1), so `out` carries the message.
            return {
                "ok": False,
                "error": out or f"kubectl exited with status {exit_code}",
                "output": out,
            }
        return {"ok": True, "output": out}

    try:
        # paramiko is blocking; run it in the default executor so the event
        # loop keeps serving other requests.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, _run_blocking)
    except Exception as e:
        logger.error(f"kubectl 실행 실패: {e}")
        return {"ok": False, "error": str(e)}
|
|
|
|
|
|
async def _kubectl_json(server: Server, cmd: str, kubeconfig: str) -> Optional[dict]:
    """
    Run ``kubectl <cmd> -o json`` and return the decoded JSON document.

    Returns None when the command reports failure or when its output is not
    valid JSON.
    """
    outcome = await _kubectl(server, f"{cmd} -o json", kubeconfig)
    if not outcome["ok"]:
        return None

    try:
        parsed = json.loads(outcome["output"])
    except json.JSONDecodeError:
        return None
    return parsed
|
|
|
|
|
|
# ── 엔드포인트 ───────────────────────────────────────────────────────────────
|
|
|
|
@router.get("/clusters")
async def list_clusters(
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Return the clusters registered for the current user's tenant."""
    stmt = select(K8sCluster).where(K8sCluster.tenant_id == user.tenant_id)
    result = await db.execute(stmt)

    payload = []
    for cluster in result.scalars().all():
        payload.append({
            "id": cluster.id,
            "name": cluster.name,
            "description": cluster.description,
            "namespace": cluster.namespace,
            "ssh_server_id": cluster.ssh_server_id,
            "is_active": cluster.is_active,
            "created_at": cluster.created_at,
        })
    return payload
|
|
|
|
|
|
@router.post("/clusters")
async def create_cluster(
    req: ClusterCreate,
    db: AsyncSession = Depends(get_db),
    user: User = Depends(require_admin_role),
):
    """
    Register a cluster after checking that kubectl is reachable over SSH.

    Raises:
        HTTPException(404): the referenced SSH server does not exist.
        HTTPException(400): the kubectl connection test failed.

    Returns:
        ``{"ok": True, "id": <cluster id>, "kubectl_version": <text>}``.
    """
    # NOTE(review): the server lookup is not restricted to the caller's tenant;
    # confirm whether Server carries a tenant column that should be checked here.
    srv_row = await db.execute(select(Server).where(Server.id == req.ssh_server_id))
    server = srv_row.scalar_one_or_none()
    if not server:
        raise HTTPException(404, "SSH 서버를 찾을 수 없습니다")

    # Connection test. `--short` was removed in kubectl 1.28, so ask for JSON
    # instead; that form is accepted by old and new clients alike.
    result = await _kubectl(server, "version --client -o json", req.kubeconfig_path)
    if not result["ok"]:
        raise HTTPException(400, f"kubectl 연결 실패: {result.get('error', '')}")

    raw_version = result.get("output", "")
    try:
        kubectl_version = str(json.loads(raw_version)["clientVersion"]["gitVersion"])
    except (ValueError, KeyError, TypeError):
        # Output was not the expected JSON document; report the raw text.
        kubectl_version = raw_version

    cluster = K8sCluster(
        tenant_id=user.tenant_id,
        name=req.name,
        description=req.description,
        ssh_server_id=req.ssh_server_id,
        namespace=req.namespace,
        kubeconfig_path=req.kubeconfig_path,
        is_active=True,
        created_at=datetime.utcnow(),
    )
    db.add(cluster)
    await db.commit()
    await db.refresh(cluster)  # load the generated primary key
    return {"ok": True, "id": cluster.id, "kubectl_version": kubectl_version[:100]}
|
|
|
|
|
|
@router.delete("/clusters/{cluster_id}")
async def delete_cluster(
    cluster_id: int,
    db: AsyncSession = Depends(get_db),
    user: User = Depends(require_admin_role),
):
    """Delete one cluster of the caller's tenant; 404 when it does not exist."""
    stmt = select(K8sCluster).where(
        K8sCluster.id == cluster_id,
        K8sCluster.tenant_id == user.tenant_id,
    )
    target = (await db.execute(stmt)).scalar_one_or_none()
    if not target:
        raise HTTPException(404, "클러스터를 찾을 수 없습니다")

    await db.delete(target)
    await db.commit()
    return {"ok": True}
|
|
|
|
|
|
async def _get_cluster_server(cluster_id: int, tenant_id: int, db: AsyncSession):
    """
    Load a tenant's cluster together with the Server row used as its SSH hop.

    Returns:
        A ``(cluster, server)`` tuple.

    Raises:
        HTTPException(404): the cluster (for this tenant) or its server is missing.
    """
    cluster_stmt = select(K8sCluster).where(
        K8sCluster.id == cluster_id,
        K8sCluster.tenant_id == tenant_id,
    )
    cluster = (await db.execute(cluster_stmt)).scalar_one_or_none()
    if not cluster:
        raise HTTPException(404, "클러스터를 찾을 수 없습니다")

    server_stmt = select(Server).where(Server.id == cluster.ssh_server_id)
    server = (await db.execute(server_stmt)).scalar_one_or_none()
    if not server:
        raise HTTPException(404, "SSH 서버를 찾을 수 없습니다")

    return cluster, server
|
|
|
|
|
|
@router.get("/clusters/{cluster_id}/nodes")
async def list_nodes(
    cluster_id: int,
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """
    List the cluster's nodes with status (Ready / NotReady), roles and capacity.

    Raises HTTPException(500) when ``kubectl get nodes`` yields no usable JSON.
    """
    cluster, server = await _get_cluster_server(cluster_id, user.tenant_id, db)
    data = await _kubectl_json(server, "get nodes", cluster.kubeconfig_path)
    if not data:
        raise HTTPException(500, "노드 목록 조회 실패")

    nodes = []
    for item in data.get("items", []):
        node_name = item["metadata"]["name"]
        node_status = item["status"]

        # The first condition of type "Ready" decides the reported status.
        is_ready = False
        for cond in node_status.get("conditions", []):
            if cond["type"] == "Ready":
                is_ready = cond.get("status") == "True"
                break

        capacity = node_status.get("capacity", {})
        allocatable = node_status.get("allocatable", {})

        # Roles are taken from the suffix of node-role.kubernetes.io/* labels.
        role_names = [
            label.split("/")[-1]
            for label in item["metadata"].get("labels", {})
            if "node-role.kubernetes.io" in label
        ]

        nodes.append({
            "name": node_name,
            "status": "Ready" if is_ready else "NotReady",
            "roles": ",".join(role_names) or "worker",
            "age": item["metadata"].get("creationTimestamp", ""),
            "version": node_status.get("nodeInfo", {}).get("kubeletVersion", ""),
            "cpu_capacity": capacity.get("cpu", "?"),
            "memory_capacity": capacity.get("memory", "?"),
            "cpu_allocatable": allocatable.get("cpu", "?"),
            "memory_allocatable": allocatable.get("memory", "?"),
        })

    return {"cluster": cluster.name, "nodes": nodes, "count": len(nodes)}
|
|
|
|
|
|
@router.get("/clusters/{cluster_id}/pods")
async def list_pods(
    cluster_id: int,
    namespace: str = "default",
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """
    List pods of one namespace, or of every namespace when ``namespace == "all"``.

    Each entry carries phase, ready-container ratio, restart total, node and
    the image of the first container status.

    Raises HTTPException(500) when ``kubectl get pods`` yields no usable JSON.
    """
    import shlex

    cluster, server = await _get_cluster_server(cluster_id, user.tenant_id, db)

    # `namespace` is a caller-supplied query parameter that ends up in a remote
    # shell command line, so it must be quoted.
    if namespace != "all":
        ns_flag = f"-n {shlex.quote(namespace)}"
    else:
        ns_flag = "--all-namespaces"

    data = await _kubectl_json(server, f"get pods {ns_flag}", cluster.kubeconfig_path)
    if not data:
        raise HTTPException(500, "Pod 목록 조회 실패")

    pods = []
    for item in data.get("items", []):
        meta = item["metadata"]
        status = item["status"]
        containers = status.get("containerStatuses", [])
        ready_count = sum(1 for c in containers if c.get("ready"))
        pods.append({
            "name": meta["name"],
            "namespace": meta.get("namespace", namespace),
            "status": status.get("phase", "Unknown"),
            "ready": f"{ready_count}/{len(containers)}",
            "restarts": sum(c.get("restartCount", 0) for c in containers),
            "age": meta.get("creationTimestamp", ""),
            "node": item["spec"].get("nodeName", ""),
            "image": containers[0].get("image", "") if containers else "",
        })
    return {"cluster": cluster.name, "namespace": namespace, "pods": pods, "count": len(pods)}
|
|
|
|
|
|
@router.get("/clusters/{cluster_id}/deploys")
async def list_deployments(
    cluster_id: int,
    namespace: str = "default",
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """
    List the Deployments of a namespace with their replica counters.

    Raises HTTPException(500) when ``kubectl get deployments`` yields no usable JSON.
    """
    import shlex

    cluster, server = await _get_cluster_server(cluster_id, user.tenant_id, db)

    # `namespace` is a caller-supplied query parameter that ends up in a remote
    # shell command line, so it must be quoted.
    data = await _kubectl_json(
        server,
        f"get deployments -n {shlex.quote(namespace)}",
        cluster.kubeconfig_path,
    )
    if not data:
        raise HTTPException(500, "Deployment 목록 조회 실패")

    deploys = []
    for item in data.get("items", []):
        meta = item["metadata"]
        spec = item["spec"]
        status = item["status"]
        deploys.append({
            "name": meta["name"],
            "namespace": meta.get("namespace", namespace),
            "desired": spec.get("replicas", 0),
            "ready": status.get("readyReplicas", 0),
            "available": status.get("availableReplicas", 0),
            "updated": status.get("updatedReplicas", 0),
            "age": meta.get("creationTimestamp", ""),
        })
    return {"cluster": cluster.name, "deployments": deploys, "count": len(deploys)}
|
|
|
|
|
|
@router.post("/clusters/{cluster_id}/rollout")
async def rollout_deployment(
    cluster_id: int,
    req: RolloutRequest,
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """
    Restart a Deployment, or update the image of its first container.

    With ``req.image`` set, the name of the first container is looked up and
    ``kubectl set image`` is issued; otherwise ``kubectl rollout restart`` runs.

    Raises:
        HTTPException(500): the container lookup or the rollout command failed.
    """
    import re
    import shlex

    # NOTE(review): this mutating endpoint only requires an authenticated user,
    # while cluster create/delete require the admin role — confirm this is intended.
    cluster, server = await _get_cluster_server(cluster_id, user.tenant_id, db)

    # Every request field ends up in a remote shell command line; quote them all.
    ns = shlex.quote(req.namespace)
    target = shlex.quote(f"deployment/{req.deployment}")

    if req.image:
        # Image update: find the first container's name, then set its image.
        containers_cmd = (
            f"get deployment {shlex.quote(req.deployment)} -n {ns} "
            "-o jsonpath='{.spec.template.spec.containers[0].name}'"
        )
        container_result = await _kubectl(server, containers_cmd, cluster.kubeconfig_path)
        container_name = (container_result.get("output") or "").strip()
        # Container names are DNS labels; anything else is kubectl error text
        # (stderr is merged into stdout), not a usable name.
        looks_valid = re.fullmatch(r"[a-z0-9]([-a-z0-9]*[a-z0-9])?", container_name)
        if not container_result["ok"] or not looks_valid:
            detail = container_result.get("error") or container_name
            raise HTTPException(500, f"롤아웃 실패: {detail[:200]}")
        assignment = shlex.quote(f"{container_name}={req.image}")
        cmd = f"set image {target} {assignment} -n {ns}"
    else:
        # Restart only.
        cmd = f"rollout restart {target} -n {ns}"

    result = await _kubectl(server, cmd, cluster.kubeconfig_path)
    if not result["ok"]:
        raise HTTPException(500, f"롤아웃 실패: {result.get('error', '')}")

    return {
        "ok": True,
        "deployment": req.deployment,
        "namespace": req.namespace,
        "action": "image_update" if req.image else "restart",
        "output": result["output"][:200],
    }
|
|
|
|
|
|
@router.get("/clusters/{cluster_id}/events")
async def cluster_events(
    cluster_id: int,
    namespace: str = "default",
    level: str = "Warning",
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """
    Return up to 50 events of a namespace, newest first.

    When ``level`` is ``"Warning"`` only events of type Warning are kept; any
    other value disables the filter and returns every event type.

    Returns:
        ``{"cluster": ..., "namespace": ..., "events": [...]}``; ``events`` is
        empty when kubectl yields no usable JSON.
    """
    import shlex

    cluster, server = await _get_cluster_server(cluster_id, user.tenant_id, db)

    # `namespace` is a caller-supplied query parameter that ends up in a remote
    # shell command line, so it must be quoted.
    data = await _kubectl_json(
        server,
        f"get events -n {shlex.quote(namespace)}",
        cluster.kubeconfig_path,
    )
    if not data:
        return {"cluster": cluster.name, "namespace": namespace, "events": []}

    events = []
    for item in data.get("items", []):
        if level == "Warning" and item.get("type") != "Warning":
            continue
        # Event fields may be present with a JSON null value, so `.get(key,
        # default)` is not enough; fall back explicitly.
        involved = item.get("involvedObject") or {}
        events.append({
            "type": item.get("type") or "Normal",
            "reason": item.get("reason") or "",
            "message": (item.get("message") or "")[:200],
            "object": f"{involved.get('kind','')}/{involved.get('name','')}",
            "count": item.get("count") or 1,
            "first_time": item.get("firstTimestamp") or "",
            # Events without lastTimestamp may carry eventTime instead.
            "last_time": item.get("lastTimestamp") or item.get("eventTime") or "",
        })

    # All last_time values are strings here, so the sort cannot mix None and str.
    events.sort(key=lambda evt: evt["last_time"], reverse=True)
    return {"cluster": cluster.name, "namespace": namespace, "events": events[:50]}
|
|
|
|
|
|
@router.post("/clusters/{cluster_id}/sr")
async def create_sr_from_k8s(
    cluster_id: int,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """
    Create service requests (SR) from the cluster's Warning events.

    At most five SRs are created, one per event, from the newest Warning
    events of the "default" namespace. Priority is HIGH when the event count
    exceeds 5, otherwise MEDIUM. All SRs are written in a single transaction.

    Returns:
        ``{"ok": True, "sr_created": [<sr id>, ...], "count": <n>}``.
    """
    # `background_tasks` is unused here but kept so the signature stays unchanged.
    cluster, _ = await _get_cluster_server(cluster_id, user.tenant_id, db)

    # NOTE(review): the namespace is fixed to "default" even though the cluster
    # stores its own namespace — confirm which one is intended.
    events_data = await cluster_events(cluster_id, "default", "Warning", db, user)
    warnings = events_data.get("events", [])[:5]  # create at most 5 SRs
    if not warnings:
        return {"ok": True, "sr_created": [], "count": 0}

    new_srs = [
        SRRequest(
            title=f"[K8s] {evt['reason']}: {evt['object']}",
            description=f"클러스터: {cluster.name}\n이벤트: {evt['message']}\n발생횟수: {evt['count']}",
            category="MONITORING",
            # A null/missing count is treated as 0 so the comparison cannot raise.
            priority="HIGH" if (evt.get("count") or 0) > 5 else "MEDIUM",
            status=SRStatus.OPEN,
            created_at=datetime.utcnow(),
        )
        for evt in warnings
    ]

    db.add_all(new_srs)
    # Flush so primary keys are assigned, read them, then commit once: either
    # every SR is stored or none is.
    await db.flush()
    created = [sr.id for sr in new_srs]
    await db.commit()

    return {"ok": True, "sr_created": created, "count": len(created)}
|