zioinfo-mail/docs/deployment_engine.md
DESKTOP-TKLFCPR\ython 45f96176a6 Initial commit: GUARDiA project setup
- CLAUDE.md: project context and architecture spec
- docs/: system specs, DB schema, messenger integration, deployment engine
- skills/: guardia-deploy, guardia-agent, guardia-messenger
- .claude/settings.json: project-level permissions
- .gitignore: Python/FastAPI project

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-24 18:50:19 +09:00

7.5 KiB

[Specification] 에이전트리스 배포 엔진 (Agentless Deployment Engine)


1. 개요

대상 서버에 어떤 소프트웨어도 설치하지 않고, 표준 SSH/SFTP 프로토콜만으로
파일 배포 · 서비스 재기동 · 헬스체크 · 자동 롤백을 수행하는 핵심 실행 엔진.


2. SSH Executor 모듈 (src/deploy/ssh_executor.py)

import paramiko
import hashlib

class SSHExecutor:
    """Thin wrapper around a paramiko SSH connection.

    The connection is opened in the constructor. The instance offers remote
    command execution and SFTP upload; call close() when finished.
    """

    def __init__(self, host: str, user: str, password: str, port: int = 22):
        """Open an SSH connection to ``host`` using password authentication.

        Args:
            host: Hostname or IP address of the target server.
            user: SSH login name.
            password: SSH password.
            port: SSH port (default 22).
        """
        self.client = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy silently trusts unknown host keys, which
        # leaves the connection open to man-in-the-middle attacks. Consider
        # loading a managed known_hosts file and using RejectPolicy instead.
        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.client.connect(host, port=port, username=user, password=password, timeout=10)

    def execute_command(self, command: str, timeout: int = 300) -> dict:
        """Run ``command`` remotely and return its exit code and output.

        Callers must always check ``exit_code``; a non-zero exit does not raise.

        Args:
            command: Shell command line to run on the remote host.
            timeout: Channel timeout in seconds passed to paramiko.

        Returns:
            A dict with keys ``exit_code`` (int), ``stdout`` (str) and
            ``stderr`` (str). Output is decoded as UTF-8 with undecodable
            bytes replaced.
        """
        _stdin, stdout, stderr = self.client.exec_command(command, timeout=timeout)
        # Drain the output streams BEFORE asking for the exit status: waiting
        # for the exit status first can hang when the remote output is larger
        # than the channel window.
        out_bytes = stdout.read()
        err_bytes = stderr.read()
        exit_code = stdout.channel.recv_exit_status()
        return {
            "exit_code": exit_code,
            "stdout": out_bytes.decode("utf-8", errors="replace"),
            "stderr": err_bytes.decode("utf-8", errors="replace"),
        }

    def upload_file(self, local_path: str, remote_path: str) -> str:
        """Upload a file over SFTP and return the MD5 of the remote copy.

        Args:
            local_path: Path of the file on the local machine.
            remote_path: Destination path on the remote host.

        Returns:
            The hex MD5 digest reported by ``md5sum`` on the remote host, for
            the caller to compare against the local digest.

        Raises:
            RuntimeError: If the remote ``md5sum`` fails or prints nothing.
        """
        import shlex

        sftp = self.client.open_sftp()
        try:
            sftp.put(local_path, remote_path)
        finally:
            # Always release the SFTP channel, even when the transfer fails.
            sftp.close()

        # Quote the path so spaces or shell metacharacters cannot alter the command.
        result = self.execute_command(f"md5sum {shlex.quote(remote_path)}")
        fields = result["stdout"].split()
        if result["exit_code"] != 0 or not fields:
            raise RuntimeError(
                f"md5sum failed for {remote_path}: {result['stderr'].strip()}"
            )
        return fields[0]

    def close(self):
        """Close the underlying SSH connection."""
        self.client.close()

3. 롤링 배포 엔진 (src/deploy/rolling_deployer.py)

