Support purge queue -- websocket

A new endpoint /v2/queues/myqueue/purge is added to support purge
a queue, which accepts a POST body like:
{"resource_types": ["messages", "subscriptions"]} to allow user
purge particular resource of the queue. Test cases are added as
well.

Partially Implements: blueprint purge-queue

Change-Id: I5b2dad428a32270ecb3998bc3c0cfaccdf88ba38
This commit is contained in:
Fei Long Wang 2016-12-15 16:29:27 +13:00
parent b172c34c51
commit 5b1f9156c0
5 changed files with 128 additions and 2 deletions

View File

@ -222,6 +222,57 @@ class Endpoints(object):
headers = {'status': 200}
return response.Response(req, body, headers)
@api_utils.on_exception_sends_500
def queue_purge(self, req):
"""Purge queue
:param req: Request instance ready to be sent.
:type req: `api.common.Request`
:return: resp: Response instance
:type: resp: `api.common.Response`
"""
project_id = req._headers.get('X-Project-ID')
queue_name = req._body.get('queue_name')
resource_types = req._body.get('resource_types', ["messages",
"subscriptions"])
LOG.debug(u'Purge queue - queue: %(queue)s, '
u'project: %(project)s',
{'queue': queue_name, 'project': project_id})
try:
pop_limit = 100
if "messages" in resource_types:
LOG.debug("Purge all messages under queue %s" % queue_name)
resp = self._pop_messages(req, queue_name,
project_id, pop_limit)
while resp.get_response()['body']['messages']:
resp = self._pop_messages(req, queue_name,
project_id, pop_limit)
if "subscriptions" in resource_types:
LOG.debug("Purge all subscriptions under queue %s" %
queue_name)
resp = self._subscription_controller.list(queue_name,
project=project_id)
subscriptions = list(next(resp))
for sub in subscriptions:
self._subscription_controller.delete(queue_name,
sub['id'],
project=project_id)
except storage_errors.QueueDoesNotExist as ex:
LOG.exception(ex)
headers = {'status': 404}
return api_utils.error_response(req, ex, headers)
except storage_errors.ExceptionBase as ex:
LOG.exception(ex)
headers = {'status': 503}
return api_utils.error_response(req, ex, headers)
else:
headers = {'status': 204}
return response.Response(req, {}, headers)
# Messages
@api_utils.on_exception_sends_500
def message_list(self, req):

View File

@ -104,4 +104,23 @@ class RequestSchema(v1_1.RequestSchema):
},
'required': ['action', 'headers', 'body']
},
consts.QUEUE_PURGE: {
'properties': {
'action': {'enum': [consts.QUEUE_PURGE]},
'headers': {
'type': 'object',
'properties': headers,
'required': ['Client-ID', 'X-Project-ID']},
'body': {
'type': 'object',
'properties': {
'queue_name': {'type': 'string'},
'resource_types': {'type': 'array'},
},
'required': ['queue_name'],
}
},
'required': ['action', 'headers', 'body']
},
})

View File

@ -64,12 +64,14 @@ QUEUE_OPS = (
QUEUE_GET,
QUEUE_DELETE,
QUEUE_GET_STATS,
QUEUE_PURGE
) = (
'queue_create',
'queue_list',
'queue_get',
'queue_delete',
'queue_get_stats',
'queue_purge'
)
CLAIM_OPS = (

View File

@ -602,6 +602,59 @@ class QueueLifecycleBaseTest(base.V2Base):
mock_queue_list.return_value = fake_generator()
self.protocol.onMessage(req, False)
def _post_messages(self, queue_name, headers, repeat=1):
messages = [{'body': 239, 'ttl': 300}] * repeat
action = consts.MESSAGE_POST
body = {"queue_name": queue_name,
"messages": messages}
send_mock = mock.Mock()
self.protocol.sendMessage = send_mock
req = test_utils.create_request(action, body, headers)
self.protocol.onMessage(req, False)
return json.loads(send_mock.call_args[0][0])
def test_purge(self):
arbitrary_number = 644079696574693
project_id = str(arbitrary_number)
client_id = str(uuid.uuid4())
headers = {
'X-Project-ID': project_id,
'Client-ID': client_id
}
queue_name = 'myqueue'
resp = self._post_messages(queue_name, headers, repeat=5)
msg_ids = resp['body']['message_ids']
send_mock = mock.Mock()
self.protocol.sendMessage = send_mock
for msg_id in msg_ids:
action = consts.MESSAGE_GET
body = {"queue_name": queue_name, "message_id": msg_id}
req = test_utils.create_request(action, body, headers)
self.protocol.onMessage(req, False)
resp = json.loads(send_mock.call_args[0][0])
self.assertEqual(200, resp['headers']['status'])
action = consts.QUEUE_PURGE
body = {"queue_name": queue_name, "resource_types": ["messages"]}
req = test_utils.create_request(action, body, headers)
self.protocol.onMessage(req, False)
resp = json.loads(send_mock.call_args[0][0])
self.assertEqual(204, resp['headers']['status'])
for msg_id in msg_ids:
action = consts.MESSAGE_GET
body = {"queue_name": queue_name, "message_id": msg_id}
req = test_utils.create_request(action, body, headers)
self.protocol.onMessage(req, False)
resp = json.loads(send_mock.call_args[0][0])
self.assertEqual(404, resp['headers']['status'])
class TestQueueLifecycleMongoDB(QueueLifecycleBaseTest):

View File

@ -58,11 +58,12 @@ class Resource(object):
try:
if "messages" in document['resource_types']:
pop_limit = 100
LOG.debug("Purge all messages under queue %s" % queue_name)
messages = self._message_ctrl.pop(queue_name, 10,
messages = self._message_ctrl.pop(queue_name, pop_limit,
project=project_id)
while messages:
messages = self._message_ctrl.pop(queue_name, 10,
messages = self._message_ctrl.pop(queue_name, pop_limit,
project=project_id)
if "subscriptions" in document['resource_types']: