diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 0000000..7f7b5be --- /dev/null +++ b/.coveragerc @@ -0,0 +1,7 @@ +[report] +# Regexes for lines to exclude from consideration +exclude_lines = + if __name__ == .__main__.: +include= + hooks/hooks.py + hooks/ceph*.py diff --git a/Makefile b/Makefile index 70e95fb..e29ab2b 100644 --- a/Makefile +++ b/Makefile @@ -2,9 +2,13 @@ PYTHON := /usr/bin/env python lint: - @flake8 --exclude hooks/charmhelpers hooks tests + @flake8 --exclude hooks/charmhelpers hooks tests unit_tests @charm proof +unit_test: + @echo Starting unit tests... + @$(PYTHON) /usr/bin/nosetests --nologcapture --with-coverage unit_tests + test: @echo Starting Amulet tests... # coreycb note: The -v should only be temporary until Amulet sends diff --git a/charm-helpers-hooks.yaml b/charm-helpers-hooks.yaml index f697867..b75fd92 100644 --- a/charm-helpers-hooks.yaml +++ b/charm-helpers-hooks.yaml @@ -5,6 +5,7 @@ include: - fetch - contrib.storage.linux: - utils + - ceph - payload.execd - contrib.openstack.alternatives - contrib.network.ip diff --git a/config.yaml b/config.yaml index 1581052..6baf03b 100644 --- a/config.yaml +++ b/config.yaml @@ -155,14 +155,17 @@ options: order for this charm to function correctly, the privacy extension must be disabled and a non-temporary address must be configured/available on your network interface. + sysctl: + type: string + default: '' + description: | + YAML-formatted associative array of sysctl key/value pairs to be set + persistently e.g. '{ kernel.pid_max : 4194303 }'. nagios_context: default: "juju" - type: string - description: | Used by the nrpe-external-master subordinate charm. A string that will be prepended to instance name to set the host name in nagios. So for instance the hostname would be something like: juju-myservice-0 If you're running multiple environments with the same services in them this allows you to differentiate between them. - diff --git a/hooks/__init__.py b/hooks/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/hooks/ceph_broker.py b/hooks/ceph_broker.py new file mode 100644 index 0000000..9fced94 --- /dev/null +++ b/hooks/ceph_broker.py @@ -0,0 +1,90 @@ +#!/usr/bin/python +# +# Copyright 2014 Canonical Ltd. +# +import json + +from charmhelpers.core.hookenv import ( + log, + DEBUG, + INFO, + ERROR, +) +from charmhelpers.contrib.storage.linux.ceph import ( + create_pool, + pool_exists, +) + + +def decode_req_encode_rsp(f): + """Decorator to decode incoming requests and encode responses.""" + def decode_inner(req): + return json.dumps(f(json.loads(req))) + + return decode_inner + + +@decode_req_encode_rsp +def process_requests(reqs): + """Process Ceph broker request(s). + + This is a versioned api. API version must be supplied by the client making + the request. + """ + try: + version = reqs.get('api-version') + if version == 1: + return process_requests_v1(reqs['ops']) + + except Exception as exc: + log(str(exc), level=ERROR) + msg = ("Unexpected error occurred while processing requests: %s" % + (reqs)) + log(msg, level=ERROR) + return {'exit-code': 1, 'stderr': msg} + + msg = ("Missing or invalid api version (%s)" % (version)) + return {'exit-code': 1, 'stderr': msg} + + +def process_requests_v1(reqs): + """Process v1 requests. + + Takes a list of requests (dicts) and processes each one. If an error is + found, processing stops and the client is notified in the response. + + Returns a response dict containing the exit code (non-zero if any + operation failed along with an explanation). + """ + log("Processing %s ceph broker requests" % (len(reqs)), level=INFO) + for req in reqs: + op = req.get('op') + log("Processing op='%s'" % (op), level=DEBUG) + # Use admin client since we do not have other client key locations + # setup to use them for these operations. + svc = 'admin' + if op == "create-pool": + params = {'pool': req.get('name'), + 'replicas': req.get('replicas')} + if not all(params.iteritems()): + msg = ("Missing parameter(s): %s" % + (' '.join([k for k in params.iterkeys() + if not params[k]]))) + log(msg, level=ERROR) + return {'exit-code': 1, 'stderr': msg} + + pool = params['pool'] + replicas = params['replicas'] + if not pool_exists(service=svc, name=pool): + log("Creating pool '%s' (replicas=%s)" % (pool, replicas), + level=INFO) + create_pool(service=svc, name=pool, replicas=replicas) + else: + log("Pool '%s' already exists - skipping create" % (pool), + level=DEBUG) + else: + msg = "Unknown operation '%s'" % (op) + log(msg, level=ERROR) + return {'exit-code': 1, 'stderr': msg} + + return {'exit-code': 0} diff --git a/hooks/charmhelpers/__init__.py b/hooks/charmhelpers/__init__.py index e69de29..b46e2e2 100644 --- a/hooks/charmhelpers/__init__.py +++ b/hooks/charmhelpers/__init__.py @@ -0,0 +1,22 @@ +# Bootstrap charm-helpers, installing its dependencies if necessary using +# only standard libraries. +import subprocess +import sys + +try: + import six # flake8: noqa +except ImportError: + if sys.version_info.major == 2: + subprocess.check_call(['apt-get', 'install', '-y', 'python-six']) + else: + subprocess.check_call(['apt-get', 'install', '-y', 'python3-six']) + import six # flake8: noqa + +try: + import yaml # flake8: noqa +except ImportError: + if sys.version_info.major == 2: + subprocess.check_call(['apt-get', 'install', '-y', 'python-yaml']) + else: + subprocess.check_call(['apt-get', 'install', '-y', 'python3-yaml']) + import yaml # flake8: noqa diff --git a/hooks/charmhelpers/contrib/network/ip.py b/hooks/charmhelpers/contrib/network/ip.py index e62e565..8dc8316 100644 --- a/hooks/charmhelpers/contrib/network/ip.py +++ b/hooks/charmhelpers/contrib/network/ip.py @@ -1,15 +1,12 @@ import glob import re import subprocess -import sys from functools import partial from charmhelpers.core.hookenv import unit_get from charmhelpers.fetch import apt_install from charmhelpers.core.hookenv import ( - WARNING, - ERROR, log ) @@ -34,31 +31,28 @@ def _validate_cidr(network): network) +def no_ip_found_error_out(network): + errmsg = ("No IP address found in network: %s" % network) + raise ValueError(errmsg) + + def get_address_in_network(network, fallback=None, fatal=False): - """ - Get an IPv4 or IPv6 address within the network from the host. + """Get an IPv4 or IPv6 address within the network from the host. :param network (str): CIDR presentation format. For example, '192.168.1.0/24'. :param fallback (str): If no address is found, return fallback. :param fatal (boolean): If no address is found, fallback is not set and fatal is True then exit(1). - """ - - def not_found_error_out(): - log("No IP address found in network: %s" % network, - level=ERROR) - sys.exit(1) - if network is None: if fallback is not None: return fallback + + if fatal: + no_ip_found_error_out(network) else: - if fatal: - not_found_error_out() - else: - return None + return None _validate_cidr(network) network = netaddr.IPNetwork(network) @@ -70,6 +64,7 @@ def get_address_in_network(network, fallback=None, fatal=False): cidr = netaddr.IPNetwork("%s/%s" % (addr, netmask)) if cidr in network: return str(cidr.ip) + if network.version == 6 and netifaces.AF_INET6 in addresses: for addr in addresses[netifaces.AF_INET6]: if not addr['addr'].startswith('fe80'): @@ -82,20 +77,20 @@ def get_address_in_network(network, fallback=None, fatal=False): return fallback if fatal: - not_found_error_out() + no_ip_found_error_out(network) return None def is_ipv6(address): - '''Determine whether provided address is IPv6 or not''' + """Determine whether provided address is IPv6 or not.""" try: address = netaddr.IPAddress(address) except netaddr.AddrFormatError: # probably a hostname - so not an address at all! return False - else: - return address.version == 6 + + return address.version == 6 def is_address_in_network(network, address): @@ -113,11 +108,13 @@ def is_address_in_network(network, address): except (netaddr.core.AddrFormatError, ValueError): raise ValueError("Network (%s) is not in CIDR presentation format" % network) + try: address = netaddr.IPAddress(address) except (netaddr.core.AddrFormatError, ValueError): raise ValueError("Address (%s) is not in correct presentation format" % address) + if address in network: return True else: @@ -147,6 +144,7 @@ def _get_for_address(address, key): return iface else: return addresses[netifaces.AF_INET][0][key] + if address.version == 6 and netifaces.AF_INET6 in addresses: for addr in addresses[netifaces.AF_INET6]: if not addr['addr'].startswith('fe80'): @@ -160,41 +158,42 @@ def _get_for_address(address, key): return str(cidr).split('/')[1] else: return addr[key] + return None get_iface_for_address = partial(_get_for_address, key='iface') + get_netmask_for_address = partial(_get_for_address, key='netmask') def format_ipv6_addr(address): - """ - IPv6 needs to be wrapped with [] in url link to parse correctly. + """If address is IPv6, wrap it in '[]' otherwise return None. + + This is required by most configuration files when specifying IPv6 + addresses. """ if is_ipv6(address): - address = "[%s]" % address - else: - log("Not a valid ipv6 address: %s" % address, level=WARNING) - address = None + return "[%s]" % address - return address + return None def get_iface_addr(iface='eth0', inet_type='AF_INET', inc_aliases=False, fatal=True, exc_list=None): - """ - Return the assigned IP address for a given interface, if any, or []. - """ + """Return the assigned IP address for a given interface, if any.""" # Extract nic if passed /dev/ethX if '/' in iface: iface = iface.split('/')[-1] + if not exc_list: exc_list = [] + try: inet_num = getattr(netifaces, inet_type) except AttributeError: - raise Exception('Unknown inet type ' + str(inet_type)) + raise Exception("Unknown inet type '%s'" % str(inet_type)) interfaces = netifaces.interfaces() if inc_aliases: @@ -202,15 +201,18 @@ def get_iface_addr(iface='eth0', inet_type='AF_INET', inc_aliases=False, for _iface in interfaces: if iface == _iface or _iface.split(':')[0] == iface: ifaces.append(_iface) + if fatal and not ifaces: raise Exception("Invalid interface '%s'" % iface) + ifaces.sort() else: if iface not in interfaces: if fatal: - raise Exception("%s not found " % (iface)) + raise Exception("Interface '%s' not found " % (iface)) else: return [] + else: ifaces = [iface] @@ -221,10 +223,13 @@ def get_iface_addr(iface='eth0', inet_type='AF_INET', inc_aliases=False, for entry in net_info[inet_num]: if 'addr' in entry and entry['addr'] not in exc_list: addresses.append(entry['addr']) + if fatal and not addresses: raise Exception("Interface '%s' doesn't have any %s addresses." % (iface, inet_type)) - return addresses + + return sorted(addresses) + get_ipv4_addr = partial(get_iface_addr, inet_type='AF_INET') @@ -241,6 +246,7 @@ def get_iface_from_addr(addr): raw = re.match(ll_key, _addr) if raw: _addr = raw.group(1) + if _addr == addr: log("Address '%s' is configured on iface '%s'" % (addr, iface)) @@ -251,8 +257,9 @@ def get_iface_from_addr(addr): def sniff_iface(f): - """If no iface provided, inject net iface inferred from unit private - address. + """Ensure decorated function is called with a value for iface. + + If no iface provided, inject net iface inferred from unit private address. """ def iface_sniffer(*args, **kwargs): if not kwargs.get('iface', None): @@ -295,7 +302,7 @@ def get_ipv6_addr(iface=None, inc_aliases=False, fatal=True, exc_list=None, if global_addrs: # Make sure any found global addresses are not temporary cmd = ['ip', 'addr', 'show', iface] - out = subprocess.check_output(cmd) + out = subprocess.check_output(cmd).decode('UTF-8') if dynamic_only: key = re.compile("inet6 (.+)/[0-9]+ scope global dynamic.*") else: @@ -317,33 +324,28 @@ def get_ipv6_addr(iface=None, inc_aliases=False, fatal=True, exc_list=None, return addrs if fatal: - raise Exception("Interface '%s' doesn't have a scope global " + raise Exception("Interface '%s' does not have a scope global " "non-temporary ipv6 address." % iface) return [] def get_bridges(vnic_dir='/sys/devices/virtual/net'): - """ - Return a list of bridges on the system or [] - """ - b_rgex = vnic_dir + '/*/bridge' - return [x.replace(vnic_dir, '').split('/')[1] for x in glob.glob(b_rgex)] + """Return a list of bridges on the system.""" + b_regex = "%s/*/bridge" % vnic_dir + return [x.replace(vnic_dir, '').split('/')[1] for x in glob.glob(b_regex)] def get_bridge_nics(bridge, vnic_dir='/sys/devices/virtual/net'): - """ - Return a list of nics comprising a given bridge on the system or [] - """ - brif_rgex = "%s/%s/brif/*" % (vnic_dir, bridge) - return [x.split('/')[-1] for x in glob.glob(brif_rgex)] + """Return a list of nics comprising a given bridge on the system.""" + brif_regex = "%s/%s/brif/*" % (vnic_dir, bridge) + return [x.split('/')[-1] for x in glob.glob(brif_regex)] def is_bridge_member(nic): - """ - Check if a given nic is a member of a bridge - """ + """Check if a given nic is a member of a bridge.""" for bridge in get_bridges(): if nic in get_bridge_nics(bridge): return True + return False diff --git a/hooks/charmhelpers/contrib/storage/linux/ceph.py b/hooks/charmhelpers/contrib/storage/linux/ceph.py new file mode 100644 index 0000000..d47dc22 --- /dev/null +++ b/hooks/charmhelpers/contrib/storage/linux/ceph.py @@ -0,0 +1,374 @@ +# +# Copyright 2012 Canonical Ltd. +# +# This file is sourced from lp:openstack-charm-helpers +# +# Authors: +# James Page +# Adam Gandelman +# + +import os +import shutil +import json +import time + +from subprocess import ( + check_call, + check_output, + CalledProcessError, +) +from charmhelpers.core.hookenv import ( + relation_get, + relation_ids, + related_units, + log, + DEBUG, + INFO, + WARNING, + ERROR, +) +from charmhelpers.core.host import ( + mount, + mounts, + service_start, + service_stop, + service_running, + umount, +) +from charmhelpers.fetch import ( + apt_install, +) + +KEYRING = '/etc/ceph/ceph.client.{}.keyring' +KEYFILE = '/etc/ceph/ceph.client.{}.key' + +CEPH_CONF = """[global] + auth supported = {auth} + keyring = {keyring} + mon host = {mon_hosts} + log to syslog = {use_syslog} + err to syslog = {use_syslog} + clog to syslog = {use_syslog} +""" + + +def install(): + """Basic Ceph client installation.""" + ceph_dir = "/etc/ceph" + if not os.path.exists(ceph_dir): + os.mkdir(ceph_dir) + + apt_install('ceph-common', fatal=True) + + +def rbd_exists(service, pool, rbd_img): + """Check to see if a RADOS block device exists.""" + try: + out = check_output(['rbd', 'list', '--id', + service, '--pool', pool]).decode('UTF-8') + except CalledProcessError: + return False + + return rbd_img in out + + +def create_rbd_image(service, pool, image, sizemb): + """Create a new RADOS block device.""" + cmd = ['rbd', 'create', image, '--size', str(sizemb), '--id', service, + '--pool', pool] + check_call(cmd) + + +def pool_exists(service, name): + """Check to see if a RADOS pool already exists.""" + try: + out = check_output(['rados', '--id', service, + 'lspools']).decode('UTF-8') + except CalledProcessError: + return False + + return name in out + + +def get_osds(service): + """Return a list of all Ceph Object Storage Daemons currently in the + cluster. + """ + version = ceph_version() + if version and version >= '0.56': + return json.loads(check_output(['ceph', '--id', service, + 'osd', 'ls', + '--format=json']).decode('UTF-8')) + + return None + + +def create_pool(service, name, replicas=3): + """Create a new RADOS pool.""" + if pool_exists(service, name): + log("Ceph pool {} already exists, skipping creation".format(name), + level=WARNING) + return + + # Calculate the number of placement groups based + # on upstream recommended best practices. + osds = get_osds(service) + if osds: + pgnum = (len(osds) * 100 // replicas) + else: + # NOTE(james-page): Default to 200 for older ceph versions + # which don't support OSD query from cli + pgnum = 200 + + cmd = ['ceph', '--id', service, 'osd', 'pool', 'create', name, str(pgnum)] + check_call(cmd) + + cmd = ['ceph', '--id', service, 'osd', 'pool', 'set', name, 'size', + str(replicas)] + check_call(cmd) + + +def delete_pool(service, name): + """Delete a RADOS pool from ceph.""" + cmd = ['ceph', '--id', service, 'osd', 'pool', 'delete', name, + '--yes-i-really-really-mean-it'] + check_call(cmd) + + +def _keyfile_path(service): + return KEYFILE.format(service) + + +def _keyring_path(service): + return KEYRING.format(service) + + +def create_keyring(service, key): + """Create a new Ceph keyring containing key.""" + keyring = _keyring_path(service) + if os.path.exists(keyring): + log('Ceph keyring exists at %s.' % keyring, level=WARNING) + return + + cmd = ['ceph-authtool', keyring, '--create-keyring', + '--name=client.{}'.format(service), '--add-key={}'.format(key)] + check_call(cmd) + log('Created new ceph keyring at %s.' % keyring, level=DEBUG) + + +def create_key_file(service, key): + """Create a file containing key.""" + keyfile = _keyfile_path(service) + if os.path.exists(keyfile): + log('Keyfile exists at %s.' % keyfile, level=WARNING) + return + + with open(keyfile, 'w') as fd: + fd.write(key) + + log('Created new keyfile at %s.' % keyfile, level=INFO) + + +def get_ceph_nodes(): + """Query named relation 'ceph' to determine current nodes.""" + hosts = [] + for r_id in relation_ids('ceph'): + for unit in related_units(r_id): + hosts.append(relation_get('private-address', unit=unit, rid=r_id)) + + return hosts + + +def configure(service, key, auth, use_syslog): + """Perform basic configuration of Ceph.""" + create_keyring(service, key) + create_key_file(service, key) + hosts = get_ceph_nodes() + with open('/etc/ceph/ceph.conf', 'w') as ceph_conf: + ceph_conf.write(CEPH_CONF.format(auth=auth, + keyring=_keyring_path(service), + mon_hosts=",".join(map(str, hosts)), + use_syslog=use_syslog)) + modprobe('rbd') + + +def image_mapped(name): + """Determine whether a RADOS block device is mapped locally.""" + try: + out = check_output(['rbd', 'showmapped']).decode('UTF-8') + except CalledProcessError: + return False + + return name in out + + +def map_block_storage(service, pool, image): + """Map a RADOS block device for local use.""" + cmd = [ + 'rbd', + 'map', + '{}/{}'.format(pool, image), + '--user', + service, + '--secret', + _keyfile_path(service), + ] + check_call(cmd) + + +def filesystem_mounted(fs): + """Determine whether a filesytems is already mounted.""" + return fs in [f for f, m in mounts()] + + +def make_filesystem(blk_device, fstype='ext4', timeout=10): + """Make a new filesystem on the specified block device.""" + count = 0 + e_noent = os.errno.ENOENT + while not os.path.exists(blk_device): + if count >= timeout: + log('Gave up waiting on block device %s' % blk_device, + level=ERROR) + raise IOError(e_noent, os.strerror(e_noent), blk_device) + + log('Waiting for block device %s to appear' % blk_device, + level=DEBUG) + count += 1 + time.sleep(1) + else: + log('Formatting block device %s as filesystem %s.' % + (blk_device, fstype), level=INFO) + check_call(['mkfs', '-t', fstype, blk_device]) + + +def place_data_on_block_device(blk_device, data_src_dst): + """Migrate data in data_src_dst to blk_device and then remount.""" + # mount block device into /mnt + mount(blk_device, '/mnt') + # copy data to /mnt + copy_files(data_src_dst, '/mnt') + # umount block device + umount('/mnt') + # Grab user/group ID's from original source + _dir = os.stat(data_src_dst) + uid = _dir.st_uid + gid = _dir.st_gid + # re-mount where the data should originally be + # TODO: persist is currently a NO-OP in core.host + mount(blk_device, data_src_dst, persist=True) + # ensure original ownership of new mount. + os.chown(data_src_dst, uid, gid) + + +# TODO: re-use +def modprobe(module): + """Load a kernel module and configure for auto-load on reboot.""" + log('Loading kernel module', level=INFO) + cmd = ['modprobe', module] + check_call(cmd) + with open('/etc/modules', 'r+') as modules: + if module not in modules.read(): + modules.write(module) + + +def copy_files(src, dst, symlinks=False, ignore=None): + """Copy files from src to dst.""" + for item in os.listdir(src): + s = os.path.join(src, item) + d = os.path.join(dst, item) + if os.path.isdir(s): + shutil.copytree(s, d, symlinks, ignore) + else: + shutil.copy2(s, d) + + +def ensure_ceph_storage(service, pool, rbd_img, sizemb, mount_point, + blk_device, fstype, system_services=[], + replicas=3): + """NOTE: This function must only be called from a single service unit for + the same rbd_img otherwise data loss will occur. + + Ensures given pool and RBD image exists, is mapped to a block device, + and the device is formatted and mounted at the given mount_point. + + If formatting a device for the first time, data existing at mount_point + will be migrated to the RBD device before being re-mounted. + + All services listed in system_services will be stopped prior to data + migration and restarted when complete. + """ + # Ensure pool, RBD image, RBD mappings are in place. + if not pool_exists(service, pool): + log('Creating new pool {}.'.format(pool), level=INFO) + create_pool(service, pool, replicas=replicas) + + if not rbd_exists(service, pool, rbd_img): + log('Creating RBD image ({}).'.format(rbd_img), level=INFO) + create_rbd_image(service, pool, rbd_img, sizemb) + + if not image_mapped(rbd_img): + log('Mapping RBD Image {} as a Block Device.'.format(rbd_img), + level=INFO) + map_block_storage(service, pool, rbd_img) + + # make file system + # TODO: What happens if for whatever reason this is run again and + # the data is already in the rbd device and/or is mounted?? + # When it is mounted already, it will fail to make the fs + # XXX: This is really sketchy! Need to at least add an fstab entry + # otherwise this hook will blow away existing data if its executed + # after a reboot. + if not filesystem_mounted(mount_point): + make_filesystem(blk_device, fstype) + + for svc in system_services: + if service_running(svc): + log('Stopping services {} prior to migrating data.' + .format(svc), level=DEBUG) + service_stop(svc) + + place_data_on_block_device(blk_device, mount_point) + + for svc in system_services: + log('Starting service {} after migrating data.' + .format(svc), level=DEBUG) + service_start(svc) + + +def ensure_ceph_keyring(service, user=None, group=None): + """Ensures a ceph keyring is created for a named service and optionally + ensures user and group ownership. + + Returns False if no ceph key is available in relation state. + """ + key = None + for rid in relation_ids('ceph'): + for unit in related_units(rid): + key = relation_get('key', rid=rid, unit=unit) + if key: + break + + if not key: + return False + + create_keyring(service=service, key=key) + keyring = _keyring_path(service) + if user and group: + check_call(['chown', '%s.%s' % (user, group), keyring]) + + return True + + +def ceph_version(): + """Retrieve the local version of ceph.""" + if os.path.exists('/usr/bin/ceph'): + cmd = ['ceph', '-v'] + output = check_output(cmd).decode('US-ASCII') + output = output.split() + if len(output) > 3: + return output[2] + else: + return None + else: + return None diff --git a/hooks/charmhelpers/contrib/storage/linux/utils.py b/hooks/charmhelpers/contrib/storage/linux/utils.py index 1b95871..c6a15e1 100644 --- a/hooks/charmhelpers/contrib/storage/linux/utils.py +++ b/hooks/charmhelpers/contrib/storage/linux/utils.py @@ -30,7 +30,8 @@ def zap_disk(block_device): # sometimes sgdisk exits non-zero; this is OK, dd will clean up call(['sgdisk', '--zap-all', '--mbrtogpt', '--clear', block_device]) - dev_end = check_output(['blockdev', '--getsz', block_device]) + dev_end = check_output(['blockdev', '--getsz', + block_device]).decode('UTF-8') gpt_end = int(dev_end.split()[0]) - 100 check_call(['dd', 'if=/dev/zero', 'of=%s' % (block_device), 'bs=1M', 'count=1']) @@ -47,7 +48,7 @@ def is_device_mounted(device): it doesn't. ''' is_partition = bool(re.search(r".*[0-9]+\b", device)) - out = check_output(['mount']) + out = check_output(['mount']).decode('UTF-8') if is_partition: return bool(re.search(device + r"\b", out)) return bool(re.search(device + r"[0-9]+\b", out)) diff --git a/hooks/charmhelpers/core/fstab.py b/hooks/charmhelpers/core/fstab.py index cfaf0a6..0adf0db 100644 --- a/hooks/charmhelpers/core/fstab.py +++ b/hooks/charmhelpers/core/fstab.py @@ -3,10 +3,11 @@ __author__ = 'Jorge Niedbalski R. ' +import io import os -class Fstab(file): +class Fstab(io.FileIO): """This class extends file in order to implement a file reader/writer for file `/etc/fstab` """ @@ -24,8 +25,8 @@ class Fstab(file): options = "defaults" self.options = options - self.d = d - self.p = p + self.d = int(d) + self.p = int(p) def __eq__(self, o): return str(self) == str(o) @@ -45,7 +46,7 @@ class Fstab(file): self._path = path else: self._path = self.DEFAULT_PATH - file.__init__(self, self._path, 'r+') + super(Fstab, self).__init__(self._path, 'rb+') def _hydrate_entry(self, line): # NOTE: use split with no arguments to split on any @@ -58,8 +59,9 @@ class Fstab(file): def entries(self): self.seek(0) for line in self.readlines(): + line = line.decode('us-ascii') try: - if not line.startswith("#"): + if line.strip() and not line.startswith("#"): yield self._hydrate_entry(line) except ValueError: pass @@ -75,14 +77,14 @@ class Fstab(file): if self.get_entry_by_attr('device', entry.device): return False - self.write(str(entry) + '\n') + self.write((str(entry) + '\n').encode('us-ascii')) self.truncate() return entry def remove_entry(self, entry): self.seek(0) - lines = self.readlines() + lines = [l.decode('us-ascii') for l in self.readlines()] found = False for index, line in enumerate(lines): @@ -97,7 +99,7 @@ class Fstab(file): lines.remove(line) self.seek(0) - self.write(''.join(lines)) + self.write(''.join(lines).encode('us-ascii')) self.truncate() return True diff --git a/hooks/charmhelpers/core/hookenv.py b/hooks/charmhelpers/core/hookenv.py index 083a709..69ae456 100644 --- a/hooks/charmhelpers/core/hookenv.py +++ b/hooks/charmhelpers/core/hookenv.py @@ -9,9 +9,14 @@ import json import yaml import subprocess import sys -import UserDict from subprocess import CalledProcessError +import six +if not six.PY3: + from UserDict import UserDict +else: + from collections import UserDict + CRITICAL = "CRITICAL" ERROR = "ERROR" WARNING = "WARNING" @@ -63,16 +68,18 @@ def log(message, level=None): command = ['juju-log'] if level: command += ['-l', level] + if not isinstance(message, six.string_types): + message = repr(message) command += [message] subprocess.call(command) -class Serializable(UserDict.IterableUserDict): +class Serializable(UserDict): """Wrapper, an object that can be serialized to yaml or json""" def __init__(self, obj): # wrap the object - UserDict.IterableUserDict.__init__(self) + UserDict.__init__(self) self.data = obj def __getattr__(self, attr): @@ -218,7 +225,7 @@ class Config(dict): prev_keys = [] if self._prev_dict is not None: prev_keys = self._prev_dict.keys() - return list(set(prev_keys + dict.keys(self))) + return list(set(prev_keys + list(dict.keys(self)))) def load_previous(self, path=None): """Load previous copy of config from disk. @@ -269,7 +276,7 @@ class Config(dict): """ if self._prev_dict: - for k, v in self._prev_dict.iteritems(): + for k, v in six.iteritems(self._prev_dict): if k not in self: self[k] = v with open(self.path, 'w') as f: @@ -284,7 +291,8 @@ def config(scope=None): config_cmd_line.append(scope) config_cmd_line.append('--format=json') try: - config_data = json.loads(subprocess.check_output(config_cmd_line)) + config_data = json.loads( + subprocess.check_output(config_cmd_line).decode('UTF-8')) if scope is not None: return config_data return Config(config_data) @@ -303,10 +311,10 @@ def relation_get(attribute=None, unit=None, rid=None): if unit: _args.append(unit) try: - return json.loads(subprocess.check_output(_args)) + return json.loads(subprocess.check_output(_args).decode('UTF-8')) except ValueError: return None - except CalledProcessError, e: + except CalledProcessError as e: if e.returncode == 2: return None raise @@ -318,7 +326,7 @@ def relation_set(relation_id=None, relation_settings=None, **kwargs): relation_cmd_line = ['relation-set'] if relation_id is not None: relation_cmd_line.extend(('-r', relation_id)) - for k, v in (relation_settings.items() + kwargs.items()): + for k, v in (list(relation_settings.items()) + list(kwargs.items())): if v is None: relation_cmd_line.append('{}='.format(k)) else: @@ -335,7 +343,8 @@ def relation_ids(reltype=None): relid_cmd_line = ['relation-ids', '--format=json'] if reltype is not None: relid_cmd_line.append(reltype) - return json.loads(subprocess.check_output(relid_cmd_line)) or [] + return json.loads( + subprocess.check_output(relid_cmd_line).decode('UTF-8')) or [] return [] @@ -346,7 +355,8 @@ def related_units(relid=None): units_cmd_line = ['relation-list', '--format=json'] if relid is not None: units_cmd_line.extend(('-r', relid)) - return json.loads(subprocess.check_output(units_cmd_line)) or [] + return json.loads( + subprocess.check_output(units_cmd_line).decode('UTF-8')) or [] @cached @@ -385,21 +395,31 @@ def relations_of_type(reltype=None): return relation_data +@cached +def metadata(): + """Get the current charm metadata.yaml contents as a python object""" + with open(os.path.join(charm_dir(), 'metadata.yaml')) as md: + return yaml.safe_load(md) + + @cached def relation_types(): """Get a list of relation types supported by this charm""" - charmdir = os.environ.get('CHARM_DIR', '') - mdf = open(os.path.join(charmdir, 'metadata.yaml')) - md = yaml.safe_load(mdf) rel_types = [] + md = metadata() for key in ('provides', 'requires', 'peers'): section = md.get(key) if section: rel_types.extend(section.keys()) - mdf.close() return rel_types +@cached +def charm_name(): + """Get the name of the current charm as is specified on metadata.yaml""" + return metadata().get('name') + + @cached def relations(): """Get a nested dictionary of relation data for all related units""" @@ -455,7 +475,7 @@ def unit_get(attribute): """Get the unit ID for the remote unit""" _args = ['unit-get', '--format=json', attribute] try: - return json.loads(subprocess.check_output(_args)) + return json.loads(subprocess.check_output(_args).decode('UTF-8')) except ValueError: return None diff --git a/hooks/charmhelpers/core/host.py b/hooks/charmhelpers/core/host.py index 0b8bdc5..c6f1680 100644 --- a/hooks/charmhelpers/core/host.py +++ b/hooks/charmhelpers/core/host.py @@ -14,11 +14,12 @@ import string import subprocess import hashlib from contextlib import contextmanager - from collections import OrderedDict -from hookenv import log -from fstab import Fstab +import six + +from .hookenv import log +from .fstab import Fstab def service_start(service_name): @@ -54,7 +55,9 @@ def service(action, service_name): def service_running(service): """Determine whether a system service is running""" try: - output = subprocess.check_output(['service', service, 'status'], stderr=subprocess.STDOUT) + output = subprocess.check_output( + ['service', service, 'status'], + stderr=subprocess.STDOUT).decode('UTF-8') except subprocess.CalledProcessError: return False else: @@ -67,7 +70,9 @@ def service_running(service): def service_available(service_name): """Determine whether a system service is available""" try: - subprocess.check_output(['service', service_name, 'status'], stderr=subprocess.STDOUT) + subprocess.check_output( + ['service', service_name, 'status'], + stderr=subprocess.STDOUT).decode('UTF-8') except subprocess.CalledProcessError as e: return 'unrecognized service' not in e.output else: @@ -96,6 +101,26 @@ def adduser(username, password=None, shell='/bin/bash', system_user=False): return user_info +def add_group(group_name, system_group=False): + """Add a group to the system""" + try: + group_info = grp.getgrnam(group_name) + log('group {0} already exists!'.format(group_name)) + except KeyError: + log('creating group {0}'.format(group_name)) + cmd = ['addgroup'] + if system_group: + cmd.append('--system') + else: + cmd.extend([ + '--group', + ]) + cmd.append(group_name) + subprocess.check_call(cmd) + group_info = grp.getgrnam(group_name) + return group_info + + def add_user_to_group(username, group): """Add a user to a group""" cmd = [ @@ -115,7 +140,7 @@ def rsync(from_path, to_path, flags='-r', options=None): cmd.append(from_path) cmd.append(to_path) log(" ".join(cmd)) - return subprocess.check_output(cmd).strip() + return subprocess.check_output(cmd).decode('UTF-8').strip() def symlink(source, destination): @@ -130,7 +155,7 @@ def symlink(source, destination): subprocess.check_call(cmd) -def mkdir(path, owner='root', group='root', perms=0555, force=False): +def mkdir(path, owner='root', group='root', perms=0o555, force=False): """Create a directory""" log("Making dir {} {}:{} {:o}".format(path, owner, group, perms)) @@ -146,7 +171,7 @@ def mkdir(path, owner='root', group='root', perms=0555, force=False): os.chown(realpath, uid, gid) -def write_file(path, content, owner='root', group='root', perms=0444): +def write_file(path, content, owner='root', group='root', perms=0o444): """Create or overwrite a file with the contents of a string""" log("Writing file {} {}:{} {:o}".format(path, owner, group, perms)) uid = pwd.getpwnam(owner).pw_uid @@ -177,7 +202,7 @@ def mount(device, mountpoint, options=None, persist=False, filesystem="ext3"): cmd_args.extend([device, mountpoint]) try: subprocess.check_output(cmd_args) - except subprocess.CalledProcessError, e: + except subprocess.CalledProcessError as e: log('Error mounting {} at {}\n{}'.format(device, mountpoint, e.output)) return False @@ -191,7 +216,7 @@ def umount(mountpoint, persist=False): cmd_args = ['umount', mountpoint] try: subprocess.check_output(cmd_args) - except subprocess.CalledProcessError, e: + except subprocess.CalledProcessError as e: log('Error unmounting {}\n{}'.format(mountpoint, e.output)) return False @@ -218,8 +243,8 @@ def file_hash(path, hash_type='md5'): """ if os.path.exists(path): h = getattr(hashlib, hash_type)() - with open(path, 'r') as source: - h.update(source.read()) # IGNORE:E1101 - it does have update + with open(path, 'rb') as source: + h.update(source.read()) return h.hexdigest() else: return None @@ -297,7 +322,7 @@ def pwgen(length=None): if length is None: length = random.choice(range(35, 45)) alphanumeric_chars = [ - l for l in (string.letters + string.digits) + l for l in (string.ascii_letters + string.digits) if l not in 'l0QD1vAEIOUaeiou'] random_chars = [ random.choice(alphanumeric_chars) for _ in range(length)] @@ -306,14 +331,14 @@ def pwgen(length=None): def list_nics(nic_type): '''Return a list of nics of given type(s)''' - if isinstance(nic_type, basestring): + if isinstance(nic_type, six.string_types): int_types = [nic_type] else: int_types = nic_type interfaces = [] for int_type in int_types: cmd = ['ip', 'addr', 'show', 'label', int_type + '*'] - ip_output = subprocess.check_output(cmd).split('\n') + ip_output = subprocess.check_output(cmd).decode('UTF-8').split('\n') ip_output = (line for line in ip_output if line) for line in ip_output: if line.split()[1].startswith(int_type): @@ -335,7 +360,7 @@ def set_nic_mtu(nic, mtu): def get_nic_mtu(nic): cmd = ['ip', 'addr', 'show', nic] - ip_output = subprocess.check_output(cmd).split('\n') + ip_output = subprocess.check_output(cmd).decode('UTF-8').split('\n') mtu = "" for line in ip_output: words = line.split() @@ -346,7 +371,7 @@ def get_nic_mtu(nic): def get_nic_hwaddr(nic): cmd = ['ip', '-o', '-0', 'addr', 'show', nic] - ip_output = subprocess.check_output(cmd) + ip_output = subprocess.check_output(cmd).decode('UTF-8') hwaddr = "" words = ip_output.split() if 'link/ether' in words: @@ -363,8 +388,8 @@ def cmp_pkgrevno(package, revno, pkgcache=None): ''' import apt_pkg - from charmhelpers.fetch import apt_cache if not pkgcache: + from charmhelpers.fetch import apt_cache pkgcache = apt_cache() pkg = pkgcache[package] return apt_pkg.version_compare(pkg.current_ver.ver_str, revno) diff --git a/hooks/charmhelpers/core/services/__init__.py b/hooks/charmhelpers/core/services/__init__.py index e8039a8..69dde79 100644 --- a/hooks/charmhelpers/core/services/__init__.py +++ b/hooks/charmhelpers/core/services/__init__.py @@ -1,2 +1,2 @@ -from .base import * -from .helpers import * +from .base import * # NOQA +from .helpers import * # NOQA diff --git a/hooks/charmhelpers/core/services/helpers.py b/hooks/charmhelpers/core/services/helpers.py index 7067b94..163a793 100644 --- a/hooks/charmhelpers/core/services/helpers.py +++ b/hooks/charmhelpers/core/services/helpers.py @@ -196,7 +196,7 @@ class StoredContext(dict): if not os.path.isabs(file_name): file_name = os.path.join(hookenv.charm_dir(), file_name) with open(file_name, 'w') as file_stream: - os.fchmod(file_stream.fileno(), 0600) + os.fchmod(file_stream.fileno(), 0o600) yaml.dump(config_data, file_stream) def read_context(self, file_name): @@ -211,15 +211,19 @@ class StoredContext(dict): class TemplateCallback(ManagerCallback): """ - Callback class that will render a Jinja2 template, for use as a ready action. + Callback class that will render a Jinja2 template, for use as a ready + action. + + :param str source: The template source file, relative to + `$CHARM_DIR/templates` - :param str source: The template source file, relative to `$CHARM_DIR/templates` :param str target: The target to write the rendered template to :param str owner: The owner of the rendered file :param str group: The group of the rendered file :param int perms: The permissions of the rendered file """ - def __init__(self, source, target, owner='root', group='root', perms=0444): + def __init__(self, source, target, + owner='root', group='root', perms=0o444): self.source = source self.target = target self.owner = owner diff --git a/hooks/charmhelpers/core/templating.py b/hooks/charmhelpers/core/templating.py index 2c63885..569eaed 100644 --- a/hooks/charmhelpers/core/templating.py +++ b/hooks/charmhelpers/core/templating.py @@ -4,7 +4,8 @@ from charmhelpers.core import host from charmhelpers.core import hookenv -def render(source, target, context, owner='root', group='root', perms=0444, templates_dir=None): +def render(source, target, context, owner='root', group='root', + perms=0o444, templates_dir=None): """ Render a template. @@ -47,5 +48,5 @@ def render(source, target, context, owner='root', group='root', perms=0444, temp level=hookenv.ERROR) raise e content = template.render(context) - host.mkdir(os.path.dirname(target)) + host.mkdir(os.path.dirname(target), owner, group) host.write_file(target, content, owner, group, perms) diff --git a/hooks/charmhelpers/fetch/__init__.py b/hooks/charmhelpers/fetch/__init__.py index 6724d29..0a126fc 100644 --- a/hooks/charmhelpers/fetch/__init__.py +++ b/hooks/charmhelpers/fetch/__init__.py @@ -5,10 +5,6 @@ from yaml import safe_load from charmhelpers.core.host import ( lsb_release ) -from urlparse import ( - urlparse, - urlunparse, -) import subprocess from charmhelpers.core.hookenv import ( config, @@ -16,6 +12,12 @@ from charmhelpers.core.hookenv import ( ) import os +import six +if six.PY3: + from urllib.parse import urlparse, urlunparse +else: + from urlparse import urlparse, urlunparse + CLOUD_ARCHIVE = """# Ubuntu Cloud Archive deb http://ubuntu-cloud.archive.canonical.com/ubuntu {} main @@ -149,7 +151,7 @@ def apt_install(packages, options=None, fatal=False): cmd = ['apt-get', '--assume-yes'] cmd.extend(options) cmd.append('install') - if isinstance(packages, basestring): + if isinstance(packages, six.string_types): cmd.append(packages) else: cmd.extend(packages) @@ -182,7 +184,7 @@ def apt_update(fatal=False): def apt_purge(packages, fatal=False): """Purge one or more packages""" cmd = ['apt-get', '--assume-yes', 'purge'] - if isinstance(packages, basestring): + if isinstance(packages, six.string_types): cmd.append(packages) else: cmd.extend(packages) @@ -193,7 +195,7 @@ def apt_purge(packages, fatal=False): def apt_hold(packages, fatal=False): """Hold one or more packages""" cmd = ['apt-mark', 'hold'] - if isinstance(packages, basestring): + if isinstance(packages, six.string_types): cmd.append(packages) else: cmd.extend(packages) @@ -256,11 +258,11 @@ def add_source(source, key=None): elif source == 'distro': pass else: - raise SourceConfigError("Unknown source: {!r}".format(source)) + log("Unknown source: {!r}".format(source)) if key: if '-----BEGIN PGP PUBLIC KEY BLOCK-----' in key: - with NamedTemporaryFile() as key_file: + with NamedTemporaryFile('w+') as key_file: key_file.write(key) key_file.flush() key_file.seek(0) @@ -297,14 +299,14 @@ def configure_sources(update=False, sources = safe_load((config(sources_var) or '').strip()) or [] keys = safe_load((config(keys_var) or '').strip()) or None - if isinstance(sources, basestring): + if isinstance(sources, six.string_types): sources = [sources] if keys is None: for source in sources: add_source(source, None) else: - if isinstance(keys, basestring): + if isinstance(keys, six.string_types): keys = [keys] if len(sources) != len(keys): @@ -401,7 +403,7 @@ def _run_apt_command(cmd, fatal=False): while result is None or result == APT_NO_LOCK: try: result = subprocess.check_call(cmd, env=env) - except subprocess.CalledProcessError, e: + except subprocess.CalledProcessError as e: retry_count = retry_count + 1 if retry_count > APT_NO_LOCK_RETRY_COUNT: raise diff --git a/hooks/charmhelpers/fetch/archiveurl.py b/hooks/charmhelpers/fetch/archiveurl.py index 8c04565..8a4624b 100644 --- a/hooks/charmhelpers/fetch/archiveurl.py +++ b/hooks/charmhelpers/fetch/archiveurl.py @@ -1,8 +1,23 @@ import os -import urllib2 -from urllib import urlretrieve -import urlparse import hashlib +import re + +import six +if six.PY3: + from urllib.request import ( + build_opener, install_opener, urlopen, urlretrieve, + HTTPPasswordMgrWithDefaultRealm, HTTPBasicAuthHandler, + ) + from urllib.parse import urlparse, urlunparse, parse_qs + from urllib.error import URLError +else: + from urllib import urlretrieve + from urllib2 import ( + build_opener, install_opener, urlopen, + HTTPPasswordMgrWithDefaultRealm, HTTPBasicAuthHandler, + URLError + ) + from urlparse import urlparse, urlunparse, parse_qs from charmhelpers.fetch import ( BaseFetchHandler, @@ -15,6 +30,24 @@ from charmhelpers.payload.archive import ( from charmhelpers.core.host import mkdir, check_hash +def splituser(host): + '''urllib.splituser(), but six's support of this seems broken''' + _userprog = re.compile('^(.*)@(.*)$') + match = _userprog.match(host) + if match: + return match.group(1, 2) + return None, host + + +def splitpasswd(user): + '''urllib.splitpasswd(), but six's support of this is missing''' + _passwdprog = re.compile('^([^:]*):(.*)$', re.S) + match = _passwdprog.match(user) + if match: + return match.group(1, 2) + return user, None + + class ArchiveUrlFetchHandler(BaseFetchHandler): """ Handler to download archive files from arbitrary URLs. @@ -42,20 +75,20 @@ class ArchiveUrlFetchHandler(BaseFetchHandler): """ # propogate all exceptions # URLError, OSError, etc - proto, netloc, path, params, query, fragment = urlparse.urlparse(source) + proto, netloc, path, params, query, fragment = urlparse(source) if proto in ('http', 'https'): - auth, barehost = urllib2.splituser(netloc) + auth, barehost = splituser(netloc) if auth is not None: - source = urlparse.urlunparse((proto, barehost, path, params, query, fragment)) - username, password = urllib2.splitpasswd(auth) - passman = urllib2.HTTPPasswordMgrWithDefaultRealm() + source = urlunparse((proto, barehost, path, params, query, fragment)) + username, password = splitpasswd(auth) + passman = HTTPPasswordMgrWithDefaultRealm() # Realm is set to None in add_password to force the username and password # to be used whatever the realm passman.add_password(None, source, username, password) - authhandler = urllib2.HTTPBasicAuthHandler(passman) - opener = urllib2.build_opener(authhandler) - urllib2.install_opener(opener) - response = urllib2.urlopen(source) + authhandler = HTTPBasicAuthHandler(passman) + opener = build_opener(authhandler) + install_opener(opener) + response = urlopen(source) try: with open(dest, 'w') as dest_file: dest_file.write(response.read()) @@ -91,17 +124,21 @@ class ArchiveUrlFetchHandler(BaseFetchHandler): url_parts = self.parse_url(source) dest_dir = os.path.join(os.environ.get('CHARM_DIR'), 'fetched') if not os.path.exists(dest_dir): - mkdir(dest_dir, perms=0755) + mkdir(dest_dir, perms=0o755) dld_file = os.path.join(dest_dir, os.path.basename(url_parts.path)) try: self.download(source, dld_file) - except urllib2.URLError as e: + except URLError as e: raise UnhandledSource(e.reason) except OSError as e: raise UnhandledSource(e.strerror) - options = urlparse.parse_qs(url_parts.fragment) + options = parse_qs(url_parts.fragment) for key, value in options.items(): - if key in hashlib.algorithms: + if not six.PY3: + algorithms = hashlib.algorithms + else: + algorithms = hashlib.algorithms_available + if key in algorithms: check_hash(dld_file, value, key) if checksum: check_hash(dld_file, checksum, hash_type) diff --git a/hooks/charmhelpers/fetch/bzrurl.py b/hooks/charmhelpers/fetch/bzrurl.py index 0e580e4..8ef48f3 100644 --- a/hooks/charmhelpers/fetch/bzrurl.py +++ b/hooks/charmhelpers/fetch/bzrurl.py @@ -5,6 +5,10 @@ from charmhelpers.fetch import ( ) from charmhelpers.core.host import mkdir +import six +if six.PY3: + raise ImportError('bzrlib does not support Python3') + try: from bzrlib.branch import Branch except ImportError: @@ -42,7 +46,7 @@ class BzrUrlFetchHandler(BaseFetchHandler): dest_dir = os.path.join(os.environ.get('CHARM_DIR'), "fetched", branch_name) if not os.path.exists(dest_dir): - mkdir(dest_dir, perms=0755) + mkdir(dest_dir, perms=0o755) try: self.branch(source, dest_dir) except OSError as e: diff --git a/hooks/charmhelpers/fetch/giturl.py b/hooks/charmhelpers/fetch/giturl.py index 7d67246..f3aa282 100644 --- a/hooks/charmhelpers/fetch/giturl.py +++ b/hooks/charmhelpers/fetch/giturl.py @@ -5,6 +5,10 @@ from charmhelpers.fetch import ( ) from charmhelpers.core.host import mkdir +import six +if six.PY3: + raise ImportError('GitPython does not support Python 3') + try: from git import Repo except ImportError: @@ -17,7 +21,7 @@ class GitUrlFetchHandler(BaseFetchHandler): """Handler for git branches via generic and github URLs""" def can_handle(self, source): url_parts = self.parse_url(source) - #TODO (mattyw) no support for ssh git@ yet + # TODO (mattyw) no support for ssh git@ yet if url_parts.scheme not in ('http', 'https', 'git'): return False else: @@ -30,13 +34,16 @@ class GitUrlFetchHandler(BaseFetchHandler): repo = Repo.clone_from(source, dest) repo.git.checkout(branch) - def install(self, source, branch="master"): + def install(self, source, branch="master", dest=None): url_parts = self.parse_url(source) branch_name = url_parts.path.strip("/").split("/")[-1] - dest_dir = os.path.join(os.environ.get('CHARM_DIR'), "fetched", - branch_name) + if dest: + dest_dir = os.path.join(dest, branch_name) + else: + dest_dir = os.path.join(os.environ.get('CHARM_DIR'), "fetched", + branch_name) if not os.path.exists(dest_dir): - mkdir(dest_dir, perms=0755) + mkdir(dest_dir, perms=0o755) try: self.clone(source, dest_dir, branch) except OSError as e: diff --git a/hooks/client-relation-changed b/hooks/client-relation-changed new file mode 120000 index 0000000..9416ca6 --- /dev/null +++ b/hooks/client-relation-changed @@ -0,0 +1 @@ +hooks.py \ No newline at end of file diff --git a/hooks/hooks.py b/hooks/hooks.py index 4747a09..20a6bd8 100755 --- a/hooks/hooks.py +++ b/hooks/hooks.py @@ -15,7 +15,9 @@ import sys import ceph from charmhelpers.core.hookenv import ( - log, ERROR, + log, + DEBUG, + ERROR, config, relation_ids, related_units, @@ -27,7 +29,6 @@ from charmhelpers.core.hookenv import ( service_name, relations_of_type ) - from charmhelpers.core.host import ( service_restart, umount, @@ -48,12 +49,16 @@ from charmhelpers.contrib.network.ip import ( get_ipv6_addr, format_ipv6_addr ) +from charmhelpers.core.sysctl import create as create_sysctl from utils import ( render_template, get_public_addr, assert_charm_supports_ipv6 ) +from ceph_broker import ( + process_requests +) from charmhelpers.contrib.charmsupport.nrpe import NRPE @@ -130,6 +135,10 @@ def config_changed(): log('Invalid OSD disk format configuration specified', level=ERROR) sys.exit(1) + sysctl_dict = config('sysctl') + if sysctl_dict: + create_sysctl(sysctl_dict, '/etc/sysctl.d/50-ceph-charm.conf') + emit_cephconf() e_mountpoint = config('ephemeral-unmount') @@ -229,7 +238,7 @@ def notify_radosgws(): def notify_client(): for relid in relation_ids('client'): - client_relation(relid) + client_relation_joined(relid) def upgrade_keys(): @@ -280,28 +289,46 @@ def radosgw_relation(relid=None): @hooks.hook('client-relation-joined') -def client_relation(relid=None): +def client_relation_joined(relid=None): if ceph.is_quorum(): log('mon cluster in quorum - providing client with keys') service_name = None if relid is None: - service_name = remote_unit().split('/')[0] + units = [remote_unit()] + service_name = units[0].split('/')[0] else: units = related_units(relid) if len(units) > 0: service_name = units[0].split('/')[0] + if service_name is not None: - data = { - 'key': ceph.get_named_key(service_name), - 'auth': config('auth-supported'), - 'ceph-public-address': get_public_addr(), - } + data = {'key': ceph.get_named_key(service_name), + 'auth': config('auth-supported'), + 'ceph-public-address': get_public_addr()} relation_set(relation_id=relid, relation_settings=data) + + client_relation_changed(relid=relid) else: log('mon cluster not in quorum - deferring key provision') +@hooks.hook('client-relation-changed') +def client_relation_changed(relid=None): + """Process broker requests from ceph client relations.""" + if ceph.is_quorum(): + settings = relation_get(rid=relid) + if 'broker_req' in settings: + if not ceph.is_leader(): + log("Not leader - ignoring broker request", level=DEBUG) + else: + rsp = process_requests(settings['broker_req']) + relation_set(relation_id=relid, + relation_settings={'broker_rsp': rsp}) + else: + log('mon cluster not in quorum', level=DEBUG) + + @hooks.hook('upgrade-charm') def upgrade_charm(): emit_cephconf() diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..37083b6 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,5 @@ +[nosetests] +verbosity=2 +with-coverage=1 +cover-erase=1 +cover-package=hooks diff --git a/tests/charmhelpers/__init__.py b/tests/charmhelpers/__init__.py index e69de29..b46e2e2 100644 --- a/tests/charmhelpers/__init__.py +++ b/tests/charmhelpers/__init__.py @@ -0,0 +1,22 @@ +# Bootstrap charm-helpers, installing its dependencies if necessary using +# only standard libraries. +import subprocess +import sys + +try: + import six # flake8: noqa +except ImportError: + if sys.version_info.major == 2: + subprocess.check_call(['apt-get', 'install', '-y', 'python-six']) + else: + subprocess.check_call(['apt-get', 'install', '-y', 'python3-six']) + import six # flake8: noqa + +try: + import yaml # flake8: noqa +except ImportError: + if sys.version_info.major == 2: + subprocess.check_call(['apt-get', 'install', '-y', 'python-yaml']) + else: + subprocess.check_call(['apt-get', 'install', '-y', 'python3-yaml']) + import yaml # flake8: noqa diff --git a/tests/charmhelpers/contrib/amulet/deployment.py b/tests/charmhelpers/contrib/amulet/deployment.py index d859d36..3d3ef33 100644 --- a/tests/charmhelpers/contrib/amulet/deployment.py +++ b/tests/charmhelpers/contrib/amulet/deployment.py @@ -1,6 +1,6 @@ import amulet - import os +import six class AmuletDeployment(object): @@ -52,12 +52,12 @@ class AmuletDeployment(object): def _add_relations(self, relations): """Add all of the relations for the services.""" - for k, v in relations.iteritems(): + for k, v in six.iteritems(relations): self.d.relate(k, v) def _configure_services(self, configs): """Configure all of the services.""" - for service, config in configs.iteritems(): + for service, config in six.iteritems(configs): self.d.configure(service, config) def _deploy(self): diff --git a/tests/charmhelpers/contrib/amulet/utils.py b/tests/charmhelpers/contrib/amulet/utils.py index c843333..d333e63 100644 --- a/tests/charmhelpers/contrib/amulet/utils.py +++ b/tests/charmhelpers/contrib/amulet/utils.py @@ -5,6 +5,8 @@ import re import sys import time +import six + class AmuletUtils(object): """Amulet utilities. @@ -58,7 +60,7 @@ class AmuletUtils(object): Verify the specified services are running on the corresponding service units. """ - for k, v in commands.iteritems(): + for k, v in six.iteritems(commands): for cmd in v: output, code = k.run(cmd) if code != 0: @@ -100,11 +102,11 @@ class AmuletUtils(object): longs, or can be a function that evaluate a variable and returns a bool. """ - for k, v in expected.iteritems(): + for k, v in six.iteritems(expected): if k in actual: - if (isinstance(v, basestring) or + if (isinstance(v, six.string_types) or isinstance(v, bool) or - isinstance(v, (int, long))): + isinstance(v, six.integer_types)): if v != actual[k]: return "{}:{}".format(k, actual[k]) elif not v(actual[k]): diff --git a/tests/charmhelpers/contrib/openstack/amulet/deployment.py b/tests/charmhelpers/contrib/openstack/amulet/deployment.py index 3c7f422..f3fee07 100644 --- a/tests/charmhelpers/contrib/openstack/amulet/deployment.py +++ b/tests/charmhelpers/contrib/openstack/amulet/deployment.py @@ -1,3 +1,4 @@ +import six from charmhelpers.contrib.amulet.deployment import ( AmuletDeployment ) @@ -69,7 +70,7 @@ class OpenStackAmuletDeployment(AmuletDeployment): def _configure_services(self, configs): """Configure all of the services.""" - for service, config in configs.iteritems(): + for service, config in six.iteritems(configs): self.d.configure(service, config) def _get_openstack_release(self): diff --git a/tests/charmhelpers/contrib/openstack/amulet/utils.py b/tests/charmhelpers/contrib/openstack/amulet/utils.py index 0f312b9..3e0cc61 100644 --- a/tests/charmhelpers/contrib/openstack/amulet/utils.py +++ b/tests/charmhelpers/contrib/openstack/amulet/utils.py @@ -7,6 +7,8 @@ import glanceclient.v1.client as glance_client import keystoneclient.v2_0 as keystone_client import novaclient.v1_1.client as nova_client +import six + from charmhelpers.contrib.amulet.utils import ( AmuletUtils ) @@ -60,7 +62,7 @@ class OpenStackAmuletUtils(AmuletUtils): expected service catalog endpoints. """ self.log.debug('actual: {}'.format(repr(actual))) - for k, v in expected.iteritems(): + for k, v in six.iteritems(expected): if k in actual: ret = self._validate_dict_data(expected[k][0], actual[k][0]) if ret: diff --git a/unit_tests/__init__.py b/unit_tests/__init__.py new file mode 100644 index 0000000..f80aab3 --- /dev/null +++ b/unit_tests/__init__.py @@ -0,0 +1,2 @@ +import sys +sys.path.append('hooks') diff --git a/unit_tests/test_ceph_broker.py b/unit_tests/test_ceph_broker.py new file mode 100644 index 0000000..0176d11 --- /dev/null +++ b/unit_tests/test_ceph_broker.py @@ -0,0 +1,72 @@ +import json +import mock +import unittest + +import ceph_broker + + +class CephBrokerTestCase(unittest.TestCase): + + def setUp(self): + super(CephBrokerTestCase, self).setUp() + + @mock.patch('ceph_broker.log') + def test_process_requests_noop(self, mock_log): + req = json.dumps({'api-version': 1, 'ops': []}) + rc = ceph_broker.process_requests(req) + self.assertEqual(json.loads(rc), {'exit-code': 0}) + + @mock.patch('ceph_broker.log') + def test_process_requests_missing_api_version(self, mock_log): + req = json.dumps({'ops': []}) + rc = ceph_broker.process_requests(req) + self.assertEqual(json.loads(rc), {'exit-code': 1, + 'stderr': + ('Missing or invalid api version ' + '(None)')}) + + @mock.patch('ceph_broker.log') + def test_process_requests_invalid_api_version(self, mock_log): + req = json.dumps({'api-version': 2, 'ops': []}) + rc = ceph_broker.process_requests(req) + self.assertEqual(json.loads(rc), + {'exit-code': 1, + 'stderr': 'Missing or invalid api version (2)'}) + + @mock.patch('ceph_broker.log') + def test_process_requests_invalid(self, mock_log): + reqs = json.dumps({'api-version': 1, 'ops': [{'op': 'invalid_op'}]}) + rc = ceph_broker.process_requests(reqs) + self.assertEqual(json.loads(rc), + {'exit-code': 1, + 'stderr': "Unknown operation 'invalid_op'"}) + + @mock.patch('ceph_broker.create_pool') + @mock.patch('ceph_broker.pool_exists') + @mock.patch('ceph_broker.log') + def test_process_requests_create_pool(self, mock_log, mock_pool_exists, + mock_create_pool): + mock_pool_exists.return_value = False + reqs = json.dumps({'api-version': 1, + 'ops': [{'op': 'create-pool', 'name': + 'foo', 'replicas': 3}]}) + rc = ceph_broker.process_requests(reqs) + mock_pool_exists.assert_called_with(service='admin', name='foo') + mock_create_pool.assert_called_with(service='admin', name='foo', + replicas=3) + self.assertEqual(json.loads(rc), {'exit-code': 0}) + + @mock.patch('ceph_broker.create_pool') + @mock.patch('ceph_broker.pool_exists') + @mock.patch('ceph_broker.log') + def test_process_requests_create_pool_exists(self, mock_log, + mock_pool_exists, + mock_create_pool): + mock_pool_exists.return_value = True + reqs = json.dumps({'api-version': 1, + 'ops': [{'op': 'create-pool', 'name': 'foo', + 'replicas': 3}]}) + rc = ceph_broker.process_requests(reqs) + mock_pool_exists.assert_called_with(service='admin', name='foo') + self.assertFalse(mock_create_pool.called) + self.assertEqual(json.loads(rc), {'exit-code': 0})