Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions stress-test/src/bin/assign_cores.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
//! Assign extra coretime cores to a parachain via sudo on the relay chain.
//! Usage: assign_cores [relay_ws_url] [para_id] [num_extra_cores]

use anyhow::Result;
use subxt::{
config::substrate::SubstrateConfig, dynamic::tx, ext::scale_value::Value, OnlineClient,
};

#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
)
.init();

let args: Vec<String> = std::env::args().collect();
let relay_url = args.get(1).map(|s| s.as_str()).unwrap_or("ws://127.0.0.1:9942");
let para_id: u32 = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(2487);
let num_cores: u32 = args.get(3).and_then(|s| s.parse().ok()).unwrap_or(2);

tracing::info!("Connecting to relay at {relay_url}...");
let client = OnlineClient::<SubstrateConfig>::from_url(relay_url).await?;
let alice = subxt_signer::sr25519::dev::alice();

for core in 0..num_cores {
let assign_call = tx(
"Coretime",
"assign_core",
vec![
Value::u128(core as u128),
Value::u128(0),
Value::unnamed_composite([Value::unnamed_composite([
Value::named_variant("Task", [("0".to_string(), Value::u128(para_id as u128))]),
Value::u128(57600),
])]),
Value::unnamed_variant("None", []),
],
);
let sudo_call = tx("Sudo", "sudo", vec![assign_call.into_value()]);

tracing::info!("Assigning core {core} to para {para_id}...");
client
.tx()
.sign_and_submit_then_watch_default(&sudo_call, &alice)
.await?
.wait_for_finalized_success()
.await?;
tracing::info!("Core {core} assigned!");
}

