Convergence: basic framework for cancelling workers

Implements mechanism to cancel existing workers (in_progress resources).
The stack-cancel-update request lands in one of the engines, and if
there are any workers in that engine which are working for the stack,
they are cancelled first and then other engines are requested to cancel
the workers.

Change-Id: I464c4fdb760247d436473af49448f7797dc0130d
This commit is contained in:
Anant Patil 2016-08-18 15:31:23 +05:30 committed by Thomas Herve
parent b9d1e30a01
commit 873a40851d
8 changed files with 290 additions and 78 deletions

View File

@ -142,6 +142,10 @@ def resource_get_all_by_root_stack(context, stack_id, filters=None):
return IMPL.resource_get_all_by_root_stack(context, stack_id, filters) return IMPL.resource_get_all_by_root_stack(context, stack_id, filters)
def engine_get_all_locked_by_stack(context, stack_id):
return IMPL.engine_get_all_locked_by_stack(context, stack_id)
def resource_purge_deleted(context, stack_id): def resource_purge_deleted(context, stack_id):
return IMPL.resource_purge_deleted(context, stack_id) return IMPL.resource_purge_deleted(context, stack_id)

View File

@ -410,6 +410,15 @@ def resource_get_all_by_root_stack(context, stack_id, filters=None):
return dict((res.id, res) for res in results) return dict((res.id, res) for res in results)
def engine_get_all_locked_by_stack(context, stack_id):
query = context.session.query(
func.distinct(models.Resource.engine_id)
).filter(
models.Resource.stack_id == stack_id,
models.Resource.engine_id.isnot(None))
return set(i[0] for i in query.all())
def stack_get_by_name_and_owner_id(context, stack_name, owner_id): def stack_get_by_name_and_owner_id(context, stack_name, owner_id):
query = soft_delete_aware_query( query = soft_delete_aware_query(
context, models.Stack context, models.Stack

View File

@ -25,12 +25,16 @@ from heat.common.i18n import _LE
from heat.common.i18n import _LI from heat.common.i18n import _LI
from heat.common.i18n import _LW from heat.common.i18n import _LW
from heat.common import messaging as rpc_messaging from heat.common import messaging as rpc_messaging
from heat.db import api as db_api
from heat.engine import check_resource from heat.engine import check_resource
from heat.engine import sync_point from heat.engine import sync_point
from heat.rpc import api as rpc_api
from heat.rpc import worker_client as rpc_client from heat.rpc import worker_client as rpc_client
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
CANCEL_RETRIES = 3
@profiler.trace_cls("rpc") @profiler.trace_cls("rpc")
class WorkerService(service.Service): class WorkerService(service.Service):
@ -107,6 +111,26 @@ class WorkerService(service.Service):
"%(name)s while cancelling the operation."), "%(name)s while cancelling the operation."),
{'name': stack.name, 'trvsl': old_trvsl}) {'name': stack.name, 'trvsl': old_trvsl})
def stop_all_workers(self, stack):
# stop the traversal
if stack.status == stack.IN_PROGRESS:
self.stop_traversal(stack)
# cancel existing workers
cancelled = _cancel_workers(stack, self.thread_group_mgr,
self.engine_id, self._rpc_client)
if not cancelled:
LOG.error(_LE("Failed to stop all workers of stack %(name)s "
", stack cancel not complete"),
{'name': stack.name})
return False
LOG.info(_LI('[%(name)s(%(id)s)] Stopped all active workers for stack '
'%(action)s'),
{'name': stack.name, 'id': stack.id, 'action': stack.action})
return True
@context.request_context @context.request_context
def check_resource(self, cnxt, resource_id, current_traversal, data, def check_resource(self, cnxt, resource_id, current_traversal, data,
is_update, adopt_stack_data): is_update, adopt_stack_data):
@ -145,5 +169,42 @@ class WorkerService(service.Service):
All the workers running for the given stack will be All the workers running for the given stack will be
cancelled. cancelled.
""" """
# TODO(ananta): Implement cancel check-resource _cancel_check_resource(stack_id, self.engine_id, self.thread_group_mgr)
LOG.debug('Cancelling workers for stack [%s]', stack_id)
def _cancel_check_resource(stack_id, engine_id, tgm):
LOG.debug('Cancelling workers for stack [%s] in engine [%s]',
stack_id, engine_id)
tgm.send(stack_id, rpc_api.THREAD_CANCEL)
def _wait_for_cancellation(stack, wait=5):
# give enough time to wait till cancel is completed
retries = CANCEL_RETRIES
while retries > 0:
retries -= 1
eventlet.sleep(wait)
engines = db_api.engine_get_all_locked_by_stack(
stack.context, stack.id)
if not engines:
return True
return False
def _cancel_workers(stack, tgm, local_engine_id, rpc_client):
engines = db_api.engine_get_all_locked_by_stack(stack.context, stack.id)
if not engines:
return True
# cancel workers running locally
if local_engine_id in engines:
_cancel_check_resource(stack.id, local_engine_id, tgm)
engines.remove(local_engine_id)
# cancel workers on remote engines
for engine_id in engines:
rpc_client.cancel_check_resource(stack.context, stack.id, engine_id)
return _wait_for_cancellation(stack)

