From 4d49d2ecd9e1d60f18e665570e4ad1a2ba9b65b1 Mon Sep 17 00:00:00 2001 From: charsyam Date: Wed, 14 Oct 2015 02:38:38 +0900 Subject: [PATCH] add heartbeat --- src/hashkit/nc_ketama.c | 8 +- src/hashkit/nc_modula.c | 8 +- src/hashkit/nc_random.c | 6 +- src/nc_client.c | 5 ++ src/nc_client.h | 1 + src/nc_conf.c | 9 ++ src/nc_connection.c | 3 + src/nc_connection.h | 3 + src/nc_core.c | 73 +++++++++++++++- src/nc_core.h | 4 + src/nc_proxy.c | 5 ++ src/nc_proxy.h | 1 + src/nc_response.c | 10 +++ src/nc_server.c | 179 ++++++++++++++++++++++++++++++++++++++-- src/nc_server.h | 9 ++ 15 files changed, 296 insertions(+), 28 deletions(-) diff --git a/src/hashkit/nc_ketama.c b/src/hashkit/nc_ketama.c index 9d43f2b2..52ac48c1 100644 --- a/src/hashkit/nc_ketama.c +++ b/src/hashkit/nc_ketama.c @@ -90,12 +90,8 @@ ketama_update(struct server_pool *pool) struct server *server = array_get(&pool->server, server_index); if (pool->auto_eject_hosts) { - if (server->next_retry <= now) { - server->next_retry = 0LL; + if (server->fail == FAIL_STATUS_NORMAL) { nlive_server++; - } else if (pool->next_rebuild == 0LL || - server->next_retry < pool->next_rebuild) { - pool->next_rebuild = server->next_retry; } } else { nlive_server++; @@ -104,7 +100,7 @@ ketama_update(struct server_pool *pool) ASSERT(server->weight > 0); /* count weight only for live servers */ - if (!pool->auto_eject_hosts || server->next_retry <= now) { + if (!pool->auto_eject_hosts || server->fail == 0) { total_weight += server->weight; } } diff --git a/src/hashkit/nc_modula.c b/src/hashkit/nc_modula.c index 083f89a9..7faa6945 100644 --- a/src/hashkit/nc_modula.c +++ b/src/hashkit/nc_modula.c @@ -54,12 +54,8 @@ modula_update(struct server_pool *pool) struct server *server = array_get(&pool->server, server_index); if (pool->auto_eject_hosts) { - if (server->next_retry <= now) { - server->next_retry = 0LL; + if (server->fail == FAIL_STATUS_NORMAL) { nlive_server++; - } else if (pool->next_rebuild == 0LL || - server->next_retry < pool->next_rebuild) { - pool->next_rebuild = server->next_retry; } } else { nlive_server++; @@ -68,7 +64,7 @@ modula_update(struct server_pool *pool) ASSERT(server->weight > 0); /* count weight only for live servers */ - if (!pool->auto_eject_hosts || server->next_retry <= now) { + if (!pool->auto_eject_hosts || server->fail == FAIL_STATUS_NORMAL) { total_weight += server->weight; } } diff --git a/src/hashkit/nc_random.c b/src/hashkit/nc_random.c index 8c6261c7..1afe3c01 100644 --- a/src/hashkit/nc_random.c +++ b/src/hashkit/nc_random.c @@ -51,12 +51,8 @@ random_update(struct server_pool *pool) struct server *server = array_get(&pool->server, server_index); if (pool->auto_eject_hosts) { - if (server->next_retry <= now) { - server->next_retry = 0LL; + if (server->fail == FAIL_STATUS_NORMAL) { nlive_server++; - } else if (pool->next_rebuild == 0LL || - server->next_retry < pool->next_rebuild) { - pool->next_rebuild = server->next_retry; } } else { nlive_server++; diff --git a/src/nc_client.c b/src/nc_client.c index ba3cbf39..291b0ff0 100644 --- a/src/nc_client.c +++ b/src/nc_client.c @@ -187,3 +187,8 @@ client_close(struct context *ctx, struct conn *conn) conn_put(conn); } + +void +client_restore(struct context *ctx, struct conn *conn) +{ +} diff --git a/src/nc_client.h b/src/nc_client.h index 2becb0cb..cde4ce6e 100644 --- a/src/nc_client.h +++ b/src/nc_client.h @@ -24,5 +24,6 @@ bool client_active(struct conn *conn); void client_ref(struct conn *conn, void *owner); void client_unref(struct conn *conn); void client_close(struct context *ctx, struct conn *conn); +void client_restore(struct context *ctx, struct conn *conn); #endif diff --git a/src/nc_conf.c b/src/nc_conf.c index fa720796..a2acf566 100644 --- a/src/nc_conf.c +++ b/src/nc_conf.c @@ -167,6 +167,7 @@ conf_server_each_transform(void *elem, void *data) s->next_retry = 0LL; s->failure_count = 0; + s->fail = FAIL_STATUS_NORMAL; log_debug(LOG_VERB, "transform to server %"PRIu32" '%.*s'", s->idx, s->pname.len, s->pname.data); @@ -1136,6 +1137,14 @@ conf_pre_validate(struct conf *cf) return NC_OK; } +static int +conf_server_pname_cmp(const void *t1, const void *t2) +{ + const struct conf_server *s1 = t1, *s2 = t2; + + return string_compare(&s1->pname, &s2->pname); +} + static int conf_server_name_cmp(const void *t1, const void *t2) { diff --git a/src/nc_connection.c b/src/nc_connection.c index a0d1d9da..b7fa4a59 100644 --- a/src/nc_connection.c +++ b/src/nc_connection.c @@ -194,6 +194,7 @@ conn_get(void *owner, bool client, bool redis) conn->close = client_close; conn->active = client_active; + conn->restore = client_restore; conn->ref = client_ref; conn->unref = client_unref; @@ -221,6 +222,7 @@ conn_get(void *owner, bool client, bool redis) conn->close = server_close; conn->active = server_active; + conn->restore = server_restore; conn->ref = server_ref; conn->unref = server_unref; @@ -269,6 +271,7 @@ conn_get_proxy(void *owner) conn->close = proxy_close; conn->active = NULL; + conn->restore = proxy_restore; conn->ref = proxy_ref; conn->unref = proxy_unref; diff --git a/src/nc_connection.h b/src/nc_connection.h index 7ece25ef..345c36e6 100644 --- a/src/nc_connection.h +++ b/src/nc_connection.h @@ -33,6 +33,7 @@ typedef bool (*conn_active_t)(struct conn *); typedef void (*conn_ref_t)(struct conn *, void *); typedef void (*conn_unref_t)(struct conn *); +typedef void (*conn_restore_t)(struct context *, struct conn *); typedef void (*conn_msgq_t)(struct context *, struct conn *, struct msg *); typedef void (*conn_post_connect_t)(struct context *ctx, struct conn *, struct server *server); @@ -60,6 +61,7 @@ struct conn { conn_send_done_t send_done; /* write done handler */ conn_close_t close; /* close handler */ conn_active_t active; /* active? handler */ + conn_restore_t restore; /* restore handler */ conn_post_connect_t post_connect; /* post connect handler */ conn_swallow_msg_t swallow_msg; /* react on messages to be swallowed */ @@ -105,5 +107,6 @@ uint32_t conn_ncurr_conn(void); uint64_t conn_ntotal_conn(void); uint32_t conn_ncurr_cconn(void); bool conn_authenticated(struct conn *conn); +rstatus_t event_add_out_with_conn(struct context *ctx, struct conn *conn, struct msg *msg); #endif diff --git a/src/nc_core.c b/src/nc_core.c index c39a8057..59d07bf5 100644 --- a/src/nc_core.c +++ b/src/nc_core.c @@ -24,6 +24,30 @@ static uint32_t ctx_id; /* context generation */ +static void +core_failed_servers_init(struct context *ctx) +{ + int i; + + for (i = 0; i < 2; i++) { + array_init(&(ctx->failed_servers[i]), 10, sizeof(struct server *)); + } +} + +static void +core_failed_servers_deinit(struct context *ctx) +{ + uint32_t i, n, nsize; + + for (i = 0; i < 2; i++) { + nsize = array_n(&(ctx->failed_servers[i])); + for (n = 0; n < nsize; n++) { + array_pop(&(ctx->failed_servers[n])); + } + array_deinit(&(ctx->failed_servers[n])); + } +} + static rstatus_t core_calc_connections(struct context *ctx) { @@ -60,6 +84,11 @@ core_ctx_create(struct instance *nci) ctx->stats = NULL; ctx->evb = NULL; array_null(&ctx->pool); + array_null(&(ctx->failed_servers[0])); + array_null(&(ctx->failed_servers[1])); + ctx->failed_idx = 0; + ctx->fails = &(ctx->failed_servers[0]); + ctx->max_timeout = nci->stats_interval; ctx->timeout = ctx->max_timeout; ctx->max_nfd = 0; @@ -93,6 +122,8 @@ core_ctx_create(struct instance *nci) return NULL; } + core_failed_servers_init(ctx); + /* create stats per server pool */ ctx->stats = stats_create(nci->stats_port, nci->stats_addr, nci->stats_interval, nci->hostname, &ctx->pool); @@ -261,6 +292,41 @@ core_error(struct context *ctx, struct conn *conn) core_close(ctx, conn); } +static void +retry_connection(struct context *ctx) +{ + struct array *servers; + int idx; + struct server *server; + int64_t now; + uint32_t i, nsize; + rstatus_t status; + + servers = ctx->fails; + idx = (ctx->failed_idx == 0) ? 1 : 0; + + ctx->failed_idx = idx; + ctx->fails = &(ctx->failed_servers[idx]); + + now = nc_usec_now(); + nsize = array_n(servers); + if (nsize == 0) { + return; + } + + for (i = 0; i < nsize; i++) { + server = *(struct server **)array_pop(servers); + if (server->next_retry == 0 || server->next_retry < now) { + status = server_reconnect(ctx, server); + if (status != NC_OK) { + add_failed_server(ctx, server); + } + } else { + add_failed_server(ctx, server); + } + } +} + static void core_timeout(struct context *ctx) { @@ -272,14 +338,14 @@ core_timeout(struct context *ctx) msg = msg_tmo_min(); if (msg == NULL) { ctx->timeout = ctx->max_timeout; - return; + break; } /* skip over req that are in-error or done */ if (msg->error || msg->done) { msg_tmo_delete(msg); - continue; + break; } /* @@ -304,6 +370,8 @@ core_timeout(struct context *ctx) core_close(ctx, conn); } + + retry_connection(ctx); } rstatus_t @@ -324,6 +392,7 @@ core_core(void *arg, uint32_t events) conn->client ? 'c' : (conn->proxy ? 'p' : 's'), conn->sd); conn->events = events; + conn->restore(ctx, conn); /* error takes precedence over read | write */ if (events & EVENT_ERR) { diff --git a/src/nc_core.h b/src/nc_core.h index 3166ec9b..ef834722 100644 --- a/src/nc_core.h +++ b/src/nc_core.h @@ -124,6 +124,10 @@ struct context { struct stats *stats; /* stats */ struct array pool; /* server_pool[] */ + struct array failed_servers[2]; /* failed servers */ + struct array *fails; /* ref of current fails server */ + + int failed_idx; /* current idx for failed servers */ struct event_base *evb; /* event base */ int max_timeout; /* max timeout in msec */ int timeout; /* timeout in msec */ diff --git a/src/nc_proxy.c b/src/nc_proxy.c index 38cdd9e6..489e6e34 100644 --- a/src/nc_proxy.c +++ b/src/nc_proxy.c @@ -406,3 +406,8 @@ proxy_recv(struct context *ctx, struct conn *conn) return NC_OK; } + +void +proxy_restore(struct context *ctx, struct conn *conn) +{ +} diff --git a/src/nc_proxy.h b/src/nc_proxy.h index 10880d54..b62adb65 100644 --- a/src/nc_proxy.h +++ b/src/nc_proxy.h @@ -30,5 +30,6 @@ rstatus_t proxy_each_deinit(void *elem, void *data); rstatus_t proxy_init(struct context *ctx); void proxy_deinit(struct context *ctx); rstatus_t proxy_recv(struct context *ctx, struct conn *conn); +void proxy_restore(struct context *ctx, struct conn *conn); #endif diff --git a/src/nc_response.c b/src/nc_response.c index 694c8720..ed28f50e 100644 --- a/src/nc_response.c +++ b/src/nc_response.c @@ -142,9 +142,12 @@ static bool rsp_filter(struct context *ctx, struct conn *conn, struct msg *msg) { struct msg *pmsg; + struct server *server; ASSERT(!conn->client && !conn->proxy); + server = (struct server *)conn->owner; + if (msg_empty(msg)) { ASSERT(conn->rmsg == NULL); log_debug(LOG_VERB, "filter empty rsp %"PRIu64" on s %d", msg->id, @@ -204,6 +207,13 @@ rsp_filter(struct context *ctx, struct conn *conn, struct msg *msg) } if (pmsg->swallow) { + if (server->fail == FAIL_STATUS_ERR_TRY_HEARTBEAT) { + struct conn *c_conn; + + c_conn = pmsg->owner; + server_restore_from_heartbeat(server, c_conn); + } + conn->swallow_msg(conn, pmsg, msg); conn->dequeue_outq(ctx, conn, pmsg); diff --git a/src/nc_server.c b/src/nc_server.c index 7b1d5e4f..4e7de766 100644 --- a/src/nc_server.c +++ b/src/nc_server.c @@ -21,6 +21,7 @@ #include #include #include +#include static void server_resolve(struct server *server, struct conn *conn) @@ -266,31 +267,37 @@ static void server_failure(struct context *ctx, struct server *server) { struct server_pool *pool = server->owner; - int64_t now, next; + int64_t now; rstatus_t status; + bool is_reconnect; if (!pool->auto_eject_hosts) { return; } - server->failure_count++; - log_debug(LOG_VERB, "server '%.*s' failure count %"PRIu32" limit %"PRIu32, server->pname.len, server->pname.data, server->failure_count, pool->server_failure_limit); - if (server->failure_count < pool->server_failure_limit) { + now = nc_usec_now(); + if (now < 0) { return; } - now = nc_usec_now(); - if (now < 0) { + server->next_retry = now + pool->server_retry_timeout; + server->failure_count++; + is_reconnect = (server->fail != FAIL_STATUS_NORMAL) ? true : false; + + if (is_reconnect) { + add_failed_server(ctx, server); return; } - stats_server_set_ts(ctx, server, server_ejected_at, now); + if (server->failure_count < pool->server_failure_limit) { + return; + } - next = now + pool->server_retry_timeout; + stats_server_set_ts(ctx, server, server_ejected_at, now); log_debug(LOG_INFO, "update pool %"PRIu32" '%.*s' to delete server '%.*s' " "for next %"PRIu32" secs", pool->idx, pool->name.len, @@ -300,13 +307,13 @@ server_failure(struct context *ctx, struct server *server) stats_pool_incr(ctx, pool, server_ejects); server->failure_count = 0; - server->next_retry = next; status = server_pool_run(pool); if (status != NC_OK) { log_error("updating pool %"PRIu32" '%.*s' failed: %s", pool->idx, pool->name.len, pool->name.data, strerror(errno)); } + add_failed_server(ctx, server); } static void @@ -925,3 +932,157 @@ server_pool_deinit(struct array *server_pool) log_debug(LOG_DEBUG, "deinit %"PRIu32" pools", npool); } + +static struct msg * +heartbeat_msg_get(struct conn *conn) +{ + struct msg *msg; + + ASSERT(conn->client && !conn->proxy); + + msg = msg_get(conn, true, conn->redis); + if (msg == NULL) { + conn->err = errno; + } + + return msg; +} + +static uint32_t +set_heartbeat_command(struct mbuf *mbuf, int redis) +{ +#define HEARTBEAT_MEMCACHE_COMMAND "get twemproxy\r\n" +#define HEARTBEAT_REDIS_COMMAND "*2\r\n$3\r\nget\r\n$9\r\ntwemproxy\r\n" + char *command; + uint32_t n; + + command = redis ? HEARTBEAT_REDIS_COMMAND : HEARTBEAT_MEMCACHE_COMMAND; + n = (uint32_t)strlen(command); + + memcpy(mbuf->last, command, n); + ASSERT((mbuf->last + n) <= mbuf->end); + + return n; +} + +static rstatus_t +send_heartbeat(struct context *ctx, struct conn *conn, struct server *server) +{ + struct mbuf *mbuf; + uint32_t n; + struct msg *msg; + struct server_pool *pool; + struct conn *c_conn; + rstatus_t status; + + pool = (struct server_pool *)(server->owner); + + c_conn = conn_get(pool, true, conn->redis); + if (c_conn == NULL) { + return NC_ERROR; + } + + msg = heartbeat_msg_get(c_conn); + if (msg == NULL) { + return NC_ERROR; + } + + c_conn->rmsg = msg; + mbuf = STAILQ_LAST(&msg->mhdr, mbuf, next); + if (mbuf == NULL || mbuf_full(mbuf)) { + mbuf = mbuf_get(); + if (mbuf == NULL) { + return NC_ERROR; + } + mbuf_insert(&msg->mhdr, mbuf); + msg->pos = mbuf->pos; + } + ASSERT(mbuf->end - mbuf->last > 0); + + n = set_heartbeat_command(mbuf, conn->redis); + mbuf->last += n; + msg->mlen += n; + + msg->swallow = 1; + server->fail = FAIL_STATUS_ERR_TRY_HEARTBEAT; + + if (TAILQ_EMPTY(&conn->imsg_q)) { + status = event_add_out(ctx->evb, conn); + if (status != NC_OK) { + return status; + } + } + conn->enqueue_inq(ctx, conn, msg); + return NC_OK; +} + +void +server_restore(struct context *ctx, struct conn *conn) +{ + struct server *server; + + server = (struct server *)(conn->owner); + ASSERT(server != NULL); + + if (server->fail == FAIL_STATUS_NORMAL) { + return; + } + + send_heartbeat(ctx, conn, server); +} + +rstatus_t +server_reconnect(struct context *ctx, struct server *server) +{ + rstatus_t status; + struct conn *conn; + + conn = server_conn(server); + if (conn == NULL) { + return NC_ERROR; + } + + status = server_connect(ctx, server, conn); + if (status == NC_OK) { + if (conn->connected) { + conn->restore(ctx, conn); + } + } else { + server_close(ctx, conn); + } + + return status; +} + +void +add_failed_server(struct context *ctx, struct server *server) +{ + struct server **pserver; + + server->fail = FAIL_STATUS_ERR_TRY_CONNECT; + pserver = (struct server **)array_push(ctx->fails); + *pserver = server; +} + +void +server_restore_from_heartbeat(struct server *server, struct conn *conn) +{ + struct server_pool *pool; + rstatus_t status; + + conn->unref(conn); + conn_put(conn); + pool = (struct server_pool *)server->owner; + server->fail = FAIL_STATUS_NORMAL; + + status = server_pool_run(pool); + if (status == NC_OK) { + log_debug(LOG_NOTICE, "updating pool %"PRIu32" '%.*s'," + "restored server '%.*s'", pool->idx, + pool->name.len, pool->name.data, + server->name.len, server->name.data); + } else { + log_error("updating pool %"PRIu32" '%.*s' failed: %s", pool->idx, + pool->name.len, pool->name.data, strerror(errno)); + } +} diff --git a/src/nc_server.h b/src/nc_server.h index 11cdaa77..be65dc22 100644 --- a/src/nc_server.h +++ b/src/nc_server.h @@ -59,6 +59,10 @@ * // */ +#define FAIL_STATUS_NORMAL 0 +#define FAIL_STATUS_ERR_TRY_CONNECT 1 +#define FAIL_STATUS_ERR_TRY_HEARTBEAT 2 + typedef uint32_t (*hash_t)(const char *, size_t); struct continuum { @@ -82,6 +86,7 @@ struct server { int64_t next_retry; /* next retry time in usec */ uint32_t failure_count; /* # consecutive failures */ + uint32_t fail; }; struct server_pool { @@ -142,5 +147,9 @@ rstatus_t server_pool_preconnect(struct context *ctx); void server_pool_disconnect(struct context *ctx); rstatus_t server_pool_init(struct array *server_pool, struct array *conf_pool, struct context *ctx); void server_pool_deinit(struct array *server_pool); +void server_restore(struct context *ctx, struct conn *conn); +rstatus_t server_reconnect(struct context *ctx, struct server *server); +void add_failed_server(struct context *ctx, struct server *server); +void server_restore_from_heartbeat(struct server *server, struct conn *conn); #endif