guardia-itsm/core/code_review.py
DESKTOP-TKLFCPRython 64c27c3509 feat(itsm): G-1~G-12 확장 기능 + 하네스/봇/설치스크립트 구현
G-1: 메신저 Webhook Relay + _send_to_room 실제 httpx 호출 구현
G-2: POST /api/tasks/bulk SR 대량작업 엔드포인트 (최대 100건)
G-3: 라이선스 만료 알림 스케줄러 (매일 09:00 KST)
G-4: 체험판 upgrade_banner 필드 + license.py 배너 로직
G-5: core/auto_rca.py + incidents/problem auto-rca 엔드포인트
G-6: core/deploy_impact.py + vibe impact-analysis 엔드포인트
G-7: core/ticket_classifier.py + SR 생성 시 AI 분류 + ai-suggestion API
G-8: VulnPatchRecord 모델 + vuln_scan 패치추적 4개 엔드포인트
G-9: core/jira_sync.py + gateway Jira/Confluence 연동 엔드포인트
G-10: core/push_notify.py + routers/push.py + PushSubscription 모델
G-11: approvals 다중승인 (위임/서명/기한초과/마감연장)
G-12: alembic.ini + migrations/ + cicd/migrate_to_postgres.sh

하네스: guardia-orchestrator 확장기능 Phase 반영
봇명령어: /sr /status /license /bulk 슬래시 명령어 추가
설치스크립트: setup/ (Ubuntu, CentOS, RHEL, Windows) --test 옵션 포함

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-29 18:18:52 +09:00

426 lines
16 KiB
Python

