Skip to content

Commit

Permalink
Putting initial packet onto the queue when establishing connection (#471
Browse files Browse the repository at this point in the history
)
  • Loading branch information
jstuczyn committed Nov 25, 2020
1 parent fa5c512 commit 33d5eff
Showing 1 changed file with 20 additions and 8 deletions.
28 changes: 20 additions & 8 deletions common/client-libs/mixnet-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,17 @@ impl Client {
}
}

fn make_connection(&mut self, address: NymNodeRoutingAddress) {
let (sender, receiver) = mpsc::channel(self.config.maximum_connection_buffer_size);
fn make_connection(
&mut self,
address: NymNodeRoutingAddress,
pending_packet: FramedSphinxPacket,
) {
let (mut sender, receiver) = mpsc::channel(self.config.maximum_connection_buffer_size);

// this CAN'T fail because we just created the channel which has a non-zero capacity
if self.config.maximum_connection_buffer_size > 0 {
sender.try_send(pending_packet).unwrap();
}

// if we already tried to connect to `address` before, grab the current attempt count
let current_reconnection_attempt = if let Some(existing) = self.conn_new.get_mut(&address) {
Expand Down Expand Up @@ -198,13 +207,14 @@ impl Client {
packet_mode: PacketMode,
) -> io::Result<()> {
trace!("Sending packet to {:?}", address);
let framed_packet = FramedSphinxPacket::new(packet, packet_mode);

if let Some(sender) = self.conn_new.get_mut(&address) {
let framed_packet = FramedSphinxPacket::new(packet, packet_mode);
if let Err(err) = sender.channel.try_send(framed_packet) {
if err.is_full() {
debug!("Connection to {} seems to not be able to handle all the traffic - dropping the current packet", address);
// it's not a 'big' error, but we did not manage to send the packet
// it's not a 'big' error, but we did not manage to send the packet
// if the queue is full, we can't really do anything but to drop the packet
Err(io::Error::new(
io::ErrorKind::WouldBlock,
"connection queue is full",
Expand All @@ -214,8 +224,9 @@ impl Client {
"Connection to {} seems to be dead. attempting to re-establish it...",
address
);
self.make_connection(address);
// it's not a 'big' error, but we did not manage to send the packet
// it's not a 'big' error, but we did not manage to send the packet, but queue
// it up to send it as soon as the connection is re-established
self.make_connection(address, err.into_inner());
Err(io::Error::new(
io::ErrorKind::ConnectionAborted,
"reconnection attempt is in progress",
Expand All @@ -233,8 +244,9 @@ impl Client {
} else {
// there was never a connection to begin with
debug!("establishing initial connection to {}", address);
self.make_connection(address);
// it's not a 'big' error, but we did not manage to send the packet
// it's not a 'big' error, but we did not manage to send the packet, but queue the packet
// for sending for as soon as the connection is created
self.make_connection(address, framed_packet);
Err(io::Error::new(
io::ErrorKind::NotConnected,
"connection is in progress",
Expand Down

0 comments on commit 33d5eff

Please sign in to comment.