Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions stress-test/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
results/
3 changes: 3 additions & 0 deletions stress-test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ serde_json = { workspace = true, features = ["std"] }
blake2 = "0.10"
twox-hash = "2"

# Ed25519 (HOP ephemeral recipient keys)
ed25519-dalek = { version = "2", features = ["rand_core"] }

# Utilities
anyhow = "1"
log = { workspace = true, features = ["std"] }
Expand Down
Binary file added stress-test/hop-results.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
139 changes: 139 additions & 0 deletions stress-test/plot-hop-results.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
#!/usr/bin/env python3
"""
Render a 2-panel chart from a stress-test --output-file JSON.

Left panel: throughput (ops/s + MB/s) per payload size.
Right panel: latency percentile bars stacked as p50 + (p90 - p50) +
(p99 - p90). Each band shows the *additional* latency
contributed by tightening the percentile.

Usage:
./plot-hop-results.py results/hop-all.json -o stress-test/hop-results.png
"""

import argparse
import json
import re
import sys
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np

PAYLOAD_RE = re.compile(r"(\d+(?:\.\d+)?)\s*(B|KB|MB|GB)$")


def parse_payload(name: str) -> float | None:
m = PAYLOAD_RE.search(name)
if not m:
return None
val, unit = float(m.group(1)), m.group(2)
return val * {"B": 1, "KB": 1024, "MB": 1024 ** 2, "GB": 1024 ** 3}[unit]


def fmt_payload(size: float) -> str:
if size >= 1024 ** 2:
return f"{int(size // 1024 ** 2)}MB"
if size >= 1024:
return f"{int(size // 1024)}KB"
return f"{int(size)}B"


def duration_ms(d) -> float:
if isinstance(d, dict):
return float(d.get("secs", 0)) * 1000.0 + float(d.get("nanos", 0)) / 1_000_000.0
return float(d) * 1000.0


def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("input", type=Path, help="results JSON from --output-file")
ap.add_argument("-o", "--output", type=Path, required=True)
ap.add_argument("--title",
default="HOP Stress Test Results (Rust, local zombienet, 2 collators)")
args = ap.parse_args()

with args.input.open() as f:
results = json.load(f)

submits = [r for r in results
if r.get("name", "").startswith("HOP submit ")
and r.get("inclusion_latency")]
submits.sort(key=lambda r: parse_payload(r["name"]) or 0)
if not submits:
print("No HOP submit-only results with latency data", file=sys.stderr)
return 1

labels = [fmt_payload(parse_payload(r["name"]) or 0) for r in submits]
ops = [r.get("throughput_tps", 0.0) for r in submits]
mbs = [r.get("throughput_bytes_per_sec", 0.0) / (1024 ** 2) for r in submits]

fig, (ax_left, ax_right) = plt.subplots(
1, 2, figsize=(13, 7), gridspec_kw={"width_ratios": [1, 1.2]}
)
fig.suptitle(args.title, fontsize=15, fontweight="bold")

# ---------------- Left: throughput ----------------
x = np.arange(len(submits))
bw = 0.36
ax_left.bar(x - bw / 2, ops, bw, label="ops/s", color="#1f77b4")
ax_l2 = ax_left.twinx()
ax_l2.bar(x + bw / 2, mbs, bw, label="MB/s", color="#ff7f0e")
ax_left.set_xticks(x); ax_left.set_xticklabels(labels)
ax_left.set_xlabel("Payload Size")
ax_left.set_ylabel("ops/s", color="#1f77b4")
ax_l2.set_ylabel("MB/s", color="#ff7f0e")
ax_left.set_title("Submit Throughput")
if ops:
ax_left.set_ylim(0, max(ops) * 1.18)
if mbs:
ax_l2.set_ylim(0, max(mbs) * 1.18)
for i, (o, m) in enumerate(zip(ops, mbs)):
ax_left.annotate(f"{o:.0f}", (i - bw / 2, o), ha="center", va="bottom", fontsize=9)
ax_l2.annotate(f"{m:.1f}", (i + bw / 2, m), ha="center", va="bottom", fontsize=9)
h1, l1 = ax_left.get_legend_handles_labels()
h2, l2 = ax_l2.get_legend_handles_labels()
ax_left.legend(h1 + h2, l1 + l2, loc="upper left", fontsize=10)
ax_left.grid(True, alpha=0.3, axis="y")

