Get messages by id

This patch modifies the "get" method in the storage drivers' message
controller to allow querying a set of messages by id.

In the next patch, the WSGI transport driver will be similarly
modified to allow querying for multiple messages by ID, through
the API.

Change-Id: I3bd183438a3eaf7af6ca52a19bb804c47ac1f245
Implements: blueprint v1-api
This commit is contained in:
kgriffs
2013-06-17 18:29:10 -04:00
parent 60803d84e2
commit a262a658d3
6 changed files with 69 additions and 46 deletions

View File

@@ -182,15 +182,16 @@ class MessageBase(ControllerBase):
""" """
raise NotImplementedError raise NotImplementedError
def get(self, queue, message_id, project=None): def get(self, queue, message_ids, project=None):
"""Base method for getting a message. """Base method for getting a message.
:param queue: Name of the queue to get the :param queue: Name of the queue to get the
message from. message from.
:param project: Project id :param project: Project id
:param message_id: Message ID :param message_ids: One or more message IDs. Can be a single
string ID or a list of IDs.
:returns: Dictionary containing message data :returns: An iterable, yielding dicts containing message details
:raises: DoesNotExist :raises: DoesNotExist
""" """
raise NotImplementedError raise NotImplementedError

View File

@@ -482,31 +482,34 @@ class MessageController(storage.MessageBase):
yield utils.HookedCursor(messages, denormalizer) yield utils.HookedCursor(messages, denormalizer)
yield str(marker_id['next']) yield str(marker_id['next'])
def get(self, queue, message_id, project=None): def get(self, queue, message_ids, project=None):
mid = utils.to_oid(message_id) if not isinstance(message_ids, list):
message_ids = [message_ids]
message_ids = [utils.to_oid(id) for id in message_ids]
now = timeutils.utcnow() now = timeutils.utcnow()
# Base query, always check expire time # Base query, always check expire time
query = { query = {
'q': self._get_queue_id(queue, project), 'q': self._get_queue_id(queue, project),
'e': {'$gt': now}, 'e': {'$gt': now},
'_id': mid '_id': {'$in': message_ids},
} }
message = self._col.find_one(query) messages = self._col.find(query)
if message is None: def denormalizer(msg):
raise exceptions.MessageDoesNotExist(message_id, queue, project) oid = msg['_id']
age = now - utils.oid_utc(oid)
oid = message['_id'] return {
age = now - utils.oid_utc(oid) 'id': str(oid),
'age': age.seconds,
'ttl': msg['t'],
'body': msg['b'],
}
return { return utils.HookedCursor(messages, denormalizer)
'id': str(oid),
'age': age.seconds,
'ttl': message['t'],
'body': message['b'],
}
def post(self, queue, messages, client_uuid, project=None): def post(self, queue, messages, client_uuid, project=None):
now = timeutils.utcnow() now = timeutils.utcnow()

View File

@@ -143,26 +143,30 @@ class Message(base.MessageBase):
) )
''') ''')
def get(self, queue, message_id, project): def get(self, queue, message_ids, project):
try: if not isinstance(message_ids, list):
content, ttl, age = self.driver.get(''' message_ids = [message_ids]
select content, ttl, julianday() * 86400.0 - created
from Queues as Q join Messages as M
on qid = Q.id
where ttl > julianday() * 86400.0 - created
and M.id = ? and project = ? and name = ?
''', _msgid_decode(message_id), project, queue)
return { message_ids = ["'%s'" % _msgid_decode(id) for id in message_ids]
'id': message_id, message_ids = ','.join(message_ids)
sql = '''
select M.id, content, ttl, julianday() * 86400.0 - created
from Queues as Q join Messages as M
on qid = Q.id
where ttl > julianday() * 86400.0 - created
and M.id in (%s) and project = ? and name = ?
''' % message_ids
records = self.driver.run(sql, project, queue)
for id, content, ttl, age in records:
yield {
'id': id,
'ttl': ttl, 'ttl': ttl,
'age': int(age), 'age': int(age),
'body': content, 'body': content,
} }
except _NoResult:
raise exceptions.MessageDoesNotExist(message_id, queue, project)
def list(self, queue, project, marker=None, def list(self, queue, project, marker=None,
limit=10, echo=False, client_uuid=None): limit=10, echo=False, client_uuid=None):
@@ -446,7 +450,11 @@ def _get_qid(driver, queue, project):
# come with no special functionalities. # come with no special functionalities.
def _msgid_encode(id): def _msgid_encode(id):
return hex(id ^ 0x5c693a53)[2:] try:
return hex(id ^ 0x5c693a53)[2:]
except TypeError:
raise exceptions.MalformedID()
def _msgid_decode(id): def _msgid_decode(id):

