diff --git a/heat/engine/resources/openstack/zaqar/subscription.py b/heat/engine/resources/openstack/zaqar/subscription.py index 2246e6f59e..457b869011 100644 --- a/heat/engine/resources/openstack/zaqar/subscription.py +++ b/heat/engine/resources/openstack/zaqar/subscription.py @@ -18,6 +18,8 @@ from heat.engine import properties from heat.engine import resource from heat.engine import support +from oslo_serialization import jsonutils + class ZaqarSubscription(resource.Resource): """A resource for managing Zaqar subscriptions. @@ -71,20 +73,34 @@ class ZaqarSubscription(resource.Resource): 'trust+https'] def validate(self): + super(ZaqarSubscription, self).validate() + self._validate_subscriber() + + def _validate_subscriber(self): subscriber_type = self.properties[self.SUBSCRIBER].split(":", 1)[0] if subscriber_type not in self.VALID_SUBSCRIBER_TYPES: msg = (_("The subscriber type of must be one of: %s.") % ", ".join(self.VALID_SUBSCRIBER_TYPES)) raise exception.StackValidationFailed(message=msg) + def _subscriber_url(self): + return self.properties[self.SUBSCRIBER] + + def _subscription_options(self): + return self.properties[self.OPTIONS] + + def _subscription_data(self): + return { + 'subscriber': self._subscriber_url(), + 'ttl': self.properties[self.TTL], + 'options': self._subscription_options(), + } + def handle_create(self): """Create a subscription to a Zaqar message queue.""" subscription = self.client().subscription( self.properties[self.QUEUE_NAME], - subscriber=self.properties[self.SUBSCRIBER], - ttl=self.properties[self.TTL], - options=self.properties[self.OPTIONS] - ) + **self._subscription_data()) self.resource_id_set(subscription.id) def _get_subscription(self): @@ -96,8 +112,10 @@ class ZaqarSubscription(resource.Resource): def handle_update(self, json_snippet, tmpl_diff, prop_diff): """Update a subscription to a Zaqar message queue.""" + self.properties = json_snippet.properties(self.properties_schema, + self.context) subscription = self._get_subscription() - subscription.update(prop_diff) + subscription.update(self._subscription_data()) def handle_delete(self): try: @@ -120,7 +138,97 @@ class ZaqarSubscription(resource.Resource): } +class MistralTrigger(ZaqarSubscription): + """A Zaqar subscription for triggering Mistral workflows. + + This Zaqar subscription type listens for messages in a queue and triggers a + Mistral workflow execution each time one is received. + + The content of the Zaqar message is passed to the workflow in the + environment with the name "notification", and thus is accessible from + within the workflow as: + + <% env().notification %> + + Other environment variables can be set using the 'env' key in the params + property. + """ + + support_status = support.SupportStatus(version='8.0.0', + status=support.SUPPORTED) + + PROPERTIES = ( + QUEUE_NAME, TTL, + WORKFLOW_ID, PARAMS, INPUT, + ) = ( + ZaqarSubscription.QUEUE_NAME, ZaqarSubscription.TTL, + 'workflow_id', 'params', 'input', + ) + + properties_schema = { + QUEUE_NAME: ZaqarSubscription.properties_schema[QUEUE_NAME], + TTL: ZaqarSubscription.properties_schema[TTL], + WORKFLOW_ID: properties.Schema( + properties.Schema.STRING, + _("UUID of the Mistral workflow to trigger."), + required=True, + constraints=[constraints.CustomConstraint('mistral.workflow')], + update_allowed=True), + PARAMS: properties.Schema( + properties.Schema.MAP, + _("Parameters to pass to the Mistral workflow execution. " + "The parameters depend on the workflow type."), + required=False, + default={}, + update_allowed=True), + INPUT: properties.Schema( + properties.Schema.MAP, + _("Input values to pass to the Mistral workflow."), + required=False, + default={}, + update_allowed=True), + } + + def _validate_subscriber(self): + pass + + def _subscriber_url(self): + mistral_client = self.client('mistral') + return 'trust+%s/executions' % mistral_client.http_client.base_url + + def _subscription_options(self): + params = dict(self.properties[self.PARAMS]) + params.setdefault('env', {}) + params['env']['notification'] = "$zaqar_message$" + post_data = { + self.WORKFLOW_ID: self.properties[self.WORKFLOW_ID], + self.PARAMS: params, + self.INPUT: self.properties[self.INPUT], + } + return { + 'post_data': jsonutils.dumps(post_data) + } + + def parse_live_resource_data(self, resource_properties, resource_data): + options = resource_data.get(self.OPTIONS, {}) + post_data = jsonutils.loads(options.get('post_data', '{}')) + params = post_data.get(self.PARAMS, {}) + env = params.get('env', {}) + env.pop('notification', None) + if not env: + params.pop('env', None) + + return { + self.QUEUE_NAME: resource_data.get(self.QUEUE_NAME), + self.TTL: resource_data.get(self.TTL), + self.WORKFLOW_ID: post_data.get(self.WORKFLOW_ID), + self.PARAMS: params, + self.INPUT: post_data.get(self.INPUT), + } + + def resource_mapping(): return { 'OS::Zaqar::Subscription': ZaqarSubscription, + 'OS::Zaqar::MistralTrigger': MistralTrigger, } diff --git a/heat/tests/openstack/zaqar/test_subscription.py b/heat/tests/openstack/zaqar/test_subscription.py index 616471fba5..d3ffe39c1c 100644 --- a/heat/tests/openstack/zaqar/test_subscription.py +++ b/heat/tests/openstack/zaqar/test_subscription.py @@ -16,6 +16,7 @@ import six from heat.common import exception from heat.common import template_format +from heat.engine.clients.os import mistral as mistral_client_plugin from heat.engine import resource from heat.engine import scheduler from heat.engine import stack @@ -23,15 +24,17 @@ from heat.engine import template from heat.tests import common from heat.tests import utils +from oslo_serialization import jsonutils + try: from zaqarclient.transport.errors import ResourceNotFound # noqa except ImportError: - ResourceNotFound = Exception + class ResourceNotFound(Exception): + pass -wp_template = ''' +subscr_template = ''' { "AWSTemplateFormatVersion" : "2010-09-09", - "Description" : "openstack Zaqar queue service as a resource", "Resources" : { "MyQueue2" : { "Type" : "OS::Zaqar::Queue", @@ -53,6 +56,22 @@ wp_template = ''' } ''' +mistral_template = ''' +{ + "heat_template_version" : "2015-10-15", + "resources" : { + "subscription" : { + "type" : "OS::Zaqar::MistralTrigger", + "properties" : { + "queue_name" : "myqueue", + "workflow_id": "abcd", + "input" : { "key1" : "value1" } + } + } + } +} +''' + class FakeSubscription(object): def __init__(self, queue_name, id=None, ttl=None, subscriber=None, @@ -64,7 +83,10 @@ class FakeSubscription(object): self.options = options def update(self, prop_diff): - pass + allowed_keys = {'subscriber', 'ttl', 'options'} + for key in six.iterkeys(prop_diff): + if key not in allowed_keys: + raise KeyError(key) def delete(self): pass @@ -86,7 +108,7 @@ class ZaqarSubscriptionTest(common.HeatTestCase): self.stack.store() def test_validate_subscriber_type(self, mock_client, mock_plugin): - t = template_format.parse(wp_template) + t = template_format.parse(subscr_template) t['Resources']['MySubscription']['Properties']['subscriber'] = "foo:ba" stack_name = 'test_stack' tmpl = template.Template(t) @@ -99,7 +121,7 @@ class ZaqarSubscriptionTest(common.HeatTestCase): six.text_type(exc)) def test_create(self, mock_client, mock_plugin): - t = template_format.parse(wp_template) + t = template_format.parse(subscr_template) self.parse_stack(t) subscr = self.stack['MySubscription'] @@ -124,7 +146,7 @@ class ZaqarSubscriptionTest(common.HeatTestCase): self.m.VerifyAll() def test_delete(self, mock_client, mock_plugin): - t = template_format.parse(wp_template) + t = template_format.parse(subscr_template) self.parse_stack(t) subscr = self.stack['MySubscription'] @@ -154,7 +176,7 @@ class ZaqarSubscriptionTest(common.HeatTestCase): self.m.VerifyAll() def test_delete_not_found(self, mock_client, mock_plugin): - t = template_format.parse(wp_template) + t = template_format.parse(subscr_template) self.parse_stack(t) subscr = self.stack['MySubscription'] @@ -182,7 +204,7 @@ class ZaqarSubscriptionTest(common.HeatTestCase): self.m.VerifyAll() def test_update_in_place(self, mock_client, mock_plugin): - t = template_format.parse(wp_template) + t = template_format.parse(subscr_template) self.parse_stack(t) subscr = self.stack['MySubscription'] @@ -202,11 +224,12 @@ class ZaqarSubscriptionTest(common.HeatTestCase): id=subscr_id, auto_create=False).AndReturn(fake_subscr) self.m.StubOutWithMock(fake_subscr, 'update') - fake_subscr.update({'ttl': 3601}) + fake_subscr.update({'ttl': 3601, 'options': {'key1': 'value1'}, + 'subscriber': 'mailto:name@domain.com'}) self.m.ReplayAll() - t = template_format.parse(wp_template) + t = template_format.parse(subscr_template) new_subscr = t['Resources']['MySubscription'] new_subscr['Properties']['ttl'] = "3601" resource_defns = template.Template(t).resource_definitions(self.stack) @@ -217,7 +240,7 @@ class ZaqarSubscriptionTest(common.HeatTestCase): self.m.VerifyAll() def test_update_replace(self, mock_client, mock_plugin): - t = template_format.parse(wp_template) + t = template_format.parse(subscr_template) self.parse_stack(t) subscr = self.stack['MySubscription'] @@ -236,7 +259,7 @@ class ZaqarSubscriptionTest(common.HeatTestCase): self.m.ReplayAll() - t = template_format.parse(wp_template) + t = template_format.parse(subscr_template) t['Resources']['MySubscription']['Properties']['queue_name'] = 'foo' resource_defns = template.Template(t).resource_definitions(self.stack) new_subscr = resource_defns['MySubscription'] @@ -251,7 +274,7 @@ class ZaqarSubscriptionTest(common.HeatTestCase): self.m.VerifyAll() def test_show_resource(self, mock_client, mock_plugin): - t = template_format.parse(wp_template) + t = template_format.parse(subscr_template) self.parse_stack(t) subscr = self.stack['MySubscription'] @@ -291,3 +314,218 @@ class ZaqarSubscriptionTest(common.HeatTestCase): subscr._show_resource())) self.m.VerifyAll() + + +class JsonString(object): + def __init__(self, data): + self._data = data + + def __eq__(self, other): + return self._data == jsonutils.loads(other) + + def __str__(self): + return jsonutils.dumps(self._data) + + def __repr__(self): + return str(self) + + +@mock.patch.object(resource.Resource, "client_plugin") +@mock.patch.object(resource.Resource, "client") +class ZaqarMistralTriggerTest(common.HeatTestCase): + def setUp(self): + super(ZaqarMistralTriggerTest, self).setUp() + self.fc = self.m.CreateMockAnything() + self.ctx = utils.dummy_context() + self.patchobject(mistral_client_plugin.WorkflowConstraint, + 'validate', return_value=True) + + stack_name = 'test_stack' + t = template_format.parse(mistral_template) + tmpl = template.Template(t) + self.stack = stack.Stack(self.ctx, stack_name, tmpl) + self.stack.validate() + self.stack.store() + + def client(name='zaqar'): + if name == 'mistral': + client = mock.Mock() + client.http_client = mock.Mock() + client.http_client.base_url = 'http://mistral.example.net:8989' + return client + elif name == 'zaqar': + return self.fc + + self.subscr = self.stack['subscription'] + self.subscr.client = mock.Mock(side_effect=client) + + self.subscriber = 'trust+http://mistral.example.net:8989/executions' + self.options = { + 'post_data': JsonString({ + 'workflow_id': 'abcd', + 'input': {"key1": "value1"}, + 'params': {'env': {'notification': '$zaqar_message$'}}, + }) + } + + def test_create(self, mock_client, mock_plugin): + subscr = self.subscr + subscr_id = "58138648c1e2eb7355d62137" + + fake_subscr = FakeSubscription(subscr.properties['queue_name'], + subscr_id) + self.m.StubOutWithMock(self.fc, 'subscription') + self.fc.subscription(subscr.properties['queue_name'], + options=self.options, + subscriber=self.subscriber, + ttl=220367260800).AndReturn(fake_subscr) + + self.m.ReplayAll() + + scheduler.TaskRunner(subscr.create)() + self.assertEqual(subscr_id, subscr.FnGetRefId()) + + self.m.VerifyAll() + + def test_delete(self, mock_client, mock_plugin): + subscr = self.subscr + subscr_id = "58138648c1e2eb7355d62137" + + fake_subscr = FakeSubscription(subscr.properties['queue_name'], + subscr_id) + self.m.StubOutWithMock(self.fc, 'subscription') + self.fc.subscription(subscr.properties['queue_name'], + options=self.options, + subscriber=self.subscriber, + ttl=220367260800).AndReturn(fake_subscr) + self.fc.subscription(subscr.properties['queue_name'], + id=subscr_id, + auto_create=False).AndReturn(fake_subscr) + self.m.StubOutWithMock(fake_subscr, 'delete') + fake_subscr.delete() + + self.m.ReplayAll() + + scheduler.TaskRunner(subscr.create)() + scheduler.TaskRunner(subscr.delete)() + + self.m.VerifyAll() + + def test_delete_not_found(self, mock_client, mock_plugin): + subscr = self.subscr + subscr_id = "58138648c1e2eb7355d62137" + + fake_subscr = FakeSubscription(subscr.properties['queue_name'], + subscr_id) + self.m.StubOutWithMock(self.fc, 'subscription') + self.fc.subscription(subscr.properties['queue_name'], + options=self.options, + subscriber=self.subscriber, + ttl=220367260800).AndReturn(fake_subscr) + self.fc.subscription(subscr.properties['queue_name'], + id=subscr_id, + auto_create=False).AndRaise(ResourceNotFound()) + + self.m.ReplayAll() + + scheduler.TaskRunner(subscr.create)() + scheduler.TaskRunner(subscr.delete)() + + self.m.VerifyAll() + + def test_update_in_place(self, mock_client, mock_plugin): + subscr = self.subscr + subscr_id = "58138648c1e2eb7355d62137" + + fake_subscr = FakeSubscription(subscr.properties['queue_name'], + subscr_id) + self.m.StubOutWithMock(self.fc, 'subscription') + self.fc.subscription(subscr.properties['queue_name'], + options=self.options, + subscriber=self.subscriber, + ttl=220367260800).AndReturn(fake_subscr) + self.fc.subscription(subscr.properties['queue_name'], + id=subscr_id, + auto_create=False).AndReturn(fake_subscr) + self.m.StubOutWithMock(fake_subscr, 'update') + fake_subscr.update({'ttl': 3601, 'subscriber': self.subscriber, + 'options': self.options}) + + self.m.ReplayAll() + + t = template_format.parse(mistral_template) + new_subscr = t['resources']['subscription'] + new_subscr['properties']['ttl'] = "3601" + resource_defns = template.Template(t).resource_definitions(self.stack) + + scheduler.TaskRunner(subscr.create)() + scheduler.TaskRunner(subscr.update, resource_defns['subscription'])() + + self.m.VerifyAll() + + def test_update_replace(self, mock_client, mock_plugin): + subscr = self.subscr + subscr_id = "58138648c1e2eb7355d62137" + + fake_subscr = FakeSubscription(subscr.properties['queue_name'], + subscr_id) + self.m.StubOutWithMock(self.fc, 'subscription') + self.fc.subscription(subscr.properties['queue_name'], + options=self.options, + subscriber=self.subscriber, + ttl=220367260800).AndReturn(fake_subscr) + + self.m.ReplayAll() + + t = template_format.parse(mistral_template) + t['resources']['subscription']['properties']['queue_name'] = 'foo' + resource_defns = template.Template(t).resource_definitions(self.stack) + new_subscr = resource_defns['subscription'] + + scheduler.TaskRunner(subscr.create)() + err = self.assertRaises(resource.UpdateReplace, + scheduler.TaskRunner(subscr.update, + new_subscr)) + msg = 'The Resource subscription requires replacement.' + self.assertEqual(msg, six.text_type(err)) + + self.m.VerifyAll() + + def test_show_resource(self, mock_client, mock_plugin): + subscr = self.subscr + subscr_id = "58138648c1e2eb7355d62137" + + fake_subscr = FakeSubscription(subscr.properties['queue_name'], + subscr_id) + fake_subscr.ttl = 220367260800 + fake_subscr.subscriber = self.subscriber + fake_subscr.options = {'post_data': str(self.options['post_data'])} + + self.m.StubOutWithMock(self.fc, 'subscription') + self.fc.subscription(subscr.properties['queue_name'], + options=self.options, + subscriber=self.subscriber, + ttl=220367260800).AndReturn(fake_subscr) + self.fc.subscription( + subscr.properties['queue_name'], id=subscr_id, + auto_create=False).MultipleTimes().AndReturn(fake_subscr) + + self.m.ReplayAll() + + props = self.stack.t.t['resources']['subscription']['properties'] + scheduler.TaskRunner(subscr.create)() + self.assertEqual( + {'queue_name': props['queue_name'], + 'id': subscr_id, + 'subscriber': self.subscriber, + 'options': self.options, + 'ttl': 220367260800}, + subscr._show_resource()) + self.assertEqual( + {'queue_name': props['queue_name'], + 'workflow_id': props['workflow_id'], + 'input': props['input'], + 'params': {}, + 'ttl': 220367260800}, + subscr.parse_live_resource_data(subscr.properties, + subscr._show_resource())) diff --git a/releasenotes/notes/zaqar-notification-a4d240bbf31b7440.yaml b/releasenotes/notes/zaqar-notification-a4d240bbf31b7440.yaml new file mode 100644 index 0000000000..ba7474b6e8 --- /dev/null +++ b/releasenotes/notes/zaqar-notification-a4d240bbf31b7440.yaml @@ -0,0 +1,3 @@ +--- +features: + - New ``OS::Zaqar::Notification`` and ``OS::Zaqar::MistralTrigger`` resource types allow users to attach to Zaqar queues (respectively) notifications in general, and notifications that trigger Mistral workflow executions in particular.