Use default pool for queue listing
While we're properly use a default pool when manipulating a specific queue, we don't use it when actually listing queues. This fixes it by using the same mechanism. Change-Id: Ie2329389bf92793af0c8d7a676fcada757fd26d7 Closes-Bug: #1533668
This commit is contained in:
parent
edfb7ec3b8
commit
460ecce2ec
@ -152,16 +152,14 @@ class QueueController(storage.Queue):
|
|||||||
limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False):
|
limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False):
|
||||||
|
|
||||||
def all_pages():
|
def all_pages():
|
||||||
cursor = self._pool_catalog._pools_ctrl.list(limit=0)
|
pool = self._pool_catalog.get_default_pool()
|
||||||
pools_list = list(next(cursor))
|
if pool is None:
|
||||||
anypool = pools_list and pools_list[0]
|
raise errors.NoPoolFound()
|
||||||
if anypool:
|
yield next(pool.queue_controller.list(
|
||||||
yield next(self._pool_catalog.get_driver(anypool['name'])
|
project=project,
|
||||||
.queue_controller.list(
|
marker=marker,
|
||||||
project=project,
|
limit=limit,
|
||||||
marker=marker,
|
detailed=detailed))
|
||||||
limit=limit,
|
|
||||||
detailed=detailed))
|
|
||||||
|
|
||||||
# make a heap compared with 'name'
|
# make a heap compared with 'name'
|
||||||
ls = heapq.merge(*[
|
ls = heapq.merge(*[
|
||||||
@ -589,27 +587,14 @@ class Catalog(object):
|
|||||||
target = self.lookup(queue, project)
|
target = self.lookup(queue, project)
|
||||||
return target and target.subscription_controller
|
return target and target.subscription_controller
|
||||||
|
|
||||||
def lookup(self, queue, project=None):
|
def get_default_pool(self, use_listing=True):
|
||||||
"""Lookup a pool driver for the given queue and project.
|
if use_listing:
|
||||||
|
cursor = self._pools_ctrl.list(limit=0)
|
||||||
:param queue: Name of the queue for which to find a pool
|
pools_list = list(next(cursor))
|
||||||
:param project: Project to which the queue belongs, or
|
if pools_list:
|
||||||
None to specify the "global" or "generic" project.
|
return self.get_driver(pools_list[0]['name'])
|
||||||
|
|
||||||
:returns: A storage driver instance for the appropriate pool. If
|
|
||||||
the driver does not exist yet, it is created and cached. If the
|
|
||||||
queue is not mapped, returns None.
|
|
||||||
:rtype: Maybe DataDriver
|
|
||||||
"""
|
|
||||||
|
|
||||||
try:
|
|
||||||
pool_id = self._pool_id(queue, project)
|
|
||||||
except errors.QueueNotMapped as ex:
|
|
||||||
LOG.debug(ex)
|
|
||||||
|
|
||||||
if not self._catalog_conf.enable_virtual_pool:
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
if self._catalog_conf.enable_virtual_pool:
|
||||||
conf_section = ('drivers:message_store:%s' %
|
conf_section = ('drivers:message_store:%s' %
|
||||||
self._conf.drivers.message_store)
|
self._conf.drivers.message_store)
|
||||||
|
|
||||||
@ -643,6 +628,26 @@ class Catalog(object):
|
|||||||
# store.
|
# store.
|
||||||
return self.get_driver(None, pool_conf)
|
return self.get_driver(None, pool_conf)
|
||||||
|
|
||||||
|
def lookup(self, queue, project=None):
|
||||||
|
"""Lookup a pool driver for the given queue and project.
|
||||||
|
|
||||||
|
:param queue: Name of the queue for which to find a pool
|
||||||
|
:param project: Project to which the queue belongs, or
|
||||||
|
None to specify the "global" or "generic" project.
|
||||||
|
|
||||||
|
:returns: A storage driver instance for the appropriate pool. If
|
||||||
|
the driver does not exist yet, it is created and cached. If the
|
||||||
|
queue is not mapped, returns None.
|
||||||
|
:rtype: Maybe DataDriver
|
||||||
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
pool_id = self._pool_id(queue, project)
|
||||||
|
except errors.QueueNotMapped as ex:
|
||||||
|
LOG.debug(ex)
|
||||||
|
|
||||||
|
return self.get_default_pool(use_listing=False)
|
||||||
|
|
||||||
return self.get_driver(pool_id)
|
return self.get_driver(pool_id)
|
||||||
|
|
||||||
def get_driver(self, pool_id, pool_conf=None):
|
def get_driver(self, pool_id, pool_conf=None):
|
||||||
|
@ -96,6 +96,9 @@ class CollectionResource(object):
|
|||||||
self._validate.queue_listing(**kwargs)
|
self._validate.queue_listing(**kwargs)
|
||||||
results = self._queue_controller.list(project=project_id, **kwargs)
|
results = self._queue_controller.list(project=project_id, **kwargs)
|
||||||
|
|
||||||
|
# Buffer list of queues
|
||||||
|
queues = list(next(results))
|
||||||
|
|
||||||
except validation.ValidationFailed as ex:
|
except validation.ValidationFailed as ex:
|
||||||
LOG.debug(ex)
|
LOG.debug(ex)
|
||||||
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
|
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
|
||||||
@ -105,9 +108,6 @@ class CollectionResource(object):
|
|||||||
description = _(u'Queues could not be listed.')
|
description = _(u'Queues could not be listed.')
|
||||||
raise wsgi_errors.HTTPServiceUnavailable(description)
|
raise wsgi_errors.HTTPServiceUnavailable(description)
|
||||||
|
|
||||||
# Buffer list of queues
|
|
||||||
queues = list(next(results))
|
|
||||||
|
|
||||||
# Check for an empty list
|
# Check for an empty list
|
||||||
if len(queues) == 0:
|
if len(queues) == 0:
|
||||||
resp.status = falcon.HTTP_204
|
resp.status = falcon.HTTP_204
|
||||||
|
@ -123,6 +123,9 @@ class CollectionResource(object):
|
|||||||
self._validate.queue_listing(**kwargs)
|
self._validate.queue_listing(**kwargs)
|
||||||
results = self._queue_controller.list(project=project_id, **kwargs)
|
results = self._queue_controller.list(project=project_id, **kwargs)
|
||||||
|
|
||||||
|
# Buffer list of queues
|
||||||
|
queues = list(next(results))
|
||||||
|
|
||||||
except validation.ValidationFailed as ex:
|
except validation.ValidationFailed as ex:
|
||||||
LOG.debug(ex)
|
LOG.debug(ex)
|
||||||
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
|
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
|
||||||
@ -132,9 +135,6 @@ class CollectionResource(object):
|
|||||||
description = _(u'Queues could not be listed.')
|
description = _(u'Queues could not be listed.')
|
||||||
raise wsgi_errors.HTTPServiceUnavailable(description)
|
raise wsgi_errors.HTTPServiceUnavailable(description)
|
||||||
|
|
||||||
# Buffer list of queues
|
|
||||||
queues = list(next(results))
|
|
||||||
|
|
||||||
# Got some. Prepare the response.
|
# Got some. Prepare the response.
|
||||||
kwargs['marker'] = next(results) or kwargs.get('marker', '')
|
kwargs['marker'] = next(results) or kwargs.get('marker', '')
|
||||||
for each_queue in queues:
|
for each_queue in queues:
|
||||||
|
@ -130,6 +130,9 @@ class CollectionResource(object):
|
|||||||
self._validate.queue_listing(**kwargs)
|
self._validate.queue_listing(**kwargs)
|
||||||
results = self._queue_controller.list(project=project_id, **kwargs)
|
results = self._queue_controller.list(project=project_id, **kwargs)
|
||||||
|
|
||||||
|
# Buffer list of queues
|
||||||
|
queues = list(next(results))
|
||||||
|
|
||||||
except validation.ValidationFailed as ex:
|
except validation.ValidationFailed as ex:
|
||||||
LOG.debug(ex)
|
LOG.debug(ex)
|
||||||
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
|
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
|
||||||
@ -139,9 +142,6 @@ class CollectionResource(object):
|
|||||||
description = _(u'Queues could not be listed.')
|
description = _(u'Queues could not be listed.')
|
||||||
raise wsgi_errors.HTTPServiceUnavailable(description)
|
raise wsgi_errors.HTTPServiceUnavailable(description)
|
||||||
|
|
||||||
# Buffer list of queues
|
|
||||||
queues = list(next(results))
|
|
||||||
|
|
||||||
# Got some. Prepare the response.
|
# Got some. Prepare the response.
|
||||||
kwargs['marker'] = next(results) or kwargs.get('marker', '')
|
kwargs['marker'] = next(results) or kwargs.get('marker', '')
|
||||||
for each_queue in queues:
|
for each_queue in queues:
|
||||||
|
Loading…
Reference in New Issue
Block a user