zioinfo-mail/workspace/guardia-itsm/routers/messenger.py
DESKTOP-TKLFCPR\ython cfe2901a55 refactor(structure): consolidate all projects under workspace/
- itsm/    -> workspace/guardia-itsm/
- manager/ -> workspace/guardia-manager/
- app/     -> workspace/guardia-messenger/
- manual/  -> workspace/guardia-docs/

workspace/zioinfo-web/ unchanged.
git mv preserves full commit history.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-31 23:50:56 +09:00

2223 lines
101 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
GUARDiA Messenger 연동 라우터.
역할:
1. ITSM에서 메신저로 보내는 outbound 알림 webhook 수신 (event dispatch)
2. 메신저 봇 → ITSM으로 들어오는 명령어 처리 (inbound bot commands)
봇 명령어:
!vibe <sr_id> [project_id] → 바이브 코딩 세션 시작
!build <session_id> → 빌드 실행
!deploy <session_id> → SSH 배포
!status <sr_id> → SR + 세션 상태 조회
!cancel <session_id> → 세션 취소
!sm <server> <script> → SM 스크립트 원격 실행
!health <server> → 시스템 헬스체크
!log <server> → 로그 분석
!help → 명령어 도움말
"""
import logging
import os
from typing import Any, Optional
import httpx
from fastapi import APIRouter, BackgroundTasks, Depends
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from database import get_db, SessionLocal
from models import (
SRRequest, VibeSession, VibeSessionStatus, Project,
User, UserRole,
)
# Router for every endpoint defined in this module; all routes are mounted
# under the /api/messenger prefix.
router = APIRouter(prefix="/api/messenger", tags=["messenger"])
logger = logging.getLogger(__name__)
# Base URL of the messenger service that relayed events are POSTed to.
# Overridable through the MESSENGER_BASE_URL environment variable.
MESSENGER_BASE_URL = os.getenv("MESSENGER_BASE_URL", "http://localhost:8002")
# Read from the BOT_SEND_API environment variable (empty string when unset).
# NOTE(review): its consumer is not visible in this part of the file.
BOT_SEND_API = os.getenv("BOT_SEND_API", "")
# ── 스키마 ────────────────────────────────────────────────────────────────────
class MessengerEvent(BaseModel):
    """Outbound ITSM → messenger event.

    ``event`` selects the message template in ``_format_event_message``;
    the optional fields are interpolated into that template.
    """
    event: str
    # Target messenger room; defaults to the "ops" room.
    room: str = "ops"
    sr_id: Optional[str] = None
    title: Optional[str] = None
    sr_type: Optional[str] = None
    requested_by: Optional[str] = None
    target_server: Optional[str] = None
    result_summary: Optional[str] = None
    # Additional fields used by the vibe-session events
    session_id: Optional[int] = None
    actor: Optional[str] = None
    workspace: Optional[str] = None
    success: Optional[bool] = None
    summary: Optional[str] = None
class BotCommand(BaseModel):
    """Inbound messenger bot → ITSM command."""
    # Room the command came from; replies are addressed back to it.
    room: str
    # User who issued the command; passed to handlers as the actor.
    user: str
    command: str  # e.g. "!vibe SR-20260525-ABCD" or "!status SR-20260525-ABCD"
    message: Optional[str] = None
class BotReply(BaseModel):
    """Reply returned by the bot endpoints: the text to show and its room."""
    room: str
    text: str
# ── 이벤트 수신 (ITSM outbound) ───────────────────────────────────────────────
@router.post("/webhook")
async def receive_event(event: MessengerEvent, bg: BackgroundTasks):
    """Accept an event emitted inside ITSM and queue its relay to the messenger.

    The relay itself (``_relay_event``) is scheduled as a background task, so
    this handler only logs the event and acknowledges it.
    """
    event_name, sr_ref = event.event, event.sr_id
    logger.info("[Messenger] 이벤트 수신: %s | SR: %s", event_name, sr_ref)
    bg.add_task(_relay_event, event)
    acknowledgement = {"ok": True}
    return acknowledgement
async def _relay_event(event: MessengerEvent):
    """Format ``event`` and POST it to the messenger service webhook.

    Delivery is best-effort: a failed or rejected relay is logged as a
    warning and never raised, so ITSM keeps working while the messenger
    is down.
    """
    msg = _format_event_message(event)
    logger.info("[Messenger → %s] %s", event.room, msg[:100])
    payload = {"room": event.room, "text": msg, "event": event.event}
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            resp = await client.post(f"{MESSENGER_BASE_URL}/api/webhook/itsm", json=payload)
        if resp.status_code >= 400:
            # The messenger answered but refused the event; leave a trace.
            logger.warning(
                "[Messenger] 이벤트 중계 응답 오류: %s | HTTP %s",
                event.event, resp.status_code,
            )
    except Exception as exc:
        # Fail-safe: ITSM must continue even when the messenger is unreachable,
        # but the failure is logged instead of being silently dropped.
        logger.warning("[Messenger] 이벤트 중계 실패: %s | %s", event.event, str(exc)[:100])
def _format_event_message(event: MessengerEvent) -> str:
    """Render ``event`` as the text posted to the messenger room.

    Each known ``event.event`` value has its own template; any other value
    falls back to a one-line ``[<event>] SR: <sr_id>`` message.
    """
    kind = event.event

    if kind == "sr_created":
        return (
            f"[신규 SR] {event.sr_id}\n"
            f"제목: {event.title}\n"
            f"유형: {event.sr_type} | 요청자: {event.requested_by}\n"
            f"대상 서버: {event.target_server or ''}\n"
            f"!vibe {event.sr_id} (바이브 코딩 시작)"
        )

    if kind == "itsm_complete":
        return (
            f"[SR 완료] {event.sr_id}\n"
            f"제목: {event.title}\n"
            f"결과: {event.result_summary or '완료'}"
        )

    if kind == "vibe_started":
        return (
            f"[바이브 세션 시작] Session #{event.session_id}\n"
            f"SR: {event.sr_id or ''} | 담당: {event.actor or ''}\n"
            f"워크스페이스: {event.workspace or ''}\n"
            f"!build {event.session_id} (빌드 준비 완료 시)"
        )

    if kind == "build_result":
        outcome = "성공" if event.success else "실패"
        text = (
            f"[빌드 {outcome}] Session #{event.session_id}\n"
            f"SR: {event.sr_id or ''}\n"
            f"{event.summary or ''}\n"
        )
        # The deploy hint is only appended after a successful build.
        if event.success:
            text += f"!deploy {event.session_id} (배포 진행)"
        return text

    if kind == "deploy_result":
        outcome = "성공" if event.success else "실패"
        return (
            f"[배포 {outcome}] Session #{event.session_id}\n"
            f"SR: {event.sr_id or ''}\n"
            f"{event.summary or ''}"
        )

    if kind == "scrap_published":
        return (
            f"[스크랩 게시] {event.title or '제목 없음'}\n"
            f"결과 ID: #{event.sr_id or ''}\n"
            f"{event.result_summary or ''}"
        )

    # Unknown event type: generic one-liner.
    return f"[{kind}] SR: {event.sr_id or ''}"
# ── 봇 명령어 처리 (inbound) ──────────────────────────────────────────────────
@router.post("/bot/nl", response_model=BotReply)
async def handle_nl_command(
    cmd: BotCommand,
    bg: BackgroundTasks,
    db: AsyncSession = Depends(get_db),
):
    """Endpoint dedicated to natural-language commands.

    The text is parsed into an explicit bot command and then executed through
    ``handle_bot_command``. When the parser yields no command, or its
    confidence is below 0.45, a "not understood" reply is returned instead.
    Between 0.45 and 0.75 the interpretation is prepended to the reply.
    """
    from core.nl_command import parse_nl_command

    parsed = await parse_nl_command(cmd.command.strip())

    if not parsed.get("full_command") or parsed.get("confidence", 0) < 0.45:
        not_understood = (
            f"❓ 요청을 이해하지 못했습니다.\n"
            f"자연어 예시: '서버1 헬스체크 해줘', 'SR-2026-XXXX 배포해줘'\n"
            f"!help 로 명령어 목록을 확인하세요."
        )
        return BotReply(room=cmd.room, text=not_understood)

    # Re-wrap the parsed command as a BotCommand so the regular handler runs it.
    translated = BotCommand(
        room=cmd.room,
        user=cmd.user,
        command=parsed["full_command"],
        message=f"[자연어→{parsed['command']}] {cmd.command}",
    )
    reply = await handle_bot_command(translated, bg, db)

    # Low confidence: show the user how the sentence was interpreted.
    if parsed.get("confidence", 1.0) < 0.75:
        interpretation = (
            f"💬 자연어 해석: {parsed.get('explanation', '')}\n"
            f"명령어: {parsed['full_command']}\n\n"
        )
        reply.text = interpretation + reply.text
    return reply
@router.post("/bot/command", response_model=BotReply)
async def handle_bot_command(
    cmd: BotCommand,
    bg: BackgroundTasks,
    db: AsyncSession = Depends(get_db),
):
    """Dispatch a command sent by the GUARDiA messenger bot.

    The first whitespace-separated token of ``cmd.command`` (lower-cased) is
    the keyword. Explicit ``!``/``/`` commands are handled here; anything
    else falls back to ``_handle_natural_language``.

    Long-running work is queued on ``bg`` and an immediate "started" reply is
    returned; quick lookups are awaited and their text is returned directly.

    Returns:
        BotReply addressed to ``cmd.room``.
    """
    text = cmd.command.strip()
    parts = text.split()
    if not parts:
        return BotReply(room=cmd.room, text="명령어를 입력해주세요. !help")
    keyword = parts[0].lower()

    # ── !help / /help ────────────────────────────────────────────────────────
    if keyword in ("!help", "/help"):
        return BotReply(room=cmd.room, text=_help_text())

    # ── !vibe <sr_id> [project_id] ───────────────────────────────────────────
    elif keyword == "!vibe":
        if len(parts) < 2:
            return BotReply(room=cmd.room, text="사용법: !vibe <sr_id> [project_id]")
        sr_id = parts[1]
        project_id = None
        if len(parts) >= 3:
            # A non-numeric project_id must produce a reply, not an unhandled
            # ValueError.
            try:
                project_id = int(parts[2])
            except ValueError:
                return BotReply(room=cmd.room, text="project_id는 숫자여야 합니다.")
        bg.add_task(_cmd_start_vibe, cmd.room, cmd.user, sr_id, project_id)
        return BotReply(room=cmd.room,
                        text=f"[바이브 세션] SR {sr_id} 에 대한 코딩 세션을 시작합니다...")

    # ── !build <session_id> ──────────────────────────────────────────────────
    elif keyword == "!build":
        if len(parts) < 2:
            return BotReply(room=cmd.room, text="사용법: !build <session_id>")
        try:
            sid = int(parts[1])
        except ValueError:
            return BotReply(room=cmd.room, text="세션 ID는 숫자여야 합니다.")
        bg.add_task(_cmd_build, cmd.room, cmd.user, sid)
        return BotReply(room=cmd.room, text=f"[빌드] Session #{sid} 빌드를 시작합니다...")

    # ── !deploy <session_id> ─────────────────────────────────────────────────
    elif keyword == "!deploy":
        if len(parts) < 2:
            return BotReply(room=cmd.room, text="사용법: !deploy <session_id>")
        try:
            sid = int(parts[1])
        except ValueError:
            return BotReply(room=cmd.room, text="세션 ID는 숫자여야 합니다.")
        bg.add_task(_cmd_deploy, cmd.room, cmd.user, sid)
        return BotReply(room=cmd.room, text=f"[배포] Session #{sid} 배포 파이프라인을 시작합니다...")

    # ── !status <sr_id> ──────────────────────────────────────────────────────
    elif keyword == "!status":
        if len(parts) < 2:
            return BotReply(room=cmd.room, text="사용법: !status <sr_id>")
        sr_id = parts[1]
        reply = await _cmd_status(sr_id, db)
        return BotReply(room=cmd.room, text=reply)

    # ── !cancel <session_id> ─────────────────────────────────────────────────
    elif keyword == "!cancel":
        if len(parts) < 2:
            return BotReply(room=cmd.room, text="사용법: !cancel <session_id>")
        try:
            sid = int(parts[1])
        except ValueError:
            return BotReply(room=cmd.room, text="세션 ID는 숫자여야 합니다.")
        reply = await _cmd_cancel(sid, db)
        return BotReply(room=cmd.room, text=reply)

    # ── !sm <server> <script> ────────────────────────────────────────────────
    elif keyword == "!sm":
        if len(parts) < 3:
            return BotReply(room=cmd.room, text="사용법: !sm <server명> <script키>")
        server, script_key = parts[1], parts[2]
        bg.add_task(_cmd_sm, cmd.room, cmd.user, server, script_key, None)
        return BotReply(room=cmd.room,
                        text=f"[SM 점검] {server} 서버에 {script_key} 스크립트를 실행합니다...")

    # ── !health <server> ─────────────────────────────────────────────────────
    elif keyword == "!health":
        if len(parts) < 2:
            return BotReply(room=cmd.room, text="사용법: !health <server명>")
        server = parts[1]
        bg.add_task(_cmd_sm, cmd.room, cmd.user, server, "system_health", None)
        return BotReply(room=cmd.room,
                        text=f"[헬스체크] {server} 서버 시스템 점검을 시작합니다...")

    # ── !log <server> [logfile] ──────────────────────────────────────────────
    elif keyword == "!log":
        if len(parts) < 2:
            return BotReply(room=cmd.room, text="사용법: !log <server명> [로그파일경로]")
        server = parts[1]
        env_vars = {}
        if len(parts) >= 3:
            env_vars["LOG_FILES"] = parts[2]
        bg.add_task(_cmd_sm, cmd.room, cmd.user, server, "log_analysis", env_vars)
        return BotReply(room=cmd.room,
                        text=f"[로그 분석] {server} 서버 로그 분석을 시작합니다...")

    # ── /sr <title> (slash-style SR intake) ──────────────────────────────────
    elif keyword in ("/sr", "!sr"):
        title = " ".join(parts[1:]) if len(parts) >= 2 else ""
        if not title:
            return BotReply(room=cmd.room, text="사용법: /sr <SR 제목>")
        bg.add_task(_cmd_create_sr, cmd.room, cmd.user, title)
        return BotReply(room=cmd.room, text=f"[SR 접수] '{title}' 처리 중...")

    # ── /status (slash-style system overview) ────────────────────────────────
    elif keyword == "/status":
        reply = await _cmd_dashboard_status()
        return BotReply(room=cmd.room, text=reply)

    # ── /license (slash-style license lookup) ────────────────────────────────
    elif keyword == "/license":
        reply = await _cmd_license_status(db)
        return BotReply(room=cmd.room, text=reply)

    # ── /bulk <action> <sr_ids...> ───────────────────────────────────────────
    elif keyword in ("/bulk", "!bulk"):
        if len(parts) < 3:
            return BotReply(room=cmd.room, text="사용법: /bulk <action> <SR-ID1> [SR-ID2 ...]\naction: close|assign|status")
        action = parts[1].upper()
        sr_ids = parts[2:]
        bg.add_task(_cmd_bulk_action, cmd.room, cmd.user, action, sr_ids)
        return BotReply(room=cmd.room, text=f"[대량처리] {len(sr_ids)}{action} 처리 중...")

    # ── /report <project_code> [daily|weekly|monthly] ────────────────────────
    elif keyword in ("/report", "!report"):
        if len(parts) < 2:
            return BotReply(room=cmd.room, text="사용법: /report <프로젝트코드> [daily|weekly|monthly]")
        proj_code = parts[1]
        rtype = parts[2].lower() if len(parts) >= 3 else "weekly"
        if rtype not in ("daily", "weekly", "monthly"):
            return BotReply(room=cmd.room, text="보고서 유형: daily | weekly | monthly")
        # NOTE(review): `db` is the request-scoped session from Depends(get_db)
        # and is handed to a background task here. Depending on the FastAPI
        # version the dependency may already be finalized when the task runs;
        # confirm _cmd_project_report tolerates that or opens its own
        # SessionLocal() like _cmd_start_vibe does.
        bg.add_task(_cmd_project_report, cmd.room, cmd.user, proj_code, rtype, db)
        rtype_ko = {"daily": "일일", "weekly": "주간", "monthly": "월간"}.get(rtype, rtype)
        return BotReply(room=cmd.room, text=f"[{rtype_ko}보고서] {proj_code} 보고서 생성 중...")

    # ── /pms <project_code> ─── project status summary ───────────────────────
    elif keyword in ("/pms", "!pms"):
        if len(parts) < 2:
            return BotReply(room=cmd.room, text="사용법: /pms <프로젝트코드>")
        proj_code = parts[1]
        reply = await _cmd_pms_status(proj_code, db)
        return BotReply(room=cmd.room, text=reply)

    # ── /deliverables <project_code> ─── deliverables status ─────────────────
    elif keyword in ("/deliverables", "/산출물", "!deliverables"):
        if len(parts) < 2:
            return BotReply(room=cmd.room, text="사용법: /deliverables <프로젝트코드>")
        proj_code = parts[1]
        reply = await _cmd_deliverables(proj_code, db)
        return BotReply(room=cmd.room, text=reply)

    # ── /scan ─── secure-coding / security scan ──────────────────────────────
    elif keyword in ("/scan", "!scan"):
        bg.add_task(_cmd_compliance_scan, cmd.room, cmd.user)
        return BotReply(room=cmd.room, text="[보안 스캔] 시큐어코딩/웹접근성/개인정보 점검 시작...")

    # ── /checklist ─── public-sector checklist status ────────────────────────
    elif keyword in ("/checklist", "!checklist"):
        reply = await _cmd_checklist_status()
        return BotReply(room=cmd.room, text=reply)

    # ── /perf [url] ─── performance test ─────────────────────────────────────
    elif keyword in ("/perf", "!perf"):
        target = parts[1] if len(parts) >= 2 else "http://localhost:8001"
        bg.add_task(_cmd_perf_test, cmd.room, cmd.user, target)
        return BotReply(room=cmd.room, text=f"[성능 테스트] {target} 대상 10명/30초 부하 테스트 시작...")

    # ── /issues <project_code> ─── open issues ───────────────────────────────
    elif keyword in ("/issues", "!issues"):
        if len(parts) < 2:
            return BotReply(room=cmd.room, text="사용법: /issues <프로젝트코드>")
        proj_code = parts[1]
        reply = await _cmd_open_issues(proj_code, db)
        return BotReply(room=cmd.room, text=reply)

    # ── /oncall ─── who is on duty now ───────────────────────────────────────
    elif keyword in ("/oncall", "!oncall"):
        reply = await _cmd_oncall()
        return BotReply(room=cmd.room, text=reply)

    # ── !scrap ─── web scraping bot ──────────────────────────────────────────
    elif keyword in ("!scrap", "/scrap"):
        if len(parts) < 2:
            return BotReply(room=cmd.room, text=(
                "사용법:\n"
                " !scrap <url> → 즉시 스크랩\n"
                " !scrap list [n] → 최근 n개 결과\n"
                " !scrap publish <id> → 게시 + 메신저 알림\n"
                " !scrap del <id> → 삭제\n"
                " !scrap restore <id> → 원복\n"
                " !scrap status <id> → 상세 조회"
            ))
        sub = parts[1].lower()
        if sub == "list":
            n = int(parts[2]) if len(parts) >= 3 and parts[2].isdigit() else 5
            reply = await _cmd_scrap_list(n)
            return BotReply(room=cmd.room, text=reply)
        elif sub == "publish":
            if len(parts) < 3 or not parts[2].isdigit():
                return BotReply(room=cmd.room, text="사용법: !scrap publish <id>")
            bg.add_task(_cmd_scrap_publish, cmd.room, cmd.user, int(parts[2]))
            return BotReply(room=cmd.room, text=f"[스크랩 게시] #{parts[2]} 게시 처리 중...")
        elif sub in ("del", "delete"):
            if len(parts) < 3 or not parts[2].isdigit():
                return BotReply(room=cmd.room, text="사용법: !scrap del <id>")
            reply = await _cmd_scrap_delete(int(parts[2]))
            return BotReply(room=cmd.room, text=reply)
        elif sub == "restore":
            if len(parts) < 3 or not parts[2].isdigit():
                return BotReply(room=cmd.room, text="사용법: !scrap restore <id>")
            reply = await _cmd_scrap_restore(int(parts[2]))
            return BotReply(room=cmd.room, text=reply)
        elif sub == "status":
            if len(parts) < 3 or not parts[2].isdigit():
                return BotReply(room=cmd.room, text="사용법: !scrap status <id>")
            reply = await _cmd_scrap_status(int(parts[2]))
            return BotReply(room=cmd.room, text=reply)
        else:
            # "!scrap <url>" form: anything that is not a sub-command is a URL.
            url = parts[1]
            bg.add_task(_cmd_scrap_url, cmd.room, cmd.user, url)
            return BotReply(room=cmd.room, text=f"[스크랩] {url} 수집 중...")

    # ── /incident <title> [P1|P2|P3|P4] ─── quick incident registration ──────
    elif keyword in ("/incident", "!incident", "/inc"):
        if len(parts) < 2:
            return BotReply(room=cmd.room, text="사용법: /incident <제목> [P1|P2|P3|P4]")
        grades = ("P1", "P2", "P3", "P4")
        last_token = parts[-1].upper()
        explicit_grade = last_token in grades
        grade = last_token if explicit_grade else "P3"
        # Strip the trailing token from the title only when it really was a
        # grade; testing the defaulted `grade` instead would drop the last
        # title word of every grade-less command.
        title_parts = parts[1:-1] if explicit_grade and len(parts) > 2 else parts[1:]
        title = " ".join(title_parts)
        bg.add_task(_cmd_create_incident, cmd.room, cmd.user, title, grade)
        return BotReply(room=cmd.room, text=f"[{grade} 인시던트] '{title}' 등록 중...")

    # ── /rca <incident_id> ─── AI automatic RCA ──────────────────────────────
    elif keyword in ("/rca", "!rca"):
        if len(parts) < 2:
            return BotReply(room=cmd.room, text="사용법: /rca <INC-ID>")
        inc_id = parts[1]
        bg.add_task(_cmd_auto_rca, cmd.room, cmd.user, inc_id)
        return BotReply(room=cmd.room, text=f"[AI RCA] {inc_id} 자동 근본원인 분석 중...")

    # ── /escalate <SR-ID> ─── SR escalation ──────────────────────────────────
    elif keyword in ("/escalate", "!escalate"):
        if len(parts) < 2:
            return BotReply(room=cmd.room, text="사용법: /escalate <SR-ID>")
        sr_id = parts[1]
        bg.add_task(_cmd_escalate, cmd.room, cmd.user, sr_id)
        return BotReply(room=cmd.room, text=f"[에스컬레이션] {sr_id} 당직자에게 즉시 전달 중...")

    # ── /sla ─── SLA violations ──────────────────────────────────────────────
    elif keyword in ("/sla", "!sla"):
        reply = await _cmd_sla_violations()
        return BotReply(room=cmd.room, text=reply)

    # ── /assign <SR-ID> <assignee> ─── assign an SR ──────────────────────────
    elif keyword in ("/assign", "!assign"):
        if len(parts) < 3:
            return BotReply(room=cmd.room, text="사용법: /assign <SR-ID> <담당자>")
        sr_id = parts[1]
        assignee = parts[2]
        reply = await _cmd_assign_sr(sr_id, assignee, cmd.user, db)
        return BotReply(room=cmd.room, text=reply)

    # ── /approve <SR-ID | action_id> ─── approve an SR or an auto-action ─────
    elif keyword in ("/approve", "!approve"):
        if len(parts) < 2:
            return BotReply(room=cmd.room, text=(
                "사용법: /approve <SR-ID>\n"
                "사용법: /approve <작업ID>\n예) /approve ACT-3F2A1B2C"
            ))
        # One keyword serves both SR approval and auto-action approval, so the
        # target is chosen by ID shape.
        # NOTE(review): the "ACT-" prefix is taken from the usage example;
        # confirm it matches the real action-ID format.
        if parts[1].upper().startswith("ACT-"):
            action_id = parts[1].upper()
            comment = " ".join(parts[2:]) if len(parts) > 2 else None
            reply = await _cmd_approve_action(action_id, cmd.user, comment, db)
            return BotReply(room=cmd.room, text=reply)
        sr_id = parts[1]
        comment = " ".join(parts[2:]) if len(parts) > 2 else "봇 승인"
        reply = await _cmd_approve_sr(sr_id, cmd.user, comment, db)
        return BotReply(room=cmd.room, text=reply)

    # ── /reject <SR-ID | action_id> [reason] ─── reject an SR or auto-action ─
    elif keyword in ("/reject", "!reject"):
        if len(parts) < 2:
            return BotReply(room=cmd.room, text=(
                "사용법: /reject <SR-ID> [사유]\n"
                "사용법: /reject <작업ID> [사유]\n예) /reject ACT-3F2A1B2C 시간 부적절"
            ))
        # Same ID-shape dispatch as /approve.
        if parts[1].upper().startswith("ACT-"):
            action_id = parts[1].upper()
            reason = " ".join(parts[2:]) if len(parts) > 2 else "거부됨"
            reply = await _cmd_reject_action(action_id, cmd.user, reason, db)
            return BotReply(room=cmd.room, text=reply)
        sr_id = parts[1]
        reason = " ".join(parts[2:]) if len(parts) > 2 else "봇 반려"
        reply = await _cmd_reject_sr(sr_id, cmd.user, reason, db)
        return BotReply(room=cmd.room, text=reply)

    # ── /kb <query> ─── knowledge-base search ────────────────────────────────
    elif keyword in ("/kb", "!kb"):
        if len(parts) < 2:
            return BotReply(room=cmd.room, text="사용법: /kb <검색어>")
        query = " ".join(parts[1:])
        reply = await _cmd_kb_search(query, db)
        return BotReply(room=cmd.room, text=reply)

    # ── /wbs <project_code> ─── WBS delay status ─────────────────────────────
    elif keyword in ("/wbs", "!wbs"):
        if len(parts) < 2:
            return BotReply(room=cmd.room, text="사용법: /wbs <프로젝트코드>")
        proj_code = parts[1]
        reply = await _cmd_wbs_status(proj_code, db)
        return BotReply(room=cmd.room, text=reply)

    # ── /scouter <server> ─── Scouter APM live metrics ───────────────────────
    elif keyword in ("/scouter", "!scouter"):
        if len(parts) < 2:
            return BotReply(room=cmd.room, text="사용법: /scouter <서버명>")
        server_name = parts[1]
        reply = await _cmd_scouter_metrics(server_name)
        return BotReply(room=cmd.room, text=reply)

    # ── /rollback <session_id> ─── emergency deployment rollback ─────────────
    elif keyword in ("/rollback", "!rollback"):
        if len(parts) < 2:
            return BotReply(room=cmd.room, text="사용법: /rollback <세션ID>")
        try:
            sid = int(parts[1])
        except ValueError:
            return BotReply(room=cmd.room, text="세션 ID는 숫자여야 합니다.")
        bg.add_task(_cmd_rollback, cmd.room, cmd.user, sid)
        return BotReply(room=cmd.room, text=f"[긴급 롤백] Session #{sid} 롤백 시작...")

    # ── /notify <message> ─── broadcast to the operations team ───────────────
    elif keyword in ("/notify", "!notify"):
        if len(parts) < 2:
            return BotReply(room=cmd.room, text="사용법: /notify <메시지>")
        message = " ".join(parts[1:])
        bg.add_task(_cmd_broadcast_notify, cmd.room, cmd.user, message)
        return BotReply(room=cmd.room, text=f"[공지] 운영팀 전체에 메시지 발송 중...")

    # ── /topology <server> ─── CI dependency lookup ──────────────────────────
    elif keyword in ("/topology", "!topology"):
        if len(parts) < 2:
            return BotReply(room=cmd.room, text="사용법: /topology <서버명>")
        server_name = parts[1]
        reply = await _cmd_topology(server_name, db)
        return BotReply(room=cmd.room, text=reply)

    # ── /vuln <server> ─── server vulnerability scan ─────────────────────────
    elif keyword in ("/vuln", "!vuln"):
        if len(parts) < 2:
            return BotReply(room=cmd.room, text="사용법: /vuln <서버명|IP>")
        target = parts[1]
        bg.add_task(_cmd_vuln_scan, cmd.room, cmd.user, target)
        return BotReply(room=cmd.room, text=f"[취약점 스캔] {target} 스캔 시작 (약 30초 소요)...")

    # ── /autoq ─── pending-approval queue ────────────────────────────────────
    elif keyword in ("/autoq", "!autoq", "/queue"):
        reply = await _cmd_auto_queue(db)
        return BotReply(room=cmd.room, text=reply)

    # ── /cicd [project] ─── CI/CD overview ───────────────────────────────────
    elif keyword in ("/cicd", "!cicd"):
        project = parts[1] if len(parts) > 1 else None
        reply = await _cmd_cicd_status(project)
        return BotReply(room=cmd.room, text=reply)

    # ── /jenkins <job> [build|status|log] ─── Jenkins control ────────────────
    elif keyword in ("/jenkins", "!jenkins"):
        if len(parts) < 2:
            return BotReply(room=cmd.room,
                            text="사용법: /jenkins <job명> [build|status|log]\n"
                                 "예) /jenkins guardia-itsm build\n"
                                 "예) /jenkins guardia-itsm status")
        job = parts[1]
        action = parts[2].lower() if len(parts) > 2 else "status"
        if action == "build":
            bg.add_task(_cmd_jenkins_trigger, cmd.room, cmd.user, job)
            return BotReply(room=cmd.room, text=f"[Jenkins] {job} 빌드 트리거 요청 중...")
        else:
            reply = await _cmd_jenkins_status(job, action)
            return BotReply(room=cmd.room, text=reply)

    # ── /git <repo> [branch|pr|log] ─── Gitea repository status ──────────────
    elif keyword in ("/git", "!git"):
        if len(parts) < 2:
            return BotReply(room=cmd.room,
                            text="사용법: /git <저장소> [branch|pr|log]\n"
                                 "예) /git guardia-itsm log\n"
                                 "예) /git zioinfo-web pr")
        repo = parts[1]
        action = parts[2].lower() if len(parts) > 2 else "log"
        reply = await _cmd_gitea_status(repo, action)
        return BotReply(room=cmd.room, text=reply)

    # ── /design ─── design renewal bot ───────────────────────────────────────
    elif keyword in ("/design", "!design"):
        sub = parts[1].lower() if len(parts) >= 2 else "help"
        design_cmds = {
            "capture": "Playwright MCP로 현재 UI Before 스크린샷 캡처",
            "tokens": "통합 디자인 토큰(tokens.css) 생성 → 4개 시스템 적용",
            "qa": "Before/After 시각적 QA + 반응형 검증",
            "homepage": "홈페이지 컴포넌트 리팩토링 시작",
            "itsm": "ITSM UI 현대화 시작",
            "manager": "Manager 디자인 개편 시작",
            "app": "Messenger 앱 디자인 개편 시작",
        }
        if sub == "variant":
            query = " ".join(parts[2:]) if len(parts) >= 3 else "enterprise dashboard"
            return BotReply(room=cmd.room,
                            text=f"[디자인 봇] Variant 탐색: '{query}'\n"
                                 f"→ playwright-visual-capture 스킬로 variant.com/community 탐색")
        elif sub == "ab":
            comp = parts[2] if len(parts) >= 3 else "button"
            return BotReply(room=cmd.room,
                            text=f"[디자인 봇] {comp} A/B 테스트 컴포넌트 → component-refactor 스킬 실행")
        elif sub in design_cmds:
            return BotReply(room=cmd.room,
                            text=f"[디자인 봇] {design_cmds[sub]}\n"
                                 f"→ ui-overhaul-orchestrator 스킬 Phase: {sub}")
        else:
            return BotReply(room=cmd.room, text=(
                "[디자인 리뉴얼 봇] 명령어 목록\n"
                "━━━━━━━━━━━━━━━━━━━━━\n"
                "/design capture → 현재 UI Before 스크린샷\n"
                "/design variant <검색어> → Variant 디자인 탐색\n"
                "/design tokens → 통합 디자인 토큰 생성\n"
                "/design homepage → 홈페이지 컴포넌트 개편\n"
                "/design itsm → ITSM UI 현대화\n"
                "/design manager → Manager 디자인 개편\n"
                "/design app → Messenger 앱 개편\n"
                "/design qa → Before/After 시각적 QA\n"
                "/design ab <컴포넌트> → A/B 테스트 버전 생성"
            ))

    # ── /release <project> [version] ─── trigger a release deployment ────────
    elif keyword in ("/release", "!release"):
        if len(parts) < 2:
            return BotReply(room=cmd.room,
                            text="사용법: /release <프로젝트명> [버전]\n"
                                 "예) /release guardia-itsm v2.1.0")
        project = parts[1]
        version = parts[2] if len(parts) > 2 else "latest"
        bg.add_task(_cmd_release, cmd.room, cmd.user, project, version)
        return BotReply(room=cmd.room,
                        text=f"[릴리즈] {project} {version} 배포 파이프라인 시작...")

    else:
        # ── Natural-language fallback ────────────────────────────────────────
        # Not an explicit command: try NL → command parsing.
        return await _handle_natural_language(cmd, bg, db)
# ── 백그라운드 명령 실행 헬퍼 ────────────────────────────────────────────────
async def _cmd_start_vibe(room: str, actor: str, sr_id: str,
                          project_id: Optional[int]):
    """Create a vibe coding session for ``sr_id`` and report it to ``room``.

    Opens its own database session, checks that the SR exists, inserts a
    ``VibeSession`` in CODING status and posts the new session id. Any
    failure is logged and reported to the room.
    """
    try:
        async with SessionLocal() as db:
            # The SR must exist before a session can be attached to it.
            sr_lookup = select(SRRequest).where(SRRequest.sr_id == sr_id)
            sr = (await db.execute(sr_lookup)).scalars().first()
            if not sr:
                await _send_to_room(room, f"SR을 찾을 수 없습니다: {sr_id}")
                return

            session_row = VibeSession(
                sr_id=sr_id,
                project_id=project_id,
                started_by=actor,
                status=VibeSessionStatus.CODING,
            )
            db.add(session_row)
            await db.commit()
            await db.refresh(session_row)

            announcement = (
                f"[바이브 세션 생성됨]\n"
                f"세션 ID: #{session_row.id}\n"
                f"SR: {sr_id}\n"
                f"담당: {actor}\n"
                f"코딩 완료 후: !build {session_row.id}"
            )
            await _send_to_room(room, announcement)
    except Exception as e:
        detail = str(e)[:100]
        logger.error("[Bot !vibe] 오류: %s", detail)
        await _send_to_room(room, f"세션 생성 오류: {detail}")
async def _cmd_build(room: str, actor: str, session_id: int):
    """Call the vibe build API for ``session_id`` and post the outcome to ``room``.

    On HTTP 200 the reported status and the last 200 characters of the build
    log are posted (plus a deploy hint when the status is "TESTING"); any
    other status code or exception is posted as a build error.
    """
    import httpx
    try:
        async with httpx.AsyncClient(timeout=30) as client:
            r = await client.post(
                f"http://localhost:8000/api/vibe/{session_id}/build",
                json={},
                headers={"Authorization": f"Bearer {_get_internal_token()}"},
            )

        if r.status_code != 200:
            await _send_to_room(room, f"[빌드 오류] {r.status_code}: {r.text[:100]}")
            return

        data = r.json()
        status = data.get("status", "?")
        log_tail = (data.get("build_log") or "")[-200:]
        deploy_hint = f"\n배포 진행: !deploy {session_id}" if status == "TESTING" else ""
        await _send_to_room(
            room,
            f"[빌드 완료] Session #{session_id}{status}\n{log_tail}" + deploy_hint,
        )
    except Exception as e:
        await _send_to_room(room, f"[빌드 오류] {str(e)[:100]}")
async def _cmd_deploy(room: str, actor: str, session_id: int):
    """Call the vibe deploy API for ``session_id`` and post the outcome to ``room``.

    On HTTP 200 the message is labelled complete when the reported status is
    "COMPLETED" and failed otherwise, followed by the last 200 characters of
    the deploy log. Other status codes and exceptions are posted as errors.
    """
    import httpx
    try:
        async with httpx.AsyncClient(timeout=120) as client:
            r = await client.post(
                f"http://localhost:8000/api/vibe/{session_id}/deploy",
                json={"skip_test": False},
                headers={"Authorization": f"Bearer {_get_internal_token()}"},
            )

        if r.status_code != 200:
            await _send_to_room(room, f"[배포 오류] {r.status_code}: {r.text[:100]}")
            return

        data = r.json()
        status = data.get("status", "?")
        log_tail = (data.get("deploy_log") or "")[-200:]
        await _send_to_room(
            room,
            f"[배포 {'완료' if status == 'COMPLETED' else '실패'}] "
            f"Session #{session_id}{status}\n{log_tail}"
        )
    except Exception as e:
        await _send_to_room(room, f"[배포 오류] {str(e)[:100]}")
async def _cmd_status(sr_id: str, db: AsyncSession) -> str:
    """Return a text summary of an SR and its most recent vibe session.

    Returns a "not found" message when the SR does not exist and a short
    error message when the lookup raises.
    """
    try:
        sr_result = await db.execute(
            select(SRRequest).where(SRRequest.sr_id == sr_id)
        )
        sr = sr_result.scalars().first()
        if not sr:
            return f"SR을 찾을 수 없습니다: {sr_id}"

        report = []
        report.append(f"[{sr_id}] {sr.title}")
        report.append(f"상태: {sr.status} | 우선순위: {sr.priority}")
        report.append(
            f"담당: {sr.assigned_to or '미배정'} | 서버: {sr.target_server or ''}"
        )

        # Latest vibe session for this SR, newest first.
        latest_session_query = (
            select(VibeSession)
            .where(VibeSession.sr_id == sr_id)
            .order_by(VibeSession.started_at.desc())
            .limit(1)
        )
        vs = (await db.execute(latest_session_query)).scalars().first()
        if vs:
            report.append(f"바이브 세션: #{vs.id}{vs.status}")
            if vs.claude_session_id:
                report.append(f"Claude 세션 ID: {vs.claude_session_id}")
        return "\n".join(report)
    except Exception as e:
        return f"조회 오류: {str(e)[:100]}"
async def _cmd_cancel(session_id: int, db: AsyncSession) -> str:
    """Mark a vibe session as CANCELLED and return the reply text.

    Sessions already in COMPLETED or FAILED status are left untouched and
    reported as finished; a missing session or an exception yields an
    explanatory message.
    """
    finished_states = (VibeSessionStatus.COMPLETED, VibeSessionStatus.FAILED)
    try:
        lookup = select(VibeSession).where(VibeSession.id == session_id)
        vs = (await db.execute(lookup)).scalars().first()
        if not vs:
            return f"세션을 찾을 수 없습니다: #{session_id}"
        if vs.status in finished_states:
            return f"이미 종료된 세션입니다: #{session_id} ({vs.status})"
        vs.status = VibeSessionStatus.CANCELLED
        await db.commit()
        return f"[취소됨] 세션 #{session_id}"
    except Exception as e:
        return f"취소 오류: {str(e)[:100]}"
async def _cmd_sm(room: str, actor: str, server: str,
                  script_key: str, env_vars: Optional[dict]):
    """Run an SM script on ``server`` and post a summary of the result to ``room``.

    The script is looked up by name among the ``*.sh`` files under
    ``../scripts/sm`` (relative to this module). A file matches when its base
    name, optionally with ``_sm`` and a ``was_`` / ``db_`` part removed,
    equals ``script_key``.

    Args:
        room: Messenger room that receives the result.
        actor: User who requested the run; forwarded to ``exec_script``.
        server: Target server name; forwarded to ``exec_script``.
        script_key: Script name or alias to look up.
        env_vars: Optional environment variables forwarded to ``exec_script``.

    Failures are logged and reported to the room rather than raised, since
    this runs as a background task.
    """
    from core.ssh_exec import exec_script
    import glob
    import os

    scripts_root = os.path.realpath(
        os.path.join(os.path.dirname(__file__), "..", "scripts", "sm")
    )
    try:
        # Locate the script. Candidates are sorted so that the choice is
        # deterministic when several files match the same key.
        found = None
        candidates = sorted(
            glob.glob(os.path.join(scripts_root, "**", "*.sh"), recursive=True)
        )
        for sh in candidates:
            name = os.path.splitext(os.path.basename(sh))[0]
            aliases = {
                name,
                name.replace("_sm", ""),
                name.replace("was_", "").replace("_sm", ""),
                name.replace("db_", "").replace("_sm", ""),
            }
            if script_key in aliases:
                found = sh
                break
        if not found:
            await _send_to_room(room, f"스크립트를 찾을 수 없습니다: {script_key}")
            return

        result = await exec_script(
            server_name=server,
            script_path=found,
            env_vars=env_vars,
            timeout=300,
            actor=actor,
        )
        # Result summary: keep only the last 800 characters of the output.
        output = result.stdout or result.error or "출력 없음"
        summary = output[-800:]
        status_tag = "[OK]" if result.success else "[WARN/CRIT]"
        await _send_to_room(
            room,
            f"[SM 점검 결과] {server} / {script_key} {status_tag}\n"
            f"소요: {result.elapsed}s | exit={result.exit_code}\n"
            f"---\n{summary}"
        )
    except Exception as e:
        # Same contract as the other background helpers: tell the room.
        logger.error("[Bot !sm] 오류: %s", str(e)[:100])
        await _send_to_room(room, f"[SM 점검 오류] {str(e)[:100]}")
# ══════════════════════════════════════════════════════════════════════════════
# 신규 봇 명령어 헬퍼 함수 (14개)
# ══════════════════════════════════════════════════════════════════════════════
async def _cmd_oncall() -> str:
    """/oncall — return the current on-duty engineers as text.

    Queries the on-duty API and lists up to three entries (name plus phone
    when present). A non-200 response or an exception yields an error text.
    """
    import httpx as _h
    try:
        async with _h.AsyncClient(timeout=10.0) as c:
            r = await c.get("http://localhost:8001/api/oncall/on-duty",
                            headers={"Authorization": f"Bearer {_get_internal_token()}"})
        if r.status_code != 200:
            return f"당직자 조회 실패 ({r.status_code})"

        d = r.json()
        if not d:
            return "[당직자] 오늘 등록된 당직자가 없습니다."

        # The API may answer with a single object or a list; treat both alike.
        roster = d if isinstance(d, list) else [d]
        lines = ["[현재 당직자]"]
        for entry in roster[:3]:
            eng = entry.get("engineer") or entry.get("username") or ""
            phone = entry.get("phone") or entry.get("contact") or ""
            contact_suffix = f" ({phone})" if phone else ""
            lines.append(f" {eng}" + contact_suffix)
        return "\n".join(lines)
    except Exception as e:
        return f"당직자 조회 오류: {str(e)[:80]}"
async def _cmd_create_incident(room: str, actor: str, title: str, grade: str):
    """/incident — register an incident and post the result to ``room``.

    POSTs the incident to the incidents API. HTTP 201 is treated as success
    and the returned incident id is announced; any other status code or an
    exception is reported as a failure.
    """
    import httpx as _h
    grade_icons = {"P1": "🚨", "P2": "🔴", "P3": "🟠", "P4": "🟡"}
    try:
        async with _h.AsyncClient(timeout=10.0) as c:
            r = await c.post(
                "http://localhost:8001/api/incidents",
                headers={"Authorization": f"Bearer {_get_internal_token()}",
                         "Content-Type": "application/json"},
                json={"title": title, "grade": grade,
                      "description": f"봇 등록 — 담당: {actor}",
                      "reported_by": actor},
            )
        if r.status_code != 201:
            await _send_to_room(room, f"[인시던트 등록 실패] {r.status_code}: {r.text[:100]}")
            return

        d = r.json()
        inc_id = d.get("incident_id", "?")
        grade_emoji = grade_icons.get(grade, "⚠️")
        msg = (f"{grade_emoji} [{grade}] 인시던트 등록 완료\n"
               f"번호: {inc_id}\n제목: {title}\n담당: {actor}")
        await _send_to_room(room, msg)
    except Exception as e:
        await _send_to_room(room, f"[인시던트 오류] {str(e)[:80]}")
async def _cmd_auto_rca(room: str, actor: str, inc_id: str):
    """/rca — run the AI root-cause analysis and post the result to ``room``.

    The incident auto-RCA endpoint is tried first. When it does not answer
    200, the same id is tried against the problem auto-RCA endpoint. If both
    fail, the room is asked to check the id.
    """
    import httpx as _h
    try:
        async with _h.AsyncClient(timeout=120.0) as c:
            r = await c.post(f"http://localhost:8001/api/incidents/{inc_id}/auto-rca",
                             headers={"Authorization": f"Bearer {_get_internal_token()}"})
        if r.status_code == 200:
            rca = r.json().get("rca", {})
            msg = (f"[AI RCA 완료] {inc_id}\n"
                   f"근본원인: {rca.get('root_cause','')[:100]}\n"
                   f"신뢰도: {int(rca.get('confidence',0)*100)}%\n"
                   f"재발방지: {', '.join(rca.get('prevention',[])[:2])}")
            await _send_to_room(room, msg)
            return

        # Not an incident (or it failed): retry as a problem RCA.
        async with _h.AsyncClient(timeout=120.0) as c:
            r2 = await c.post(f"http://localhost:8001/api/problem/{inc_id}/auto-rca",
                              headers={"Authorization": f"Bearer {_get_internal_token()}"})
        if r2.status_code != 200:
            await _send_to_room(room, f"[RCA 실패] {inc_id} — 인시던트/Problem ID를 확인하세요")
            return

        rca = r2.json().get("rca", {})
        msg = (f"[AI RCA 완료] {inc_id}\n"
               f"근본원인: {rca.get('root_cause','')[:100]}\n"
               f"신뢰도: {int(rca.get('confidence',0)*100)}%")
        await _send_to_room(room, msg)
    except Exception as e:
        await _send_to_room(room, f"[RCA 오류] {str(e)[:80]}")
async def _cmd_escalate(room: str, actor: str, sr_id: str):
    """/escalate — escalate an SR to the on-duty engineer and notify ``room``.

    Looks up the on-duty engineer, then calls the escalation API with that
    name. The room is told whether the API answered 200/201 or not; an
    exception is reported as an escalation error.
    """
    import httpx as _h
    try:
        auth = f"Bearer {_get_internal_token()}"

        # Who is on duty right now?
        async with _h.AsyncClient(timeout=10.0) as c:
            oc_r = await c.get("http://localhost:8001/api/oncall/on-duty",
                               headers={"Authorization": auth})
        duty = oc_r.json() if oc_r.status_code == 200 else {}
        if isinstance(duty, dict):
            oncall_eng = duty.get("engineer", "")
        elif isinstance(duty, list) and duty:
            oncall_eng = duty[0].get("engineer", "")
        else:
            oncall_eng = ""

        # Call the SLA escalation API.
        async with _h.AsyncClient(timeout=10.0) as c:
            r = await c.post(
                "http://localhost:8001/api/oncall/escalate",
                headers={"Authorization": f"Bearer {_get_internal_token()}",
                         "Content-Type": "application/json"},
                json={"sr_id": sr_id, "escalate_to": oncall_eng,
                      "reason": f"봇 에스컬레이션 — 요청자: {actor}"},
            )

        if r.status_code not in (200, 201):
            msg = f"[에스컬레이션] {sr_id} → 당직자({oncall_eng or '미정'})에게 전달됨 (SR 상태 확인 필요)"
        else:
            msg = (f"[에스컬레이션 완료] {sr_id}\n"
                   f"담당자: {oncall_eng or '당직자'}\n"
                   f"요청자: {actor}")
        await _send_to_room(room, msg)
    except Exception as e:
        await _send_to_room(room, f"[에스컬레이션 오류] {str(e)[:80]}")
async def _cmd_sla_violations() -> str:
    """Handle ``/sla``: summarise the SRs currently violating their SLA.

    Returns:
        A multi-line report listing at most five violating SRs plus a count of
        the remainder, a "no violations" notice for an empty result, or a
        failure/error message when the lookup does not succeed.
    """
    import httpx as _hx

    try:
        async with _hx.AsyncClient(timeout=10.0) as api:
            resp = await api.get(
                "http://localhost:8001/api/tasks/sla/violations",
                headers={"Authorization": f"Bearer {_get_internal_token()}"},
            )
        if resp.status_code != 200:
            return f"SLA 조회 실패 ({resp.status_code})"

        violations = resp.json()
        if not violations:
            return "[SLA] 현재 위반 중인 SR이 없습니다. ✅"

        report = [f"[SLA 위반] {len(violations)}"]
        for entry in violations[:5]:
            # Split the overdue minutes into whole hours and leftover minutes.
            hours, minutes = divmod(entry.get("overdue_minutes", 0), 60)
            report.append(f" {entry.get('sr_id','?')} [{entry.get('priority','?')}] "
                          f"{entry.get('title','')[:25]}{hours}시간{minutes}분 초과")
        remainder = len(violations) - 5
        if remainder > 0:
            report.append(f" ... 외 {remainder}")
        return "\n".join(report)
    except Exception as e:
        return f"SLA 조회 오류: {str(e)[:80]}"
async def _cmd_assign_sr(sr_id: str, assignee: str, actor: str, db) -> str:
    """Handle ``/assign``: assign an SR to an engineer.

    Calls the assign endpoint first. If that does not answer 200/201, falls
    back to patching the SR status to ``IN_PROGRESS`` with a comment naming
    the assignee.

    Args:
        sr_id: ID of the service request.
        assignee: Engineer to assign.
        actor: User who issued the command.
        db: Unused; kept for a uniform handler signature.

    Returns:
        A one-line result message for the bot reply.
    """
    import httpx as _hx

    try:
        async with _hx.AsyncClient(timeout=10.0) as assign_client:
            assign_resp = await assign_client.post(
                f"http://localhost:8001/api/assign/{sr_id}",
                headers={"Authorization": f"Bearer {_get_internal_token()}",
                         "Content-Type": "application/json"},
                json={"engineer": assignee, "reason": f"봇 배정 by {actor}"},
            )
        if assign_resp.status_code in (200, 201):
            return f"[배정 완료] {sr_id}{assignee}"

        # Fallback: update the SR status directly.
        async with _hx.AsyncClient(timeout=10.0) as status_client:
            status_resp = await status_client.patch(
                f"http://localhost:8001/api/tasks/{sr_id}/status",
                headers={"Authorization": f"Bearer {_get_internal_token()}",
                         "Content-Type": "application/json"},
                json={"status": "IN_PROGRESS", "actor": actor,
                      "comment": f"담당자 배정: {assignee}"},
            )
        if status_resp.status_code == 200:
            return f"[배정] {sr_id}{assignee} (수동 확인 필요)"
        # The failure message reports the status of the first (assign) call.
        return f"배정 실패 ({assign_resp.status_code})"
    except Exception as e:
        return f"배정 오류: {str(e)[:80]}"
async def _cmd_approve_sr(sr_id: str, actor: str, comment: str, db) -> str:
    """Handle ``/approve`` for an SR: record an ``APPROVED`` decision.

    Args:
        sr_id: ID of the service request.
        actor: User who issued the command; sent as the approver.
        comment: Free-text opinion attached to the approval.
        db: Unused; kept for a uniform handler signature.

    Returns:
        A success message when the approvals endpoint answers 201, otherwise a
        failure or error message.
    """
    import httpx as _hx

    decision = {"approver": actor, "result": "APPROVED",
                "comment": f"[봇승인] {comment}"}
    try:
        async with _hx.AsyncClient(timeout=10.0) as api:
            resp = await api.post(
                f"http://localhost:8001/api/approvals/{sr_id}",
                headers={"Authorization": f"Bearer {_get_internal_token()}",
                         "Content-Type": "application/json"},
                json=decision,
            )
        if resp.status_code != 201:
            return f"승인 실패 ({resp.status_code}) — SR이 승인 대기 상태인지 확인"
        return f"[승인 완료] {sr_id} — 승인자: {actor}"
    except Exception as e:
        return f"승인 오류: {str(e)[:80]}"
async def _cmd_reject_sr(sr_id: str, actor: str, reason: str, db) -> str:
    """Handle ``/reject`` for an SR: record a ``REJECTED`` decision.

    Args:
        sr_id: ID of the service request.
        actor: User who issued the command; sent as the approver.
        reason: Free-text reason attached to the rejection.
        db: Unused; kept for a uniform handler signature.

    Returns:
        A success message when the approvals endpoint answers 201, otherwise a
        failure or error message.
    """
    import httpx as _hx

    decision = {"approver": actor, "result": "REJECTED",
                "comment": f"[봇반려] {reason}"}
    try:
        async with _hx.AsyncClient(timeout=10.0) as api:
            resp = await api.post(
                f"http://localhost:8001/api/approvals/{sr_id}",
                headers={"Authorization": f"Bearer {_get_internal_token()}",
                         "Content-Type": "application/json"},
                json=decision,
            )
        if resp.status_code != 201:
            return f"반려 실패 ({resp.status_code})"
        return f"[반려 완료] {sr_id} — 사유: {reason}"
    except Exception as e:
        return f"반려 오류: {str(e)[:80]}"
async def _cmd_kb_search(query: str, db) -> str:
    """Handle ``/kb``: search knowledge-base documents by keyword.

    The response body may be a bare list or a dict wrapping the hits under
    ``items`` (or ``results``); both shapes are accepted.

    Args:
        query: Search keyword.
        db: Unused; kept for a uniform handler signature.

    Returns:
        A header line followed by up to four ``[id] title`` lines, a "no
        results" notice, or a failure/error message.
    """
    import httpx as _hx

    try:
        async with _hx.AsyncClient(timeout=10.0) as api:
            resp = await api.get(
                "http://localhost:8001/api/kb",
                params={"keyword": query, "limit": 5},
                headers={"Authorization": f"Bearer {_get_internal_token()}"},
            )
        if resp.status_code != 200:
            return f"KB 검색 실패 ({resp.status_code})"

        hits = resp.json()
        if isinstance(hits, dict):
            hits = hits.get("items", hits.get("results", []))
        if not hits:
            return f"[KB 검색] '{query}' 결과 없음 — 다른 키워드로 시도해 보세요."

        output = [f"[KB 검색] '{query}'{len(hits)}"]
        for doc in hits[:4]:
            heading = doc.get("title", "")
            ident = doc.get("doc_id", doc.get("id", ""))
            output.append(f" [{ident}] {heading[:45]}")
        return "\n".join(output)
    except Exception as e:
        return f"KB 검색 오류: {str(e)[:80]}"
async def _cmd_wbs_status(proj_code: str, db) -> str:
    """Handle ``/wbs``: report delayed WBS items of an SI project.

    An item counts as delayed when its ``completion_pct`` is below 100, it has
    a non-empty ``planned_end`` that sorts before today's date string, and
    ``is_leaf`` is truthy (treated as true when absent).

    Args:
        proj_code: Project code used to look up the ``SiProject`` row.
        db: Unused; a fresh ``SessionLocal`` session is opened instead.

    Returns:
        A summary with totals and up to five delayed items, or a
        not-found/failure/error message.
    """
    import httpx as _hx
    from datetime import date as _date

    try:
        from models import SiProject
        from sqlalchemy import select as _select

        async with SessionLocal() as session:
            lookup = await session.execute(
                _select(SiProject).where(SiProject.project_code == proj_code)
            )
            project = lookup.scalars().first()
        if not project:
            return f"프로젝트를 찾을 수 없습니다: {proj_code}"

        async with _hx.AsyncClient(timeout=10.0) as api:
            resp = await api.get(
                f"http://localhost:8001/api/si/projects/{project.id}/wbs",
                headers={"Authorization": f"Bearer {_get_internal_token()}"},
            )
        if resp.status_code != 200:
            return f"WBS 조회 실패 ({resp.status_code})"

        items = resp.json()
        today = str(_date.today())

        def _is_delayed(item) -> bool:
            # Checks run in this order so short-circuiting matches the filter.
            if not item.get("completion_pct", 0) < 100:
                return False
            if not item.get("planned_end", ""):
                return False
            if not item.get("planned_end", "") < today:
                return False
            return bool(item.get("is_leaf", True))

        overdue = [item for item in items if _is_delayed(item)]
        report = [f"[WBS 현황] {proj_code}",
                  f"전체: {len(items)}건 | 지연: {len(overdue)}"]
        for item in overdue[:5]:
            report.append(f" {item.get('wbs_code','?')} {item.get('title','')[:30]} "
                          f"({item.get('completion_pct',0)}%)")
        return "\n".join(report)
    except Exception as e:
        return f"WBS 조회 오류: {str(e)[:80]}"
async def _cmd_scouter_metrics(server_name: str) -> str:
    """Handle ``/scouter``: show current Scouter APM metrics for one server.

    Fetches the monitored server list, picks the first entry whose ``objName``
    contains ``server_name`` (case-insensitive), then fetches that object's
    metrics by ``objHash``.

    Args:
        server_name: Full or partial Scouter object name.

    Returns:
        A formatted metrics summary, or a not-found/failure/error message.
    """
    import httpx as _hx

    try:
        async with _hx.AsyncClient(timeout=10.0) as list_client:
            list_resp = await list_client.get(
                "http://localhost:8001/api/scouter/servers",
                headers={"Authorization": f"Bearer {_get_internal_token()}"},
            )
        if list_resp.status_code != 200:
            return "Scouter 서버 목록 조회 실패"

        needle = server_name.lower()
        match = None
        for candidate in list_resp.json().get("servers", []):
            if needle in candidate.get("objName", "").lower():
                match = candidate
                break
        if not match:
            return f"[Scouter] '{server_name}' 서버를 찾을 수 없습니다.\n모니터링 서버 목록을 확인하세요."

        obj_hash = match.get("objHash")
        async with _hx.AsyncClient(timeout=10.0) as metric_client:
            metric_resp = await metric_client.get(
                f"http://localhost:8001/api/scouter/servers/{obj_hash}/metrics",
                headers={"Authorization": f"Bearer {_get_internal_token()}"},
            )
        if metric_resp.status_code != 200:
            return f"메트릭 조회 실패 ({metric_resp.status_code})"

        stats = metric_resp.json().get("metrics", {})
        return (f"[Scouter] {match.get('objName')}\n"
                f"CPU: {stats.get('cpu',0):.1f}% | "
                f"Heap: {stats.get('heap_used',0)}MB/{stats.get('heap_max',0)}MB\n"
                f"TPS: {stats.get('tps',0):.1f} | "
                f"응답: {stats.get('response_time',0):.0f}ms | "
                f"에러: {stats.get('error_rate',0):.1f}%")
    except Exception as e:
        return f"Scouter 조회 오류: {str(e)[:80]}"
async def _cmd_rollback(room: str, actor: str, session_id: int):
    """Handle ``/rollback``: flag a vibe session for emergency rollback.

    Posts status ``FAILED`` with a rollback note to the session's status
    endpoint and tells ``room`` whether the request was accepted (HTTP 200)
    or needs manual follow-up.

    Args:
        room: Messenger room that receives the message.
        actor: User who issued the command.
        session_id: ID of the vibe session.
    """
    import httpx as _hx

    try:
        async with _hx.AsyncClient(timeout=120.0) as api:
            resp = await api.post(
                f"http://localhost:8001/api/vibe/{session_id}/status",
                headers={"Authorization": f"Bearer {_get_internal_token()}",
                         "Content-Type": "application/json"},
                json={"status": "FAILED", "actor": actor,
                      "note": "봇 긴급 롤백 요청"},
            )
        if resp.status_code == 200:
            detail = "롤백 처리 중 — Jenkins 파이프라인을 확인하세요."
        else:
            detail = f"롤백 요청 전송 ({resp.status_code}) — 수동 확인 필요"
        await _send_to_room(room, f"[긴급 롤백] Session #{session_id}\n" + detail)
    except Exception as e:
        await _send_to_room(room, f"[롤백 오류] {str(e)[:80]}")
async def _cmd_broadcast_notify(room: str, actor: str, message: str):
    """Handle ``/notify``: post an announcement to the room and to ``ops``.

    The announcement goes to the originating room first; the ``ops`` room gets
    a copy unless the command was issued there already.

    Args:
        room: Room in which the command was issued.
        actor: User who issued the command; shown in the announcement header.
        message: Announcement body.
    """
    announcement = f"[공지 — {actor}]\n{message}"
    recipients = [room] if room == "ops" else [room, "ops"]
    for recipient in recipients:
        await _send_to_room(recipient, announcement)
async def _cmd_topology(server_name: str, db) -> str:
    """Handle ``/topology``: describe the CI dependency graph around a server.

    Finds the first ``Server`` whose name contains ``server_name``, resolves
    the ``ConfigItem`` linked to it, and requests the depth-2 topology graph
    for that CI.

    The listing shows up to six non-root nodes and reports how many non-root
    nodes were left out. The previous version sliced the first six nodes
    before dropping the root and derived the remainder from ``len(nodes) - 7``,
    so the count could be wrong and a node could go unreported.

    Args:
        server_name: Full or partial server name.
        db: Unused; a fresh ``SessionLocal`` session is opened instead.

    Returns:
        A multi-line topology summary, or a not-found/failure/error message.
    """
    import httpx as _h
    from models import Server, ConfigItem
    from sqlalchemy import select as _sel

    max_listed = 6  # non-root nodes shown before truncating
    try:
        async with SessionLocal() as _db:
            srv = (await _db.execute(
                _sel(Server).where(Server.server_name.contains(server_name))
            )).scalars().first()
            if not srv:
                return f"[토폴로지] 서버 '{server_name}'를 찾을 수 없습니다."
            ci = (await _db.execute(
                _sel(ConfigItem).where(ConfigItem.linked_server_id == srv.id)
            )).scalars().first()
            if not ci:
                return f"[토폴로지] '{server_name}' — CMDB CI가 연결되지 않았습니다."

        async with _h.AsyncClient(timeout=10.0) as c:
            r = await c.get(f"http://localhost:8001/api/topology/graph/{ci.id}",
                            params={"depth": 2},
                            headers={"Authorization": f"Bearer {_get_internal_token()}"})
        if r.status_code != 200:
            return f"토폴로지 조회 실패 ({r.status_code})"

        d = r.json()
        nodes = d.get("nodes", [])
        links = d.get("links", [])
        lines = [f"[토폴로지] {server_name} ({ci.ci_type})",
                 f"연결 노드: {len(nodes)}개 | 관계: {len(links)}"]

        # Drop the root first, then truncate, so the shown and hidden counts
        # always add up to the number of non-root nodes.
        dependents = [n for n in nodes if not n.get("is_root")]
        for n in dependents[:max_listed]:
            lines.append(f" └─ {n.get('name','')} [{n.get('type','')}]")
        hidden = len(dependents) - max_listed
        if hidden > 0:
            lines.append(f" ... 외 {hidden}")

        lines.append(f"상세: http://localhost:8001/api/topology/page?ci={ci.id}")
        return "\n".join(lines)
    except Exception as e:
        return f"토폴로지 오류: {str(e)[:80]}"
async def _cmd_vuln_scan(room: str, actor: str, target: str):
    """Handle ``/vuln``: start a vulnerability scan and post the outcome.

    When the scan endpoint answers 202, the handler waits a fixed 30 seconds
    and fetches the scan result once. If that fetch is not a 200, only the
    ``scan_id`` and the result URL are reported.

    Args:
        room: Messenger room that receives the message.
        actor: User who issued the command (not used in the requests).
        target: Host name or IP sent as the scan ``host``.
    """
    import httpx as _hx

    try:
        async with _hx.AsyncClient(timeout=60.0) as scan_client:
            scan_resp = await scan_client.post(
                "http://localhost:8001/api/vuln/scan",
                headers={"Authorization": f"Bearer {_get_internal_token()}",
                         "Content-Type": "application/json"},
                json={"host": target, "include_llm": False, "timeout": 1.0},
            )
        if scan_resp.status_code != 202:
            summary = f"[취약점 스캔 실패] {target} ({scan_resp.status_code})"
        else:
            scan_id = scan_resp.json().get("scan_id", "?")
            # Single fixed wait before the one result fetch.
            import asyncio as _aio
            await _aio.sleep(30)
            async with _hx.AsyncClient(timeout=10.0) as result_client:
                result_resp = await result_client.get(
                    f"http://localhost:8001/api/vuln/scans/{scan_id}",
                    headers={"Authorization": f"Bearer {_get_internal_token()}"},
                )
            if result_resp.status_code == 200:
                res = result_resp.json()
                summary = (f"[취약점 스캔 완료] {target}\n"
                           f"위험 수준: {res.get('risk_level','?')} | 점수: {res.get('risk_score',0)}\n"
                           f"취약점: {len(res.get('vulnerabilities',[]))}\n"
                           f"열린 포트: {', '.join(str(p) for p in res.get('open_ports',[])[:5])}")
            else:
                summary = f"[취약점 스캔] {target} 완료 — scan_id: {scan_id} (결과 조회: /api/vuln/scans/{scan_id})"
        await _send_to_room(room, summary)
    except Exception as e:
        await _send_to_room(room, f"[스캔 오류] {str(e)[:80]}")
async def _cmd_project_report(room: str, actor: str, proj_code: str, rtype: str, db):
    """Handle ``/report``: have a project report generated and post its summary.

    Resolves the ``SiProject`` by code, calls the project's ``report/send``
    endpoint with ``fmt`` fixed to ``excel``, and forwards the returned
    ``summary`` (or a default completion line) to ``room``.

    Args:
        room: Messenger room that receives the message; also sent in the request.
        actor: User who issued the command (not used in the request).
        proj_code: Project code used to look up the project.
        rtype: Report type; ``daily``/``weekly``/``monthly`` get a Korean label,
            any other value is shown as given.
        db: Unused; a fresh ``SessionLocal`` session is opened instead.
    """
    import httpx as _hx

    labels = {"daily": "일일", "weekly": "주간", "monthly": "월간"}
    rtype_ko = labels.get(rtype, rtype)
    try:
        # Look up the project.
        from models import SiProject
        from sqlalchemy import select as _select

        async with SessionLocal() as session:
            lookup = await session.execute(
                _select(SiProject).where(SiProject.project_code == proj_code)
            )
            project = lookup.scalars().first()
        if not project:
            await _send_to_room(room, f"프로젝트를 찾을 수 없습니다: {proj_code}")
            return

        # Request the report and forward the summary.
        async with _hx.AsyncClient(timeout=30.0) as api:
            resp = await api.post(
                f"http://localhost:8001/api/si/projects/{project.id}/report/send",
                json={"room": room, "report_type": rtype, "fmt": "excel"},
                headers={"Authorization": f"Bearer {_get_internal_token()}"},
            )
        if resp.status_code != 200:
            await _send_to_room(room, f"[{rtype_ko}보고서] 생성 실패 ({resp.status_code})")
            return
        payload = resp.json()
        await _send_to_room(room, payload.get("summary", f"[{rtype_ko}보고서] {proj_code} 완료"))
    except Exception as e:
        await _send_to_room(room, f"[보고서 오류] {str(e)[:100]}")
async def _cmd_pms_status(proj_code: str, db) -> str:
    """Handle ``/pms``: summarise the status of an SI project.

    Args:
        proj_code: Project code used to look up the ``SiProject`` row.
        db: Unused; a fresh ``SessionLocal`` session is opened instead.

    Returns:
        A multi-line status summary (phase, health, progress, budget, WBS,
        issues, and the first upcoming milestone when present), or a
        not-found/failure/error message.
    """
    import httpx as _hx

    try:
        async with SessionLocal() as session:
            from models import SiProject
            from sqlalchemy import select as _select

            lookup = await session.execute(
                _select(SiProject).where(SiProject.project_code == proj_code)
            )
            project = lookup.scalars().first()
        if not project:
            return f"프로젝트를 찾을 수 없습니다: {proj_code}"

        async with _hx.AsyncClient(timeout=10.0) as api:
            resp = await api.get(
                f"http://localhost:8001/api/si/projects/{project.id}/report/status",
                headers={"Authorization": f"Bearer {_get_internal_token()}"},
            )
        if resp.status_code != 200:
            return f"프로젝트 현황 조회 실패 ({resp.status_code})"

        d = resp.json()
        report = [
            f"[PMS 현황] {d.get('project_name','')} ({proj_code})",
            f"단계: {d.get('phase','?')} | 건강: {d.get('health_status','?')}",
            f"진척률: {d.get('overall_progress',0)}% | 예산: {d.get('budget_pct',0)}%",
            f"WBS: {d.get('wbs_done',0)}/{d.get('wbs_total',0)} 완료 | 지연: {d.get('wbs_delayed',0)}",
            f"미결 이슈: {d.get('issue_open',0)}건 | 고위험: {d.get('high_risks',0)}",
        ]
        if d.get("upcoming_milestones"):
            # Only the first listed milestone is shown.
            nxt = d["upcoming_milestones"][0]
            report.append(f"다음 마일스톤: {nxt['name']} ({nxt['target_date']})")
        return "\n".join(report)
    except Exception as e:
        return f"PMS 조회 오류: {str(e)[:100]}"
async def _cmd_deliverables(proj_code: str, db) -> str:
    """Handle ``/deliverables``: report deliverable submission status.

    Args:
        proj_code: Project code used to look up the ``SiProject`` row.
        db: Unused; a fresh ``SessionLocal`` session is opened instead.

    Returns:
        Totals with submit/approval rates, plus up to three overdue items when
        any exist; otherwise a not-found/failure/error message.
    """
    import httpx as _hx

    try:
        async with SessionLocal() as session:
            from models import SiProject
            from sqlalchemy import select as _select

            lookup = await session.execute(
                _select(SiProject).where(SiProject.project_code == proj_code)
            )
            project = lookup.scalars().first()
        if not project:
            return f"프로젝트를 찾을 수 없습니다: {proj_code}"

        async with _hx.AsyncClient(timeout=10.0) as api:
            resp = await api.get(
                f"http://localhost:8001/api/si/projects/{project.id}/deliverables/summary",
                headers={"Authorization": f"Bearer {_get_internal_token()}"},
            )
        if resp.status_code != 200:
            return f"산출물 현황 조회 실패 ({resp.status_code})"

        d = resp.json()
        report = [
            f"[산출물 현황] {proj_code}",
            f"전체: {d['total']}건 | 제출률: {d['submit_rate']}% | 승인률: {d['approval_rate']}%",
        ]
        if d.get("overdue"):
            report.append(f"⚠️ 기한 초과: {len(d['overdue'])}")
            report.extend(
                f" - {late['name']} ({late['days_overdue']}일 초과)"
                for late in d["overdue"][:3]
            )
        return "\n".join(report)
    except Exception as e:
        return f"산출물 조회 오류: {str(e)[:100]}"
async def _cmd_compliance_scan(room: str, actor: str):
    """Handle ``/scan``: run the compliance scan and post the findings summary.

    The summary reports secure-coding, accessibility and privacy counts taken
    from the response's ``summary`` object.

    Args:
        room: Messenger room that receives the message.
        actor: User who issued the command (not used in the request).
    """
    import httpx as _hx

    try:
        async with _hx.AsyncClient(timeout=60.0) as api:
            resp = await api.post(
                "http://localhost:8001/api/compliance/scan",
                headers={"Authorization": f"Bearer {_get_internal_token()}"},
            )
        if resp.status_code != 200:
            outcome = f"[보안 스캔 실패] {resp.status_code}"
        else:
            d = resp.json()
            outcome = "".join([
                "[보안 스캔 완료]\n",
                f"종합 위험도: {d.get('risk_level','?')}\n",
                f"총 발견: {d.get('total_findings',0)}\n",
                f"시큐어코딩: {d.get('summary',{}).get('secure_coding',0)}\n",
                f"웹접근성: {d.get('summary',{}).get('accessibility',0)}\n",
                f"개인정보: {d.get('summary',{}).get('privacy',0)}\n",
                "상세: http://localhost:8001/compliance",
            ])
        await _send_to_room(room, outcome)
    except Exception as e:
        await _send_to_room(room, f"[스캔 오류] {str(e)[:100]}")
async def _cmd_checklist_status() -> str:
    """Handle ``/checklist``: report public-sector checklist progress.

    Returns:
        Implementation and verification rates, one line per category, and up
        to two outstanding action items; otherwise a failure/error message.
    """
    import httpx as _hx

    try:
        async with _hx.AsyncClient(timeout=10.0) as api:
            resp = await api.get(
                "http://localhost:8001/api/public/status",
                headers={"Authorization": f"Bearer {_get_internal_token()}"},
            )
        if resp.status_code != 200:
            return f"체크리스트 조회 실패 ({resp.status_code})"

        d = resp.json()
        categories = d.get("by_category", {})
        report = [
            "[공공기관 체크리스트]",
            f"GUARDiA 구현율: {d.get('impl_rate',0)}% ({d.get('guardia_impl',0)}/{d.get('total_items',0)})",
            f"검증 완료: {d.get('verify_rate',0)}% ({d.get('verified',0)}/{d.get('total_items',0)})",
        ]
        for name, counts in categories.items():
            report.append(f" {name}: {counts['impl']}/{counts['total']} 구현")

        pending = d.get("action_items", [])
        if pending:
            report.append(f"미완료 조치: {len(pending)}")
            for item in pending[:2]:
                report.append(f" - {item[:60]}")
        return "\n".join(report)
    except Exception as e:
        return f"체크리스트 오류: {str(e)[:100]}"
async def _cmd_perf_test(room: str, actor: str, target_url: str):
    """Handle ``/perf``: run a performance test and post the result summary.

    The request uses a fixed profile: three endpoints, 10 users, 30 s
    duration, 5 s ramp-up and 0.1 s think time.

    Args:
        room: Messenger room that receives the message.
        actor: User who issued the command (not used in the request).
        target_url: Base URL of the system under test.
    """
    import httpx as _hx

    profile = {
        "target_url": target_url,
        "endpoints": ["/", "/api/tasks", "/api/dashboard/me"],
        "users": 10,
        "duration": 30,
        "ramp_up": 5,
        "think_time": 0.1,
    }
    try:
        async with _hx.AsyncClient(timeout=90.0) as api:
            resp = await api.post(
                "http://localhost:8001/api/perf/run",
                json=profile,
                headers={"Authorization": f"Bearer {_get_internal_token()}"},
            )
        if resp.status_code != 200:
            outcome = f"[성능 테스트 실패] {resp.status_code}"
        else:
            d = resp.json()
            s = d.get("summary", {})
            outcome = (
                f"[성능 테스트 완료] {target_url}\n"
                f"TPS: {s.get('tps',0)} | 평균: {s.get('avg_ms',0)}ms\n"
                f"P95: {s.get('p95_ms',0)}ms | 에러율: {s.get('error_rate_pct',0)}%\n"
                f"결과 ID: {d.get('result_id','?')}\n"
                f"보고서: http://localhost:8001/api/perf/results/{d.get('result_id','')}/html"
            )
        await _send_to_room(room, outcome)
    except Exception as e:
        await _send_to_room(room, f"[성능 테스트 오류] {str(e)[:100]}")
async def _cmd_open_issues(proj_code: str, db) -> str:
    """Handle ``/issues``: list the open issues of an SI project.

    Args:
        proj_code: Project code used to look up the ``SiProject`` row.
        db: Unused; a fresh ``SessionLocal`` session is opened instead.

    Returns:
        A header plus up to five issue lines, a "no open issues" notice, or a
        not-found/failure/error message.
    """
    import httpx as _hx

    try:
        async with SessionLocal() as session:
            from models import SiProject
            from sqlalchemy import select as _select

            lookup = await session.execute(
                _select(SiProject).where(SiProject.project_code == proj_code)
            )
            project = lookup.scalars().first()
        if not project:
            return f"프로젝트를 찾을 수 없습니다: {proj_code}"

        async with _hx.AsyncClient(timeout=10.0) as api:
            resp = await api.get(
                f"http://localhost:8001/api/si/projects/{project.id}/issues?status=OPEN&limit=5",
                headers={"Authorization": f"Bearer {_get_internal_token()}"},
            )
        if resp.status_code != 200:
            return f"이슈 조회 실패 ({resp.status_code})"

        open_items = resp.json()
        if not open_items:
            return f"[이슈] {proj_code} — 미결 이슈 없음 ✅"

        report = [f"[미결 이슈] {proj_code} ({len(open_items)}건)"]
        report.extend(
            f" [{entry.get('issue_id','?')}] {entry.get('title','')[:40]}{entry.get('assigned_to','미배정')}"
            for entry in open_items[:5]
        )
        return "\n".join(report)
    except Exception as e:
        return f"이슈 조회 오류: {str(e)[:100]}"
async def _cmd_create_sr(room: str, actor: str, title: str):
    """Handle ``/sr``: quickly register an SR through the internal tasks API.

    The SR is created with ``sr_type`` ``OTHER`` and ``priority`` ``MEDIUM``.
    A 201 answer is reported as success; anything else as failure.

    Args:
        room: Messenger room that receives the message.
        actor: User who issued the command; sent as ``requested_by``.
        title: SR title.
    """
    import httpx as _hx

    ticket = {"title": title, "requested_by": actor, "sr_type": "OTHER", "priority": "MEDIUM"}
    try:
        async with _hx.AsyncClient(timeout=10.0) as api:
            resp = await api.post(
                "http://localhost:8001/api/tasks",
                json=ticket,
                headers={"Authorization": f"Bearer {_get_internal_token()}"},
            )
        if resp.status_code != 201:
            await _send_to_room(room, f"[SR 접수 실패] {resp.status_code}")
            return
        created = resp.json()
        await _send_to_room(room, f"[SR 접수 완료] {created.get('sr_id')}\n제목: {title}\n담당: {created.get('assigned_to') or '배정 대기'}")
    except Exception as e:
        await _send_to_room(room, f"[SR 접수 오류] {str(e)[:100]}")
async def _cmd_dashboard_status() -> str:
    """Handle ``/status``: summarise the dashboard overview counters.

    Returns:
        A four-counter summary (total SRs, in progress, SLA breached, open
        incidents), or a failure/error message.
    """
    import httpx as _hx

    try:
        async with _hx.AsyncClient(timeout=10.0) as api:
            resp = await api.get(
                "http://localhost:8001/api/dashboard/overview",
                headers={"Authorization": f"Bearer {_get_internal_token()}"},
            )
        if resp.status_code != 200:
            return f"현황 조회 실패 ({resp.status_code})"

        overview = resp.json()
        rows = [
            "[GUARDiA 현황]",
            f"전체 SR: {overview.get('total_sr', 0)}",
            f"처리 중: {overview.get('in_progress', 0)}",
            f"SLA 위반: {overview.get('sla_breached', 0)}",
            f"장애 진행: {overview.get('open_incidents', 0)}",
        ]
        return "\n".join(rows)
    except Exception as e:
        return f"현황 조회 오류: {str(e)[:80]}"
async def _cmd_license_status(db) -> str:
    """Handle ``/license``: report the current license state.

    Args:
        db: Database session passed through to ``get_license_status``.

    Returns:
        License details when the status is ``valid``, an expiry notice when it
        is ``expired``, a "not registered" notice otherwise, or an error
        message if the lookup raises.
    """
    try:
        from routers.license import get_license_status

        info = await get_license_status(db)
        if not info.get("valid"):
            if info.get("expired"):
                return "❌ 라이선스 만료됨 — /license 에서 갱신하세요."
            return "⚠️ 라이선스 미등록 — /license 에서 체험판을 시작하세요."

        # Trial licenses get an extra tag in the header.
        label = ""
        if info.get("is_trial"):
            label = "[체험판]"
        return (
            f"[GUARDiA 라이선스 {label}]\n"
            f"에디션: {info.get('edition')}\n"
            f"고객: {info.get('customer')}\n"
            f"만료: {info.get('expires_at', '?')[:10]} (D-{info.get('days_remaining', '?')})"
        )
    except Exception as e:
        return f"라이선스 조회 오류: {str(e)[:80]}"
async def _cmd_bulk_action(room: str, actor: str, action: str, sr_ids: list):
    """Handle ``/bulk``: apply one action to several SRs and post the tally.

    Args:
        room: Messenger room that receives the message.
        actor: User who issued the command; recorded in the request note.
        action: Bulk action name forwarded to the tasks API.
        sr_ids: IDs of the SRs to process.
    """
    import httpx as _hx

    request_body = {
        "sr_ids": sr_ids,
        "action": action,
        "params": {"note": f"봇 대량처리 by {actor}"},
    }
    try:
        async with _hx.AsyncClient(timeout=30.0) as api:
            resp = await api.post(
                "http://localhost:8001/api/tasks/bulk",
                json=request_body,
                headers={"Authorization": f"Bearer {_get_internal_token()}"},
            )
        if resp.status_code != 200:
            await _send_to_room(room, f"[대량처리 실패] {resp.status_code}: {resp.text[:100]}")
            return
        tally = resp.json()
        await _send_to_room(
            room,
            f"[대량처리 완료] {action}\n"
            f"전체: {tally['total']}건 | 성공: {tally['success']}건 | 실패: {tally['failed']}"
        )
    except Exception as e:
        await _send_to_room(room, f"[대량처리 오류] {str(e)[:100]}")
async def _send_to_room(room: str, text: str):
    """Send a bot message to a messenger room (best effort).

    The message is posted to ``BOT_SEND_API`` when that is configured,
    otherwise to the messenger's ``/api/webhook/bot-reply`` webhook under
    ``MESSENGER_BASE_URL``. Delivery problems never propagate to the caller;
    they are logged as warnings instead of being silently discarded.

    Args:
        room: Target room.
        text: Message body; the first 100 characters are also logged.
    """
    logger.info("[Bot → %s] %s", room, text[:100])
    endpoint = BOT_SEND_API or f"{MESSENGER_BASE_URL}/api/webhook/bot-reply"
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            resp = await client.post(endpoint, json={"room": room, "text": text})
        if resp.status_code >= 400:
            logger.warning("Bot message to room %s rejected with HTTP %s",
                           room, resp.status_code)
    except Exception as exc:
        # Command handlers call this from their own error paths, so it must
        # not raise; leave a trace for operators instead.
        logger.warning("Bot message to room %s could not be delivered: %s", room, exc)
def _get_internal_token() -> str:
    """Return the token used for internal API calls.

    Returns:
        The value of the ``INTERNAL_API_TOKEN`` environment variable, or an
        empty string when it is not set.
    """
    import os as _env

    token = _env.environ.get("INTERNAL_API_TOKEN", "")
    return token
# ── 자율 운영 봇 명령어 헬퍼 함수 ────────────────────────────────────────────
async def _cmd_approve_action(action_id: str, actor: str,
                              comment: Optional[str], db) -> str:
    """Handle the bot ``/approve`` command for a pending automated action.

    The action must exist and be in ``PENDING_APPROVAL``. If its ``expires_at``
    has passed it is marked ``EXPIRED`` instead. Otherwise it is marked
    ``APPROVED`` and the approver, comment and timestamps are stored.

    Args:
        action_id: Identifier of the ``AutoAction`` to approve.
        actor: User who issued the command; stored as ``approved_by``.
        comment: Optional approval comment.
        db: Unused; a fresh ``SessionLocal`` session is opened instead.

    Returns:
        A confirmation message, or a failure/error message.
    """
    try:
        from datetime import datetime as _dt
        from models import AutoAction, AutoActionStatus
        from sqlalchemy import select as _sel

        async with SessionLocal() as _db:
            q = await _db.execute(_sel(AutoAction).where(AutoAction.action_id == action_id))
            action = q.scalar_one_or_none()
            if not action:
                return f"[승인 실패] 작업 {action_id} 를 찾을 수 없습니다."
            if action.status != AutoActionStatus.PENDING_APPROVAL:
                return f"[승인 실패] 현재 상태: {action.status} (대기 중 아님)"
            if action.expires_at and action.expires_at < _dt.now():
                action.status = AutoActionStatus.EXPIRED
                await _db.commit()
                return f"[승인 실패] 작업 {action_id} 만료됨 — 재등록 필요"

            # Read this before committing: a commit may expire loaded
            # attributes, and reloading them lazily fails on an async session,
            # which would report an error after a successful approval.
            action_type = action.action_type

            action.status = AutoActionStatus.APPROVED
            action.approved_by = actor
            action.approved_at = _dt.now()
            action.comment = comment
            action.processed_at = _dt.now()
            await _db.commit()
            return (
                f"✅ [승인 완료] {action_type}\n"
                f" 작업 ID: {action_id}\n"
                f" 승인자: {actor}\n"
                f" {comment or ''}\n"
                f"작업을 실행할 수 있습니다."
            )
    except Exception as e:
        return f"[승인 오류] {str(e)[:100]}"
async def _cmd_reject_action(action_id: str, actor: str,
                             reason: str, db) -> str:
    """Handle the bot ``/reject`` command for a pending automated action.

    The action must exist and be in ``PENDING_APPROVAL``. It is then marked
    ``REJECTED`` with the rejecting user, timestamp and reason stored.

    Args:
        action_id: Identifier of the ``AutoAction`` to reject.
        actor: User who issued the command; stored as ``approved_by``.
        reason: Rejection reason; stored as the action's comment.
        db: Unused; a fresh ``SessionLocal`` session is opened instead.

    Returns:
        A confirmation message, or a failure/error message.
    """
    try:
        from models import AutoAction, AutoActionStatus
        from sqlalchemy import select as _sel
        from datetime import datetime as _dt

        async with SessionLocal() as _db:
            q = await _db.execute(_sel(AutoAction).where(AutoAction.action_id == action_id))
            action = q.scalar_one_or_none()
            if not action:
                return f"[거부 실패] 작업 {action_id} 를 찾을 수 없습니다."
            if action.status != AutoActionStatus.PENDING_APPROVAL:
                return f"[거부 실패] 현재 상태: {action.status}"

            # Read this before committing: a commit may expire loaded
            # attributes, and reloading them lazily fails on an async session,
            # which would report an error after a successful rejection.
            action_type = action.action_type

            action.status = AutoActionStatus.REJECTED
            action.approved_by = actor
            action.approved_at = _dt.now()
            action.comment = reason
            await _db.commit()
            return (
                f"❌ [거부] {action_type}\n"
                f" 작업 ID: {action_id}\n"
                f" 거부자: {actor}\n"
                f" 사유: {reason}"
            )
    except Exception as e:
        return f"[거부 오류] {str(e)[:100]}"
async def _cmd_auto_queue(db) -> str:
    """Handle the bot ``/autoq`` command: list actions awaiting approval.

    Selects up to ten ``AutoAction`` rows that are ``PENDING_APPROVAL`` and
    whose ``expires_at`` is later than now, newest first.

    Args:
        db: Unused; a fresh ``SessionLocal`` session is opened instead.

    Returns:
        One entry per pending action with the matching approve/reject
        commands, a "nothing pending" notice, or an error message.
    """
    try:
        from models import AutoAction, AutoActionStatus
        from sqlalchemy import select as _select, desc as _descending
        from datetime import datetime as _clock

        async with SessionLocal() as session:
            stmt = (
                _select(AutoAction)
                .where(
                    AutoAction.status == AutoActionStatus.PENDING_APPROVAL,
                    AutoAction.expires_at > _clock.now(),
                )
                .order_by(_descending(AutoAction.created_at))
                .limit(10)
            )
            pending = (await session.execute(stmt)).scalars().all()

        if not pending:
            return "✅ 승인 대기 중인 작업이 없습니다."

        risk_icons = {"CRITICAL": "🚨", "HIGH": "⚠️"}
        report = [f"⏳ 승인 대기 {len(pending)}"]
        for item in pending:
            icon = risk_icons.get(item.risk_level, "")
            report.append(
                f"\n{icon} [{item.action_id}] {item.action_type}\n"
                f" {item.description[:60]}\n"
                f" 요청자: {item.requested_by} | 만료: {(item.expires_at.strftime('%H:%M') if item.expires_at else 'N/A')}\n"
                f" → /approve {item.action_id} 또는 /reject {item.action_id}"
            )
        return "\n".join(report)
    except Exception as e:
        return f"[큐 조회 오류] {str(e)[:100]}"
# ── CI/CD bot command helpers ────────────────────────────────────────────────
JENKINS_URL = "http://localhost:9080" # internal port behind Nginx
GITEA_URL = "http://localhost:9003" # internal port behind Nginx
JENKINS_USER = "admin"
JENKINS_TOKEN_ENV = "JENKINS_API_TOKEN" # name of the environment variable that holds the token
GITEA_USER = "zio"
GITEA_TOKEN_ENV = "GITEA_API_TOKEN" # name of the environment variable that holds the token
import os as _os
def _jenkins_auth():
    """Return the ``(user, token)`` pair used as HTTP auth for Jenkins calls.

    The token is read from the environment variable named by
    ``JENKINS_TOKEN_ENV``; it is an empty string when the variable is unset.
    """
    api_token = _os.environ.get(JENKINS_TOKEN_ENV, "")
    return (JENKINS_USER, api_token)
def _gitea_headers():
    """Build the request headers for Gitea API calls.

    Authentication is chosen in this order:

    1. ``token <value>`` when the environment variable named by
       ``GITEA_TOKEN_ENV`` is set.
    2. HTTP Basic auth for ``GITEA_USER`` when ``GITEA_PASSWORD`` is set.
    3. No ``Authorization`` header; a warning is logged.

    The account password used to be embedded in this function. Secrets must
    not live in source control, so it is now read from the environment.

    Returns:
        A dict of HTTP headers that always contains ``Content-Type``.
    """
    headers = {"Content-Type": "application/json"}

    token = _os.environ.get(GITEA_TOKEN_ENV, "")
    if token:
        headers["Authorization"] = f"token {token}"
        return headers

    password = _os.environ.get("GITEA_PASSWORD", "")
    if password:
        import base64
        cred = base64.b64encode(f"{GITEA_USER}:{password}".encode()).decode()
        headers["Authorization"] = f"Basic {cred}"
    else:
        logger.warning(
            "No Gitea credentials configured (%s or GITEA_PASSWORD); "
            "sending unauthenticated request",
            GITEA_TOKEN_ENV,
        )
    return headers
async def _cmd_cicd_status(project: Optional[str]) -> str:
    """Summarise CI/CD state: Jenkins last builds and Gitea repositories.

    Args:
        project: Optional case-insensitive substring used to filter both the
            Jenkins job names and the Gitea repository names.

    Returns:
        A multi-line report. Connection failures are appended to the report
        rather than raised.
    """
    report = ["[CI/CD 현황]"]
    build_icons = {"SUCCESS":"","FAILURE":"","UNSTABLE":"⚠️",
                   "ABORTED":"","진행중":"🔄"}
    try:
        async with httpx.AsyncClient(timeout=10.0) as http:
            # Jenkins: all jobs with their last build.
            jenkins_resp = await http.get(
                f"{JENKINS_URL}/api/json?tree=jobs[name,color,lastBuild[number,result,timestamp,duration]]",
                auth=_jenkins_auth(),
            )
            if jenkins_resp.status_code != 200:
                report.append(f"\n■ Jenkins 연결 실패 (HTTP {jenkins_resp.status_code})")
            else:
                jobs = jenkins_resp.json().get("jobs", [])
                if project:
                    jobs = [job for job in jobs if project.lower() in job["name"].lower()]
                report.append("\n■ Jenkins 빌드")
                for job in jobs[:8]:
                    last = job.get("lastBuild") or {}
                    # A missing result key shows as N/A; a null result means
                    # the build is still running.
                    outcome = last.get("result", "N/A") or "진행중"
                    number = last.get("number", "-")
                    icon = build_icons.get(outcome, "")
                    report.append(f" {icon} {job['name']} #{number}{outcome}")

            # Gitea: repository list.
            gitea_resp = await http.get(
                f"{GITEA_URL}/api/v1/repos/search?limit=5",
                headers=_gitea_headers(),
            )
            if gitea_resp.status_code != 200:
                report.append(f"\n■ Gitea 연결 실패 (HTTP {gitea_resp.status_code})")
            else:
                repos = gitea_resp.json().get("data", [])
                if project:
                    repos = [repo for repo in repos if project.lower() in repo["name"].lower()]
                report.append("\n■ Gitea 저장소")
                for repo in repos[:5]:
                    last_update = (repo.get("updated_at") or "")[:10]
                    report.append(f" 📁 {repo['full_name']} — 최근: {last_update}")
    except Exception as e:
        report.append(f"\n연결 오류: {str(e)[:80]}")
        report.append("Jenkins/Gitea JENKINS_API_TOKEN, GITEA_API_TOKEN 환경변수를 확인하세요.")
    return "\n".join(report)
async def _cmd_jenkins_status(job: str, action: str) -> str:
    """Query a Jenkins job's last build: console log or status summary.

    Args:
        job: Jenkins job name.
        action: ``"log"`` returns the last 1200 characters of the console
            output; any other value returns the status summary.

    Returns:
        The formatted log or status text, or a failure/error message.
    """
    try:
        async with httpx.AsyncClient(timeout=15.0) as http:
            if action == "log":
                log_resp = await http.get(
                    f"{JENKINS_URL}/job/{job}/lastBuild/consoleText",
                    auth=_jenkins_auth(),
                )
                if log_resp.status_code != 200:
                    return f"[Jenkins] {job} 로그 조회 실패 (HTTP {log_resp.status_code})"
                tail = log_resp.text[-1200:]  # last 1200 characters
                return f"[Jenkins] {job} 최근 빌드 로그:\n```\n{tail}\n```"

            # Any other action: status summary.
            status_resp = await http.get(
                f"{JENKINS_URL}/job/{job}/api/json?tree=name,color,lastBuild[number,result,timestamp,duration,url]",
                auth=_jenkins_auth(),
            )
            if status_resp.status_code != 200:
                return f"[Jenkins] {job} 조회 실패 (HTTP {status_resp.status_code})\n잡 이름을 확인하세요."

            last = status_resp.json().get("lastBuild") or {}
            outcome = last.get("result", "진행중") or "진행중"
            number = last.get("number", "N/A")
            # Jenkins reports duration in milliseconds.
            seconds = (last.get("duration", 0) or 0) // 1000
            icon = {"SUCCESS":"","FAILURE":"","UNSTABLE":"⚠️",
                    "ABORTED":"","진행중":"🔄"}.get(outcome, "")
            return (
                f"[Jenkins] {job}\n"
                f" 최근 빌드: #{number}\n"
                f" 결과: {icon} {outcome}\n"
                f" 소요시간: {seconds}\n"
                f" 빌드 URL: {JENKINS_URL}/job/{job}/{number}/"
            )
    except Exception as e:
        return f"[Jenkins] 연결 오류: {str(e)[:100]}"
async def _cmd_jenkins_trigger(room: str, actor: str, job: str):
    """Trigger a Jenkins build and report the outcome to the room.

    Args:
        room: Messenger room that receives the message.
        actor: User who issued the command; shown in the success message.
        job: Jenkins job name.
    """
    try:
        async with httpx.AsyncClient(timeout=15.0) as http:
            resp = await http.post(
                f"{JENKINS_URL}/job/{job}/build",
                auth=_jenkins_auth(),
            )
        if resp.status_code not in (200, 201):
            await _send_to_room(room, f"[Jenkins] ❌ {job} 빌드 트리거 실패 (HTTP {resp.status_code})")
            return
        await _send_to_room(room, f"[Jenkins] ✅ {job} 빌드 트리거 완료 by {actor}\n빌드 상태 확인: /jenkins {job} status")
    except Exception as e:
        await _send_to_room(room, f"[Jenkins] 연결 오류: {str(e)[:100]}")
async def _cmd_gitea_status(repo: str, action: str) -> str:
    """Query a Gitea repository: open PRs, branches, or recent commits.

    Args:
        repo: Repository name. A bare name such as ``guardia-itsm`` is
            expanded to ``zio/guardia-itsm``; names containing ``/`` are used
            as given.
        action: ``"pr"`` lists open pull requests, ``"branch"`` lists
            branches, and any other value lists recent commits.

    Returns:
        The formatted listing, or a failure/error message.
    """
    full_repo = f"zio/{repo}" if "/" not in repo else repo
    api_root = f"{GITEA_URL}/api/v1/repos/{full_repo}"
    try:
        async with httpx.AsyncClient(timeout=10.0) as http:
            if action == "pr":
                resp = await http.get(
                    f"{api_root}/pulls?state=open&limit=5",
                    headers=_gitea_headers(),
                )
                if resp.status_code != 200:
                    return f"[Gitea] PR 조회 실패 (HTTP {resp.status_code})"
                pulls = resp.json()
                if not pulls:
                    return f"[Gitea] {full_repo} — 오픈 PR 없음"
                out = [f"[Gitea] {full_repo} 오픈 PR {len(pulls)}"]
                out.extend(
                    f" #{pull['number']} {pull['title']}{pull['user']['login']}"
                    for pull in pulls
                )
                return "\n".join(out)

            if action == "branch":
                resp = await http.get(
                    f"{api_root}/branches?limit=5",
                    headers=_gitea_headers(),
                )
                if resp.status_code != 200:
                    return f"[Gitea] 브랜치 조회 실패 (HTTP {resp.status_code})"
                out = [f"[Gitea] {full_repo} 브랜치 목록"]
                for branch in resp.json():
                    short_sha = branch.get("commit", {}).get("id", "")[:7]
                    out.append(f" 🌿 {branch['name']}{short_sha}")
                return "\n".join(out)

            # Default action: recent commits.
            resp = await http.get(
                f"{api_root}/commits?limit=5",
                headers=_gitea_headers(),
            )
            if resp.status_code != 200:
                return f"[Gitea] 커밋 조회 실패 (HTTP {resp.status_code})"
            out = [f"[Gitea] {full_repo} 최근 커밋"]
            for entry in resp.json():
                short_sha = entry.get("sha", "")[:7]
                # Message is cut to 50 characters, then to its first line.
                subject = (entry.get("commit", {}).get("message") or "")[:50].split("\n")[0]
                author = entry.get("commit", {}).get("author", {}).get("name", "")
                out.append(f" {short_sha} {subject}{author}")
            return "\n".join(out)
    except Exception as e:
        return f"[Gitea] 연결 오류: {str(e)[:100]}"
async def _cmd_release(room: str, actor: str, project: str, version: str):
    """Start the release pipeline for a project by triggering its Jenkins job.

    A parameterised build (``VERSION``, ``ACTOR``) is tried first. If that is
    not accepted, a plain build is triggered instead. Progress messages are
    posted to ``room`` along the way.

    Args:
        room: Messenger room that receives the messages.
        actor: User who issued the command.
        project: Project name. Known names are matched case-insensitively to
            their Jenkins job; unknown names are used as the job name as given.
        version: Version string passed to the parameterised build.
    """
    # Known repositories share their name with the Jenkins job.
    known_jobs = ("guardia-itsm", "guardia-manager", "zioinfo-web", "guardia-messenger")
    lowered = project.lower()
    job = lowered if lowered in known_jobs else project
    try:
        await _send_to_room(room,
                            f"[릴리즈] 🚀 {project} {version} 배포 파이프라인 시작 by {actor}")
        async with httpx.AsyncClient(timeout=15.0) as http:
            # Stage 1: trigger the Jenkins build.
            param_resp = await http.post(
                f"{JENKINS_URL}/job/{job}/buildWithParameters",
                auth=_jenkins_auth(),
                params={"VERSION": version, "ACTOR": actor},
            )
            if param_resp.status_code in (200, 201):
                await _send_to_room(room, f"[릴리즈] ✅ 1단계 — Jenkins 빌드 트리거 완료 ({job})")
            else:
                # Retry without parameters.
                plain_resp = await http.post(
                    f"{JENKINS_URL}/job/{job}/build",
                    auth=_jenkins_auth(),
                )
                if plain_resp.status_code not in (200, 201):
                    # The reported status is that of the parameterised attempt.
                    await _send_to_room(room,
                                        f"[릴리즈] ⚠️ Jenkins 빌드 트리거 실패 (HTTP {param_resp.status_code})\n"
                                        f"수동 빌드: {JENKINS_URL}/job/{job}/")
                    return
                await _send_to_room(room, "[릴리즈] ✅ 1단계 — Jenkins 빌드 트리거 완료")

        # Stage 2: tell the user how to follow the build.
        await _send_to_room(room,
                            f"[릴리즈] 빌드 상태 확인:\n"
                            f" /jenkins {job} status\n"
                            f" /jenkins {job} log")
    except Exception as e:
        await _send_to_room(room, f"[릴리즈] 연결 오류: {str(e)[:100]}")
def _help_text() -> str:
    """Return the static help message listing every bot command."""
    manual = """GUARDiA ITSM 봇 명령어
