Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support of script load command by broadcast #424

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions src/nc_message.c
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ _msg_get(void)
msg->result = MSG_PARSE_OK;

msg->fragment = NULL;
msg->broadcast = NULL;
msg->reply = NULL;
msg->pre_coalesce = NULL;
msg->post_coalesce = NULL;
Expand Down Expand Up @@ -295,6 +296,7 @@ msg_get(struct conn *conn, bool request, bool redis)
}
msg->add_auth = redis_add_auth;
msg->fragment = redis_fragment;
msg->broadcast = redis_broadcast;
msg->reply = redis_reply;
msg->failure = redis_failure;
msg->pre_coalesce = redis_pre_coalesce;
Expand Down Expand Up @@ -888,3 +890,26 @@ msg_send(struct context *ctx, struct conn *conn)

return NC_OK;
}

struct msg *
msg_clone(struct msg *r) {
struct msg *ret = msg_get(r->owner, r->request, r->redis);
struct mbuf *mbuf, *nbuf; /* current and next mbuf */
rstatus_t status; uint32_t mlen;

for (mbuf = STAILQ_FIRST(&r->mhdr); mbuf != NULL; mbuf = nbuf) {
nbuf = STAILQ_NEXT(mbuf, next);

if (mbuf_empty(mbuf)) {
continue;
}

mlen = mbuf_length(mbuf);
status = msg_append(ret, mbuf->pos, mlen);
if (status != NC_OK) {
msg_put(ret);
return NULL;
}
}
return ret;
}
4 changes: 4 additions & 0 deletions src/nc_message.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ typedef rstatus_t (*msg_add_auth_t)(struct context *ctx, struct conn *c_conn, st
typedef rstatus_t (*msg_fragment_t)(struct msg *, uint32_t, struct msg_tqh *);
typedef void (*msg_coalesce_t)(struct msg *r);
typedef rstatus_t (*msg_reply_t)(struct msg *r);
typedef bool (*msg_broadcast_t)(struct msg *r);
typedef bool (*msg_failure_t)(struct msg *r);

typedef enum msg_parse_result {
Expand Down Expand Up @@ -161,6 +162,7 @@ typedef enum msg_parse_result {
ACTION( REQ_REDIS_ZSCORE ) \
ACTION( REQ_REDIS_ZUNIONSTORE ) \
ACTION( REQ_REDIS_ZSCAN) \
ACTION( REQ_REDIS_SCRIPT ) /* redis requests - script */ \
ACTION( REQ_REDIS_EVAL ) /* redis requests - eval */ \
ACTION( REQ_REDIS_EVALSHA ) \
ACTION( REQ_REDIS_PING ) /* redis requests - ping/quit */ \
Expand Down Expand Up @@ -223,6 +225,7 @@ struct msg {

msg_fragment_t fragment; /* message fragment */
msg_reply_t reply; /* generate message reply (example: ping) */
msg_broadcast_t broadcast; /* check this message need broadcast */
msg_add_auth_t add_auth; /* add auth message when we forward msg */
msg_failure_t failure; /* transient failure response? */

Expand Down Expand Up @@ -273,6 +276,7 @@ void msg_deinit(void);
struct string *msg_type_string(msg_type_t type);
struct msg *msg_get(struct conn *conn, bool request, bool redis);
void msg_put(struct msg *msg);
struct msg *msg_clone(struct msg *src);
struct msg *msg_get_error(bool redis, err_t err);
void msg_dump(struct msg *msg, int level);
bool msg_empty(struct msg *msg);
Expand Down
72 changes: 62 additions & 10 deletions src/nc_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -553,14 +553,14 @@ req_forward_stats(struct context *ctx, struct server *server, struct msg *msg)
}

static void
req_forward(struct context *ctx, struct conn *c_conn, struct msg *msg)
req_forward_base(struct context *ctx, struct conn *c_conn, struct msg *msg, struct conn *s_conn)
{
rstatus_t status;
struct conn *s_conn;
struct server_pool *pool;
uint8_t *key;
uint32_t keylen;
struct keypos *kpos;
bool fixed_dest = !s_conn;

ASSERT(c_conn->client && !c_conn->proxy);

Expand All @@ -571,12 +571,14 @@ req_forward(struct context *ctx, struct conn *c_conn, struct msg *msg)

pool = c_conn->owner;

ASSERT(array_n(msg->keys) > 0);
kpos = array_get(msg->keys, 0);
key = kpos->start;
keylen = (uint32_t)(kpos->end - kpos->start);
if (s_conn == NULL) {
ASSERT(array_n(msg->keys) > 0);
kpos = array_get(msg->keys, 0);
key = kpos->start;
keylen = (uint32_t)(kpos->end - kpos->start);

s_conn = server_pool_conn(ctx, c_conn->owner, key, keylen);
s_conn = server_pool_conn(ctx, c_conn->owner, key, keylen);
}
if (s_conn == NULL) {
req_forward_error(ctx, c_conn, msg);
return;
Expand Down Expand Up @@ -606,9 +608,47 @@ req_forward(struct context *ctx, struct conn *c_conn, struct msg *msg)

req_forward_stats(ctx, s_conn->owner, msg);

log_debug(LOG_VERB, "forward from c %d to s %d req %"PRIu64" len %"PRIu32
" type %d with key '%.*s'", c_conn->sd, s_conn->sd, msg->id,
msg->mlen, msg->type, keylen, key);
if (fixed_dest) {
log_debug(LOG_VERB, "forward from c %d to s %d req %"PRIu64" len %"PRIu32
" type %d", c_conn->sd, s_conn->sd, msg->id,
msg->mlen, msg->type);
}
else {
log_debug(LOG_VERB, "forward from c %d to s %d req %"PRIu64" len %"PRIu32
" type %d with key '%.*s'", c_conn->sd, s_conn->sd, msg->id,
msg->mlen, msg->type, keylen, key);
}
}

static void
req_forward(struct context *ctx, struct conn *c_conn, struct msg *msg)
{
req_forward_base(ctx, c_conn, msg, NULL);
}

struct broadcast_data {
struct msg *msg;
struct context *ctx;
};

static rstatus_t req_server_pool_iter(void *elem, void *data)
{
struct broadcast_data *p = data;
struct conn *s_conn = server_established_conn(p->ctx, elem);
struct msg *copied = msg_clone(p->msg);
if (!copied) {
return NC_ENOMEM;
}
req_forward_base(p->ctx, p->msg->owner, copied, s_conn);
return NC_OK;
}

static rstatus_t
broadcast(struct context *ctx, struct conn *c_conn, struct msg *msg)
{
struct server_pool *pool = c_conn->owner;
struct broadcast_data data = { msg, ctx };
return array_each(&(pool->server), req_server_pool_iter, &data);
}

void
Expand Down Expand Up @@ -655,6 +695,18 @@ req_recv_done(struct context *ctx, struct conn *conn, struct msg *msg,
return;
}

/* if required, do broadcast */
if (msg->broadcast && msg->broadcast(msg)) {
status = broadcast(ctx, conn, msg);
if (status != NC_OK) {
if (!msg->noreply) {
conn->enqueue_outq(ctx, conn, msg);
}
req_forward_error(ctx, conn, msg);
}
return;
}

/* do fragment */
pool = conn->owner;
TAILQ_INIT(&frag_msgq);
Expand Down
11 changes: 11 additions & 0 deletions src/nc_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,17 @@ server_conn(struct server *server)
return conn;
}

struct conn *
server_established_conn(struct context *ctx, struct server *server) {
struct conn *conn = server_conn(server);
rstatus_t status = server_connect(ctx, server, conn);
if (status != NC_OK) {
server_close(ctx, conn);
return NULL;
}
return conn;
}

static rstatus_t
server_each_preconnect(void *elem, void *data)
{
Expand Down
1 change: 1 addition & 0 deletions src/nc_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ bool server_active(struct conn *conn);
rstatus_t server_init(struct array *server, struct array *conf_server, struct server_pool *sp);
void server_deinit(struct array *server);
struct conn *server_conn(struct server *server);
struct conn *server_established_conn(struct context *ctx, struct server *server);
rstatus_t server_connect(struct context *ctx, struct server *server, struct conn *conn);
void server_close(struct context *ctx, struct conn *conn);
void server_connected(struct context *ctx, struct conn *conn);
Expand Down
1 change: 1 addition & 0 deletions src/proto/nc_proto.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ void redis_pre_coalesce(struct msg *r);
void redis_post_coalesce(struct msg *r);
rstatus_t redis_add_auth(struct context *ctx, struct conn *c_conn, struct conn *s_conn);
rstatus_t redis_fragment(struct msg *r, uint32_t ncontinuum, struct msg_tqh *frag_msgq);
bool redis_broadcast(struct msg *r);
rstatus_t redis_reply(struct msg *r);
void redis_post_connect(struct context *ctx, struct conn *conn, struct server *server);
void redis_swallow_msg(struct conn *conn, struct msg *pmsg, struct msg *msg);
Expand Down
31 changes: 31 additions & 0 deletions src/proto/nc_redis.c
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,8 @@ redis_argn(struct msg *r)
case MSG_REQ_REDIS_ZREVRANGEBYSCORE:
case MSG_REQ_REDIS_ZUNIONSTORE:
case MSG_REQ_REDIS_ZSCAN:

case MSG_REQ_REDIS_SCRIPT:
return true;

default:
Expand Down Expand Up @@ -316,6 +318,25 @@ redis_argeval(struct msg *r)
return false;
}


/*
* return true if the redis command needs to broadcast to all cluster members.
*
*/
static bool
redis_need_broadcast(struct msg *r)
{
switch (r->type) {
case MSG_REQ_REDIS_SCRIPT:
return true;

default:
break;
}

return false;
}

/*
* Return true, if the redis response is an error response i.e. a simple
* string whose first character is '-', otherwise return false.
Expand Down Expand Up @@ -879,6 +900,11 @@ redis_parse_req(struct msg *r)
break;
}

if (str6icmp(m, 's', 'c', 'r', 'i', 'p', 't')) {
r->type = MSG_REQ_REDIS_SCRIPT;
break;
}

break;

case 7:
Expand Down Expand Up @@ -2675,6 +2701,11 @@ redis_fragment(struct msg *r, uint32_t ncontinuum, struct msg_tqh *frag_msgq)
}
}

bool
redis_broadcast(struct msg *r) {
return redis_need_broadcast(r);
}

rstatus_t
redis_reply(struct msg *r)
{
Expand Down
Loading