Add OS::Zaqar::Subscription resource

Closes-Bug: #1628693
Change-Id: Ifb15bdfea2ff04c1f0fb1f890582c7e9d12b6db9
This commit is contained in:
Jason Dunsmore 2016-10-28 16:08:18 -05:00
parent 90372215e5
commit 073222e11f
2 changed files with 419 additions and 0 deletions

View File

@ -0,0 +1,126 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from heat.common import exception
from heat.common.i18n import _
from heat.engine import constraints
from heat.engine import properties
from heat.engine import resource
from heat.engine import support
class ZaqarSubscription(resource.Resource):
"""A resource for managing Zaqar subscriptions.
A Zaqar subscription listens for messages in a queue and sends a
notification over email or webhook.
"""
default_client_name = "zaqar"
support_status = support.SupportStatus(version='8.0.0',
status=support.SUPPORTED)
PROPERTIES = (
QUEUE_NAME, SUBSCRIBER, TTL, OPTIONS,
) = (
'queue_name', 'subscriber', 'ttl', 'options',
)
properties_schema = {
QUEUE_NAME: properties.Schema(
properties.Schema.STRING,
_("Name of the queue to subscribe to."),
required=True),
SUBSCRIBER: properties.Schema(
properties.Schema.STRING,
_("URI of the subscriber which will be notified. Must be in the "
"format: <TYPE>:<VALUE>."),
required=True,
update_allowed=True),
TTL: properties.Schema(
properties.Schema.INTEGER,
_("Time to live of the subscription in seconds."),
update_allowed=True,
default=220367260800, # Seconds until the year 9000
# (ie. never expire)
constraints=[
constraints.Range(
min=60,
),
],
),
OPTIONS: properties.Schema(
properties.Schema.MAP,
_("Options used to configure this subscription."),
required=False,
update_allowed=True)
}
VALID_SUBSCRIBER_TYPES = ['http', 'https', 'mailto', 'trust+http',
'trust+https']
def validate(self):
subscriber_type = self.properties[self.SUBSCRIBER].split(":", 1)[0]
if subscriber_type not in self.VALID_SUBSCRIBER_TYPES:
msg = (_("The subscriber type of must be one of: %s.")
% ", ".join(self.VALID_SUBSCRIBER_TYPES))
raise exception.StackValidationFailed(message=msg)
def handle_create(self):
"""Create a subscription to a Zaqar message queue."""
subscription = self.client().subscription(
self.properties[self.QUEUE_NAME],
subscriber=self.properties[self.SUBSCRIBER],
ttl=self.properties[self.TTL],
options=self.properties[self.OPTIONS]
)
self.resource_id_set(subscription.id)
def _get_subscription(self):
return self.client().subscription(
self.properties[self.QUEUE_NAME],
id=self.resource_id,
auto_create=False
)
def handle_update(self, json_snippet, tmpl_diff, prop_diff):
"""Update a subscription to a Zaqar message queue."""
subscription = self._get_subscription()
subscription.update(prop_diff)
def handle_delete(self):
try:
self._get_subscription().delete()
except Exception as ex:
self.client_plugin().ignore_not_found(ex)
else:
return True
def _show_resource(self):
subscription = self._get_subscription()
return vars(subscription)
def parse_live_resource_data(self, resource_properties, resource_data):
return {
self.QUEUE_NAME: resource_data[self.QUEUE_NAME],
self.SUBSCRIBER: resource_data[self.SUBSCRIBER],
self.TTL: resource_data[self.TTL],
self.OPTIONS: resource_data[self.OPTIONS]
}
def resource_mapping():
return {
'OS::Zaqar::Subscription': ZaqarSubscription,
}

View File