# ---------------- Right: stacked latency ----------------
p50 = np.array([duration_ms(r["inclusion_latency"]["p50"]) for r in submits])
p90 = np.array([
duration_ms(r["inclusion_latency"].get("p90", r["inclusion_latency"]["p99"]))
for r in submits
])
p99 = np.array([duration_ms(r["inclusion_latency"]["p99"]) for r in submits])

seg_p90 = np.maximum(p90 - p50, 0)
seg_p99 = np.maximum(p99 - p90, 0)

bar_w = 0.55
ax_right.bar(x, p50, bar_w, label="p50", color="#7ac74f")
ax_right.bar(x, seg_p90, bar_w, bottom=p50,
label="p90", color="#f7c948")
ax_right.bar(x, seg_p99, bar_w, bottom=p50 + seg_p90,
label="p99", color="#e57373")

for i, total in enumerate(p99):
ax_right.annotate(
f"{total:.0f}ms" if total < 1000 else f"{total / 1000:.2f}s",
(i, total), ha="center", va="bottom", fontsize=9, fontweight="bold",
)

ax_right.set_xticks(x); ax_right.set_xticklabels(labels)
ax_right.set_xlabel("Payload Size")
ax_right.set_ylabel("Latency (ms)")
ax_right.set_title("Submit Latency (p50 / p90 / p99)")
ax_right.legend(loc="upper left", fontsize=10)
ax_right.set_ylim(0, max(p99) * 1.15 if p99.size else 1)
ax_right.grid(True, alpha=0.3, axis="y")

plt.tight_layout()
args.output.parent.mkdir(parents=True, exist_ok=True)
fig.savefig(args.output, dpi=140)
print(f"Wrote {args.output}", file=sys.stderr)
return 0


if __name__ == "__main__":
sys.exit(main())
188 changes: 188 additions & 0 deletions stress-test/src/hop.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
use anyhow::{Context, Result};
use ed25519_dalek::{Signer, SigningKey};
use jsonrpsee::{core::client::ClientT, rpc_params, ws_client::WsClient};
use rand::rngs::OsRng;
use serde::Deserialize;
use std::time::Instant;

use crate::client;

// ---------------------------------------------------------------------------
// Types matching the HOP RPC responses
// ---------------------------------------------------------------------------

#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PoolStatus {
pub entry_count: usize,
pub total_bytes: u64,
pub max_bytes: u64,
}

#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SubmitResult {
pub pool_status: PoolStatus,
}

// ---------------------------------------------------------------------------
// Ed25519 keypair helpers
// ---------------------------------------------------------------------------

/// An ephemeral ed25519 keypair used as a HOP recipient.
#[derive(Debug, Clone)]
pub struct RecipientKeypair {
pub signing_key: SigningKey,
}

impl RecipientKeypair {
pub fn generate() -> Self {
Self { signing_key: SigningKey::generate(&mut OsRng) }
}

/// 32-byte public key.
pub fn public_bytes(&self) -> [u8; 32] {
self.signing_key.verifying_key().to_bytes()
}

/// SCALE-encoded `MultiSigner::Ed25519(pubkey)`.
/// MultiSigner enum variant 0 = Ed25519, so: `[0x00] ++ pubkey[32]`.
pub fn scale_multi_signer(&self) -> Vec<u8> {
let mut buf = Vec::with_capacity(33);
buf.push(0x00); // Ed25519 variant
buf.extend_from_slice(&self.public_bytes());
buf
}

/// Sign `msg` and return SCALE-encoded `MultiSignature::Ed25519(sig)`.
/// MultiSignature enum variant 0 = Ed25519, so: `[0x00] ++ sig[64]`.
pub fn sign_multi_signature(&self, msg: &[u8]) -> Vec<u8> {
let sig = self.signing_key.sign(msg);
let mut buf = Vec::with_capacity(65);
buf.push(0x00); // Ed25519 variant
buf.extend_from_slice(&sig.to_bytes());
buf
}
}

// ---------------------------------------------------------------------------
// RPC helpers
// ---------------------------------------------------------------------------

