guardia-itsm/routers/code_review.py
DESKTOP-TKLFCPRython 64c27c3509 feat(itsm): G-1~G-12 확장 기능 + 하네스/봇/설치스크립트 구현
G-1: 메신저 Webhook Relay + _send_to_room 실제 httpx 호출 구현
G-2: POST /api/tasks/bulk SR 대량작업 엔드포인트 (최대 100건)
G-3: 라이선스 만료 알림 스케줄러 (매일 09:00 KST)
G-4: 체험판 upgrade_banner 필드 + license.py 배너 로직
G-5: core/auto_rca.py + incidents/problem auto-rca 엔드포인트
G-6: core/deploy_impact.py + vibe impact-analysis 엔드포인트
G-7: core/ticket_classifier.py + SR 생성 시 AI 분류 + ai-suggestion API
G-8: VulnPatchRecord 모델 + vuln_scan 패치추적 4개 엔드포인트
G-9: core/jira_sync.py + gateway Jira/Confluence 연동 엔드포인트
G-10: core/push_notify.py + routers/push.py + PushSubscription 모델
G-11: approvals 다중승인 (위임/서명/기한초과/마감연장)
G-12: alembic.ini + migrations/ + cicd/migrate_to_postgres.sh

하네스: guardia-orchestrator 확장기능 Phase 반영
봇명령어: /sr /status /license /bulk 슬래시 명령어 추가
설치스크립트: setup/ (Ubuntu, CentOS, RHEL, Windows) --test 옵션 포함

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-29 18:18:52 +09:00

293 lines
9.7 KiB
Python

