charm-swift-proxy/lib/swift_utils.py
Alex Kavanagh 4336b8d644 Convert charm to Python 3 only
* Needed to add a swift_manager/manager.py file which uses the payload
  software python modules to perform certain functions on behalf of the
  charm.  These were part of the main charm, which couldn't be retained
  in the charm due to the charm changing to Py3.
* Changed to absolute imports using the charm root as the root for all
  charm modules.
* The py2 target in tox.ini is used to test the swift_manager/manager.py
  file only.
* The .testr.conf file has been migrated to .stestr.conf

Change-Id: If37a393aa6ed27651b04810aa0bbf69eda37d7b4
2017-12-06 14:20:25 +00:00

1240 lines
41 KiB
Python

import copy
from collections import OrderedDict
import functools
import glob
import hashlib
import json
import os
import pwd
import shutil
import subprocess
import sys
import tempfile
import threading
import time
import uuid
from lib.swift_context import (
get_swift_hash,
SwiftHashContext,
SwiftIdentityContext,
HAProxyContext,
SwiftRingContext,
ApacheSSLContext,
MemcachedContext,
)
import charmhelpers.contrib.openstack.context as context
import charmhelpers.contrib.openstack.templating as templating
from charmhelpers.contrib.openstack.utils import (
os_release,
get_os_codename_package,
get_os_codename_install_source,
configure_installation_source,
make_assess_status_func,
os_application_version_set,
CompareOpenStackReleases,
)
from charmhelpers.contrib.hahelpers.cluster import (
is_elected_leader,
peer_units,
determine_api_port,
)
from charmhelpers.core.hookenv import (
log,
DEBUG,
ERROR,
INFO,
WARNING,
local_unit,
relation_get,
unit_get,
relation_set,
relation_ids,
related_units,
config,
is_leader,
leader_set,
leader_get,
)
from charmhelpers.fetch import (
apt_update,
apt_upgrade,
apt_install,
add_source
)
from charmhelpers.core.host import (
lsb_release,
CompareHostReleases,
)
from charmhelpers.contrib.network.ip import (
format_ipv6_addr,
get_ipv6_addr,
is_ipv6,
)
from charmhelpers.core.decorators import (
retry_on_exception,
)
# Various config files that are managed via templating.
SWIFT_CONF_DIR = '/etc/swift'
SWIFT_RING_EXT = 'ring.gz'
SWIFT_CONF = os.path.join(SWIFT_CONF_DIR, 'swift.conf')
SWIFT_PROXY_CONF = os.path.join(SWIFT_CONF_DIR, 'proxy-server.conf')
MEMCACHED_CONF = '/etc/memcached.conf'
SWIFT_RINGS_CONF = '/etc/apache2/conf.d/swift-rings'
SWIFT_RINGS_24_CONF = '/etc/apache2/conf-available/swift-rings.conf'
HAPROXY_CONF = '/etc/haproxy/haproxy.cfg'
APACHE_SITES_AVAILABLE = '/etc/apache2/sites-available'
APACHE_SITE_CONF = os.path.join(APACHE_SITES_AVAILABLE,
'openstack_https_frontend')
APACHE_SITE_24_CONF = os.path.join(APACHE_SITES_AVAILABLE,
'openstack_https_frontend.conf')
WWW_DIR = '/var/www/swift-rings'
ALTERNATE_WWW_DIR = '/var/www/html/swift-rings'
RING_SYNC_SEMAPHORE = threading.Semaphore()
VERSION_PACKAGE = 'swift-proxy'
def get_www_dir():
if os.path.isdir(os.path.dirname(ALTERNATE_WWW_DIR)):
return ALTERNATE_WWW_DIR
else:
return WWW_DIR
SWIFT_RINGS = OrderedDict((
('account', os.path.join(SWIFT_CONF_DIR, 'account.builder')),
('container', os.path.join(SWIFT_CONF_DIR, 'container.builder')),
('object', os.path.join(SWIFT_CONF_DIR, 'object.builder')),
))
SSL_CERT = os.path.join(SWIFT_CONF_DIR, 'cert.crt')
SSL_KEY = os.path.join(SWIFT_CONF_DIR, 'cert.key')
# Essex packages
BASE_PACKAGES = [
'swift',
'swift-proxy',
'memcached',
'apache2',
'python-keystone',
]
# > Folsom specific packages
FOLSOM_PACKAGES = BASE_PACKAGES + ['swift-plugin-s3', 'swauth']
# > Mitaka specific packages
MITAKA_PACKAGES = FOLSOM_PACKAGES + ['python-ceilometermiddleware']
SWIFT_HA_RES = 'grp_swift_vips'
TEMPLATES = 'templates/'
# Map config files to hook contexts and services that will be associated
# with file in restart_on_changes()'s service map.
CONFIG_FILES = OrderedDict([
(SWIFT_CONF, {
'hook_contexts': [SwiftHashContext()],
'services': ['swift-proxy'],
}),
(SWIFT_PROXY_CONF, {
'hook_contexts': [SwiftIdentityContext(),
context.BindHostContext(),
context.AMQPContext(ssl_dir=SWIFT_CONF_DIR)],
'services': ['swift-proxy'],
}),
(HAPROXY_CONF, {
'hook_contexts': [context.HAProxyContext(singlenode_mode=True),
HAProxyContext()],
'services': ['haproxy'],
}),
(SWIFT_RINGS_CONF, {
'hook_contexts': [SwiftRingContext()],
'services': ['apache2'],
}),
(SWIFT_RINGS_24_CONF, {
'hook_contexts': [SwiftRingContext()],
'services': ['apache2'],
}),
(APACHE_SITE_CONF, {
'hook_contexts': [ApacheSSLContext()],
'services': ['apache2'],
}),
(APACHE_SITE_24_CONF, {
'hook_contexts': [ApacheSSLContext()],
'services': ['apache2'],
}),
(MEMCACHED_CONF, {
'hook_contexts': [MemcachedContext()],
'services': ['memcached'],
}),
])
class SwiftProxyCharmException(Exception):
pass
class SwiftProxyClusterRPC(object):
"""Provides cluster relation rpc dicts.
Crucially, this ensures that any settings we don't use in any given call
are set to None, therefore removing them from the relation so they don't
get accidentally interpreted by the receiver as part of the request.
NOTE: these are only intended to be used from cluster peer relations.
"""
KEY_STOP_PROXY_SVC = 'stop-proxy-service'
KEY_STOP_PROXY_SVC_ACK = 'stop-proxy-service-ack'
KEY_NOTIFY_LEADER_CHANGED = 'leader-changed-notification'
KEY_REQUEST_RESYNC = 'resync-request'
KEY_REQUEST_RESYNC_ACK = 'resync-request-ack'
def __init__(self, version=1):
self._version = version
@property
def _hostname(self):
hostname = get_hostaddr()
return format_ipv6_addr(hostname) or hostname
def template(self):
# Everything must be None by default so it gets dropped from the
# relation unless we want it to be set.
templates = {1: {'trigger': None,
'broker-token': None,
'builder-broker': None,
'broker-timestamp': None,
self.KEY_STOP_PROXY_SVC: None,
self.KEY_STOP_PROXY_SVC_ACK: None,
self.KEY_NOTIFY_LEADER_CHANGED: None,
self.KEY_REQUEST_RESYNC: None,
'peers-only': None,
'sync-only-builders': None}}
return copy.deepcopy(templates[self._version])
def stop_proxy_request(self, peers_only=False, token=None):
"""Request to stop peer proxy service.
A token can optionally be supplied in case we want to restart a
previously triggered sync e.g. following a leader change notification.
NOTE: this action must only be performed by the cluster leader.
:param peers_only: If True, indicates that we only want peer
(i.e. proxy not storage) units to be notified.
:param token: optional request token expected to be echoed in ACK from
peer. If token not provided, a new one is generated.
"""
if not is_elected_leader(SWIFT_HA_RES):
errmsg = "Leader function called by non-leader"
raise SwiftProxyCharmException(errmsg)
rq = self.template()
if not token:
token = str(uuid.uuid4())
rq['trigger'] = token
rq[self.KEY_STOP_PROXY_SVC] = rq['trigger']
if peers_only:
rq['peers-only'] = 1
rq['builder-broker'] = self._hostname
return rq
def stop_proxy_ack(self, echo_token, echo_peers_only):
"""Ack that peer proxy service is stopped.
NOTE: this action must NOT be performed by the cluster leader.
:param echo_peers_only: peers_only value from leader that we echo back.
:param echo_token: request token received from leader that we must now
echo as an ACK.
"""
rq = self.template()
rq['trigger'] = str(uuid.uuid4())
# These echo values should match those received in the request
rq[self.KEY_STOP_PROXY_SVC_ACK] = echo_token
rq['peers-only'] = echo_peers_only
return rq
def sync_rings_request(self, broker_token, builders_only=False):
"""Request for peers to sync rings from leader.
NOTE: this action must only be performed by the cluster leader.
:param broker_token: token to identify sync request.
:param builders_only: if False, tell peers to sync builders only (not
rings).
"""
if not is_elected_leader(SWIFT_HA_RES):
errmsg = "Leader function called by non-leader"
raise SwiftProxyCharmException(errmsg)
rq = self.template()
rq['trigger'] = str(uuid.uuid4())
if builders_only:
rq['sync-only-builders'] = 1
rq['broker-token'] = broker_token
rq['broker-timestamp'] = "{:f}".format(time.time())
rq['builder-broker'] = self._hostname
return rq
def notify_leader_changed(self, token):
"""Notify peers that leader has changed.
The token passed in must be that associated with the sync we claim to
have been interrupted. It will be re-used by the restored leader once
it receives this notification.
NOTE: this action must only be performed by the cluster leader that
has relinquished it's leader status as part of the current hook
context.
"""
if not is_elected_leader(SWIFT_HA_RES):
errmsg = "Leader function called by non-leader"
raise SwiftProxyCharmException(errmsg)
rq = self.template()
rq['trigger'] = str(uuid.uuid4())
rq[self.KEY_NOTIFY_LEADER_CHANGED] = token
return rq
def request_resync(self, token):
"""Request re-sync from leader.
NOTE: this action must not be performed by the cluster leader.
"""
rq = self.template()
rq['trigger'] = str(uuid.uuid4())
rq[self.KEY_REQUEST_RESYNC] = token
return rq
def try_initialize_swauth():
if is_leader() and config('auth-type') == 'swauth':
if leader_get('swauth-init') is not True:
try:
admin_key = config('swauth-admin-key')
if admin_key == '' or admin_key is None:
admin_key = leader_get('swauth-admin-key')
if admin_key is None:
admin_key = uuid.uuid4()
leader_set({'swauth-admin-key': admin_key})
bind_port = config('bind-port')
bind_port = determine_api_port(bind_port, singlenode_mode=True)
subprocess.check_call([
'swauth-prep',
'-A',
'http://localhost:{}/auth'.format(bind_port),
'-K', admin_key])
leader_set({'swauth-init': True})
except subprocess.CalledProcessError:
log("had a problem initializing swauth!")
def get_first_available_value(responses, key, default=None):
for r in responses:
if key in r:
return r[key]
return default
def all_responses_equal(responses, key, must_exist=True):
"""If key exists in responses, all values for it must be equal.
If all equal return True. If key does not exist and must_exist is True
return False otherwise True.
"""
sentinel = object()
val = None
all_equal = True
for r in responses:
_val = r.get(key, sentinel)
if val is not None and val != _val:
all_equal = False
break
elif _val != sentinel:
val = _val
if must_exist and val is None:
all_equal = False
if all_equal:
return True
log("Responses not all equal for key '{}'".format(key), level=DEBUG)
return False
def register_configs():
"""Register config files with their respective contexts.
Registration of some configs may not be required depending on
existing of certain relations.
"""
# if called without anything installed (eg during install hook)
# just default to earliest supported release. configs dont get touched
# till post-install, anyway.
release = get_os_codename_package('swift-proxy', fatal=False) or 'essex'
configs = templating.OSConfigRenderer(templates_dir=TEMPLATES,
openstack_release=release)
confs = [SWIFT_CONF,
SWIFT_PROXY_CONF,
HAPROXY_CONF,
MEMCACHED_CONF]
for conf in confs:
configs.register(conf, CONFIG_FILES[conf]['hook_contexts'])
if os.path.exists('/etc/apache2/conf-available'):
configs.register(SWIFT_RINGS_24_CONF,
CONFIG_FILES[SWIFT_RINGS_24_CONF]['hook_contexts'])
configs.register(APACHE_SITE_24_CONF,
CONFIG_FILES[APACHE_SITE_24_CONF]['hook_contexts'])
else:
configs.register(SWIFT_RINGS_CONF,
CONFIG_FILES[SWIFT_RINGS_CONF]['hook_contexts'])
configs.register(APACHE_SITE_CONF,
CONFIG_FILES[APACHE_SITE_CONF]['hook_contexts'])
return configs
def restart_map():
"""Determine the correct resource map to be passed to
charmhelpers.core.restart_on_change() based on the services configured.
:returns dict: A dictionary mapping config file to lists of services
that should be restarted when file changes.
"""
_map = []
for f, ctxt in CONFIG_FILES.items():
svcs = []
for svc in ctxt['services']:
svcs.append(svc)
if svcs:
_map.append((f, svcs))
return OrderedDict(_map)
def services():
''' Returns a list of services associate with this charm '''
_services = []
for v in restart_map().values():
_services.extend(v)
return list(set(_services))
def swift_user(username='swift'):
user = pwd.getpwnam(username)
return (user.pw_uid, user.pw_gid)
def ensure_swift_dir(conf_dir=os.path.dirname(SWIFT_CONF)):
if not os.path.isdir(conf_dir):
os.mkdir(conf_dir, 0o750)
uid, gid = swift_user()
os.chown(conf_dir, uid, gid)
def determine_packages(release):
"""Determine what packages are needed for a given OpenStack release."""
cmp_openstack = CompareOpenStackReleases(release)
if cmp_openstack >= 'mitaka':
return MITAKA_PACKAGES
elif cmp_openstack >= 'folsom':
return FOLSOM_PACKAGES
else:
return BASE_PACKAGES
def initialize_ring(path, part_power, replicas, min_hours):
get_manager().initialize_ring(path, part_power, replicas, min_hours)
def exists_in_ring(ring_path, node):
node['port'] = _ring_port(ring_path, node)
result = get_manager().exists_in_ring(ring_path, node)
if result:
log('Node already exists in ring ({}).'
.format(ring_path), level=INFO)
return result
def add_to_ring(ring_path, node):
port = _ring_port(ring_path, node)
# Note: this code used to attempt to calculate new dev ids, but made
# various assumptions (e.g. in order devices, all devices in the ring
# being valid, etc). The RingBuilder from the Swift library will
# automatically calculate the new device's ID if no id is provided (at
# least since the Icehouse release).
new_dev = {
'zone': node['zone'],
'ip': node['ip'],
'port': port,
'device': node['device'],
'weight': 100,
'meta': '',
}
get_manager().add_dev(ring_path, new_dev)
msg = 'Added new device to ring {}: {}'.format(ring_path, new_dev)
log(msg, level=INFO)
def _ring_port(ring_path, node):
"""Determine correct port from relation settings for a given ring file."""
for name in ['account', 'object', 'container']:
if name in ring_path:
return node[('{}_port'.format(name))]
def get_zone(assignment_policy):
"""Determine appropriate zone based on configured assignment policy.
Manual assignment relies on each storage zone being deployed as a
separate service unit with its desired zone set as a configuration
option.
Auto assignment distributes swift-storage machine units across a number
of zones equal to the configured minimum replicas. This allows for a
single swift-storage service unit, with each 'add-unit'd machine unit
being assigned to a different zone.
:param assignment_policy: <string> the policy
:returns: <integer> zone id
"""
if assignment_policy == 'manual':
return relation_get('zone')
elif assignment_policy == 'auto':
_manager = get_manager()
potential_zones = [_manager.get_zone(ring_path)
for ring_path in SWIFT_RINGS.values()]
return set(potential_zones).pop()
else:
raise SwiftProxyCharmException(
'Invalid zone assignment policy: {}'.format(assignment_policy))
def balance_ring(ring_path):
"""Balance a ring.
Returns True if it needs redistribution.
"""
# shell out to swift-ring-builder instead, since the balancing code there
# does a bunch of un-importable validation.
cmd = ['swift-ring-builder', ring_path, 'rebalance']
try:
subprocess.check_call(cmd)
except subprocess.CalledProcessError as e:
if e.returncode == 1:
# Ring builder exit-code=1 is supposed to indicate warning but I
# have noticed that it can also return 1 with the following sort of
# message:
#
# NOTE: Balance of 166.67 indicates you should push this ring,
# wait at least 0 hours, and rebalance/repush.
#
# This indicates that a balance has occurred and a resync would be
# required so not sure why 1 is returned in this case.
return False
raise SwiftProxyCharmException(
'balance_ring: {} returned {}'.format(cmd, e.returncode))
# return True if it needs redistribution
return True
def should_balance(rings):
"""Determine whether or not a re-balance is required and allowed.
Ring balance can be disabled/postponed using the disable-ring-balance
config option.
Otherwise, using zones vs min. replicas, determine whether or not the rings
should be balanced.
"""
if config('disable-ring-balance'):
return False
return has_minimum_zones(rings)
def do_openstack_upgrade(configs):
new_src = config('openstack-origin')
new_os_rel = get_os_codename_install_source(new_src)
log('Performing OpenStack upgrade to {}.'.format(new_os_rel), level=DEBUG)
configure_installation_source(new_src)
dpkg_opts = [
'--option', 'Dpkg::Options::=--force-confnew',
'--option', 'Dpkg::Options::=--force-confdef',
]
apt_update()
apt_upgrade(options=dpkg_opts, fatal=True, dist=True)
configs.set_release(openstack_release=new_os_rel)
configs.write_all()
def setup_ipv6():
"""Validate that we can support IPv6 mode.
This should be called if prefer-ipv6 is True to ensure that we are running
in an environment that supports ipv6.
"""
ubuntu_rel = lsb_release()['DISTRIB_CODENAME'].lower()
if CompareHostReleases(ubuntu_rel) < "trusty":
msg = ("IPv6 is not supported in the charms for Ubuntu versions less "
"than Trusty 14.04")
raise SwiftProxyCharmException(msg)
# Need haproxy >= 1.5.3 for ipv6 so for Trusty if we are <= Kilo we need to
# use trusty-backports otherwise we can use the UCA.
if (ubuntu_rel == 'trusty' and
CompareOpenStackReleases(os_release('swift-proxy')) < 'liberty'):
add_source('deb http://archive.ubuntu.com/ubuntu trusty-backports '
'main')
apt_update()
apt_install('haproxy/trusty-backports', fatal=True)
@retry_on_exception(3, base_delay=2, exc_type=subprocess.CalledProcessError)
def sync_proxy_rings(broker_url, builders=True, rings=True):
"""The leader proxy is responsible for intialising, updating and
rebalancing the ring. Once the leader is ready the rings must then be
synced into each other proxy unit.
Note that we sync the ring builder and .gz files since the builder itself
is linked to the underlying .gz ring.
"""
log('Fetching swift rings & builders from proxy @ {}.'.format(broker_url),
level=DEBUG)
target = SWIFT_CONF_DIR
synced = []
tmpdir = tempfile.mkdtemp(prefix='swiftrings')
try:
for server in ['account', 'object', 'container']:
if builders:
url = '{}/{}.builder'.format(broker_url, server)
log('Fetching {}.'.format(url), level=DEBUG)
builder = "{}.builder".format(server)
cmd = ['wget', url, '--retry-connrefused', '-t', '10', '-O',
os.path.join(tmpdir, builder)]
subprocess.check_call(cmd)
synced.append(builder)
if rings:
url = '{}/{}.{}'.format(broker_url, server, SWIFT_RING_EXT)
log('Fetching {}.'.format(url), level=DEBUG)
ring = '{}.{}'.format(server, SWIFT_RING_EXT)
cmd = ['wget', url, '--retry-connrefused', '-t', '10', '-O',
os.path.join(tmpdir, ring)]
subprocess.check_call(cmd)
synced.append(ring)
# Once all have been successfully downloaded, move them to actual
# location.
for f in synced:
os.rename(os.path.join(tmpdir, f), os.path.join(target, f))
finally:
shutil.rmtree(tmpdir)
def ensure_www_dir_permissions(www_dir):
if not os.path.isdir(www_dir):
os.mkdir(www_dir, 0o755)
else:
os.chmod(www_dir, 0o755)
uid, gid = swift_user()
os.chown(www_dir, uid, gid)
def update_www_rings(rings=True, builders=True):
"""Copy rings to apache www dir.
Try to do this as atomically as possible to avoid races with storage nodes
syncing rings.
"""
if not (rings or builders):
return
tmp_dir = tempfile.mkdtemp(prefix='swift-rings-www-tmp')
for ring, builder_path in SWIFT_RINGS.items():
if rings:
ringfile = '{}.{}'.format(ring, SWIFT_RING_EXT)
src = os.path.join(SWIFT_CONF_DIR, ringfile)
dst = os.path.join(tmp_dir, ringfile)
shutil.copyfile(src, dst)
if builders:
src = builder_path
dst = os.path.join(tmp_dir, os.path.basename(builder_path))
shutil.copyfile(src, dst)
www_dir = get_www_dir()
deleted = "{}.deleted".format(www_dir)
ensure_www_dir_permissions(tmp_dir)
os.rename(www_dir, deleted)
os.rename(tmp_dir, www_dir)
shutil.rmtree(deleted)
def get_rings_checksum():
"""Returns sha256 checksum for rings in /etc/swift."""
sha = hashlib.sha256()
for ring in SWIFT_RINGS.keys():
path = os.path.join(SWIFT_CONF_DIR, '{}.{}'
.format(ring, SWIFT_RING_EXT))
if not os.path.isfile(path):
continue
with open(path, 'rb') as fd:
sha.update(fd.read())
return sha.hexdigest()
def get_builders_checksum():
"""Returns sha256 checksum for builders in /etc/swift."""
sha = hashlib.sha256()
for builder in SWIFT_RINGS.values():
if not os.path.exists(builder):
continue
with open(builder, 'rb') as fd:
sha.update(fd.read())
return sha.hexdigest()
def non_null_unique(data):
"""Return True if data is a list containing all non-null values that are
all equal.
"""
return (all(data) and len(set(data)) > 1)
def previously_synced():
"""If a full sync is not known to have been performed from this unit, False
is returned."""
broker = None
for rid in relation_ids('cluster'):
broker = relation_get(attribute='builder-broker', rid=rid,
unit=local_unit())
only_builders_synced = relation_get(attribute='sync-only-builders',
rid=rid, unit=local_unit())
if broker and not only_builders_synced:
return True
return False
def sync_builders_and_rings_if_changed(f):
"""Only trigger a ring or builder sync if they have changed as a result of
the decorated operation.
"""
@functools.wraps(f)
def _inner_sync_builders_and_rings_if_changed(*args, **kwargs):
if not is_elected_leader(SWIFT_HA_RES):
log("Sync rings called by non-leader - skipping", level=WARNING)
return
try:
# Ensure we don't do a double sync if we are nested.
do_sync = False
if RING_SYNC_SEMAPHORE.acquire(blocking=0):
do_sync = True
rings_before = get_rings_checksum()
builders_before = get_builders_checksum()
ret = f(*args, **kwargs)
if not do_sync:
return ret
rings_after = get_rings_checksum()
builders_after = get_builders_checksum()
rings_path = os.path.join(SWIFT_CONF_DIR, '*.{}'
.format(SWIFT_RING_EXT))
rings_ready = len(glob.glob(rings_path)) == len(SWIFT_RINGS)
rings_changed = ((rings_after != rings_before) or
not previously_synced())
builders_changed = builders_after != builders_before
if rings_changed or builders_changed:
# Copy builders and rings (if available) to the server dir.
update_www_rings(rings=rings_ready)
if rings_changed and rings_ready:
# Trigger sync
cluster_sync_rings(peers_only=not rings_changed)
else:
log("Rings not ready for sync - syncing builders",
level=DEBUG)
cluster_sync_rings(peers_only=True, builders_only=True)
else:
log("Rings/builders unchanged - skipping sync", level=DEBUG)
return ret
finally:
RING_SYNC_SEMAPHORE.release()
return _inner_sync_builders_and_rings_if_changed
@sync_builders_and_rings_if_changed
def update_rings(nodes=None, min_part_hours=None):
"""Update builder with node settings and balance rings if necessary.
Also update min_part_hours if provided.
"""
if not is_elected_leader(SWIFT_HA_RES):
log("Update rings called by non-leader - skipping", level=WARNING)
return
balance_required = False
if min_part_hours is not None:
# NOTE: no need to stop the proxy since we are not changing the rings,
# only the builder.
# Only update if all exist
if all(os.path.exists(p) for p in SWIFT_RINGS.values()):
for ring, path in SWIFT_RINGS.items():
current_min_part_hours = get_min_part_hours(path)
if min_part_hours != current_min_part_hours:
log("Setting ring {} min_part_hours to {}"
.format(ring, min_part_hours), level=INFO)
try:
set_min_part_hours(path, min_part_hours)
except SwiftProxyCharmException as exc:
# TODO: ignore for now since this should not be
# critical but in the future we should support a
# rollback.
log(str(exc), level=WARNING)
else:
balance_required = True
if nodes is not None:
for node in nodes:
for ring in SWIFT_RINGS.values():
if not exists_in_ring(ring, node):
add_to_ring(ring, node)
balance_required = True
if balance_required:
balance_rings()
def get_min_part_hours(path):
"""Just a proxy to the manager.py:get_min_part_hours() function
:param path: the path to get the min_part_hours for
:returns: integer
"""
return get_manager().get_min_part_hours(path)
def set_min_part_hours(path, value):
cmd = ['swift-ring-builder', path, 'set_min_part_hours', str(value)]
try:
subprocess.check_call(cmd)
except subprocess.CalledProcessError:
raise SwiftProxyCharmException(
"Failed to set min_part_hours={} on {}".format(value, path))
@sync_builders_and_rings_if_changed
def balance_rings():
"""Rebalance each ring and notify peers that new rings are available."""
if not is_elected_leader(SWIFT_HA_RES):
log("Balance rings called by non-leader - skipping", level=WARNING)
return
if not should_balance([r for r in SWIFT_RINGS.values()]):
log("Not yet ready to balance rings - insufficient replicas?",
level=INFO)
return
rebalanced = False
log("Rebalancing rings", level=INFO)
for path in SWIFT_RINGS.values():
if balance_ring(path):
log('Balanced ring {}'.format(path), level=DEBUG)
rebalanced = True
else:
log('Ring {} not rebalanced'.format(path), level=DEBUG)
if not rebalanced:
log("Rings unchanged by rebalance", level=DEBUG)
# NOTE: checksum will tell for sure
def mark_www_rings_deleted():
"""Mark any rings from the apache server directory as deleted so that
storage units won't see them.
"""
www_dir = get_www_dir()
for ring in SWIFT_RINGS.keys():
path = os.path.join(www_dir, '{}.ring.gz'.format(ring))
if os.path.exists(path):
os.rename(path, "{}.deleted".format(path))
def notify_peers_builders_available(broker_token, builders_only=False):
"""Notify peer swift-proxy units that they should synchronise ring and
builder files.
Note that this should only be called from the leader unit.
"""
if not is_elected_leader(SWIFT_HA_RES):
log("Ring availability peer broadcast requested by non-leader - "
"skipping", level=WARNING)
return
if not broker_token:
log("No broker token - aborting sync", level=WARNING)
return
cluster_rids = relation_ids('cluster')
if not cluster_rids:
log("Cluster relation not yet available - skipping sync", level=DEBUG)
return
if builders_only:
_type = "builders"
else:
_type = "builders & rings"
# Notify peers that builders are available
log("Notifying peer(s) that {} are ready for sync."
.format(_type), level=INFO)
rq = SwiftProxyClusterRPC().sync_rings_request(broker_token,
builders_only=builders_only)
for rid in cluster_rids:
log("Notifying rid={} ({})".format(rid, rq), level=DEBUG)
relation_set(relation_id=rid, relation_settings=rq)
def broadcast_rings_available(storage=True, builders_only=False,
broker_token=None):
"""Notify storage relations and cluster (peer) relations that rings and
builders are availble for sync.
We can opt to only notify peer or storage relations if needs be.
"""
if storage:
# TODO: get ack from storage units that they are synced before
# syncing proxies.
notify_storage_rings_available()
else:
log("Skipping notify storage relations", level=DEBUG)
# Always set peer info even if not clustered so that info is present when
# units join
notify_peers_builders_available(broker_token,
builders_only=builders_only)
def cluster_sync_rings(peers_only=False, builders_only=False, token=None):
"""Notify peer relations that they should stop their proxy services.
Peer units will then be expected to do a relation_set with
stop-proxy-service-ack set rq value. Once all peers have responded, the
leader will send out notification to all relations that rings are available
for sync.
If peers_only is True, only peer units will be synced. This is typically
used when only builder files have been changed.
This should only be called by the leader unit.
"""
if not is_elected_leader(SWIFT_HA_RES):
# Only the leader can do this.
return
if not peer_units():
# If we have no peer units just go ahead and broadcast to storage
# relations. If we have been instructed to only broadcast to peers this
# should do nothing.
broadcast_rings_available(broker_token=str(uuid.uuid4()),
storage=not peers_only)
return
elif builders_only:
if not token:
token = str(uuid.uuid4())
# No need to stop proxies if only syncing builders between peers.
broadcast_rings_available(storage=False, builders_only=True,
broker_token=token)
return
log("Sending stop proxy service request to all peers", level=INFO)
rq = SwiftProxyClusterRPC().stop_proxy_request(peers_only, token=token)
for rid in relation_ids('cluster'):
relation_set(relation_id=rid, relation_settings=rq)
def notify_storage_rings_available():
"""Notify peer swift-storage relations that they should synchronise ring
and builder files.
Note that this should only be called from the leader unit.
"""
if not is_elected_leader(SWIFT_HA_RES):
log("Ring availability storage-relation broadcast requested by "
"non-leader - skipping", level=WARNING)
return
hostname = get_hostaddr()
hostname = format_ipv6_addr(hostname) or hostname
path = os.path.basename(get_www_dir())
rings_url = 'http://{}/{}'.format(hostname, path)
trigger = uuid.uuid4()
# Notify storage nodes that there is a new ring to fetch.
log("Notifying storage nodes that new rings are ready for sync.",
level=INFO)
for relid in relation_ids('swift-storage'):
relation_set(relation_id=relid, swift_hash=get_swift_hash(),
rings_url=rings_url, trigger=trigger)
def fully_synced():
"""Check that we have all the rings and builders synced from the leader.
Returns True if we have all rings and builders.
"""
not_synced = []
for ring, builder in SWIFT_RINGS.items():
if not os.path.exists(builder):
not_synced.append(builder)
ringfile = os.path.join(SWIFT_CONF_DIR,
'{}.{}'.format(ring, SWIFT_RING_EXT))
if not os.path.exists(ringfile):
not_synced.append(ringfile)
if not_synced:
log("Not yet synced: {}".format(', '.join(not_synced), level=INFO))
return False
return True
def get_hostaddr():
if config('prefer-ipv6'):
return get_ipv6_addr(exc_list=[config('vip')])[0]
return unit_get('private-address')
def is_most_recent_timestamp(timestamp):
ts = []
for rid in relation_ids('cluster'):
for unit in related_units(rid):
settings = relation_get(rid=rid, unit=unit)
t = settings.get('broker-timestamp')
if t:
ts.append(t)
if not ts or not timestamp:
return False
return timestamp >= max(ts)
def timestamps_available(excluded_unit):
for rid in relation_ids('cluster'):
for unit in related_units(rid):
if unit == excluded_unit:
continue
settings = relation_get(rid=rid, unit=unit)
if settings.get('broker-timestamp'):
return True
return False
def get_manager():
return ManagerProxy()
class ManagerProxy(object):
def __init__(self, path=None):
self._path = path or []
def __getattribute__(self, attr):
if attr in ['__class__', '_path', 'api_version']:
return super().__getattribute__(attr)
return self.__class__(path=self._path + [attr])
def __call__(self, *args, **kwargs):
# Following line retained commented-out for future debugging
# print("Called: {} ({}, {})".format(self._path, args, kwargs))
return _proxy_manager_call(self._path, args, kwargs)
JSON_ENCODE_OPTIONS = dict(
sort_keys=True,
allow_nan=False,
indent=None,
separators=(',', ':'),
)
def _proxy_manager_call(path, args, kwargs):
package = dict(path=path,
args=args,
kwargs=kwargs)
serialized = json.dumps(package, **JSON_ENCODE_OPTIONS)
script = os.path.abspath(os.path.join(os.path.dirname(__file__),
'..',
'swift_manager',
'manager.py'))
env = os.environ
try:
if sys.version_info < (3, 5):
# remove this after trusty support is removed. No subprocess.run
# in Python 3.4
process = subprocess.Popen([script, serialized],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env)
out, err = process.communicate()
result = json.loads(out.decode('UTF-8'))
else:
completed = subprocess.run([script, serialized],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env)
result = json.loads(completed.stdout.decode('UTF-8'))
if 'error' in result:
s = ("The call within manager.py failed with the error: '{}'. "
"The call was: path={}, args={}, kwargs={}"
.format(result['error'], path, args, kwargs))
log(s, level=ERROR)
raise RuntimeError(s)
return result['result']
except subprocess.CalledProcessError as e:
s = ("manger.py failed when called with path={}, args={}, kwargs={},"
" with the error: {}".format(path, args, kwargs, str(e)))
log(s, level=ERROR)
if sys.version_info < (3, 5):
# remove this after trusty support is removed.
log("stderr was:\n{}\n".format(err.decode('UTF-8')),
level=ERROR)
else:
log("stderr was:\n{}\n".format(completed.stderr.decode('UTF-8')),
level=ERROR)
raise RuntimeError(s)
except Exception as e:
s = ("Decoding the result from the call to manager.py resulted in "
"error '{}' (command: path={}, args={}, kwargs={}"
.format(str(e), path, args, kwargs))
log(s, level=ERROR)
raise RuntimeError(s)
def customer_check_assess_status(configs):
"""Custom check function provided to assess_status() to check the current
status of current unit beyond checking that the relevant services are
running
@param configs: An OSConfigRenderer object of the current services
@returns (status, message): The outcome of the checks.
"""
# Check for required swift-storage relation
if len(relation_ids('swift-storage')) < 1:
return ('blocked', 'Missing relation: storage')
# Verify allowed_hosts is populated with enough unit IP addresses
ctxt = SwiftRingContext()()
if len(ctxt['allowed_hosts']) < config('replicas'):
return ('blocked', 'Not enough related storage nodes')
# Verify there are enough storage zones to satisfy minimum replicas
rings = [r for r in SWIFT_RINGS.values()]
if not has_minimum_zones(rings):
return ('blocked', 'Not enough storage zones for minimum replicas')
if config('prefer-ipv6'):
for rid in relation_ids('swift-storage'):
for unit in related_units(rid):
addr = relation_get(attribute='private-address', unit=unit,
rid=rid)
if not is_ipv6(addr):
return ('blocked',
'Did not get IPv6 address from '
'storage relation (got={})'.format(addr))
return 'active', 'Unit is ready'
def has_minimum_zones(rings):
"""Determine if enough zones exist to satisfy minimum replicas
Uses manager.py as it accesses the ring_builder object in swift
:param rings: the list of ring_paths to check
:returns: Boolean
"""
result = get_manager().has_minimum_zones(rings)
if 'log' in result:
log(result['log'], level=result['level'])
return result['result']
def assess_status(configs, check_services=None):
"""Assess status of current unit
Decides what the state of the unit should be based on the current
configuration.
SIDE EFFECT: calls set_os_workload_status(...) which sets the workload
status of the unit.
Also calls status_set(...) directly if paused state isn't complete.
@param configs: a templating.OSConfigRenderer() object
@returns None - this function is executed for its side-effect
"""
assess_status_func(configs, check_services)()
os_application_version_set(VERSION_PACKAGE)
def assess_status_func(configs, check_services=None):
"""Helper function to create the function that will assess_status() for
the unit.
Uses charmhelpers.contrib.openstack.utils.make_assess_status_func() to
create the appropriate status function and then returns it.
Used directly by assess_status() and also for pausing and resuming
the unit.
NOTE(ajkavanagh) ports are not checked due to race hazards with services
that don't behave sychronously w.r.t their service scripts. e.g.
apache2.
@param configs: a templating.OSConfigRenderer() object
@param check_services: a list of services to check. If None, then
services() is used instead. To not check services pass an empty list.
@return f() -> None : a function that assesses the unit's workload status
"""
if check_services is None:
check_services = services()
required_interfaces = {}
if relation_ids('identity-service'):
required_interfaces['identity'] = ['identity-service']
return make_assess_status_func(
configs, required_interfaces,
charm_func=customer_check_assess_status,
services=check_services, ports=None)