diff --git a/files/fuel-ha-utils/ocf/rabbitmq b/files/fuel-ha-utils/ocf/rabbitmq index abf5879f81..e75aedd98f 100755 --- a/files/fuel-ha-utils/ocf/rabbitmq +++ b/files/fuel-ha-utils/ocf/rabbitmq @@ -1500,6 +1500,7 @@ get_monitor() { local timeout_alive su_rabbit_cmd "${OCF_RESKEY_ctl} list_channels 2>&1 > /dev/null" rc_alive=$? + [ $rc_alive -eq 137 -o $rc_alive -eq 124 ] && ocf_log err "${LH} 'rabbitmqctl list_channels' timed out, per-node explanation: $(enhanced_list_channels)" check_timeouts $rc_alive "rabbit_list_channels_timeouts" "list_channels" timeout_alive=$? @@ -1693,6 +1694,114 @@ action_stop() { } +####################################################################### +# Enhanced list_channels: +# - nodes are processed in parallel +# - report contains information about which nodes timed out +# +# 'list_channels' is used as a healh-check for current node, but it +# actually checks overall health of all node in cluster. And there were +# some bugs where only one (non-local) channel became stuck, but OCF +# script was wrongfully killing local node. +# +# Hopefully all such bugs are fixed, but if not - it will allow to +# detect such conditions. +# +# Somewhat strange implementation is due to the following reasons: +# - ability to support older versions of RabbitMQ which have reached +# end-of-life with single version of the script +# - zero dependencies - for older versions this functionality could be +# implemented as a plugin, but it'll require this plugin installation +enhanced_list_channels() { + # One second less than timeout of su_rabbit_cmd + local timeout=$((${TIMEOUT_ARG:-5} - 1)) + + su_rabbit_cmd "xargs -0 ${OCF_RESKEY_ctl} eval" < + {Mega, Secs, Micro} = os:timestamp(), + Mili = Micro div 1000, + Mili + 1000 * (Secs + 1000000 * Mega) + end, + +%% We shouldn't continue execution past this time +ShouldEndAt = Now() + SecondsToCompletion * 1000, + +%% How many milliseconds we still have +Timeout = fun() -> + case ShouldEndAt - Now() of + Past when Past =< 0 -> + 0; + Timeout -> + Timeout + end + end, + +%% Lambda combinator - for defining anonymous recursive functions +Y = fun(F) -> + (fun (X) -> F(fun(Y) -> (X(X))(Y) end) end)( + fun (X) -> F(fun(Y) -> (X(X))(Y) end) end) + end, + +Parent = self(), + +ListChannels = Y(fun(Rec) -> + fun (({Node, [], OkChannelsCount})) -> + Parent ! {Node, ok, OkChannelsCount}; + ({Node, [Chan|Rest], OkChannelsCount}) -> + case catch rpc:call(Node, rabbit_channel, info, [Chan], Timeout()) of + Infos when is_list(Infos) -> + Rec({Node, Rest, OkChannelsCount + 1}); + {badrpc, {'EXIT', {noproc, _}}} -> + %% Channel became dead before we could request it's status, don't care + Rec({Node, Rest, OkChannelsCount}); + Err -> + Parent ! {Node, Err, OkChannelsCount} + end + end + end), + +SingleNodeListing = fun(Node) -> + case catch rpc:call(Node, pg_local, get_members, [rabbit_channels], Timeout()) of + LocalChannels when is_list(LocalChannels) -> + ListChannels({Node, LocalChannels, 0}); + Err -> + Parent ! {Node, Err, 0} + end + end, + +AllNodes = rabbit_mnesia:cluster_nodes(running), +[ spawn(fun() -> SingleNodeListing(Node) end) || Node <- AllNodes ], + +WaitForNodes = Y(fun(Rec) -> + fun ({[], Acc}) -> + Acc; + ({RemainingNodes, Acc}) -> + receive + {Node, _Status, _ChannelCount} = Smth -> + RemainingNodes1 = lists:delete(Node, RemainingNodes), + Rec({RemainingNodes1, [Smth|Acc]}) + after Timeout() + 100 -> + Acc + end + end + end), + +Result = WaitForNodes({AllNodes, []}), + +ExpandedResult = [ case lists:keysearch(Node, 1, Result) of + {value, NodeResult} -> + NodeResult; + false -> + {Node, no_data_collected, 0} + end || Node <- AllNodes ], + +ExpandedResult. +EOF +} + ####################################################################### # Join the cluster and return OCF_SUCCESS, if joined. # Return 10, if node is trying to join to itself or empty destination.