Implement mongodb driver for notifications

This patch adds the base API for notification to the storage layer,
and an init mongodb driver. The API follows pretty much the leads
of other APIs and supports basic operations that allow users to
create, delete, update and retrieve subscriptions for notification.

Partially-Implements blueprint: notifications

Change-Id: Ie065aaec16b9e6dbf9d31d93320bf6e691a4ed37
This commit is contained in:
Fei Long Wang 2014-12-07 00:05:42 +13:00
parent 6222cc6654
commit 1b694eb485
13 changed files with 511 additions and 2 deletions

View File

@ -41,7 +41,8 @@ from zaqar.tests.unit.storage import base
class MongodbSetupMixin(object):
def _purge_databases(self):
databases = (self.driver.message_databases +
[self.driver.queues_database])
[self.driver.queues_database,
self.driver.subscriptions_database])
for db in databases:
self.driver.connection.drop_database(db)
@ -444,6 +445,14 @@ class MongodbClaimTests(MongodbSetupMixin, base.ClaimControllerTest):
project=self.project)
@testing.requires_mongodb
class MongodbSubscriptionTests(MongodbSetupMixin,
base.SubscriptionControllerTest):
driver_class = mongodb.DataDriver
config_file = 'wsgi_mongodb.conf'
controller_class = controllers.SubscriptionController
#
# TODO(kgriffs): Do these need database purges as well as those above?
#

View File

@ -24,11 +24,13 @@ CatalogueBase = base.CatalogueBase
Claim = base.Claim
Message = base.Message
Queue = base.Queue
Subscription = base.Subscription
PoolsBase = base.PoolsBase
FlavorsBase = base.FlavorsBase
DEFAULT_QUEUES_PER_PAGE = base.DEFAULT_QUEUES_PER_PAGE
DEFAULT_MESSAGES_PER_PAGE = base.DEFAULT_MESSAGES_PER_PAGE
DEFAULT_POOLS_PER_PAGE = base.DEFAULT_POOLS_PER_PAGE
DEFAULT_SUBSCRIPTIONS_PER_PAGE = base.DEFAULT_SUBSCRIPTIONS_PER_PAGE
DEFAULT_MESSAGES_PER_CLAIM = base.DEFAULT_MESSAGES_PER_CLAIM

View File

