Merge "Implement baylock in conductor for horizontal-scale"
This commit is contained in:
commit
0c29b5a47c
|
@ -123,7 +123,6 @@ function create_magnum_conf {
|
|||
iniset $MAGNUM_CONF database connection `database_connection_url magnum`
|
||||
iniset $MAGNUM_CONF api host "$MAGNUM_SERVICE_HOST"
|
||||
iniset $MAGNUM_CONF api port "$MAGNUM_SERVICE_PORT"
|
||||
iniset $MAGNUM_CONF conductor host "$MAGNUM_SERVICE_HOST"
|
||||
|
||||
configure_auth_token_middleware $MAGNUM_CONF magnum $MAGNUM_AUTH_CACHE_DIR
|
||||
|
||||
|
|
|
@ -242,8 +242,9 @@
|
|||
# The queue to add conductor tasks to (string value)
|
||||
#topic = magnum-conductor
|
||||
|
||||
# The location of the conductor rpc queue (string value)
|
||||
#host = localhost
|
||||
# RPC timeout for the conductor liveness check that is used for bay
|
||||
# locking. (integer value)
|
||||
#conductor_life_check_timeout = 4
|
||||
|
||||
|
||||
[database]
|
||||
|
|
|
@ -21,6 +21,7 @@ import sys
|
|||
from oslo_config import cfg
|
||||
|
||||
from magnum.common import rpc_service as service
|
||||
from magnum.common import short_id
|
||||
from magnum.conductor.handlers import bay_k8s_heat
|
||||
from magnum.conductor.handlers import conductor_listener
|
||||
from magnum.conductor.handlers import docker_conductor
|
||||
|
@ -41,8 +42,8 @@ def main():
|
|||
cfg.CONF.log_opt_values(LOG, std_logging.DEBUG)
|
||||
|
||||
cfg.CONF.import_opt('topic', 'magnum.conductor.config', group='conductor')
|
||||
cfg.CONF.import_opt('host', 'magnum.conductor.config', group='conductor')
|
||||
|
||||
conductor_id = short_id.generate_id()
|
||||
endpoints = [
|
||||
docker_conductor.Handler(),
|
||||
k8s_conductor.Handler(),
|
||||
|
@ -60,5 +61,5 @@ def main():
|
|||
'coreos_template': cfg.CONF.bay.k8s_coreos_template_path})
|
||||
|
||||
server = service.Service(cfg.CONF.conductor.topic,
|
||||
cfg.CONF.conductor.host, endpoints)
|
||||
conductor_id, endpoints)
|
||||
server.serve()
|
||||
|
|
|
@ -431,3 +431,7 @@ class RequiredParameterNotProvided(MagnumException):
|
|||
|
||||
class Urllib2InvalidScheme(MagnumException):
    """Error for a urllib2 URL whose scheme is invalid."""

    # The "s" conversion type must follow the mapping key; without it the
    # text after "%(url)" is parsed as the format spec, so the URL is not
    # interpolated into the message as intended.
    message = _("The urllib2 URL %(url)s has an invalid scheme.")
|
||||
|
||||
|
||||
class OperationInProgress(Invalid):
    """Error reporting that a bay already has an operation in progress.

    The message expects a ``bay_name`` keyword for interpolation.
    """

    message = _("Bay %(bay_name)s already has an operation in progress.")
