""" SSH 실행 API — SM 스크립트 원격 실행 + 임시 명령어 실행. 보안: - ADMIN / PM / ENGINEER 만 접근 가능 - 명령어 안전성 검증 (core/ssh_exec.py) - 서버 IP/계정/비밀번호 응답에 미포함 - 모든 실행 감사 로그 기록 """ import glob import os from typing import List, Optional from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel from sqlalchemy.ext.asyncio import AsyncSession from core.auth import get_current_user from core.ssh_exec import exec_command, exec_script from database import get_db from models import User, UserRole router = APIRouter(prefix="/api/ssh", tags=["ssh"]) # SM 스크립트 루트 _SCRIPTS_ROOT = os.path.realpath( os.path.join(os.path.dirname(__file__), "..", "scripts", "sm") ) # ── 스키마 ──────────────────────────────────────────────────────────────────── class SSHExecRequest(BaseModel): server_name: str command: str timeout: int = 120 sr_id: Optional[str] = None class SSHScriptRequest(BaseModel): server_name: str script_key: str # 예: "tomcat", "system", "postgresql" env_vars: Optional[dict] = None timeout: int = 300 sr_id: Optional[str] = None class SSHResultOut(BaseModel): success: bool stdout: str stderr: str exit_code: int elapsed: float error: str # 허용된 역할 def _require_ops(current_user: User = Depends(get_current_user)) -> User: if current_user.role not in (UserRole.ADMIN, UserRole.PM, UserRole.ENGINEER): raise HTTPException(403, "SSH 실행 권한이 없습니다.") return current_user # SM 스크립트 키→경로 매핑 def _script_map() -> dict[str, str]: mapping: dict[str, str] = {} for sh in glob.glob(os.path.join(_SCRIPTS_ROOT, "**", "*.sh"), recursive=True): key = os.path.splitext(os.path.basename(sh))[0] # 단축 키 생성 (예: was_tomcat_sm → tomcat) short = key.replace("_sm", "").replace("web_", "").replace("was_", "") \ .replace("db_", "").replace("search_", "").replace("agent_", "") mapping[key] = sh mapping[short] = sh return mapping # ── 엔드포인트 ──────────────────────────────────────────────────────────────── @router.get("/scripts", summary="실행 가능한 SM 스크립트 목록") async def list_sm_scripts( _u: User = Depends(_require_ops), ): """SM 스크립트 목록 반환 (경로 비포함).""" scripts = [] for sh in sorted(glob.glob(os.path.join(_SCRIPTS_ROOT, "**", "*.sh"), recursive=True)): rel = os.path.relpath(sh, _SCRIPTS_ROOT) name = os.path.splitext(os.path.basename(sh))[0] category = rel.split(os.sep)[0] scripts.append({ "key": name, "category": category, "description": _script_description(name), }) return {"scripts": scripts, "total": len(scripts)} @router.post("/exec", response_model=SSHResultOut, summary="서버 SSH 명령 실행") async def ssh_exec( payload: SSHExecRequest, current_user: User = Depends(_require_ops), ): """ 지정 서버에 SSH 접속 후 임시 명령어 실행. 위험 패턴은 자동 차단됩니다. """ result = await exec_command( server_name=payload.server_name, command=payload.command, timeout=payload.timeout, actor=current_user.username, sr_id=payload.sr_id, ) return SSHResultOut(**result.to_dict()) @router.post("/script", response_model=SSHResultOut, summary="SM 스크립트 원격 실행") async def ssh_run_script( payload: SSHScriptRequest, current_user: User = Depends(_require_ops), ): """ 지정 서버에 SM 스크립트를 전송·실행하고 결과를 반환합니다. script_key 예시: system, tomcat, postgresql, ping, log_analysis """ sm_map = _script_map() script_path = sm_map.get(payload.script_key) if not script_path: raise HTTPException( 404, f"스크립트를 찾을 수 없습니다: {payload.script_key}. " f"GET /api/ssh/scripts 에서 목록 확인" ) result = await exec_script( server_name=payload.server_name, script_path=script_path, env_vars=payload.env_vars, timeout=payload.timeout, actor=current_user.username, sr_id=payload.sr_id, ) return SSHResultOut(**result.to_dict()) # ── 스크립트 설명 헬퍼 ──────────────────────────────────────────────────────── _DESC_MAP = { "system_health": "시스템 리소스 종합 점검 (CPU/메모리/디스크/inode/OOM)", "sm_full_check": "전체 SM 점검 오케스트레이터", "web_apache_sm": "Apache HTTP Server 점검", "web_nginx_sm": "Nginx 점검", "web_webtob_sm": "WebtoB (TMAX) 점검", "was_tomcat_sm": "Apache Tomcat WAS 점검", "was_jboss_sm": "JBoss / WildFly WAS 점검", "was_jeus_sm": "JEUS (TmaxSoft) WAS 점검", "was_weblogic_sm": "Oracle WebLogic Server 점검", "db_postgresql_sm": "PostgreSQL DB 점검", "db_oracle_sm": "Oracle DB 점검", "db_mysql_sm": "MySQL / MariaDB 점검", "db_tibero_sm": "Tibero (TmaxSoft) DB 점검", "esb_check": "ESB/MQ (ActiveMQ/IBM MQ/WSO2) 점검", "search_elasticsearch_sm": "Elasticsearch 점검", "search_solr_sm": "Apache Solr 점검", "agent_pinpoint_sm": "Pinpoint APM 점검", "agent_scouter_sm": "Scouter APM 점검", "crontab_sm": "Crontab 등록 현황 점검", "ping_test": "네트워크 연결 테스트 (ICMP/TCP)", "log_analysis": "서버 로그 분석 (ERROR/WARN/OOM 집계)", } def _script_description(name: str) -> str: return _DESC_MAP.get(name, f"{name} 점검 스크립트")