[hopem,r=]
Sync charmhelpers to get fix for bug 1499643
This commit is contained in:
		@@ -23,7 +23,7 @@ import socket
 | 
			
		||||
from functools import partial
 | 
			
		||||
 | 
			
		||||
from charmhelpers.core.hookenv import unit_get
 | 
			
		||||
from charmhelpers.fetch import apt_install
 | 
			
		||||
from charmhelpers.fetch import apt_install, apt_update
 | 
			
		||||
from charmhelpers.core.hookenv import (
 | 
			
		||||
    log,
 | 
			
		||||
    WARNING,
 | 
			
		||||
@@ -32,13 +32,15 @@ from charmhelpers.core.hookenv import (
 | 
			
		||||
try:
 | 
			
		||||
    import netifaces
 | 
			
		||||
except ImportError:
 | 
			
		||||
    apt_install('python-netifaces')
 | 
			
		||||
    apt_update(fatal=True)
 | 
			
		||||
    apt_install('python-netifaces', fatal=True)
 | 
			
		||||
    import netifaces
 | 
			
		||||
 | 
			
		||||
try:
 | 
			
		||||
    import netaddr
 | 
			
		||||
except ImportError:
 | 
			
		||||
    apt_install('python-netaddr')
 | 
			
		||||
    apt_update(fatal=True)
 | 
			
		||||
    apt_install('python-netaddr', fatal=True)
 | 
			
		||||
    import netaddr
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -44,20 +44,31 @@ class OpenStackAmuletDeployment(AmuletDeployment):
 | 
			
		||||
           Determine if the local branch being tested is derived from its
 | 
			
		||||
           stable or next (dev) branch, and based on this, use the corresonding
 | 
			
		||||
           stable or next branches for the other_services."""
 | 
			
		||||
 | 
			
		||||
        # Charms outside the lp:~openstack-charmers namespace
 | 
			
		||||
        base_charms = ['mysql', 'mongodb', 'nrpe']
 | 
			
		||||
 | 
			
		||||
        # Force these charms to current series even when using an older series.
 | 
			
		||||
        # ie. Use trusty/nrpe even when series is precise, as the P charm
 | 
			
		||||
        # does not possess the necessary external master config and hooks.
 | 
			
		||||
        force_series_current = ['nrpe']
 | 
			
		||||
 | 
			
		||||
        if self.series in ['precise', 'trusty']:
 | 
			
		||||
            base_series = self.series
 | 
			
		||||
        else:
 | 
			
		||||
            base_series = self.current_next
 | 
			
		||||
 | 
			
		||||
        if self.stable:
 | 
			
		||||
            for svc in other_services:
 | 
			
		||||
        for svc in other_services:
 | 
			
		||||
            if svc['name'] in force_series_current:
 | 
			
		||||
                base_series = self.current_next
 | 
			
		||||
            # If a location has been explicitly set, use it
 | 
			
		||||
            if svc.get('location'):
 | 
			
		||||
                continue
 | 
			
		||||
            if self.stable:
 | 
			
		||||
                temp = 'lp:charms/{}/{}'
 | 
			
		||||
                svc['location'] = temp.format(base_series,
 | 
			
		||||
                                              svc['name'])
 | 
			
		||||
        else:
 | 
			
		||||
            for svc in other_services:
 | 
			
		||||
            else:
 | 
			
		||||
                if svc['name'] in base_charms:
 | 
			
		||||
                    temp = 'lp:charms/{}/{}'
 | 
			
		||||
                    svc['location'] = temp.format(base_series,
 | 
			
		||||
@@ -66,6 +77,7 @@ class OpenStackAmuletDeployment(AmuletDeployment):
 | 
			
		||||
                    temp = 'lp:~openstack-charmers/charms/{}/{}/next'
 | 
			
		||||
                    svc['location'] = temp.format(self.current_next,
 | 
			
		||||
                                                  svc['name'])
 | 
			
		||||
 | 
			
		||||
        return other_services
 | 
			
		||||
 | 
			
		||||
    def _add_services(self, this_service, other_services):
 | 
			
		||||
@@ -77,21 +89,23 @@ class OpenStackAmuletDeployment(AmuletDeployment):
 | 
			
		||||
 | 
			
		||||
        services = other_services
 | 
			
		||||
        services.append(this_service)
 | 
			
		||||
 | 
			
		||||
        # Charms which should use the source config option
 | 
			
		||||
        use_source = ['mysql', 'mongodb', 'rabbitmq-server', 'ceph',
 | 
			
		||||
                      'ceph-osd', 'ceph-radosgw']
 | 
			
		||||
        # Most OpenStack subordinate charms do not expose an origin option
 | 
			
		||||
        # as that is controlled by the principle.
 | 
			
		||||
        ignore = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe']
 | 
			
		||||
 | 
			
		||||
        # Charms which can not use openstack-origin, ie. many subordinates
 | 
			
		||||
        no_origin = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe']
 | 
			
		||||
 | 
			
		||||
        if self.openstack:
 | 
			
		||||
            for svc in services:
 | 
			
		||||
                if svc['name'] not in use_source + ignore:
 | 
			
		||||
                if svc['name'] not in use_source + no_origin:
 | 
			
		||||
                    config = {'openstack-origin': self.openstack}
 | 
			
		||||
                    self.d.configure(svc['name'], config)
 | 
			
		||||
 | 
			
		||||
        if self.source:
 | 
			
		||||
            for svc in services:
 | 
			
		||||
                if svc['name'] in use_source and svc['name'] not in ignore:
 | 
			
		||||
                if svc['name'] in use_source and svc['name'] not in no_origin:
 | 
			
		||||
                    config = {'source': self.source}
 | 
			
		||||
                    self.d.configure(svc['name'], config)
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -27,6 +27,7 @@ import glanceclient.v1.client as glance_client
 | 
			
		||||
import heatclient.v1.client as heat_client
 | 
			
		||||
import keystoneclient.v2_0 as keystone_client
 | 
			
		||||
import novaclient.v1_1.client as nova_client
 | 
			
		||||
import pika
 | 
			
		||||
import swiftclient
 | 
			
		||||
 | 
			
		||||
from charmhelpers.contrib.amulet.utils import (
 | 
			
		||||
@@ -602,3 +603,361 @@ class OpenStackAmuletUtils(AmuletUtils):
 | 
			
		||||
            self.log.debug('Ceph {} samples (OK): '
 | 
			
		||||
                           '{}'.format(sample_type, samples))
 | 
			
		||||
            return None
 | 
			
		||||
 | 
			
		||||
# rabbitmq/amqp specific helpers:
 | 
			
		||||
    def add_rmq_test_user(self, sentry_units,
 | 
			
		||||
                          username="testuser1", password="changeme"):
 | 
			
		||||
        """Add a test user via the first rmq juju unit, check connection as
 | 
			
		||||
        the new user against all sentry units.
 | 
			
		||||
 | 
			
		||||
        :param sentry_units: list of sentry unit pointers
 | 
			
		||||
        :param username: amqp user name, default to testuser1
 | 
			
		||||
        :param password: amqp user password
 | 
			
		||||
        :returns: None if successful.  Raise on error.
 | 
			
		||||
        """
 | 
			
		||||
        self.log.debug('Adding rmq user ({})...'.format(username))
 | 
			
		||||
 | 
			
		||||
        # Check that user does not already exist
 | 
			
		||||
        cmd_user_list = 'rabbitmqctl list_users'
 | 
			
		||||
        output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
 | 
			
		||||
        if username in output:
 | 
			
		||||
            self.log.warning('User ({}) already exists, returning '
 | 
			
		||||
                             'gracefully.'.format(username))
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        perms = '".*" ".*" ".*"'
 | 
			
		||||
        cmds = ['rabbitmqctl add_user {} {}'.format(username, password),
 | 
			
		||||
                'rabbitmqctl set_permissions {} {}'.format(username, perms)]
 | 
			
		||||
 | 
			
		||||
        # Add user via first unit
 | 
			
		||||
        for cmd in cmds:
 | 
			
		||||
            output, _ = self.run_cmd_unit(sentry_units[0], cmd)
 | 
			
		||||
 | 
			
		||||
        # Check connection against the other sentry_units
 | 
			
		||||
        self.log.debug('Checking user connect against units...')
 | 
			
		||||
        for sentry_unit in sentry_units:
 | 
			
		||||
            connection = self.connect_amqp_by_unit(sentry_unit, ssl=False,
 | 
			
		||||
                                                   username=username,
 | 
			
		||||
                                                   password=password)
 | 
			
		||||
            connection.close()
 | 
			
		||||
 | 
			
		||||
    def delete_rmq_test_user(self, sentry_units, username="testuser1"):
 | 
			
		||||
        """Delete a rabbitmq user via the first rmq juju unit.
 | 
			
		||||
 | 
			
		||||
        :param sentry_units: list of sentry unit pointers
 | 
			
		||||
        :param username: amqp user name, default to testuser1
 | 
			
		||||
        :param password: amqp user password
 | 
			
		||||
        :returns: None if successful or no such user.
 | 
			
		||||
        """
 | 
			
		||||
        self.log.debug('Deleting rmq user ({})...'.format(username))
 | 
			
		||||
 | 
			
		||||
        # Check that the user exists
 | 
			
		||||
        cmd_user_list = 'rabbitmqctl list_users'
 | 
			
		||||
        output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
 | 
			
		||||
 | 
			
		||||
        if username not in output:
 | 
			
		||||
            self.log.warning('User ({}) does not exist, returning '
 | 
			
		||||
                             'gracefully.'.format(username))
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        # Delete the user
 | 
			
		||||
        cmd_user_del = 'rabbitmqctl delete_user {}'.format(username)
 | 
			
		||||
        output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_del)
 | 
			
		||||
 | 
			
		||||
    def get_rmq_cluster_status(self, sentry_unit):
 | 
			
		||||
        """Execute rabbitmq cluster status command on a unit and return
 | 
			
		||||
        the full output.
 | 
			
		||||
 | 
			
		||||
        :param unit: sentry unit
 | 
			
		||||
        :returns: String containing console output of cluster status command
 | 
			
		||||
        """
 | 
			
		||||
        cmd = 'rabbitmqctl cluster_status'
 | 
			
		||||
        output, _ = self.run_cmd_unit(sentry_unit, cmd)
 | 
			
		||||
        self.log.debug('{} cluster_status:\n{}'.format(
 | 
			
		||||
            sentry_unit.info['unit_name'], output))
 | 
			
		||||
        return str(output)
 | 
			
		||||
 | 
			
		||||
    def get_rmq_cluster_running_nodes(self, sentry_unit):
 | 
			
		||||
        """Parse rabbitmqctl cluster_status output string, return list of
 | 
			
		||||
        running rabbitmq cluster nodes.
 | 
			
		||||
 | 
			
		||||
        :param unit: sentry unit
 | 
			
		||||
        :returns: List containing node names of running nodes
 | 
			
		||||
        """
 | 
			
		||||
        # NOTE(beisner): rabbitmqctl cluster_status output is not
 | 
			
		||||
        # json-parsable, do string chop foo, then json.loads that.
 | 
			
		||||
        str_stat = self.get_rmq_cluster_status(sentry_unit)
 | 
			
		||||
        if 'running_nodes' in str_stat:
 | 
			
		||||
            pos_start = str_stat.find("{running_nodes,") + 15
 | 
			
		||||
            pos_end = str_stat.find("]},", pos_start) + 1
 | 
			
		||||
            str_run_nodes = str_stat[pos_start:pos_end].replace("'", '"')
 | 
			
		||||
            run_nodes = json.loads(str_run_nodes)
 | 
			
		||||
            return run_nodes
 | 
			
		||||
        else:
 | 
			
		||||
            return []
 | 
			
		||||
 | 
			
		||||
    def validate_rmq_cluster_running_nodes(self, sentry_units):
 | 
			
		||||
        """Check that all rmq unit hostnames are represented in the
 | 
			
		||||
        cluster_status output of all units.
 | 
			
		||||
 | 
			
		||||
        :param host_names: dict of juju unit names to host names
 | 
			
		||||
        :param units: list of sentry unit pointers (all rmq units)
 | 
			
		||||
        :returns: None if successful, otherwise return error message
 | 
			
		||||
        """
 | 
			
		||||
        host_names = self.get_unit_hostnames(sentry_units)
 | 
			
		||||
        errors = []
 | 
			
		||||
 | 
			
		||||
        # Query every unit for cluster_status running nodes
 | 
			
		||||
        for query_unit in sentry_units:
 | 
			
		||||
            query_unit_name = query_unit.info['unit_name']
 | 
			
		||||
            running_nodes = self.get_rmq_cluster_running_nodes(query_unit)
 | 
			
		||||
 | 
			
		||||
            # Confirm that every unit is represented in the queried unit's
 | 
			
		||||
            # cluster_status running nodes output.
 | 
			
		||||
            for validate_unit in sentry_units:
 | 
			
		||||
                val_host_name = host_names[validate_unit.info['unit_name']]
 | 
			
		||||
                val_node_name = 'rabbit@{}'.format(val_host_name)
 | 
			
		||||
 | 
			
		||||
                if val_node_name not in running_nodes:
 | 
			
		||||
                    errors.append('Cluster member check failed on {}: {} not '
 | 
			
		||||
                                  'in {}\n'.format(query_unit_name,
 | 
			
		||||
                                                   val_node_name,
 | 
			
		||||
                                                   running_nodes))
 | 
			
		||||
        if errors:
 | 
			
		||||
            return ''.join(errors)
 | 
			
		||||
 | 
			
		||||
    def rmq_ssl_is_enabled_on_unit(self, sentry_unit, port=None):
 | 
			
		||||
        """Check a single juju rmq unit for ssl and port in the config file."""
 | 
			
		||||
        host = sentry_unit.info['public-address']
 | 
			
		||||
        unit_name = sentry_unit.info['unit_name']
 | 
			
		||||
 | 
			
		||||
        conf_file = '/etc/rabbitmq/rabbitmq.config'
 | 
			
		||||
        conf_contents = str(self.file_contents_safe(sentry_unit,
 | 
			
		||||
                                                    conf_file, max_wait=16))
 | 
			
		||||
        # Checks
 | 
			
		||||
        conf_ssl = 'ssl' in conf_contents
 | 
			
		||||
        conf_port = str(port) in conf_contents
 | 
			
		||||
 | 
			
		||||
        # Port explicitly checked in config
 | 
			
		||||
        if port and conf_port and conf_ssl:
 | 
			
		||||
            self.log.debug('SSL is enabled  @{}:{} '
 | 
			
		||||
                           '({})'.format(host, port, unit_name))
 | 
			
		||||
            return True
 | 
			
		||||
        elif port and not conf_port and conf_ssl:
 | 
			
		||||
            self.log.debug('SSL is enabled @{} but not on port {} '
 | 
			
		||||
                           '({})'.format(host, port, unit_name))
 | 
			
		||||
            return False
 | 
			
		||||
        # Port not checked (useful when checking that ssl is disabled)
 | 
			
		||||
        elif not port and conf_ssl:
 | 
			
		||||
            self.log.debug('SSL is enabled  @{}:{} '
 | 
			
		||||
                           '({})'.format(host, port, unit_name))
 | 
			
		||||
            return True
 | 
			
		||||
        elif not port and not conf_ssl:
 | 
			
		||||
            self.log.debug('SSL not enabled @{}:{} '
 | 
			
		||||
                           '({})'.format(host, port, unit_name))
 | 
			
		||||
            return False
 | 
			
		||||
        else:
 | 
			
		||||
            msg = ('Unknown condition when checking SSL status @{}:{} '
 | 
			
		||||
                   '({})'.format(host, port, unit_name))
 | 
			
		||||
            amulet.raise_status(amulet.FAIL, msg)
 | 
			
		||||
 | 
			
		||||
    def validate_rmq_ssl_enabled_units(self, sentry_units, port=None):
 | 
			
		||||
        """Check that ssl is enabled on rmq juju sentry units.
 | 
			
		||||
 | 
			
		||||
        :param sentry_units: list of all rmq sentry units
 | 
			
		||||
        :param port: optional ssl port override to validate
 | 
			
		||||
        :returns: None if successful, otherwise return error message
 | 
			
		||||
        """
 | 
			
		||||
        for sentry_unit in sentry_units:
 | 
			
		||||
            if not self.rmq_ssl_is_enabled_on_unit(sentry_unit, port=port):
 | 
			
		||||
                return ('Unexpected condition:  ssl is disabled on unit '
 | 
			
		||||
                        '({})'.format(sentry_unit.info['unit_name']))
 | 
			
		||||
        return None
 | 
			
		||||
 | 
			
		||||
    def validate_rmq_ssl_disabled_units(self, sentry_units):
 | 
			
		||||
        """Check that ssl is enabled on listed rmq juju sentry units.
 | 
			
		||||
 | 
			
		||||
        :param sentry_units: list of all rmq sentry units
 | 
			
		||||
        :returns: True if successful.  Raise on error.
 | 
			
		||||
        """
 | 
			
		||||
        for sentry_unit in sentry_units:
 | 
			
		||||
            if self.rmq_ssl_is_enabled_on_unit(sentry_unit):
 | 
			
		||||
                return ('Unexpected condition:  ssl is enabled on unit '
 | 
			
		||||
                        '({})'.format(sentry_unit.info['unit_name']))
 | 
			
		||||
        return None
 | 
			
		||||
 | 
			
		||||
    def configure_rmq_ssl_on(self, sentry_units, deployment,
 | 
			
		||||
                             port=None, max_wait=60):
 | 
			
		||||
        """Turn ssl charm config option on, with optional non-default
 | 
			
		||||
        ssl port specification.  Confirm that it is enabled on every
 | 
			
		||||
        unit.
 | 
			
		||||
 | 
			
		||||
        :param sentry_units: list of sentry units
 | 
			
		||||
        :param deployment: amulet deployment object pointer
 | 
			
		||||
        :param port: amqp port, use defaults if None
 | 
			
		||||
        :param max_wait: maximum time to wait in seconds to confirm
 | 
			
		||||
        :returns: None if successful.  Raise on error.
 | 
			
		||||
        """
 | 
			
		||||
        self.log.debug('Setting ssl charm config option:  on')
 | 
			
		||||
 | 
			
		||||
        # Enable RMQ SSL
 | 
			
		||||
        config = {'ssl': 'on'}
 | 
			
		||||
        if port:
 | 
			
		||||
            config['ssl_port'] = port
 | 
			
		||||
 | 
			
		||||
        deployment.configure('rabbitmq-server', config)
 | 
			
		||||
 | 
			
		||||
        # Confirm
 | 
			
		||||
        tries = 0
 | 
			
		||||
        ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
 | 
			
		||||
        while ret and tries < (max_wait / 4):
 | 
			
		||||
            time.sleep(4)
 | 
			
		||||
            self.log.debug('Attempt {}: {}'.format(tries, ret))
 | 
			
		||||
            ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
 | 
			
		||||
            tries += 1
 | 
			
		||||
 | 
			
		||||
        if ret:
 | 
			
		||||
            amulet.raise_status(amulet.FAIL, ret)
 | 
			
		||||
 | 
			
		||||
    def configure_rmq_ssl_off(self, sentry_units, deployment, max_wait=60):
 | 
			
		||||
        """Turn ssl charm config option off, confirm that it is disabled
 | 
			
		||||
        on every unit.
 | 
			
		||||
 | 
			
		||||
        :param sentry_units: list of sentry units
 | 
			
		||||
        :param deployment: amulet deployment object pointer
 | 
			
		||||
        :param max_wait: maximum time to wait in seconds to confirm
 | 
			
		||||
        :returns: None if successful.  Raise on error.
 | 
			
		||||
        """
 | 
			
		||||
        self.log.debug('Setting ssl charm config option:  off')
 | 
			
		||||
 | 
			
		||||
        # Disable RMQ SSL
 | 
			
		||||
        config = {'ssl': 'off'}
 | 
			
		||||
        deployment.configure('rabbitmq-server', config)
 | 
			
		||||
 | 
			
		||||
        # Confirm
 | 
			
		||||
        tries = 0
 | 
			
		||||
        ret = self.validate_rmq_ssl_disabled_units(sentry_units)
 | 
			
		||||
        while ret and tries < (max_wait / 4):
 | 
			
		||||
            time.sleep(4)
 | 
			
		||||
            self.log.debug('Attempt {}: {}'.format(tries, ret))
 | 
			
		||||
            ret = self.validate_rmq_ssl_disabled_units(sentry_units)
 | 
			
		||||
            tries += 1
 | 
			
		||||
 | 
			
		||||
        if ret:
 | 
			
		||||
            amulet.raise_status(amulet.FAIL, ret)
 | 
			
		||||
 | 
			
		||||
    def connect_amqp_by_unit(self, sentry_unit, ssl=False,
 | 
			
		||||
                             port=None, fatal=True,
 | 
			
		||||
                             username="testuser1", password="changeme"):
 | 
			
		||||
        """Establish and return a pika amqp connection to the rabbitmq service
 | 
			
		||||
        running on a rmq juju unit.
 | 
			
		||||
 | 
			
		||||
        :param sentry_unit: sentry unit pointer
 | 
			
		||||
        :param ssl: boolean, default to False
 | 
			
		||||
        :param port: amqp port, use defaults if None
 | 
			
		||||
        :param fatal: boolean, default to True (raises on connect error)
 | 
			
		||||
        :param username: amqp user name, default to testuser1
 | 
			
		||||
        :param password: amqp user password
 | 
			
		||||
        :returns: pika amqp connection pointer or None if failed and non-fatal
 | 
			
		||||
        """
 | 
			
		||||
        host = sentry_unit.info['public-address']
 | 
			
		||||
        unit_name = sentry_unit.info['unit_name']
 | 
			
		||||
 | 
			
		||||
        # Default port logic if port is not specified
 | 
			
		||||
        if ssl and not port:
 | 
			
		||||
            port = 5671
 | 
			
		||||
        elif not ssl and not port:
 | 
			
		||||
            port = 5672
 | 
			
		||||
 | 
			
		||||
        self.log.debug('Connecting to amqp on {}:{} ({}) as '
 | 
			
		||||
                       '{}...'.format(host, port, unit_name, username))
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            credentials = pika.PlainCredentials(username, password)
 | 
			
		||||
            parameters = pika.ConnectionParameters(host=host, port=port,
 | 
			
		||||
                                                   credentials=credentials,
 | 
			
		||||
                                                   ssl=ssl,
 | 
			
		||||
                                                   connection_attempts=3,
 | 
			
		||||
                                                   retry_delay=5,
 | 
			
		||||
                                                   socket_timeout=1)
 | 
			
		||||
            connection = pika.BlockingConnection(parameters)
 | 
			
		||||
            assert connection.server_properties['product'] == 'RabbitMQ'
 | 
			
		||||
            self.log.debug('Connect OK')
 | 
			
		||||
            return connection
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            msg = ('amqp connection failed to {}:{} as '
 | 
			
		||||
                   '{} ({})'.format(host, port, username, str(e)))
 | 
			
		||||
            if fatal:
 | 
			
		||||
                amulet.raise_status(amulet.FAIL, msg)
 | 
			
		||||
            else:
 | 
			
		||||
                self.log.warn(msg)
 | 
			
		||||
                return None
 | 
			
		||||
 | 
			
		||||
    def publish_amqp_message_by_unit(self, sentry_unit, message,
 | 
			
		||||
                                     queue="test", ssl=False,
 | 
			
		||||
                                     username="testuser1",
 | 
			
		||||
                                     password="changeme",
 | 
			
		||||
                                     port=None):
 | 
			
		||||
        """Publish an amqp message to a rmq juju unit.
 | 
			
		||||
 | 
			
		||||
        :param sentry_unit: sentry unit pointer
 | 
			
		||||
        :param message: amqp message string
 | 
			
		||||
        :param queue: message queue, default to test
 | 
			
		||||
        :param username: amqp user name, default to testuser1
 | 
			
		||||
        :param password: amqp user password
 | 
			
		||||
        :param ssl: boolean, default to False
 | 
			
		||||
        :param port: amqp port, use defaults if None
 | 
			
		||||
        :returns: None.  Raises exception if publish failed.
 | 
			
		||||
        """
 | 
			
		||||
        self.log.debug('Publishing message to {} queue:\n{}'.format(queue,
 | 
			
		||||
                                                                    message))
 | 
			
		||||
        connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
 | 
			
		||||
                                               port=port,
 | 
			
		||||
                                               username=username,
 | 
			
		||||
                                               password=password)
 | 
			
		||||
 | 
			
		||||
        # NOTE(beisner): extra debug here re: pika hang potential:
 | 
			
		||||
        #   https://github.com/pika/pika/issues/297
 | 
			
		||||
        #   https://groups.google.com/forum/#!topic/rabbitmq-users/Ja0iyfF0Szw
 | 
			
		||||
        self.log.debug('Defining channel...')
 | 
			
		||||
        channel = connection.channel()
 | 
			
		||||
        self.log.debug('Declaring queue...')
 | 
			
		||||
        channel.queue_declare(queue=queue, auto_delete=False, durable=True)
 | 
			
		||||
        self.log.debug('Publishing message...')
 | 
			
		||||
        channel.basic_publish(exchange='', routing_key=queue, body=message)
 | 
			
		||||
        self.log.debug('Closing channel...')
 | 
			
		||||
        channel.close()
 | 
			
		||||
        self.log.debug('Closing connection...')
 | 
			
		||||
        connection.close()
 | 
			
		||||
 | 
			
		||||
    def get_amqp_message_by_unit(self, sentry_unit, queue="test",
 | 
			
		||||
                                 username="testuser1",
 | 
			
		||||
                                 password="changeme",
 | 
			
		||||
                                 ssl=False, port=None):
 | 
			
		||||
        """Get an amqp message from a rmq juju unit.
 | 
			
		||||
 | 
			
		||||
        :param sentry_unit: sentry unit pointer
 | 
			
		||||
        :param queue: message queue, default to test
 | 
			
		||||
        :param username: amqp user name, default to testuser1
 | 
			
		||||
        :param password: amqp user password
 | 
			
		||||
        :param ssl: boolean, default to False
 | 
			
		||||
        :param port: amqp port, use defaults if None
 | 
			
		||||
        :returns: amqp message body as string.  Raise if get fails.
 | 
			
		||||
        """
 | 
			
		||||
        connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
 | 
			
		||||
                                               port=port,
 | 
			
		||||
                                               username=username,
 | 
			
		||||
                                               password=password)
 | 
			
		||||
        channel = connection.channel()
 | 
			
		||||
        method_frame, _, body = channel.basic_get(queue)
 | 
			
		||||
 | 
			
		||||
        if method_frame:
 | 
			
		||||
            self.log.debug('Retreived message from {} queue:\n{}'.format(queue,
 | 
			
		||||
                                                                         body))
 | 
			
		||||
            channel.basic_ack(method_frame.delivery_tag)
 | 
			
		||||
            channel.close()
 | 
			
		||||
            connection.close()
 | 
			
		||||
            return body
 | 
			
		||||
        else:
 | 
			
		||||
            msg = 'No message retrieved.'
 | 
			
		||||
            amulet.raise_status(amulet.FAIL, msg)
 | 
			
		||||
 
 | 
			
		||||
@@ -194,10 +194,50 @@ def config_flags_parser(config_flags):
 | 
			
		||||
class OSContextGenerator(object):
 | 
			
		||||
    """Base class for all context generators."""
 | 
			
		||||
    interfaces = []
 | 
			
		||||
    related = False
 | 
			
		||||
    complete = False
 | 
			
		||||
    missing_data = []
 | 
			
		||||
 | 
			
		||||
    def __call__(self):
 | 
			
		||||
        raise NotImplementedError
 | 
			
		||||
 | 
			
		||||
    def context_complete(self, ctxt):
 | 
			
		||||
        """Check for missing data for the required context data.
 | 
			
		||||
        Set self.missing_data if it exists and return False.
 | 
			
		||||
        Set self.complete if no missing data and return True.
 | 
			
		||||
        """
 | 
			
		||||
        # Fresh start
 | 
			
		||||
        self.complete = False
 | 
			
		||||
        self.missing_data = []
 | 
			
		||||
        for k, v in six.iteritems(ctxt):
 | 
			
		||||
            if v is None or v == '':
 | 
			
		||||
                if k not in self.missing_data:
 | 
			
		||||
                    self.missing_data.append(k)
 | 
			
		||||
 | 
			
		||||
        if self.missing_data:
 | 
			
		||||
            self.complete = False
 | 
			
		||||
            log('Missing required data: %s' % ' '.join(self.missing_data), level=INFO)
 | 
			
		||||
        else:
 | 
			
		||||
            self.complete = True
 | 
			
		||||
        return self.complete
 | 
			
		||||
 | 
			
		||||
    def get_related(self):
 | 
			
		||||
        """Check if any of the context interfaces have relation ids.
 | 
			
		||||
        Set self.related and return True if one of the interfaces
 | 
			
		||||
        has relation ids.
 | 
			
		||||
        """
 | 
			
		||||
        # Fresh start
 | 
			
		||||
        self.related = False
 | 
			
		||||
        try:
 | 
			
		||||
            for interface in self.interfaces:
 | 
			
		||||
                if relation_ids(interface):
 | 
			
		||||
                    self.related = True
 | 
			
		||||
            return self.related
 | 
			
		||||
        except AttributeError as e:
 | 
			
		||||
            log("{} {}"
 | 
			
		||||
                "".format(self, e), 'INFO')
 | 
			
		||||
            return self.related
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class SharedDBContext(OSContextGenerator):
 | 
			
		||||
    interfaces = ['shared-db']
 | 
			
		||||
@@ -213,6 +253,7 @@ class SharedDBContext(OSContextGenerator):
 | 
			
		||||
        self.database = database
 | 
			
		||||
        self.user = user
 | 
			
		||||
        self.ssl_dir = ssl_dir
 | 
			
		||||
        self.rel_name = self.interfaces[0]
 | 
			
		||||
 | 
			
		||||
    def __call__(self):
 | 
			
		||||
        self.database = self.database or config('database')
 | 
			
		||||
@@ -246,6 +287,7 @@ class SharedDBContext(OSContextGenerator):
 | 
			
		||||
            password_setting = self.relation_prefix + '_password'
 | 
			
		||||
 | 
			
		||||
        for rid in relation_ids(self.interfaces[0]):
 | 
			
		||||
            self.related = True
 | 
			
		||||
            for unit in related_units(rid):
 | 
			
		||||
                rdata = relation_get(rid=rid, unit=unit)
 | 
			
		||||
                host = rdata.get('db_host')
 | 
			
		||||
@@ -257,7 +299,7 @@ class SharedDBContext(OSContextGenerator):
 | 
			
		||||
                    'database_password': rdata.get(password_setting),
 | 
			
		||||
                    'database_type': 'mysql'
 | 
			
		||||
                }
 | 
			
		||||
                if context_complete(ctxt):
 | 
			
		||||
                if self.context_complete(ctxt):
 | 
			
		||||
                    db_ssl(rdata, ctxt, self.ssl_dir)
 | 
			
		||||
                    return ctxt
 | 
			
		||||
        return {}
 | 
			
		||||
@@ -278,6 +320,7 @@ class PostgresqlDBContext(OSContextGenerator):
 | 
			
		||||
 | 
			
		||||
        ctxt = {}
 | 
			
		||||
        for rid in relation_ids(self.interfaces[0]):
 | 
			
		||||
            self.related = True
 | 
			
		||||
            for unit in related_units(rid):
 | 
			
		||||
                rel_host = relation_get('host', rid=rid, unit=unit)
 | 
			
		||||
                rel_user = relation_get('user', rid=rid, unit=unit)
 | 
			
		||||
@@ -287,7 +330,7 @@ class PostgresqlDBContext(OSContextGenerator):
 | 
			
		||||
                        'database_user': rel_user,
 | 
			
		||||
                        'database_password': rel_passwd,
 | 
			
		||||
                        'database_type': 'postgresql'}
 | 
			
		||||
                if context_complete(ctxt):
 | 
			
		||||
                if self.context_complete(ctxt):
 | 
			
		||||
                    return ctxt
 | 
			
		||||
 | 
			
		||||
        return {}
 | 
			
		||||
@@ -348,6 +391,7 @@ class IdentityServiceContext(OSContextGenerator):
 | 
			
		||||
            ctxt['signing_dir'] = cachedir
 | 
			
		||||
 | 
			
		||||
        for rid in relation_ids(self.rel_name):
 | 
			
		||||
            self.related = True
 | 
			
		||||
            for unit in related_units(rid):
 | 
			
		||||
                rdata = relation_get(rid=rid, unit=unit)
 | 
			
		||||
                serv_host = rdata.get('service_host')
 | 
			
		||||
@@ -366,7 +410,7 @@ class IdentityServiceContext(OSContextGenerator):
 | 
			
		||||
                             'service_protocol': svc_protocol,
 | 
			
		||||
                             'auth_protocol': auth_protocol})
 | 
			
		||||
 | 
			
		||||
                if context_complete(ctxt):
 | 
			
		||||
                if self.context_complete(ctxt):
 | 
			
		||||
                    # NOTE(jamespage) this is required for >= icehouse
 | 
			
		||||
                    # so a missing value just indicates keystone needs
 | 
			
		||||
                    # upgrading
 | 
			
		||||
@@ -405,6 +449,7 @@ class AMQPContext(OSContextGenerator):
 | 
			
		||||
        ctxt = {}
 | 
			
		||||
        for rid in relation_ids(self.rel_name):
 | 
			
		||||
            ha_vip_only = False
 | 
			
		||||
            self.related = True
 | 
			
		||||
            for unit in related_units(rid):
 | 
			
		||||
                if relation_get('clustered', rid=rid, unit=unit):
 | 
			
		||||
                    ctxt['clustered'] = True
 | 
			
		||||
@@ -437,7 +482,7 @@ class AMQPContext(OSContextGenerator):
 | 
			
		||||
                ha_vip_only = relation_get('ha-vip-only',
 | 
			
		||||
                                           rid=rid, unit=unit) is not None
 | 
			
		||||
 | 
			
		||||
                if context_complete(ctxt):
 | 
			
		||||
                if self.context_complete(ctxt):
 | 
			
		||||
                    if 'rabbit_ssl_ca' in ctxt:
 | 
			
		||||
                        if not self.ssl_dir:
 | 
			
		||||
                            log("Charm not setup for ssl support but ssl ca "
 | 
			
		||||
@@ -469,7 +514,7 @@ class AMQPContext(OSContextGenerator):
 | 
			
		||||
            ctxt['oslo_messaging_flags'] = config_flags_parser(
 | 
			
		||||
                oslo_messaging_flags)
 | 
			
		||||
 | 
			
		||||
        if not context_complete(ctxt):
 | 
			
		||||
        if not self.complete:
 | 
			
		||||
            return {}
 | 
			
		||||
 | 
			
		||||
        return ctxt
 | 
			
		||||
@@ -485,13 +530,15 @@ class CephContext(OSContextGenerator):
 | 
			
		||||
 | 
			
		||||
        log('Generating template context for ceph', level=DEBUG)
 | 
			
		||||
        mon_hosts = []
 | 
			
		||||
        auth = None
 | 
			
		||||
        key = None
 | 
			
		||||
        use_syslog = str(config('use-syslog')).lower()
 | 
			
		||||
        ctxt = {
 | 
			
		||||
            'use_syslog': str(config('use-syslog')).lower()
 | 
			
		||||
        }
 | 
			
		||||
        for rid in relation_ids('ceph'):
 | 
			
		||||
            for unit in related_units(rid):
 | 
			
		||||
                auth = relation_get('auth', rid=rid, unit=unit)
 | 
			
		||||
                key = relation_get('key', rid=rid, unit=unit)
 | 
			
		||||
                if not ctxt.get('auth'):
 | 
			
		||||
                    ctxt['auth'] = relation_get('auth', rid=rid, unit=unit)
 | 
			
		||||
                if not ctxt.get('key'):
 | 
			
		||||
                    ctxt['key'] = relation_get('key', rid=rid, unit=unit)
 | 
			
		||||
                ceph_pub_addr = relation_get('ceph-public-address', rid=rid,
 | 
			
		||||
                                             unit=unit)
 | 
			
		||||
                unit_priv_addr = relation_get('private-address', rid=rid,
 | 
			
		||||
@@ -500,15 +547,12 @@ class CephContext(OSContextGenerator):
 | 
			
		||||
                ceph_addr = format_ipv6_addr(ceph_addr) or ceph_addr
 | 
			
		||||
                mon_hosts.append(ceph_addr)
 | 
			
		||||
 | 
			
		||||
        ctxt = {'mon_hosts': ' '.join(sorted(mon_hosts)),
 | 
			
		||||
                'auth': auth,
 | 
			
		||||
                'key': key,
 | 
			
		||||
                'use_syslog': use_syslog}
 | 
			
		||||
        ctxt['mon_hosts'] = ' '.join(sorted(mon_hosts))
 | 
			
		||||
 | 
			
		||||
        if not os.path.isdir('/etc/ceph'):
 | 
			
		||||
            os.mkdir('/etc/ceph')
 | 
			
		||||
 | 
			
		||||
        if not context_complete(ctxt):
 | 
			
		||||
        if not self.context_complete(ctxt):
 | 
			
		||||
            return {}
 | 
			
		||||
 | 
			
		||||
        ensure_packages(['ceph-common'])
 | 
			
		||||
@@ -1367,6 +1411,6 @@ class NetworkServiceContext(OSContextGenerator):
 | 
			
		||||
                    'auth_protocol':
 | 
			
		||||
                    rdata.get('auth_protocol') or 'http',
 | 
			
		||||
                }
 | 
			
		||||
                if context_complete(ctxt):
 | 
			
		||||
                if self.context_complete(ctxt):
 | 
			
		||||
                    return ctxt
 | 
			
		||||
        return {}
 | 
			
		||||
 
 | 
			
		||||
@@ -18,7 +18,7 @@ import os
 | 
			
		||||
 | 
			
		||||
import six
 | 
			
		||||
 | 
			
		||||
from charmhelpers.fetch import apt_install
 | 
			
		||||
from charmhelpers.fetch import apt_install, apt_update
 | 
			
		||||
from charmhelpers.core.hookenv import (
 | 
			
		||||
    log,
 | 
			
		||||
    ERROR,
 | 
			
		||||
@@ -29,6 +29,7 @@ from charmhelpers.contrib.openstack.utils import OPENSTACK_CODENAMES
 | 
			
		||||
try:
 | 
			
		||||
    from jinja2 import FileSystemLoader, ChoiceLoader, Environment, exceptions
 | 
			
		||||
except ImportError:
 | 
			
		||||
    apt_update(fatal=True)
 | 
			
		||||
    apt_install('python-jinja2', fatal=True)
 | 
			
		||||
    from jinja2 import FileSystemLoader, ChoiceLoader, Environment, exceptions
 | 
			
		||||
 | 
			
		||||
@@ -112,7 +113,7 @@ class OSConfigTemplate(object):
 | 
			
		||||
 | 
			
		||||
    def complete_contexts(self):
 | 
			
		||||
        '''
 | 
			
		||||
        Return a list of interfaces that have atisfied contexts.
 | 
			
		||||
        Return a list of interfaces that have satisfied contexts.
 | 
			
		||||
        '''
 | 
			
		||||
        if self._complete_contexts:
 | 
			
		||||
            return self._complete_contexts
 | 
			
		||||
@@ -293,3 +294,30 @@ class OSConfigRenderer(object):
 | 
			
		||||
        [interfaces.extend(i.complete_contexts())
 | 
			
		||||
         for i in six.itervalues(self.templates)]
 | 
			
		||||
        return interfaces
 | 
			
		||||
 | 
			
		||||
    def get_incomplete_context_data(self, interfaces):
 | 
			
		||||
        '''
 | 
			
		||||
        Return dictionary of relation status of interfaces and any missing
 | 
			
		||||
        required context data. Example:
 | 
			
		||||
            {'amqp': {'missing_data': ['rabbitmq_password'], 'related': True},
 | 
			
		||||
             'zeromq-configuration': {'related': False}}
 | 
			
		||||
        '''
 | 
			
		||||
        incomplete_context_data = {}
 | 
			
		||||
 | 
			
		||||
        for i in six.itervalues(self.templates):
 | 
			
		||||
            for context in i.contexts:
 | 
			
		||||
                for interface in interfaces:
 | 
			
		||||
                    related = False
 | 
			
		||||
                    if interface in context.interfaces:
 | 
			
		||||
                        related = context.get_related()
 | 
			
		||||
                        missing_data = context.missing_data
 | 
			
		||||
                        if missing_data:
 | 
			
		||||
                            incomplete_context_data[interface] = {'missing_data': missing_data}
 | 
			
		||||
                        if related:
 | 
			
		||||
                            if incomplete_context_data.get(interface):
 | 
			
		||||
                                incomplete_context_data[interface].update({'related': True})
 | 
			
		||||
                            else:
 | 
			
		||||
                                incomplete_context_data[interface] = {'related': True}
 | 
			
		||||
                        else:
 | 
			
		||||
                            incomplete_context_data[interface] = {'related': False}
 | 
			
		||||
        return incomplete_context_data
 | 
			
		||||
 
 | 
			
		||||
@@ -42,7 +42,9 @@ from charmhelpers.core.hookenv import (
 | 
			
		||||
    charm_dir,
 | 
			
		||||
    INFO,
 | 
			
		||||
    relation_ids,
 | 
			
		||||
    relation_set
 | 
			
		||||
    relation_set,
 | 
			
		||||
    status_set,
 | 
			
		||||
    hook_name
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
from charmhelpers.contrib.storage.linux.lvm import (
 | 
			
		||||
@@ -52,7 +54,8 @@ from charmhelpers.contrib.storage.linux.lvm import (
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
from charmhelpers.contrib.network.ip import (
 | 
			
		||||
    get_ipv6_addr
 | 
			
		||||
    get_ipv6_addr,
 | 
			
		||||
    is_ipv6,
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
from charmhelpers.contrib.python.packages import (
 | 
			
		||||
@@ -517,6 +520,12 @@ def sync_db_with_multi_ipv6_addresses(database, database_user,
 | 
			
		||||
                                      relation_prefix=None):
 | 
			
		||||
    hosts = get_ipv6_addr(dynamic_only=False)
 | 
			
		||||
 | 
			
		||||
    if config('vip'):
 | 
			
		||||
        vips = config('vip').split()
 | 
			
		||||
        for vip in vips:
 | 
			
		||||
            if vip and is_ipv6(vip):
 | 
			
		||||
                hosts.append(vip)
 | 
			
		||||
 | 
			
		||||
    kwargs = {'database': database,
 | 
			
		||||
              'username': database_user,
 | 
			
		||||
              'hostname': json.dumps(hosts)}
 | 
			
		||||
@@ -754,6 +763,176 @@ def git_yaml_value(projects_yaml, key):
 | 
			
		||||
    return None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def os_workload_status(configs, required_interfaces, charm_func=None):
 | 
			
		||||
    """
 | 
			
		||||
    Decorator to set workload status based on complete contexts
 | 
			
		||||
    """
 | 
			
		||||
    def wrap(f):
 | 
			
		||||
        @wraps(f)
 | 
			
		||||
        def wrapped_f(*args, **kwargs):
 | 
			
		||||
            # Run the original function first
 | 
			
		||||
            f(*args, **kwargs)
 | 
			
		||||
            # Set workload status now that contexts have been
 | 
			
		||||
            # acted on
 | 
			
		||||
            set_os_workload_status(configs, required_interfaces, charm_func)
 | 
			
		||||
        return wrapped_f
 | 
			
		||||
    return wrap
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def set_os_workload_status(configs, required_interfaces, charm_func=None):
 | 
			
		||||
    """
 | 
			
		||||
    Set workload status based on complete contexts.
 | 
			
		||||
    status-set missing or incomplete contexts
 | 
			
		||||
    and juju-log details of missing required data.
 | 
			
		||||
    charm_func is a charm specific function to run checking
 | 
			
		||||
    for charm specific requirements such as a VIP setting.
 | 
			
		||||
    """
 | 
			
		||||
    incomplete_rel_data = incomplete_relation_data(configs, required_interfaces)
 | 
			
		||||
    state = 'active'
 | 
			
		||||
    missing_relations = []
 | 
			
		||||
    incomplete_relations = []
 | 
			
		||||
    message = None
 | 
			
		||||
    charm_state = None
 | 
			
		||||
    charm_message = None
 | 
			
		||||
 | 
			
		||||
    for generic_interface in incomplete_rel_data.keys():
 | 
			
		||||
        related_interface = None
 | 
			
		||||
        missing_data = {}
 | 
			
		||||
        # Related or not?
 | 
			
		||||
        for interface in incomplete_rel_data[generic_interface]:
 | 
			
		||||
            if incomplete_rel_data[generic_interface][interface].get('related'):
 | 
			
		||||
                related_interface = interface
 | 
			
		||||
                missing_data = incomplete_rel_data[generic_interface][interface].get('missing_data')
 | 
			
		||||
        # No relation ID for the generic_interface
 | 
			
		||||
        if not related_interface:
 | 
			
		||||
            juju_log("{} relation is missing and must be related for "
 | 
			
		||||
                     "functionality. ".format(generic_interface), 'WARN')
 | 
			
		||||
            state = 'blocked'
 | 
			
		||||
            if generic_interface not in missing_relations:
 | 
			
		||||
                missing_relations.append(generic_interface)
 | 
			
		||||
        else:
 | 
			
		||||
            # Relation ID exists but no related unit
 | 
			
		||||
            if not missing_data:
 | 
			
		||||
                # Edge case relation ID exists but departing
 | 
			
		||||
                if ('departed' in hook_name() or 'broken' in hook_name()) \
 | 
			
		||||
                        and related_interface in hook_name():
 | 
			
		||||
                    state = 'blocked'
 | 
			
		||||
                    if generic_interface not in missing_relations:
 | 
			
		||||
                        missing_relations.append(generic_interface)
 | 
			
		||||
                    juju_log("{} relation's interface, {}, "
 | 
			
		||||
                             "relationship is departed or broken "
 | 
			
		||||
                             "and is required for functionality."
 | 
			
		||||
                             "".format(generic_interface, related_interface), "WARN")
 | 
			
		||||
                # Normal case relation ID exists but no related unit
 | 
			
		||||
                # (joining)
 | 
			
		||||
                else:
 | 
			
		||||
                    juju_log("{} relations's interface, {}, is related but has "
 | 
			
		||||
                             "no units in the relation."
 | 
			
		||||
                             "".format(generic_interface, related_interface), "INFO")
 | 
			
		||||
            # Related unit exists and data missing on the relation
 | 
			
		||||
            else:
 | 
			
		||||
                juju_log("{} relation's interface, {}, is related awaiting "
 | 
			
		||||
                         "the following data from the relationship: {}. "
 | 
			
		||||
                         "".format(generic_interface, related_interface,
 | 
			
		||||
                                   ", ".join(missing_data)), "INFO")
 | 
			
		||||
            if state != 'blocked':
 | 
			
		||||
                state = 'waiting'
 | 
			
		||||
            if generic_interface not in incomplete_relations \
 | 
			
		||||
                    and generic_interface not in missing_relations:
 | 
			
		||||
                incomplete_relations.append(generic_interface)
 | 
			
		||||
 | 
			
		||||
    if missing_relations:
 | 
			
		||||
        message = "Missing relations: {}".format(", ".join(missing_relations))
 | 
			
		||||
        if incomplete_relations:
 | 
			
		||||
            message += "; incomplete relations: {}" \
 | 
			
		||||
                       "".format(", ".join(incomplete_relations))
 | 
			
		||||
        state = 'blocked'
 | 
			
		||||
    elif incomplete_relations:
 | 
			
		||||
        message = "Incomplete relations: {}" \
 | 
			
		||||
                  "".format(", ".join(incomplete_relations))
 | 
			
		||||
        state = 'waiting'
 | 
			
		||||
 | 
			
		||||
    # Run charm specific checks
 | 
			
		||||
    if charm_func:
 | 
			
		||||
        charm_state, charm_message = charm_func(configs)
 | 
			
		||||
        if charm_state != 'active' and charm_state != 'unknown':
 | 
			
		||||
            state = workload_state_compare(state, charm_state)
 | 
			
		||||
            if message:
 | 
			
		||||
                message = "{} {}".format(message, charm_message)
 | 
			
		||||
            else:
 | 
			
		||||
                message = charm_message
 | 
			
		||||
 | 
			
		||||
    # Set to active if all requirements have been met
 | 
			
		||||
    if state == 'active':
 | 
			
		||||
        message = "Unit is ready"
 | 
			
		||||
        juju_log(message, "INFO")
 | 
			
		||||
 | 
			
		||||
    status_set(state, message)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def workload_state_compare(current_workload_state, workload_state):
 | 
			
		||||
    """ Return highest priority of two states"""
 | 
			
		||||
    hierarchy = {'unknown': -1,
 | 
			
		||||
                 'active': 0,
 | 
			
		||||
                 'maintenance': 1,
 | 
			
		||||
                 'waiting': 2,
 | 
			
		||||
                 'blocked': 3,
 | 
			
		||||
                 }
 | 
			
		||||
 | 
			
		||||
    if hierarchy.get(workload_state) is None:
 | 
			
		||||
        workload_state = 'unknown'
 | 
			
		||||
    if hierarchy.get(current_workload_state) is None:
 | 
			
		||||
        current_workload_state = 'unknown'
 | 
			
		||||
 | 
			
		||||
    # Set workload_state based on hierarchy of statuses
 | 
			
		||||
    if hierarchy.get(current_workload_state) > hierarchy.get(workload_state):
 | 
			
		||||
        return current_workload_state
 | 
			
		||||
    else:
 | 
			
		||||
        return workload_state
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def incomplete_relation_data(configs, required_interfaces):
 | 
			
		||||
    """
 | 
			
		||||
    Check complete contexts against required_interfaces
 | 
			
		||||
    Return dictionary of incomplete relation data.
 | 
			
		||||
 | 
			
		||||
    configs is an OSConfigRenderer object with configs registered
 | 
			
		||||
 | 
			
		||||
    required_interfaces is a dictionary of required general interfaces
 | 
			
		||||
    with dictionary values of possible specific interfaces.
 | 
			
		||||
    Example:
 | 
			
		||||
    required_interfaces = {'database': ['shared-db', 'pgsql-db']}
 | 
			
		||||
 | 
			
		||||
    The interface is said to be satisfied if anyone of the interfaces in the
 | 
			
		||||
    list has a complete context.
 | 
			
		||||
 | 
			
		||||
    Return dictionary of incomplete or missing required contexts with relation
 | 
			
		||||
    status of interfaces and any missing data points. Example:
 | 
			
		||||
        {'message':
 | 
			
		||||
             {'amqp': {'missing_data': ['rabbitmq_password'], 'related': True},
 | 
			
		||||
              'zeromq-configuration': {'related': False}},
 | 
			
		||||
         'identity':
 | 
			
		||||
             {'identity-service': {'related': False}},
 | 
			
		||||
         'database':
 | 
			
		||||
             {'pgsql-db': {'related': False},
 | 
			
		||||
              'shared-db': {'related': True}}}
 | 
			
		||||
    """
 | 
			
		||||
    complete_ctxts = configs.complete_contexts()
 | 
			
		||||
    incomplete_relations = []
 | 
			
		||||
    for svc_type in required_interfaces.keys():
 | 
			
		||||
        # Avoid duplicates
 | 
			
		||||
        found_ctxt = False
 | 
			
		||||
        for interface in required_interfaces[svc_type]:
 | 
			
		||||
            if interface in complete_ctxts:
 | 
			
		||||
                found_ctxt = True
 | 
			
		||||
        if not found_ctxt:
 | 
			
		||||
            incomplete_relations.append(svc_type)
 | 
			
		||||
    incomplete_context_data = {}
 | 
			
		||||
    for i in incomplete_relations:
 | 
			
		||||
        incomplete_context_data[i] = configs.get_incomplete_context_data(required_interfaces[i])
 | 
			
		||||
    return incomplete_context_data
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def do_action_openstack_upgrade(package, upgrade_callback, configs):
 | 
			
		||||
    """Perform action-managed OpenStack upgrade.
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -28,6 +28,7 @@ import os
 | 
			
		||||
import shutil
 | 
			
		||||
import json
 | 
			
		||||
import time
 | 
			
		||||
import uuid
 | 
			
		||||
 | 
			
		||||
from subprocess import (
 | 
			
		||||
    check_call,
 | 
			
		||||
@@ -35,8 +36,10 @@ from subprocess import (
 | 
			
		||||
    CalledProcessError,
 | 
			
		||||
)
 | 
			
		||||
from charmhelpers.core.hookenv import (
 | 
			
		||||
    local_unit,
 | 
			
		||||
    relation_get,
 | 
			
		||||
    relation_ids,
 | 
			
		||||
    relation_set,
 | 
			
		||||
    related_units,
 | 
			
		||||
    log,
 | 
			
		||||
    DEBUG,
 | 
			
		||||
@@ -402,17 +405,52 @@ class CephBrokerRq(object):
 | 
			
		||||
 | 
			
		||||
    The API is versioned and defaults to version 1.
 | 
			
		||||
    """
 | 
			
		||||
    def __init__(self, api_version=1):
 | 
			
		||||
    def __init__(self, api_version=1, request_id=None):
 | 
			
		||||
        self.api_version = api_version
 | 
			
		||||
        if request_id:
 | 
			
		||||
            self.request_id = request_id
 | 
			
		||||
        else:
 | 
			
		||||
            self.request_id = str(uuid.uuid1())
 | 
			
		||||
        self.ops = []
 | 
			
		||||
 | 
			
		||||
    def add_op_create_pool(self, name, replica_count=3):
 | 
			
		||||
        self.ops.append({'op': 'create-pool', 'name': name,
 | 
			
		||||
                         'replicas': replica_count})
 | 
			
		||||
 | 
			
		||||
    def set_ops(self, ops):
 | 
			
		||||
        """Set request ops to provided value.
 | 
			
		||||
 | 
			
		||||
        Useful for injecting ops that come from a previous request
 | 
			
		||||
        to allow comparisons to ensure validity.
 | 
			
		||||
        """
 | 
			
		||||
        self.ops = ops
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def request(self):
 | 
			
		||||
        return json.dumps({'api-version': self.api_version, 'ops': self.ops})
 | 
			
		||||
        return json.dumps({'api-version': self.api_version, 'ops': self.ops,
 | 
			
		||||
                           'request-id': self.request_id})
 | 
			
		||||
 | 
			
		||||
    def _ops_equal(self, other):
 | 
			
		||||
        if len(self.ops) == len(other.ops):
 | 
			
		||||
            for req_no in range(0, len(self.ops)):
 | 
			
		||||
                for key in ['replicas', 'name', 'op']:
 | 
			
		||||
                    if self.ops[req_no][key] != other.ops[req_no][key]:
 | 
			
		||||
                        return False
 | 
			
		||||
        else:
 | 
			
		||||
            return False
 | 
			
		||||
        return True
 | 
			
		||||
 | 
			
		||||
    def __eq__(self, other):
 | 
			
		||||
        if not isinstance(other, self.__class__):
 | 
			
		||||
            return False
 | 
			
		||||
        if self.api_version == other.api_version and \
 | 
			
		||||
                self._ops_equal(other):
 | 
			
		||||
            return True
 | 
			
		||||
        else:
 | 
			
		||||
            return False
 | 
			
		||||
 | 
			
		||||
    def __ne__(self, other):
 | 
			
		||||
        return not self.__eq__(other)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class CephBrokerRsp(object):
 | 
			
		||||
@@ -422,10 +460,15 @@ class CephBrokerRsp(object):
 | 
			
		||||
 | 
			
		||||
    The API is versioned and defaults to version 1.
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    def __init__(self, encoded_rsp):
 | 
			
		||||
        self.api_version = None
 | 
			
		||||
        self.rsp = json.loads(encoded_rsp)
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def request_id(self):
 | 
			
		||||
        return self.rsp.get('request-id')
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def exit_code(self):
 | 
			
		||||
        return self.rsp.get('exit-code')
 | 
			
		||||
@@ -433,3 +476,182 @@ class CephBrokerRsp(object):
 | 
			
		||||
    @property
 | 
			
		||||
    def exit_msg(self):
 | 
			
		||||
        return self.rsp.get('stderr')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# Ceph Broker Conversation:
 | 
			
		||||
# If a charm needs an action to be taken by ceph it can create a CephBrokerRq
 | 
			
		||||
# and send that request to ceph via the ceph relation. The CephBrokerRq has a
 | 
			
		||||
# unique id so that the client can identity which CephBrokerRsp is associated
 | 
			
		||||
# with the request. Ceph will also respond to each client unit individually
 | 
			
		||||
# creating a response key per client unit eg glance/0 will get a CephBrokerRsp
 | 
			
		||||
# via key broker-rsp-glance-0
 | 
			
		||||
#
 | 
			
		||||
# To use this the charm can just do something like:
 | 
			
		||||
#
 | 
			
		||||
# from charmhelpers.contrib.storage.linux.ceph import (
 | 
			
		||||
#     send_request_if_needed,
 | 
			
		||||
#     is_request_complete,
 | 
			
		||||
#     CephBrokerRq,
 | 
			
		||||
# )
 | 
			
		||||
#
 | 
			
		||||
# @hooks.hook('ceph-relation-changed')
 | 
			
		||||
# def ceph_changed():
 | 
			
		||||
#     rq = CephBrokerRq()
 | 
			
		||||
#     rq.add_op_create_pool(name='poolname', replica_count=3)
 | 
			
		||||
#
 | 
			
		||||
#     if is_request_complete(rq):
 | 
			
		||||
#         <Request complete actions>
 | 
			
		||||
#     else:
 | 
			
		||||
#         send_request_if_needed(get_ceph_request())
 | 
			
		||||
#
 | 
			
		||||
# CephBrokerRq and CephBrokerRsp are serialized into JSON. Below is an example
 | 
			
		||||
# of glance having sent a request to ceph which ceph has successfully processed
 | 
			
		||||
#  'ceph:8': {
 | 
			
		||||
#      'ceph/0': {
 | 
			
		||||
#          'auth': 'cephx',
 | 
			
		||||
#          'broker-rsp-glance-0': '{"request-id": "0bc7dc54", "exit-code": 0}',
 | 
			
		||||
#          'broker_rsp': '{"request-id": "0da543b8", "exit-code": 0}',
 | 
			
		||||
#          'ceph-public-address': '10.5.44.103',
 | 
			
		||||
#          'key': 'AQCLDttVuHXINhAAvI144CB09dYchhHyTUY9BQ==',
 | 
			
		||||
#          'private-address': '10.5.44.103',
 | 
			
		||||
#      },
 | 
			
		||||
#      'glance/0': {
 | 
			
		||||
#          'broker_req': ('{"api-version": 1, "request-id": "0bc7dc54", '
 | 
			
		||||
#                         '"ops": [{"replicas": 3, "name": "glance", '
 | 
			
		||||
#                         '"op": "create-pool"}]}'),
 | 
			
		||||
#          'private-address': '10.5.44.109',
 | 
			
		||||
#      },
 | 
			
		||||
#  }
 | 
			
		||||
 | 
			
		||||
def get_previous_request(rid):
 | 
			
		||||
    """Return the last ceph broker request sent on a given relation
 | 
			
		||||
 | 
			
		||||
    @param rid: Relation id to query for request
 | 
			
		||||
    """
 | 
			
		||||
    request = None
 | 
			
		||||
    broker_req = relation_get(attribute='broker_req', rid=rid,
 | 
			
		||||
                              unit=local_unit())
 | 
			
		||||
    if broker_req:
 | 
			
		||||
        request_data = json.loads(broker_req)
 | 
			
		||||
        request = CephBrokerRq(api_version=request_data['api-version'],
 | 
			
		||||
                               request_id=request_data['request-id'])
 | 
			
		||||
        request.set_ops(request_data['ops'])
 | 
			
		||||
 | 
			
		||||
    return request
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_request_states(request):
 | 
			
		||||
    """Return a dict of requests per relation id with their corresponding
 | 
			
		||||
       completion state.
 | 
			
		||||
 | 
			
		||||
    This allows a charm, which has a request for ceph, to see whether there is
 | 
			
		||||
    an equivalent request already being processed and if so what state that
 | 
			
		||||
    request is in.
 | 
			
		||||
 | 
			
		||||
    @param request: A CephBrokerRq object
 | 
			
		||||
    """
 | 
			
		||||
    complete = []
 | 
			
		||||
    requests = {}
 | 
			
		||||
    for rid in relation_ids('ceph'):
 | 
			
		||||
        complete = False
 | 
			
		||||
        previous_request = get_previous_request(rid)
 | 
			
		||||
        if request == previous_request:
 | 
			
		||||
            sent = True
 | 
			
		||||
            complete = is_request_complete_for_rid(previous_request, rid)
 | 
			
		||||
        else:
 | 
			
		||||
            sent = False
 | 
			
		||||
            complete = False
 | 
			
		||||
 | 
			
		||||
        requests[rid] = {
 | 
			
		||||
            'sent': sent,
 | 
			
		||||
            'complete': complete,
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
    return requests
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def is_request_sent(request):
 | 
			
		||||
    """Check to see if a functionally equivalent request has already been sent
 | 
			
		||||
 | 
			
		||||
    Returns True if a similair request has been sent
 | 
			
		||||
 | 
			
		||||
    @param request: A CephBrokerRq object
 | 
			
		||||
    """
 | 
			
		||||
    states = get_request_states(request)
 | 
			
		||||
    for rid in states.keys():
 | 
			
		||||
        if not states[rid]['sent']:
 | 
			
		||||
            return False
 | 
			
		||||
 | 
			
		||||
    return True
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def is_request_complete(request):
 | 
			
		||||
    """Check to see if a functionally equivalent request has already been
 | 
			
		||||
    completed
 | 
			
		||||
 | 
			
		||||
    Returns True if a similair request has been completed
 | 
			
		||||
 | 
			
		||||
    @param request: A CephBrokerRq object
 | 
			
		||||
    """
 | 
			
		||||
    states = get_request_states(request)
 | 
			
		||||
    for rid in states.keys():
 | 
			
		||||
        if not states[rid]['complete']:
 | 
			
		||||
            return False
 | 
			
		||||
 | 
			
		||||
    return True
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def is_request_complete_for_rid(request, rid):
 | 
			
		||||
    """Check if a given request has been completed on the given relation
 | 
			
		||||
 | 
			
		||||
    @param request: A CephBrokerRq object
 | 
			
		||||
    @param rid: Relation ID
 | 
			
		||||
    """
 | 
			
		||||
    broker_key = get_broker_rsp_key()
 | 
			
		||||
    for unit in related_units(rid):
 | 
			
		||||
        rdata = relation_get(rid=rid, unit=unit)
 | 
			
		||||
        if rdata.get(broker_key):
 | 
			
		||||
            rsp = CephBrokerRsp(rdata.get(broker_key))
 | 
			
		||||
            if rsp.request_id == request.request_id:
 | 
			
		||||
                if not rsp.exit_code:
 | 
			
		||||
                    return True
 | 
			
		||||
        else:
 | 
			
		||||
            # The remote unit sent no reply targeted at this unit so either the
 | 
			
		||||
            # remote ceph cluster does not support unit targeted replies or it
 | 
			
		||||
            # has not processed our request yet.
 | 
			
		||||
            if rdata.get('broker_rsp'):
 | 
			
		||||
                request_data = json.loads(rdata['broker_rsp'])
 | 
			
		||||
                if request_data.get('request-id'):
 | 
			
		||||
                    log('Ignoring legacy broker_rsp without unit key as remote '
 | 
			
		||||
                        'service supports unit specific replies', level=DEBUG)
 | 
			
		||||
                else:
 | 
			
		||||
                    log('Using legacy broker_rsp as remote service does not '
 | 
			
		||||
                        'supports unit specific replies', level=DEBUG)
 | 
			
		||||
                    rsp = CephBrokerRsp(rdata['broker_rsp'])
 | 
			
		||||
                    if not rsp.exit_code:
 | 
			
		||||
                        return True
 | 
			
		||||
 | 
			
		||||
    return False
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_broker_rsp_key():
 | 
			
		||||
    """Return broker response key for this unit
 | 
			
		||||
 | 
			
		||||
    This is the key that ceph is going to use to pass request status
 | 
			
		||||
    information back to this unit
 | 
			
		||||
    """
 | 
			
		||||
    return 'broker-rsp-' + local_unit().replace('/', '-')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def send_request_if_needed(request):
 | 
			
		||||
    """Send broker request if an equivalent request has not already been sent
 | 
			
		||||
 | 
			
		||||
    @param request: A CephBrokerRq object
 | 
			
		||||
    """
 | 
			
		||||
    if is_request_sent(request):
 | 
			
		||||
        log('Request already sent but not complete, not sending new request',
 | 
			
		||||
            level=DEBUG)
 | 
			
		||||
    else:
 | 
			
		||||
        for rid in relation_ids('ceph'):
 | 
			
		||||
            log('Sending request {}'.format(request.request_id), level=DEBUG)
 | 
			
		||||
            relation_set(relation_id=rid, broker_req=request.request)
 | 
			
		||||
 
 | 
			
		||||
@@ -623,6 +623,38 @@ def unit_private_ip():
 | 
			
		||||
    return unit_get('private-address')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@cached
 | 
			
		||||
def storage_get(attribute="", storage_id=""):
 | 
			
		||||
    """Get storage attributes"""
 | 
			
		||||
    _args = ['storage-get', '--format=json']
 | 
			
		||||
    if storage_id:
 | 
			
		||||
        _args.extend(('-s', storage_id))
 | 
			
		||||
    if attribute:
 | 
			
		||||
        _args.append(attribute)
 | 
			
		||||
    try:
 | 
			
		||||
        return json.loads(subprocess.check_output(_args).decode('UTF-8'))
 | 
			
		||||
    except ValueError:
 | 
			
		||||
        return None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@cached
 | 
			
		||||
def storage_list(storage_name=""):
 | 
			
		||||
    """List the storage IDs for the unit"""
 | 
			
		||||
    _args = ['storage-list', '--format=json']
 | 
			
		||||
    if storage_name:
 | 
			
		||||
        _args.append(storage_name)
 | 
			
		||||
    try:
 | 
			
		||||
        return json.loads(subprocess.check_output(_args).decode('UTF-8'))
 | 
			
		||||
    except ValueError:
 | 
			
		||||
        return None
 | 
			
		||||
    except OSError as e:
 | 
			
		||||
        import errno
 | 
			
		||||
        if e.errno == errno.ENOENT:
 | 
			
		||||
            # storage-list does not exist
 | 
			
		||||
            return []
 | 
			
		||||
        raise
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class UnregisteredHookError(Exception):
 | 
			
		||||
    """Raised when an undefined hook is called"""
 | 
			
		||||
    pass
 | 
			
		||||
 
 | 
			
		||||
@@ -25,11 +25,13 @@ from charmhelpers.core.host import (
 | 
			
		||||
    fstab_mount,
 | 
			
		||||
    mkdir,
 | 
			
		||||
)
 | 
			
		||||
from charmhelpers.core.strutils import bytes_from_string
 | 
			
		||||
from subprocess import check_output
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def hugepage_support(user, group='hugetlb', nr_hugepages=256,
 | 
			
		||||
                     max_map_count=65536, mnt_point='/run/hugepages/kvm',
 | 
			
		||||
                     pagesize='2MB', mount=True):
 | 
			
		||||
                     pagesize='2MB', mount=True, set_shmmax=False):
 | 
			
		||||
    """Enable hugepages on system.
 | 
			
		||||
 | 
			
		||||
    Args:
 | 
			
		||||
@@ -49,6 +51,11 @@ def hugepage_support(user, group='hugetlb', nr_hugepages=256,
 | 
			
		||||
        'vm.max_map_count': max_map_count,
 | 
			
		||||
        'vm.hugetlb_shm_group': gid,
 | 
			
		||||
    }
 | 
			
		||||
    if set_shmmax:
 | 
			
		||||
        shmmax_current = int(check_output(['sysctl', '-n', 'kernel.shmmax']))
 | 
			
		||||
        shmmax_minsize = bytes_from_string(pagesize) * nr_hugepages
 | 
			
		||||
        if shmmax_minsize > shmmax_current:
 | 
			
		||||
            sysctl_settings['kernel.shmmax'] = shmmax_minsize
 | 
			
		||||
    sysctl.create(yaml.dump(sysctl_settings), '/etc/sysctl.d/10-hugepage.conf')
 | 
			
		||||
    mkdir(mnt_point, owner='root', group='root', perms=0o755, force=False)
 | 
			
		||||
    lfstab = fstab.Fstab()
 | 
			
		||||
 
 | 
			
		||||
@@ -18,6 +18,7 @@
 | 
			
		||||
# along with charm-helpers.  If not, see <http://www.gnu.org/licenses/>.
 | 
			
		||||
 | 
			
		||||
import six
 | 
			
		||||
import re
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def bool_from_string(value):
 | 
			
		||||
@@ -40,3 +41,32 @@ def bool_from_string(value):
 | 
			
		||||
 | 
			
		||||
    msg = "Unable to interpret string value '%s' as boolean" % (value)
 | 
			
		||||
    raise ValueError(msg)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def bytes_from_string(value):
 | 
			
		||||
    """Interpret human readable string value as bytes.
 | 
			
		||||
 | 
			
		||||
    Returns int
 | 
			
		||||
    """
 | 
			
		||||
    BYTE_POWER = {
 | 
			
		||||
        'K': 1,
 | 
			
		||||
        'KB': 1,
 | 
			
		||||
        'M': 2,
 | 
			
		||||
        'MB': 2,
 | 
			
		||||
        'G': 3,
 | 
			
		||||
        'GB': 3,
 | 
			
		||||
        'T': 4,
 | 
			
		||||
        'TB': 4,
 | 
			
		||||
        'P': 5,
 | 
			
		||||
        'PB': 5,
 | 
			
		||||
    }
 | 
			
		||||
    if isinstance(value, six.string_types):
 | 
			
		||||
        value = six.text_type(value)
 | 
			
		||||
    else:
 | 
			
		||||
        msg = "Unable to interpret non-string value '%s' as boolean" % (value)
 | 
			
		||||
        raise ValueError(msg)
 | 
			
		||||
    matches = re.match("([0-9]+)([a-zA-Z]+)", value)
 | 
			
		||||
    if not matches:
 | 
			
		||||
        msg = "Unable to interpret string value '%s' as bytes" % (value)
 | 
			
		||||
        raise ValueError(msg)
 | 
			
		||||
    return int(matches.group(1)) * (1024 ** BYTE_POWER[matches.group(2)])
 | 
			
		||||
 
 | 
			
		||||
@@ -51,7 +51,8 @@ class AmuletDeployment(object):
 | 
			
		||||
        if 'units' not in this_service:
 | 
			
		||||
            this_service['units'] = 1
 | 
			
		||||
 | 
			
		||||
        self.d.add(this_service['name'], units=this_service['units'])
 | 
			
		||||
        self.d.add(this_service['name'], units=this_service['units'],
 | 
			
		||||
                   constraints=this_service.get('constraints'))
 | 
			
		||||
 | 
			
		||||
        for svc in other_services:
 | 
			
		||||
            if 'location' in svc:
 | 
			
		||||
@@ -64,7 +65,8 @@ class AmuletDeployment(object):
 | 
			
		||||
            if 'units' not in svc:
 | 
			
		||||
                svc['units'] = 1
 | 
			
		||||
 | 
			
		||||
            self.d.add(svc['name'], charm=branch_location, units=svc['units'])
 | 
			
		||||
            self.d.add(svc['name'], charm=branch_location, units=svc['units'],
 | 
			
		||||
                       constraints=svc.get('constraints'))
 | 
			
		||||
 | 
			
		||||
    def _add_relations(self, relations):
 | 
			
		||||
        """Add all of the relations for the services."""
 | 
			
		||||
 
 | 
			
		||||
@@ -19,9 +19,11 @@ import json
 | 
			
		||||
import logging
 | 
			
		||||
import os
 | 
			
		||||
import re
 | 
			
		||||
import socket
 | 
			
		||||
import subprocess
 | 
			
		||||
import sys
 | 
			
		||||
import time
 | 
			
		||||
import uuid
 | 
			
		||||
 | 
			
		||||
import amulet
 | 
			
		||||
import distro_info
 | 
			
		||||
@@ -114,7 +116,7 @@ class AmuletUtils(object):
 | 
			
		||||
        # /!\ DEPRECATION WARNING (beisner):
 | 
			
		||||
        # New and existing tests should be rewritten to use
 | 
			
		||||
        # validate_services_by_name() as it is aware of init systems.
 | 
			
		||||
        self.log.warn('/!\\ DEPRECATION WARNING:  use '
 | 
			
		||||
        self.log.warn('DEPRECATION WARNING:  use '
 | 
			
		||||
                      'validate_services_by_name instead of validate_services '
 | 
			
		||||
                      'due to init system differences.')
 | 
			
		||||
 | 
			
		||||
@@ -269,33 +271,52 @@ class AmuletUtils(object):
 | 
			
		||||
        """Get last modification time of directory."""
 | 
			
		||||
        return sentry_unit.directory_stat(directory)['mtime']
 | 
			
		||||
 | 
			
		||||
    def _get_proc_start_time(self, sentry_unit, service, pgrep_full=False):
 | 
			
		||||
        """Get process' start time.
 | 
			
		||||
    def _get_proc_start_time(self, sentry_unit, service, pgrep_full=None):
 | 
			
		||||
        """Get start time of a process based on the last modification time
 | 
			
		||||
           of the /proc/pid directory.
 | 
			
		||||
 | 
			
		||||
           Determine start time of the process based on the last modification
 | 
			
		||||
           time of the /proc/pid directory. If pgrep_full is True, the process
 | 
			
		||||
           name is matched against the full command line.
 | 
			
		||||
           """
 | 
			
		||||
        if pgrep_full:
 | 
			
		||||
            cmd = 'pgrep -o -f {}'.format(service)
 | 
			
		||||
        else:
 | 
			
		||||
            cmd = 'pgrep -o {}'.format(service)
 | 
			
		||||
        cmd = cmd + '  | grep  -v pgrep || exit 0'
 | 
			
		||||
        cmd_out = sentry_unit.run(cmd)
 | 
			
		||||
        self.log.debug('CMDout: ' + str(cmd_out))
 | 
			
		||||
        if cmd_out[0]:
 | 
			
		||||
            self.log.debug('Pid for %s %s' % (service, str(cmd_out[0])))
 | 
			
		||||
            proc_dir = '/proc/{}'.format(cmd_out[0].strip())
 | 
			
		||||
            return self._get_dir_mtime(sentry_unit, proc_dir)
 | 
			
		||||
        :sentry_unit:  The sentry unit to check for the service on
 | 
			
		||||
        :service:  service name to look for in process table
 | 
			
		||||
        :pgrep_full:  [Deprecated] Use full command line search mode with pgrep
 | 
			
		||||
        :returns:  epoch time of service process start
 | 
			
		||||
        :param commands:  list of bash commands
 | 
			
		||||
        :param sentry_units:  list of sentry unit pointers
 | 
			
		||||
        :returns:  None if successful; Failure message otherwise
 | 
			
		||||
        """
 | 
			
		||||
        if pgrep_full is not None:
 | 
			
		||||
            # /!\ DEPRECATION WARNING (beisner):
 | 
			
		||||
            # No longer implemented, as pidof is now used instead of pgrep.
 | 
			
		||||
            # https://bugs.launchpad.net/charm-helpers/+bug/1474030
 | 
			
		||||
            self.log.warn('DEPRECATION WARNING:  pgrep_full bool is no '
 | 
			
		||||
                          'longer implemented re: lp 1474030.')
 | 
			
		||||
 | 
			
		||||
        pid_list = self.get_process_id_list(sentry_unit, service)
 | 
			
		||||
        pid = pid_list[0]
 | 
			
		||||
        proc_dir = '/proc/{}'.format(pid)
 | 
			
		||||
        self.log.debug('Pid for {} on {}: {}'.format(
 | 
			
		||||
            service, sentry_unit.info['unit_name'], pid))
 | 
			
		||||
 | 
			
		||||
        return self._get_dir_mtime(sentry_unit, proc_dir)
 | 
			
		||||
 | 
			
		||||
    def service_restarted(self, sentry_unit, service, filename,
 | 
			
		||||
                          pgrep_full=False, sleep_time=20):
 | 
			
		||||
                          pgrep_full=None, sleep_time=20):
 | 
			
		||||
        """Check if service was restarted.
 | 
			
		||||
 | 
			
		||||
           Compare a service's start time vs a file's last modification time
 | 
			
		||||
           (such as a config file for that service) to determine if the service
 | 
			
		||||
           has been restarted.
 | 
			
		||||
           """
 | 
			
		||||
        # /!\ DEPRECATION WARNING (beisner):
 | 
			
		||||
        # This method is prone to races in that no before-time is known.
 | 
			
		||||
        # Use validate_service_config_changed instead.
 | 
			
		||||
 | 
			
		||||
        # NOTE(beisner) pgrep_full is no longer implemented, as pidof is now
 | 
			
		||||
        # used instead of pgrep.  pgrep_full is still passed through to ensure
 | 
			
		||||
        # deprecation WARNS.  lp1474030
 | 
			
		||||
        self.log.warn('DEPRECATION WARNING:  use '
 | 
			
		||||
                      'validate_service_config_changed instead of '
 | 
			
		||||
                      'service_restarted due to known races.')
 | 
			
		||||
 | 
			
		||||
        time.sleep(sleep_time)
 | 
			
		||||
        if (self._get_proc_start_time(sentry_unit, service, pgrep_full) >=
 | 
			
		||||
                self._get_file_mtime(sentry_unit, filename)):
 | 
			
		||||
@@ -304,78 +325,122 @@ class AmuletUtils(object):
 | 
			
		||||
            return False
 | 
			
		||||
 | 
			
		||||
    def service_restarted_since(self, sentry_unit, mtime, service,
 | 
			
		||||
                                pgrep_full=False, sleep_time=20,
 | 
			
		||||
                                retry_count=2):
 | 
			
		||||
                                pgrep_full=None, sleep_time=20,
 | 
			
		||||
                                retry_count=30, retry_sleep_time=10):
 | 
			
		||||
        """Check if service was been started after a given time.
 | 
			
		||||
 | 
			
		||||
        Args:
 | 
			
		||||
          sentry_unit (sentry): The sentry unit to check for the service on
 | 
			
		||||
          mtime (float): The epoch time to check against
 | 
			
		||||
          service (string): service name to look for in process table
 | 
			
		||||
          pgrep_full (boolean): Use full command line search mode with pgrep
 | 
			
		||||
          sleep_time (int): Seconds to sleep before looking for process
 | 
			
		||||
          retry_count (int): If service is not found, how many times to retry
 | 
			
		||||
          pgrep_full: [Deprecated] Use full command line search mode with pgrep
 | 
			
		||||
          sleep_time (int): Initial sleep time (s) before looking for file
 | 
			
		||||
          retry_sleep_time (int): Time (s) to sleep between retries
 | 
			
		||||
          retry_count (int): If file is not found, how many times to retry
 | 
			
		||||
 | 
			
		||||
        Returns:
 | 
			
		||||
          bool: True if service found and its start time it newer than mtime,
 | 
			
		||||
                False if service is older than mtime or if service was
 | 
			
		||||
                not found.
 | 
			
		||||
        """
 | 
			
		||||
        self.log.debug('Checking %s restarted since %s' % (service, mtime))
 | 
			
		||||
        # NOTE(beisner) pgrep_full is no longer implemented, as pidof is now
 | 
			
		||||
        # used instead of pgrep.  pgrep_full is still passed through to ensure
 | 
			
		||||
        # deprecation WARNS.  lp1474030
 | 
			
		||||
 | 
			
		||||
        unit_name = sentry_unit.info['unit_name']
 | 
			
		||||
        self.log.debug('Checking that %s service restarted since %s on '
 | 
			
		||||
                       '%s' % (service, mtime, unit_name))
 | 
			
		||||
        time.sleep(sleep_time)
 | 
			
		||||
        proc_start_time = self._get_proc_start_time(sentry_unit, service,
 | 
			
		||||
                                                    pgrep_full)
 | 
			
		||||
        while retry_count > 0 and not proc_start_time:
 | 
			
		||||
            self.log.debug('No pid file found for service %s, will retry %i '
 | 
			
		||||
                           'more times' % (service, retry_count))
 | 
			
		||||
            time.sleep(30)
 | 
			
		||||
            proc_start_time = self._get_proc_start_time(sentry_unit, service,
 | 
			
		||||
                                                        pgrep_full)
 | 
			
		||||
            retry_count = retry_count - 1
 | 
			
		||||
        proc_start_time = None
 | 
			
		||||
        tries = 0
 | 
			
		||||
        while tries <= retry_count and not proc_start_time:
 | 
			
		||||
            try:
 | 
			
		||||
                proc_start_time = self._get_proc_start_time(sentry_unit,
 | 
			
		||||
                                                            service,
 | 
			
		||||
                                                            pgrep_full)
 | 
			
		||||
                self.log.debug('Attempt {} to get {} proc start time on {} '
 | 
			
		||||
                               'OK'.format(tries, service, unit_name))
 | 
			
		||||
            except IOError as e:
 | 
			
		||||
                # NOTE(beisner) - race avoidance, proc may not exist yet.
 | 
			
		||||
                # https://bugs.launchpad.net/charm-helpers/+bug/1474030
 | 
			
		||||
                self.log.debug('Attempt {} to get {} proc start time on {} '
 | 
			
		||||
                               'failed\n{}'.format(tries, service,
 | 
			
		||||
                                                   unit_name, e))
 | 
			
		||||
                time.sleep(retry_sleep_time)
 | 
			
		||||
                tries += 1
 | 
			
		||||
 | 
			
		||||
        if not proc_start_time:
 | 
			
		||||
            self.log.warn('No proc start time found, assuming service did '
 | 
			
		||||
                          'not start')
 | 
			
		||||
            return False
 | 
			
		||||
        if proc_start_time >= mtime:
 | 
			
		||||
            self.log.debug('proc start time is newer than provided mtime'
 | 
			
		||||
                           '(%s >= %s)' % (proc_start_time, mtime))
 | 
			
		||||
            self.log.debug('Proc start time is newer than provided mtime'
 | 
			
		||||
                           '(%s >= %s) on %s (OK)' % (proc_start_time,
 | 
			
		||||
                                                      mtime, unit_name))
 | 
			
		||||
            return True
 | 
			
		||||
        else:
 | 
			
		||||
            self.log.warn('proc start time (%s) is older than provided mtime '
 | 
			
		||||
                          '(%s), service did not restart' % (proc_start_time,
 | 
			
		||||
                                                             mtime))
 | 
			
		||||
            self.log.warn('Proc start time (%s) is older than provided mtime '
 | 
			
		||||
                          '(%s) on %s, service did not '
 | 
			
		||||
                          'restart' % (proc_start_time, mtime, unit_name))
 | 
			
		||||
            return False
 | 
			
		||||
 | 
			
		||||
    def config_updated_since(self, sentry_unit, filename, mtime,
 | 
			
		||||
                             sleep_time=20):
 | 
			
		||||
                             sleep_time=20, retry_count=30,
 | 
			
		||||
                             retry_sleep_time=10):
 | 
			
		||||
        """Check if file was modified after a given time.
 | 
			
		||||
 | 
			
		||||
        Args:
 | 
			
		||||
          sentry_unit (sentry): The sentry unit to check the file mtime on
 | 
			
		||||
          filename (string): The file to check mtime of
 | 
			
		||||
          mtime (float): The epoch time to check against
 | 
			
		||||
          sleep_time (int): Seconds to sleep before looking for process
 | 
			
		||||
          sleep_time (int): Initial sleep time (s) before looking for file
 | 
			
		||||
          retry_sleep_time (int): Time (s) to sleep between retries
 | 
			
		||||
          retry_count (int): If file is not found, how many times to retry
 | 
			
		||||
 | 
			
		||||
        Returns:
 | 
			
		||||
          bool: True if file was modified more recently than mtime, False if
 | 
			
		||||
                file was modified before mtime,
 | 
			
		||||
                file was modified before mtime, or if file not found.
 | 
			
		||||
        """
 | 
			
		||||
        self.log.debug('Checking %s updated since %s' % (filename, mtime))
 | 
			
		||||
        unit_name = sentry_unit.info['unit_name']
 | 
			
		||||
        self.log.debug('Checking that %s updated since %s on '
 | 
			
		||||
                       '%s' % (filename, mtime, unit_name))
 | 
			
		||||
        time.sleep(sleep_time)
 | 
			
		||||
        file_mtime = self._get_file_mtime(sentry_unit, filename)
 | 
			
		||||
        file_mtime = None
 | 
			
		||||
        tries = 0
 | 
			
		||||
        while tries <= retry_count and not file_mtime:
 | 
			
		||||
            try:
 | 
			
		||||
                file_mtime = self._get_file_mtime(sentry_unit, filename)
 | 
			
		||||
                self.log.debug('Attempt {} to get {} file mtime on {} '
 | 
			
		||||
                               'OK'.format(tries, filename, unit_name))
 | 
			
		||||
            except IOError as e:
 | 
			
		||||
                # NOTE(beisner) - race avoidance, file may not exist yet.
 | 
			
		||||
                # https://bugs.launchpad.net/charm-helpers/+bug/1474030
 | 
			
		||||
                self.log.debug('Attempt {} to get {} file mtime on {} '
 | 
			
		||||
                               'failed\n{}'.format(tries, filename,
 | 
			
		||||
                                                   unit_name, e))
 | 
			
		||||
                time.sleep(retry_sleep_time)
 | 
			
		||||
                tries += 1
 | 
			
		||||
 | 
			
		||||
        if not file_mtime:
 | 
			
		||||
            self.log.warn('Could not determine file mtime, assuming '
 | 
			
		||||
                          'file does not exist')
 | 
			
		||||
            return False
 | 
			
		||||
 | 
			
		||||
        if file_mtime >= mtime:
 | 
			
		||||
            self.log.debug('File mtime is newer than provided mtime '
 | 
			
		||||
                           '(%s >= %s)' % (file_mtime, mtime))
 | 
			
		||||
                           '(%s >= %s) on %s (OK)' % (file_mtime,
 | 
			
		||||
                                                      mtime, unit_name))
 | 
			
		||||
            return True
 | 
			
		||||
        else:
 | 
			
		||||
            self.log.warn('File mtime %s is older than provided mtime %s'
 | 
			
		||||
                          % (file_mtime, mtime))
 | 
			
		||||
            self.log.warn('File mtime is older than provided mtime'
 | 
			
		||||
                          '(%s < on %s) on %s' % (file_mtime,
 | 
			
		||||
                                                  mtime, unit_name))
 | 
			
		||||
            return False
 | 
			
		||||
 | 
			
		||||
    def validate_service_config_changed(self, sentry_unit, mtime, service,
 | 
			
		||||
                                        filename, pgrep_full=False,
 | 
			
		||||
                                        sleep_time=20, retry_count=2):
 | 
			
		||||
                                        filename, pgrep_full=None,
 | 
			
		||||
                                        sleep_time=20, retry_count=30,
 | 
			
		||||
                                        retry_sleep_time=10):
 | 
			
		||||
        """Check service and file were updated after mtime
 | 
			
		||||
 | 
			
		||||
        Args:
 | 
			
		||||
@@ -383,9 +448,10 @@ class AmuletUtils(object):
 | 
			
		||||
          mtime (float): The epoch time to check against
 | 
			
		||||
          service (string): service name to look for in process table
 | 
			
		||||
          filename (string): The file to check mtime of
 | 
			
		||||
          pgrep_full (boolean): Use full command line search mode with pgrep
 | 
			
		||||
          sleep_time (int): Seconds to sleep before looking for process
 | 
			
		||||
          pgrep_full: [Deprecated] Use full command line search mode with pgrep
 | 
			
		||||
          sleep_time (int): Initial sleep in seconds to pass to test helpers
 | 
			
		||||
          retry_count (int): If service is not found, how many times to retry
 | 
			
		||||
          retry_sleep_time (int): Time in seconds to wait between retries
 | 
			
		||||
 | 
			
		||||
        Typical Usage:
 | 
			
		||||
            u = OpenStackAmuletUtils(ERROR)
 | 
			
		||||
@@ -402,15 +468,27 @@ class AmuletUtils(object):
 | 
			
		||||
                mtime, False if service is older than mtime or if service was
 | 
			
		||||
                not found or if filename was modified before mtime.
 | 
			
		||||
        """
 | 
			
		||||
        self.log.debug('Checking %s restarted since %s' % (service, mtime))
 | 
			
		||||
        time.sleep(sleep_time)
 | 
			
		||||
        service_restart = self.service_restarted_since(sentry_unit, mtime,
 | 
			
		||||
                                                       service,
 | 
			
		||||
                                                       pgrep_full=pgrep_full,
 | 
			
		||||
                                                       sleep_time=0,
 | 
			
		||||
                                                       retry_count=retry_count)
 | 
			
		||||
        config_update = self.config_updated_since(sentry_unit, filename, mtime,
 | 
			
		||||
                                                  sleep_time=0)
 | 
			
		||||
 | 
			
		||||
        # NOTE(beisner) pgrep_full is no longer implemented, as pidof is now
 | 
			
		||||
        # used instead of pgrep.  pgrep_full is still passed through to ensure
 | 
			
		||||
        # deprecation WARNS.  lp1474030
 | 
			
		||||
 | 
			
		||||
        service_restart = self.service_restarted_since(
 | 
			
		||||
            sentry_unit, mtime,
 | 
			
		||||
            service,
 | 
			
		||||
            pgrep_full=pgrep_full,
 | 
			
		||||
            sleep_time=sleep_time,
 | 
			
		||||
            retry_count=retry_count,
 | 
			
		||||
            retry_sleep_time=retry_sleep_time)
 | 
			
		||||
 | 
			
		||||
        config_update = self.config_updated_since(
 | 
			
		||||
            sentry_unit,
 | 
			
		||||
            filename,
 | 
			
		||||
            mtime,
 | 
			
		||||
            sleep_time=sleep_time,
 | 
			
		||||
            retry_count=retry_count,
 | 
			
		||||
            retry_sleep_time=retry_sleep_time)
 | 
			
		||||
 | 
			
		||||
        return service_restart and config_update
 | 
			
		||||
 | 
			
		||||
    def get_sentry_time(self, sentry_unit):
 | 
			
		||||
