Merge "Upgrade OSDs one at a time when changing ownership"

This commit is contained in:
Jenkins 2017-04-05 20:55:07 +00:00 committed by Gerrit Code Review
commit 957ea0ad69
35 changed files with 1180 additions and 201 deletions

View File

@ -4,3 +4,4 @@ include:
- contrib.amulet
- contrib.openstack.amulet
- core
- osplatform

View File

@ -26,8 +26,9 @@ import ceph
from charmhelpers.core import hookenv
from charmhelpers.core.hookenv import (
log,
ERROR,
DEBUG,
ERROR,
INFO,
config,
relation_ids,
related_units,
@ -100,16 +101,27 @@ def check_for_upgrade():
'distro')
log('new_version: {}'.format(new_version))
if old_version == new_version:
# May be in a previous upgrade that was failed if the directories
# still need an ownership update. Check this condition.
resuming_upgrade = ceph.dirs_need_ownership_update('osd')
if old_version == new_version and not resuming_upgrade:
log("No new ceph version detected, skipping upgrade.", DEBUG)
return
if (old_version in ceph.UPGRADE_PATHS and
new_version == ceph.UPGRADE_PATHS[old_version]):
if (ceph.UPGRADE_PATHS.get(old_version) == new_version) or\
resuming_upgrade:
if old_version == new_version:
log('Attempting to resume possibly failed upgrade.',
INFO)
else:
log("{} to {} is a valid upgrade path. Proceeding.".format(
old_version, new_version))
emit_cephconf(upgrading=True)
ceph.roll_osd_cluster(new_version=new_version,
upgrade_key='osd-upgrade')
emit_cephconf(upgrading=False)
else:
# Log a helpful error message
log("Invalid upgrade path from {} to {}. "
@ -215,7 +227,14 @@ def use_short_objects():
return False
def get_ceph_context():
def get_ceph_context(upgrading=False):
"""Returns the current context dictionary for generating ceph.conf
:param upgrading: bool - determines if the context is invoked as
part of an upgrade procedure. Setting this to true
causes settings useful during an upgrade to be
defined in the ceph.conf file
"""
mon_hosts = get_mon_hosts()
log('Monitor hosts are ' + repr(mon_hosts))
@ -237,6 +256,7 @@ def get_ceph_context():
'loglevel': config('loglevel'),
'dio': str(config('use-direct-io')).lower(),
'short_object_len': use_short_objects(),
'upgrade_in_progress': upgrading,
}
if config('prefer-ipv6'):
@ -267,14 +287,15 @@ def get_ceph_context():
return cephcontext
def emit_cephconf():
def emit_cephconf(upgrading=False):
# Install ceph.conf as an alternative to support
# co-existence with other charms that write this file
charm_ceph_conf = "/var/lib/charm/{}/ceph.conf".format(service_name())
mkdir(os.path.dirname(charm_ceph_conf), owner=ceph.ceph_user(),
group=ceph.ceph_user())
with open(charm_ceph_conf, 'w') as cephconf:
cephconf.write(render_template('ceph.conf', get_ceph_context()))
context = get_ceph_context(upgrading)
cephconf.write(render_template('ceph.conf', context))
install_alternative('ceph.conf', '/etc/ceph/ceph.conf',
charm_ceph_conf, 90)

View File

@ -26,6 +26,7 @@ from charmhelpers.contrib.hardening.audits.file import (
DirectoryPermissionAudit,
NoReadWriteForOther,
TemplatedFile,
DeletedFile
)
from charmhelpers.contrib.hardening.audits.apache import DisabledModuleAudit
from charmhelpers.contrib.hardening.apache import TEMPLATES_DIR
@ -52,13 +53,13 @@ def get_audits():
'mods-available/alias.conf'),
context,
TEMPLATES_DIR,
mode=0o0755,
mode=0o0640,
user='root',
service_actions=[{'service': 'apache2',
'actions': ['restart']}]),
TemplatedFile(os.path.join(settings['common']['apache_dir'],
'conf-enabled/hardening.conf'),
'conf-enabled/99-hardening.conf'),
context,
TEMPLATES_DIR,
mode=0o0640,
@ -69,11 +70,13 @@ def get_audits():
DirectoryPermissionAudit(settings['common']['apache_dir'],
user='root',
group='root',
mode=0o640),
mode=0o0750),
DisabledModuleAudit(settings['hardening']['modules_to_disable']),
NoReadWriteForOther(settings['common']['apache_dir']),
DeletedFile(['/var/www/html/index.html'])
]
return audits
@ -94,5 +97,4 @@ class ApacheConfContext(object):
ctxt['apache_version'] = re.search(r'.+version: Apache/(.+?)\s.+',
out).group(1)
ctxt['apache_icondir'] = '/usr/share/apache2/icons/'
ctxt['traceenable'] = settings['hardening']['traceenable']
return ctxt

View File

@ -15,4 +15,18 @@
</LimitExcept>
</Location>
<Directory />
Options -Indexes -FollowSymLinks
AllowOverride None
</Directory>
<Directory /var/www/>
Options -Indexes -FollowSymLinks
AllowOverride None
</Directory>
TraceEnable {{ traceenable }}
ServerTokens {{ servertokens }}
SSLHonorCipherOrder {{ honor_cipher_order }}
SSLCipherSuite {{ cipher_suite }}

View File

@ -49,13 +49,6 @@ class BaseAudit(object): # NO-QA
# Invoke the callback if there is one.
if hasattr(self.unless, '__call__'):
results = self.unless()
if results:
return False
else:
return True
return not self.unless()
if self.unless:
return False
else:
return True
return not self.unless

View File

@ -11,3 +11,6 @@ hardening:
traceenable: 'off'
allowed_http_methods: "GET POST"
modules_to_disable: [ cgi, cgid ]
servertokens: 'Prod'
honor_cipher_order: 'on'
cipher_suite: 'ALL:+MEDIUM:+HIGH:!LOW:!MD5:!RC4:!eNULL:!aNULL:!3DES'

View File

@ -7,3 +7,6 @@ common:
hardening:
allowed_http_methods:
modules_to_disable:
servertokens:
honor_cipher_order:
cipher_suite:

View File

@ -58,6 +58,7 @@ security:
rsync
kernel_enable_module_loading: True # (type:boolean)
kernel_enable_core_dump: False # (type:boolean)
ssh_tmout: 300
sysctl:
kernel_secure_sysrq: 244 # 4 + 16 + 32 + 64 + 128

View File

@ -34,6 +34,7 @@ security:
packages_list:
kernel_enable_module_loading:
kernel_enable_core_dump:
ssh_tmout:
sysctl:
kernel_secure_sysrq:
kernel_enable_sysrq:

View File

@ -25,7 +25,6 @@ def get_audits():
audits = []
settings = utils.get_settings('os')
# If core dumps are not enabled, then don't allow core dumps to be
# created as they may contain sensitive information.
if not settings['security']['kernel_enable_core_dump']:
@ -33,11 +32,18 @@ def get_audits():
ProfileContext(),
template_dir=TEMPLATES_DIR,
mode=0o0755, user='root', group='root'))
if settings['security']['ssh_tmout']:
audits.append(TemplatedFile('/etc/profile.d/99-hardening.sh',
ProfileContext(),
template_dir=TEMPLATES_DIR,
mode=0o0644, user='root', group='root'))
return audits
class ProfileContext(object):
def __call__(self):
ctxt = {}
settings = utils.get_settings('os')
ctxt = {'ssh_tmout':
settings['security']['ssh_tmout']}
return ctxt

View File

@ -0,0 +1,5 @@
# Idle-session timeout; value comes from the 'os' hardening settings
# (security.ssh_tmout), which ProfileContext exposes as 'ssh_tmout'.
TMOUT={{ ssh_tmout }}
readonly TMOUT
export TMOUT
readonly HISTFILE

View File

@ -27,7 +27,10 @@ from charmhelpers.fetch import (
apt_install,
apt_update,
)
from charmhelpers.core.host import lsb_release
from charmhelpers.core.host import (
lsb_release,
CompareHostReleases,
)
from charmhelpers.contrib.hardening.audits.file import (
TemplatedFile,
FileContentAudit,
@ -68,7 +71,8 @@ class SSHConfigContext(object):
'weak': default + ',hmac-sha1'}
# Use newer ciphers on Ubuntu Trusty and above
if lsb_release()['DISTRIB_CODENAME'].lower() >= 'trusty':
_release = lsb_release()['DISTRIB_CODENAME'].lower()
if CompareHostReleases(_release) >= 'trusty':
log("Detected Ubuntu 14.04 or newer, using new macs", level=DEBUG)
macs = macs_66
@ -96,7 +100,8 @@ class SSHConfigContext(object):
'weak': weak}
# Use newer kex on Ubuntu Trusty and above
if lsb_release()['DISTRIB_CODENAME'].lower() >= 'trusty':
_release = lsb_release()['DISTRIB_CODENAME'].lower()
if CompareHostReleases(_release) >= 'trusty':
log('Detected Ubuntu 14.04 or newer, using new key exchange '
'algorithms', level=DEBUG)
kex = kex_66
@ -119,7 +124,8 @@ class SSHConfigContext(object):
'weak': default + ',aes256-cbc,aes192-cbc,aes128-cbc'}
# Use newer ciphers on ubuntu Trusty and above
if lsb_release()['DISTRIB_CODENAME'].lower() >= 'trusty':
_release = lsb_release()['DISTRIB_CODENAME'].lower()
if CompareHostReleases(_release) >= 'trusty':
log('Detected Ubuntu 14.04 or newer, using new ciphers',
level=DEBUG)
cipher = ciphers_66
@ -291,7 +297,8 @@ class SSHConfigFileContentAudit(FileContentAudit):
self.fail_cases = []
settings = utils.get_settings('ssh')
if lsb_release()['DISTRIB_CODENAME'].lower() >= 'trusty':
_release = lsb_release()['DISTRIB_CODENAME'].lower()
if CompareHostReleases(_release) >= 'trusty':
if not settings['server']['weak_hmac']:
self.pass_cases.append(r'^MACs.+,hmac-ripemd160$')
else:
@ -364,7 +371,8 @@ class SSHDConfigFileContentAudit(FileContentAudit):
self.fail_cases = []
settings = utils.get_settings('ssh')
if lsb_release()['DISTRIB_CODENAME'].lower() >= 'trusty':
_release = lsb_release()['DISTRIB_CODENAME'].lower()
if CompareHostReleases(_release) >= 'trusty':
if not settings['server']['weak_hmac']:
self.pass_cases.append(r'^MACs.+,hmac-ripemd160$')
else:

