From 29b2adb656886774ffb3e87caeb9c319b26f1fb0 Mon Sep 17 00:00:00 2001 From: Mark Rattle Date: Wed, 28 Aug 2024 06:57:01 -0400 Subject: [PATCH] remove scratch files --- benches/bench.rs | 53 +- src/lib.rs | 45 ++ src/lib_with_parseinput.rs | 756 ----------------------------- src/lib_with_setadd_enum.rs | 744 ---------------------------- 4 files changed, 89 insertions(+), 1509 deletions(-) delete mode 100644 src/lib_with_parseinput.rs delete mode 100644 src/lib_with_setadd_enum.rs diff --git a/benches/bench.rs b/benches/bench.rs index f9a4cd9..46ff59b 100644 --- a/benches/bench.rs +++ b/benches/bench.rs @@ -33,10 +33,10 @@ fn bench_get(c: &mut Criterion) { }); } -fn bench_set_with_string(c: &mut Criterion) { +fn bench_parsed_set_with_string(c: &mut Criterion) { let rt = Runtime::new().unwrap(); - c.bench_function("set_small_string_with_parse_input", |b| { + c.bench_function("new_set_small_with_string", |b| { b.to_async(&rt).iter_custom(|iters| async move { let mut client = setup_client().await; let start = std::time::Instant::now(); @@ -48,10 +48,10 @@ fn bench_set_with_string(c: &mut Criterion) { }); } -fn bench_set_with_u64(c: &mut Criterion) { +fn bench_parsed_set_with_u64(c: &mut Criterion) { let rt = Runtime::new().unwrap(); - c.bench_function("set_small_int", |b| { + c.bench_function("new_set_small_with_int", |b| { b.to_async(&rt).iter_custom(|iters| async move { let mut client = setup_client().await; let start = std::time::Instant::now(); @@ -63,6 +63,21 @@ fn bench_set_with_u64(c: &mut Criterion) { }); } +fn bench_original_set_with_string(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + + c.bench_function("original_set_small_string", |b| { + b.to_async(&rt).iter_custom(|iters| async move { + let mut client = setup_client().await; + let start = std::time::Instant::now(); + for _ in 0..iters { + let _ = client.original_set("foo", "bar", None, None).await; + } + start.elapsed() + }); + }); +} + fn bench_get_many(c: &mut Criterion) { let rt = Runtime::new().unwrap(); let keys = &["foo", "bar", "baz"]; @@ -89,10 +104,10 @@ fn bench_get_many(c: &mut Criterion) { }); } -fn bench_set_large(c: &mut Criterion) { +fn bench_new_set_large_with_string(c: &mut Criterion) { let rt = Runtime::new().unwrap(); - c.bench_function("set_large", |b| { + c.bench_function("new_set_large_with_string", |b| { b.to_async(&rt).iter_custom(|iters| async move { let mut client = setup_client().await; let large_payload = "a".repeat(LARGE_PAYLOAD_SIZE); @@ -107,6 +122,24 @@ fn bench_set_large(c: &mut Criterion) { }); } +fn bench_original_set_large_with_string(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + + c.bench_function("original_set_large_with_string", |b| { + b.to_async(&rt).iter_custom(|iters| async move { + let mut client = setup_client().await; + let large_payload = "a".repeat(LARGE_PAYLOAD_SIZE); + let start = std::time::Instant::now(); + for _ in 0..iters { + let _ = client + .original_set("large_foo", large_payload.as_str(), None, None) + .await; + } + start.elapsed() + }); + }); +} + fn bench_get_large(c: &mut Criterion) { let rt = Runtime::new().unwrap(); let large_payload = "a".repeat(LARGE_PAYLOAD_SIZE); @@ -181,10 +214,12 @@ fn bench_increment(c: &mut Criterion) { criterion_group!( benches, bench_get, - bench_set_with_string, - bench_set_with_u64, + bench_parsed_set_with_string, + bench_parsed_set_with_u64, + bench_original_set_with_string, bench_get_many, - bench_set_large, + bench_new_set_large_with_string, + bench_original_set_large_with_string, bench_get_large, bench_get_many_large, bench_increment, diff --git a/src/lib.rs b/src/lib.rs index 3eeeef7..b99dd69 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -205,6 +205,51 @@ impl Client { } } + /// Sets the given key. + /// + /// If `ttl` or `flags` are not specified, they will default to 0. If the value is set + /// successfully, `()` is returned, otherwise [`Error`] is returned. + pub async fn original_set( + &mut self, + key: K, + value: V, + ttl: Option, + flags: Option, + ) -> Result<(), Error> + where + K: AsRef<[u8]>, + V: AsRef<[u8]>, + { + let kr = key.as_ref(); + let vr = value.as_ref(); + + self.conn.write_all(b"set ").await?; + self.conn.write_all(kr).await?; + + let flags = flags.unwrap_or(0).to_string(); + self.conn.write_all(b" ").await?; + self.conn.write_all(flags.as_ref()).await?; + + let ttl = ttl.unwrap_or(0).to_string(); + self.conn.write_all(b" ").await?; + self.conn.write_all(ttl.as_ref()).await?; + + self.conn.write_all(b" ").await?; + let vlen = vr.len().to_string(); + self.conn.write_all(vlen.as_ref()).await?; + self.conn.write_all(b"\r\n").await?; + + self.conn.write_all(vr).await?; + self.conn.write_all(b"\r\n").await?; + self.conn.flush().await?; + + match self.get_read_write_response().await? { + Response::Status(Status::Stored) => Ok(()), + Response::Status(s) => Err(s.into()), + _ => Err(Status::Error(ErrorKind::Protocol(None)).into()), + } + } + /// Add a key. If the value exists, Err(Protocol(NotStored)) is returned. pub async fn add( &mut self, diff --git a/src/lib_with_parseinput.rs b/src/lib_with_parseinput.rs deleted file mode 100644 index ae6617f..0000000 --- a/src/lib_with_parseinput.rs +++ /dev/null @@ -1,756 +0,0 @@ -//! A Tokio-based memcached client. -#![deny(warnings, missing_docs)] -use std::collections::HashMap; - -use bytes::BytesMut; -use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt}; - -mod connection; -use self::connection::Connection; - -mod error; -pub use self::error::Error; - -mod parser; -use self::parser::{ - parse_ascii_metadump_response, parse_ascii_response, parse_ascii_stats_response, Response, -}; -pub use self::parser::{ErrorKind, KeyMetadata, MetadumpResponse, StatsResponse, Status, Value}; - -/// High-level memcached client. -/// -/// [`Client`] is mapped one-to-one with a given connection to a memcached server, and provides a -/// high-level API for executing commands on that connection. -pub struct Client { - buf: BytesMut, - last_read_n: Option, - conn: Connection, -} - -/// A trait for parsing input of either u8 or str into ascii. -pub trait ParseInput { - /// Parses the input into the target type. - fn parse_input(&self) -> Vec; -} - -impl ParseInput for u64 { - fn parse_input(&self) -> Vec { - self.to_string().into_bytes() - } -} - -impl ParseInput for String { - fn parse_input(&self) -> Vec { - self.as_bytes().to_vec() - } -} - -impl Client { - /// Creates a new [`Client`] based on the given data source string. - /// - /// Supports UNIX domain sockets and TCP connections. - /// For TCP: the DSN should be in the format of `tcp://:` or `:`. - /// For UNIX: the DSN should be in the format of `unix://`. - pub async fn new>(dsn: S) -> Result { - let connection = Connection::new(dsn).await?; - - Ok(Client { - buf: BytesMut::new(), - last_read_n: None, - conn: connection, - }) - } - - pub(crate) async fn drive_receive(&mut self, op: F) -> Result - where - F: Fn(&[u8]) -> Result, ErrorKind>, - { - // If we serviced a previous request, advance our buffer forward. - if let Some(n) = self.last_read_n { - let _ = self.buf.split_to(n); - } - - let mut needs_more_data = false; - loop { - if self.buf.is_empty() || needs_more_data { - match self.conn { - Connection::Tcp(ref mut s) => { - self.buf.reserve(1024); - let n = s.read_buf(&mut self.buf).await?; - if n == 0 { - return Err(Error::Io(std::io::ErrorKind::UnexpectedEof.into())); - } - } - Connection::Unix(ref mut s) => { - self.buf.reserve(1024); - let n = s.read_buf(&mut self.buf).await?; - if n == 0 { - return Err(Error::Io(std::io::ErrorKind::UnexpectedEof.into())); - } - } - } - } - - // Try and parse out a response. - match op(&self.buf) { - // We got a response. - Ok(Some((n, response))) => { - self.last_read_n = Some(n); - return Ok(response); - } - // We didn't have enough data, so loop around and try again. - Ok(None) => { - needs_more_data = true; - continue; - } - // Invalid data not matching the protocol. - Err(kind) => return Err(Status::Error(kind).into()), - } - } - } - - pub(crate) async fn get_read_write_response(&mut self) -> Result { - self.drive_receive(parse_ascii_response).await - } - - pub(crate) async fn get_incrdecr_response(&mut self) -> Result { - self.drive_receive(parse_ascii_response).await - } - - pub(crate) async fn get_metadump_response(&mut self) -> Result { - self.drive_receive(parse_ascii_metadump_response).await - } - - pub(crate) async fn get_stats_response(&mut self) -> Result { - self.drive_receive(parse_ascii_stats_response).await - } - - /// Gets the given key. - /// - /// If the key is found, `Some(Value)` is returned, describing the metadata and data of the key. - /// - /// Otherwise, [`Error`] is returned. - pub async fn get>(&mut self, key: K) -> Result, Error> { - self.conn.write_all(b"get ").await?; - self.conn.write_all(key.as_ref()).await?; - self.conn.write_all(b"\r\n").await?; - self.conn.flush().await?; - - match self.get_read_write_response().await? { - Response::Status(Status::NotFound) => Ok(None), - Response::Status(s) => Err(s.into()), - Response::Data(d) => d - .map(|mut items| { - if items.len() != 1 { - Err(Status::Error(ErrorKind::Protocol(None)).into()) - } else { - Ok(items.remove(0)) - } - }) - .transpose(), - _ => Err(Error::Protocol(Status::Error(ErrorKind::Protocol(None)))), - } - } - - /// Gets the given keys. - /// - /// If any of the keys are found, a vector of [`Value`] will be returned, where [`Value`] - /// describes the metadata and data of the key. - /// - /// Otherwise, [`Error`] is returned. - pub async fn get_many(&mut self, keys: I) -> Result, Error> - where - I: IntoIterator, - K: AsRef<[u8]>, - { - self.conn.write_all(b"get ").await?; - for key in keys.into_iter() { - self.conn.write_all(key.as_ref()).await?; - self.conn.write_all(b" ").await?; - } - self.conn.write_all(b"\r\n").await?; - self.conn.flush().await?; - - match self.get_read_write_response().await? { - Response::Status(s) => Err(s.into()), - Response::Data(d) => d.ok_or(Status::NotFound.into()), - _ => Err(Status::Error(ErrorKind::Protocol(None)).into()), - } - } - - /// Sets the given key. - /// - /// If `ttl` or `flags` are not specified, they will default to 0. If the value is set - /// successfully, `()` is returned, otherwise [`Error`] is returned. - pub async fn set( - &mut self, - key: K, - value: V, - ttl: Option, - flags: Option, - ) -> Result<(), Error> - where - K: AsRef<[u8]>, - V: ParseInput, - { - let kr = key.as_ref(); - let vr = value.parse_input(); - - self.conn.write_all(b"set ").await?; - self.conn.write_all(kr).await?; - - let flags = flags.unwrap_or(0).to_string(); - self.conn.write_all(b" ").await?; - self.conn.write_all(flags.as_ref()).await?; - - let ttl = ttl.unwrap_or(0).to_string(); - self.conn.write_all(b" ").await?; - self.conn.write_all(ttl.as_ref()).await?; - - self.conn.write_all(b" ").await?; - let vlen = vr.len().to_string(); - self.conn.write_all(vlen.as_ref()).await?; - self.conn.write_all(b"\r\n").await?; - - self.conn.write_all(&vr).await?; - self.conn.write_all(b"\r\n").await?; - self.conn.flush().await?; - - match self.get_read_write_response().await? { - Response::Status(Status::Stored) => Ok(()), - Response::Status(s) => Err(s.into()), - _ => Err(Status::Error(ErrorKind::Protocol(None)).into()), - } - } - - /// Add a key. If the value exists, Err(Protocol(NotStored)) is returned. - pub async fn add( - &mut self, - key: K, - value: V, - ttl: Option, - flags: Option, - ) -> Result<(), Error> - where - K: AsRef<[u8]>, - V: AsRef<[u8]>, - { - let kr = key.as_ref(); - let vr = value.as_ref(); - - self.conn - .write_all( - &[ - b"add ", - kr, - b" ", - flags.unwrap_or(0).to_string().as_ref(), - b" ", - ttl.unwrap_or(0).to_string().as_ref(), - b" ", - vr.len().to_string().as_ref(), - b"\r\n", - vr, - b"\r\n", - ] - .concat(), - ) - .await?; - self.conn.flush().await?; - - match self.get_read_write_response().await? { - Response::Status(Status::Stored) => Ok(()), - Response::Status(s) => Err(s.into()), - _ => Err(Status::Error(ErrorKind::Protocol(None)).into()), - } - } - - /// Delete a key but don't wait for a reply. - pub async fn delete_no_reply(&mut self, key: K) -> Result<(), Error> - where - K: AsRef<[u8]>, - { - let kr = key.as_ref(); - - self.conn - .write_all(&[b"delete ", kr, b" noreply\r\n"].concat()) - .await?; - self.conn.flush().await?; - Ok(()) - } - - /// Delete a key and wait for a reply - pub async fn delete(&mut self, key: K) -> Result<(), Error> - where - K: AsRef<[u8]>, - { - let kr = key.as_ref(); - - self.conn - .write_all(&[b"delete ", kr, b"\r\n"].concat()) - .await?; - self.conn.flush().await?; - - match self.get_read_write_response().await? { - Response::Status(Status::Deleted) => Ok(()), - Response::Status(s) => Err(s.into()), - _ => Err(Status::Error(ErrorKind::Protocol(None)).into()), - } - } - - /// Increments the given key by the specified amount. - /// Can overflow from the max value of u64 (18446744073709551615) -> 0. - /// Returns the new value of the key if key exists, otherwise returns KeyNotFound error. - pub async fn increment(&mut self, key: K, amount: u64) -> Result - where - K: AsRef<[u8]>, - { - self.conn - .write_all( - &[ - b"incr ", - key.as_ref(), - b" ", - amount.to_string().as_bytes(), - b"\r\n", - ] - .concat(), - ) - .await?; - self.conn.flush().await?; - - match self.get_incrdecr_response().await? { - Response::Status(Status::NotFound) => Err(Error::KeyNotFound), - Response::Status(s) => Err(s.into()), - Response::IncrDecr(amount) => Ok(amount), - _ => Err(Error::Protocol(Status::Error(ErrorKind::Protocol(None)))), - } - } - - /// Increments the given key by the specified amount with no reply from the server. - /// /// Can overflow from the max value of u64 (18446744073709551615) -> 0. - pub async fn increment_no_reply(&mut self, key: K, amount: u64) -> Result<(), Error> - where - K: AsRef<[u8]>, - { - self.conn - .write_all( - &[ - b"incr ", - key.as_ref(), - b" ", - amount.to_string().as_bytes(), - b" noreply\r\n", - ] - .concat(), - ) - .await?; - self.conn.flush().await?; - - Ok(()) - } - - /// Decrements the given key by the specified amount. - /// Will not decrement the counter below 0. - pub async fn decrement(&mut self, key: K, amount: u64) -> Result - where - K: AsRef<[u8]>, - { - self.conn - .write_all( - &[ - b"decr ", - key.as_ref(), - b" ", - amount.to_string().as_bytes(), - b"\r\n", - ] - .concat(), - ) - .await?; - self.conn.flush().await?; - - match self.get_incrdecr_response().await? { - Response::Status(Status::NotFound) => Err(Error::KeyNotFound), - Response::Status(s) => Err(s.into()), - Response::IncrDecr(amount) => Ok(amount), - _ => Err(Error::Protocol(Status::Error(ErrorKind::Protocol(None)))), - } - } - - /// Decrements the given key by the specified amount with no reply from the server. - /// Will not decrement the counter below 0. - /// Returns the new value of the key if key exists, otherwise returns KeyNotFound error. - pub async fn decrement_no_reply(&mut self, key: K, amount: u64) -> Result<(), Error> - where - K: AsRef<[u8]>, - { - self.conn - .write_all( - &[ - b"decr ", - key.as_ref(), - b" ", - amount.to_string().as_bytes(), - b" noreply\r\n", - ] - .concat(), - ) - .await?; - self.conn.flush().await?; - - Ok(()) - } - - /// Gets the version of the server. - /// - /// If the version is retrieved successfully, `String` is returned containing the version - /// component e.g. `1.6.7`, otherwise [`Error`] is returned. - /// - /// For some setups, such as those using Twemproxy, this will return an error as those - /// intermediate proxies do not support the version command. - pub async fn version(&mut self) -> Result { - self.conn.write_all(b"version\r\n").await?; - self.conn.flush().await?; - - let mut version = String::new(); - let bytes = self.conn.read_line(&mut version).await?; - - // Peel off the leading "VERSION " header. - if bytes >= 8 && version.is_char_boundary(8) { - Ok(version.split_off(8)) - } else { - Err(Error::from(Status::Error(ErrorKind::Protocol(Some( - format!("Invalid response for `version` command: `{version}`"), - ))))) - } - } - - /// Dumps all keys from the server. - /// - /// This operation scans all slab classes from tail to head, in a non-blocking fashion. Thus, - /// not all items will be found as new items could be inserted or deleted while the crawler is - /// still running. - /// - /// [`MetadumpIter`] must be iterated over to discover whether or not the crawler successfully - /// started, as this call will only return [`Error`] if the command failed to be written to the - /// server at all. - /// - /// Available as of memcached 1.4.31. - pub async fn dump_keys(&mut self) -> Result, Error> { - self.conn.write_all(b"lru_crawler metadump all\r\n").await?; - self.conn.flush().await?; - - Ok(MetadumpIter { - client: self, - done: false, - }) - } - - /// Collects statistics from the server. - /// - /// The statistics that may be returned are detailed in the protocol specification for - /// memcached, but all values returned by this method are returned as strings and are not - /// further interpreted or validated for conformity. - pub async fn stats(&mut self) -> Result, Error> { - let mut entries = HashMap::new(); - - self.conn.write_all(b"stats\r\n").await?; - self.conn.flush().await?; - - while let StatsResponse::Entry(key, value) = self.get_stats_response().await? { - entries.insert(key, value); - } - - Ok(entries) - } -} - -/// Asynchronous iterator for metadump operations. -pub struct MetadumpIter<'a> { - client: &'a mut Client, - done: bool, -} - -impl<'a> MetadumpIter<'a> { - /// Gets the next result for the current operation. - /// - /// If there is another key in the dump, `Some(Ok(KeyMetadata))` will be returned. If there was - /// an error while attempting to start the metadump operation, or if there was a general - /// network/protocol-level error, `Some(Err(Error))` will be returned. - /// - /// Otherwise, `None` will be returned and signals the end of the iterator. Subsequent calls - /// will return `None`. - pub async fn next(&mut self) -> Option> { - if self.done { - return None; - } - - match self.client.get_metadump_response().await { - Ok(MetadumpResponse::End) => { - self.done = true; - None - } - Ok(MetadumpResponse::BadClass(s)) => { - self.done = true; - Some(Err(Error::Protocol(MetadumpResponse::BadClass(s).into()))) - } - Ok(MetadumpResponse::Busy(s)) => { - Some(Err(Error::Protocol(MetadumpResponse::Busy(s).into()))) - } - Ok(MetadumpResponse::Entry(km)) => Some(Ok(km)), - Err(e) => Some(Err(e)), - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[ignore = "Relies on a running memcached server"] - #[tokio::test] - async fn test_add() { - let mut client = Client::new("tcp://127.0.0.1:11211") - .await - .expect("Failed to connect to server"); - - let key = "async-memcache-test-key-add"; - - let result = client.delete_no_reply(key).await; - assert!(result.is_ok(), "failed to delete {}, {:?}", key, result); - - let result = client.add(key, "value", None, None).await; - - assert!(result.is_ok()); - } - - #[ignore = "Relies on a running memcached server"] - #[tokio::test] - async fn test_delete() { - let mut client = Client::new("tcp://127.0.0.1:11211") - .await - .expect("Failed to connect to server"); - - let key = "async-memcache-test-key-delete"; - - let value = rand::random::().to_string(); - let result = client.set(key, value.clone(), None, None).await; - - assert!(result.is_ok(), "failed to set {}, {:?}", key, result); - - let result = client.get(key).await; - - assert!(result.is_ok(), "failed to get {}, {:?}", key, result); - let get_result = result.unwrap(); - - match get_result { - Some(get_value) => assert_eq!( - String::from_utf8(get_value.data).expect("failed to parse a string"), - value - ), - None => panic!("failed to get {}", key), - } - - let result = client.delete(key).await; - - assert!(result.is_ok(), "failed to delete {}, {:?}", key, result); - } - - #[ignore = "Relies on a running memcached server"] - #[tokio::test] - async fn test_delete_no_reply() { - let mut client = Client::new("tcp://127.0.0.1:11211") - .await - .expect("Failed to connect to server"); - - let key = "async-memcache-test-key-delete-no-reply"; - - let value = rand::random::().to_string(); - let result = client.set(key, value.clone(), None, None).await; - - assert!(result.is_ok(), "failed to set {}, {:?}", key, result); - - let result = client.get(key).await; - - assert!(result.is_ok(), "failed to get {}, {:?}", key, result); - let get_result = result.unwrap(); - - match get_result { - Some(get_value) => assert_eq!( - String::from_utf8(get_value.data).expect("failed to parse a string"), - value - ), - None => panic!("failed to get {}", key), - } - - let result = client.delete_no_reply(key).await; - - assert!(result.is_ok(), "failed to delete {}, {:?}", key, result); - } - - #[ignore = "Relies on a running memcached server"] - #[tokio::test] - async fn test_increment_raises_error_when_key_doesnt_exist() { - let mut client = Client::new("tcp://127.0.0.1:11211") - .await - .expect("Failed to connect to server"); - - let key = "key-does-not-exist"; - let amount = 1; - - let result = client.increment(key, amount).await; - - assert!(result.is_err()); - assert!(matches!(result, Err(Error::KeyNotFound))) - } - - #[ignore = "Relies on a running memcached server"] - #[tokio::test] - async fn test_increments_existing_key() { - let mut client = Client::new("tcp://127.0.0.1:11211") - .await - .expect("Failed to connect to server"); - - let key = "key-to-increment"; - let value = 1; - - let _ = client.set(key, value, None, None).await; - - let amount = 1; - - let result = client.increment(key, amount).await; - - println!("result: {:?}", result); - - assert!(result.is_ok()); - assert_eq!(Ok(2), result); - } - - #[ignore = "Relies on a running memcached server"] - #[tokio::test] - async fn test_increment_can_overflow() { - let mut client = Client::new("tcp://127.0.0.1:11211") - .await - .expect("Failed to connect to server"); - - let key = "key-to-increment-overflow"; - let value = u64::MAX.to_string(); // max value for u64 - - let _ = client.set(key, value, None, None).await; - - let amount = 1; - - // First increment should overflow - let result = client.increment(key, amount).await; - - println!("result: {:?}", result); - - assert!(result.is_ok()); - assert_eq!(Ok(0), result); - - // Subsequent increments should work as normal - let result = client.increment(key, amount).await; - - println!("result: {:?}", result); - - assert!(result.is_ok()); - assert_eq!(Ok(1), result); - } - - #[ignore = "Relies on a running memcached server"] - #[tokio::test] - async fn test_increments_existing_key_with_no_reply() { - let mut client = Client::new("tcp://127.0.0.1:11211") - .await - .expect("Failed to connect to server"); - - let key = "key-to-increment-no-reply"; - let value = "1"; - - let _ = client.set(key, value.to_string(), None, None).await; - - let amount = 1; - - let result = client.increment_no_reply(key, amount).await; - - assert!(result.is_ok()); - assert_eq!(Ok(()), result); - } - - #[ignore = "Relies on a running memcached server"] - #[tokio::test] - async fn test_decrement_raises_error_when_key_doesnt_exist() { - let mut client = Client::new("tcp://127.0.0.1:11211") - .await - .expect("Failed to connect to server"); - - let key = "fails-to-decrement"; - let amount = 1; - - let result = client.decrement(key, amount).await; - - assert!(result.is_err()); - assert!(matches!(result, Err(Error::KeyNotFound))) - } - - #[ignore = "Relies on a running memcached server"] - #[tokio::test] - async fn test_decrements_existing_key() { - let mut client = Client::new("tcp://127.0.0.1:11211") - .await - .expect("Failed to connect to server"); - - let key = "key-to-decrement"; - let value = 2; - - let _ = client.set(key, value, None, None).await; - - let amount = 1; - - let result = client.decrement(key, amount).await; - - assert!(result.is_ok()); - assert_eq!(Ok(1), result); - } - - #[ignore = "Relies on a running memcached server"] - #[tokio::test] - async fn test_decrement_does_not_reduce_value_below_zero() { - let mut client = Client::new("tcp://127.0.0.1:11211") - .await - .expect("Failed to connect to server"); - - let key = "key-to-decrement-past-zero"; - let value = 0; - - let _ = client.set(key, value, None, None).await; - - let amount = 1; - - let result = client.decrement(key, amount).await; - - assert!(result.is_ok()); - assert_eq!(Ok(0), result); - } - - #[ignore = "Relies on a running memcached server"] - #[tokio::test] - async fn test_decrements_existing_key_with_no_reply() { - let mut client = Client::new("tcp://127.0.0.1:11211") - .await - .expect("Failed to connect to server"); - - let key = "key-to-decrement-no-reply"; - let value = 1; - - let _ = client.set(key, value, None, None).await; - - let amount = 1; - - let result = client.decrement_no_reply(key, amount).await; - - assert!(result.is_ok()); - assert_eq!(Ok(()), result); - } -} diff --git a/src/lib_with_setadd_enum.rs b/src/lib_with_setadd_enum.rs deleted file mode 100644 index aade759..0000000 --- a/src/lib_with_setadd_enum.rs +++ /dev/null @@ -1,744 +0,0 @@ -//! A Tokio-based memcached client. -#![deny(warnings, missing_docs)] -use std::collections::HashMap; - -use bytes::BytesMut; -use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt}; - -mod connection; -use self::connection::Connection; - -mod error; -pub use self::error::Error; - -mod parser; -use self::parser::{ - parse_ascii_metadump_response, parse_ascii_response, parse_ascii_stats_response, Response, -}; -pub use self::parser::{ErrorKind, KeyMetadata, MetadumpResponse, StatsResponse, Status, Value}; - -/// High-level memcached client. -/// -/// [`Client`] is mapped one-to-one with a given connection to a memcached server, and provides a -/// high-level API for executing commands on that connection. -pub struct Client<'a> { - buf: BytesMut, - last_read_n: Option, - conn: Connection, - _phantom: std::marker::PhantomData<&'a ()>, -} - -enum SetAddInput<'a> { - Str(&'a str), - Int(u8), -} - -impl<'a> Client<'a> { - /// Creates a new [`Client`] based on the given data source string. - /// - /// Supports UNIX domain sockets and TCP connections. - /// For TCP: the DSN should be in the format of `tcp://:` or `:`. - /// For UNIX: the DSN should be in the format of `unix://`. - pub async fn new>(dsn: S) -> Result, Error> { - let connection = Connection::new(dsn).await?; - - Ok(Client { - buf: BytesMut::new(), - last_read_n: None, - conn: connection, - _phantom: std::marker::PhantomData, - }) - } - - pub(crate) async fn drive_receive(&mut self, op: F) -> Result - where - F: Fn(&[u8]) -> Result, ErrorKind>, - { - // If we serviced a previous request, advance our buffer forward. - if let Some(n) = self.last_read_n { - let _ = self.buf.split_to(n); - } - - let mut needs_more_data = false; - loop { - if self.buf.is_empty() || needs_more_data { - match self.conn { - Connection::Tcp(ref mut s) => { - self.buf.reserve(1024); - let n = s.read_buf(&mut self.buf).await?; - if n == 0 { - return Err(Error::Io(std::io::ErrorKind::UnexpectedEof.into())); - } - } - Connection::Unix(ref mut s) => { - self.buf.reserve(1024); - let n = s.read_buf(&mut self.buf).await?; - if n == 0 { - return Err(Error::Io(std::io::ErrorKind::UnexpectedEof.into())); - } - } - } - } - - // Try and parse out a response. - match op(&self.buf) { - // We got a response. - Ok(Some((n, response))) => { - self.last_read_n = Some(n); - return Ok(response); - } - // We didn't have enough data, so loop around and try again. - Ok(None) => { - needs_more_data = true; - continue; - } - // Invalid data not matching the protocol. - Err(kind) => return Err(Status::Error(kind).into()), - } - } - } - - pub(crate) async fn get_read_write_response(&mut self) -> Result { - self.drive_receive(parse_ascii_response).await - } - - pub(crate) async fn get_incrdecr_response(&mut self) -> Result { - self.drive_receive(parse_ascii_response).await - } - - pub(crate) async fn get_metadump_response(&mut self) -> Result { - self.drive_receive(parse_ascii_metadump_response).await - } - - pub(crate) async fn get_stats_response(&mut self) -> Result { - self.drive_receive(parse_ascii_stats_response).await - } - - /// Gets the given key. - /// - /// If the key is found, `Some(Value)` is returned, describing the metadata and data of the key. - /// - /// Otherwise, [`Error`] is returned. - pub async fn get>(&mut self, key: K) -> Result, Error> { - self.conn.write_all(b"get ").await?; - self.conn.write_all(key.as_ref()).await?; - self.conn.write_all(b"\r\n").await?; - self.conn.flush().await?; - - match self.get_read_write_response().await? { - Response::Status(Status::NotFound) => Ok(None), - Response::Status(s) => Err(s.into()), - Response::Data(d) => d - .map(|mut items| { - if items.len() != 1 { - Err(Status::Error(ErrorKind::Protocol(None)).into()) - } else { - Ok(items.remove(0)) - } - }) - .transpose(), - _ => Err(Error::Protocol(Status::Error(ErrorKind::Protocol(None)))), - } - } - - /// Gets the given keys. - /// - /// If any of the keys are found, a vector of [`Value`] will be returned, where [`Value`] - /// describes the metadata and data of the key. - /// - /// Otherwise, [`Error`] is returned. - pub async fn get_many(&mut self, keys: I) -> Result, Error> - where - I: IntoIterator, - K: AsRef<[u8]>, - { - self.conn.write_all(b"get ").await?; - for key in keys.into_iter() { - self.conn.write_all(key.as_ref()).await?; - self.conn.write_all(b" ").await?; - } - self.conn.write_all(b"\r\n").await?; - self.conn.flush().await?; - - match self.get_read_write_response().await? { - Response::Status(s) => Err(s.into()), - Response::Data(d) => d.ok_or(Status::NotFound.into()), - _ => Err(Status::Error(ErrorKind::Protocol(None)).into()), - } - } - - /// Sets the given key. - /// - /// If `ttl` or `flags` are not specified, they will default to 0. If the value is set - /// successfully, `()` is returned, otherwise [`Error`] is returned. - pub async fn set( - &mut self, - key: K, - value: SetAddInput<'a>, - ttl: Option, - flags: Option, - ) -> Result<(), Error> - where - K: AsRef<[u8]>, - { - let kr = key.as_ref(); - let vr = value; - - self.conn.write_all(b"set ").await?; - self.conn.write_all(kr).await?; - - let flags = flags.unwrap_or(0).to_string(); - self.conn.write_all(b" ").await?; - self.conn.write_all(flags.as_ref()).await?; - - let ttl = ttl.unwrap_or(0).to_string(); - self.conn.write_all(b" ").await?; - self.conn.write_all(ttl.as_ref()).await?; - - self.conn.write_all(b" ").await?; - let vlen = vr.len().to_string(); - self.conn.write_all(vlen.as_ref()).await?; - self.conn.write_all(b"\r\n").await?; - - self.conn.write_all(&vr).await?; - self.conn.write_all(b"\r\n").await?; - self.conn.flush().await?; - - match self.get_read_write_response().await? { - Response::Status(Status::Stored) => Ok(()), - Response::Status(s) => Err(s.into()), - _ => Err(Status::Error(ErrorKind::Protocol(None)).into()), - } - } - - /// Add a key. If the value exists, Err(Protocol(NotStored)) is returned. - pub async fn add( - &mut self, - key: K, - value: V, - ttl: Option, - flags: Option, - ) -> Result<(), Error> - where - K: AsRef<[u8]>, - V: AsRef<[u8]>, - { - let kr = key.as_ref(); - let vr = value.as_ref(); - - self.conn - .write_all( - &[ - b"add ", - kr, - b" ", - flags.unwrap_or(0).to_string().as_ref(), - b" ", - ttl.unwrap_or(0).to_string().as_ref(), - b" ", - vr.len().to_string().as_ref(), - b"\r\n", - vr, - b"\r\n", - ] - .concat(), - ) - .await?; - self.conn.flush().await?; - - match self.get_read_write_response().await? { - Response::Status(Status::Stored) => Ok(()), - Response::Status(s) => Err(s.into()), - _ => Err(Status::Error(ErrorKind::Protocol(None)).into()), - } - } - - /// Delete a key but don't wait for a reply. - pub async fn delete_no_reply(&mut self, key: K) -> Result<(), Error> - where - K: AsRef<[u8]>, - { - let kr = key.as_ref(); - - self.conn - .write_all(&[b"delete ", kr, b" noreply\r\n"].concat()) - .await?; - self.conn.flush().await?; - Ok(()) - } - - /// Delete a key and wait for a reply - pub async fn delete(&mut self, key: K) -> Result<(), Error> - where - K: AsRef<[u8]>, - { - let kr = key.as_ref(); - - self.conn - .write_all(&[b"delete ", kr, b"\r\n"].concat()) - .await?; - self.conn.flush().await?; - - match self.get_read_write_response().await? { - Response::Status(Status::Deleted) => Ok(()), - Response::Status(s) => Err(s.into()), - _ => Err(Status::Error(ErrorKind::Protocol(None)).into()), - } - } - - /// Increments the given key by the specified amount. - /// Can overflow from the max value of u64 (18446744073709551615) -> 0. - /// Returns the new value of the key if key exists, otherwise returns KeyNotFound error. - pub async fn increment(&mut self, key: K, amount: u64) -> Result - where - K: AsRef<[u8]>, - { - self.conn - .write_all( - &[ - b"incr ", - key.as_ref(), - b" ", - amount.to_string().as_bytes(), - b"\r\n", - ] - .concat(), - ) - .await?; - self.conn.flush().await?; - - match self.get_incrdecr_response().await? { - Response::Status(Status::NotFound) => Err(Error::KeyNotFound), - Response::Status(s) => Err(s.into()), - Response::IncrDecr(amount) => Ok(amount), - _ => Err(Error::Protocol(Status::Error(ErrorKind::Protocol(None)))), - } - } - - /// Increments the given key by the specified amount with no reply from the server. - /// /// Can overflow from the max value of u64 (18446744073709551615) -> 0. - pub async fn increment_no_reply(&mut self, key: K, amount: u64) -> Result<(), Error> - where - K: AsRef<[u8]>, - { - self.conn - .write_all( - &[ - b"incr ", - key.as_ref(), - b" ", - amount.to_string().as_bytes(), - b" noreply\r\n", - ] - .concat(), - ) - .await?; - self.conn.flush().await?; - - Ok(()) - } - - /// Decrements the given key by the specified amount. - /// Will not decrement the counter below 0. - pub async fn decrement(&mut self, key: K, amount: u64) -> Result - where - K: AsRef<[u8]>, - { - self.conn - .write_all( - &[ - b"decr ", - key.as_ref(), - b" ", - amount.to_string().as_bytes(), - b"\r\n", - ] - .concat(), - ) - .await?; - self.conn.flush().await?; - - match self.get_incrdecr_response().await? { - Response::Status(Status::NotFound) => Err(Error::KeyNotFound), - Response::Status(s) => Err(s.into()), - Response::IncrDecr(amount) => Ok(amount), - _ => Err(Error::Protocol(Status::Error(ErrorKind::Protocol(None)))), - } - } - - /// Decrements the given key by the specified amount with no reply from the server. - /// Will not decrement the counter below 0. - /// Returns the new value of the key if key exists, otherwise returns KeyNotFound error. - pub async fn decrement_no_reply(&mut self, key: K, amount: u64) -> Result<(), Error> - where - K: AsRef<[u8]>, - { - self.conn - .write_all( - &[ - b"decr ", - key.as_ref(), - b" ", - amount.to_string().as_bytes(), - b" noreply\r\n", - ] - .concat(), - ) - .await?; - self.conn.flush().await?; - - Ok(()) - } - - /// Gets the version of the server. - /// - /// If the version is retrieved successfully, `String` is returned containing the version - /// component e.g. `1.6.7`, otherwise [`Error`] is returned. - /// - /// For some setups, such as those using Twemproxy, this will return an error as those - /// intermediate proxies do not support the version command. - pub async fn version(&mut self) -> Result { - self.conn.write_all(b"version\r\n").await?; - self.conn.flush().await?; - - let mut version = String::new(); - let bytes = self.conn.read_line(&mut version).await?; - - // Peel off the leading "VERSION " header. - if bytes >= 8 && version.is_char_boundary(8) { - Ok(version.split_off(8)) - } else { - Err(Error::from(Status::Error(ErrorKind::Protocol(Some( - format!("Invalid response for `version` command: `{version}`"), - ))))) - } - } - - /// Dumps all keys from the server. - /// - /// This operation scans all slab classes from tail to head, in a non-blocking fashion. Thus, - /// not all items will be found as new items could be inserted or deleted while the crawler is - /// still running. - /// - /// [`MetadumpIter`] must be iterated over to discover whether or not the crawler successfully - /// started, as this call will only return [`Error`] if the command failed to be written to the - /// server at all. - /// - /// Available as of memcached 1.4.31. - pub async fn dump_keys(&mut self) -> Result, Error> { - self.conn.write_all(b"lru_crawler metadump all\r\n").await?; - self.conn.flush().await?; - - Ok(MetadumpIter { - client: self, - done: false, - }) - } - - /// Collects statistics from the server. - /// - /// The statistics that may be returned are detailed in the protocol specification for - /// memcached, but all values returned by this method are returned as strings and are not - /// further interpreted or validated for conformity. - pub async fn stats(&mut self) -> Result, Error> { - let mut entries = HashMap::new(); - - self.conn.write_all(b"stats\r\n").await?; - self.conn.flush().await?; - - while let StatsResponse::Entry(key, value) = self.get_stats_response().await? { - entries.insert(key, value); - } - - Ok(entries) - } -} - -/// Asynchronous iterator for metadump operations. -pub struct MetadumpIter<'a> { - client: &'a mut Client, - done: bool, -} - -impl<'a> MetadumpIter<'a> { - /// Gets the next result for the current operation. - /// - /// If there is another key in the dump, `Some(Ok(KeyMetadata))` will be returned. If there was - /// an error while attempting to start the metadump operation, or if there was a general - /// network/protocol-level error, `Some(Err(Error))` will be returned. - /// - /// Otherwise, `None` will be returned and signals the end of the iterator. Subsequent calls - /// will return `None`. - pub async fn next(&mut self) -> Option> { - if self.done { - return None; - } - - match self.client.get_metadump_response().await { - Ok(MetadumpResponse::End) => { - self.done = true; - None - } - Ok(MetadumpResponse::BadClass(s)) => { - self.done = true; - Some(Err(Error::Protocol(MetadumpResponse::BadClass(s).into()))) - } - Ok(MetadumpResponse::Busy(s)) => { - Some(Err(Error::Protocol(MetadumpResponse::Busy(s).into()))) - } - Ok(MetadumpResponse::Entry(km)) => Some(Ok(km)), - Err(e) => Some(Err(e)), - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[ignore = "Relies on a running memcached server"] - #[tokio::test] - async fn test_add() { - let mut client = Client::new("tcp://127.0.0.1:11211") - .await - .expect("Failed to connect to server"); - - let key = "async-memcache-test-key-add"; - - let result = client.delete_no_reply(key).await; - assert!(result.is_ok(), "failed to delete {}, {:?}", key, result); - - let result = client.add(key, "value", None, None).await; - - assert!(result.is_ok()); - } - - #[ignore = "Relies on a running memcached server"] - #[tokio::test] - async fn test_delete() { - let mut client = Client::new("tcp://127.0.0.1:11211") - .await - .expect("Failed to connect to server"); - - let key = "async-memcache-test-key-delete"; - - let value = rand::random::().to_string(); - let result = client.set(key, &value, None, None).await; - - assert!(result.is_ok(), "failed to set {}, {:?}", key, result); - - let result = client.get(key).await; - - assert!(result.is_ok(), "failed to get {}, {:?}", key, result); - let get_result = result.unwrap(); - - match get_result { - Some(get_value) => assert_eq!( - String::from_utf8(get_value.data).expect("failed to parse a string"), - value - ), - None => panic!("failed to get {}", key), - } - - let result = client.delete(key).await; - - assert!(result.is_ok(), "failed to delete {}, {:?}", key, result); - } - - #[ignore = "Relies on a running memcached server"] - #[tokio::test] - async fn test_delete_no_reply() { - let mut client = Client::new("tcp://127.0.0.1:11211") - .await - .expect("Failed to connect to server"); - - let key = "async-memcache-test-key-delete-no-reply"; - - let value = rand::random::().to_string(); - let result = client.set(key, &value, None, None).await; - - assert!(result.is_ok(), "failed to set {}, {:?}", key, result); - - let result = client.get(key).await; - - assert!(result.is_ok(), "failed to get {}, {:?}", key, result); - let get_result = result.unwrap(); - - match get_result { - Some(get_value) => assert_eq!( - String::from_utf8(get_value.data).expect("failed to parse a string"), - value - ), - None => panic!("failed to get {}", key), - } - - let result = client.delete_no_reply(key).await; - - assert!(result.is_ok(), "failed to delete {}, {:?}", key, result); - } - - #[ignore = "Relies on a running memcached server"] - #[tokio::test] - async fn test_increment_raises_error_when_key_doesnt_exist() { - let mut client = Client::new("tcp://127.0.0.1:11211") - .await - .expect("Failed to connect to server"); - - let key = "key-does-not-exist"; - let amount = 1; - - let result = client.increment(key, amount).await; - - assert!(result.is_err()); - assert!(matches!(result, Err(Error::KeyNotFound))) - } - - #[ignore = "Relies on a running memcached server"] - #[tokio::test] - async fn test_increments_existing_key() { - let mut client = Client::new("tcp://127.0.0.1:11211") - .await - .expect("Failed to connect to server"); - - let key = "key-to-increment"; - let value = "1"; - - let _ = client.set(key, &value, None, None).await; - - let amount = 1; - - let result = client.increment(key, amount).await; - - println!("result: {:?}", result); - - assert!(result.is_ok()); - assert_eq!(Ok(2), result); - } - - #[ignore = "Relies on a running memcached server"] - #[tokio::test] - async fn test_increment_can_overflow() { - let mut client = Client::new("tcp://127.0.0.1:11211") - .await - .expect("Failed to connect to server"); - - let key = "key-to-increment-overflow"; - let value = u64::MAX.to_string(); // max value for u64 - - let _ = client.set(key, &value, None, None).await; - - let amount = 1; - - // First increment should overflow - let result = client.increment(key, amount).await; - - println!("result: {:?}", result); - - assert!(result.is_ok()); - assert_eq!(Ok(0), result); - - // Subsequent increments should work as normal - let result = client.increment(key, amount).await; - - println!("result: {:?}", result); - - assert!(result.is_ok()); - assert_eq!(Ok(1), result); - } - - #[ignore = "Relies on a running memcached server"] - #[tokio::test] - async fn test_increments_existing_key_with_no_reply() { - let mut client = Client::new("tcp://127.0.0.1:11211") - .await - .expect("Failed to connect to server"); - - let key = "key-to-increment-no-reply"; - let value = "1"; - - let _ = client.set(key, &value, None, None).await; - - let amount = 1; - - let result = client.increment_no_reply(key, amount).await; - - assert!(result.is_ok()); - assert_eq!(Ok(()), result); - } - - #[ignore = "Relies on a running memcached server"] - #[tokio::test] - async fn test_decrement_raises_error_when_key_doesnt_exist() { - let mut client = Client::new("tcp://127.0.0.1:11211") - .await - .expect("Failed to connect to server"); - - let key = "fails-to-decrement"; - let amount = 1; - - let result = client.decrement(key, amount).await; - - assert!(result.is_err()); - assert!(matches!(result, Err(Error::KeyNotFound))) - } - - #[ignore = "Relies on a running memcached server"] - #[tokio::test] - async fn test_decrements_existing_key() { - let mut client = Client::new("tcp://127.0.0.1:11211") - .await - .expect("Failed to connect to server"); - - let key = "key-to-decrement"; - let value = "2"; - - let _ = client.set(key, &value, None, None).await; - - let amount = 1; - - let result = client.decrement(key, amount).await; - - assert!(result.is_ok()); - assert_eq!(Ok(1), result); - } - - #[ignore = "Relies on a running memcached server"] - #[tokio::test] - async fn test_decrement_does_not_reduce_value_below_zero() { - let mut client = Client::new("tcp://127.0.0.1:11211") - .await - .expect("Failed to connect to server"); - - let key = "key-to-decrement-past-zero"; - let value = 0.to_string(); - - let _ = client.set(key, &value, None, None).await; - - let amount = 1; - - let result = client.decrement(key, amount).await; - - assert!(result.is_ok()); - assert_eq!(Ok(0), result); - } - - #[ignore = "Relies on a running memcached server"] - #[tokio::test] - async fn test_decrements_existing_key_with_no_reply() { - let mut client = Client::new("tcp://127.0.0.1:11211") - .await - .expect("Failed to connect to server"); - - let key = "key-to-decrement-no-reply"; - let value = "1"; - - let _ = client.set(key, &value, None, None).await; - - let amount = 1; - - let result = client.decrement_no_reply(key, amount).await; - - assert!(result.is_ok()); - assert_eq!(Ok(()), result); - } -}