""" core/deploy_pipeline.py — 배포 파이프라인 오케스트레이터. 빌드 → 테스트 → 배포 → 헬스체크 → ITSM 콜백의 전체 파이프라인을 관리한다. 단계별 진행 상황은 tb_vibe_session.status 와 work_log에 기록된다. 실패 시 자동 롤백을 수행하고 SR 담당자에게 메신저 알림을 발송한다. 보안: - SSH 실행은 core/ssh_exec.py의 execute_ssh_command() 경유 (명령어 안전성 검증 포함) - 서버 자격증명(IP, SSH 계정) API 응답에 포함 금지 - 헬스체크 URL은 내부 네트워크 도메인만 허용 """ from __future__ import annotations import asyncio import logging import os from dataclasses import dataclass, field from datetime import datetime from enum import Enum from typing import Any, Optional import httpx logger = logging.getLogger(__name__) # ── 환경변수 ────────────────────────────────────────────────────────────────── _HEALTH_CHECK_RETRIES = int(os.getenv("DEPLOY_HEALTH_RETRIES", "10")) _HEALTH_CHECK_INTERVAL = float(os.getenv("DEPLOY_HEALTH_INTERVAL", "10")) # 초 _DEPLOY_TIMEOUT = int(os.getenv("DEPLOY_TIMEOUT", "1800")) # 30분 # ── 파이프라인 단계 ─────────────────────────────────────────────────────────── class PipelineStage(str, Enum): PRE_CHECK = "PRE_CHECK" BUILD = "BUILD" TEST = "TEST" BACKUP = "BACKUP" DEPLOY = "DEPLOY" RESTART = "RESTART" HEALTH_CHECK = "HEALTH_CHECK" NOTIFY = "NOTIFY" COMPLETED = "COMPLETED" FAILED = "FAILED" ROLLED_BACK = "ROLLED_BACK" @dataclass class StageResult: stage: PipelineStage success: bool message: str stdout: str = "" duration: float = 0.0 # 초 @dataclass class PipelineResult: session_id: int success: bool final_stage: PipelineStage stages: list[StageResult] = field(default_factory=list) rollback_done: bool = False error_msg: str = "" @property def summary(self) -> str: lines = [f"파이프라인 {'성공' if self.success else '실패'} — 세션 {self.session_id}"] for r in self.stages: icon = "✅" if r.success else "❌" lines.append(f" {icon} {r.stage.value}: {r.message} ({r.duration:.1f}s)") if self.rollback_done: lines.append(" ⏪ 자동 롤백 완료") return "\n".join(lines) @dataclass class PipelineStatus: session_id: int current_stage: PipelineStage completed_stages: list[str] started_at: Optional[datetime] is_running: bool # ── 
# ── DeployPipeline ───────────────────────────────────────────────────────────


