Forbid adding stores with mismatching capabilities
We're currently allow stores with mismatching capabilities to co-exist under the same pool group. This is wrong, as it's assumed that all stores in a pool group support the same capabilities. Partially-Implements blueprint: expose-storage-capabilities Change-Id: I7bc5065ab2e5d9db05a3995f97bd6c6d5b2e538f
This commit is contained in:
parent
bb18ce1c82
commit
55179bc02e
@ -14,8 +14,6 @@
|
||||
|
||||
import uuid
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
from zaqar.openstack.common.cache import cache as oslo_cache
|
||||
from zaqar.storage import errors
|
||||
from zaqar.storage import pooling
|
||||
@ -35,8 +33,6 @@ class PoolCatalogTest(testing.TestBase):
|
||||
def setUp(self):
|
||||
super(PoolCatalogTest, self).setUp()
|
||||
|
||||
self.conf.register_opts([cfg.StrOpt('storage')],
|
||||
group='drivers')
|
||||
cache = oslo_cache.get_cache()
|
||||
control = utils.load_storage_driver(self.conf, cache,
|
||||
control_mode=True)
|
||||
|
@ -15,7 +15,6 @@
|
||||
import random
|
||||
import uuid
|
||||
|
||||
from oslo_config import cfg
|
||||
import six
|
||||
|
||||
from zaqar.openstack.common.cache import cache as oslo_cache
|
||||
@ -32,9 +31,6 @@ class PoolQueuesTest(testing.TestBase):
|
||||
def setUp(self):
|
||||
super(PoolQueuesTest, self).setUp()
|
||||
|
||||
self.conf.register_opts([cfg.StrOpt('storage')],
|
||||
group='drivers')
|
||||
|
||||
cache = oslo_cache.get_cache()
|
||||
control = utils.load_storage_driver(self.conf, cache,
|
||||
control_mode=True)
|
||||
|
@ -26,6 +26,8 @@ from oslo_config import cfg
|
||||
import six
|
||||
|
||||
import zaqar.openstack.common.log as logging
|
||||
from zaqar.storage import errors
|
||||
from zaqar.storage import utils
|
||||
|
||||
|
||||
DEFAULT_QUEUES_PER_PAGE = 10
|
||||
@ -679,7 +681,27 @@ class Subscription(ControllerBase):
|
||||
class PoolsBase(ControllerBase):
|
||||
"""A controller for managing pools."""
|
||||
|
||||
@abc.abstractmethod
|
||||
def _check_capabilities(self, uri, group=None, name=None):
|
||||
if name:
|
||||
group = list(self._get_group(self._get(name)['group']))
|
||||
else:
|
||||
group = list(self._get_group(group))
|
||||
|
||||
if not len(group) > 0:
|
||||
return True
|
||||
|
||||
default_store = self.driver.conf.drivers.storage
|
||||
|
||||
existing_store = utils.load_storage_impl(group[0]['uri'],
|
||||
default_store=default_store)
|
||||
new_store = utils.load_storage_impl(uri,
|
||||
default_store=default_store)
|
||||
|
||||
# NOTE(flaper87): Since all pools in a pool group
|
||||
# are assumed to have the same capabilities, it's
|
||||
# fine to check against just 1
|
||||
return existing_store.BASE_CAPABILITIES == new_store.BASE_CAPABILITIES
|
||||
|
||||
def list(self, marker=None, limit=DEFAULT_POOLS_PER_PAGE,
|
||||
detailed=False):
|
||||
"""Lists all registered pools.
|
||||
@ -694,9 +716,10 @@ class PoolsBase(ControllerBase):
|
||||
:rtype: [{}]
|
||||
"""
|
||||
|
||||
raise NotImplementedError
|
||||
return self._list(marker, limit, detailed)
|
||||
|
||||
_list = abc.abstractmethod(lambda x: None)
|
||||
|
||||
@abc.abstractmethod
|
||||
def create(self, name, weight, uri, group=None, options=None):
|
||||
"""Registers a pool entry.
|
||||
|
||||
@ -712,10 +735,13 @@ class PoolsBase(ControllerBase):
|
||||
:param options: Options used to configure this pool
|
||||
:type options: dict
|
||||
"""
|
||||
if not self._check_capabilities(uri, group=group):
|
||||
raise errors.PoolCapabilitiesMismatch()
|
||||
|
||||
raise NotImplementedError
|
||||
return self._create(name, weight, uri, group, options)
|
||||
|
||||
_create = abc.abstractmethod(lambda x: None)
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_group(self, group=None, detailed=False):
|
||||
"""Returns a single pool entry.
|
||||
|
||||
@ -728,10 +754,10 @@ class PoolsBase(ControllerBase):
|
||||
:rtype: {}
|
||||
:raises: PoolDoesNotExist if not found
|
||||
"""
|
||||
return self._get_group(group, detailed)
|
||||
|
||||
raise NotImplementedError
|
||||
_get_group = abc.abstractmethod(lambda x: None)
|
||||
|
||||
@abc.abstractmethod
|
||||
def get(self, name, detailed=False):
|
||||
"""Returns a single pool entry.
|
||||
|
||||
@ -743,10 +769,10 @@ class PoolsBase(ControllerBase):
|
||||
:rtype: {}
|
||||
:raises: PoolDoesNotExist if not found
|
||||
"""
|
||||
return self._get(name, detailed)
|
||||
|
||||
raise NotImplementedError
|
||||
_get = abc.abstractmethod(lambda x: None)
|
||||
|
||||
@abc.abstractmethod
|
||||
def exists(self, name):
|
||||
"""Returns a single pool entry.
|
||||
|
||||
@ -755,10 +781,10 @@ class PoolsBase(ControllerBase):
|
||||
:returns: True if the pool exists
|
||||
:rtype: bool
|
||||
"""
|
||||
return self._exists(name)
|
||||
|
||||
raise NotImplementedError
|
||||
_exists = abc.abstractmethod(lambda x: None)
|
||||
|
||||
@abc.abstractmethod
|
||||
def delete(self, name):
|
||||
"""Removes a pool entry.
|
||||
|
||||
@ -766,10 +792,10 @@ class PoolsBase(ControllerBase):
|
||||
:type name: six.text_type
|
||||
:rtype: None
|
||||
"""
|
||||
return self._delete(name)
|
||||
|
||||
raise NotImplementedError
|
||||
_delete = abc.abstractmethod(lambda x: None)
|
||||
|
||||
@abc.abstractmethod
|
||||
def update(self, name, **kwargs):
|
||||
"""Updates the weight, uris, and/or options of this pool
|
||||
|
||||
@ -779,14 +805,19 @@ class PoolsBase(ControllerBase):
|
||||
:type kwargs: dict
|
||||
:raises: PoolDoesNotExist
|
||||
"""
|
||||
uri = kwargs.get('uri')
|
||||
if uri and not self._check_capabilities(uri, name=name):
|
||||
raise errors.PoolCapabilitiesMismatch()
|
||||
|
||||
raise NotImplementedError
|
||||
return self._update(name, **kwargs)
|
||||
|
||||
_update = abc.abstractmethod(lambda x: None)
|
||||
|
||||
@abc.abstractmethod
|
||||
def drop_all(self):
|
||||
"""Deletes all pools from storage."""
|
||||
return self._drop_all()
|
||||
|
||||
raise NotImplementedError
|
||||
_drop_all = abc.abstractmethod(lambda x: None)
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
|
@ -185,3 +185,9 @@ class SubscriptionDoesNotExist(DoesNotExist):
|
||||
def __init__(self, subscription_id):
|
||||
super(SubscriptionDoesNotExist,
|
||||
self).__init__(subscription_id=subscription_id)
|
||||
|
||||
|
||||
class PoolCapabilitiesMismatch(ExceptionBase):
|
||||
|
||||
msg_format = (u'The pool being added does not '
|
||||
u'support the minimum set of capabilities')
|
||||
|
@ -54,7 +54,7 @@ class PoolsController(base.PoolsBase):
|
||||
unique=True)
|
||||
|
||||
@utils.raises_conn_error
|
||||
def list(self, marker=None, limit=10, detailed=False):
|
||||
def _list(self, marker=None, limit=10, detailed=False):
|
||||
query = {}
|
||||
if marker is not None:
|
||||
query['n'] = {'$gt': marker}
|
||||
@ -71,7 +71,7 @@ class PoolsController(base.PoolsBase):
|
||||
yield marker_name and marker_name['next']
|
||||
|
||||
@utils.raises_conn_error
|
||||
def get(self, name, detailed=False):
|
||||
def _get(self, name, detailed=False):
|
||||
res = self._col.find_one({'n': name},
|
||||
_field_spec(detailed))
|
||||
if not res:
|
||||
@ -80,13 +80,13 @@ class PoolsController(base.PoolsBase):
|
||||
return _normalize(res, detailed)
|
||||
|
||||
@utils.raises_conn_error
|
||||
def get_group(self, group=None, detailed=False):
|
||||
def _get_group(self, group=None, detailed=False):
|
||||
cursor = self._col.find({'g': group}, fields=_field_spec(detailed))
|
||||
normalizer = functools.partial(_normalize, detailed=detailed)
|
||||
return utils.HookedCursor(cursor, normalizer)
|
||||
|
||||
@utils.raises_conn_error
|
||||
def create(self, name, weight, uri, group=None, options=None):
|
||||
def _create(self, name, weight, uri, group=None, options=None):
|
||||
options = {} if options is None else options
|
||||
self._col.update({'n': name},
|
||||
{'$set': {'n': name,
|
||||
@ -97,11 +97,11 @@ class PoolsController(base.PoolsBase):
|
||||
upsert=True)
|
||||
|
||||
@utils.raises_conn_error
|
||||
def exists(self, name):
|
||||
def _exists(self, name):
|
||||
return self._col.find_one({'n': name}) is not None
|
||||
|
||||
@utils.raises_conn_error
|
||||
def update(self, name, **kwargs):
|
||||
def _update(self, name, **kwargs):
|
||||
names = ('uri', 'weight', 'group', 'options')
|
||||
fields = common_utils.fields(kwargs, names,
|
||||
pred=lambda x: x is not None,
|
||||
@ -116,7 +116,7 @@ class PoolsController(base.PoolsBase):
|
||||
raise errors.PoolDoesNotExist(name)
|
||||
|
||||
@utils.raises_conn_error
|
||||
def delete(self, name):
|
||||
def _delete(self, name):
|
||||
# NOTE(wpf): Initializing the Flavors controller here instead of
|
||||
# doing so in __init__ is required to avoid falling in a maximum
|
||||
# recursion error.
|
||||
@ -138,7 +138,7 @@ class PoolsController(base.PoolsBase):
|
||||
pass
|
||||
|
||||
@utils.raises_conn_error
|
||||
def drop_all(self):
|
||||
def _drop_all(self):
|
||||
self._col.drop()
|
||||
self._col.ensure_index(POOLS_INDEX, unique=True)
|
||||
|
||||
|
@ -41,7 +41,7 @@ class PoolsController(base.PoolsBase):
|
||||
self._conn = self.driver.connection
|
||||
|
||||
@utils.raises_conn_error
|
||||
def list(self, marker=None, limit=10, detailed=False):
|
||||
def _list(self, marker=None, limit=10, detailed=False):
|
||||
marker = marker or ''
|
||||
|
||||
# TODO(cpp-cabrera): optimization - limit the columns returned
|
||||
@ -65,7 +65,7 @@ class PoolsController(base.PoolsBase):
|
||||
yield marker_name and marker_name['next']
|
||||
|
||||
@utils.raises_conn_error
|
||||
def get_group(self, group=None, detailed=False):
|
||||
def _get_group(self, group=None, detailed=False):
|
||||
stmt = sa.sql.select([tables.Pools]).where(
|
||||
tables.Pools.c.group == group
|
||||
)
|
||||
@ -75,7 +75,7 @@ class PoolsController(base.PoolsBase):
|
||||
return (normalizer(v) for v in cursor)
|
||||
|
||||
@utils.raises_conn_error
|
||||
def get(self, name, detailed=False):
|
||||
def _get(self, name, detailed=False):
|
||||
stmt = sa.sql.select([tables.Pools]).where(
|
||||
tables.Pools.c.name == name
|
||||
)
|
||||
@ -88,7 +88,7 @@ class PoolsController(base.PoolsBase):
|
||||
|
||||
# TODO(cpp-cabrera): rename to upsert
|
||||
@utils.raises_conn_error
|
||||
def create(self, name, weight, uri, group=None, options=None):
|
||||
def _create(self, name, weight, uri, group=None, options=None):
|
||||
opts = None if options is None else utils.json_encode(options)
|
||||
|
||||
try:
|
||||
@ -104,14 +104,14 @@ class PoolsController(base.PoolsBase):
|
||||
group=group, options=options)
|
||||
|
||||
@utils.raises_conn_error
|
||||
def exists(self, name):
|
||||
def _exists(self, name):
|
||||
stmt = sa.sql.select([tables.Pools.c.name]).where(
|
||||
tables.Pools.c.name == name
|
||||
).limit(1)
|
||||
return self._conn.execute(stmt).fetchone() is not None
|
||||
|
||||
@utils.raises_conn_error
|
||||
def update(self, name, **kwargs):
|
||||
def _update(self, name, **kwargs):
|
||||
# NOTE(cpp-cabrera): by pruning None-valued kwargs, we avoid
|
||||
# overwriting the existing options field with None, since that
|
||||
# one can be null.
|
||||
@ -133,14 +133,14 @@ class PoolsController(base.PoolsBase):
|
||||
raise errors.PoolDoesNotExist(name)
|
||||
|
||||
@utils.raises_conn_error
|
||||
def delete(self, name):
|
||||
def _delete(self, name):
|
||||
stmt = sa.sql.expression.delete(tables.Pools).where(
|
||||
tables.Pools.c.name == name
|
||||
)
|
||||
self._conn.execute(stmt)
|
||||
|
||||
@utils.raises_conn_error
|
||||
def drop_all(self):
|
||||
def _drop_all(self):
|
||||
stmt = sa.sql.expression.delete(tables.Pools)
|
||||
self._conn.execute(stmt)
|
||||
|
||||
|
@ -65,6 +65,32 @@ def dynamic_conf(uri, options, conf=None):
|
||||
return conf
|
||||
|
||||
|
||||
def load_storage_impl(uri, control_mode=False, default_store=None):
|
||||
"""Loads a storage driver implementation and returns it.
|
||||
|
||||
:param uri: The connection uri to parse and load a driver for.
|
||||
:param control_mode: (Default False). Determines which
|
||||
driver type to load; if False, the data driver is
|
||||
loaded. If True, the control driver is loaded.
|
||||
:param default_store: The default store to load if no scheme
|
||||
is parsed.
|
||||
"""
|
||||
|
||||
mode = 'control' if control_mode else 'data'
|
||||
driver_type = 'zaqar.{0}.storage'.format(mode)
|
||||
storage_type = six.moves.urllib_parse.urlparse(uri).scheme or default_store
|
||||
|
||||
try:
|
||||
mgr = driver.DriverManager(driver_type, storage_type,
|
||||
invoke_on_load=False)
|
||||
|
||||
return mgr.driver
|
||||
|
||||
except Exception as exc:
|
||||
LOG.exception(exc)
|
||||
raise errors.InvalidDriver(exc)
|
||||
|
||||
|
||||
def load_storage_driver(conf, cache, storage_type=None, control_mode=False):
|
||||
"""Loads a storage driver and returns it.
|
||||
|
||||
|
@ -51,6 +51,8 @@ class TestBase(testtools.TestCase):
|
||||
self.conf = cfg.ConfigOpts()
|
||||
|
||||
self.conf.register_opts(bootstrap._GENERAL_OPTIONS)
|
||||
self.conf.register_opts(bootstrap._DRIVER_OPTIONS,
|
||||
group=bootstrap._DRIVER_GROUP)
|
||||
|
||||
@classmethod
|
||||
def conf_path(cls, filename):
|
||||
|
@ -1122,11 +1122,13 @@ class PoolsControllerTest(ControllerBaseTest):
|
||||
self.pool)
|
||||
|
||||
def test_update_works(self):
|
||||
# NOTE(flaper87): This may fail for redis. Create
|
||||
# a dummy store for tests.
|
||||
self.pools_controller.update(self.pool, weight=101,
|
||||
uri='redis://localhost',
|
||||
uri='localhost3',
|
||||
options={'a': 1})
|
||||
res = self.pools_controller.get(self.pool, detailed=True)
|
||||
self._pool_expects(res, self.pool, 101, 'redis://localhost')
|
||||
self._pool_expects(res, self.pool, 101, 'localhost3')
|
||||
self.assertEqual(res['options'], {'a': 1})
|
||||
|
||||
def test_delete_works(self):
|
||||
@ -1208,6 +1210,15 @@ class PoolsControllerTest(ControllerBaseTest):
|
||||
self.assertIn('options', entry)
|
||||
self.assertEqual(entry['options'], {})
|
||||
|
||||
def test_mismatching_capabilities(self):
|
||||
# NOTE(flaper87): This may fail for redis. Create
|
||||
# a dummy store for tests.
|
||||
with testing.expect(errors.PoolCapabilitiesMismatch):
|
||||
self.pools_controller.create(str(uuid.uuid1()),
|
||||
100, 'redis://localhost',
|
||||
group=self.pool_group,
|
||||
options={})
|
||||
|
||||
|
||||
class CatalogueControllerTest(ControllerBaseTest):
|
||||
controller_base_class = storage.CatalogueBase
|
||||
|
Loading…
x
Reference in New Issue
Block a user