diff --git a/marconi/queues/bootstrap.py b/marconi/queues/bootstrap.py index 56d4acf04..117a6eaec 100644 --- a/marconi/queues/bootstrap.py +++ b/marconi/queues/bootstrap.py @@ -70,9 +70,11 @@ class Bootstrap(object): if self.conf.sharding: LOG.debug(_(u'Storage sharding enabled')) - storage_driver = sharding.DataDriver(self.conf, self.control) + storage_driver = sharding.DataDriver(self.conf, self.cache, + self.control) else: - storage_driver = storage_utils.load_storage_driver(self.conf) + storage_driver = storage_utils.load_storage_driver( + self.conf, self.cache) LOG.debug(_(u'Loading storage pipeline')) return pipeline.DataDriver(self.conf, storage_driver) @@ -80,12 +82,12 @@ class Bootstrap(object): @decorators.lazy_property(write=False) def control(self): LOG.debug(_(u'Loading storage control driver')) - return storage_utils.load_storage_driver(self.conf, + return storage_utils.load_storage_driver(self.conf, self.cache, control_mode=True) @decorators.lazy_property(write=False) def cache(self): - LOG.debug(_(u'Loading Proxy Cache Driver')) + LOG.debug(_(u'Loading proxy cache driver')) try: mgr = oslo_cache.get_cache(self.conf) return mgr diff --git a/marconi/queues/storage/base.py b/marconi/queues/storage/base.py index af1623481..7d3c35090 100644 --- a/marconi/queues/storage/base.py +++ b/marconi/queues/storage/base.py @@ -32,7 +32,22 @@ _LIMITS_GROUP = 'limits:storage' @six.add_metaclass(abc.ABCMeta) -class DataDriverBase(object): +class DriverBase(object): + """Base class for both data and control plane drivers + + :param conf: Configuration containing options for this driver. + :type conf: `oslo.config.ConfigOpts` + :param cache: Cache instance to use for reducing latency + for certain lookups. + :type cache: `marconi.common.cache.backends.BaseCache` + """ + def __init__(self, conf, cache): + self.conf = conf + self.cache = cache + + +@six.add_metaclass(abc.ABCMeta) +class DataDriverBase(DriverBase): """Interface definition for storage drivers. Data plane storage drivers are responsible for implementing the @@ -41,16 +56,15 @@ class DataDriverBase(object): Connection information and driver-specific options are loaded from the config file or the shard catalog. - :param conf: Driver configuration. Can be any - dict-like object containing the expected - options. Must at least include 'uri' which - provides connection options such as host and - port. - + :param conf: Configuration containing options for this driver. + :type conf: `oslo.config.ConfigOpts` + :param cache: Cache instance to use for reducing latency + for certain lookups. + :type cache: `marconi.common.cache.backends.BaseCache` """ - def __init__(self, conf): - self.conf = conf + def __init__(self, conf, cache): + super(DataDriverBase, self).__init__(conf, cache) self.conf.register_opts(_LIMITS_OPTIONS, group=_LIMITS_GROUP) self.limits_conf = self.conf[_LIMITS_GROUP] @@ -77,7 +91,7 @@ class DataDriverBase(object): @six.add_metaclass(abc.ABCMeta) -class ControlDriverBase(object): +class ControlDriverBase(DriverBase): """Interface definition for control plane storage drivers. Storage drivers that work at the control plane layer allow one to @@ -86,10 +100,13 @@ class ControlDriverBase(object): Allows access to the shard registry through a catalogue and a shard controller. - """ - def __init__(self, conf): - self.conf = conf + :param conf: Configuration containing options for this driver. + :type conf: `oslo.config.ConfigOpts` + :param cache: Cache instance to use for reducing latency + for certain lookups. + :type cache: `marconi.common.cache.backends.BaseCache` + """ @abc.abstractproperty def catalogue_controller(self): diff --git a/marconi/queues/storage/mongodb/driver.py b/marconi/queues/storage/mongodb/driver.py index 78f09c2c8..ee42f6612 100644 --- a/marconi/queues/storage/mongodb/driver.py +++ b/marconi/queues/storage/mongodb/driver.py @@ -39,8 +39,8 @@ def _connection(conf): class DataDriver(storage.DataDriverBase): - def __init__(self, conf): - super(DataDriver, self).__init__(conf) + def __init__(self, conf, cache): + super(DataDriver, self).__init__(conf, cache) opts = options.MONGODB_OPTIONS @@ -112,8 +112,8 @@ class DataDriver(storage.DataDriverBase): class ControlDriver(storage.ControlDriverBase): - def __init__(self, conf): - super(ControlDriver, self).__init__(conf) + def __init__(self, conf, cache): + super(ControlDriver, self).__init__(conf, cache) self.conf.register_opts(options.MONGODB_OPTIONS, group=options.MONGODB_GROUP) diff --git a/marconi/queues/storage/pipeline.py b/marconi/queues/storage/pipeline.py index 0a51deb27..33ae8f144 100644 --- a/marconi/queues/storage/pipeline.py +++ b/marconi/queues/storage/pipeline.py @@ -82,15 +82,15 @@ def _get_storage_pipeline(resource_name, conf): class DataDriver(base.DataDriverBase): """Meta-driver for injecting pipelines in front of controllers. - :param storage_conf: For real drivers, this would be used to - configure the storage, but in this case it is simply ignored. :param conf: Configuration from which to load pipeline settings :param storage: Storage driver that will service requests as the last step in the pipeline """ def __init__(self, conf, storage): - super(DataDriver, self).__init__(conf) + # NOTE(kgriffs): Pass None for cache since it won't ever + # be referenced. + super(DataDriver, self).__init__(conf, None) self._storage = storage def is_alive(self): diff --git a/marconi/queues/storage/sharding.py b/marconi/queues/storage/sharding.py index 818f9a998..76a15bc5d 100644 --- a/marconi/queues/storage/sharding.py +++ b/marconi/queues/storage/sharding.py @@ -38,13 +38,16 @@ _CATALOG_GROUP = 'sharding:catalog' class DataDriver(storage.DataDriverBase): """Sharding meta-driver for routing requests to multiple backends. - :param storage_conf: Ignored, since this is a meta-driver - :param catalog_conf: Options pertaining to the shard catalog + :param conf: Configuration from which to read sharding options + :param cache: Cache instance that will be passed to individual + storage driver instances that correspond to each shard. will + also be used by the shard controller to reduce latency for + some operations. """ - def __init__(self, conf, control): - super(DataDriver, self).__init__(conf) - self._shard_catalog = Catalog(conf, control) + def __init__(self, conf, cache, control): + super(DataDriver, self).__init__(conf, cache) + self._shard_catalog = Catalog(conf, cache, control) def is_alive(self): return all(self._shard_catalog.get_driver(shard['name']).is_alive() @@ -307,9 +310,10 @@ class ClaimController(RoutingController): class Catalog(object): """Represents the mapping between queues and shard drivers.""" - def __init__(self, conf, control): + def __init__(self, conf, cache, control): self._drivers = {} self._conf = conf + self._cache = cache self._conf.register_opts(_CATALOG_OPTIONS, group=_CATALOG_GROUP) self._catalog_conf = self._conf[_CATALOG_GROUP] @@ -355,7 +359,7 @@ class Catalog(object): conf.register_opts(general_opts) conf.register_opts(driver_opts, group=u'drivers') conf.register_opts(storage_opts, group=storage_group) - return utils.load_storage_driver(conf) + return utils.load_storage_driver(conf, self._cache) def register(self, queue, project=None): """Register a new queue in the shard catalog. diff --git a/marconi/queues/storage/sqlite/driver.py b/marconi/queues/storage/sqlite/driver.py index 843d741e6..2352f7231 100644 --- a/marconi/queues/storage/sqlite/driver.py +++ b/marconi/queues/storage/sqlite/driver.py @@ -36,8 +36,8 @@ _SQLITE_GROUP = 'drivers:storage:sqlite' class DataDriver(storage.DataDriverBase): - def __init__(self, conf): - super(DataDriver, self).__init__(conf) + def __init__(self, conf, cache): + super(DataDriver, self).__init__(conf, cache) self.conf.register_opts(_SQLITE_OPTIONS, group=_SQLITE_GROUP) self.sqlite_conf = self.conf[_SQLITE_GROUP] @@ -203,8 +203,8 @@ class DataDriver(storage.DataDriverBase): class ControlDriver(storage.ControlDriverBase): - def __init__(self, conf): - super(ControlDriver, self).__init__(conf) + def __init__(self, conf, cache): + super(ControlDriver, self).__init__(conf, cache) self.conf.register_opts(_SQLITE_OPTIONS, group=_SQLITE_GROUP) self.sqlite_conf = self.conf[_SQLITE_GROUP] diff --git a/marconi/queues/storage/utils.py b/marconi/queues/storage/utils.py index e8661eb5d..97b9b50b9 100644 --- a/marconi/queues/storage/utils.py +++ b/marconi/queues/storage/utils.py @@ -22,22 +22,30 @@ from marconi.openstack.common import log LOG = log.getLogger(__name__) -def load_storage_driver(conf, control_mode=False): +def load_storage_driver(conf, cache, control_mode=False): """Loads a storage driver and returns it. - The driver's initializer will be passed conf as its only arg. + The driver's initializer will be passed conf and cache as + its positional args. :param conf: Configuration instance to use for loading the driver. Must include a 'drivers' group. + :param cache: Cache instance that the driver can (optionally) + use to reduce latency for some operations. + :param control_mode: (Default False). Determines which + driver type to load; if False, the data driver is + loaded. If True, the control driver is loaded. """ mode = 'control' if control_mode else 'data' driver_type = 'marconi.queues.{0}.storage'.format(mode) + try: mgr = driver.DriverManager(driver_type, conf['drivers'].storage, invoke_on_load=True, - invoke_args=[conf]) + invoke_args=[conf, cache]) + return mgr.driver except RuntimeError as exc: diff --git a/marconi/tests/faulty_storage.py b/marconi/tests/faulty_storage.py index 9e0db921e..39d104091 100644 --- a/marconi/tests/faulty_storage.py +++ b/marconi/tests/faulty_storage.py @@ -17,8 +17,8 @@ from marconi.queues import storage class DataDriver(storage.DataDriverBase): - def __init__(self, conf): - super(DataDriver, self).__init__(conf) + def __init__(self, conf, cache): + super(DataDriver, self).__init__(conf, cache) @property def default_options(self): @@ -42,8 +42,8 @@ class DataDriver(storage.DataDriverBase): class ControlDriver(storage.ControlDriverBase): - def __init__(self, conf): - super(ControlDriver, self).__init__(conf) + def __init__(self, conf, cache): + super(ControlDriver, self).__init__(conf, cache) @property def catalogue_controller(self): diff --git a/marconi/tests/queues/storage/base.py b/marconi/tests/queues/storage/base.py index 19fc9d0d3..37a8c3cd7 100644 --- a/marconi/tests/queues/storage/base.py +++ b/marconi/tests/queues/storage/base.py @@ -21,6 +21,7 @@ import ddt import six from testtools import matchers +from marconi.common.cache import cache as oslo_cache from marconi.openstack.common import timeutils from marconi.queues import storage from marconi.queues.storage import errors @@ -46,7 +47,8 @@ class ControllerBaseTest(testing.TestBase): self.controller_class, self.controller_base_class)) - self.driver = self.driver_class(self.conf) + cache = oslo_cache.get_cache(self.conf) + self.driver = self.driver_class(self.conf, cache) self._prepare_conf() self.addCleanup(self._purge_databases) diff --git a/tests/unit/queues/storage/test_impl_mongodb.py b/tests/unit/queues/storage/test_impl_mongodb.py index caf96806a..fa0a61364 100644 --- a/tests/unit/queues/storage/test_impl_mongodb.py +++ b/tests/unit/queues/storage/test_impl_mongodb.py @@ -21,6 +21,7 @@ from pymongo import cursor import pymongo.errors from testtools import matchers +from marconi.common.cache import cache as oslo_cache from marconi.openstack.common import timeutils from marconi.queues import storage from marconi.queues.storage import errors @@ -92,7 +93,8 @@ class MongodbDriverTest(testing.TestBase, MongodbTestMixin): config_file = 'wsgi_mongodb.conf' def test_db_instance(self): - driver = mongodb.DataDriver(self.conf) + cache = oslo_cache.get_cache(self.conf) + driver = mongodb.DataDriver(self.conf, cache) databases = (driver.message_databases + [driver.queues_database]) diff --git a/tests/unit/queues/storage/test_shard_catalog.py b/tests/unit/queues/storage/test_shard_catalog.py index f66daba13..507f60943 100644 --- a/tests/unit/queues/storage/test_shard_catalog.py +++ b/tests/unit/queues/storage/test_shard_catalog.py @@ -18,6 +18,7 @@ import uuid from oslo.config import cfg +from marconi.common.cache import cache as oslo_cache from marconi.queues.storage import sharding from marconi.queues.storage import sqlite from marconi.queues.storage import utils @@ -37,7 +38,10 @@ class TestShardCatalog(testing.TestBase): self.conf.register_opts([cfg.StrOpt('storage')], group='drivers') - control = utils.load_storage_driver(self.conf, control_mode=True) + cache = oslo_cache.get_cache(self.conf) + control = utils.load_storage_driver(self.conf, cache, + control_mode=True) + self.catalogue_ctrl = control.catalogue_controller self.shards_ctrl = control.shards_controller @@ -47,7 +51,7 @@ class TestShardCatalog(testing.TestBase): self.project = str(uuid.uuid1()) self.shards_ctrl.create(self.shard, 100, 'sqlite://memory') self.catalogue_ctrl.insert(self.project, self.queue, self.shard) - self.catalog = sharding.Catalog(self.conf, control) + self.catalog = sharding.Catalog(self.conf, cache, control) def tearDown(self): self.catalogue_ctrl.drop_all() diff --git a/tests/unit/queues/storage/test_shard_queues.py b/tests/unit/queues/storage/test_shard_queues.py index 546fe1d60..a46869097 100644 --- a/tests/unit/queues/storage/test_shard_queues.py +++ b/tests/unit/queues/storage/test_shard_queues.py @@ -19,6 +19,7 @@ import uuid from oslo.config import cfg +from marconi.common.cache import cache as oslo_cache from marconi.queues.storage import sharding from marconi.queues.storage import utils from marconi import tests as testing @@ -33,11 +34,12 @@ class TestShardQueues(base.TestBase): conf = self.load_conf('wsgi_mongodb_sharded.conf') conf.register_opts([cfg.StrOpt('storage')], - group='queues:drivers') + group='drivers') - control = utils.load_storage_driver(conf, control_mode=True) + cache = oslo_cache.get_cache(self.conf) + control = utils.load_storage_driver(conf, cache, control_mode=True) self.shards_ctrl = control.shards_controller - self.driver = sharding.DataDriver(conf, control) + self.driver = sharding.DataDriver(conf, cache, control) self.controller = self.driver.queue_controller # fake two shards