diff --git a/README.md b/README.md index e5d3916d..3b2f490e 100644 --- a/README.md +++ b/README.md @@ -1,313 +1,14 @@ # twemproxy (nutcracker) [![Build Status](https://github.com/twitter/twemproxy/actions/workflows/main.yml/badge.svg?branch=master)](https://github.com/twitter/twemproxy/actions/workflows/main.yml?query=branch%3Amaster) -**twemproxy** (pronounced "two-em-proxy"), aka **nutcracker** is a fast and lightweight proxy for [memcached](http://www.memcached.org/) and [redis](http://redis.io/) protocol. It was built primarily to reduce the number of connections to the caching servers on the backend. This, together with protocol pipelining and sharding enables you to horizontally scale your distributed caching architecture. +### Background -## Build +The official twemproxy has not been updated in a long time, while each new redis release adds more commands and features, so the set of commands and features that twemproxy does not support keeps growing. +I have therefore opened this repository to add support for some useful redis commands. 
-To build twemproxy 0.5.0+ from [distribution tarball](https://github.com/twitter/twemproxy/releases): +**My abilities are limited; code quality is not guaranteed.** - $ ./configure - $ make - $ sudo make install - -To build twemproxy 0.5.0+ from [distribution tarball](https://github.com/twitter/twemproxy/releases) in _debug mode_: - - $ CFLAGS="-ggdb3 -O0" ./configure --enable-debug=full - $ make - $ sudo make install - -To build twemproxy from source with _debug logs enabled_ and _assertions enabled_: - - $ git clone git@github.com:twitter/twemproxy.git - $ cd twemproxy - $ autoreconf -fvi - $ ./configure --enable-debug=full - $ make - $ src/nutcracker -h - -A quick checklist: - -+ Use newer version of gcc (older version of gcc has problems) -+ Use CFLAGS="-O1" ./configure && make -+ Use CFLAGS="-O3 -fno-strict-aliasing" ./configure && make -+ `autoreconf -fvi && ./configure` needs `automake` and `libtool` to be installed - -`make check` will run unit tests. - -### Older Releases - -Distribution tarballs for older twemproxy releases (<= 0.4.1) can be found on [Google Drive](https://drive.google.com/open?id=0B6pVMMV5F5dfMUdJV25abllhUWM&authuser=0). -The build steps are the same (`./configure; make; sudo make install`). - -## Features - -+ Fast. -+ Lightweight. -+ Maintains persistent server connections. -+ Keeps connection count on the backend caching servers low. -+ Enables pipelining of requests and responses. -+ Supports proxying to multiple servers. -+ Supports multiple server pools simultaneously. -+ Shard data automatically across multiple servers. -+ Implements the complete [memcached ascii](notes/memcache.md) and [redis](notes/redis.md) protocol. -+ Easy configuration of server pools through a YAML file. -+ Supports multiple hashing modes including consistent hashing and distribution. -+ Can be configured to disable nodes on failures. -+ Observability via stats exposed on the stats monitoring port. 
-+ Works with Linux, *BSD, OS X and SmartOS (Solaris) - -## Help - - Usage: nutcracker [-?hVdDt] [-v verbosity level] [-o output file] - [-c conf file] [-s stats port] [-a stats addr] - [-i stats interval] [-p pid file] [-m mbuf size] - - Options: - -h, --help : this help - -V, --version : show version and exit - -t, --test-conf : test configuration for syntax errors and exit - -d, --daemonize : run as a daemon - -D, --describe-stats : print stats description and exit - -v, --verbose=N : set logging level (default: 5, min: 0, max: 11) - -o, --output=S : set logging file (default: stderr) - -c, --conf-file=S : set configuration file (default: conf/nutcracker.yml) - -s, --stats-port=N : set stats monitoring port (default: 22222) - -a, --stats-addr=S : set stats monitoring ip (default: 0.0.0.0) - -i, --stats-interval=N : set stats aggregation interval in msec (default: 30000 msec) - -p, --pid-file=S : set pid file (default: off) - -m, --mbuf-size=N : set size of mbuf chunk in bytes (default: 16384 bytes) - -## Zero Copy - -In twemproxy, all the memory for incoming requests and outgoing responses is allocated in mbuf. Mbuf enables zero-copy because the same buffer on which a request was received from the client is used for forwarding it to the server. Similarly the same mbuf on which a response was received from the server is used for forwarding it to the client. - -Furthermore, memory for mbufs is managed using a reuse pool. This means that once mbuf is allocated, it is not deallocated, but just put back into the reuse pool. By default each mbuf chunk is set to 16K bytes in size. There is a trade-off between the mbuf size and number of concurrent connections twemproxy can support. A large mbuf size reduces the number of read syscalls made by twemproxy when reading requests or responses. 
However, with a large mbuf size, every active connection would use up 16K bytes of buffer which might be an issue when twemproxy is handling large number of concurrent connections from clients. When twemproxy is meant to handle a large number of concurrent client connections, you should set chunk size to a small value like 512 bytes using the -m or --mbuf-size=N argument. - -## Configuration - -Twemproxy can be configured through a YAML file specified by the -c or --conf-file command-line argument on process start. The configuration file is used to specify the server pools and the servers within each pool that twemproxy manages. The configuration files parses and understands the following keys: - -+ **listen**: The listening address and port (name:port or ip:port) or an absolute path to sock file (e.g. /var/run/nutcracker.sock) for this server pool. -+ **client_connections**: The maximum number of connections allowed from redis clients. Unlimited by default, though OS-imposed limitations will still apply. -+ **hash**: The name of the hash function. Possible values are: - + one_at_a_time - + md5 - + crc16 - + crc32 (crc32 implementation compatible with [libmemcached](http://libmemcached.org/)) - + crc32a (correct crc32 implementation as per the spec) - + fnv1_64 - + fnv1a_64 (default) - + fnv1_32 - + fnv1a_32 - + hsieh - + murmur - + jenkins -+ **hash_tag**: A two character string that specifies the part of the key used for hashing. Eg "{}" or "$$". [Hash tag](notes/recommendation.md#hash-tags) enable mapping different keys to the same server as long as the part of the key within the tag is the same. -+ **distribution**: The key distribution mode for choosing backend servers based on the computed hash value. Possible values are: - + ketama (default, recommended. 
An implementation of https://en.wikipedia.org/wiki/Consistent_hashing) - + modula (use hash modulo number of servers to choose the backend) - + random (choose a random backend for each key of each request) -+ **timeout**: The timeout value in msec that we wait for to establish a connection to the server or receive a response from a server. By default, we wait indefinitely. -+ **backlog**: The TCP backlog argument. Defaults to 512. -+ **tcpkeepalive**: A boolean value that controls if tcp keepalive is enabled for connections to servers. Defaults to false. -+ **preconnect**: A boolean value that controls if twemproxy should preconnect to all the servers in this pool on process start. Defaults to false. -+ **redis**: A boolean value that controls if a server pool speaks redis or memcached protocol. Defaults to false. -+ **redis_auth**: Authenticate to the Redis server on connect. -+ **redis_db**: The DB number to use on the pool servers. Defaults to 0. Note: Twemproxy will always present itself to clients as DB 0. -+ **server_connections**: The maximum number of connections that can be opened to each server. By default, we open at most 1 server connection. -+ **auto_eject_hosts**: A boolean value that controls if server should be ejected temporarily when it fails consecutively server_failure_limit times. See [liveness recommendations](notes/recommendation.md#liveness) for information. Defaults to false. -+ **server_retry_timeout**: The timeout value in msec to wait for before retrying on a temporarily ejected server, when auto_eject_hosts is set to true. Defaults to 30000 msec. -+ **server_failure_limit**: The number of consecutive failures on a server that would lead to it being temporarily ejected when auto_eject_hosts is set to true. Defaults to 2. -+ **servers**: A list of server address, port and weight (name:port:weight or ip:port:weight) for this server pool. 
- - -For example, the configuration file in [conf/nutcracker.yml](conf/nutcracker.yml), also shown below, configures 5 server pools with names - _alpha_, _beta_, _gamma_, _delta_ and omega. Clients that intend to send requests to one of the 10 servers in pool delta connect to port 22124 on 127.0.0.1. Clients that intend to send request to one of 2 servers in pool omega connect to unix path /tmp/gamma. Requests sent to pool alpha and omega have no timeout and might require timeout functionality to be implemented on the client side. On the other hand, requests sent to pool beta, gamma and delta timeout after 400 msec, 400 msec and 100 msec respectively when no response is received from the server. Of the 5 server pools, only pools alpha, gamma and delta are configured to use server ejection and hence are resilient to server failures. All the 5 server pools use ketama consistent hashing for key distribution with the key hasher for pools alpha, beta, gamma and delta set to fnv1a_64 while that for pool omega set to hsieh. Also only pool beta uses [nodes names](notes/recommendation.md#node-names-for-consistent-hashing) for consistent hashing, while pool alpha, gamma, delta and omega use 'host:port:weight' for consistent hashing. Finally, only pool alpha and beta can speak the redis protocol, while pool gamma, delta and omega speak memcached protocol. 
- - alpha: - listen: 127.0.0.1:22121 - hash: fnv1a_64 - distribution: ketama - auto_eject_hosts: true - redis: true - server_retry_timeout: 2000 - server_failure_limit: 1 - servers: - - 127.0.0.1:6379:1 - - beta: - listen: 127.0.0.1:22122 - hash: fnv1a_64 - hash_tag: "{}" - distribution: ketama - auto_eject_hosts: false - timeout: 400 - redis: true - servers: - - 127.0.0.1:6380:1 server1 - - 127.0.0.1:6381:1 server2 - - 127.0.0.1:6382:1 server3 - - 127.0.0.1:6383:1 server4 - - gamma: - listen: 127.0.0.1:22123 - hash: fnv1a_64 - distribution: ketama - timeout: 400 - backlog: 1024 - preconnect: true - auto_eject_hosts: true - server_retry_timeout: 2000 - server_failure_limit: 3 - servers: - - 127.0.0.1:11212:1 - - 127.0.0.1:11213:1 - - delta: - listen: 127.0.0.1:22124 - hash: fnv1a_64 - distribution: ketama - timeout: 100 - auto_eject_hosts: true - server_retry_timeout: 2000 - server_failure_limit: 1 - servers: - - 127.0.0.1:11214:1 - - 127.0.0.1:11215:1 - - 127.0.0.1:11216:1 - - 127.0.0.1:11217:1 - - 127.0.0.1:11218:1 - - 127.0.0.1:11219:1 - - 127.0.0.1:11220:1 - - 127.0.0.1:11221:1 - - 127.0.0.1:11222:1 - - 127.0.0.1:11223:1 - - omega: - listen: /tmp/gamma 0666 - hash: hsieh - distribution: ketama - auto_eject_hosts: false - servers: - - 127.0.0.1:11214:100000 - - 127.0.0.1:11215:1 - -Finally, to make writing a syntactically correct configuration file easier, twemproxy provides a command-line argument `-t` or `--test-conf` that can be used to test the YAML configuration file for any syntax error. - -## Observability - -Observability in twemproxy is through logs and stats. - -Twemproxy exposes stats at the granularity of server pool and servers per pool through the stats monitoring port by responding with the raw data over TCP. The stats are essentially JSON formatted key-value pairs, with the keys corresponding to counter names. By default stats are exposed on port 22222 and aggregated every 30 seconds. 
Both these values can be configured on program start using the `-c` or `--conf-file` and `-i` or `--stats-interval` command-line arguments respectively. You can print the description of all stats exported by using the `-D` or `--describe-stats` command-line argument. - - $ nutcracker --describe-stats - - pool stats: - client_eof "# eof on client connections" - client_err "# errors on client connections" - client_connections "# active client connections" - server_ejects "# times backend server was ejected" - forward_error "# times we encountered a forwarding error" - fragments "# fragments created from a multi-vector request" - - server stats: - server_eof "# eof on server connections" - server_err "# errors on server connections" - server_timedout "# timeouts on server connections" - server_connections "# active server connections" - requests "# requests" - request_bytes "total request bytes" - responses "# responses" - response_bytes "total response bytes" - in_queue "# requests in incoming queue" - in_queue_bytes "current request bytes in incoming queue" - out_queue "# requests in outgoing queue" - out_queue_bytes "current request bytes in outgoing queue" - -See [`notes/debug.txt`](notes/debug.txt) for examples of how to read the stats from the stats port. - -Logging in twemproxy is only available when twemproxy is built with logging enabled. By default logs are written to stderr. Twemproxy can also be configured to write logs to a specific file through the `-o` or `--output` command-line argument. On a running twemproxy, we can turn log levels up and down by sending it SIGTTIN and SIGTTOU signals respectively and reopen log files by sending it SIGHUP signal. - -## Pipelining - -Twemproxy enables proxying multiple client connections onto one or few server connections. This architectural setup makes it ideal for pipelining requests and responses and hence saving on the round trip time. 
- -For example, if twemproxy is proxying three client connections onto a single server and we get requests - `get key\r\n`, `set key 0 0 3\r\nval\r\n` and `delete key\r\n` on these three connections respectively, twemproxy would try to batch these requests and send them as a single message onto the server connection as `get key\r\nset key 0 0 3\r\nval\r\ndelete key\r\n`. - -Pipelining is the reason why twemproxy ends up doing better in terms of throughput even though it introduces an extra hop between the client and server. - -## Deployment - -If you are deploying twemproxy in production, you might consider reading through the [recommendation document](notes/recommendation.md) to understand the parameters you could tune in twemproxy to run it efficiently in the production environment. - -## Utils -+ [collectd-plugin](https://github.com/bewie/collectd-twemproxy) -+ [munin-plugin](https://github.com/eveiga/contrib/tree/nutcracker/plugins/nutcracker) -+ [twemproxy-ganglia-module](https://github.com/ganglia/gmond_python_modules/tree/master/twemproxy) -+ [nagios checks](https://github.com/wanelo/nagios-checks/blob/master/check_twemproxy) -+ [circonus](https://github.com/wanelo-chef/nad-checks/blob/master/recipes/twemproxy.rb) -+ [puppet module](https://github.com/wuakitv/puppet-twemproxy) -+ [nutcracker-web](https://github.com/kontera-technologies/nutcracker-web) -+ [redis-twemproxy agent](https://github.com/Stono/redis-twemproxy-agent) -+ [sensu-metrics](https://github.com/sensu-plugins/sensu-plugins-twemproxy/blob/master/bin/metrics-twemproxy.rb) -+ [redis-mgr](https://github.com/idning/redis-mgr) -+ [smitty for twemproxy failover](https://github.com/areina/smitty) -+ [Beholder, a Python agent for twemproxy failover](https://github.com/Serekh/beholder) -+ [chef cookbook](https://supermarket.getchef.com/cookbooks/twemproxy) -+ [twemsentinel](https://github.com/yak0/twemsentinel) - -## Companies using Twemproxy in Production -+ [Twitter](https://twitter.com/) -+ 
[Wikimedia](https://www.wikimedia.org/) -+ [Pinterest](http://pinterest.com/) -+ [Snapchat](http://www.snapchat.com/) -+ [Flickr](https://www.flickr.com) -+ [Yahoo!](https://www.yahoo.com) -+ [Tumblr](https://www.tumblr.com/) -+ [Vine](http://vine.co/) -+ [Wayfair](http://www.wayfair.com/) -+ [Kiip](http://www.kiip.me/) -+ [Wuaki.tv](https://wuaki.tv/) -+ [Wanelo](http://wanelo.com/) -+ [Kontera](http://kontera.com/) -+ [Bright](http://www.bright.com/) -+ [56.com](http://www.56.com/) -+ [Digg](http://digg.com/) -+ [Gawkermedia](http://advertising.gawker.com/) -+ [3scale.net](http://3scale.net) -+ [Ooyala](http://www.ooyala.com) -+ [Twitch](http://twitch.tv) -+ [Socrata](http://www.socrata.com/) -+ [Hootsuite](http://hootsuite.com/) -+ [Trivago](http://www.trivago.com/) -+ [Machinezone](http://www.machinezone.com) -+ [Path](https://path.com) -+ [AOL](http://engineering.aol.com/) -+ [Soysuper](https://soysuper.com/) -+ [Vinted](http://vinted.com/) -+ [Poshmark](https://poshmark.com/) -+ [FanDuel](https://www.fanduel.com/) -+ [Bloomreach](http://bloomreach.com/) -+ [Hootsuite](https://hootsuite.com) -+ [Tradesy](https://www.tradesy.com/) -+ [Uber](http://uber.com) ([details](http://highscalability.com/blog/2015/9/14/how-uber-scales-their-real-time-market-platform.html)) -+ [Greta](https://greta.io/) - -## Issues and Support - -Have a bug or a question? Please create an issue here on GitHub! - -https://github.com/twitter/twemproxy/issues - -## Committers - -* Manju Rajashekhar ([@manju](https://twitter.com/manju)) -* Lin Yang ([@idning](https://github.com/idning)) -* Tyson Andre ([@TysonAndre](https://github.com/TysonAndre)) - -Thank you to all of our [contributors](https://github.com/twitter/twemproxy/graphs/contributors)! - -## License - -Copyright 2012 Twitter, Inc. 
- -Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0 +### Newly supported commands +- [script](https://redis.io/commands/script-load/): script load / script exists / script flush +- [scan](https://redis.io/commands/scan/) diff --git a/src/nc_core.h b/src/nc_core.h index 3ac6dce6..d832a8c1 100644 --- a/src/nc_core.h +++ b/src/nc_core.h @@ -63,6 +63,9 @@ #define NC_EAGAIN -2 #define NC_ENOMEM -3 +#define NC_MAX_NSERVER_BITS 12 +#define NC_MAX_NSERVER_MASK ((1<<NC_MAX_NSERVER_BITS)-1) diff --git a/src/nc_request.c b/src/nc_request.c --- a/src/nc_request.c +++ b/src/nc_request.c #include #include +#include struct msg * req_get(struct conn *conn) @@ -471,6 +472,49 @@ req_make_reply(struct context *ctx, struct conn *conn, struct msg *req) return NC_OK; } +/* If the user request is "scan 45066", + * the cursor in the request is 45066, + * so server_index = 45066 & NC_MAX_NSERVER_MASK = 10, + * real_cursor = 45066 >> NC_MAX_NSERVER_BITS = 11, + * and the request sent to that redis server becomes "scan 00011". +*/ +static void req_update_for_scan(struct server_pool *pool,struct msg *msg) +{ + if (msg->type != MSG_REQ_REDIS_SCAN) { + return; + } + + ASSERT(array_n(msg->keys) > 0); + + uint8_t *key; + uint32_t keylen; + struct keypos *kpos; + uint32_t idx; + unsigned long long cursor; + unsigned long real_cursor; + char arr[16]; + char format[16]; + + kpos = array_get(msg->keys, 0); + key = kpos->start; + keylen = (uint32_t)(kpos->end - kpos->start); + + if (keylen == 1 && key[0] == '0') { + idx=0; + }else{ + cursor=strtoull((const char *)key,NULL,10); + idx = cursor & NC_MAX_NSERVER_MASK; + if (array_n(&pool->server)<=idx){ + idx=0; + } + real_cursor = (cursor >> NC_MAX_NSERVER_BITS); + sprintf(format,"%%0%dd",keylen); + sprintf(arr,format,real_cursor); + nc_memcpy(key,arr,keylen); + } + msg->server_index=idx; +} + static bool req_filter(struct conn *conn, struct msg *msg) { @@ -511,6 +555,8 @@ req_filter(struct conn *conn, struct msg *msg) msg->noforward = 1; } + req_update_for_scan(conn->owner,msg); + return false; } @@ -573,7 +619,7 @@ 
req_forward(struct context *ctx, struct conn *c_conn, struct msg *msg) key = kpos->start; keylen = (uint32_t)(kpos->end - kpos->start); - s_conn = server_pool_conn(ctx, c_conn->owner, key, keylen); + s_conn = server_pool_conn(ctx, c_conn->owner, msg, key, keylen); if (s_conn == NULL) { /* * Handle a failure to establish a new connection to a server, diff --git a/src/nc_response.c b/src/nc_response.c index 694c8720..ce64ad4d 100644 --- a/src/nc_response.c +++ b/src/nc_response.c @@ -17,6 +17,7 @@ #include #include +#include struct msg * rsp_get(struct conn *conn) @@ -138,6 +139,47 @@ rsp_recv_next(struct context *ctx, struct conn *conn, bool alloc) return msg; } +static void +rsp_update_for_scan(struct server_pool *pool,struct msg *request) { + struct msg *response = request->peer; + struct mbuf *mbuf,*nbuf; + struct mbuf *first_mbuf; + rstatus_t status; + char tmp_str[40]; + int len; + + for(mbuf=STAILQ_FIRST(&response->mhdr);mbuf!=NULL;mbuf=nbuf){ + nbuf=STAILQ_NEXT(mbuf,next); + if(mbuf_empty(mbuf)) continue; + + first_mbuf=mbuf; + break; + } + ASSERT(nc_strncmp(first_mbuf->pos,"*2\r\n$",strlen("*2\r\n$")) ==0); + uint8_t * p=nc_strchr(first_mbuf->pos + sizeof("*2\r\n$"),first_mbuf->last,'\n'); + unsigned long cursor = strtoul((const char *)(p+1),NULL,10); + unsigned long next_cursor; + if(cursor == 0 && request->server_index == array_n(&pool->server)-1){ + // all redis servers have been scanned, and the scan command of the last redis server has returned. 
+ return; + }else if(cursor ==0 && request->server_index < array_n(&pool->server)-1){ + // the current redis server has been fully scanned; continue with the next redis server + next_cursor=(cursor << NC_MAX_NSERVER_BITS) | (request->server_index+1); + }else{ + // the scan of the current redis server is not finished; keep going + next_cursor=(cursor << NC_MAX_NSERVER_BITS) | request->server_index; + } + // discard the old head "*2\r\n$%d\r\n%d\r\n" + p=nc_strchr(p+1,first_mbuf->last,'\n'); + ASSERT(p < first_mbuf->last); + first_mbuf->pos = p+1; + + // prepend a new head "*2\r\n$%d\r\n%d\r\n"; the cursor now carries the server index + len=sprintf(tmp_str,"%ld",next_cursor); + status=msg_prepend_format(response,"*2\r\n$%d\r\n%ld\r\n",len,next_cursor); + ASSERT(status == NC_OK); +} + static bool rsp_filter(struct context *ctx, struct conn *conn, struct msg *msg) { @@ -261,6 +303,10 @@ rsp_forward(struct context *ctx, struct conn *s_conn, struct msg *msg) c_conn = pmsg->owner; ASSERT(c_conn->client && !c_conn->proxy); + if(pmsg->type == MSG_REQ_REDIS_SCAN){ + rsp_update_for_scan(c_conn->owner,pmsg); + } + if (req_done(c_conn, TAILQ_FIRST(&c_conn->omsg_q))) { status = event_add_out(ctx->evb, c_conn); if (status != NC_OK) { diff --git a/src/nc_server.c b/src/nc_server.c index dab6a79b..03ad75fe 100644 --- a/src/nc_server.c +++ b/src/nc_server.c @@ -700,12 +700,18 @@ server_pool_idx(const struct server_pool *pool, const uint8_t *key, uint32_t key } static struct server * -server_pool_server(struct server_pool *pool, const uint8_t *key, uint32_t keylen) +server_pool_server(struct server_pool *pool, struct msg *r, const uint8_t *key, uint32_t keylen) { struct server *server; uint32_t idx; - idx = server_pool_idx(pool, key, keylen); + if (r->type == MSG_REQ_REDIS_SCAN) { + idx = r->server_index; + }else if (r->type == MSG_REQ_REDIS_SCRIPT) { + idx = r->server_index; + } else { + idx = server_pool_idx(pool, key, keylen); + } server = array_get(&pool->server, idx); log_debug(LOG_VERB, "key '%.*s' on 
dist %d maps to server '%.*s'", keylen, @@ -715,8 +721,7 @@ server_pool_server(struct server_pool *pool, const uint8_t *key, uint32_t keylen } struct conn * -server_pool_conn(struct context *ctx, struct server_pool *pool, const uint8_t *key, - uint32_t keylen) +server_pool_conn(struct context *ctx, struct server_pool *pool, struct msg *msg, const uint8_t *key, uint32_t keylen) { rstatus_t status; struct server *server; @@ -728,7 +733,7 @@ server_pool_conn(struct context *ctx, struct server_pool *pool, const uint8_t *k } /* from a given {key, keylen} pick a server from pool */ - server = server_pool_server(pool, key, keylen); + server = server_pool_server(pool, msg, key, keylen); if (server == NULL) { return NULL; } diff --git a/src/nc_server.h b/src/nc_server.h index 6bab9b56..b87466a2 100644 --- a/src/nc_server.h +++ b/src/nc_server.h @@ -137,7 +137,7 @@ void server_connected(struct context *ctx, struct conn *conn); void server_ok(struct context *ctx, struct conn *conn); uint32_t server_pool_idx(const struct server_pool *pool, const uint8_t *key, uint32_t keylen); -struct conn *server_pool_conn(struct context *ctx, struct server_pool *pool, const uint8_t *key, uint32_t keylen); +struct conn *server_pool_conn(struct context *ctx, struct server_pool *pool, struct msg *msg, const uint8_t *key, uint32_t keylen); rstatus_t server_pool_run(struct server_pool *pool); rstatus_t server_pool_preconnect(struct context *ctx); void server_pool_disconnect(struct context *ctx); diff --git a/src/proto/nc_redis.c b/src/proto/nc_redis.c index a80f84a3..7d7c78c3 100644 --- a/src/proto/nc_redis.c +++ b/src/proto/nc_redis.c @@ -283,6 +283,8 @@ redis_argn(const struct msg *r) case MSG_REQ_REDIS_GEOSEARCHSTORE: case MSG_REQ_REDIS_RESTORE: + case MSG_REQ_REDIS_SCAN: + case MSG_REQ_REDIS_SCRIPT: return true; default: @@ -750,6 +752,11 @@ redis_parse_req(struct msg *r) break; } + if (str4icmp(m, 's', 'c', 'a', 'n')) { + r->type = MSG_REQ_REDIS_SCAN; + break; + } + break; case 5: @@ -1009,6 
+1016,11 @@ redis_parse_req(struct msg *r) break; } + if (str6icmp(m, 's', 'c', 'r', 'i', 'p', 't')) { + r->type = MSG_REQ_REDIS_SCRIPT; + break; + } + break; case 7: @@ -1633,14 +1645,14 @@ redis_parse_req(struct msg *r) m = p + r->rlen; if (m >= b->last) { - /* + /* * For EVAL/EVALHASH, the r->token has been assigned a value. When - * m >= b->last happens will need to repair mbuf. - * + * m >= b->last happens will need to repair mbuf. + * * At the end of redis_parse_req, r->token will be used to choose * the start (p) for the next call to redis_parse_req and clear * r->token when repairing this and adding more data. - * + * * So, only when r->token == NULL we need to calculate r->rlen again. */ if (r->token == NULL) { @@ -2585,12 +2597,18 @@ redis_copy_bulk(struct msg *dst, struct msg *src) } p = mbuf->pos; - ASSERT(*p == '$'); + // ASSERT(*p == '$'); p++; if (p[0] == '-' && p[1] == '1') { len = 1 + 2 + CRLF_LEN; /* $-1\r\n */ p = mbuf->pos + len; + } else if ((mbuf->pos)[0] == ':') { + for (; p < mbuf->last && isdigit(*p);) { + p++; + } + len = (p - mbuf->pos); + len += CRLF_LEN; } else { len = 0; for (; p < mbuf->last && isdigit(*p); p++) { @@ -2643,6 +2661,8 @@ redis_pre_coalesce(struct msg *r) { struct msg *pr = r->peer; /* peer request */ struct mbuf *mbuf; + uint8_t *key; + struct keypos *kpos; ASSERT(!r->request); ASSERT(pr->request); @@ -2653,6 +2673,9 @@ redis_pre_coalesce(struct msg *r) } pr->frag_owner->nfrag_done++; + kpos = array_get(pr->keys, 0); + key = kpos->start; + switch (r->type) { case MSG_RSP_REDIS_INTEGER: /* only redis 'del' fragmented request sends back integer reply */ @@ -2677,7 +2700,7 @@ redis_pre_coalesce(struct msg *r) case MSG_RSP_REDIS_MULTIBULK: /* only redis 'mget' fragmented request sends back multi-bulk reply */ - ASSERT(pr->type == MSG_REQ_REDIS_MGET); + ASSERT(pr->type == MSG_REQ_REDIS_MGET || pr->type == MSG_REQ_REDIS_SCRIPT); mbuf = STAILQ_FIRST(&r->mhdr); /* @@ -2693,6 +2716,26 @@ redis_pre_coalesce(struct msg *r) r->mlen 
-= (uint32_t)(r->narg_end - r->narg_start); mbuf->pos = r->narg_end; + if (pr->type == MSG_REQ_REDIS_SCRIPT && str6icmp(key, 'e', 'x', 'i', 's', 't', 's')) { + uint8_t *p; + uint32_t len = 0; + p = r->narg_start; + ASSERT(p[0] == '*'); + p++; + + if (p[0] == '-' && p[1] == '1') { + r->frag_multibulk_len = 0; + } else { + for(;p < r->narg_end && isdigit(*p); p++){ + len = 10*len + (uint32_t)(*p - '0'); + } + r->frag_multibulk_len = len; + } + } + + break; + + case MSG_RSP_REDIS_BULK: break; case MSG_RSP_REDIS_STATUS: @@ -2945,10 +2988,97 @@ redis_fragment_argx(struct msg *r, uint32_t nserver, struct msg_tqh *frag_msgq, return NC_OK; } +static rstatus_t redis_fragment_script(struct msg *r, struct msg_tqh *frag_msgq) { + struct server_pool *sp = r->owner->owner; + struct msg **sub_msgs; + uint32_t i,n; + struct mbuf *mbuf,*nbuf,*rbuf; + size_t mlen; + rstatus_t status; + struct keypos *rkpos; + uint32_t rklen; + uint32_t rkey_offset; + + ASSERT(sp != NULL); + + n = array_n(&sp->server); + log_debug(LOG_VVERB,"server_name: %.*s,server_count:%i",sp->name.len,sp->name.data,n); + + sub_msgs = nc_zalloc(n * sizeof(*sub_msgs)); + if (sub_msgs == NULL) { + return NC_ENOMEM; + } + + ASSERT(r->frag_seq == NULL); + r->frag_seq = nc_alloc(n * sizeof(*r->frag_seq)); + if (r->frag_seq == NULL) { + nc_free(sub_msgs); + return NC_ENOMEM; + } + + r->frag_id = msg_gen_frag_id(); + r->nfrag = 0; + r->frag_owner = r; + + ASSERT(array_n(r->keys)>0); + rkpos=array_get(r->keys,0); // key position in original request + rklen = (uint32_t)(rkpos->end - rkpos->start); // the keylen in original request + rbuf = STAILQ_FIRST(&r->mhdr); + if (rbuf == NULL) { + return NC_ERROR; + } + rkey_offset = (uint32_t)(rkpos->start - rbuf->pos); // the offset of key->start from rbuf->pos + + for (i = 0; i < n; i++) { /* create a sub_msg for per server */ + struct msg *sub_msg; + uint32_t idx = i; + if (sub_msgs[idx] == NULL) { + sub_msgs[idx] = msg_get(r->owner, r->request, r->redis); + if (sub_msgs[idx] == 
NULL) { + nc_free(sub_msgs); + return NC_ENOMEM; + } + } + r->frag_seq[i] = sub_msg = sub_msgs[idx]; + + sub_msg->narg = r->narg; + sub_msg->server_index = idx; + //copy r->mhdr + for (mbuf=STAILQ_FIRST(&r->mhdr);mbuf!=NULL;mbuf=nbuf) { + nbuf=STAILQ_NEXT(mbuf,next); + if(mbuf_empty(mbuf)) continue; + + mlen=mbuf_length(mbuf); + status=msg_append(sub_msg,mbuf->pos,mlen); + if (status != NC_OK) return status; + } + struct keypos *kpos; + kpos = array_push(sub_msg->keys); + if (kpos == NULL) { + return NC_ENOMEM; + } + mbuf = STAILQ_FIRST(&sub_msg->mhdr); + if (mbuf == NULL) { + return NC_ERROR; + } + kpos->start=mbuf->pos + rkey_offset; //confirm sub_msg key position + kpos->end=kpos->start + rklen; + + sub_msg->type = r->type; + sub_msg->frag_id = r->frag_id; + sub_msg->frag_owner = r->frag_owner; + TAILQ_INSERT_TAIL(frag_msgq, sub_msg, m_tqe); + r->nfrag++; + } + + nc_free(sub_msgs); + return NC_OK; +} + rstatus_t redis_fragment(struct msg *r, uint32_t nserver, struct msg_tqh *frag_msgq) { - if (1 == array_n(r->keys)){ + if (1 == array_n(r->keys) && r->type != MSG_REQ_REDIS_SCRIPT){ return NC_OK; } @@ -2962,7 +3092,8 @@ redis_fragment(struct msg *r, uint32_t nserver, struct msg_tqh *frag_msgq) /* TODO: MSETNX - instead of responding with OK, respond with 1 if all fragments respond with 1 */ case MSG_REQ_REDIS_MSET: return redis_fragment_argx(r, nserver, frag_msgq, 2); - + case MSG_REQ_REDIS_SCRIPT: + return redis_fragment_script(r,frag_msgq); default: return NC_OK; } @@ -3053,6 +3184,61 @@ redis_post_coalesce_mget(struct msg *request) } } +static void redis_post_coalesce_script(struct msg *request) +{ + struct msg *response = request->peer; + struct msg *sub_msg; + rstatus_t status; + uint32_t i,j; + uint8_t *key; + struct keypos *kpos; + + kpos = array_get(request->keys, 0); + key = kpos->start; + + for (i = 0; i < request->nfrag; i++) { /* for each key */ + sub_msg = request->frag_seq[i]->peer; /* get it's peer response */ + if (sub_msg == NULL) { + 
response->owner->err = 1; + return; + } + /* Only the first backend's response is retained; the rest are discarded */ + if(i ==0){ + if(str6icmp(key, 'e', 'x', 'i', 's', 't', 's')){ + status = msg_prepend_format(response, "*%d\r\n", sub_msg->frag_multibulk_len); + if (status != NC_OK) { + response->owner->err = 1; + return; + } + for(j=0;j<sub_msg->frag_multibulk_len;j++){ + status = redis_copy_bulk(response, sub_msg); + if (status != NC_OK) { + response->owner->err = 1; + return; + } + } + }else{ + status = redis_copy_bulk(response, sub_msg); + } + }else{ + if(str6icmp(key, 'e', 'x', 'i', 's', 't', 's')){ + for(j=0;j<sub_msg->frag_multibulk_len;j++){ + status = redis_copy_bulk(NULL, sub_msg); + if (status != NC_OK) { + response->owner->err = 1; + return; + } + } + }else{ + status = redis_copy_bulk(NULL, sub_msg); + } + } + if (status != NC_OK) { + response->owner->err = 1; + return; + } + } +} /* * Post-coalesce handler is invoked when the message is a response to * the fragmented multi vector request - 'mget' or 'del' and all the @@ -3083,6 +3269,9 @@ redis_post_coalesce(struct msg *r) case MSG_REQ_REDIS_MSET: return redis_post_coalesce_mset(r); + case MSG_REQ_REDIS_SCRIPT: + return redis_post_coalesce_script(r); + default: NOT_REACHED(); } diff --git a/tests/test_redis/test_commands.py b/tests/test_redis/test_commands.py index f4ed3a73..a87e9fb9 100644 --- a/tests/test_redis/test_commands.py +++ b/tests/test_redis/test_commands.py @@ -101,3 +101,43 @@ def test_sscan(): assert_equal('0', str(cursor)) assert_equal({b'1'}, set(members)) + +def test_scan(): + r = getconn() + r.set('hello_scan_a1',11) + r.set('hello_scan_b1',22) + r.hmset("hello_scan_h1",{"a":1,"b":2}) + r.rpush("hello_scan_l1","a","b") + r.sadd("hello_scan_s1","a","a","b") + r.zadd("hello_scan_z1",{"one": 1, "two": 2, "three": 3}) + + zsetval=r.zrange("hello_scan_z1",0,-1,False,True) + + cursor = 0 + match_str = "hello_scan_*" + rets = [] + subrets = [] + while True: + cursor,subrets = r.scan(cursor,match_str,100) + if len(subrets): + rets.extend(subrets) + if cursor == 0: + break + rets.sort() + assert_equal(rets,[b'hello_scan_a1', b'hello_scan_b1', b'hello_scan_h1', b'hello_scan_l1', b'hello_scan_s1', b'hello_scan_z1']) + +def test_script_load_and_exists(): + r = getconn() + + evalsha=r.script_load("return redis.call('hset',KEYS[1],KEYS[1],KEYS[1])") + assert_equal(evalsha,"dbbae75a09f1390aaf069fb60e951ec23cab7a15") + + exists=r.script_exists("dbbae75a09f1390aaf069fb60e951ec23cab7a15") + assert_equal([True],exists) + + assert_equal(1,r.evalsha("dbbae75a09f1390aaf069fb60e951ec23cab7a15",1,"scriptA")) + + dic=r.hgetall("scriptA") + assert_equal(dic,{b'scriptA': b'scriptA'}) + + assert_equal(True,r.script_flush())
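The patch routes SCAN by packing the backend server index into the low `NC_MAX_NSERVER_BITS` bits of the cursor the client sees, as `req_update_for_scan` and `rsp_update_for_scan` do in C. Here is a minimal Python sketch of just that encode/decode scheme; the constant mirrors `NC_MAX_NSERVER_BITS`, and the helper names are illustrative, not from the patch:

```python
NSERVER_BITS = 12                     # mirrors NC_MAX_NSERVER_BITS in the patch
NSERVER_MASK = (1 << NSERVER_BITS) - 1


def split_cursor(client_cursor):
    """Recover (server_index, real_cursor) from a client-side cursor,
    as req_update_for_scan does before forwarding "scan <real_cursor>"."""
    return client_cursor & NSERVER_MASK, client_cursor >> NSERVER_BITS


def next_client_cursor(server_index, server_cursor, nserver):
    """Build the cursor returned to the client, as rsp_update_for_scan does:
    once a backend reports cursor 0, advance to the next backend at cursor 0."""
    if server_cursor == 0:
        if server_index == nserver - 1:
            return 0                  # every backend has been fully scanned
        server_index += 1             # restart at cursor 0 on the next backend
    return (server_cursor << NSERVER_BITS) | server_index
```

With the example from the patch comment, `split_cursor(45066)` gives `(10, 11)`: the request is routed to backend 10 and forwarded as `scan 11`.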
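On the response path, `rsp_update_for_scan` strips the cursor element from the backend's RESP `*2` reply and prepends a new head whose cursor carries the backend index. A byte-level sketch of that rewrite, under the same assumptions as the patch (`rewrite_scan_reply` is a hypothetical helper; the constant mirrors `NC_MAX_NSERVER_BITS`):

```python
NSERVER_BITS = 12  # mirrors NC_MAX_NSERVER_BITS in the patch


def rewrite_scan_reply(reply, server_index, nserver):
    """Replace the cursor in a RESP SCAN reply (b"*2\r\n$<len>\r\n<cursor>\r\n...")
    with a proxy-level cursor carrying the backend index in its low bits."""
    assert reply.startswith(b"*2\r\n$")
    head_end = reply.index(b"\r\n", len(b"*2\r\n$"))   # end of the "$<len>" line
    cur_end = reply.index(b"\r\n", head_end + 2)       # end of the cursor digits
    cursor = int(reply[head_end + 2:cur_end])

    if cursor == 0 and server_index == nserver - 1:
        return reply                       # whole pool scanned: pass through as-is
    if cursor == 0:
        next_cursor = server_index + 1     # start the next backend at cursor 0
    else:
        next_cursor = (cursor << NSERVER_BITS) | server_index

    digits = str(next_cursor).encode()
    new_head = b"*2\r\n$%d\r\n%s\r\n" % (len(digits), digits)
    return new_head + reply[cur_end + 2:]  # keep the key array untouched
```

The key array after the cursor is left untouched, matching the C code, which only moves `first_mbuf->pos` past the old head and calls `msg_prepend_format` with the new one.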