From b10761d456672f94c72e261df816e4c6c1cf0bae Mon Sep 17 00:00:00 2001 From: Zhihao Yuan Date: Tue, 23 Apr 2013 10:54:54 -0400 Subject: [PATCH] Support list queue pagination in storage. Change-Id: Ifc87f84518593ce25a9b18fd62dbe4d5cd39006f Implements: blueprint storage-reference Implements: blueprint storage-mongodb --- marconi/storage/mongodb/controllers.py | 30 ++++++++++++++++----- marconi/storage/sqlite/controllers.py | 37 +++++++++++++++++++++----- marconi/tests/storage/base.py | 26 +++++++++++------- marconi/transport/wsgi/queues.py | 4 +-- 4 files changed, 74 insertions(+), 23 deletions(-) diff --git a/marconi/storage/mongodb/controllers.py b/marconi/storage/mongodb/controllers.py index ff9c17ea5..5d943ab92 100644 --- a/marconi/storage/mongodb/controllers.py +++ b/marconi/storage/mongodb/controllers.py @@ -55,12 +55,30 @@ class QueueController(storage.QueueBase): # as specific tenant, for example. Order Matters! self._col.ensure_index([("t", 1), ("n", 1)], unique=True) - def list(self, tenant=None): - cursor = self._col.find({"t": tenant}, fields=dict(n=1, m=1, _id=0)) - for queue in cursor: - queue["name"] = queue.pop("n") - queue["metadata"] = queue.pop("m", {}) - yield queue + def list(self, tenant=None, marker=None, + limit=10, detailed=False): + query = {"t": tenant} + if marker: + query["n"] = {"$gt": marker} + + fields = {"n": 1, "_id": 0} + if detailed: + fields["m"] = 1 + + cursor = self._col.find(query, fields=fields) + cursor = cursor.limit(limit).sort("n") + marker_name = {} + + def normalizer(records): + for rec in records: + queue = {"name": rec["n"]} + marker_name["next"] = queue["name"] + if detailed: + queue["metadata"] = rec["m"] + yield queue + + yield normalizer(cursor) + yield marker_name["next"] def _get(self, name, tenant=None, fields={"m": 1, "_id": 0}): queue = self._col.find_one({"t": tenant, "n": name}, fields=fields) diff --git a/marconi/storage/sqlite/controllers.py b/marconi/storage/sqlite/controllers.py index cbefd5327..aa82c8099 100644 --- a/marconi/storage/sqlite/controllers.py +++ b/marconi/storage/sqlite/controllers.py @@ -34,13 +34,38 @@ class Queue(base.QueueBase): ) ''') - def list(self, tenant): - records = self.driver.run(''' - select name, metadata from Queues - where tenant = ?''', tenant) + def list(self, tenant, marker=None, + limit=10, detailed=False): + sql = ((''' + select name from Queues''' if not detailed + else ''' + select name, metadata from Queues''') + + ''' + where tenant = ?''') + args = [tenant] - for k, v in records: - yield {'name': k, 'metadata': v} + if marker: + sql += ''' + and name > ?''' + args += [marker] + + sql += ''' + order by name + limit ?''' + args += [limit] + + records = self.driver.run(sql, *args) + marker_name = {} + + def it(): + for rec in records: + marker_name['next'] = rec[0] + yield ({'name': rec[0]} if not detailed + else + {'name': rec[0], 'metadata': rec[1]}) + + yield it() + yield marker_name['next'] def get(self, name, tenant): try: diff --git a/marconi/tests/storage/base.py b/marconi/tests/storage/base.py index f7a54712e..64eb4872f 100644 --- a/marconi/tests/storage/base.py +++ b/marconi/tests/storage/base.py @@ -50,19 +50,27 @@ class QueueControllerTest(ControllerBaseTest): self.claim_controller = self.driver.claim_controller def test_list(self): - num = 4 + num = 15 for queue in xrange(num): self.controller.upsert(queue, {}, tenant=self.tenant) - queues = self.controller.list(tenant=self.tenant) + interaction = self.controller.list(tenant=self.tenant, + detailed=True) + queues = list(interaction.next()) - counter = 0 - for queue in queues: - self.assertEqual(len(queue), 2) - self.assertIn("name", queue) - self.assertIn("metadata", queue) - counter += 1 - self.assertEqual(counter, num) + self.assertEquals(all(map(lambda queue: + 'name' in queue and + 'metadata' in queue, queues)), True) + self.assertEquals(len(queues), 10) + + interaction = self.controller.list(tenant=self.tenant, + marker=interaction.next()) + queues = list(interaction.next()) + + self.assertEquals(all(map(lambda queue: + 'name' in queue and + 'metadata' not in queue, queues)), True) + self.assertEquals(len(queues), 5) def test_queue_lifecycle(self): # Test Queue Creation diff --git a/marconi/transport/wsgi/queues.py b/marconi/transport/wsgi/queues.py index 931e5d6cd..a36bcbc5a 100644 --- a/marconi/transport/wsgi/queues.py +++ b/marconi/transport/wsgi/queues.py @@ -100,9 +100,9 @@ class CollectionResource(object): def on_get(self, req, resp, tenant_id): resp_dict = {} try: - queues = self.queue_ctrl.list(tenant_id) + interaction = self.queue_ctrl.list(tenant_id, detailed=True) - resp_dict['queues'] = list(queues) + resp_dict['queues'] = list(interaction.next()) for queue in resp_dict['queues']: queue['href'] = req.path + '/' + queue['name']