[gnuoy, trivial] Fix charmhelper sync location for amulet tests and sync
This commit is contained in:
parent
67e61c7d61
commit
b6febcf116
|
@ -1,4 +1,4 @@
|
||||||
branch: lp:~gnuoy/charm-helpers/cisco-vpp/
|
branch: lp:charm-helpers
|
||||||
destination: tests/charmhelpers
|
destination: tests/charmhelpers
|
||||||
include:
|
include:
|
||||||
- contrib.amulet
|
- contrib.amulet
|
||||||
|
|
|
@ -64,6 +64,7 @@ class AmuletDeployment(object):
|
||||||
|
|
||||||
if 'units' not in svc:
|
if 'units' not in svc:
|
||||||
svc['units'] = 1
|
svc['units'] = 1
|
||||||
|
|
||||||
self.d.add(svc['name'], charm=branch_location, units=svc['units'],
|
self.d.add(svc['name'], charm=branch_location, units=svc['units'],
|
||||||
constraints=svc.get('constraints'))
|
constraints=svc.get('constraints'))
|
||||||
|
|
||||||
|
|
|
@ -15,11 +15,15 @@
|
||||||
# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
|
# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
import io
|
import io
|
||||||
|
import json
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
|
import socket
|
||||||
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
|
import uuid
|
||||||
|
|
||||||
import amulet
|
import amulet
|
||||||
import distro_info
|
import distro_info
|
||||||
|
@ -112,7 +116,7 @@ class AmuletUtils(object):
|
||||||
# /!\ DEPRECATION WARNING (beisner):
|
# /!\ DEPRECATION WARNING (beisner):
|
||||||
# New and existing tests should be rewritten to use
|
# New and existing tests should be rewritten to use
|
||||||
# validate_services_by_name() as it is aware of init systems.
|
# validate_services_by_name() as it is aware of init systems.
|
||||||
self.log.warn('/!\\ DEPRECATION WARNING: use '
|
self.log.warn('DEPRECATION WARNING: use '
|
||||||
'validate_services_by_name instead of validate_services '
|
'validate_services_by_name instead of validate_services '
|
||||||
'due to init system differences.')
|
'due to init system differences.')
|
||||||
|
|
||||||
|
@ -267,33 +271,52 @@ class AmuletUtils(object):
|
||||||
"""Get last modification time of directory."""
|
"""Get last modification time of directory."""
|
||||||
return sentry_unit.directory_stat(directory)['mtime']
|
return sentry_unit.directory_stat(directory)['mtime']
|
||||||
|
|
||||||
def _get_proc_start_time(self, sentry_unit, service, pgrep_full=False):
|
def _get_proc_start_time(self, sentry_unit, service, pgrep_full=None):
|
||||||
"""Get process' start time.
|
"""Get start time of a process based on the last modification time
|
||||||
|
of the /proc/pid directory.
|
||||||
|
|
||||||
Determine start time of the process based on the last modification
|
:sentry_unit: The sentry unit to check for the service on
|
||||||
time of the /proc/pid directory. If pgrep_full is True, the process
|
:service: service name to look for in process table
|
||||||
name is matched against the full command line.
|
:pgrep_full: [Deprecated] Use full command line search mode with pgrep
|
||||||
|
:returns: epoch time of service process start
|
||||||
|
:param commands: list of bash commands
|
||||||
|
:param sentry_units: list of sentry unit pointers
|
||||||
|
:returns: None if successful; Failure message otherwise
|
||||||
"""
|
"""
|
||||||
if pgrep_full:
|
if pgrep_full is not None:
|
||||||
cmd = 'pgrep -o -f {}'.format(service)
|
# /!\ DEPRECATION WARNING (beisner):
|
||||||
else:
|
# No longer implemented, as pidof is now used instead of pgrep.
|
||||||
cmd = 'pgrep -o {}'.format(service)
|
# https://bugs.launchpad.net/charm-helpers/+bug/1474030
|
||||||
cmd = cmd + ' | grep -v pgrep || exit 0'
|
self.log.warn('DEPRECATION WARNING: pgrep_full bool is no '
|
||||||
cmd_out = sentry_unit.run(cmd)
|
'longer implemented re: lp 1474030.')
|
||||||
self.log.debug('CMDout: ' + str(cmd_out))
|
|
||||||
if cmd_out[0]:
|
pid_list = self.get_process_id_list(sentry_unit, service)
|
||||||
self.log.debug('Pid for %s %s' % (service, str(cmd_out[0])))
|
pid = pid_list[0]
|
||||||
proc_dir = '/proc/{}'.format(cmd_out[0].strip())
|
proc_dir = '/proc/{}'.format(pid)
|
||||||
|
self.log.debug('Pid for {} on {}: {}'.format(
|
||||||
|
service, sentry_unit.info['unit_name'], pid))
|
||||||
|
|
||||||
return self._get_dir_mtime(sentry_unit, proc_dir)
|
return self._get_dir_mtime(sentry_unit, proc_dir)
|
||||||
|
|
||||||
def service_restarted(self, sentry_unit, service, filename,
|
def service_restarted(self, sentry_unit, service, filename,
|
||||||
pgrep_full=False, sleep_time=20):
|
pgrep_full=None, sleep_time=20):
|
||||||
"""Check if service was restarted.
|
"""Check if service was restarted.
|
||||||
|
|
||||||
Compare a service's start time vs a file's last modification time
|
Compare a service's start time vs a file's last modification time
|
||||||
(such as a config file for that service) to determine if the service
|
(such as a config file for that service) to determine if the service
|
||||||
has been restarted.
|
has been restarted.
|
||||||
"""
|
"""
|
||||||
|
# /!\ DEPRECATION WARNING (beisner):
|
||||||
|
# This method is prone to races in that no before-time is known.
|
||||||
|
# Use validate_service_config_changed instead.
|
||||||
|
|
||||||
|
# NOTE(beisner) pgrep_full is no longer implemented, as pidof is now
|
||||||
|
# used instead of pgrep. pgrep_full is still passed through to ensure
|
||||||
|
# deprecation WARNS. lp1474030
|
||||||
|
self.log.warn('DEPRECATION WARNING: use '
|
||||||
|
'validate_service_config_changed instead of '
|
||||||
|
'service_restarted due to known races.')
|
||||||
|
|
||||||
time.sleep(sleep_time)
|
time.sleep(sleep_time)
|
||||||
if (self._get_proc_start_time(sentry_unit, service, pgrep_full) >=
|
if (self._get_proc_start_time(sentry_unit, service, pgrep_full) >=
|
||||||
self._get_file_mtime(sentry_unit, filename)):
|
self._get_file_mtime(sentry_unit, filename)):
|
||||||
|
@ -302,78 +325,122 @@ class AmuletUtils(object):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def service_restarted_since(self, sentry_unit, mtime, service,
|
def service_restarted_since(self, sentry_unit, mtime, service,
|
||||||
pgrep_full=False, sleep_time=20,
|
pgrep_full=None, sleep_time=20,
|
||||||
retry_count=2):
|
retry_count=30, retry_sleep_time=10):
|
||||||
"""Check if service was been started after a given time.
|
"""Check if service was been started after a given time.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
sentry_unit (sentry): The sentry unit to check for the service on
|
sentry_unit (sentry): The sentry unit to check for the service on
|
||||||
mtime (float): The epoch time to check against
|
mtime (float): The epoch time to check against
|
||||||
service (string): service name to look for in process table
|
service (string): service name to look for in process table
|
||||||
pgrep_full (boolean): Use full command line search mode with pgrep
|
pgrep_full: [Deprecated] Use full command line search mode with pgrep
|
||||||
sleep_time (int): Seconds to sleep before looking for process
|
sleep_time (int): Initial sleep time (s) before looking for file
|
||||||
retry_count (int): If service is not found, how many times to retry
|
retry_sleep_time (int): Time (s) to sleep between retries
|
||||||
|
retry_count (int): If file is not found, how many times to retry
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
bool: True if service found and its start time it newer than mtime,
|
bool: True if service found and its start time it newer than mtime,
|
||||||
False if service is older than mtime or if service was
|
False if service is older than mtime or if service was
|
||||||
not found.
|
not found.
|
||||||
"""
|
"""
|
||||||
self.log.debug('Checking %s restarted since %s' % (service, mtime))
|
# NOTE(beisner) pgrep_full is no longer implemented, as pidof is now
|
||||||
|
# used instead of pgrep. pgrep_full is still passed through to ensure
|
||||||
|
# deprecation WARNS. lp1474030
|
||||||
|
|
||||||
|
unit_name = sentry_unit.info['unit_name']
|
||||||
|
self.log.debug('Checking that %s service restarted since %s on '
|
||||||
|
'%s' % (service, mtime, unit_name))
|
||||||
time.sleep(sleep_time)
|
time.sleep(sleep_time)
|
||||||
proc_start_time = self._get_proc_start_time(sentry_unit, service,
|
proc_start_time = None
|
||||||
|
tries = 0
|
||||||
|
while tries <= retry_count and not proc_start_time:
|
||||||
|
try:
|
||||||
|
proc_start_time = self._get_proc_start_time(sentry_unit,
|
||||||
|
service,
|
||||||
pgrep_full)
|
pgrep_full)
|
||||||
while retry_count > 0 and not proc_start_time:
|
self.log.debug('Attempt {} to get {} proc start time on {} '
|
||||||
self.log.debug('No pid file found for service %s, will retry %i '
|
'OK'.format(tries, service, unit_name))
|
||||||
'more times' % (service, retry_count))
|
except IOError as e:
|
||||||
time.sleep(30)
|
# NOTE(beisner) - race avoidance, proc may not exist yet.
|
||||||
proc_start_time = self._get_proc_start_time(sentry_unit, service,
|
# https://bugs.launchpad.net/charm-helpers/+bug/1474030
|
||||||
pgrep_full)
|
self.log.debug('Attempt {} to get {} proc start time on {} '
|
||||||
retry_count = retry_count - 1
|
'failed\n{}'.format(tries, service,
|
||||||
|
unit_name, e))
|
||||||
|
time.sleep(retry_sleep_time)
|
||||||
|
tries += 1
|
||||||
|
|
||||||
if not proc_start_time:
|
if not proc_start_time:
|
||||||
self.log.warn('No proc start time found, assuming service did '
|
self.log.warn('No proc start time found, assuming service did '
|
||||||
'not start')
|
'not start')
|
||||||
return False
|
return False
|
||||||
if proc_start_time >= mtime:
|
if proc_start_time >= mtime:
|
||||||
self.log.debug('proc start time is newer than provided mtime'
|
self.log.debug('Proc start time is newer than provided mtime'
|
||||||
'(%s >= %s)' % (proc_start_time, mtime))
|
'(%s >= %s) on %s (OK)' % (proc_start_time,
|
||||||
|
mtime, unit_name))
|
||||||
return True
|
return True
|
||||||
else:
|
else:
|
||||||
self.log.warn('proc start time (%s) is older than provided mtime '
|
self.log.warn('Proc start time (%s) is older than provided mtime '
|
||||||
'(%s), service did not restart' % (proc_start_time,
|
'(%s) on %s, service did not '
|
||||||
mtime))
|
'restart' % (proc_start_time, mtime, unit_name))
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def config_updated_since(self, sentry_unit, filename, mtime,
|
def config_updated_since(self, sentry_unit, filename, mtime,
|
||||||
sleep_time=20):
|
sleep_time=20, retry_count=30,
|
||||||
|
retry_sleep_time=10):
|
||||||
"""Check if file was modified after a given time.
|
"""Check if file was modified after a given time.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
sentry_unit (sentry): The sentry unit to check the file mtime on
|
sentry_unit (sentry): The sentry unit to check the file mtime on
|
||||||
filename (string): The file to check mtime of
|
filename (string): The file to check mtime of
|
||||||
mtime (float): The epoch time to check against
|
mtime (float): The epoch time to check against
|
||||||
sleep_time (int): Seconds to sleep before looking for process
|
sleep_time (int): Initial sleep time (s) before looking for file
|
||||||
|
retry_sleep_time (int): Time (s) to sleep between retries
|
||||||
|
retry_count (int): If file is not found, how many times to retry
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
bool: True if file was modified more recently than mtime, False if
|
bool: True if file was modified more recently than mtime, False if
|
||||||
file was modified before mtime,
|
file was modified before mtime, or if file not found.
|
||||||
"""
|
"""
|
||||||
self.log.debug('Checking %s updated since %s' % (filename, mtime))
|
unit_name = sentry_unit.info['unit_name']
|
||||||
|
self.log.debug('Checking that %s updated since %s on '
|
||||||
|
'%s' % (filename, mtime, unit_name))
|
||||||
time.sleep(sleep_time)
|
time.sleep(sleep_time)
|
||||||
|
file_mtime = None
|
||||||
|
tries = 0
|
||||||
|
while tries <= retry_count and not file_mtime:
|
||||||
|
try:
|
||||||
file_mtime = self._get_file_mtime(sentry_unit, filename)
|
file_mtime = self._get_file_mtime(sentry_unit, filename)
|
||||||
|
self.log.debug('Attempt {} to get {} file mtime on {} '
|
||||||
|
'OK'.format(tries, filename, unit_name))
|
||||||
|
except IOError as e:
|
||||||
|
# NOTE(beisner) - race avoidance, file may not exist yet.
|
||||||
|
# https://bugs.launchpad.net/charm-helpers/+bug/1474030
|
||||||
|
self.log.debug('Attempt {} to get {} file mtime on {} '
|
||||||
|
'failed\n{}'.format(tries, filename,
|
||||||
|
unit_name, e))
|
||||||
|
time.sleep(retry_sleep_time)
|
||||||
|
tries += 1
|
||||||
|
|
||||||
|
if not file_mtime:
|
||||||
|
self.log.warn('Could not determine file mtime, assuming '
|
||||||
|
'file does not exist')
|
||||||
|
return False
|
||||||
|
|
||||||
if file_mtime >= mtime:
|
if file_mtime >= mtime:
|
||||||
self.log.debug('File mtime is newer than provided mtime '
|
self.log.debug('File mtime is newer than provided mtime '
|
||||||
'(%s >= %s)' % (file_mtime, mtime))
|
'(%s >= %s) on %s (OK)' % (file_mtime,
|
||||||
|
mtime, unit_name))
|
||||||
return True
|
return True
|
||||||
else:
|
else:
|
||||||
self.log.warn('File mtime %s is older than provided mtime %s'
|
self.log.warn('File mtime is older than provided mtime'
|
||||||
% (file_mtime, mtime))
|
'(%s < on %s) on %s' % (file_mtime,
|
||||||
|
mtime, unit_name))
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def validate_service_config_changed(self, sentry_unit, mtime, service,
|
def validate_service_config_changed(self, sentry_unit, mtime, service,
|
||||||
filename, pgrep_full=False,
|
filename, pgrep_full=None,
|
||||||
sleep_time=20, retry_count=2):
|
sleep_time=20, retry_count=30,
|
||||||
|
retry_sleep_time=10):
|
||||||
"""Check service and file were updated after mtime
|
"""Check service and file were updated after mtime
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
|
@ -381,9 +448,10 @@ class AmuletUtils(object):
|
||||||
mtime (float): The epoch time to check against
|
mtime (float): The epoch time to check against
|
||||||
service (string): service name to look for in process table
|
service (string): service name to look for in process table
|
||||||
filename (string): The file to check mtime of
|
filename (string): The file to check mtime of
|
||||||
pgrep_full (boolean): Use full command line search mode with pgrep
|
pgrep_full: [Deprecated] Use full command line search mode with pgrep
|
||||||
sleep_time (int): Seconds to sleep before looking for process
|
sleep_time (int): Initial sleep in seconds to pass to test helpers
|
||||||
retry_count (int): If service is not found, how many times to retry
|
retry_count (int): If service is not found, how many times to retry
|
||||||
|
retry_sleep_time (int): Time in seconds to wait between retries
|
||||||
|
|
||||||
Typical Usage:
|
Typical Usage:
|
||||||
u = OpenStackAmuletUtils(ERROR)
|
u = OpenStackAmuletUtils(ERROR)
|
||||||
|
@ -400,15 +468,27 @@ class AmuletUtils(object):
|
||||||
mtime, False if service is older than mtime or if service was
|
mtime, False if service is older than mtime or if service was
|
||||||
not found or if filename was modified before mtime.
|
not found or if filename was modified before mtime.
|
||||||
"""
|
"""
|
||||||
self.log.debug('Checking %s restarted since %s' % (service, mtime))
|
|
||||||
time.sleep(sleep_time)
|
# NOTE(beisner) pgrep_full is no longer implemented, as pidof is now
|
||||||
service_restart = self.service_restarted_since(sentry_unit, mtime,
|
# used instead of pgrep. pgrep_full is still passed through to ensure
|
||||||
|
# deprecation WARNS. lp1474030
|
||||||
|
|
||||||
|
service_restart = self.service_restarted_since(
|
||||||
|
sentry_unit, mtime,
|
||||||
service,
|
service,
|
||||||
pgrep_full=pgrep_full,
|
pgrep_full=pgrep_full,
|
||||||
sleep_time=0,
|
sleep_time=sleep_time,
|
||||||
retry_count=retry_count)
|
retry_count=retry_count,
|
||||||
config_update = self.config_updated_since(sentry_unit, filename, mtime,
|
retry_sleep_time=retry_sleep_time)
|
||||||
sleep_time=0)
|
|
||||||
|
config_update = self.config_updated_since(
|
||||||
|
sentry_unit,
|
||||||
|
filename,
|
||||||
|
mtime,
|
||||||
|
sleep_time=sleep_time,
|
||||||
|
retry_count=retry_count,
|
||||||
|
retry_sleep_time=retry_sleep_time)
|
||||||
|
|
||||||
return service_restart and config_update
|
return service_restart and config_update
|
||||||
|
|
||||||
def get_sentry_time(self, sentry_unit):
|
def get_sentry_time(self, sentry_unit):
|
||||||
|
@ -426,7 +506,6 @@ class AmuletUtils(object):
|
||||||
"""Return a list of all Ubuntu releases in order of release."""
|
"""Return a list of all Ubuntu releases in order of release."""
|
||||||
_d = distro_info.UbuntuDistroInfo()
|
_d = distro_info.UbuntuDistroInfo()
|
||||||
_release_list = _d.all
|
_release_list = _d.all
|
||||||
self.log.debug('Ubuntu release list: {}'.format(_release_list))
|
|
||||||
return _release_list
|
return _release_list
|
||||||
|
|
||||||
def file_to_url(self, file_rel_path):
|
def file_to_url(self, file_rel_path):
|
||||||
|
@ -458,15 +537,20 @@ class AmuletUtils(object):
|
||||||
cmd, code, output))
|
cmd, code, output))
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def get_process_id_list(self, sentry_unit, process_name):
|
def get_process_id_list(self, sentry_unit, process_name,
|
||||||
|
expect_success=True):
|
||||||
"""Get a list of process ID(s) from a single sentry juju unit
|
"""Get a list of process ID(s) from a single sentry juju unit
|
||||||
for a single process name.
|
for a single process name.
|
||||||
|
|
||||||
:param sentry_unit: Pointer to amulet sentry instance (juju unit)
|
:param sentry_unit: Amulet sentry instance (juju unit)
|
||||||
:param process_name: Process name
|
:param process_name: Process name
|
||||||
|
:param expect_success: If False, expect the PID to be missing,
|
||||||
|
raise if it is present.
|
||||||
:returns: List of process IDs
|
:returns: List of process IDs
|
||||||
"""
|
"""
|
||||||
cmd = 'pidof {}'.format(process_name)
|
cmd = 'pidof -x {}'.format(process_name)
|
||||||
|
if not expect_success:
|
||||||
|
cmd += " || exit 0 && exit 1"
|
||||||
output, code = sentry_unit.run(cmd)
|
output, code = sentry_unit.run(cmd)
|
||||||
if code != 0:
|
if code != 0:
|
||||||
msg = ('{} `{}` returned {} '
|
msg = ('{} `{}` returned {} '
|
||||||
|
@ -475,14 +559,23 @@ class AmuletUtils(object):
|
||||||
amulet.raise_status(amulet.FAIL, msg=msg)
|
amulet.raise_status(amulet.FAIL, msg=msg)
|
||||||
return str(output).split()
|
return str(output).split()
|
||||||
|
|
||||||
def get_unit_process_ids(self, unit_processes):
|
def get_unit_process_ids(self, unit_processes, expect_success=True):
|
||||||
"""Construct a dict containing unit sentries, process names, and
|
"""Construct a dict containing unit sentries, process names, and
|
||||||
process IDs."""
|
process IDs.
|
||||||
|
|
||||||
|
:param unit_processes: A dictionary of Amulet sentry instance
|
||||||
|
to list of process names.
|
||||||
|
:param expect_success: if False expect the processes to not be
|
||||||
|
running, raise if they are.
|
||||||
|
:returns: Dictionary of Amulet sentry instance to dictionary
|
||||||
|
of process names to PIDs.
|
||||||
|
"""
|
||||||
pid_dict = {}
|
pid_dict = {}
|
||||||
for sentry_unit, process_list in unit_processes.iteritems():
|
for sentry_unit, process_list in six.iteritems(unit_processes):
|
||||||
pid_dict[sentry_unit] = {}
|
pid_dict[sentry_unit] = {}
|
||||||
for process in process_list:
|
for process in process_list:
|
||||||
pids = self.get_process_id_list(sentry_unit, process)
|
pids = self.get_process_id_list(
|
||||||
|
sentry_unit, process, expect_success=expect_success)
|
||||||
pid_dict[sentry_unit].update({process: pids})
|
pid_dict[sentry_unit].update({process: pids})
|
||||||
return pid_dict
|
return pid_dict
|
||||||
|
|
||||||
|
@ -496,7 +589,7 @@ class AmuletUtils(object):
|
||||||
return ('Unit count mismatch. expected, actual: {}, '
|
return ('Unit count mismatch. expected, actual: {}, '
|
||||||
'{} '.format(len(expected), len(actual)))
|
'{} '.format(len(expected), len(actual)))
|
||||||
|
|
||||||
for (e_sentry, e_proc_names) in expected.iteritems():
|
for (e_sentry, e_proc_names) in six.iteritems(expected):
|
||||||
e_sentry_name = e_sentry.info['unit_name']
|
e_sentry_name = e_sentry.info['unit_name']
|
||||||
if e_sentry in actual.keys():
|
if e_sentry in actual.keys():
|
||||||
a_proc_names = actual[e_sentry]
|
a_proc_names = actual[e_sentry]
|
||||||
|
@ -551,3 +644,175 @@ class AmuletUtils(object):
|
||||||
return 'Dicts within list are not identical'
|
return 'Dicts within list are not identical'
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def validate_sectionless_conf(self, file_contents, expected):
|
||||||
|
"""A crude conf parser. Useful to inspect configuration files which
|
||||||
|
do not have section headers (as would be necessary in order to use
|
||||||
|
the configparser). Such as openstack-dashboard or rabbitmq confs."""
|
||||||
|
for line in file_contents.split('\n'):
|
||||||
|
if '=' in line:
|
||||||
|
args = line.split('=')
|
||||||
|
if len(args) <= 1:
|
||||||
|
continue
|
||||||
|
key = args[0].strip()
|
||||||
|
value = args[1].strip()
|
||||||
|
if key in expected.keys():
|
||||||
|
if expected[key] != value:
|
||||||
|
msg = ('Config mismatch. Expected, actual: {}, '
|
||||||
|
'{}'.format(expected[key], value))
|
||||||
|
amulet.raise_status(amulet.FAIL, msg=msg)
|
||||||
|
|
||||||
|
def get_unit_hostnames(self, units):
|
||||||
|
"""Return a dict of juju unit names to hostnames."""
|
||||||
|
host_names = {}
|
||||||
|
for unit in units:
|
||||||
|
host_names[unit.info['unit_name']] = \
|
||||||
|
str(unit.file_contents('/etc/hostname').strip())
|
||||||
|
self.log.debug('Unit host names: {}'.format(host_names))
|
||||||
|
return host_names
|
||||||
|
|
||||||
|
def run_cmd_unit(self, sentry_unit, cmd):
|
||||||
|
"""Run a command on a unit, return the output and exit code."""
|
||||||
|
output, code = sentry_unit.run(cmd)
|
||||||
|
if code == 0:
|
||||||
|
self.log.debug('{} `{}` command returned {} '
|
||||||
|
'(OK)'.format(sentry_unit.info['unit_name'],
|
||||||
|
cmd, code))
|
||||||
|
else:
|
||||||
|
msg = ('{} `{}` command returned {} '
|
||||||
|
'{}'.format(sentry_unit.info['unit_name'],
|
||||||
|
cmd, code, output))
|
||||||
|
amulet.raise_status(amulet.FAIL, msg=msg)
|
||||||
|
return str(output), code
|
||||||
|
|
||||||
|
def file_exists_on_unit(self, sentry_unit, file_name):
|
||||||
|
"""Check if a file exists on a unit."""
|
||||||
|
try:
|
||||||
|
sentry_unit.file_stat(file_name)
|
||||||
|
return True
|
||||||
|
except IOError:
|
||||||
|
return False
|
||||||
|
except Exception as e:
|
||||||
|
msg = 'Error checking file {}: {}'.format(file_name, e)
|
||||||
|
amulet.raise_status(amulet.FAIL, msg=msg)
|
||||||
|
|
||||||
|
def file_contents_safe(self, sentry_unit, file_name,
|
||||||
|
max_wait=60, fatal=False):
|
||||||
|
"""Get file contents from a sentry unit. Wrap amulet file_contents
|
||||||
|
with retry logic to address races where a file checks as existing,
|
||||||
|
but no longer exists by the time file_contents is called.
|
||||||
|
Return None if file not found. Optionally raise if fatal is True."""
|
||||||
|
unit_name = sentry_unit.info['unit_name']
|
||||||
|
file_contents = False
|
||||||
|
tries = 0
|
||||||
|
while not file_contents and tries < (max_wait / 4):
|
||||||
|
try:
|
||||||
|
file_contents = sentry_unit.file_contents(file_name)
|
||||||
|
except IOError:
|
||||||
|
self.log.debug('Attempt {} to open file {} from {} '
|
||||||
|
'failed'.format(tries, file_name,
|
||||||
|
unit_name))
|
||||||
|
time.sleep(4)
|
||||||
|
tries += 1
|
||||||
|
|
||||||
|
if file_contents:
|
||||||
|
return file_contents
|
||||||
|
elif not fatal:
|
||||||
|
return None
|
||||||
|
elif fatal:
|
||||||
|
msg = 'Failed to get file contents from unit.'
|
||||||
|
amulet.raise_status(amulet.FAIL, msg)
|
||||||
|
|
||||||
|
def port_knock_tcp(self, host="localhost", port=22, timeout=15):
|
||||||
|
"""Open a TCP socket to check for a listening sevice on a host.
|
||||||
|
|
||||||
|
:param host: host name or IP address, default to localhost
|
||||||
|
:param port: TCP port number, default to 22
|
||||||
|
:param timeout: Connect timeout, default to 15 seconds
|
||||||
|
:returns: True if successful, False if connect failed
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Resolve host name if possible
|
||||||
|
try:
|
||||||
|
connect_host = socket.gethostbyname(host)
|
||||||
|
host_human = "{} ({})".format(connect_host, host)
|
||||||
|
except socket.error as e:
|
||||||
|
self.log.warn('Unable to resolve address: '
|
||||||
|
'{} ({}) Trying anyway!'.format(host, e))
|
||||||
|
connect_host = host
|
||||||
|
host_human = connect_host
|
||||||
|
|
||||||
|
# Attempt socket connection
|
||||||
|
try:
|
||||||
|
knock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
|
knock.settimeout(timeout)
|
||||||
|
knock.connect((connect_host, port))
|
||||||
|
knock.close()
|
||||||
|
self.log.debug('Socket connect OK for host '
|
||||||
|
'{} on port {}.'.format(host_human, port))
|
||||||
|
return True
|
||||||
|
except socket.error as e:
|
||||||
|
self.log.debug('Socket connect FAIL for'
|
||||||
|
' {} port {} ({})'.format(host_human, port, e))
|
||||||
|
return False
|
||||||
|
|
||||||
|
def port_knock_units(self, sentry_units, port=22,
|
||||||
|
timeout=15, expect_success=True):
|
||||||
|
"""Open a TCP socket to check for a listening sevice on each
|
||||||
|
listed juju unit.
|
||||||
|
|
||||||
|
:param sentry_units: list of sentry unit pointers
|
||||||
|
:param port: TCP port number, default to 22
|
||||||
|
:param timeout: Connect timeout, default to 15 seconds
|
||||||
|
:expect_success: True by default, set False to invert logic
|
||||||
|
:returns: None if successful, Failure message otherwise
|
||||||
|
"""
|
||||||
|
for unit in sentry_units:
|
||||||
|
host = unit.info['public-address']
|
||||||
|
connected = self.port_knock_tcp(host, port, timeout)
|
||||||
|
if not connected and expect_success:
|
||||||
|
return 'Socket connect failed.'
|
||||||
|
elif connected and not expect_success:
|
||||||
|
return 'Socket connected unexpectedly.'
|
||||||
|
|
||||||
|
def get_uuid_epoch_stamp(self):
|
||||||
|
"""Returns a stamp string based on uuid4 and epoch time. Useful in
|
||||||
|
generating test messages which need to be unique-ish."""
|
||||||
|
return '[{}-{}]'.format(uuid.uuid4(), time.time())
|
||||||
|
|
||||||
|
# amulet juju action helpers:
|
||||||
|
def run_action(self, unit_sentry, action,
|
||||||
|
_check_output=subprocess.check_output):
|
||||||
|
"""Run the named action on a given unit sentry.
|
||||||
|
|
||||||
|
_check_output parameter is used for dependency injection.
|
||||||
|
|
||||||
|
@return action_id.
|
||||||
|
"""
|
||||||
|
unit_id = unit_sentry.info["unit_name"]
|
||||||
|
command = ["juju", "action", "do", "--format=json", unit_id, action]
|
||||||
|
self.log.info("Running command: %s\n" % " ".join(command))
|
||||||
|
output = _check_output(command, universal_newlines=True)
|
||||||
|
data = json.loads(output)
|
||||||
|
action_id = data[u'Action queued with id']
|
||||||
|
return action_id
|
||||||
|
|
||||||
|
def wait_on_action(self, action_id, _check_output=subprocess.check_output):
|
||||||
|
"""Wait for a given action, returning if it completed or not.
|
||||||
|
|
||||||
|
_check_output parameter is used for dependency injection.
|
||||||
|
"""
|
||||||
|
command = ["juju", "action", "fetch", "--format=json", "--wait=0",
|
||||||
|
action_id]
|
||||||
|
output = _check_output(command, universal_newlines=True)
|
||||||
|
data = json.loads(output)
|
||||||
|
return data.get(u"status") == "completed"
|
||||||
|
|
||||||
|
def status_get(self, unit):
|
||||||
|
"""Return the current service status of this unit."""
|
||||||
|
raw_status, return_code = unit.run(
|
||||||
|
"status-get --format=json --include-data")
|
||||||
|
if return_code != 0:
|
||||||
|
return ("unknown", "")
|
||||||
|
status = json.loads(raw_status)
|
||||||
|
return (status["status"], status["message"])
|
||||||
|
|
|
@ -14,12 +14,18 @@
|
||||||
# You should have received a copy of the GNU Lesser General Public License
|
# You should have received a copy of the GNU Lesser General Public License
|
||||||
# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
|
# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import re
|
||||||
|
import sys
|
||||||
import six
|
import six
|
||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
from charmhelpers.contrib.amulet.deployment import (
|
from charmhelpers.contrib.amulet.deployment import (
|
||||||
AmuletDeployment
|
AmuletDeployment
|
||||||
)
|
)
|
||||||
|
|
||||||
|
DEBUG = logging.DEBUG
|
||||||
|
ERROR = logging.ERROR
|
||||||
|
|
||||||
|
|
||||||
class OpenStackAmuletDeployment(AmuletDeployment):
|
class OpenStackAmuletDeployment(AmuletDeployment):
|
||||||
"""OpenStack amulet deployment.
|
"""OpenStack amulet deployment.
|
||||||
|
@ -28,9 +34,12 @@ class OpenStackAmuletDeployment(AmuletDeployment):
|
||||||
that is specifically for use by OpenStack charms.
|
that is specifically for use by OpenStack charms.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, series=None, openstack=None, source=None, stable=True):
|
def __init__(self, series=None, openstack=None, source=None,
|
||||||
|
stable=True, log_level=DEBUG):
|
||||||
"""Initialize the deployment environment."""
|
"""Initialize the deployment environment."""
|
||||||
super(OpenStackAmuletDeployment, self).__init__(series)
|
super(OpenStackAmuletDeployment, self).__init__(series)
|
||||||
|
self.log = self.get_logger(level=log_level)
|
||||||
|
self.log.info('OpenStackAmuletDeployment: init')
|
||||||
self.openstack = openstack
|
self.openstack = openstack
|
||||||
self.source = source
|
self.source = source
|
||||||
self.stable = stable
|
self.stable = stable
|
||||||
|
@ -38,30 +47,55 @@ class OpenStackAmuletDeployment(AmuletDeployment):
|
||||||
# out.
|
# out.
|
||||||
self.current_next = "trusty"
|
self.current_next = "trusty"
|
||||||
|
|
||||||
|
def get_logger(self, name="deployment-logger", level=logging.DEBUG):
|
||||||
|
"""Get a logger object that will log to stdout."""
|
||||||
|
log = logging
|
||||||
|
logger = log.getLogger(name)
|
||||||
|
fmt = log.Formatter("%(asctime)s %(funcName)s "
|
||||||
|
"%(levelname)s: %(message)s")
|
||||||
|
|
||||||
|
handler = log.StreamHandler(stream=sys.stdout)
|
||||||
|
handler.setLevel(level)
|
||||||
|
handler.setFormatter(fmt)
|
||||||
|
|
||||||
|
logger.addHandler(handler)
|
||||||
|
logger.setLevel(level)
|
||||||
|
|
||||||
|
return logger
|
||||||
|
|
||||||
def _determine_branch_locations(self, other_services):
|
def _determine_branch_locations(self, other_services):
|
||||||
"""Determine the branch locations for the other services.
|
"""Determine the branch locations for the other services.
|
||||||
|
|
||||||
Determine if the local branch being tested is derived from its
|
Determine if the local branch being tested is derived from its
|
||||||
stable or next (dev) branch, and based on this, use the corresonding
|
stable or next (dev) branch, and based on this, use the corresonding
|
||||||
stable or next branches for the other_services."""
|
stable or next branches for the other_services."""
|
||||||
|
|
||||||
|
self.log.info('OpenStackAmuletDeployment: determine branch locations')
|
||||||
|
|
||||||
|
# Charms outside the lp:~openstack-charmers namespace
|
||||||
base_charms = ['mysql', 'mongodb', 'nrpe']
|
base_charms = ['mysql', 'mongodb', 'nrpe']
|
||||||
|
|
||||||
|
# Force these charms to current series even when using an older series.
|
||||||
|
# ie. Use trusty/nrpe even when series is precise, as the P charm
|
||||||
|
# does not possess the necessary external master config and hooks.
|
||||||
|
force_series_current = ['nrpe']
|
||||||
|
|
||||||
if self.series in ['precise', 'trusty']:
|
if self.series in ['precise', 'trusty']:
|
||||||
base_series = self.series
|
base_series = self.series
|
||||||
else:
|
else:
|
||||||
base_series = self.current_next
|
base_series = self.current_next
|
||||||
|
|
||||||
if self.stable:
|
|
||||||
for svc in other_services:
|
for svc in other_services:
|
||||||
|
if svc['name'] in force_series_current:
|
||||||
|
base_series = self.current_next
|
||||||
|
# If a location has been explicitly set, use it
|
||||||
if svc.get('location'):
|
if svc.get('location'):
|
||||||
continue
|
continue
|
||||||
|
if self.stable:
|
||||||
temp = 'lp:charms/{}/{}'
|
temp = 'lp:charms/{}/{}'
|
||||||
svc['location'] = temp.format(base_series,
|
svc['location'] = temp.format(base_series,
|
||||||
svc['name'])
|
svc['name'])
|
||||||
else:
|
else:
|
||||||
for svc in other_services:
|
|
||||||
if svc.get('location'):
|
|
||||||
continue
|
|
||||||
if svc['name'] in base_charms:
|
if svc['name'] in base_charms:
|
||||||
temp = 'lp:charms/{}/{}'
|
temp = 'lp:charms/{}/{}'
|
||||||
svc['location'] = temp.format(base_series,
|
svc['location'] = temp.format(base_series,
|
||||||
|
@ -70,10 +104,13 @@ class OpenStackAmuletDeployment(AmuletDeployment):
|
||||||
temp = 'lp:~openstack-charmers/charms/{}/{}/next'
|
temp = 'lp:~openstack-charmers/charms/{}/{}/next'
|
||||||
svc['location'] = temp.format(self.current_next,
|
svc['location'] = temp.format(self.current_next,
|
||||||
svc['name'])
|
svc['name'])
|
||||||
|
|
||||||
return other_services
|
return other_services
|
||||||
|
|
||||||
def _add_services(self, this_service, other_services):
|
def _add_services(self, this_service, other_services):
|
||||||
"""Add services to the deployment and set openstack-origin/source."""
|
"""Add services to the deployment and set openstack-origin/source."""
|
||||||
|
self.log.info('OpenStackAmuletDeployment: adding services')
|
||||||
|
|
||||||
other_services = self._determine_branch_locations(other_services)
|
other_services = self._determine_branch_locations(other_services)
|
||||||
|
|
||||||
super(OpenStackAmuletDeployment, self)._add_services(this_service,
|
super(OpenStackAmuletDeployment, self)._add_services(this_service,
|
||||||
|
@ -81,30 +118,102 @@ class OpenStackAmuletDeployment(AmuletDeployment):
|
||||||
|
|
||||||
services = other_services
|
services = other_services
|
||||||
services.append(this_service)
|
services.append(this_service)
|
||||||
|
|
||||||
|
# Charms which should use the source config option
|
||||||
use_source = ['mysql', 'mongodb', 'rabbitmq-server', 'ceph',
|
use_source = ['mysql', 'mongodb', 'rabbitmq-server', 'ceph',
|
||||||
'ceph-osd', 'ceph-radosgw']
|
'ceph-osd', 'ceph-radosgw']
|
||||||
# Most OpenStack subordinate charms do not expose an origin option
|
|
||||||
# as that is controlled by the principle.
|
# Charms which can not use openstack-origin, ie. many subordinates
|
||||||
ignore = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe',
|
no_origin = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe',
|
||||||
'cisco-vpp', 'odl-controller']
|
'openvswitch-odl', 'neutron-api-odl', 'odl-controller']
|
||||||
|
|
||||||
if self.openstack:
|
if self.openstack:
|
||||||
for svc in services:
|
for svc in services:
|
||||||
if svc['name'] not in use_source + ignore:
|
if svc['name'] not in use_source + no_origin:
|
||||||
config = {'openstack-origin': self.openstack}
|
config = {'openstack-origin': self.openstack}
|
||||||
self.d.configure(svc['name'], config)
|
self.d.configure(svc['name'], config)
|
||||||
|
|
||||||
if self.source:
|
if self.source:
|
||||||
for svc in services:
|
for svc in services:
|
||||||
if svc['name'] in use_source and svc['name'] not in ignore:
|
if svc['name'] in use_source and svc['name'] not in no_origin:
|
||||||
config = {'source': self.source}
|
config = {'source': self.source}
|
||||||
self.d.configure(svc['name'], config)
|
self.d.configure(svc['name'], config)
|
||||||
|
|
||||||
def _configure_services(self, configs):
|
def _configure_services(self, configs):
|
||||||
"""Configure all of the services."""
|
"""Configure all of the services."""
|
||||||
|
self.log.info('OpenStackAmuletDeployment: configure services')
|
||||||
for service, config in six.iteritems(configs):
|
for service, config in six.iteritems(configs):
|
||||||
self.d.configure(service, config)
|
self.d.configure(service, config)
|
||||||
|
|
||||||
|
def _auto_wait_for_status(self, message=None, exclude_services=None,
|
||||||
|
include_only=None, timeout=1800):
|
||||||
|
"""Wait for all units to have a specific extended status, except
|
||||||
|
for any defined as excluded. Unless specified via message, any
|
||||||
|
status containing any case of 'ready' will be considered a match.
|
||||||
|
|
||||||
|
Examples of message usage:
|
||||||
|
|
||||||
|
Wait for all unit status to CONTAIN any case of 'ready' or 'ok':
|
||||||
|
message = re.compile('.*ready.*|.*ok.*', re.IGNORECASE)
|
||||||
|
|
||||||
|
Wait for all units to reach this status (exact match):
|
||||||
|
message = re.compile('^Unit is ready and clustered$')
|
||||||
|
|
||||||
|
Wait for all units to reach any one of these (exact match):
|
||||||
|
message = re.compile('Unit is ready|OK|Ready')
|
||||||
|
|
||||||
|
Wait for at least one unit to reach this status (exact match):
|
||||||
|
message = {'ready'}
|
||||||
|
|
||||||
|
See Amulet's sentry.wait_for_messages() for message usage detail.
|
||||||
|
https://github.com/juju/amulet/blob/master/amulet/sentry.py
|
||||||
|
|
||||||
|
:param message: Expected status match
|
||||||
|
:param exclude_services: List of juju service names to ignore,
|
||||||
|
not to be used in conjuction with include_only.
|
||||||
|
:param include_only: List of juju service names to exclusively check,
|
||||||
|
not to be used in conjuction with exclude_services.
|
||||||
|
:param timeout: Maximum time in seconds to wait for status match
|
||||||
|
:returns: None. Raises if timeout is hit.
|
||||||
|
"""
|
||||||
|
self.log.info('Waiting for extended status on units...')
|
||||||
|
|
||||||
|
all_services = self.d.services.keys()
|
||||||
|
|
||||||
|
if exclude_services and include_only:
|
||||||
|
raise ValueError('exclude_services can not be used '
|
||||||
|
'with include_only')
|
||||||
|
|
||||||
|
if message:
|
||||||
|
if isinstance(message, re._pattern_type):
|
||||||
|
match = message.pattern
|
||||||
|
else:
|
||||||
|
match = message
|
||||||
|
|
||||||
|
self.log.debug('Custom extended status wait match: '
|
||||||
|
'{}'.format(match))
|
||||||
|
else:
|
||||||
|
self.log.debug('Default extended status wait match: contains '
|
||||||
|
'READY (case-insensitive)')
|
||||||
|
message = re.compile('.*ready.*', re.IGNORECASE)
|
||||||
|
|
||||||
|
if exclude_services:
|
||||||
|
self.log.debug('Excluding services from extended status match: '
|
||||||
|
'{}'.format(exclude_services))
|
||||||
|
else:
|
||||||
|
exclude_services = []
|
||||||
|
|
||||||
|
if include_only:
|
||||||
|
services = include_only
|
||||||
|
else:
|
||||||
|
services = list(set(all_services) - set(exclude_services))
|
||||||
|
|
||||||
|
self.log.debug('Waiting up to {}s for extended status on services: '
|
||||||
|
'{}'.format(timeout, services))
|
||||||
|
service_messages = {service: message for service in services}
|
||||||
|
self.d.sentry.wait_for_messages(service_messages, timeout=timeout)
|
||||||
|
self.log.info('OK')
|
||||||
|
|
||||||
def _get_openstack_release(self):
|
def _get_openstack_release(self):
|
||||||
"""Get openstack release.
|
"""Get openstack release.
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,7 @@ import amulet
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
|
import re
|
||||||
import six
|
import six
|
||||||
import time
|
import time
|
||||||
import urllib
|
import urllib
|
||||||
|
@ -27,6 +28,7 @@ import glanceclient.v1.client as glance_client
|
||||||
import heatclient.v1.client as heat_client
|
import heatclient.v1.client as heat_client
|
||||||
import keystoneclient.v2_0 as keystone_client
|
import keystoneclient.v2_0 as keystone_client
|
||||||
import novaclient.v1_1.client as nova_client
|
import novaclient.v1_1.client as nova_client
|
||||||
|
import pika
|
||||||
import swiftclient
|
import swiftclient
|
||||||
|
|
||||||
from charmhelpers.contrib.amulet.utils import (
|
from charmhelpers.contrib.amulet.utils import (
|
||||||
|
@ -602,3 +604,382 @@ class OpenStackAmuletUtils(AmuletUtils):
|
||||||
self.log.debug('Ceph {} samples (OK): '
|
self.log.debug('Ceph {} samples (OK): '
|
||||||
'{}'.format(sample_type, samples))
|
'{}'.format(sample_type, samples))
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
# rabbitmq/amqp specific helpers:
|
||||||
|
|
||||||
|
def rmq_wait_for_cluster(self, deployment, init_sleep=15, timeout=1200):
|
||||||
|
"""Wait for rmq units extended status to show cluster readiness,
|
||||||
|
after an optional initial sleep period. Initial sleep is likely
|
||||||
|
necessary to be effective following a config change, as status
|
||||||
|
message may not instantly update to non-ready."""
|
||||||
|
|
||||||
|
if init_sleep:
|
||||||
|
time.sleep(init_sleep)
|
||||||
|
|
||||||
|
message = re.compile('^Unit is ready and clustered$')
|
||||||
|
deployment._auto_wait_for_status(message=message,
|
||||||
|
timeout=timeout,
|
||||||
|
include_only=['rabbitmq-server'])
|
||||||
|
|
||||||
|
def add_rmq_test_user(self, sentry_units,
|
||||||
|
username="testuser1", password="changeme"):
|
||||||
|
"""Add a test user via the first rmq juju unit, check connection as
|
||||||
|
the new user against all sentry units.
|
||||||
|
|
||||||
|
:param sentry_units: list of sentry unit pointers
|
||||||
|
:param username: amqp user name, default to testuser1
|
||||||
|
:param password: amqp user password
|
||||||
|
:returns: None if successful. Raise on error.
|
||||||
|
"""
|
||||||
|
self.log.debug('Adding rmq user ({})...'.format(username))
|
||||||
|
|
||||||
|
# Check that user does not already exist
|
||||||
|
cmd_user_list = 'rabbitmqctl list_users'
|
||||||
|
output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
|
||||||
|
if username in output:
|
||||||
|
self.log.warning('User ({}) already exists, returning '
|
||||||
|
'gracefully.'.format(username))
|
||||||
|
return
|
||||||
|
|
||||||
|
perms = '".*" ".*" ".*"'
|
||||||
|
cmds = ['rabbitmqctl add_user {} {}'.format(username, password),
|
||||||
|
'rabbitmqctl set_permissions {} {}'.format(username, perms)]
|
||||||
|
|
||||||
|
# Add user via first unit
|
||||||
|
for cmd in cmds:
|
||||||
|
output, _ = self.run_cmd_unit(sentry_units[0], cmd)
|
||||||
|
|
||||||
|
# Check connection against the other sentry_units
|
||||||
|
self.log.debug('Checking user connect against units...')
|
||||||
|
for sentry_unit in sentry_units:
|
||||||
|
connection = self.connect_amqp_by_unit(sentry_unit, ssl=False,
|
||||||
|
username=username,
|
||||||
|
password=password)
|
||||||
|
connection.close()
|
||||||
|
|
||||||
|
def delete_rmq_test_user(self, sentry_units, username="testuser1"):
|
||||||
|
"""Delete a rabbitmq user via the first rmq juju unit.
|
||||||
|
|
||||||
|
:param sentry_units: list of sentry unit pointers
|
||||||
|
:param username: amqp user name, default to testuser1
|
||||||
|
:param password: amqp user password
|
||||||
|
:returns: None if successful or no such user.
|
||||||
|
"""
|
||||||
|
self.log.debug('Deleting rmq user ({})...'.format(username))
|
||||||
|
|
||||||
|
# Check that the user exists
|
||||||
|
cmd_user_list = 'rabbitmqctl list_users'
|
||||||
|
output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
|
||||||
|
|
||||||
|
if username not in output:
|
||||||
|
self.log.warning('User ({}) does not exist, returning '
|
||||||
|
'gracefully.'.format(username))
|
||||||
|
return
|
||||||
|
|
||||||
|
# Delete the user
|
||||||
|
cmd_user_del = 'rabbitmqctl delete_user {}'.format(username)
|
||||||
|
output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_del)
|
||||||
|
|
||||||
|
def get_rmq_cluster_status(self, sentry_unit):
|
||||||
|
"""Execute rabbitmq cluster status command on a unit and return
|
||||||
|
the full output.
|
||||||
|
|
||||||
|
:param unit: sentry unit
|
||||||
|
:returns: String containing console output of cluster status command
|
||||||
|
"""
|
||||||
|
cmd = 'rabbitmqctl cluster_status'
|
||||||
|
output, _ = self.run_cmd_unit(sentry_unit, cmd)
|
||||||
|
self.log.debug('{} cluster_status:\n{}'.format(
|
||||||
|
sentry_unit.info['unit_name'], output))
|
||||||
|
return str(output)
|
||||||
|
|
||||||
|
def get_rmq_cluster_running_nodes(self, sentry_unit):
|
||||||
|
"""Parse rabbitmqctl cluster_status output string, return list of
|
||||||
|
running rabbitmq cluster nodes.
|
||||||
|
|
||||||
|
:param unit: sentry unit
|
||||||
|
:returns: List containing node names of running nodes
|
||||||
|
"""
|
||||||
|
# NOTE(beisner): rabbitmqctl cluster_status output is not
|
||||||
|
# json-parsable, do string chop foo, then json.loads that.
|
||||||
|
str_stat = self.get_rmq_cluster_status(sentry_unit)
|
||||||
|
if 'running_nodes' in str_stat:
|
||||||
|
pos_start = str_stat.find("{running_nodes,") + 15
|
||||||
|
pos_end = str_stat.find("]},", pos_start) + 1
|
||||||
|
str_run_nodes = str_stat[pos_start:pos_end].replace("'", '"')
|
||||||
|
run_nodes = json.loads(str_run_nodes)
|
||||||
|
return run_nodes
|
||||||
|
else:
|
||||||
|
return []
|
||||||
|
|
||||||
|
def validate_rmq_cluster_running_nodes(self, sentry_units):
|
||||||
|
"""Check that all rmq unit hostnames are represented in the
|
||||||
|
cluster_status output of all units.
|
||||||
|
|
||||||
|
:param host_names: dict of juju unit names to host names
|
||||||
|
:param units: list of sentry unit pointers (all rmq units)
|
||||||
|
:returns: None if successful, otherwise return error message
|
||||||
|
"""
|
||||||
|
host_names = self.get_unit_hostnames(sentry_units)
|
||||||
|
errors = []
|
||||||
|
|
||||||
|
# Query every unit for cluster_status running nodes
|
||||||
|
for query_unit in sentry_units:
|
||||||
|
query_unit_name = query_unit.info['unit_name']
|
||||||
|
running_nodes = self.get_rmq_cluster_running_nodes(query_unit)
|
||||||
|
|
||||||
|
# Confirm that every unit is represented in the queried unit's
|
||||||
|
# cluster_status running nodes output.
|
||||||
|
for validate_unit in sentry_units:
|
||||||
|
val_host_name = host_names[validate_unit.info['unit_name']]
|
||||||
|
val_node_name = 'rabbit@{}'.format(val_host_name)
|
||||||
|
|
||||||
|
if val_node_name not in running_nodes:
|
||||||
|
errors.append('Cluster member check failed on {}: {} not '
|
||||||
|
'in {}\n'.format(query_unit_name,
|
||||||
|
val_node_name,
|
||||||
|
running_nodes))
|
||||||
|
if errors:
|
||||||
|
return ''.join(errors)
|
||||||
|
|
||||||
|
def rmq_ssl_is_enabled_on_unit(self, sentry_unit, port=None):
|
||||||
|
"""Check a single juju rmq unit for ssl and port in the config file."""
|
||||||
|
host = sentry_unit.info['public-address']
|
||||||
|
unit_name = sentry_unit.info['unit_name']
|
||||||
|
|
||||||
|
conf_file = '/etc/rabbitmq/rabbitmq.config'
|
||||||
|
conf_contents = str(self.file_contents_safe(sentry_unit,
|
||||||
|
conf_file, max_wait=16))
|
||||||
|
# Checks
|
||||||
|
conf_ssl = 'ssl' in conf_contents
|
||||||
|
conf_port = str(port) in conf_contents
|
||||||
|
|
||||||
|
# Port explicitly checked in config
|
||||||
|
if port and conf_port and conf_ssl:
|
||||||
|
self.log.debug('SSL is enabled @{}:{} '
|
||||||
|
'({})'.format(host, port, unit_name))
|
||||||
|
return True
|
||||||
|
elif port and not conf_port and conf_ssl:
|
||||||
|
self.log.debug('SSL is enabled @{} but not on port {} '
|
||||||
|
'({})'.format(host, port, unit_name))
|
||||||
|
return False
|
||||||
|
# Port not checked (useful when checking that ssl is disabled)
|
||||||
|
elif not port and conf_ssl:
|
||||||
|
self.log.debug('SSL is enabled @{}:{} '
|
||||||
|
'({})'.format(host, port, unit_name))
|
||||||
|
return True
|
||||||
|
elif not conf_ssl:
|
||||||
|
self.log.debug('SSL not enabled @{}:{} '
|
||||||
|
'({})'.format(host, port, unit_name))
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
msg = ('Unknown condition when checking SSL status @{}:{} '
|
||||||
|
'({})'.format(host, port, unit_name))
|
||||||
|
amulet.raise_status(amulet.FAIL, msg)
|
||||||
|
|
||||||
|
def validate_rmq_ssl_enabled_units(self, sentry_units, port=None):
|
||||||
|
"""Check that ssl is enabled on rmq juju sentry units.
|
||||||
|
|
||||||
|
:param sentry_units: list of all rmq sentry units
|
||||||
|
:param port: optional ssl port override to validate
|
||||||
|
:returns: None if successful, otherwise return error message
|
||||||
|
"""
|
||||||
|
for sentry_unit in sentry_units:
|
||||||
|
if not self.rmq_ssl_is_enabled_on_unit(sentry_unit, port=port):
|
||||||
|
return ('Unexpected condition: ssl is disabled on unit '
|
||||||
|
'({})'.format(sentry_unit.info['unit_name']))
|
||||||
|
return None
|
||||||
|
|
||||||
|
def validate_rmq_ssl_disabled_units(self, sentry_units):
|
||||||
|
"""Check that ssl is enabled on listed rmq juju sentry units.
|
||||||
|
|
||||||
|
:param sentry_units: list of all rmq sentry units
|
||||||
|
:returns: True if successful. Raise on error.
|
||||||
|
"""
|
||||||
|
for sentry_unit in sentry_units:
|
||||||
|
if self.rmq_ssl_is_enabled_on_unit(sentry_unit):
|
||||||
|
return ('Unexpected condition: ssl is enabled on unit '
|
||||||
|
'({})'.format(sentry_unit.info['unit_name']))
|
||||||
|
return None
|
||||||
|
|
||||||
|
def configure_rmq_ssl_on(self, sentry_units, deployment,
|
||||||
|
port=None, max_wait=60):
|
||||||
|
"""Turn ssl charm config option on, with optional non-default
|
||||||
|
ssl port specification. Confirm that it is enabled on every
|
||||||
|
unit.
|
||||||
|
|
||||||
|
:param sentry_units: list of sentry units
|
||||||
|
:param deployment: amulet deployment object pointer
|
||||||
|
:param port: amqp port, use defaults if None
|
||||||
|
:param max_wait: maximum time to wait in seconds to confirm
|
||||||
|
:returns: None if successful. Raise on error.
|
||||||
|
"""
|
||||||
|
self.log.debug('Setting ssl charm config option: on')
|
||||||
|
|
||||||
|
# Enable RMQ SSL
|
||||||
|
config = {'ssl': 'on'}
|
||||||
|
if port:
|
||||||
|
config['ssl_port'] = port
|
||||||
|
|
||||||
|
deployment.d.configure('rabbitmq-server', config)
|
||||||
|
|
||||||
|
# Wait for unit status
|
||||||
|
self.rmq_wait_for_cluster(deployment)
|
||||||
|
|
||||||
|
# Confirm
|
||||||
|
tries = 0
|
||||||
|
ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
|
||||||
|
while ret and tries < (max_wait / 4):
|
||||||
|
time.sleep(4)
|
||||||
|
self.log.debug('Attempt {}: {}'.format(tries, ret))
|
||||||
|
ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
|
||||||
|
tries += 1
|
||||||
|
|
||||||
|
if ret:
|
||||||
|
amulet.raise_status(amulet.FAIL, ret)
|
||||||
|
|
||||||
|
def configure_rmq_ssl_off(self, sentry_units, deployment, max_wait=60):
|
||||||
|
"""Turn ssl charm config option off, confirm that it is disabled
|
||||||
|
on every unit.
|
||||||
|
|
||||||
|
:param sentry_units: list of sentry units
|
||||||
|
:param deployment: amulet deployment object pointer
|
||||||
|
:param max_wait: maximum time to wait in seconds to confirm
|
||||||
|
:returns: None if successful. Raise on error.
|
||||||
|
"""
|
||||||
|
self.log.debug('Setting ssl charm config option: off')
|
||||||
|
|
||||||
|
# Disable RMQ SSL
|
||||||
|
config = {'ssl': 'off'}
|
||||||
|
deployment.d.configure('rabbitmq-server', config)
|
||||||
|
|
||||||
|
# Wait for unit status
|
||||||
|
self.rmq_wait_for_cluster(deployment)
|
||||||
|
|
||||||
|
# Confirm
|
||||||
|
tries = 0
|
||||||
|
ret = self.validate_rmq_ssl_disabled_units(sentry_units)
|
||||||
|
while ret and tries < (max_wait / 4):
|
||||||
|
time.sleep(4)
|
||||||
|
self.log.debug('Attempt {}: {}'.format(tries, ret))
|
||||||
|
ret = self.validate_rmq_ssl_disabled_units(sentry_units)
|
||||||
|
tries += 1
|
||||||
|
|
||||||
|
if ret:
|
||||||
|
amulet.raise_status(amulet.FAIL, ret)
|
||||||
|
|
||||||
|
def connect_amqp_by_unit(self, sentry_unit, ssl=False,
|
||||||
|
port=None, fatal=True,
|
||||||
|
username="testuser1", password="changeme"):
|
||||||
|
"""Establish and return a pika amqp connection to the rabbitmq service
|
||||||
|
running on a rmq juju unit.
|
||||||
|
|
||||||
|
:param sentry_unit: sentry unit pointer
|
||||||
|
:param ssl: boolean, default to False
|
||||||
|
:param port: amqp port, use defaults if None
|
||||||
|
:param fatal: boolean, default to True (raises on connect error)
|
||||||
|
:param username: amqp user name, default to testuser1
|
||||||
|
:param password: amqp user password
|
||||||
|
:returns: pika amqp connection pointer or None if failed and non-fatal
|
||||||
|
"""
|
||||||
|
host = sentry_unit.info['public-address']
|
||||||
|
unit_name = sentry_unit.info['unit_name']
|
||||||
|
|
||||||
|
# Default port logic if port is not specified
|
||||||
|
if ssl and not port:
|
||||||
|
port = 5671
|
||||||
|
elif not ssl and not port:
|
||||||
|
port = 5672
|
||||||
|
|
||||||
|
self.log.debug('Connecting to amqp on {}:{} ({}) as '
|
||||||
|
'{}...'.format(host, port, unit_name, username))
|
||||||
|
|
||||||
|
try:
|
||||||
|
credentials = pika.PlainCredentials(username, password)
|
||||||
|
parameters = pika.ConnectionParameters(host=host, port=port,
|
||||||
|
credentials=credentials,
|
||||||
|
ssl=ssl,
|
||||||
|
connection_attempts=3,
|
||||||
|
retry_delay=5,
|
||||||
|
socket_timeout=1)
|
||||||
|
connection = pika.BlockingConnection(parameters)
|
||||||
|
assert connection.server_properties['product'] == 'RabbitMQ'
|
||||||
|
self.log.debug('Connect OK')
|
||||||
|
return connection
|
||||||
|
except Exception as e:
|
||||||
|
msg = ('amqp connection failed to {}:{} as '
|
||||||
|
'{} ({})'.format(host, port, username, str(e)))
|
||||||
|
if fatal:
|
||||||
|
amulet.raise_status(amulet.FAIL, msg)
|
||||||
|
else:
|
||||||
|
self.log.warn(msg)
|
||||||
|
return None
|
||||||
|
|
||||||
|
def publish_amqp_message_by_unit(self, sentry_unit, message,
|
||||||
|
queue="test", ssl=False,
|
||||||
|
username="testuser1",
|
||||||
|
password="changeme",
|
||||||
|
port=None):
|
||||||
|
"""Publish an amqp message to a rmq juju unit.
|
||||||
|
|
||||||
|
:param sentry_unit: sentry unit pointer
|
||||||
|
:param message: amqp message string
|
||||||
|
:param queue: message queue, default to test
|
||||||
|
:param username: amqp user name, default to testuser1
|
||||||
|
:param password: amqp user password
|
||||||
|
:param ssl: boolean, default to False
|
||||||
|
:param port: amqp port, use defaults if None
|
||||||
|
:returns: None. Raises exception if publish failed.
|
||||||
|
"""
|
||||||
|
self.log.debug('Publishing message to {} queue:\n{}'.format(queue,
|
||||||
|
message))
|
||||||
|
connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
|
||||||
|
port=port,
|
||||||
|
username=username,
|
||||||
|
password=password)
|
||||||
|
|
||||||
|
# NOTE(beisner): extra debug here re: pika hang potential:
|
||||||
|
# https://github.com/pika/pika/issues/297
|
||||||
|
# https://groups.google.com/forum/#!topic/rabbitmq-users/Ja0iyfF0Szw
|
||||||
|
self.log.debug('Defining channel...')
|
||||||
|
channel = connection.channel()
|
||||||
|
self.log.debug('Declaring queue...')
|
||||||
|
channel.queue_declare(queue=queue, auto_delete=False, durable=True)
|
||||||
|
self.log.debug('Publishing message...')
|
||||||
|
channel.basic_publish(exchange='', routing_key=queue, body=message)
|
||||||
|
self.log.debug('Closing channel...')
|
||||||
|
channel.close()
|
||||||
|
self.log.debug('Closing connection...')
|
||||||
|
connection.close()
|
||||||
|
|
||||||
|
def get_amqp_message_by_unit(self, sentry_unit, queue="test",
|
||||||
|
username="testuser1",
|
||||||
|
password="changeme",
|
||||||
|
ssl=False, port=None):
|
||||||
|
"""Get an amqp message from a rmq juju unit.
|
||||||
|
|
||||||
|
:param sentry_unit: sentry unit pointer
|
||||||
|
:param queue: message queue, default to test
|
||||||
|
:param username: amqp user name, default to testuser1
|
||||||
|
:param password: amqp user password
|
||||||
|
:param ssl: boolean, default to False
|
||||||
|
:param port: amqp port, use defaults if None
|
||||||
|
:returns: amqp message body as string. Raise if get fails.
|
||||||
|
"""
|
||||||
|
connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
|
||||||
|
port=port,
|
||||||
|
username=username,
|
||||||
|
password=password)
|
||||||
|
channel = connection.channel()
|
||||||
|
method_frame, _, body = channel.basic_get(queue)
|
||||||
|
|
||||||
|
if method_frame:
|
||||||
|
self.log.debug('Retreived message from {} queue:\n{}'.format(queue,
|
||||||
|
body))
|
||||||
|
channel.basic_ack(method_frame.delivery_tag)
|
||||||
|
channel.close()
|
||||||
|
connection.close()
|
||||||
|
return body
|
||||||
|
else:
|
||||||
|
msg = 'No message retrieved.'
|
||||||
|
amulet.raise_status(amulet.FAIL, msg)
|
||||||
|
|
Loading…
Reference in New Issue