From 610ddf10fbd3beab32a01eef75de400cda58abd7 Mon Sep 17 00:00:00 2001 From: Mark Rattle Date: Fri, 6 Sep 2024 15:41:26 -0400 Subject: [PATCH] Complete second resiliency test and cleanup --- src/lib.rs | 21 +++++++-- src/parser/ascii.rs | 2 - tests/lib.rs | 18 +------- tests/resiliency.rs | 110 ++++++++++++++++++++++---------------------- 4 files changed, 74 insertions(+), 77 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index b9607b6..904b987 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -101,11 +101,13 @@ impl Client { pub(crate) async fn get_write_multi_responses( &mut self, num_results: usize, - ) -> Result<(), Error> { + ) -> Result { println!("get_read_write_many_response trigger 1"); let mut results: Vec> = Vec::new(); for _ in 0..num_results { + // The server will send a response for each key set, this parses each response into a Result or Error + // which can be dealt with accordingly. let result = match self.drive_receive(parse_ascii_response).await? { Response::Status(Status::Stored) => Ok(()), Response::Status(s) => Err(s.into()), @@ -127,7 +129,7 @@ impl Client { } println!("get_read_write_many_response trigger 2"); - Ok(()) + Ok(Response::Status(Status::Stored)) } pub(crate) async fn get_metadump_response(&mut self) -> Result { @@ -254,10 +256,15 @@ impl Client { K: AsRef<[u8]>, V: AsMemcachedValue, { - println!("set_multi trigger 1"); + let mut kv_iter = kv.into_iter().peekable(); + + if kv_iter.peek().is_none() { + return Ok(()); + } + let mut num_results = 0; - for (key, value) in kv { + for (key, value) in kv_iter { let kr = key.as_ref(); let vr = value.as_bytes(); @@ -284,7 +291,11 @@ impl Client { } self.conn.flush().await?; - self.get_write_multi_responses(num_results).await + match self.get_write_multi_responses(num_results).await? { + Response::Status(Status::Stored) => Ok(()), + Response::Status(s) => Err(s.into()), + _ => Err(Status::Error(ErrorKind::Protocol(None)).into()), + } } /// Add a key. If the value exists, Err(Protocol(NotStored)) is returned. diff --git a/src/parser/ascii.rs b/src/parser/ascii.rs index 21b1531..4837e60 100644 --- a/src/parser/ascii.rs +++ b/src/parser/ascii.rs @@ -119,8 +119,6 @@ fn parse_ascii_data(buf: &[u8]) -> IResult<&[u8], Response> { } pub fn parse_ascii_response(buf: &[u8]) -> Result, ErrorKind> { - println!("parse_ascii_response trigger 1"); - println!("buf: {:?}", String::from_utf8(buf.to_vec())); let bufn = buf.len(); let result = alt(( parse_ascii_status, diff --git a/tests/lib.rs b/tests/lib.rs index a3cb9bf..0ca4e5c 100644 --- a/tests/lib.rs +++ b/tests/lib.rs @@ -118,21 +118,11 @@ async fn test_set_with_u64_value() { let result = client.set(key, value, None, None).await; - println!("result: {:?}", result); assert!(result.is_ok()); let result = client.get(key).await; assert!(result.is_ok()); - - let get_result = result.unwrap(); - - println!("get_result: {:?}", get_result); - - println!( - "get_result.unwrap().data: {:?}", - String::from_utf8(get_result.unwrap().data).unwrap() - ); } #[ignore = "Relies on a running memcached server"] @@ -225,9 +215,7 @@ async fn test_set_multi_with_string_values() { let mut client = setup_client(keys).await; - let response = client.set_multi(kv, None, None).await; - - println!("response: {:?}", response); + let _ = client.set_multi(kv, None, None).await; let result = client.get("key2").await; @@ -248,9 +236,7 @@ async fn test_set_multi_with_large_string_values() { let mut client = setup_client(keys).await; - let response = client.set_multi(kv, None, None).await; - - println!("response: {:?}", response); + let _ = client.set_multi(kv, None, None).await; let result = client.get("key2").await; diff --git a/tests/resiliency.rs b/tests/resiliency.rs index c583f28..7a58545 100644 --- a/tests/resiliency.rs +++ b/tests/resiliency.rs @@ -124,58 +124,60 @@ mod tests { }); } - // #[test] - // fn test_set_multi_errors_with_toxiproxy_via_limit_data() { - // let rt = tokio::runtime::Builder::new_multi_thread() - // .enable_all() - // .build() - // .unwrap(); - - // let (proxies, toxic_local_addr) = create_proxies_and_configs(); - - // let toxic_local_url = "tcp://".to_string() + &toxic_local_addr; - // let toxic_proxy = &proxies[0]; - // let keys = vec!["newkey1", "newkey2", "newkey3"]; - // let values = vec!["value1", "value2", "value3"]; - // let kv: Vec<(&str, &str)> = keys.clone().into_iter().zip(values.clone()).collect(); - - // let mut clean_client = rt.block_on(async { - // async_memcached::Client::new("tcp://127.0.0.1:11211".to_string()).await.unwrap() - // }); - - // let mut toxic_client = rt.block_on(async { - // async_memcached::Client::new(toxic_local_url).await.unwrap() - // }); - - // let result = rt.block_on(async { - // clean_client.set_multi(kv, None, None).await - // }); - - // assert!(result.is_ok()); - - // for key in &keys { - // let _ = rt.block_on(async { clean_client.delete(key).await }); - // let result = rt.block_on(async { clean_client.get(key).await }); - // assert_eq!(result, Ok(None)); - // } - - // // let byte_limit = format!( - // // "set {} 0 0 1\r\n{}\r\n", - // // keys.first().unwrap(), - // // values.first().unwrap() - // // ).as_bytes().len()- 1; - - // let byte_limit = 3; - // println!("byte_limit: {}", byte_limit); - - // let _ = toxic_proxy.with_limit_data("downstream".into(), byte_limit as u32, 1.0) - // .apply(|| { - // rt.block_on(async { - // let kv: Vec<(&str, &str)> = keys.clone().into_iter().zip(values.clone()).collect(); - // let result = toxic_client.set_multi(kv.clone(), None, None).await; - // println!("result: {:?}", result); - // assert_eq!(result, Err(async_memcached::Error::Io(std::io::ErrorKind::UnexpectedEof.into()))); - // }); - // }); - // } + #[ignore = "Relies on a running memcached server and toxiproxy service"] + #[test] + fn test_set_multi_errors_with_toxiproxy_via_limit_data() { + // This test simulates a network error where the response from the server is cut short + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + + let (proxies, toxic_local_addr) = create_proxies_and_configs(); + + let toxic_local_url = "tcp://".to_string() + &toxic_local_addr; + let toxic_proxy = &proxies[0]; + let keys = vec!["newkey1", "newkey2", "newkey3"]; + let values = vec!["value1", "value2", "value3"]; + let kv: Vec<(&str, &str)> = keys.clone().into_iter().zip(values.clone()).collect(); + + let mut clean_client = rt.block_on(async { + async_memcached::Client::new("tcp://127.0.0.1:11211".to_string()) + .await + .unwrap() + }); + + let mut toxic_client = + rt.block_on(async { async_memcached::Client::new(toxic_local_url).await.unwrap() }); + + let result = rt.block_on(async { clean_client.set_multi(kv, None, None).await }); + + assert!(result.is_ok()); + + for key in &keys { + let _ = rt.block_on(async { clean_client.delete(key).await }); + let result = rt.block_on(async { clean_client.get(key).await }); + assert_eq!(result, Ok(None)); + } + + // Simulate a network error where a complete response is received for the first key but then + // the connection is closed before the other responses are received. + let byte_limit = "STORED\r\n".as_bytes().len() + 1; + + let _ = toxic_proxy + .with_limit_data("downstream".into(), byte_limit as u32, 1.0) + .apply(|| { + rt.block_on(async { + let kv: Vec<(&str, &str)> = + keys.clone().into_iter().zip(values.clone()).collect(); + let result = toxic_client.set_multi(kv.clone(), None, None).await; + assert_eq!( + result, + Err(async_memcached::Error::Io( + std::io::ErrorKind::UnexpectedEof.into() + )) + ); + }); + }); + } }