diff --git a/marconi/storage/base.py b/marconi/storage/base.py index 836cb02e2..76b26a2ba 100644 --- a/marconi/storage/base.py +++ b/marconi/storage/base.py @@ -100,8 +100,8 @@ class QueueBase(ControllerBase): raise NotImplementedError @abc.abstractmethod - def get(self, name, project=None): - """Base method for queue retrieval. + def get_metadata(self, name, project=None): + """Base method for queue metadata retrieval. :param name: The queue name :param project: Project id diff --git a/marconi/storage/mongodb/queues.py b/marconi/storage/mongodb/queues.py index 69622f709..b4bf9f226 100644 --- a/marconi/storage/mongodb/queues.py +++ b/marconi/storage/mongodb/queues.py @@ -108,7 +108,7 @@ class QueueController(storage.QueueBase): yield marker_name and marker_name['next'] @utils.raises_conn_error - def get(self, name, project=None): + def get_metadata(self, name, project=None): queue = self._get(name, project) return queue.get('m', {}) diff --git a/marconi/storage/sqlite/queues.py b/marconi/storage/sqlite/queues.py index 8ae2bc7a6..01b5d7e47 100644 --- a/marconi/storage/sqlite/queues.py +++ b/marconi/storage/sqlite/queues.py @@ -71,7 +71,7 @@ class QueueController(base.QueueBase): yield it() yield marker_name['next'] - def get(self, name, project): + def get_metadata(self, name, project): if project is None: project = '' diff --git a/marconi/tests/storage/base.py b/marconi/tests/storage/base.py index 20bec25ef..5253b7de7 100644 --- a/marconi/tests/storage/base.py +++ b/marconi/tests/storage/base.py @@ -76,21 +76,21 @@ class QueueControllerTest(ControllerBaseTest): self.assertTrue(created) # Test Queue retrieval - metadata = self.controller.get('test', project=self.project) + metadata = self.controller.get_metadata('test', project=self.project) self.assertEqual(metadata, {}) # Test Queue Update created = self.controller.set_metadata('test', project=self.project, metadata=dict(meta='test_meta')) - metadata = self.controller.get('test', project=self.project) + metadata = self.controller.get_metadata('test', project=self.project) self.assertEqual(metadata['meta'], 'test_meta') # Touching an existing queue does not affect metadata created = self.controller.create('test', project=self.project) self.assertFalse(created) - metadata = self.controller.get('test', project=self.project) + metadata = self.controller.get_metadata('test', project=self.project) self.assertEqual(metadata['meta'], 'test_meta') # Test Queue Statistic @@ -105,7 +105,7 @@ class QueueControllerTest(ControllerBaseTest): # Test DoesNotExist Exception with testing.expect(storage.exceptions.DoesNotExist): - self.controller.get('test', project=self.project) + self.controller.get_metadata('test', project=self.project) with testing.expect(storage.exceptions.DoesNotExist): self.controller.set_metadata('test', '{}', project=self.project) diff --git a/marconi/tests/transport/wsgi/test_media_type.py b/marconi/tests/transport/wsgi/test_media_type.py index ddb714c55..418e44b15 100644 --- a/marconi/tests/transport/wsgi/test_media_type.py +++ b/marconi/tests/transport/wsgi/test_media_type.py @@ -29,7 +29,7 @@ class TestWSGIMediaType(base.TestBase): endpoints = [ ('GET', '/v1/queues'), - ('GET', '/v1/queues/nonexistent'), + ('GET', '/v1/queues/nonexistent/metadata'), ('GET', '/v1/queues/nonexistent/stats'), ('POST', '/v1/queues/nonexistent/messages'), ('GET', '/v1/queues/nonexistent/messages/deadbeaf'), diff --git a/marconi/tests/transport/wsgi/test_queue_lifecycle.py b/marconi/tests/transport/wsgi/test_queue_lifecycle.py index e932ea8bd..cda17ee66 100644 --- a/marconi/tests/transport/wsgi/test_queue_lifecycle.py +++ b/marconi/tests/transport/wsgi/test_queue_lifecycle.py @@ -29,23 +29,36 @@ class QueueLifecycleBaseTest(base.TestBase): config_filename = None - def test_simple(self): + def test_basics_thoroughly(self): path = '/v1/queues/gumshoe' for project_id in ('480924', 'foo', '', None): - # Stats + # Stats not found - queue not created yet self.simulate_get(path + '/stats', project_id) self.assertEquals(self.srmock.status, falcon.HTTP_404) + # Metadata not found - queue not created yet + self.simulate_get(path + '/metadata', project_id) + self.assertEquals(self.srmock.status, falcon.HTTP_404) + # Create - doc = '{"messages": {"ttl": 600}}' - self.simulate_put(path, project_id, body=doc) + self.simulate_put(path, project_id) self.assertEquals(self.srmock.status, falcon.HTTP_201) location = ('Location', '/v1/queues/gumshoe') self.assertIn(location, self.srmock.headers) - result = self.simulate_get(path, project_id) + # Get on queues shouldn't work any more + self.simulate_get(path, project_id) + self.assertEquals(self.srmock.status, falcon.HTTP_405) + + # Add metadata + doc = '{"messages": {"ttl": 600}}' + self.simulate_put(path + '/metadata', project_id, body=doc) + self.assertEquals(self.srmock.status, falcon.HTTP_204) + + # Fetch metadata + result = self.simulate_get(path + '/metadata', project_id) result_doc = json.loads(result[0]) self.assertEquals(self.srmock.status, falcon.HTTP_200) self.assertEquals(result_doc, json.loads(doc)) @@ -54,70 +67,87 @@ class QueueLifecycleBaseTest(base.TestBase): self.simulate_delete(path, project_id) self.assertEquals(self.srmock.status, falcon.HTTP_204) - # Get non-existing - self.simulate_get(path, project_id) + # Get non-existent stats + self.simulate_get(path + '/stats', project_id) + self.assertEquals(self.srmock.status, falcon.HTTP_404) + + # Get non-existent metadata + self.simulate_get(path + '/metadata', project_id) self.assertEquals(self.srmock.status, falcon.HTTP_404) def test_no_metadata(self): self.simulate_put('/v1/queues/fizbat') - self.assertEquals(self.srmock.status, falcon.HTTP_400) + self.assertEquals(self.srmock.status, falcon.HTTP_201) def test_bad_metadata(self): + self.simulate_put('/v1/queues/fizbat', '7e55e1a7e') + self.assertEquals(self.srmock.status, falcon.HTTP_201) for document in ('{', '[]', '.', ' ', ''): - self.simulate_put('/v1/queues/fizbat', '7e55e1a7e', + self.simulate_put('/v1/queues/fizbat/metadata', '7e55e1a7e', body=document) self.assertEquals(self.srmock.status, falcon.HTTP_400) def test_too_much_metadata(self): + self.simulate_put('/v1/queues/fizbat', '7e55e1a7e') + self.assertEquals(self.srmock.status, falcon.HTTP_201) doc = '{"messages": {"ttl": 600}, "padding": "%s"}' padding_len = transport.MAX_QUEUE_METADATA_SIZE - (len(doc) - 2) + 1 doc = doc % ('x' * padding_len) - self.simulate_put('/v1/queues/fizbat', 'deadbeef', body=doc) + self.simulate_put('/v1/queues/fizbat/metadata', '7e55e1a7e', body=doc) self.assertEquals(self.srmock.status, falcon.HTTP_400) def test_way_too_much_metadata(self): + self.simulate_put('/v1/queues/fizbat', '7e55e1a7e') + self.assertEquals(self.srmock.status, falcon.HTTP_201) doc = '{"messages": {"ttl": 600}, "padding": "%s"}' padding_len = transport.MAX_QUEUE_METADATA_SIZE * 100 doc = doc % ('x' * padding_len) - self.simulate_put('/v1/queues/fizbat', 'deadbeef', body=doc) + self.simulate_put('/v1/queues/fizbat/metadata', '7e55e1a7e', body=doc) self.assertEquals(self.srmock.status, falcon.HTTP_400) def test_custom_metadata(self): + self.simulate_put('/v1/queues/fizbat', '480924') + self.assertEquals(self.srmock.status, falcon.HTTP_201) + # Set doc = '{"messages": {"ttl": 600}, "padding": "%s"}' padding_len = transport.MAX_QUEUE_METADATA_SIZE - (len(doc) - 2) doc = doc % ('x' * padding_len) - self.simulate_put('/v1/queues/fizbat', '480924', body=doc) - self.assertEquals(self.srmock.status, falcon.HTTP_201) - - # Get - result = self.simulate_get('/v1/queues/fizbat', '480924') - result_doc = json.loads(result[0]) - self.assertEquals(result_doc, json.loads(doc)) - - def test_update_metadata(self): - path = '/v1/queues/xyz' - project_id = '480924' - - # Create - doc1 = '{"messages": {"ttl": 600}}' - self.simulate_put(path, project_id, body=doc1) - self.assertEquals(self.srmock.status, falcon.HTTP_201) - - # Update - doc2 = '{"messages": {"ttl": 100}}' - self.simulate_put(path, project_id, body=doc2) + self.simulate_put('/v1/queues/fizbat/metadata', '480924', body=doc) self.assertEquals(self.srmock.status, falcon.HTTP_204) # Get - result = self.simulate_get(path, project_id) + result = self.simulate_get('/v1/queues/fizbat/metadata', '480924') + result_doc = json.loads(result[0]) + self.assertEquals(result_doc, json.loads(doc)) + self.assertEquals(self.srmock.status, falcon.HTTP_200) + + def test_update_metadata(self): + # Create + path = '/v1/queues/xyz' + project_id = '480924' + self.simulate_put(path, project_id) + self.assertEquals(self.srmock.status, falcon.HTTP_201) + + # Set meta + doc1 = '{"messages": {"ttl": 600}}' + self.simulate_put(path + '/metadata', project_id, body=doc1) + self.assertEquals(self.srmock.status, falcon.HTTP_204) + + # Update + doc2 = '{"messages": {"ttl": 100}}' + self.simulate_put(path + '/metadata', project_id, body=doc2) + self.assertEquals(self.srmock.status, falcon.HTTP_204) + + # Get + result = self.simulate_get(path + '/metadata', project_id) result_doc = json.loads(result[0]) self.assertEquals(result_doc, json.loads(doc2)) self.assertEquals(self.srmock.headers_dict['Content-Location'], - path) + path + '/metadata') def test_list(self): project_id = '644079696574693' @@ -144,10 +174,10 @@ class QueueLifecycleBaseTest(base.TestBase): '/v1/queues?limit=2') for queue in result_doc['queues']: - self.simulate_get(queue['href'], project_id) + self.simulate_get(queue['href'] + '/metadata', project_id) self.assertEquals(self.srmock.status, falcon.HTTP_200) - self.simulate_get(queue['href'], alt_project_id) + self.simulate_get(queue['href'] + '/metadata', alt_project_id) self.assertEquals(self.srmock.status, falcon.HTTP_404) self.assertNotIn('metadata', queue) @@ -161,7 +191,7 @@ class QueueLifecycleBaseTest(base.TestBase): [target, params] = result_doc['links'][0]['href'].split('?') [queue] = result_doc['queues'] - result = self.simulate_get(queue['href'], project_id) + result = self.simulate_get(queue['href'] + '/metadata', project_id) result_doc = json.loads(result[0]) self.assertEquals(result_doc, queue['metadata']) @@ -205,7 +235,7 @@ class QueueFaultyDriverTests(base.TestBaseFaulty): location = ('Location', path) self.assertNotIn(location, self.srmock.headers) - result = self.simulate_get(path, '480924') + result = self.simulate_get(path + '/metadata', '480924') result_doc = json.loads(result[0]) self.assertEquals(self.srmock.status, falcon.HTTP_503) self.assertNotEquals(result_doc, json.loads(doc)) diff --git a/marconi/tests/util/faulty_storage.py b/marconi/tests/util/faulty_storage.py index ea7043fba..f5a96d0ac 100644 --- a/marconi/tests/util/faulty_storage.py +++ b/marconi/tests/util/faulty_storage.py @@ -38,7 +38,7 @@ class QueueController(storage.QueueBase): def list(self, project=None): raise NotImplementedError() - def get(self, name, project=None): + def get_metadata(self, name, project=None): raise NotImplementedError() def create(self, name, project=None): diff --git a/marconi/transport/wsgi/driver.py b/marconi/transport/wsgi/driver.py index 4f9a2e635..b0f439530 100644 --- a/marconi/transport/wsgi/driver.py +++ b/marconi/transport/wsgi/driver.py @@ -23,6 +23,7 @@ from marconi.transport import auth from marconi.transport.wsgi import claims from marconi.transport.wsgi import health from marconi.transport.wsgi import messages +from marconi.transport.wsgi import metadata from marconi.transport.wsgi import queues from marconi.transport.wsgi import stats @@ -79,6 +80,11 @@ class Driver(transport.DriverBase): self.app.add_route('/v1/queues/{queue_name}' '/stats', stats_endpoint) + # Metadata Endpoints + metadata_endpoint = metadata.Resource(queue_controller) + self.app.add_route('/v1/queues/{queue_name}' + '/metadata', metadata_endpoint) + # Messages Endpoints msg_collection = messages.CollectionResource(message_controller) self.app.add_route('/v1/queues/{queue_name}' diff --git a/marconi/transport/wsgi/metadata.py b/marconi/transport/wsgi/metadata.py new file mode 100644 index 000000000..cce285389 --- /dev/null +++ b/marconi/transport/wsgi/metadata.py @@ -0,0 +1,95 @@ +# Copyright (c) 2013 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import falcon + +import marconi.openstack.common.log as logging +from marconi.storage import exceptions as storage_exceptions +from marconi import transport +from marconi.transport import helpers +from marconi.transport.wsgi import exceptions as wsgi_exceptions + + +LOG = logging.getLogger(__name__) + + +class Resource(object): + __slots__ = ('queue_ctrl', ) + + def __init__(self, queue_controller): + self.queue_ctrl = queue_controller + + def on_get(self, req, resp, project_id, queue_name): + LOG.debug(_("Queue metadata GET - queue: %(queue)s, " + "project: %(project)s") % + {"queue": queue_name, "project": project_id}) + + try: + resp_dict = self.queue_ctrl.get_metadata(queue_name, + project=project_id) + + except storage_exceptions.DoesNotExist: + raise falcon.HTTPNotFound() + + except Exception as ex: + LOG.exception(ex) + description = _('Queue metadata could not be retrieved.') + raise wsgi_exceptions.HTTPServiceUnavailable(description) + + resp.content_location = req.path + resp.body = helpers.to_json(resp_dict) + resp.status = falcon.HTTP_200 + + def on_put(self, req, resp, project_id, queue_name): + LOG.debug(_("Queue metadata PUT - queue: %(queue)s, " + "project: %(project)s") % + {"queue": queue_name, "project": project_id}) + + # TODO(kgriffs): Migrate this check to input validator middleware + if req.content_length > transport.MAX_QUEUE_METADATA_SIZE: + description = _('Queue metadata size is too large.') + raise wsgi_exceptions.HTTPBadRequestBody(description) + + # Deserialize queue metadata + try: + metadata = helpers.read_json(req.stream, req.content_length) + except helpers.MalformedJSON: + description = _('Request body could not be parsed.') + raise wsgi_exceptions.HTTPBadRequestBody(description) + except Exception as ex: + LOG.exception(ex) + description = _('Request body could not be read.') + raise wsgi_exceptions.HTTPServiceUnavailable(description) + + # Metadata must be a JSON object + if not isinstance(metadata, dict): + description = _('Queue metadata must be an object.') + raise wsgi_exceptions.HTTPBadRequestBody(description) + + try: + self.queue_ctrl.set_metadata(queue_name, + metadata=metadata, + project=project_id) + + except storage_exceptions.QueueDoesNotExist: + raise falcon.HTTPNotFound() + + except Exception as ex: + LOG.exception(ex) + description = _('Metadata could not be updated.') + raise wsgi_exceptions.HTTPServiceUnavailable(description) + + resp.status = falcon.HTTP_204 + resp.location = req.path diff --git a/marconi/transport/wsgi/queues.py b/marconi/transport/wsgi/queues.py index d5d3cbf4d..bfee70de3 100644 --- a/marconi/transport/wsgi/queues.py +++ b/marconi/transport/wsgi/queues.py @@ -16,8 +16,6 @@ import falcon import marconi.openstack.common.log as logging -from marconi.storage import exceptions as storage_exceptions -from marconi import transport from marconi.transport import helpers from marconi.transport.wsgi import exceptions as wsgi_exceptions @@ -37,38 +35,12 @@ class ItemResource(object): LOG.debug(_("Queue item PUT - queue: %(queue)s, " "project: %(project)s") % {"queue": queue_name, "project": project_id}) - # TODO(kgriffs): Migrate this check to input validator middleware - if req.content_length > transport.MAX_QUEUE_METADATA_SIZE: - description = _('Queue metadata size is too large.') - raise wsgi_exceptions.HTTPBadRequestBody(description) - # Deserialize queue metadata - try: - metadata = helpers.read_json(req.stream, req.content_length) - except helpers.MalformedJSON: - description = _('Request body could not be parsed.') - raise wsgi_exceptions.HTTPBadRequestBody(description) - except Exception as ex: - LOG.exception(ex) - description = _('Request body could not be read.') - raise wsgi_exceptions.HTTPServiceUnavailable(description) - - # Metadata must be a JSON object - if not isinstance(metadata, dict): - description = _('Queue metadata must be an object.') - raise wsgi_exceptions.HTTPBadRequestBody(description) - - # Create or update the queue try: created = self.queue_controller.create( queue_name, project=project_id) - self.queue_controller.set_metadata( - queue_name, - metadata=metadata, - project=project_id) - except Exception as ex: LOG.exception(ex) description = _('Queue could not be created.') @@ -77,26 +49,6 @@ class ItemResource(object): resp.status = falcon.HTTP_201 if created else falcon.HTTP_204 resp.location = req.path - def on_get(self, req, resp, project_id, queue_name): - LOG.debug(_("Queue item GET - queue: %(queue)s, " - "project: %(project)s") % - {"queue": queue_name, "project": project_id}) - - try: - doc = self.queue_controller.get(queue_name, project=project_id) - - except storage_exceptions.DoesNotExist: - raise falcon.HTTPNotFound() - - except Exception as ex: - LOG.exception(ex) - description = _('Queue metadata could not be retrieved.') - raise wsgi_exceptions.HTTPServiceUnavailable(description) - - else: - resp.content_location = req.relative_uri - resp.body = helpers.to_json(doc) - def on_delete(self, req, resp, project_id, queue_name): LOG.debug(_("Queue item DELETE - queue: %(queue)s, " "project: %(project)s") %