diff --git a/charm-helpers-hooks.yaml b/charm-helpers-hooks.yaml index 0aee17e..fec1487 100644 --- a/charm-helpers-hooks.yaml +++ b/charm-helpers-hooks.yaml @@ -11,3 +11,4 @@ include: - payload.execd - contrib.network.ip - contrib.peerstorage + - contrib.python.packages diff --git a/config.yaml b/config.yaml index d37b3d9..02cb962 100644 --- a/config.yaml +++ b/config.yaml @@ -30,9 +30,23 @@ options: type: int description: Minimum replicas. min-hours: - default: 1 + default: 0 type: int - description: Minimum hours between balances + description: | + This is the Swift ring builder min_part_hours parameter. This + setting represents the amount of time in hours that Swift will wait + between subsequent ring re-balances in order to avoid large i/o loads as + data is re-balanced when new devices are added to the cluster. Once your + cluster has been built, you can set this to a higher value e.g. 1 + (upstream default). Note that changing this value will result in an + attempt to re-balance and if successful, rings will be redistributed. + disable-ring-balance: + type: boolean + default: False + description: | + This provides similar support to min-hours but without having to modify + the builders. If True, any changes to the builders will not result in a + ring re-balance and sync until this value is set back to False. zone-assignment: default: "manual" type: string @@ -71,7 +85,8 @@ options: workers: default: 0 type: int - description: Number of TCP workers to launch (0 for the number of system cores) + description: | + Number of TCP workers to launch (0 for the number of system cores). operator-roles: default: "Member,Admin" type: string @@ -87,7 +102,9 @@ options: node-timeout: default: 60 type: int - description: How long the proxy server will wait on responses from the a/c/o servers. + description: | + How long the proxy server will wait on responses from the + account/container/object servers. recoverable-node-timeout: default: 30 type: int diff --git a/hooks/charmhelpers/__init__.py b/hooks/charmhelpers/__init__.py index e69de29..b46e2e2 100644 --- a/hooks/charmhelpers/__init__.py +++ b/hooks/charmhelpers/__init__.py @@ -0,0 +1,22 @@ +# Bootstrap charm-helpers, installing its dependencies if necessary using +# only standard libraries. +import subprocess +import sys + +try: + import six # flake8: noqa +except ImportError: + if sys.version_info.major == 2: + subprocess.check_call(['apt-get', 'install', '-y', 'python-six']) + else: + subprocess.check_call(['apt-get', 'install', '-y', 'python3-six']) + import six # flake8: noqa + +try: + import yaml # flake8: noqa +except ImportError: + if sys.version_info.major == 2: + subprocess.check_call(['apt-get', 'install', '-y', 'python-yaml']) + else: + subprocess.check_call(['apt-get', 'install', '-y', 'python3-yaml']) + import yaml # flake8: noqa diff --git a/hooks/charmhelpers/contrib/hahelpers/cluster.py b/hooks/charmhelpers/contrib/hahelpers/cluster.py index 52ce4b7..912b2fe 100644 --- a/hooks/charmhelpers/contrib/hahelpers/cluster.py +++ b/hooks/charmhelpers/contrib/hahelpers/cluster.py @@ -13,6 +13,7 @@ clustering-related helpers. import subprocess import os + from socket import gethostname as get_unit_hostname import six @@ -28,12 +29,19 @@ from charmhelpers.core.hookenv import ( WARNING, unit_get, ) +from charmhelpers.core.decorators import ( + retry_on_exception, +) class HAIncompleteConfig(Exception): pass +class CRMResourceNotFound(Exception): + pass + + def is_elected_leader(resource): """ Returns True if the charm executing this is the elected cluster leader. @@ -68,24 +76,30 @@ def is_clustered(): return False -def is_crm_leader(resource): +@retry_on_exception(5, base_delay=2, exc_type=CRMResourceNotFound) +def is_crm_leader(resource, retry=False): """ Returns True if the charm calling this is the elected corosync leader, as returned by calling the external "crm" command. + + We allow this operation to be retried to avoid the possibility of getting a + false negative. See LP #1396246 for more info. """ - cmd = [ - "crm", "resource", - "show", resource - ] + cmd = ['crm', 'resource', 'show', resource] try: - status = subprocess.check_output(cmd).decode('UTF-8') + status = subprocess.check_output(cmd, stderr=subprocess.STDOUT) + if not isinstance(status, six.text_type): + status = six.text_type(status, "utf-8") except subprocess.CalledProcessError: - return False - else: - if get_unit_hostname() in status: - return True - else: - return False + status = None + + if status and get_unit_hostname() in status: + return True + + if status and "resource %s is NOT running" % (resource) in status: + raise CRMResourceNotFound("CRM resource %s not found" % (resource)) + + return False def is_leader(resource): diff --git a/hooks/charmhelpers/contrib/openstack/context.py b/hooks/charmhelpers/contrib/openstack/context.py index eebe8c0..eb10891 100644 --- a/hooks/charmhelpers/contrib/openstack/context.py +++ b/hooks/charmhelpers/contrib/openstack/context.py @@ -21,11 +21,15 @@ from charmhelpers.core.hookenv import ( relation_set, unit_get, unit_private_ip, + charm_name, DEBUG, INFO, WARNING, ERROR, ) + +from charmhelpers.core.sysctl import create as sysctl_create + from charmhelpers.core.host import ( mkdir, write_file, @@ -1015,3 +1019,14 @@ class NotificationDriverContext(OSContextGenerator): ctxt['notifications'] = "True" return ctxt + + +class SysctlContext(OSContextGenerator): + """This context check if the 'sysctl' option exists on configuration + then creates a file with the loaded contents""" + def __call__(self): + sysctl_dict = config('sysctl') + if sysctl_dict: + sysctl_create(sysctl_dict, + '/etc/sysctl.d/50-{0}.conf'.format(charm_name())) + return {'sysctl': sysctl_dict} diff --git a/hooks/charmhelpers/contrib/openstack/ip.py b/hooks/charmhelpers/contrib/openstack/ip.py index bc84fc4..f062c80 100644 --- a/hooks/charmhelpers/contrib/openstack/ip.py +++ b/hooks/charmhelpers/contrib/openstack/ip.py @@ -2,21 +2,19 @@ from charmhelpers.core.hookenv import ( config, unit_get, ) - from charmhelpers.contrib.network.ip import ( get_address_in_network, is_address_in_network, is_ipv6, get_ipv6_addr, ) - from charmhelpers.contrib.hahelpers.cluster import is_clustered PUBLIC = 'public' INTERNAL = 'int' ADMIN = 'admin' -_address_map = { +ADDRESS_MAP = { PUBLIC: { 'config': 'os-public-network', 'fallback': 'public-address' @@ -33,16 +31,14 @@ _address_map = { def canonical_url(configs, endpoint_type=PUBLIC): - ''' - Returns the correct HTTP URL to this host given the state of HTTPS + """Returns the correct HTTP URL to this host given the state of HTTPS configuration, hacluster and charm configuration. - :configs OSTemplateRenderer: A config tempating object to inspect for - a complete https context. - :endpoint_type str: The endpoint type to resolve. - - :returns str: Base URL for services on the current service unit. - ''' + :param configs: OSTemplateRenderer config templating object to inspect + for a complete https context. + :param endpoint_type: str endpoint type to resolve. + :param returns: str base URL for services on the current service unit. + """ scheme = 'http' if 'https' in configs.complete_contexts(): scheme = 'https' @@ -53,27 +49,45 @@ def canonical_url(configs, endpoint_type=PUBLIC): def resolve_address(endpoint_type=PUBLIC): + """Return unit address depending on net config. + + If unit is clustered with vip(s) and has net splits defined, return vip on + correct network. If clustered with no nets defined, return primary vip. + + If not clustered, return unit address ensuring address is on configured net + split if one is configured. + + :param endpoint_type: Network endpoing type + """ resolved_address = None - if is_clustered(): - if config(_address_map[endpoint_type]['config']) is None: - # Assume vip is simple and pass back directly - resolved_address = config('vip') + vips = config('vip') + if vips: + vips = vips.split() + + net_type = ADDRESS_MAP[endpoint_type]['config'] + net_addr = config(net_type) + net_fallback = ADDRESS_MAP[endpoint_type]['fallback'] + clustered = is_clustered() + if clustered: + if not net_addr: + # If no net-splits defined, we expect a single vip + resolved_address = vips[0] else: - for vip in config('vip').split(): - if is_address_in_network( - config(_address_map[endpoint_type]['config']), - vip): + for vip in vips: + if is_address_in_network(net_addr, vip): resolved_address = vip + break else: if config('prefer-ipv6'): - fallback_addr = get_ipv6_addr(exc_list=[config('vip')])[0] + fallback_addr = get_ipv6_addr(exc_list=vips)[0] else: - fallback_addr = unit_get(_address_map[endpoint_type]['fallback']) - resolved_address = get_address_in_network( - config(_address_map[endpoint_type]['config']), fallback_addr) + fallback_addr = unit_get(net_fallback) + + resolved_address = get_address_in_network(net_addr, fallback_addr) if resolved_address is None: - raise ValueError('Unable to resolve a suitable IP address' - ' based on charm state and configuration') - else: - return resolved_address + raise ValueError("Unable to resolve a suitable IP address based on " + "charm state and configuration. (net_type=%s, " + "clustered=%s)" % (net_type, clustered)) + + return resolved_address diff --git a/hooks/charmhelpers/contrib/openstack/utils.py b/hooks/charmhelpers/contrib/openstack/utils.py index 6447ce9..4417967 100644 --- a/hooks/charmhelpers/contrib/openstack/utils.py +++ b/hooks/charmhelpers/contrib/openstack/utils.py @@ -11,12 +11,12 @@ import socket import sys import six +import yaml from charmhelpers.core.hookenv import ( config, log as juju_log, charm_dir, - ERROR, INFO, relation_ids, relation_set @@ -33,7 +33,8 @@ from charmhelpers.contrib.network.ip import ( ) from charmhelpers.core.host import lsb_release, mounts, umount -from charmhelpers.fetch import apt_install, apt_cache +from charmhelpers.fetch import apt_install, apt_cache, install_remote +from charmhelpers.contrib.python.packages import pip_install from charmhelpers.contrib.storage.linux.utils import is_block_device, zap_disk from charmhelpers.contrib.storage.linux.loopback import ensure_loopback_device @@ -353,8 +354,8 @@ def ensure_block_device(block_device): ''' _none = ['None', 'none', None] if (block_device in _none): - error_out('prepare_storage(): Missing required input: ' - 'block_device=%s.' % block_device, level=ERROR) + error_out('prepare_storage(): Missing required input: block_device=%s.' + % block_device) if block_device.startswith('/dev/'): bdev = block_device @@ -370,8 +371,7 @@ def ensure_block_device(block_device): bdev = '/dev/%s' % block_device if not is_block_device(bdev): - error_out('Failed to locate valid block device at %s' % bdev, - level=ERROR) + error_out('Failed to locate valid block device at %s' % bdev) return bdev @@ -509,3 +509,111 @@ def os_requires_version(ostack_release, pkg): f(*args) return wrapped_f return wrap + + +def git_install_requested(): + """Returns true if openstack-origin-git is specified.""" + return config('openstack-origin-git') != "None" + + +requirements_dir = None + + +def git_clone_and_install(file_name, core_project): + """Clone/install all OpenStack repos specified in yaml config file.""" + global requirements_dir + + if file_name == "None": + return + + yaml_file = os.path.join(charm_dir(), file_name) + + # clone/install the requirements project first + installed = _git_clone_and_install_subset(yaml_file, + whitelist=['requirements']) + if 'requirements' not in installed: + error_out('requirements git repository must be specified') + + # clone/install all other projects except requirements and the core project + blacklist = ['requirements', core_project] + _git_clone_and_install_subset(yaml_file, blacklist=blacklist, + update_requirements=True) + + # clone/install the core project + whitelist = [core_project] + installed = _git_clone_and_install_subset(yaml_file, whitelist=whitelist, + update_requirements=True) + if core_project not in installed: + error_out('{} git repository must be specified'.format(core_project)) + + +def _git_clone_and_install_subset(yaml_file, whitelist=[], blacklist=[], + update_requirements=False): + """Clone/install subset of OpenStack repos specified in yaml config file.""" + global requirements_dir + installed = [] + + with open(yaml_file, 'r') as fd: + projects = yaml.load(fd) + for proj, val in projects.items(): + # The project subset is chosen based on the following 3 rules: + # 1) If project is in blacklist, we don't clone/install it, period. + # 2) If whitelist is empty, we clone/install everything else. + # 3) If whitelist is not empty, we clone/install everything in the + # whitelist. + if proj in blacklist: + continue + if whitelist and proj not in whitelist: + continue + repo = val['repository'] + branch = val['branch'] + repo_dir = _git_clone_and_install_single(repo, branch, + update_requirements) + if proj == 'requirements': + requirements_dir = repo_dir + installed.append(proj) + return installed + + +def _git_clone_and_install_single(repo, branch, update_requirements=False): + """Clone and install a single git repository.""" + dest_parent_dir = "/mnt/openstack-git/" + dest_dir = os.path.join(dest_parent_dir, os.path.basename(repo)) + + if not os.path.exists(dest_parent_dir): + juju_log('Host dir not mounted at {}. ' + 'Creating directory there instead.'.format(dest_parent_dir)) + os.mkdir(dest_parent_dir) + + if not os.path.exists(dest_dir): + juju_log('Cloning git repo: {}, branch: {}'.format(repo, branch)) + repo_dir = install_remote(repo, dest=dest_parent_dir, branch=branch) + else: + repo_dir = dest_dir + + if update_requirements: + if not requirements_dir: + error_out('requirements repo must be cloned before ' + 'updating from global requirements.') + _git_update_requirements(repo_dir, requirements_dir) + + juju_log('Installing git repo from dir: {}'.format(repo_dir)) + pip_install(repo_dir) + + return repo_dir + + +def _git_update_requirements(package_dir, reqs_dir): + """Update from global requirements. + + Update an OpenStack git directory's requirements.txt and + test-requirements.txt from global-requirements.txt.""" + orig_dir = os.getcwd() + os.chdir(reqs_dir) + cmd = "python update.py {}".format(package_dir) + try: + subprocess.check_call(cmd.split(' ')) + except subprocess.CalledProcessError: + package = os.path.basename(package_dir) + error_out("Error updating {} from global-requirements.txt".format(package)) + os.chdir(orig_dir) diff --git a/hooks/charmhelpers/contrib/python/__init__.py b/hooks/charmhelpers/contrib/python/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/hooks/charmhelpers/contrib/python/packages.py b/hooks/charmhelpers/contrib/python/packages.py new file mode 100644 index 0000000..78162b1 --- /dev/null +++ b/hooks/charmhelpers/contrib/python/packages.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python +# coding: utf-8 + +__author__ = "Jorge Niedbalski " + +from charmhelpers.fetch import apt_install, apt_update +from charmhelpers.core.hookenv import log + +try: + from pip import main as pip_execute +except ImportError: + apt_update() + apt_install('python-pip') + from pip import main as pip_execute + + +def parse_options(given, available): + """Given a set of options, check if available""" + for key, value in sorted(given.items()): + if key in available: + yield "--{0}={1}".format(key, value) + + +def pip_install_requirements(requirements, **options): + """Install a requirements file """ + command = ["install"] + + available_options = ('proxy', 'src', 'log', ) + for option in parse_options(options, available_options): + command.append(option) + + command.append("-r {0}".format(requirements)) + log("Installing from file: {} with options: {}".format(requirements, + command)) + pip_execute(command) + + +def pip_install(package, fatal=False, **options): + """Install a python package""" + command = ["install"] + + available_options = ('proxy', 'src', 'log', "index-url", ) + for option in parse_options(options, available_options): + command.append(option) + + if isinstance(package, list): + command.extend(package) + else: + command.append(package) + + log("Installing {} package with options: {}".format(package, + command)) + pip_execute(command) + + +def pip_uninstall(package, **options): + """Uninstall a python package""" + command = ["uninstall", "-q", "-y"] + + available_options = ('proxy', 'log', ) + for option in parse_options(options, available_options): + command.append(option) + + if isinstance(package, list): + command.extend(package) + else: + command.append(package) + + log("Uninstalling {} package with options: {}".format(package, + command)) + pip_execute(command) + + +def pip_list(): + """Returns the list of current python installed packages + """ + return pip_execute(["list"]) diff --git a/hooks/charmhelpers/contrib/storage/linux/ceph.py b/hooks/charmhelpers/contrib/storage/linux/ceph.py index d47dc22..1479f4f 100644 --- a/hooks/charmhelpers/contrib/storage/linux/ceph.py +++ b/hooks/charmhelpers/contrib/storage/linux/ceph.py @@ -372,3 +372,46 @@ def ceph_version(): return None else: return None + + +class CephBrokerRq(object): + """Ceph broker request. + + Multiple operations can be added to a request and sent to the Ceph broker + to be executed. + + Request is json-encoded for sending over the wire. + + The API is versioned and defaults to version 1. + """ + def __init__(self, api_version=1): + self.api_version = api_version + self.ops = [] + + def add_op_create_pool(self, name, replica_count=3): + self.ops.append({'op': 'create-pool', 'name': name, + 'replicas': replica_count}) + + @property + def request(self): + return json.dumps({'api-version': self.api_version, 'ops': self.ops}) + + +class CephBrokerRsp(object): + """Ceph broker response. + + Response is json-decoded and contents provided as methods/properties. + + The API is versioned and defaults to version 1. + """ + def __init__(self, encoded_rsp): + self.api_version = None + self.rsp = json.loads(encoded_rsp) + + @property + def exit_code(self): + return self.rsp.get('exit-code') + + @property + def exit_msg(self): + return self.rsp.get('stderr') diff --git a/hooks/charmhelpers/core/decorators.py b/hooks/charmhelpers/core/decorators.py new file mode 100644 index 0000000..029a4ef --- /dev/null +++ b/hooks/charmhelpers/core/decorators.py @@ -0,0 +1,41 @@ +# +# Copyright 2014 Canonical Ltd. +# +# Authors: +# Edward Hope-Morley +# + +import time + +from charmhelpers.core.hookenv import ( + log, + INFO, +) + + +def retry_on_exception(num_retries, base_delay=0, exc_type=Exception): + """If the decorated function raises exception exc_type, allow num_retries + retry attempts before raise the exception. + """ + def _retry_on_exception_inner_1(f): + def _retry_on_exception_inner_2(*args, **kwargs): + retries = num_retries + multiplier = 1 + while True: + try: + return f(*args, **kwargs) + except exc_type: + if not retries: + raise + + delay = base_delay * multiplier + multiplier += 1 + log("Retrying '%s' %d more times (delay=%s)" % + (f.__name__, retries, delay), level=INFO) + retries -= 1 + if delay: + time.sleep(delay) + + return _retry_on_exception_inner_2 + + return _retry_on_exception_inner_1 diff --git a/hooks/charmhelpers/core/hookenv.py b/hooks/charmhelpers/core/hookenv.py index 99e5d20..69ae456 100644 --- a/hooks/charmhelpers/core/hookenv.py +++ b/hooks/charmhelpers/core/hookenv.py @@ -68,6 +68,8 @@ def log(message, level=None): command = ['juju-log'] if level: command += ['-l', level] + if not isinstance(message, six.string_types): + message = repr(message) command += [message] subprocess.call(command) @@ -393,21 +395,31 @@ def relations_of_type(reltype=None): return relation_data +@cached +def metadata(): + """Get the current charm metadata.yaml contents as a python object""" + with open(os.path.join(charm_dir(), 'metadata.yaml')) as md: + return yaml.safe_load(md) + + @cached def relation_types(): """Get a list of relation types supported by this charm""" - charmdir = os.environ.get('CHARM_DIR', '') - mdf = open(os.path.join(charmdir, 'metadata.yaml')) - md = yaml.safe_load(mdf) rel_types = [] + md = metadata() for key in ('provides', 'requires', 'peers'): section = md.get(key) if section: rel_types.extend(section.keys()) - mdf.close() return rel_types +@cached +def charm_name(): + """Get the name of the current charm as is specified on metadata.yaml""" + return metadata().get('name') + + @cached def relations(): """Get a nested dictionary of relation data for all related units""" diff --git a/hooks/charmhelpers/core/host.py b/hooks/charmhelpers/core/host.py index e6783d9..5221120 100644 --- a/hooks/charmhelpers/core/host.py +++ b/hooks/charmhelpers/core/host.py @@ -101,6 +101,26 @@ def adduser(username, password=None, shell='/bin/bash', system_user=False): return user_info +def add_group(group_name, system_group=False): + """Add a group to the system""" + try: + group_info = grp.getgrnam(group_name) + log('group {0} already exists!'.format(group_name)) + except KeyError: + log('creating group {0}'.format(group_name)) + cmd = ['addgroup'] + if system_group: + cmd.append('--system') + else: + cmd.extend([ + '--group', + ]) + cmd.append(group_name) + subprocess.check_call(cmd) + group_info = grp.getgrnam(group_name) + return group_info + + def add_user_to_group(username, group): """Add a user to a group""" cmd = [ @@ -142,13 +162,16 @@ def mkdir(path, owner='root', group='root', perms=0o555, force=False): uid = pwd.getpwnam(owner).pw_uid gid = grp.getgrnam(group).gr_gid realpath = os.path.abspath(path) - if os.path.exists(realpath): - if force and not os.path.isdir(realpath): + path_exists = os.path.exists(realpath) + if path_exists and force: + if not os.path.isdir(realpath): log("Removing non-directory file {} prior to mkdir()".format(path)) os.unlink(realpath) - else: + os.makedirs(realpath, perms) + os.chown(realpath, uid, gid) + elif not path_exists: os.makedirs(realpath, perms) - os.chown(realpath, uid, gid) + os.chown(realpath, uid, gid) def write_file(path, content, owner='root', group='root', perms=0o444): @@ -368,8 +391,8 @@ def cmp_pkgrevno(package, revno, pkgcache=None): ''' import apt_pkg - from charmhelpers.fetch import apt_cache if not pkgcache: + from charmhelpers.fetch import apt_cache pkgcache = apt_cache() pkg = pkgcache[package] return apt_pkg.version_compare(pkg.current_ver.ver_str, revno) diff --git a/hooks/charmhelpers/core/templating.py b/hooks/charmhelpers/core/templating.py index 83133fa..569eaed 100644 --- a/hooks/charmhelpers/core/templating.py +++ b/hooks/charmhelpers/core/templating.py @@ -48,5 +48,5 @@ def render(source, target, context, owner='root', group='root', level=hookenv.ERROR) raise e content = template.render(context) - host.mkdir(os.path.dirname(target)) + host.mkdir(os.path.dirname(target), owner, group) host.write_file(target, content, owner, group, perms) diff --git a/hooks/charmhelpers/fetch/giturl.py b/hooks/charmhelpers/fetch/giturl.py index 61684cb..f3aa282 100644 --- a/hooks/charmhelpers/fetch/giturl.py +++ b/hooks/charmhelpers/fetch/giturl.py @@ -34,11 +34,14 @@ class GitUrlFetchHandler(BaseFetchHandler): repo = Repo.clone_from(source, dest) repo.git.checkout(branch) - def install(self, source, branch="master"): + def install(self, source, branch="master", dest=None): url_parts = self.parse_url(source) branch_name = url_parts.path.strip("/").split("/")[-1] - dest_dir = os.path.join(os.environ.get('CHARM_DIR'), "fetched", - branch_name) + if dest: + dest_dir = os.path.join(dest, branch_name) + else: + dest_dir = os.path.join(os.environ.get('CHARM_DIR'), "fetched", + branch_name) if not os.path.exists(dest_dir): mkdir(dest_dir, perms=0o755) try: diff --git a/hooks/swift-storage-relation-joined b/hooks/swift-storage-relation-joined new file mode 120000 index 0000000..8623fba --- /dev/null +++ b/hooks/swift-storage-relation-joined @@ -0,0 +1 @@ +swift_hooks.py \ No newline at end of file diff --git a/hooks/swift_context.py b/hooks/swift_context.py index 6813012..6fb5cb0 100644 --- a/hooks/swift_context.py +++ b/hooks/swift_context.py @@ -1,3 +1,6 @@ +import os +import uuid + from charmhelpers.core.hookenv import ( config, log, @@ -5,38 +8,35 @@ from charmhelpers.core.hookenv import ( related_units, relation_get, unit_get, - service_name + service_name, ) - from charmhelpers.contrib.openstack.context import ( OSContextGenerator, ApacheSSLContext as SSLContext, context_complete, ) - from charmhelpers.contrib.hahelpers.cluster import ( determine_api_port, determine_apache_port, ) - from charmhelpers.contrib.network.ip import ( get_ipv6_addr ) - from charmhelpers.contrib.openstack.utils import get_host_ip -import os -import uuid + + +SWIFT_HASH_FILE = '/var/lib/juju/swift-hash-path.conf' +WWW_DIR = '/var/www/swift-rings' class HAProxyContext(OSContextGenerator): interfaces = ['cluster'] def __call__(self): - ''' - Extends the main charmhelpers HAProxyContext with a port mapping + """Extends the main charmhelpers HAProxyContext with a port mapping specific to this charm. Also used to extend cinder.conf context with correct api_listening_port - ''' + """ haproxy_port = config('bind-port') api_port = determine_apache_port(config('bind-port'), singlenode_mode=True) @@ -47,9 +47,6 @@ class HAProxyContext(OSContextGenerator): return ctxt -WWW_DIR = '/var/www/swift-rings' - - class ApacheSSLContext(SSLContext): interfaces = ['https'] external_ports = [config('bind-port')] @@ -67,6 +64,7 @@ class SwiftRingContext(OSContextGenerator): host_ip = get_ipv6_addr(exc_list=[config('vip')])[0] else: host_ip = get_host_ip(host) + allowed_hosts.append(host_ip) ctxt = { @@ -91,6 +89,7 @@ class SwiftIdentityContext(OSContextGenerator): else: proxy_ip = get_host_ip(unit_get('private-address')) memcached_ip = get_host_ip(unit_get('private-address')) + ctxt = { 'proxy_ip': proxy_ip, 'memcached_ip': memcached_ip, @@ -148,6 +147,7 @@ class SwiftIdentityContext(OSContextGenerator): } if context_complete(ks_auth): ctxt.update(ks_auth) + return ctxt @@ -159,9 +159,8 @@ class MemcachedContext(OSContextGenerator): ctxt['memcached_ip'] = 'ip6-localhost' else: ctxt['memcached_ip'] = get_host_ip(unit_get('private-address')) - return ctxt -SWIFT_HASH_FILE = '/var/lib/juju/swift-hash-path.conf' + return ctxt def get_swift_hash(): @@ -177,6 +176,7 @@ def get_swift_hash(): service_name())) with open(SWIFT_HASH_FILE, 'w') as hashfile: hashfile.write(swift_hash) + return swift_hash diff --git a/hooks/swift_hooks.py b/hooks/swift_hooks.py index 6d5c71c..39f92a0 100755 --- a/hooks/swift_hooks.py +++ b/hooks/swift_hooks.py @@ -2,71 +2,80 @@ import os import sys -import shutil -import uuid import subprocess -import charmhelpers.contrib.openstack.utils as openstack -import charmhelpers.contrib.hahelpers.cluster as cluster from swift_utils import ( + SwiftProxyCharmException, register_configs, restart_map, determine_packages, ensure_swift_dir, - SWIFT_RINGS, get_www_dir, + SWIFT_RINGS, + get_www_dir, initialize_ring, - swift_user, SWIFT_HA_RES, - balance_ring, - SWIFT_CONF_DIR, get_zone, - exists_in_ring, - add_to_ring, - should_balance, do_openstack_upgrade, - write_rc_script, - setup_ipv6 + setup_ipv6, + update_rings, + balance_rings, + fully_synced, + sync_proxy_rings, + broadcast_rings_available, + mark_www_rings_deleted, + SwiftProxyClusterRPC, + get_first_available_value, + all_responses_equal, + ensure_www_dir_permissions, ) -from swift_context import get_swift_hash +import charmhelpers.contrib.openstack.utils as openstack +from charmhelpers.contrib.hahelpers.cluster import ( + is_elected_leader, + is_crm_leader +) from charmhelpers.core.hookenv import ( config, + local_unit, + remote_unit, unit_get, relation_set, relation_ids, relation_get, + related_units, log, + DEBUG, INFO, WARNING, ERROR, Hooks, UnregisteredHookError, - open_port + open_port, ) from charmhelpers.core.host import ( service_restart, service_stop, service_start, - restart_on_change + restart_on_change, ) from charmhelpers.fetch import ( apt_install, - apt_update + apt_update, ) from charmhelpers.payload.execd import execd_preinstall - from charmhelpers.contrib.openstack.ip import ( canonical_url, - PUBLIC, INTERNAL, ADMIN + PUBLIC, + INTERNAL, + ADMIN, ) from charmhelpers.contrib.network.ip import ( get_iface_for_address, get_netmask_for_address, get_address_in_network, get_ipv6_addr, + is_ipv6, format_ipv6_addr, - is_ipv6 ) - from charmhelpers.contrib.openstack.context import ADDRESS_TYPES extra_pkgs = [ @@ -74,9 +83,7 @@ extra_pkgs = [ "python-jinja2" ] - hooks = Hooks() - CONFIGS = register_configs() @@ -86,35 +93,55 @@ def install(): src = config('openstack-origin') if src != 'distro': openstack.configure_installation_source(src) + apt_update(fatal=True) rel = openstack.get_os_codename_install_source(src) - pkgs = determine_packages(rel) apt_install(pkgs, fatal=True) apt_install(extra_pkgs, fatal=True) ensure_swift_dir() - if cluster.is_elected_leader(SWIFT_HA_RES): - log("Leader established, generating ring builders") + if is_elected_leader(SWIFT_HA_RES): + log("Leader established, generating ring builders", level=INFO) # initialize new storage rings. - for ring in SWIFT_RINGS.iteritems(): - initialize_ring(ring[1], + for path in SWIFT_RINGS.itervalues(): + initialize_ring(path, config('partition-power'), config('replicas'), config('min-hours')) # configure a directory on webserver for distributing rings. - www_dir = get_www_dir() - if not os.path.isdir(www_dir): - os.mkdir(www_dir, 0o755) - uid, gid = swift_user() - os.chown(www_dir, uid, gid) + ensure_www_dir_permissions(get_www_dir()) + + +@hooks.hook('config-changed') +@restart_on_change(restart_map()) +def config_changed(): + if config('prefer-ipv6'): + setup_ipv6() + + configure_https() + open_port(config('bind-port')) + + # Determine whether or not we should do an upgrade. + if openstack.openstack_upgrade_available('python-swift'): + do_openstack_upgrade(CONFIGS) + + update_rings(min_part_hours=config('min-hours')) + + if not config('disable-ring-balance') and is_elected_leader(SWIFT_HA_RES): + # Try ring balance. If rings are balanced, no sync will occur. + balance_rings() + + for r_id in relation_ids('identity-service'): + keystone_joined(relid=r_id) @hooks.hook('identity-service-relation-joined') def keystone_joined(relid=None): - if not cluster.eligible_leader(SWIFT_HA_RES): + if not is_elected_leader(SWIFT_HA_RES): return + port = config('bind-port') admin_url = '%s:%s' % (canonical_url(CONFIGS, ADMIN), port) internal_url = '%s:%s/v1/AUTH_$(tenant_id)s' % \ @@ -136,129 +163,74 @@ def keystone_changed(): configure_https() -def get_hostaddr(): - if config('prefer-ipv6'): - return get_ipv6_addr(exc_list=[config('vip')])[0] +@hooks.hook('swift-storage-relation-joined') +def storage_joined(): + if not is_elected_leader(SWIFT_HA_RES): + log("New storage relation joined - stopping proxy until ring builder " + "synced", level=INFO) + service_stop('swift-proxy') - return unit_get('private-address') - - -def builders_synced(): - for ring in SWIFT_RINGS.itervalues(): - if not os.path.exists(ring): - log("Builder not yet synced - %s" % (ring)) - return False - - return True - - -def balance_rings(): - '''handle doing ring balancing and distribution.''' - if not cluster.eligible_leader(SWIFT_HA_RES): - log("Balance rings called by non-leader - skipping", level=WARNING) - return - - new_ring = False - for ring in SWIFT_RINGS.itervalues(): - if balance_ring(ring): - log('Balanced ring %s' % ring) - new_ring = True - - if not new_ring: - log("Rings unchanged by rebalance - skipping sync", level=INFO) - return - - www_dir = get_www_dir() - for ring, builder_path in SWIFT_RINGS.iteritems(): - ringfile = '%s.ring.gz' % ring - shutil.copyfile(os.path.join(SWIFT_CONF_DIR, ringfile), - os.path.join(www_dir, ringfile)) - shutil.copyfile(builder_path, - os.path.join(www_dir, os.path.basename(builder_path))) - - if cluster.is_clustered(): - hostname = config('vip') - else: - hostname = get_hostaddr() - - hostname = format_ipv6_addr(hostname) or hostname - - # Notify peers that builders are available - for rid in relation_ids('cluster'): - log("Notifying peer(s) that rings are ready for sync (rid='%s')" % - (rid)) - relation_set(relation_id=rid, - relation_settings={'builder-broker': hostname}) - - log('Broadcasting notification to all storage nodes that new ring is ' - 'ready for consumption.') - - path = os.path.basename(www_dir) - trigger = uuid.uuid4() - - rings_url = 'http://%s/%s' % (hostname, path) - # notify storage nodes that there is a new ring to fetch. - for relid in relation_ids('swift-storage'): - relation_set(relation_id=relid, swift_hash=get_swift_hash(), - rings_url=rings_url, trigger=trigger) - - service_restart('swift-proxy') + # This unit is not currently responsible for distributing rings but + # may become so at some time in the future so we do this to avoid the + # possibility of storage nodes getting out-of-date rings by deprecating + # any existing ones from the www dir. + mark_www_rings_deleted() @hooks.hook('swift-storage-relation-changed') @restart_on_change(restart_map()) def storage_changed(): + """Storage relation. + + Only the leader unit can update and distribute rings so if we are not the + leader we ignore this event and wait for a resync request from the leader. + """ + if not is_elected_leader(SWIFT_HA_RES): + log("Not the leader - ignoring storage relation until leader ready.", + level=DEBUG) + return + + log("Leader established, updating ring builders", level=INFO) + addr = relation_get('private-address') if config('prefer-ipv6'): - host_ip = '[%s]' % relation_get('private-address') + host_ip = format_ipv6_addr(addr) + if not host_ip: + errmsg = ("Did not get IPv6 address from storage relation " + "(got=%s)" % (addr)) + raise SwiftProxyCharmException(errmsg) else: - host_ip = openstack.get_host_ip(relation_get('private-address')) + host_ip = openstack.get_host_ip(addr) - if cluster.is_elected_leader(SWIFT_HA_RES): - log("Leader established, updating ring builders") + zone = get_zone(config('zone-assignment')) + node_settings = { + 'ip': host_ip, + 'zone': zone, + 'account_port': relation_get('account_port'), + 'object_port': relation_get('object_port'), + 'container_port': relation_get('container_port'), + } - zone = get_zone(config('zone-assignment')) - node_settings = { - 'ip': host_ip, - 'zone': zone, - 'account_port': relation_get('account_port'), - 'object_port': relation_get('object_port'), - 'container_port': relation_get('container_port'), - } + if None in node_settings.itervalues(): + missing = [k for k, v in node_settings.iteritems() if v is None] + log("Relation not ready - some required values not provided by " + "relation (missing=%s)" % (', '.join(missing)), level=INFO) + return None - if None in node_settings.itervalues(): - log('storage_changed: Relation not ready.') - return None + for k in ['zone', 'account_port', 'object_port', 'container_port']: + node_settings[k] = int(node_settings[k]) - for k in ['zone', 'account_port', 'object_port', 'container_port']: - node_settings[k] = int(node_settings[k]) + CONFIGS.write_all() - CONFIGS.write_all() + # Allow for multiple devs per unit, passed along as a : separated list + # Update and balance rings. + devs = relation_get('device') + if devs: + node_settings['devices'] = devs.split(':') - # allow for multiple devs per unit, passed along as a : separated list - devs = relation_get('device').split(':') - for dev in devs: - node_settings['device'] = dev - for ring in SWIFT_RINGS.itervalues(): - if not exists_in_ring(ring, node_settings): - add_to_ring(ring, node_settings) - - if should_balance([r for r in SWIFT_RINGS.itervalues()]): - balance_rings() - - # Notify peers that builders are available - for rid in relation_ids('cluster'): - log("Notifying peer(s) that ring builder is ready (rid='%s')" % - (rid)) - relation_set(relation_id=rid, - relation_settings={'builder-broker': - get_hostaddr()}) - else: - log("Not yet ready to balance rings - insufficient replicas?", - level=INFO) - else: - log("New storage relation joined - stopping proxy until ring builder " - "synced") - service_stop('swift-proxy') + update_rings(node_settings) + # Restart proxy here in case no config changes made (so + # restart_on_change() ineffective). + service_restart('swift-proxy') @hooks.hook('swift-storage-relation-broken') @@ -267,23 +239,6 @@ def storage_broken(): CONFIGS.write_all() -@hooks.hook('config-changed') -@restart_on_change(restart_map()) -def config_changed(): - if config('prefer-ipv6'): - setup_ipv6() - - configure_https() - open_port(config('bind-port')) - # Determine whether or not we should do an upgrade, based on the - # the version offered in keyston-release. - if (openstack.openstack_upgrade_available('python-swift')): - do_openstack_upgrade(CONFIGS) - for r_id in relation_ids('identity-service'): - keystone_joined(relid=r_id) - [cluster_joined(rid) for rid in relation_ids('cluster')] - - @hooks.hook('cluster-relation-joined') def cluster_joined(relation_id=None): for addr_type in ADDRESS_TYPES: @@ -301,68 +256,154 @@ def cluster_joined(relation_id=None): private_addr = unit_get('private-address') -def sync_proxy_rings(broker_url): - """The leader proxy is responsible for intialising, updating and - rebalancing the ring. Once the leader is ready the rings must then be - synced into each other proxy unit. +def all_peers_stopped(responses): + """Establish whether all peers have stopped their proxy services. - Note that we sync the ring builder and .gz files since the builder itself - is linked to the underlying .gz ring. + Each peer unit will set stop-proxy-service-ack to rq value to indicate that + it has stopped its proxy service. We wait for all units to be stopped + before triggering a sync. Peer services will be restarted once their rings + are synced with the leader. + + To be safe, default expectation is that api is still running. """ - log('Fetching swift rings & builders from proxy @ %s.' % broker_url) - target = '/etc/swift' - for server in ['account', 'object', 'container']: - url = '%s/%s.builder' % (broker_url, server) - log('Fetching %s.' % url) - cmd = ['wget', url, '--retry-connrefused', '-t', '10', '-O', - "%s/%s.builder" % (target, server)] - subprocess.check_call(cmd) + rq_key = SwiftProxyClusterRPC.KEY_STOP_PROXY_SVC + ack_key = SwiftProxyClusterRPC.KEY_STOP_PROXY_SVC_ACK + token = relation_get(attribute=rq_key, unit=local_unit()) + if not token or token != responses[0].get(ack_key): + log("Unmatched token in ack (expected=%s, got=%s)" % + (token, responses[0].get(ack_key)), level=DEBUG) + return False - url = '%s/%s.ring.gz' % (broker_url, server) - log('Fetching %s.' % url) - cmd = ['wget', url, '--retry-connrefused', '-t', '10', '-O', - '%s/%s.ring.gz' % (target, server)] - subprocess.check_call(cmd) + if not all_responses_equal(responses, ack_key): + return False + + return True + + +def cluster_leader_actions(): + """Cluster relation hook actions to be performed by leader units. + + NOTE: must be called by leader from cluster relation hook. + """ + log("Cluster changed by unit=%s (local is leader)" % (remote_unit()), + level=DEBUG) + + # If we have received an ack, check other units + settings = relation_get() or {} + ack_key = SwiftProxyClusterRPC.KEY_STOP_PROXY_SVC_ACK + + # Protect against leader changing mid-sync + if settings.get(SwiftProxyClusterRPC.KEY_STOP_PROXY_SVC): + log("Sync request received yet this is leader unit. This would " + "indicate that the leader has changed mid-sync - stopping proxy " + "and notifying peers", level=ERROR) + service_stop('swift-proxy') + SwiftProxyClusterRPC().notify_leader_changed() + return + elif ack_key in settings: + token = settings[ack_key] + # Find out if all peer units have been stopped. + responses = [] + for rid in relation_ids('cluster'): + for unit in related_units(rid): + responses.append(relation_get(rid=rid, unit=unit)) + + # Ensure all peers stopped before starting sync + if all_peers_stopped(responses): + key = 'peers-only' + if not all_responses_equal(responses, key, must_exist=False): + msg = ("Did not get equal response from every peer unit for " + "'%s'" % (key)) + raise SwiftProxyCharmException(msg) + + peers_only = int(get_first_available_value(responses, key, + default=0)) + log("Syncing rings and builders (peers-only=%s)" % (peers_only), + level=DEBUG) + broadcast_rings_available(token, storage=not peers_only) + else: + log("Not all peer apis stopped - skipping sync until all peers " + "ready (got %s)" % (responses), level=INFO) + + CONFIGS.write_all() + + +def cluster_non_leader_actions(): + """Cluster relation hook actions to be performed by non-leader units. + + NOTE: must be called by non-leader from cluster relation hook. + """ + log("Cluster changed by unit=%s (local is non-leader)" % (remote_unit()), + level=DEBUG) + settings = relation_get() or {} + + # Check whether we have been requested to stop proxy service + rq_key = SwiftProxyClusterRPC.KEY_STOP_PROXY_SVC + token = settings.get(rq_key, None) + if token: + log("Peer request to stop proxy service received (%s) - sending ack" % + (token), level=INFO) + service_stop('swift-proxy') + peers_only = settings.get('peers-only', None) + rq = SwiftProxyClusterRPC().stop_proxy_ack(echo_token=token, + echo_peers_only=peers_only) + relation_set(relation_settings=rq) + return + + # Check if there are any builder files we can sync from the leader. + log("Non-leader peer - checking if updated rings available", level=DEBUG) + broker = settings.get('builder-broker', None) + if not broker: + log("No update available", level=DEBUG) + service_start('swift-proxy') + return + + builders_only = int(settings.get('sync-only-builders', 0)) + path = os.path.basename(get_www_dir()) + try: + sync_proxy_rings('http://%s/%s' % (broker, path), + rings=not builders_only) + except subprocess.CalledProcessError: + log("Ring builder sync failed, builders not yet available - " + "leader not ready?", level=WARNING) + return None + + # Re-enable the proxy once all builders and rings are synced + if fully_synced(): + log("Ring builders synced - starting proxy", level=INFO) + CONFIGS.write_all() + service_start('swift-proxy') + else: + log("Not all builders and rings synced yet - waiting for peer sync " + "before starting proxy", level=INFO) @hooks.hook('cluster-relation-changed', 'cluster-relation-departed') @restart_on_change(restart_map()) def cluster_changed(): - CONFIGS.write_all() + key = SwiftProxyClusterRPC.KEY_NOTIFY_LEADER_CHANGED + leader_changed = relation_get(attribute=key) + if leader_changed: + log("Leader changed notification received from peer unit. Since this " + "most likely occurred during a ring sync proxies will be " + "disabled until the leader is restored and a fresh sync request " + "is set out", level=WARNING) + service_stop("swift-proxy") + return - # If not the leader, see if there are any builder files we can sync from - # the leader. - if not cluster.is_elected_leader(SWIFT_HA_RES): - settings = relation_get() - broker = settings.get('builder-broker', None) - if broker: - path = os.path.basename(get_www_dir()) - broker_url = 'http://%s/%s' % (broker, path) - try: - sync_proxy_rings(broker_url) - except subprocess.CalledProcessError: - log("Ring builder sync failed, builders not yet available - " - "leader not ready?", level=WARNING) - return None - - if builders_synced(): - log("Ring builders synced - balancing rings and starting " - "proxy") - - CONFIGS.write_all() - service_start('swift-proxy') - else: - log("Not all builders synced yet - waiting for peer sync " - "before starting proxy", level=INFO) + if is_elected_leader(SWIFT_HA_RES): + cluster_leader_actions() + else: + cluster_non_leader_actions() @hooks.hook('ha-relation-changed') def ha_relation_changed(): clustered = relation_get('clustered') - if clustered and cluster.is_leader(SWIFT_HA_RES): - log('Cluster configured, notifying other services and' - 'updating keystone endpoint configuration') + if clustered and is_crm_leader(SWIFT_HA_RES): + log("Cluster configured, notifying other services and updating " + "keystone endpoint configuration", level=INFO) # Tell all related services to start using # the VIP instead for r_id in relation_ids('identity-service'): @@ -377,17 +418,12 @@ def ha_relation_joined(): corosync_mcastport = config('ha-mcastport') vip = config('vip') if not vip: - log('Unable to configure hacluster as vip not provided', - level=ERROR) - sys.exit(1) + msg = 'Unable to configure hacluster as vip not provided' + raise SwiftProxyCharmException(msg) # Obtain resources - resources = { - 'res_swift_haproxy': 'lsb:haproxy' - } - resource_params = { - 'res_swift_haproxy': 'op monitor interval="5s"' - } + resources = {'res_swift_haproxy': 'lsb:haproxy'} + resource_params = {'res_swift_haproxy': 'op monitor interval="5s"'} vip_group = [] for vip in vip.split(): @@ -414,12 +450,8 @@ def ha_relation_joined(): if len(vip_group) >= 1: relation_set(groups={'grp_swift_vips': ' '.join(vip_group)}) - init_services = { - 'res_swift_haproxy': 'haproxy' - } - clones = { - 'cl_swift_haproxy': 'res_swift_haproxy' - } + init_services = {'res_swift_haproxy': 'haproxy'} + clones = {'cl_swift_haproxy': 'res_swift_haproxy'} relation_set(init_services=init_services, corosync_bindiface=corosync_bindiface, @@ -430,10 +462,9 @@ def ha_relation_joined(): def configure_https(): - ''' - Enables SSL API Apache config if appropriate and kicks identity-service + """Enables SSL API Apache config if appropriate and kicks identity-service with any required api updates. - ''' + """ # need to write all to ensure changes to the entire request pipeline # propagate (c-api, haprxy, apache) CONFIGS.write_all() @@ -451,14 +482,17 @@ def configure_https(): for rid in relation_ids('identity-service'): keystone_joined(relid=rid) - write_rc_script() + env_vars = {'OPENSTACK_SERVICE_SWIFT': 'proxy-server', + 'OPENSTACK_PORT_API': config('bind-port'), + 'OPENSTACK_PORT_MEMCACHED': 11211} + openstack.save_script_rc(**env_vars) def main(): try: hooks.execute(sys.argv) except UnregisteredHookError as e: - log('Unknown hook {} - skipping.'.format(e)) + log('Unknown hook {} - skipping.'.format(e), level=DEBUG) if __name__ == '__main__': diff --git a/hooks/swift_utils.py b/hooks/swift_utils.py index 7f3204c..31bcb4c 100644 --- a/hooks/swift_utils.py +++ b/hooks/swift_utils.py @@ -1,14 +1,48 @@ +import copy +import glob +import hashlib import os import pwd +import shutil import subprocess -import charmhelpers.contrib.openstack.utils as openstack -import sys -from collections import OrderedDict +import tempfile +import threading +import uuid +from collections import OrderedDict +from swift_context import ( + get_swift_hash, + SwiftHashContext, + SwiftIdentityContext, + HAProxyContext, + SwiftRingContext, + ApacheSSLContext, + MemcachedContext, +) + +import charmhelpers.contrib.openstack.context as context +import charmhelpers.contrib.openstack.templating as templating +from charmhelpers.contrib.openstack.utils import ( + get_os_codename_package, + get_os_codename_install_source, + configure_installation_source +) +from charmhelpers.contrib.hahelpers.cluster import ( + is_elected_leader, + is_clustered, + peer_units, +) from charmhelpers.core.hookenv import ( - log, ERROR, + log, + DEBUG, + INFO, + WARNING, config, relation_get, + unit_get, + relation_set, + relation_ids, + related_units, ) from charmhelpers.fetch import ( apt_update, @@ -16,31 +50,38 @@ from charmhelpers.fetch import ( apt_install, add_source ) - from charmhelpers.core.host import ( lsb_release ) - -import charmhelpers.contrib.openstack.context as context -import charmhelpers.contrib.openstack.templating as templating -import swift_context - +from charmhelpers.contrib.network.ip import ( + format_ipv6_addr, + get_ipv6_addr, +) +from charmhelpers.core.decorators import ( + retry_on_exception, +) # Various config files that are managed via templating. -SWIFT_CONF = '/etc/swift/swift.conf' -SWIFT_PROXY_CONF = '/etc/swift/proxy-server.conf' +SWIFT_CONF_DIR = '/etc/swift' +SWIFT_RING_EXT = 'ring.gz' +SWIFT_CONF = os.path.join(SWIFT_CONF_DIR, 'swift.conf') +SWIFT_PROXY_CONF = os.path.join(SWIFT_CONF_DIR, 'proxy-server.conf') SWIFT_CONF_DIR = os.path.dirname(SWIFT_CONF) MEMCACHED_CONF = '/etc/memcached.conf' SWIFT_RINGS_CONF = '/etc/apache2/conf.d/swift-rings' SWIFT_RINGS_24_CONF = '/etc/apache2/conf-available/swift-rings.conf' HAPROXY_CONF = '/etc/haproxy/haproxy.cfg' -APACHE_SITE_CONF = '/etc/apache2/sites-available/openstack_https_frontend' -APACHE_SITE_24_CONF = '/etc/apache2/sites-available/' \ - 'openstack_https_frontend.conf' +APACHE_SITES_AVAILABLE = '/etc/apache2/sites-available' +APACHE_SITE_CONF = os.path.join(APACHE_SITES_AVAILABLE, + 'openstack_https_frontend') +APACHE_SITE_24_CONF = os.path.join(APACHE_SITES_AVAILABLE, + 'openstack_https_frontend.conf') WWW_DIR = '/var/www/swift-rings' ALTERNATE_WWW_DIR = '/var/www/html/swift-rings' +RING_SYNC_SEMAPHORE = threading.Semaphore() + def get_www_dir(): if os.path.isdir(os.path.dirname(ALTERNATE_WWW_DIR)): @@ -50,13 +91,13 @@ def get_www_dir(): SWIFT_RINGS = { - 'account': '/etc/swift/account.builder', - 'container': '/etc/swift/container.builder', - 'object': '/etc/swift/object.builder' + 'account': os.path.join(SWIFT_CONF_DIR, 'account.builder'), + 'container': os.path.join(SWIFT_CONF_DIR, 'container.builder'), + 'object': os.path.join(SWIFT_CONF_DIR, 'object.builder') } -SSL_CERT = '/etc/swift/cert.crt' -SSL_KEY = '/etc/swift/cert.key' +SSL_CERT = os.path.join(SWIFT_CONF_DIR, 'cert.crt') +SSL_KEY = os.path.join(SWIFT_CONF_DIR, 'cert.key') # Essex packages BASE_PACKAGES = [ @@ -70,59 +111,179 @@ BASE_PACKAGES = [ FOLSOM_PACKAGES = BASE_PACKAGES + ['swift-plugin-s3'] SWIFT_HA_RES = 'grp_swift_vips' - TEMPLATES = 'templates/' # Map config files to hook contexts and services that will be associated # with file in restart_on_changes()'s service map. CONFIG_FILES = OrderedDict([ (SWIFT_CONF, { - 'hook_contexts': [swift_context.SwiftHashContext()], + 'hook_contexts': [SwiftHashContext()], 'services': ['swift-proxy'], }), (SWIFT_PROXY_CONF, { - 'hook_contexts': [swift_context.SwiftIdentityContext(), + 'hook_contexts': [SwiftIdentityContext(), context.BindHostContext()], 'services': ['swift-proxy'], }), (HAPROXY_CONF, { 'hook_contexts': [context.HAProxyContext(singlenode_mode=True), - swift_context.HAProxyContext()], + HAProxyContext()], 'services': ['haproxy'], }), (SWIFT_RINGS_CONF, { - 'hook_contexts': [swift_context.SwiftRingContext()], + 'hook_contexts': [SwiftRingContext()], 'services': ['apache2'], }), (SWIFT_RINGS_24_CONF, { - 'hook_contexts': [swift_context.SwiftRingContext()], + 'hook_contexts': [SwiftRingContext()], 'services': ['apache2'], }), (APACHE_SITE_CONF, { - 'hook_contexts': [swift_context.ApacheSSLContext()], + 'hook_contexts': [ApacheSSLContext()], 'services': ['apache2'], }), (APACHE_SITE_24_CONF, { - 'hook_contexts': [swift_context.ApacheSSLContext()], + 'hook_contexts': [ApacheSSLContext()], 'services': ['apache2'], }), (MEMCACHED_CONF, { - 'hook_contexts': [swift_context.MemcachedContext()], + 'hook_contexts': [MemcachedContext()], 'services': ['memcached'], }), ]) -def register_configs(): +class SwiftProxyCharmException(Exception): + pass + + +class SwiftProxyClusterRPC(object): + """Provides cluster relation rpc dicts. + + Crucially, this ensures that any settings we don't use in any given call + are set to None, therefore removing them from the relation so they don't + get accidentally interpreted by the receiver as part of the request. + + NOTE: these are only intended to be used from cluster peer relations. """ - Register config files with their respective contexts. - Regstration of some configs may not be required depending on + + KEY_STOP_PROXY_SVC = 'stop-proxy-service' + KEY_STOP_PROXY_SVC_ACK = 'stop-proxy-service-ack' + KEY_NOTIFY_LEADER_CHANGED = 'leader-changed-notification' + + def __init__(self, version=1): + self._version = version + + def template(self): + # Everything must be None by default so it gets dropped from the + # relation unless we want it to be set. + templates = {1: {'trigger': None, + 'broker-token': None, + 'builder-broker': None, + self.KEY_STOP_PROXY_SVC: None, + self.KEY_STOP_PROXY_SVC_ACK: None, + self.KEY_NOTIFY_LEADER_CHANGED: None, + 'peers-only': None, + 'sync-only-builders': None}} + return copy.deepcopy(templates[self._version]) + + def stop_proxy_request(self, peers_only=False): + """Request to stop peer proxy service. + + NOTE: leader action + """ + rq = self.template() + rq['trigger'] = str(uuid.uuid4()) + rq[self.KEY_STOP_PROXY_SVC] = rq['trigger'] + if peers_only: + rq['peers-only'] = 1 + + return rq + + def stop_proxy_ack(self, echo_token, echo_peers_only): + """Ack that peer proxy service is stopped. + + NOTE: non-leader action + """ + rq = self.template() + rq['trigger'] = str(uuid.uuid4()) + # These echo values should match those received in the request + rq[self.KEY_STOP_PROXY_SVC_ACK] = echo_token + rq['peers-only'] = echo_peers_only + return rq + + def sync_rings_request(self, broker_host, broker_token, + builders_only=False): + """Request for peer to sync rings. + + NOTE: leader action + """ + rq = self.template() + rq['trigger'] = str(uuid.uuid4()) + + if builders_only: + rq['sync-only-builders'] = 1 + + rq['broker-token'] = broker_token + rq['builder-broker'] = broker_host + return rq + + def notify_leader_changed(self): + """Notify peers that leader has changed. + + NOTE: leader action + """ + rq = self.template() + rq['trigger'] = str(uuid.uuid4()) + rq[self.KEY_NOTIFY_LEADER_CHANGED] = rq['trigger'] + return rq + + +def get_first_available_value(responses, key, default=None): + for r in responses: + if key in r: + return r[key] + + return default + + +def all_responses_equal(responses, key, must_exist=True): + """If key exists in responses, all values for it must be equal. + + If all equal return True. If key does not exist and must_exist is True + return False otherwise True. + """ + sentinel = object() + val = None + all_equal = True + for r in responses: + _val = r.get(key, sentinel) + if val is not None and val != _val: + all_equal = False + break + elif _val != sentinel: + val = _val + + if must_exist and val is None: + all_equal = False + + if all_equal: + return True + + log("Responses not all equal for key '%s'" % (key), level=DEBUG) + return False + + +def register_configs(): + """Register config files with their respective contexts. + + Registration of some configs may not be required depending on existing of certain relations. """ # if called without anything installed (eg during install hook) # just default to earliest supported release. configs dont get touched # till post-install, anyway. - release = openstack.get_os_codename_package('swift-proxy', fatal=False) \ + release = get_os_codename_package('swift-proxy', fatal=False) \ or 'essex' configs = templating.OSConfigRenderer(templates_dir=TEMPLATES, openstack_release=release) @@ -149,13 +310,12 @@ def register_configs(): def restart_map(): - ''' - Determine the correct resource map to be passed to + """Determine the correct resource map to be passed to charmhelpers.core.restart_on_change() based on the services configured. - :returns: dict: A dictionary mapping config file to lists of services + :returns dict: A dictionary mapping config file to lists of services that should be restarted when file changes. - ''' + """ _map = [] for f, ctxt in CONFIG_FILES.iteritems(): svcs = [] @@ -163,6 +323,7 @@ def restart_map(): svcs.append(svc) if svcs: _map.append((f, svcs)) + return OrderedDict(_map) @@ -174,12 +335,13 @@ def swift_user(username='swift'): def ensure_swift_dir(conf_dir=os.path.dirname(SWIFT_CONF)): if not os.path.isdir(conf_dir): os.mkdir(conf_dir, 0o750) + uid, gid = swift_user() os.chown(conf_dir, uid, gid) def determine_packages(release): - '''determine what packages are needed for a given OpenStack release''' + """Determine what packages are needed for a given OpenStack release.""" if release == 'essex': return BASE_PACKAGES elif release == 'folsom': @@ -190,13 +352,6 @@ def determine_packages(release): return FOLSOM_PACKAGES -def write_rc_script(): - env_vars = {'OPENSTACK_SERVICE_SWIFT': 'proxy-server', - 'OPENSTACK_PORT_API': config('bind-port'), - 'OPENSTACK_PORT_MEMCACHED': 11211} - openstack.save_script_rc(**env_vars) - - def _load_builder(path): # lifted straight from /usr/bin/swift-ring-builder from swift.common.ring import RingBuilder @@ -213,6 +368,7 @@ def _load_builder(path): for dev in builder.devs: if dev and 'meta' not in dev: dev['meta'] = '' + return builder @@ -222,14 +378,14 @@ def _write_ring(ring, ring_path): def ring_port(ring_path, node): - '''determine correct port from relation settings for a given ring file.''' + """Determine correct port from relation settings for a given ring file.""" for name in ['account', 'object', 'container']: if name in ring_path: return node[('%s_port' % name)] def initialize_ring(path, part_power, replicas, min_hours): - '''Initialize a new swift ring with given parameters.''' + """Initialize a new swift ring with given parameters.""" from swift.common.ring import RingBuilder ring = RingBuilder(part_power, replicas, min_hours) _write_ring(ring, path) @@ -244,14 +400,13 @@ def exists_in_ring(ring_path, node): n = [(i, node[i]) for i in node if i in dev and i != 'zone'] if sorted(d) == sorted(n): - msg = 'Node already exists in ring (%s).' % ring_path - log(msg) + log('Node already exists in ring (%s).' % ring_path, level=INFO) return True return False -def add_to_ring(ring_path, node): +def add_to_ring(ring_path, node, device): ring = _load_builder(ring_path) port = ring_port(ring_path, node) @@ -265,16 +420,14 @@ def add_to_ring(ring_path, node): 'zone': node['zone'], 'ip': node['ip'], 'port': port, - 'device': node['device'], + 'device': device, 'weight': 100, 'meta': '', } ring.add_dev(new_dev) _write_ring(ring, ring_path) - msg = 'Added new device to ring %s: %s' %\ - (ring_path, - [k for k in new_dev.iteritems()]) - log(msg) + msg = 'Added new device to ring %s: %s' % (ring_path, new_dev) + log(msg, level=INFO) def _get_zone(ring_builder): @@ -302,19 +455,33 @@ def _get_zone(ring_builder): return sorted(zone_distrib, key=zone_distrib.get).pop(0) +def get_min_part_hours(ring): + builder = _load_builder(ring) + return builder.min_part_hours + + +def set_min_part_hours(path, value): + cmd = ['swift-ring-builder', path, 'set_min_part_hours', str(value)] + p = subprocess.Popen(cmd) + p.communicate() + rc = p.returncode + if rc != 0: + msg = ("Failed to set min_part_hours=%s on %s" % (value, path)) + raise SwiftProxyCharmException(msg) + + def get_zone(assignment_policy): - ''' Determine the appropriate zone depending on configured assignment - policy. + """Determine appropriate zone based on configured assignment policy. - Manual assignment relies on each storage zone being deployed as a - separate service unit with its desired zone set as a configuration - option. + Manual assignment relies on each storage zone being deployed as a + separate service unit with its desired zone set as a configuration + option. - Auto assignment distributes swift-storage machine units across a number - of zones equal to the configured minimum replicas. This allows for a - single swift-storage service unit, with each 'add-unit'd machine unit - being assigned to a different zone. - ''' + Auto assignment distributes swift-storage machine units across a number + of zones equal to the configured minimum replicas. This allows for a + single swift-storage service unit, with each 'add-unit'd machine unit + being assigned to a different zone. + """ if assignment_policy == 'manual': return relation_get('zone') elif assignment_policy == 'auto': @@ -324,13 +491,15 @@ def get_zone(assignment_policy): potential_zones.append(_get_zone(builder)) return set(potential_zones).pop() else: - log('Invalid zone assignment policy: %s' % assignment_policy, - level=ERROR) - sys.exit(1) + msg = ('Invalid zone assignment policy: %s' % assignment_policy) + raise SwiftProxyCharmException(msg) def balance_ring(ring_path): - '''balance a ring. return True if it needs redistribution''' + """Balance a ring. + + Returns True if it needs redistribution. + """ # shell out to swift-ring-builder instead, since the balancing code there # does a bunch of un-importable validation.''' cmd = ['swift-ring-builder', ring_path, 'rebalance'] @@ -339,22 +508,42 @@ def balance_ring(ring_path): rc = p.returncode if rc == 0: return True - elif rc == 1: - # swift-ring-builder returns 1 on WARNING (ring didn't require balance) + + if rc == 1: + # Ring builder exit-code=1 is supposed to indicate warning but I have + # noticed that it can also return 1 with the following sort of message: + # + # NOTE: Balance of 166.67 indicates you should push this ring, wait + # at least 0 hours, and rebalance/repush. + # + # This indicates that a balance has occurred and a resync would be + # required so not sure why 1 is returned in this case. return False - else: - log('balance_ring: %s returned %s' % (cmd, rc), level=ERROR) - sys.exit(1) + + msg = ('balance_ring: %s returned %s' % (cmd, rc)) + raise SwiftProxyCharmException(msg) def should_balance(rings): - '''Based on zones vs min. replicas, determine whether or not the rings - should be balanced during initial configuration.''' + """Determine whether or not a re-balance is required and allowed. + + Ring balance can be disabled/postponed using the disable-ring-balance + config option. + + Otherwise, using zones vs min. replicas, determine whether or not the rings + should be balanced. + """ + if config('disable-ring-balance'): + return False + for ring in rings: builder = _load_builder(ring).to_dict() replicas = builder['replicas'] zones = [dev['zone'] for dev in builder['devs']] - if len(set(zones)) < replicas: + num_zones = len(set(zones)) + if num_zones < replicas: + log("Not enough zones (%d) defined to allow ring balance " + "(need >= %d)" % (num_zones, replicas), level=INFO) return False return True @@ -362,10 +551,10 @@ def should_balance(rings): def do_openstack_upgrade(configs): new_src = config('openstack-origin') - new_os_rel = openstack.get_os_codename_install_source(new_src) + new_os_rel = get_os_codename_install_source(new_src) - log('Performing OpenStack upgrade to %s.' % (new_os_rel)) - openstack.configure_installation_source(new_src) + log('Performing OpenStack upgrade to %s.' % (new_os_rel), level=DEBUG) + configure_installation_source(new_src) dpkg_opts = [ '--option', 'Dpkg::Options::=--force-confnew', '--option', 'Dpkg::Options::=--force-confdef', @@ -377,10 +566,16 @@ def do_openstack_upgrade(configs): def setup_ipv6(): + """Validate that we can support IPv6 mode. + + This should be called if prefer-ipv6 is True to ensure that we are running + in an environment that supports ipv6. + """ ubuntu_rel = lsb_release()['DISTRIB_CODENAME'].lower() if ubuntu_rel < "trusty": - raise Exception("IPv6 is not supported in the charms for Ubuntu " - "versions less than Trusty 14.04") + msg = ("IPv6 is not supported in the charms for Ubuntu versions less " + "than Trusty 14.04") + raise SwiftProxyCharmException(msg) # NOTE(xianghui): Need to install haproxy(1.5.3) from trusty-backports # to support ipv6 address, so check is required to make sure not @@ -390,3 +585,413 @@ def setup_ipv6(): ' main') apt_update() apt_install('haproxy/trusty-backports', fatal=True) + + +@retry_on_exception(3, base_delay=2, exc_type=subprocess.CalledProcessError) +def sync_proxy_rings(broker_url, builders=True, rings=True): + """The leader proxy is responsible for intialising, updating and + rebalancing the ring. Once the leader is ready the rings must then be + synced into each other proxy unit. + + Note that we sync the ring builder and .gz files since the builder itself + is linked to the underlying .gz ring. + """ + log('Fetching swift rings & builders from proxy @ %s.' % broker_url, + level=DEBUG) + target = SWIFT_CONF_DIR + synced = [] + tmpdir = tempfile.mkdtemp(prefix='swiftrings') + try: + for server in ['account', 'object', 'container']: + if builders: + url = '%s/%s.builder' % (broker_url, server) + log('Fetching %s.' % url, level=DEBUG) + builder = "%s.builder" % (server) + cmd = ['wget', url, '--retry-connrefused', '-t', '10', '-O', + os.path.join(tmpdir, builder)] + subprocess.check_call(cmd) + synced.append(builder) + + if rings: + url = '%s/%s.%s' % (broker_url, server, SWIFT_RING_EXT) + log('Fetching %s.' % url, level=DEBUG) + ring = '%s.%s' % (server, SWIFT_RING_EXT) + cmd = ['wget', url, '--retry-connrefused', '-t', '10', '-O', + os.path.join(tmpdir, ring)] + subprocess.check_call(cmd) + synced.append(ring) + + # Once all have been successfully downloaded, move them to actual + # location. + for f in synced: + os.rename(os.path.join(tmpdir, f), os.path.join(target, f)) + finally: + shutil.rmtree(tmpdir) + + +def ensure_www_dir_permissions(www_dir): + if not os.path.isdir(www_dir): + os.mkdir(www_dir, 0o755) + else: + os.chmod(www_dir, 0o755) + + uid, gid = swift_user() + os.chown(www_dir, uid, gid) + + +def update_www_rings(rings=True, builders=True): + """Copy rings to apache www dir. + + Try to do this as atomically as possible to avoid races with storage nodes + syncing rings. + """ + if not (rings or builders): + return + + tmp_dir = tempfile.mkdtemp(prefix='swift-rings-www-tmp') + for ring, builder_path in SWIFT_RINGS.iteritems(): + if rings: + ringfile = '%s.%s' % (ring, SWIFT_RING_EXT) + src = os.path.join(SWIFT_CONF_DIR, ringfile) + dst = os.path.join(tmp_dir, ringfile) + shutil.copyfile(src, dst) + + if builders: + src = builder_path + dst = os.path.join(tmp_dir, os.path.basename(builder_path)) + shutil.copyfile(src, dst) + + www_dir = get_www_dir() + deleted = "%s.deleted" % (www_dir) + ensure_www_dir_permissions(tmp_dir) + os.rename(www_dir, deleted) + os.rename(tmp_dir, www_dir) + shutil.rmtree(deleted) + + +def get_rings_checksum(): + """Returns sha256 checksum for rings in /etc/swift.""" + sha = hashlib.sha256() + for ring in SWIFT_RINGS.iterkeys(): + path = os.path.join(SWIFT_CONF_DIR, '%s.%s' % (ring, SWIFT_RING_EXT)) + if not os.path.isfile(path): + continue + + with open(path, 'rb') as fd: + sha.update(fd.read()) + + return sha.hexdigest() + + +def get_builders_checksum(): + """Returns sha256 checksum for builders in /etc/swift.""" + sha = hashlib.sha256() + for builder in SWIFT_RINGS.itervalues(): + if not os.path.exists(builder): + continue + + with open(builder, 'rb') as fd: + sha.update(fd.read()) + + return sha.hexdigest() + + +def get_broker_token(): + """Get ack token from peers to be used as broker token. + + Must be equal across all peers. + + Returns token or None if not found. + """ + responses = [] + ack_key = SwiftProxyClusterRPC.KEY_STOP_PROXY_SVC_ACK + for rid in relation_ids('cluster'): + for unit in related_units(rid): + responses.append(relation_get(attribute=ack_key, rid=rid, + unit=unit)) + + # If no acks exist we have probably never done a sync so make up a token + if len(responses) == 0: + return str(uuid.uuid4()) + + if not all(responses) or len(set(responses)) != 1: + log("Not all ack tokens equal - %s" % (responses), level=DEBUG) + return None + + return responses[0] + + +def sync_builders_and_rings_if_changed(f): + """Only trigger a ring or builder sync if they have changed as a result of + the decorated operation. + """ + def _inner_sync_builders_and_rings_if_changed(*args, **kwargs): + if not is_elected_leader(SWIFT_HA_RES): + log("Sync rings called by non-leader - skipping", level=WARNING) + return + + try: + # Ensure we don't do a double sync if we are nested. + do_sync = False + if RING_SYNC_SEMAPHORE.acquire(blocking=0): + do_sync = True + rings_before = get_rings_checksum() + builders_before = get_builders_checksum() + + ret = f(*args, **kwargs) + + if not do_sync: + return ret + + rings_after = get_rings_checksum() + builders_after = get_builders_checksum() + + rings_path = os.path.join(SWIFT_CONF_DIR, '*.%s' % + (SWIFT_RING_EXT)) + rings_ready = len(glob.glob(rings_path)) == len(SWIFT_RINGS) + rings_changed = rings_after != rings_before + builders_changed = builders_after != builders_before + if rings_changed or builders_changed: + # Copy builders and rings (if available) to the server dir. + update_www_rings(rings=rings_ready) + if rings_changed and rings_ready: + # Trigger sync + cluster_sync_rings(peers_only=not rings_changed) + else: + cluster_sync_rings(peers_only=True, builders_only=True) + log("Rings not ready for sync - syncing builders", + level=DEBUG) + else: + log("Rings/builders unchanged - skipping sync", level=DEBUG) + + return ret + finally: + RING_SYNC_SEMAPHORE.release() + + return _inner_sync_builders_and_rings_if_changed + + +@sync_builders_and_rings_if_changed +def update_rings(node_settings=None, min_part_hours=None): + """Update builder with node settings and balance rings if necessary. + + Also update min_part_hours if provided. + """ + if not is_elected_leader(SWIFT_HA_RES): + log("Update rings called by non-leader - skipping", level=WARNING) + return + + balance_required = False + + if min_part_hours: + # NOTE: no need to stop the proxy since we are not changing the rings, + # only the builder. + + # Only update if all exist + if all([os.path.exists(p) for p in SWIFT_RINGS.itervalues()]): + for ring, path in SWIFT_RINGS.iteritems(): + current_min_part_hours = get_min_part_hours(path) + if min_part_hours != current_min_part_hours: + log("Setting ring %s min_part_hours to %s" % + (ring, min_part_hours), level=INFO) + try: + set_min_part_hours(path, min_part_hours) + except SwiftProxyCharmException as exc: + # TODO: ignore for now since this should not be + # critical but in the future we should support a + # rollback. + log(str(exc), level=WARNING) + else: + balance_required = True + + if node_settings: + for dev in node_settings.get('devices', []): + for ring in SWIFT_RINGS.itervalues(): + if not exists_in_ring(ring, node_settings): + add_to_ring(ring, node_settings, dev) + balance_required = True + + if balance_required: + balance_rings() + + +@sync_builders_and_rings_if_changed +def balance_rings(): + """Rebalance each ring and notify peers that new rings are available.""" + if not is_elected_leader(SWIFT_HA_RES): + log("Balance rings called by non-leader - skipping", level=WARNING) + return + + if not should_balance([r for r in SWIFT_RINGS.itervalues()]): + log("Not yet ready to balance rings - insufficient replicas?", + level=INFO) + return + + rebalanced = False + for path in SWIFT_RINGS.itervalues(): + if balance_ring(path): + log('Balanced ring %s' % path, level=DEBUG) + rebalanced = True + else: + log('Ring %s not rebalanced' % path, level=DEBUG) + + if not rebalanced: + log("Rings unchanged by rebalance", level=DEBUG) + # NOTE: checksum will tell for sure + + +def mark_www_rings_deleted(): + """Mark any rings from the apache server directory as deleted so that + storage units won't see them. + """ + www_dir = get_www_dir() + for ring, _ in SWIFT_RINGS.iteritems(): + path = os.path.join(www_dir, '%s.ring.gz' % ring) + if os.path.exists(path): + os.rename(path, "%s.deleted" % (path)) + + +def notify_peers_builders_available(broker_token, builders_only=False): + """Notify peer swift-proxy units that they should synchronise ring and + builder files. + + Note that this should only be called from the leader unit. + """ + if not is_elected_leader(SWIFT_HA_RES): + log("Ring availability peer broadcast requested by non-leader - " + "skipping", level=WARNING) + return + + if is_clustered(): + hostname = config('vip') + else: + hostname = get_hostaddr() + + hostname = format_ipv6_addr(hostname) or hostname + # Notify peers that builders are available + log("Notifying peer(s) that rings are ready for sync.", level=INFO) + rq = SwiftProxyClusterRPC().sync_rings_request(hostname, + broker_token, + builders_only=builders_only) + for rid in relation_ids('cluster'): + log("Notifying rid=%s (%s)" % (rid, rq), level=DEBUG) + relation_set(relation_id=rid, relation_settings=rq) + + +def broadcast_rings_available(broker_token, peers=True, storage=True, + builders_only=False): + """Notify storage relations and cluster (peer) relations that rings and + builders are availble for sync. + + We can opt to only notify peer or storage relations if needs be. + """ + if storage: + # TODO: get ack from storage units that they are synced before + # syncing proxies. + notify_storage_rings_available() + else: + log("Skipping notify storage relations", level=DEBUG) + + if peers: + notify_peers_builders_available(broker_token, + builders_only=builders_only) + else: + log("Skipping notify peer relations", level=DEBUG) + + +def cluster_sync_rings(peers_only=False, builders_only=False): + """Notify peer relations that they should stop their proxy services. + + Peer units will then be expected to do a relation_set with + stop-proxy-service-ack set rq value. Once all peers have responded, the + leader will send out notification to all relations that rings are available + for sync. + + If peers_only is True, only peer units will be synced. This is typically + used when only builder files have been changed. + + This should only be called by the leader unit. + """ + if not is_elected_leader(SWIFT_HA_RES): + # Only the leader can do this. + return + + if not peer_units(): + # If we have no peer units just go ahead and broadcast to storage + # relations. If we have been instructed to only broadcast to peers this + # should do nothing. + broker_token = get_broker_token() + broadcast_rings_available(broker_token, peers=False, + storage=not peers_only) + return + elif builders_only: + # No need to stop proxies if only syncing builders between peers. + broker_token = get_broker_token() + broadcast_rings_available(broker_token, storage=False, + builders_only=builders_only) + return + + rel_ids = relation_ids('cluster') + trigger = str(uuid.uuid4()) + + log("Sending request to stop proxy service to all peers (%s)" % (trigger), + level=INFO) + rq = SwiftProxyClusterRPC().stop_proxy_request(peers_only) + for rid in rel_ids: + relation_set(relation_id=rid, relation_settings=rq) + + +def notify_storage_rings_available(): + """Notify peer swift-storage relations that they should synchronise ring + and builder files. + + Note that this should only be called from the leader unit. + """ + if not is_elected_leader(SWIFT_HA_RES): + log("Ring availability storage-relation broadcast requested by " + "non-leader - skipping", level=WARNING) + return + + if is_clustered(): + hostname = config('vip') + else: + hostname = get_hostaddr() + + hostname = format_ipv6_addr(hostname) or hostname + path = os.path.basename(get_www_dir()) + rings_url = 'http://%s/%s' % (hostname, path) + trigger = uuid.uuid4() + # Notify storage nodes that there is a new ring to fetch. + log("Notifying storage nodes that new ring is ready for sync.", level=INFO) + for relid in relation_ids('swift-storage'): + relation_set(relation_id=relid, swift_hash=get_swift_hash(), + rings_url=rings_url, trigger=trigger) + + +def fully_synced(): + """Check that we have all the rings and builders synced from the leader. + + Returns True if we have all rings and builders. + """ + not_synced = [] + for ring, builder in SWIFT_RINGS.iteritems(): + if not os.path.exists(builder): + not_synced.append(builder) + + ringfile = os.path.join(SWIFT_CONF_DIR, + '%s.%s' % (ring, SWIFT_RING_EXT)) + if not os.path.exists(ringfile): + not_synced.append(ringfile) + + if not_synced: + log("Not yet synced: %s" % ', '.join(not_synced), level=INFO) + return False + + return True + + +def get_hostaddr(): + if config('prefer-ipv6'): + return get_ipv6_addr(exc_list=[config('vip')])[0] + + return unit_get('private-address') diff --git a/tests/charmhelpers/__init__.py b/tests/charmhelpers/__init__.py index e69de29..b46e2e2 100644 --- a/tests/charmhelpers/__init__.py +++ b/tests/charmhelpers/__init__.py @@ -0,0 +1,22 @@ +# Bootstrap charm-helpers, installing its dependencies if necessary using +# only standard libraries. +import subprocess +import sys + +try: + import six # flake8: noqa +except ImportError: + if sys.version_info.major == 2: + subprocess.check_call(['apt-get', 'install', '-y', 'python-six']) + else: + subprocess.check_call(['apt-get', 'install', '-y', 'python3-six']) + import six # flake8: noqa + +try: + import yaml # flake8: noqa +except ImportError: + if sys.version_info.major == 2: + subprocess.check_call(['apt-get', 'install', '-y', 'python-yaml']) + else: + subprocess.check_call(['apt-get', 'install', '-y', 'python3-yaml']) + import yaml # flake8: noqa diff --git a/unit_tests/test_swift_context.py b/unit_tests/test_swift_context.py new file mode 100644 index 0000000..e0dbb6b --- /dev/null +++ b/unit_tests/test_swift_context.py @@ -0,0 +1,56 @@ +import mock +import os +import tempfile +import unittest +import uuid + + +with mock.patch('charmhelpers.core.hookenv.config'): + import swift_context + + +class SwiftContextTestCase(unittest.TestCase): + + @mock.patch('swift_context.config') + def test_get_swift_hash_file(self, mock_config): + expected = '##FILEHASH##' + with tempfile.NamedTemporaryFile() as tmpfile: + swift_context.SWIFT_HASH_FILE = tmpfile.name + tmpfile.write(expected) + tmpfile.seek(0) + os.fsync(tmpfile) + hash = swift_context.get_swift_hash() + + self.assertFalse(mock_config.called) + self.assertEqual(expected, hash) + + @mock.patch('swift_context.config') + def test_get_swift_hash_config(self, mock_config): + expected = '##CFGHASH##' + mock_config.return_value = expected + tmpfile = tempfile.mktemp() + swift_context.SWIFT_HASH_FILE = tmpfile + hash = swift_context.get_swift_hash() + + with open(tmpfile, 'r') as fd: + self.assertEqual(expected, fd.read()) + + self.assertTrue(mock_config.called) + self.assertEqual(expected, hash) + + @mock.patch('swift_context.service_name') + @mock.patch('swift_context.config') + def test_get_swift_hash_env(self, mock_config, mock_service_name): + mock_config.return_value = None + mock_service_name.return_value = "testsvc" + tmpfile = tempfile.mktemp() + swift_context.SWIFT_HASH_FILE = tmpfile + with mock.patch('swift_context.os.environ.get') as mock_env_get: + mock_env_get.return_value = str(uuid.uuid4()) + hash = swift_context.get_swift_hash() + mock_env_get.assert_called_with('JUJU_ENV_UUID') + + with open(tmpfile, 'r') as fd: + self.assertEqual(hash, fd.read()) + + self.assertTrue(mock_config.called) diff --git a/unit_tests/test_swift_hooks.py b/unit_tests/test_swift_hooks.py new file mode 100644 index 0000000..acf4f24 --- /dev/null +++ b/unit_tests/test_swift_hooks.py @@ -0,0 +1,39 @@ +import mock +import unittest +import uuid + + +with mock.patch('charmhelpers.core.hookenv.log'): + import swift_hooks + + +class SwiftHooksTestCase(unittest.TestCase): + + @mock.patch("swift_hooks.relation_get") + @mock.patch("swift_hooks.local_unit") + def test_all_peers_stopped(self, mock_local_unit, mock_relation_get): + token1 = str(uuid.uuid4()) + token2 = str(uuid.uuid4()) + mock_relation_get.return_value = token1 + + responses = [{'some-other-key': token1}] + self.assertFalse(swift_hooks.all_peers_stopped(responses)) + + responses = [{'stop-proxy-service-ack': token1}, + {'stop-proxy-service-ack': token2}] + self.assertFalse(swift_hooks.all_peers_stopped(responses)) + + responses = [{'stop-proxy-service-ack': token1}, + {'stop-proxy-service-ack': token1}, + {'some-other-key': token1}] + self.assertFalse(swift_hooks.all_peers_stopped(responses)) + + responses = [{'stop-proxy-service-ack': token1}, + {'stop-proxy-service-ack': token1}] + self.assertTrue(swift_hooks.all_peers_stopped(responses)) + + mock_relation_get.return_value = token2 + + responses = [{'stop-proxy-service-ack': token1}, + {'stop-proxy-service-ack': token1}] + self.assertFalse(swift_hooks.all_peers_stopped(responses)) diff --git a/unit_tests/test_swift_utils.py b/unit_tests/test_swift_utils.py new file mode 100644 index 0000000..fc06e55 --- /dev/null +++ b/unit_tests/test_swift_utils.py @@ -0,0 +1,225 @@ +import mock +import os +import shutil +import tempfile +import uuid +import unittest + + +with mock.patch('charmhelpers.core.hookenv.config'): + import swift_utils + + +def init_ring_paths(tmpdir): + swift_utils.SWIFT_CONF_DIR = tmpdir + for ring in swift_utils.SWIFT_RINGS.iterkeys(): + path = os.path.join(tmpdir, "%s.builder" % ring) + swift_utils.SWIFT_RINGS[ring] = path + with open(path, 'w') as fd: + fd.write("0\n") + + +class SwiftUtilsTestCase(unittest.TestCase): + + @mock.patch('swift_utils.get_broker_token') + @mock.patch('swift_utils.update_www_rings') + @mock.patch('swift_utils.get_builders_checksum') + @mock.patch('swift_utils.get_rings_checksum') + @mock.patch('swift_utils.balance_rings') + @mock.patch('swift_utils.log') + @mock.patch('swift_utils.os.path.exists') + @mock.patch('swift_utils.is_elected_leader') + @mock.patch('swift_utils.get_min_part_hours') + @mock.patch('swift_utils.set_min_part_hours') + def test_update_rings(self, mock_set_min_hours, + mock_get_min_hours, + mock_is_elected_leader, mock_path_exists, + mock_log, mock_balance_rings, + mock_get_rings_checksum, + mock_get_builders_checksum, mock_update_www_rings, + mock_get_broker_token): + mock_get_broker_token.return_value = "token1" + + # Make sure same is returned for both so that we don't try to sync + mock_get_rings_checksum.return_value = None + mock_get_builders_checksum.return_value = None + + # Test blocker 1 + mock_is_elected_leader.return_value = False + swift_utils.update_rings() + self.assertFalse(mock_balance_rings.called) + + # Test blocker 2 + mock_path_exists.return_value = False + mock_is_elected_leader.return_value = True + swift_utils.update_rings() + self.assertFalse(mock_get_min_hours.called) + self.assertFalse(mock_balance_rings.called) + + # Test blocker 3 + mock_path_exists.return_value = True + mock_is_elected_leader.return_value = True + mock_get_min_hours.return_value = 10 + swift_utils.update_rings(min_part_hours=10) + self.assertTrue(mock_get_min_hours.called) + self.assertFalse(mock_set_min_hours.called) + self.assertFalse(mock_balance_rings.called) + + mock_get_min_hours.reset_mock() + + # Test go through + mock_path_exists.return_value = True + mock_is_elected_leader.return_value = True + mock_get_min_hours.return_value = 0 + swift_utils.update_rings(min_part_hours=10) + self.assertTrue(mock_get_min_hours.called) + self.assertTrue(mock_set_min_hours.called) + self.assertTrue(mock_balance_rings.called) + + @mock.patch('swift_utils.get_broker_token') + @mock.patch('swift_utils.balance_rings') + @mock.patch('swift_utils.log') + @mock.patch('swift_utils.is_elected_leader') + @mock.patch('swift_utils.config') + @mock.patch('swift_utils.update_www_rings') + @mock.patch('swift_utils.cluster_sync_rings') + def test_sync_builders_and_rings_if_changed(self, mock_cluster_sync_rings, + mock_update_www_rings, + mock_config, + mock_is_elected_leader, + mock_log, + mock_balance_rings, + mock_get_broker_token): + mock_get_broker_token.return_value = "token1" + + @swift_utils.sync_builders_and_rings_if_changed + def mock_balance(): + for ring, builder in swift_utils.SWIFT_RINGS.iteritems(): + ring = os.path.join(swift_utils.SWIFT_CONF_DIR, + '%s.ring.gz' % ring) + with open(ring, 'w') as fd: + fd.write(str(uuid.uuid4())) + + with open(builder, 'w') as fd: + fd.write(str(uuid.uuid4())) + + mock_balance_rings.side_effect = mock_balance + + init_ring_paths(tempfile.mkdtemp()) + try: + swift_utils.balance_rings() + finally: + shutil.rmtree(swift_utils.SWIFT_CONF_DIR) + + self.assertTrue(mock_update_www_rings.called) + self.assertTrue(mock_cluster_sync_rings.called) + + @mock.patch('swift_utils.get_www_dir') + def test_mark_www_rings_deleted(self, mock_get_www_dir): + try: + tmpdir = tempfile.mkdtemp() + mock_get_www_dir.return_value = tmpdir + swift_utils.mark_www_rings_deleted() + finally: + shutil.rmtree(tmpdir) + + @mock.patch('swift_utils.uuid') + def test_cluster_rpc_stop_proxy_request(self, mock_uuid): + mock_uuid.uuid4.return_value = 'test-uuid' + rpc = swift_utils.SwiftProxyClusterRPC() + rq = rpc.stop_proxy_request(peers_only=True) + self.assertEqual({'trigger': 'test-uuid', + 'broker-token': None, + 'builder-broker': None, + 'peers-only': True, + 'leader-changed-notification': None, + 'stop-proxy-service': 'test-uuid', + 'stop-proxy-service-ack': None, + 'sync-only-builders': None}, rq) + + rq = rpc.stop_proxy_request() + self.assertEqual({'trigger': 'test-uuid', + 'broker-token': None, + 'builder-broker': None, + 'peers-only': None, + 'leader-changed-notification': None, + 'stop-proxy-service': 'test-uuid', + 'stop-proxy-service-ack': None, + 'sync-only-builders': None}, rq) + + @mock.patch('swift_utils.uuid') + def test_cluster_rpc_stop_proxy_ack(self, mock_uuid): + mock_uuid.uuid4.return_value = 'token2' + rpc = swift_utils.SwiftProxyClusterRPC() + rq = rpc.stop_proxy_ack(echo_token='token1', echo_peers_only='1') + self.assertEqual({'trigger': 'token2', + 'broker-token': None, + 'builder-broker': None, + 'peers-only': '1', + 'leader-changed-notification': None, + 'stop-proxy-service': None, + 'stop-proxy-service-ack': 'token1', + 'sync-only-builders': None}, rq) + + @mock.patch('swift_utils.uuid') + def test_cluster_rpc_sync_request(self, mock_uuid): + mock_uuid.uuid4.return_value = 'token2' + rpc = swift_utils.SwiftProxyClusterRPC() + rq = rpc.sync_rings_request('HostA', 'token1') + self.assertEqual({'trigger': 'token2', + 'broker-token': 'token1', + 'builder-broker': 'HostA', + 'peers-only': None, + 'leader-changed-notification': None, + 'stop-proxy-service': None, + 'stop-proxy-service-ack': None, + 'sync-only-builders': None}, rq) + + @mock.patch('swift_utils.uuid') + def test_cluster_rpc_notify_leader_changed(self, mock_uuid): + mock_uuid.uuid4.return_value = 'token1' + rpc = swift_utils.SwiftProxyClusterRPC() + rq = rpc.notify_leader_changed() + self.assertEqual({'trigger': 'token1', + 'broker-token': None, + 'builder-broker': None, + 'peers-only': None, + 'leader-changed-notification': 'token1', + 'stop-proxy-service': None, + 'stop-proxy-service-ack': None, + 'sync-only-builders': None}, rq) + + def test_all_responses_equal(self): + responses = [{'a': 1, 'c': 3}] + self.assertTrue(swift_utils.all_responses_equal(responses, 'b', + must_exist=False)) + + responses = [{'a': 1, 'c': 3}] + self.assertFalse(swift_utils.all_responses_equal(responses, 'b')) + + responses = [{'a': 1, 'b': 2, 'c': 3}] + self.assertTrue(swift_utils.all_responses_equal(responses, 'b')) + + responses = [{'a': 1, 'b': 2, 'c': 3}, {'a': 1, 'b': 2, 'c': 3}] + self.assertTrue(swift_utils.all_responses_equal(responses, 'b')) + + responses = [{'a': 1, 'b': 2, 'c': 3}, {'a': 2, 'b': 2, 'c': 3}] + self.assertTrue(swift_utils.all_responses_equal(responses, 'b')) + + responses = [{'a': 1, 'b': 2, 'c': 3}, {'a': 1, 'b': 3, 'c': 3}] + self.assertFalse(swift_utils.all_responses_equal(responses, 'b')) + + def test_get_first_available_value(self): + rsps = [{'key1': 'A'}, {'key1': 'B'}] + self.assertEqual('A', + swift_utils.get_first_available_value(rsps, 'key1')) + + rsps = [{'key2': 'A'}, {'key1': 'B'}] + self.assertEqual('B', + swift_utils.get_first_available_value(rsps, 'key1')) + + rsps = [{'key2': 'A'}, {'key1': 'B'}] + self.assertIsNone(swift_utils.get_first_available_value(rsps, 'key3')) + + rsps = [] + self.assertIsNone(swift_utils.get_first_available_value(rsps, 'key3'))