@ -31,6 +31,7 @@ import zaqar.openstack.common.log as logging
DEFAULT_QUEUES_PER_PAGE = 10
DEFAULT_MESSAGES_PER_PAGE = 10
DEFAULT_POOLS_PER_PAGE = 10
DEFAULT_SUBSCRIPTIONS_PER_PAGE = 10
DEFAULT_MESSAGES_PER_CLAIM = 10
@ -223,6 +224,11 @@ class DataDriverBase(DriverBase):
"""Returns the driver's claim controller."""
raise NotImplementedError
@abc.abstractproperty
def subscription_controller(self):
"""Returns the driver's subscription controller."""
raise NotImplementedError
@six.add_metaclass(abc.ABCMeta)
class ControlDriverBase(DriverBase):
@ -564,6 +570,111 @@ class Claim(ControllerBase):
raise NotImplementedError
@six.add_metaclass(abc.ABCMeta)
class Subscription(ControllerBase):
"""This class is responsible for managing subscriptions of notification.
"""
@abc.abstractmethod
def list(self, queue, project=None, marker=None,
limit=DEFAULT_SUBSCRIPTIONS_PER_PAGE):
"""Base method for listing subscriptions.
:param queue: Name of the queue to get the subscriptions from.
:type queue: six.text_type
:param project: Project this subscription belongs to.
:type project: six.text_type
:param marker: used to determine which subscription to start with
:type marker: six.text_type
:param limit: (Default 10) Max number of results to return
:type limit: int
:returns: An iterator giving a sequence of subscriptions
and the marker of the next page.
:rtype: [{}]
"""
raise NotImplementedError
@abc.abstractmethod
def get(self, queue, subscription_id, project=None):
"""Returns a single subscription entry.
:param queue: Name of the queue subscription belongs to.
:type queue: six.text_type
:param subscription_id: ID of this subscription
:type subscription_id: six.text_type
:param project: Project this subscription belongs to.
:type project: six.text_type
:returns: Dictionary containing subscription data
:rtype: {}
:raises: SubscriptionDoesNotExist if not found
"""
raise NotImplementedError
@abc.abstractmethod
def create(self, queue, subscriber, ttl, options, project=None):
"""Create a new subscription.
:param queue:The source queue for notifications
:type queue: six.text_type
:param subscriber: The subscriber URI
:type subscriber: six.text_type
:param ttl: time to live for this subscription
:type ttl: int
:param options: Options used to configure this subscription
:type options: dict
:param project: Project id
:type project: six.text_type
:returns: True if a subscription was created and False
if it is failed.
:rtype: boolean
"""
raise NotImplementedError
@abc.abstractmethod
def update(self, queue, subscription_id, project=None, **kwargs):
"""Updates the weight, uris, and/or options of this subscription
:param queue: Name of the queue subscription belongs to.
:type queue: six.text_type
:param name: ID of the subscription
:type name: text
:param kwargs: one of: `source`, `subscriber`, `ttl`, `options`
:type kwargs: dict
:raises: SubscriptionDoesNotExist if not found
"""
raise NotImplementedError
@abc.abstractmethod
def exists(self, queue, subscription_id, project=None):
"""Base method for testing subscription existence.
:param queue: Name of the queue subscription belongs to.
:type queue: six.text_type
:param subscription_id: ID of subscription
:type subscription_id: six.text_type
:param project: Project id
:type project: six.text_type
:returns: True if a subscription exists and False
if it does not.
"""
raise NotImplementedError
@abc.abstractmethod
def delete(self, queue, subscription_id, project=None):
"""Base method for deleting a subscription.
:param queue: Name of the queue subscription belongs to.
:type queue: six.text_type
:param subscription_id: ID of the subscription to be deleted.
:type subscription_id: six.text_type
:param project: Project id
:type project: six.text_type
"""
raise NotImplementedError
@six.add_metaclass(abc.ABCMeta)
class PoolsBase(ControllerBase):
"""A controller for managing pools."""

View File

@ -176,3 +176,12 @@ class PoolInUseByFlavor(NotPermitted):
@property
def flavor(self):
return self._flavor
class SubscriptionDoesNotExist(DoesNotExist):
msg_format = u'Subscription {subscription_id} does not exist'
def __init__(self, subscription_id):
super(SubscriptionDoesNotExist,
self).__init__(subscription_id=subscription_id)

View File

@ -28,6 +28,7 @@ from zaqar.storage.mongodb import flavors
from zaqar.storage.mongodb import messages
from zaqar.storage.mongodb import pools
from zaqar.storage.mongodb import queues
from zaqar.storage.mongodb import subscriptions
CatalogueController = catalogue.CatalogueController
@ -36,3 +37,4 @@ FlavorsController = flavors.FlavorsController
MessageController = messages.MessageController
QueueController = queues.QueueController
PoolsController = pools.PoolsController
SubscriptionController = subscriptions.SubscriptionController

View File

@ -165,6 +165,12 @@ class DataDriver(storage.DataDriverBase):
return [self.connection[name + '_messages_p' + str(p)]
for p in range(partitions)]
@decorators.lazy_property(write=False)
def subscriptions_database(self):
"""Database dedicated to the "subscription" collection."""
name = self.mongodb_conf.database + '_subscriptions'
return self.connection[name]
@decorators.lazy_property(write=False)
def connection(self):
"""MongoDB client connection instance."""
@ -182,6 +188,10 @@ class DataDriver(storage.DataDriverBase):
def claim_controller(self):
return controllers.ClaimController(self)
@decorators.lazy_property(write=False)
def subscription_controller(self):
return controllers.SubscriptionController(self)
class ControlDriver(storage.ControlDriverBase):

View File

