You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

sse_benchmark.py 29KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770
  1. #!/usr/bin/env python3
  2. """
  3. SSE (Server-Sent Events) Stress Test for Dify Workflow API
  4. This script stress tests the streaming performance of Dify's workflow execution API,
  5. measuring key metrics like connection rate, event throughput, and time to first event (TTFE).
  6. """
  7. import json
  8. import time
  9. import random
  10. import sys
  11. import threading
  12. import os
  13. import logging
  14. import statistics
  15. from pathlib import Path
  16. from collections import deque
  17. from datetime import datetime
  18. from dataclasses import dataclass, asdict
  19. from locust import HttpUser, task, between, events, constant
  20. from typing import TypedDict, Literal, TypeAlias
  21. import requests.exceptions
  22. # Add the stress-test directory to path to import common modules
  23. sys.path.insert(0, str(Path(__file__).parent))
  24. from common.config_helper import ConfigHelper # type: ignore[import-not-found]
  25. # Configure logging
  26. logging.basicConfig(
  27. level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
  28. )
  29. logger = logging.getLogger(__name__)
  30. # Configuration from environment
  31. WORKFLOW_PATH = os.getenv("WORKFLOW_PATH", "/v1/workflows/run")
  32. CONNECT_TIMEOUT = float(os.getenv("CONNECT_TIMEOUT", "10"))
  33. READ_TIMEOUT = float(os.getenv("READ_TIMEOUT", "60"))
  34. TERMINAL_EVENTS = [e.strip() for e in os.getenv("TERMINAL_EVENTS", "workflow_finished,error").split(",") if e.strip()]
  35. QUESTIONS_FILE = os.getenv("QUESTIONS_FILE", "")
  36. # Type definitions
  37. ErrorType: TypeAlias = Literal[
  38. "connection_error",
  39. "timeout",
  40. "invalid_json",
  41. "http_4xx",
  42. "http_5xx",
  43. "early_termination",
  44. "invalid_response",
  45. ]
class ErrorCounts(TypedDict):
    """Running tally of failures, one integer counter per ErrorType category."""

    connection_error: int  # transport-level request failures
    timeout: int  # connect or read timeouts
    invalid_json: int  # event payloads that failed json.loads
    http_4xx: int  # responses with status 400-499
    http_5xx: int  # responses with status >= 500
    early_termination: int  # streams that delivered events but no terminal event
    invalid_response: int  # responses with an unexpected Content-Type
class SSEEvent(TypedDict):
    """One complete Server-Sent Event as assembled by SSEParser."""

    data: str  # all data lines of the event joined with "\n"
    event: str  # event type; the parser falls back to "message" when none was sent
    id: str | None  # value of the event's id field, or None when absent
class WorkflowInputs(TypedDict):
    """Input variables submitted to the workflow under the request's "inputs" key."""

    question: str  # the prompt text chosen for this request
class WorkflowRequestData(TypedDict):
    """JSON body POSTed to the workflow run endpoint."""

    inputs: WorkflowInputs
    response_mode: Literal["streaming"]  # this benchmark only issues streaming requests
    user: str  # caller identifier; the benchmark sends "user_<n>"
class ParsedEventData(TypedDict, total=False):
    """JSON payload decoded from an SSE data field; every key is optional (total=False)."""

    event: str  # compared against TERMINAL_EVENTS to detect the end of a stream
    task_id: str
    workflow_run_id: str
    data: object  # For dynamic content
    created_at: int
class LocustStats(TypedDict):
    """Aggregated request statistics copied from Locust's environment.stats.total."""

    total_requests: int  # from total.num_requests
    total_failures: int  # from total.num_failures
    avg_response_time: float
    min_response_time: float
    max_response_time: float
class ReportData(TypedDict):
    """Top-level layout of the JSON report written by export_json_report."""

    timestamp: str  # ISO-8601 time at which the report was generated
    duration_seconds: float
    metrics: dict[str, object]  # Metrics as dict for JSON serialization
    locust_stats: LocustStats | None  # None when Locust recorded no requests
@dataclass
class StreamMetrics:
    """Measurements taken over one SSE stream that delivered at least one event."""

    stream_duration: float  # milliseconds from request start until the stream was closed
    events_count: int  # number of complete SSE events parsed
    bytes_received: int  # sum of the UTF-8 encoded lengths of the received lines
    ttfe: float  # time to first event, in milliseconds
    inter_event_times: list[float]  # gaps between consecutive events, in milliseconds
