guardia-itsm/routers/infra_ext.py
DESKTOP-TKLFCPRython 6c85fba90f feat(itsm): 추가 기능 7개 + API 명세서 완성
[고객 셀프서비스 포털]
- routers/customer_portal.py: SR접수/추적/AI FAQ자가해결/카탈로그/만족도/통계
  POST /api/portal/faq/suggest — KB+Ollama 기반 SR 접수 전 자가해결 유도

[그룹웨어 전자결재 연동]
- routers/groupware.py: 카카오워크/네이버웍스/한컴/Custom 웹훅
  POST /api/groupware/send-approval → 결재 발송
  POST /api/groupware/callback → 승인/반려 콜백 → SR 상태 자동 갱신

[SIEM 보안 이벤트 연동]
- routers/siem.py: Elasticsearch/Splunk HEC/OpenSearch
  POST /api/siem/alert/receive → SIEM 경보 → 인시던트 자동 생성

[네트워크 토폴로지 시각화]
- routers/topology.py: CMDB CI 의존관계 D3.js 인터랙티브 그래프
  GET /api/topology/page — 드래그/줌/헬스오버레이 뷰어

[포트폴리오 + 리소스/인력 관리]
- routers/portfolio.py: 다중 프로젝트 포트폴리오 대시보드
  + 인원 배치(M/M) + 역량 매핑

[Zero Trust + Kubernetes + ERP]
- routers/infra_ext.py:
  - Zero Trust 세션 재검증 (위험점수 70 이상 → 강제 재인증)
  - K8s pods/services/nodes API 연동
  - ERP 예산 동기화

[API 명세서]
- manual/16_API_명세서.md: 전체 588개 라우트 도메인별 정리

[버그 수정]
- customer_portal.py: ServiceCatalog→ServiceItem, KBDocument.content→solution/symptoms
- customer_portal.py: catalog is_active→status="ACTIVE"

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-30 07:37:52 +09:00

320 lines
12 KiB
Python

