diff --git a/marconi/storage/exceptions.py b/marconi/storage/exceptions.py index 99f27c9a7..cf18e2591 100644 --- a/marconi/storage/exceptions.py +++ b/marconi/storage/exceptions.py @@ -84,6 +84,13 @@ class MessageDoesNotExist(DoesNotExist): super(MessageDoesNotExist, self).__init__(msg) +class MessageIsClaimed(NotPermitted): + + def __init__(self, mid): + msg = (u'Message %(mid)s is claimed' % dict(mid=mid)) + super(MessageIsClaimed, self).__init__(msg) + + class ClaimDoesNotExist(DoesNotExist): def __init__(self, cid, queue, project): @@ -93,9 +100,9 @@ class ClaimDoesNotExist(DoesNotExist): super(ClaimDoesNotExist, self).__init__(msg) -class ClaimNotPermitted(NotPermitted): +class MessageIsClaimedBy(NotPermitted): def __init__(self, mid, cid): msg = (u'Message %(mid)s is not claimed by %(cid)s' % dict(cid=cid, mid=mid)) - super(ClaimNotPermitted, self).__init__(msg) + super(MessageIsClaimedBy, self).__init__(msg) diff --git a/marconi/storage/mongodb/messages.py b/marconi/storage/mongodb/messages.py index c0aafca2b..b85dfb82c 100644 --- a/marconi/storage/mongodb/messages.py +++ b/marconi/storage/mongodb/messages.py @@ -609,29 +609,32 @@ class MessageController(storage.MessageBase): '_id': mid } - if claim: - # NOTE(cpp-cabrera): return early - the user gaves us an - # invalid claim id and that renders the rest of this - # request moot - cid = utils.to_oid(claim) - if cid is None: - return + # NOTE(cpp-cabrera): return early - the user gaves us an + # invalid claim id and that renders the rest of this + # request moot + cid = utils.to_oid(claim) + if cid is None: + return - now = timeutils.utcnow() - query['e'] = {'$gt': now} - message = self._col.find_one(query) + now = timeutils.utcnow() + query['e'] = {'$gt': now} + message = self._col.find_one(query) - if message is None: - return None + if message is None: + return - if not ('c' in message and - message['c']['id'] == cid and - message['c']['e'] > now): - raise exceptions.ClaimNotPermitted(message_id, claim) + is_claimed = (message['c']['id'] is not None and + message['c']['e'] > now) + + if claim is None: + if is_claimed: + raise exceptions.MessageIsClaimed(message_id) - self._col.remove(query['_id'], w=0) else: - self._col.remove(query, w=0) + if message['c']['id'] != cid: + raise exceptions.MessageIsClaimedBy(message_id, claim) + + self._col.remove(query['_id'], w=0) @utils.raises_conn_error def bulk_delete(self, queue_name, message_ids, project=None): diff --git a/marconi/storage/sqlite/messages.py b/marconi/storage/sqlite/messages.py index 3fea195f8..fc29612cf 100644 --- a/marconi/storage/sqlite/messages.py +++ b/marconi/storage/sqlite/messages.py @@ -223,15 +223,6 @@ class MessageController(base.MessageBase): if id is None: return - if not claim: - self.driver.run(''' - delete from Messages - where id = ? - and qid = (select id from Queues - where project = ? and name = ?) - ''', id, project, queue) - return - with self.driver('immediate'): message_exists, = self.driver.get(''' select count(M.id) @@ -244,7 +235,23 @@ class MessageController(base.MessageBase): if not message_exists: return - self.__delete_claimed(id, claim) + if claim is None: + self.__delete_unclaimed(id) + else: + self.__delete_claimed(id, claim) + + def __delete_unclaimed(self, id): + self.driver.run(''' + delete from Messages + where id = ? + and not exists (select * + from Claims join Locked + on id = cid + where ttl > julianday() * 86400.0 - created) + ''', id) + + if not self.driver.affected: + raise exceptions.MessageIsClaimed(id) def __delete_claimed(self, id, claim): # Precondition: id exists in a specific queue @@ -263,7 +270,7 @@ class MessageController(base.MessageBase): ''', id, cid) if not self.driver.affected: - raise exceptions.ClaimNotPermitted(id, claim) + raise exceptions.MessageIsClaimedBy(id, claim) def bulk_delete(self, queue, message_ids, project): if project is None: diff --git a/marconi/tests/transport/wsgi/test_claims.py b/marconi/tests/transport/wsgi/test_claims.py index ecdc64bfa..b11c4ae90 100644 --- a/marconi/tests/transport/wsgi/test_claims.py +++ b/marconi/tests/transport/wsgi/test_claims.py @@ -145,6 +145,10 @@ class ClaimsBaseTest(base.TestBase): claim_href) self.assertEquals(claim['ttl'], 100) + # Try to delete the message without submitting a claim_id + self.simulate_delete(message_href, self.project_id) + self.assertEquals(self.srmock.status, falcon.HTTP_403) + # Delete the message and its associated claim self.simulate_delete(message_href, self.project_id, query_string=params) diff --git a/marconi/transport/wsgi/messages.py b/marconi/transport/wsgi/messages.py index d3ce444ae..11e713b50 100644 --- a/marconi/transport/wsgi/messages.py +++ b/marconi/transport/wsgi/messages.py @@ -290,10 +290,11 @@ class ItemResource(object): except storage_exceptions.NotPermitted as ex: LOG.exception(ex) - title = _(u'Invalid claim') - description = _(u'The specified claim either does not ' - u'exist or has expired.') + title = _(u'Unable to delete') + description = _(u'This message is claimed; it cannot be ' + u'deleted without a valid claim_id.') raise falcon.HTTPForbidden(title, description) + except Exception as ex: LOG.exception(ex) description = _(u'Message could not be deleted.')