From 0548ffa6fa35e917fc1e72bab9256045dfd151c7 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Wed, 6 Nov 2024 11:13:41 +0000 Subject: [PATCH] raft info --- deps/rabbit/src/rabbit_fifo.erl | 5 +- deps/rabbit/src/rabbit_quorum_queue.erl | 43 ++++++++++-- .../priv/www/js/tmpl/queue.ejs | 11 ++++ .../priv/www/js/tmpl/quorum-members-info.ejs | 65 +++++++++++++++++++ 4 files changed, 117 insertions(+), 7 deletions(-) create mode 100644 deps/rabbitmq_management/priv/www/js/tmpl/quorum-members-info.ejs diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index b771a5cc1cd7..8ebfc3a109a1 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -1029,9 +1029,10 @@ handle_aux(_RaftState, cast, {#return{msg_ids = MsgIds, %% for returns with a delivery limit set we can just return as before {no_reply, Aux0, RaAux0, [{append, Ret, {notify, Corr, Pid}}]} end; -handle_aux(leader, _, {handle_tick, [QName, Overview0, Nodes]}, +handle_aux(leader, _, {handle_tick, [QName, MacOverview0, Nodes]}, #?AUX{tick_pid = Pid} = Aux, RaAux) -> - Overview = Overview0#{members_info => ra_aux:members_info(RaAux)}, + Overview = MacOverview0#{members_info => ra_aux:members_info(RaAux), + ra_overview => ra_aux:overview(RaAux)}, NewPid = case process_is_alive(Pid) of false -> diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 33b9f704af8c..bde50152c404 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -560,12 +560,29 @@ handle_tick(QName, num_discarded := NumDiscarded, num_discard_checked_out := NumDiscardedCheckedOut, discard_message_bytes := DiscardBytes, - discard_checkout_message_bytes := DiscardCheckoutBytes, - smallest_raft_index := _} = Overview, - Nodes) -> + discard_checkout_message_bytes := DiscardCheckoutBytes + } = Overview, + KnownNodes) -> %% this makes calls to remote processes so cannot be run inside the %% ra server Self = self(), + MembersInfo0 = maps:get(members_info, Overview, #{}), + RaOverview0 = maps:get(ra_overview, Overview, #{}), + RaOverview = maps:update_with(log, + fun (L) -> + maps:with([last_index, + first_index, + last_written_index, + last_wal_write, + lastest_checkpoint_index], + case L of + #{last_written_index_term := {I, _}} = LO -> + LO#{last_written_index => I}; + LO -> + LO + end) + end, #{}, RaOverview0), + spawn( fun() -> try @@ -600,6 +617,17 @@ handle_tick(QName, end, info(Q, Keys), Overview), MsgBytesDiscarded = DiscardBytes + DiscardCheckoutBytes, MsgBytes = EnqueueBytes + CheckoutBytes + MsgBytesDiscarded, + MembersInfo = maps:fold(fun + ({_, N}, Val0, Acc) -> + Val = maps:map( + fun (status, {S, _}) -> + S; + (_K, V) -> + V + end, Val0), + Acc#{N => Val} + end, #{}, MembersInfo0), + Infos = [{consumers, NumConsumers}, {publishers, NumEnqueuers}, {consumer_capacity, Util}, @@ -620,7 +648,12 @@ handle_tick(QName, unlimited; Limit -> Limit - end} + end}, + {members_info, MembersInfo}, + {raft, maps:with([current_term, + commit_index, + last_applied, + log], RaOverview)} | Infos0], rabbit_core_metrics:queue_stats(QName, Infos), ok = repair_leader_record(Q, Self), @@ -631,7 +664,7 @@ handle_tick(QName, rabbit_log:debug("Repaired quorum queue ~ts amqqueue record", [rabbit_misc:rs(QName)]) end, ExpectedNodes = rabbit_nodes:list_members(), - case Nodes -- ExpectedNodes of + case KnownNodes -- ExpectedNodes of [] -> ok; Stale when length(ExpectedNodes) > 0 -> diff --git a/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs b/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs index 7f2c9e131a55..b87444fdbfcf 100644 --- a/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs +++ b/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs @@ -421,6 +421,17 @@ <% } %> +<% if(is_quorum(queue) && queue.hasOwnProperty('raft')) { %> +
+

Raft Info (Advanced)

+
+<%= format('quorum-members-info', {'mode': 'queue', + 'queue': queue, + 'members_info': queue.members_info}) %> +
+
+<% } %> + <% if(queue.reductions || queue.garbage_collection) { %>

Runtime Metrics (Advanced)

diff --git a/deps/rabbitmq_management/priv/www/js/tmpl/quorum-members-info.ejs b/deps/rabbitmq_management/priv/www/js/tmpl/quorum-members-info.ejs new file mode 100644 index 000000000000..c6293dad8828 --- /dev/null +++ b/deps/rabbitmq_management/priv/www/js/tmpl/quorum-members-info.ejs @@ -0,0 +1,65 @@ +<% if (members_info) { +var i = 0; +var r = queue.raft; +var l = r.log; +delete members_info[queue.leader]; +%> +

Leader

+ + + + + + + + + + + + + + + + + + + + + + +
NodeTermFirst IndexLast IndexLog LengthLast AppliedCommit Index
<%= fmt_string(queue.leader) %><%= fmt_string(r.current_term) %><%= fmt_string(l.first_index) %><%= fmt_string(l.last_index) %><%= fmt_string(l.last_index - l.first_index + 1) %><%= fmt_string(r.last_applied) %><%= fmt_string(r.commit_index) %>
+

Followers

+ + + + + + + + + + + + + <% for (var node in members_info) { + i++; + var m = members_info[node]; + %> + > + + + + + + <% if(m.hasOwnProperty('commit_index_sent')) { %> + + <% } else { %> + + <% } %> + + + <% } %> +
NodeStatusNext IndexMatch IndexLagCommit Index SentVoter Status
<%= fmt_string(node) %><%= fmt_string(m.status) %><%= fmt_string(m.next_index) %><%= fmt_string(m.match_index) %><%= fmt_string(l.last_index - Math.max(l.first_index, m.match_index)) %><%= fmt_string(m.commit_index_sent) %><%= fmt_string("n/a") %><%= fmt_string(m.voter_status.membership) %>
+<% } else { %> +

... no info ...

+<% } %>