Implement redis driver for notifications

This patch adds redis driver for notifications service against
the storage layer. And this can be also to make the Jenkins
Redis gate happy.

Partially-Implements blueprint: notifications

Change-Id: I8420c66cf35f88503279bb6d0926040affd12471
This commit is contained in:
Fei Long Wang
2015-03-17 06:44:56 +13:00
parent 7a29e9de10
commit 10664899c7
7 changed files with 334 additions and 5 deletions

View File

@@ -414,3 +414,10 @@ class RedisClaimsTest(base.ClaimControllerTest):
num_removed = self.controller._gc(self.queue_name, None)
self.assertEqual(num_removed, 5)
@testing.requires_redis
class RedisSubscriptionTests(base.SubscriptionControllerTest):
driver_class = driver.DataDriver
config_file = 'wsgi_redis.conf'
controller_class = controllers.SubscriptionController

View File

@@ -15,8 +15,10 @@
from zaqar.storage.redis import claims
from zaqar.storage.redis import messages
from zaqar.storage.redis import queues
from zaqar.storage.redis import subscriptions
QueueController = queues.QueueController
MessageController = messages.MessageController
ClaimController = claims.ClaimController
SubscriptionController = subscriptions.SubscriptionController

View File

@@ -208,7 +208,7 @@ class DataDriver(storage.DataDriverBase):
@decorators.lazy_property(write=False)
def subscription_controller(self):
raise NotImplementedError()
return controllers.SubscriptionController(self)
class ControlDriver(storage.ControlDriverBase):
@@ -238,10 +238,6 @@ class ControlDriver(storage.ControlDriverBase):
def flavors_controller(self):
raise NotImplementedError()
@property
def subscriptions_controller(self):
raise NotImplementedError()
def _get_redis_client(driver):
conf = driver.redis_conf

View File

@@ -1,4 +1,6 @@
# Copyright (c) 2014 Prashanth Raghu.
# Copyright (c) 2015 Catalyst IT Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
@@ -20,6 +22,7 @@ from oslo_utils import encodeutils
from oslo_utils import timeutils
MSGENV_FIELD_KEYS = (b'id', b't', b'cr', b'e', b'u', b'c', b'c.e')
SUBENV_FIELD_KEYS = (b'id', b's', b'u', b't', b'e', b'o', b'p')
# TODO(kgriffs): Make similar classes for claims and queues
@@ -99,6 +102,62 @@ class MessageEnvelope(object):
pipe.expire(self.id, self.ttl)
class SubscriptionEnvelope(object):
"""Encapsulates the subscription envelope."""
__slots__ = [
'id',
'source',
'subscriber',
'ttl',
'expires',
'options',
'project',
]
def __init__(self, **kwargs):
self.id = kwargs.get('id', str(uuid.uuid4()))
self.source = kwargs['source']
self.subscriber = kwargs['subscriber']
self.ttl = kwargs['ttl']
self.expires = kwargs.get('expires', float('inf'))
self.options = kwargs['options']
@staticmethod
def from_hmap(hmap):
kwargs = _hmap_kv_to_subenv(hmap)
return SubscriptionEnvelope(**kwargs)
@staticmethod
def from_redis(sid, client):
values = client.hmget(sid, SUBENV_FIELD_KEYS)
# NOTE(kgriffs): If the key does not exist, redis-py returns
# an array of None values.
if values[0] is None:
return None
return _hmap_kv_to_subenv(SUBENV_FIELD_KEYS, values)
def to_redis(self, pipe):
hmap = _subenv_to_hmap(self)
pipe.hmset(self.id, hmap)
pipe.expire(self.id, self.ttl)
def to_basic(self, now):
basic_msg = {
'id': self.id,
'source': self.source,
'subscriber': self.subscriber,
'ttl': self.ttl,
'expires': self.expires,
'options': self.options,
}
return basic_msg
# NOTE(kgriffs): This could have implemented MessageEnvelope functionality
# by adding an "include_body" param to all the methods, but then you end
# up with tons of if statements that make the code rather ugly.
@@ -220,3 +279,33 @@ def _msgenv_to_hmap(msg):
'c': msg.claim_id or '',
'c.e': msg.claim_expires,
}
def _hmap_kv_to_subenv(keys, values):
hmap = dict(zip(keys, values))
kwargs = _hmap_to_subenv_kwargs(hmap)
return SubscriptionEnvelope(**kwargs)
def _hmap_to_subenv_kwargs(hmap):
# NOTE(kgriffs): Under Py3K, redis-py converts all strings
# into binary. Woohoo!
return {
'id': encodeutils.safe_decode(hmap[b'id']),
'source': hmap[b's'],
'subscriber': hmap[b'u'],
'ttl': int(hmap[b't']),
'expires': int(hmap[b'e']),
'options': hmap[b'o']
}
def _subenv_to_hmap(msg):
return {
'id': msg.id,
's': msg.source,
'u': msg.subscriber,
't': msg.ttl,
'e': msg.expires,
'o': msg.options
}

