diff --git a/quiche/src/lib.rs b/quiche/src/lib.rs index 5275629ceee..324694f37ed 100644 --- a/quiche/src/lib.rs +++ b/quiche/src/lib.rs @@ -573,6 +573,7 @@ pub struct Config { grease: bool, cc_algorithm: CongestionControlAlgorithm, + enable_bbr_app_limited_fix: bool, custom_bbr_params: Option, initial_congestion_window_packets: usize, enable_relaxed_loss_threshold: bool, @@ -658,6 +659,7 @@ impl Config { application_protos: Vec::new(), grease: true, cc_algorithm: CongestionControlAlgorithm::CUBIC, + enable_bbr_app_limited_fix: false, custom_bbr_params: None, initial_congestion_window_packets: DEFAULT_INITIAL_CONGESTION_WINDOW_PACKETS, @@ -1086,6 +1088,15 @@ impl Config { self.cc_algorithm = algo; } + /// Enable a rework of how the BBR congestion control implementation + /// computes app-limited. This config parameter will be removed after we + /// build confidence on the new implementation or decide to roll it back. + /// + /// Defaults to `false`. + pub fn set_enable_bbr_app_limited_fix(&mut self, value: bool) { + self.enable_bbr_app_limited_fix = value; + } + /// Sets custom BBR settings. /// /// This API is experimental and will be removed in the future. @@ -2509,6 +2520,32 @@ impl Connection { Ok(()) } + /// Enable a rework of how the BBR congestion control implementation + /// computes app-limited. This config parameter will be removed after we + /// build confidence on the new implementation or decide to roll it back. + /// + /// Currently this only applies if cc_algorithm is + /// `CongestionControlAlgorithm::Bbr2Gcongestion`. + /// + /// This function can only be called inside one of BoringSSL's handshake + /// callbacks, before any packet has been sent. Calling this function any + /// other time will have no effect. + /// + /// See [`Config::set_enable_bbr_app_limited_fix()`]. 
+ /// + /// [`Config::set_enable_bbr_app_limited_fix()`]: struct.Config.html#method.set_enable_bbr_app_limited_fix + #[cfg(feature = "boringssl-boring-crate")] + #[cfg_attr(docsrs, doc(cfg(feature = "boringssl-boring-crate")))] + pub fn set_enable_bbr_app_limited_fix_in_handshake( + ssl: &mut boring::ssl::SslRef, value: bool, + ) -> Result<()> { + let ex_data = tls::ExData::from_ssl_ref(ssl).ok_or(Error::TlsFail)?; + + ex_data.recovery_config.enable_bbr_app_limited_fix = value; + + Ok(()) + } + /// Sets the congestion control algorithm used by string. /// /// This function can only be called inside one of BoringSSL's handshake @@ -2810,6 +2847,12 @@ impl Connection { Ok(()) } + /// Returns true if at least 1 stream has headers or body data to + /// write, or there are items in the DATAGRAM send queue. + fn has_flushable_data(&self) -> bool { + self.streams.has_flushable() || !self.dgram_send_queue.is_empty() + } + /// Processes QUIC packets received from the peer. /// /// On success the number of bytes processed from the input buffer is @@ -2859,11 +2902,18 @@ impl Connection { return Err(Error::BufferTooShort); } + let now = Instant::now(); + let recv_pid = self.paths.path_id_from_addrs(&(info.to, info.from)); if let Some(recv_pid) = recv_pid { let recv_path = self.paths.get_mut(recv_pid)?; + // Run the app limited check if the previous send loop ran out of data + // to send. Limit the check to the path where the packet + // was received. + recv_path.recovery.bbr_check_if_app_limited(&now); + // Keep track of how many bytes we received from the client, so we // can limit bytes sent back before address validation, to a // multiple of this. The limit needs to be increased early on, so @@ -2896,6 +2946,7 @@ impl Connection { // Process coalesced packets. 
while left > 0 { let read = match self.recv_single( + now, &mut buf[len - left..len], &info, recv_pid, @@ -2993,10 +3044,9 @@ impl Connection { /// /// [`Done`]: enum.Error.html#variant.Done fn recv_single( - &mut self, buf: &mut [u8], info: &RecvInfo, recv_pid: Option, + &mut self, now: Instant, buf: &mut [u8], info: &RecvInfo, + recv_pid: Option, ) -> Result { - let now = Instant::now(); - if buf.is_empty() { return Err(Error::Done); } @@ -4035,6 +4085,11 @@ impl Connection { let send_path = self.paths.get_mut(send_pid)?; + // Run the app limited check if the previous send loop ran out of data to + // send. Limit the check to the path where the connection is + // trying to send. + send_path.recovery.bbr_check_if_app_limited(&now); + // Update max datagram size to allow path MTU discovery probe to be sent. if let Some(pmtud) = send_path.pmtud.as_mut() { if pmtud.should_probe() { @@ -4068,7 +4123,15 @@ impl Connection { ) { Ok(v) => v, - Err(Error::BufferTooShort) | Err(Error::Done) => break, + Err(Error::BufferTooShort) => break, + + Err(Error::Done) => { + let has_flushable_data = self.has_flushable_data(); + let send_path = self.paths.get_mut(send_pid)?; + send_path.recovery.send_stopped_early(has_flushable_data); + + break; + }, Err(e) => return Err(e), }; @@ -7031,6 +7094,11 @@ impl Connection { pub fn on_timeout(&mut self) { let now = Instant::now(); + // Check for app limited on all paths when handling a timeout. 
+ for (_, path) in self.paths.iter_mut() { + path.recovery.bbr_check_if_app_limited(&now); + } + if let Some(draining_timer) = self.draining_timer { if draining_timer <= now { trace!("{} draining timeout expired", self.trace_id); diff --git a/quiche/src/recovery/congestion/recovery.rs b/quiche/src/recovery/congestion/recovery.rs index bd519e4ce68..66b976ae819 100644 --- a/quiche/src/recovery/congestion/recovery.rs +++ b/quiche/src/recovery/congestion/recovery.rs @@ -956,7 +956,6 @@ impl RecoveryOps for LegacyRecovery { self.detect_lost_packets(epoch, now, "") } - // FIXME only used by gcongestion fn on_app_limited(&mut self) { // Not implemented for legacy recovery, update_app_limited and // delivery_rate_update_app_limited used instead. @@ -1038,6 +1037,16 @@ impl RecoveryOps for LegacyRecovery { false } + fn send_stopped_early(&mut self, _has_flushable_data: bool) { + // Not implemented -- only used by the BBR implementation + // which lives in the gcongestion directory. + } + + fn bbr_check_if_app_limited(&mut self, _now: &Instant) { + // Not implemented -- only used by the BBR implementation + // which lives in the gcongestion directory. 
+ } + fn lost_count(&self) -> usize { self.congestion.lost_count } diff --git a/quiche/src/recovery/gcongestion/bbr/bandwidth_sampler.rs b/quiche/src/recovery/gcongestion/bbr/bandwidth_sampler.rs index a9cb810af58..360a1b2dd76 100644 --- a/quiche/src/recovery/gcongestion/bbr/bandwidth_sampler.rs +++ b/quiche/src/recovery/gcongestion/bbr/bandwidth_sampler.rs @@ -517,7 +517,7 @@ impl BandwidthSampler { } } - #[allow(dead_code)] + #[cfg(test)] pub(crate) fn is_app_limited(&self) -> bool { self.is_app_limited } diff --git a/quiche/src/recovery/gcongestion/bbr2.rs b/quiche/src/recovery/gcongestion/bbr2.rs index c9a46f02af5..202cbe70612 100644 --- a/quiche/src/recovery/gcongestion/bbr2.rs +++ b/quiche/src/recovery/gcongestion/bbr2.rs @@ -749,8 +749,9 @@ impl CongestionControl for BBRv2 { self.last_quiescence_start.is_none() } - fn is_cwnd_limited(&self, bytes_in_flight: usize) -> bool { - bytes_in_flight >= self.get_congestion_window() + #[cfg(test)] + fn is_app_limited(&self) -> bool { + self.mode.network_model().is_app_limited() } fn pacing_rate( diff --git a/quiche/src/recovery/gcongestion/bbr2/network_model.rs b/quiche/src/recovery/gcongestion/bbr2/network_model.rs index 2cb14bb6395..5ad20d550ab 100644 --- a/quiche/src/recovery/gcongestion/bbr2/network_model.rs +++ b/quiche/src/recovery/gcongestion/bbr2/network_model.rs @@ -774,6 +774,11 @@ impl BBRv2NetworkModel { self.bandwidth_sampler.on_app_limited() } + #[cfg(test)] + pub(super) fn is_app_limited(&self) -> bool { + self.bandwidth_sampler.is_app_limited() + } + pub(super) fn loss_events_in_round(&self) -> usize { self.loss_events_in_round } diff --git a/quiche/src/recovery/gcongestion/mod.rs b/quiche/src/recovery/gcongestion/mod.rs index 05ceca85b33..0dfc235b4a8 100644 --- a/quiche/src/recovery/gcongestion/mod.rs +++ b/quiche/src/recovery/gcongestion/mod.rs @@ -110,8 +110,8 @@ pub(super) trait CongestionControl: Debug { fn is_in_recovery(&self) -> bool; - #[allow(dead_code)] - fn is_cwnd_limited(&self, 
bytes_in_flight: usize) -> bool; + #[cfg(test)] + fn is_app_limited(&self) -> bool; fn pacing_rate( &self, bytes_in_flight: usize, rtt_stats: &RttStats, diff --git a/quiche/src/recovery/gcongestion/pacer.rs b/quiche/src/recovery/gcongestion/pacer.rs index 7a199a124cb..62de094cbf7 100644 --- a/quiche/src/recovery/gcongestion/pacer.rs +++ b/quiche/src/recovery/gcongestion/pacer.rs @@ -283,12 +283,7 @@ impl Pacer { } #[cfg(test)] - pub fn is_app_limited(&self, bytes_in_flight: usize) -> bool { - !self.is_cwnd_limited(bytes_in_flight) - } - - #[cfg(test)] - fn is_cwnd_limited(&self, bytes_in_flight: usize) -> bool { - !self.pacing_limited && self.sender.is_cwnd_limited(bytes_in_flight) + pub fn is_app_limited(&self) -> bool { + self.sender.is_app_limited() } } diff --git a/quiche/src/recovery/gcongestion/recovery.rs b/quiche/src/recovery/gcongestion/recovery.rs index b30a13286c5..d6f04cc2f26 100644 --- a/quiche/src/recovery/gcongestion/recovery.rs +++ b/quiche/src/recovery/gcongestion/recovery.rs @@ -31,6 +31,7 @@ use crate::recovery::RecoveryConfig; use crate::recovery::RecoveryOps; use crate::recovery::RecoveryStats; use crate::recovery::ReleaseDecision; +use crate::recovery::ReleaseTime; use crate::recovery::Sent; use crate::recovery::StartupExit; use crate::recovery::GRANULARITY; @@ -466,6 +467,10 @@ pub struct GRecovery { lost_reuse: Vec, pacer: Pacer, + + enable_bbr_app_limited_fix: bool, + + check_for_app_limited_next_iteration: bool, } impl GRecovery { @@ -530,6 +535,9 @@ impl GRecovery { newly_acked: Vec::new(), lost_reuse: Vec::new(), + enable_bbr_app_limited_fix: recovery_config + .enable_bbr_app_limited_fix, + check_for_app_limited_next_iteration: false, }) } @@ -1045,7 +1053,9 @@ impl RecoveryOps for GRecovery { // FIXME only used by gcongestion fn on_app_limited(&mut self) { - self.pacer.on_app_limited(self.bytes_in_flight.get()) + if !self.enable_bbr_app_limited_fix { + self.pacer.on_app_limited(self.bytes_in_flight.get()) + } } #[cfg(test)] @@ -1109,7 
+1119,7 @@ impl RecoveryOps for GRecovery { #[cfg(test)] fn app_limited(&self) -> bool { - self.pacer.is_app_limited(self.bytes_in_flight.get()) + self.pacer.is_app_limited() } // FIXME only used by congestion @@ -1134,6 +1144,58 @@ impl RecoveryOps for GRecovery { true } + fn bbr_check_if_app_limited(&mut self, now: &Instant) { + if !self.enable_bbr_app_limited_fix { + return; + } + + if !self.check_for_app_limited_next_iteration { + return; + } + + // check_for_app_limited_next_iteration is only set after + // send_single returns Err::Done which implies that either + // cwnd is fully exhausted or there is no data in output + // buffers. Retransmissions are done with the help of stream + // send buffers so the `NoUnsentData()` check also covers the + // `C.lost_out <= C.retrans_out` check. + // + self.check_for_app_limited_next_iteration = false; + + // Implements CheckIfApplicationLimited from the BBR RFC. + // + // The cwnd vs inflight check needs to take frame overheads + // into account since due to these overheads it may not be + // possible for the connection to use every last byte of the + // available window. The check could be relaxed further by + // asserting that cwnd - inflight is less than 1 packet's + // worth of bytes. + // + // The check for C.pending_transmissions == 0 is done by + // checking !pacer_has_pending_transmissions by comparing the + // pacer's next release time vs 'now'. + // + // The case of Immediate next_release_time results in the work_loop + // iterating until min(cwnd, tx buffer) is exhausted. If we decide to + // exit the send loop earlier we should mark the connection in some way so + // the next call to bbr_check_if_app_limited does not mark the + // connection app-limited after yielding when C.pending_transmissions > 0. 
+ if self.cwnd() > self.bytes_in_flight.get() + frame::MAX_STREAM_OVERHEAD && + !pacer_has_pending_transmissions( + &self.pacer.get_next_release_time(), + now, + ) + { + self.pacer.on_app_limited(self.bytes_in_flight.get()) + } + } + + fn send_stopped_early(&mut self, has_flushable_data: bool) { + if !has_flushable_data { + self.check_for_app_limited_next_iteration = true; + } + } + #[cfg(feature = "qlog")] fn state_str(&self, _now: Instant) -> &'static str { self.pacer.state_str() @@ -1207,11 +1269,53 @@ impl std::fmt::Debug for GRecovery { } } +fn pacer_has_pending_transmissions( + next_release_time: &ReleaseDecision, now: &Instant, +) -> bool { + match next_release_time.time { + ReleaseTime::Immediate => false, + ReleaseTime::At(time) => time.gt(now), + } +} + #[cfg(test)] mod tests { use super::*; use crate::Config; + #[test] + fn test_pacer_has_pending_transmissions() { + let now = &Instant::now(); + assert!(!pacer_has_pending_transmissions( + &ReleaseDecision { + time: ReleaseTime::Immediate, + allow_burst: true + }, + now + )); + assert!(!pacer_has_pending_transmissions( + &ReleaseDecision { + time: ReleaseTime::At(*now), + allow_burst: true + }, + now + )); + assert!(pacer_has_pending_transmissions( + &ReleaseDecision { + time: ReleaseTime::At(*now + Duration::from_millis(1)), + allow_burst: true + }, + now + )); + assert!(!pacer_has_pending_transmissions( + &ReleaseDecision { + time: ReleaseTime::At(*now - Duration::from_millis(1)), + allow_burst: true + }, + now + )); + } + #[test] fn loss_threshold() { let config = Config::new(crate::PROTOCOL_VERSION).unwrap(); diff --git a/quiche/src/recovery/mod.rs b/quiche/src/recovery/mod.rs index 920f46cbe85..20c3c7a1ee1 100644 --- a/quiche/src/recovery/mod.rs +++ b/quiche/src/recovery/mod.rs @@ -132,6 +132,7 @@ pub struct RecoveryConfig { pub max_send_udp_payload_size: usize, pub max_ack_delay: Duration, pub cc_algorithm: CongestionControlAlgorithm, + pub enable_bbr_app_limited_fix: bool, pub custom_bbr_params: 
Option, pub hystart: bool, pub pacing: bool, @@ -148,6 +149,7 @@ impl RecoveryConfig { max_send_udp_payload_size: config.max_send_udp_payload_size, max_ack_delay: Duration::ZERO, cc_algorithm: config.cc_algorithm, + enable_bbr_app_limited_fix: config.enable_bbr_app_limited_fix, custom_bbr_params: config.custom_bbr_params, hystart: config.hystart, pacing: config.pacing, @@ -319,6 +321,17 @@ pub trait RecoveryOps { fn get_next_release_time(&self) -> ReleaseDecision; fn gcongestion_enabled(&self) -> bool; + + // Called if the send loop stopped early because send_single + // returned Err::Done; this indicates that it is time to yield + // because either there is no cwnd remaining or there is no data + // left to send. + fn send_stopped_early(&mut self, has_flushable_data: bool); + + // Allow the BBR implementation to compute if the congestion + // controller is app-limited, before processing timeouts, ACKs or + // generating packets to send. + fn bbr_check_if_app_limited(&mut self, now: &Instant); } impl Recovery { diff --git a/quiche/src/tests.rs b/quiche/src/tests.rs index 7eec0653996..144dbb4b6ee 100644 --- a/quiche/src/tests.rs +++ b/quiche/src/tests.rs @@ -6236,9 +6236,11 @@ fn stream_data_blocked_unblocked_flow_control( fn app_limited_true( #[values("cubic", "bbr2_gcongestion")] cc_algorithm_name: &str, #[values(true, false)] discard: bool, + #[values(false, true)] enable_bbr_app_limited_fix: bool, ) { let mut config = Config::new(PROTOCOL_VERSION).unwrap(); assert_eq!(config.set_cc_algorithm_name(cc_algorithm_name), Ok(())); + config.set_enable_bbr_app_limited_fix(enable_bbr_app_limited_fix); config .set_application_protos(&[b"proto1", b"proto2"]) .unwrap(); @@ -6278,9 +6280,11 @@ fn app_limited_true( fn app_limited_false( #[values("cubic", "bbr2_gcongestion")] cc_algorithm_name: &str, #[values(true, false)] discard: bool, + #[values(false, true)] enable_bbr_app_limited_fix: bool, ) { let mut config = Config::new(PROTOCOL_VERSION).unwrap(); 
assert_eq!(config.set_cc_algorithm_name(cc_algorithm_name), Ok(())); + config.set_enable_bbr_app_limited_fix(enable_bbr_app_limited_fix); config .set_application_protos(&[b"proto1", b"proto2"]) .unwrap(); @@ -7072,9 +7076,11 @@ fn prevent_optimistic_ack( fn app_limited_false_no_frame( #[values("cubic", "bbr2_gcongestion")] cc_algorithm_name: &str, #[values(true, false)] discard: bool, + #[values(false, true)] enable_bbr_app_limited_fix: bool, ) { let mut config = Config::new(PROTOCOL_VERSION).unwrap(); assert_eq!(config.set_cc_algorithm_name(cc_algorithm_name), Ok(())); + config.set_enable_bbr_app_limited_fix(enable_bbr_app_limited_fix); config .set_application_protos(&[b"proto1", b"proto2"]) .unwrap(); @@ -7116,9 +7122,11 @@ fn app_limited_false_no_frame( fn app_limited_false_no_header( #[values("cubic", "bbr2_gcongestion")] cc_algorithm_name: &str, #[values(true, false)] discard: bool, + #[values(false, true)] enable_bbr_app_limited_fix: bool, ) { let mut config = Config::new(PROTOCOL_VERSION).unwrap(); assert_eq!(config.set_cc_algorithm_name(cc_algorithm_name), Ok(())); + config.set_enable_bbr_app_limited_fix(enable_bbr_app_limited_fix); config .set_application_protos(&[b"proto1", b"proto2"]) .unwrap(); @@ -7160,9 +7168,11 @@ fn app_limited_false_no_header( fn app_limited_not_changed_on_no_new_frames( #[values("cubic", "bbr2_gcongestion")] cc_algorithm_name: &str, #[values(true, false)] discard: bool, + #[values(false, true)] enable_bbr_app_limited_fix: bool, ) { let mut config = Config::new(PROTOCOL_VERSION).unwrap(); assert_eq!(config.set_cc_algorithm_name(cc_algorithm_name), Ok(())); + config.set_enable_bbr_app_limited_fix(enable_bbr_app_limited_fix); config .set_application_protos(&[b"proto1", b"proto2"]) .unwrap(); @@ -7185,25 +7195,51 @@ fn app_limited_not_changed_on_no_new_frames( // Client's app_limited is true because its bytes-in-flight // is much smaller than the current cwnd. 
- assert!(pipe - .client - .paths - .get_active() - .expect("no active") - .recovery - .app_limited()); + if cc_algorithm_name == "cubic" || enable_bbr_app_limited_fix { + assert!(pipe + .client + .paths + .get_active() + .expect("no active") + .recovery + .app_limited()); + } else { + // For BBR, the test-only app_limited() function used to be computed by + // looking at bytes in flight instead of looking at the app-limited state + // tracked by the bandwidth_sampler. The state tracked in the + // bandwidth_sampler is incorrect. + assert!(!pipe + .client + .paths + .get_active() + .expect("no active") + .recovery + .app_limited()); + } // Client has no new frames to send - returns Done. assert_eq!(test_utils::emit_flight(&mut pipe.client), Err(Error::Done)); // Client's app_limited should remain the same. - assert!(pipe - .client - .paths - .get_active() - .expect("no active") - .recovery - .app_limited()); + if cc_algorithm_name == "cubic" || enable_bbr_app_limited_fix { + assert!(pipe + .client + .paths + .get_active() + .expect("no active") + .recovery + .app_limited()); + } else { + // See comments further up about BBR. The app-limited computed by the + // bandwidth_sampler is incorrect in this case. 
+ assert!(!pipe + .client + .paths + .get_active() + .expect("no active") + .recovery + .app_limited()); + } } #[rstest] @@ -8142,12 +8178,14 @@ fn dgram_send_fails_invalidstate( #[rstest] fn dgram_send_app_limited( #[values("cubic", "bbr2_gcongestion")] cc_algorithm_name: &str, + #[values(false, true)] enable_bbr_app_limited_fix: bool, ) { let mut buf = [0; 65535]; let send_buf = [0xcf; 1000]; let mut config = Config::new(PROTOCOL_VERSION).unwrap(); assert_eq!(config.set_cc_algorithm_name(cc_algorithm_name), Ok(())); + config.set_enable_bbr_app_limited_fix(enable_bbr_app_limited_fix); config .load_cert_chain_from_pem_file("examples/cert.crt") .unwrap(); @@ -8224,7 +8262,7 @@ fn dgram_send_app_limited( .expect("no active") .recovery .app_limited(), - should_be_app_limited + should_be_app_limited || enable_bbr_app_limited_fix ); } diff --git a/tokio-quiche/Cargo.toml b/tokio-quiche/Cargo.toml index eedc234488d..26ee01daee1 100644 --- a/tokio-quiche/Cargo.toml +++ b/tokio-quiche/Cargo.toml @@ -86,7 +86,9 @@ h3i = { workspace = true } http = "1" http-body = "1" http-body-util = "0.1" +parking_lot = { workspace = true } regex = { workspace = true } +rstest = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true, features = ["time", "test-util", "rt-multi-thread"] } diff --git a/tokio-quiche/src/quic/io/worker.rs b/tokio-quiche/src/quic/io/worker.rs index d5841d6e362..5440a159598 100644 --- a/tokio-quiche/src/quic/io/worker.rs +++ b/tokio-quiche/src/quic/io/worker.rs @@ -225,6 +225,16 @@ where loop { let now = Instant::now(); + if let Some(deadline) = current_deadline { + if deadline <= now { + qconn.on_timeout(); + + self.write_state.next_release_time = None; + current_deadline = None; + sleep.as_mut().reset((now + DEFAULT_SLEEP).into()); + } + } + self.write_state.has_pending_data = true; while self.write_state.has_pending_data { @@ -313,19 +323,8 @@ where select! 
{ biased; - () = &mut sleep => { - // It's very important that we keep the timeout arm at the top of this loop so - // that we poll it every time we need to. Since this is a biased `select!`, if - // we put this behind another arm, we could theoretically starve the sleep arm - // and hang connections. - // - // See https://docs.rs/tokio/latest/tokio/macro.select.html#fairness for more - qconn.on_timeout(); - - self.write_state.next_release_time = None; - current_deadline = None; - sleep.as_mut().reset((now + DEFAULT_SLEEP).into()); - } + // The sleep branch will be handled by the current_deadline and on_timeout check on the next iteration of the loop. + () = &mut sleep => (), Some(pkt) = incoming_recv.recv() => ctx.in_pkt = Some(pkt), directive = self.wait_for_data_or_handshake(qconn, application) => { match directive? { diff --git a/tokio-quiche/src/settings/config.rs b/tokio-quiche/src/settings/config.rs index e163bb0446b..94b0fbb357c 100644 --- a/tokio-quiche/src/settings/config.rs +++ b/tokio-quiche/src/settings/config.rs @@ -213,6 +213,9 @@ fn make_quiche_config( if params.settings.enable_early_data { config.enable_early_data(); } + config.set_enable_bbr_app_limited_fix( + params.settings.enable_bbr_app_limited_fix, + ); if should_log_keys { config.log_keys(); diff --git a/tokio-quiche/src/settings/quic.rs b/tokio-quiche/src/settings/quic.rs index 5f0fcb4f941..1a2c5c78c88 100644 --- a/tokio-quiche/src/settings/quic.rs +++ b/tokio-quiche/src/settings/quic.rs @@ -340,6 +340,14 @@ pub struct QuicSettings { /// /// [`enable_track_unknown_transport_parameters()`]: https://docs.rs/quiche/latest/quiche/struct.Config.html#method.enable_track_unknown_transport_parameters pub track_unknown_transport_parameters: Option, + + /// Temporary parameter to enable a rework of how the BBR congestion control + /// implementation computes app-limited. This parameter will be removed + /// after we build confidence on the new implementation or decide to roll it + /// back. 
+ /// + /// Defaults to `false`. + pub enable_bbr_app_limited_fix: bool, } impl QuicSettings { diff --git a/tokio-quiche/tests/integration_tests/app_limited.rs b/tokio-quiche/tests/integration_tests/app_limited.rs new file mode 100644 index 00000000000..5de1dd35d7f --- /dev/null +++ b/tokio-quiche/tests/integration_tests/app_limited.rs @@ -0,0 +1,190 @@ +// Copyright (C) 2026, Cloudflare, Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS +// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, +// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ +use crate::fixtures::*; + +use bytes::Bytes; +use futures::SinkExt; +use parking_lot::Mutex; +use rstest::rstest; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::Notify; + +use h3i::client::connection_summary::ConnectionSummary; +use h3i::frame::H3iFrame; +use tokio_quiche::http3::driver::H3Event; +use tokio_quiche::http3::driver::IncomingH3Headers; +use tokio_quiche::http3::driver::OutboundFrame; +use tokio_quiche::http3::driver::ServerH3Event; +use tokio_quiche::quic::QuicCommand; +use tokio_quiche::quic::QuicConnectionStats; +use tokio_quiche::quiche::h3::Header; + +// A response that slowly trickles to the client will always be +// app-limited. The connection should not exit the BBR startup phase +// since has_bandwidth_growth ignores app-limited rounds. +#[rstest] +#[case::cubic("cubic", false)] +#[case::bbr2_app_limited_broken("bbr2", false)] +#[case::bbr2_app_limited_fixed("bbr2", true)] +#[tokio::test] +async fn test_app_limited_slow_upstream( + #[case] cc_algorithm_name: &str, #[case] enable_bbr_app_limited_fix: bool, +) { + let hook = TestConnectionHook::new(); + + // The size of the response to trickle at 1 byte / msec. 
+ let response_size = 2000; + + let server_path_stats: Arc<Mutex<Option<_>>> = + Default::default(); + let server_path_stats_clone = server_path_stats.clone(); + let stats_notify: Arc<Notify> = Arc::new(Notify::new()); + let stats_notify_clone = stats_notify.clone(); + + let capture_stats = Box::new(move |stats: QuicConnectionStats| { + *server_path_stats_clone.lock() = stats.path_stats.clone(); + stats_notify_clone.notify_one(); + }); + + let mut quic_settings = QuicSettings::default(); + quic_settings.cc_algorithm = cc_algorithm_name.to_string(); + quic_settings.enable_bbr_app_limited_fix = enable_bbr_app_limited_fix; + + let (url, _) = start_server_with_settings( + quic_settings, + Http3Settings::default(), + hook, + move |mut h3_conn| { + let capture_stats = capture_stats.clone(); + async move { + let cmd_sender = h3_conn.h3_controller.cmd_sender(); + let event_rx = h3_conn.h3_controller.event_receiver_mut(); + + while let Some(event) = event_rx.recv().await { + match event { + ServerH3Event::Core(event) => match event { + H3Event::ConnectionShutdown(_) => break, + + _ => (), + }, + + ServerH3Event::Headers { + incoming_headers, .. + } => { + let IncomingH3Headers { mut send, .. } = + incoming_headers; + + // Send additional headers. + send.send(OutboundFrame::Headers( + vec![Header::new(b":status", b"200")], + None, + )) + .await + .unwrap(); + + for _ in 0..response_size { + send.send(OutboundFrame::Body( + Bytes::copy_from_slice(&[23; 1]), + false, + )) + .await + .unwrap(); + tokio::time::sleep(Duration::from_millis(1)) + .await; + } + + // Work around: send the stats gathering command + // before the fin to guarantee that the command will + // be accepted. 
+ cmd_sender + .send(QuicCommand::ConnectionStats( + capture_stats.clone(), + )) + .expect("driver is gone?"); + + // Send fin + send.send(OutboundFrame::Body( + Default::default(), + true, + )) + .await + .unwrap(); + }, + } + } + } + }, + ); + + let summary = h3i_fixtures::request(&url, 1) + .await + .expect("request failed"); + + let mut headers = summary.stream_map.headers_on_stream(0).into_iter(); + + assert_eq!( + headers.next().expect("headers").status_code(), + Some(&Vec::from("200".as_bytes())) + ); + assert!(headers.next().is_none()); + + let frame_bytes = body_bytes(&summary, 0); + assert_eq!(frame_bytes, response_size); + + stats_notify.notified().await; + + // BBR shouldn't have exited startup because the full workflow was app-limited + // and there was no loss. + let server_path_stats = server_path_stats.lock().clone().unwrap(); + assert_eq!(server_path_stats.lost, 0); + if cc_algorithm_name == "cubic" || enable_bbr_app_limited_fix { + assert_eq!(server_path_stats.startup_exit, None); + } else { + // BBR incorrectly exits startup when enable_bbr_app_limited_fix is not + // enabled. + assert_ne!(server_path_stats.startup_exit, None); + } +} + +fn body_bytes(summary: &ConnectionSummary, stream_id: u64) -> usize { + summary + .stream_map + .stream(stream_id) + .into_iter() + .map(|h3i_frame| { + if let H3iFrame::QuicheH3(quiche::h3::frame::Frame::Data { + payload, + }) = h3i_frame + { + payload.len() + } else { + 0 + } + }) + .sum::<usize>() +} diff --git a/tokio-quiche/tests/integration_tests/mod.rs b/tokio-quiche/tests/integration_tests/mod.rs index a7f4f6c0cb9..c217bbbc401 100644 --- a/tokio-quiche/tests/integration_tests/mod.rs +++ b/tokio-quiche/tests/integration_tests/mod.rs @@ -40,6 +40,7 @@ use tokio_quiche::settings::TlsCertificatePaths; use tokio_quiche::ConnectionParams; use tokio_quiche::InitialQuicConnection; +pub mod app_limited; pub mod async_callbacks; pub mod connection_close; pub mod headers;