Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions quiche/src/stream/recv_buf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,12 @@ impl RecvBuf {
}

// No need to store empty buffer that doesn't carry the fin flag.
// However, we must still advance the max received offset. A
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit:

Suggested change
// However, we must still advance the max received offset. A
//
// However, we must still advance the max received offset. A

// zero-length non-FIN STREAM frame at offset N implies the peer has
// data at that offset.
// NOTE: connection level flow control accounting also expects this.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit:

Suggested change
// NOTE: connection level flow control accounting also expects this.
//
// NOTE: connection level flow control accounting also expects this.

if !buf.fin() && buf.is_empty() {
self.len = cmp::max(self.len, buf.max_off());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this update happen unconditionally before the first return Ok()?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the only return Ok(()) before this line happens when we already have a fin_off in which case the current buffer can't change self.len.

That whole write function could probably be edited and simplified a bit in the long term

return Ok(());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should self.off also be updated? Specifically if self.drain = true, after stream_shutdown(..., Shutdown::Read, ...), Connection treats the new offset delta as consumed flow control, but off stays stale here. A later RESET_STREAM then computes consumed_flowcontrol = final_size - self.off and can count the same range again.

See following test case which fails on this branch:

#[rstest]
/// Tests that empty non-FIN STREAM frames on a draining stream are not counted
/// again when the same final size is later received in a RESET_STREAM frame.
fn flow_control_drain_empty_stream_frame_reset(
    #[values("cubic", "bbr2_gcongestion")] cc_algorithm_name: &str,
) {
    let mut buf = [0; 65536];

    let mut pipe = test_utils::Pipe::new(cc_algorithm_name).unwrap();
    assert_eq!(pipe.handshake(), Ok(()));

    assert_eq!(pipe.client.stream_send(4, b"aaaaa", false), Ok(5));
    assert_eq!(pipe.advance(), Ok(()));

    assert_eq!(pipe.server.rx_data, 5);
    assert_eq!(pipe.server.flow_control.consumed(), 0);

    assert_eq!(pipe.server.stream_shutdown(4, Shutdown::Read, 42), Ok(()));

    assert_eq!(pipe.server.rx_data, 5);
    assert_eq!(pipe.server.flow_control.consumed(), 5);

    let frames = [frame::Frame::Stream {
        stream_id: 4,
        data: RangeBuf::from(&[], 10, false),
    }];

    let written =
        test_utils::encode_pkt(&mut pipe.client, Type::Short, &frames, &mut buf)
            .unwrap();
    assert_eq!(pipe.server_recv(&mut buf[..written]), Ok(written));

    assert_eq!(pipe.server.rx_data, 10);
    assert_eq!(pipe.server.flow_control.consumed(), 10);

    let frames = [frame::Frame::ResetStream {
        stream_id: 4,
        error_code: 42,
        final_size: 10,
    }];

    let written =
        test_utils::encode_pkt(&mut pipe.client, Type::Short, &frames, &mut buf)
            .unwrap();
    assert_eq!(pipe.server_recv(&mut buf[..written]), Ok(written));

    assert_eq!(pipe.server.rx_data, 10);
    assert_eq!(pipe.server.flow_control.consumed(), 10);
}

So I think we need something like in https://github.com/cloudflare/quiche/blob/master/quiche/src/stream/recv_buf.rs#L194:

Suggested change
return Ok(());
if self.drain {
self.off = self.len;
}
return Ok(());

}

Expand Down Expand Up @@ -510,16 +515,31 @@ mod tests {

assert_emit_discard(&mut recv, emit, 32, 5, false, None);

// Don't store non-fin empty buffer.
// Empty non-FIN frame advances the high-water mark but stores no data.
let buf = RangeBuf::from(b"", 10, false);
assert!(recv.write(buf).is_ok());
assert_eq!(recv.len, 5);
assert_eq!(recv.len, 10);
assert_eq!(recv.off, 5);
assert_eq!(recv.data.len(), 0);

// Check flow control for empty buffer.
// Check flow control for empty non-FIN buffer past the limit.
let buf = RangeBuf::from(b"", 16, false);
assert_eq!(recv.write(buf), Err(Error::FlowControl));
}

