From 962979a9221131336a8aa1610704527626e60120 Mon Sep 17 00:00:00 2001 From: Flavio Percoco Date: Mon, 9 Jun 2014 15:38:55 +0200 Subject: [PATCH] Add flavors support to mongodb This patch is a follow-up on the queue's flavor work. It adds support for such feature to the MongoDB storage driver. Partially-Implements blueprint: marconi-queue-flavors Change-Id: Ib3231ab361896e1c08831d17899dd4111710ab9b --- .../unit/queues/storage/test_impl_mongodb.py | 11 ++ zaqar/queues/storage/__init__.py | 1 + zaqar/queues/storage/base.py | 5 + zaqar/queues/storage/errors.py | 8 + zaqar/queues/storage/mongodb/controllers.py | 2 + zaqar/queues/storage/mongodb/driver.py | 4 + zaqar/queues/storage/mongodb/flavors.py | 126 +++++++++++++++ zaqar/queues/storage/sqlalchemy/driver.py | 5 + zaqar/tests/faulty_storage.py | 4 + zaqar/tests/queues/storage/base.py | 147 ++++++++++++++++++ 10 files changed, 313 insertions(+) create mode 100644 zaqar/queues/storage/mongodb/flavors.py diff --git a/tests/unit/queues/storage/test_impl_mongodb.py b/tests/unit/queues/storage/test_impl_mongodb.py index 634fe1c1b..5436bed58 100644 --- a/tests/unit/queues/storage/test_impl_mongodb.py +++ b/tests/unit/queues/storage/test_impl_mongodb.py @@ -439,3 +439,14 @@ class PooledClaimsTests(base.ClaimControllerTest): driver_class = pooling.DataDriver control_driver_class = mongodb.ControlDriver controller_base_class = pooling.RoutingController + + +@testing.requires_mongodb +class MongodbFlavorsTest(base.FlavorsControllerTest): + driver_class = mongodb.ControlDriver + controller_class = controllers.FlavorsController + + def setUp(self): + super(MongodbFlavorsTest, self).setUp() + self.load_conf('wsgi_mongodb.conf') + self.addCleanup(self.controller.drop_all) diff --git a/zaqar/queues/storage/__init__.py b/zaqar/queues/storage/__init__.py index a466c3b88..e20ca3dae 100644 --- a/zaqar/queues/storage/__init__.py +++ b/zaqar/queues/storage/__init__.py @@ -24,6 +24,7 @@ Claim = base.Claim Message = base.Message Queue = base.Queue PoolsBase = base.PoolsBase +FlavorsBase = base.FlavorsBase DEFAULT_QUEUES_PER_PAGE = base.DEFAULT_QUEUES_PER_PAGE DEFAULT_MESSAGES_PER_PAGE = base.DEFAULT_MESSAGES_PER_PAGE diff --git a/zaqar/queues/storage/base.py b/zaqar/queues/storage/base.py index 0a9c6d472..2b59b5121 100644 --- a/zaqar/queues/storage/base.py +++ b/zaqar/queues/storage/base.py @@ -110,6 +110,11 @@ class ControlDriverBase(DriverBase): """Returns storage's pool management controller.""" raise NotImplementedError + @abc.abstractproperty + def flavors_controller(self): + """Returns storage's flavor management controller.""" + raise NotImplementedError + class ControllerBase(object): """Top-level class for controllers. diff --git a/zaqar/queues/storage/errors.py b/zaqar/queues/storage/errors.py index 12a05adeb..317775e8d 100644 --- a/zaqar/queues/storage/errors.py +++ b/zaqar/queues/storage/errors.py @@ -133,6 +133,14 @@ class PoolDoesNotExist(DoesNotExist): super(PoolDoesNotExist, self).__init__(pool=pool) +class FlavorDoesNotExist(DoesNotExist): + + msg_format = u'Flavor {flavor} does not exist' + + def __init__(self, flavor): + super(FlavorDoesNotExist, self).__init__(flavor=flavor) + + class NoPoolFound(ExceptionBase): msg_format = u'No pools registered' diff --git a/zaqar/queues/storage/mongodb/controllers.py b/zaqar/queues/storage/mongodb/controllers.py index 6b09ba39f..246b2b6ba 100644 --- a/zaqar/queues/storage/mongodb/controllers.py +++ b/zaqar/queues/storage/mongodb/controllers.py @@ -24,6 +24,7 @@ Field Mappings: from zaqar.queues.storage.mongodb import catalogue from zaqar.queues.storage.mongodb import claims +from zaqar.queues.storage.mongodb import flavors from zaqar.queues.storage.mongodb import messages from zaqar.queues.storage.mongodb import pools from zaqar.queues.storage.mongodb import queues @@ -31,6 +32,7 @@ from zaqar.queues.storage.mongodb import queues CatalogueController = catalogue.CatalogueController ClaimController = claims.ClaimController +FlavorsController = flavors.FlavorsController MessageController = messages.MessageController QueueController = queues.QueueController PoolsController = pools.PoolsController diff --git a/zaqar/queues/storage/mongodb/driver.py b/zaqar/queues/storage/mongodb/driver.py index 07dad570a..cc9a0b866 100644 --- a/zaqar/queues/storage/mongodb/driver.py +++ b/zaqar/queues/storage/mongodb/driver.py @@ -168,3 +168,7 @@ class ControlDriver(storage.ControlDriverBase): @property def catalogue_controller(self): return controllers.CatalogueController(self) + + @property + def flavors_controller(self): + return controllers.FlavorsController(self) diff --git a/zaqar/queues/storage/mongodb/flavors.py b/zaqar/queues/storage/mongodb/flavors.py new file mode 100644 index 000000000..986a82fc1 --- /dev/null +++ b/zaqar/queues/storage/mongodb/flavors.py @@ -0,0 +1,126 @@ +# Copyright (c) 2014 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy +# of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +""" +Schema: + 'n': name :: six.text_type + 'p': project :: six.text_type + 's': storage pool :: six.text_type + 'c': capabilities :: dict +""" + +import functools + +from zaqar.queues.storage import base +from zaqar.queues.storage import errors +from zaqar.queues.storage.mongodb import utils + +FLAVORS_INDEX = [ + ('p', 1), + ('n', 1), +] + +# NOTE(cpp-cabrera): used for get/list operations. There's no need to +# show the marker or the _id - they're implementation details. +OMIT_FIELDS = (('_id', False),) + + +def _field_spec(detailed=False): + return dict(OMIT_FIELDS + (() if detailed else (('c', False),))) + + +class FlavorsController(base.FlavorsBase): + + def __init__(self, *args, **kwargs): + super(FlavorsController, self).__init__(*args, **kwargs) + + self._col = self.driver.database.flavors + self._col.ensure_index(FLAVORS_INDEX, + background=True, + name='flavors_name', + unique=True) + + @utils.raises_conn_error + def list(self, project=None, marker=None, limit=10, detailed=False): + query = {'p': project} + if marker is not None: + query['n'] = {'$gt': marker} + + cursor = self._col.find(query, fields=_field_spec(detailed), + limit=limit) + + normalizer = functools.partial(_normalize, detailed=detailed) + return utils.HookedCursor(cursor, normalizer) + + @utils.raises_conn_error + def get(self, name, project=None, detailed=False): + res = self._col.find_one({'n': name, 'p': project}, + _field_spec(detailed)) + + if not res: + raise errors.FlavorDoesNotExist(name) + + return _normalize(res, detailed) + + @utils.raises_conn_error + def create(self, name, pool, project=None, capabilities=None): + # TODO(flaper87): Verify storage exists + capabilities = {} if capabilities is None else capabilities + self._col.update({'n': name, 'p': project}, + {'$set': {'s': pool, 'c': capabilities}}, + upsert=True) + + @utils.raises_conn_error + def exists(self, name, project=None): + return self._col.find_one({'n': name, 'p': project}) is not None + + @utils.raises_conn_error + def update(self, name, project=None, pool=None, capabilities=None): + fields = {} + + if capabilities is not None: + fields['c'] = capabilities + + if pool is not None: + fields['s'] = pool + + assert fields, '`pool` or `capabilities` not found in kwargs' + res = self._col.update({'n': name, 'p': project}, + {'$set': fields}, + upsert=False) + + if not res['updatedExisting']: + raise errors.FlavorDoesNotExist(name) + + @utils.raises_conn_error + def delete(self, name, project=None): + self._col.remove({'n': name, 'p': project}, w=0) + + @utils.raises_conn_error + def drop_all(self): + self._col.drop() + self._col.ensure_index(FLAVORS_INDEX, unique=True) + + +def _normalize(pool, detailed=False): + ret = { + 'name': pool['n'], + 'project': pool['p'], + 'pool': pool['s'], + } + + if detailed: + ret['capabilities'] = pool['c'] + + return ret diff --git a/zaqar/queues/storage/sqlalchemy/driver.py b/zaqar/queues/storage/sqlalchemy/driver.py index 6ae491300..e43efb878 100644 --- a/zaqar/queues/storage/sqlalchemy/driver.py +++ b/zaqar/queues/storage/sqlalchemy/driver.py @@ -154,3 +154,8 @@ class ControlDriver(storage.ControlDriverBase): @property def catalogue_controller(self): return controllers.CatalogueController(self) + + @property + def flavors_controller(self): + # NOTE(flaper87): Needed to avoid `abc` errors. + raise NotImplementedError diff --git a/zaqar/tests/faulty_storage.py b/zaqar/tests/faulty_storage.py index 258f89600..995feaa9c 100644 --- a/zaqar/tests/faulty_storage.py +++ b/zaqar/tests/faulty_storage.py @@ -53,6 +53,10 @@ class ControlDriver(storage.ControlDriverBase): def pools_controller(self): return None + @property + def flavors_controller(self): + return None + class QueueController(storage.Queue): def __init__(self, driver): diff --git a/zaqar/tests/queues/storage/base.py b/zaqar/tests/queues/storage/base.py index 15be629c6..997836182 100644 --- a/zaqar/tests/queues/storage/base.py +++ b/zaqar/tests/queues/storage/base.py @@ -991,6 +991,153 @@ class CatalogueControllerTest(ControllerBaseTest): self.controller.insert(self.project, q2, u'a') +class FlavorsControllerTest(ControllerBaseTest): + """Flavors Controller base tests. + + NOTE(flaper87): Implementations of this class should + override the tearDown method in order + to clean up storage's state. + """ + controller_base_class = storage.FlavorsBase + + def setUp(self): + super(FlavorsControllerTest, self).setUp() + self.pools_controller = self.driver.pools_controller + self.flavors_controller = self.driver.flavors_controller + + # Let's create one pool + self.pool = str(uuid.uuid1()) + self.pools_controller.create(self.pool, 100, 'localhost', {}) + + def tearDown(self): + self.flavors_controller.drop_all() + super(FlavorsControllerTest, self).tearDown() + + def test_create_succeeds(self): + self.flavors_controller.create('durable', self.pool, + project=self.project, + capabilities={}) + + def _flavors_expects(self, flavor, xname, xproject, xpool): + self.assertIn('name', flavor) + self.assertEqual(flavor['name'], xname) + self.assertIn('project', flavor) + self.assertEqual(flavor['project'], xproject) + self.assertIn('pool', flavor) + self.assertEqual(flavor['pool'], xpool) + + def test_create_replaces_on_duplicate_insert(self): + name = str(uuid.uuid1()) + self.flavors_controller.create(name, self.pool, + project=self.project, + capabilities={}) + self.flavors_controller.create(name, 'another_pool', + project=self.project, + capabilities={}) + entry = self.flavors_controller.get(name, project=self.project) + self._flavors_expects(entry, name, self.project, 'another_pool') + + def test_get_returns_expected_content(self): + name = 'durable' + capabilities = {'fifo': True} + self.flavors_controller.create(name, self.pool, + project=self.project, + capabilities=capabilities) + res = self.flavors_controller.get(name, project=self.project) + self._flavors_expects(res, name, self.project, self.pool) + self.assertNotIn('capabilities', res) + + def test_detailed_get_returns_expected_content(self): + name = 'durable' + capabilities = {'fifo': True} + self.flavors_controller.create(name, self.pool, + project=self.project, + capabilities=capabilities) + res = self.flavors_controller.get(name, project=self.project, + detailed=True) + self._flavors_expects(res, name, self.project, self.pool) + self.assertIn('capabilities', res) + self.assertEqual(res['capabilities'], capabilities) + + def test_get_raises_if_not_found(self): + self.assertRaises(storage.errors.FlavorDoesNotExist, + self.flavors_controller.get, 'notexists') + + def test_exists(self): + self.flavors_controller.create('exists', self.pool, + project=self.project, + capabilities={}) + self.assertTrue(self.flavors_controller.exists('exists', + project=self.project)) + self.assertFalse(self.flavors_controller.exists('notexists', + project=self.project)) + + def test_update_raises_assertion_error_on_bad_fields(self): + self.assertRaises(AssertionError, self.pools_controller.update, + self.pool) + + def test_update_works(self): + name = 'yummy' + self.flavors_controller.create(name, self.pool, + project=self.project, + capabilities={}) + + res = self.flavors_controller.get(name, project=self.project, + detailed=True) + + new_capabilities = {'fifo': False} + self.flavors_controller.update(name, project=self.project, + pool='olympic', + capabilities={'fifo': False}) + res = self.flavors_controller.get(name, project=self.project, + detailed=True) + self._flavors_expects(res, name, self.project, 'olympic') + self.assertEqual(res['capabilities'], new_capabilities) + + def test_delete_works(self): + name = 'puke' + self.flavors_controller.create(name, self.pool, + project=self.project, + capabilities={}) + self.flavors_controller.delete(name, project=self.project) + self.assertFalse(self.flavors_controller.exists(name)) + + def test_delete_nonexistent_is_silent(self): + self.flavors_controller.delete('nonexisting') + + def test_drop_all_leads_to_empty_listing(self): + self.flavors_controller.drop_all() + cursor = self.flavors_controller.list() + self.assertRaises(StopIteration, next, cursor) + + def test_listing_simple(self): + name_gen = lambda i: chr(ord('A') + i) + for i in range(15): + self.flavors_controller.create(name_gen(i), project=self.project, + pool=str(i), capabilities={}) + + res = list(self.flavors_controller.list(project=self.project)) + self.assertEqual(len(res), 10) + for i, entry in enumerate(res): + self._flavors_expects(entry, name_gen(i), self.project, str(i)) + self.assertNotIn('capabilities', entry) + + res = list(self.flavors_controller.list(project=self.project, limit=5)) + self.assertEqual(len(res), 5) + + res = next(self.flavors_controller.list(project=self.project, + marker=name_gen(3))) + self._flavors_expects(res, name_gen(4), self.project, '4') + + res = list(self.flavors_controller.list(project=self.project, + detailed=True)) + self.assertEqual(len(res), 10) + for i, entry in enumerate(res): + self._flavors_expects(entry, name_gen(i), self.project, str(i)) + self.assertIn('capabilities', entry) + self.assertEqual(entry['capabilities'], {}) + + def _insert_fixtures(controller, queue_name, project=None, client_uuid=None, num=4, ttl=120):