diff --git a/modules/cachedb_redis/cachedb_redis_dbase.c b/modules/cachedb_redis/cachedb_redis_dbase.c index 5ef92c6a5a4..f9ee1c2d1cb 100644 --- a/modules/cachedb_redis/cachedb_redis_dbase.c +++ b/modules/cachedb_redis/cachedb_redis_dbase.c @@ -564,48 +564,83 @@ static int _redis_run_command(cachedb_con *connection, redisReply **rpl, str *ke reply,reply?(unsigned)reply->len:7,reply?reply->str:"FAILURE", node->context->errstr); - if (match_prefix(reply->str, reply->len, MOVED_PREFIX, MOVED_PREFIX_LEN)) { - // It's a MOVED response + if (match_prefix(reply->str, reply->len, MOVED_PREFIX, MOVED_PREFIX_LEN) || + match_prefix(reply->str, reply->len, ASK_PREFIX, ASK_PREFIX_LEN)) { + int is_ask = match_prefix(reply->str, reply->len, + ASK_PREFIX, ASK_PREFIX_LEN); redis_moved *moved_info = pkg_malloc(sizeof(redis_moved)); - if (!moved_info) { - LM_ERR("cachedb_redis: Unable to allocate redis_moved struct, no more pkg memory\n"); - freeReplyObject(reply); - reply = NULL; - goto try_next_con; - } else { - if (parse_moved_reply(reply, moved_info) < 0) { - LM_ERR("cachedb_redis: Unable to parse MOVED reply\n"); - pkg_free(moved_info); - moved_info = NULL; - freeReplyObject(reply); - goto try_next_con; - } - - LM_DBG("cachedb_redis: MOVED slot: [%d] endpoint: [%.*s] port: [%d]\n", moved_info->slot, moved_info->endpoint.len, moved_info->endpoint.s, moved_info->port); - node = get_redis_connection_by_endpoint(con, moved_info); + if (!moved_info) { + LM_ERR("Unable to allocate redis_moved struct," + " no more pkg memory\n"); + freeReplyObject(reply); + reply = NULL; + goto try_next_con; + } + if ((is_ask ? + parse_ask_reply(reply, moved_info) : + parse_moved_reply(reply, moved_info)) < 0) { + LM_ERR("Unable to parse %s reply\n", + is_ask ? "ASK" : "MOVED"); pkg_free(moved_info); - moved_info = NULL; freeReplyObject(reply); - reply = NULL; + goto try_next_con; + } - if (node == NULL) { - LM_ERR("Unable to locate connection by endpoint\n"); - last_err = -10; + LM_DBG("%s slot: [%d] endpoint: [%.*s]" + " port: [%d]\n", + is_ask ? "ASK" : "MOVED", + moved_info->slot, + moved_info->endpoint.len, + moved_info->endpoint.s, + moved_info->port); + node = get_redis_connection_by_endpoint( + con, moved_info); + + pkg_free(moved_info); + freeReplyObject(reply); + reply = NULL; + + if (node == NULL) { + LM_ERR("Unable to locate connection" + " by endpoint\n"); + last_err = -10; + goto try_next_con; + } + + if (node->context == NULL) { + if (redis_reconnect_node(con,node) < 0) { + LM_ERR("Unable to reconnect to" + " node %s:%d\n", + node->ip, node->port); + last_err = -1; goto try_next_con; } + } - if (node->context == NULL) { - if (redis_reconnect_node(con,node) < 0) { - LM_ERR("Unable to reconnect to node %p endpoint: %s:%d\n", node, node->ip, node->port); - last_err = -1; - goto try_next_con; - } + if (is_ask) { + /* ASK requires sending ASKING before + * retrying the command on the new node */ + redisReply *asking_reply; + asking_reply = redisCommand( + node->context, "ASKING"); + if (!asking_reply || + asking_reply->type == + REDIS_REPLY_ERROR) { + LM_ERR("ASKING command failed" + " on %s:%d\n", + node->ip, node->port); + if (asking_reply) + freeReplyObject( + asking_reply); + last_err = -1; + goto try_next_con; } - - i = QUERY_ATTEMPTS; // New node that is the target being MOVED to, should have the attempts reset - continue; + freeReplyObject(asking_reply); } + + i = QUERY_ATTEMPTS; + continue; } freeReplyObject(reply); diff --git a/modules/cachedb_redis/cachedb_redis_utils.c b/modules/cachedb_redis/cachedb_redis_utils.c index 6abac0882b5..fa3d8202f7d 100644 --- a/modules/cachedb_redis/cachedb_redis_utils.c +++ b/modules/cachedb_redis/cachedb_redis_utils.c @@ -348,18 +348,19 @@ int build_cluster_nodes(redis_con *con,char *info,int size) } /* - When Redis is operating as a cluster, it is possible (very likely) - that a MOVED redirection will be returned by the Redis nodes that - received the request. The general format of the reply from Redis is: - MOVED slot [IP|FQDN]:port + When Redis is operating as a cluster, MOVED or ASK redirections may + be returned by the Redis nodes that received the request. The + general format of both redirect replies is: + MOVED|ASK slot [IP|FQDN]:port - This routine will parse the Redis MOVED reply into its components. + This routine parses a redirect reply into its components given + the expected prefix (e.g. "MOVED " or "ASK "). Note that the redisReply struct MUST be released outside of this routine to avoid a memory leak. The out->endpoint pointer must not be used after the redisReply has been released. The parsed data is stored into the following redis_moved struct: - + typedef struct { int slot; const_str endpoint; @@ -367,35 +368,35 @@ int build_cluster_nodes(redis_con *con,char *info,int size) } redis_moved; */ -int parse_moved_reply(redisReply *reply, redis_moved *out) { - int i; +int parse_redirect_reply(redisReply *reply, redis_moved *out, + const char *prefix, size_t prefix_len) { + size_t i; int slot = 0; const char *p; const char *end; const char *host_start; const char *colon = NULL; const char *port_start; - int port = REDIS_DF_PORT; // Default to Redis standard port + int port = REDIS_DF_PORT; - if (!reply || !reply->str || reply->len < MOVED_PREFIX_LEN || !out) + if (!reply || !reply->str || (size_t)reply->len < prefix_len || !out) return ERR_INVALID_REPLY; p = reply->str; end = reply->str + reply->len; - for (i = 0; i < MOVED_PREFIX_LEN; ++i) { - if (p[i] != MOVED_PREFIX[i]) { - return ERR_INVALID_REPLY; - } + for (i = 0; i < prefix_len; ++i) { + if (p[i] != prefix[i]) + return ERR_INVALID_REPLY; } - p += MOVED_PREFIX_LEN; + p += prefix_len; // Parse slot number while (p < end && *p >= '0' && *p <= '9') { slot = slot * 10 + (*p - '0'); p++; } - if (slot == 0 && (p == reply->str + MOVED_PREFIX_LEN || *(p - 1) < '0' || *(p - 1) > '9')) + if (slot == 0 && (p == reply->str + prefix_len || *(p - 1) < '0' || *(p - 1) > '9')) return ERR_INVALID_SLOT; // Skip spaces diff --git a/modules/cachedb_redis/cachedb_redis_utils.h b/modules/cachedb_redis/cachedb_redis_utils.h index d8213ba7b73..bc360ec1898 100644 --- a/modules/cachedb_redis/cachedb_redis_utils.h +++ b/modules/cachedb_redis/cachedb_redis_utils.h @@ -31,6 +31,9 @@ #define MOVED_PREFIX "MOVED " #define MOVED_PREFIX_LEN (sizeof(MOVED_PREFIX) - 1) +#define ASK_PREFIX "ASK " +#define ASK_PREFIX_LEN (sizeof(ASK_PREFIX) - 1) + #define ERR_INVALID_REPLY -1 #define ERR_INVALID_SLOT -2 #define ERR_INVALID_PORT -3 @@ -41,7 +44,16 @@ int build_cluster_nodes(redis_con *con,char *info,int size); cluster_node *get_redis_connection(redis_con *con,str *key); cluster_node *get_redis_connection_by_endpoint(redis_con *con, redis_moved *redis_info); void destroy_cluster_nodes(redis_con *con); -int parse_moved_reply(redisReply *reply, redis_moved *out); +int parse_redirect_reply(redisReply *reply, redis_moved *out, + const char *prefix, size_t prefix_len); + +static inline int parse_moved_reply(redisReply *reply, redis_moved *out) { + return parse_redirect_reply(reply, out, MOVED_PREFIX, MOVED_PREFIX_LEN); +} + +static inline int parse_ask_reply(redisReply *reply, redis_moved *out) { + return parse_redirect_reply(reply, out, ASK_PREFIX, ASK_PREFIX_LEN); +} static inline int match_prefix(const char *buf, size_t len, const char *prefix, size_t prefix_len) { size_t i;