zioinfo-mail/itsm/routers/vibe.py
DESKTOP-TKLFCPR\ython e228faabf5 feat(itsm): G-1~G-12 확장 기능 + 하네스/봇/설치스크립트 구현
G-1: 메신저 Webhook Relay + _send_to_room 실제 httpx 호출 구현
G-2: POST /api/tasks/bulk SR 대량작업 엔드포인트 (최대 100건)
G-3: 라이선스 만료 알림 스케줄러 (매일 09:00 KST)
G-4: 체험판 upgrade_banner 필드 + license.py 배너 로직
G-5: core/auto_rca.py + incidents/problem auto-rca 엔드포인트
G-6: core/deploy_impact.py + vibe impact-analysis 엔드포인트
G-7: core/ticket_classifier.py + SR 생성 시 AI 분류 + ai-suggestion API
G-8: VulnPatchRecord 모델 + vuln_scan 패치추적 4개 엔드포인트
G-9: core/jira_sync.py + gateway Jira/Confluence 연동 엔드포인트
G-10: core/push_notify.py + routers/push.py + PushSubscription 모델
G-11: approvals 다중승인 (위임/서명/기한초과/마감연장)
G-12: alembic.ini + migrations/ + cicd/migrate_to_postgres.sh

하네스: guardia-orchestrator 확장기능 Phase 반영
봇명령어: /sr /status /license /bulk 슬래시 명령어 추가
설치스크립트: setup/ (Ubuntu, CentOS, RHEL, Windows) --test 옵션 포함

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-29 18:18:52 +09:00

935 lines
33 KiB
Python

