diff --git a/devstack/lib/magnum b/devstack/lib/magnum index 21a90040ae..f154e7a082 100644 --- a/devstack/lib/magnum +++ b/devstack/lib/magnum @@ -123,7 +123,6 @@ function create_magnum_conf { iniset $MAGNUM_CONF database connection `database_connection_url magnum` iniset $MAGNUM_CONF api host "$MAGNUM_SERVICE_HOST" iniset $MAGNUM_CONF api port "$MAGNUM_SERVICE_PORT" - iniset $MAGNUM_CONF conductor host "$MAGNUM_SERVICE_HOST" configure_auth_token_middleware $MAGNUM_CONF magnum $MAGNUM_AUTH_CACHE_DIR diff --git a/etc/magnum/magnum.conf.sample b/etc/magnum/magnum.conf.sample index afbe5778dd..7f4ebe68bf 100644 --- a/etc/magnum/magnum.conf.sample +++ b/etc/magnum/magnum.conf.sample @@ -331,8 +331,9 @@ # The queue to add conductor tasks to (string value) #topic = magnum-conductor -# The location of the conductor rpc queue (string value) -#host = localhost +# RPC timeout for the conductor liveness check that is used for bay +# locking. (integer value) +#conductor_life_check_timeout = 4 [database] diff --git a/magnum/cmd/conductor.py b/magnum/cmd/conductor.py index bb1c949455..776ace0edd 100644 --- a/magnum/cmd/conductor.py +++ b/magnum/cmd/conductor.py @@ -21,6 +21,7 @@ import sys from oslo_config import cfg from magnum.common import rpc_service as service +from magnum.common import short_id from magnum.conductor.handlers import bay_k8s_heat from magnum.conductor.handlers import conductor_listener from magnum.conductor.handlers import docker_conductor @@ -41,8 +42,8 @@ def main(): cfg.CONF.log_opt_values(LOG, std_logging.DEBUG) cfg.CONF.import_opt('topic', 'magnum.conductor.config', group='conductor') - cfg.CONF.import_opt('host', 'magnum.conductor.config', group='conductor') + conductor_id = short_id.generate_id() endpoints = [ docker_conductor.Handler(), k8s_conductor.Handler(), @@ -56,5 +57,5 @@ def main(): exit(-1) server = service.Service(cfg.CONF.conductor.topic, - cfg.CONF.conductor.host, endpoints) + conductor_id, endpoints) server.serve() diff --git a/magnum/common/exception.py b/magnum/common/exception.py index 1c9ec081a2..b883a1d055 100644 --- a/magnum/common/exception.py +++ b/magnum/common/exception.py @@ -431,3 +431,7 @@ class RequiredParameterNotProvided(MagnumException): class Urllib2InvalidScheme(MagnumException): message = _("The urllib2 URL %(url) has an invalid scheme.") + + +class OperationInProgress(Invalid): + message = _("Bay %(bay_name)s already has an operation in progress.") diff --git a/magnum/conductor/bay_lock.py b/magnum/conductor/bay_lock.py new file mode 100644 index 0000000000..6412854c52 --- /dev/null +++ b/magnum/conductor/bay_lock.py @@ -0,0 +1,130 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import contextlib + +from oslo_config import cfg +import oslo_messaging as messaging +from oslo_utils import excutils + +from magnum.common import exception +from magnum.conductor.api import ListenerAPI +from magnum import objects +from magnum.openstack.common._i18n import _LI +from magnum.openstack.common._i18n import _LW +from magnum.openstack.common import log as logging + + +cfg.CONF.import_opt('topic', 'magnum.conductor.config', + group='conductor') +cfg.CONF.import_opt('conductor_life_check_timeout', 'magnum.conductor.config', + group='conductor') + + +LOG = logging.getLogger(__name__) + + +class BayLock(object): + + def __init__(self, context, bay, conductor_id): + self.context = context + self.bay = bay + self.conductor_id = conductor_id + + @staticmethod + def conductor_alive(context, conductor_id): + topic = cfg.CONF.conductor.topic + timeout = cfg.CONF.conductor.conductor_life_check_timeout + listener_api = ListenerAPI(context=context, topic=topic, + server=conductor_id, timeout=timeout) + try: + return listener_api.ping_conductor() + except messaging.MessagingTimeout: + return False + + def acquire(self, retry=True): + """Acquire a lock on the bay. + + :param retry: When True, retry if lock was released while stealing. + """ + lock_conductor_id = objects.BayLock.create(self.bay.uuid, + self.conductor_id) + if lock_conductor_id is None: + LOG.debug("Conductor %(conductor)s acquired lock on bay " + "%(bay)s" % {'conductor': self.conductor_id, + 'bay': self.bay.uuid}) + return + + if (lock_conductor_id == self.conductor_id or + self.conductor_alive(self.context, lock_conductor_id)): + LOG.debug("Lock on bay %(bay)s is owned by conductor " + "%(conductor)s" % {'bay': self.bay.uuid, + 'conductor': lock_conductor_id}) + raise exception.OperationInProgress(bay_name=self.bay.name) + else: + LOG.info(_LI("Stale lock detected on bay %(bay)s. Conductor " + "%(conductor)s will attempt to steal the lock"), + {'bay': self.bay.uuid, 'conductor': self.conductor_id}) + + result = objects.BayLock.steal(self.bay.uuid, + lock_conductor_id, + self.conductor_id) + + if result is None: + LOG.info(_LI("Conductor %(conductor)s successfully stole the " + "lock on bay %(bay)s"), + {'conductor': self.conductor_id, + 'bay': self.bay.uuid}) + return + elif result is True: + if retry: + LOG.info(_LI("The lock on bay %(bay)s was released while " + "conductor %(conductor)s was stealing it. " + "Trying again"), + {'bay': self.bay.uuid, + 'conductor': self.conductor_id}) + return self.acquire(retry=False) + else: + new_lock_conductor_id = result + LOG.info(_LI("Failed to steal lock on bay %(bay)s. " + "Conductor %(conductor)s stole the lock first"), + {'bay': self.bay.uuid, + 'conductor': new_lock_conductor_id}) + + raise exception.OperationInProgress(bay_name=self.bay.name) + + def release(self, bay_uuid): + """Release a bay lock.""" + # Only the conductor that owns the lock will be releasing it. + result = objects.BayLock.release(bay_uuid, self.conductor_id) + if result is True: + LOG.warn(_LW("Lock was already released on bay %s!"), bay_uuid) + else: + LOG.debug("Conductor %(conductor)s released lock on bay " + "%(bay)s" % {'conductor': self.conductor_id, + 'bay': bay_uuid}) + + @contextlib.contextmanager + def thread_lock(self, bay_uuid): + """Acquire a lock and release it only if there is an exception. + The release method still needs to be scheduled to be run at the + end of the thread using the Thread.link method. + """ + try: + self.acquire() + yield + except exception.OperationInProgress: + raise + except: # noqa + with excutils.save_and_reraise_exception(): + self.release(bay_uuid) diff --git a/magnum/conductor/config.py b/magnum/conductor/config.py index 06ed1dba9d..52ca9c87de 100644 --- a/magnum/conductor/config.py +++ b/magnum/conductor/config.py @@ -21,9 +21,10 @@ SERVICE_OPTS = [ cfg.StrOpt('topic', default='magnum-conductor', help='The queue to add conductor tasks to'), - cfg.StrOpt('host', - default='localhost', - help='The location of the conductor rpc queue'), + cfg.IntOpt('conductor_life_check_timeout', + default=4, + help=('RPC timeout for the conductor liveness check that is ' + 'used for bay locking.')), ] opt_group = cfg.OptGroup( diff --git a/magnum/tests/unit/conductor/test_bay_lock.py b/magnum/tests/unit/conductor/test_bay_lock.py new file mode 100644 index 0000000000..41b09f102f --- /dev/null +++ b/magnum/tests/unit/conductor/test_bay_lock.py @@ -0,0 +1,183 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock +import oslo_messaging as messaging + +from magnum.common import exception +from magnum.common import short_id +from magnum.conductor import bay_lock +from magnum.tests import base +from magnum.tests.unit.objects import utils as obj_utils +from mock import patch + + +class BayLockTest(base.TestCase): + + def setUp(self): + super(BayLockTest, self).setUp() + self.conductor_id = short_id.generate_id() + self.bay = obj_utils.get_test_bay(self.context) + + class TestThreadLockException(Exception): + pass + + @patch('magnum.objects.BayLock.create', return_value=None) + def test_successful_acquire_new_lock(self, mock_object_create): + baylock = bay_lock.BayLock(self.context, self.bay, self.conductor_id) + baylock.acquire() + + mock_object_create.assert_called_once_with(self.bay.uuid, + self.conductor_id) + + @patch('magnum.objects.BayLock.create') + def test_failed_acquire_current_conductor_lock(self, mock_object_create): + mock_object_create.return_value = self.conductor_id + + baylock = bay_lock.BayLock(self.context, self.bay, self.conductor_id) + + self.assertRaises(exception.OperationInProgress, baylock.acquire) + mock_object_create.assert_called_once_with(self.bay.uuid, + self.conductor_id) + + @patch('magnum.objects.BayLock.steal', return_value=None) + @patch('magnum.objects.BayLock.create', return_value='fake-conductor-id') + def test_successful_acquire_dead_conductor_lock(self, mock_object_create, + mock_object_steal): + baylock = bay_lock.BayLock(self.context, self.bay, self.conductor_id) + with mock.patch.object(baylock, 'conductor_alive', + return_value=False): + baylock.acquire() + + mock_object_create.assert_called_once_with(self.bay.uuid, + self.conductor_id) + mock_object_steal.assert_called_once_with(self.bay.uuid, + 'fake-conductor-id', self.conductor_id) + + @patch('magnum.objects.BayLock.create', return_value='fake-conductor-id') + def test_failed_acquire_alive_conductor_lock(self, mock_object_create): + baylock = bay_lock.BayLock(self.context, self.bay, self.conductor_id) + with mock.patch.object(baylock, 'conductor_alive', + return_value=True): + self.assertRaises(exception.OperationInProgress, baylock.acquire) + + mock_object_create.assert_called_once_with(self.bay.uuid, + self.conductor_id) + + @patch('magnum.objects.BayLock.steal', return_value='fake-conductor-id2') + @patch('magnum.objects.BayLock.create', return_value='fake-conductor-id') + def test_failed_acquire_dead_conductor_lock(self, mock_object_create, + mock_object_steal): + baylock = bay_lock.BayLock(self.context, self.bay, self.conductor_id) + with mock.patch.object(baylock, 'conductor_alive', + return_value=False): + self.assertRaises(exception.OperationInProgress, baylock.acquire) + + mock_object_create.assert_called_once_with(self.bay.uuid, + self.conductor_id) + mock_object_steal.assert_called_once_with(self.bay.uuid, + 'fake-conductor-id', self.conductor_id) + + @patch('magnum.objects.BayLock.steal', side_effect=[True, None]) + @patch('magnum.objects.BayLock.create', return_value='fake-conductor-id') + def test_successful_acquire_with_retry(self, mock_object_create, + mock_object_steal): + baylock = bay_lock.BayLock(self.context, self.bay, self.conductor_id) + with mock.patch.object(baylock, 'conductor_alive', + return_value=False): + baylock.acquire() + + mock_object_create.assert_has_calls( + [mock.call(self.bay.uuid, self.conductor_id)] * 2) + mock_object_steal.assert_has_calls( + [mock.call(self.bay.uuid, 'fake-conductor-id', + self.conductor_id)] * 2) + + @patch('magnum.objects.BayLock.steal', return_value=True) + @patch('magnum.objects.BayLock.create', return_value='fake-conductor-id') + def test_failed_acquire_one_retry_only(self, mock_object_create, + mock_object_steal): + baylock = bay_lock.BayLock(self.context, self.bay, self.conductor_id) + with mock.patch.object(baylock, 'conductor_alive', + return_value=False): + self.assertRaises(exception.OperationInProgress, baylock.acquire) + + mock_object_create.assert_has_calls( + [mock.call(self.bay.uuid, self.conductor_id)] * 2) + mock_object_steal.assert_has_calls( + [mock.call(self.bay.uuid, 'fake-conductor-id', + self.conductor_id)] * 2) + + @patch('magnum.objects.BayLock.release', return_value=None) + @patch('magnum.objects.BayLock.create', return_value=None) + def test_thread_lock_acquire_success_with_exception(self, + mock_object_create, + mock_object_release): + baylock = bay_lock.BayLock(self.context, self.bay, self.conductor_id) + + def check_thread_lock(): + with baylock.thread_lock(self.bay.uuid): + self.assertEqual(1, mock_object_create.call_count) + raise self.TestThreadLockException + + self.assertRaises(self.TestThreadLockException, check_thread_lock) + self.assertEqual(1, mock_object_release.call_count) + + @patch('magnum.objects.BayLock.release', return_value=None) + @patch('magnum.objects.BayLock.create') + def test_thread_lock_acquire_fail_with_exception(self, mock_object_create, + mock_object_release): + mock_object_create.return_value = self.conductor_id + baylock = bay_lock.BayLock(self.context, self.bay, self.conductor_id) + + def check_thread_lock(): + with baylock.thread_lock(self.bay.uuid): + self.assertEqual(1, mock_object_create.call_count) + raise exception.OperationInProgress + + self.assertRaises(exception.OperationInProgress, check_thread_lock) + assert not mock_object_release.called + + @patch('magnum.objects.BayLock.release', return_value=None) + @patch('magnum.objects.BayLock.create', return_value=None) + def test_thread_lock_acquire_success_no_exception(self, mock_object_create, + mock_object_release): + baylock = bay_lock.BayLock(self.context, self.bay, self.conductor_id) + with baylock.thread_lock(self.bay.uuid): + self.assertEqual(1, mock_object_create.call_count) + assert not mock_object_release.called + + @patch('magnum.conductor.api.ListenerAPI.__new__') + def test_conductor_alive_ok(self, mock_listener_api_new): + mock_listener_api = mock.MagicMock() + mock_listener_api.ping_conductor.return_value = True + mock_listener_api_new.return_value = mock_listener_api + baylock = bay_lock.BayLock(self.context, self.bay, self.conductor_id) + + ret = baylock.conductor_alive(self.context, self.conductor_id) + + self.assertIs(True, ret) + self.assertEqual(1, mock_listener_api_new.call_count) + + @patch('magnum.conductor.api.ListenerAPI.__new__') + def test_conductor_alive_timeout(self, mock_listener_api_new): + mock_listener_api = mock.MagicMock() + mock_listener_api.ping_conductor.side_effect = ( + messaging.MessagingTimeout('too slow')) + mock_listener_api_new.return_value = mock_listener_api + baylock = bay_lock.BayLock(self.context, self.bay, self.conductor_id) + + ret = baylock.conductor_alive(self.context, self.conductor_id) + + self.assertIs(False, ret) + self.assertEqual(1, mock_listener_api_new.call_count)