diff --git a/heat/db/api.py b/heat/db/api.py index 347b936727..c930268f33 100644 --- a/heat/db/api.py +++ b/heat/db/api.py @@ -55,6 +55,10 @@ def raw_template_update(context, template_id, values): return IMPL.raw_template_update(context, template_id, values) +def raw_template_delete(context, template_id): + return IMPL.raw_template_delete(context, template_id) + + def resource_data_get_all(resource, data=None): return IMPL.resource_data_get_all(resource, data) diff --git a/heat/db/sqlalchemy/api.py b/heat/db/sqlalchemy/api.py index 135600f806..91ac654b66 100644 --- a/heat/db/sqlalchemy/api.py +++ b/heat/db/sqlalchemy/api.py @@ -116,6 +116,11 @@ def raw_template_update(context, template_id, values): return raw_template_ref +def raw_template_delete(context, template_id): + raw_template = raw_template_get(context, template_id) + raw_template.delete() + + def resource_get(context, resource_id): result = model_query(context, models.Resource).get(resource_id) diff --git a/heat/engine/resource.py b/heat/engine/resource.py index 580ff0edc8..9c76048d66 100644 --- a/heat/engine/resource.py +++ b/heat/engine/resource.py @@ -41,6 +41,7 @@ from heat.engine import resources from heat.engine import rsrc_defn from heat.engine import scheduler from heat.engine import support +from heat.engine import template from heat.objects import resource as resource_objects from heat.objects import resource_data as resource_data_objects from heat.rpc import client as rpc_client @@ -86,6 +87,12 @@ class ResourceUnknownStatus(exception.HeatException): result=result, status_reason=status_reason, **kwargs) +class UpdateInProgress(Exception): + def __init__(self, resource_name='Unknown'): + msg = _("The resource %s is already being updated.") % resource_name + super(Exception, self).__init__(six.text_type(msg)) + + class Resource(object): ACTIONS = ( INIT, CREATE, DELETE, UPDATE, ROLLBACK, @@ -228,6 +235,26 @@ class Resource(object): def stack(self, stack): self._stackref = weakref.ref(stack) + @classmethod + def load(cls, context, resource_id, data): + # FIXME(sirushtim): Import this in global space. + from heat.engine import stack as stack_mod + db_res = resource_objects.Resource.get_obj(context, resource_id) + stack = stack_mod.Stack.load(context, db_res.stack_id, cache_data=data) + # NOTE(sirushtim): Because on delete/cleanup operations, we simply + # update with another template, the stack object won't have the + # template of the previous stack-run. + tmpl = template.Template.load(context, db_res.current_template_id) + stack_res = tmpl.resource_definitions(stack)[db_res.name] + resource = cls(db_res.name, stack_res, stack) + resource._load_data(db_res) + return resource, stack + + def make_replacement(self): + # NOTE(sirushtim): Used for mocking. Will be complete + # once convergence-resource-replacement is implemented. + pass + def reparse(self): self.properties = self.t.properties(self.properties_schema, self.context) @@ -267,6 +294,13 @@ class Resource(object): rs.update_and_save({'rsrc_metadata': metadata}) self._rsrc_metadata = metadata + def clear_requirers(self, gone_requires): + self.requires = set(self.requires) - set(gone_requires) + self.requires = list(self.requires) + self._store_or_update(self.action, + self.status, + self.status_reason) + @classmethod def set_needed_by(cls, db_rsrc, needed_by): if db_rsrc: diff --git a/heat/engine/stack.py b/heat/engine/stack.py index d3129c43b3..fdc47002d9 100755 --- a/heat/engine/stack.py +++ b/heat/engine/stack.py @@ -44,12 +44,14 @@ from heat.engine import scheduler from heat.engine import sync_point from heat.engine import template as tmpl from heat.engine import update +from heat.objects import raw_template as raw_template_object from heat.objects import resource as resource_objects from heat.objects import snapshot as snapshot_object from heat.objects import stack as stack_object from heat.objects import stack_tag as stack_tag_object from heat.objects import user_creds as ucreds_object from heat.rpc import api as rpc_api +from heat.rpc import worker_client as rpc_worker_client cfg.CONF.import_opt('error_wait_time', 'heat.common.config') @@ -140,6 +142,7 @@ class Stack(collections.Mapping): self.prev_raw_template_id = prev_raw_template_id self.current_deps = current_deps self.cache_data = cache_data + self._worker_client = None if use_stored_context: self.context = self.stored_context() @@ -164,6 +167,13 @@ class Stack(collections.Mapping): else: self.outputs = {} + @property + def worker_client(self): + '''Return a client for making engine RPC calls.''' + if not self._worker_client: + self._worker_client = rpc_worker_client.WorkerClient() + return self._worker_client + @property def env(self): """This is a helper to allow resources to access stack.env.""" @@ -972,6 +982,10 @@ class Stack(collections.Mapping): LOG.info(_LI("Triggering resource %(rsrc_id)s " "for update=%(is_update)s"), {'rsrc_id': rsrc_id, 'is_update': is_update}) + self.worker_client.check_resource(self.context, rsrc_id, + self.current_traversal, + {}, is_update) + self.temp_update_requires(self.convergence_dependencies) def _update_or_store_resources(self): @@ -1583,3 +1597,29 @@ class Stack(collections.Mapping): def cache_data_resource_attribute(self, resource_name, attribute_key): return self.cache_data.get( resource_name, {}).get('attributes', {}).get(attribute_key) + + def mark_complete(self, traversal_id): + ''' + Mark the update as complete. + + This currently occurs when all resources have been updated; there may + still be resources being cleaned up, but the Stack should now be in + service. + ''' + if traversal_id != self.current_traversal: + return + + LOG.info('[%s(%s)] update traversal %s complete', + self.name, self.id, traversal_id) + + prev_prev_id = self.prev_raw_template_id + self.prev_raw_template_id = self.t.id + self.store() + + if (prev_prev_id is not None and + prev_prev_id != self.t.id): + raw_template_object.RawTemplate.delete(self.context, + prev_prev_id) + + reason = 'Stack %s completed successfully' % self.action + self.state_set(self.action, self.COMPLETE, reason) diff --git a/heat/engine/sync_point.py b/heat/engine/sync_point.py index dbb66222c8..c26d6de667 100644 --- a/heat/engine/sync_point.py +++ b/heat/engine/sync_point.py @@ -12,8 +12,26 @@ # See the License for the specific language governing permissions and # limitations under the License. +from oslo_log import log as logging +import six + +from heat.common.i18n import _ from heat.objects import sync_point as sync_point_object +LOG = logging.getLogger(__name__) + + +KEY_SEPERATOR = ':' + + +def _dump_list(items, separator=', '): + return separator.join(map(str, items)) + + +def make_key(*components): + assert len(components) >= 2 + return _dump_list(components, KEY_SEPERATOR) + def create(context, entity_id, traversal_id, is_update, stack_id): """ @@ -29,8 +47,14 @@ def get(context, entity_id, traversal_id, is_update): """ Retrieves a sync point entry from DB. """ - return sync_point_object.SyncPoint.get_by_key(context, entity_id, - traversal_id, is_update) + sync_point = sync_point_object.SyncPoint.get_by_key(context, entity_id, + traversal_id, + is_update) + if sync_point is None: + key = (entity_id, traversal_id, is_update) + raise SyncPointNotFound(key) + + return sync_point def delete_all(context, stack_id, traversal_id): @@ -40,3 +64,52 @@ def delete_all(context, stack_id, traversal_id): return sync_point_object.SyncPoint.delete_all_by_stack_and_traversal( context, stack_id, traversal_id ) + + +def update_input_data(context, entity_id, current_traversal, + is_update, atomic_key, input_data): + sync_point_object.SyncPoint.update_input_data( + context, entity_id, current_traversal, is_update, atomic_key, + input_data) + + +def deserialize_input_data(db_input_data): + db_input_data = db_input_data.get('input_data') + if not db_input_data: + return {} + + return {tuple(i): j for i, j in db_input_data} + + +def serialize_input_data(input_data): + return {'input_data': [[list(i), j] for i, j in six.iteritems(input_data)]} + + +def sync(cnxt, entity_id, current_traversal, is_update, propagate, + predecessors, new_data): + sync_point = get(cnxt, entity_id, current_traversal, + is_update) + input_data = dict(deserialize_input_data(sync_point.input_data)) + input_data.update(new_data) + waiting = predecessors - set(input_data) + + # Note: update must be atomic + update_input_data(cnxt, entity_id, current_traversal, + is_update, sync_point.atomic_key, + serialize_input_data(input_data)) + + key = make_key(entity_id, current_traversal, is_update) + if waiting: + LOG.debug('[%s] Waiting %s: Got %s; still need %s', + key, entity_id, _dump_list(input_data), _dump_list(waiting)) + else: + LOG.debug('[%s] Ready %s: Got %s', + key, entity_id, _dump_list(input_data)) + propagate(entity_id, input_data) + + +class SyncPointNotFound(Exception): + '''Raised when resource update requires replacement.''' + def __init__(self, sync_point): + msg = _("Sync Point %s not found") % (sync_point, ) + super(Exception, self).__init__(six.text_type(msg)) diff --git a/heat/engine/worker.py b/heat/engine/worker.py index 897a1c3509..3fcad93344 100644 --- a/heat/engine/worker.py +++ b/heat/engine/worker.py @@ -16,10 +16,16 @@ from oslo_log import log as logging import oslo_messaging from osprofiler import profiler +import six +from heat.common import context +from heat.common import exception from heat.common.i18n import _LE from heat.common.i18n import _LI from heat.common import messaging as rpc_messaging +from heat.engine import dependencies +from heat.engine import resource +from heat.engine import sync_point from heat.openstack.common import service from heat.rpc import worker_client as rpc_client @@ -36,7 +42,7 @@ class WorkerService(service.Service): or expect replies from these messages. """ - RPC_API_VERSION = '1.0' + RPC_API_VERSION = '1.1' def __init__(self, host, @@ -76,3 +82,146 @@ class WorkerService(service.Service): LOG.error(_LE("WorkerService is failed to stop, %s"), e) super(WorkerService, self).stop() + + @context.request_context + def check_resource(self, cnxt, resource_id, current_traversal, data, + is_update): + ''' + Process a node in the dependency graph. + + The node may be associated with either an update or a cleanup of its + associated resource. + ''' + try: + rsrc, stack = resource.Resource.load(cnxt, resource_id, data) + except exception.NotFound: + return + tmpl = stack.t + + if current_traversal != rsrc.stack.current_traversal: + LOG.debug('[%s] Traversal cancelled; stopping.', current_traversal) + return + + current_deps = ([tuple(i), (tuple(j) if j is not None else None)] + for i, j in rsrc.stack.current_deps['edges']) + deps = dependencies.Dependencies(edges=current_deps) + graph = deps.graph() + + if is_update: + if (rsrc.replaced_by is not None and + rsrc.current_template_id != tmpl.id): + return + + try: + check_resource_update(rsrc, tmpl.id, data) + except resource.UpdateReplace: + # NOTE(sirushtim): Implemented by spec + # convergence-resource-replacement. + rsrc.make_replacement() + return + except resource.UpdateInProgress: + return + + input_data = construct_input_data(rsrc) + else: + try: + check_resource_cleanup(rsrc, tmpl.id, data) + except resource.UpdateInProgress: + return + + graph_key = (rsrc.id, is_update) + if graph_key not in graph and rsrc.replaces is not None: + # If we are a replacement, impersonate the replaced resource for + # the purposes of calculating whether subsequent resources are + # ready, since everybody has to work from the same version of the + # graph. Our real resource ID is sent in the input_data, so the + # dependencies will get updated to point to this resource in time + # for the next traversal. + graph_key = (rsrc.replaces, is_update) + + try: + for req, fwd in deps.required_by(graph_key): + propagate_check_resource( + cnxt, self._rpc_client, req, current_traversal, + set(graph[(req, fwd)]), graph_key, + input_data if fwd else rsrc.id, fwd) + + check_stack_complete(cnxt, rsrc.stack, current_traversal, + rsrc.id, graph, is_update) + except sync_point.SyncPointNotFound: + # NOTE(sirushtim): Implemented by spec + # convergence-concurrent-workflow + pass + + +def construct_input_data(rsrc): + attributes = rsrc.stack.get_dep_attrs( + six.itervalues(rsrc.stack.resources), + rsrc.stack.outputs, + rsrc.name) + resolved_attributes = {attr: rsrc.FnGetAtt(attr) for attr in attributes} + input_data = {'id': rsrc.id, + 'name': rsrc.name, + 'physical_resource_id': rsrc.resource_id, + 'attrs': resolved_attributes} + return input_data + + +def check_stack_complete(cnxt, stack, current_traversal, sender, graph, + is_update): + ''' + Mark the stack complete if the update is complete. + + Complete is currently in the sense that all desired resources are in + service, not that superfluous ones have been cleaned up. + ''' + roots = set(key for (key, fwd), node in graph.items() + if not any(f for k, f in node.required_by())) + + if sender not in roots: + return + + def mark_complete(stack_id, data): + stack.mark_complete(current_traversal) + + sync_point.sync(cnxt, stack.id, current_traversal, is_update, + mark_complete, roots, {sender: None}) + + +def propagate_check_resource(cnxt, rpc_client, next_res_id, + current_traversal, predecessors, sender, + sender_data, is_update): + ''' + Trigger processing of a node if all of its dependencies are satisfied. + ''' + def do_check(entity_id, data): + rpc_client.check_resource(cnxt, entity_id, current_traversal, + data, is_update) + + sync_point.sync(cnxt, next_res_id, current_traversal, + is_update, do_check, predecessors, + {sender: sender_data}) + + +def check_resource_update(rsrc, template_id, data): + ''' + Create or update the Resource if appropriate. + ''' + input_data = {in_data.name: in_data for in_data in data.values()} + + if rsrc.resource_id is None: + rsrc.create(template_id, input_data) + else: + rsrc.update(template_id, input_data) + + +def check_resource_cleanup(rsrc, template_id, data): + ''' + Delete the Resource if appropriate. + ''' + # Clear out deleted resources from the requirers list + rsrc.clear_requirers(rsrc_id for rsrc_id, id in data.items() + if id is None) + + if rsrc.current_template_id != template_id: + rsrc.delete(template_id, data) diff --git a/heat/objects/raw_template.py b/heat/objects/raw_template.py index d83bf2eb13..af20200726 100644 --- a/heat/objects/raw_template.py +++ b/heat/objects/raw_template.py @@ -87,3 +87,7 @@ class RawTemplate( @classmethod def update_by_id(cls, context, template_id, values): return db_api.raw_template_update(context, template_id, values) + + @classmethod + def delete(cls, context, template_id): + return db_api.raw_template_delete(context, template_id) diff --git a/heat/rpc/worker_client.py b/heat/rpc/worker_client.py index 5855ddfacb..b8fa5f7ea8 100644 --- a/heat/rpc/worker_client.py +++ b/heat/rpc/worker_client.py @@ -27,6 +27,7 @@ class WorkerClient(object): API version history:: 1.0 - Initial version. + 1.1 - Added check_resource. ''' BASE_RPC_API_VERSION = '1.0' @@ -47,3 +48,10 @@ class WorkerClient(object): else: client = self._client client.cast(ctxt, method, **kwargs) + + def check_resource(self, ctxt, resource_id, + current_traversal, data, is_update): + self.cast(ctxt, self.make_msg( + 'check_resource', resource_id=resource_id, + current_traversal=current_traversal, data=data, + is_update=is_update)) diff --git a/heat/tests/db/test_sqlalchemy_api.py b/heat/tests/db/test_sqlalchemy_api.py index d529a68899..b9ab1d5fc2 100644 --- a/heat/tests/db/test_sqlalchemy_api.py +++ b/heat/tests/db/test_sqlalchemy_api.py @@ -1471,6 +1471,13 @@ class DBAPIRawTemplateTest(common.HeatTestCase): self.assertEqual(new_t, updated_tp.template) self.assertEqual(new_files, updated_tp.files) + def test_raw_template_delete(self): + t = template_format.parse(wp_template) + tp = create_raw_template(self.ctx, template=t) + db_api.raw_template_delete(self.ctx, tp.id) + self.assertRaises(exception.NotFound, db_api.raw_template_get, + self.ctx, tp.id) + class DBAPIUserCredsTest(common.HeatTestCase): def setUp(self): diff --git a/heat/tests/engine/tools.py b/heat/tests/engine/tools.py index cbc8c42bff..1571fea526 100644 --- a/heat/tests/engine/tools.py +++ b/heat/tests/engine/tools.py @@ -45,6 +45,45 @@ resources: UserData: wordpress ''' +string_template_five = ''' +heat_template_version: 2013-05-23 +description: Random String templates + +parameters: + salt: + type: string + default: "quickbrownfox" + +resources: + A: + type: OS::Heat::RandomString + properties: + salt: {get_param: salt} + + B: + type: OS::Heat::RandomString + properties: + salt: {get_param: salt} + + C: + type: OS::Heat::RandomString + depends_on: [A, B] + properties: + salt: {get_attr: [A, value]} + + D: + type: OS::Heat::RandomString + depends_on: C + properties: + salt: {get_param: salt} + + E: + type: OS::Heat::RandomString + depends_on: C + properties: + salt: {get_param: salt} +''' + def get_stack(stack_name, ctx, template=None, with_params=True, convergence=False): diff --git a/heat/tests/test_engine_service.py b/heat/tests/test_engine_service.py index a482c2ea36..87e95d373f 100644 --- a/heat/tests/test_engine_service.py +++ b/heat/tests/test_engine_service.py @@ -51,6 +51,7 @@ from heat.objects import watch_rule as watch_rule_object from heat.openstack.common import threadgroup from heat.rpc import api as rpc_api from heat.rpc import worker_api +from heat.rpc import worker_client from heat.tests import common from heat.tests.engine import tools from heat.tests import generic_resource as generic_rsrc @@ -61,45 +62,6 @@ cfg.CONF.import_opt('engine_life_check_timeout', 'heat.common.config') cfg.CONF.import_opt('enable_stack_abandon', 'heat.common.config') -string_template_five = ''' -heat_template_version: 2013-05-23 -description: Random String templates - -parameters: - salt: - type: string - default: "quickbrownfox" - -resources: - A: - type: OS::Heat::RandomString - properties: - salt: {get_param: salt} - - B: - type: OS::Heat::RandomString - properties: - salt: {get_param: salt} - - C: - type: OS::Heat::RandomString - depends_on: [A, B] - properties: - salt: {get_param: salt} - - D: - type: OS::Heat::RandomString - depends_on: C - properties: - salt: {get_param: salt} - - E: - type: OS::Heat::RandomString - depends_on: C - properties: - salt: {get_param: salt} -''' - string_template_five_update = ''' heat_template_version: 2013-05-23 description: Random String templates @@ -231,14 +193,16 @@ resources: ''' +@mock.patch.object(worker_client.WorkerClient, 'check_resource') class StackConvergenceCreateUpdateDeleteTest(common.HeatTestCase): def setUp(self): super(StackConvergenceCreateUpdateDeleteTest, self).setUp() cfg.CONF.set_override('convergence_engine', True) - def test_conv_wordpress_single_instance_stack_create(self): + def test_conv_wordpress_single_instance_stack_create(self, mock_cr): stack = tools.get_stack('test_stack', utils.dummy_context(), convergence=True) + stack.converge_stack(template=stack.t, action=stack.CREATE) self.assertIsNone(stack.ext_rsrcs_db) self.assertEqual('Dependencies([((1, True), None)])', @@ -252,11 +216,20 @@ class StackConvergenceCreateUpdateDeleteTest(common.HeatTestCase): self.assertEqual(stack_db.convergence, True) self.assertEqual({'edges': [[[1, True], None]]}, stack_db.current_deps) + leaves = stack.convergence_dependencies.leaves() + expected_calls = [] + for rsrc_id, is_update in leaves: + expected_calls.append( + mock.call.worker_client.WorkerClient.check_resource( + stack.context, rsrc_id, stack.current_traversal, {}, + is_update)) + self.assertEqual(expected_calls, mock_cr.mock_calls) - def test_conv_string_five_instance_stack_create(self): + def test_conv_string_five_instance_stack_create(self, mock_cr): stack = tools.get_stack('test_stack', utils.dummy_context(), - template=string_template_five, + template=tools.string_template_five, convergence=True) + stack.converge_stack(template=stack.t, action=stack.CREATE) self.assertIsNone(stack.ext_rsrcs_db) self.assertEqual('Dependencies([' @@ -299,9 +272,18 @@ class StackConvergenceCreateUpdateDeleteTest(common.HeatTestCase): self.assertIsNotNone(sync_point) self.assertEqual(stack_db.id, sync_point.stack_id) - def test_conv_string_five_instance_stack_update(self): + leaves = stack.convergence_dependencies.leaves() + expected_calls = [] + for rsrc_id, is_update in leaves: + expected_calls.append( + mock.call.worker_client.WorkerClient.check_resource( + stack.context, rsrc_id, stack.current_traversal, {}, + is_update)) + self.assertEqual(expected_calls, mock_cr.mock_calls) + + def test_conv_string_five_instance_stack_update(self, mock_cr): stack = tools.get_stack('test_stack', utils.dummy_context(), - template=string_template_five, + template=tools.string_template_five, convergence=True) # create stack stack.converge_stack(template=stack.t, action=stack.CREATE) @@ -401,9 +383,25 @@ class StackConvergenceCreateUpdateDeleteTest(common.HeatTestCase): self.assertIsNotNone(sync_point) self.assertEqual(stack_db.id, sync_point.stack_id) - def test_conv_empty_template_stack_update_delete(self): + leaves = stack.convergence_dependencies.leaves() + expected_calls = [] + for rsrc_id, is_update in leaves: + expected_calls.append( + mock.call.worker_client.WorkerClient.check_resource( + stack.context, rsrc_id, stack.current_traversal, {}, + is_update)) + + leaves = curr_stack.convergence_dependencies.leaves() + for rsrc_id, is_update in leaves: + expected_calls.append( + mock.call.worker_client.WorkerClient.check_resource( + curr_stack.context, rsrc_id, curr_stack.current_traversal, + {}, is_update)) + self.assertEqual(expected_calls, mock_cr.mock_calls) + + def test_conv_empty_template_stack_update_delete(self, mock_cr): stack = tools.get_stack('test_stack', utils.dummy_context(), - template=string_template_five, + template=tools.string_template_five, convergence=True) # create stack stack.converge_stack(template=stack.t, action=stack.CREATE) @@ -458,6 +456,22 @@ class StackConvergenceCreateUpdateDeleteTest(common.HeatTestCase): self.assertIsNotNone(sync_point) self.assertEqual(stack_db.id, sync_point.stack_id) + leaves = stack.convergence_dependencies.leaves() + expected_calls = [] + for rsrc_id, is_update in leaves: + expected_calls.append( + mock.call.worker_client.WorkerClient.check_resource( + stack.context, rsrc_id, stack.current_traversal, {}, + is_update)) + + leaves = curr_stack.convergence_dependencies.leaves() + for rsrc_id, is_update in leaves: + expected_calls.append( + mock.call.worker_client.WorkerClient.check_resource( + curr_stack.context, rsrc_id, curr_stack.current_traversal, + {}, is_update)) + self.assertEqual(expected_calls, mock_cr.mock_calls) + class StackCreateTest(common.HeatTestCase): def setUp(self): @@ -1342,7 +1356,7 @@ class StackConvergenceServiceCreateUpdateTest(common.HeatTestCase): template = '{ "Template": "data" }' stack = tools.get_stack(stack_name, self.ctx, - template=string_template_five, + template=tools.string_template_five, convergence=True) self.m.StubOutWithMock(templatem, 'Template') @@ -1382,7 +1396,7 @@ class StackConvergenceServiceCreateUpdateTest(common.HeatTestCase): params = {'foo': 'bar'} template = '{ "Template": "data" }' old_stack = tools.get_stack(stack_name, self.ctx, - template=string_template_five, + template=tools.string_template_five, convergence=True) sid = old_stack.store() s = stack_object.Stack.get_by_id(self.ctx, sid) diff --git a/heat/tests/test_engine_worker.py b/heat/tests/test_engine_worker.py index 33720b8c38..1c061feab6 100644 --- a/heat/tests/test_engine_worker.py +++ b/heat/tests/test_engine_worker.py @@ -15,22 +15,28 @@ import mock +from heat.engine import resource +from heat.engine import stack +from heat.engine import sync_point from heat.engine import worker +from heat.rpc import worker_client from heat.tests import common +from heat.tests.engine import tools +from heat.tests import utils class WorkerServiceTest(common.HeatTestCase): def setUp(self): super(WorkerServiceTest, self).setUp() - thread_gruop_mgr = mock.Mock() + thread_group_mgr = mock.Mock() self.worker = worker.WorkerService('host-1', 'topic-1', 'engine_id', - thread_gruop_mgr) + thread_group_mgr) def test_make_sure_rpc_version(self): self.assertEqual( - '1.0', + '1.1', worker.WorkerService.RPC_API_VERSION, ('RPC version is changed, please update this test to new version ' 'and make sure additional test cases are added for RPC APIs ' @@ -80,3 +86,223 @@ class WorkerServiceTest(common.HeatTestCase): self.worker.stop() mock_rpc_server.stop.assert_called_once_with() mock_rpc_server.wait.assert_called_once_with() + + +@mock.patch.object(worker, 'construct_input_data') +@mock.patch.object(worker, 'check_stack_complete') +@mock.patch.object(worker, 'propagate_check_resource') +@mock.patch.object(worker, 'check_resource_cleanup') +@mock.patch.object(worker, 'check_resource_update') +class CheckWorkflowUpdateTest(common.HeatTestCase): + @mock.patch.object(worker_client.WorkerClient, 'check_resource', + lambda *_: None) + def setUp(self): + super(CheckWorkflowUpdateTest, self).setUp() + thread_group_mgr = mock.Mock() + self.worker = worker.WorkerService('host-1', + 'topic-1', + 'engine_id', + thread_group_mgr) + self.worker._rpc_client = worker_client.WorkerClient() + self.ctx = utils.dummy_context() + self.stack = tools.get_stack( + 'check_workflow_create_stack', self.ctx, + template=tools.string_template_five, convergence=True) + self.stack.converge_stack(self.stack.t) + self.resource = self.stack['A'] + self.is_update = True + self.graph_key = (self.resource.id, self.is_update) + + def test_resource_not_available( + self, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid): + self.worker.check_resource( + self.ctx, 'non-existant-id', self.stack.current_traversal, {}, + True) + for mocked in [mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid]: + self.assertFalse(mocked.called) + + def test_stale_traversal( + self, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid): + self.worker.check_resource(self.ctx, self.resource.id, + 'stale-traversal', {}, True) + for mocked in [mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid]: + self.assertFalse(mocked.called) + + def test_is_update_traversal( + self, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid): + self.worker.check_resource( + self.ctx, self.resource.id, self.stack.current_traversal, {}, + self.is_update) + mock_cru.assert_called_once_with(self.resource, + self.resource.stack.t.id, + {}) + self.assertFalse(mock_crc.called) + + expected_calls = [] + for req, fwd in self.stack.convergence_dependencies.leaves(): + expected_calls.append( + (mock.call.worker.propagate_check_resource. + assert_called_once_with( + self.ctx, mock.ANY, mock.ANY, + self.stack.current_traversal, mock.ANY, + self.graph_key, {}, self.is_update))) + mock_csc.assert_called_once_with( + self.ctx, mock.ANY, self.stack.current_traversal, + self.resource.id, + mock.ANY, True) + + @mock.patch.object(resource.Resource, 'make_replacement') + def test_is_update_traversal_raise_update_replace( + self, mock_mr, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid): + mock_cru.side_effect = resource.UpdateReplace + self.worker.check_resource( + self.ctx, self.resource.id, self.stack.current_traversal, {}, + self.is_update) + mock_cru.assert_called_once_with(self.resource, + self.resource.stack.t.id, + {}) + self.assertTrue(mock_mr.called) + self.assertFalse(mock_crc.called) + self.assertFalse(mock_pcr.called) + self.assertFalse(mock_csc.called) + + @mock.patch.object(resource.Resource, 'make_replacement') + def test_is_update_traversal_raise_update_inprogress( + self, mock_mr, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid): + mock_cru.side_effect = resource.UpdateInProgress + self.worker.check_resource( + self.ctx, self.resource.id, self.stack.current_traversal, {}, + self.is_update) + mock_cru.assert_called_once_with(self.resource, + self.resource.stack.t.id, + {}) + self.assertFalse(mock_mr.called) + self.assertFalse(mock_crc.called) + self.assertFalse(mock_pcr.called) + self.assertFalse(mock_csc.called) + + +@mock.patch.object(worker, 'construct_input_data') +@mock.patch.object(worker, 'check_stack_complete') +@mock.patch.object(worker, 'propagate_check_resource') +@mock.patch.object(worker, 'check_resource_cleanup') +@mock.patch.object(worker, 'check_resource_update') +class CheckWorkflowCleanupTest(common.HeatTestCase): + @mock.patch.object(worker_client.WorkerClient, 'check_resource', + lambda *_: None) + def setUp(self): + super(CheckWorkflowCleanupTest, self).setUp() + thread_group_mgr = mock.Mock() + self.worker = worker.WorkerService('host-1', + 'topic-1', + 'engine_id', + thread_group_mgr) + self.worker._rpc_client = worker_client.WorkerClient() + self.ctx = utils.dummy_context() + tstack = tools.get_stack( + 'check_workflow_create_stack', self.ctx, + template=tools.string_template_five, convergence=True) + tstack.converge_stack(tstack.t, action=tstack.CREATE) + self.stack = stack.Stack.load(self.ctx, stack_id=tstack.id) + self.stack.converge_stack(self.stack.t, action=self.stack.DELETE) + self.resource = self.stack['A'] + self.is_update = False + self.graph_key = (self.resource.id, self.is_update) + + def test_is_cleanup_traversal( + self, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid): + self.worker.check_resource( + self.ctx, self.resource.id, self.stack.current_traversal, {}, + self.is_update) + self.assertFalse(mock_cru.called) + mock_crc.assert_called_once_with( + self.resource, self.resource.stack.t.id, + {}) + + def test_is_cleanup_traversal_raise_update_inprogress( + self, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid): + mock_crc.side_effect = resource.UpdateInProgress + self.worker.check_resource( + self.ctx, self.resource.id, self.stack.current_traversal, {}, + self.is_update) + mock_crc.assert_called_once_with(self.resource, + self.resource.stack.t.id, + {}) + self.assertFalse(mock_cru.called) + self.assertFalse(mock_pcr.called) + self.assertFalse(mock_csc.called) + + +class MiscMethodsTest(common.HeatTestCase): + def setUp(self): + super(MiscMethodsTest, self).setUp() + self.ctx = utils.dummy_context() + self.stack = tools.get_stack( + 'check_workflow_create_stack', self.ctx, + template=tools.string_template_five, convergence=True) + self.stack.converge_stack(self.stack.t) + self.resource = self.stack['A'] + + def test_construct_input_data(self): + expected_input_data = {'attrs': {'value': None}, + 'id': mock.ANY, + 'physical_resource_id': None, + 'name': 'A'} + actual_input_data = worker.construct_input_data(self.resource) + self.assertEqual(expected_input_data, actual_input_data) + + @mock.patch.object(sync_point, 'sync') + def test_check_stack_complete_root(self, mock_sync): + worker.check_stack_complete( + self.ctx, self.stack, self.stack.current_traversal, + self.stack['E'].id, self.stack.convergence_dependencies.graph(), + True) + mock_sync.assert_called_once_with( + self.ctx, self.stack.id, self.stack.current_traversal, True, + mock.ANY, mock.ANY, {self.stack['E'].id: None}) + + @mock.patch.object(sync_point, 'sync') + def test_check_stack_complete_child(self, mock_sync): + worker.check_stack_complete( + self.ctx, self.stack, self.stack.current_traversal, + self.resource.id, self.stack.convergence_dependencies.graph(), + True) + self.assertFalse(mock_sync.called) + + @mock.patch.object(sync_point, 'sync') + def test_propagate_check_resource(self, mock_sync): + worker.propagate_check_resource( + self.ctx, mock.ANY, mock.ANY, + self.stack.current_traversal, mock.ANY, + mock.ANY, {}, True) + self.assertTrue(mock_sync.called) + + @mock.patch.object(resource.Resource, 'create') + def test_check_resource_update_create(self, mock_create): + worker.check_resource_update(self.resource, self.resource.stack.t.id, + {}) + self.assertTrue(mock_create.called) + + @mock.patch.object(resource.Resource, 'update') + def test_check_resource_update_update(self, mock_update): + self.resource.resource_id = 'physical-res-id' + worker.check_resource_update(self.resource, self.resource.stack.t.id, + {}) + self.assertTrue(mock_update.called) + + @mock.patch.object(resource.Resource, 'delete') + @mock.patch.object(resource.Resource, 'clear_requirers') + def test_check_resource_cleanup_delete(self, mock_cr, mock_delete): + self.resource.current_template_id = 'new-template-id' + worker.check_resource_cleanup(self.resource, self.resource.stack.t.id, + {}) + self.assertTrue(mock_cr.called) + self.assertTrue(mock_delete.called) + + @mock.patch.object(resource.Resource, 'delete') + @mock.patch.object(resource.Resource, 'clear_requirers') + def test_check_resource_cleanup_nodelete(self, mock_cr, mock_delete): + worker.check_resource_cleanup(self.resource, self.resource.stack.t.id, + {}) + self.assertTrue(mock_cr.called) + self.assertFalse(mock_delete.called) diff --git a/heat/tests/test_resource.py b/heat/tests/test_resource.py index debe3addd0..cc0ad59538 100644 --- a/heat/tests/test_resource.py +++ b/heat/tests/test_resource.py @@ -85,6 +85,21 @@ class ResourceTest(common.HeatTestCase): self.assertIsInstance(res, generic_rsrc.GenericResource) self.assertEqual("INIT", res.action) + def test_resource_load_with_state(self): + self.stack = parser.Stack(utils.dummy_context(), 'test_stack', + template.Template(empty_template)) + self.stack.store() + snippet = rsrc_defn.ResourceDefinition('aresource', + 'GenericResourceType') + # Store Resource + res = resource.Resource('aresource', snippet, self.stack) + res.current_template_id = self.stack.t.id + res.state_set('CREATE', 'IN_PROGRESS') + self.stack.add_resource(res) + loaded_res, stack = resource.Resource.load(self.stack.context, + res.id, {}) + self.assertEqual(loaded_res.id, res.id) + def test_resource_invalid_name(self): snippet = rsrc_defn.ResourceDefinition('wrong/name', 'GenericResourceType') diff --git a/heat/tests/test_stack.py b/heat/tests/test_stack.py index 89d6bc4ead..2e631681ce 100644 --- a/heat/tests/test_stack.py +++ b/heat/tests/test_stack.py @@ -32,6 +32,7 @@ from heat.engine import resource from heat.engine import scheduler from heat.engine import stack from heat.engine import template +from heat.objects import raw_template as raw_template_object from heat.objects import stack as stack_object from heat.objects import stack_tag as stack_tag_object from heat.objects import user_creds as ucreds_object @@ -2036,6 +2037,62 @@ class StackTest(common.HeatTestCase): self.assertEqual('foo', params.get('param1')) self.assertEqual('bar', params.get('param2')) + @mock.patch.object(raw_template_object.RawTemplate, 'delete') + def test_mark_complete_create(self, mock_delete): + tmpl = template.Template({ + 'HeatTemplateFormatVersion': '2012-12-12', + 'Resources': { + 'foo': {'Type': 'GenericResourceType'} + } + }) + + tmpl_stack = stack.Stack(self.ctx, 'test', tmpl) + tmpl_stack.store() + tmpl_stack.current_traversal = 'some-traversal' + tmpl_stack.mark_complete('some-traversal') + self.assertEqual(tmpl_stack.prev_raw_template_id, + tmpl_stack.t.id) + self.assertFalse(mock_delete.called) + self.assertEqual(tmpl_stack.status, tmpl_stack.COMPLETE) + + @mock.patch.object(raw_template_object.RawTemplate, 'delete') + @mock.patch.object(stack.Stack, 'store') + def test_mark_complete_update(self, mock_store, mock_delete): + tmpl = template.Template({ + 'HeatTemplateFormatVersion': '2012-12-12', + 'Resources': { + 'foo': {'Type': 'GenericResourceType'} + } + }) + + tmpl_stack = stack.Stack(self.ctx, 'test', tmpl) + tmpl_stack.id = 2 + tmpl_stack.t.id = 2 + tmpl_stack.prev_raw_template_id = 1 + tmpl_stack.current_traversal = 'some-traversal' + tmpl_stack.mark_complete('some-traversal') + self.assertEqual(tmpl_stack.prev_raw_template_id, + tmpl_stack.t.id) + mock_delete.assert_called_once_with(self.ctx, 1) + self.assertEqual(tmpl_stack.status, tmpl_stack.COMPLETE) + + @mock.patch.object(raw_template_object.RawTemplate, 'delete') + @mock.patch.object(stack.Stack, 'store') + def test_mark_complete_stale_traversal(self, mock_store, mock_delete): + tmpl = template.Template({ + 'HeatTemplateFormatVersion': '2012-12-12', + 'Resources': { + 'foo': {'Type': 'GenericResourceType'} + } + }) + + tmpl_stack = stack.Stack(self.ctx, 'test', tmpl) + tmpl_stack.current_traversal = 'new-traversal' + tmpl_stack.mark_complete('old-traversal') + self.assertFalse(mock_delete.called) + self.assertIsNone(tmpl_stack.prev_raw_template_id) + self.assertFalse(mock_store.called) + class StackKwargsForCloningTest(common.HeatTestCase): scenarios = [ diff --git a/heat/tests/test_sync_point.py b/heat/tests/test_sync_point.py new file mode 100644 index 0000000000..54c553a811 --- /dev/null +++ b/heat/tests/test_sync_point.py @@ -0,0 +1,64 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock + +from heat.engine import sync_point +from heat.tests import common +from heat.tests.engine import tools +from heat.tests import utils + + +class SyncPointTestCase(common.HeatTestCase): + def test_sync_waiting(self): + ctx = utils.dummy_context() + stack = tools.get_stack('test_stack', utils.dummy_context(), + template=tools.string_template_five, + convergence=True) + stack.converge_stack(stack.t, action=stack.CREATE) + resource = stack['C'] + graph = stack.convergence_dependencies.graph() + + sender = (4, True) + mock_callback = mock.Mock() + sync_point.sync(ctx, resource.id, stack.current_traversal, True, + mock_callback, set(graph[(resource.id, True)]), + {sender: None}) + updated_sync_point = sync_point.get(ctx, resource.id, + stack.current_traversal, True) + input_data = sync_point.deserialize_input_data( + updated_sync_point.input_data) + self.assertEqual({sender: None}, input_data) + self.assertFalse(mock_callback.called) + + def test_sync_non_waiting(self): + ctx = utils.dummy_context() + stack = tools.get_stack('test_stack', utils.dummy_context(), + template=tools.string_template_five, + convergence=True) + stack.converge_stack(stack.t, action=stack.CREATE) + resource = stack['A'] + graph = stack.convergence_dependencies.graph() + + sender = (3, True) + mock_callback = mock.Mock() + sync_point.sync(ctx, resource.id, stack.current_traversal, True, + mock_callback, set(graph[(resource.id, True)]), + {sender: None}) + updated_sync_point = sync_point.get(ctx, resource.id, + stack.current_traversal, True) + input_data = sync_point.deserialize_input_data( + updated_sync_point.input_data) + self.assertEqual({sender: None}, input_data) + self.assertTrue(mock_callback.called)