@@ -428,7 +506,6 @@ class AmuletUtils(object):
 | 
			
		||||
        """Return a list of all Ubuntu releases in order of release."""
 | 
			
		||||
        _d = distro_info.UbuntuDistroInfo()
 | 
			
		||||
        _release_list = _d.all
 | 
			
		||||
        self.log.debug('Ubuntu release list: {}'.format(_release_list))
 | 
			
		||||
        return _release_list
 | 
			
		||||
 | 
			
		||||
    def file_to_url(self, file_rel_path):
 | 
			
		||||
@@ -568,6 +645,142 @@ class AmuletUtils(object):
 | 
			
		||||
 | 
			
		||||
        return None
 | 
			
		||||
 | 
			
		||||
    def validate_sectionless_conf(self, file_contents, expected):
 | 
			
		||||
        """A crude conf parser.  Useful to inspect configuration files which
 | 
			
		||||
        do not have section headers (as would be necessary in order to use
 | 
			
		||||
        the configparser).  Such as openstack-dashboard or rabbitmq confs."""
 | 
			
		||||
        for line in file_contents.split('\n'):
 | 
			
		||||
            if '=' in line:
 | 
			
		||||
                args = line.split('=')
 | 
			
		||||
                if len(args) <= 1:
 | 
			
		||||
                    continue
 | 
			
		||||
                key = args[0].strip()
 | 
			
		||||
                value = args[1].strip()
 | 
			
		||||
                if key in expected.keys():
 | 
			
		||||
                    if expected[key] != value:
 | 
			
		||||
                        msg = ('Config mismatch.  Expected, actual:  {}, '
 | 
			
		||||
                               '{}'.format(expected[key], value))
 | 
			
		||||
                        amulet.raise_status(amulet.FAIL, msg=msg)
 | 
			
		||||
 | 
			
		||||
    def get_unit_hostnames(self, units):
 | 
			
		||||
        """Return a dict of juju unit names to hostnames."""
 | 
			
		||||
        host_names = {}
 | 
			
		||||
        for unit in units:
 | 
			
		||||
            host_names[unit.info['unit_name']] = \
 | 
			
		||||
                str(unit.file_contents('/etc/hostname').strip())
 | 
			
		||||
        self.log.debug('Unit host names: {}'.format(host_names))
 | 
			
		||||
        return host_names
 | 
			
		||||
 | 
			
		||||
    def run_cmd_unit(self, sentry_unit, cmd):
 | 
			
		||||
        """Run a command on a unit, return the output and exit code."""
 | 
			
		||||
        output, code = sentry_unit.run(cmd)
 | 
			
		||||
        if code == 0:
 | 
			
		||||
            self.log.debug('{} `{}` command returned {} '
 | 
			
		||||
                           '(OK)'.format(sentry_unit.info['unit_name'],
 | 
			
		||||
                                         cmd, code))
 | 
			
		||||
        else:
 | 
			
		||||
            msg = ('{} `{}` command returned {} '
 | 
			
		||||
                   '{}'.format(sentry_unit.info['unit_name'],
 | 
			
		||||
                               cmd, code, output))
 | 
			
		||||
            amulet.raise_status(amulet.FAIL, msg=msg)
 | 
			
		||||
        return str(output), code
 | 
			
		||||
 | 
			
		||||
    def file_exists_on_unit(self, sentry_unit, file_name):
 | 
			
		||||
        """Check if a file exists on a unit."""
 | 
			
		||||
        try:
 | 
			
		||||
            sentry_unit.file_stat(file_name)
 | 
			
		||||
            return True
 | 
			
		||||
        except IOError:
 | 
			
		||||
            return False
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            msg = 'Error checking file {}: {}'.format(file_name, e)
 | 
			
		||||
            amulet.raise_status(amulet.FAIL, msg=msg)
 | 
			
		||||
 | 
			
		||||
    def file_contents_safe(self, sentry_unit, file_name,
 | 
			
		||||
                           max_wait=60, fatal=False):
 | 
			
		||||
        """Get file contents from a sentry unit.  Wrap amulet file_contents
 | 
			
		||||
        with retry logic to address races where a file checks as existing,
 | 
			
		||||
        but no longer exists by the time file_contents is called.
 | 
			
		||||
        Return None if file not found. Optionally raise if fatal is True."""
 | 
			
		||||
        unit_name = sentry_unit.info['unit_name']
 | 
			
		||||
        file_contents = False
 | 
			
		||||
        tries = 0
 | 
			
		||||
        while not file_contents and tries < (max_wait / 4):
 | 
			
		||||
            try:
 | 
			
		||||
                file_contents = sentry_unit.file_contents(file_name)
 | 
			
		||||
            except IOError:
 | 
			
		||||
                self.log.debug('Attempt {} to open file {} from {} '
 | 
			
		||||
                               'failed'.format(tries, file_name,
 | 
			
		||||
                                               unit_name))
 | 
			
		||||
                time.sleep(4)
 | 
			
		||||
                tries += 1
 | 
			
		||||
 | 
			
		||||
        if file_contents:
 | 
			
		||||
            return file_contents
 | 
			
		||||
        elif not fatal:
 | 
			
		||||
            return None
 | 
			
		||||
        elif fatal:
 | 
			
		||||
            msg = 'Failed to get file contents from unit.'
 | 
			
		||||
            amulet.raise_status(amulet.FAIL, msg)
 | 
			
		||||
 | 
			
		||||
    def port_knock_tcp(self, host="localhost", port=22, timeout=15):
 | 
			
		||||
        """Open a TCP socket to check for a listening sevice on a host.
 | 
			
		||||
 | 
			
		||||
        :param host: host name or IP address, default to localhost
 | 
			
		||||
        :param port: TCP port number, default to 22
 | 
			
		||||
        :param timeout: Connect timeout, default to 15 seconds
 | 
			
		||||
        :returns: True if successful, False if connect failed
 | 
			
		||||
        """
 | 
			
		||||
 | 
			
		||||
        # Resolve host name if possible
 | 
			
		||||
        try:
 | 
			
		||||
            connect_host = socket.gethostbyname(host)
 | 
			
		||||
            host_human = "{} ({})".format(connect_host, host)
 | 
			
		||||
        except socket.error as e:
 | 
			
		||||
            self.log.warn('Unable to resolve address: '
 | 
			
		||||
                          '{} ({}) Trying anyway!'.format(host, e))
 | 
			
		||||
            connect_host = host
 | 
			
		||||
            host_human = connect_host
 | 
			
		||||
 | 
			
		||||
        # Attempt socket connection
 | 
			
		||||
        try:
 | 
			
		||||
            knock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 | 
			
		||||
            knock.settimeout(timeout)
 | 
			
		||||
            knock.connect((connect_host, port))
 | 
			
		||||
            knock.close()
 | 
			
		||||
            self.log.debug('Socket connect OK for host '
 | 
			
		||||
                           '{} on port {}.'.format(host_human, port))
 | 
			
		||||
            return True
 | 
			
		||||
        except socket.error as e:
 | 
			
		||||
            self.log.debug('Socket connect FAIL for'
 | 
			
		||||
                           ' {} port {} ({})'.format(host_human, port, e))
 | 
			
		||||
            return False
 | 
			
		||||
 | 
			
		||||
    def port_knock_units(self, sentry_units, port=22,
 | 
			
		||||
                         timeout=15, expect_success=True):
 | 
			
		||||
        """Open a TCP socket to check for a listening sevice on each
 | 
			
		||||
        listed juju unit.
 | 
			
		||||
 | 
			
		||||
        :param sentry_units: list of sentry unit pointers
 | 
			
		||||
        :param port: TCP port number, default to 22
 | 
			
		||||
        :param timeout: Connect timeout, default to 15 seconds
 | 
			
		||||
        :expect_success: True by default, set False to invert logic
 | 
			
		||||
        :returns: None if successful, Failure message otherwise
 | 
			
		||||
        """
 | 
			
		||||
        for unit in sentry_units:
 | 
			
		||||
            host = unit.info['public-address']
 | 
			
		||||
            connected = self.port_knock_tcp(host, port, timeout)
 | 
			
		||||
            if not connected and expect_success:
 | 
			
		||||
                return 'Socket connect failed.'
 | 
			
		||||
            elif connected and not expect_success:
 | 
			
		||||
                return 'Socket connected unexpectedly.'
 | 
			
		||||
 | 
			
		||||
    def get_uuid_epoch_stamp(self):
 | 
			
		||||
        """Returns a stamp string based on uuid4 and epoch time.  Useful in
 | 
			
		||||
        generating test messages which need to be unique-ish."""
 | 
			
		||||
        return '[{}-{}]'.format(uuid.uuid4(), time.time())
 | 
			
		||||
 | 
			
		||||
