Marconi message queue resource implementation

This implements a Marconi backed native OpenStack message queue. Customers
can create a OS::Marconi::Queue queue resource declaratively in templates
and pass the href/endpoint of the queue to other resources by means of href
attribute of the queue.

Marconi bp,
https://blueprints.launchpad.net/marconi/+spec/heat-template

Implements: blueprint mqaas-marconi-resource

Change-Id: Icbbb1869b352dbdba22530f9ec185652f4da75b6
This commit is contained in:
cbjchen@cn.ibm.com 2014-01-14 14:48:14 +08:00
parent 834eb42e2e
commit a61d859039
7 changed files with 430 additions and 0 deletions

View File

@ -0,0 +1,21 @@
Marconi plugin for OpenStack Heat
================================
This plugin enable using Marconi queuing service as a resource in a Heat template.
### 1. Install the Marconi plugin in Heat
NOTE: Heat scans several directories to find plugins. The list of directories
is specified in the configuration file "heat.conf" with the "plugin_dirs"
directive.
To install the Marconi plugin, one needs to first make sure the
python-marconiclient package is installed - pip install -r requirements.txt, and
copy the plugin implementation, e.g. queue.py to wherever plugin_dirs points to.
### 2. Restart heat
Only the process "heat-engine" needs to be restarted to load the newly installed
plugin.

View File

View File

@ -0,0 +1,167 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from heat.common import exception
from heat.engine import clients
from heat.engine import properties
from heat.engine import resource
from heat.openstack.common import log as logging
logger = logging.getLogger(__name__)
try:
from marconiclient.queues.v1 import client as marconiclient
except ImportError:
marconiclient = None
logger.info(_('marconiclient not available'))
def resource_mapping():
return {}
else:
def resource_mapping():
return {
'OS::Marconi::Queue': MarconiQueue,
}
class Clients(clients.OpenStackClients):
'''
Convenience class to create and cache client instances.
'''
def __init__(self, context):
super(Clients, self).__init__(context)
self._marconi = None
def marconi(self, service_type="queuing"):
if self._marconi:
return self._marconi
con = self.context
if self.auth_token is None:
logger.error(_("Marconi connection failed, no auth_token!"))
return None
opts = {
'os_auth_token': con.auth_token,
'os_auth_url': con.auth_url,
'os_project_id': con.tenant,
'os_service_type': service_type,
}
auth_opts = {'backend': 'keystone',
'options': opts}
conf = {'auth_opts': auth_opts}
endpoint = self.url_for(service_type=service_type)
self._marconi = marconiclient.Client(url=endpoint, conf=conf)
return self._marconi
class MarconiQueue(resource.Resource):
PROPERTIES = (
NAME, METADATA,
) = (
'name', 'metadata',
)
properties_schema = {
NAME: properties.Schema(
properties.Schema.STRING,
_("Name of the queue instance to create."),
required=True),
METADATA: properties.Schema(
properties.Schema.MAP,
description=_("Arbitrary key/value metadata to store "
"contextual information about this queue."),
update_allowed=True)
}
attributes_schema = {
"queue_id": _("ID of the queue."),
"href": _("The resource href of the queue.")
}
update_allowed_keys = ('Properties',)
def __init__(self, name, json_snippet, stack):
super(MarconiQueue, self).__init__(name, json_snippet, stack)
self.clients = Clients(self.context)
def marconi(self):
return self.clients.marconi()
def physical_resource_name(self):
return self.properties[self.NAME]
def handle_create(self):
'''
Create a marconi message queue.
'''
queue_name = self.physical_resource_name()
queue = self.marconi().queue(queue_name, auto_create=False)
# Marconi client doesn't report an error if an queue with the same
# id/name already exists, which can cause issue with stack update.
if queue.exists():
raise exception.Error(_('Message queue %s already exists.')
% queue_name)
queue.ensure_exists()
self.resource_id_set(queue_name)
return queue
def check_create_complete(self, queue):
# set metadata of the newly created queue
if queue.exists():
metadata = self.properties.get('metadata')
if metadata:
queue.metadata(new_meta=metadata)
return True
queue_name = self.physical_resource_name()
raise exception.Error(_('Message queue %s creation failed.')
% queue_name)
def handle_update(self, json_snippet, tmpl_diff, prop_diff):
'''
Update queue metadata.
'''
if 'metadata' in prop_diff:
queue = self.marconi().queue(self.resource_id, auto_create=False)
metadata = prop_diff['metadata']
queue.metadata(new_meta=metadata)
def handle_delete(self):
'''
Delete a marconi message queue.
'''
if not self.resource_id:
return
queue = self.marconi().queue(self.resource_id, auto_create=False)
queue.delete()
def href(self):
api_endpoint = self.marconi().api_url
queue_name = self.physical_resource_name()
if api_endpoint.endswith('/'):
return '%squeues/%s' % (api_endpoint, queue_name)
else:
return '%s/queues/%s' % (api_endpoint, queue_name)
def _resolve_attribute(self, name):
if name == 'queue_id':
return self.resource_id
elif name == 'href':
return self.href()

