Skip to content

Commit

Permalink
allow to bind interface when connect to peer
Browse files Browse the repository at this point in the history
  • Loading branch information
zh-jq committed Sep 22, 2024
1 parent 5133ec1 commit 26ae1c6
Show file tree
Hide file tree
Showing 54 changed files with 498 additions and 347 deletions.
7 changes: 6 additions & 1 deletion g3bench/src/module/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,13 @@
* limitations under the License.
*/

pub(crate) mod socket;

pub(crate) mod proxy_protocol;

pub(crate) mod http;

pub(crate) mod openssl;
pub(crate) mod proxy_protocol;
pub(crate) mod rustls;

pub(crate) mod ssl;
104 changes: 104 additions & 0 deletions g3bench/src/module/socket.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright 2024 ByteDance and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#[cfg(feature = "quic")]
use std::net::UdpSocket;
use std::net::{IpAddr, SocketAddr};

use anyhow::anyhow;
use clap::{value_parser, Arg, ArgMatches, Command};
use tokio::net::TcpStream;

use g3_socket::BindAddr;
#[cfg(any(target_os = "linux", target_os = "android"))]
use g3_socket::InterfaceName;

const SOCKET_ARG_LOCAL_ADDRESS: &str = "local-address";
#[cfg(any(target_os = "linux", target_os = "android"))]
const SOCKET_ARG_INTERFACE: &str = "interface";

pub(crate) trait AppendSocketArgs {
fn append_socket_args(self) -> Self;
}

#[derive(Default)]
pub(crate) struct SocketArgs {
bind: BindAddr,
}

impl SocketArgs {
pub(crate) async fn tcp_connect_to(&self, peer: SocketAddr) -> anyhow::Result<TcpStream> {
let socket = g3_socket::tcp::new_socket_to(
peer.ip(),
&self.bind,
&Default::default(),
&Default::default(),
true,
)
.map_err(|e| anyhow!("failed to setup socket to {peer}: {e:?}"))?;
socket
.connect(peer)
.await
.map_err(|e| anyhow!("connect to {peer} error: {e:?}"))
}

#[cfg(feature = "quic")]
pub(crate) fn udp_std_socket_to(&self, peer: SocketAddr) -> anyhow::Result<UdpSocket> {
g3_socket::udp::new_std_socket_to(peer, &self.bind, Default::default(), Default::default())
.map_err(|e| anyhow!("failed to setup local udp socket: {e}"))
}

pub(crate) fn parse_args(&mut self, args: &ArgMatches) -> anyhow::Result<()> {
if let Some(ip) = args.get_one::<IpAddr>(SOCKET_ARG_LOCAL_ADDRESS) {
self.bind = BindAddr::Ip(*ip);
}
#[cfg(any(target_os = "linux", target_os = "android"))]
if let Some(name) = args.get_one::<InterfaceName>(SOCKET_ARG_INTERFACE) {
self.bind = BindAddr::Interface(*name);
}
Ok(())
}
}

impl AppendSocketArgs for Command {
fn append_socket_args(self) -> Self {
append_socket_args(self)
}
}

pub(crate) fn append_socket_args(mut cmd: Command) -> Command {
macro_rules! add_arg {
($arg:expr) => {
cmd = cmd.arg($arg);
};
}

add_arg!(Arg::new(SOCKET_ARG_LOCAL_ADDRESS)
.value_name("LOCAL IP ADDRESS")
.short('B')
.long(SOCKET_ARG_LOCAL_ADDRESS)
.num_args(1)
.value_parser(value_parser!(IpAddr)));
#[cfg(any(target_os = "linux", target_os = "android"))]
add_arg!(Arg::new(SOCKET_ARG_INTERFACE)
.value_name("INTERFACE NAME")
.short('I')
.long(SOCKET_ARG_INTERFACE)
.num_args(1)
.value_parser(value_parser!(InterfaceName))
.conflicts_with(SOCKET_ARG_LOCAL_ADDRESS));
cmd
}
38 changes: 10 additions & 28 deletions g3bench/src/target/h1/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/

use std::io;
use std::net::{IpAddr, SocketAddr};
use std::net::SocketAddr;
use std::str::FromStr;
use std::time::Duration;

