[corey.bryant,r=trivial] Sync charm-helpers.

This commit is contained in:
Corey Bryant 2016-01-04 16:28:44 -05:00
parent 00a624d5e8
commit c690403469
24 changed files with 1002 additions and 214 deletions

View File

@ -20,7 +20,7 @@ import sys
from six.moves import zip
from charmhelpers.core import unitdata
import charmhelpers.core.unitdata
class OutputFormatter(object):
@ -163,8 +163,8 @@ class CommandLine(object):
if getattr(arguments.func, '_cli_no_output', False):
output = ''
self.formatter.format_output(output, arguments.format)
if unitdata._KV:
unitdata._KV.flush()
if charmhelpers.core.unitdata._KV:
charmhelpers.core.unitdata._KV.flush()
cmdline = CommandLine()

View File

@ -148,6 +148,13 @@ define service {{
self.description = description
self.check_cmd = self._locate_cmd(check_cmd)
def _get_check_filename(self):
return os.path.join(NRPE.nrpe_confdir, '{}.cfg'.format(self.command))
def _get_service_filename(self, hostname):
return os.path.join(NRPE.nagios_exportdir,
'service__{}_{}.cfg'.format(hostname, self.command))
def _locate_cmd(self, check_cmd):
search_path = (
'/usr/lib/nagios/plugins',
@ -163,9 +170,21 @@ define service {{
log('Check command not found: {}'.format(parts[0]))
return ''
def _remove_service_files(self):
if not os.path.exists(NRPE.nagios_exportdir):
return
for f in os.listdir(NRPE.nagios_exportdir):
if f.endswith('_{}.cfg'.format(self.command)):
os.remove(os.path.join(NRPE.nagios_exportdir, f))
def remove(self, hostname):
nrpe_check_file = self._get_check_filename()
if os.path.exists(nrpe_check_file):
os.remove(nrpe_check_file)
self._remove_service_files()
def write(self, nagios_context, hostname, nagios_servicegroups):
nrpe_check_file = '/etc/nagios/nrpe.d/{}.cfg'.format(
self.command)
nrpe_check_file = self._get_check_filename()
with open(nrpe_check_file, 'w') as nrpe_check_config:
nrpe_check_config.write("# check {}\n".format(self.shortname))
nrpe_check_config.write("command[{}]={}\n".format(
@ -180,9 +199,7 @@ define service {{
def write_service_config(self, nagios_context, hostname,
nagios_servicegroups):
for f in os.listdir(NRPE.nagios_exportdir):
if re.search('.*{}.cfg'.format(self.command), f):
os.remove(os.path.join(NRPE.nagios_exportdir, f))
self._remove_service_files()
templ_vars = {
'nagios_hostname': hostname,
@ -192,8 +209,7 @@ define service {{
'command': self.command,
}
nrpe_service_text = Check.service_template.format(**templ_vars)
nrpe_service_file = '{}/service__{}_{}.cfg'.format(
NRPE.nagios_exportdir, hostname, self.command)
nrpe_service_file = self._get_service_filename(hostname)
with open(nrpe_service_file, 'w') as nrpe_service_config:
nrpe_service_config.write(str(nrpe_service_text))
@ -217,6 +233,10 @@ class NRPE(object):
self.unit_name = local_unit().replace('/', '-')
if hostname:
self.hostname = hostname
else:
nagios_hostname = get_nagios_hostname()
if nagios_hostname:
self.hostname = nagios_hostname
else:
self.hostname = "{}-{}".format(self.nagios_context, self.unit_name)
self.checks = []
@ -224,6 +244,22 @@ class NRPE(object):
def add_check(self, *args, **kwargs):
self.checks.append(Check(*args, **kwargs))
def remove_check(self, *args, **kwargs):
if kwargs.get('shortname') is None:
raise ValueError('shortname of check must be specified')
# Use sensible defaults if they're not specified - these are not
# actually used during removal, but they're required for constructing
# the Check object; check_disk is chosen because it's part of the
# nagios-plugins-basic package.
if kwargs.get('check_cmd') is None:
kwargs['check_cmd'] = 'check_disk'
if kwargs.get('description') is None:
kwargs['description'] = ''
check = Check(*args, **kwargs)
check.remove(self.hostname)
def write(self):
try:
nagios_uid = pwd.getpwnam('nagios').pw_uid
@ -260,7 +296,7 @@ def get_nagios_hostcontext(relation_name='nrpe-external-master'):
:param str relation_name: Name of relation nrpe sub joined to
"""
for rel in relations_of_type(relation_name):
if 'nagios_hostname' in rel:
if 'nagios_host_context' in rel:
return rel['nagios_host_context']
@ -301,6 +337,8 @@ def add_init_service_checks(nrpe, services, unit_name):
upstart_init = '/etc/init/%s.conf' % svc
sysv_init = '/etc/init.d/%s' % svc
if os.path.exists(upstart_init):
# Don't add a check for these services from neutron-gateway
if svc not in ['ext-port', 'os-charm-phy-nic-mtu']:
nrpe.add_check(
shortname=svc,
description='process check {%s}' % unit_name,

View File

@ -53,7 +53,7 @@ def _validate_cidr(network):
def no_ip_found_error_out(network):
errmsg = ("No IP address found in network: %s" % network)
errmsg = ("No IP address found in network(s): %s" % network)
raise ValueError(errmsg)
@ -61,7 +61,7 @@ def get_address_in_network(network, fallback=None, fatal=False):
"""Get an IPv4 or IPv6 address within the network from the host.
:param network (str): CIDR presentation format. For example,
'192.168.1.0/24'.
'192.168.1.0/24'. Supports multiple networks as a space-delimited list.
:param fallback (str): If no address is found, return fallback.
:param fatal (boolean): If no address is found, fallback is not
set and fatal is True then exit(1).
@ -75,6 +75,8 @@ def get_address_in_network(network, fallback=None, fatal=False):
else:
return None
networks = network.split() or [network]
for network in networks:
_validate_cidr(network)
network = netaddr.IPNetwork(network)
for iface in netifaces.interfaces():

View File

@ -14,13 +14,18 @@
# You should have received a copy of the GNU Lesser General Public License
# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
import logging
import re
import sys
import six
from collections import OrderedDict
from charmhelpers.contrib.amulet.deployment import (
AmuletDeployment
)
DEBUG = logging.DEBUG
ERROR = logging.ERROR
class OpenStackAmuletDeployment(AmuletDeployment):
"""OpenStack amulet deployment.
@ -29,9 +34,12 @@ class OpenStackAmuletDeployment(AmuletDeployment):
that is specifically for use by OpenStack charms.
"""
def __init__(self, series=None, openstack=None, source=None, stable=True):
def __init__(self, series=None, openstack=None, source=None,
stable=True, log_level=DEBUG):
"""Initialize the deployment environment."""
super(OpenStackAmuletDeployment, self).__init__(series)
self.log = self.get_logger(level=log_level)
self.log.info('OpenStackAmuletDeployment: init')
self.openstack = openstack
self.source = source
self.stable = stable
@ -39,6 +47,22 @@ class OpenStackAmuletDeployment(AmuletDeployment):
# out.
self.current_next = "trusty"
def get_logger(self, name="deployment-logger", level=logging.DEBUG):
"""Get a logger object that will log to stdout."""
log = logging
logger = log.getLogger(name)
fmt = log.Formatter("%(asctime)s %(funcName)s "
"%(levelname)s: %(message)s")
handler = log.StreamHandler(stream=sys.stdout)
handler.setLevel(level)
handler.setFormatter(fmt)
logger.addHandler(handler)
logger.setLevel(level)
return logger
def _determine_branch_locations(self, other_services):
"""Determine the branch locations for the other services.
@ -46,6 +70,8 @@ class OpenStackAmuletDeployment(AmuletDeployment):
stable or next (dev) branch, and based on this, use the corresonding
stable or next branches for the other_services."""
self.log.info('OpenStackAmuletDeployment: determine branch locations')
# Charms outside the lp:~openstack-charmers namespace
base_charms = ['mysql', 'mongodb', 'nrpe']
@ -83,6 +109,8 @@ class OpenStackAmuletDeployment(AmuletDeployment):
def _add_services(self, this_service, other_services):
"""Add services to the deployment and set openstack-origin/source."""
self.log.info('OpenStackAmuletDeployment: adding services')
other_services = self._determine_branch_locations(other_services)
super(OpenStackAmuletDeployment, self)._add_services(this_service,
@ -96,7 +124,8 @@ class OpenStackAmuletDeployment(AmuletDeployment):
'ceph-osd', 'ceph-radosgw']
# Charms which can not use openstack-origin, ie. many subordinates
no_origin = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe']
no_origin = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe',
'openvswitch-odl', 'neutron-api-odl', 'odl-controller']
if self.openstack:
for svc in services:
@ -112,11 +141,12 @@ class OpenStackAmuletDeployment(AmuletDeployment):
def _configure_services(self, configs):
"""Configure all of the services."""
self.log.info('OpenStackAmuletDeployment: configure services')
for service, config in six.iteritems(configs):
self.d.configure(service, config)
def _auto_wait_for_status(self, message=None, exclude_services=None,
timeout=1800):
include_only=None, timeout=1800):
"""Wait for all units to have a specific extended status, except
for any defined as excluded. Unless specified via message, any
status containing any case of 'ready' will be considered a match.
@ -127,7 +157,7 @@ class OpenStackAmuletDeployment(AmuletDeployment):
message = re.compile('.*ready.*|.*ok.*', re.IGNORECASE)
Wait for all units to reach this status (exact match):
message = 'Unit is ready'
message = re.compile('^Unit is ready and clustered$')
Wait for all units to reach any one of these (exact match):
message = re.compile('Unit is ready|OK|Ready')
@ -139,20 +169,50 @@ class OpenStackAmuletDeployment(AmuletDeployment):
https://github.com/juju/amulet/blob/master/amulet/sentry.py
:param message: Expected status match
:param exclude_services: List of juju service names to ignore
:param exclude_services: List of juju service names to ignore,
not to be used in conjuction with include_only.
:param include_only: List of juju service names to exclusively check,
not to be used in conjuction with exclude_services.
:param timeout: Maximum time in seconds to wait for status match
:returns: None. Raises if timeout is hit.
"""
self.log.info('Waiting for extended status on units...')
if not message:
all_services = self.d.services.keys()
if exclude_services and include_only:
raise ValueError('exclude_services can not be used '
'with include_only')
if message:
if isinstance(message, re._pattern_type):
match = message.pattern
else:
match = message
self.log.debug('Custom extended status wait match: '
'{}'.format(match))
else:
self.log.debug('Default extended status wait match: contains '
'READY (case-insensitive)')
message = re.compile('.*ready.*', re.IGNORECASE)
if not exclude_services:
if exclude_services:
self.log.debug('Excluding services from extended status match: '
'{}'.format(exclude_services))
else:
exclude_services = []
services = list(set(self.d.services.keys()) - set(exclude_services))
if include_only:
services = include_only
else:
services = list(set(all_services) - set(exclude_services))
self.log.debug('Waiting up to {}s for extended status on services: '
'{}'.format(timeout, services))
service_messages = {service: message for service in services}
self.d.sentry.wait_for_messages(service_messages, timeout=timeout)
self.log.info('OK')
def _get_openstack_release(self):
"""Get openstack release.
@ -165,7 +225,8 @@ class OpenStackAmuletDeployment(AmuletDeployment):
self.precise_havana, self.precise_icehouse,
self.trusty_icehouse, self.trusty_juno, self.utopic_juno,
self.trusty_kilo, self.vivid_kilo, self.trusty_liberty,
self.wily_liberty) = range(12)
self.wily_liberty, self.trusty_mitaka,
self.xenial_mitaka) = range(14)
releases = {
('precise', None): self.precise_essex,
@ -177,9 +238,11 @@ class OpenStackAmuletDeployment(AmuletDeployment):
('trusty', 'cloud:trusty-juno'): self.trusty_juno,
('trusty', 'cloud:trusty-kilo'): self.trusty_kilo,
('trusty', 'cloud:trusty-liberty'): self.trusty_liberty,
('trusty', 'cloud:trusty-mitaka'): self.trusty_mitaka,
('utopic', None): self.utopic_juno,
('vivid', None): self.vivid_kilo,
('wily', None): self.wily_liberty}
('wily', None): self.wily_liberty,
('xenial', None): self.xenial_mitaka}
return releases[(self.series, self.openstack)]
def _get_openstack_release_string(self):
@ -196,6 +259,7 @@ class OpenStackAmuletDeployment(AmuletDeployment):
('utopic', 'juno'),
('vivid', 'kilo'),
('wily', 'liberty'),
('xenial', 'mitaka'),
])
if self.openstack:
os_origin = self.openstack.split(':')[1]

View File

@ -18,6 +18,7 @@ import amulet
import json
import logging
import os
import re
import six
import time
import urllib
@ -605,6 +606,21 @@ class OpenStackAmuletUtils(AmuletUtils):
return None
# rabbitmq/amqp specific helpers:
def rmq_wait_for_cluster(self, deployment, init_sleep=15, timeout=1200):
"""Wait for rmq units extended status to show cluster readiness,
after an optional initial sleep period. Initial sleep is likely
necessary to be effective following a config change, as status
message may not instantly update to non-ready."""
if init_sleep:
time.sleep(init_sleep)
message = re.compile('^Unit is ready and clustered$')
deployment._auto_wait_for_status(message=message,
timeout=timeout,
include_only=['rabbitmq-server'])
def add_rmq_test_user(self, sentry_units,
username="testuser1", password="changeme"):
"""Add a test user via the first rmq juju unit, check connection as
@ -805,7 +821,10 @@ class OpenStackAmuletUtils(AmuletUtils):
if port:
config['ssl_port'] = port
deployment.configure('rabbitmq-server', config)
deployment.d.configure('rabbitmq-server', config)
# Wait for unit status
self.rmq_wait_for_cluster(deployment)
# Confirm
tries = 0
@ -832,7 +851,10 @@ class OpenStackAmuletUtils(AmuletUtils):
# Disable RMQ SSL
config = {'ssl': 'off'}
deployment.configure('rabbitmq-server', config)
deployment.d.configure('rabbitmq-server', config)
# Wait for unit status
self.rmq_wait_for_cluster(deployment)
# Confirm
tries = 0

View File

@ -626,6 +626,12 @@ class HAProxyContext(OSContextGenerator):
if config('haproxy-client-timeout'):
ctxt['haproxy_client_timeout'] = config('haproxy-client-timeout')
if config('haproxy-queue-timeout'):
ctxt['haproxy_queue_timeout'] = config('haproxy-queue-timeout')
if config('haproxy-connect-timeout'):
ctxt['haproxy_connect_timeout'] = config('haproxy-connect-timeout')
if config('prefer-ipv6'):
ctxt['ipv6'] = True
ctxt['local_host'] = 'ip6-localhost'
@ -1088,6 +1094,20 @@ class OSConfigFlagContext(OSContextGenerator):
config_flags_parser(config_flags)}
class LibvirtConfigFlagsContext(OSContextGenerator):
"""
This context provides support for extending
the libvirt section through user-defined flags.
"""
def __call__(self):
ctxt = {}
libvirt_flags = config('libvirt-flags')
if libvirt_flags:
ctxt['libvirt_flags'] = config_flags_parser(
libvirt_flags)
return ctxt
class SubordinateConfigContext(OSContextGenerator):
"""

View File

@ -9,15 +9,17 @@
CRITICAL=0
NOTACTIVE=''
LOGFILE=/var/log/nagios/check_haproxy.log
AUTH=$(grep -r "stats auth" /etc/haproxy | head -1 | awk '{print $4}')
AUTH=$(grep -r "stats auth" /etc/haproxy | awk 'NR=1{print $4}')
for appserver in $(grep ' server' /etc/haproxy/haproxy.cfg | awk '{print $2'});
typeset -i N_INSTANCES=0
for appserver in $(awk '/^\s+server/{print $2}' /etc/haproxy/haproxy.cfg)
do
output=$(/usr/lib/nagios/plugins/check_http -a ${AUTH} -I 127.0.0.1 -p 8888 --regex="class=\"(active|backup)(2|3).*${appserver}" -e ' 200 OK')
N_INSTANCES=N_INSTANCES+1
output=$(/usr/lib/nagios/plugins/check_http -a ${AUTH} -I 127.0.0.1 -p 8888 -u '/;csv' --regex=",${appserver},.*,UP.*" -e ' 200 OK')
if [ $? != 0 ]; then
date >> $LOGFILE
echo $output >> $LOGFILE
/usr/lib/nagios/plugins/check_http -a ${AUTH} -I 127.0.0.1 -p 8888 -v | grep $appserver >> $LOGFILE 2>&1
/usr/lib/nagios/plugins/check_http -a ${AUTH} -I 127.0.0.1 -p 8888 -u '/;csv' -v | grep ",${appserver}," >> $LOGFILE 2>&1
CRITICAL=1
NOTACTIVE="${NOTACTIVE} $appserver"
fi
@ -28,5 +30,5 @@ if [ $CRITICAL = 1 ]; then
exit 2
fi
echo "OK: All haproxy instances looking good"
echo "OK: All haproxy instances ($N_INSTANCES) looking good"
exit 0

View File

@ -204,8 +204,8 @@ def neutron_plugins():
database=config('database'),
ssl_dir=NEUTRON_CONF_DIR)],
'services': [],
'packages': [['plumgrid-lxc'],
['iovisor-dkms']],
'packages': ['plumgrid-lxc',
'iovisor-dkms'],
'server_packages': ['neutron-server',
'neutron-plugin-plumgrid'],
'server_services': ['neutron-server']

View File

@ -12,19 +12,26 @@ defaults
option tcplog
option dontlognull
retries 3
timeout queue 1000
timeout connect 1000
{% if haproxy_client_timeout -%}
{%- if haproxy_queue_timeout %}
timeout queue {{ haproxy_queue_timeout }}
{%- else %}
timeout queue 5000
{%- endif %}
{%- if haproxy_connect_timeout %}
timeout connect {{ haproxy_connect_timeout }}
{%- else %}
timeout connect 5000
{%- endif %}
{%- if haproxy_client_timeout %}
timeout client {{ haproxy_client_timeout }}
{% else -%}
{%- else %}
timeout client 30000
{% endif -%}
{% if haproxy_server_timeout -%}
{%- endif %}
{%- if haproxy_server_timeout %}
timeout server {{ haproxy_server_timeout }}
{% else -%}
{%- else %}
timeout server 30000
{% endif -%}
{%- endif %}
listen stats {{ stat_port }}
mode http

View File

@ -26,6 +26,7 @@ import re
import six
import traceback
import uuid
import yaml
from charmhelpers.contrib.network import ip
@ -41,6 +42,7 @@ from charmhelpers.core.hookenv import (
log as juju_log,
charm_dir,
INFO,
related_units,
relation_ids,
relation_set,
status_set,
@ -84,6 +86,7 @@ UBUNTU_OPENSTACK_RELEASE = OrderedDict([
('utopic', 'juno'),
('vivid', 'kilo'),
('wily', 'liberty'),
('xenial', 'mitaka'),
])
@ -97,6 +100,7 @@ OPENSTACK_CODENAMES = OrderedDict([
('2014.2', 'juno'),
('2015.1', 'kilo'),
('2015.2', 'liberty'),
('2016.1', 'mitaka'),
])
# The ugly duckling
@ -128,30 +132,39 @@ SWIFT_CODENAMES = OrderedDict([
PACKAGE_CODENAMES = {
'nova-common': OrderedDict([
('12.0.0', 'liberty'),
('13.0.0', 'mitaka'),
]),
'neutron-common': OrderedDict([
('7.0.0', 'liberty'),
('8.0.0', 'mitaka'),
]),
'cinder-common': OrderedDict([
('7.0.0', 'liberty'),
('8.0.0', 'mitaka'),
]),
'keystone': OrderedDict([
('8.0.0', 'liberty'),
('9.0.0', 'mitaka'),
]),
'horizon-common': OrderedDict([
('8.0.0', 'liberty'),
('9.0.0', 'mitaka'),
]),
'ceilometer-common': OrderedDict([
('5.0.0', 'liberty'),
('6.0.0', 'mitaka'),
]),
'heat-common': OrderedDict([
('5.0.0', 'liberty'),
('6.0.0', 'mitaka'),
]),
'glance-common': OrderedDict([
('11.0.0', 'liberty'),
('12.0.0', 'mitaka'),
]),
'openstack-dashboard': OrderedDict([
('8.0.0', 'liberty'),
('9.0.0', 'mitaka'),
]),
}
@ -375,6 +388,9 @@ def configure_installation_source(rel):
'liberty': 'trusty-updates/liberty',
'liberty/updates': 'trusty-updates/liberty',
'liberty/proposed': 'trusty-proposed/liberty',
'mitaka': 'trusty-updates/mitaka',
'mitaka/updates': 'trusty-updates/mitaka',
'mitaka/proposed': 'trusty-proposed/mitaka',
}
try:
@ -859,7 +875,9 @@ def set_os_workload_status(configs, required_interfaces, charm_func=None):
if charm_state != 'active' and charm_state != 'unknown':
state = workload_state_compare(state, charm_state)
if message:
message = "{} {}".format(message, charm_message)
charm_message = charm_message.replace("Incomplete relations: ",
"")
message = "{}, {}".format(message, charm_message)
else:
message = charm_message
@ -976,3 +994,19 @@ def do_action_openstack_upgrade(package, upgrade_callback, configs):
action_set({'outcome': 'no upgrade available.'})
return ret
def remote_restart(rel_name, remote_service=None):
trigger = {
'restart-trigger': str(uuid.uuid4()),
}
if remote_service:
trigger['remote-service'] = remote_service
for rid in relation_ids(rel_name):
# This subordinate can be related to two seperate services using
# different subordinate relations so only issue the restart if
# the principle is conencted down the relation we think it is
if related_units(relid=rid):
relation_set(relation_id=rid,
relation_settings=trigger,
)

View File

@ -42,8 +42,12 @@ def parse_options(given, available):
yield "--{0}={1}".format(key, value)
def pip_install_requirements(requirements, **options):
"""Install a requirements file """
def pip_install_requirements(requirements, constraints=None, **options):
"""Install a requirements file.
:param constraints: Path to pip constraints file.
http://pip.readthedocs.org/en/stable/user_guide/#constraints-files
"""
command = ["install"]
available_options = ('proxy', 'src', 'log', )
@ -51,6 +55,11 @@ def pip_install_requirements(requirements, **options):
command.append(option)
command.append("-r {0}".format(requirements))
if constraints:
command.append("-c {0}".format(constraints))
log("Installing from file: {} with constraints {} "
"and options: {}".format(requirements, constraints, command))
else:
log("Installing from file: {} with options: {}".format(requirements,
command))
pip_execute(command)

View File

@ -23,6 +23,8 @@
# James Page <james.page@ubuntu.com>
# Adam Gandelman <adamg@ubuntu.com>
#
import bisect
import six
import os
import shutil
@ -72,6 +74,394 @@ log to syslog = {use_syslog}
err to syslog = {use_syslog}
clog to syslog = {use_syslog}
"""
# For 50 < osds < 240,000 OSDs (Roughly 1 Exabyte at 6T OSDs)
powers_of_two = [8192, 16384, 32768, 65536, 131072, 262144, 524288, 1048576, 2097152, 4194304, 8388608]
def validator(value, valid_type, valid_range=None):
"""
Used to validate these: http://docs.ceph.com/docs/master/rados/operations/pools/#set-pool-values
Example input:
validator(value=1,
valid_type=int,
valid_range=[0, 2])
This says I'm testing value=1. It must be an int inclusive in [0,2]
:param value: The value to validate
:param valid_type: The type that value should be.
:param valid_range: A range of values that value can assume.
:return:
"""
assert isinstance(value, valid_type), "{} is not a {}".format(
value,
valid_type)
if valid_range is not None:
assert isinstance(valid_range, list), \
"valid_range must be a list, was given {}".format(valid_range)
# If we're dealing with strings
if valid_type is six.string_types:
assert value in valid_range, \
"{} is not in the list {}".format(value, valid_range)
# Integer, float should have a min and max
else:
if len(valid_range) != 2:
raise ValueError(
"Invalid valid_range list of {} for {}. "
"List must be [min,max]".format(valid_range, value))
assert value >= valid_range[0], \
"{} is less than minimum allowed value of {}".format(
value, valid_range[0])
assert value <= valid_range[1], \
"{} is greater than maximum allowed value of {}".format(
value, valid_range[1])
class PoolCreationError(Exception):
"""
A custom error to inform the caller that a pool creation failed. Provides an error message
"""
def __init__(self, message):
super(PoolCreationError, self).__init__(message)
class Pool(object):
"""
An object oriented approach to Ceph pool creation. This base class is inherited by ReplicatedPool and ErasurePool.
Do not call create() on this base class as it will not do anything. Instantiate a child class and call create().
"""
def __init__(self, service, name):
self.service = service
self.name = name
# Create the pool if it doesn't exist already
# To be implemented by subclasses
def create(self):
pass
def add_cache_tier(self, cache_pool, mode):
"""
Adds a new cache tier to an existing pool.
:param cache_pool: six.string_types. The cache tier pool name to add.
:param mode: six.string_types. The caching mode to use for this pool. valid range = ["readonly", "writeback"]
:return: None
"""
# Check the input types and values
validator(value=cache_pool, valid_type=six.string_types)
validator(value=mode, valid_type=six.string_types, valid_range=["readonly", "writeback"])
check_call(['ceph', '--id', self.service, 'osd', 'tier', 'add', self.name, cache_pool])
check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, mode])
check_call(['ceph', '--id', self.service, 'osd', 'tier', 'set-overlay', self.name, cache_pool])
check_call(['ceph', '--id', self.service, 'osd', 'pool', 'set', cache_pool, 'hit_set_type', 'bloom'])
def remove_cache_tier(self, cache_pool):
"""
Removes a cache tier from Ceph. Flushes all dirty objects from writeback pools and waits for that to complete.
:param cache_pool: six.string_types. The cache tier pool name to remove.
:return: None
"""
# read-only is easy, writeback is much harder
mode = get_cache_mode(cache_pool)
if mode == 'readonly':
check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, 'none'])
check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool])
elif mode == 'writeback':
check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, 'forward'])
# Flush the cache and wait for it to return
check_call(['ceph', '--id', self.service, '-p', cache_pool, 'cache-flush-evict-all'])
check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove-overlay', self.name])
check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool])
def get_pgs(self, pool_size):
"""
:param pool_size: int. pool_size is either the number of replicas for replicated pools or the K+M sum for
erasure coded pools
:return: int. The number of pgs to use.
"""
validator(value=pool_size, valid_type=int)
osds = get_osds(self.service)
if not osds:
# NOTE(james-page): Default to 200 for older ceph versions
# which don't support OSD query from cli
return 200
# Calculate based on Ceph best practices
if osds < 5:
return 128
elif 5 < osds < 10:
return 512
elif 10 < osds < 50:
return 4096
else:
estimate = (osds * 100) / pool_size
# Return the next nearest power of 2
index = bisect.bisect_right(powers_of_two, estimate)
return powers_of_two[index]
class ReplicatedPool(Pool):
def __init__(self, service, name, replicas=2):
super(ReplicatedPool, self).__init__(service=service, name=name)
self.replicas = replicas
def create(self):
if not pool_exists(self.service, self.name):
# Create it
pgs = self.get_pgs(self.replicas)
cmd = ['ceph', '--id', self.service, 'osd', 'pool', 'create', self.name, str(pgs)]
try:
check_call(cmd)
except CalledProcessError:
raise
# Default jerasure erasure coded pool
class ErasurePool(Pool):
def __init__(self, service, name, erasure_code_profile="default"):
super(ErasurePool, self).__init__(service=service, name=name)
self.erasure_code_profile = erasure_code_profile
def create(self):
if not pool_exists(self.service, self.name):
# Try to find the erasure profile information so we can properly size the pgs
erasure_profile = get_erasure_profile(service=self.service, name=self.erasure_code_profile)
# Check for errors
if erasure_profile is None:
log(message='Failed to discover erasure_profile named={}'.format(self.erasure_code_profile),
level=ERROR)
raise PoolCreationError(message='unable to find erasure profile {}'.format(self.erasure_code_profile))
if 'k' not in erasure_profile or 'm' not in erasure_profile:
# Error
log(message='Unable to find k (data chunks) or m (coding chunks) in {}'.format(erasure_profile),
level=ERROR)
raise PoolCreationError(
message='unable to find k (data chunks) or m (coding chunks) in {}'.format(erasure_profile))
pgs = self.get_pgs(int(erasure_profile['k']) + int(erasure_profile['m']))
# Create it
cmd = ['ceph', '--id', self.service, 'osd', 'pool', 'create', self.name, str(pgs),
'erasure', self.erasure_code_profile]
try:
check_call(cmd)
except CalledProcessError:
raise
"""Get an existing erasure code profile if it already exists.
Returns json formatted output"""
def get_erasure_profile(service, name):
"""
:param service: six.string_types. The Ceph user name to run the command under
:param name:
:return:
"""
try:
out = check_output(['ceph', '--id', service,
'osd', 'erasure-code-profile', 'get',
name, '--format=json'])
return json.loads(out)
except (CalledProcessError, OSError, ValueError):
return None
def pool_set(service, pool_name, key, value):
"""
Sets a value for a RADOS pool in ceph.
:param service: six.string_types. The Ceph user name to run the command under
:param pool_name: six.string_types
:param key: six.string_types
:param value:
:return: None. Can raise CalledProcessError
"""
cmd = ['ceph', '--id', service, 'osd', 'pool', 'set', pool_name, key, value]
try:
check_call(cmd)
except CalledProcessError:
raise
def snapshot_pool(service, pool_name, snapshot_name):
"""
Snapshots a RADOS pool in ceph.
:param service: six.string_types. The Ceph user name to run the command under
:param pool_name: six.string_types
:param snapshot_name: six.string_types
:return: None. Can raise CalledProcessError
"""
cmd = ['ceph', '--id', service, 'osd', 'pool', 'mksnap', pool_name, snapshot_name]
try:
check_call(cmd)
except CalledProcessError:
raise
def remove_pool_snapshot(service, pool_name, snapshot_name):
"""
Remove a snapshot from a RADOS pool in ceph.
:param service: six.string_types. The Ceph user name to run the command under
:param pool_name: six.string_types
:param snapshot_name: six.string_types
:return: None. Can raise CalledProcessError
"""
cmd = ['ceph', '--id', service, 'osd', 'pool', 'rmsnap', pool_name, snapshot_name]
try:
check_call(cmd)
except CalledProcessError:
raise
# max_bytes should be an int or long
def set_pool_quota(service, pool_name, max_bytes):
"""
:param service: six.string_types. The Ceph user name to run the command under
:param pool_name: six.string_types
:param max_bytes: int or long
:return: None. Can raise CalledProcessError
"""
# Set a byte quota on a RADOS pool in ceph.
cmd = ['ceph', '--id', service, 'osd', 'pool', 'set-quota', pool_name, 'max_bytes', max_bytes]
try:
check_call(cmd)
except CalledProcessError:
raise
def remove_pool_quota(service, pool_name):
"""
Set a byte quota on a RADOS pool in ceph.
:param service: six.string_types. The Ceph user name to run the command under
:param pool_name: six.string_types
:return: None. Can raise CalledProcessError
"""
cmd = ['ceph', '--id', service, 'osd', 'pool', 'set-quota', pool_name, 'max_bytes', '0']
try:
check_call(cmd)
except CalledProcessError:
raise
def create_erasure_profile(service, profile_name, erasure_plugin_name='jerasure', failure_domain='host',
data_chunks=2, coding_chunks=1,
locality=None, durability_estimator=None):
"""
Create a new erasure code profile if one does not already exist for it. Updates
the profile if it exists. Please see http://docs.ceph.com/docs/master/rados/operations/erasure-code-profile/
for more details
:param service: six.string_types. The Ceph user name to run the command under
:param profile_name: six.string_types
:param erasure_plugin_name: six.string_types
:param failure_domain: six.string_types. One of ['chassis', 'datacenter', 'host', 'osd', 'pdu', 'pod', 'rack', 'region',
'room', 'root', 'row'])
:param data_chunks: int
:param coding_chunks: int
:param locality: int
:param durability_estimator: int
:return: None. Can raise CalledProcessError
"""
# Ensure this failure_domain is allowed by Ceph
validator(failure_domain, six.string_types,
['chassis', 'datacenter', 'host', 'osd', 'pdu', 'pod', 'rack', 'region', 'room', 'root', 'row'])
cmd = ['ceph', '--id', service, 'osd', 'erasure-code-profile', 'set', profile_name,
'plugin=' + erasure_plugin_name, 'k=' + str(data_chunks), 'm=' + str(coding_chunks),
'ruleset_failure_domain=' + failure_domain]
if locality is not None and durability_estimator is not None:
raise ValueError("create_erasure_profile should be called with k, m and one of l or c but not both.")
# Add plugin specific information
if locality is not None:
# For local erasure codes
cmd.append('l=' + str(locality))
if durability_estimator is not None:
# For Shec erasure codes
cmd.append('c=' + str(durability_estimator))
if erasure_profile_exists(service, profile_name):
cmd.append('--force')
try:
check_call(cmd)
except CalledProcessError:
raise
def rename_pool(service, old_name, new_name):
"""
Rename a Ceph pool from old_name to new_name
:param service: six.string_types. The Ceph user name to run the command under
:param old_name: six.string_types
:param new_name: six.string_types
:return: None
"""
validator(value=old_name, valid_type=six.string_types)
validator(value=new_name, valid_type=six.string_types)
cmd = ['ceph', '--id', service, 'osd', 'pool', 'rename', old_name, new_name]
check_call(cmd)
def erasure_profile_exists(service, name):
"""
Check to see if an Erasure code profile already exists.
:param service: six.string_types. The Ceph user name to run the command under
:param name: six.string_types
:return: int or None
"""
validator(value=name, valid_type=six.string_types)
try:
check_call(['ceph', '--id', service,
'osd', 'erasure-code-profile', 'get',
name])
return True
except CalledProcessError:
return False
def get_cache_mode(service, pool_name):
"""
Find the current caching mode of the pool_name given.
:param service: six.string_types. The Ceph user name to run the command under
:param pool_name: six.string_types
:return: int or None
"""
validator(value=service, valid_type=six.string_types)
validator(value=pool_name, valid_type=six.string_types)
out = check_output(['ceph', '--id', service, 'osd', 'dump', '--format=json'])
try:
osd_json = json.loads(out)
for pool in osd_json['pools']:
if pool['pool_name'] == pool_name:
return pool['cache_mode']
return None
except ValueError:
raise
def pool_exists(service, name):
"""Check to see if a RADOS pool already exists."""
try:
out = check_output(['rados', '--id', service,
'lspools']).decode('UTF-8')
except CalledProcessError:
return False
return name in out
def get_osds(service):
"""Return a list of all Ceph Object Storage Daemons currently in the
cluster.
"""
version = ceph_version()
if version and version >= '0.56':
return json.loads(check_output(['ceph', '--id', service,
'osd', 'ls',
'--format=json']).decode('UTF-8'))
return None
def install():
@ -101,53 +491,37 @@ def create_rbd_image(service, pool, image, sizemb):
check_call(cmd)
def pool_exists(service, name):
"""Check to see if a RADOS pool already exists."""
try:
out = check_output(['rados', '--id', service,
'lspools']).decode('UTF-8')
except CalledProcessError:
return False
def update_pool(client, pool, settings):
cmd = ['ceph', '--id', client, 'osd', 'pool', 'set', pool]
for k, v in six.iteritems(settings):
cmd.append(k)
cmd.append(v)
return name in out
check_call(cmd)
def get_osds(service):
"""Return a list of all Ceph Object Storage Daemons currently in the
cluster.
"""
version = ceph_version()
if version and version >= '0.56':
return json.loads(check_output(['ceph', '--id', service,
'osd', 'ls',
'--format=json']).decode('UTF-8'))
return None
def create_pool(service, name, replicas=3):
def create_pool(service, name, replicas=3, pg_num=None):
"""Create a new RADOS pool."""
if pool_exists(service, name):
log("Ceph pool {} already exists, skipping creation".format(name),
level=WARNING)
return
if not pg_num:
# Calculate the number of placement groups based
# on upstream recommended best practices.
osds = get_osds(service)
if osds:
pgnum = (len(osds) * 100 // replicas)
pg_num = (len(osds) * 100 // replicas)
else:
# NOTE(james-page): Default to 200 for older ceph versions
# which don't support OSD query from cli
pgnum = 200
pg_num = 200
cmd = ['ceph', '--id', service, 'osd', 'pool', 'create', name, str(pgnum)]
cmd = ['ceph', '--id', service, 'osd', 'pool', 'create', name, str(pg_num)]
check_call(cmd)
cmd = ['ceph', '--id', service, 'osd', 'pool', 'set', name, 'size',
str(replicas)]
check_call(cmd)
update_pool(service, name, settings={'size': str(replicas)})
def delete_pool(service, name):
@ -202,10 +576,10 @@ def create_key_file(service, key):
log('Created new keyfile at %s.' % keyfile, level=INFO)
def get_ceph_nodes():
"""Query named relation 'ceph' to determine current nodes."""
def get_ceph_nodes(relation='ceph'):
"""Query named relation to determine current nodes."""
hosts = []
for r_id in relation_ids('ceph'):
for r_id in relation_ids(relation):
for unit in related_units(r_id):
hosts.append(relation_get('private-address', unit=unit, rid=r_id))
@ -357,14 +731,14 @@ def ensure_ceph_storage(service, pool, rbd_img, sizemb, mount_point,
service_start(svc)
def ensure_ceph_keyring(service, user=None, group=None):
def ensure_ceph_keyring(service, user=None, group=None, relation='ceph'):
"""Ensures a ceph keyring is created for a named service and optionally
ensures user and group ownership.
Returns False if no ceph key is available in relation state.
"""
key = None
for rid in relation_ids('ceph'):
for rid in relation_ids(relation):
for unit in related_units(rid):
key = relation_get('key', rid=rid, unit=unit)
if key:
@ -405,6 +779,7 @@ class CephBrokerRq(object):
The API is versioned and defaults to version 1.
"""
def __init__(self, api_version=1, request_id=None):
self.api_version = api_version
if request_id:
@ -413,9 +788,16 @@ class CephBrokerRq(object):
self.request_id = str(uuid.uuid1())
self.ops = []
def add_op_create_pool(self, name, replica_count=3):
def add_op_create_pool(self, name, replica_count=3, pg_num=None):
"""Adds an operation to create a pool.
@param pg_num setting: optional setting. If not provided, this value
will be calculated by the broker based on how many OSDs are in the
cluster at the time of creation. Note that, if provided, this value
will be capped at the current available maximum.
"""
self.ops.append({'op': 'create-pool', 'name': name,
'replicas': replica_count})
'replicas': replica_count, 'pg_num': pg_num})
def set_ops(self, ops):
"""Set request ops to provided value.
@ -433,8 +815,8 @@ class CephBrokerRq(object):
def _ops_equal(self, other):
if len(self.ops) == len(other.ops):
for req_no in range(0, len(self.ops)):
for key in ['replicas', 'name', 'op']:
if self.ops[req_no][key] != other.ops[req_no][key]:
for key in ['replicas', 'name', 'op', 'pg_num']:
if self.ops[req_no].get(key) != other.ops[req_no].get(key):
return False
else:
return False
@ -540,7 +922,7 @@ def get_previous_request(rid):
return request
def get_request_states(request):
def get_request_states(request, relation='ceph'):
"""Return a dict of requests per relation id with their corresponding
completion state.
@ -552,7 +934,7 @@ def get_request_states(request):
"""
complete = []
requests = {}
for rid in relation_ids('ceph'):
for rid in relation_ids(relation):
complete = False
previous_request = get_previous_request(rid)
if request == previous_request:
@ -570,14 +952,14 @@ def get_request_states(request):
return requests
def is_request_sent(request):
def is_request_sent(request, relation='ceph'):
"""Check to see if a functionally equivalent request has already been sent
Returns True if a similair request has been sent
@param request: A CephBrokerRq object
"""
states = get_request_states(request)
states = get_request_states(request, relation=relation)
for rid in states.keys():
if not states[rid]['sent']:
return False
@ -585,7 +967,7 @@ def is_request_sent(request):
return True
def is_request_complete(request):
def is_request_complete(request, relation='ceph'):
"""Check to see if a functionally equivalent request has already been
completed
@ -593,7 +975,7 @@ def is_request_complete(request):
@param request: A CephBrokerRq object
"""
states = get_request_states(request)
states = get_request_states(request, relation=relation)
for rid in states.keys():
if not states[rid]['complete']:
return False
@ -643,15 +1025,15 @@ def get_broker_rsp_key():
return 'broker-rsp-' + local_unit().replace('/', '-')
def send_request_if_needed(request):
def send_request_if_needed(request, relation='ceph'):
"""Send broker request if an equivalent request has not already been sent
@param request: A CephBrokerRq object
"""
if is_request_sent(request):
if is_request_sent(request, relation=relation):
log('Request already sent but not complete, not sending new request',
level=DEBUG)
else:
for rid in relation_ids('ceph'):
for rid in relation_ids(relation):
log('Sending request {}'.format(request.request_id), level=DEBUG)
relation_set(relation_id=rid, broker_req=request.request)

View File

@ -76,3 +76,13 @@ def ensure_loopback_device(path, size):
check_call(cmd)
return create_loopback(path)
def is_mapped_loopback_device(device):
"""
Checks if a given device name is an existing/mapped loopback device.
:param device: str: Full path to the device (eg, /dev/loop1).
:returns: str: Path to the backing file if is a loopback device
empty string otherwise
"""
return loopback_devices().get(device, "")

View File

@ -490,6 +490,19 @@ def relation_types():
return rel_types
@cached
def peer_relation_id():
'''Get the peers relation id if a peers relation has been joined, else None.'''
md = metadata()
section = md.get('peers')
if section:
for key in section:
relids = relation_ids(key)
if relids:
return relids[0]
return None
@cached
def relation_to_interface(relation_name):
"""
@ -504,12 +517,12 @@ def relation_to_interface(relation_name):
def relation_to_role_and_interface(relation_name):
"""
Given the name of a relation, return the role and the name of the interface
that relation uses (where role is one of ``provides``, ``requires``, or ``peer``).
that relation uses (where role is one of ``provides``, ``requires``, or ``peers``).
:returns: A tuple containing ``(role, interface)``, or ``(None, None)``.
"""
_metadata = metadata()
for role in ('provides', 'requires', 'peer'):
for role in ('provides', 'requires', 'peers'):
interface = _metadata.get(role, {}).get(relation_name, {}).get('interface')
if interface:
return role, interface
@ -521,7 +534,7 @@ def role_and_interface_to_relations(role, interface_name):
"""
Given a role and interface name, return a list of relation names for the
current charm that use that interface under that role (where role is one
of ``provides``, ``requires``, or ``peer``).
of ``provides``, ``requires``, or ``peers``).
:returns: A list of relation names.
"""
@ -542,7 +555,7 @@ def interface_to_relations(interface_name):
:returns: A list of relation names.
"""
results = []
for role in ('provides', 'requires', 'peer'):
for role in ('provides', 'requires', 'peers'):
results.extend(role_and_interface_to_relations(role, interface_name))
return results
@ -624,7 +637,7 @@ def unit_private_ip():
@cached
def storage_get(attribute="", storage_id=""):
def storage_get(attribute=None, storage_id=None):
"""Get storage attributes"""
_args = ['storage-get', '--format=json']
if storage_id:
@ -638,7 +651,7 @@ def storage_get(attribute="", storage_id=""):
@cached
def storage_list(storage_name=""):
def storage_list(storage_name=None):
"""List the storage IDs for the unit"""
_args = ['storage-list', '--format=json']
if storage_name:
@ -820,6 +833,7 @@ def status_get():
def translate_exc(from_exc, to_exc):
def inner_translate_exc1(f):
@wraps(f)
def inner_translate_exc2(*args, **kwargs):
try:
return f(*args, **kwargs)
@ -864,6 +878,40 @@ def leader_set(settings=None, **kwargs):
subprocess.check_call(cmd)
@translate_exc(from_exc=OSError, to_exc=NotImplementedError)
def payload_register(ptype, klass, pid):
""" is used while a hook is running to let Juju know that a
payload has been started."""
cmd = ['payload-register']
for x in [ptype, klass, pid]:
cmd.append(x)
subprocess.check_call(cmd)
@translate_exc(from_exc=OSError, to_exc=NotImplementedError)
def payload_unregister(klass, pid):
""" is used while a hook is running to let Juju know
that a payload has been manually stopped. The <class> and <id> provided
must match a payload that has been previously registered with juju using
payload-register."""
cmd = ['payload-unregister']
for x in [klass, pid]:
cmd.append(x)
subprocess.check_call(cmd)
@translate_exc(from_exc=OSError, to_exc=NotImplementedError)
def payload_status_set(klass, pid, status):
"""is used to update the current status of a registered payload.
The <class> and <id> provided must match a payload that has been previously
registered with juju using payload-register. The <status> must be one of the
follow: starting, started, stopping, stopped"""
cmd = ['payload-status-set']
for x in [klass, pid, status]:
cmd.append(x)
subprocess.check_call(cmd)
@cached
def juju_version():
"""Full version string (eg. '1.23.3.1-trusty-amd64')"""

View File

@ -67,6 +67,8 @@ def service_pause(service_name, init_dir="/etc/init", initd_dir="/etc/init.d"):
"""Pause a system service.
Stop it, and prevent it from starting again at boot."""
stopped = True
if service_running(service_name):
stopped = service_stop(service_name)
upstart_file = os.path.join(init_dir, "{}.conf".format(service_name))
sysv_file = os.path.join(initd_dir, service_name)
@ -105,6 +107,8 @@ def service_resume(service_name, init_dir="/etc/init",
"Unable to detect {0} as either Upstart {1} or SysV {2}".format(
service_name, upstart_file, sysv_file))
started = service_running(service_name)
if not started:
started = service_start(service_name)
return started
@ -142,8 +146,22 @@ def service_available(service_name):
return True
def adduser(username, password=None, shell='/bin/bash', system_user=False):
"""Add a user to the system"""
def adduser(username, password=None, shell='/bin/bash', system_user=False,
primary_group=None, secondary_groups=None):
"""
Add a user to the system.
Will log but otherwise succeed if the user already exists.
:param str username: Username to create
:param str password: Password for user; if ``None``, create a system user
:param str shell: The default shell for the user
:param bool system_user: Whether to create a login or system user
:param str primary_group: Primary group for user; defaults to their username
:param list secondary_groups: Optional list of additional groups
:returns: The password database entry struct, as returned by `pwd.getpwnam`
"""
try:
user_info = pwd.getpwnam(username)
log('user {0} already exists!'.format(username))
@ -158,6 +176,16 @@ def adduser(username, password=None, shell='/bin/bash', system_user=False):
'--shell', shell,
'--password', password,
])
if not primary_group:
try:
grp.getgrnam(username)
primary_group = username # avoid "group exists" error
except KeyError:
pass
if primary_group:
cmd.extend(['-g', primary_group])
if secondary_groups:
cmd.extend(['-G', ','.join(secondary_groups)])
cmd.append(username)
subprocess.check_call(cmd)
user_info = pwd.getpwnam(username)
@ -595,3 +623,19 @@ def chownr(path, owner, group, follow_links=True, chowntopdir=False):
def lchownr(path, owner, group):
chownr(path, owner, group, follow_links=False)
def get_total_ram():
'''The total amount of system RAM in bytes.
This is what is reported by the OS, and may be overcommitted when
there are multiple containers hosted on the same machine.
'''
with open('/proc/meminfo', 'r') as f:
for line in f.readlines():
if line:
key, value, unit = line.split()
if key == 'MemTotal:':
assert unit == 'kB', 'Unknown unit'
return int(value) * 1024 # Classic, not KiB.
raise NotImplementedError()