import hashlib
import os
import shlex
import time

import requests

from .ssh_executor import SSHExecutor
from ..db.audit import log_execution

class RollingDeployer:
    """Deploy one artifact to a list of nodes, one node at a time, over SSH.

    For each node the deployer runs a disk pre-check, takes a timestamped
    backup, uploads the artifact with MD5 verification, optionally restarts
    the service, and polls an HTTP health endpoint. Any failure after the
    backup exists triggers a rollback from that backup.
    """

    HEALTH_TIMEOUT = 30  # maximum time to wait for a healthy response (seconds)

    def deploy(self, nodes: list, artifact_path: str, sr_id: str) -> bool:
        """Deploy to ``nodes`` sequentially, stopping at the first failure.

        Args:
            nodes: Node dicts; the keys read are ``ip``, ``user``,
                ``password``, ``ssh_port``, ``deploy_path`` and optionally
                ``requires_restart`` and ``health_port``.
            artifact_path: Local path of the artifact to upload.
            sr_id: Identifier passed to the audit log for every node.

        Returns:
            True when every node deployed successfully (also for an empty
            list), False as soon as one node fails. Remaining nodes are not
            touched after a failure.
        """
        for node in nodes:
            if not self._deploy_single_node(node, artifact_path, sr_id):
                return False
        return True

    def _deploy_single_node(self, node: dict, artifact_path: str, sr_id: str) -> bool:
        """Run the full deployment sequence on one node.

        Returns:
            True on success. False on any failure, including a failed SSH
            connection; the failure is written to the audit log and, if a
            backup had already been taken, the node is rolled back.
        """
        # Both stay None until the corresponding step succeeds, so the error
        # path can tell what actually needs cleaning up.
        executor = None
        ts = None
        try:
            executor = SSHExecutor(node["ip"], node["user"], node["password"], node["ssh_port"])
            deploy_path = node["deploy_path"]
            quoted_deploy_path = shlex.quote(deploy_path)

            # 1. Pre-check: disk usage percentage of the filesystem holding /app.
            # NOTE(review): the check is hard-coded to /app rather than deploy_path.
            result = executor.execute_command("df -h /app | awk 'NR==2{print $5}' | sed 's/%//'")
            usage = result["stdout"].strip()
            if not usage.isdigit():
                raise Exception(
                    f"disk usage check returned unexpected output: {usage!r} "
                    f"({result['stderr'].strip()})"
                )
            if int(usage) > 90:
                raise Exception("디스크 사용량 90% 초과 — 배포 중단")

            # 2. Backup (timestamped copy next to the deploy directory).
            backup_ts = time.strftime("%Y%m%d_%H%M%S")
            quoted_backup_path = shlex.quote(f"{deploy_path}.bak_{backup_ts}")
            result = executor.execute_command(f"cp -rp {quoted_deploy_path} {quoted_backup_path}")
            if result["exit_code"] != 0:
                raise Exception(f"backup failed: {result['stderr'].strip()}")
            # Only now is there something a rollback could restore.
            ts = backup_ts

            # 3. SFTP transfer + integrity verification.
            local_md5 = self._local_md5(artifact_path)
            remote_path = f"{deploy_path}/{os.path.basename(artifact_path)}"
            remote_md5 = executor.upload_file(artifact_path, remote_path)
            if local_md5 != remote_md5:
                raise Exception(f"MD5 불일치: {local_md5} vs {remote_md5}")

            # 4. Restart the application server (needed for dynamic resources).
            # No extra copy step: the upload above already placed the artifact
            # at its final location under deploy_path.
            if node.get("requires_restart", True):
                # Shutdown is best-effort; its exit code is deliberately not checked.
                executor.execute_command("sh /app/scripts/shutdown.sh")
                result = executor.execute_command("sh /app/scripts/startup.sh")
                if result["exit_code"] != 0:
                    raise Exception(f"startup.sh 실패: {result['stderr']}")

            # 5. Health check.
            if not self._health_check(node["ip"], node.get("health_port", 8080)):
                raise Exception("헬스체크 실패 — 롤백 시작")

            log_execution(sr_id, node["ip"], "DEPLOY_SUCCESS", 0, "")
            return True

        except Exception as e:
            # Roll back only if a backup exists; a failure before the backup
            # step has changed nothing on the node.
            if executor is not None and ts is not None:
                self._rollback(executor, node, ts)
            log_execution(sr_id, node["ip"], "DEPLOY_FAILED", 1, str(e))
            return False
        finally:
            if executor is not None:
                executor.close()

    def _health_check(self, ip: str, port: int) -> bool:
        """Poll ``http://ip:port`` until it answers 200 or HEALTH_TIMEOUT elapses.

        Returns:
            True as soon as a request returns HTTP 200, False if the deadline
            passes first. Requests are retried every 2 seconds.
        """
        deadline = time.time() + self.HEALTH_TIMEOUT
        while time.time() < deadline:
            try:
                r = requests.get(f"http://{ip}:{port}", timeout=3)
                if r.status_code == 200:
                    return True
            except requests.RequestException:
                # Service not up yet (refused / timed out); keep polling.
                pass
            time.sleep(2)
        return False

    def _rollback(self, executor: "SSHExecutor", node: dict, ts: str):
        """Restore ``deploy_path`` from the backup taken at ``ts`` and restart.

        Best-effort: errors are printed, never raised, so the caller can still
        record the original deployment failure. Files that exist only in the
        failed deployment (not in the backup) are left in place.
        """
        try:
            deploy_path = node["deploy_path"]
            quoted_backup_path = shlex.quote(f"{deploy_path}.bak_{ts}")
            quoted_deploy_path = shlex.quote(deploy_path)

            executor.execute_command("sh /app/scripts/shutdown.sh")
            # Copy the backup's CONTENTS over deploy_path. Copying the backup
            # directory itself onto an existing directory would nest it inside
            # deploy_path instead of restoring the files.
            result = executor.execute_command(
                f"cp -rp {quoted_backup_path}/. {quoted_deploy_path}/"
            )
            if result["exit_code"] != 0:
                print(f"[ROLLBACK ERROR] restore failed: {result['stderr']}")
            result = executor.execute_command("sh /app/scripts/startup.sh")
            if result["exit_code"] != 0:
                print(f"[ROLLBACK ERROR] startup failed: {result['stderr']}")
        except Exception as e:
            print(f"[ROLLBACK ERROR] {e}")

    @staticmethod
    def _local_md5(path: str) -> str:
        """Return the hex MD5 digest of the local file at ``path``.

        The file is hashed in 1 MiB chunks so large artifacts are not loaded
        into memory at once.
        """
        digest = hashlib.md5()
        with open(path, "rb") as f:
            for chunk in iter(lambda: f.read(1024 * 1024), b""):
                digest.update(chunk)
        return digest.hexdigest()