# amulet juju action helpers:
 | 
			
		||||
    def run_action(self, unit_sentry, action,
 | 
			
		||||
                   _check_output=subprocess.check_output):
 | 
			
		||||
        """Run the named action on a given unit sentry.
 | 
			
		||||
 
 | 
			
		||||
@@ -44,20 +44,31 @@ class OpenStackAmuletDeployment(AmuletDeployment):
 | 
			
		||||
           Determine if the local branch being tested is derived from its
 | 
			
		||||
           stable or next (dev) branch, and based on this, use the corresonding
 | 
			
		||||
           stable or next branches for the other_services."""
 | 
			
		||||
 | 
			
		||||
        # Charms outside the lp:~openstack-charmers namespace
 | 
			
		||||
        base_charms = ['mysql', 'mongodb', 'nrpe']
 | 
			
		||||
 | 
			
		||||
        # Force these charms to current series even when using an older series.
 | 
			
		||||
        # ie. Use trusty/nrpe even when series is precise, as the P charm
 | 
			
		||||
        # does not possess the necessary external master config and hooks.
 | 
			
		||||
        force_series_current = ['nrpe']
 | 
			
		||||
 | 
			
		||||
        if self.series in ['precise', 'trusty']:
 | 
			
		||||
            base_series = self.series
 | 
			
		||||
        else:
 | 
			
		||||
            base_series = self.current_next
 | 
			
		||||
 | 
			
		||||
        if self.stable:
 | 
			
		||||
            for svc in other_services:
 | 
			
		||||
        for svc in other_services:
 | 
			
		||||
            if svc['name'] in force_series_current:
 | 
			
		||||
                base_series = self.current_next
 | 
			
		||||
            # If a location has been explicitly set, use it
 | 
			
		||||
            if svc.get('location'):
 | 
			
		||||
                continue
 | 
			
		||||
            if self.stable:
 | 
			
		||||
                temp = 'lp:charms/{}/{}'
 | 
			
		||||
                svc['location'] = temp.format(base_series,
 | 
			
		||||
                                              svc['name'])
 | 
			
		||||
        else:
 | 
			
		||||
            for svc in other_services:
 | 
			
		||||
            else:
 | 
			
		||||
                if svc['name'] in base_charms:
 | 
			
		||||
                    temp = 'lp:charms/{}/{}'
 | 
			
		||||
                    svc['location'] = temp.format(base_series,
 | 
			
		||||
@@ -66,6 +77,7 @@ class OpenStackAmuletDeployment(AmuletDeployment):
 | 
			
		||||
                    temp = 'lp:~openstack-charmers/charms/{}/{}/next'
 | 
			
		||||
                    svc['location'] = temp.format(self.current_next,
 | 
			
		||||
                                                  svc['name'])
 | 
			
		||||
 | 
			
		||||
        return other_services
 | 
			
		||||
 | 
			
		||||
    def _add_services(self, this_service, other_services):
 | 
			
		||||
@@ -77,21 +89,23 @@ class OpenStackAmuletDeployment(AmuletDeployment):
 | 
			
		||||
 | 
			
		||||
        services = other_services
 | 
			
		||||
        services.append(this_service)
 | 
			
		||||
 | 
			
		||||
        # Charms which should use the source config option
 | 
			
		||||
        use_source = ['mysql', 'mongodb', 'rabbitmq-server', 'ceph',
 | 
			
		||||
                      'ceph-osd', 'ceph-radosgw']
 | 
			
		||||
        # Most OpenStack subordinate charms do not expose an origin option
 | 
			
		||||
        # as that is controlled by the principle.
 | 
			
		||||
        ignore = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe']
 | 
			
		||||
 | 
			
		||||
        # Charms which can not use openstack-origin, ie. many subordinates
 | 
			
		||||
        no_origin = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe']
 | 
			
		||||
 | 
			
		||||
        if self.openstack:
 | 
			
		||||
            for svc in services:
 | 
			
		||||
                if svc['name'] not in use_source + ignore:
 | 
			
		||||
                if svc['name'] not in use_source + no_origin:
 | 
			
		||||
                    config = {'openstack-origin': self.openstack}
 | 
			
		||||
                    self.d.configure(svc['name'], config)
 | 
			
		||||
 | 
			
		||||
        if self.source:
 | 
			
		||||
            for svc in services:
 | 
			
		||||
                if svc['name'] in use_source and svc['name'] not in ignore:
 | 
			
		||||
                if svc['name'] in use_source and svc['name'] not in no_origin:
 | 
			
		||||
                    config = {'source': self.source}
 | 
			
		||||
                    self.d.configure(svc['name'], config)
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -27,6 +27,7 @@ import glanceclient.v1.client as glance_client
 | 
			
		||||
import heatclient.v1.client as heat_client
 | 
			
		||||
import keystoneclient.v2_0 as keystone_client
 | 
			
		||||
import novaclient.v1_1.client as nova_client
 | 
			
		||||
import pika
 | 
			
		||||
import swiftclient
 | 
			
		||||
 | 
			
		||||
from charmhelpers.contrib.amulet.utils import (
 | 
			
		||||
@@ -602,3 +603,361 @@ class OpenStackAmuletUtils(AmuletUtils):
 | 
			
		||||
            self.log.debug('Ceph {} samples (OK): '
 | 
			
		||||
                           '{}'.format(sample_type, samples))
 | 
			
		||||
            return None
 | 
			
		||||
 | 
			
		||||
# rabbitmq/amqp specific helpers:
 | 
			
		||||
    def add_rmq_test_user(self, sentry_units,
 | 
			
		||||
                          username="testuser1", password="changeme"):
 | 
			
		||||
        """Add a test user via the first rmq juju unit, check connection as
 | 
			
		||||
        the new user against all sentry units.
 | 
			
		||||
 | 
			
		||||
        :param sentry_units: list of sentry unit pointers
 | 
			
		||||
        :param username: amqp user name, default to testuser1
 | 
			
		||||
        :param password: amqp user password
 | 
			
		||||
        :returns: None if successful.  Raise on error.
 | 
			
		||||
        """
 | 
			
		||||
        self.log.debug('Adding rmq user ({})...'.format(username))
 | 
			
		||||
 | 
			
		||||
        # Check that user does not already exist
 | 
			
		||||
        cmd_user_list = 'rabbitmqctl list_users'
 | 
			
		||||
        output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
 | 
			
		||||
        if username in output:
 | 
			
		||||
            self.log.warning('User ({}) already exists, returning '
 | 
			
		||||
                             'gracefully.'.format(username))
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        perms = '".*" ".*" ".*"'
 | 
			
		||||
        cmds = ['rabbitmqctl add_user {} {}'.format(username, password),
 | 
			
		||||
                'rabbitmqctl set_permissions {} {}'.format(username, perms)]
 | 
			
		||||
 | 
			
		||||
        # Add user via first unit
 | 
			
		||||
        for cmd in cmds:
 | 
			
		||||
            output, _ = self.run_cmd_unit(sentry_units[0], cmd)
 | 
			
		||||
 | 
			
		||||
        # Check connection against the other sentry_units
 | 
			
		||||
        self.log.debug('Checking user connect against units...')
 | 
			
		||||
        for sentry_unit in sentry_units:
 | 
			
		||||
            connection = self.connect_amqp_by_unit(sentry_unit, ssl=False,
 | 
			
		||||
                                                   username=username,
 | 
			
		||||
                                                   password=password)
 | 
			
		||||
            connection.close()
 | 
			
		||||
 | 
			
		||||
    def delete_rmq_test_user(self, sentry_units, username="testuser1"):
 | 
			
		||||
        """Delete a rabbitmq user via the first rmq juju unit.
 | 
			
		||||
 | 
			
		||||
        :param sentry_units: list of sentry unit pointers
 | 
			
		||||
        :param username: amqp user name, default to testuser1
 | 
			
		||||
        :param password: amqp user password
 | 
			
		||||
        :returns: None if successful or no such user.
 | 
			
		||||
        """
 | 
			
		||||
        self.log.debug('Deleting rmq user ({})...'.format(username))
 | 
			
		||||
 | 
			
		||||
        # Check that the user exists
 | 
			
		||||
        cmd_user_list = 'rabbitmqctl list_users'
 | 
			
		||||
        output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
 | 
			
		||||
 | 
			
		||||
        if username not in output:
 | 
			
		||||
            self.log.warning('User ({}) does not exist, returning '
 | 
			
		||||
                             'gracefully.'.format(username))
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        # Delete the user
 | 
			
		||||
        cmd_user_del = 'rabbitmqctl delete_user {}'.format(username)
 | 
			
		||||
        output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_del)
 | 
			
		||||
 | 
			
		||||
    def get_rmq_cluster_status(self, sentry_unit):
 | 
			
		||||
        """Execute rabbitmq cluster status command on a unit and return
 | 
			
		||||
        the full output.
 | 
			
		||||
 | 
			
		||||
        :param unit: sentry unit
 | 
			
		||||
        :returns: String containing console output of cluster status command
 | 
			
		||||
        """
 | 
			
		||||
        cmd = 'rabbitmqctl cluster_status'
 | 
			
		||||
        output, _ = self.run_cmd_unit(sentry_unit, cmd)
 | 
			
		||||
        self.log.debug('{} cluster_status:\n{}'.format(
 | 
			
		||||
            sentry_unit.info['unit_name'], output))
 | 
			
		||||
        return str(output)
 | 
			
		||||
 | 
			
		||||
    def get_rmq_cluster_running_nodes(self, sentry_unit):
 | 
			
		||||
        """Parse rabbitmqctl cluster_status output string, return list of
 | 
			
		||||
        running rabbitmq cluster nodes.
 | 
			
		||||
 | 
			
		||||
        :param unit: sentry unit
 | 
			
		||||
        :returns: List containing node names of running nodes
 | 
			
		||||
        """
 | 
			
		||||
        # NOTE(beisner): rabbitmqctl cluster_status output is not
 | 
			
		||||
        # json-parsable, do string chop foo, then json.loads that.
 | 
			
		||||
        str_stat = self.get_rmq_cluster_status(sentry_unit)
 | 
			
		||||
        if 'running_nodes' in str_stat:
 | 
			
		||||
            pos_start = str_stat.find("{running_nodes,") + 15
 | 
			
		||||
            pos_end = str_stat.find("]},", pos_start) + 1
 | 
			
		||||
            str_run_nodes = str_stat[pos_start:pos_end].replace("'", '"')
 | 
			
		||||
            run_nodes = json.loads(str_run_nodes)
 | 
			
		||||
            return run_nodes
 | 
			
		||||
        else:
 | 
			
		||||
            return []
 | 
			
		||||
 | 
			
		||||
    def validate_rmq_cluster_running_nodes(self, sentry_units):
 | 
			
		||||
        """Check that all rmq unit hostnames are represented in the
 | 
			
		||||
        cluster_status output of all units.
 | 
			
		||||
 | 
			
		||||
        :param host_names: dict of juju unit names to host names
 | 
			
		||||
        :param units: list of sentry unit pointers (all rmq units)
 | 
			
		||||
        :returns: None if successful, otherwise return error message
 | 
			
		||||
        """
 | 
			
		||||
        host_names = self.get_unit_hostnames(sentry_units)
 | 
			
		||||
        errors = []
 | 
			
		||||
 | 
			
		||||
        # Query every unit for cluster_status running nodes
 | 
			
		||||
        for query_unit in sentry_units:
 | 
			
		||||
            query_unit_name = query_unit.info['unit_name']
 | 
			
		||||
            running_nodes = self.get_rmq_cluster_running_nodes(query_unit)
 | 
			
		||||
 | 
			
		||||
            # Confirm that every unit is represented in the queried unit's
 | 
			
		||||
            # cluster_status running nodes output.
 | 
			
		||||
            for validate_unit in sentry_units:
 | 
			
		||||
                val_host_name = host_names[validate_unit.info['unit_name']]
 | 
			
		||||
                val_node_name = 'rabbit@{}'.format(val_host_name)
 | 
			
		||||
 | 
			
		||||
                if val_node_name not in running_nodes:
 | 
			
		||||
                    errors.append('Cluster member check failed on {}: {} not '
 | 
			
		||||
                                  'in {}\n'.format(query_unit_name,
 | 
			
		||||
                                                   val_node_name,
 | 
			
		||||
                                                   running_nodes))
 | 
			
		||||
        if errors:
 | 
			
		||||
            return ''.join(errors)
 | 
			
		||||
 | 
			
		||||
    def rmq_ssl_is_enabled_on_unit(self, sentry_unit, port=None):
 | 
			
		||||
        """Check a single juju rmq unit for ssl and port in the config file."""
 | 
			
		||||
        host = sentry_unit.info['public-address']
 | 
			
		||||
        unit_name = sentry_unit.info['unit_name']
 | 
			
		||||
 | 
			
		||||
        conf_file = '/etc/rabbitmq/rabbitmq.config'
 | 
			
		||||
        conf_contents = str(self.file_contents_safe(sentry_unit,
 | 
			
		||||
                                                    conf_file, max_wait=16))
 | 
			
		||||
        # Checks
 | 
			
		||||
        conf_ssl = 'ssl' in conf_contents
 | 
			
		||||
        conf_port = str(port) in conf_contents
 | 
			
		||||
 | 
			
		||||
        # Port explicitly checked in config
 | 
			
		||||
        if port and conf_port and conf_ssl:
 | 
			
		||||
            self.log.debug('SSL is enabled  @{}:{} '
 | 
			
		||||
                           '({})'.format(host, port, unit_name))
 | 
			
		||||
            return True
 | 
			
		||||
        elif port and not conf_port and conf_ssl:
 | 
			
		||||
            self.log.debug('SSL is enabled @{} but not on port {} '
 | 
			
		||||
                           '({})'.format(host, port, unit_name))
 | 
			
		||||
            return False
 | 
			
		||||
        # Port not checked (useful when checking that ssl is disabled)
 | 
			
		||||
        elif not port and conf_ssl:
 | 
			
		||||
            self.log.debug('SSL is enabled  @{}:{} '
 | 
			
		||||
                           '({})'.format(host, port, unit_name))
 | 
			
		||||
            return True
 | 
			
		||||
        elif not port and not conf_ssl:
 | 
			
		||||
            self.log.debug('SSL not enabled @{}:{} '
 | 
			
		||||
                           '({})'.format(host, port, unit_name))
 | 
			
		||||
            return False
 | 
			
		||||
        else:
 | 
			
		||||
            msg = ('Unknown condition when checking SSL status @{}:{} '
 | 
			
		||||
                   '({})'.format(host, port, unit_name))
 | 
			
		||||
            amulet.raise_status(amulet.FAIL, msg)
 | 
			
		||||
 | 
			
		||||
    def validate_rmq_ssl_enabled_units(self, sentry_units, port=None):
 | 
			
		||||
        """Check that ssl is enabled on rmq juju sentry units.
 | 
			
		||||
 | 
			
		||||
        :param sentry_units: list of all rmq sentry units
 | 
			
		||||
        :param port: optional ssl port override to validate
 | 
			
		||||
        :returns: None if successful, otherwise return error message
 | 
			
		||||
        """
 | 
			
		||||
        for sentry_unit in sentry_units:
 | 
			
		||||
            if not self.rmq_ssl_is_enabled_on_unit(sentry_unit, port=port):
 | 
			
		||||
                return ('Unexpected condition:  ssl is disabled on unit '
 | 
			
		||||
                        '({})'.format(sentry_unit.info['unit_name']))
 | 
			
		||||
        return None
 | 
			
		||||
 | 
			
		||||
    def validate_rmq_ssl_disabled_units(self, sentry_units):
 | 
			
		||||
        """Check that ssl is enabled on listed rmq juju sentry units.
 | 
			
		||||
 | 
			
		||||
        :param sentry_units: list of all rmq sentry units
 | 
			
		||||
        :returns: True if successful.  Raise on error.
 | 
			
		||||
        """
 | 
			
		||||
        for sentry_unit in sentry_units:
 | 
			
		||||
            if self.rmq_ssl_is_enabled_on_unit(sentry_unit):
 | 
			
		||||
                return ('Unexpected condition:  ssl is enabled on unit '
 | 
			
		||||
                        '({})'.format(sentry_unit.info['unit_name']))
 | 
			
		||||
        return None
 | 
			
		||||
 | 
			
		||||
    def configure_rmq_ssl_on(self, sentry_units, deployment,
 | 
			
		||||
                             port=None, max_wait=60):
 | 
			
		||||
        """Turn ssl charm config option on, with optional non-default
 | 
			
		||||
        ssl port specification.  Confirm that it is enabled on every
 | 
			
		||||
        unit.
 | 
			
		||||
 | 
			
		||||
        :param sentry_units: list of sentry units
 | 
			
		||||
        :param deployment: amulet deployment object pointer
 | 
			
		||||
        :param port: amqp port, use defaults if None
 | 
			
		||||
        :param max_wait: maximum time to wait in seconds to confirm
 | 
			
		||||
        :returns: None if successful.  Raise on error.
 | 
			
		||||
        """
 | 
			
		||||
        self.log.debug('Setting ssl charm config option:  on')
 | 
			
		||||
 | 
			
		||||
        # Enable RMQ SSL
 | 
			
		||||
        config = {'ssl': 'on'}
 | 
			
		||||
        if port:
 | 
			
		||||
            config['ssl_port'] = port
 | 
			
		||||
 | 
			
		||||
        deployment.configure('rabbitmq-server', config)
 | 
			
		||||
 | 
			
		||||
        # Confirm
 | 
			
		||||
        tries = 0
 | 
			
		||||
        ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
 | 
			
		||||
        while ret and tries < (max_wait / 4):
 | 
			
		||||
            time.sleep(4)
 | 
			
		||||
            self.log.debug('Attempt {}: {}'.format(tries, ret))
 | 
			
		||||
            ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
 | 
			
		||||
            tries += 1
 | 
			
		||||
 | 
			
		||||
        if ret:
 | 
			
		||||
            amulet.raise_status(amulet.FAIL, ret)
 | 
			
		||||
 | 
			
		||||
    def configure_rmq_ssl_off(self, sentry_units, deployment, max_wait=60):
 | 
			
		||||
        """Turn ssl charm config option off, confirm that it is disabled
 | 
			
		||||
        on every unit.
 | 
			
		||||
 | 
			
		||||
        :param sentry_units: list of sentry units
 | 
			
		||||
        :param deployment: amulet deployment object pointer
 | 
			
		||||
        :param max_wait: maximum time to wait in seconds to confirm
 | 
			
		||||
        :returns: None if successful.  Raise on error.
 | 
			
		||||
        """
 | 
			
		||||
        self.log.debug('Setting ssl charm config option:  off')
 | 
			
		||||
 | 
			
		||||
        # Disable RMQ SSL
 | 
			
		||||
        config = {'ssl': 'off'}
 | 
			
		||||
        deployment.configure('rabbitmq-server', config)
 | 
			
		||||
 | 
			
		||||
        # Confirm
 | 
			
		||||
        tries = 0
 | 
			
		||||
        ret = self.validate_rmq_ssl_disabled_units(sentry_units)
 | 
			
		||||
        while ret and tries < (max_wait / 4):
 | 
			
		||||
            time.sleep(4)
 | 
			
		||||
            self.log.debug('Attempt {}: {}'.format(tries, ret))
 | 
			
		||||
            ret = self.validate_rmq_ssl_disabled_units(sentry_units)
 | 
			
		||||
            tries += 1
 | 
			
		||||
 | 
			
		||||
        if ret:
 | 
			
		||||
            amulet.raise_status(amulet.FAIL, ret)
 | 
			
		||||
 | 
			
		||||
    def connect_amqp_by_unit(self, sentry_unit, ssl=False,
 | 
			
		||||
                             port=None, fatal=True,
 | 
			
		||||
                             username="testuser1", password="changeme"):
 | 
			
		||||
        """Establish and return a pika amqp connection to the rabbitmq service
 | 
			
		||||
        running on a rmq juju unit.
 | 
			
		||||
 | 
			
		||||
        :param sentry_unit: sentry unit pointer
 | 
			
		||||
        :param ssl: boolean, default to False
 | 
			
		||||
        :param port: amqp port, use defaults if None
 | 
			
		||||
        :param fatal: boolean, default to True (raises on connect error)
 | 
			
		||||
        :param username: amqp user name, default to testuser1
 | 
			
		||||
        :param password: amqp user password
 | 
			
		||||
        :returns: pika amqp connection pointer or None if failed and non-fatal
 | 
			
		||||
        """
 | 
			
		||||
        host = sentry_unit.info['public-address']
 | 
			
		||||
        unit_name = sentry_unit.info['unit_name']
 | 
			
		||||
 | 
			
		||||
        # Default port logic if port is not specified
 | 
			
		||||
        if ssl and not port:
 | 
			
		||||
            port = 5671
 | 
			
		||||
        elif not ssl and not port:
 | 
			
		||||
            port = 5672
 | 
			
		||||
 | 
			
		||||
        self.log.debug('Connecting to amqp on {}:{} ({}) as '
 | 
			
		||||
                       '{}...'.format(host, port, unit_name, username))
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            credentials = pika.PlainCredentials(username, password)
 | 
			
		||||
            parameters = pika.ConnectionParameters(host=host, port=port,
 | 
			
		||||
                                                   credentials=credentials,
 | 
			
		||||
                                                   ssl=ssl,
 | 
			
		||||
                                                   connection_attempts=3,
 | 
			
		||||
                                                   retry_delay=5,
 | 
			
		||||
                                                   socket_timeout=1)
 | 
			
		||||
            connection = pika.BlockingConnection(parameters)
 | 
			
		||||
            assert connection.server_properties['product'] == 'RabbitMQ'
 | 
			
		||||
            self.log.debug('Connect OK')
 | 
			
		||||
            return connection
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            msg = ('amqp connection failed to {}:{} as '
 | 
			
		||||
                   '{} ({})'.format(host, port, username, str(e)))
 | 
			
		||||
            if fatal:
 | 
			
		||||
                amulet.raise_status(amulet.FAIL, msg)
 | 
			
		||||
            else:
 | 
			
		||||
                self.log.warn(msg)
 | 
			
		||||
                return None
 | 
			
		||||
 | 
			
		||||
    def publish_amqp_message_by_unit(self, sentry_unit, message,
 | 
			
		||||
                                     queue="test", ssl=False,
 | 
			
		||||
                                     username="testuser1",
 | 
			
		||||
                                     password="changeme",
 | 
			
		||||
                                     port=None):
 | 
			
		||||
        """Publish an amqp message to a rmq juju unit.
 | 
			
		||||
 | 
			
		||||
        :param sentry_unit: sentry unit pointer
 | 
			
		||||
        :param message: amqp message string
 | 
			
		||||
        :param queue: message queue, default to test
 | 
			
		||||
        :param username: amqp user name, default to testuser1
 | 
			
		||||
        :param password: amqp user password
 | 
			
		||||
        :param ssl: boolean, default to False
 | 
			
		||||
        :param port: amqp port, use defaults if None
 | 
			
		||||
        :returns: None.  Raises exception if publish failed.
 | 
			
		||||
        """
 | 
			
		||||
        self.log.debug('Publishing message to {} queue:\n{}'.format(queue,
 | 
			
		||||
                                                                    message))
 | 
			
		||||
        connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
 | 
			
		||||
                                               port=port,
 | 
			
		||||
                                               username=username,
 | 
			
		||||
                                               password=password)
 | 
			
		||||
 | 
			
		||||
        # NOTE(beisner): extra debug here re: pika hang potential:
 | 
			
		||||
        #   https://github.com/pika/pika/issues/297
 | 
			
		||||
        #   https://groups.google.com/forum/#!topic/rabbitmq-users/Ja0iyfF0Szw
 | 
			
		||||
        self.log.debug('Defining channel...')
 | 
			
		||||
        channel = connection.channel()
 | 
			
		||||
        self.log.debug('Declaring queue...')
 | 
			
		||||
        channel.queue_declare(queue=queue, auto_delete=False, durable=True)
 | 
			
		||||
        self.log.debug('Publishing message...')
 | 
			
		||||
        channel.basic_publish(exchange='', routing_key=queue, body=message)
 | 
			
		||||
        self.log.debug('Closing channel...')
 | 
			
		||||
        channel.close()
 | 
			
		||||
        self.log.debug('Closing connection...')
 | 
			
		||||
        connection.close()
 | 
			
		||||
 | 
			
		||||
    def get_amqp_message_by_unit(self, sentry_unit, queue="test",
 | 
			
		||||
                                 username="testuser1",
 | 
			
		||||
                                 password="changeme",
 | 
			
		||||
                                 ssl=False, port=None):
 | 
			
		||||
        """Get an amqp message from a rmq juju unit.
 | 
			
		||||
 | 
			
		||||
        :param sentry_unit: sentry unit pointer
 | 
			
		||||
        :param queue: message queue, default to test
 | 
			
		||||
        :param username: amqp user name, default to testuser1
 | 
			
		||||
        :param password: amqp user password
 | 
			
		||||
        :param ssl: boolean, default to False
 | 
			
		||||
        :param port: amqp port, use defaults if None
 | 
			
		||||
        :returns: amqp message body as string.  Raise if get fails.
 | 
			
		||||
        """
 | 
			
		||||
        connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
 | 
			
		||||
                                               port=port,
 | 
			
		||||
                                               username=username,
 | 
			
		||||
                                               password=password)
 | 
			
		||||
        channel = connection.channel()
 | 
			
		||||
        method_frame, _, body = channel.basic_get(queue)
 | 
			
		||||
 | 
			
		||||
        if method_frame:
 | 
			
		||||
            self.log.debug('Retreived message from {} queue:\n{}'.format(queue,
 | 
			
		||||
                                                                         body))
 | 
			
		||||
            channel.basic_ack(method_frame.delivery_tag)
 | 
			
		||||
            channel.close()
 | 
			
		||||
            connection.close()
 | 
			
		||||
            return body
 | 
			
		||||
        else:
 | 
			
		||||
            msg = 'No message retrieved.'
 | 
			
		||||
            amulet.raise_status(amulet.FAIL, msg)
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user