[gnuoy, r=hopem] Update ceph broker code to fix Bug #1453940
This commit is contained in:
commit
b2875098fc
@ -485,13 +485,15 @@ class CephContext(OSContextGenerator):
|
||||
|
||||
log('Generating template context for ceph', level=DEBUG)
|
||||
mon_hosts = []
|
||||
auth = None
|
||||
key = None
|
||||
use_syslog = str(config('use-syslog')).lower()
|
||||
ctxt = {
|
||||
'use_syslog': str(config('use-syslog')).lower()
|
||||
}
|
||||
for rid in relation_ids('ceph'):
|
||||
for unit in related_units(rid):
|
||||
auth = relation_get('auth', rid=rid, unit=unit)
|
||||
key = relation_get('key', rid=rid, unit=unit)
|
||||
if not ctxt.get('auth'):
|
||||
ctxt['auth'] = relation_get('auth', rid=rid, unit=unit)
|
||||
if not ctxt.get('key'):
|
||||
ctxt['key'] = relation_get('key', rid=rid, unit=unit)
|
||||
ceph_pub_addr = relation_get('ceph-public-address', rid=rid,
|
||||
unit=unit)
|
||||
unit_priv_addr = relation_get('private-address', rid=rid,
|
||||
@ -500,10 +502,7 @@ class CephContext(OSContextGenerator):
|
||||
ceph_addr = format_ipv6_addr(ceph_addr) or ceph_addr
|
||||
mon_hosts.append(ceph_addr)
|
||||
|
||||
ctxt = {'mon_hosts': ' '.join(sorted(mon_hosts)),
|
||||
'auth': auth,
|
||||
'key': key,
|
||||
'use_syslog': use_syslog}
|
||||
ctxt['mon_hosts'] = ' '.join(sorted(mon_hosts))
|
||||
|
||||
if not os.path.isdir('/etc/ceph'):
|
||||
os.mkdir('/etc/ceph')
|
||||
|
@ -28,6 +28,7 @@ import os
|
||||
import shutil
|
||||
import json
|
||||
import time
|
||||
import uuid
|
||||
|
||||
from subprocess import (
|
||||
check_call,
|
||||
@ -35,8 +36,10 @@ from subprocess import (
|
||||
CalledProcessError,
|
||||
)
|
||||
from charmhelpers.core.hookenv import (
|
||||
local_unit,
|
||||
relation_get,
|
||||
relation_ids,
|
||||
relation_set,
|
||||
related_units,
|
||||
log,
|
||||
DEBUG,
|
||||
@ -411,17 +414,52 @@ class CephBrokerRq(object):
|
||||
|
||||
The API is versioned and defaults to version 1.
|
||||
"""
|
||||
def __init__(self, api_version=1):
|
||||
def __init__(self, api_version=1, request_id=None):
|
||||
self.api_version = api_version
|
||||
if request_id:
|
||||
self.request_id = request_id
|
||||
else:
|
||||
self.request_id = str(uuid.uuid1())
|
||||
self.ops = []
|
||||
|
||||
def add_op_create_pool(self, name, replica_count=3):
|
||||
self.ops.append({'op': 'create-pool', 'name': name,
|
||||
'replicas': replica_count})
|
||||
|
||||
def set_ops(self, ops):
|
||||
"""Set request ops to provided value.
|
||||
|
||||
Useful for injecting ops that come from a previous request
|
||||
to allow comparisons to ensure validity.
|
||||
"""
|
||||
self.ops = ops
|
||||
|
||||
@property
|
||||
def request(self):
|
||||
return json.dumps({'api-version': self.api_version, 'ops': self.ops})
|
||||
return json.dumps({'api-version': self.api_version, 'ops': self.ops,
|
||||
'request-id': self.request_id})
|
||||
|
||||
def _ops_equal(self, other):
|
||||
if len(self.ops) == len(other.ops):
|
||||
for req_no in range(0, len(self.ops)):
|
||||
for key in ['replicas', 'name', 'op']:
|
||||
if self.ops[req_no][key] != other.ops[req_no][key]:
|
||||
return False
|
||||
else:
|
||||
return False
|
||||
return True
|
||||
|
||||
def __eq__(self, other):
|
||||
if not isinstance(other, self.__class__):
|
||||
return False
|
||||
if self.api_version == other.api_version and \
|
||||
self._ops_equal(other):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def __ne__(self, other):
|
||||
return not self.__eq__(other)
|
||||
|
||||
|
||||
class CephBrokerRsp(object):
|
||||
@ -431,10 +469,15 @@ class CephBrokerRsp(object):
|
||||
|
||||
The API is versioned and defaults to version 1.
|
||||
"""
|
||||
|
||||
def __init__(self, encoded_rsp):
|
||||
self.api_version = None
|
||||
self.rsp = json.loads(encoded_rsp)
|
||||
|
||||
@property
|
||||
def request_id(self):
|
||||
return self.rsp.get('request-id')
|
||||
|
||||
@property
|
||||
def exit_code(self):
|
||||
return self.rsp.get('exit-code')
|
||||
@ -442,3 +485,182 @@ class CephBrokerRsp(object):
|
||||
@property
|
||||
def exit_msg(self):
|
||||
return self.rsp.get('stderr')
|
||||
|
||||
|
||||
# Ceph Broker Conversation:
|
||||
# If a charm needs an action to be taken by ceph it can create a CephBrokerRq
|
||||
# and send that request to ceph via the ceph relation. The CephBrokerRq has a
|
||||
# unique id so that the client can identity which CephBrokerRsp is associated
|
||||
# with the request. Ceph will also respond to each client unit individually
|
||||
# creating a response key per client unit eg glance/0 will get a CephBrokerRsp
|
||||
# via key broker-rsp-glance-0
|
||||
#
|
||||
# To use this the charm can just do something like:
|
||||
#
|
||||
# from charmhelpers.contrib.storage.linux.ceph import (
|
||||
# send_request_if_needed,
|
||||
# is_request_complete,
|
||||
# CephBrokerRq,
|
||||
# )
|
||||
#
|
||||
# @hooks.hook('ceph-relation-changed')
|
||||
# def ceph_changed():
|
||||
# rq = CephBrokerRq()
|
||||
# rq.add_op_create_pool(name='poolname', replica_count=3)
|
||||
#
|
||||
# if is_request_complete(rq):
|
||||
# <Request complete actions>
|
||||
# else:
|
||||
# send_request_if_needed(get_ceph_request())
|
||||
#
|
||||
# CephBrokerRq and CephBrokerRsp are serialized into JSON. Below is an example
|
||||
# of glance having sent a request to ceph which ceph has successfully processed
|
||||
# 'ceph:8': {
|
||||
# 'ceph/0': {
|
||||
# 'auth': 'cephx',
|
||||
# 'broker-rsp-glance-0': '{"request-id": "0bc7dc54", "exit-code": 0}',
|
||||
# 'broker_rsp': '{"request-id": "0da543b8", "exit-code": 0}',
|
||||
# 'ceph-public-address': '10.5.44.103',
|
||||
# 'key': 'AQCLDttVuHXINhAAvI144CB09dYchhHyTUY9BQ==',
|
||||
# 'private-address': '10.5.44.103',
|
||||
# },
|
||||
# 'glance/0': {
|
||||
# 'broker_req': ('{"api-version": 1, "request-id": "0bc7dc54", '
|
||||
# '"ops": [{"replicas": 3, "name": "glance", '
|
||||
# '"op": "create-pool"}]}'),
|
||||
# 'private-address': '10.5.44.109',
|
||||
# },
|
||||
# }
|
||||
|
||||
def get_previous_request(rid):
|
||||
"""Return the last ceph broker request sent on a given relation
|
||||
|
||||
@param rid: Relation id to query for request
|
||||
"""
|
||||
request = None
|
||||
broker_req = relation_get(attribute='broker_req', rid=rid,
|
||||
unit=local_unit())
|
||||
if broker_req:
|
||||
request_data = json.loads(broker_req)
|
||||
request = CephBrokerRq(api_version=request_data['api-version'],
|
||||
request_id=request_data['request-id'])
|
||||
request.set_ops(request_data['ops'])
|
||||
|
||||
return request
|
||||
|
||||
|
||||
def get_request_states(request):
|
||||
"""Return a dict of requests per relation id with their corresponding
|
||||
completion state.
|
||||
|
||||
This allows a charm, which has a request for ceph, to see whether there is
|
||||
an equivalent request already being processed and if so what state that
|
||||
request is in.
|
||||
|
||||
@param request: A CephBrokerRq object
|
||||
"""
|
||||
complete = []
|
||||
requests = {}
|
||||
for rid in relation_ids('ceph'):
|
||||
complete = False
|
||||
previous_request = get_previous_request(rid)
|
||||
if request == previous_request:
|
||||
sent = True
|
||||
complete = is_request_complete_for_rid(previous_request, rid)
|
||||
else:
|
||||
sent = False
|
||||
complete = False
|
||||
|
||||
requests[rid] = {
|
||||
'sent': sent,
|
||||
'complete': complete,
|
||||
}
|
||||
|
||||
return requests
|
||||
|
||||
|
||||
def is_request_sent(request):
|
||||
"""Check to see if a functionally equivalent request has already been sent
|
||||
|
||||
Returns True if a similair request has been sent
|
||||
|
||||
@param request: A CephBrokerRq object
|
||||
"""
|
||||
states = get_request_states(request)
|
||||
for rid in states.keys():
|
||||
if not states[rid]['sent']:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def is_request_complete(request):
|
||||
"""Check to see if a functionally equivalent request has already been
|
||||
completed
|
||||
|
||||
Returns True if a similair request has been completed
|
||||
|
||||
@param request: A CephBrokerRq object
|
||||
"""
|
||||
states = get_request_states(request)
|
||||
for rid in states.keys():
|
||||
if not states[rid]['complete']:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def is_request_complete_for_rid(request, rid):
|
||||
"""Check if a given request has been completed on the given relation
|
||||
|
||||
@param request: A CephBrokerRq object
|
||||
@param rid: Relation ID
|
||||
"""
|
||||
broker_key = get_broker_rsp_key()
|
||||
for unit in related_units(rid):
|
||||
rdata = relation_get(rid=rid, unit=unit)
|
||||
if rdata.get(broker_key):
|
||||
rsp = CephBrokerRsp(rdata.get(broker_key))
|
||||
if rsp.request_id == request.request_id:
|
||||
if not rsp.exit_code:
|
||||
return True
|
||||
else:
|
||||
# The remote unit sent no reply targeted at this unit so either the
|
||||
# remote ceph cluster does not support unit targeted replies or it
|
||||
# has not processed our request yet.
|
||||
if rdata.get('broker_rsp'):
|
||||
request_data = json.loads(rdata['broker_rsp'])
|
||||
if request_data.get('request-id'):
|
||||
log('Ignoring legacy broker_rsp without unit key as remote '
|
||||
'service supports unit specific replies', level=DEBUG)
|
||||
else:
|
||||
log('Using legacy broker_rsp as remote service does not '
|
||||
'supports unit specific replies', level=DEBUG)
|
||||
rsp = CephBrokerRsp(rdata['broker_rsp'])
|
||||
if not rsp.exit_code:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def get_broker_rsp_key():
|
||||
"""Return broker response key for this unit
|
||||
|
||||
This is the key that ceph is going to use to pass request status
|
||||
information back to this unit
|
||||
"""
|
||||
return 'broker-rsp-' + local_unit().replace('/', '-')
|
||||
|
||||
|
||||
def send_request_if_needed(request):
|
||||
"""Send broker request if an equivalent request has not already been sent
|
||||
|
||||
@param request: A CephBrokerRq object
|
||||
"""
|
||||
if is_request_sent(request):
|
||||
log('Request already sent but not complete, not sending new request',
|
||||
level=DEBUG)
|
||||
else:
|
||||
for rid in relation_ids('ceph'):
|
||||
log('Sending request {}'.format(request.request_id), level=DEBUG)
|
||||
relation_set(relation_id=rid, broker_req=request.request)
|
||||
|
@ -29,7 +29,6 @@ from charmhelpers.core.hookenv import (
|
||||
config,
|
||||
Hooks,
|
||||
log as juju_log,
|
||||
INFO,
|
||||
ERROR,
|
||||
open_port,
|
||||
is_relation_made,
|
||||
@ -66,9 +65,10 @@ from charmhelpers.contrib.openstack.utils import (
|
||||
sync_db_with_multi_ipv6_addresses,
|
||||
)
|
||||
from charmhelpers.contrib.storage.linux.ceph import (
|
||||
send_request_if_needed,
|
||||
is_request_complete,
|
||||
ensure_ceph_keyring,
|
||||
CephBrokerRq,
|
||||
CephBrokerRsp,
|
||||
delete_keyring,
|
||||
)
|
||||
from charmhelpers.payload.execd import (
|
||||
@ -240,6 +240,14 @@ def ceph_joined():
|
||||
apt_install(['ceph-common', 'python-ceph'])
|
||||
|
||||
|
||||
def get_ceph_request():
|
||||
service = service_name()
|
||||
rq = CephBrokerRq()
|
||||
replicas = config('ceph-osd-replication-count')
|
||||
rq.add_op_create_pool(name=service, replica_count=replicas)
|
||||
return rq
|
||||
|
||||
|
||||
@hooks.hook('ceph-relation-changed')
|
||||
@restart_on_change(restart_map())
|
||||
def ceph_changed():
|
||||
@ -253,29 +261,15 @@ def ceph_changed():
|
||||
juju_log('Could not create ceph keyring: peer not ready?')
|
||||
return
|
||||
|
||||
settings = relation_get()
|
||||
if settings and 'broker_rsp' in settings:
|
||||
rsp = CephBrokerRsp(settings['broker_rsp'])
|
||||
# Non-zero return code implies failure
|
||||
if rsp.exit_code:
|
||||
juju_log("Ceph broker request failed (rc=%s, msg=%s)" %
|
||||
(rsp.exit_code, rsp.exit_msg), level=ERROR)
|
||||
return
|
||||
|
||||
juju_log("Ceph broker request succeeded (rc=%s, msg=%s)" %
|
||||
(rsp.exit_code, rsp.exit_msg), level=INFO)
|
||||
if is_request_complete(get_ceph_request()):
|
||||
juju_log('Request complete')
|
||||
CONFIGS.write(GLANCE_API_CONF)
|
||||
CONFIGS.write(ceph_config_file())
|
||||
# Ensure that glance-api is restarted since only now can we
|
||||
# guarantee that ceph resources are ready.
|
||||
service_restart('glance-api')
|
||||
else:
|
||||
rq = CephBrokerRq()
|
||||
replicas = config('ceph-osd-replication-count')
|
||||
rq.add_op_create_pool(name=service, replica_count=replicas)
|
||||
for rid in relation_ids('ceph'):
|
||||
relation_set(relation_id=rid, broker_req=rq.request)
|
||||
juju_log("Request(s) sent to Ceph broker (rid=%s)" % (rid))
|
||||
send_request_if_needed(get_ceph_request())
|
||||
|
||||
|
||||
@hooks.hook('ceph-relation-broken')
|
||||
|
@ -10,7 +10,8 @@
|
||||
keyring = /etc/ceph/ceph.$name.keyring
|
||||
mon host = {{ mon_hosts }}
|
||||
{% endif -%}
|
||||
{% if use_syslog -%}
|
||||
log to syslog = {{ use_syslog }}
|
||||
err to syslog = {{ use_syslog }}
|
||||
clog to syslog = {{ use_syslog }}
|
||||
|
||||
{% endif -%}
|
||||
|
@ -19,9 +19,11 @@ import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import socket
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
import uuid
|
||||
|
||||
import amulet
|
||||
import distro_info
|
||||
@ -114,7 +116,7 @@ class AmuletUtils(object):
|
||||
# /!\ DEPRECATION WARNING (beisner):
|
||||
# New and existing tests should be rewritten to use
|
||||
# validate_services_by_name() as it is aware of init systems.
|
||||
self.log.warn('/!\\ DEPRECATION WARNING: use '
|
||||
self.log.warn('DEPRECATION WARNING: use '
|
||||
'validate_services_by_name instead of validate_services '
|
||||
'due to init system differences.')
|
||||
|
||||
@ -269,33 +271,52 @@ class AmuletUtils(object):
|
||||
"""Get last modification time of directory."""
|
||||
return sentry_unit.directory_stat(directory)['mtime']
|
||||
|
||||
def _get_proc_start_time(self, sentry_unit, service, pgrep_full=False):
|
||||
"""Get process' start time.
|
||||
def _get_proc_start_time(self, sentry_unit, service, pgrep_full=None):
|
||||
"""Get start time of a process based on the last modification time
|
||||
of the /proc/pid directory.
|
||||
|
||||
Determine start time of the process based on the last modification
|
||||
time of the /proc/pid directory. If pgrep_full is True, the process
|
||||
name is matched against the full command line.
|
||||
"""
|
||||
if pgrep_full:
|
||||
cmd = 'pgrep -o -f {}'.format(service)
|
||||
else:
|
||||
cmd = 'pgrep -o {}'.format(service)
|
||||
cmd = cmd + ' | grep -v pgrep || exit 0'
|
||||
cmd_out = sentry_unit.run(cmd)
|
||||
self.log.debug('CMDout: ' + str(cmd_out))
|
||||
if cmd_out[0]:
|
||||
self.log.debug('Pid for %s %s' % (service, str(cmd_out[0])))
|
||||
proc_dir = '/proc/{}'.format(cmd_out[0].strip())
|
||||
return self._get_dir_mtime(sentry_unit, proc_dir)
|
||||
:sentry_unit: The sentry unit to check for the service on
|
||||
:service: service name to look for in process table
|
||||
:pgrep_full: [Deprecated] Use full command line search mode with pgrep
|
||||
:returns: epoch time of service process start
|
||||
:param commands: list of bash commands
|
||||
:param sentry_units: list of sentry unit pointers
|
||||
:returns: None if successful; Failure message otherwise
|
||||
"""
|
||||
if pgrep_full is not None:
|
||||
# /!\ DEPRECATION WARNING (beisner):
|
||||
# No longer implemented, as pidof is now used instead of pgrep.
|
||||
# https://bugs.launchpad.net/charm-helpers/+bug/1474030
|
||||
self.log.warn('DEPRECATION WARNING: pgrep_full bool is no '
|
||||
'longer implemented re: lp 1474030.')
|
||||
|
||||
pid_list = self.get_process_id_list(sentry_unit, service)
|
||||
pid = pid_list[0]
|
||||
proc_dir = '/proc/{}'.format(pid)
|
||||
self.log.debug('Pid for {} on {}: {}'.format(
|
||||
service, sentry_unit.info['unit_name'], pid))
|
||||
|
||||
return self._get_dir_mtime(sentry_unit, proc_dir)
|
||||
|
||||
def service_restarted(self, sentry_unit, service, filename,
|
||||
pgrep_full=False, sleep_time=20):
|
||||
pgrep_full=None, sleep_time=20):
|
||||
"""Check if service was restarted.
|
||||
|
||||
Compare a service's start time vs a file's last modification time
|
||||
(such as a config file for that service) to determine if the service
|
||||
has been restarted.
|
||||
"""
|
||||
# /!\ DEPRECATION WARNING (beisner):
|
||||
# This method is prone to races in that no before-time is known.
|
||||
# Use validate_service_config_changed instead.
|
||||
|
||||
# NOTE(beisner) pgrep_full is no longer implemented, as pidof is now
|
||||
# used instead of pgrep. pgrep_full is still passed through to ensure
|
||||
# deprecation WARNS. lp1474030
|
||||
self.log.warn('DEPRECATION WARNING: use '
|
||||
'validate_service_config_changed instead of '
|
||||
'service_restarted due to known races.')
|
||||
|
||||
time.sleep(sleep_time)
|
||||
if (self._get_proc_start_time(sentry_unit, service, pgrep_full) >=
|
||||
self._get_file_mtime(sentry_unit, filename)):
|
||||
@ -304,15 +325,15 @@ class AmuletUtils(object):
|
||||
return False
|
||||
|
||||
def service_restarted_since(self, sentry_unit, mtime, service,
|
||||
pgrep_full=False, sleep_time=20,
|
||||
retry_count=2):
|
||||
pgrep_full=None, sleep_time=20,
|
||||
retry_count=2, retry_sleep_time=30):
|
||||
"""Check if service was been started after a given time.
|
||||
|
||||
Args:
|
||||
sentry_unit (sentry): The sentry unit to check for the service on
|
||||
mtime (float): The epoch time to check against
|
||||
service (string): service name to look for in process table
|
||||
pgrep_full (boolean): Use full command line search mode with pgrep
|
||||
pgrep_full: [Deprecated] Use full command line search mode with pgrep
|
||||
sleep_time (int): Seconds to sleep before looking for process
|
||||
retry_count (int): If service is not found, how many times to retry
|
||||
|
||||
@ -321,30 +342,44 @@ class AmuletUtils(object):
|
||||
False if service is older than mtime or if service was
|
||||
not found.
|
||||
"""
|
||||
self.log.debug('Checking %s restarted since %s' % (service, mtime))
|
||||
# NOTE(beisner) pgrep_full is no longer implemented, as pidof is now
|
||||
# used instead of pgrep. pgrep_full is still passed through to ensure
|
||||
# deprecation WARNS. lp1474030
|
||||
|
||||
unit_name = sentry_unit.info['unit_name']
|
||||
self.log.debug('Checking that %s service restarted since %s on '
|
||||
'%s' % (service, mtime, unit_name))
|
||||
time.sleep(sleep_time)
|
||||
proc_start_time = self._get_proc_start_time(sentry_unit, service,
|
||||
pgrep_full)
|
||||
while retry_count > 0 and not proc_start_time:
|
||||
self.log.debug('No pid file found for service %s, will retry %i '
|
||||
'more times' % (service, retry_count))
|
||||
time.sleep(30)
|
||||
proc_start_time = self._get_proc_start_time(sentry_unit, service,
|
||||
pgrep_full)
|
||||
retry_count = retry_count - 1
|
||||
proc_start_time = None
|
||||
tries = 0
|
||||
while tries <= retry_count and not proc_start_time:
|
||||
try:
|
||||
proc_start_time = self._get_proc_start_time(sentry_unit,
|
||||
service,
|
||||
pgrep_full)
|
||||
self.log.debug('Attempt {} to get {} proc start time on {} '
|
||||
'OK'.format(tries, service, unit_name))
|
||||
except IOError:
|
||||
# NOTE(beisner) - race avoidance, proc may not exist yet.
|
||||
# https://bugs.launchpad.net/charm-helpers/+bug/1474030
|
||||
self.log.debug('Attempt {} to get {} proc start time on {} '
|
||||
'failed'.format(tries, service, unit_name))
|
||||
time.sleep(retry_sleep_time)
|
||||
tries += 1
|
||||
|
||||
if not proc_start_time:
|
||||
self.log.warn('No proc start time found, assuming service did '
|
||||
'not start')
|
||||
return False
|
||||
if proc_start_time >= mtime:
|
||||
self.log.debug('proc start time is newer than provided mtime'
|
||||
'(%s >= %s)' % (proc_start_time, mtime))
|
||||
self.log.debug('Proc start time is newer than provided mtime'
|
||||
'(%s >= %s) on %s (OK)' % (proc_start_time,
|
||||
mtime, unit_name))
|
||||
return True
|
||||
else:
|
||||
self.log.warn('proc start time (%s) is older than provided mtime '
|
||||
'(%s), service did not restart' % (proc_start_time,
|
||||
mtime))
|
||||
self.log.warn('Proc start time (%s) is older than provided mtime '
|
||||
'(%s) on %s, service did not '
|
||||
'restart' % (proc_start_time, mtime, unit_name))
|
||||
return False
|
||||
|
||||
def config_updated_since(self, sentry_unit, filename, mtime,
|
||||
@ -374,8 +409,9 @@ class AmuletUtils(object):
|
||||
return False
|
||||
|
||||
def validate_service_config_changed(self, sentry_unit, mtime, service,
|
||||
filename, pgrep_full=False,
|
||||
sleep_time=20, retry_count=2):
|
||||
filename, pgrep_full=None,
|
||||
sleep_time=20, retry_count=2,
|
||||
retry_sleep_time=30):
|
||||
"""Check service and file were updated after mtime
|
||||
|
||||
Args:
|
||||
@ -383,9 +419,10 @@ class AmuletUtils(object):
|
||||
mtime (float): The epoch time to check against
|
||||
service (string): service name to look for in process table
|
||||
filename (string): The file to check mtime of
|
||||
pgrep_full (boolean): Use full command line search mode with pgrep
|
||||
sleep_time (int): Seconds to sleep before looking for process
|
||||
pgrep_full: [Deprecated] Use full command line search mode with pgrep
|
||||
sleep_time (int): Initial sleep in seconds to pass to test helpers
|
||||
retry_count (int): If service is not found, how many times to retry
|
||||
retry_sleep_time (int): Time in seconds to wait between retries
|
||||
|
||||
Typical Usage:
|
||||
u = OpenStackAmuletUtils(ERROR)
|
||||
@ -402,15 +439,25 @@ class AmuletUtils(object):
|
||||
mtime, False if service is older than mtime or if service was
|
||||
not found or if filename was modified before mtime.
|
||||
"""
|
||||
self.log.debug('Checking %s restarted since %s' % (service, mtime))
|
||||
time.sleep(sleep_time)
|
||||
service_restart = self.service_restarted_since(sentry_unit, mtime,
|
||||
service,
|
||||
pgrep_full=pgrep_full,
|
||||
sleep_time=0,
|
||||
retry_count=retry_count)
|
||||
config_update = self.config_updated_since(sentry_unit, filename, mtime,
|
||||
sleep_time=0)
|
||||
|
||||
# NOTE(beisner) pgrep_full is no longer implemented, as pidof is now
|
||||
# used instead of pgrep. pgrep_full is still passed through to ensure
|
||||
# deprecation WARNS. lp1474030
|
||||
|
||||
service_restart = self.service_restarted_since(
|
||||
sentry_unit, mtime,
|
||||
service,
|
||||
pgrep_full=pgrep_full,
|
||||
sleep_time=sleep_time,
|
||||
retry_count=retry_count,
|
||||
retry_sleep_time=retry_sleep_time)
|
||||
|
||||
config_update = self.config_updated_since(
|
||||
sentry_unit,
|
||||
filename,
|
||||
mtime,
|
||||
sleep_time=0)
|
||||
|
||||
return service_restart and config_update
|
||||
|
||||
def get_sentry_time(self, sentry_unit):
|
||||
@ -428,7 +475,6 @@ class AmuletUtils(object):
|
||||
"""Return a list of all Ubuntu releases in order of release."""
|
||||
_d = distro_info.UbuntuDistroInfo()
|
||||
_release_list = _d.all
|
||||
self.log.debug('Ubuntu release list: {}'.format(_release_list))
|
||||
return _release_list
|
||||
|
||||
def file_to_url(self, file_rel_path):
|
||||
@ -568,6 +614,142 @@ class AmuletUtils(object):
|
||||
|
||||
return None
|
||||
|
||||
def validate_sectionless_conf(self, file_contents, expected):
|
||||
"""A crude conf parser. Useful to inspect configuration files which
|
||||
do not have section headers (as would be necessary in order to use
|
||||
the configparser). Such as openstack-dashboard or rabbitmq confs."""
|
||||
for line in file_contents.split('\n'):
|
||||
if '=' in line:
|
||||
args = line.split('=')
|
||||
if len(args) <= 1:
|
||||
continue
|
||||
key = args[0].strip()
|
||||
value = args[1].strip()
|
||||
if key in expected.keys():
|
||||
if expected[key] != value:
|
||||
msg = ('Config mismatch. Expected, actual: {}, '
|
||||
'{}'.format(expected[key], value))
|
||||
amulet.raise_status(amulet.FAIL, msg=msg)
|
||||
|
||||
def get_unit_hostnames(self, units):
|
||||
"""Return a dict of juju unit names to hostnames."""
|
||||
host_names = {}
|
||||
for unit in units:
|
||||
host_names[unit.info['unit_name']] = \
|
||||
str(unit.file_contents('/etc/hostname').strip())
|
||||
self.log.debug('Unit host names: {}'.format(host_names))
|
||||
return host_names
|
||||
|
||||
def run_cmd_unit(self, sentry_unit, cmd):
|
||||
"""Run a command on a unit, return the output and exit code."""
|
||||
output, code = sentry_unit.run(cmd)
|
||||
if code == 0:
|
||||
self.log.debug('{} `{}` command returned {} '
|
||||
'(OK)'.format(sentry_unit.info['unit_name'],
|
||||
cmd, code))
|
||||
else:
|
||||
msg = ('{} `{}` command returned {} '
|
||||
'{}'.format(sentry_unit.info['unit_name'],
|
||||
cmd, code, output))
|
||||
amulet.raise_status(amulet.FAIL, msg=msg)
|
||||
return str(output), code
|
||||
|
||||
def file_exists_on_unit(self, sentry_unit, file_name):
|
||||
"""Check if a file exists on a unit."""
|
||||
try:
|
||||
sentry_unit.file_stat(file_name)
|
||||
return True
|
||||
except IOError:
|
||||
return False
|
||||
except Exception as e:
|
||||
msg = 'Error checking file {}: {}'.format(file_name, e)
|
||||
amulet.raise_status(amulet.FAIL, msg=msg)
|
||||
|
||||
def file_contents_safe(self, sentry_unit, file_name,
|
||||
max_wait=60, fatal=False):
|
||||
"""Get file contents from a sentry unit. Wrap amulet file_contents
|
||||
with retry logic to address races where a file checks as existing,
|
||||
but no longer exists by the time file_contents is called.
|
||||
Return None if file not found. Optionally raise if fatal is True."""
|
||||
unit_name = sentry_unit.info['unit_name']
|
||||
file_contents = False
|
||||
tries = 0
|
||||
while not file_contents and tries < (max_wait / 4):
|
||||
try:
|
||||
file_contents = sentry_unit.file_contents(file_name)
|
||||
except IOError:
|
||||
self.log.debug('Attempt {} to open file {} from {} '
|
||||
'failed'.format(tries, file_name,
|
||||
unit_name))
|
||||
time.sleep(4)
|
||||
tries += 1
|
||||
|
||||
if file_contents:
|
||||
return file_contents
|
||||
elif not fatal:
|
||||
return None
|
||||
elif fatal:
|
||||
msg = 'Failed to get file contents from unit.'
|
||||
amulet.raise_status(amulet.FAIL, msg)
|
||||
|
||||
def port_knock_tcp(self, host="localhost", port=22, timeout=15):
|
||||
"""Open a TCP socket to check for a listening sevice on a host.
|
||||
|
||||
:param host: host name or IP address, default to localhost
|
||||
:param port: TCP port number, default to 22
|
||||
:param timeout: Connect timeout, default to 15 seconds
|
||||
:returns: True if successful, False if connect failed
|
||||
"""
|
||||
|
||||
# Resolve host name if possible
|
||||
try:
|
||||
connect_host = socket.gethostbyname(host)
|
||||
host_human = "{} ({})".format(connect_host, host)
|
||||
except socket.error as e:
|
||||
self.log.warn('Unable to resolve address: '
|
||||
'{} ({}) Trying anyway!'.format(host, e))
|
||||
connect_host = host
|
||||
host_human = connect_host
|
||||
|
||||
# Attempt socket connection
|
||||
try:
|
||||
knock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
knock.settimeout(timeout)
|
||||
knock.connect((connect_host, port))
|
||||
knock.close()
|
||||
self.log.debug('Socket connect OK for host '
|
||||
'{} on port {}.'.format(host_human, port))
|
||||
return True
|
||||
except socket.error as e:
|
||||
self.log.debug('Socket connect FAIL for'
|
||||
' {} port {} ({})'.format(host_human, port, e))
|
||||
return False
|
||||
|
||||
def port_knock_units(self, sentry_units, port=22,
|
||||
timeout=15, expect_success=True):
|
||||
"""Open a TCP socket to check for a listening sevice on each
|
||||
listed juju unit.
|
||||
|
||||
:param sentry_units: list of sentry unit pointers
|
||||
:param port: TCP port number, default to 22
|
||||
:param timeout: Connect timeout, default to 15 seconds
|
||||
:expect_success: True by default, set False to invert logic
|
||||
:returns: None if successful, Failure message otherwise
|
||||
"""
|
||||
for unit in sentry_units:
|
||||
host = unit.info['public-address']
|
||||
connected = self.port_knock_tcp(host, port, timeout)
|
||||
if not connected and expect_success:
|
||||
return 'Socket connect failed.'
|
||||
elif connected and not expect_success:
|
||||
return 'Socket connected unexpectedly.'
|
||||
|
||||
def get_uuid_epoch_stamp(self):
|
||||
"""Returns a stamp string based on uuid4 and epoch time. Useful in
|
||||
generating test messages which need to be unique-ish."""
|
||||
return '[{}-{}]'.format(uuid.uuid4(), time.time())
|
||||
|
||||
# amulet juju action helpers:
|
||||
def run_action(self, unit_sentry, action,
|
||||
_check_output=subprocess.check_output):
|
||||
"""Run the named action on a given unit sentry.
|
||||
|
@ -44,8 +44,15 @@ class OpenStackAmuletDeployment(AmuletDeployment):
|
||||
Determine if the local branch being tested is derived from its
|
||||
stable or next (dev) branch, and based on this, use the corresonding
|
||||
stable or next branches for the other_services."""
|
||||
|
||||
# Charms outside the lp:~openstack-charmers namespace
|
||||
base_charms = ['mysql', 'mongodb', 'nrpe']
|
||||
|
||||
# Force these charms to current series even when using an older series.
|
||||
# ie. Use trusty/nrpe even when series is precise, as the P charm
|
||||
# does not possess the necessary external master config and hooks.
|
||||
force_series_current = ['nrpe']
|
||||
|
||||
if self.series in ['precise', 'trusty']:
|
||||
base_series = self.series
|
||||
else:
|
||||
@ -53,11 +60,17 @@ class OpenStackAmuletDeployment(AmuletDeployment):
|
||||
|
||||
if self.stable:
|
||||
for svc in other_services:
|
||||
if svc['name'] in force_series_current:
|
||||
base_series = self.current_next
|
||||
|
||||
temp = 'lp:charms/{}/{}'
|
||||
svc['location'] = temp.format(base_series,
|
||||
svc['name'])
|
||||
else:
|
||||
for svc in other_services:
|
||||
if svc['name'] in force_series_current:
|
||||
base_series = self.current_next
|
||||
|
||||
if svc['name'] in base_charms:
|
||||
temp = 'lp:charms/{}/{}'
|
||||
svc['location'] = temp.format(base_series,
|
||||
@ -77,21 +90,23 @@ class OpenStackAmuletDeployment(AmuletDeployment):
|
||||
|
||||
services = other_services
|
||||
services.append(this_service)
|
||||
|
||||
# Charms which should use the source config option
|
||||
use_source = ['mysql', 'mongodb', 'rabbitmq-server', 'ceph',
|
||||
'ceph-osd', 'ceph-radosgw']
|
||||
# Most OpenStack subordinate charms do not expose an origin option
|
||||
# as that is controlled by the principle.
|
||||
ignore = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe']
|
||||
|
||||
# Charms which can not use openstack-origin, ie. many subordinates
|
||||
no_origin = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe']
|
||||
|
||||
if self.openstack:
|
||||
for svc in services:
|
||||
if svc['name'] not in use_source + ignore:
|
||||
if svc['name'] not in use_source + no_origin:
|
||||
config = {'openstack-origin': self.openstack}
|
||||
self.d.configure(svc['name'], config)
|
||||
|
||||
if self.source:
|
||||
for svc in services:
|
||||
if svc['name'] in use_source and svc['name'] not in ignore:
|
||||
if svc['name'] in use_source and svc['name'] not in no_origin:
|
||||
config = {'source': self.source}
|
||||
self.d.configure(svc['name'], config)
|
||||
|
||||
|
@ -27,6 +27,7 @@ import glanceclient.v1.client as glance_client
|
||||
import heatclient.v1.client as heat_client
|
||||
import keystoneclient.v2_0 as keystone_client
|
||||
import novaclient.v1_1.client as nova_client
|
||||
import pika
|
||||
import swiftclient
|
||||
|
||||
from charmhelpers.contrib.amulet.utils import (
|
||||
@ -602,3 +603,361 @@ class OpenStackAmuletUtils(AmuletUtils):
|
||||
self.log.debug('Ceph {} samples (OK): '
|
||||
'{}'.format(sample_type, samples))
|
||||
return None
|
||||
|
||||
# rabbitmq/amqp specific helpers:
|
||||
def add_rmq_test_user(self, sentry_units,
|
||||
username="testuser1", password="changeme"):
|
||||
"""Add a test user via the first rmq juju unit, check connection as
|
||||
the new user against all sentry units.
|
||||
|
||||
:param sentry_units: list of sentry unit pointers
|
||||
:param username: amqp user name, default to testuser1
|
||||
:param password: amqp user password
|
||||
:returns: None if successful. Raise on error.
|
||||
"""
|
||||
self.log.debug('Adding rmq user ({})...'.format(username))
|
||||
|
||||
# Check that user does not already exist
|
||||
cmd_user_list = 'rabbitmqctl list_users'
|
||||
output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
|
||||
if username in output:
|
||||
self.log.warning('User ({}) already exists, returning '
|
||||
'gracefully.'.format(username))
|
||||
return
|
||||
|
||||
perms = '".*" ".*" ".*"'
|
||||
cmds = ['rabbitmqctl add_user {} {}'.format(username, password),
|
||||
'rabbitmqctl set_permissions {} {}'.format(username, perms)]
|
||||
|
||||
# Add user via first unit
|
||||
for cmd in cmds:
|
||||
output, _ = self.run_cmd_unit(sentry_units[0], cmd)
|
||||
|
||||
# Check connection against the other sentry_units
|
||||
self.log.debug('Checking user connect against units...')
|
||||
for sentry_unit in sentry_units:
|
||||
connection = self.connect_amqp_by_unit(sentry_unit, ssl=False,
|
||||
username=username,
|
||||
password=password)
|
||||
connection.close()
|
||||
|
||||
def delete_rmq_test_user(self, sentry_units, username="testuser1"):
|
||||
"""Delete a rabbitmq user via the first rmq juju unit.
|
||||
|
||||
:param sentry_units: list of sentry unit pointers
|
||||
:param username: amqp user name, default to testuser1
|
||||
:param password: amqp user password
|
||||
:returns: None if successful or no such user.
|
||||
"""
|
||||
self.log.debug('Deleting rmq user ({})...'.format(username))
|
||||
|
||||
# Check that the user exists
|
||||
cmd_user_list = 'rabbitmqctl list_users'
|
||||
output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
|
||||
|
||||
if username not in output:
|
||||
self.log.warning('User ({}) does not exist, returning '
|
||||
'gracefully.'.format(username))
|
||||
return
|
||||
|
||||
# Delete the user
|
||||
cmd_user_del = 'rabbitmqctl delete_user {}'.format(username)
|
||||
output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_del)
|
||||
|
||||
def get_rmq_cluster_status(self, sentry_unit):
|
||||
"""Execute rabbitmq cluster status command on a unit and return
|
||||
the full output.
|
||||
|
||||
:param unit: sentry unit
|
||||
:returns: String containing console output of cluster status command
|
||||
"""
|
||||
cmd = 'rabbitmqctl cluster_status'
|
||||
output, _ = self.run_cmd_unit(sentry_unit, cmd)
|
||||
self.log.debug('{} cluster_status:\n{}'.format(
|
||||
sentry_unit.info['unit_name'], output))
|
||||
return str(output)
|
||||
|
||||
def get_rmq_cluster_running_nodes(self, sentry_unit):
|
||||
"""Parse rabbitmqctl cluster_status output string, return list of
|
||||
running rabbitmq cluster nodes.
|
||||
|
||||
:param unit: sentry unit
|
||||
:returns: List containing node names of running nodes
|
||||
"""
|
||||
# NOTE(beisner): rabbitmqctl cluster_status output is not
|
||||
# json-parsable, do string chop foo, then json.loads that.
|
||||
str_stat = self.get_rmq_cluster_status(sentry_unit)
|
||||
if 'running_nodes' in str_stat:
|
||||
pos_start = str_stat.find("{running_nodes,") + 15
|
||||
pos_end = str_stat.find("]},", pos_start) + 1
|
||||
str_run_nodes = str_stat[pos_start:pos_end].replace("'", '"')
|
||||
run_nodes = json.loads(str_run_nodes)
|
||||
return run_nodes
|
||||
else:
|
||||
return []
|
||||
|
||||
def validate_rmq_cluster_running_nodes(self, sentry_units):
|
||||
"""Check that all rmq unit hostnames are represented in the
|
||||
cluster_status output of all units.
|
||||
|
||||
:param host_names: dict of juju unit names to host names
|
||||
:param units: list of sentry unit pointers (all rmq units)
|
||||
:returns: None if successful, otherwise return error message
|
||||
"""
|
||||
host_names = self.get_unit_hostnames(sentry_units)
|
||||
errors = []
|
||||
|
||||
# Query every unit for cluster_status running nodes
|
||||
for query_unit in sentry_units:
|
||||
query_unit_name = query_unit.info['unit_name']
|
||||
running_nodes = self.get_rmq_cluster_running_nodes(query_unit)
|
||||
|
||||
# Confirm that every unit is represented in the queried unit's
|
||||
# cluster_status running nodes output.
|
||||
for validate_unit in sentry_units:
|
||||
val_host_name = host_names[validate_unit.info['unit_name']]
|
||||
val_node_name = 'rabbit@{}'.format(val_host_name)
|
||||
|
||||
if val_node_name not in running_nodes:
|
||||
errors.append('Cluster member check failed on {}: {} not '
|
||||
'in {}\n'.format(query_unit_name,
|
||||
val_node_name,
|
||||
running_nodes))
|
||||
if errors:
|
||||
return ''.join(errors)
|
||||
|
||||
def rmq_ssl_is_enabled_on_unit(self, sentry_unit, port=None):
|
||||
"""Check a single juju rmq unit for ssl and port in the config file."""
|
||||
host = sentry_unit.info['public-address']
|
||||
unit_name = sentry_unit.info['unit_name']
|
||||
|
||||
conf_file = '/etc/rabbitmq/rabbitmq.config'
|
||||
conf_contents = str(self.file_contents_safe(sentry_unit,
|
||||
conf_file, max_wait=16))
|
||||
# Checks
|
||||
conf_ssl = 'ssl' in conf_contents
|
||||
conf_port = str(port) in conf_contents
|
||||
|
||||
# Port explicitly checked in config
|
||||
if port and conf_port and conf_ssl:
|
||||
self.log.debug('SSL is enabled @{}:{} '
|
||||
'({})'.format(host, port, unit_name))
|
||||
return True
|
||||
elif port and not conf_port and conf_ssl:
|
||||
self.log.debug('SSL is enabled @{} but not on port {} '
|
||||
'({})'.format(host, port, unit_name))
|
||||
return False
|
||||
# Port not checked (useful when checking that ssl is disabled)
|
||||
elif not port and conf_ssl:
|
||||
self.log.debug('SSL is enabled @{}:{} '
|
||||
'({})'.format(host, port, unit_name))
|
||||
return True
|
||||
elif not port and not conf_ssl:
|
||||
self.log.debug('SSL not enabled @{}:{} '
|
||||
'({})'.format(host, port, unit_name))
|
||||
return False
|
||||
else:
|
||||
msg = ('Unknown condition when checking SSL status @{}:{} '
|
||||
'({})'.format(host, port, unit_name))
|
||||
amulet.raise_status(amulet.FAIL, msg)
|
||||
|
||||
def validate_rmq_ssl_enabled_units(self, sentry_units, port=None):
|
||||
"""Check that ssl is enabled on rmq juju sentry units.
|
||||
|
||||
:param sentry_units: list of all rmq sentry units
|
||||
:param port: optional ssl port override to validate
|
||||
:returns: None if successful, otherwise return error message
|
||||
"""
|
||||
for sentry_unit in sentry_units:
|
||||
if not self.rmq_ssl_is_enabled_on_unit(sentry_unit, port=port):
|
||||
return ('Unexpected condition: ssl is disabled on unit '
|
||||
'({})'.format(sentry_unit.info['unit_name']))
|
||||
return None
|
||||
|
||||
def validate_rmq_ssl_disabled_units(self, sentry_units):
|
||||
"""Check that ssl is enabled on listed rmq juju sentry units.
|
||||
|
||||
:param sentry_units: list of all rmq sentry units
|
||||
:returns: True if successful. Raise on error.
|
||||
"""
|
||||
for sentry_unit in sentry_units:
|
||||
if self.rmq_ssl_is_enabled_on_unit(sentry_unit):
|
||||
return ('Unexpected condition: ssl is enabled on unit '
|
||||
'({})'.format(sentry_unit.info['unit_name']))
|
||||
return None
|
||||
|
||||
def configure_rmq_ssl_on(self, sentry_units, deployment,
|
||||
port=None, max_wait=60):
|
||||
"""Turn ssl charm config option on, with optional non-default
|
||||
ssl port specification. Confirm that it is enabled on every
|
||||
unit.
|
||||
|
||||
:param sentry_units: list of sentry units
|
||||
:param deployment: amulet deployment object pointer
|
||||
:param port: amqp port, use defaults if None
|
||||
:param max_wait: maximum time to wait in seconds to confirm
|
||||
:returns: None if successful. Raise on error.
|
||||
"""
|
||||
self.log.debug('Setting ssl charm config option: on')
|
||||
|
||||
# Enable RMQ SSL
|
||||
config = {'ssl': 'on'}
|
||||
if port:
|
||||
config['ssl_port'] = port
|
||||
|
||||
deployment.configure('rabbitmq-server', config)
|
||||
|
||||
# Confirm
|
||||
tries = 0
|
||||
ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
|
||||
while ret and tries < (max_wait / 4):
|
||||
time.sleep(4)
|
||||
self.log.debug('Attempt {}: {}'.format(tries, ret))
|
||||
ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
|
||||
tries += 1
|
||||
|
||||
if ret:
|
||||
amulet.raise_status(amulet.FAIL, ret)
|
||||
|
||||
def configure_rmq_ssl_off(self, sentry_units, deployment, max_wait=60):
|
||||
"""Turn ssl charm config option off, confirm that it is disabled
|
||||
on every unit.
|
||||
|
||||
:param sentry_units: list of sentry units
|
||||
:param deployment: amulet deployment object pointer
|
||||
:param max_wait: maximum time to wait in seconds to confirm
|
||||
:returns: None if successful. Raise on error.
|
||||
"""
|
||||
self.log.debug('Setting ssl charm config option: off')
|
||||
|
||||
# Disable RMQ SSL
|
||||
config = {'ssl': 'off'}
|
||||
deployment.configure('rabbitmq-server', config)
|
||||
|
||||
# Confirm
|
||||
tries = 0
|
||||
ret = self.validate_rmq_ssl_disabled_units(sentry_units)
|
||||
while ret and tries < (max_wait / 4):
|
||||
time.sleep(4)
|
||||
self.log.debug('Attempt {}: {}'.format(tries, ret))
|
||||
ret = self.validate_rmq_ssl_disabled_units(sentry_units)
|
||||
tries += 1
|
||||
|
||||
if ret:
|
||||
amulet.raise_status(amulet.FAIL, ret)
|
||||
|
||||
def connect_amqp_by_unit(self, sentry_unit, ssl=False,
|
||||
port=None, fatal=True,
|
||||
username="testuser1", password="changeme"):
|
||||
"""Establish and return a pika amqp connection to the rabbitmq service
|
||||
running on a rmq juju unit.
|
||||
|
||||
:param sentry_unit: sentry unit pointer
|
||||
:param ssl: boolean, default to False
|
||||
:param port: amqp port, use defaults if None
|
||||
:param fatal: boolean, default to True (raises on connect error)
|
||||
:param username: amqp user name, default to testuser1
|
||||
:param password: amqp user password
|
||||
:returns: pika amqp connection pointer or None if failed and non-fatal
|
||||
"""
|
||||
host = sentry_unit.info['public-address']
|
||||
unit_name = sentry_unit.info['unit_name']
|
||||
|
||||
# Default port logic if port is not specified
|
||||
if ssl and not port:
|
||||
port = 5671
|
||||
elif not ssl and not port:
|
||||
port = 5672
|
||||
|
||||
self.log.debug('Connecting to amqp on {}:{} ({}) as '
|
||||
'{}...'.format(host, port, unit_name, username))
|
||||
|
||||
try:
|
||||
credentials = pika.PlainCredentials(username, password)
|
||||
parameters = pika.ConnectionParameters(host=host, port=port,
|
||||
credentials=credentials,
|
||||
ssl=ssl,
|
||||
connection_attempts=3,
|
||||
retry_delay=5,
|
||||
socket_timeout=1)
|
||||
connection = pika.BlockingConnection(parameters)
|
||||
assert connection.server_properties['product'] == 'RabbitMQ'
|
||||
self.log.debug('Connect OK')
|
||||
return connection
|
||||
except Exception as e:
|
||||
msg = ('amqp connection failed to {}:{} as '
|
||||
'{} ({})'.format(host, port, username, str(e)))
|
||||
if fatal:
|
||||
amulet.raise_status(amulet.FAIL, msg)
|
||||
else:
|
||||
self.log.warn(msg)
|
||||
return None
|
||||
|
||||
def publish_amqp_message_by_unit(self, sentry_unit, message,
|
||||
queue="test", ssl=False,
|
||||
username="testuser1",
|
||||
password="changeme",
|
||||
port=None):
|
||||
"""Publish an amqp message to a rmq juju unit.
|
||||
|
||||
:param sentry_unit: sentry unit pointer
|
||||
:param message: amqp message string
|
||||
:param queue: message queue, default to test
|
||||
:param username: amqp user name, default to testuser1
|
||||
:param password: amqp user password
|
||||
:param ssl: boolean, default to False
|
||||
:param port: amqp port, use defaults if None
|
||||
:returns: None. Raises exception if publish failed.
|
||||
"""
|
||||
self.log.debug('Publishing message to {} queue:\n{}'.format(queue,
|
||||
message))
|
||||
connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
|
||||
port=port,
|
||||
username=username,
|
||||
password=password)
|
||||
|
||||
# NOTE(beisner): extra debug here re: pika hang potential:
|
||||
# https://github.com/pika/pika/issues/297
|
||||
# https://groups.google.com/forum/#!topic/rabbitmq-users/Ja0iyfF0Szw
|
||||
self.log.debug('Defining channel...')
|
||||
channel = connection.channel()
|
||||
self.log.debug('Declaring queue...')
|
||||
channel.queue_declare(queue=queue, auto_delete=False, durable=True)
|
||||
self.log.debug('Publishing message...')
|
||||
channel.basic_publish(exchange='', routing_key=queue, body=message)
|
||||
self.log.debug('Closing channel...')
|
||||
channel.close()
|
||||
self.log.debug('Closing connection...')
|
||||
connection.close()
|
||||
|
||||
def get_amqp_message_by_unit(self, sentry_unit, queue="test",
|
||||
username="testuser1",
|
||||
password="changeme",
|
||||
ssl=False, port=None):
|
||||
"""Get an amqp message from a rmq juju unit.
|
||||
|
||||
:param sentry_unit: sentry unit pointer
|
||||
:param queue: message queue, default to test
|
||||
:param username: amqp user name, default to testuser1
|
||||
:param password: amqp user password
|
||||
:param ssl: boolean, default to False
|
||||
:param port: amqp port, use defaults if None
|
||||
:returns: amqp message body as string. Raise if get fails.
|
||||
"""
|
||||
connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
|
||||
port=port,
|
||||
username=username,
|
||||
password=password)
|
||||
channel = connection.channel()
|
||||
method_frame, _, body = channel.basic_get(queue)
|
||||
|
||||
if method_frame:
|
||||
self.log.debug('Retreived message from {} queue:\n{}'.format(queue,
|
||||
body))
|
||||
channel.basic_ack(method_frame.delivery_tag)
|
||||
channel.close()
|
||||
connection.close()
|
||||
return body
|
||||
else:
|
||||
msg = 'No message retrieved.'
|
||||
amulet.raise_status(amulet.FAIL, msg)
|
||||
|
@ -1,5 +1,4 @@
|
||||
from mock import call, patch, MagicMock
|
||||
import json
|
||||
import os
|
||||
import yaml
|
||||
|
||||
@ -392,42 +391,43 @@ class GlanceRelationTests(CharmTestCase):
|
||||
'Could not create ceph keyring: peer not ready?'
|
||||
)
|
||||
|
||||
@patch("hooks.glance_relations.relation_set")
|
||||
@patch("hooks.glance_relations.relation_get")
|
||||
@patch.object(relations, 'get_ceph_request')
|
||||
@patch.object(relations, 'send_request_if_needed')
|
||||
@patch.object(relations, 'is_request_complete')
|
||||
@patch.object(relations, 'CONFIGS')
|
||||
def test_ceph_changed_broker_send_rq(self, configs, mock_relation_get,
|
||||
mock_relation_set):
|
||||
configs.complete_contexts.return_value = ['ceph']
|
||||
def test_ceph_changed_broker_send_rq(self, configs, mock_request_complete,
|
||||
mock_send_request_if_needed,
|
||||
mock_get_ceph_request):
|
||||
self.service_name.return_value = 'glance'
|
||||
configs.complete_contexts = MagicMock()
|
||||
configs.complete_contexts.return_value = ['ceph']
|
||||
mock_get_ceph_request.return_value = 'cephrq'
|
||||
self.ensure_ceph_keyring.return_value = True
|
||||
self.relation_ids.return_value = ['ceph:0']
|
||||
mock_request_complete.return_value = False
|
||||
relations.hooks.execute(['hooks/ceph-relation-changed'])
|
||||
self.ensure_ceph_keyring.assert_called_with(service='glance',
|
||||
user='glance',
|
||||
group='glance')
|
||||
req = {'api-version': 1,
|
||||
'ops': [{"op": "create-pool", "name": "glance", "replicas": 3}]}
|
||||
broker_dict = json.dumps(req)
|
||||
mock_relation_set.assert_called_with(broker_req=broker_dict,
|
||||
relation_id='ceph:0')
|
||||
for c in [call('/etc/glance/glance.conf')]:
|
||||
self.assertNotIn(c, configs.write.call_args_list)
|
||||
|
||||
@patch("charmhelpers.core.host.service")
|
||||
@patch("hooks.glance_relations.relation_get", autospec=True)
|
||||
@patch.object(relations, 'get_ceph_request')
|
||||
@patch.object(relations, 'send_request_if_needed')
|
||||
@patch.object(relations, 'is_request_complete')
|
||||
@patch.object(relations, 'CONFIGS')
|
||||
def test_ceph_changed_with_key_and_relation_data(self, configs,
|
||||
mock_relation_get,
|
||||
mock_service):
|
||||
def test_ceph_changed_key_and_relation_data(self, configs,
|
||||
mock_request_complete,
|
||||
mock_send_request_if_needed,
|
||||
mock_service):
|
||||
configs.complete_contexts = MagicMock()
|
||||
configs.complete_contexts.return_value = ['ceph']
|
||||
configs.write = MagicMock()
|
||||
self.ensure_ceph_keyring.return_value = True
|
||||
mock_relation_get.return_value = {'broker_rsp':
|
||||
json.dumps({'exit-code': 0})}
|
||||
mock_request_complete.return_value = True
|
||||
self.ceph_config_file.return_value = '/etc/ceph/ceph.conf'
|
||||
relations.ceph_changed()
|
||||
self.assertEquals([call('/etc/glance/glance-api.conf'),
|
||||
call(self.ceph_config_file())],
|
||||
call('/etc/ceph/ceph.conf')],
|
||||
configs.write.call_args_list)
|
||||
self.service_restart.assert_called_with('glance-api')
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user