chore: remove queue_id cruft in mongo driver
We store queue name in message schema; there is no need to query for queue ID. Some `queue_id` param are left in comments, this patch replaces them with `queue_name`. Change-Id: I260007f0d6091b46755f374d9e237c0a7ab372ef
This commit is contained in:
@@ -56,10 +56,6 @@ class ClaimController(storage.ClaimBase):
|
||||
the claim id and it's expiration timestamp.
|
||||
"""
|
||||
|
||||
def _get_queue_id(self, queue, project):
|
||||
queue_controller = self.driver.queue_controller
|
||||
return queue_controller._get_id(queue, project)
|
||||
|
||||
@utils.raises_conn_error
|
||||
def get(self, queue, claim_id, project=None):
|
||||
msg_ctrl = self.driver.message_controller
|
||||
@@ -122,7 +118,8 @@ class ClaimController(storage.ClaimBase):
|
||||
"""
|
||||
msg_ctrl = self.driver.message_controller
|
||||
|
||||
self._get_queue_id(queue, project)
|
||||
if not self.driver.queue_controller.exists(queue, project):
|
||||
raise exceptions.QueueDoesNotExist(queue, project)
|
||||
|
||||
ttl = metadata['ttl']
|
||||
grace = metadata['grace']
|
||||
|
||||
@@ -42,7 +42,7 @@ class MessageController(storage.MessageBase):
|
||||
Messages:
|
||||
Name Field
|
||||
-----------------
|
||||
queue_id -> q
|
||||
queue_name -> q
|
||||
expires -> e
|
||||
ttl -> t
|
||||
uuid -> u
|
||||
@@ -115,12 +115,6 @@ class MessageController(storage.MessageBase):
|
||||
# Helpers
|
||||
#-----------------------------------------------------------------------
|
||||
|
||||
def _get_queue_id(self, queue, project=None):
|
||||
return self._queue_controller._get_id(queue, project)
|
||||
|
||||
def _get_queue_np(self):
|
||||
return self._queue_controller._get_np()
|
||||
|
||||
def _next_marker(self, queue_name, project=None):
|
||||
"""Retrieves the next message marker for a given queue.
|
||||
|
||||
@@ -169,7 +163,7 @@ class MessageController(storage.MessageBase):
|
||||
def _count_expired(self, queue_name, project=None):
|
||||
"""Counts the number of expired messages in a queue.
|
||||
|
||||
:param queue_id: id for the queue to stat
|
||||
:param queue_name: Name of the queue to stat
|
||||
"""
|
||||
|
||||
query = {
|
||||
@@ -315,7 +309,7 @@ class MessageController(storage.MessageBase):
|
||||
def first(self, queue_name, project=None, sort=1):
|
||||
"""Get first message in the queue (including claimed).
|
||||
|
||||
:param queue_id: ObjectID of the queue to list
|
||||
:param queue_name: Name of the queue to list
|
||||
:param sort: (Default 1) Sort order for the listing. Pass 1 for
|
||||
ascending (oldest message first), or -1 for descending (newest
|
||||
message first).
|
||||
@@ -407,7 +401,7 @@ class MessageController(storage.MessageBase):
|
||||
# each message inserted (TBD, may cause problematic side-effect),
|
||||
# and third, by changing the marker algorithm such that it no
|
||||
# longer depends on retaining the last message in the queue!
|
||||
for name, project in self._get_queue_np():
|
||||
for name, project in self._queue_controller._get_np():
|
||||
self._remove_expired(name, project)
|
||||
|
||||
def list(self, queue_name, project=None, marker=None, limit=10,
|
||||
@@ -492,8 +486,8 @@ class MessageController(storage.MessageBase):
|
||||
def post(self, queue_name, messages, client_uuid, project=None):
|
||||
now = timeutils.utcnow()
|
||||
|
||||
# NOTE(flaper87): We need to assert the queue exists
|
||||
self._get_queue_id(queue_name, project)
|
||||
if not self._queue_controller.exists(queue_name, project):
|
||||
raise exceptions.QueueDoesNotExist(queue_name, project)
|
||||
|
||||
# Set the next basis marker for the first attempt.
|
||||
next_marker = self._next_marker(queue_name, project)
|
||||
|
||||
@@ -67,14 +67,6 @@ class QueueController(storage.QueueBase):
|
||||
|
||||
return queue
|
||||
|
||||
def _get_id(self, name, project=None):
|
||||
"""Just like the `get` method, but only returns the queue's id
|
||||
|
||||
:returns: Queue's `ObjectId`
|
||||
"""
|
||||
queue = self._get(name, project, fields=['_id'])
|
||||
return queue.get('_id')
|
||||
|
||||
def _get_np(self):
|
||||
"""Returns a generator producing a list of all queue (n, p)."""
|
||||
cursor = self._col.find({}, fields={'n': 1, 'p': 1})
|
||||
|
||||
@@ -122,13 +122,12 @@ class MongodbQueueTests(base.QueueControllerTest):
|
||||
def test_messages_purged(self):
|
||||
queue_name = 'test'
|
||||
self.controller.create(queue_name)
|
||||
qid = self.controller._get_id(queue_name)
|
||||
self.message_controller.post(queue_name,
|
||||
[{'ttl': 60}],
|
||||
1234)
|
||||
self.controller.delete(queue_name)
|
||||
col = self.message_controller._col
|
||||
self.assertEqual(col.find({'q': qid}).count(), 0)
|
||||
self.assertEqual(col.find({'q': queue_name}).count(), 0)
|
||||
|
||||
def test_raises_connection_error(self):
|
||||
|
||||
|
||||
Reference in New Issue
Block a user