Expand All @@ -36,12 +36,12 @@ use g3_types::net::{
use super::{BoxHttpForwardConnection, ProcArgs};
use crate::module::openssl::{AppendOpensslArgs, OpensslTlsClientArgs};
use crate::module::proxy_protocol::{AppendProxyProtocolArgs, ProxyProtocolArgs};
use crate::module::socket::{AppendSocketArgs, SocketArgs};

const HTTP_ARG_URL: &str = "url";
const HTTP_ARG_METHOD: &str = "method";
const HTTP_ARG_PROXY: &str = "proxy";
const HTTP_ARG_PROXY_TUNNEL: &str = "proxy-tunnel";
const HTTP_ARG_LOCAL_ADDRESS: &str = "local-address";
const HTTP_ARG_NO_KEEPALIVE: &str = "no-keepalive";
const HTTP_ARG_OK_STATUS: &str = "ok-status";
const HTTP_ARG_TIMEOUT: &str = "timeout";
Expand All @@ -53,13 +53,13 @@ pub(super) struct BenchHttpArgs {
target_url: Url,
forward_proxy: Option<HttpProxy>,
connect_proxy: Option<Proxy>,
bind: Option<IpAddr>,
pub(super) no_keepalive: bool,
pub(super) ok_status: Option<StatusCode>,
pub(super) timeout: Duration,
pub(super) max_header_size: usize,
pub(super) connect_timeout: Duration,

socket: SocketArgs,
target_tls: OpensslTlsClientArgs,
proxy_tls: OpensslTlsClientArgs,
proxy_protocol: ProxyProtocolArgs,
Expand All @@ -85,12 +85,12 @@ impl BenchHttpArgs {
target_url: url,
forward_proxy: None,
connect_proxy: None,
bind: None,
no_keepalive: false,
ok_status: None,
timeout: Duration::from_secs(30),
max_header_size: 4096,
connect_timeout: Duration::from_secs(15),
socket: SocketArgs::default(),
target_tls,
proxy_tls: OpensslTlsClientArgs::default(),
proxy_protocol: ProxyProtocolArgs::default(),
Expand Down Expand Up @@ -126,18 +126,7 @@ impl BenchHttpArgs {
.ok_or_else(|| anyhow!("no peer address set"))?;
let peer = *proc_args.select_peer(addrs);

let socket = g3_socket::tcp::new_socket_to(
peer.ip(),
self.bind,
&Default::default(),
&Default::default(),
!self.no_keepalive,
)
.map_err(|e| anyhow!("failed to setup socket to {peer}: {e:?}"))?;
let mut stream = socket
.connect(peer)
.await
.map_err(|e| anyhow!("connect to {peer} error: {e:?}"))?;
let mut stream = self.socket.tcp_connect_to(peer).await?;

if let Some(data) = self.proxy_protocol.data() {
stream
Expand Down Expand Up @@ -385,14 +374,6 @@ pub(super) fn add_http_args(app: Command) -> Command {
.action(ArgAction::SetTrue)
.help("Use tunnel if the proxy is an HTTP proxy"),
)
.arg(
Arg::new(HTTP_ARG_LOCAL_ADDRESS)
.value_name("LOCAL IP ADDRESS")
.short('B')
.long(HTTP_ARG_LOCAL_ADDRESS)
.num_args(1)
.value_parser(value_parser!(IpAddr)),
)
.arg(
Arg::new(HTTP_ARG_NO_KEEPALIVE)
.help("Disable http keepalive")
Expand Down Expand Up @@ -431,6 +412,7 @@ pub(super) fn add_http_args(app: Command) -> Command {
.long(HTTP_ARG_CONNECT_TIMEOUT)
.num_args(1),
)
.append_socket_args()
.append_openssl_args()
.append_proxy_openssl_args()
.append_proxy_protocol_args()
Expand Down Expand Up @@ -465,10 +447,6 @@ pub(super) fn parse_http_args(args: &ArgMatches) -> anyhow::Result<BenchHttpArgs
}
}

if let Some(ip) = args.get_one::<IpAddr>(HTTP_ARG_LOCAL_ADDRESS) {
h1_args.bind = Some(*ip);
}

if args.get_flag(HTTP_ARG_NO_KEEPALIVE) {
h1_args.no_keepalive = true;
}
Expand All @@ -488,6 +466,10 @@ pub(super) fn parse_http_args(args: &ArgMatches) -> anyhow::Result<BenchHttpArgs
h1_args.connect_timeout = timeout;
}

h1_args
.socket
.parse_args(args)
.context("invalid socket config")?;
h1_args
.target_tls
.parse_tls_args(args)
Expand Down
38 changes: 10 additions & 28 deletions g3bench/src/target/h2/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

use std::net::{IpAddr, SocketAddr};
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -38,12 +38,12 @@ use g3_types::net::{
use super::{H2PreRequest, HttpRuntimeStats, ProcArgs};
use crate::module::openssl::{AppendOpensslArgs, OpensslTlsClientArgs};
use crate::module::proxy_protocol::{AppendProxyProtocolArgs, ProxyProtocolArgs};
use crate::module::socket::{AppendSocketArgs, SocketArgs};

const HTTP_ARG_CONNECTION_POOL: &str = "connection-pool";
const HTTP_ARG_URI: &str = "uri";
const HTTP_ARG_METHOD: &str = "method";
const HTTP_ARG_PROXY: &str = "proxy";
const HTTP_ARG_LOCAL_ADDRESS: &str = "local-address";
const HTTP_ARG_NO_MULTIPLEX: &str = "no-multiplex";
const HTTP_ARG_OK_STATUS: &str = "ok-status";
const HTTP_ARG_TIMEOUT: &str = "timeout";
Expand All @@ -54,12 +54,12 @@ pub(super) struct BenchH2Args {
pub(super) method: Method,
target_url: Url,
connect_proxy: Option<Proxy>,
bind: Option<IpAddr>,
pub(super) no_multiplex: bool,
pub(super) ok_status: Option<StatusCode>,
pub(super) timeout: Duration,
pub(super) connect_timeout: Duration,

socket: SocketArgs,
target_tls: OpensslTlsClientArgs,
proxy_tls: OpensslTlsClientArgs,
proxy_protocol: ProxyProtocolArgs,
Expand All @@ -86,11 +86,11 @@ impl BenchH2Args {
method: Method::GET,
target_url: url,
connect_proxy: None,
bind: None,
no_multiplex: false,
ok_status: None,
timeout: Duration::from_secs(30),
connect_timeout: Duration::from_secs(15),
socket: SocketArgs::default(),
target_tls,
proxy_tls: OpensslTlsClientArgs::default(),
proxy_protocol: ProxyProtocolArgs::default(),
Expand Down Expand Up @@ -121,18 +121,7 @@ impl BenchH2Args {
.ok_or_else(|| anyhow!("no peer address set"))?;
let peer = *proc_args.select_peer(addrs);

let socket = g3_socket::tcp::new_socket_to(
peer.ip(),
self.bind,
&Default::default(),
&Default::default(),
true,
)
.map_err(|e| anyhow!("failed to setup socket to {peer}: {e:?}"))?;
let mut stream = socket
.connect(peer)
.await
.map_err(|e| anyhow!("connect to {peer} error: {e:?}"))?;
let mut stream = self.socket.tcp_connect_to(peer).await?;

if let Some(data) = self.proxy_protocol.data() {
stream
Expand Down Expand Up @@ -384,14 +373,6 @@ pub(super) fn add_h2_args(app: Command) -> Command {
.num_args(1)
.value_name("PROXY URL"),
)
.arg(
Arg::new(HTTP_ARG_LOCAL_ADDRESS)
.value_name("LOCAL IP ADDRESS")
.short('B')
.long(HTTP_ARG_LOCAL_ADDRESS)
.num_args(1)
.value_parser(value_parser!(IpAddr)),
)
.arg(
Arg::new(HTTP_ARG_NO_MULTIPLEX)
.help("Disable h2 connection multiplexing")
Expand Down Expand Up @@ -423,6 +404,7 @@ pub(super) fn add_h2_args(app: Command) -> Command {
.long(HTTP_ARG_CONNECT_TIMEOUT)
.num_args(1),
)
.append_socket_args()
.append_openssl_args()
.append_proxy_openssl_args()
.append_proxy_protocol_args()
Expand Down Expand Up @@ -454,10 +436,6 @@ pub(super) fn parse_h2_args(args: &ArgMatches) -> anyhow::Result<BenchH2Args> {
h2_args.connect_proxy = Some(proxy);
}

if let Some(ip) = args.get_one::<IpAddr>(HTTP_ARG_LOCAL_ADDRESS) {
h2_args.bind = Some(*ip);
}

if args.get_flag(HTTP_ARG_NO_MULTIPLEX) {
h2_args.no_multiplex = true;
}
Expand All @@ -474,6 +452,10 @@ pub(super) fn parse_h2_args(args: &ArgMatches) -> anyhow::Result<BenchH2Args> {
h2_args.connect_timeout = timeout;
}

h2_args
.socket
.parse_args(args)
.context("invalid socket config")?;
h2_args
.target_tls
.parse_tls_args(args)
Expand Down
Loading

0 comments on commit 26ae1c6

Please sign in to comment.