diff --git a/modules/clusterer/api.h b/modules/clusterer/api.h index 4be797cb44..ad6ccdb6f4 100644 --- a/modules/clusterer/api.h +++ b/modules/clusterer/api.h @@ -74,6 +74,7 @@ enum clusterer_event { enum cl_node_match_op { NODE_CMP_ANY, + NODE_CMP_ALL, /* same as ANY, but additionally includes current node */ NODE_CMP_EQ_SIP_ADDR, NODE_CMP_NEQ_SIP_ADDR }; diff --git a/modules/clusterer/clusterer.c b/modules/clusterer/clusterer.c index 5c5a1ac4c1..2eeb9e062f 100644 --- a/modules/clusterer/clusterer.c +++ b/modules/clusterer/clusterer.c @@ -60,6 +60,8 @@ extern int clusterer_enable_rerouting; int dispatch_jobs = 1; +void handle_cl_gen_msg(bin_packet_t *packet, int cluster_id, int source_id); + str cap_sr_details_str[] = { str_init("not synced"), str_init("sync pending"), @@ -392,12 +394,16 @@ enum clusterer_send_ret clusterer_send_msg(bin_packet_t *packet, } lock_release(cl->current_node->lock); - node = get_node_by_id(cl, dst_node_id); - if (!node) { - LM_ERR("Node id [%d] not found in cluster\n", dst_node_id); - if (!locked) - lock_stop_read(cl_list_lock); - return CLUSTERER_SEND_ERR; + if (dst_node_id == cl->current_node->node_id) { + node = cl->current_node; + } else { + node = get_node_by_id(cl, dst_node_id); + if (!node) { + LM_ERR("Node id [%d] not found in cluster\n", dst_node_id); + if (!locked) + lock_stop_read(cl_list_lock); + return CLUSTERER_SEND_ERR; + } } lock_get(node->lock); @@ -422,7 +428,16 @@ enum clusterer_send_ret clusterer_send_msg(bin_packet_t *packet, } } - rc = msg_send_retry(packet, node, 0, &ev_actions_required); + if (node == cl->current_node && packet->type == CLUSTERER_GENERIC_MSG) { + bin_remove_int_buffer_end(packet, 1); + bin_push_int(packet, node->node_id); + bin_get_capability(packet, &capability); + packet->front_pointer = capability.s + capability.len + CMD_FIELD_SIZE; + handle_cl_gen_msg(packet, cluster_id, node->node_id); + rc = 0; + } else { + rc = msg_send_retry(packet, node, 0, &ev_actions_required); + } bin_remove_int_buffer_end(packet, 3); @@ -510,6 +525,17 @@ clusterer_bcast_msg(bin_packet_t *packet, int dst_cid, sent = 1; } + if (match_op == NODE_CMP_ALL && packet->type == CLUSTERER_GENERIC_MSG) { + LM_DBG("broadcasting gen to self (cl: %d, node: %d)\n", + dst_cid, dst_cl->current_node->node_id); + bin_remove_int_buffer_end(packet, 1); + bin_push_int(packet, dst_cl->current_node->node_id); + bin_get_capability(packet, &capability); + packet->front_pointer = capability.s + capability.len + CMD_FIELD_SIZE; + + handle_cl_gen_msg(packet, dst_cid, dst_cl->current_node->node_id); + } + bin_remove_int_buffer_end(packet, 3); if (ev_actions_required) @@ -615,7 +641,8 @@ enum clusterer_send_ret send_gen_msg(int cluster_id, int dst_id, str *gen_msg, return rc; } -enum clusterer_send_ret bcast_gen_msg(int cluster_id, str *gen_msg, str *exchg_tag) +enum clusterer_send_ret bcast_gen_msg(int cluster_id, str *gen_msg, + str *exchg_tag, int all) { bin_packet_t packet; int rc; @@ -626,7 +653,8 @@ enum clusterer_send_ret bcast_gen_msg(int cluster_id, str *gen_msg, str *exchg_t return CLUSTERER_SEND_ERR; } - rc = clusterer_bcast_msg(&packet, cluster_id, NODE_CMP_ANY, 0); + rc = clusterer_bcast_msg(&packet, cluster_id, + all ? NODE_CMP_ALL : NODE_CMP_ANY, 0); bin_free_packet(&packet); @@ -940,7 +968,7 @@ static void handle_internal_msg(bin_packet_t *received, int packet_type, } } -static void handle_cl_gen_msg(bin_packet_t *packet, int cluster_id, int source_id) +void handle_cl_gen_msg(bin_packet_t *packet, int cluster_id, int source_id) { int req_like; str rcv_msg, rcv_tag; diff --git a/modules/clusterer/clusterer.h b/modules/clusterer/clusterer.h index e87ce7dfc7..906fcb2b89 100644 --- a/modules/clusterer/clusterer.h +++ b/modules/clusterer/clusterer.h @@ -191,7 +191,7 @@ unsigned long clusterer_get_num_nodes(int state); enum clusterer_send_ret send_gen_msg(int cluster_id, int node_id, str *gen_msg, str *exchg_tag, int req_like); -enum clusterer_send_ret bcast_gen_msg(int cluster_id, str *gen_msg, str *exchg_tag); +enum clusterer_send_ret bcast_gen_msg(int cluster_id, str *gen_msg, str *exchg_tag, int all); enum clusterer_send_ret send_mi_cmd(int cluster_id, int dst_id, str cmd_name, mi_item_t *cmd_params_arr, int no_params); enum clusterer_send_ret bcast_remove_node(int cluster_id, int target_node); diff --git a/modules/clusterer/clusterer_mod.c b/modules/clusterer/clusterer_mod.c index 9e431c2519..b92e1d4661 100644 --- a/modules/clusterer/clusterer_mod.c +++ b/modules/clusterer/clusterer_mod.c @@ -100,7 +100,7 @@ static void heartbeats_timer_handler(unsigned int ticks, void *param); static void heartbeats_utimer_handler(utime_t ticks, void *param); int cmd_broadcast_req(struct sip_msg *msg, int *cluster_id, str *gen_msg, - pv_spec_t *param_tag); + pv_spec_t *param_tag, int *all); int cmd_send_req(struct sip_msg *msg, int *cluster_id, int *node_id, str *gen_msg, pv_spec_t *param_tag); int cmd_send_rpl(struct sip_msg *msg, int *cluster_id, int *node_id, @@ -118,7 +118,8 @@ static const cmd_export_t cmds[] = { {"cluster_broadcast_req", (cmd_function)cmd_broadcast_req, { {CMD_PARAM_INT,0,0}, {CMD_PARAM_STR,0,0}, - {CMD_PARAM_VAR|CMD_PARAM_OPT,0,0}, {0,0,0}}, + {CMD_PARAM_VAR|CMD_PARAM_OPT,0,0}, + {CMD_PARAM_INT|CMD_PARAM_OPT,0,0}, {0,0,0}}, REQUEST_ROUTE | FAILURE_ROUTE | ONREPLY_ROUTE | LOCAL_ROUTE | BRANCH_ROUTE | EVENT_ROUTE}, {"cluster_send_req", (cmd_function)cmd_send_req, { {CMD_PARAM_INT,0,0}, @@ -1267,7 +1268,7 @@ static inline void generate_msg_tag(pv_value_t *tag_val, int cluster_id) } int cmd_broadcast_req(struct sip_msg *msg, int *cluster_id, str *gen_msg, - pv_spec_t *param_tag) + pv_spec_t *param_tag, int *all) { pv_value_t tag_val; int rc; @@ -1280,7 +1281,7 @@ int cmd_broadcast_req(struct sip_msg *msg, int *cluster_id, str *gen_msg, return -1; } - rc = bcast_gen_msg(*cluster_id, gen_msg, &tag_val.rs); + rc = bcast_gen_msg(*cluster_id, gen_msg, &tag_val.rs, (all && *all)); switch (rc) { case 0: return 1; diff --git a/modules/clusterer/doc/clusterer_admin.xml b/modules/clusterer/doc/clusterer_admin.xml index f44c9353cc..7779b59703 100644 --- a/modules/clusterer/doc/clusterer_admin.xml +++ b/modules/clusterer/doc/clusterer_admin.xml @@ -792,11 +792,16 @@ event_route[E_CLUSTERER_REQ_RECEIVED] {
- <function moreinfo="none">cluster_broadcast_req(cluster_id, msg, [tag])</function> + <function moreinfo="none">cluster_broadcast_req(cluster_id, msg, [tag], [include_self])</function> This function has a similar behaviour to the cluster_send_req() function with the exception that the message is sent to all the nodes in the specified cluster. + + + include_self (bool, optional, default: false) - raise the event for current node as well, but without actually sending a packet (both req and rpl). + + The function can return the following values: @@ -824,7 +829,8 @@ event_route[E_CLUSTERER_REQ_RECEIVED] { cluster_broadcast_req() usage ... -cluster_broadcast_req($var(cl_id), $var(share_data)); +# also raise the event for current node +cluster_broadcast_req($var(cl_id), $var(share_data), , true); ... diff --git a/modules/clusterer/node_info.c b/modules/clusterer/node_info.c index 6f0baf9679..a5bbc2bda3 100644 --- a/modules/clusterer/node_info.c +++ b/modules/clusterer/node_info.c @@ -1014,6 +1014,7 @@ int match_node(const node_info_t *a, const node_info_t *b, { switch (match_op) { case NODE_CMP_ANY: + case NODE_CMP_ALL: break; case NODE_CMP_EQ_SIP_ADDR: lock_get(a->lock);