Skip to content

Commit

Permalink
feature: added undocumented API for attaching arbitrary key-value pai…
Browse files Browse the repository at this point in the history
…r data to a cosocket TCP connecton, which can survive connection pool reuse.

This API may change in the future without notice.
  • Loading branch information
agentzh committed Oct 10, 2020
1 parent 7367a23 commit fdf752d
Show file tree
Hide file tree
Showing 2 changed files with 267 additions and 0 deletions.
244 changes: 244 additions & 0 deletions src/ngx_http_lua_socket_tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@ static char ngx_http_lua_ssl_session_metatable_key;
#endif


#define ngx_http_lua_tcp_socket_metatable_literal_key "__tcp_cosocket_mt"


void
ngx_http_lua_inject_socket_tcp_api(ngx_log_t *log, lua_State *L)
{
Expand Down Expand Up @@ -355,6 +358,12 @@ ngx_http_lua_inject_socket_tcp_api(ngx_log_t *log, lua_State *L)
lua_pushvalue(L, -1);
lua_setfield(L, -2, "__index");
lua_rawset(L, LUA_REGISTRYINDEX);

lua_pushliteral(L, ngx_http_lua_tcp_socket_metatable_literal_key);
lua_pushlightuserdata(L, ngx_http_lua_lightudata_mask(
tcp_socket_metatable_key));
lua_rawget(L, LUA_REGISTRYINDEX);
lua_rawset(L, LUA_REGISTRYINDEX);
/* }}} */

/* {{{upstream userdata metatable */
Expand Down Expand Up @@ -5382,6 +5391,8 @@ ngx_http_lua_socket_tcp_setkeepalive(lua_State *L)
item->socklen = pc->socklen;
ngx_memcpy(&item->sockaddr, pc->sockaddr, pc->socklen);
item->reused = u->reused;
item->udata_queue = u->udata_queue;
u->udata_queue = NULL;

if (c->read->ready) {
rc = ngx_http_lua_socket_keepalive_close_handler(c->read);
Expand Down Expand Up @@ -5457,6 +5468,8 @@ ngx_http_lua_get_keepalive_peer(ngx_http_request_t *r,
pc->cached = 1;

u->reused = item->reused + 1;
u->udata_queue = item->udata_queue;
item->udata_queue = NULL;

#if 1
u->write_event_handler = ngx_http_lua_socket_dummy_handler;
Expand Down Expand Up @@ -6151,4 +6164,235 @@ ngx_http_lua_cleanup_conn_pools(lua_State *L)
lua_pop(L, 1);
}


int
ngx_http_lua_ffi_socket_tcp_init_udata_queue(
ngx_http_lua_socket_tcp_upstream_t *u, int capacity, char **err_msg)
{
int i, max_size;
ngx_pool_t *pool;
ngx_http_lua_socket_udata_queue_t *udata_queue;
ngx_http_lua_socket_node_t *node;

pool = u->peer.connection->pool;

if (u->udata_queue == NULL) {
max_size = capacity;
if (max_size == 0) {
max_size = 4;
}

udata_queue = ngx_palloc(pool,
sizeof(ngx_http_lua_socket_udata_queue_t) +
sizeof(ngx_http_lua_socket_node_t) * max_size);

if (udata_queue == NULL) {
*err_msg = "no memory";
return NGX_ERROR;
}

udata_queue->pool = pool;
udata_queue->capacity = capacity;
udata_queue->len = 0;
ngx_queue_init(&udata_queue->queue);
ngx_queue_init(&udata_queue->free);

node = (ngx_http_lua_socket_node_t *) (udata_queue + 1);

for (i = 0; i < max_size; i++) {
ngx_queue_insert_head(&udata_queue->free, &node->queue);
node++;
}

u->udata_queue = udata_queue;

ngx_log_debug3(NGX_LOG_DEBUG_HTTP, u->request->connection->log, 0,
"init udata_queue %uD, cosocket %p udata %p",
capacity, u, udata_queue);
}

return NGX_OK;
}


int
ngx_http_lua_ffi_socket_tcp_count_udata(ngx_http_lua_socket_tcp_upstream_t *u)
{
/* return NGX_ERROR (-1) for missing udata_queue to
* distinguish it from empty udata_queue */
if (u->udata_queue == NULL) {
return NGX_ERROR;
}

return u->udata_queue->len;
}


int
ngx_http_lua_ffi_socket_tcp_add_udata(ngx_http_lua_socket_tcp_upstream_t *u,
uint64_t key, uint64_t value, uint64_t *evicted_key,
uint64_t *evicted_value, char **err_msg)
{
int evicted = 0;
ngx_pool_t *pool;
ngx_http_lua_socket_node_t *node = NULL;
ngx_queue_t *q, *uqueue;

pool = u->peer.connection->pool;

if (u->udata_queue == NULL) {
*err_msg = "no udata queue";
return NGX_ERROR;
}

uqueue = &u->udata_queue->queue;

for (q = ngx_queue_head(uqueue);
q != ngx_queue_sentinel(uqueue);
q = ngx_queue_next(q))
{
node = ngx_queue_data(q, ngx_http_lua_socket_node_t, queue);

if (node->key == key) {
/* key exists */
ngx_log_debug3(NGX_LOG_DEBUG_HTTP, u->request->connection->log, 0,
"found %uD, cosocket %p udata %p",
key, u, u->udata_queue);
ngx_queue_remove(q);
node->value = value;

break;
}
}

if (q == ngx_queue_sentinel(uqueue)) {

if (u->udata_queue->capacity
&& u->udata_queue->capacity == u->udata_queue->len)
{
/* evict key */
q = ngx_queue_last(uqueue);
node = ngx_queue_data(q, ngx_http_lua_socket_node_t, queue);
ngx_queue_remove(q);
ngx_log_debug4(NGX_LOG_DEBUG_HTTP, u->request->connection->log, 0,
"evict %uD for %uD, cosocket %p udata %p",
node->key, key, u, u->udata_queue);
*evicted_key = node->key;
*evicted_value = node->value;
evicted = 1;

} else {
/* insert key */
ngx_log_debug3(NGX_LOG_DEBUG_HTTP, u->request->connection->log, 0,
"insert %uD, cosocket %p udata %p",
key, u, u->udata_queue);

if (!ngx_queue_empty(&u->udata_queue->free)) {
q = ngx_queue_head(&u->udata_queue->free);
node = ngx_queue_data(q, ngx_http_lua_socket_node_t, queue);
ngx_queue_remove(q);
ngx_log_debug3(NGX_LOG_DEBUG_HTTP, u->request->connection->log,
0, "reuse free node %p, cosocket %p udata %p",
node, u, u->udata_queue);

} else {
node = ngx_palloc(pool, sizeof(ngx_http_lua_socket_node_t));
if (node == NULL) {
goto nomem;
}

ngx_log_debug3(NGX_LOG_DEBUG_HTTP, u->request->connection->log,
0, "allocate new node %p, cosocket %p udata %p",
node, u, u->udata_queue);
}

u->udata_queue->len++;
}

node->key = key;
node->value = value;
}

ngx_queue_insert_head(uqueue, &node->queue);
return evicted ? NGX_DONE : NGX_OK;

nomem:

*err_msg = "no memory";
return NGX_ERROR;
}


int
ngx_http_lua_ffi_socket_tcp_get_udata(ngx_http_lua_socket_tcp_upstream_t *u,
uint64_t key, uint64_t *value, char **err_msg)
{
ngx_http_lua_socket_node_t *node;
ngx_queue_t *q, *uqueue;

if (u->udata_queue == NULL) {
*err_msg = "no udata queue";
return NGX_ERROR;
}

uqueue = &u->udata_queue->queue;

for (q = ngx_queue_head(uqueue);
q != ngx_queue_sentinel(uqueue);
q = ngx_queue_next(q))
{
node = ngx_queue_data(q, ngx_http_lua_socket_node_t, queue);

if (node->key == key) {
ngx_log_debug3(NGX_LOG_DEBUG_HTTP, u->request->connection->log, 0,
"found %uD, cosocket %p udata %p",
key, u, u->udata_queue);
ngx_queue_remove(q);
ngx_queue_insert_head(uqueue, &node->queue);
*value = node->value;
return NGX_OK;
}
}

*err_msg = "not found";
return NGX_ERROR;
}


int
ngx_http_lua_ffi_socket_tcp_del_udata(ngx_http_lua_socket_tcp_upstream_t *u,
uint64_t key, char **err_msg)
{
ngx_http_lua_socket_node_t *node;
ngx_queue_t *q, *uqueue;

if (u->udata_queue == NULL) {
*err_msg = "no udata queue";
return NGX_ERROR;
}

uqueue = &u->udata_queue->queue;

for (q = ngx_queue_head(uqueue);
q != ngx_queue_sentinel(uqueue);
q = ngx_queue_next(q))
{
node = ngx_queue_data(q, ngx_http_lua_socket_node_t, queue);

if (node->key == key) {
ngx_log_debug3(NGX_LOG_DEBUG_HTTP, u->request->connection->log, 0,
"delete %uD, cosocket %p udata %p",
key, u, u->udata_queue);
ngx_queue_remove(q);
ngx_queue_insert_head(&u->udata_queue->free, &node->queue);
u->udata_queue->len--;
return NGX_OK;
}
}

*err_msg = "not found";
return NGX_ERROR;
}


/* vi:set ft=c ts=4 sw=4 et fdm=marker: */
23 changes: 23 additions & 0 deletions src/ngx_http_lua_socket_tcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ typedef struct ngx_http_lua_socket_tcp_upstream_s
ngx_http_lua_socket_tcp_upstream_t;


typedef struct ngx_http_lua_socket_udata_queue_s
ngx_http_lua_socket_udata_queue_t;


typedef
int (*ngx_http_lua_socket_tcp_retval_handler)(ngx_http_request_t *r,
ngx_http_lua_socket_tcp_upstream_t *u, lua_State *L);
Expand Down Expand Up @@ -79,6 +83,8 @@ struct ngx_http_lua_socket_tcp_upstream_s {
ngx_http_lua_socket_tcp_upstream_handler_pt read_event_handler;
ngx_http_lua_socket_tcp_upstream_handler_pt write_event_handler;

ngx_http_lua_socket_udata_queue_t *udata_queue;

ngx_http_lua_socket_pool_t *socket_pool;

ngx_http_lua_loc_conf_t *conf;
Expand Down Expand Up @@ -166,9 +172,26 @@ typedef struct {

ngx_uint_t reused;

ngx_http_lua_socket_udata_queue_t *udata_queue;
} ngx_http_lua_socket_pool_item_t;


struct ngx_http_lua_socket_udata_queue_s {
ngx_pool_t *pool;
ngx_queue_t queue;
ngx_queue_t free;
int len;
int capacity;
};


typedef struct {
ngx_queue_t queue;
uint64_t key;
uint64_t value;
} ngx_http_lua_socket_node_t;


void ngx_http_lua_inject_socket_tcp_api(ngx_log_t *log, lua_State *L);
void ngx_http_lua_inject_req_socket_api(lua_State *L);
void ngx_http_lua_cleanup_conn_pools(lua_State *L);
Expand Down

0 comments on commit fdf752d

Please sign in to comment.