@ -0,0 +1,293 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import six
from heat.common import exception
from heat.common import template_format
from heat.engine import resource
from heat.engine import scheduler
from heat.engine import stack
from heat.engine import template
from heat.tests import common
from heat.tests import utils
try:
from zaqarclient.transport.errors import ResourceNotFound # noqa
except ImportError:
ResourceNotFound = Exception
wp_template = '''
{
"AWSTemplateFormatVersion" : "2010-09-09",
"Description" : "openstack Zaqar queue service as a resource",
"Resources" : {
"MyQueue2" : {
"Type" : "OS::Zaqar::Queue",
"Properties" : {
"name" : "myqueue",
"metadata" : { "key1" : { "key2" : "value", "key3" : [1, 2] } }
}
},
"MySubscription" : {
"Type" : "OS::Zaqar::Subscription",
"Properties" : {
"queue_name" : "myqueue",
"subscriber" : "mailto:name@domain.com",
"ttl" : "3600",
"options" : { "key1" : "value1" }
}
}
}
}
'''
class FakeSubscription(object):
def __init__(self, queue_name, id=None, ttl=None, subscriber=None,
options=None, auto_create=True):
self.id = id
self.queue_name = queue_name
self.ttl = ttl
self.subscriber = subscriber
self.options = options
def update(self, prop_diff):
pass
def delete(self):
pass
@mock.patch.object(resource.Resource, "client_plugin")
@mock.patch.object(resource.Resource, "client")
class ZaqarSubscriptionTest(common.HeatTestCase):
def setUp(self):
super(ZaqarSubscriptionTest, self).setUp()
self.fc = self.m.CreateMockAnything()
self.ctx = utils.dummy_context()
def parse_stack(self, t):
stack_name = 'test_stack'
tmpl = template.Template(t)
self.stack = stack.Stack(self.ctx, stack_name, tmpl)
self.stack.validate()
self.stack.store()
def test_validate_subscriber_type(self, mock_client, mock_plugin):
t = template_format.parse(wp_template)
t['Resources']['MySubscription']['Properties']['subscriber'] = "foo:ba"
stack_name = 'test_stack'
tmpl = template.Template(t)
self.stack = stack.Stack(self.ctx, stack_name, tmpl)
exc = self.assertRaises(exception.StackValidationFailed,
self.stack.validate)
self.assertEqual('The subscriber type of must be one of: http, https, '
'mailto, trust+http, trust+https.',
six.text_type(exc))
def test_create(self, mock_client, mock_plugin):
t = template_format.parse(wp_template)
self.parse_stack(t)
subscr = self.stack['MySubscription']
subscr_id = "58138648c1e2eb7355d62137"
self.m.StubOutWithMock(subscr, 'client')
subscr.client().MultipleTimes().AndReturn(self.fc)
fake_subscr = FakeSubscription(subscr.properties['queue_name'],
subscr_id)
self.m.StubOutWithMock(self.fc, 'subscription')
self.fc.subscription(subscr.properties['queue_name'],
options={'key1': 'value1'},
subscriber=u'mailto:name@domain.com',
ttl=3600).AndReturn(fake_subscr)
self.m.ReplayAll()
scheduler.TaskRunner(subscr.create)()
self.assertEqual(subscr_id, subscr.FnGetRefId())
self.m.VerifyAll()
def test_delete(self, mock_client, mock_plugin):
t = template_format.parse(wp_template)
self.parse_stack(t)
subscr = self.stack['MySubscription']
subscr_id = "58138648c1e2eb7355d62137"
self.m.StubOutWithMock(subscr, 'client')
subscr.client().MultipleTimes().AndReturn(self.fc)
fake_subscr = FakeSubscription(subscr.properties['queue_name'],
subscr_id)
self.m.StubOutWithMock(self.fc, 'subscription')
self.fc.subscription(subscr.properties['queue_name'],
options={'key1': 'value1'},
subscriber=u'mailto:name@domain.com',
ttl=3600).AndReturn(fake_subscr)
self.fc.subscription(subscr.properties['queue_name'],
id=subscr_id,
auto_create=False).AndReturn(fake_subscr)
self.m.StubOutWithMock(fake_subscr, 'delete')
fake_subscr.delete()
self.m.ReplayAll()
scheduler.TaskRunner(subscr.create)()
scheduler.TaskRunner(subscr.delete)()
self.m.VerifyAll()
def test_delete_not_found(self, mock_client, mock_plugin):
t = template_format.parse(wp_template)
self.parse_stack(t)
subscr = self.stack['MySubscription']
subscr_id = "58138648c1e2eb7355d62137"
self.m.StubOutWithMock(subscr, 'client')
subscr.client().MultipleTimes().AndReturn(self.fc)
fake_subscr = FakeSubscription(subscr.properties['queue_name'],
subscr_id)
self.m.StubOutWithMock(self.fc, 'subscription')
self.fc.subscription(subscr.properties['queue_name'],
options={'key1': 'value1'},
subscriber=u'mailto:name@domain.com',
ttl=3600).AndReturn(fake_subscr)
self.fc.subscription(subscr.properties['queue_name'],
id=subscr_id,
auto_create=False).AndRaise(ResourceNotFound())
self.m.ReplayAll()
scheduler.TaskRunner(subscr.create)()
scheduler.TaskRunner(subscr.delete)()
self.m.VerifyAll()
def test_update_in_place(self, mock_client, mock_plugin):
t = template_format.parse(wp_template)
self.parse_stack(t)
subscr = self.stack['MySubscription']
subscr_id = "58138648c1e2eb7355d62137"
self.m.StubOutWithMock(subscr, 'client')
subscr.client().MultipleTimes().AndReturn(self.fc)
fake_subscr = FakeSubscription(subscr.properties['queue_name'],
subscr_id)
self.m.StubOutWithMock(self.fc, 'subscription')
self.fc.subscription(subscr.properties['queue_name'],
options={'key1': 'value1'},
subscriber=u'mailto:name@domain.com',
ttl=3600).AndReturn(fake_subscr)
self.fc.subscription(subscr.properties['queue_name'],
id=subscr_id,
auto_create=False).AndReturn(fake_subscr)
self.m.StubOutWithMock(fake_subscr, 'update')
fake_subscr.update({'ttl': 3601})
self.m.ReplayAll()
t = template_format.parse(wp_template)
new_subscr = t['Resources']['MySubscription']
new_subscr['Properties']['ttl'] = "3601"
resource_defns = template.Template(t).resource_definitions(self.stack)
scheduler.TaskRunner(subscr.create)()
scheduler.TaskRunner(subscr.update, resource_defns['MySubscription'])()
self.m.VerifyAll()
def test_update_replace(self, mock_client, mock_plugin):
t = template_format.parse(wp_template)
self.parse_stack(t)
subscr = self.stack['MySubscription']
subscr_id = "58138648c1e2eb7355d62137"
self.m.StubOutWithMock(subscr, 'client')
subscr.client().MultipleTimes().AndReturn(self.fc)
fake_subscr = FakeSubscription(subscr.properties['queue_name'],
subscr_id)
self.m.StubOutWithMock(self.fc, 'subscription')
self.fc.subscription(subscr.properties['queue_name'],
options={'key1': 'value1'},
subscriber=u'mailto:name@domain.com',
ttl=3600).AndReturn(fake_subscr)
self.m.ReplayAll()
t = template_format.parse(wp_template)
t['Resources']['MySubscription']['Properties']['queue_name'] = 'foo'
resource_defns = template.Template(t).resource_definitions(self.stack)
new_subscr = resource_defns['MySubscription']
scheduler.TaskRunner(subscr.create)()
err = self.assertRaises(resource.UpdateReplace,
scheduler.TaskRunner(subscr.update,
new_subscr))
msg = 'The Resource MySubscription requires replacement.'
self.assertEqual(msg, six.text_type(err))
self.m.VerifyAll()
def test_show_resource(self, mock_client, mock_plugin):
t = template_format.parse(wp_template)
self.parse_stack(t)
subscr = self.stack['MySubscription']
subscr_id = "58138648c1e2eb7355d62137"
self.m.StubOutWithMock(subscr, 'client')
subscr.client().MultipleTimes().AndReturn(self.fc)
fake_subscr = FakeSubscription(subscr.properties['queue_name'],
subscr_id)
props = t['Resources']['MySubscription']['Properties']
fake_subscr.ttl = props['ttl']
fake_subscr.subscriber = props['subscriber']
fake_subscr.options = props['options']
self.m.StubOutWithMock(self.fc, 'subscription')
self.fc.subscription(subscr.properties['queue_name'],
options={'key1': 'value1'},
subscriber=u'mailto:name@domain.com',
ttl=3600).AndReturn(fake_subscr)
self.fc.subscription(
subscr.properties['queue_name'], id=subscr_id,
auto_create=False).MultipleTimes().AndReturn(fake_subscr)
self.m.ReplayAll()
rsrc_data = props.copy()
rsrc_data['id'] = subscr_id
scheduler.TaskRunner(subscr.create)()
self.assertEqual(rsrc_data, subscr._show_resource())
self.assertEqual(
{'queue_name': props['queue_name'],
'subscriber': props['subscriber'],
'ttl': props['ttl'],
'options': props['options']},
subscr.parse_live_resource_data(subscr.properties,
subscr._show_resource()))
self.m.VerifyAll()