Add action to check rmq queues
Report back to the user any queues with greater than N messages. Change-Id: Ia39c05a4fe0ce74682af24ffe3d17d3006001b62 Closes-Bug: 1716981
This commit is contained in:
parent
33e0d1d74b
commit
58b8ca09fc
19
actions.yaml
19
actions.yaml
|
@ -3,4 +3,21 @@ pause:
|
|||
resume:
|
||||
descrpition: Resume the rabbitmq unit.
|
||||
cluster-status:
|
||||
description: Show the current cluster status.
|
||||
description: Show the current cluster status.
|
||||
check-queues:
|
||||
description: |
|
||||
Show current queues, optionally only show queues with more than N messages
|
||||
or queues from specified vhost.
|
||||
params:
|
||||
queue-depth:
|
||||
type: integer
|
||||
default: -1
|
||||
description: |
|
||||
Only show queues with >= this many messages. -1 shows all. Note that
|
||||
if the result exceeds command line length (1/4 ulimit -s) on the target
|
||||
system this will fail (For ex; -1 in an openstack env)
|
||||
See lp:1437366, lp:1274460
|
||||
vhost:
|
||||
type: string
|
||||
default: "/"
|
||||
description: Show queues from the specified vhost. Eg; "openstack".
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
|
||||
import os
|
||||
import sys
|
||||
import re
|
||||
|
||||
from subprocess import (
|
||||
check_output,
|
||||
CalledProcessError,
|
||||
|
@ -26,6 +28,7 @@ sys.path.append('hooks/')
|
|||
from charmhelpers.core.hookenv import (
|
||||
action_fail,
|
||||
action_set,
|
||||
action_get,
|
||||
)
|
||||
|
||||
from rabbit_utils import (
|
||||
|
@ -62,9 +65,34 @@ def cluster_status(args):
|
|||
raise
|
||||
|
||||
|
||||
def check_queues(args):
|
||||
"""Check for queues with greater than N messages.
|
||||
Return those queues to the user."""
|
||||
queue_depth = (action_get('queue-depth'))
|
||||
vhost = (action_get('vhost'))
|
||||
result = []
|
||||
# rabbitmqctl's output contains lines we don't want, such as
|
||||
# 'Listing queues ..' and '...done.', which may vary by release.
|
||||
# Actual queue results *should* always look like 'test\t0'
|
||||
queue_pattern = re.compile('.*\t[0-9]*')
|
||||
try:
|
||||
queues = check_output(['rabbitmqctl', 'list_queues',
|
||||
'-p', vhost]).split('\n')
|
||||
result = list({queue: size for (queue, size) in
|
||||
[i.split('\t') for i in queues
|
||||
if re.search(queue_pattern, i)]
|
||||
if int(size) >= queue_depth})
|
||||
|
||||
action_set({'output': result, 'outcome': 'Success'})
|
||||
except CalledProcessError as e:
|
||||
action_set({'output': e.output})
|
||||
action_fail('Failed to run rabbitmqctl list_queues')
|
||||
|
||||
|
||||
# A dictionary of all the defined actions to callables (which take
|
||||
# parsed arguments).
|
||||
ACTIONS = {"pause": pause, "resume": resume, "cluster-status": cluster_status}
|
||||
ACTIONS = {"pause": pause, "resume": resume, "cluster-status": cluster_status,
|
||||
"check-queues": check_queues}
|
||||
|
||||
|
||||
def main(args):
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
actions.py
|
|
@ -715,3 +715,10 @@ class RmqBasicDeployment(OpenStackAmuletDeployment):
|
|||
assert u.wait_on_action(action_id), "Cluster status action failed."
|
||||
|
||||
u.log.debug('OK')
|
||||
|
||||
def test_912_check_queues(self):
|
||||
""" rabbitmqctl check_queues action can be returned. """
|
||||
u.log.debug('Checking cluster status action...')
|
||||
|
||||
action_id = u.run_action(self.rmq0_sentry, "check-queues")
|
||||
assert u.wait_on_action(action_id), "Check queues action failed."
|
||||
|
|
|
@ -77,6 +77,40 @@ class ClusterStatusTestCase(CharmTestCase):
|
|||
self.action_fail.assert_called()
|
||||
|
||||
|
||||
class CheckQueuesTestCase(CharmTestCase):
|
||||
TEST_QUEUE_RESULT = 'Listing queues ...\ntest\t0\ntest\t0\n""'
|
||||
|
||||
def dummy_action_get(self, key):
|
||||
action_values = {"queue-depth": -1, "vhost": "/"}
|
||||
return action_values[key]
|
||||
|
||||
def setUp(self):
|
||||
super(CheckQueuesTestCase, self).setUp(
|
||||
actions, ["check_output", "action_set", "action_fail",
|
||||
"ConfigRenderer", "action_get"])
|
||||
|
||||
def test_check_queues(self):
|
||||
self.action_get.side_effect = self.dummy_action_get
|
||||
self.check_output.return_value = self.TEST_QUEUE_RESULT
|
||||
|
||||
actions.check_queues([])
|
||||
self.check_output.assert_called_once_with(['rabbitmqctl',
|
||||
'list_queues',
|
||||
'-p', "/"])
|
||||
self.action_set.assert_called()
|
||||
|
||||
def test_check_queues_execption(self):
|
||||
self.action_get.side_effect = self.dummy_action_get
|
||||
self.check_output.return_value = self.TEST_QUEUE_RESULT
|
||||
|
||||
self.check_output.side_effect = actions.CalledProcessError(1,
|
||||
"Failure")
|
||||
actions.check_queues([])
|
||||
self.check_output.assert_called_once_with(['rabbitmqctl',
|
||||
'list_queues',
|
||||
'-p', '/'])
|
||||
|
||||
|
||||
class MainTestCase(CharmTestCase):
|
||||
|
||||
def setUp(self):
|
||||
|
|
Loading…
Reference in New Issue