"""
GUARDiA ITSM — B-3 코드 리뷰 에이전트 엔진
기능:
- 프로젝트 소스 트리 스캔 (C:\\GUARDiA\\projects\\{project_dir})
- 파일 청크 분할 → Ollama API 순차 전송
- 발견 항목 구조화 (severity / category / file / line / message / suggestion)
- 종합 점수 산출 (0-100)
- 결과 DB 저장 (CodeReview 모델)
Ollama 모델: codellama / deepseek-coder / qwen2.5-coder (환경변수로 선택)
내부 전용: 외부 API 호출 없음
"""
from __future__ import annotations
import json
import logging
import os
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import httpx
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update
logger = logging.getLogger(__name__)
# ── 설정 ──────────────────────────────────────────────────────────────────────
PROJECTS_ROOT = Path(os.getenv("GUARDIA_PROJECTS_ROOT", r"C:\GUARDiA\projects"))
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://localhost:11434")
DEFAULT_MODEL = os.getenv("CODE_REVIEW_MODEL", "codellama")
MAX_FILE_SIZE = 50_000 # 50KB 이상 파일 스킵
MAX_FILES = 50 # 프로젝트당 최대 리뷰 파일 수
CHUNK_LINES = 150 # LLM에 한 번에 보낼 최대 줄 수
# 리뷰 대상 확장자
REVIEWABLE_EXT = {
".java", ".py", ".php", ".js", ".ts", ".jsx", ".tsx",
".cs", ".go", ".rb", ".kt", ".swift", ".cpp", ".c", ".h",
".sql", ".html", ".vue", ".svelte",
}
# 제외 경로 패턴
SKIP_DIRS = {
"node_modules", ".git", "__pycache__", "target", "build",
"dist", ".venv", "venv", ".mvn", ".metadata", ".idea",
".vscode", ".settings", "vendor",
}
# ── 파일 스캔 ─────────────────────────────────────────────────────────────────
def scan_source_files(base_path: Path) -> List[Path]:
"""리뷰 가능한 소스 파일 목록 반환."""
files: List[Path] = []
for root, dirs, filenames in os.walk(base_path):
# 제외 디렉토리 건너뜀
dirs[:] = [d for d in dirs if d not in SKIP_DIRS and not d.startswith(".")]
for fn in filenames:
p = Path(root) / fn
if p.suffix.lower() not in REVIEWABLE_EXT:
continue
if p.stat().st_size > MAX_FILE_SIZE:
continue
files.append(p)
if len(files) >= MAX_FILES:
return files
return files
def detect_tech_stack(files: List[Path]) -> str:
"""파일 확장자 분포로 기술 스택 감지."""
ext_counts: Dict[str, int] = {}
for f in files:
ext = f.suffix.lower()
ext_counts[ext] = ext_counts.get(ext, 0) + 1
if not ext_counts:
return "unknown"
stack_map = {
".java": "java", ".py": "python", ".php": "php",
".go": "go", ".cs": "csharp", ".rb": "ruby",
".kt": "kotlin", ".swift": "swift", ".cpp": "cpp",
}
js_exts = {".js", ".ts", ".jsx", ".tsx", ".vue", ".svelte"}
detected = set()
for ext, count in ext_counts.items():
if count > 0:
if ext in js_exts:
detected.add("javascript")
elif ext in stack_map:
detected.add(stack_map[ext])
if len(detected) == 0:
return "mixed"
elif len(detected) == 1:
return detected.pop()
else:
return "mixed"
# ── LLM 호출 ─────────────────────────────────────────────────────────────────
async def _call_ollama(prompt: str, model: str, timeout: int = 120) -> str:
"""Ollama API 호출 (스트리밍 없이 단일 응답)."""
try:
async with httpx.AsyncClient(timeout=timeout) as client:
resp = await client.post(
f"{OLLAMA_URL}/api/generate",
json={
"model": model,
"prompt": prompt,
"stream": False,
"options": {
"temperature": 0.1,
"top_p": 0.9,
"num_predict": 1500,
},
},
)
resp.raise_for_status()
data = resp.json()
return data.get("response", "")
except httpx.ConnectError:
logger.warning("Ollama 연결 실패 (%s) — 오프라인 모드로 대체", OLLAMA_URL)
return ""
except Exception as exc:
logger.warning("Ollama 호출 오류: %s", exc)
return ""
def _build_review_prompt(file_path: str, code_chunk: str, tech_stack: str, focus: Optional[str]) -> str:
"""코드 리뷰 프롬프트 생성."""
focus_hint = f"\n특히 다음에 집중: {focus}" if focus else ""
return f"""당신은 시니어 소프트웨어 엔지니어입니다. 아래 코드를 검토하고 문제점을 JSON 형식으로 보고하세요.
기술 스택: {tech_stack}
파일: {file_path}{focus_hint}
검토 항목:
1. SECURITY: SQL 인젝션, XSS, 인증/인가 취약점, 패스워드 평문 저장, 민감정보 하드코딩
2. PERFORMANCE: N+1 쿼리, 메모리 누수, 불필요한 반복, 비효율적 알고리즘
3. CODE_QUALITY: SRP 위반, 복잡도, 중복 코드, null 처리, 예외 처리 누락
4. ARCHITECTURE: 레이어 위반, 순환 의존성, 강결합
5. TESTING: 테스트 부족, 테스트 가능성
```
{code_chunk}
```
반드시 다음 JSON 배열 형식으로만 응답하세요. 설명 없이 JSON만 출력:
[
{{
"severity": "CRITICAL|HIGH|MEDIUM|LOW|INFO",
"category": "SECURITY|PERFORMANCE|CODE_QUALITY|ARCHITECTURE|TESTING|DOCUMENTATION",
"line": <줄번호 또는 null>,
"message": "문제 설명 (한국어)",
"suggestion": "개선 방안 (한국어)"
}}
]
문제가 없으면 빈 배열 [] 반환."""
def _parse_findings(llm_output: str, file_path: str) -> List[Dict[str, Any]]:
"""LLM 출력에서 findings JSON 파싱."""
if not llm_output.strip():
return []
try:
# JSON 배열 추출 (앞뒤 텍스트 제거)
start = llm_output.find("[")
end = llm_output.rfind("]") + 1
if start == -1 or end == 0:
return []
raw = llm_output[start:end]
items = json.loads(raw)
result = []
for item in items:
if not isinstance(item, dict):
continue
result.append({
"file": file_path,
"severity": item.get("severity", "INFO"),
"category": item.get("category", "CODE_QUALITY"),
"line": item.get("line"),
"message": item.get("message", ""),
"suggestion": item.get("suggestion", ""),
})
return result
except (json.JSONDecodeError, ValueError):
return []
# ── 점수 산출 ─────────────────────────────────────────────────────────────────
def _calculate_score(findings: List[Dict[str, Any]]) -> int:
"""발견 항목 기반 점수 산출 (100점 만점, 감점 방식)."""
if not findings:
return 95 # 발견 없음: 기본 95점
deductions = {
"CRITICAL": 20,
"HIGH": 10,
"MEDIUM": 5,
"LOW": 2,
"INFO": 0,
}
total_deduction = sum(deductions.get(f.get("severity", "INFO"), 0) for f in findings)
score = max(0, 100 - total_deduction)
return score
# ── 요약 생성 ─────────────────────────────────────────────────────────────────
async def _generate_summary(
findings: List[Dict[str, Any]],
tech_stack: str,
score: int,
model: str,
) -> str:
"""전체 리뷰 결과 요약 생성."""
if not findings:
return f"코드 품질 점수: {score}/100. 특별한 문제점이 발견되지 않았습니다."
# 심각도별 카운트
counts = {"CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 0, "INFO": 0}
for f in findings:
counts[f.get("severity", "INFO")] = counts.get(f.get("severity", "INFO"), 0) + 1
# 주요 발견 항목 (CRITICAL, HIGH만)
critical_findings = [f for f in findings if f.get("severity") in ("CRITICAL", "HIGH")][:5]
critical_text = "\n".join([f"- [{f['severity']}] {f['message']}" for f in critical_findings])
prompt = f"""다음 코드 리뷰 결과를 3-5문장으로 요약하세요. 한국어로 작성.
기술스택: {tech_stack}, 점수: {score}/100
발견 항목: CRITICAL={counts['CRITICAL']}, HIGH={counts['HIGH']}, MEDIUM={counts['MEDIUM']}, LOW={counts['LOW']}
주요 문제:
{critical_text}
요약:"""
summary = await _call_ollama(prompt, model, timeout=30)
if not summary.strip():
# LLM 없을 때 기본 요약
parts = []
if counts["CRITICAL"] > 0:
parts.append(f"심각한 보안/오류 {counts['CRITICAL']}")
if counts["HIGH"] > 0:
parts.append(f"높은 우선순위 문제 {counts['HIGH']}")
if counts["MEDIUM"] > 0:
parts.append(f"중간 우선순위 {counts['MEDIUM']}")
summary = (
f"코드 품질 점수: {score}/100. "
+ (", ".join(parts) + "이 발견되었습니다." if parts else "경미한 개선 사항이 있습니다.")
)
return summary.strip()
# ── 메인 리뷰 실행 ────────────────────────────────────────────────────────────
async def run_code_review(
review_id: int,
project_dir: str,
db: AsyncSession,
target_subpath: Optional[str] = None,
focus: Optional[str] = None,
model: str = DEFAULT_MODEL,
) -> Dict[str, Any]:
"""
코드 리뷰 실행 메인 함수.
Args:
review_id: CodeReview.id (상태 업데이트용)
project_dir: C:\\GUARDiA\\projects\\ 하위 디렉토리명
db: AsyncSession (상태 업데이트용)
target_subpath: 특정 서브경로만 리뷰 (None=전체)
focus: 리뷰 집중 포인트 (예: "security")
model: Ollama 모델명
Returns:
{score, summary, findings, file_count, line_count}
"""
from models import CodeReview
start_time = time.time()
async def _update_status(status: str, error: Optional[str] = None):
await db.execute(
update(CodeReview)
.where(CodeReview.id == review_id)
.values(status=status, error_msg=error)
)
await db.commit()
await _update_status("RUNNING")
try:
base_path = PROJECTS_ROOT / project_dir
if target_subpath:
base_path = base_path / target_subpath
if not base_path.exists():
await _update_status("FAILED", f"경로를 찾을 수 없습니다: {base_path}")
return {"score": 0, "summary": f"경로 없음: {base_path}", "findings": [], "file_count": 0, "line_count": 0}
# 파일 스캔
files = scan_source_files(base_path)
tech_stack = detect_tech_stack(files)
total_lines = 0
all_findings: List[Dict[str, Any]] = []
logger.info("코드 리뷰 시작: %s (%d파일, 스택=%s)", project_dir, len(files), tech_stack)
for file_path in files:
try:
code = file_path.read_text(encoding="utf-8", errors="replace")
lines = code.splitlines()
total_lines += len(lines)
# 큰 파일은 청크 분할
for chunk_start in range(0, len(lines), CHUNK_LINES):
chunk = "\n".join(lines[chunk_start:chunk_start + CHUNK_LINES])
if not chunk.strip():
continue
rel_path = str(file_path.relative_to(PROJECTS_ROOT))
prompt = _build_review_prompt(rel_path, chunk, tech_stack, focus)
llm_output = await _call_ollama(prompt, model)
findings = _parse_findings(llm_output, rel_path)
# 줄 번호 오프셋 보정
for f in findings:
if f.get("line") and isinstance(f["line"], int):
f["line"] += chunk_start
all_findings.extend(findings)
except Exception as exc:
logger.warning("파일 리뷰 오류 %s: %s", file_path, exc)
score = _calculate_score(all_findings)
summary = await _generate_summary(all_findings, tech_stack, score, model)
duration = int(time.time() - start_time)
# DB 업데이트
await db.execute(
update(CodeReview)
.where(CodeReview.id == review_id)
.values(
status="DONE",
score=score,
summary=summary,
findings_json=json.dumps(all_findings, ensure_ascii=False),
file_count=len(files),
line_count=total_lines,
tech_stack=tech_stack,
model_used=model,
duration_sec=duration,
completed_at=datetime.now(),
)
)
await db.commit()
logger.info(
"코드 리뷰 완료: review_id=%d score=%d findings=%d duration=%ds",
review_id, score, len(all_findings), duration,
)
return {
"score": score,
"summary": summary,
"findings": all_findings,
"file_count": len(files),
"line_count": total_lines,
"tech_stack": tech_stack,
}
except Exception as exc:
logger.error("코드 리뷰 실패 review_id=%d: %s", review_id, exc)
await _update_status("FAILED", str(exc)[:500])
return {"score": 0, "summary": f"리뷰 실패: {exc}", "findings": [], "file_count": 0, "line_count": 0}
# ── 빠른 보안 스캔 (정규식 기반, LLM 없이) ────────────────────────────────────
SECURITY_PATTERNS: List[Tuple[str, str, str]] = [
# (패턴, 심각도, 설명)
(r"password\s*=\s*[\"'][^\"']+[\"']", "CRITICAL", "하드코딩된 패스워드"),
(r"api[_-]?key\s*=\s*[\"'][^\"']+[\"']", "CRITICAL", "하드코딩된 API 키"),
(r"secret\s*=\s*[\"'][^\"']+[\"']", "HIGH", "하드코딩된 시크릿"),
(r"execute\s*\(\s*[\"'].*?\+", "CRITICAL", "SQL 인젝션 가능성"),
(r"innerHTML\s*=", "HIGH", "XSS 취약점 가능성 (innerHTML)"),
(r"eval\s*\(", "HIGH", "eval() 사용 위험"),
(r"md5\s*\(", "MEDIUM", "취약한 해시 알고리즘 (MD5)"),
(r"sha1\s*\(", "MEDIUM", "취약한 해시 알고리즘 (SHA1)"),
(r"verify\s*=\s*False", "HIGH", "SSL 검증 비활성화"),
(r"DEBUG\s*=\s*True", "MEDIUM", "프로덕션 DEBUG 모드"),
]
def quick_security_scan(base_path: Path) -> List[Dict[str, Any]]:
"""정규식 기반 빠른 보안 취약점 스캔 (LLM 불필요)."""
import re
findings = []
files = scan_source_files(base_path)
for file_path in files:
try:
code = file_path.read_text(encoding="utf-8", errors="replace")
for i, line in enumerate(code.splitlines(), 1):
for pattern, severity, description in SECURITY_PATTERNS:
if re.search(pattern, line, re.IGNORECASE):
rel_path = str(file_path.relative_to(PROJECTS_ROOT))
findings.append({
"file": rel_path,
"severity": severity,
"category": "SECURITY",
"line": i,
"message": f"{description}: {line.strip()[:80]}",
"suggestion": "코드에서 민감 정보를 제거하고 환경변수/비밀 저장소를 사용하세요.",
})
except Exception:
pass
return findings