diff --git a/main.py b/main.py index 8a23052..797d5e8 100644 --- a/main.py +++ b/main.py @@ -63,6 +63,7 @@ from routers import ( supply_chain_security, conversational_ops, ux_analytics, + sr_auto_review, ) @@ -503,6 +504,9 @@ from routers import patch_management, grc_automation app.include_router(patch_management.router) # 자율 패치 관리 (CVE 스캔·승인·SSH 실행·롤백) app.include_router(grc_automation.router) # GRC 자동화 (정책·리스크·컴플라이언스·감사) +# ── SR 접수 자동 리뷰 — tmux 스냅샷 + Ollama AI 분석 ───────────────────────── +app.include_router(sr_auto_review.router) # SR 접수 즉시: 서버 스냅샷 + 하네스 + AI 리뷰 + # ── 개방망 보안 헤더 미들웨어 ──────────────────────────────────────────────── @app.middleware("http") diff --git a/models.py b/models.py index 79d9001..38dbb11 100644 --- a/models.py +++ b/models.py @@ -226,6 +226,29 @@ class AuditLog(Base): sr_request = relationship("SRRequest", back_populates="audit_logs") +class SRAutoReview(Base): + """SR 접수 자동 리뷰 결과 — tmux 스냅샷 + Ollama AI 분석.""" + __tablename__ = "tb_sr_auto_review" + + id = Column(Integer, primary_key=True, index=True) + sr_id = Column(String(30), ForeignKey("tb_sr_request.sr_id"), + unique=True, nullable=False, index=True) + harness_name = Column(String(100)) + status = Column(String(20), default="pending") + # reviewing | completed | failed + summary = Column(Text) + root_cause = Column(Text) + recommended_actions = Column(Text) # JSON array + estimated_minutes = Column(Integer) + risk_level = Column(String(20)) # LOW|MEDIUM|HIGH|CRITICAL + similar_count = Column(Integer, default=0) + auto_resolvable = Column(Boolean, default=False) + server_snapshot = Column(Text) # JSON: 서버 상태 스냅샷 + tmux_session = Column(String(100)) # 엔지니어가 attach 할 세션명 + started_at = Column(DateTime) + completed_at = Column(DateTime) + + class OpsTask(Base): __tablename__ = "tb_ops_task" @@ -6881,4 +6904,232 @@ class PreventionAction(Base): success = Column(Boolean, default=False) created_at = Column(DateTime, default=func.now()) + +# 
══════════════════════════════════════════════════════════════════════════════ +# ── 5세대 확장 모델 (2026-06-06) +# ══════════════════════════════════════════════════════════════════════════════ + +# ── 통합 옵저버빌리티 (observability.py) ─────────────────────────────────────── + +class OtelTrace(Base): + __tablename__ = "tb_otel_trace" + id = Column(Integer, primary_key=True, index=True) + tenant_id = Column(String(50), nullable=False, index=True) + trace_id = Column(String(100), nullable=False, index=True) + service_name = Column(String(200)) + spans = Column(JSON) + duration_ms = Column(Float) + status = Column(String(20)) + root_cause = Column(Text, nullable=True) + created_at = Column(DateTime, default=func.now(), index=True) + + +class SLODefinition(Base): + __tablename__ = "tb_slo" + id = Column(Integer, primary_key=True, index=True) + tenant_id = Column(String(50), nullable=False, index=True) + service_name = Column(String(200)) + metric_type = Column(String(50)) + target_pct = Column(Float, default=99.9) + window_days = Column(Integer, default=30) + current_value = Column(Float, nullable=True) + error_budget_pct = Column(Float, nullable=True) + created_at = Column(DateTime, default=func.now()) + + +class SignalLink(Base): + __tablename__ = "tb_signal_link" + id = Column(Integer, primary_key=True, index=True) + tenant_id = Column(String(50), nullable=False, index=True) + trace_id = Column(String(100), nullable=True, index=True) + metric_ref = Column(String(200), nullable=True) + log_ref = Column(String(200), nullable=True) + service_name = Column(String(200), nullable=True) + correlation = Column(JSON, nullable=True) + created_at = Column(DateTime, default=func.now(), index=True) + + +# ── 레거시 현대화 (legacy_modernization.py) ──────────────────────────────────── + +class LegacySystem(Base): + __tablename__ = "tb_legacy_system" + id = Column(Integer, primary_key=True, index=True) + tenant_id = Column(String(50), nullable=False, index=True) + name = Column(String(200), 
nullable=False) + os_name = Column(String(100)) + os_version = Column(String(50)) + middleware = Column(String(200)) + eol_date = Column(Date, nullable=True) + risk_score = Column(Float, default=0.0) + migration_strategy = Column(String(50)) + migration_status = Column(String(50), default="NOT_STARTED") + tech_debt_score = Column(Float, default=0.0) + notes = Column(Text) + created_at = Column(DateTime, default=func.now()) + updated_at = Column(DateTime, default=func.now(), onupdate=func.now()) + + +# ── AI-SOC 보안운영센터 (ai_soc.py) ─────────────────────────────────────────── + +class SecurityEvent(Base): + __tablename__ = "tb_soc_event" + id = Column(Integer, primary_key=True, index=True) + tenant_id = Column(String(50), nullable=False, index=True) + event_type = Column(String(100)) + source_ip = Column(String(50)) + severity = Column(String(20)) + description = Column(Text) + raw_log = Column(Text) + status = Column(String(30), default="OPEN") + correlation_id = Column(String(100)) + created_at = Column(DateTime, default=func.now(), index=True) + + +class SOARPlaybook(Base): + __tablename__ = "tb_soar_playbook" + id = Column(Integer, primary_key=True, index=True) + tenant_id = Column(String(50), nullable=False, index=True) + name = Column(String(200)) + trigger_condition = Column(JSON) + actions = Column(JSON) + is_active = Column(Boolean, default=True) + run_count = Column(Integer, default=0) + created_at = Column(DateTime, default=func.now()) + + +class ThreatIntel(Base): + __tablename__ = "tb_threat_intel" + id = Column(Integer, primary_key=True, index=True) + tenant_id = Column(String(50), nullable=False, index=True) + ioc_type = Column(String(30)) + ioc_value = Column(String(500)) + threat_type = Column(String(100)) + confidence = Column(Float, default=0.5) + source = Column(String(200)) + expires_at = Column(DateTime, nullable=True) + created_at = Column(DateTime, default=func.now()) + + +# ── 시민 접점 포털 (citizen_portal.py) 
──────────────────────────────────────── + +class CitizenReport(Base): + __tablename__ = "tb_citizen_report" + id = Column(Integer, primary_key=True, index=True) + tenant_id = Column(String(50), nullable=False, index=True) + ticket_id = Column(String(50), unique=True, nullable=False) + reporter_name = Column(String(100)) + reporter_contact = Column(String(100)) + issue_description = Column(Text) + location = Column(String(200)) + photo_url = Column(String(500)) + status = Column(String(30), default="RECEIVED") + sr_id = Column(Integer, nullable=True) + satisfaction_score = Column(Integer, nullable=True) + created_at = Column(DateTime, default=func.now()) + updated_at = Column(DateTime, default=func.now(), onupdate=func.now()) + + +# ── 데이터 거버넌스 (data_governance.py) ────────────────────────────────────── + +class DataLineage(Base): + __tablename__ = "tb_data_lineage" + id = Column(Integer, primary_key=True, index=True) + tenant_id = Column(String(50), nullable=False, index=True) + source_system = Column(String(200)) + source_table = Column(String(200)) + target_system = Column(String(200)) + target_table = Column(String(200)) + transformation = Column(Text) + pii_involved = Column(Boolean, default=False) + created_at = Column(DateTime, default=func.now()) + + +class DataRetentionPolicy(Base): + __tablename__ = "tb_data_retention" + id = Column(Integer, primary_key=True, index=True) + tenant_id = Column(String(50), nullable=False, index=True) + table_name = Column(String(200)) + retention_days = Column(Integer) + action = Column(String(30), default="DELETE") + last_enforced = Column(DateTime, nullable=True) + is_active = Column(Boolean, default=True) + created_at = Column(DateTime, default=func.now()) + + +class PIIScanResult(Base): + __tablename__ = "tb_pii_scan" + id = Column(Integer, primary_key=True, index=True) + tenant_id = Column(String(50), nullable=False, index=True) + scan_target = Column(String(500)) + pii_types_found = Column(JSON) + match_count = 
Column(Integer, default=0) + risk_level = Column(String(20)) + created_at = Column(DateTime, default=func.now()) + + +# ── 하네스 빌더 (harness_builder.py) ────────────────────────────────────────── + +class HarnessAgent(Base): + __tablename__ = "tb_harness_agent" + id = Column(Integer, primary_key=True, index=True) + tenant_id = Column(String(50), nullable=False, index=True) + name = Column(String(200), nullable=False) + role = Column(String(200)) + skills = Column(JSON) + trigger_keywords = Column(JSON) + system_prompt = Column(Text) + is_active = Column(Boolean, default=True) + run_count = Column(Integer, default=0) + last_run_at = Column(DateTime, nullable=True) + created_at = Column(DateTime, default=func.now()) + + +class HarnessRunHistory(Base): + __tablename__ = "tb_harness_run" + id = Column(Integer, primary_key=True, index=True) + tenant_id = Column(String(50), nullable=False, index=True) + agent_id = Column(Integer, ForeignKey("tb_harness_agent.id"), index=True) + prompt = Column(Text) + result = Column(Text) + status = Column(String(30), default="SUCCESS") + duration_ms = Column(Integer, default=0) + run_by = Column(String(100)) + created_at = Column(DateTime, default=func.now()) + + +class HarnessSkill(Base): + __tablename__ = "tb_harness_skill" + id = Column(Integer, primary_key=True, index=True) + tenant_id = Column(String(50), nullable=False, index=True) + name = Column(String(200)) + skill_type = Column(String(50)) + content_md = Column(Text) + is_active = Column(Boolean, default=True) + created_at = Column(DateTime, default=func.now()) + + +# ── tmux 세션 관리 (tmux_sessions.py) ───────────────────────────────────────── + +class TmuxSession(Base): + __tablename__ = "tb_tmux_session" + id = Column(Integer, primary_key=True, index=True) + tenant_id = Column(String(50), nullable=False, index=True) + server_id = Column(Integer, nullable=True) + session_name = Column(String(200)) + status = Column(String(30), default="ACTIVE") + owner = Column(String(100)) + 
class TmuxCommand(Base):
    """A single command recorded against a tmux session row.

    Stores the command text, who sent it (``sent_by``) and when. Rows are
    linked to ``tb_tmux_session`` through ``session_id``; there is no
    ``tenant_id`` column on this table.
    """
    __tablename__ = "tb_tmux_command"

    id = Column(Integer, primary_key=True, index=True)
    session_id = Column(Integer, ForeignKey("tb_tmux_session.id"), index=True)
    command = Column(Text)
    sent_by = Column(String(100))
    created_at = Column(DateTime, default=func.now())


