diff --git a/heat/api/cfn/v1/signal.py b/heat/api/cfn/v1/signal.py index 1d7f8e1a94..dd8a2081a6 100644 --- a/heat/api/cfn/v1/signal.py +++ b/heat/api/cfn/v1/signal.py @@ -30,7 +30,8 @@ class SignalController(object): con, stack_identity=dict(identity.stack()), resource_name=identity.resource_name, - details=body) + details=body, + sync_call=True) except Exception as ex: return exception.map_remote_error(ex) diff --git a/heat/engine/service.py b/heat/engine/service.py index e1fa3d7c4b..62c9f6fd03 100644 --- a/heat/engine/service.py +++ b/heat/engine/service.py @@ -267,7 +267,7 @@ class EngineService(service.Service): by the RPC caller. """ - RPC_API_VERSION = '1.2' + RPC_API_VERSION = '1.3' def __init__(self, host, topic, manager=None): super(EngineService, self).__init__() @@ -1034,7 +1034,27 @@ class EngineService(service.Service): with_attr=with_attr) @request_context - def resource_signal(self, cnxt, stack_identity, resource_name, details): + def resource_signal(self, cnxt, stack_identity, resource_name, details, + sync_call=False): + ''' + :param sync_call: indicates whether a synchronized call behavior is + expected. This is reserved for CFN WaitCondition + implementation. + ''' + + def _resource_signal(rsrc, details): + stack = rsrc.stack + LOG.debug("signaling resource %s:%s" % (stack.name, rsrc.name)) + rsrc.signal(details) + + # Refresh the metadata for all other resources, since signals can + # update metadata which is used by other resources, e.g + # when signalling a WaitConditionHandle resource, and other + # resources may refer to WaitCondition Fn::GetAtt Data + for r in stack.dependencies: + if r.name != rsrc.name and r.id is not None: + r.metadata_update() + s = self._get_stack(cnxt, stack_identity) # This is not "nice" converting to the stored context here, @@ -1044,18 +1064,14 @@ class EngineService(service.Service): stack = parser.Stack.load(cnxt, stack=s, use_stored_context=True) self._verify_stack_resource(stack, resource_name) - if callable(stack[resource_name].signal): - stack[resource_name].signal(details) - - # Refresh the metadata for all other resources, since signals can - # update metadata which is used by other resources, e.g - # when signalling a WaitConditionHandle resource, and other - # resources may refer to WaitCondition Fn::GetAtt Data - for res in stack.dependencies: - if res.name != resource_name and res.id is not None: - res.metadata_update() - - return stack[resource_name].metadata_get() + rsrc = stack[resource_name] + if callable(rsrc.signal): + if sync_call: + _resource_signal(rsrc, details) + return rsrc.metadata_get() + else: + self.thread_group_mgr.start(stack.id, _resource_signal, + rsrc, details) @request_context def find_physical_resource(self, cnxt, physical_resource_id): diff --git a/heat/rpc/client.py b/heat/rpc/client.py index 2e803d9918..c6d8f3e614 100644 --- a/heat/rpc/client.py +++ b/heat/rpc/client.py @@ -406,7 +406,8 @@ class EngineClient(object): resource_name=resource_name, metadata=metadata)) - def resource_signal(self, ctxt, stack_identity, resource_name, details): + def resource_signal(self, ctxt, stack_identity, resource_name, details, + sync_call=False): """ Generate an alarm on the resource. :param ctxt: RPC context. @@ -417,7 +418,10 @@ class EngineClient(object): return self.call(ctxt, self.make_msg('resource_signal', stack_identity=stack_identity, resource_name=resource_name, - details=details)) + details=details, + sync_call=sync_call), + + version='1.3') def create_watch_data(self, ctxt, watch_name, stats_data): ''' diff --git a/heat/tests/test_api_openstack_v1.py b/heat/tests/test_api_openstack_v1.py index a374306fd5..2894912be3 100644 --- a/heat/tests/test_api_openstack_v1.py +++ b/heat/tests/test_api_openstack_v1.py @@ -2544,10 +2544,11 @@ class ResourceControllerTest(ControllerTest, common.HeatTestCase): self.m.StubOutWithMock(rpc_client.EngineClient, 'call') rpc_client.EngineClient.call( req.context, - ('resource_signal', - {'stack_identity': stack_identity, - 'resource_name': res_name, - 'details': 'Signal content'})) + ('resource_signal', {'stack_identity': stack_identity, + 'resource_name': res_name, + 'details': 'Signal content', + 'sync_call': False}), + version='1.3') self.m.ReplayAll() result = self.controller.signal(req, tenant_id=self.tenant, diff --git a/heat/tests/test_engine_service.py b/heat/tests/test_engine_service.py index 27cae21613..f714c18764 100644 --- a/heat/tests/test_engine_service.py +++ b/heat/tests/test_engine_service.py @@ -2158,9 +2158,9 @@ class StackServiceTest(common.HeatTestCase): def test_list_resource_types_deprecated(self): resources = self.eng.list_resource_types(self.ctx, "DEPRECATED") - self.assertEqual({'OS::Neutron::RouterGateway', - 'OS::Heat::CWLiteAlarm', - 'OS::Heat::HARestarter'}, set(resources)) + self.assertEqual(set(['OS::Neutron::RouterGateway', + 'OS::Heat::CWLiteAlarm', + 'OS::Heat::HARestarter']), set(resources)) def test_list_resource_types_supported(self): resources = self.eng.list_resource_types(self.ctx, "SUPPORTED") @@ -2463,7 +2463,42 @@ class StackServiceTest(common.HeatTestCase): self.m.VerifyAll() - def test_signal_reception(self): + def test_signal_reception_async(self): + stack = get_stack('signal_reception', + self.ctx, + policy_template) + self.stack = stack + setup_keystone_mocks(self.m, stack) + self.m.ReplayAll() + stack.store() + stack.create() + test_data = {'food': 'yum'} + + self.m.StubOutWithMock(service.EngineService, '_get_stack') + s = db_api.stack_get(self.ctx, self.stack.id) + service.EngineService._get_stack(self.ctx, + self.stack.identifier()).AndReturn(s) + + # Mock out the aync work of thread starting + self.eng.thread_group_mgr.groups[stack.id] = DummyThreadGroup() + self.m.StubOutWithMock(self.eng.thread_group_mgr, 'start') + self.eng.thread_group_mgr.start(stack.id, + mox.IgnoreArg(), + mox.IgnoreArg(), + mox.IgnoreArg()).AndReturn(None) + + self.m.ReplayAll() + + self.eng.resource_signal(self.ctx, + dict(self.stack.identifier()), + 'WebServerScaleDownPolicy', + test_data) + + self.m.VerifyAll() + + self.stack.delete() + + def test_signal_reception_sync(self): stack = get_stack('signal_reception', self.ctx, policy_template) @@ -2486,7 +2521,9 @@ class StackServiceTest(common.HeatTestCase): self.eng.resource_signal(self.ctx, dict(self.stack.identifier()), 'WebServerScaleDownPolicy', - test_data) + test_data, + sync_call=True) + self.m.VerifyAll() self.stack.delete() @@ -2570,7 +2607,9 @@ class StackServiceTest(common.HeatTestCase): md = self.eng.resource_signal(self.ctx, dict(self.stack.identifier()), 'WebServerScaleDownPolicy', None) - self.assertEqual(test_metadata, md) + + self.eng.thread_group_mgr.groups[stack.id].wait() + self.assertIsNone(md) self.m.VerifyAll() @stack_context('service_metadata_test_stack') diff --git a/heat/tests/test_metadata_refresh.py b/heat/tests/test_metadata_refresh.py index 06db2c0682..6ad7e69272 100644 --- a/heat/tests/test_metadata_refresh.py +++ b/heat/tests/test_metadata_refresh.py @@ -215,6 +215,7 @@ class WaitCondMetadataUpdateTest(common.HeatTestCase): self.patch('heat.engine.service.warnings') self.man = service.EngineService('a-host', 'a-topic') + self.man.create_periodic_tasks() cfg.CONF.set_default('heat_waitcondition_server_url', 'http://server.test:8000/v1/waitcondition') @@ -266,7 +267,8 @@ class WaitCondMetadataUpdateTest(common.HeatTestCase): dict(self.stack.identifier()), 'WH', {'Data': data, 'Reason': reason, - 'Status': 'SUCCESS', 'UniqueId': id}) + 'Status': 'SUCCESS', 'UniqueId': id}, + sync_call=True) def post_success(sleep_time): update_metadata('123', 'foo', 'bar') @@ -286,6 +288,7 @@ class WaitCondMetadataUpdateTest(common.HeatTestCase): self.assertEqual('{"123": "foo"}', inst.metadata_get()['test']) update_metadata('456', 'blarg', 'wibble') + self.assertEqual('{"123": "foo", "456": "blarg"}', watch.FnGetAtt('Data')) self.assertEqual('{"123": "foo"}', diff --git a/heat/tests/test_rpc_client.py b/heat/tests/test_rpc_client.py index f164513863..75e94d8b2a 100644 --- a/heat/tests/test_rpc_client.py +++ b/heat/tests/test_rpc_client.py @@ -247,7 +247,8 @@ class EngineRpcAPITestCase(testtools.TestCase): self._test_engine_api('resource_signal', 'call', stack_identity=self.identity, resource_name='LogicalResourceId', - details={u'wordpress': []}) + details={u'wordpress': []}, + sync_call=True) def test_create_watch_data(self): self._test_engine_api('create_watch_data', 'call',