Merge "Implement mongodb driver for notifications"

This commit is contained in:
Jenkins 2015-01-19 10:33:38 +00:00 committed by Gerrit Code Review
commit 898ced1ec1
13 changed files with 511 additions and 2 deletions

View File

@ -41,7 +41,8 @@ from zaqar.tests.unit.storage import base
class MongodbSetupMixin(object):
def _purge_databases(self):
databases = (self.driver.message_databases +
[self.driver.queues_database])
[self.driver.queues_database,
self.driver.subscriptions_database])
for db in databases:
self.driver.connection.drop_database(db)
@ -444,6 +445,14 @@ class MongodbClaimTests(MongodbSetupMixin, base.ClaimControllerTest):
project=self.project)
@testing.requires_mongodb
class MongodbSubscriptionTests(MongodbSetupMixin,
base.SubscriptionControllerTest):
driver_class = mongodb.DataDriver
config_file = 'wsgi_mongodb.conf'
controller_class = controllers.SubscriptionController
#
# TODO(kgriffs): Do these need database purges as well as those above?
#

View File

@ -24,11 +24,13 @@ CatalogueBase = base.CatalogueBase
Claim = base.Claim
Message = base.Message
Queue = base.Queue
Subscription = base.Subscription
PoolsBase = base.PoolsBase
FlavorsBase = base.FlavorsBase
DEFAULT_QUEUES_PER_PAGE = base.DEFAULT_QUEUES_PER_PAGE
DEFAULT_MESSAGES_PER_PAGE = base.DEFAULT_MESSAGES_PER_PAGE
DEFAULT_POOLS_PER_PAGE = base.DEFAULT_POOLS_PER_PAGE
DEFAULT_SUBSCRIPTIONS_PER_PAGE = base.DEFAULT_SUBSCRIPTIONS_PER_PAGE
DEFAULT_MESSAGES_PER_CLAIM = base.DEFAULT_MESSAGES_PER_CLAIM

View File

@ -31,6 +31,7 @@ import zaqar.openstack.common.log as logging
DEFAULT_QUEUES_PER_PAGE = 10
DEFAULT_MESSAGES_PER_PAGE = 10
DEFAULT_POOLS_PER_PAGE = 10
DEFAULT_SUBSCRIPTIONS_PER_PAGE = 10
DEFAULT_MESSAGES_PER_CLAIM = 10
@ -223,6 +224,11 @@ class DataDriverBase(DriverBase):
"""Returns the driver's claim controller."""
raise NotImplementedError
@abc.abstractproperty
def subscription_controller(self):
"""Returns the driver's subscription controller."""
raise NotImplementedError
@six.add_metaclass(abc.ABCMeta)
class ControlDriverBase(DriverBase):
@ -564,6 +570,111 @@ class Claim(ControllerBase):
raise NotImplementedError
@six.add_metaclass(abc.ABCMeta)
class Subscription(ControllerBase):
"""This class is responsible for managing subscriptions of notification.
"""
@abc.abstractmethod
def list(self, queue, project=None, marker=None,
limit=DEFAULT_SUBSCRIPTIONS_PER_PAGE):
"""Base method for listing subscriptions.
:param queue: Name of the queue to get the subscriptions from.
:type queue: six.text_type
:param project: Project this subscription belongs to.
:type project: six.text_type
:param marker: used to determine which subscription to start with
:type marker: six.text_type
:param limit: (Default 10) Max number of results to return
:type limit: int
:returns: An iterator giving a sequence of subscriptions
and the marker of the next page.
:rtype: [{}]
"""
raise NotImplementedError
@abc.abstractmethod
def get(self, queue, subscription_id, project=None):
"""Returns a single subscription entry.
:param queue: Name of the queue subscription belongs to.
:type queue: six.text_type
:param subscription_id: ID of this subscription
:type subscription_id: six.text_type
:param project: Project this subscription belongs to.
:type project: six.text_type
:returns: Dictionary containing subscription data
:rtype: {}
:raises: SubscriptionDoesNotExist if not found
"""
raise NotImplementedError
@abc.abstractmethod
def create(self, queue, subscriber, ttl, options, project=None):
"""Create a new subscription.
:param queue:The source queue for notifications
:type queue: six.text_type
:param subscriber: The subscriber URI
:type subscriber: six.text_type
:param ttl: time to live for this subscription
:type ttl: int
:param options: Options used to configure this subscription
:type options: dict
:param project: Project id
:type project: six.text_type
:returns: True if a subscription was created and False
if it is failed.
:rtype: boolean
"""
raise NotImplementedError
@abc.abstractmethod
def update(self, queue, subscription_id, project=None, **kwargs):
"""Updates the weight, uris, and/or options of this subscription
:param queue: Name of the queue subscription belongs to.
:type queue: six.text_type
:param name: ID of the subscription
:type name: text
:param kwargs: one of: `source`, `subscriber`, `ttl`, `options`
:type kwargs: dict
:raises: SubscriptionDoesNotExist if not found
"""
raise NotImplementedError
@abc.abstractmethod
def exists(self, queue, subscription_id, project=None):
"""Base method for testing subscription existence.
:param queue: Name of the queue subscription belongs to.
:type queue: six.text_type
:param subscription_id: ID of subscription
:type subscription_id: six.text_type
:param project: Project id
:type project: six.text_type
:returns: True if a subscription exists and False
if it does not.
"""
raise NotImplementedError
@abc.abstractmethod
def delete(self, queue, subscription_id, project=None):
"""Base method for deleting a subscription.
:param queue: Name of the queue subscription belongs to.
:type queue: six.text_type
:param subscription_id: ID of the subscription to be deleted.
:type subscription_id: six.text_type
:param project: Project id
:type project: six.text_type
"""
raise NotImplementedError
@six.add_metaclass(abc.ABCMeta)
class PoolsBase(ControllerBase):
"""A controller for managing pools."""

