Skip to content

Commit

Permalink
Complete second resiliency test and cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
mrattle committed Sep 6, 2024
1 parent b6dfab6 commit 610ddf1
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 77 deletions.
21 changes: 16 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,13 @@ impl Client {
pub(crate) async fn get_write_multi_responses(
&mut self,
num_results: usize,
) -> Result<(), Error> {
) -> Result<Response, Error> {
println!("get_read_write_many_response trigger 1");
let mut results: Vec<Result<(), Error>> = Vec::new();

for _ in 0..num_results {
// The server will send a response for each key set, this parses each response into a Result or Error
// which can be dealt with accordingly.
let result = match self.drive_receive(parse_ascii_response).await? {
Response::Status(Status::Stored) => Ok(()),
Response::Status(s) => Err(s.into()),
Expand All @@ -127,7 +129,7 @@ impl Client {
}

println!("get_read_write_many_response trigger 2");
Ok(())
Ok(Response::Status(Status::Stored))
}

pub(crate) async fn get_metadump_response(&mut self) -> Result<MetadumpResponse, Error> {
Expand Down Expand Up @@ -254,10 +256,15 @@ impl Client {
K: AsRef<[u8]>,
V: AsMemcachedValue,
{
println!("set_multi trigger 1");
let mut kv_iter = kv.into_iter().peekable();

if kv_iter.peek().is_none() {
return Ok(());
}

let mut num_results = 0;

for (key, value) in kv {
for (key, value) in kv_iter {
let kr = key.as_ref();
let vr = value.as_bytes();

Expand All @@ -284,7 +291,11 @@ impl Client {
}
self.conn.flush().await?;

self.get_write_multi_responses(num_results).await
match self.get_write_multi_responses(num_results).await? {
Response::Status(Status::Stored) => Ok(()),
Response::Status(s) => Err(s.into()),
_ => Err(Status::Error(ErrorKind::Protocol(None)).into()),
}
}

/// Add a key. If the value exists, Err(Protocol(NotStored)) is returned.
Expand Down
2 changes: 0 additions & 2 deletions src/parser/ascii.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,6 @@ fn parse_ascii_data(buf: &[u8]) -> IResult<&[u8], Response> {
}

pub fn parse_ascii_response(buf: &[u8]) -> Result<Option<(usize, Response)>, ErrorKind> {
println!("parse_ascii_response trigger 1");
println!("buf: {:?}", String::from_utf8(buf.to_vec()));
let bufn = buf.len();
let result = alt((
parse_ascii_status,
Expand Down
18 changes: 2 additions & 16 deletions tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,21 +118,11 @@ async fn test_set_with_u64_value() {

let result = client.set(key, value, None, None).await;

println!("result: {:?}", result);
assert!(result.is_ok());

let result = client.get(key).await;

assert!(result.is_ok());

let get_result = result.unwrap();

println!("get_result: {:?}", get_result);

println!(
"get_result.unwrap().data: {:?}",
String::from_utf8(get_result.unwrap().data).unwrap()
);
}

#[ignore = "Relies on a running memcached server"]
Expand Down Expand Up @@ -225,9 +215,7 @@ async fn test_set_multi_with_string_values() {

let mut client = setup_client(keys).await;

let response = client.set_multi(kv, None, None).await;

println!("response: {:?}", response);
let _ = client.set_multi(kv, None, None).await;

let result = client.get("key2").await;

Expand All @@ -248,9 +236,7 @@ async fn test_set_multi_with_large_string_values() {

let mut client = setup_client(keys).await;

let response = client.set_multi(kv, None, None).await;

println!("response: {:?}", response);
let _ = client.set_multi(kv, None, None).await;

let result = client.get("key2").await;

Expand Down
110 changes: 56 additions & 54 deletions tests/resiliency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,58 +124,60 @@ mod tests {
});
}

// #[test]
// fn test_set_multi_errors_with_toxiproxy_via_limit_data() {
// let rt = tokio::runtime::Builder::new_multi_thread()
// .enable_all()
// .build()
// .unwrap();

// let (proxies, toxic_local_addr) = create_proxies_and_configs();

// let toxic_local_url = "tcp://".to_string() + &toxic_local_addr;
// let toxic_proxy = &proxies[0];
// let keys = vec!["newkey1", "newkey2", "newkey3"];
// let values = vec!["value1", "value2", "value3"];
// let kv: Vec<(&str, &str)> = keys.clone().into_iter().zip(values.clone()).collect();

// let mut clean_client = rt.block_on(async {
// async_memcached::Client::new("tcp://127.0.0.1:11211".to_string()).await.unwrap()
// });

// let mut toxic_client = rt.block_on(async {
// async_memcached::Client::new(toxic_local_url).await.unwrap()
// });

// let result = rt.block_on(async {
// clean_client.set_multi(kv, None, None).await
// });

// assert!(result.is_ok());

// for key in &keys {
// let _ = rt.block_on(async { clean_client.delete(key).await });
// let result = rt.block_on(async { clean_client.get(key).await });
// assert_eq!(result, Ok(None));
// }

// // let byte_limit = format!(
// // "set {} 0 0 1\r\n{}\r\n",
// // keys.first().unwrap(),
// // values.first().unwrap()
// // ).as_bytes().len()- 1;

// let byte_limit = 3;
// println!("byte_limit: {}", byte_limit);

// let _ = toxic_proxy.with_limit_data("downstream".into(), byte_limit as u32, 1.0)
// .apply(|| {
// rt.block_on(async {
// let kv: Vec<(&str, &str)> = keys.clone().into_iter().zip(values.clone()).collect();
// let result = toxic_client.set_multi(kv.clone(), None, None).await;
// println!("result: {:?}", result);
// assert_eq!(result, Err(async_memcached::Error::Io(std::io::ErrorKind::UnexpectedEof.into())));
// });
// });
// }
#[ignore = "Relies on a running memcached server and toxiproxy service"]
#[test]
fn test_set_multi_errors_with_toxiproxy_via_limit_data() {
// This test simulates a network error where the response from the server is cut short
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();

let (proxies, toxic_local_addr) = create_proxies_and_configs();

let toxic_local_url = "tcp://".to_string() + &toxic_local_addr;
let toxic_proxy = &proxies[0];
let keys = vec!["newkey1", "newkey2", "newkey3"];
let values = vec!["value1", "value2", "value3"];
let kv: Vec<(&str, &str)> = keys.clone().into_iter().zip(values.clone()).collect();

let mut clean_client = rt.block_on(async {
async_memcached::Client::new("tcp://127.0.0.1:11211".to_string())
.await
.unwrap()
});

let mut toxic_client =
rt.block_on(async { async_memcached::Client::new(toxic_local_url).await.unwrap() });

let result = rt.block_on(async { clean_client.set_multi(kv, None, None).await });

assert!(result.is_ok());

for key in &keys {
let _ = rt.block_on(async { clean_client.delete(key).await });
let result = rt.block_on(async { clean_client.get(key).await });
assert_eq!(result, Ok(None));
}

// Simulate a network error where a complete response is received for the first key but then
// the connection is closed before the other responses are received.
let byte_limit = "STORED\r\n".as_bytes().len() + 1;

let _ = toxic_proxy
.with_limit_data("downstream".into(), byte_limit as u32, 1.0)
.apply(|| {
rt.block_on(async {
let kv: Vec<(&str, &str)> =
keys.clone().into_iter().zip(values.clone()).collect();
let result = toxic_client.set_multi(kv.clone(), None, None).await;
assert_eq!(
result,
Err(async_memcached::Error::Io(
std::io::ErrorKind::UnexpectedEof.into()
))
);
});
});
}
}

0 comments on commit 610ddf1

Please sign in to comment.