Implement Lazy Create Queue in v1.1 API
This patch allows: 1.Posting a message for queue that does not exist, and creates the queue on the fly 2.Listing messages, Stats from a non-existing queue which gives an empty list with a HTTP code of 200 Claim behavior is left unchanged. Implements : blueprint api-v1.1-lazy-create-queues Change-Id: Ie8e1b068d78561deaf01ca84e47e506d88422c1e
This commit is contained in:
parent
f227a24651
commit
b825b314a1
@ -50,7 +50,8 @@ def public_endpoints(driver):
|
|||||||
('/queues/{queue_name}/messages',
|
('/queues/{queue_name}/messages',
|
||||||
messages.CollectionResource(driver._wsgi_conf,
|
messages.CollectionResource(driver._wsgi_conf,
|
||||||
driver._validate,
|
driver._validate,
|
||||||
message_controller)),
|
message_controller,
|
||||||
|
queue_controller)),
|
||||||
('/queues/{queue_name}/messages/{message_id}',
|
('/queues/{queue_name}/messages/{message_id}',
|
||||||
messages.ItemResource(message_controller)),
|
messages.ItemResource(message_controller)),
|
||||||
|
|
||||||
|
@ -31,12 +31,15 @@ MESSAGE_POST_SPEC = (('ttl', int), ('body', '*'))
|
|||||||
|
|
||||||
class CollectionResource(object):
|
class CollectionResource(object):
|
||||||
|
|
||||||
__slots__ = ('message_controller', '_wsgi_conf', '_validate')
|
__slots__ = ('message_controller', '_wsgi_conf', '_validate',
|
||||||
|
'queue_controller')
|
||||||
|
|
||||||
def __init__(self, wsgi_conf, validate, message_controller):
|
def __init__(self, wsgi_conf, validate, message_controller,
|
||||||
|
queue_controller):
|
||||||
self._wsgi_conf = wsgi_conf
|
self._wsgi_conf = wsgi_conf
|
||||||
self._validate = validate
|
self._validate = validate
|
||||||
self.message_controller = message_controller
|
self.message_controller = message_controller
|
||||||
|
self.queue_controller = queue_controller
|
||||||
|
|
||||||
#-----------------------------------------------------------------------
|
#-----------------------------------------------------------------------
|
||||||
# Helpers
|
# Helpers
|
||||||
@ -109,13 +112,14 @@ class CollectionResource(object):
|
|||||||
raise wsgi_errors.HTTPServiceUnavailable(description)
|
raise wsgi_errors.HTTPServiceUnavailable(description)
|
||||||
|
|
||||||
if not messages:
|
if not messages:
|
||||||
return None
|
messages = []
|
||||||
|
|
||||||
# Found some messages, so prepare the response
|
else:
|
||||||
kwargs['marker'] = next(results)
|
# Found some messages, so prepare the response
|
||||||
for each_message in messages:
|
kwargs['marker'] = next(results)
|
||||||
each_message['href'] = req.path + '/' + each_message['id']
|
for each_message in messages:
|
||||||
del each_message['id']
|
each_message['href'] = req.path + '/' + each_message['id']
|
||||||
|
del each_message['id']
|
||||||
|
|
||||||
return {
|
return {
|
||||||
'messages': messages,
|
'messages': messages,
|
||||||
@ -158,6 +162,9 @@ class CollectionResource(object):
|
|||||||
try:
|
try:
|
||||||
self._validate.message_posting(messages)
|
self._validate.message_posting(messages)
|
||||||
|
|
||||||
|
if not self.queue_controller.exists(queue_name, project_id):
|
||||||
|
self.queue_controller.create(queue_name, project_id)
|
||||||
|
|
||||||
message_ids = self.message_controller.post(
|
message_ids = self.message_controller.post(
|
||||||
queue_name,
|
queue_name,
|
||||||
messages=messages,
|
messages=messages,
|
||||||
@ -205,16 +212,21 @@ class CollectionResource(object):
|
|||||||
resp.content_location = req.relative_uri
|
resp.content_location = req.relative_uri
|
||||||
|
|
||||||
ids = req.get_param_as_list('ids')
|
ids = req.get_param_as_list('ids')
|
||||||
|
|
||||||
if ids is None:
|
if ids is None:
|
||||||
response = self._get(req, project_id, queue_name)
|
response = self._get(req, project_id, queue_name)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
response = self._get_by_id(req.path, project_id, queue_name, ids)
|
response = self._get_by_id(req.path, project_id, queue_name, ids)
|
||||||
|
|
||||||
if response is None:
|
if response is None:
|
||||||
resp.status = falcon.HTTP_204
|
# NOTE(TheSriram): Trying to get a message by id, should
|
||||||
return
|
# return the message if its present, otherwise a 404 since
|
||||||
|
# the message might have been deleted.
|
||||||
|
resp.status = falcon.HTTP_404
|
||||||
|
|
||||||
resp.body = utils.to_json(response)
|
else:
|
||||||
|
resp.body = utils.to_json(response)
|
||||||
# status defaults to 200
|
# status defaults to 200
|
||||||
|
|
||||||
def on_delete(self, req, resp, project_id, queue_name):
|
def on_delete(self, req, resp, project_id, queue_name):
|
||||||
|
@ -54,6 +54,17 @@ class Resource(object):
|
|||||||
resp.body = utils.to_json(resp_dict)
|
resp.body = utils.to_json(resp_dict)
|
||||||
# status defaults to 200
|
# status defaults to 200
|
||||||
|
|
||||||
|
except storage_errors.QueueDoesNotExist as ex:
|
||||||
|
resp_dict = {
|
||||||
|
'messages': {
|
||||||
|
'claimed': 0,
|
||||||
|
'free': 0,
|
||||||
|
'total': 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
resp.content_location = req.path
|
||||||
|
resp.body = utils.to_json(resp_dict)
|
||||||
|
|
||||||
except storage_errors.DoesNotExist as ex:
|
except storage_errors.DoesNotExist as ex:
|
||||||
LOG.debug(ex)
|
LOG.debug(ex)
|
||||||
raise falcon.HTTPNotFound()
|
raise falcon.HTTPNotFound()
|
||||||
|
@ -14,6 +14,7 @@
|
|||||||
|
|
||||||
from falcon import testing as ftest
|
from falcon import testing as ftest
|
||||||
|
|
||||||
|
from marconi.openstack.common import jsonutils
|
||||||
from marconi.queues import bootstrap
|
from marconi.queues import bootstrap
|
||||||
from marconi.queues.transport.wsgi import driver
|
from marconi.queues.transport.wsgi import driver
|
||||||
from marconi import tests as testing
|
from marconi import tests as testing
|
||||||
@ -123,10 +124,13 @@ class V1BaseFaulty(TestBaseFaulty):
|
|||||||
|
|
||||||
|
|
||||||
class V1_1Base(TestBase):
|
class V1_1Base(TestBase):
|
||||||
"""Base class for V1 API Tests.
|
"""Base class for V1.1 API Tests.
|
||||||
Should contain methods specific to V1.1 of the API
|
Should contain methods specific to V1.1 of the API
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
def _empty_message_list(self, body):
|
||||||
|
self.assertEqual(jsonutils.loads(body[0])['messages'], [])
|
||||||
|
|
||||||
def simulate_request(self, path, **kwargs):
|
def simulate_request(self, path, **kwargs):
|
||||||
"""Simulate a request.
|
"""Simulate a request.
|
||||||
|
|
||||||
|
@ -121,13 +121,15 @@ class ClaimsBaseTest(base.V1_1Base):
|
|||||||
body = self.simulate_get(self.messages_path,
|
body = self.simulate_get(self.messages_path,
|
||||||
headers=self.headers,
|
headers=self.headers,
|
||||||
query_string="echo=true")
|
query_string="echo=true")
|
||||||
self.assertEqual(self.srmock.status, falcon.HTTP_204)
|
self.assertEqual(self.srmock.status, falcon.HTTP_200)
|
||||||
|
self._empty_message_list(body)
|
||||||
|
|
||||||
# Listing messages, by default, won't include claimed, won't echo
|
# Listing messages, by default, won't include claimed, won't echo
|
||||||
body = self.simulate_get(self.messages_path,
|
body = self.simulate_get(self.messages_path,
|
||||||
headers=self.headers,
|
headers=self.headers,
|
||||||
query_string="echo=false")
|
query_string="echo=false")
|
||||||
self.assertEqual(self.srmock.status, falcon.HTTP_204)
|
self.assertEqual(self.srmock.status, falcon.HTTP_200)
|
||||||
|
self._empty_message_list(body)
|
||||||
|
|
||||||
# List messages, include_claimed, but don't echo
|
# List messages, include_claimed, but don't echo
|
||||||
body = self.simulate_get(self.messages_path,
|
body = self.simulate_get(self.messages_path,
|
||||||
@ -135,7 +137,8 @@ class ClaimsBaseTest(base.V1_1Base):
|
|||||||
'&echo=false',
|
'&echo=false',
|
||||||
headers=self.headers)
|
headers=self.headers)
|
||||||
|
|
||||||
self.assertEqual(self.srmock.status, falcon.HTTP_204)
|
self.assertEqual(self.srmock.status, falcon.HTTP_200)
|
||||||
|
self._empty_message_list(body)
|
||||||
|
|
||||||
# List messages with a different client-id and echo=false.
|
# List messages with a different client-id and echo=false.
|
||||||
# Should return some messages
|
# Should return some messages
|
||||||
|
@ -78,7 +78,8 @@ class TestDefaultLimits(base.V1_1Base):
|
|||||||
headers=self.headers,
|
headers=self.headers,
|
||||||
query_string='echo=false')
|
query_string='echo=false')
|
||||||
|
|
||||||
self.assertEqual(self.srmock.status, falcon.HTTP_204)
|
self.assertEqual(self.srmock.status, falcon.HTTP_200)
|
||||||
|
self._empty_message_list(result)
|
||||||
|
|
||||||
self._prepare_messages(storage.DEFAULT_MESSAGES_PER_PAGE + 1)
|
self._prepare_messages(storage.DEFAULT_MESSAGES_PER_PAGE + 1)
|
||||||
result = self.simulate_get(self.messages_path,
|
result = self.simulate_get(self.messages_path,
|
||||||
|
@ -31,6 +31,7 @@ from marconi.tests.queues.transport.wsgi import base
|
|||||||
|
|
||||||
@ddt.ddt
|
@ddt.ddt
|
||||||
class MessagesBaseTest(base.V1_1Base):
|
class MessagesBaseTest(base.V1_1Base):
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(MessagesBaseTest, self).setUp()
|
super(MessagesBaseTest, self).setUp()
|
||||||
|
|
||||||
@ -194,7 +195,7 @@ class MessagesBaseTest(base.V1_1Base):
|
|||||||
|
|
||||||
game_title = 'v' * validation.QUEUE_NAME_MAX_LEN
|
game_title = 'v' * validation.QUEUE_NAME_MAX_LEN
|
||||||
self._post_messages(queues_path + game_title + '/messages')
|
self._post_messages(queues_path + game_title + '/messages')
|
||||||
self.assertEqual(self.srmock.status, falcon.HTTP_404)
|
self.assertEqual(self.srmock.status, falcon.HTTP_201)
|
||||||
|
|
||||||
game_title += 'v'
|
game_title += 'v'
|
||||||
self._post_messages(queues_path + game_title + '/messages')
|
self._post_messages(queues_path + game_title + '/messages')
|
||||||
@ -202,12 +203,14 @@ class MessagesBaseTest(base.V1_1Base):
|
|||||||
|
|
||||||
def test_post_to_missing_queue(self):
|
def test_post_to_missing_queue(self):
|
||||||
self._post_messages(self.url_prefix + '/queues/nonexistent/messages')
|
self._post_messages(self.url_prefix + '/queues/nonexistent/messages')
|
||||||
self.assertEqual(self.srmock.status, falcon.HTTP_404)
|
self.assertEqual(self.srmock.status, falcon.HTTP_201)
|
||||||
|
|
||||||
def test_get_from_missing_queue(self):
|
def test_get_from_missing_queue(self):
|
||||||
self.simulate_get(self.url_prefix + '/queues/nonexistent/messages',
|
body = self.simulate_get(self.url_prefix +
|
||||||
headers=self.headers)
|
'/queues/nonexistent/messages',
|
||||||
self.assertEqual(self.srmock.status, falcon.HTTP_204)
|
headers=self.headers)
|
||||||
|
self.assertEqual(self.srmock.status, falcon.HTTP_200)
|
||||||
|
self._empty_message_list(body)
|
||||||
|
|
||||||
@ddt.data('', '0xdeadbeef', '550893e0-2b6e-11e3-835a-5cf9dd72369')
|
@ddt.data('', '0xdeadbeef', '550893e0-2b6e-11e3-835a-5cf9dd72369')
|
||||||
def test_bad_client_id(self, text_id):
|
def test_bad_client_id(self, text_id):
|
||||||
@ -235,7 +238,7 @@ class MessagesBaseTest(base.V1_1Base):
|
|||||||
def test_unacceptable_ttl(self, ttl):
|
def test_unacceptable_ttl(self, ttl):
|
||||||
self.simulate_post(self.queue_path + '/messages',
|
self.simulate_post(self.queue_path + '/messages',
|
||||||
body=jsonutils.dumps([{'ttl': ttl,
|
body=jsonutils.dumps([{'ttl': ttl,
|
||||||
'body': None}]),
|
'body': None}]),
|
||||||
headers=self.headers)
|
headers=self.headers)
|
||||||
|
|
||||||
self.assertEqual(self.srmock.status, falcon.HTTP_400)
|
self.assertEqual(self.srmock.status, falcon.HTTP_400)
|
||||||
@ -294,7 +297,7 @@ class MessagesBaseTest(base.V1_1Base):
|
|||||||
self.assertEqual(self.srmock.status, falcon.HTTP_204)
|
self.assertEqual(self.srmock.status, falcon.HTTP_204)
|
||||||
|
|
||||||
self.simulate_get(target, query_string=params, headers=self.headers)
|
self.simulate_get(target, query_string=params, headers=self.headers)
|
||||||
self.assertEqual(self.srmock.status, falcon.HTTP_204)
|
self.assertEqual(self.srmock.status, falcon.HTTP_404)
|
||||||
|
|
||||||
# Safe to delete non-existing ones
|
# Safe to delete non-existing ones
|
||||||
self.simulate_delete(target, query_string=params, headers=self.headers)
|
self.simulate_delete(target, query_string=params, headers=self.headers)
|
||||||
@ -321,7 +324,7 @@ class MessagesBaseTest(base.V1_1Base):
|
|||||||
path + '?' + query_string)
|
path + '?' + query_string)
|
||||||
|
|
||||||
cnt = 0
|
cnt = 0
|
||||||
while self.srmock.status == falcon.HTTP_200:
|
while jsonutils.loads(body[0])['messages'] != []:
|
||||||
contents = jsonutils.loads(body[0])
|
contents = jsonutils.loads(body[0])
|
||||||
[target, params] = contents['links'][0]['href'].split('?')
|
[target, params] = contents['links'][0]['href'].split('?')
|
||||||
|
|
||||||
@ -335,7 +338,8 @@ class MessagesBaseTest(base.V1_1Base):
|
|||||||
cnt += 1
|
cnt += 1
|
||||||
|
|
||||||
self.assertEqual(cnt, 4)
|
self.assertEqual(cnt, 4)
|
||||||
self.assertEqual(self.srmock.status, falcon.HTTP_204)
|
self.assertEqual(self.srmock.status, falcon.HTTP_200)
|
||||||
|
self._empty_message_list(body)
|
||||||
|
|
||||||
# Stats
|
# Stats
|
||||||
body = self.simulate_get(self.queue_path + '/stats',
|
body = self.simulate_get(self.queue_path + '/stats',
|
||||||
@ -354,20 +358,23 @@ class MessagesBaseTest(base.V1_1Base):
|
|||||||
matchers.MatchesRegex(expected_pattern))
|
matchers.MatchesRegex(expected_pattern))
|
||||||
|
|
||||||
# NOTE(kgriffs): Try to get messages for a missing queue
|
# NOTE(kgriffs): Try to get messages for a missing queue
|
||||||
self.simulate_get(self.url_prefix + '/queues/nonexistent/messages',
|
body = self.simulate_get(self.url_prefix +
|
||||||
headers=self.headers)
|
'/queues/nonexistent/messages',
|
||||||
self.assertEqual(self.srmock.status, falcon.HTTP_204)
|
headers=self.headers)
|
||||||
|
self.assertEqual(self.srmock.status, falcon.HTTP_200)
|
||||||
|
self._empty_message_list(body)
|
||||||
|
|
||||||
def test_list_with_bad_marker(self):
|
def test_list_with_bad_marker(self):
|
||||||
path = self.queue_path + '/messages'
|
path = self.queue_path + '/messages'
|
||||||
self._post_messages(path, repeat=5)
|
self._post_messages(path, repeat=5)
|
||||||
|
|
||||||
query_string = 'limit=3&echo=true&marker=sfhlsfdjh2048'
|
query_string = 'limit=3&echo=true&marker=sfhlsfdjh2048'
|
||||||
self.simulate_get(path,
|
body = self.simulate_get(path,
|
||||||
query_string=query_string,
|
query_string=query_string,
|
||||||
headers=self.headers)
|
headers=self.headers)
|
||||||
|
|
||||||
self.assertEqual(self.srmock.status, falcon.HTTP_204)
|
self.assertEqual(self.srmock.status, falcon.HTTP_200)
|
||||||
|
self._empty_message_list(body)
|
||||||
|
|
||||||
def test_no_uuid(self):
|
def test_no_uuid(self):
|
||||||
headers = {
|
headers = {
|
||||||
@ -412,11 +419,11 @@ class MessagesBaseTest(base.V1_1Base):
|
|||||||
self.simulate_get(path, headers=self.headers)
|
self.simulate_get(path, headers=self.headers)
|
||||||
self.assertEqual(self.srmock.status, falcon.HTTP_404)
|
self.assertEqual(self.srmock.status, falcon.HTTP_404)
|
||||||
|
|
||||||
def test_get_multiple_invalid_messages_204s(self):
|
def test_get_multiple_invalid_messages_404s(self):
|
||||||
path = self.url_prefix + '/queues/notthere/messages'
|
path = self.url_prefix + '/queues/notthere/messages'
|
||||||
self.simulate_get(path, query_string='ids=a,b,c',
|
self.simulate_get(path, query_string='ids=a,b,c',
|
||||||
headers=self.headers)
|
headers=self.headers)
|
||||||
self.assertEqual(self.srmock.status, falcon.HTTP_204)
|
self.assertEqual(self.srmock.status, falcon.HTTP_404)
|
||||||
|
|
||||||
def test_delete_multiple_invalid_messages_204s(self):
|
def test_delete_multiple_invalid_messages_204s(self):
|
||||||
path = self.url_prefix + '/queues/notthere/messages'
|
path = self.url_prefix + '/queues/notthere/messages'
|
||||||
|
@ -67,9 +67,9 @@ class QueueLifecycleBaseTest(base.V1_1Base):
|
|||||||
gumshoe_queue_path_metadata = self.gumshoe_queue_path + '/metadata'
|
gumshoe_queue_path_metadata = self.gumshoe_queue_path + '/metadata'
|
||||||
gumshoe_queue_path_stats = self.gumshoe_queue_path + '/stats'
|
gumshoe_queue_path_stats = self.gumshoe_queue_path + '/stats'
|
||||||
|
|
||||||
# Stats not found - queue not created yet
|
# Stats are empty - queue not created yet
|
||||||
self.simulate_get(gumshoe_queue_path_stats, headers=headers)
|
self.simulate_get(gumshoe_queue_path_stats, headers=headers)
|
||||||
self.assertEqual(self.srmock.status, falcon.HTTP_404)
|
self.assertEqual(self.srmock.status, falcon.HTTP_200)
|
||||||
|
|
||||||
# Metadata not found - queue not created yet
|
# Metadata not found - queue not created yet
|
||||||
self.simulate_get(gumshoe_queue_path_metadata, headers=headers)
|
self.simulate_get(gumshoe_queue_path_metadata, headers=headers)
|
||||||
@ -114,7 +114,7 @@ class QueueLifecycleBaseTest(base.V1_1Base):
|
|||||||
|
|
||||||
# Get non-existent stats
|
# Get non-existent stats
|
||||||
self.simulate_get(gumshoe_queue_path_stats, headers=headers)
|
self.simulate_get(gumshoe_queue_path_stats, headers=headers)
|
||||||
self.assertEqual(self.srmock.status, falcon.HTTP_404)
|
self.assertEqual(self.srmock.status, falcon.HTTP_200)
|
||||||
|
|
||||||
# Get non-existent metadata
|
# Get non-existent metadata
|
||||||
self.simulate_get(gumshoe_queue_path_metadata, headers=headers)
|
self.simulate_get(gumshoe_queue_path_metadata, headers=headers)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user