""" B-4: KB 자동 업데이트 에이전트 엔진 기능: 1. 해결된 SR + 작업 로그에서 KB 후보 추출 2. Ollama LLM으로 문제/원인/해결책 구조화 추출 3. 유사 KB 중복 검사 후 자동 생성/업데이트 4. 키워드 기반 자동 분류 및 태그 생성 """ from __future__ import annotations import json import logging import re from datetime import datetime, timedelta from typing import Dict, List, Optional, Tuple import httpx from sqlalchemy import select, and_, or_, desc from sqlalchemy.ext.asyncio import AsyncSession logger = logging.getLogger(__name__) OLLAMA_URL = "http://localhost:11434/api/generate" DEFAULT_MODEL = "llama3" # ── KB 카테고리 키워드 맵 ───────────────────────────────────────────────────── _CATEGORY_KEYWORDS: Dict[str, List[str]] = { "서버 운영": ["서버", "CPU", "메모리", "디스크", "프로세스", "재기동", "OS"], "배포": ["배포", "deploy", "release", "빌드", "jar", "war", "소스"], "DB": ["데이터베이스", "DB", "쿼리", "SQL", "connection", "테이블", "lock"], "네트워크": ["네트워크", "방화벽", "포트", "연결", "timeout", "DNS", "SSL"], "보안": ["보안", "권한", "인증", "비밀번호", "접근", "인가", "취약점"], "장애 대응": ["장애", "오류", "에러", "다운", "중단", "복구", "롤백"], "WAS": ["톰캣", "tomcat", "jboss", "weblogic", "jeus", "WAS", "힙"], "웹서버": ["아파치", "apache", "nginx", "웹서버", "static"], "모니터링": ["모니터링", "알림", "임계값", "지표", "메트릭"], "일반": [], # 기본 카테고리 } def classify_category(title: str, description: str) -> str: """제목+설명 기반 KB 카테고리 분류.""" text = (title + " " + description).lower() best_cat = "일반" best_score = 0 for cat, keywords in _CATEGORY_KEYWORDS.items(): if not keywords: continue score = sum(1 for kw in keywords if kw.lower() in text) if score > best_score: best_score = score best_cat = cat return best_cat def extract_tags_rule(title: str, description: str, solution: str = "") -> List[str]: """규칙 기반 태그 추출.""" text = f"{title} {description} {solution}".lower() tags = set() # 기술 키워드 추출 tech_patterns = [ (r'\btomcat\b', "tomcat"), (r'\bnginx\b', "nginx"), (r'\bjboss\b', "jboss"), (r'\boracle\b', "oracle"), (r'\bmysql\b', "mysql"), (r'\bpostgresql\b', "postgresql"), (r'\bjava\b', "java"), (r'\bpython\b', "python"), (r'\blinux\b', "linux"), (r'\bwindows\b', "windows"), (r'\bdocker\b', "docker"), (r'\bkubernetes\b|\bk8s\b', "kubernetes"), (r'\bjvm\b|\b힙\b|\bheap\b', "jvm"), (r'\bssl\b|\btls\b', "ssl"), (r'\bcpu\b', "cpu"), (r'\b메모리\b|\bmemory\b', "memory"), (r'\b배포\b|\bdeploy\b', "deploy"), (r'\b장애\b', "장애"), (r'\b재기동\b|\brestart\b', "restart"), ] for pattern, tag in tech_patterns: if re.search(pattern, text): tags.add(tag) return sorted(list(tags))[:10] # 최대 10개 # ── Ollama KB 추출 ──────────────────────────────────────────────────────────── _KB_EXTRACT_PROMPT = """\ 다음 IT 서비스 요청(SR) 정보를 분석하여 지식 베이스(KB) 문서를 생성하라. 반드시 JSON만 반환하라. SR 제목: {title} SR 설명: {description} 해결 방법 / 작업 로그: {work_log} SR 유형: {sr_type} 다음 JSON을 반환하라: {{ "title": "KB 문서 제목 (짧고 명확하게)", "category": "카테고리", "symptom": "증상 요약 (2-3 문장)", "cause": "원인 분석 (2-3 문장)", "solution": "해결 방법 (단계별 설명)", "commands": ["관련 명령어 또는 쿼리 (없으면 빈 배열)"], "tags": ["태그1", "태그2"], "difficulty": "EASY | MEDIUM | HARD", "estimated_time": "예상 해결 시간 (예: 30분, 2시간)", "prevention": "재발 방지 방법 (1-2 문장)" }}""" async def extract_kb_with_llm( title: str, description: str, work_log: str, sr_type: str, model: str = DEFAULT_MODEL, timeout: int = 60, ) -> Optional[Dict]: """Ollama LLM으로 KB 지식 추출.""" prompt = _KB_EXTRACT_PROMPT.format( title=title, description=description[:500], work_log=work_log[:500] if work_log else "없음", sr_type=sr_type, ) try: async with httpx.AsyncClient(timeout=timeout) as client: resp = await client.post( OLLAMA_URL, json={"model": model, "prompt": prompt, "stream": False}, ) if resp.status_code != 200: return None raw = resp.json().get("response", "") start = raw.find("{") end = raw.rfind("}") + 1 if start >= 0 and end > start: return json.loads(raw[start:end]) except (httpx.ConnectError, httpx.TimeoutException): logger.debug("Ollama 미연결 — KB 규칙 기반 생성") except json.JSONDecodeError as e: logger.debug("LLM KB JSON 파싱 실패: %s", e) return None def extract_kb_rule( title: str, description: str, work_log: str, sr_type: str, ) -> Dict: """규칙 기반 KB 지식 추출 (Ollama 미연결 폴백).""" category = classify_category(title, description) tags = extract_tags_rule(title, description, work_log) # 작업 로그에서 명령어 패턴 추출 commands = [] if work_log: cmd_lines = re.findall(r'(?:^|\n)\s*[$#>]\s*(.+)', work_log) commands = [c.strip() for c in cmd_lines[:5] if c.strip()] type_map = { "DEPLOY": "배포 작업", "RESTART": "서비스 재기동", "LOG": "로그 분석", "OTHER": "운영 작업", } return { "title": f"[{type_map.get(sr_type, '운영')}] {title[:60]}", "category": category, "symptom": description[:300], "cause": "SR 처리 과정에서 자동 수집된 항목입니다.", "solution": work_log[:500] if work_log else description[:300], "commands": commands, "tags": tags, "difficulty": "MEDIUM", "estimated_time": "1시간", "prevention": "정기 점검 및 모니터링 임계값 설정 권장", } # ── 유사 KB 검색 ───────────────────────────────────────────────────────────── def _tokenize(text: str) -> set: """간단 토크나이징.""" if not text: return set() tokens = re.split(r'[\s,;:.()]+', text.lower()) stopwords = {"이", "가", "을", "를", "의", "에", "는", "은", "그", "및", "the", "a", "is"} return {t for t in tokens if len(t) >= 2 and t not in stopwords} def compute_similarity(text1: str, text2: str) -> float: """자카드 유사도 계산.""" t1 = _tokenize(text1) t2 = _tokenize(text2) if not t1 or not t2: return 0.0 intersection = t1 & t2 union = t1 | t2 return len(intersection) / len(union) async def find_similar_kb( db: AsyncSession, title: str, description: str, threshold: float = 0.3, ) -> Optional[int]: """유사 KB 문서 ID 반환 (없으면 None).""" from models import KBDocument # 최근 200개 KB 검색 rows = (await db.execute( select(KBDocument).order_by(desc(KBDocument.created_at)).limit(200) )).scalars().all() best_sim = 0.0 best_id = None query_text = f"{title} {description}" for doc in rows: doc_text = f"{doc.title} {doc.symptoms or ''}" sim = compute_similarity(query_text, doc_text) if sim > best_sim: best_sim = sim best_id = doc.id if best_sim >= threshold: return best_id return None # ── KB 자동 생성 ────────────────────────────────────────────────────────────── async def auto_create_kb_from_sr( db: AsyncSession, sr_id: str, use_llm: bool = True, model: str = DEFAULT_MODEL, ) -> Optional[Dict]: """ 해결된 SR에서 KB 문서 자동 생성. Returns: {"created": bool, "kb_id": int, "title": str, "similar_id": int} """ from models import SRRequest, WorkLog, KBDocument, SRStatus # SR 조회 sr = (await db.execute( select(SRRequest).where(SRRequest.sr_id == sr_id) )).scalars().first() if not sr: logger.warning("KB 에이전트: SR %s 없음", sr_id) return None # 해결 상태 확인 if sr.status not in (SRStatus.COMPLETED, "COMPLETED", "RESOLVED"): return {"created": False, "reason": "SR이 미완료 상태"} # 이미 처리됐는지 확인 existing = (await db.execute( select(KBDocument).where(KBDocument.source_sr_id == sr_id) )).scalars().first() if existing: return {"created": False, "reason": "이미 KB 존재", "kb_id": existing.id} # 작업 로그 수집 work_logs = (await db.execute( select(WorkLog).where(WorkLog.sr_id == sr_id).order_by(WorkLog.worked_at) )).scalars().all() work_log_text = "\n".join( f"{wl.work_type}: {wl.memo or ''} {wl.result or ''}" for wl in work_logs ) title = sr.title or f"SR {sr_id}" desc = sr.description or sr.raw_text or "" sr_type = sr.sr_type or "OTHER" # LLM 또는 규칙 기반 추출 kb_data = None if use_llm: kb_data = await extract_kb_with_llm(title, desc, work_log_text, str(sr_type), model=model) if not kb_data: kb_data = extract_kb_rule(title, desc, work_log_text, str(sr_type)) # 유사 KB 중복 검사 similar_id = await find_similar_kb(db, title, desc) if similar_id: logger.info("KB 에이전트: 유사 KB %d 존재 → 생성 스킵", similar_id) return {"created": False, "reason": f"유사 KB 존재 (ID={similar_id})", "similar_id": similar_id} # doc_id 생성 (KB-YYYYMMDD-NNNN) today = datetime.utcnow().strftime("%Y%m%d") prefix = f"KB-{today}-" from sqlalchemy import desc as _desc q_seq = select(KBDocument.doc_id).where( KBDocument.doc_id.like(f"{prefix}%") ).order_by(_desc(KBDocument.doc_id)).limit(1) last_id = (await db.execute(q_seq)).scalar() seq = 1 if last_id: try: seq = int(last_id.split("-")[-1]) + 1 except ValueError: seq = 1 doc_id_val = f"{prefix}{seq:04d}" # KB 문서 생성 kb = KBDocument( doc_id = doc_id_val, title = kb_data.get("title", title), category = kb_data.get("category", "일반"), symptoms = kb_data.get("symptom", desc[:300]), cause = kb_data.get("cause", ""), solution = kb_data.get("solution", ""), commands = "\n".join(kb_data.get("commands", [])) if isinstance(kb_data.get("commands"), list) else kb_data.get("commands", ""), tags = ",".join(kb_data.get("tags", [])), source_sr_id = sr_id, author = "kb-agent", created_at = datetime.utcnow(), ) db.add(kb) await db.commit() await db.refresh(kb) logger.info("KB 에이전트: SR %s → KB %d 생성 (%s)", sr_id, kb.id, kb.title) return { "created": True, "kb_id": kb.id, "title": kb.title, "category": kb.category, "tags": kb.tags, } # ── 일괄 처리 ───────────────────────────────────────────────────────────────── async def run_kb_agent_batch( db: AsyncSession, days_back: int = 7, max_sr: int = 20, use_llm: bool = True, model: str = DEFAULT_MODEL, ) -> Dict: """ 최근 N일 해결된 SR 일괄 KB 처리. Returns: {"processed": int, "created": int, "skipped": int, "results": [...]} """ from models import SRRequest, SRStatus, KBDocument since = datetime.utcnow() - timedelta(days=days_back) # 완료 SR 조회 (KB 없는 것만) # 이미 처리된 SR ID 목록 processed_sr_ids = (await db.execute( select(KBDocument.source_sr_id).where(KBDocument.source_sr_id.isnot(None)) )).scalars().all() processed_set = set(processed_sr_ids) q = ( select(SRRequest) .where( and_( SRRequest.status == "COMPLETED", SRRequest.updated_at >= since, SRRequest.sr_id.notin_(processed_set) if processed_set else True, ) ) .order_by(desc(SRRequest.updated_at)) .limit(max_sr) ) srs = (await db.execute(q)).scalars().all() results = [] created = 0 skipped = 0 for sr in srs: try: result = await auto_create_kb_from_sr( db=db, sr_id=sr.sr_id, use_llm=use_llm, model=model ) if result: results.append({"sr_id": sr.sr_id, **result}) if result.get("created"): created += 1 else: skipped += 1 except Exception as e: logger.error("KB 에이전트 SR %s 처리 오류: %s", sr.sr_id, e) results.append({"sr_id": sr.sr_id, "created": False, "reason": str(e)[:50]}) skipped += 1 return { "processed": len(srs), "created": created, "skipped": skipped, "results": results, }