@dataclass
class MetricsSnapshot:
    """Point-in-time view of the aggregated metrics, as returned by MetricsTracker.get_stats."""

    # Connection and event counters
    active_connections: int
    total_connections: int
    total_events: int
    # Rates over the most recent 10-second window
    connection_rate: float
    event_rate: float
    # Rates averaged over the whole time since the tracker's start_time
    overall_conn_rate: float
    overall_event_rate: float
    # Time-to-first-event statistics (ms) over the retained samples
    ttfe_avg: float
    ttfe_min: float
    ttfe_max: float
    ttfe_p50: float
    ttfe_p95: float
    ttfe_samples: int  # number of samples currently retained by the tracker
    ttfe_total_samples: int  # Total TTFE samples collected (not limited by window)
    error_counts: ErrorCounts
    # Per-stream statistics over the retained StreamMetrics records
    stream_duration_avg: float
    stream_duration_p50: float
    stream_duration_p95: float
    events_per_stream_avg: float
    # Gap between consecutive events (ms), pooled across the retained streams
    inter_event_latency_avg: float
    inter_event_latency_p50: float
    inter_event_latency_p95: float
  121. class MetricsTracker:
  122. def __init__(self) -> None:
  123. self.lock = threading.Lock()
  124. self.active_connections = 0
  125. self.total_connections = 0
  126. self.total_events = 0
  127. self.start_time = time.time()
  128. # Enhanced metrics with memory limits
  129. self.max_samples = 10000 # Prevent unbounded growth
  130. self.ttfe_samples: deque[float] = deque(maxlen=self.max_samples)
  131. self.ttfe_total_count = 0 # Track total TTFE samples collected
  132. # For rate calculations - no maxlen to avoid artificial limits
  133. self.connection_times: deque[float] = deque()
  134. self.event_times: deque[float] = deque()
  135. self.last_stats_time = time.time()
  136. self.last_total_connections = 0
  137. self.last_total_events = 0
  138. self.stream_metrics: deque[StreamMetrics] = deque(maxlen=self.max_samples)
  139. self.error_counts: ErrorCounts = ErrorCounts(
  140. connection_error=0,
  141. timeout=0,
  142. invalid_json=0,
  143. http_4xx=0,
  144. http_5xx=0,
  145. early_termination=0,
  146. invalid_response=0,
  147. )
  148. def connection_started(self) -> None:
  149. with self.lock:
  150. self.active_connections += 1
  151. self.total_connections += 1
  152. self.connection_times.append(time.time())
  153. def connection_ended(self) -> None:
  154. with self.lock:
  155. self.active_connections -= 1
  156. def event_received(self) -> None:
  157. with self.lock:
  158. self.total_events += 1
  159. self.event_times.append(time.time())
  160. def record_ttfe(self, ttfe_ms: float) -> None:
  161. with self.lock:
  162. self.ttfe_samples.append(ttfe_ms) # deque handles maxlen
  163. self.ttfe_total_count += 1 # Increment total counter
  164. def record_stream_metrics(self, metrics: StreamMetrics) -> None:
  165. with self.lock:
  166. self.stream_metrics.append(metrics) # deque handles maxlen
  167. def record_error(self, error_type: ErrorType) -> None:
  168. with self.lock:
  169. self.error_counts[error_type] += 1
  170. def get_stats(self) -> MetricsSnapshot:
  171. with self.lock:
  172. current_time = time.time()
  173. time_window = 10.0 # 10 second window for rate calculation
  174. # Clean up old timestamps outside the window
  175. cutoff_time = current_time - time_window
  176. while self.connection_times and self.connection_times[0] < cutoff_time:
  177. self.connection_times.popleft()
  178. while self.event_times and self.event_times[0] < cutoff_time:
  179. self.event_times.popleft()
  180. # Calculate rates based on actual window or elapsed time
  181. window_duration = min(time_window, current_time - self.start_time)
  182. if window_duration > 0:
  183. conn_rate = len(self.connection_times) / window_duration
  184. event_rate = len(self.event_times) / window_duration
  185. else:
  186. conn_rate = 0
  187. event_rate = 0
  188. # Calculate TTFE statistics
  189. if self.ttfe_samples:
  190. avg_ttfe = statistics.mean(self.ttfe_samples)
  191. min_ttfe = min(self.ttfe_samples)
  192. max_ttfe = max(self.ttfe_samples)
  193. p50_ttfe = statistics.median(self.ttfe_samples)
  194. if len(self.ttfe_samples) >= 2:
  195. quantiles = statistics.quantiles(
  196. self.ttfe_samples, n=20, method="inclusive"
  197. )
  198. p95_ttfe = quantiles[18] # 19th of 19 quantiles = 95th percentile
  199. else:
  200. p95_ttfe = max_ttfe
  201. else:
  202. avg_ttfe = min_ttfe = max_ttfe = p50_ttfe = p95_ttfe = 0
  203. # Calculate stream metrics
  204. if self.stream_metrics:
  205. durations = [m.stream_duration for m in self.stream_metrics]
  206. events_per_stream = [m.events_count for m in self.stream_metrics]
  207. stream_duration_avg = statistics.mean(durations)
  208. stream_duration_p50 = statistics.median(durations)
  209. stream_duration_p95 = (
  210. statistics.quantiles(durations, n=20, method="inclusive")[18]
  211. if len(durations) >= 2
  212. else max(durations)
  213. if durations
  214. else 0
  215. )
  216. events_per_stream_avg = (
  217. statistics.mean(events_per_stream) if events_per_stream else 0
  218. )
  219. # Calculate inter-event latency statistics
  220. all_inter_event_times = []
  221. for m in self.stream_metrics:
  222. all_inter_event_times.extend(m.inter_event_times)
  223. if all_inter_event_times:
  224. inter_event_latency_avg = statistics.mean(all_inter_event_times)
  225. inter_event_latency_p50 = statistics.median(all_inter_event_times)
  226. inter_event_latency_p95 = (
  227. statistics.quantiles(
  228. all_inter_event_times, n=20, method="inclusive"
  229. )[18]
  230. if len(all_inter_event_times) >= 2
  231. else max(all_inter_event_times)
  232. )
  233. else:
  234. inter_event_latency_avg = inter_event_latency_p50 = (
  235. inter_event_latency_p95
  236. ) = 0
  237. else:
  238. stream_duration_avg = stream_duration_p50 = stream_duration_p95 = (
  239. events_per_stream_avg
  240. ) = 0
  241. inter_event_latency_avg = inter_event_latency_p50 = (
  242. inter_event_latency_p95
  243. ) = 0
  244. # Also calculate overall average rates
  245. total_elapsed = current_time - self.start_time
  246. overall_conn_rate = (
  247. self.total_connections / total_elapsed if total_elapsed > 0 else 0
  248. )
  249. overall_event_rate = (
  250. self.total_events / total_elapsed if total_elapsed > 0 else 0
  251. )
  252. return MetricsSnapshot(
  253. active_connections=self.active_connections,
  254. total_connections=self.total_connections,
  255. total_events=self.total_events,
  256. connection_rate=conn_rate,
  257. event_rate=event_rate,
  258. overall_conn_rate=overall_conn_rate,
  259. overall_event_rate=overall_event_rate,
  260. ttfe_avg=avg_ttfe,
  261. ttfe_min=min_ttfe,
  262. ttfe_max=max_ttfe,
  263. ttfe_p50=p50_ttfe,
  264. ttfe_p95=p95_ttfe,
  265. ttfe_samples=len(self.ttfe_samples),
  266. ttfe_total_samples=self.ttfe_total_count, # Return total count
  267. error_counts=ErrorCounts(**self.error_counts),
  268. stream_duration_avg=stream_duration_avg,
  269. stream_duration_p50=stream_duration_p50,
  270. stream_duration_p95=stream_duration_p95,
  271. events_per_stream_avg=events_per_stream_avg,
  272. inter_event_latency_avg=inter_event_latency_avg,
  273. inter_event_latency_p50=inter_event_latency_p50,
  274. inter_event_latency_p95=inter_event_latency_p95,
  275. )
  276. # Global metrics instance
  277. metrics = MetricsTracker()
  278. class SSEParser:
  279. """Parser for Server-Sent Events according to W3C spec"""
  280. def __init__(self) -> None:
  281. self.data_buffer: list[str] = []
  282. self.event_type: str | None = None
  283. self.event_id: str | None = None
  284. def parse_line(self, line: str) -> SSEEvent | None:
  285. """Parse a single SSE line and return event if complete"""
  286. # Empty line signals end of event
  287. if not line:
  288. if self.data_buffer:
  289. event = SSEEvent(
  290. data="\n".join(self.data_buffer),
  291. event=self.event_type or "message",
  292. id=self.event_id,
  293. )
  294. self.data_buffer = []
  295. self.event_type = None
  296. self.event_id = None
  297. return event
  298. return None
  299. # Comment line
  300. if line.startswith(":"):
  301. return None
  302. # Parse field
  303. if ":" in line:
  304. field, value = line.split(":", 1)
  305. value = value.lstrip()
  306. if field == "data":
  307. self.data_buffer.append(value)
  308. elif field == "event":
  309. self.event_type = value
  310. elif field == "id":
  311. self.event_id = value
  312. return None
  313. # Note: SSEClient removed - we'll handle SSE parsing directly in the task for better Locust integration
  314. class DifyWorkflowUser(HttpUser):
  315. """Locust user for testing Dify workflow SSE endpoints"""
  316. # Use constant wait for streaming workloads
  317. wait_time = constant(0) if os.getenv("WAIT_TIME", "0") == "0" else between(1, 3)
  318. def __init__(self, *args: object, **kwargs: object) -> None:
  319. super().__init__(*args, **kwargs) # type: ignore[arg-type]
  320. # Load API configuration
  321. config_helper = ConfigHelper()
  322. self.api_token = config_helper.get_api_key()
  323. if not self.api_token:
  324. raise ValueError("API key not found. Please run setup_all.py first.")
  325. # Load questions from file or use defaults
  326. if QUESTIONS_FILE and os.path.exists(QUESTIONS_FILE):
  327. with open(QUESTIONS_FILE, "r") as f:
  328. self.questions = [line.strip() for line in f if line.strip()]
  329. else:
  330. self.questions = [
  331. "What is artificial intelligence?",
  332. "Explain quantum computing",
  333. "What is machine learning?",
  334. "How do neural networks work?",
  335. "What is renewable energy?",
  336. ]
  337. self.user_counter = 0
  338. def on_start(self) -> None:
  339. """Called when a user starts"""
  340. self.user_counter = 0
  341. @task
  342. def test_workflow_stream(self) -> None:
  343. """Test workflow SSE streaming endpoint"""
  344. question = random.choice(self.questions)
  345. self.user_counter += 1
  346. headers = {
  347. "Authorization": f"Bearer {self.api_token}",
  348. "Content-Type": "application/json",
  349. "Accept": "text/event-stream",
  350. "Cache-Control": "no-cache",
  351. }
  352. data = WorkflowRequestData(
  353. inputs=WorkflowInputs(question=question),
  354. response_mode="streaming",
  355. user=f"user_{self.user_counter}",
  356. )
  357. start_time = time.time()
  358. first_event_time = None
  359. event_count = 0
  360. inter_event_times: list[float] = []
  361. last_event_time = None
  362. ttfe = 0
  363. request_success = False
  364. bytes_received = 0
  365. metrics.connection_started()
  366. # Use catch_response context manager directly
  367. with self.client.request(
  368. method="POST",
  369. url=WORKFLOW_PATH,
  370. headers=headers,
  371. json=data,
  372. stream=True,
  373. catch_response=True,
  374. timeout=(CONNECT_TIMEOUT, READ_TIMEOUT),
  375. name="/v1/workflows/run", # Name for Locust stats
  376. ) as response:
  377. try:
  378. # Validate response
  379. if response.status_code >= 400:
  380. error_type: ErrorType = (
  381. "http_4xx" if response.status_code < 500 else "http_5xx"
  382. )
  383. metrics.record_error(error_type)
  384. response.failure(f"HTTP {response.status_code}")
  385. return
  386. content_type = response.headers.get("Content-Type", "")
  387. if (
  388. "text/event-stream" not in content_type
  389. and "application/json" not in content_type
  390. ):
  391. logger.error(f"Expected text/event-stream, got: {content_type}")
  392. metrics.record_error("invalid_response")
  393. response.failure(f"Invalid content type: {content_type}")
  394. return
  395. # Parse SSE events
  396. parser = SSEParser()
  397. for line in response.iter_lines(decode_unicode=True):
  398. # Check if runner is stopping
  399. if getattr(self.environment.runner, 'state', '') in ('stopping', 'stopped'):
  400. logger.debug("Runner stopping, breaking streaming loop")
  401. break
  402. if line is not None:
  403. bytes_received += len(line.encode("utf-8"))
  404. # Parse SSE line
  405. event = parser.parse_line(line if line is not None else "")
  406. if event:
  407. event_count += 1
  408. current_time = time.time()
  409. metrics.event_received()
  410. # Track inter-event timing
  411. if last_event_time:
  412. inter_event_times.append(
  413. (current_time - last_event_time) * 1000
  414. )
  415. last_event_time = current_time
  416. if first_event_time is None:
  417. first_event_time = current_time
  418. ttfe = (first_event_time - start_time) * 1000
  419. metrics.record_ttfe(ttfe)
  420. try:
  421. # Parse event data
  422. event_data = event.get("data", "")
  423. if event_data:
  424. if event_data == "[DONE]":
  425. logger.debug("Received [DONE] sentinel")
  426. request_success = True
  427. break
  428. try:
  429. parsed_event: ParsedEventData = json.loads(event_data)
  430. # Check for terminal events
  431. if parsed_event.get("event") in TERMINAL_EVENTS:
  432. logger.debug(
  433. f"Received terminal event: {parsed_event.get('event')}"
  434. )
  435. request_success = True
  436. break
  437. except json.JSONDecodeError as e:
  438. logger.debug(
  439. f"JSON decode error: {e} for data: {event_data[:100]}"
  440. )
  441. metrics.record_error("invalid_json")
  442. except Exception as e:
  443. logger.error(f"Error processing event: {e}")
  444. # Mark success only if terminal condition was met or events were received
  445. if request_success:
  446. response.success()
  447. elif event_count > 0:
  448. # Got events but no proper terminal condition
  449. metrics.record_error("early_termination")
  450. response.failure("Stream ended without terminal event")
  451. else:
  452. response.failure("No events received")
  453. except (
  454. requests.exceptions.ConnectTimeout,
  455. requests.exceptions.ReadTimeout,
  456. ) as e:
  457. metrics.record_error("timeout")
  458. response.failure(f"Timeout: {e}")
  459. except (
  460. requests.exceptions.ConnectionError,
  461. requests.exceptions.RequestException,
  462. ) as e:
  463. metrics.record_error("connection_error")
  464. response.failure(f"Connection error: {e}")
  465. except Exception as e:
  466. response.failure(str(e))
  467. raise
  468. finally:
  469. metrics.connection_ended()
  470. # Record stream metrics
  471. if event_count > 0:
  472. stream_duration = (time.time() - start_time) * 1000
  473. stream_metrics = StreamMetrics(
  474. stream_duration=stream_duration,
  475. events_count=event_count,
  476. bytes_received=bytes_received,
  477. ttfe=ttfe,
  478. inter_event_times=inter_event_times,
  479. )
  480. metrics.record_stream_metrics(stream_metrics)
  481. logger.debug(
  482. f"Stream completed: {event_count} events, {stream_duration:.1f}ms, success={request_success}"
  483. )
  484. else:
  485. logger.warning("No events received in stream")
  486. # Event handlers
  487. @events.test_start.add_listener # type: ignore[misc]
  488. def on_test_start(environment: object, **kwargs: object) -> None:
  489. logger.info("=" * 80)
  490. logger.info(" " * 25 + "DIFY SSE BENCHMARK - REAL-TIME METRICS")
  491. logger.info("=" * 80)
  492. logger.info(f"Started at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
  493. logger.info("=" * 80)
  494. # Periodic stats reporting
  495. def report_stats() -> None:
  496. if not hasattr(environment, 'runner'):
  497. return
  498. runner = environment.runner
  499. while hasattr(runner, 'state') and runner.state not in ["stopped", "stopping"]:
  500. time.sleep(5) # Report every 5 seconds
  501. if hasattr(runner, 'state') and runner.state == "running":
  502. stats = metrics.get_stats()
  503. # Only log on master node in distributed mode
  504. is_master = not getattr(environment.runner, "worker_id", None) if hasattr(environment, 'runner') else True
  505. if is_master:
  506. # Clear previous lines and show updated stats
  507. logger.info("\n" + "=" * 80)
  508. logger.info(
  509. f"{'METRIC':<25} {'CURRENT':>15} {'RATE (10s)':>15} {'AVG (overall)':>15} {'TOTAL':>12}"
  510. )
  511. logger.info("-" * 80)
  512. # Active SSE Connections
  513. logger.info(
  514. f"{'Active SSE Connections':<25} {stats.active_connections:>15,d} {'-':>15} {'-':>12} {'-':>12}"
  515. )
  516. # New Connection Rate
  517. logger.info(
  518. f"{'New Connections':<25} {'-':>15} {stats.connection_rate:>13.2f}/s {stats.overall_conn_rate:>13.2f}/s {stats.total_connections:>12,d}"
  519. )
  520. # Event Throughput
  521. logger.info(
  522. f"{'Event Throughput':<25} {'-':>15} {stats.event_rate:>13.2f}/s {stats.overall_event_rate:>13.2f}/s {stats.total_events:>12,d}"
  523. )
  524. logger.info("-" * 80)
  525. logger.info(
  526. f"{'TIME TO FIRST EVENT':<25} {'AVG':>15} {'P50':>10} {'P95':>10} {'MIN':>10} {'MAX':>10}"
  527. )
  528. logger.info(
  529. f"{'(TTFE in ms)':<25} {stats.ttfe_avg:>15.1f} {stats.ttfe_p50:>10.1f} {stats.ttfe_p95:>10.1f} {stats.ttfe_min:>10.1f} {stats.ttfe_max:>10.1f}"
  530. )
  531. logger.info(f"{'Window Samples':<25} {stats.ttfe_samples:>15,d} (last {min(10000, stats.ttfe_total_samples):,d} samples)")
  532. logger.info(f"{'Total Samples':<25} {stats.ttfe_total_samples:>15,d}")
  533. # Inter-event latency
  534. if stats.inter_event_latency_avg > 0:
  535. logger.info("-" * 80)
  536. logger.info(
  537. f"{'INTER-EVENT LATENCY':<25} {'AVG':>15} {'P50':>10} {'P95':>10}"
  538. )
  539. logger.info(
  540. f"{'(ms between events)':<25} {stats.inter_event_latency_avg:>15.1f} {stats.inter_event_latency_p50:>10.1f} {stats.inter_event_latency_p95:>10.1f}"
  541. )
  542. # Error stats
  543. if any(stats.error_counts.values()):
  544. logger.info("-" * 80)
  545. logger.info(f"{'ERROR TYPE':<25} {'COUNT':>15}")
  546. for error_type, count in stats.error_counts.items():
  547. if isinstance(count, int) and count > 0:
  548. logger.info(f"{error_type:<25} {count:>15,d}")
  549. logger.info("=" * 80)
  550. # Show Locust stats summary
  551. if hasattr(environment, 'stats') and hasattr(environment.stats, 'total'):
  552. total = environment.stats.total
  553. if hasattr(total, 'num_requests') and total.num_requests > 0:
  554. logger.info(
  555. f"{'LOCUST STATS':<25} {'Requests':>12} {'Fails':>8} {'Avg (ms)':>12} {'Min':>8} {'Max':>8}"
  556. )
  557. logger.info("-" * 80)
  558. logger.info(
  559. f"{'Aggregated':<25} {total.num_requests:>12,d} "
  560. f"{total.num_failures:>8,d} "
  561. f"{total.avg_response_time:>12.1f} "
  562. f"{total.min_response_time:>8.0f} "
  563. f"{total.max_response_time:>8.0f}"
  564. )
  565. logger.info("=" * 80)
  566. threading.Thread(target=report_stats, daemon=True).start()
  567. @events.test_stop.add_listener # type: ignore[misc]
  568. def on_test_stop(environment: object, **kwargs: object) -> None:
  569. stats = metrics.get_stats()
  570. test_duration = time.time() - metrics.start_time
  571. # Log final results
  572. logger.info("\n" + "=" * 80)
  573. logger.info(" " * 30 + "FINAL BENCHMARK RESULTS")
  574. logger.info("=" * 80)
  575. logger.info(f"Test Duration: {test_duration:.1f} seconds")
  576. logger.info("-" * 80)
  577. logger.info("")
  578. logger.info("CONNECTIONS")
  579. logger.info(f" {'Total Connections:':<30} {stats.total_connections:>10,d}")
  580. logger.info(f" {'Final Active:':<30} {stats.active_connections:>10,d}")
  581. logger.info(f" {'Average Rate:':<30} {stats.overall_conn_rate:>10.2f} conn/s")
  582. logger.info("")
  583. logger.info("EVENTS")
  584. logger.info(f" {'Total Events Received:':<30} {stats.total_events:>10,d}")
  585. logger.info(
  586. f" {'Average Throughput:':<30} {stats.overall_event_rate:>10.2f} events/s"
  587. )
  588. logger.info(
  589. f" {'Final Rate (10s window):':<30} {stats.event_rate:>10.2f} events/s"
  590. )
  591. logger.info("")
  592. logger.info("STREAM METRICS")
  593. logger.info(f" {'Avg Stream Duration:':<30} {stats.stream_duration_avg:>10.1f} ms")
  594. logger.info(f" {'P50 Stream Duration:':<30} {stats.stream_duration_p50:>10.1f} ms")
  595. logger.info(f" {'P95 Stream Duration:':<30} {stats.stream_duration_p95:>10.1f} ms")
  596. logger.info(
  597. f" {'Avg Events per Stream:':<30} {stats.events_per_stream_avg:>10.1f}"
  598. )
  599. logger.info("")
  600. logger.info("INTER-EVENT LATENCY")
  601. logger.info(f" {'Average:':<30} {stats.inter_event_latency_avg:>10.1f} ms")
  602. logger.info(f" {'Median (P50):':<30} {stats.inter_event_latency_p50:>10.1f} ms")
  603. logger.info(f" {'95th Percentile:':<30} {stats.inter_event_latency_p95:>10.1f} ms")
  604. logger.info("")
  605. logger.info("TIME TO FIRST EVENT (ms)")
  606. logger.info(f" {'Average:':<30} {stats.ttfe_avg:>10.1f} ms")
  607. logger.info(f" {'Median (P50):':<30} {stats.ttfe_p50:>10.1f} ms")
  608. logger.info(f" {'95th Percentile:':<30} {stats.ttfe_p95:>10.1f} ms")
  609. logger.info(f" {'Minimum:':<30} {stats.ttfe_min:>10.1f} ms")
  610. logger.info(f" {'Maximum:':<30} {stats.ttfe_max:>10.1f} ms")
  611. logger.info(f" {'Window Samples:':<30} {stats.ttfe_samples:>10,d} (last {min(10000, stats.ttfe_total_samples):,d})")
  612. logger.info(f" {'Total Samples:':<30} {stats.ttfe_total_samples:>10,d}")
  613. # Error summary
  614. if any(stats.error_counts.values()):
  615. logger.info("")
  616. logger.info("ERRORS")
  617. for error_type, count in stats.error_counts.items():
  618. if isinstance(count, int) and count > 0:
  619. logger.info(f" {error_type:<30} {count:>10,d}")
  620. logger.info("=" * 80 + "\n")
  621. # Export machine-readable report (only on master node)
  622. is_master = not getattr(environment.runner, 'worker_id', None) if hasattr(environment, 'runner') else True
  623. if is_master:
  624. export_json_report(stats, test_duration, environment)
  625. def export_json_report(stats: MetricsSnapshot, duration: float, environment: object) -> None:
  626. """Export metrics to JSON file for CI/CD analysis"""
  627. reports_dir = Path(__file__).parent / "reports"
  628. reports_dir.mkdir(exist_ok=True)
  629. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  630. report_file = reports_dir / f"sse_metrics_{timestamp}.json"
  631. # Access environment.stats.total attributes safely
  632. locust_stats: LocustStats | None = None
  633. if hasattr(environment, 'stats') and hasattr(environment.stats, 'total'):
  634. total = environment.stats.total
  635. if hasattr(total, 'num_requests') and total.num_requests > 0:
  636. locust_stats = LocustStats(
  637. total_requests=total.num_requests,
  638. total_failures=total.num_failures,
  639. avg_response_time=total.avg_response_time,
  640. min_response_time=total.min_response_time,
  641. max_response_time=total.max_response_time,
  642. )
  643. report_data = ReportData(
  644. timestamp=datetime.now().isoformat(),
  645. duration_seconds=duration,
  646. metrics=asdict(stats), # type: ignore[arg-type]
  647. locust_stats=locust_stats,
  648. )
  649. with open(report_file, "w") as f:
  650. json.dump(report_data, f, indent=2)
  651. logger.info(f"Exported metrics to {report_file}")