Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

redis scan command support #678

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
317 changes: 9 additions & 308 deletions README.md

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions src/nc_core.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@
#define NC_EAGAIN -2
#define NC_ENOMEM -3

#define NC_MAX_NSERVER_BITS 12
#define NC_MAX_NSERVER_MASK ((1<<NC_MAX_NSERVER_BITS) -1)

/* reserved fds for std streams, log, stats fd, epoll etc. */
#define RESERVED_FDS 32

Expand Down
5 changes: 5 additions & 0 deletions src/nc_message.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ typedef enum msg_parse_result {
ACTION( REQ_REDIS_PERSIST ) \
ACTION( REQ_REDIS_PTTL ) \
ACTION( REQ_REDIS_SORT ) \
ACTION( REQ_REDIS_SCAN ) \
ACTION( REQ_REDIS_TOUCH ) \
ACTION( REQ_REDIS_TTL ) \
ACTION( REQ_REDIS_TYPE ) \
Expand Down Expand Up @@ -196,6 +197,7 @@ typedef enum msg_parse_result {
ACTION( REQ_REDIS_GEOSEARCHSTORE) \
ACTION( REQ_REDIS_EVAL ) /* redis requests - eval */ \
ACTION( REQ_REDIS_EVALSHA ) \
ACTION( REQ_REDIS_SCRIPT) \
ACTION( REQ_REDIS_PING ) /* redis requests - ping/quit */ \
ACTION( REQ_REDIS_QUIT) \
ACTION( REQ_REDIS_AUTH) \
Expand Down Expand Up @@ -288,6 +290,7 @@ struct msg {
uint32_t nfrag_done; /* # fragment done */
uint64_t frag_id; /* id of fragmented message */
struct msg **frag_seq; /* sequence of fragment message, map from keys to fragments*/
uint32_t frag_multibulk_len; /* fragment response multibulk length */

err_t err; /* errno on error? */
unsigned error:1; /* error? */
Expand All @@ -300,6 +303,8 @@ struct msg {
unsigned fdone:1; /* all fragments are done? */
unsigned swallow:1; /* swallow response? */
unsigned redis:1; /* redis? */

uint32_t server_index; /* the server index which the requstion should be forwarded */
};
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this comment is not clear enough. how about this one:

the server index which the requstion should be forwarded.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this comment is not clear enough. how about this one:

the server index which the requstion should be forwarded.

ok


TAILQ_HEAD(msg_tqh, msg);
Expand Down
48 changes: 47 additions & 1 deletion src/nc_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <nc_core.h>
#include <nc_server.h>
#include <stdlib.h>

struct msg *
req_get(struct conn *conn)
Expand Down Expand Up @@ -471,6 +472,49 @@ req_make_reply(struct context *ctx, struct conn *conn, struct msg *req)
return NC_OK;
}

/* If the user request is "scan 45066",
* the cursor 45066 in the request,
* we get server_index=45066 & NC_MAX_NSERVER_MASK=10,
* real_cursor = 45066>>NC_MAX_NSERVER_BITS = 11,
* and finally the request sent to the redis server will be "scan 00011".
*/
static void req_update_for_scan(struct server_pool *pool,struct msg *msg)
{
if (msg->type != MSG_REQ_REDIS_SCAN) {
return;
}

ASSERT(array_n(msg->keys) > 0);

uint8_t *key;
uint32_t keylen;
struct keypos *kpos;
uint32_t idx;
unsigned long long cursor;
unsigned long real_cursor;
char arr[16];
char format[16];

kpos = array_get(msg->keys, 0);
key = kpos->start;
keylen = (uint32_t)(kpos->end - kpos->start);

if (keylen == 1 && key[0] == '0') {
idx=0;
}else{
cursor=strtoull((const char *)key,NULL,10);
idx = cursor & NC_MAX_NSERVER_MASK;
if (array_n(&pool->server)<=idx){
idx=0;
}
real_cursor = (cursor >> NC_MAX_NSERVER_BITS);
sprintf(format,"%%0%dd",keylen);
sprintf(arr,format,real_cursor);
nc_memcpy(key,arr,keylen);
}
msg->server_index=idx;
}

static bool
req_filter(struct conn *conn, struct msg *msg)
{
Expand Down Expand Up @@ -511,6 +555,8 @@ req_filter(struct conn *conn, struct msg *msg)
msg->noforward = 1;
}

req_update_for_scan(conn->owner,msg);

return false;
}

Expand Down Expand Up @@ -573,7 +619,7 @@ req_forward(struct context *ctx, struct conn *c_conn, struct msg *msg)
key = kpos->start;
keylen = (uint32_t)(kpos->end - kpos->start);

s_conn = server_pool_conn(ctx, c_conn->owner, key, keylen);
s_conn = server_pool_conn(ctx, c_conn->owner, msg, key, keylen);
if (s_conn == NULL) {
/*
* Handle a failure to establish a new connection to a server,
Expand Down
46 changes: 46 additions & 0 deletions src/nc_response.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <nc_core.h>
#include <nc_server.h>
#include <stdlib.h>

struct msg *
rsp_get(struct conn *conn)
Expand Down Expand Up @@ -138,6 +139,47 @@ rsp_recv_next(struct context *ctx, struct conn *conn, bool alloc)
return msg;
}

static void
rsp_update_for_scan(struct server_pool *pool,struct msg *request) {
struct msg *response = request->peer;
struct mbuf *mbuf,*nbuf;
struct mbuf *first_mbuf;
rstatus_t status;
char tmp_str[40];
int len;

for(mbuf=STAILQ_FIRST(&response->mhdr);mbuf!=NULL;mbuf=nbuf){
nbuf=STAILQ_NEXT(mbuf,next);
if(mbuf_empty(mbuf)) continue;

first_mbuf=mbuf;
break;
}
ASSERT(nc_strncmp(first_mbuf->pos,"*2\r\n$",strlen("*2\r\n$")) ==0);
uint8_t * p=nc_strchr(first_mbuf->pos + sizeof("*2\r\n$"),first_mbuf->last,'\n');
unsigned long cursor = strtoul((const char *)(p+1),NULL,10);
unsigned long next_cursor;
if(cursor == 0 && request->server_index == array_n(&pool->server)-1){
// all redis servers have been scanned, and the scan command of the last redis server has returned.
return;
}else if(cursor ==0 && request->server_index < array_n(&pool->server)-1){
// the current redis server have been scanned,now we continue scan next redis server
next_cursor=(cursor << NC_MAX_NSERVER_BITS) | (request->server_index+1);
}else{
// the current redis server scan not finish , go on
next_cursor=(cursor << NC_MAX_NSERVER_BITS) | request->server_index;
}
// discard the old head "*2\r\n$%d\r\n\%dr\n"
p=nc_strchr(p+1,first_mbuf->last,'\n');
ASSERT(p < first_mbuf->last);
first_mbuf->pos = p+1;

// we get a new head "*2\r\n$%d\r\n\%dr\n", the cursor contain server index
len=sprintf(tmp_str,"%ld",next_cursor);
status=msg_prepend_format(response,"*2\r\n$%d\r\n%ld\r\n",len,next_cursor);
ASSERT(status == NC_OK);
}

static bool
rsp_filter(struct context *ctx, struct conn *conn, struct msg *msg)
{
Expand Down Expand Up @@ -261,6 +303,10 @@ rsp_forward(struct context *ctx, struct conn *s_conn, struct msg *msg)
c_conn = pmsg->owner;
ASSERT(c_conn->client && !c_conn->proxy);

if(pmsg->type == MSG_REQ_REDIS_SCAN){
rsp_update_for_scan(c_conn->owner,pmsg);
}

if (req_done(c_conn, TAILQ_FIRST(&c_conn->omsg_q))) {
status = event_add_out(ctx->evb, c_conn);
if (status != NC_OK) {
Expand Down
15 changes: 10 additions & 5 deletions src/nc_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -700,12 +700,18 @@ server_pool_idx(const struct server_pool *pool, const uint8_t *key, uint32_t key
}

static struct server *
server_pool_server(struct server_pool *pool, const uint8_t *key, uint32_t keylen)
server_pool_server(struct server_pool *pool, struct msg *r, const uint8_t *key, uint32_t keylen)
{
struct server *server;
uint32_t idx;

idx = server_pool_idx(pool, key, keylen);
if (r->type == MSG_REQ_REDIS_SCAN) {
idx = r->server_index;
}else if (r->type == MSG_REQ_REDIS_SCRIPT) {
idx = r->server_index;
} else {
idx = server_pool_idx(pool, key, keylen);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change the cursor of request in req_filter and save the server_index on msg in the mean time.

server = array_get(&pool->server, idx);

log_debug(LOG_VERB, "key '%.*s' on dist %d maps to server '%.*s'", keylen,
Expand All @@ -715,8 +721,7 @@ server_pool_server(struct server_pool *pool, const uint8_t *key, uint32_t keylen
}

struct conn *
server_pool_conn(struct context *ctx, struct server_pool *pool, const uint8_t *key,
uint32_t keylen)
server_pool_conn(struct context *ctx, struct server_pool *pool, struct msg *msg, const uint8_t *key, uint32_t keylen)
{
rstatus_t status;
struct server *server;
Expand All @@ -728,7 +733,7 @@ server_pool_conn(struct context *ctx, struct server_pool *pool, const uint8_t *k
}

/* from a given {key, keylen} pick a server from pool */
server = server_pool_server(pool, key, keylen);
server = server_pool_server(pool, msg, key, keylen);
if (server == NULL) {
return NULL;
}
Expand Down
2 changes: 1 addition & 1 deletion src/nc_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ void server_connected(struct context *ctx, struct conn *conn);
void server_ok(struct context *ctx, struct conn *conn);

uint32_t server_pool_idx(const struct server_pool *pool, const uint8_t *key, uint32_t keylen);
struct conn *server_pool_conn(struct context *ctx, struct server_pool *pool, const uint8_t *key, uint32_t keylen);
struct conn *server_pool_conn(struct context *ctx, struct server_pool *pool, struct msg *msg, const uint8_t *key, uint32_t keylen);
rstatus_t server_pool_run(struct server_pool *pool);
rstatus_t server_pool_preconnect(struct context *ctx);
void server_pool_disconnect(struct context *ctx);
Expand Down
Loading