Merge "Add _max_messages_post_size
and _default_message_ttl
for queue"
This commit is contained in:
commit
0525130a73
@ -100,6 +100,7 @@ class Endpoints(object):
|
|||||||
try:
|
try:
|
||||||
self._validate.queue_identification(queue_name, project_id)
|
self._validate.queue_identification(queue_name, project_id)
|
||||||
self._validate.queue_metadata_length(len(str(metadata)))
|
self._validate.queue_metadata_length(len(str(metadata)))
|
||||||
|
self._validate.queue_metadata_putting(metadata)
|
||||||
created = self._queue_controller.create(queue_name,
|
created = self._queue_controller.create(queue_name,
|
||||||
metadata=metadata,
|
metadata=metadata,
|
||||||
project=project_id)
|
project=project_id)
|
||||||
@ -371,18 +372,41 @@ class Endpoints(object):
|
|||||||
return api_utils.error_response(req, ex, headers, error)
|
return api_utils.error_response(req, ex, headers, error)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
# NOTE(flwang): Replace 'exists' with 'get_metadata' won't impact
|
||||||
|
# the performance since both of them will call
|
||||||
|
# collection.find_one()
|
||||||
|
queue_meta = None
|
||||||
|
try:
|
||||||
|
queue_meta = self._queue_controller.get_metadata(queue_name,
|
||||||
|
project_id)
|
||||||
|
except storage_errors.DoesNotExist as ex:
|
||||||
|
self._validate.queue_identification(queue_name, project_id)
|
||||||
|
self._queue_controller.create(queue_name, project=project_id)
|
||||||
|
# NOTE(flwang): Queue is created in lazy mode, so no metadata
|
||||||
|
# set.
|
||||||
|
queue_meta = {}
|
||||||
|
|
||||||
|
queue_max_msg_size = queue_meta.get('_max_messages_post_size',
|
||||||
|
None)
|
||||||
|
queue_default_ttl = queue_meta.get('_default_message_ttl', None)
|
||||||
|
|
||||||
|
# TODO(flwang): To avoid any unexpected regression issue, we just
|
||||||
|
# leave the _message_post_spec attribute of class as it's. It
|
||||||
|
# should be removed in Newton release.
|
||||||
|
if queue_default_ttl:
|
||||||
|
_message_post_spec = (('ttl', int, queue_default_ttl),
|
||||||
|
('body', '*', None),)
|
||||||
|
else:
|
||||||
|
_message_post_spec = (('ttl', int, self._defaults.message_ttl),
|
||||||
|
('body', '*', None),)
|
||||||
# Place JSON size restriction before parsing
|
# Place JSON size restriction before parsing
|
||||||
self._validate.message_length(len(str(messages)))
|
self._validate.message_length(len(str(messages)),
|
||||||
|
max_msg_post_size=queue_max_msg_size)
|
||||||
except validation.ValidationFailed as ex:
|
except validation.ValidationFailed as ex:
|
||||||
LOG.debug(ex)
|
LOG.debug(ex)
|
||||||
headers = {'status': 400}
|
headers = {'status': 400}
|
||||||
return api_utils.error_response(req, ex, headers)
|
return api_utils.error_response(req, ex, headers)
|
||||||
|
|
||||||
_message_post_spec = (
|
|
||||||
('ttl', int, self._defaults.message_ttl),
|
|
||||||
('body', '*', None),
|
|
||||||
)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
messages = api_utils.sanitize(messages,
|
messages = api_utils.sanitize(messages,
|
||||||
_message_post_spec,
|
_message_post_spec,
|
||||||
@ -397,10 +421,6 @@ class Endpoints(object):
|
|||||||
|
|
||||||
self._validate.message_posting(messages)
|
self._validate.message_posting(messages)
|
||||||
|
|
||||||
if not self._queue_controller.exists(queue_name, project_id):
|
|
||||||
self._validate.queue_identification(queue_name, project_id)
|
|
||||||
self._queue_controller.create(queue_name, project=project_id)
|
|
||||||
|
|
||||||
message_ids = self._message_controller.post(
|
message_ids = self._message_controller.post(
|
||||||
queue_name,
|
queue_name,
|
||||||
messages=messages,
|
messages=messages,
|
||||||
|
@ -265,6 +265,45 @@ class TestMessagesMongoDB(base.V2Base):
|
|||||||
self._post_messages(self.url_prefix + '/queues/nonexistent/messages')
|
self._post_messages(self.url_prefix + '/queues/nonexistent/messages')
|
||||||
self.assertEqual(falcon.HTTP_201, self.srmock.status)
|
self.assertEqual(falcon.HTTP_201, self.srmock.status)
|
||||||
|
|
||||||
|
def test_post_using_queue_default_message_ttl(self):
|
||||||
|
queue_path = self.url_prefix + '/queues/test_queue1'
|
||||||
|
messages_path = queue_path + '/messages'
|
||||||
|
doc = '{"_default_message_ttl": 999}'
|
||||||
|
self.simulate_put(queue_path, body=doc, headers=self.headers)
|
||||||
|
self.addCleanup(self.simulate_delete, queue_path, headers=self.headers)
|
||||||
|
sample_messages = {
|
||||||
|
'messages': [
|
||||||
|
{'body': {'key': 'value'}},
|
||||||
|
],
|
||||||
|
}
|
||||||
|
|
||||||
|
sample_doc = jsonutils.dumps(sample_messages)
|
||||||
|
result = self.simulate_post(messages_path,
|
||||||
|
body=sample_doc, headers=self.headers)
|
||||||
|
result_doc = jsonutils.loads(result[0])
|
||||||
|
href = result_doc['resources'][0]
|
||||||
|
result = self.simulate_get(href, headers=self.headers)
|
||||||
|
message = jsonutils.loads(result[0])
|
||||||
|
|
||||||
|
self.assertEqual(999, message['ttl'])
|
||||||
|
|
||||||
|
def test_post_using_queue_max_messages_post_size(self):
|
||||||
|
queue_path = self.url_prefix + '/queues/test_queue2'
|
||||||
|
messages_path = queue_path + '/messages'
|
||||||
|
doc = '{"_max_messages_post_size": 1023}'
|
||||||
|
self.simulate_put(queue_path, body=doc, headers=self.headers)
|
||||||
|
self.addCleanup(self.simulate_delete, queue_path, headers=self.headers)
|
||||||
|
sample_messages = {
|
||||||
|
'messages': [
|
||||||
|
{'body': {'key': 'a' * 1204}},
|
||||||
|
],
|
||||||
|
}
|
||||||
|
|
||||||
|
sample_doc = jsonutils.dumps(sample_messages)
|
||||||
|
self.simulate_post(messages_path,
|
||||||
|
body=sample_doc, headers=self.headers)
|
||||||
|
self.assertEqual(falcon.HTTP_400, self.srmock.status)
|
||||||
|
|
||||||
def test_get_from_missing_queue(self):
|
def test_get_from_missing_queue(self):
|
||||||
body = self.simulate_get(self.url_prefix +
|
body = self.simulate_get(self.url_prefix +
|
||||||
'/queues/nonexistent/messages',
|
'/queues/nonexistent/messages',
|
||||||
@ -633,7 +672,7 @@ class TestMessagesFaultyDriver(base.V2BaseFaulty):
|
|||||||
self.simulate_post(path,
|
self.simulate_post(path,
|
||||||
body=body,
|
body=body,
|
||||||
headers=headers)
|
headers=headers)
|
||||||
self.assertEqual(falcon.HTTP_503, self.srmock.status)
|
self.assertEqual(falcon.HTTP_500, self.srmock.status)
|
||||||
|
|
||||||
self.simulate_get(path,
|
self.simulate_get(path,
|
||||||
headers=headers)
|
headers=headers)
|
||||||
|
@ -113,3 +113,40 @@ class TestValidation(base.V2Base):
|
|||||||
self.project_id, body=body,
|
self.project_id, body=body,
|
||||||
headers=self.headers)
|
headers=self.headers)
|
||||||
self.assertEqual(falcon.HTTP_400, self.srmock.status)
|
self.assertEqual(falcon.HTTP_400, self.srmock.status)
|
||||||
|
|
||||||
|
def test_queue_metadata_putting(self):
|
||||||
|
# Test _default_message_ttl
|
||||||
|
# TTL normal case
|
||||||
|
queue_1 = self.url_prefix + '/queues/queue1'
|
||||||
|
self.simulate_put(queue_1,
|
||||||
|
self.project_id,
|
||||||
|
body='{"_default_message_ttl": 60}')
|
||||||
|
self.addCleanup(self.simulate_delete, queue_1, headers=self.headers)
|
||||||
|
self.assertEqual(falcon.HTTP_201, self.srmock.status)
|
||||||
|
|
||||||
|
# TTL under min
|
||||||
|
self.simulate_put(queue_1,
|
||||||
|
self.project_id,
|
||||||
|
body='{"_default_message_ttl": 59}')
|
||||||
|
self.assertEqual(falcon.HTTP_400, self.srmock.status)
|
||||||
|
|
||||||
|
# TTL over max
|
||||||
|
self.simulate_put(queue_1,
|
||||||
|
self.project_id,
|
||||||
|
body='{"_default_message_ttl": 1209601}')
|
||||||
|
self.assertEqual(falcon.HTTP_400, self.srmock.status)
|
||||||
|
|
||||||
|
# Test _max_messages_post_size
|
||||||
|
# Size normal case
|
||||||
|
queue_2 = self.url_prefix + '/queues/queue2'
|
||||||
|
self.simulate_put(queue_2,
|
||||||
|
self.project_id,
|
||||||
|
body='{"_max_messages_post_size": 255}')
|
||||||
|
self.addCleanup(self.simulate_delete, queue_2, headers=self.headers)
|
||||||
|
self.assertEqual(falcon.HTTP_201, self.srmock.status)
|
||||||
|
|
||||||
|
# Size over max
|
||||||
|
self.simulate_put(queue_2,
|
||||||
|
self.project_id,
|
||||||
|
body='{"_max_messages_post_size": 257}')
|
||||||
|
self.assertEqual(falcon.HTTP_400, self.srmock.status)
|
||||||
|
@ -156,6 +156,42 @@ class Validator(object):
|
|||||||
msg = _(u'Queue metadata is too large. Max size: {0}')
|
msg = _(u'Queue metadata is too large. Max size: {0}')
|
||||||
raise ValidationFailed(msg, self._limits_conf.max_queue_metadata)
|
raise ValidationFailed(msg, self._limits_conf.max_queue_metadata)
|
||||||
|
|
||||||
|
def queue_metadata_putting(self, queue_metadata):
|
||||||
|
"""Checking if the reserved attributes of the queue are valid.
|
||||||
|
|
||||||
|
:param queue_metadata: Queue's metadata.
|
||||||
|
:raises: ValidationFailed if any reserved attribute is invalid.
|
||||||
|
"""
|
||||||
|
if not queue_metadata:
|
||||||
|
return
|
||||||
|
|
||||||
|
queue_default_ttl = queue_metadata.get('_default_message_ttl', None)
|
||||||
|
if queue_default_ttl and not isinstance(queue_default_ttl, int):
|
||||||
|
msg = _(u'_default_message_ttl must be integer.')
|
||||||
|
raise ValidationFailed(msg)
|
||||||
|
|
||||||
|
if queue_default_ttl:
|
||||||
|
if not (MIN_MESSAGE_TTL <= queue_default_ttl <=
|
||||||
|
self._limits_conf.max_message_ttl):
|
||||||
|
msg = _(u'_default_message_ttl can not exceed {0} '
|
||||||
|
'seconds, and must be at least {1} seconds long.')
|
||||||
|
raise ValidationFailed(
|
||||||
|
msg, self._limits_conf.max_message_ttl, MIN_MESSAGE_TTL)
|
||||||
|
|
||||||
|
queue_max_msg_size = queue_metadata.get('_max_messages_post_size',
|
||||||
|
None)
|
||||||
|
if queue_max_msg_size and not isinstance(queue_max_msg_size, int):
|
||||||
|
msg = _(u'_max_messages_post_size must be integer.')
|
||||||
|
raise ValidationFailed(msg)
|
||||||
|
|
||||||
|
if queue_max_msg_size:
|
||||||
|
if not (0 < queue_max_msg_size <=
|
||||||
|
self._limits_conf.max_messages_post_size):
|
||||||
|
raise ValidationFailed(
|
||||||
|
_(u'_max_messages_post_size can not exceed {0}, '
|
||||||
|
' and must be at least greater than 0.'),
|
||||||
|
self._limits_conf.max_messages_post_size)
|
||||||
|
|
||||||
def message_posting(self, messages):
|
def message_posting(self, messages):
|
||||||
"""Restrictions on a list of messages.
|
"""Restrictions on a list of messages.
|
||||||
|
|
||||||
@ -170,7 +206,7 @@ class Validator(object):
|
|||||||
for msg in messages:
|
for msg in messages:
|
||||||
self.message_content(msg)
|
self.message_content(msg)
|
||||||
|
|
||||||
def message_length(self, content_length):
|
def message_length(self, content_length, max_msg_post_size=None):
|
||||||
"""Restrictions on message post length.
|
"""Restrictions on message post length.
|
||||||
|
|
||||||
:param content_length: Queue request's length.
|
:param content_length: Queue request's length.
|
||||||
@ -178,6 +214,26 @@ class Validator(object):
|
|||||||
"""
|
"""
|
||||||
if content_length is None:
|
if content_length is None:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
if max_msg_post_size:
|
||||||
|
try:
|
||||||
|
min_max_size = min(max_msg_post_size,
|
||||||
|
self._limits_conf.max_messages_post_size)
|
||||||
|
if content_length > min_max_size:
|
||||||
|
raise ValidationFailed(
|
||||||
|
_(u'Message collection size is too large. The max '
|
||||||
|
'size for current queue is {0}. It is calculated '
|
||||||
|
'by max size = min(max_messages_post_size_config: '
|
||||||
|
'{1}, max_messages_post_size_queue: {2}).'),
|
||||||
|
min_max_size,
|
||||||
|
self._limits_conf.max_messages_post_size,
|
||||||
|
max_msg_post_size)
|
||||||
|
except TypeError:
|
||||||
|
# NOTE(flwang): If there is a type error when using min(),
|
||||||
|
# it only happens in py3.x, it will be skipped and compare
|
||||||
|
# the message length with the size defined in config file.
|
||||||
|
pass
|
||||||
|
|
||||||
if content_length > self._limits_conf.max_messages_post_size:
|
if content_length > self._limits_conf.max_messages_post_size:
|
||||||
raise ValidationFailed(
|
raise ValidationFailed(
|
||||||
_(u'Message collection size is too large. Max size {0}'),
|
_(u'Message collection size is too large. Max size {0}'),
|
||||||
|
@ -38,6 +38,7 @@ class CollectionResource(object):
|
|||||||
'_wsgi_conf',
|
'_wsgi_conf',
|
||||||
'_validate',
|
'_validate',
|
||||||
'_message_post_spec',
|
'_message_post_spec',
|
||||||
|
'_default_message_ttl'
|
||||||
)
|
)
|
||||||
|
|
||||||
def __init__(self, wsgi_conf, validate,
|
def __init__(self, wsgi_conf, validate,
|
||||||
@ -48,9 +49,10 @@ class CollectionResource(object):
|
|||||||
self._validate = validate
|
self._validate = validate
|
||||||
self._message_controller = message_controller
|
self._message_controller = message_controller
|
||||||
self._queue_controller = queue_controller
|
self._queue_controller = queue_controller
|
||||||
|
self._default_message_ttl = default_message_ttl
|
||||||
|
|
||||||
self._message_post_spec = (
|
self._message_post_spec = (
|
||||||
('ttl', int, default_message_ttl),
|
('ttl', int, self._default_message_ttl),
|
||||||
('body', '*', None),
|
('body', '*', None),
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -155,10 +157,37 @@ class CollectionResource(object):
|
|||||||
@acl.enforce("messages:create")
|
@acl.enforce("messages:create")
|
||||||
def on_post(self, req, resp, project_id, queue_name):
|
def on_post(self, req, resp, project_id, queue_name):
|
||||||
client_uuid = wsgi_helpers.get_client_uuid(req)
|
client_uuid = wsgi_helpers.get_client_uuid(req)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
# NOTE(flwang): Replace 'exists' with 'get_metadata' won't impact
|
||||||
|
# the performance since both of them will call
|
||||||
|
# collection.find_one()
|
||||||
|
queue_meta = None
|
||||||
|
try:
|
||||||
|
queue_meta = self._queue_controller.get_metadata(queue_name,
|
||||||
|
project_id)
|
||||||
|
except storage_errors.DoesNotExist as ex:
|
||||||
|
self._validate.queue_identification(queue_name, project_id)
|
||||||
|
self._queue_controller.create(queue_name, project=project_id)
|
||||||
|
# NOTE(flwang): Queue is created in lazy mode, so no metadata
|
||||||
|
# set.
|
||||||
|
queue_meta = {}
|
||||||
|
|
||||||
|
queue_max_msg_size = queue_meta.get('_max_messages_post_size',
|
||||||
|
None)
|
||||||
|
queue_default_ttl = queue_meta.get('_default_message_ttl', None)
|
||||||
|
|
||||||
|
# TODO(flwang): To avoid any unexpected regression issue, we just
|
||||||
|
# leave the _message_post_spec attribute of class as it's. It
|
||||||
|
# should be removed in Newton release.
|
||||||
|
if queue_default_ttl:
|
||||||
|
message_post_spec = (('ttl', int, queue_default_ttl),
|
||||||
|
('body', '*', None),)
|
||||||
|
else:
|
||||||
|
message_post_spec = (('ttl', int, self._default_message_ttl),
|
||||||
|
('body', '*', None),)
|
||||||
# Place JSON size restriction before parsing
|
# Place JSON size restriction before parsing
|
||||||
self._validate.message_length(req.content_length)
|
self._validate.message_length(req.content_length,
|
||||||
|
max_msg_post_size=queue_max_msg_size)
|
||||||
except validation.ValidationFailed as ex:
|
except validation.ValidationFailed as ex:
|
||||||
LOG.debug(ex)
|
LOG.debug(ex)
|
||||||
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
|
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
|
||||||
@ -171,15 +200,12 @@ class CollectionResource(object):
|
|||||||
raise wsgi_errors.HTTPBadRequestAPI(description)
|
raise wsgi_errors.HTTPBadRequestAPI(description)
|
||||||
|
|
||||||
messages = wsgi_utils.sanitize(document['messages'],
|
messages = wsgi_utils.sanitize(document['messages'],
|
||||||
self._message_post_spec,
|
message_post_spec,
|
||||||
doctype=wsgi_utils.JSONArray)
|
doctype=wsgi_utils.JSONArray)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self._validate.message_posting(messages)
|
self._validate.message_posting(messages)
|
||||||
|
|
||||||
if not self._queue_controller.exists(queue_name, project_id):
|
|
||||||
self._queue_controller.create(queue_name, project=project_id)
|
|
||||||
|
|
||||||
message_ids = self._message_controller.post(
|
message_ids = self._message_controller.post(
|
||||||
queue_name,
|
queue_name,
|
||||||
messages=messages,
|
messages=messages,
|
||||||
|
@ -74,6 +74,7 @@ class ItemResource(object):
|
|||||||
metadata = wsgi_utils.sanitize(document, spec=None)
|
metadata = wsgi_utils.sanitize(document, spec=None)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
self._validate.queue_metadata_putting(metadata)
|
||||||
created = self._queue_controller.create(queue_name,
|
created = self._queue_controller.create(queue_name,
|
||||||
metadata=metadata,
|
metadata=metadata,
|
||||||
project=project_id)
|
project=project_id)
|
||||||
@ -81,7 +82,9 @@ class ItemResource(object):
|
|||||||
except storage_errors.FlavorDoesNotExist as ex:
|
except storage_errors.FlavorDoesNotExist as ex:
|
||||||
LOG.exception(ex)
|
LOG.exception(ex)
|
||||||
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
|
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
|
||||||
|
except validation.ValidationFailed as ex:
|
||||||
|
LOG.debug(ex)
|
||||||
|
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
LOG.exception(ex)
|
LOG.exception(ex)
|
||||||
description = _(u'Queue could not be created.')
|
description = _(u'Queue could not be created.')
|
||||||
|
Loading…
Reference in New Issue
Block a user