View File

@ -2436,6 +2436,35 @@ class DBAPIResourceTest(common.HeatTestCase):
self.assertRaises(exception.NotFound, db_api.resource_get, self.assertRaises(exception.NotFound, db_api.resource_get,
self.ctx, resource.id) self.ctx, resource.id)
def test_engine_get_all_locked_by_stack(self):
values = [
{'name': 'res1', 'action': rsrc.Resource.DELETE,
'root_stack_id': self.stack.id,
'status': rsrc.Resource.COMPLETE},
{'name': 'res2', 'action': rsrc.Resource.DELETE,
'root_stack_id': self.stack.id,
'status': rsrc.Resource.IN_PROGRESS, 'engine_id': 'engine-001'},
{'name': 'res3', 'action': rsrc.Resource.UPDATE,
'root_stack_id': self.stack.id,
'status': rsrc.Resource.IN_PROGRESS, 'engine_id': 'engine-002'},
{'name': 'res4', 'action': rsrc.Resource.CREATE,
'root_stack_id': self.stack.id,
'status': rsrc.Resource.COMPLETE},
{'name': 'res5', 'action': rsrc.Resource.INIT,
'root_stack_id': self.stack.id,
'status': rsrc.Resource.COMPLETE},
{'name': 'res6', 'action': rsrc.Resource.CREATE,
'root_stack_id': self.stack.id,
'status': rsrc.Resource.IN_PROGRESS, 'engine_id': 'engine-001'},
{'name': 'res6'},
]
for val in values:
create_resource(self.ctx, self.stack, **val)
engines = db_api.engine_get_all_locked_by_stack(self.ctx,
self.stack.id)
self.assertEqual({'engine-001', 'engine-002'}, engines)
class DBAPIStackLockTest(common.HeatTestCase): class DBAPIStackLockTest(common.HeatTestCase):
def setUp(self): def setUp(self):

View File

