diff --git a/main.py b/main.py index 72dda2b..cff8b8e 100644 --- a/main.py +++ b/main.py @@ -580,6 +580,10 @@ app.include_router(bid_watcher.router) # 나라장터 입찰워처 ( from routers import jasper_report app.include_router(jasper_report.router) # 산출물·회의록·보고서 자동생성 (JRXML 파싱→ReportLab/openpyxl) +# ── Playwright MCP 기반 jMeter 성능테스트 스크립트 자동작성 + DB 관리 ───────── +from routers import perf_scenario +app.include_router(perf_scenario.router) # 시나리오 작성→입력값엑셀→녹화변환→jMeter(.jmx) 파이프라인 + # ── 개방망 보안 헤더 미들웨어 ──────────────────────────────────────────────── @app.middleware("http") async def add_security_headers(request, call_next): diff --git a/models.py b/models.py index 721a622..3f1642f 100644 --- a/models.py +++ b/models.py @@ -7345,3 +7345,46 @@ class BidWatchAssignee(Base): active = Column(Boolean, default=True) created_by = Column(Integer, nullable=True) created_at = Column(DateTime, default=func.now()) + + +# ── perf-test-studio-dev: Playwright MCP 기반 jMeter 성능테스트 스크립트 자동작성 ── + +class PerfTestScenario(Base): + """성능테스트 시나리오 — 재사용 가능한 등록 단위 (시나리오→입력값→녹화→jmx 파이프라인).""" + __tablename__ = "tb_perf_scenario" + + id = Column(Integer, primary_key=True, index=True) + tenant_id = Column(Integer, index=True, default=1) + name = Column(String(200)) + description = Column(Text, nullable=True) + target_url = Column(String(500)) + steps = Column(JSON) # [{action, selector, value, think_time}, ...] + users = Column(Integer, default=10) + duration_sec = Column(Integer, default=30) + ramp_up_sec = Column(Integer, default=5) + status = Column(String(20), default="DRAFT") # DRAFT|RECORDED|SCRIPT_READY|JMX_READY + created_by = Column(Integer, nullable=True) + created_at = Column(DateTime, default=func.now()) + updated_at = Column(DateTime, default=func.now(), onupdate=func.now()) + + +class PerfTestInputData(Base): + """파라미터화 입력값 (엑셀 업로드 결과) — CSV Data Set Config 원천.""" + __tablename__ = "tb_perf_input_data" + + id = Column(Integer, primary_key=True, index=True) + scenario_id = Column(Integer, index=True) + column_names = Column(JSON) # ["username","password","search_keyword",...] + rows = Column(JSON) # [["user1","pw1","검색어1"], ...] + uploaded_at = Column(DateTime, default=func.now()) + + +class PerfTestRecording(Base): + """Playwright MCP 녹화 결과 + 변환된 jMeter 스크립트(.jmx).""" + __tablename__ = "tb_perf_recording" + + id = Column(Integer, primary_key=True, index=True) + scenario_id = Column(Integer, index=True) + action_log = Column(JSON) # Playwright 액션 시퀀스 [{type,url,selector,value,timestamp}] + generated_jmx = Column(Text, nullable=True) + recorded_at = Column(DateTime, default=func.now()) diff --git a/routers/perf_scenario.py b/routers/perf_scenario.py new file mode 100644 index 0000000..8a4648a --- /dev/null +++ b/routers/perf_scenario.py @@ -0,0 +1,684 @@ +""" +Playwright MCP 기반 jMeter 성능테스트 스크립트 자동작성 + DB 관리 + +파이프라인 4단계: + 1. 시나리오 작성 — 자연어 설명(또는 직접 입력) → 재사용 가능한 시나리오로 등록 (DB) + 2. 입력값 엑셀 — 시나리오의 ${변수} 파라미터를 엑셀 템플릿으로 내려받고, 작성된 엑셀을 업로드 + 3. 화면 녹화→변환 — Playwright MCP 녹화 결과(액션 로그 JSON)를 jMeter 샘플러 시퀀스로 변환 + 4. jMeter 셋팅 — Thread Group + HTTP Sampler + CSV Data Set Config로 구성된 .jmx 생성·다운로드 + +기존 routers/jmeter.py(`/api/perf`)는 JTL 분석·내장 실행·보고서만 제공하며 시나리오 작성/녹화/DB 관리 +기능이 없다 — 본 라우터(`/api/perf-scenario`)가 그 앞단 파이프라인을 전담한다. + +주의: 실제 브라우저 제어(Playwright MCP 녹화)는 오케스트레이터/사용자가 도구로 직접 수행하고, +본 라우터는 그 결과(JSON 액션 로그)를 입력받아 jMeter 스크립트로 변환하는 어댑터 역할에 집중한다 +(Ollama는 직접 브라우저를 제어할 수 없으므로). +""" +from __future__ import annotations + +import io +import logging +import re +import xml.etree.ElementTree as ET +from datetime import datetime +from typing import Any, List, Optional + +import httpx +from fastapi import APIRouter, Depends, File, HTTPException, UploadFile +from fastapi.responses import Response, StreamingResponse +from pydantic import BaseModel, ConfigDict +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from core.auth import get_current_user +from database import get_db +from models import ( + User, AuditLog, + PerfTestScenario, PerfTestInputData, PerfTestRecording, +) + +logger = logging.getLogger(__name__) +router = APIRouter(prefix="/api/perf-scenario", tags=["성능테스트 스튜디오"]) + +_OLLAMA_URL = "http://localhost:11434/api/generate" +_OLLAMA_MODEL = "llama3" + +_VAR_PATTERN = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}") + + +def _tenant(user: User) -> int: + return getattr(user, "tenant_id", None) or getattr(user, "id", 1) or 1 + + +# ════════════════════════════════════════════════════════════════════════════ +# Pydantic 스키마 +# ════════════════════════════════════════════════════════════════════════════ + +class ScenarioStep(BaseModel): + action: str # goto|fill|click|select|wait + selector: Optional[str] = None + value: Optional[str] = None + think_time: float = 0.5 + + +class ScenarioOut(BaseModel): + model_config = ConfigDict(from_attributes=True) + id: int + name: str + description: Optional[str] + target_url: str + steps: Optional[List[dict]] + users: int + duration_sec: int + ramp_up_sec: int + status: str + created_at: Optional[datetime] + updated_at: Optional[datetime] + + +class ScenarioCreateIn(BaseModel): + name: str + description: Optional[str] = None + target_url: str + steps: List[ScenarioStep] = [] + users: int = 10 + duration_sec: int = 30 + ramp_up_sec: int = 5 + + +class ScenarioUpdateIn(BaseModel): + name: Optional[str] = None + description: Optional[str] = None + target_url: Optional[str] = None + steps: Optional[List[ScenarioStep]] = None + users: Optional[int] = None + duration_sec: Optional[int] = None + ramp_up_sec: Optional[int] = None + + +class ScenarioGenerateIn(BaseModel): + """자연어 설명 → Ollama가 시나리오 단계를 구조화.""" + description: str + target_url: str + + +class RecordingUploadIn(BaseModel): + action_log: List[dict] + + +# ════════════════════════════════════════════════════════════════════════════ +# 헬퍼 +# ════════════════════════════════════════════════════════════════════════════ + +async def _get_scenario_for_user(db: AsyncSession, scenario_id: int, user: User) -> PerfTestScenario: + sc = await db.get(PerfTestScenario, scenario_id) + if not sc or sc.tenant_id != _tenant(user): + raise HTTPException(404, "성능테스트 시나리오를 찾을 수 없습니다.") + return sc + + +def _extract_variables(scenario: PerfTestScenario) -> List[str]: + """steps의 value/selector에서 ${변수명} 패턴을 추출 (중복 제거, 등장 순서 보존).""" + seen: List[str] = [] + for step in (scenario.steps or []): + for field in ("value", "selector"): + text = step.get(field) if isinstance(step, dict) else None + if not text: + continue + for m in _VAR_PATTERN.finditer(text): + name = m.group(1) + if name not in seen: + seen.append(name) + return seen + + +async def _call_ollama(prompt: str, timeout: float = 60.0) -> Optional[str]: + """Ollama sLLM 호출. 실패 시 None 반환 (외부 API 절대 호출 금지).""" + try: + async with httpx.AsyncClient(timeout=timeout) as client: + resp = await client.post( + _OLLAMA_URL, + json={"model": _OLLAMA_MODEL, "prompt": prompt, "stream": False, "format": "json"}, + ) + if resp.status_code == 200: + return resp.json().get("response", "").strip() + except Exception as exc: + logger.warning("Ollama 호출 실패: %s", exc) + return None + + +def _fallback_steps(target_url: str, description: str) -> List[dict]: + """Ollama 미응답 시 사용할 최소 단계 골격 — goto + 설명 기반 액션 1건.""" + return [ + {"action": "goto", "selector": None, "value": target_url, "think_time": 1.0}, + {"action": "click", "selector": "body", "value": description[:50], "think_time": 0.5}, + ] + + +# ════════════════════════════════════════════════════════════════════════════ +# 1단계 — 시나리오 작성/관리 +# ════════════════════════════════════════════════════════════════════════════ + +@router.post("/scenarios/generate", response_model=ScenarioOut) +async def generate_scenario( + body: ScenarioGenerateIn, + db: AsyncSession = Depends(get_db), + user: User = Depends(get_current_user), +): + """자연어 설명 → Ollama로 단계별 액션 시퀀스를 구조화하여 DRAFT 시나리오 생성.""" + prompt = ( + "다음 설명을 웹 성능테스트 시나리오의 단계별 액션 시퀀스(JSON 배열)로 변환해줘.\n" + "각 항목은 {\"action\": \"goto|fill|click|select|wait\", \"selector\": \"CSS선택자 또는 null\", " + "\"value\": \"입력값 또는 URL 또는 null\", \"think_time\": 초단위 숫자} 형식이며, " + "JSON 배열만 출력하고 다른 설명은 포함하지 마.\n" + f"대상 URL: {body.target_url}\n" + f"시나리오 설명: {body.description}" + ) + raw = await _call_ollama(prompt) + steps: List[dict] = [] + if raw: + try: + import json as _json + parsed = _json.loads(raw) + if isinstance(parsed, list): + steps = [ + { + "action": s.get("action", "click"), + "selector": s.get("selector"), + "value": s.get("value"), + "think_time": float(s.get("think_time", 0.5)), + } + for s in parsed if isinstance(s, dict) + ] + except Exception: + steps = [] + if not steps: + steps = _fallback_steps(body.target_url, body.description) + + scenario = PerfTestScenario( + tenant_id=_tenant(user), + name=body.description[:80] or "신규 시나리오", + description=body.description, + target_url=body.target_url, + steps=steps, + status="DRAFT", + created_by=user.id, + ) + db.add(scenario) + await db.flush() + db.add(AuditLog( + actor=user.username if hasattr(user, "username") else str(user.id), + action="PERF_SCENARIO_GENERATE", + detail=f"AI 시나리오 자동생성: {scenario.name} (단계 {len(steps)}개)", + entity_type="PERF_SCENARIO", entity_id=str(scenario.id), severity="INFO", + )) + await db.commit() + await db.refresh(scenario) + return scenario + + +@router.post("/scenarios", response_model=ScenarioOut) +async def create_scenario( + body: ScenarioCreateIn, + db: AsyncSession = Depends(get_db), + user: User = Depends(get_current_user), +): + """성능테스트 시나리오 직접 등록 (재사용 가능).""" + scenario = PerfTestScenario( + tenant_id=_tenant(user), + name=body.name, + description=body.description, + target_url=body.target_url, + steps=[s.model_dump() for s in body.steps], + users=body.users, + duration_sec=body.duration_sec, + ramp_up_sec=body.ramp_up_sec, + status="DRAFT", + created_by=user.id, + ) + db.add(scenario) + await db.flush() + db.add(AuditLog( + actor=user.username if hasattr(user, "username") else str(user.id), + action="PERF_SCENARIO_CREATE", + detail=f"시나리오 등록: {scenario.name} ({scenario.target_url})", + entity_type="PERF_SCENARIO", entity_id=str(scenario.id), severity="INFO", + )) + await db.commit() + await db.refresh(scenario) + return scenario + + +@router.get("/scenarios", response_model=List[ScenarioOut]) +async def list_scenarios( + db: AsyncSession = Depends(get_db), + user: User = Depends(get_current_user), +): + """등록된 시나리오 목록 (재사용 가능한 그리드).""" + rows = (await db.execute( + select(PerfTestScenario) + .where(PerfTestScenario.tenant_id == _tenant(user)) + .order_by(PerfTestScenario.created_at.desc()) + )).scalars().all() + return rows + + +@router.get("/scenarios/{scenario_id}", response_model=ScenarioOut) +async def get_scenario( + scenario_id: int, + db: AsyncSession = Depends(get_db), + user: User = Depends(get_current_user), +): + return await _get_scenario_for_user(db, scenario_id, user) + + +@router.put("/scenarios/{scenario_id}", response_model=ScenarioOut) +async def update_scenario( + scenario_id: int, + body: ScenarioUpdateIn, + db: AsyncSession = Depends(get_db), + user: User = Depends(get_current_user), +): + scenario = await _get_scenario_for_user(db, scenario_id, user) + data = body.model_dump(exclude_unset=True) + if "steps" in data and data["steps"] is not None: + data["steps"] = [s if isinstance(s, dict) else s.model_dump() for s in body.steps] + for k, v in data.items(): + setattr(scenario, k, v) + scenario.updated_at = datetime.utcnow() + db.add(AuditLog( + actor=user.username if hasattr(user, "username") else str(user.id), + action="PERF_SCENARIO_UPDATE", + detail=f"시나리오 수정: {scenario.name}", + entity_type="PERF_SCENARIO", entity_id=str(scenario.id), severity="INFO", + )) + await db.commit() + await db.refresh(scenario) + return scenario + + +@router.delete("/scenarios/{scenario_id}") +async def delete_scenario( + scenario_id: int, + db: AsyncSession = Depends(get_db), + user: User = Depends(get_current_user), +): + scenario = await _get_scenario_for_user(db, scenario_id, user) + name = scenario.name + await db.execute( + PerfTestInputData.__table__.delete().where(PerfTestInputData.scenario_id == scenario_id) + ) + await db.execute( + PerfTestRecording.__table__.delete().where(PerfTestRecording.scenario_id == scenario_id) + ) + await db.delete(scenario) + db.add(AuditLog( + actor=user.username if hasattr(user, "username") else str(user.id), + action="PERF_SCENARIO_DELETE", + detail=f"시나리오 삭제: {name}", + entity_type="PERF_SCENARIO", entity_id=str(scenario_id), severity="WARNING", + )) + await db.commit() + return {"message": f"시나리오 '{name}'이(가) 삭제되었습니다."} + + +# ════════════════════════════════════════════════════════════════════════════ +# 2단계 — 테스트 입력값 엑셀 (openpyxl) +# ════════════════════════════════════════════════════════════════════════════ + +@router.get("/scenarios/{scenario_id}/input-template") +async def download_input_template( + scenario_id: int, + db: AsyncSession = Depends(get_db), + user: User = Depends(get_current_user), +): + """시나리오 steps의 ${변수} 목록을 헤더로, 샘플 행 1개를 포함한 .xlsx 템플릿 다운로드.""" + scenario = await _get_scenario_for_user(db, scenario_id, user) + variables = _extract_variables(scenario) + if not variables: + raise HTTPException(422, "이 시나리오에는 파라미터화 변수(${변수명})가 없습니다.") + + try: + from openpyxl import Workbook + from openpyxl.styles import Font, PatternFill + except ImportError: + raise HTTPException(500, "openpyxl 라이브러리가 설치되지 않았습니다. pip install openpyxl") + + wb = Workbook() + ws = wb.active + ws.title = "입력값" + header_fill = PatternFill(start_color="003366", end_color="003366", fill_type="solid") + header_font = Font(color="FFFFFF", bold=True) + for col, name in enumerate(variables, start=1): + cell = ws.cell(row=1, column=col, value=name) + cell.font = header_font + cell.fill = header_fill + for col, name in enumerate(variables, start=1): + ws.cell(row=2, column=col, value=f"샘플_{name}_1") + + buf = io.BytesIO() + wb.save(buf) + buf.seek(0) + filename = f"perf_input_template_{scenario_id}.xlsx" + return StreamingResponse( + buf, + media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + headers={"Content-Disposition": f"attachment; filename={filename}"}, + ) + + +@router.post("/scenarios/{scenario_id}/input-data") +async def upload_input_data( + scenario_id: int, + file: UploadFile = File(...), + db: AsyncSession = Depends(get_db), + user: User = Depends(get_current_user), +): + """작성된 입력값 엑셀 업로드 → 파싱하여 CSV Data Set Config 원천 데이터로 저장.""" + scenario = await _get_scenario_for_user(db, scenario_id, user) + if not file.filename.lower().endswith((".xlsx", ".xlsm")): + raise HTTPException(422, "xlsx 형식의 엑셀 파일만 업로드 가능합니다.") + + try: + from openpyxl import load_workbook + except ImportError: + raise HTTPException(500, "openpyxl 라이브러리가 설치되지 않았습니다. pip install openpyxl") + + content = await file.read() + try: + wb = load_workbook(io.BytesIO(content), read_only=True, data_only=True) + ws = wb.active + rows_iter = ws.iter_rows(values_only=True) + header = [str(c) if c is not None else "" for c in next(rows_iter)] + data_rows = [ + [("" if c is None else c) for c in r] + for r in rows_iter if any(c is not None for c in r) + ] + except Exception as exc: + raise HTTPException(422, f"엑셀 파일을 읽을 수 없습니다: {exc}") + + if not header or not data_rows: + raise HTTPException(422, "엑셀에 헤더 행과 최소 1개의 데이터 행이 필요합니다.") + + # 기존 입력값 교체 (시나리오당 최신 1세트 유지) + await db.execute( + PerfTestInputData.__table__.delete().where(PerfTestInputData.scenario_id == scenario_id) + ) + input_data = PerfTestInputData(scenario_id=scenario_id, column_names=header, rows=data_rows) + db.add(input_data) + db.add(AuditLog( + actor=user.username if hasattr(user, "username") else str(user.id), + action="PERF_INPUT_DATA_UPLOAD", + detail=f"입력값 업로드: {scenario.name} | 컬럼 {len(header)}개 · 행 {len(data_rows)}개", + entity_type="PERF_SCENARIO", entity_id=str(scenario_id), severity="INFO", + )) + await db.commit() + return { + "scenario_id": scenario_id, + "column_names": header, + "row_count": len(data_rows), + "message": f"입력값 {len(data_rows)}건이 등록되었습니다.", + } + + +# ════════════════════════════════════════════════════════════════════════════ +# 3단계 — Playwright MCP 녹화 → jMeter 스크립트 변환 +# ════════════════════════════════════════════════════════════════════════════ + +@router.post("/scenarios/{scenario_id}/recording") +async def upload_recording( + scenario_id: int, + body: RecordingUploadIn, + db: AsyncSession = Depends(get_db), + user: User = Depends(get_current_user), +): + """Playwright MCP로 녹화한 액션 로그(JSON)를 업로드 — 실제 브라우저 제어는 별도 도구가 수행.""" + scenario = await _get_scenario_for_user(db, scenario_id, user) + if not body.action_log: + raise HTTPException(422, "action_log가 비어 있습니다.") + + recording = PerfTestRecording(scenario_id=scenario_id, action_log=body.action_log) + db.add(recording) + scenario.status = "RECORDED" + scenario.updated_at = datetime.utcnow() + db.add(AuditLog( + actor=user.username if hasattr(user, "username") else str(user.id), + action="PERF_RECORDING_UPLOAD", + detail=f"Playwright 녹화 업로드: {scenario.name} | 액션 {len(body.action_log)}개", + entity_type="PERF_SCENARIO", entity_id=str(scenario_id), severity="INFO", + )) + await db.commit() + await db.refresh(recording) + return { + "recording_id": recording.id, + "scenario_id": scenario_id, + "action_count": len(body.action_log), + "status": scenario.status, + "message": "녹화 결과가 저장되었습니다. /generate-script 로 jMeter 스크립트 변환을 진행하세요.", + } + + +@router.post("/scenarios/{scenario_id}/generate-script") +async def generate_script( + scenario_id: int, + db: AsyncSession = Depends(get_db), + user: User = Depends(get_current_user), +): + """녹화 액션 로그 + 입력값 데이터 → jMeter .jmx 스크립트로 변환.""" + scenario = await _get_scenario_for_user(db, scenario_id, user) + + recording = (await db.execute( + select(PerfTestRecording) + .where(PerfTestRecording.scenario_id == scenario_id) + .order_by(PerfTestRecording.recorded_at.desc()) + )).scalars().first() + if not recording: + raise HTTPException(422, "녹화 결과가 없습니다. 먼저 /recording 으로 액션 로그를 업로드하세요.") + + input_data = (await db.execute( + select(PerfTestInputData).where(PerfTestInputData.scenario_id == scenario_id) + )).scalars().first() + csv_columns = input_data.column_names if input_data else _extract_variables(scenario) + + jmx_xml = _build_jmx_xml(scenario, recording.action_log, csv_columns) + recording.generated_jmx = jmx_xml + scenario.status = "SCRIPT_READY" + scenario.updated_at = datetime.utcnow() + db.add(AuditLog( + actor=user.username if hasattr(user, "username") else str(user.id), + action="PERF_SCRIPT_GENERATE", + detail=f"jMeter 스크립트 변환: {scenario.name} | 샘플러 {len(recording.action_log)}개", + entity_type="PERF_SCENARIO", entity_id=str(scenario_id), severity="INFO", + )) + await db.commit() + return { + "scenario_id": scenario_id, + "recording_id": recording.id, + "status": scenario.status, + "sampler_count": sum(1 for a in recording.action_log if a.get("type") in ("goto", "navigate", "fill", "click", "submit")), + "message": "jMeter 스크립트(.jmx) 변환이 완료되었습니다. /jmx 에서 다운로드하세요.", + } + + +# ════════════════════════════════════════════════════════════════════════════ +# 4단계 — jMeter 셋팅(.jmx) 다운로드 + 실행 연계 +# ════════════════════════════════════════════════════════════════════════════ + +@router.get("/scenarios/{scenario_id}/jmx") +async def download_jmx( + scenario_id: int, + db: AsyncSession = Depends(get_db), + user: User = Depends(get_current_user), +): + """생성된 jMeter .jmx 파일 다운로드.""" + scenario = await _get_scenario_for_user(db, scenario_id, user) + recording = (await db.execute( + select(PerfTestRecording) + .where(PerfTestRecording.scenario_id == scenario_id, PerfTestRecording.generated_jmx.isnot(None)) + .order_by(PerfTestRecording.recorded_at.desc()) + )).scalars().first() + if not recording or not recording.generated_jmx: + raise HTTPException(422, "생성된 jMeter 스크립트가 없습니다. 먼저 /generate-script 를 실행하세요.") + + scenario.status = "JMX_READY" + await db.commit() + filename = f"perf_scenario_{scenario_id}.jmx" + return Response( + content=recording.generated_jmx, + media_type="application/xml", + headers={"Content-Disposition": f"attachment; filename={filename}"}, + ) + + +@router.post("/scenarios/{scenario_id}/run") +async def run_scenario( + scenario_id: int, + db: AsyncSession = Depends(get_db), + user: User = Depends(get_current_user), +): + """생성된 시나리오로 기존 jmeter.py(/api/perf/run) 내장 실행 흐름과 연계.""" + scenario = await _get_scenario_for_user(db, scenario_id, user) + + from routers.jmeter import PerfTestRequest, run_performance_test + + endpoints = [] + for step in (scenario.steps or []): + if step.get("action") in ("goto", "navigate") and step.get("value"): + path = step["value"] + if path.startswith("http"): + from urllib.parse import urlparse + path = urlparse(path).path or "/" + endpoints.append(path) + if not endpoints: + endpoints = ["/"] + + req = PerfTestRequest( + target_url=scenario.target_url, + endpoints=list(dict.fromkeys(endpoints))[:10], + users=scenario.users, + duration=scenario.duration_sec, + ramp_up=scenario.ramp_up_sec, + ) + result = await run_performance_test(req, cu=user) + db.add(AuditLog( + actor=user.username if hasattr(user, "username") else str(user.id), + action="PERF_SCENARIO_RUN", + detail=f"시나리오 실행: {scenario.name} → /api/perf 연계 실행", + entity_type="PERF_SCENARIO", entity_id=str(scenario_id), severity="INFO", + )) + await db.commit() + return result + + +# ════════════════════════════════════════════════════════════════════════════ +# Playwright 액션 로그 → jMeter .jmx 변환 로직 +# ════════════════════════════════════════════════════════════════════════════ + +def _build_jmx_xml(scenario: PerfTestScenario, actions: List[dict], csv_columns: List[str]) -> str: + """표준 Apache JMeter 2.13+ XML 스키마(jmeterTestPlan)를 직렬화한다. + + 구조: jmeterTestPlan > hashTree > TestPlan > hashTree + > (CSVDataSet) > ThreadGroup > hashTree > [HTTPSamplerProxy ...] > ResultCollector + """ + root = ET.Element("jmeterTestPlan", version="1.2", properties="5.0", jmeter="5.6.3") + root_tree = ET.SubElement(root, "hashTree") + + test_plan = ET.SubElement(root_tree, "TestPlan", testname=scenario.name, enabled="true") + ET.SubElement(test_plan, "stringProp", name="TestPlan.comments").text = scenario.description or "" + ET.SubElement(test_plan, "boolProp", name="TestPlan.functional_mode").text = "false" + ET.SubElement(test_plan, "boolProp", name="TestPlan.serialize_threadgroups").text = "false" + plan_tree = ET.SubElement(root_tree, "hashTree") + + if csv_columns: + csv_cfg = ET.SubElement( + plan_tree, "CSVDataSet", + guiclass="TestBeanGUI", testclass="CSVDataSet", + testname="입력값 CSV Data Set Config", enabled="true", + ) + ET.SubElement(csv_cfg, "stringProp", name="filename").text = f"perf_scenario_{scenario.id}_input.csv" + ET.SubElement(csv_cfg, "stringProp", name="variableNames").text = ",".join(csv_columns) + ET.SubElement(csv_cfg, "stringProp", name="delimiter").text = "," + ET.SubElement(csv_cfg, "boolProp", name="recycle").text = "true" + ET.SubElement(csv_cfg, "boolProp", name="stopThread").text = "false" + ET.SubElement(plan_tree, "hashTree") + + thread_group = ET.SubElement( + plan_tree, "ThreadGroup", + guiclass="ThreadGroupGui", testclass="ThreadGroup", + testname=f"{scenario.name} — Thread Group", enabled="true", + ) + loop_ctrl = ET.SubElement(thread_group, "elementProp", name="ThreadGroup.main_controller", + elementType="LoopController", guiclass="LoopControlPanel", + testclass="LoopController", testname="루프 제어") + ET.SubElement(loop_ctrl, "boolProp", name="LoopController.continue_forever").text = "false" + ET.SubElement(loop_ctrl, "stringProp", name="LoopController.loops").text = "1" + ET.SubElement(thread_group, "stringProp", name="ThreadGroup.num_threads").text = str(scenario.users) + ET.SubElement(thread_group, "stringProp", name="ThreadGroup.ramp_time").text = str(scenario.ramp_up_sec) + ET.SubElement(thread_group, "stringProp", name="ThreadGroup.duration").text = str(scenario.duration_sec) + ET.SubElement(thread_group, "boolProp", name="ThreadGroup.scheduler").text = "true" + tg_tree = ET.SubElement(plan_tree, "hashTree") + + base_url = scenario.target_url or "" + from urllib.parse import urlparse + parsed = urlparse(base_url) + domain = parsed.hostname or "localhost" + port = str(parsed.port or (443 if parsed.scheme == "https" else 80)) + protocol = parsed.scheme or "http" + + sampler_idx = 0 + for action in actions: + atype = (action.get("type") or action.get("action") or "").lower() + if atype not in ("goto", "navigate", "fill", "click", "submit"): + continue + sampler_idx += 1 + method = "GET" if atype in ("goto", "navigate") else "POST" + path = action.get("url") or action.get("value") or "/" + if path.startswith("http"): + path = urlparse(path).path or "/" + sampler_name = f"#{sampler_idx} {atype} {path}" + + sampler = ET.SubElement( + tg_tree, "HTTPSamplerProxy", + guiclass="HttpTestSampleGui", testclass="HTTPSamplerProxy", + testname=sampler_name, enabled="true", + ) + ET.SubElement(sampler, "stringProp", name="HTTPSampler.domain").text = domain + ET.SubElement(sampler, "stringProp", name="HTTPSampler.port").text = port + ET.SubElement(sampler, "stringProp", name="HTTPSampler.protocol").text = protocol + ET.SubElement(sampler, "stringProp", name="HTTPSampler.path").text = path + ET.SubElement(sampler, "stringProp", name="HTTPSampler.method").text = method + + if method == "POST" and action.get("value"): + args = ET.SubElement(sampler, "elementProp", name="HTTPsampler.Arguments", + elementType="Arguments", guiclass="HTTPArgumentsPanel", + testclass="Arguments", testname="사용자 정의 변수") + coll = ET.SubElement(args, "collectionProp", name="Arguments.arguments") + arg = ET.SubElement(coll, "elementProp", + name=action.get("selector") or f"param{sampler_idx}", + elementType="HTTPArgument") + ET.SubElement(arg, "boolProp", name="HTTPArgument.always_encode").text = "false" + ET.SubElement(arg, "stringProp", name="Argument.value").text = str(action.get("value")) + ET.SubElement(arg, "stringProp", name="Argument.name").text = action.get("selector") or f"param{sampler_idx}" + ET.SubElement(arg, "stringProp", name="Argument.metadata").text = "=" + + sampler_tree = ET.SubElement(tg_tree, "hashTree") + think_time = action.get("think_time") + if think_time: + timer = ET.SubElement( + sampler_tree, "ConstantTimer", + guiclass="ConstantTimerGui", testclass="ConstantTimer", + testname="고정 대기시간", enabled="true", + ) + ET.SubElement(timer, "stringProp", name="ConstantTimer.delay").text = str(int(float(think_time) * 1000)) + ET.SubElement(sampler_tree, "hashTree") + + collector = ET.SubElement( + tg_tree, "ResultCollector", + guiclass="ViewResultsFullVisualizer", testclass="ResultCollector", + testname="결과 트리 리스너", enabled="true", + ) + ET.SubElement(collector, "boolProp", name="ResultCollector.error_logging").text = "false" + ET.SubElement(tg_tree, "hashTree") + + xml_bytes = ET.tostring(root, encoding="unicode") + return f'\n{xml_bytes}\n'