812 lines
29 KiB
Python
812 lines
29 KiB
Python
"""
|
|
공급망 보안 (Supply Chain Security)
|
|
|
|
엔드포인트:
|
|
GET /api/supply-chain/scan — 공급망 스캔 현황
|
|
POST /api/supply-chain/scan — 전체 공급망 스캔 실행
|
|
GET /api/supply-chain/vulnerabilities — 취약점 목록 (심각도별)
|
|
POST /api/supply-chain/vulnerabilities/{id}/patch — 취약점 패치 요청 (SR 생성)
|
|
GET /api/supply-chain/dependencies — 의존성 + CVE 상태
|
|
GET /api/supply-chain/slsa-level — SLSA 레벨 평가 (0~3)
|
|
GET /api/supply-chain/pipeline-integrity — 파이프라인 무결성
|
|
GET /api/supply-chain/report — 공급망 보안 리포트
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
from datetime import datetime
|
|
from typing import List, Optional
|
|
|
|
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query
|
|
from pydantic import BaseModel
|
|
from sqlalchemy import desc, select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from core.auth import get_current_user
|
|
from database import get_db, SessionLocal
|
|
from models import (
|
|
SCSScan,
|
|
SLSAAssessment,
|
|
SRRequest,
|
|
SRStatus,
|
|
SRType,
|
|
SupplyChainVulnerability,
|
|
User,
|
|
UserRole,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
router = APIRouter(prefix="/api/supply-chain", tags=["supply_chain_security"])
|
|
|
|
|
|
# ── 알려진 취약 패키지 내장 데이터베이스 ─────────────────────────────────────────
|
|
|
|
KNOWN_VULNERABILITIES: List[dict] = [
|
|
{
|
|
"package": "log4j",
|
|
"versions": ["<2.17.0"],
|
|
"cve": "CVE-2021-44228",
|
|
"severity": "CRITICAL",
|
|
"cvss": 10.0,
|
|
"description": "Apache Log4j2 원격 코드 실행 취약점 (Log4Shell)",
|
|
"fixed_version": "2.17.0",
|
|
},
|
|
{
|
|
"package": "spring-core",
|
|
"versions": ["<5.3.18"],
|
|
"cve": "CVE-2022-22965",
|
|
"severity": "CRITICAL",
|
|
"cvss": 9.8,
|
|
"description": "Spring Framework RCE 취약점 (Spring4Shell)",
|
|
"fixed_version": "5.3.18",
|
|
},
|
|
{
|
|
"package": "requests",
|
|
"versions": ["<2.31.0"],
|
|
"cve": "CVE-2023-32681",
|
|
"severity": "MEDIUM",
|
|
"cvss": 6.1,
|
|
"description": "requests 라이브러리 Proxy-Authorization 헤더 노출",
|
|
"fixed_version": "2.31.0",
|
|
},
|
|
{
|
|
"package": "pillow",
|
|
"versions": ["<10.0.0"],
|
|
"cve": "CVE-2023-44271",
|
|
"severity": "HIGH",
|
|
"cvss": 7.5,
|
|
"description": "Pillow 이미지 처리 DoS 취약점",
|
|
"fixed_version": "10.0.0",
|
|
},
|
|
{
|
|
"package": "openssl",
|
|
"versions": ["<3.0.7"],
|
|
"cve": "CVE-2022-3786",
|
|
"severity": "HIGH",
|
|
"cvss": 7.5,
|
|
"description": "OpenSSL 버퍼 오버플로우 취약점",
|
|
"fixed_version": "3.0.7",
|
|
},
|
|
{
|
|
"package": "django",
|
|
"versions": ["<4.2.7"],
|
|
"cve": "CVE-2023-43665",
|
|
"severity": "HIGH",
|
|
"cvss": 7.5,
|
|
"description": "Django Denial of Service 취약점",
|
|
"fixed_version": "4.2.7",
|
|
},
|
|
{
|
|
"package": "fastapi",
|
|
"versions": ["<0.109.1"],
|
|
"cve": "CVE-2024-24762",
|
|
"severity": "HIGH",
|
|
"cvss": 7.5,
|
|
"description": "FastAPI ReDoS 취약점 (multipart form data)",
|
|
"fixed_version": "0.109.1",
|
|
},
|
|
{
|
|
"package": "cryptography",
|
|
"versions": ["<41.0.6"],
|
|
"cve": "CVE-2023-49083",
|
|
"severity": "MEDIUM",
|
|
"cvss": 4.0,
|
|
"description": "Python cryptography NULL 포인터 역참조",
|
|
"fixed_version": "41.0.6",
|
|
},
|
|
]
|
|
|
|
# ── SLSA 레벨 정의 ──────────────────────────────────────────────────────────
|
|
|
|
SLSA_REQUIREMENTS = {
|
|
0: {
|
|
"name": "SLSA Level 0 — 기준 없음",
|
|
"description": "SLSA 요구사항 미충족. 기본 빌드 프로세스만 존재.",
|
|
"requirements": ["기본 소스 코드 존재"],
|
|
"guardia_checks": [],
|
|
},
|
|
1: {
|
|
"name": "SLSA Level 1 — 빌드 스크립트 정의",
|
|
"description": "빌드 프로세스가 스크립트로 정의되어 있어야 함.",
|
|
"requirements": [
|
|
"빌드 스크립트 존재 (Jenkinsfile, Makefile 등)",
|
|
"빌드 결과물 생성 기록",
|
|
],
|
|
"guardia_checks": ["Jenkinsfile 존재 여부 확인"],
|
|
},
|
|
2: {
|
|
"name": "SLSA Level 2 — 버전 관리 + CI 서비스",
|
|
"description": "버전 관리 시스템과 CI 서비스를 통한 빌드가 필요.",
|
|
"requirements": [
|
|
"소스 버전 관리 (Git)",
|
|
"CI 서비스 사용 (Jenkins)",
|
|
"빌드 출처 메타데이터 생성",
|
|
],
|
|
"guardia_checks": [
|
|
"Gitea 저장소 연결",
|
|
"Jenkins 빌드 이력",
|
|
"빌드 아티팩트 해시 기록",
|
|
],
|
|
},
|
|
3: {
|
|
"name": "SLSA Level 3 — 검증 가능한 빌드 출처",
|
|
"description": "서명된 빌드 출처(provenance)가 포함된 아티팩트 배포.",
|
|
"requirements": [
|
|
"서명된 아티팩트 (코드 서명)",
|
|
"빌드 환경 격리",
|
|
"외부 검증 가능한 빌드 출처 문서",
|
|
"재현 가능한 빌드(Reproducible Build)",
|
|
],
|
|
"guardia_checks": [
|
|
"아티팩트 서명 검증",
|
|
"빌드 환경 컨테이너 격리",
|
|
"SLSA Provenance 문서 생성",
|
|
],
|
|
},
|
|
}
|
|
|
|
|
|
# ── 버전 비교 헬퍼 ──────────────────────────────────────────────────────────
|
|
|
|
def _version_lt(ver: str, threshold: str) -> bool:
|
|
"""단순 버전 비교: ver < threshold 여부 반환."""
|
|
try:
|
|
def _parse(v: str):
|
|
return tuple(int(x) for x in v.strip().lstrip("v").split(".")[:4])
|
|
return _parse(ver) < _parse(threshold)
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def _check_package_vuln(pkg_name: str, pkg_version: str) -> Optional[dict]:
|
|
"""패키지명·버전을 KNOWN_VULNERABILITIES와 대조하여 매칭 항목 반환."""
|
|
pkg_lower = pkg_name.lower()
|
|
for vuln in KNOWN_VULNERABILITIES:
|
|
if vuln["package"].lower() not in pkg_lower:
|
|
continue
|
|
for ver_constraint in vuln["versions"]:
|
|
if ver_constraint.startswith("<"):
|
|
threshold = ver_constraint[1:].strip()
|
|
if _version_lt(pkg_version, threshold):
|
|
return vuln
|
|
return None
|
|
|
|
|
|
# ── 샘플 의존성 파싱 (에이전트리스 SSH 스텁) ─────────────────────────────────
|
|
|
|
async def _parse_dependencies_sample() -> List[dict]:
|
|
"""
|
|
실제 구현 시: paramiko SSH로 requirements.txt / package.json 파싱.
|
|
현재는 현실적인 샘플 데이터를 반환한다.
|
|
"""
|
|
return [
|
|
{"name": "fastapi", "version": "0.100.0", "ecosystem": "pypi"},
|
|
{"name": "sqlalchemy", "version": "2.0.15", "ecosystem": "pypi"},
|
|
{"name": "requests", "version": "2.28.0", "ecosystem": "pypi"},
|
|
{"name": "pillow", "version": "9.5.0", "ecosystem": "pypi"},
|
|
{"name": "cryptography","version": "41.0.3", "ecosystem": "pypi"},
|
|
{"name": "pydantic", "version": "2.5.0", "ecosystem": "pypi"},
|
|
{"name": "uvicorn", "version": "0.24.0", "ecosystem": "pypi"},
|
|
{"name": "nginx", "version": "1.24.0", "ecosystem": "system"},
|
|
{"name": "openssl", "version": "3.0.2", "ecosystem": "system"},
|
|
{"name": "django", "version": "4.1.13", "ecosystem": "pypi"},
|
|
]
|
|
|
|
|
|
# ── 백그라운드 스캔 실행 ─────────────────────────────────────────────────────
|
|
|
|
async def _run_supply_chain_scan(scan_id: int) -> None:
|
|
"""전체 공급망 스캔: 의존성 파싱 → CVE 매핑 → DB 저장."""
|
|
async with SessionLocal() as db:
|
|
row = await db.execute(select(SCSScan).where(SCSScan.id == scan_id))
|
|
scan = row.scalar_one_or_none()
|
|
if not scan:
|
|
return
|
|
|
|
try:
|
|
scan.status = "running"
|
|
await db.commit()
|
|
|
|
deps = await _parse_dependencies_sample()
|
|
found_vulns = []
|
|
critical = 0
|
|
high = 0
|
|
|
|
for dep in deps:
|
|
match = _check_package_vuln(dep["name"], dep["version"])
|
|
if match:
|
|
found_vulns.append({
|
|
"package": dep["name"],
|
|
"version": dep["version"],
|
|
"cve": match["cve"],
|
|
"severity": match["severity"],
|
|
"cvss": match["cvss"],
|
|
"fixed_version": match["fixed_version"],
|
|
"description": match["description"],
|
|
})
|
|
if match["severity"] == "CRITICAL":
|
|
critical += 1
|
|
elif match["severity"] == "HIGH":
|
|
high += 1
|
|
|
|
# 취약점 레코드 upsert
|
|
existing = (await db.execute(
|
|
select(SupplyChainVulnerability).where(
|
|
SupplyChainVulnerability.cve_id == match["cve"],
|
|
SupplyChainVulnerability.package == dep["name"],
|
|
)
|
|
)).scalar_one_or_none()
|
|
|
|
if not existing:
|
|
db.add(SupplyChainVulnerability(
|
|
cve_id = match["cve"],
|
|
package = dep["name"],
|
|
version = dep["version"],
|
|
fixed_version = match["fixed_version"],
|
|
severity = match["severity"],
|
|
cvss_score = match["cvss"],
|
|
description = match["description"],
|
|
patch_available = True,
|
|
status = "open",
|
|
))
|
|
|
|
scan.status = "completed"
|
|
scan.findings_count = len(found_vulns)
|
|
scan.critical_count = critical
|
|
scan.high_count = high
|
|
scan.report = json.dumps(found_vulns, ensure_ascii=False)
|
|
await db.commit()
|
|
|
|
except Exception as exc:
|
|
scan.status = "failed"
|
|
scan.report = json.dumps({"error": str(exc)[:200]})
|
|
await db.commit()
|
|
logger.error("공급망 스캔 실패 (scan_id=%d): %s", scan_id, exc)
|
|
|
|
|
|
# ── SLSA 평가 헬퍼 ──────────────────────────────────────────────────────────
|
|
|
|
def _evaluate_slsa_level() -> dict:
|
|
"""
|
|
GUARDiA 환경 기준 SLSA 레벨 평가.
|
|
Gitea + Jenkins 운영 중 → Level 2 달성 가능.
|
|
"""
|
|
from pathlib import Path
|
|
|
|
achieved = 0
|
|
gaps: List[str] = []
|
|
details: dict = {}
|
|
|
|
# Level 1: Jenkinsfile 존재 여부
|
|
jenkinsfile_paths = [
|
|
Path("C:/GUARDiA/workspace/guardia-itsm/Jenkinsfile"),
|
|
Path("C:/GUARDiA/repos/guardia-itsm/Jenkinsfile"),
|
|
]
|
|
has_jenkinsfile = any(p.exists() for p in jenkinsfile_paths)
|
|
details["level1_jenkinsfile"] = has_jenkinsfile
|
|
if not has_jenkinsfile:
|
|
gaps.append("Jenkinsfile 미존재 — CI 빌드 스크립트 정의 필요")
|
|
|
|
# Level 2: Gitea 저장소 연결 + Jenkins 가용성
|
|
gitea_repo_exists = Path("C:/GUARDiA/repos/guardia-itsm/.git").exists()
|
|
details["level2_gitea_repo"] = gitea_repo_exists
|
|
if not gitea_repo_exists:
|
|
gaps.append("Gitea 저장소 미연결")
|
|
|
|
# Level 2 달성 조건: Jenkinsfile + Gitea repo
|
|
if has_jenkinsfile and gitea_repo_exists:
|
|
achieved = 2
|
|
elif has_jenkinsfile or gitea_repo_exists:
|
|
achieved = 1
|
|
else:
|
|
achieved = 0
|
|
|
|
# Level 3: 서명된 아티팩트 — 현재 미구현
|
|
details["level3_signed_artifacts"] = False
|
|
gaps.append("서명된 아티팩트 미구현 — Level 3 달성을 위해 코드 서명 도구 도입 필요")
|
|
|
|
score = (achieved / 3) * 100.0
|
|
return {
|
|
"level": achieved,
|
|
"score": round(score, 1),
|
|
"gaps": gaps,
|
|
"details": details,
|
|
"definition": SLSA_REQUIREMENTS[achieved],
|
|
}
|
|
|
|
|
|
# ── Pydantic 스키마 ──────────────────────────────────────────────────────────
|
|
|
|
class PatchRequestIn(BaseModel):
|
|
note: Optional[str] = None
|
|
|
|
|
|
# ── 엔드포인트 ───────────────────────────────────────────────────────────────
|
|
|
|
@router.get("/scan")
|
|
async def get_scan_status(
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""공급망 스캔 현황 — 최근 20건."""
|
|
rows = await db.execute(
|
|
select(SCSScan).order_by(desc(SCSScan.created_at)).limit(20)
|
|
)
|
|
scans = rows.scalars().all()
|
|
|
|
latest = scans[0] if scans else None
|
|
summary = {
|
|
"total_scans": len(scans),
|
|
"last_scan_at": latest.created_at.isoformat() if latest else None,
|
|
"last_status": latest.status if latest else "none",
|
|
"last_findings": latest.findings_count if latest else 0,
|
|
"last_critical": latest.critical_count if latest else 0,
|
|
"last_high": latest.high_count if latest else 0,
|
|
}
|
|
|
|
return {
|
|
"summary": summary,
|
|
"scans": [
|
|
{
|
|
"id": s.id,
|
|
"scan_type": s.scan_type,
|
|
"target": s.target,
|
|
"status": s.status,
|
|
"findings_count": s.findings_count,
|
|
"critical_count": s.critical_count,
|
|
"high_count": s.high_count,
|
|
"created_at": s.created_at.isoformat(),
|
|
}
|
|
for s in scans
|
|
],
|
|
}
|
|
|
|
|
|
@router.post("/scan", status_code=202)
|
|
async def run_supply_chain_scan(
|
|
background_tasks: BackgroundTasks,
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""전체 공급망 스캔 실행 (비동기, 202 Accepted)."""
|
|
if current_user.role not in (UserRole.ADMIN, UserRole.PM):
|
|
raise HTTPException(403, "PM/ADMIN 권한이 필요합니다.")
|
|
|
|
scan = SCSScan(
|
|
scan_type="dependency",
|
|
target="guardia-itsm/requirements.txt",
|
|
status="queued",
|
|
)
|
|
db.add(scan)
|
|
await db.commit()
|
|
await db.refresh(scan)
|
|
|
|
background_tasks.add_task(_run_supply_chain_scan, scan.id)
|
|
|
|
logger.info("공급망 스캔 시작 (scan_id=%d, by=%s)", scan.id, current_user.username)
|
|
return {
|
|
"scan_id": scan.id,
|
|
"status": "queued",
|
|
"message": "공급망 스캔이 시작되었습니다. GET /api/supply-chain/scan 으로 결과를 확인하세요.",
|
|
}
|
|
|
|
|
|
@router.get("/vulnerabilities")
|
|
async def list_vulnerabilities(
|
|
severity: Optional[str] = Query(None, description="CRITICAL|HIGH|MEDIUM|LOW"),
|
|
status: Optional[str] = Query(None, description="open|patched|accepted"),
|
|
limit: int = Query(50, ge=1, le=200),
|
|
offset: int = Query(0, ge=0),
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""취약점 목록 조회 (심각도별 필터 지원)."""
|
|
query = select(SupplyChainVulnerability).order_by(
|
|
desc(SupplyChainVulnerability.cvss_score)
|
|
)
|
|
|
|
if severity:
|
|
query = query.where(
|
|
SupplyChainVulnerability.severity == severity.upper()
|
|
)
|
|
if status:
|
|
query = query.where(
|
|
SupplyChainVulnerability.status == status.lower()
|
|
)
|
|
|
|
total_rows = await db.execute(query)
|
|
all_vulns = total_rows.scalars().all()
|
|
|
|
# 심각도 집계
|
|
severity_counts: dict = {}
|
|
for v in all_vulns:
|
|
sev = v.severity or "UNKNOWN"
|
|
severity_counts[sev] = severity_counts.get(sev, 0) + 1
|
|
|
|
paged = all_vulns[offset: offset + limit]
|
|
|
|
return {
|
|
"total": len(all_vulns),
|
|
"severity_summary": severity_counts,
|
|
"vulnerabilities": [
|
|
{
|
|
"id": v.id,
|
|
"cve_id": v.cve_id,
|
|
"package": v.package,
|
|
"version": v.version,
|
|
"fixed_version": v.fixed_version,
|
|
"severity": v.severity,
|
|
"cvss_score": v.cvss_score,
|
|
"description": v.description,
|
|
"patch_available": v.patch_available,
|
|
"status": v.status,
|
|
"created_at": v.created_at.isoformat(),
|
|
}
|
|
for v in paged
|
|
],
|
|
}
|
|
|
|
|
|
@router.post("/vulnerabilities/{vuln_id}/patch", status_code=201)
|
|
async def request_patch_sr(
|
|
vuln_id: int,
|
|
body: PatchRequestIn,
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""
|
|
취약점 패치 요청 — SR(서비스 요청) 자동 생성.
|
|
보안: IP, 비밀번호, SSH 계정은 SR 내용에 절대 포함하지 않는다.
|
|
"""
|
|
row = await db.execute(
|
|
select(SupplyChainVulnerability).where(
|
|
SupplyChainVulnerability.id == vuln_id
|
|
)
|
|
)
|
|
vuln = row.scalar_one_or_none()
|
|
if not vuln:
|
|
raise HTTPException(404, f"취약점 ID {vuln_id}를 찾을 수 없습니다.")
|
|
|
|
if vuln.status == "patched":
|
|
raise HTTPException(400, "이미 패치 완료된 취약점입니다.")
|
|
|
|
# SR 내용 구성 — 서버 자격증명 절대 미포함
|
|
sr_title = (
|
|
f"[공급망 보안] {vuln.package} 취약점 패치 요청 ({vuln.cve_id or '미분류'})"
|
|
)
|
|
sr_description = (
|
|
f"패키지: {vuln.package} v{vuln.version or '미상'}\n"
|
|
f"취약점: {vuln.cve_id or '미분류'} (CVSS {vuln.cvss_score:.1f} / {vuln.severity})\n"
|
|
f"설명: {vuln.description or '-'}\n"
|
|
f"권고 버전: {vuln.fixed_version or '최신 버전으로 업그레이드'}\n"
|
|
f"추가 요청 사항: {body.note or '-'}"
|
|
)
|
|
|
|
import hashlib as _hs
|
|
_ts = datetime.utcnow().strftime("%Y%m%d%H%M%S")
|
|
_uid = _hs.sha256(f"scs-{vuln_id}-{_ts}".encode()).hexdigest()[:8].upper()
|
|
sr_id_str = f"SCS-{_ts[:8]}-{_uid}"
|
|
|
|
sr = SRRequest(
|
|
sr_id = sr_id_str,
|
|
title = sr_title,
|
|
description = sr_description,
|
|
status = SRStatus.RECEIVED,
|
|
sr_type = SRType.OTHER,
|
|
requested_by = current_user.username,
|
|
)
|
|
db.add(sr)
|
|
|
|
# 취약점 상태를 'open' → 패치 요청 접수로 표시
|
|
vuln.status = "open" # SR 생성 후에도 open 유지 — 실제 패치 완료 시 patched 처리
|
|
await db.commit()
|
|
await db.refresh(sr)
|
|
|
|
logger.info(
|
|
"공급망 취약점 패치 SR 생성: vuln_id=%d cve=%s sr_id=%d by=%s",
|
|
vuln_id, vuln.cve_id, sr.id, current_user.username,
|
|
)
|
|
|
|
return {
|
|
"message": f"패치 요청 SR이 생성되었습니다. (SR #{sr.id})",
|
|
"sr_id": sr.id,
|
|
"sr_title": sr.title,
|
|
"vuln_id": vuln_id,
|
|
"cve_id": vuln.cve_id,
|
|
"severity": vuln.severity,
|
|
}
|
|
|
|
|
|
@router.get("/dependencies")
|
|
async def list_dependencies(
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""
|
|
의존성 목록 + 각 패키지의 CVE 상태 반환.
|
|
에이전트리스: requirements.txt 샘플 파싱.
|
|
"""
|
|
deps = await _parse_dependencies_sample()
|
|
result = []
|
|
|
|
for dep in deps:
|
|
match = _check_package_vuln(dep["name"], dep["version"])
|
|
result.append({
|
|
"name": dep["name"],
|
|
"version": dep["version"],
|
|
"ecosystem": dep["ecosystem"],
|
|
"vulnerable": match is not None,
|
|
"cve_id": match["cve"] if match else None,
|
|
"severity": match["severity"] if match else None,
|
|
"cvss_score": match["cvss"] if match else None,
|
|
"fixed_version": match["fixed_version"] if match else None,
|
|
})
|
|
|
|
vulnerable_count = sum(1 for d in result if d["vulnerable"])
|
|
return {
|
|
"total_dependencies": len(result),
|
|
"vulnerable_count": vulnerable_count,
|
|
"safe_count": len(result) - vulnerable_count,
|
|
"dependencies": result,
|
|
}
|
|
|
|
|
|
@router.get("/slsa-level")
|
|
async def get_slsa_level(
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""
|
|
SLSA 레벨 평가 (0~3).
|
|
GUARDiA는 Gitea + Jenkins 운영 → Level 2 달성 가능.
|
|
"""
|
|
evaluation = _evaluate_slsa_level()
|
|
|
|
# DB에 평가 이력 저장
|
|
assessment = SLSAAssessment(
|
|
level = evaluation["level"],
|
|
score = evaluation["score"],
|
|
requirements = json.dumps(
|
|
evaluation["definition"]["requirements"], ensure_ascii=False
|
|
),
|
|
gaps = json.dumps(evaluation["gaps"], ensure_ascii=False),
|
|
)
|
|
db.add(assessment)
|
|
await db.commit()
|
|
|
|
return {
|
|
"current_level": evaluation["level"],
|
|
"level_name": evaluation["definition"]["name"],
|
|
"score_pct": evaluation["score"],
|
|
"description": evaluation["definition"]["description"],
|
|
"achieved_checks": evaluation["details"],
|
|
"gaps": evaluation["gaps"],
|
|
"level_definitions": {
|
|
str(k): {
|
|
"name": v["name"],
|
|
"description": v["description"],
|
|
"requirements": v["requirements"],
|
|
}
|
|
for k, v in SLSA_REQUIREMENTS.items()
|
|
},
|
|
"recommendation": (
|
|
"Level 3 달성을 위해 아티팩트 서명(코드 서명) 및 "
|
|
"재현 가능한 빌드 환경 구축이 필요합니다."
|
|
if evaluation["level"] < 3 else
|
|
"SLSA Level 3 달성 완료. 정기 감사를 유지하세요."
|
|
),
|
|
}
|
|
|
|
|
|
@router.get("/pipeline-integrity")
|
|
async def get_pipeline_integrity(
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""
|
|
CI/CD 파이프라인 무결성 점검.
|
|
Jenkinsfile, Gitea 저장소, 배포 스크립트 존재 여부 확인.
|
|
"""
|
|
from pathlib import Path
|
|
|
|
checks = []
|
|
|
|
# 1. Jenkinsfile 존재 여부
|
|
jenkinsfile_paths = [
|
|
("guardia-itsm Jenkinsfile", "C:/GUARDiA/workspace/guardia-itsm/Jenkinsfile"),
|
|
("guardia-manager Jenkinsfile", "C:/GUARDiA/workspace/guardia-manager/Jenkinsfile"),
|
|
]
|
|
for label, path in jenkinsfile_paths:
|
|
exists = Path(path).exists()
|
|
checks.append({
|
|
"check": label,
|
|
"status": "pass" if exists else "fail",
|
|
"detail": f"{path} {'존재' if exists else '미존재'}",
|
|
})
|
|
|
|
# 2. Gitea repo .git 존재
|
|
repo_paths = [
|
|
("guardia-itsm Gitea repo", "C:/GUARDiA/repos/guardia-itsm/.git"),
|
|
("guardia-manager Gitea repo", "C:/GUARDiA/repos/guardia-manager/.git"),
|
|
]
|
|
for label, path in repo_paths:
|
|
exists = Path(path).exists()
|
|
checks.append({
|
|
"check": label,
|
|
"status": "pass" if exists else "warn",
|
|
"detail": f"{path} {'연결됨' if exists else '미연결'}",
|
|
})
|
|
|
|
# 3. deploy_server.py (webhook 수신기)
|
|
deploy_server = Path("C:/GUARDiA/scripts/deploy/deploy_server.py")
|
|
checks.append({
|
|
"check": "Webhook 배포 수신기",
|
|
"status": "pass" if deploy_server.exists() else "fail",
|
|
"detail": str(deploy_server),
|
|
})
|
|
|
|
# 4. requirements.txt 잠금 파일
|
|
req_file = Path("C:/GUARDiA/workspace/guardia-itsm/requirements.txt")
|
|
checks.append({
|
|
"check": "requirements.txt 의존성 잠금",
|
|
"status": "pass" if req_file.exists() else "warn",
|
|
"detail": str(req_file),
|
|
})
|
|
|
|
pass_count = sum(1 for c in checks if c["status"] == "pass")
|
|
fail_count = sum(1 for c in checks if c["status"] == "fail")
|
|
warn_count = sum(1 for c in checks if c["status"] == "warn")
|
|
|
|
overall = (
|
|
"healthy" if fail_count == 0 and warn_count == 0 else
|
|
"degraded" if fail_count == 0 else
|
|
"critical"
|
|
)
|
|
|
|
return {
|
|
"overall_status": overall,
|
|
"pass_count": pass_count,
|
|
"fail_count": fail_count,
|
|
"warn_count": warn_count,
|
|
"integrity_score": round(pass_count / len(checks) * 100, 1),
|
|
"checks": checks,
|
|
"checked_at": datetime.utcnow().isoformat(),
|
|
}
|
|
|
|
|
|
@router.get("/report")
|
|
async def get_supply_chain_report(
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""
|
|
공급망 보안 종합 리포트:
|
|
스캔 이력, 취약점 통계, SLSA 레벨, 파이프라인 무결성 요약.
|
|
"""
|
|
# 최신 스캔
|
|
scan_row = await db.execute(
|
|
select(SCSScan).order_by(desc(SCSScan.created_at)).limit(1)
|
|
)
|
|
latest_scan = scan_row.scalar_one_or_none()
|
|
|
|
# 취약점 통계
|
|
vuln_rows = await db.execute(select(SupplyChainVulnerability))
|
|
all_vulns = vuln_rows.scalars().all()
|
|
open_vulns = [v for v in all_vulns if v.status == "open"]
|
|
patched_vulns = [v for v in all_vulns if v.status == "patched"]
|
|
|
|
sev_summary: dict = {}
|
|
for v in open_vulns:
|
|
sev = v.severity or "UNKNOWN"
|
|
sev_summary[sev] = sev_summary.get(sev, 0) + 1
|
|
|
|
# SLSA 평가
|
|
slsa = _evaluate_slsa_level()
|
|
|
|
# 의존성 취약률
|
|
deps = await _parse_dependencies_sample()
|
|
vuln_dep_count = sum(
|
|
1 for d in deps if _check_package_vuln(d["name"], d["version"])
|
|
)
|
|
|
|
# 전체 위험 점수 (간이 계산)
|
|
critical_weight = sev_summary.get("CRITICAL", 0) * 10
|
|
high_weight = sev_summary.get("HIGH", 0) * 7
|
|
medium_weight = sev_summary.get("MEDIUM", 0) * 4
|
|
risk_score = min(100, critical_weight + high_weight + medium_weight)
|
|
|
|
risk_label = (
|
|
"CRITICAL" if risk_score >= 70 else
|
|
"HIGH" if risk_score >= 40 else
|
|
"MEDIUM" if risk_score >= 20 else
|
|
"LOW"
|
|
)
|
|
|
|
return {
|
|
"generated_at": datetime.utcnow().isoformat(),
|
|
"generated_by": current_user.username,
|
|
"risk_score": risk_score,
|
|
"risk_level": risk_label,
|
|
"scan_summary": {
|
|
"last_scan_at": latest_scan.created_at.isoformat() if latest_scan else None,
|
|
"last_status": latest_scan.status if latest_scan else "none",
|
|
"last_findings": latest_scan.findings_count if latest_scan else 0,
|
|
},
|
|
"vulnerability_summary": {
|
|
"total_open": len(open_vulns),
|
|
"total_patched": len(patched_vulns),
|
|
"by_severity": sev_summary,
|
|
"patch_rate_pct": (
|
|
round(len(patched_vulns) / len(all_vulns) * 100, 1)
|
|
if all_vulns else 0.0
|
|
),
|
|
},
|
|
"dependency_summary": {
|
|
"total_dependencies": len(deps),
|
|
"vulnerable_count": vuln_dep_count,
|
|
"vulnerability_rate_pct": round(
|
|
vuln_dep_count / len(deps) * 100, 1
|
|
) if deps else 0.0,
|
|
},
|
|
"slsa_summary": {
|
|
"current_level": slsa["level"],
|
|
"level_name": slsa["definition"]["name"],
|
|
"score_pct": slsa["score"],
|
|
"gaps_count": len(slsa["gaps"]),
|
|
},
|
|
"recommendations": _build_recommendations(sev_summary, slsa["level"], vuln_dep_count),
|
|
}
|
|
|
|
|
|
def _build_recommendations(
|
|
sev_summary: dict,
|
|
slsa_level: int,
|
|
vuln_dep_count: int,
|
|
) -> List[str]:
|
|
"""우선순위 개선 권고 사항 생성."""
|
|
recs: List[str] = []
|
|
|
|
if sev_summary.get("CRITICAL", 0) > 0:
|
|
recs.append(
|
|
f"[긴급] CRITICAL 취약점 {sev_summary['CRITICAL']}건을 즉시 패치하십시오."
|
|
)
|
|
if sev_summary.get("HIGH", 0) > 0:
|
|
recs.append(
|
|
f"[높음] HIGH 취약점 {sev_summary['HIGH']}건에 대한 패치 SR을 금주 내 생성하십시오."
|
|
)
|
|
if vuln_dep_count > 0:
|
|
recs.append(
|
|
f"의존성 {vuln_dep_count}개에 알려진 취약점이 존재합니다. requirements.txt를 갱신하십시오."
|
|
)
|
|
if slsa_level < 2:
|
|
recs.append(
|
|
"SLSA Level 2 달성을 위해 Gitea 저장소 연결 및 Jenkinsfile 작성이 필요합니다."
|
|
)
|
|
if slsa_level < 3:
|
|
recs.append(
|
|
"SLSA Level 3 달성을 위해 빌드 아티팩트 코드 서명 도구 도입을 검토하십시오."
|
|
)
|
|
if not recs:
|
|
recs.append("현재 공급망 보안 상태가 양호합니다. 정기 스캔을 유지하십시오.")
|
|
|
|
return recs
|