Merge "Tooz integration"

Jenkins 2017-01-20 00:04:56 +00:00 committed by Gerrit Code Review
commit c8ee9fba72
12 changed files with 514 additions and 12 deletions

@@ -48,7 +48,8 @@ def main():
             host = "%s@%s" % (CONF.host, backend)
             server = service.Service.create(host=host,
                                             service_name=backend,
-                                            binary='manila-share')
+                                            binary='manila-share',
+                                            coordination=True)
             launcher.launch_service(server)
     else:
         server = service.Service.create(binary='manila-share')

manila/coordination.py (new file, 294 lines)

@@ -0,0 +1,294 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Tooz Coordination and locking utilities."""

import inspect
import itertools
import random

import decorator
import eventlet
from oslo_config import cfg
from oslo_log import log
from oslo_service import loopingcall
from oslo_utils import uuidutils
import six
from tooz import coordination
from tooz import locking

from manila import exception
from manila.i18n import _, _LE, _LI, _LW

LOG = log.getLogger(__name__)

coordination_opts = [
    cfg.StrOpt('backend_url',
               default='file://$state_path',
               help='The back end URL to use for distributed coordination.'),
    cfg.FloatOpt('heartbeat',
                 default=1.0,
                 help='Number of seconds between heartbeats for distributed '
                      'coordination.'),
    cfg.FloatOpt('initial_reconnect_backoff',
                 default=0.1,
                 help='Initial number of seconds to wait after failed '
                      'reconnection.'),
    cfg.FloatOpt('max_reconnect_backoff',
                 default=60.0,
                 help='Maximum number of seconds between sequential '
                      'reconnection retries.'),
]

CONF = cfg.CONF
CONF.register_opts(coordination_opts, group='coordination')


class Coordinator(object):
    """Tooz coordination wrapper.

    Coordination member id is created from concatenated `prefix` and
    `agent_id` parameters.

    :param str agent_id: Agent identifier
    :param str prefix: Used to provide member identifier with a
        meaningful prefix.
    """

    def __init__(self, agent_id=None, prefix=''):
        self.coordinator = None
        self.agent_id = agent_id or uuidutils.generate_uuid()
        self.started = False
        self.prefix = prefix
        self._heartbeat_thread = loopingcall.FixedIntervalLoopingCall(
            self.heartbeat)

    def _is_active(self):
        return self.started

    def start(self):
        """Connect to coordination back end and start heartbeat."""
        if not self._is_active():
            try:
                self._start()
                self.started = True
                # NOTE(gouthamr): Start heartbeat in separate thread to avoid
                # being blocked by long co-routines.
                if self.coordinator and self.coordinator.requires_beating:
                    LOG.debug("This tooz lock management back end supports "
                              "heart beats. Spawning a new thread to "
                              "send regular heart beats.")
                    self._heartbeat_thread.start(
                        cfg.CONF.coordination.heartbeat)
                else:
                    LOG.debug("This tooz lock management back end does not "
                              "support heart beats.")
            except coordination.ToozError:
                LOG.exception(_LE('Error starting coordination back end.'))
                raise
            LOG.info(_LI('Coordination back end started successfully.'))

    def stop(self):
        """Disconnect from coordination back end and stop heartbeat."""
        msg = _('Stopped Coordinator (Agent ID: %(agent)s, prefix: '
                '%(prefix)s)')
        msg_args = {'agent': self.agent_id, 'prefix': self.prefix}

        if self._is_active():
            debug_msg = ('Stopping heartbeat thread for coordinator with '
                         '(Agent ID: %(agent)s, prefix: %(prefix)s).')
            LOG.debug(debug_msg, msg_args)

            if self._heartbeat_thread is not None:
                self._heartbeat_thread.stop()
                self._heartbeat_thread = None

            self.coordinator.stop()
            self.coordinator = None
            self.started = False

        LOG.info(msg, msg_args)

    def get_lock(self, name):
        """Return a Tooz back end lock.

        :param str name: The lock name that is used to identify it
            across all nodes.
        """
        # NOTE(gouthamr): Tooz expects lock name as a byte string
        lock_name = (self.prefix + name).encode('ascii')
        if self._is_active():
            return self.coordinator.get_lock(lock_name)
        else:
            raise exception.LockCreationFailed(_('Coordinator uninitialized.'))

    def heartbeat(self):
        """Coordinator heartbeat.

        Method that every couple of seconds (config: `coordination.heartbeat`)
        sends heartbeat to prove that the member is not dead.

        If connection to coordination back end is broken it tries to
        reconnect every couple of seconds
        (config: `coordination.initial_reconnect_backoff` up to
        `coordination.max_reconnect_backoff`)
        """
        if self._is_active():
            try:
                self._heartbeat()
            except coordination.ToozConnectionError:
                self._reconnect()

    def _start(self):
        # NOTE(gouthamr): Tooz expects member_id as a byte string.
        member_id = (self.prefix + self.agent_id).encode('ascii')
        self.coordinator = coordination.get_coordinator(
            cfg.CONF.coordination.backend_url, member_id)
        self.coordinator.start()

    def _heartbeat(self):
        try:
            self.coordinator.heartbeat()
        except coordination.ToozConnectionError:
            LOG.exception(_LE('Connection error while sending a heartbeat '
                              'to coordination back end.'))
            raise
        except coordination.ToozError:
            LOG.exception(_LE('Error sending a heartbeat to coordination '
                              'back end.'))

    def _reconnect(self):
        """Reconnect with jittered exponential back off."""
        LOG.info(_LI('Reconnecting to coordination back end.'))
        cap = cfg.CONF.coordination.max_reconnect_backoff
        backoff = base = cfg.CONF.coordination.initial_reconnect_backoff
        for attempt in itertools.count(1):
            try:
                self._start()
                break
            except coordination.ToozError:
                backoff = min(cap, random.uniform(base, backoff * 3))
                msg = _LW('Reconnect attempt %(attempt)s failed. '
                          'Next try in %(backoff).2fs.')
                LOG.warning(msg, {'attempt': attempt, 'backoff': backoff})
            eventlet.sleep(backoff)
        LOG.info(_LI('Reconnected to coordination back end.'))


LOCK_COORDINATOR = Coordinator(prefix='manila-')


class Lock(locking.Lock):
    """Lock with dynamic name.

    :param str lock_name: Lock name.
    :param dict lock_data: Data for lock name formatting.
    :param coordinator: Coordinator object to use when creating lock.
        Defaults to the global coordinator.

    Using it like so::

        with Lock('mylock'):
            ...

    ensures that only one process at a time will execute code in context.

    Lock name can be formatted using Python format string syntax::

        Lock('foo-{share.id}', {'share': ...})

    Available field names are keys of lock_data.
    """

    def __init__(self, lock_name, lock_data=None, coordinator=None):
        super(Lock, self).__init__(six.text_type(id(self)))
        lock_data = lock_data or {}
        self.coordinator = coordinator or LOCK_COORDINATOR
        self.blocking = True
        self.lock = self._prepare_lock(lock_name, lock_data)

    def _prepare_lock(self, lock_name, lock_data):
        if not isinstance(lock_name, six.string_types):
            raise ValueError(_('Not a valid string: %s') % lock_name)
        return self.coordinator.get_lock(lock_name.format(**lock_data))

    def acquire(self, blocking=None):
        """Attempts to acquire lock.

        :param blocking: If True, blocks until the lock is acquired. If False,
            returns right away. Otherwise, the value is used as a timeout
            value and the call returns maximum after this number of seconds.
        :return: returns true if acquired (false if not)
        :rtype: bool
        """
        blocking = self.blocking if blocking is None else blocking
        return self.lock.acquire(blocking=blocking)

    def release(self):
        """Attempts to release lock.

        The behavior of releasing a lock which was not acquired in the first
        place is undefined.
        """
        self.lock.release()


def synchronized(lock_name, blocking=True, coordinator=None):
    """Synchronization decorator.

    :param str lock_name: Lock name.
    :param blocking: If True, blocks until the lock is acquired.
        If False, raises exception when not acquired. Otherwise,
        the value is used as a timeout value and if lock is not acquired
        after this number of seconds exception is raised.
    :param coordinator: Coordinator object to use when creating lock.
        Defaults to the global coordinator.
    :raises tooz.coordination.LockAcquireFailed: if lock is not acquired

    Decorating a method like so::

        @synchronized('mylock')
        def foo(self, *args):
            ...

    ensures that only one process will execute the foo method at a time.

    Different methods can share the same lock::

        @synchronized('mylock')
        def foo(self, *args):
            ...

        @synchronized('mylock')
        def bar(self, *args):
            ...

    This way only one of either foo or bar can be executing at a time.

    Lock name can be formatted using Python format string syntax::

        @synchronized('{f_name}-{shr.id}-{snap[name]}')
        def foo(self, shr, snap):
            ...

    Available field names are: decorated function parameters and
    `f_name` as a decorated function name.
    """
    @decorator.decorator
    def _synchronized(f, *a, **k):
        call_args = inspect.getcallargs(f, *a, **k)
        call_args['f_name'] = f.__name__
        lock = Lock(lock_name, call_args, coordinator)
        with lock(blocking):
            LOG.debug('Lock "%(name)s" acquired by "%(function)s".',
                      {'name': lock_name, 'function': f.__name__})
            return f(*a, **k)
    return _synchronized
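
A minimal usage sketch of the module above (not part of this change; the
function names and lock names are hypothetical). It assumes the global
LOCK_COORDINATOR has been started, which the share service does when it is
created with coordination=True:

    from manila import coordination

    # Decorator form: the lock name is built from the decorated function's
    # call arguments (plus `f_name`), so calls for different shares use
    # different tooz locks and do not block each other. The Coordinator adds
    # its 'manila-' prefix to the final lock name.
    @coordination.synchronized('example-extend-share-{share_id}')
    def extend_share(share_id, new_size):
        pass  # only one process across all nodes runs this per share_id

    # Explicit Lock object when a decorator does not fit; the keys of
    # lock_data are available to the lock-name format string.
    def shrink_share(share):
        with coordination.Lock('example-shrink-share-{share[id]}',
                               {'share': share}):
            pass  # protected section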

@@ -828,3 +828,12 @@ class HSPItemNotFoundException(ShareBackendException):

 class NexentaException(ShareBackendException):
     message = _("Exception due to Nexenta failure. %(reason)s")
+
+
+# Tooz locking
+class LockCreationFailed(ManilaException):
+    message = _('Unable to create lock. Coordination backend not started.')
+
+
+class LockingFailed(ManilaException):
+    message = _('Lock acquisition failed.')

@@ -29,6 +29,7 @@ import manila.api.middleware.auth
 import manila.common.config
 import manila.compute
 import manila.compute.nova
+import manila.coordination
 import manila.db.api
 import manila.db.base
 import manila.exception
@@ -95,6 +96,7 @@ _global_opt_lists = [
     manila.common.config.debug_opts,
     manila.common.config.global_opts,
     manila.compute._compute_opts,
+    manila.coordination.coordination_opts,
     manila.db.api.db_opts,
     [manila.db.base.db_driver_opt],
     manila.exception.exc_log_opts,

@@ -29,6 +29,7 @@ from oslo_service import service
 from oslo_utils import importutils

 from manila import context
+from manila import coordination
 from manila import db
 from manila import exception
 from manila.i18n import _, _LE, _LI, _LW
@@ -75,7 +76,7 @@ class Service(service.Service):

     def __init__(self, host, binary, topic, manager, report_interval=None,
                  periodic_interval=None, periodic_fuzzy_delay=None,
-                 service_name=None, *args, **kwargs):
+                 service_name=None, coordination=False, *args, **kwargs):
         super(Service, self).__init__()
         if not rpc.initialized():
             rpc.init(CONF)
@@ -92,6 +93,7 @@ class Service(service.Service):
         self.periodic_fuzzy_delay = periodic_fuzzy_delay
         self.saved_args, self.saved_kwargs = args, kwargs
         self.timers = []
+        self.coordinator = coordination

     def start(self):
         version_string = version.version_string()
@@ -99,6 +101,10 @@ class Service(service.Service):
                  {'topic': self.topic, 'version_string': version_string})
         self.model_disconnected = False
         ctxt = context.get_admin_context()
+
+        if self.coordinator:
+            coordination.LOCK_COORDINATOR.start()
+
         try:
             service_ref = db.service_get_by_args(ctxt,
                                                  self.host,
@@ -151,7 +157,8 @@ class Service(service.Service):
     @classmethod
     def create(cls, host=None, binary=None, topic=None, manager=None,
                report_interval=None, periodic_interval=None,
-               periodic_fuzzy_delay=None, service_name=None):
+               periodic_fuzzy_delay=None, service_name=None,
+               coordination=False):
         """Instantiates class and passes back application object.

         :param host: defaults to CONF.host
@@ -182,7 +189,8 @@ class Service(service.Service):
                           report_interval=report_interval,
                           periodic_interval=periodic_interval,
                           periodic_fuzzy_delay=periodic_fuzzy_delay,
-                          service_name=service_name)
+                          service_name=service_name,
+                          coordination=coordination)

         return service_obj
@@ -206,6 +214,13 @@ class Service(service.Service):
                 x.stop()
             except Exception:
                 pass
+
+        if self.coordinator:
+            try:
+                coordination.LOCK_COORDINATOR.stop()
+            except Exception:
+                LOG.exception(_LE("Unable to stop the Tooz Locking "
+                                  "Coordinator."))
+
         self.timers = []
         super(Service, self).stop()

@@ -35,6 +35,7 @@ import six

 from manila.common import constants
 from manila import context
+from manila import coordination
 from manila.data import rpcapi as data_rpcapi
 from manila import exception
 from manila.i18n import _, _LE, _LI, _LW
@@ -144,12 +145,12 @@ def locked_share_replica_operation(operation):
     def wrapped(*args, **kwargs):
         share_id = kwargs.get('share_id')

-        @utils.synchronized(
-            "locked_share_replica_operation_by_share_%s" % share_id,
-            external=True)
-        def locked_operation(*_args, **_kwargs):
+        @coordination.synchronized(
+            'locked-share-replica-operation-for-share-%s' % share_id)
+        def locked_replica_operation(*_args, **_kwargs):
             return operation(*_args, **_kwargs)

-        return locked_operation(*args, **kwargs)
+        return locked_replica_operation(*args, **kwargs)

     return wrapped

@@ -35,6 +35,7 @@ from oslo_utils import uuidutils
 import oslotest.base as base_test

 from manila.api.openstack import api_version_request as api_version
+from manila import coordination
 from manila.db import migration
 from manila.db.sqlalchemy import api as db_api
 from manila.db.sqlalchemy import models as db_models
@@ -145,6 +146,12 @@ class TestCase(base_test.BaseTestCase):

         fake_notifier.stub_notifier(self)

+        # Locks must be cleaned up after tests
+        CONF.set_override('backend_url', 'file://' + lock_path,
+                          group='coordination')
+        coordination.LOCK_COORDINATOR.start()
+        self.addCleanup(coordination.LOCK_COORDINATOR.stop)
+
     def tearDown(self):
         """Runs after each test method to tear down test environment."""
         super(TestCase, self).tearDown()

@@ -57,7 +57,9 @@ class ManilaCmdShareTestCase(test.TestCase):
             mock.call(
                 host=fake_host + '@' + backend,
                 service_name=backend,
-                binary='manila-share') for backend in backends
+                binary='manila-share',
+                coordination=True,
+            ) for backend in backends
         ])
         self.launcher.launch_service.assert_has_calls([
             mock.call(self.server) for backend in backends])

@@ -27,6 +27,7 @@ import six

 from manila.common import constants
 from manila import context
+from manila import coordination
 from manila.data import rpcapi as data_rpc
 from manila import db
 from manila.db.sqlalchemy import models
@@ -65,7 +66,7 @@ class LockedOperationsTestCase(test.TestCase):
         self.manager = self.FakeManager()
         self.fake_context = test_fakes.FakeRequestContext
         self.lock_call = self.mock_object(
-            utils, 'synchronized', mock.Mock(return_value=lambda f: f))
+            coordination, 'synchronized', mock.Mock(return_value=lambda f: f))

     @ddt.data({'id': 'FAKE_REPLICA_ID'}, 'FAKE_REPLICA_ID')
     @ddt.unpack
@@ -94,7 +95,7 @@ class ShareManagerTestCase(test.TestCase):
             mock.patch.object(
                 lockutils, 'lock', fake_utils.get_fake_lock_context())
         self.synchronized_lock_decorator_call = self.mock_object(
-            utils, 'synchronized', mock.Mock(return_value=lambda f: f))
+            coordination, 'synchronized', mock.Mock(return_value=lambda f: f))

     def test_share_manager_instance(self):
         fake_service_name = "fake_service"

@@ -0,0 +1,154 @@
# Copyright 2015 Intel
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import ddt
import mock
from oslo_service import loopingcall
from tooz import coordination as tooz_coordination
from tooz import locking as tooz_locking

from manila import coordination
from manila import test


class Locked(Exception):
    pass


class MockToozLock(tooz_locking.Lock):
    active_locks = set()

    def acquire(self, blocking=True):
        if self.name not in self.active_locks:
            self.active_locks.add(self.name)
            return True
        elif not blocking:
            return False
        else:
            raise Locked

    def release(self):
        self.active_locks.remove(self.name)


@mock.patch('time.sleep', lambda _: None)
@mock.patch('eventlet.sleep', lambda _: None)
@mock.patch('random.uniform', lambda _a, _b: 0)
@ddt.ddt
class CoordinatorTestCase(test.TestCase):
    def setUp(self):
        super(CoordinatorTestCase, self).setUp()
        self.get_coordinator = self.mock_object(tooz_coordination,
                                                'get_coordinator')
        self.heartbeat = self.mock_object(coordination.Coordinator,
                                          'heartbeat')

    @ddt.data(True, False)
    def test_coordinator_start_with_heartbeat(self, requires_beating):
        mock_start_heartbeat = mock.Mock(
            loopingcall, 'FixedIntervalLoopingCall').return_value
        self.mock_object(loopingcall, 'FixedIntervalLoopingCall',
                         mock.Mock(return_value=mock_start_heartbeat))
        crd = self.get_coordinator.return_value
        crd.requires_beating = requires_beating

        agent = coordination.Coordinator()
        agent.start()

        self.assertTrue(self.get_coordinator.called)
        self.assertTrue(crd.start.called)
        self.assertEqual(requires_beating, mock_start_heartbeat.start.called)

    def test_coordinator_stop(self):
        crd = self.get_coordinator.return_value

        agent = coordination.Coordinator()
        agent.start()

        self.assertIsNotNone(agent.coordinator)
        agent.stop()
        self.assertTrue(crd.stop.called)
        self.assertIsNone(agent.coordinator)

    def test_coordinator_lock(self):
        crd = self.get_coordinator.return_value
        crd.get_lock.side_effect = lambda n: MockToozLock(n)

        agent1 = coordination.Coordinator()
        agent1.start()
        agent2 = coordination.Coordinator()
        agent2.start()

        lock_string = 'lock'
        expected_lock = lock_string.encode('ascii')

        self.assertNotIn(expected_lock, MockToozLock.active_locks)
        with agent1.get_lock(lock_string):
            self.assertIn(expected_lock, MockToozLock.active_locks)
            self.assertRaises(Locked, agent1.get_lock(lock_string).acquire)
            self.assertRaises(Locked, agent2.get_lock(lock_string).acquire)
        self.assertNotIn(expected_lock, MockToozLock.active_locks)

    def test_coordinator_offline(self):
        crd = self.get_coordinator.return_value
        crd.start.side_effect = tooz_coordination.ToozConnectionError('err')

        agent = coordination.Coordinator()
        self.assertRaises(tooz_coordination.ToozError, agent.start)
        self.assertFalse(agent.started)
        self.assertFalse(self.heartbeat.called)

    def test_coordinator_reconnect(self):
        start_online = iter([True] + [False] * 5 + [True])
        heartbeat_online = iter((False, True, True))

        def raiser(cond):
            if not cond:
                raise tooz_coordination.ToozConnectionError('err')

        crd = self.get_coordinator.return_value
        crd.start.side_effect = lambda *_: raiser(next(start_online))
        crd.heartbeat.side_effect = lambda *_: raiser(next(heartbeat_online))

        agent = coordination.Coordinator()
        agent.start()
        self.assertRaises(tooz_coordination.ToozConnectionError,
                          agent._heartbeat)
        self.assertEqual(1, self.get_coordinator.call_count)
        agent._reconnect()
        self.assertEqual(7, self.get_coordinator.call_count)
        agent._heartbeat()


@mock.patch.object(coordination.LOCK_COORDINATOR, 'get_lock')
class CoordinationTestCase(test.TestCase):
    def test_lock(self, get_lock):
        with coordination.Lock('lock'):
            self.assertTrue(get_lock.called)

    def test_synchronized(self, get_lock):
        @coordination.synchronized('lock-{f_name}-{foo.val}-{bar[val]}')
        def func(foo, bar):
            pass

        foo = mock.Mock()
        foo.val = 7
        bar = mock.MagicMock()
        bar.__getitem__.return_value = 8

        func(foo, bar)
        get_lock.assert_called_with('lock-func-7-8')

@@ -0,0 +1,15 @@
---
features:
  - Add support for the tooz library.
  - Allow configuration of file/distributed locking for the share manager
    service.
upgrade:
  - New options are necessary in manila.conf to specify the coordination
    back-end URL (for example, a Distributed Lock Manager (DLM) back end
    or a file-based lock location). The configuration determines the tooz
    driver invoked for locking/coordination.
fixes:
  - Share replication workflows are coordinated by the share-manager service
    with the help of the tooz library instead of oslo_concurrency. This
    allows deployers to configure Distributed Lock Management if
    multiple manila-share services are run across different nodes.
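
For illustration only, the new [coordination] options registered by
manila/coordination.py could be set in manila.conf as sketched below; the
ZooKeeper host is a made-up example, and the file:// URL is simply the
default value:

    [coordination]
    # Default: file-based locks under the manila state path (single node).
    backend_url = file://$state_path
    # For multiple manila-share nodes, point at a DLM back end supported by
    # a tooz driver instead, for example:
    # backend_url = zookeeper://zk1.example.net:2181
    heartbeat = 1.0
    initial_reconnect_backoff = 0.1
    max_reconnect_backoff = 60.0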

@@ -38,6 +38,7 @@ Routes!=2.0,!=2.3.0,>=1.12.3;python_version!='2.7' # MIT
 six>=1.9.0 # MIT
 SQLAlchemy<1.1.0,>=1.0.10 # MIT
 stevedore>=1.17.1 # Apache-2.0
+tooz>=1.47.0 # Apache-2.0
 python-cinderclient!=1.7.0,!=1.7.1,>=1.6.0 # Apache-2.0
 python-novaclient!=2.33.0,>=2.29.0 # Apache-2.0
 WebOb>=1.6.0 # MIT