cinder/cinder/coordination.py

# Copyright 2015 Intel
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Coordination and locking utilities."""
import errno
import glob
import inspect
import os
import re
import sys
from typing import Callable, Optional
import uuid
import decorator
from oslo_config import cfg
from oslo_log import log
from oslo_utils import timeutils
from tooz import coordination
from cinder import exception
from cinder.i18n import _
from cinder import utils
LOG = log.getLogger(__name__)
coordination_opts = [
cfg.StrOpt('backend_url',
secret=True,
default='file://$state_path',
help='The backend URL to use for distributed coordination.'),
]
CONF = cfg.CONF
CONF.register_opts(coordination_opts, group='coordination')
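
# NOTE: hypothetical deployment sketch (values are illustrative, not
# defaults). The URL is handed straight to Tooz, so any installed Tooz
# driver can replace the single-host ``file://`` default, e.g. in
# cinder.conf:
#
#     [coordination]
#     backend_url = etcd3+http://controller.example.org:2379
#
# This assumes the etcd3gw Tooz driver is available in the environment.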


class Coordinator(object):
    """Tooz coordination wrapper.

    Coordination member id is created from concatenated
    `prefix` and `agent_id` parameters.

    :param str agent_id: Agent identifier
    :param str prefix: Used to provide member identifier with a
        meaningful prefix.
    """

    def __init__(self, agent_id: Optional[str] = None, prefix: str = ''):
        self.coordinator = None
        self.agent_id = agent_id or str(uuid.uuid4())
        self.started = False
        self.prefix = prefix
        self._file_path = None

    def _get_file_path(self, backend_url):
        if backend_url.startswith('file://'):
            path = backend_url[7:]
            # Copied from TooZ's _normalize_path to get the same path they use
            if sys.platform == 'win32':
                path = re.sub(r'\\(?=\w:\\)', '', os.path.normpath(path))
            return os.path.abspath(os.path.join(path, self.prefix))
        return None

    def start(self) -> None:
        if self.started:
            return

        backend_url = cfg.CONF.coordination.backend_url

        # NOTE(bluex): Tooz expects member_id as a byte string.
        member_id = (self.prefix + self.agent_id).encode('ascii')
        self.coordinator = coordination.get_coordinator(backend_url,
                                                        member_id)
        assert self.coordinator is not None
        self.coordinator.start(start_heart=True)
        self._file_path = self._get_file_path(backend_url)
        self.started = True

    def stop(self) -> None:
        """Disconnect from coordination backend and stop heartbeat."""
        if self.started:
            if self.coordinator is not None:
                self.coordinator.stop()
            self.coordinator = None
            self.started = False

    def get_lock(self, name: str):
        """Return a Tooz backend lock.

        :param str name: The lock name that is used to identify it
            across all nodes.
        """
        # NOTE(bluex): Tooz expects lock name as a byte string.
        lock_name = (self.prefix + name).encode('ascii')
        if self.coordinator is not None:
            return self.coordinator.get_lock(lock_name)
        else:
            raise exception.LockCreationFailed(_('Coordinator uninitialized.'))

    def remove_lock(self, glob_name):
        # Most locks clean up on release, but not the file lock, so we
        # manually clean them.

        def _err(file_name: str, exc: Exception) -> None:
            LOG.warning('Failed to cleanup lock %(name)s: %(exc)s',
                        {'name': file_name, 'exc': exc})

        if self._file_path:
            files = glob.glob(self._file_path + glob_name)
            for file_name in files:
                try:
                    os.remove(file_name)
                except OSError as exc:
                    if exc.errno != errno.ENOENT:
                        _err(file_name, exc)
                except Exception as exc:
                    _err(file_name, exc)
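
# NOTE: brief lifecycle sketch (illustrative only; names other than the
# methods above are hypothetical). A service typically creates one
# Coordinator, starts it once at startup, and stops it on shutdown:
#
#     coord = Coordinator(prefix='example-')
#     coord.start()                   # connects and starts heartbeating
#     lock = coord.get_lock('name')   # returns a Tooz lock object
#     ...
#     coord.stop()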


COORDINATOR = Coordinator(prefix='cinder-')


def synchronized_remove(glob_name, coordinator=COORDINATOR):
    coordinator.remove_lock(glob_name)
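
# NOTE: minimal usage sketch (the lock name below is hypothetical). A
# caller that created file locks named after a volume could purge the
# leftovers once the volume is gone, e.g.
#
#     from cinder import coordination
#
#     coordination.synchronized_remove('volume-delete-%s-*' % volume.id)
#
# The glob is appended to the file driver's lock path prefix, so this is
# only meaningful with the ``file://`` backend; other backends clean up
# their locks on release.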


def __acquire(lock, blocking, f_name):
    """Acquire a lock and return the time when it was acquired."""
    t1 = timeutils.now()
    name = utils.convert_str(lock.name)
    LOG.debug('Acquiring lock "%s" by "%s"', name, f_name)
    lock.acquire(blocking)
    t2 = timeutils.now()
    LOG.debug('Lock "%s" acquired by "%s" :: waited %0.3fs',
              name, f_name, t2 - t1)
    return t2


def __release(lock, acquired_time, f_name):
    """Release a lock ignoring exceptions."""
    name = utils.convert_str(lock.name)
    try:
        lock.release()
        held = timeutils.now() - acquired_time
        LOG.debug('Lock "%s" released by "%s" :: held %0.3fs',
                  name, f_name, held)
    except Exception as e:
        LOG.error('Failed to release lock "%s": %s', name, e)


def synchronized(*lock_names: str,
                 blocking: bool = True,
                 coordinator: Coordinator = COORDINATOR) -> Callable:
    """Synchronization decorator.

    :param str lock_names: Arbitrary number of lock names.
    :param blocking: If True, blocks until the lock is acquired.
        If False, raises an exception when the lock is not acquired.
        Otherwise, the value is used as a timeout in seconds, and an
        exception is raised if the lock is not acquired within that time.
        This is a keyword only argument.
    :param coordinator: Coordinator class to use when creating lock.
        Defaults to the global coordinator. This is a keyword only argument.
    :raises tooz.coordination.LockAcquireFailed: if lock is not acquired

    Decorating a method like so::

        @synchronized('mylock')
        def foo(self, *args):
            ...

    ensures that only one process will execute the foo method at a time.

    Different methods can share the same lock::

        @synchronized('mylock')
        def foo(self, *args):
            ...

        @synchronized('mylock')
        def bar(self, *args):
            ...

    This way only one of either foo or bar can be executing at a time.

    Lock names can be formatted using Python format string syntax::

        @synchronized('{f_name}-{vol.id}-{snap[name]}')
        def foo(self, vol, snap):
            ...

    Multiple locks can be requested simultaneously and the decorator will
    sort them by their rendered names to prevent potential deadlocks::

        @synchronized('{f_name}-{vol.id}-{snap[name]}',
                      '{f_name}-{vol.id}.delete')
        def foo(self, vol, snap):
            ...

    Available field names are: decorated function parameters and
    `f_name` as the decorated function's name.
    """

    @decorator.decorator
    def _synchronized(f, *a, **k) -> Callable:
        call_args = inspect.getcallargs(f, *a, **k)
        call_args['f_name'] = f.__name__

        # Prevent deadlocks by removing duplicate names and sorting them, so
        # the locks are always acquired in the same order.
        names = sorted(set([name.format(**call_args) for name in lock_names]))
        locks = [coordinator.get_lock(name) for name in names]
        acquired_times = []
        f_name = f.__name__
        t1 = timeutils.now()
        try:
            if len(locks) > 1:  # Don't pollute logs for single locks
                LOG.debug('Acquiring %s locks by %s', len(locks), f_name)

            for lock in locks:
                acquired_times.append(__acquire(lock, blocking, f_name))

            if len(locks) > 1:
                t = timeutils.now() - t1
                LOG.debug('Acquired %s locks by %s in %0.3fs',
                          len(locks), f_name, t)

            return f(*a, **k)
        finally:
            for lock, acquired_time in zip(locks, acquired_times):
                __release(lock, acquired_time, f_name)

    return _synchronized
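

# NOTE: minimal illustration of direct lock usage (the lock name is
# hypothetical). Callers that cannot use the decorator can ask the global
# coordinator for a lock directly; Tooz locks can be used as context
# managers, assuming COORDINATOR.start() was already called during service
# startup:
#
#     lock = COORDINATOR.get_lock('my-resource-id')
#     with lock:
#         ...  # critical section; the lock is released on exit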