diff --git a/actions.yaml b/actions.yaml
new file mode 100644
index 0000000..92bbce7
--- /dev/null
+++ b/actions.yaml
@@ -0,0 +1,5 @@
+pause:
+ description: Put hacluster unit in crm standby mode which migrates resources
+ from this unit to another unit in the hacluster
+resume:
+ description: Take hacluster unit out of standby mode
diff --git a/actions/actions.py b/actions/actions.py
new file mode 100755
index 0000000..e5bee5a
--- /dev/null
+++ b/actions/actions.py
@@ -0,0 +1,41 @@
+#!/usr/bin/python
+
+import sys
+import os
+sys.path.append('hooks/')
+import subprocess
+from charmhelpers.core.hookenv import action_fail
+from utils import (
+ pause_unit,
+ resume_unit,
+)
+
+def pause(args):
+ """Pause the Ceilometer services.
+ @raises Exception should the service fail to stop.
+ """
+ pause_unit()
+
+def resume(args):
+ """Resume the Ceilometer services.
+ @raises Exception should the service fail to start."""
+ resume_unit()
+
+
+ACTIONS = {"pause": pause, "resume": resume}
+
+def main(args):
+ action_name = os.path.basename(args[0])
+ try:
+ action = ACTIONS[action_name]
+ except KeyError:
+ return "Action %s undefined" % action_name
+ else:
+ try:
+ action(args)
+ except Exception as e:
+ action_fail(str(e))
+
+
+if __name__ == "__main__":
+ sys.exit(main(sys.argv))
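A note on the dispatch pattern above: actions/pause and actions/resume (added below) are symlinks to actions.py, so os.path.basename(args[0]) resolves to the action name. A minimal, self-contained sketch of the idea, with illustrative handlers:

    import os.path

    ACTIONS = {"pause": lambda args: "pausing",
               "resume": lambda args: "resuming"}

    def dispatch(argv):
        # The invoked symlink's basename selects the handler.
        name = os.path.basename(argv[0])
        handler = ACTIONS.get(name)
        return handler(argv) if handler else "Action %s undefined" % name

    assert dispatch(["actions/pause"]) == "pausing"
    assert dispatch(["actions/resume"]) == "resuming"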
diff --git a/actions/pause b/actions/pause
new file mode 120000
index 0000000..405a394
--- /dev/null
+++ b/actions/pause
@@ -0,0 +1 @@
+actions.py
\ No newline at end of file
diff --git a/actions/resume b/actions/resume
new file mode 120000
index 0000000..405a394
--- /dev/null
+++ b/actions/resume
@@ -0,0 +1 @@
+actions.py
\ No newline at end of file
diff --git a/hooks/charmhelpers/cli/__init__.py b/hooks/charmhelpers/cli/__init__.py
index 7118daf..2d37ab3 100644
--- a/hooks/charmhelpers/cli/__init__.py
+++ b/hooks/charmhelpers/cli/__init__.py
@@ -20,7 +20,7 @@ import sys
from six.moves import zip
-from charmhelpers.core import unitdata
+import charmhelpers.core.unitdata
class OutputFormatter(object):
@@ -152,23 +152,19 @@ class CommandLine(object):
arguments = self.argument_parser.parse_args()
argspec = inspect.getargspec(arguments.func)
vargs = []
- kwargs = {}
for arg in argspec.args:
vargs.append(getattr(arguments, arg))
if argspec.varargs:
vargs.extend(getattr(arguments, argspec.varargs))
- if argspec.keywords:
- for kwarg in argspec.keywords.items():
- kwargs[kwarg] = getattr(arguments, kwarg)
- output = arguments.func(*vargs, **kwargs)
+ output = arguments.func(*vargs)
if getattr(arguments.func, '_cli_test_command', False):
self.exit_code = 0 if output else 1
output = ''
if getattr(arguments.func, '_cli_no_output', False):
output = ''
self.formatter.format_output(output, arguments.format)
- if unitdata._KV:
- unitdata._KV.flush()
+ if charmhelpers.core.unitdata._KV:
+ charmhelpers.core.unitdata._KV.flush()
cmdline = CommandLine()
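One plausible reading of the import change above (the diff itself doesn't state its motivation): a name bound with `from pkg import mod` is fixed at import time, while the dotted `pkg.mod` form is re-resolved on every attribute access, so rebinding the module attribute (as test harnesses sometimes do) is picked up. A self-contained sketch with throwaway modules:

    import types

    pkg = types.ModuleType("pkg")
    pkg.mod = types.ModuleType("pkg.mod")
    pkg.mod._KV = None

    frozen = pkg.mod                        # like `from pkg import mod`
    pkg.mod = types.ModuleType("pkg.mod")   # e.g. a test swaps the module
    pkg.mod._KV = "fresh"

    assert frozen._KV is None               # stale, import-time binding
    assert pkg.mod._KV == "fresh"           # dotted access sees the swap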
diff --git a/hooks/charmhelpers/cli/commands.py b/hooks/charmhelpers/cli/commands.py
index 443ff05..7e91db0 100644
--- a/hooks/charmhelpers/cli/commands.py
+++ b/hooks/charmhelpers/cli/commands.py
@@ -26,7 +26,7 @@ from . import CommandLine # noqa
"""
Import the sub-modules which have decorated subcommands to register with chlp.
"""
-import host # noqa
-import benchmark # noqa
-import unitdata # noqa
-from charmhelpers.core import hookenv # noqa
+from . import host # noqa
+from . import benchmark # noqa
+from . import unitdata # noqa
+from . import hookenv # noqa
diff --git a/hooks/charmhelpers/cli/hookenv.py b/hooks/charmhelpers/cli/hookenv.py
new file mode 100644
index 0000000..265c816
--- /dev/null
+++ b/hooks/charmhelpers/cli/hookenv.py
@@ -0,0 +1,23 @@
+# Copyright 2014-2015 Canonical Limited.
+#
+# This file is part of charm-helpers.
+#
+# charm-helpers is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License version 3 as
+# published by the Free Software Foundation.
+#
+# charm-helpers is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
+
+from . import cmdline
+from charmhelpers.core import hookenv
+
+
+cmdline.subcommand('relation-id')(hookenv.relation_id._wrapped)
+cmdline.subcommand('service-name')(hookenv.service_name)
+cmdline.subcommand('remote-service-name')(hookenv.remote_service_name._wrapped)
diff --git a/hooks/charmhelpers/contrib/charmsupport/nrpe.py b/hooks/charmhelpers/contrib/charmsupport/nrpe.py
index 95a79c2..2f24642 100644
--- a/hooks/charmhelpers/contrib/charmsupport/nrpe.py
+++ b/hooks/charmhelpers/contrib/charmsupport/nrpe.py
@@ -148,6 +148,13 @@ define service {{
self.description = description
self.check_cmd = self._locate_cmd(check_cmd)
+ def _get_check_filename(self):
+ return os.path.join(NRPE.nrpe_confdir, '{}.cfg'.format(self.command))
+
+ def _get_service_filename(self, hostname):
+ return os.path.join(NRPE.nagios_exportdir,
+ 'service__{}_{}.cfg'.format(hostname, self.command))
+
def _locate_cmd(self, check_cmd):
search_path = (
'/usr/lib/nagios/plugins',
@@ -163,9 +170,21 @@ define service {{
log('Check command not found: {}'.format(parts[0]))
return ''
+ def _remove_service_files(self):
+ if not os.path.exists(NRPE.nagios_exportdir):
+ return
+ for f in os.listdir(NRPE.nagios_exportdir):
+ if f.endswith('_{}.cfg'.format(self.command)):
+ os.remove(os.path.join(NRPE.nagios_exportdir, f))
+
+ def remove(self, hostname):
+ nrpe_check_file = self._get_check_filename()
+ if os.path.exists(nrpe_check_file):
+ os.remove(nrpe_check_file)
+ self._remove_service_files()
+
def write(self, nagios_context, hostname, nagios_servicegroups):
- nrpe_check_file = '/etc/nagios/nrpe.d/{}.cfg'.format(
- self.command)
+ nrpe_check_file = self._get_check_filename()
with open(nrpe_check_file, 'w') as nrpe_check_config:
nrpe_check_config.write("# check {}\n".format(self.shortname))
nrpe_check_config.write("command[{}]={}\n".format(
@@ -180,9 +199,7 @@ define service {{
def write_service_config(self, nagios_context, hostname,
nagios_servicegroups):
- for f in os.listdir(NRPE.nagios_exportdir):
- if re.search('.*{}.cfg'.format(self.command), f):
- os.remove(os.path.join(NRPE.nagios_exportdir, f))
+ self._remove_service_files()
templ_vars = {
'nagios_hostname': hostname,
@@ -192,8 +209,7 @@ define service {{
'command': self.command,
}
nrpe_service_text = Check.service_template.format(**templ_vars)
- nrpe_service_file = '{}/service__{}_{}.cfg'.format(
- NRPE.nagios_exportdir, hostname, self.command)
+ nrpe_service_file = self._get_service_filename(hostname)
with open(nrpe_service_file, 'w') as nrpe_service_config:
nrpe_service_config.write(str(nrpe_service_text))
@@ -218,12 +234,32 @@ class NRPE(object):
if hostname:
self.hostname = hostname
else:
- self.hostname = "{}-{}".format(self.nagios_context, self.unit_name)
+ nagios_hostname = get_nagios_hostname()
+ if nagios_hostname:
+ self.hostname = nagios_hostname
+ else:
+ self.hostname = "{}-{}".format(self.nagios_context, self.unit_name)
self.checks = []
def add_check(self, *args, **kwargs):
self.checks.append(Check(*args, **kwargs))
+ def remove_check(self, *args, **kwargs):
+ if kwargs.get('shortname') is None:
+ raise ValueError('shortname of check must be specified')
+
+ # Use sensible defaults if they're not specified - these are not
+ # actually used during removal, but they're required for constructing
+ # the Check object; check_disk is chosen because it's part of the
+ # nagios-plugins-basic package.
+ if kwargs.get('check_cmd') is None:
+ kwargs['check_cmd'] = 'check_disk'
+ if kwargs.get('description') is None:
+ kwargs['description'] = ''
+
+ check = Check(*args, **kwargs)
+ check.remove(self.hostname)
+
def write(self):
try:
nagios_uid = pwd.getpwnam('nagios').pw_uid
@@ -260,7 +296,7 @@ def get_nagios_hostcontext(relation_name='nrpe-external-master'):
:param str relation_name: Name of relation nrpe sub joined to
"""
for rel in relations_of_type(relation_name):
- if 'nagios_hostname' in rel:
+ if 'nagios_host_context' in rel:
return rel['nagios_host_context']
@@ -301,11 +337,13 @@ def add_init_service_checks(nrpe, services, unit_name):
upstart_init = '/etc/init/%s.conf' % svc
sysv_init = '/etc/init.d/%s' % svc
if os.path.exists(upstart_init):
- nrpe.add_check(
- shortname=svc,
- description='process check {%s}' % unit_name,
- check_cmd='check_upstart_job %s' % svc
- )
+ # Don't add a check for these services from neutron-gateway
+ if svc not in ['ext-port', 'os-charm-phy-nic-mtu']:
+ nrpe.add_check(
+ shortname=svc,
+ description='process check {%s}' % unit_name,
+ check_cmd='check_upstart_job %s' % svc
+ )
elif os.path.exists(sysv_init):
cronpath = '/etc/cron.d/nagios-service-check-%s' % svc
cron_file = ('*/5 * * * * root '
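Taken together, the new remove_check() mirrors add_check() and needs only a shortname. A sketch of charm-side usage, assuming the charm is already related to nrpe (service and check names are illustrative):

    from charmhelpers.contrib.charmsupport.nrpe import NRPE

    nrpe = NRPE()  # hostname now preferentially comes from get_nagios_hostname()
    nrpe.add_check(
        shortname='haproxy',
        description='process check {haproxy}',
        check_cmd='check_upstart_job haproxy',
    )
    # Removal needs only the shortname; check_cmd/description fall back to
    # the placeholder defaults documented in remove_check() above.
    nrpe.remove_check(shortname='stale-check')
    nrpe.write()  # rewrites the nrpe.d configs and nagios export files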
diff --git a/hooks/charmhelpers/contrib/network/ip.py b/hooks/charmhelpers/contrib/network/ip.py
index fff6d5c..4efe799 100644
--- a/hooks/charmhelpers/contrib/network/ip.py
+++ b/hooks/charmhelpers/contrib/network/ip.py
@@ -23,7 +23,7 @@ import socket
from functools import partial
from charmhelpers.core.hookenv import unit_get
-from charmhelpers.fetch import apt_install
+from charmhelpers.fetch import apt_install, apt_update
from charmhelpers.core.hookenv import (
log,
WARNING,
@@ -32,13 +32,15 @@ from charmhelpers.core.hookenv import (
try:
import netifaces
except ImportError:
- apt_install('python-netifaces')
+ apt_update(fatal=True)
+ apt_install('python-netifaces', fatal=True)
import netifaces
try:
import netaddr
except ImportError:
- apt_install('python-netaddr')
+ apt_update(fatal=True)
+ apt_install('python-netaddr', fatal=True)
import netaddr
@@ -51,7 +53,7 @@ def _validate_cidr(network):
def no_ip_found_error_out(network):
- errmsg = ("No IP address found in network: %s" % network)
+ errmsg = ("No IP address found in network(s): %s" % network)
raise ValueError(errmsg)
@@ -59,7 +61,7 @@ def get_address_in_network(network, fallback=None, fatal=False):
"""Get an IPv4 or IPv6 address within the network from the host.
:param network (str): CIDR presentation format. For example,
- '192.168.1.0/24'.
+ '192.168.1.0/24'. Supports multiple networks as a space-delimited list.
:param fallback (str): If no address is found, return fallback.
:param fatal (boolean): If no address is found, fallback is not
set and fatal is True then exit(1).
@@ -73,24 +75,26 @@ def get_address_in_network(network, fallback=None, fatal=False):
else:
return None
- _validate_cidr(network)
- network = netaddr.IPNetwork(network)
- for iface in netifaces.interfaces():
- addresses = netifaces.ifaddresses(iface)
- if network.version == 4 and netifaces.AF_INET in addresses:
- addr = addresses[netifaces.AF_INET][0]['addr']
- netmask = addresses[netifaces.AF_INET][0]['netmask']
- cidr = netaddr.IPNetwork("%s/%s" % (addr, netmask))
- if cidr in network:
- return str(cidr.ip)
+ networks = network.split() or [network]
+ for network in networks:
+ _validate_cidr(network)
+ network = netaddr.IPNetwork(network)
+ for iface in netifaces.interfaces():
+ addresses = netifaces.ifaddresses(iface)
+ if network.version == 4 and netifaces.AF_INET in addresses:
+ addr = addresses[netifaces.AF_INET][0]['addr']
+ netmask = addresses[netifaces.AF_INET][0]['netmask']
+ cidr = netaddr.IPNetwork("%s/%s" % (addr, netmask))
+ if cidr in network:
+ return str(cidr.ip)
- if network.version == 6 and netifaces.AF_INET6 in addresses:
- for addr in addresses[netifaces.AF_INET6]:
- if not addr['addr'].startswith('fe80'):
- cidr = netaddr.IPNetwork("%s/%s" % (addr['addr'],
- addr['netmask']))
- if cidr in network:
- return str(cidr.ip)
+ if network.version == 6 and netifaces.AF_INET6 in addresses:
+ for addr in addresses[netifaces.AF_INET6]:
+ if not addr['addr'].startswith('fe80'):
+ cidr = netaddr.IPNetwork("%s/%s" % (addr['addr'],
+ addr['netmask']))
+ if cidr in network:
+ return str(cidr.ip)
if fallback is not None:
return fallback
@@ -435,8 +439,12 @@ def get_hostname(address, fqdn=True):
rev = dns.reversename.from_address(address)
result = ns_query(rev)
+
if not result:
- return None
+ try:
+ result = socket.gethostbyaddr(address)[0]
+ except:
+ return None
else:
result = address
@@ -448,3 +456,18 @@ def get_hostname(address, fqdn=True):
return result
else:
return result.split('.')[0]
+
+
+def port_has_listener(address, port):
+ """
+ Returns True if the address:port is open and being listened to,
+ else False.
+
+ @param address: an IP address or hostname
+ @param port: integer port
+
+ Note: calls 'nc' (netcat) via a subprocess
+ """
+ cmd = ['nc', '-z', address, str(port)]
+ result = subprocess.call(cmd)
+ return not(bool(result))
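Two of the additions above in use, a sketch with made-up addresses: get_address_in_network() now accepts a space-delimited list of CIDRs and returns the first local match, and port_has_listener() shells out to nc -z:

    from charmhelpers.contrib.network.ip import (
        get_address_in_network,
        port_has_listener,
    )

    # First network that matches a local interface wins; fall back otherwise.
    addr = get_address_in_network('192.168.1.0/24 10.5.0.0/16',
                                  fallback='127.0.0.1')

    # True if anything on the host is listening on the port, since nc
    # probes the socket rather than inspecting this process.
    if port_has_listener('0.0.0.0', 8080):
        print('something is listening on 8080')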
diff --git a/hooks/charmhelpers/contrib/openstack/utils.py b/hooks/charmhelpers/contrib/openstack/utils.py
index 4dd000c..3fb67b1 100644
--- a/hooks/charmhelpers/contrib/openstack/utils.py
+++ b/hooks/charmhelpers/contrib/openstack/utils.py
@@ -1,5 +1,3 @@
-#!/usr/bin/python
-
# Copyright 2014-2015 Canonical Limited.
#
# This file is part of charm-helpers.
@@ -24,8 +22,14 @@ import subprocess
import json
import os
import sys
+import re
+import itertools
+import functools
import six
+import tempfile
+import traceback
+import uuid
import yaml
from charmhelpers.contrib.network import ip
@@ -35,12 +39,18 @@ from charmhelpers.core import (
)
from charmhelpers.core.hookenv import (
+ action_fail,
+ action_set,
config,
log as juju_log,
charm_dir,
+ DEBUG,
INFO,
+ related_units,
relation_ids,
- relation_set
+ relation_set,
+ status_set,
+ hook_name
)
from charmhelpers.contrib.storage.linux.lvm import (
@@ -50,7 +60,9 @@ from charmhelpers.contrib.storage.linux.lvm import (
)
from charmhelpers.contrib.network.ip import (
- get_ipv6_addr
+ get_ipv6_addr,
+ is_ipv6,
+ port_has_listener,
)
from charmhelpers.contrib.python.packages import (
@@ -58,7 +70,15 @@ from charmhelpers.contrib.python.packages import (
pip_install,
)
-from charmhelpers.core.host import lsb_release, mounts, umount
+from charmhelpers.core.host import (
+ lsb_release,
+ mounts,
+ umount,
+ service_running,
+ service_pause,
+ service_resume,
+ restart_on_change_helper,
+)
from charmhelpers.fetch import apt_install, apt_cache, install_remote
from charmhelpers.contrib.storage.linux.utils import is_block_device, zap_disk
from charmhelpers.contrib.storage.linux.loopback import ensure_loopback_device
@@ -69,7 +89,6 @@ CLOUD_ARCHIVE_KEY_ID = '5EDB1B62EC4926EA'
DISTRO_PROPOSED = ('deb http://archive.ubuntu.com/ubuntu/ %s-proposed '
'restricted main multiverse universe')
-
UBUNTU_OPENSTACK_RELEASE = OrderedDict([
('oneiric', 'diablo'),
('precise', 'essex'),
@@ -80,6 +99,7 @@ UBUNTU_OPENSTACK_RELEASE = OrderedDict([
('utopic', 'juno'),
('vivid', 'kilo'),
('wily', 'liberty'),
+ ('xenial', 'mitaka'),
])
@@ -93,31 +113,73 @@ OPENSTACK_CODENAMES = OrderedDict([
('2014.2', 'juno'),
('2015.1', 'kilo'),
('2015.2', 'liberty'),
+ ('2016.1', 'mitaka'),
])
-# The ugly duckling
+# The ugly duckling - must list releases oldest to newest
SWIFT_CODENAMES = OrderedDict([
- ('1.4.3', 'diablo'),
- ('1.4.8', 'essex'),
- ('1.7.4', 'folsom'),
- ('1.8.0', 'grizzly'),
- ('1.7.7', 'grizzly'),
- ('1.7.6', 'grizzly'),
- ('1.10.0', 'havana'),
- ('1.9.1', 'havana'),
- ('1.9.0', 'havana'),
- ('1.13.1', 'icehouse'),
- ('1.13.0', 'icehouse'),
- ('1.12.0', 'icehouse'),
- ('1.11.0', 'icehouse'),
- ('2.0.0', 'juno'),
- ('2.1.0', 'juno'),
- ('2.2.0', 'juno'),
- ('2.2.1', 'kilo'),
- ('2.2.2', 'kilo'),
- ('2.3.0', 'liberty'),
+ ('diablo',
+ ['1.4.3']),
+ ('essex',
+ ['1.4.8']),
+ ('folsom',
+ ['1.7.4']),
+ ('grizzly',
+ ['1.7.6', '1.7.7', '1.8.0']),
+ ('havana',
+ ['1.9.0', '1.9.1', '1.10.0']),
+ ('icehouse',
+ ['1.11.0', '1.12.0', '1.13.0', '1.13.1']),
+ ('juno',
+ ['2.0.0', '2.1.0', '2.2.0']),
+ ('kilo',
+ ['2.2.1', '2.2.2']),
+ ('liberty',
+ ['2.3.0', '2.4.0', '2.5.0']),
+ ('mitaka',
+ ['2.5.0', '2.6.0']),
])
+# >= Liberty version->codename mapping
+PACKAGE_CODENAMES = {
+ 'nova-common': OrderedDict([
+ ('12.0', 'liberty'),
+ ('13.0', 'mitaka'),
+ ]),
+ 'neutron-common': OrderedDict([
+ ('7.0', 'liberty'),
+ ('8.0', 'mitaka'),
+ ]),
+ 'cinder-common': OrderedDict([
+ ('7.0', 'liberty'),
+ ('8.0', 'mitaka'),
+ ]),
+ 'keystone': OrderedDict([
+ ('8.0', 'liberty'),
+ ('9.0', 'mitaka'),
+ ]),
+ 'horizon-common': OrderedDict([
+ ('8.0', 'liberty'),
+ ('9.0', 'mitaka'),
+ ]),
+ 'ceilometer-common': OrderedDict([
+ ('5.0', 'liberty'),
+ ('6.0', 'mitaka'),
+ ]),
+ 'heat-common': OrderedDict([
+ ('5.0', 'liberty'),
+ ('6.0', 'mitaka'),
+ ]),
+ 'glance-common': OrderedDict([
+ ('11.0', 'liberty'),
+ ('12.0', 'mitaka'),
+ ]),
+ 'openstack-dashboard': OrderedDict([
+ ('8.0', 'liberty'),
+ ('9.0', 'mitaka'),
+ ]),
+}
+
DEFAULT_LOOPBACK_SIZE = '5G'
@@ -167,9 +229,9 @@ def get_os_codename_version(vers):
error_out(e)
-def get_os_version_codename(codename):
+def get_os_version_codename(codename, version_map=OPENSTACK_CODENAMES):
'''Determine OpenStack version number from codename.'''
- for k, v in six.iteritems(OPENSTACK_CODENAMES):
+ for k, v in six.iteritems(version_map):
if v == codename:
return k
e = 'Could not derive OpenStack version for '\
@@ -177,6 +239,33 @@ def get_os_version_codename(codename):
error_out(e)
+def get_os_version_codename_swift(codename):
+ '''Determine OpenStack version number of swift from codename.'''
+ for k, v in six.iteritems(SWIFT_CODENAMES):
+ if k == codename:
+ return v[-1]
+ e = 'Could not derive swift version for '\
+ 'codename: %s' % codename
+ error_out(e)
+
+
+def get_swift_codename(version):
+ '''Determine OpenStack codename that corresponds to swift version.'''
+ codenames = [k for k, v in six.iteritems(SWIFT_CODENAMES) if version in v]
+ if len(codenames) > 1:
+ # If more than one release codename contains this version we determine
+ # the actual codename based on the highest available install source.
+ for codename in reversed(codenames):
+ releases = UBUNTU_OPENSTACK_RELEASE
+ release = [k for k, v in six.iteritems(releases) if codename in v]
+ ret = subprocess.check_output(['apt-cache', 'policy', 'swift'])
+ if codename in ret or release[0] in ret:
+ return codename
+ elif len(codenames) == 1:
+ return codenames[0]
+ return None
+
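The reason get_swift_codename() has to consult apt-cache policy at all: with the new list-valued mapping, a version can legitimately appear under two releases. A worked example of the ambiguous case (mapping excerpted from SWIFT_CODENAMES above):

    import six
    from collections import OrderedDict

    SWIFT_CODENAMES = OrderedDict([
        ('liberty', ['2.3.0', '2.4.0', '2.5.0']),
        ('mitaka', ['2.5.0', '2.6.0']),
    ])

    codenames = [k for k, v in six.iteritems(SWIFT_CODENAMES)
                 if '2.5.0' in v]
    # swift 2.5.0 shipped in both liberty and mitaka, so the highest
    # available install source has to break the tie.
    assert codenames == ['liberty', 'mitaka']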
+
def get_os_codename_package(package, fatal=True):
'''Derive OpenStack release codename from an installed package.'''
import apt_pkg as apt
@@ -201,20 +290,33 @@ def get_os_codename_package(package, fatal=True):
error_out(e)
vers = apt.upstream_version(pkg.current_ver.ver_str)
+ if 'swift' in pkg.name:
+ # Fully x.y.z match for swift versions
+ match = re.match('^(\d+)\.(\d+)\.(\d+)', vers)
+ else:
+ # x.y match only for 20XX.X
+ # and ignore patch level for other packages
+ match = re.match('^(\d+)\.(\d+)', vers)
- try:
- if 'swift' in pkg.name:
- swift_vers = vers[:5]
- if swift_vers not in SWIFT_CODENAMES:
- # Deal with 1.10.0 upward
- swift_vers = vers[:6]
- return SWIFT_CODENAMES[swift_vers]
- else:
- vers = vers[:6]
- return OPENSTACK_CODENAMES[vers]
- except KeyError:
- e = 'Could not determine OpenStack codename for version %s' % vers
- error_out(e)
+ if match:
+ vers = match.group(0)
+
+ # >= Liberty independent project versions
+ if (package in PACKAGE_CODENAMES and
+ vers in PACKAGE_CODENAMES[package]):
+ return PACKAGE_CODENAMES[package][vers]
+ else:
+ # < Liberty co-ordinated project versions
+ try:
+ if 'swift' in pkg.name:
+ return get_swift_codename(vers)
+ else:
+ return OPENSTACK_CODENAMES[vers]
+ except KeyError:
+ if not fatal:
+ return None
+ e = 'Could not determine OpenStack codename for version %s' % vers
+ error_out(e)
def get_os_version_package(pkg, fatal=True):
@@ -226,12 +328,14 @@ def get_os_version_package(pkg, fatal=True):
if 'swift' in pkg:
vers_map = SWIFT_CODENAMES
+ for cname, version in six.iteritems(vers_map):
+ if cname == codename:
+ return version[-1]
else:
vers_map = OPENSTACK_CODENAMES
-
- for version, cname in six.iteritems(vers_map):
- if cname == codename:
- return version
+ for version, cname in six.iteritems(vers_map):
+ if cname == codename:
+ return version
# e = "Could not determine OpenStack version for package: %s" % pkg
# error_out(e)
@@ -256,12 +360,42 @@ def os_release(package, base='essex'):
def import_key(keyid):
- cmd = "apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 " \
- "--recv-keys %s" % keyid
- try:
- subprocess.check_call(cmd.split(' '))
- except subprocess.CalledProcessError:
- error_out("Error importing repo key %s" % keyid)
+ key = keyid.strip()
+ if (key.startswith('-----BEGIN PGP PUBLIC KEY BLOCK-----') and
+ key.endswith('-----END PGP PUBLIC KEY BLOCK-----')):
+ juju_log("PGP key found (looks like ASCII Armor format)", level=DEBUG)
+ juju_log("Importing ASCII Armor PGP key", level=DEBUG)
+ with tempfile.NamedTemporaryFile() as keyfile:
+ with open(keyfile.name, 'w') as fd:
+ fd.write(key)
+ fd.write("\n")
+
+ cmd = ['apt-key', 'add', keyfile.name]
+ try:
+ subprocess.check_call(cmd)
+ except subprocess.CalledProcessError:
+ error_out("Error importing PGP key '%s'" % key)
+ else:
+ juju_log("PGP key found (looks like Radix64 format)", level=DEBUG)
+ juju_log("Importing PGP key from keyserver", level=DEBUG)
+ cmd = ['apt-key', 'adv', '--keyserver',
+ 'hkp://keyserver.ubuntu.com:80', '--recv-keys', key]
+ try:
+ subprocess.check_call(cmd)
+ except subprocess.CalledProcessError:
+ error_out("Error importing PGP key '%s'" % key)
+
+
+def get_source_and_pgp_key(input):
+ """Look for a pgp key ID or ascii-armor key in the given input."""
+ input = input.strip()
+ index = input.rfind('|')
+ if index < 0:
+ return input, None
+
+ key = input[index + 1:].strip('|')
+ source = input[:index]
+ return source, key
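Illustrative inputs and outputs for the helper above (the URL and key ID are made up); the text after the last '|' is treated as the key:

    from charmhelpers.contrib.openstack.utils import get_source_and_pgp_key

    assert get_source_and_pgp_key('ppa:foo/bar') == ('ppa:foo/bar', None)
    assert get_source_and_pgp_key(
        'deb http://example.com/ubuntu trusty main|ABCD1234') == \
        ('deb http://example.com/ubuntu trusty main', 'ABCD1234')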
def configure_installation_source(rel):
@@ -273,16 +407,16 @@ def configure_installation_source(rel):
with open('/etc/apt/sources.list.d/juju_deb.list', 'w') as f:
f.write(DISTRO_PROPOSED % ubuntu_rel)
elif rel[:4] == "ppa:":
- src = rel
+ src, key = get_source_and_pgp_key(rel)
+ if key:
+ import_key(key)
+
subprocess.check_call(["add-apt-repository", "-y", src])
elif rel[:3] == "deb":
- l = len(rel.split('|'))
- if l == 2:
- src, key = rel.split('|')
- juju_log("Importing PPA key from keyserver for %s" % src)
+ src, key = get_source_and_pgp_key(rel)
+ if key:
import_key(key)
- elif l == 1:
- src = rel
+
with open('/etc/apt/sources.list.d/juju_deb.list', 'w') as f:
f.write(src)
elif rel[:6] == 'cloud:':
@@ -327,6 +461,9 @@ def configure_installation_source(rel):
'liberty': 'trusty-updates/liberty',
'liberty/updates': 'trusty-updates/liberty',
'liberty/proposed': 'trusty-proposed/liberty',
+ 'mitaka': 'trusty-updates/mitaka',
+ 'mitaka/updates': 'trusty-updates/mitaka',
+ 'mitaka/proposed': 'trusty-proposed/mitaka',
}
try:
@@ -392,9 +529,18 @@ def openstack_upgrade_available(package):
import apt_pkg as apt
src = config('openstack-origin')
cur_vers = get_os_version_package(package)
- available_vers = get_os_version_install_source(src)
+ if "swift" in package:
+ codename = get_os_codename_install_source(src)
+ avail_vers = get_os_version_codename_swift(codename)
+ else:
+ avail_vers = get_os_version_install_source(src)
apt.init()
- return apt.version_compare(available_vers, cur_vers) == 1
+ if "swift" in package:
+ major_cur_vers = cur_vers.split('.', 1)[0]
+ major_avail_vers = avail_vers.split('.', 1)[0]
+ major_diff = apt.version_compare(major_avail_vers, major_cur_vers)
+ return avail_vers > cur_vers and (major_diff == 1 or major_diff == 0)
+ return apt.version_compare(avail_vers, cur_vers) == 1
def ensure_block_device(block_device):
@@ -469,6 +615,12 @@ def sync_db_with_multi_ipv6_addresses(database, database_user,
relation_prefix=None):
hosts = get_ipv6_addr(dynamic_only=False)
+ if config('vip'):
+ vips = config('vip').split()
+ for vip in vips:
+ if vip and is_ipv6(vip):
+ hosts.append(vip)
+
kwargs = {'database': database,
'username': database_user,
'hostname': json.dumps(hosts)}
@@ -517,7 +669,7 @@ def _git_yaml_load(projects_yaml):
return yaml.load(projects_yaml)
-def git_clone_and_install(projects_yaml, core_project, depth=1):
+def git_clone_and_install(projects_yaml, core_project):
"""
Clone/install all specified OpenStack repositories.
@@ -567,6 +719,9 @@ def git_clone_and_install(projects_yaml, core_project, depth=1):
for p in projects['repositories']:
repo = p['repository']
branch = p['branch']
+ depth = '1'
+ if 'depth' in p.keys():
+ depth = p['depth']
if p['name'] == 'requirements':
repo_dir = _git_clone_and_install_single(repo, branch, depth,
parent_dir, http_proxy,
@@ -611,19 +766,14 @@ def _git_clone_and_install_single(repo, branch, depth, parent_dir, http_proxy,
"""
Clone and install a single git repository.
"""
- dest_dir = os.path.join(parent_dir, os.path.basename(repo))
-
if not os.path.exists(parent_dir):
juju_log('Directory does not exist at {}. '
'Creating it.'.format(parent_dir))
os.mkdir(parent_dir)
- if not os.path.exists(dest_dir):
- juju_log('Cloning git repo: {}, branch: {}'.format(repo, branch))
- repo_dir = install_remote(repo, dest=parent_dir, branch=branch,
- depth=depth)
- else:
- repo_dir = dest_dir
+ juju_log('Cloning git repo: {}, branch: {}'.format(repo, branch))
+ repo_dir = install_remote(
+ repo, dest=parent_dir, branch=branch, depth=depth)
venv = os.path.join(parent_dir, 'venv')
@@ -704,3 +854,719 @@ def git_yaml_value(projects_yaml, key):
return projects[key]
return None
+
+
+def os_workload_status(configs, required_interfaces, charm_func=None):
+ """
+ Decorator to set workload status based on complete contexts
+ """
+ def wrap(f):
+ @wraps(f)
+ def wrapped_f(*args, **kwargs):
+ # Run the original function first
+ f(*args, **kwargs)
+ # Set workload status now that contexts have been
+ # acted on
+ set_os_workload_status(configs, required_interfaces, charm_func)
+ return wrapped_f
+ return wrap
+
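How a charm hook might use the decorator above; REQUIRED_INTERFACES, register_configs() and CONFIGS are hypothetical charm-local names standing in for the charm's own OSConfigRenderer wiring:

    from charmhelpers.contrib.openstack.utils import os_workload_status

    REQUIRED_INTERFACES = {'database': ['shared-db', 'pgsql-db'],
                           'messaging': ['amqp']}
    CONFIGS = register_configs()  # hypothetical: the charm's OSConfigRenderer

    @os_workload_status(CONFIGS, REQUIRED_INTERFACES)
    def config_changed():
        # Workload status is re-assessed after this hook body returns.
        CONFIGS.write_all()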
+
+def set_os_workload_status(configs, required_interfaces, charm_func=None,
+ services=None, ports=None):
+ """Set the state of the workload status for the charm.
+
+ This calls _determine_os_workload_status() to get the new state, message
+ and sets the status using status_set()
+
+ @param configs: a templating.OSConfigRenderer() object
+ @param required_interfaces: {generic: [specific, specific2, ...]}
+ @param charm_func: a callable function that returns state, message. The
+ signature is charm_func(configs) -> (state, message)
+ @param services: list of strings OR dictionary specifying services/ports
+ @param ports: OPTIONAL list of port numbers.
+ @returns state, message: the new workload status, user message
+ """
+ state, message = _determine_os_workload_status(
+ configs, required_interfaces, charm_func, services, ports)
+ status_set(state, message)
+
+
+def _determine_os_workload_status(
+ configs, required_interfaces, charm_func=None,
+ services=None, ports=None):
+ """Determine the state of the workload status for the charm.
+
+ This function returns the new workload status for the charm based
+ on the state of the interfaces, the paused state and whether the
+ services are actually running and any specified ports are open.
+
+ This checks:
+
+ 1. if the unit should be paused, that it is actually paused. If so the
+ state is 'maintenance' + message, else 'blocked'.
+ 2. that the interfaces/relations are complete. If they are not then
+ it sets the state to either 'blocked' or 'waiting' and an appropriate
+ message.
+ 3. If all the relation data is set, then it checks that the actual
+ services really are running. If not it sets the state to 'blocked'.
+
+ If everything is okay then the state returns 'active'.
+
+ @param configs: a templating.OSConfigRenderer() object
+ @param required_interfaces: {generic: [specific, specific2, ...]}
+ @param charm_func: a callable function that returns state, message. The
+ signature is charm_func(configs) -> (state, message)
+ @param services: list of strings OR dictionary specifying services/ports
+ @param ports: OPTIONAL list of port numbers.
+ @returns state, message: the new workload status, user message
+ """
+ state, message = _ows_check_if_paused(services, ports)
+
+ if state is None:
+ state, message = _ows_check_generic_interfaces(
+ configs, required_interfaces)
+
+ if state != 'maintenance' and charm_func:
+ # _ows_check_charm_func() may modify the state, message
+ state, message = _ows_check_charm_func(
+ state, message, lambda: charm_func(configs))
+
+ if state is None:
+ state, message = _ows_check_services_running(services, ports)
+
+ if state is None:
+ state = 'active'
+ message = "Unit is ready"
+ juju_log(message, 'INFO')
+
+ return state, message
+
+
+def _ows_check_if_paused(services=None, ports=None):
+ """Check if the unit is supposed to be paused, and if so check that the
+ services/ports (if passed) are actually stopped/not being listened to.
+
+ if the unit isn't supposed to be paused, just return None, None
+
+ @param services: OPTIONAL services spec or list of service names.
+ @param ports: OPTIONAL list of port numbers.
+ @returns state, message or None, None
+ """
+ if is_unit_paused_set():
+ state, message = check_actually_paused(services=services,
+ ports=ports)
+ if state is None:
+ # we're paused okay, so set maintenance and return
+ state = "maintenance"
+ message = "Paused. Use 'resume' action to resume normal service."
+ return state, message
+ return None, None
+
+
+def _ows_check_generic_interfaces(configs, required_interfaces):
+ """Check the complete contexts to determine the workload status.
+
+ - Checks for missing or incomplete contexts
+ - juju log details of missing required data.
+ - determines the correct workload status
+ - creates an appropriate message for status_set(...)
+
+ if there are no problems then the function returns None, None
+
+ @param configs: a templating.OSConfigRenderer() object
+ @param required_interfaces: {generic_interface: [specific_interface], }
+ @returns state, message or None, None
+ """
+ incomplete_rel_data = incomplete_relation_data(configs,
+ required_interfaces)
+ state = None
+ message = None
+ missing_relations = set()
+ incomplete_relations = set()
+
+ for generic_interface, relations_states in incomplete_rel_data.items():
+ related_interface = None
+ missing_data = {}
+ # Related or not?
+ for interface, relation_state in relations_states.items():
+ if relation_state.get('related'):
+ related_interface = interface
+ missing_data = relation_state.get('missing_data')
+ break
+ # No relation ID for the generic_interface?
+ if not related_interface:
+ juju_log("{} relation is missing and must be related for "
+ "functionality. ".format(generic_interface), 'WARN')
+ state = 'blocked'
+ missing_relations.add(generic_interface)
+ else:
+ # Relation ID exists but no related unit
+ if not missing_data:
+ # Edge case - relation ID exists but the unit is departing
+ _hook_name = hook_name()
+ if (('departed' in _hook_name or 'broken' in _hook_name) and
+ related_interface in _hook_name):
+ state = 'blocked'
+ missing_relations.add(generic_interface)
+ juju_log("{} relation's interface, {}, "
+ "relationship is departed or broken "
+ "and is required for functionality."
+ "".format(generic_interface, related_interface),
+ "WARN")
+ # Normal case relation ID exists but no related unit
+ # (joining)
+ else:
+ juju_log("{} relations's interface, {}, is related but has"
+ " no units in the relation."
+ "".format(generic_interface, related_interface),
+ "INFO")
+ # Related unit exists and data missing on the relation
+ else:
+ juju_log("{} relation's interface, {}, is related awaiting "
+ "the following data from the relationship: {}. "
+ "".format(generic_interface, related_interface,
+ ", ".join(missing_data)), "INFO")
+ if state != 'blocked':
+ state = 'waiting'
+ if generic_interface not in missing_relations:
+ incomplete_relations.add(generic_interface)
+
+ if missing_relations:
+ message = "Missing relations: {}".format(", ".join(missing_relations))
+ if incomplete_relations:
+ message += "; incomplete relations: {}" \
+ "".format(", ".join(incomplete_relations))
+ state = 'blocked'
+ elif incomplete_relations:
+ message = "Incomplete relations: {}" \
+ "".format(", ".join(incomplete_relations))
+ state = 'waiting'
+
+ return state, message
+
+
+def _ows_check_charm_func(state, message, charm_func_with_configs):
+ """Run a custom check function for the charm to see if it wants to
+ change the state. This is only run if not in 'maintenance' and
+ tests to see if the new state is more important than the previous
+ one determined by the interfaces/relations check.
+
+ @param state: the previously determined state so far.
+ @param message: the user orientated message so far.
+ @param charm_func: a callable function that returns state, message
+ @returns state, message strings.
+ """
+ if charm_func_with_configs:
+ charm_state, charm_message = charm_func_with_configs()
+ if charm_state != 'active' and charm_state != 'unknown':
+ state = workload_state_compare(state, charm_state)
+ if message:
+ charm_message = charm_message.replace("Incomplete relations: ",
+ "")
+ message = "{}, {}".format(message, charm_message)
+ else:
+ message = charm_message
+ return state, message
+
+
+def _ows_check_services_running(services, ports):
+ """Check that the services that should be running are actually running
+ and that any ports specified are being listened to.
+
+ @param services: list of strings OR dictionary specifying services/ports
+ @param ports: list of ports
+ @returns state, message: strings or None, None
+ """
+ messages = []
+ state = None
+ if services is not None:
+ services = _extract_services_list_helper(services)
+ services_running, running = _check_running_services(services)
+ if not all(running):
+ messages.append(
+ "Services not running that should be: {}"
+ .format(", ".join(_filter_tuples(services_running, False))))
+ state = 'blocked'
+ # also verify that the ports that should be open are open
+ # NB, that ServiceManager objects only OPTIONALLY have ports
+ map_not_open, ports_open = (
+ _check_listening_on_services_ports(services))
+ if not all(ports_open):
+ # find which service has missing ports. They are in service
+ # order which makes it a bit easier.
+ message_parts = {service: ", ".join([str(v) for v in open_ports])
+ for service, open_ports in map_not_open.items()}
+ message = ", ".join(
+ ["{}: [{}]".format(s, sp) for s, sp in message_parts.items()])
+ messages.append(
+ "Services with ports not open that should be: {}"
+ .format(message))
+ state = 'blocked'
+
+ if ports is not None:
+ # and we can also check ports which we don't know the service for
+ ports_open, ports_open_bools = _check_listening_on_ports_list(ports)
+ if not all(ports_open_bools):
+ messages.append(
+ "Ports which should be open, but are not: {}"
+ .format(", ".join([str(p) for p, v in ports_open
+ if not v])))
+ state = 'blocked'
+
+ if state is not None:
+ message = "; ".join(messages)
+ return state, message
+
+ return None, None
+
+
+def _extract_services_list_helper(services):
+ """Extract a OrderedDict of {service: [ports]} of the supplied services
+ for use by the other functions.
+
+ The services object can either be:
+ - None : no services were passed (an empty dict is returned)
+ - a list of strings
+ - A dictionary (optionally OrderedDict) {service_name: {'service': ..}}
+ - An array of [{'service': service_name, ...}, ...]
+
+ @param services: see above
+ @returns OrderedDict(service: [ports], ...)
+ """
+ if services is None:
+ return {}
+ if isinstance(services, dict):
+ services = services.values()
+ # either extract the list of services from the dictionary, or if
+ # it is a simple string, use that. i.e. works with mixed lists.
+ _s = OrderedDict()
+ for s in services:
+ if isinstance(s, dict) and 'service' in s:
+ _s[s['service']] = s.get('ports', [])
+ if isinstance(s, str):
+ _s[s] = []
+ return _s
+
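To make the accepted shapes for `services` concrete (names and ports illustrative; calls evaluated in the module's own scope):

    # 1. A plain list of service names:
    _extract_services_list_helper(['apache2', 'haproxy'])
    # -> OrderedDict([('apache2', []), ('haproxy', [])])

    # 2. A ServiceManager-style list of dicts with optional ports:
    _extract_services_list_helper(
        [{'service': 'apache2', 'ports': [80, 443]},
         {'service': 'haproxy'}])
    # -> OrderedDict([('apache2', [80, 443]), ('haproxy', [])])

    # 3. None:
    _extract_services_list_helper(None)  # -> {}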
+
+def _check_running_services(services):
+ """Check that the services dict provided is actually running and provide
+ a list of (service, boolean) tuples for each service.
+
+ Returns both a zipped list of (service, boolean) and a list of booleans
+ in the same order as the services.
+
+ @param services: OrderedDict of strings: [ports], one for each service to
+ check.
+ @returns [(service, boolean), ...], : results for checks
+ [boolean] : just the result of the service checks
+ """
+ services_running = [service_running(s) for s in services]
+ return list(zip(services, services_running)), services_running
+
+
+def _check_listening_on_services_ports(services, test=False):
+ """Check that the unit is actually listening (has the port open) on the
+ ports that the service specifies are open. If test is True then the
+ function returns the services with ports that are open rather than
+ closed.
+
+ Returns an OrderedDict of service: ports and a list of booleans
+
+ @param services: OrderedDict(service: [port, ...], ...)
+ @param test: default=False, if False, test for closed, otherwise open.
+ @returns OrderedDict(service: [port-not-open, ...]...), [boolean]
+ """
+ test = not(not(test)) # ensure test is True or False
+ all_ports = list(itertools.chain(*services.values()))
+ ports_states = [port_has_listener('0.0.0.0', p) for p in all_ports]
+ map_ports = OrderedDict()
+ matched_ports = [p for p, opened in zip(all_ports, ports_states)
+ if opened == test] # essentially opened xor test
+ for service, ports in services.items():
+ set_ports = set(ports).intersection(matched_ports)
+ if set_ports:
+ map_ports[service] = set_ports
+ return map_ports, ports_states
+
+
+def _check_listening_on_ports_list(ports):
+ """Check that the ports list given are being listened to
+
+ Returns a list of ports being listened to and a list of the
+ booleans.
+
+ @param ports: LIST or port numbers.
+ @returns [(port_num, boolean), ...], [boolean]
+ """
+ ports_open = [port_has_listener('0.0.0.0', p) for p in ports]
+ return zip(ports, ports_open), ports_open
+
+
+def _filter_tuples(services_states, state):
+ """Return a simple list from a list of tuples according to the condition
+
+ @param services_states: LIST of (string, boolean): service and running
+ state.
+ @param state: Boolean to match the tuple against.
+ @returns [LIST of strings] that matched the tuple RHS.
+ """
+ return [s for s, b in services_states if b == state]
+
+
+def workload_state_compare(current_workload_state, workload_state):
+ """ Return highest priority of two states"""
+ hierarchy = {'unknown': -1,
+ 'active': 0,
+ 'maintenance': 1,
+ 'waiting': 2,
+ 'blocked': 3,
+ }
+
+ if hierarchy.get(workload_state) is None:
+ workload_state = 'unknown'
+ if hierarchy.get(current_workload_state) is None:
+ current_workload_state = 'unknown'
+
+ # Set workload_state based on hierarchy of statuses
+ if hierarchy.get(current_workload_state) > hierarchy.get(workload_state):
+ return current_workload_state
+ else:
+ return workload_state
+
+
+def incomplete_relation_data(configs, required_interfaces):
+ """Check complete contexts against required_interfaces
+ Return dictionary of incomplete relation data.
+
+ configs is an OSConfigRenderer object with configs registered
+
+ required_interfaces is a dictionary of required general interfaces
+ with dictionary values of possible specific interfaces.
+ Example:
+ required_interfaces = {'database': ['shared-db', 'pgsql-db']}
+
+ The interface is said to be satisfied if any one of the interfaces in the
+ list has a complete context.
+
+ Return dictionary of incomplete or missing required contexts with relation
+ status of interfaces and any missing data points. Example:
+ {'message':
+ {'amqp': {'missing_data': ['rabbitmq_password'], 'related': True},
+ 'zeromq-configuration': {'related': False}},
+ 'identity':
+ {'identity-service': {'related': False}},
+ 'database':
+ {'pgsql-db': {'related': False},
+ 'shared-db': {'related': True}}}
+ """
+ complete_ctxts = configs.complete_contexts()
+ incomplete_relations = [
+ svc_type
+ for svc_type, interfaces in required_interfaces.items()
+ if not set(interfaces).intersection(complete_ctxts)]
+ return {
+ i: configs.get_incomplete_context_data(required_interfaces[i])
+ for i in incomplete_relations}
+
+
+def do_action_openstack_upgrade(package, upgrade_callback, configs):
+ """Perform action-managed OpenStack upgrade.
+
+ Upgrades packages to the configured openstack-origin version and sets
+ the corresponding action status as a result.
+
+ If the charm was installed from source we cannot upgrade it.
+ For backwards compatibility a config flag (action-managed-upgrade) must
+ be set for this code to run, otherwise a full service level upgrade will
+ fire on config-changed.
+
+ @param package: package name for determining if upgrade available
+ @param upgrade_callback: function callback to charm's upgrade function
+ @param configs: templating object derived from OSConfigRenderer class
+
+ @return: True if upgrade successful; False if upgrade failed or skipped
+ """
+ ret = False
+
+ if git_install_requested():
+ action_set({'outcome': 'installed from source, skipped upgrade.'})
+ else:
+ if openstack_upgrade_available(package):
+ if config('action-managed-upgrade'):
+ juju_log('Upgrading OpenStack release')
+
+ try:
+ upgrade_callback(configs=configs)
+ action_set({'outcome': 'success, upgrade completed.'})
+ ret = True
+ except:
+ action_set({'outcome': 'upgrade failed, see traceback.'})
+ action_set({'traceback': traceback.format_exc()})
+ action_fail('do_openstack_upgrade resulted in an '
+ 'unexpected error')
+ else:
+ action_set({'outcome': 'action-managed-upgrade config is '
+ 'False, skipped upgrade.'})
+ else:
+ action_set({'outcome': 'no upgrade available.'})
+
+ return ret
+
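A plausible action-script wiring for the helper above; do_openstack_upgrade, config_changed and CONFIGS are the charm's own functions and renderer (hypothetical names here):

    from charmhelpers.contrib.openstack.utils import do_action_openstack_upgrade

    def openstack_upgrade():
        # Returns True only when an upgrade actually ran and succeeded.
        if do_action_openstack_upgrade('ceilometer-common',
                                       do_openstack_upgrade,  # charm callback
                                       CONFIGS):
            config_changed()  # re-render configs for the new release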
+
+def remote_restart(rel_name, remote_service=None):
+ trigger = {
+ 'restart-trigger': str(uuid.uuid4()),
+ }
+ if remote_service:
+ trigger['remote-service'] = remote_service
+ for rid in relation_ids(rel_name):
+ # This subordinate can be related to two separate services using
+ # different subordinate relations, so only issue the restart if
+ # the principal is connected down the relation we think it is.
+ if related_units(relid=rid):
+ relation_set(relation_id=rid,
+ relation_settings=trigger,
+ )
+
+
+def check_actually_paused(services=None, ports=None):
+ """Check that services listed in the services object and and ports
+ are actually closed (not listened to), to verify that the unit is
+ properly paused.
+
+ @param services: See _extract_services_list_helper
+ @returns state, message: state string (None if okay) and a message
+ describing the problem, for status_set
+ """
+ state = None
+ message = None
+ messages = []
+ if services is not None:
+ services = _extract_services_list_helper(services)
+ services_running, services_states = _check_running_services(services)
+ if any(services_states):
+ # there shouldn't be any running so this is a problem
+ messages.append("these services running: {}"
+ .format(", ".join(
+ _filter_tuples(services_running, True))))
+ state = "blocked"
+ ports_open, ports_open_bools = (
+ _check_listening_on_services_ports(services, True))
+ if any(ports_open_bools):
+ message_parts = {service: ", ".join([str(v) for v in open_ports])
+ for service, open_ports in ports_open.items()}
+ message = ", ".join(
+ ["{}: [{}]".format(s, sp) for s, sp in message_parts.items()])
+ messages.append(
+ "these service:ports are open: {}".format(message))
+ state = 'blocked'
+ if ports is not None:
+ ports_open, bools = _check_listening_on_ports_list(ports)
+ if any(bools):
+ messages.append(
+ "these ports which should be closed, but are open: {}"
+ .format(", ".join([str(p) for p, v in ports_open if v])))
+ state = 'blocked'
+ if messages:
+ message = ("Services should be paused but {}"
+ .format(", ".join(messages)))
+ return state, message
+
+
+def set_unit_paused():
+ """Set the unit to a paused state in the local kv() store.
+ This does NOT actually pause the unit
+ """
+ with unitdata.HookData()() as t:
+ kv = t[0]
+ kv.set('unit-paused', True)
+
+
+def clear_unit_paused():
+ """Clear the unit from a paused state in the local kv() store
+ This does NOT actually restart any services - it only clears the
+ local state.
+ """
+ with unitdata.HookData()() as t:
+ kv = t[0]
+ kv.set('unit-paused', False)
+
+
+def is_unit_paused_set():
+ """Return the state of the kv().get('unit-paused').
+ This does NOT verify that the unit really is paused.
+
+ To help with units that don't have HookData() (e.g. in tests),
+ return False if the lookup raises an exception.
+ """
+ try:
+ with unitdata.HookData()() as t:
+ kv = t[0]
+ # transform something truth-y into a Boolean.
+ return not(not(kv.get('unit-paused')))
+ except:
+ return False
+
+
+def pause_unit(assess_status_func, services=None, ports=None,
+ charm_func=None):
+ """Pause a unit by stopping the services and setting 'unit-paused'
+ in the local kv() store.
+
+ Also checks that the services have stopped and ports are no longer
+ being listened to.
+
+ An optional charm_func() can be called that can either raise an
+ Exception or return a non-None message to indicate that the unit
+ didn't pause cleanly.
+
+ The signature for charm_func is:
+ charm_func() -> message: string
+
+ charm_func() is executed after any services are stopped, if supplied.
+
+ The services object can either be:
+ - None : no services were passed (an empty dict is returned)
+ - a list of strings
+ - A dictionary (optionally OrderedDict) {service_name: {'service': ..}}
+ - An array of [{'service': service_name, ...}, ...]
+
+ @param assess_status_func: (f() -> message: string | None) or None
+ @param services: OPTIONAL see above
+ @param ports: OPTIONAL list of port
+ @param charm_func: function to run for custom charm pausing.
+ @returns None
+ @raises Exception(message) on an error for action_fail().
+ """
+ services = _extract_services_list_helper(services)
+ messages = []
+ if services:
+ for service in services.keys():
+ stopped = service_pause(service)
+ if not stopped:
+ messages.append("{} didn't stop cleanly.".format(service))
+ if charm_func:
+ try:
+ message = charm_func()
+ if message:
+ messages.append(message)
+ except Exception as e:
+ messages.append(str(e))
+ set_unit_paused()
+ if assess_status_func:
+ message = assess_status_func()
+ if message:
+ messages.append(message)
+ if messages:
+ raise Exception("Couldn't pause: {}".format("; ".join(messages)))
+
+
+def resume_unit(assess_status_func, services=None, ports=None,
+ charm_func=None):
+ """Resume a unit by starting the services and clearning 'unit-paused'
+ in the local kv() store.
+
+ Also checks that the services have started and ports are being listened to.
+
+ An optional charm_func() can be called that can either raise an
+ Exception or return a non-None message to indicate that the unit
+ didn't resume cleanly.
+
+ The signature for charm_func is:
+ charm_func() -> message: string
+
+ charm_func() is executed after any services are started, if supplied.
+
+ The services object can either be:
+ - None : no services were passed (an empty dict is returned)
+ - a list of strings
+ - A dictionary (optionally OrderedDict) {service_name: {'service': ..}}
+ - An array of [{'service': service_name, ...}, ...]
+
+ @param assess_status_func: (f() -> message: string | None) or None
+ @param services: OPTIONAL see above
+ @param ports: OPTIONAL list of port
+ @param charm_func: function to run for custom charm resuming.
+ @returns None
+ @raises Exception(message) on an error for action_fail().
+ """
+ services = _extract_services_list_helper(services)
+ messages = []
+ if services:
+ for service in services.keys():
+ started = service_resume(service)
+ if not started:
+ messages.append("{} didn't start cleanly.".format(service))
+ if charm_func:
+ try:
+ message = charm_func()
+ if message:
+ messages.append(message)
+ except Exception as e:
+ messages.append(str(e))
+ clear_unit_paused()
+ if assess_status_func:
+ message = assess_status_func()
+ if message:
+ messages.append(message)
+ if messages:
+ raise Exception("Couldn't resume: {}".format("; ".join(messages)))
+
+
+def make_assess_status_func(*args, **kwargs):
+ """Creates an assess_status_func() suitable for handing to pause_unit()
+ and resume_unit().
+
+ This uses the _determine_os_workload_status(...) function to determine
+ what the workload_status should be for the unit. If the unit is
+ not in maintenance or active states, then the message is returned to
+ the caller. This is so an action that doesn't result in either a
+ complete pause or complete resume can signal failure with an action_fail()
+ """
+ def _assess_status_func():
+ state, message = _determine_os_workload_status(*args, **kwargs)
+ status_set(state, message)
+ if state not in ['maintenance', 'active']:
+ return message
+ return None
+
+ return _assess_status_func
+
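This is the glue the pause/resume actions at the top of this diff rely on: the charm-local pause_unit()/resume_unit() wrappers imported by actions.py would plausibly look like this (interface map and service list are illustrative):

    from charmhelpers.contrib.openstack.utils import (
        make_assess_status_func,
        pause_unit,
        resume_unit,
    )

    REQUIRED_INTERFACES = {'messaging': ['amqp']}
    SERVICES = ['haproxy', 'corosync']

    def pause_unit_helper(configs):
        # Raises Exception on failure, which actions.py converts into
        # action_fail(); on success the unit is left in 'maintenance'.
        _assess = make_assess_status_func(configs, REQUIRED_INTERFACES,
                                          services=SERVICES, ports=None)
        pause_unit(_assess, services=SERVICES)

    def resume_unit_helper(configs):
        _assess = make_assess_status_func(configs, REQUIRED_INTERFACES,
                                          services=SERVICES, ports=None)
        resume_unit(_assess, services=SERVICES)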
+
+def pausable_restart_on_change(restart_map, stopstart=False):
+ """A restart_on_change decorator that checks to see if the unit is
+ paused. If it is paused then the decorated function doesn't fire.
+
+ This is provided as a helper, as the @restart_on_change(...) decorator
+ is in core.host, yet the openstack specific helpers are in this file
+ (contrib.openstack.utils). Thus, this needs to be an optional feature
+ for openstack charms (or charms that wish to use the openstack
+ pause/resume type features).
+
+ It is used as follows:
+
+ from contrib.openstack.utils import (
+ pausable_restart_on_change as restart_on_change)
+
+ @restart_on_change(restart_map, stopstart=<boolean>)
+ def some_hook(...):
+ pass
+
+ see core.utils.restart_on_change() for more details.
+
+ @param f: the function to decorate
+ @param restart_map: the restart map {conf_file: [services]}
+ @param stopstart: DEFAULT false; whether to stop, start or just restart
+ @returns decorator to use a restart_on_change with pausability
+ """
+ def wrap(f):
+ @functools.wraps(f)
+ def wrapped_f(*args, **kwargs):
+ if is_unit_paused_set():
+ return f(*args, **kwargs)
+ # otherwise, normal restart_on_change functionality
+ return restart_on_change_helper(
+ (lambda: f(*args, **kwargs)), restart_map, stopstart)
+ return wrapped_f
+ return wrap
diff --git a/hooks/charmhelpers/contrib/python/packages.py b/hooks/charmhelpers/contrib/python/packages.py
index 10b32e3..a2411c3 100644
--- a/hooks/charmhelpers/contrib/python/packages.py
+++ b/hooks/charmhelpers/contrib/python/packages.py
@@ -19,20 +19,35 @@
import os
import subprocess
+import sys
from charmhelpers.fetch import apt_install, apt_update
from charmhelpers.core.hookenv import charm_dir, log
-try:
- from pip import main as pip_execute
-except ImportError:
- apt_update()
- apt_install('python-pip')
- from pip import main as pip_execute
-
__author__ = "Jorge Niedbalski "
+def pip_execute(*args, **kwargs):
+ """Overriden pip_execute() to stop sys.path being changed.
+
+ The act of importing main from the pip module seems to add wheels
+ from /usr/share/python-wheels (installed by various tools) to sys.path.
+ This function ensures that sys.path remains the same after the call is
+ executed.
+ """
+ try:
+ _path = sys.path
+ try:
+ from pip import main as _pip_execute
+ except ImportError:
+ apt_update()
+ apt_install('python-pip')
+ from pip import main as _pip_execute
+ _pip_execute(*args, **kwargs)
+ finally:
+ sys.path = _path
+
+
def parse_options(given, available):
"""Given a set of options, check if available"""
for key, value in sorted(given.items()):
@@ -42,8 +57,12 @@ def parse_options(given, available):
yield "--{0}={1}".format(key, value)
-def pip_install_requirements(requirements, **options):
- """Install a requirements file """
+def pip_install_requirements(requirements, constraints=None, **options):
+ """Install a requirements file.
+
+ :param constraints: Path to pip constraints file.
+ http://pip.readthedocs.org/en/stable/user_guide/#constraints-files
+ """
command = ["install"]
available_options = ('proxy', 'src', 'log', )
@@ -51,8 +70,13 @@ def pip_install_requirements(requirements, **options):
command.append(option)
command.append("-r {0}".format(requirements))
- log("Installing from file: {} with options: {}".format(requirements,
- command))
+ if constraints:
+ command.append("-c {0}".format(constraints))
+ log("Installing from file: {} with constraints {} "
+ "and options: {}".format(requirements, constraints, command))
+ else:
+ log("Installing from file: {} with options: {}".format(requirements,
+ command))
pip_execute(command)
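An example invocation of the extended helper (paths and proxy are made up):

    from charmhelpers.contrib.python.packages import pip_install_requirements

    pip_install_requirements('requirements.txt',
                             constraints='constraints.txt',
                             proxy='http://squid.internal:3128')
    # builds: ['install', '--proxy=http://squid.internal:3128',
    #          '-r requirements.txt', '-c constraints.txt']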
diff --git a/hooks/charmhelpers/contrib/storage/linux/ceph.py b/hooks/charmhelpers/contrib/storage/linux/ceph.py
index 00dbffb..1b4b1de 100644
--- a/hooks/charmhelpers/contrib/storage/linux/ceph.py
+++ b/hooks/charmhelpers/contrib/storage/linux/ceph.py
@@ -23,11 +23,16 @@
# James Page
# Adam Gandelman
#
+import bisect
+import errno
+import hashlib
+import six
import os
import shutil
import json
import time
+import uuid
from subprocess import (
check_call,
@@ -35,8 +40,10 @@ from subprocess import (
CalledProcessError,
)
from charmhelpers.core.hookenv import (
+ local_unit,
relation_get,
relation_ids,
+ relation_set,
related_units,
log,
DEBUG,
@@ -56,6 +63,8 @@ from charmhelpers.fetch import (
apt_install,
)
+from charmhelpers.core.kernel import modprobe
+
KEYRING = '/etc/ceph/ceph.client.{}.keyring'
KEYFILE = '/etc/ceph/ceph.client.{}.key'
@@ -67,6 +76,548 @@ log to syslog = {use_syslog}
err to syslog = {use_syslog}
clog to syslog = {use_syslog}
"""
+# For 50 < osds < 240,000 OSDs (Roughly 1 Exabyte at 6T OSDs)
+powers_of_two = [8192, 16384, 32768, 65536, 131072, 262144, 524288, 1048576, 2097152, 4194304, 8388608]
+
+
+def validator(value, valid_type, valid_range=None):
+ """
+ Used to validate these: http://docs.ceph.com/docs/master/rados/operations/pools/#set-pool-values
+ Example input:
+ validator(value=1,
+ valid_type=int,
+ valid_range=[0, 2])
+ This says I'm testing value=1. It must be an int inclusive in [0,2]
+
+ :param value: The value to validate
+ :param valid_type: The type that value should be.
+ :param valid_range: A range of values that value can assume.
+ :return:
+ """
+ assert isinstance(value, valid_type), "{} is not a {}".format(
+ value,
+ valid_type)
+ if valid_range is not None:
+ assert isinstance(valid_range, list), \
+ "valid_range must be a list, was given {}".format(valid_range)
+ # If we're dealing with strings
+ if valid_type is six.string_types:
+ assert value in valid_range, \
+ "{} is not in the list {}".format(value, valid_range)
+ # Integer, float should have a min and max
+ else:
+ if len(valid_range) != 2:
+ raise ValueError(
+ "Invalid valid_range list of {} for {}. "
+ "List must be [min,max]".format(valid_range, value))
+ assert value >= valid_range[0], \
+ "{} is less than minimum allowed value of {}".format(
+ value, valid_range[0])
+ assert value <= valid_range[1], \
+ "{} is greater than maximum allowed value of {}".format(
+ value, valid_range[1])
+
+
+class PoolCreationError(Exception):
+ """
+ A custom error to inform the caller that a pool creation failed. Provides an error message
+ """
+
+ def __init__(self, message):
+ super(PoolCreationError, self).__init__(message)
+
+
+class Pool(object):
+ """
+ An object oriented approach to Ceph pool creation. This base class is inherited by ReplicatedPool and ErasurePool.
+ Do not call create() on this base class as it will not do anything. Instantiate a child class and call create().
+ """
+
+ def __init__(self, service, name):
+ self.service = service
+ self.name = name
+
+ # Create the pool if it doesn't exist already
+ # To be implemented by subclasses
+ def create(self):
+ pass
+
+ def add_cache_tier(self, cache_pool, mode):
+ """
+ Adds a new cache tier to an existing pool.
+ :param cache_pool: six.string_types. The cache tier pool name to add.
+ :param mode: six.string_types. The caching mode to use for this pool. valid range = ["readonly", "writeback"]
+ :return: None
+ """
+ # Check the input types and values
+ validator(value=cache_pool, valid_type=six.string_types)
+ validator(value=mode, valid_type=six.string_types, valid_range=["readonly", "writeback"])
+
+ check_call(['ceph', '--id', self.service, 'osd', 'tier', 'add', self.name, cache_pool])
+ check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, mode])
+ check_call(['ceph', '--id', self.service, 'osd', 'tier', 'set-overlay', self.name, cache_pool])
+ check_call(['ceph', '--id', self.service, 'osd', 'pool', 'set', cache_pool, 'hit_set_type', 'bloom'])
+
+ def remove_cache_tier(self, cache_pool):
+ """
+ Removes a cache tier from Ceph. Flushes all dirty objects from writeback pools and waits for that to complete.
+ :param cache_pool: six.string_types. The cache tier pool name to remove.
+ :return: None
+ """
+ # read-only is easy, writeback is much harder
+ mode = get_cache_mode(self.service, cache_pool)
+ if mode == 'readonly':
+ check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, 'none'])
+ check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool])
+
+ elif mode == 'writeback':
+ check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, 'forward'])
+ # Flush the cache and wait for it to return
+ check_call(['rados', '--id', self.service, '-p', cache_pool, 'cache-flush-evict-all'])
+ check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove-overlay', self.name])
+ check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool])
+
+ def get_pgs(self, pool_size):
+ """
+ :param pool_size: int. pool_size is either the number of replicas for replicated pools or the K+M sum for
+ erasure coded pools
+ :return: int. The number of pgs to use.
+ """
+ validator(value=pool_size, valid_type=int)
+ osd_list = get_osds(self.service)
+ if not osd_list:
+ # NOTE(james-page): Default to 200 for older ceph versions
+ # which don't support OSD query from cli
+ return 200
+
+ osd_list_length = len(osd_list)
+ # Calculate based on Ceph best practices
+ if osd_list_length < 5:
+ return 128
+        elif 5 <= osd_list_length < 10:
+            return 512
+        elif 10 <= osd_list_length < 50:
+            return 4096
+ else:
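+            # Worked example (illustrative): 300 OSDs with pool_size=3 give
+            # an estimate of 10000; bisect then selects 16384, the next
+            # power of two above the estimate.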
+            estimate = (osd_list_length * 100) // pool_size
+            # Return the next nearest power of 2
+            index = bisect.bisect_right(powers_of_two, estimate)
+            # Clamp so very large estimates don't index past the table
+            index = min(index, len(powers_of_two) - 1)
+            return powers_of_two[index]
+
+
+class ReplicatedPool(Pool):
+ def __init__(self, service, name, pg_num=None, replicas=2):
+ super(ReplicatedPool, self).__init__(service=service, name=name)
+ self.replicas = replicas
+ if pg_num is None:
+ self.pg_num = self.get_pgs(self.replicas)
+ else:
+ self.pg_num = pg_num
+
+ def create(self):
+ if not pool_exists(self.service, self.name):
+ # Create it
+ cmd = ['ceph', '--id', self.service, 'osd', 'pool', 'create',
+ self.name, str(self.pg_num)]
+ try:
+ check_call(cmd)
+ except CalledProcessError:
+ raise
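+
+# Illustrative usage, assuming a cephx user named 'admin' exists:
+#
+#     ReplicatedPool(service='admin', name='data', replicas=3).create()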
+
+
+# Default jerasure erasure coded pool
+class ErasurePool(Pool):
+ def __init__(self, service, name, erasure_code_profile="default"):
+ super(ErasurePool, self).__init__(service=service, name=name)
+ self.erasure_code_profile = erasure_code_profile
+
+ def create(self):
+ if not pool_exists(self.service, self.name):
+ # Try to find the erasure profile information so we can properly size the pgs
+ erasure_profile = get_erasure_profile(service=self.service, name=self.erasure_code_profile)
+
+ # Check for errors
+ if erasure_profile is None:
+ log(message='Failed to discover erasure_profile named={}'.format(self.erasure_code_profile),
+ level=ERROR)
+ raise PoolCreationError(message='unable to find erasure profile {}'.format(self.erasure_code_profile))
+ if 'k' not in erasure_profile or 'm' not in erasure_profile:
+ # Error
+ log(message='Unable to find k (data chunks) or m (coding chunks) in {}'.format(erasure_profile),
+ level=ERROR)
+ raise PoolCreationError(
+ message='unable to find k (data chunks) or m (coding chunks) in {}'.format(erasure_profile))
+
+ pgs = self.get_pgs(int(erasure_profile['k']) + int(erasure_profile['m']))
+ # Create it
+ cmd = ['ceph', '--id', self.service, 'osd', 'pool', 'create', self.name, str(pgs), str(pgs),
+ 'erasure', self.erasure_code_profile]
+ try:
+ check_call(cmd)
+ except CalledProcessError:
+ raise
+
+ """Get an existing erasure code profile if it already exists.
+ Returns json formatted output"""
+
+
+def get_mon_map(service):
+ """
+ Returns the current monitor map.
+ :param service: six.string_types. The Ceph user name to run the command under
+    :return: dict. The parsed mon_status output.
+    :raise: ValueError if the monmap fails to parse;
+      CalledProcessError if the ceph command fails
+ """
+ try:
+ mon_status = check_output(
+ ['ceph', '--id', service,
+ 'mon_status', '--format=json'])
+ try:
+ return json.loads(mon_status)
+ except ValueError as v:
+ log("Unable to parse mon_status json: {}. Error: {}".format(
+                mon_status, v))
+ raise
+ except CalledProcessError as e:
+ log("mon_status command failed with message: {}".format(
+            e))
+ raise
+
+
+def hash_monitor_names(service):
+ """
+ Uses the get_mon_map() function to get information about the monitor
+ cluster.
+ Hash the name of each monitor. Return a sorted list of monitor hashes
+ in an ascending order.
+ :param service: six.string_types. The Ceph user name to run the command under
+    :rtype: list. A sorted list of hexadecimal SHA224 digests, one per
+      monitor name in the monmap, or None if the monmap lists no monitors
+ """
+    hash_list = []
+    monitor_list = get_mon_map(service=service)
+    if monitor_list['monmap']['mons']:
+        for mon in monitor_list['monmap']['mons']:
+            hash_list.append(
+                hashlib.sha224(mon['name'].encode('utf-8')).hexdigest())
+        return sorted(hash_list)
+    else:
+        return None
+
+
+def monitor_key_delete(service, key):
+ """
+    Deletes a key/value pair from the monitor cluster.
+    :param service: six.string_types. The Ceph user name to run the command under
+    :param key: six.string_types. The key to delete.
+ """
+ try:
+ check_output(
+ ['ceph', '--id', service,
+ 'config-key', 'del', str(key)])
+ except CalledProcessError as e:
+ log("Monitor config-key put failed with message: {}".format(
+ e.output))
+ raise
+
+
+def monitor_key_set(service, key, value):
+ """
+ Sets a key value pair on the monitor cluster.
+ :param service: six.string_types. The Ceph user name to run the command under
+ :param key: six.string_types. The key to set.
+ :param value: The value to set. This will be converted to a string
+ before setting
+ """
+ try:
+ check_output(
+ ['ceph', '--id', service,
+ 'config-key', 'put', str(key), str(value)])
+ except CalledProcessError as e:
+ log("Monitor config-key put failed with message: {}".format(
+ e.output))
+ raise
+
+
+def monitor_key_get(service, key):
+ """
+ Gets the value of an existing key in the monitor cluster.
+ :param service: six.string_types. The Ceph user name to run the command under
+ :param key: six.string_types. The key to search for.
+ :return: Returns the value of that key or None if not found.
+ """
+ try:
+ output = check_output(
+ ['ceph', '--id', service,
+ 'config-key', 'get', str(key)])
+ return output
+ except CalledProcessError as e:
+ log("Monitor config-key get failed with message: {}".format(
+ e.output))
+ return None
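+
+# Illustrative round trip, assuming a cephx user named 'admin':
+#
+#     monitor_key_set('admin', 'foo', 'bar')  # ceph config-key put foo bar
+#     monitor_key_get('admin', 'foo')         # returns the stored value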
+
+
+def monitor_key_exists(service, key):
+ """
+ Searches for the existence of a key in the monitor cluster.
+ :param service: six.string_types. The Ceph user name to run the command under
+ :param key: six.string_types. The key to search for
+ :return: Returns True if the key exists, False if not and raises an
+ exception if an unknown error occurs. :raise: CalledProcessError if
+ an unknown error occurs
+ """
+ try:
+ check_call(
+ ['ceph', '--id', service,
+ 'config-key', 'exists', str(key)])
+        # A zero exit status means the key exists; ceph exits with
+        # ENOENT when the key is not found, which is handled below
+ return True
+ except CalledProcessError as e:
+ if e.returncode == errno.ENOENT:
+ return False
+ else:
+ log("Unknown error from ceph config-get exists: {} {}".format(
+ e.returncode, e.output))
+ raise
+
+
+def get_erasure_profile(service, name):
+ """
+    Get an existing erasure code profile if it exists.
+    :param service: six.string_types. The Ceph user name to run the command under
+    :param name: six.string_types. The name of the profile to fetch
+    :return: dict. The parsed profile, or None if it could not be retrieved
+ """
+ try:
+ out = check_output(['ceph', '--id', service,
+ 'osd', 'erasure-code-profile', 'get',
+ name, '--format=json'])
+ return json.loads(out)
+ except (CalledProcessError, OSError, ValueError):
+ return None
+
+
+def pool_set(service, pool_name, key, value):
+ """
+ Sets a value for a RADOS pool in ceph.
+ :param service: six.string_types. The Ceph user name to run the command under
+ :param pool_name: six.string_types
+ :param key: six.string_types
+ :param value:
+ :return: None. Can raise CalledProcessError
+ """
+ cmd = ['ceph', '--id', service, 'osd', 'pool', 'set', pool_name, key, value]
+ try:
+ check_call(cmd)
+ except CalledProcessError:
+ raise
+
+
+def snapshot_pool(service, pool_name, snapshot_name):
+ """
+ Snapshots a RADOS pool in ceph.
+ :param service: six.string_types. The Ceph user name to run the command under
+ :param pool_name: six.string_types
+ :param snapshot_name: six.string_types
+ :return: None. Can raise CalledProcessError
+ """
+ cmd = ['ceph', '--id', service, 'osd', 'pool', 'mksnap', pool_name, snapshot_name]
+ try:
+ check_call(cmd)
+ except CalledProcessError:
+ raise
+
+
+def remove_pool_snapshot(service, pool_name, snapshot_name):
+ """
+ Remove a snapshot from a RADOS pool in ceph.
+ :param service: six.string_types. The Ceph user name to run the command under
+ :param pool_name: six.string_types
+ :param snapshot_name: six.string_types
+ :return: None. Can raise CalledProcessError
+ """
+ cmd = ['ceph', '--id', service, 'osd', 'pool', 'rmsnap', pool_name, snapshot_name]
+ try:
+ check_call(cmd)
+ except CalledProcessError:
+ raise
+
+
+# max_bytes should be an int or long
+def set_pool_quota(service, pool_name, max_bytes):
+ """
+ :param service: six.string_types. The Ceph user name to run the command under
+ :param pool_name: six.string_types
+ :param max_bytes: int or long
+ :return: None. Can raise CalledProcessError
+ """
+ # Set a byte quota on a RADOS pool in ceph.
+ cmd = ['ceph', '--id', service, 'osd', 'pool', 'set-quota', pool_name,
+ 'max_bytes', str(max_bytes)]
+ try:
+ check_call(cmd)
+ except CalledProcessError:
+ raise
+
+
+def remove_pool_quota(service, pool_name):
+ """
+    Remove the byte quota from a RADOS pool in ceph.
+ :param service: six.string_types. The Ceph user name to run the command under
+ :param pool_name: six.string_types
+ :return: None. Can raise CalledProcessError
+ """
+ cmd = ['ceph', '--id', service, 'osd', 'pool', 'set-quota', pool_name, 'max_bytes', '0']
+ try:
+ check_call(cmd)
+ except CalledProcessError:
+ raise
+
+
+def remove_erasure_profile(service, profile_name):
+ """
+    Remove an erasure code profile. Please see
+    http://docs.ceph.com/docs/master/rados/operations/erasure-code-profile/
+    for more details
+ :param service: six.string_types. The Ceph user name to run the command under
+ :param profile_name: six.string_types
+ :return: None. Can raise CalledProcessError
+ """
+ cmd = ['ceph', '--id', service, 'osd', 'erasure-code-profile', 'rm',
+ profile_name]
+ try:
+ check_call(cmd)
+ except CalledProcessError:
+ raise
+
+
+def create_erasure_profile(service, profile_name, erasure_plugin_name='jerasure',
+ failure_domain='host',
+ data_chunks=2, coding_chunks=1,
+ locality=None, durability_estimator=None):
+ """
+ Create a new erasure code profile if one does not already exist for it. Updates
+ the profile if it exists. Please see http://docs.ceph.com/docs/master/rados/operations/erasure-code-profile/
+ for more details
+ :param service: six.string_types. The Ceph user name to run the command under
+ :param profile_name: six.string_types
+ :param erasure_plugin_name: six.string_types
+ :param failure_domain: six.string_types. One of ['chassis', 'datacenter', 'host', 'osd', 'pdu', 'pod', 'rack', 'region',
+ 'room', 'root', 'row'])
+ :param data_chunks: int
+ :param coding_chunks: int
+ :param locality: int
+ :param durability_estimator: int
+ :return: None. Can raise CalledProcessError
+ """
+ # Ensure this failure_domain is allowed by Ceph
+ validator(failure_domain, six.string_types,
+ ['chassis', 'datacenter', 'host', 'osd', 'pdu', 'pod', 'rack', 'region', 'room', 'root', 'row'])
+
+ cmd = ['ceph', '--id', service, 'osd', 'erasure-code-profile', 'set', profile_name,
+ 'plugin=' + erasure_plugin_name, 'k=' + str(data_chunks), 'm=' + str(coding_chunks),
+ 'ruleset_failure_domain=' + failure_domain]
+ if locality is not None and durability_estimator is not None:
+ raise ValueError("create_erasure_profile should be called with k, m and one of l or c but not both.")
+
+ # Add plugin specific information
+ if locality is not None:
+ # For local erasure codes
+ cmd.append('l=' + str(locality))
+ if durability_estimator is not None:
+ # For Shec erasure codes
+ cmd.append('c=' + str(durability_estimator))
+
+ if erasure_profile_exists(service, profile_name):
+ cmd.append('--force')
+
+ try:
+ check_call(cmd)
+ except CalledProcessError:
+ raise
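+
+# Illustrative usage, assuming a cephx user named 'admin':
+#
+#     create_erasure_profile(service='admin', profile_name='my-profile',
+#                            data_chunks=4, coding_chunks=2)
+#
+# which runs: ceph osd erasure-code-profile set my-profile plugin=jerasure
+# k=4 m=2 ruleset_failure_domain=host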
+
+
+def rename_pool(service, old_name, new_name):
+ """
+ Rename a Ceph pool from old_name to new_name
+ :param service: six.string_types. The Ceph user name to run the command under
+ :param old_name: six.string_types
+ :param new_name: six.string_types
+ :return: None
+ """
+ validator(value=old_name, valid_type=six.string_types)
+ validator(value=new_name, valid_type=six.string_types)
+
+ cmd = ['ceph', '--id', service, 'osd', 'pool', 'rename', old_name, new_name]
+ check_call(cmd)
+
+
+def erasure_profile_exists(service, name):
+ """
+ Check to see if an Erasure code profile already exists.
+ :param service: six.string_types. The Ceph user name to run the command under
+ :param name: six.string_types
+    :return: bool. True if the profile exists, otherwise False
+ """
+ validator(value=name, valid_type=six.string_types)
+ try:
+ check_call(['ceph', '--id', service,
+ 'osd', 'erasure-code-profile', 'get',
+ name])
+ return True
+ except CalledProcessError:
+ return False
+
+
+def get_cache_mode(service, pool_name):
+ """
+ Find the current caching mode of the pool_name given.
+ :param service: six.string_types. The Ceph user name to run the command under
+ :param pool_name: six.string_types
+    :return: six.string_types or None. The pool's cache mode, if set
+ """
+ validator(value=service, valid_type=six.string_types)
+ validator(value=pool_name, valid_type=six.string_types)
+ out = check_output(['ceph', '--id', service, 'osd', 'dump', '--format=json'])
+    osd_json = json.loads(out)
+    for pool in osd_json['pools']:
+        if pool['pool_name'] == pool_name:
+            return pool['cache_mode']
+    return None
+
+
+def pool_exists(service, name):
+ """Check to see if a RADOS pool already exists."""
+ try:
+ out = check_output(['rados', '--id', service,
+ 'lspools']).decode('UTF-8')
+ except CalledProcessError:
+ return False
+
+ return name in out
+
+
+def get_osds(service):
+ """Return a list of all Ceph Object Storage Daemons currently in the
+ cluster.
+ """
+ version = ceph_version()
+ if version and version >= '0.56':
+ return json.loads(check_output(['ceph', '--id', service,
+ 'osd', 'ls',
+ '--format=json']).decode('UTF-8'))
+
+ return None
def install():
@@ -96,53 +647,37 @@ def create_rbd_image(service, pool, image, sizemb):
check_call(cmd)
-def pool_exists(service, name):
- """Check to see if a RADOS pool already exists."""
- try:
- out = check_output(['rados', '--id', service,
- 'lspools']).decode('UTF-8')
- except CalledProcessError:
- return False
+def update_pool(client, pool, settings):
+ cmd = ['ceph', '--id', client, 'osd', 'pool', 'set', pool]
+ for k, v in six.iteritems(settings):
+ cmd.append(k)
+ cmd.append(v)
- return name in out
+ check_call(cmd)
-def get_osds(service):
- """Return a list of all Ceph Object Storage Daemons currently in the
- cluster.
- """
- version = ceph_version()
- if version and version >= '0.56':
- return json.loads(check_output(['ceph', '--id', service,
- 'osd', 'ls',
- '--format=json']).decode('UTF-8'))
-
- return None
-
-
-def create_pool(service, name, replicas=3):
+def create_pool(service, name, replicas=3, pg_num=None):
"""Create a new RADOS pool."""
if pool_exists(service, name):
log("Ceph pool {} already exists, skipping creation".format(name),
level=WARNING)
return
- # Calculate the number of placement groups based
- # on upstream recommended best practices.
- osds = get_osds(service)
- if osds:
- pgnum = (len(osds) * 100 // replicas)
- else:
- # NOTE(james-page): Default to 200 for older ceph versions
- # which don't support OSD query from cli
- pgnum = 200
+ if not pg_num:
+ # Calculate the number of placement groups based
+ # on upstream recommended best practices.
+ osds = get_osds(service)
+ if osds:
+ pg_num = (len(osds) * 100 // replicas)
+ else:
+ # NOTE(james-page): Default to 200 for older ceph versions
+ # which don't support OSD query from cli
+ pg_num = 200
- cmd = ['ceph', '--id', service, 'osd', 'pool', 'create', name, str(pgnum)]
+ cmd = ['ceph', '--id', service, 'osd', 'pool', 'create', name, str(pg_num)]
check_call(cmd)
- cmd = ['ceph', '--id', service, 'osd', 'pool', 'set', name, 'size',
- str(replicas)]
- check_call(cmd)
+ update_pool(service, name, settings={'size': str(replicas)})
def delete_pool(service, name):
@@ -197,10 +732,10 @@ def create_key_file(service, key):
log('Created new keyfile at %s.' % keyfile, level=INFO)
-def get_ceph_nodes():
- """Query named relation 'ceph' to determine current nodes."""
+def get_ceph_nodes(relation='ceph'):
+ """Query named relation to determine current nodes."""
hosts = []
- for r_id in relation_ids('ceph'):
+ for r_id in relation_ids(relation):
for unit in related_units(r_id):
hosts.append(relation_get('private-address', unit=unit, rid=r_id))
@@ -288,17 +823,6 @@ def place_data_on_block_device(blk_device, data_src_dst):
os.chown(data_src_dst, uid, gid)
-# TODO: re-use
-def modprobe(module):
- """Load a kernel module and configure for auto-load on reboot."""
- log('Loading kernel module', level=INFO)
- cmd = ['modprobe', module]
- check_call(cmd)
- with open('/etc/modules', 'r+') as modules:
- if module not in modules.read():
- modules.write(module)
-
-
def copy_files(src, dst, symlinks=False, ignore=None):
"""Copy files from src to dst."""
for item in os.listdir(src):
@@ -363,14 +887,14 @@ def ensure_ceph_storage(service, pool, rbd_img, sizemb, mount_point,
service_start(svc)
-def ensure_ceph_keyring(service, user=None, group=None):
+def ensure_ceph_keyring(service, user=None, group=None, relation='ceph'):
"""Ensures a ceph keyring is created for a named service and optionally
ensures user and group ownership.
Returns False if no ceph key is available in relation state.
"""
key = None
- for rid in relation_ids('ceph'):
+ for rid in relation_ids(relation):
for unit in related_units(rid):
key = relation_get('key', rid=rid, unit=unit)
if key:
@@ -411,17 +935,60 @@ class CephBrokerRq(object):
The API is versioned and defaults to version 1.
"""
- def __init__(self, api_version=1):
+
+ def __init__(self, api_version=1, request_id=None):
self.api_version = api_version
+ if request_id:
+ self.request_id = request_id
+ else:
+ self.request_id = str(uuid.uuid1())
self.ops = []
- def add_op_create_pool(self, name, replica_count=3):
+ def add_op_create_pool(self, name, replica_count=3, pg_num=None):
+ """Adds an operation to create a pool.
+
+        @param pg_num: optional. If not provided, this value
+ will be calculated by the broker based on how many OSDs are in the
+ cluster at the time of creation. Note that, if provided, this value
+ will be capped at the current available maximum.
+ """
self.ops.append({'op': 'create-pool', 'name': name,
- 'replicas': replica_count})
+ 'replicas': replica_count, 'pg_num': pg_num})
+
+ def set_ops(self, ops):
+ """Set request ops to provided value.
+
+ Useful for injecting ops that come from a previous request
+ to allow comparisons to ensure validity.
+ """
+ self.ops = ops
@property
def request(self):
- return json.dumps({'api-version': self.api_version, 'ops': self.ops})
+ return json.dumps({'api-version': self.api_version, 'ops': self.ops,
+ 'request-id': self.request_id})
+
+ def _ops_equal(self, other):
+ if len(self.ops) == len(other.ops):
+ for req_no in range(0, len(self.ops)):
+ for key in ['replicas', 'name', 'op', 'pg_num']:
+ if self.ops[req_no].get(key) != other.ops[req_no].get(key):
+ return False
+ else:
+ return False
+ return True
+
+ def __eq__(self, other):
+ if not isinstance(other, self.__class__):
+ return False
+ if self.api_version == other.api_version and \
+ self._ops_equal(other):
+ return True
+ else:
+ return False
+
+ def __ne__(self, other):
+ return not self.__eq__(other)
class CephBrokerRsp(object):
@@ -431,10 +998,15 @@ class CephBrokerRsp(object):
The API is versioned and defaults to version 1.
"""
+
def __init__(self, encoded_rsp):
self.api_version = None
self.rsp = json.loads(encoded_rsp)
+ @property
+ def request_id(self):
+ return self.rsp.get('request-id')
+
@property
def exit_code(self):
return self.rsp.get('exit-code')
@@ -442,3 +1014,182 @@ class CephBrokerRsp(object):
@property
def exit_msg(self):
return self.rsp.get('stderr')
+
+
+# Ceph Broker Conversation:
+# If a charm needs an action to be taken by ceph it can create a CephBrokerRq
+# and send that request to ceph via the ceph relation. The CephBrokerRq has a
+# unique id so that the client can identify which CephBrokerRsp is associated
+# with the request. Ceph will also respond to each client unit individually
+# creating a response key per client unit eg glance/0 will get a CephBrokerRsp
+# via key broker-rsp-glance-0
+#
+# To use this the charm can just do something like:
+#
+# from charmhelpers.contrib.storage.linux.ceph import (
+# send_request_if_needed,
+# is_request_complete,
+# CephBrokerRq,
+# )
+#
+# @hooks.hook('ceph-relation-changed')
+# def ceph_changed():
+# rq = CephBrokerRq()
+# rq.add_op_create_pool(name='poolname', replica_count=3)
+#
+# if is_request_complete(rq):
+#         <Request complete actions>
+# else:
+# send_request_if_needed(get_ceph_request())
+#
+# CephBrokerRq and CephBrokerRsp are serialized into JSON. Below is an example
+# of glance having sent a request to ceph which ceph has successfully processed
+# 'ceph:8': {
+# 'ceph/0': {
+# 'auth': 'cephx',
+# 'broker-rsp-glance-0': '{"request-id": "0bc7dc54", "exit-code": 0}',
+# 'broker_rsp': '{"request-id": "0da543b8", "exit-code": 0}',
+# 'ceph-public-address': '10.5.44.103',
+# 'key': 'AQCLDttVuHXINhAAvI144CB09dYchhHyTUY9BQ==',
+# 'private-address': '10.5.44.103',
+# },
+# 'glance/0': {
+# 'broker_req': ('{"api-version": 1, "request-id": "0bc7dc54", '
+# '"ops": [{"replicas": 3, "name": "glance", '
+# '"op": "create-pool"}]}'),
+# 'private-address': '10.5.44.109',
+# },
+# }
+
+def get_previous_request(rid):
+ """Return the last ceph broker request sent on a given relation
+
+ @param rid: Relation id to query for request
+ """
+ request = None
+ broker_req = relation_get(attribute='broker_req', rid=rid,
+ unit=local_unit())
+ if broker_req:
+ request_data = json.loads(broker_req)
+ request = CephBrokerRq(api_version=request_data['api-version'],
+ request_id=request_data['request-id'])
+ request.set_ops(request_data['ops'])
+
+ return request
+
+
+def get_request_states(request, relation='ceph'):
+ """Return a dict of requests per relation id with their corresponding
+ completion state.
+
+ This allows a charm, which has a request for ceph, to see whether there is
+ an equivalent request already being processed and if so what state that
+ request is in.
+
+ @param request: A CephBrokerRq object
+ """
+ requests = {}
+ for rid in relation_ids(relation):
+ complete = False
+ previous_request = get_previous_request(rid)
+ if request == previous_request:
+ sent = True
+ complete = is_request_complete_for_rid(previous_request, rid)
+ else:
+ sent = False
+ complete = False
+
+ requests[rid] = {
+ 'sent': sent,
+ 'complete': complete,
+ }
+
+ return requests
+
+
+def is_request_sent(request, relation='ceph'):
+ """Check to see if a functionally equivalent request has already been sent
+
+    Returns True if a similar request has been sent
+
+ @param request: A CephBrokerRq object
+ """
+ states = get_request_states(request, relation=relation)
+ for rid in states.keys():
+ if not states[rid]['sent']:
+ return False
+
+ return True
+
+
+def is_request_complete(request, relation='ceph'):
+ """Check to see if a functionally equivalent request has already been
+ completed
+
+    Returns True if a similar request has been completed
+
+ @param request: A CephBrokerRq object
+ """
+ states = get_request_states(request, relation=relation)
+ for rid in states.keys():
+ if not states[rid]['complete']:
+ return False
+
+ return True
+
+
+def is_request_complete_for_rid(request, rid):
+ """Check if a given request has been completed on the given relation
+
+ @param request: A CephBrokerRq object
+ @param rid: Relation ID
+ """
+ broker_key = get_broker_rsp_key()
+ for unit in related_units(rid):
+ rdata = relation_get(rid=rid, unit=unit)
+ if rdata.get(broker_key):
+ rsp = CephBrokerRsp(rdata.get(broker_key))
+ if rsp.request_id == request.request_id:
+ if not rsp.exit_code:
+ return True
+ else:
+ # The remote unit sent no reply targeted at this unit so either the
+ # remote ceph cluster does not support unit targeted replies or it
+ # has not processed our request yet.
+ if rdata.get('broker_rsp'):
+ request_data = json.loads(rdata['broker_rsp'])
+ if request_data.get('request-id'):
+ log('Ignoring legacy broker_rsp without unit key as remote '
+ 'service supports unit specific replies', level=DEBUG)
+ else:
+ log('Using legacy broker_rsp as remote service does not '
+                    'support unit specific replies', level=DEBUG)
+ rsp = CephBrokerRsp(rdata['broker_rsp'])
+ if not rsp.exit_code:
+ return True
+
+ return False
+
+
+def get_broker_rsp_key():
+ """Return broker response key for this unit
+
+ This is the key that ceph is going to use to pass request status
+ information back to this unit
+ """
+ return 'broker-rsp-' + local_unit().replace('/', '-')
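+
+# Illustrative example: on unit 'glance/0' this returns 'broker-rsp-glance-0',
+# matching the per-unit response key in the conversation example above.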
+
+
+def send_request_if_needed(request, relation='ceph'):
+ """Send broker request if an equivalent request has not already been sent
+
+ @param request: A CephBrokerRq object
+ """
+ if is_request_sent(request, relation=relation):
+ log('Request already sent but not complete, not sending new request',
+ level=DEBUG)
+ else:
+ for rid in relation_ids(relation):
+ log('Sending request {}'.format(request.request_id), level=DEBUG)
+ relation_set(relation_id=rid, broker_req=request.request)
diff --git a/hooks/charmhelpers/contrib/storage/linux/loopback.py b/hooks/charmhelpers/contrib/storage/linux/loopback.py
index c296f09..3a3f514 100644
--- a/hooks/charmhelpers/contrib/storage/linux/loopback.py
+++ b/hooks/charmhelpers/contrib/storage/linux/loopback.py
@@ -76,3 +76,13 @@ def ensure_loopback_device(path, size):
check_call(cmd)
return create_loopback(path)
+
+
+def is_mapped_loopback_device(device):
+ """
+ Checks if a given device name is an existing/mapped loopback device.
+ :param device: str: Full path to the device (eg, /dev/loop1).
+    :returns: str: Path to the backing file if it is a loopback device;
+    empty string otherwise
+ """
+ return loopback_devices().get(device, "")
diff --git a/hooks/charmhelpers/contrib/storage/linux/utils.py b/hooks/charmhelpers/contrib/storage/linux/utils.py
index e2769e4..1e57941 100644
--- a/hooks/charmhelpers/contrib/storage/linux/utils.py
+++ b/hooks/charmhelpers/contrib/storage/linux/utils.py
@@ -43,9 +43,10 @@ def zap_disk(block_device):
:param block_device: str: Full path of block device to clean.
'''
+ # https://github.com/ceph/ceph/commit/fdd7f8d83afa25c4e09aaedd90ab93f3b64a677b
# sometimes sgdisk exits non-zero; this is OK, dd will clean up
- call(['sgdisk', '--zap-all', '--mbrtogpt',
- '--clear', block_device])
+ call(['sgdisk', '--zap-all', '--', block_device])
+ call(['sgdisk', '--clear', '--mbrtogpt', '--', block_device])
dev_end = check_output(['blockdev', '--getsz',
block_device]).decode('UTF-8')
gpt_end = int(dev_end.split()[0]) - 100
diff --git a/hooks/charmhelpers/core/hookenv.py b/hooks/charmhelpers/core/hookenv.py
index 18860f5..0132129 100644
--- a/hooks/charmhelpers/core/hookenv.py
+++ b/hooks/charmhelpers/core/hookenv.py
@@ -34,23 +34,6 @@ import errno
import tempfile
from subprocess import CalledProcessError
-try:
- from charmhelpers.cli import cmdline
-except ImportError as e:
- # due to the anti-pattern of partially synching charmhelpers directly
- # into charms, it's possible that charmhelpers.cli is not available;
- # if that's the case, they don't really care about using the cli anyway,
- # so mock it out
- if str(e) == 'No module named cli':
- class cmdline(object):
- @classmethod
- def subcommand(cls, *args, **kwargs):
- def _wrap(func):
- return func
- return _wrap
- else:
- raise
-
import six
if not six.PY3:
from UserDict import UserDict
@@ -91,6 +74,7 @@ def cached(func):
res = func(*args, **kwargs)
cache[key] = res
return res
+ wrapper._wrapped = func
return wrapper
@@ -190,7 +174,6 @@ def relation_type():
return os.environ.get('JUJU_RELATION', None)
-@cmdline.subcommand()
@cached
def relation_id(relation_name=None, service_or_unit=None):
"""The relation ID for the current or a specified relation"""
@@ -216,13 +199,11 @@ def remote_unit():
return os.environ.get('JUJU_REMOTE_UNIT', None)
-@cmdline.subcommand()
def service_name():
"""The name service group this unit belongs to"""
return local_unit().split('/')[0]
-@cmdline.subcommand()
@cached
def remote_service_name(relid=None):
"""The remote service name for a given relation-id (or the current relation)"""
@@ -509,6 +490,19 @@ def relation_types():
return rel_types
+@cached
+def peer_relation_id():
+ '''Get the peers relation id if a peers relation has been joined, else None.'''
+ md = metadata()
+ section = md.get('peers')
+ if section:
+ for key in section:
+ relids = relation_ids(key)
+ if relids:
+ return relids[0]
+ return None
+
+
@cached
def relation_to_interface(relation_name):
"""
@@ -523,12 +517,12 @@ def relation_to_interface(relation_name):
def relation_to_role_and_interface(relation_name):
"""
Given the name of a relation, return the role and the name of the interface
- that relation uses (where role is one of ``provides``, ``requires``, or ``peer``).
+ that relation uses (where role is one of ``provides``, ``requires``, or ``peers``).
:returns: A tuple containing ``(role, interface)``, or ``(None, None)``.
"""
_metadata = metadata()
- for role in ('provides', 'requires', 'peer'):
+ for role in ('provides', 'requires', 'peers'):
interface = _metadata.get(role, {}).get(relation_name, {}).get('interface')
if interface:
return role, interface
@@ -540,7 +534,7 @@ def role_and_interface_to_relations(role, interface_name):
"""
Given a role and interface name, return a list of relation names for the
current charm that use that interface under that role (where role is one
- of ``provides``, ``requires``, or ``peer``).
+ of ``provides``, ``requires``, or ``peers``).
:returns: A list of relation names.
"""
@@ -561,7 +555,7 @@ def interface_to_relations(interface_name):
:returns: A list of relation names.
"""
results = []
- for role in ('provides', 'requires', 'peer'):
+ for role in ('provides', 'requires', 'peers'):
results.extend(role_and_interface_to_relations(role, interface_name))
return results
@@ -642,6 +636,38 @@ def unit_private_ip():
return unit_get('private-address')
+@cached
+def storage_get(attribute=None, storage_id=None):
+ """Get storage attributes"""
+ _args = ['storage-get', '--format=json']
+ if storage_id:
+ _args.extend(('-s', storage_id))
+ if attribute:
+ _args.append(attribute)
+ try:
+ return json.loads(subprocess.check_output(_args).decode('UTF-8'))
+ except ValueError:
+ return None
+
+
+@cached
+def storage_list(storage_name=None):
+ """List the storage IDs for the unit"""
+ _args = ['storage-list', '--format=json']
+ if storage_name:
+ _args.append(storage_name)
+ try:
+ return json.loads(subprocess.check_output(_args).decode('UTF-8'))
+ except ValueError:
+ return None
+ except OSError as e:
+ import errno
+ if e.errno == errno.ENOENT:
+ # storage-list does not exist
+ return []
+ raise
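+
+# Illustrative usage (assumes a 'data' storage entry in metadata.yaml):
+#
+#     for sid in storage_list('data'):
+#         path = storage_get('location', sid)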
+
+
class UnregisteredHookError(Exception):
"""Raised when an undefined hook is called"""
pass
@@ -786,25 +812,28 @@ def status_set(workload_state, message):
def status_get():
- """Retrieve the previously set juju workload state
+ """Retrieve the previously set juju workload state and message
+
+ If the status-get command is not found then assume this is juju < 1.23 and
+ return 'unknown', ""
- If the status-set command is not found then assume this is juju < 1.23 and
- return 'unknown'
"""
- cmd = ['status-get']
+ cmd = ['status-get', "--format=json", "--include-data"]
try:
- raw_status = subprocess.check_output(cmd, universal_newlines=True)
- status = raw_status.rstrip()
- return status
+ raw_status = subprocess.check_output(cmd)
except OSError as e:
if e.errno == errno.ENOENT:
- return 'unknown'
+ return ('unknown', "")
else:
raise
+ else:
+ status = json.loads(raw_status.decode("UTF-8"))
+ return (status["status"], status["message"])
def translate_exc(from_exc, to_exc):
def inner_translate_exc1(f):
+ @wraps(f)
def inner_translate_exc2(*args, **kwargs):
try:
return f(*args, **kwargs)
@@ -849,6 +878,58 @@ def leader_set(settings=None, **kwargs):
subprocess.check_call(cmd)
+@translate_exc(from_exc=OSError, to_exc=NotImplementedError)
+def payload_register(ptype, klass, pid):
+ """ is used while a hook is running to let Juju know that a
+ payload has been started."""
+ cmd = ['payload-register']
+ for x in [ptype, klass, pid]:
+ cmd.append(x)
+ subprocess.check_call(cmd)
+
+
+@translate_exc(from_exc=OSError, to_exc=NotImplementedError)
+def payload_unregister(klass, pid):
+ """ is used while a hook is running to let Juju know
+ that a payload has been manually stopped. The and provided
+ must match a payload that has been previously registered with juju using
+ payload-register."""
+ cmd = ['payload-unregister']
+ for x in [klass, pid]:
+ cmd.append(x)
+ subprocess.check_call(cmd)
+
+
+@translate_exc(from_exc=OSError, to_exc=NotImplementedError)
+def payload_status_set(klass, pid, status):
+ """is used to update the current status of a registered payload.
+ The and provided must match a payload that has been previously
+ registered with juju using payload-register. The must be one of the
+ follow: starting, started, stopping, stopped"""
+ cmd = ['payload-status-set']
+ for x in [klass, pid, status]:
+ cmd.append(x)
+ subprocess.check_call(cmd)
+
+
+@translate_exc(from_exc=OSError, to_exc=NotImplementedError)
+def resource_get(name):
+ """used to fetch the resource path of the given name.
+
+ must match a name of defined resource in metadata.yaml
+
+ returns either a path or False if resource not available
+ """
+ if not name:
+ return False
+
+ cmd = ['resource-get', name]
+ try:
+ return subprocess.check_output(cmd).decode('UTF-8')
+ except subprocess.CalledProcessError:
+ return False
+
+
@cached
def juju_version():
"""Full version string (eg. '1.23.3.1-trusty-amd64')"""
@@ -913,3 +994,16 @@ def _run_atexit():
for callback, args, kwargs in reversed(_atexit):
callback(*args, **kwargs)
del _atexit[:]
+
+
+@translate_exc(from_exc=OSError, to_exc=NotImplementedError)
+def network_get_primary_address(binding):
+ '''
+ Retrieve the primary network address for a named binding
+
+    :param binding: string. The name of a relation or extra-binding
+ :return: string. The primary IP address for the named binding
+ :raise: NotImplementedError if run on Juju < 2.0
+ '''
+ cmd = ['network-get', '--primary-address', binding]
+ return subprocess.check_output(cmd).strip()
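+
+# Illustrative usage (Juju 2.0+; 'public' is an assumed binding name):
+#
+#     addr = network_get_primary_address('public')  # bytes on Python 3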
diff --git a/hooks/charmhelpers/core/host.py b/hooks/charmhelpers/core/host.py
index 8ae8ef8..481087b 100644
--- a/hooks/charmhelpers/core/host.py
+++ b/hooks/charmhelpers/core/host.py
@@ -30,6 +30,8 @@ import random
import string
import subprocess
import hashlib
+import functools
+import itertools
from contextlib import contextmanager
from collections import OrderedDict
@@ -63,55 +65,86 @@ def service_reload(service_name, restart_on_failure=False):
return service_result
-def service_pause(service_name, init_dir=None):
+def service_pause(service_name, init_dir="/etc/init", initd_dir="/etc/init.d"):
"""Pause a system service.
Stop it, and prevent it from starting again at boot."""
- if init_dir is None:
- init_dir = "/etc/init"
- stopped = service_stop(service_name)
- # XXX: Support systemd too
- override_path = os.path.join(
- init_dir, '{}.conf.override'.format(service_name))
- with open(override_path, 'w') as fh:
- fh.write("manual\n")
+ stopped = True
+ if service_running(service_name):
+ stopped = service_stop(service_name)
+ upstart_file = os.path.join(init_dir, "{}.conf".format(service_name))
+ sysv_file = os.path.join(initd_dir, service_name)
+ if init_is_systemd():
+ service('disable', service_name)
+ elif os.path.exists(upstart_file):
+ override_path = os.path.join(
+ init_dir, '{}.override'.format(service_name))
+ with open(override_path, 'w') as fh:
+ fh.write("manual\n")
+ elif os.path.exists(sysv_file):
+ subprocess.check_call(["update-rc.d", service_name, "disable"])
+ else:
+ raise ValueError(
+ "Unable to detect {0} as SystemD, Upstart {1} or"
+ " SysV {2}".format(
+ service_name, upstart_file, sysv_file))
return stopped
-def service_resume(service_name, init_dir=None):
+def service_resume(service_name, init_dir="/etc/init",
+ initd_dir="/etc/init.d"):
"""Resume a system service.
Reenable starting again at boot. Start the service"""
- # XXX: Support systemd too
- if init_dir is None:
- init_dir = "/etc/init"
- override_path = os.path.join(
- init_dir, '{}.conf.override'.format(service_name))
- if os.path.exists(override_path):
- os.unlink(override_path)
- started = service_start(service_name)
+ upstart_file = os.path.join(init_dir, "{}.conf".format(service_name))
+ sysv_file = os.path.join(initd_dir, service_name)
+ if init_is_systemd():
+ service('enable', service_name)
+ elif os.path.exists(upstart_file):
+ override_path = os.path.join(
+ init_dir, '{}.override'.format(service_name))
+ if os.path.exists(override_path):
+ os.unlink(override_path)
+ elif os.path.exists(sysv_file):
+ subprocess.check_call(["update-rc.d", service_name, "enable"])
+ else:
+ raise ValueError(
+ "Unable to detect {0} as SystemD, Upstart {1} or"
+ " SysV {2}".format(
+ service_name, upstart_file, sysv_file))
+
+ started = service_running(service_name)
+ if not started:
+ started = service_start(service_name)
return started
def service(action, service_name):
"""Control a system service"""
- cmd = ['service', service_name, action]
+ if init_is_systemd():
+ cmd = ['systemctl', action, service_name]
+ else:
+ cmd = ['service', service_name, action]
return subprocess.call(cmd) == 0
-def service_running(service):
+def service_running(service_name):
"""Determine whether a system service is running"""
- try:
- output = subprocess.check_output(
- ['service', service, 'status'],
- stderr=subprocess.STDOUT).decode('UTF-8')
- except subprocess.CalledProcessError:
- return False
+ if init_is_systemd():
+ return service('is-active', service_name)
else:
- if ("start/running" in output or "is running" in output):
- return True
- else:
+ try:
+ output = subprocess.check_output(
+ ['service', service_name, 'status'],
+ stderr=subprocess.STDOUT).decode('UTF-8')
+ except subprocess.CalledProcessError:
return False
+ else:
+ if ("start/running" in output or "is running" in output or
+ "up and running" in output):
+ return True
+ else:
+ return False
def service_available(service_name):
@@ -126,8 +159,29 @@ def service_available(service_name):
return True
-def adduser(username, password=None, shell='/bin/bash', system_user=False):
- """Add a user to the system"""
+SYSTEMD_SYSTEM = '/run/systemd/system'
+
+
+def init_is_systemd():
+ """Return True if the host system uses systemd, False otherwise."""
+ return os.path.isdir(SYSTEMD_SYSTEM)
+
+
+def adduser(username, password=None, shell='/bin/bash', system_user=False,
+ primary_group=None, secondary_groups=None):
+ """Add a user to the system.
+
+ Will log but otherwise succeed if the user already exists.
+
+ :param str username: Username to create
+ :param str password: Password for user; if ``None``, create a system user
+ :param str shell: The default shell for the user
+ :param bool system_user: Whether to create a login or system user
+ :param str primary_group: Primary group for user; defaults to username
+ :param list secondary_groups: Optional list of additional groups
+
+ :returns: The password database entry struct, as returned by `pwd.getpwnam`
+ """
try:
user_info = pwd.getpwnam(username)
log('user {0} already exists!'.format(username))
@@ -142,12 +196,32 @@ def adduser(username, password=None, shell='/bin/bash', system_user=False):
'--shell', shell,
'--password', password,
])
+ if not primary_group:
+ try:
+ grp.getgrnam(username)
+ primary_group = username # avoid "group exists" error
+ except KeyError:
+ pass
+ if primary_group:
+ cmd.extend(['-g', primary_group])
+ if secondary_groups:
+ cmd.extend(['-G', ','.join(secondary_groups)])
cmd.append(username)
subprocess.check_call(cmd)
user_info = pwd.getpwnam(username)
return user_info
+def user_exists(username):
+ """Check if a user exists"""
+ try:
+ pwd.getpwnam(username)
+ user_exists = True
+ except KeyError:
+ user_exists = False
+ return user_exists
+
+
def add_group(group_name, system_group=False):
"""Add a group to the system"""
try:
@@ -229,14 +303,12 @@ def write_file(path, content, owner='root', group='root', perms=0o444):
def fstab_remove(mp):
- """Remove the given mountpoint entry from /etc/fstab
- """
+ """Remove the given mountpoint entry from /etc/fstab"""
return Fstab.remove_by_mountpoint(mp)
def fstab_add(dev, mp, fs, options=None):
- """Adds the given device entry to the /etc/fstab file
- """
+ """Adds the given device entry to the /etc/fstab file"""
return Fstab.add(dev, mp, fs, options=options)
@@ -280,9 +352,19 @@ def mounts():
return system_mounts
+def fstab_mount(mountpoint):
+ """Mount filesystem using fstab"""
+ cmd_args = ['mount', mountpoint]
+ try:
+ subprocess.check_output(cmd_args)
+ except subprocess.CalledProcessError as e:
+        log('Error mounting {}\n{}'.format(mountpoint, e.output))
+ return False
+ return True
+
+
def file_hash(path, hash_type='md5'):
- """
- Generate a hash checksum of the contents of 'path' or None if not found.
+ """Generate a hash checksum of the contents of 'path' or None if not found.
:param str hash_type: Any hash alrgorithm supported by :mod:`hashlib`,
such as md5, sha1, sha256, sha512, etc.
@@ -297,10 +379,9 @@ def file_hash(path, hash_type='md5'):
def path_hash(path):
- """
- Generate a hash checksum of all files matching 'path'. Standard wildcards
- like '*' and '?' are supported, see documentation for the 'glob' module for
- more information.
+ """Generate a hash checksum of all files matching 'path'. Standard
+ wildcards like '*' and '?' are supported, see documentation for the 'glob'
+ module for more information.
:return: dict: A { filename: hash } dictionary for all matched files.
Empty if none found.
@@ -312,8 +393,7 @@ def path_hash(path):
def check_hash(path, checksum, hash_type='md5'):
- """
- Validate a file using a cryptographic checksum.
+ """Validate a file using a cryptographic checksum.
:param str checksum: Value of the checksum used to validate the file.
:param str hash_type: Hash algorithm used to generate `checksum`.
@@ -328,6 +408,7 @@ def check_hash(path, checksum, hash_type='md5'):
class ChecksumError(ValueError):
+ """A class derived from Value error to indicate the checksum failed."""
pass
@@ -349,27 +430,47 @@ def restart_on_change(restart_map, stopstart=False):
restarted if any file matching the pattern got changed, created
or removed. Standard wildcards are supported, see documentation
for the 'glob' module for more information.
+
+ @param restart_map: {path_file_name: [service_name, ...]
+ @param stopstart: DEFAULT false; whether to stop, start OR restart
+ @returns result from decorated function
"""
def wrap(f):
+ @functools.wraps(f)
def wrapped_f(*args, **kwargs):
- checksums = {path: path_hash(path) for path in restart_map}
- f(*args, **kwargs)
- restarts = []
- for path in restart_map:
- if path_hash(path) != checksums[path]:
- restarts += restart_map[path]
- services_list = list(OrderedDict.fromkeys(restarts))
- if not stopstart:
- for service_name in services_list:
- service('restart', service_name)
- else:
- for action in ['stop', 'start']:
- for service_name in services_list:
- service(action, service_name)
+ return restart_on_change_helper(
+ (lambda: f(*args, **kwargs)), restart_map, stopstart)
return wrapped_f
return wrap
+def restart_on_change_helper(lambda_f, restart_map, stopstart=False):
+ """Helper function to perform the restart_on_change function.
+
+ This is provided for decorators to restart services if files described
+ in the restart_map have changed after an invocation of lambda_f().
+
+ @param lambda_f: function to call.
+ @param restart_map: {file: [service, ...]}
+ @param stopstart: whether to stop, start or restart a service
+ @returns result of lambda_f()
+ """
+ checksums = {path: path_hash(path) for path in restart_map}
+ r = lambda_f()
+ # create a list of lists of the services to restart
+ restarts = [restart_map[path]
+ for path in restart_map
+ if path_hash(path) != checksums[path]]
+ # create a flat list of ordered services without duplicates from lists
+ services_list = list(OrderedDict.fromkeys(itertools.chain(*restarts)))
+ if services_list:
+ actions = ('stop', 'start') if stopstart else ('restart',)
+ for action in actions:
+ for service_name in services_list:
+ service(action, service_name)
+ return r
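+
+# Illustrative decorator usage (the path and service name are assumptions):
+#
+#     @restart_on_change({'/etc/ceph/ceph.conf': ['ceph-osd']})
+#     def write_config():
+#         ...  # render ceph.conf; ceph-osd restarts only if its hash changed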
+
+
def lsb_release():
"""Return /etc/lsb-release in a dict"""
d = {}
@@ -396,36 +497,92 @@ def pwgen(length=None):
return(''.join(random_chars))
-def list_nics(nic_type):
- '''Return a list of nics of given type(s)'''
+def is_phy_iface(interface):
+ """Returns True if interface is not virtual, otherwise False."""
+ if interface:
+ sys_net = '/sys/class/net'
+ if os.path.isdir(sys_net):
+ for iface in glob.glob(os.path.join(sys_net, '*')):
+ if '/virtual/' in os.path.realpath(iface):
+ continue
+
+ if interface == os.path.basename(iface):
+ return True
+
+ return False
+
+
+def get_bond_master(interface):
+ """Returns bond master if interface is bond slave otherwise None.
+
+ NOTE: the provided interface is expected to be physical
+ """
+ if interface:
+ iface_path = '/sys/class/net/%s' % (interface)
+ if os.path.exists(iface_path):
+ if '/virtual/' in os.path.realpath(iface_path):
+ return None
+
+ master = os.path.join(iface_path, 'master')
+ if os.path.exists(master):
+ master = os.path.realpath(master)
+ # make sure it is a bond master
+ if os.path.exists(os.path.join(master, 'bonding')):
+ return os.path.basename(master)
+
+ return None
+
+
+def list_nics(nic_type=None):
+ """Return a list of nics of given type(s)"""
if isinstance(nic_type, six.string_types):
int_types = [nic_type]
else:
int_types = nic_type
+
interfaces = []
- for int_type in int_types:
- cmd = ['ip', 'addr', 'show', 'label', int_type + '*']
+ if nic_type:
+ for int_type in int_types:
+ cmd = ['ip', 'addr', 'show', 'label', int_type + '*']
+ ip_output = subprocess.check_output(cmd).decode('UTF-8')
+ ip_output = ip_output.split('\n')
+ ip_output = (line for line in ip_output if line)
+ for line in ip_output:
+ if line.split()[1].startswith(int_type):
+ matched = re.search('.*: (' + int_type +
+ r'[0-9]+\.[0-9]+)@.*', line)
+ if matched:
+ iface = matched.groups()[0]
+ else:
+ iface = line.split()[1].replace(":", "")
+
+ if iface not in interfaces:
+ interfaces.append(iface)
+ else:
+ cmd = ['ip', 'a']
ip_output = subprocess.check_output(cmd).decode('UTF-8').split('\n')
- ip_output = (line for line in ip_output if line)
+ ip_output = (line.strip() for line in ip_output if line)
+
+        key = re.compile(r'^[0-9]+:\s+(.+):')
for line in ip_output:
- if line.split()[1].startswith(int_type):
- matched = re.search('.*: (' + int_type + r'[0-9]+\.[0-9]+)@.*', line)
- if matched:
- interface = matched.groups()[0]
- else:
- interface = line.split()[1].replace(":", "")
- interfaces.append(interface)
+ matched = re.search(key, line)
+ if matched:
+ iface = matched.group(1)
+ iface = iface.partition("@")[0]
+ if iface not in interfaces:
+ interfaces.append(iface)
return interfaces
def set_nic_mtu(nic, mtu):
- '''Set MTU on a network interface'''
+ """Set the Maximum Transmission Unit (MTU) on a network interface."""
cmd = ['ip', 'link', 'set', nic, 'mtu', mtu]
subprocess.check_call(cmd)
def get_nic_mtu(nic):
+ """Return the Maximum Transmission Unit (MTU) for a network interface."""
cmd = ['ip', 'addr', 'show', nic]
ip_output = subprocess.check_output(cmd).decode('UTF-8').split('\n')
mtu = ""
@@ -437,6 +594,7 @@ def get_nic_mtu(nic):
def get_nic_hwaddr(nic):
+ """Return the Media Access Control (MAC) for a network interface."""
cmd = ['ip', '-o', '-0', 'addr', 'show', nic]
ip_output = subprocess.check_output(cmd).decode('UTF-8')
hwaddr = ""
@@ -447,7 +605,7 @@ def get_nic_hwaddr(nic):
def cmp_pkgrevno(package, revno, pkgcache=None):
- '''Compare supplied revno with the revno of the installed package
+ """Compare supplied revno with the revno of the installed package
* 1 => Installed revno is greater than supplied arg
* 0 => Installed revno is the same as supplied arg
@@ -456,7 +614,7 @@ def cmp_pkgrevno(package, revno, pkgcache=None):
This function imports apt_cache function from charmhelpers.fetch if
the pkgcache argument is None. Be sure to add charmhelpers.fetch if
you call this function, or pass an apt_pkg.Cache() instance.
- '''
+ """
import apt_pkg
if not pkgcache:
from charmhelpers.fetch import apt_cache
@@ -466,15 +624,30 @@ def cmp_pkgrevno(package, revno, pkgcache=None):
@contextmanager
-def chdir(d):
+def chdir(directory):
+ """Change the current working directory to a different directory for a code
+ block and return the previous directory after the block exits. Useful to
+    run commands from a specified directory.
+
+ :param str directory: The directory path to change to for this context.
+ """
cur = os.getcwd()
try:
- yield os.chdir(d)
+ yield os.chdir(directory)
finally:
os.chdir(cur)
-def chownr(path, owner, group, follow_links=True):
+def chownr(path, owner, group, follow_links=True, chowntopdir=False):
+ """Recursively change user and group ownership of files and directories
+ in given path. Doesn't chown path itself by default, only its children.
+
+ :param str path: The string path to start changing ownership.
+ :param str owner: The owner string to use when looking up the uid.
+ :param str group: The group string to use when looking up the gid.
+    :param bool follow_links: Also chown links if True
+ :param bool chowntopdir: Also chown path itself if True
+ """
uid = pwd.getpwnam(owner).pw_uid
gid = grp.getgrnam(group).gr_gid
if follow_links:
@@ -482,6 +655,10 @@ def chownr(path, owner, group, follow_links=True):
else:
chown = os.lchown
+ if chowntopdir:
+ broken_symlink = os.path.lexists(path) and not os.path.exists(path)
+ if not broken_symlink:
+ chown(path, uid, gid)
for root, dirs, files in os.walk(path):
for name in dirs + files:
full = os.path.join(root, name)
@@ -491,4 +668,28 @@ def chownr(path, owner, group, follow_links=True):
def lchownr(path, owner, group):
+ """Recursively change user and group ownership of files and directories
+ in a given path, not following symbolic links. See the documentation for
+ 'os.lchown' for more information.
+
+ :param str path: The string path to start changing ownership.
+ :param str owner: The owner string to use when looking up the uid.
+ :param str group: The group string to use when looking up the gid.
+ """
chownr(path, owner, group, follow_links=False)
+
+
+def get_total_ram():
+ """The total amount of system RAM in bytes.
+
+ This is what is reported by the OS, and may be overcommitted when
+ there are multiple containers hosted on the same machine.
+ """
+ with open('/proc/meminfo', 'r') as f:
+ for line in f.readlines():
+ if line:
+ key, value, unit = line.split()
+ if key == 'MemTotal:':
+ assert unit == 'kB', 'Unknown unit'
+                return int(value) * 1024  # 'kB' in /proc/meminfo means KiB
+ raise NotImplementedError()
diff --git a/hooks/charmhelpers/core/hugepage.py b/hooks/charmhelpers/core/hugepage.py
new file mode 100644
index 0000000..a783ad9
--- /dev/null
+++ b/hooks/charmhelpers/core/hugepage.py
@@ -0,0 +1,71 @@
+# -*- coding: utf-8 -*-
+
+# Copyright 2014-2015 Canonical Limited.
+#
+# This file is part of charm-helpers.
+#
+# charm-helpers is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License version 3 as
+# published by the Free Software Foundation.
+#
+# charm-helpers is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
+
+import yaml
+from charmhelpers.core import fstab
+from charmhelpers.core import sysctl
+from charmhelpers.core.host import (
+ add_group,
+ add_user_to_group,
+ fstab_mount,
+ mkdir,
+)
+from charmhelpers.core.strutils import bytes_from_string
+from subprocess import check_output
+
+
+def hugepage_support(user, group='hugetlb', nr_hugepages=256,
+ max_map_count=65536, mnt_point='/run/hugepages/kvm',
+ pagesize='2MB', mount=True, set_shmmax=False):
+ """Enable hugepages on system.
+
+ Args:
+ user (str) -- Username to allow access to hugepages to
+ group (str) -- Group name to own hugepages
+ nr_hugepages (int) -- Number of pages to reserve
+ max_map_count (int) -- Number of Virtual Memory Areas a process can own
+ mnt_point (str) -- Directory to mount hugepages on
+ pagesize (str) -- Size of hugepages
+        mount (bool) -- Whether to mount hugepages
+        set_shmmax (bool) -- Whether to raise kernel.shmmax to cover the pages
+ """
+ group_info = add_group(group)
+ gid = group_info.gr_gid
+ add_user_to_group(user, group)
+ if max_map_count < 2 * nr_hugepages:
+ max_map_count = 2 * nr_hugepages
+ sysctl_settings = {
+ 'vm.nr_hugepages': nr_hugepages,
+ 'vm.max_map_count': max_map_count,
+ 'vm.hugetlb_shm_group': gid,
+ }
+ if set_shmmax:
+ shmmax_current = int(check_output(['sysctl', '-n', 'kernel.shmmax']))
+ shmmax_minsize = bytes_from_string(pagesize) * nr_hugepages
+ if shmmax_minsize > shmmax_current:
+ sysctl_settings['kernel.shmmax'] = shmmax_minsize
+ sysctl.create(yaml.dump(sysctl_settings), '/etc/sysctl.d/10-hugepage.conf')
+ mkdir(mnt_point, owner='root', group='root', perms=0o755, force=False)
+ lfstab = fstab.Fstab()
+ fstab_entry = lfstab.get_entry_by_attr('mountpoint', mnt_point)
+ if fstab_entry:
+ lfstab.remove_entry(fstab_entry)
+ entry = lfstab.Entry('nodev', mnt_point, 'hugetlbfs',
+ 'mode=1770,gid={},pagesize={}'.format(gid, pagesize), 0, 0)
+ lfstab.add_entry(entry)
+ if mount:
+ fstab_mount(mnt_point)
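+
+
+# Illustrative usage (values are assumptions, not recommendations):
+#
+#     hugepage_support('libvirt-qemu', nr_hugepages=512, mount=True)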
diff --git a/hooks/charmhelpers/core/kernel.py b/hooks/charmhelpers/core/kernel.py
new file mode 100644
index 0000000..5dc6495
--- /dev/null
+++ b/hooks/charmhelpers/core/kernel.py
@@ -0,0 +1,68 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+# Copyright 2014-2015 Canonical Limited.
+#
+# This file is part of charm-helpers.
+#
+# charm-helpers is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License version 3 as
+# published by the Free Software Foundation.
+#
+# charm-helpers is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
+
+__author__ = "Jorge Niedbalski "
+
+from charmhelpers.core.hookenv import (
+ log,
+ INFO
+)
+
+from subprocess import check_call, check_output
+import re
+
+
+def modprobe(module, persist=True):
+ """Load a kernel module and configure for auto-load on reboot."""
+ cmd = ['modprobe', module]
+
+ log('Loading kernel module %s' % module, level=INFO)
+
+ check_call(cmd)
+ if persist:
+ with open('/etc/modules', 'r+') as modules:
+ if module not in modules.read():
+ modules.write(module)
+
+
+def rmmod(module, force=False):
+ """Remove a module from the linux kernel"""
+ cmd = ['rmmod']
+ if force:
+ cmd.append('-f')
+ cmd.append(module)
+ log('Removing kernel module %s' % module, level=INFO)
+ return check_call(cmd)
+
+
+def lsmod():
+ """Shows what kernel modules are currently loaded"""
+ return check_output(['lsmod'],
+ universal_newlines=True)
+
+
+def is_module_loaded(module):
+ """Checks if a kernel module is already loaded"""
+ matches = re.findall('^%s[ ]+' % module, lsmod(), re.M)
+ return len(matches) > 0
+
+
+def update_initramfs(version='all'):
+ """Updates an initramfs image"""
+ return check_call(["update-initramfs", "-k", version, "-u"])
diff --git a/hooks/charmhelpers/core/services/helpers.py b/hooks/charmhelpers/core/services/helpers.py
index 8005c41..2423704 100644
--- a/hooks/charmhelpers/core/services/helpers.py
+++ b/hooks/charmhelpers/core/services/helpers.py
@@ -16,7 +16,9 @@
import os
import yaml
+
from charmhelpers.core import hookenv
+from charmhelpers.core import host
from charmhelpers.core import templating
from charmhelpers.core.services.base import ManagerCallback
@@ -240,27 +242,50 @@ class TemplateCallback(ManagerCallback):
:param str source: The template source file, relative to
`$CHARM_DIR/templates`
- :param str target: The target to write the rendered template to
+
+ :param str target: The target to write the rendered template to (or None)
:param str owner: The owner of the rendered file
:param str group: The group of the rendered file
:param int perms: The permissions of the rendered file
+ :param partial on_change_action: functools.partial to execute when the
+ rendered file changes
+ :param jinja2.BaseLoader template_loader: A jinja2 template loader to use
+ instead of the default FileSystemLoader
+ :return str: The rendered template
"""
def __init__(self, source, target,
- owner='root', group='root', perms=0o444):
+ owner='root', group='root', perms=0o444,
+ on_change_action=None, template_loader=None):
self.source = source
self.target = target
self.owner = owner
self.group = group
self.perms = perms
+ self.on_change_action = on_change_action
+ self.template_loader = template_loader
def __call__(self, manager, service_name, event_name):
+ pre_checksum = ''
+ if self.on_change_action and os.path.isfile(self.target):
+ pre_checksum = host.file_hash(self.target)
service = manager.get_service(service_name)
- context = {}
+ context = {'ctx': {}}
for ctx in service.get('required_data', []):
context.update(ctx)
- templating.render(self.source, self.target, context,
- self.owner, self.group, self.perms)
+ context['ctx'].update(ctx)
+
+ result = templating.render(self.source, self.target, context,
+ self.owner, self.group, self.perms,
+ template_loader=self.template_loader)
+ if self.on_change_action:
+ if pre_checksum == host.file_hash(self.target):
+ hookenv.log(
+ 'No change detected: {}'.format(self.target),
+ hookenv.DEBUG)
+ else:
+ self.on_change_action()
+
+ return result
# Convenience aliases for templates
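
The new on_change_action lets a services-framework definition restart a daemon only when rendering actually changed the file on disk. A hypothetical sketch (service and template names are illustrative):

```python
from functools import partial

from charmhelpers.core import host
from charmhelpers.core.services.helpers import TemplateCallback

callback = TemplateCallback(
    source='haproxy.cfg',                  # under $CHARM_DIR/templates
    target='/etc/haproxy/haproxy.cfg',
    perms=0o640,
    # Invoked only when the target's file hash changes after rendering.
    on_change_action=partial(host.service_restart, 'haproxy'),
)
```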
diff --git a/hooks/charmhelpers/core/strutils.py b/hooks/charmhelpers/core/strutils.py
index a2a784a..7e3f969 100644
--- a/hooks/charmhelpers/core/strutils.py
+++ b/hooks/charmhelpers/core/strutils.py
@@ -18,6 +18,7 @@
# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
import six
+import re
def bool_from_string(value):
@@ -40,3 +41,32 @@ def bool_from_string(value):
msg = "Unable to interpret string value '%s' as boolean" % (value)
raise ValueError(msg)
+
+
+def bytes_from_string(value):
+ """Interpret human readable string value as bytes.
+
+ Returns int
+ """
+ BYTE_POWER = {
+ 'K': 1,
+ 'KB': 1,
+ 'M': 2,
+ 'MB': 2,
+ 'G': 3,
+ 'GB': 3,
+ 'T': 4,
+ 'TB': 4,
+ 'P': 5,
+ 'PB': 5,
+ }
+ if isinstance(value, six.string_types):
+ value = six.text_type(value)
+ else:
+ msg = "Unable to interpret non-string value '%s' as boolean" % (value)
+ raise ValueError(msg)
+ matches = re.match("([0-9]+)([a-zA-Z]+)", value)
+ if not matches:
+ msg = "Unable to interpret string value '%s' as bytes" % (value)
+ raise ValueError(msg)
+ return int(matches.group(1)) * (1024 ** BYTE_POWER[matches.group(2)])
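
For reference, the expected behaviour of the new helper:

```python
from charmhelpers.core.strutils import bytes_from_string

bytes_from_string('2MB')  # 2097152 (2 * 1024**2)
bytes_from_string('1G')   # 1073741824
bytes_from_string('512')  # ValueError: no recognised unit suffix
bytes_from_string(512)    # ValueError: non-string input
```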
diff --git a/hooks/charmhelpers/core/templating.py b/hooks/charmhelpers/core/templating.py
index 4531999..d2d8eaf 100644
--- a/hooks/charmhelpers/core/templating.py
+++ b/hooks/charmhelpers/core/templating.py
@@ -21,13 +21,14 @@ from charmhelpers.core import hookenv
def render(source, target, context, owner='root', group='root',
- perms=0o444, templates_dir=None, encoding='UTF-8'):
+ perms=0o444, templates_dir=None, encoding='UTF-8', template_loader=None):
"""
Render a template.
The `source` path, if not absolute, is relative to the `templates_dir`.
- The `target` path should be absolute.
+ The `target` path should be absolute. It can also be `None`, in which
+ case no file will be written.
The context should be a dict containing the values to be replaced in the
template.
@@ -36,6 +37,9 @@ def render(source, target, context, owner='root', group='root',
If omitted, `templates_dir` defaults to the `templates` folder in the charm.
+ The rendered template will be written to the file as well as being returned
+ as a string.
+
Note: Using this requires python-jinja2; if it is not installed, calling
this will attempt to use charmhelpers.fetch.apt_install to install it.
"""
@@ -52,17 +56,26 @@ def render(source, target, context, owner='root', group='root',
apt_install('python-jinja2', fatal=True)
from jinja2 import FileSystemLoader, Environment, exceptions
- if templates_dir is None:
- templates_dir = os.path.join(hookenv.charm_dir(), 'templates')
- loader = Environment(loader=FileSystemLoader(templates_dir))
+ if template_loader:
+ template_env = Environment(loader=template_loader)
+ else:
+ if templates_dir is None:
+ templates_dir = os.path.join(hookenv.charm_dir(), 'templates')
+ template_env = Environment(loader=FileSystemLoader(templates_dir))
try:
source = source
- template = loader.get_template(source)
+ template = template_env.get_template(source)
except exceptions.TemplateNotFound as e:
hookenv.log('Could not load template %s from %s.' %
(source, templates_dir),
level=hookenv.ERROR)
raise e
content = template.render(context)
- host.mkdir(os.path.dirname(target), owner, group, perms=0o755)
- host.write_file(target, content.encode(encoding), owner, group, perms)
+ if target is not None:
+ target_dir = os.path.dirname(target)
+ if not os.path.exists(target_dir):
+ # This is a terrible default directory permission, as the file
+ # or its siblings will often contain secrets.
+ host.mkdir(os.path.dirname(target), owner, group, perms=0o755)
+ host.write_file(target, content.encode(encoding), owner, group, perms)
+ return content
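
The two render() changes combine to allow purely in-memory rendering; a minimal sketch using jinja2's DictLoader:

```python
from jinja2 import DictLoader
from charmhelpers.core.templating import render

# target=None skips writing to disk; a custom loader avoids any
# dependency on $CHARM_DIR/templates.
loader = DictLoader({'motd.tmpl': 'Welcome to {{ host }}'})
content = render('motd.tmpl', None, {'host': 'node1'},
                 template_loader=loader)
assert content == 'Welcome to node1'
```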
diff --git a/hooks/charmhelpers/fetch/__init__.py b/hooks/charmhelpers/fetch/__init__.py
index 0a3bb96..db0d86a 100644
--- a/hooks/charmhelpers/fetch/__init__.py
+++ b/hooks/charmhelpers/fetch/__init__.py
@@ -90,6 +90,22 @@ CLOUD_ARCHIVE_POCKETS = {
'kilo/proposed': 'trusty-proposed/kilo',
'trusty-kilo/proposed': 'trusty-proposed/kilo',
'trusty-proposed/kilo': 'trusty-proposed/kilo',
+ # Liberty
+ 'liberty': 'trusty-updates/liberty',
+ 'trusty-liberty': 'trusty-updates/liberty',
+ 'trusty-liberty/updates': 'trusty-updates/liberty',
+ 'trusty-updates/liberty': 'trusty-updates/liberty',
+ 'liberty/proposed': 'trusty-proposed/liberty',
+ 'trusty-liberty/proposed': 'trusty-proposed/liberty',
+ 'trusty-proposed/liberty': 'trusty-proposed/liberty',
+ # Mitaka
+ 'mitaka': 'trusty-updates/mitaka',
+ 'trusty-mitaka': 'trusty-updates/mitaka',
+ 'trusty-mitaka/updates': 'trusty-updates/mitaka',
+ 'trusty-updates/mitaka': 'trusty-updates/mitaka',
+ 'mitaka/proposed': 'trusty-proposed/mitaka',
+ 'trusty-mitaka/proposed': 'trusty-proposed/mitaka',
+ 'trusty-proposed/mitaka': 'trusty-proposed/mitaka',
}
# The order of this list is very important. Handlers should be listed in from
@@ -217,12 +233,12 @@ def apt_purge(packages, fatal=False):
def apt_mark(packages, mark, fatal=False):
"""Flag one or more packages using apt-mark"""
+ log("Marking {} as {}".format(packages, mark))
cmd = ['apt-mark', mark]
if isinstance(packages, six.string_types):
cmd.append(packages)
else:
cmd.extend(packages)
- log("Holding {}".format(packages))
if fatal:
subprocess.check_call(cmd, universal_newlines=True)
@@ -403,7 +419,7 @@ def plugins(fetch_handlers=None):
importlib.import_module(package),
classname)
plugin_list.append(handler_class())
- except (ImportError, AttributeError):
+ except NotImplementedError:
# Skip missing plugins so that they can be omitted from
# installation if desired
log("FetchHandler {} not found, skipping plugin".format(
diff --git a/hooks/charmhelpers/fetch/archiveurl.py b/hooks/charmhelpers/fetch/archiveurl.py
index efd7f9f..b8e0943 100644
--- a/hooks/charmhelpers/fetch/archiveurl.py
+++ b/hooks/charmhelpers/fetch/archiveurl.py
@@ -108,7 +108,7 @@ class ArchiveUrlFetchHandler(BaseFetchHandler):
install_opener(opener)
response = urlopen(source)
try:
- with open(dest, 'w') as dest_file:
+ with open(dest, 'wb') as dest_file:
dest_file.write(response.read())
except Exception as e:
if os.path.isfile(dest):
diff --git a/hooks/charmhelpers/fetch/bzrurl.py b/hooks/charmhelpers/fetch/bzrurl.py
index 3531315..cafd27f 100644
--- a/hooks/charmhelpers/fetch/bzrurl.py
+++ b/hooks/charmhelpers/fetch/bzrurl.py
@@ -15,60 +15,50 @@
# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
import os
+from subprocess import check_call
from charmhelpers.fetch import (
BaseFetchHandler,
- UnhandledSource
+ UnhandledSource,
+ filter_installed_packages,
+ apt_install,
)
from charmhelpers.core.host import mkdir
-import six
-if six.PY3:
- raise ImportError('bzrlib does not support Python3')
-try:
- from bzrlib.branch import Branch
- from bzrlib import bzrdir, workingtree, errors
-except ImportError:
- from charmhelpers.fetch import apt_install
- apt_install("python-bzrlib")
- from bzrlib.branch import Branch
- from bzrlib import bzrdir, workingtree, errors
+if filter_installed_packages(['bzr']) != []:
+ apt_install(['bzr'])
+ if filter_installed_packages(['bzr']) != []:
+ raise NotImplementedError('Unable to install bzr')
class BzrUrlFetchHandler(BaseFetchHandler):
"""Handler for bazaar branches via generic and lp URLs"""
def can_handle(self, source):
url_parts = self.parse_url(source)
- if url_parts.scheme not in ('bzr+ssh', 'lp'):
+ if url_parts.scheme not in ('bzr+ssh', 'lp', ''):
return False
+ elif not url_parts.scheme:
+ return os.path.exists(os.path.join(source, '.bzr'))
else:
return True
def branch(self, source, dest):
- url_parts = self.parse_url(source)
- # If we use lp:branchname scheme we need to load plugins
if not self.can_handle(source):
raise UnhandledSource("Cannot handle {}".format(source))
- if url_parts.scheme == "lp":
- from bzrlib.plugin import load_plugins
- load_plugins()
- try:
- local_branch = bzrdir.BzrDir.create_branch_convenience(dest)
- except errors.AlreadyControlDirError:
- local_branch = Branch.open(dest)
- try:
- remote_branch = Branch.open(source)
- remote_branch.push(local_branch)
- tree = workingtree.WorkingTree.open(dest)
- tree.update()
- except Exception as e:
- raise e
+ if os.path.exists(dest):
+ check_call(['bzr', 'pull', '--overwrite', '-d', dest, source])
+ else:
+ check_call(['bzr', 'branch', source, dest])
- def install(self, source):
+ def install(self, source, dest=None):
url_parts = self.parse_url(source)
branch_name = url_parts.path.strip("/").split("/")[-1]
- dest_dir = os.path.join(os.environ.get('CHARM_DIR'), "fetched",
- branch_name)
+ if dest:
+ dest_dir = os.path.join(dest, branch_name)
+ else:
+ dest_dir = os.path.join(os.environ.get('CHARM_DIR'), "fetched",
+ branch_name)
+
if not os.path.exists(dest_dir):
mkdir(dest_dir, perms=0o755)
try:
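
With bzrlib gone, the handler shells out to the bzr CLI, which also works under Python 3. A hypothetical invocation:

```python
from charmhelpers.fetch.bzrurl import BzrUrlFetchHandler

handler = BzrUrlFetchHandler()
if handler.can_handle('lp:charm-helpers'):
    # First run does `bzr branch`; re-runs do `bzr pull --overwrite`.
    handler.install('lp:charm-helpers', dest='/tmp/branches')
```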
diff --git a/hooks/charmhelpers/fetch/giturl.py b/hooks/charmhelpers/fetch/giturl.py
index f023b26..65ed531 100644
--- a/hooks/charmhelpers/fetch/giturl.py
+++ b/hooks/charmhelpers/fetch/giturl.py
@@ -15,24 +15,18 @@
# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
import os
+from subprocess import check_call, CalledProcessError
from charmhelpers.fetch import (
BaseFetchHandler,
- UnhandledSource
+ UnhandledSource,
+ filter_installed_packages,
+ apt_install,
)
-from charmhelpers.core.host import mkdir
-import six
-if six.PY3:
- raise ImportError('GitPython does not support Python 3')
-
-try:
- from git import Repo
-except ImportError:
- from charmhelpers.fetch import apt_install
- apt_install("python-git")
- from git import Repo
-
-from git.exc import GitCommandError # noqa E402
+if filter_installed_packages(['git']) != []:
+ apt_install(['git'])
+ if filter_installed_packages(['git']) != []:
+ raise NotImplementedError('Unable to install git')
class GitUrlFetchHandler(BaseFetchHandler):
@@ -40,19 +34,24 @@ class GitUrlFetchHandler(BaseFetchHandler):
def can_handle(self, source):
url_parts = self.parse_url(source)
# TODO (mattyw) no support for ssh git@ yet
- if url_parts.scheme not in ('http', 'https', 'git'):
+ if url_parts.scheme not in ('http', 'https', 'git', ''):
return False
+ elif not url_parts.scheme:
+ return os.path.exists(os.path.join(source, '.git'))
else:
return True
- def clone(self, source, dest, branch, depth=None):
+ def clone(self, source, dest, branch="master", depth=None):
if not self.can_handle(source):
raise UnhandledSource("Cannot handle {}".format(source))
- if depth:
- Repo.clone_from(source, dest, branch=branch, depth=depth)
+ if os.path.exists(dest):
+ cmd = ['git', '-C', dest, 'pull', source, branch]
else:
- Repo.clone_from(source, dest, branch=branch)
+ cmd = ['git', 'clone', source, dest, '--branch', branch]
+ if depth:
+ cmd.extend(['--depth', depth])
+ check_call(cmd)
def install(self, source, branch="master", dest=None, depth=None):
url_parts = self.parse_url(source)
@@ -62,11 +61,9 @@ class GitUrlFetchHandler(BaseFetchHandler):
else:
dest_dir = os.path.join(os.environ.get('CHARM_DIR'), "fetched",
branch_name)
- if not os.path.exists(dest_dir):
- mkdir(dest_dir, perms=0o755)
try:
self.clone(source, dest_dir, branch, depth)
- except GitCommandError as e:
+ except CalledProcessError as e:
raise UnhandledSource(e)
except OSError as e:
raise UnhandledSource(e.strerror)
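
Likewise for git: GitPython is replaced by the git CLI. A hypothetical invocation (note depth is passed straight onto the command line, so it should be a string):

```python
from charmhelpers.fetch.giturl import GitUrlFetchHandler

handler = GitUrlFetchHandler()
# Clones on the first run, `git pull`s on re-runs.
handler.install('https://github.com/juju/charm-helpers',
                branch='master', dest='/srv', depth='1')
```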
diff --git a/hooks/hooks.py b/hooks/hooks.py
index 7884e88..cf7f9a1 100755
--- a/hooks/hooks.py
+++ b/hooks/hooks.py
@@ -55,6 +55,7 @@ from utils import (
disable_lsb_services,
disable_upstart_services,
get_ipv6_addr,
+ set_unit_status,
)
from charmhelpers.contrib.charmsupport import nrpe
@@ -406,22 +407,9 @@ def update_nrpe_config():
nrpe_setup.write()
-def assess_status():
- '''Assess status of current unit'''
- node_count = int(config('cluster_count'))
- # not enough peers
- for relid in relation_ids('hanode'):
- if len(related_units(relid)) + 1 < node_count:
- status_set('blocked', 'Insufficient peer units for ha cluster '
- '(require {})'.format(node_count))
- return
-
- status_set('active', 'Unit is ready and clustered')
-
-
if __name__ == '__main__':
try:
hooks.execute(sys.argv)
except UnregisteredHookError as e:
log('Unknown hook {} - skipping.'.format(e), level=DEBUG)
- assess_status()
+ set_unit_status()
diff --git a/hooks/utils.py b/hooks/utils.py
index 032f8c5..d4c3753 100644
--- a/hooks/utils.py
+++ b/hooks/utils.py
@@ -7,6 +7,7 @@ import subprocess
import socket
import fcntl
import struct
+import xml.etree.ElementTree as ET
from base64 import b64decode
@@ -23,7 +24,12 @@ from charmhelpers.core.hookenv import (
unit_get,
status_set,
)
-from charmhelpers.contrib.openstack.utils import get_host_ip
+from charmhelpers.contrib.openstack.utils import (
+ get_host_ip,
+ set_unit_paused,
+ clear_unit_paused,
+ is_unit_paused_set,
+)
from charmhelpers.core.host import (
service_start,
service_stop,
@@ -168,7 +174,7 @@ def get_corosync_id(unit_name):
def nulls(data):
"""Returns keys of values that are null (but not bool)"""
return [k for k in data.iterkeys()
- if not bool == type(data[k]) and not data[k]]
+ if not isinstance(data[k], bool) and not data[k]]
def get_corosync_conf():
@@ -503,5 +509,129 @@ def restart_corosync():
if service_running("pacemaker"):
service_stop("pacemaker")
- service_restart("corosync")
- service_start("pacemaker")
+ if not is_unit_paused_set():
+ service_restart("corosync")
+ service_start("pacemaker")
+
+
+def is_in_standby_mode(node_name):
+ """Check if node is in standby mode in pacemaker
+
+ @param node_name: The name of the node to check
+ @returns boolean - True if node_name is in standby mode
+ """
+ out = subprocess.check_output(['crm', 'node', 'status', node_name])
+ root = ET.fromstring(out)
+
+ standby_mode = False
+ for nvpair in root.iter('nvpair'):
+ if (nvpair.attrib.get('name') == 'standby' and
+ nvpair.attrib.get('value') == 'on'):
+ standby_mode = True
+ return standby_mode
+
+
+def get_hostname():
+ """Return the hostname of this unit
+
+ @returns hostname
+ """
+ return subprocess.check_output(['uname', '-n']).rstrip()
+
+
+def enter_standby_mode(node_name, duration='forever'):
+ """Put this node into standby mode in pacemaker
+
+ @returns None
+ """
+ subprocess.check_call(['crm', 'node', 'standby', node_name, duration])
+
+
+def leave_standby_mode(node_name):
+ """Take this node out of standby mode in pacemaker
+
+ @returns None
+ """
+ subprocess.check_call(['crm', 'node', 'online', node_name])
+
+
+def node_has_resources(node_name):
+ """Check if this node is running resources
+
+ @param node_name: The name of the node to check
+ @returns boolean - True if node_name has resources
+ """
+ out = subprocess.check_output(['crm_mon', '-X'])
+ root = ET.fromstring(out)
+ has_resources = False
+ for resource in root.iter('resource'):
+ for child in resource:
+ if child.tag == 'node' and child.attrib.get('name') == node_name:
+ has_resources = True
+ return has_resources
+
+
+def set_unit_status():
+ """Set the workload status for this unit
+
+ @returns None
+ """
+ status, message = assess_status_helper()
+ status_set(status, message)
+
+
+def resume_unit():
+ """Resume services on this unit and update the units status
+
+ @returns None
+ """
+ node_name = get_hostname()
+ messages = []
+ leave_standby_mode(node_name)
+ if is_in_standby_mode(node_name):
+ messages.append("Node still in standby mode")
+ if messages:
+ raise Exception("Couldn't resume: {}".format("; ".join(messages)))
+ else:
+ clear_unit_paused()
+ set_unit_status()
+
+
+def pause_unit():
+ """Pause services on this unit and update the units status
+
+ @returns None
+ """
+ node_name = get_hostname()
+ messages = []
+ enter_standby_mode(node_name)
+ if not is_in_standby_mode(node_name):
+ messages.append("Node not in standby mode")
+ if node_has_resources(node_name):
+ messages.append("Resources still running on unit")
+ status, message = assess_status_helper()
+ if status != 'active':
+ messages.append(message)
+ if messages:
+ raise Exception("Couldn't pause: {}".format("; ".join(messages)))
+ else:
+ set_unit_paused()
+ status_set("maintenance",
+ "Paused. Use 'resume' action to resume normal service.")
+
+
+def assess_status_helper():
+ """Assess status of unit
+
+ @returns status, message - status is the workload status and message is
+ any corresponding message
+ """
+ node_count = int(config('cluster_count'))
+ status = 'active'
+ message = 'Unit is ready and clustered'
+ for relid in relation_ids('hanode'):
+ if len(related_units(relid)) + 1 < node_count:
+ status = 'blocked'
+ message = ("Insufficient peer units for ha cluster "
+ "(require {})".format(node_count))
+ return status, message
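
For reference, is_in_standby_mode() keys off the standby nvpair in the XML emitted by `crm node status`; a sketch against hypothetical output:

```python
import xml.etree.ElementTree as ET

# Hypothetical `crm node status juju-machine-0` output:
sample = """
<node id="1000" uname="juju-machine-0" type="normal">
  <instance_attributes id="nodes-1000">
    <nvpair id="nodes-1000-standby" name="standby" value="on"/>
  </instance_attributes>
</node>
"""

root = ET.fromstring(sample)
standby = any(nv.attrib.get('name') == 'standby' and
              nv.attrib.get('value') == 'on'
              for nv in root.iter('nvpair'))
assert standby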
diff --git a/tests/basic_deployment.py b/tests/basic_deployment.py
index 2c5ed99..1998a8e 100644
--- a/tests/basic_deployment.py
+++ b/tests/basic_deployment.py
@@ -1,6 +1,9 @@
#!/usr/bin/env python
import os
+import subprocess
+import json
import amulet
+import time
import keystoneclient.v2_0 as keystone_client
@@ -132,3 +135,46 @@ class HAClusterBasicDeployment(OpenStackAmuletDeployment):
user=self.demo_user,
password='password',
tenant=self.demo_tenant)
+
+ def _run_action(self, unit_id, action, *args):
+ command = ["juju", "action", "do", "--format=json", unit_id, action]
+ command.extend(args)
+ print("Running command: %s\n" % " ".join(command))
+ output = subprocess.check_output(command)
+ output_json = output.decode(encoding="UTF-8")
+ data = json.loads(output_json)
+ action_id = data[u'Action queued with id']
+ return action_id
+
+ def _wait_on_action(self, action_id):
+ command = ["juju", "action", "fetch", "--format=json", action_id]
+ while True:
+ try:
+ output = subprocess.check_output(command)
+ except Exception as e:
+ print(e)
+ return False
+ output_json = output.decode(encoding="UTF-8")
+ data = json.loads(output_json)
+ if data[u"status"] == "completed":
+ return True
+ elif data[u"status"] == "failed":
+ return False
+ time.sleep(2)
+
+ def test_910_pause_and_resume(self):
+ """The services can be paused and resumed. """
+ u.log.debug('Checking pause and resume actions...')
+ unit_name = "hacluster/0"
+ unit = self.d.sentry.unit[unit_name]
+
+ assert u.status_get(unit)[0] == "active"
+
+ action_id = self._run_action(unit_name, "pause")
+ assert self._wait_on_action(action_id), "Pause action failed."
+ assert u.status_get(unit)[0] == "maintenance"
+
+ action_id = self._run_action(unit_name, "resume")
+ assert self._wait_on_action(action_id), "Resume action failed."
+ assert u.status_get(unit)[0] == "active"
+ u.log.debug('OK')
diff --git a/tests/charmhelpers/contrib/amulet/utils.py b/tests/charmhelpers/contrib/amulet/utils.py
index 2591a9b..3e15903 100644
--- a/tests/charmhelpers/contrib/amulet/utils.py
+++ b/tests/charmhelpers/contrib/amulet/utils.py
@@ -782,15 +782,20 @@ class AmuletUtils(object):
# amulet juju action helpers:
def run_action(self, unit_sentry, action,
- _check_output=subprocess.check_output):
+ _check_output=subprocess.check_output,
+ params=None):
"""Run the named action on a given unit sentry.
+ params is a dict of parameters to pass to the action.
_check_output parameter is used for dependency injection.
@return action_id.
"""
unit_id = unit_sentry.info["unit_name"]
command = ["juju", "action", "do", "--format=json", unit_id, action]
+ if params is not None:
+ for key, value in params.iteritems():
+ command.append("{}={}".format(key, value))
self.log.info("Running command: %s\n" % " ".join(command))
output = _check_output(command, universal_newlines=True)
data = json.loads(output)
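
so tests can now pass action parameters as key=value pairs, e.g. (`u`, `sentry_unit`, and the action itself are hypothetical):

```python
action_id = u.run_action(sentry_unit, 'backup',
                         params={'target': '/mnt/backups'})
```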
diff --git a/tests/charmhelpers/contrib/openstack/amulet/deployment.py b/tests/charmhelpers/contrib/openstack/amulet/deployment.py
index 58b1a79..d2ede32 100644
--- a/tests/charmhelpers/contrib/openstack/amulet/deployment.py
+++ b/tests/charmhelpers/contrib/openstack/amulet/deployment.py
@@ -121,11 +121,12 @@ class OpenStackAmuletDeployment(AmuletDeployment):
# Charms which should use the source config option
use_source = ['mysql', 'mongodb', 'rabbitmq-server', 'ceph',
- 'ceph-osd', 'ceph-radosgw']
+ 'ceph-osd', 'ceph-radosgw', 'ceph-mon']
# Charms which can not use openstack-origin, ie. many subordinates
no_origin = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe',
- 'openvswitch-odl', 'neutron-api-odl', 'odl-controller']
+ 'openvswitch-odl', 'neutron-api-odl', 'odl-controller',
+ 'cinder-backup']
if self.openstack:
for svc in services:
diff --git a/tests/charmhelpers/contrib/openstack/amulet/utils.py b/tests/charmhelpers/contrib/openstack/amulet/utils.py
index 388b60e..ef3bdcc 100644
--- a/tests/charmhelpers/contrib/openstack/amulet/utils.py
+++ b/tests/charmhelpers/contrib/openstack/amulet/utils.py
@@ -27,7 +27,11 @@ import cinderclient.v1.client as cinder_client
import glanceclient.v1.client as glance_client
import heatclient.v1.client as heat_client
import keystoneclient.v2_0 as keystone_client
-import novaclient.v1_1.client as nova_client
+from keystoneclient.auth.identity import v3 as keystone_id_v3
+from keystoneclient import session as keystone_session
+from keystoneclient.v3 import client as keystone_client_v3
+
+import novaclient.client as nova_client
import pika
import swiftclient
@@ -38,6 +42,8 @@ from charmhelpers.contrib.amulet.utils import (
DEBUG = logging.DEBUG
ERROR = logging.ERROR
+NOVA_CLIENT_VERSION = "2"
+
class OpenStackAmuletUtils(AmuletUtils):
"""OpenStack amulet utilities.
@@ -139,7 +145,7 @@ class OpenStackAmuletUtils(AmuletUtils):
return "role {} does not exist".format(e['name'])
return ret
- def validate_user_data(self, expected, actual):
+ def validate_user_data(self, expected, actual, api_version=None):
"""Validate user data.
Validate a list of actual user data vs a list of expected user
@@ -150,10 +156,15 @@ class OpenStackAmuletUtils(AmuletUtils):
for e in expected:
found = False
for act in actual:
- a = {'enabled': act.enabled, 'name': act.name,
- 'email': act.email, 'tenantId': act.tenantId,
- 'id': act.id}
- if e['name'] == a['name']:
+ if e['name'] == act.name:
+ a = {'enabled': act.enabled, 'name': act.name,
+ 'email': act.email, 'id': act.id}
+ if api_version == 3:
+ a['default_project_id'] = getattr(act,
+ 'default_project_id',
+ 'none')
+ else:
+ a['tenantId'] = act.tenantId
found = True
ret = self._validate_dict_data(e, a)
if ret:
@@ -188,15 +199,30 @@ class OpenStackAmuletUtils(AmuletUtils):
return cinder_client.Client(username, password, tenant, ept)
def authenticate_keystone_admin(self, keystone_sentry, user, password,
- tenant):
+ tenant=None, api_version=None,
+ keystone_ip=None):
"""Authenticates admin user with the keystone admin endpoint."""
self.log.debug('Authenticating keystone admin...')
unit = keystone_sentry
- service_ip = unit.relation('shared-db',
- 'mysql:shared-db')['private-address']
- ep = "http://{}:35357/v2.0".format(service_ip.strip().decode('utf-8'))
- return keystone_client.Client(username=user, password=password,
- tenant_name=tenant, auth_url=ep)
+ if not keystone_ip:
+ keystone_ip = unit.relation('shared-db',
+ 'mysql:shared-db')['private-address']
+ base_ep = "http://{}:35357".format(keystone_ip.strip().decode('utf-8'))
+ if not api_version or api_version == 2:
+ ep = base_ep + "/v2.0"
+ return keystone_client.Client(username=user, password=password,
+ tenant_name=tenant, auth_url=ep)
+ else:
+ ep = base_ep + "/v3"
+ auth = keystone_id_v3.Password(
+ user_domain_name='admin_domain',
+ username=user,
+ password=password,
+ domain_name='admin_domain',
+ auth_url=ep,
+ )
+ sess = keystone_session.Session(auth=auth)
+ return keystone_client_v3.Client(session=sess)
def authenticate_keystone_user(self, keystone, user, password, tenant):
"""Authenticates a regular user with the keystone public endpoint."""
@@ -225,7 +251,8 @@ class OpenStackAmuletUtils(AmuletUtils):
self.log.debug('Authenticating nova user ({})...'.format(user))
ep = keystone.service_catalog.url_for(service_type='identity',
endpoint_type='publicURL')
- return nova_client.Client(username=user, api_key=password,
+ return nova_client.Client(NOVA_CLIENT_VERSION,
+ username=user, api_key=password,
project_id=tenant, auth_url=ep)
def authenticate_swift_user(self, keystone, user, password, tenant):
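
A sketch of authenticating against the keystone v3 API with the extended helper (`u` is an OpenStackAmuletUtils instance; sentry, credentials, and IP are illustrative):

```python
keystone_v3 = u.authenticate_keystone_admin(
    keystone_sentry, user='admin', password='openstack',
    api_version=3, keystone_ip='10.5.0.10')
print(keystone_v3.users.list())
```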