@ -108,11 +108,13 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
self.resource.id, self.resource.id,
mock.ANY, True) mock.ANY, True)
@mock.patch.object(resource.Resource, 'load')
@mock.patch.object(resource.Resource, 'make_replacement') @mock.patch.object(resource.Resource, 'make_replacement')
@mock.patch.object(stack.Stack, 'time_remaining') @mock.patch.object(stack.Stack, 'time_remaining')
def test_is_update_traversal_raise_update_replace( def test_is_update_traversal_raise_update_replace(
self, tr, mock_mr, mock_cru, mock_crc, mock_pcr, mock_csc, self, tr, mock_mr, mock_load, mock_cru, mock_crc, mock_pcr,
mock_cid): mock_csc, mock_cid):
mock_load.return_value = self.resource, self.stack, self.stack
mock_cru.side_effect = resource.UpdateReplace mock_cru.side_effect = resource.UpdateReplace
tr.return_value = 317 tr.return_value = 317
self.worker.check_resource( self.worker.check_resource(
@ -550,10 +552,13 @@ class CheckWorkflowCleanupTest(common.HeatTestCase):
self.is_update = False self.is_update = False
self.graph_key = (self.resource.id, self.is_update) self.graph_key = (self.resource.id, self.is_update)
@mock.patch.object(resource.Resource, 'load')
@mock.patch.object(stack.Stack, 'time_remaining') @mock.patch.object(stack.Stack, 'time_remaining')
def test_is_cleanup_traversal( def test_is_cleanup_traversal(
self, tr, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid): self, tr, mock_load, mock_cru, mock_crc, mock_pcr, mock_csc,
mock_cid):
tr.return_value = 317 tr.return_value = 317
mock_load.return_value = self.resource, self.stack, self.stack
self.worker.check_resource( self.worker.check_resource(
self.ctx, self.resource.id, self.stack.current_traversal, {}, self.ctx, self.resource.id, self.stack.current_traversal, {},
self.is_update, None) self.is_update, None)

View File

@ -15,8 +15,10 @@
import mock import mock
from heat.db import api as db_api
from heat.engine import check_resource from heat.engine import check_resource
from heat.engine import worker from heat.engine import worker
from heat.rpc import worker_client as wc
from heat.tests import common from heat.tests import common
from heat.tests import utils from heat.tests import utils
@ -44,11 +46,11 @@ class WorkerServiceTest(common.HeatTestCase):
target_class, target_class,
rpc_server_method rpc_server_method
): ):
self.worker = worker.WorkerService('host-1', self.worker = worker.WorkerService('host-1',
'topic-1', 'topic-1',
'engine_id', 'engine_id',
mock.Mock()) mock.Mock())
self.worker.start() self.worker.start()
# Make sure target is called with proper parameters # Make sure target is called with proper parameters
@ -133,3 +135,88 @@ class WorkerServiceTest(common.HeatTestCase):
self.assertTrue(mock_tgm.add_msg_queue.called) self.assertTrue(mock_tgm.add_msg_queue.called)
# ensure remove is also called # ensure remove is also called
self.assertTrue(mock_tgm.remove_msg_queue.called) self.assertTrue(mock_tgm.remove_msg_queue.called)
@mock.patch.object(worker, '_wait_for_cancellation')
@mock.patch.object(worker, '_cancel_check_resource')
@mock.patch.object(wc.WorkerClient, 'cancel_check_resource')
@mock.patch.object(db_api, 'engine_get_all_locked_by_stack')
def test_cancel_workers_when_no_resource_found(self, mock_get_locked,
mock_ccr, mock_wccr,
mock_wc):
mock_tgm = mock.Mock()
_worker = worker.WorkerService('host-1', 'topic-1', 'engine-001',
mock_tgm)
stack = mock.MagicMock()
stack.id = 'stack_id'
mock_get_locked.return_value = []
worker._cancel_workers(stack, mock_tgm, 'engine-001',
_worker._rpc_client)
self.assertFalse(mock_wccr.called)
self.assertFalse(mock_ccr.called)
@mock.patch.object(worker, '_wait_for_cancellation')
@mock.patch.object(worker, '_cancel_check_resource')
@mock.patch.object(wc.WorkerClient, 'cancel_check_resource')
@mock.patch.object(db_api, 'engine_get_all_locked_by_stack')
def test_cancel_workers_with_resources_found(self, mock_get_locked,
mock_ccr, mock_wccr,
mock_wc):
mock_tgm = mock.Mock()
_worker = worker.WorkerService('host-1', 'topic-1', 'engine-001',
mock_tgm)
stack = mock.MagicMock()
stack.id = 'stack_id'
mock_get_locked.return_value = ['engine-001', 'engine-007',
'engine-008']
worker._cancel_workers(stack, mock_tgm, 'engine-001',
_worker._rpc_client)
mock_wccr.assert_called_once_with(stack.id, 'engine-001', mock_tgm)
self.assertEqual(2, mock_ccr.call_count)
calls = [mock.call(stack.context, stack.id, 'engine-007'),
mock.call(stack.context, stack.id, 'engine-008')]
mock_ccr.assert_has_calls(calls, any_order=True)
self.assertTrue(mock_wc.called)
@mock.patch.object(worker, '_cancel_workers')
@mock.patch.object(worker.WorkerService, 'stop_traversal')
def test_stop_all_workers_when_stack_in_progress(self, mock_st, mock_cw):
mock_tgm = mock.Mock()
_worker = worker.WorkerService('host-1', 'topic-1', 'engine-001',
mock_tgm)
stack = mock.MagicMock()
stack.IN_PROGRESS = 'IN_PROGRESS'
stack.status = stack.IN_PROGRESS
stack.id = 'stack_id'
stack.rollback = mock.MagicMock()
_worker.stop_all_workers(stack)
mock_st.assert_called_once_with(stack)
mock_cw.assert_called_once_with(stack, mock_tgm, 'engine-001',
_worker._rpc_client)
self.assertFalse(stack.rollback.called)
@mock.patch.object(worker, '_cancel_workers')
@mock.patch.object(worker.WorkerService, 'stop_traversal')
def test_stop_all_workers_when_stack_not_in_progress(self, mock_st,
mock_cw):
mock_tgm = mock.Mock()
_worker = worker.WorkerService('host-1', 'topic-1', 'engine-001',
mock_tgm)
stack = mock.MagicMock()
stack.FAILED = 'FAILED'
stack.status = stack.FAILED
stack.id = 'stack_id'
stack.rollback = mock.MagicMock()
_worker.stop_all_workers(stack)
self.assertFalse(mock_st.called)
mock_cw.assert_called_once_with(stack, mock_tgm, 'engine-001',
_worker._rpc_client)
self.assertFalse(stack.rollback.called)
# test when stack complete
stack.FAILED = 'FAILED'
stack.status = stack.FAILED
_worker.stop_all_workers(stack)
self.assertFalse(mock_st.called)
mock_cw.assert_called_with(stack, mock_tgm, 'engine-001',
_worker._rpc_client)
self.assertFalse(stack.rollback.called)

View File