"""
GUARDiA ITSM — B-3 코드 리뷰 API
엔드포인트:
POST /api/code-review — 코드 리뷰 요청 (비동기)
GET /api/code-review/{id} — 리뷰 결과 조회
GET /api/code-review/project/{pid}— 프로젝트 리뷰 목록
POST /api/code-review/quick-scan — 빠른 보안 스캔 (LLM 없이, 즉시 반환)
GET /api/code-review/projects/list— 리뷰 가능 프로젝트 목록 (projects/ 하위)
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
from datetime import datetime
from pathlib import Path
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query, BackgroundTasks
from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
from database import get_db
from models import (
User, UserRole,
Project,
CodeReview, CodeReviewOut, CodeReviewRequest,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/code-review", tags=["code-review"])
PROJECTS_ROOT = Path(os.getenv("GUARDIA_PROJECTS_ROOT", r"C:\GUARDiA\projects"))
def _require_ops(current_user: User) -> User:
if current_user.role not in (UserRole.ADMIN, UserRole.ENGINEER, UserRole.PM):
raise HTTPException(403, "엔지니어 이상 권한이 필요합니다.")
return current_user
# ── 리뷰 실행 백그라운드 작업 ─────────────────────────────────────────────────
async def _run_review_background(
review_id: int,
project_dir: str,
target_subpath: Optional[str],
focus: Optional[str],
model: str,
project_id: Optional[int],
):
"""백그라운드 코드 리뷰 실행 + Project 메타 업데이트."""
from database import SessionLocal
from core.code_review import run_code_review
async with SessionLocal() as db:
result = await run_code_review(
review_id=review_id,
project_dir=project_dir,
db=db,
target_subpath=target_subpath,
focus=focus,
model=model,
)
# Project 메타 업데이트
if project_id and result.get("score") is not None:
await db.execute(
update(Project)
.where(Project.id == project_id)
.values(
last_review_at=datetime.now(),
last_review_score=result["score"],
tech_stack=result.get("tech_stack"),
)
)
await db.commit()
# WebSocket 이벤트 발행
try:
from routers.ws import manager
await manager.broadcast("code_review_done", {
"review_id": review_id,
"project_dir": project_dir,
"score": result.get("score", 0),
"summary": (result.get("summary") or "")[:200],
"status": "DONE",
})
except Exception:
pass
# ══════════════════════════════════════════════════════════════════════════════
# 엔드포인트
# ══════════════════════════════════════════════════════════════════════════════
@router.post("", response_model=CodeReviewOut, status_code=202)
async def request_code_review(
body: CodeReviewRequest,
background_tasks: BackgroundTasks,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""
코드 리뷰 요청 (비동기 실행).
- project_id OR target_path 중 하나 필수
- 결과는 GET /api/code-review/{id} 로 폴링
"""
_require_ops(current_user)
project_dir: Optional[str] = None
project_id = body.project_id
if project_id:
r = await db.execute(select(Project).where(Project.id == project_id))
proj = r.scalars().first()
if not proj:
raise HTTPException(404, f"프로젝트를 찾을 수 없습니다: {project_id}")
project_dir = proj.project_dir or proj.project_name
elif body.target_path:
project_dir = Path(body.target_path).name
else:
raise HTTPException(400, "project_id 또는 target_path 중 하나가 필요합니다.")
check_path = PROJECTS_ROOT / project_dir
if not check_path.exists():
raise HTTPException(404, f"프로젝트 디렉토리 없음: {check_path}")
# CodeReview 레코드 생성 (PENDING)
review = CodeReview(
project_id=project_id,
vibe_session_id=body.vibe_session_id,
sr_id=body.sr_id,
target_path=str(check_path),
status="PENDING",
model_used=body.model,
requested_by=current_user.username,
)
db.add(review)
await db.commit()
await db.refresh(review)
# 백그라운드 실행
background_tasks.add_task(
_run_review_background,
review_id=review.id,
project_dir=project_dir,
target_subpath=None,
focus=body.focus,
model=body.model,
project_id=project_id,
)
logger.info(
"코드 리뷰 요청: review_id=%d project=%s by=%s",
review.id, project_dir, current_user.username,
)
return review
@router.get("/projects/list")
async def list_reviewable_projects(
current_user: User = Depends(get_current_user),
):
"""C:\\GUARDiA\\projects\\ 하위 리뷰 가능 디렉토리 목록."""
_require_ops(current_user)
if not PROJECTS_ROOT.exists():
return {"projects": [], "root": str(PROJECTS_ROOT)}
projects = []
for item in sorted(PROJECTS_ROOT.iterdir()):
if item.is_dir() and not item.name.startswith("."):
# 파일 카운트 (간략히)
try:
file_count = sum(1 for _ in item.rglob("*") if _.is_file())
except Exception:
file_count = 0
projects.append({
"name": item.name,
"path": str(item),
"file_count": file_count,
})
return {"projects": projects, "root": str(PROJECTS_ROOT)}
@router.get("/{review_id}", response_model=CodeReviewOut)
async def get_review(
review_id: int,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""코드 리뷰 결과 조회."""
_require_ops(current_user)
r = await db.execute(select(CodeReview).where(CodeReview.id == review_id))
review = r.scalars().first()
if not review:
raise HTTPException(404, f"리뷰를 찾을 수 없습니다: {review_id}")
return review
@router.get("/project/{project_id}", response_model=List[CodeReviewOut])
async def list_project_reviews(
project_id: int,
limit: int = Query(10, ge=1, le=50),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""프로젝트의 코드 리뷰 이력."""
_require_ops(current_user)
rows = (await db.execute(
select(CodeReview)
.where(CodeReview.project_id == project_id)
.order_by(CodeReview.created_at.desc())
.limit(limit)
)).scalars().all()
return rows
@router.post("/quick-scan")
async def quick_security_scan(
project_dir: str = Query(..., description="프로젝트 디렉토리명 (projects/ 하위)"),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""
빠른 보안 취약점 스캔 (LLM 없이 정규식 기반, 즉시 반환).
SQL 인젝션, XSS, 하드코딩 패스워드 등 패턴 감지.
"""
_require_ops(current_user)
from core.code_review import quick_security_scan as do_scan, PROJECTS_ROOT as root
base_path = root / project_dir
if not base_path.exists():
raise HTTPException(404, f"프로젝트 디렉토리 없음: {base_path}")
findings = do_scan(base_path)
severity_counts = {"CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 0, "INFO": 0}
for f in findings:
sev = f.get("severity", "INFO")
severity_counts[sev] = severity_counts.get(sev, 0) + 1
return {
"project_dir": project_dir,
"scan_type": "QUICK_SECURITY",
"total_findings": len(findings),
"severity_summary": severity_counts,
"findings": findings,
"scanned_at": datetime.now().isoformat(),
}
@router.get("/{review_id}/findings")
async def get_review_findings(
review_id: int,
severity: Optional[str] = Query(None, description="CRITICAL/HIGH/MEDIUM/LOW/INFO"),
category: Optional[str] = Query(None, description="SECURITY/PERFORMANCE/CODE_QUALITY/..."),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""코드 리뷰 발견 항목 상세 조회 (필터 지원)."""
_require_ops(current_user)
r = await db.execute(select(CodeReview).where(CodeReview.id == review_id))
review = r.scalars().first()
if not review:
raise HTTPException(404, f"리뷰를 찾을 수 없습니다: {review_id}")
if review.status != "DONE":
return {"status": review.status, "findings": []}
findings = json.loads(review.findings_json or "[]")
if severity:
findings = [f for f in findings if f.get("severity") == severity.upper()]
if category:
findings = [f for f in findings if f.get("category") == category.upper()]
return {
"review_id": review_id,
"total": len(findings),
"findings": findings,
}