Tooz locks
This change adds support for distributed locks, based on tooz as a part of work required to achieve A/A HA in c-vol service. Co-Authored-By: Gorka Eguileor <geguileo@redhat.com> Co-Authored-By: Michal Dulko <michal.dulko@intel.com> Implements: blueprint cinder-volume-active-active-support Depends-On: I86fa4340f850270b197919896b7f8639c214ceed Change-Id: I52b8d0a05a3dbedc67f3725f9ba6d009b8d1858f
This commit is contained in:
parent
e8efa5b364
commit
d6fabaa6cf
286
cinder/coordination.py
Normal file
286
cinder/coordination.py
Normal file
@ -0,0 +1,286 @@
|
|||||||
|
# Copyright 2015 Intel
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
"""Coordination and locking utilities."""
|
||||||
|
|
||||||
|
import inspect
|
||||||
|
import random
|
||||||
|
import threading
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
import eventlet
|
||||||
|
from eventlet import tpool
|
||||||
|
import itertools
|
||||||
|
from oslo_config import cfg
|
||||||
|
from oslo_log import log
|
||||||
|
import six
|
||||||
|
from tooz import coordination
|
||||||
|
from tooz import locking
|
||||||
|
|
||||||
|
from cinder import exception
|
||||||
|
from cinder.i18n import _, _LE, _LI, _LW
|
||||||
|
|
||||||
|
LOG = log.getLogger(__name__)
|
||||||
|
|
||||||
|
coordination_opts = [
|
||||||
|
cfg.StrOpt('backend_url',
|
||||||
|
default='file://$state_path',
|
||||||
|
help='The backend URL to use for distributed coordination.'),
|
||||||
|
cfg.FloatOpt('heartbeat',
|
||||||
|
default=1.0,
|
||||||
|
help='Number of seconds between heartbeats for distributed '
|
||||||
|
'coordination.'),
|
||||||
|
cfg.FloatOpt('initial_reconnect_backoff',
|
||||||
|
default=0.1,
|
||||||
|
help='Initial number of seconds to wait after failed '
|
||||||
|
'reconnection.'),
|
||||||
|
cfg.FloatOpt('max_reconnect_backoff',
|
||||||
|
default=60.0,
|
||||||
|
help='Maximum number of seconds between sequential '
|
||||||
|
'reconnection retries.'),
|
||||||
|
|
||||||
|
]
|
||||||
|
|
||||||
|
CONF = cfg.CONF
|
||||||
|
CONF.register_opts(coordination_opts, group='coordination')
|
||||||
|
|
||||||
|
|
||||||
|
class Coordinator(object):
|
||||||
|
"""Tooz coordination wrapper.
|
||||||
|
|
||||||
|
Coordination member id is created from concatenated
|
||||||
|
`prefix` and `agent_id` parameters.
|
||||||
|
|
||||||
|
:param str agent_id: Agent identifier
|
||||||
|
:param str prefix: Used to provide member identifier with a
|
||||||
|
meaningful prefix.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, agent_id=None, prefix=''):
|
||||||
|
self.coordinator = None
|
||||||
|
self.agent_id = agent_id or str(uuid.uuid4())
|
||||||
|
self.started = False
|
||||||
|
self.prefix = prefix
|
||||||
|
self._ev = None
|
||||||
|
self._dead = None
|
||||||
|
|
||||||
|
def is_active(self):
|
||||||
|
return self.coordinator is not None
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
"""Connect to coordination backend and start heartbeat."""
|
||||||
|
if not self.started:
|
||||||
|
try:
|
||||||
|
self._dead = threading.Event()
|
||||||
|
self._start()
|
||||||
|
self.started = True
|
||||||
|
# NOTE(bluex): Start heartbeat in separate thread to avoid
|
||||||
|
# being blocked by long coroutines.
|
||||||
|
if self.coordinator and self.coordinator.requires_beating:
|
||||||
|
self._ev = eventlet.spawn(
|
||||||
|
lambda: tpool.execute(self.heartbeat))
|
||||||
|
except coordination.ToozError:
|
||||||
|
LOG.exception(_LE('Error starting coordination backend.'))
|
||||||
|
raise
|
||||||
|
LOG.info(_LI('Coordination backend started successfully.'))
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
"""Disconnect from coordination backend and stop heartbeat."""
|
||||||
|
if self.started:
|
||||||
|
self.coordinator.stop()
|
||||||
|
self._dead.set()
|
||||||
|
if self._ev is not None:
|
||||||
|
self._ev.wait()
|
||||||
|
self._ev = None
|
||||||
|
self.coordinator = None
|
||||||
|
self.started = False
|
||||||
|
|
||||||
|
def get_lock(self, name):
|
||||||
|
"""Return a Tooz backend lock.
|
||||||
|
|
||||||
|
:param str name: The lock name that is used to identify it
|
||||||
|
across all nodes.
|
||||||
|
"""
|
||||||
|
if self.coordinator is not None:
|
||||||
|
return self.coordinator.get_lock(self.prefix + name)
|
||||||
|
else:
|
||||||
|
raise exception.LockCreationFailed(_('Coordinator uninitialized.'))
|
||||||
|
|
||||||
|
def heartbeat(self):
|
||||||
|
"""Coordinator heartbeat.
|
||||||
|
|
||||||
|
Method that every couple of seconds (config: `coordination.heartbeat`)
|
||||||
|
sends heartbeat to prove that the member is not dead.
|
||||||
|
|
||||||
|
If connection to coordination backend is broken it tries to
|
||||||
|
reconnect every couple of seconds
|
||||||
|
(config: `coordination.initial_reconnect_backoff` up to
|
||||||
|
`coordination.max_reconnect_backoff`)
|
||||||
|
|
||||||
|
"""
|
||||||
|
while self.coordinator is not None and not self._dead.is_set():
|
||||||
|
try:
|
||||||
|
self._heartbeat()
|
||||||
|
except coordination.ToozConnectionError:
|
||||||
|
self._reconnect()
|
||||||
|
else:
|
||||||
|
self._dead.wait(cfg.CONF.coordination.heartbeat)
|
||||||
|
|
||||||
|
def _start(self):
|
||||||
|
member_id = self.prefix + self.agent_id
|
||||||
|
self.coordinator = coordination.get_coordinator(
|
||||||
|
cfg.CONF.coordination.backend_url, member_id)
|
||||||
|
self.coordinator.start()
|
||||||
|
|
||||||
|
def _heartbeat(self):
|
||||||
|
try:
|
||||||
|
self.coordinator.heartbeat()
|
||||||
|
return True
|
||||||
|
except coordination.ToozConnectionError:
|
||||||
|
LOG.exception(_LE('Connection error while sending a heartbeat '
|
||||||
|
'to coordination backend.'))
|
||||||
|
raise
|
||||||
|
except coordination.ToozError:
|
||||||
|
LOG.exception(_LE('Error sending a heartbeat to coordination '
|
||||||
|
'backend.'))
|
||||||
|
return False
|
||||||
|
|
||||||
|
def _reconnect(self):
|
||||||
|
"""Reconnect with jittered exponential backoff increase."""
|
||||||
|
LOG.info(_LI('Reconnecting to coordination backend.'))
|
||||||
|
cap = cfg.CONF.coordination.max_reconnect_backoff
|
||||||
|
backoff = base = cfg.CONF.coordination.initial_reconnect_backoff
|
||||||
|
for attempt in itertools.count(1):
|
||||||
|
try:
|
||||||
|
self._start()
|
||||||
|
break
|
||||||
|
except coordination.ToozError:
|
||||||
|
backoff = min(cap, random.uniform(base, backoff * 3))
|
||||||
|
msg = _LW('Reconnect attempt %(attempt)s failed. '
|
||||||
|
'Next try in %(backoff).2fs.')
|
||||||
|
LOG.warning(msg, {'attempt': attempt, 'backoff': backoff})
|
||||||
|
self._dead.wait(backoff)
|
||||||
|
LOG.info(_LI('Reconnected to coordination backend.'))
|
||||||
|
|
||||||
|
|
||||||
|
COORDINATOR = Coordinator(prefix='cinder-')
|
||||||
|
|
||||||
|
|
||||||
|
class Lock(locking.Lock):
|
||||||
|
"""Lock with dynamic name.
|
||||||
|
|
||||||
|
:param str lock_name: Lock name.
|
||||||
|
:param dict lock_data: Data for lock name formatting.
|
||||||
|
:param coordinator: Coordinator class to use when creating lock.
|
||||||
|
Defaults to the global coordinator.
|
||||||
|
|
||||||
|
Using it like so::
|
||||||
|
|
||||||
|
with Lock('mylock'):
|
||||||
|
...
|
||||||
|
|
||||||
|
ensures that only one process at a time will execute code in context.
|
||||||
|
Lock name can be formatted using Python format string syntax::
|
||||||
|
|
||||||
|
Lock('foo-{volume.id}, {'volume': ...,})
|
||||||
|
|
||||||
|
Available field names are keys of lock_data.
|
||||||
|
"""
|
||||||
|
def __init__(self, lock_name, lock_data=None, coordinator=None):
|
||||||
|
super(Lock, self).__init__(str(id(self)))
|
||||||
|
lock_data = lock_data or {}
|
||||||
|
self.coordinator = coordinator or COORDINATOR
|
||||||
|
self.blocking = True
|
||||||
|
self.lock = self._prepare_lock(lock_name, lock_data)
|
||||||
|
|
||||||
|
def _prepare_lock(self, lock_name, lock_data):
|
||||||
|
if not isinstance(lock_name, six.string_types):
|
||||||
|
raise ValueError(_('Not a valid string: %s') % lock_name)
|
||||||
|
return self.coordinator.get_lock(lock_name.format(**lock_data))
|
||||||
|
|
||||||
|
def acquire(self, blocking=None):
|
||||||
|
"""Attempts to acquire lock.
|
||||||
|
|
||||||
|
:param blocking: If True, blocks until the lock is acquired. If False,
|
||||||
|
returns right away. Otherwise, the value is used as a timeout
|
||||||
|
value and the call returns maximum after this number of seconds.
|
||||||
|
:return: returns true if acquired (false if not)
|
||||||
|
:rtype: bool
|
||||||
|
"""
|
||||||
|
blocking = self.blocking if blocking is None else blocking
|
||||||
|
return self.lock.acquire(blocking=blocking)
|
||||||
|
|
||||||
|
def release(self):
|
||||||
|
"""Attempts to release lock.
|
||||||
|
|
||||||
|
The behavior of releasing a lock which was not acquired in the first
|
||||||
|
place is undefined.
|
||||||
|
:return: returns true if released (false if not)
|
||||||
|
:rtype: bool
|
||||||
|
"""
|
||||||
|
self.lock.release()
|
||||||
|
|
||||||
|
|
||||||
|
def synchronized(lock_name, blocking=True, coordinator=None):
|
||||||
|
"""Synchronization decorator.
|
||||||
|
|
||||||
|
:param str lock_name: Lock name.
|
||||||
|
:param blocking: If True, blocks until the lock is acquired.
|
||||||
|
If False, raises exception when not acquired. Otherwise,
|
||||||
|
the value is used as a timeout value and if lock is not acquired
|
||||||
|
after this number of seconds exception is raised.
|
||||||
|
:param coordinator: Coordinator class to use when creating lock.
|
||||||
|
Defaults to the global coordinator.
|
||||||
|
:raises tooz.coordination.LockAcquireFailed: if lock is not acquired
|
||||||
|
|
||||||
|
Decorating a method like so::
|
||||||
|
|
||||||
|
@synchronized('mylock')
|
||||||
|
def foo(self, *args):
|
||||||
|
...
|
||||||
|
|
||||||
|
ensures that only one process will execute the foo method at a time.
|
||||||
|
|
||||||
|
Different methods can share the same lock::
|
||||||
|
|
||||||
|
@synchronized('mylock')
|
||||||
|
def foo(self, *args):
|
||||||
|
...
|
||||||
|
|
||||||
|
@synchronized('mylock')
|
||||||
|
def bar(self, *args):
|
||||||
|
...
|
||||||
|
|
||||||
|
This way only one of either foo or bar can be executing at a time.
|
||||||
|
|
||||||
|
Lock name can be formatted using Python format string syntax::
|
||||||
|
|
||||||
|
@synchronized('{f_name}-{vol.id}-{snap[name]}')
|
||||||
|
def foo(self, vol, snap):
|
||||||
|
...
|
||||||
|
|
||||||
|
Available field names are: decorated function parameters and
|
||||||
|
`f_name` as a decorated function name.
|
||||||
|
"""
|
||||||
|
def wrap(f):
|
||||||
|
@six.wraps(f)
|
||||||
|
def wrapped(*a, **k):
|
||||||
|
call_args = inspect.getcallargs(f, *a, **k)
|
||||||
|
call_args['f_name'] = f.__name__
|
||||||
|
lock = Lock(lock_name, call_args, coordinator)
|
||||||
|
with lock(blocking):
|
||||||
|
return f(*a, **k)
|
||||||
|
return wrapped
|
||||||
|
return wrap
|
@ -702,6 +702,14 @@ class EvaluatorParseException(Exception):
|
|||||||
message = _("Error during evaluator parsing: %(reason)s")
|
message = _("Error during evaluator parsing: %(reason)s")
|
||||||
|
|
||||||
|
|
||||||
|
class LockCreationFailed(CinderException):
|
||||||
|
message = _('Unable to create lock. Coordination backend not started.')
|
||||||
|
|
||||||
|
|
||||||
|
class LockingFailed(CinderException):
|
||||||
|
message = _('Lock acquisition failed.')
|
||||||
|
|
||||||
|
|
||||||
UnsupportedObjectError = obj_exc.UnsupportedObjectError
|
UnsupportedObjectError = obj_exc.UnsupportedObjectError
|
||||||
OrphanedObjectError = obj_exc.OrphanedObjectError
|
OrphanedObjectError = obj_exc.OrphanedObjectError
|
||||||
IncompatibleObjectVersion = obj_exc.IncompatibleObjectVersion
|
IncompatibleObjectVersion = obj_exc.IncompatibleObjectVersion
|
||||||
|
@ -34,6 +34,7 @@ from cinder.common import config as cinder_common_config
|
|||||||
import cinder.compute
|
import cinder.compute
|
||||||
from cinder.compute import nova as cinder_compute_nova
|
from cinder.compute import nova as cinder_compute_nova
|
||||||
from cinder import context as cinder_context
|
from cinder import context as cinder_context
|
||||||
|
from cinder import coordination as cinder_coordination
|
||||||
from cinder.db import api as cinder_db_api
|
from cinder.db import api as cinder_db_api
|
||||||
from cinder.db import base as cinder_db_base
|
from cinder.db import base as cinder_db_base
|
||||||
from cinder import exception as cinder_exception
|
from cinder import exception as cinder_exception
|
||||||
@ -332,6 +333,10 @@ def list_opts():
|
|||||||
cinder_zonemanager_drivers_brocade_brcdfabricopts.
|
cinder_zonemanager_drivers_brocade_brcdfabricopts.
|
||||||
brcd_zone_opts,
|
brcd_zone_opts,
|
||||||
)),
|
)),
|
||||||
|
('COORDINATION',
|
||||||
|
itertools.chain(
|
||||||
|
cinder_coordination.coordination_opts,
|
||||||
|
)),
|
||||||
('BACKEND',
|
('BACKEND',
|
||||||
itertools.chain(
|
itertools.chain(
|
||||||
[cinder_cmd_volume.host_opt],
|
[cinder_cmd_volume.host_opt],
|
||||||
|
131
cinder/tests/unit/test_coordination.py
Normal file
131
cinder/tests/unit/test_coordination.py
Normal file
@ -0,0 +1,131 @@
|
|||||||
|
# Copyright 2015 Intel
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import mock
|
||||||
|
import tooz.coordination
|
||||||
|
import tooz.locking
|
||||||
|
|
||||||
|
from cinder import coordination
|
||||||
|
from cinder import test
|
||||||
|
|
||||||
|
|
||||||
|
class Locked(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class MockToozLock(tooz.locking.Lock):
|
||||||
|
active_locks = set()
|
||||||
|
|
||||||
|
def acquire(self, blocking=True):
|
||||||
|
if self.name not in self.active_locks:
|
||||||
|
self.active_locks.add(self.name)
|
||||||
|
return True
|
||||||
|
elif not blocking:
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
raise Locked
|
||||||
|
|
||||||
|
def release(self):
|
||||||
|
self.active_locks.remove(self.name)
|
||||||
|
|
||||||
|
|
||||||
|
@mock.patch('time.sleep', lambda _: None)
|
||||||
|
@mock.patch('eventlet.spawn', lambda f: f())
|
||||||
|
@mock.patch('eventlet.tpool.execute', lambda f: f())
|
||||||
|
@mock.patch.object(coordination.Coordinator, 'heartbeat')
|
||||||
|
@mock.patch('tooz.coordination.get_coordinator')
|
||||||
|
class CoordinatorTestCase(test.TestCase):
|
||||||
|
def test_coordinator_start(self, get_coordinator, heartbeat):
|
||||||
|
crd = get_coordinator.return_value
|
||||||
|
|
||||||
|
agent = coordination.Coordinator()
|
||||||
|
agent.start()
|
||||||
|
self.assertTrue(get_coordinator.called)
|
||||||
|
self.assertTrue(heartbeat.called)
|
||||||
|
self.assertTrue(crd.start.called)
|
||||||
|
|
||||||
|
def test_coordinator_stop(self, get_coordinator, heartbeat):
|
||||||
|
crd = get_coordinator.return_value
|
||||||
|
|
||||||
|
agent = coordination.Coordinator()
|
||||||
|
agent.start()
|
||||||
|
self.assertIsNotNone(agent.coordinator)
|
||||||
|
agent.stop()
|
||||||
|
self.assertTrue(crd.stop.called)
|
||||||
|
self.assertIsNone(agent.coordinator)
|
||||||
|
|
||||||
|
def test_coordinator_lock(self, get_coordinator, heartbeat):
|
||||||
|
crd = get_coordinator.return_value
|
||||||
|
crd.get_lock.side_effect = lambda n: MockToozLock(n)
|
||||||
|
|
||||||
|
agent1 = coordination.Coordinator()
|
||||||
|
agent1.start()
|
||||||
|
agent2 = coordination.Coordinator()
|
||||||
|
agent2.start()
|
||||||
|
self.assertNotIn('lock', MockToozLock.active_locks)
|
||||||
|
with agent1.get_lock('lock'):
|
||||||
|
self.assertIn('lock', MockToozLock.active_locks)
|
||||||
|
self.assertRaises(Locked, agent1.get_lock('lock').acquire)
|
||||||
|
self.assertRaises(Locked, agent2.get_lock('lock').acquire)
|
||||||
|
self.assertNotIn('lock', MockToozLock.active_locks)
|
||||||
|
|
||||||
|
def test_coordinator_offline(self, get_coordinator, heartbeat):
|
||||||
|
crd = get_coordinator.return_value
|
||||||
|
crd.start.side_effect = tooz.coordination.ToozConnectionError('err')
|
||||||
|
|
||||||
|
agent = coordination.Coordinator()
|
||||||
|
self.assertRaises(tooz.coordination.ToozError, agent.start)
|
||||||
|
self.assertFalse(agent.started)
|
||||||
|
self.assertFalse(heartbeat.called)
|
||||||
|
|
||||||
|
def test_coordinator_reconnect(self, get_coordinator, heartbeat):
|
||||||
|
start_online = iter([True] + [False] * 5 + [True])
|
||||||
|
heartbeat_online = iter((False, True, True))
|
||||||
|
|
||||||
|
def raiser(cond):
|
||||||
|
if not cond:
|
||||||
|
raise tooz.coordination.ToozConnectionError('err')
|
||||||
|
|
||||||
|
crd = get_coordinator.return_value
|
||||||
|
crd.start.side_effect = lambda *_: raiser(next(start_online))
|
||||||
|
crd.heartbeat.side_effect = lambda *_: raiser(next(heartbeat_online))
|
||||||
|
|
||||||
|
agent = coordination.Coordinator()
|
||||||
|
agent.start()
|
||||||
|
self.assertRaises(tooz.coordination.ToozConnectionError,
|
||||||
|
agent._heartbeat)
|
||||||
|
self.assertEqual(1, get_coordinator.call_count)
|
||||||
|
agent._reconnect()
|
||||||
|
self.assertEqual(7, get_coordinator.call_count)
|
||||||
|
agent._heartbeat()
|
||||||
|
|
||||||
|
|
||||||
|
@mock.patch.object(coordination.COORDINATOR, 'get_lock')
|
||||||
|
class CoordinationTestCase(test.TestCase):
|
||||||
|
def test_lock(self, get_lock):
|
||||||
|
with coordination.Lock('lock'):
|
||||||
|
self.assertTrue(get_lock.called)
|
||||||
|
|
||||||
|
def test_synchronized(self, get_lock):
|
||||||
|
@coordination.synchronized('lock-{f_name}-{foo.val}-{bar[val]}')
|
||||||
|
def func(foo, bar):
|
||||||
|
pass
|
||||||
|
|
||||||
|
foo = mock.Mock()
|
||||||
|
foo.val = 7
|
||||||
|
bar = mock.MagicMock()
|
||||||
|
bar.__getitem__.return_value = 8
|
||||||
|
func(foo, bar)
|
||||||
|
get_lock.assert_called_with('lock-func-7-8')
|
4
releasenotes/notes/tooz-locks-0f9f2cc15f8dad5a.yaml
Normal file
4
releasenotes/notes/tooz-locks-0f9f2cc15f8dad5a.yaml
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
---
|
||||||
|
features:
|
||||||
|
- Locks may use Tooz as abstraction layer now, to support distributed lock
|
||||||
|
managers and prepare Cinder to better support HA configurations.
|
@ -52,3 +52,4 @@ oslo.i18n>=1.5.0 # Apache-2.0
|
|||||||
oslo.vmware>=1.16.0 # Apache-2.0
|
oslo.vmware>=1.16.0 # Apache-2.0
|
||||||
os-brick>=0.4.0 # Apache-2.0
|
os-brick>=0.4.0 # Apache-2.0
|
||||||
os-win>=0.0.7 # Apache-2.0
|
os-win>=0.0.7 # Apache-2.0
|
||||||
|
tooz>=1.28.0 # Apache-2.0
|
||||||
|
Loading…
Reference in New Issue
Block a user