|
||||
|
|
|
@ -0,0 +1,130 @@
|
|||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
|
||||
from oslo_config import cfg
|
||||
import oslo_messaging as messaging
|
||||
from oslo_utils import excutils
|
||||
|
||||
from magnum.common import exception
|
||||
from magnum.conductor.api import ListenerAPI
|
||||
from magnum import objects
|
||||
from magnum.openstack.common._i18n import _LI
|
||||
from magnum.openstack.common._i18n import _LW
|
||||
from magnum.openstack.common import log as logging
|
||||
|
||||
|
||||
cfg.CONF.import_opt('topic', 'magnum.conductor.config',
|
||||
group='conductor')
|
||||
cfg.CONF.import_opt('conductor_life_check_timeout', 'magnum.conductor.config',
|
||||
group='conductor')
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class BayLock(object):
    """Lock on a bay held on behalf of one conductor.

    The lock records themselves are managed through ``objects.BayLock``
    (``create``, ``steal`` and ``release``); this class adds the policy for
    detecting stale locks and stealing them from conductors that no longer
    answer a ping.
    """

    def __init__(self, context, bay, conductor_id):
        """Remember the request context, the bay and the owning conductor.

        :param context: context passed to the conductor liveness check.
        :param bay: bay to lock; its ``uuid`` and ``name`` are used.
        :param conductor_id: id of the conductor taking the lock.
        """
        self.context = context
        self.bay = bay
        self.conductor_id = conductor_id

    @staticmethod
    def conductor_alive(context, conductor_id):
        """Check whether the given conductor answers a ping.

        :param context: context handed to the listener API.
        :param conductor_id: id of the conductor to ping; used as the
                             messaging ``server``.
        :returns: whatever ``ping_conductor()`` returns, or False when the
                  call raises ``MessagingTimeout``.
        """
        topic = cfg.CONF.conductor.topic
        timeout = cfg.CONF.conductor.conductor_life_check_timeout
        listener_api = ListenerAPI(context=context, topic=topic,
                                   server=conductor_id, timeout=timeout)
        try:
            return listener_api.ping_conductor()
        except messaging.MessagingTimeout:
            return False

    def acquire(self, retry=True):
        """Acquire a lock on the bay.

        :param retry: When True, retry if lock was released while stealing.
        :raises: OperationInProgress if the lock is held by this conductor,
                 by another live conductor, or could not be stolen.
        """
        # create() returns None when the lock was taken, otherwise the id
        # of the conductor that currently holds it.
        lock_conductor_id = objects.BayLock.create(self.bay.uuid,
                                                   self.conductor_id)
        if lock_conductor_id is None:
            LOG.debug("Conductor %(conductor)s acquired lock on bay "
                      "%(bay)s", {'conductor': self.conductor_id,
                                  'bay': self.bay.uuid})
            return

        if (lock_conductor_id == self.conductor_id or
                self.conductor_alive(self.context, lock_conductor_id)):
            LOG.debug("Lock on bay %(bay)s is owned by conductor "
                      "%(conductor)s", {'bay': self.bay.uuid,
                                        'conductor': lock_conductor_id})
            raise exception.OperationInProgress(bay_name=self.bay.name)
        else:
            LOG.info(_LI("Stale lock detected on bay %(bay)s.  Conductor "
                         "%(conductor)s will attempt to steal the lock"),
                     {'bay': self.bay.uuid, 'conductor': self.conductor_id})

            # steal() returns None on success, True when the lock was
            # released in the meantime, or the id of the conductor that
            # stole the lock first.
            result = objects.BayLock.steal(self.bay.uuid,
                                           lock_conductor_id,
                                           self.conductor_id)

            if result is None:
                LOG.info(_LI("Conductor %(conductor)s successfully stole the "
                             "lock on bay %(bay)s"),
                         {'conductor': self.conductor_id,
                          'bay': self.bay.uuid})
                return
            elif result is True:
                if retry:
                    LOG.info(_LI("The lock on bay %(bay)s was released while "
                                 "conductor %(conductor)s was stealing it. "
                                 "Trying again"),
                             {'bay': self.bay.uuid,
                              'conductor': self.conductor_id})
                    # Only one retry: the nested call cannot retry again.
                    return self.acquire(retry=False)
            else:
                new_lock_conductor_id = result
                LOG.info(_LI("Failed to steal lock on bay %(bay)s. "
                             "Conductor %(conductor)s stole the lock first"),
                         {'bay': self.bay.uuid,
                          'conductor': new_lock_conductor_id})

            raise exception.OperationInProgress(bay_name=self.bay.name)

    def release(self, bay_uuid):
        """Release a bay lock.

        :param bay_uuid: uuid of the bay whose lock is released.
        """
        # Only the conductor that owns the lock will be releasing it.
        result = objects.BayLock.release(bay_uuid, self.conductor_id)
        if result is True:
            LOG.warning(_LW("Lock was already released on bay %s!"),
                        bay_uuid)
        else:
            LOG.debug("Conductor %(conductor)s released lock on bay "
                      "%(bay)s", {'conductor': self.conductor_id,
                                  'bay': bay_uuid})

    @contextlib.contextmanager
    def thread_lock(self, bay_uuid):
        """Acquire a lock and release it only if there is an exception.

        The release method still needs to be scheduled to be run at the
        end of the thread using the Thread.link method.

        :param bay_uuid: uuid passed to release() when the body fails.
        """
        try:
            self.acquire()
            yield
        except exception.OperationInProgress:
            # Propagated without releasing the lock.
            raise
        except:  # noqa
            # Release on any other failure, then re-raise the original
            # exception.
            with excutils.save_and_reraise_exception():
                self.release(bay_uuid)