View File

@@ -78,6 +78,10 @@ class QueueController(storage.Queue):
def _claim_ctrl(self):
return self.driver.claim_controller
@decorators.lazy_property(write=False)
def _subscription_ctrl(self):
return self.driver.subscription_controller
def _get_queue_info(self, queue_key, fields, transform=str):
"""Get one or more fields from Queue Info."""

View File

@@ -0,0 +1,173 @@
# Copyright (c) 2015 Catalyst IT Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
import functools
import uuid
import msgpack
from oslo_utils import timeutils
import redis
from zaqar.common import decorators
from zaqar.common import utils as common_utils
from zaqar.storage import base
from zaqar.storage import errors
from zaqar.storage.redis import models
from zaqar.storage.redis import utils
SubscripitonEnvelope = models.SubscriptionEnvelope
SUBSET_INDEX_KEY = 'subset_index'
SUBSCRIPTION_IDS_SUFFIX = 'subscriptions'
class SubscriptionController(base.Subscription):
"""Implements subscription resource operations using MongoDB.
Subscriptions are unique by project + queue/topic + subscriber.
Schema:
's': source :: six.text_type
'u': subscriber:: six.text_type
't': ttl:: int
'e': expires: int
'o': options :: dict
'p': project :: six.text_type
"""
def __init__(self, *args, **kwargs):
super(SubscriptionController, self).__init__(*args, **kwargs)
self._client = self.driver.connection
self._packer = msgpack.Packer(encoding='utf-8',
use_bin_type=True).pack
self._unpacker = functools.partial(msgpack.unpackb, encoding='utf-8')
@decorators.lazy_property(write=False)
def _queue_ctrl(self):
return self.driver.queue_controller
@utils.raises_conn_error
@utils.retries_on_connection_error
def list(self, queue, project=None, marker=None, limit=10):
client = self._client
subset_key = utils.scope_subscription_ids_set(queue,
project,
SUBSCRIPTION_IDS_SUFFIX)
marker = utils.scope_queue_name(marker, project)
rank = client.zrank(subset_key, marker)
start = rank + 1 if rank else 0
cursor = (q for q in client.zrange(subset_key, start,
start + limit - 1))
marker_next = {}
def denormalizer(record, sid):
ret = {
'id': sid,
'source': record[0],
'subscriber': record[1],
'ttl': record[2],
'options': record[3],
}
marker_next['next'] = sid
return ret
yield utils.SubscriptionListCursor(self._client, cursor, denormalizer)
yield marker_next and marker_next['next']
@utils.raises_conn_error
@utils.retries_on_connection_error
def get(self, queue, subscription_id, project=None):
if not self._queue_ctrl.exists(queue, project):
raise errors.QueueDoesNotExist(queue, project)
subscription = SubscripitonEnvelope.from_redis(subscription_id,
self._client)
now = timeutils.utcnow_ts()
if subscription and not utils.subscription_expired_filter(subscription,
now):
return subscription.to_basic(now)
else:
raise errors.SubscriptionDoesNotExist(subscription_id)
@utils.raises_conn_error
@utils.retries_on_connection_error
def create(self, queue, subscriber, ttl, options, project=None):
subscription_id = str(uuid.uuid4())
subset_key = utils.scope_subscription_ids_set(queue,
project,
SUBSCRIPTION_IDS_SUFFIX)
source = queue
now = timeutils.utcnow_ts()
ttl = int(ttl)
expires = now + ttl
subscription = {'id': subscription_id,
's': source,
'u': subscriber,
't': ttl,
'e': expires,
'o': options,
'p': project}
if not self._queue_ctrl.exists(queue, project):
raise errors.QueueDoesNotExist(queue, project)
try:
# Pipeline ensures atomic inserts.
with self._client.pipeline() as pipe:
pipe.zadd(subset_key, 1,
subscription_id).hmset(subscription_id,
subscription)
pipe.execute()
return subscription_id
except redis.exceptions.ResponseError:
return None
@utils.raises_conn_error
@utils.retries_on_connection_error
def exists(self, queue, subscription_id, project=None):
subset_key = utils.scope_subscription_ids_set(queue, project,
SUBSCRIPTION_IDS_SUFFIX)
return self._client.zrank(subset_key, subscription_id) is not None
@utils.raises_conn_error
@utils.retries_on_connection_error
def update(self, queue, subscription_id, project=None, **kwargs):
names = ('subscriber', 'ttl', 'options')
key_transform = lambda x: 'u' if x == 'subscriber' else x[0]
fields = common_utils.fields(kwargs, names,
pred=lambda x: x is not None,
key_transform=key_transform)
assert fields, ('`subscriber`, `ttl`, '
'or `options` not found in kwargs')
# Pipeline ensures atomic inserts.
with self._client.pipeline() as pipe:
pipe.hmset(subscription_id, fields)
pipe.execute()
@utils.raises_conn_error
@utils.retries_on_connection_error
def delete(self, queue, subscription_id, project=None):
subset_key = utils.scope_subscription_ids_set(queue, project,
SUBSCRIPTION_IDS_SUFFIX)
# NOTE(prashanthr_): Pipelining is used to mitigate race conditions
with self._client.pipeline() as pipe:
pipe.zrem(subset_key, subscription_id)
pipe.delete(subscription_id)
pipe.execute()

