Use queue's name and project to get messages

The MongoDB driver currently queries the queue collection to get the
queue id and then filters messages with it. Instead of storing the id,
this patch stores the queue name and the project in the message
collection and uses them for querying messages.

It's safe to assume that the queue exists when messages are returned by
the query; however, post operations will still verify the queue's
existence.

Fixes bug: #1207018

Change-Id: Iaa7bc3f1300b3349a5cfad5a9f6ecabb2f75e95e
This commit is contained in:
Flaper Fesp
2013-08-01 17:22:42 +02:00
parent 5b1c447ba2
commit a30092c55e
7 changed files with 128 additions and 130 deletions

View File

@@ -64,9 +64,6 @@ class ClaimController(storage.ClaimBase):
def get(self, queue, claim_id, project=None):
msg_ctrl = self.driver.message_controller
# Check whether the queue exists or not
qid = self._get_queue_id(queue, project)
# Base query, always check expire time
now = timeutils.utcnow()
@@ -91,8 +88,9 @@ class ClaimController(storage.ClaimBase):
# Lets get claim's data
# from the first message
# in the iterator
messages = messages(msg_ctrl.claimed(qid, cid, now))
claim = next(messages)
msgs = messages(msg_ctrl.claimed(queue, cid, now,
project=project))
claim = next(msgs)
claim = {
'age': age.seconds,
'ttl': claim.pop('t'),
@@ -101,7 +99,7 @@ class ClaimController(storage.ClaimBase):
except StopIteration:
raise exceptions.ClaimDoesNotExist(cid, queue, project)
return (claim, messages)
return (claim, msgs)
@utils.raises_conn_error
def create(self, queue, metadata, project=None, limit=10):
@@ -124,9 +122,8 @@ class ClaimController(storage.ClaimBase):
"""
msg_ctrl = self.driver.message_controller
# We don't need the qid here but
# we need to verify it exists.
qid = self._get_queue_id(queue, project)
self._get_queue_id(queue, project)
ttl = metadata['ttl']
grace = metadata['grace']
oid = objectid.ObjectId()
@@ -147,7 +144,7 @@ class ClaimController(storage.ClaimBase):
# Get a list of active, not claimed nor expired
# messages that could be claimed.
msgs = msg_ctrl.active(qid, fields={'_id': 1})
msgs = msg_ctrl.active(queue, fields={'_id': 1}, project=project)
msgs = msgs.limit(limit)
messages = iter([])
@@ -177,7 +174,8 @@ class ClaimController(storage.ClaimBase):
# `expires` on messages that would
# expire before claim.
new_values = {'e': message_expires, 't': message_ttl}
msg_ctrl._col.update({'q': qid,
msg_ctrl._col.update({'q': queue,
'p': project,
'e': {'$lt': message_expires},
'c.id': oid},
{'$set': new_values},
@@ -204,9 +202,9 @@ class ClaimController(storage.ClaimBase):
if now > expires:
raise ValueError('New ttl will make the claim expires')
qid = self._get_queue_id(queue, project)
msg_ctrl = self.driver.message_controller
claimed = msg_ctrl.claimed(qid, cid, expires=now, limit=1)
claimed = msg_ctrl.claimed(queue, cid, expires=now,
limit=1, project=project)
try:
next(claimed)
@@ -219,7 +217,7 @@ class ClaimController(storage.ClaimBase):
'e': expires,
}
msg_ctrl._col.update({'q': qid, 'c.id': cid},
msg_ctrl._col.update({'q': queue, 'p': project, 'c.id': cid},
{'$set': {'c': meta}},
upsert=False, multi=True)
@@ -227,7 +225,8 @@ class ClaimController(storage.ClaimBase):
# This sets the expiration time to
# `expires` on messages that would
# expire before claim.
msg_ctrl._col.update({'q': qid,
msg_ctrl._col.update({'q': queue,
'p': project,
'e': {'$lt': expires},
'c.id': cid},
{'$set': {'e': expires, 't': ttl}},
@@ -235,11 +234,5 @@ class ClaimController(storage.ClaimBase):
@utils.raises_conn_error
def delete(self, queue, claim_id, project=None):
try:
qid = self._get_queue_id(queue, project)
except exceptions.QueueDoesNotExist:
# Fail silently on bad queue/project
return
msg_ctrl = self.driver.message_controller
msg_ctrl.unclaim(qid, claim_id)
msg_ctrl.unclaim(queue, claim_id, project=project)

View File

@@ -25,7 +25,6 @@ import collections
import datetime
import time
from bson import objectid
import pymongo.errors
import marconi.openstack.common.log as logging
@@ -75,6 +74,7 @@ class MessageController(storage.MessageBase):
# specific message. (see `get`)
self.active_fields = [
('q', 1),
('p', 1),
('k', 1),
('e', 1),
('c.e', 1),
@@ -87,6 +87,7 @@ class MessageController(storage.MessageBase):
# Index used for claims
self.claimed_fields = [
('q', 1),
('p', 1),
('c.id', 1),
('k', 1),
('c.e', 1),
@@ -106,7 +107,7 @@ class MessageController(storage.MessageBase):
# to miss a message when there is more than one
# producer posting messages to the same queue, in
# parallel.
self._col.ensure_index([('q', 1), ('k', -1)],
self._col.ensure_index([('q', 1), ('p', 1), ('k', -1)],
name='queue_marker',
unique=True,
background=True)
@@ -118,10 +119,10 @@ class MessageController(storage.MessageBase):
def _get_queue_id(self, queue, project=None):
return self._queue_controller._get_id(queue, project)
def _get_queue_ids(self):
return self._queue_controller._get_ids()
def _get_queue_np(self):
return self._queue_controller._get_np()
def _next_marker(self, queue_id):
def _next_marker(self, queue, project=None):
"""Retrieves the next message marker for a given queue.
This helper is used to generate monotonic pagination
@@ -140,11 +141,12 @@ class MessageController(storage.MessageBase):
mitigate race conditions between producer and
observer clients.
:param queue_id: queue ID
:param queue: queue name
:param project: Queue's project
:returns: next message marker as an integer
"""
document = self._col.find_one({'q': queue_id},
document = self._col.find_one({'q': queue, 'p': project},
sort=[('k', -1)],
fields={'k': 1, '_id': 0})
@@ -166,20 +168,21 @@ class MessageController(storage.MessageBase):
time.sleep(seconds)
def _count_expired(self, queue_id):
def _count_expired(self, queue_name, project=None):
"""Counts the number of expired messages in a queue.
:param queue_id: id for the queue to stat
"""
query = {
'q': queue_id,
'p': project,
'q': queue_name,
'e': {'$lte': timeutils.utcnow()},
}
return self._col.find(query).count()
def _remove_expired(self, queue_id):
def _remove_expired(self, queue_name, project):
"""Removes all expired messages except for the most recent
in each queue.
@@ -190,28 +193,32 @@ class MessageController(storage.MessageBase):
Note that expired messages are only removed if their count
exceeds options.CFG.gc_threshold.
:param queue_id: id for the queue from which to remove
:param queue_name: name for the queue from which to remove
expired messages
:param project: Project queue_name belong's too
"""
if options.CFG.gc_threshold <= self._count_expired(queue_id):
expired_msgs = self._count_expired(queue_name, project)
if options.CFG.gc_threshold <= expired_msgs:
# Get the message with the highest marker, and leave
# it in the queue
head = self._col.find_one({'q': queue_id},
sort=[('k', -1)],
fields={'_id': 1})
# NOTE(flaper87): Keep the counter in a separate record and
# lets remove all messages.
head = self._col.find_one({'q': queue_name, 'p': project},
sort=[('k', -1)], fields={'_id': 1})
if head is None:
# Assume queue was just deleted via a parallel request
LOG.warning(_('Queue %s is empty or missing.') % queue_id)
LOG.warning(_('Queue %s is empty or missing.') % queue_name)
return
# NOTE(flaper87): Can we use k instead of
# _id here? The active index will cover
# the previous query and the the remove
# one.
# the previous query and the remove one.
query = {
'q': queue_id,
'p': project,
'q': queue_name,
'e': {'$lte': timeutils.utcnow()},
'_id': {'$ne': head['_id']}
}
@@ -230,17 +237,14 @@ class MessageController(storage.MessageBase):
:param queue: name of the queue to purge
:param project: name of the project to which the queue belongs
"""
try:
qid = self._get_queue_id(queue, project)
self._col.remove({'q': qid}, w=0)
except exceptions.QueueDoesNotExist:
pass
self._col.remove({'q': queue, 'p': project}, w=0)
def _list(self, queue_id, marker=None, echo=False,
client_uuid=None, fields=None, include_claimed=False):
def _list(self, queue_name, marker=None, echo=False, client_uuid=None,
fields=None, include_claimed=False, project=None):
"""Message document listing helper.
:param queue_id: ObjectID of the queue to list
:param queue_name: Name of the queue to list
:param project: Project `queue_name` belongs to.
:param marker: Message marker from which to start iterating
:param echo: Whether to return messages that match client_uuid
:param client_uuid: UUID for the client that originated this request
@@ -254,9 +258,12 @@ class MessageController(storage.MessageBase):
now = timeutils.utcnow()
query = {
# Messages must belong to this queue
'q': queue_id,
# The messages can not be expired
# Messages must belong to this
# queue and project
'p': project,
'q': queue_name,
# The messages cannot be expired
'e': {'$gt': now},
}
@@ -282,26 +289,20 @@ class MessageController(storage.MessageBase):
# Interface
#-----------------------------------------------------------------------
def active(self, queue_id, marker=None, echo=False,
client_uuid=None, fields=None):
def active(self, queue_name, marker=None, echo=False,
client_uuid=None, fields=None, project=None):
# NOTE(kgriffs): Since this is a public method, queue_id
# might not be an ObjectID. Usually it will be, since active()
# is a utility method, so short-circuit for performance.
if not isinstance(queue_id, objectid.ObjectId):
queue_id = utils.to_oid(queue_id)
return self._list(queue_name, marker, echo, client_uuid,
fields, include_claimed=False, project=project)
return self._list(queue_id, marker, echo, client_uuid, fields,
include_claimed=False)
def claimed(self, queue_id, claim_id=None, expires=None, limit=None):
if not isinstance(queue_id, objectid.ObjectId):
queue_id = utils.to_oid(queue_id)
def claimed(self, queue_name, claim_id=None,
expires=None, limit=None, project=None):
query = {
'c.id': claim_id,
'c.e': {'$gt': expires or timeutils.utcnow()},
'q': queue_id,
'q': queue_name,
'p': project,
}
if not claim_id:
@@ -323,18 +324,17 @@ class MessageController(storage.MessageBase):
return utils.HookedCursor(msgs, denormalizer)
def unclaim(self, queue_id, claim_id):
def unclaim(self, queue_name, claim_id, project=None):
try:
qid = utils.to_oid(queue_id)
cid = utils.to_oid(claim_id)
except ValueError:
return
self._col.update({'q': qid, 'c.id': cid},
self._col.update({'q': queue_name, 'p': project, 'c.id': cid},
{'$set': {'c': {'id': None, 'e': 0}}},
upsert=False, multi=True)
def remove_expired(self, project=None):
def remove_expired(self):
"""Removes all expired messages except for the most recent
in each queue.
@@ -359,8 +359,8 @@ class MessageController(storage.MessageBase):
# each message inserted (TBD, may cause problematic side-effect),
# and third, by changing the marker algorithm such that it no
# longer depends on retaining the last message in the queue!
for id in self._get_queue_ids():
self._remove_expired(id)
for name, project in self._get_queue_np():
self._remove_expired(name, project)
def list(self, queue, project=None, marker=None, limit=10,
echo=False, client_uuid=None, include_claimed=False):
@@ -371,9 +371,8 @@ class MessageController(storage.MessageBase):
except ValueError:
raise exceptions.MalformedMarker()
qid = self._get_queue_id(queue, project)
messages = self._list(qid, marker, echo, client_uuid,
include_claimed=include_claimed)
messages = self._list(queue, marker, echo, client_uuid,
include_claimed=include_claimed, project=project)
messages = messages.limit(limit)
marker_id = {}
@@ -393,19 +392,19 @@ class MessageController(storage.MessageBase):
mid = utils.to_oid(message_id)
now = timeutils.utcnow()
# Base query, always check expire time
query = {
'q': self._get_queue_id(queue, project),
'e': {'$gt': now},
'_id': mid
'_id': mid,
'q': queue,
'p': project,
'e': {'$gt': now}
}
message = self._col.find_one(query)
message = list(self._col.find(query).limit(1).hint([('_id', 1)]))
if message is None:
if not message:
raise exceptions.MessageDoesNotExist(message_id, queue, project)
return _basic_message(message, now)
return _basic_message(message[0], now)
@utils.raises_conn_error
def bulk_get(self, queue, message_ids, project=None):
@@ -414,12 +413,15 @@ class MessageController(storage.MessageBase):
# Base query, always check expire time
query = {
'q': self._get_queue_id(queue, project),
'e': {'$gt': now},
'q': queue,
'p': project,
'_id': {'$in': message_ids},
'e': {'$gt': now},
}
messages = self._col.find(query)
# NOTE(flaper87): Should this query
# be sorted?
messages = self._col.find(query).hint([('_id', 1)])
def denormalizer(msg):
return _basic_message(msg, now)
@@ -429,10 +431,12 @@ class MessageController(storage.MessageBase):
@utils.raises_conn_error
def post(self, queue, messages, client_uuid, project=None):
now = timeutils.utcnow()
queue_id = self._get_queue_id(queue, project)
# NOTE(flaper87): We need to assert the queue exists
self._get_queue_id(queue, project)
# Set the next basis marker for the first attempt.
next_marker = self._next_marker(queue_id)
next_marker = self._next_marker(queue, project)
# Results are aggregated across all attempts
# NOTE(kgriffs): lazy instantiation
@@ -447,7 +451,8 @@ class MessageController(storage.MessageBase):
message_gen = (
{
't': message['ttl'],
'q': queue_id,
'q': queue,
'p': project,
'e': now + datetime.timedelta(seconds=message['ttl']),
'u': client_uuid,
'c': {'id': None, 'e': now},
@@ -477,9 +482,9 @@ class MessageController(storage.MessageBase):
if attempt != 0:
message = _('%(attempts)d attempt(s) required to post '
'%(num_messages)d messages to queue '
'%(queue_id)s')
message %= dict(queue_id=queue_id, attempts=attempt + 1,
num_messages=len(ids))
'%(queue_name)s and project %(project)s')
message %= dict(queue_name=queue, attempts=attempt + 1,
num_messages=len(ids), project=project)
LOG.debug(message)
@@ -496,7 +501,7 @@ class MessageController(storage.MessageBase):
# TODO(kgriffs): Add transaction ID to help match up loglines
if attempt == 0:
message = _('First attempt failed while adding messages '
'to queue %s for current request') % queue_id
'to queue %s for current request') % queue
LOG.debug(message)
@@ -530,7 +535,7 @@ class MessageController(storage.MessageBase):
# Retry the remaining messages with a new sequence
# of markers.
prepared_messages = cached_messages[failed_index:]
next_marker = self._next_marker(queue_id)
next_marker = self._next_marker(queue, project)
for index, message in enumerate(prepared_messages):
message['k'] = next_marker + index
@@ -548,7 +553,7 @@ class MessageController(storage.MessageBase):
message = _('Hit maximum number of attempts (%(max)s) for queue '
'%(id)s in project %(project)s')
message %= dict(max=options.CFG.max_attempts, id=queue_id,
message %= dict(max=options.CFG.max_attempts, id=queue,
project=project)
LOG.warning(message)
@@ -562,7 +567,8 @@ class MessageController(storage.MessageBase):
mid = utils.to_oid(message_id)
query = {
'q': self._get_queue_id(queue, project),
'q': queue,
'p': project,
'_id': mid
}
@@ -592,7 +598,8 @@ class MessageController(storage.MessageBase):
try:
message_ids = [utils.to_oid(id) for id in message_ids]
query = {
'q': self._get_queue_id(queue, project),
'q': queue,
'p': project,
'_id': {'$in': message_ids},
}

View File

@@ -74,10 +74,10 @@ class QueueController(storage.QueueBase):
queue = self._get(name, project, fields=['_id'])
return queue.get('_id')
def _get_ids(self):
"""Returns a generator producing a list of all queue IDs."""
cursor = self._col.find({}, fields={'_id': 1})
return (doc['_id'] for doc in cursor)
def _get_np(self):
"""Returns a generator producing a list of all queue (n, p)."""
cursor = self._col.find({}, fields={'n': 1, 'p': 1})
return ((doc['n'], doc['p']) for doc in cursor)
#-----------------------------------------------------------------------
# Interface
@@ -139,10 +139,10 @@ class QueueController(storage.QueueBase):
@utils.raises_conn_error
def stats(self, name, project=None):
queue_id = self._get_id(name, project)
self._get_id(name, project)
controller = self.driver.message_controller
active = controller.active(queue_id)
claimed = controller.claimed(queue_id)
active = controller.active(name, project=project)
claimed = controller.claimed(name, project=project)
return {
'actions': 0,

View File

@@ -92,28 +92,29 @@ class MessageController(base.MessageBase):
with self.driver('deferred'):
sql = '''
select id, content, ttl, julianday() * 86400.0 - created
from Messages
where ttl > julianday() * 86400.0 - created
and qid = ?'''
select M.id, content, ttl, julianday() * 86400.0 - created
from Queues as Q join Messages as M
on M.qid = Q.id
where M.ttl > julianday() * 86400.0 - created
and Q.name = ? and Q.project = ?'''
args = [utils.get_qid(self.driver, queue, project)]
args = [queue, project]
if not echo:
sql += '''
and client != ?'''
and M.client != ?'''
args += [client_uuid]
if marker:
sql += '''
and id > ?'''
and M.id > ?'''
args += [utils.marker_decode(marker)]
if not include_claimed:
sql += '''
and id not in (select msgid
from Claims join Locked
on id = cid)'''
and M.id not in (select msgid
from Claims join Locked
on id = cid)'''
sql += '''
limit ?'''
@@ -146,8 +147,8 @@ class MessageController(base.MessageBase):
self.driver.run('''
delete from Messages
where ttl <= julianday() * 86400.0 - created
and qid = ?''', qid)
where ttl <= julianday() * 86400.0 - created
and qid = ?''', qid)
# executemany() sets lastrowid to None, so no matter we manually
# generate the IDs or not, we still need to query for it.

View File

@@ -157,8 +157,7 @@ class MongodbMessageTests(base.MessageControllerTest):
super(MongodbMessageTests, self).tearDown()
def _count_expired(self, queue, project=None):
queue_id = self.queue_controller._get_id(queue, project)
return self.controller._count_expired(queue_id)
return self.controller._count_expired(queue, project)
def test_indexes(self):
col = self.controller._col
@@ -172,16 +171,15 @@ class MongodbMessageTests(base.MessageControllerTest):
iterations = 10
self.queue_controller.create(queue_name)
queue_id = self.queue_controller._get_id(queue_name)
seed_marker1 = self.controller._next_marker(queue_name)
self.assertEqual(seed_marker1, 1, 'First marker is 1')
for i in range(iterations):
self.controller.post(queue_name, [{'ttl': 60}], 'uuid')
marker1 = self.controller._next_marker(queue_id)
marker2 = self.controller._next_marker(queue_id)
marker3 = self.controller._next_marker(queue_id)
marker1 = self.controller._next_marker(queue_name)
marker2 = self.controller._next_marker(queue_name)
marker3 = self.controller._next_marker(queue_name)
self.assertEqual(marker1, marker2)
self.assertEqual(marker2, marker3)
@@ -239,8 +237,7 @@ class MongodbMessageTests(base.MessageControllerTest):
# Sanity-check that the most recent message is the
# one remaining in the queue.
queue = random.choice(queue_names)
queue_id = self.queue_controller._get_id(queue, project)
message = self.driver.db.messages.find_one({'q': queue_id})
message = self.driver.db.messages.find_one({'q': queue, 'p': project})
self.assertEquals(message['k'], messages_per_queue)

View File

@@ -160,7 +160,7 @@ class MessagesBaseTest(base.TestBase):
self.assertEquals(self.srmock.status, falcon.HTTP_204)
self.simulate_get(target, self.project_id, query_string=params)
self.assertEquals(self.srmock.status, falcon.HTTP_404)
self.assertEquals(self.srmock.status, falcon.HTTP_204)
# Safe to delete non-existing ones
self.simulate_delete(target, self.project_id, query_string=params)
@@ -214,7 +214,7 @@ class MessagesBaseTest(base.TestBase):
self.simulate_get('/v1/queues/nonexistent/messages', self.project_id,
headers=self.headers)
self.assertEquals(self.srmock.status, falcon.HTTP_404)
self.assertEquals(self.srmock.status, falcon.HTTP_204)
def test_list_with_bad_marker(self):
path = self.queue_path + '/messages'

View File

@@ -55,7 +55,7 @@ class CollectionResource(object):
# Prepare response
messages = list(messages)
if not messages:
raise falcon.HTTPNotFound()
return None
base_path += '/'
for each_message in messages:
@@ -200,14 +200,14 @@ class CollectionResource(object):
ids = req.get_param_as_list('ids')
if ids is None:
response = self._get(req, project_id, queue_name)
if response is None:
resp.status = falcon.HTTP_204
return
else:
base_path = req.path + '/messages'
response = self._get_by_id(base_path, project_id, queue_name, ids)
if response is None:
resp.status = falcon.HTTP_204
return
resp.body = helpers.to_json(response)
def on_delete(self, req, resp, project_id, queue_name):