Skip to content

Commit

Permalink
clusterer: Enhance cluster_broadcast_req() to include self
Browse files Browse the repository at this point in the history
... via an optional new parameter (default: false).
  • Loading branch information
liviuchircu committed Feb 13, 2025
1 parent 147072a commit 94173b8
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 17 deletions.
1 change: 1 addition & 0 deletions modules/clusterer/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ enum clusterer_event {

enum cl_node_match_op {
NODE_CMP_ANY,
NODE_CMP_ALL, /* same as ANY, but additionally includes current node */
NODE_CMP_EQ_SIP_ADDR,
NODE_CMP_NEQ_SIP_ADDR
};
Expand Down
48 changes: 38 additions & 10 deletions modules/clusterer/clusterer.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ extern int clusterer_enable_rerouting;

int dispatch_jobs = 1;

void handle_cl_gen_msg(bin_packet_t *packet, int cluster_id, int source_id);

str cap_sr_details_str[] = {
str_init("not synced"),
str_init("sync pending"),
Expand Down Expand Up @@ -392,12 +394,16 @@ enum clusterer_send_ret clusterer_send_msg(bin_packet_t *packet,
}
lock_release(cl->current_node->lock);

node = get_node_by_id(cl, dst_node_id);
if (!node) {
LM_ERR("Node id [%d] not found in cluster\n", dst_node_id);
if (!locked)
lock_stop_read(cl_list_lock);
return CLUSTERER_SEND_ERR;
if (dst_node_id == cl->current_node->node_id) {
node = cl->current_node;
} else {
node = get_node_by_id(cl, dst_node_id);
if (!node) {
LM_ERR("Node id [%d] not found in cluster\n", dst_node_id);
if (!locked)
lock_stop_read(cl_list_lock);
return CLUSTERER_SEND_ERR;
}
}

lock_get(node->lock);
Expand All @@ -422,7 +428,16 @@ enum clusterer_send_ret clusterer_send_msg(bin_packet_t *packet,
}
}

rc = msg_send_retry(packet, node, 0, &ev_actions_required);
if (node == cl->current_node && packet->type == CLUSTERER_GENERIC_MSG) {
bin_remove_int_buffer_end(packet, 1);
bin_push_int(packet, node->node_id);
bin_get_capability(packet, &capability);
packet->front_pointer = capability.s + capability.len + CMD_FIELD_SIZE;
handle_cl_gen_msg(packet, cluster_id, node->node_id);
rc = 0;
} else {
rc = msg_send_retry(packet, node, 0, &ev_actions_required);
}

bin_remove_int_buffer_end(packet, 3);

Expand Down Expand Up @@ -510,6 +525,17 @@ clusterer_bcast_msg(bin_packet_t *packet, int dst_cid,
sent = 1;
}

if (match_op == NODE_CMP_ALL && packet->type == CLUSTERER_GENERIC_MSG) {
LM_DBG("broadcasting gen to self (cl: %d, node: %d)\n",
dst_cid, dst_cl->current_node->node_id);
bin_remove_int_buffer_end(packet, 1);
bin_push_int(packet, dst_cl->current_node->node_id);
bin_get_capability(packet, &capability);
packet->front_pointer = capability.s + capability.len + CMD_FIELD_SIZE;

handle_cl_gen_msg(packet, dst_cid, dst_cl->current_node->node_id);
}

bin_remove_int_buffer_end(packet, 3);

if (ev_actions_required)
Expand Down Expand Up @@ -615,7 +641,8 @@ enum clusterer_send_ret send_gen_msg(int cluster_id, int dst_id, str *gen_msg,
return rc;
}

enum clusterer_send_ret bcast_gen_msg(int cluster_id, str *gen_msg, str *exchg_tag)
enum clusterer_send_ret bcast_gen_msg(int cluster_id, str *gen_msg,
str *exchg_tag, int all)
{
bin_packet_t packet;
int rc;
Expand All @@ -626,7 +653,8 @@ enum clusterer_send_ret bcast_gen_msg(int cluster_id, str *gen_msg, str *exchg_t
return CLUSTERER_SEND_ERR;
}

rc = clusterer_bcast_msg(&packet, cluster_id, NODE_CMP_ANY, 0);
rc = clusterer_bcast_msg(&packet, cluster_id,
all ? NODE_CMP_ALL : NODE_CMP_ANY, 0);

bin_free_packet(&packet);

Expand Down Expand Up @@ -940,7 +968,7 @@ static void handle_internal_msg(bin_packet_t *received, int packet_type,
}
}

static void handle_cl_gen_msg(bin_packet_t *packet, int cluster_id, int source_id)
void handle_cl_gen_msg(bin_packet_t *packet, int cluster_id, int source_id)
{
int req_like;
str rcv_msg, rcv_tag;
Expand Down
2 changes: 1 addition & 1 deletion modules/clusterer/clusterer.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ unsigned long clusterer_get_num_nodes(int state);

enum clusterer_send_ret send_gen_msg(int cluster_id, int node_id, str *gen_msg,
str *exchg_tag, int req_like);
enum clusterer_send_ret bcast_gen_msg(int cluster_id, str *gen_msg, str *exchg_tag);
enum clusterer_send_ret bcast_gen_msg(int cluster_id, str *gen_msg, str *exchg_tag, int all);
enum clusterer_send_ret send_mi_cmd(int cluster_id, int dst_id, str cmd_name,
mi_item_t *cmd_params_arr, int no_params);
enum clusterer_send_ret bcast_remove_node(int cluster_id, int target_node);
Expand Down
9 changes: 5 additions & 4 deletions modules/clusterer/clusterer_mod.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ static void heartbeats_timer_handler(unsigned int ticks, void *param);
static void heartbeats_utimer_handler(utime_t ticks, void *param);

int cmd_broadcast_req(struct sip_msg *msg, int *cluster_id, str *gen_msg,
pv_spec_t *param_tag);
pv_spec_t *param_tag, int *all);
int cmd_send_req(struct sip_msg *msg, int *cluster_id, int *node_id,
str *gen_msg, pv_spec_t *param_tag);
int cmd_send_rpl(struct sip_msg *msg, int *cluster_id, int *node_id,
Expand All @@ -118,7 +118,8 @@ static const cmd_export_t cmds[] = {
{"cluster_broadcast_req", (cmd_function)cmd_broadcast_req, {
{CMD_PARAM_INT,0,0},
{CMD_PARAM_STR,0,0},
{CMD_PARAM_VAR|CMD_PARAM_OPT,0,0}, {0,0,0}},
{CMD_PARAM_VAR|CMD_PARAM_OPT,0,0},
{CMD_PARAM_INT|CMD_PARAM_OPT,0,0}, {0,0,0}},
REQUEST_ROUTE | FAILURE_ROUTE | ONREPLY_ROUTE | LOCAL_ROUTE | BRANCH_ROUTE | EVENT_ROUTE},
{"cluster_send_req", (cmd_function)cmd_send_req, {
{CMD_PARAM_INT,0,0},
Expand Down Expand Up @@ -1267,7 +1268,7 @@ static inline void generate_msg_tag(pv_value_t *tag_val, int cluster_id)
}

int cmd_broadcast_req(struct sip_msg *msg, int *cluster_id, str *gen_msg,
pv_spec_t *param_tag)
pv_spec_t *param_tag, int *all)
{
pv_value_t tag_val;
int rc;
Expand All @@ -1280,7 +1281,7 @@ int cmd_broadcast_req(struct sip_msg *msg, int *cluster_id, str *gen_msg,
return -1;
}

rc = bcast_gen_msg(*cluster_id, gen_msg, &tag_val.rs);
rc = bcast_gen_msg(*cluster_id, gen_msg, &tag_val.rs, (all && *all));
switch (rc) {
case 0:
return 1;
Expand Down
10 changes: 8 additions & 2 deletions modules/clusterer/doc/clusterer_admin.xml
Original file line number Diff line number Diff line change
Expand Up @@ -792,11 +792,16 @@ event_route[E_CLUSTERER_REQ_RECEIVED] {

<section id="func_cluster_broadcast_req" xreflabel="cluster_broadcast_req()">
<title>
<function moreinfo="none">cluster_broadcast_req(cluster_id, msg, [tag])</function>
<function moreinfo="none">cluster_broadcast_req(cluster_id, msg, [tag], [include_self])</function>
</title>
<para>
This function has a similar behaviour to the <function moreinfo="none">cluster_send_req()</function> function with the exception that the message is sent to all the nodes in the specified cluster.
</para>
<itemizedlist>
<listitem>
<para><emphasis>include_self</emphasis> (bool, optional, default: <emphasis>false</emphasis>) - raise the event for current node as well, but without actually sending a packet (both req and rpl).</para>
</listitem>
</itemizedlist>
<para>
The function can return the following values:
<itemizedlist>
Expand Down Expand Up @@ -824,7 +829,8 @@ event_route[E_CLUSTERER_REQ_RECEIVED] {
<title>cluster_broadcast_req() usage</title>
<programlisting format="linespecific">
...
cluster_broadcast_req($var(cl_id), $var(share_data));
# also raise the event for current node
cluster_broadcast_req($var(cl_id), $var(share_data), , true);
...
</programlisting>
</example>
Expand Down
1 change: 1 addition & 0 deletions modules/clusterer/node_info.c
Original file line number Diff line number Diff line change
Expand Up @@ -1014,6 +1014,7 @@ int match_node(const node_info_t *a, const node_info_t *b,
{
switch (match_op) {
case NODE_CMP_ANY:
case NODE_CMP_ALL:
break;
case NODE_CMP_EQ_SIP_ADDR:
lock_get(a->lock);
Expand Down

0 comments on commit 94173b8

Please sign in to comment.