Skip to content

Commit

Permalink
refactor(client): use process_output and process_multiple_input (#1794)
Browse files Browse the repository at this point in the history
* refactor(client): use process_output and process_multiple_input

`neqo_transport::Connection` offers 4 process methods:

- `process`
- `process_output`
- `process_input`
- `process_multiple_input`

Where `process` is a wrapper around `process_input` and `process_output` calling
both in sequence.

https://github.com/mozilla/neqo/blob/5dfe106669ccb695187511305c21b8e8a8775e91/neqo-transport/src/connection/mod.rs#L1099-L1107

Where `process_input` delegates to `process_multiple_input`.

https://github.com/mozilla/neqo/blob/5dfe106669ccb695187511305c21b8e8a8775e91/neqo-transport/src/connection/mod.rs#L979-L1000

Previously `neqo-client` would use `process` only. Thus continuously
interleaving output and input. Say `neqo-client` would have multiple datagrams
buffered through a GRO read, it could potentially have to do a write in between
each `process` calls, as each call to `process` with an input datagram might
return an output datagram to be written.

With this commit `neqo-client` uses `process_output` and `process_multiple_input`
directly, thus reducing interleaving on batch reads (GRO and in the future
recvmmsg) and in the future batch writes (GSO and sendmmsg).

By using `process_multiple_input` instead of `process` or `process_input`,
auxiliarry logic, like `self.cleanup_closed_streams` only has to run per input
datagram batch, and not for each input datagram.

Extracted from #1741.

* process_output before handle

* process_ouput after each input batch
  • Loading branch information
mxinden committed Apr 9, 2024
1 parent 342e4e7 commit 5f9c3e7
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 37 deletions.
11 changes: 9 additions & 2 deletions neqo-bin/src/client/http09.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,15 @@ pub(crate) fn create_client(
}

impl super::Client for Connection {
fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output {
self.process(dgram, now)
fn process_output(&mut self, now: Instant) -> Output {
self.process_output(now)
}

fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant)
where
I: IntoIterator<Item = &'a Datagram>,
{
self.process_multiple_input(dgrams, now);
}

fn close<S>(&mut self, now: Instant, app_error: neqo_transport::AppError, msg: S)
Expand Down
11 changes: 9 additions & 2 deletions neqo-bin/src/client/http3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,15 @@ impl super::Client for Http3Client {
None
}

fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output {
self.process(dgram, now)
fn process_output(&mut self, now: Instant) -> Output {
self.process_output(now)
}

fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant)
where
I: IntoIterator<Item = &'a Datagram>,
{
self.process_multiple_input(dgrams, now);
}

fn close<S>(&mut self, now: Instant, app_error: AppError, msg: S)
Expand Down
61 changes: 35 additions & 26 deletions neqo-bin/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,10 @@ trait Handler {

/// Network client, e.g. [`neqo_transport::Connection`] or [`neqo_http3::Http3Client`].
trait Client {
fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output;
fn process_output(&mut self, now: Instant) -> Output;
fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant)
where
I: IntoIterator<Item = &'a Datagram>;
fn close<S>(&mut self, now: Instant, app_error: AppError, msg: S)
where
S: AsRef<str> + Display;
Expand All @@ -404,21 +407,21 @@ impl<'a, H: Handler> Runner<'a, H> {
let handler_done = self.handler.handle(&mut self.client)?;

match (handler_done, self.args.resume, self.handler.has_token()) {
// Handler isn't done. Continue.
(false, _, _) => {},
// Handler done. Resumption token needed but not present. Continue.
(true, true, false) => {
qdebug!("Handler done. Waiting for resumption token.");
}
// Handler is done, no resumption token needed. Close.
(true, false, _) |
// Handler is done, resumption token needed and present. Close.
(true, true, true) => {
self.client.close(Instant::now(), 0, "kthxbye!");
}
// Handler isn't done. Continue.
(false, _, _) => {},
// Handler done. Resumption token needed but not present. Continue.
(true, true, false) => {
qdebug!("Handler done. Waiting for resumption token.");
}
// Handler is done, no resumption token needed. Close.
(true, false, _) |
// Handler is done, resumption token needed and present. Close.
(true, true, true) => {
self.client.close(Instant::now(), 0, "kthxbye!");
}
}

self.process(None).await?;
self.process_output().await?;

if let Some(reason) = self.client.is_closed() {
if self.args.stats {
Expand All @@ -432,26 +435,17 @@ impl<'a, H: Handler> Runner<'a, H> {
}

match ready(self.socket, self.timeout.as_mut()).await? {
Ready::Socket => loop {
let dgrams = self.socket.recv(&self.local_addr)?;
if dgrams.is_empty() {
break;
}
for dgram in &dgrams {
self.process(Some(dgram)).await?;
}
self.handler.maybe_key_update(&mut self.client)?;
},
Ready::Socket => self.process_multiple_input().await?,
Ready::Timeout => {
self.timeout = None;
}
}
}
}

async fn process(&mut self, mut dgram: Option<&Datagram>) -> Result<(), io::Error> {
async fn process_output(&mut self) -> Result<(), io::Error> {
loop {
match self.client.process(dgram.take(), Instant::now()) {
match self.client.process_output(Instant::now()) {
Output::Datagram(dgram) => {
self.socket.writable().await?;
self.socket.send(dgram)?;
Expand All @@ -470,6 +464,21 @@ impl<'a, H: Handler> Runner<'a, H> {

Ok(())
}

async fn process_multiple_input(&mut self) -> Res<()> {
loop {
let dgrams = self.socket.recv(&self.local_addr)?;
if dgrams.is_empty() {
break;
}
self.client
.process_multiple_input(dgrams.iter(), Instant::now());
self.process_output().await?;
self.handler.maybe_key_update(&mut self.client)?;
}

Ok(())
}
}

fn qlog_new(args: &Args, hostname: &str, cid: &ConnectionId) -> Res<NeqoQlog> {
Expand Down
7 changes: 3 additions & 4 deletions neqo-http3/src/connection_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -880,11 +880,10 @@ impl Http3Client {
pub fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant)
where
I: IntoIterator<Item = &'a Datagram>,
I::IntoIter: ExactSizeIterator,
{
let dgrams = dgrams.into_iter();
qtrace!([self], "Process multiple datagrams, len={}", dgrams.len());
if dgrams.len() == 0 {
let mut dgrams = dgrams.into_iter().peekable();
qtrace!([self], "Process multiple datagrams");
if dgrams.peek().is_none() {
return;
}
self.conn.process_multiple_input(dgrams, now);
Expand Down
5 changes: 2 additions & 3 deletions neqo-transport/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -985,10 +985,9 @@ impl Connection {
pub fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant)
where
I: IntoIterator<Item = &'a Datagram>,
I::IntoIter: ExactSizeIterator,
{
let dgrams = dgrams.into_iter();
if dgrams.len() == 0 {
let mut dgrams = dgrams.into_iter().peekable();
if dgrams.peek().is_none() {
return;
}

Expand Down

0 comments on commit 5f9c3e7

Please sign in to comment.