"""
바이브 코딩 세션 관리 API.
워크플로우:
POST /api/vibe → 세션 생성 (PENDING)
PATCH /api/vibe/{id}/status → 상태 변경 (Claude CLI 세션 ID 업데이트 포함)
POST /api/vibe/{id}/build → Jenkins 파이프라인 빌드 트리거
POST /api/vibe/{id}/deploy → Jenkins 파이프라인 배포 트리거
POST /api/vibe/callback → Jenkins 파이프라인 완료 콜백 수신
GET /api/vibe/{id} → 세션 상세
"""
from __future__ import annotations
import asyncio
import logging
from datetime import datetime
from typing import Any, List, Optional
from fastapi import APIRouter, Depends, Header, HTTPException
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
from core.cicd import (
abort_build,
ping as jenkins_ping,
trigger_pipeline,
verify_callback_secret,
)
from database import get_db
from models import (
Project, SRRequest, SRStatus,
VibeSession, VibeSessionCreate, VibeSessionOut, VibeSessionUpdate,
VibeSessionStatus, User, UserRole, Server, WorkLog, WorkActionType,
)
router = APIRouter(prefix="/api/vibe", tags=["vibe"])
logger = logging.getLogger(__name__)
# ── 권한 헬퍼 ─────────────────────────────────────────────────────────────────
def _require_ops(u: User = Depends(get_current_user)) -> User:
if u.role not in (UserRole.ADMIN, UserRole.PM, UserRole.ENGINEER):
raise HTTPException(403, "바이브 세션 권한이 없습니다.")
return u
# ── 요청/응답 스키마 ──────────────────────────────────────────────────────────
class BuildRequest(BaseModel):
deploy_env: str = "dev"
skip_test: bool = False
job_name: Optional[str] = None # 명시 지정 시 사용, 없으면 프로젝트에서 자동 결정
extra_params: Optional[dict[str, Any]] = None
class DeployRequest(BaseModel):
deploy_env: str = "dev"
skip_test: bool = False
job_name: Optional[str] = None
extra_params: Optional[dict[str, Any]] = None
class StatusChangeRequest(BaseModel):
status: VibeSessionStatus
note: Optional[str] = None
claude_session_id: Optional[str] = None
class PipelineCallbackRequest(BaseModel):
"""Jenkins Stage 8: ITSM Callback에서 POST하는 페이로드."""
session_id: str
sr_id: Optional[str] = None
status: str # COMPLETED | FAILED | UNSTABLE | ROLLED_BACK …
stage: Optional[str] = None
message: Optional[str] = None
build_number: Optional[str] = None
build_url: Optional[str] = None
deploy_env: Optional[str] = None
job_name: Optional[str] = None
timestamp: Optional[str] = None
logs: Optional[dict[str, str]] = None
# 콜백 검증용 시크릿 (X-Jenkins-Secret 헤더로도 전달 가능)
secret: Optional[str] = None
# ── CRUD ─────────────────────────────────────────────────────────────────────
@router.get("", response_model=List[VibeSessionOut])
async def list_sessions(
sr_id: Optional[str] = None,
status: Optional[str] = None,
db: AsyncSession = Depends(get_db),
_u: User = Depends(_require_ops),
):
q = select(VibeSession).order_by(VibeSession.started_at.desc())
if sr_id:
q = q.where(VibeSession.sr_id == sr_id)
if status:
q = q.where(VibeSession.status == status)
r = await db.execute(q.limit(50))
return r.scalars().all()
@router.get("/{session_id}", response_model=VibeSessionOut)
async def get_session(
session_id: int,
db: AsyncSession = Depends(get_db),
_u: User = Depends(_require_ops),
):
r = await db.execute(select(VibeSession).where(VibeSession.id == session_id))
vs = r.scalars().first()
if not vs:
raise HTTPException(404, "바이브 세션을 찾을 수 없습니다.")
return vs
@router.post("", response_model=VibeSessionOut, status_code=201)
async def create_session(
payload: VibeSessionCreate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(_require_ops),
):
"""
바이브 코딩 세션 시작.
- sr_id: 연결할 SR
- project_id: 프로젝트 (소스 경로, 빌드 명령, 배포 서버 포함)
- claude_session_id: Claude CLI 세션 ID (이미 시작한 경우 전달)
"""
# SR 존재 확인
if payload.sr_id:
r = await db.execute(
select(SRRequest).where(SRRequest.sr_id == payload.sr_id)
)
if not r.scalars().first():
raise HTTPException(404, f"SR을 찾을 수 없습니다: {payload.sr_id}")
# 프로젝트 확인
workspace = payload.workspace_path
if payload.project_id:
r = await db.execute(
select(Project).where(Project.id == payload.project_id)
)
proj = r.scalars().first()
if not proj:
raise HTTPException(404, f"프로젝트를 찾을 수 없습니다: {payload.project_id}")
if not workspace and proj.source_path:
workspace = proj.source_path
vs = VibeSession(
sr_id=payload.sr_id,
project_id=payload.project_id,
claude_session_id=payload.claude_session_id,
workspace_path=workspace,
started_by=payload.started_by or current_user.username,
status=VibeSessionStatus.PENDING,
)
db.add(vs)
await db.commit()
await db.refresh(vs)
asyncio.create_task(_notify_vibe_started(vs, current_user.username))
return vs
@router.patch("/{session_id}/status", response_model=VibeSessionOut)
async def update_session_status(
session_id: int,
payload: StatusChangeRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(_require_ops),
):
"""
세션 상태 수동 변경 (CODING → BUILDING 등).
Claude CLI 세션 ID도 이 엔드포인트로 업데이트.
"""
r = await db.execute(select(VibeSession).where(VibeSession.id == session_id))
vs = r.scalars().first()
if not vs:
raise HTTPException(404, "바이브 세션을 찾을 수 없습니다.")
vs.status = payload.status
if payload.claude_session_id:
vs.claude_session_id = payload.claude_session_id
now = datetime.now()
if payload.status == VibeSessionStatus.CODING and not vs.started_at:
vs.started_at = now
elif payload.status == VibeSessionStatus.BUILDING:
vs.coded_at = now
elif payload.status == VibeSessionStatus.TESTING:
vs.built_at = now
elif payload.status == VibeSessionStatus.DEPLOYING:
vs.tested_at = now
elif payload.status == VibeSessionStatus.COMPLETED:
vs.deployed_at = now
await db.commit()
await db.refresh(vs)
return vs
# ── Jenkins 파이프라인 트리거 ──────────────────────────────────────────────────
@router.post("/{session_id}/build", response_model=VibeSessionOut)
async def trigger_build(
session_id: int,
payload: BuildRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(_require_ops),
):
"""
Jenkins 빌드 + 테스트 파이프라인 트리거.
tb_project.jenkins_job_name 또는 payload.job_name 사용.
"""
vs, proj = await _get_session_with_project(session_id, db)
job_name = payload.job_name or getattr(proj, "jenkins_job_name", None) \
or f"{proj.project_name}-build"
# 배포 서버 이름
target_server = await _resolve_target_server(proj, db)
try:
result = await trigger_pipeline(
job_name=job_name,
session_id=vs.id,
sr_id=vs.sr_id or "",
deploy_env=payload.deploy_env,
target_server=target_server,
skip_test=payload.skip_test,
extra_params=payload.extra_params,
)
except RuntimeError as exc:
raise HTTPException(502, f"Jenkins 연동 오류: {exc}")
# 상태 업데이트
vs.status = VibeSessionStatus.BUILDING
vs.coded_at = datetime.now()
# 큐 ID를 build_log에 임시 저장
vs.build_log = (
f"Jenkins 파이프라인 트리거 완료\n"
f"Job: {job_name}\n"
f"Queue ID: {result.get('queue_id', 'N/A')}\n"
f"Queue URL: {result.get('queue_url', 'N/A')}"
)
await db.commit()
await db.refresh(vs)
asyncio.create_task(_notify_build_triggered(vs, job_name, result))
logger.info("빌드 트리거: session=%d job=%s queue=%s", vs.id, job_name, result.get("queue_id"))
return vs
@router.post("/{session_id}/deploy", response_model=VibeSessionOut)
async def trigger_deploy(
session_id: int,
payload: DeployRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(_require_ops),
):
"""
Jenkins Full Pipeline 배포 트리거.
"""
vs, proj = await _get_session_with_project(session_id, db)
job_name = payload.job_name or getattr(proj, "jenkins_job_name", None) \
or f"{proj.project_name}-deploy"
target_server = await _resolve_target_server(proj, db)
extra: dict[str, Any] = {}
if proj.health_check_url:
extra["HEALTH_CHECK_URL"] = proj.health_check_url
if payload.extra_params:
extra.update(payload.extra_params)
try:
result = await trigger_pipeline(
job_name=job_name,
session_id=vs.id,
sr_id=vs.sr_id or "",
deploy_env=payload.deploy_env,
target_server=target_server,
skip_test=payload.skip_test,
extra_params=extra,
)
except RuntimeError as exc:
raise HTTPException(502, f"Jenkins 연동 오류: {exc}")
vs.status = VibeSessionStatus.DEPLOYING
vs.tested_at = datetime.now()
vs.deploy_log = (
f"Jenkins 배포 파이프라인 트리거 완료\n"
f"Job: {job_name}\n"
f"Environment: {payload.deploy_env}\n"
f"Queue ID: {result.get('queue_id', 'N/A')}"
)
await db.commit()
await db.refresh(vs)
asyncio.create_task(_notify_deploy_triggered(vs, job_name, result))
logger.info("배포 트리거: session=%d job=%s env=%s", vs.id, job_name, payload.deploy_env)
return vs
# ── Jenkins 콜백 수신 ─────────────────────────────────────────────────────────
@router.post("/callback")
async def pipeline_callback(
body: PipelineCallbackRequest,
x_jenkins_secret: Optional[str] = Header(None, alias="X-Jenkins-Secret"),
db: AsyncSession = Depends(get_db),
):
"""
Jenkins Stage 8 (ITSM Callback) → 파이프라인 결과 수신.
인증: X-Jenkins-Secret 헤더 또는 body.secret 으로 검증.
"""
# 시크릿 검증
received_secret = x_jenkins_secret or body.secret or ""
if not verify_callback_secret(received_secret):
raise HTTPException(401, "콜백 인증 실패")
# 세션 조회
try:
vs_id = int(body.session_id)
except ValueError:
raise HTTPException(400, "유효하지 않은 session_id")
r = await db.execute(select(VibeSession).where(VibeSession.id == vs_id))
vs = r.scalars().first()
if not vs:
raise HTTPException(404, f"바이브 세션을 찾을 수 없습니다: {body.session_id}")
# 상태 매핑
status_map = {
"BUILDING": VibeSessionStatus.BUILDING,
"TESTING": VibeSessionStatus.TESTING,
"DEPLOYING": VibeSessionStatus.DEPLOYING,
"COMPLETED": VibeSessionStatus.COMPLETED,
"FAILED": VibeSessionStatus.FAILED,
"UNSTABLE": VibeSessionStatus.FAILED,
"PENDING_APPROVAL": VibeSessionStatus.BUILDING,
"ROLLED_BACK": VibeSessionStatus.FAILED,
"CANCELLED": VibeSessionStatus.CANCELLED,
}
new_status = status_map.get(body.status.upper(), VibeSessionStatus.FAILED)
vs.status = new_status
now = datetime.now()
if body.logs:
if body.logs.get("build"):
vs.build_log = body.logs["build"][:2000]
vs.built_at = now
if body.logs.get("test"):
vs.test_result = body.logs["test"][:500]
vs.tested_at = now
if body.logs.get("deploy"):
vs.deploy_log = body.logs["deploy"][:2000]
vs.deployed_at = now
if new_status == VibeSessionStatus.FAILED:
vs.error_msg = body.message or "파이프라인 실패"
is_success = new_status == VibeSessionStatus.COMPLETED
# ── SR COMPLETED 처리 ─────────────────────────────────────
sr_notifications = []
if is_success and vs.sr_id:
rs = await db.execute(
select(SRRequest).where(SRRequest.sr_id == vs.sr_id)
)
sr = rs.scalars().first()
if sr and sr.status not in (SRStatus.COMPLETED, SRStatus.FAILED_ROLLBACK):
sr.status = SRStatus.COMPLETED
sr.updated_at = now
final_msg = (
body.message
or f"CI/CD 배포 완료 (빌드 #{body.build_number or 'N/A'})"
)
db.add(WorkLog(
sr_id=vs.sr_id,
engineer="jenkins-bot",
action_type=WorkActionType.COMPLETE,
content="CI/CD 파이프라인 배포 완료",
result=final_msg,
is_success=True,
))
sr_notifications.append((sr, final_msg))
await db.commit()
# 알림 (비동기)
for sr_obj, msg in sr_notifications:
asyncio.create_task(_notify_sr_done(sr_obj, msg))
asyncio.create_task(_notify_pipeline_result(vs, body, is_success))
# A-3: 배포 완료/실패 통합 알림
asyncio.create_task(_a3_notify_deploy_completed(
vs, is_success,
summary=body.message or f"빌드 #{body.build_number or 'N/A'}",
))
logger.info(
"Jenkins 콜백 수신: session=%d status=%s build=#%s",
vs.id, body.status, body.build_number or "N/A",
)
return {
"ok": True,
"sr_id": vs.sr_id,
"sr_status": SRStatus.COMPLETED if is_success else "UNCHANGED",
"notified": ["messenger"],
}
# ── Jenkins 연결 상태 확인 ──────────────────────────────────────────────────────
@router.get("/jenkins/health")
async def jenkins_health(_u: User = Depends(_require_ops)):
"""Jenkins 서버 연결 상태 확인."""
result = await jenkins_ping()
if not result.get("ok"):
raise HTTPException(503, f"Jenkins 연결 실패: {result.get('error', '알 수 없는 오류')}")
return result
# ── 내부 헬퍼 ────────────────────────────────────────────────────────────────
async def _get_session_with_project(
session_id: int,
db: AsyncSession,
) -> tuple[VibeSession, Project]:
r = await db.execute(select(VibeSession).where(VibeSession.id == session_id))
vs = r.scalars().first()
if not vs:
raise HTTPException(404, "바이브 세션을 찾을 수 없습니다.")
if not vs.project_id:
raise HTTPException(400, "프로젝트가 연결되지 않은 세션입니다.")
rp = await db.execute(select(Project).where(Project.id == vs.project_id))
proj = rp.scalars().first()
if not proj:
raise HTTPException(400, "프로젝트를 찾을 수 없습니다.")
return vs, proj
async def _resolve_target_server(proj: Project, db: AsyncSession) -> str:
"""프로젝트의 배포 서버 이름을 반환."""
if not proj.deploy_server_id:
return ""
rs = await db.execute(
select(Server).where(Server.id == proj.deploy_server_id)
)
srv = rs.scalars().first()
return srv.server_name if srv else ""
# ── 알림 헬퍼 ─────────────────────────────────────────────────────────────────
async def _notify_vibe_started(vs: VibeSession, actor: str):
try:
import httpx, os
webhook = os.environ.get("MESSENGER_WEBHOOK", "")
if not webhook:
return
await _post_webhook(webhook, {
"event": "vibe_started",
"room": os.environ.get("NOTIFY_ROOM", "ops"),
"session_id": vs.id,
"sr_id": vs.sr_id or "",
"actor": actor,
"workspace": vs.workspace_path or "",
})
except Exception:
pass
async def _notify_build_triggered(vs: VibeSession, job_name: str, result: dict):
try:
import os
webhook = os.environ.get("MESSENGER_WEBHOOK", "")
if not webhook:
return
await _post_webhook(webhook, {
"event": "build_triggered",
"room": os.environ.get("NOTIFY_ROOM", "ops"),
"session_id": vs.id,
"sr_id": vs.sr_id or "",
"job_name": job_name,
"queue_id": result.get("queue_id"),
})
except Exception:
pass
async def _notify_deploy_triggered(vs: VibeSession, job_name: str, result: dict):
try:
import os
webhook = os.environ.get("MESSENGER_WEBHOOK", "")
if not webhook:
return
await _post_webhook(webhook, {
"event": "deploy_triggered",
"room": os.environ.get("NOTIFY_ROOM", "ops"),
"session_id": vs.id,
"sr_id": vs.sr_id or "",
"job_name": job_name,
"queue_id": result.get("queue_id"),
})
except Exception:
pass
async def _notify_pipeline_result(
vs: VibeSession,
body: PipelineCallbackRequest,
success: bool,
):
try:
import os
webhook = os.environ.get("MESSENGER_WEBHOOK", "")
if not webhook:
return
await _post_webhook(webhook, {
"event": "pipeline_result",
"room": os.environ.get("NOTIFY_ROOM", "ops"),
"session_id": vs.id,
"sr_id": vs.sr_id or "",
"success": success,
"status": body.status,
"message": body.message or "",
"build_number": body.build_number or "",
"build_url": body.build_url or "",
"deploy_env": body.deploy_env or "",
})
except Exception:
pass
async def _notify_sr_done(sr: SRRequest, final_msg: str):
try:
from core.notify import notify_sr_status_changed
await notify_sr_status_changed(sr, SRStatus.COMPLETED, final_msg)
except Exception:
pass
async def _post_webhook(url: str, payload: dict):
import httpx
async with httpx.AsyncClient(timeout=5.0) as client:
await client.post(url, json=payload)
# ── Priority 5: Jenkins 운영 배포 승인 연동 ──────────────────────────────────
class ApprovalRequestBody(BaseModel):
environment: str = "prd"
build_number: Optional[str] = None
@router.post("/{session_id}/request-approval")
async def request_approval(
session_id: int,
body: ApprovalRequestBody,
db: AsyncSession = Depends(get_db),
):
"""
Jenkins Pipeline → ITSM 승인 요청 생성.
PRD 배포 시 Jenkins가 이 엔드포인트를 호출하여 PM 승인을 요청한다.
인증: ITSM_TOKEN (Jenkins에서 Bearer 토큰으로 전달)
"""
r = await db.execute(select(VibeSession).where(VibeSession.id == session_id))
vs = r.scalars().first()
if not vs:
raise HTTPException(404, f"세션 없음: {session_id}")
# 승인 요청 상태로 전환
vs.status = VibeSessionStatus.BUILDING # 승인 대기 중
vs.error_msg = None
# build_number를 deploy_log에 보관
vs.deploy_log = (
f"PRD 배포 승인 대기 중\n"
f"환경: {body.environment}\n"
f"빌드: #{body.build_number or 'N/A'}"
)
await db.commit()
# PM에게 메신저 알림 (기존)
asyncio.create_task(_notify_approval_requested(vs, body.environment, body.build_number))
# A-3: 승인 필요 통합 알림 (이메일 + 메신저)
asyncio.create_task(_a3_notify_approval_required(vs))
logger.info("PRD 배포 승인 요청: session=%d env=%s", session_id, body.environment)
return {"ok": True, "status": "PENDING", "session_id": session_id}
@router.get("/{session_id}/approval-status")
async def get_approval_status(
session_id: int,
db: AsyncSession = Depends(get_db),
):
"""
Jenkins Pipeline polling → 승인 상태 반환.
Jenkins `waitUntil` 블록이 이 엔드포인트를 폴링한다.
Returns: "PENDING" | "APPROVED" | "REJECTED"
"""
r = await db.execute(select(VibeSession).where(VibeSession.id == session_id))
vs = r.scalars().first()
if not vs:
raise HTTPException(404, f"세션 없음: {session_id}")
if vs.status == VibeSessionStatus.DEPLOYING:
return "APPROVED"
elif vs.status in (VibeSessionStatus.FAILED, VibeSessionStatus.CANCELLED):
return "REJECTED"
return "PENDING"
@router.patch("/{session_id}/approve")
async def approve_deploy(
session_id: int,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""
PM이 ITSM UI에서 PRD 배포를 승인한다.
승인되면 approval-status 폴링이 "APPROVED"를 반환하여 Jenkins 파이프라인이 재개된다.
"""
if current_user.role not in (UserRole.ADMIN, UserRole.PM):
raise HTTPException(403, "PM 또는 ADMIN 권한이 필요합니다.")
r = await db.execute(select(VibeSession).where(VibeSession.id == session_id))
vs = r.scalars().first()
if not vs:
raise HTTPException(404, f"세션 없음: {session_id}")
vs.status = VibeSessionStatus.DEPLOYING
vs.tested_at = datetime.now()
await db.commit()
await db.refresh(vs)
logger.info("PRD 배포 승인: session=%d by=%s", session_id, current_user.username)
asyncio.create_task(_notify_approval_result(vs, True, current_user.username))
return {"ok": True, "status": "APPROVED", "session_id": session_id}
@router.patch("/{session_id}/reject")
async def reject_deploy(
session_id: int,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""PM이 PRD 배포를 반려한다."""
if current_user.role not in (UserRole.ADMIN, UserRole.PM):
raise HTTPException(403, "PM 또는 ADMIN 권한이 필요합니다.")
r = await db.execute(select(VibeSession).where(VibeSession.id == session_id))
vs = r.scalars().first()
if not vs:
raise HTTPException(404, f"세션 없음: {session_id}")
vs.status = VibeSessionStatus.FAILED
vs.error_msg = f"PRD 배포 반려 by {current_user.username}"
await db.commit()
await db.refresh(vs)
logger.info("PRD 배포 반려: session=%d by=%s", session_id, current_user.username)
asyncio.create_task(_notify_approval_result(vs, False, current_user.username))
return {"ok": True, "status": "REJECTED", "session_id": session_id}
# ── Priority 5: SonarQube Quality Gate 결과 수신 ─────────────────────────────
class SonarResultBody(BaseModel):
session_id: int
project_key: str
@router.post("/sonar-result")
async def receive_sonar_result(
body: SonarResultBody,
db: AsyncSession = Depends(get_db),
):
"""
Jenkins Quality Gate 단계에서 SonarQube 분석 결과를 ITSM에 전달한다.
Quality Gate 실패(ERROR) 시 SR을 자동 생성한다.
"""
from core.cicd import get_sonarqube_result, handle_sonar_gate_failure
try:
result = await get_sonarqube_result(body.project_key)
except Exception as exc:
logger.warning("SonarQube 결과 조회 실패: %s", exc)
return {"ok": False, "error": str(exc)}
# 세션에 소나큐브 결과 기록
r = await db.execute(select(VibeSession).where(VibeSession.id == body.session_id))
vs = r.scalars().first()
if vs:
import json
vs.test_result = json.dumps({
"sonar_status": result.status,
"coverage": result.coverage,
"bugs": result.bugs,
"vulnerabilities": result.vulnerabilities,
"code_smells": result.code_smells,
}, ensure_ascii=False)
vs.tested_at = datetime.now()
await db.commit()
# Quality Gate 실패 → SR 자동 생성
if result.status == "ERROR":
await handle_sonar_gate_failure(body.session_id, result, db)
return {
"ok": True,
"status": result.status,
"coverage": result.coverage,
"bugs": result.bugs,
}
# ── Priority 5: 빌드 로그 SSE 스트리밍 ───────────────────────────────────────
@router.get("/{session_id}/build/stream")
async def stream_build_log(
session_id: int,
_u: User = Depends(_require_ops),
):
"""
Jenkins 빌드 로그를 실시간 SSE로 스트리밍한다.
Jenkins Progressive Log API를 폴링하여 청크 단위로 전달한다.
"""
from fastapi.responses import StreamingResponse
from core.cicd import get_progressive_log, is_build_complete
import json
async def _generator():
offset = 0
max_iter = 300 # 최대 10분 (2초 간격)
for _ in range(max_iter):
try:
log_chunk, next_offset = await get_progressive_log(session_id, offset)
if log_chunk:
yield f"data: {json.dumps({'chunk': log_chunk, 'offset': next_offset})}\n\n"
offset = next_offset
done = await is_build_complete(session_id)
if done:
yield f"data: {json.dumps({'done': True})}\n\n"
break
except Exception as exc:
yield f"data: {json.dumps({'error': str(exc)})}\n\n"
break
await asyncio.sleep(2)
return StreamingResponse(_generator(), media_type="text/event-stream")
# ── G-6: 배포 영향 분석 ───────────────────────────────────────────────────────
@router.post("/{session_id}/impact-analysis")
async def deploy_impact_analysis(
session_id: int,
db: AsyncSession = Depends(get_db),
_u: User = Depends(_require_ops),
):
"""바이브 세션의 대상 서버 CI 의존성 분석으로 배포 영향 범위 파악."""
from models import VibeSession, ConfigItem, Server
from sqlalchemy import select
vs = await db.get(VibeSession, session_id)
if not vs:
raise HTTPException(404, "세션을 찾을 수 없습니다.")
# 대상 서버를 통해 CI 목록 수집
ci_id = None
if vs.target_server:
srv = (await db.execute(
select(Server).where(Server.server_name == vs.target_server)
)).scalars().first()
if srv:
ci = (await db.execute(
select(ConfigItem).where(ConfigItem.linked_server_id == srv.id)
)).scalars().first()
if ci:
ci_id = ci.id
if not ci_id:
return {
"session_id": session_id,
"message": "연결된 CI 정보가 없습니다. CMDB에서 서버와 CI를 연결하세요.",
"risk_level": "UNKNOWN",
"affected_cis": [],
"affected_institutions": [],
}
try:
from core.deploy_impact import analyze_deploy_impact
result = await analyze_deploy_impact(ci_id, db)
result["session_id"] = session_id
return result
except Exception as e:
raise HTTPException(500, f"영향 분석 오류: {str(e)[:200]}")
# ── 승인 알림 헬퍼 ────────────────────────────────────────────────────────────
async def _notify_approval_requested(vs: VibeSession, env: str, build_number: Optional[str]):
try:
import os
from core.notify import send_messenger
room = os.getenv("MESSENGER_PM_ROOM", "pm")
await send_messenger(room, {
"type": "text",
"text": (
f"🚀 [PRD 배포 승인 요청]\n"
f"세션: {vs.id} | SR: {vs.sr_id or ''}\n"
f"환경: {env} | 빌드: #{build_number or 'N/A'}\n"
f"ITSM에서 승인 또는 반려해주세요."
),
})
except Exception:
pass
async def _notify_approval_result(vs: VibeSession, approved: bool, approver: str):
try:
import os
from core.notify import send_messenger
icon = "" if approved else ""
room = os.getenv("MESSENGER_OPS_ROOM", "ops")
await send_messenger(room, {
"type": "text",
"text": (
f"{icon} [PRD 배포 {'승인' if approved else '반려'}]\n"
f"세션: {vs.id} | SR: {vs.sr_id or ''}\n"
f"처리자: {approver}"
),
})
except Exception:
pass
# ── A-3: 배포 승인/완료 통합 알림 헬퍼 ──────────────────────────────────────
async def _a3_notify_approval_required(vs: VibeSession) -> None:
"""
A-3: 배포 승인 필요 시 이메일 + 메신저 통합 알림.
PM / ADMIN 역할 사용자에게 승인 요청 발송.
"""
try:
import os
from core.notify import notify_deploy_approval_required
from database import SessionLocal
from sqlalchemy import select
from models import User, UserRole
# 승인 담당자: PM + ADMIN 역할 사용자 조회
async with SessionLocal() as db:
result = await db.execute(
select(User.username).where(
User.role.in_([UserRole.PM, UserRole.ADMIN]),
User.is_active == True,
)
)
approvers = [row[0] for row in result.all()]
if not approvers:
approvers = ["admin"]
project_name = f"세션 #{vs.id}"
if vs.project_id:
try:
async with SessionLocal() as db:
from models import Project
pr = await db.execute(
select(Project.project_name).where(Project.id == vs.project_id)
)
row = pr.first()
if row:
project_name = row[0]
except Exception:
pass
base_url = os.getenv("ITSM_BASE_URL", "http://localhost:8000")
approve_url = f"{base_url}/vibe?session={vs.id}&action=approve"
await notify_deploy_approval_required(
session_id=vs.id,
sr_id=vs.sr_id,
project_name=project_name,
approvers=approvers,
approve_url=approve_url,
)
except Exception as exc:
logger.debug("A-3 승인 알림 오류: %s", exc)
async def _a3_notify_deploy_completed(vs: VibeSession, success: bool, summary: str = "") -> None:
"""
A-3: 배포 완료/실패 시 운영팀 통합 알림.
Jenkins 콜백 수신 후 호출.
"""
try:
from core.notify import notify_deploy_completed
project_name = f"세션 #{vs.id}"
if vs.project_id:
try:
from database import SessionLocal
from sqlalchemy import select
from models import Project
async with SessionLocal() as db:
pr = await db.execute(
select(Project.project_name).where(Project.id == vs.project_id)
)
row = pr.first()
if row:
project_name = row[0]
except Exception:
pass
await notify_deploy_completed(
session_id=vs.id,
sr_id=vs.sr_id,
project_name=project_name,
success=success,
summary=summary or ("배포 성공" if success else "배포 실패"),
)
except Exception as exc:
logger.debug("A-3 배포완료 알림 오류: %s", exc)