From 210b5466bb85a26acddcc8b43fee0412c1eaae76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C4=99drzej=20Stuczy=C5=84ski?= Date: Thu, 26 Nov 2020 13:06:05 +0000 Subject: [PATCH] Minor mixnet client code simplification and optimization (#472) * Minor code simplification and optimization * Made the connection establishing call non-blocking --- .../client-libs/mixnet-client/src/client.rs | 58 +++++++++---------- 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/common/client-libs/mixnet-client/src/client.rs b/common/client-libs/mixnet-client/src/client.rs index 44f8e68268..a0781824f2 100644 --- a/common/client-libs/mixnet-client/src/client.rs +++ b/common/client-libs/mixnet-client/src/client.rs @@ -13,7 +13,7 @@ // limitations under the License. use futures::channel::mpsc; -use futures::SinkExt; +use futures::StreamExt; use log::*; use nymsphinx::framing::codec::SphinxCodec; use nymsphinx::framing::packet::FramedSphinxPacket; @@ -25,7 +25,7 @@ use std::net::SocketAddr; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; use std::time::Duration; -use tokio::stream::StreamExt; +use tokio::net::TcpStream; use tokio::time::delay_for; use tokio_util::codec::Framed; @@ -81,20 +81,29 @@ impl Client { async fn manage_connection( address: SocketAddr, - mut receiver: mpsc::Receiver, + receiver: mpsc::Receiver, connection_timeout: Duration, current_reconnection: &AtomicU32, - ) -> io::Result<()> { - let mut conn = match std::net::TcpStream::connect_timeout(&address, connection_timeout) { - Ok(stream) => { - let tokio_stream = tokio::net::TcpStream::from_std(stream).unwrap(); - debug!("Managed to establish connection to {}", address); - // if we managed to connect, reset the reconnection count (whatever it might have been) - current_reconnection.store(0, Ordering::Release); + ) { + let connection_fut = TcpStream::connect(address); - Framed::new(tokio_stream, SphinxCodec) - } - Err(err) => { + let conn = match tokio::time::timeout(connection_timeout, connection_fut).await { + Ok(stream_res) => match stream_res { + Ok(stream) => { + debug!("Managed to establish connection to {}", address); + // if we managed to connect, reset the reconnection count (whatever it might have been) + current_reconnection.store(0, Ordering::Release); + Framed::new(stream, SphinxCodec) + } + Err(err) => { + debug!( + "failed to establish connection to {} (err: {})", + address, err + ); + return; + } + }, + Err(_) => { debug!( "failed to connect to {} within {:?}", address, connection_timeout @@ -102,30 +111,21 @@ impl Client { // we failed to connect - increase reconnection attempt current_reconnection.fetch_add(1, Ordering::SeqCst); - - return Err(err); + return; } }; - while let Some(packet) = receiver.next().await { - if let Err(err) = conn.send(packet).await { - // I've put this as a warning rather than debug because this implies we managed - // to connect to this destination but it failed later - warn!("Failed to forward packet to {} - {:?}", address, err); - // there's no point in draining the channel, it's incredibly unlikely further - // messages might succeed - break; - } else { - trace!("managed to forward packet to {}", address) - } + // Take whatever the receiver channel produces and put it on the connection. + // We could have as well used conn.send_all(receiver.map(Ok)), but considering we don't care + // about neither receiver nor the connection, it doesn't matter which one gets consumed + if let Err(err) = receiver.map(Ok).forward(conn).await { + warn!("Failed to forward packets to {} - {:?}", address, err); } - // if we got here it means the mixnet client was dropped debug!( - "connection manager to {} is finished. Presumably mixnet client got dropped", + "connection manager to {} is finished. Either the connection failed or mixnet client got dropped", address ); - Ok(()) } /// If we're trying to reconnect, determine how long we should wait.