Handle a race between pre-populate and hash ring bootstrapping
On startup, if the worker receives messages for pre-populated resources before it has processed the initial cluster rebalance event, those messages are dropped. This change fixes the race by tracking whether the hash ring has been initialized. Any messages the worker receives before initialization completes are batched up and replayed as part of the initial bootstrapping procedure. Change-Id: I3caf95f57380076ab48e4270e1cd575906fba386 Closes-bug: #1554248
This commit is contained in:
parent
9092133d37
commit
f9c5941b97
|
@ -166,6 +166,11 @@ class HashRingManager(object):
|
|||
|
||||
def __init__(self):
    """Start with an empty host list and the ring marked as not balanced."""
    self._balanced = False
    self._hosts = []
|
||||
|
||||
@property
def balanced(self):
    """Return this manager's ``_balanced`` flag."""
    flag = self._balanced
    return flag
|
||||
|
||||
@property
|
||||
def ring(self):
|
||||
|
@ -190,8 +195,10 @@ class HashRingManager(object):
|
|||
def reset(cls):
    """Drop the cached hash ring and mark the ring as not balanced.

    Both attributes are written on ``cls`` while holding ``cls._lock``.
    """
    # NOTE(review): __init__ and rebalance() assign ``_balanced`` on the
    # instance. If this is bound as a classmethod (decorator not visible
    # here), the write below would not change an existing instance's
    # ``balanced`` value -- confirm that is intended.
    with cls._lock:
        cls._hash_ring, cls._balanced = None, False
|
||||
|
||||
def rebalance(self, hosts):
    """Reset the manager, then adopt ``hosts`` as the new host list.

    Once the hosts are stored (under ``self._lock``) the manager is
    flagged as balanced.
    """
    self.reset()
    with self._lock:
        self._balanced = True
        self._hosts = hosts
|
||||
|
|
|
@ -117,6 +117,12 @@ class TestWorker(WorkerTestBase):
|
|||
def setUp(self):
|
||||
super(TestWorker, self).setUp()
|
||||
self.config(enabled=True, group='coordination')
|
||||
self._balanced_p = mock.patch.object(
|
||||
self.w, '_ring_balanced')
|
||||
self._mock_balanced = self._balanced_p.start()
|
||||
self._mock_balanced.return_value = True
|
||||
self.addCleanup(mock.patch.stopall)
|
||||
|
||||
self.target = self.tenant_id
|
||||
self.resource = event.Resource(
|
||||
self.driver,
|
||||
|
@ -213,6 +219,14 @@ class TestWorker(WorkerTestBase):
|
|||
self.assertFalse(fake_deliver.called)
|
||||
fake_should_process.assert_called_with(self.target, self.msg)
|
||||
|
||||
@mock.patch('astara.worker.Worker._deliver_message')
@mock.patch('astara.worker.Worker._defer_message')
def test_handle_message_defer_message(self, fake_defer, fake_deliver):
    # With the ring reported as unbalanced, the message must be handed to
    # _defer_message and never reach _deliver_message.
    target, msg = self.target, self.msg
    self._mock_balanced.return_value = False

    self.w.handle_message(target, msg)

    fake_defer.assert_called_with(target, msg)
    delivered = fake_deliver.called
    self.assertFalse(delivered)
|
||||
|
||||
@mock.patch('astara.worker.hash_ring', autospec=True)
|
||||
def test__should_process_message_does_not_hash(self, fake_hash):
|
||||
fake_ring_manager = fake_hash.HashRingManager()
|
||||
|
@ -394,6 +408,40 @@ class TestWorker(WorkerTestBase):
|
|||
set(['sm1', 'sm2', 'sm3', 'sm4'])
|
||||
)
|
||||
|
||||
@mock.patch('astara.worker.hash_ring', autospec=True)
def test__ring_balanced(self, fake_hash):
    # Stop the setUp patch of _ring_balanced so the real method runs.
    self._balanced_p.stop()

    ring_mgr = fake_hash.HashRingManager()
    ring_mgr.balanced = False
    self.w.hash_ring_mgr = ring_mgr

    result = self.w._ring_balanced()
    self.assertFalse(result)
|
||||
|
||||
def test__defer_message(self):
    # The queue is empty before anything has been deferred ...
    self.assertEqual(self.w._deferred_messages, [])

    self.w._defer_message(self.target, self.msg)

    # ... and afterwards holds exactly the one (target, message) pair.
    expected = [(self.target, self.msg)]
    self.assertEqual(self.w._deferred_messages, expected)
|
||||
|
||||
@mock.patch('astara.worker.Worker.handle_message')
def test__replay_deferred_messages_none(self, fakehandle):
    # An empty queue must not cause any call to handle_message.
    self.w._deferred_messages = list()

    self.w._replay_deferred_messages()

    was_called = fakehandle.called
    self.assertFalse(was_called)
|
||||
|
||||
@mock.patch('astara.worker.Worker.handle_message')
def test__replay_deferred_messages(self, fake_handle):
    # Every queued (target, message) pair must be passed to
    # handle_message, in the order it was queued.
    queued = [
        (name, name)
        for name in ('fake_tgt_1', 'fake_tgt_2', 'fake_tgt_3')
    ]
    self.w._deferred_messages = queued

    self.w._replay_deferred_messages()

    expected = [mock.call(*pair) for pair in queued]
    self.assertEqual(fake_handle.call_args_list, expected)
