Merge "Convergence: basic framework for cancelling workers"
This commit is contained in:
commit
1f2cb45040
|
@ -142,6 +142,10 @@ def resource_get_all_by_root_stack(context, stack_id, filters=None):
|
|||
return IMPL.resource_get_all_by_root_stack(context, stack_id, filters)
|
||||
|
||||
|
||||
def engine_get_all_locked_by_stack(context, stack_id):
|
||||
return IMPL.engine_get_all_locked_by_stack(context, stack_id)
|
||||
|
||||
|
||||
def resource_purge_deleted(context, stack_id):
|
||||
return IMPL.resource_purge_deleted(context, stack_id)
|
||||
|
||||
|
|
|
@ -410,6 +410,15 @@ def resource_get_all_by_root_stack(context, stack_id, filters=None):
|
|||
return dict((res.id, res) for res in results)
|
||||
|
||||
|
||||
def engine_get_all_locked_by_stack(context, stack_id):
|
||||
query = context.session.query(
|
||||
func.distinct(models.Resource.engine_id)
|
||||
).filter(
|
||||
models.Resource.stack_id == stack_id,
|
||||
models.Resource.engine_id.isnot(None))
|
||||
return set(i[0] for i in query.all())
|
||||
|
||||
|
||||
def stack_get_by_name_and_owner_id(context, stack_name, owner_id):
|
||||
query = soft_delete_aware_query(
|
||||
context, models.Stack
|
||||
|
|
|
@ -25,12 +25,16 @@ from heat.common.i18n import _LE
|
|||
from heat.common.i18n import _LI
|
||||
from heat.common.i18n import _LW
|
||||
from heat.common import messaging as rpc_messaging
|
||||
from heat.db import api as db_api
|
||||
from heat.engine import check_resource
|
||||
from heat.engine import sync_point
|
||||
from heat.rpc import api as rpc_api
|
||||
from heat.rpc import worker_client as rpc_client
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
CANCEL_RETRIES = 3
|
||||
|
||||
|
||||
@profiler.trace_cls("rpc")
|
||||
class WorkerService(service.Service):
|
||||
|
@ -107,6 +111,26 @@ class WorkerService(service.Service):
|
|||
"%(name)s while cancelling the operation."),
|
||||
{'name': stack.name, 'trvsl': old_trvsl})
|
||||
|
||||
def stop_all_workers(self, stack):
|
||||
# stop the traversal
|
||||
if stack.status == stack.IN_PROGRESS:
|
||||
self.stop_traversal(stack)
|
||||
|
||||
# cancel existing workers
|
||||
cancelled = _cancel_workers(stack, self.thread_group_mgr,
|
||||
self.engine_id, self._rpc_client)
|
||||
if not cancelled:
|
||||
LOG.error(_LE("Failed to stop all workers of stack %(name)s "
|
||||
", stack cancel not complete"),
|
||||
{'name': stack.name})
|
||||
return False
|
||||
|
||||
LOG.info(_LI('[%(name)s(%(id)s)] Stopped all active workers for stack '
|
||||
'%(action)s'),
|
||||
{'name': stack.name, 'id': stack.id, 'action': stack.action})
|
||||
|
||||
return True
|
||||
|
||||
@context.request_context
|
||||
def check_resource(self, cnxt, resource_id, current_traversal, data,
|
||||
is_update, adopt_stack_data):
|
||||
|
@ -145,5 +169,42 @@ class WorkerService(service.Service):
|
|||
All the workers running for the given stack will be
|
||||
cancelled.
|
||||
"""
|
||||
# TODO(ananta): Implement cancel check-resource
|
||||
LOG.debug('Cancelling workers for stack [%s]', stack_id)
|
||||
_cancel_check_resource(stack_id, self.engine_id, self.thread_group_mgr)
|
||||
|
||||
|
||||
def _cancel_check_resource(stack_id, engine_id, tgm):
|
||||
LOG.debug('Cancelling workers for stack [%s] in engine [%s]',
|
||||
stack_id, engine_id)
|
||||
tgm.send(stack_id, rpc_api.THREAD_CANCEL)
|
||||
|
||||
|
||||
def _wait_for_cancellation(stack, wait=5):
|
||||
# give enough time to wait till cancel is completed
|
||||
retries = CANCEL_RETRIES
|
||||
while retries > 0:
|
||||
retries -= 1
|
||||
eventlet.sleep(wait)
|
||||
engines = db_api.engine_get_all_locked_by_stack(
|
||||
stack.context, stack.id)
|
||||
if not engines:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def _cancel_workers(stack, tgm, local_engine_id, rpc_client):
|
||||
engines = db_api.engine_get_all_locked_by_stack(stack.context, stack.id)
|
||||
|
||||
if not engines:
|
||||
return True
|
||||
|
||||
# cancel workers running locally
|
||||
if local_engine_id in engines:
|
||||
_cancel_check_resource(stack.id, local_engine_id, tgm)
|
||||
engines.remove(local_engine_id)
|
||||
|
||||
# cancel workers on remote engines
|
||||
for engine_id in engines:
|
||||
rpc_client.cancel_check_resource(stack.context, stack.id, engine_id)
|
||||
|
||||
return _wait_for_cancellation(stack)
|
||||
|
|
|
@ -2436,6 +2436,35 @@ class DBAPIResourceTest(common.HeatTestCase):
|
|||
self.assertRaises(exception.NotFound, db_api.resource_get,
|
||||
self.ctx, resource.id)
|
||||
|
||||
def test_engine_get_all_locked_by_stack(self):
|
||||
values = [
|
||||
{'name': 'res1', 'action': rsrc.Resource.DELETE,
|
||||
'root_stack_id': self.stack.id,
|
||||
'status': rsrc.Resource.COMPLETE},
|
||||
{'name': 'res2', 'action': rsrc.Resource.DELETE,
|
||||
'root_stack_id': self.stack.id,
|
||||
'status': rsrc.Resource.IN_PROGRESS, 'engine_id': 'engine-001'},
|
||||
{'name': 'res3', 'action': rsrc.Resource.UPDATE,
|
||||
'root_stack_id': self.stack.id,
|
||||
'status': rsrc.Resource.IN_PROGRESS, 'engine_id': 'engine-002'},
|
||||
{'name': 'res4', 'action': rsrc.Resource.CREATE,
|
||||
'root_stack_id': self.stack.id,
|
||||
'status': rsrc.Resource.COMPLETE},
|
||||
{'name': 'res5', 'action': rsrc.Resource.INIT,
|
||||
'root_stack_id': self.stack.id,
|
||||
'status': rsrc.Resource.COMPLETE},
|
||||
{'name': 'res6', 'action': rsrc.Resource.CREATE,
|
||||
'root_stack_id': self.stack.id,
|
||||
'status': rsrc.Resource.IN_PROGRESS, 'engine_id': 'engine-001'},
|
||||
{'name': 'res6'},
|
||||
]
|
||||
for val in values:
|
||||
create_resource(self.ctx, self.stack, **val)
|
||||
|
||||
engines = db_api.engine_get_all_locked_by_stack(self.ctx,
|
||||
self.stack.id)
|
||||
self.assertEqual({'engine-001', 'engine-002'}, engines)
|
||||
|
||||
|
||||
class DBAPIStackLockTest(common.HeatTestCase):
|
||||
def setUp(self):
|
||||
|
|
|
@ -108,11 +108,13 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
|
|||
self.resource.id,
|
||||
mock.ANY, True)
|
||||
|
||||
@mock.patch.object(resource.Resource, 'load')
|
||||
@mock.patch.object(resource.Resource, 'make_replacement')
|
||||
@mock.patch.object(stack.Stack, 'time_remaining')
|
||||
def test_is_update_traversal_raise_update_replace(
|
||||
self, tr, mock_mr, mock_cru, mock_crc, mock_pcr, mock_csc,
|
||||
mock_cid):
|
||||
self, tr, mock_mr, mock_load, mock_cru, mock_crc, mock_pcr,
|
||||
mock_csc, mock_cid):
|
||||
mock_load.return_value = self.resource, self.stack, self.stack
|
||||
mock_cru.side_effect = resource.UpdateReplace
|
||||
tr.return_value = 317
|
||||
self.worker.check_resource(
|
||||
|
@ -550,10 +552,13 @@ class CheckWorkflowCleanupTest(common.HeatTestCase):
|
|||
self.is_update = False
|
||||
self.graph_key = (self.resource.id, self.is_update)
|
||||
|
||||
@mock.patch.object(resource.Resource, 'load')
|
||||
@mock.patch.object(stack.Stack, 'time_remaining')
|
||||
def test_is_cleanup_traversal(
|
||||
self, tr, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid):
|
||||
self, tr, mock_load, mock_cru, mock_crc, mock_pcr, mock_csc,
|
||||
mock_cid):
|
||||
tr.return_value = 317
|
||||
mock_load.return_value = self.resource, self.stack, self.stack
|
||||
self.worker.check_resource(
|
||||
self.ctx, self.resource.id, self.stack.current_traversal, {},
|
||||
self.is_update, None)
|
||||
|
|
|
@ -15,8 +15,10 @@
|
|||
|
||||
import mock
|
||||
|
||||
from heat.db import api as db_api
|
||||
from heat.engine import check_resource
|
||||
from heat.engine import worker
|
||||
from heat.rpc import worker_client as wc
|
||||
from heat.tests import common
|
||||
from heat.tests import utils
|
||||
|
||||
|
@ -44,11 +46,11 @@ class WorkerServiceTest(common.HeatTestCase):
|
|||
target_class,
|
||||
rpc_server_method
|
||||
):
|
||||
|
||||
self.worker = worker.WorkerService('host-1',
|
||||
'topic-1',
|
||||
'engine_id',
|
||||
mock.Mock())
|
||||
|
||||
self.worker.start()
|
||||
|
||||
# Make sure target is called with proper parameters
|
||||
|
@ -133,3 +135,88 @@ class WorkerServiceTest(common.HeatTestCase):
|
|||
self.assertTrue(mock_tgm.add_msg_queue.called)
|
||||
# ensure remove is also called
|
||||
self.assertTrue(mock_tgm.remove_msg_queue.called)
|
||||
|
||||
@mock.patch.object(worker, '_wait_for_cancellation')
|
||||
@mock.patch.object(worker, '_cancel_check_resource')
|
||||
@mock.patch.object(wc.WorkerClient, 'cancel_check_resource')
|
||||
@mock.patch.object(db_api, 'engine_get_all_locked_by_stack')
|
||||
def test_cancel_workers_when_no_resource_found(self, mock_get_locked,
|
||||
mock_ccr, mock_wccr,
|
||||
mock_wc):
|
||||
mock_tgm = mock.Mock()
|
||||
_worker = worker.WorkerService('host-1', 'topic-1', 'engine-001',
|
||||
mock_tgm)
|
||||
stack = mock.MagicMock()
|
||||
stack.id = 'stack_id'
|
||||
mock_get_locked.return_value = []
|
||||
worker._cancel_workers(stack, mock_tgm, 'engine-001',
|
||||
_worker._rpc_client)
|
||||
self.assertFalse(mock_wccr.called)
|
||||
self.assertFalse(mock_ccr.called)
|
||||
|
||||
@mock.patch.object(worker, '_wait_for_cancellation')
|
||||
@mock.patch.object(worker, '_cancel_check_resource')
|
||||
@mock.patch.object(wc.WorkerClient, 'cancel_check_resource')
|
||||
@mock.patch.object(db_api, 'engine_get_all_locked_by_stack')
|
||||
def test_cancel_workers_with_resources_found(self, mock_get_locked,
|
||||
mock_ccr, mock_wccr,
|
||||
mock_wc):
|
||||
mock_tgm = mock.Mock()
|
||||
_worker = worker.WorkerService('host-1', 'topic-1', 'engine-001',
|
||||
mock_tgm)
|
||||
stack = mock.MagicMock()
|
||||
stack.id = 'stack_id'
|
||||
mock_get_locked.return_value = ['engine-001', 'engine-007',
|
||||
'engine-008']
|
||||
worker._cancel_workers(stack, mock_tgm, 'engine-001',
|
||||
_worker._rpc_client)
|
||||
mock_wccr.assert_called_once_with(stack.id, 'engine-001', mock_tgm)
|
||||
self.assertEqual(2, mock_ccr.call_count)
|
||||
calls = [mock.call(stack.context, stack.id, 'engine-007'),
|
||||
mock.call(stack.context, stack.id, 'engine-008')]
|
||||
mock_ccr.assert_has_calls(calls, any_order=True)
|
||||
self.assertTrue(mock_wc.called)
|
||||
|
||||
@mock.patch.object(worker, '_cancel_workers')
|
||||
@mock.patch.object(worker.WorkerService, 'stop_traversal')
|
||||
def test_stop_all_workers_when_stack_in_progress(self, mock_st, mock_cw):
|
||||
mock_tgm = mock.Mock()
|
||||
_worker = worker.WorkerService('host-1', 'topic-1', 'engine-001',
|
||||
mock_tgm)
|
||||
stack = mock.MagicMock()
|
||||
stack.IN_PROGRESS = 'IN_PROGRESS'
|
||||
stack.status = stack.IN_PROGRESS
|
||||
stack.id = 'stack_id'
|
||||
stack.rollback = mock.MagicMock()
|
||||
_worker.stop_all_workers(stack)
|
||||
mock_st.assert_called_once_with(stack)
|
||||
mock_cw.assert_called_once_with(stack, mock_tgm, 'engine-001',
|
||||
_worker._rpc_client)
|
||||
self.assertFalse(stack.rollback.called)
|
||||
|
||||
@mock.patch.object(worker, '_cancel_workers')
|
||||
@mock.patch.object(worker.WorkerService, 'stop_traversal')
|
||||
def test_stop_all_workers_when_stack_not_in_progress(self, mock_st,
|
||||
mock_cw):
|
||||
mock_tgm = mock.Mock()
|
||||
_worker = worker.WorkerService('host-1', 'topic-1', 'engine-001',
|
||||
mock_tgm)
|
||||
stack = mock.MagicMock()
|
||||
stack.FAILED = 'FAILED'
|
||||
stack.status = stack.FAILED
|
||||
stack.id = 'stack_id'
|
||||
stack.rollback = mock.MagicMock()
|
||||
_worker.stop_all_workers(stack)
|
||||
self.assertFalse(mock_st.called)
|
||||
mock_cw.assert_called_once_with(stack, mock_tgm, 'engine-001',
|
||||
_worker._rpc_client)
|
||||
self.assertFalse(stack.rollback.called)
|
||||
|
||||
# test when stack complete
|
||||
stack.FAILED = 'FAILED'
|
||||
stack.status = stack.FAILED
|
||||
_worker.stop_all_workers(stack)
|
||||
self.assertFalse(mock_st.called)
|
||||
mock_cw.assert_called_with(stack, mock_tgm, 'engine-001',
|
||||
_worker._rpc_client)
|
||||
self.assertFalse(stack.rollback.called)
|
||||
|
|
|
@ -22,6 +22,11 @@ from heat.tests import utils
|
|||
|
||||
|
||||
class SyncPointTestCase(common.HeatTestCase):
|
||||
def setUp(self):
|
||||
super(SyncPointTestCase, self).setUp()
|
||||
self.dummy_event = mock.MagicMock()
|
||||
self.dummy_event.ready.return_value = False
|
||||
|
||||
def test_sync_waiting(self):
|
||||
ctx = utils.dummy_context()
|
||||
stack = tools.get_stack('test_stack', utils.dummy_context(),
|
||||
|
|
|
@ -12,6 +12,7 @@
|
|||
# under the License.
|
||||
|
||||
import collections
|
||||
import eventlet
|
||||
import itertools
|
||||
import json
|
||||
import os
|
||||
|
@ -72,6 +73,7 @@ class ResourceTest(common.HeatTestCase):
|
|||
env=self.env),
|
||||
stack_id=str(uuid.uuid4()))
|
||||
self.dummy_timeout = 10
|
||||
self.dummy_event = eventlet.event.Event()
|
||||
|
||||
def test_get_class_ok(self):
|
||||
cls = resources.global_env().get_class_to_instantiate(
|
||||
|
@ -1880,10 +1882,7 @@ class ResourceTest(common.HeatTestCase):
|
|||
res._release('engine-id')
|
||||
self.assertFalse(mock_sau.called)
|
||||
|
||||
@mock.patch.object(resource.scheduler.TaskRunner, '__init__',
|
||||
return_value=None)
|
||||
@mock.patch.object(resource.scheduler.TaskRunner, '__call__')
|
||||
def test_create_convergence(self, mock_call, mock_init):
|
||||
def test_create_convergence(self):
|
||||
tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
|
||||
res = generic_rsrc.GenericResource('test_res', tmpl, self.stack)
|
||||
res.action = res.CREATE
|
||||
|
@ -1893,12 +1892,11 @@ class ResourceTest(common.HeatTestCase):
|
|||
(2, True): {u'id': 3, u'name': 'B', 'attrs': {}}}
|
||||
pcb = mock.Mock()
|
||||
|
||||
res.create_convergence(self.stack.t.id, res_data, 'engine-007',
|
||||
60, pcb)
|
||||
with mock.patch.object(resource.Resource, 'create') as mock_create:
|
||||
res.create_convergence(self.stack.t.id, res_data, 'engine-007',
|
||||
-1, pcb)
|
||||
self.assertTrue(mock_create.called)
|
||||
|
||||
mock_init.assert_called_once_with(res.create)
|
||||
mock_call.assert_called_once_with(timeout=60, progress_callback=pcb)
|
||||
self.assertEqual(self.stack.t.id, res.current_template_id)
|
||||
self.assertItemsEqual([1, 3], res.requires)
|
||||
self._assert_resource_lock(res.id, None, 2)
|
||||
|
||||
|
@ -1910,9 +1908,9 @@ class ResourceTest(common.HeatTestCase):
|
|||
res_data = {(1, True): {u'id': 1, u'name': 'A', 'attrs': {}},
|
||||
(2, True): {u'id': 3, u'name': 'B', 'attrs': {}}}
|
||||
|
||||
pcb = mock.Mock()
|
||||
self.assertRaises(scheduler.Timeout, res.create_convergence,
|
||||
self.stack.t.id, res_data, 'engine-007',
|
||||
-1)
|
||||
self.stack.t.id, res_data, 'engine-007', -1, pcb)
|
||||
|
||||
def test_create_convergence_sets_requires_for_failure(self):
|
||||
"""Ensure that requires are computed correctly.
|
||||
|
@ -1930,7 +1928,7 @@ class ResourceTest(common.HeatTestCase):
|
|||
(2, True): {u'id': 3, u'name': 'B', 'attrs': {}}}
|
||||
self.assertRaises(exception.ResourceNotAvailable,
|
||||
res.create_convergence, self.stack.t.id, res_data,
|
||||
'engine-007', self.dummy_timeout)
|
||||
'engine-007', self.dummy_timeout, self.dummy_event)
|
||||
self.assertItemsEqual([5, 3], res.requires)
|
||||
self._assert_resource_lock(res.id, None, 2)
|
||||
|
||||
|
@ -1945,9 +1943,10 @@ class ResourceTest(common.HeatTestCase):
|
|||
self._assert_resource_lock(res.id, None, None)
|
||||
res_data = {(1, True): {u'id': 5, u'name': 'A', 'attrs': {}},
|
||||
(2, True): {u'id': 3, u'name': 'B', 'attrs': {}}}
|
||||
res.create_convergence(self.stack.t.id, res_data, 'engine-007',
|
||||
self.dummy_timeout)
|
||||
|
||||
tr = scheduler.TaskRunner(res.create_convergence, self.stack.t.id,
|
||||
res_data, 'engine-007', self.dummy_timeout,
|
||||
self.dummy_event)
|
||||
tr()
|
||||
mock_adopt.assert_called_once_with(
|
||||
resource_data={'resource_id': 'fluffy'})
|
||||
self.assertItemsEqual([5, 3], res.requires)
|
||||
|
@ -1962,15 +1961,13 @@ class ResourceTest(common.HeatTestCase):
|
|||
self._assert_resource_lock(res.id, None, None)
|
||||
res_data = {(1, True): {u'id': 5, u'name': 'A', 'attrs': {}},
|
||||
(2, True): {u'id': 3, u'name': 'B', 'attrs': {}}}
|
||||
exc = self.assertRaises(exception.ResourceFailure,
|
||||
res.create_convergence, self.stack.t.id,
|
||||
res_data, 'engine-007', self.dummy_timeout)
|
||||
tr = scheduler.TaskRunner(res.create_convergence, self.stack.t.id,
|
||||
res_data, 'engine-007', self.dummy_timeout,
|
||||
self.dummy_event)
|
||||
exc = self.assertRaises(exception.ResourceFailure, tr)
|
||||
self.assertIn('Resource ID was not provided', six.text_type(exc))
|
||||
|
||||
@mock.patch.object(resource.scheduler.TaskRunner, '__init__',
|
||||
return_value=None)
|
||||
@mock.patch.object(resource.scheduler.TaskRunner, '__call__')
|
||||
def test_update_convergence(self, mock_call, mock_init):
|
||||
def test_update_convergence(self):
|
||||
tmpl = template.Template({
|
||||
'HeatTemplateFormatVersion': '2012-12-12',
|
||||
'Resources': {
|
||||
|
@ -1981,6 +1978,7 @@ class ResourceTest(common.HeatTestCase):
|
|||
stack.converge_stack(stack.t, action=stack.CREATE)
|
||||
res = stack.resources['test_res']
|
||||
res.requires = [2]
|
||||
res.action = res.CREATE
|
||||
res._store()
|
||||
self._assert_resource_lock(res.id, None, None)
|
||||
|
||||
|
@ -1997,13 +1995,13 @@ class ResourceTest(common.HeatTestCase):
|
|||
res_data = {(1, True): {u'id': 4, u'name': 'A', 'attrs': {}},
|
||||
(2, True): {u'id': 3, u'name': 'B', 'attrs': {}}}
|
||||
pcb = mock.Mock()
|
||||
res.update_convergence(new_temp.id, res_data, 'engine-007', 120,
|
||||
new_stack, pcb)
|
||||
with mock.patch.object(resource.Resource, 'update') as mock_update:
|
||||
tr = scheduler.TaskRunner(res.update_convergence, new_temp.id,
|
||||
res_data, 'engine-007', 120, new_stack,
|
||||
pcb)
|
||||
tr()
|
||||
self.assertTrue(mock_update.called)
|
||||
|
||||
expected_rsrc_def = new_temp.resource_definitions(self.stack)[res.name]
|
||||
mock_init.assert_called_once_with(res.update, expected_rsrc_def)
|
||||
mock_call.assert_called_once_with(timeout=120, progress_callback=pcb)
|
||||
self.assertEqual(new_temp.id, res.current_template_id)
|
||||
self.assertItemsEqual([3, 4], res.requires)
|
||||
self._assert_resource_lock(res.id, None, 2)
|
||||
|
||||
|
@ -2030,9 +2028,10 @@ class ResourceTest(common.HeatTestCase):
|
|||
new_temp, stack_id=self.stack.id)
|
||||
|
||||
res_data = {}
|
||||
self.assertRaises(scheduler.Timeout, res.update_convergence,
|
||||
new_temp.id, res_data, 'engine-007',
|
||||
-1, new_stack)
|
||||
tr = scheduler.TaskRunner(res.update_convergence, new_temp.id,
|
||||
res_data, 'engine-007', -1, new_stack,
|
||||
self.dummy_event)
|
||||
self.assertRaises(scheduler.Timeout, tr)
|
||||
|
||||
def test_update_convergence_with_substitute_class(self):
|
||||
tmpl = rsrc_defn.ResourceDefinition('test_res',
|
||||
|
@ -2073,9 +2072,10 @@ class ResourceTest(common.HeatTestCase):
|
|||
new_temp, stack_id=self.stack.id)
|
||||
|
||||
res_data = {}
|
||||
self.assertRaises(resource.UpdateReplace, res.update_convergence,
|
||||
new_temp.id, res_data, 'engine-007',
|
||||
-1, new_stack)
|
||||
tr = scheduler.TaskRunner(res.update_convergence, new_temp.id,
|
||||
res_data, 'engine-007', -1, new_stack,
|
||||
self.dummy_event)
|
||||
self.assertRaises(resource.UpdateReplace, tr)
|
||||
|
||||
def test_update_in_progress_convergence(self):
|
||||
tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
|
||||
|
@ -2088,12 +2088,10 @@ class ResourceTest(common.HeatTestCase):
|
|||
|
||||
res_data = {(1, True): {u'id': 4, u'name': 'A', 'attrs': {}},
|
||||
(2, True): {u'id': 3, u'name': 'B', 'attrs': {}}}
|
||||
ex = self.assertRaises(exception.UpdateInProgress,
|
||||
res.update_convergence,
|
||||
'template_key',
|
||||
res_data, 'engine-007',
|
||||
self.dummy_timeout,
|
||||
mock.ANY)
|
||||
tr = scheduler.TaskRunner(res.update_convergence, 'template_key',
|
||||
res_data, 'engine-007', self.dummy_timeout,
|
||||
mock.ANY, self.dummy_event)
|
||||
ex = self.assertRaises(exception.UpdateInProgress, tr)
|
||||
msg = ("The resource %s is already being updated." %
|
||||
res.name)
|
||||
self.assertEqual(msg, six.text_type(ex))
|
||||
|
@ -2130,9 +2128,10 @@ class ResourceTest(common.HeatTestCase):
|
|||
new_temp, stack_id=self.stack.id)
|
||||
dummy_ex = exception.ResourceFailure(exc, res, action=res.UPDATE)
|
||||
mock_update.side_effect = dummy_ex
|
||||
self.assertRaises(exception.ResourceFailure,
|
||||
res.update_convergence, new_temp.id, res_data,
|
||||
'engine-007', 120, new_stack)
|
||||
tr = scheduler.TaskRunner(res.update_convergence, new_temp.id,
|
||||
res_data, 'engine-007', 120, new_stack,
|
||||
self.dummy_event)
|
||||
self.assertRaises(exception.ResourceFailure, tr)
|
||||
|
||||
expected_rsrc_def = new_temp.resource_definitions(self.stack)[res.name]
|
||||
mock_update.assert_called_once_with(expected_rsrc_def)
|
||||
|
@ -2170,9 +2169,10 @@ class ResourceTest(common.HeatTestCase):
|
|||
mock_update.side_effect = resource.UpdateReplace
|
||||
new_stack = parser.Stack(utils.dummy_context(), 'test_stack',
|
||||
new_temp, stack_id=self.stack.id)
|
||||
self.assertRaises(resource.UpdateReplace,
|
||||
res.update_convergence, new_temp.id, res_data,
|
||||
'engine-007', 120, new_stack)
|
||||
tr = scheduler.TaskRunner(res.update_convergence, new_temp.id,
|
||||
res_data, 'engine-007', 120, new_stack,
|
||||
self.dummy_event)
|
||||
self.assertRaises(resource.UpdateReplace, tr)
|
||||
|
||||
expected_rsrc_def = new_temp.resource_definitions(self.stack)[res.name]
|
||||
mock_update.assert_called_once_with(expected_rsrc_def)
|
||||
|
@ -2202,7 +2202,7 @@ class ResourceTest(common.HeatTestCase):
|
|||
res.restore_prev_rsrc = mock.Mock()
|
||||
tr = scheduler.TaskRunner(res.update_convergence, 'new_tmpl_id', {},
|
||||
'engine-007', self.dummy_timeout,
|
||||
new_stack)
|
||||
new_stack, self.dummy_event)
|
||||
self.assertRaises(resource.UpdateReplace, tr)
|
||||
self.assertTrue(res.restore_prev_rsrc.called)
|
||||
|
||||
|
@ -2225,15 +2225,13 @@ class ResourceTest(common.HeatTestCase):
|
|||
'Simulate rollback')
|
||||
res.restore_prev_rsrc = mock.Mock(side_effect=Exception)
|
||||
tr = scheduler.TaskRunner(res.update_convergence, 'new_tmpl_id', {},
|
||||
'engine-007', self.dummy_timeout, new_stack)
|
||||
'engine-007', self.dummy_timeout, new_stack,
|
||||
self.dummy_event)
|
||||
self.assertRaises(exception.ResourceFailure, tr)
|
||||
self.assertTrue(res.restore_prev_rsrc.called)
|
||||
self.assertEqual((res.UPDATE, res.FAILED), res.state)
|
||||
|
||||
@mock.patch.object(resource.scheduler.TaskRunner, '__init__',
|
||||
return_value=None)
|
||||
@mock.patch.object(resource.scheduler.TaskRunner, '__call__')
|
||||
def test_delete_convergence_ok(self, mock_call, mock_init):
|
||||
def test_delete_convergence_ok(self):
|
||||
tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
|
||||
res = generic_rsrc.GenericResource('test_res', tmpl, self.stack)
|
||||
res.current_template_id = 1
|
||||
|
@ -2244,11 +2242,13 @@ class ResourceTest(common.HeatTestCase):
|
|||
res._update_replacement_data = mock.Mock()
|
||||
self._assert_resource_lock(res.id, None, None)
|
||||
pcb = mock.Mock()
|
||||
res.delete_convergence(2, {}, 'engine-007', 20, pcb)
|
||||
|
||||
mock_init.assert_called_once_with(res.delete)
|
||||
mock_call.assert_called_once_with(timeout=20, progress_callback=pcb)
|
||||
with mock.patch.object(resource.Resource, 'delete') as mock_delete:
|
||||
tr = scheduler.TaskRunner(res.delete_convergence, 2, {},
|
||||
'engine-007', 20, pcb)
|
||||
tr()
|
||||
self.assertTrue(mock_delete.called)
|
||||
self.assertTrue(res._update_replacement_data.called)
|
||||
self._assert_resource_lock(res.id, None, 2)
|
||||
|
||||
def test_delete_convergence_does_not_delete_same_template_resource(self):
|
||||
tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
|
||||
|
@ -2256,8 +2256,10 @@ class ResourceTest(common.HeatTestCase):
|
|||
res.current_template_id = 'same-template'
|
||||
res._store()
|
||||
res.delete = mock.Mock()
|
||||
res.delete_convergence('same-template', {}, 'engine-007',
|
||||
self.dummy_timeout)
|
||||
tr = scheduler.TaskRunner(res.delete_convergence, 'same-template', {},
|
||||
'engine-007', self.dummy_timeout,
|
||||
self.dummy_event)
|
||||
tr()
|
||||
self.assertFalse(res.delete.called)
|
||||
|
||||
def test_delete_convergence_fail(self):
|
||||
|
@ -2270,9 +2272,9 @@ class ResourceTest(common.HeatTestCase):
|
|||
res_id = res.id
|
||||
res.handle_delete = mock.Mock(side_effect=ValueError('test'))
|
||||
self._assert_resource_lock(res.id, None, None)
|
||||
self.assertRaises(exception.ResourceFailure,
|
||||
res.delete_convergence, 2, {}, 'engine-007',
|
||||
self.dummy_timeout)
|
||||
tr = scheduler.TaskRunner(res.delete_convergence, 2, {}, 'engine-007',
|
||||
self.dummy_timeout, self.dummy_event)
|
||||
self.assertRaises(exception.ResourceFailure, tr)
|
||||
self.assertTrue(res.handle_delete.called)
|
||||
|
||||
# confirm that the DB object still exists, and it's lock is released.
|
||||
|
@ -2291,9 +2293,10 @@ class ResourceTest(common.HeatTestCase):
|
|||
rs = resource_objects.Resource.get_obj(self.stack.context, res.id)
|
||||
rs.update_and_save({'engine_id': 'not-this'})
|
||||
self._assert_resource_lock(res.id, 'not-this', None)
|
||||
ex = self.assertRaises(exception.UpdateInProgress,
|
||||
res.delete_convergence,
|
||||
1, {}, 'engine-007', self.dummy_timeout)
|
||||
|
||||
tr = scheduler.TaskRunner(res.delete_convergence, 1, {}, 'engine-007',
|
||||
self.dummy_timeout, self.dummy_event)
|
||||
ex = self.assertRaises(exception.UpdateInProgress, tr)
|
||||
msg = ("The resource %s is already being updated." %
|
||||
res.name)
|
||||
self.assertEqual(msg, six.text_type(ex))
|
||||
|
@ -2308,7 +2311,9 @@ class ResourceTest(common.HeatTestCase):
|
|||
res.destroy = mock.Mock()
|
||||
input_data = {(1, False): 4, (2, False): 5} # needed_by resource ids
|
||||
self._assert_resource_lock(res.id, None, None)
|
||||
res.delete_convergence(1, input_data, 'engine-007', self.dummy_timeout)
|
||||
scheduler.TaskRunner(res.delete_convergence, 1, input_data,
|
||||
'engine-007', self.dummy_timeout,
|
||||
self.dummy_event)()
|
||||
self.assertItemsEqual([4, 5], res.needed_by)
|
||||
|
||||
@mock.patch.object(resource_objects.Resource, 'get_obj')
|
||||
|
@ -2449,7 +2454,9 @@ class ResourceTest(common.HeatTestCase):
|
|||
res._store()
|
||||
with mock.patch.object(resource_objects.Resource,
|
||||
'delete') as resource_del:
|
||||
res.delete_convergence(1, {}, 'engine-007', 1)
|
||||
tr = scheduler.TaskRunner(res.delete_convergence, 1, {},
|
||||
'engine-007', 1, self.dummy_event)
|
||||
tr()
|
||||
resource_del.assert_called_once_with(res.context, res.id)
|
||||
|
||||
def test_delete_convergence_throws_timeout(self):
|
||||
|
@ -2458,8 +2465,9 @@ class ResourceTest(common.HeatTestCase):
|
|||
res.action = res.CREATE
|
||||
res._store()
|
||||
timeout = -1 # to emulate timeout
|
||||
self.assertRaises(scheduler.Timeout, res.delete_convergence,
|
||||
1, {}, 'engine-007', timeout)
|
||||
tr = scheduler.TaskRunner(res.delete_convergence, 1, {}, 'engine-007',
|
||||
timeout, self.dummy_event)
|
||||
self.assertRaises(scheduler.Timeout, tr)
|
||||
|
||||
@mock.patch.object(parser.Stack, 'load')
|
||||
@mock.patch.object(resource.Resource, '_load_data')
|
||||
|
@ -4054,6 +4062,7 @@ class ResourceUpdateRestrictionTest(common.HeatTestCase):
|
|||
}
|
||||
}
|
||||
self.dummy_timeout = 10
|
||||
self.dummy_event = eventlet.event.Event()
|
||||
|
||||
def create_resource(self):
|
||||
self.stack = parser.Stack(utils.dummy_context(), 'test_stack',
|
||||
|
@ -4069,9 +4078,10 @@ class ResourceUpdateRestrictionTest(common.HeatTestCase):
|
|||
stack_id=str(uuid.uuid4()))
|
||||
res_data = {}
|
||||
res = self.stack['bar']
|
||||
pcb = mock.Mock()
|
||||
self.patchobject(res, 'lock')
|
||||
scheduler.TaskRunner(res.create_convergence, self.stack.t.id,
|
||||
res_data, 'engine-007', self.dummy_timeout)()
|
||||
res.create_convergence(self.stack.t.id, res_data, 'engine-007',
|
||||
self.dummy_timeout, pcb)
|
||||
return res
|
||||
|
||||
def test_update_restricted(self):
|
||||
|
@ -4193,7 +4203,8 @@ class ResourceUpdateRestrictionTest(common.HeatTestCase):
|
|||
{},
|
||||
'engine-007',
|
||||
self.dummy_timeout,
|
||||
self.new_stack))
|
||||
self.new_stack,
|
||||
eventlet.event.Event()))
|
||||
self.assertEqual('ResourceActionRestricted: resources.bar: '
|
||||
'replace is restricted for resource.',
|
||||
six.text_type(error))
|
||||
|
@ -4224,7 +4235,8 @@ class ResourceUpdateRestrictionTest(common.HeatTestCase):
|
|||
{},
|
||||
'engine-007',
|
||||
self.dummy_timeout,
|
||||
self.new_stack))
|
||||
self.new_stack,
|
||||
eventlet.event.Event()))
|
||||
self.assertIn('requires replacement', six.text_type(error))
|
||||
ev.assert_not_called()
|
||||
|
||||
|
|
Loading…
Reference in New Issue