Implement baylock in conductor for horizontal-scale

Implements a distributed bay lock to avoid race conditions when
multiple conductors are deployed. A conductor should acquire a lock
when it is about to perform an operation on a bay. The lock
acquisition will succeed if the bay is not currently locked or the
conductor that owned the lock is dead. An OperationInProgress exception
will be raised if the lock acquisition fails.

Change-Id: I055696c43e356dc6fbd03996f1dd28e3c8c41594
Partial-Implements: blueprint horizontal-scale
This commit is contained in:
Hongbin Lu 2015-04-13 01:33:37 +00:00
parent 848641d5de
commit c5bd2530e8
7 changed files with 327 additions and 8 deletions

View File

@ -123,7 +123,6 @@ function create_magnum_conf {
iniset $MAGNUM_CONF database connection `database_connection_url magnum`
iniset $MAGNUM_CONF api host "$MAGNUM_SERVICE_HOST"
iniset $MAGNUM_CONF api port "$MAGNUM_SERVICE_PORT"
iniset $MAGNUM_CONF conductor host "$MAGNUM_SERVICE_HOST"
configure_auth_token_middleware $MAGNUM_CONF magnum $MAGNUM_AUTH_CACHE_DIR

View File

@ -331,8 +331,9 @@
# The queue to add conductor tasks to (string value)
#topic = magnum-conductor
# The location of the conductor rpc queue (string value)
#host = localhost
# RPC timeout for the conductor liveness check that is used for bay
# locking. (integer value)
#conductor_life_check_timeout = 4
[database]

View File

@ -21,6 +21,7 @@ import sys
from oslo_config import cfg
from magnum.common import rpc_service as service
from magnum.common import short_id
from magnum.conductor.handlers import bay_k8s_heat
from magnum.conductor.handlers import conductor_listener
from magnum.conductor.handlers import docker_conductor
@ -41,8 +42,8 @@ def main():
cfg.CONF.log_opt_values(LOG, std_logging.DEBUG)
cfg.CONF.import_opt('topic', 'magnum.conductor.config', group='conductor')
cfg.CONF.import_opt('host', 'magnum.conductor.config', group='conductor')
conductor_id = short_id.generate_id()
endpoints = [
docker_conductor.Handler(),
k8s_conductor.Handler(),
@ -56,5 +57,5 @@ def main():
exit(-1)
server = service.Service(cfg.CONF.conductor.topic,
cfg.CONF.conductor.host, endpoints)
conductor_id, endpoints)
server.serve()

View File

@ -431,3 +431,7 @@ class RequiredParameterNotProvided(MagnumException):
class Urllib2InvalidScheme(MagnumException):
    """Error reported for a urllib2 URL whose scheme is not valid."""

    # A mapping key must be followed by a conversion type. Without the
    # trailing "s", "%(url) ha" is parsed as a single format specifier
    # (space flag, "h" length modifier, "a" conversion), which either raises
    # ValueError or mangles the rendered message.
    message = _("The urllib2 URL %(url)s has an invalid scheme.")
class OperationInProgress(Invalid):
    """Error reported when a bay already has an operation in progress.

    The message template expects a ``bay_name`` keyword argument.
    """

    message = _("Bay %(bay_name)s already has an operation in progress.")

View File

@ -0,0 +1,130 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
from oslo_config import cfg
import oslo_messaging as messaging
from oslo_utils import excutils
from magnum.common import exception
from magnum.conductor.api import ListenerAPI
from magnum import objects
from magnum.openstack.common._i18n import _LI
from magnum.openstack.common._i18n import _LW
from magnum.openstack.common import log as logging
# Ensure the [conductor] options read by BayLock.conductor_alive (the RPC
# topic and the liveness-check timeout) are registered on cfg.CONF before
# they are accessed; both are declared in magnum.conductor.config.
cfg.CONF.import_opt('topic', 'magnum.conductor.config',
group='conductor')
cfg.CONF.import_opt('conductor_life_check_timeout', 'magnum.conductor.config',
group='conductor')
# Module-level logger named after this module.
LOG = logging.getLogger(__name__)
class BayLock(object):
    """Per-bay lock taken by a conductor before it operates on a bay.

    The lock record itself is managed through ``objects.BayLock``; this
    class adds the ownership and liveness checks needed to decide whether an
    existing lock may be stolen.
    """

    def __init__(self, context, bay, conductor_id):
        """Remember the request context, the bay and the locking conductor.

        :param context: request context passed to the liveness RPC call.
        :param bay: bay to lock; its ``uuid`` and ``name`` attributes are
                    read by :meth:`acquire`.
        :param conductor_id: identifier of the conductor taking the lock.
        """
        self.context = context
        self.bay = bay
        self.conductor_id = conductor_id

    @staticmethod
    def conductor_alive(context, conductor_id):
        """Ping a conductor to find out whether it is still running.

        :param context: request context used for the RPC call.
        :param conductor_id: identifier of the conductor to ping; used as
                             the RPC ``server``.
        :returns: the result of ``ping_conductor()``, or False when the call
                  times out after ``conductor_life_check_timeout``.
        """
        topic = cfg.CONF.conductor.topic
        timeout = cfg.CONF.conductor.conductor_life_check_timeout
        listener_api = ListenerAPI(context=context, topic=topic,
                                   server=conductor_id, timeout=timeout)
        try:
            return listener_api.ping_conductor()
        except messaging.MessagingTimeout:
            # No answer within the timeout: treat the conductor as dead.
            return False

    def acquire(self, retry=True):
        """Acquire a lock on the bay.

        :param retry: When True, retry if lock was released while stealing.
        :raises OperationInProgress: if the lock is held by this conductor
            or by another conductor that answers the liveness ping, or if
            stealing a stale lock does not succeed.
        """
        # create() yielding None is treated as "lock taken"; any other value
        # is the id of the conductor currently holding the lock.
        lock_conductor_id = objects.BayLock.create(self.bay.uuid,
                                                   self.conductor_id)
        if lock_conductor_id is None:
            LOG.debug("Conductor %(conductor)s acquired lock on bay "
                      "%(bay)s", {'conductor': self.conductor_id,
                                  'bay': self.bay.uuid})
            return

        if (lock_conductor_id == self.conductor_id or
                self.conductor_alive(self.context, lock_conductor_id)):
            LOG.debug("Lock on bay %(bay)s is owned by conductor "
                      "%(conductor)s", {'bay': self.bay.uuid,
                                        'conductor': lock_conductor_id})
            raise exception.OperationInProgress(bay_name=self.bay.name)

        # The owner did not answer the ping, so the lock is considered stale.
        LOG.info(_LI("Stale lock detected on bay %(bay)s.  Conductor "
                     "%(conductor)s will attempt to steal the lock"),
                 {'bay': self.bay.uuid, 'conductor': self.conductor_id})

        # steal() results handled below: None means the lock was stolen,
        # True means it was released in the meantime, and anything else is
        # the id of the conductor that now holds it.
        result = objects.BayLock.steal(self.bay.uuid,
                                       lock_conductor_id,
                                       self.conductor_id)
        if result is None:
            LOG.info(_LI("Conductor %(conductor)s successfully stole the "
                         "lock on bay %(bay)s"),
                     {'conductor': self.conductor_id,
                      'bay': self.bay.uuid})
            return

        if result is True:
            if retry:
                LOG.info(_LI("The lock on bay %(bay)s was released while "
                             "conductor %(conductor)s was stealing it. "
                             "Trying again"),
                         {'bay': self.bay.uuid,
                          'conductor': self.conductor_id})
                # Retry only once so that acquisition always terminates.
                return self.acquire(retry=False)
        else:
            new_lock_conductor_id = result
            LOG.info(_LI("Failed to steal lock on bay %(bay)s. "
                         "Conductor %(conductor)s stole the lock first"),
                     {'bay': self.bay.uuid,
                      'conductor': new_lock_conductor_id})

        raise exception.OperationInProgress(bay_name=self.bay.name)

    def release(self, bay_uuid):
        """Release a bay lock.

        :param bay_uuid: UUID of the bay whose lock is released.
        """
        # Only the conductor that owns the lock will be releasing it.
        result = objects.BayLock.release(bay_uuid, self.conductor_id)
        if result is True:
            LOG.warning(_LW("Lock was already released on bay %s!"),
                        bay_uuid)
        else:
            LOG.debug("Conductor %(conductor)s released lock on bay "
                      "%(bay)s", {'conductor': self.conductor_id,
                                  'bay': bay_uuid})

    @contextlib.contextmanager
    def thread_lock(self, bay_uuid):
        """Acquire a lock and release it only if there is an exception.

        The release method still needs to be scheduled to be run at the
        end of the thread using the Thread.link method.

        :param bay_uuid: UUID of the bay, passed to :meth:`release` when the
                         guarded body fails.
        :raises OperationInProgress: propagated unchanged, without releasing
            the lock.
        """
        try:
            self.acquire()
            yield
        except exception.OperationInProgress:
            # Propagate without releasing anything.
            raise
        except:  # noqa
            # Any other failure: release the lock, then let the original
            # exception continue to propagate.
            with excutils.save_and_reraise_exception():
                self.release(bay_uuid)

View File

@ -21,9 +21,10 @@ SERVICE_OPTS = [
cfg.StrOpt('topic',
default='magnum-conductor',
help='The queue to add conductor tasks to'),
cfg.StrOpt('host',
default='localhost',
help='The location of the conductor rpc queue'),
cfg.IntOpt('conductor_life_check_timeout',
default=4,
help=('RPC timeout for the conductor liveness check that is '
'used for bay locking.')),
]
opt_group = cfg.OptGroup(

View File

@ -0,0 +1,183 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import oslo_messaging as messaging
from magnum.common import exception
from magnum.common import short_id
from magnum.conductor import bay_lock
from magnum.tests import base
from magnum.tests.unit.objects import utils as obj_utils
from mock import patch
class BayLockTest(base.TestCase):
    """Unit tests for ``magnum.conductor.bay_lock.BayLock``.

    ``objects.BayLock`` and the conductor liveness check are patched in
    every test.
    """

    def setUp(self):
        super(BayLockTest, self).setUp()
        self.conductor_id = short_id.generate_id()
        self.bay = obj_utils.get_test_bay(self.context)

    class TestThreadLockException(Exception):
        """Marker exception raised from inside a ``thread_lock`` body."""
        pass

    @patch('magnum.objects.BayLock.create', return_value=None)
    def test_successful_acquire_new_lock(self, mock_object_create):
        """acquire() succeeds when create() reports no existing owner."""
        baylock = bay_lock.BayLock(self.context, self.bay, self.conductor_id)

        baylock.acquire()

        mock_object_create.assert_called_once_with(self.bay.uuid,
                                                   self.conductor_id)

    @patch('magnum.objects.BayLock.create')
    def test_failed_acquire_current_conductor_lock(self, mock_object_create):
        """acquire() fails when this conductor already owns the lock."""
        mock_object_create.return_value = self.conductor_id
        baylock = bay_lock.BayLock(self.context, self.bay, self.conductor_id)

        self.assertRaises(exception.OperationInProgress, baylock.acquire)

        mock_object_create.assert_called_once_with(self.bay.uuid,
                                                   self.conductor_id)

    @patch('magnum.objects.BayLock.steal', return_value=None)
    @patch('magnum.objects.BayLock.create', return_value='fake-conductor-id')
    def test_successful_acquire_dead_conductor_lock(self, mock_object_create,
                                                    mock_object_steal):
        """A lock held by a dead conductor is stolen successfully."""
        baylock = bay_lock.BayLock(self.context, self.bay, self.conductor_id)
        with mock.patch.object(baylock, 'conductor_alive',
                               return_value=False):
            baylock.acquire()

        mock_object_create.assert_called_once_with(self.bay.uuid,
                                                   self.conductor_id)
        mock_object_steal.assert_called_once_with(
            self.bay.uuid, 'fake-conductor-id', self.conductor_id)

    @patch('magnum.objects.BayLock.create', return_value='fake-conductor-id')
    def test_failed_acquire_alive_conductor_lock(self, mock_object_create):
        """acquire() fails when the owning conductor is still alive."""
        baylock = bay_lock.BayLock(self.context, self.bay, self.conductor_id)
        with mock.patch.object(baylock, 'conductor_alive',
                               return_value=True):
            self.assertRaises(exception.OperationInProgress, baylock.acquire)

        mock_object_create.assert_called_once_with(self.bay.uuid,
                                                   self.conductor_id)

    @patch('magnum.objects.BayLock.steal', return_value='fake-conductor-id2')
    @patch('magnum.objects.BayLock.create', return_value='fake-conductor-id')
    def test_failed_acquire_dead_conductor_lock(self, mock_object_create,
                                                mock_object_steal):
        """acquire() fails when another conductor steals the lock first."""
        baylock = bay_lock.BayLock(self.context, self.bay, self.conductor_id)
        with mock.patch.object(baylock, 'conductor_alive',
                               return_value=False):
            self.assertRaises(exception.OperationInProgress, baylock.acquire)

        mock_object_create.assert_called_once_with(self.bay.uuid,
                                                   self.conductor_id)
        mock_object_steal.assert_called_once_with(
            self.bay.uuid, 'fake-conductor-id', self.conductor_id)

    @patch('magnum.objects.BayLock.steal', side_effect=[True, None])
    @patch('magnum.objects.BayLock.create', return_value='fake-conductor-id')
    def test_successful_acquire_with_retry(self, mock_object_create,
                                           mock_object_steal):
        """A lock released mid-steal is acquired on the single retry."""
        baylock = bay_lock.BayLock(self.context, self.bay, self.conductor_id)
        with mock.patch.object(baylock, 'conductor_alive',
                               return_value=False):
            baylock.acquire()

        mock_object_create.assert_has_calls(
            [mock.call(self.bay.uuid, self.conductor_id)] * 2)
        mock_object_steal.assert_has_calls(
            [mock.call(self.bay.uuid, 'fake-conductor-id',
                       self.conductor_id)] * 2)
        # assert_has_calls does not bound the number of calls; pin it.
        self.assertEqual(2, mock_object_create.call_count)
        self.assertEqual(2, mock_object_steal.call_count)

    @patch('magnum.objects.BayLock.steal', return_value=True)
    @patch('magnum.objects.BayLock.create', return_value='fake-conductor-id')
    def test_failed_acquire_one_retry_only(self, mock_object_create,
                                           mock_object_steal):
        """acquire() gives up after exactly one retry."""
        baylock = bay_lock.BayLock(self.context, self.bay, self.conductor_id)
        with mock.patch.object(baylock, 'conductor_alive',
                               return_value=False):
            self.assertRaises(exception.OperationInProgress, baylock.acquire)

        mock_object_create.assert_has_calls(
            [mock.call(self.bay.uuid, self.conductor_id)] * 2)
        mock_object_steal.assert_has_calls(
            [mock.call(self.bay.uuid, 'fake-conductor-id',
                       self.conductor_id)] * 2)
        # Exactly one retry: two attempts in total, never more.
        self.assertEqual(2, mock_object_create.call_count)
        self.assertEqual(2, mock_object_steal.call_count)

    @patch('magnum.objects.BayLock.release', return_value=None)
    @patch('magnum.objects.BayLock.create', return_value=None)
    def test_thread_lock_acquire_success_with_exception(self,
                                                        mock_object_create,
                                                        mock_object_release):
        """The lock is released when the guarded body raises."""
        baylock = bay_lock.BayLock(self.context, self.bay, self.conductor_id)

        def check_thread_lock():
            with baylock.thread_lock(self.bay.uuid):
                self.assertEqual(1, mock_object_create.call_count)
                raise self.TestThreadLockException

        self.assertRaises(self.TestThreadLockException, check_thread_lock)
        self.assertEqual(1, mock_object_release.call_count)

    @patch('magnum.objects.BayLock.release', return_value=None)
    @patch('magnum.objects.BayLock.create')
    def test_thread_lock_acquire_fail_with_exception(self, mock_object_create,
                                                     mock_object_release):
        """Nothing is released when the lock cannot be acquired."""
        mock_object_create.return_value = self.conductor_id
        baylock = bay_lock.BayLock(self.context, self.bay, self.conductor_id)

        def check_thread_lock():
            with baylock.thread_lock(self.bay.uuid):
                self.assertEqual(1, mock_object_create.call_count)
                raise exception.OperationInProgress

        self.assertRaises(exception.OperationInProgress, check_thread_lock)
        self.assertFalse(mock_object_release.called)

    @patch('magnum.objects.BayLock.release', return_value=None)
    @patch('magnum.objects.BayLock.create', return_value=None)
    def test_thread_lock_acquire_success_no_exception(self, mock_object_create,
                                                      mock_object_release):
        """The lock stays held when the guarded body completes normally."""
        baylock = bay_lock.BayLock(self.context, self.bay, self.conductor_id)

        with baylock.thread_lock(self.bay.uuid):
            self.assertEqual(1, mock_object_create.call_count)

        self.assertFalse(mock_object_release.called)

    @patch('magnum.conductor.api.ListenerAPI.__new__')
    def test_conductor_alive_ok(self, mock_listener_api_new):
        """conductor_alive() returns the ping result when it answers."""
        mock_listener_api = mock.MagicMock()
        mock_listener_api.ping_conductor.return_value = True
        mock_listener_api_new.return_value = mock_listener_api
        baylock = bay_lock.BayLock(self.context, self.bay, self.conductor_id)

        ret = baylock.conductor_alive(self.context, self.conductor_id)

        self.assertIs(True, ret)
        self.assertEqual(1, mock_listener_api_new.call_count)

    @patch('magnum.conductor.api.ListenerAPI.__new__')
    def test_conductor_alive_timeout(self, mock_listener_api_new):
        """conductor_alive() returns False when the ping times out."""
        mock_listener_api = mock.MagicMock()
        mock_listener_api.ping_conductor.side_effect = (
            messaging.MessagingTimeout('too slow'))
        mock_listener_api_new.return_value = mock_listener_api
        baylock = bay_lock.BayLock(self.context, self.bay, self.conductor_id)

        ret = baylock.conductor_alive(self.context, self.conductor_id)

        self.assertIs(False, ret)
        self.assertEqual(1, mock_listener_api_new.call_count)