Implement support for LeaderElections
Based on Tooz, this allows for individual service instances to be elected as a group leader. This is useful for ensuring only a single instance of a service is running a particular task at any given time. Change-Id: Ie5b1d11becc30b58392549a8ff469e5303d7c52f
This commit is contained in:
parent
9030d92ab1
commit
19f6bc0697
@ -23,8 +23,9 @@ from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
import tooz.coordination
|
||||
|
||||
from designate.i18n import _LE
|
||||
from designate.i18n import _LI
|
||||
from designate.i18n import _LW
|
||||
from designate.i18n import _LE
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
@ -126,7 +127,7 @@ class Partitioner(object):
|
||||
self._callbacks = []
|
||||
|
||||
def _warn_no_backend(self):
|
||||
LOG.warning(_LW('No coordination backend configure, assuming we are '
|
||||
LOG.warning(_LW('No coordination backend configured, assuming we are '
|
||||
'the only worker. Please configure a coordination '
|
||||
'backend'))
|
||||
|
||||
@ -202,3 +203,93 @@ class Partitioner(object):
|
||||
|
||||
def unwatch_partition_change(self, callback):
|
||||
self._callbacks.remove(callback)
|
||||
|
||||
|
||||
class LeaderElection(object):
|
||||
def __init__(self, coordinator, group_id):
|
||||
self._coordinator = coordinator
|
||||
self._group_id = group_id
|
||||
|
||||
self._callbacks = []
|
||||
self._started = False
|
||||
self._leader = False
|
||||
|
||||
def _warn_no_backend(self):
|
||||
LOG.warning(_LW('No coordination backend configured, assuming we are '
|
||||
'the leader. Please configure a coordination backend'))
|
||||
|
||||
def start(self):
|
||||
self._started = True
|
||||
|
||||
if self._coordinator:
|
||||
LOG.info(_LI('Starting leader election for group %(group)s'),
|
||||
{'group': self._group_id})
|
||||
|
||||
# Nominate myself for election
|
||||
self._coordinator.watch_elected_as_leader(
|
||||
self._group_id, self._on_elected_leader)
|
||||
else:
|
||||
self._warn_no_backend()
|
||||
self._leader = True
|
||||
|
||||
for callback in self._callbacks:
|
||||
callback(None)
|
||||
|
||||
def stop(self):
|
||||
self._started = False
|
||||
|
||||
if self._coordinator:
|
||||
LOG.info(_LI('Stopping leader election for group %(group)s'),
|
||||
{'group': self._group_id})
|
||||
|
||||
try:
|
||||
# Remove the elected_as_leader callback
|
||||
self._coordinator.unwatch_elected_as_leader(
|
||||
self._group_id, self._on_elected_leader)
|
||||
|
||||
except AttributeError:
|
||||
# TODO(kiall): Remove when tooz bug #1467907 is fixed +
|
||||
# released, and is in our requirements.
|
||||
if not self._coordinator._hooks_elected_leader[self._group_id]:
|
||||
del self._coordinator._hooks_elected_leader[self._group_id]
|
||||
|
||||
if self._leader:
|
||||
# Tell Tooz we no longer wish to be the leader
|
||||
LOG.info(_LI('Standing down as leader candidate for group '
|
||||
'%(group)s'), {'group': self._group_id})
|
||||
self._leader = False
|
||||
self._coordinator.stand_down_group_leader(self._group_id)
|
||||
|
||||
elif self._leader:
|
||||
LOG.info(_LI('Standing down as leader candidate for group '
|
||||
'%(group)s'), {'group': self._group_id})
|
||||
self._leader = False
|
||||
|
||||
@property
|
||||
def is_leader(self):
|
||||
return self._leader
|
||||
|
||||
def _on_elected_leader(self, event):
|
||||
LOG.info(_LI('Sucessfully elected as leader of group %(group)s'),
|
||||
{'group': self._group_id})
|
||||
self._leader = True
|
||||
|
||||
for callback in self._callbacks:
|
||||
callback(event)
|
||||
|
||||
def watch_elected_as_leader(self, callback):
|
||||
self._callbacks.append(callback)
|
||||
|
||||
if self._started and self._leader:
|
||||
# We're started, and we're the leader, we should trigger the
|
||||
# callback
|
||||
callback(None)
|
||||
|
||||
elif self._started and not self._coordinator:
|
||||
# We're started, and there's no coordination backend configured,
|
||||
# we assume we're leader and call the callback.
|
||||
self._warn_no_backend()
|
||||
callback(None)
|
||||
|
||||
def unwatch_elected_as_leader(self, callback):
|
||||
self._callbacks.remove(callback)
|
||||
|
@ -199,3 +199,111 @@ class TestPartitionerWithoutBackend(TestCase):
|
||||
partitioner.start()
|
||||
partitioner.watch_partition_change(cb)
|
||||
cb.assert_called_with(partitions, None, None)
|
||||
|
||||
|
||||
class TestLeaderElection(TestCase):
|
||||
def setUp(self):
|
||||
super(TestLeaderElection, self).setUp()
|
||||
|
||||
self.coord_fixture = self.useFixture(fixtures.CoordinatorFixture(
|
||||
'zake://', 'InsertNameHere'))
|
||||
|
||||
self.election = coordination.LeaderElection(
|
||||
self.coordinator, 'President')
|
||||
|
||||
@property
|
||||
def coordinator(self):
|
||||
"""Helper for quick access to the raw coordinator"""
|
||||
return self.coord_fixture.coordinator
|
||||
|
||||
def test_is_leader(self):
|
||||
# We should not be leader until after we start the election.
|
||||
self.assertFalse(self.election.is_leader)
|
||||
|
||||
# Start the election
|
||||
self.election.start()
|
||||
self.coordinator.run_watchers()
|
||||
|
||||
# We should now be the leader.
|
||||
self.assertTrue(self.election.is_leader)
|
||||
|
||||
# Stop the election
|
||||
self.election.stop()
|
||||
|
||||
# We should no longer be the leader.
|
||||
self.assertFalse(self.election.is_leader)
|
||||
|
||||
def test_callbacks(self):
|
||||
# We should not be leader until after we start the election.
|
||||
self.assertFalse(self.election.is_leader)
|
||||
|
||||
# Create and attach a callback
|
||||
mock_callback_one = mock.Mock()
|
||||
self.election.watch_elected_as_leader(mock_callback_one)
|
||||
|
||||
# Ensure the callback has not yet been called.
|
||||
self.assertFalse(mock_callback_one.called)
|
||||
|
||||
# Start the election
|
||||
self.election.start()
|
||||
self.coordinator.run_watchers()
|
||||
|
||||
# Ensure the callback has been called exactly once.
|
||||
self.assertEqual(1, mock_callback_one.call_count)
|
||||
|
||||
# Create and attach a second callback after we start
|
||||
mock_callback_two = mock.Mock()
|
||||
self.election.watch_elected_as_leader(mock_callback_two)
|
||||
|
||||
# Ensure the callback has been called exactly once.
|
||||
self.assertEqual(1, mock_callback_two.call_count)
|
||||
|
||||
|
||||
class TestLeaderElectionWithoutBackend(TestCase):
|
||||
def setUp(self):
|
||||
super(TestLeaderElectionWithoutBackend, self).setUp()
|
||||
|
||||
# coordinator = None indicates no coordination backend has been
|
||||
# configured
|
||||
coordinator = None
|
||||
self.election = coordination.LeaderElection(coordinator, 'President')
|
||||
|
||||
def test_is_leader(self):
|
||||
# We should not be leader until after we start the election.
|
||||
self.assertFalse(self.election.is_leader)
|
||||
|
||||
# Start the election
|
||||
self.election.start()
|
||||
|
||||
# We should now be the leader.
|
||||
self.assertTrue(self.election.is_leader)
|
||||
|
||||
# Stop the election
|
||||
self.election.stop()
|
||||
|
||||
# We should no longer be the leader.
|
||||
self.assertFalse(self.election.is_leader)
|
||||
|
||||
def test_callbacks(self):
|
||||
# We should not be leader until after we start the election.
|
||||
self.assertFalse(self.election.is_leader)
|
||||
|
||||
# Create and attach a callback
|
||||
mock_callback_one = mock.Mock()
|
||||
self.election.watch_elected_as_leader(mock_callback_one)
|
||||
|
||||
# Ensure the callback has not yet been called.
|
||||
self.assertFalse(mock_callback_one.called)
|
||||
|
||||
# Start the election
|
||||
self.election.start()
|
||||
|
||||
# Ensure the callback has been called exactly once.
|
||||
self.assertEqual(1, mock_callback_one.call_count)
|
||||
|
||||
# Create and attach a second callback after we start
|
||||
mock_callback_two = mock.Mock()
|
||||
self.election.watch_elected_as_leader(mock_callback_two)
|
||||
|
||||
# Ensure the callback has been called exactly once.
|
||||
self.assertEqual(1, mock_callback_two.call_count)
|
||||
|
Loading…
x
Reference in New Issue
Block a user