Refactor methods which query rabbit
Refactor methods which query rabbit to remove the duplication
around checking if json output is supported.
Change-Id: Id4e3dbd85748e41bb4b1c8db282495cfffaa823d
(cherry picked from commit dbab66b0c5
)
This commit is contained in:
parent
4072c67444
commit
091cd02ee2
|
@ -205,57 +205,120 @@ class RabbitmqError(Exception):
|
|||
pass
|
||||
|
||||
|
||||
def list_vhosts():
|
||||
def run_cmd(cmd):
|
||||
"""Run provided command and decode the output.
|
||||
|
||||
:param cmd: Command to run
|
||||
:type cmd: List[str]
|
||||
:returns: output from command
|
||||
:rtype: str
|
||||
"""
|
||||
Returns a list of all the available vhosts
|
||||
"""
|
||||
try:
|
||||
cmd = [RABBITMQ_CTL, 'list_vhosts']
|
||||
# NOTE(ajkavanagh): In focal and above, rabbitmq-server now has a
|
||||
# --formatter option.
|
||||
if caching_cmp_pkgrevno('rabbitmq-server', '3.8.2') >= 0:
|
||||
cmd.append('--formatter=json')
|
||||
output = subprocess.check_output(cmd)
|
||||
output = output.decode('utf-8')
|
||||
return output
|
||||
|
||||
if caching_cmp_pkgrevno('rabbitmq-server', '3.8.2') >= 0:
|
||||
decoded = json.loads(output)
|
||||
return [ll['name'] for ll in decoded]
|
||||
# NOTE(jamespage): Earlier rabbitmqctl versions append "...done"
|
||||
# to the output of list_vhosts
|
||||
elif '...done' in output:
|
||||
|
||||
def rabbit_supports_json():
|
||||
"""Check if version of rabbit supports json formatted output.
|
||||
|
||||
:returns: If json output is supported.
|
||||
:rtype: bool
|
||||
"""
|
||||
return caching_cmp_pkgrevno('rabbitmq-server', '3.8.2') >= 0
|
||||
|
||||
|
||||
@cached
|
||||
def caching_cmp_pkgrevno(package, revno, pkgcache=None):
|
||||
"""Compare supplied revno with the revno of the installed package.
|
||||
|
||||
* 1 => Installed revno is greater than supplied arg
|
||||
* 0 => Installed revno is the same as supplied arg
|
||||
* -1 => Installed revno is less than supplied arg
|
||||
|
||||
:param package: Package to check revno of
|
||||
:type package: str
|
||||
:param revno: Revision number to compare against
|
||||
:type revno: str
|
||||
:param pkgcache: Version obj from pkgcache
|
||||
:type pkgcache: ubuntu_apt_pkg.Version
|
||||
:returns: Whether versions match
|
||||
:rtype: int
|
||||
"""
|
||||
return cmp_pkgrevno(package, revno, pkgcache)
|
||||
|
||||
|
||||
def query_rabbit(cmd, raw_processor=None, json_processor=None,
|
||||
binary=RABBITMQ_CTL):
|
||||
"""Run query against rabbit.
|
||||
|
||||
Run query against rabbit and then run post-query processor on the
|
||||
output. If the version of rabbit that is installed supports formatting
|
||||
the output in json format then the '--formatter=json' flag is added.
|
||||
|
||||
:param cmd: Query to run
|
||||
:type cmd: List[str]
|
||||
:param raw_processor: Function to call with command output as the only
|
||||
argument.
|
||||
:type raw_processor: Callable
|
||||
:param json_processor: Function to call with json loaded output as the only
|
||||
argument.
|
||||
:type json_processor: Callable
|
||||
:returns: Return processed output from query
|
||||
:rtype: ANY
|
||||
"""
|
||||
cmd.insert(0, binary)
|
||||
if rabbit_supports_json():
|
||||
cmd.append('--formatter=json')
|
||||
output = json.loads(run_cmd(cmd))
|
||||
if json_processor:
|
||||
return json_processor(output)
|
||||
else:
|
||||
# A processor may not be needed for loaded json.
|
||||
return output
|
||||
else:
|
||||
if raw_processor:
|
||||
return raw_processor(run_cmd(cmd))
|
||||
else:
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
def list_vhosts():
|
||||
"""Returns a list of all the available vhosts
|
||||
|
||||
:returns: List of vhosts
|
||||
:rtype: [str]
|
||||
"""
|
||||
def _json_processor(output):
|
||||
return [ll['name'] for ll in output]
|
||||
|
||||
def _raw_processor(output):
|
||||
if '...done' in output:
|
||||
return output.split('\n')[1:-2]
|
||||
else:
|
||||
return output.split('\n')[1:-1]
|
||||
|
||||
try:
|
||||
return query_rabbit(
|
||||
['list_vhosts'],
|
||||
raw_processor=_raw_processor,
|
||||
json_processor=_json_processor)
|
||||
except Exception as ex:
|
||||
# if no vhosts, just raises an exception
|
||||
log(str(ex), level='DEBUG')
|
||||
return []
|
||||
|
||||
|
||||
def vhost_queue_info(vhost):
|
||||
"""Provide a list of queue info objects for the given vhost in RabbitMQ
|
||||
Each object provides name (str), messages (int), and consumers (int)
|
||||
@raises CalledProcessError on failure to list_queues of the vhost
|
||||
def list_vhost_queue_info(vhost):
|
||||
"""Provide a list of queue info objects for the given vhost.
|
||||
|
||||
:returns: List of dictionaries of queue information
|
||||
eg [{'name': 'queue name', 'messages': 0, 'consumers': 1}, ...]
|
||||
:rtype: List[Dict[str, Union[str, int]]]
|
||||
:raises: CalledProcessError
|
||||
"""
|
||||
cmd = [RABBITMQ_CTL, '-p', vhost, 'list_queues',
|
||||
'name', 'messages', 'consumers']
|
||||
# NOTE(ajkavanagh): In focal and above, rabbitmq-server now has a
|
||||
# --formatter option.
|
||||
if caching_cmp_pkgrevno('rabbitmq-server', '3.8.2') >= 0:
|
||||
cmd.append('--formatter=json')
|
||||
output = subprocess.check_output(cmd).decode('utf-8')
|
||||
|
||||
def _raw_processor(output):
|
||||
queue_info = []
|
||||
|
||||
if caching_cmp_pkgrevno('rabbitmq-server', '3.8.2') >= 0:
|
||||
decoded = json.loads(output)
|
||||
# note that the json is already in the desired output of queue_info
|
||||
# below
|
||||
return decoded
|
||||
# NOTE(jamespage): Earlier rabbitmqctl versions append "...done"
|
||||
# to the output of list_queues
|
||||
elif '...done' in output:
|
||||
if '...done' in output:
|
||||
queues = output.split('\n')[1:-2]
|
||||
else:
|
||||
queues = output.split('\n')[1:-1]
|
||||
|
@ -270,6 +333,34 @@ def vhost_queue_info(vhost):
|
|||
|
||||
return queue_info
|
||||
|
||||
cmd = ['-p', vhost, 'list_queues', 'name', 'messages', 'consumers']
|
||||
return query_rabbit(
|
||||
cmd,
|
||||
raw_processor=_raw_processor)
|
||||
|
||||
|
||||
def list_users():
|
||||
"""Returns a list of users.
|
||||
|
||||
:returns: List of users
|
||||
:rtype: [str]
|
||||
"""
|
||||
def _json_processor(output):
|
||||
return [ll['user'] for ll in output]
|
||||
|
||||
def _raw_processor(output):
|
||||
lines = output.split('\n')[1:]
|
||||
return [line.split('\t')[0] for line in lines]
|
||||
|
||||
return query_rabbit(
|
||||
['list_users'],
|
||||
raw_processor=_raw_processor,
|
||||
json_processor=_json_processor)
|
||||
|
||||
|
||||
def vhost_queue_info(vhost):
|
||||
return list_vhost_queue_info(vhost)
|
||||
|
||||
|
||||
def vhost_exists(vhost):
|
||||
return vhost in list_vhosts()
|
||||
|
@ -283,25 +374,7 @@ def create_vhost(vhost):
|
|||
|
||||
|
||||
def user_exists(user):
|
||||
cmd = [RABBITMQ_CTL, 'list_users']
|
||||
# NOTE(ajkavanagh): In focal and above, rabbitmq-server now has a
|
||||
# --formatter option.
|
||||
if caching_cmp_pkgrevno('rabbitmq-server', '3.8.2') >= 0:
|
||||
cmd.append('--formatter=json')
|
||||
out = subprocess.check_output(cmd).decode('utf-8')
|
||||
decoded = json.loads(out)
|
||||
users = [ll['user'] for ll in decoded]
|
||||
return user in users
|
||||
|
||||
# NOTE(ajkavanagh): pre 3.8.2 the code needs to deal with just a text
|
||||
# output
|
||||
out = subprocess.check_output(cmd).decode('utf-8')
|
||||
lines = out.split('\n')[1:]
|
||||
for line in lines:
|
||||
_user = line.split('\t')[0]
|
||||
if _user == user:
|
||||
return True
|
||||
return False
|
||||
return user in list_users()
|
||||
|
||||
|
||||
def create_user(user, password, tags=[]):
|
||||
|
@ -340,6 +413,16 @@ def change_user_password(user, new_password):
|
|||
|
||||
|
||||
def grant_permissions(user, vhost):
|
||||
"""Grant all permissions on a vhost to a user.
|
||||
|
||||
:param user: Name of user to give permissions to.
|
||||
:type user: str
|
||||
:param vhost: Name of vhost to give permissions on
|
||||
:type vhost: str
|
||||
"""
|
||||
log(
|
||||
"Granting permissions for user {} on vhost {}".format(user, vhost),
|
||||
level='DEBUG')
|
||||
log("Granting permissions", level='DEBUG')
|
||||
rabbitmqctl('set_permissions', '-p',
|
||||
vhost, user, '.*', '.*', '.*')
|
||||
|
@ -351,11 +434,6 @@ def set_policy(vhost, policy_name, match, value):
|
|||
policy_name, match, value)
|
||||
|
||||
|
||||
@cached
|
||||
def caching_cmp_pkgrevno(package, revno, pkgcache=None):
|
||||
return cmp_pkgrevno(package, revno, pkgcache)
|
||||
|
||||
|
||||
def set_ha_mode(vhost, mode, params=None, sync_mode='automatic'):
|
||||
"""Valid mode values:
|
||||
|
||||
|
@ -684,12 +762,21 @@ def leave_cluster():
|
|||
raise
|
||||
|
||||
|
||||
def get_plugin_manager():
|
||||
"""Find the path to the executable for managing plugins.
|
||||
|
||||
:returns: Path to rabbitmq-plugins executable
|
||||
:rtype: str
|
||||
"""
|
||||
manager = glob.glob(
|
||||
'/usr/lib/rabbitmq/lib/rabbitmq_server-*/sbin/rabbitmq-plugins')[0]
|
||||
return manager
|
||||
|
||||
|
||||
def _manage_plugin(plugin, action):
|
||||
os.environ['HOME'] = '/root'
|
||||
_rabbitmq_plugins = \
|
||||
glob.glob('/usr/lib/rabbitmq/lib/rabbitmq_server-*'
|
||||
'/sbin/rabbitmq-plugins')[0]
|
||||
subprocess.check_call([_rabbitmq_plugins, action, plugin])
|
||||
plugin_manager = get_plugin_manager()
|
||||
subprocess.check_call([plugin_manager, action, plugin])
|
||||
|
||||
|
||||
def enable_plugin(plugin):
|
||||
|
|
|
@ -1378,7 +1378,6 @@ class UtilsTests(CharmTestCase):
|
|||
# default config value should show 10 minutes
|
||||
max_age = rabbit_utils.get_max_stats_file_age()
|
||||
self.assertEqual(600, max_age)
|
||||
|
||||
# changing to run every 15 minutes shows 30 minutes
|
||||
self.test_config.set('stats_cron_schedule', '*/15 * * * *')
|
||||
max_age = rabbit_utils.get_max_stats_file_age()
|
||||
|
@ -1485,3 +1484,10 @@ class UtilsTests(CharmTestCase):
|
|||
mock_get_usernames_for_passwords_on_disk.return_value = ['a']
|
||||
self.assertEqual(rabbit_utils.get_usernames_for_passwords(),
|
||||
['b', 'c'])
|
||||
|
||||
@mock.patch('rabbit_utils.caching_cmp_pkgrevno')
|
||||
def test_rabbit_supports_json(self, mock_cmp_pkgrevno):
|
||||
mock_cmp_pkgrevno.return_value = 1
|
||||
self.assertTrue(rabbit_utils.rabbit_supports_json())
|
||||
mock_cmp_pkgrevno.return_value = -1
|
||||
self.assertFalse(rabbit_utils.rabbit_supports_json())
|
||||
|
|
Loading…
Reference in New Issue