# The new model section was inserted in the middle of PreventionAction, which
# left that class's trailing
#     signal = relationship("FailureSignal", foreign_keys=[signal_id])
# inside TmuxCommand. TmuxCommand has no ``signal_id``, so the class body
# raised NameError at import time. The relationship is re-attached to its real
# owner here.
# NOTE(review): the cleaner fix is to move the line back into the
# PreventionAction class body, directly after its ``created_at`` column.
PreventionAction.signal = relationship(
    "FailureSignal", foreign_keys=[PreventionAction.signal_id]
)
@router.post("/events", summary="보안 이벤트 수집")
async def ingest_event(
    body: EventIn,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Persist a security event for the caller's tenant.

    If ``body.source_ip`` matches an unexpired ``IP`` indicator in the tenant's
    threat intelligence, the stored severity is forced to ``CRITICAL`` and the
    matched threat type is echoed back.

    Returns:
        dict with the new event id, the stored severity, the matched threat
        type (or ``None``) and a status message.

    Raises:
        HTTPException(400): ``body.severity`` is not one of the known levels.
    """
    tid = _tenant(current_user)
    valid_severities = {"CRITICAL", "HIGH", "MEDIUM", "LOW"}
    if body.severity not in valid_severities:
        raise HTTPException(400, f"severity는 {valid_severities} 중 하나여야 합니다")

    # Work on a local copy instead of mutating the validated request model.
    severity = body.severity
    matched_threat = None
    if body.source_ip:
        now = datetime.utcnow()
        # Nothing prevents the same IP from being registered several times, so
        # scalar_one_or_none() could raise MultipleResultsFound. Take the
        # highest-confidence live indicator instead. Expired indicators are
        # skipped, matching the "active" definition used by list_threats.
        matched = (await db.execute(
            select(ThreatIntel).where(
                ThreatIntel.tenant_id == tid,
                ThreatIntel.ioc_type == "IP",
                ThreatIntel.ioc_value == body.source_ip,
                ThreatIntel.expires_at.is_(None) | (ThreatIntel.expires_at > now),
            ).order_by(ThreatIntel.confidence.desc()).limit(1)
        )).scalars().first()
        if matched:
            matched_threat = matched.threat_type
            severity = "CRITICAL"

    event = SecurityEvent(
        tenant_id=tid,
        event_type=body.event_type,
        source_ip=body.source_ip,
        severity=severity,
        description=body.description,
        raw_log=body.raw_log,
    )
    db.add(event)
    await db.commit()
    await db.refresh(event)
    return {
        "id": event.id,
        "severity": event.severity,
        "threat_matched": matched_threat,
        "message": "이벤트 수집 완료",
    }
@router.post("/correlate", summary="AI 이벤트 상관분석")
async def correlate_events(
    hours: int = Query(6, ge=1, le=72),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Group the tenant's recent security events by source IP and ask the LLM for an analysis.

    At most the 100 newest events inside the window are considered. Only
    groups with more than one event count as correlated: they are summarised
    in the prompt, returned in ``correlation_groups``, and their events are
    stamped with a ``corr-<ip>`` ``correlation_id``.

    Returns:
        dict with ``event_count``, ``correlation_groups`` and ``ai_analysis``,
        or an empty-group message when the window holds no events.
    """
    tid = _tenant(current_user)
    since = datetime.utcnow() - timedelta(hours=hours)
    events = (await db.execute(
        select(SecurityEvent).where(
            SecurityEvent.tenant_id == tid,
            SecurityEvent.created_at >= since,
        )
        # Without an ORDER BY the 100-row cap picks an arbitrary subset.
        .order_by(SecurityEvent.created_at.desc())
        .limit(100)
    )).scalars().all()

    if not events:
        return {"correlation_groups": [], "message": "분석할 이벤트 없음"}

    # Rule-based correlation: bucket events by source IP.
    ip_groups: dict[str, list] = {}
    for e in events:
        key = e.source_ip or "unknown"
        ip_groups.setdefault(key, []).append({"id": e.id, "type": e.event_type, "severity": e.severity})

    # A bucket is a correlation only when it holds at least two events.
    correlated = {ip: evs for ip, evs in ip_groups.items() if len(evs) > 1}

    summary = "\n".join(
        f"- IP {ip}: {len(evs)}건 ({', '.join(set(e['type'] for e in evs))})"
        for ip, evs in correlated.items()
    )
    prompt = (
        f"보안 이벤트 상관분석 ({len(events)}건, 최근 {hours}시간):\n{summary or '단일 이벤트'}\n"
        f"공격 패턴, 캠페인 연관성, 권고 조치를 분석하시오."
    )
    analysis = await _ollama(prompt)

    # The previous check (`ip_groups.get(ip, [])`) was always truthy, so even
    # single-event IPs were tagged. Tag only members of real groups, so the
    # stored ids agree with the groups reported below.
    for e in events:
        if e.source_ip and e.source_ip in correlated:
            e.correlation_id = f"corr-{e.source_ip.replace('.', '-')}"
    await db.commit()

    return {
        "event_count": len(events),
        "correlation_groups": [
            {"source_ip": ip, "event_count": len(evs), "events": evs}
            for ip, evs in correlated.items()
        ],
        "ai_analysis": analysis,
    }
@router.post("/playbooks/{playbook_id}/run", summary="플레이북 자동 실행")
async def run_playbook(
    playbook_id: int,
    event_id: Optional[int] = None,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Record a run of one of the tenant's SOAR playbooks.

    Each configured action is reported as ``EXECUTED`` with a timestamp; this
    handler only builds that report and increments ``run_count``. It does not
    dispatch the actions anywhere.

    Raises:
        HTTPException(404): no playbook with that id exists for the tenant.
    """
    tenant_id = _tenant(current_user)
    lookup = select(SOARPlaybook).where(
        SOARPlaybook.tenant_id == tenant_id,
        SOARPlaybook.id == playbook_id,
    )
    playbook = (await db.execute(lookup)).scalar_one_or_none()
    if not playbook:
        raise HTTPException(404, "플레이북을 찾을 수 없습니다")

    # One report entry per configured action; a missing "type" reads "unknown".
    outcomes = [
        {
            "action": step.get("type", "unknown"),
            "status": "EXECUTED",
            "timestamp": datetime.utcnow().isoformat(),
        }
        for step in (playbook.actions or [])
    ]

    playbook.run_count = (playbook.run_count or 0) + 1
    await db.commit()

    return {
        "playbook_id": playbook_id,
        "playbook_name": playbook.name,
        "event_id": event_id,
        "actions_executed": len(outcomes),
        "results": outcomes,
        "run_count": playbook.run_count,
    }
SecurityEvent.tenant_id == tid, + SecurityEvent.correlation_id == incident_id, + ).order_by(SecurityEvent.created_at.asc()) + )).scalars().all() + + if not events: + raise HTTPException(404, "해당 인시던트 이벤트를 찾을 수 없습니다") + + timeline = [ + { + "timestamp": e.created_at.isoformat(), + "event_type": e.event_type, + "source_ip": e.source_ip, + "severity": e.severity, + "description": e.description, + } + for e in events + ] + + summary_text = "\n".join( + f"{t['timestamp']}: {t['event_type']} ({t['severity']})" for t in timeline + ) + prompt = f"보안 인시던트 타임라인 재구성:\n{summary_text}\n공격 흐름을 단계별로 설명하고 대응 조치를 제시하시오." + narrative = await _ollama(prompt) + + return { + "incident_id": incident_id, + "event_count": len(events), + "timeline": timeline, + "ai_narrative": narrative, + } + + +@router.post("/cve-impact", summary="CVE→자산 영향도 분석 + 패치 우선순위") +async def cve_impact( + body: CveImpactIn, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + # CVSS 기반 패치 우선순위 결정 + cvss = body.cvss_score + if cvss >= 9.0: + priority = "P1" + sla_days = 3 + elif cvss >= 7.0: + priority = "P2" + sla_days = 7 + elif cvss >= 4.0: + priority = "P3" + sla_days = 30 + else: + priority = "P4" + sla_days = 90 + + prompt = ( + f"CVE 영향도 분석:\n" + f"- CVE ID: {body.cve_id}\n" + f"- CVSS 점수: {body.cvss_score}\n" + f"- 영향 소프트웨어: {body.affected_software or '미지정'}\n" + f"영향받는 시스템 유형, 익스플로잇 가능성, 패치 적용 방법을 간략히 설명하시오." 
@router.get("/dashboard", summary="SOC 대시보드")
async def soc_dashboard(
    hours: int = Query(24, ge=1, le=720),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Summarise the tenant's security events over the last ``hours`` hours.

    Returns:
        dict with the window size, total event count, per-severity and
        per-status counts, and the number of threat-intel entries that are
        still active (no expiry, or expiry in the future).
    """
    # Local import: this module's header only imports `select` from sqlalchemy.
    from sqlalchemy import func

    tid = _tenant(current_user)
    now = datetime.utcnow()
    since = now - timedelta(hours=hours)

    # Only the two aggregated columns are needed, not full ORM entities.
    rows = (await db.execute(
        select(SecurityEvent.severity, SecurityEvent.status).where(
            SecurityEvent.tenant_id == tid,
            SecurityEvent.created_at >= since,
        )
    )).all()

    by_severity: dict[str, int] = {}
    by_status: dict[str, int] = {}
    for severity, status in rows:
        by_severity[severity] = by_severity.get(severity, 0) + 1
        by_status[status] = by_status.get(status, 0) + 1

    # "Active" previously meant every intel row, expired ones included, and
    # all rows were loaded just to take len(). Count in SQL with the same
    # activity rule that list_threats applies.
    active_threats = (await db.execute(
        select(func.count()).select_from(ThreatIntel).where(
            ThreatIntel.tenant_id == tid,
            ThreatIntel.expires_at.is_(None) | (ThreatIntel.expires_at > now),
        )
    )).scalar_one()

    return {
        "period_hours": hours,
        "total_events": len(rows),
        "by_severity": by_severity,
        "by_status": by_status,
        "active_threats": active_threats,
    }
def _calc_risk(system: LegacySystem) -> float:
    """Return a 0–100 risk score for a legacy system.

    The score is the sum of three parts:

    * EOL proximity: 40 if already past EOL, 30 under 90 days, 20 under one
      year, 10 under two years, otherwise 0 (also 0 when no EOL date is set).
    * Technical debt: 30% of ``tech_debt_score``, clamped to the range 0–30.
    * Migration not started: 10 when ``migration_status`` is ``NOT_STARTED``.

    Args:
        system: object exposing ``eol_date``, ``tech_debt_score`` and
            ``migration_status``.
    """
    score = 0.0

    if system.eol_date:
        days_to_eol = (system.eol_date - date.today()).days
        # Ordered from most to least urgent; the first matching tier wins.
        for upper_bound, points in ((0, 40.0), (90, 30.0), (365, 20.0), (730, 10.0)):
            if days_to_eol < upper_bound:
                score += points
                break

    if system.tech_debt_score:
        # Clamp at zero as well: the input is not validated, and a negative
        # debt value used to reduce the total, even below zero.
        score += min(max(system.tech_debt_score * 0.3, 0.0), 30.0)

    if system.migration_status == "NOT_STARTED":
        score += 10.0

    return min(score, 100.0)
@router.post("/systems", summary="레거시 시스템 등록")
async def create_system(
    body: LegacySystemIn,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Register a legacy system for the caller's tenant and store its initial risk score.

    Returns:
        The persisted system serialised as ``LegacySystemOut``.
    """
    record = LegacySystem(tenant_id=_tenant(current_user), **body.model_dump())

    # Column defaults are applied at INSERT time, so on a freshly constructed
    # object ``migration_status`` is still None. Set the initial status
    # explicitly, otherwise the first score misses the NOT_STARTED component
    # and disagrees with what evaluate_risk computes for the same row later.
    if record.migration_status is None:
        record.migration_status = "NOT_STARTED"
    record.risk_score = _calc_risk(record)

    db.add(record)
    await db.commit()
    await db.refresh(record)
    return LegacySystemOut.model_validate(record)
@router.post("/migration-plan", summary="마이그레이션 로드맵 AI 자동 생성")
async def create_migration_plan(
    body: MigrationPlanIn,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Ask the LLM for a migration roadmap covering the requested systems.

    Only systems owned by the caller's tenant are considered; they are listed
    in the prompt in descending ``risk_score`` order.

    Returns:
        dict with the requested timeline, the systems included and the
        LLM-generated roadmap text.

    Raises:
        HTTPException(404): none of ``body.system_ids`` belong to the tenant.
    """
    tid = _tenant(current_user)
    systems = (await db.execute(
        select(LegacySystem).where(
            LegacySystem.tenant_id == tid,
            LegacySystem.id.in_(body.system_ids),
        ).order_by(LegacySystem.risk_score.desc())
    )).scalars().all()

    if not systems:
        raise HTTPException(404, "해당 시스템을 찾을 수 없습니다")

    # risk_score is nullable (LegacySystemOut declares it Optional), and
    # formatting None with ":.0f" raises TypeError, so treat NULL as 0.
    summary = "\n".join(
        f"- {s.name} (OS:{s.os_name} {s.os_version}, 전략:{s.migration_strategy}, 위험:{(s.risk_score or 0.0):.0f})"
        for s in systems
    )
    prompt = (
        f"레거시 시스템 마이그레이션 로드맵 ({body.timeline_months}개월):\n{summary}\n"
        f"단계별 마이그레이션 계획, 우선순위, 예상 공수를 JSON 형식으로 제시하시오."
    )
    plan = await _ollama(prompt)

    return {
        "timeline_months": body.timeline_months,
        "systems": [{"id": s.id, "name": s.name, "risk_score": s.risk_score} for s in systems],
        "roadmap": plan,
    }
@router.get("/eol-alerts", summary="EOL 임박 시스템 알림 목록")
async def eol_alerts(
    days_threshold: int = Query(365, ge=1, le=3650),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """List the tenant's systems whose EOL date is within ``days_threshold`` days.

    Systems without an EOL date are ignored. Each alert carries an urgency of
    ``EXPIRED`` (already past EOL), ``CRITICAL`` (under 90 days left) or
    ``WARNING``; alerts are returned soonest-first.
    """
    def _urgency(remaining: int) -> str:
        # Map days remaining onto the three urgency labels.
        if remaining < 0:
            return "EXPIRED"
        if remaining < 90:
            return "CRITICAL"
        return "WARNING"

    tenant_id = _tenant(current_user)
    stmt = select(LegacySystem).where(LegacySystem.tenant_id == tenant_id)
    systems = (await db.execute(stmt)).scalars().all()

    reference_day = date.today()
    pending = []
    for system in systems:
        if not system.eol_date:
            continue
        remaining = (system.eol_date - reference_day).days
        if remaining > days_threshold:
            continue
        pending.append({
            "id": system.id,
            "name": system.name,
            "eol_date": str(system.eol_date),
            "days_remaining": remaining,
            "urgency": _urgency(remaining),
        })

    ordered = sorted(pending, key=lambda item: item["days_remaining"])
    return {"alert_count": len(ordered), "alerts": ordered}
@router.put("/systems/{system_id}/status", summary="마이그레이션 단계 상태 업데이트")
async def update_status(
    system_id: int,
    body: StatusUpdateIn,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Set the migration status of one of the tenant's legacy systems.

    Raises:
        HTTPException(400): the requested status is not a known stage.
        HTTPException(404): the system does not exist for this tenant.
    """
    valid_statuses = {"NOT_STARTED", "PLANNING", "IN_PROGRESS", "TESTING", "COMPLETED", "CANCELLED"}
    requested = body.migration_status
    # Reject unknown stages before touching the database.
    if requested not in valid_statuses:
        raise HTTPException(400, f"유효한 상태: {valid_statuses}")

    tenant_id = _tenant(current_user)
    lookup = select(LegacySystem).where(
        LegacySystem.tenant_id == tenant_id,
        LegacySystem.id == system_id,
    )
    target = (await db.execute(lookup)).scalar_one_or_none()
    if not target:
        raise HTTPException(404, "시스템을 찾을 수 없습니다")

    target.migration_status = requested
    await db.commit()
    return {"id": system_id, "migration_status": body.migration_status, "message": "상태 업데이트 완료"}
"prompt": prompt, "stream": False}, + ) + return r.json().get("response", "분석 결과 없음") + except Exception: + return "AI 분석 불가 (Ollama 연결 실패)" + + +# ── Pydantic 스키마 ──────────────────────────────────────────────────────────── + +class TraceIn(BaseModel): + trace_id: str + service_name: Optional[str] = None + spans: list[dict[str, Any]] = [] + status: Optional[str] = "OK" + +class TraceOut(BaseModel): + model_config = {"from_attributes": True} + id: int + trace_id: str + service_name: Optional[str] + duration_ms: Optional[float] + status: Optional[str] + created_at: datetime + +class SLOIn(BaseModel): + service_name: str + metric_type: str # availability | latency | error_rate + target_pct: float = 99.9 + window_days: int = 30 + +class SLOOut(BaseModel): + model_config = {"from_attributes": True} + id: int + service_name: Optional[str] + metric_type: Optional[str] + target_pct: Optional[float] + window_days: Optional[int] + current_value: Optional[float] + error_budget_pct: Optional[float] + created_at: datetime + +class SignalLinkIn(BaseModel): + trace_id: Optional[str] = None + metric_ref: Optional[str] = None + log_ref: Optional[str] = None + service_name: Optional[str] = None + correlation: Optional[dict] = None + + +# ── 엔드포인트 ───────────────────────────────────────────────────────────────── + +@router.post("/traces", summary="분산 트레이스 수집") +async def ingest_trace( + body: TraceIn, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + tid = _tenant(current_user) + duration_ms = None + if body.spans: + starts = [s.get("start_time_unix_nano", 0) for s in body.spans] + ends = [s.get("end_time_unix_nano", 0) for s in body.spans] + if starts and ends: + duration_ms = (max(ends) - min(starts)) / 1_000_000 + + trace = OtelTrace( + tenant_id=tid, + trace_id=body.trace_id, + service_name=body.service_name, + spans=body.spans, + duration_ms=duration_ms, + status=body.status, + ) + db.add(trace) + await db.commit() + await 
def _build_span_tree(spans: list) -> list:
    """Nest *spans* under their parents and return the list of root nodes.

    Every span that has a ``span_id`` is copied into a node with an added
    ``children`` list. A span becomes a root when it has no
    ``parent_span_id``, when its parent is not part of *spans* (orphan), or
    when it names itself as parent.
    """
    nodes: dict[str, Any] = {}
    for span in spans:
        span_id = span.get("span_id")
        if span_id:
            nodes[span_id] = {**span, "children": []}

    roots = []
    for span in spans:
        span_id = span.get("span_id", "")
        parent_id = span.get("parent_span_id")
        node = nodes.get(span_id, span)
        # parent_id != span_id prevents a node from being appended to its own
        # children, which would make the response structure cyclic.
        if parent_id and parent_id != span_id and parent_id in nodes:
            nodes[parent_id]["children"].append(node)
        else:
            # Parentless spans and orphans both surface at the top level so
            # that no stored span silently disappears from the response.
            roots.append(node)
    return roots


@router.get("/traces/{trace_id}", summary="트레이스 상세 (스팬 트리)")
async def get_trace(
    trace_id: str,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return one trace of the caller's tenant with its spans as a tree.

    Raises:
        HTTPException: 404 when the tenant has no trace with this ``trace_id``.
    """
    tid = _tenant(current_user)
    # The same trace_id can be stored more than once (ingestion always inserts
    # a new row), so pick the newest row instead of scalar_one_or_none(),
    # which raises MultipleResultsFound on duplicates.
    row = (await db.execute(
        select(OtelTrace)
        .where(OtelTrace.tenant_id == tid, OtelTrace.trace_id == trace_id)
        .order_by(OtelTrace.created_at.desc())
        .limit(1)
    )).scalars().first()
    if not row:
        raise HTTPException(404, "트레이스를 찾을 수 없습니다")

    return {
        "trace_id": row.trace_id,
        "service_name": row.service_name,
        "duration_ms": row.duration_ms,
        "status": row.status,
        "span_tree": _build_span_tree(row.spans or []),
    }
@router.post("/slo", summary="SLO 정의 등록")
async def create_slo(
    body: SLOIn,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Register an SLO definition for the caller's tenant.

    Raises:
        HTTPException: 400 when ``metric_type`` is not one of the supported
            values, when ``target_pct`` is outside ``(0, 100]`` or when
            ``window_days`` is smaller than 1.
    """
    valid_types = {"availability", "latency", "error_rate"}
    if body.metric_type not in valid_types:
        raise HTTPException(400, f"metric_type은 {valid_types} 중 하나여야 합니다")
    # A target outside (0, 100] gives a non-positive or meaningless error
    # budget (100 - target_pct) in later budget calculations. The chained
    # comparison also rejects NaN.
    if not 0 < body.target_pct <= 100:
        raise HTTPException(400, "target_pct는 0 초과 100 이하여야 합니다")
    if body.window_days < 1:
        raise HTTPException(400, "window_days는 1 이상이어야 합니다")

    tid = _tenant(current_user)
    slo = SLODefinition(
        tenant_id=tid,
        service_name=body.service_name,
        metric_type=body.metric_type,
        target_pct=body.target_pct,
        window_days=body.window_days,
    )
    db.add(slo)
    await db.commit()
    # Reload so DB-generated values (id, created_at) are present in the reply.
    await db.refresh(slo)
    return SLOOut.model_validate(slo)
@router.get("/slo/{slo_id}/budget", summary="에러 예산 소진율 조회")
async def slo_budget(
    slo_id: int,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Compute how much of an SLO's error budget has been consumed.

    The budget is ``100 - target_pct``; the consumed share is the actual error
    (``100 - current_value``) relative to that budget. When no value has been
    measured yet the SLO is treated as exactly on target. The remaining budget
    is also written back to ``error_budget_pct`` and committed, so this GET
    has a persistent side effect.

    Raises:
        HTTPException: 404 when the tenant has no SLO with this id.
    """
    tid = _tenant(current_user)
    slo = (await db.execute(
        select(SLODefinition).where(SLODefinition.tenant_id == tid, SLODefinition.id == slo_id)
    )).scalar_one_or_none()
    if not slo:
        raise HTTPException(404, "SLO를 찾을 수 없습니다")

    # Only None means "not measured". A measured 0.0 (total outage) must not
    # fall back to the target, which would report the SLO as healthy.
    current = slo.target_pct if slo.current_value is None else slo.current_value
    allowed_error = 100.0 - slo.target_pct
    actual_error = max(0.0, 100.0 - current)
    if allowed_error > 0:
        consumed_pct = actual_error / allowed_error * 100
    else:
        # Zero budget (100% target): any error at all exhausts it.
        consumed_pct = 100.0 if actual_error > 0 else 0.0
    remaining_pct = max(0.0, 100.0 - consumed_pct)

    slo.error_budget_pct = remaining_pct
    await db.commit()

    return {
        "slo_id": slo_id,
        "service_name": slo.service_name,
        "target_pct": slo.target_pct,
        "current_value": current,
        "error_budget_total_pct": allowed_error,
        "error_budget_consumed_pct": round(consumed_pct, 2),
        "error_budget_remaining_pct": round(remaining_pct, 2),
        "status": "HEALTHY" if remaining_pct > 25 else "WARNING" if remaining_pct > 0 else "EXHAUSTED",
    }
root_cause_analysis( + trace_id: str, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + tid = _tenant(current_user) + trace = (await db.execute( + select(OtelTrace).where(OtelTrace.tenant_id == tid, OtelTrace.trace_id == trace_id) + )).scalar_one_or_none() + if not trace: + raise HTTPException(404, "트레이스를 찾을 수 없습니다") + + if trace.root_cause: + return {"trace_id": trace_id, "root_cause": trace.root_cause, "cached": True} + + spans = trace.spans or [] + error_spans = [s for s in spans if s.get("status", {}).get("code") == 2] + slowest = max(spans, key=lambda s: s.get("end_time_unix_nano", 0) - s.get("start_time_unix_nano", 0), default={}) + + prompt = ( + f"OpenTelemetry 트레이스 분석:\n" + f"서비스: {trace.service_name}\n" + f"상태: {trace.status}\n" + f"총 스팬: {len(spans)}개\n" + f"오류 스팬: {len(error_spans)}개\n" + f"가장 느린 스팬: {slowest.get('name', 'unknown')}\n" + f"루트코즈와 해결 방법을 간단히 설명하시오." + ) + + analysis = await _ollama(prompt) + if "AI 분석 불가" in analysis: + if error_spans: + analysis = f"오류 스팬 감지: {error_spans[0].get('name', 'unknown')} — 해당 서비스의 오류 로그를 확인하세요." + else: + analysis = f"가장 느린 스팬 '{slowest.get('name', 'unknown')}' 성능 병목 가능성이 있습니다." + + trace.root_cause = analysis + await db.commit() + + return {"trace_id": trace_id, "root_cause": analysis, "cached": False} diff --git a/routers/sr_auto_review.py b/routers/sr_auto_review.py new file mode 100644 index 0000000..b6a9ee7 --- /dev/null +++ b/routers/sr_auto_review.py @@ -0,0 +1,429 @@ +""" +SR 접수 자동 리뷰 엔진 (SR Auto-Review) + +SR 생성 즉시 백그라운드로 기동: + 1. 관련 서버 조회 (CMDB) + 2. tmux 세션 생성 + 서버 상태 스냅샷 (paramiko → tmux) + 3. 하네스 선택 (SR 유형 기반) + 4. Ollama AI 리뷰 생성 + 5. 
async def _capture_server_snapshot(server: Server) -> dict:
    """
    Collect a read-only status snapshot of *server* over SSH.

    When tmux is available on the host, a detached session is created and each
    command in ``_SNAPSHOT_CMDS`` is typed into it, then the pane tail is
    captured. The session is left running so an engineer can later
    ``tmux attach`` to it; its name is returned under ``"tmux_session"``.
    Without tmux the commands are executed directly.

    The function never raises for SSH problems: failures are reported under
    the ``"error"`` key, and a command that fails individually is recorded as
    "수집 실패" under its own key.

    Returns:
        Mapping of snapshot key to captured output (truncated to 600
        characters), plus the optional ``"tmux_session"`` / ``"error"`` keys.
    """
    import time
    import paramiko

    session_name = f"sr-{int(time.time())}"

    # Read everything needed from the ORM object here, on the event-loop side,
    # so the worker thread below never touches the model instance.
    kw: dict = {
        "hostname": server.ip_addr,
        "port": server.port or 22,
        "username": server.ssh_user,
        "timeout": 15,
    }
    if server.ssh_method == "KEY" and server.ssh_key_path:
        kw["key_filename"] = server.ssh_key_path
    else:
        pw = _decrypt_pw(server.os_pw_enc or "")
        if not pw:
            return {"error": "자격증명 복호화 실패"}
        kw["password"] = pw

    def _collect() -> dict:
        """Blocking SSH work; runs in a worker thread."""
        snapshot: dict = {}
        client = paramiko.SSHClient()
        try:
            # NOTE(security): AutoAddPolicy trusts any unknown host key, which
            # leaves the connection (and the password sent over it) open to
            # man-in-the-middle attacks. Consider loading known host keys and
            # using RejectPolicy instead.
            client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            client.connect(**kw)

            # Detect whether tmux is installed.
            _, stdout, _ = client.exec_command(
                "which tmux 2>/dev/null && echo HAS_TMUX || echo NO_TMUX"
            )
            has_tmux = "HAS_TMUX" in stdout.read().decode()

            if has_tmux:
                client.exec_command(f"tmux new-session -d -s '{session_name}' 2>/dev/null; true")
                time.sleep(0.3)  # give tmux a moment to create the session
                snapshot["tmux_session"] = session_name

            for key, cmd in _SNAPSHOT_CMDS.items():
                try:
                    if has_tmux:
                        # Type the command into the tmux session so it stays
                        # in the session history for the engineer.
                        safe_cmd = cmd.replace("'", "'\\''")
                        client.exec_command(f"tmux send-keys -t '{session_name}' '{safe_cmd}' Enter")
                        time.sleep(0.8)  # let the command produce its output
                        _, out, _ = client.exec_command(
                            f"tmux capture-pane -p -t '{session_name}' | tail -20"
                        )
                    else:
                        _, out, _ = client.exec_command(cmd, timeout=10)
                    output = out.read().decode(errors="replace").strip()
                    snapshot[key] = output[:600]
                except Exception:
                    # One failing command must not abort the whole snapshot.
                    snapshot[key] = "수집 실패"
        except Exception as e:
            snapshot["error"] = f"SSH 실패: {type(e).__name__}: {e}"
            log.debug("SR 리뷰 스냅샷 오류: %s", e)
        finally:
            # Always release the transport, including on connect/exec errors.
            client.close()
        return snapshot

    # paramiko is synchronous; running it directly inside this coroutine would
    # stall the event loop for the whole SSH exchange.
    return await asyncio.get_running_loop().run_in_executor(None, _collect)
_RISK_LEVELS = ("LOW", "MEDIUM", "HIGH", "CRITICAL")


def _fallback_review() -> dict:
    """Return the placeholder review used when no AI review could be produced."""
    return {
        "summary": "자동 리뷰 생성 실패 — 수동 검토 필요",
        "root_cause": "알 수 없음",
        "recommended_actions": ["담당자 직접 확인"],
        "estimated_minutes": 60,
        "risk_level": "MEDIUM",
        "auto_resolvable": False,
    }


def _first_json_object(text: str) -> Optional[dict]:
    """Return the first decodable JSON object embedded in *text*, or None."""
    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            value, _ = decoder.raw_decode(text[start:])
        except ValueError:
            value = None
        if isinstance(value, dict):
            return value
        start = text.find("{", start + 1)
    return None


def _normalize_review(data: dict) -> dict:
    """Coerce a model-produced review into the types the caller stores."""
    review = {
        "summary": "",
        "root_cause": "",
        "recommended_actions": [],
        "estimated_minutes": 60,
        "risk_level": "MEDIUM",
        "auto_resolvable": False,
    }

    for key in ("summary", "root_cause"):
        value = data.get(key)
        if isinstance(value, str):
            review[key] = value.strip()

    actions = data.get("recommended_actions")
    if isinstance(actions, str):
        actions = [actions]
    if isinstance(actions, list):
        review["recommended_actions"] = [
            str(a).strip() for a in actions if str(a).strip()
        ]

    minutes = data.get("estimated_minutes")
    # bool is an int subclass; the upper bound also rejects inf (NaN fails
    # both comparisons).
    if (
        isinstance(minutes, (int, float))
        and not isinstance(minutes, bool)
        and 0 <= minutes < 1_000_000
    ):
        review["estimated_minutes"] = int(minutes)

    risk = data.get("risk_level")
    if isinstance(risk, str) and risk.strip().upper() in _RISK_LEVELS:
        review["risk_level"] = risk.strip().upper()

    if isinstance(data.get("auto_resolvable"), bool):
        review["auto_resolvable"] = data["auto_resolvable"]

    return review


async def _generate_ai_review(sr: SRRequest, snapshot: dict, similar: list) -> dict:
    """Ask Ollama for a structured review of *sr* and return it as a dict.

    The prompt combines the SR fields, the server snapshot (without its
    ``tmux_session`` / ``error`` keys) and up to three similar SRs. The reply
    is searched for a JSON object, which is then normalized so that every
    expected key is present with a usable type; the SR text is user-supplied,
    so the model output is not trusted as-is.

    Never raises: on any HTTP, parsing or validation problem a fixed
    "manual review required" placeholder is returned.

    Returns:
        Dict with ``summary``, ``root_cause``, ``recommended_actions``,
        ``estimated_minutes``, ``risk_level`` and ``auto_resolvable``.
    """
    similar_text = "\n".join(
        f"- [{s['sr_id']}] {s['title']}" for s in similar[:3]
    ) or "없음"

    snap_text = "\n".join(
        f"[{k}]\n{v}" for k, v in snapshot.items()
        if k not in ("tmux_session", "error")
    ) or "서버 정보 없음"

    prompt = f"""공공기관 IT 인프라 전문 엔지니어로서 SR을 분석하고 JSON만 반환하라.

SR:
- ID: {sr.sr_id}
- 유형: {sr.sr_type}
- 제목: {sr.title}
- 내용: {sr.description or '(없음)'}
- 우선순위: {sr.priority}
- 대상 서버: {sr.target_server or '미지정'}

서버 상태:
{snap_text}

유사 SR:
{similar_text}

JSON 형식으로만 응답 (다른 텍스트 없이):
{{
  "summary": "문제 요약 (1-2문장)",
  "root_cause": "추정 원인",
  "recommended_actions": ["조치1", "조치2", "조치3"],
  "estimated_minutes": 30,
  "risk_level": "LOW",
  "auto_resolvable": false
}}"""

    try:
        async with httpx.AsyncClient(timeout=90) as client:
            resp = await client.post(
                OLLAMA_URL,
                json={"model": OLLAMA_MODEL, "prompt": prompt, "stream": False},
            )
        # An HTTP error body must not be mistaken for a model reply.
        resp.raise_for_status()
        parsed = _first_json_object(resp.json().get("response", ""))
        if parsed is not None:
            return _normalize_review(parsed)
        log.warning("Ollama SR 리뷰 응답에 JSON 객체 없음: %s", sr.sr_id)
    except Exception as e:
        # Broad on purpose: the review is optional and must degrade to the
        # placeholder. Logged at warning level so failures are visible.
        log.warning("Ollama SR 리뷰 오류: %s", e)

    return _fallback_review()
+ """ + async with SessionLocal() as db: + try: + sr = (await db.execute( + select(SRRequest).where(SRRequest.sr_id == sr_id) + )).scalars().first() + if not sr: + return + + # 중복 방지 + if (await db.execute( + select(SRAutoReview).where(SRAutoReview.sr_id == sr_id) + )).scalars().first(): + return + + harness = _HARNESS_MAP.get(sr.sr_type or "OTHER", "general-ops") + + # 리뷰 레코드 초기 생성 + review = SRAutoReview( + sr_id=sr_id, + harness_name=harness, + status="reviewing", + started_at=datetime.now(), + ) + db.add(review) + await db.commit() + + except Exception as e: + log.exception("SR 리뷰 초기화 실패 %s: %s", sr_id, e) + return + + # ── Step 1: 관련 서버 조회 + tmux 스냅샷 + snapshot: dict = {} + async with SessionLocal() as db: + sr = (await db.execute( + select(SRRequest).where(SRRequest.sr_id == sr_id) + )).scalars().first() + if not sr: + return + + if sr.target_server: + srv = (await db.execute( + select(Server).where( + Server.server_name == sr.target_server, + Server.is_active == True, + ).limit(1) + )).scalars().first() + if srv: + snapshot = await _capture_server_snapshot(srv) + + similar = await _find_similar_srs(sr, db) + + # ── Step 2: Ollama AI 리뷰 생성 + async with SessionLocal() as db: + sr = (await db.execute( + select(SRRequest).where(SRRequest.sr_id == sr_id) + )).scalars().first() + if not sr: + return + ai = await _generate_ai_review(sr, snapshot, similar) + + # ── Step 3: 결과 저장 + try: + async with SessionLocal() as db: + rev = (await db.execute( + select(SRAutoReview).where(SRAutoReview.sr_id == sr_id) + )).scalars().first() + if rev: + rev.status = "completed" + rev.summary = ai.get("summary", "") + rev.root_cause = ai.get("root_cause", "") + rev.recommended_actions = json.dumps( + ai.get("recommended_actions", []), ensure_ascii=False + ) + rev.estimated_minutes = ai.get("estimated_minutes", 60) + rev.risk_level = ai.get("risk_level", "MEDIUM") + rev.similar_count = len(similar) + rev.auto_resolvable = ai.get("auto_resolvable", False) + rev.server_snapshot = 
@router.get("", response_model=List[SRReviewOut])
async def list_reviews(
    status: Optional[str] = None,
    risk_level: Optional[str] = None,
    skip: int = 0,
    limit: int = 50,
    db: AsyncSession = Depends(get_db),
    _u: User = Depends(get_current_user),
):
    """List auto-review records, newest ``started_at`` first.

    Args:
        status: When given, only reviews with exactly this status.
        risk_level: When given, only reviews with exactly this risk level.
        skip: Number of rows to skip; negative values are treated as 0.
        limit: Page size; clamped to the range 1-500.
    """
    # Clamp paging values instead of passing them straight to the database:
    # a negative OFFSET/LIMIT is likely rejected by the backend (TODO confirm
    # for the deployed database), and an unbounded LIMIT lets one request
    # load the whole table.
    skip = max(skip, 0)
    limit = max(1, min(limit, 500))

    q = select(SRAutoReview).order_by(SRAutoReview.started_at.desc())
    if status:
        q = q.where(SRAutoReview.status == status)
    if risk_level:
        q = q.where(SRAutoReview.risk_level == risk_level)
    q = q.offset(skip).limit(limit)
    return (await db.execute(q)).scalars().all()
# Strong references to in-flight review tasks. The event loop keeps only weak
# references to tasks, so a task nobody references may be garbage-collected
# before it finishes.
_REVIEW_TASKS: set = set()


@router.post("/{sr_id}/run", status_code=202)
async def trigger_review(
    sr_id: str,
    db: AsyncSession = Depends(get_db),
    _u: User = Depends(get_current_user),
):
    """Manually re-run the auto review: delete any stored result, then restart.

    The review itself runs as a background task; this endpoint returns
    immediately with 202.

    Raises:
        HTTPException: 404 when no SR with this id exists.
    """
    # Reject unknown SRs up front. Otherwise the background task would exit
    # silently while the caller is told that a re-run has started.
    known_sr = (await db.execute(
        select(SRRequest.sr_id).where(SRRequest.sr_id == sr_id)
    )).scalars().first()
    if known_sr is None:
        raise HTTPException(404, detail="SR을 찾을 수 없습니다")

    existing = (await db.execute(
        select(SRAutoReview).where(SRAutoReview.sr_id == sr_id)
    )).scalars().first()
    if existing:
        # The review run skips SRs that already have a record, so the old
        # record has to go before a new run can start.
        await db.delete(existing)
        await db.commit()

    task = asyncio.create_task(run_sr_review(sr_id))
    _REVIEW_TASKS.add(task)
    task.add_done_callback(_REVIEW_TASKS.discard)
    return {"message": f"SR {sr_id} 리뷰 재실행", "sr_id": sr_id}