View File

@@ -1,4 +1,6 @@
# Copyright (c) 2014 Prashanth Raghu.
# Copyright (c) 2015 Catalyst IT Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
@@ -27,6 +29,7 @@ from zaqar.storage import errors
LOG = logging.getLogger(__name__)
MESSAGE_IDS_SUFFIX = 'messages'
SUBSCRIPTION_IDS_SUFFIX = 'subscriptions'
def descope_queue_name(scoped_name):
@@ -98,6 +101,31 @@ def descope_message_ids_set(msgset_key):
return (tokens[1] or None, tokens[0] or None)
def scope_subscription_ids_set(queue=None, project=None,
subscription_suffix=''):
"""Scope subscriptions set with '.'
Returns a scoped name for the list of subscriptions in the form
project-id_queue-name_suffix
"""
return (normalize_none_str(project) + '.' +
normalize_none_str(queue) + '.' +
subscription_suffix)
def descope_subscription_ids_set(subset_key):
"""Descope subscriptions set with '.'
:returns: (queue, project)
"""
tokens = subset_key.split('.')
return (tokens[1] or None, tokens[0] or None)
# NOTE(prashanthr_): Aliasing the scope_message_ids_set function
# to be used in the pools and claims controller as similar
# functionality is required to scope redis id's.
@@ -109,6 +137,10 @@ def msgset_key(queue, project=None):
return scope_message_ids_set(queue, project, MESSAGE_IDS_SUFFIX)
def subset_key(queue, project=None):
return scope_subscription_ids_set(queue, project, SUBSCRIPTION_IDS_SUFFIX)
def raises_conn_error(func):
"""Handles the Redis ConnectionFailure error.
@@ -205,6 +237,12 @@ def msg_expired_filter(message, now):
return message.expires <= now
def subscription_expired_filter(subscription, now):
"""Return True if the subscription has expired."""
return subscription.expires <= now
class QueueListCursor(object):
def __init__(self, client, queues, denormalizer):
@@ -223,3 +261,23 @@ class QueueListCursor(object):
def __next__(self):
return self.next()
class SubscriptionListCursor(object):
def __init__(self, client, subscriptions, denormalizer):
self.subscription_iter = subscriptions
self.denormalizer = denormalizer
self.client = client
def __iter__(self):
return self
@raises_conn_error
def next(self):
curr = next(self.subscription_iter)
subscription = self.client.hmget(curr, ['s', 'u', 't', 'o'])
return self.denormalizer(subscription, encodeutils.safe_decode(curr))
def __next__(self):
return self.next()