View File

@ -46,6 +46,8 @@ def hugepage_support(user, group='hugetlb', nr_hugepages=256,
group_info = add_group(group)
gid = group_info.gr_gid
add_user_to_group(user, group)
if max_map_count < 2 * nr_hugepages:
max_map_count = 2 * nr_hugepages
sysctl_settings = {
'vm.nr_hugepages': nr_hugepages,
'vm.max_map_count': max_map_count,

View File

@ -243,33 +243,40 @@ class TemplateCallback(ManagerCallback):
:param str source: The template source file, relative to
`$CHARM_DIR/templates`
:param str target: The target to write the rendered template to
:param str target: The target to write the rendered template to (or None)
:param str owner: The owner of the rendered file
:param str group: The group of the rendered file
:param int perms: The permissions of the rendered file
:param partial on_change_action: functools partial to be executed when
rendered file changes
:param jinja2 loader template_loader: A jinja2 template loader
:return str: The rendered template
"""
def __init__(self, source, target,
owner='root', group='root', perms=0o444,
on_change_action=None):
on_change_action=None, template_loader=None):
self.source = source
self.target = target
self.owner = owner
self.group = group
self.perms = perms
self.on_change_action = on_change_action
self.template_loader = template_loader
def __call__(self, manager, service_name, event_name):
pre_checksum = ''
if self.on_change_action and os.path.isfile(self.target):
pre_checksum = host.file_hash(self.target)
service = manager.get_service(service_name)
context = {}
context = {'ctx': {}}
for ctx in service.get('required_data', []):
context.update(ctx)
templating.render(self.source, self.target, context,
self.owner, self.group, self.perms)
context['ctx'].update(ctx)
result = templating.render(self.source, self.target, context,
self.owner, self.group, self.perms,
template_loader=self.template_loader)
if self.on_change_action:
if pre_checksum == host.file_hash(self.target):
hookenv.log(
@ -278,6 +285,8 @@ class TemplateCallback(ManagerCallback):
else:
self.on_change_action()
return result
# Convenience aliases for templates
render_template = template = TemplateCallback

View File

@ -21,13 +21,14 @@ from charmhelpers.core import hookenv
def render(source, target, context, owner='root', group='root',
perms=0o444, templates_dir=None, encoding='UTF-8'):
perms=0o444, templates_dir=None, encoding='UTF-8', template_loader=None):
"""
Render a template.
The `source` path, if not absolute, is relative to the `templates_dir`.
The `target` path should be absolute.
The `target` path should be absolute. It can also be `None`, in which
case no file will be written.
The context should be a dict containing the values to be replaced in the
template.
@ -36,6 +37,9 @@ def render(source, target, context, owner='root', group='root',
If omitted, `templates_dir` defaults to the `templates` folder in the charm.
The rendered template will be written to the file as well as being returned
as a string.
Note: Using this requires python-jinja2; if it is not installed, calling
this will attempt to use charmhelpers.fetch.apt_install to install it.
"""
@ -52,17 +56,26 @@ def render(source, target, context, owner='root', group='root',
apt_install('python-jinja2', fatal=True)
from jinja2 import FileSystemLoader, Environment, exceptions
if template_loader:
template_env = Environment(loader=template_loader)
else:
if templates_dir is None:
templates_dir = os.path.join(hookenv.charm_dir(), 'templates')
loader = Environment(loader=FileSystemLoader(templates_dir))
template_env = Environment(loader=FileSystemLoader(templates_dir))
try:
source = source
template = loader.get_template(source)
template = template_env.get_template(source)
except exceptions.TemplateNotFound as e:
hookenv.log('Could not load template %s from %s.' %
(source, templates_dir),
level=hookenv.ERROR)
raise e
content = template.render(context)
if target is not None:
target_dir = os.path.dirname(target)
if not os.path.exists(target_dir):
# This is a terrible default directory permission, as the file
# or its siblings will often contain secrets.
host.mkdir(os.path.dirname(target), owner, group, perms=0o755)
host.write_file(target, content.encode(encoding), owner, group, perms)
return content

View File

@ -98,6 +98,14 @@ CLOUD_ARCHIVE_POCKETS = {
'liberty/proposed': 'trusty-proposed/liberty',
'trusty-liberty/proposed': 'trusty-proposed/liberty',
'trusty-proposed/liberty': 'trusty-proposed/liberty',
# Mitaka
'mitaka': 'trusty-updates/mitaka',
'trusty-mitaka': 'trusty-updates/mitaka',
'trusty-mitaka/updates': 'trusty-updates/mitaka',
'trusty-updates/mitaka': 'trusty-updates/mitaka',
'mitaka/proposed': 'trusty-proposed/mitaka',
'trusty-mitaka/proposed': 'trusty-proposed/mitaka',
'trusty-proposed/mitaka': 'trusty-proposed/mitaka',
}
# The order of this list is very important. Handlers should be listed in from
@ -225,12 +233,12 @@ def apt_purge(packages, fatal=False):
def apt_mark(packages, mark, fatal=False):
"""Flag one or more packages using apt-mark"""
log("Marking {} as {}".format(packages, mark))
cmd = ['apt-mark', mark]
if isinstance(packages, six.string_types):
cmd.append(packages)
else:
cmd.extend(packages)
log("Holding {}".format(packages))
if fatal:
subprocess.check_call(cmd, universal_newlines=True)
@ -411,7 +419,7 @@ def plugins(fetch_handlers=None):
importlib.import_module(package),
classname)
plugin_list.append(handler_class())
except (ImportError, AttributeError):
except NotImplementedError:
# Skip missing plugins so that they can be ommitted from
# installation if desired
log("FetchHandler {} not found, skipping plugin".format(

View File

@ -108,7 +108,7 @@ class ArchiveUrlFetchHandler(BaseFetchHandler):
install_opener(opener)
response = urlopen(source)
try:
with open(dest, 'w') as dest_file:
with open(dest, 'wb') as dest_file:
dest_file.write(response.read())
except Exception as e:
if os.path.isfile(dest):

View File

@ -15,60 +15,50 @@
# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
import os
from subprocess import check_call
from charmhelpers.fetch import (
BaseFetchHandler,
UnhandledSource
UnhandledSource,
filter_installed_packages,
apt_install,
)
from charmhelpers.core.host import mkdir
import six
if six.PY3:
raise ImportError('bzrlib does not support Python3')
try:
from bzrlib.branch import Branch
from bzrlib import bzrdir, workingtree, errors
except ImportError:
from charmhelpers.fetch import apt_install
apt_install("python-bzrlib")
from bzrlib.branch import Branch
from bzrlib import bzrdir, workingtree, errors
if filter_installed_packages(['bzr']) != []:
apt_install(['bzr'])
if filter_installed_packages(['bzr']) != []:
raise NotImplementedError('Unable to install bzr')
class BzrUrlFetchHandler(BaseFetchHandler):
"""Handler for bazaar branches via generic and lp URLs"""
def can_handle(self, source):
url_parts = self.parse_url(source)
if url_parts.scheme not in ('bzr+ssh', 'lp'):
if url_parts.scheme not in ('bzr+ssh', 'lp', ''):
return False
elif not url_parts.scheme:
return os.path.exists(os.path.join(source, '.bzr'))
else:
return True
def branch(self, source, dest):
url_parts = self.parse_url(source)
# If we use lp:branchname scheme we need to load plugins
if not self.can_handle(source):
raise UnhandledSource("Cannot handle {}".format(source))
if url_parts.scheme == "lp":
from bzrlib.plugin import load_plugins
load_plugins()
try:
local_branch = bzrdir.BzrDir.create_branch_convenience(dest)
except errors.AlreadyControlDirError:
local_branch = Branch.open(dest)
try:
remote_branch = Branch.open(source)
remote_branch.push(local_branch)
tree = workingtree.WorkingTree.open(dest)
tree.update()
except Exception as e:
raise e
if os.path.exists(dest):
check_call(['bzr', 'pull', '--overwrite', '-d', dest, source])
else:
check_call(['bzr', 'branch', source, dest])
def install(self, source):
def install(self, source, dest=None):
url_parts = self.parse_url(source)
branch_name = url_parts.path.strip("/").split("/")[-1]
if dest:
dest_dir = os.path.join(dest, branch_name)
else:
dest_dir = os.path.join(os.environ.get('CHARM_DIR'), "fetched",
branch_name)
if not os.path.exists(dest_dir):
mkdir(dest_dir, perms=0o755)
try:

View File

@ -15,24 +15,19 @@
# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
import os
from subprocess import check_call
from charmhelpers.fetch import (
BaseFetchHandler,
UnhandledSource
UnhandledSource,
filter_installed_packages,
apt_install,
)
from charmhelpers.core.host import mkdir
import six
if six.PY3:
raise ImportError('GitPython does not support Python 3')
try:
from git import Repo
except ImportError:
from charmhelpers.fetch import apt_install
apt_install("python-git")
from git import Repo
from git.exc import GitCommandError # noqa E402
if filter_installed_packages(['git']) != []:
apt_install(['git'])
if filter_installed_packages(['git']) != []:
raise NotImplementedError('Unable to install git')
class GitUrlFetchHandler(BaseFetchHandler):
@ -40,19 +35,24 @@ class GitUrlFetchHandler(BaseFetchHandler):
def can_handle(self, source):
url_parts = self.parse_url(source)
# TODO (mattyw) no support for ssh git@ yet
if url_parts.scheme not in ('http', 'https', 'git'):
if url_parts.scheme not in ('http', 'https', 'git', ''):
return False
elif not url_parts.scheme:
return os.path.exists(os.path.join(source, '.git'))
else:
return True
def clone(self, source, dest, branch, depth=None):
def clone(self, source, dest, branch="master", depth=None):
if not self.can_handle(source):
raise UnhandledSource("Cannot handle {}".format(source))
if depth:
Repo.clone_from(source, dest, branch=branch, depth=depth)
if os.path.exists(dest):
cmd = ['git', '-C', dest, 'pull', source, branch]
else:
Repo.clone_from(source, dest, branch=branch)
cmd = ['git', 'clone', source, dest, '--branch', branch]
if depth:
cmd.extend(['--depth', depth])
check_call(cmd)
def install(self, source, branch="master", dest=None, depth=None):
url_parts = self.parse_url(source)
@ -66,8 +66,6 @@ class GitUrlFetchHandler(BaseFetchHandler):
mkdir(dest_dir, perms=0o755)
try:
self.clone(source, dest_dir, branch, depth)
except GitCommandError as e:
raise UnhandledSource(e)
except OSError as e:
raise UnhandledSource(e.strerror)
return dest_dir

View File

@ -14,13 +14,18 @@
# You should have received a copy of the GNU Lesser General Public License
# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
import logging
import re
import sys
import six
from collections import OrderedDict
from charmhelpers.contrib.amulet.deployment import (
AmuletDeployment
)
DEBUG = logging.DEBUG
ERROR = logging.ERROR
class OpenStackAmuletDeployment(AmuletDeployment):
"""OpenStack amulet deployment.
@ -29,9 +34,12 @@ class OpenStackAmuletDeployment(AmuletDeployment):
that is specifically for use by OpenStack charms.
"""
def __init__(self, series=None, openstack=None, source=None, stable=True):
def __init__(self, series=None, openstack=None, source=None,
stable=True, log_level=DEBUG):
"""Initialize the deployment environment."""
super(OpenStackAmuletDeployment, self).__init__(series)
self.log = self.get_logger(level=log_level)
self.log.info('OpenStackAmuletDeployment: init')
self.openstack = openstack
self.source = source
self.stable = stable
@ -39,6 +47,22 @@ class OpenStackAmuletDeployment(AmuletDeployment):
# out.
self.current_next = "trusty"
def get_logger(self, name="deployment-logger", level=logging.DEBUG):
"""Get a logger object that will log to stdout."""
log = logging
logger = log.getLogger(name)
fmt = log.Formatter("%(asctime)s %(funcName)s "
"%(levelname)s: %(message)s")
handler = log.StreamHandler(stream=sys.stdout)
handler.setLevel(level)
handler.setFormatter(fmt)
logger.addHandler(handler)
logger.setLevel(level)
return logger
def _determine_branch_locations(self, other_services):
"""Determine the branch locations for the other services.
@ -46,6 +70,8 @@ class OpenStackAmuletDeployment(AmuletDeployment):
stable or next (dev) branch, and based on this, use the corresonding
stable or next branches for the other_services."""
self.log.info('OpenStackAmuletDeployment: determine branch locations')
# Charms outside the lp:~openstack-charmers namespace
base_charms = ['mysql', 'mongodb', 'nrpe']
@ -83,6 +109,8 @@ class OpenStackAmuletDeployment(AmuletDeployment):
def _add_services(self, this_service, other_services):
"""Add services to the deployment and set openstack-origin/source."""
self.log.info('OpenStackAmuletDeployment: adding services')
other_services = self._determine_branch_locations(other_services)
super(OpenStackAmuletDeployment, self)._add_services(this_service,
@ -96,7 +124,8 @@ class OpenStackAmuletDeployment(AmuletDeployment):
'ceph-osd', 'ceph-radosgw']
# Charms which can not use openstack-origin, ie. many subordinates
no_origin = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe']
no_origin = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe',
'openvswitch-odl', 'neutron-api-odl', 'odl-controller']
if self.openstack:
for svc in services:
@ -112,11 +141,12 @@ class OpenStackAmuletDeployment(AmuletDeployment):
def _configure_services(self, configs):
"""Configure all of the services."""
self.log.info('OpenStackAmuletDeployment: configure services')
for service, config in six.iteritems(configs):
self.d.configure(service, config)
def _auto_wait_for_status(self, message=None, exclude_services=None,
timeout=1800):
include_only=None, timeout=1800):
"""Wait for all units to have a specific extended status, except
for any defined as excluded. Unless specified via message, any
status containing any case of 'ready' will be considered a match.
@ -127,7 +157,7 @@ class OpenStackAmuletDeployment(AmuletDeployment):
message = re.compile('.*ready.*|.*ok.*', re.IGNORECASE)
Wait for all units to reach this status (exact match):
message = 'Unit is ready'
message = re.compile('^Unit is ready and clustered$')
Wait for all units to reach any one of these (exact match):
message = re.compile('Unit is ready|OK|Ready')
@ -139,20 +169,50 @@ class OpenStackAmuletDeployment(AmuletDeployment):
https://github.com/juju/amulet/blob/master/amulet/sentry.py
:param message: Expected status match
:param exclude_services: List of juju service names to ignore
:param exclude_services: List of juju service names to ignore,
not to be used in conjuction with include_only.
:param include_only: List of juju service names to exclusively check,
not to be used in conjuction with exclude_services.
:param timeout: Maximum time in seconds to wait for status match
:returns: None. Raises if timeout is hit.
"""
self.log.info('Waiting for extended status on units...')
if not message:
all_services = self.d.services.keys()
if exclude_services and include_only:
raise ValueError('exclude_services can not be used '
'with include_only')
if message:
if isinstance(message, re._pattern_type):
match = message.pattern
else:
match = message
self.log.debug('Custom extended status wait match: '
'{}'.format(match))
else:
self.log.debug('Default extended status wait match: contains '
'READY (case-insensitive)')
message = re.compile('.*ready.*', re.IGNORECASE)
if not exclude_services:
if exclude_services:
self.log.debug('Excluding services from extended status match: '
'{}'.format(exclude_services))
else:
exclude_services = []
services = list(set(self.d.services.keys()) - set(exclude_services))
if include_only:
services = include_only
else:
services = list(set(all_services) - set(exclude_services))
self.log.debug('Waiting up to {}s for extended status on services: '
'{}'.format(timeout, services))
service_messages = {service: message for service in services}
self.d.sentry.wait_for_messages(service_messages, timeout=timeout)
self.log.info('OK')
def _get_openstack_release(self):
"""Get openstack release.
@ -165,7 +225,8 @@ class OpenStackAmuletDeployment(AmuletDeployment):
self.precise_havana, self.precise_icehouse,
self.trusty_icehouse, self.trusty_juno, self.utopic_juno,
self.trusty_kilo, self.vivid_kilo, self.trusty_liberty,
self.wily_liberty) = range(12)
self.wily_liberty, self.trusty_mitaka,
self.xenial_mitaka) = range(14)
releases = {
('precise', None): self.precise_essex,
@ -177,9 +238,11 @@ class OpenStackAmuletDeployment(AmuletDeployment):
('trusty', 'cloud:trusty-juno'): self.trusty_juno,
('trusty', 'cloud:trusty-kilo'): self.trusty_kilo,
('trusty', 'cloud:trusty-liberty'): self.trusty_liberty,
('trusty', 'cloud:trusty-mitaka'): self.trusty_mitaka,
('utopic', None): self.utopic_juno,
('vivid', None): self.vivid_kilo,
('wily', None): self.wily_liberty}
('wily', None): self.wily_liberty,
('xenial', None): self.xenial_mitaka}
return releases[(self.series, self.openstack)]
def _get_openstack_release_string(self):
@ -196,6 +259,7 @@ class OpenStackAmuletDeployment(AmuletDeployment):
('utopic', 'juno'),
('vivid', 'kilo'),
('wily', 'liberty'),
('xenial', 'mitaka'),
])
if self.openstack:
os_origin = self.openstack.split(':')[1]

