Skip to content

Commit

Permalink
feat(udp): use recvmmsg
Browse files Browse the repository at this point in the history
Read up to `BATCH_SIZE = 32` with single `recvmmsg` syscall.

Previously `neqo_bin::udp::Socket::recv` would use `recvmmsg`, but provide a
single buffer to write into only, effectively using `recvmsg` instead of
`recvmmsg`.

With this commit `Socket::recv` provides `BATCH_SIZE` number of buffers on each
`recvmmsg` syscall, thus reading more than one datagram at a time if available.
  • Loading branch information
mxinden committed Mar 21, 2024
1 parent 177385e commit a8a1b80
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 52 deletions.
4 changes: 4 additions & 0 deletions neqo-bin/src/bin/client/http09.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ impl super::Client for Connection {
self.process(dgram, now)
}

fn process_input(&mut self, dgram: &Datagram, now: Instant) {
self.process_input(dgram, now);
}

fn close<S>(&mut self, now: Instant, app_error: neqo_transport::AppError, msg: S)
where
S: AsRef<str> + std::fmt::Display,
Expand Down
4 changes: 4 additions & 0 deletions neqo-bin/src/bin/client/http3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ impl super::Client for Http3Client {
self.process(dgram, now)
}

fn process_input(&mut self, dgram: &Datagram, now: Instant) {
self.process_input(dgram, now);
}