"""
인프라 확장 모듈: Zero Trust + Kubernetes + ERP 예산
1. Zero Trust 세션 재검증 (지속 인증)
2. Kubernetes 파드/서비스 모니터링
3. ERP/예산 시스템 연동 (디지털예산회계/SAP)
환경변수:
# Zero Trust
ZERO_TRUST_INTERVAL_MIN = 30 (세션 재검증 주기, 분)
# Kubernetes
K8S_API_URL = https://kubernetes.default.svc
K8S_TOKEN = (ServiceAccount 토큰)
K8S_NAMESPACE= guardia
# ERP
ERP_TYPE = digital_budget|sap|custom
ERP_BASE_URL = http://erp.agency.go.kr
ERP_API_KEY = ...
"""
from __future__ import annotations
import logging
import os
from datetime import datetime
from typing import Optional
import httpx
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
from database import get_db
from models import SiProject, User
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/infra", tags=["infra_ext"])
# ── Zero Trust 설정 ───────────────────────────────────────────────────────────
ZT_INTERVAL = int(os.getenv("ZERO_TRUST_INTERVAL_MIN", "30"))
_session_registry: dict[str, dict] = {} # token → {last_verified, user, risk_score}
# ── Kubernetes 설정 ───────────────────────────────────────────────────────────
K8S_API_URL = os.getenv("K8S_API_URL", "")
K8S_TOKEN = os.getenv("K8S_TOKEN", "")
K8S_NAMESPACE = os.getenv("K8S_NAMESPACE", "guardia")
K8S_CA_CERT = os.getenv("K8S_CA_CERT", "")
# ── ERP 설정 ──────────────────────────────────────────────────────────────────
ERP_TYPE = os.getenv("ERP_TYPE", "")
ERP_BASE = os.getenv("ERP_BASE_URL", "")
ERP_API_KEY = os.getenv("ERP_API_KEY", "")
# ═══════════════════════════════════════════════════════════
# 1. ZERO TRUST 지속 인증
# ═══════════════════════════════════════════════════════════
class ZTVerifyRequest(BaseModel):
risk_factors: Optional[list] = [] # 비정상 패턴 목록
@router.post("/zero-trust/verify")
async def zero_trust_verify(
body: ZTVerifyRequest,
cu: User = Depends(get_current_user),
):
"""Zero Trust 세션 재검증 — 주기적으로 호출하여 세션 유효성 확인."""
now = datetime.utcnow()
username = cu.username
# 위험 점수 계산 (0=정상, 100=최고위험)
risk_score = 0
if body.risk_factors:
risk_score += len(body.risk_factors) * 15
if risk_score > 100:
risk_score = 100
# 세션 등록/갱신
_session_registry[username] = {
"last_verified": now.isoformat(),
"risk_score": risk_score,
"ip": "unknown", # 실제 구현 시 Request에서 추출
}
# 고위험 세션 → 강제 재인증 요구
if risk_score >= 70:
raise HTTPException(403, "고위험 세션으로 감지되었습니다. 재인증이 필요합니다.")
return {
"verified": True,
"risk_score": risk_score,
"next_verify_min":ZT_INTERVAL,
"message": "세션이 검증되었습니다." if risk_score < 30 else "세션이 검증되었습니다. (주의 수준)",
}
@router.get("/zero-trust/sessions")
async def zt_sessions(cu: User = Depends(get_current_user)):
"""활성 세션 목록 (ADMIN 전용)."""
if cu.role != "ADMIN":
raise HTTPException(403, "ADMIN만 세션 목록을 조회할 수 있습니다.")
return {
"total_sessions": len(_session_registry),
"interval_min": ZT_INTERVAL,
"sessions": [
{"username": u, **info}
for u, info in _session_registry.items()
],
}
# ═══════════════════════════════════════════════════════════
# 2. KUBERNETES 모니터링
# ═══════════════════════════════════════════════════════════
def _k8s_headers() -> dict:
h: dict = {"Accept": "application/json"}
if K8S_TOKEN:
h["Authorization"] = f"Bearer {K8S_TOKEN}"
return h
async def _k8s_get(path: str) -> Optional[dict]:
if not K8S_API_URL:
return None
try:
async with httpx.AsyncClient(timeout=10.0, verify=False) as c:
r = await c.get(f"{K8S_API_URL}{path}", headers=_k8s_headers())
return r.json() if r.status_code == 200 else None
except Exception as e:
logger.warning("K8s API 오류: %s", e)
return None
@router.get("/k8s/pods")
async def k8s_pods(
namespace: str = K8S_NAMESPACE,
_u: User = Depends(get_current_user),
):
"""Kubernetes 파드 목록 및 상태."""
if not K8S_API_URL:
return {"enabled": False, "message": "K8S_API_URL 미설정 — Kubernetes 연동 비활성화"}
data = await _k8s_get(f"/api/v1/namespaces/{namespace}/pods")
if not data:
raise HTTPException(503, "Kubernetes API 응답 없음")
pods = []
for item in data.get("items", []):
meta = item.get("metadata", {})
status = item.get("status", {})
spec = item.get("spec", {})
containers = status.get("containerStatuses", [])
ready_cnt = sum(1 for c in containers if c.get("ready", False))
total_cnt = len(containers)
pod_phase = status.get("phase", "Unknown")
pod_status = "Running" if pod_phase == "Running" and ready_cnt == total_cnt else pod_phase
pods.append({
"name": meta.get("name"),
"namespace": meta.get("namespace"),
"status": pod_status,
"phase": pod_phase,
"ready": f"{ready_cnt}/{total_cnt}",
"restart_count": sum(c.get("restartCount", 0) for c in containers),
"node": spec.get("nodeName"),
"created_at": meta.get("creationTimestamp"),
"labels": meta.get("labels", {}),
})
running = sum(1 for p in pods if p["status"] == "Running")
return {
"enabled": True,
"namespace": namespace,
"total": len(pods),
"running": running,
"not_ready": len(pods) - running,
"pods": pods,
}
@router.get("/k8s/services")
async def k8s_services(
namespace: str = K8S_NAMESPACE,
_u: User = Depends(get_current_user),
):
"""Kubernetes 서비스 목록."""
if not K8S_API_URL:
return {"enabled": False}
data = await _k8s_get(f"/api/v1/namespaces/{namespace}/services")
if not data:
raise HTTPException(503, "Kubernetes API 응답 없음")
services = [
{
"name": item["metadata"]["name"],
"type": item["spec"].get("type", "ClusterIP"),
"cluster_ip": item["spec"].get("clusterIP"),
"ports": item["spec"].get("ports", []),
"created_at": item["metadata"].get("creationTimestamp"),
}
for item in data.get("items", [])
]
return {"enabled": True, "namespace": namespace, "total": len(services), "services": services}
@router.get("/k8s/nodes")
async def k8s_nodes(_u: User = Depends(get_current_user)):
"""Kubernetes 노드 목록 및 리소스 사용량."""
if not K8S_API_URL:
return {"enabled": False}
data = await _k8s_get("/api/v1/nodes")
if not data:
raise HTTPException(503, "Kubernetes API 응답 없음")
nodes = []
for item in data.get("items", []):
meta = item.get("metadata", {})
conds = item.get("status", {}).get("conditions", [])
ready = next((c for c in conds if c["type"] == "Ready"), {})
cap = item.get("status", {}).get("capacity", {})
nodes.append({
"name": meta.get("name"),
"ready": ready.get("status") == "True",
"cpu": cap.get("cpu"),
"memory": cap.get("memory"),
"pods": cap.get("pods"),
"os": item.get("status", {}).get("nodeInfo", {}).get("osImage", ""),
"version": item.get("status", {}).get("nodeInfo", {}).get("kubeletVersion", ""),
})
return {"enabled": True, "total": len(nodes), "ready": sum(1 for n in nodes if n["ready"]), "nodes": nodes}
# ═══════════════════════════════════════════════════════════
# 3. ERP 예산 연동
# ═══════════════════════════════════════════════════════════
async def _erp_get(path: str) -> Optional[dict]:
if not ERP_BASE:
return None
try:
headers: dict = {"Accept": "application/json"}
if ERP_API_KEY:
headers["Authorization"] = f"Bearer {ERP_API_KEY}"
async with httpx.AsyncClient(timeout=15.0) as c:
r = await c.get(f"{ERP_BASE}{path}", headers=headers)
return r.json() if r.status_code == 200 else None
except Exception as e:
logger.warning("ERP API 오류: %s", e)
return None
@router.get("/erp/budget/{project_code}")
async def erp_budget(
project_code: str,
db: AsyncSession = Depends(get_db),
_u: User = Depends(get_current_user),
):
"""ERP 예산 데이터 조회 및 프로젝트와 동기화."""
if not ERP_BASE:
return {
"enabled": False,
"message": "ERP_BASE_URL 미설정",
"fallback": "GUARDiA 내부 예산 데이터 사용 중",
}
# ERP에서 예산 데이터 조회
erp_data = await _erp_get(f"/api/budget/{project_code}")
# GUARDiA 프로젝트 데이터
from sqlalchemy import select as sel
proj = (await db.execute(
sel(SiProject).where(SiProject.project_code == project_code)
)).scalars().first()
if not proj:
raise HTTPException(404, f"프로젝트 {project_code}를 찾을 수 없습니다.")
if erp_data:
# ERP 데이터로 프로젝트 예산 갱신
erp_total = erp_data.get("budget_total", 0)
erp_used = erp_data.get("budget_used", 0)
if erp_total and erp_total != proj.budget_total:
proj.budget_total = erp_total
proj.budget_used = erp_used
await db.commit()
return {
"enabled": True,
"erp_type": ERP_TYPE,
"project_code": project_code,
"guardia_total": proj.budget_total,
"guardia_used": proj.budget_used,
"erp_data": erp_data,
"synced": bool(erp_data),
"sync_time": datetime.utcnow().isoformat(),
}
@router.get("/erp/status")
async def erp_status(_u: User = Depends(get_current_user)):
"""ERP 연동 설정 현황."""
return {
"enabled": bool(ERP_BASE),
"erp_type": ERP_TYPE or "미설정",
"base_url": ERP_BASE[:40] + "..." if ERP_BASE else "",
"endpoints": {
"budget": "/api/infra/erp/budget/{project_code}",
}
}