Implement stack-locking for multi-engine support
Implements a distributed stack lock using the database to avoid race conditions when multiple engines are deployed. Blueprint multiple-engines Change-Id: If3c47dafcc5bc1b2188b7737205291bbab8bc231
This commit is contained in:
parent
23fe65d714
commit
d7ae961fca
|
@ -42,6 +42,10 @@
|
||||||
# unlimited events per stack. (integer value)
|
# unlimited events per stack. (integer value)
|
||||||
#max_events_per_stack=1000
|
#max_events_per_stack=1000
|
||||||
|
|
||||||
|
# RPC timeout for the engine liveness check that is used for
|
||||||
|
# stack locking. (integer value)
|
||||||
|
#engine_life_check_timeout=2
|
||||||
|
|
||||||
# Name of the engine node. This can be an opaque identifier. It
|
# Name of the engine node. This can be an opaque identifier. It
|
||||||
# is not necessarily a hostname, FQDN, or IP address. (string
|
# is not necessarily a hostname, FQDN, or IP address. (string
|
||||||
# value)
|
# value)
|
||||||
|
|
|
@ -249,6 +249,16 @@ class HeatAPINotImplementedError(HeatAPIException):
|
||||||
err_type = "Server"
|
err_type = "Server"
|
||||||
|
|
||||||
|
|
||||||
|
class HeatActionInProgressError(HeatAPIException):
    """
    The requested action was rejected because the stack is busy.

    Reported with HTTP status 400 and the error title 'InvalidAction'.
    """
    title = 'InvalidAction'
    code = 400
    # Adjacent literals are joined at compile time; the resulting text is
    # the same as an explicit "+" concatenation.
    explanation = ("Cannot perform action on stack while other actions are "
                   "in progress")
