Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

redis scan command support #678

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/nc_core.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@
#define NC_EAGAIN -2
#define NC_ENOMEM -3

#define NC_MAX_NSERVER_BITS 12
#define NC_MAX_NSERVER_MASK ((1<<NC_MAX_NSERVER_BITS) -1)

/* reserved fds for std streams, log, stats fd, epoll etc. */
#define RESERVED_FDS 32

Expand Down
3 changes: 3 additions & 0 deletions src/nc_message.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ typedef enum msg_parse_result {
ACTION( REQ_REDIS_PERSIST ) \
ACTION( REQ_REDIS_PTTL ) \
ACTION( REQ_REDIS_SORT ) \
ACTION( REQ_REDIS_SCAN ) \
ACTION( REQ_REDIS_TOUCH ) \
ACTION( REQ_REDIS_TTL ) \
ACTION( REQ_REDIS_TYPE ) \
Expand Down Expand Up @@ -300,6 +301,8 @@ struct msg {
unsigned fdone:1; /* all fragments are done? */
unsigned swallow:1; /* swallow response? */
unsigned redis:1; /* redis? */

uint32_t server_index; /* used for store the redis server index in server pool */
};
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this comment is not clear enough. how about this one:

the server index which the requstion should be forwarded.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this comment is not clear enough. how about this one:

the server index which the requstion should be forwarded.

ok


TAILQ_HEAD(msg_tqh, msg);
Expand Down
36 changes: 35 additions & 1 deletion src/nc_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,15 @@ req_make_reply(struct context *ctx, struct conn *conn, struct msg *req)
static bool
req_filter(struct conn *conn, struct msg *msg)
{
uint8_t *key;
uint32_t keylen;
struct keypos *kpos;
uint32_t idx;
unsigned long long cursor;
unsigned long real_cursor;
char arr[16];
char format[16];

ASSERT(conn->client && !conn->proxy);

if (msg_empty(msg)) {
Expand Down Expand Up @@ -511,6 +520,31 @@ req_filter(struct conn *conn, struct msg *msg)
msg->noforward = 1;
}

if (msg->type == MSG_REQ_REDIS_SCAN) {
ASSERT(array_n(msg->keys) > 0);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move this into a function and call the function here like what you did in rsp_filter.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good advice

kpos = array_get(msg->keys, 0);
key = kpos->start;
keylen = (uint32_t)(kpos->end - kpos->start);

if (keylen == 1 && key[0] == '0') {
idx=0;
}else{
/* If the user request is "scan 45066",
the cursor 45066 in the request,
we get server_index=45066 & NC_MAX_NSERVER_MASK=10,
real_cursor = 45066>>NC_MAX_NSERVER_BITS = 11,
and finally the request sent by the proxy to the redis server will be "scan 00011".
*/
cursor=strtoull(key,NULL,10);
idx = cursor & NC_MAX_NSERVER_MASK;
real_cursor = (cursor >> NC_MAX_NSERVER_BITS);
sprintf(format,"%%0%dd",keylen);
sprintf(arr,format,real_cursor);
nc_memcpy(key,arr,keylen);
}
msg->server_index=idx;
}

return false;
}

Expand Down Expand Up @@ -573,7 +607,7 @@ req_forward(struct context *ctx, struct conn *c_conn, struct msg *msg)
key = kpos->start;
keylen = (uint32_t)(kpos->end - kpos->start);

s_conn = server_pool_conn(ctx, c_conn->owner, key, keylen);
s_conn = server_pool_conn(ctx, c_conn->owner, msg, key, keylen);
if (s_conn == NULL) {
/*
* Handle a failure to establish a new connection to a server,
Expand Down
46 changes: 46 additions & 0 deletions src/nc_response.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <nc_core.h>
#include <nc_server.h>
#include <stdlib.h>

struct msg *
rsp_get(struct conn *conn)
Expand Down Expand Up @@ -138,6 +139,47 @@ rsp_recv_next(struct context *ctx, struct conn *conn, bool alloc)
return msg;
}

static void
rsp_update_for_scan(struct server_pool *pool,struct msg *request) {
struct msg *response = request->peer;
struct mbuf *mbuf,*nbuf;
struct mbuf *first_mbuf;
rstatus_t status;
char tmp_str[40];
int len;

for(mbuf=STAILQ_FIRST(&response->mhdr);mbuf!=NULL;mbuf=nbuf){
nbuf=STAILQ_NEXT(mbuf,next);
if(mbuf_empty(mbuf)) continue;

first_mbuf=mbuf;
break;
}
ASSERT(nc_strncmp(first_mbuf->pos,"*2\r\n$",strlen("*2\r\n$")) ==0);
uint8_t * p=nc_strchr(first_mbuf->pos + sizeof("*2\r\n$"),first_mbuf->last,'\n');
unsigned long cursor = strtoul((const char *)(p+1),NULL,10);
unsigned long next_cursor;
if(cursor == 0 && request->server_index == array_n(&pool->server)-1){
// all redis servers have been scanned, and the scan command of the last redis server has returned.
return;
}else if(cursor ==0 && request->server_index < array_n(&pool->server)-1){
// the current redis server have been scanned,now we continue scan next redis server
next_cursor=(cursor << NC_MAX_NSERVER_BITS) | (request->server_index+1);
}else{
// the current redis server scan not finish , go on
next_cursor=(cursor << NC_MAX_NSERVER_BITS) | request->server_index;
}
// discard the old head "*2\r\n$%d\r\n\%dr\n"
p=nc_strchr(p+1,first_mbuf->last,'\n');
ASSERT(p < first_mbuf->last);
first_mbuf->pos = p+1;

// we get a new head "*2\r\n$%d\r\n\%dr\n", the cursor contain server index
len=sprintf(tmp_str,"%ld",next_cursor);
status=msg_prepend_format(response,"*2\r\n$%d\r\n%ld\r\n",len,next_cursor);
ASSERT(status == NC_OK);
}

static bool
rsp_filter(struct context *ctx, struct conn *conn, struct msg *msg)
{
Expand Down Expand Up @@ -261,6 +303,10 @@ rsp_forward(struct context *ctx, struct conn *s_conn, struct msg *msg)
c_conn = pmsg->owner;
ASSERT(c_conn->client && !c_conn->proxy);

if(pmsg->type == MSG_REQ_REDIS_SCAN){
rsp_update_for_scan(c_conn->owner,pmsg);
}

if (req_done(c_conn, TAILQ_FIRST(&c_conn->omsg_q))) {
status = event_add_out(ctx->evb, c_conn);
if (status != NC_OK) {
Expand Down
13 changes: 8 additions & 5 deletions src/nc_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -700,12 +700,16 @@ server_pool_idx(const struct server_pool *pool, const uint8_t *key, uint32_t key
}

static struct server *
server_pool_server(struct server_pool *pool, const uint8_t *key, uint32_t keylen)
server_pool_server(struct server_pool *pool, struct msg *r, const uint8_t *key, uint32_t keylen)
{
struct server *server;
uint32_t idx;

idx = server_pool_idx(pool, key, keylen);
if (r->type == MSG_REQ_REDIS_SCAN) {
idx = r->server_index;
}else{
idx = server_pool_idx(pool, key, keylen);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change the cursor of request in req_filter and save the server_index on msg in the mean time.

server = array_get(&pool->server, idx);

log_debug(LOG_VERB, "key '%.*s' on dist %d maps to server '%.*s'", keylen,
Expand All @@ -715,8 +719,7 @@ server_pool_server(struct server_pool *pool, const uint8_t *key, uint32_t keylen
}

struct conn *
server_pool_conn(struct context *ctx, struct server_pool *pool, const uint8_t *key,
uint32_t keylen)
server_pool_conn(struct context *ctx, struct server_pool *pool, struct msg *msg, const uint8_t *key, uint32_t keylen)
{
rstatus_t status;
struct server *server;
Expand All @@ -728,7 +731,7 @@ server_pool_conn(struct context *ctx, struct server_pool *pool, const uint8_t *k
}

/* from a given {key, keylen} pick a server from pool */
server = server_pool_server(pool, key, keylen);
server = server_pool_server(pool, msg, key, keylen);
if (server == NULL) {
return NULL;
}
Expand Down
2 changes: 1 addition & 1 deletion src/nc_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ void server_connected(struct context *ctx, struct conn *conn);
void server_ok(struct context *ctx, struct conn *conn);

uint32_t server_pool_idx(const struct server_pool *pool, const uint8_t *key, uint32_t keylen);
struct conn *server_pool_conn(struct context *ctx, struct server_pool *pool, const uint8_t *key, uint32_t keylen);
struct conn *server_pool_conn(struct context *ctx, struct server_pool *pool, struct msg *msg, const uint8_t *key, uint32_t keylen);
rstatus_t server_pool_run(struct server_pool *pool);
rstatus_t server_pool_preconnect(struct context *ctx);
void server_pool_disconnect(struct context *ctx);
Expand Down
14 changes: 10 additions & 4 deletions src/proto/nc_redis.c
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ redis_argn(const struct msg *r)
case MSG_REQ_REDIS_GEOSEARCHSTORE:

case MSG_REQ_REDIS_RESTORE:
case MSG_REQ_REDIS_SCAN:
return true;

default:
Expand Down Expand Up @@ -750,6 +751,11 @@ redis_parse_req(struct msg *r)
break;
}

if (str4icmp(m, 's', 'c', 'a', 'n')) {
r->type = MSG_REQ_REDIS_SCAN;
break;
}

break;

case 5:
Expand Down Expand Up @@ -1633,14 +1639,14 @@ redis_parse_req(struct msg *r)

m = p + r->rlen;
if (m >= b->last) {
/*
/*
* For EVAL/EVALHASH, the r->token has been assigned a value. When
* m >= b->last happens will need to repair mbuf.
*
* m >= b->last happens will need to repair mbuf.
*
* At the end of redis_parse_req, r->token will be used to choose
* the start (p) for the next call to redis_parse_req and clear
* r->token when repairing this and adding more data.
*
*
* So, only when r->token == NULL we need to calculate r->rlen again.
*/
if (r->token == NULL) {
Expand Down
24 changes: 24 additions & 0 deletions tests/test_redis/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,27 @@ def test_sscan():
assert_equal('0', str(cursor))
assert_equal({b'1'}, set(members))


def test_scan():
r = getconn()
r.set('hello_scan_a1',11)
r.set('hello_scan_b1',22)
r.hmset("hello_scan_h1",{"a":1,"b":2})
r.rpush("hello_scan_l1","a","b")
r.sadd("hello_scan_s1","a","a","b")
r.zadd("hello_scan_z1",{"one": 1, "two": 2, "three": 3})

zsetval=r.zrange("hello_scan_z1",0,-1,False,True)

cursor = 0
match_str = "hello_scan_*"
rets = []
subrets = []
while True:
cursor,subrets = r.scan(cursor,match_str,100)
if len(subrets):
rets.extend(subrets)
if cursor == 0:
break
rets.sort()
assert_equal(rets,[b'hello_scan_a1', b'hello_scan_b1', b'hello_scan_h1', b'hello_scan_l1', b'hello_scan_s1', b'hello_scan_z1'])