View File

@ -176,3 +176,12 @@ class PoolInUseByFlavor(NotPermitted):
@property
def flavor(self):
return self._flavor
class SubscriptionDoesNotExist(DoesNotExist):
msg_format = u'Subscription {subscription_id} does not exist'
def __init__(self, subscription_id):
super(SubscriptionDoesNotExist,
self).__init__(subscription_id=subscription_id)

View File

@ -28,6 +28,7 @@ from zaqar.storage.mongodb import flavors
from zaqar.storage.mongodb import messages
from zaqar.storage.mongodb import pools
from zaqar.storage.mongodb import queues
from zaqar.storage.mongodb import subscriptions
CatalogueController = catalogue.CatalogueController
@ -36,3 +37,4 @@ FlavorsController = flavors.FlavorsController
MessageController = messages.MessageController
QueueController = queues.QueueController
PoolsController = pools.PoolsController
SubscriptionController = subscriptions.SubscriptionController

View File

@ -165,6 +165,12 @@ class DataDriver(storage.DataDriverBase):
return [self.connection[name + '_messages_p' + str(p)]
for p in range(partitions)]
@decorators.lazy_property(write=False)
def subscriptions_database(self):
"""Database dedicated to the "subscription" collection."""
name = self.mongodb_conf.database + '_subscriptions'
return self.connection[name]
@decorators.lazy_property(write=False)
def connection(self):
"""MongoDB client connection instance."""
@ -182,6 +188,10 @@ class DataDriver(storage.DataDriverBase):
def claim_controller(self):
return controllers.ClaimController(self)
@decorators.lazy_property(write=False)
def subscription_controller(self):
return controllers.SubscriptionController(self)
class ControlDriver(storage.ControlDriverBase):

View File

