Do not fail silently when given a bogus claim ID

This patch modifies the message delete operation so that it raises
an exception when given a totally bogus claim ID. Previously, the
drivers would just fail silently. On the other hand, the drivers
would raise an error if the claim ID was formatted correctly, but
the message was no longer claimed.

In both scenarious, the client needs a hint that something went
wrong, and so this patch simply has them both result in the same
error. The WSGI transport driver was also updated to return more
accurate error responses.

       / .'
 .---. \/
(._.' \()
 ^"""^"

Change-Id: I6f64c5eb764c821f3996c0ca6789b49d84a6cd2c
This commit is contained in:
kgriffs 2014-08-28 09:49:10 -05:00
parent ab1d471d94
commit 6472ed4ba4
11 changed files with 128 additions and 46 deletions

View File

@ -90,14 +90,6 @@ class MessageDoesNotExist(DoesNotExist):
project=project) project=project)
class MessageIsClaimed(NotPermitted):
msg_format = u'Message {mid} is claimed'
def __init__(self, mid):
super(MessageIsClaimed, self).__init__(mid=mid)
class ClaimDoesNotExist(DoesNotExist): class ClaimDoesNotExist(DoesNotExist):
msg_format = (u'Claim {cid} does not exist in ' msg_format = (u'Claim {cid} does not exist in '
@ -108,6 +100,30 @@ class ClaimDoesNotExist(DoesNotExist):
project=project) project=project)
class MessageIsClaimed(NotPermitted):
msg_format = u'Message {mid} is claimed'
def __init__(self, mid):
super(MessageIsClaimed, self).__init__(mid=mid)
class MessageNotClaimed(NotPermitted):
msg_format = u'Message {mid} is no longer claimed'
def __init__(self, mid):
super(MessageNotClaimed, self).__init__(mid=mid)
class MessageNotClaimedBy(NotPermitted):
msg_format = u'Message {mid} is not claimed by {cid}'
def __init__(self, mid, cid):
super(MessageNotClaimedBy, self).__init__(cid=cid, mid=mid)
class QueueNotMapped(DoesNotExist): class QueueNotMapped(DoesNotExist):
msg_format = (u'No pool found for ' msg_format = (u'No pool found for '
@ -117,14 +133,6 @@ class QueueNotMapped(DoesNotExist):
super(QueueNotMapped, self).__init__(queue=queue, project=project) super(QueueNotMapped, self).__init__(queue=queue, project=project)
class MessageIsClaimedBy(NotPermitted):
msg_format = u'Message {mid} is not claimed by {cid}'
def __init__(self, mid, cid):
super(MessageIsClaimedBy, self).__init__(cid=cid, mid=mid)
class PoolDoesNotExist(DoesNotExist): class PoolDoesNotExist(DoesNotExist):
msg_format = u'Pool {pool} does not exist' msg_format = u'Pool {pool} does not exist'

View File

@ -688,12 +688,9 @@ class MessageController(storage.Message):
PROJ_QUEUE: utils.scope_queue_name(queue_name, project), PROJ_QUEUE: utils.scope_queue_name(queue_name, project),
} }
# NOTE(cpp-cabrera): return early - the user gaves us an
# invalid claim id and that renders the rest of this
# request moot
cid = utils.to_oid(claim) cid = utils.to_oid(claim)
if cid is None: if cid is None:
return raise errors.ClaimDoesNotExist(queue_name, project, claim)
now = timeutils.utcnow_ts() now = timeutils.utcnow_ts()
cursor = collection.find(query).hint(ID_INDEX_FIELDS) cursor = collection.find(query).hint(ID_INDEX_FIELDS)
@ -703,11 +700,8 @@ class MessageController(storage.Message):
except StopIteration: except StopIteration:
return return
is_claimed = (message['c']['id'] is not None and
message['c']['e'] > now)
if claim is None: if claim is None:
if is_claimed: if _is_claimed(message, now):
raise errors.MessageIsClaimed(message_id) raise errors.MessageIsClaimed(message_id)
else: else:
@ -719,7 +713,10 @@ class MessageController(storage.Message):
message = collection.find_one(query, read_preference=pref) message = collection.find_one(query, read_preference=pref)
if message['c']['id'] != cid: if message['c']['id'] != cid:
raise errors.MessageIsClaimedBy(message_id, claim) if _is_claimed(message, now):
raise errors.MessageNotClaimedBy(message_id, claim)
raise errors.MessageNotClaimed(message_id)
collection.remove(query['_id'], w=0) collection.remove(query['_id'], w=0)
@ -762,6 +759,11 @@ class MessageController(storage.Message):
return final_messages return final_messages
def _is_claimed(msg, now):
return (msg['c']['id'] is not None and
msg['c']['e'] > now)
def _basic_message(msg, now): def _basic_message(msg, now):
oid = msg['_id'] oid = msg['_id']
age = now - utils.oid_ts(oid) age = now - utils.oid_ts(oid)

View File

@ -73,6 +73,17 @@ class MessageController(storage.Message):
except errors.MessageDoesNotExist: except errors.MessageDoesNotExist:
return False return False
def _get_cid(self, mid):
"""Return the decoded claim ID for the given message.
:param mid: Decoded message ID
"""
and_stmt = sa.and_(tables.Messages.c.id == mid)
sel = sa.sql.select([tables.Messages.c.cid], and_stmt)
return self.driver.get(sel)[0]
def get(self, queue, message_id, project): def get(self, queue, message_id, project):
body, ttl, created = self._get(queue, message_id, project) body, ttl, created = self._get(queue, message_id, project)
now = timeutils.utcnow_ts() now = timeutils.utcnow_ts()
@ -265,7 +276,7 @@ class MessageController(storage.Message):
cid = claim and utils.cid_decode(claim) or None cid = claim and utils.cid_decode(claim) or None
if claim and cid is None: if claim and cid is None:
return raise errors.ClaimDoesNotExist(queue, project, claim)
and_stmt.append(tables.Messages.c.cid == cid) and_stmt.append(tables.Messages.c.cid == cid)
@ -273,7 +284,16 @@ class MessageController(storage.Message):
res = trans.execute(statement) res = trans.execute(statement)
if res.rowcount == 0: if res.rowcount == 0:
raise errors.MessageIsClaimed(mid) # NOTE(kgriffs): Either the message is not claimed,
# or if it is, the specified claim does not exist.
cid = self._get_cid(mid)
if cid is None:
raise errors.MessageNotClaimed(mid)
# NOTE(kgriffs): The message exists, but the claim
# must have expired or something, since it
# was not associated with the message.
raise errors.MessageNotClaimedBy(mid, claim)
def bulk_delete(self, queue, message_ids, project): def bulk_delete(self, queue, message_ids, project):
if project is None: if project is None:

View File

@ -33,7 +33,7 @@ class HTTPServiceUnavailable(falcon.HTTPServiceUnavailable):
class HTTPBadRequestAPI(falcon.HTTPBadRequest): class HTTPBadRequestAPI(falcon.HTTPBadRequest):
"""Wraps falcon.HTTPBadRequest with a contextual title.""" """Wraps falcon.HTTPBadRequest with a contextual title."""
TITLE = _(u'Invalid API call') TITLE = _(u'Invalid API request')
def __init__(self, description): def __init__(self, description):
super(HTTPBadRequestAPI, self).__init__(self.TITLE, description) super(HTTPBadRequestAPI, self).__init__(self.TITLE, description)

View File

@ -279,11 +279,15 @@ class ItemResource(object):
# status defaults to 200 # status defaults to 200
def on_delete(self, req, resp, project_id, queue_name, message_id): def on_delete(self, req, resp, project_id, queue_name, message_id):
LOG.debug(u'Messages item DELETE - message: %(message)s, ' LOG.debug(u'Messages item DELETE - message: %(message)s, '
u'queue: %(queue)s, project: %(project)s', u'queue: %(queue)s, project: %(project)s',
{'message': message_id, {'message': message_id,
'queue': queue_name, 'queue': queue_name,
'project': project_id}) 'project': project_id})
error_title = _(u'Unable to delete')
try: try:
self.message_controller.delete( self.message_controller.delete(
queue_name, queue_name,
@ -291,12 +295,23 @@ class ItemResource(object):
project=project_id, project=project_id,
claim=req.get_param('claim_id')) claim=req.get_param('claim_id'))
except storage_errors.MessageNotClaimed as ex:
LOG.debug(ex)
description = _(u'A claim was specified, but the message '
u'is not currently claimed.')
raise falcon.HTTPBadRequest(error_title, description)
except storage_errors.ClaimDoesNotExist as ex:
LOG.debug(ex)
description = _(u'The specified claim does not exist or '
u'has expired.')
raise falcon.HTTPBadRequest(error_title, description)
except storage_errors.NotPermitted as ex: except storage_errors.NotPermitted as ex:
LOG.exception(ex) LOG.debug(ex)
title = _(u'Unable to delete')
description = _(u'This message is claimed; it cannot be ' description = _(u'This message is claimed; it cannot be '
u'deleted without a valid claim_id.') u'deleted without a valid claim ID.')
raise falcon.HTTPForbidden(title, description) raise falcon.HTTPForbidden(error_title, description)
except Exception as ex: except Exception as ex:
LOG.exception(ex) LOG.exception(ex)

View File

@ -341,6 +341,9 @@ class ItemResource(object):
{'message': message_id, {'message': message_id,
'queue': queue_name, 'queue': queue_name,
'project': project_id}) 'project': project_id})
error_title = _(u'Unable to delete')
try: try:
self.message_controller.delete( self.message_controller.delete(
queue_name, queue_name,
@ -348,12 +351,23 @@ class ItemResource(object):
project=project_id, project=project_id,
claim=req.get_param('claim_id')) claim=req.get_param('claim_id'))
except storage_errors.MessageNotClaimed as ex:
LOG.debug(ex)
description = _(u'A claim was specified, but the message '
u'is not currently claimed.')
raise falcon.HTTPBadRequest(error_title, description)
except storage_errors.ClaimDoesNotExist as ex:
LOG.debug(ex)
description = _(u'The specified claim does not exist or '
u'has expired.')
raise falcon.HTTPBadRequest(error_title, description)
except storage_errors.NotPermitted as ex: except storage_errors.NotPermitted as ex:
LOG.exception(ex) LOG.debug(ex)
title = _(u'Unable to delete')
description = _(u'This message is claimed; it cannot be ' description = _(u'This message is claimed; it cannot be '
u'deleted without a valid claim_id.') u'deleted without a valid claim ID.')
raise falcon.HTTPForbidden(title, description) raise falcon.HTTPForbidden(error_title, description)
except Exception as ex: except Exception as ex:
LOG.exception(ex) LOG.exception(ex)

View File

@ -402,6 +402,12 @@ class MessageControllerTest(ControllerBaseTest):
self.claim_controller.delete(self.queue_name, cid, self.claim_controller.delete(self.queue_name, cid,
project=self.project) project=self.project)
# NOTE(kgriffs) Message is no longer claimed, but try
# to delete it with the claim anyway. It should raise
# an error, because the client needs a hint that
# perhaps the claim expired before it got around to
# trying to delete the message, which means another
# worker could be processing this message now.
with testing.expect(storage.errors.NotPermitted): with testing.expect(storage.errors.NotPermitted):
self.controller.delete(self.queue_name, msg2['id'], self.controller.delete(self.queue_name, msg2['id'],
project=self.project, project=self.project,
@ -465,11 +471,20 @@ class MessageControllerTest(ControllerBaseTest):
project=self.project, project=self.project,
client_uuid=uuid.uuid4()) client_uuid=uuid.uuid4())
bad_claim_id = '; DROP TABLE queues' # NOTE(kgriffs): If the client has a typo or
self.controller.delete(self.queue_name, # something, they will need a hint that the
msgid, # request was invalid.
project=self.project, #
claim=bad_claim_id) # On the other hand, if they are actually
# probing for a vulnerability, telling them
# the claim they requested doesn't exist should
# be harmless.
with testing.expect(storage.errors.ClaimDoesNotExist):
bad_claim_id = '; DROP TABLE queues'
self.controller.delete(self.queue_name,
msgid,
project=self.project,
claim=bad_claim_id)
def test_bad_marker(self): def test_bad_marker(self):
bad_marker = 'xyz' bad_marker = 'xyz'

View File

@ -183,7 +183,7 @@ class ClaimsBaseTest(base.V1Base):
# Try to delete a message with an invalid claim ID # Try to delete a message with an invalid claim ID
self.simulate_delete(message_href, self.project_id, self.simulate_delete(message_href, self.project_id,
query_string=params) query_string=params)
self.assertEqual(self.srmock.status, falcon.HTTP_403) self.assertEqual(self.srmock.status, falcon.HTTP_400)
# Make sure it wasn't deleted! # Make sure it wasn't deleted!
self.simulate_get(message_href, self.project_id, query_string=params) self.simulate_get(message_href, self.project_id, query_string=params)

View File

@ -54,6 +54,13 @@ class MessagesBaseTest(base.V1Base):
doc = '{"_ttl": 60}' doc = '{"_ttl": 60}'
self.simulate_put(self.queue_path, self.project_id, body=doc) self.simulate_put(self.queue_path, self.project_id, body=doc)
# NOTE(kgriffs): Also register without a project for tests
# that do not specify a project.
#
# TODO(kgriffs): Should a project id always be required or
# automatically supplied in the simulate_* methods?
self.simulate_put(self.queue_path, body=doc)
self.headers = { self.headers = {
'Client-ID': str(uuid.uuid4()), 'Client-ID': str(uuid.uuid4()),
} }
@ -424,8 +431,9 @@ class MessagesBaseTest(base.V1Base):
resp = self._post_messages(path + '/messages', 1) resp = self._post_messages(path + '/messages', 1)
location = jsonutils.loads(resp[0])['resources'][0] location = jsonutils.loads(resp[0])['resources'][0]
self.simulate_delete(location, query_string='claim_id=invalid') self.simulate_delete(location, self.project_id,
self.assertEqual(self.srmock.status, falcon.HTTP_204) query_string='claim_id=invalid')
self.assertEqual(self.srmock.status, falcon.HTTP_400)
self.simulate_get(location, self.project_id) self.simulate_get(location, self.project_id)
self.assertEqual(self.srmock.status, falcon.HTTP_200) self.assertEqual(self.srmock.status, falcon.HTTP_200)

View File

@ -233,7 +233,7 @@ class ClaimsBaseTest(base.V1_1Base):
# Try to delete a message with an invalid claim ID # Try to delete a message with an invalid claim ID
self.simulate_delete(message_href, self.simulate_delete(message_href,
query_string=params, headers=self.headers) query_string=params, headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_403) self.assertEqual(self.srmock.status, falcon.HTTP_400)
# Make sure it wasn't deleted! # Make sure it wasn't deleted!
self.simulate_get(message_href, query_string=params, self.simulate_get(message_href, query_string=params,

View File

@ -470,7 +470,7 @@ class MessagesBaseTest(base.V1_1Base):
self.simulate_delete(location, query_string='claim_id=invalid', self.simulate_delete(location, query_string='claim_id=invalid',
headers=self.headers) headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_204) self.assertEqual(self.srmock.status, falcon.HTTP_400)
self.simulate_get(location, headers=self.headers) self.simulate_get(location, headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_200) self.assertEqual(self.srmock.status, falcon.HTTP_200)