diff --git a/tests/unit/storage/test_impl_mongodb.py b/tests/unit/storage/test_impl_mongodb.py index e8a67931..2da51fbc 100644 --- a/tests/unit/storage/test_impl_mongodb.py +++ b/tests/unit/storage/test_impl_mongodb.py @@ -41,7 +41,8 @@ from zaqar.tests.unit.storage import base class MongodbSetupMixin(object): def _purge_databases(self): databases = (self.driver.message_databases + - [self.driver.queues_database]) + [self.driver.queues_database, + self.driver.subscriptions_database]) for db in databases: self.driver.connection.drop_database(db) @@ -444,6 +445,14 @@ class MongodbClaimTests(MongodbSetupMixin, base.ClaimControllerTest): project=self.project) +@testing.requires_mongodb +class MongodbSubscriptionTests(MongodbSetupMixin, + base.SubscriptionControllerTest): + driver_class = mongodb.DataDriver + config_file = 'wsgi_mongodb.conf' + controller_class = controllers.SubscriptionController + + # # TODO(kgriffs): Do these need database purges as well as those above? # diff --git a/zaqar/storage/__init__.py b/zaqar/storage/__init__.py index c48acff4..795647b1 100644 --- a/zaqar/storage/__init__.py +++ b/zaqar/storage/__init__.py @@ -24,11 +24,13 @@ CatalogueBase = base.CatalogueBase Claim = base.Claim Message = base.Message Queue = base.Queue +Subscription = base.Subscription PoolsBase = base.PoolsBase FlavorsBase = base.FlavorsBase DEFAULT_QUEUES_PER_PAGE = base.DEFAULT_QUEUES_PER_PAGE DEFAULT_MESSAGES_PER_PAGE = base.DEFAULT_MESSAGES_PER_PAGE DEFAULT_POOLS_PER_PAGE = base.DEFAULT_POOLS_PER_PAGE +DEFAULT_SUBSCRIPTIONS_PER_PAGE = base.DEFAULT_SUBSCRIPTIONS_PER_PAGE DEFAULT_MESSAGES_PER_CLAIM = base.DEFAULT_MESSAGES_PER_CLAIM diff --git a/zaqar/storage/base.py b/zaqar/storage/base.py index 100dd87d..989be47f 100644 --- a/zaqar/storage/base.py +++ b/zaqar/storage/base.py @@ -31,6 +31,7 @@ import zaqar.openstack.common.log as logging DEFAULT_QUEUES_PER_PAGE = 10 DEFAULT_MESSAGES_PER_PAGE = 10 DEFAULT_POOLS_PER_PAGE = 10 +DEFAULT_SUBSCRIPTIONS_PER_PAGE = 10 DEFAULT_MESSAGES_PER_CLAIM = 10 @@ -223,6 +224,11 @@ class DataDriverBase(DriverBase): """Returns the driver's claim controller.""" raise NotImplementedError + @abc.abstractproperty + def subscription_controller(self): + """Returns the driver's subscription controller.""" + raise NotImplementedError + @six.add_metaclass(abc.ABCMeta) class ControlDriverBase(DriverBase): @@ -564,6 +570,111 @@ class Claim(ControllerBase): raise NotImplementedError +@six.add_metaclass(abc.ABCMeta) +class Subscription(ControllerBase): + """This class is responsible for managing subscriptions of notification. + + """ + + @abc.abstractmethod + def list(self, queue, project=None, marker=None, + limit=DEFAULT_SUBSCRIPTIONS_PER_PAGE): + """Base method for listing subscriptions. + + :param queue: Name of the queue to get the subscriptions from. + :type queue: six.text_type + :param project: Project this subscription belongs to. + :type project: six.text_type + :param marker: used to determine which subscription to start with + :type marker: six.text_type + :param limit: (Default 10) Max number of results to return + :type limit: int + :returns: An iterator giving a sequence of subscriptions + and the marker of the next page. + :rtype: [{}] + """ + raise NotImplementedError + + @abc.abstractmethod + def get(self, queue, subscription_id, project=None): + """Returns a single subscription entry. + + :param queue: Name of the queue subscription belongs to. + :type queue: six.text_type + :param subscription_id: ID of this subscription + :type subscription_id: six.text_type + :param project: Project this subscription belongs to. + :type project: six.text_type + :returns: Dictionary containing subscription data + :rtype: {} + :raises: SubscriptionDoesNotExist if not found + """ + raise NotImplementedError + + @abc.abstractmethod + def create(self, queue, subscriber, ttl, options, project=None): + """Create a new subscription. + + :param queue:The source queue for notifications + :type queue: six.text_type + :param subscriber: The subscriber URI + :type subscriber: six.text_type + :param ttl: time to live for this subscription + :type ttl: int + :param options: Options used to configure this subscription + :type options: dict + :param project: Project id + :type project: six.text_type + :returns: True if a subscription was created and False + if it is failed. + :rtype: boolean + """ + raise NotImplementedError + + @abc.abstractmethod + def update(self, queue, subscription_id, project=None, **kwargs): + """Updates the weight, uris, and/or options of this subscription + + :param queue: Name of the queue subscription belongs to. + :type queue: six.text_type + :param name: ID of the subscription + :type name: text + :param kwargs: one of: `source`, `subscriber`, `ttl`, `options` + :type kwargs: dict + :raises: SubscriptionDoesNotExist if not found + """ + + raise NotImplementedError + + @abc.abstractmethod + def exists(self, queue, subscription_id, project=None): + """Base method for testing subscription existence. + + :param queue: Name of the queue subscription belongs to. + :type queue: six.text_type + :param subscription_id: ID of subscription + :type subscription_id: six.text_type + :param project: Project id + :type project: six.text_type + :returns: True if a subscription exists and False + if it does not. + """ + raise NotImplementedError + + @abc.abstractmethod + def delete(self, queue, subscription_id, project=None): + """Base method for deleting a subscription. + + :param queue: Name of the queue subscription belongs to. + :type queue: six.text_type + :param subscription_id: ID of the subscription to be deleted. + :type subscription_id: six.text_type + :param project: Project id + :type project: six.text_type + """ + raise NotImplementedError + + @six.add_metaclass(abc.ABCMeta) class PoolsBase(ControllerBase): """A controller for managing pools.""" diff --git a/zaqar/storage/errors.py b/zaqar/storage/errors.py index 626e9af5..ad80ad0f 100644 --- a/zaqar/storage/errors.py +++ b/zaqar/storage/errors.py @@ -176,3 +176,12 @@ class PoolInUseByFlavor(NotPermitted): @property def flavor(self): return self._flavor + + +class SubscriptionDoesNotExist(DoesNotExist): + + msg_format = u'Subscription {subscription_id} does not exist' + + def __init__(self, subscription_id): + super(SubscriptionDoesNotExist, + self).__init__(subscription_id=subscription_id) diff --git a/zaqar/storage/mongodb/controllers.py b/zaqar/storage/mongodb/controllers.py index 37e426fe..01b176a8 100644 --- a/zaqar/storage/mongodb/controllers.py +++ b/zaqar/storage/mongodb/controllers.py @@ -28,6 +28,7 @@ from zaqar.storage.mongodb import flavors from zaqar.storage.mongodb import messages from zaqar.storage.mongodb import pools from zaqar.storage.mongodb import queues +from zaqar.storage.mongodb import subscriptions CatalogueController = catalogue.CatalogueController @@ -36,3 +37,4 @@ FlavorsController = flavors.FlavorsController MessageController = messages.MessageController QueueController = queues.QueueController PoolsController = pools.PoolsController +SubscriptionController = subscriptions.SubscriptionController diff --git a/zaqar/storage/mongodb/driver.py b/zaqar/storage/mongodb/driver.py index 94858945..da213926 100644 --- a/zaqar/storage/mongodb/driver.py +++ b/zaqar/storage/mongodb/driver.py @@ -165,6 +165,12 @@ class DataDriver(storage.DataDriverBase): return [self.connection[name + '_messages_p' + str(p)] for p in range(partitions)] + @decorators.lazy_property(write=False) + def subscriptions_database(self): + """Database dedicated to the "subscription" collection.""" + name = self.mongodb_conf.database + '_subscriptions' + return self.connection[name] + @decorators.lazy_property(write=False) def connection(self): """MongoDB client connection instance.""" @@ -182,6 +188,10 @@ class DataDriver(storage.DataDriverBase): def claim_controller(self): return controllers.ClaimController(self) + @decorators.lazy_property(write=False) + def subscription_controller(self): + return controllers.SubscriptionController(self) + class ControlDriver(storage.ControlDriverBase): diff --git a/zaqar/storage/mongodb/subscriptions.py b/zaqar/storage/mongodb/subscriptions.py new file mode 100644 index 00000000..915eb3e9 --- /dev/null +++ b/zaqar/storage/mongodb/subscriptions.py @@ -0,0 +1,151 @@ +# Copyright (c) 2014 Catalyst IT Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy +# of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +from oslo.utils import timeutils +import pymongo.errors + +from zaqar.common import utils as common_utils +from zaqar.storage import base +from zaqar.storage import errors +from zaqar.storage.mongodb import utils + +ID_INDEX_FIELDS = [('_id', 1)] + +SUBSCRIPTIONS_INDEX = [ + ('s', 1), + ('u', 1), + ('p', 1), +] + + +class SubscriptionController(base.Subscription): + """Implements subscription resource operations using MongoDB. + + Subscriptions are unique by project + queue/topic + subscriber. + + Schema: + 's': source :: six.text_type + 'u': subscriber:: six.text_type + 't': ttl:: int + 'e': expires: int + 'o': options :: dict + 'p': project :: six.text_type + """ + + def __init__(self, *args, **kwargs): + super(SubscriptionController, self).__init__(*args, **kwargs) + self._collection = self.driver.subscriptions_database.subscriptions + self._queue_collection = self.driver.queues_database.queues + self._collection.ensure_index(SUBSCRIPTIONS_INDEX, unique=True) + + @utils.raises_conn_error + def list(self, queue, project=None, marker=None, limit=10): + query = {'s': queue, 'p': project} + if marker is not None: + query['_id'] = {'$gt': marker} + + fields = {'s': 1, 'u': 1, 't': 1, 'p': 1, 'o': 1, '_id': 1} + + cursor = self._collection.find(query, fields=fields) + cursor = cursor.limit(limit).sort('_id') + marker_name = {} + + def normalizer(record): + ret = { + 'id': str(record['_id']), + 'source': record['s'], + 'subscriber': record['u'], + 'ttl': record['t'], + 'options': record['o'], + } + marker_name['next'] = record['_id'] + + return ret + + yield utils.HookedCursor(cursor, normalizer) + yield marker_name and marker_name['next'] + + @utils.raises_conn_error + def get(self, queue, subscription_id, project=None): + res = self._collection.find_one({'_id': utils.to_oid(subscription_id), + 'p': project}) + + if not res: + raise errors.SubscriptionDoesNotExist(subscription_id) + + return _normalize(res) + + @utils.raises_conn_error + def create(self, queue, subscriber, ttl, options, project=None): + source = queue + now = timeutils.utcnow_ts() + ttl = int(ttl) + expires = now + ttl + source_query = {'p_q': utils.scope_queue_name(source, project)} + target_source = self._queue_collection.find_one(source_query, + fields={'m': 1, + '_id': 0}) + if target_source is None: + raise errors.QueueDoesNotExist(target_source, project) + + try: + subscription_id = self._collection.insert({'s': source, + 'u': subscriber, + 't': ttl, + 'e': expires, + 'o': options, + 'p': project}) + return subscription_id + except pymongo.errors.DuplicateKeyError: + return None + + @utils.raises_conn_error + def exists(self, queue, subscription_id, project=None): + return self._collection.find_one({'_id': utils.to_oid(subscription_id), + 'p': project}) is not None + + @utils.raises_conn_error + def update(self, queue, subscription_id, project=None, **kwargs): + names = ('subscriber', 'ttl', 'options') + key_transform = lambda x: 'u' if x == 'subscriber' else x[0] + fields = common_utils.fields(kwargs, names, + pred=lambda x: x is not None, + key_transform=key_transform) + assert fields, ('`subscriber`, `ttl`, ' + 'or `options` not found in kwargs') + + res = self._collection.update({'_id': utils.to_oid(subscription_id), + 'p': project}, + {'$set': fields}, + upsert=False) + + if not res['updatedExisting']: + raise errors.SubscriptionDoesNotExist(subscription_id) + + @utils.raises_conn_error + def delete(self, queue, subscription_id, project=None): + self._collection.remove({'_id': utils.to_oid(subscription_id), + 'p': project}, w=0) + + +def _normalize(record): + ret = { + 'id': str(record['_id']), + 'source': record['s'], + 'subscriber': record['u'], + 'ttl': record['t'], + 'options': record['o'] + } + + return ret diff --git a/zaqar/storage/pipeline.py b/zaqar/storage/pipeline.py index 1d6816ac..6cdbf283 100644 --- a/zaqar/storage/pipeline.py +++ b/zaqar/storage/pipeline.py @@ -23,7 +23,7 @@ from zaqar.storage import base LOG = logging.getLogger(__name__) -_PIPELINE_RESOURCES = ('queue', 'message', 'claim') +_PIPELINE_RESOURCES = ('queue', 'message', 'claim', 'subscription') _PIPELINE_CONFIGS = tuple(( cfg.ListOpt(resource + '_pipeline', default=[], @@ -122,3 +122,9 @@ class DataDriver(base.DataDriverBase): stages = _get_storage_pipeline('claim', self.conf) stages.append(self._storage.claim_controller) return stages + + @decorators.lazy_property(write=False) + def subscription_controller(self): + stages = _get_storage_pipeline('subscription', self.conf) + stages.append(self._storage.subscription_controller) + return stages diff --git a/zaqar/storage/pooling.py b/zaqar/storage/pooling.py index 80ee921c..6dfaca19 100644 --- a/zaqar/storage/pooling.py +++ b/zaqar/storage/pooling.py @@ -121,6 +121,10 @@ class DataDriver(storage.DataDriverBase): def claim_controller(self): return ClaimController(self._pool_catalog) + @decorators.lazy_property(write=False) + def subscription_controller(self): + return SubscriptionController(self._pool_catalog) + class QueueController(storage.Queue): """Routes operations to a queue controller in the appropriate pool. @@ -345,6 +349,54 @@ class ClaimController(storage.Claim): return None +class SubscriptionController(storage.Subscription): + """Controller to facilitate processing for subscription operations.""" + + _resource_name = 'subscription' + + def __init__(self, pool_catalog): + super(SubscriptionController, self).__init__(pool_catalog) + self._pool_catalog = pool_catalog + self._get_controller = self._pool_catalog.get_subscription_controller + + def list(self, queue, project=None, marker=None, + limit=storage.DEFAULT_SUBSCRIPTIONS_PER_PAGE): + control = self._get_controller(queue, project) + if control: + return control.list(queue, project=project, + marker=marker, limit=limit) + + def get(self, queue, subscription_id, project=None): + control = self._get_controller(queue, project) + if control: + return control.get(queue, subscription_id, project=project) + + def create(self, queue, subscriber, ttl, options, project=None): + control = self._get_controller(queue, project) + if control: + return control.post(queue, subscriber, + ttl, options, + project=project) + + def update(self, queue, subscription_id, project=None, **kwargs): + control = self._get_controller(queue, project) + if control: + return control.update(queue, subscription_id, + project=project, **kwargs) + + def delete(self, queue, subscription_id, project=None): + control = self._get_controller(queue, project) + if control: + return control.delete(queue, subscription_id, + project=project) + + def exists(self, queue, subscription_id, project=None): + control = self._get_controller(queue, project) + if control: + return control.exists(queue, subscription_id, + project=project) + + class Catalog(object): """Represents the mapping between queues and pool drivers.""" @@ -491,6 +543,21 @@ class Catalog(object): target = self.lookup(queue, project) return target and target.claim_controller + def get_subscription_controller(self, queue, project=None): + """Lookup the subscription controller for the given queue and project. + + :param queue: Name of the queue for which to find a pool + :param project: Project to which the queue belongs, or + None to specify the "global" or "generic" project. + + :returns: The subscription controller associated with the data driver + for the pool containing (queue, project) or None if this doesn't + exist. + :rtype: Maybe SubscriptionController + """ + target = self.lookup(queue, project) + return target and target.subscription_controller + def lookup(self, queue, project=None): """Lookup a pool driver for the given queue and project. diff --git a/zaqar/storage/redis/driver.py b/zaqar/storage/redis/driver.py index 834442db..3f7d0495 100644 --- a/zaqar/storage/redis/driver.py +++ b/zaqar/storage/redis/driver.py @@ -206,6 +206,10 @@ class DataDriver(storage.DataDriverBase): def claim_controller(self): return controllers.ClaimController(self) + @decorators.lazy_property(write=False) + def subscription_controller(self): + raise NotImplementedError() + class ControlDriver(storage.ControlDriverBase): @@ -234,6 +238,10 @@ class ControlDriver(storage.ControlDriverBase): def flavors_controller(self): raise NotImplementedError() + @property + def subscriptions_controller(self): + raise NotImplementedError() + def _get_redis_client(driver): conf = driver.redis_conf diff --git a/zaqar/storage/sqlalchemy/driver.py b/zaqar/storage/sqlalchemy/driver.py index ef99eeb2..086df72f 100644 --- a/zaqar/storage/sqlalchemy/driver.py +++ b/zaqar/storage/sqlalchemy/driver.py @@ -128,6 +128,10 @@ class DataDriver(storage.DataDriverBase): def claim_controller(self): return controllers.ClaimController(self) + @decorators.lazy_property(write=False) + def subscription_controller(self): + pass + def is_alive(self): return True @@ -190,3 +194,7 @@ class ControlDriver(storage.ControlDriverBase): def flavors_controller(self): # NOTE(flaper87): Needed to avoid `abc` errors. pass + + @property + def subscriptions_controller(self): + pass diff --git a/zaqar/tests/faulty_storage.py b/zaqar/tests/faulty_storage.py index 4487165d..a4a98ed3 100644 --- a/zaqar/tests/faulty_storage.py +++ b/zaqar/tests/faulty_storage.py @@ -46,6 +46,10 @@ class DataDriver(storage.DataDriverBase): def claim_controller(self): return None + @property + def subscription_controller(self): + return None + class ControlDriver(storage.ControlDriverBase): diff --git a/zaqar/tests/unit/storage/base.py b/zaqar/tests/unit/storage/base.py index 24249fa3..ef405ad2 100644 --- a/zaqar/tests/unit/storage/base.py +++ b/zaqar/tests/unit/storage/base.py @@ -1,4 +1,5 @@ # Copyright (c) 2013 Red Hat, Inc. +# Copyright (c) 2014 Catalyst IT Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -929,6 +930,127 @@ class ClaimControllerTest(ControllerBaseTest): project=self.project) +class SubscriptionControllerTest(ControllerBaseTest): + """Subscriptions Controller base tests. + + """ + queue_name = 'test_queue' + controller_base_class = storage.Subscription + + def setUp(self): + super(SubscriptionControllerTest, self).setUp() + self.subscription_controller = self.driver.subscription_controller + + # Lets create a queue as the source of subscription + self.queue_controller = self.driver.queue_controller + self.queue_controller.create(self.queue_name, project=self.project) + + self.source = self.queue_name + self.subscriber = 'http://trigger.me' + self.ttl = 600 + self.options = {'uri': 'http://fake.com'} + + def tearDown(self): + self.queue_controller.delete(self.queue_name, project=self.project) + super(SubscriptionControllerTest, self).tearDown() + + def test_list(self): + for s in six.moves.xrange(15): + subscriber = 'http://fake_{0}'.format(s) + self.subscription_controller.create(self.source, + subscriber, + self.ttl, + self.options, + project=self.project) + + interaction = self.subscription_controller.list(self.source, + project=self.project) + subscriptions = list(next(interaction)) + + self.assertEqual(all(map(lambda s: 'source' in s and 'subscriber' in s, + subscriptions)), True) + self.assertEqual(len(subscriptions), 10) + + interaction = (self.subscription_controller.list(self.source, + project=self.project, + marker=next(interaction))) + subscriptions = list(next(interaction)) + + self.assertEqual(all(map(lambda s: 'source' in s and 'subscriber' in s, + subscriptions)), True) + self.assertEqual(len(subscriptions), 5) + + def test_get_raises_if_subscription_does_not_exist(self): + self.assertRaises(errors.SubscriptionDoesNotExist, + self.subscription_controller.get, + self.queue_name, + 'notexists', + project=self.project) + + def test_lifecycle(self): + s_id = self.subscription_controller.create(self.source, + self.subscriber, + self.ttl, + self.options, + project=self.project) + + subscription = self.subscription_controller.get(self.queue_name, + s_id, + self.project) + + self.assertEqual(self.source, + subscription['source']) + self.assertEqual(self.subscriber, + subscription['subscriber']) + + exist = self.subscription_controller.exists(self.queue_name, + s_id, + self.project) + + self.assertTrue(exist) + + self.subscription_controller.update(self.queue_name, + s_id, + project=self.project, + subscriber='http://a.com' + ) + + updated = self.subscription_controller.get(self.queue_name, + s_id, + self.project) + + self.assertEqual('http://a.com', updated['subscriber']) + + self.subscription_controller.delete(self.queue_name, + s_id, project=self.project) + self.assertRaises(errors.SubscriptionDoesNotExist, + self.subscription_controller.get, + self.queue_name, s_id) + + def test_create_existed(self): + self.subscription_controller.create(self.source, + self.subscriber, + self.ttl, + self.options, + project=self.project) + + s_id = self.subscription_controller.create(self.source, + self.subscriber, + self.ttl, + self.options, + project=self.project) + self.assertIsNone(s_id) + + def test_nonexist_source(self): + self.assertRaises(errors.QueueDoesNotExist, + self.subscription_controller.create, + 'fake_queue_name', + self.subscriber, + self.ttl, + self.options, + self.project) + + class PoolsControllerTest(ControllerBaseTest): """Pools Controller base tests.