Event transport

Implement a new mechanism to allow specifying a target in the
environment to send events to. It adds zaqar as the first
implementation.

Depends-On: Ie04f9204f3ba0f75de32253f096f439c512cddee
Change-Id: Icfc3864e08693cb4b4f921641af380b39bcf0bc0
This commit is contained in:
Thomas Herve 2015-10-07 11:58:51 +02:00
parent 8e818e8823
commit 4f3246db45
19 changed files with 249 additions and 11 deletions

View File

@ -177,3 +177,17 @@ resource name. For example, the following entry pauses while creating
Clear hooks by signaling the resource with ``{unset_hook: pre-create}``
or ``{unset_hook: pre-update}``.
Retrieving events
-----------------
By default events are stored in the database and can be retrieved via the API.
Using the environment, you can register an endpoint which will receive events
produced by your stack, so that you don't have to poll Heat.
You can specify endpoints using the ``event_sinks`` property::
event_sinks:
- type: zaqar-queue
target: myqueue
ttl: 1200

View File

@ -18,10 +18,10 @@ from heat.common import template_format
SECTIONS = (
PARAMETERS, RESOURCE_REGISTRY, PARAMETER_DEFAULTS,
ENCRYPTED_PARAM_NAMES
ENCRYPTED_PARAM_NAMES, EVENT_SINKS
) = (
'parameters', 'resource_registry', 'parameter_defaults',
'encrypted_param_names'
'encrypted_param_names', 'event_sinks'
)
@ -59,7 +59,7 @@ def default_for_missing(env):
"""Checks a parsed environment for missing sections."""
for param in SECTIONS:
if param not in env:
if param == ENCRYPTED_PARAM_NAMES:
if param in (ENCRYPTED_PARAM_NAMES, EVENT_SINKS):
env[param] = []
else:
env[param] = {}

View File

@ -59,3 +59,17 @@ class ZaqarClientPlugin(client_plugin.ClientPlugin):
def is_not_found(self, ex):
return isinstance(ex, zaqar_errors.ResourceNotFound)
class ZaqarEventSink(object):
def __init__(self, target, ttl=None):
self._target = target
self._ttl = ttl
def consume(self, context, event):
zaqar_plugin = context.clients.client_plugin('zaqar')
zaqar = zaqar_plugin.client()
queue = zaqar.queue(self._target, auto_create=False)
ttl = self._ttl if self._ttl is not None else zaqar_plugin.DEFAULT_TTL
queue.post({'body': event, 'ttl': ttl})

View File