/// Submit data to HOP pool. Returns (content_hash, submit_result, latency).
pub async fn hop_submit(
ws: &WsClient,
data: &[u8],
recipients: &[RecipientKeypair],
) -> Result<([u8; 32], SubmitResult, std::time::Duration)> {
let data_hex = format!("0x{}", hex::encode(data));
let recipient_hexes: Vec<String> = recipients
.iter()
.map(|r| format!("0x{}", hex::encode(r.scale_multi_signer())))
.collect();
let proof_hex = "0x"; // NoopVerifier accepts empty proof

let start = Instant::now();
let result: SubmitResult = ws
.request("hop_submit", rpc_params![data_hex, recipient_hexes, proof_hex])
.await
.map_err(|e| anyhow::anyhow!("hop_submit: {e}"))?;
let latency = start.elapsed();

let hash = client::blake2b_256(data);
Ok((hash, result, latency))
}

/// Claim data from HOP pool. Returns (data, latency).
pub async fn hop_claim(
ws: &WsClient,
hash: &[u8; 32],
recipient: &RecipientKeypair,
) -> Result<(Vec<u8>, std::time::Duration)> {
let hash_hex = format!("0x{}", hex::encode(hash));
let signature = recipient.sign_multi_signature(hash);
let sig_hex = format!("0x{}", hex::encode(&signature));

let start = Instant::now();
let data_hex: String = ws
.request("hop_claim", rpc_params![hash_hex, sig_hex])
.await
.map_err(|e| anyhow::anyhow!("hop_claim: {e}"))?;
let latency = start.elapsed();

let data = hex::decode(data_hex.strip_prefix("0x").unwrap_or(&data_hex))
.context("decoding claimed data")?;
Ok((data, latency))
}

/// Get pool status.
pub async fn hop_pool_status(ws: &WsClient) -> Result<PoolStatus> {
let status: PoolStatus = ws
.request("hop_poolStatus", rpc_params![])
.await
.context("hop_poolStatus RPC")?;
Ok(status)
}

/// Try an RPC call that should fail, return the error code (or None if it succeeded).
pub async fn try_hop_submit(
ws: &WsClient,
data: &[u8],
recipients: &[RecipientKeypair],
) -> Option<i32> {
let data_hex = format!("0x{}", hex::encode(data));
let recipient_hexes: Vec<String> = recipients
.iter()
.map(|r| format!("0x{}", hex::encode(r.scale_multi_signer())))
.collect();

let result: Result<SubmitResult, _> =
ws.request("hop_submit", rpc_params![data_hex, recipient_hexes, "0x"]).await;
match result {
Err(e) => extract_error_code(&e),
Ok(_) => None,
}
}

pub async fn try_hop_claim(
ws: &WsClient,
hash: &[u8],
recipient: &RecipientKeypair,
) -> Option<i32> {
let hash_hex = format!("0x{}", hex::encode(hash));
let signature = recipient.sign_multi_signature(hash);
let sig_hex = format!("0x{}", hex::encode(&signature));

let result: Result<String, _> = ws.request("hop_claim", rpc_params![hash_hex, sig_hex]).await;
match result {
Err(e) => extract_error_code(&e),
Ok(_) => None,
}
}

fn extract_error_code(err: &jsonrpsee::core::ClientError) -> Option<i32> {
if let jsonrpsee::core::ClientError::Call(obj) = err {
Some(obj.code())
} else {
None
}
}

// ---------------------------------------------------------------------------
// Payload generation (deterministic, unique per index)
// ---------------------------------------------------------------------------

/// Generate a unique payload of `size` bytes for index `i`.
/// Uses a combination of index, size, and a per-process random salt so
/// successive runs never produce duplicate content hashes.
pub fn generate_payload(size: usize, index: u64) -> Vec<u8> {
use rand::{Rng, SeedableRng};
use std::sync::OnceLock;
static SALT: OnceLock<u64> = OnceLock::new();
let salt = *SALT.get_or_init(|| rand::random());
let seed = index ^ salt ^ (size as u64).wrapping_mul(0x9E3779B97F4A7C15);
let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
let mut data = vec![0u8; size];
rng.fill(&mut data[..]);
data
}
1 change: 1 addition & 0 deletions stress-test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod authorize;
pub mod bitswap;
pub mod chain_info;
pub mod client;
pub mod hop;
pub mod pipeline;
pub mod report;
pub mod scenarios;
Expand Down
Loading