@ -22,6 +22,11 @@ from heat.tests import utils
class SyncPointTestCase(common.HeatTestCase): class SyncPointTestCase(common.HeatTestCase):
def setUp(self):
super(SyncPointTestCase, self).setUp()
self.dummy_event = mock.MagicMock()
self.dummy_event.ready.return_value = False
def test_sync_waiting(self): def test_sync_waiting(self):
ctx = utils.dummy_context() ctx = utils.dummy_context()
stack = tools.get_stack('test_stack', utils.dummy_context(), stack = tools.get_stack('test_stack', utils.dummy_context(),

View File

@ -12,6 +12,7 @@
# under the License. # under the License.
import collections import collections
import eventlet
import itertools import itertools
import json import json
import os import os
@ -72,6 +73,7 @@ class ResourceTest(common.HeatTestCase):
env=self.env), env=self.env),
stack_id=str(uuid.uuid4())) stack_id=str(uuid.uuid4()))
self.dummy_timeout = 10 self.dummy_timeout = 10
self.dummy_event = eventlet.event.Event()
def test_get_class_ok(self): def test_get_class_ok(self):
cls = resources.global_env().get_class_to_instantiate( cls = resources.global_env().get_class_to_instantiate(
@ -1880,10 +1882,7 @@ class ResourceTest(common.HeatTestCase):
res._release('engine-id') res._release('engine-id')
self.assertFalse(mock_sau.called) self.assertFalse(mock_sau.called)
@mock.patch.object(resource.scheduler.TaskRunner, '__init__', def test_create_convergence(self):
return_value=None)
@mock.patch.object(resource.scheduler.TaskRunner, '__call__')
def test_create_convergence(self, mock_call, mock_init):
tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo') tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
res = generic_rsrc.GenericResource('test_res', tmpl, self.stack) res = generic_rsrc.GenericResource('test_res', tmpl, self.stack)
res.action = res.CREATE res.action = res.CREATE
@ -1893,12 +1892,11 @@ class ResourceTest(common.HeatTestCase):
(2, True): {u'id': 3, u'name': 'B', 'attrs': {}}} (2, True): {u'id': 3, u'name': 'B', 'attrs': {}}}
pcb = mock.Mock() pcb = mock.Mock()
res.create_convergence(self.stack.t.id, res_data, 'engine-007', with mock.patch.object(resource.Resource, 'create') as mock_create:
60, pcb) res.create_convergence(self.stack.t.id, res_data, 'engine-007',
-1, pcb)
self.assertTrue(mock_create.called)
mock_init.assert_called_once_with(res.create)
mock_call.assert_called_once_with(timeout=60, progress_callback=pcb)
self.assertEqual(self.stack.t.id, res.current_template_id)
self.assertItemsEqual([1, 3], res.requires) self.assertItemsEqual([1, 3], res.requires)
self._assert_resource_lock(res.id, None, 2) self._assert_resource_lock(res.id, None, 2)
@ -1910,9 +1908,9 @@ class ResourceTest(common.HeatTestCase):
res_data = {(1, True): {u'id': 1, u'name': 'A', 'attrs': {}}, res_data = {(1, True): {u'id': 1, u'name': 'A', 'attrs': {}},
(2, True): {u'id': 3, u'name': 'B', 'attrs': {}}} (2, True): {u'id': 3, u'name': 'B', 'attrs': {}}}
pcb = mock.Mock()
self.assertRaises(scheduler.Timeout, res.create_convergence, self.assertRaises(scheduler.Timeout, res.create_convergence,
self.stack.t.id, res_data, 'engine-007', self.stack.t.id, res_data, 'engine-007', -1, pcb)
-1)
def test_create_convergence_sets_requires_for_failure(self): def test_create_convergence_sets_requires_for_failure(self):
"""Ensure that requires are computed correctly. """Ensure that requires are computed correctly.
@ -1930,7 +1928,7 @@ class ResourceTest(common.HeatTestCase):
(2, True): {u'id': 3, u'name': 'B', 'attrs': {}}} (2, True): {u'id': 3, u'name': 'B', 'attrs': {}}}
self.assertRaises(exception.ResourceNotAvailable, self.assertRaises(exception.ResourceNotAvailable,
res.create_convergence, self.stack.t.id, res_data, res.create_convergence, self.stack.t.id, res_data,
'engine-007', self.dummy_timeout) 'engine-007', self.dummy_timeout, self.dummy_event)
self.assertItemsEqual([5, 3], res.requires) self.assertItemsEqual([5, 3], res.requires)
self._assert_resource_lock(res.id, None, 2) self._assert_resource_lock(res.id, None, 2)
@ -1945,9 +1943,10 @@ class ResourceTest(common.HeatTestCase):
self._assert_resource_lock(res.id, None, None) self._assert_resource_lock(res.id, None, None)
res_data = {(1, True): {u'id': 5, u'name': 'A', 'attrs': {}}, res_data = {(1, True): {u'id': 5, u'name': 'A', 'attrs': {}},
(2, True): {u'id': 3, u'name': 'B', 'attrs': {}}} (2, True): {u'id': 3, u'name': 'B', 'attrs': {}}}
res.create_convergence(self.stack.t.id, res_data, 'engine-007', tr = scheduler.TaskRunner(res.create_convergence, self.stack.t.id,
self.dummy_timeout) res_data, 'engine-007', self.dummy_timeout,
self.dummy_event)
tr()
mock_adopt.assert_called_once_with( mock_adopt.assert_called_once_with(
resource_data={'resource_id': 'fluffy'}) resource_data={'resource_id': 'fluffy'})
self.assertItemsEqual([5, 3], res.requires) self.assertItemsEqual([5, 3], res.requires)
@ -1962,15 +1961,13 @@ class ResourceTest(common.HeatTestCase):
self._assert_resource_lock(res.id, None, None) self._assert_resource_lock(res.id, None, None)
res_data = {(1, True): {u'id': 5, u'name': 'A', 'attrs': {}}, res_data = {(1, True): {u'id': 5, u'name': 'A', 'attrs': {}},
(2, True): {u'id': 3, u'name': 'B', 'attrs': {}}} (2, True): {u'id': 3, u'name': 'B', 'attrs': {}}}
exc = self.assertRaises(exception.ResourceFailure, tr = scheduler.TaskRunner(res.create_convergence, self.stack.t.id,
res.create_convergence, self.stack.t.id, res_data, 'engine-007', self.dummy_timeout,
res_data, 'engine-007', self.dummy_timeout) self.dummy_event)
exc = self.assertRaises(exception.ResourceFailure, tr)
self.assertIn('Resource ID was not provided', six.text_type(exc)) self.assertIn('Resource ID was not provided', six.text_type(exc))
@mock.patch.object(resource.scheduler.TaskRunner, '__init__', def test_update_convergence(self):
return_value=None)
@mock.patch.object(resource.scheduler.TaskRunner, '__call__')
def test_update_convergence(self, mock_call, mock_init):
tmpl = template.Template({ tmpl = template.Template({
'HeatTemplateFormatVersion': '2012-12-12', 'HeatTemplateFormatVersion': '2012-12-12',
'Resources': { 'Resources': {
@ -1981,6 +1978,7 @@ class ResourceTest(common.HeatTestCase):
stack.converge_stack(stack.t, action=stack.CREATE) stack.converge_stack(stack.t, action=stack.CREATE)
res = stack.resources['test_res'] res = stack.resources['test_res']
res.requires = [2] res.requires = [2]
res.action = res.CREATE
res._store() res._store()
self._assert_resource_lock(res.id, None, None) self._assert_resource_lock(res.id, None, None)
@ -1997,13 +1995,13 @@ class ResourceTest(common.HeatTestCase):
res_data = {(1, True): {u'id': 4, u'name': 'A', 'attrs': {}}, res_data = {(1, True): {u'id': 4, u'name': 'A', 'attrs': {}},
(2, True): {u'id': 3, u'name': 'B', 'attrs': {}}} (2, True): {u'id': 3, u'name': 'B', 'attrs': {}}}
pcb = mock.Mock() pcb = mock.Mock()
res.update_convergence(new_temp.id, res_data, 'engine-007', 120, with mock.patch.object(resource.Resource, 'update') as mock_update:
new_stack, pcb) tr = scheduler.TaskRunner(res.update_convergence, new_temp.id,
res_data, 'engine-007', 120, new_stack,
pcb)
tr()
self.assertTrue(mock_update.called)
expected_rsrc_def = new_temp.resource_definitions(self.stack)[res.name]
mock_init.assert_called_once_with(res.update, expected_rsrc_def)
mock_call.assert_called_once_with(timeout=120, progress_callback=pcb)
self.assertEqual(new_temp.id, res.current_template_id)
self.assertItemsEqual([3, 4], res.requires) self.assertItemsEqual([3, 4], res.requires)
self._assert_resource_lock(res.id, None, 2) self._assert_resource_lock(res.id, None, 2)
@ -2030,9 +2028,10 @@ class ResourceTest(common.HeatTestCase):
new_temp, stack_id=self.stack.id) new_temp, stack_id=self.stack.id)
res_data = {} res_data = {}
self.assertRaises(scheduler.Timeout, res.update_convergence, tr = scheduler.TaskRunner(res.update_convergence, new_temp.id,
new_temp.id, res_data, 'engine-007', res_data, 'engine-007', -1, new_stack,
-1, new_stack) self.dummy_event)
self.assertRaises(scheduler.Timeout, tr)
def test_update_convergence_with_substitute_class(self): def test_update_convergence_with_substitute_class(self):
tmpl = rsrc_defn.ResourceDefinition('test_res', tmpl = rsrc_defn.ResourceDefinition('test_res',
@ -2073,9 +2072,10 @@ class ResourceTest(common.HeatTestCase):
new_temp, stack_id=self.stack.id) new_temp, stack_id=self.stack.id)
res_data = {} res_data = {}
self.assertRaises(resource.UpdateReplace, res.update_convergence, tr = scheduler.TaskRunner(res.update_convergence, new_temp.id,
new_temp.id, res_data, 'engine-007', res_data, 'engine-007', -1, new_stack,
-1, new_stack) self.dummy_event)
self.assertRaises(resource.UpdateReplace, tr)
def test_update_in_progress_convergence(self): def test_update_in_progress_convergence(self):
tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo') tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
@ -2088,12 +2088,10 @@ class ResourceTest(common.HeatTestCase):
res_data = {(1, True): {u'id': 4, u'name': 'A', 'attrs': {}}, res_data = {(1, True): {u'id': 4, u'name': 'A', 'attrs': {}},
(2, True): {u'id': 3, u'name': 'B', 'attrs': {}}} (2, True): {u'id': 3, u'name': 'B', 'attrs': {}}}
ex = self.assertRaises(exception.UpdateInProgress, tr = scheduler.TaskRunner(res.update_convergence, 'template_key',
res.update_convergence, res_data, 'engine-007', self.dummy_timeout,
'template_key', mock.ANY, self.dummy_event)
res_data, 'engine-007', ex = self.assertRaises(exception.UpdateInProgress, tr)
self.dummy_timeout,
mock.ANY)
msg = ("The resource %s is already being updated." % msg = ("The resource %s is already being updated." %
res.name) res.name)
self.assertEqual(msg, six.text_type(ex)) self.assertEqual(msg, six.text_type(ex))
@ -2130,9 +2128,10 @@ class ResourceTest(common.HeatTestCase):
new_temp, stack_id=self.stack.id) new_temp, stack_id=self.stack.id)
dummy_ex = exception.ResourceFailure(exc, res, action=res.UPDATE) dummy_ex = exception.ResourceFailure(exc, res, action=res.UPDATE)
mock_update.side_effect = dummy_ex mock_update.side_effect = dummy_ex
self.assertRaises(exception.ResourceFailure, tr = scheduler.TaskRunner(res.update_convergence, new_temp.id,
res.update_convergence, new_temp.id, res_data, res_data, 'engine-007', 120, new_stack,
'engine-007', 120, new_stack) self.dummy_event)
self.assertRaises(exception.ResourceFailure, tr)
expected_rsrc_def = new_temp.resource_definitions(self.stack)[res.name] expected_rsrc_def = new_temp.resource_definitions(self.stack)[res.name]
mock_update.assert_called_once_with(expected_rsrc_def) mock_update.assert_called_once_with(expected_rsrc_def)
@ -2170,9 +2169,10 @@ class ResourceTest(common.HeatTestCase):
mock_update.side_effect = resource.UpdateReplace mock_update.side_effect = resource.UpdateReplace
new_stack = parser.Stack(utils.dummy_context(), 'test_stack', new_stack = parser.Stack(utils.dummy_context(), 'test_stack',
new_temp, stack_id=self.stack.id) new_temp, stack_id=self.stack.id)
self.assertRaises(resource.UpdateReplace, tr = scheduler.TaskRunner(res.update_convergence, new_temp.id,
res.update_convergence, new_temp.id, res_data, res_data, 'engine-007', 120, new_stack,
'engine-007', 120, new_stack) self.dummy_event)
self.assertRaises(resource.UpdateReplace, tr)
expected_rsrc_def = new_temp.resource_definitions(self.stack)[res.name] expected_rsrc_def = new_temp.resource_definitions(self.stack)[res.name]
mock_update.assert_called_once_with(expected_rsrc_def) mock_update.assert_called_once_with(expected_rsrc_def)
@ -2202,7 +2202,7 @@ class ResourceTest(common.HeatTestCase):
res.restore_prev_rsrc = mock.Mock() res.restore_prev_rsrc = mock.Mock()
tr = scheduler.TaskRunner(res.update_convergence, 'new_tmpl_id', {}, tr = scheduler.TaskRunner(res.update_convergence, 'new_tmpl_id', {},
'engine-007', self.dummy_timeout, 'engine-007', self.dummy_timeout,
new_stack) new_stack, self.dummy_event)
self.assertRaises(resource.UpdateReplace, tr) self.assertRaises(resource.UpdateReplace, tr)
self.assertTrue(res.restore_prev_rsrc.called) self.assertTrue(res.restore_prev_rsrc.called)
@ -2225,15 +2225,13 @@ class ResourceTest(common.HeatTestCase):
'Simulate rollback') 'Simulate rollback')
res.restore_prev_rsrc = mock.Mock(side_effect=Exception) res.restore_prev_rsrc = mock.Mock(side_effect=Exception)
tr = scheduler.TaskRunner(res.update_convergence, 'new_tmpl_id', {}, tr = scheduler.TaskRunner(res.update_convergence, 'new_tmpl_id', {},
'engine-007', self.dummy_timeout, new_stack) 'engine-007', self.dummy_timeout, new_stack,
self.dummy_event)
self.assertRaises(exception.ResourceFailure, tr) self.assertRaises(exception.ResourceFailure, tr)
self.assertTrue(res.restore_prev_rsrc.called) self.assertTrue(res.restore_prev_rsrc.called)
self.assertEqual((res.UPDATE, res.FAILED), res.state) self.assertEqual((res.UPDATE, res.FAILED), res.state)
@mock.patch.object(resource.scheduler.TaskRunner, '__init__', def test_delete_convergence_ok(self):
return_value=None)
@mock.patch.object(resource.scheduler.TaskRunner, '__call__')
def test_delete_convergence_ok(self, mock_call, mock_init):
tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo') tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
res = generic_rsrc.GenericResource('test_res', tmpl, self.stack) res = generic_rsrc.GenericResource('test_res', tmpl, self.stack)
res.current_template_id = 1 res.current_template_id = 1
@ -2244,11 +2242,13 @@ class ResourceTest(common.HeatTestCase):
res._update_replacement_data = mock.Mock() res._update_replacement_data = mock.Mock()
self._assert_resource_lock(res.id, None, None) self._assert_resource_lock(res.id, None, None)
pcb = mock.Mock() pcb = mock.Mock()
res.delete_convergence(2, {}, 'engine-007', 20, pcb) with mock.patch.object(resource.Resource, 'delete') as mock_delete:
tr = scheduler.TaskRunner(res.delete_convergence, 2, {},
mock_init.assert_called_once_with(res.delete) 'engine-007', 20, pcb)
mock_call.assert_called_once_with(timeout=20, progress_callback=pcb) tr()
self.assertTrue(mock_delete.called)
self.assertTrue(res._update_replacement_data.called) self.assertTrue(res._update_replacement_data.called)
self._assert_resource_lock(res.id, None, 2)
def test_delete_convergence_does_not_delete_same_template_resource(self): def test_delete_convergence_does_not_delete_same_template_resource(self):
tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo') tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
@ -2256,8 +2256,10 @@ class ResourceTest(common.HeatTestCase):
res.current_template_id = 'same-template' res.current_template_id = 'same-template'
res._store() res._store()
res.delete = mock.Mock() res.delete = mock.Mock()
res.delete_convergence('same-template', {}, 'engine-007', tr = scheduler.TaskRunner(res.delete_convergence, 'same-template', {},
self.dummy_timeout) 'engine-007', self.dummy_timeout,
self.dummy_event)
tr()
self.assertFalse(res.delete.called) self.assertFalse(res.delete.called)
def test_delete_convergence_fail(self): def test_delete_convergence_fail(self):
@ -2270,9 +2272,9 @@ class ResourceTest(common.HeatTestCase):
res_id = res.id res_id = res.id
res.handle_delete = mock.Mock(side_effect=ValueError('test')) res.handle_delete = mock.Mock(side_effect=ValueError('test'))
self._assert_resource_lock(res.id, None, None) self._assert_resource_lock(res.id, None, None)
self.assertRaises(exception.ResourceFailure, tr = scheduler.TaskRunner(res.delete_convergence, 2, {}, 'engine-007',
res.delete_convergence, 2, {}, 'engine-007', self.dummy_timeout, self.dummy_event)
self.dummy_timeout) self.assertRaises(exception.ResourceFailure, tr)
self.assertTrue(res.handle_delete.called) self.assertTrue(res.handle_delete.called)
# confirm that the DB object still exists, and it's lock is released. # confirm that the DB object still exists, and it's lock is released.
@ -2291,9 +2293,10 @@ class ResourceTest(common.HeatTestCase):
rs = resource_objects.Resource.get_obj(self.stack.context, res.id) rs = resource_objects.Resource.get_obj(self.stack.context, res.id)
rs.update_and_save({'engine_id': 'not-this'}) rs.update_and_save({'engine_id': 'not-this'})
self._assert_resource_lock(res.id, 'not-this', None) self._assert_resource_lock(res.id, 'not-this', None)
ex = self.assertRaises(exception.UpdateInProgress,
res.delete_convergence, tr = scheduler.TaskRunner(res.delete_convergence, 1, {}, 'engine-007',
1, {}, 'engine-007', self.dummy_timeout) self.dummy_timeout, self.dummy_event)
ex = self.assertRaises(exception.UpdateInProgress, tr)
msg = ("The resource %s is already being updated." % msg = ("The resource %s is already being updated." %
res.name) res.name)
self.assertEqual(msg, six.text_type(ex)) self.assertEqual(msg, six.text_type(ex))
@ -2308,7 +2311,9 @@ class ResourceTest(common.HeatTestCase):
res.destroy = mock.Mock() res.destroy = mock.Mock()
input_data = {(1, False): 4, (2, False): 5} # needed_by resource ids input_data = {(1, False): 4, (2, False): 5} # needed_by resource ids
self._assert_resource_lock(res.id, None, None) self._assert_resource_lock(res.id, None, None)
res.delete_convergence(1, input_data, 'engine-007', self.dummy_timeout) scheduler.TaskRunner(res.delete_convergence, 1, input_data,
'engine-007', self.dummy_timeout,
self.dummy_event)()
self.assertItemsEqual([4, 5], res.needed_by) self.assertItemsEqual([4, 5], res.needed_by)
@mock.patch.object(resource_objects.Resource, 'get_obj') @mock.patch.object(resource_objects.Resource, 'get_obj')
@ -2449,7 +2454,9 @@ class ResourceTest(common.HeatTestCase):
res._store() res._store()
with mock.patch.object(resource_objects.Resource, with mock.patch.object(resource_objects.Resource,
'delete') as resource_del: 'delete') as resource_del:
res.delete_convergence(1, {}, 'engine-007', 1) tr = scheduler.TaskRunner(res.delete_convergence, 1, {},
'engine-007', 1, self.dummy_event)
tr()
resource_del.assert_called_once_with(res.context, res.id) resource_del.assert_called_once_with(res.context, res.id)
def test_delete_convergence_throws_timeout(self): def test_delete_convergence_throws_timeout(self):
@ -2458,8 +2465,9 @@ class ResourceTest(common.HeatTestCase):
res.action = res.CREATE res.action = res.CREATE
res._store() res._store()
timeout = -1 # to emulate timeout timeout = -1 # to emulate timeout
self.assertRaises(scheduler.Timeout, res.delete_convergence, tr = scheduler.TaskRunner(res.delete_convergence, 1, {}, 'engine-007',
1, {}, 'engine-007', timeout) timeout, self.dummy_event)
self.assertRaises(scheduler.Timeout, tr)
@mock.patch.object(parser.Stack, 'load') @mock.patch.object(parser.Stack, 'load')
@mock.patch.object(resource.Resource, '_load_data') @mock.patch.object(resource.Resource, '_load_data')
@ -4054,6 +4062,7 @@ class ResourceUpdateRestrictionTest(common.HeatTestCase):
} }
} }
self.dummy_timeout = 10 self.dummy_timeout = 10
self.dummy_event = eventlet.event.Event()
def create_resource(self): def create_resource(self):
self.stack = parser.Stack(utils.dummy_context(), 'test_stack', self.stack = parser.Stack(utils.dummy_context(), 'test_stack',
@ -4069,9 +4078,10 @@ class ResourceUpdateRestrictionTest(common.HeatTestCase):
stack_id=str(uuid.uuid4())) stack_id=str(uuid.uuid4()))
res_data = {} res_data = {}
res = self.stack['bar'] res = self.stack['bar']
pcb = mock.Mock()
self.patchobject(res, 'lock') self.patchobject(res, 'lock')
scheduler.TaskRunner(res.create_convergence, self.stack.t.id, res.create_convergence(self.stack.t.id, res_data, 'engine-007',
res_data, 'engine-007', self.dummy_timeout)() self.dummy_timeout, pcb)
return res return res
def test_update_restricted(self): def test_update_restricted(self):
@ -4193,7 +4203,8 @@ class ResourceUpdateRestrictionTest(common.HeatTestCase):
{}, {},
'engine-007', 'engine-007',
self.dummy_timeout, self.dummy_timeout,
self.new_stack)) self.new_stack,
eventlet.event.Event()))
self.assertEqual('ResourceActionRestricted: resources.bar: ' self.assertEqual('ResourceActionRestricted: resources.bar: '
'replace is restricted for resource.', 'replace is restricted for resource.',
six.text_type(error)) six.text_type(error))
@ -4224,7 +4235,8 @@ class ResourceUpdateRestrictionTest(common.HeatTestCase):
{}, {},
'engine-007', 'engine-007',
self.dummy_timeout, self.dummy_timeout,
self.new_stack)) self.new_stack,
eventlet.event.Event()))
self.assertIn('requires replacement', six.text_type(error)) self.assertIn('requires replacement', six.text_type(error))
ev.assert_not_called() ev.assert_not_called()