Add an OS::Zaqar::MistralTrigger resource type

This simplifies the task of having Mistral subscribe to a Zaqar queue
(compared to using the OS::Zaqar::Subscription resource type) by
automatically filling in the correct subscriber URL from the Keystone
catalog, and also making the required options explicit in the properties
schema.

Change-Id: I915297b3d0ce70bafa6ca0f3854668e88d4475b1
Related-Bug: #1628693
This commit is contained in:
Zane Bitter 2016-11-30 14:44:13 -05:00
parent da5ba93048
commit ec7abd0f20
3 changed files with 368 additions and 19 deletions

View File

@ -18,6 +18,8 @@ from heat.engine import properties
from heat.engine import resource
from heat.engine import support
from oslo_serialization import jsonutils
class ZaqarSubscription(resource.Resource):
"""A resource for managing Zaqar subscriptions.
@ -71,20 +73,34 @@ class ZaqarSubscription(resource.Resource):
'trust+https']
def validate(self):
super(ZaqarSubscription, self).validate()
self._validate_subscriber()
def _validate_subscriber(self):
subscriber_type = self.properties[self.SUBSCRIBER].split(":", 1)[0]
if subscriber_type not in self.VALID_SUBSCRIBER_TYPES:
msg = (_("The subscriber type of must be one of: %s.")
% ", ".join(self.VALID_SUBSCRIBER_TYPES))
raise exception.StackValidationFailed(message=msg)
def _subscriber_url(self):
return self.properties[self.SUBSCRIBER]
def _subscription_options(self):
return self.properties[self.OPTIONS]
def _subscription_data(self):
return {
'subscriber': self._subscriber_url(),
'ttl': self.properties[self.TTL],
'options': self._subscription_options(),
}
def handle_create(self):
"""Create a subscription to a Zaqar message queue."""
subscription = self.client().subscription(
self.properties[self.QUEUE_NAME],
subscriber=self.properties[self.SUBSCRIBER],
ttl=self.properties[self.TTL],
options=self.properties[self.OPTIONS]
)
**self._subscription_data())
self.resource_id_set(subscription.id)
def _get_subscription(self):
@ -96,8 +112,10 @@ class ZaqarSubscription(resource.Resource):
def handle_update(self, json_snippet, tmpl_diff, prop_diff):
"""Update a subscription to a Zaqar message queue."""
self.properties = json_snippet.properties(self.properties_schema,
self.context)
subscription = self._get_subscription()
subscription.update(prop_diff)
subscription.update(self._subscription_data())
def handle_delete(self):
try:
@ -120,7 +138,97 @@ class ZaqarSubscription(resource.Resource):
}
class MistralTrigger(ZaqarSubscription):
"""A Zaqar subscription for triggering Mistral workflows.
This Zaqar subscription type listens for messages in a queue and triggers a
Mistral workflow execution each time one is received.
The content of the Zaqar message is passed to the workflow in the
environment with the name "notification", and thus is accessible from
within the workflow as:
<% env().notification %>
Other environment variables can be set using the 'env' key in the params
property.
"""
support_status = support.SupportStatus(version='8.0.0',
status=support.SUPPORTED)
PROPERTIES = (
QUEUE_NAME, TTL,
WORKFLOW_ID, PARAMS, INPUT,
) = (
ZaqarSubscription.QUEUE_NAME, ZaqarSubscription.TTL,
'workflow_id', 'params', 'input',
)
properties_schema = {
QUEUE_NAME: ZaqarSubscription.properties_schema[QUEUE_NAME],
TTL: ZaqarSubscription.properties_schema[TTL],
WORKFLOW_ID: properties.Schema(
properties.Schema.STRING,
_("UUID of the Mistral workflow to trigger."),
required=True,
constraints=[constraints.CustomConstraint('mistral.workflow')],
update_allowed=True),
PARAMS: properties.Schema(
properties.Schema.MAP,
_("Parameters to pass to the Mistral workflow execution. "
"The parameters depend on the workflow type."),
required=False,
default={},
update_allowed=True),
INPUT: properties.Schema(
properties.Schema.MAP,
_("Input values to pass to the Mistral workflow."),
required=False,
default={},
update_allowed=True),
}
def _validate_subscriber(self):
pass
def _subscriber_url(self):
mistral_client = self.client('mistral')
return 'trust+%s/executions' % mistral_client.http_client.base_url
def _subscription_options(self):
params = dict(self.properties[self.PARAMS])
params.setdefault('env', {})
params['env']['notification'] = "$zaqar_message$"
post_data = {
self.WORKFLOW_ID: self.properties[self.WORKFLOW_ID],
self.PARAMS: params,
self.INPUT: self.properties[self.INPUT],
}
return {
'post_data': jsonutils.dumps(post_data)
}
def parse_live_resource_data(self, resource_properties, resource_data):
options = resource_data.get(self.OPTIONS, {})
post_data = jsonutils.loads(options.get('post_data', '{}'))
params = post_data.get(self.PARAMS, {})
env = params.get('env', {})
env.pop('notification', None)
if not env:
params.pop('env', None)
return {
self.QUEUE_NAME: resource_data.get(self.QUEUE_NAME),
self.TTL: resource_data.get(self.TTL),
self.WORKFLOW_ID: post_data.get(self.WORKFLOW_ID),
self.PARAMS: params,
self.INPUT: post_data.get(self.INPUT),
}
def resource_mapping():
return {
'OS::Zaqar::Subscription': ZaqarSubscription,
'OS::Zaqar::MistralTrigger': MistralTrigger,
}

View File

@ -16,6 +16,7 @@ import six
from heat.common import exception
from heat.common import template_format
from heat.engine.clients.os import mistral as mistral_client_plugin
from heat.engine import resource
from heat.engine import scheduler
from heat.engine import stack
@ -23,15 +24,17 @@ from heat.engine import template
from heat.tests import common
from heat.tests import utils
from oslo_serialization import jsonutils
try:
from zaqarclient.transport.errors import ResourceNotFound # noqa
except ImportError:
ResourceNotFound = Exception
class ResourceNotFound(Exception):
pass
wp_template = '''
subscr_template = '''
{
"AWSTemplateFormatVersion" : "2010-09-09",
"Description" : "openstack Zaqar queue service as a resource",
"Resources" : {
"MyQueue2" : {
"Type" : "OS::Zaqar::Queue",
@ -53,6 +56,22 @@ wp_template = '''
}
'''
mistral_template = '''
{
"heat_template_version" : "2015-10-15",
"resources" : {
"subscription" : {
"type" : "OS::Zaqar::MistralTrigger",
"properties" : {
"queue_name" : "myqueue",
"workflow_id": "abcd",
"input" : { "key1" : "value1" }
}
}
}
}
'''
class FakeSubscription(object):
def __init__(self, queue_name, id=None, ttl=None, subscriber=None,
@ -64,7 +83,10 @@ class FakeSubscription(object):
self.options = options
def update(self, prop_diff):
pass
allowed_keys = {'subscriber', 'ttl', 'options'}
for key in six.iterkeys(prop_diff):
if key not in allowed_keys:
raise KeyError(key)
def delete(self):
pass
@ -86,7 +108,7 @@ class ZaqarSubscriptionTest(common.HeatTestCase):
self.stack.store()
def test_validate_subscriber_type(self, mock_client, mock_plugin):
t = template_format.parse(wp_template)
t = template_format.parse(subscr_template)
t['Resources']['MySubscription']['Properties']['subscriber'] = "foo:ba"
stack_name = 'test_stack'
tmpl = template.Template(t)
@ -99,7 +121,7 @@ class ZaqarSubscriptionTest(common.HeatTestCase):
six.text_type(exc))
def test_create(self, mock_client, mock_plugin):
t = template_format.parse(wp_template)
t = template_format.parse(subscr_template)
self.parse_stack(t)
subscr = self.stack['MySubscription']
@ -124,7 +146,7 @@ class ZaqarSubscriptionTest(common.HeatTestCase):
self.m.VerifyAll()
def test_delete(self, mock_client, mock_plugin):
t = template_format.parse(wp_template)
t = template_format.parse(subscr_template)
self.parse_stack(t)
subscr = self.stack['MySubscription']
@ -154,7 +176,7 @@ class ZaqarSubscriptionTest(common.HeatTestCase):
self.m.VerifyAll()
def test_delete_not_found(self, mock_client, mock_plugin):
t = template_format.parse(wp_template)
t = template_format.parse(subscr_template)
self.parse_stack(t)
subscr = self.stack['MySubscription']
@ -182,7 +204,7 @@ class ZaqarSubscriptionTest(common.HeatTestCase):
self.m.VerifyAll()
def test_update_in_place(self, mock_client, mock_plugin):
t = template_format.parse(wp_template)
t = template_format.parse(subscr_template)
self.parse_stack(t)
subscr = self.stack['MySubscription']
@ -202,11 +224,12 @@ class ZaqarSubscriptionTest(common.HeatTestCase):
id=subscr_id,
auto_create=False).AndReturn(fake_subscr)
self.m.StubOutWithMock(fake_subscr, 'update')
fake_subscr.update({'ttl': 3601})
fake_subscr.update({'ttl': 3601, 'options': {'key1': 'value1'},
'subscriber': 'mailto:name@domain.com'})
self.m.ReplayAll()
t = template_format.parse(wp_template)
t = template_format.parse(subscr_template)
new_subscr = t['Resources']['MySubscription']
new_subscr['Properties']['ttl'] = "3601"
resource_defns = template.Template(t).resource_definitions(self.stack)
@ -217,7 +240,7 @@ class ZaqarSubscriptionTest(common.HeatTestCase):
self.m.VerifyAll()
def test_update_replace(self, mock_client, mock_plugin):
t = template_format.parse(wp_template)
t = template_format.parse(subscr_template)
self.parse_stack(t)
subscr = self.stack['MySubscription']
@ -236,7 +259,7 @@ class ZaqarSubscriptionTest(common.HeatTestCase):
self.m.ReplayAll()
t = template_format.parse(wp_template)
t = template_format.parse(subscr_template)
t['Resources']['MySubscription']['Properties']['queue_name'] = 'foo'
resource_defns = template.Template(t).resource_definitions(self.stack)
new_subscr = resource_defns['MySubscription']
@ -251,7 +274,7 @@ class ZaqarSubscriptionTest(common.HeatTestCase):
self.m.VerifyAll()
def test_show_resource(self, mock_client, mock_plugin):
t = template_format.parse(wp_template)
t = template_format.parse(subscr_template)
self.parse_stack(t)
subscr = self.stack['MySubscription']
@ -291,3 +314,218 @@ class ZaqarSubscriptionTest(common.HeatTestCase):
subscr._show_resource()))
self.m.VerifyAll()
class JsonString(object):
def __init__(self, data):
self._data = data
def __eq__(self, other):
return self._data == jsonutils.loads(other)
def __str__(self):
return jsonutils.dumps(self._data)
def __repr__(self):
return str(self)
@mock.patch.object(resource.Resource, "client_plugin")
@mock.patch.object(resource.Resource, "client")
class ZaqarMistralTriggerTest(common.HeatTestCase):
def setUp(self):
super(ZaqarMistralTriggerTest, self).setUp()
self.fc = self.m.CreateMockAnything()
self.ctx = utils.dummy_context()
self.patchobject(mistral_client_plugin.WorkflowConstraint,
'validate', return_value=True)
stack_name = 'test_stack'
t = template_format.parse(mistral_template)
tmpl = template.Template(t)
self.stack = stack.Stack(self.ctx, stack_name, tmpl)
self.stack.validate()
self.stack.store()
def client(name='zaqar'):
if name == 'mistral':
client = mock.Mock()
client.http_client = mock.Mock()
client.http_client.base_url = 'http://mistral.example.net:8989'
return client
elif name == 'zaqar':
return self.fc
self.subscr = self.stack['subscription']
self.subscr.client = mock.Mock(side_effect=client)
self.subscriber = 'trust+http://mistral.example.net:8989/executions'
self.options = {
'post_data': JsonString({
'workflow_id': 'abcd',
'input': {"key1": "value1"},
'params': {'env': {'notification': '$zaqar_message$'}},
})
}
def test_create(self, mock_client, mock_plugin):
subscr = self.subscr
subscr_id = "58138648c1e2eb7355d62137"
fake_subscr = FakeSubscription(subscr.properties['queue_name'],
subscr_id)
self.m.StubOutWithMock(self.fc, 'subscription')
self.fc.subscription(subscr.properties['queue_name'],
options=self.options,
subscriber=self.subscriber,
ttl=220367260800).AndReturn(fake_subscr)
self.m.ReplayAll()
scheduler.TaskRunner(subscr.create)()
self.assertEqual(subscr_id, subscr.FnGetRefId())
self.m.VerifyAll()
def test_delete(self, mock_client, mock_plugin):
subscr = self.subscr
subscr_id = "58138648c1e2eb7355d62137"
fake_subscr = FakeSubscription(subscr.properties['queue_name'],
subscr_id)
self.m.StubOutWithMock(self.fc, 'subscription')
self.fc.subscription(subscr.properties['queue_name'],
options=self.options,
subscriber=self.subscriber,
ttl=220367260800).AndReturn(fake_subscr)
self.fc.subscription(subscr.properties['queue_name'],
id=subscr_id,
auto_create=False).AndReturn(fake_subscr)
self.m.StubOutWithMock(fake_subscr, 'delete')
fake_subscr.delete()
self.m.ReplayAll()
scheduler.TaskRunner(subscr.create)()
scheduler.TaskRunner(subscr.delete)()
self.m.VerifyAll()
def test_delete_not_found(self, mock_client, mock_plugin):
subscr = self.subscr
subscr_id = "58138648c1e2eb7355d62137"
fake_subscr = FakeSubscription(subscr.properties['queue_name'],
subscr_id)
self.m.StubOutWithMock(self.fc, 'subscription')
self.fc.subscription(subscr.properties['queue_name'],
options=self.options,
subscriber=self.subscriber,
ttl=220367260800).AndReturn(fake_subscr)
self.fc.subscription(subscr.properties['queue_name'],
id=subscr_id,
auto_create=False).AndRaise(ResourceNotFound())
self.m.ReplayAll()
scheduler.TaskRunner(subscr.create)()
scheduler.TaskRunner(subscr.delete)()
self.m.VerifyAll()
def test_update_in_place(self, mock_client, mock_plugin):
subscr = self.subscr
subscr_id = "58138648c1e2eb7355d62137"
fake_subscr = FakeSubscription(subscr.properties['queue_name'],
subscr_id)
self.m.StubOutWithMock(self.fc, 'subscription')
self.fc.subscription(subscr.properties['queue_name'],
options=self.options,
subscriber=self.subscriber,
ttl=220367260800).AndReturn(fake_subscr)
self.fc.subscription(subscr.properties['queue_name'],
id=subscr_id,
auto_create=False).AndReturn(fake_subscr)
self.m.StubOutWithMock(fake_subscr, 'update')
fake_subscr.update({'ttl': 3601, 'subscriber': self.subscriber,
'options': self.options})
self.m.ReplayAll()
t = template_format.parse(mistral_template)
new_subscr = t['resources']['subscription']
new_subscr['properties']['ttl'] = "3601"
resource_defns = template.Template(t).resource_definitions(self.stack)
scheduler.TaskRunner(subscr.create)()
scheduler.TaskRunner(subscr.update, resource_defns['subscription'])()
self.m.VerifyAll()
def test_update_replace(self, mock_client, mock_plugin):
subscr = self.subscr
subscr_id = "58138648c1e2eb7355d62137"
fake_subscr = FakeSubscription(subscr.properties['queue_name'],
subscr_id)
self.m.StubOutWithMock(self.fc, 'subscription')
self.fc.subscription(subscr.properties['queue_name'],
options=self.options,
subscriber=self.subscriber,
ttl=220367260800).AndReturn(fake_subscr)
self.m.ReplayAll()
t = template_format.parse(mistral_template)
t['resources']['subscription']['properties']['queue_name'] = 'foo'
resource_defns = template.Template(t).resource_definitions(self.stack)
new_subscr = resource_defns['subscription']
scheduler.TaskRunner(subscr.create)()
err = self.assertRaises(resource.UpdateReplace,
scheduler.TaskRunner(subscr.update,
new_subscr))
msg = 'The Resource subscription requires replacement.'
self.assertEqual(msg, six.text_type(err))
self.m.VerifyAll()
def test_show_resource(self, mock_client, mock_plugin):
subscr = self.subscr
subscr_id = "58138648c1e2eb7355d62137"
fake_subscr = FakeSubscription(subscr.properties['queue_name'],
subscr_id)
fake_subscr.ttl = 220367260800
fake_subscr.subscriber = self.subscriber
fake_subscr.options = {'post_data': str(self.options['post_data'])}
self.m.StubOutWithMock(self.fc, 'subscription')
self.fc.subscription(subscr.properties['queue_name'],
options=self.options,
subscriber=self.subscriber,
ttl=220367260800).AndReturn(fake_subscr)
self.fc.subscription(
subscr.properties['queue_name'], id=subscr_id,
auto_create=False).MultipleTimes().AndReturn(fake_subscr)
self.m.ReplayAll()
props = self.stack.t.t['resources']['subscription']['properties']
scheduler.TaskRunner(subscr.create)()
self.assertEqual(
{'queue_name': props['queue_name'],
'id': subscr_id,
'subscriber': self.subscriber,
'options': self.options,
'ttl': 220367260800},
subscr._show_resource())
self.assertEqual(
{'queue_name': props['queue_name'],
'workflow_id': props['workflow_id'],
'input': props['input'],
'params': {},
'ttl': 220367260800},
subscr.parse_live_resource_data(subscr.properties,
subscr._show_resource()))

View File

@ -0,0 +1,3 @@
---
features:
- New ``OS::Zaqar::Notification`` and ``OS::Zaqar::MistralTrigger`` resource types allow users to attach to Zaqar queues (respectively) notifications in general, and notifications that trigger Mistral workflow executions in particular.