Add support for leader-election

James Page 2015-06-09 10:57:24 +01:00
commit 5d0844e2bc
11 changed files with 251 additions and 45 deletions


@@ -44,6 +44,7 @@ from charmhelpers.core.hookenv import (
ERROR,
WARNING,
unit_get,
is_leader as juju_is_leader
)
from charmhelpers.core.decorators import (
retry_on_exception,
@@ -52,6 +53,8 @@ from charmhelpers.core.strutils import (
bool_from_string,
)
DC_RESOURCE_NAME = 'DC'
class HAIncompleteConfig(Exception):
pass
@@ -66,12 +69,21 @@ def is_elected_leader(resource):
Returns True if the charm executing this is the elected cluster leader.
It relies on three mechanisms to determine leadership:
1. If the charm is part of a corosync cluster, call corosync to
1. If juju is sufficiently new and leadership election is supported,
the is_leader command will be used.
2. If the charm is part of a corosync cluster, call corosync to
determine leadership.
2. If the charm is not part of a corosync cluster, the leader is
3. If the charm is not part of a corosync cluster, the leader is
determined as being "the alive unit with the lowest unit number". In
other words, the oldest surviving unit.
"""
try:
return juju_is_leader()
except NotImplementedError:
log('Juju leadership election feature not enabled'
', using fallback support',
level=WARNING)
if is_clustered():
if not is_crm_leader(resource):
log('Deferring action to CRM leader.', level=INFO)
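The net effect: the Juju leadership primitive is tried first, and only when the running Juju does not implement it (NotImplementedError, via translate_exc below) do the corosync/oldest-unit heuristics apply. A minimal sketch of a hook consuming this, assuming the import path shown above; the resource name and migrate_database() helper are hypothetical:

    from charmhelpers.contrib.hahelpers.cluster import is_elected_leader

    CLUSTER_RES = 'grp_cinder_vips'  # hypothetical corosync resource name

    def shared_db_relation_changed():
        # Only the elected leader runs one-shot operations such as a
        # database migration; all other units just rewrite config files.
        if is_elected_leader(CLUSTER_RES):
            migrate_database()  # hypothetical helper, cf. the cinder hunks below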
@@ -95,6 +107,27 @@ def is_clustered():
return False
def is_crm_dc():
"""
Determine leadership by querying the pacemaker Designated Controller
"""
cmd = ['crm', 'status']
try:
status = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
if not isinstance(status, six.text_type):
status = six.text_type(status, "utf-8")
except subprocess.CalledProcessError:
return False
current_dc = ''
for line in status.split('\n'):
if line.startswith('Current DC'):
# Current DC: juju-lytrusty-machine-2 (168108163) - partition with quorum
current_dc = line.split(':')[1].split()[0]
if current_dc == get_unit_hostname():
return True
return False
@retry_on_exception(5, base_delay=2, exc_type=CRMResourceNotFound)
def is_crm_leader(resource, retry=False):
"""
@@ -104,6 +137,8 @@ def is_crm_leader(resource, retry=False):
We allow this operation to be retried to avoid the possibility of getting a
false negative. See LP #1396246 for more info.
"""
if resource == DC_RESOURCE_NAME:
return is_crm_dc()
cmd = ['crm', 'resource', 'show', resource]
try:
status = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
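Passing the DC_RESOURCE_NAME sentinel routes the query to is_crm_dc(), which parses the 'Current DC:' line of `crm status` output and compares it with this unit's hostname; any other resource name falls through to `crm resource show`. A hedged usage sketch:

    from charmhelpers.contrib.hahelpers.cluster import (
        DC_RESOURCE_NAME,
        is_crm_leader,
    )

    # True only on the unit pacemaker has elected Designated Controller.
    if is_crm_leader(DC_RESOURCE_NAME):
        print('this unit is the pacemaker DC')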


@@ -256,11 +256,14 @@ def network_manager():
def parse_mappings(mappings):
parsed = {}
if mappings:
mappings = mappings.split(' ')
mappings = mappings.split()
for m in mappings:
p = m.partition(':')
if p[1] == ':':
parsed[p[0].strip()] = p[2].strip()
key = p[0].strip()
if p[1]:
parsed[key] = p[2].strip()
else:
parsed[key] = ''
return parsed
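After this change, mappings split on any run of whitespace and a key given without a ':<value>' suffix is recorded with an empty value rather than dropped. Expected behaviour as a doctest-style sketch (inputs illustrative; dict ordering shown for readability):

    >>> parse_mappings('physnet1:br-ex  physnet2:br-data')
    {'physnet1': 'br-ex', 'physnet2': 'br-data'}
    >>> parse_mappings('physnet1')            # no colon -> empty value
    {'physnet1': ''}
    >>> parse_mappings(None)
    {}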
@@ -283,13 +286,13 @@ def parse_data_port_mappings(mappings, default_bridge='br-data'):
Returns dict of the form {bridge:port}.
"""
_mappings = parse_mappings(mappings)
if not _mappings:
if not _mappings or list(_mappings.values()) == ['']:
if not mappings:
return {}
# For backwards-compatibility we need to support a port-only value
# provided in config.
_mappings = {default_bridge: mappings.split(' ')[0]}
_mappings = {default_bridge: mappings.split()[0]}
bridges = _mappings.keys()
ports = _mappings.values()
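The empty-value case from parse_mappings() feeds the backwards-compatibility branch above: a bare port with no bridge prefix is attached to default_bridge. Illustrative behaviour, assuming well-formed input:

    >>> parse_data_port_mappings('br-ex:eth0')
    {'br-ex': 'eth0'}
    >>> parse_data_port_mappings('eth0')      # legacy port-only config
    {'br-data': 'eth0'}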
@@ -309,6 +312,8 @@ def parse_vlan_range_mappings(mappings):
Mappings must be a space-delimited list of provider:start:end mappings.
The start:end range is optional and may be omitted.
Returns dict of the form {provider: (start, end)}.
"""
_mappings = parse_mappings(mappings)


@@ -21,12 +21,14 @@
# Charm Helpers Developers <juju@lists.ubuntu.com>
from __future__ import print_function
from functools import wraps
import os
import json
import yaml
import subprocess
import sys
import errno
import tempfile
from subprocess import CalledProcessError
import six
@@ -58,12 +60,14 @@ def cached(func):
will cache the result of unit_get + 'test' for future calls.
"""
@wraps(func)
def wrapper(*args, **kwargs):
global cache
key = str((func, args, kwargs))
try:
return cache[key]
except KeyError:
pass # Drop out of the exception handler scope.
res = func(*args, **kwargs)
cache[key] = res
return res
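Because the cache key is str((func, args, kwargs)), identical calls within a single hook execution are served from the module-level cache instead of shelling out again. A small self-contained check (the decorated function is hypothetical):

    from charmhelpers.core.hookenv import cached

    calls = []

    @cached
    def lookup(attribute):
        calls.append(attribute)           # side effect to observe caching
        return 'value-for-' + attribute

    lookup('private-address')
    lookup('private-address')
    assert calls == ['private-address']   # second call hit the cache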
@@ -178,7 +182,7 @@ def local_unit():
def remote_unit():
"""The remote unit for the current relation hook"""
return os.environ['JUJU_REMOTE_UNIT']
return os.environ.get('JUJU_REMOTE_UNIT', None)
def service_name():
@@ -250,6 +254,12 @@ class Config(dict):
except KeyError:
return (self._prev_dict or {})[key]
def get(self, key, default=None):
try:
return self[key]
except KeyError:
return default
def keys(self):
prev_keys = []
if self._prev_dict is not None:
@@ -353,18 +363,49 @@ def relation_set(relation_id=None, relation_settings=None, **kwargs):
"""Set relation information for the current unit"""
relation_settings = relation_settings if relation_settings else {}
relation_cmd_line = ['relation-set']
accepts_file = "--file" in subprocess.check_output(
relation_cmd_line + ["--help"], universal_newlines=True)
if relation_id is not None:
relation_cmd_line.extend(('-r', relation_id))
for k, v in (list(relation_settings.items()) + list(kwargs.items())):
if v is None:
relation_cmd_line.append('{}='.format(k))
settings = relation_settings.copy()
settings.update(kwargs)
for key, value in settings.items():
# Force value to be a string: it always should be, but some call
# sites pass in things like dicts or numbers.
if value is not None:
settings[key] = "{}".format(value)
if accepts_file:
# --file was introduced in Juju 1.23.2. Use it by default if
# available, since otherwise we'll break if the relation data is
# too big. Ideally we should tell relation-set to read the data from
# stdin, but that feature is broken in 1.23.2: Bug #1454678.
with tempfile.NamedTemporaryFile(delete=False) as settings_file:
settings_file.write(yaml.safe_dump(settings).encode("utf-8"))
subprocess.check_call(
relation_cmd_line + ["--file", settings_file.name])
os.remove(settings_file.name)
else:
relation_cmd_line.append('{}={}'.format(k, v))
for key, value in settings.items():
if value is None:
relation_cmd_line.append('{}='.format(key))
else:
relation_cmd_line.append('{}={}'.format(key, value))
subprocess.check_call(relation_cmd_line)
# Flush cache of any relation-gets for local unit
flush(local_unit())
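In short: values are coerced to strings, None unsets a key, and on Juju >= 1.23.2 the settings travel via a temporary YAML file (--file) so large payloads do not overflow the command line. A hedged usage sketch (relation id and keys illustrative):

    from charmhelpers.core.hookenv import relation_set

    relation_set(relation_id='shared-db:7',
                 database='cinder',
                 username='cinder',
                 hostname=None)   # None clears any previously set value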
def relation_clear(r_id=None):
''' Clears any relation data already set on relation r_id '''
settings = relation_get(rid=r_id,
unit=local_unit())
for setting in settings:
if setting not in ['public-address', 'private-address']:
settings[setting] = None
relation_set(relation_id=r_id,
**settings)
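relation_clear() leans on that None-unsets behaviour: it reads back everything this unit has set on the relation and nulls every key except the address keys. Usage sketch (relation id illustrative):

    from charmhelpers.core.hookenv import relation_clear

    relation_clear('cluster:0')   # wipes all non-address settings we set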
@cached
def relation_ids(reltype=None):
"""A list of relation_ids"""
@@ -509,6 +550,11 @@ def unit_get(attribute):
return None
def unit_public_ip():
"""Get this unit's public IP address"""
return unit_get('public-address')
def unit_private_ip():
"""Get this unit's private IP address"""
return unit_get('private-address')
@@ -605,3 +651,94 @@ def action_fail(message):
The results set by action_set are preserved."""
subprocess.check_call(['action-fail', message])
def status_set(workload_state, message):
"""Set the workload state with a message
Use status-set to set the workload state with a message which is visible
to the user via juju status. If the status-set command is not found then
assume this is juju < 1.23 and juju-log the message instead.
workload_state -- valid juju workload state.
message -- status update message
"""
valid_states = ['maintenance', 'blocked', 'waiting', 'active']
if workload_state not in valid_states:
raise ValueError(
'{!r} is not a valid workload state'.format(workload_state)
)
cmd = ['status-set', workload_state, message]
try:
ret = subprocess.call(cmd)
if ret == 0:
return
except OSError as e:
if e.errno != errno.ENOENT:
raise
log_message = 'status-set failed: {} {}'.format(workload_state,
message)
log(log_message, level='INFO')
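A typical call, which shows up in `juju status` on new-enough Juju and degrades to a log entry otherwise (message illustrative):

    from charmhelpers.core.hookenv import status_set

    status_set('blocked', 'Missing relation: database')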
def status_get():
"""Retrieve the previously set juju workload state
If the status-set command is not found then assume this is juju < 1.23 and
return 'unknown'
"""
cmd = ['status-get']
try:
raw_status = subprocess.check_output(cmd, universal_newlines=True)
status = raw_status.rstrip()
return status
except OSError as e:
if e.errno == errno.ENOENT:
return 'unknown'
else:
raise
def translate_exc(from_exc, to_exc):
def inner_translate_exc1(f):
def inner_translate_exc2(*args, **kwargs):
try:
return f(*args, **kwargs)
except from_exc:
raise to_exc
return inner_translate_exc2
return inner_translate_exc1
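translate_exc() is what makes the graceful degradation above possible: when a hook tool binary is absent on older Juju, subprocess raises OSError (ENOENT), which callers see as NotImplementedError and can catch, as is_elected_leader() does. A sketch using a hypothetical tool name:

    import subprocess

    @translate_exc(from_exc=OSError, to_exc=NotImplementedError)
    def run_new_tool():
        # 'some-new-hook-tool' is hypothetical; if the binary is missing,
        # the OSError surfaces to callers as NotImplementedError instead.
        return subprocess.check_output(['some-new-hook-tool'])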
@translate_exc(from_exc=OSError, to_exc=NotImplementedError)
def is_leader():
"""Does the current unit hold the juju leadership
Uses juju to determine whether the current unit is the leader of its peers
"""
cmd = ['is-leader', '--format=json']
return json.loads(subprocess.check_output(cmd).decode('UTF-8'))
@translate_exc(from_exc=OSError, to_exc=NotImplementedError)
def leader_get(attribute=None):
"""Juju leader get value(s)"""
cmd = ['leader-get', '--format=json'] + [attribute or '-']
return json.loads(subprocess.check_output(cmd).decode('UTF-8'))
@translate_exc(from_exc=OSError, to_exc=NotImplementedError)
def leader_set(settings=None, **kwargs):
"""Juju leader set value(s)"""
log("Juju leader-set '%s'" % (settings), level=DEBUG)
cmd = ['leader-set']
settings = settings or {}
settings.update(kwargs)
for k, v in settings.items():
if v is None:
cmd.append('{}='.format(k))
else:
cmd.append('{}={}'.format(k, v))
subprocess.check_call(cmd)
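Only the elected leader may call leader-set; every unit may read the shared values back with leader-get (peers are notified via the leader-settings-changed hook). Usage sketch (key and value illustrative):

    from charmhelpers.core.hookenv import is_leader, leader_get, leader_set

    if is_leader():
        leader_set({'db-password': 'example-secret'})
    else:
        password = leader_get('db-password')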


@@ -90,7 +90,7 @@ def service_available(service_name):
['service', service_name, 'status'],
stderr=subprocess.STDOUT).decode('UTF-8')
except subprocess.CalledProcessError as e:
return 'unrecognized service' not in e.output
return b'unrecognized service' not in e.output
else:
return True


@@ -15,9 +15,9 @@
# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
import os
import re
import json
from collections import Iterable
from inspect import getargspec
from collections import Iterable, OrderedDict
from charmhelpers.core import host
from charmhelpers.core import hookenv
@@ -119,7 +119,7 @@ class ServiceManager(object):
"""
self._ready_file = os.path.join(hookenv.charm_dir(), 'READY-SERVICES.json')
self._ready = None
self.services = {}
self.services = OrderedDict()
for service in services or []:
service_name = service['service']
self.services[service_name] = service
@@ -132,8 +132,8 @@ class ServiceManager(object):
if hook_name == 'stop':
self.stop_services()
else:
self.provide_data()
self.reconfigure_services()
self.provide_data()
cfg = hookenv.config()
if cfg.implicit_save:
cfg.save()
@@ -145,15 +145,36 @@ class ServiceManager(object):
A provider must have a `name` attribute, which indicates which relation
to set data on, and a `provide_data()` method, which returns a dict of
data to set.
The `provide_data()` method can optionally accept two parameters:
* ``remote_service`` The name of the remote service that the data will
be provided to. The `provide_data()` method will be called once
for each connected service (not unit). This allows the method to
tailor its data to the given service.
* ``service_ready`` Whether or not the service definition had all of
its requirements met, and thus the ``data_ready`` callbacks run.
Note that the ``provide_data()`` methods are now called **after** the
``data_ready`` callbacks are run. This gives the ``data_ready`` callbacks
a chance to generate any data needed for providing to the remote
services.
"""
hook_name = hookenv.hook_name()
for service in self.services.values():
for service_name, service in self.services.items():
service_ready = self.is_ready(service_name)
for provider in service.get('provided_data', []):
if re.match(r'{}-relation-(joined|changed)'.format(provider.name), hook_name):
for relid in hookenv.relation_ids(provider.name):
units = hookenv.related_units(relid)
if not units:
continue
remote_service = units[0].split('/')[0]
argspec = getargspec(provider.provide_data)
if len(argspec.args) > 1:
data = provider.provide_data(remote_service, service_ready)
else:
data = provider.provide_data()
_ready = provider._is_ready(data) if hasattr(provider, '_is_ready') else data
if _ready:
hookenv.relation_set(None, data)
if data:
hookenv.relation_set(relid, data)
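A provider taking advantage of the new optional signature might look like the following hypothetical class; getargspec() on the bound method sees ['self', 'remote_service', 'service_ready'], so the two-argument form is selected:

    class DatabaseProvider(object):
        name = 'shared-db'   # hypothetical relation name

        def provide_data(self, remote_service, service_ready):
            # Called once per connected remote service (not per unit);
            # remote_service is e.g. 'cinder' for units cinder/0, cinder/1.
            if not service_ready:
                return {}
            return {'allowed-service': remote_service}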
def reconfigure_services(self, *service_names):
"""


@@ -158,7 +158,7 @@ def filter_installed_packages(packages):
def apt_cache(in_memory=True):
"""Build and return an apt cache"""
import apt_pkg
from apt import apt_pkg
apt_pkg.init()
if in_memory:
apt_pkg.config.set("Dir::Cache::pkgcache", "")


@@ -72,7 +72,7 @@ from charmhelpers.contrib.storage.linux.ceph import (
)
from charmhelpers.contrib.hahelpers.cluster import (
eligible_leader,
is_elected_leader,
get_hacluster_config,
)
@@ -192,7 +192,7 @@ def db_changed():
juju_log('shared-db relation incomplete. Peer not ready?')
return
CONFIGS.write(CINDER_CONF)
if eligible_leader(CLUSTER_RES):
if is_elected_leader(CLUSTER_RES):
# Bugs 1353135 & 1187508. Dbs can appear to be ready before the units
# acl entry has been added. So, if the db supports passing a list of
# permitted units then check if we're in the list.
@@ -212,7 +212,7 @@ def pgsql_db_changed():
juju_log('pgsql-db relation incomplete. Peer not ready?')
return
CONFIGS.write(CINDER_CONF)
if eligible_leader(CLUSTER_RES):
if is_elected_leader(CLUSTER_RES):
juju_log('Cluster leader, performing db sync')
migrate_database()


@@ -45,7 +45,7 @@ from charmhelpers.core.host import (
from charmhelpers.contrib.openstack.alternatives import install_alternative
from charmhelpers.contrib.hahelpers.cluster import (
eligible_leader,
is_elected_leader,
)
from charmhelpers.contrib.storage.linux.utils import (
@@ -83,6 +83,9 @@ from charmhelpers.contrib.openstack.utils import (
os_release,
)
from charmhelpers.core.decorators import (
retry_on_exception,
)
from charmhelpers.core.templating import render
import cinder_contexts
@@ -488,6 +491,9 @@ def check_db_initialised():
relation_set(**{CINDER_DB_INIT_ECHO_RKEY: init_id})
# NOTE(jamespage): Retry deals with sync issues during one-shot HA deploys.
# mysql might be restarting or suchlike.
@retry_on_exception(5, base_delay=3, exc_type=subprocess.CalledProcessError)
def migrate_database():
'Runs cinder-manage to initialize a new database or migrate an existing one'
cmd = ['cinder-manage', 'db', 'sync']
@@ -542,7 +548,7 @@ def do_openstack_upgrade(configs):
# Stop/start services and migrate DB if leader
[service_stop(s) for s in services()]
if eligible_leader(CLUSTER_RES):
if is_elected_leader(CLUSTER_RES):
migrate_database()
[service_start(s) for s in services()]
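retry_on_exception() retries the decorated call up to the given number of times, with an increasing delay between attempts, which rides out a mysql restart mid-deploy. A minimal sketch of the same pattern:

    import subprocess

    from charmhelpers.core.decorators import retry_on_exception

    @retry_on_exception(5, base_delay=3,
                        exc_type=subprocess.CalledProcessError)
    def sync_db():
        # The exception is re-raised only after the retries are exhausted.
        subprocess.check_call(['cinder-manage', 'db', 'sync'])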


@@ -62,13 +62,14 @@ TO_PATCH = [
'apt_install',
'apt_update',
'service_reload',
'service_restart',
# charmhelpers.contrib.openstack.openstack_utils
'configure_installation_source',
'openstack_upgrade_available',
'os_release',
# charmhelpers.contrib.hahelpers.cluster_utils
'canonical_url',
'eligible_leader',
'is_elected_leader',
'get_hacluster_config',
'execd_preinstall',
'get_ipv6_addr',
@@ -240,7 +241,7 @@ class TestChangedHooks(CharmTestCase):
self.relation_get.return_value = 'cinder/1 cinder/2'
self.local_unit.return_value = 'cinder/0'
self.CONFIGS.complete_contexts.return_value = ['shared-db']
self.eligible_leader.return_value = True
self.is_elected_leader.return_value = True
hooks.hooks.execute(['hooks/shared-db-relation-changed'])
self.assertFalse(self.migrate_database.called)
@@ -249,7 +250,7 @@ class TestChangedHooks(CharmTestCase):
self.relation_get.return_value = None
self.local_unit.return_value = 'cinder/0'
self.CONFIGS.complete_contexts.return_value = ['shared-db']
self.eligible_leader.return_value = True
self.is_elected_leader.return_value = True
hooks.hooks.execute(['hooks/shared-db-relation-changed'])
self.assertFalse(self.migrate_database.called)
@@ -263,7 +264,7 @@ class TestChangedHooks(CharmTestCase):
'It does not migrate database when not leader'
self.relation_get.return_value = 'cinder/0 cinder/1'
self.local_unit.return_value = 'cinder/0'
self.eligible_leader.return_value = False
self.is_elected_leader.return_value = False
self.CONFIGS.complete_contexts.return_value = ['shared-db']
hooks.hooks.execute(['hooks/shared-db-relation-changed'])
self.CONFIGS.write.assert_called_with('/etc/cinder/cinder.conf')
@@ -271,7 +272,7 @@ class TestChangedHooks(CharmTestCase):
def test_pgsql_db_changed_not_leader(self):
'It does not migrate database when not leader'
self.eligible_leader.return_value = False
self.is_elected_leader.return_value = False
self.CONFIGS.complete_contexts.return_value = ['pgsql-db']
hooks.hooks.execute(['hooks/pgsql-db-relation-changed'])
self.CONFIGS.write.assert_called_with('/etc/cinder/cinder.conf')
@@ -518,6 +519,7 @@ class TestJoinedHooks(CharmTestCase):
call('/etc/cinder/cinder.conf')]:
self.assertIn(c, self.CONFIGS.write.call_args_list)
self.set_ceph_env_variables.assert_called_with(service='cinder')
self.service_restart.assert_called_with('cinder-volume')
@patch("cinder_hooks.relation_get", autospec=True)
def test_ceph_changed_broker_nonzero_rc(self, mock_relation_get):
@@ -555,7 +557,7 @@ class TestJoinedHooks(CharmTestCase):
def test_ceph_changed_no_leadership(self):
'''It does not attempt to create ceph pool if not leader'''
self.eligible_leader.return_value = False
self.is_elected_leader.return_value = False
self.service_name.return_value = 'cinder'
self.ensure_ceph_keyring.return_value = True
hooks.hooks.execute(['hooks/ceph-relation-changed'])


@@ -38,7 +38,7 @@ TO_PATCH = [
'os_release',
'get_os_codename_install_source',
'configure_installation_source',
'eligible_leader',
'is_elected_leader',
'templating',
'install_alternative',
# fetch
@@ -501,7 +501,7 @@ class TestCinderUtils(CharmTestCase):
self.config.side_effect = None
self.config.return_value = 'cloud:precise-havana'
services.return_value = ['cinder-api', 'cinder-volume']
self.eligible_leader.return_value = True
self.is_elected_leader.return_value = True
self.get_os_codename_install_source.return_value = 'havana'
configs = MagicMock()
cinder_utils.do_openstack_upgrade(configs)
@@ -520,7 +520,7 @@ class TestCinderUtils(CharmTestCase):
self.config.side_effect = None
self.config.return_value = 'cloud:precise-havana'
services.return_value = ['cinder-api', 'cinder-volume']
self.eligible_leader.return_value = False
self.is_elected_leader.return_value = False
self.get_os_codename_install_source.return_value = 'havana'
configs = MagicMock()
cinder_utils.do_openstack_upgrade(configs)


@@ -48,7 +48,7 @@ TO_PATCH = [
# charmhelpers.contrib.openstack.openstack_utils
'configure_installation_source',
# charmhelpers.contrib.hahelpers.cluster_utils
'eligible_leader',
'is_elected_leader',
'get_hacluster_config',
# charmhelpers.contrib.network.ip
'get_iface_for_address',