""" GUARDiA ITSM — B-3 코드 리뷰 에이전트 엔진 기능: - 프로젝트 소스 트리 스캔 (C:\\GUARDiA\\projects\\{project_dir}) - 파일 청크 분할 → Ollama API 순차 전송 - 발견 항목 구조화 (severity / category / file / line / message / suggestion) - 종합 점수 산출 (0-100) - 결과 DB 저장 (CodeReview 모델) Ollama 모델: codellama / deepseek-coder / qwen2.5-coder (환경변수로 선택) 내부 전용: 외부 API 호출 없음 """ from __future__ import annotations import json import logging import os import time from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional, Tuple import httpx from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, update logger = logging.getLogger(__name__) # ── 설정 ────────────────────────────────────────────────────────────────────── PROJECTS_ROOT = Path(os.getenv("GUARDIA_PROJECTS_ROOT", r"C:\GUARDiA\projects")) OLLAMA_URL = os.getenv("OLLAMA_URL", "http://localhost:11434") DEFAULT_MODEL = os.getenv("CODE_REVIEW_MODEL", "codellama") MAX_FILE_SIZE = 50_000 # 50KB 이상 파일 스킵 MAX_FILES = 50 # 프로젝트당 최대 리뷰 파일 수 CHUNK_LINES = 150 # LLM에 한 번에 보낼 최대 줄 수 # 리뷰 대상 확장자 REVIEWABLE_EXT = { ".java", ".py", ".php", ".js", ".ts", ".jsx", ".tsx", ".cs", ".go", ".rb", ".kt", ".swift", ".cpp", ".c", ".h", ".sql", ".html", ".vue", ".svelte", } # 제외 경로 패턴 SKIP_DIRS = { "node_modules", ".git", "__pycache__", "target", "build", "dist", ".venv", "venv", ".mvn", ".metadata", ".idea", ".vscode", ".settings", "vendor", } # ── 파일 스캔 ───────────────────────────────────────────────────────────────── def scan_source_files(base_path: Path) -> List[Path]: """리뷰 가능한 소스 파일 목록 반환.""" files: List[Path] = [] for root, dirs, filenames in os.walk(base_path): # 제외 디렉토리 건너뜀 dirs[:] = [d for d in dirs if d not in SKIP_DIRS and not d.startswith(".")] for fn in filenames: p = Path(root) / fn if p.suffix.lower() not in REVIEWABLE_EXT: continue if p.stat().st_size > MAX_FILE_SIZE: continue files.append(p) if len(files) >= MAX_FILES: return files return files def detect_tech_stack(files: List[Path]) -> str: """파일 확장자 분포로 기술 스택 감지.""" ext_counts: Dict[str, int] = {} for f in files: ext = f.suffix.lower() ext_counts[ext] = ext_counts.get(ext, 0) + 1 if not ext_counts: return "unknown" stack_map = { ".java": "java", ".py": "python", ".php": "php", ".go": "go", ".cs": "csharp", ".rb": "ruby", ".kt": "kotlin", ".swift": "swift", ".cpp": "cpp", } js_exts = {".js", ".ts", ".jsx", ".tsx", ".vue", ".svelte"} detected = set() for ext, count in ext_counts.items(): if count > 0: if ext in js_exts: detected.add("javascript") elif ext in stack_map: detected.add(stack_map[ext]) if len(detected) == 0: return "mixed" elif len(detected) == 1: return detected.pop() else: return "mixed" # ── LLM 호출 ───────────────────────────────────────────────────────────────── async def _call_ollama(prompt: str, model: str, timeout: int = 120) -> str: """Ollama API 호출 (스트리밍 없이 단일 응답).""" try: async with httpx.AsyncClient(timeout=timeout) as client: resp = await client.post( f"{OLLAMA_URL}/api/generate", json={ "model": model, "prompt": prompt, "stream": False, "options": { "temperature": 0.1, "top_p": 0.9, "num_predict": 1500, }, }, ) resp.raise_for_status() data = resp.json() return data.get("response", "") except httpx.ConnectError: logger.warning("Ollama 연결 실패 (%s) — 오프라인 모드로 대체", OLLAMA_URL) return "" except Exception as exc: logger.warning("Ollama 호출 오류: %s", exc) return "" def _build_review_prompt(file_path: str, code_chunk: str, tech_stack: str, focus: Optional[str]) -> str: """코드 리뷰 프롬프트 생성.""" focus_hint = f"\n특히 다음에 집중: {focus}" if focus else "" return f"""당신은 시니어 소프트웨어 엔지니어입니다. 아래 코드를 검토하고 문제점을 JSON 형식으로 보고하세요. 기술 스택: {tech_stack} 파일: {file_path}{focus_hint} 검토 항목: 1. SECURITY: SQL 인젝션, XSS, 인증/인가 취약점, 패스워드 평문 저장, 민감정보 하드코딩 2. PERFORMANCE: N+1 쿼리, 메모리 누수, 불필요한 반복, 비효율적 알고리즘 3. CODE_QUALITY: SRP 위반, 복잡도, 중복 코드, null 처리, 예외 처리 누락 4. ARCHITECTURE: 레이어 위반, 순환 의존성, 강결합 5. TESTING: 테스트 부족, 테스트 가능성 ``` {code_chunk} ``` 반드시 다음 JSON 배열 형식으로만 응답하세요. 설명 없이 JSON만 출력: [ {{ "severity": "CRITICAL|HIGH|MEDIUM|LOW|INFO", "category": "SECURITY|PERFORMANCE|CODE_QUALITY|ARCHITECTURE|TESTING|DOCUMENTATION", "line": <줄번호 또는 null>, "message": "문제 설명 (한국어)", "suggestion": "개선 방안 (한국어)" }} ] 문제가 없으면 빈 배열 [] 반환.""" def _parse_findings(llm_output: str, file_path: str) -> List[Dict[str, Any]]: """LLM 출력에서 findings JSON 파싱.""" if not llm_output.strip(): return [] try: # JSON 배열 추출 (앞뒤 텍스트 제거) start = llm_output.find("[") end = llm_output.rfind("]") + 1 if start == -1 or end == 0: return [] raw = llm_output[start:end] items = json.loads(raw) result = [] for item in items: if not isinstance(item, dict): continue result.append({ "file": file_path, "severity": item.get("severity", "INFO"), "category": item.get("category", "CODE_QUALITY"), "line": item.get("line"), "message": item.get("message", ""), "suggestion": item.get("suggestion", ""), }) return result except (json.JSONDecodeError, ValueError): return [] # ── 점수 산출 ───────────────────────────────────────────────────────────────── def _calculate_score(findings: List[Dict[str, Any]]) -> int: """발견 항목 기반 점수 산출 (100점 만점, 감점 방식).""" if not findings: return 95 # 발견 없음: 기본 95점 deductions = { "CRITICAL": 20, "HIGH": 10, "MEDIUM": 5, "LOW": 2, "INFO": 0, } total_deduction = sum(deductions.get(f.get("severity", "INFO"), 0) for f in findings) score = max(0, 100 - total_deduction) return score # ── 요약 생성 ───────────────────────────────────────────────────────────────── async def _generate_summary( findings: List[Dict[str, Any]], tech_stack: str, score: int, model: str, ) -> str: """전체 리뷰 결과 요약 생성.""" if not findings: return f"코드 품질 점수: {score}/100. 특별한 문제점이 발견되지 않았습니다." # 심각도별 카운트 counts = {"CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 0, "INFO": 0} for f in findings: counts[f.get("severity", "INFO")] = counts.get(f.get("severity", "INFO"), 0) + 1 # 주요 발견 항목 (CRITICAL, HIGH만) critical_findings = [f for f in findings if f.get("severity") in ("CRITICAL", "HIGH")][:5] critical_text = "\n".join([f"- [{f['severity']}] {f['message']}" for f in critical_findings]) prompt = f"""다음 코드 리뷰 결과를 3-5문장으로 요약하세요. 한국어로 작성. 기술스택: {tech_stack}, 점수: {score}/100 발견 항목: CRITICAL={counts['CRITICAL']}, HIGH={counts['HIGH']}, MEDIUM={counts['MEDIUM']}, LOW={counts['LOW']} 주요 문제: {critical_text} 요약:""" summary = await _call_ollama(prompt, model, timeout=30) if not summary.strip(): # LLM 없을 때 기본 요약 parts = [] if counts["CRITICAL"] > 0: parts.append(f"심각한 보안/오류 {counts['CRITICAL']}건") if counts["HIGH"] > 0: parts.append(f"높은 우선순위 문제 {counts['HIGH']}건") if counts["MEDIUM"] > 0: parts.append(f"중간 우선순위 {counts['MEDIUM']}건") summary = ( f"코드 품질 점수: {score}/100. " + (", ".join(parts) + "이 발견되었습니다." if parts else "경미한 개선 사항이 있습니다.") ) return summary.strip() # ── 메인 리뷰 실행 ──────────────────────────────────────────────────────────── async def run_code_review( review_id: int, project_dir: str, db: AsyncSession, target_subpath: Optional[str] = None, focus: Optional[str] = None, model: str = DEFAULT_MODEL, ) -> Dict[str, Any]: """ 코드 리뷰 실행 메인 함수. Args: review_id: CodeReview.id (상태 업데이트용) project_dir: C:\\GUARDiA\\projects\\ 하위 디렉토리명 db: AsyncSession (상태 업데이트용) target_subpath: 특정 서브경로만 리뷰 (None=전체) focus: 리뷰 집중 포인트 (예: "security") model: Ollama 모델명 Returns: {score, summary, findings, file_count, line_count} """ from models import CodeReview start_time = time.time() async def _update_status(status: str, error: Optional[str] = None): await db.execute( update(CodeReview) .where(CodeReview.id == review_id) .values(status=status, error_msg=error) ) await db.commit() await _update_status("RUNNING") try: base_path = PROJECTS_ROOT / project_dir if target_subpath: base_path = base_path / target_subpath if not base_path.exists(): await _update_status("FAILED", f"경로를 찾을 수 없습니다: {base_path}") return {"score": 0, "summary": f"경로 없음: {base_path}", "findings": [], "file_count": 0, "line_count": 0} # 파일 스캔 files = scan_source_files(base_path) tech_stack = detect_tech_stack(files) total_lines = 0 all_findings: List[Dict[str, Any]] = [] logger.info("코드 리뷰 시작: %s (%d파일, 스택=%s)", project_dir, len(files), tech_stack) for file_path in files: try: code = file_path.read_text(encoding="utf-8", errors="replace") lines = code.splitlines() total_lines += len(lines) # 큰 파일은 청크 분할 for chunk_start in range(0, len(lines), CHUNK_LINES): chunk = "\n".join(lines[chunk_start:chunk_start + CHUNK_LINES]) if not chunk.strip(): continue rel_path = str(file_path.relative_to(PROJECTS_ROOT)) prompt = _build_review_prompt(rel_path, chunk, tech_stack, focus) llm_output = await _call_ollama(prompt, model) findings = _parse_findings(llm_output, rel_path) # 줄 번호 오프셋 보정 for f in findings: if f.get("line") and isinstance(f["line"], int): f["line"] += chunk_start all_findings.extend(findings) except Exception as exc: logger.warning("파일 리뷰 오류 %s: %s", file_path, exc) score = _calculate_score(all_findings) summary = await _generate_summary(all_findings, tech_stack, score, model) duration = int(time.time() - start_time) # DB 업데이트 await db.execute( update(CodeReview) .where(CodeReview.id == review_id) .values( status="DONE", score=score, summary=summary, findings_json=json.dumps(all_findings, ensure_ascii=False), file_count=len(files), line_count=total_lines, tech_stack=tech_stack, model_used=model, duration_sec=duration, completed_at=datetime.now(), ) ) await db.commit() logger.info( "코드 리뷰 완료: review_id=%d score=%d findings=%d duration=%ds", review_id, score, len(all_findings), duration, ) return { "score": score, "summary": summary, "findings": all_findings, "file_count": len(files), "line_count": total_lines, "tech_stack": tech_stack, } except Exception as exc: logger.error("코드 리뷰 실패 review_id=%d: %s", review_id, exc) await _update_status("FAILED", str(exc)[:500]) return {"score": 0, "summary": f"리뷰 실패: {exc}", "findings": [], "file_count": 0, "line_count": 0} # ── 빠른 보안 스캔 (정규식 기반, LLM 없이) ──────────────────────────────────── SECURITY_PATTERNS: List[Tuple[str, str, str]] = [ # (패턴, 심각도, 설명) (r"password\s*=\s*[\"'][^\"']+[\"']", "CRITICAL", "하드코딩된 패스워드"), (r"api[_-]?key\s*=\s*[\"'][^\"']+[\"']", "CRITICAL", "하드코딩된 API 키"), (r"secret\s*=\s*[\"'][^\"']+[\"']", "HIGH", "하드코딩된 시크릿"), (r"execute\s*\(\s*[\"'].*?\+", "CRITICAL", "SQL 인젝션 가능성"), (r"innerHTML\s*=", "HIGH", "XSS 취약점 가능성 (innerHTML)"), (r"eval\s*\(", "HIGH", "eval() 사용 위험"), (r"md5\s*\(", "MEDIUM", "취약한 해시 알고리즘 (MD5)"), (r"sha1\s*\(", "MEDIUM", "취약한 해시 알고리즘 (SHA1)"), (r"verify\s*=\s*False", "HIGH", "SSL 검증 비활성화"), (r"DEBUG\s*=\s*True", "MEDIUM", "프로덕션 DEBUG 모드"), ] def quick_security_scan(base_path: Path) -> List[Dict[str, Any]]: """정규식 기반 빠른 보안 취약점 스캔 (LLM 불필요).""" import re findings = [] files = scan_source_files(base_path) for file_path in files: try: code = file_path.read_text(encoding="utf-8", errors="replace") for i, line in enumerate(code.splitlines(), 1): for pattern, severity, description in SECURITY_PATTERNS: if re.search(pattern, line, re.IGNORECASE): rel_path = str(file_path.relative_to(PROJECTS_ROOT)) findings.append({ "file": rel_path, "severity": severity, "category": "SECURITY", "line": i, "message": f"{description}: {line.strip()[:80]}", "suggestion": "코드에서 민감 정보를 제거하고 환경변수/비밀 저장소를 사용하세요.", }) except Exception: pass return findings