Files
zaqar/marconi/transport/wsgi/messages.py
kgriffs b2ad8f082e Use req.get_param's store feature in lieu of utils.purge
This patch removes the use of utils.purge since it is no
longer needed in light of Falcon's relatively new "store"
feature, which causes the target dict to only receive the
value of the param if it is found in the request, thus
avoiding clobbering default kwarg values, which was the
original intent of utils.purge.

As a bonus, this approach is also ~40% faster.

Change-Id: I6b3d82b1f700045e50b3131902b2c7b6dc6ea13c
Implements: blueprint v1-obvious-optimizations
2013-08-15 11:29:10 -05:00

322 lines
11 KiB
Python

# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
from marconi.common import config
from marconi.common import exceptions as input_exceptions
import marconi.openstack.common.log as logging
from marconi.storage import exceptions as storage_exceptions
from marconi.transport import utils
from marconi.transport import validation as validate
from marconi.transport.wsgi import exceptions as wsgi_exceptions
from marconi.transport.wsgi import utils as wsgi_utils
LOG = logging.getLogger(__name__)
CFG = config.namespace('drivers:transport:wsgi').from_options(
content_max_length=256 * 1024
)
MESSAGE_POST_SPEC = (('ttl', int), ('body', '*'))
class CollectionResource(object):
__slots__ = ('message_controller')
def __init__(self, message_controller):
self.message_controller = message_controller
#-----------------------------------------------------------------------
# Helpers
#-----------------------------------------------------------------------
def _get_by_id(self, base_path, project_id, queue_name, ids):
"""Returns one or more messages from the queue by ID."""
try:
validate.message_listing(limit=len(ids))
messages = self.message_controller.bulk_get(
queue_name,
message_ids=ids,
project=project_id)
except input_exceptions.ValidationFailed as ex:
raise wsgi_exceptions.HTTPBadRequestBody(str(ex))
except Exception as ex:
LOG.exception(ex)
description = _(u'Message could not be retrieved.')
raise wsgi_exceptions.HTTPServiceUnavailable(description)
# Prepare response
messages = list(messages)
if not messages:
return None
base_path += '/'
for each_message in messages:
each_message['href'] = base_path + each_message['id']
del each_message['id']
return messages
def _get(self, req, project_id, queue_name):
uuid = req.get_header('Client-ID', required=True)
kwargs = {}
# NOTE(kgriffs): This syntax ensures that
# we don't clobber default values with None.
req.get_param('marker', store=kwargs)
req.get_param_as_int('limit', store=kwargs)
req.get_param_as_bool('echo', store=kwargs)
req.get_param_as_bool('include_claimed', store=kwargs)
try:
validate.message_listing(**kwargs)
results = self.message_controller.list(
queue_name,
project=project_id,
client_uuid=uuid,
**kwargs)
# Buffer messages
cursor = next(results)
messages = list(cursor)
except input_exceptions.ValidationFailed as ex:
raise wsgi_exceptions.HTTPBadRequestBody(str(ex))
except storage_exceptions.DoesNotExist:
raise falcon.HTTPNotFound()
except storage_exceptions.MalformedMarker:
title = _(u'Invalid query string parameter')
description = _(u'The value for the query string '
u'parameter "marker" could not be '
u'parsed. We recommend using the '
u'"next" URI from a previous '
u'request directly, rather than '
u'constructing the URI manually. ')
raise falcon.HTTPBadRequest(title, description)
except Exception as ex:
LOG.exception(ex)
description = _(u'Messages could not be listed.')
raise wsgi_exceptions.HTTPServiceUnavailable(description)
if not messages:
return None
# Found some messages, so prepare the response
kwargs['marker'] = next(results)
for each_message in messages:
each_message['href'] = req.path + '/' + each_message['id']
del each_message['id']
return {
'messages': messages,
'links': [
{
'rel': 'next',
'href': req.path + falcon.to_query_str(kwargs)
}
]
}
#-----------------------------------------------------------------------
# Interface
#-----------------------------------------------------------------------
def on_post(self, req, resp, project_id, queue_name):
LOG.debug(_(u'Messages collection POST - queue: %(queue)s, '
u'project: %(project)s') %
{'queue': queue_name, 'project': project_id})
uuid = req.get_header('Client-ID', required=True)
# Place JSON size restriction before parsing
if req.content_length > CFG.content_max_length:
description = _(u'Message collection size is too large.')
raise wsgi_exceptions.HTTPBadRequestBody(description)
# Pull out just the fields we care about
messages = wsgi_utils.filter_stream(
req.stream,
req.content_length,
MESSAGE_POST_SPEC,
doctype=wsgi_utils.JSONArray)
# Verify that at least one message was provided.
# NOTE(kgriffs): This check assumes messages is a
# collection (not a generator).
if not messages:
description = _(u'No messages were provided.')
raise wsgi_exceptions.HTTPBadRequestBody(description)
# Enqueue the messages
partial = False
try:
# No need to check each message's size if it
# can not exceed the request size limit
validate.message_posting(
messages, check_size=(
validate.CFG.message_size_uplimit <
CFG.content_max_length))
message_ids = self.message_controller.post(
queue_name,
messages=messages,
project=project_id,
client_uuid=uuid)
except input_exceptions.ValidationFailed as ex:
raise wsgi_exceptions.HTTPBadRequestBody(str(ex))
except storage_exceptions.DoesNotExist:
raise falcon.HTTPNotFound()
except storage_exceptions.MessageConflict as ex:
LOG.exception(ex)
partial = True
message_ids = ex.succeeded_ids
if not message_ids:
# TODO(kgriffs): Include error code that is different
# from the code used in the generic case, below.
description = _(u'No messages could be enqueued.')
raise wsgi_exceptions.HTTPServiceUnavailable(description)
except Exception as ex:
LOG.exception(ex)
description = _(u'Messages could not be enqueued.')
raise wsgi_exceptions.HTTPServiceUnavailable(description)
# Prepare the response
resp.status = falcon.HTTP_201
ids_value = ','.join(message_ids)
resp.location = req.path + '?ids=' + ids_value
hrefs = [req.path + '/' + id for id in message_ids]
body = {'resources': hrefs, 'partial': partial}
resp.body = utils.to_json(body)
def on_get(self, req, resp, project_id, queue_name):
LOG.debug(_(u'Messages collection GET - queue: %(queue)s, '
u'project: %(project)s') %
{'queue': queue_name, 'project': project_id})
resp.content_location = req.relative_uri
ids = req.get_param_as_list('ids')
if ids is None:
response = self._get(req, project_id, queue_name)
else:
base_path = req.path + '/messages'
response = self._get_by_id(base_path, project_id, queue_name, ids)
if response is None:
resp.status = falcon.HTTP_204
return
resp.body = utils.to_json(response)
def on_delete(self, req, resp, project_id, queue_name):
# NOTE(zyuan): Attempt to delete the whole message collection
# (without an "ids" parameter) is not allowed
ids = req.get_param_as_list('ids', required=True)
try:
validate.message_listing(limit=len(ids))
self.message_controller.bulk_delete(
queue_name,
message_ids=ids,
project=project_id)
except input_exceptions.ValidationFailed as ex:
raise wsgi_exceptions.HTTPBadRequestBody(str(ex))
except Exception as ex:
LOG.exception(ex)
description = _(u'Messages could not be deleted.')
raise wsgi_exceptions.HTTPServiceUnavailable(description)
resp.status = falcon.HTTP_204
class ItemResource(object):
__slots__ = ('message_controller')
def __init__(self, message_controller):
self.message_controller = message_controller
def on_get(self, req, resp, project_id, queue_name, message_id):
LOG.debug(_(u'Messages item GET - message: %(message)s, '
u'queue: %(queue)s, project: %(project)s') %
{'message': message_id,
'queue': queue_name,
'project': project_id})
try:
message = self.message_controller.get(
queue_name,
message_id,
project=project_id)
except storage_exceptions.DoesNotExist:
raise falcon.HTTPNotFound()
except Exception as ex:
LOG.exception(ex)
description = _(u'Message could not be retrieved.')
raise wsgi_exceptions.HTTPServiceUnavailable(description)
# Prepare response
message['href'] = req.path
del message['id']
resp.content_location = req.relative_uri
resp.body = utils.to_json(message)
resp.status = falcon.HTTP_200
def on_delete(self, req, resp, project_id, queue_name, message_id):
LOG.debug(_(u'Messages item DELETE - message: %(message)s, '
u'queue: %(queue)s, project: %(project)s') %
{'message': message_id,
'queue': queue_name,
'project': project_id})
try:
self.message_controller.delete(
queue_name,
message_id=message_id,
project=project_id,
claim=req.get_param('claim_id'))
except storage_exceptions.NotPermitted as ex:
LOG.exception(ex)
title = _(u'Invalid claim')
description = _(u'The specified claim either does not '
u'exist or has expired.')
raise falcon.HTTPForbidden(title, description)
except Exception as ex:
LOG.exception(ex)
description = _(u'Message could not be deleted.')
raise wsgi_exceptions.HTTPServiceUnavailable(description)
# Alles guete
resp.status = falcon.HTTP_204