From a61d8590391d69427a95589912bf6339f62a7a54 Mon Sep 17 00:00:00 2001 From: "cbjchen@cn.ibm.com" Date: Tue, 14 Jan 2014 14:48:14 +0800 Subject: [PATCH] Marconi message queue resource implementation This implements a Marconi backed native OpenStack message queue. Customers can create a OS::Marconi::Queue queue resource declaratively in templates and pass the href/endpoint of the queue to other resources by means of href attribute of the queue. Marconi bp, https://blueprints.launchpad.net/marconi/+spec/heat-template Implements: blueprint mqaas-marconi-resource Change-Id: Icbbb1869b352dbdba22530f9ec185652f4da75b6 --- contrib/marconi-plugin/README.md | 21 ++ contrib/marconi-plugin/__init__.py | 0 contrib/marconi-plugin/plugin/__init__.py | 0 contrib/marconi-plugin/plugin/queue.py | 167 ++++++++++++++ contrib/marconi-plugin/requirements.txt | 1 + contrib/marconi-plugin/tests/__init__.py | 0 contrib/marconi-plugin/tests/test_queue.py | 241 +++++++++++++++++++++ 7 files changed, 430 insertions(+) create mode 100644 contrib/marconi-plugin/README.md create mode 100644 contrib/marconi-plugin/__init__.py create mode 100644 contrib/marconi-plugin/plugin/__init__.py create mode 100644 contrib/marconi-plugin/plugin/queue.py create mode 100644 contrib/marconi-plugin/requirements.txt create mode 100644 contrib/marconi-plugin/tests/__init__.py create mode 100644 contrib/marconi-plugin/tests/test_queue.py diff --git a/contrib/marconi-plugin/README.md b/contrib/marconi-plugin/README.md new file mode 100644 index 0000000000..29b84801d2 --- /dev/null +++ b/contrib/marconi-plugin/README.md @@ -0,0 +1,21 @@ +Marconi plugin for OpenStack Heat +================================ + +This plugin enable using Marconi queuing service as a resource in a Heat template. + + +### 1. Install the Marconi plugin in Heat + +NOTE: Heat scans several directories to find plugins. The list of directories +is specified in the configuration file "heat.conf" with the "plugin_dirs" +directive. + +To install the Marconi plugin, one needs to first make sure the +python-marconiclient package is installed - pip install -r requirements.txt, and +copy the plugin implementation, e.g. queue.py to wherever plugin_dirs points to. + + +### 2. Restart heat + +Only the process "heat-engine" needs to be restarted to load the newly installed +plugin. diff --git a/contrib/marconi-plugin/__init__.py b/contrib/marconi-plugin/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/contrib/marconi-plugin/plugin/__init__.py b/contrib/marconi-plugin/plugin/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/contrib/marconi-plugin/plugin/queue.py b/contrib/marconi-plugin/plugin/queue.py new file mode 100644 index 0000000000..d427b1bbe6 --- /dev/null +++ b/contrib/marconi-plugin/plugin/queue.py @@ -0,0 +1,167 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from heat.common import exception +from heat.engine import clients +from heat.engine import properties +from heat.engine import resource +from heat.openstack.common import log as logging + + +logger = logging.getLogger(__name__) + + +try: + from marconiclient.queues.v1 import client as marconiclient +except ImportError: + marconiclient = None + logger.info(_('marconiclient not available')) + + def resource_mapping(): + return {} +else: + def resource_mapping(): + return { + 'OS::Marconi::Queue': MarconiQueue, + } + + +class Clients(clients.OpenStackClients): + ''' + Convenience class to create and cache client instances. + ''' + def __init__(self, context): + super(Clients, self).__init__(context) + self._marconi = None + + def marconi(self, service_type="queuing"): + if self._marconi: + return self._marconi + + con = self.context + if self.auth_token is None: + logger.error(_("Marconi connection failed, no auth_token!")) + return None + + opts = { + 'os_auth_token': con.auth_token, + 'os_auth_url': con.auth_url, + 'os_project_id': con.tenant, + 'os_service_type': service_type, + } + auth_opts = {'backend': 'keystone', + 'options': opts} + conf = {'auth_opts': auth_opts} + endpoint = self.url_for(service_type=service_type) + + self._marconi = marconiclient.Client(url=endpoint, conf=conf) + + return self._marconi + + +class MarconiQueue(resource.Resource): + + PROPERTIES = ( + NAME, METADATA, + ) = ( + 'name', 'metadata', + ) + + properties_schema = { + NAME: properties.Schema( + properties.Schema.STRING, + _("Name of the queue instance to create."), + required=True), + METADATA: properties.Schema( + properties.Schema.MAP, + description=_("Arbitrary key/value metadata to store " + "contextual information about this queue."), + update_allowed=True) + } + + attributes_schema = { + "queue_id": _("ID of the queue."), + "href": _("The resource href of the queue.") + } + + update_allowed_keys = ('Properties',) + + def __init__(self, name, json_snippet, stack): + super(MarconiQueue, self).__init__(name, json_snippet, stack) + self.clients = Clients(self.context) + + def marconi(self): + return self.clients.marconi() + + def physical_resource_name(self): + return self.properties[self.NAME] + + def handle_create(self): + ''' + Create a marconi message queue. + ''' + queue_name = self.physical_resource_name() + queue = self.marconi().queue(queue_name, auto_create=False) + # Marconi client doesn't report an error if an queue with the same + # id/name already exists, which can cause issue with stack update. + if queue.exists(): + raise exception.Error(_('Message queue %s already exists.') + % queue_name) + queue.ensure_exists() + self.resource_id_set(queue_name) + return queue + + def check_create_complete(self, queue): + # set metadata of the newly created queue + if queue.exists(): + metadata = self.properties.get('metadata') + if metadata: + queue.metadata(new_meta=metadata) + return True + + queue_name = self.physical_resource_name() + raise exception.Error(_('Message queue %s creation failed.') + % queue_name) + + def handle_update(self, json_snippet, tmpl_diff, prop_diff): + ''' + Update queue metadata. + ''' + if 'metadata' in prop_diff: + queue = self.marconi().queue(self.resource_id, auto_create=False) + metadata = prop_diff['metadata'] + queue.metadata(new_meta=metadata) + + def handle_delete(self): + ''' + Delete a marconi message queue. + ''' + if not self.resource_id: + return + + queue = self.marconi().queue(self.resource_id, auto_create=False) + queue.delete() + + def href(self): + api_endpoint = self.marconi().api_url + queue_name = self.physical_resource_name() + if api_endpoint.endswith('/'): + return '%squeues/%s' % (api_endpoint, queue_name) + else: + return '%s/queues/%s' % (api_endpoint, queue_name) + + def _resolve_attribute(self, name): + if name == 'queue_id': + return self.resource_id + elif name == 'href': + return self.href() diff --git a/contrib/marconi-plugin/requirements.txt b/contrib/marconi-plugin/requirements.txt new file mode 100644 index 0000000000..0bd5c0f864 --- /dev/null +++ b/contrib/marconi-plugin/requirements.txt @@ -0,0 +1 @@ +python-marconiclient>=0.0.1a1 diff --git a/contrib/marconi-plugin/tests/__init__.py b/contrib/marconi-plugin/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/contrib/marconi-plugin/tests/test_queue.py b/contrib/marconi-plugin/tests/test_queue.py new file mode 100644 index 0000000000..f48a466d00 --- /dev/null +++ b/contrib/marconi-plugin/tests/test_queue.py @@ -0,0 +1,241 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from heat.common import exception +from heat.common import template_format +from heat.engine import parser +from heat.engine import resource +from heat.engine import scheduler +from heat.tests.common import HeatTestCase +from heat.tests import utils + +from ..plugin import queue # noqa + +wp_template = ''' +{ + "AWSTemplateFormatVersion" : "2010-09-09", + "Description" : "openstack Marconi queue service as a resource", + "Resources" : { + "MyQueue2" : { + "Type" : "OS::Marconi::Queue", + "Properties" : { + "name": "myqueue", + "metadata": { "key1": { "key2": "value", "key3": [1, 2] } } + } + } + }, + "Outputs" : { + "queue_id": { + "Value": { "Fn::GetAtt" : [ "MyQueue2", "queue_id" ]}, + "Description": "queue name" + }, + "queue_href": { + "Value": { "Fn::GetAtt" : [ "MyQueue2", "href" ]}, + "Description": "queue href" + } + } +} +''' + + +class FakeQueue(object): + def __init__(self, queue_name, auto_create=True): + self._id = queue_name + self._auto_create = auto_create + self._exists = False + + def exists(self): + return self._exists + + def ensure_exists(self): + self._exists = True + + def metadata(self, new_meta=None): + pass + + def delete(self): + pass + + +class MarconiMessageQueueTest(HeatTestCase): + def setUp(self): + super(MarconiMessageQueueTest, self).setUp() + self.fc = self.m.CreateMockAnything() + utils.setup_dummy_db() + self.ctx = utils.dummy_context() + resource._register_class("OS::Marconi::Queue", + queue.MarconiQueue) + + def parse_stack(self, t): + stack_name = 'test_stack' + tmpl = parser.Template(t) + self.stack = parser.Stack(self.ctx, stack_name, tmpl) + self.stack.validate() + self.stack.store() + + @utils.stack_delete_after + def test_create(self): + t = template_format.parse(wp_template) + self.parse_stack(t) + + queue = self.stack['MyQueue2'] + self.m.StubOutWithMock(queue, 'marconi') + queue.marconi().MultipleTimes().AndReturn(self.fc) + + fake_q = FakeQueue(queue.physical_resource_name(), auto_create=False) + self.m.StubOutWithMock(self.fc, 'queue') + self.fc.queue(queue.physical_resource_name(), + auto_create=False).AndReturn(fake_q) + self.m.StubOutWithMock(fake_q, 'exists') + fake_q.exists().AndReturn(False) + self.m.StubOutWithMock(fake_q, 'ensure_exists') + fake_q.ensure_exists() + fake_q.exists().AndReturn(True) + self.m.StubOutWithMock(fake_q, 'metadata') + fake_q.metadata(new_meta=queue.properties.get('metadata')) + + self.m.ReplayAll() + + scheduler.TaskRunner(queue.create)() + self.fc.api_url = 'http://127.0.0.1:8888/v1' + self.assertEqual('myqueue', queue.FnGetAtt('queue_id')) + self.assertEqual('http://127.0.0.1:8888/v1/queues/myqueue', + queue.FnGetAtt('href')) + + self.m.VerifyAll() + + @utils.stack_delete_after + def test_create_existing_queue(self): + t = template_format.parse(wp_template) + self.parse_stack(t) + + queue = self.stack['MyQueue2'] + self.m.StubOutWithMock(queue, 'marconi') + queue.marconi().MultipleTimes().AndReturn(self.fc) + + fake_q = FakeQueue("myqueue", auto_create=False) + self.m.StubOutWithMock(self.fc, 'queue') + self.fc.queue("myqueue", auto_create=False).AndReturn(fake_q) + self.m.StubOutWithMock(fake_q, 'exists') + fake_q.exists().AndReturn(True) + self.m.ReplayAll() + + err = self.assertRaises(exception.ResourceFailure, + scheduler.TaskRunner(queue.create)) + self.assertEqual("Error: Message queue myqueue already exists.", + str(err)) + self.m.VerifyAll() + + @utils.stack_delete_after + def test_create_failed(self): + t = template_format.parse(wp_template) + self.parse_stack(t) + + queue = self.stack['MyQueue2'] + self.m.StubOutWithMock(queue, 'marconi') + queue.marconi().MultipleTimes().AndReturn(self.fc) + + fake_q = FakeQueue("myqueue", auto_create=False) + self.m.StubOutWithMock(self.fc, 'queue') + self.fc.queue("myqueue", auto_create=False).AndReturn(fake_q) + self.m.StubOutWithMock(fake_q, 'exists') + fake_q.exists().AndReturn(False) + self.m.StubOutWithMock(fake_q, 'ensure_exists') + fake_q.ensure_exists() + fake_q.exists().AndReturn(False) + + self.m.ReplayAll() + + err = self.assertRaises(exception.ResourceFailure, + scheduler.TaskRunner(queue.create)) + self.assertEqual("Error: Message queue myqueue creation failed.", + str(err)) + self.m.VerifyAll() + + @utils.stack_delete_after + def test_delete(self): + t = template_format.parse(wp_template) + self.parse_stack(t) + + queue = self.stack['MyQueue2'] + queue.resource_id_set(queue.properties.get('name')) + self.m.StubOutWithMock(queue, 'marconi') + queue.marconi().MultipleTimes().AndReturn(self.fc) + + fake_q = FakeQueue("myqueue", auto_create=False) + self.m.StubOutWithMock(self.fc, 'queue') + self.fc.queue("myqueue", + auto_create=False).MultipleTimes().AndReturn(fake_q) + self.m.StubOutWithMock(fake_q, 'delete') + fake_q.delete() + + self.m.ReplayAll() + + scheduler.TaskRunner(queue.create)() + scheduler.TaskRunner(queue.delete)() + self.m.VerifyAll() + + @utils.stack_delete_after + def test_update_in_place(self): + t = template_format.parse(wp_template) + self.parse_stack(t) + queue = self.stack['MyQueue2'] + queue.resource_id_set(queue.properties.get('name')) + self.m.StubOutWithMock(queue, 'marconi') + queue.marconi().MultipleTimes().AndReturn(self.fc) + fake_q = FakeQueue('myqueue', auto_create=False) + self.m.StubOutWithMock(self.fc, 'queue') + self.fc.queue('myqueue', + auto_create=False).MultipleTimes().AndReturn(fake_q) + self.m.StubOutWithMock(fake_q, 'metadata') + fake_q.metadata(new_meta={"key1": {"key2": "value", "key3": [1, 2]}}) + + # Expected to be called during update + fake_q.metadata(new_meta={'key1': 'value'}) + + self.m.ReplayAll() + + t = template_format.parse(wp_template) + new_queue = t['Resources']['MyQueue2'] + new_queue['Properties']['metadata'] = {'key1': 'value'} + + scheduler.TaskRunner(queue.create)() + scheduler.TaskRunner(queue.update, new_queue)() + self.m.VerifyAll() + + @utils.stack_delete_after + def test_update_replace(self): + t = template_format.parse(wp_template) + self.parse_stack(t) + queue = self.stack['MyQueue2'] + queue.resource_id_set(queue.properties.get('name')) + self.m.StubOutWithMock(queue, 'marconi') + queue.marconi().MultipleTimes().AndReturn(self.fc) + fake_q = FakeQueue('myqueue', auto_create=False) + self.m.StubOutWithMock(self.fc, 'queue') + self.fc.queue('myqueue', + auto_create=False).MultipleTimes().AndReturn(fake_q) + + self.m.ReplayAll() + + t = template_format.parse(wp_template) + t['Resources']['MyQueue2']['Properties']['name'] = 'new_queue' + new_queue = t['Resources']['MyQueue2'] + + scheduler.TaskRunner(queue.create)() + err = self.assertRaises(resource.UpdateReplace, + scheduler.TaskRunner(queue.update, + new_queue)) + msg = 'The Resource MyQueue2 requires replacement.' + self.assertEqual(msg, str(err)) + + self.m.VerifyAll()