fn close<S>(&mut self, now: Instant, app_error: AppError, msg: S)
where
S: AsRef<str> + Display,
Expand Down
12 changes: 8 additions & 4 deletions neqo-bin/src/bin/client/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,9 @@ trait Handler {

/// Network client, e.g. [`neqo_transport::Connection`] or [`neqo_http3::Http3Client`].
trait Client {
// TODO: datagram option needed?
fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output;
fn process_input(&mut self, dgram: &Datagram, now: Instant);
fn close<S>(&mut self, now: Instant, app_error: AppError, msg: S)
where
S: AsRef<str> + Display;
Expand Down Expand Up @@ -365,11 +367,13 @@ impl<'a, H: Handler> Runner<'a, H> {
match ready(self.socket, self.timeout.as_mut()).await? {
Ready::Socket => loop {
let dgrams = self.socket.recv(&self.local_addr)?;
if dgrams.is_empty() {
break;
let mut is_empty = true;
for dgram in dgrams {
is_empty = false;
self.client.process_input(&dgram, Instant::now());
}
for dgram in &dgrams {
self.process(Some(dgram)).await?;
if is_empty {
break;
}
self.handler.maybe_key_update(&mut self.client)?;
},
Expand Down
3 changes: 2 additions & 1 deletion neqo-bin/src/bin/server/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,8 @@ impl ServersRunner {
match self.ready().await? {
Ready::Socket(inx) => loop {
let (host, socket) = self.sockets.get_mut(inx).unwrap();
let dgrams = socket.recv(host)?;
// TODO: Remove collect.
let dgrams: Vec<_> = socket.recv(host)?.collect();
if dgrams.is_empty() {
break;
}
Expand Down
108 changes: 61 additions & 47 deletions neqo-bin/src/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,22 @@

use std::{
io::{self, IoSliceMut},
mem::MaybeUninit,
net::{SocketAddr, ToSocketAddrs},
slice,
};

use neqo_common::{Datagram, IpTos};
use neqo_common::{qwarn, Datagram, IpTos};
use quinn_udp::{EcnCodepoint, RecvMeta, Transmit, UdpSocketState};
use tokio::io::Interest;

#[cfg(not(any(target_os = "macos", target_os = "ios")))]
// Chosen somewhat arbitrarily; might benefit from additional tuning.
pub(crate) const BATCH_SIZE: usize = 32;

#[cfg(any(target_os = "macos", target_os = "ios"))]
pub(crate) const BATCH_SIZE: usize = 1;

/// Socket receive buffer size.
///
/// Allows reading multiple datagrams in a single [`Socket::recv`] call.
Expand All @@ -25,7 +33,7 @@ const RECV_BUF_SIZE: usize = u16::MAX as usize;
pub struct Socket {
socket: tokio::net::UdpSocket,
state: UdpSocketState,
recv_buf: Vec<u8>,
recv_bufs: [Vec<u8>; BATCH_SIZE],
}

impl Socket {
Expand All @@ -36,7 +44,11 @@ impl Socket {
Ok(Self {
state: quinn_udp::UdpSocketState::new((&socket).into())?,
socket: tokio::net::UdpSocket::from_std(socket)?,
recv_buf: vec![0; RECV_BUF_SIZE],
recv_bufs: (0..BATCH_SIZE)
.map(|_| vec![0; RECV_BUF_SIZE])
.collect::<Vec<_>>()
.try_into()
.expect("successful array instantiation"),
})
}

Expand Down Expand Up @@ -76,53 +88,55 @@ impl Socket {
}

/// Receive a UDP datagram on the specified socket.
pub fn recv(&mut self, local_address: &SocketAddr) -> Result<Vec<Datagram>, io::Error> {
let mut meta = RecvMeta::default();

match self.socket.try_io(Interest::READABLE, || {
self.state.recv(
(&self.socket).into(),
&mut [IoSliceMut::new(&mut self.recv_buf)],
slice::from_mut(&mut meta),
)
pub fn recv<'a>(
&'a mut self,
local_address: &'a SocketAddr,
) -> Result<impl Iterator<Item = Datagram> + 'a, io::Error> {
let mut metas = [RecvMeta::default(); BATCH_SIZE];

// TODO: Safe? Double check.
let mut iovs = MaybeUninit::<[IoSliceMut<'_>; BATCH_SIZE]>::uninit();
for (i, buf) in self.recv_bufs.iter_mut().enumerate() {
unsafe {
iovs.as_mut_ptr()
.cast::<IoSliceMut>()
.add(i)
.write(IoSliceMut::new(buf));
};
}
let mut iovs = unsafe { iovs.assume_init() };

let msgs = match self.socket.try_io(Interest::READABLE, || {
self.state
.recv((&self.socket).into(), &mut iovs, &mut metas)
}) {
Ok(n) => {
assert_eq!(n, 1, "only passed one slice");
}
Err(ref err)
if err.kind() == io::ErrorKind::WouldBlock
|| err.kind() == io::ErrorKind::Interrupted =>
{
return Ok(vec![])
}
Err(err) => {
return Err(err);
}
Ok(n) => n,
Err(e) if e.kind() == io::ErrorKind::WouldBlock => 0,
Err(e) => return Err(e),
};

if meta.len == 0 {
eprintln!("zero length datagram received?");
return Ok(vec![]);
}
if meta.len == self.recv_buf.len() {
eprintln!(
"Might have received more than {} bytes",
self.recv_buf.len()
);
}

Ok(self.recv_buf[0..meta.len]
.chunks(meta.stride.min(self.recv_buf.len()))
.map(|d| {
Datagram::new(
meta.addr,
*local_address,
meta.ecn.map(|n| IpTos::from(n as u8)).unwrap_or_default(),
None, // TODO: get the real TTL https://github.com/quinn-rs/quinn/issues/1749
d,
)
})
.collect())
Ok(metas
.into_iter()
.zip(self.recv_bufs.iter())
.take(msgs)
.flat_map(move |(meta, buf)| {
// TODO: Needed?
if meta.len == buf.len() {
qwarn!("Might have received more than {} bytes", buf.len());
}

buf[0..meta.len]
.chunks(meta.stride.min(buf.len()))
.map(move |d| {
Datagram::new(
meta.addr,
*local_address,
meta.ecn.map(|n| IpTos::from(n as u8)).unwrap_or_default(),
None, // TODO: get the real TTL https://github.com/quinn-rs/quinn/issues/1749
d,
)
})
}))
}
}

Expand Down

0 comments on commit a8a1b80

Please sign in to comment.