guardia-itsm/core/cicd.py
DESKTOP-TKLFCPRython 64c27c3509 feat(itsm): G-1~G-12 확장 기능 + 하네스/봇/설치스크립트 구현
G-1: 메신저 Webhook Relay + _send_to_room 실제 httpx 호출 구현
G-2: POST /api/tasks/bulk SR 대량작업 엔드포인트 (최대 100건)
G-3: 라이선스 만료 알림 스케줄러 (매일 09:00 KST)
G-4: 체험판 upgrade_banner 필드 + license.py 배너 로직
G-5: core/auto_rca.py + incidents/problem auto-rca 엔드포인트
G-6: core/deploy_impact.py + vibe impact-analysis 엔드포인트
G-7: core/ticket_classifier.py + SR 생성 시 AI 분류 + ai-suggestion API
G-8: VulnPatchRecord 모델 + vuln_scan 패치추적 4개 엔드포인트
G-9: core/jira_sync.py + gateway Jira/Confluence 연동 엔드포인트
G-10: core/push_notify.py + routers/push.py + PushSubscription 모델
G-11: approvals 다중승인 (위임/서명/기한초과/마감연장)
G-12: alembic.ini + migrations/ + cicd/migrate_to_postgres.sh

하네스: guardia-orchestrator 확장기능 Phase 반영
봇명령어: /sr /status /license /bulk 슬래시 명령어 추가
설치스크립트: setup/ (Ubuntu, CentOS, RHEL, Windows) --test 옵션 포함

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-29 18:18:52 +09:00

611 lines
22 KiB
Python

"""
Jenkins CI/CD 연동 클라이언트.
GUARDiA ITSM → Jenkins REST API 호출 및 콜백 처리.
보안: 서버 IP, 자격증명은 환경변수에서만 로드 — 코드에 하드코딩 금지.
"""
from __future__ import annotations
import logging
import os
from typing import Any, Optional
import httpx
# Module-level logger shared by every function in this file.
logger = logging.getLogger(__name__)
# ── Environment variables ─────────────────────────────────────────────────────
# Jenkins base URL, API user and API token. Each value is read once at import
# time and falls back to the default shown when the variable is unset.
# An empty token makes _auth() raise RuntimeError.
_JENKINS_URL = os.getenv("JENKINS_URL", "http://jenkins.agency.go.kr:8080")
_JENKINS_USER = os.getenv("JENKINS_USER", "itsm-bot")
_JENKINS_TOKEN = os.getenv("JENKINS_TOKEN", "")
# Shared secret used to authenticate Jenkins → ITSM callbacks
_CALLBACK_SECRET = os.getenv("JENKINS_CALLBACK_SECRET", "")
# HTTP timeout (seconds)
_TIMEOUT = float(os.getenv("JENKINS_TIMEOUT", "30"))
# ── 유틸 ─────────────────────────────────────────────────────────────────────
def _auth() -> tuple[str, str]:
    """
    Return the ``(user, token)`` pair that this module passes as ``auth=``
    on its Jenkins HTTP calls.

    Raises:
        RuntimeError: If ``_JENKINS_TOKEN`` is empty.
    """
    if _JENKINS_TOKEN:
        return (_JENKINS_USER, _JENKINS_TOKEN)
    raise RuntimeError(
        "JENKINS_TOKEN 환경변수가 설정되지 않았습니다. "
        "Jenkins API 토큰을 .env에 등록하세요."
    )
def _sanitize_error(exc: Exception) -> str:
    """
    Return ``str(exc)`` with IPv4-looking addresses and credential pairs masked.

    Two substitutions are applied in order:
      1. dotted quads (four dot-separated groups of 1-3 digits) become ``<IP>``;
      2. ``password`` / ``token`` / ``secret`` / ``key`` (case-insensitive)
         followed by ``=`` or ``:`` and a value become ``<keyword>=<hidden>``.
    """
    import re

    redactions = (
        (r"\b\d{1,3}(?:\.\d{1,3}){3}\b", "<IP>"),
        (r"(?i)(password|token|secret|key)\s*[=:]\s*\S+", r"\1=<hidden>"),
    )
    text = str(exc)
    for pattern, replacement in redactions:
        text = re.sub(pattern, replacement, text)
    return text