@ -571,17 +571,17 @@ class Environment(object):
env = {}
if user_env:
from heat.engine import resources
global_registry = resources.global_env().registry
global_env = resources.global_env()
global_registry = global_env.registry
event_sink_classes = global_env.event_sink_classes
else:
global_registry = None
event_sink_classes = {}
self.registry = ResourceRegistry(global_registry, self)
self.registry.load(env.get(env_fmt.RESOURCE_REGISTRY, {}))
if env_fmt.PARAMETER_DEFAULTS in env:
self.param_defaults = env[env_fmt.PARAMETER_DEFAULTS]
else:
self.param_defaults = {}
self.param_defaults = env.get(env_fmt.PARAMETER_DEFAULTS, {})
self.encrypted_param_names = env.get(env_fmt.ENCRYPTED_PARAM_NAMES, [])
@ -590,7 +590,13 @@ class Environment(object):
else:
self.params = dict((k, v) for (k, v) in six.iteritems(env)
if k not in (env_fmt.PARAMETER_DEFAULTS,
env_fmt.ENCRYPTED_PARAM_NAMES,
env_fmt.EVENT_SINKS,
env_fmt.RESOURCE_REGISTRY))
self.event_sink_classes = event_sink_classes
self._event_sinks = []
self._built_event_sinks = []
self._update_event_sinks(env.get(env_fmt.EVENT_SINKS, []))
self.constraints = {}
self.stack_lifecycle_plugins = []
@ -599,13 +605,15 @@ class Environment(object):
self.params.update(env_snippet.get(env_fmt.PARAMETERS, {}))
self.param_defaults.update(
env_snippet.get(env_fmt.PARAMETER_DEFAULTS, {}))
self._update_event_sinks(env_snippet.get(env_fmt.EVENT_SINKS, []))
def user_env_as_dict(self):
"""Get the environment as a dict, ready for storing in the db."""
return {env_fmt.RESOURCE_REGISTRY: self.registry.as_dict(),
env_fmt.PARAMETERS: self.params,
env_fmt.PARAMETER_DEFAULTS: self.param_defaults,
env_fmt.ENCRYPTED_PARAM_NAMES: self.encrypted_param_names}
env_fmt.ENCRYPTED_PARAM_NAMES: self.encrypted_param_names,
env_fmt.EVENT_SINKS: self._event_sinks}
def register_class(self, resource_type, resource_class, path=None):
self.registry.register_class(resource_type, resource_class, path=path)
@ -618,6 +626,9 @@ class Environment(object):
self.stack_lifecycle_plugins.append((stack_lifecycle_name,
stack_lifecycle_class))
def register_event_sink(self, event_sink_name, event_sink_class):
self.event_sink_classes[event_sink_name] = event_sink_class
def get_class(self, resource_type, resource_name=None, files=None):
return self.registry.get_class(resource_type, resource_name,
files=files)
@ -647,6 +658,17 @@ class Environment(object):
def get_stack_lifecycle_plugins(self):
return self.stack_lifecycle_plugins
def _update_event_sinks(self, sinks):
self._event_sinks.extend(sinks)
for sink in sinks:
sink = sink.copy()
sink_class = sink.pop('type')
sink_class = self.event_sink_classes[sink_class]
self._built_event_sinks.append(sink_class(**sink))
def get_event_sinks(self):
return self._built_event_sinks
def get_child_environment(parent_env, child_params, item_to_remove=None,
child_resource_name=None):

View File

@ -103,6 +103,8 @@ class Event(object):
ev['resource_properties'] = {'Error': err}
new_ev = event_object.Event.create(self.context, ev)
self.id = new_ev.id
self.timestamp = new_ev.created_at
self.uuid = new_ev.uuid
return self.id
def identifier(self):
@ -114,3 +116,22 @@ class Event(object):
resource_name=self.resource_name, **self.stack.identifier())
return identifier.EventIdentifier(event_id=str(self.uuid), **res_id)
def as_dict(self):
return {
'timestamp': self.timestamp.isoformat(),
'version': '0.1',
'type': 'os.heat.event',
'id': self.uuid,
'payload': {
'resource_name': self.resource_name,
'physical_resource_id': self.physical_resource_id,
'stack_id': self.stack.id,
'resource_action': self.action,
'resource_status': self.status,
'resource_status_reason': self.reason,
'resource_type': self.resource_type,
'resource_properties': self.resource_properties,
'version': '0.1'
}
}

View File

@ -1338,6 +1338,7 @@ class Resource(object):
self.name, self.type())
ev.store()
self.stack.dispatch_event(ev)
def _store_or_update(self, action, status, reason):
prev_action = self.action

View File

@ -34,6 +34,11 @@ def _register_stack_lifecycle_plugins(env, type_pairs):
stack_lifecycle_class)
def _register_event_sinks(env, type_pairs):
for sink_name, sink_class in type_pairs:
env.register_event_sink(sink_name, sink_class)
def _get_mapping(namespace):
mgr = extension.ExtensionManager(
namespace=namespace,
@ -73,6 +78,9 @@ def _load_global_resources(env):
_register_stack_lifecycle_plugins(
env,
_get_mapping('heat.stack_lifecycle_plugins'))
_register_event_sinks(
env,
_get_mapping('heat.event_sinks'))
manager = plugin_manager.PluginManager(__name__)
# Sometimes resources should not be available for registration in Heat due

View File

@ -183,6 +183,8 @@ class ThreadGroupManager(object):
else:
lock.release()
# Link to self to allow the stack to run tasks
stack.thread_group_mgr = self
th = self.start(stack.id, func, *args, **kwargs)
th.link(release)
return th

View File

@ -174,6 +174,7 @@ class Stack(collections.Mapping):
self.cache_data = cache_data
self._worker_client = None
self._convg_deps = None
self.thread_group_mgr = None
# strict_validate can be used to disable value validation
# in the resource properties schema, this is useful when
@ -750,6 +751,20 @@ class Stack(collections.Mapping):
self.name, 'OS::Heat::Stack')
ev.store()
self.dispatch_event(ev)
def dispatch_event(self, ev):
def _dispatch(ctx, sinks, ev):
try:
for sink in sinks:
sink.consume(ctx, ev)
except Exception as e:
LOG.debug('Got error sending events %s' % e)
if self.thread_group_mgr is not None:
self.thread_group_mgr.start(self.id, _dispatch,
self.context,
self.env.get_event_sinks(),
ev.as_dict())
@profiler.trace('Stack.state_set', hide_args=False)
def state_set(self, action, status, reason):

