Add support for erasure coding

Add support for use of Erasure Coded pools with the Ceph RADOS Gateway.
Only the data pool is actually Erasure Coded - all other pools continue
to be replicated but have much smaller data footprints.

Depends-On: Iec4de19f7b39f0b08158d96c5cc1561b40aefa10
Change-Id: I661639e67853ff471a7d7ddea0e3fc2fcb30fed1
James Page 2020-07-28 14:45:39 +01:00
parent 4c76721923
commit 414701455f
16 changed files with 1660 additions and 509 deletions

.gitignore

@ -9,3 +9,4 @@ tags
.unit-state.db
func-results.json
.stestr/
**/__pycache__


@ -121,6 +121,104 @@ options:
that once a pool has been created, changes to this setting will be
ignored. Setting this value to -1, enables the number of placement
groups to be calculated based on the Ceph placement group calculator.
pool-type:
type: string
default: replicated
description: |
Ceph pool type to use for storage - valid values include replicated
and erasure-coded.
ec-profile-name:
type: string
default:
description: |
Name for the EC profile to be created for the EC pools. If not defined
a profile name will be generated based on the name of the pool used by
the application.
ec-rbd-metadata-pool:
type: string
default:
description: |
Name of the metadata pool to be created (for RBD use-cases). If not
defined a metadata pool name will be generated based on the name of
the data pool used by the application. The metadata pool is always
replicated, not erasure coded.
ec-profile-k:
type: int
default: 1
description: |
Number of data chunks that will be used for EC data pool. K+M factors
should never be greater than the number of available zones (or hosts)
for balancing.
ec-profile-m:
type: int
default: 2
description: |
Number of coding chunks that will be used for EC data pool. K+M factors
should never be greater than the number of available zones (or hosts)
for balancing.
ec-profile-locality:
type: int
default:
description: |
(lrc plugin - l) Group the coding and data chunks into sets of size l.
For instance, for k=4 and m=2, when l=3 two groups of three are created.
Each set can be recovered without reading chunks from another set. Note
that using the lrc plugin does incur more raw storage usage than isa or
jerasure in order to reduce the cost of recovery operations.
ec-profile-crush-locality:
type: string
default:
description: |
(lrc plugin) The type of the crush bucket in which each set of chunks
defined by l will be stored. For instance, if it is set to rack, each
group of l chunks will be placed in a different rack. It is used to
create a CRUSH rule step such as step choose rack. If it is not set,
no such grouping is done.
ec-profile-durability-estimator:
type: int
default:
description: |
(shec plugin - c) The number of parity chunks each of which includes
each data chunk in its calculation range. The number is used as a
durability estimator. For instance, if c=2, 2 OSDs can be down
without losing data.
ec-profile-helper-chunks:
type: int
default:
description: |
(clay plugin - d) Number of OSDs requested to send data during
recovery of a single chunk. d needs to be chosen such that
k+1 <= d <= k+m-1. The larger the d, the better the savings.
ec-profile-scalar-mds:
type: string
default:
description: |
(clay plugin) specifies the plugin that is used as a building
block in the layered construction. It can be one of jerasure,
isa, shec (defaults to jerasure).
ec-profile-plugin:
type: string
default: jerasure
description: |
EC plugin to use for this application's pool. The following plugins are
acceptable - jerasure, lrc, isa, shec, clay.
ec-profile-technique:
type: string
default:
description: |
EC profile technique used for this application's pool - will be
validated based on the plugin configured via ec-profile-plugin.
Supported techniques are reed_sol_van, reed_sol_r6_op, cauchy_orig,
cauchy_good and liber8tion for jerasure; reed_sol_van and cauchy for
isa; and single and multiple for shec.
ec-profile-device-class:
type: string
default:
description: |
Device class from CRUSH map to use for placement groups for
erasure profile - valid values: ssd, hdd or nvme (or leave
unset to not use a device class).
# Keystone integration
operator-roles:
type: string
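
The k and m guidance above (k+m must not exceed the number of available zones or hosts) is easy to sanity-check before deploying. A minimal illustrative sketch, not part of this change, that validates a proposed profile and reports its raw-space overhead:

def check_ec_profile(k, m, num_failure_domains):
    """Sanity-check a proposed EC profile against the config.yaml guidance.

    k -- number of data chunks (ec-profile-k)
    m -- number of coding chunks (ec-profile-m)
    num_failure_domains -- hosts or zones available for chunk placement
    """
    if k + m > num_failure_domains:
        raise ValueError(
            "k+m ({}) must not exceed the number of failure domains ({})"
            .format(k + m, num_failure_domains))
    # Raw storage consumed per byte of logical data, e.g. k=3, m=2 -> ~1.67x
    return float(k + m) / k


# The charm defaults (k=1, m=2) need at least three hosts and use 3x raw space.
print(check_ec_profile(1, 2, 3))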


@ -18,6 +18,7 @@ import subprocess
from charmhelpers.core.hookenv import (
config,
service_name,
)
from charmhelpers.core.host import (
@ -111,18 +112,61 @@ def get_create_rgw_pools_rq(prefix=None):
replicas = config('ceph-osd-replication-count')
prefix = prefix or 'default'
# Buckets likely to contain the most data and therefore
# requiring the most PGs
heavy = [
'.rgw.buckets.data'
]
bucket_weight = config('rgw-buckets-pool-weight')
for pool in heavy:
pool = "{prefix}{pool}".format(prefix=prefix, pool=pool)
rq.add_op_create_pool(name=pool, replica_count=replicas,
weight=bucket_weight, group='objects',
app_name=CEPH_POOL_APP_NAME)
if config('pool-type') == 'erasure-coded':
# General EC plugin config
plugin = config('ec-profile-plugin')
technique = config('ec-profile-technique')
device_class = config('ec-profile-device-class')
bdm_k = config('ec-profile-k')
bdm_m = config('ec-profile-m')
# LRC plugin config
bdm_l = config('ec-profile-locality')
crush_locality = config('ec-profile-crush-locality')
# SHEC plugin config
bdm_c = config('ec-profile-durability-estimator')
# CLAY plugin config
bdm_d = config('ec-profile-helper-chunks')
scalar_mds = config('ec-profile-scalar-mds')
# Profile name
service = service_name()
profile_name = (
config('ec-profile-name') or "{}-profile".format(service)
)
rq.add_op_create_erasure_profile(
name=profile_name,
k=bdm_k, m=bdm_m,
lrc_locality=bdm_l,
lrc_crush_locality=crush_locality,
shec_durability_estimator=bdm_c,
clay_helper_chunks=bdm_d,
clay_scalar_mds=scalar_mds,
device_class=device_class,
erasure_type=plugin,
erasure_technique=technique
)
for pool in heavy:
pool = "{prefix}{pool}".format(prefix=prefix, pool=pool)
rq.add_op_create_erasure_pool(
name=pool,
erasure_profile=profile_name,
weight=bucket_weight,
group="objects",
app_name=CEPH_POOL_APP_NAME
)
else:
for pool in heavy:
pool = "{prefix}{pool}".format(prefix=prefix, pool=pool)
rq.add_op_create_pool(name=pool, replica_count=replicas,
weight=bucket_weight, group='objects',
app_name=CEPH_POOL_APP_NAME)
# NOTE: we want these pools to have a smaller pg_num/pgp_num than the
# others since they are not expected to contain as much data
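
For illustration only, a small sketch of the naming fallbacks used in the erasure-coded branch above; the expected values match the unit test added at the end of this change (application name 'ceph-radosgw', no ec-profile-name set, default zone prefix):

def ec_names(service, prefix=None, ec_profile_name=None):
    # Mirrors the fallbacks above: the profile defaults to "<service>-profile"
    # and heavy pools get the zone prefix ("default" when none is supplied).
    prefix = prefix or 'default'
    profile_name = ec_profile_name or "{}-profile".format(service)
    data_pool = "{prefix}{pool}".format(prefix=prefix,
                                        pool='.rgw.buckets.data')
    return profile_name, data_pool


assert ec_names('ceph-radosgw') == ('ceph-radosgw-profile',
                                    'default.rgw.buckets.data')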


@ -49,7 +49,8 @@ __deprecated_functions = {}
def deprecate(warning, date=None, log=None):
"""Add a deprecation warning the first time the function is used.
The date, which is a string in semi-ISO8660 format indicate the year-month
The date which is a string in semi-ISO8660 format indicates the year-month
that the function is officially going to be removed.
usage:
@ -62,10 +63,11 @@ def deprecate(warning, date=None, log=None):
The reason for passing the logging function (log) is so that hookenv.log
can be used for a charm if needed.
:param warning: String to indicat where it has moved ot.
:param date: optional sting, in YYYY-MM format to indicate when the
:param warning: String to indicate what is to be used instead.
:param date: Optional string in YYYY-MM format to indicate when the
function will definitely (probably) be removed.
:param log: The log function to call to log. If not, logs to stdout
:param log: The log function to call in order to log. If None, logs to
stdout
"""
def wrap(f):
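
A hedged usage sketch of the decorator documented above; old_helper() and the replacement named in the warning are hypothetical, only the deprecate() signature comes from this file:

from charmhelpers.core import hookenv
from charmhelpers.core.hookenv import deprecate


@deprecate("use new_helper() instead", date="2021-06", log=hookenv.log)
def old_helper():
    # A single deprecation warning is logged the first time this is called;
    # afterwards the function behaves exactly as before.
    return 42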


@ -18,14 +18,14 @@
# Authors:
# Matthew Wedgwood <matthew.wedgwood@canonical.com>
import subprocess
import pwd
import glob
import grp
import os
import glob
import shutil
import pwd
import re
import shlex
import shutil
import subprocess
import yaml
from charmhelpers.core.hookenv import (
@ -265,6 +265,11 @@ class NRPE(object):
relation_set(relation_id=rid, relation_settings={'primary': self.primary})
self.remove_check_queue = set()
@classmethod
def does_nrpe_conf_dir_exist(cls):
"""Return True if th nrpe_confdif directory exists."""
return os.path.isdir(cls.nrpe_confdir)
def add_check(self, *args, **kwargs):
shortname = None
if kwargs.get('shortname') is None:
@ -310,6 +315,12 @@ class NRPE(object):
nrpe_monitors = {}
monitors = {"monitors": {"remote": {"nrpe": nrpe_monitors}}}
# check that the charm can write to the conf dir. If not, then nagios
# probably isn't installed, and we can defer.
if not self.does_nrpe_conf_dir_exist():
return
for nrpecheck in self.checks:
nrpecheck.write(self.nagios_context, self.hostname,
self.nagios_servicegroups)
@ -400,7 +411,7 @@ def add_init_service_checks(nrpe, services, unit_name, immediate_check=True):
upstart_init = '/etc/init/%s.conf' % svc
sysv_init = '/etc/init.d/%s' % svc
if host.init_is_systemd():
if host.init_is_systemd(service_name=svc):
nrpe.add_check(
shortname=svc,
description='process check {%s}' % unit_name,


@ -29,6 +29,8 @@ from subprocess import check_call, CalledProcessError
import six
import charmhelpers.contrib.storage.linux.ceph as ch_ceph
from charmhelpers.contrib.openstack.audits.openstack_security_guide import (
_config_ini as config_ini
)
@ -56,6 +58,7 @@ from charmhelpers.core.hookenv import (
status_set,
network_get_primary_address,
WARNING,
service_name,
)
from charmhelpers.core.sysctl import create as sysctl_create
@ -808,6 +811,12 @@ class CephContext(OSContextGenerator):
ctxt['mon_hosts'] = ' '.join(sorted(mon_hosts))
if config('pool-type') and config('pool-type') == 'erasure-coded':
base_pool_name = config('rbd-pool') or config('rbd-pool-name')
if not base_pool_name:
base_pool_name = service_name()
ctxt['rbd_default_data_pool'] = base_pool_name
if not os.path.isdir('/etc/ceph'):
os.mkdir('/etc/ceph')
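
The erasure-coded branch above feeds 'rbd default data pool' into ceph.conf via the template change further down. A small sketch of the selection order, with a hypothetical 'glance' application and a plain dict standing in for charm config:

def rbd_data_pool(charm_config, application_name):
    # Precedence mirrored from CephContext above: explicit rbd-pool,
    # then rbd-pool-name, then fall back to the application name.
    return (charm_config.get('rbd-pool') or
            charm_config.get('rbd-pool-name') or
            application_name)


# With neither option set, a 'glance' unit would render
# "rbd default data pool = glance" into its ceph.conf.
assert rbd_data_pool({}, 'glance') == 'glance'
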
@ -3175,3 +3184,78 @@ class SRIOVContext(OSContextGenerator):
:rtype: Dict[str,int]
"""
return self._map
class CephBlueStoreCompressionContext(OSContextGenerator):
"""Ceph BlueStore compression options."""
# Tuple with Tuples that map configuration option name to CephBrokerRq op
# property name
options = (
('bluestore-compression-algorithm',
'compression-algorithm'),
('bluestore-compression-mode',
'compression-mode'),
('bluestore-compression-required-ratio',
'compression-required-ratio'),
('bluestore-compression-min-blob-size',
'compression-min-blob-size'),
('bluestore-compression-min-blob-size-hdd',
'compression-min-blob-size-hdd'),
('bluestore-compression-min-blob-size-ssd',
'compression-min-blob-size-ssd'),
('bluestore-compression-max-blob-size',
'compression-max-blob-size'),
('bluestore-compression-max-blob-size-hdd',
'compression-max-blob-size-hdd'),
('bluestore-compression-max-blob-size-ssd',
'compression-max-blob-size-ssd'),
)
def __init__(self):
"""Initialize context by loading values from charm config.
We keep two maps, one suitable for use with CephBrokerRq's and one
suitable for template generation.
"""
charm_config = config()
# CephBrokerRq op map
self.op = {}
# Context exposed for template generation
self.ctxt = {}
for config_key, op_key in self.options:
value = charm_config.get(config_key)
self.ctxt.update({config_key.replace('-', '_'): value})
self.op.update({op_key: value})
def __call__(self):
"""Get context.
:returns: Context
:rtype: Dict[str,any]
"""
return self.ctxt
def get_op(self):
"""Get values for use in CephBrokerRq op.
:returns: Context values with CephBrokerRq op property name as key.
:rtype: Dict[str,any]
"""
return self.op
def validate(self):
"""Validate options.
:raises: AssertionError
"""
# We slip in a dummy name on class instantiation to allow validation of
# the other options. It will not affect further use.
#
# NOTE: once we retire Python 3.5 we can fold this into an in-line
# dictionary comprehension in the call to the initializer.
dummy_op = {'name': 'dummy-name'}
dummy_op.update(self.op)
pool = ch_ceph.BasePool('dummy-service', op=dummy_op)
pool.validate()
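
A hedged sketch of how a consuming charm hook might drive this context; only the methods shown above (validate(), get_op() and the call interface) are assumed, and the example values are illustrative:

from charmhelpers.contrib.openstack.context import (
    CephBlueStoreCompressionContext,
)

compression = CephBlueStoreCompressionContext()
try:
    # Raises AssertionError when the configured combination is invalid.
    compression.validate()
except AssertionError as err:
    # Surface the problem rather than sending a broken broker request.
    print("invalid bluestore compression config: {}".format(err))
else:
    # Keys named after CephBrokerRq op properties,
    # e.g. {'compression-algorithm': 'zstd', ...}
    broker_op_values = compression.get_op()
    # Keys with underscores for template rendering,
    # e.g. {'bluestore_compression_algorithm': 'zstd', ...}
    template_values = compression()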


@ -22,3 +22,7 @@ rbd default features = {{ rbd_features }}
{{ key }} = {{ value }}
{% endfor -%}
{%- endif %}
{% if rbd_default_data_pool -%}
rbd default data pool = {{ rbd_default_data_pool }}
{% endif %}


@ -6,8 +6,14 @@ Listen {{ ext_port }}
<VirtualHost {{ address }}:{{ ext }}>
ServerName {{ endpoint }}
SSLEngine on
SSLProtocol +TLSv1 +TLSv1.1 +TLSv1.2
SSLCipherSuite HIGH:!RC4:!MD5:!aNULL:!eNULL:!EXP:!LOW:!MEDIUM
# This section is based on Mozilla's recommendation
# as the "intermediate" profile as of July 7th, 2020.
# https://wiki.mozilla.org/Security/Server_Side_TLS
SSLProtocol all -SSLv3 -TLSv1 -TLSv1.1
SSLCipherSuite ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:DHE-RSA-AES128-GCM-SHA256:DHE-RSA-AES256-GCM-SHA384
SSLHonorCipherOrder off
SSLCertificateFile /etc/apache2/ssl/{{ namespace }}/cert_{{ endpoint }}
# See LP 1484489 - this is to support <= 2.4.7 and >= 2.4.8
SSLCertificateChainFile /etc/apache2/ssl/{{ namespace }}/cert_{{ endpoint }}


@ -6,8 +6,14 @@ Listen {{ ext_port }}
<VirtualHost {{ address }}:{{ ext }}>
ServerName {{ endpoint }}
SSLEngine on
SSLProtocol +TLSv1 +TLSv1.1 +TLSv1.2
SSLCipherSuite HIGH:!RC4:!MD5:!aNULL:!eNULL:!EXP:!LOW:!MEDIUM
# This section is based on Mozilla's recommendation
# as the "intermediate" profile as of July 7th, 2020.
# https://wiki.mozilla.org/Security/Server_Side_TLS
SSLProtocol all -SSLv3 -TLSv1 -TLSv1.1
SSLCipherSuite ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:DHE-RSA-AES128-GCM-SHA256:DHE-RSA-AES256-GCM-SHA384
SSLHonorCipherOrder off
SSLCertificateFile /etc/apache2/ssl/{{ namespace }}/cert_{{ endpoint }}
# See LP 1484489 - this is to support <= 2.4.7 and >= 2.4.8
SSLCertificateChainFile /etc/apache2/ssl/{{ namespace }}/cert_{{ endpoint }}


@ -0,0 +1,28 @@
{# section header omitted as options can belong to multiple sections #}
{% if bluestore_compression_algorithm -%}
bluestore compression algorithm = {{ bluestore_compression_algorithm }}
{% endif -%}
{% if bluestore_compression_mode -%}
bluestore compression mode = {{ bluestore_compression_mode }}
{% endif -%}
{% if bluestore_compression_required_ratio -%}
bluestore compression required ratio = {{ bluestore_compression_required_ratio }}
{% endif -%}
{% if bluestore_compression_min_blob_size -%}
bluestore compression min blob size = {{ bluestore_compression_min_blob_size }}
{% endif -%}
{% if bluestore_compression_min_blob_size_hdd -%}
bluestore compression min blob size hdd = {{ bluestore_compression_min_blob_size_hdd }}
{% endif -%}
{% if bluestore_compression_min_blob_size_ssd -%}
bluestore compression min blob size ssd = {{ bluestore_compression_min_blob_size_ssd }}
{% endif -%}
{% if bluestore_compression_max_blob_size -%}
bluestore compression max blob size = {{ bluestore_compression_max_blob_size }}
{% endif -%}
{% if bluestore_compression_max_blob_size_hdd -%}
bluestore compression max blob size hdd = {{ bluestore_compression_max_blob_size_hdd }}
{% endif -%}
{% if bluestore_compression_max_blob_size_ssd -%}
bluestore compression max blob size ssd = {{ bluestore_compression_max_blob_size_ssd }}
{% endif -%}


@ -2241,10 +2241,13 @@ def inform_peers_unit_state(state, relation_name='cluster'):
if state not in UNIT_STATES:
raise ValueError(
"Setting invalid state {} for unit".format(state))
this_unit = local_unit()
for r_id in relation_ids(relation_name):
juju_log('Telling peer behind relation {} that {} is {}'.format(
r_id, this_unit, state), 'DEBUG')
relation_set(relation_id=r_id,
relation_settings={
get_peer_key(local_unit()): state})
get_peer_key(this_unit): state})
def get_peers_unit_state(relation_name='cluster'):
@ -2276,8 +2279,10 @@ def are_peers_ready(relation_name='cluster'):
:returns: Whether all units are ready.
:rtype: bool
"""
unit_states = get_peers_unit_state(relation_name)
return all(v == UNIT_READY for v in unit_states.values())
unit_states = get_peers_unit_state(relation_name).values()
juju_log('{} peers are in the following states: {}'.format(
relation_name, unit_states), 'DEBUG')
return all(state == UNIT_READY for state in unit_states)
def inform_peers_if_ready(check_unit_ready_func, relation_name='cluster'):
@ -2360,7 +2365,9 @@ def get_api_application_status():
app_state, msg = get_api_unit_status()
if app_state == WORKLOAD_STATES.ACTIVE:
if are_peers_ready():
return WORKLOAD_STATES.ACTIVE, 'Application Ready'
msg = 'Application Ready'
else:
return WORKLOAD_STATES.WAITING, 'Some units are not ready'
app_state = WORKLOAD_STATES.WAITING
msg = 'Some units are not ready'
juju_log(msg, 'DEBUG')
return app_state, msg

File diff suppressed because it is too large


@ -193,7 +193,7 @@ def service_pause(service_name, init_dir="/etc/init", initd_dir="/etc/init.d",
stopped = service_stop(service_name, **kwargs)
upstart_file = os.path.join(init_dir, "{}.conf".format(service_name))
sysv_file = os.path.join(initd_dir, service_name)
if init_is_systemd():
if init_is_systemd(service_name=service_name):
service('disable', service_name)
service('mask', service_name)
elif os.path.exists(upstart_file):
@ -227,7 +227,7 @@ def service_resume(service_name, init_dir="/etc/init",
"""
upstart_file = os.path.join(init_dir, "{}.conf".format(service_name))
sysv_file = os.path.join(initd_dir, service_name)
if init_is_systemd():
if init_is_systemd(service_name=service_name):
service('unmask', service_name)
service('enable', service_name)
elif os.path.exists(upstart_file):
@ -257,7 +257,7 @@ def service(action, service_name, **kwargs):
:param **kwargs: additional params to be passed to the service command in
the form of key=value.
"""
if init_is_systemd():
if init_is_systemd(service_name=service_name):
cmd = ['systemctl', action, service_name]
else:
cmd = ['service', service_name, action]
@ -281,7 +281,7 @@ def service_running(service_name, **kwargs):
units (e.g. service ceph-osd status id=2). The kwargs
are ignored in systemd services.
"""
if init_is_systemd():
if init_is_systemd(service_name=service_name):
return service('is-active', service_name)
else:
if os.path.exists(_UPSTART_CONF.format(service_name)):
@ -311,8 +311,14 @@ def service_running(service_name, **kwargs):
SYSTEMD_SYSTEM = '/run/systemd/system'
def init_is_systemd():
"""Return True if the host system uses systemd, False otherwise."""
def init_is_systemd(service_name=None):
"""
Returns whether the host uses systemd for the specified service.
@param Optional[str] service_name: specific name of service
"""
if str(service_name).startswith("snap."):
return True
if lsb_release()['DISTRIB_CODENAME'] == 'trusty':
return False
return os.path.isdir(SYSTEMD_SYSTEM)
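
With the change above, any snap-delivered service is treated as systemd-managed regardless of release, so service(), service_running() and the other helpers in this file drive it through systemctl. A brief sketch (the service name is illustrative):

from charmhelpers.core.host import init_is_systemd, service_running

# Snap packages always install systemd units, so the "snap." prefix
# short-circuits the trusty/upstart check above.
assert init_is_systemd(service_name='snap.vault.vault') is True

# service_running() consequently runs: systemctl is-active snap.vault.vault
running = service_running('snap.vault.vault')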


@ -155,25 +155,47 @@ def handle_create_erasure_profile(request, service):
:param service: The ceph client to run the command under.
:returns: dict. exit-code and reason if not 0
"""
# "local" | "shec" or it defaults to "jerasure"
# "isa" | "lrc" | "shec" | "clay" or it defaults to "jerasure"
erasure_type = request.get('erasure-type')
# "host" | "rack" or it defaults to "host" # Any valid Ceph bucket
# dependent on erasure coding type
erasure_technique = request.get('erasure-technique')
# "host" | "rack" | ...
failure_domain = request.get('failure-domain')
name = request.get('name')
# Binary Distribution Matrix (BDM) parameters
bdm_k = request.get('k')
bdm_m = request.get('m')
# LRC parameters
bdm_l = request.get('l')
crush_locality = request.get('crush-locality')
# SHEC parameters
bdm_c = request.get('c')
# CLAY parameters
bdm_d = request.get('d')
scalar_mds = request.get('scalar-mds')
# Device Class
device_class = request.get('device-class')
if failure_domain not in CEPH_BUCKET_TYPES:
if failure_domain and failure_domain not in CEPH_BUCKET_TYPES:
msg = "failure-domain must be one of {}".format(CEPH_BUCKET_TYPES)
log(msg, level=ERROR)
return {'exit-code': 1, 'stderr': msg}
create_erasure_profile(service=service, erasure_plugin_name=erasure_type,
profile_name=name, failure_domain=failure_domain,
data_chunks=bdm_k, coding_chunks=bdm_m,
locality=bdm_l)
create_erasure_profile(service=service,
erasure_plugin_name=erasure_type,
profile_name=name,
failure_domain=failure_domain,
data_chunks=bdm_k,
coding_chunks=bdm_m,
locality=bdm_l,
durability_estimator=bdm_d,
helper_chunks=bdm_c,
scalar_mds=scalar_mds,
crush_locality=crush_locality,
device_class=device_class,
erasure_plugin_technique=erasure_technique)
return {'exit-code': 0}
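
For reference, a hedged example of the broker request this handler consumes. The keys mirror the request.get() calls above; the 'op' name, the ceph client name and the concrete values are illustrative, chosen here to satisfy the clay constraint k+1 <= d <= k+m-1:

request = {
    'op': 'create-erasure-profile',   # assumed op name
    'name': 'ceph-radosgw-profile',
    'erasure-type': 'clay',
    'failure-domain': 'host',         # must be a valid CEPH_BUCKET_TYPES entry
    'k': 4,
    'm': 2,
    'd': 5,                           # clay helper chunks: k+1 <= d <= k+m-1
    'scalar-mds': 'jerasure',
}

# handle_create_erasure_profile(request, service='admin') validates the
# failure domain and delegates to create_erasure_profile(), returning
# {'exit-code': 0} on success.
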
def handle_add_permissions_to_key(request, service):
@ -387,6 +409,7 @@ def handle_erasure_pool(request, service):
max_objects = request.get('max-objects')
weight = request.get('weight')
group_name = request.get('group')
allow_ec_overwrites = request.get('allow-ec-overwrites')
if erasure_profile is None:
erasure_profile = "default-canonical"
@ -416,7 +439,9 @@ def handle_erasure_pool(request, service):
pool = ErasurePool(service=service, name=pool_name,
erasure_code_profile=erasure_profile,
percent_data=weight, app_name=app_name)
percent_data=weight,
app_name=app_name,
allow_ec_overwrites=allow_ec_overwrites)
# Ok make the erasure pool
if not pool_exists(service=service, name=pool_name):
log("Creating pool '{}' (erasure_profile={})"


@ -2169,15 +2169,18 @@ def roll_monitor_cluster(new_version, upgrade_key):
status_set('blocked', 'failed to upgrade monitor')
# TODO(jamespage):
# Mimic support will need to ensure that ceph-mgr daemons are also
# restarted during upgrades - probably through use of one of the
# high level systemd targets shipped by the packaging.
def upgrade_monitor(new_version):
# For E731 we can't assign a lambda, therefore, instead pass this.
def noop():
pass
def upgrade_monitor(new_version, kick_function=None):
"""Upgrade the current ceph monitor to the new version
:param new_version: String version to upgrade to.
"""
if kick_function is None:
kick_function = noop
current_version = get_version()
status_set("maintenance", "Upgrading monitor")
log("Current ceph version is {}".format(current_version))
@ -2186,6 +2189,7 @@ def upgrade_monitor(new_version):
# Needed to determine if whether to stop/start ceph-mgr
luminous_or_later = cmp_pkgrevno('ceph-common', '12.2.0') >= 0
kick_function()
try:
add_source(config('source'), config('key'))
apt_update(fatal=True)
@ -2194,6 +2198,7 @@ def upgrade_monitor(new_version):
err))
status_set("blocked", "Upgrade to {} failed".format(new_version))
sys.exit(1)
kick_function()
try:
if systemd():
service_stop('ceph-mon')
@ -2204,6 +2209,7 @@ def upgrade_monitor(new_version):
else:
service_stop('ceph-mon-all')
apt_install(packages=determine_packages(), fatal=True)
kick_function()
owner = ceph_user()
@ -2217,6 +2223,8 @@ def upgrade_monitor(new_version):
group=owner,
follow_links=True)
kick_function()
# Ensure that mon directory is user writable
hostname = socket.gethostname()
path = '/var/lib/ceph/mon/ceph-{}'.format(hostname)
@ -2257,13 +2265,22 @@ def lock_and_roll(upgrade_key, service, my_name, version):
start_timestamp))
monitor_key_set(upgrade_key, "{}_{}_{}_start".format(
service, my_name, version), start_timestamp)
# alive indication:
alive_function = (
lambda: monitor_key_set(
upgrade_key, "{}_{}_{}_alive"
.format(service, my_name, version), time.time()))
dog = WatchDog(kick_interval=3 * 60,
kick_function=alive_function)
log("Rolling")
# This should be quick
if service == 'osd':
upgrade_osd(version)
upgrade_osd(version, kick_function=dog.kick_the_dog)
elif service == 'mon':
upgrade_monitor(version)
upgrade_monitor(version, kick_function=dog.kick_the_dog)
else:
log("Unknown service {}. Unable to upgrade".format(service),
level=ERROR)
@ -2294,45 +2311,225 @@ def wait_on_previous_node(upgrade_key, service, previous_node, version):
"""
log("Previous node is: {}".format(previous_node))
previous_node_finished = monitor_key_exists(
upgrade_key,
"{}_{}_{}_done".format(service, previous_node, version))
while previous_node_finished is False:
log("{} is not finished. Waiting".format(previous_node))
# Has this node been trying to upgrade for longer than
# 10 minutes?
# If so then move on and consider that node dead.
# NOTE: This assumes the clusters clocks are somewhat accurate
# If the hosts clock is really far off it may cause it to skip
# the previous node even though it shouldn't.
current_timestamp = time.time()
previous_node_start_time = monitor_key_get(
previous_node_started_f = (
lambda: monitor_key_exists(
upgrade_key,
"{}_{}_{}_start".format(service, previous_node, version))
if (previous_node_start_time is not None and
((current_timestamp - (10 * 60)) >
float(previous_node_start_time))):
# NOTE(jamespage):
# Previous node is probably dead as we've been waiting
# for 10 minutes - lets move on and upgrade
log("Waited 10 mins on node {}. current time: {} > "
"previous node start time: {} Moving on".format(
previous_node,
(current_timestamp - (10 * 60)),
previous_node_start_time))
return
# NOTE(jamespage)
# Previous node has not started, or started less than
# 10 minutes ago - sleep a random amount of time and
# then check again.
wait_time = random.randrange(5, 30)
log('waiting for {} seconds'.format(wait_time))
time.sleep(wait_time)
previous_node_finished = monitor_key_exists(
"{}_{}_{}_start".format(service, previous_node, version)))
previous_node_finished_f = (
lambda: monitor_key_exists(
upgrade_key,
"{}_{}_{}_done".format(service, previous_node, version))
"{}_{}_{}_done".format(service, previous_node, version)))
previous_node_alive_time_f = (
lambda: monitor_key_get(
upgrade_key,
"{}_{}_{}_alive".format(service, previous_node, version)))
# wait for 30 minutes until the previous node starts. We don't proceed
# unless we get a start condition.
try:
WatchDog.wait_until(previous_node_started_f, timeout=30 * 60)
except WatchDog.WatchDogTimeoutException:
log("Waited for previous node to start for 30 minutes. "
"It didn't start, so may have a serious issue. Continuing with "
"upgrade of this node.",
level=WARNING)
return
# keep the time it started from this node's perspective.
previous_node_started_at = time.time()
log("Detected that previous node {} has started. Time now: {}"
.format(previous_node, previous_node_started_at))
# Now wait for the node to complete. The node may optionally be kicking
# with the *_alive key, which allows this node to wait longer as it 'knows'
# the other node is proceeding.
try:
WatchDog.timed_wait(kicked_at_function=previous_node_alive_time_f,
complete_function=previous_node_finished_f,
wait_time=30 * 60,
compatibility_wait_time=10 * 60,
max_kick_interval=5 * 60)
except WatchDog.WatchDogDeadException:
# previous node was kicking, but timed out; log this condition and move
# on.
now = time.time()
waited = int((now - previous_node_started_at) / 60)
log("Previous node started, but has now not ticked for 5 minutes. "
"Waited total of {} mins on node {}. current time: {} > "
"previous node start time: {}. "
"Continuing with upgrade of this node."
.format(waited, previous_node, now, previous_node_started_at),
level=WARNING)
except WatchDog.WatchDogTimeoutException:
# previous node never kicked, or simply took too long; log this
# condition and move on.
now = time.time()
waited = int((now - previous_node_started_at) / 60)
log("Previous node is taking too long; assuming it has died."
"Waited {} mins on node {}. current time: {} > "
"previous node start time: {}. "
"Continuing with upgrade of this node."
.format(waited, previous_node, now, previous_node_started_at),
level=WARNING)
class WatchDog(object):
"""Watch a dog; basically a kickable timer with a timeout between two async
units.
The idea is that you have an overall timeout and then can kick that timeout
with intermediary hits, with a max time between those kicks allowed.
Note that this watchdog doesn't rely on the clock of the other side; it only
uses roughly when it detected that the other side started. All timings are
based on the local clock.
The kicker will not 'kick' more often than a set interval, regardless of
how often the kick_the_dog() function is called. The kicker provides a
function (lambda: -> None) that is called when the kick interval is
reached.
The waiter calls the static method with a check function
(lambda: -> Boolean) that indicates when the wait should be over and the
maximum interval to wait. e.g. 30 minutes with a 5 minute kick interval.
So the waiter calls wait(f, 30, 3) and the kicker sets up a 3 minute kick
interval, or however long it is expected for the key to propagate and to
allow for other delays.
There is a compatibility mode where, if the other side never kicks, it simply
waits for the compatibility timer.
"""
class WatchDogDeadException(Exception):
pass
class WatchDogTimeoutException(Exception):
pass
def __init__(self, kick_interval=3 * 60, kick_function=None):
"""Initialise a new WatchDog
:param kick_interval: the interval when this side kicks the other in
seconds.
:type kick_interval: Int
:param kick_function: The function to call that does the kick.
:type kick_function: Callable[]
"""
self.start_time = time.time()
self.last_run_func = None
self.last_kick_at = None
self.kick_interval = kick_interval
self.kick_f = kick_function
def kick_the_dog(self):
"""Might call the kick_function if it's time.
This function can be called as frequently as needed, but will run the
self.kick_function after kick_interval seconds have passed.
"""
now = time.time()
if (self.last_run_func is None or
(now - self.last_run_func > self.kick_interval)):
if self.kick_f is not None:
self.kick_f()
self.last_run_func = now
self.last_kick_at = now
@staticmethod
def wait_until(wait_f, timeout=10 * 60):
"""Wait for timeout seconds until the passed function return True.
:param wait_f: The function to call that will end the wait.
:type wait_f: Callable[[], Boolean]
:param timeout: The time to wait in seconds.
:type timeout: int
"""
start_time = time.time()
while(not wait_f()):
now = time.time()
if now > start_time + timeout:
raise WatchDog.WatchDogTimeoutException()
wait_time = random.randrange(5, 30)
log('wait_until: waiting for {} seconds'.format(wait_time))
time.sleep(wait_time)
@staticmethod
def timed_wait(kicked_at_function,
complete_function,
wait_time=30 * 60,
compatibility_wait_time=10 * 60,
max_kick_interval=5 * 60):
"""Wait a maximum time with an intermediate 'kick' time.
This function will wait for max_kick_interval seconds unless the
kicked_at_function() call returns a time that is not older than
max_kick_interval (in seconds). i.e. the other side can signal that it
is still doing things during the max_kick_interval as long as it kicks
at least every max_kick_interval seconds.
The maximum wait is "wait_time", but the other side must keep kicking
during this period.
The "compatibility_wait_time" is used if the other side never kicks
(i.e. the kicked_at_function() always returns None). In this case the
function waits up to "compatibility_wait_time".
Note that the type of the return from the kicked_at_function is an
Optional[str], not a Float. The function will coerce this to a float
for the comparison. This represents the return value of
time.time() at the "other side". It's a string to simplify the
function obtaining the time value from the other side.
The function raises WatchDogTimeoutException if either the
compatibility_wait_time or the wait_time are exceeded.
The function raises WatchDogDeadException if the max_kick_interval is
exceeded.
Note that it is possible that the first kick interval is extended to
compatibility_wait_time if the "other side" doesn't kick immediately.
The best solution is for the other side to kick early and often.
:param kicked_at_function: The function to call to retrieve the time
that the other side 'kicked' at. None if the other side hasn't
kicked.
:type kicked_at_function: Callable[[], Optional[str]]
:param complete_function: The callable that returns True when done.
:type complete_function: Callable[[], Boolean]
:param wait_time: the maximum time to wait, even with kicks, in
seconds.
:type wait_time: int
:param compatibility_wait_time: The time to wait if no kicks are
received, in seconds.
:type compatibility_wait_time: int
:param max_kick_interval: The maximum time allowed between kicks before
the wait is over, in seconds:
:type max_kick_interval: int
:raises: WatchDog.WatchDogTimeoutException,
WatchDog.WatchDogDeadException
"""
start_time = time.time()
while True:
if complete_function():
break
# the time when the waiting for unit last kicked.
kicked_at = kicked_at_function()
now = time.time()
if kicked_at is None:
# assume other end doesn't do alive kicks
if (now - start_time > compatibility_wait_time):
raise WatchDog.WatchDogTimeoutException()
else:
# other side is participating in kicks; must kick at least
# every 'max_kick_interval' to stay alive.
if (now - float(kicked_at) > max_kick_interval):
raise WatchDog.WatchDogDeadException()
if (now - start_time > wait_time):
raise WatchDog.WatchDogTimeoutException()
delay_time = random.randrange(5, 30)
log('waiting for {} seconds'.format(delay_time))
time.sleep(delay_time)
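
Putting the two halves together, a condensed sketch of the pattern used by lock_and_roll() and wait_on_previous_node() above. WatchDog and the monitor_key_* helpers are assumed to come from this same module; the service, unit, version and upgrade-key names are illustrative:

import time

service, my_name, version = 'osd', 'this-unit', 'octopus'
upgrade_key = 'osd-upgrade'
alive_key = "{}_{}_{}_alive".format(service, my_name, version)
done_key = "{}_{}_{}_done".format(service, my_name, version)

# Upgrading side: call kick_the_dog() as often as convenient; the alive key
# is only rewritten once per kick_interval.
dog = WatchDog(kick_interval=3 * 60,
               kick_function=lambda: monitor_key_set(upgrade_key, alive_key,
                                                     time.time()))
dog.kick_the_dog()

# Waiting side: tolerate up to 5 minutes of silence between kicks and
# 30 minutes overall, falling back to 10 minutes if the peer never kicks.
try:
    WatchDog.timed_wait(
        kicked_at_function=lambda: monitor_key_get(upgrade_key, alive_key),
        complete_function=lambda: monitor_key_exists(upgrade_key, done_key),
        wait_time=30 * 60,
        compatibility_wait_time=10 * 60,
        max_kick_interval=5 * 60)
except (WatchDog.WatchDogDeadException, WatchDog.WatchDogTimeoutException):
    pass  # log and continue, as wait_on_previous_node() does above
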
def get_upgrade_position(osd_sorted_list, match_name):
@ -2412,11 +2609,14 @@ def roll_osd_cluster(new_version, upgrade_key):
status_set('blocked', 'failed to upgrade osd')
def upgrade_osd(new_version):
def upgrade_osd(new_version, kick_function=None):
"""Upgrades the current osd
:param new_version: str. The new version to upgrade to
"""
if kick_function is None:
kick_function = noop
current_version = get_version()
status_set("maintenance", "Upgrading osd")
log("Current ceph version is {}".format(current_version))
@ -2431,10 +2631,13 @@ def upgrade_osd(new_version):
status_set("blocked", "Upgrade to {} failed".format(new_version))
sys.exit(1)
kick_function()
try:
# Upgrade the packages before restarting the daemons.
status_set('maintenance', 'Upgrading packages to %s' % new_version)
apt_install(packages=determine_packages(), fatal=True)
kick_function()
# If the upgrade does not need an ownership update of any of the
# directories in the osd service directory, then simply restart
@ -2458,13 +2661,16 @@ def upgrade_osd(new_version):
os.listdir(CEPH_BASE_DIR))
non_osd_dirs = map(lambda x: os.path.join(CEPH_BASE_DIR, x),
non_osd_dirs)
for path in non_osd_dirs:
for i, path in enumerate(non_osd_dirs):
if i % 100 == 0:
kick_function()
update_owner(path)
# Fast service restart wasn't an option because each of the OSD
# directories need the ownership updated for all the files on
# the OSD. Walk through the OSDs one-by-one upgrading the OSD.
for osd_dir in _get_child_dirs(OSD_BASE_DIR):
kick_function()
try:
osd_num = _get_osd_num_from_dirname(osd_dir)
_upgrade_single_osd(osd_num, osd_dir)


@ -24,6 +24,7 @@ TO_PATCH = [
'os',
'subprocess',
'mkdir',
'service_name',
]
@ -31,6 +32,7 @@ class CephRadosGWCephTests(CharmTestCase):
def setUp(self):
super(CephRadosGWCephTests, self).setUp(ceph, TO_PATCH)
self.config.side_effect = self.test_config.get
self.service_name.return_value = 'ceph-radosgw'
def test_import_radosgw_key(self):
self.os.path.exists.return_value = False
@ -138,6 +140,82 @@ class CephRadosGWCephTests(CharmTestCase):
name='objects',
permission='rwx')
@patch('charmhelpers.contrib.storage.linux.ceph.CephBrokerRq'
'.add_op_create_erasure_profile')
@patch('charmhelpers.contrib.storage.linux.ceph.CephBrokerRq'
'.add_op_create_erasure_pool')
@patch('charmhelpers.contrib.storage.linux.ceph.CephBrokerRq'
'.add_op_request_access_to_group')
@patch('charmhelpers.contrib.storage.linux.ceph.CephBrokerRq'
'.add_op_create_pool')
def test_create_rgw_pools_rq_no_prefix_ec(self, mock_broker,
mock_request_access,
mock_request_create_ec_pool,
mock_request_create_ec_profile):
self.test_config.set('rgw-lightweight-pool-pg-num', -1)
self.test_config.set('ceph-osd-replication-count', 3)
self.test_config.set('rgw-buckets-pool-weight', 19)
self.test_config.set('restrict-ceph-pools', True)
self.test_config.set('pool-type', 'erasure-coded')
self.test_config.set('ec-profile-k', 3)
self.test_config.set('ec-profile-m', 9)
self.test_config.set('ec-profile-technique', 'cauchy_good')
ceph.get_create_rgw_pools_rq(prefix=None)
mock_request_create_ec_profile.assert_called_once_with(
name='ceph-radosgw-profile',
k=3, m=9,
lrc_locality=None,
lrc_crush_locality=None,
shec_durability_estimator=None,
clay_helper_chunks=None,
clay_scalar_mds=None,
device_class=None,
erasure_type='jerasure',
erasure_technique='cauchy_good'
)
mock_request_create_ec_pool.assert_has_calls([
call(name='default.rgw.buckets.data',
erasure_profile='ceph-radosgw-profile',
weight=19,
group="objects",
app_name='rgw')
])
mock_broker.assert_has_calls([
call(weight=0.10, replica_count=3, name='default.rgw.control',
group='objects', app_name='rgw'),
call(weight=0.10, replica_count=3, name='default.rgw.data.root',
group='objects', app_name='rgw'),
call(weight=0.10, replica_count=3, name='default.rgw.gc',
group='objects', app_name='rgw'),
call(weight=0.10, replica_count=3, name='default.rgw.log',
group='objects', app_name='rgw'),
call(weight=0.10, replica_count=3, name='default.rgw.intent-log',
group='objects', app_name='rgw'),
call(weight=0.10, replica_count=3, name='default.rgw.meta',
group='objects', app_name='rgw'),
call(weight=0.10, replica_count=3, name='default.rgw.usage',
group='objects', app_name='rgw'),
call(weight=0.10, replica_count=3, name='default.rgw.users.keys',
group='objects', app_name='rgw'),
call(weight=0.10, replica_count=3, name='default.rgw.users.email',
group='objects', app_name='rgw'),
call(weight=0.10, replica_count=3, name='default.rgw.users.swift',
group='objects', app_name='rgw'),
call(weight=0.10, replica_count=3, name='default.rgw.users.uid',
group='objects', app_name='rgw'),
call(weight=1.00, replica_count=3,
name='default.rgw.buckets.extra',
group='objects', app_name='rgw'),
call(weight=3.00, replica_count=3,
name='default.rgw.buckets.index',
group='objects', app_name='rgw'),
call(weight=0.10, replica_count=3, name='.rgw.root',
group='objects', app_name='rgw')],
)
mock_request_access.assert_called_with(key_name='radosgw.gateway',
name='objects',
permission='rwx')
@patch.object(utils.apt_pkg, 'version_compare', lambda *args: -1)
@patch.object(utils, 'lsb_release',
lambda: {'DISTRIB_CODENAME': 'trusty'})