chore: Pass cache into drivers
This patch plumbs storage drivers so they all get a cache instance they can use at their discretion. I decided to make cache a required param to the initializers, since any deployment of Marconi of any significance would likely opt to enable caching anyway, and the cache library has a default in-memory backend that can be used for simple deployments. Note also that both data- and control-plane drivers receive a cache instance, even though the sharding controllers will not use it (caching will be done in the sharding data driver instead.) I thought it would be better to pass cache in both cases so we can share test code and avoid complicating utils.load_storage_driver(). Also, the control driver may eventually support operations other than sharding; cache may come in handy then. Change-Id: I647791af0d7a5914c30cb2489033ec650a455370 Signed-off-by: kgriffs <kurt.griffiths@rackspace.com>
This commit is contained in:
parent
d30edc4db0
commit
ecce97fd03
@ -70,9 +70,11 @@ class Bootstrap(object):
|
||||
|
||||
if self.conf.sharding:
|
||||
LOG.debug(_(u'Storage sharding enabled'))
|
||||
storage_driver = sharding.DataDriver(self.conf, self.control)
|
||||
storage_driver = sharding.DataDriver(self.conf, self.cache,
|
||||
self.control)
|
||||
else:
|
||||
storage_driver = storage_utils.load_storage_driver(self.conf)
|
||||
storage_driver = storage_utils.load_storage_driver(
|
||||
self.conf, self.cache)
|
||||
|
||||
LOG.debug(_(u'Loading storage pipeline'))
|
||||
return pipeline.DataDriver(self.conf, storage_driver)
|
||||
@ -80,12 +82,12 @@ class Bootstrap(object):
|
||||
@decorators.lazy_property(write=False)
|
||||
def control(self):
|
||||
LOG.debug(_(u'Loading storage control driver'))
|
||||
return storage_utils.load_storage_driver(self.conf,
|
||||
return storage_utils.load_storage_driver(self.conf, self.cache,
|
||||
control_mode=True)
|
||||
|
||||
@decorators.lazy_property(write=False)
|
||||
def cache(self):
|
||||
LOG.debug(_(u'Loading Proxy Cache Driver'))
|
||||
LOG.debug(_(u'Loading proxy cache driver'))
|
||||
try:
|
||||
mgr = oslo_cache.get_cache(self.conf)
|
||||
return mgr
|
||||
|
@ -32,7 +32,22 @@ _LIMITS_GROUP = 'limits:storage'
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class DataDriverBase(object):
|
||||
class DriverBase(object):
|
||||
"""Base class for both data and control plane drivers
|
||||
|
||||
:param conf: Configuration containing options for this driver.
|
||||
:type conf: `oslo.config.ConfigOpts`
|
||||
:param cache: Cache instance to use for reducing latency
|
||||
for certain lookups.
|
||||
:type cache: `marconi.common.cache.backends.BaseCache`
|
||||
"""
|
||||
def __init__(self, conf, cache):
|
||||
self.conf = conf
|
||||
self.cache = cache
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class DataDriverBase(DriverBase):
|
||||
"""Interface definition for storage drivers.
|
||||
|
||||
Data plane storage drivers are responsible for implementing the
|
||||
@ -41,16 +56,15 @@ class DataDriverBase(object):
|
||||
Connection information and driver-specific options are
|
||||
loaded from the config file or the shard catalog.
|
||||
|
||||
:param conf: Driver configuration. Can be any
|
||||
dict-like object containing the expected
|
||||
options. Must at least include 'uri' which
|
||||
provides connection options such as host and
|
||||
port.
|
||||
|
||||
:param conf: Configuration containing options for this driver.
|
||||
:type conf: `oslo.config.ConfigOpts`
|
||||
:param cache: Cache instance to use for reducing latency
|
||||
for certain lookups.
|
||||
:type cache: `marconi.common.cache.backends.BaseCache`
|
||||
"""
|
||||
|
||||
def __init__(self, conf):
|
||||
self.conf = conf
|
||||
def __init__(self, conf, cache):
|
||||
super(DataDriverBase, self).__init__(conf, cache)
|
||||
|
||||
self.conf.register_opts(_LIMITS_OPTIONS, group=_LIMITS_GROUP)
|
||||
self.limits_conf = self.conf[_LIMITS_GROUP]
|
||||
@ -77,7 +91,7 @@ class DataDriverBase(object):
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class ControlDriverBase(object):
|
||||
class ControlDriverBase(DriverBase):
|
||||
"""Interface definition for control plane storage drivers.
|
||||
|
||||
Storage drivers that work at the control plane layer allow one to
|
||||
@ -86,10 +100,13 @@ class ControlDriverBase(object):
|
||||
|
||||
Allows access to the shard registry through a catalogue and a
|
||||
shard controller.
|
||||
"""
|
||||
|
||||
def __init__(self, conf):
|
||||
self.conf = conf
|
||||
:param conf: Configuration containing options for this driver.
|
||||
:type conf: `oslo.config.ConfigOpts`
|
||||
:param cache: Cache instance to use for reducing latency
|
||||
for certain lookups.
|
||||
:type cache: `marconi.common.cache.backends.BaseCache`
|
||||
"""
|
||||
|
||||
@abc.abstractproperty
|
||||
def catalogue_controller(self):
|
||||
|
@ -39,8 +39,8 @@ def _connection(conf):
|
||||
|
||||
class DataDriver(storage.DataDriverBase):
|
||||
|
||||
def __init__(self, conf):
|
||||
super(DataDriver, self).__init__(conf)
|
||||
def __init__(self, conf, cache):
|
||||
super(DataDriver, self).__init__(conf, cache)
|
||||
|
||||
opts = options.MONGODB_OPTIONS
|
||||
|
||||
@ -112,8 +112,8 @@ class DataDriver(storage.DataDriverBase):
|
||||
|
||||
class ControlDriver(storage.ControlDriverBase):
|
||||
|
||||
def __init__(self, conf):
|
||||
super(ControlDriver, self).__init__(conf)
|
||||
def __init__(self, conf, cache):
|
||||
super(ControlDriver, self).__init__(conf, cache)
|
||||
|
||||
self.conf.register_opts(options.MONGODB_OPTIONS,
|
||||
group=options.MONGODB_GROUP)
|
||||
|
@ -82,15 +82,15 @@ def _get_storage_pipeline(resource_name, conf):
|
||||
class DataDriver(base.DataDriverBase):
|
||||
"""Meta-driver for injecting pipelines in front of controllers.
|
||||
|
||||
:param storage_conf: For real drivers, this would be used to
|
||||
configure the storage, but in this case it is simply ignored.
|
||||
:param conf: Configuration from which to load pipeline settings
|
||||
:param storage: Storage driver that will service requests as the
|
||||
last step in the pipeline
|
||||
"""
|
||||
|
||||
def __init__(self, conf, storage):
|
||||
super(DataDriver, self).__init__(conf)
|
||||
# NOTE(kgriffs): Pass None for cache since it won't ever
|
||||
# be referenced.
|
||||
super(DataDriver, self).__init__(conf, None)
|
||||
self._storage = storage
|
||||
|
||||
def is_alive(self):
|
||||
|
@ -38,13 +38,16 @@ _CATALOG_GROUP = 'sharding:catalog'
|
||||
class DataDriver(storage.DataDriverBase):
|
||||
"""Sharding meta-driver for routing requests to multiple backends.
|
||||
|
||||
:param storage_conf: Ignored, since this is a meta-driver
|
||||
:param catalog_conf: Options pertaining to the shard catalog
|
||||
:param conf: Configuration from which to read sharding options
|
||||
:param cache: Cache instance that will be passed to individual
|
||||
storage driver instances that correspond to each shard. will
|
||||
also be used by the shard controller to reduce latency for
|
||||
some operations.
|
||||
"""
|
||||
|
||||
def __init__(self, conf, control):
|
||||
super(DataDriver, self).__init__(conf)
|
||||
self._shard_catalog = Catalog(conf, control)
|
||||
def __init__(self, conf, cache, control):
|
||||
super(DataDriver, self).__init__(conf, cache)
|
||||
self._shard_catalog = Catalog(conf, cache, control)
|
||||
|
||||
def is_alive(self):
|
||||
return all(self._shard_catalog.get_driver(shard['name']).is_alive()
|
||||
@ -307,9 +310,10 @@ class ClaimController(RoutingController):
|
||||
class Catalog(object):
|
||||
"""Represents the mapping between queues and shard drivers."""
|
||||
|
||||
def __init__(self, conf, control):
|
||||
def __init__(self, conf, cache, control):
|
||||
self._drivers = {}
|
||||
self._conf = conf
|
||||
self._cache = cache
|
||||
|
||||
self._conf.register_opts(_CATALOG_OPTIONS, group=_CATALOG_GROUP)
|
||||
self._catalog_conf = self._conf[_CATALOG_GROUP]
|
||||
@ -355,7 +359,7 @@ class Catalog(object):
|
||||
conf.register_opts(general_opts)
|
||||
conf.register_opts(driver_opts, group=u'drivers')
|
||||
conf.register_opts(storage_opts, group=storage_group)
|
||||
return utils.load_storage_driver(conf)
|
||||
return utils.load_storage_driver(conf, self._cache)
|
||||
|
||||
def register(self, queue, project=None):
|
||||
"""Register a new queue in the shard catalog.
|
||||
|
@ -36,8 +36,8 @@ _SQLITE_GROUP = 'drivers:storage:sqlite'
|
||||
|
||||
class DataDriver(storage.DataDriverBase):
|
||||
|
||||
def __init__(self, conf):
|
||||
super(DataDriver, self).__init__(conf)
|
||||
def __init__(self, conf, cache):
|
||||
super(DataDriver, self).__init__(conf, cache)
|
||||
|
||||
self.conf.register_opts(_SQLITE_OPTIONS, group=_SQLITE_GROUP)
|
||||
self.sqlite_conf = self.conf[_SQLITE_GROUP]
|
||||
@ -203,8 +203,8 @@ class DataDriver(storage.DataDriverBase):
|
||||
|
||||
class ControlDriver(storage.ControlDriverBase):
|
||||
|
||||
def __init__(self, conf):
|
||||
super(ControlDriver, self).__init__(conf)
|
||||
def __init__(self, conf, cache):
|
||||
super(ControlDriver, self).__init__(conf, cache)
|
||||
|
||||
self.conf.register_opts(_SQLITE_OPTIONS, group=_SQLITE_GROUP)
|
||||
self.sqlite_conf = self.conf[_SQLITE_GROUP]
|
||||
|
@ -22,22 +22,30 @@ from marconi.openstack.common import log
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
def load_storage_driver(conf, control_mode=False):
|
||||
def load_storage_driver(conf, cache, control_mode=False):
|
||||
"""Loads a storage driver and returns it.
|
||||
|
||||
The driver's initializer will be passed conf as its only arg.
|
||||
The driver's initializer will be passed conf and cache as
|
||||
its positional args.
|
||||
|
||||
:param conf: Configuration instance to use for loading the
|
||||
driver. Must include a 'drivers' group.
|
||||
:param cache: Cache instance that the driver can (optionally)
|
||||
use to reduce latency for some operations.
|
||||
:param control_mode: (Default False). Determines which
|
||||
driver type to load; if False, the data driver is
|
||||
loaded. If True, the control driver is loaded.
|
||||
"""
|
||||
|
||||
mode = 'control' if control_mode else 'data'
|
||||
driver_type = 'marconi.queues.{0}.storage'.format(mode)
|
||||
|
||||
try:
|
||||
mgr = driver.DriverManager(driver_type,
|
||||
conf['drivers'].storage,
|
||||
invoke_on_load=True,
|
||||
invoke_args=[conf])
|
||||
invoke_args=[conf, cache])
|
||||
|
||||
return mgr.driver
|
||||
|
||||
except RuntimeError as exc:
|
||||
|
@ -17,8 +17,8 @@ from marconi.queues import storage
|
||||
|
||||
|
||||
class DataDriver(storage.DataDriverBase):
|
||||
def __init__(self, conf):
|
||||
super(DataDriver, self).__init__(conf)
|
||||
def __init__(self, conf, cache):
|
||||
super(DataDriver, self).__init__(conf, cache)
|
||||
|
||||
@property
|
||||
def default_options(self):
|
||||
@ -42,8 +42,8 @@ class DataDriver(storage.DataDriverBase):
|
||||
|
||||
class ControlDriver(storage.ControlDriverBase):
|
||||
|
||||
def __init__(self, conf):
|
||||
super(ControlDriver, self).__init__(conf)
|
||||
def __init__(self, conf, cache):
|
||||
super(ControlDriver, self).__init__(conf, cache)
|
||||
|
||||
@property
|
||||
def catalogue_controller(self):
|
||||
|
@ -21,6 +21,7 @@ import ddt
|
||||
import six
|
||||
from testtools import matchers
|
||||
|
||||
from marconi.common.cache import cache as oslo_cache
|
||||
from marconi.openstack.common import timeutils
|
||||
from marconi.queues import storage
|
||||
from marconi.queues.storage import errors
|
||||
@ -46,7 +47,8 @@ class ControllerBaseTest(testing.TestBase):
|
||||
self.controller_class,
|
||||
self.controller_base_class))
|
||||
|
||||
self.driver = self.driver_class(self.conf)
|
||||
cache = oslo_cache.get_cache(self.conf)
|
||||
self.driver = self.driver_class(self.conf, cache)
|
||||
self._prepare_conf()
|
||||
|
||||
self.addCleanup(self._purge_databases)
|
||||
|
@ -21,6 +21,7 @@ from pymongo import cursor
|
||||
import pymongo.errors
|
||||
from testtools import matchers
|
||||
|
||||
from marconi.common.cache import cache as oslo_cache
|
||||
from marconi.openstack.common import timeutils
|
||||
from marconi.queues import storage
|
||||
from marconi.queues.storage import errors
|
||||
@ -92,7 +93,8 @@ class MongodbDriverTest(testing.TestBase, MongodbTestMixin):
|
||||
config_file = 'wsgi_mongodb.conf'
|
||||
|
||||
def test_db_instance(self):
|
||||
driver = mongodb.DataDriver(self.conf)
|
||||
cache = oslo_cache.get_cache(self.conf)
|
||||
driver = mongodb.DataDriver(self.conf, cache)
|
||||
|
||||
databases = (driver.message_databases +
|
||||
[driver.queues_database])
|
||||
|
@ -18,6 +18,7 @@ import uuid
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
from marconi.common.cache import cache as oslo_cache
|
||||
from marconi.queues.storage import sharding
|
||||
from marconi.queues.storage import sqlite
|
||||
from marconi.queues.storage import utils
|
||||
@ -37,7 +38,10 @@ class TestShardCatalog(testing.TestBase):
|
||||
|
||||
self.conf.register_opts([cfg.StrOpt('storage')],
|
||||
group='drivers')
|
||||
control = utils.load_storage_driver(self.conf, control_mode=True)
|
||||
cache = oslo_cache.get_cache(self.conf)
|
||||
control = utils.load_storage_driver(self.conf, cache,
|
||||
control_mode=True)
|
||||
|
||||
self.catalogue_ctrl = control.catalogue_controller
|
||||
self.shards_ctrl = control.shards_controller
|
||||
|
||||
@ -47,7 +51,7 @@ class TestShardCatalog(testing.TestBase):
|
||||
self.project = str(uuid.uuid1())
|
||||
self.shards_ctrl.create(self.shard, 100, 'sqlite://memory')
|
||||
self.catalogue_ctrl.insert(self.project, self.queue, self.shard)
|
||||
self.catalog = sharding.Catalog(self.conf, control)
|
||||
self.catalog = sharding.Catalog(self.conf, cache, control)
|
||||
|
||||
def tearDown(self):
|
||||
self.catalogue_ctrl.drop_all()
|
||||
|
@ -19,6 +19,7 @@ import uuid
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
from marconi.common.cache import cache as oslo_cache
|
||||
from marconi.queues.storage import sharding
|
||||
from marconi.queues.storage import utils
|
||||
from marconi import tests as testing
|
||||
@ -33,11 +34,12 @@ class TestShardQueues(base.TestBase):
|
||||
conf = self.load_conf('wsgi_mongodb_sharded.conf')
|
||||
|
||||
conf.register_opts([cfg.StrOpt('storage')],
|
||||
group='queues:drivers')
|
||||
group='drivers')
|
||||
|
||||
control = utils.load_storage_driver(conf, control_mode=True)
|
||||
cache = oslo_cache.get_cache(self.conf)
|
||||
control = utils.load_storage_driver(conf, cache, control_mode=True)
|
||||
self.shards_ctrl = control.shards_controller
|
||||
self.driver = sharding.DataDriver(conf, control)
|
||||
self.driver = sharding.DataDriver(conf, cache, control)
|
||||
self.controller = self.driver.queue_controller
|
||||
|
||||
# fake two shards
|
||||
|
Loading…
Reference in New Issue
Block a user