Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions quiche/src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,12 @@ use qlog::events::quic::QuicFrame;
#[cfg(feature = "qlog")]
use qlog::events::quic::StreamType;

pub const MAX_CRYPTO_OVERHEAD: usize = 8;
/// Maximum CRYPTO frame overhead:
/// 1-byte type + varint-encoded offset (bounded by `MAX_CRYPTO_STREAM_OFFSET`)
/// + 2-byte length (we always encode as 2 bytes)
pub const MAX_CRYPTO_OVERHEAD: usize =
1 + octets::varint_len(crate::MAX_CRYPTO_STREAM_OFFSET) + 2;
Comment thread
gregor-cf marked this conversation as resolved.
pub const MAX_DGRAM_OVERHEAD: usize = 2;
pub const MAX_STREAM_OVERHEAD: usize = 12;
pub const MAX_STREAM_SIZE: u64 = 1 << 62;

#[derive(Clone, Debug, PartialEq, Eq)]
Expand Down
40 changes: 25 additions & 15 deletions quiche/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ const MAX_PROBING_TIMEOUTS: usize = 3;
const DEFAULT_INITIAL_CONGESTION_WINDOW_PACKETS: usize = 10;

// The maximum data offset that can be stored in a crypto stream.
const MAX_CRYPTO_STREAM_OFFSET: u64 = 1 << 16;
pub(crate) const MAX_CRYPTO_STREAM_OFFSET: u64 = 1 << 16;

