From 1b694eb485b8e7cf10502677f582fed970f7e71f Mon Sep 17 00:00:00 2001 From: Fei Long Wang Date: Sun, 7 Dec 2014 00:05:42 +1300 Subject: [PATCH] Implement mongodb driver for notifications This patch adds the base API for notification to the storage layer, and an init mongodb driver. The API follows pretty much the leads of other APIs and supports basic operations that allow users to create, delete, update and retrieve subscriptions for notification. Partially-Implements blueprint: notifications Change-Id: Ie065aaec16b9e6dbf9d31d93320bf6e691a4ed37 --- tests/unit/storage/test_impl_mongodb.py | 11 +- zaqar/storage/__init__.py | 2 + zaqar/storage/base.py | 111 +++++++++++++++++ zaqar/storage/errors.py | 9 ++ zaqar/storage/mongodb/controllers.py | 2 + zaqar/storage/mongodb/driver.py | 10 ++ zaqar/storage/mongodb/subscriptions.py | 151 ++++++++++++++++++++++++ zaqar/storage/pipeline.py | 8 +- zaqar/storage/pooling.py | 67 +++++++++++ zaqar/storage/redis/driver.py | 8 ++ zaqar/storage/sqlalchemy/driver.py | 8 ++ zaqar/tests/faulty_storage.py | 4 + zaqar/tests/unit/storage/base.py | 122 +++++++++++++++++++ 13 files changed, 511 insertions(+), 2 deletions(-) create mode 100644 zaqar/storage/mongodb/subscriptions.py diff --git a/tests/unit/storage/test_impl_mongodb.py b/tests/unit/storage/test_impl_mongodb.py index e8a67931..2da51fbc 100644 --- a/tests/unit/storage/test_impl_mongodb.py +++ b/tests/unit/storage/test_impl_mongodb.py @@ -41,7 +41,8 @@ from zaqar.tests.unit.storage import base class MongodbSetupMixin(object): def _purge_databases(self): databases = (self.driver.message_databases + - [self.driver.queues_database]) + [self.driver.queues_database, + self.driver.subscriptions_database]) for db in databases: self.driver.connection.drop_database(db) @@ -444,6 +445,14 @@ class MongodbClaimTests(MongodbSetupMixin, base.ClaimControllerTest): project=self.project) +@testing.requires_mongodb +class MongodbSubscriptionTests(MongodbSetupMixin, + base.SubscriptionControllerTest): + driver_class = mongodb.DataDriver + config_file = 'wsgi_mongodb.conf' + controller_class = controllers.SubscriptionController + + # # TODO(kgriffs): Do these need database purges as well as those above? # diff --git a/zaqar/storage/__init__.py b/zaqar/storage/__init__.py index c48acff4..795647b1 100644 --- a/zaqar/storage/__init__.py +++ b/zaqar/storage/__init__.py @@ -24,11 +24,13 @@ CatalogueBase = base.CatalogueBase Claim = base.Claim Message = base.Message Queue = base.Queue +Subscription = base.Subscription PoolsBase = base.PoolsBase FlavorsBase = base.FlavorsBase DEFAULT_QUEUES_PER_PAGE = base.DEFAULT_QUEUES_PER_PAGE DEFAULT_MESSAGES_PER_PAGE = base.DEFAULT_MESSAGES_PER_PAGE DEFAULT_POOLS_PER_PAGE = base.DEFAULT_POOLS_PER_PAGE +DEFAULT_SUBSCRIPTIONS_PER_PAGE = base.DEFAULT_SUBSCRIPTIONS_PER_PAGE DEFAULT_MESSAGES_PER_CLAIM = base.DEFAULT_MESSAGES_PER_CLAIM diff --git a/zaqar/storage/base.py b/zaqar/storage/base.py index 100dd87d..989be47f 100644 --- a/zaqar/storage/base.py +++ b/zaqar/storage/base.py @@ -31,6 +31,7 @@ import zaqar.openstack.common.log as logging DEFAULT_QUEUES_PER_PAGE = 10 DEFAULT_MESSAGES_PER_PAGE = 10 DEFAULT_POOLS_PER_PAGE = 10 +DEFAULT_SUBSCRIPTIONS_PER_PAGE = 10 DEFAULT_MESSAGES_PER_CLAIM = 10 @@ -223,6 +224,11 @@ class DataDriverBase(DriverBase): """Returns the driver's claim controller.""" raise NotImplementedError + @abc.abstractproperty + def subscription_controller(self): + """Returns the driver's subscription controller.""" + raise NotImplementedError + @six.add_metaclass(abc.ABCMeta) class ControlDriverBase(DriverBase): @@ -564,6 +570,111 @@ class Claim(ControllerBase): raise NotImplementedError +@six.add_metaclass(abc.ABCMeta) +class Subscription(ControllerBase): + """This class is responsible for managing subscriptions of notification. + + """ + + @abc.abstractmethod + def list(self, queue, project=None, marker=None, + limit=DEFAULT_SUBSCRIPTIONS_PER_PAGE): + """Base method for listing subscriptions. + + :param queue: Name of the queue to get the subscriptions from. + :type queue: six.text_type + :param project: Project this subscription belongs to. + :type project: six.text_type + :param marker: used to determine which subscription to start with + :type marker: six.text_type + :param limit: (Default 10) Max number of results to return + :type limit: int + :returns: An iterator giving a sequence of subscriptions + and the marker of the next page. + :rtype: [{}] + """ + raise NotImplementedError + + @abc.abstractmethod + def get(self, queue, subscription_id, project=None): + """Returns a single subscription entry. + + :param queue: Name of the queue subscription belongs to. + :type queue: six.text_type + :param subscription_id: ID of this subscription + :type subscription_id: six.text_type + :param project: Project this subscription belongs to. + :type project: six.text_type + :returns: Dictionary containing subscription data + :rtype: {} + :raises: SubscriptionDoesNotExist if not found + """ + raise NotImplementedError + + @abc.abstractmethod + def create(self, queue, subscriber, ttl, options, project=None): + """Create a new subscription. + + :param queue:The source queue for notifications + :type queue: six.text_type + :param subscriber: The subscriber URI + :type subscriber: six.text_type + :param ttl: time to live for this subscription + :type ttl: int + :param options: Options used to configure this subscription + :type options: dict + :param project: Project id + :type project: six.text_type + :returns: True if a subscription was created and False + if it is failed. + :rtype: boolean + """ + raise NotImplementedError + + @abc.abstractmethod + def update(self, queue, subscription_id, project=None, **kwargs): + """Updates the weight, uris, and/or options of this subscription + + :param queue: Name of the queue subscription belongs to. + :type queue: six.text_type + :param name: ID of the subscription + :type name: text + :param kwargs: one of: `source`, `subscriber`, `ttl`, `options` + :type kwargs: dict + :raises: SubscriptionDoesNotExist if not found + """ + + raise NotImplementedError + + @abc.abstractmethod + def exists(self, queue, subscription_id, project=None): + """Base method for testing subscription existence. + + :param queue: Name of the queue subscription belongs to. + :type queue: six.text_type + :param subscription_id: ID of subscription + :type subscription_id: six.text_type + :param project: Project id + :type project: six.text_type + :returns: True if a subscription exists and False + if it does not. + """ + raise NotImplementedError + + @abc.abstractmethod + def delete(self, queue, subscription_id, project=None): + """Base method for deleting a subscription. + + :param queue: Name of the queue subscription belongs to. + :type queue: six.text_type + :param subscription_id: ID of the subscription to be deleted. + :type subscription_id: six.text_type + :param project: Project id + :type project: six.text_type + """ + raise NotImplementedError + + @six.add_metaclass(abc.ABCMeta) class PoolsBase(ControllerBase): """A controller for managing pools.""" diff --git a/zaqar/storage/errors.py b/zaqar/storage/errors.py index 626e9af5..ad80ad0f 100644 --- a/zaqar/storage/errors.py +++ b/zaqar/storage/errors.py @@ -176,3 +176,12 @@ class PoolInUseByFlavor(NotPermitted): @property def flavor(self): return self._flavor + + +class SubscriptionDoesNotExist(DoesNotExist): + + msg_format = u'Subscription {subscription_id} does not exist' + + def __init__(self, subscription_id): + super(SubscriptionDoesNotExist, + self).__init__(subscription_id=subscription_id) diff --git a/zaqar/storage/mongodb/controllers.py b/zaqar/storage/mongodb/controllers.py index 37e426fe..01b176a8 100644 --- a/zaqar/storage/mongodb/controllers.py +++ b/zaqar/storage/mongodb/controllers.py @@ -28,6 +28,7 @@ from zaqar.storage.mongodb import flavors from zaqar.storage.mongodb import messages from zaqar.storage.mongodb import pools from zaqar.storage.mongodb import queues +from zaqar.storage.mongodb import subscriptions CatalogueController = catalogue.CatalogueController @@ -36,3 +37,4 @@ FlavorsController = flavors.FlavorsController MessageController = messages.MessageController QueueController = queues.QueueController PoolsController = pools.PoolsController +SubscriptionController = subscriptions.SubscriptionController diff --git a/zaqar/storage/mongodb/driver.py b/zaqar/storage/mongodb/driver.py index 94858945..da213926 100644 --- a/zaqar/storage/mongodb/driver.py +++ b/zaqar/storage/mongodb/driver.py @@ -165,6 +165,12 @@ class DataDriver(storage.DataDriverBase): return [self.connection[name + '_messages_p' + str(p)] for p in range(partitions)] + @decorators.lazy_property(write=False) + def subscriptions_database(self): + """Database dedicated to the "subscription" collection.""" + name = self.mongodb_conf.database + '_subscriptions' + return self.connection[name] + @decorators.lazy_property(write=False) def connection(self): """MongoDB client connection instance.""" @@ -182,6 +188,10 @@ class DataDriver(storage.DataDriverBase): def claim_controller(self): return controllers.ClaimController(self) + @decorators.lazy_property(write=False) + def subscription_controller(self): + return controllers.SubscriptionController(self) + class ControlDriver(storage.ControlDriverBase): diff --git a/zaqar/storage/mongodb/subscriptions.py b/zaqar/storage/mongodb/subscriptions.py new file mode 100644 index 00000000..915eb3e9 --- /dev/null +++ b/zaqar/storage/mongodb/subscriptions.py @@ -0,0 +1,151 @@ +# Copyright (c) 2014 Catalyst IT Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy +# of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +from oslo.utils import timeutils +import pymongo.errors + +from zaqar.common import utils as common_utils +from zaqar.storage import base +from zaqar.storage import errors +from zaqar.storage.mongodb import utils + +ID_INDEX_FIELDS = [('_id', 1)] + +SUBSCRIPTIONS_INDEX = [ + ('s', 1), + ('u', 1), + ('p', 1), +] + + +class SubscriptionController(base.Subscription): + """Implements subscription resource operations using MongoDB. + + Subscriptions are unique by project + queue/topic + subscriber. + + Schema: + 's': source :: six.text_type + 'u': subscriber:: six.text_type + 't': ttl:: int + 'e': expires: int + 'o': options :: dict + 'p': project :: six.text_type + """ + + def __init__(self, *args, **kwargs): + super(SubscriptionController, self).__init__(*args, **kwargs) + self._collection = self.driver.subscriptions_database.subscriptions + self._queue_collection = self.driver.queues_database.queues + self._collection.ensure_index(SUBSCRIPTIONS_INDEX, unique=True) + + @utils.raises_conn_error + def list(self, queue, project=None, marker=None, limit=10): + query = {'s': queue, 'p': project} + if marker is not None: + query['_id'] = {'$gt': marker} + + fields = {'s': 1, 'u': 1, 't': 1, 'p': 1, 'o': 1, '_id': 1} + + cursor = self._collection.find(query, fields=fields) + cursor = cursor.limit(limit).sort('_id') + marker_name = {} + + def normalizer(record): + ret = { + 'id': str(record['_id']), + 'source': record['s'], + 'subscriber': record['u'], + 'ttl': record['t'], + 'options': record['o'], + } + marker_name['next'] = record['_id'] + + return ret + + yield utils.HookedCursor(cursor, normalizer) + yield marker_name and marker_name['next'] + + @utils.raises_conn_error + def get(self, queue, subscription_id, project=None): + res = self._collection.find_one({'_id': utils.to_oid(subscription_id), + 'p': project}) + + if not res: + raise errors.SubscriptionDoesNotExist(subscription_id) + + return _normalize(res) + + @utils.raises_conn_error + def create(self, queue, subscriber, ttl, options, project=None): + source = queue + now = timeutils.utcnow_ts() + ttl = int(ttl) + expires = now + ttl + source_query = {'p_q': utils.scope_queue_name(source, project)} + target_source = self._queue_collection.find_one(source_query, + fields={'m': 1, + '_id': 0}) + if target_source is None: + raise errors.QueueDoesNotExist(target_source, project) + + try: + subscription_id = self._collection.insert({'s': source, + 'u': subscriber, + 't': ttl, + 'e': expires, + 'o': options, + 'p': project}) + return subscription_id + except pymongo.errors.DuplicateKeyError: + return None + + @utils.raises_conn_error + def exists(self, queue, subscription_id, project=None): + return self._collection.find_one({'_id': utils.to_oid(subscription_id), + 'p': project}) is not None + + @utils.raises_conn_error + def update(self, queue, subscription_id, project=None, **kwargs): + names = ('subscriber', 'ttl', 'options') + key_transform = lambda x: 'u' if x == 'subscriber' else x[0] + fields = common_utils.fields(kwargs, names, + pred=lambda x: x is not None, + key_transform=key_transform) + assert fields, ('`subscriber`, `ttl`, ' + 'or `options` not found in kwargs') + + res = self._collection.update({'_id': utils.to_oid(subscription_id), + 'p': project}, + {'$set': fields}, + upsert=False) + + if not res['updatedExisting']: + raise errors.SubscriptionDoesNotExist(subscription_id) + + @utils.raises_conn_error + def delete(self, queue, subscription_id, project=None): + self._collection.remove({'_id': utils.to_oid(subscription_id), + 'p': project}, w=0) + + +def _normalize(record): + ret = { + 'id': str(record['_id']), + 'source': record['s'], + 'subscriber': record['u'], + 'ttl': record['t'], + 'options': record['o'] + } + + return ret diff --git a/zaqar/storage/pipeline.py b/zaqar/storage/pipeline.py index 1d6816ac..6cdbf283 100644 --- a/zaqar/storage/pipeline.py +++ b/zaqar/storage/pipeline.py @@ -23,7 +23,7 @@ from zaqar.storage import base LOG = logging.getLogger(__name__) -_PIPELINE_RESOURCES = ('queue', 'message', 'claim') +_PIPELINE_RESOURCES = ('queue', 'message', 'claim', 'subscription') _PIPELINE_CONFIGS = tuple(( cfg.ListOpt(resource + '_pipeline', default=[], @@ -122,3 +122,9 @@ class DataDriver(base.DataDriverBase): stages = _get_storage_pipeline('claim', self.conf) stages.append(self._storage.claim_controller) return stages + + @decorators.lazy_property(write=False) + def subscription_controller(self): + stages = _get_storage_pipeline('subscription', self.conf) + stages.append(self._storage.subscription_controller) + return stages diff --git a/zaqar/storage/pooling.py b/zaqar/storage/pooling.py index 80ee921c..6dfaca19 100644 --- a/zaqar/storage/pooling.py +++ b/zaqar/storage/pooling.py @@ -121,6 +121,10 @@ class DataDriver(storage.DataDriverBase): def claim_controller(self): return ClaimController(self._pool_catalog) + @decorators.lazy_property(write=False) + def subscription_controller(self): + return SubscriptionController(self._pool_catalog) + class QueueController(storage.Queue): """Routes operations to a queue controller in the appropriate pool. @@ -345,6 +349,54 @@ class ClaimController(storage.Claim): return None +class SubscriptionController(storage.Subscription): + """Controller to facilitate processing for subscription operations.""" + + _resource_name = 'subscription' + + def __init__(self, pool_catalog): + super(SubscriptionController, self).__init__(pool_catalog) + self._pool_catalog = pool_catalog + self._get_controller = self._pool_catalog.get_subscription_controller + + def list(self, queue, project=None, marker=None, + limit=storage.DEFAULT_SUBSCRIPTIONS_PER_PAGE): + control = self._get_controller(queue, project) + if control: + return control.list(queue, project=project, + marker=marker, limit=limit) + + def get(self, queue, subscription_id, project=None): + control = self._get_controller(queue, project) + if control: + return control.get(queue, subscription_id, project=project) + + def create(self, queue, subscriber, ttl, options, project=None): + control = self._get_controller(queue, project) + if control: + return control.post(queue, subscriber, + ttl, options, + project=project) + + def update(self, queue, subscription_id, project=None, **kwargs): + control = self._get_controller(queue, project) + if control: + return control.update(queue, subscription_id, + project=project, **kwargs) + + def delete(self, queue, subscription_id, project=None): + control = self._get_controller(queue, project) + if control: + return control.delete(queue, subscription_id, + project=project) + + def exists(self, queue, subscription_id, project=None): + control = self._get_controller(queue, project) + if control: + return control.exists(queue, subscription_id, + project=project) + + class Catalog(object): """Represents the mapping between queues and pool drivers.""" @@ -491,6 +543,21 @@ class Catalog(object): target = self.lookup(queue, project) return target and target.claim_controller + def get_subscription_controller(self, queue, project=None): + """Lookup the subscription controller for the given queue and project. + + :param queue: Name of the queue for which to find a pool + :param project: Project to which the queue belongs, or + None to specify the "global" or "generic" project. + + :returns: The subscription controller associated with the data driver + for the pool containing (queue, project) or None if this doesn't + exist. + :rtype: Maybe SubscriptionController + """ + target = self.lookup(queue, project) + return target and target.subscription_controller + def lookup(self, queue, project=None): """Lookup a pool driver for the given queue and project. diff --git a/zaqar/storage/redis/driver.py b/zaqar/storage/redis/driver.py index 834442db..3f7d0495 100644 --- a/zaqar/storage/redis/driver.py +++ b/zaqar/storage/redis/driver.py @@ -206,6 +206,10 @@ class DataDriver(storage.DataDriverBase): def claim_controller(self): return controllers.ClaimController(self) + @decorators.lazy_property(write=False) + def subscription_controller(self): + raise NotImplementedError() + class ControlDriver(storage.ControlDriverBase): @@ -234,6 +238,10 @@ class ControlDriver(storage.ControlDriverBase): def flavors_controller(self): raise NotImplementedError() + @property + def subscriptions_controller(self): + raise NotImplementedError() + def _get_redis_client(driver): conf = driver.redis_conf diff --git a/zaqar/storage/sqlalchemy/driver.py b/zaqar/storage/sqlalchemy/driver.py index ef99eeb2..086df72f 100644 --- a/zaqar/storage/sqlalchemy/driver.py +++ b/zaqar/storage/sqlalchemy/driver.py @@ -128,6 +128,10 @@ class DataDriver(storage.DataDriverBase): def claim_controller(self): return controllers.ClaimController(self) + @decorators.lazy_property(write=False) + def subscription_controller(self): + pass + def is_alive(self): return True @@ -190,3 +194,7 @@ class ControlDriver(storage.ControlDriverBase): def flavors_controller(self): # NOTE(flaper87): Needed to avoid `abc` errors. pass + + @property + def subscriptions_controller(self): + pass diff --git a/zaqar/tests/faulty_storage.py b/zaqar/tests/faulty_storage.py index 4487165d..a4a98ed3 100644 --- a/zaqar/tests/faulty_storage.py +++ b/zaqar/tests/faulty_storage.py @@ -46,6 +46,10 @@ class DataDriver(storage.DataDriverBase): def claim_controller(self): return None + @property + def subscription_controller(self): + return None + class ControlDriver(storage.ControlDriverBase): diff --git a/zaqar/tests/unit/storage/base.py b/zaqar/tests/unit/storage/base.py index 24249fa3..ef405ad2 100644 --- a/zaqar/tests/unit/storage/base.py +++ b/zaqar/tests/unit/storage/base.py @@ -1,4 +1,5 @@ # Copyright (c) 2013 Red Hat, Inc. +# Copyright (c) 2014 Catalyst IT Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -929,6 +930,127 @@ class ClaimControllerTest(ControllerBaseTest): project=self.project) +class SubscriptionControllerTest(ControllerBaseTest): + """Subscriptions Controller base tests. + + """ + queue_name = 'test_queue' + controller_base_class = storage.Subscription + + def setUp(self): + super(SubscriptionControllerTest, self).setUp() + self.subscription_controller = self.driver.subscription_controller + + # Lets create a queue as the source of subscription + self.queue_controller = self.driver.queue_controller + self.queue_controller.create(self.queue_name, project=self.project) + + self.source = self.queue_name + self.subscriber = 'http://trigger.me' + self.ttl = 600 + self.options = {'uri': 'http://fake.com'} + + def tearDown(self): + self.queue_controller.delete(self.queue_name, project=self.project) + super(SubscriptionControllerTest, self).tearDown() + + def test_list(self): + for s in six.moves.xrange(15): + subscriber = 'http://fake_{0}'.format(s) + self.subscription_controller.create(self.source, + subscriber, + self.ttl, + self.options, + project=self.project) + + interaction = self.subscription_controller.list(self.source, + project=self.project) + subscriptions = list(next(interaction)) + + self.assertEqual(all(map(lambda s: 'source' in s and 'subscriber' in s, + subscriptions)), True) + self.assertEqual(len(subscriptions), 10) + + interaction = (self.subscription_controller.list(self.source, + project=self.project, + marker=next(interaction))) + subscriptions = list(next(interaction)) + + self.assertEqual(all(map(lambda s: 'source' in s and 'subscriber' in s, + subscriptions)), True) + self.assertEqual(len(subscriptions), 5) + + def test_get_raises_if_subscription_does_not_exist(self): + self.assertRaises(errors.SubscriptionDoesNotExist, + self.subscription_controller.get, + self.queue_name, + 'notexists', + project=self.project) + + def test_lifecycle(self): + s_id = self.subscription_controller.create(self.source, + self.subscriber, + self.ttl, + self.options, + project=self.project) + + subscription = self.subscription_controller.get(self.queue_name, + s_id, + self.project) + + self.assertEqual(self.source, + subscription['source']) + self.assertEqual(self.subscriber, + subscription['subscriber']) + + exist = self.subscription_controller.exists(self.queue_name, + s_id, + self.project) + + self.assertTrue(exist) + + self.subscription_controller.update(self.queue_name, + s_id, + project=self.project, + subscriber='http://a.com' + ) + + updated = self.subscription_controller.get(self.queue_name, + s_id, + self.project) + + self.assertEqual('http://a.com', updated['subscriber']) + + self.subscription_controller.delete(self.queue_name, + s_id, project=self.project) + self.assertRaises(errors.SubscriptionDoesNotExist, + self.subscription_controller.get, + self.queue_name, s_id) + + def test_create_existed(self): + self.subscription_controller.create(self.source, + self.subscriber, + self.ttl, + self.options, + project=self.project) + + s_id = self.subscription_controller.create(self.source, + self.subscriber, + self.ttl, + self.options, + project=self.project) + self.assertIsNone(s_id) + + def test_nonexist_source(self): + self.assertRaises(errors.QueueDoesNotExist, + self.subscription_controller.create, + 'fake_queue_name', + self.subscriber, + self.ttl, + self.options, + self.project) + + class PoolsControllerTest(ControllerBaseTest): """Pools Controller base tests.