Support list queue pagination in storage.
Change-Id: Ifc87f84518593ce25a9b18fd62dbe4d5cd39006f Implements: blueprint storage-reference Implements: blueprint storage-mongodb
This commit is contained in:
@@ -55,12 +55,30 @@ class QueueController(storage.QueueBase):
|
|||||||
# as specific tenant, for example. Order Matters!
|
# as specific tenant, for example. Order Matters!
|
||||||
self._col.ensure_index([("t", 1), ("n", 1)], unique=True)
|
self._col.ensure_index([("t", 1), ("n", 1)], unique=True)
|
||||||
|
|
||||||
def list(self, tenant=None):
|
def list(self, tenant=None, marker=None,
|
||||||
cursor = self._col.find({"t": tenant}, fields=dict(n=1, m=1, _id=0))
|
limit=10, detailed=False):
|
||||||
for queue in cursor:
|
query = {"t": tenant}
|
||||||
queue["name"] = queue.pop("n")
|
if marker:
|
||||||
queue["metadata"] = queue.pop("m", {})
|
query["n"] = {"$gt": marker}
|
||||||
yield queue
|
|
||||||
|
fields = {"n": 1, "_id": 0}
|
||||||
|
if detailed:
|
||||||
|
fields["m"] = 1
|
||||||
|
|
||||||
|
cursor = self._col.find(query, fields=fields)
|
||||||
|
cursor = cursor.limit(limit).sort("n")
|
||||||
|
marker_name = {}
|
||||||
|
|
||||||
|
def normalizer(records):
|
||||||
|
for rec in records:
|
||||||
|
queue = {"name": rec["n"]}
|
||||||
|
marker_name["next"] = queue["name"]
|
||||||
|
if detailed:
|
||||||
|
queue["metadata"] = rec["m"]
|
||||||
|
yield queue
|
||||||
|
|
||||||
|
yield normalizer(cursor)
|
||||||
|
yield marker_name["next"]
|
||||||
|
|
||||||
def _get(self, name, tenant=None, fields={"m": 1, "_id": 0}):
|
def _get(self, name, tenant=None, fields={"m": 1, "_id": 0}):
|
||||||
queue = self._col.find_one({"t": tenant, "n": name}, fields=fields)
|
queue = self._col.find_one({"t": tenant, "n": name}, fields=fields)
|
||||||
|
|||||||
@@ -34,13 +34,38 @@ class Queue(base.QueueBase):
|
|||||||
)
|
)
|
||||||
''')
|
''')
|
||||||
|
|
||||||
def list(self, tenant):
|
def list(self, tenant, marker=None,
|
||||||
records = self.driver.run('''
|
limit=10, detailed=False):
|
||||||
select name, metadata from Queues
|
sql = (('''
|
||||||
where tenant = ?''', tenant)
|
select name from Queues''' if not detailed
|
||||||
|
else '''
|
||||||
|
select name, metadata from Queues''') +
|
||||||
|
'''
|
||||||
|
where tenant = ?''')
|
||||||
|
args = [tenant]
|
||||||
|
|
||||||
for k, v in records:
|
if marker:
|
||||||
yield {'name': k, 'metadata': v}
|
sql += '''
|
||||||
|
and name > ?'''
|
||||||
|
args += [marker]
|
||||||
|
|
||||||
|
sql += '''
|
||||||
|
order by name
|
||||||
|
limit ?'''
|
||||||
|
args += [limit]
|
||||||
|
|
||||||
|
records = self.driver.run(sql, *args)
|
||||||
|
marker_name = {}
|
||||||
|
|
||||||
|
def it():
|
||||||
|
for rec in records:
|
||||||
|
marker_name['next'] = rec[0]
|
||||||
|
yield ({'name': rec[0]} if not detailed
|
||||||
|
else
|
||||||
|
{'name': rec[0], 'metadata': rec[1]})
|
||||||
|
|
||||||
|
yield it()
|
||||||
|
yield marker_name['next']
|
||||||
|
|
||||||
def get(self, name, tenant):
|
def get(self, name, tenant):
|
||||||
try:
|
try:
|
||||||
|
|||||||
@@ -50,19 +50,27 @@ class QueueControllerTest(ControllerBaseTest):
|
|||||||
self.claim_controller = self.driver.claim_controller
|
self.claim_controller = self.driver.claim_controller
|
||||||
|
|
||||||
def test_list(self):
|
def test_list(self):
|
||||||
num = 4
|
num = 15
|
||||||
for queue in xrange(num):
|
for queue in xrange(num):
|
||||||
self.controller.upsert(queue, {}, tenant=self.tenant)
|
self.controller.upsert(queue, {}, tenant=self.tenant)
|
||||||
|
|
||||||
queues = self.controller.list(tenant=self.tenant)
|
interaction = self.controller.list(tenant=self.tenant,
|
||||||
|
detailed=True)
|
||||||
|
queues = list(interaction.next())
|
||||||
|
|
||||||
counter = 0
|
self.assertEquals(all(map(lambda queue:
|
||||||
for queue in queues:
|
'name' in queue and
|
||||||
self.assertEqual(len(queue), 2)
|
'metadata' in queue, queues)), True)
|
||||||
self.assertIn("name", queue)
|
self.assertEquals(len(queues), 10)
|
||||||
self.assertIn("metadata", queue)
|
|
||||||
counter += 1
|
interaction = self.controller.list(tenant=self.tenant,
|
||||||
self.assertEqual(counter, num)
|
marker=interaction.next())
|
||||||
|
queues = list(interaction.next())
|
||||||
|
|
||||||
|
self.assertEquals(all(map(lambda queue:
|
||||||
|
'name' in queue and
|
||||||
|
'metadata' not in queue, queues)), True)
|
||||||
|
self.assertEquals(len(queues), 5)
|
||||||
|
|
||||||
def test_queue_lifecycle(self):
|
def test_queue_lifecycle(self):
|
||||||
# Test Queue Creation
|
# Test Queue Creation
|
||||||
|
|||||||
@@ -100,9 +100,9 @@ class CollectionResource(object):
|
|||||||
def on_get(self, req, resp, tenant_id):
|
def on_get(self, req, resp, tenant_id):
|
||||||
resp_dict = {}
|
resp_dict = {}
|
||||||
try:
|
try:
|
||||||
queues = self.queue_ctrl.list(tenant_id)
|
interaction = self.queue_ctrl.list(tenant_id, detailed=True)
|
||||||
|
|
||||||
resp_dict['queues'] = list(queues)
|
resp_dict['queues'] = list(interaction.next())
|
||||||
for queue in resp_dict['queues']:
|
for queue in resp_dict['queues']:
|
||||||
queue['href'] = req.path + '/' + queue['name']
|
queue['href'] = req.path + '/' + queue['name']
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user