Merge "Legacy delete attempt thread cancel before stop"
This commit is contained in:
commit
29230abfd6
|
@ -1387,31 +1387,68 @@ class EngineService(service.Service):
|
|||
if acquire_result == self.engine_id:
|
||||
# give threads which are almost complete an opportunity to
|
||||
# finish naturally before force stopping them
|
||||
eventlet.sleep(0.2)
|
||||
self.thread_group_mgr.stop(stack.id)
|
||||
self.thread_group_mgr.send(stack.id, rpc_api.THREAD_CANCEL)
|
||||
|
||||
# Another active engine has the lock
|
||||
elif service_utils.engine_alive(cnxt, acquire_result):
|
||||
stop_result = self._remote_call(
|
||||
cnxt, acquire_result, self.listener.STOP_STACK,
|
||||
stack_identity=stack_identity)
|
||||
if stop_result is None:
|
||||
LOG.debug("Successfully stopped remote task on engine %s"
|
||||
% acquire_result)
|
||||
cancel_result = self._remote_call(
|
||||
cnxt, acquire_result, self.listener.SEND,
|
||||
stack_identity=stack_identity, message=rpc_api.THREAD_CANCEL)
|
||||
if cancel_result is None:
|
||||
LOG.debug("Successfully sent %(msg)s message "
|
||||
"to remote task on engine %(eng)s" % {
|
||||
'eng': acquire_result,
|
||||
'msg': rpc_api.THREAD_CANCEL})
|
||||
else:
|
||||
raise exception.StopActionFailed(stack_name=stack.name,
|
||||
engine_id=acquire_result)
|
||||
raise exception.EventSendFailed(stack_name=stack.name,
|
||||
engine_id=acquire_result)
|
||||
|
||||
# There may be additional resources that we don't know about
|
||||
# if an update was in-progress when the stack was stopped, so
|
||||
# reload the stack from the database.
|
||||
st = self._get_stack(cnxt, stack_identity)
|
||||
stack = parser.Stack.load(cnxt, stack=st)
|
||||
self.resource_enforcer.enforce_stack(stack)
|
||||
def reload():
|
||||
st = self._get_stack(cnxt, stack_identity)
|
||||
stack = parser.Stack.load(cnxt, stack=st)
|
||||
self.resource_enforcer.enforce_stack(stack)
|
||||
return stack
|
||||
|
||||
self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id,
|
||||
stack.delete)
|
||||
return
|
||||
def wait_then_delete(stack):
|
||||
watch = timeutils.StopWatch(cfg.CONF.error_wait_time + 10)
|
||||
watch.start()
|
||||
|
||||
while not watch.expired():
|
||||
LOG.debug('Waiting for stack cancel to complete: %s' %
|
||||
stack.name)
|
||||
with lock.try_thread_lock() as acquire_result:
|
||||
|
||||
if acquire_result is None:
|
||||
stack = reload()
|
||||
# do the actual delete with the aquired lock
|
||||
self.thread_group_mgr.start_with_acquired_lock(
|
||||
stack, lock, stack.delete)
|
||||
return
|
||||
eventlet.sleep(1.0)
|
||||
|
||||
if acquire_result == self.engine_id:
|
||||
# cancel didn't finish in time, attempt a stop instead
|
||||
self.thread_group_mgr.stop(stack.id)
|
||||
elif service_utils.engine_alive(cnxt, acquire_result):
|
||||
# Another active engine has the lock
|
||||
stop_result = self._remote_call(
|
||||
cnxt, acquire_result, self.listener.STOP_STACK,
|
||||
stack_identity=stack_identity)
|
||||
if stop_result is None:
|
||||
LOG.debug("Successfully stopped remote task "
|
||||
"on engine %s" % acquire_result)
|
||||
else:
|
||||
raise exception.StopActionFailed(
|
||||
stack_name=stack.name, engine_id=acquire_result)
|
||||
|
||||
stack = reload()
|
||||
# do the actual delete in a locked task
|
||||
self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id,
|
||||
stack.delete)
|
||||
|
||||
# Cancelling the stack could take some time, so do it in a task
|
||||
self.thread_group_mgr.start(stack.id, wait_then_delete,
|
||||
stack)
|
||||
|
||||
@context.request_context
|
||||
def export_stack(self, cnxt, stack_identity):
|
||||
|
|
|
@ -11,7 +11,9 @@
|
|||
# under the License.
|
||||
|
||||
import mock
|
||||
from oslo_config import cfg
|
||||
from oslo_messaging.rpc import dispatcher
|
||||
from oslo_utils import timeutils
|
||||
|
||||
from heat.common import exception
|
||||
from heat.common import service_utils
|
||||
|
@ -97,8 +99,11 @@ class StackDeleteTest(common.HeatTestCase):
|
|||
@mock.patch.object(parser.Stack, 'load')
|
||||
@mock.patch.object(stack_lock.StackLock, 'try_acquire')
|
||||
@mock.patch.object(stack_lock.StackLock, 'acquire')
|
||||
def test_stack_delete_current_engine_active_lock(self, mock_acquire,
|
||||
mock_try, mock_load):
|
||||
@mock.patch.object(timeutils.StopWatch, 'expired')
|
||||
def test_stack_delete_current_engine_active_lock(self, mock_expired,
|
||||
mock_acquire, mock_try,
|
||||
mock_load):
|
||||
cfg.CONF.set_override('error_wait_time', 0)
|
||||
self.man.start()
|
||||
stack_name = 'service_delete_test_stack_current_active_lock'
|
||||
stack = tools.get_stack(stack_name, self.ctx)
|
||||
|
@ -108,27 +113,32 @@ class StackDeleteTest(common.HeatTestCase):
|
|||
stack_lock_object.StackLock.create(
|
||||
self.ctx, stack.id, self.man.engine_id)
|
||||
|
||||
# Create a fake ThreadGroup too
|
||||
self.man.thread_group_mgr.groups[stack.id] = tools.DummyThreadGroup()
|
||||
st = stack_object.Stack.get_by_id(self.ctx, sid)
|
||||
|
||||
mock_load.return_value = stack
|
||||
mock_try.return_value = self.man.engine_id
|
||||
mock_stop = self.patchobject(self.man.thread_group_mgr, 'stop')
|
||||
mock_send = self.patchobject(self.man.thread_group_mgr, 'send')
|
||||
mock_expired.side_effect = [False, True]
|
||||
|
||||
self.assertIsNone(self.man.delete_stack(self.ctx, stack.identifier()))
|
||||
self.man.thread_group_mgr.groups[sid].wait()
|
||||
|
||||
mock_load.assert_called_with(self.ctx, stack=st)
|
||||
self.assertEqual(2, len(mock_load.mock_calls))
|
||||
mock_try.assert_called_once_with()
|
||||
mock_acquire.assert_called_once_with(True)
|
||||
mock_send.assert_called_once_with(stack.id, 'cancel')
|
||||
mock_stop.assert_called_once_with(stack.id)
|
||||
self.assertEqual(2, len(mock_load.mock_calls))
|
||||
mock_try.assert_called_with()
|
||||
mock_acquire.assert_called_once_with(True)
|
||||
|
||||
@mock.patch.object(parser.Stack, 'load')
|
||||
@mock.patch.object(stack_lock.StackLock, 'try_acquire')
|
||||
@mock.patch.object(service_utils, 'engine_alive')
|
||||
def test_stack_delete_other_engine_active_lock_failed(self, mock_alive,
|
||||
mock_try, mock_load):
|
||||
@mock.patch.object(timeutils.StopWatch, 'expired')
|
||||
def test_stack_delete_other_engine_active_lock_failed(self, mock_expired,
|
||||
mock_alive, mock_try,
|
||||
mock_load):
|
||||
cfg.CONF.set_override('error_wait_time', 0)
|
||||
OTHER_ENGINE = "other-engine-fake-uuid"
|
||||
self.man.start()
|
||||
stack_name = 'service_delete_test_stack_other_engine_lock_fail'
|
||||
|
@ -142,6 +152,7 @@ class StackDeleteTest(common.HeatTestCase):
|
|||
mock_load.return_value = stack
|
||||
mock_try.return_value = OTHER_ENGINE
|
||||
mock_alive.return_value = True
|
||||
mock_expired.side_effect = [False, True]
|
||||
|
||||
mock_call = self.patchobject(self.man, '_remote_call',
|
||||
return_value=False)
|
||||
|
@ -149,20 +160,23 @@ class StackDeleteTest(common.HeatTestCase):
|
|||
ex = self.assertRaises(dispatcher.ExpectedException,
|
||||
self.man.delete_stack,
|
||||
self.ctx, stack.identifier())
|
||||
self.assertEqual(exception.StopActionFailed, ex.exc_info[0])
|
||||
self.assertEqual(exception.EventSendFailed, ex.exc_info[0])
|
||||
|
||||
mock_load.assert_called_once_with(self.ctx, stack=st)
|
||||
mock_try.assert_called_once_with()
|
||||
mock_alive.assert_called_once_with(self.ctx, OTHER_ENGINE)
|
||||
mock_call.assert_called_once_with(self.ctx, OTHER_ENGINE, "stop_stack",
|
||||
mock_call.assert_called_once_with(self.ctx, OTHER_ENGINE, "send",
|
||||
message='cancel',
|
||||
stack_identity=mock.ANY)
|
||||
|
||||
@mock.patch.object(parser.Stack, 'load')
|
||||
@mock.patch.object(stack_lock.StackLock, 'try_acquire')
|
||||
@mock.patch.object(service_utils, 'engine_alive')
|
||||
@mock.patch.object(stack_lock.StackLock, 'acquire')
|
||||
@mock.patch.object(timeutils.StopWatch, 'expired')
|
||||
def test_stack_delete_other_engine_active_lock_succeeded(
|
||||
self, mock_acquire, mock_alive, mock_try, mock_load):
|
||||
self, mock_expired, mock_acquire, mock_alive, mock_try, mock_load):
|
||||
cfg.CONF.set_override('error_wait_time', 0)
|
||||
|
||||
OTHER_ENGINE = "other-engine-fake-uuid"
|
||||
self.man.start()
|
||||
|
@ -177,6 +191,7 @@ class StackDeleteTest(common.HeatTestCase):
|
|||
mock_load.return_value = stack
|
||||
mock_try.return_value = OTHER_ENGINE
|
||||
mock_alive.return_value = True
|
||||
mock_expired.side_effect = [False, True]
|
||||
mock_call = self.patchobject(self.man, '_remote_call',
|
||||
return_value=None)
|
||||
|
||||
|
@ -185,18 +200,25 @@ class StackDeleteTest(common.HeatTestCase):
|
|||
|
||||
self.assertEqual(2, len(mock_load.mock_calls))
|
||||
mock_load.assert_called_with(self.ctx, stack=st)
|
||||
mock_try.assert_called_once_with()
|
||||
mock_alive.assert_called_once_with(self.ctx, OTHER_ENGINE)
|
||||
mock_call.assert_called_once_with(self.ctx, OTHER_ENGINE, "stop_stack",
|
||||
stack_identity=mock.ANY)
|
||||
mock_try.assert_called_with()
|
||||
mock_alive.assert_called_with(self.ctx, OTHER_ENGINE)
|
||||
mock_call.assert_has_calls([
|
||||
mock.call(self.ctx, OTHER_ENGINE, "send",
|
||||
message='cancel',
|
||||
stack_identity=mock.ANY),
|
||||
mock.call(self.ctx, OTHER_ENGINE, "stop_stack",
|
||||
stack_identity=mock.ANY)
|
||||
])
|
||||
mock_acquire.assert_called_once_with(True)
|
||||
|
||||
@mock.patch.object(parser.Stack, 'load')
|
||||
@mock.patch.object(stack_lock.StackLock, 'try_acquire')
|
||||
@mock.patch.object(service_utils, 'engine_alive')
|
||||
@mock.patch.object(stack_lock.StackLock, 'acquire')
|
||||
@mock.patch.object(timeutils.StopWatch, 'expired')
|
||||
def test_stack_delete_other_dead_engine_active_lock(
|
||||
self, mock_acquire, mock_alive, mock_try, mock_load):
|
||||
self, mock_expired, mock_acquire, mock_alive, mock_try, mock_load):
|
||||
cfg.CONF.set_override('error_wait_time', 0)
|
||||
OTHER_ENGINE = "other-engine-fake-uuid"
|
||||
stack_name = 'service_delete_test_stack_other_dead_engine'
|
||||
stack = tools.get_stack(stack_name, self.ctx)
|
||||
|
@ -210,11 +232,12 @@ class StackDeleteTest(common.HeatTestCase):
|
|||
mock_load.return_value = stack
|
||||
mock_try.return_value = OTHER_ENGINE
|
||||
mock_alive.return_value = False
|
||||
mock_expired.side_effect = [False, True]
|
||||
|
||||
self.assertIsNone(self.man.delete_stack(self.ctx, stack.identifier()))
|
||||
self.man.thread_group_mgr.groups[sid].wait()
|
||||
|
||||
mock_load.assert_called_with(self.ctx, stack=st)
|
||||
mock_try.assert_called_once_with()
|
||||
mock_try.assert_called_with()
|
||||
mock_acquire.assert_called_once_with(True)
|
||||
mock_alive.assert_called_once_with(self.ctx, OTHER_ENGINE)
|
||||
mock_alive.assert_called_with(self.ctx, OTHER_ENGINE)
|
||||
|
|
Loading…
Reference in New Issue