Queue stats in MongoDB.

The patch implements stats methods for mongodb's queue controller and
uses the `MessageController.active` method to get valid messages present
in the queue.

Implements: blueprint storage-mongodb

Change-Id: I8a2d32ebeef9a8dd4b2131b522f026d9c5d0cd6e
This commit is contained in:
Zhihao Yuan
2013-04-04 19:15:41 -04:00
committed by Flaper Fesp
parent e8eb8d1d78
commit bcf2ee589c
2 changed files with 48 additions and 38 deletions

View File

@@ -93,7 +93,15 @@ class QueueController(storage.QueueBase):
self._col.remove({"t": tenant, "n": name})
def stats(self, name, tenant=None):
raise NotImplementedError
msg_ctrl = self.driver.message_controller
# NOTE(flaper87): Should we split this into
# total, active and expired ?
msgs = msg_ctrl.active(name, tenant=tenant).count()
return {
"actions": 0,
"messages": msgs
}
def actions(self, name, tenant=None, marker=None, limit=10):
raise NotImplementedError
@@ -128,8 +136,7 @@ class MessageController(storage.MessageBase):
return queue_controller.get_id(queue, tenant)
def active(self, queue, tenant=None, marker=None,
limit=10, echo=False, client_uuid=None,
fields=None):
echo=False, client_uuid=None, fields=None):
now = timeutils.utcnow_ts()
query = {"$or": [
@@ -157,9 +164,7 @@ class MessageController(storage.MessageBase):
if fields and not isinstance(fields, (dict, list)):
raise TypeError(_("Fields must be an instance of list / dict"))
return self._col.find(query, limit=limit,
sort=[("_id", 1)],
fields=fields)
return self._col.find(query, fields=fields)
def claimed(self, claim_id=None, expires=None, limit=None):
@@ -197,8 +202,8 @@ class MessageController(storage.MessageBase):
def list(self, queue, tenant=None, marker=None,
limit=10, echo=False, client_uuid=None):
messages = self.active(queue, tenant, marker,
limit, echo, client_uuid)
messages = self.active(queue, tenant, marker, echo, client_uuid)
messages = messages.limit(limit).sort("_id")
now = timeutils.utcnow_ts()
for msg in messages:
@@ -375,11 +380,14 @@ class ClaimController(storage.ClaimBase):
# Get a list of active, not claimed nor expired
# messages that could be claimed.
msgs = msg_ctrl.active(queue, tenant=tenant,
limit=limit, fields={"_id": 1})
msgs = msg_ctrl.active(queue, tenant=tenant, fields={"_id": 1})
msgs = msgs.limit(limit).sort("_id")
messages = iter([])
if msgs.count() == 0:
# Lets respect the limit
# during the count
if msgs.count(True) == 0:
return (str(oid), messages)
ids = [msg["_id"] for msg in msgs]

View File

@@ -44,6 +44,10 @@ class QueueControllerTest(ControllerBaseTest):
"""
controller_base_class = storage.QueueBase
def setUp(self):
super(QueueControllerTest, self).setUp()
self.message_controller = self.driver.message_controller
def test_list(self):
num = 4
for queue in xrange(num):
@@ -78,6 +82,13 @@ class QueueControllerTest(ControllerBaseTest):
queue = self.controller.get("test", tenant=self.tenant)
self.assertEqual(queue["meta"], "test_meta")
# Test Queue Statistic
_insert_fixtures(self.message_controller, "test",
tenant=self.tenant, client_uuid="my_uuid", num=12)
stats = self.controller.stats("test", tenant=self.tenant)
self.assertEqual(stats['messages'], 12)
# Test Queue Deletion
self.controller.delete("test", tenant=self.tenant)
@@ -110,18 +121,6 @@ class MessageControllerTest(ControllerBaseTest):
self.queue_controller.delete(self.queue_name)
super(MessageControllerTest, self).tearDown()
def insert_fixtures(self, client_uuid=None, num=4):
def messages():
for n in xrange(num):
yield {
"ttl": 60,
"body": {
"event": "Event number %s" % n
}}
self.controller.post(self.queue_name, messages(),
tenant=self.tenant, client_uuid=client_uuid)
def test_message_lifecycle(self):
queue_name = self.queue_name
@@ -153,7 +152,8 @@ class MessageControllerTest(ControllerBaseTest):
tenant=self.tenant)
def test_qet_multi(self):
self.insert_fixtures(client_uuid="my_uuid", num=20)
_insert_fixtures(self.controller, self.queue_name,
tenant=self.tenant, client_uuid="my_uuid", num=20)
def load_messages(expected, *args, **kwargs):
msgs = list(self.controller.list(*args, **kwargs))
@@ -207,21 +207,9 @@ class ClaimControllerTest(ControllerBaseTest):
self.queue_controller.delete(self.queue_name)
super(ClaimControllerTest, self).tearDown()
def insert_fixtures(self, client_uuid=None, num=4):
def messages():
for n in xrange(num):
yield {
"ttl": 60 + num,
"body": {
"event": "Event number %s" % n
}}
self.message_controller.post(self.queue_name, messages(),
tenant=self.tenant,
client_uuid=client_uuid)
def test_claim_lifecycle(self):
self.insert_fixtures(client_uuid="my_uuid", num=20)
_insert_fixtures(self.message_controller, self.queue_name,
tenant=self.tenant, client_uuid="my_uuid", num=20)
meta = {"ttl": 70}
@@ -264,3 +252,17 @@ class ClaimControllerTest(ControllerBaseTest):
self.assertRaises(storage.exceptions.ClaimDoesNotExist,
self.controller.get, self.queue_name,
claim_id, tenant=self.tenant)
def _insert_fixtures(controller, queue_name, tenant=None,
client_uuid=None, num=4):
def messages():
for n in xrange(num):
yield {
"ttl": 60,
"body": {
"event": "Event number %s" % n
}}
controller.post(queue_name, messages(),
tenant=tenant, client_uuid=client_uuid)