diff --git a/neqo-bin/src/client/http09.rs b/neqo-bin/src/client/http09.rs index b157a6a13..e0b254f67 100644 --- a/neqo-bin/src/client/http09.rs +++ b/neqo-bin/src/client/http09.rs @@ -138,8 +138,15 @@ pub(crate) fn create_client( } impl super::Client for Connection { - fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output { - self.process(dgram, now) + fn process_output(&mut self, now: Instant) -> Output { + self.process_output(now) + } + + fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant) + where + I: IntoIterator, + { + self.process_multiple_input(dgrams, now); } fn close(&mut self, now: Instant, app_error: neqo_transport::AppError, msg: S) diff --git a/neqo-bin/src/client/http3.rs b/neqo-bin/src/client/http3.rs index d56af5eda..09a30461b 100644 --- a/neqo-bin/src/client/http3.rs +++ b/neqo-bin/src/client/http3.rs @@ -119,8 +119,15 @@ impl super::Client for Http3Client { None } - fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output { - self.process(dgram, now) + fn process_output(&mut self, now: Instant) -> Output { + self.process_output(now) + } + + fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant) + where + I: IntoIterator, + { + self.process_multiple_input(dgrams, now); } fn close(&mut self, now: Instant, app_error: AppError, msg: S) diff --git a/neqo-bin/src/client/mod.rs b/neqo-bin/src/client/mod.rs index 49c116aa9..791e2a636 100644 --- a/neqo-bin/src/client/mod.rs +++ b/neqo-bin/src/client/mod.rs @@ -377,7 +377,10 @@ trait Handler { /// Network client, e.g. [`neqo_transport::Connection`] or [`neqo_http3::Http3Client`]. trait Client { - fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output; + fn process_output(&mut self, now: Instant) -> Output; + fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant) + where + I: IntoIterator; fn close(&mut self, now: Instant, app_error: AppError, msg: S) where S: AsRef + Display; @@ -404,21 +407,21 @@ impl<'a, H: Handler> Runner<'a, H> { let handler_done = self.handler.handle(&mut self.client)?; match (handler_done, self.args.resume, self.handler.has_token()) { - // Handler isn't done. Continue. - (false, _, _) => {}, - // Handler done. Resumption token needed but not present. Continue. - (true, true, false) => { - qdebug!("Handler done. Waiting for resumption token."); - } - // Handler is done, no resumption token needed. Close. - (true, false, _) | - // Handler is done, resumption token needed and present. Close. - (true, true, true) => { - self.client.close(Instant::now(), 0, "kthxbye!"); - } + // Handler isn't done. Continue. + (false, _, _) => {}, + // Handler done. Resumption token needed but not present. Continue. + (true, true, false) => { + qdebug!("Handler done. Waiting for resumption token."); + } + // Handler is done, no resumption token needed. Close. + (true, false, _) | + // Handler is done, resumption token needed and present. Close. + (true, true, true) => { + self.client.close(Instant::now(), 0, "kthxbye!"); } + } - self.process(None).await?; + self.process_output().await?; if let Some(reason) = self.client.is_closed() { if self.args.stats { @@ -432,16 +435,7 @@ impl<'a, H: Handler> Runner<'a, H> { } match ready(self.socket, self.timeout.as_mut()).await? { - Ready::Socket => loop { - let dgrams = self.socket.recv(&self.local_addr)?; - if dgrams.is_empty() { - break; - } - for dgram in &dgrams { - self.process(Some(dgram)).await?; - } - self.handler.maybe_key_update(&mut self.client)?; - }, + Ready::Socket => self.process_multiple_input().await?, Ready::Timeout => { self.timeout = None; } @@ -449,9 +443,9 @@ impl<'a, H: Handler> Runner<'a, H> { } } - async fn process(&mut self, mut dgram: Option<&Datagram>) -> Result<(), io::Error> { + async fn process_output(&mut self) -> Result<(), io::Error> { loop { - match self.client.process(dgram.take(), Instant::now()) { + match self.client.process_output(Instant::now()) { Output::Datagram(dgram) => { self.socket.writable().await?; self.socket.send(dgram)?; @@ -470,6 +464,21 @@ impl<'a, H: Handler> Runner<'a, H> { Ok(()) } + + async fn process_multiple_input(&mut self) -> Res<()> { + loop { + let dgrams = self.socket.recv(&self.local_addr)?; + if dgrams.is_empty() { + break; + } + self.client + .process_multiple_input(dgrams.iter(), Instant::now()); + self.process_output().await?; + self.handler.maybe_key_update(&mut self.client)?; + } + + Ok(()) + } } fn qlog_new(args: &Args, hostname: &str, cid: &ConnectionId) -> Res { diff --git a/neqo-http3/src/connection_client.rs b/neqo-http3/src/connection_client.rs index be2012635..4c8772d14 100644 --- a/neqo-http3/src/connection_client.rs +++ b/neqo-http3/src/connection_client.rs @@ -880,11 +880,10 @@ impl Http3Client { pub fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant) where I: IntoIterator, - I::IntoIter: ExactSizeIterator, { - let dgrams = dgrams.into_iter(); - qtrace!([self], "Process multiple datagrams, len={}", dgrams.len()); - if dgrams.len() == 0 { + let mut dgrams = dgrams.into_iter().peekable(); + qtrace!([self], "Process multiple datagrams"); + if dgrams.peek().is_none() { return; } self.conn.process_multiple_input(dgrams, now); diff --git a/neqo-transport/src/connection/mod.rs b/neqo-transport/src/connection/mod.rs index 9cddcdac2..e471c29c2 100644 --- a/neqo-transport/src/connection/mod.rs +++ b/neqo-transport/src/connection/mod.rs @@ -985,10 +985,9 @@ impl Connection { pub fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant) where I: IntoIterator, - I::IntoIter: ExactSizeIterator, { - let dgrams = dgrams.into_iter(); - if dgrams.len() == 0 { + let mut dgrams = dgrams.into_iter().peekable(); + if dgrams.peek().is_none() { return; }