━━━━━━━━━━━━━━━━━━━━━━━━
[SR 관리]
/sr <제목> → SR 빠른 접수
!status <sr_id> → SR + 세션 상태 조회
/bulk <action> <SR-ID...> → SR 대량 처리
/assign <SR-ID> <담당자> → SR 담당자 즉시 배정
/approve <SR-ID> [의견] → SR 즉시 승인
/reject <SR-ID> [사유] → SR 반려
/escalate <SR-ID> → 당직자에게 에스컬레이션
/sla → SLA 위반 현황
[인시던트 / 장애]
/incident <제목> [P1|P2|P3|P4] → 인시던트 빠른 등록
/rca <INC-ID> → AI 자동 RCA 분석
/oncall → 현재 당직자 조회
[PMS 프로젝트]
/pms <코드> → 프로젝트 진척 현황
/wbs <코드> → WBS 지연 현황
/report <코드> [daily|weekly|monthly] → 보고서 발송
/deliverables <코드> → 산출물 제출 현황
/issues <코드> → 미결 이슈 목록
[인프라 / 보안]
/topology <서버명> → CI 의존관계 조회
/scouter <서버명> → Scouter APM 실시간 메트릭
/vuln <서버명|IP> → 취약점 스캔
/scan → 소스 보안 점검
/checklist → 공공기관 이행 현황
/perf [url] → 성능 테스트
[자율 운영 — 자동처리 & 승인]
/autoq → 승인 대기 작업 목록
/approve <작업ID> [의견] → 고위험 작업 승인 (HIGH/CRITICAL)
/reject <작업ID> [사유] → 작업 거부
[CI/CD 파이프라인]
/cicd [project] → CI/CD 전체 현황 (Jenkins + Gitea)
/jenkins <job> [build|status|log] → Jenkins 빌드 트리거·상태·로그
/git <repo> [log|pr|branch] → Gitea 저장소 커밋·PR·브랜치
/release <project> [version] → 릴리즈 배포 파이프라인 실행
[배포 제어]
!vibe <sr_id> [project_id] → 바이브 코딩 세션
!build <session_id> → 빌드 실행
!deploy <session_id> → 배포
/rollback <session_id> → 긴급 롤백
!cancel <session_id> → 세션 취소
[정보 / 운영]
/status → 시스템 현황
/license → 라이선스 상태
/kb <검색어> → KB 문서 검색
/notify <메시지> → 운영팀 전체 공지
[SM 원격 제어]
!sm <server> <script> → SM 스크립트 실행
!health <server> → 헬스체크
!log <server> [logfile] → 로그 분석
━━━━━━━━━━━━━━━━━━━━━━━━
SM 스크립트 키: system, tomcat, jboss, jeus,
weblogic, postgresql, oracle, mysql, tibero,
esb, elasticsearch, solr, pinpoint, scouter
[디자인 리뉴얼 봇]
/design capture → 현재 UI Before 스크린샷 (Playwright MCP)
/design variant <검색어> → Variant 디자인 레퍼런스 탐색
/design tokens → 통합 디자인 토큰 생성
/design homepage → 홈페이지 컴포넌트 개편
/design itsm → ITSM UI 현대화
/design manager → Manager 디자인 개편
/design app → Messenger 앱 개편
/design qa → Before/After 시각적 QA
/design ab <컴포넌트> → A/B 테스트 버전 생성
[스크랩핑 봇]
!scrap <url> → URL 즉시 스크랩
!scrap list [n] → 최근 n개 결과 목록
!scrap publish <id> → 게시 + 메신저 알림
!scrap del <id> → 삭제
!scrap restore <id> → 삭제→DRAFT 원복
!scrap status <id> → 결과 상세 조회
━━━━━━━━━━━━━━━━━━━━━━━━"""
    return manual
# ── Natural-language dispatch and scrap bot helpers ──────────────────────────
async def _handle_natural_language(
    cmd: BotCommand,
    bg: BackgroundTasks,
    db: AsyncSession,
) -> BotReply:
    """Interpret free-form text as a bot command and dispatch it.

    The stripped message is handed to ``parse_nl_command``; when the parser
    yields a command with sufficient confidence, that command is wrapped in a
    new ``BotCommand`` and passed to ``handle_bot_command``. The reply text is
    prefixed with a note showing how the input was interpreted.

    Args:
        cmd: Incoming bot command whose ``command`` field holds the raw text.
        bg: Background task queue, forwarded unchanged to the real handler.
        db: Database session, forwarded unchanged to the real handler.

    Returns:
        A usage hint when nothing was recognised (no command, or confidence
        below 0.45); otherwise the handler's reply with an interpretation
        banner prepended (verbose below 0.75 confidence, short otherwise).
    """
    from core.nl_command import parse_nl_command

    utterance = cmd.command.strip()
    interpretation = await parse_nl_command(utterance)
    score = interpretation.get("confidence", 0)
    resolved = interpretation.get("full_command")

    # The confidence is only compared once a command is actually present.
    recognised = bool(resolved) and not (score < 0.45)
    if not recognised:
        usage_hint = (
            "❓ 명령어를 인식하지 못했습니다.\n\n"
            "자연어로 입력 예시:\n"
            " • 서버1 헬스체크 해줘\n"
            " • SR-2026-XXXX 배포해줘\n"
            " • https://example.com 스크랩해줘\n"
            " • P1 긴급 장애 결제 시스템 다운\n\n"
            "!help 로 전체 명령어 목록 확인"
        )
        return BotReply(room=cmd.room, text=usage_hint)

    # Re-enter the regular command handler with the resolved command.
    forwarded = BotCommand(room=cmd.room, user=cmd.user, command=resolved)
    reply = await handle_bot_command(forwarded, bg, db)

    # Medium confidence: show the full interpretation; high: just the command.
    banner = (
        f"💬 자연어 해석 (신뢰도 {int(score*100)}%)\n"
        f" 입력: {utterance}\n"
        f" 명령: {resolved}\n\n"
        if score < 0.75
        else f"💬 → {resolved}\n"
    )
    reply.text = banner + reply.text
    return reply
async def _cmd_scrap_url(room: str, actor: str, url: str) -> None:
    """Scrape ``url``, persist the outcome, and report it to the room.

    A ``ScrapingResult`` row is stored with status ``FAILED`` when the engine
    reports an error and ``DRAFT`` otherwise. Whatever happens (success,
    engine error, or an exception along the way), exactly one message is sent
    to ``room`` at the end.

    Args:
        room: Messenger room that receives the result message.
        actor: User recorded as ``scraped_by``.
        url: Address to scrape.
    """
    from core.scraping_engine import scrape as _scrape

    try:
        scraped = await _scrape(url)
        from models import ScrapingResult

        async with SessionLocal() as session:
            record = ScrapingResult(
                title=scraped.title or url,
                content=scraped.content,
                plain_text=scraped.plain_text,
                url=url,
                source_html=scraped.source_html,
                status="FAILED" if scraped.error else "DRAFT",
                meta=scraped.meta,
                error_msg=scraped.error,
                scraped_by=actor,
                messenger_room=room,
            )
            session.add(record)
            await session.commit()
            await session.refresh(record)
            # Copy what the message needs while the session is still open.
            record_id, record_title = record.id, record.title
            record_status, failure = record.status, record.error_msg

        if failure:
            msg = f"[스크랩 실패] #{record_id}\n오류: {failure}"
        else:
            body_text = scraped.plain_text or ""
            tail = "..." if len(body_text) > 200 else ""
            # NOTE(review): id and title are emitted back-to-back with no
            # separator — confirm this is the intended format.
            msg = (
                f"[스크랩 완료] #{record_id}{record_title}\n"
                f"URL: {url}\n"
                f"요약: {body_text[:200]}{tail}\n"
                f"상태: {record_status}\n"
                f"게시: !scrap publish {record_id}"
            )
    except Exception as exc:
        # Only the first 150 characters of the error are shown to the room.
        msg = f"[스크랩 오류] {str(exc)[:150]}"
    await _send_to_room(room, msg)
async def _cmd_scrap_list(n: int) -> str:
    """Return a text listing of the most recent non-deleted scrap results.

    Args:
        n: Requested number of rows. The value is clamped to 1..20, so zero
            or negative input lists a single row instead of producing an
            empty or invalid query.

    Returns:
        A header line followed by one two-line entry per result (id, status,
        title or truncated URL, scrape time), a "no results" message when the
        table has no matching rows, or an error message if the lookup fails.
    """
    try:
        from models import ScrapingResult
        from sqlalchemy import select, desc

        # min(n, 20) alone lets 0 and negative values through: LIMIT 0 would
        # report "no results" even when rows exist, and a negative value would
        # be passed to the database as a negative LIMIT.
        row_limit = max(1, min(n, 20))

        async with SessionLocal() as db:
            rows = (await db.execute(
                select(ScrapingResult)
                .where(ScrapingResult.status != "DELETED")
                .order_by(desc(ScrapingResult.scraped_at))
                .limit(row_limit)
            )).scalars().all()
        if not rows:
            return "스크랩 결과가 없습니다."
        lines = ["[최근 스크랩 결과]"]
        lines.extend(
            f"#{r.id} [{r.status}] {r.title or r.url[:50]}\n"
            f" {r.scraped_at.strftime('%m/%d %H:%M')}"
            for r in rows
        )
        return "\n".join(lines)
    except Exception as e:
        # Keep the user-facing reply, but leave a traceback for operators.
        logger.exception("scrap list lookup failed (n=%s)", n)
        return f"조회 오류: {e}"
async def _cmd_scrap_publish(room: str, actor: str, result_id: int) -> None:
    """Mark a scrap result as PUBLISHED and announce it in the room.

    Results that are missing, already published, failed, or deleted are
    rejected with an explanatory message. Exactly one message is sent to
    ``room`` per call, after the database session has been closed.

    Args:
        room: Messenger room that receives the message; also stored on the
            record as ``messenger_room``.
        actor: User recorded as ``published_by``.
        result_id: Primary key of the ``ScrapingResult`` to publish.
    """
    # Imported locally so this helper does not rely on a module-level
    # ``datetime`` name (none is present in the file's top import block).
    from datetime import datetime

    try:
        from models import ScrapingResult

        async with SessionLocal() as db:
            r = await db.get(ScrapingResult, result_id)
            if not r:
                msg = f"#{result_id} 결과를 찾을 수 없습니다."
            elif r.status == "PUBLISHED":
                msg = f"#{result_id} 이미 게시된 결과입니다."
            elif r.status == "FAILED":
                msg = f"#{result_id} 실패한 결과는 게시할 수 없습니다."
            elif r.status == "DELETED":
                # Publishing a soft-deleted row would leave deleted_at set on
                # a PUBLISHED record; require an explicit restore first.
                msg = (
                    f"#{result_id} 삭제된 결과는 게시할 수 없습니다. "
                    f"(!scrap restore {result_id} 로 원복)"
                )
            else:
                # Read everything the announcement needs BEFORE committing:
                # with expire_on_commit enabled, attribute access after
                # commit() would trigger implicit I/O on the async session.
                rec_id, rec_title, rec_url = r.id, r.title, r.url
                body_text = r.plain_text or ""

                r.status = "PUBLISHED"
                # NOTE(review): utcnow() is deprecated since Python 3.12; it
                # is kept because it yields a naive UTC value and switching
                # to an aware datetime may not suit the column type.
                r.published_at = datetime.utcnow()
                r.published_by = actor
                r.messenger_room = room
                await db.commit()

                # NOTE(review): id and title are emitted back-to-back with no
                # separator — confirm this is the intended format.
                msg = (
                    f"[스크랩 게시] #{rec_id}{rec_title}\n"
                    f"URL: {rec_url}\n"
                    f"요약: {body_text[:300]}{'...' if len(body_text) > 300 else ''}\n"
                    f"게시자: {actor}"
                )
    except Exception as e:
        # Keep the user-facing reply, but leave a traceback for operators.
        logger.exception("scrap publish failed for result %s", result_id)
        msg = f"게시 오류: {e}"
    await _send_to_room(room, msg)
async def _cmd_scrap_delete(result_id: int) -> str:
    """Soft-delete a scrap result by setting its status to DELETED.

    Args:
        result_id: Primary key of the ``ScrapingResult`` to delete.

    Returns:
        A confirmation message (including the restore command), a message
        explaining why nothing was changed (unknown id or already deleted),
        or an error message if the operation fails.
    """
    # Imported locally so this helper does not rely on a module-level
    # ``datetime`` name (none is present in the file's top import block).
    from datetime import datetime

    try:
        from models import ScrapingResult

        async with SessionLocal() as db:
            r = await db.get(ScrapingResult, result_id)
            if not r:
                return f"#{result_id} 결과를 찾을 수 없습니다."
            if r.status == "DELETED":
                return f"#{result_id} 이미 삭제된 결과입니다."
            r.status = "DELETED"
            # NOTE(review): utcnow() is deprecated since Python 3.12; it is
            # kept because it yields a naive UTC value and switching to an
            # aware datetime may not suit the column type.
            r.deleted_at = datetime.utcnow()
            await db.commit()
        return f"[스크랩 삭제] #{result_id} 삭제 완료. (!scrap restore {result_id} 로 원복)"
    except Exception as e:
        # Keep the user-facing reply, but leave a traceback for operators.
        logger.exception("scrap delete failed for result %s", result_id)
        return f"삭제 오류: {e}"
async def _cmd_scrap_restore(result_id: int) -> str:
    """Bring a DELETED scrap result back as a DRAFT.

    Args:
        result_id: Primary key of the ``ScrapingResult`` to restore.

    Returns:
        A confirmation message on success, a message explaining why nothing
        was changed (unknown id, or the record is not in DELETED state), or
        an error message if the operation raises.
    """
    try:
        from models import ScrapingResult

        async with SessionLocal() as session:
            record = await session.get(ScrapingResult, result_id)
            if not record:
                outcome = f"#{result_id} 결과를 찾을 수 없습니다."
            elif record.status == "DELETED":
                # Undo the soft delete: back to DRAFT, deletion stamp cleared.
                record.status = "DRAFT"
                record.deleted_at = None
                await session.commit()
                outcome = f"[스크랩 원복] #{result_id} DRAFT 상태로 원복 완료."
            else:
                outcome = (
                    f"#{result_id} 삭제된 결과만 원복할 수 있습니다. "
                    f"(현재: {record.status})"
                )
        return outcome
    except Exception as exc:
        return f"원복 오류: {exc}"
async def _cmd_scrap_status(result_id: int) -> str:
    """Return a multi-line detail view of a single scrap result.

    Args:
        result_id: Primary key of the ``ScrapingResult`` to describe.

    Returns:
        Lines with id, title, URL, status and scrape time, plus publish time,
        error (first 100 characters) and summary (first 200 characters) when
        those fields are set; a "not found" message for an unknown id; or an
        error message if the lookup fails.
    """
    try:
        from models import ScrapingResult

        async with SessionLocal() as db:
            r = await db.get(ScrapingResult, result_id)
            if not r:
                return f"#{result_id} 결과를 찾을 수 없습니다."
            lines = [
                f"[스크랩 상세] #{r.id}",
                f"제목: {r.title or ''}",
                f"URL: {r.url}",
                f"상태: {r.status}",
                f"수집일시: {r.scraped_at.strftime('%Y-%m-%d %H:%M:%S')}",
            ]
            if r.published_at:
                lines.append(f"게시일시: {r.published_at.strftime('%Y-%m-%d %H:%M:%S')}")
            if r.error_msg:
                lines.append(f"오류: {r.error_msg[:100]}")
            if r.plain_text:
                # Only mark the summary as truncated when text was actually
                # cut off, matching the scrape/publish messages.
                tail = "..." if len(r.plain_text) > 200 else ""
                lines.append(f"요약: {r.plain_text[:200]}{tail}")
        return "\n".join(lines)
    except Exception as e:
        # Keep the user-facing reply, but leave a traceback for operators.
        logger.exception("scrap status lookup failed for result %s", result_id)
        return f"조회 오류: {e}"