f824bc0565
Before we introduced the DLM synchronization mechanism in the Cinder volume manager we were using Oslo Concurrency synchronized method that logs, at debug level, all lock acquisition and releases, but now that we are using our own synchronized decorator we have lost those logs. Now that we are almost ready to do Active/Active, these log messages will be even more relevant and important. This patch adds debug log messages with the exact same message used in Oslo concurrency for convenience. TrivialFix Change-Id: I7e6aa5b28f289ecf067b4d399bd2fb2ffb189a5f
315 lines
11 KiB
Python
315 lines
11 KiB
Python
# Copyright 2015 Intel
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
"""Coordination and locking utilities."""
|
|
|
|
import inspect
|
|
import random
|
|
import threading
|
|
import uuid
|
|
|
|
import decorator
|
|
import eventlet
|
|
from eventlet import tpool
|
|
import itertools
|
|
from oslo_config import cfg
|
|
from oslo_log import log
|
|
from oslo_utils import timeutils
|
|
import six
|
|
from tooz import coordination
|
|
from tooz import locking
|
|
|
|
from cinder import exception
|
|
from cinder.i18n import _, _LE, _LI, _LW
|
|
|
|
LOG = log.getLogger(__name__)
|
|
|
|
coordination_opts = [
|
|
cfg.StrOpt('backend_url',
|
|
default='file://$state_path',
|
|
help='The backend URL to use for distributed coordination.'),
|
|
cfg.FloatOpt('heartbeat',
|
|
default=1.0,
|
|
help='Number of seconds between heartbeats for distributed '
|
|
'coordination.'),
|
|
cfg.FloatOpt('initial_reconnect_backoff',
|
|
default=0.1,
|
|
help='Initial number of seconds to wait after failed '
|
|
'reconnection.'),
|
|
cfg.FloatOpt('max_reconnect_backoff',
|
|
default=60.0,
|
|
help='Maximum number of seconds between sequential '
|
|
'reconnection retries.'),
|
|
|
|
]
|
|
|
|
CONF = cfg.CONF
|
|
CONF.register_opts(coordination_opts, group='coordination')
|
|
|
|
|
|
class Coordinator(object):
|
|
"""Tooz coordination wrapper.
|
|
|
|
Coordination member id is created from concatenated
|
|
`prefix` and `agent_id` parameters.
|
|
|
|
:param str agent_id: Agent identifier
|
|
:param str prefix: Used to provide member identifier with a
|
|
meaningful prefix.
|
|
"""
|
|
|
|
def __init__(self, agent_id=None, prefix=''):
|
|
self.coordinator = None
|
|
self.agent_id = agent_id or str(uuid.uuid4())
|
|
self.started = False
|
|
self.prefix = prefix
|
|
self._ev = None
|
|
self._dead = None
|
|
|
|
def is_active(self):
|
|
return self.coordinator is not None
|
|
|
|
def start(self):
|
|
"""Connect to coordination backend and start heartbeat."""
|
|
if not self.started:
|
|
try:
|
|
self._dead = threading.Event()
|
|
self._start()
|
|
self.started = True
|
|
# NOTE(bluex): Start heartbeat in separate thread to avoid
|
|
# being blocked by long coroutines.
|
|
if self.coordinator and self.coordinator.requires_beating:
|
|
self._ev = eventlet.spawn(
|
|
lambda: tpool.execute(self.heartbeat))
|
|
except coordination.ToozError:
|
|
LOG.exception(_LE('Error starting coordination backend.'))
|
|
raise
|
|
LOG.info(_LI('Coordination backend started successfully.'))
|
|
|
|
def stop(self):
|
|
"""Disconnect from coordination backend and stop heartbeat."""
|
|
if self.started:
|
|
self.coordinator.stop()
|
|
self._dead.set()
|
|
if self._ev is not None:
|
|
self._ev.wait()
|
|
self._ev = None
|
|
self.coordinator = None
|
|
self.started = False
|
|
|
|
def get_lock(self, name):
|
|
"""Return a Tooz backend lock.
|
|
|
|
:param str name: The lock name that is used to identify it
|
|
across all nodes.
|
|
"""
|
|
# NOTE(bluex): Tooz expects lock name as a byte string.
|
|
lock_name = (self.prefix + name).encode('ascii')
|
|
if self.coordinator is not None:
|
|
return self.coordinator.get_lock(lock_name)
|
|
else:
|
|
raise exception.LockCreationFailed(_('Coordinator uninitialized.'))
|
|
|
|
def heartbeat(self):
|
|
"""Coordinator heartbeat.
|
|
|
|
Method that every couple of seconds (config: `coordination.heartbeat`)
|
|
sends heartbeat to prove that the member is not dead.
|
|
|
|
If connection to coordination backend is broken it tries to
|
|
reconnect every couple of seconds
|
|
(config: `coordination.initial_reconnect_backoff` up to
|
|
`coordination.max_reconnect_backoff`)
|
|
|
|
"""
|
|
while self.coordinator is not None and not self._dead.is_set():
|
|
try:
|
|
self._heartbeat()
|
|
except coordination.ToozConnectionError:
|
|
self._reconnect()
|
|
else:
|
|
self._dead.wait(cfg.CONF.coordination.heartbeat)
|
|
|
|
def _start(self):
|
|
# NOTE(bluex): Tooz expects member_id as a byte string.
|
|
member_id = (self.prefix + self.agent_id).encode('ascii')
|
|
self.coordinator = coordination.get_coordinator(
|
|
cfg.CONF.coordination.backend_url, member_id)
|
|
self.coordinator.start()
|
|
|
|
def _heartbeat(self):
|
|
try:
|
|
self.coordinator.heartbeat()
|
|
return True
|
|
except coordination.ToozConnectionError:
|
|
LOG.exception(_LE('Connection error while sending a heartbeat '
|
|
'to coordination backend.'))
|
|
raise
|
|
except coordination.ToozError:
|
|
LOG.exception(_LE('Error sending a heartbeat to coordination '
|
|
'backend.'))
|
|
return False
|
|
|
|
def _reconnect(self):
|
|
"""Reconnect with jittered exponential backoff increase."""
|
|
LOG.info(_LI('Reconnecting to coordination backend.'))
|
|
cap = cfg.CONF.coordination.max_reconnect_backoff
|
|
backoff = base = cfg.CONF.coordination.initial_reconnect_backoff
|
|
for attempt in itertools.count(1):
|
|
try:
|
|
self._start()
|
|
break
|
|
except coordination.ToozError:
|
|
backoff = min(cap, random.uniform(base, backoff * 3))
|
|
msg = _LW('Reconnect attempt %(attempt)s failed. '
|
|
'Next try in %(backoff).2fs.')
|
|
LOG.warning(msg, {'attempt': attempt, 'backoff': backoff})
|
|
self._dead.wait(backoff)
|
|
LOG.info(_LI('Reconnected to coordination backend.'))
|
|
|
|
|
|
COORDINATOR = Coordinator(prefix='cinder-')
|
|
|
|
|
|
class Lock(locking.Lock):
|
|
"""Lock with dynamic name.
|
|
|
|
:param str lock_name: Lock name.
|
|
:param dict lock_data: Data for lock name formatting.
|
|
:param coordinator: Coordinator class to use when creating lock.
|
|
Defaults to the global coordinator.
|
|
|
|
Using it like so::
|
|
|
|
with Lock('mylock'):
|
|
...
|
|
|
|
ensures that only one process at a time will execute code in context.
|
|
Lock name can be formatted using Python format string syntax::
|
|
|
|
Lock('foo-{volume.id}, {'volume': ...,})
|
|
|
|
Available field names are keys of lock_data.
|
|
"""
|
|
def __init__(self, lock_name, lock_data=None, coordinator=None):
|
|
super(Lock, self).__init__(str(id(self)))
|
|
lock_data = lock_data or {}
|
|
self.coordinator = coordinator or COORDINATOR
|
|
self.blocking = True
|
|
self.lock = self._prepare_lock(lock_name, lock_data)
|
|
|
|
def _prepare_lock(self, lock_name, lock_data):
|
|
if not isinstance(lock_name, six.string_types):
|
|
raise ValueError(_('Not a valid string: %s') % lock_name)
|
|
self._name = lock_name.format(**lock_data)
|
|
return self.coordinator.get_lock(self._name)
|
|
|
|
@property
|
|
def name(self):
|
|
return self._name
|
|
|
|
def acquire(self, blocking=None):
|
|
"""Attempts to acquire lock.
|
|
|
|
:param blocking: If True, blocks until the lock is acquired. If False,
|
|
returns right away. Otherwise, the value is used as a timeout
|
|
value and the call returns maximum after this number of seconds.
|
|
:return: returns true if acquired (false if not)
|
|
:rtype: bool
|
|
"""
|
|
blocking = self.blocking if blocking is None else blocking
|
|
return self.lock.acquire(blocking=blocking)
|
|
|
|
def release(self):
|
|
"""Attempts to release lock.
|
|
|
|
The behavior of releasing a lock which was not acquired in the first
|
|
place is undefined.
|
|
"""
|
|
self.lock.release()
|
|
|
|
|
|
def synchronized(lock_name, blocking=True, coordinator=None):
|
|
"""Synchronization decorator.
|
|
|
|
:param str lock_name: Lock name.
|
|
:param blocking: If True, blocks until the lock is acquired.
|
|
If False, raises exception when not acquired. Otherwise,
|
|
the value is used as a timeout value and if lock is not acquired
|
|
after this number of seconds exception is raised.
|
|
:param coordinator: Coordinator class to use when creating lock.
|
|
Defaults to the global coordinator.
|
|
:raises tooz.coordination.LockAcquireFailed: if lock is not acquired
|
|
|
|
Decorating a method like so::
|
|
|
|
@synchronized('mylock')
|
|
def foo(self, *args):
|
|
...
|
|
|
|
ensures that only one process will execute the foo method at a time.
|
|
|
|
Different methods can share the same lock::
|
|
|
|
@synchronized('mylock')
|
|
def foo(self, *args):
|
|
...
|
|
|
|
@synchronized('mylock')
|
|
def bar(self, *args):
|
|
...
|
|
|
|
This way only one of either foo or bar can be executing at a time.
|
|
|
|
Lock name can be formatted using Python format string syntax::
|
|
|
|
@synchronized('{f_name}-{vol.id}-{snap[name]}')
|
|
def foo(self, vol, snap):
|
|
...
|
|
|
|
Available field names are: decorated function parameters and
|
|
`f_name` as a decorated function name.
|
|
"""
|
|
|
|
@decorator.decorator
|
|
def _synchronized(f, *a, **k):
|
|
call_args = inspect.getcallargs(f, *a, **k)
|
|
call_args['f_name'] = f.__name__
|
|
lock = Lock(lock_name, call_args, coordinator)
|
|
t1 = timeutils.now()
|
|
t2 = None
|
|
try:
|
|
with lock(blocking):
|
|
t2 = timeutils.now()
|
|
LOG.debug('Lock "%(name)s" acquired by "%(function)s" :: '
|
|
'waited %(wait_secs)0.3fs',
|
|
{'name': lock.name,
|
|
'function': f.__name__,
|
|
'wait_secs': (t2 - t1)})
|
|
return f(*a, **k)
|
|
finally:
|
|
t3 = timeutils.now()
|
|
if t2 is None:
|
|
held_secs = "N/A"
|
|
else:
|
|
held_secs = "%0.3fs" % (t3 - t2)
|
|
LOG.debug('Lock "%(name)s" released by "%(function)s" :: held '
|
|
'%(held_secs)s',
|
|
{'name': lock.name,
|
|
'function': f.__name__,
|
|
'held_secs': held_secs})
|
|
|
|
return _synchronized
|