685 lines
30 KiB
Python
685 lines
30 KiB
Python
"""
|
|
Playwright MCP 기반 jMeter 성능테스트 스크립트 자동작성 + DB 관리
|
|
|
|
파이프라인 4단계:
|
|
1. 시나리오 작성 — 자연어 설명(또는 직접 입력) → 재사용 가능한 시나리오로 등록 (DB)
|
|
2. 입력값 엑셀 — 시나리오의 ${변수} 파라미터를 엑셀 템플릿으로 내려받고, 작성된 엑셀을 업로드
|
|
3. 화면 녹화→변환 — Playwright MCP 녹화 결과(액션 로그 JSON)를 jMeter 샘플러 시퀀스로 변환
|
|
4. jMeter 셋팅 — Thread Group + HTTP Sampler + CSV Data Set Config로 구성된 .jmx 생성·다운로드
|
|
|
|
기존 routers/jmeter.py(`/api/perf`)는 JTL 분석·내장 실행·보고서만 제공하며 시나리오 작성/녹화/DB 관리
|
|
기능이 없다 — 본 라우터(`/api/perf-scenario`)가 그 앞단 파이프라인을 전담한다.
|
|
|
|
주의: 실제 브라우저 제어(Playwright MCP 녹화)는 오케스트레이터/사용자가 도구로 직접 수행하고,
|
|
본 라우터는 그 결과(JSON 액션 로그)를 입력받아 jMeter 스크립트로 변환하는 어댑터 역할에 집중한다
|
|
(Ollama는 직접 브라우저를 제어할 수 없으므로).
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import io
|
|
import logging
|
|
import re
|
|
import xml.etree.ElementTree as ET
|
|
from datetime import datetime
|
|
from typing import Any, List, Optional
|
|
|
|
import httpx
|
|
from fastapi import APIRouter, Depends, File, HTTPException, UploadFile
|
|
from fastapi.responses import Response, StreamingResponse
|
|
from pydantic import BaseModel, ConfigDict
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from core.auth import get_current_user
|
|
from database import get_db
|
|
from models import (
|
|
User, AuditLog,
|
|
PerfTestScenario, PerfTestInputData, PerfTestRecording,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
router = APIRouter(prefix="/api/perf-scenario", tags=["성능테스트 스튜디오"])
|
|
|
|
_OLLAMA_URL = "http://localhost:11434/api/generate"
|
|
_OLLAMA_MODEL = "llama3"
|
|
|
|
_VAR_PATTERN = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")
|
|
|
|
|
|
def _tenant(user: User) -> int:
|
|
return getattr(user, "tenant_id", None) or getattr(user, "id", 1) or 1
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
# Pydantic 스키마
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
|
|
class ScenarioStep(BaseModel):
|
|
action: str # goto|fill|click|select|wait
|
|
selector: Optional[str] = None
|
|
value: Optional[str] = None
|
|
think_time: float = 0.5
|
|
|
|
|
|
class ScenarioOut(BaseModel):
|
|
model_config = ConfigDict(from_attributes=True)
|
|
id: int
|
|
name: str
|
|
description: Optional[str]
|
|
target_url: str
|
|
steps: Optional[List[dict]]
|
|
users: int
|
|
duration_sec: int
|
|
ramp_up_sec: int
|
|
status: str
|
|
created_at: Optional[datetime]
|
|
updated_at: Optional[datetime]
|
|
|
|
|
|
class ScenarioCreateIn(BaseModel):
|
|
name: str
|
|
description: Optional[str] = None
|
|
target_url: str
|
|
steps: List[ScenarioStep] = []
|
|
users: int = 10
|
|
duration_sec: int = 30
|
|
ramp_up_sec: int = 5
|
|
|
|
|
|
class ScenarioUpdateIn(BaseModel):
|
|
name: Optional[str] = None
|
|
description: Optional[str] = None
|
|
target_url: Optional[str] = None
|
|
steps: Optional[List[ScenarioStep]] = None
|
|
users: Optional[int] = None
|
|
duration_sec: Optional[int] = None
|
|
ramp_up_sec: Optional[int] = None
|
|
|
|
|
|
class ScenarioGenerateIn(BaseModel):
|
|
"""자연어 설명 → Ollama가 시나리오 단계를 구조화."""
|
|
description: str
|
|
target_url: str
|
|
|
|
|
|
class RecordingUploadIn(BaseModel):
|
|
action_log: List[dict]
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
# 헬퍼
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
|
|
async def _get_scenario_for_user(db: AsyncSession, scenario_id: int, user: User) -> PerfTestScenario:
|
|
sc = await db.get(PerfTestScenario, scenario_id)
|
|
if not sc or sc.tenant_id != _tenant(user):
|
|
raise HTTPException(404, "성능테스트 시나리오를 찾을 수 없습니다.")
|
|
return sc
|
|
|
|
|
|
def _extract_variables(scenario: PerfTestScenario) -> List[str]:
|
|
"""steps의 value/selector에서 ${변수명} 패턴을 추출 (중복 제거, 등장 순서 보존)."""
|
|
seen: List[str] = []
|
|
for step in (scenario.steps or []):
|
|
for field in ("value", "selector"):
|
|
text = step.get(field) if isinstance(step, dict) else None
|
|
if not text:
|
|
continue
|
|
for m in _VAR_PATTERN.finditer(text):
|
|
name = m.group(1)
|
|
if name not in seen:
|
|
seen.append(name)
|
|
return seen
|
|
|
|
|
|
async def _call_ollama(prompt: str, timeout: float = 60.0) -> Optional[str]:
|
|
"""Ollama sLLM 호출. 실패 시 None 반환 (외부 API 절대 호출 금지)."""
|
|
try:
|
|
async with httpx.AsyncClient(timeout=timeout) as client:
|
|
resp = await client.post(
|
|
_OLLAMA_URL,
|
|
json={"model": _OLLAMA_MODEL, "prompt": prompt, "stream": False, "format": "json"},
|
|
)
|
|
if resp.status_code == 200:
|
|
return resp.json().get("response", "").strip()
|
|
except Exception as exc:
|
|
logger.warning("Ollama 호출 실패: %s", exc)
|
|
return None
|
|
|
|
|
|
def _fallback_steps(target_url: str, description: str) -> List[dict]:
|
|
"""Ollama 미응답 시 사용할 최소 단계 골격 — goto + 설명 기반 액션 1건."""
|
|
return [
|
|
{"action": "goto", "selector": None, "value": target_url, "think_time": 1.0},
|
|
{"action": "click", "selector": "body", "value": description[:50], "think_time": 0.5},
|
|
]
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
# 1단계 — 시나리오 작성/관리
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
|
|
@router.post("/scenarios/generate", response_model=ScenarioOut)
|
|
async def generate_scenario(
|
|
body: ScenarioGenerateIn,
|
|
db: AsyncSession = Depends(get_db),
|
|
user: User = Depends(get_current_user),
|
|
):
|
|
"""자연어 설명 → Ollama로 단계별 액션 시퀀스를 구조화하여 DRAFT 시나리오 생성."""
|
|
prompt = (
|
|
"다음 설명을 웹 성능테스트 시나리오의 단계별 액션 시퀀스(JSON 배열)로 변환해줘.\n"
|
|
"각 항목은 {\"action\": \"goto|fill|click|select|wait\", \"selector\": \"CSS선택자 또는 null\", "
|
|
"\"value\": \"입력값 또는 URL 또는 null\", \"think_time\": 초단위 숫자} 형식이며, "
|
|
"JSON 배열만 출력하고 다른 설명은 포함하지 마.\n"
|
|
f"대상 URL: {body.target_url}\n"
|
|
f"시나리오 설명: {body.description}"
|
|
)
|
|
raw = await _call_ollama(prompt)
|
|
steps: List[dict] = []
|
|
if raw:
|
|
try:
|
|
import json as _json
|
|
parsed = _json.loads(raw)
|
|
if isinstance(parsed, list):
|
|
steps = [
|
|
{
|
|
"action": s.get("action", "click"),
|
|
"selector": s.get("selector"),
|
|
"value": s.get("value"),
|
|
"think_time": float(s.get("think_time", 0.5)),
|
|
}
|
|
for s in parsed if isinstance(s, dict)
|
|
]
|
|
except Exception:
|
|
steps = []
|
|
if not steps:
|
|
steps = _fallback_steps(body.target_url, body.description)
|
|
|
|
scenario = PerfTestScenario(
|
|
tenant_id=_tenant(user),
|
|
name=body.description[:80] or "신규 시나리오",
|
|
description=body.description,
|
|
target_url=body.target_url,
|
|
steps=steps,
|
|
status="DRAFT",
|
|
created_by=user.id,
|
|
)
|
|
db.add(scenario)
|
|
await db.flush()
|
|
db.add(AuditLog(
|
|
actor=user.username if hasattr(user, "username") else str(user.id),
|
|
action="PERF_SCENARIO_GENERATE",
|
|
detail=f"AI 시나리오 자동생성: {scenario.name} (단계 {len(steps)}개)",
|
|
entity_type="PERF_SCENARIO", entity_id=str(scenario.id), severity="INFO",
|
|
))
|
|
await db.commit()
|
|
await db.refresh(scenario)
|
|
return scenario
|
|
|
|
|
|
@router.post("/scenarios", response_model=ScenarioOut)
|
|
async def create_scenario(
|
|
body: ScenarioCreateIn,
|
|
db: AsyncSession = Depends(get_db),
|
|
user: User = Depends(get_current_user),
|
|
):
|
|
"""성능테스트 시나리오 직접 등록 (재사용 가능)."""
|
|
scenario = PerfTestScenario(
|
|
tenant_id=_tenant(user),
|
|
name=body.name,
|
|
description=body.description,
|
|
target_url=body.target_url,
|
|
steps=[s.model_dump() for s in body.steps],
|
|
users=body.users,
|
|
duration_sec=body.duration_sec,
|
|
ramp_up_sec=body.ramp_up_sec,
|
|
status="DRAFT",
|
|
created_by=user.id,
|
|
)
|
|
db.add(scenario)
|
|
await db.flush()
|
|
db.add(AuditLog(
|
|
actor=user.username if hasattr(user, "username") else str(user.id),
|
|
action="PERF_SCENARIO_CREATE",
|
|
detail=f"시나리오 등록: {scenario.name} ({scenario.target_url})",
|
|
entity_type="PERF_SCENARIO", entity_id=str(scenario.id), severity="INFO",
|
|
))
|
|
await db.commit()
|
|
await db.refresh(scenario)
|
|
return scenario
|
|
|
|
|
|
@router.get("/scenarios", response_model=List[ScenarioOut])
|
|
async def list_scenarios(
|
|
db: AsyncSession = Depends(get_db),
|
|
user: User = Depends(get_current_user),
|
|
):
|
|
"""등록된 시나리오 목록 (재사용 가능한 그리드)."""
|
|
rows = (await db.execute(
|
|
select(PerfTestScenario)
|
|
.where(PerfTestScenario.tenant_id == _tenant(user))
|
|
.order_by(PerfTestScenario.created_at.desc())
|
|
)).scalars().all()
|
|
return rows
|
|
|
|
|
|
@router.get("/scenarios/{scenario_id}", response_model=ScenarioOut)
|
|
async def get_scenario(
|
|
scenario_id: int,
|
|
db: AsyncSession = Depends(get_db),
|
|
user: User = Depends(get_current_user),
|
|
):
|
|
return await _get_scenario_for_user(db, scenario_id, user)
|
|
|
|
|
|
@router.put("/scenarios/{scenario_id}", response_model=ScenarioOut)
|
|
async def update_scenario(
|
|
scenario_id: int,
|
|
body: ScenarioUpdateIn,
|
|
db: AsyncSession = Depends(get_db),
|
|
user: User = Depends(get_current_user),
|
|
):
|
|
scenario = await _get_scenario_for_user(db, scenario_id, user)
|
|
data = body.model_dump(exclude_unset=True)
|
|
if "steps" in data and data["steps"] is not None:
|
|
data["steps"] = [s if isinstance(s, dict) else s.model_dump() for s in body.steps]
|
|
for k, v in data.items():
|
|
setattr(scenario, k, v)
|
|
scenario.updated_at = datetime.utcnow()
|
|
db.add(AuditLog(
|
|
actor=user.username if hasattr(user, "username") else str(user.id),
|
|
action="PERF_SCENARIO_UPDATE",
|
|
detail=f"시나리오 수정: {scenario.name}",
|
|
entity_type="PERF_SCENARIO", entity_id=str(scenario.id), severity="INFO",
|
|
))
|
|
await db.commit()
|
|
await db.refresh(scenario)
|
|
return scenario
|
|
|
|
|
|
@router.delete("/scenarios/{scenario_id}")
|
|
async def delete_scenario(
|
|
scenario_id: int,
|
|
db: AsyncSession = Depends(get_db),
|
|
user: User = Depends(get_current_user),
|
|
):
|
|
scenario = await _get_scenario_for_user(db, scenario_id, user)
|
|
name = scenario.name
|
|
await db.execute(
|
|
PerfTestInputData.__table__.delete().where(PerfTestInputData.scenario_id == scenario_id)
|
|
)
|
|
await db.execute(
|
|
PerfTestRecording.__table__.delete().where(PerfTestRecording.scenario_id == scenario_id)
|
|
)
|
|
await db.delete(scenario)
|
|
db.add(AuditLog(
|
|
actor=user.username if hasattr(user, "username") else str(user.id),
|
|
action="PERF_SCENARIO_DELETE",
|
|
detail=f"시나리오 삭제: {name}",
|
|
entity_type="PERF_SCENARIO", entity_id=str(scenario_id), severity="WARNING",
|
|
))
|
|
await db.commit()
|
|
return {"message": f"시나리오 '{name}'이(가) 삭제되었습니다."}
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
# 2단계 — 테스트 입력값 엑셀 (openpyxl)
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
|
|
@router.get("/scenarios/{scenario_id}/input-template")
|
|
async def download_input_template(
|
|
scenario_id: int,
|
|
db: AsyncSession = Depends(get_db),
|
|
user: User = Depends(get_current_user),
|
|
):
|
|
"""시나리오 steps의 ${변수} 목록을 헤더로, 샘플 행 1개를 포함한 .xlsx 템플릿 다운로드."""
|
|
scenario = await _get_scenario_for_user(db, scenario_id, user)
|
|
variables = _extract_variables(scenario)
|
|
if not variables:
|
|
raise HTTPException(422, "이 시나리오에는 파라미터화 변수(${변수명})가 없습니다.")
|
|
|
|
try:
|
|
from openpyxl import Workbook
|
|
from openpyxl.styles import Font, PatternFill
|
|
except ImportError:
|
|
raise HTTPException(500, "openpyxl 라이브러리가 설치되지 않았습니다. pip install openpyxl")
|
|
|
|
wb = Workbook()
|
|
ws = wb.active
|
|
ws.title = "입력값"
|
|
header_fill = PatternFill(start_color="003366", end_color="003366", fill_type="solid")
|
|
header_font = Font(color="FFFFFF", bold=True)
|
|
for col, name in enumerate(variables, start=1):
|
|
cell = ws.cell(row=1, column=col, value=name)
|
|
cell.font = header_font
|
|
cell.fill = header_fill
|
|
for col, name in enumerate(variables, start=1):
|
|
ws.cell(row=2, column=col, value=f"샘플_{name}_1")
|
|
|
|
buf = io.BytesIO()
|
|
wb.save(buf)
|
|
buf.seek(0)
|
|
filename = f"perf_input_template_{scenario_id}.xlsx"
|
|
return StreamingResponse(
|
|
buf,
|
|
media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
|
|
headers={"Content-Disposition": f"attachment; filename={filename}"},
|
|
)
|
|
|
|
|
|
@router.post("/scenarios/{scenario_id}/input-data")
|
|
async def upload_input_data(
|
|
scenario_id: int,
|
|
file: UploadFile = File(...),
|
|
db: AsyncSession = Depends(get_db),
|
|
user: User = Depends(get_current_user),
|
|
):
|
|
"""작성된 입력값 엑셀 업로드 → 파싱하여 CSV Data Set Config 원천 데이터로 저장."""
|
|
scenario = await _get_scenario_for_user(db, scenario_id, user)
|
|
if not file.filename.lower().endswith((".xlsx", ".xlsm")):
|
|
raise HTTPException(422, "xlsx 형식의 엑셀 파일만 업로드 가능합니다.")
|
|
|
|
try:
|
|
from openpyxl import load_workbook
|
|
except ImportError:
|
|
raise HTTPException(500, "openpyxl 라이브러리가 설치되지 않았습니다. pip install openpyxl")
|
|
|
|
content = await file.read()
|
|
try:
|
|
wb = load_workbook(io.BytesIO(content), read_only=True, data_only=True)
|
|
ws = wb.active
|
|
rows_iter = ws.iter_rows(values_only=True)
|
|
header = [str(c) if c is not None else "" for c in next(rows_iter)]
|
|
data_rows = [
|
|
[("" if c is None else c) for c in r]
|
|
for r in rows_iter if any(c is not None for c in r)
|
|
]
|
|
except Exception as exc:
|
|
raise HTTPException(422, f"엑셀 파일을 읽을 수 없습니다: {exc}")
|
|
|
|
if not header or not data_rows:
|
|
raise HTTPException(422, "엑셀에 헤더 행과 최소 1개의 데이터 행이 필요합니다.")
|
|
|
|
# 기존 입력값 교체 (시나리오당 최신 1세트 유지)
|
|
await db.execute(
|
|
PerfTestInputData.__table__.delete().where(PerfTestInputData.scenario_id == scenario_id)
|
|
)
|
|
input_data = PerfTestInputData(scenario_id=scenario_id, column_names=header, rows=data_rows)
|
|
db.add(input_data)
|
|
db.add(AuditLog(
|
|
actor=user.username if hasattr(user, "username") else str(user.id),
|
|
action="PERF_INPUT_DATA_UPLOAD",
|
|
detail=f"입력값 업로드: {scenario.name} | 컬럼 {len(header)}개 · 행 {len(data_rows)}개",
|
|
entity_type="PERF_SCENARIO", entity_id=str(scenario_id), severity="INFO",
|
|
))
|
|
await db.commit()
|
|
return {
|
|
"scenario_id": scenario_id,
|
|
"column_names": header,
|
|
"row_count": len(data_rows),
|
|
"message": f"입력값 {len(data_rows)}건이 등록되었습니다.",
|
|
}
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
# 3단계 — Playwright MCP 녹화 → jMeter 스크립트 변환
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
|
|
@router.post("/scenarios/{scenario_id}/recording")
|
|
async def upload_recording(
|
|
scenario_id: int,
|
|
body: RecordingUploadIn,
|
|
db: AsyncSession = Depends(get_db),
|
|
user: User = Depends(get_current_user),
|
|
):
|
|
"""Playwright MCP로 녹화한 액션 로그(JSON)를 업로드 — 실제 브라우저 제어는 별도 도구가 수행."""
|
|
scenario = await _get_scenario_for_user(db, scenario_id, user)
|
|
if not body.action_log:
|
|
raise HTTPException(422, "action_log가 비어 있습니다.")
|
|
|
|
recording = PerfTestRecording(scenario_id=scenario_id, action_log=body.action_log)
|
|
db.add(recording)
|
|
scenario.status = "RECORDED"
|
|
scenario.updated_at = datetime.utcnow()
|
|
db.add(AuditLog(
|
|
actor=user.username if hasattr(user, "username") else str(user.id),
|
|
action="PERF_RECORDING_UPLOAD",
|
|
detail=f"Playwright 녹화 업로드: {scenario.name} | 액션 {len(body.action_log)}개",
|
|
entity_type="PERF_SCENARIO", entity_id=str(scenario_id), severity="INFO",
|
|
))
|
|
await db.commit()
|
|
await db.refresh(recording)
|
|
return {
|
|
"recording_id": recording.id,
|
|
"scenario_id": scenario_id,
|
|
"action_count": len(body.action_log),
|
|
"status": scenario.status,
|
|
"message": "녹화 결과가 저장되었습니다. /generate-script 로 jMeter 스크립트 변환을 진행하세요.",
|
|
}
|
|
|
|
|
|
@router.post("/scenarios/{scenario_id}/generate-script")
|
|
async def generate_script(
|
|
scenario_id: int,
|
|
db: AsyncSession = Depends(get_db),
|
|
user: User = Depends(get_current_user),
|
|
):
|
|
"""녹화 액션 로그 + 입력값 데이터 → jMeter .jmx 스크립트로 변환."""
|
|
scenario = await _get_scenario_for_user(db, scenario_id, user)
|
|
|
|
recording = (await db.execute(
|
|
select(PerfTestRecording)
|
|
.where(PerfTestRecording.scenario_id == scenario_id)
|
|
.order_by(PerfTestRecording.recorded_at.desc())
|
|
)).scalars().first()
|
|
if not recording:
|
|
raise HTTPException(422, "녹화 결과가 없습니다. 먼저 /recording 으로 액션 로그를 업로드하세요.")
|
|
|
|
input_data = (await db.execute(
|
|
select(PerfTestInputData).where(PerfTestInputData.scenario_id == scenario_id)
|
|
)).scalars().first()
|
|
csv_columns = input_data.column_names if input_data else _extract_variables(scenario)
|
|
|
|
jmx_xml = _build_jmx_xml(scenario, recording.action_log, csv_columns)
|
|
recording.generated_jmx = jmx_xml
|
|
scenario.status = "SCRIPT_READY"
|
|
scenario.updated_at = datetime.utcnow()
|
|
db.add(AuditLog(
|
|
actor=user.username if hasattr(user, "username") else str(user.id),
|
|
action="PERF_SCRIPT_GENERATE",
|
|
detail=f"jMeter 스크립트 변환: {scenario.name} | 샘플러 {len(recording.action_log)}개",
|
|
entity_type="PERF_SCENARIO", entity_id=str(scenario_id), severity="INFO",
|
|
))
|
|
await db.commit()
|
|
return {
|
|
"scenario_id": scenario_id,
|
|
"recording_id": recording.id,
|
|
"status": scenario.status,
|
|
"sampler_count": sum(1 for a in recording.action_log if a.get("type") in ("goto", "navigate", "fill", "click", "submit")),
|
|
"message": "jMeter 스크립트(.jmx) 변환이 완료되었습니다. /jmx 에서 다운로드하세요.",
|
|
}
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
# 4단계 — jMeter 셋팅(.jmx) 다운로드 + 실행 연계
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
|
|
@router.get("/scenarios/{scenario_id}/jmx")
|
|
async def download_jmx(
|
|
scenario_id: int,
|
|
db: AsyncSession = Depends(get_db),
|
|
user: User = Depends(get_current_user),
|
|
):
|
|
"""생성된 jMeter .jmx 파일 다운로드."""
|
|
scenario = await _get_scenario_for_user(db, scenario_id, user)
|
|
recording = (await db.execute(
|
|
select(PerfTestRecording)
|
|
.where(PerfTestRecording.scenario_id == scenario_id, PerfTestRecording.generated_jmx.isnot(None))
|
|
.order_by(PerfTestRecording.recorded_at.desc())
|
|
)).scalars().first()
|
|
if not recording or not recording.generated_jmx:
|
|
raise HTTPException(422, "생성된 jMeter 스크립트가 없습니다. 먼저 /generate-script 를 실행하세요.")
|
|
|
|
scenario.status = "JMX_READY"
|
|
await db.commit()
|
|
filename = f"perf_scenario_{scenario_id}.jmx"
|
|
return Response(
|
|
content=recording.generated_jmx,
|
|
media_type="application/xml",
|
|
headers={"Content-Disposition": f"attachment; filename={filename}"},
|
|
)
|
|
|
|
|
|
@router.post("/scenarios/{scenario_id}/run")
|
|
async def run_scenario(
|
|
scenario_id: int,
|
|
db: AsyncSession = Depends(get_db),
|
|
user: User = Depends(get_current_user),
|
|
):
|
|
"""생성된 시나리오로 기존 jmeter.py(/api/perf/run) 내장 실행 흐름과 연계."""
|
|
scenario = await _get_scenario_for_user(db, scenario_id, user)
|
|
|
|
from routers.jmeter import PerfTestRequest, run_performance_test
|
|
|
|
endpoints = []
|
|
for step in (scenario.steps or []):
|
|
if step.get("action") in ("goto", "navigate") and step.get("value"):
|
|
path = step["value"]
|
|
if path.startswith("http"):
|
|
from urllib.parse import urlparse
|
|
path = urlparse(path).path or "/"
|
|
endpoints.append(path)
|
|
if not endpoints:
|
|
endpoints = ["/"]
|
|
|
|
req = PerfTestRequest(
|
|
target_url=scenario.target_url,
|
|
endpoints=list(dict.fromkeys(endpoints))[:10],
|
|
users=scenario.users,
|
|
duration=scenario.duration_sec,
|
|
ramp_up=scenario.ramp_up_sec,
|
|
)
|
|
result = await run_performance_test(req, cu=user)
|
|
db.add(AuditLog(
|
|
actor=user.username if hasattr(user, "username") else str(user.id),
|
|
action="PERF_SCENARIO_RUN",
|
|
detail=f"시나리오 실행: {scenario.name} → /api/perf 연계 실행",
|
|
entity_type="PERF_SCENARIO", entity_id=str(scenario_id), severity="INFO",
|
|
))
|
|
await db.commit()
|
|
return result
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
# Playwright 액션 로그 → jMeter .jmx 변환 로직
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
|
|
def _build_jmx_xml(scenario: PerfTestScenario, actions: List[dict], csv_columns: List[str]) -> str:
|
|
"""표준 Apache JMeter 2.13+ XML 스키마(jmeterTestPlan)를 직렬화한다.
|
|
|
|
구조: jmeterTestPlan > hashTree > TestPlan > hashTree
|
|
> (CSVDataSet) > ThreadGroup > hashTree > [HTTPSamplerProxy ...] > ResultCollector
|
|
"""
|
|
root = ET.Element("jmeterTestPlan", version="1.2", properties="5.0", jmeter="5.6.3")
|
|
root_tree = ET.SubElement(root, "hashTree")
|
|
|
|
test_plan = ET.SubElement(root_tree, "TestPlan", testname=scenario.name, enabled="true")
|
|
ET.SubElement(test_plan, "stringProp", name="TestPlan.comments").text = scenario.description or ""
|
|
ET.SubElement(test_plan, "boolProp", name="TestPlan.functional_mode").text = "false"
|
|
ET.SubElement(test_plan, "boolProp", name="TestPlan.serialize_threadgroups").text = "false"
|
|
plan_tree = ET.SubElement(root_tree, "hashTree")
|
|
|
|
if csv_columns:
|
|
csv_cfg = ET.SubElement(
|
|
plan_tree, "CSVDataSet",
|
|
guiclass="TestBeanGUI", testclass="CSVDataSet",
|
|
testname="입력값 CSV Data Set Config", enabled="true",
|
|
)
|
|
ET.SubElement(csv_cfg, "stringProp", name="filename").text = f"perf_scenario_{scenario.id}_input.csv"
|
|
ET.SubElement(csv_cfg, "stringProp", name="variableNames").text = ",".join(csv_columns)
|
|
ET.SubElement(csv_cfg, "stringProp", name="delimiter").text = ","
|
|
ET.SubElement(csv_cfg, "boolProp", name="recycle").text = "true"
|
|
ET.SubElement(csv_cfg, "boolProp", name="stopThread").text = "false"
|
|
ET.SubElement(plan_tree, "hashTree")
|
|
|
|
thread_group = ET.SubElement(
|
|
plan_tree, "ThreadGroup",
|
|
guiclass="ThreadGroupGui", testclass="ThreadGroup",
|
|
testname=f"{scenario.name} — Thread Group", enabled="true",
|
|
)
|
|
loop_ctrl = ET.SubElement(thread_group, "elementProp", name="ThreadGroup.main_controller",
|
|
elementType="LoopController", guiclass="LoopControlPanel",
|
|
testclass="LoopController", testname="루프 제어")
|
|
ET.SubElement(loop_ctrl, "boolProp", name="LoopController.continue_forever").text = "false"
|
|
ET.SubElement(loop_ctrl, "stringProp", name="LoopController.loops").text = "1"
|
|
ET.SubElement(thread_group, "stringProp", name="ThreadGroup.num_threads").text = str(scenario.users)
|
|
ET.SubElement(thread_group, "stringProp", name="ThreadGroup.ramp_time").text = str(scenario.ramp_up_sec)
|
|
ET.SubElement(thread_group, "stringProp", name="ThreadGroup.duration").text = str(scenario.duration_sec)
|
|
ET.SubElement(thread_group, "boolProp", name="ThreadGroup.scheduler").text = "true"
|
|
tg_tree = ET.SubElement(plan_tree, "hashTree")
|
|
|
|
base_url = scenario.target_url or ""
|
|
from urllib.parse import urlparse
|
|
parsed = urlparse(base_url)
|
|
domain = parsed.hostname or "localhost"
|
|
port = str(parsed.port or (443 if parsed.scheme == "https" else 80))
|
|
protocol = parsed.scheme or "http"
|
|
|
|
sampler_idx = 0
|
|
for action in actions:
|
|
atype = (action.get("type") or action.get("action") or "").lower()
|
|
if atype not in ("goto", "navigate", "fill", "click", "submit"):
|
|
continue
|
|
sampler_idx += 1
|
|
method = "GET" if atype in ("goto", "navigate") else "POST"
|
|
path = action.get("url") or action.get("value") or "/"
|
|
if path.startswith("http"):
|
|
path = urlparse(path).path or "/"
|
|
sampler_name = f"#{sampler_idx} {atype} {path}"
|
|
|
|
sampler = ET.SubElement(
|
|
tg_tree, "HTTPSamplerProxy",
|
|
guiclass="HttpTestSampleGui", testclass="HTTPSamplerProxy",
|
|
testname=sampler_name, enabled="true",
|
|
)
|
|
ET.SubElement(sampler, "stringProp", name="HTTPSampler.domain").text = domain
|
|
ET.SubElement(sampler, "stringProp", name="HTTPSampler.port").text = port
|
|
ET.SubElement(sampler, "stringProp", name="HTTPSampler.protocol").text = protocol
|
|
ET.SubElement(sampler, "stringProp", name="HTTPSampler.path").text = path
|
|
ET.SubElement(sampler, "stringProp", name="HTTPSampler.method").text = method
|
|
|
|
if method == "POST" and action.get("value"):
|
|
args = ET.SubElement(sampler, "elementProp", name="HTTPsampler.Arguments",
|
|
elementType="Arguments", guiclass="HTTPArgumentsPanel",
|
|
testclass="Arguments", testname="사용자 정의 변수")
|
|
coll = ET.SubElement(args, "collectionProp", name="Arguments.arguments")
|
|
arg = ET.SubElement(coll, "elementProp",
|
|
name=action.get("selector") or f"param{sampler_idx}",
|
|
elementType="HTTPArgument")
|
|
ET.SubElement(arg, "boolProp", name="HTTPArgument.always_encode").text = "false"
|
|
ET.SubElement(arg, "stringProp", name="Argument.value").text = str(action.get("value"))
|
|
ET.SubElement(arg, "stringProp", name="Argument.name").text = action.get("selector") or f"param{sampler_idx}"
|
|
ET.SubElement(arg, "stringProp", name="Argument.metadata").text = "="
|
|
|
|
sampler_tree = ET.SubElement(tg_tree, "hashTree")
|
|
think_time = action.get("think_time")
|
|
if think_time:
|
|
timer = ET.SubElement(
|
|
sampler_tree, "ConstantTimer",
|
|
guiclass="ConstantTimerGui", testclass="ConstantTimer",
|
|
testname="고정 대기시간", enabled="true",
|
|
)
|
|
ET.SubElement(timer, "stringProp", name="ConstantTimer.delay").text = str(int(float(think_time) * 1000))
|
|
ET.SubElement(sampler_tree, "hashTree")
|
|
|
|
collector = ET.SubElement(
|
|
tg_tree, "ResultCollector",
|
|
guiclass="ViewResultsFullVisualizer", testclass="ResultCollector",
|
|
testname="결과 트리 리스너", enabled="true",
|
|
)
|
|
ET.SubElement(collector, "boolProp", name="ResultCollector.error_logging").text = "false"
|
|
ET.SubElement(tg_tree, "hashTree")
|
|
|
|
xml_bytes = ET.tostring(root, encoding="unicode")
|
|
return f'<?xml version="1.0" encoding="UTF-8"?>\n{xml_bytes}\n'
|