View File

@ -18,6 +18,7 @@ import amulet
import json
import logging
import os
import re
import six
import time
import urllib
@ -605,6 +606,21 @@ class OpenStackAmuletUtils(AmuletUtils):
return None
# rabbitmq/amqp specific helpers:
def rmq_wait_for_cluster(self, deployment, init_sleep=15, timeout=1200):
"""Wait for rmq units extended status to show cluster readiness,
after an optional initial sleep period. Initial sleep is likely
necessary to be effective following a config change, as status
message may not instantly update to non-ready."""
if init_sleep:
time.sleep(init_sleep)
message = re.compile('^Unit is ready and clustered$')
deployment._auto_wait_for_status(message=message,
timeout=timeout,
include_only=['rabbitmq-server'])
def add_rmq_test_user(self, sentry_units,
username="testuser1", password="changeme"):
"""Add a test user via the first rmq juju unit, check connection as
@ -805,7 +821,10 @@ class OpenStackAmuletUtils(AmuletUtils):
if port:
config['ssl_port'] = port
deployment.configure('rabbitmq-server', config)
deployment.d.configure('rabbitmq-server', config)
# Wait for unit status
self.rmq_wait_for_cluster(deployment)
# Confirm
tries = 0
@ -832,7 +851,10 @@ class OpenStackAmuletUtils(AmuletUtils):
# Disable RMQ SSL
config = {'ssl': 'off'}
deployment.configure('rabbitmq-server', config)
deployment.d.configure('rabbitmq-server', config)
# Wait for unit status
self.rmq_wait_for_cluster(deployment)
# Confirm
tries = 0