class DeployPipeline:
    """
    Eight-stage deploy pipeline orchestrator.

    Stages run in order PRE_CHECK → BUILD → TEST → BACKUP → DEPLOY →
    RESTART → HEALTH_CHECK → NOTIFY. Runs in flight are tracked in memory per
    session id (see ``get_status``); durable state lives in tb_vibe_session.

    Example:
        pipeline = DeployPipeline()
        result = await pipeline.run(session_id=42)
        if not result.success:
            await pipeline.rollback(session_id=42)
    """

    def __init__(self) -> None:
        # Runs currently in flight, keyed by session id. run() removes its
        # entry on every exit path.
        self._running: dict[int, PipelineStatus] = {}

    # ── Pipeline execution ───────────────────────────────────────────────────

    async def run(self, session_id: int) -> PipelineResult:
        """
        Run the full deploy pipeline for one session.

        Per-stage failure handling:
          - pre-check / build / deploy failure → stop; session status FAILED
          - test failure → stop; session status FAILED_TEST; messenger notice
          - backup / restart failure → logged as a warning; pipeline continues
          - health check failing on every attempt → automatic rollback;
            session status FAILED_ROLLBACK; messenger notice
          - unexpected exception inside a stage → session status FAILED

        Args:
            session_id: tb_vibe_session.id

        Returns:
            PipelineResult carrying the per-stage results collected so far.

        Raises:
            Whatever the initial session lookup raises (e.g. database errors);
            errors inside stages are reported through the result instead.
        """
        from database import SessionLocal
        from models import VibeSession

        result = PipelineResult(
            session_id=session_id, success=False, final_stage=PipelineStage.PRE_CHECK
        )
        stages: list[StageResult] = []
        self._running[session_id] = PipelineStatus(
            session_id=session_id,
            current_stage=PipelineStage.PRE_CHECK,
            completed_stages=[],
            started_at=datetime.utcnow(),
            is_running=True,
        )

        # The session lookup sits inside the try/finally so that an unknown
        # session id does not leave a stale "is_running" entry in _running.
        try:
            async with SessionLocal() as db:
                session = await db.get(VibeSession, session_id)
                if not session:
                    result.error_msg = f"세션을 찾을 수 없습니다: {session_id}"
                    return result
                project = session.project

            await self._execute_stages(session_id, project, result, stages)
        finally:
            result.stages = stages
            status = self._running.pop(session_id, None)
            if status:
                status.is_running = False
        return result

    async def _execute_stages(
        self,
        session_id: int,
        project: Any,
        result: PipelineResult,
        stages: list[StageResult],
    ) -> None:
        """Run stages 1–8 in order, filling *result* and *stages* in place."""
        try:
            # 1. Pre-check
            sr = await self._pre_check(session_id, project)
            stages.append(sr)
            self._advance(session_id, PipelineStage.BUILD, sr)
            if not sr.success:
                await self._abort(session_id, result, "FAILED", sr.message)
                return

            # 2. Build
            sr = await self._build(session_id, project)
            stages.append(sr)
            self._advance(session_id, PipelineStage.TEST, sr)
            if not sr.success:
                await self._abort(session_id, result, "FAILED", sr.message)
                return
            # Record that the build finished.
            await self._update_session(session_id, "BUILDING", sr.message, mark_built=True)

            # 3. Test
            sr = await self._test(session_id, project)
            stages.append(sr)
            if not sr.success:
                await self._abort(session_id, result, "FAILED_TEST", sr.message)
                await self._notify(session_id, False, f"테스트 실패로 배포 중단\n{sr.message}")
                return
            await self._update_session(session_id, "TESTING", sr.message, mark_tested=True)
            self._advance(session_id, PipelineStage.BACKUP, sr)

            # 4. Backup — a failure is only a warning; deployment continues.
            sr = await self._backup(session_id, project)
            stages.append(sr)
            self._advance(session_id, PipelineStage.DEPLOY, sr)
            if not sr.success:
                logger.warning("배포 백업 실패 (계속 진행): session_id=%d msg=%s", session_id, sr.message)

            # 5. Deploy
            sr = await self._deploy(session_id, project)
            stages.append(sr)
            self._advance(session_id, PipelineStage.RESTART, sr)
            if not sr.success:
                # Persist as FAILED so error_msg is kept; an in-progress status
                # here would hide the failure and clear the message.
                await self._abort(session_id, result, "FAILED", sr.message)
                return
            await self._update_session(session_id, "DEPLOYING", sr.message)

            # 6. Service restart — a failure is only a warning; the health
            #    check decides whether to roll back.
            sr = await self._restart(session_id, project)
            stages.append(sr)
            self._advance(session_id, PipelineStage.HEALTH_CHECK, sr)
            if not sr.success:
                logger.warning("서비스 재시작 실패 (헬스체크 진행): %s", sr.message)

            # 7. Health check — failure triggers an automatic rollback.
            sr = await self._health_check(session_id, project)
            stages.append(sr)
            if not sr.success:
                logger.error("헬스체크 실패 — 자동 롤백 시작: session_id=%d", session_id)
                rolled_back = await self.rollback(session_id)
                result.rollback_done = rolled_back
                result.error_msg = f"헬스체크 실패 (자동 롤백: {'성공' if rolled_back else '실패'})"
                result.final_stage = PipelineStage.ROLLED_BACK if rolled_back else PipelineStage.FAILED
                await self._update_session(session_id, "FAILED_ROLLBACK", result.error_msg)
                await self._notify(session_id, False, result.error_msg)
                return
            self._advance(session_id, PipelineStage.NOTIFY, sr)

            # 8. Notify
            sr = await self._notify(session_id, True, "배포 성공")
            stages.append(sr)
            self._advance(session_id, PipelineStage.COMPLETED, sr)

            # Done
            result.success = True
            result.final_stage = PipelineStage.COMPLETED
            await self._update_session(session_id, "COMPLETED", "배포 완료", mark_deployed=True)
            logger.info("배포 파이프라인 완료: session_id=%d", session_id)

        except Exception as exc:
            result.error_msg = str(exc)[:500]
            result.final_stage = PipelineStage.FAILED
            await self._update_session(session_id, "FAILED", result.error_msg)
            logger.exception("배포 파이프라인 예외: session_id=%d", session_id)

    async def _abort(
        self, session_id: int, result: PipelineResult, status: str, message: str
    ) -> None:
        """Mark *result* failed and persist *status*/*message* on the session row."""
        result.error_msg = message
        result.final_stage = PipelineStage.FAILED
        await self._update_session(session_id, status, message)

    # ── Rollback ─────────────────────────────────────────────────────────────

    async def rollback(self, session_id: int) -> bool:
        """
        Restore the previous deployment from ``{deploy_path}.backup`` and restart.

        The backup directory's contents (dotfiles included) are copied over
        deploy_path on the project's server. Then was_restart_cmd runs, if it
        is configured.

        Returns:
            True only if the files were restored and, when configured, the
            restart command exited 0. Returns False when the session, project,
            deploy_path or server_id is missing, or when any remote step
            fails. Exceptions from the remote steps are logged and reported
            as False.
        """
        import shlex

        from database import SessionLocal
        from models import VibeSession

        logger.info("배포 롤백 시작: session_id=%d", session_id)
        async with SessionLocal() as db:
            session = await db.get(VibeSession, session_id)
            if not session or not session.project:
                return False
            project = session.project

        try:
            deploy_path = getattr(project, "deploy_path", "")
            restart_cmd = getattr(project, "was_restart_cmd", "")
            if not deploy_path:
                logger.error("롤백 실패: deploy_path 미설정 session_id=%d", session_id)
                return False

            server_id = getattr(project, "server_id", None)
            if not server_id:
                # Without a server nothing can be restored; never report success.
                logger.error("롤백 실패: server_id 미설정 session_id=%d", session_id)
                return False

            from core.ssh_exec import execute_ssh_command

            backup_dir = shlex.quote(f"{deploy_path}.backup")
            target_dir = shlex.quote(deploy_path)
            # "src/." copies the directory contents including dotfiles, which "src/*" skips.
            restore_cmd = f"cp -rf {backup_dir}/. {target_dir}/ 2>/dev/null && echo RESTORED"
            restored = await execute_ssh_command(server_id, restore_cmd, timeout=300)
            if restored.exit_code != 0:
                logger.error("백업 복원 실패: %s", (restored.stdout or "")[:200])
                return False

            if restart_cmd:
                restarted = await execute_ssh_command(server_id, restart_cmd, timeout=120)
                if restarted.exit_code != 0:
                    # Files are back but the service did not restart cleanly.
                    logger.error(
                        "롤백 후 서비스 재시작 실패: session_id=%d rc=%s",
                        session_id, restarted.exit_code,
                    )
                    return False

            logger.info("롤백 완료: session_id=%d", session_id)
            return True
        except Exception as exc:
            logger.exception("롤백 예외: session_id=%d err=%s", session_id, exc)
            return False

    # ── Status lookup ────────────────────────────────────────────────────────

    async def get_status(self, session_id: int) -> PipelineStatus:
        """
        Return the current pipeline stage for *session_id*.

        In-flight runs are answered from memory. Otherwise the persisted
        tb_vibe_session.status is mapped to a PipelineStage. A missing session
        or an empty status is reported as COMPLETED.
        """
        live = self._running.get(session_id)
        if live is not None:
            return live

        # Not running in this process: fall back to the DB.
        from database import SessionLocal
        from models import VibeSession

        async with SessionLocal() as db:
            session = await db.get(VibeSession, session_id)
            raw_status = session.status if session else None

        return PipelineStatus(
            session_id=session_id,
            current_stage=_stage_from_session_status(raw_status),
            completed_stages=[],
            started_at=None,
            is_running=False,
        )

    # ── Stage implementations ────────────────────────────────────────────────

    async def _pre_check(self, session_id: int, project: Any) -> StageResult:
        """
        Check that the server is reachable and root-disk usage is below 90%.

        Fails when server_id is missing, when the remote command exits
        non-zero, or when it produces no output. Either of the last two means
        the check could not actually be performed.
        """
        start = datetime.utcnow()
        try:
            from core.ssh_exec import execute_ssh_command

            server_id = getattr(project, "server_id", None)
            if not server_id:
                return StageResult(PipelineStage.PRE_CHECK, False, "server_id 미설정", duration=_elapsed(start))

            result = await execute_ssh_command(
                server_id,
                "df -h / | tail -1 | awk '{print $5}' | sed 's/%//'",
                timeout=30,
            )
            output = (result.stdout or "").strip()
            if result.exit_code != 0 or not output:
                return StageResult(
                    PipelineStage.PRE_CHECK, False,
                    f"사전 점검 실패: 디스크 사용률 확인 불가 (rc={result.exit_code})",
                    duration=_elapsed(start),
                )
            disk_pct = int(output)
            if disk_pct >= 90:
                return StageResult(
                    PipelineStage.PRE_CHECK, False,
                    f"디스크 용량 부족: {disk_pct}% 사용 중",
                    duration=_elapsed(start),
                )
            return StageResult(PipelineStage.PRE_CHECK, True, f"사전 점검 통과 (디스크 {disk_pct}%)",
                               duration=_elapsed(start))
        except Exception as exc:
            return StageResult(PipelineStage.PRE_CHECK, False, f"사전 점검 오류: {exc}", duration=_elapsed(start))

    async def _build(self, session_id: int, project: Any) -> StageResult:
        """Run project.build_cmd over SSH. The stage is skipped (success) when unset."""
        start = datetime.utcnow()
        build_cmd = getattr(project, "build_cmd", None)
        if not build_cmd:
            return StageResult(PipelineStage.BUILD, True, "빌드 명령어 미설정 (생략)", duration=_elapsed(start))
        try:
            from core.ssh_exec import execute_ssh_command

            server_id = getattr(project, "server_id", None)
            result = await execute_ssh_command(server_id, build_cmd, timeout=_DEPLOY_TIMEOUT)
            ok = result.exit_code == 0
            return StageResult(
                PipelineStage.BUILD, ok,
                "빌드 성공" if ok else f"빌드 실패 (rc={result.exit_code})",
                stdout=(result.stdout or "")[-3000:],
                duration=_elapsed(start),
            )
        except Exception as exc:
            return StageResult(PipelineStage.BUILD, False, str(exc), duration=_elapsed(start))

    async def _test(self, session_id: int, project: Any) -> StageResult:
        """Run project.test_cmd over SSH. The stage is skipped (success) when unset."""
        start = datetime.utcnow()
        test_cmd = getattr(project, "test_cmd", None)
        if not test_cmd:
            return StageResult(PipelineStage.TEST, True, "테스트 명령어 미설정 (생략)", duration=_elapsed(start))
        try:
            from core.ssh_exec import execute_ssh_command

            server_id = getattr(project, "server_id", None)
            result = await execute_ssh_command(server_id, test_cmd, timeout=_DEPLOY_TIMEOUT)
            ok = result.exit_code == 0
            return StageResult(
                PipelineStage.TEST, ok,
                "테스트 통과" if ok else f"테스트 실패 (rc={result.exit_code})",
                stdout=(result.stdout or "")[-3000:],
                duration=_elapsed(start),
            )
        except Exception as exc:
            return StageResult(PipelineStage.TEST, False, str(exc), duration=_elapsed(start))

    async def _backup(self, session_id: int, project: Any) -> StageResult:
        """
        Snapshot the current deployment into ``{deploy_path}.backup``.

        The directory contents (``src/.``) are copied into a backup directory
        created with ``mkdir -p``. ``cp -rf DIR DIR.backup`` must not be used:
        once DIR.backup exists, it nests the copy as DIR.backup/<name>, so the
        top level that rollback() restores from is never refreshed.

        NOTE: files deleted from deploy_path since an earlier backup are not
        pruned from the backup directory.
        """
        import shlex

        start = datetime.utcnow()
        deploy_path = getattr(project, "deploy_path", None)
        if not deploy_path:
            return StageResult(PipelineStage.BACKUP, True, "deploy_path 미설정 (백업 생략)", duration=_elapsed(start))
        try:
            from core.ssh_exec import execute_ssh_command

            server_id = getattr(project, "server_id", None)
            source_dir = shlex.quote(deploy_path)
            backup_dir = shlex.quote(f"{deploy_path}.backup")
            cmd = f"mkdir -p {backup_dir} && cp -rf {source_dir}/. {backup_dir}/ 2>/dev/null && echo OK"
            result = await execute_ssh_command(server_id, cmd, timeout=300)
            ok = "OK" in (result.stdout or "")
            return StageResult(PipelineStage.BACKUP, ok, "백업 완료" if ok else "백업 실패 (경고)",
                               duration=_elapsed(start))
        except Exception as exc:
            return StageResult(PipelineStage.BACKUP, False, f"백업 오류: {exc}", duration=_elapsed(start))

    async def _deploy(self, session_id: int, project: Any) -> StageResult:
        """
        Run project.deploy_cmd over SSH.

        When only deploy_path is configured, this stage merely checks that the
        path can be listed; no files are transferred here. The stage is
        skipped (success) when neither is set.
        """
        import shlex

        start = datetime.utcnow()
        deploy_path = getattr(project, "deploy_path", None)
        deploy_cmd = getattr(project, "deploy_cmd", None)
        if not deploy_path and not deploy_cmd:
            return StageResult(PipelineStage.DEPLOY, True, "배포 설정 미지정 (생략)", duration=_elapsed(start))
        try:
            from core.ssh_exec import execute_ssh_command

            server_id = getattr(project, "server_id", None)
            cmd = deploy_cmd or f"ls {shlex.quote(deploy_path)} > /dev/null && echo DEPLOYED"
            result = await execute_ssh_command(server_id, cmd, timeout=_DEPLOY_TIMEOUT)
            ok = result.exit_code == 0
            return StageResult(
                PipelineStage.DEPLOY, ok,
                "배포 완료" if ok else f"배포 실패 (rc={result.exit_code})",
                stdout=(result.stdout or "")[-2000:],
                duration=_elapsed(start),
            )
        except Exception as exc:
            return StageResult(PipelineStage.DEPLOY, False, str(exc), duration=_elapsed(start))

    async def _restart(self, session_id: int, project: Any) -> StageResult:
        """Run project.was_restart_cmd over SSH. The stage is skipped (success) when unset."""
        start = datetime.utcnow()
        restart_cmd = getattr(project, "was_restart_cmd", None)
        if not restart_cmd:
            return StageResult(PipelineStage.RESTART, True, "재시작 명령어 미설정 (생략)", duration=_elapsed(start))
        try:
            from core.ssh_exec import execute_ssh_command

            server_id = getattr(project, "server_id", None)
            result = await execute_ssh_command(server_id, restart_cmd, timeout=120)
            ok = result.exit_code == 0
            return StageResult(
                PipelineStage.RESTART, ok,
                "서비스 재시작 완료" if ok else f"재시작 실패 (rc={result.exit_code})",
                duration=_elapsed(start),
            )
        except Exception as exc:
            return StageResult(PipelineStage.RESTART, False, str(exc), duration=_elapsed(start))

    async def _health_check(self, session_id: int, project: Any) -> StageResult:
        """
        Poll project.health_check_url with HTTP GET until it answers below 400.

        Makes up to _HEALTH_CHECK_RETRIES attempts (at least one), sleeping
        _HEALTH_CHECK_INTERVAL seconds between them. The stage is skipped
        (success) when no URL is configured. Redirects are not followed, and
        any status below 400 (3xx included) counts as healthy.

        NOTE(review): the URL is not checked against an internal-domain
        allowlist here.
        """
        start = datetime.utcnow()
        health_url = getattr(project, "health_check_url", None)
        if not health_url:
            return StageResult(PipelineStage.HEALTH_CHECK, True, "헬스체크 URL 미설정 (생략)", duration=_elapsed(start))

        # A non-positive retry setting would otherwise skip the check entirely
        # and report failure, which triggers a rollback.
        attempts = max(1, _HEALTH_CHECK_RETRIES)
        for attempt in range(1, attempts + 1):
            try:
                async with httpx.AsyncClient(timeout=10) as client:
                    resp = await client.get(health_url)
                if resp.status_code < 400:
                    return StageResult(
                        PipelineStage.HEALTH_CHECK, True,
                        f"헬스체크 통과 (시도 {attempt}/{attempts}, HTTP {resp.status_code})",
                        duration=_elapsed(start),
                    )
                logger.warning("헬스체크 실패: attempt=%d/%d status=%d",
                               attempt, attempts, resp.status_code)
            except Exception as exc:
                logger.warning("헬스체크 오류: attempt=%d/%d err=%s",
                               attempt, attempts, exc)

            if attempt < attempts:
                await asyncio.sleep(_HEALTH_CHECK_INTERVAL)

        return StageResult(
            PipelineStage.HEALTH_CHECK, False,
            f"헬스체크 {attempts}회 모두 실패",
            duration=_elapsed(start),
        )

    async def _notify(self, session_id: int, success: bool, message: str) -> StageResult:
        """
        Write a DEPLOY WorkLog row and post *message* to the ops messenger room.

        Best-effort: failures are logged, and the returned StageResult is
        always successful so notification problems never fail the pipeline.
        """
        start = datetime.utcnow()
        try:
            from database import SessionLocal
            from models import VibeSession, WorkLog
            from core.notify import send_messenger

            async with SessionLocal() as db:
                session = await db.get(VibeSession, session_id)
                log = WorkLog(
                    sr_id=session.sr_id if session else None,
                    work_type="DEPLOY",
                    content=message,
                    result_status="PASS" if success else "FAIL",
                    created_by="deploy_pipeline",
                )
                db.add(log)
                await db.commit()

            icon = "✅" if success else "❌"
            room = os.getenv("MESSENGER_OPS_ROOM", "ops")
            await send_messenger(room, {
                "type": "text",
                "text": f"{icon} [배포 알림] 세션 {session_id}\n{message}",
            })
            return StageResult(PipelineStage.NOTIFY, True, "알림 발송 완료", duration=_elapsed(start))
        except Exception as exc:
            logger.warning("알림 발송 실패: %s", exc)
            return StageResult(PipelineStage.NOTIFY, True, f"알림 발송 실패 (무시): {exc}", duration=_elapsed(start))

    # ── Internal helpers ─────────────────────────────────────────────────────

    def _advance(self, session_id: int, next_stage: PipelineStage, prev_result: StageResult) -> None:
        """
        Move an in-flight run to *next_stage*.

        prev_result's stage is appended to completed_stages only if it
        succeeded. This is a no-op when the session is not currently running.
        """
        status = self._running.get(session_id)
        if status is None:
            return
        if prev_result.success:
            status.completed_stages.append(prev_result.stage.value)
        status.current_stage = next_stage

    async def _update_session(
        self,
        session_id: int,
        status: str,
        message: str,
        mark_built: bool = False,
        mark_tested: bool = False,
        mark_deployed: bool = False,
    ) -> None:
        """
        Persist status (and optional milestone timestamps) on tb_vibe_session.

        error_msg is set to *message* for statuses containing "FAILED" and is
        cleared otherwise. Best-effort: errors are logged, not raised.
        """
        from database import SessionLocal
        from models import VibeSession

        now = datetime.utcnow()
        try:
            async with SessionLocal() as db:
                session = await db.get(VibeSession, session_id)
                if session:
                    session.status = status
                    session.error_msg = message if "FAILED" in status else None
                    if mark_built:
                        session.built_at = now
                    if mark_tested:
                        session.tested_at = now
                    if mark_deployed:
                        session.deployed_at = now
                    await db.commit()
        except Exception as exc:
            logger.warning("세션 상태 업데이트 실패: session_id=%d err=%s", session_id, exc)


