Skip to content

Commit

Permalink
Minor mixnet client code simplification and optimization (#472)
Browse files Browse the repository at this point in the history
* Minor code simplification and optimization

* Made the connection establishing call non-blocking
  • Loading branch information
jstuczyn committed Nov 26, 2020
1 parent 33d5eff commit 210b546
Showing 1 changed file with 29 additions and 29 deletions.
58 changes: 29 additions & 29 deletions common/client-libs/mixnet-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use futures::channel::mpsc;
use futures::SinkExt;
use futures::StreamExt;
use log::*;
use nymsphinx::framing::codec::SphinxCodec;
use nymsphinx::framing::packet::FramedSphinxPacket;
Expand All @@ -25,7 +25,7 @@ use std::net::SocketAddr;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::stream::StreamExt;
use tokio::net::TcpStream;
use tokio::time::delay_for;
use tokio_util::codec::Framed;

Expand Down Expand Up @@ -81,51 +81,51 @@ impl Client {

async fn manage_connection(
address: SocketAddr,
mut receiver: mpsc::Receiver<FramedSphinxPacket>,
receiver: mpsc::Receiver<FramedSphinxPacket>,
connection_timeout: Duration,
current_reconnection: &AtomicU32,
) -> io::Result<()> {
let mut conn = match std::net::TcpStream::connect_timeout(&address, connection_timeout) {
Ok(stream) => {
let tokio_stream = tokio::net::TcpStream::from_std(stream).unwrap();
debug!("Managed to establish connection to {}", address);
// if we managed to connect, reset the reconnection count (whatever it might have been)
current_reconnection.store(0, Ordering::Release);
) {
let connection_fut = TcpStream::connect(address);

Framed::new(tokio_stream, SphinxCodec)
}
Err(err) => {
let conn = match tokio::time::timeout(connection_timeout, connection_fut).await {
Ok(stream_res) => match stream_res {
Ok(stream) => {
debug!("Managed to establish connection to {}", address);
// if we managed to connect, reset the reconnection count (whatever it might have been)
current_reconnection.store(0, Ordering::Release);
Framed::new(stream, SphinxCodec)
}
Err(err) => {
debug!(
"failed to establish connection to {} (err: {})",
address, err
);
return;
}
},
Err(_) => {
debug!(
"failed to connect to {} within {:?}",
address, connection_timeout
);

// we failed to connect - increase reconnection attempt
current_reconnection.fetch_add(1, Ordering::SeqCst);

return Err(err);
return;
}
};

while let Some(packet) = receiver.next().await {
if let Err(err) = conn.send(packet).await {
// I've put this as a warning rather than debug because this implies we managed
// to connect to this destination but it failed later
warn!("Failed to forward packet to {} - {:?}", address, err);
// there's no point in draining the channel, it's incredibly unlikely further
// messages might succeed
break;
} else {
trace!("managed to forward packet to {}", address)
}
// Take whatever the receiver channel produces and put it on the connection.
// We could have as well used conn.send_all(receiver.map(Ok)), but considering we don't care
// about neither receiver nor the connection, it doesn't matter which one gets consumed
if let Err(err) = receiver.map(Ok).forward(conn).await {
warn!("Failed to forward packets to {} - {:?}", address, err);
}

// if we got here it means the mixnet client was dropped
debug!(
"connection manager to {} is finished. Presumably mixnet client got dropped",
"connection manager to {} is finished. Either the connection failed or mixnet client got dropped",
address
);
Ok(())
}

/// If we're trying to reconnect, determine how long we should wait.
Expand Down

0 comments on commit 210b546

Please sign in to comment.