@ -0,0 +1,151 @@
# Copyright (c) 2014 Catalyst IT Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
from oslo.utils import timeutils
import pymongo.errors
from zaqar.common import utils as common_utils
from zaqar.storage import base
from zaqar.storage import errors
from zaqar.storage.mongodb import utils
ID_INDEX_FIELDS = [('_id', 1)]
SUBSCRIPTIONS_INDEX = [
('s', 1),
('u', 1),
('p', 1),
]
class SubscriptionController(base.Subscription):
"""Implements subscription resource operations using MongoDB.
Subscriptions are unique by project + queue/topic + subscriber.
Schema:
's': source :: six.text_type
'u': subscriber:: six.text_type
't': ttl:: int
'e': expires: int
'o': options :: dict
'p': project :: six.text_type
"""
def __init__(self, *args, **kwargs):
super(SubscriptionController, self).__init__(*args, **kwargs)
self._collection = self.driver.subscriptions_database.subscriptions
self._queue_collection = self.driver.queues_database.queues
self._collection.ensure_index(SUBSCRIPTIONS_INDEX, unique=True)
@utils.raises_conn_error
def list(self, queue, project=None, marker=None, limit=10):
query = {'s': queue, 'p': project}
if marker is not None:
query['_id'] = {'$gt': marker}
fields = {'s': 1, 'u': 1, 't': 1, 'p': 1, 'o': 1, '_id': 1}
cursor = self._collection.find(query, fields=fields)
cursor = cursor.limit(limit).sort('_id')
marker_name = {}
def normalizer(record):
ret = {
'id': str(record['_id']),
'source': record['s'],
'subscriber': record['u'],
'ttl': record['t'],
'options': record['o'],
}
marker_name['next'] = record['_id']
return ret
yield utils.HookedCursor(cursor, normalizer)
yield marker_name and marker_name['next']
@utils.raises_conn_error
def get(self, queue, subscription_id, project=None):
res = self._collection.find_one({'_id': utils.to_oid(subscription_id),
'p': project})
if not res:
raise errors.SubscriptionDoesNotExist(subscription_id)
return _normalize(res)
@utils.raises_conn_error
def create(self, queue, subscriber, ttl, options, project=None):
source = queue
now = timeutils.utcnow_ts()
ttl = int(ttl)
expires = now + ttl
source_query = {'p_q': utils.scope_queue_name(source, project)}
target_source = self._queue_collection.find_one(source_query,
fields={'m': 1,
'_id': 0})
if target_source is None:
raise errors.QueueDoesNotExist(target_source, project)
try:
subscription_id = self._collection.insert({'s': source,
'u': subscriber,
't': ttl,
'e': expires,
'o': options,
'p': project})
return subscription_id
except pymongo.errors.DuplicateKeyError:
return None
@utils.raises_conn_error
def exists(self, queue, subscription_id, project=None):
return self._collection.find_one({'_id': utils.to_oid(subscription_id),
'p': project}) is not None
@utils.raises_conn_error
def update(self, queue, subscription_id, project=None, **kwargs):
names = ('subscriber', 'ttl', 'options')
key_transform = lambda x: 'u' if x == 'subscriber' else x[0]
fields = common_utils.fields(kwargs, names,
pred=lambda x: x is not None,
key_transform=key_transform)
assert fields, ('`subscriber`, `ttl`, '
'or `options` not found in kwargs')
res = self._collection.update({'_id': utils.to_oid(subscription_id),
'p': project},
{'$set': fields},
upsert=False)
if not res['updatedExisting']:
raise errors.SubscriptionDoesNotExist(subscription_id)
@utils.raises_conn_error
def delete(self, queue, subscription_id, project=None):
self._collection.remove({'_id': utils.to_oid(subscription_id),
'p': project}, w=0)
def _normalize(record):
ret = {
'id': str(record['_id']),
'source': record['s'],
'subscriber': record['u'],
'ttl': record['t'],
'options': record['o']
}
return ret

View File