@ -0,0 +1,151 @@
# Copyright (c) 2014 Catalyst IT Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
from oslo.utils import timeutils
import pymongo.errors
from zaqar.common import utils as common_utils
from zaqar.storage import base
from zaqar.storage import errors
from zaqar.storage.mongodb import utils
ID_INDEX_FIELDS = [('_id', 1)]
SUBSCRIPTIONS_INDEX = [
('s', 1),
('u', 1),
('p', 1),
]
class SubscriptionController(base.Subscription):
"""Implements subscription resource operations using MongoDB.
Subscriptions are unique by project + queue/topic + subscriber.
Schema:
's': source :: six.text_type
'u': subscriber:: six.text_type
't': ttl:: int
'e': expires: int
'o': options :: dict
'p': project :: six.text_type
"""
def __init__(self, *args, **kwargs):
super(SubscriptionController, self).__init__(*args, **kwargs)
self._collection = self.driver.subscriptions_database.subscriptions
self._queue_collection = self.driver.queues_database.queues
self._collection.ensure_index(SUBSCRIPTIONS_INDEX, unique=True)
@utils.raises_conn_error
def list(self, queue, project=None, marker=None, limit=10):
query = {'s': queue, 'p': project}
if marker is not None:
query['_id'] = {'$gt': marker}
fields = {'s': 1, 'u': 1, 't': 1, 'p': 1, 'o': 1, '_id': 1}
cursor = self._collection.find(query, fields=fields)
cursor = cursor.limit(limit).sort('_id')
marker_name = {}
def normalizer(record):
ret = {
'id': str(record['_id']),
'source': record['s'],
'subscriber': record['u'],
'ttl': record['t'],
'options': record['o'],
}
marker_name['next'] = record['_id']
return ret
yield utils.HookedCursor(cursor, normalizer)
yield marker_name and marker_name['next']
@utils.raises_conn_error
def get(self, queue, subscription_id, project=None):
res = self._collection.find_one({'_id': utils.to_oid(subscription_id),
'p': project})
if not res:
raise errors.SubscriptionDoesNotExist(subscription_id)
return _normalize(res)
@utils.raises_conn_error
def create(self, queue, subscriber, ttl, options, project=None):
source = queue
now = timeutils.utcnow_ts()
ttl = int(ttl)
expires = now + ttl
source_query = {'p_q': utils.scope_queue_name(source, project)}
target_source = self._queue_collection.find_one(source_query,
fields={'m': 1,
'_id': 0})
if target_source is None:
raise errors.QueueDoesNotExist(target_source, project)
try:
subscription_id = self._collection.insert({'s': source,
'u': subscriber,
't': ttl,
'e': expires,
'o': options,
'p': project})
return subscription_id
except pymongo.errors.DuplicateKeyError:
return None
@utils.raises_conn_error
def exists(self, queue, subscription_id, project=None):
return self._collection.find_one({'_id': utils.to_oid(subscription_id),
'p': project}) is not None
@utils.raises_conn_error
def update(self, queue, subscription_id, project=None, **kwargs):
names = ('subscriber', 'ttl', 'options')
key_transform = lambda x: 'u' if x == 'subscriber' else x[0]
fields = common_utils.fields(kwargs, names,
pred=lambda x: x is not None,
key_transform=key_transform)
assert fields, ('`subscriber`, `ttl`, '
'or `options` not found in kwargs')
res = self._collection.update({'_id': utils.to_oid(subscription_id),
'p': project},
{'$set': fields},
upsert=False)
if not res['updatedExisting']:
raise errors.SubscriptionDoesNotExist(subscription_id)
@utils.raises_conn_error
def delete(self, queue, subscription_id, project=None):
self._collection.remove({'_id': utils.to_oid(subscription_id),
'p': project}, w=0)
def _normalize(record):
ret = {
'id': str(record['_id']),
'source': record['s'],
'subscriber': record['u'],
'ttl': record['t'],
'options': record['o']
}
return ret

View File

