6ad30a39e6
* Removed all log translation.
* Always lazy-load logging.
* Fixed broken log lines and updated tests.

Logs should no longer be translated (starting with Pike)[1]. Also, according
to the OpenStack guidelines[2], logged string messages should be interpolated
by the logger.

[1]: http://lists.openstack.org/pipermail/openstack-dev/2017-March/thread.html#113365
[2]: https://docs.openstack.org/oslo.i18n/latest/user/guidelines.html#adding-variables-to-log-messages

Change-Id: I07825694f173a8bebd7d62ade089c38d3c666283
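For illustration, the guideline means deferring string interpolation to the
logging call instead of formatting the message eagerly (a sketch of the
pattern, not code from this change; `group_id` is a stand-in variable):

    # Eager: the message string is built even when DEBUG logging is disabled.
    LOG.debug('Starting partitioner for group %s' % group_id)

    # Lazy: the logger interpolates only if the record is actually emitted.
    LOG.debug('Starting partitioner for group %s', group_id)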
323 lines
11 KiB
Python
#
# Copyright 2014 Red Hat, Inc.
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Author: Endre Karlson <endre.karlson@hpe.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import math
import time

from oslo_config import cfg
from oslo_log import log
import tenacity
import tooz.coordination

from designate.utils import generate_uuid

LOG = log.getLogger(__name__)

coordination_group = cfg.OptGroup(
    name='coordination', title="Configuration for coordination"
)

coordination_opts = [
    cfg.StrOpt('backend_url',
               help='The backend URL to use for distributed coordination. If '
                    'unset, services that need coordination will function as '
                    'standalone services. This is a `tooz` url - see '
                    'https://docs.openstack.org/tooz/latest/user/compatibility.html'),  # noqa
    cfg.FloatOpt('heartbeat_interval',
                 default=1.0,
                 help='Number of seconds between heartbeats for distributed '
                      'coordination.'),
    cfg.FloatOpt('run_watchers_interval',
                 default=10.0,
                 help='Number of seconds between checks to see if group '
                      'membership has changed.'),
]

cfg.CONF.register_group(coordination_group)
cfg.CONF.register_opts(coordination_opts, group=coordination_group)

CONF = cfg.CONF

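# For illustration only, a minimal designate.conf snippet that enables
# coordination (the memcached URL is an assumed example, not a shipped
# default; the two intervals shown are the defaults registered above):
#
#   [coordination]
#   backend_url = memcached://127.0.0.1:11211
#   heartbeat_interval = 1.0
#   run_watchers_interval = 10.0
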
def _retry_if_tooz_error(exception):
    """Return True if we should retry, False otherwise"""
    return isinstance(exception, tooz.coordination.ToozError)


class CoordinationMixin(object):
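    """Mixin that adds tooz-based distributed coordination to a service.

    When ``[coordination] backend_url`` is set, start() creates a tooz
    coordinator, joins a group named after ``service_name``, and adds
    periodic timers that keep membership alive via heartbeats and
    watcher runs.
    """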

    def __init__(self, *args, **kwargs):
        super(CoordinationMixin, self).__init__(*args, **kwargs)

        self._coordinator = None

    def start(self):
        self._coordination_id = ":".join([CONF.host, generate_uuid()])

        if CONF.coordination.backend_url is not None:
            backend_url = cfg.CONF.coordination.backend_url

            self._coordinator = tooz.coordination.get_coordinator(
                backend_url, self._coordination_id)
            self._coordination_started = False

            self.tg.add_timer(cfg.CONF.coordination.heartbeat_interval,
                              self._coordinator_heartbeat)
            self.tg.add_timer(cfg.CONF.coordination.run_watchers_interval,
                              self._coordinator_run_watchers)
        else:
            LOG.warning("No coordination backend configured, distributed "
                        "coordination functionality will be disabled. "
                        "Please configure a coordination backend.")

        super(CoordinationMixin, self).start()

        if self._coordinator is not None:
            while not self._coordination_started:
                try:
                    self._coordinator.start()

                    try:
                        create_group_req = self._coordinator.create_group(
                            self.service_name)
                        create_group_req.get()
                    except tooz.coordination.GroupAlreadyExist:
                        pass

                    join_group_req = self._coordinator.join_group(
                        self.service_name)
                    join_group_req.get()

                    self._coordination_started = True

                except Exception:
                    LOG.warning("Failed to start Coordinator:", exc_info=True)
                    time.sleep(15)

    def stop(self):
        if self._coordinator is not None:
            self._coordination_started = False

            leave_group_req = self._coordinator.leave_group(self.service_name)
            leave_group_req.get()
            self._coordinator.stop()

        super(CoordinationMixin, self).stop()

        self._coordinator = None

    def _coordinator_heartbeat(self):
        if not self._coordination_started:
            return

        try:
            self._coordinator.heartbeat()
        except tooz.coordination.ToozError:
            LOG.exception('Error sending a heartbeat to coordination backend.')

    def _coordinator_run_watchers(self):
        if not self._coordination_started:
            return

        self._coordinator.run_watchers()


class Partitioner(object):
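    """Deterministically split a set of partitions across group members.

    Every member sorts the group membership the same way, so each member
    can compute its own slice of the partition list without any extra
    communication.
    """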

    def __init__(self, coordinator, group_id, my_id, partitions):
        self._coordinator = coordinator
        self._group_id = group_id
        self._my_id = my_id
        self._partitions = partitions

        self._started = False
        self._my_partitions = None
        self._callbacks = []

    def _warn_no_backend(self):
        LOG.warning('No coordination backend configured, assuming we are '
                    'the only worker. Please configure a coordination '
                    'backend')

    @tenacity.retry(stop=tenacity.stop_after_attempt(5),
                    wait=tenacity.wait_random(max=2),
                    retry=tenacity.retry_if_exception(_retry_if_tooz_error),
                    reraise=True)
    def _get_members(self, group_id):
        get_members_req = self._coordinator.get_members(group_id)
        try:
            return get_members_req.get()
        except tooz.coordination.GroupNotCreated:
            LOG.error('Attempting to partition over a non-existent group: %s',
                      self._group_id)
            raise
        except tooz.coordination.ToozError:
            LOG.error('Error getting group membership info from coordination '
                      'backend.')
            raise

    def _on_group_change(self, event):
        LOG.debug("Received member change %s", event)
        members, self._my_partitions = self._update_partitions()

        self._run_callbacks(members, event)

def _partition(self, members, me, partitions):
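        # Illustrative example (not executed): with 10 partitions and 3
        # members, partition_size = ceil(10 / 3) = 4, so member 0 takes
        # partitions[0:4], member 1 takes partitions[4:8] and member 2
        # takes partitions[8:10].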
        member_count = len(members)
        partition_count = len(partitions)
        partition_size = int(
            math.ceil(float(partition_count) / float(member_count)))

        my_index = members.index(me)
        my_start = partition_size * my_index
        my_end = my_start + partition_size

        return partitions[my_start:my_end]

    def _run_callbacks(self, members, event):
        for cb in self._callbacks:
            cb(self.my_partitions, members, event)

    def _update_partitions(self):
        # Recalculate partitions - we need to sort the list of members
        # alphabetically so that it's the same order across all nodes.
        members = sorted(list(self._get_members(self._group_id)))
        partitions = self._partition(
            members, self._my_id, self._partitions)
        return members, partitions

    @property
    def my_partitions(self):
        return self._my_partitions

    def start(self):
        """Allow the partitioner to start timers after the coordinator has
        gotten its connections up.
        """
        LOG.debug("Starting partitioner")
        if self._coordinator:
            self._coordinator.watch_join_group(
                self._group_id, self._on_group_change)
            self._coordinator.watch_leave_group(
                self._group_id, self._on_group_change)

            # We need to get our partitions now. Events don't help in this
            # case, since they require a state change in the group that we
            # won't get when starting initially.
            self._my_partitions = self._update_partitions()[1]
        else:
            self._my_partitions = self._partitions
            self._run_callbacks(None, None)

        self._started = True

    def watch_partition_change(self, callback):
        LOG.debug("Watching for change %s", self._group_id)
        self._callbacks.append(callback)
        if self._started:
            if not self._coordinator:
                self._warn_no_backend()
            callback(self._my_partitions, None, None)

    def unwatch_partition_change(self, callback):
        self._callbacks.remove(callback)


class LeaderElection(object):
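    """Run a tooz leader election for a group and track the result.

    Registered callbacks fire whenever this node is elected leader; with
    no coordination backend configured, the node simply assumes it is
    the leader.
    """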

    def __init__(self, coordinator, group_id):
        self._coordinator = coordinator
        self._group_id = group_id

        self._callbacks = []
        self._started = False
        self._leader = False

    def _warn_no_backend(self):
        LOG.warning('No coordination backend configured, assuming we are the '
                    'leader. Please configure a coordination backend')

    def start(self):
        self._started = True

        if self._coordinator:
            LOG.info('Starting leader election for group %(group)s',
                     {'group': self._group_id})

            # Nominate myself for election
            self._coordinator.watch_elected_as_leader(
                self._group_id, self._on_elected_leader)
        else:
            self._warn_no_backend()
            self._leader = True

            for callback in self._callbacks:
                callback(None)

    def stop(self):
        self._started = False

        if self._coordinator:
            LOG.info('Stopping leader election for group %(group)s',
                     {'group': self._group_id})

            # Remove the elected_as_leader callback
            self._coordinator.unwatch_elected_as_leader(
                self._group_id, self._on_elected_leader)

            if self._leader:
                # Tell Tooz we no longer wish to be the leader
                LOG.info('Standing down as leader candidate for group '
                         '%(group)s', {'group': self._group_id})
                self._leader = False
                self._coordinator.stand_down_group_leader(self._group_id)

        elif self._leader:
            LOG.info('Standing down as leader candidate for group %(group)s',
                     {'group': self._group_id})
            self._leader = False

    @property
    def is_leader(self):
        return self._leader

    def _on_elected_leader(self, event):
        LOG.info('Successfully elected as leader of group %(group)s',
                 {'group': self._group_id})
        self._leader = True

        for callback in self._callbacks:
            callback(event)

    def watch_elected_as_leader(self, callback):
        self._callbacks.append(callback)

        if self._started and self._leader:
            # We're started and we're the leader, so we should trigger
            # the callback.
            callback(None)
        elif self._started and not self._coordinator:
            # We're started and there's no coordination backend configured,
            # so we assume we're the leader and call the callback.
            self._warn_no_backend()
            callback(None)

    def unwatch_elected_as_leader(self, callback):
        self._callbacks.remove(callback)
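

# Illustrative usage sketch (assumptions: a reachable tooz backend at the URL
# below and a pre-created group named 'zone_workers'; this is not part of the
# module's API surface):
#
#   coordinator = tooz.coordination.get_coordinator(
#       'memcached://127.0.0.1:11211', b'node-1')
#   coordinator.start()
#
#   partitioner = Partitioner(
#       coordinator, 'zone_workers', b'node-1', list(range(4096)))
#   partitioner.start()
#   partitioner.watch_partition_change(
#       lambda partitions, members, event: LOG.debug(
#           'Now responsible for %d partitions', len(partitions)))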