Remove QueueController from data to control plane

To separate data and control plane of storage layer;
Moving queue_controller from data to controll plane.
Trying to shift the QueueController object in Control plane.
For this new MessageQueueHandler class is also added inside storage/x/messages.py
And the newly added handler is added to the entry point.

Implements: blueprint split-data-and-control-plane
Co-Author: Flavio Percoco<flavio@redhat.com>

Change-Id: I8a167d6ed8e54c98b077b9ea56e68b4e8d5b0291
This commit is contained in:
Shaifali Agrawal 2015-02-24 00:22:51 -08:00 committed by Flavio Percoco
parent 3dce426e0c
commit c3a5718775
31 changed files with 534 additions and 496 deletions

View File

@ -72,6 +72,13 @@ oslo.config.opts =
zaqar.storage.stages =
zaqar.notification.notifier = zaqar.notification.notifier:NotifierDriver
zaqar.storage.mongodb.driver.queue.stages =
message_queue_handler = zaqar.storage.mongodb.messages:MessageQueueHandler
zaqar.storage.redis.driver.queue.stages =
message_queue_handler = zaqar.storage.redis.messages:MessageQueueHandler
[nosetests]
where=tests
verbosity=2

View File

@ -9,7 +9,7 @@ storage = mongodb
[drivers:message_store:mongodb]
uri = mongodb://127.0.0.1:27017
database = zaqar_test
database = zaqar_test_pooled
[drivers:management_store:mongodb]
uri = mongodb://127.0.0.1:27017

View File

@ -12,13 +12,11 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ddt
from zaqar.tests.functional import base
from zaqar.tests.functional import helpers
@ddt.ddt
class TestHealth(base.V1_1FunctionalTestBase):
server_class = base.ZaqarAdminServer
@ -37,22 +35,16 @@ class TestHealth(base.V1_1FunctionalTestBase):
self.client.set_base_url(self.base_url)
@ddt.data(
{
'name': "pool_1",
'weight': 10,
'uri': "mongodb://localhost:27017"
}
)
def test_health_with_pool(self, params):
def test_health_with_pool(self):
# FIXME(flwang): Please use mongodb after the sqlalchemy is disabled
# as pool node and the mongodb is working on gate successfully.
doc = helpers.create_pool_body(
weight=params.get('weight', 10),
uri=params.get('uri', "mongodb://localhost:27017")
weight=10,
uri="mongodb://localhost:27017",
options=dict(database='zaqar_test_pooled_1')
)
pool_name = params.get('name', "pool_1")
pool_name = "pool_1"
self.addCleanup(self.client.delete, url='/pools/' + pool_name)
result = self.client.put('/pools/' + pool_name, data=doc)
@ -86,7 +78,7 @@ class TestHealth(base.V1_1FunctionalTestBase):
self.assertEqual(health[pool_name]['storage_reachable'], True)
op_status = health[pool_name]['operation_status']
for op in op_status.keys():
self.assertTrue(op_status[op]['succeeded'])
self.assertTrue(op_status[op]['succeeded'])
message_volume = health[pool_name]['message_volume']
self.assertEqual(message_volume['claimed'], 2)

View File