// The send capacity factor.
const TX_CAP_FACTOR: f64 = 1.0;
Expand Down Expand Up @@ -5199,7 +5199,6 @@ impl<F: BufFactory> Connection<F> {

// Create a single STREAM frame for the first stream that is flushable.
if (pkt_type == Type::Short || pkt_type == Type::ZeroRTT) &&
left > frame::MAX_STREAM_OVERHEAD &&
!is_closing &&
path.active() &&
!dgram_emitted
Expand Down Expand Up @@ -5239,13 +5238,16 @@ impl<F: BufFactory> Connection<F> {
octets::varint_len(stream_off) + // offset
2; // length, always encode as 2-byte varint

let max_len = match left.checked_sub(hdr_len) {
Some(v) => v,
// Require at least 1 byte of payload beyond the header.
// If even the minimum payload (hdr_len + 1 byte) doesn't
// fit, stop trying. We could continue to the next stream
// (a lower stream_id or offset might encode in fewer varint
// bytes), but we'd need a different iterator, since
// peek_flushable() returns the same stream on every iteration.
let max_len = match left.checked_sub(hdr_len + 1) {
Some(v) => v + 1,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically the +1 is only needed in cases where the stream has at least 1 byte to send. The case of stream FIN with no data could be transmitted with v == 0.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 I think we need a little more nuance here

None => {
let priority_key = Arc::clone(&stream.priority_key);
self.streams.remove_flushable(&priority_key);

continue;
break;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An implication of break instead of continue is that other streams with lower stream id or offset could have written a tiny frame but now they won't.

I need to look into the implications of the remove_flushable

The check for MAX_STREAM_OVERHEAD in the original code made this branch unlikely to be reached and we could run into problems when we do hit it.

I think with the lower MIN this branch will be easier to test correct. This is a very good thing.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI, remove_flushable() means we don't try to emit from this stream again until it's re-inserted into flushable. That only happens when the app writes more data to the stream. If the app already wrote a FIN, the stream will be stranded forever.

I was actually wondering if should remove the MIN_STREAM_OVERHEAD guard altogether and just rely on this check here. As soon as a stream has offset>64 we're past the guard anyways, so I don't think it really does much and just adds clutter / complexity.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have no objections to removal of the MIN_STREAM_OVERHEAD guard.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to continue instead of break, we'd just need to add a different iterator for flushable. I think many of the other streams.<adjective> collection already to.
But with the old MAX_STREAM_OVERHEAD guard, we also skipped many streams that would have fit.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine with the break.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am confused by the remove_flushable call. Wouldn't this cause a stream hang?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see, you removed the call to remove_flushable. Did we just discover another bug here? :)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this change in behaviour needs some more consideration, the break risks a high-priority stream blocking forward progress on others and introducing tail latency. Worst case, a certain combo could completely stall out all other streams.

I think I'd be more comfortable with having a min guard (to prevent looping through the whole set when we really tand no hope of sending anything) and trying to walk the entire flushable queue. We could avoid needing a new fluhable iterator type if we store the removed flushable values and reinsert them after we're done sending. But if its easy enough to add a new iterator that would be ok too.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is still a net improvement. The vast majority of cases / streams would have never hit this continue at all (only streams with hdr_len > 12 would have, so essentially only a stream_off of 1GB+ would have). The MAX_STREAM_OVERHEAD guard would have skipped even attempting to fit a frame. Now we at least try to see how much space the header would actually need. In many cases (hdr_len < 12), we can actually send a frame now where we previously couldn't. The break here we mimics the behavior of the old MAX_STREAM_OVERHEAD: if we can't fit the frame, we simply skip the attempt of sending a STREAM until cwnd_available increases again.

The impact on lower priority streams should be minimal, I think. In the best case, another stream could have fit
a max 14bytes of payload into a frame before it also has to wait for more available cwnd.

That said, adding the iterator is pretty straightforward and it's certainly a further improvement.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes the new approach does benefit from supporting smaller stream frames.

I think the old bug had an upside. If you have a huge STREAM frame to send and left was between 12 and 20 bytes so you couldn't actually send, the stream would be dropped and since checked_sub didn't update left, the continue an then emit one frame in the same round, and never be worried about the huge frame again 👿

Adding an iterator lets us have the best of both worlds

},
};

Expand All @@ -5256,6 +5258,14 @@ impl<F: BufFactory> Connection<F> {
let (len, fin) =
stream.send.emit(&mut stream_payload.as_mut()[..max_len])?;

// The stream was flushable and max_len ≥ 1, so emit() must
// have produced at least one byte, or set FIN. While a zero
// length non-fin STREAM frame is legal, it's silly.
debug_assert!(
len > 0 || fin,
"emit() from flushable stream produced 0 bytes without FIN"
);

// Encode the frame's header.
//
// Due to how `OctetsMut::split_at()` works, `stream_hdr` starts
Expand Down Expand Up @@ -5301,10 +5311,9 @@ impl<F: BufFactory> Connection<F> {

#[cfg(feature = "fuzzing")]
// Coalesce STREAM frames when fuzzing.
if left > frame::MAX_STREAM_OVERHEAD {
continue;
}
continue;

#[cfg(not(feature = "fuzzing"))]
break;
}
}
Expand Down Expand Up @@ -5335,14 +5344,15 @@ impl<F: BufFactory> Connection<F> {
path.recovery.ping_sent(epoch);
}

if !has_data &&
!dgram_emitted &&
Comment thread
gregor-cf marked this conversation as resolved.
cwnd_available > frame::MAX_STREAM_OVERHEAD
{
// TODO: 12 is the old MAX_STREAM_OVERHEAD value. See
// https://github.com/cloudflare/quiche/pull/2453 for more
// comprehensive app-limited changes/fixes.
if !has_data && !dgram_emitted && cwnd_available > 12 {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we discovered that a more correct number is 20. But I'm fine keeping it as it for consistency.

path.recovery.on_app_limited();
}

if frames.is_empty() {
// Used by legacy recovery.
// When we reach this point we are not able to write more, so set
// app_limited to false.
path.recovery.update_app_limited(false);
Expand Down
36 changes: 36 additions & 0 deletions quiche/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1683,6 +1683,42 @@ mod tests {
assert_eq!(&buf[..2], b"ro");
}

#[test]
/// Tests that `is_flushable()` returns false when `off_front` reaches the
/// stream-level flow control limit (`max_off`).
/// This prevents zero-length stream frames when a stream is at the stream
/// level flow control limit.
fn is_flushable_at_flow_control_limit() {
let max_tx_data = 5;
let mut stream =
<Stream>::new(0, 0, max_tx_data, true, 0, DEFAULT_STREAM_WINDOW);

// Nothing written yet: not flushable.
assert!(!stream.is_flushable());

// Write up to the flow control limit.
assert_eq!(stream.send.write(b"hello", false), Ok(5));
assert!(stream.is_flushable());

// Emit all 5 bytes.
let mut buf = [0u8; 5];
assert_eq!(stream.send.emit(&mut buf), Ok((5, false)));
assert_eq!(stream.send.off_front(), 5);

// off_front (5) == max_off (5): NOT flushable.
assert!(!stream.is_flushable());

// Raise the flow control limit to allow 5 more bytes.
stream.send.update_max_data(10);

// But the send buffer is empty, so still not flushable.
assert!(!stream.is_flushable());

// Write more data; now flushable.
assert_eq!(stream.send.write(b"world", false), Ok(5));
assert!(stream.is_flushable());
}

#[test]
fn rangebuf_split_off() {
let mut buf = <RangeBuf>::from(b"helloworld", 5, true);
Expand Down
4 changes: 3 additions & 1 deletion quiche/src/stream/send_buf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,9 @@ impl<F: BufFactory> SendBuf<F> {
// report final_size
self.emit_off = cmp::max(self.emit_off, next_off);

Ok((out.len() - out_len, fin))
let written = out.len() - out_len;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does this change do?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like a functional no-op


Ok((written, fin))
}

/// Updates the max_data limit to the given value.
Expand Down
141 changes: 141 additions & 0 deletions quiche/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5442,6 +5442,147 @@ fn stream_zero_length_non_fin(
assert!(r.next().is_none());
}

/// Helper for parameterizing tests over different QUIC stream ID varint
/// sizes. Each variant yields a client-initiated bidirectional stream ID
/// whose varint encoding is 1, 2, or 4 bytes respectively.
#[derive(Clone, Copy, Debug)]
enum StreamIdSize {
/// stream_id 0: 1-byte varint.
OneByteVarint,
/// stream_id 64: 2-byte varint.
TwoByteVarint,
/// stream_id 16384: 4-byte varint.
FourByteVarint,
}

impl StreamIdSize {
fn stream_id(self) -> u64 {
match self {
StreamIdSize::OneByteVarint => 0,
StreamIdSize::TwoByteVarint => 64,
StreamIdSize::FourByteVarint => 16384,
}
}
}

#[rstest]
/// Tests that we do not emit zero-length stream frames that do not have
/// the FIN bit set, and that these streams remain flushable.
///
/// The test is parameterized over stream IDs with 1-, 2-, and 4-byte
/// varint encodings to exercise different STREAM frame header sizes.
///
/// We sweep buffer sizes from 35 to 50. A Short packet header for a
/// 16-byte DCID is approximately 35 bytes
/// (1 flags + 16 dcid + 2 pkt_num + 16 AEAD tag).
fn do_not_send_empty_stream_frames_without_fin(
#[values("cubic", "bbr2_gcongestion")] cc_algorithm_name: &str,
#[values(
StreamIdSize::OneByteVarint,
StreamIdSize::TwoByteVarint,
StreamIdSize::FourByteVarint
)]
stream_id_size: StreamIdSize,
) {
// We need to set initial_max_streams high enough to "fit" the stream_id
// Since the two low-bits of the id encode the type, we can use
// `(id >> 2) + 1`
let stream_id = stream_id_size.stream_id();
let max_streams_bidi = (stream_id >> 2) + 1;

let mut config = test_utils::Pipe::default_config(cc_algorithm_name).unwrap();
config.set_initial_max_streams_bidi(max_streams_bidi);
// Just use some large flow control values so they don't get in the way of the
// test
config.set_initial_max_data(10_000);
config.set_initial_max_stream_data_bidi_local(10_000);
config.set_initial_max_stream_data_bidi_remote(10_000);
let mut pipe = test_utils::Pipe::with_config(&mut config).unwrap();
assert_eq!(pipe.handshake(), Ok(()));
// drain any additional ACKs.
assert_eq!(pipe.advance(), Ok(()));

assert_eq!(pipe.client.stream_send(stream_id, b"data", false), Ok(4));
assert!(pipe.client.streams.has_flushable());

let active_pid = pipe
.client
.paths
.get_active_path_id()
.expect("no active path");

for buf_sz in 35..=50_usize {
let mut small_buf = vec![0u8; buf_sz];
let result = pipe.client.send_single(
&mut small_buf,
active_pid,
false,
Instant::now(),
);

match result {
Err(Error::Done) => {
// Header didn't fit: stream must remain flushable.
assert!(
pipe.client.streams.has_flushable(),
"stream removed from flushable at buf_sz={buf_sz}"
);
},

Ok((_, len)) => {
// A packet was emitted. Decrypt and inspect its frames.
let frames = test_utils::decode_pkt(
&mut pipe.server,
&mut small_buf[..len],
)
.unwrap();

let stream_frames: Vec<_> = frames
.iter()
.filter(|f| matches!(f, frame::Frame::Stream { .. }))
.collect();

if stream_frames.is_empty() {
// Some other frame; this shouldn't really happen, since
// we don't expect any frames. The Pipe::handshake() call
// above should have drained all such
// frames.
panic!(
"We got unexpected frames. We expect to see only \
stream frames. frames={:?}",
frames
);
}

// A STREAM frame was emitted. At this point there should
// have been space for exactly one byte of payload.
assert_eq!(
stream_frames.len(),
1,
"expected exactly one STREAM frame at buf_sz={buf_sz}"
);

match &stream_frames[0] {
frame::Frame::Stream { data, .. } => {
assert_eq!(
data.len(),
1,
"STREAM frame payload at buf_sz={buf_sz}"
);
},
_ => unreachable!(),
}

return;
},

Err(e) => panic!("unexpected error at buf_sz={buf_sz}: {e:?}"),
}
}

panic!("no buffer size in the range we swept produced a packet with a STREAM frame");
}

#[rstest]
/// Tests that completed streams are garbage collected.
fn collect_streams(
Expand Down
Loading