4. Command Sanitizer (보안 필터)

# Regular expressions (matched case-insensitively) for commands that must
# never be sent to a server.
BLACKLIST_PATTERNS = [
    r"rm\s+-[rRf].*\/",      # rm with an r/R/f flag followed by a path, e.g. rm -rf /
    r"\bmkfs\b",
    r"\bformat\b",
    r"\bdrop\s+table\b",
    r"\btruncate\b",
    r">\s*/dev/sda",
]

def sanitize_command(command: str) -> None:
    """Reject ``command`` if it matches any blacklisted pattern.

    Patterns are tried in list order and matching ignores case. Returns None
    when nothing matches; raises SecurityViolationError naming the first
    matching pattern otherwise.
    """
    import re

    offending = next(
        (rx for rx in BLACKLIST_PATTERNS if re.search(rx, command, re.IGNORECASE)),
        None,
    )
    if offending is not None:
        raise SecurityViolationError(f"금지 명령어 패턴 탐지: {offending}")

5. 티어드 롤링 배포 (1,000+ 사이트 대응)

def tiered_rolling_deployment(sites: list, playbook: dict) -> bool:
    """Deploy ``playbook`` to ``sites`` tier by tier, with a failure-rate gate.

    Sites are grouped by ``_categorize_by_tier`` and the tiers are processed
    in the order that mapping yields them. Within a tier, sites are handed to
    ``_deploy_batch`` in batches: one site at a time for "TIER1", 50 at a time
    for every other tier. After a whole tier has been processed, the
    deployment is aborted if more than 10% of its results are falsy.

    Intended tiering:
        Tier 1 (mission critical): sequential, strict health checks
        Tier 2 (standard sites): parallel in batches of 50
        Tier 3 (test/dev): fully parallel

    Returns:
        True if every tier passed the safety gate, False as soon as one tier
        exceeds the 10% failure rate (later tiers are not deployed).
    """
    tiers = _categorize_by_tier(sites)

    for tier_name, tier_sites in tiers.items():
        if not tier_sites:
            # Nothing to deploy in this tier; skipping also keeps the
            # failure-rate division below away from zero.
            continue

        # NOTE(review): the intended design says Tier 3 runs fully parallel,
        # but every non-TIER1 tier is batched by 50 here — confirm which is wanted.
        batch_size = 1 if tier_name == "TIER1" else 50
        results = []
        for start in range(0, len(tier_sites), batch_size):
            batch = tier_sites[start:start + batch_size]
            # Parallel execution (asyncio or ThreadPoolExecutor) happens inside _deploy_batch.
            results.extend(_deploy_batch(batch, playbook))

        # Safety gate: abort when the failure rate exceeds 10%. Sites were
        # submitted but no results came back -> fail closed (treat as 100%).
        if results:
            fail_rate = sum(1 for r in results if not r) / len(results)
        else:
            fail_rate = 1.0
        if fail_rate > 0.1:
            print(f"[ABORT] {tier_name} 실패율 {fail_rate:.0%} > 10% — 전체 배포 중단")
            return False

    return True