@ -40,18 +40,24 @@ from zaqar.tests.unit.storage import base
class MongodbSetupMixin(object):
def _purge_databases(self):
databases = (self.driver.message_databases +
[self.driver.queues_database,
self.driver.subscriptions_database])
if isinstance(self.driver, mongodb.DataDriver):
databases = (self.driver.message_databases +
[self.control.queues_database,
self.driver.subscriptions_database])
else:
databases = [self.driver.queues_database]
for db in databases:
self.driver.connection.drop_database(db)
def _prepare_conf(self):
self.config(options.MESSAGE_MONGODB_GROUP,
database=uuid.uuid4().hex)
self.config(options.MANAGEMENT_MONGODB_GROUP,
database=uuid.uuid4().hex)
if options.MESSAGE_MONGODB_GROUP in self.conf:
self.config(options.MESSAGE_MONGODB_GROUP,
database=uuid.uuid4().hex)
if options.MANAGEMENT_MONGODB_GROUP in self.conf:
self.config(options.MANAGEMENT_MONGODB_GROUP,
database=uuid.uuid4().hex)
class MongodbUtilsTest(MongodbSetupMixin, testing.TestBase):
@ -69,6 +75,7 @@ class MongodbUtilsTest(MongodbSetupMixin, testing.TestBase):
MockDriver = collections.namedtuple('MockDriver', 'mongodb_conf')
self.driver = MockDriver(self.mongodb_conf)
self.control_driver = MockDriver(self.mongodb_conf)
def test_scope_queue_name(self):
self.assertEqual(utils.scope_queue_name('my-q'), '/my-q')
@ -153,14 +160,12 @@ class MongodbDriverTest(MongodbSetupMixin, testing.TestBase):
def test_db_instance(self):
self.config(unreliable=True)
cache = oslo_cache.get_cache()
driver = mongodb.DataDriver(self.conf, cache)
control = mongodb.ControlDriver(self.conf, cache)
data = mongodb.DataDriver(self.conf, cache, control)
databases = (driver.message_databases +
[driver.queues_database])
for db in databases:
for db in data.message_databases:
self.assertThat(db.name, matchers.StartsWith(
driver.mongodb_conf.database))
data.mongodb_conf.database))
def test_version_match(self):
self.config(unreliable=True)
@ -169,12 +174,14 @@ class MongodbDriverTest(MongodbSetupMixin, testing.TestBase):
with mock.patch('pymongo.MongoClient.server_info') as info:
info.return_value = {'version': '2.1'}
self.assertRaises(RuntimeError, mongodb.DataDriver,
self.conf, cache)
self.conf, cache,
mongodb.ControlDriver(self.conf, cache))
info.return_value = {'version': '2.11'}
try:
mongodb.DataDriver(self.conf, cache)
mongodb.DataDriver(self.conf, cache,
mongodb.ControlDriver(self.conf, cache))
except RuntimeError:
self.fail('version match failed')
@ -186,21 +193,24 @@ class MongodbDriverTest(MongodbSetupMixin, testing.TestBase):
with mock.patch('pymongo.MongoClient.is_mongos') as is_mongos:
is_mongos.__get__ = mock.Mock(return_value=False)
self.assertRaises(RuntimeError, mongodb.DataDriver,
self.conf, cache)
self.conf, cache,
mongodb.ControlDriver(self.conf, cache))
def test_using_replset(self):
cache = oslo_cache.get_cache()
with mock.patch('pymongo.MongoClient.nodes') as nodes:
nodes.__get__ = mock.Mock(return_value=['node1', 'node2'])
mongodb.DataDriver(self.conf, cache)
mongodb.DataDriver(self.conf, cache,
mongodb.ControlDriver(self.conf, cache))
def test_using_mongos(self):
cache = oslo_cache.get_cache()
with mock.patch('pymongo.MongoClient.is_mongos') as is_mongos:
is_mongos.__get__ = mock.Mock(return_value=True)
mongodb.DataDriver(self.conf, cache)
mongodb.DataDriver(self.conf, cache,
mongodb.ControlDriver(self.conf, cache))
def test_write_concern_check_works(self):
cache = oslo_cache.get_cache()
@ -211,17 +221,21 @@ class MongodbDriverTest(MongodbSetupMixin, testing.TestBase):
with mock.patch('pymongo.MongoClient.write_concern') as wc:
wc.__get__ = mock.Mock(return_value={'w': 1})
self.assertRaises(RuntimeError, mongodb.DataDriver,
self.conf, cache)
self.conf, cache,
mongodb.ControlDriver(self.conf, cache))
wc.__get__ = mock.Mock(return_value={'w': 2})
mongodb.DataDriver(self.conf, cache)
mongodb.DataDriver(self.conf, cache,
mongodb.ControlDriver(self.conf, cache))
def test_write_concern_is_set(self):
cache = oslo_cache.get_cache()
with mock.patch('pymongo.MongoClient.is_mongos') as is_mongos:
is_mongos.__get__ = mock.Mock(return_value=True)
driver = mongodb.DataDriver(self.conf, cache)
driver = mongodb.DataDriver(self.conf, cache,
mongodb.ControlDriver
(self.conf, cache))
wc = driver.connection.write_concern
self.assertEqual(wc['w'], 'majority')
self.assertEqual(wc['j'], False)
@ -230,25 +244,16 @@ class MongodbDriverTest(MongodbSetupMixin, testing.TestBase):
@testing.requires_mongodb
class MongodbQueueTests(MongodbSetupMixin, base.QueueControllerTest):
driver_class = mongodb.DataDriver
driver_class = mongodb.ControlDriver
config_file = 'wsgi_mongodb.conf'
controller_class = controllers.QueueController
control_driver_class = mongodb.ControlDriver
def test_indexes(self):
collection = self.controller._collection
indexes = collection.index_information()
self.assertIn('p_q_1', indexes)
def test_messages_purged(self):
queue_name = 'test'
self.controller.create(queue_name)
self.message_controller.post(queue_name,
[{'ttl': 60}],
1234)
self.controller.delete(queue_name)
for collection in self.message_controller._collections:
self.assertEqual(collection.find({'q': queue_name}).count(), 0)
def test_raises_connection_error(self):
with mock.patch.object(cursor.Cursor,
@ -268,6 +273,7 @@ class MongodbMessageTests(MongodbSetupMixin, base.MessageControllerTest):
driver_class = mongodb.DataDriver
config_file = 'wsgi_mongodb.conf'
controller_class = controllers.MessageController
control_driver_class = mongodb.ControlDriver
# NOTE(kgriffs): MongoDB's TTL scavenger only runs once a minute
gc_interval = 60
@ -349,6 +355,7 @@ class MongodbFIFOMessageTests(MongodbSetupMixin, base.MessageControllerTest):
driver_class = mongodb.DataDriver
config_file = 'wsgi_fifo_mongodb.conf'
controller_class = controllers.FIFOMessageController
control_driver_class = mongodb.ControlDriver
# NOTE(kgriffs): MongoDB's TTL scavenger only runs once a minute
gc_interval = 60
@ -427,6 +434,7 @@ class MongodbClaimTests(MongodbSetupMixin, base.ClaimControllerTest):
driver_class = mongodb.DataDriver
config_file = 'wsgi_mongodb.conf'
controller_class = controllers.ClaimController
control_driver_class = mongodb.ControlDriver
def test_claim_doesnt_exist(self):
"""Verifies that operations fail on expired/missing claims.
@ -462,6 +470,7 @@ class MongodbSubscriptionTests(MongodbSetupMixin,
driver_class = mongodb.DataDriver
config_file = 'wsgi_mongodb.conf'
controller_class = controllers.SubscriptionController
control_driver_class = mongodb.ControlDriver
#
@ -473,6 +482,7 @@ class MongodbPoolsTests(base.PoolsControllerTest):
config_file = 'wsgi_mongodb.conf'
driver_class = mongodb.ControlDriver
controller_class = controllers.PoolsController
control_driver_class = mongodb.ControlDriver
def setUp(self):
super(MongodbPoolsTests, self).setUp()
@ -500,6 +510,7 @@ class MongodbPoolsTests(base.PoolsControllerTest):
class MongodbCatalogueTests(base.CatalogueControllerTest):
driver_class = mongodb.ControlDriver
controller_class = controllers.CatalogueController
control_driver_class = mongodb.ControlDriver
def setUp(self):
super(MongodbCatalogueTests, self).setUp()
@ -522,15 +533,6 @@ class PooledMessageTests(base.MessageControllerTest):
gc_interval = 60
@testing.requires_mongodb
class PooledQueueTests(base.QueueControllerTest):
config_file = 'wsgi_mongodb_pooled.conf'
controller_class = pooling.QueueController
driver_class = pooling.DataDriver
control_driver_class = mongodb.ControlDriver
controller_base_class = storage.Queue
@testing.requires_mongodb
class PooledClaimsTests(base.ClaimControllerTest):
config_file = 'wsgi_mongodb_pooled.conf'
@ -554,6 +556,7 @@ class PooledClaimsTests(base.ClaimControllerTest):
class MongodbFlavorsTest(base.FlavorsControllerTest):
driver_class = mongodb.ControlDriver
controller_class = controllers.FlavorsController
control_driver_class = mongodb.ControlDriver
def setUp(self):
super(MongodbFlavorsTest, self).setUp()

View File

@ -24,6 +24,7 @@ import redis
from zaqar.common import errors
from zaqar.openstack.common.cache import cache as oslo_cache
from zaqar import storage
from zaqar.storage import mongodb
from zaqar.storage.redis import controllers
from zaqar.storage.redis import driver
from zaqar.storage.redis import messages
@ -173,7 +174,9 @@ class RedisDriverTest(testing.TestBase):
def test_db_instance(self):
cache = oslo_cache.get_cache()
redis_driver = driver.DataDriver(self.conf, cache)
redis_driver = driver.DataDriver(self.conf, cache,
driver.ControlDriver
(self.conf, cache))
self.assertTrue(isinstance(redis_driver.connection, redis.StrictRedis))
@ -183,12 +186,14 @@ class RedisDriverTest(testing.TestBase):
with mock.patch('redis.StrictRedis.info') as info:
info.return_value = {'redis_version': '2.4.6'}
self.assertRaises(RuntimeError, driver.DataDriver,
self.conf, cache)
self.conf, cache,
driver.ControlDriver(self.conf, cache))
info.return_value = {'redis_version': '2.11'}
try:
driver.DataDriver(self.conf, cache)
driver.DataDriver(self.conf, cache,
driver.ControlDriver(self.conf, cache))
except RuntimeError:
self.fail('version match failed')
@ -281,6 +286,7 @@ class RedisQueuesTest(base.QueueControllerTest):
driver_class = driver.DataDriver
config_file = 'wsgi_redis.conf'
controller_class = controllers.QueueController
control_driver_class = mongodb.ControlDriver
def setUp(self):
super(RedisQueuesTest, self).setUp()
@ -297,11 +303,11 @@ class RedisMessagesTest(base.MessageControllerTest):
driver_class = driver.DataDriver
config_file = 'wsgi_redis.conf'
controller_class = controllers.MessageController
control_driver_class = mongodb.ControlDriver
def setUp(self):
super(RedisMessagesTest, self).setUp()
self.connection = self.driver.connection
self.queue_ctrl = self.driver.queue_controller
def tearDown(self):
super(RedisMessagesTest, self).tearDown()
@ -309,7 +315,7 @@ class RedisMessagesTest(base.MessageControllerTest):
def test_count(self):
queue_name = 'get-count'
self.queue_ctrl.create(queue_name)
self.queue_controller.create(queue_name)
msgs = [{
'ttl': 300,
@ -325,13 +331,13 @@ class RedisMessagesTest(base.MessageControllerTest):
def test_empty_queue_exception(self):
queue_name = 'empty-queue-test'
self.queue_ctrl.create(queue_name)
self.queue_controller.create(queue_name)
self.assertRaises(storage.errors.QueueIsEmpty,
self.controller.first, queue_name)
def test_gc(self):
self.queue_ctrl.create(self.queue_name)
self.queue_controller.create(self.queue_name)
self.controller.post(self.queue_name,
[{'ttl': 0, 'body': {}}],
client_uuid=str(uuid.uuid4()))
@ -353,12 +359,11 @@ class RedisClaimsTest(base.ClaimControllerTest):
driver_class = driver.DataDriver
config_file = 'wsgi_redis.conf'
controller_class = controllers.ClaimController
control_driver_class = mongodb.ControlDriver
def setUp(self):
super(RedisClaimsTest, self).setUp()
self.connection = self.driver.connection
self.queue_ctrl = self.driver.queue_controller
self.message_ctrl = self.driver.message_controller
def tearDown(self):
super(RedisClaimsTest, self).tearDown()
@ -367,7 +372,7 @@ class RedisClaimsTest(base.ClaimControllerTest):
def test_claim_doesnt_exist(self):
queue_name = 'no-such-claim'
epoch = '000000000000000000000000'
self.queue_ctrl.create(queue_name)
self.queue_controller.create(queue_name)
self.assertRaises(storage.errors.ClaimDoesNotExist,
self.controller.get, queue_name,
epoch, project=None)
@ -383,12 +388,12 @@ class RedisClaimsTest(base.ClaimControllerTest):
claim_id, {}, project=None)
def test_gc(self):
self.queue_ctrl.create(self.queue_name)
self.queue_controller.create(self.queue_name)
for _ in range(100):
self.message_ctrl.post(self.queue_name,
[{'ttl': 300, 'body': 'yo gabba'}],
client_uuid=str(uuid.uuid4()))
self.message_controller.post(self.queue_name,
[{'ttl': 300, 'body': 'yo gabba'}],
client_uuid=str(uuid.uuid4()))
now = timeutils.utcnow_ts()
timeutils_utcnow = 'oslo_utils.timeutils.utcnow_ts'

View File

@ -65,7 +65,7 @@ class PoolCatalogTest(testing.TestBase):
def test_lookup_loads_correct_driver(self):
storage = self.catalog.lookup(self.queue, self.project)
self.assertIsInstance(storage, mongodb.DataDriver)
self.assertIsInstance(storage._storage, mongodb.DataDriver)
def test_lookup_returns_none_if_queue_not_mapped(self):
self.assertIsNone(self.catalog.lookup('not', 'mapped'))
@ -77,14 +77,14 @@ class PoolCatalogTest(testing.TestBase):
def test_register_leads_to_successful_lookup(self):
self.catalog.register('not_yet', 'mapped')
storage = self.catalog.lookup('not_yet', 'mapped')
self.assertIsInstance(storage, mongodb.DataDriver)
self.assertIsInstance(storage._storage, mongodb.DataDriver)
def test_register_with_flavor(self):
queue = 'test'
self.catalog.register(queue, project=self.project,
flavor=self.flavor)
storage = self.catalog.lookup(queue, self.project)
self.assertIsInstance(storage, mongodb.DataDriver)
self.assertIsInstance(storage._storage, mongodb.DataDriver)
def test_register_with_fake_flavor(self):
self.assertRaises(errors.FlavorDoesNotExist,

View File

@ -1,99 +0,0 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
import random
import uuid
import six
from zaqar.openstack.common.cache import cache as oslo_cache
from zaqar.storage import pooling
from zaqar.storage import utils
from zaqar import tests as testing
@testing.requires_mongodb
class PoolQueuesTest(testing.TestBase):
config_file = 'wsgi_mongodb_pooled.conf'
def setUp(self):
super(PoolQueuesTest, self).setUp()
cache = oslo_cache.get_cache()
control = utils.load_storage_driver(self.conf, cache,
control_mode=True)
self.pools_ctrl = control.pools_controller
self.driver = pooling.DataDriver(self.conf, cache, control)
self.controller = self.driver.queue_controller
# fake two pools
for i in six.moves.xrange(2):
options = {'database': "zaqar_test_pools_" + str(i)}
self.pools_ctrl.create(str(uuid.uuid1()), 100,
'mongodb://localhost:27017',
options=options)
def tearDown(self):
self.pools_ctrl.drop_all()
super(PoolQueuesTest, self).tearDown()
def test_ping(self):
ping = self.driver.is_alive()
self.assertTrue(ping)
def test_listing(self):
project = "I.G"
interaction = self.controller.list(project=project,
detailed=False)
queues = list(next(interaction))
self.assertEqual(len(queues), 0)
for n in six.moves.xrange(10):
name = 'queue_%d' % n
self.controller.create(name, project=project)
self.controller.set_metadata(name,
metadata=random.getrandbits(12),
project=project)
interaction = self.controller.list(project=project,
detailed=True,
limit=7)
queues.extend(next(interaction))
marker = next(interaction)
self.assertEqual(len(queues), 7)
interaction = self.controller.list(project=project,
detailed=True,
limit=7,
marker=marker)
queues.extend(next(interaction))
self.assertEqual(len(queues), 10)
# ordered by name as a whole
self.assertTrue(all(queues[i]['name'] <= queues[i + 1]['name']
for i in six.moves.xrange(len(queues) - 1)))
for n in six.moves.xrange(10):
self.controller.delete('queue_%d' % n, project=project)
interaction = self.controller.list(project=project,
detailed=False)
queues = list(next(interaction))
self.assertEqual(len(queues), 0)

View File

@ -103,17 +103,17 @@ class Bootstrap(object):
@decorators.lazy_property(write=False)
def storage(self):
LOG.debug(u'Loading storage driver')
if self.conf.pooling:
LOG.debug(u'Storage pooling enabled')
storage_driver = pooling.DataDriver(self.conf, self.cache,
self.control)
else:
storage_driver = storage_utils.load_storage_driver(
self.conf, self.cache)
self.conf, self.cache, control_driver=self.control)
LOG.debug(u'Loading storage pipeline')
return pipeline.DataDriver(self.conf, storage_driver)
return pipeline.DataDriver(self.conf, storage_driver,
self.control)
@decorators.lazy_property(write=False)
def control(self):

View File

@ -25,6 +25,7 @@ import enum
from oslo_config import cfg
import six
from zaqar.common import decorators
import zaqar.openstack.common.log as logging
from zaqar.storage import errors
from zaqar.storage import utils
@ -95,8 +96,11 @@ class DataDriverBase(DriverBase):
BASE_CAPABILITIES = []
def __init__(self, conf, cache):
def __init__(self, conf, cache, control_driver):
super(DataDriverBase, self).__init__(conf, cache)
# creating ControlDriver instance for accessing QueueController's
# data from DataDriver
self.control_driver = control_driver
@abc.abstractmethod
def is_alive(self):
@ -195,6 +199,9 @@ class DataDriverBase(DriverBase):
_handle_status('delete_claim', func)
# delete queue
func = functools.partial(self.message_controller.bulk_delete,
queue, msg_ids, project=project)
_handle_status('bulk_delete_messages', func)
func = functools.partial(self.queue_controller.delete,
queue, project=project)
_handle_status('delete_queue', func)
@ -211,10 +218,9 @@ class DataDriverBase(DriverBase):
"""
pass
@abc.abstractproperty
@decorators.lazy_property(write=False)
def queue_controller(self):
"""Returns the driver's queue controller."""
raise NotImplementedError
return self.control_driver.queue_controller
@abc.abstractproperty
def message_controller(self):
@ -265,6 +271,11 @@ class ControlDriverBase(DriverBase):
"""Returns storage's flavor management controller."""
raise NotImplementedError
@abc.abstractproperty
def queue_controller(self):
"""Returns the driver's queue controller."""
raise NotImplementedError
class ControllerBase(object):
"""Top-level class for controllers.

View File

@ -78,8 +78,8 @@ class DataDriver(storage.DataDriverBase):
_COL_SUFIX = "_messages_p"
def __init__(self, conf, cache):
super(DataDriver, self).__init__(conf, cache)
def __init__(self, conf, cache, control_driver):
super(DataDriver, self).__init__(conf, cache, control_driver)
self.mongodb_conf = self.conf[options.MESSAGE_MONGODB_GROUP]
@ -146,17 +146,6 @@ class DataDriver(storage.DataDriverBase):
KPI['message_volume'] = message_volume
return KPI
@decorators.lazy_property(write=False)
def queues_database(self):
"""Database dedicated to the "queues" collection.
The queues collection is separated out into its own database
to avoid writer lock contention with the messages collections.
"""
name = self.mongodb_conf.database + '_queues'
return self.connection[name]
@decorators.lazy_property(write=False)
def message_databases(self):
"""List of message databases, ordered by partition number."""
@ -185,10 +174,6 @@ class DataDriver(storage.DataDriverBase):
"""MongoDB client connection instance."""
return _connection(self.mongodb_conf)
@decorators.lazy_property(write=False)
def queue_controller(self):
return controllers.QueueController(self)
@decorators.lazy_property(write=False)
def message_controller(self):
return controllers.MessageController(self)
@ -236,6 +221,21 @@ class ControlDriver(storage.ControlDriverBase):
name = self.mongodb_conf.database
return self.connection[name]
@decorators.lazy_property(write=False)
def queues_database(self):
"""Database dedicated to the "queues" collection.
The queues collection is separated out into its own database
to avoid writer lock contention with the messages collections.
"""
name = self.mongodb_conf.database + '_queues'
return self.connection[name]
@decorators.lazy_property(write=False)
def queue_controller(self):
return controllers.QueueController(self)
@property
def pools_controller(self):
return controllers.PoolsController(self)

View File

@ -862,3 +862,64 @@ def _basic_message(msg, now):
'body': msg['b'],
'claim_id': str(msg['c']['id']) if msg['c']['id'] else None
}
# NOTE(kgriffs): E.g.: 'queuecontroller:exists:5083853/my-queue'
_QUEUE_CACHE_PREFIX = 'queuecontroller:'
_QUEUE_CACHE_TTL = 5
def _queue_exists_key(queue, project=None):
# NOTE(kgriffs): Use string concatenation for performance,
# also put project first since it is guaranteed to be
# unique, which should reduce lookup time.
return _QUEUE_CACHE_PREFIX + 'exists:' + str(project) + '/' + queue
class MessageQueueHandler(object):
def __init__(self, driver, control_driver):
self.driver = driver
self._cache = self.driver.cache
self.queue_controller = self.driver.queue_controller
self.message_controller = self.driver.message_controller
def delete(self, queue_name, project=None):
self.message_controller._purge_queue(queue_name, project)
@utils.raises_conn_error
@utils.retries_on_autoreconnect
def stats(self, name, project=None):
if not self.queue_controller.exists(name, project=project):
raise errors.QueueDoesNotExist(name, project)
controller = self.message_controller
active = controller._count(name, project=project,
include_claimed=False)
total = controller._count(name, project=project,
include_claimed=True)
message_stats = {
'claimed': total - active,
'free': active,
'total': total,
}
try:
oldest = controller.first(name, project=project, sort=1)
newest = controller.first(name, project=project, sort=-1)
except errors.QueueIsEmpty:
pass
else:
now = timeutils.utcnow_ts()
message_stats['oldest'] = utils.stat_message(oldest, now)
message_stats['newest'] = utils.stat_message(newest, now)
return {'messages': message_stats}
def _get_scoped_query(name, project):
return {'p_q': utils.scope_queue_name(name, project)}

View File

@ -278,39 +278,12 @@ class QueueController(storage.Queue):
@utils.retries_on_autoreconnect
@_exists.purges
def _delete(self, name, project=None):
self.driver.message_controller._purge_queue(name, project)
self._collection.remove(_get_scoped_query(name, project))
@utils.raises_conn_error
@utils.retries_on_autoreconnect
def _stats(self, name, project=None):
if not self.exists(name, project=project):
raise errors.QueueDoesNotExist(name, project)
controller = self.driver.message_controller
active = controller._count(name, project=project,
include_claimed=False)
total = controller._count(name, project=project,
include_claimed=True)
message_stats = {
'claimed': total - active,
'free': active,
'total': total,
}
try:
oldest = controller.first(name, project=project, sort=1)
newest = controller.first(name, project=project, sort=-1)
except errors.QueueIsEmpty:
pass
else:
now = timeutils.utcnow_ts()
message_stats['oldest'] = utils.stat_message(oldest, now)
message_stats['newest'] = utils.stat_message(newest, now)
return {'messages': message_stats}
pass
def _get_scoped_query(name, project):

View File

@ -47,7 +47,8 @@ class SubscriptionController(base.Subscription):
def __init__(self, *args, **kwargs):
super(SubscriptionController, self).__init__(*args, **kwargs)
self._collection = self.driver.subscriptions_database.subscriptions
self._queue_collection = self.driver.queues_database.queues
queue_col = self.driver.control_driver.queues_database.queues
self._queue_collection = queue_col
self._collection.ensure_index(SUBSCRIPTIONS_INDEX, unique=True)
@utils.raises_conn_error

View File

@ -85,7 +85,7 @@ def _get_storage_pipeline(resource_name, conf, *args, **kwargs):
return pipeline
def _get_builtin_entry_points(resource_name, storage):
def _get_builtin_entry_points(resource_name, storage, control_driver):
# Load builtin stages
builtin_entry_points = []
@ -96,7 +96,8 @@ def _get_builtin_entry_points(resource_name, storage):
namespace = '%s.%s.stages' % (storage.__module__, resource_name)
extensions = extension.ExtensionManager(namespace,
invoke_on_load=True,
invoke_args=[storage])
invoke_args=[storage,
control_driver])
if len(extensions.extensions) == 0:
return []
@ -115,10 +116,10 @@ class DataDriver(base.DataDriverBase):
last step in the pipeline
"""
def __init__(self, conf, storage):
def __init__(self, conf, storage, control_driver):
# NOTE(kgriffs): Pass None for cache since it won't ever
# be referenced.
super(DataDriver, self).__init__(conf, None)
super(DataDriver, self).__init__(conf, None, control_driver)
self._storage = storage
@property
@ -133,14 +134,16 @@ class DataDriver(base.DataDriverBase):
@decorators.lazy_property(write=False)
def queue_controller(self):
stages = _get_builtin_entry_points('queue', self._storage)
stages = _get_builtin_entry_points('queue', self._storage,
self.control_driver)
stages.extend(_get_storage_pipeline('queue', self.conf))
stages.append(self._storage.queue_controller)
return common.Pipeline(stages)
@decorators.lazy_property(write=False)
def message_controller(self):
stages = _get_builtin_entry_points('message', self._storage)
stages = _get_builtin_entry_points('message', self._storage,
self.control_driver)
kwargs = {'subscription_controller':
self._storage.subscription_controller}
stages.extend(_get_storage_pipeline('message', self.conf, **kwargs))
@ -149,14 +152,16 @@ class DataDriver(base.DataDriverBase):
@decorators.lazy_property(write=False)
def claim_controller(self):
stages = _get_builtin_entry_points('claim', self._storage)
stages = _get_builtin_entry_points('claim', self._storage,
self.control_driver)
stages.extend(_get_storage_pipeline('claim', self.conf))
stages.append(self._storage.claim_controller)
return common.Pipeline(stages)
@decorators.lazy_property(write=False)
def subscription_controller(self):
stages = _get_builtin_entry_points('subscription', self._storage)
stages = _get_builtin_entry_points('subscription', self._storage,
self.control_driver)
stages.extend(_get_storage_pipeline('subscription', self.conf))
stages.append(self._storage.subscription_controller)
return common.Pipeline(stages)

View File

@ -23,6 +23,7 @@ from zaqar.common.storage import select
from zaqar.openstack.common import log
from zaqar import storage
from zaqar.storage import errors
from zaqar.storage import pipeline
from zaqar.storage import utils
LOG = log.getLogger(__name__)
@ -69,8 +70,8 @@ class DataDriver(storage.DataDriverBase):
BASE_CAPABILITIES = tuple(storage.Capabilities)
def __init__(self, conf, cache, control):
super(DataDriver, self).__init__(conf, cache)
def __init__(self, conf, cache, control, control_driver=None):
super(DataDriver, self).__init__(conf, cache, control_driver)
self._pool_catalog = Catalog(conf, cache, control)
@property
@ -404,6 +405,7 @@ class Catalog(object):
self._drivers = {}
self._conf = conf
self._cache = cache
self.control = control
self._conf.register_opts(_CATALOG_OPTIONS, group=_CATALOG_GROUP)
self._catalog_conf = self._conf[_CATALOG_GROUP]
@ -424,7 +426,10 @@ class Catalog(object):
pool = self._pools_ctrl.get(pool_id, detailed=True)
conf = utils.dynamic_conf(pool['uri'], pool['options'],
conf=self._conf)
return utils.load_storage_driver(conf, self._cache)
storage = utils.load_storage_driver(conf,
self._cache,
control_driver=self.control)
return pipeline.DataDriver(conf, storage, self.control)
@decorators.caches(_pool_cache_key, _POOL_CACHE_TTL)
def _pool_id(self, queue, project=None):

View File

@ -149,8 +149,8 @@ class DataDriver(storage.DataDriverBase):
_DRIVER_OPTIONS = options._config_options()
def __init__(self, conf, cache):
super(DataDriver, self).__init__(conf, cache)
def __init__(self, conf, cache, control_driver):
super(DataDriver, self).__init__(conf, cache, control_driver)
self.redis_conf = self.conf[options.MESSAGE_REDIS_GROUP]
server_version = self.connection.info()['redis_version']
@ -194,10 +194,6 @@ class DataDriver(storage.DataDriverBase):
"""Redis client connection instance."""
return _get_redis_client(self)
@decorators.lazy_property(write=False)
def queue_controller(self):
return controllers.QueueController(self)
@decorators.lazy_property(write=False)
def message_controller(self):
return controllers.MessageController(self)
@ -226,6 +222,10 @@ class ControlDriver(storage.ControlDriverBase):
"""Redis client connection instance."""
return _get_redis_client(self)
@decorators.lazy_property(write=False)
def queue_controller(self):
return controllers.QueueController(self)
@property
def pools_controller(self):
raise NotImplementedError()

View File

@ -17,6 +17,7 @@ import uuid
from oslo_utils import encodeutils
from oslo_utils import timeutils
import redis
from zaqar.common import decorators
from zaqar import storage
@ -558,3 +559,63 @@ def _filter_messages(messages, filters, to_basic, marker):
yield msg.to_basic(now)
else:
yield msg
QUEUES_SET_STORE_NAME = 'queues_set'
class MessageQueueHandler(object):
def __init__(self, driver, control_driver):
self.driver = driver
self._client = self.driver.connection
self._queue_ctrl = self.driver.queue_controller
self._message_ctrl = self.driver.message_controller
self._claim_ctrl = self.driver.claim_controller
@utils.raises_conn_error
def create(self, name, metadata=None, project=None):
with self._client.pipeline() as pipe:
self._message_ctrl._create_msgset(name, project, pipe)
try:
pipe.execute()
except redis.exceptions.ResponseError:
return False
@utils.raises_conn_error
@utils.retries_on_connection_error
def delete(self, name, project=None):
with self._client.pipeline() as pipe:
self._message_ctrl._delete_msgset(name, project, pipe)
self._message_ctrl._delete_queue_messages(name, project, pipe)
pipe.execute()
@utils.raises_conn_error
@utils.retries_on_connection_error
def stats(self, name, project=None):
if not self._queue_ctrl.exists(name, project=project):
raise errors.QueueDoesNotExist(name, project)
total = self._message_ctrl._count(name, project)
if total:
claimed = self._claim_ctrl._count_messages(name, project)
else:
claimed = 0
message_stats = {
'claimed': claimed,
'free': total - claimed,
'total': total,
}
if total:
try:
newest = self._message_ctrl.first(name, project, -1)
oldest = self._message_ctrl.first(name, project, 1)
except errors.QueueIsEmpty:
pass
else:
message_stats['newest'] = newest
message_stats['oldest'] = oldest
return {'messages': message_stats}

View File

@ -70,10 +70,6 @@ class QueueController(storage.Queue):
use_bin_type=True).pack
self._unpacker = functools.partial(msgpack.unpackb, encoding='utf-8')
@decorators.lazy_property(write=False)
def _message_ctrl(self):
return self.driver.message_controller
@decorators.lazy_property(write=False)
def _claim_ctrl(self):
return self.driver.claim_controller
@ -124,7 +120,7 @@ class QueueController(storage.Queue):
qset_key = utils.scope_queue_name(QUEUES_SET_STORE_NAME, project)
# Check if the queue already exists.
if self.exists(name, project):
if self._exists(name, project):
return False
queue = {
@ -137,7 +133,6 @@ class QueueController(storage.Queue):
# Pipeline ensures atomic inserts.
with self._client.pipeline() as pipe:
pipe.zadd(qset_key, 1, queue_key).hmset(queue_key, queue)
self._message_ctrl._create_msgset(name, project, pipe)
try:
pipe.execute()
@ -187,38 +182,9 @@ class QueueController(storage.Queue):
with self._client.pipeline() as pipe:
pipe.zrem(qset_key, queue_key)
pipe.delete(queue_key)
self._message_ctrl._delete_msgset(name, project, pipe)
self._message_ctrl._delete_queue_messages(name, project, pipe)
pipe.execute()
@utils.raises_conn_error
@utils.retries_on_connection_error
def _stats(self, name, project=None):
if not self.exists(name, project=project):
raise errors.QueueDoesNotExist(name, project)
total = self._message_ctrl._count(name, project)
if total:
claimed = self._claim_ctrl._count_messages(name, project)
else:
claimed = 0
message_stats = {
'claimed': claimed,
'free': total - claimed,
'total': total,
}
if total:
try:
newest = self._message_ctrl.first(name, project, -1)
oldest = self._message_ctrl.first(name, project, 1)
except errors.QueueIsEmpty:
pass
else:
message_stats['newest'] = newest
message_stats['oldest'] = oldest
return {'messages': message_stats}
pass

View File

@ -88,7 +88,7 @@ class ClaimController(storage.Claim):
with self.driver.trans() as trans:
try:
qid = utils.get_qid(self.driver, queue, project)
qid = utils.get_qid(self.driver.control_driver, queue, project)
except errors.QueueDoesNotExist:
return None, iter([])
@ -136,7 +136,7 @@ class ClaimController(storage.Claim):
age = utils.get_age(tables.Claims.c.created)
with self.driver.trans() as trans:
qid = utils.get_qid(self.driver, queue, project)
qid = utils.get_qid(self.driver.control_driver, queue, project)
update = tables.Claims.update().where(sa.and_(
tables.Claims.c.ttl > age,
@ -168,7 +168,7 @@ class ClaimController(storage.Claim):
try:
# NOTE(flaper87): This could probably use some
# joins and be just 1 query.
qid = utils.get_qid(self.driver, queue, project)
qid = utils.get_qid(self.driver.control_driver, queue, project)
except errors.QueueDoesNotExist:
return

View File

@ -133,7 +133,8 @@ class MessageController(storage.Message):
if project is None:
project = ''
qid = utils.get_qid(self.driver, queue, project)
qid = utils.get_qid(self.driver.control_driver,
queue, project)
sel = sa.sql.select([tables.Messages.c.id,
tables.Messages.c.body,
@ -230,7 +231,8 @@ class MessageController(storage.Message):
project = ''
with self.driver.trans() as trans:
qid = utils.get_qid(self.driver, queue, project)
qid = utils.get_qid(self.driver.control_driver,
queue, project)
# Delete the expired messages
and_stmt = sa.and_(tables.Messages.c.ttl <=
@ -310,7 +312,8 @@ class MessageController(storage.Message):
with self.driver.trans() as trans:
try:
qid = utils.get_qid(self.driver, queue, project)
qid = utils.get_qid(self.driver.control_driver,
queue, project)
except errors.QueueDoesNotExist:
return
@ -359,7 +362,8 @@ class MessageController(storage.Message):
statement = tables.Messages.delete()
qid = utils.get_qid(self.driver, queue_name, project)
qid = utils.get_qid(self.driver.control_driver,
queue_name, project)
and_stmt = [tables.Messages.c.id.in_(message_ids),
tables.Messages.c.qid == qid]
@ -367,3 +371,50 @@ class MessageController(storage.Message):
trans.execute(statement.where(sa.and_(*and_stmt)))
return messages
class MessageQueueHandler(object):
def __init__(self, driver, control_driver):
self.driver = driver
def stats(self, name, project):
if project is None:
project = ''
qid = utils.get_qid(self.driver.control_driver, name, project)
sel = sa.sql.select([
sa.sql.select([sa.func.count(tables.Messages.c.id)],
sa.and_(
tables.Messages.c.qid == qid,
tables.Messages.c.cid != (None),
tables.Messages.c.ttl >
sfunc.now() - tables.Messages.c.created)),
sa.sql.select([sa.func.count(tables.Messages.c.id)],
sa.and_(
tables.Messages.c.qid == qid,
tables.Messages.c.cid == (None),
tables.Messages.c.ttl >
sfunc.now() - tables.Messages.c.created))
])
claimed, free = self.driver.get(sel)
total = free + claimed
message_stats = {
'claimed': claimed,
'free': free,
'total': total,
}
try:
message_controller = self.driver.message_controller
oldest = message_controller.first(name, project, sort=1)
newest = message_controller.first(name, project, sort=-1)
except errors.QueueIsEmpty:
pass
else:
message_stats['oldest'] = utils.stat_message(oldest)
message_stats['newest'] = utils.stat_message(newest)
return {'messages': message_stats}

View File

@ -13,7 +13,6 @@
# the License.
import sqlalchemy as sa
from sqlalchemy.sql import func as sfunc
from zaqar import storage
from zaqar.storage import errors
@ -130,43 +129,4 @@ class QueueController(storage.Queue):
self.driver.run(dlt)
def _stats(self, name, project):
if project is None:
project = ''
qid = utils.get_qid(self.driver, name, project)
sel = sa.sql.select([
sa.sql.select([sa.func.count(tables.Messages.c.id)],
sa.and_(
tables.Messages.c.qid == qid,
tables.Messages.c.cid != (None),
tables.Messages.c.ttl >
sfunc.now() - tables.Messages.c.created)),
sa.sql.select([sa.func.count(tables.Messages.c.id)],
sa.and_(
tables.Messages.c.qid == qid,
tables.Messages.c.cid == (None),
tables.Messages.c.ttl >
sfunc.now() - tables.Messages.c.created))
])
claimed, free = self.driver.get(sel)
total = free + claimed
message_stats = {
'claimed': claimed,
'free': free,
'total': total,
}
try:
message_controller = self.driver.message_controller
oldest = message_controller.first(name, project, sort=1)
newest = message_controller.first(name, project, sort=-1)
except errors.QueueIsEmpty:
pass
else:
message_stats['oldest'] = utils.stat_message(oldest)
message_stats['newest'] = utils.stat_message(newest)
return {'messages': message_stats}
pass

View File

@ -95,7 +95,8 @@ def load_storage_impl(uri, control_mode=False, default_store=None):
raise errors.InvalidDriver(exc)
def load_storage_driver(conf, cache, storage_type=None, control_mode=False):
def load_storage_driver(conf, cache, storage_type=None,
control_mode=False, control_driver=None):
"""Loads a storage driver and returns it.
The driver's initializer will be passed conf and cache as
@ -110,17 +111,24 @@ def load_storage_driver(conf, cache, storage_type=None, control_mode=False):
:param control_mode: (Default False). Determines which
driver type to load; if False, the data driver is
loaded. If True, the control driver is loaded.
:param control_driver: (Default None). The control driver
instance to pass to the storage driver. Needed to access
the queue controller, mainly.
"""
mode = 'control' if control_mode else 'data'
driver_type = 'zaqar.{0}.storage'.format(mode)
storage_type = storage_type or conf['drivers'].storage
_invoke_args = [conf, cache]
if control_driver is not None:
_invoke_args.append(control_driver)
try:
mgr = driver.DriverManager(driver_type,
storage_type,
invoke_on_load=True,
invoke_args=[conf, cache])
invoke_args=_invoke_args)
return mgr.driver
@ -178,7 +186,11 @@ def can_connect(uri, conf=None):
# the URI field. This should be sufficient to initialize a
# storage driver.
driver = load_storage_driver(conf, None,
storage_type=storage_type)
storage_type=storage_type,
control_driver=load_storage_driver
(conf, None,
storage_type=storage_type,
control_mode=True))
return driver.is_alive()
except Exception as exc:
LOG.debug('Can\'t connect to: %s \n%s' % (uri, exc))

View File

@ -17,8 +17,8 @@ from zaqar import storage
class DataDriver(storage.DataDriverBase):
def __init__(self, conf, cache):
super(DataDriver, self).__init__(conf, cache)
def __init__(self, conf, cache, control_driver):
super(DataDriver, self).__init__(conf, cache, control_driver)
@property
def default_options(self):
@ -36,7 +36,7 @@ class DataDriver(storage.DataDriverBase):
@property
def queue_controller(self):
return QueueController(self)
return self.control_driver.queue_controller
@property
def message_controller(self):
@ -56,6 +56,10 @@ class ControlDriver(storage.ControlDriverBase):
def __init__(self, conf, cache):
super(ControlDriver, self).__init__(conf, cache)
@property
def queue_controller(self):
return QueueController(self)
@property
def catalogue_controller(self):
return None

View File

@ -28,6 +28,7 @@ from testtools import matchers
from zaqar.openstack.common.cache import cache as oslo_cache
from zaqar import storage
from zaqar.storage import errors
from zaqar.storage import pipeline
from zaqar import tests as testing
from zaqar.tests import helpers
@ -58,18 +59,21 @@ class ControllerBaseTest(testing.TestBase):
self.skipTest("Pooling is enabled, "
"but control driver class is not specified")
self.control = self.control_driver_class(self.conf, cache)
if not pooling:
self.driver = self.driver_class(self.conf, cache)
args = [self.conf, cache]
if issubclass(self.driver_class, storage.DataDriverBase):
args.append(self.control)
self.driver = self.driver_class(*args)
else:
control = self.control_driver_class(self.conf, cache)
uri = "mongodb://localhost:27017"
for i in range(4):
options = {'database': "zaqar_test_pools_" + str(i)}
control.pools_controller.create(six.text_type(i),
100, uri, options=options)
self.driver = self.driver_class(self.conf, cache, control)
self.addCleanup(control.pools_controller.drop_all)
self.addCleanup(control.catalogue_controller.drop_all)
self.control.pools_controller.create(six.text_type(i),
100, uri, options=options)
self.driver = self.driver_class(self.conf, cache, self.control)
self.addCleanup(self.control.pools_controller.drop_all)
self.addCleanup(self.control.catalogue_controller.drop_all)
self._prepare_conf()
@ -80,6 +84,10 @@ class ControllerBaseTest(testing.TestBase):
else:
self.controller = self.controller_class(self.driver._pool_catalog)
self.pipeline = pipeline.DataDriver(self.conf,
self.driver,
self.control)
def _prepare_conf(self):
"""Prepare the conf before running tests
@ -98,9 +106,7 @@ class QueueControllerTest(ControllerBaseTest):
def setUp(self):
super(QueueControllerTest, self).setUp()
self.queue_controller = self.driver.queue_controller
self.message_controller = self.driver.message_controller
self.claim_controller = self.driver.claim_controller
self.queue_controller = self.pipeline.queue_controller
@ddt.data(None, ControllerBaseTest.project)
def test_list(self, project):
@ -164,10 +170,144 @@ class QueueControllerTest(ControllerBaseTest):
metadata = self.controller.get('test', project=self.project)
self.assertEqual(metadata['meta'], 'test_meta')
# Test queue deletion
self.controller.delete('test', project=self.project)
# Test queue existence
self.assertFalse(self.controller.exists('test', project=self.project))
class MessageControllerTest(ControllerBaseTest):
"""Message Controller base tests.
NOTE(flaper87): Implementations of this class should
override the tearDown method in order
to clean up storage's state.
"""
queue_name = 'test_queue'
controller_base_class = storage.Message
# Specifies how often expired messages are purged, in sec.
gc_interval = 0
def setUp(self):
super(MessageControllerTest, self).setUp()
# Lets create a queue
self.queue_controller = self.pipeline.queue_controller
self.claim_controller = self.pipeline.claim_controller
self.queue_controller.create(self.queue_name, project=self.project)
def tearDown(self):
self.queue_controller.delete(self.queue_name, project=self.project)
super(MessageControllerTest, self).tearDown()
def test_stats_for_empty_queue(self):
self.addCleanup(self.queue_controller.delete, 'test',
project=self.project)
created = self.queue_controller.create('test', project=self.project)
self.assertTrue(created)
stats = self.queue_controller.stats('test', project=self.project)
message_stats = stats['messages']
self.assertEqual(message_stats['free'], 0)
self.assertEqual(message_stats['claimed'], 0)
self.assertEqual(message_stats['total'], 0)
self.assertNotIn('newest', message_stats)
self.assertNotIn('oldest', message_stats)
def test_queue_count_on_bulk_delete(self):
self.addCleanup(self.queue_controller.delete, 'test-queue',
project=self.project)
queue_name = 'test-queue'
client_uuid = uuid.uuid4()
created = self.queue_controller.create(queue_name,
project=self.project)
self.assertTrue(created)
# Create 10 messages.
msg_keys = _insert_fixtures(self.controller, queue_name,
project=self.project,
client_uuid=client_uuid, num=10)
stats = self.queue_controller.stats(queue_name,
self.project)['messages']
self.assertEqual(stats['total'], 10)
# Delete 5 messages
self.controller.bulk_delete(queue_name, msg_keys[0:5],
self.project)
stats = self.queue_controller.stats(queue_name,
self.project)['messages']
self.assertEqual(stats['total'], 5)
def test_queue_count_on_bulk_delete_with_invalid_id(self):
self.addCleanup(self.queue_controller.delete, 'test-queue',
project=self.project)
queue_name = 'test-queue'
client_uuid = uuid.uuid4()
created = self.queue_controller.create(queue_name,
project=self.project)
self.assertTrue(created)
# Create 10 messages.
msg_keys = _insert_fixtures(self.controller, queue_name,
project=self.project,
client_uuid=client_uuid, num=10)
stats = self.queue_controller.stats(queue_name,
self.project)['messages']
self.assertEqual(stats['total'], 10)
# Delete 5 messages
self.controller.bulk_delete(queue_name,
msg_keys[0:5] + ['invalid'],
self.project)
stats = self.queue_controller.stats(queue_name,
self.project)['messages']
self.assertEqual(stats['total'], 5)
def test_queue_count_on_delete(self):
self.addCleanup(self.queue_controller.delete, 'test-queue',
project=self.project)
queue_name = 'test-queue'
client_uuid = uuid.uuid4()
created = self.queue_controller.create(queue_name,
project=self.project)
self.assertTrue(created)
# Create 10 messages.
msg_keys = _insert_fixtures(self.controller, queue_name,
project=self.project,
client_uuid=client_uuid, num=10)
stats = self.queue_controller.stats(queue_name,
self.project)['messages']
self.assertEqual(stats['total'], 10)
# Delete 1 message
self.controller.delete(queue_name, msg_keys[0], self.project)
stats = self.queue_controller.stats(queue_name,
self.project)['messages']
self.assertEqual(stats['total'], 9)
def test_queue_stats(self):
# Test queue creation
self.addCleanup(self.queue_controller.delete, 'test',
project=self.project)
created = self.queue_controller.create('test',
metadata=dict(meta='test_meta'),
project=self.project)
client_uuid = uuid.uuid4()
# Test queue statistic
_insert_fixtures(self.message_controller, 'test',
_insert_fixtures(self.controller, 'test',
project=self.project, client_uuid=client_uuid,
num=6)
@ -176,11 +316,11 @@ class QueueControllerTest(ControllerBaseTest):
# message timestamps (and may not be monkey-patchable).
time.sleep(1.2)
_insert_fixtures(self.message_controller, 'test',
_insert_fixtures(self.controller, 'test',
project=self.project, client_uuid=client_uuid,
num=6)
stats = self.controller.stats('test', project=self.project)
stats = self.queue_controller.stats('test', project=self.project)
message_stats = stats['messages']
self.assertEqual(message_stats['free'], 12)
@ -210,121 +350,23 @@ class QueueControllerTest(ControllerBaseTest):
self.assertThat(oldest['created'],
matchers.LessThan(newest['created']))
# Test queue deletion
self.controller.delete('test', project=self.project)
# Test queue existence
self.assertFalse(self.controller.exists('test', project=self.project))
def test_stats_for_empty_queue(self):
self.addCleanup(self.controller.delete, 'test', project=self.project)
created = self.controller.create('test', project=self.project)
self.assertTrue(created)
stats = self.controller.stats('test', project=self.project)
message_stats = stats['messages']
self.assertEqual(message_stats['free'], 0)
self.assertEqual(message_stats['claimed'], 0)
self.assertEqual(message_stats['total'], 0)
self.assertNotIn('newest', message_stats)
self.assertNotIn('oldest', message_stats)
def test_queue_count_on_bulk_delete(self):
self.addCleanup(self.controller.delete, 'test-queue',
project=self.project)
queue_name = 'test-queue'
client_uuid = uuid.uuid4()
created = self.controller.create(queue_name, project=self.project)
self.assertTrue(created)
# Create 10 messages.
msg_keys = _insert_fixtures(self.message_controller, queue_name,
project=self.project,
client_uuid=client_uuid, num=10)
stats = self.controller.stats(queue_name,
self.project)['messages']
self.assertEqual(stats['total'], 10)
# Delete 5 messages
self.message_controller.bulk_delete(queue_name, msg_keys[0:5],
self.project)
stats = self.controller.stats(queue_name,
self.project)['messages']
self.assertEqual(stats['total'], 5)
def test_queue_count_on_bulk_delete_with_invalid_id(self):
self.addCleanup(self.controller.delete, 'test-queue',
project=self.project)
queue_name = 'test-queue'
client_uuid = uuid.uuid4()
created = self.controller.create(queue_name, project=self.project)
self.assertTrue(created)
# Create 10 messages.
msg_keys = _insert_fixtures(self.message_controller, queue_name,
project=self.project,
client_uuid=client_uuid, num=10)
stats = self.controller.stats(queue_name,
self.project)['messages']
self.assertEqual(stats['total'], 10)
# Delete 5 messages
self.message_controller.bulk_delete(queue_name,
msg_keys[0:5] + ['invalid'],
self.project)
stats = self.controller.stats(queue_name,
self.project)['messages']
self.assertEqual(stats['total'], 5)
def test_queue_count_on_delete(self):
self.addCleanup(self.controller.delete, 'test-queue',
project=self.project)
queue_name = 'test-queue'
client_uuid = uuid.uuid4()
created = self.controller.create(queue_name, project=self.project)
self.assertTrue(created)
# Create 10 messages.
msg_keys = _insert_fixtures(self.message_controller, queue_name,
project=self.project,
client_uuid=client_uuid, num=10)
stats = self.controller.stats(queue_name,
self.project)['messages']
self.assertEqual(stats['total'], 10)
# Delete 1 message
self.message_controller.delete(queue_name, msg_keys[0],
self.project)
stats = self.controller.stats(queue_name,
self.project)['messages']
self.assertEqual(stats['total'], 9)
def test_queue_count_on_claim_delete(self):
self.addCleanup(self.controller.delete, 'test-queue',
self.addCleanup(self.queue_controller.delete, 'test-queue',
project=self.project)
queue_name = 'test-queue'
client_uuid = uuid.uuid4()
created = self.controller.create(queue_name, project=self.project)
created = self.queue_controller.create(queue_name,
project=self.project)
self.assertTrue(created)
# Create 15 messages.
msg_keys = _insert_fixtures(self.message_controller, queue_name,
msg_keys = _insert_fixtures(self.controller, queue_name,
project=self.project,
client_uuid=client_uuid, num=15)
stats = self.controller.stats(queue_name,
self.project)['messages']
stats = self.queue_controller.stats(queue_name,
self.project)['messages']
self.assertEqual(stats['total'], 15)
metadata = {'ttl': 120, 'grace': 60}
@ -332,25 +374,25 @@ class QueueControllerTest(ControllerBaseTest):
claim_id, _ = self.claim_controller.create(queue_name, metadata,
self.project)
stats = self.controller.stats(queue_name,
self.project)['messages']
stats = self.queue_controller.stats(queue_name,
self.project)['messages']
self.assertEqual(stats['claimed'], 10)
# Delete one message and ensure stats are updated even
# thought the claim itself has not been deleted.
self.message_controller.delete(queue_name, msg_keys[0],
self.project, claim_id)
stats = self.controller.stats(queue_name,
self.project)['messages']
self.controller.delete(queue_name, msg_keys[0],
self.project, claim_id)
stats = self.queue_controller.stats(queue_name,
self.project)['messages']
self.assertEqual(stats['total'], 14)
self.assertEqual(stats['claimed'], 9)
self.assertEqual(stats['free'], 5)
# Same thing but use bulk_delete interface
self.message_controller.bulk_delete(queue_name, msg_keys[1:3],
self.project)
stats = self.controller.stats(queue_name,
self.project)['messages']
self.controller.bulk_delete(queue_name, msg_keys[1:3],
self.project)
stats = self.queue_controller.stats(queue_name,
self.project)['messages']
self.assertEqual(stats['total'], 12)
self.assertEqual(stats['claimed'], 7)
self.assertEqual(stats['free'], 5)
@ -358,37 +400,11 @@ class QueueControllerTest(ControllerBaseTest):
# Delete the claim
self.claim_controller.delete(queue_name, claim_id,
self.project)
stats = self.controller.stats(queue_name,
self.project)['messages']
stats = self.queue_controller.stats(queue_name,
self.project)['messages']
self.assertEqual(stats['claimed'], 0)
class MessageControllerTest(ControllerBaseTest):
"""Message Controller base tests.
NOTE(flaper87): Implementations of this class should
override the tearDown method in order
to clean up storage's state.
"""
queue_name = 'test_queue'
controller_base_class = storage.Message
# Specifies how often expired messages are purged, in sec.
gc_interval = 0
def setUp(self):
super(MessageControllerTest, self).setUp()
# Lets create a queue
self.queue_controller = self.driver.queue_controller
self.claim_controller = self.driver.claim_controller
self.queue_controller.create(self.queue_name, project=self.project)
def tearDown(self):
self.queue_controller.delete(self.queue_name, project=self.project)
super(MessageControllerTest, self).tearDown()
def test_message_lifecycle(self):
queue_name = self.queue_name
@ -729,8 +745,8 @@ class ClaimControllerTest(ControllerBaseTest):
super(ClaimControllerTest, self).setUp()
# Lets create a queue
self.queue_controller = self.driver.queue_controller
self.message_controller = self.driver.message_controller
self.queue_controller = self.pipeline.queue_controller
self.message_controller = self.pipeline.message_controller
self.queue_controller.create(self.queue_name, project=self.project)
def tearDown(self):

View File

@ -579,7 +579,7 @@ class TestQueueLifecycleMongoDB(QueueLifecycleBaseTest):
storage = self.boot.storage._storage
connection = storage.connection
connection.drop_database(storage.queues_database)
connection.drop_database(self.boot.control.queues_database)
for db in storage.message_databases:
connection.drop_database(db)

View File

@ -233,9 +233,10 @@ class TestClaimsMongoDB(ClaimsBaseTest):
def tearDown(self):
storage = self.boot.storage._storage
control = self.boot.control
connection = storage.connection
connection.drop_database(storage.queues_database)
connection.drop_database(control.queues_database)
for db in storage.message_databases:
connection.drop_database(db)

View File

@ -348,7 +348,7 @@ class TestQueueLifecycleMongoDB(QueueLifecycleBaseTest):
storage = self.boot.storage._storage
connection = storage.connection
connection.drop_database(storage.queues_database)
connection.drop_database(self.boot.control.queues_database)
for db in storage.message_databases:
connection.drop_database(db)

View File

@ -286,9 +286,10 @@ class TestClaimsMongoDB(ClaimsBaseTest):
def tearDown(self):
storage = self.boot.storage._storage
control = self.boot.control
connection = storage.connection
connection.drop_database(storage.queues_database)
connection.drop_database(control.queues_database)
for db in storage.message_databases:
connection.drop_database(db)

View File

@ -329,7 +329,7 @@ class TestQueueLifecycleMongoDB(QueueLifecycleBaseTest):
storage = self.boot.storage._storage
connection = storage.connection
connection.drop_database(storage.queues_database)
connection.drop_database(self.boot.control.queues_database)
for db in storage.message_databases:
connection.drop_database(db)

View File

@ -286,9 +286,10 @@ class TestClaimsMongoDB(ClaimsBaseTest):
def tearDown(self):
storage = self.boot.storage._storage
control = self.boot.control
connection = storage.connection
connection.drop_database(storage.queues_database)
connection.drop_database(control.queues_database)
for db in storage.message_databases:
connection.drop_database(db)

View File

@ -326,10 +326,11 @@ class TestQueueLifecycleMongoDB(QueueLifecycleBaseTest):
super(TestQueueLifecycleMongoDB, self).setUp()
def tearDown(self):
control = self.boot.control
storage = self.boot.storage._storage
connection = storage.connection
connection.drop_database(storage.queues_database)
connection.drop_database(control.queues_database)
for db in storage.message_databases:
connection.drop_database(db)