|
||||
|
||||
|
||||
class TestResourceCache(WorkerTestBase):
|
||||
def setUp(self):
|
||||
|
@ -623,6 +671,7 @@ class TestReportStatus(WorkerTestBase):
|
|||
|
||||
def test_handle_message_report_status(self):
|
||||
with mock.patch('astara.worker.cfg.CONF') as conf:
|
||||
conf.coordination = mock.Mock(enabled=False)
|
||||
self.w.handle_message(
|
||||
'debug',
|
||||
event.Event('*', event.COMMAND,
|
||||
|
@ -774,6 +823,7 @@ class TestConfigReload(WorkerTestBase):
|
|||
def test(self, mock_cfg):
|
||||
mock_cfg.CONF = mock.MagicMock(
|
||||
log_opt_values=mock.MagicMock())
|
||||
mock_cfg.CONF.coordination.enabled = False
|
||||
tenant_id = '*'
|
||||
resource_id = '*'
|
||||
msg = event.Event(
|
||||
|
@ -842,8 +892,9 @@ class TestRebalance(WorkerTestBase):
|
|||
body={'key': 'value'},
|
||||
)
|
||||
|
||||
@mock.patch('astara.worker.Worker._replay_deferred_messages')
|
||||
@mock.patch('astara.worker.Worker._repopulate')
|
||||
def test_rebalance_bootstrap(self, fake_repop):
|
||||
def test_rebalance_bootstrap(self, fake_repop, fake_replay):
|
||||
fake_hash = mock.Mock(
|
||||
rebalance=mock.Mock(),
|
||||
)
|
||||
|
@ -858,6 +909,7 @@ class TestRebalance(WorkerTestBase):
|
|||
)
|
||||
self.w.handle_message('*', msg)
|
||||
fake_hash.rebalance.assert_called_with(['foo', 'bar'])
|
||||
self.assertTrue(fake_replay.called)
|
||||
self.assertFalse(fake_repop.called)
|
||||
|
||||
@mock.patch('astara.worker.Worker._add_resource_to_work_queue')
|
||||
|
|
|
@ -189,6 +189,7 @@ class Worker(object):
|
|||
]
|
||||
|
||||
self.hash_ring_mgr = hash_ring.HashRingManager()
|
||||
self._deferred_messages = []
|
||||
|
||||
for t in self.threads:
|
||||
t.setDaemon(True)
|
||||
|
@ -415,10 +416,36 @@ class Worker(object):
|
|||
|
||||
return message
|
||||
|
||||
def _ring_balanced(self):
    """Return the ``balanced`` value reported by the hash ring manager."""
    manager = self.hash_ring_mgr
    return manager.balanced
|
||||
|
||||
def _defer_message(self, target, message):
    """Queue ``(target, message)`` for later replay instead of handling it."""
    LOG.debug("Deferring message for %s: %s", target, message)
    entry = (target, message)
    self._deferred_messages.append(entry)
|
||||
|
||||
def _replay_deferred_messages(self):
    """Re-dispatch every message that was deferred, then empty the queue.

    Each queued ``(target, message)`` pair is passed back to
    ``handle_message`` in the order it was deferred.  The queue is swapped
    for a fresh list before the replay starts, so a message is never
    replayed twice and anything deferred again while replaying lands in
    the new queue rather than in the list being iterated.
    """
    if not self._deferred_messages:
        return

    LOG.debug(
        'Replaying pre-rebalance deferred messages on worker %s',
        self.proc_name)

    # Rebind instead of clearing in place: other code may still hold a
    # reference to the old list object.
    pending, self._deferred_messages = self._deferred_messages, []
    for tgt, msg in pending:
        self.handle_message(tgt, msg)
|
||||
|
||||
def handle_message(self, target, message):
|
||||
"""Callback to be used in main
|
||||
"""
|
||||
LOG.debug('got: %s %r', target, message)
|
||||
|
||||
# If the cluster ring hasn't been seeded yet, we cannot make decisions
|
||||
# about which messages to process. Instead, receive them and defer
|
||||
# handling until we know the ring layout.
|
||||
if (cfg.CONF.coordination.enabled and
|
||||
not self._ring_balanced() and
|
||||
message.crud != event.REBALANCE):
|
||||
self._defer_message(target, message)
|
||||
return
|
||||
|
||||
if target is None:
|
||||
# We got the shutdown instruction from our parent process.
|
||||
self._shutdown()
|
||||
|
@ -495,6 +522,9 @@ class Worker(object):
|
|||
# to a bootstrapping rebalance, we don't need to worry about adjusting
|
||||
# state because there is none yet.
|
||||
if message.body.get('node_bootstrap'):
|
||||
# replay any messages that may have accumulated while we were
|
||||
# waiting to finish cluster bootstrap
|
||||
self._replay_deferred_messages()
|
||||
return
|
||||
|
||||
# track which SMs we initially owned
|
||||
|
|
Loading…
Reference in New Issue