coordination: use tooz builtin heartbeat feature
Since a few versions, tooz can manage the heartbeat entirely internally. Use that feature and simplify the manila code. See Cinder change [1]. Closes-Bug: #1691323 [1] I7fa654caf6620d410d4e297d3d8af2215e27ed12 Change-Id: I86ef7a092a3ab7fd0a90ab664b7f9f62a275901b
This commit is contained in:
parent
1a489e8a7b
commit
326755f42c
@ -13,14 +13,10 @@
|
||||
"""Tooz Coordination and locking utilities."""
|
||||
|
||||
import inspect
|
||||
import itertools
|
||||
import random
|
||||
|
||||
import decorator
|
||||
import eventlet
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
from oslo_service import loopingcall
|
||||
from oslo_utils import uuidutils
|
||||
import six
|
||||
from tooz import coordination
|
||||
@ -40,15 +36,28 @@ coordination_opts = [
|
||||
cfg.FloatOpt('heartbeat',
|
||||
default=1.0,
|
||||
help='Number of seconds between heartbeats for distributed '
|
||||
'coordination.'),
|
||||
'coordination. No longer used since distributed '
|
||||
'coordination manages its heartbeat internally.',
|
||||
deprecated_for_removal=True,
|
||||
deprecated_reason='This option is no longer used.',
|
||||
deprecated_since='16.0.0'),
|
||||
cfg.FloatOpt('initial_reconnect_backoff',
|
||||
default=0.1,
|
||||
help='Initial number of seconds to wait after failed '
|
||||
'reconnection.'),
|
||||
'reconnection. No longer used since distributed '
|
||||
'coordination manages its heartbeat internally.',
|
||||
deprecated_for_removal=True,
|
||||
deprecated_reason='This option is no longer used.',
|
||||
deprecated_since='16.0.0'),
|
||||
cfg.FloatOpt('max_reconnect_backoff',
|
||||
default=60.0,
|
||||
help='Maximum number of seconds between sequential '
|
||||
'reconnection retries.'),
|
||||
'reconnection retries. No longer used since '
|
||||
'distributed coordination manages its heartbeat '
|
||||
'internally.',
|
||||
deprecated_for_removal=True,
|
||||
deprecated_reason='This option is no longer used.',
|
||||
deprecated_since='16.0.0'),
|
||||
]
|
||||
|
||||
CONF = cfg.CONF
|
||||
@ -71,47 +80,24 @@ class Coordinator(object):
|
||||
self.agent_id = agent_id or uuidutils.generate_uuid()
|
||||
self.started = False
|
||||
self.prefix = prefix
|
||||
self._heartbeat_thread = loopingcall.FixedIntervalLoopingCall(
|
||||
self.heartbeat)
|
||||
|
||||
def _is_active(self):
|
||||
return self.started
|
||||
|
||||
def start(self):
|
||||
"""Connect to coordination back end and start heartbeat."""
|
||||
if not self._is_active():
|
||||
try:
|
||||
self._start()
|
||||
self.started = True
|
||||
# NOTE(gouthamr): Start heartbeat in separate thread to avoid
|
||||
# being blocked by long co-routines.
|
||||
if self.coordinator and self.coordinator.requires_beating:
|
||||
LOG.debug("This tooz lock management back end supports "
|
||||
"heart beats. Spawning a new thread to "
|
||||
"send regular heart beats.")
|
||||
self._heartbeat_thread.start(
|
||||
cfg.CONF.coordination.heartbeat)
|
||||
else:
|
||||
LOG.debug("This tooz lock management back end does not "
|
||||
"support heart beats.")
|
||||
except coordination.ToozError:
|
||||
LOG.exception('Error starting coordination back end.')
|
||||
raise
|
||||
LOG.info('Coordination back end started successfully.')
|
||||
"""Connect to coordination back end."""
|
||||
if self.started:
|
||||
return
|
||||
|
||||
# NOTE(gouthamr): Tooz expects member_id as a byte string.
|
||||
member_id = (self.prefix + self.agent_id).encode('ascii')
|
||||
self.coordinator = coordination.get_coordinator(
|
||||
cfg.CONF.coordination.backend_url, member_id)
|
||||
self.coordinator.start(start_heart=True)
|
||||
self.started = True
|
||||
|
||||
def stop(self):
|
||||
"""Disconnect from coordination back end and stop heartbeat."""
|
||||
msg = _('Stopped Coordinator (Agent ID: %(agent)s, prefix: '
|
||||
'%(prefix)s)')
|
||||
"""Disconnect from coordination back end."""
|
||||
msg = 'Stopped Coordinator (Agent ID: %(agent)s, prefix: %(prefix)s)'
|
||||
msg_args = {'agent': self.agent_id, 'prefix': self.prefix}
|
||||
if self._is_active():
|
||||
|
||||
debug_msg = ('Stopping heartbeat thread for coordinator with '
|
||||
'(Agent ID: %(agent)s, prefix: %(prefix)s).')
|
||||
LOG.debug(debug_msg, msg_args)
|
||||
if self._heartbeat_thread is not None:
|
||||
self._heartbeat_thread.stop()
|
||||
self._heartbeat_thread = None
|
||||
if self.started:
|
||||
self.coordinator.stop()
|
||||
self.coordinator = None
|
||||
self.started = False
|
||||
@ -126,64 +112,11 @@ class Coordinator(object):
|
||||
"""
|
||||
# NOTE(gouthamr): Tooz expects lock name as a byte string
|
||||
lock_name = (self.prefix + name).encode('ascii')
|
||||
if self._is_active():
|
||||
if self.started:
|
||||
return self.coordinator.get_lock(lock_name)
|
||||
else:
|
||||
raise exception.LockCreationFailed(_('Coordinator uninitialized.'))
|
||||
|
||||
def heartbeat(self):
|
||||
"""Coordinator heartbeat.
|
||||
|
||||
Method that every couple of seconds (config: `coordination.heartbeat`)
|
||||
sends heartbeat to prove that the member is not dead.
|
||||
|
||||
If connection to coordination back end is broken it tries to
|
||||
reconnect every couple of seconds
|
||||
(config: `coordination.initial_reconnect_backoff` up to
|
||||
`coordination.max_reconnect_backoff`)
|
||||
|
||||
"""
|
||||
if self._is_active():
|
||||
try:
|
||||
self._heartbeat()
|
||||
except coordination.ToozConnectionError:
|
||||
self._reconnect()
|
||||
|
||||
def _start(self):
|
||||
# NOTE(gouthamr): Tooz expects member_id as a byte string.
|
||||
member_id = (self.prefix + self.agent_id).encode('ascii')
|
||||
self.coordinator = coordination.get_coordinator(
|
||||
cfg.CONF.coordination.backend_url, member_id)
|
||||
self.coordinator.start()
|
||||
|
||||
def _heartbeat(self):
|
||||
try:
|
||||
self.coordinator.heartbeat()
|
||||
except coordination.ToozConnectionError:
|
||||
LOG.exception('Connection error while sending a heartbeat '
|
||||
'to coordination back end.')
|
||||
raise
|
||||
except coordination.ToozError:
|
||||
LOG.exception('Error sending a heartbeat to coordination '
|
||||
'back end.')
|
||||
|
||||
def _reconnect(self):
|
||||
"""Reconnect with jittered exponential back off."""
|
||||
LOG.info('Reconnecting to coordination back end.')
|
||||
cap = cfg.CONF.coordination.max_reconnect_backoff
|
||||
backoff = base = cfg.CONF.coordination.initial_reconnect_backoff
|
||||
for attempt in itertools.count(1):
|
||||
try:
|
||||
self._start()
|
||||
break
|
||||
except coordination.ToozError:
|
||||
backoff = min(cap, random.uniform(base, backoff * 3))
|
||||
msg = ('Reconnect attempt %(attempt)s failed. '
|
||||
'Next try in %(backoff).2fs.')
|
||||
LOG.warning(msg, {'attempt': attempt, 'backoff': backoff})
|
||||
eventlet.sleep(backoff)
|
||||
LOG.info('Reconnected to coordination back end.')
|
||||
|
||||
|
||||
LOCK_COORDINATOR = Coordinator(prefix='manila-')
|
||||
|
||||
|
@ -15,7 +15,6 @@
|
||||
|
||||
import ddt
|
||||
import mock
|
||||
from oslo_service import loopingcall
|
||||
from tooz import coordination as tooz_coordination
|
||||
from tooz import locking as tooz_locking
|
||||
|
||||
@ -43,9 +42,6 @@ class MockToozLock(tooz_locking.Lock):
|
||||
self.active_locks.remove(self.name)
|
||||
|
||||
|
||||
@mock.patch('time.sleep', lambda _: None)
|
||||
@mock.patch('eventlet.sleep', lambda _: None)
|
||||
@mock.patch('random.uniform', lambda _a, _b: 0)
|
||||
@ddt.ddt
|
||||
class CoordinatorTestCase(test.TestCase):
|
||||
|
||||
@ -53,24 +49,16 @@ class CoordinatorTestCase(test.TestCase):
|
||||
super(CoordinatorTestCase, self).setUp()
|
||||
self.get_coordinator = self.mock_object(tooz_coordination,
|
||||
'get_coordinator')
|
||||
self.heartbeat = self.mock_object(coordination.Coordinator,
|
||||
'heartbeat')
|
||||
|
||||
@ddt.data(True, False)
|
||||
def test_coordinator_start_with_heartbeat(self, requires_beating):
|
||||
mock_start_heartbeat = mock.Mock(
|
||||
loopingcall, 'FixedIntervalLoopingCall').return_value
|
||||
self.mock_object(loopingcall, 'FixedIntervalLoopingCall',
|
||||
mock.Mock(return_value=mock_start_heartbeat))
|
||||
def test_coordinator_start(self):
|
||||
crd = self.get_coordinator.return_value
|
||||
crd.requires_beating = requires_beating
|
||||
|
||||
agent = coordination.Coordinator()
|
||||
agent.start()
|
||||
|
||||
self.assertTrue(self.get_coordinator.called)
|
||||
self.assertTrue(crd.start.called)
|
||||
self.assertEqual(requires_beating, mock_start_heartbeat.start.called)
|
||||
self.assertTrue(agent.started)
|
||||
|
||||
def test_coordinator_stop(self):
|
||||
crd = self.get_coordinator.return_value
|
||||
@ -83,6 +71,7 @@ class CoordinatorTestCase(test.TestCase):
|
||||
|
||||
self.assertTrue(crd.stop.called)
|
||||
self.assertIsNone(agent.coordinator)
|
||||
self.assertFalse(agent.started)
|
||||
|
||||
def test_coordinator_lock(self):
|
||||
crd = self.get_coordinator.return_value
|
||||
@ -110,29 +99,6 @@ class CoordinatorTestCase(test.TestCase):
|
||||
agent = coordination.Coordinator()
|
||||
self.assertRaises(tooz_coordination.ToozError, agent.start)
|
||||
self.assertFalse(agent.started)
|
||||
self.assertFalse(self.heartbeat.called)
|
||||
|
||||
def test_coordinator_reconnect(self):
|
||||
start_online = iter([True] + [False] * 5 + [True])
|
||||
heartbeat_online = iter((False, True, True))
|
||||
|
||||
def raiser(cond):
|
||||
if not cond:
|
||||
raise tooz_coordination.ToozConnectionError('err')
|
||||
|
||||
crd = self.get_coordinator.return_value
|
||||
crd.start.side_effect = lambda *_: raiser(next(start_online))
|
||||
crd.heartbeat.side_effect = lambda *_: raiser(next(heartbeat_online))
|
||||
|
||||
agent = coordination.Coordinator()
|
||||
agent.start()
|
||||
|
||||
self.assertRaises(tooz_coordination.ToozConnectionError,
|
||||
agent._heartbeat)
|
||||
self.assertEqual(1, self.get_coordinator.call_count)
|
||||
agent._reconnect()
|
||||
self.assertEqual(7, self.get_coordinator.call_count)
|
||||
agent._heartbeat()
|
||||
|
||||
|
||||
@mock.patch.object(coordination.LOCK_COORDINATOR, 'get_lock')
|
||||
|
Loading…
Reference in New Issue
Block a user