View File

@@ -154,11 +154,10 @@ class MessageControllerTest(ControllerBaseTest):
# Test Message Deletion # Test Message Deletion
self.controller.delete(queue_name, created[0], project=self.project) self.controller.delete(queue_name, created[0], project=self.project)
# Test DoesNotExist # Test does not exist
self.assertRaises(storage.exceptions.DoesNotExist, messages = self.controller.get(queue_name, message_ids=created,
self.controller.get, project=self.project)
queue_name, message_id=created[0], self.assertRaises(StopIteration, messages.next)
project=self.project)
def test_get_multi(self): def test_get_multi(self):
_insert_fixtures(self.controller, self.queue_name, _insert_fixtures(self.controller, self.queue_name,
@@ -187,6 +186,18 @@ class MessageControllerTest(ControllerBaseTest):
load_messages(5, self.queue_name, echo=True, project=self.project, load_messages(5, self.queue_name, echo=True, project=self.project,
marker=interaction.next(), client_uuid='my_uuid') marker=interaction.next(), client_uuid='my_uuid')
def test_get_multi_by_id(self):
messages_in = [{'ttl': 120, 'body': 0}, {'ttl': 240, 'body': 1}]
ids = self.controller.post(self.queue_name, messages_in,
project=self.project,
client_uuid='my_uuid')
messages_out = self.controller.get(self.queue_name, ids,
project=self.project)
for idx, message in enumerate(messages_out):
self.assertEquals(message['body'], idx)
def test_claim_effects(self): def test_claim_effects(self):
_insert_fixtures(self.controller, self.queue_name, _insert_fixtures(self.controller, self.queue_name,
project=self.project, client_uuid='my_uuid', num=12) project=self.project, client_uuid='my_uuid', num=12)
@@ -210,9 +221,9 @@ class MessageControllerTest(ControllerBaseTest):
project=self.project, project=self.project,
claim=cid) claim=cid)
with testing.expect(storage.exceptions.DoesNotExist): with testing.expect(StopIteration):
self.controller.get(self.queue_name, msg1['id'], self.controller.get(self.queue_name, msg1['id'],
project=self.project) project=self.project).next()
# Make sure such a deletion is idempotent # Make sure such a deletion is idempotent
self.controller.delete(self.queue_name, msg1['id'], self.controller.delete(self.queue_name, msg1['id'],
@@ -235,9 +246,9 @@ class MessageControllerTest(ControllerBaseTest):
project=self.project, project=self.project,
client_uuid='my_uuid') client_uuid='my_uuid')
with testing.expect(storage.exceptions.DoesNotExist): with testing.expect(StopIteration):
self.controller.get(self.queue_name, msgid, self.controller.get(self.queue_name, msgid,
project=self.project) project=self.project).next()
countof = self.queue_controller.stats(self.queue_name, countof = self.queue_controller.stats(self.queue_name,
project=self.project) project=self.project)
@@ -261,7 +272,7 @@ class MessageControllerTest(ControllerBaseTest):
self.controller.delete(queue, bad_message_id, project) self.controller.delete(queue, bad_message_id, project)
with testing.expect(exceptions.MalformedID): with testing.expect(exceptions.MalformedID):
self.controller.get(queue, bad_message_id, project) self.controller.get(queue, bad_message_id, project).next()
def test_bad_claim_id(self): def test_bad_claim_id(self):
self.queue_controller.upsert('unused', {}, '480924') self.queue_controller.upsert('unused', {}, '480924')

View File

@@ -58,7 +58,7 @@ class MessageController(storage.MessageBase):
def __init__(self, driver): def __init__(self, driver):
pass pass
def get(self, queue, project=None, message_id=None, def get(self, queue, project=None, message_ids=None,
marker=None, echo=False, client_uuid=None): marker=None, echo=False, client_uuid=None):
raise NotImplementedError() raise NotImplementedError()

View File

@@ -170,10 +170,10 @@ class ItemResource(object):
try: try:
message = self.message_controller.get( message = self.message_controller.get(
queue_name, queue_name,
message_id=message_id, message_id,
project=project_id) project=project_id).next()
except storage_exceptions.DoesNotExist: except StopIteration:
raise falcon.HTTPNotFound() raise falcon.HTTPNotFound()
except Exception as ex: except Exception as ex:
LOG.exception(ex) LOG.exception(ex)