|
||||||
|
|
||||||
|
|
||||||
def map_remote_error(ex):
|
def map_remote_error(ex):
|
||||||
"""
|
"""
|
||||||
Map rpc_common.RemoteError exceptions returned by the engine
|
Map rpc_common.RemoteError exceptions returned by the engine
|
||||||
|
@ -273,6 +283,7 @@ def map_remote_error(ex):
|
||||||
)
|
)
|
||||||
denied_errors = ('Forbidden', 'NotAuthorized')
|
denied_errors = ('Forbidden', 'NotAuthorized')
|
||||||
already_exists_errors = ('StackExists')
|
already_exists_errors = ('StackExists')
|
||||||
|
invalid_action_errors = ('ActionInProgress',)
|
||||||
|
|
||||||
ex_type = ex.__class__.__name__
|
ex_type = ex.__class__.__name__
|
||||||
|
|
||||||
|
@ -285,6 +296,8 @@ def map_remote_error(ex):
|
||||||
return HeatAccessDeniedError(detail=str(ex))
|
return HeatAccessDeniedError(detail=str(ex))
|
||||||
elif ex_type in already_exists_errors:
|
elif ex_type in already_exists_errors:
|
||||||
return AlreadyExistsError(detail=str(ex))
|
return AlreadyExistsError(detail=str(ex))
|
||||||
|
elif ex_type in invalid_action_errors:
|
||||||
|
return HeatActionInProgressError(detail=str(ex))
|
||||||
else:
|
else:
|
||||||
# Map everything else to internal server error for now
|
# Map everything else to internal server error for now
|
||||||
return HeatInternalFailureError(detail=str(ex))
|
return HeatInternalFailureError(detail=str(ex))
|
||||||
|
|
|
@ -58,6 +58,7 @@ class FaultWrapper(wsgi.Middleware):
|
||||||
|
|
||||||
error_map = {
|
error_map = {
|
||||||
'AttributeError': webob.exc.HTTPBadRequest,
|
'AttributeError': webob.exc.HTTPBadRequest,
|
||||||
|
'ActionInProgress': webob.exc.HTTPConflict,
|
||||||
'ValueError': webob.exc.HTTPBadRequest,
|
'ValueError': webob.exc.HTTPBadRequest,
|
||||||
'StackNotFound': webob.exc.HTTPNotFound,
|
'StackNotFound': webob.exc.HTTPNotFound,
|
||||||
'ResourceNotFound': webob.exc.HTTPNotFound,
|
'ResourceNotFound': webob.exc.HTTPNotFound,
|
||||||
|
|
|
@ -106,7 +106,12 @@ engine_opts = [
|
||||||
default=1000,
|
default=1000,
|
||||||
help=_('Maximum events that will be available per stack. Older'
|
help=_('Maximum events that will be available per stack. Older'
|
||||||
' events will be deleted when this is reached. Set to 0'
|
' events will be deleted when this is reached. Set to 0'
|
||||||
' for unlimited events per stack.'))]
|
' for unlimited events per stack.')),
|
||||||
|
cfg.IntOpt('engine_life_check_timeout',
|
||||||
|
default=2,
|
||||||
|
help=_('RPC timeout for the engine liveness check that is used'
|
||||||
|
' for stack locking.'))]
|
||||||
|
|
||||||
rpc_opts = [
|
rpc_opts = [
|
||||||
cfg.StrOpt('host',
|
cfg.StrOpt('host',
|
||||||
default=socket.gethostname(),
|
default=socket.gethostname(),
|
||||||
|
|
|
@ -334,3 +334,8 @@ class RequestLimitExceeded(HeatException):
|
||||||
|
|
||||||
class StackResourceLimitExceeded(HeatException):
|
class StackResourceLimitExceeded(HeatException):
|
||||||
msg_fmt = _('Maximum resources per stack exceeded.')
|
msg_fmt = _('Maximum resources per stack exceeded.')
|
||||||
|
|
||||||
|
|
||||||
|
class ActionInProgress(HeatException):
    """Signals that a stack already has an action in progress.

    The message format expects ``stack_name`` and ``action`` keys.
    """
    msg_fmt = _(
        "Stack %(stack_name)s already has an action (%(action)s) in progress."
    )
|
||||||
|
|
|
@ -0,0 +1,102 @@
|
||||||
|
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import uuid
|
||||||
|
from oslo.config import cfg
|
||||||
|
|
||||||
|
cfg.CONF.import_opt('engine_life_check_timeout', 'heat.common.config')
|
||||||
|
|
||||||
|
from heat.common import exception
|
||||||
|
from heat.db import api as db_api
|
||||||
|
|
||||||
|
from heat.openstack.common import log as logging
|
||||||
|
from heat.openstack.common.gettextutils import _
|
||||||
|
from heat.openstack.common.rpc import proxy
|
||||||
|
from heat.openstack.common.rpc.common import Timeout
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
engine_id = str(uuid.uuid4())
|
||||||
|
|
||||||
|
|
||||||
|
class StackLock(object):
    """Lock on a single stack, stored through the database API.

    The lock row records the id of the engine that owns it. The id of
    the current engine is the module-level ``engine_id``.
    """

    def __init__(self, context, stack):
        """Remember the request context and the stack to be locked."""
        self.stack = stack
        self.context = context
        self.listener = None

    def _engine_alive(self, engine_id):
        """Check whether the engine with the given id answers over RPC.

        The engine id is used as the RPC topic, and the call is bounded
        by the ``engine_life_check_timeout`` option.

        :param engine_id: id of the engine to probe
        :returns: the result of the remote "listening" call, or False
                  if the call timed out
        """
        client = proxy.RpcProxy(engine_id, "1.0")
        ping = client.make_msg("listening")
        try:
            reply = client.call(self.context, ping, topic=engine_id,
                                timeout=cfg.CONF.engine_life_check_timeout)
        except Timeout:
            return False
        else:
            return reply

    def acquire(self, retry=True):
        """Acquire a lock on the stack.

        :param retry: when True, try once more if the lock turned out to
                      have been released while it was being stolen
        :returns: None once the lock is held by this engine
        :raises: exception.ActionInProgress if the lock is owned by this
                 engine or by another live engine, or if it could not be
                 stolen from a dead engine
        """
        stack_id = self.stack.id
        owner_id = db_api.stack_lock_create(stack_id, engine_id)

        # No previous owner was reported: the lock is ours.
        if owner_id is None:
            logger.debug(_("Engine %(engine)s acquired lock on stack "
                           "%(stack)s") % {'engine': engine_id,
                                           'stack': stack_id})
            return

        # The lock is legitimately held, either by this engine or by
        # another engine that still answers the liveness check.
        if owner_id == engine_id or self._engine_alive(owner_id):
            logger.debug(_("Lock on stack %(stack)s is owned by engine "
                           "%(engine)s") % {'stack': stack_id,
                                            'engine': owner_id})
            raise exception.ActionInProgress(stack_name=self.stack.name,
                                             action=self.stack.action)

        # The owner did not answer in time: treat the lock as stale.
        logger.info(_("Stale lock detected on stack %(stack)s. Engine "
                      "%(engine)s will attempt to steal the lock")
                    % {'stack': stack_id, 'engine': engine_id})

        outcome = db_api.stack_lock_steal(stack_id, owner_id, engine_id)

        if outcome is None:
            logger.info(_("Engine %(engine)s successfully stole the lock "
                          "on stack %(stack)s") % {'engine': engine_id,
                                                   'stack': stack_id})
            return

        if outcome is not True:
            # Any other value is the id of the engine that won the race.
            logger.info(_("Failed to steal lock on stack %(stack)s. "
                          "Engine %(engine)s stole the lock first")
                        % {'stack': stack_id,
                           'engine': outcome})
        elif retry:
            # True means the lock vanished mid-steal; one more attempt
            # is allowed, and the nested call may not retry again.
            logger.info(_("The lock on stack %(stack)s was released "
                          "while engine %(engine)s was stealing it. "
                          "Trying again") % {'stack': stack_id,
                                             'engine': engine_id})
            return self.acquire(retry=False)

        raise exception.ActionInProgress(
            stack_name=self.stack.name, action=self.stack.action)

    def release(self):
        """Release a stack lock."""
        # Only the engine that owns the lock will be releasing it.
        outcome = db_api.stack_lock_release(self.stack.id, engine_id)
        if outcome is not True:
            logger.debug(_("Engine %(engine)s released lock on stack "
                           "%(stack)s") % {'engine': engine_id,
                                           'stack': self.stack.id})
            return

        # True from the DB layer is reported as "already released".
        logger.warning(_("Lock was already released on stack %s!")
                       % self.stack.id)
|
|
@ -0,0 +1,181 @@
|
||||||
|
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
from heat.common import exception
|
||||||
|
from heat.db import api as db_api
|
||||||
|
from heat.engine import stack_lock
|
||||||
|
from heat.openstack.common.rpc import proxy
|
||||||
|
from heat.openstack.common.rpc.common import Timeout
|
||||||
|
from heat.tests.common import HeatTestCase
|
||||||
|
from heat.tests import utils
|
||||||
|
|
||||||
|
|
||||||
|
class StackLockTest(HeatTestCase):
    """Exercise StackLock.acquire against stubbed DB and RPC layers."""

    def setUp(self):
        super(StackLockTest, self).setUp()
        utils.setup_dummy_db()
        self.context = utils.dummy_context()
        fake_stack = self.m.CreateMockAnything()
        fake_stack.id = "aae01f2d-52ae-47ac-8a0d-3fde3d220fea"
        fake_stack.name = "test_stack"
        fake_stack.action = "CREATE"
        self.stack = fake_stack

    # -- expectation helpers ------------------------------------------------
    # Each helper records one expected call and returns the recorded call so
    # the test can chain AndReturn()/AndRaise() onto it. The matching
    # attribute must already have been stubbed out by the test.

    def _expect_create(self):
        """Record a stack_lock_create call made by the current engine."""
        return db_api.stack_lock_create(self.stack.id, stack_lock.engine_id)

    def _expect_life_check(self):
        """Record the "listening" RPC call sent to the fake engine."""
        rpc = proxy.RpcProxy(self.stack.id, "1.0")
        return rpc.call(self.context, rpc.make_msg("listening"),
                        timeout=2, topic="fake-engine-id")

    def _expect_steal(self):
        """Record an attempt to steal the lock from the fake engine."""
        return db_api.stack_lock_steal(self.stack.id, "fake-engine-id",
                                       stack_lock.engine_id)

    def _new_lock(self):
        """Build the StackLock under test."""
        return stack_lock.StackLock(self.context, self.stack)

    # -- tests --------------------------------------------------------------

    def test_successful_acquire_new_lock(self):
        self.m.StubOutWithMock(db_api, "stack_lock_create")
        self._expect_create().AndReturn(None)

        self.m.ReplayAll()

        self._new_lock().acquire()
        self.m.VerifyAll()

    def test_failed_acquire_existing_lock_current_engine(self):
        self.m.StubOutWithMock(db_api, "stack_lock_create")
        self._expect_create().AndReturn(stack_lock.engine_id)

        self.m.ReplayAll()

        lock = self._new_lock()
        self.assertRaises(exception.ActionInProgress, lock.acquire)
        self.m.VerifyAll()

    def test_successful_acquire_existing_lock_engine_dead(self):
        self.m.StubOutWithMock(db_api, "stack_lock_create")
        self._expect_create().AndReturn("fake-engine-id")

        self.m.StubOutWithMock(proxy.RpcProxy, "call")
        self._expect_life_check().AndRaise(Timeout)

        self.m.StubOutWithMock(db_api, "stack_lock_steal")
        self._expect_steal().AndReturn(None)

        self.m.ReplayAll()

        self._new_lock().acquire()
        self.m.VerifyAll()

    def test_failed_acquire_existing_lock_engine_alive(self):
        self.m.StubOutWithMock(db_api, "stack_lock_create")
        self._expect_create().AndReturn("fake-engine-id")

        self.m.StubOutWithMock(proxy.RpcProxy, "call")
        self._expect_life_check().AndReturn(True)

        self.m.ReplayAll()

        lock = self._new_lock()
        self.assertRaises(exception.ActionInProgress, lock.acquire)
        self.m.VerifyAll()

    def test_failed_acquire_existing_lock_engine_dead(self):
        self.m.StubOutWithMock(db_api, "stack_lock_create")
        self._expect_create().AndReturn("fake-engine-id")

        self.m.StubOutWithMock(proxy.RpcProxy, "call")
        self._expect_life_check().AndRaise(Timeout)

        # Another engine wins the race for the stale lock.
        self.m.StubOutWithMock(db_api, "stack_lock_steal")
        self._expect_steal().AndReturn("fake-engine-id2")

        self.m.ReplayAll()

        lock = self._new_lock()
        self.assertRaises(exception.ActionInProgress, lock.acquire)
        self.m.VerifyAll()

    def test_successful_acquire_with_retry(self):
        self.m.StubOutWithMock(db_api, "stack_lock_create")
        self._expect_create().AndReturn("fake-engine-id")

        self.m.StubOutWithMock(proxy.RpcProxy, "call")
        self._expect_life_check().AndRaise(Timeout)

        # First steal reports that the lock was released in the meantime.
        self.m.StubOutWithMock(db_api, "stack_lock_steal")
        self._expect_steal().AndReturn(True)

        # The retry goes through the whole sequence again and succeeds.
        self._expect_create().AndReturn("fake-engine-id")
        self._expect_life_check().AndRaise(Timeout)
        self._expect_steal().AndReturn(None)

        self.m.ReplayAll()

        self._new_lock().acquire()
        self.m.VerifyAll()

    def test_failed_acquire_one_retry_only(self):
        self.m.StubOutWithMock(db_api, "stack_lock_create")
        self._expect_create().AndReturn("fake-engine-id")

        self.m.StubOutWithMock(proxy.RpcProxy, "call")
        self._expect_life_check().AndRaise(Timeout)

        self.m.StubOutWithMock(db_api, "stack_lock_steal")
        self._expect_steal().AndReturn(True)

        # The second attempt hits the same condition; no third attempt
        # is expected, so acquire must give up.
        self._expect_create().AndReturn("fake-engine-id")
        self._expect_life_check().AndRaise(Timeout)
        self._expect_steal().AndReturn(True)

        self.m.ReplayAll()

        lock = self._new_lock()
        self.assertRaises(exception.ActionInProgress, lock.acquire)
        self.m.VerifyAll()
|
Loading…
Reference in New Issue