# ── 파이프라인 트리거 ────────────────────────────────────────────────────────
async def trigger_pipeline(
    job_name: str,
    session_id: int,
    sr_id: str,
    deploy_env: str = "dev",
    target_server: str = "",
    skip_test: bool = False,
    extra_params: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """
    Queue a parameterised build of a Jenkins job.

    Sends ``POST {JENKINS_URL}/job/<job_name>/buildWithParameters`` with the
    ITSM identifiers and deployment options as query parameters.

    Args:
        job_name: Jenkins job name, interpolated into the URL as given.
        session_id: Sent as ``ITSM_SESSION_ID`` and echoed in the result.
        sr_id: Sent as ``ITSM_SR_ID``.
        deploy_env: Sent as ``DEPLOY_ENV``.
        target_server: Sent as ``TARGET_SERVER``.
        skip_test: Sent as ``SKIP_TEST`` (``"true"`` / ``"false"``).
        extra_params: Additional build parameters; values are stringified and
            entries with the same name override the ones above.

    Returns:
        ``{"queued": True, "queue_id": <digits or None>, "queue_url": <Location
        header or "">, "job_name": job_name, "session_id": session_id}``

    Raises:
        RuntimeError: On connection failure, timeout, any other error while
            sending the request, or a response status other than 200/201.
    """
    build_params: dict[str, str] = {
        "ITSM_SESSION_ID": str(session_id),
        "ITSM_SR_ID": sr_id,
        "DEPLOY_ENV": deploy_env,
        "TARGET_SERVER": target_server,
        "SKIP_TEST": str(skip_test).lower(),
    }
    for name, value in (extra_params or {}).items():
        build_params[name] = str(value)

    logger.info(
        "Jenkins 파이프라인 트리거: job=%s session_id=%s env=%s",
        job_name, session_id, deploy_env,
    )

    endpoint = f"{_JENKINS_URL}/job/{job_name}/buildWithParameters"
    try:
        async with httpx.AsyncClient(timeout=_TIMEOUT) as client:
            response = await client.post(endpoint, auth=_auth(), params=build_params)
    except httpx.ConnectError as exc:
        raise RuntimeError(f"Jenkins 서버 연결 실패: {_sanitize_error(exc)}") from exc
    except httpx.TimeoutException as exc:
        raise RuntimeError(f"Jenkins 요청 타임아웃 ({_TIMEOUT}s)") from exc
    except Exception as exc:
        raise RuntimeError(f"Jenkins 트리거 오류: {_sanitize_error(exc)}") from exc

    if response.status_code not in (200, 201):
        raise RuntimeError(
            f"Jenkins 트리거 실패: HTTP {response.status_code} "
            f"(job={job_name})"
        )

    # The queue id is the last path segment of the Location header
    # (e.g. /queue/item/42/); anything non-numeric is reported as None.
    location = response.headers.get("Location", "")
    last_segment = location.rstrip("/").split("/")[-1] if location else ""
    queue_id: Optional[str] = last_segment if last_segment.isdigit() else None

    logger.info("Jenkins 파이프라인 큐 등록: queue_id=%s", queue_id)
    return {
        "queued": True,
        "queue_id": queue_id,
        "queue_url": location,
        "job_name": job_name,
        "session_id": session_id,
    }
# ── 빌드 상태 조회 ───────────────────────────────────────────────────────────
async def get_build_status(job_name: str, build_number: int | str) -> dict[str, Any]:
    """
    Fetch the status of one build of a Jenkins job.

    Calls ``GET {JENKINS_URL}/job/<job_name>/<build_number>/api/json``.

    Args:
        job_name: Jenkins job name, interpolated into the URL as given.
        build_number: Numeric build number, or a build alias understood by
            Jenkins in that URL position. ``is_build_complete`` in this module
            passes ``"lastBuild"``, which is why strings are accepted.

    Returns:
        ``{"result": <"SUCCESS" | "FAILURE" | "ABORTED" | None>,
        "building": bool, "duration_ms": int, "timestamp": ...,
        "url": str, "display_name": str}`` taken from the Jenkins response.

    Raises:
        RuntimeError: When the build does not exist (HTTP 404), on any other
            HTTP error status, or on any other failure while querying.
    """
    url = f"{_JENKINS_URL}/job/{job_name}/{build_number}/api/json"
    try:
        async with httpx.AsyncClient(timeout=_TIMEOUT) as client:
            resp = await client.get(url, auth=_auth())
            resp.raise_for_status()
            data = resp.json()
            return {
                "result": data.get("result"),
                "building": data.get("building", False),
                "duration_ms": data.get("duration", 0),
                "timestamp": data.get("timestamp"),
                "url": data.get("url", ""),
                "display_name": data.get("displayName", ""),
            }
    except httpx.HTTPStatusError as exc:
        # 404 gets a dedicated message; other statuses share the generic one.
        if exc.response.status_code == 404:
            raise RuntimeError(f"빌드를 찾을 수 없습니다: {job_name} #{build_number}") from exc
        raise RuntimeError(f"빌드 상태 조회 실패: {_sanitize_error(exc)}") from exc
    except Exception as exc:
        raise RuntimeError(f"빌드 상태 조회 오류: {_sanitize_error(exc)}") from exc
# ── 큐 아이템 → 빌드 번호 조회 ──────────────────────────────────────────────
async def get_queue_item(queue_id: str) -> dict[str, Any]:
    """
    Look up a Jenkins queue item and report the build it turned into, if any.

    Calls ``GET {JENKINS_URL}/queue/item/<queue_id>/api/json``. Once the build
    has started, the response's ``executable`` object carries its number and
    URL; until then ``build_number`` is None and ``build_url`` is ``""``.

    Returns:
        ``{"queue_id", "build_number", "build_url", "blocked", "why"}``

    Raises:
        RuntimeError: On any failure while querying or decoding the response.
    """
    endpoint = f"{_JENKINS_URL}/queue/item/{queue_id}/api/json"
    try:
        async with httpx.AsyncClient(timeout=_TIMEOUT) as client:
            response = await client.get(endpoint, auth=_auth())
            response.raise_for_status()
            item = response.json()
            # "executable" may be absent or null while the item is still queued.
            build = item.get("executable") or {}
            return dict(
                queue_id=queue_id,
                build_number=build.get("number"),
                build_url=build.get("url", ""),
                blocked=item.get("blocked", False),
                why=item.get("why", ""),
            )
    except Exception as exc:
        raise RuntimeError(f"큐 아이템 조회 오류: {_sanitize_error(exc)}") from exc
# ── 빌드 취소 ────────────────────────────────────────────────────────────────
async def abort_build(job_name: str, build_number: int) -> bool:
    """
    Ask Jenkins to stop a running build.

    Sends ``POST {JENKINS_URL}/job/<job_name>/<build_number>/stop``.

    Returns:
        True when Jenkins answers 200 or 302; False for any other status or
        when the request fails (the failure is logged as a warning, not raised).
    """
    stop_url = f"{_JENKINS_URL}/job/{job_name}/{build_number}/stop"
    accepted_statuses = (200, 302)
    try:
        async with httpx.AsyncClient(timeout=_TIMEOUT) as client:
            response = await client.post(stop_url, auth=_auth())
            return response.status_code in accepted_statuses
    except Exception as exc:
        logger.warning("빌드 중단 실패: %s", _sanitize_error(exc))
        return False
# ── Jenkins 연결 테스트 ───────────────────────────────────────────────────────
async def ping() -> dict[str, Any]:
    """
    Check that the Jenkins server is reachable and accepts our credentials.

    Calls ``GET {JENKINS_URL}/api/json`` with a fixed 10-second timeout.

    Returns:
        On success ``{"ok": True, "version": <X-Jenkins header or "unknown">,
        "mode": ..., "num_executors": ...}``; on any failure
        ``{"ok": False, "error": <sanitized message>}``. Never raises.
    """
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(f"{_JENKINS_URL}/api/json", auth=_auth())
            response.raise_for_status()
            info = response.json()
            return dict(
                ok=True,
                version=response.headers.get("X-Jenkins", "unknown"),
                mode=info.get("mode", ""),
                num_executors=info.get("numExecutors", 0),
            )
    except Exception as exc:
        return {"ok": False, "error": _sanitize_error(exc)}
# ── 콜백 인증 검증 ────────────────────────────────────────────────────────────
def verify_callback_secret(received_secret: str) -> bool:
    """
    Verify the shared secret sent by Jenkins with a callback.

    When ``JENKINS_CALLBACK_SECRET`` is not configured, verification is
    skipped and True is returned after logging a warning (development only).
    NOTE(review): this is fail-open — production deployments must set the
    secret, otherwise every callback is accepted.

    Args:
        received_secret: Secret supplied by the caller. Anything that is not
            a string (e.g. None for a missing header) is rejected.

    Returns:
        True when the secrets match (or verification is disabled), else False.
    """
    if not _CALLBACK_SECRET:
        logger.warning("JENKINS_CALLBACK_SECRET 미설정 — 콜백 인증 건너뜀 (개발 전용)")
        return True
    if not isinstance(received_secret, str):
        return False
    import hmac
    # compare_digest() only accepts ASCII-only str; comparing the UTF-8 bytes
    # keeps the constant-time check working for non-ASCII secrets as well.
    return hmac.compare_digest(
        _CALLBACK_SECRET.encode("utf-8"),
        received_secret.encode("utf-8"),
    )
# ── SonarQube integration ─────────────────────────────────────────────────────
# SonarQube base URL and API token, read once at import time.
# An empty token makes get_sonarqube_result() return a "NONE" result without
# calling the API.
_SONAR_URL = os.getenv("SONARQUBE_URL", "http://sonarqube.agency.go.kr:9000")
_SONAR_TOKEN = os.getenv("SONARQUBE_TOKEN", "")
class SonarStatus(str):
    """
    Namespace of the Quality Gate status strings used in this module.

    The attributes are plain ``str`` values (this is not an Enum), so
    ``SonarStatus.OK == "OK"`` holds.
    """

    OK, WARN = "OK", "WARN"
    ERROR, NONE = "ERROR", "NONE"
class SonarResult:
    """
    Value object holding one SonarQube Quality Gate evaluation.

    Attributes:
        project_key: SonarQube project key the result belongs to.
        status: Gate status string — OK | WARN | ERROR | NONE.
        conditions: List of condition dicts, e.g.
            ``{"metric": ..., "status": ..., "actual": ...}``.
        raw_url: Link associated with the result ("" when not set).
        error_msg: Explanation when no real result is available ("" otherwise).
    """

    __slots__ = ("project_key", "status", "conditions", "raw_url", "error_msg")

    def __init__(
        self,
        project_key: str,
        status: str,
        conditions: list[dict[str, Any]],
        raw_url: str = "",
        error_msg: str = "",
    ) -> None:
        self.project_key, self.status = project_key, status
        self.conditions = conditions
        self.raw_url, self.error_msg = raw_url, error_msg

    @property
    def is_ok(self) -> bool:
        """True only when the gate status is exactly ``"OK"``."""
        passed = self.status == "OK"
        return passed

    @property
    def failed_conditions(self) -> list[dict[str, Any]]:
        """The subset of ``conditions`` whose ``"status"`` is ``"ERROR"``."""
        failures: list[dict[str, Any]] = []
        for condition in self.conditions:
            if condition.get("status") == "ERROR":
                failures.append(condition)
        return failures

    def to_dict(self) -> dict[str, Any]:
        """Return all fields plus ``failed_conditions`` as a plain dict."""
        payload: dict[str, Any] = {
            field: getattr(self, field)
            for field in ("project_key", "status", "conditions")
        }
        payload["failed_conditions"] = self.failed_conditions
        payload["raw_url"] = self.raw_url
        payload["error_msg"] = self.error_msg
        return payload
async def _get_vibe_session(session_id: int) -> Any:
    """
    Load the VibeSession record whose ``id`` equals ``session_id``.

    Opens its own ``SessionLocal`` database session for the query and returns
    the first matching row; callers treat a falsy result as "not found".
    """
    from sqlalchemy import select
    from database import SessionLocal
    from models import VibeSession

    query = select(VibeSession).where(VibeSession.id == session_id)
    async with SessionLocal() as db:
        rows = await db.execute(query)
        return rows.scalars().first()
async def _resolve_job_name(session_id: int) -> Optional[str]:
    """
    Resolve the Jenkins job name for a session via
    VibeSession.project_id -> Project.jenkins_job_name.

    Returns:
        The project's ``jenkins_job_name``, or None when the session is
        missing, has no ``project_id``, the project is missing, or the
        project has no ``jenkins_job_name`` attribute.
    """
    from sqlalchemy import select
    from database import SessionLocal
    from models import VibeSession, Project

    async with SessionLocal() as db:
        session_rows = await db.execute(
            select(VibeSession).where(VibeSession.id == session_id)
        )
        session_row = session_rows.scalars().first()
        if not (session_row and session_row.project_id):
            return None

        project_rows = await db.execute(
            select(Project).where(Project.id == session_row.project_id)
        )
        project = project_rows.scalars().first()
        return getattr(project, "jenkins_job_name", None) if project else None
async def get_sonarqube_result(project_key: str) -> SonarResult:
    """
    Fetch the SonarQube Quality Gate status of a project.

    API: GET {SONARQUBE_URL}/api/qualitygates/project_status?projectKey={project_key}

    Args:
        project_key: SonarQube project key.

    Returns:
        SonarResult(status="OK"|"WARN"|"ERROR"|"NONE", conditions=[...]).
        A "NONE" result with ``error_msg`` set is returned, without raising,
        when ``SONARQUBE_TOKEN`` is not configured or the API answers 404.

    Raises:
        RuntimeError: On any other HTTP error status or request failure.
    """
    if not _SONAR_TOKEN:
        logger.warning("SONARQUBE_TOKEN 미설정 — SonarQube 연동 불가")
        return SonarResult(project_key=project_key, status="NONE", conditions=[],
                           error_msg="SONARQUBE_TOKEN 미설정")

    from urllib.parse import quote

    url = f"{_SONAR_URL}/api/qualitygates/project_status"
    headers = {"Authorization": f"Bearer {_SONAR_TOKEN}"}
    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            resp = await client.get(url, headers=headers,
                                    params={"projectKey": project_key})
            resp.raise_for_status()
            data = resp.json()
    except httpx.HTTPStatusError as exc:
        if exc.response.status_code == 404:
            return SonarResult(project_key=project_key, status="NONE", conditions=[],
                               error_msg=f"프로젝트를 찾을 수 없습니다: {project_key}")
        raise RuntimeError(f"SonarQube API 오류: HTTP {exc.response.status_code}") from exc
    except Exception as exc:
        raise RuntimeError(f"SonarQube 조회 실패: {_sanitize_error(exc)}") from exc

    # Treat a missing key and an explicit JSON null the same way, so a sparse
    # response degrades to status "NONE" / no conditions instead of crashing.
    pj_status = data.get("projectStatus") or {}
    gate_status = pj_status.get("status") or "NONE"  # OK | WARN | ERROR | NONE

    # Normalise the conditions to the key names used by SonarResult consumers.
    conditions: list[dict[str, Any]] = [
        {
            "metric": c.get("metricKey", ""),
            "status": c.get("status", ""),
            "actual": c.get("actualValue", ""),
            "error_threshold": c.get("errorThreshold", ""),
            "comparator": c.get("comparator", ""),
        }
        for c in (pj_status.get("conditions") or [])
    ]

    # Percent-encode the key so characters such as "&", "#" or spaces cannot
    # alter the dashboard link; ":" is kept readable.
    raw_url = f"{_SONAR_URL}/dashboard?id={quote(project_key, safe=':')}"
    logger.info("SonarQube QG: project=%s status=%s", project_key, gate_status)
    return SonarResult(
        project_key=project_key,
        status=gate_status,
        conditions=conditions,
        raw_url=raw_url,
    )
async def handle_sonar_gate_failure(
    session_id: int,
    result: SonarResult,
    db: Any,  # AsyncSession — Any to avoid circular import
) -> Optional[str]:
    """
    Create an SR automatically when a SonarQube Quality Gate ends in ERROR.

    Conditions:
      - ``result.status == "ERROR"``
      - the VibeSession for ``session_id`` exists
      - no SR whose id starts with ``SONAR-<YYYYMMDD>-<session_id>-`` exists
        for today's local date (duplicate protection)

    inst_id lookup path:
        VibeSession.sr_id -> SRRequest.inst_id

    Args:
        session_id: VibeSession id of the deployment session.
        result: Quality Gate result to report.
        db: Async database session used for the lookups and the insert; the
            new SR is committed on it.

    Returns:
        The id of the newly created SR; the id of the already existing SR
        when today's duplicate is found; or None when the gate did not fail
        or the VibeSession does not exist.
    """
    if result.status != "ERROR":
        return None

    from datetime import date
    from uuid import uuid4
    from sqlalchemy import select
    from models import SRRequest, SRStatus, SRType, Priority, VibeSession

    # Look up the VibeSession.
    vs_row = (await db.execute(
        select(VibeSession).where(VibeSession.id == session_id)
    )).scalars().first()
    if not vs_row:
        logger.warning("SonarQube: VibeSession session_id=%s 없음 — SR 미생성", session_id)
        return None

    # inst_id: VibeSession.sr_id -> SRRequest.inst_id
    inst_id: Optional[int] = None
    if vs_row.sr_id:
        sr_parent = (await db.execute(
            select(SRRequest).where(SRRequest.sr_id == vs_row.sr_id)
        )).scalars().first()
        if sr_parent:
            inst_id = sr_parent.inst_id

    # Take today's date once, so the duplicate check and the new id always
    # share the same prefix even if the call straddles midnight.
    today_prefix = f"SONAR-{date.today().strftime('%Y%m%d')}-{session_id}-"

    # Duplicate protection: reuse today's SONAR SR for this session if present.
    dup = (await db.execute(
        select(SRRequest).where(SRRequest.sr_id.startswith(today_prefix))
    )).scalars().first()
    if dup:
        logger.info("SonarQube SR 중복 스킵: %s", dup.sr_id)
        return dup.sr_id

    sr_id = f"{today_prefix}{str(uuid4())[:4].upper()}"

    # Summarise the failed conditions; .get() keeps a condition dict with
    # missing keys from aborting SR creation with a KeyError.
    failed = result.failed_conditions
    if failed:
        condition_summary = "\n".join(
            f"  - {c.get('metric', '')}: {c.get('actual', '')} "
            f"(기준: {c.get('comparator', '')} {c.get('error_threshold', '')})"
            for c in failed
        )
    else:
        condition_summary = "  (상세 없음)"

    description = (
        f"SonarQube Quality Gate 실패\n"
        f"프로젝트: {result.project_key}\n"
        f"상태: {result.status}\n"
        f"실패 조건:\n{condition_summary}\n\n"
        f"SonarQube 대시보드: {result.raw_url}\n"
        f"배포 세션: {session_id}"
    )

    sr = SRRequest(
        sr_id=sr_id,
        sr_type=SRType.OTHER,
        priority=Priority.HIGH,
        status=SRStatus.RECEIVED,
        title=f"[SonarQube] Quality Gate 실패 - {result.project_key}",
        description=description,
        inst_id=inst_id,
        requested_by="sonarqube-bot",
    )
    db.add(sr)
    await db.commit()
    await db.refresh(sr)

    logger.warning(
        "SonarQube Quality Gate 실패 SR 생성: sr_id=%s project=%s",
        sr_id, result.project_key,
    )
    return sr_id
# ── Jenkins 빌드 로그 SSE 스트리밍 ─────────────────────────────────────────────
async def get_progressive_log(
    session_id: int,
    offset: int = 0,
) -> tuple[str, int]:
    """
    Poll one chunk of the Jenkins progressive console log for a session.

    Jenkins API: GET {job_url}/logText/progressiveText?start={offset}
    The ``X-Text-Size`` response header is the offset for the next call.

    job_name lookup path:
        VibeSession.project_id -> Project.jenkins_job_name
    build number: VibeSession has no column for it, so ``lastBuild`` is used
    (the newest build after a trigger is taken to be the one in progress).

    Returns:
        ``(log_chunk, next_offset)``. When the session, the job name or the
        log is missing, or the request fails, a bracketed/explanatory message
        is returned as the chunk and ``offset`` is returned unchanged.
    """
    if not await _get_vibe_session(session_id):
        return ("VibeSession을 찾을 수 없습니다.", offset)

    job_name = await _resolve_job_name(session_id)
    if not job_name:
        return ("[빌드 정보 없음 - project.jenkins_job_name 미설정]", offset)

    log_url = f"{_JENKINS_URL}/job/{job_name}/lastBuild/logText/progressiveText"
    try:
        async with httpx.AsyncClient(timeout=_TIMEOUT) as client:
            response = await client.get(
                log_url, auth=_auth(), params={"start": str(offset)}
            )
            if response.status_code == 404:
                return ("[로그를 찾을 수 없습니다]", offset)
            response.raise_for_status()
            chunk = response.text
            # Without the header, advance by the encoded size of the chunk.
            fallback_offset = offset + len(chunk.encode())
            next_offset = int(response.headers.get("X-Text-Size", fallback_offset))
            return (chunk, next_offset)
    except Exception as exc:
        reason = _sanitize_error(exc)
        logger.warning("Jenkins 로그 조회 실패: %s", reason)
        return (f"[로그 조회 오류: {reason}]", offset)
async def is_build_complete(session_id: int) -> bool:
    """
    Report whether the Jenkins build behind a session has finished.

    The job is resolved via VibeSession.project_id -> Project.jenkins_job_name
    and the ``building`` flag of its ``lastBuild`` is inspected.

    Returns:
        True  — the session does not exist, the session is already in a final
                state (COMPLETED / FAILED / CANCELLED), or Jenkins reports
                ``building=False``.
        False — Jenkins reports ``building=True``, no job name is configured
                yet, or the status lookup failed.
    """
    from models import VibeSession, VibeSessionStatus

    vs = await _get_vibe_session(session_id)
    if not vs:
        # No session: tell the caller to stop streaming.
        return True

    # A session that reached a final state counts as a finished build.
    terminal_states = {
        VibeSessionStatus.COMPLETED,
        VibeSessionStatus.FAILED,
        VibeSessionStatus.CANCELLED,
    }
    if vs.status in terminal_states:
        return True

    job_name = await _resolve_job_name(session_id)
    if not job_name:
        # Job information is not available yet.
        return False

    try:
        build = await get_build_status(job_name, "lastBuild")
    except Exception as exc:
        logger.warning("빌드 상태 확인 실패: %s", _sanitize_error(exc))
        return False
    else:
        still_building = build.get("building", True)
        return not still_building
# ── 빌드 이력 조회 ────────────────────────────────────────────────────────────
class BuildInfo:
    """
    One entry of a Jenkins job's build history.

    Fields: number, result, building, duration_ms, timestamp, url.
    """

    __slots__ = ("number", "result", "building", "duration_ms", "timestamp", "url")

    def __init__(self, **kwargs: Any) -> None:
        # Every slot is populated: fields not supplied become None, and
        # keyword arguments that are not slots are ignored.
        for field, value in zip(self.__slots__, map(kwargs.get, self.__slots__)):
            setattr(self, field, value)

    def to_dict(self) -> dict[str, Any]:
        """Return the fields as a plain dict, in slot order."""
        snapshot: dict[str, Any] = {}
        for field in self.__slots__:
            snapshot[field] = getattr(self, field)
        return snapshot
async def get_build_history(
    project_name: str,
    limit: int = 10,
) -> list[BuildInfo]:
    """
    Fetch the recent build history of a Jenkins job.

    Calls ``GET {JENKINS_URL}/job/<project_name>/api/json`` with a ``tree``
    filter that selects the build fields and the range ``{0,limit}``.

    Args:
        project_name: Jenkins job name, interpolated into the URL as given.
        limit: Upper bound of the ``{0,limit}`` range sent in the tree filter.

    Returns:
        list[BuildInfo], in the order Jenkins returns the builds (newest first).

    Raises:
        RuntimeError: On any failure while querying or decoding the response.
    """
    endpoint = f"{_JENKINS_URL}/job/{project_name}/api/json"
    tree_filter = f"builds[number,result,building,duration,timestamp,url]{{0,{limit}}}"
    try:
        async with httpx.AsyncClient(timeout=_TIMEOUT) as client:
            response = await client.get(
                endpoint, auth=_auth(), params={"tree": tree_filter}
            )
            response.raise_for_status()
            payload = response.json()
    except Exception as exc:
        raise RuntimeError(f"빌드 이력 조회 실패: {_sanitize_error(exc)}") from exc

    # Jenkins names the duration field "duration"; it is exposed as duration_ms.
    return [
        BuildInfo(
            number=entry.get("number"),
            result=entry.get("result"),
            building=entry.get("building", False),
            duration_ms=entry.get("duration", 0),
            timestamp=entry.get("timestamp"),
            url=entry.get("url", ""),
        )
        for entry in payload.get("builds", [])
    ]
async def get_artifact_url(
    project_name: str,
    build_number: int,
    artifact: str,
) -> str:
    """
    Build the download URL of an artifact of a specific Jenkins build.

    Args:
        project_name: Jenkins job name, interpolated into the URL as given.
        build_number: Build whose artifact is requested.
        artifact: Path of the artifact relative to the build's artifact
            root, e.g. "target/app.jar" or "dist/app.zip".

    Returns:
        Direct download URL (Jenkins must be configured to allow access
        without authentication for it to be usable as-is).
    """
    # Path-traversal protection. The path is split into segments (a backslash
    # counts as a separator too) and every empty, "." and ".." segment is
    # dropped. Unlike a single textual removal of "../", this cannot be
    # defeated by inputs such as "....//x" or a trailing "..".
    segments = [
        segment
        for segment in artifact.replace("\\", "/").split("/")
        if segment not in ("", ".", "..")
    ]
    safe_artifact = "/".join(segments)
    return (
        f"{_JENKINS_URL}/job/{project_name}/{build_number}"
        f"/artifact/{safe_artifact}"
    )