6. 배포 파이프라인 API 엔드포인트

@app.post("/api/deploy/trigger")
async def trigger_deployment(sr_id: str, approver_id: str):
    """Start the deployment engine for a service request after approval.

    Looks up the task, its server nodes and its staged artifacts, runs a
    rolling deployment of the first staged artifact, then updates the task
    status and sends the matching notification.

    Args:
        sr_id: Service-request identifier of the task to deploy.
        approver_id: Identifier of the approver.

    Returns:
        ``{"sr_id": ..., "success": bool}``. When no artifact is staged the
        response also carries an ``error`` message and nothing is deployed.
    """
    import asyncio

    # NOTE(review): approver_id is accepted but never verified here — confirm
    # that approval is enforced before this endpoint can be reached.
    task = db.get_task(sr_id)
    nodes = db.get_server_nodes(task["inst_id"], task["system_name"])
    artifacts = db.get_staged_artifacts(sr_id)

    if not artifacts:
        # Nothing to deploy; report it instead of failing on artifacts[0].
        return {
            "sr_id": sr_id,
            "success": False,
            "error": "no staged artifacts for this service request",
        }

    # NOTE(review): only the first staged artifact is deployed.
    deployer = RollingDeployer()
    # deploy() blocks on SSH I/O and sleeps, so run it in a worker thread to
    # keep the event loop free for other requests.
    loop = asyncio.get_running_loop()
    success = await loop.run_in_executor(
        None, deployer.deploy, nodes, artifacts[0], sr_id
    )

    if success:
        db.update_task_status(sr_id, "PENDING_PM_VALIDATION")
        notify_pm_for_hitl_review(sr_id)
    else:
        db.update_task_status(sr_id, "FAILED_ROLLBACK")
        notify_failure(sr_id)

    return {"sr_id": sr_id, "success": success}