diff --git a/marconi/queues/bootstrap.py b/marconi/queues/bootstrap.py index 954148e76..9eda2abd2 100644 --- a/marconi/queues/bootstrap.py +++ b/marconi/queues/bootstrap.py @@ -70,8 +70,6 @@ class Bootstrap(object): self.driver_conf = self.conf[_DRIVER_GROUP] log.setup('marconi') - mode = 'admin' if conf.admin_mode else 'public' - self._transport_type = 'marconi.queues.{0}.transport'.format(mode) @decorators.lazy_property(write=False) def storage(self): @@ -110,9 +108,15 @@ class Bootstrap(object): transport_name = self.driver_conf.transport LOG.debug(_(u'Loading transport driver: %s'), transport_name) - args = [self.conf, self.storage, self.cache, self.control] + args = [ + self.conf, + self.storage, + self.cache, + self.control, + ] + try: - mgr = driver.DriverManager(self._transport_type, + mgr = driver.DriverManager('marconi.queues.transport', transport_name, invoke_on_load=True, invoke_args=args) diff --git a/marconi/queues/transport/wsgi/__init__.py b/marconi/queues/transport/wsgi/__init__.py index 5c444dff3..47957b62d 100644 --- a/marconi/queues/transport/wsgi/__init__.py +++ b/marconi/queues/transport/wsgi/__init__.py @@ -16,4 +16,4 @@ from marconi.queues.transport.wsgi import driver # Hoist into package namespace -Driver = driver.DriverBase +Driver = driver.Driver diff --git a/marconi/queues/transport/wsgi/admin/__init__.py b/marconi/queues/transport/wsgi/admin/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/marconi/queues/transport/wsgi/admin/driver.py b/marconi/queues/transport/wsgi/admin/driver.py deleted file mode 100644 index 5669698a4..000000000 --- a/marconi/queues/transport/wsgi/admin/driver.py +++ /dev/null @@ -1,34 +0,0 @@ -# Copyright (c) 2013 Rackspace Hosting, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""marconi-queues (admin): interface for managing partitions.""" - -from marconi.common.transport.wsgi import health -from marconi.queues.transport.wsgi.public import driver as public_driver -from marconi.queues.transport.wsgi import shards - - -class Driver(public_driver.Driver): - - @property - def bridge(self): - shards_controller = self._control.shards_controller - return super(Driver, self).bridge + [ - ('/shards', - shards.Listing(shards_controller)), - ('/shards/{shard}', - shards.Resource(shards_controller)), - ('/health', - health.Resource(self._storage)) - ] diff --git a/marconi/queues/transport/wsgi/driver.py b/marconi/queues/transport/wsgi/driver.py index 242c59da4..2d86296c3 100644 --- a/marconi/queues/transport/wsgi/driver.py +++ b/marconi/queues/transport/wsgi/driver.py @@ -13,17 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -import abc import functools import itertools from wsgiref import simple_server import falcon from oslo.config import cfg -import six from marconi.common import decorators -from marconi.common.transport import version from marconi.common.transport.wsgi import helpers from marconi.common import utils from marconi.openstack.common.gettextutils import _ @@ -31,6 +28,8 @@ import marconi.openstack.common.log as logging from marconi.queues import transport from marconi.queues.transport import auth from marconi.queues.transport import validation +from marconi.queues.transport.wsgi import v1_0 +from marconi.queues.transport.wsgi import v1_1 _WSGI_OPTIONS = [ cfg.StrOpt('bind', default='127.0.0.1', @@ -49,11 +48,10 @@ def _config_options(): return itertools.chain(utils.options_iter(_WSGI_OPTIONS, _WSGI_GROUP)) -@six.add_metaclass(abc.ABCMeta) -class DriverBase(transport.DriverBase): +class Driver(transport.DriverBase): def __init__(self, conf, storage, cache, control): - super(DriverBase, self).__init__(conf, storage, cache, control) + super(Driver, self).__init__(conf, storage, cache, control) self._conf.register_opts(_WSGI_OPTIONS, group=_WSGI_GROUP) self._wsgi_conf = self._conf[_WSGI_GROUP] @@ -77,10 +75,23 @@ class DriverBase(transport.DriverBase): def _init_routes(self): """Initialize hooks and URI routes to resources.""" + + catalog = [ + ('/v1', v1_0.public_endpoints(self)), + ('/v1.1', v1_1.public_endpoints(self)), + ] + + if self._conf.admin_mode: + catalog.extend([ + ('/v1', v1_0.private_endpoints(self)), + ('/v1.1', v1_1.private_endpoints(self)), + ]) + self.app = falcon.API(before=self.before_hooks) - version_path = version.path() - for route, resource in self.bridge: - self.app.add_route(version_path + route, resource) + + for version_path, endpoints in catalog: + for route, resource in endpoints: + self.app.add_route(version_path + route, resource) def _init_middleware(self): """Initialize WSGI middlewarez.""" @@ -90,17 +101,6 @@ class DriverBase(transport.DriverBase): strategy = auth.strategy(self._conf.auth_strategy) self.app = strategy.install(self.app, self._conf) - @abc.abstractproperty - def bridge(self): - """Constructs a list of route/responder pairs that can be used to - establish the functionality of this driver. - - Note: the routes should be unversioned. - - :rtype: [(str, falcon-compatible responser)] - """ - raise NotImplementedError - def listen(self): """Self-host using 'bind' and 'port' from the WSGI config group.""" diff --git a/marconi/queues/transport/wsgi/public/__init__.py b/marconi/queues/transport/wsgi/public/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/marconi/queues/transport/wsgi/public/driver.py b/marconi/queues/transport/wsgi/public/driver.py deleted file mode 100644 index d159c3aed..000000000 --- a/marconi/queues/transport/wsgi/public/driver.py +++ /dev/null @@ -1,76 +0,0 @@ -# Copyright (c) 2013 Rackspace Hosting, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""marconi-queues public interface. - -Handles all the routes for queuing, messaging, and claiming. -""" - -from marconi.common.transport.wsgi import health -from marconi.queues.transport.wsgi import claims -from marconi.queues.transport.wsgi import driver -from marconi.queues.transport.wsgi import messages -from marconi.queues.transport.wsgi import metadata -from marconi.queues.transport.wsgi import queues -from marconi.queues.transport.wsgi import stats -from marconi.queues.transport.wsgi import v1 - - -class Driver(driver.DriverBase): - - @property - def bridge(self): - queue_controller = self._storage.queue_controller - message_controller = self._storage.message_controller - claim_controller = self._storage.claim_controller - return [ - # Home - ('/', - v1.Resource()), - - # Queues Endpoints - ('/queues', - queues.CollectionResource(self._validate, - queue_controller)), - ('/queues/{queue_name}', - queues.ItemResource(queue_controller, - message_controller)), - ('/queues/{queue_name}/stats', - stats.Resource(queue_controller)), - ('/queues/{queue_name}/metadata', - metadata.Resource(self._wsgi_conf, self._validate, - queue_controller)), - - # Messages Endpoints - ('/queues/{queue_name}/messages', - messages.CollectionResource(self._wsgi_conf, - self._validate, - message_controller)), - ('/queues/{queue_name}/messages/{message_id}', - messages.ItemResource(message_controller)), - - # Claims Endpoints - ('/queues/{queue_name}/claims', - claims.CollectionResource(self._wsgi_conf, - self._validate, - claim_controller)), - ('/queues/{queue_name}/claims/{claim_id}', - claims.ItemResource(self._wsgi_conf, - self._validate, - claim_controller)), - - # Health - ('/health', - health.Resource(self._storage)) - ] diff --git a/marconi/queues/transport/wsgi/v1_0/__init__.py b/marconi/queues/transport/wsgi/v1_0/__init__.py new file mode 100644 index 000000000..a6c91d7bd --- /dev/null +++ b/marconi/queues/transport/wsgi/v1_0/__init__.py @@ -0,0 +1,66 @@ +from marconi.queues.transport.wsgi.v1_0 import claims +from marconi.queues.transport.wsgi.v1_0 import health +from marconi.queues.transport.wsgi.v1_0 import homedoc +from marconi.queues.transport.wsgi.v1_0 import messages +from marconi.queues.transport.wsgi.v1_0 import metadata +from marconi.queues.transport.wsgi.v1_0 import queues +from marconi.queues.transport.wsgi.v1_0 import shards +from marconi.queues.transport.wsgi.v1_0 import stats + + +def public_endpoints(driver): + queue_controller = driver._storage.queue_controller + message_controller = driver._storage.message_controller + claim_controller = driver._storage.claim_controller + + return [ + # Home + ('/', + homedoc.Resource()), + + # Queues Endpoints + ('/queues', + queues.CollectionResource(driver._validate, + queue_controller)), + ('/queues/{queue_name}', + queues.ItemResource(queue_controller, + message_controller)), + ('/queues/{queue_name}/stats', + stats.Resource(queue_controller)), + ('/queues/{queue_name}/metadata', + metadata.Resource(driver._wsgi_conf, driver._validate, + queue_controller)), + + # Messages Endpoints + ('/queues/{queue_name}/messages', + messages.CollectionResource(driver._wsgi_conf, + driver._validate, + message_controller)), + ('/queues/{queue_name}/messages/{message_id}', + messages.ItemResource(message_controller)), + + # Claims Endpoints + ('/queues/{queue_name}/claims', + claims.CollectionResource(driver._wsgi_conf, + driver._validate, + claim_controller)), + ('/queues/{queue_name}/claims/{claim_id}', + claims.ItemResource(driver._wsgi_conf, + driver._validate, + claim_controller)), + + # Health + ('/health', + health.Resource(driver._storage)) + ] + + +def private_endpoints(driver): + shards_controller = driver._control.shards_controller + + return [ + ('/shards', + shards.Listing(shards_controller)), + ('/shards/{shard}', + shards.Resource(shards_controller)), + ] diff --git a/marconi/queues/transport/wsgi/claims.py b/marconi/queues/transport/wsgi/v1_0/claims.py similarity index 100% rename from marconi/queues/transport/wsgi/claims.py rename to marconi/queues/transport/wsgi/v1_0/claims.py diff --git a/marconi/common/transport/wsgi/health.py b/marconi/queues/transport/wsgi/v1_0/health.py similarity index 100% rename from marconi/common/transport/wsgi/health.py rename to marconi/queues/transport/wsgi/v1_0/health.py diff --git a/marconi/queues/transport/wsgi/v1.py b/marconi/queues/transport/wsgi/v1_0/homedoc.py similarity index 100% rename from marconi/queues/transport/wsgi/v1.py rename to marconi/queues/transport/wsgi/v1_0/homedoc.py diff --git a/marconi/queues/transport/wsgi/messages.py b/marconi/queues/transport/wsgi/v1_0/messages.py similarity index 100% rename from marconi/queues/transport/wsgi/messages.py rename to marconi/queues/transport/wsgi/v1_0/messages.py diff --git a/marconi/queues/transport/wsgi/metadata.py b/marconi/queues/transport/wsgi/v1_0/metadata.py similarity index 100% rename from marconi/queues/transport/wsgi/metadata.py rename to marconi/queues/transport/wsgi/v1_0/metadata.py diff --git a/marconi/queues/transport/wsgi/queues.py b/marconi/queues/transport/wsgi/v1_0/queues.py similarity index 100% rename from marconi/queues/transport/wsgi/queues.py rename to marconi/queues/transport/wsgi/v1_0/queues.py diff --git a/marconi/queues/transport/wsgi/shards.py b/marconi/queues/transport/wsgi/v1_0/shards.py similarity index 100% rename from marconi/queues/transport/wsgi/shards.py rename to marconi/queues/transport/wsgi/v1_0/shards.py diff --git a/marconi/queues/transport/wsgi/stats.py b/marconi/queues/transport/wsgi/v1_0/stats.py similarity index 100% rename from marconi/queues/transport/wsgi/stats.py rename to marconi/queues/transport/wsgi/v1_0/stats.py diff --git a/marconi/queues/transport/wsgi/v1_1/__init__.py b/marconi/queues/transport/wsgi/v1_1/__init__.py new file mode 100644 index 000000000..0302376e1 --- /dev/null +++ b/marconi/queues/transport/wsgi/v1_1/__init__.py @@ -0,0 +1,66 @@ +from marconi.queues.transport.wsgi.v1_1 import claims +from marconi.queues.transport.wsgi.v1_1 import health +from marconi.queues.transport.wsgi.v1_1 import homedoc +from marconi.queues.transport.wsgi.v1_1 import messages +from marconi.queues.transport.wsgi.v1_1 import metadata +from marconi.queues.transport.wsgi.v1_1 import queues +from marconi.queues.transport.wsgi.v1_1 import shards +from marconi.queues.transport.wsgi.v1_1 import stats + + +def public_endpoints(driver): + queue_controller = driver._storage.queue_controller + message_controller = driver._storage.message_controller + claim_controller = driver._storage.claim_controller + + return [ + # Home + ('/', + homedoc.Resource()), + + # Queues Endpoints + ('/queues', + queues.CollectionResource(driver._validate, + queue_controller)), + ('/queues/{queue_name}', + queues.ItemResource(queue_controller, + message_controller)), + ('/queues/{queue_name}/stats', + stats.Resource(queue_controller)), + ('/queues/{queue_name}/metadata', + metadata.Resource(driver._wsgi_conf, driver._validate, + queue_controller)), + + # Messages Endpoints + ('/queues/{queue_name}/messages', + messages.CollectionResource(driver._wsgi_conf, + driver._validate, + message_controller)), + ('/queues/{queue_name}/messages/{message_id}', + messages.ItemResource(message_controller)), + + # Claims Endpoints + ('/queues/{queue_name}/claims', + claims.CollectionResource(driver._wsgi_conf, + driver._validate, + claim_controller)), + ('/queues/{queue_name}/claims/{claim_id}', + claims.ItemResource(driver._wsgi_conf, + driver._validate, + claim_controller)), + + # Health + ('/health', + health.Resource(driver._storage)) + ] + + +def private_endpoints(driver): + shards_controller = driver._control.shards_controller + + return [ + ('/shards', + shards.Listing(shards_controller)), + ('/shards/{shard}', + shards.Resource(shards_controller)), + ] diff --git a/marconi/queues/transport/wsgi/v1_1/claims.py b/marconi/queues/transport/wsgi/v1_1/claims.py new file mode 100644 index 000000000..5e0536253 --- /dev/null +++ b/marconi/queues/transport/wsgi/v1_1/claims.py @@ -0,0 +1,199 @@ +# Copyright (c) 2013 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import falcon +import six + +from marconi.openstack.common.gettextutils import _ +import marconi.openstack.common.log as logging +from marconi.queues.storage import errors as storage_errors +from marconi.queues.transport import utils +from marconi.queues.transport import validation +from marconi.queues.transport.wsgi import errors as wsgi_errors +from marconi.queues.transport.wsgi import utils as wsgi_utils + +LOG = logging.getLogger(__name__) + +CLAIM_POST_SPEC = (('ttl', int), ('grace', int)) +CLAIM_PATCH_SPEC = (('ttl', int),) + + +class Resource(object): + + __slots__ = ('claim_controller', '_validate') + + def __init__(self, wsgi_conf, validate, claim_controller): + self.claim_controller = claim_controller + self._validate = validate + + +class CollectionResource(Resource): + + def on_post(self, req, resp, project_id, queue_name): + LOG.debug(_(u'Claims collection POST - queue: %(queue)s, ' + u'project: %(project)s'), + {'queue': queue_name, 'project': project_id}) + + # Check for an explicit limit on the # of messages to claim + limit = req.get_param_as_int('limit') + claim_options = {} if limit is None else {'limit': limit} + + # Read claim metadata (e.g., TTL) and raise appropriate + # HTTP errors as needed. + metadata, = wsgi_utils.filter_stream(req.stream, req.content_length, + CLAIM_POST_SPEC) + + # Claim some messages + try: + self._validate.claim_creation(metadata, limit=limit) + cid, msgs = self.claim_controller.create( + queue_name, + metadata=metadata, + project=project_id, + **claim_options) + + # Buffer claimed messages + # TODO(kgriffs): optimize, along with serialization (below) + resp_msgs = list(msgs) + + except validation.ValidationFailed as ex: + LOG.debug(ex) + raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex)) + + except Exception as ex: + LOG.exception(ex) + description = _(u'Claim could not be created.') + raise wsgi_errors.HTTPServiceUnavailable(description) + + # Serialize claimed messages, if any. This logic assumes + # the storage driver returned well-formed messages. + if len(resp_msgs) != 0: + for msg in resp_msgs: + msg['href'] = _msg_uri_from_claim( + req.path.rpartition('/')[0], msg['id'], cid) + + del msg['id'] + + resp.location = req.path + '/' + cid + resp.body = utils.to_json(resp_msgs) + resp.status = falcon.HTTP_201 + else: + resp.status = falcon.HTTP_204 + + +class ItemResource(Resource): + + __slots__ = ('claim_controller', '_validate') + + def __init__(self, wsgi_conf, validate, claim_controller): + self.claim_controller = claim_controller + self._validate = validate + + def on_get(self, req, resp, project_id, queue_name, claim_id): + LOG.debug(_(u'Claim item GET - claim: %(claim_id)s, ' + u'queue: %(queue_name)s, project: %(project_id)s'), + {'queue_name': queue_name, + 'project_id': project_id, + 'claim_id': claim_id}) + try: + meta, msgs = self.claim_controller.get( + queue_name, + claim_id=claim_id, + project=project_id) + + # Buffer claimed messages + # TODO(kgriffs): Optimize along with serialization (see below) + meta['messages'] = list(msgs) + + except storage_errors.DoesNotExist as ex: + LOG.debug(ex) + raise falcon.HTTPNotFound() + except Exception as ex: + LOG.exception(ex) + description = _(u'Claim could not be queried.') + raise wsgi_errors.HTTPServiceUnavailable(description) + + # Serialize claimed messages + # TODO(kgriffs): Optimize + for msg in meta['messages']: + msg['href'] = _msg_uri_from_claim( + req.path.rsplit('/', 2)[0], msg['id'], meta['id']) + del msg['id'] + + meta['href'] = req.path + del meta['id'] + + resp.content_location = req.relative_uri + resp.body = utils.to_json(meta) + # status defaults to 200 + + def on_patch(self, req, resp, project_id, queue_name, claim_id): + LOG.debug(_(u'Claim Item PATCH - claim: %(claim_id)s, ' + u'queue: %(queue_name)s, project:%(project_id)s') % + {'queue_name': queue_name, + 'project_id': project_id, + 'claim_id': claim_id}) + + # Read claim metadata (e.g., TTL) and raise appropriate + # HTTP errors as needed. + metadata, = wsgi_utils.filter_stream(req.stream, req.content_length, + CLAIM_PATCH_SPEC) + + try: + self._validate.claim_updating(metadata) + self.claim_controller.update(queue_name, + claim_id=claim_id, + metadata=metadata, + project=project_id) + + resp.status = falcon.HTTP_204 + + except validation.ValidationFailed as ex: + LOG.debug(ex) + raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex)) + + except storage_errors.DoesNotExist as ex: + LOG.debug(ex) + raise falcon.HTTPNotFound() + + except Exception as ex: + LOG.exception(ex) + description = _(u'Claim could not be updated.') + raise wsgi_errors.HTTPServiceUnavailable(description) + + def on_delete(self, req, resp, project_id, queue_name, claim_id): + LOG.debug(_(u'Claim item DELETE - claim: %(claim_id)s, ' + u'queue: %(queue_name)s, project: %(project_id)s') % + {'queue_name': queue_name, + 'project_id': project_id, + 'claim_id': claim_id}) + try: + self.claim_controller.delete(queue_name, + claim_id=claim_id, + project=project_id) + + resp.status = falcon.HTTP_204 + + except Exception as ex: + LOG.exception(ex) + description = _(u'Claim could not be deleted.') + raise wsgi_errors.HTTPServiceUnavailable(description) + + +# TODO(kgriffs): Clean up/optimize and move to wsgi.utils +def _msg_uri_from_claim(base_path, msg_id, claim_id): + return '/'.join( + [base_path, 'messages', msg_id] + ) + falcon.to_query_str({'claim_id': claim_id}) diff --git a/marconi/common/transport/version.py b/marconi/queues/transport/wsgi/v1_1/health.py similarity index 58% rename from marconi/common/transport/version.py rename to marconi/queues/transport/wsgi/v1_1/health.py index ed4509820..3043e32c7 100644 --- a/marconi/common/transport/version.py +++ b/marconi/queues/transport/wsgi/v1_1/health.py @@ -1,4 +1,4 @@ -# Copyright (c) 2013 Rackspace Hosting, Inc. +# Copyright (c) 2013 Rackspace, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -10,24 +10,23 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or # implied. +# # See the License for the specific language governing permissions and # limitations under the License. -"""version: version information for the transport API.""" +import falcon -def info(): - """Returns the API version as a tuple. +class Resource(object): - :rtype: (int, int) - """ - return (1, 0) + __slots__ = ('driver',) + def __init__(self, driver): + self.driver = driver -def path(): - """Returns the API version as /v{version}. + def on_get(self, req, resp, **kwargs): + resp.status = (falcon.HTTP_204 if self.driver.is_alive() + else falcon.HTTP_503) - :returns: /v{version} - :rtype: text - """ - return '/v{0}'.format(info()[0]) + def on_head(self, req, resp, **kwargs): + resp.status = falcon.HTTP_204 diff --git a/marconi/queues/transport/wsgi/v1_1/homedoc.py b/marconi/queues/transport/wsgi/v1_1/homedoc.py new file mode 100644 index 000000000..19d85b133 --- /dev/null +++ b/marconi/queues/transport/wsgi/v1_1/homedoc.py @@ -0,0 +1,144 @@ +# Copyright (c) 2013 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# +# See the License for the specific language governing permissions and +# limitations under the License. + +import json + + +# NOTE(kgriffs): http://tools.ietf.org/html/draft-nottingham-json-home-03 +JSON_HOME = { + 'resources': { + #------------------------------------------------------------------ + # Queues + #------------------------------------------------------------------ + 'rel/queues': { + 'href-template': '/v1/queues{?marker,limit,detailed}', + 'href-vars': { + 'marker': 'param/marker', + 'limit': 'param/queue_limit', + 'detailed': 'param/detailed', + }, + 'hints': { + 'allow': ['GET'], + 'formats': { + 'application/json': {}, + }, + }, + }, + 'rel/queue': { + 'href-template': '/v1/queues/{queue_name}', + 'href-vars': { + 'queue_name': 'param/queue_name', + }, + 'hints': { + 'allow': ['GET', 'HEAD', 'PUT', 'DELETE'], + 'formats': { + 'application/json': {}, + }, + }, + }, + 'rel/queue-metadata': { + 'href-template': '/v1/queues/{queue_name}/metadata', + 'href-vars': { + 'queue_name': 'param/queue_name', + }, + 'hints': { + 'allow': ['GET', 'PUT'], + 'formats': { + 'application/json': {}, + }, + }, + }, + 'rel/queue-stats': { + 'href-template': '/v1/queues/{queue_name}/stats', + 'href-vars': { + 'queue_name': 'param/queue_name', + }, + 'hints': { + 'allow': ['GET'], + 'formats': { + 'application/json': {}, + }, + }, + }, + + #------------------------------------------------------------------ + # Messages + #------------------------------------------------------------------ + 'rel/messages': { + 'href-template': ('/v1/queues/{queue_name}/messages' + '{?marker,limit,echo,include_claimed}'), + 'href-vars': { + 'queue_name': 'param/queue_name', + 'marker': 'param/marker', + 'limit': 'param/messages_limit', + 'echo': 'param/echo', + 'include_claimed': 'param/include_claimed', + }, + 'hints': { + 'allow': ['GET'], + 'formats': { + 'application/json': {}, + }, + }, + }, + 'rel/post-messages': { + 'href-template': '/v1/queues/{queue_name}/messages', + 'href-vars': { + 'queue_name': 'param/queue_name', + }, + 'hints': { + 'allow': ['POST'], + 'formats': { + 'application/json': {}, + }, + 'accept-post': ['application/json'], + }, + }, + + #------------------------------------------------------------------ + # Claims + #------------------------------------------------------------------ + 'rel/claim': { + 'href-template': '/v1/queues/{queue_name}/claims{?limit}', + 'href-vars': { + 'queue_name': 'param/queue_name', + 'limit': 'param/claim_limit', + }, + 'hints': { + 'allow': ['POST'], + 'formats': { + 'application/json': {}, + }, + 'accept-post': ['application/json'] + }, + }, + + } +} + + +class Resource(object): + + def __init__(self): + document = json.dumps(JSON_HOME, ensure_ascii=False, indent=4) + self.document_utf8 = document.encode('utf-8') + + def on_get(self, req, resp, project_id): + resp.data = self.document_utf8 + + resp.content_type = 'application/json-home' + resp.cache_control = ['max-age=86400'] + # status defaults to 200 diff --git a/marconi/queues/transport/wsgi/v1_1/messages.py b/marconi/queues/transport/wsgi/v1_1/messages.py new file mode 100644 index 000000000..03e71678f --- /dev/null +++ b/marconi/queues/transport/wsgi/v1_1/messages.py @@ -0,0 +1,306 @@ +# Copyright (c) 2013 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import falcon +import six + +from marconi.openstack.common.gettextutils import _ +import marconi.openstack.common.log as logging +from marconi.queues.storage import errors as storage_errors +from marconi.queues.transport import utils +from marconi.queues.transport import validation +from marconi.queues.transport.wsgi import errors as wsgi_errors +from marconi.queues.transport.wsgi import utils as wsgi_utils + +LOG = logging.getLogger(__name__) + +MESSAGE_POST_SPEC = (('ttl', int), ('body', '*')) + + +class CollectionResource(object): + + __slots__ = ('message_controller', '_wsgi_conf', '_validate') + + def __init__(self, wsgi_conf, validate, message_controller): + self._wsgi_conf = wsgi_conf + self._validate = validate + self.message_controller = message_controller + + #----------------------------------------------------------------------- + # Helpers + #----------------------------------------------------------------------- + + def _get_by_id(self, base_path, project_id, queue_name, ids): + """Returns one or more messages from the queue by ID.""" + try: + self._validate.message_listing(limit=len(ids)) + messages = self.message_controller.bulk_get( + queue_name, + message_ids=ids, + project=project_id) + + except validation.ValidationFailed as ex: + LOG.debug(ex) + raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex)) + + except Exception as ex: + LOG.exception(ex) + description = _(u'Message could not be retrieved.') + raise wsgi_errors.HTTPServiceUnavailable(description) + + # Prepare response + messages = list(messages) + if not messages: + return None + + base_path += '/' + for each_message in messages: + each_message['href'] = base_path + each_message['id'] + del each_message['id'] + + return messages + + def _get(self, req, project_id, queue_name): + client_uuid = wsgi_utils.get_client_uuid(req) + kwargs = {} + + # NOTE(kgriffs): This syntax ensures that + # we don't clobber default values with None. + req.get_param('marker', store=kwargs) + req.get_param_as_int('limit', store=kwargs) + req.get_param_as_bool('echo', store=kwargs) + req.get_param_as_bool('include_claimed', store=kwargs) + + try: + self._validate.message_listing(**kwargs) + results = self.message_controller.list( + queue_name, + project=project_id, + client_uuid=client_uuid, + **kwargs) + + # Buffer messages + cursor = next(results) + messages = list(cursor) + + except validation.ValidationFailed as ex: + LOG.debug(ex) + raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex)) + + except storage_errors.DoesNotExist as ex: + LOG.debug(ex) + raise falcon.HTTPNotFound() + + except Exception as ex: + LOG.exception(ex) + description = _(u'Messages could not be listed.') + raise wsgi_errors.HTTPServiceUnavailable(description) + + if not messages: + return None + + # Found some messages, so prepare the response + kwargs['marker'] = next(results) + for each_message in messages: + each_message['href'] = req.path + '/' + each_message['id'] + del each_message['id'] + + return { + 'messages': messages, + 'links': [ + { + 'rel': 'next', + 'href': req.path + falcon.to_query_str(kwargs) + } + ] + } + + #----------------------------------------------------------------------- + # Interface + #----------------------------------------------------------------------- + + def on_post(self, req, resp, project_id, queue_name): + LOG.debug(_(u'Messages collection POST - queue: %(queue)s, ' + u'project: %(project)s'), + {'queue': queue_name, 'project': project_id}) + + client_uuid = wsgi_utils.get_client_uuid(req) + + try: + # Place JSON size restriction before parsing + self._validate.message_length(req.content_length) + except validation.ValidationFailed as ex: + LOG.debug(ex) + raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex)) + + # Pull out just the fields we care about + messages = wsgi_utils.filter_stream( + req.stream, + req.content_length, + MESSAGE_POST_SPEC, + doctype=wsgi_utils.JSONArray) + + # Enqueue the messages + partial = False + + try: + self._validate.message_posting(messages) + + message_ids = self.message_controller.post( + queue_name, + messages=messages, + project=project_id, + client_uuid=client_uuid) + + except validation.ValidationFailed as ex: + LOG.debug(ex) + raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex)) + + except storage_errors.DoesNotExist as ex: + LOG.debug(ex) + raise falcon.HTTPNotFound() + + except storage_errors.MessageConflict as ex: + LOG.exception(ex) + partial = True + message_ids = ex.succeeded_ids + + if not message_ids: + # TODO(kgriffs): Include error code that is different + # from the code used in the generic case, below. + description = _(u'No messages could be enqueued.') + raise wsgi_errors.HTTPServiceUnavailable(description) + + except Exception as ex: + LOG.exception(ex) + description = _(u'Messages could not be enqueued.') + raise wsgi_errors.HTTPServiceUnavailable(description) + + # Prepare the response + ids_value = ','.join(message_ids) + resp.location = req.path + '?ids=' + ids_value + + hrefs = [req.path + '/' + id for id in message_ids] + body = {'resources': hrefs, 'partial': partial} + resp.body = utils.to_json(body) + resp.status = falcon.HTTP_201 + + def on_get(self, req, resp, project_id, queue_name): + LOG.debug(_(u'Messages collection GET - queue: %(queue)s, ' + u'project: %(project)s'), + {'queue': queue_name, 'project': project_id}) + + resp.content_location = req.relative_uri + + ids = req.get_param_as_list('ids') + if ids is None: + response = self._get(req, project_id, queue_name) + else: + response = self._get_by_id(req.path, project_id, queue_name, ids) + + if response is None: + resp.status = falcon.HTTP_204 + return + + resp.body = utils.to_json(response) + # status defaults to 200 + + def on_delete(self, req, resp, project_id, queue_name): + # NOTE(zyuan): Attempt to delete the whole message collection + # (without an "ids" parameter) is not allowed + ids = req.get_param_as_list('ids', required=True) + + try: + self._validate.message_listing(limit=len(ids)) + self.message_controller.bulk_delete( + queue_name, + message_ids=ids, + project=project_id) + + except validation.ValidationFailed as ex: + LOG.debug(ex) + raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex)) + + except Exception as ex: + LOG.exception(ex) + description = _(u'Messages could not be deleted.') + raise wsgi_errors.HTTPServiceUnavailable(description) + + resp.status = falcon.HTTP_204 + + +class ItemResource(object): + + __slots__ = ('message_controller') + + def __init__(self, message_controller): + self.message_controller = message_controller + + def on_get(self, req, resp, project_id, queue_name, message_id): + LOG.debug(_(u'Messages item GET - message: %(message)s, ' + u'queue: %(queue)s, project: %(project)s'), + {'message': message_id, + 'queue': queue_name, + 'project': project_id}) + try: + message = self.message_controller.get( + queue_name, + message_id, + project=project_id) + + except storage_errors.DoesNotExist as ex: + LOG.debug(ex) + raise falcon.HTTPNotFound() + + except Exception as ex: + LOG.exception(ex) + description = _(u'Message could not be retrieved.') + raise wsgi_errors.HTTPServiceUnavailable(description) + + # Prepare response + message['href'] = req.path + del message['id'] + + resp.content_location = req.relative_uri + resp.body = utils.to_json(message) + # status defaults to 200 + + def on_delete(self, req, resp, project_id, queue_name, message_id): + LOG.debug(_(u'Messages item DELETE - message: %(message)s, ' + u'queue: %(queue)s, project: %(project)s'), + {'message': message_id, + 'queue': queue_name, + 'project': project_id}) + try: + self.message_controller.delete( + queue_name, + message_id=message_id, + project=project_id, + claim=req.get_param('claim_id')) + + except storage_errors.NotPermitted as ex: + LOG.exception(ex) + title = _(u'Unable to delete') + description = _(u'This message is claimed; it cannot be ' + u'deleted without a valid claim_id.') + raise falcon.HTTPForbidden(title, description) + + except Exception as ex: + LOG.exception(ex) + description = _(u'Message could not be deleted.') + raise wsgi_errors.HTTPServiceUnavailable(description) + + # Alles guete + resp.status = falcon.HTTP_204 diff --git a/marconi/queues/transport/wsgi/v1_1/metadata.py b/marconi/queues/transport/wsgi/v1_1/metadata.py new file mode 100644 index 000000000..f2c00092b --- /dev/null +++ b/marconi/queues/transport/wsgi/v1_1/metadata.py @@ -0,0 +1,96 @@ +# Copyright (c) 2013 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import falcon +import six + +from marconi.openstack.common.gettextutils import _ +import marconi.openstack.common.log as logging +from marconi.queues.storage import errors as storage_errors +from marconi.queues.transport import utils +from marconi.queues.transport import validation +from marconi.queues.transport.wsgi import errors as wsgi_errors +from marconi.queues.transport.wsgi import utils as wsgi_utils + + +LOG = logging.getLogger(__name__) + + +class Resource(object): + __slots__ = ('_wsgi_conf', '_validate', 'queue_ctrl') + + def __init__(self, _wsgi_conf, validate, queue_controller): + self._wsgi_conf = _wsgi_conf + self._validate = validate + self.queue_ctrl = queue_controller + + def on_get(self, req, resp, project_id, queue_name): + LOG.debug(_(u'Queue metadata GET - queue: %(queue)s, ' + u'project: %(project)s'), + {'queue': queue_name, 'project': project_id}) + + try: + resp_dict = self.queue_ctrl.get_metadata(queue_name, + project=project_id) + + except storage_errors.DoesNotExist as ex: + LOG.debug(ex) + raise falcon.HTTPNotFound() + + except Exception as ex: + LOG.exception(ex) + description = _(u'Queue metadata could not be retrieved.') + raise wsgi_errors.HTTPServiceUnavailable(description) + + resp.content_location = req.path + resp.body = utils.to_json(resp_dict) + # status defaults to 200 + + def on_put(self, req, resp, project_id, queue_name): + LOG.debug(_(u'Queue metadata PUT - queue: %(queue)s, ' + u'project: %(project)s'), + {'queue': queue_name, 'project': project_id}) + + try: + # Place JSON size restriction before parsing + self._validate.queue_metadata_length(req.content_length) + except validation.ValidationFailed as ex: + LOG.debug(ex) + raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex)) + + # Deserialize queue metadata + metadata, = wsgi_utils.filter_stream(req.stream, + req.content_length, + spec=None) + + try: + self.queue_ctrl.set_metadata(queue_name, + metadata=metadata, + project=project_id) + + except validation.ValidationFailed as ex: + LOG.debug(ex) + raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex)) + + except storage_errors.QueueDoesNotExist: + raise falcon.HTTPNotFound() + + except Exception as ex: + LOG.exception(ex) + description = _(u'Metadata could not be updated.') + raise wsgi_errors.HTTPServiceUnavailable(description) + + resp.status = falcon.HTTP_204 + resp.location = req.path diff --git a/marconi/queues/transport/wsgi/v1_1/queues.py b/marconi/queues/transport/wsgi/v1_1/queues.py new file mode 100644 index 000000000..eef6a1988 --- /dev/null +++ b/marconi/queues/transport/wsgi/v1_1/queues.py @@ -0,0 +1,139 @@ +# Copyright (c) 2013 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import falcon +import six + +from marconi.openstack.common.gettextutils import _ +import marconi.openstack.common.log as logging +from marconi.queues.transport import utils +from marconi.queues.transport import validation +from marconi.queues.transport.wsgi import errors as wsgi_errors + + +LOG = logging.getLogger(__name__) + + +class ItemResource(object): + + __slots__ = ('queue_controller', 'message_controller') + + def __init__(self, queue_controller, message_controller): + self.queue_controller = queue_controller + self.message_controller = message_controller + + def on_put(self, req, resp, project_id, queue_name): + LOG.debug(_(u'Queue item PUT - queue: %(queue)s, ' + u'project: %(project)s'), + {'queue': queue_name, 'project': project_id}) + + try: + created = self.queue_controller.create( + queue_name, project=project_id) + + except Exception as ex: + LOG.exception(ex) + description = _(u'Queue could not be created.') + raise wsgi_errors.HTTPServiceUnavailable(description) + + resp.status = falcon.HTTP_201 if created else falcon.HTTP_204 + resp.location = req.path + + def on_head(self, req, resp, project_id, queue_name): + LOG.debug(_(u'Queue item exists - queue: %(queue)s, ' + u'project: %(project)s'), + {'queue': queue_name, 'project': project_id}) + + if self.queue_controller.exists(queue_name, project=project_id): + resp.status = falcon.HTTP_204 + else: + resp.status = falcon.HTTP_404 + + resp.content_location = req.path + + on_get = on_head + + def on_delete(self, req, resp, project_id, queue_name): + LOG.debug(_(u'Queue item DELETE - queue: %(queue)s, ' + u'project: %(project)s'), + {'queue': queue_name, 'project': project_id}) + try: + self.queue_controller.delete(queue_name, project=project_id) + + except Exception as ex: + LOG.exception(ex) + description = _(u'Queue could not be deleted.') + raise wsgi_errors.HTTPServiceUnavailable(description) + + resp.status = falcon.HTTP_204 + + +class CollectionResource(object): + + __slots__ = ('queue_controller', '_validate') + + def __init__(self, validate, queue_controller): + self.queue_controller = queue_controller + self._validate = validate + + def on_get(self, req, resp, project_id): + + kwargs = {} + + # NOTE(kgriffs): This syntax ensures that + # we don't clobber default values with None. + req.get_param('marker', store=kwargs) + req.get_param_as_int('limit', store=kwargs) + req.get_param_as_bool('detailed', store=kwargs) + + try: + self._validate.queue_listing(**kwargs) + results = self.queue_controller.list(project=project_id, **kwargs) + + except validation.ValidationFailed as ex: + LOG.debug(ex) + raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex)) + + except Exception as ex: + LOG.exception(ex) + description = _(u'Queues could not be listed.') + raise wsgi_errors.HTTPServiceUnavailable(description) + + # Buffer list of queues + queues = list(next(results)) + + # Check for an empty list + if len(queues) == 0: + resp.status = falcon.HTTP_204 + return + + # Got some. Prepare the response. + kwargs['marker'] = next(results) + for each_queue in queues: + each_queue['href'] = req.path + '/' + each_queue['name'] + + response_body = { + 'queues': queues, + 'links': [ + { + 'rel': 'next', + 'href': req.path + falcon.to_query_str(kwargs) + } + ] + } + + resp.content_location = req.relative_uri + resp.body = utils.to_json(response_body) + # status defaults to 200 diff --git a/marconi/queues/transport/wsgi/v1_1/shards.py b/marconi/queues/transport/wsgi/v1_1/shards.py new file mode 100644 index 000000000..1fdd48d5a --- /dev/null +++ b/marconi/queues/transport/wsgi/v1_1/shards.py @@ -0,0 +1,199 @@ +# Copyright (c) 2013 Rackspace Hosting, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""shards: a resource to handle storage shard management + +A shard is added by an operator by interacting with the +sharding-related endpoints. When specifying a shard, the +following fields are required: + +{ + "name": string, + "weight": integer, + "uri": string::uri +} + +Furthermore, depending on the underlying storage type of shard being +registered, there is an optional field: + +{ + "options": {...} +} +""" + +import falcon +import jsonschema + +from marconi.common.schemas import shards as schema +from marconi.common.transport.wsgi import utils +from marconi.common import utils as common_utils +from marconi.openstack.common import log +from marconi.queues.storage import errors +from marconi.queues.storage import utils as storage_utils +from marconi.queues.transport import utils as transport_utils +from marconi.queues.transport.wsgi import errors as wsgi_errors + +LOG = log.getLogger(__name__) + + +class Listing(object): + """A resource to list registered shards + + :param shards_controller: means to interact with storage + """ + def __init__(self, shards_controller): + self._ctrl = shards_controller + + def on_get(self, request, response, project_id): + """Returns a shard listing as objects embedded in an array: + + [ + {"href": "", "weight": 100, "uri": ""}, + ... + ] + + :returns: HTTP | [200, 204] + """ + LOG.debug(u'LIST shards') + + store = {} + request.get_param('marker', store=store) + request.get_param_as_int('limit', store=store) + request.get_param_as_bool('detailed', store=store) + + results = {} + results['shards'] = list(self._ctrl.list(**store)) + for entry in results['shards']: + entry['href'] = request.path + '/' + entry.pop('name') + + if not results['shards']: + response.status = falcon.HTTP_204 + return + + response.content_location = request.relative_uri + response.body = transport_utils.to_json(results) + response.status = falcon.HTTP_200 + + +class Resource(object): + """A handler for individual shard. + + :param shards_controller: means to interact with storage + """ + def __init__(self, shards_controller): + self._ctrl = shards_controller + validator_type = jsonschema.Draft4Validator + self._validators = { + 'weight': validator_type(schema.patch_weight), + 'uri': validator_type(schema.patch_uri), + 'options': validator_type(schema.patch_options), + 'create': validator_type(schema.create) + } + + def on_get(self, request, response, project_id, shard): + """Returns a JSON object for a single shard entry: + + {"weight": 100, "uri": "", options: {...}} + + :returns: HTTP | [200, 404] + """ + LOG.debug(u'GET shard - name: %s', shard) + data = None + detailed = request.get_param_as_bool('detailed') or False + + try: + data = self._ctrl.get(shard, detailed) + + except errors.ShardDoesNotExist as ex: + LOG.debug(ex) + raise falcon.HTTPNotFound() + + data['href'] = request.path + + # remove the name entry - it isn't needed on GET + del data['name'] + response.body = transport_utils.to_json(data) + response.content_location = request.relative_uri + + def on_put(self, request, response, project_id, shard): + """Registers a new shard. Expects the following input: + + {"weight": 100, "uri": ""} + + An options object may also be provided. + + :returns: HTTP | [201, 204] + """ + LOG.debug(u'PUT shard - name: %s', shard) + + data = utils.load(request) + utils.validate(self._validators['create'], data) + if not storage_utils.can_connect(data['uri']): + raise wsgi_errors.HTTPBadRequestBody( + 'cannot connect to %s' % data['uri'] + ) + self._ctrl.create(shard, weight=data['weight'], + uri=data['uri'], + options=data.get('options', {})) + response.status = falcon.HTTP_201 + response.location = request.path + + def on_delete(self, request, response, project_id, shard): + """Deregisters a shard. + + :returns: HTTP | 204 + """ + LOG.debug(u'DELETE shard - name: %s', shard) + self._ctrl.delete(shard) + response.status = falcon.HTTP_204 + + def on_patch(self, request, response, project_id, shard): + """Allows one to update a shard's weight, uri, and/or options. + + This method expects the user to submit a JSON object + containing atleast one of: 'uri', 'weight', 'options'. If + none are found, the request is flagged as bad. There is also + strict format checking through the use of + jsonschema. Appropriate errors are returned in each case for + badly formatted input. + + :returns: HTTP | 200,400 + """ + LOG.debug(u'PATCH shard - name: %s', shard) + data = utils.load(request) + + EXPECT = ('weight', 'uri', 'options') + if not any([(field in data) for field in EXPECT]): + LOG.debug(u'PATCH shard, bad params') + raise wsgi_errors.HTTPBadRequestBody( + 'One of `uri`, `weight`, or `options` needs ' + 'to be specified' + ) + + for field in EXPECT: + utils.validate(self._validators[field], data) + + if 'uri' in data and not storage_utils.can_connect(data['uri']): + raise wsgi_errors.HTTPBadRequestBody( + 'cannot connect to %s' % data['uri'] + ) + fields = common_utils.fields(data, EXPECT, + pred=lambda v: v is not None) + + try: + self._ctrl.update(shard, **fields) + except errors.ShardDoesNotExist as ex: + LOG.exception(ex) + raise falcon.HTTPNotFound() diff --git a/marconi/queues/transport/wsgi/v1_1/stats.py b/marconi/queues/transport/wsgi/v1_1/stats.py new file mode 100644 index 000000000..9d80afb74 --- /dev/null +++ b/marconi/queues/transport/wsgi/v1_1/stats.py @@ -0,0 +1,64 @@ +# Copyright (c) 2013 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import falcon + +from marconi.openstack.common.gettextutils import _ +import marconi.openstack.common.log as logging +from marconi.queues.storage import errors as storage_errors +from marconi.queues.transport import utils +from marconi.queues.transport.wsgi import errors as wsgi_errors + + +LOG = logging.getLogger(__name__) + + +class Resource(object): + + __slots__ = ('queue_ctrl') + + def __init__(self, queue_controller): + self.queue_ctrl = queue_controller + + def on_get(self, req, resp, project_id, queue_name): + try: + resp_dict = self.queue_ctrl.stats(queue_name, + project=project_id) + + message_stats = resp_dict['messages'] + + if message_stats['total'] != 0: + base_path = req.path[:req.path.rindex('/')] + '/messages/' + + newest = message_stats['newest'] + newest['href'] = base_path + newest['id'] + del newest['id'] + + oldest = message_stats['oldest'] + oldest['href'] = base_path + oldest['id'] + del oldest['id'] + + resp.content_location = req.path + resp.body = utils.to_json(resp_dict) + # status defaults to 200 + + except storage_errors.DoesNotExist as ex: + LOG.debug(ex) + raise falcon.HTTPNotFound() + + except Exception as ex: + LOG.exception(ex) + description = _(u'Queue stats could not be read.') + raise wsgi_errors.HTTPServiceUnavailable(description) diff --git a/setup.cfg b/setup.cfg index e086f77d2..5e640eaca 100644 --- a/setup.cfg +++ b/setup.cfg @@ -39,11 +39,8 @@ marconi.queues.control.storage = mongodb = marconi.queues.storage.mongodb.driver:ControlDriver faulty = marconi.tests.faulty_storage:ControlDriver -marconi.queues.public.transport = - wsgi = marconi.queues.transport.wsgi.public.driver:Driver - -marconi.queues.admin.transport = - wsgi = marconi.queues.transport.wsgi.admin.driver:Driver +marconi.queues.transport = + wsgi = marconi.queues.transport.wsgi.driver:Driver marconi.openstack.common.cache.backends = memory = marconi.openstack.common.cache._backends.memory:MemoryBackend @@ -55,7 +52,7 @@ oslo.config.opts = marconi.storage.sharding = marconi.queues.storage.sharding._config_options marconi.storage.mongodb = marconi.queues.storage.mongodb.options._config_options marconi.storage.sqlite = marconi.queues.storage.sqlite.options._config_options - marconi.transport.wsgi = marconi.queues.transport.wsgi.driver._config_options + marconi.transport.wsgi = marconi.queues.transport.wsgi.v1_0.driver._config_options marconi.transport.base = marconi.queues.transport.base._config_options marconi.transport.validation = marconi.queues.transport.validation._config_options