|
|
@ -21,9 +21,10 @@ SERVICE_OPTS = [
|
|||
cfg.StrOpt('topic',
|
||||
default='magnum-conductor',
|
||||
help='The queue to add conductor tasks to'),
|
||||
cfg.StrOpt('host',
|
||||
default='localhost',
|
||||
help='The location of the conductor rpc queue'),
|
||||
cfg.IntOpt('conductor_life_check_timeout',
|
||||
default=4,
|
||||
help=('RPC timeout for the conductor liveness check that is '
|
||||
'used for bay locking.')),
|
||||
]
|
||||
|
||||
opt_group = cfg.OptGroup(
|
||||
|
|
|
@ -0,0 +1,183 @@
|
|||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
import oslo_messaging as messaging
|
||||
|
||||
from magnum.common import exception
|
||||
from magnum.common import short_id
|
||||
from magnum.conductor import bay_lock
|
||||
from magnum.tests import base
|
||||
from magnum.tests.unit.objects import utils as obj_utils
|
||||
from mock import patch
|
||||
|
||||
|
||||
class BayLockTest(base.TestCase):
    """Unit tests for bay_lock.BayLock with objects.BayLock patched out."""

    def setUp(self):
        super(BayLockTest, self).setUp()
        self.conductor_id = short_id.generate_id()
        self.bay = obj_utils.get_test_bay(self.context)

    class TestThreadLockException(Exception):
        """Marker exception raised inside thread_lock bodies."""
        pass

    @patch('magnum.objects.BayLock.create', return_value=None)
    def test_successful_acquire_new_lock(self, mock_object_create):
        # create() returning None means the lock was taken.
        baylock = bay_lock.BayLock(self.context, self.bay, self.conductor_id)
        baylock.acquire()

        mock_object_create.assert_called_once_with(self.bay.uuid,
                                                   self.conductor_id)

    @patch('magnum.objects.BayLock.create')
    def test_failed_acquire_current_conductor_lock(self, mock_object_create):
        # The lock is already held by this same conductor.
        mock_object_create.return_value = self.conductor_id

        baylock = bay_lock.BayLock(self.context, self.bay, self.conductor_id)

        self.assertRaises(exception.OperationInProgress, baylock.acquire)
        mock_object_create.assert_called_once_with(self.bay.uuid,
                                                   self.conductor_id)

    @patch('magnum.objects.BayLock.steal', return_value=None)
    @patch('magnum.objects.BayLock.create', return_value='fake-conductor-id')
    def test_successful_acquire_dead_conductor_lock(self, mock_object_create,
                                                    mock_object_steal):
        # The holder does not answer the ping, so the lock is stolen.
        baylock = bay_lock.BayLock(self.context, self.bay, self.conductor_id)
        with mock.patch.object(baylock, 'conductor_alive',
                               return_value=False):
            baylock.acquire()

        mock_object_create.assert_called_once_with(self.bay.uuid,
                                                   self.conductor_id)
        mock_object_steal.assert_called_once_with(
            self.bay.uuid, 'fake-conductor-id', self.conductor_id)

    @patch('magnum.objects.BayLock.create', return_value='fake-conductor-id')
    def test_failed_acquire_alive_conductor_lock(self, mock_object_create):
        # The holder is alive, so the lock must not be taken.
        baylock = bay_lock.BayLock(self.context, self.bay, self.conductor_id)
        with mock.patch.object(baylock, 'conductor_alive',
                               return_value=True):
            self.assertRaises(exception.OperationInProgress, baylock.acquire)

        mock_object_create.assert_called_once_with(self.bay.uuid,
                                                   self.conductor_id)

    @patch('magnum.objects.BayLock.steal', return_value='fake-conductor-id2')
    @patch('magnum.objects.BayLock.create', return_value='fake-conductor-id')
    def test_failed_acquire_dead_conductor_lock(self, mock_object_create,
                                                mock_object_steal):
        # steal() returning another conductor id means it stole first.
        baylock = bay_lock.BayLock(self.context, self.bay, self.conductor_id)
        with mock.patch.object(baylock, 'conductor_alive',
                               return_value=False):
            self.assertRaises(exception.OperationInProgress, baylock.acquire)

        mock_object_create.assert_called_once_with(self.bay.uuid,
                                                   self.conductor_id)
        mock_object_steal.assert_called_once_with(
            self.bay.uuid, 'fake-conductor-id', self.conductor_id)

    @patch('magnum.objects.BayLock.steal', side_effect=[True, None])
    @patch('magnum.objects.BayLock.create', return_value='fake-conductor-id')
    def test_successful_acquire_with_retry(self, mock_object_create,
                                           mock_object_steal):
        # First steal reports the lock was released; the retry succeeds.
        baylock = bay_lock.BayLock(self.context, self.bay, self.conductor_id)
        with mock.patch.object(baylock, 'conductor_alive',
                               return_value=False):
            baylock.acquire()

        mock_object_create.assert_has_calls(
            [mock.call(self.bay.uuid, self.conductor_id)] * 2)
        mock_object_steal.assert_has_calls(
            [mock.call(self.bay.uuid, 'fake-conductor-id',
                       self.conductor_id)] * 2)

    @patch('magnum.objects.BayLock.steal', return_value=True)
    @patch('magnum.objects.BayLock.create', return_value='fake-conductor-id')
    def test_failed_acquire_one_retry_only(self, mock_object_create,
                                           mock_object_steal):
        # steal() keeps reporting a release; acquire gives up after one
        # retry.
        baylock = bay_lock.BayLock(self.context, self.bay, self.conductor_id)
        with mock.patch.object(baylock, 'conductor_alive',
                               return_value=False):
            self.assertRaises(exception.OperationInProgress, baylock.acquire)

        mock_object_create.assert_has_calls(
            [mock.call(self.bay.uuid, self.conductor_id)] * 2)
        mock_object_steal.assert_has_calls(
            [mock.call(self.bay.uuid, 'fake-conductor-id',
                       self.conductor_id)] * 2)

    @patch('magnum.objects.BayLock.release', return_value=None)
    @patch('magnum.objects.BayLock.create', return_value=None)
    def test_thread_lock_acquire_success_with_exception(self,
                                                        mock_object_create,
                                                        mock_object_release):
        # An exception inside the body must release the lock.
        baylock = bay_lock.BayLock(self.context, self.bay, self.conductor_id)

        def check_thread_lock():
            with baylock.thread_lock(self.bay.uuid):
                self.assertEqual(1, mock_object_create.call_count)
                raise self.TestThreadLockException

        self.assertRaises(self.TestThreadLockException, check_thread_lock)
        self.assertEqual(1, mock_object_release.call_count)

    @patch('magnum.objects.BayLock.release', return_value=None)
    @patch('magnum.objects.BayLock.create')
    def test_thread_lock_acquire_fail_with_exception(self, mock_object_create,
                                                     mock_object_release):
        # OperationInProgress must propagate without releasing the lock.
        mock_object_create.return_value = self.conductor_id
        baylock = bay_lock.BayLock(self.context, self.bay, self.conductor_id)

        def check_thread_lock():
            with baylock.thread_lock(self.bay.uuid):
                self.assertEqual(1, mock_object_create.call_count)
                raise exception.OperationInProgress

        self.assertRaises(exception.OperationInProgress, check_thread_lock)
        # A unittest assertion is used because a bare assert is stripped
        # when Python runs with -O.
        self.assertFalse(mock_object_release.called)

    @patch('magnum.objects.BayLock.release', return_value=None)
    @patch('magnum.objects.BayLock.create', return_value=None)
    def test_thread_lock_acquire_success_no_exception(self, mock_object_create,
                                                      mock_object_release):
        # A clean body leaves the lock held; thread_lock does not release.
        baylock = bay_lock.BayLock(self.context, self.bay, self.conductor_id)
        with baylock.thread_lock(self.bay.uuid):
            self.assertEqual(1, mock_object_create.call_count)
        self.assertFalse(mock_object_release.called)

    @patch('magnum.conductor.api.ListenerAPI.__new__')
    def test_conductor_alive_ok(self, mock_listener_api_new):
        # Patching __new__ makes ListenerAPI(...) return the mock below.
        mock_listener_api = mock.MagicMock()
        mock_listener_api.ping_conductor.return_value = True
        mock_listener_api_new.return_value = mock_listener_api
        baylock = bay_lock.BayLock(self.context, self.bay, self.conductor_id)

        ret = baylock.conductor_alive(self.context, self.conductor_id)

        self.assertIs(True, ret)
        self.assertEqual(1, mock_listener_api_new.call_count)

    @patch('magnum.conductor.api.ListenerAPI.__new__')
    def test_conductor_alive_timeout(self, mock_listener_api_new):
        # A messaging timeout on the ping is reported as "not alive".
        mock_listener_api = mock.MagicMock()
        mock_listener_api.ping_conductor.side_effect = (
            messaging.MessagingTimeout('too slow'))
        mock_listener_api_new.return_value = mock_listener_api
        baylock = bay_lock.BayLock(self.context, self.bay, self.conductor_id)

        ret = baylock.conductor_alive(self.context, self.conductor_id)

        self.assertIs(False, ret)
        self.assertEqual(1, mock_listener_api_new.call_count)
|
Loading…
Reference in New Issue