zioinfo-mail/itsm/routers/incidents.py
DESKTOP-TKLFCPR\ython e228faabf5 feat(itsm): G-1~G-12 확장 기능 + 하네스/봇/설치스크립트 구현
G-1: 메신저 Webhook Relay + _send_to_room 실제 httpx 호출 구현
G-2: POST /api/tasks/bulk SR 대량작업 엔드포인트 (최대 100건)
G-3: 라이선스 만료 알림 스케줄러 (매일 09:00 KST)
G-4: 체험판 upgrade_banner 필드 + license.py 배너 로직
G-5: core/auto_rca.py + incidents/problem auto-rca 엔드포인트
G-6: core/deploy_impact.py + vibe impact-analysis 엔드포인트
G-7: core/ticket_classifier.py + SR 생성 시 AI 분류 + ai-suggestion API
G-8: VulnPatchRecord 모델 + vuln_scan 패치추적 4개 엔드포인트
G-9: core/jira_sync.py + gateway Jira/Confluence 연동 엔드포인트
G-10: core/push_notify.py + routers/push.py + PushSubscription 모델
G-11: approvals 다중승인 (위임/서명/기한초과/마감연장)
G-12: alembic.ini + migrations/ + cicd/migrate_to_postgres.sh

하네스: guardia-orchestrator 확장기능 Phase 반영
봇명령어: /sr /status /license /bulk 슬래시 명령어 추가
설치스크립트: setup/ (Ubuntu, CentOS, RHEL, Windows) --test 옵션 포함

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-29 18:18:52 +09:00

452 lines
16 KiB
Python

"""
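Incident Management API.

Endpoints:
    GET    /api/incidents                         — list incidents (filter by grade, status, institution)
    POST   /api/incidents                         — register an incident
    GET    /api/incidents/stats                   — statistics by grade and by status
    GET    /api/incidents/{id}                    — incident detail
    PATCH  /api/incidents/{id}                    — update an incident
    PATCH  /api/incidents/{id}/status             — status transition
    POST   /api/incidents/{id}/link-sr            — link an SR
    DELETE /api/incidents/{id}/link-sr/{sr_id}    — unlink an SR
    GET    /api/incidents/{id}/srs                — list linked SRs
    POST   /api/incidents/{id}/close              — close with RCA
    POST   /api/incidents/{id}/auto-rca           — generate an RCA draft automatically (G-5)

Incident number format: INC-YYYYMMDD-XXXXXX (first 6 hex characters of a UUID4, upper-cased)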
"""
from __future__ import annotations
import logging
from datetime import datetime
from typing import List, Optional
from uuid import uuid4
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import select, or_, desc
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
from database import get_db
from models import (
Institution, SRRequest,
Incident, IncidentCreate, IncidentGrade, IncidentOut, IncidentStatus, IncidentUpdate,
IncidentSR,
User, UserRole,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Every route declared in this file is mounted under /api/incidents and tagged "incidents".
router = APIRouter(prefix="/api/incidents", tags=["incidents"])
def _new_incident_id() -> str:
    """Build an incident number of the form ``INC-YYYYMMDD-XXXXXX``.

    The date part is today's local date; the suffix is the first six hex
    characters of a fresh UUID4, upper-cased.
    """
    date_part = datetime.now().strftime("%Y%m%d")
    suffix = uuid4().hex[:6].upper()
    return f"INC-{date_part}-{suffix}"
# ── 장애 목록 ─────────────────────────────────────────────────────────────────
@router.get("", response_model=List[IncidentOut])
async def list_incidents(
    inst_id: Optional[int] = Query(None),
    grade: Optional[str] = Query(None, description="P1/P2/P3/P4"),
    status: Optional[str] = Query(None),
    keyword: Optional[str] = Query(None),
    skip: int = 0,
    limit: int = 100,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """List incidents, newest ``occurred_at`` first.

    Args:
        inst_id: Restrict to one institution. Ignored for CUSTOMER users.
        grade: Restrict to one grade.
        status: Restrict to one status.
        keyword: Substring matched against title or description.
        skip: Number of rows to skip.
        limit: Maximum number of rows to return.

    Returns:
        The matching incidents. CUSTOMER users only ever see incidents of
        the institution matching their ``inst_code``; if that institution
        cannot be resolved they get an empty list.
    """
    q = select(Incident)

    if current_user.role == UserRole.CUSTOMER:
        # A customer must never reach the caller-supplied ``inst_id`` branch.
        # Previously a customer with an empty inst_code fell through to it and
        # could list any (or every) institution's incidents.
        if not current_user.inst_code:
            return []
        r_i = await db.execute(
            select(Institution).where(Institution.inst_code == current_user.inst_code)
        )
        own = r_i.scalars().first()
        if own is None:
            return []
        q = q.where(Incident.inst_id == own.id)
    elif inst_id:
        q = q.where(Incident.inst_id == inst_id)

    if grade:
        q = q.where(Incident.grade == grade)
    if status:
        q = q.where(Incident.status == status)
    if keyword:
        q = q.where(or_(
            Incident.title.contains(keyword),
            Incident.description.contains(keyword),
        ))

    q = q.order_by(desc(Incident.occurred_at)).offset(skip).limit(limit)
    result = await db.execute(q)
    return result.scalars().all()
# ── 장애 등록 ─────────────────────────────────────────────────────────────────
@router.post("", response_model=IncidentOut, status_code=201)
async def create_incident(
    payload: IncidentCreate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Register a new incident and, for P1/P2, send an urgent notification.

    Args:
        payload: Incident fields supplied by the caller.

    Returns:
        The persisted incident.

    Raises:
        HTTPException: 403 when the caller is not ADMIN, PM or ENGINEER.
    """
    if current_user.role not in (UserRole.ADMIN, UserRole.PM, UserRole.ENGINEER):
        raise HTTPException(403, "권한이 없습니다.")

    now = datetime.now()
    # Merge server-side values into the dumped payload instead of passing them
    # as extra keyword arguments: `Incident(**dump, reported_by=...)` raises
    # TypeError ("multiple values for keyword argument") whenever the dump
    # already contains that key.
    fields = payload.model_dump()
    fields.update(
        incident_id=_new_incident_id(),
        reported_by=payload.reported_by or current_user.username,
        occurred_at=payload.occurred_at or now,
        detected_at=now,
    )
    inc = Incident(**fields)

    db.add(inc)
    await db.commit()
    await db.refresh(inc)
    logger.info(
        "장애 등록: %s grade=%s reported_by=%s",
        inc.incident_id, inc.grade, inc.reported_by,
    )
    # P1/P2 incidents are announced immediately.
    if inc.grade in (IncidentGrade.P1, IncidentGrade.P2):
        await _notify_incident(inc)
    return inc
# ── Incident statistics (declared before "/{incident_id}" so "/stats" is not matched as an incident ID) ──
@router.get("/stats")
async def incident_stats(
    inst_id: Optional[int] = Query(None),
    db: AsyncSession = Depends(get_db),
    _u: User = Depends(get_current_user),
):
    """Aggregate incident counts by grade and by status.

    Args:
        inst_id: When given, only that institution's incidents are counted.

    Returns:
        A dict with ``total``, ``by_grade``, ``by_status``, ``mttr_min``
        (mean minutes from ``occurred_at`` to ``resolved_at`` over incidents
        that have both, 0 when there are none) and ``open_p1_p2`` (P1/P2
        incidents that are neither RESOLVED nor CLOSED).
    """
    stmt = select(Incident)
    if inst_id:
        stmt = stmt.where(Incident.inst_id == inst_id)
    rows = (await db.execute(stmt)).scalars().all()

    urgent_grades = (IncidentGrade.P1, IncidentGrade.P2)
    finished_states = (IncidentStatus.RESOLVED, IncidentStatus.CLOSED)

    by_grade: dict[str, int] = {}
    by_status: dict[str, int] = {}
    recovery_secs = []
    open_urgent = 0

    # One pass over the rows collects every figure.
    for row in rows:
        by_grade[row.grade] = by_grade.get(row.grade, 0) + 1
        by_status[row.status] = by_status.get(row.status, 0) + 1
        if row.resolved_at and row.occurred_at:
            recovery_secs.append((row.resolved_at - row.occurred_at).total_seconds())
        if row.grade in urgent_grades and row.status not in finished_states:
            open_urgent += 1

    # Mean time to recovery, in whole minutes.
    if recovery_secs:
        mttr_min = int(sum(recovery_secs) / len(recovery_secs) / 60)
    else:
        mttr_min = 0

    return {
        "total": len(rows),
        "by_grade": by_grade,
        "by_status": by_status,
        "mttr_min": mttr_min,
        "open_p1_p2": open_urgent,
    }
@router.get("/{incident_id}", response_model=IncidentOut)
async def get_incident(
    incident_id: str,
    db: AsyncSession = Depends(get_db),
    _u: User = Depends(get_current_user),
):
    """Return one incident looked up by its incident number.

    Raises:
        HTTPException: 404 when no incident has that number.
    """
    stmt = select(Incident).where(Incident.incident_id == incident_id)
    found = (await db.execute(stmt)).scalars().first()
    if found:
        return found
    raise HTTPException(404, "장애를 찾을 수 없습니다.")
# ── 장애 수정 ─────────────────────────────────────────────────────────────────
@router.patch("/{incident_id}", response_model=IncidentOut)
async def update_incident(
    incident_id: str,
    payload: IncidentUpdate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Apply a partial update to an incident.

    Only the fields the caller explicitly set in ``payload`` are written.

    Raises:
        HTTPException: 403 when the caller is not ADMIN, PM or ENGINEER;
            404 when the incident does not exist.
    """
    editor_roles = (UserRole.ADMIN, UserRole.PM, UserRole.ENGINEER)
    if current_user.role not in editor_roles:
        raise HTTPException(403, "권한이 없습니다.")

    stmt = select(Incident).where(Incident.incident_id == incident_id)
    target = (await db.execute(stmt)).scalars().first()
    if not target:
        raise HTTPException(404, "장애를 찾을 수 없습니다.")

    changes = payload.model_dump(exclude_unset=True)
    for field_name, new_value in changes.items():
        setattr(target, field_name, new_value)
    target.updated_at = datetime.now()

    await db.commit()
    await db.refresh(target)
    return target
# ── 상태 전환 ─────────────────────────────────────────────────────────────────
# Allowed status transitions: current status -> statuses it may move to.
# Consulted by change_status; a status mapped to an empty list is terminal.
_VALID_TRANSITIONS: dict[str, list[str]] = {
    IncidentStatus.OPEN: [IncidentStatus.INVESTIGATING, IncidentStatus.CLOSED],
    IncidentStatus.INVESTIGATING: [IncidentStatus.MITIGATED, IncidentStatus.RESOLVED],
    # A mitigated incident may go back to investigation.
    IncidentStatus.MITIGATED: [IncidentStatus.INVESTIGATING, IncidentStatus.RESOLVED],
    IncidentStatus.RESOLVED: [IncidentStatus.CLOSED],
    IncidentStatus.CLOSED: [],
}
@router.patch("/{incident_id}/status", response_model=IncidentOut)
async def change_status(
    incident_id: str,
    new_status: IncidentStatus = Query(...),
    note: Optional[str] = Query(None),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Move an incident to ``new_status`` if the transition is allowed.

    Allowed transitions come from ``_VALID_TRANSITIONS``. Entering MITIGATED,
    RESOLVED or CLOSED also stamps the matching ``*_at`` column.

    Args:
        incident_id: Incident number.
        new_status: Target status.
        note: Optional free-text remark. It is not persisted here; it is
            written to the audit log line only.

    Raises:
        HTTPException: 403 when the caller is not ADMIN, PM or ENGINEER;
            404 when the incident does not exist; 422 when the transition
            is not allowed.
    """
    if current_user.role not in (UserRole.ADMIN, UserRole.PM, UserRole.ENGINEER):
        raise HTTPException(403, "권한이 없습니다.")
    r = await db.execute(
        select(Incident).where(Incident.incident_id == incident_id)
    )
    inc = r.scalars().first()
    if not inc:
        raise HTTPException(404, "장애를 찾을 수 없습니다.")

    allowed = _VALID_TRANSITIONS.get(inc.status, [])
    if new_status not in allowed:
        raise HTTPException(
            422,
            f"'{inc.status}' 상태에서 '{new_status}'로 전환할 수 없습니다. "
            f"허용: {allowed}",
        )

    # One timestamp for the whole transition, so updated_at equals the
    # milestone column written below.
    now = datetime.now()
    inc.status = new_status
    inc.updated_at = now
    if new_status == IncidentStatus.MITIGATED:
        inc.mitigated_at = now
    elif new_status == IncidentStatus.RESOLVED:
        inc.resolved_at = now
    elif new_status == IncidentStatus.CLOSED:
        inc.closed_at = now

    await db.commit()
    await db.refresh(inc)
    # The earlier format string ran the incident number and the new status
    # together with no separator, and dropped `note` entirely.
    logger.info(
        "장애 상태 전환: %s → %s by=%s note=%r",
        incident_id, new_status, current_user.username, note,
    )
    return inc
# ── SR 연결 ───────────────────────────────────────────────────────────────────
@router.post("/{incident_id}/link-sr", status_code=201)
async def link_sr(
    incident_id: str,
    sr_id: str = Query(..., description="연결할 SR 번호"),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Link an existing SR to an existing incident.

    Returns:
        ``{"incident_id": ..., "sr_id": ..., "linked": True}``.

    Raises:
        HTTPException: 403 when the caller is not ADMIN, PM or ENGINEER;
            404 when the incident or the SR does not exist; 409 when the
            pair is already linked.
    """
    permitted = (UserRole.ADMIN, UserRole.PM, UserRole.ENGINEER)
    if current_user.role not in permitted:
        raise HTTPException(403, "권한이 없습니다.")

    # The incident must exist.
    incident_stmt = select(Incident).where(Incident.incident_id == incident_id)
    incident_row = (await db.execute(incident_stmt)).scalars().first()
    if not incident_row:
        raise HTTPException(404, "장애를 찾을 수 없습니다.")

    # The SR must exist.
    sr_stmt = select(SRRequest).where(SRRequest.sr_id == sr_id)
    sr_row = (await db.execute(sr_stmt)).scalars().first()
    if not sr_row:
        raise HTTPException(404, "SR을 찾을 수 없습니다.")

    # Refuse a second link for the same pair.
    existing_stmt = select(IncidentSR).where(
        IncidentSR.incident_id == incident_id,
        IncidentSR.sr_id == sr_id,
    )
    existing = (await db.execute(existing_stmt)).scalars().first()
    if existing:
        raise HTTPException(409, "이미 연결된 SR입니다.")

    db.add(IncidentSR(incident_id=incident_id, sr_id=sr_id))
    await db.commit()
    return {"incident_id": incident_id, "sr_id": sr_id, "linked": True}
@router.delete("/{incident_id}/link-sr/{sr_id}", status_code=204)
async def unlink_sr(
    incident_id: str,
    sr_id: str,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Remove the link between an incident and an SR.

    Raises:
        HTTPException: 403 when the caller is not ADMIN or PM; 404 when no
            such link exists.
    """
    if current_user.role not in (UserRole.ADMIN, UserRole.PM):
        raise HTTPException(403, "ADMIN 또는 PM 권한이 필요합니다.")

    link_stmt = select(IncidentSR).where(
        IncidentSR.incident_id == incident_id,
        IncidentSR.sr_id == sr_id,
    )
    existing_link = (await db.execute(link_stmt)).scalars().first()
    if not existing_link:
        raise HTTPException(404, "연결을 찾을 수 없습니다.")

    await db.delete(existing_link)
    await db.commit()
@router.get("/{incident_id}/srs")
async def list_linked_srs(
    incident_id: str,
    db: AsyncSession = Depends(get_db),
    _u: User = Depends(get_current_user),
):
    """List the SRs linked to an incident.

    Returns:
        One summary dict per linked SR (``sr_id``, ``title``, ``status``,
        ``priority``, ``sr_type``, ISO-formatted ``created_at`` or ``None``),
        or an empty list when nothing is linked.
    """
    # Step 1: collect the SR numbers linked to this incident.
    id_stmt = select(IncidentSR.sr_id).where(IncidentSR.incident_id == incident_id)
    linked_ids = list((await db.execute(id_stmt)).scalars().all())
    if not linked_ids:
        return []

    # Step 2: load those SRs and project them to summaries.
    sr_stmt = select(SRRequest).where(SRRequest.sr_id.in_(linked_ids))
    summaries = []
    for sr in (await db.execute(sr_stmt)).scalars().all():
        created = sr.created_at.isoformat() if sr.created_at else None
        summaries.append({
            "sr_id": sr.sr_id,
            "title": sr.title,
            "status": sr.status,
            "priority": sr.priority,
            "sr_type": sr.sr_type,
            "created_at": created,
        })
    return summaries
# ── 종료 (RCA 포함) ───────────────────────────────────────────────────────────
@router.post("/{incident_id}/close", response_model=IncidentOut)
async def close_incident(
    incident_id: str,
    rca: str = Query(..., description="근본 원인 분석 (Root Cause Analysis)"),
    prevention: Optional[str] = Query(None, description="재발 방지 조치"),
    kb_doc_id: Optional[str] = Query(None, description="연관 KB 문서 ID"),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Close an incident, recording its root-cause analysis.

    Unlike the status endpoint, this closes from any non-closed status.

    Args:
        incident_id: Incident number.
        rca: Root-cause analysis text (required).
        prevention: Recurrence-prevention measures. When omitted, any value
            already stored on the incident is kept.
        kb_doc_id: Related knowledge-base document ID. When omitted, any
            value already stored on the incident is kept.

    Raises:
        HTTPException: 403 when the caller is not ADMIN, PM or ENGINEER;
            404 when the incident does not exist; 409 when it is already
            closed.
    """
    if current_user.role not in (UserRole.ADMIN, UserRole.PM, UserRole.ENGINEER):
        raise HTTPException(403, "권한이 없습니다.")
    r = await db.execute(
        select(Incident).where(Incident.incident_id == incident_id)
    )
    inc = r.scalars().first()
    if not inc:
        raise HTTPException(404, "장애를 찾을 수 없습니다.")
    if inc.status == IncidentStatus.CLOSED:
        raise HTTPException(409, "이미 종료된 장애입니다.")

    # One timestamp for every column written here. Separate now() calls left
    # resolved_at a few microseconds later than closed_at.
    now = datetime.now()
    inc.status = IncidentStatus.CLOSED
    inc.rca = rca
    # Overwrite optional fields only when supplied, so closing with just an
    # RCA does not erase text stored earlier (e.g. by the auto-rca endpoint).
    if prevention is not None:
        inc.prevention = prevention
    if kb_doc_id is not None:
        inc.kb_doc_id = kb_doc_id
    inc.closed_at = now
    if not inc.resolved_at:
        inc.resolved_at = now
    inc.updated_at = now

    await db.commit()
    await db.refresh(inc)
    logger.info(
        "장애 종료: %s grade=%s closed_by=%s",
        incident_id, inc.grade, current_user.username,
    )
    return inc
# ── G-5: 자동 RCA 분석 ────────────────────────────────────────────────────────
@router.post("/{incident_id}/auto-rca")
async def auto_rca(
    incident_id: str,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Generate an RCA draft via ``core.auto_rca.analyze_rca`` and store it.

    The draft's ``root_cause`` is written to ``inc.rca`` and its
    ``prevention`` list is stored as a JSON string in ``inc.prevention``.

    Returns:
        The analyzer's result, unchanged.

    Raises:
        HTTPException: 403 when the caller is not ADMIN, PM or ENGINEER;
            404 when the incident does not exist; 500 when the analysis
            fails or its result has no ``"rca"`` entry.
    """
    import json as _json

    # This endpoint overwrites incident fields, so it gets the same role gate
    # as the other mutating endpoints in this router.
    if current_user.role not in (UserRole.ADMIN, UserRole.PM, UserRole.ENGINEER):
        raise HTTPException(403, "권한이 없습니다.")

    inc = (await db.execute(
        select(Incident).where(Incident.incident_id == incident_id)
    )).scalars().first()
    if not inc:
        raise HTTPException(404, f"장애 {incident_id}를 찾을 수 없습니다.")

    try:
        # Imported lazily so a missing analyzer becomes a 500 for this
        # endpoint rather than an import error for the whole router.
        from core.auto_rca import analyze_rca
        result = await analyze_rca(inc.id, db)
    except Exception as e:
        # Keep the traceback in the server log; the client gets a short message.
        logger.exception("자동 RCA 분석 실패: %s", incident_id)
        raise HTTPException(500, f"RCA 분석 오류: {str(e)[:200]}") from e

    try:
        rca_data = result["rca"]
    except (KeyError, TypeError) as e:
        logger.error("자동 RCA 결과에 'rca' 항목 없음: %s", incident_id)
        raise HTTPException(500, "RCA 분석 오류: 분석 결과에 'rca' 항목이 없습니다.") from e

    # Persist the generated draft on the incident record.
    inc.rca = rca_data.get("root_cause", "")
    inc.prevention = _json.dumps(rca_data.get("prevention", []), ensure_ascii=False)
    inc.updated_at = datetime.now()
    await db.commit()
    return result
# ── 내부 알림 ─────────────────────────────────────────────────────────────────
async def _notify_incident(inc: Incident) -> None:
    """Send an urgent messenger alert for an incident (best effort).

    The target room comes from the ``MESSENGER_OPS_ROOM`` environment
    variable (default ``"ops"``). Any failure, including a failed import of
    the notifier, is logged as a warning and not raised.
    """
    try:
        import os
        from core.notify import send_messenger

        target_room = os.getenv("MESSENGER_OPS_ROOM", "ops")
        icons = {"P1": "🚨", "P2": "🔴", "P3": "🟠", "P4": "🟡"}
        icon = icons.get(inc.grade, "⚠️")
        lines = [
            f"{icon} [{inc.grade}] 장애 발생",
            f"장애번호: {inc.incident_id}",
            f"제목: {inc.title}",
            f"발생일시: {inc.occurred_at.strftime('%Y-%m-%d %H:%M')}",
            f"담당: {inc.assigned_to or '미지정'}",
            "즉시 확인 바랍니다.",
        ]
        body = {"type": "text", "text": "\n".join(lines)}
        await send_messenger(target_room, body)
    except Exception as exc:
        logger.warning("장애 알림 발송 실패: %s", exc)