@ -23,7 +23,7 @@ from zaqar.storage import base
LOG = logging.getLogger(__name__)
_PIPELINE_RESOURCES = ('queue', 'message', 'claim')
_PIPELINE_RESOURCES = ('queue', 'message', 'claim', 'subscription')
_PIPELINE_CONFIGS = tuple((
cfg.ListOpt(resource + '_pipeline', default=[],
@ -122,3 +122,9 @@ class DataDriver(base.DataDriverBase):
stages = _get_storage_pipeline('claim', self.conf)
stages.append(self._storage.claim_controller)
return stages
@decorators.lazy_property(write=False)
def subscription_controller(self):
stages = _get_storage_pipeline('subscription', self.conf)
stages.append(self._storage.subscription_controller)
return stages

View File

@ -121,6 +121,10 @@ class DataDriver(storage.DataDriverBase):
def claim_controller(self):
return ClaimController(self._pool_catalog)
@decorators.lazy_property(write=False)
def subscription_controller(self):
return SubscriptionController(self._pool_catalog)
class QueueController(storage.Queue):
"""Routes operations to a queue controller in the appropriate pool.
@ -345,6 +349,54 @@ class ClaimController(storage.Claim):
return None
class SubscriptionController(storage.Subscription):
"""Controller to facilitate processing for subscription operations."""
_resource_name = 'subscription'
def __init__(self, pool_catalog):
super(SubscriptionController, self).__init__(pool_catalog)
self._pool_catalog = pool_catalog
self._get_controller = self._pool_catalog.get_subscription_controller
def list(self, queue, project=None, marker=None,
limit=storage.DEFAULT_SUBSCRIPTIONS_PER_PAGE):
control = self._get_controller(queue, project)
if control:
return control.list(queue, project=project,
marker=marker, limit=limit)
def get(self, queue, subscription_id, project=None):
control = self._get_controller(queue, project)
if control:
return control.get(queue, subscription_id, project=project)
def create(self, queue, subscriber, ttl, options, project=None):
control = self._get_controller(queue, project)
if control:
return control.post(queue, subscriber,
ttl, options,
project=project)
def update(self, queue, subscription_id, project=None, **kwargs):
control = self._get_controller(queue, project)
if control:
return control.update(queue, subscription_id,
project=project, **kwargs)
def delete(self, queue, subscription_id, project=None):
control = self._get_controller(queue, project)
if control:
return control.delete(queue, subscription_id,
project=project)
def exists(self, queue, subscription_id, project=None):
control = self._get_controller(queue, project)
if control:
return control.exists(queue, subscription_id,
project=project)
class Catalog(object):
"""Represents the mapping between queues and pool drivers."""
@ -491,6 +543,21 @@ class Catalog(object):
target = self.lookup(queue, project)
return target and target.claim_controller
def get_subscription_controller(self, queue, project=None):
"""Lookup the subscription controller for the given queue and project.
:param queue: Name of the queue for which to find a pool
:param project: Project to which the queue belongs, or
None to specify the "global" or "generic" project.
:returns: The subscription controller associated with the data driver
for the pool containing (queue, project) or None if this doesn't
exist.
:rtype: Maybe SubscriptionController
"""
target = self.lookup(queue, project)
return target and target.subscription_controller
def lookup(self, queue, project=None):
"""Lookup a pool driver for the given queue and project.

View File

@ -206,6 +206,10 @@ class DataDriver(storage.DataDriverBase):
def claim_controller(self):
return controllers.ClaimController(self)
@decorators.lazy_property(write=False)
def subscription_controller(self):
raise NotImplementedError()
class ControlDriver(storage.ControlDriverBase):
@ -234,6 +238,10 @@ class ControlDriver(storage.ControlDriverBase):
def flavors_controller(self):
raise NotImplementedError()
@property
def subscriptions_controller(self):
raise NotImplementedError()
def _get_redis_client(driver):
conf = driver.redis_conf

View File

@ -128,6 +128,10 @@ class DataDriver(storage.DataDriverBase):
def claim_controller(self):
return controllers.ClaimController(self)
@decorators.lazy_property(write=False)
def subscription_controller(self):
pass
def is_alive(self):
return True
@ -190,3 +194,7 @@ class ControlDriver(storage.ControlDriverBase):
def flavors_controller(self):
# NOTE(flaper87): Needed to avoid `abc` errors.
pass
@property
def subscriptions_controller(self):
pass

View File

@ -46,6 +46,10 @@ class DataDriver(storage.DataDriverBase):
def claim_controller(self):
return None
@property
def subscription_controller(self):
return None
class ControlDriver(storage.ControlDriverBase):

View File

@ -1,4 +1,5 @@
# Copyright (c) 2013 Red Hat, Inc.
# Copyright (c) 2014 Catalyst IT Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -929,6 +930,127 @@ class ClaimControllerTest(ControllerBaseTest):
project=self.project)
class SubscriptionControllerTest(ControllerBaseTest):
"""Subscriptions Controller base tests.
"""
queue_name = 'test_queue'
controller_base_class = storage.Subscription
def setUp(self):
super(SubscriptionControllerTest, self).setUp()
self.subscription_controller = self.driver.subscription_controller
# Lets create a queue as the source of subscription
self.queue_controller = self.driver.queue_controller
self.queue_controller.create(self.queue_name, project=self.project)
self.source = self.queue_name
self.subscriber = 'http://trigger.me'
self.ttl = 600
self.options = {'uri': 'http://fake.com'}
def tearDown(self):
self.queue_controller.delete(self.queue_name, project=self.project)
super(SubscriptionControllerTest, self).tearDown()
def test_list(self):
for s in six.moves.xrange(15):
subscriber = 'http://fake_{0}'.format(s)
self.subscription_controller.create(self.source,
subscriber,
self.ttl,
self.options,
project=self.project)
interaction = self.subscription_controller.list(self.source,
project=self.project)
subscriptions = list(next(interaction))
self.assertEqual(all(map(lambda s: 'source' in s and 'subscriber' in s,
subscriptions)), True)
self.assertEqual(len(subscriptions), 10)
interaction = (self.subscription_controller.list(self.source,
project=self.project,
marker=next(interaction)))
subscriptions = list(next(interaction))
self.assertEqual(all(map(lambda s: 'source' in s and 'subscriber' in s,
subscriptions)), True)
self.assertEqual(len(subscriptions), 5)
def test_get_raises_if_subscription_does_not_exist(self):
self.assertRaises(errors.SubscriptionDoesNotExist,
self.subscription_controller.get,
self.queue_name,
'notexists',
project=self.project)
def test_lifecycle(self):
s_id = self.subscription_controller.create(self.source,
self.subscriber,
self.ttl,
self.options,
project=self.project)
subscription = self.subscription_controller.get(self.queue_name,
s_id,
self.project)
self.assertEqual(self.source,
subscription['source'])
self.assertEqual(self.subscriber,
subscription['subscriber'])
exist = self.subscription_controller.exists(self.queue_name,
s_id,
self.project)
self.assertTrue(exist)
self.subscription_controller.update(self.queue_name,
s_id,
project=self.project,
subscriber='http://a.com'
)
updated = self.subscription_controller.get(self.queue_name,
s_id,
self.project)
self.assertEqual('http://a.com', updated['subscriber'])
self.subscription_controller.delete(self.queue_name,
s_id, project=self.project)
self.assertRaises(errors.SubscriptionDoesNotExist,
self.subscription_controller.get,
self.queue_name, s_id)
def test_create_existed(self):
self.subscription_controller.create(self.source,
self.subscriber,
self.ttl,
self.options,
project=self.project)
s_id = self.subscription_controller.create(self.source,
self.subscriber,
self.ttl,
self.options,
project=self.project)
self.assertIsNone(s_id)
def test_nonexist_source(self):
self.assertRaises(errors.QueueDoesNotExist,
self.subscription_controller.create,
'fake_queue_name',
self.subscriber,
self.ttl,
self.options,
self.project)
class PoolsControllerTest(ControllerBaseTest):
"""Pools Controller base tests.