#[rstest]
fn empty_fin_stream_frame(#[values(true, false)] emit: bool) {
let mut recv =
RecvBuf::new(15, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
assert_eq!(recv.len, 0);

let buf = RangeBuf::from(b"hello", 0, false);
assert!(recv.write(buf).is_ok());
assert_eq!(recv.len, 5);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);

assert_emit_discard(&mut recv, emit, 32, 5, false, None);

// Store fin empty buffer.
let buf = RangeBuf::from(b"", 5, true);
Expand Down
76 changes: 76 additions & 0 deletions quiche/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1674,6 +1674,82 @@ fn flow_control_drain(
assert_eq!(pipe.server.flow_control.consumed(), 25);
}

#[rstest]
/// Tests that empty non-FIN STREAM frames advance connection-level flow
/// control *exactly once*, even when subsequent frames cover the same offset
/// range.
fn flow_control_empty_stream_frame(
#[values("cubic", "bbr2_gcongestion")] cc_algorithm_name: &str,
) {
let mut config = test_utils::Pipe::default_config(cc_algorithm_name).unwrap();
// Large limits so flow control on individual streams / connection don't
// interfere with the specific accounting we want to observe.
config.set_initial_max_data(1000);
config.set_initial_max_stream_data_bidi_local(1000);
config.set_initial_max_stream_data_bidi_remote(1000);
config.set_initial_max_streams_bidi(3);
config.set_initial_max_streams_uni(3);
config.verify_peer(false);

let mut pipe = test_utils::Pipe::with_config(&mut config).unwrap();
assert_eq!(pipe.handshake(), Ok(()));

let send_frame_helper =
|pipe: &mut test_utils::Pipe, data: RangeBuf| -> Result<()> {
let mut buf = [0; 65536];
let frames = [frame::Frame::Stream { stream_id: 4, data }];
let written = test_utils::encode_pkt(
&mut pipe.client,
Type::Short,
&frames,
&mut buf,
)?;
assert_eq!(pipe.server_recv(&mut buf[..written]), Ok(written));
Ok(())
};

// 1. First real data: off=0, len=10.
send_frame_helper(&mut pipe, RangeBuf::from(&[1; 10], 0, false)).unwrap();
assert_eq!(pipe.server.rx_data, 10);

// 2. Empty non-FIN frame at off=15. High-water mark advances to 15 even
// though no bytes are stored.
send_frame_helper(&mut pipe, RangeBuf::from(&[0; 0], 15, false)).unwrap();
assert_eq!(pipe.server.rx_data, 15);

// 3. Real data at off=15, len=5 with FIN. The [15..20] range is new, so
// rx_data advances to 20. The [15..15] range already counted, so it must
// NOT be double-counted.
send_frame_helper(&mut pipe, RangeBuf::from(&[2; 5], 15, true)).unwrap();
assert_eq!(pipe.server.rx_data, 20);

// 4. Fill the gap: off=10, len=5. All bytes in [10..15] were already counted
// (via the empty frame at off=15), so rx_data stays at 20.
send_frame_helper(&mut pipe, RangeBuf::from(&[3; 5], 10, false)).unwrap();
assert_eq!(pipe.server.rx_data, 20);

// The stream is readable.
let mut r = pipe.server.readable();
assert_eq!(r.next(), Some(4));
assert_eq!(r.next(), None);

// Read all 20 bytes and verify content and fin flag.
// Expected content (in order):
// off 0..10 -> [1; 10] (frame 1)
// off 10..15 -> [3; 5] (frame 4, gap fill)
// off 15..20 -> [2; 5] (frame 3)
let mut out = [0u8; 64];
let (n, fin) = pipe.server.stream_recv(4, &mut out).unwrap();
assert_eq!(n, 20);
assert!(fin);
assert_eq!(&out[..10], &[1u8; 10]);
assert_eq!(&out[10..15], &[3u8; 5]);
assert_eq!(&out[15..20], &[2u8; 5]);

// stream_finished() reflects the recv-side is_fin() state.
assert!(pipe.server.stream_finished(4));
}

#[rstest]
/// Tests that flow control is properly updated when a stream receives a RESET
fn flow_control_reset_stream(
Expand Down
Loading