From bcf2ee589ca82a0c7bdedcb5406829c9e313e037 Mon Sep 17 00:00:00 2001 From: Zhihao Yuan Date: Thu, 4 Apr 2013 19:15:41 -0400 Subject: [PATCH] Queue stats in MongoDB. The patch implements stats methods for mongodb's queue controller and uses the `MessageController.active` method to get valid messages present in the queue. Implements: blueprint storage-mongodb Change-Id: I8a2d32ebeef9a8dd4b2131b522f026d9c5d0cd6e --- marconi/storage/mongodb/controllers.py | 30 +++++++++----- marconi/tests/storage/base.py | 56 +++++++++++++------------- 2 files changed, 48 insertions(+), 38 deletions(-) diff --git a/marconi/storage/mongodb/controllers.py b/marconi/storage/mongodb/controllers.py index a5e17e6ac..6621d9376 100644 --- a/marconi/storage/mongodb/controllers.py +++ b/marconi/storage/mongodb/controllers.py @@ -93,7 +93,15 @@ class QueueController(storage.QueueBase): self._col.remove({"t": tenant, "n": name}) def stats(self, name, tenant=None): - raise NotImplementedError + msg_ctrl = self.driver.message_controller + # NOTE(flaper87): Should we split this into + # total, active and expired ? + msgs = msg_ctrl.active(name, tenant=tenant).count() + + return { + "actions": 0, + "messages": msgs + } def actions(self, name, tenant=None, marker=None, limit=10): raise NotImplementedError @@ -128,8 +136,7 @@ class MessageController(storage.MessageBase): return queue_controller.get_id(queue, tenant) def active(self, queue, tenant=None, marker=None, - limit=10, echo=False, client_uuid=None, - fields=None): + echo=False, client_uuid=None, fields=None): now = timeutils.utcnow_ts() query = {"$or": [ @@ -157,9 +164,7 @@ class MessageController(storage.MessageBase): if fields and not isinstance(fields, (dict, list)): raise TypeError(_("Fields must be an instance of list / dict")) - return self._col.find(query, limit=limit, - sort=[("_id", 1)], - fields=fields) + return self._col.find(query, fields=fields) def claimed(self, claim_id=None, expires=None, limit=None): @@ -197,8 +202,8 @@ class MessageController(storage.MessageBase): def list(self, queue, tenant=None, marker=None, limit=10, echo=False, client_uuid=None): - messages = self.active(queue, tenant, marker, - limit, echo, client_uuid) + messages = self.active(queue, tenant, marker, echo, client_uuid) + messages = messages.limit(limit).sort("_id") now = timeutils.utcnow_ts() for msg in messages: @@ -375,11 +380,14 @@ class ClaimController(storage.ClaimBase): # Get a list of active, not claimed nor expired # messages that could be claimed. - msgs = msg_ctrl.active(queue, tenant=tenant, - limit=limit, fields={"_id": 1}) + msgs = msg_ctrl.active(queue, tenant=tenant, fields={"_id": 1}) + msgs = msgs.limit(limit).sort("_id") messages = iter([]) - if msgs.count() == 0: + + # Lets respect the limit + # during the count + if msgs.count(True) == 0: return (str(oid), messages) ids = [msg["_id"] for msg in msgs] diff --git a/marconi/tests/storage/base.py b/marconi/tests/storage/base.py index 825cfb11b..c34eab0a4 100644 --- a/marconi/tests/storage/base.py +++ b/marconi/tests/storage/base.py @@ -44,6 +44,10 @@ class QueueControllerTest(ControllerBaseTest): """ controller_base_class = storage.QueueBase + def setUp(self): + super(QueueControllerTest, self).setUp() + self.message_controller = self.driver.message_controller + def test_list(self): num = 4 for queue in xrange(num): @@ -78,6 +82,13 @@ class QueueControllerTest(ControllerBaseTest): queue = self.controller.get("test", tenant=self.tenant) self.assertEqual(queue["meta"], "test_meta") + # Test Queue Statistic + _insert_fixtures(self.message_controller, "test", + tenant=self.tenant, client_uuid="my_uuid", num=12) + + stats = self.controller.stats("test", tenant=self.tenant) + self.assertEqual(stats['messages'], 12) + # Test Queue Deletion self.controller.delete("test", tenant=self.tenant) @@ -110,18 +121,6 @@ class MessageControllerTest(ControllerBaseTest): self.queue_controller.delete(self.queue_name) super(MessageControllerTest, self).tearDown() - def insert_fixtures(self, client_uuid=None, num=4): - - def messages(): - for n in xrange(num): - yield { - "ttl": 60, - "body": { - "event": "Event number %s" % n - }} - self.controller.post(self.queue_name, messages(), - tenant=self.tenant, client_uuid=client_uuid) - def test_message_lifecycle(self): queue_name = self.queue_name @@ -153,7 +152,8 @@ class MessageControllerTest(ControllerBaseTest): tenant=self.tenant) def test_qet_multi(self): - self.insert_fixtures(client_uuid="my_uuid", num=20) + _insert_fixtures(self.controller, self.queue_name, + tenant=self.tenant, client_uuid="my_uuid", num=20) def load_messages(expected, *args, **kwargs): msgs = list(self.controller.list(*args, **kwargs)) @@ -207,21 +207,9 @@ class ClaimControllerTest(ControllerBaseTest): self.queue_controller.delete(self.queue_name) super(ClaimControllerTest, self).tearDown() - def insert_fixtures(self, client_uuid=None, num=4): - - def messages(): - for n in xrange(num): - yield { - "ttl": 60 + num, - "body": { - "event": "Event number %s" % n - }} - self.message_controller.post(self.queue_name, messages(), - tenant=self.tenant, - client_uuid=client_uuid) - def test_claim_lifecycle(self): - self.insert_fixtures(client_uuid="my_uuid", num=20) + _insert_fixtures(self.message_controller, self.queue_name, + tenant=self.tenant, client_uuid="my_uuid", num=20) meta = {"ttl": 70} @@ -264,3 +252,17 @@ class ClaimControllerTest(ControllerBaseTest): self.assertRaises(storage.exceptions.ClaimDoesNotExist, self.controller.get, self.queue_name, claim_id, tenant=self.tenant) + + +def _insert_fixtures(controller, queue_name, tenant=None, + client_uuid=None, num=4): + + def messages(): + for n in xrange(num): + yield { + "ttl": 60, + "body": { + "event": "Event number %s" % n + }} + controller.post(queue_name, messages(), + tenant=tenant, client_uuid=client_uuid)