Merge "API v1.1 - Encapsulate message post bodies in a JSON object"
This commit is contained in:
commit
b3f1b48d0f
@ -29,32 +29,21 @@ JSONArray = list
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def filter_stream(stream, len, spec=None, doctype=JSONObject):
|
||||
"""Reads, deserializes, and validates a document from a stream.
|
||||
#
|
||||
# TODO(kgriffs): Create Falcon "before" hooks adapters for these functions
|
||||
#
|
||||
|
||||
|
||||
def deserialize(stream, len):
|
||||
"""Deserializes JSON from a file-like stream.
|
||||
|
||||
This function deserializes JSON from a stream, including
|
||||
translating read and parsing errors to HTTP error types.
|
||||
|
||||
:param stream: file-like object from which to read an object or
|
||||
array of objects.
|
||||
:param len: number of bytes to read from stream
|
||||
:param spec: (Default None) Iterable describing expected fields,
|
||||
yielding tuples with the form of:
|
||||
|
||||
(field_name, value_type, default_value)
|
||||
|
||||
Note that value_type may either be a Python type, or the
|
||||
special string '*' to accept any type. default_value is the
|
||||
default to give the field if it is missing, or None to require
|
||||
that the field be present.
|
||||
|
||||
If spec is None, the incoming documents will not be validated.
|
||||
:param doctype: type of document to expect; must be either
|
||||
JSONObject or JSONArray.
|
||||
:raises: HTTPBadRequest, HTTPServiceUnavailable
|
||||
:returns: A sanitized, filtered version of the document list read
|
||||
from the stream. If the document contains a list of objects,
|
||||
each object will be filtered and returned in a new list. If,
|
||||
on the other hand, the document is expected to contain a
|
||||
single object, that object will be filtered and returned as
|
||||
a single-element iterable.
|
||||
"""
|
||||
|
||||
if len is None:
|
||||
@ -65,7 +54,7 @@ def filter_stream(stream, len, spec=None, doctype=JSONObject):
|
||||
# TODO(kgriffs): read_json should stream the resulting list
|
||||
# of messages, returning a generator rather than buffering
|
||||
# everything in memory (bp/streaming-serialization).
|
||||
document = utils.read_json(stream, len)
|
||||
return utils.read_json(stream, len)
|
||||
|
||||
except utils.MalformedJSON as ex:
|
||||
LOG.debug(ex)
|
||||
@ -83,11 +72,37 @@ def filter_stream(stream, len, spec=None, doctype=JSONObject):
|
||||
description = _(u'Request body could not be read.')
|
||||
raise errors.HTTPServiceUnavailable(description)
|
||||
|
||||
|
||||
def sanitize(document, spec=None, doctype=JSONObject):
|
||||
"""Validates a document and drops undesired fields.
|
||||
|
||||
:param document: A dict to verify according to `spec`.
|
||||
:param spec: (Default None) Iterable describing expected fields,
|
||||
yielding tuples with the form of:
|
||||
|
||||
(field_name, value_type, default_value)
|
||||
|
||||
Note that value_type may either be a Python type, or the
|
||||
special string '*' to accept any type. default_value is the
|
||||
default to give the field if it is missing, or None to require
|
||||
that the field be present.
|
||||
|
||||
If spec is None, the incoming documents will not be validated.
|
||||
:param doctype: type of document to expect; must be either
|
||||
JSONObject or JSONArray.
|
||||
:raises: HTTPBadRequestBody
|
||||
:returns: A sanitized, filtered version of the document. If the
|
||||
document is a list of objects, each object will be filtered
|
||||
and returned in a new list. If, on the other hand, the document
|
||||
is expected to contain a single object, that object's fields will
|
||||
be filtered and the resulting object will be returned.
|
||||
"""
|
||||
|
||||
if doctype is JSONObject:
|
||||
if not isinstance(document, JSONObject):
|
||||
raise errors.HTTPDocumentTypeNotSupported()
|
||||
|
||||
return (document,) if spec is None else (filter(document, spec),)
|
||||
return document if spec is None else filter(document, spec)
|
||||
|
||||
if doctype is JSONArray:
|
||||
if not isinstance(document, JSONArray):
|
||||
|
@ -52,8 +52,8 @@ class CollectionResource(Resource):
|
||||
|
||||
# Read claim metadata (e.g., TTL) and raise appropriate
|
||||
# HTTP errors as needed.
|
||||
metadata, = wsgi_utils.filter_stream(req.stream, req.content_length,
|
||||
CLAIM_POST_SPEC)
|
||||
document = wsgi_utils.deserialize(req.stream, req.content_length)
|
||||
metadata = wsgi_utils.sanitize(document, CLAIM_POST_SPEC)
|
||||
|
||||
# Claim some messages
|
||||
try:
|
||||
@ -148,8 +148,8 @@ class ItemResource(Resource):
|
||||
|
||||
# Read claim metadata (e.g., TTL) and raise appropriate
|
||||
# HTTP errors as needed.
|
||||
metadata, = wsgi_utils.filter_stream(req.stream, req.content_length,
|
||||
CLAIM_PATCH_SPEC)
|
||||
document = wsgi_utils.deserialize(req.stream, req.content_length)
|
||||
metadata = wsgi_utils.sanitize(document, CLAIM_PATCH_SPEC)
|
||||
|
||||
try:
|
||||
self._validate.claim_updating(metadata)
|
||||
|
@ -145,12 +145,10 @@ class CollectionResource(object):
|
||||
LOG.debug(ex)
|
||||
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
|
||||
|
||||
# Pull out just the fields we care about
|
||||
messages = wsgi_utils.filter_stream(
|
||||
req.stream,
|
||||
req.content_length,
|
||||
MESSAGE_POST_SPEC,
|
||||
doctype=wsgi_utils.JSONArray)
|
||||
# Deserialize and validate the request body
|
||||
document = wsgi_utils.deserialize(req.stream, req.content_length)
|
||||
messages = wsgi_utils.sanitize(document, MESSAGE_POST_SPEC,
|
||||
doctype=wsgi_utils.JSONArray)
|
||||
|
||||
# Enqueue the messages
|
||||
partial = False
|
||||
|
@ -71,9 +71,8 @@ class Resource(object):
|
||||
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
|
||||
|
||||
# Deserialize queue metadata
|
||||
metadata, = wsgi_utils.filter_stream(req.stream,
|
||||
req.content_length,
|
||||
spec=None)
|
||||
document = wsgi_utils.deserialize(req.stream, req.content_length)
|
||||
metadata = wsgi_utils.sanitize(document, spec=None)
|
||||
|
||||
try:
|
||||
self.queue_ctrl.set_metadata(queue_name,
|
||||
|
@ -74,8 +74,8 @@ class CollectionResource(object):
|
||||
else:
|
||||
# Read claim metadata (e.g., TTL) and raise appropriate
|
||||
# HTTP errors as needed.
|
||||
metadata, = wsgi_utils.filter_stream(
|
||||
req.stream, req.content_length, self._claim_post_spec)
|
||||
document = wsgi_utils.deserialize(req.stream, req.content_length)
|
||||
metadata = wsgi_utils.sanitize(document, self._claim_post_spec)
|
||||
|
||||
# Claim some messages
|
||||
try:
|
||||
@ -171,8 +171,8 @@ class ItemResource(object):
|
||||
|
||||
# Read claim metadata (e.g., TTL) and raise appropriate
|
||||
# HTTP errors as needed.
|
||||
metadata, = wsgi_utils.filter_stream(req.stream, req.content_length,
|
||||
CLAIM_PATCH_SPEC)
|
||||
document = wsgi_utils.deserialize(req.stream, req.content_length)
|
||||
metadata = wsgi_utils.sanitize(document, CLAIM_PATCH_SPEC)
|
||||
|
||||
try:
|
||||
self._validate.claim_updating(metadata)
|
||||
|
@ -159,12 +159,16 @@ class CollectionResource(object):
|
||||
LOG.debug(ex)
|
||||
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
|
||||
|
||||
# Pull out just the fields we care about
|
||||
messages = wsgi_utils.filter_stream(
|
||||
req.stream,
|
||||
req.content_length,
|
||||
self._message_post_spec,
|
||||
doctype=wsgi_utils.JSONArray)
|
||||
# Deserialize and validate the incoming messages
|
||||
document = wsgi_utils.deserialize(req.stream, req.content_length)
|
||||
|
||||
if 'messages' not in document:
|
||||
description = _(u'No messages were found in the request body.')
|
||||
raise wsgi_errors.HTTPBadRequestAPI(description)
|
||||
|
||||
messages = wsgi_utils.sanitize(document['messages'],
|
||||
self._message_post_spec,
|
||||
doctype=wsgi_utils.JSONArray)
|
||||
|
||||
# Enqueue the messages
|
||||
partial = False
|
||||
|
@ -71,9 +71,8 @@ class Resource(object):
|
||||
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
|
||||
|
||||
# Deserialize queue metadata
|
||||
metadata, = wsgi_utils.filter_stream(req.stream,
|
||||
req.content_length,
|
||||
spec=None)
|
||||
document = wsgi_utils.deserialize(req.stream, req.content_length)
|
||||
metadata = wsgi_utils.sanitize(document, spec=None)
|
||||
|
||||
try:
|
||||
self.queue_ctrl.set_metadata(queue_name,
|
||||
|
@ -95,13 +95,27 @@ def single_message_body(messagesize=2, default_ttl=False, ttl=None):
|
||||
|
||||
|
||||
def create_message_body(messagecount, **kwargs):
|
||||
"""Returns request body for post message tests.
|
||||
"""Returns request body for message-posting tests.
|
||||
|
||||
:param messagecount: Number of messages to create
|
||||
:param **kwargs: Same as for `single_message_body`
|
||||
"""
|
||||
|
||||
return [single_message_body(**kwargs) for i in range(messagecount)]
|
||||
return [single_message_body(**kwargs)
|
||||
for i in range(messagecount)]
|
||||
|
||||
|
||||
def create_message_body_v1_1(messagecount, **kwargs):
|
||||
"""Returns request body for message-posting tests.
|
||||
|
||||
:param messagecount: Number of messages to create
|
||||
:param **kwargs: Same as for `single_message_body`
|
||||
"""
|
||||
|
||||
return {
|
||||
"messages": [single_message_body(**kwargs)
|
||||
for i in range(messagecount)]
|
||||
}
|
||||
|
||||
|
||||
def create_pool_body(**kwargs):
|
||||
|
@ -20,6 +20,7 @@ from marconi.tests.queues.transport.wsgi.v1 import test_media_type
|
||||
from marconi.tests.queues.transport.wsgi.v1 import test_messages
|
||||
from marconi.tests.queues.transport.wsgi.v1 import test_pools
|
||||
from marconi.tests.queues.transport.wsgi.v1 import test_queue_lifecycle as l
|
||||
from marconi.tests.queues.transport.wsgi.v1 import test_validation
|
||||
|
||||
TestAuth = test_auth.TestAuth
|
||||
TestClaimsFaultyDriver = test_claims.TestClaimsFaultyDriver
|
||||
@ -37,3 +38,4 @@ TestQueueLifecycleMongoDB = l.TestQueueLifecycleMongoDB
|
||||
TestQueueLifecycleSqlalchemy = l.TestQueueLifecycleSqlalchemy
|
||||
TestPoolsMongoDB = test_pools.TestPoolsMongoDB
|
||||
TestPoolsSqlalchemy = test_pools.TestPoolsSqlalchemy
|
||||
TestValidation = test_validation.TestValidation
|
||||
|
@ -382,6 +382,7 @@ class MessagesBaseTest(base.V1Base):
|
||||
def test_when_claim_deleted_then_messages_unclaimed(self):
|
||||
path = self.queue_path
|
||||
self._post_messages(path + '/messages', repeat=5)
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_201)
|
||||
|
||||
# post claim
|
||||
self.simulate_post(path + '/claims', self.project_id,
|
||||
|
@ -21,20 +21,16 @@ import falcon
|
||||
from marconi.tests.queues.transport.wsgi import base
|
||||
|
||||
|
||||
class ValidationTest(base.TestBase):
|
||||
class TestValidation(base.V1Base):
|
||||
|
||||
config_file = 'wsgi_sqlalchemy_validation.conf'
|
||||
|
||||
def setUp(self):
|
||||
super(ValidationTest, self).setUp()
|
||||
super(TestValidation, self).setUp()
|
||||
|
||||
self.project_id = '7e55e1a7e'
|
||||
|
||||
# NOTE(kgriffs): ATM, validation logic does not key off
|
||||
# the API version. Therefore, we just pick '/v1' arbitrarily
|
||||
# as the url prefix.
|
||||
self.queue_path = '/v1/queues/noein'
|
||||
|
||||
self.queue_path = self.url_prefix + '/queues/noein'
|
||||
self.simulate_put(self.queue_path, self.project_id)
|
||||
|
||||
self.headers = {
|
||||
@ -43,7 +39,7 @@ class ValidationTest(base.TestBase):
|
||||
|
||||
def tearDown(self):
|
||||
self.simulate_delete(self.queue_path, self.project_id)
|
||||
super(ValidationTest, self).tearDown()
|
||||
super(TestValidation, self).tearDown()
|
||||
|
||||
def test_metadata_deserialization(self):
|
||||
# Normal case
|
@ -20,6 +20,7 @@ from marconi.tests.queues.transport.wsgi.v1_1 import test_media_type
|
||||
from marconi.tests.queues.transport.wsgi.v1_1 import test_messages
|
||||
from marconi.tests.queues.transport.wsgi.v1_1 import test_pools
|
||||
from marconi.tests.queues.transport.wsgi.v1_1 import test_queue_lifecycle as l
|
||||
from marconi.tests.queues.transport.wsgi.v1_1 import test_validation
|
||||
|
||||
TestAuth = test_auth.TestAuth
|
||||
TestClaimsFaultyDriver = test_claims.TestClaimsFaultyDriver
|
||||
@ -37,3 +38,4 @@ TestQueueLifecycleMongoDB = l.TestQueueLifecycleMongoDB
|
||||
TestQueueLifecycleSqlalchemy = l.TestQueueLifecycleSqlalchemy
|
||||
TestPoolsMongoDB = test_pools.TestPoolsMongoDB
|
||||
TestPoolsSqlalchemy = test_pools.TestPoolsSqlalchemy
|
||||
TestValidation = test_validation.TestValidation
|
||||
|
@ -49,7 +49,7 @@ class ClaimsBaseTest(base.V1_1Base):
|
||||
self.simulate_put(self.queue_path, body=doc, headers=self.headers)
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_201)
|
||||
|
||||
doc = json.dumps([{'body': 239, 'ttl': 300}] * 10)
|
||||
doc = json.dumps({'messages': [{'body': 239, 'ttl': 300}] * 10})
|
||||
self.simulate_post(self.queue_path + '/messages',
|
||||
body=doc, headers=self.headers)
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_201)
|
||||
|
@ -116,8 +116,9 @@ class TestDefaultLimits(base.V1_1Base):
|
||||
self.simulate_delete(path, headers=self.headers)
|
||||
|
||||
def _prepare_messages(self, count):
|
||||
doc = jsonutils.dumps([{'body': 239, 'ttl': 300}] * count)
|
||||
self.simulate_post(self.messages_path, body=doc,
|
||||
doc = {'messages': [{'body': 239, 'ttl': 300}] * count}
|
||||
body = jsonutils.dumps(doc)
|
||||
self.simulate_post(self.messages_path, body=body,
|
||||
headers=self.headers)
|
||||
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_201)
|
||||
|
@ -70,7 +70,7 @@ class MessagesBaseTest(base.V1_1Base):
|
||||
super(MessagesBaseTest, self).tearDown()
|
||||
|
||||
def _test_post(self, sample_messages):
|
||||
sample_doc = jsonutils.dumps(sample_messages)
|
||||
sample_doc = jsonutils.dumps({'messages': sample_messages})
|
||||
|
||||
result = self.simulate_post(self.messages_path,
|
||||
body=sample_doc, headers=self.headers)
|
||||
@ -176,10 +176,12 @@ class MessagesBaseTest(base.V1_1Base):
|
||||
self._test_post(sample_messages)
|
||||
|
||||
def test_post_optional_ttl(self):
|
||||
sample_messages = [
|
||||
{'body': 239},
|
||||
{'body': {'key': 'value'}, 'ttl': 200},
|
||||
]
|
||||
sample_messages = {
|
||||
'messages': [
|
||||
{'body': 239},
|
||||
{'body': {'key': 'value'}, 'ttl': 200},
|
||||
],
|
||||
}
|
||||
|
||||
# Manually check default TTL is max from config
|
||||
|
||||
@ -259,23 +261,24 @@ class MessagesBaseTest(base.V1_1Base):
|
||||
|
||||
@ddt.data(-1, 59, 1209601)
|
||||
def test_unacceptable_ttl(self, ttl):
|
||||
doc = {'messages': [{'ttl': ttl, 'body': None}]}
|
||||
|
||||
self.simulate_post(self.queue_path + '/messages',
|
||||
body=jsonutils.dumps([{'ttl': ttl,
|
||||
'body': None}]),
|
||||
body=jsonutils.dumps(doc),
|
||||
headers=self.headers)
|
||||
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_400)
|
||||
|
||||
def test_exceeded_message_posting(self):
|
||||
# Total (raw request) size
|
||||
doc = jsonutils.dumps([{'body': "some body", 'ttl': 100}] * 20,
|
||||
indent=4)
|
||||
doc = {'messages': [{'body': "some body", 'ttl': 100}] * 20}
|
||||
body = jsonutils.dumps(doc, indent=4)
|
||||
|
||||
max_len = self.transport_cfg.max_message_size
|
||||
long_doc = doc + (' ' * (max_len - len(doc) + 1))
|
||||
long_body = body + (' ' * (max_len - len(body) + 1))
|
||||
|
||||
self.simulate_post(self.queue_path + '/messages',
|
||||
body=long_doc,
|
||||
body=long_body,
|
||||
headers=self.headers)
|
||||
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_400)
|
||||
@ -484,9 +487,10 @@ class MessagesBaseTest(base.V1_1Base):
|
||||
messages[0]['href'])
|
||||
|
||||
def _post_messages(self, target, repeat=1):
|
||||
doc = jsonutils.dumps([{'body': 239, 'ttl': 300}] * repeat)
|
||||
return self.simulate_post(target, body=doc,
|
||||
headers=self.headers)
|
||||
doc = {'messages': [{'body': 239, 'ttl': 300}] * repeat}
|
||||
|
||||
body = jsonutils.dumps(doc)
|
||||
return self.simulate_post(target, body=body, headers=self.headers)
|
||||
|
||||
def _get_msg_id(self, headers):
|
||||
return self._get_msg_ids(headers)[0]
|
||||
@ -532,14 +536,14 @@ class TestMessagesFaultyDriver(base.V1_1BaseFaulty):
|
||||
def test_simple(self):
|
||||
project_id = 'xyz'
|
||||
path = self.url_prefix + '/queues/fizbit/messages'
|
||||
doc = '[{"body": 239, "ttl": 100}]'
|
||||
body = '{"messages": [{"body": 239, "ttl": 100}]}'
|
||||
headers = {
|
||||
'Client-ID': str(uuid.uuid4()),
|
||||
'X-Project-ID': project_id
|
||||
}
|
||||
|
||||
self.simulate_post(path,
|
||||
body=doc,
|
||||
body=body,
|
||||
headers=headers)
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_503)
|
||||
|
||||
|
91
marconi/tests/queues/transport/wsgi/v1_1/test_validation.py
Normal file
91
marconi/tests/queues/transport/wsgi/v1_1/test_validation.py
Normal file
@ -0,0 +1,91 @@
|
||||
# Copyright (c) 2013 Rackspace, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import json
|
||||
import uuid
|
||||
|
||||
import falcon
|
||||
|
||||
from marconi.tests.queues.transport.wsgi import base
|
||||
|
||||
|
||||
class TestValidation(base.V1_1Base):
|
||||
|
||||
config_file = 'wsgi_sqlalchemy_validation.conf'
|
||||
|
||||
def setUp(self):
|
||||
super(TestValidation, self).setUp()
|
||||
|
||||
self.project_id = '7e55e1a7e'
|
||||
|
||||
self.queue_path = self.url_prefix + '/queues/noein'
|
||||
self.simulate_put(self.queue_path, self.project_id)
|
||||
|
||||
self.headers = {
|
||||
'Client-ID': str(uuid.uuid4()),
|
||||
}
|
||||
|
||||
def tearDown(self):
|
||||
self.simulate_delete(self.queue_path, self.project_id)
|
||||
super(TestValidation, self).tearDown()
|
||||
|
||||
def test_metadata_deserialization(self):
|
||||
# Normal case
|
||||
self.simulate_put(self.queue_path + '/metadata',
|
||||
self.project_id,
|
||||
body='{"timespace": "Shangri-la"}')
|
||||
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_204)
|
||||
|
||||
# Too long
|
||||
max_queue_metadata = 64
|
||||
|
||||
doc_tmpl = '{{"Dragon Torc":"{0}"}}'
|
||||
doc_tmpl_ws = '{{ "Dragon Torc" : "{0}" }}' # with whitespace
|
||||
envelope_length = len(doc_tmpl.format(''))
|
||||
|
||||
for tmpl in doc_tmpl, doc_tmpl_ws:
|
||||
gen = '0' * (max_queue_metadata - envelope_length + 1)
|
||||
doc = tmpl.format(gen)
|
||||
self.simulate_put(self.queue_path + '/metadata',
|
||||
self.project_id,
|
||||
body=doc)
|
||||
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_400)
|
||||
|
||||
def test_message_deserialization(self):
|
||||
# Normal case
|
||||
body = '{"messages": [{"body": "Dragon Knights", "ttl": 100}]}'
|
||||
self.simulate_post(self.queue_path + '/messages',
|
||||
self.project_id, body=body,
|
||||
headers=self.headers)
|
||||
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_201)
|
||||
|
||||
# Both messages' size are too long
|
||||
max_message_size = 256
|
||||
|
||||
obj = {'a': 0, 'b': ''}
|
||||
envelope_length = len(json.dumps(obj, separators=(',', ':')))
|
||||
obj['b'] = 'x' * (max_message_size - envelope_length + 1)
|
||||
|
||||
for long_body in ('a' * (max_message_size - 2 + 1), obj):
|
||||
doc = json.dumps([{'body': long_body, 'ttl': 100}])
|
||||
self.simulate_post(self.queue_path + '/messages',
|
||||
self.project_id,
|
||||
body=doc,
|
||||
headers=self.headers)
|
||||
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_400)
|
@ -48,7 +48,8 @@ class TestClaims(base.V1FunctionalTestBase):
|
||||
messagecount=self.limits.max_messages_per_page)
|
||||
|
||||
for i in range(10):
|
||||
self.client.post(url, data=doc)
|
||||
result = self.client.post(url, data=doc)
|
||||
self.assertEqual(result.status_code, 201)
|
||||
|
||||
@ddt.data({}, dict(limit=2))
|
||||
def test_claim_messages(self, params):
|
||||
@ -76,6 +77,8 @@ class TestClaims(base.V1FunctionalTestBase):
|
||||
doc = {"ttl": 300, "grace": 100}
|
||||
|
||||
result = self.client.post(params=params, data=doc)
|
||||
self.assertEqual(result.status_code, 201)
|
||||
|
||||
location = result.headers['Location']
|
||||
|
||||
url = self.cfg.marconi.url + location
|
||||
|
@ -46,7 +46,7 @@ class TestClaims(base.V1_1FunctionalTestBase):
|
||||
|
||||
# Post Messages
|
||||
url = self.queue_url + '/messages'
|
||||
doc = helpers.create_message_body(
|
||||
doc = helpers.create_message_body_v1_1(
|
||||
messagecount=self.limits.max_messages_per_page)
|
||||
|
||||
for i in range(10):
|
||||
|
@ -25,7 +25,6 @@ from marconi.tests.functional import helpers
|
||||
|
||||
@ddt.ddt
|
||||
class TestMessages(base.V1_1FunctionalTestBase):
|
||||
|
||||
"""Message Tests Specific to V1.1."""
|
||||
|
||||
server_class = base.MarconiServer
|
||||
@ -53,7 +52,8 @@ class TestMessages(base.V1_1FunctionalTestBase):
|
||||
def _post_large_bulk_insert(self, offset):
|
||||
"""Insert just under than max allowed messages."""
|
||||
|
||||
doc = '[{{"body": "{0}", "ttl": 300}}, {{"body": "{1}", "ttl": 120}}]'
|
||||
doc = ('{{"messages":[{{"body": "{0}", "ttl": 300}},'
|
||||
'{{"body": "{1}", "ttl": 120}}]}}')
|
||||
overhead = len(doc.format('', ''))
|
||||
|
||||
half_size = (self.limits.max_message_size - overhead) // 2
|
||||
@ -68,7 +68,7 @@ class TestMessages(base.V1_1FunctionalTestBase):
|
||||
This test also verifies that claimed messages are
|
||||
retuned (or not) depending on the include_claimed flag.
|
||||
"""
|
||||
doc = helpers.create_message_body(messagecount=1)
|
||||
doc = helpers.create_message_body_v1_1(messagecount=1)
|
||||
|
||||
result = self.client.post(data=doc)
|
||||
self.assertEqual(result.status_code, 201)
|
||||
@ -85,7 +85,7 @@ class TestMessages(base.V1_1FunctionalTestBase):
|
||||
|
||||
# Compare message metadata
|
||||
result_body = result.json()['body']
|
||||
posted_metadata = doc[0]['body']
|
||||
posted_metadata = doc['messages'][0]['body']
|
||||
self.assertEqual(result_body, posted_metadata)
|
||||
|
||||
# Post a claim & verify the include_claimed flag.
|
||||
@ -111,7 +111,7 @@ class TestMessages(base.V1_1FunctionalTestBase):
|
||||
def test_message_bulk_insert(self):
|
||||
"""Bulk Insert Messages into the Queue."""
|
||||
message_count = self.limits.max_messages_per_page
|
||||
doc = helpers.create_message_body(messagecount=message_count)
|
||||
doc = helpers.create_message_body_v1_1(messagecount=message_count)
|
||||
|
||||
result = self.client.post(data=doc)
|
||||
self.assertEqual(result.status_code, 201)
|
||||
@ -143,7 +143,8 @@ class TestMessages(base.V1_1FunctionalTestBase):
|
||||
|
||||
def test_message_default_ttl(self):
|
||||
"""Insert Single Message into the Queue using the default TTL."""
|
||||
doc = helpers.create_message_body(messagecount=1, default_ttl=True)
|
||||
doc = helpers.create_message_body_v1_1(messagecount=1,
|
||||
default_ttl=True)
|
||||
|
||||
result = self.client.post(data=doc)
|
||||
self.assertEqual(result.status_code, 201)
|
||||
@ -172,7 +173,7 @@ class TestMessages(base.V1_1FunctionalTestBase):
|
||||
self.limits.max_messages_per_page)
|
||||
|
||||
# Test Setup
|
||||
doc = helpers.create_message_body(
|
||||
doc = helpers.create_message_body_v1_1(
|
||||
messagecount=self.limits.max_messages_per_page)
|
||||
|
||||
result = self.client.post(data=doc)
|
||||
@ -201,7 +202,7 @@ class TestMessages(base.V1_1FunctionalTestBase):
|
||||
def test_message_delete(self):
|
||||
"""Delete Message."""
|
||||
# Test Setup
|
||||
doc = helpers.create_message_body(messagecount=1)
|
||||
doc = helpers.create_message_body_v1_1(messagecount=1)
|
||||
result = self.client.post(data=doc)
|
||||
self.assertEqual(result.status_code, 201)
|
||||
|
||||
@ -219,7 +220,7 @@ class TestMessages(base.V1_1FunctionalTestBase):
|
||||
|
||||
def test_message_bulk_delete(self):
|
||||
"""Bulk Delete Messages."""
|
||||
doc = helpers.create_message_body(messagecount=10)
|
||||
doc = helpers.create_message_body_v1_1(messagecount=10)
|
||||
result = self.client.post(data=doc)
|
||||
|
||||
self.assertEqual(result.status_code, 201)
|
||||
@ -246,7 +247,7 @@ class TestMessages(base.V1_1FunctionalTestBase):
|
||||
|
||||
def test_message_partial_delete(self):
|
||||
"""Delete Messages will be partially successful."""
|
||||
doc = helpers.create_message_body(messagecount=3)
|
||||
doc = helpers.create_message_body_v1_1(messagecount=3)
|
||||
result = self.client.post(data=doc)
|
||||
|
||||
self.assertEqual(result.status_code, 201)
|
||||
@ -263,7 +264,7 @@ class TestMessages(base.V1_1FunctionalTestBase):
|
||||
@ddt.data(5, 1)
|
||||
def test_messages_pop(self, limit=5):
|
||||
"""Pop messages from a queue."""
|
||||
doc = helpers.create_message_body(messagecount=limit)
|
||||
doc = helpers.create_message_body_v1_1(messagecount=limit)
|
||||
result = self.client.post(data=doc)
|
||||
|
||||
self.assertEqual(result.status_code, 201)
|
||||
@ -287,7 +288,7 @@ class TestMessages(base.V1_1FunctionalTestBase):
|
||||
@ddt.data(10000000, 0, -1)
|
||||
def test_messages_pop_invalid(self, limit):
|
||||
"""Pop messages from a queue."""
|
||||
doc = helpers.create_message_body(
|
||||
doc = helpers.create_message_body_v1_1(
|
||||
messagecount=self.limits.max_messages_per_page)
|
||||
result = self.client.post(data=doc)
|
||||
|
||||
@ -310,7 +311,7 @@ class TestMessages(base.V1_1FunctionalTestBase):
|
||||
|
||||
def test_messages_delete_pop_and_id(self):
|
||||
"""Delete messages with pop & id params in the request."""
|
||||
doc = helpers.create_message_body(
|
||||
doc = helpers.create_message_body_v1_1(
|
||||
messagecount=1)
|
||||
result = self.client.post(data=doc)
|
||||
|
||||
@ -347,7 +348,7 @@ class TestMessages(base.V1_1FunctionalTestBase):
|
||||
|
||||
def test_messages_pop_one(self):
|
||||
"""Pop single messages from a queue."""
|
||||
doc = helpers.create_message_body(
|
||||
doc = helpers.create_message_body_v1_1(
|
||||
messagecount=self.limits.max_messages_per_page)
|
||||
result = self.client.post(data=doc)
|
||||
|
||||
@ -373,7 +374,7 @@ class TestMessages(base.V1_1FunctionalTestBase):
|
||||
|
||||
def test_message_partial_get(self):
|
||||
"""Get Messages will be partially successful."""
|
||||
doc = helpers.create_message_body(messagecount=3)
|
||||
doc = helpers.create_message_body_v1_1(messagecount=3)
|
||||
result = self.client.post(data=doc)
|
||||
|
||||
self.assertEqual(result.status_code, 201)
|
||||
@ -519,7 +520,7 @@ class TestMessages(base.V1_1FunctionalTestBase):
|
||||
|
||||
self.skipTest("Not supported")
|
||||
del self.client.headers["Client-ID"]
|
||||
doc = helpers.create_message_body(messagecount=1)
|
||||
doc = helpers.create_message_body_v1_1(messagecount=1)
|
||||
|
||||
result = self.client.post(data=doc)
|
||||
self.assertEqual(result.status_code, 400)
|
||||
|
@ -291,7 +291,7 @@ class TestQueueMisc(base.V1_1FunctionalTestBase):
|
||||
self.assertEqual(result.status_code, 201)
|
||||
|
||||
# Post Messages to the test queue
|
||||
doc = helpers.create_message_body(
|
||||
doc = helpers.create_message_body_v1_1(
|
||||
messagecount=self.limits.max_messages_per_claim_or_pop)
|
||||
|
||||
message_url = self.queue_url + '/messages'
|
||||
|
@ -117,12 +117,13 @@ class TestUtils(testtools.TestCase):
|
||||
document = six.text_type(json.dumps(obj, ensure_ascii=False))
|
||||
doc_stream = io.StringIO(document)
|
||||
|
||||
filtered = utils.filter_stream(doc_stream, len(document), spec=None)
|
||||
self.assertEqual(filtered[0], obj)
|
||||
deserialized = utils.deserialize(doc_stream, len(document))
|
||||
filtered = utils.sanitize(deserialized, spec=None)
|
||||
self.assertEqual(filtered, obj)
|
||||
|
||||
# NOTE(kgriffs): Ensure default value for *spec* is None
|
||||
doc_stream.seek(0)
|
||||
filtered2 = utils.filter_stream(doc_stream, len(document))
|
||||
filtered2 = utils.sanitize(deserialized)
|
||||
self.assertEqual(filtered2, filtered)
|
||||
|
||||
def test_no_spec_array(self):
|
||||
@ -130,8 +131,9 @@ class TestUtils(testtools.TestCase):
|
||||
document = six.text_type(json.dumps(things, ensure_ascii=False))
|
||||
doc_stream = io.StringIO(document)
|
||||
|
||||
filtered = utils.filter_stream(doc_stream, len(document),
|
||||
doctype=utils.JSONArray, spec=None)
|
||||
deserialized = utils.deserialize(doc_stream, len(document))
|
||||
filtered = utils.sanitize(deserialized, doctype=utils.JSONArray,
|
||||
spec=None)
|
||||
self.assertEqual(filtered, things)
|
||||
|
||||
def test_filter_star(self):
|
||||
@ -148,13 +150,15 @@ class TestUtils(testtools.TestCase):
|
||||
document = six.text_type(json.dumps(obj, ensure_ascii=False))
|
||||
stream = io.StringIO(document)
|
||||
spec = [('body', dict, None), ('id', six.string_types, None)]
|
||||
filtered_object, = utils.filter_stream(stream, len(document), spec)
|
||||
|
||||
# Positive test
|
||||
deserialized_object = utils.deserialize(stream, len(document))
|
||||
filtered_object = utils.sanitize(deserialized_object, spec)
|
||||
self.assertEqual(filtered_object, obj)
|
||||
|
||||
stream.seek(0)
|
||||
# Negative test
|
||||
self.assertRaises(falcon.HTTPBadRequest,
|
||||
utils.filter_stream, stream, len(document), spec,
|
||||
utils.sanitize, deserialized_object, spec,
|
||||
doctype=utils.JSONArray)
|
||||
|
||||
def test_filter_stream_expect_array(self):
|
||||
@ -163,26 +167,24 @@ class TestUtils(testtools.TestCase):
|
||||
document = six.text_type(json.dumps(array, ensure_ascii=False))
|
||||
stream = io.StringIO(document)
|
||||
spec = [('body', dict, None)]
|
||||
filtered_objects = list(utils.filter_stream(
|
||||
stream, len(document), spec, doctype=utils.JSONArray))
|
||||
|
||||
self.assertEqual(filtered_objects, array)
|
||||
# Positive test
|
||||
deserialized_object = utils.deserialize(stream, len(document))
|
||||
filtered_object = utils.sanitize(deserialized_object, spec,
|
||||
doctype=utils.JSONArray)
|
||||
self.assertEqual(filtered_object, array)
|
||||
|
||||
stream.seek(0)
|
||||
# Negative test
|
||||
self.assertRaises(falcon.HTTPBadRequest,
|
||||
utils.filter_stream, stream, len(document), spec,
|
||||
utils.sanitize, deserialized_object, spec,
|
||||
doctype=utils.JSONObject)
|
||||
|
||||
def test_filter_stream_wrong_use(self):
|
||||
document = u'3'
|
||||
stream = io.StringIO(document)
|
||||
spec = None
|
||||
def test_bad_doctype(self):
|
||||
self.assertRaises(TypeError,
|
||||
utils.filter_stream, stream, len(document), spec,
|
||||
doctype=int)
|
||||
utils.sanitize, {}, None, doctype=int)
|
||||
|
||||
def test_filter_stream_no_reading(self):
|
||||
def test_deserialize_bad_stream(self):
|
||||
stream = None
|
||||
length = None
|
||||
self.assertRaises(falcon.HTTPBadRequest,
|
||||
utils.filter_stream, stream, length, None)
|
||||
utils.deserialize, stream, length)
|
||||
|
@ -89,6 +89,10 @@ class TestPoolsSqlalchemy(v1.TestPoolsSqlalchemy):
|
||||
url_prefix = URL_PREFIX
|
||||
|
||||
|
||||
class TestValidation(v1.TestValidation):
|
||||
url_prefix = URL_PREFIX
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# v1.0 only
|
||||
# --------------------------------------------------------------------------
|
||||
|
@ -93,6 +93,10 @@ class TestPoolsSqlalchemy(v1_1.TestPoolsSqlalchemy):
|
||||
url_prefix = URL_PREFIX
|
||||
|
||||
|
||||
class TestValidation(v1_1.TestValidation):
|
||||
url_prefix = URL_PREFIX
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# v1.1 only
|
||||
# --------------------------------------------------------------------------
|
||||
@ -153,8 +157,10 @@ class TestMessages(base.V1_1Base):
|
||||
super(TestMessages, self).tearDown()
|
||||
|
||||
def _post_messages(self, target, repeat=1):
|
||||
doc = jsonutils.dumps([{'body': 239, 'ttl': 300}] * repeat)
|
||||
return self.simulate_post(target, self.project_id, body=doc,
|
||||
doc = {'messages': [{'body': 239, 'ttl': 300}] * repeat}
|
||||
body = jsonutils.dumps(doc)
|
||||
|
||||
return self.simulate_post(target, self.project_id, body=body,
|
||||
headers=self.headers)
|
||||
|
||||
def _get_msg_id(self, headers):
|
||||
|
Loading…
Reference in New Issue
Block a user