Enable resource signals be handled asynchronously

This is an attempt to fix bug #1394095 which reported that alarm_url can
be signaled only in a blocked way. The reason is that the resource_signal
API was a synchronous call. To avoid breakng CFN/OpenStack WaitCondition
behavior, a new 'sync_call' argument is added which default to False.

A not-so-related fix to test_engine_service.py (L2161-2163) was included
in this patch because without that fix, I was not able to pass the test
cases. It was so trivial, so I don't think it deserves a separate patch.

Closes-Bug: 1394095
Change-Id: I45e094630ef01b34d09248dbac17ac477933ef53
This commit is contained in:
tengqm 2015-01-04 16:02:32 +08:00
parent e930bc2887
commit 37ef669705
7 changed files with 94 additions and 29 deletions

View File

@ -30,7 +30,8 @@ class SignalController(object):
con,
stack_identity=dict(identity.stack()),
resource_name=identity.resource_name,
details=body)
details=body,
sync_call=True)
except Exception as ex:
return exception.map_remote_error(ex)

View File

@ -267,7 +267,7 @@ class EngineService(service.Service):
by the RPC caller.
"""
RPC_API_VERSION = '1.2'
RPC_API_VERSION = '1.3'
def __init__(self, host, topic, manager=None):
super(EngineService, self).__init__()
@ -1034,7 +1034,27 @@ class EngineService(service.Service):
with_attr=with_attr)
@request_context
def resource_signal(self, cnxt, stack_identity, resource_name, details):
def resource_signal(self, cnxt, stack_identity, resource_name, details,
sync_call=False):
'''
:param sync_call: indicates whether a synchronized call behavior is
expected. This is reserved for CFN WaitCondition
implementation.
'''
def _resource_signal(rsrc, details):
stack = rsrc.stack
LOG.debug("signaling resource %s:%s" % (stack.name, rsrc.name))
rsrc.signal(details)
# Refresh the metadata for all other resources, since signals can
# update metadata which is used by other resources, e.g
# when signalling a WaitConditionHandle resource, and other
# resources may refer to WaitCondition Fn::GetAtt Data
for r in stack.dependencies:
if r.name != rsrc.name and r.id is not None:
r.metadata_update()
s = self._get_stack(cnxt, stack_identity)
# This is not "nice" converting to the stored context here,
@ -1044,18 +1064,14 @@ class EngineService(service.Service):
stack = parser.Stack.load(cnxt, stack=s, use_stored_context=True)
self._verify_stack_resource(stack, resource_name)
if callable(stack[resource_name].signal):
stack[resource_name].signal(details)
# Refresh the metadata for all other resources, since signals can
# update metadata which is used by other resources, e.g
# when signalling a WaitConditionHandle resource, and other
# resources may refer to WaitCondition Fn::GetAtt Data
for res in stack.dependencies:
if res.name != resource_name and res.id is not None:
res.metadata_update()
return stack[resource_name].metadata_get()
rsrc = stack[resource_name]
if callable(rsrc.signal):
if sync_call:
_resource_signal(rsrc, details)
return rsrc.metadata_get()
else:
self.thread_group_mgr.start(stack.id, _resource_signal,
rsrc, details)
@request_context
def find_physical_resource(self, cnxt, physical_resource_id):

View File

@ -406,7 +406,8 @@ class EngineClient(object):
resource_name=resource_name,
metadata=metadata))
def resource_signal(self, ctxt, stack_identity, resource_name, details):
def resource_signal(self, ctxt, stack_identity, resource_name, details,
sync_call=False):
"""
Generate an alarm on the resource.
:param ctxt: RPC context.
@ -417,7 +418,10 @@ class EngineClient(object):
return self.call(ctxt, self.make_msg('resource_signal',
stack_identity=stack_identity,
resource_name=resource_name,
details=details))
details=details,
sync_call=sync_call),
version='1.3')
def create_watch_data(self, ctxt, watch_name, stats_data):
'''

View File

@ -2544,10 +2544,11 @@ class ResourceControllerTest(ControllerTest, common.HeatTestCase):
self.m.StubOutWithMock(rpc_client.EngineClient, 'call')
rpc_client.EngineClient.call(
req.context,
('resource_signal',
{'stack_identity': stack_identity,
('resource_signal', {'stack_identity': stack_identity,
'resource_name': res_name,
'details': 'Signal content'}))
'details': 'Signal content',
'sync_call': False}),
version='1.3')
self.m.ReplayAll()
result = self.controller.signal(req, tenant_id=self.tenant,

View File

@ -2158,9 +2158,9 @@ class StackServiceTest(common.HeatTestCase):
def test_list_resource_types_deprecated(self):
resources = self.eng.list_resource_types(self.ctx, "DEPRECATED")
self.assertEqual({'OS::Neutron::RouterGateway',
self.assertEqual(set(['OS::Neutron::RouterGateway',
'OS::Heat::CWLiteAlarm',
'OS::Heat::HARestarter'}, set(resources))
'OS::Heat::HARestarter']), set(resources))
def test_list_resource_types_supported(self):
resources = self.eng.list_resource_types(self.ctx, "SUPPORTED")
@ -2463,7 +2463,42 @@ class StackServiceTest(common.HeatTestCase):
self.m.VerifyAll()
def test_signal_reception(self):
def test_signal_reception_async(self):
stack = get_stack('signal_reception',
self.ctx,
policy_template)
self.stack = stack
setup_keystone_mocks(self.m, stack)
self.m.ReplayAll()
stack.store()
stack.create()
test_data = {'food': 'yum'}
self.m.StubOutWithMock(service.EngineService, '_get_stack')
s = db_api.stack_get(self.ctx, self.stack.id)
service.EngineService._get_stack(self.ctx,
self.stack.identifier()).AndReturn(s)
# Mock out the aync work of thread starting
self.eng.thread_group_mgr.groups[stack.id] = DummyThreadGroup()
self.m.StubOutWithMock(self.eng.thread_group_mgr, 'start')
self.eng.thread_group_mgr.start(stack.id,
mox.IgnoreArg(),
mox.IgnoreArg(),
mox.IgnoreArg()).AndReturn(None)
self.m.ReplayAll()
self.eng.resource_signal(self.ctx,
dict(self.stack.identifier()),
'WebServerScaleDownPolicy',
test_data)
self.m.VerifyAll()
self.stack.delete()
def test_signal_reception_sync(self):
stack = get_stack('signal_reception',
self.ctx,
policy_template)
@ -2486,7 +2521,9 @@ class StackServiceTest(common.HeatTestCase):
self.eng.resource_signal(self.ctx,
dict(self.stack.identifier()),
'WebServerScaleDownPolicy',
test_data)
test_data,
sync_call=True)
self.m.VerifyAll()
self.stack.delete()
@ -2541,7 +2578,9 @@ class StackServiceTest(common.HeatTestCase):
md = self.eng.resource_signal(self.ctx,
dict(self.stack.identifier()),
'WebServerScaleDownPolicy', None)
self.assertEqual(test_metadata, md)
self.eng.thread_group_mgr.groups[stack.id].wait()
self.assertIsNone(md)
self.m.VerifyAll()
@stack_context('service_metadata_test_stack')

View File

@ -215,6 +215,7 @@ class WaitCondMetadataUpdateTest(common.HeatTestCase):
self.patch('heat.engine.service.warnings')
self.man = service.EngineService('a-host', 'a-topic')
self.man.create_periodic_tasks()
cfg.CONF.set_default('heat_waitcondition_server_url',
'http://server.test:8000/v1/waitcondition')
@ -266,7 +267,8 @@ class WaitCondMetadataUpdateTest(common.HeatTestCase):
dict(self.stack.identifier()),
'WH',
{'Data': data, 'Reason': reason,
'Status': 'SUCCESS', 'UniqueId': id})
'Status': 'SUCCESS', 'UniqueId': id},
sync_call=True)
def post_success(sleep_time):
update_metadata('123', 'foo', 'bar')
@ -286,6 +288,7 @@ class WaitCondMetadataUpdateTest(common.HeatTestCase):
self.assertEqual('{"123": "foo"}', inst.metadata_get()['test'])
update_metadata('456', 'blarg', 'wibble')
self.assertEqual('{"123": "foo", "456": "blarg"}',
watch.FnGetAtt('Data'))
self.assertEqual('{"123": "foo"}',

View File

@ -247,7 +247,8 @@ class EngineRpcAPITestCase(testtools.TestCase):
self._test_engine_api('resource_signal', 'call',
stack_identity=self.identity,
resource_name='LogicalResourceId',
details={u'wordpress': []})
details={u'wordpress': []},
sync_call=True)
def test_create_watch_data(self):
self._test_engine_api('create_watch_data', 'call',