""" GUARDiA ITSM — B-3 코드 리뷰 API 엔드포인트: POST /api/code-review — 코드 리뷰 요청 (비동기) GET /api/code-review/{id} — 리뷰 결과 조회 GET /api/code-review/project/{pid}— 프로젝트 리뷰 목록 POST /api/code-review/quick-scan — 빠른 보안 스캔 (LLM 없이, 즉시 반환) GET /api/code-review/projects/list— 리뷰 가능 프로젝트 목록 (projects/ 하위) """ from __future__ import annotations import asyncio import json import logging import os from datetime import datetime from pathlib import Path from typing import List, Optional from fastapi import APIRouter, Depends, HTTPException, Query, BackgroundTasks from sqlalchemy import select, update from sqlalchemy.ext.asyncio import AsyncSession from core.auth import get_current_user from database import get_db from models import ( User, UserRole, Project, CodeReview, CodeReviewOut, CodeReviewRequest, ) logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/code-review", tags=["code-review"]) PROJECTS_ROOT = Path(os.getenv("GUARDIA_PROJECTS_ROOT", r"C:\GUARDiA\projects")) def _require_ops(current_user: User) -> User: if current_user.role not in (UserRole.ADMIN, UserRole.ENGINEER, UserRole.PM): raise HTTPException(403, "엔지니어 이상 권한이 필요합니다.") return current_user # ── 리뷰 실행 백그라운드 작업 ───────────────────────────────────────────────── async def _run_review_background( review_id: int, project_dir: str, target_subpath: Optional[str], focus: Optional[str], model: str, project_id: Optional[int], ): """백그라운드 코드 리뷰 실행 + Project 메타 업데이트.""" from database import SessionLocal from core.code_review import run_code_review async with SessionLocal() as db: result = await run_code_review( review_id=review_id, project_dir=project_dir, db=db, target_subpath=target_subpath, focus=focus, model=model, ) # Project 메타 업데이트 if project_id and result.get("score") is not None: await db.execute( update(Project) .where(Project.id == project_id) .values( last_review_at=datetime.now(), last_review_score=result["score"], tech_stack=result.get("tech_stack"), ) ) await db.commit() # WebSocket 이벤트 발행 try: from routers.ws import manager await manager.broadcast("code_review_done", { "review_id": review_id, "project_dir": project_dir, "score": result.get("score", 0), "summary": (result.get("summary") or "")[:200], "status": "DONE", }) except Exception: pass # ══════════════════════════════════════════════════════════════════════════════ # 엔드포인트 # ══════════════════════════════════════════════════════════════════════════════ @router.post("", response_model=CodeReviewOut, status_code=202) async def request_code_review( body: CodeReviewRequest, background_tasks: BackgroundTasks, db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """ 코드 리뷰 요청 (비동기 실행). - project_id OR target_path 중 하나 필수 - 결과는 GET /api/code-review/{id} 로 폴링 """ _require_ops(current_user) project_dir: Optional[str] = None project_id = body.project_id if project_id: r = await db.execute(select(Project).where(Project.id == project_id)) proj = r.scalars().first() if not proj: raise HTTPException(404, f"프로젝트를 찾을 수 없습니다: {project_id}") project_dir = proj.project_dir or proj.project_name elif body.target_path: project_dir = Path(body.target_path).name else: raise HTTPException(400, "project_id 또는 target_path 중 하나가 필요합니다.") check_path = PROJECTS_ROOT / project_dir if not check_path.exists(): raise HTTPException(404, f"프로젝트 디렉토리 없음: {check_path}") # CodeReview 레코드 생성 (PENDING) review = CodeReview( project_id=project_id, vibe_session_id=body.vibe_session_id, sr_id=body.sr_id, target_path=str(check_path), status="PENDING", model_used=body.model, requested_by=current_user.username, ) db.add(review) await db.commit() await db.refresh(review) # 백그라운드 실행 background_tasks.add_task( _run_review_background, review_id=review.id, project_dir=project_dir, target_subpath=None, focus=body.focus, model=body.model, project_id=project_id, ) logger.info( "코드 리뷰 요청: review_id=%d project=%s by=%s", review.id, project_dir, current_user.username, ) return review @router.get("/projects/list") async def list_reviewable_projects( current_user: User = Depends(get_current_user), ): """C:\\GUARDiA\\projects\\ 하위 리뷰 가능 디렉토리 목록.""" _require_ops(current_user) if not PROJECTS_ROOT.exists(): return {"projects": [], "root": str(PROJECTS_ROOT)} projects = [] for item in sorted(PROJECTS_ROOT.iterdir()): if item.is_dir() and not item.name.startswith("."): # 파일 카운트 (간략히) try: file_count = sum(1 for _ in item.rglob("*") if _.is_file()) except Exception: file_count = 0 projects.append({ "name": item.name, "path": str(item), "file_count": file_count, }) return {"projects": projects, "root": str(PROJECTS_ROOT)} @router.get("/{review_id}", response_model=CodeReviewOut) async def get_review( review_id: int, db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """코드 리뷰 결과 조회.""" _require_ops(current_user) r = await db.execute(select(CodeReview).where(CodeReview.id == review_id)) review = r.scalars().first() if not review: raise HTTPException(404, f"리뷰를 찾을 수 없습니다: {review_id}") return review @router.get("/project/{project_id}", response_model=List[CodeReviewOut]) async def list_project_reviews( project_id: int, limit: int = Query(10, ge=1, le=50), db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """프로젝트의 코드 리뷰 이력.""" _require_ops(current_user) rows = (await db.execute( select(CodeReview) .where(CodeReview.project_id == project_id) .order_by(CodeReview.created_at.desc()) .limit(limit) )).scalars().all() return rows @router.post("/quick-scan") async def quick_security_scan( project_dir: str = Query(..., description="프로젝트 디렉토리명 (projects/ 하위)"), db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """ 빠른 보안 취약점 스캔 (LLM 없이 정규식 기반, 즉시 반환). SQL 인젝션, XSS, 하드코딩 패스워드 등 패턴 감지. """ _require_ops(current_user) from core.code_review import quick_security_scan as do_scan, PROJECTS_ROOT as root base_path = root / project_dir if not base_path.exists(): raise HTTPException(404, f"프로젝트 디렉토리 없음: {base_path}") findings = do_scan(base_path) severity_counts = {"CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 0, "INFO": 0} for f in findings: sev = f.get("severity", "INFO") severity_counts[sev] = severity_counts.get(sev, 0) + 1 return { "project_dir": project_dir, "scan_type": "QUICK_SECURITY", "total_findings": len(findings), "severity_summary": severity_counts, "findings": findings, "scanned_at": datetime.now().isoformat(), } @router.get("/{review_id}/findings") async def get_review_findings( review_id: int, severity: Optional[str] = Query(None, description="CRITICAL/HIGH/MEDIUM/LOW/INFO"), category: Optional[str] = Query(None, description="SECURITY/PERFORMANCE/CODE_QUALITY/..."), db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """코드 리뷰 발견 항목 상세 조회 (필터 지원).""" _require_ops(current_user) r = await db.execute(select(CodeReview).where(CodeReview.id == review_id)) review = r.scalars().first() if not review: raise HTTPException(404, f"리뷰를 찾을 수 없습니다: {review_id}") if review.status != "DONE": return {"status": review.status, "findings": []} findings = json.loads(review.findings_json or "[]") if severity: findings = [f for f in findings if f.get("severity") == severity.upper()] if category: findings = [f for f in findings if f.get("category") == category.upper()] return { "review_id": review_id, "total": len(findings), "findings": findings, }