Add flavors support to mongodb
This patch is a follow-up on the queue's flavor work. It adds support for such feature to the MongoDB storage driver. Partially-Implements blueprint: marconi-queue-flavors Change-Id: Ib3231ab361896e1c08831d17899dd4111710ab9b
This commit is contained in:
parent
8af8395094
commit
962979a922
@ -439,3 +439,14 @@ class PooledClaimsTests(base.ClaimControllerTest):
|
|||||||
driver_class = pooling.DataDriver
|
driver_class = pooling.DataDriver
|
||||||
control_driver_class = mongodb.ControlDriver
|
control_driver_class = mongodb.ControlDriver
|
||||||
controller_base_class = pooling.RoutingController
|
controller_base_class = pooling.RoutingController
|
||||||
|
|
||||||
|
|
||||||
|
@testing.requires_mongodb
|
||||||
|
class MongodbFlavorsTest(base.FlavorsControllerTest):
|
||||||
|
driver_class = mongodb.ControlDriver
|
||||||
|
controller_class = controllers.FlavorsController
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(MongodbFlavorsTest, self).setUp()
|
||||||
|
self.load_conf('wsgi_mongodb.conf')
|
||||||
|
self.addCleanup(self.controller.drop_all)
|
||||||
|
@ -24,6 +24,7 @@ Claim = base.Claim
|
|||||||
Message = base.Message
|
Message = base.Message
|
||||||
Queue = base.Queue
|
Queue = base.Queue
|
||||||
PoolsBase = base.PoolsBase
|
PoolsBase = base.PoolsBase
|
||||||
|
FlavorsBase = base.FlavorsBase
|
||||||
|
|
||||||
DEFAULT_QUEUES_PER_PAGE = base.DEFAULT_QUEUES_PER_PAGE
|
DEFAULT_QUEUES_PER_PAGE = base.DEFAULT_QUEUES_PER_PAGE
|
||||||
DEFAULT_MESSAGES_PER_PAGE = base.DEFAULT_MESSAGES_PER_PAGE
|
DEFAULT_MESSAGES_PER_PAGE = base.DEFAULT_MESSAGES_PER_PAGE
|
||||||
|
@ -110,6 +110,11 @@ class ControlDriverBase(DriverBase):
|
|||||||
"""Returns storage's pool management controller."""
|
"""Returns storage's pool management controller."""
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
|
@abc.abstractproperty
|
||||||
|
def flavors_controller(self):
|
||||||
|
"""Returns storage's flavor management controller."""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
|
||||||
class ControllerBase(object):
|
class ControllerBase(object):
|
||||||
"""Top-level class for controllers.
|
"""Top-level class for controllers.
|
||||||
|
@ -133,6 +133,14 @@ class PoolDoesNotExist(DoesNotExist):
|
|||||||
super(PoolDoesNotExist, self).__init__(pool=pool)
|
super(PoolDoesNotExist, self).__init__(pool=pool)
|
||||||
|
|
||||||
|
|
||||||
|
class FlavorDoesNotExist(DoesNotExist):
|
||||||
|
|
||||||
|
msg_format = u'Flavor {flavor} does not exist'
|
||||||
|
|
||||||
|
def __init__(self, flavor):
|
||||||
|
super(FlavorDoesNotExist, self).__init__(flavor=flavor)
|
||||||
|
|
||||||
|
|
||||||
class NoPoolFound(ExceptionBase):
|
class NoPoolFound(ExceptionBase):
|
||||||
|
|
||||||
msg_format = u'No pools registered'
|
msg_format = u'No pools registered'
|
||||||
|
@ -24,6 +24,7 @@ Field Mappings:
|
|||||||
|
|
||||||
from zaqar.queues.storage.mongodb import catalogue
|
from zaqar.queues.storage.mongodb import catalogue
|
||||||
from zaqar.queues.storage.mongodb import claims
|
from zaqar.queues.storage.mongodb import claims
|
||||||
|
from zaqar.queues.storage.mongodb import flavors
|
||||||
from zaqar.queues.storage.mongodb import messages
|
from zaqar.queues.storage.mongodb import messages
|
||||||
from zaqar.queues.storage.mongodb import pools
|
from zaqar.queues.storage.mongodb import pools
|
||||||
from zaqar.queues.storage.mongodb import queues
|
from zaqar.queues.storage.mongodb import queues
|
||||||
@ -31,6 +32,7 @@ from zaqar.queues.storage.mongodb import queues
|
|||||||
|
|
||||||
CatalogueController = catalogue.CatalogueController
|
CatalogueController = catalogue.CatalogueController
|
||||||
ClaimController = claims.ClaimController
|
ClaimController = claims.ClaimController
|
||||||
|
FlavorsController = flavors.FlavorsController
|
||||||
MessageController = messages.MessageController
|
MessageController = messages.MessageController
|
||||||
QueueController = queues.QueueController
|
QueueController = queues.QueueController
|
||||||
PoolsController = pools.PoolsController
|
PoolsController = pools.PoolsController
|
||||||
|
@ -168,3 +168,7 @@ class ControlDriver(storage.ControlDriverBase):
|
|||||||
@property
|
@property
|
||||||
def catalogue_controller(self):
|
def catalogue_controller(self):
|
||||||
return controllers.CatalogueController(self)
|
return controllers.CatalogueController(self)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def flavors_controller(self):
|
||||||
|
return controllers.FlavorsController(self)
|
||||||
|
126
zaqar/queues/storage/mongodb/flavors.py
Normal file
126
zaqar/queues/storage/mongodb/flavors.py
Normal file
@ -0,0 +1,126 @@
|
|||||||
|
# Copyright (c) 2014 Red Hat, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
|
||||||
|
# use this file except in compliance with the License. You may obtain a copy
|
||||||
|
# of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations under
|
||||||
|
# the License.
|
||||||
|
|
||||||
|
"""
|
||||||
|
Schema:
|
||||||
|
'n': name :: six.text_type
|
||||||
|
'p': project :: six.text_type
|
||||||
|
's': storage pool :: six.text_type
|
||||||
|
'c': capabilities :: dict
|
||||||
|
"""
|
||||||
|
|
||||||
|
import functools
|
||||||
|
|
||||||
|
from zaqar.queues.storage import base
|
||||||
|
from zaqar.queues.storage import errors
|
||||||
|
from zaqar.queues.storage.mongodb import utils
|
||||||
|
|
||||||
|
FLAVORS_INDEX = [
|
||||||
|
('p', 1),
|
||||||
|
('n', 1),
|
||||||
|
]
|
||||||
|
|
||||||
|
# NOTE(cpp-cabrera): used for get/list operations. There's no need to
|
||||||
|
# show the marker or the _id - they're implementation details.
|
||||||
|
OMIT_FIELDS = (('_id', False),)
|
||||||
|
|
||||||
|
|
||||||
|
def _field_spec(detailed=False):
|
||||||
|
return dict(OMIT_FIELDS + (() if detailed else (('c', False),)))
|
||||||
|
|
||||||
|
|
||||||
|
class FlavorsController(base.FlavorsBase):
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super(FlavorsController, self).__init__(*args, **kwargs)
|
||||||
|
|
||||||
|
self._col = self.driver.database.flavors
|
||||||
|
self._col.ensure_index(FLAVORS_INDEX,
|
||||||
|
background=True,
|
||||||
|
name='flavors_name',
|
||||||
|
unique=True)
|
||||||
|
|
||||||
|
@utils.raises_conn_error
|
||||||
|
def list(self, project=None, marker=None, limit=10, detailed=False):
|
||||||
|
query = {'p': project}
|
||||||
|
if marker is not None:
|
||||||
|
query['n'] = {'$gt': marker}
|
||||||
|
|
||||||
|
cursor = self._col.find(query, fields=_field_spec(detailed),
|
||||||
|
limit=limit)
|
||||||
|
|
||||||
|
normalizer = functools.partial(_normalize, detailed=detailed)
|
||||||
|
return utils.HookedCursor(cursor, normalizer)
|
||||||
|
|
||||||
|
@utils.raises_conn_error
|
||||||
|
def get(self, name, project=None, detailed=False):
|
||||||
|
res = self._col.find_one({'n': name, 'p': project},
|
||||||
|
_field_spec(detailed))
|
||||||
|
|
||||||
|
if not res:
|
||||||
|
raise errors.FlavorDoesNotExist(name)
|
||||||
|
|
||||||
|
return _normalize(res, detailed)
|
||||||
|
|
||||||
|
@utils.raises_conn_error
|
||||||
|
def create(self, name, pool, project=None, capabilities=None):
|
||||||
|
# TODO(flaper87): Verify storage exists
|
||||||
|
capabilities = {} if capabilities is None else capabilities
|
||||||
|
self._col.update({'n': name, 'p': project},
|
||||||
|
{'$set': {'s': pool, 'c': capabilities}},
|
||||||
|
upsert=True)
|
||||||
|
|
||||||
|
@utils.raises_conn_error
|
||||||
|
def exists(self, name, project=None):
|
||||||
|
return self._col.find_one({'n': name, 'p': project}) is not None
|
||||||
|
|
||||||
|
@utils.raises_conn_error
|
||||||
|
def update(self, name, project=None, pool=None, capabilities=None):
|
||||||
|
fields = {}
|
||||||
|
|
||||||
|
if capabilities is not None:
|
||||||
|
fields['c'] = capabilities
|
||||||
|
|
||||||
|
if pool is not None:
|
||||||
|
fields['s'] = pool
|
||||||
|
|
||||||
|
assert fields, '`pool` or `capabilities` not found in kwargs'
|
||||||
|
res = self._col.update({'n': name, 'p': project},
|
||||||
|
{'$set': fields},
|
||||||
|
upsert=False)
|
||||||
|
|
||||||
|
if not res['updatedExisting']:
|
||||||
|
raise errors.FlavorDoesNotExist(name)
|
||||||
|
|
||||||
|
@utils.raises_conn_error
|
||||||
|
def delete(self, name, project=None):
|
||||||
|
self._col.remove({'n': name, 'p': project}, w=0)
|
||||||
|
|
||||||
|
@utils.raises_conn_error
|
||||||
|
def drop_all(self):
|
||||||
|
self._col.drop()
|
||||||
|
self._col.ensure_index(FLAVORS_INDEX, unique=True)
|
||||||
|
|
||||||
|
|
||||||
|
def _normalize(pool, detailed=False):
|
||||||
|
ret = {
|
||||||
|
'name': pool['n'],
|
||||||
|
'project': pool['p'],
|
||||||
|
'pool': pool['s'],
|
||||||
|
}
|
||||||
|
|
||||||
|
if detailed:
|
||||||
|
ret['capabilities'] = pool['c']
|
||||||
|
|
||||||
|
return ret
|
@ -154,3 +154,8 @@ class ControlDriver(storage.ControlDriverBase):
|
|||||||
@property
|
@property
|
||||||
def catalogue_controller(self):
|
def catalogue_controller(self):
|
||||||
return controllers.CatalogueController(self)
|
return controllers.CatalogueController(self)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def flavors_controller(self):
|
||||||
|
# NOTE(flaper87): Needed to avoid `abc` errors.
|
||||||
|
raise NotImplementedError
|
||||||
|
@ -53,6 +53,10 @@ class ControlDriver(storage.ControlDriverBase):
|
|||||||
def pools_controller(self):
|
def pools_controller(self):
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def flavors_controller(self):
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
class QueueController(storage.Queue):
|
class QueueController(storage.Queue):
|
||||||
def __init__(self, driver):
|
def __init__(self, driver):
|
||||||
|
@ -991,6 +991,153 @@ class CatalogueControllerTest(ControllerBaseTest):
|
|||||||
self.controller.insert(self.project, q2, u'a')
|
self.controller.insert(self.project, q2, u'a')
|
||||||
|
|
||||||
|
|
||||||
|
class FlavorsControllerTest(ControllerBaseTest):
|
||||||
|
"""Flavors Controller base tests.
|
||||||
|
|
||||||
|
NOTE(flaper87): Implementations of this class should
|
||||||
|
override the tearDown method in order
|
||||||
|
to clean up storage's state.
|
||||||
|
"""
|
||||||
|
controller_base_class = storage.FlavorsBase
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(FlavorsControllerTest, self).setUp()
|
||||||
|
self.pools_controller = self.driver.pools_controller
|
||||||
|
self.flavors_controller = self.driver.flavors_controller
|
||||||
|
|
||||||
|
# Let's create one pool
|
||||||
|
self.pool = str(uuid.uuid1())
|
||||||
|
self.pools_controller.create(self.pool, 100, 'localhost', {})
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
self.flavors_controller.drop_all()
|
||||||
|
super(FlavorsControllerTest, self).tearDown()
|
||||||
|
|
||||||
|
def test_create_succeeds(self):
|
||||||
|
self.flavors_controller.create('durable', self.pool,
|
||||||
|
project=self.project,
|
||||||
|
capabilities={})
|
||||||
|
|
||||||
|
def _flavors_expects(self, flavor, xname, xproject, xpool):
|
||||||
|
self.assertIn('name', flavor)
|
||||||
|
self.assertEqual(flavor['name'], xname)
|
||||||
|
self.assertIn('project', flavor)
|
||||||
|
self.assertEqual(flavor['project'], xproject)
|
||||||
|
self.assertIn('pool', flavor)
|
||||||
|
self.assertEqual(flavor['pool'], xpool)
|
||||||
|
|
||||||
|
def test_create_replaces_on_duplicate_insert(self):
|
||||||
|
name = str(uuid.uuid1())
|
||||||
|
self.flavors_controller.create(name, self.pool,
|
||||||
|
project=self.project,
|
||||||
|
capabilities={})
|
||||||
|
self.flavors_controller.create(name, 'another_pool',
|
||||||
|
project=self.project,
|
||||||
|
capabilities={})
|
||||||
|
entry = self.flavors_controller.get(name, project=self.project)
|
||||||
|
self._flavors_expects(entry, name, self.project, 'another_pool')
|
||||||
|
|
||||||
|
def test_get_returns_expected_content(self):
|
||||||
|
name = 'durable'
|
||||||
|
capabilities = {'fifo': True}
|
||||||
|
self.flavors_controller.create(name, self.pool,
|
||||||
|
project=self.project,
|
||||||
|
capabilities=capabilities)
|
||||||
|
res = self.flavors_controller.get(name, project=self.project)
|
||||||
|
self._flavors_expects(res, name, self.project, self.pool)
|
||||||
|
self.assertNotIn('capabilities', res)
|
||||||
|
|
||||||
|
def test_detailed_get_returns_expected_content(self):
|
||||||
|
name = 'durable'
|
||||||
|
capabilities = {'fifo': True}
|
||||||
|
self.flavors_controller.create(name, self.pool,
|
||||||
|
project=self.project,
|
||||||
|
capabilities=capabilities)
|
||||||
|
res = self.flavors_controller.get(name, project=self.project,
|
||||||
|
detailed=True)
|
||||||
|
self._flavors_expects(res, name, self.project, self.pool)
|
||||||
|
self.assertIn('capabilities', res)
|
||||||
|
self.assertEqual(res['capabilities'], capabilities)
|
||||||
|
|
||||||
|
def test_get_raises_if_not_found(self):
|
||||||
|
self.assertRaises(storage.errors.FlavorDoesNotExist,
|
||||||
|
self.flavors_controller.get, 'notexists')
|
||||||
|
|
||||||
|
def test_exists(self):
|
||||||
|
self.flavors_controller.create('exists', self.pool,
|
||||||
|
project=self.project,
|
||||||
|
capabilities={})
|
||||||
|
self.assertTrue(self.flavors_controller.exists('exists',
|
||||||
|
project=self.project))
|
||||||
|
self.assertFalse(self.flavors_controller.exists('notexists',
|
||||||
|
project=self.project))
|
||||||
|
|
||||||
|
def test_update_raises_assertion_error_on_bad_fields(self):
|
||||||
|
self.assertRaises(AssertionError, self.pools_controller.update,
|
||||||
|
self.pool)
|
||||||
|
|
||||||
|
def test_update_works(self):
|
||||||
|
name = 'yummy'
|
||||||
|
self.flavors_controller.create(name, self.pool,
|
||||||
|
project=self.project,
|
||||||
|
capabilities={})
|
||||||
|
|
||||||
|
res = self.flavors_controller.get(name, project=self.project,
|
||||||
|
detailed=True)
|
||||||
|
|
||||||
|
new_capabilities = {'fifo': False}
|
||||||
|
self.flavors_controller.update(name, project=self.project,
|
||||||
|
pool='olympic',
|
||||||
|
capabilities={'fifo': False})
|
||||||
|
res = self.flavors_controller.get(name, project=self.project,
|
||||||
|
detailed=True)
|
||||||
|
self._flavors_expects(res, name, self.project, 'olympic')
|
||||||
|
self.assertEqual(res['capabilities'], new_capabilities)
|
||||||
|
|
||||||
|
def test_delete_works(self):
|
||||||
|
name = 'puke'
|
||||||
|
self.flavors_controller.create(name, self.pool,
|
||||||
|
project=self.project,
|
||||||
|
capabilities={})
|
||||||
|
self.flavors_controller.delete(name, project=self.project)
|
||||||
|
self.assertFalse(self.flavors_controller.exists(name))
|
||||||
|
|
||||||
|
def test_delete_nonexistent_is_silent(self):
|
||||||
|
self.flavors_controller.delete('nonexisting')
|
||||||
|
|
||||||
|
def test_drop_all_leads_to_empty_listing(self):
|
||||||
|
self.flavors_controller.drop_all()
|
||||||
|
cursor = self.flavors_controller.list()
|
||||||
|
self.assertRaises(StopIteration, next, cursor)
|
||||||
|
|
||||||
|
def test_listing_simple(self):
|
||||||
|
name_gen = lambda i: chr(ord('A') + i)
|
||||||
|
for i in range(15):
|
||||||
|
self.flavors_controller.create(name_gen(i), project=self.project,
|
||||||
|
pool=str(i), capabilities={})
|
||||||
|
|
||||||
|
res = list(self.flavors_controller.list(project=self.project))
|
||||||
|
self.assertEqual(len(res), 10)
|
||||||
|
for i, entry in enumerate(res):
|
||||||
|
self._flavors_expects(entry, name_gen(i), self.project, str(i))
|
||||||
|
self.assertNotIn('capabilities', entry)
|
||||||
|
|
||||||
|
res = list(self.flavors_controller.list(project=self.project, limit=5))
|
||||||
|
self.assertEqual(len(res), 5)
|
||||||
|
|
||||||
|
res = next(self.flavors_controller.list(project=self.project,
|
||||||
|
marker=name_gen(3)))
|
||||||
|
self._flavors_expects(res, name_gen(4), self.project, '4')
|
||||||
|
|
||||||
|
res = list(self.flavors_controller.list(project=self.project,
|
||||||
|
detailed=True))
|
||||||
|
self.assertEqual(len(res), 10)
|
||||||
|
for i, entry in enumerate(res):
|
||||||
|
self._flavors_expects(entry, name_gen(i), self.project, str(i))
|
||||||
|
self.assertIn('capabilities', entry)
|
||||||
|
self.assertEqual(entry['capabilities'], {})
|
||||||
|
|
||||||
|
|
||||||
def _insert_fixtures(controller, queue_name, project=None,
|
def _insert_fixtures(controller, queue_name, project=None,
|
||||||
client_uuid=None, num=4, ttl=120):
|
client_uuid=None, num=4, ttl=120):
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user