tracing::info!("Done! {num_cores} extra cores assigned to para {para_id}");
Ok(())
}
161 changes: 96 additions & 65 deletions stress-test/src/bitswap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,32 +27,38 @@ mod bitswap_schema {
const BITSWAP_PROTOCOL: &str = "/ipfs/bitswap/1.2.0";

enum BitswapCommand {
Fetch { peer: PeerId, cid_bytes: Vec<u8>, response_tx: oneshot::Sender<Result<Vec<u8>>> },
/// Fetch one or more blocks. Response is one Vec<u8> per requested CID.
Fetch {
peer: PeerId,
cid_bytes_list: Vec<Vec<u8>>,
response_tx: oneshot::Sender<Result<Vec<Vec<u8>>>>,
},
}

struct PendingRequest {
peer: PeerId,
cid_bytes: Vec<u8>,
response_tx: oneshot::Sender<Result<Vec<u8>>>,
cid_bytes_list: Vec<Vec<u8>>,
response_tx: oneshot::Sender<Result<Vec<Vec<u8>>>>,
}

struct BitswapProtocol {
cmd_rx: mpsc::Receiver<BitswapCommand>,
}

impl BitswapProtocol {
fn build_wantlist(cid_bytes: &[u8]) -> Vec<u8> {
fn build_wantlist(cid_bytes_list: &[Vec<u8>]) -> Vec<u8> {
let entries = cid_bytes_list
.iter()
.map(|cid_bytes| bitswap_schema::message::wantlist::Entry {
block: cid_bytes.clone(),
priority: 1,
cancel: false,
want_type: bitswap_schema::message::wantlist::WantType::Block as i32,
send_dont_have: true,
})
.collect();
let request = bitswap_schema::Message {
wantlist: Some(bitswap_schema::message::Wantlist {
entries: vec![bitswap_schema::message::wantlist::Entry {
block: cid_bytes.to_vec(),
priority: 1,
cancel: false,
want_type: bitswap_schema::message::wantlist::WantType::Block as i32,
send_dont_have: true,
}],
full: false,
}),
wantlist: Some(bitswap_schema::message::Wantlist { entries, full: false }),
blocks: vec![],
payload: vec![],
block_presences: vec![],
Expand All @@ -69,7 +75,7 @@ impl UserProtocol for BitswapProtocol {
}

fn codec(&self) -> ProtocolCodec {
ProtocolCodec::UnsignedVarint(Some(2 * 1024 * 1024))
ProtocolCodec::UnsignedVarint(Some(16 * 1024 * 1024))
}

async fn run(mut self: Box<Self>, service: TransportService) -> litep2p::Result<()> {
Expand All @@ -80,21 +86,25 @@ impl UserProtocol for BitswapProtocol {
let mut pending_connection: Vec<PendingRequest> = Vec::new();
let mut pending_outbound: HashMap<
litep2p::types::SubstreamId,
(Vec<u8>, oneshot::Sender<Result<Vec<u8>>>),
(Vec<Vec<u8>>, oneshot::Sender<Result<Vec<Vec<u8>>>>),
> = HashMap::new();
let mut waiting_response: Option<(PeerId, oneshot::Sender<Result<Vec<u8>>>)> = None;
let mut waiting_responses: std::collections::VecDeque<(
PeerId,
usize,
oneshot::Sender<Result<Vec<Vec<u8>>>>,
)> = std::collections::VecDeque::new();

loop {
tokio::select! {
cmd = self.cmd_rx.recv() => {
match cmd {
Some(BitswapCommand::Fetch { peer, cid_bytes, response_tx }) => {
tracing::debug!("bitswap: fetch command for peer {peer}");
Some(BitswapCommand::Fetch { peer, cid_bytes_list, response_tx }) => {
tracing::debug!("bitswap: fetch command for peer {peer} ({} CIDs)", cid_bytes_list.len());
if connected_peers.contains(&peer) {
match service.open_substream(peer) {
Ok(substream_id) => {
tracing::debug!("bitswap: opened outbound substream {substream_id:?}");
pending_outbound.insert(substream_id, (cid_bytes, response_tx));
pending_outbound.insert(substream_id, (cid_bytes_list, response_tx));
}
Err(e) => {
tracing::warn!("bitswap: failed to open substream: {e:?}");
Expand All @@ -103,7 +113,7 @@ impl UserProtocol for BitswapProtocol {
}
} else {
tracing::debug!("bitswap: peer not connected, queuing");
pending_connection.push(PendingRequest { peer, cid_bytes, response_tx });
pending_connection.push(PendingRequest { peer, cid_bytes_list, response_tx });
}
}
None => break,
Expand All @@ -116,13 +126,14 @@ impl UserProtocol for BitswapProtocol {
match direction {
Direction::Outbound(substream_id) => {
tracing::debug!("bitswap: outbound substream opened {substream_id:?}");
if let Some((cid_bytes, response_tx)) = pending_outbound.remove(&substream_id) {
let wantlist = Self::build_wantlist(&cid_bytes);
tracing::debug!("bitswap: sending wantlist ({} bytes)", wantlist.len());
if let Some((cid_bytes_list, response_tx)) = pending_outbound.remove(&substream_id) {
let num_cids = cid_bytes_list.len();
let wantlist = Self::build_wantlist(&cid_bytes_list);
tracing::debug!("bitswap: sending wantlist ({num_cids} CIDs, {} bytes)", wantlist.len());
match substream.send_framed(Bytes::from(wantlist)).await {
Ok(()) => {
tracing::debug!("bitswap: wantlist sent, waiting for inbound response");
waiting_response = Some((peer, response_tx));
waiting_responses.push_back((peer, num_cids, response_tx));
}
Err(e) => {
tracing::warn!("bitswap: failed to send wantlist: {e:?}");
Expand All @@ -132,53 +143,59 @@ impl UserProtocol for BitswapProtocol {
}
}
Direction::Inbound => {
tracing::debug!("bitswap: inbound substream from {peer}, waiting_response={}", waiting_response.is_some());
if let Some((waiting_peer, response_tx)) = waiting_response.take() {
if waiting_peer == peer {
tracing::debug!("bitswap: reading response from inbound substream...");
tracing::debug!("bitswap: inbound substream from {peer}, pending={}", waiting_responses.len());
let matched_idx = waiting_responses.iter()
.position(|(p, _, _)| *p == peer);
if let Some(idx) = matched_idx {
let (_, expected_count, response_tx) = waiting_responses.remove(idx).unwrap();
tracing::debug!("bitswap: reading response (expecting {expected_count} blocks)...");
// Read ALL frames — the server may split
// blocks across multiple framed messages.
let mut all_blocks: Vec<Vec<u8>> = Vec::new();
let mut had_error = false;
while all_blocks.len() < expected_count {
match substream.next().await {
Some(Ok(data)) => {
tracing::debug!("bitswap: received {} bytes on inbound substream", data.len());
tracing::debug!("bitswap: frame {} bytes", data.len());
match bitswap_schema::Message::decode(data.as_ref()) {
Ok(msg) => {
if let Some(block) = msg.payload.first() {
tracing::debug!("bitswap: got block payload ({} bytes)", block.data.len());
let _ = response_tx.send(Ok(block.data.clone()));
} else if !msg.block_presences.is_empty() {
let presence = &msg.block_presences[0];
if presence.r#type == bitswap_schema::message::BlockPresenceType::DontHave as i32 {
tracing::warn!("bitswap: peer responded DontHave");
let _ = response_tx.send(Err(anyhow!("Peer does not have the block")));
} else {
tracing::warn!("bitswap: peer has block but didn't send it");
let _ = response_tx.send(Err(anyhow!("Peer has block but didn't send it")));
}
} else {
tracing::warn!("bitswap: empty response (no payload, no presences)");
let _ = response_tx.send(Err(anyhow!("Empty bitswap response")));
for block in &msg.payload {
all_blocks.push(block.data.clone());
}
if !msg.block_presences.is_empty() {
// DontHave — stop reading.
break;
}
if msg.payload.is_empty() {
break;
}
}
Err(e) => {
tracing::warn!("bitswap: failed to decode response: {e:?}");
let _ = response_tx.send(Err(anyhow!("Failed to decode response: {e:?}")));
tracing::warn!("bitswap: decode error: {e:?}");
had_error = true;
break;
}
}
}
Some(Err(e)) => {
tracing::warn!("bitswap: substream error: {e:?}");
let _ = response_tx.send(Err(anyhow!("Substream error: {e:?}")));
}
None => {
tracing::warn!("bitswap: substream closed without data");
let _ = response_tx.send(Err(anyhow!("Substream closed without data")));
had_error = true;
break;
}
None => break,
}
}
if !all_blocks.is_empty() {
tracing::debug!("bitswap: got {} block(s) total", all_blocks.len());
let _ = response_tx.send(Ok(all_blocks));
} else if had_error {
let _ = response_tx.send(Err(anyhow!("Substream error during read")));
} else {
tracing::debug!("bitswap: inbound from wrong peer {peer}, expected {waiting_peer}");
waiting_response = Some((waiting_peer, response_tx));
tracing::warn!("bitswap: no blocks received");
let _ = response_tx.send(Err(anyhow!("No blocks in response")));
}
} else {
tracing::debug!("bitswap: unexpected inbound substream (no waiting response)");
tracing::debug!("bitswap: unexpected inbound from {peer}, discarding");
}
}
}
Expand All @@ -197,7 +214,7 @@ impl UserProtocol for BitswapProtocol {
if req.peer == peer {
match service.open_substream(peer) {
Ok(substream_id) => {
pending_outbound.insert(substream_id, (req.cid_bytes, req.response_tx));
pending_outbound.insert(substream_id, (req.cid_bytes_list, req.response_tx));
}
Err(e) => {
let _ = req.response_tx.send(Err(anyhow!("Failed to open substream: {e:?}")));
Expand All @@ -212,13 +229,16 @@ impl UserProtocol for BitswapProtocol {
Some(TransportEvent::ConnectionClosed { peer }) => {
tracing::warn!("bitswap: connection closed to {peer}");
connected_peers.remove(&peer);
if let Some((waiting_peer, response_tx)) = waiting_response.take() {
if waiting_peer == peer {
let _ = response_tx.send(Err(anyhow!("Connection closed")));
// Fail all pending responses for this peer.
let mut remaining = std::collections::VecDeque::new();
for (p, n, tx) in waiting_responses.drain(..) {
if p == peer {
let _ = tx.send(Err(anyhow!("Connection closed")));
} else {
waiting_response = Some((waiting_peer, response_tx));
remaining.push_back((p, n, tx));
}
}
waiting_responses = remaining;
let mut remaining = Vec::new();
for req in pending_connection.drain(..) {
if req.peer == peer {
Expand Down Expand Up @@ -306,24 +326,35 @@ impl BitswapClient {
Ok(())
}

/// Fetch a block by CID from a specific peer.
/// Fetch a single block by CID from a specific peer.
pub async fn fetch_block(
&self,
peer: PeerId,
cid: cid::Cid,
timeout_duration: Duration,
) -> Result<Vec<u8>> {
let cid_bytes = cid.to_bytes();
let mut blocks = self.fetch_blocks(peer, &[cid], timeout_duration).await?;
blocks.pop().ok_or_else(|| anyhow!("No blocks in response"))
}

/// Fetch multiple blocks by CID in a single wantlist (max 16).
pub async fn fetch_blocks(
&self,
peer: PeerId,
cids: &[cid::Cid],
timeout_duration: Duration,
) -> Result<Vec<Vec<u8>>> {
let cid_bytes_list: Vec<Vec<u8>> = cids.iter().map(|c| c.to_bytes()).collect();
let (response_tx, response_rx) = oneshot::channel();

self.cmd_tx
.send(BitswapCommand::Fetch { peer, cid_bytes, response_tx })
.send(BitswapCommand::Fetch { peer, cid_bytes_list, response_tx })
.await
.map_err(|_| anyhow!("Failed to send fetch command (protocol task closed)"))?;
.map_err(|_| anyhow!("Failed to send fetch command"))?;

tokio::time::timeout(timeout_duration, response_rx)
.await
.map_err(|_| anyhow!("Bitswap fetch timed out for CID {cid}"))?
.map_err(|_| anyhow!("Bitswap fetch timed out"))?
.map_err(|_| anyhow!("Response channel closed"))?
}

Expand Down
5 changes: 5 additions & 0 deletions stress-test/src/chain_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,11 @@ impl ChainLimits {
TheoreticalLimits { weight_cap, length_cap, count_cap, effective_cap, bottleneck }
}

/// Estimate how many store txs of `payload_size` bytes fit in one block.
pub fn estimate_block_capacity(&self, payload_size: usize) -> usize {
self.compute_theoretical_limits(payload_size).effective_cap as usize
}

/// Print a human-readable summary of the queried limits.
pub fn print_text(&self) {
println!();
Expand Down
10 changes: 10 additions & 0 deletions stress-test/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,16 @@ impl BulletinExtrinsicParamsBuilder {
self
}

pub fn mortal(mut self, period: u64) -> Self {
self.0 = self.0.mortal(period);
self
}

pub fn immortal(mut self) -> Self {
self.0 = self.0.immortal();
self
}

pub fn build(self) -> <BulletinExtrinsicParams as ExtrinsicParams<BulletinConfig>>::Params {
self.0.build()
}
Expand Down
Loading