View File

@ -31,6 +31,7 @@ from charmhelpers.core.hookenv import (
from charmhelpers.core.host import (
lsb_release,
CompareHostReleases,
)
try:
@ -67,6 +68,24 @@ def no_ip_found_error_out(network):
raise ValueError(errmsg)
def _get_ipv6_network_from_address(address):
    """Build a netaddr.IPNetwork for the given IPv6 interface address.

    :param address: a dict as returned by netifaces.ifaddresses
    :returns netaddr.IPNetwork: the network for the address, or None when
        the address is link-local or the loopback address
    """
    addr = address['addr']
    # Link-local (fe80...) and loopback addresses carry no routable
    # network information, so they are explicitly excluded.
    if addr.startswith('fe80') or addr == "::1":
        return None

    # The netmask may be reported either as "<mask>/<prefixlen>" or as a
    # bare mask; prefer the prefix length when it is present.
    parts = address['netmask'].split("/")
    netmask = parts[1] if len(parts) > 1 else address['netmask']
    return netaddr.IPNetwork("%s/%s" % (addr, netmask))
def get_address_in_network(network, fallback=None, fatal=False):
"""Get an IPv4 or IPv6 address within the network from the host.
@ -100,10 +119,8 @@ def get_address_in_network(network, fallback=None, fatal=False):
if network.version == 6 and netifaces.AF_INET6 in addresses:
for addr in addresses[netifaces.AF_INET6]:
if not addr['addr'].startswith('fe80'):
cidr = netaddr.IPNetwork("%s/%s" % (addr['addr'],
addr['netmask']))
if cidr in network:
cidr = _get_ipv6_network_from_address(addr)
if cidr and cidr in network:
return str(cidr.ip)
if fallback is not None:
@ -180,9 +197,10 @@ def _get_for_address(address, key):
if address.version == 6 and netifaces.AF_INET6 in addresses:
for addr in addresses[netifaces.AF_INET6]:
if not addr['addr'].startswith('fe80'):
network = netaddr.IPNetwork("%s/%s" % (addr['addr'],
addr['netmask']))
network = _get_ipv6_network_from_address(addr)
if not network:
continue
cidr = network.cidr
if address in cidr:
if key == 'iface':
@ -191,7 +209,6 @@ def _get_for_address(address, key):
return str(cidr).split('/')[1]
else:
return addr[key]
return None
@ -521,7 +538,8 @@ def port_has_listener(address, port):
def assert_charm_supports_ipv6():
"""Check whether we are able to support charms ipv6."""
if lsb_release()['DISTRIB_CODENAME'].lower() < "trusty":
release = lsb_release()['DISTRIB_CODENAME'].lower()
if CompareHostReleases(release) < "trusty":
raise Exception("IPv6 is not supported in the charms for Ubuntu "
"versions less than Trusty 14.04")

View File

@ -59,6 +59,7 @@ from charmhelpers.core.host import (
write_file,
pwgen,
lsb_release,
CompareHostReleases,
)
from charmhelpers.contrib.hahelpers.cluster import (
determine_apache_port,
@ -155,7 +156,8 @@ class OSContextGenerator(object):
if self.missing_data:
self.complete = False
log('Missing required data: %s' % ' '.join(self.missing_data), level=INFO)
log('Missing required data: %s' % ' '.join(self.missing_data),
level=INFO)
else:
self.complete = True
return self.complete
@ -213,7 +215,8 @@ class SharedDBContext(OSContextGenerator):
hostname_key = "{}_hostname".format(self.relation_prefix)
else:
hostname_key = "hostname"
access_hostname = get_address_in_network(access_network,
access_hostname = get_address_in_network(
access_network,
unit_get('private-address'))
set_hostname = relation_get(attribute=hostname_key,
unit=local_unit())
@ -308,7 +311,10 @@ def db_ssl(rdata, ctxt, ssl_dir):
class IdentityServiceContext(OSContextGenerator):
def __init__(self, service=None, service_user=None, rel_name='identity-service'):
def __init__(self,
service=None,
service_user=None,
rel_name='identity-service'):
self.service = service
self.service_user = service_user
self.rel_name = rel_name
@ -457,19 +463,17 @@ class AMQPContext(OSContextGenerator):
host = format_ipv6_addr(host) or host
rabbitmq_hosts.append(host)
ctxt['rabbitmq_hosts'] = ','.join(sorted(rabbitmq_hosts))
rabbitmq_hosts = sorted(rabbitmq_hosts)
ctxt['rabbitmq_hosts'] = ','.join(rabbitmq_hosts)
transport_hosts = rabbitmq_hosts
if transport_hosts:
transport_url_hosts = ''
for host in transport_hosts:
if transport_url_hosts:
format_string = ",{}:{}@{}:{}"
else:
format_string = "{}:{}@{}:{}"
transport_url_hosts += format_string.format(
ctxt['rabbitmq_user'], ctxt['rabbitmq_password'],
host, rabbitmq_port)
transport_url_hosts = ','.join([
"{}:{}@{}:{}".format(ctxt['rabbitmq_user'],
ctxt['rabbitmq_password'],
host_,
rabbitmq_port)
for host_ in transport_hosts])
ctxt['transport_url'] = "rabbit://{}/{}".format(
transport_url_hosts, vhost)
@ -1601,7 +1605,8 @@ class MemcacheContext(OSContextGenerator):
if ctxt['use_memcache']:
# Trusty version of memcached does not support ::1 as a listen
# address so use host file entry instead
if lsb_release()['DISTRIB_CODENAME'].lower() > 'trusty':
release = lsb_release()['DISTRIB_CODENAME'].lower()
if CompareHostReleases(release) > 'trusty':
ctxt['memcache_server'] = '::1'
else:
ctxt['memcache_server'] = 'ip6-localhost'

View File

@ -23,7 +23,10 @@ from charmhelpers.core.hookenv import (
ERROR,
)
from charmhelpers.contrib.openstack.utils import os_release
from charmhelpers.contrib.openstack.utils import (
os_release,
CompareOpenStackReleases,
)
def headers_package():
@ -198,7 +201,8 @@ def neutron_plugins():
},
'plumgrid': {
'config': '/etc/neutron/plugins/plumgrid/plumgrid.ini',
'driver': 'neutron.plugins.plumgrid.plumgrid_plugin.plumgrid_plugin.NeutronPluginPLUMgridV2',
'driver': ('neutron.plugins.plumgrid.plumgrid_plugin'
'.plumgrid_plugin.NeutronPluginPLUMgridV2'),
'contexts': [
context.SharedDBContext(user=config('database-user'),
database=config('database'),
@ -225,7 +229,7 @@ def neutron_plugins():
'server_services': ['neutron-server']
}
}
if release >= 'icehouse':
if CompareOpenStackReleases(release) >= 'icehouse':
# NOTE: patch in ml2 plugin for icehouse onwards
plugins['ovs']['config'] = '/etc/neutron/plugins/ml2/ml2_conf.ini'
plugins['ovs']['driver'] = 'neutron.plugins.ml2.plugin.Ml2Plugin'
@ -233,10 +237,10 @@ def neutron_plugins():
'neutron-plugin-ml2']
# NOTE: patch in vmware renames nvp->nsx for icehouse onwards
plugins['nvp'] = plugins['nsx']
if release >= 'kilo':
if CompareOpenStackReleases(release) >= 'kilo':
plugins['midonet']['driver'] = (
'neutron.plugins.midonet.plugin.MidonetPluginV2')
if release >= 'liberty':
if CompareOpenStackReleases(release) >= 'liberty':
plugins['midonet']['driver'] = (
'midonet.neutron.plugin_v1.MidonetPluginV2')
plugins['midonet']['server_packages'].remove(
@ -244,10 +248,11 @@ def neutron_plugins():
plugins['midonet']['server_packages'].append(
'python-networking-midonet')
plugins['plumgrid']['driver'] = (
'networking_plumgrid.neutron.plugins.plugin.NeutronPluginPLUMgridV2')
'networking_plumgrid.neutron.plugins'
'.plugin.NeutronPluginPLUMgridV2')
plugins['plumgrid']['server_packages'].remove(
'neutron-plugin-plumgrid')
if release >= 'mitaka':
if CompareOpenStackReleases(release) >= 'mitaka':
plugins['nsx']['server_packages'].remove('neutron-plugin-vmware')
plugins['nsx']['server_packages'].append('python-vmware-nsx')
plugins['nsx']['config'] = '/etc/neutron/nsx.ini'

View File

@ -33,9 +33,7 @@ import yaml
from charmhelpers.contrib.network import ip
from charmhelpers.core import (
unitdata,
)
from charmhelpers.core import unitdata
from charmhelpers.core.hookenv import (
action_fail,
@ -55,6 +53,8 @@ from charmhelpers.core.hookenv import (
application_version_set,
)
from charmhelpers.core.strutils import BasicStringComparator
from charmhelpers.contrib.storage.linux.lvm import (
deactivate_lvm_volume_group,
is_lvm_physical_volume,
@ -97,6 +97,22 @@ CLOUD_ARCHIVE_KEY_ID = '5EDB1B62EC4926EA'
DISTRO_PROPOSED = ('deb http://archive.ubuntu.com/ubuntu/ %s-proposed '
'restricted main multiverse universe')
OPENSTACK_RELEASES = (
'diablo',
'essex',
'folsom',
'grizzly',
'havana',
'icehouse',
'juno',
'kilo',
'liberty',
'mitaka',
'newton',
'ocata',
'pike',
)
UBUNTU_OPENSTACK_RELEASE = OrderedDict([
('oneiric', 'diablo'),
('precise', 'essex'),
@ -238,6 +254,17 @@ GIT_DEFAULT_BRANCHES = {
DEFAULT_LOOPBACK_SIZE = '5G'
class CompareOpenStackReleases(BasicStringComparator):
    """Provide comparisons of OpenStack releases.

    Use in the form of

    if CompareOpenStackReleases(release) > 'mitaka':
        # do something for releases newer than mitaka
    """
    # Ordered tuple of OpenStack codenames (oldest first) drives the
    # comparison.
    _list = OPENSTACK_RELEASES
def error_out(msg):
juju_log("FATAL ERROR: %s" % msg, level='ERROR')
sys.exit(1)
@ -1066,7 +1093,8 @@ def git_generate_systemd_init_files(templates_dir):
shutil.copyfile(init_in_source, init_source)
with open(init_source, 'a') as outfile:
template = '/usr/share/openstack-pkg-tools/init-script-template'
template = ('/usr/share/openstack-pkg-tools/'
'init-script-template')
with open(template) as infile:
outfile.write('\n\n{}'.format(infile.read()))
@ -1971,9 +1999,7 @@ def enable_memcache(source=None, release=None, package=None):
if not _release:
_release = get_os_codename_install_source(source)
# TODO: this should be changed to a numeric comparison using a known list
# of releases and comparing by index.
return _release >= 'mitaka'
return CompareOpenStackReleases(_release) >= 'mitaka'
def token_cache_pkgs(source=None, release=None):

View File

@ -45,6 +45,7 @@ if __platform__ == "ubuntu":
add_new_group,
lsb_release,
cmp_pkgrevno,
CompareHostReleases,
) # flake8: noqa -- ignore F401 for this import
elif __platform__ == "centos":
from charmhelpers.core.host_factory.centos import (
@ -52,6 +53,7 @@ elif __platform__ == "centos":
add_new_group,
lsb_release,
cmp_pkgrevno,
CompareHostReleases,
) # flake8: noqa -- ignore F401 for this import
UPDATEDB_PATH = '/etc/updatedb.conf'

View File

@ -2,6 +2,22 @@ import subprocess
import yum
import os
from charmhelpers.core.strutils import BasicStringComparator
class CompareHostReleases(BasicStringComparator):
    """Provide comparisons of Host releases.

    Use in the form of

    if CompareHostReleases(release) > 'trusty':
        # do something for releases newer than trusty

    NOTE: release-name ordering is not defined for CentOS, so
    instantiating this class always raises.
    """

    def __init__(self, item):
        raise NotImplementedError(
            "CompareHostReleases() is not implemented for CentOS")
def service_available(service_name):
# """Determine whether a system service is available."""

View File

@ -1,5 +1,37 @@
import subprocess
from charmhelpers.core.strutils import BasicStringComparator
UBUNTU_RELEASES = (
'lucid',
'maverick',
'natty',
'oneiric',
'precise',
'quantal',
'raring',
'saucy',
'trusty',
'utopic',
'vivid',
'wily',
'xenial',
'yakkety',
'zesty',
)
class CompareHostReleases(BasicStringComparator):
    """Provide comparisons of Ubuntu releases.

    Use in the form of

    if CompareHostReleases(release) > 'trusty':
        # do something for releases newer than trusty
    """
    # Ordered tuple of Ubuntu codenames (oldest first) drives the
    # comparison.
    _list = UBUNTU_RELEASES
def service_available(service_name):
"""Determine whether a system service is available"""

View File

@ -68,3 +68,56 @@ def bytes_from_string(value):
msg = "Unable to interpret string value '%s' as bytes" % (value)
raise ValueError(msg)
return int(matches.group(1)) * (1024 ** BYTE_POWER[matches.group(2)])
class BasicStringComparator(object):
    """Compare strings by their position in an ordered sequence.

    Enables > and < comparisons on strings that are not alphanumerically
    ordered, e.g. OpenStack or Ubuntu release names after the z-wrap.
    Subclasses must set ``_list`` to the ordered sequence of valid items;
    ``str()`` of an instance gives back the original item.
    """

    _list = None

    def __init__(self, item):
        if self._list is None:
            raise Exception("Must define the _list in the class definition!")
        try:
            self.index = self._list.index(item)
        except Exception:
            raise KeyError("Item '{}' is not in list '{}'"
                           .format(item, self._list))

    def _rank(self, other):
        # Position of *other* in _list. Accepts a plain string or another
        # comparator instance: list.index() compares the stored strings
        # against *other* with ==, which falls back to the instance's
        # reflected __eq__, so instances resolve to their own index.
        assert isinstance(other, (str, self.__class__))
        return self._list.index(other)

    def __eq__(self, other):
        return self.index == self._rank(other)

    def __ne__(self, other):
        return not self.__eq__(other)

    def __lt__(self, other):
        return self.index < self._rank(other)

    def __ge__(self, other):
        return not self.__lt__(other)

    def __gt__(self, other):
        return self.index > self._rank(other)

    def __le__(self, other):
        return not self.__gt__(other)

    def __str__(self):
        """Return the item at the index so instances can be used directly
        in comparisons and printed as the underlying string.

        @returns: <string>
        """
        return self._list[self.index]

View File

@ -25,20 +25,28 @@ import errno
import shutil
import pyudev
from datetime import datetime
from charmhelpers.core import hookenv
from charmhelpers.core import templating
from charmhelpers.core.host import (
mkdir,
chownr,
service_restart,
cmp_pkgrevno,
lsb_release,
cmp_pkgrevno, service_stop, mounts, service_start)
mkdir,
mounts,
owner,
service_restart,
service_start,
service_stop)
from charmhelpers.core.hookenv import (
log,
ERROR,
cached,
config,
log,
status_set,
WARNING, DEBUG, config)
from charmhelpers.core.services import render_template
DEBUG,
ERROR,
WARNING)
from charmhelpers.fetch import (
apt_cache,
add_source, apt_install, apt_update)
@ -54,6 +62,12 @@ from charmhelpers.contrib.storage.linux.utils import (
from charmhelpers.contrib.openstack.utils import (
get_os_codename_install_source)
from ceph.ceph_helpers import check_output
CEPH_BASE_DIR = os.path.join(os.sep, 'var', 'lib', 'ceph')
OSD_BASE_DIR = os.path.join(CEPH_BASE_DIR, 'osd')
HDPARM_FILE = os.path.join(os.sep, 'etc', 'hdparm.conf')
LEADER = 'leader'
PEON = 'peon'
QUORUM = [LEADER, PEON]
@ -167,7 +181,7 @@ def tune_nic(network_interface):
try:
# Apply the settings
log("Applying sysctl settings", level=DEBUG)
subprocess.check_output(["sysctl", "-p", sysctl_file])
check_output(["sysctl", "-p", sysctl_file])
except subprocess.CalledProcessError as err:
log('sysctl -p {} failed with error {}'.format(sysctl_file,
err.output),
@ -218,14 +232,21 @@ def persist_settings(settings_dict):
The settings_dict should be in the form of {"uuid": {"key":"value"}}
:param settings_dict: dict of settings to save
"""
hdparm_path = os.path.join(os.sep, 'etc', 'hdparm.conf')
if not settings_dict:
return
try:
with open(hdparm_path, 'w') as hdparm:
hdparm.write(render_template('hdparm.conf', settings_dict))
templating.render(source='hdparm.conf', target=HDPARM_FILE,
context=settings_dict)
except IOError as err:
log("Unable to open {path} because of error: {error}".format(
path=hdparm_path,
error=err.message), level=ERROR)
path=HDPARM_FILE, error=err.message), level=ERROR)
except Exception as e:
# The templating.render can raise a jinja2 exception if the
# template is not found. Rather than polluting the import
# space of this charm, simply catch Exception
log('Unable to render {path} due to error: {error}'.format(
path=HDPARM_FILE, error=e.message), level=ERROR)
def set_max_sectors_kb(dev_name, max_sectors_size):
@ -299,7 +320,7 @@ def set_hdd_read_ahead(dev_name, read_ahead_sectors=256):
log('Setting read ahead to {} for device {}'.format(
read_ahead_sectors,
dev_name))
subprocess.check_output(['hdparm',
check_output(['hdparm',
'-a{}'.format(read_ahead_sectors),
dev_name])
except subprocess.CalledProcessError as e:
@ -314,7 +335,7 @@ def get_block_uuid(block_dev):
:return: The UUID of the device or None on Error.
"""
try:
block_info = subprocess.check_output(
block_info = check_output(
['blkid', '-o', 'export', block_dev])
for tag in block_info.split('\n'):
parts = tag.split('=')
@ -390,6 +411,7 @@ def tune_dev(block_dev):
if uuid is None:
log('block device {} uuid is None. Unable to save to '
'hdparm.conf'.format(block_dev), level=DEBUG)
return
save_settings_dict = {}
log('Tuning device {}'.format(block_dev))
status_set('maintenance', 'Tuning device {}'.format(block_dev))
@ -455,6 +477,33 @@ class CrushLocation(object):
return self.name < other.name
def get_osd_weight(osd_id):
    """
    Returns the weight of the specified OSD
    :return: Float :raise: ValueError if the monmap fails to parse.
      Also raises CalledProcessError if our ceph command fails
    """
    # Ask ceph for the full OSD tree; a failing command is logged and
    # re-raised for the caller to handle.
    try:
        raw_tree = check_output(
            ['ceph', 'osd', 'tree', '--format=json'])
    except subprocess.CalledProcessError as e:
        log("ceph osd tree command failed with message: {}".format(
            e.message))
        raise
    try:
        parsed = json.loads(raw_tree)
        # Nothing to search when the node list is empty.
        if not parsed['nodes']:
            return None
        for node in parsed['nodes']:
            if node['type'] == 'osd' and node['name'] == osd_id:
                return node['crush_weight']
    except ValueError as v:
        log("Unable to parse ceph tree json: {}. Error: {}".format(
            raw_tree, v.message))
        raise
def get_osd_tree(service):
"""
Returns the current osd map in JSON.
@ -462,7 +511,7 @@ def get_osd_tree(service):
Also raises CalledProcessError if our ceph command fails
"""
try:
tree = subprocess.check_output(
tree = check_output(
['ceph', '--id', service,
'osd', 'tree', '--format=json'])
try:
@ -497,6 +546,43 @@ def get_osd_tree(service):
raise
def _get_child_dirs(path):
"""Returns a list of directory names in the specified path.
:param path: a full path listing of the parent directory to return child
directory names
:return: list. A list of child directories under the parent directory
:raises: ValueError if the specified path does not exist or is not a
directory,
OSError if an error occurs reading the directory listing
"""
if not os.path.exists(path):
raise ValueError('Specfied path "%s" does not exist' % path)
if not os.path.isdir(path):
raise ValueError('Specified path "%s" is not a directory' % path)
files_in_dir = [os.path.join(path, f) for f in os.listdir(path)]
return list(filter(os.path.isdir, files_in_dir))
def _get_osd_num_from_dirname(dirname):
"""Parses the dirname and returns the OSD id.
Parses a string in the form of 'ceph-{osd#}' and returns the osd number
from the directory name.
:param dirname: the directory name to return the OSD number from
:return int: the osd number the directory name corresponds to
:raises ValueError: if the osd number cannot be parsed from the provided
directory name.
"""
match = re.search('ceph-(?P<osd_id>\d+)', dirname)
if not match:
raise ValueError("dirname not in correct format: %s" % dirname)
return match.group('osd_id')
def get_local_osd_ids():
"""
This will list the /var/lib/ceph/osd/* directories and try
@ -602,7 +688,7 @@ def is_quorum():
]
if os.path.exists(asok):
try:
result = json.loads(subprocess.check_output(cmd))
result = json.loads(check_output(cmd))
except subprocess.CalledProcessError:
return False
except ValueError:
@ -629,7 +715,7 @@ def is_leader():
]
if os.path.exists(asok):
try:
result = json.loads(subprocess.check_output(cmd))
result = json.loads(check_output(cmd))
except subprocess.CalledProcessError:
return False
except ValueError:
@ -736,7 +822,7 @@ def replace_osd(dead_osd_number,
# Drop this osd out of the cluster. This will begin a
# rebalance operation
status_set('maintenance', 'Removing osd {}'.format(dead_osd_number))
subprocess.check_output([
check_output([
'ceph',
'--id',
'osd-upgrade',
@ -747,8 +833,8 @@ def replace_osd(dead_osd_number,
if systemd():
service_stop('ceph-osd@{}'.format(dead_osd_number))
else:
subprocess.check_output(['stop', 'ceph-osd', 'id={}'.format(
dead_osd_number)]),
check_output(['stop', 'ceph-osd', 'id={}'.format(
dead_osd_number)])
# umount if still mounted
ret = umount(mount_point)
if ret < 0:
@ -756,20 +842,20 @@ def replace_osd(dead_osd_number,
mount_point, os.strerror(ret)))
# Clean up the old mount point
shutil.rmtree(mount_point)
subprocess.check_output([
check_output([
'ceph',
'--id',
'osd-upgrade',
'osd', 'crush', 'remove',
'osd.{}'.format(dead_osd_number)])
# Revoke the OSDs access keys
subprocess.check_output([
check_output([
'ceph',
'--id',
'osd-upgrade',
'auth', 'del',
'osd.{}'.format(dead_osd_number)])
subprocess.check_output([
check_output([
'ceph',
'--id',
'osd-upgrade',
@ -788,7 +874,7 @@ def replace_osd(dead_osd_number,
def is_osd_disk(dev):
try:
info = subprocess.check_output(['sgdisk', '-i', '1', dev])
info = check_output(['sgdisk', '-i', '1', dev])
info = info.split("\n") # IGNORE:E1103
for line in info:
for ptype in CEPH_PARTITIONS:
@ -869,7 +955,7 @@ def generate_monitor_secret():
'--name=mon.',
'--gen-key'
]
res = subprocess.check_output(cmd)
res = check_output(cmd)
return "{}==".format(res.split('=')[1].strip())
@ -901,7 +987,7 @@ def parse_key(raw_key):
else:
for element in raw_key.splitlines():
if 'key' in element:
key = element.split(' = ')[1].strip() # IGNORE:E1103
return element.split(' = ')[1].strip() # IGNORE:E1103
return key
@ -1017,8 +1103,8 @@ def create_named_keyring(entity, name, caps=None):
]
for subsystem, subcaps in caps.items():
cmd.extend([subsystem, '; '.join(subcaps)])
log("Calling subprocess.check_output: {}".format(cmd), level=DEBUG)
return parse_key(subprocess.check_output(cmd).strip()) # IGNORE:E1103
log("Calling check_output: {}".format(cmd), level=DEBUG)
return parse_key(check_output(cmd).strip()) # IGNORE:E1103
def get_upgrade_key():
@ -1033,6 +1119,26 @@ def get_named_key(name, caps=None, pool_list=None):
:param caps: dict of cephx capabilities
:return: Returns a cephx key
"""
try:
# Does the key already exist?
output = check_output(
[
'sudo',
'-u', ceph_user(),
'ceph',
'--name', 'mon.',
'--keyring',
'/var/lib/ceph/mon/ceph-{}/keyring'.format(
socket.gethostname()
),
'auth',
'get',
'client.{}'.format(name),
]).strip()
return parse_key(output)
except subprocess.CalledProcessError:
# Couldn't get the key, time to create it!
log("Creating new key for {}".format(name), level=DEBUG)
caps = caps or _default_caps
cmd = [
"sudo",
@ -1055,8 +1161,8 @@ def get_named_key(name, caps=None, pool_list=None):
pools = " ".join(['pool={0}'.format(i) for i in pool_list])
subcaps[0] = subcaps[0] + " " + pools
cmd.extend([subsystem, '; '.join(subcaps)])
log("Calling subprocess.check_output: {}".format(cmd), level=DEBUG)
return parse_key(subprocess.check_output(cmd).strip()) # IGNORE:E1103
log("Calling check_output: {}".format(cmd), level=DEBUG)
return parse_key(check_output(cmd).strip()) # IGNORE:E1103
def upgrade_key_caps(key, caps):
@ -1148,7 +1254,7 @@ def maybe_zap_journal(journal_dev):
def get_partitions(dev):
cmd = ['partx', '--raw', '--noheadings', dev]
try:
out = subprocess.check_output(cmd).splitlines()
out = check_output(cmd).splitlines()
log("get partitions: {}".format(out), level=DEBUG)
return out
except subprocess.CalledProcessError as e:
@ -1216,12 +1322,12 @@ def osdize_dev(dev, osd_format, osd_journal, reformat_osd=False,
try:
log("osdize cmd: {}".format(cmd))
subprocess.check_call(cmd)
except subprocess.CalledProcessError as e:
except subprocess.CalledProcessError:
if ignore_errors:
log('Unable to initialize device: {}'.format(dev), WARNING)
else:
log('Unable to initialize device: {}'.format(dev), ERROR)
raise e
raise
def osdize_dir(path, encrypt=False):
@ -1258,7 +1364,7 @@ def get_running_osds():
"""Returns a list of the pids of the current running OSD daemons"""
cmd = ['pgrep', 'ceph-osd']
try:
result = subprocess.check_output(cmd)
result = check_output(cmd)
return result.split()
except subprocess.CalledProcessError:
return []
@ -1274,7 +1380,7 @@ def get_cephfs(service):
# This command wasn't introduced until 0.86 ceph
return []
try:
output = subprocess.check_output(["ceph",
output = check_output(["ceph",
'--id', service,
"fs", "ls"])
if not output:
@ -1403,10 +1509,17 @@ def upgrade_monitor(new_version):
service_stop('ceph-mon-all')
apt_install(packages=PACKAGES, fatal=True)
# Ensure the files and directories under /var/lib/ceph is chowned
# properly as part of the move to the Jewel release, which moved the
# ceph daemons to running as ceph:ceph instead of root:root.
if new_version == 'jewel':
# Ensure the ownership of Ceph's directories is correct
owner = ceph_user()
chownr(path=os.path.join(os.sep, "var", "lib", "ceph"),
owner=ceph_user(),
group=ceph_user())
owner=owner,
group=owner,
follow_links=True)
if systemd():
for mon_id in get_local_mon_ids():
service_start('ceph-mon@{}'.format(mon_id))
@ -1447,6 +1560,7 @@ def lock_and_roll(upgrade_key, service, my_name, version):
my_name,
version,
stop_timestamp))
status_set('maintenance', 'Finishing upgrade')
monitor_key_set(upgrade_key, "{}_{}_{}_done".format(service,
my_name,
version),
@ -1569,34 +1683,198 @@ def upgrade_osd(new_version):
add_source(config('source'), config('key'))
apt_update(fatal=True)
except subprocess.CalledProcessError as err:
log("Adding the ceph source failed with message: {}".format(
log("Adding the ceph sources failed with message: {}".format(
err.message))
status_set("blocked", "Upgrade to {} failed".format(new_version))
sys.exit(1)
try:
if systemd():
for osd_id in get_local_osd_ids():
service_stop('ceph-osd@{}'.format(osd_id))
else:
service_stop('ceph-osd-all')
# Upgrade the packages before restarting the daemons.
status_set('maintenance', 'Upgrading packages to %s' % new_version)
apt_install(packages=PACKAGES, fatal=True)
# Ensure the ownership of Ceph's directories is correct
chownr(path=os.path.join(os.sep, "var", "lib", "ceph"),
owner=ceph_user(),
group=ceph_user())
if systemd():
for osd_id in get_local_osd_ids():
service_start('ceph-osd@{}'.format(osd_id))
else:
service_start('ceph-osd-all')
except subprocess.CalledProcessError as err:
# If the upgrade does not need an ownership update of any of the
# directories in the osd service directory, then simply restart
# all of the OSDs at the same time as this will be the fastest
# way to update the code on the node.
if not dirs_need_ownership_update('osd'):
log('Restarting all OSDs to load new binaries', DEBUG)
service_restart('ceph-osd-all')
return
# Need to change the ownership of all directories which are not OSD
# directories as well.
# TODO - this should probably be moved to the general upgrade function
# and done before mon/osd.
update_owner(CEPH_BASE_DIR, recurse_dirs=False)
non_osd_dirs = filter(lambda x: not x == 'osd',
os.listdir(CEPH_BASE_DIR))
non_osd_dirs = map(lambda x: os.path.join(CEPH_BASE_DIR, x),
non_osd_dirs)
for path in non_osd_dirs:
update_owner(path)
# Fast service restart wasn't an option because each of the OSD
# directories need the ownership updated for all the files on
# the OSD. Walk through the OSDs one-by-one upgrading the OSD.
for osd_dir in _get_child_dirs(OSD_BASE_DIR):
try:
osd_num = _get_osd_num_from_dirname(osd_dir)
_upgrade_single_osd(osd_num, osd_dir)
except ValueError as ex:
# Directory could not be parsed - junk directory?
log('Could not parse osd directory %s: %s' % (osd_dir, ex),
WARNING)
continue
except (subprocess.CalledProcessError, IOError) as err:
log("Stopping ceph and upgrading packages failed "
"with message: {}".format(err.message))
status_set("blocked", "Upgrade to {} failed".format(new_version))
sys.exit(1)
def _upgrade_single_osd(osd_num, osd_dir):
    """Upgrades the single OSD directory.

    :param osd_num: the num of the OSD
    :param osd_dir: the directory of the OSD to upgrade
    :raises CalledProcessError: if an error occurs in a command issued as part
                                of the upgrade process
    :raises IOError: if an error occurs reading/writing to a file as part
                     of the upgrade process
    """
    # Take the OSD fully offline before changing ownership, then bring it
    # back in the reverse order. The order of these steps matters.
    steps = (
        (stop_osd, osd_num),
        (disable_osd, osd_num),
        (update_owner, osd_dir),
        (enable_osd, osd_num),
        (start_osd, osd_num),
    )
    for action, argument in steps:
        action(argument)
def stop_osd(osd_num):
    """Stops the specified OSD number.

    :param osd_num: the osd number to stop
    """
    if not systemd():
        # The upstart job takes the osd id as a job parameter.
        service_stop('ceph-osd', id=osd_num)
    else:
        # systemd uses a templated unit per OSD.
        service_stop('ceph-osd@{}'.format(osd_num))
def start_osd(osd_num):
    """Starts the specified OSD number.

    :param osd_num: the osd number to start.
    """
    if not systemd():
        # The upstart job takes the osd id as a job parameter.
        service_start('ceph-osd', id=osd_num)
    else:
        # systemd uses a templated unit per OSD.
        service_start('ceph-osd@{}'.format(osd_num))
def disable_osd(osd_num):
    """Disables the specified OSD number.

    Ensures that the specified osd will not be automatically started at the
    next reboot of the system. Due to differences between init systems,
    this method cannot make any guarantees that the specified osd cannot be
    started manually.

    :param osd_num: the osd id which should be disabled.
    :raises CalledProcessError: if an error occurs invoking the systemd cmd
                                to disable the OSD
    :raises IOError, OSError: if the attempt to read/remove the ready file in
                              an upstart enabled system fails
    """
    if systemd():
        # Under systemd each ceph-osd daemon runs as a templated unit that
        # can be addressed as ceph-osd@<osd_num> and disabled with
        # 'systemctl disable'. Once disabled, the OSD stays disabled until
        # re-enabled. Disabling an already-disabled unit returns 0, so there
        # is no need to check the current state first.
        subprocess.check_call(
            ['systemctl', 'disable', 'ceph-osd@{}'.format(osd_num)])
    else:
        # Upstart offers no per-OSD disable mechanism and the OSD can still
        # be started manually; however the ceph-osd-all upstart script only
        # auto-starts an OSD whose root directory contains a 'ready' file,
        # so removing that file prevents the OSD from starting on reboot.
        ready_file = os.path.join(OSD_BASE_DIR, 'ceph-{}'.format(osd_num),
                                  'ready')
        if os.path.exists(ready_file):
            os.unlink(ready_file)
def enable_osd(osd_num):
    """Enables the specified OSD number.

    Ensures that the specified osd_num will be enabled and ready to start
    automatically in the event of a reboot.

    :param osd_num: the osd id which should be enabled.
    :raises CalledProcessError: if the call to the systemd command issued
                                fails when enabling the service
    :raises IOError: if the attempt to write the ready file in an upstart
                     enabled system fails
    """
    if not systemd():
        # Under upstart, the ceph-osd-all script only starts an OSD whose
        # root directory contains a 'ready' file, so make sure it exists.
        ready_file = os.path.join(OSD_BASE_DIR, 'ceph-{}'.format(osd_num),
                                  'ready')
        with open(ready_file, 'w') as f:
            f.write('ready')

        # The upstart script runs with root privileges so correcting the
        # owner shouldn't strictly be necessary, but it keeps the ownership
        # of all files in the tree consistent.
        update_owner(ready_file)
    else:
        subprocess.check_call(
            ['systemctl', 'enable', 'ceph-osd@{}'.format(osd_num)])
def update_owner(path, recurse_dirs=True):
    """Changes the ownership of the specified path.

    Changes the ownership of the specified path to the new ceph daemon user
    using the system's native chown functionality. This may take awhile,
    so this method will issue a set_status for any changes of ownership which
    recurses into directory structures.

    :param path: the path to recursively change ownership for
    :param recurse_dirs: boolean indicating whether to recursively change the
                         ownership of all the files in a path's subtree or to
                         simply change the ownership of the path.
    :raises CalledProcessError: if an error occurs issuing the chown system
                                command
    """
    user = ceph_user()
    user_group = '{ceph_user}:{ceph_user}'.format(ceph_user=user)
    recursive = os.path.isdir(path) and recurse_dirs
    if recursive:
        # Recursive chowns can take a long time; surface progress to juju.
        status_set('maintenance', ('Updating ownership of %s to %s' %
                                   (path, user)))
    cmd = ['chown'] + (['-R'] if recursive else []) + [user_group, path]

    log('Changing ownership of {path} to {user}'.format(
        path=path, user=user_group), DEBUG)
    chown_started = datetime.now()
    subprocess.check_call(cmd)
    chown_duration = datetime.now() - chown_started

    log('Took {secs} seconds to change the ownership of path: {path}'.format(
        secs=chown_duration.total_seconds(), path=path), DEBUG)
def list_pools(service):
"""
This will list the current pools that Ceph has
@ -1607,7 +1885,7 @@ def list_pools(service):
"""
try:
pool_list = []
pools = subprocess.check_output(['rados', '--id', service, 'lspools'])
pools = check_output(['rados', '--id', service, 'lspools'])
for pool in pools.splitlines():
pool_list.append(pool)
return pool_list
@ -1616,6 +1894,36 @@ def list_pools(service):
raise
def dirs_need_ownership_update(service):
    """Determines if directories still need change of ownership.

    Examines the set of directories under the /var/lib/ceph/{service}
    directory and determines if they have the correct ownership or not. This
    is necessary due to the upgrade from Hammer to Jewel where the daemon
    user changes from root: to ceph:.

    :param service: the name of the service folder to check (e.g. osd, mon)
    :return: boolean. True if the directories need a change of ownership,
             False otherwise.
    :raises IOError: if an error occurs reading the file stats from one of
                     the child directories.
    :raises OSError: if the specified path does not exist or some other error
    """
    expected = ceph_user()
    service_path = os.path.join(CEPH_BASE_DIR, service)
    for child in _get_child_dirs(service_path):
        # owner() yields an (owner, group) pair; both must match ceph_user.
        if owner(child) == (expected, expected):
            continue
        log('Directory "%s" needs its ownership updated' % child, DEBUG)
        return True

    # All child directories had the expected ownership
    return False
# A dict of valid ceph upgrade paths. Mapping is old -> new
UPGRADE_PATHS = {
'firefly': 'hammer',

View File

@ -28,12 +28,14 @@ from ceph import (
get_cephfs,
get_osd_weight
)
from ceph_helpers import Crushmap
from ceph.ceph_helpers import Crushmap
from charmhelpers.contrib.storage.linux.ceph import (
create_erasure_profile,
delete_pool,
erasure_profile_exists,
get_osds,
monitor_key_get,
monitor_key_set,
pool_exists,
pool_set,
remove_pool_snapshot,
@ -49,7 +51,7 @@ from charmhelpers.contrib.storage.linux.ceph import (
# This comes from http://docs.ceph.com/docs/master/rados/operations/pools/
# This should do a decent job of preventing people from passing in bad values.
# It will give a useful error message
from subprocess import check_output, CalledProcessError
from subprocess import check_call, check_output, CalledProcessError
POOL_KEYS = {
# "Ceph Key Name": [Python type, [Valid Range]]
@ -157,11 +159,192 @@ def handle_create_erasure_profile(request, service):
data_chunks=k, coding_chunks=m, locality=l)
def handle_add_permissions_to_key(request, service):
    """Groups are defined by the key cephx.groups.(namespace-)?-(name). This
    key will contain a dict serialized to JSON with data about the group,
    including pools and members.

    A group can optionally have a namespace defined that will be used to
    further restrict pool access.

    :param request: dict broker request; reads 'name', 'group',
                    'group-namespace' and 'group-permission' keys.
    :param service: the cephx service (client) name issuing the request.
    """
    service_name = request.get('name')
    group_name = request.get('group')
    group_namespace = request.get('group-namespace')
    if group_namespace:
        group_name = "{}-{}".format(group_namespace, group_name)
    group = get_group(group_name=group_name)
    service_obj = get_service_groups(service=service_name,
                                     namespace=group_namespace)
    # Bug fix: this previously called the builtin format(), which built the
    # message and silently discarded it; log() was clearly intended.
    log("Service object: {}".format(service_obj), DEBUG)
    # Default to full access within the group unless the caller narrows it.
    permission = request.get('group-permission') or "rwx"
    if service_name not in group['services']:
        group['services'].append(service_name)
    save_group(group=group, group_name=group_name)
    if permission not in service_obj['group_names']:
        service_obj['group_names'][permission] = []
    if group_name not in service_obj['group_names'][permission]:
        service_obj['group_names'][permission].append(group_name)
    save_service(service=service_obj, service_name=service_name)
    service_obj['groups'] = _build_service_groups(service_obj,
                                                  group_namespace)
    update_service_permissions(service_name, service_obj, group_namespace)
def update_service_permissions(service, service_obj=None, namespace=None):
    """Update the key permissions for the named client in Ceph"""
    if not service_obj:
        # Look the service up if the caller didn't hand us the object.
        service_obj = get_service_groups(service=service, namespace=namespace)
    capabilities = pool_permission_list_for_service(service_obj)
    cmd = ['ceph', 'auth', 'caps', 'client.{}'.format(service)] + capabilities
    try:
        check_call(cmd)
    except CalledProcessError as e:
        log("Error updating key capabilities: {}".format(e))
def add_pool_to_group(pool, group, namespace=None):
    """Add a named pool to a named group"""
    # Namespaced groups are stored under a '<namespace>-<group>' key.
    group_name = "{}-{}".format(namespace, group) if namespace else group
    group = get_group(group_name=group_name)
    if pool not in group['pools']:
        group["pools"].append(pool)
    save_group(group, group_name=group_name)
    # Refresh the cephx caps of every service that belongs to the group.
    for service in group['services']:
        update_service_permissions(service, namespace=namespace)
def pool_permission_list_for_service(service):
    """Build the permission string for Ceph for a given service"""
    # Collect every group name under its permission ('r', 'rwx', ...).
    permission_types = {}
    for permission, names in service["group_names"].items():
        permission_types.setdefault(permission, []).extend(names)

    # Expand each (permission, group) pair into per-pool osd capabilities.
    permissions = []
    for permission, names in permission_types.items():
        osd_cap = "allow {}".format(permission)
        for name in names:
            for pool in service['groups'][name]['pools']:
                permissions.append("{} pool={}".format(osd_cap, pool))
    return ["mon", "allow r", "osd", ', '.join(permissions)]
def get_service_groups(service, namespace=None):
    """
    Services are objects stored with some metadata, they look like (for a
    service named "nova"):
    {
        group_names: {'rwx': ['images']},
        groups: {}
    }
    After populating the group, it looks like:
    {
        group_names: {'rwx': ['images']},
        groups: {
            'images': {
                pools: ['glance'],
                services: ['nova']
            }
        }
    }
    """
    service_json = monitor_key_get(service='admin',
                                   key="cephx.services.{}".format(service))
    try:
        service = json.loads(service_json)
    except (TypeError, ValueError):
        # Key missing (None) or not valid JSON: treat as an unknown service.
        service = None

    if not service:
        return {'group_names': {}, 'groups': {}}
    service['groups'] = _build_service_groups(service, namespace)
    return service
def _build_service_groups(service, namespace=None):
    '''Rebuild the 'groups' dict for a service group

    :returns: dict: dictionary keyed by group name of the following
                    format:

                    {
                        'images': {
                            pools: ['glance'],
                            services: ['nova', 'glance]
                        },
                        'vms':{
                            pools: ['nova'],
                            services: ['nova']
                        }
                    }
    '''
    def _stored_name(group):
        # Namespaced groups are stored under a '<namespace>-<group>' key.
        return "{}-{}".format(namespace, group) if namespace else group

    return {
        group: get_group(group_name=_stored_name(group))
        for groups in service['group_names'].values()
        for group in groups
    }
def get_group(group_name):
    """
    A group is a structure to hold data about a named group, structured as:
    {
        pools: ['glance'],
        services: ['nova']
    }
    """
    group_json = monitor_key_get(service='admin',
                                 key=get_group_key(group_name=group_name))
    try:
        group = json.loads(group_json)
    except (TypeError, ValueError):
        # Key missing (None) or not valid JSON: fall through to the default.
        group = None
    # An absent or empty group becomes a fresh, empty structure.
    return group if group else {'pools': [], 'services': []}
def save_service(service_name, service):
    """Persist a service in the monitor cluster"""
    # The expanded 'groups' mapping is derived data; never persist it.
    service['groups'] = {}
    service_key = "cephx.services.{}".format(service_name)
    return monitor_key_set(service='admin',
                           key=service_key,
                           value=json.dumps(service))
def save_group(group, group_name):
    """Persist a group in the monitor cluster"""
    return monitor_key_set(service='admin',
                           key=get_group_key(group_name=group_name),
                           value=json.dumps(group))
def get_group_key(group_name):
    """Build the monitor config-key under which a named group is stored."""
    return 'cephx.groups.' + group_name
def handle_erasure_pool(request, service):
pool_name = request.get('name')
erasure_profile = request.get('erasure-profile')
quota = request.get('max-bytes')
weight = request.get('weight')
group_name = request.get('group')
if erasure_profile is None:
erasure_profile = "default-canonical"
@ -172,6 +355,13 @@ def handle_erasure_pool(request, service):
log(msg, level=ERROR)
return {'exit-code': 1, 'stderr': msg}
if group_name:
group_namespace = request.get('group-namespace')
# Add the pool to the group named "group_name"
add_pool_to_group(pool=pool_name,
group=group_name,
namespace=group_namespace)
# TODO: Default to 3/2 erasure coding. I believe this requires min 5 osds
if not erasure_profile_exists(service=service, name=erasure_profile):
# TODO: Fail and tell them to create the profile or default
@ -200,6 +390,7 @@ def handle_replicated_pool(request, service):
replicas = request.get('replicas')
quota = request.get('max-bytes')
weight = request.get('weight')
group_name = request.get('group')
# Optional params
pg_num = request.get('pg_num')
@ -215,6 +406,13 @@ def handle_replicated_pool(request, service):
log(msg, level=ERROR)
return {'exit-code': 1, 'stderr': msg}
if group_name:
group_namespace = request.get('group-namespace')
# Add the pool to the group named "group_name"
add_pool_to_group(pool=pool_name,
group=group_name,
namespace=group_namespace)
kwargs = {}
if pg_num:
kwargs['pg_num'] = pg_num
@ -570,6 +768,8 @@ def process_requests_v1(reqs):
ret = handle_rgw_create_user(request=req, service=svc)
elif op == "move-osd-to-bucket":
ret = handle_put_osd_in_bucket(request=req, service=svc)
elif op == "add-permissions-to-key":
ret = handle_add_permissions_to_key(request=req, service=svc)
else:
msg = "Unknown operation '%s'" % op
log(msg, level=ERROR)

View File

@ -36,7 +36,11 @@ import uuid
import re
import subprocess
from subprocess import (check_call, check_output, CalledProcessError, )
from subprocess import (
check_call,
check_output as s_check_output,
CalledProcessError,
)
from charmhelpers.core.hookenv import (config,
local_unit,
relation_get,
@ -111,6 +115,15 @@ DEFAULT_POOL_WEIGHT = 10.0
LEGACY_PG_COUNT = 200
def check_output(*args, **kwargs):
    '''
    Helper wrapper for py2/3 compat with subprocess.check_output

    @returns str: UTF-8 decoded representation of output
    '''
    raw_output = s_check_output(*args, **kwargs)
    # On py3 check_output returns bytes; decode so callers always get str.
    return raw_output.decode('UTF-8')
def validator(value, valid_type, valid_range=None):
"""
Used to validate these: http://docs.ceph.com/docs/master/rados/operations/
@ -188,7 +201,7 @@ class Crushmap(object):
stdout=subprocess.PIPE)
return subprocess.check_output(
('crushtool', '-d', '-'),
stdin=crush.stdout).decode('utf-8')
stdin=crush.stdout)
except Exception as e:
log("load_crushmap error: {}".format(e))
raise "Failed to read Crushmap"
@ -565,7 +578,8 @@ def monitor_key_delete(service, key):
:param key: six.string_types. The key to delete.
"""
try:
check_output(['ceph', '--id', service, 'config-key', 'del', str(key)])
check_output(['ceph', '--id', service,
'config-key', 'del', str(key)])
except CalledProcessError as e:
log("Monitor config-key put failed with message: {}".format(e.output))
raise
@ -867,8 +881,7 @@ def get_cache_mode(service, pool_name):
def pool_exists(service, name):
"""Check to see if a RADOS pool already exists."""
try:
out = check_output(['rados', '--id', service, 'lspools']).decode(
'UTF-8')
out = check_output(['rados', '--id', service, 'lspools'])
except CalledProcessError:
return False
@ -882,7 +895,7 @@ def get_osds(service):
version = ceph_version()
if version and version >= '0.56':
return json.loads(check_output(['ceph', '--id', service, 'osd', 'ls',
'--format=json']).decode('UTF-8'))
'--format=json']))
return None
@ -900,7 +913,7 @@ def rbd_exists(service, pool, rbd_img):
"""Check to see if a RADOS block device exists."""
try:
out = check_output(['rbd', 'list', '--id', service, '--pool', pool
]).decode('UTF-8')
])
except CalledProcessError:
return False
@ -1025,7 +1038,7 @@ def configure(service, key, auth, use_syslog):
def image_mapped(name):
"""Determine whether a RADOS block device is mapped locally."""
try:
out = check_output(['rbd', 'showmapped']).decode('UTF-8')
out = check_output(['rbd', 'showmapped'])
except CalledProcessError:
return False
@ -1212,7 +1225,7 @@ def ceph_version():
"""Retrieve the local version of ceph."""
if os.path.exists('/usr/bin/ceph'):
cmd = ['ceph', '-v']
output = check_output(cmd).decode('US-ASCII')
output = check_output(cmd)
output = output.split()
if len(output) > 3:
return output[2]

85
lib/setup.py Normal file
View File

@ -0,0 +1,85 @@
# -*- coding: utf-8 -*-
from __future__ import print_function
import os
import sys
from setuptools import setup, find_packages
from setuptools.command.test import test as TestCommand
version = "0.0.1.dev1"
install_require = [
]
tests_require = [
'tox >= 2.3.1',
]
class Tox(TestCommand):
    """setuptools 'test' command that delegates the run to tox.

    Invoked via ``python setup.py test``; extra tox arguments may be
    passed with ``--tox-args``/``-a``.
    """

    # (long name, short flag, help text) triples consumed by setuptools.
    user_options = [('tox-args=', 'a', "Arguments to pass to tox")]

    def initialize_options(self):
        """Set the default (empty) value for the --tox-args option."""
        TestCommand.initialize_options(self)
        self.tox_args = None

    def finalize_options(self):
        """Tell setuptools there is a test suite but no discovered args."""
        TestCommand.finalize_options(self)
        self.test_args = []
        self.test_suite = True

    def run_tests(self):
        """Run tox with any user-supplied arguments and exit with its code."""
        # import here, cause outside the eggs aren't loaded
        import tox
        import shlex
        args = self.tox_args
        # remove the 'test' arg from argv as tox passes it to ostestr which
        # breaks it.
        sys.argv.pop()
        if args:
            args = shlex.split(self.tox_args)
        errno = tox.cmdline(args=args)
        sys.exit(errno)
if sys.argv[-1] == 'publish':
os.system("python setup.py sdist upload")
os.system("python setup.py bdist_wheel upload")
sys.exit()
if sys.argv[-1] == 'tag':
os.system("git tag -a %s -m 'version %s'" % (version, version))
os.system("git push --tags")
sys.exit()
setup(
name='charms.ceph',
version=version,
description='Provide base module for ceph charms.',
classifiers=[
"Development Status :: 2 - Pre-Alpha",
"Intended Audience :: Developers",
"Topic :: System",
"Topic :: System :: Installation/Setup",
"Topic :: System :: Software Distribution",
"Programming Language :: Python :: 2",
"Programming Language :: Python :: 2.7",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.5",
"License :: OSI Approved :: Apache Software License",
],
url='https://github.com/openstack/charms.ceph',
author='OpenStack Charmers',
author_email='openstack-dev@lists.openstack.org',
license='Apache-2.0: http://www.apache.org/licenses/LICENSE-2.0',
packages=find_packages(exclude=["unit_tests"]),
zip_safe=False,
cmdclass={'test': Tox},
install_requires=install_require,
extras_require={
'testing': tests_require,
},
tests_require=tests_require,
)

View File

@ -30,6 +30,9 @@ cluster addr = {{ cluster_addr }}
{%- if crush_location %}
osd crush location = {{crush_location}}
{%- endif %}
{%- if upgrade_in_progress %}
setuser match path = /var/lib/ceph/$type/$cluster-$id
{%- endif %}
{% if global -%}
# The following are user-provided options provided via the config-flags charm option.
# User-provided [global] section config

View File

@ -40,6 +40,7 @@ from charmhelpers.contrib.amulet.utils import (
AmuletUtils
)
from charmhelpers.core.decorators import retry_on_exception
from charmhelpers.core.host import CompareHostReleases
DEBUG = logging.DEBUG
ERROR = logging.ERROR
@ -1255,7 +1256,7 @@ class OpenStackAmuletUtils(AmuletUtils):
contents = self.file_contents_safe(sentry_unit, '/etc/memcached.conf',
fatal=True)
ubuntu_release, _ = self.run_cmd_unit(sentry_unit, 'lsb_release -cs')
if ubuntu_release <= 'trusty':
if CompareHostReleases(ubuntu_release) <= 'trusty':
memcache_listen_addr = 'ip6-localhost'
else:
memcache_listen_addr = '::1'

View File

@ -45,6 +45,7 @@ if __platform__ == "ubuntu":
add_new_group,
lsb_release,
cmp_pkgrevno,
CompareHostReleases,
) # flake8: noqa -- ignore F401 for this import
elif __platform__ == "centos":
from charmhelpers.core.host_factory.centos import (
@ -52,6 +53,7 @@ elif __platform__ == "centos":
add_new_group,
lsb_release,
cmp_pkgrevno,
CompareHostReleases,
) # flake8: noqa -- ignore F401 for this import
UPDATEDB_PATH = '/etc/updatedb.conf'

View File

@ -2,6 +2,22 @@ import subprocess
import yum
import os
from charmhelpers.core.strutils import BasicStringComparator
class CompareHostReleases(BasicStringComparator):
    """Provide comparisons of Host releases.

    Use in the form of

    if CompareHostReleases(release) > 'trusty':
        # do something with mitaka
    """

    def __init__(self, item):
        # Release-codename ordering is only defined for Ubuntu; the CentOS
        # platform module deliberately refuses to construct a comparator.
        raise NotImplementedError(
            "CompareHostReleases() is not implemented for CentOS")
def service_available(service_name):
# """Determine whether a system service is available."""

View File

@ -1,5 +1,37 @@
import subprocess
from charmhelpers.core.strutils import BasicStringComparator
# Ubuntu release codenames in chronological order; the position within this
# tuple is what gives CompareHostReleases its ordering.
UBUNTU_RELEASES = (
    'lucid',
    'maverick',
    'natty',
    'oneiric',
    'precise',
    'quantal',
    'raring',
    'saucy',
    'trusty',
    'utopic',
    'vivid',
    'wily',
    'xenial',
    'yakkety',
    'zesty',
)


class CompareHostReleases(BasicStringComparator):
    """Provide comparisons of Ubuntu releases.

    Use in the form of

    if CompareHostReleases(release) > 'trusty':
        # do something with mitaka
    """
    _list = UBUNTU_RELEASES
def service_available(service_name):
"""Determine whether a system service is available"""

View File

@ -68,3 +68,56 @@ def bytes_from_string(value):
msg = "Unable to interpret string value '%s' as bytes" % (value)
raise ValueError(msg)
return int(matches.group(1)) * (1024 ** BYTE_POWER[matches.group(2)])
class BasicStringComparator(object):
    """Provides a class that will compare strings from an iterator type object.

    Used to provide > and < comparisons on strings that may not necessarily be
    alphanumerically ordered.  e.g. OpenStack or Ubuntu releases AFTER the
    z-wrap.
    """

    # Subclasses must set this to the ordered sequence of valid strings.
    _list = None

    def __init__(self, item):
        if self._list is None:
            raise Exception("Must define the _list in the class definition!")
        try:
            self.index = self._list.index(item)
        except Exception:
            raise KeyError("Item '{}' is not in list '{}'"
                           .format(item, self._list))

    def _position(self, other):
        # Accept either a raw string or another comparator instance; both
        # resolve to a position in the ordered _list.
        assert isinstance(other, str) or isinstance(other, self.__class__)
        return self._list.index(other)

    def __eq__(self, other):
        return self.index == self._position(other)

    def __ne__(self, other):
        return not self.__eq__(other)

    def __lt__(self, other):
        return self.index < self._position(other)

    def __ge__(self, other):
        return not self.__lt__(other)

    def __gt__(self, other):
        return self.index > self._position(other)

    def __le__(self, other):
        return not self.__gt__(other)

    def __str__(self):
        """Always give back the item at the index so it can be used in
        comparisons like:

        s_mitaka = CompareOpenStack('mitaka')
        s_newton = CompareOpenstack('newton')

        assert s_newton > s_mitaka

        @returns: <string>
        """
        return self._list[self.index]

View File

@ -0,0 +1,25 @@
import platform
def get_platform():
    """Return the current OS platform.

    For example: if current os platform is Ubuntu then a string "ubuntu"
    will be returned (which is the name of the module).
    This string is used to decide which platform module should be imported.
    """
    # linux_distribution is deprecated and will be removed in Python 3.7
    # Warnings *not* disabled, as we certainly need to fix this.
    distro_name = platform.linux_distribution()[0]
    # Ordered lookup: first matching substring wins.
    for needle, module_name in (("Ubuntu", "ubuntu"),
                                ("CentOS", "centos"),
                                # Stock Python does not detect Ubuntu and
                                # instead returns debian, at least in some
                                # build environments like Travis CI.
                                ("debian", "ubuntu")):
        if needle in distro_name:
            return module_name
    raise RuntimeError("This module is not supported on {}."
                       .format(distro_name))

View File

@ -62,6 +62,7 @@ class CephHooksTestCase(unittest.TestCase):
'osd_journal_size': 1024,
'public_addr': '10.0.0.1',
'short_object_len': True,
'upgrade_in_progress': False,
'use_syslog': 'true'}
self.assertEqual(ctxt, expected)
@ -94,6 +95,7 @@ class CephHooksTestCase(unittest.TestCase):
'osd_journal_size': 1024,
'public_addr': '10.0.0.1',
'short_object_len': True,
'upgrade_in_progress': False,
'use_syslog': 'true'}
self.assertEqual(ctxt, expected)
@ -128,6 +130,7 @@ class CephHooksTestCase(unittest.TestCase):
'osd_journal_size': 1024,
'public_addr': '10.0.0.1',
'short_object_len': True,
'upgrade_in_progress': False,
'use_syslog': 'true'}
self.assertEqual(ctxt, expected)

View File

@ -74,23 +74,17 @@ class ReplaceOsdTestCase(test_utils.CharmTestCase):
assert ret == 0
@patch('ceph.mounts')
@patch('ceph.subprocess')
@patch('ceph.check_output')
@patch('ceph.umount')
@patch('ceph.osdize')
@patch('ceph.shutil')
@patch('ceph.systemd')
@patch('ceph.ceph_user')
def test_replace_osd(self,
ceph_user,
systemd,
shutil,
osdize,
umount,
subprocess,
mounts):
def test_replace_osd(self, ceph_user, systemd, shutil, osdize, umount,
check_output, mounts):
ceph_user.return_value = "ceph"
mounts.return_value = [['/var/lib/ceph/osd/ceph-a', '/dev/sda']]
subprocess.check_output.return_value = True
check_output.return_value = True
self.status_set.return_value = None
systemd.return_value = False
umount.return_value = 0
@ -103,7 +97,7 @@ class ReplaceOsdTestCase(test_utils.CharmTestCase):
osd_journal=None,
reformat_osd=False,
ignore_errors=False)
subprocess.check_output.assert_has_calls(
check_output.assert_has_calls(
[
call(['ceph', '--id', 'osd-upgrade',
'osd', 'out', 'osd.0']),

View File

@ -6,7 +6,6 @@ import ceph
TO_PATCH = [
'hookenv',
'status_set',
'subprocess',
'log',
]
@ -15,36 +14,37 @@ class PerformanceTestCase(test_utils.CharmTestCase):
def setUp(self):
super(PerformanceTestCase, self).setUp(ceph, TO_PATCH)
def test_tune_nic(self):
with patch('ceph.get_link_speed', return_value=10000):
with patch('ceph.save_sysctls') as save_sysctls:
@patch.object(ceph, 'check_output')
@patch.object(ceph, 'get_link_speed')
@patch.object(ceph, 'save_sysctls')
def test_tune_nic(self, save_sysctls, get_link_speed, check_output):
get_link_speed.return_value = 10000
ceph.tune_nic('eth0')
save_sysctls.assert_has_calls(
[
save_sysctls.assert_has_calls([
call(
save_location='/etc/sysctl.d/'
'51-ceph-osd-charm-eth0.conf',
save_location='/etc/sysctl.d/51-ceph-osd-charm-eth0.conf',
sysctl_dict={
'net.core.rmem_max': 524287,
'net.core.wmem_max': 524287,
'net.core.rmem_default': 524287,
'net.ipv4.tcp_wmem':
'10000000 10000000 10000000',
'net.ipv4.tcp_wmem': '10000000 10000000 10000000',
'net.core.netdev_max_backlog': 300000,
'net.core.optmem_max': 524287,
'net.ipv4.tcp_mem':
'10000000 10000000 10000000',
'net.ipv4.tcp_rmem':
'10000000 10000000 10000000',
'net.core.wmem_default': 524287})
'net.ipv4.tcp_mem': '10000000 10000000 10000000',
'net.ipv4.tcp_rmem': '10000000 10000000 10000000',
'net.core.wmem_default': 524287
})
])
self.status_set.assert_has_calls(
[
check_output.assert_called_with(['sysctl', '-p',
'/etc/sysctl.d/'
'51-ceph-osd-charm-eth0.conf'])
self.status_set.assert_has_calls([
call('maintenance', 'Tuning device eth0'),
])
def test_get_block_uuid(self):
self.subprocess.check_output.return_value = \
@patch('ceph.check_output')
def test_get_block_uuid(self, check_output):
check_output.return_value = \
'UUID=378f3c86-b21a-4172-832d-e2b3d4bc7511\nTYPE=ext2\n'
uuid = ceph.get_block_uuid('/dev/sda1')
self.assertEqual(uuid, '378f3c86-b21a-4172-832d-e2b3d4bc7511')
@ -118,8 +118,9 @@ class PerformanceTestCase(test_utils.CharmTestCase):
call('maintenance', 'Finished tuning device /dev/sda')
])
def test_set_hdd_read_ahead(self):
@patch('ceph.check_output')
def test_set_hdd_read_ahead(self, check_output):
ceph.set_hdd_read_ahead(dev_name='/dev/sda')
self.subprocess.check_output.assert_called_with(
check_output.assert_called_with(
['hdparm', '-a256', '/dev/sda']
)

View File

@ -2,7 +2,7 @@ import unittest
__author__ = 'Chris Holcombe <chris.holcombe@canonical.com>'
from mock import patch, MagicMock
from mock import call, patch, MagicMock
from ceph_hooks import check_for_upgrade
@ -18,13 +18,17 @@ def config_side_effect(*args):
class UpgradeRollingTestCase(unittest.TestCase):
@patch('ceph_hooks.ceph.dirs_need_ownership_update')
@patch('ceph_hooks.ceph.is_bootstrapped')
@patch('ceph_hooks.ceph.resolve_ceph_version')
@patch('ceph_hooks.emit_cephconf')
@patch('ceph_hooks.hookenv')
@patch('ceph_hooks.host')
@patch('ceph_hooks.ceph.roll_osd_cluster')
def test_check_for_upgrade(self, roll_osd_cluster, host, hookenv,
version, is_bootstrapped):
emit_cephconf, version, is_bootstrapped,
dirs_need_ownership_update):
dirs_need_ownership_update.return_value = False
is_bootstrapped.return_value = True
version.side_effect = ['firefly', 'hammer']
host.lsb_release.return_value = {
@ -36,9 +40,33 @@ class UpgradeRollingTestCase(unittest.TestCase):
config_side_effect('source')]
check_for_upgrade()
roll_osd_cluster.assert_called_with(
new_version='hammer',
roll_osd_cluster.assert_called_with(new_version='hammer',
upgrade_key='osd-upgrade')
emit_cephconf.assert_has_calls([call(upgrading=True),
call(upgrading=False)])
@patch('ceph_hooks.ceph.dirs_need_ownership_update')
@patch('ceph_hooks.ceph.is_bootstrapped')
@patch('ceph_hooks.ceph.resolve_ceph_version')
@patch('ceph_hooks.emit_cephconf')
@patch('ceph_hooks.hookenv')
@patch('ceph_hooks.host.lsb_release')
@patch('ceph_hooks.ceph.roll_osd_cluster')
def test_resume_failed_upgrade(self, roll_osd_cluster, lsb_release,
hookenv, emit_cephconf, version,
is_bootstrapped,
dirs_need_ownership_update):
dirs_need_ownership_update.return_value = True
is_bootstrapped.return_value = True
version.side_effect = ['jewel', 'jewel']
lsb_release.return_value = {'DISTRIB_CODENAME': 'trusty'}
check_for_upgrade()
roll_osd_cluster.assert_called_with(new_version='jewel',
upgrade_key='osd-upgrade')
emit_cephconf.assert_has_calls([call(upgrading=True),
call(upgrading=False)])
@patch('ceph_hooks.ceph.is_bootstrapped')
@patch('ceph_hooks.ceph.resolve_ceph_version')