View File

@ -0,0 +1 @@
python-marconiclient>=0.0.1a1

View File

View File

@ -0,0 +1,241 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from heat.common import exception
from heat.common import template_format
from heat.engine import parser
from heat.engine import resource
from heat.engine import scheduler
from heat.tests.common import HeatTestCase
from heat.tests import utils
from ..plugin import queue # noqa
wp_template = '''
{
"AWSTemplateFormatVersion" : "2010-09-09",
"Description" : "openstack Marconi queue service as a resource",
"Resources" : {
"MyQueue2" : {
"Type" : "OS::Marconi::Queue",
"Properties" : {
"name": "myqueue",
"metadata": { "key1": { "key2": "value", "key3": [1, 2] } }
}
}
},
"Outputs" : {
"queue_id": {
"Value": { "Fn::GetAtt" : [ "MyQueue2", "queue_id" ]},
"Description": "queue name"
},
"queue_href": {
"Value": { "Fn::GetAtt" : [ "MyQueue2", "href" ]},
"Description": "queue href"
}
}
}
'''
class FakeQueue(object):
def __init__(self, queue_name, auto_create=True):
self._id = queue_name
self._auto_create = auto_create
self._exists = False
def exists(self):
return self._exists
def ensure_exists(self):
self._exists = True
def metadata(self, new_meta=None):
pass
def delete(self):
pass
class MarconiMessageQueueTest(HeatTestCase):
def setUp(self):
super(MarconiMessageQueueTest, self).setUp()
self.fc = self.m.CreateMockAnything()
utils.setup_dummy_db()
self.ctx = utils.dummy_context()
resource._register_class("OS::Marconi::Queue",
queue.MarconiQueue)
def parse_stack(self, t):
stack_name = 'test_stack'
tmpl = parser.Template(t)
self.stack = parser.Stack(self.ctx, stack_name, tmpl)
self.stack.validate()
self.stack.store()
@utils.stack_delete_after
def test_create(self):
t = template_format.parse(wp_template)
self.parse_stack(t)
queue = self.stack['MyQueue2']
self.m.StubOutWithMock(queue, 'marconi')
queue.marconi().MultipleTimes().AndReturn(self.fc)
fake_q = FakeQueue(queue.physical_resource_name(), auto_create=False)
self.m.StubOutWithMock(self.fc, 'queue')
self.fc.queue(queue.physical_resource_name(),
auto_create=False).AndReturn(fake_q)
self.m.StubOutWithMock(fake_q, 'exists')
fake_q.exists().AndReturn(False)
self.m.StubOutWithMock(fake_q, 'ensure_exists')
fake_q.ensure_exists()
fake_q.exists().AndReturn(True)
self.m.StubOutWithMock(fake_q, 'metadata')
fake_q.metadata(new_meta=queue.properties.get('metadata'))
self.m.ReplayAll()
scheduler.TaskRunner(queue.create)()
self.fc.api_url = 'http://127.0.0.1:8888/v1'
self.assertEqual('myqueue', queue.FnGetAtt('queue_id'))
self.assertEqual('http://127.0.0.1:8888/v1/queues/myqueue',
queue.FnGetAtt('href'))
self.m.VerifyAll()
@utils.stack_delete_after
def test_create_existing_queue(self):
t = template_format.parse(wp_template)
self.parse_stack(t)
queue = self.stack['MyQueue2']
self.m.StubOutWithMock(queue, 'marconi')
queue.marconi().MultipleTimes().AndReturn(self.fc)
fake_q = FakeQueue("myqueue", auto_create=False)
self.m.StubOutWithMock(self.fc, 'queue')
self.fc.queue("myqueue", auto_create=False).AndReturn(fake_q)
self.m.StubOutWithMock(fake_q, 'exists')
fake_q.exists().AndReturn(True)
self.m.ReplayAll()
err = self.assertRaises(exception.ResourceFailure,
scheduler.TaskRunner(queue.create))
self.assertEqual("Error: Message queue myqueue already exists.",
str(err))
self.m.VerifyAll()
@utils.stack_delete_after
def test_create_failed(self):
t = template_format.parse(wp_template)
self.parse_stack(t)
queue = self.stack['MyQueue2']
self.m.StubOutWithMock(queue, 'marconi')
queue.marconi().MultipleTimes().AndReturn(self.fc)
fake_q = FakeQueue("myqueue", auto_create=False)
self.m.StubOutWithMock(self.fc, 'queue')
self.fc.queue("myqueue", auto_create=False).AndReturn(fake_q)
self.m.StubOutWithMock(fake_q, 'exists')
fake_q.exists().AndReturn(False)
self.m.StubOutWithMock(fake_q, 'ensure_exists')
fake_q.ensure_exists()
fake_q.exists().AndReturn(False)
self.m.ReplayAll()
err = self.assertRaises(exception.ResourceFailure,
scheduler.TaskRunner(queue.create))
self.assertEqual("Error: Message queue myqueue creation failed.",
str(err))
self.m.VerifyAll()
@utils.stack_delete_after
def test_delete(self):
t = template_format.parse(wp_template)
self.parse_stack(t)
queue = self.stack['MyQueue2']
queue.resource_id_set(queue.properties.get('name'))
self.m.StubOutWithMock(queue, 'marconi')
queue.marconi().MultipleTimes().AndReturn(self.fc)
fake_q = FakeQueue("myqueue", auto_create=False)
self.m.StubOutWithMock(self.fc, 'queue')
self.fc.queue("myqueue",
auto_create=False).MultipleTimes().AndReturn(fake_q)
self.m.StubOutWithMock(fake_q, 'delete')
fake_q.delete()
self.m.ReplayAll()
scheduler.TaskRunner(queue.create)()
scheduler.TaskRunner(queue.delete)()
self.m.VerifyAll()
@utils.stack_delete_after
def test_update_in_place(self):
t = template_format.parse(wp_template)
self.parse_stack(t)
queue = self.stack['MyQueue2']
queue.resource_id_set(queue.properties.get('name'))
self.m.StubOutWithMock(queue, 'marconi')
queue.marconi().MultipleTimes().AndReturn(self.fc)
fake_q = FakeQueue('myqueue', auto_create=False)
self.m.StubOutWithMock(self.fc, 'queue')
self.fc.queue('myqueue',
auto_create=False).MultipleTimes().AndReturn(fake_q)
self.m.StubOutWithMock(fake_q, 'metadata')
fake_q.metadata(new_meta={"key1": {"key2": "value", "key3": [1, 2]}})
# Expected to be called during update
fake_q.metadata(new_meta={'key1': 'value'})
self.m.ReplayAll()
t = template_format.parse(wp_template)
new_queue = t['Resources']['MyQueue2']
new_queue['Properties']['metadata'] = {'key1': 'value'}
scheduler.TaskRunner(queue.create)()
scheduler.TaskRunner(queue.update, new_queue)()
self.m.VerifyAll()
@utils.stack_delete_after
def test_update_replace(self):
t = template_format.parse(wp_template)
self.parse_stack(t)
queue = self.stack['MyQueue2']
queue.resource_id_set(queue.properties.get('name'))
self.m.StubOutWithMock(queue, 'marconi')
queue.marconi().MultipleTimes().AndReturn(self.fc)
fake_q = FakeQueue('myqueue', auto_create=False)
self.m.StubOutWithMock(self.fc, 'queue')
self.fc.queue('myqueue',
auto_create=False).MultipleTimes().AndReturn(fake_q)
self.m.ReplayAll()
t = template_format.parse(wp_template)
t['Resources']['MyQueue2']['Properties']['name'] = 'new_queue'
new_queue = t['Resources']['MyQueue2']
scheduler.TaskRunner(queue.create)()
err = self.assertRaises(resource.UpdateReplace,
scheduler.TaskRunner(queue.update,
new_queue))
msg = 'The Resource MyQueue2 requires replacement.'
self.assertEqual(msg, str(err))
self.m.VerifyAll()