diff --git a/zaqar/queues/storage/errors.py b/zaqar/queues/storage/errors.py index 317775e8d..7b47bb8c6 100644 --- a/zaqar/queues/storage/errors.py +++ b/zaqar/queues/storage/errors.py @@ -90,14 +90,6 @@ class MessageDoesNotExist(DoesNotExist): project=project) -class MessageIsClaimed(NotPermitted): - - msg_format = u'Message {mid} is claimed' - - def __init__(self, mid): - super(MessageIsClaimed, self).__init__(mid=mid) - - class ClaimDoesNotExist(DoesNotExist): msg_format = (u'Claim {cid} does not exist in ' @@ -108,6 +100,30 @@ class ClaimDoesNotExist(DoesNotExist): project=project) +class MessageIsClaimed(NotPermitted): + + msg_format = u'Message {mid} is claimed' + + def __init__(self, mid): + super(MessageIsClaimed, self).__init__(mid=mid) + + +class MessageNotClaimed(NotPermitted): + + msg_format = u'Message {mid} is no longer claimed' + + def __init__(self, mid): + super(MessageNotClaimed, self).__init__(mid=mid) + + +class MessageNotClaimedBy(NotPermitted): + + msg_format = u'Message {mid} is not claimed by {cid}' + + def __init__(self, mid, cid): + super(MessageNotClaimedBy, self).__init__(cid=cid, mid=mid) + + class QueueNotMapped(DoesNotExist): msg_format = (u'No pool found for ' @@ -117,14 +133,6 @@ class QueueNotMapped(DoesNotExist): super(QueueNotMapped, self).__init__(queue=queue, project=project) -class MessageIsClaimedBy(NotPermitted): - - msg_format = u'Message {mid} is not claimed by {cid}' - - def __init__(self, mid, cid): - super(MessageIsClaimedBy, self).__init__(cid=cid, mid=mid) - - class PoolDoesNotExist(DoesNotExist): msg_format = u'Pool {pool} does not exist' diff --git a/zaqar/queues/storage/mongodb/messages.py b/zaqar/queues/storage/mongodb/messages.py index 9e0d8338b..72e37036a 100644 --- a/zaqar/queues/storage/mongodb/messages.py +++ b/zaqar/queues/storage/mongodb/messages.py @@ -688,12 +688,9 @@ class MessageController(storage.Message): PROJ_QUEUE: utils.scope_queue_name(queue_name, project), } - # NOTE(cpp-cabrera): return early - the user gaves us an - # invalid claim id and that renders the rest of this - # request moot cid = utils.to_oid(claim) if cid is None: - return + raise errors.ClaimDoesNotExist(queue_name, project, claim) now = timeutils.utcnow_ts() cursor = collection.find(query).hint(ID_INDEX_FIELDS) @@ -703,11 +700,8 @@ class MessageController(storage.Message): except StopIteration: return - is_claimed = (message['c']['id'] is not None and - message['c']['e'] > now) - if claim is None: - if is_claimed: + if _is_claimed(message, now): raise errors.MessageIsClaimed(message_id) else: @@ -719,7 +713,10 @@ class MessageController(storage.Message): message = collection.find_one(query, read_preference=pref) if message['c']['id'] != cid: - raise errors.MessageIsClaimedBy(message_id, claim) + if _is_claimed(message, now): + raise errors.MessageNotClaimedBy(message_id, claim) + + raise errors.MessageNotClaimed(message_id) collection.remove(query['_id'], w=0) @@ -762,6 +759,11 @@ class MessageController(storage.Message): return final_messages +def _is_claimed(msg, now): + return (msg['c']['id'] is not None and + msg['c']['e'] > now) + + def _basic_message(msg, now): oid = msg['_id'] age = now - utils.oid_ts(oid) diff --git a/zaqar/queues/storage/sqlalchemy/messages.py b/zaqar/queues/storage/sqlalchemy/messages.py index 06b0618d4..d66b8f465 100644 --- a/zaqar/queues/storage/sqlalchemy/messages.py +++ b/zaqar/queues/storage/sqlalchemy/messages.py @@ -73,6 +73,17 @@ class MessageController(storage.Message): except errors.MessageDoesNotExist: return False + def _get_cid(self, mid): + """Return the decoded claim ID for the given message. + + :param mid: Decoded message ID + """ + + and_stmt = sa.and_(tables.Messages.c.id == mid) + sel = sa.sql.select([tables.Messages.c.cid], and_stmt) + + return self.driver.get(sel)[0] + def get(self, queue, message_id, project): body, ttl, created = self._get(queue, message_id, project) now = timeutils.utcnow_ts() @@ -265,7 +276,7 @@ class MessageController(storage.Message): cid = claim and utils.cid_decode(claim) or None if claim and cid is None: - return + raise errors.ClaimDoesNotExist(queue, project, claim) and_stmt.append(tables.Messages.c.cid == cid) @@ -273,7 +284,16 @@ class MessageController(storage.Message): res = trans.execute(statement) if res.rowcount == 0: - raise errors.MessageIsClaimed(mid) + # NOTE(kgriffs): Either the message is not claimed, + # or if it is, the specified claim does not exist. + cid = self._get_cid(mid) + if cid is None: + raise errors.MessageNotClaimed(mid) + + # NOTE(kgriffs): The message exists, but the claim + # must have expired or something, since it + # was not associated with the message. + raise errors.MessageNotClaimedBy(mid, claim) def bulk_delete(self, queue, message_ids, project): if project is None: diff --git a/zaqar/queues/transport/wsgi/errors.py b/zaqar/queues/transport/wsgi/errors.py index 24ec17a75..d92c76de9 100644 --- a/zaqar/queues/transport/wsgi/errors.py +++ b/zaqar/queues/transport/wsgi/errors.py @@ -33,7 +33,7 @@ class HTTPServiceUnavailable(falcon.HTTPServiceUnavailable): class HTTPBadRequestAPI(falcon.HTTPBadRequest): """Wraps falcon.HTTPBadRequest with a contextual title.""" - TITLE = _(u'Invalid API call') + TITLE = _(u'Invalid API request') def __init__(self, description): super(HTTPBadRequestAPI, self).__init__(self.TITLE, description) diff --git a/zaqar/queues/transport/wsgi/v1_0/messages.py b/zaqar/queues/transport/wsgi/v1_0/messages.py index 61faaf689..2cfb8b3c5 100644 --- a/zaqar/queues/transport/wsgi/v1_0/messages.py +++ b/zaqar/queues/transport/wsgi/v1_0/messages.py @@ -279,11 +279,15 @@ class ItemResource(object): # status defaults to 200 def on_delete(self, req, resp, project_id, queue_name, message_id): + LOG.debug(u'Messages item DELETE - message: %(message)s, ' u'queue: %(queue)s, project: %(project)s', {'message': message_id, 'queue': queue_name, 'project': project_id}) + + error_title = _(u'Unable to delete') + try: self.message_controller.delete( queue_name, @@ -291,12 +295,23 @@ class ItemResource(object): project=project_id, claim=req.get_param('claim_id')) + except storage_errors.MessageNotClaimed as ex: + LOG.debug(ex) + description = _(u'A claim was specified, but the message ' + u'is not currently claimed.') + raise falcon.HTTPBadRequest(error_title, description) + + except storage_errors.ClaimDoesNotExist as ex: + LOG.debug(ex) + description = _(u'The specified claim does not exist or ' + u'has expired.') + raise falcon.HTTPBadRequest(error_title, description) + except storage_errors.NotPermitted as ex: - LOG.exception(ex) - title = _(u'Unable to delete') + LOG.debug(ex) description = _(u'This message is claimed; it cannot be ' - u'deleted without a valid claim_id.') - raise falcon.HTTPForbidden(title, description) + u'deleted without a valid claim ID.') + raise falcon.HTTPForbidden(error_title, description) except Exception as ex: LOG.exception(ex) diff --git a/zaqar/queues/transport/wsgi/v1_1/messages.py b/zaqar/queues/transport/wsgi/v1_1/messages.py index 080dd7393..bd9514ad8 100644 --- a/zaqar/queues/transport/wsgi/v1_1/messages.py +++ b/zaqar/queues/transport/wsgi/v1_1/messages.py @@ -341,6 +341,9 @@ class ItemResource(object): {'message': message_id, 'queue': queue_name, 'project': project_id}) + + error_title = _(u'Unable to delete') + try: self.message_controller.delete( queue_name, @@ -348,12 +351,23 @@ class ItemResource(object): project=project_id, claim=req.get_param('claim_id')) + except storage_errors.MessageNotClaimed as ex: + LOG.debug(ex) + description = _(u'A claim was specified, but the message ' + u'is not currently claimed.') + raise falcon.HTTPBadRequest(error_title, description) + + except storage_errors.ClaimDoesNotExist as ex: + LOG.debug(ex) + description = _(u'The specified claim does not exist or ' + u'has expired.') + raise falcon.HTTPBadRequest(error_title, description) + except storage_errors.NotPermitted as ex: - LOG.exception(ex) - title = _(u'Unable to delete') + LOG.debug(ex) description = _(u'This message is claimed; it cannot be ' - u'deleted without a valid claim_id.') - raise falcon.HTTPForbidden(title, description) + u'deleted without a valid claim ID.') + raise falcon.HTTPForbidden(error_title, description) except Exception as ex: LOG.exception(ex) diff --git a/zaqar/tests/queues/storage/base.py b/zaqar/tests/queues/storage/base.py index 7eb2ac264..21622e7e0 100644 --- a/zaqar/tests/queues/storage/base.py +++ b/zaqar/tests/queues/storage/base.py @@ -402,6 +402,12 @@ class MessageControllerTest(ControllerBaseTest): self.claim_controller.delete(self.queue_name, cid, project=self.project) + # NOTE(kgriffs) Message is no longer claimed, but try + # to delete it with the claim anyway. It should raise + # an error, because the client needs a hint that + # perhaps the claim expired before it got around to + # trying to delete the message, which means another + # worker could be processing this message now. with testing.expect(storage.errors.NotPermitted): self.controller.delete(self.queue_name, msg2['id'], project=self.project, @@ -465,11 +471,20 @@ class MessageControllerTest(ControllerBaseTest): project=self.project, client_uuid=uuid.uuid4()) - bad_claim_id = '; DROP TABLE queues' - self.controller.delete(self.queue_name, - msgid, - project=self.project, - claim=bad_claim_id) + # NOTE(kgriffs): If the client has a typo or + # something, they will need a hint that the + # request was invalid. + # + # On the other hand, if they are actually + # probing for a vulnerability, telling them + # the claim they requested doesn't exist should + # be harmless. + with testing.expect(storage.errors.ClaimDoesNotExist): + bad_claim_id = '; DROP TABLE queues' + self.controller.delete(self.queue_name, + msgid, + project=self.project, + claim=bad_claim_id) def test_bad_marker(self): bad_marker = 'xyz' diff --git a/zaqar/tests/queues/transport/wsgi/v1/test_claims.py b/zaqar/tests/queues/transport/wsgi/v1/test_claims.py index 6518a247e..ca3de83ad 100644 --- a/zaqar/tests/queues/transport/wsgi/v1/test_claims.py +++ b/zaqar/tests/queues/transport/wsgi/v1/test_claims.py @@ -183,7 +183,7 @@ class ClaimsBaseTest(base.V1Base): # Try to delete a message with an invalid claim ID self.simulate_delete(message_href, self.project_id, query_string=params) - self.assertEqual(self.srmock.status, falcon.HTTP_403) + self.assertEqual(self.srmock.status, falcon.HTTP_400) # Make sure it wasn't deleted! self.simulate_get(message_href, self.project_id, query_string=params) diff --git a/zaqar/tests/queues/transport/wsgi/v1/test_messages.py b/zaqar/tests/queues/transport/wsgi/v1/test_messages.py index 0178e1715..226f1a60c 100644 --- a/zaqar/tests/queues/transport/wsgi/v1/test_messages.py +++ b/zaqar/tests/queues/transport/wsgi/v1/test_messages.py @@ -54,6 +54,13 @@ class MessagesBaseTest(base.V1Base): doc = '{"_ttl": 60}' self.simulate_put(self.queue_path, self.project_id, body=doc) + # NOTE(kgriffs): Also register without a project for tests + # that do not specify a project. + # + # TODO(kgriffs): Should a project id always be required or + # automatically supplied in the simulate_* methods? + self.simulate_put(self.queue_path, body=doc) + self.headers = { 'Client-ID': str(uuid.uuid4()), } @@ -424,8 +431,9 @@ class MessagesBaseTest(base.V1Base): resp = self._post_messages(path + '/messages', 1) location = jsonutils.loads(resp[0])['resources'][0] - self.simulate_delete(location, query_string='claim_id=invalid') - self.assertEqual(self.srmock.status, falcon.HTTP_204) + self.simulate_delete(location, self.project_id, + query_string='claim_id=invalid') + self.assertEqual(self.srmock.status, falcon.HTTP_400) self.simulate_get(location, self.project_id) self.assertEqual(self.srmock.status, falcon.HTTP_200) diff --git a/zaqar/tests/queues/transport/wsgi/v1_1/test_claims.py b/zaqar/tests/queues/transport/wsgi/v1_1/test_claims.py index 35e0e988e..d06c67988 100644 --- a/zaqar/tests/queues/transport/wsgi/v1_1/test_claims.py +++ b/zaqar/tests/queues/transport/wsgi/v1_1/test_claims.py @@ -233,7 +233,7 @@ class ClaimsBaseTest(base.V1_1Base): # Try to delete a message with an invalid claim ID self.simulate_delete(message_href, query_string=params, headers=self.headers) - self.assertEqual(self.srmock.status, falcon.HTTP_403) + self.assertEqual(self.srmock.status, falcon.HTTP_400) # Make sure it wasn't deleted! self.simulate_get(message_href, query_string=params, diff --git a/zaqar/tests/queues/transport/wsgi/v1_1/test_messages.py b/zaqar/tests/queues/transport/wsgi/v1_1/test_messages.py index 422eae1c1..1b50194ff 100644 --- a/zaqar/tests/queues/transport/wsgi/v1_1/test_messages.py +++ b/zaqar/tests/queues/transport/wsgi/v1_1/test_messages.py @@ -470,7 +470,7 @@ class MessagesBaseTest(base.V1_1Base): self.simulate_delete(location, query_string='claim_id=invalid', headers=self.headers) - self.assertEqual(self.srmock.status, falcon.HTTP_204) + self.assertEqual(self.srmock.status, falcon.HTTP_400) self.simulate_get(location, headers=self.headers) self.assertEqual(self.srmock.status, falcon.HTTP_200)