feat(v1.1): Plumb v1.1 API in the WSGI transport

This patch creates a copy of the v1.0 API as a starting point for
implementing the v1.1 API. The way that the "admin_mode" config option was
implemented was changed, in order to simplify the grafting of the two API
versions into a single driver. When the mode was implemented via inheritance,
grafting the two API versions required some ugly hacks.

Partially-Implements: blueprint api-v1.1
Change-Id: I0e753315e3171bb5c23a57b8fb3826d08bfd5f26
This commit is contained in:
kgriffs 2014-02-17 07:45:25 -06:00
parent de4aa6bf75
commit be4b3f0b66
26 changed files with 1323 additions and 154 deletions

View File

@ -70,8 +70,6 @@ class Bootstrap(object):
self.driver_conf = self.conf[_DRIVER_GROUP]
log.setup('marconi')
mode = 'admin' if conf.admin_mode else 'public'
self._transport_type = 'marconi.queues.{0}.transport'.format(mode)
@decorators.lazy_property(write=False)
def storage(self):
@ -110,9 +108,15 @@ class Bootstrap(object):
transport_name = self.driver_conf.transport
LOG.debug(_(u'Loading transport driver: %s'), transport_name)
args = [self.conf, self.storage, self.cache, self.control]
args = [
self.conf,
self.storage,
self.cache,
self.control,
]
try:
mgr = driver.DriverManager(self._transport_type,
mgr = driver.DriverManager('marconi.queues.transport',
transport_name,
invoke_on_load=True,
invoke_args=args)

View File

@ -16,4 +16,4 @@
from marconi.queues.transport.wsgi import driver
# Hoist into package namespace
Driver = driver.DriverBase
Driver = driver.Driver

View File

@ -1,34 +0,0 @@
# Copyright (c) 2013 Rackspace Hosting, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""marconi-queues (admin): interface for managing partitions."""
from marconi.common.transport.wsgi import health
from marconi.queues.transport.wsgi.public import driver as public_driver
from marconi.queues.transport.wsgi import shards
class Driver(public_driver.Driver):
@property
def bridge(self):
shards_controller = self._control.shards_controller
return super(Driver, self).bridge + [
('/shards',
shards.Listing(shards_controller)),
('/shards/{shard}',
shards.Resource(shards_controller)),
('/health',
health.Resource(self._storage))
]

View File

@ -13,17 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import functools
import itertools
from wsgiref import simple_server
import falcon
from oslo.config import cfg
import six
from marconi.common import decorators
from marconi.common.transport import version
from marconi.common.transport.wsgi import helpers
from marconi.common import utils
from marconi.openstack.common.gettextutils import _
@ -31,6 +28,8 @@ import marconi.openstack.common.log as logging
from marconi.queues import transport
from marconi.queues.transport import auth
from marconi.queues.transport import validation
from marconi.queues.transport.wsgi import v1_0
from marconi.queues.transport.wsgi import v1_1
_WSGI_OPTIONS = [
cfg.StrOpt('bind', default='127.0.0.1',
@ -49,11 +48,10 @@ def _config_options():
return itertools.chain(utils.options_iter(_WSGI_OPTIONS, _WSGI_GROUP))
@six.add_metaclass(abc.ABCMeta)
class DriverBase(transport.DriverBase):
class Driver(transport.DriverBase):
def __init__(self, conf, storage, cache, control):
super(DriverBase, self).__init__(conf, storage, cache, control)
super(Driver, self).__init__(conf, storage, cache, control)
self._conf.register_opts(_WSGI_OPTIONS, group=_WSGI_GROUP)
self._wsgi_conf = self._conf[_WSGI_GROUP]
@ -77,10 +75,23 @@ class DriverBase(transport.DriverBase):
def _init_routes(self):
"""Initialize hooks and URI routes to resources."""
catalog = [
('/v1', v1_0.public_endpoints(self)),
('/v1.1', v1_1.public_endpoints(self)),
]
if self._conf.admin_mode:
catalog.extend([
('/v1', v1_0.private_endpoints(self)),
('/v1.1', v1_1.private_endpoints(self)),
])
self.app = falcon.API(before=self.before_hooks)
version_path = version.path()
for route, resource in self.bridge:
self.app.add_route(version_path + route, resource)
for version_path, endpoints in catalog:
for route, resource in endpoints:
self.app.add_route(version_path + route, resource)
def _init_middleware(self):
"""Initialize WSGI middlewarez."""
@ -90,17 +101,6 @@ class DriverBase(transport.DriverBase):
strategy = auth.strategy(self._conf.auth_strategy)
self.app = strategy.install(self.app, self._conf)
@abc.abstractproperty
def bridge(self):
"""Constructs a list of route/responder pairs that can be used to
establish the functionality of this driver.
Note: the routes should be unversioned.
:rtype: [(str, falcon-compatible responser)]
"""
raise NotImplementedError
def listen(self):
"""Self-host using 'bind' and 'port' from the WSGI config group."""

View File

@ -1,76 +0,0 @@
# Copyright (c) 2013 Rackspace Hosting, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""marconi-queues public interface.
Handles all the routes for queuing, messaging, and claiming.
"""
from marconi.common.transport.wsgi import health
from marconi.queues.transport.wsgi import claims
from marconi.queues.transport.wsgi import driver
from marconi.queues.transport.wsgi import messages
from marconi.queues.transport.wsgi import metadata
from marconi.queues.transport.wsgi import queues
from marconi.queues.transport.wsgi import stats
from marconi.queues.transport.wsgi import v1
class Driver(driver.DriverBase):
@property
def bridge(self):
queue_controller = self._storage.queue_controller
message_controller = self._storage.message_controller
claim_controller = self._storage.claim_controller
return [
# Home
('/',
v1.Resource()),
# Queues Endpoints
('/queues',
queues.CollectionResource(self._validate,
queue_controller)),
('/queues/{queue_name}',
queues.ItemResource(queue_controller,
message_controller)),
('/queues/{queue_name}/stats',
stats.Resource(queue_controller)),
('/queues/{queue_name}/metadata',
metadata.Resource(self._wsgi_conf, self._validate,
queue_controller)),
# Messages Endpoints
('/queues/{queue_name}/messages',
messages.CollectionResource(self._wsgi_conf,
self._validate,
message_controller)),
('/queues/{queue_name}/messages/{message_id}',
messages.ItemResource(message_controller)),
# Claims Endpoints
('/queues/{queue_name}/claims',
claims.CollectionResource(self._wsgi_conf,
self._validate,
claim_controller)),
('/queues/{queue_name}/claims/{claim_id}',
claims.ItemResource(self._wsgi_conf,
self._validate,
claim_controller)),
# Health
('/health',
health.Resource(self._storage))
]

View File

@ -0,0 +1,66 @@
from marconi.queues.transport.wsgi.v1_0 import claims
from marconi.queues.transport.wsgi.v1_0 import health
from marconi.queues.transport.wsgi.v1_0 import homedoc
from marconi.queues.transport.wsgi.v1_0 import messages
from marconi.queues.transport.wsgi.v1_0 import metadata
from marconi.queues.transport.wsgi.v1_0 import queues
from marconi.queues.transport.wsgi.v1_0 import shards
from marconi.queues.transport.wsgi.v1_0 import stats
def public_endpoints(driver):
queue_controller = driver._storage.queue_controller
message_controller = driver._storage.message_controller
claim_controller = driver._storage.claim_controller
return [
# Home
('/',
homedoc.Resource()),
# Queues Endpoints
('/queues',
queues.CollectionResource(driver._validate,
queue_controller)),
('/queues/{queue_name}',
queues.ItemResource(queue_controller,
message_controller)),
('/queues/{queue_name}/stats',
stats.Resource(queue_controller)),
('/queues/{queue_name}/metadata',
metadata.Resource(driver._wsgi_conf, driver._validate,
queue_controller)),
# Messages Endpoints
('/queues/{queue_name}/messages',
messages.CollectionResource(driver._wsgi_conf,
driver._validate,
message_controller)),
('/queues/{queue_name}/messages/{message_id}',
messages.ItemResource(message_controller)),
# Claims Endpoints
('/queues/{queue_name}/claims',
claims.CollectionResource(driver._wsgi_conf,
driver._validate,
claim_controller)),
('/queues/{queue_name}/claims/{claim_id}',
claims.ItemResource(driver._wsgi_conf,
driver._validate,
claim_controller)),
# Health
('/health',
health.Resource(driver._storage))
]
def private_endpoints(driver):
shards_controller = driver._control.shards_controller
return [
('/shards',
shards.Listing(shards_controller)),
('/shards/{shard}',
shards.Resource(shards_controller)),
]

View File

@ -0,0 +1,66 @@
from marconi.queues.transport.wsgi.v1_1 import claims
from marconi.queues.transport.wsgi.v1_1 import health
from marconi.queues.transport.wsgi.v1_1 import homedoc
from marconi.queues.transport.wsgi.v1_1 import messages
from marconi.queues.transport.wsgi.v1_1 import metadata
from marconi.queues.transport.wsgi.v1_1 import queues
from marconi.queues.transport.wsgi.v1_1 import shards
from marconi.queues.transport.wsgi.v1_1 import stats
def public_endpoints(driver):
queue_controller = driver._storage.queue_controller
message_controller = driver._storage.message_controller
claim_controller = driver._storage.claim_controller
return [
# Home
('/',
homedoc.Resource()),
# Queues Endpoints
('/queues',
queues.CollectionResource(driver._validate,
queue_controller)),
('/queues/{queue_name}',
queues.ItemResource(queue_controller,
message_controller)),
('/queues/{queue_name}/stats',
stats.Resource(queue_controller)),
('/queues/{queue_name}/metadata',
metadata.Resource(driver._wsgi_conf, driver._validate,
queue_controller)),
# Messages Endpoints
('/queues/{queue_name}/messages',
messages.CollectionResource(driver._wsgi_conf,
driver._validate,
message_controller)),
('/queues/{queue_name}/messages/{message_id}',
messages.ItemResource(message_controller)),
# Claims Endpoints
('/queues/{queue_name}/claims',
claims.CollectionResource(driver._wsgi_conf,
driver._validate,
claim_controller)),
('/queues/{queue_name}/claims/{claim_id}',
claims.ItemResource(driver._wsgi_conf,
driver._validate,
claim_controller)),
# Health
('/health',
health.Resource(driver._storage))
]
def private_endpoints(driver):
shards_controller = driver._control.shards_controller
return [
('/shards',
shards.Listing(shards_controller)),
('/shards/{shard}',
shards.Resource(shards_controller)),
]

View File

@ -0,0 +1,199 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
import six
from marconi.openstack.common.gettextutils import _
import marconi.openstack.common.log as logging
from marconi.queues.storage import errors as storage_errors
from marconi.queues.transport import utils
from marconi.queues.transport import validation
from marconi.queues.transport.wsgi import errors as wsgi_errors
from marconi.queues.transport.wsgi import utils as wsgi_utils
LOG = logging.getLogger(__name__)
CLAIM_POST_SPEC = (('ttl', int), ('grace', int))
CLAIM_PATCH_SPEC = (('ttl', int),)
class Resource(object):
__slots__ = ('claim_controller', '_validate')
def __init__(self, wsgi_conf, validate, claim_controller):
self.claim_controller = claim_controller
self._validate = validate
class CollectionResource(Resource):
def on_post(self, req, resp, project_id, queue_name):
LOG.debug(_(u'Claims collection POST - queue: %(queue)s, '
u'project: %(project)s'),
{'queue': queue_name, 'project': project_id})
# Check for an explicit limit on the # of messages to claim
limit = req.get_param_as_int('limit')
claim_options = {} if limit is None else {'limit': limit}
# Read claim metadata (e.g., TTL) and raise appropriate
# HTTP errors as needed.
metadata, = wsgi_utils.filter_stream(req.stream, req.content_length,
CLAIM_POST_SPEC)
# Claim some messages
try:
self._validate.claim_creation(metadata, limit=limit)
cid, msgs = self.claim_controller.create(
queue_name,
metadata=metadata,
project=project_id,
**claim_options)
# Buffer claimed messages
# TODO(kgriffs): optimize, along with serialization (below)
resp_msgs = list(msgs)
except validation.ValidationFailed as ex:
LOG.debug(ex)
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
except Exception as ex:
LOG.exception(ex)
description = _(u'Claim could not be created.')
raise wsgi_errors.HTTPServiceUnavailable(description)
# Serialize claimed messages, if any. This logic assumes
# the storage driver returned well-formed messages.
if len(resp_msgs) != 0:
for msg in resp_msgs:
msg['href'] = _msg_uri_from_claim(
req.path.rpartition('/')[0], msg['id'], cid)
del msg['id']
resp.location = req.path + '/' + cid
resp.body = utils.to_json(resp_msgs)
resp.status = falcon.HTTP_201
else:
resp.status = falcon.HTTP_204
class ItemResource(Resource):
__slots__ = ('claim_controller', '_validate')
def __init__(self, wsgi_conf, validate, claim_controller):
self.claim_controller = claim_controller
self._validate = validate
def on_get(self, req, resp, project_id, queue_name, claim_id):
LOG.debug(_(u'Claim item GET - claim: %(claim_id)s, '
u'queue: %(queue_name)s, project: %(project_id)s'),
{'queue_name': queue_name,
'project_id': project_id,
'claim_id': claim_id})
try:
meta, msgs = self.claim_controller.get(
queue_name,
claim_id=claim_id,
project=project_id)
# Buffer claimed messages
# TODO(kgriffs): Optimize along with serialization (see below)
meta['messages'] = list(msgs)
except storage_errors.DoesNotExist as ex:
LOG.debug(ex)
raise falcon.HTTPNotFound()
except Exception as ex:
LOG.exception(ex)
description = _(u'Claim could not be queried.')
raise wsgi_errors.HTTPServiceUnavailable(description)
# Serialize claimed messages
# TODO(kgriffs): Optimize
for msg in meta['messages']:
msg['href'] = _msg_uri_from_claim(
req.path.rsplit('/', 2)[0], msg['id'], meta['id'])
del msg['id']
meta['href'] = req.path
del meta['id']
resp.content_location = req.relative_uri
resp.body = utils.to_json(meta)
# status defaults to 200
def on_patch(self, req, resp, project_id, queue_name, claim_id):
LOG.debug(_(u'Claim Item PATCH - claim: %(claim_id)s, '
u'queue: %(queue_name)s, project:%(project_id)s') %
{'queue_name': queue_name,
'project_id': project_id,
'claim_id': claim_id})
# Read claim metadata (e.g., TTL) and raise appropriate
# HTTP errors as needed.
metadata, = wsgi_utils.filter_stream(req.stream, req.content_length,
CLAIM_PATCH_SPEC)
try:
self._validate.claim_updating(metadata)
self.claim_controller.update(queue_name,
claim_id=claim_id,
metadata=metadata,
project=project_id)
resp.status = falcon.HTTP_204
except validation.ValidationFailed as ex:
LOG.debug(ex)
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
except storage_errors.DoesNotExist as ex:
LOG.debug(ex)
raise falcon.HTTPNotFound()
except Exception as ex:
LOG.exception(ex)
description = _(u'Claim could not be updated.')
raise wsgi_errors.HTTPServiceUnavailable(description)
def on_delete(self, req, resp, project_id, queue_name, claim_id):
LOG.debug(_(u'Claim item DELETE - claim: %(claim_id)s, '
u'queue: %(queue_name)s, project: %(project_id)s') %
{'queue_name': queue_name,
'project_id': project_id,
'claim_id': claim_id})
try:
self.claim_controller.delete(queue_name,
claim_id=claim_id,
project=project_id)
resp.status = falcon.HTTP_204
except Exception as ex:
LOG.exception(ex)
description = _(u'Claim could not be deleted.')
raise wsgi_errors.HTTPServiceUnavailable(description)
# TODO(kgriffs): Clean up/optimize and move to wsgi.utils
def _msg_uri_from_claim(base_path, msg_id, claim_id):
return '/'.join(
[base_path, 'messages', msg_id]
) + falcon.to_query_str({'claim_id': claim_id})

View File

@ -1,4 +1,4 @@
# Copyright (c) 2013 Rackspace Hosting, Inc.
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -10,24 +10,23 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
"""version: version information for the transport API."""
import falcon
def info():
"""Returns the API version as a tuple.
class Resource(object):
:rtype: (int, int)
"""
return (1, 0)
__slots__ = ('driver',)
def __init__(self, driver):
self.driver = driver
def path():
"""Returns the API version as /v{version}.
def on_get(self, req, resp, **kwargs):
resp.status = (falcon.HTTP_204 if self.driver.is_alive()
else falcon.HTTP_503)
:returns: /v{version}
:rtype: text
"""
return '/v{0}'.format(info()[0])
def on_head(self, req, resp, **kwargs):
resp.status = falcon.HTTP_204

View File

@ -0,0 +1,144 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
import json
# NOTE(kgriffs): http://tools.ietf.org/html/draft-nottingham-json-home-03
JSON_HOME = {
'resources': {
#------------------------------------------------------------------
# Queues
#------------------------------------------------------------------
'rel/queues': {
'href-template': '/v1/queues{?marker,limit,detailed}',
'href-vars': {
'marker': 'param/marker',
'limit': 'param/queue_limit',
'detailed': 'param/detailed',
},
'hints': {
'allow': ['GET'],
'formats': {
'application/json': {},
},
},
},
'rel/queue': {
'href-template': '/v1/queues/{queue_name}',
'href-vars': {
'queue_name': 'param/queue_name',
},
'hints': {
'allow': ['GET', 'HEAD', 'PUT', 'DELETE'],
'formats': {
'application/json': {},
},
},
},
'rel/queue-metadata': {
'href-template': '/v1/queues/{queue_name}/metadata',
'href-vars': {
'queue_name': 'param/queue_name',
},
'hints': {
'allow': ['GET', 'PUT'],
'formats': {
'application/json': {},
},
},
},
'rel/queue-stats': {
'href-template': '/v1/queues/{queue_name}/stats',
'href-vars': {
'queue_name': 'param/queue_name',
},
'hints': {
'allow': ['GET'],
'formats': {
'application/json': {},
},
},
},
#------------------------------------------------------------------
# Messages
#------------------------------------------------------------------
'rel/messages': {
'href-template': ('/v1/queues/{queue_name}/messages'
'{?marker,limit,echo,include_claimed}'),
'href-vars': {
'queue_name': 'param/queue_name',
'marker': 'param/marker',
'limit': 'param/messages_limit',
'echo': 'param/echo',
'include_claimed': 'param/include_claimed',
},
'hints': {
'allow': ['GET'],
'formats': {
'application/json': {},
},
},
},
'rel/post-messages': {
'href-template': '/v1/queues/{queue_name}/messages',
'href-vars': {
'queue_name': 'param/queue_name',
},
'hints': {
'allow': ['POST'],
'formats': {
'application/json': {},
},
'accept-post': ['application/json'],
},
},
#------------------------------------------------------------------
# Claims
#------------------------------------------------------------------
'rel/claim': {
'href-template': '/v1/queues/{queue_name}/claims{?limit}',
'href-vars': {
'queue_name': 'param/queue_name',
'limit': 'param/claim_limit',
},
'hints': {
'allow': ['POST'],
'formats': {
'application/json': {},
},
'accept-post': ['application/json']
},
},
}
}
class Resource(object):
def __init__(self):
document = json.dumps(JSON_HOME, ensure_ascii=False, indent=4)
self.document_utf8 = document.encode('utf-8')
def on_get(self, req, resp, project_id):
resp.data = self.document_utf8
resp.content_type = 'application/json-home'
resp.cache_control = ['max-age=86400']
# status defaults to 200

View File

@ -0,0 +1,306 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
import six
from marconi.openstack.common.gettextutils import _
import marconi.openstack.common.log as logging
from marconi.queues.storage import errors as storage_errors
from marconi.queues.transport import utils
from marconi.queues.transport import validation
from marconi.queues.transport.wsgi import errors as wsgi_errors
from marconi.queues.transport.wsgi import utils as wsgi_utils
LOG = logging.getLogger(__name__)
MESSAGE_POST_SPEC = (('ttl', int), ('body', '*'))
class CollectionResource(object):
__slots__ = ('message_controller', '_wsgi_conf', '_validate')
def __init__(self, wsgi_conf, validate, message_controller):
self._wsgi_conf = wsgi_conf
self._validate = validate
self.message_controller = message_controller
#-----------------------------------------------------------------------
# Helpers
#-----------------------------------------------------------------------
def _get_by_id(self, base_path, project_id, queue_name, ids):
"""Returns one or more messages from the queue by ID."""
try:
self._validate.message_listing(limit=len(ids))
messages = self.message_controller.bulk_get(
queue_name,
message_ids=ids,
project=project_id)
except validation.ValidationFailed as ex:
LOG.debug(ex)
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
except Exception as ex:
LOG.exception(ex)
description = _(u'Message could not be retrieved.')
raise wsgi_errors.HTTPServiceUnavailable(description)
# Prepare response
messages = list(messages)
if not messages:
return None
base_path += '/'
for each_message in messages:
each_message['href'] = base_path + each_message['id']
del each_message['id']
return messages
def _get(self, req, project_id, queue_name):
client_uuid = wsgi_utils.get_client_uuid(req)
kwargs = {}
# NOTE(kgriffs): This syntax ensures that
# we don't clobber default values with None.
req.get_param('marker', store=kwargs)
req.get_param_as_int('limit', store=kwargs)
req.get_param_as_bool('echo', store=kwargs)
req.get_param_as_bool('include_claimed', store=kwargs)
try:
self._validate.message_listing(**kwargs)
results = self.message_controller.list(
queue_name,
project=project_id,
client_uuid=client_uuid,
**kwargs)
# Buffer messages
cursor = next(results)
messages = list(cursor)
except validation.ValidationFailed as ex:
LOG.debug(ex)
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
except storage_errors.DoesNotExist as ex:
LOG.debug(ex)
raise falcon.HTTPNotFound()
except Exception as ex:
LOG.exception(ex)
description = _(u'Messages could not be listed.')
raise wsgi_errors.HTTPServiceUnavailable(description)
if not messages:
return None
# Found some messages, so prepare the response
kwargs['marker'] = next(results)
for each_message in messages:
each_message['href'] = req.path + '/' + each_message['id']
del each_message['id']
return {
'messages': messages,
'links': [
{
'rel': 'next',
'href': req.path + falcon.to_query_str(kwargs)
}
]
}
#-----------------------------------------------------------------------
# Interface
#-----------------------------------------------------------------------
def on_post(self, req, resp, project_id, queue_name):
LOG.debug(_(u'Messages collection POST - queue: %(queue)s, '
u'project: %(project)s'),
{'queue': queue_name, 'project': project_id})
client_uuid = wsgi_utils.get_client_uuid(req)
try:
# Place JSON size restriction before parsing
self._validate.message_length(req.content_length)
except validation.ValidationFailed as ex:
LOG.debug(ex)
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
# Pull out just the fields we care about
messages = wsgi_utils.filter_stream(
req.stream,
req.content_length,
MESSAGE_POST_SPEC,
doctype=wsgi_utils.JSONArray)
# Enqueue the messages
partial = False
try:
self._validate.message_posting(messages)
message_ids = self.message_controller.post(
queue_name,
messages=messages,
project=project_id,
client_uuid=client_uuid)
except validation.ValidationFailed as ex:
LOG.debug(ex)
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
except storage_errors.DoesNotExist as ex:
LOG.debug(ex)
raise falcon.HTTPNotFound()
except storage_errors.MessageConflict as ex:
LOG.exception(ex)
partial = True
message_ids = ex.succeeded_ids
if not message_ids:
# TODO(kgriffs): Include error code that is different
# from the code used in the generic case, below.
description = _(u'No messages could be enqueued.')
raise wsgi_errors.HTTPServiceUnavailable(description)
except Exception as ex:
LOG.exception(ex)
description = _(u'Messages could not be enqueued.')
raise wsgi_errors.HTTPServiceUnavailable(description)
# Prepare the response
ids_value = ','.join(message_ids)
resp.location = req.path + '?ids=' + ids_value
hrefs = [req.path + '/' + id for id in message_ids]
body = {'resources': hrefs, 'partial': partial}
resp.body = utils.to_json(body)
resp.status = falcon.HTTP_201
def on_get(self, req, resp, project_id, queue_name):
LOG.debug(_(u'Messages collection GET - queue: %(queue)s, '
u'project: %(project)s'),
{'queue': queue_name, 'project': project_id})
resp.content_location = req.relative_uri
ids = req.get_param_as_list('ids')
if ids is None:
response = self._get(req, project_id, queue_name)
else:
response = self._get_by_id(req.path, project_id, queue_name, ids)
if response is None:
resp.status = falcon.HTTP_204
return
resp.body = utils.to_json(response)
# status defaults to 200
def on_delete(self, req, resp, project_id, queue_name):
# NOTE(zyuan): Attempt to delete the whole message collection
# (without an "ids" parameter) is not allowed
ids = req.get_param_as_list('ids', required=True)
try:
self._validate.message_listing(limit=len(ids))
self.message_controller.bulk_delete(
queue_name,
message_ids=ids,
project=project_id)
except validation.ValidationFailed as ex:
LOG.debug(ex)
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
except Exception as ex:
LOG.exception(ex)
description = _(u'Messages could not be deleted.')
raise wsgi_errors.HTTPServiceUnavailable(description)
resp.status = falcon.HTTP_204
class ItemResource(object):
__slots__ = ('message_controller')
def __init__(self, message_controller):
self.message_controller = message_controller
def on_get(self, req, resp, project_id, queue_name, message_id):
LOG.debug(_(u'Messages item GET - message: %(message)s, '
u'queue: %(queue)s, project: %(project)s'),
{'message': message_id,
'queue': queue_name,
'project': project_id})
try:
message = self.message_controller.get(
queue_name,
message_id,
project=project_id)
except storage_errors.DoesNotExist as ex:
LOG.debug(ex)
raise falcon.HTTPNotFound()
except Exception as ex:
LOG.exception(ex)
description = _(u'Message could not be retrieved.')
raise wsgi_errors.HTTPServiceUnavailable(description)
# Prepare response
message['href'] = req.path
del message['id']
resp.content_location = req.relative_uri
resp.body = utils.to_json(message)
# status defaults to 200
def on_delete(self, req, resp, project_id, queue_name, message_id):
LOG.debug(_(u'Messages item DELETE - message: %(message)s, '
u'queue: %(queue)s, project: %(project)s'),
{'message': message_id,
'queue': queue_name,
'project': project_id})
try:
self.message_controller.delete(
queue_name,
message_id=message_id,
project=project_id,
claim=req.get_param('claim_id'))
except storage_errors.NotPermitted as ex:
LOG.exception(ex)
title = _(u'Unable to delete')
description = _(u'This message is claimed; it cannot be '
u'deleted without a valid claim_id.')
raise falcon.HTTPForbidden(title, description)
except Exception as ex:
LOG.exception(ex)
description = _(u'Message could not be deleted.')
raise wsgi_errors.HTTPServiceUnavailable(description)
# Alles guete
resp.status = falcon.HTTP_204

View File

@ -0,0 +1,96 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
import six
from marconi.openstack.common.gettextutils import _
import marconi.openstack.common.log as logging
from marconi.queues.storage import errors as storage_errors
from marconi.queues.transport import utils
from marconi.queues.transport import validation
from marconi.queues.transport.wsgi import errors as wsgi_errors
from marconi.queues.transport.wsgi import utils as wsgi_utils
LOG = logging.getLogger(__name__)
class Resource(object):
__slots__ = ('_wsgi_conf', '_validate', 'queue_ctrl')
def __init__(self, _wsgi_conf, validate, queue_controller):
self._wsgi_conf = _wsgi_conf
self._validate = validate
self.queue_ctrl = queue_controller
def on_get(self, req, resp, project_id, queue_name):
LOG.debug(_(u'Queue metadata GET - queue: %(queue)s, '
u'project: %(project)s'),
{'queue': queue_name, 'project': project_id})
try:
resp_dict = self.queue_ctrl.get_metadata(queue_name,
project=project_id)
except storage_errors.DoesNotExist as ex:
LOG.debug(ex)
raise falcon.HTTPNotFound()
except Exception as ex:
LOG.exception(ex)
description = _(u'Queue metadata could not be retrieved.')
raise wsgi_errors.HTTPServiceUnavailable(description)
resp.content_location = req.path
resp.body = utils.to_json(resp_dict)
# status defaults to 200
def on_put(self, req, resp, project_id, queue_name):
LOG.debug(_(u'Queue metadata PUT - queue: %(queue)s, '
u'project: %(project)s'),
{'queue': queue_name, 'project': project_id})
try:
# Place JSON size restriction before parsing
self._validate.queue_metadata_length(req.content_length)
except validation.ValidationFailed as ex:
LOG.debug(ex)
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
# Deserialize queue metadata
metadata, = wsgi_utils.filter_stream(req.stream,
req.content_length,
spec=None)
try:
self.queue_ctrl.set_metadata(queue_name,
metadata=metadata,
project=project_id)
except validation.ValidationFailed as ex:
LOG.debug(ex)
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
except storage_errors.QueueDoesNotExist:
raise falcon.HTTPNotFound()
except Exception as ex:
LOG.exception(ex)
description = _(u'Metadata could not be updated.')
raise wsgi_errors.HTTPServiceUnavailable(description)
resp.status = falcon.HTTP_204
resp.location = req.path

View File

@ -0,0 +1,139 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
import six
from marconi.openstack.common.gettextutils import _
import marconi.openstack.common.log as logging
from marconi.queues.transport import utils
from marconi.queues.transport import validation
from marconi.queues.transport.wsgi import errors as wsgi_errors
LOG = logging.getLogger(__name__)
class ItemResource(object):
__slots__ = ('queue_controller', 'message_controller')
def __init__(self, queue_controller, message_controller):
self.queue_controller = queue_controller
self.message_controller = message_controller
def on_put(self, req, resp, project_id, queue_name):
LOG.debug(_(u'Queue item PUT - queue: %(queue)s, '
u'project: %(project)s'),
{'queue': queue_name, 'project': project_id})
try:
created = self.queue_controller.create(
queue_name, project=project_id)
except Exception as ex:
LOG.exception(ex)
description = _(u'Queue could not be created.')
raise wsgi_errors.HTTPServiceUnavailable(description)
resp.status = falcon.HTTP_201 if created else falcon.HTTP_204
resp.location = req.path
def on_head(self, req, resp, project_id, queue_name):
LOG.debug(_(u'Queue item exists - queue: %(queue)s, '
u'project: %(project)s'),
{'queue': queue_name, 'project': project_id})
if self.queue_controller.exists(queue_name, project=project_id):
resp.status = falcon.HTTP_204
else:
resp.status = falcon.HTTP_404
resp.content_location = req.path
on_get = on_head
def on_delete(self, req, resp, project_id, queue_name):
LOG.debug(_(u'Queue item DELETE - queue: %(queue)s, '
u'project: %(project)s'),
{'queue': queue_name, 'project': project_id})
try:
self.queue_controller.delete(queue_name, project=project_id)
except Exception as ex:
LOG.exception(ex)
description = _(u'Queue could not be deleted.')
raise wsgi_errors.HTTPServiceUnavailable(description)
resp.status = falcon.HTTP_204
class CollectionResource(object):
__slots__ = ('queue_controller', '_validate')
def __init__(self, validate, queue_controller):
self.queue_controller = queue_controller
self._validate = validate
def on_get(self, req, resp, project_id):
kwargs = {}
# NOTE(kgriffs): This syntax ensures that
# we don't clobber default values with None.
req.get_param('marker', store=kwargs)
req.get_param_as_int('limit', store=kwargs)
req.get_param_as_bool('detailed', store=kwargs)
try:
self._validate.queue_listing(**kwargs)
results = self.queue_controller.list(project=project_id, **kwargs)
except validation.ValidationFailed as ex:
LOG.debug(ex)
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
except Exception as ex:
LOG.exception(ex)
description = _(u'Queues could not be listed.')
raise wsgi_errors.HTTPServiceUnavailable(description)
# Buffer list of queues
queues = list(next(results))
# Check for an empty list
if len(queues) == 0:
resp.status = falcon.HTTP_204
return
# Got some. Prepare the response.
kwargs['marker'] = next(results)
for each_queue in queues:
each_queue['href'] = req.path + '/' + each_queue['name']
response_body = {
'queues': queues,
'links': [
{
'rel': 'next',
'href': req.path + falcon.to_query_str(kwargs)
}
]
}
resp.content_location = req.relative_uri
resp.body = utils.to_json(response_body)
# status defaults to 200

View File

@ -0,0 +1,199 @@
# Copyright (c) 2013 Rackspace Hosting, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""shards: a resource to handle storage shard management
A shard is added by an operator by interacting with the
sharding-related endpoints. When specifying a shard, the
following fields are required:
{
"name": string,
"weight": integer,
"uri": string::uri
}
Furthermore, depending on the underlying storage type of shard being
registered, there is an optional field:
{
"options": {...}
}
"""
import falcon
import jsonschema
from marconi.common.schemas import shards as schema
from marconi.common.transport.wsgi import utils
from marconi.common import utils as common_utils
from marconi.openstack.common import log
from marconi.queues.storage import errors
from marconi.queues.storage import utils as storage_utils
from marconi.queues.transport import utils as transport_utils
from marconi.queues.transport.wsgi import errors as wsgi_errors
LOG = log.getLogger(__name__)
class Listing(object):
"""A resource to list registered shards
:param shards_controller: means to interact with storage
"""
def __init__(self, shards_controller):
self._ctrl = shards_controller
def on_get(self, request, response, project_id):
"""Returns a shard listing as objects embedded in an array:
[
{"href": "", "weight": 100, "uri": ""},
...
]
:returns: HTTP | [200, 204]
"""
LOG.debug(u'LIST shards')
store = {}
request.get_param('marker', store=store)
request.get_param_as_int('limit', store=store)
request.get_param_as_bool('detailed', store=store)
results = {}
results['shards'] = list(self._ctrl.list(**store))
for entry in results['shards']:
entry['href'] = request.path + '/' + entry.pop('name')
if not results['shards']:
response.status = falcon.HTTP_204
return
response.content_location = request.relative_uri
response.body = transport_utils.to_json(results)
response.status = falcon.HTTP_200
class Resource(object):
"""A handler for individual shard.
:param shards_controller: means to interact with storage
"""
def __init__(self, shards_controller):
self._ctrl = shards_controller
validator_type = jsonschema.Draft4Validator
self._validators = {
'weight': validator_type(schema.patch_weight),
'uri': validator_type(schema.patch_uri),
'options': validator_type(schema.patch_options),
'create': validator_type(schema.create)
}
def on_get(self, request, response, project_id, shard):
"""Returns a JSON object for a single shard entry:
{"weight": 100, "uri": "", options: {...}}
:returns: HTTP | [200, 404]
"""
LOG.debug(u'GET shard - name: %s', shard)
data = None
detailed = request.get_param_as_bool('detailed') or False
try:
data = self._ctrl.get(shard, detailed)
except errors.ShardDoesNotExist as ex:
LOG.debug(ex)
raise falcon.HTTPNotFound()
data['href'] = request.path
# remove the name entry - it isn't needed on GET
del data['name']
response.body = transport_utils.to_json(data)
response.content_location = request.relative_uri
def on_put(self, request, response, project_id, shard):
"""Registers a new shard. Expects the following input:
{"weight": 100, "uri": ""}
An options object may also be provided.
:returns: HTTP | [201, 204]
"""
LOG.debug(u'PUT shard - name: %s', shard)
data = utils.load(request)
utils.validate(self._validators['create'], data)
if not storage_utils.can_connect(data['uri']):
raise wsgi_errors.HTTPBadRequestBody(
'cannot connect to %s' % data['uri']
)
self._ctrl.create(shard, weight=data['weight'],
uri=data['uri'],
options=data.get('options', {}))
response.status = falcon.HTTP_201
response.location = request.path
def on_delete(self, request, response, project_id, shard):
"""Deregisters a shard.
:returns: HTTP | 204
"""
LOG.debug(u'DELETE shard - name: %s', shard)
self._ctrl.delete(shard)
response.status = falcon.HTTP_204
def on_patch(self, request, response, project_id, shard):
"""Allows one to update a shard's weight, uri, and/or options.
This method expects the user to submit a JSON object
containing atleast one of: 'uri', 'weight', 'options'. If
none are found, the request is flagged as bad. There is also
strict format checking through the use of
jsonschema. Appropriate errors are returned in each case for
badly formatted input.
:returns: HTTP | 200,400
"""
LOG.debug(u'PATCH shard - name: %s', shard)
data = utils.load(request)
EXPECT = ('weight', 'uri', 'options')
if not any([(field in data) for field in EXPECT]):
LOG.debug(u'PATCH shard, bad params')
raise wsgi_errors.HTTPBadRequestBody(
'One of `uri`, `weight`, or `options` needs '
'to be specified'
)
for field in EXPECT:
utils.validate(self._validators[field], data)
if 'uri' in data and not storage_utils.can_connect(data['uri']):
raise wsgi_errors.HTTPBadRequestBody(
'cannot connect to %s' % data['uri']
)
fields = common_utils.fields(data, EXPECT,
pred=lambda v: v is not None)
try:
self._ctrl.update(shard, **fields)
except errors.ShardDoesNotExist as ex:
LOG.exception(ex)
raise falcon.HTTPNotFound()

View File

@ -0,0 +1,64 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
from marconi.openstack.common.gettextutils import _
import marconi.openstack.common.log as logging
from marconi.queues.storage import errors as storage_errors
from marconi.queues.transport import utils
from marconi.queues.transport.wsgi import errors as wsgi_errors
LOG = logging.getLogger(__name__)
class Resource(object):
__slots__ = ('queue_ctrl')
def __init__(self, queue_controller):
self.queue_ctrl = queue_controller
def on_get(self, req, resp, project_id, queue_name):
try:
resp_dict = self.queue_ctrl.stats(queue_name,
project=project_id)
message_stats = resp_dict['messages']
if message_stats['total'] != 0:
base_path = req.path[:req.path.rindex('/')] + '/messages/'
newest = message_stats['newest']
newest['href'] = base_path + newest['id']
del newest['id']
oldest = message_stats['oldest']
oldest['href'] = base_path + oldest['id']
del oldest['id']
resp.content_location = req.path
resp.body = utils.to_json(resp_dict)
# status defaults to 200
except storage_errors.DoesNotExist as ex:
LOG.debug(ex)
raise falcon.HTTPNotFound()
except Exception as ex:
LOG.exception(ex)
description = _(u'Queue stats could not be read.')
raise wsgi_errors.HTTPServiceUnavailable(description)

View File

@ -39,11 +39,8 @@ marconi.queues.control.storage =
mongodb = marconi.queues.storage.mongodb.driver:ControlDriver
faulty = marconi.tests.faulty_storage:ControlDriver
marconi.queues.public.transport =
wsgi = marconi.queues.transport.wsgi.public.driver:Driver
marconi.queues.admin.transport =
wsgi = marconi.queues.transport.wsgi.admin.driver:Driver
marconi.queues.transport =
wsgi = marconi.queues.transport.wsgi.driver:Driver
marconi.openstack.common.cache.backends =
memory = marconi.openstack.common.cache._backends.memory:MemoryBackend
@ -55,7 +52,7 @@ oslo.config.opts =
marconi.storage.sharding = marconi.queues.storage.sharding._config_options
marconi.storage.mongodb = marconi.queues.storage.mongodb.options._config_options
marconi.storage.sqlite = marconi.queues.storage.sqlite.options._config_options
marconi.transport.wsgi = marconi.queues.transport.wsgi.driver._config_options
marconi.transport.wsgi = marconi.queues.transport.wsgi.v1_0.driver._config_options
marconi.transport.base = marconi.queues.transport.base._config_options
marconi.transport.validation = marconi.queues.transport.validation._config_options