View File

@ -135,6 +135,7 @@ blarg: wibble
body = {'parameters': params,
'encrypted_param_names': [],
'parameter_defaults': {},
'event_sinks': [],
'resource_registry': {}}
data = stacks.InstantiationData(body)
self.assertEqual(body, data.environment())
@ -152,6 +153,7 @@ blarg: wibble
'foo': 'bar'},
'encrypted_param_names': [],
'parameter_defaults': {},
'event_sinks': [],
'resource_registry': {}}
data = stacks.InstantiationData(body)
self.assertEqual(expect, data.environment())
@ -168,6 +170,7 @@ blarg: wibble
'tester': 'Yes'},
'encrypted_param_names': [],
'parameter_defaults': {},
'event_sinks': [],
'resource_registry': {}}
data = stacks.InstantiationData(body)
self.assertEqual(expect, data.environment())
@ -183,7 +186,8 @@ blarg: wibble
body = {'not the environment': env}
data = stacks.InstantiationData(body)
self.assertEqual({'parameters': {}, 'encrypted_param_names': [],
'parameter_defaults': {}, 'resource_registry': {}},
'parameter_defaults': {}, 'resource_registry': {},
'event_sinks': []},
data.environment())
def test_args(self):
@ -650,6 +654,7 @@ class StackControllerTest(tools.ControllerTest, common.HeatTestCase):
'params': {'parameters': parameters,
'encrypted_param_names': [],
'parameter_defaults': {},
'event_sinks': [],
'resource_registry': {}},
'files': {},
'args': {'timeout_mins': 30},
@ -695,6 +700,7 @@ class StackControllerTest(tools.ControllerTest, common.HeatTestCase):
'params': {'parameters': parameters,
'encrypted_param_names': [],
'parameter_defaults': {},
'event_sinks': [],
'resource_registry': {}},
'files': {},
'args': {'timeout_mins': 30, 'tags': ['tag1', 'tag2']},
@ -757,6 +763,7 @@ class StackControllerTest(tools.ControllerTest, common.HeatTestCase):
'params': {'parameters': parameters,
'encrypted_param_names': [],
'parameter_defaults': {},
'event_sinks': [],
'resource_registry': {}},
'files': {},
'args': {'timeout_mins': 30,
@ -846,6 +853,7 @@ class StackControllerTest(tools.ControllerTest, common.HeatTestCase):
'params': {'parameters': parameters,
'encrypted_param_names': [],
'parameter_defaults': {},
'event_sinks': [],
'resource_registry': {}},
'files': {'my.yaml': 'This is the file contents.'},
'args': {'timeout_mins': 30},
@ -891,6 +899,7 @@ class StackControllerTest(tools.ControllerTest, common.HeatTestCase):
'params': {'parameters': parameters,
'encrypted_param_names': [],
'parameter_defaults': {},
'event_sinks': [],
'resource_registry': {}},
'files': {},
'args': {'timeout_mins': 30},
@ -909,6 +918,7 @@ class StackControllerTest(tools.ControllerTest, common.HeatTestCase):
'params': {'parameters': parameters,
'encrypted_param_names': [],
'parameter_defaults': {},
'event_sinks': [],
'resource_registry': {}},
'files': {},
'args': {'timeout_mins': 30},
@ -927,6 +937,7 @@ class StackControllerTest(tools.ControllerTest, common.HeatTestCase):
'params': {'parameters': parameters,
'encrypted_param_names': [],
'parameter_defaults': {},
'event_sinks': [],
'resource_registry': {}},
'files': {},
'args': {'timeout_mins': 30},
@ -985,6 +996,7 @@ class StackControllerTest(tools.ControllerTest, common.HeatTestCase):
'params': {'parameters': parameters,
'encrypted_param_names': [],
'parameter_defaults': {},
'event_sinks': [],
'resource_registry': {}},
'files': {},
'args': {'timeout_mins': 30},
@ -1069,6 +1081,7 @@ class StackControllerTest(tools.ControllerTest, common.HeatTestCase):
'params': {'parameters': parameters,
'encrypted_param_names': [],
'parameter_defaults': {},
'event_sinks': [],
'resource_registry': {}},
'files': {},
'args': {'timeout_mins': 30},
@ -1153,6 +1166,7 @@ class StackControllerTest(tools.ControllerTest, common.HeatTestCase):
'params': {'parameters': parameters,
'encrypted_param_names': [],
'parameter_defaults': {},
'event_sinks': [],
'resource_registry': {}},
'files': {},
'args': {'timeout_mins': 30, 'tags': ['tag1', 'tag2']}})
@ -1187,6 +1201,7 @@ class StackControllerTest(tools.ControllerTest, common.HeatTestCase):
'params': {'parameters': parameters,
'encrypted_param_names': [],
'parameter_defaults': {},
'event_sinks': [],
'resource_registry': {}},
'files': {},
'args': {'timeout_mins': 30}}),
@ -1227,6 +1242,7 @@ class StackControllerTest(tools.ControllerTest, common.HeatTestCase):
'params': {'parameters': parameters,
'encrypted_param_names': [],
'parameter_defaults': {},
'event_sinks': [],
'resource_registry': {}},
'files': {},
'args': {rpc_api.PARAM_EXISTING: True,
@ -1587,6 +1603,7 @@ class StackControllerTest(tools.ControllerTest, common.HeatTestCase):
'params': {'parameters': parameters,
'encrypted_param_names': [],
'parameter_defaults': {},
'event_sinks': [],
'resource_registry': {}},
'files': {},
'args': {'timeout_mins': 30}})
@ -1624,6 +1641,7 @@ class StackControllerTest(tools.ControllerTest, common.HeatTestCase):
'params': {'parameters': parameters,
'encrypted_param_names': [],
'parameter_defaults': {},
'event_sinks': [],
'resource_registry': {}},
'files': {},
'args': {'timeout_mins': 30, 'tags': ['tag1', 'tag2']}})
@ -1661,6 +1679,7 @@ class StackControllerTest(tools.ControllerTest, common.HeatTestCase):
'params': {u'parameters': parameters,
u'encrypted_param_names': [],
u'parameter_defaults': {},
u'event_sinks': [],
u'resource_registry': {}},
'files': {},
'args': {'timeout_mins': 30}})
@ -1745,6 +1764,7 @@ class StackControllerTest(tools.ControllerTest, common.HeatTestCase):
'params': {'parameters': {},
'encrypted_param_names': [],
'parameter_defaults': {},
'event_sinks': [],
'resource_registry': {}},
'files': {},
'args': {rpc_api.PARAM_EXISTING: True,
@ -1781,6 +1801,7 @@ class StackControllerTest(tools.ControllerTest, common.HeatTestCase):
'params': {'parameters': {},
'encrypted_param_names': [],
'parameter_defaults': {},
'event_sinks': [],
'resource_registry': {}},
'files': {},
'args': {rpc_api.PARAM_EXISTING: True,
@ -1818,6 +1839,7 @@ class StackControllerTest(tools.ControllerTest, common.HeatTestCase):
'params': {'parameters': {},
'encrypted_param_names': [],
'parameter_defaults': {},
'event_sinks': [],
'resource_registry': {}},
'files': {},
'args': {rpc_api.PARAM_EXISTING: True,
@ -1856,6 +1878,7 @@ class StackControllerTest(tools.ControllerTest, common.HeatTestCase):
'params': {'parameters': parameters,
'encrypted_param_names': [],
'parameter_defaults': {},
'event_sinks': [],
'resource_registry': {}},
'files': {},
'args': {rpc_api.PARAM_EXISTING: True,
@ -1919,6 +1942,7 @@ class StackControllerTest(tools.ControllerTest, common.HeatTestCase):
'params': {'parameters': {},
'encrypted_param_names': [],
'parameter_defaults': {},
'event_sinks': [],
'resource_registry': {}},
'files': {},
'args': {rpc_api.PARAM_EXISTING: True,
@ -1960,6 +1984,7 @@ class StackControllerTest(tools.ControllerTest, common.HeatTestCase):
'params': {'parameters': parameters,
'encrypted_param_names': [],
'parameter_defaults': {},
'event_sinks': [],
'resource_registry': {}},
'files': {},
'args': {rpc_api.PARAM_EXISTING: True,
@ -2099,6 +2124,7 @@ class StackControllerTest(tools.ControllerTest, common.HeatTestCase):
'params': {'parameters': {},
'encrypted_param_names': [],
'parameter_defaults': {},
'event_sinks': [],
'resource_registry': {}},
'files': {},
'show_nested': False}),
@ -2127,6 +2153,7 @@ class StackControllerTest(tools.ControllerTest, common.HeatTestCase):
'params': {'parameters': {},
'encrypted_param_names': [],
'parameter_defaults': {},
'event_sinks': [],
'resource_registry': {}},
'files': {},
'show_nested': False}),

View File

@ -11,6 +11,10 @@
# License for the specific language governing permissions and limitations
# under the License.
import mock
from heat.engine.clients.os import zaqar
from heat.tests import common
from heat.tests import utils
@ -32,3 +36,13 @@ class ZaqarClientPluginTests(common.HeatTestCase):
client = plugin.create_for_tenant('other_tenant')
self.assertEqual('other_tenant',
client.conf['auth_opts']['options']['os_project_id'])
def test_event_sink(self):
context = utils.dummy_context()
client = context.clients.client('zaqar')
fake_queue = mock.MagicMock()
client.queue = lambda x, auto_create: fake_queue
sink = zaqar.ZaqarEventSink('myqueue')
sink.consume(context, {'hello': 'world'})
fake_queue.post.assert_called_once_with(
{'body': {'hello': 'world'}, 'ttl': 3600})

View File

@ -104,6 +104,7 @@ class ServiceStackUpdateTest(common.HeatTestCase):
stack_name = 'service_update_test_stack_existing_parameters'
update_params = {'encrypted_param_names': [],
'parameter_defaults': {},
'event_sinks': [],
'parameters': {'newparam': 123},
'resource_registry': {'resources': {}}}
api_args = {rpc_api.PARAM_TIMEOUT: 60,
@ -138,6 +139,7 @@ class ServiceStackUpdateTest(common.HeatTestCase):
stack_name = 'service_update_test_stack_existing_parameters_remove'
update_params = {'encrypted_param_names': [],
'parameter_defaults': {},
'event_sinks': [],
'parameters': {'newparam': 123},
'resource_registry': {'resources': {}}}
api_args = {rpc_api.PARAM_TIMEOUT: 60,
@ -177,6 +179,7 @@ class ServiceStackUpdateTest(common.HeatTestCase):
intial_params = {'encrypted_param_names': [],
'parameter_defaults': {},
'parameters': {},
'event_sinks': [],
'resource_registry': intital_registry}
initial_files = {'foo.yaml': 'foo',
'foo2.yaml': 'foo2',
@ -207,6 +210,7 @@ class ServiceStackUpdateTest(common.HeatTestCase):
expected_env = {'encrypted_param_names': [],
'parameter_defaults': {},
'parameters': {},
'event_sinks': [],
'resource_registry': expected_reg}
# FIXME(shardy): Currently we don't prune unused old files
expected_files = {'foo.yaml': 'foo',
@ -257,6 +261,7 @@ class ServiceStackUpdateTest(common.HeatTestCase):
'mydefault': 123,
'default2': 456},
'parameters': {},
'event_sinks': [],
'resource_registry': {'resources': {}}}
with mock.patch('heat.engine.stack.Stack') as mock_stack:
stk.update = mock.Mock()

View File

@ -190,3 +190,14 @@ class FakeKeystoneClient(object):
return self.context.auth_plugin.get_token(self.session)
else:
return self.token
class FakeEventSink(object):
def __init__(self, evt):
self.events = []
self.evt = evt
def consume(self, stack, event):
self.events.append(event)
self.evt.send(None)

View File

@ -43,6 +43,7 @@ class EnvironmentTest(common.HeatTestCase):
expected = {u'parameters': old,
u'encrypted_param_names': [],
u'parameter_defaults': {},
u'event_sinks': [],
u'resource_registry': {u'resources': {}}}
env = environment.Environment(old)
self.assertEqual(expected, env.user_env_as_dict())
@ -51,6 +52,7 @@ class EnvironmentTest(common.HeatTestCase):
new_env = {u'parameters': {u'a': u'ff', u'b': u'ss'},
u'encrypted_param_names': [],
u'parameter_defaults': {u'ff': 'new_def'},
u'event_sinks': [],
u'resource_registry': {u'OS::Food': u'fruity.yaml',
u'resources': {}}}
env = environment.Environment(new_env)
@ -212,6 +214,23 @@ def constraint_mapping():
env.get_constraint("nova.flavor").__name__)
self.assertIs(None, env.get_constraint("no_constraint"))
def test_event_sinks(self):
env = environment.Environment(
{"event_sinks": [{"type": "zaqar-queue", "target": "myqueue"}]})
self.assertEqual([{"type": "zaqar-queue", "target": "myqueue"}],
env.user_env_as_dict()["event_sinks"])
sinks = env.get_event_sinks()
self.assertEqual(1, len(sinks))
self.assertEqual("myqueue", sinks[0]._target)
def test_event_sinks_load(self):
env = environment.Environment()
self.assertEqual([], env.get_event_sinks())
env.load(
{"event_sinks": [{"type": "zaqar-queue", "target": "myqueue"}]})
self.assertEqual([{"type": "zaqar-queue", "target": "myqueue"}],
env.user_env_as_dict()["event_sinks"])
class EnvironmentDuplicateTest(common.HeatTestCase):
@ -491,6 +510,7 @@ class ChildEnvTest(common.HeatTestCase):
expected = {'parameters': new_params,
'encrypted_param_names': [],
'parameter_defaults': {},
'event_sinks': [],
'resource_registry': {'resources': {}}}
cenv = environment.get_child_environment(penv, new_params)
self.assertEqual(expected, cenv.user_env_as_dict())
@ -500,6 +520,7 @@ class ChildEnvTest(common.HeatTestCase):
penv = environment.Environment()
expected = {'parameter_defaults': {},
'encrypted_param_names': [],
'event_sinks': [],
'resource_registry': {'resources': {}}}
expected.update(new_params)
cenv = environment.get_child_environment(penv, new_params)
@ -511,6 +532,7 @@ class ChildEnvTest(common.HeatTestCase):
penv = environment.Environment(env=parent_params)
expected = {'parameter_defaults': {},
'encrypted_param_names': [],
'event_sinks': [],
'resource_registry': {'resources': {}}}
expected.update(new_params)
cenv = environment.get_child_environment(penv, new_params)

View File

@ -26,6 +26,7 @@ class YamlEnvironmentTest(common.HeatTestCase):
parameters: {}
encrypted_param_names: []
parameter_defaults: {}
event_sinks: []
resource_registry: {}
'''
tpl1 = environment_format.parse(yaml1)

View File

@ -188,8 +188,9 @@ class EventTest(EventCommon):
'wibble', self.resource.properties,
self.resource.name, self.resource.type())
e.store()
self.assertIsNone(e.identifier())
e.store()
self.assertIsNotNone(e.identifier())
def test_badprop(self):
rname = 'bad_resource'
@ -202,6 +203,28 @@ class EventTest(EventCommon):
'wibble', res.properties, res.name, res.type())
self.assertIn('Error', e.resource_properties)
def test_as_dict(self):
e = event.Event(self.ctx, self.stack, 'TEST', 'IN_PROGRESS', 'Testing',
'wibble', self.resource.properties,
self.resource.name, self.resource.type())
e.store()
expected = {
'id': e.uuid,
'timestamp': e.timestamp.isoformat(),
'type': 'os.heat.event',
'version': '0.1',
'payload': {'physical_resource_id': 'wibble',
'resource_action': 'TEST',
'resource_name': 'EventTestResource',
'resource_properties': {'Foo': 'goo'},
'resource_status': 'IN_PROGRESS',
'resource_status_reason': 'Testing',
'resource_type': 'ResourceWithRequiredProps',
'stack_id': self.stack.id,
'version': '0.1'}}
self.assertEqual(expected, e.as_dict())
class EventTestProps(EventCommon):

View File

@ -14,6 +14,7 @@
import collections
import copy
import datetime
import eventlet
import json
import time
@ -33,6 +34,7 @@ from heat.engine import environment
from heat.engine import function
from heat.engine import resource
from heat.engine import scheduler
from heat.engine import service
from heat.engine import stack
from heat.engine import template
from heat.objects import raw_template as raw_template_object
@ -2251,6 +2253,37 @@ class StackTest(common.HeatTestCase):
self.assertEqual('foo', params.get('param1'))
self.assertEqual('bar', params.get('param2'))
def test_event_dispatch(self):
env = environment.Environment()
evt = eventlet.event.Event()
sink = fakes.FakeEventSink(evt)
env.register_event_sink('dummy', lambda: sink)
env.load({"event_sinks": [{"type": "dummy"}]})
stk = stack.Stack(self.ctx, 'test',
template.Template(empty_template, env=env))
stk.thread_group_mgr = service.ThreadGroupManager()
self.addCleanup(stk.thread_group_mgr.stop, stk.id)
stk.store()
stk._add_event('CREATE', 'IN_PROGRESS', '')
evt.wait()
expected = [{
'id': mock.ANY,
'timestamp': mock.ANY,
'type': 'os.heat.event',
'version': '0.1',
'payload': {
'physical_resource_id': stk.id,
'resource_action': 'CREATE',
'resource_name': 'test',
'resource_properties': {},
'resource_status': 'IN_PROGRESS',
'resource_status_reason': '',
'resource_type':
'OS::Heat::Stack',
'stack_id': stk.id,
'version': '0.1'}}]
self.assertEqual(expected, sink.events)
@mock.patch.object(stack_object.Stack, 'delete')
@mock.patch.object(raw_template_object.RawTemplate, 'delete')
def test_mark_complete_create(self, mock_tmpl_delete, mock_stack_delete):

View File

@ -819,6 +819,7 @@ class WithTemplateTest(StackResourceBaseTest):
def test_create_with_template(self):
child_env = {'parameter_defaults': {},
'event_sinks': [],
'parameters': self.params,
'resource_registry': {'resources': {}},
'encrypted_param_names': []}
@ -855,6 +856,7 @@ class WithTemplateTest(StackResourceBaseTest):
self.parent_resource._nested = nested
child_env = {'parameter_defaults': {},
'event_sinks': [],
'parameters': self.params,
'resource_registry': {'resources': {}},
'encrypted_param_names': []}

View File

@ -122,6 +122,9 @@ heat.constraints =
heat.stack_lifecycle_plugins =
heat.event_sinks =
zaqar-queue = heat.engine.clients.os.zaqar:ZaqarEventSink
heat.templates =
heat_template_version.2013-05-23 = heat.engine.hot.template:HOTemplate20130523
heat_template_version.2014-10-16 = heat.engine.hot.template:HOTemplate20141016