@ -23,7 +23,7 @@ from zaqar.storage import base
LOG = logging.getLogger(__name__)
_PIPELINE_RESOURCES = ('queue', 'message', 'claim')
_PIPELINE_RESOURCES = ('queue', 'message', 'claim', 'subscription')
_PIPELINE_CONFIGS = tuple((
cfg.ListOpt(resource + '_pipeline', default=[],
@ -122,3 +122,9 @@ class DataDriver(base.DataDriverBase):
stages = _get_storage_pipeline('claim', self.conf)
stages.append(self._storage.claim_controller)
return stages
@decorators.lazy_property(write=False)
def subscription_controller(self):
stages = _get_storage_pipeline('subscription', self.conf)
stages.append(self._storage.subscription_controller)
return stages

View File

@ -121,6 +121,10 @@ class DataDriver(storage.DataDriverBase):
def claim_controller(self):
return ClaimController(self._pool_catalog)
@decorators.lazy_property(write=False)
def subscription_controller(self):
return SubscriptionController(self._pool_catalog)
class QueueController(storage.Queue):
"""Routes operations to a queue controller in the appropriate pool.
@ -345,6 +349,54 @@ class ClaimController(storage.Claim):
return None
class SubscriptionController(storage.Subscription):
"""Controller to facilitate processing for subscription operations."""
_resource_name = 'subscription'
def __init__(self, pool_catalog):
super(SubscriptionController, self).__init__(pool_catalog)
self._pool_catalog = pool_catalog
self._get_controller = self._pool_catalog.get_subscription_controller
def list(self, queue, project=None, marker=None,
limit=storage.DEFAULT_SUBSCRIPTIONS_PER_PAGE):
control = self._get_controller(queue, project)
if control:
return control.list(queue, project=project,
marker=marker, limit=limit)
def get(self, queue, subscription_id, project=None):
control = self._get_controller(queue, project)
if control:
return control.get(queue, subscription_id, project=project)
def create(self, queue, subscriber, ttl, options, project=None):
control = self._get_controller(queue, project)
if control:
return control.post(queue, subscriber,
ttl, options,
project=project)
def update(self, queue, subscription_id, project=None, **kwargs):
control = self._get_controller(queue, project)
if control:
return control.update(queue, subscription_id,
project=project, **kwargs)
def delete(self, queue, subscription_id, project=None):
control = self._get_controller(queue, project)
if control:
return control.delete(queue, subscription_id,
project=project)
def exists(self, queue, subscription_id, project=None):
control = self._get_controller(queue, project)
if control:
return control.exists(queue, subscription_id,
project=project)
class Catalog(object):
"""Represents the mapping between queues and pool drivers."""
@ -491,6 +543,21 @@ class Catalog(object):
target = self.lookup(queue, project)
return target and target.claim_controller
def get_subscription_controller(self, queue, project=None):
"""Lookup the subscription controller for the given queue and project.
:param queue: Name of the queue for which to find a pool
:param project: Project to which the queue belongs, or
None to specify the "global" or "generic" project.
:returns: The subscription controller associated with the data driver
for the pool containing (queue, project) or None if this doesn't
exist.
:rtype: Maybe SubscriptionController
"""
target = self.lookup(queue, project)
return target and target.subscription_controller
def lookup(self, queue, project=None):
"""Lookup a pool driver for the given queue and project.

View File

@ -206,6 +206,10 @@ class DataDriver(storage.DataDriverBase):
def claim_controller(self):
return controllers.ClaimController(self)
@decorators.lazy_property(write=False)
def subscription_controller(self):
raise NotImplementedError()
class ControlDriver(storage.ControlDriverBase):
@ -234,6 +238,10 @@ class ControlDriver(storage.ControlDriverBase):
def flavors_controller(self):
raise NotImplementedError()
@property
def subscriptions_controller(self):
raise NotImplementedError()
def _get_redis_client(driver):
conf = driver.redis_conf

View File

@ -128,6 +128,10 @@ class DataDriver(storage.DataDriverBase):
def claim_controller(self):
return controllers.ClaimController(self)
@decorators.lazy_property(write=False)
def subscription_controller(self):
pass
def is_alive(self):
return True
@ -190,3 +194,7 @@ class ControlDriver(storage.ControlDriverBase):
def flavors_controller(self):
# NOTE(flaper87): Needed to avoid `abc` errors.
pass
@property
def subscriptions_controller(self):
pass

View File

@ -46,6 +46,10 @@ class DataDriver(storage.DataDriverBase):
def claim_controller(self):
return None
@property
def subscription_controller(self):
return None
class ControlDriver(storage.ControlDriverBase):

View File

@ -1,4 +1,5 @@
# Copyright (c) 2013 Red Hat, Inc.
# Copyright (c) 2014 Catalyst IT Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -929,6 +930,127 @@ class ClaimControllerTest(ControllerBaseTest):
project=self.project)
class SubscriptionControllerTest(ControllerBaseTest):
"""Subscriptions Controller base tests.
"""
queue_name = 'test_queue'
controller_base_class = storage.Subscription
def setUp(self):
super(SubscriptionControllerTest, self).setUp()
self.subscription_controller = self.driver.subscription_controller
# Lets create a queue as the source of subscription
self.queue_controller = self.driver.queue_controller
self.queue_controller.create(self.queue_name, project=self.project)
self.source = self.queue_name
self.subscriber = 'http://trigger.me'
self.ttl = 600
self.options = {'uri': 'http://fake.com'}
def tearDown(self):
self.queue_controller.delete(self.queue_name, project=self.project)
super(SubscriptionControllerTest, self).tearDown()
def test_list(self):
for s in six.moves.xrange(15):
subscriber = 'http://fake_{0}'.format(s)
self.subscription_controller.create(self.source,
subscriber,
self.ttl,
self.options,
project=self.project)
interaction = self.subscription_controller.list(self.source,
project=self.project)
subscriptions = list(next(interaction))
self.assertEqual(all(map(lambda s: 'source' in s and 'subscriber' in s,
subscriptions)), True)
self.assertEqual(len(subscriptions), 10)
interaction = (self.subscription_controller.list(self.source,
project=self.project,
marker=next(interaction)))
subscriptions = list(next(interaction))
self.assertEqual(all(map(lambda s: 'source' in s and 'subscriber' in s,
subscriptions)), True)
self.assertEqual(len(subscriptions), 5)
def test_get_raises_if_subscription_does_not_exist(self):
self.assertRaises(errors.SubscriptionDoesNotExist,
self.subscription_controller.get,
self.queue_name,
'notexists',
project=self.project)
def test_lifecycle(self):
s_id = self.subscription_controller.create(self.source,
self.subscriber,
self.ttl,
self.options,
project=self.project)
subscription = self.subscription_controller.get(self.queue_name,
s_id,
self.project)
self.assertEqual(self.source,
subscription['source'])
self.assertEqual(self.subscriber,
subscription['subscriber'])
exist = self.subscription_controller.exists(self.queue_name,
s_id,
self.project)
self.assertTrue(exist)
self.subscription_controller.update(self.queue_name,
s_id,
project=self.project,
subscriber='http://a.com'
)
updated = self.subscription_controller.get(self.queue_name,
s_id,
self.project)
self.assertEqual('http://a.com', updated['subscriber'])
self.subscription_controller.delete(self.queue_name,
s_id, project=self.project)
self.assertRaises(errors.SubscriptionDoesNotExist,
self.subscription_controller.get,
self.queue_name, s_id)
def test_create_existed(self):
self.subscription_controller.create(self.source,
self.subscriber,
self.ttl,
self.options,
project=self.project)
s_id = self.subscription_controller.create(self.source,
self.subscriber,
self.ttl,
self.options,
project=self.project)
self.assertIsNone(s_id)
def test_nonexist_source(self):
self.assertRaises(errors.QueueDoesNotExist,
self.subscription_controller.create,
'fake_queue_name',
self.subscriber,
self.ttl,
self.options,
self.project)
class PoolsControllerTest(ControllerBaseTest):
"""Pools Controller base tests.