# ── Utilities ────────────────────────────────────────────────────────────────

def _elapsed(start: datetime) -> float:
    """Seconds elapsed since *start* (a naive UTC datetime)."""
    return (datetime.utcnow() - start).total_seconds()


# Statuses that run() writes to tb_vibe_session.status mid-pipeline which are
# not PipelineStage names.
_IN_PROGRESS_STATUS_TO_STAGE = {
    "BUILDING": PipelineStage.BUILD,
    "TESTING": PipelineStage.TEST,
    "DEPLOYING": PipelineStage.DEPLOY,
}


def _stage_from_session_status(raw_status: Any) -> PipelineStage:
    """
    Map a tb_vibe_session.status value to a PipelineStage.

    Exact stage names map directly. Any FAILED_* status (e.g. FAILED_TEST,
    FAILED_ROLLBACK) maps to FAILED rather than being reported as COMPLETED.
    The in-progress statuses written by run() map to their stage. Anything
    else, including None or an empty value, falls back to COMPLETED.
    """
    # Accept a plain string or an Enum member, in case the ORM column is Enum-typed.
    value = getattr(raw_status, "value", raw_status)
    text = str(value) if value else "COMPLETED"
    if text in PipelineStage.__members__:
        return PipelineStage(text)
    if text.startswith("FAILED"):
        return PipelineStage.FAILED
    return _IN_PROGRESS_STATUS_TO_STAGE.get(text, PipelineStage.COMPLETED)


# ── Singleton ────────────────────────────────────────────────────────────────

_pipeline_instance: Optional[DeployPipeline] = None


def get_deploy_pipeline() -> DeployPipeline:
    """Return the process-wide DeployPipeline instance, creating it on first use."""
    global _pipeline_instance
    if _pipeline_instance is None:
        _pipeline_instance = DeployPipeline()
    return _pipeline_instance