Merge "Fix queues actions for pooling"
This commit is contained in:
commit
a7ec202e71
@ -159,7 +159,7 @@ class DataDriver(storage.DataDriverBase):
|
||||
|
||||
|
||||
class QueueController(storage.Queue):
|
||||
"""Routes operations to a queue controller in the appropriate pool.
|
||||
"""Routes operations to get the appropriate queue controller.
|
||||
|
||||
:param pool_catalog: a catalog of available pools
|
||||
:type pool_catalog: queues.pooling.base.Catalog
|
||||
@ -168,16 +168,14 @@ class QueueController(storage.Queue):
|
||||
def __init__(self, pool_catalog):
|
||||
super(QueueController, self).__init__(None)
|
||||
self._pool_catalog = pool_catalog
|
||||
self._mgt_queue_ctrl = self._pool_catalog.control.queue_controller
|
||||
self._get_controller = self._pool_catalog.get_queue_controller
|
||||
|
||||
def _list(self, project=None, marker=None,
|
||||
limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False):
|
||||
|
||||
def all_pages():
|
||||
pool = self._pool_catalog.get_default_pool()
|
||||
if pool is None:
|
||||
raise errors.NoPoolFound()
|
||||
yield next(pool.queue_controller.list(
|
||||
yield next(self._mgt_queue_ctrl.list(
|
||||
project=project,
|
||||
marker=marker,
|
||||
limit=limit,
|
||||
@ -218,17 +216,15 @@ class QueueController(storage.Queue):
|
||||
# however. If between the time we register a queue and go to
|
||||
# look it up, the queue is deleted, then this assertion will
|
||||
# fail.
|
||||
control = self._get_controller(name, project)
|
||||
if not control:
|
||||
pool = self._pool_catalog.lookup(name, project)
|
||||
if not pool:
|
||||
raise RuntimeError('Failed to register queue')
|
||||
return control.create(name, metadata=metadata, project=project)
|
||||
return self._mgt_queue_ctrl.create(name, metadata=metadata,
|
||||
project=project)
|
||||
|
||||
def _delete(self, name, project=None):
|
||||
# NOTE(cpp-cabrera): If we fail to find a project/queue in the
|
||||
# catalogue for a delete, just ignore it.
|
||||
control = self._get_controller(name, project)
|
||||
if control:
|
||||
|
||||
mqHandler = self._get_controller(name, project)
|
||||
if mqHandler:
|
||||
# NOTE(cpp-cabrera): delete from the catalogue first. If
|
||||
# zaqar crashes in the middle of these two operations,
|
||||
# it is desirable that the entry be missing from the
|
||||
@ -239,34 +235,24 @@ class QueueController(storage.Queue):
|
||||
# latter case is more difficult to reason about, and may
|
||||
# yield 500s in some operations.
|
||||
self._pool_catalog.deregister(name, project)
|
||||
ret = control.delete(name, project)
|
||||
return ret
|
||||
mqHandler.delete(name, project)
|
||||
|
||||
return None
|
||||
return self._mgt_queue_ctrl.delete(name, project)
|
||||
|
||||
def _exists(self, name, project=None):
|
||||
control = self._get_controller(name, project)
|
||||
if control:
|
||||
return control.exists(name, project=project)
|
||||
return False
|
||||
return self._mgt_queue_ctrl.exists(name, project=project)
|
||||
|
||||
def get_metadata(self, name, project=None):
|
||||
control = self._get_controller(name, project)
|
||||
if control:
|
||||
return control.get_metadata(name, project=project)
|
||||
raise errors.QueueDoesNotExist(name, project)
|
||||
return self._mgt_queue_ctrl.get_metadata(name, project=project)
|
||||
|
||||
def set_metadata(self, name, metadata, project=None):
|
||||
control = self._get_controller(name, project)
|
||||
if control:
|
||||
return control.set_metadata(name, metadata=metadata,
|
||||
project=project)
|
||||
raise errors.QueueDoesNotExist(name, project)
|
||||
return self._mgt_queue_ctrl.set_metadata(name, metadata=metadata,
|
||||
project=project)
|
||||
|
||||
def _stats(self, name, project=None):
|
||||
control = self._get_controller(name, project)
|
||||
if control:
|
||||
return control.stats(name, project=project)
|
||||
mqHandler = self._get_controller(name, project)
|
||||
if mqHandler:
|
||||
return mqHandler.stats(name, project=project)
|
||||
raise errors.QueueDoesNotExist(name, project)
|
||||
|
||||
|
||||
|
@ -145,6 +145,9 @@ class ClaimController(storage.Claim):
|
||||
return claim_id, claimed
|
||||
|
||||
def update(self, queue, claim_id, metadata, project=None):
|
||||
if not self._queue_ctrl.exists(queue, project):
|
||||
raise errors.QueueDoesNotExist(queue, project)
|
||||
|
||||
container = utils._claim_container(queue, project)
|
||||
try:
|
||||
headers, obj = self._client.get_object(container, claim_id)
|
||||
|
@ -323,7 +323,13 @@ class MessageQueueHandler(object):
|
||||
total = 0
|
||||
claimed = 0
|
||||
container = utils._message_container(name, project)
|
||||
_, objects = self._client.get_container(container)
|
||||
|
||||
try:
|
||||
_, objects = self._client.get_container(container)
|
||||
except swiftclient.ClientException as exc:
|
||||
if exc.http_status == 404:
|
||||
raise errors.QueueIsEmpty(name, project)
|
||||
|
||||
newest = None
|
||||
oldest = None
|
||||
now = timeutils.utcnow_ts(True)
|
||||
|
@ -15,3 +15,6 @@ database = zaqar_test_pooled
|
||||
[drivers:management_store:mongodb]
|
||||
uri = mongodb://127.0.0.1:27017
|
||||
database = zaqar_test
|
||||
|
||||
[pooling:catalog]
|
||||
enable_virtual_pool = True
|
@ -0,0 +1,20 @@
|
||||
[DEFAULT]
|
||||
pooling = True
|
||||
admin_mode = True
|
||||
unreliable = True
|
||||
enable_deprecated_api_versions = 1,1.1
|
||||
|
||||
[drivers]
|
||||
transport = wsgi
|
||||
message_store = mongodb
|
||||
|
||||
[drivers:message_store:mongodb]
|
||||
uri = mongodb://127.0.0.1:27017
|
||||
database = zaqar_test_pooled
|
||||
|
||||
[drivers:management_store:mongodb]
|
||||
uri = mongodb://127.0.0.1:27017
|
||||
database = zaqar_test
|
||||
|
||||
[pooling:catalog]
|
||||
enable_virtual_pool = False
|
@ -14,4 +14,7 @@ reconnect_sleep = 1
|
||||
[drivers:management_store:redis]
|
||||
uri = redis://127.0.0.1:6379
|
||||
max_reconnect_attempts = 3
|
||||
reconnect_sleep = 1
|
||||
reconnect_sleep = 1
|
||||
|
||||
[pooling:catalog]
|
||||
enable_virtual_pool = True
|
@ -12,3 +12,5 @@ bind = 0.0.0.0
|
||||
port = 8888
|
||||
workers = 20
|
||||
|
||||
[pooling:catalog]
|
||||
enable_virtual_pool = True
|
@ -29,7 +29,7 @@ from zaqar import tests as testing
|
||||
@testing.requires_mongodb
|
||||
class PoolCatalogTest(testing.TestBase):
|
||||
|
||||
config_file = 'wsgi_mongodb_pooled.conf'
|
||||
config_file = 'wsgi_mongodb_pooled_disable_virtual_pool.conf'
|
||||
|
||||
def setUp(self):
|
||||
super(PoolCatalogTest, self).setUp()
|
||||
|
@ -54,6 +54,15 @@ class Resource(object):
|
||||
resp.body = utils.to_json(resp_dict)
|
||||
# status defaults to 200
|
||||
|
||||
except storage_errors.QueueIsEmpty as ex:
|
||||
resp_dict = {
|
||||
'messages': {
|
||||
'claimed': 0,
|
||||
'free': 0,
|
||||
'total': 0
|
||||
}
|
||||
}
|
||||
resp.body = utils.to_json(resp_dict)
|
||||
except storage_errors.DoesNotExist as ex:
|
||||
LOG.debug(ex)
|
||||
raise wsgi_errors.HTTPNotFound(six.text_type(ex))
|
||||
|
@ -53,7 +53,8 @@ class Resource(object):
|
||||
resp.body = utils.to_json(resp_dict)
|
||||
# status defaults to 200
|
||||
|
||||
except storage_errors.QueueDoesNotExist as ex:
|
||||
except (storage_errors.QueueDoesNotExist,
|
||||
storage_errors.QueueIsEmpty) as ex:
|
||||
resp_dict = {
|
||||
'messages': {
|
||||
'claimed': 0,
|
||||
|
@ -57,7 +57,8 @@ class Resource(object):
|
||||
resp.body = utils.to_json(resp_dict)
|
||||
# status defaults to 200
|
||||
|
||||
except storage_errors.QueueDoesNotExist as ex:
|
||||
except (storage_errors.QueueDoesNotExist,
|
||||
storage_errors.QueueIsEmpty) as ex:
|
||||
resp_dict = {
|
||||
'messages': {
|
||||
'claimed': 0,
|
||||
|
Loading…
x
Reference in New Issue
Block a user