Implements queue, message and claim controllers for Redis

This patch implements the standard controllers for the redis
storage driver. It has been tested against a localhost Redis
server with ZAQAR_TEST_REDIS=1.

Change-Id: Ib7c100afd11a0410c3f241c1925d5aaf172ce6a8
Partially-Implements: blueprint redis-storage-driver
This commit is contained in:
Prashanth Raghu 2014-08-20 22:20:29 +08:00 committed by kgriffs
parent 6472ed4ba4
commit 76f6e063db
23 changed files with 2083 additions and 36 deletions

View File

@ -142,6 +142,26 @@
# Storage driver to use. (string value)
#storage=sqlite
[drivers:storage:redis]
#
# Options defined in zaqar.queues.storage.redis
#
# Redis Server URI. Socket file based connectors are supported.
# Example for socket file connector: redis:/tmp/redis.sock'
#uri=<None>
# Maximum number of times to retry an operation that failed
# due to a primary node failover. (integer value)
#max_reconnect_attempts=10
# Base sleep interval between attempts to reconnect after a
# primary node failover. The actual sleep time increases
# exponentially (power of 2) each time the operation is
# retried. (floating point value)
#reconnect_sleep=0.02
[drivers:storage:mongodb]

View File

@ -41,12 +41,14 @@ zaqar.queues.data.storage =
sqlite = zaqar.queues.storage.sqlalchemy.driver:DataDriver
sqlalchemy = zaqar.queues.storage.sqlalchemy.driver:DataDriver
mongodb = zaqar.queues.storage.mongodb.driver:DataDriver
redis = zaqar.queues.storage.redis.driver:DataDriver
faulty = zaqar.tests.faulty_storage:DataDriver
zaqar.queues.control.storage =
sqlite = zaqar.queues.storage.sqlalchemy.driver:ControlDriver
sqlalchemy = zaqar.queues.storage.sqlalchemy.driver:ControlDriver
mongodb = zaqar.queues.storage.mongodb.driver:ControlDriver
redis = zaqar.queues.storage.redis.driver:ControlDriver
faulty = zaqar.tests.faulty_storage:ControlDriver
zaqar.queues.transport =
@ -60,6 +62,7 @@ oslo.config.opts =
zaqar.queues.storage.pipeline = zaqar.queues.storage.pipeline:_config_options
zaqar.queues.storage.pooling = zaqar.queues.storage.pooling:_config_options
zaqar.queues.storage.mongodb = zaqar.queues.storage.mongodb.options:_config_options
zaqar.queues.storage.redis = zaqar.queues.storage.redis.options:_config_option
zaqar.queues.storage.sqlalchemy = zaqar.queues.storage.sqlalchemy.options:_config_options
zaqar.queues.transport.wsgi = zaqar.queues.transport.wsgi.driver:_config_options
zaqar.queues.transport.base = zaqar.queues.transport.base:_config_options

15
tests/etc/wsgi_redis.conf Normal file
View File

@ -0,0 +1,15 @@
[DEFAULT]
debug = False
verbose = False
[drivers]
transport = wsgi
storage = redis
[drivers:transport:wsgi]
port = 8888
[drivers:storage:redis]
uri = redis://127.0.0.1:6379
max_reconnect_attempts = 3
reconnect_sleep = 1

View File

@ -0,0 +1,11 @@
[DEFAULT]
pooling = True
[drivers]
transport = wsgi
storage = redis
[drivers:storage:redis]
uri = redis://127.0.0.1:6379
max_reconnect_attempts = 3
reconnect_sleep = 1

View File

@ -29,11 +29,15 @@ class TestUtils(testing.TestBase):
def test_can_connect_suceeds_if_good_uri_sqlite(self):
self.assertTrue(utils.can_connect('sqlite://:memory:'))
@ddt.data(
'mongodb://localhost:27018', # wrong port
'localhost:27017', # missing scheme
'redis://localhost:6379' # not supported with default install
)
def test_can_connect_fails_if_bad_uri_missing_schema(self):
self.assertFalse(utils.can_connect('localhost:27017'))
@testing.requires_mongodb
def test_can_connect_fails_if_bad_uri(self, uri):
self.assertFalse(utils.can_connect(uri))
def test_can_connect_fails_if_bad_uri_mongodb(self):
self.assertFalse(utils.can_connect('mongodb://localhost:8080'))
self.assertFalse(utils.can_connect('mongodb://example.com:27017'))
@testing.requires_redis
def test_can_connect_fails_if_bad_uri_redis(self):
self.assertFalse(utils.can_connect('redis://localhost:8080'))
self.assertFalse(utils.can_connect('redis://example.com:6379'))

View File

@ -0,0 +1,277 @@
# Copyright (c) 2014 Prashanth Raghu.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import time
import uuid
import redis
from zaqar.openstack.common.cache import cache as oslo_cache
from zaqar.openstack.common import timeutils
from zaqar.queues import storage
from zaqar.queues.storage.redis import controllers
from zaqar.queues.storage.redis import driver
from zaqar.queues.storage.redis import messages
from zaqar.queues.storage.redis import options
from zaqar.queues.storage.redis import utils
from zaqar import tests as testing
from zaqar.tests.queues.storage import base
def _create_sample_message(now=None, claimed=False, body=None):
if now is None:
now = timeutils.utcnow_ts()
if claimed:
claim_id = uuid.uuid4()
claim_expires = now + 300
else:
claim_id = None
claim_expires = now
if body is None:
body = {}
return messages.Message(
ttl=60,
created=now,
client_uuid=uuid.uuid4(),
claim_id=claim_id,
claim_expires=claim_expires,
body=body
)
class RedisUtilsTest(testing.TestBase):
config_file = 'wsgi_redis.conf'
def setUp(self):
super(RedisUtilsTest, self).setUp()
self.conf.register_opts(options.REDIS_OPTIONS,
group=options.REDIS_GROUP)
self.redis_conf = self.conf[options.REDIS_GROUP]
MockDriver = collections.namedtuple('MockDriver', 'redis_conf')
self.driver = MockDriver(self.redis_conf)
def test_scope_queue_name(self):
self.assertEqual(utils.scope_queue_name('my-q'), '.my-q')
self.assertEqual(utils.scope_queue_name('my-q', None), '.my-q')
self.assertEqual(utils.scope_queue_name('my-q', '123'), '123.my-q')
self.assertEqual(utils.scope_queue_name('my-q_1', '123'), '123.my-q_1')
self.assertEqual(utils.scope_queue_name(), '.')
self.assertEqual(utils.scope_queue_name(None, '123'), '123.')
def test_scope_messages_set(self):
self.assertEqual(utils.scope_message_ids_set('my-q'), '.my-q.')
self.assertEqual(utils.scope_message_ids_set('my-q', 'p'), 'p.my-q.')
self.assertEqual(utils.scope_message_ids_set('my-q', 'p', 's'),
'p.my-q.s')
self.assertEqual(utils.scope_message_ids_set(None), '..')
self.assertEqual(utils.scope_message_ids_set(None, '123'), '123..')
self.assertEqual(utils.scope_message_ids_set(None, None, 's'), '..s')
def test_normalize_none_str(self):
self.assertEqual(utils.normalize_none_str('my-q'), 'my-q')
self.assertEqual(utils.normalize_none_str(None), '')
def test_msg_claimed_filter(self):
now = timeutils.utcnow_ts()
unclaimed_msg = _create_sample_message()
self.assertFalse(utils.msg_claimed_filter(unclaimed_msg, now))
claimed_msg = _create_sample_message(claimed=True)
self.assertTrue(utils.msg_claimed_filter(claimed_msg, now))
# NOTE(kgriffs): Has a claim ID, but the claim is expired
claimed_msg.claim_expires = now - 60
self.assertFalse(utils.msg_claimed_filter(claimed_msg, now))
def test_descope_queue_name(self):
self.assertEqual(utils.descope_queue_name('p.q'), 'q')
self.assertEqual(utils.descope_queue_name('.q'), 'q')
self.assertEqual(utils.descope_queue_name('.'), '')
def test_msg_echo_filter(self):
msg = _create_sample_message()
self.assertTrue(utils.msg_echo_filter(msg, msg.client_uuid))
alt_uuid = utils.generate_uuid()
self.assertFalse(utils.msg_echo_filter(msg, alt_uuid))
def test_basic_message(self):
now = timeutils.utcnow_ts()
body = {
'msg': 'Hello Earthlings!',
'unicode': u'ab\u00e7',
'bytes': b'ab\xc3\xa7',
b'ab\xc3\xa7': 'one, two, three',
u'ab\u00e7': 'one, two, three',
}
msg = _create_sample_message(now=now, body=body)
basic_msg = msg.to_basic(now + 5)
self.assertEqual(basic_msg['id'], msg.id)
self.assertEqual(basic_msg['age'], 5)
self.assertEqual(basic_msg['body'], body)
self.assertEqual(basic_msg['ttl'], msg.ttl)
def test_retries_on_connection_error(self):
num_calls = [0]
@utils.retries_on_connection_error
def _raises_connection_error(self):
num_calls[0] += 1
raise redis.exceptions.ConnectionError
self.assertRaises(redis.exceptions.ConnectionError,
_raises_connection_error, self)
self.assertEqual(num_calls, [self.redis_conf.max_reconnect_attempts])
@testing.requires_redis
class RedisDriverTest(testing.TestBase):
config_file = 'wsgi_redis.conf'
def test_db_instance(self):
cache = oslo_cache.get_cache()
redis_driver = driver.DataDriver(self.conf, cache)
self.assertTrue(isinstance(redis_driver.connection, redis.StrictRedis))
@testing.requires_redis
class RedisQueuesTest(base.QueueControllerTest):
driver_class = driver.DataDriver
config_file = 'wsgi_redis.conf'
controller_class = controllers.QueueController
def setUp(self):
super(RedisQueuesTest, self).setUp()
self.connection = self.driver.connection
self.msg_controller = self.driver.message_controller
def tearDown(self):
super(RedisQueuesTest, self).tearDown()
self.connection.flushdb()
def test_inc_counter(self):
queue_name = 'inc-counter'
self.controller.create(queue_name)
self.controller._inc_counter(queue_name, None, 10)
scoped_q_name = utils.scope_queue_name(queue_name)
count = self.controller._get_queue_info(scoped_q_name, b'c', int)[0]
self.assertEqual(count, 10)
def test_inc_claimed(self):
self.addCleanup(self.controller.delete, 'test-queue',
project=self.project)
queue_name = 'inc-claimed'
self.controller.create(queue_name)
self.controller._inc_claimed(queue_name, None, 10)
scoped_q_name = utils.scope_queue_name(queue_name)
claimed = self.controller._get_queue_info(scoped_q_name,
b'cl', int)[0]
self.assertEqual(claimed, 10)
@testing.requires_redis
class RedisMessagesTest(base.MessageControllerTest):
driver_class = driver.DataDriver
config_file = 'wsgi_redis.conf'
controller_class = controllers.MessageController
def setUp(self):
super(RedisMessagesTest, self).setUp()
self.connection = self.driver.connection
self.q_controller = self.driver.queue_controller
def tearDown(self):
super(RedisMessagesTest, self).tearDown()
self.connection.flushdb()
def test_get_count(self):
queue_name = 'get-count'
self.q_controller.create(queue_name)
msgs = [{
'ttl': 300,
'body': 'di mo fy'
} for i in range(0, 10)]
client_id = uuid.uuid4()
# Creating 10 messages
self.controller.post(queue_name, msgs, client_id)
messages_set_id = utils.scope_message_ids_set(queue_name, None,
'messages')
num_msg = self.controller._get_count(messages_set_id)
self.assertEqual(num_msg, 10)
def test_empty_queue_exception(self):
queue_name = 'empty-queue-test'
self.q_controller.create(queue_name)
self.assertRaises(storage.errors.QueueIsEmpty,
self.controller.first, queue_name)
@testing.requires_redis
class RedisClaimsTest(base.ClaimControllerTest):
driver_class = driver.DataDriver
config_file = 'wsgi_redis.conf'
controller_class = controllers.ClaimController
def setUp(self):
super(RedisClaimsTest, self).setUp()
self.connection = self.driver.connection
self.q_controller = self.driver.queue_controller
def tearDown(self):
super(RedisClaimsTest, self).tearDown()
self.connection.flushdb()
def test_claim_doesnt_exist(self):
queue_name = 'no-such-claim'
epoch = '000000000000000000000000'
self.q_controller.create(queue_name)
self.assertRaises(storage.errors.ClaimDoesNotExist,
self.controller.get, queue_name,
epoch, project=None)
claim_id, messages = self.controller.create(queue_name, {'ttl': 2,
'grace': 0},
project=None)
# Lets let it expire
time.sleep(2)
self.assertRaises(storage.errors.ClaimDoesNotExist,
self.controller.update, queue_name,
claim_id, {}, project=None)

View File

@ -42,26 +42,34 @@ class Conflict(ExceptionBase):
class MessageConflict(Conflict):
msg_format = (u'Message could not be enqueued due to a conflict '
u'with another message that is already in '
u'with one or more other messages that are already in '
u'queue {queue} for project {project}')
def __init__(self, queue, project, message_ids):
def __init__(self, queue, project):
"""Initializes the error with contextual information.
:param queue: name of the queue to which the message was posted
:param project: name of the project to which the queue belongs
:param message_ids: list of IDs for messages successfully
posted. Note that these must be in the same order as the
list of messages originally submitted to be enqueued.
"""
super(MessageConflict, self).__init__(queue=queue, project=project)
self._succeeded_ids = message_ids
@property
def succeeded_ids(self):
return self._succeeded_ids
class ClaimConflict(Conflict):
msg_format = (u'Messages could not be claimed due to a conflict '
u'with another parallel claim that is already in '
u'queue {queue} for project {project}')
def __init__(self, queue, project):
"""Initializes the error with contextual information.
:param queue: name of the queue to which the message was posted
:param project: name of the project to which the queue belongs
"""
super(ClaimConflict, self).__init__(queue=queue, project=project)
class QueueDoesNotExist(DoesNotExist):

View File

@ -668,9 +668,7 @@ class MessageController(storage.Message):
queue=queue_name,
project=project))
succeeded_ids = []
raise errors.MessageConflict(queue_name, project,
succeeded_ids)
raise errors.MessageConflict(queue_name, project)
@utils.raises_conn_error
@utils.retries_on_autoreconnect

View File

View File

@ -0,0 +1,368 @@
# Copyright (c) 2014 Prashanth Raghu.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
import msgpack
import redis
from zaqar.common import decorators
from zaqar.openstack.common import log as logging
from zaqar.openstack.common import timeutils
from zaqar.queues import storage
from zaqar.queues.storage import errors
from zaqar.queues.storage.redis import messages
from zaqar.queues.storage.redis import utils
LOG = logging.getLogger(__name__)
QUEUE_CLAIMS_SUFFIX = 'claims'
CLAIM_MESSAGES_SUFFIX = 'messages'
RETRY_CLAIM_TIMEOUT = 10
class ClaimController(storage.Claim):
"""Implements claim resource operations using Redis.
Redis Data Structures:
----------------------
Claims list (Redis set) contains claim ids
Key: <project-id_q-name>
Name Field
-------------------------
claim_ids m
Claimed Messages (Redis set) contains the list of
message ids stored per claim
Key: <claim_id>_messages
Claim info(Redis Hash):
Key: <claim_id>
Name Field
-------------------------
ttl -> t
id -> id
expires -> e
"""
def __init__(self, *args, **kwargs):
super(ClaimController, self).__init__(*args, **kwargs)
self._client = self.driver.connection
self._packer = msgpack.Packer(encoding='utf-8',
use_bin_type=True).pack
self._unpacker = functools.partial(msgpack.unpackb, encoding='utf-8')
def _get_claim_info(self, claim_id, fields, transform=int):
"""Get one or more fields from the claim Info."""
values = self._client.hmget(claim_id, fields)
return [transform(v) for v in values] if transform else values
def _exists(self, queue, claim_id, project):
client = self._client
claims_set_key = utils.scope_claims_set(queue, project,
QUEUE_CLAIMS_SUFFIX)
# Return False if no such claim exists
# TODO(prashanthr_): Discuss the feasibility of a bloom filter.
if not client.sismember(claims_set_key, claim_id):
return False
expires = self._get_claim_info(claim_id, b'e')[0]
now = timeutils.utcnow_ts()
if now > expires:
return False
return True
def _get_claimed_message_keys(self, claim_id):
return self._client.lrange(claim_id, 0, -1)
@decorators.lazy_property(write=False)
def _message_ctrl(self):
return self.driver.message_controller
@decorators.lazy_property(write=False)
def _queue_ctrl(self):
return self.driver.queue_controller
@utils.raises_conn_error
@utils.retries_on_connection_error
def get(self, queue, claim_id, project=None):
if not self._exists(queue, claim_id, project):
raise errors.ClaimDoesNotExist(queue, project, claim_id)
claim_msgs_key = utils.scope_claim_messages(claim_id,
CLAIM_MESSAGES_SUFFIX)
# basic_messages
msg_keys = self._get_claimed_message_keys(claim_msgs_key)
with self._client.pipeline() as pipe:
for key in msg_keys:
pipe.hgetall(key)
raw_messages = pipe.execute()
now = timeutils.utcnow_ts()
basic_messages = [messages.Message.from_redis(msg).to_basic(now)
for msg in raw_messages if msg]
# claim_meta
now = timeutils.utcnow_ts()
expires, ttl = self._get_claim_info(claim_id, [b'e', b't'])
update_time = expires - ttl
age = now - update_time
claim_meta = {
'age': age,
'ttl': ttl,
'id': claim_id,
}
return claim_meta, basic_messages
@utils.raises_conn_error
@utils.retries_on_connection_error
def create(self, queue, metadata, project=None,
limit=storage.DEFAULT_MESSAGES_PER_CLAIM):
ttl = int(metadata.get('ttl', 60))
grace = int(metadata.get('grace', 60))
msg_ttl = ttl + grace
claim_id = utils.generate_uuid()
claim_key = utils.scope_claim_messages(claim_id,
CLAIM_MESSAGES_SUFFIX)
claims_set_key = utils.scope_claims_set(queue, project,
QUEUE_CLAIMS_SUFFIX)
counter_key = self._queue_ctrl._claim_counter_key(queue, project)
with self._client.pipeline() as pipe:
start_ts = timeutils.utcnow_ts()
# NOTE(kgriffs): Retry the operation if another transaction
# completes before this one, in which case it will have
# claimed the same messages the current thread is trying
# to claim, and therefoe we must try for another batch.
#
# This loop will eventually time out if we can't manage to
# claim any messages due to other threads continually beating
# us to the punch.
# TODO(kgriffs): Would it be beneficial (or harmful) to
# introducce a backoff sleep in between retries?
while (timeutils.utcnow_ts() - start_ts) < RETRY_CLAIM_TIMEOUT:
# NOTE(kgriffs): The algorithm for claiming messages:
#
# 1. Get a batch of messages that are currently active.
# 2. For each active message in the batch, extend its
# lifetime IFF it would otherwise expire before the
# claim itself does.
# 3. Associate the claim with each message
# 4. Create a claim record with details such as TTL
# and expiration time.
# 5. Add the claim's ID to a set to facilitate fast
# existence checks.
results = self._message_ctrl._active(queue, project=project,
limit=limit)
cursor = next(results)
msg_list = list(cursor)
# NOTE(kgriffs): If there are no active messages to
# claim, simply return an empty list.
if not msg_list:
return (None, iter([]))
basic_messages = []
try:
# TODO(kgriffs): Is it faster/better to do this all
# in a Lua script instead of using an app-layer
# transaction?
# NOTE(kgriffs): Abort the entire transaction if
# another request beats us to the punch. We detect
# this by putting a watch on the key that will have
# one of its fields updated as the final step of
# the transaction.
pipe.watch(counter_key)
pipe.multi()
now = timeutils.utcnow_ts()
claim_expires = now + ttl
msg_expires = claim_expires + grace
# Associate the claim with each message
for msg in msg_list:
msg.claim_id = claim_id
msg.claim_expires = claim_expires
if _msg_would_expire(msg, msg_expires):
msg.ttl = msg_ttl
msg.expires = msg_expires
pipe.rpush(claim_key, msg.id)
# TODO(kgriffs): Rather than writing back the
# entire message, only set the fields that
# have changed.
msg.to_redis(pipe)
basic_messages.append(msg.to_basic(now))
# Create the claim
claim_info = {
'id': claim_id,
't': ttl,
'e': claim_expires
}
pipe.hmset(claim_id, claim_info)
# NOTE(kgriffs): Add the claim ID to a set so that
# existence checks can be performed quickly.
pipe.sadd(claims_set_key, claim_id)
# NOTE(kgriffs): Update a counter that facilitates
# the queue stats calculation.
self._queue_ctrl._inc_claimed(queue, project,
len(msg_list),
pipe=pipe)
pipe.execute()
return claim_id, basic_messages
except redis.exceptions.WatchError:
continue
raise errors.ClaimConflict(queue, project)
@utils.raises_conn_error
@utils.retries_on_connection_error
def update(self, queue, claim_id, metadata, project=None):
if not self._exists(queue, claim_id, project):
raise errors.ClaimDoesNotExist(claim_id, queue, project)
now = timeutils.utcnow_ts()
claim_ttl = int(metadata.get('ttl', 60))
claim_expires = now + claim_ttl
grace = int(metadata.get('grace', 60))
msg_ttl = claim_ttl + grace
msg_expires = claim_expires + grace
claim_messages = utils.scope_claim_messages(claim_id,
CLAIM_MESSAGES_SUFFIX)
msg_keys = self._get_claimed_message_keys(claim_messages)
with self._client.pipeline() as pipe:
for key in msg_keys:
pipe.hgetall(key)
claimed_msgs = pipe.execute()
claim_info = {
't': claim_ttl,
'e': claim_expires,
}
with self._client.pipeline() as pipe:
for msg in claimed_msgs:
if msg:
msg = messages.Message.from_redis(msg)
msg.claim_id = claim_id
msg.claim_expires = claim_expires
if _msg_would_expire(msg, msg_expires):
msg.ttl = msg_ttl
msg.expires = msg_expires
# TODO(kgriffs): Rather than writing back the
# entire message, only set the fields that
# have changed.
msg.to_redis(pipe)
# Update the claim id and claim expiration info
# for all the messages.
pipe.hmset(claim_id, claim_info)
pipe.execute()
@utils.raises_conn_error
@utils.retries_on_connection_error
def delete(self, queue, claim_id, project=None):
# NOTE(prashanthr_): Return silently when the claim
# does not exist
if not self._exists(queue, claim_id, project):
return
now = timeutils.utcnow_ts()
claim_messages_key = utils.scope_claim_messages(claim_id,
CLAIM_MESSAGES_SUFFIX)
msg_keys = self._get_claimed_message_keys(claim_messages_key)
with self._client.pipeline() as pipe:
for msg_key in msg_keys:
pipe.hgetall(msg_key)
claimed_msgs = pipe.execute()
# Update the claim id and claim expiration info
# for all the messages.
claims_set_key = utils.scope_claims_set(queue, project,
QUEUE_CLAIMS_SUFFIX)
with self._client.pipeline() as pipe:
pipe.srem(claims_set_key, claim_id)
pipe.delete(claim_id)
pipe.delete(claim_messages_key)
for msg in claimed_msgs:
if msg:
msg = messages.Message.from_redis(msg)
msg.claim_id = None
msg.claim_expires = now
# TODO(kgriffs): Rather than writing back the
# entire message, only set the fields that
# have changed.
msg.to_redis(pipe)
self._queue_ctrl._inc_claimed(queue, project,
-1 * len(claimed_msgs),
pipe=pipe)
pipe.execute()
def _msg_would_expire(message, now):
return message.expires < now

View File

@ -0,0 +1,22 @@
# Copyright (c) 2014 Prashanth Raghu
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from zaqar.queues.storage.redis import claims
from zaqar.queues.storage.redis import messages
from zaqar.queues.storage.redis import queues
QueueController = queues.QueueController
MessageController = messages.MessageController
ClaimController = claims.ClaimController

View File

@ -0,0 +1,107 @@
# Copyright (c) 2014 Prashanth Raghu.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import redis
from six.moves import urllib
from zaqar.common import decorators
from zaqar.openstack.common import log as logging
from zaqar.queues import storage
from zaqar.queues.storage.redis import controllers
from zaqar.queues.storage.redis import options
LOG = logging.getLogger(__name__)
def _get_redis_client(conf):
# TODO(prashanthr_): Add SSL support
parsed_url = urllib.parse.urlparse(conf.uri)
if parsed_url.hostname:
return redis.StrictRedis(host=parsed_url.hostname,
port=parsed_url.port)
else:
return redis.StrictRedis(unix_socket_path=parsed_url.path)
class DataDriver(storage.DataDriverBase):
def __init__(self, conf, cache):
super(DataDriver, self).__init__(conf, cache)
opts = options.REDIS_OPTIONS
if 'dynamic' in conf:
names = conf[options.REDIS_GROUP].keys()
opts = filter(lambda x: x.name not in names, opts)
self.conf.register_opts(opts,
group=options.REDIS_GROUP)
self.redis_conf = self.conf[options.REDIS_GROUP]
def is_alive(self):
try:
return self.connection.ping()
except redis.exceptions.ConnectionError:
return False
def _health(self):
KPI = {}
KPI['storage_reachable'] = self.is_alive()
KPI['operation_status'] = self._get_operation_status()
# TODO(kgriffs): Add metrics re message volume
return KPI
@decorators.lazy_property(write=False)
def connection(self):
"""Redis client connection instance."""
return _get_redis_client(self.redis_conf)
@decorators.lazy_property(write=False)
def queue_controller(self):
return controllers.QueueController(self)
@decorators.lazy_property(write=False)
def message_controller(self):
return controllers.MessageController(self)
@decorators.lazy_property(write=False)
def claim_controller(self):
return controllers.ClaimController(self)
class ControlDriver(storage.ControlDriverBase):
def __init__(self, conf, cache):
super(ControlDriver, self).__init__(conf, cache)
self.conf.register_opts(options.REDIS_OPTIONS,
group=options.REDIS_GROUP)
self.redis_conf = self.conf[options.REDIS_GROUP]
@decorators.lazy_property(write=False)
def connection(self):
"""Redis client connection instance."""
return _get_redis_client(self.redis_conf)
@property
def pools_controller(self):
return None
@property
def catalogue_controller(self):
return None

View File

@ -0,0 +1,483 @@
# Copyright (c) 2014 Prashanth Raghu.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
import uuid
import redis
from zaqar.common import decorators
from zaqar.openstack.common import strutils
from zaqar.openstack.common import timeutils
from zaqar.queues import storage
from zaqar.queues.storage import errors
from zaqar.queues.storage.redis import models
from zaqar.queues.storage.redis import utils
Message = models.Message
MESSAGE_IDS_SUFFIX = 'messages'
# The rank counter is an atomic index to rank messages
# in a FIFO manner.
MESSAGE_RANK_COUNTER_SUFFIX = 'rank_counter'
# NOTE(kgriffs): This value, in seconds, should be at least less than the
# minimum allowed TTL for messages (60 seconds).
RETRY_POST_TIMEOUT = 10
class MessageController(storage.Message):
"""Implements message resource operations using Redis.
Messages are scoped by project + queue.
Redis Data Structures:
----------------------
1. Message id's list (Redis sorted set)
Each queue in the system has a set of message ids currently
in the queue. The list is sorted based on a ranking which is
incremented atomically using the counter(MESSAGE_RANK_COUNTER_SUFFIX)
also stored in the database for every queue.
Key: <project-id.q-name>
2. Messages(Redis Hash):
Scoped by the UUID of the message, the redis datastructure
has the following information.
Name Field
-----------------------------
id -> id
ttl -> t
expires -> e
body -> b
claim -> c
claim expiry time -> c.e
client uuid -> u
created time -> cr
"""
def __init__(self, *args, **kwargs):
super(MessageController, self).__init__(*args, **kwargs)
self._client = self.driver.connection
@decorators.lazy_property(write=False)
def _queue_ctrl(self):
return self.driver.queue_controller
@decorators.lazy_property(write=False)
def _claim_ctrl(self):
return self.driver.claim_controller
def _active(self, queue_name, marker=None, echo=False,
client_uuid=None, project=None,
limit=None):
return self._list(queue_name, project=project, marker=marker,
echo=echo, client_uuid=client_uuid,
include_claimed=False,
limit=limit, to_basic=False)
def _get_count(self, msgset_key):
"""Get num messages in a Queue.
Return the number of messages in a queue scoped by
queue and project.
"""
return self._client.zcard(msgset_key)
@utils.raises_conn_error
@utils.retries_on_connection_error
def _delete_queue_messages(self, queue, project, pipe):
"""Method to remove all the messages belonging to a queue.
Will be referenced from the QueueController.
The pipe to execute deletion will be passed from the QueueController
executing the operation.
"""
client = self._client
msgset_key = utils.scope_message_ids_set(queue, project,
MESSAGE_IDS_SUFFIX)
message_ids = client.zrange(msgset_key, 0, -1)
pipe.delete(msgset_key)
for msg_id in message_ids:
pipe.delete(msg_id)
# TODO(prashanthr_): Looking for better ways to solve the issue.
def _find_first_unclaimed(self, queue, project, limit):
"""Find the first unclaimed message in the queue."""
msgset_key = utils.scope_message_ids_set(queue, project,
MESSAGE_IDS_SUFFIX)
marker = 0
now = timeutils.utcnow_ts()
# NOTE(prashanthr_): This will not be an infinite loop.
while True:
msg_keys = self._client.zrange(msgset_key, marker,
marker + limit)
if msg_keys:
messages = [Message.from_redis(self._client.hgetall(msg_key))
for msg_key in msg_keys]
for msg in messages:
if not utils.msg_claimed_filter(msg, now):
return msg.id
else:
return None
def _exists(self, key):
"""Check if message exists in the Queue.
Helper function which checks if a particular message_id
exists in the sorted set of the queues message ids.
"""
return self._client.exists(key)
def _get_first_message_id(self, queue, project, sort):
"""Fetch head/tail of the Queue.
Helper function to get the first message in the queue
sort > 0 get from the left else from the right.
"""
msgset_key = utils.scope_message_ids_set(queue, project,
MESSAGE_IDS_SUFFIX)
sorter = self._client.zrange if sort == 1 else self._client.zrevrange
message_ids = sorter(msgset_key, 0, 0)
return message_ids[0] if message_ids else None
def _get(self, message_id):
msg = self._client.hgetall(message_id)
return Message.from_redis(msg) if msg else None
def _get_claim(self, message_id):
claim = self._client.hmget(message_id, 'c', 'c.e')
if claim == [None, None]:
# NOTE(kgriffs): message_id was not found
return None
return {
# NOTE(kgriffs): A "None" claim is serialized as an empty str
'id': strutils.safe_decode(claim[0]) or None,
'expires': int(claim[1]),
}
def _list(self, queue, project=None, marker=None,
limit=storage.DEFAULT_MESSAGES_PER_PAGE,
echo=False, client_uuid=None,
include_claimed=False, to_basic=True):
if not self._queue_ctrl.exists(queue, project):
raise errors.QueueDoesNotExist(queue,
project)
msgset_key = utils.scope_message_ids_set(queue,
project,
MESSAGE_IDS_SUFFIX)
client = self._client
with self._client.pipeline() as pipe:
# NOTE(prashanthr_): Iterate through the queue to find the first
# unclaimed message.
if not marker and not include_claimed:
marker = self._find_first_unclaimed(queue, project, limit)
start = client.zrank(msgset_key, marker) or 0
else:
rank = client.zrank(msgset_key, marker)
start = rank + 1 if rank else 0
message_ids = client.zrange(msgset_key, start,
start + (limit - 1))
for msg_id in message_ids:
pipe.hgetall(msg_id)
messages = pipe.execute()
# NOTE(prashanthr_): Build a list of filters for checking
# the following:
#
# 1. Message is expired
# 2. Message is claimed
# 3. Message should not be echoed
#
now = timeutils.utcnow_ts()
filters = [functools.partial(utils.msg_expired_filter, now=now)]
if not include_claimed:
filters.append(functools.partial(utils.msg_claimed_filter,
now=now))
if not echo:
filters.append(functools.partial(utils.msg_echo_filter,
client_uuid=client_uuid))
marker = {}
yield _filter_messages(messages, pipe, filters, to_basic, marker)
yield marker['next']
@utils.raises_conn_error
@utils.retries_on_connection_error
def list(self, queue, project=None, marker=None,
limit=storage.DEFAULT_MESSAGES_PER_PAGE,
echo=False, client_uuid=None,
include_claimed=False):
return self._list(queue, project, marker, limit, echo,
client_uuid, include_claimed)
@utils.raises_conn_error
@utils.retries_on_connection_error
def first(self, queue, project=None, sort=1):
if sort not in (1, -1):
raise ValueError(u'sort must be either 1 (ascending) '
u'or -1 (descending)')
message_id = self._get_first_message_id(queue, project, sort)
if not message_id:
raise errors.QueueIsEmpty(queue, project)
now = timeutils.utcnow_ts()
return self._get(message_id).to_basic(now, include_created=True)
@utils.raises_conn_error
@utils.retries_on_connection_error
def get(self, queue, message_id, project=None):
if not self._queue_ctrl.exists(queue, project):
raise errors.QueueDoesNotExist(queue, project)
message = self._get(message_id)
now = timeutils.utcnow_ts()
if message and not utils.msg_expired_filter(message, now):
return message.to_basic(now)
else:
raise errors.MessageDoesNotExist(message_id, queue, project)
@utils.raises_conn_error
@utils.retries_on_connection_error
def bulk_get(self, queue, message_ids, project=None):
if not self._queue_ctrl.exists(queue, project):
raise errors.QueueDoesNotExist(queue, project)
# NOTE(prashanthr_): Pipelining is used here purely
# for performance.
with self._client.pipeline() as pipe:
for mid in message_ids:
pipe.hgetall(mid)
messages = pipe.execute()
# NOTE(kgriffs): Skip messages that may have been deleted
now = timeutils.utcnow_ts()
return (Message.from_redis(msg).to_basic(now)
for msg in messages if msg)
@utils.raises_conn_error
@utils.retries_on_connection_error
def post(self, queue, messages, client_uuid, project=None):
if not self._queue_ctrl.exists(queue, project):
raise errors.QueueDoesNotExist(queue, project)
msgset_key = utils.scope_message_ids_set(queue, project,
MESSAGE_IDS_SUFFIX)
counter_key = utils.scope_queue_index(queue, project,
MESSAGE_RANK_COUNTER_SUFFIX)
with self._client.pipeline() as pipe:
start_ts = timeutils.utcnow_ts()
# NOTE(kgriffs): Retry the operation if another transaction
# completes before this one, in which case it may have
# posted messages with the same rank counter the current
# thread is trying to use, which would cause messages
# to get out of order and introduce the risk of a client
# missing a message while reading from the queue.
#
# This loop will eventually time out if we can't manage to
# post any messages due to other threads continually beating
# us to the punch.
# TODO(kgriffs): Would it be beneficial (or harmful) to
# introducce a backoff sleep in between retries?
while (timeutils.utcnow_ts() - start_ts) < RETRY_POST_TIMEOUT:
now = timeutils.utcnow_ts()
prepared_messages = [
Message(
ttl=msg['ttl'],
created=now,
client_uuid=client_uuid,
claim_id=None,
claim_expires=now,
body=msg.get('body', {}),
)
for msg in messages
]
try:
pipe.watch(counter_key)
rank_counter = pipe.get(counter_key)
rank_counter = int(rank_counter) if rank_counter else 0
pipe.multi()
keys = []
for i, msg in enumerate(prepared_messages):
msg.to_redis(pipe)
pipe.zadd(msgset_key, rank_counter + i, msg.id)
keys.append(msg.id)
pipe.incrby(counter_key, len(keys))
self._queue_ctrl._inc_counter(queue, project,
len(prepared_messages),
pipe=pipe)
pipe.execute()
return keys
except redis.exceptions.WatchError:
continue
raise errors.MessageConflict(queue, project)
@utils.raises_conn_error
@utils.retries_on_connection_error
def delete(self, queue, message_id, project=None, claim=None):
if not self._queue_ctrl.exists(queue, project):
raise errors.QueueDoesNotExist(queue, project)
# TODO(kgriffs): Create decorator for validating claim and message
# IDs, since those are not checked at the transport layer. This
# decorator should be applied to all relevant methods.
if claim is not None:
try:
uuid.UUID(claim)
except ValueError:
raise errors.ClaimDoesNotExist(queue, project, claim)
msg_claim = self._get_claim(message_id)
# NOTE(kgriffs): The message does not exist, so
# it is essentially "already deleted".
if msg_claim is None:
return
now = timeutils.utcnow_ts()
is_claimed = msg_claim['id'] and (now < msg_claim['expires'])
if claim is None:
if is_claimed:
raise errors.MessageIsClaimed(message_id)
elif not is_claimed:
raise errors.MessageNotClaimed(message_id)
elif msg_claim['id'] != claim:
if not self._claim_ctrl._exists(queue, claim, project):
raise errors.ClaimDoesNotExist(queue, project, claim)
raise errors.MessageNotClaimedBy(message_id, claim)
msgset_key = utils.scope_message_ids_set(queue, project,
MESSAGE_IDS_SUFFIX)
with self._client.pipeline() as pipe:
results = pipe.delete(message_id).zrem(msgset_key,
message_id).execute()
# NOTE(prashanthr_): results[0] is 1 when the delete is
# successful. Hence we use that case to identify successful
# deletes.
if results[0] == 1:
self._queue_ctrl._inc_counter(queue, project, -1)
@utils.raises_conn_error
@utils.retries_on_connection_error
def bulk_delete(self, queue, message_ids, project=None):
if not self._queue_ctrl.exists(queue, project):
raise errors.QueueDoesNotExist(queue,
project)
msgset_key = utils.scope_message_ids_set(queue, project,
MESSAGE_IDS_SUFFIX)
with self._client.pipeline() as pipe:
for message_id in message_ids:
pipe.delete(message_id).zrem(msgset_key, message_id)
results = pipe.execute()
# NOTE(prashanthr_): None is returned for the cases where
# the message might not exist or has been deleted/expired.
# Hence we calculate the number of deletes as the
# total number of message ids - number of failed deletes.
amount = -1 * (len(results) - results.count(0)) / 2
self._queue_ctrl._inc_counter(queue, project, int(amount))
@utils.raises_conn_error
@utils.retries_on_connection_error
def pop(self, queue, limit, project=None):
# Pop is implemented as a chain of the following operations:
# 1. Create a claim.
# 2. Delete the messages claimed.
# 3. Delete the claim.
claim_id, messages = self._claim_ctrl.create(
queue, dict(ttl=1, grace=0), project, limit=limit)
message_ids = [message['id'] for message in messages]
self.bulk_delete(queue, message_ids, project)
# NOTE(prashanthr_): Creating a claim controller reference
# causes a recursive reference. Hence, using the reference
# from the driver.
self._claim_ctrl.delete(queue, claim_id, project)
return messages
def _filter_messages(messages, pipe, filters, to_basic, marker):
"""Create a filtering iterator over a list of messages.
The function accepts a list of filters to be filtered
before the the message can be included as a part of the reply.
"""
now = timeutils.utcnow_ts()
for msg in messages:
# NOTE(kgriffs): Message may have been deleted, so
# check each value to ensure we got a message back
if msg:
msg = Message.from_redis(msg)
# NOTE(kgriffs): Check to see if any of the filters
# indiciate that this message should be skipped.
for should_skip in filters:
if should_skip(msg):
break
else:
marker['next'] = msg.id
if to_basic:
yield msg.to_basic(now)
else:
yield msg

View File

@ -0,0 +1,124 @@
# Copyright (c) 2014 Prashanth Raghu.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
import uuid
import msgpack
from zaqar.openstack.common import strutils
from zaqar.openstack.common import timeutils
_pack = msgpack.Packer(encoding='utf-8', use_bin_type=True).pack
_unpack = functools.partial(msgpack.unpackb, encoding='utf-8')
# TODO(kgriffs): Make similar classes for claims and queues
class Message(object):
"""Message is used to organize,store and retrieve messages from redis.
Message class helps organize,store and retrieve messages in a version
compatible manner.
:param id: Message ID in the form of a hexadecimal UUID. If not
given, one will be automatically generated.
:param ttl: Message TTL in seconds
:param created: Message creation time as a UNIX timestamp
:param client_uuid: UUID of the client that posted the message
:param claim_id: If claimed, the UUID of the claim. Set to None
for messages that have never been claimed.
:param claim_expires: Claim expiration as a UNIX timestamp
:param body: Message payload. Must be serializable to mspack.
"""
message_data = {}
__slots__ = (
'id',
'ttl',
'created',
'expires',
'client_uuid',
'claim_id',
'claim_expires',
'body',
)
def __init__(self, **kwargs):
self.id = kwargs.get('id', str(uuid.uuid4()))
self.ttl = kwargs['ttl']
self.created = kwargs['created']
self.expires = kwargs.get('expires', self.created + self.ttl)
self.client_uuid = str(kwargs['client_uuid'])
self.claim_id = kwargs.get('claim_id')
self.claim_expires = kwargs['claim_expires']
self.body = kwargs['body']
@property
def created_iso(self):
return timeutils.iso8601_from_timestamp(self.created)
@staticmethod
def from_redis(doc):
claim_id = doc[b'c']
if claim_id:
claim_id = strutils.safe_decode(claim_id)
else:
claim_id = None
# NOTE(kgriffs): Under Py3K, redis-py converts all strings
# into binary. Woohoo!
return Message(
id=strutils.safe_decode(doc[b'id']),
ttl=int(doc[b't']),
created=int(doc[b'cr']),
expires=int(doc[b'e']),
client_uuid=strutils.safe_decode(doc[b'u']),
claim_id=claim_id,
claim_expires=int(doc[b'c.e']),
body=_unpack(doc[b'b']),
)
def to_redis(self, pipe):
doc = {
'id': self.id,
't': self.ttl,
'cr': self.created,
'e': self.expires,
'u': self.client_uuid,
'c': self.claim_id or '',
'c.e': self.claim_expires,
'b': _pack(self.body),
}
pipe.hmset(self.id, doc)
def to_basic(self, now, include_created=False):
basic_msg = {
'id': self.id,
'age': now - self.created,
'ttl': self.ttl,
'body': self.body
}
if include_created:
basic_msg['created'] = self.created_iso
return basic_msg

View File

@ -0,0 +1,40 @@
# Copyright (c) 2014 Prashanth Raghu.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Redis storage driver configuration options."""
from oslo.config import cfg
REDIS_OPTIONS = (
cfg.StrOpt('uri', default="redis://127.0.0.1:6379",
help=('Redis Server URI. Can also use a '
'socket file based connector. '
'Ex: redis:/tmp/redis.sock')),
cfg.IntOpt('max_reconnect_attempts', default=10,
help=('Maximum number of times to retry an operation that '
'failed due to a redis node failover.')),
cfg.FloatOpt('reconnect_sleep', default=1,
help=('Base sleep interval between attempts to reconnect '
'after a redis node failover. '))
)
REDIS_GROUP = 'drivers:storage:redis'
def _config_options():
return [(REDIS_GROUP, REDIS_OPTIONS)]

View File

@ -0,0 +1,253 @@
# Copyright (c) 2014 Prashanth Raghu.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
import msgpack
import redis
from zaqar.common import decorators
from zaqar.openstack.common import log as logging
from zaqar.openstack.common import timeutils
from zaqar.queues import storage
from zaqar.queues.storage import errors
from zaqar.queues.storage.redis import messages
from zaqar.queues.storage.redis import utils
LOG = logging.getLogger(__name__)
QUEUES_SET_STORE_NAME = 'queues_set'
MESSAGE_IDS_SUFFIX = 'messages'
class QueueController(storage.Queue):
"""Implements queue resource operations using Redis.
Queues are scoped by project, which is prefixed to the
queue name.
Queues (Redis sorted set):
Key: queues_set
Id Value
---------------------------------
name -> <project-id_q-name>
The set helps faster existence checks, while the list helps
paginated retrieval of queues.
Queue Information (Redis hash):
Key: <project-id_q-name>
Name Field
-------------------------------
count -> c
num_msgs_claimed -> cl
metadata -> m
creation timestamp -> t
"""
def __init__(self, *args, **kwargs):
super(QueueController, self).__init__(*args, **kwargs)
self._client = self.driver.connection
self._packer = msgpack.Packer(encoding='utf-8',
use_bin_type=True).pack
self._unpacker = functools.partial(msgpack.unpackb, encoding='utf-8')
@decorators.lazy_property(write=False)
def _message_ctrl(self):
return self.driver.message_controller
def _claim_counter_key(self, name, project):
return utils.scope_queue_name(name, project)
def _inc_counter(self, name, project, amount=1, pipe=None):
queue_key = utils.scope_queue_name(name, project)
client = pipe if pipe is not None else self._client
client.hincrby(queue_key, 'c', amount)
def _inc_claimed(self, name, project, amount=1, pipe=None):
queue_key = utils.scope_queue_name(name, project)
client = pipe if pipe is not None else self._client
client.hincrby(queue_key, 'cl', amount)
# TODO(kgriffs): Reimplement in Lua; this is way too expensive!
def _get_expired_message_count(self, name, project):
"""Calculate the number of expired messages in the queue.
Used to compute the stats on the queue.
Method has O(n) complexity as we iterate the entire list of
messages.
"""
messages_set_key = utils.scope_message_ids_set(name, project,
MESSAGE_IDS_SUFFIX)
with self._client.pipeline() as pipe:
for msg_key in self._client.zrange(messages_set_key, 0, -1):
pipe.hgetall(msg_key)
raw_messages = pipe.execute()
expired = 0
now = timeutils.utcnow_ts()
for msg in raw_messages:
if msg:
msg = messages.Message.from_redis(msg)
if utils.msg_expired_filter(msg, now):
expired += 1
return expired
def _get_queue_info(self, queue_key, fields, transform=str):
"""Get one or more fields from Queue Info."""
values = self._client.hmget(queue_key, fields)
return [transform(v) for v in values] if transform else values
@utils.raises_conn_error
@utils.retries_on_connection_error
def list(self, project=None, marker=None,
limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False):
client = self._client
qset_key = utils.scope_queue_name(QUEUES_SET_STORE_NAME, project)
marker = utils.scope_queue_name(marker, project)
rank = client.zrank(qset_key, marker)
start = rank + 1 if rank else 0
cursor = (q for q in client.zrange(qset_key, start,
start + limit - 1))
marker_next = {}
def denormalizer(info, name):
queue = {'name': utils.descope_queue_name(name)}
marker_next['next'] = queue['name']
if detailed:
queue['metadata'] = info[1]
return queue
yield utils.QueueListCursor(self._client, cursor, denormalizer)
yield marker_next and marker_next['next']
def get(self, name, project=None):
"""Obtain the metadata from the queue."""
return self.get_metadata(name, project)
@utils.raises_conn_error
def create(self, name, metadata=None, project=None):
# TODO(prashanthr_): Implement as a lua script.
queue_key = utils.scope_queue_name(name, project)
qset_key = utils.scope_queue_name(QUEUES_SET_STORE_NAME, project)
# Check if the queue already exists.
if self.exists(name, project):
return False
queue = {
'c': 0,
'cl': 0,
'm': self._packer(metadata or {}),
't': timeutils.utcnow_ts()
}
# Pipeline ensures atomic inserts.
with self._client.pipeline() as pipe:
pipe.zadd(qset_key, 1, queue_key).hmset(queue_key, queue)
try:
pipe.execute()
except redis.exceptions.ResponseError:
return False
return True
@utils.raises_conn_error
@utils.retries_on_connection_error
def exists(self, name, project=None):
# TODO(prashanthr_): Cache this lookup
queue_key = utils.scope_queue_name(name, project)
qset_key = utils.scope_queue_name(QUEUES_SET_STORE_NAME, project)
return self._client.zrank(qset_key, queue_key) is not None
@utils.raises_conn_error
@utils.retries_on_connection_error
def set_metadata(self, name, metadata, project=None):
if not self.exists(name, project):
raise errors.QueueDoesNotExist(name, project)
key = utils.scope_queue_name(name, project)
fields = {'m': self._packer(metadata)}
self._client.hmset(key, fields)
@utils.raises_conn_error
@utils.retries_on_connection_error
def get_metadata(self, name, project=None):
if not self.exists(name, project):
raise errors.QueueDoesNotExist(name, project)
queue_key = utils.scope_queue_name(name, project)
metadata = self._get_queue_info(queue_key, b'm', None)[0]
return self._unpacker(metadata)
@utils.raises_conn_error
@utils.retries_on_connection_error
def delete(self, name, project=None):
queue_key = utils.scope_queue_name(name, project)
qset_key = utils.scope_queue_name(QUEUES_SET_STORE_NAME, project)
# NOTE(prashanthr_): Pipelining is used to mitigate race conditions
with self._client.pipeline() as pipe:
pipe.zrem(qset_key, queue_key)
pipe.delete(queue_key)
self._message_ctrl._delete_queue_messages(name, project, pipe)
pipe.execute()
@utils.raises_conn_error
@utils.retries_on_connection_error
def stats(self, name, project=None):
if not self.exists(name, project=project):
raise errors.QueueDoesNotExist(name, project)
queue_key = utils.scope_queue_name(name, project)
claimed, total = self._get_queue_info(queue_key, [b'cl', b'c'], int)
expired = self._get_expired_message_count(name, project)
message_stats = {
'claimed': claimed,
'free': total - claimed - expired,
'total': total
}
try:
newest = self._message_ctrl.first(name, project, -1)
oldest = self._message_ctrl.first(name, project, 1)
except errors.QueueIsEmpty:
pass
else:
message_stats['newest'] = newest
message_stats['oldest'] = oldest
return {'messages': message_stats}

View File

@ -0,0 +1,193 @@
# Copyright (c) 2014 Prashanth Raghu.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
import sys
import time
import uuid
import redis
import six
from zaqar.i18n import _
from zaqar.openstack.common import log as logging
from zaqar.openstack.common import strutils
from zaqar.queues.storage import errors
LOG = logging.getLogger(__name__)
def descope_queue_name(scoped_name):
"""Descope Queue name with '.'.
Returns the queue name from the scoped name
which is of the form project-id.queue-name
"""
return scoped_name.split('.')[1]
def normalize_none_str(string_or_none):
"""Returns '' IFF given value is None, passthrough otherwise.
This function normalizes None to the empty string to facilitate
string concatenation when a variable could be None.
"""
# TODO(prashanthr_) : Try to reuse this utility. Violates DRY
return '' if string_or_none is None else string_or_none
def generate_uuid():
return str(uuid.uuid4())
def scope_queue_name(queue=None, project=None):
"""Returns a scoped name for a queue based on project and queue.
If only the project name is specified, a scope signifying "all queues"
for that project is returned. If neither queue nor project are
specified, a scope for "all global queues" is returned, which
is to be interpreted as excluding queues scoped by project.
:returns: '{project}.{queue}' if project and queue are given,
'{project}.' if ONLY project is given, '.{queue}' if ONLY
queue is given, and '.' if neither are given.
"""
# TODO(prashanthr_) : Try to reuse this utility. Violates DRY
return normalize_none_str(project) + '.' + normalize_none_str(queue)
# NOTE(prashanthr_): Aliase the scope_queue_name function
# to be used in the pools and claims controller as similar
# functionality is required to scope redis id's.
scope_pool_catalogue = scope_claim_messages = scope_queue_name
def scope_message_ids_set(queue=None, project=None, message_suffix=''):
"""Scope messages set with '.'
Returns a scoped name for the list of messages in the form
project-id_queue-name_suffix
"""
return (normalize_none_str(project) + '.' +
normalize_none_str(queue) + '.' +
message_suffix)
# NOTE(prashanthr_): Aliasing the scope_message_ids_set function
# to be used in the pools and claims controller as similar
# functionality is required to scope redis id's.
scope_queue_catalogue = scope_claims_set = scope_message_ids_set
scope_queue_index = scope_message_ids_set
def raises_conn_error(func):
"""Handles the Redis ConnectionFailure error.
This decorator catches Redis's ConnectionError
and raises Marconi's ConnectionError instead.
"""
# Note(prashanthr_) : Try to reuse this utility. Violates DRY
# Can pass exception type into the decorator and create a
# storage level utility.
@functools.wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except redis.exceptions.ConnectionError as ex:
LOG.exception(ex)
raise errors.ConnectionError()
return wrapper
def retries_on_connection_error(func):
"""Causes the wrapped function to be re-called on ConnectionError.
This decorator catches Redis ConnectionError and retries
the function call.
.. Note::
Assumes that the decorated function has defined self.driver.redis_cinf
so that `max_reconnect_attempts` and `reconnect_sleep` can be taken
into account.
.. Warning:: The decorated function must be idempotent.
"""
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
# TODO(prashanthr_) : Try to reuse this utility. Violates DRY
# Can pass config parameters into the decorator and create a
# storage level utility.
max_attemps = self.driver.redis_conf.max_reconnect_attempts
sleep_sec = self.driver.redis_conf.reconnect_sleep
for attempt in range(max_attemps):
try:
return func(self, *args, **kwargs)
except redis.exceptions.ConnectionError:
ex = sys.exc_info()[1]
LOG.warn(_(u'Caught ConnectionError, retrying the '
'call to {0}').format(func))
time.sleep(sleep_sec * (2 ** attempt))
else:
LOG.error(_(u'Caught ConnectionError, maximum attempts '
'to {0} exceeded.').format(func))
raise ex
return wrapper
def msg_claimed_filter(message, now):
"""Return True IFF the message is currently claimed."""
return message.claim_id and (now < message.claim_expires)
def msg_echo_filter(message, client_uuid):
"""Return True IFF the specified client posted the message."""
return message.client_uuid == six.text_type(client_uuid)
def msg_expired_filter(message, now):
"""Return True IFF the message has expired."""
return message.expires <= now
class QueueListCursor(object):
def __init__(self, client, queues, denormalizer):
self.queue_iter = queues
self.denormalizer = denormalizer
self.client = client
def __iter__(self):
return self
@raises_conn_error
def next(self):
curr = next(self.queue_iter)
queue = self.client.hmget(curr, ['c', 'm'])
return self.denormalizer(queue, strutils.safe_decode(curr))
def __next__(self):
return self.next()

View File

@ -120,7 +120,7 @@ def keyify(key, iterable):
def can_connect(uri):
"""Given a URI, verifies whether its possible to connect to it.
"""Given a URI, verifies whether it's possible to connect to it.
:param uri: connection string to a storage endpoint
:type uri: six.text_type

View File

@ -169,13 +169,8 @@ class CollectionResource(object):
except storage_errors.MessageConflict as ex:
LOG.exception(ex)
message_ids = ex.succeeded_ids
if not message_ids:
# TODO(kgriffs): Include error code that is different
# from the code used in the generic case, below.
description = _(u'No messages could be enqueued.')
raise wsgi_errors.HTTPServiceUnavailable(description)
description = _(u'No messages could be enqueued.')
raise wsgi_errors.HTTPServiceUnavailable(description)
except Exception as ex:
LOG.exception(ex)

View File

@ -190,13 +190,8 @@ class CollectionResource(object):
except storage_errors.MessageConflict as ex:
LOG.exception(ex)
message_ids = ex.succeeded_ids
if not message_ids:
# TODO(kgriffs): Include error code that is different
# from the code used in the generic case, below.
description = _(u'No messages could be enqueued.')
raise wsgi_errors.HTTPServiceUnavailable(description)
description = _(u'No messages could be enqueued.')
raise wsgi_errors.HTTPServiceUnavailable(description)
except Exception as ex:
LOG.exception(ex)

View File

@ -25,4 +25,5 @@ RUN_SLOW_TESTS = not SKIP_SLOW_TESTS
expect = helpers.expect
is_slow = helpers.is_slow
requires_mongodb = helpers.requires_mongodb
requires_redis = helpers.requires_redis
TestBase = base.TestBase

View File

@ -24,6 +24,7 @@ import testtools
SKIP_SLOW_TESTS = os.environ.get('ZAQAR_TEST_SLOW') is None
SKIP_MONGODB_TESTS = os.environ.get('ZAQAR_TEST_MONGODB') is None
SKIP_REDIS_TESTS = os.environ.get('ZAQAR_TEST_REDIS') is None
@contextlib.contextmanager
@ -205,6 +206,22 @@ def requires_mongodb(test_case):
return testtools.skipIf(SKIP_MONGODB_TESTS, reason)(test_case)
def requires_redis(test_case):
"""Decorator to flag a test case as being dependent on Redis.
Redis-specific tests will be skipped unless the MARCONI_TEST_REDIS
environment variable is set. If the variable is set, the tests will
assume that redis is running and listening on localhost.
"""
reason = ('Skipping tests that require Redis. Ensure '
'Redis is running on localhost and then set '
'ZAQAR_TEST_REDIS in order to enable tests '
'that are specific to this storage backend. ')
return testtools.skipIf(SKIP_REDIS_TESTS, reason)(test_case)
def is_slow(condition=lambda self: True):
"""Decorator to flag slow tests.

View File

@ -226,6 +226,119 @@ class QueueControllerTest(ControllerBaseTest):
self.assertNotIn('newest', message_stats)
self.assertNotIn('oldest', message_stats)
def test_queue_count_on_bulk_delete(self):
self.addCleanup(self.controller.delete, 'test-queue',
project=self.project)
queue_name = 'test-queue'
client_uuid = uuid.uuid4()
created = self.controller.create(queue_name, project=self.project)
self.assertTrue(created)
# Create 10 messages.
msg_keys = _insert_fixtures(self.message_controller, queue_name,
project=self.project,
client_uuid=client_uuid, num=10)
stats = self.controller.stats(queue_name,
self.project)['messages']
self.assertEqual(stats['total'], 10)
# Delete 5 messages
self.message_controller.bulk_delete(queue_name, msg_keys[0:5],
self.project)
stats = self.controller.stats(queue_name,
self.project)['messages']
self.assertEqual(stats['total'], 5)
def test_queue_count_on_bulk_delete_with_invalid_id(self):
self.addCleanup(self.controller.delete, 'test-queue',
project=self.project)
queue_name = 'test-queue'
client_uuid = uuid.uuid4()
created = self.controller.create(queue_name, project=self.project)
self.assertTrue(created)
# Create 10 messages.
msg_keys = _insert_fixtures(self.message_controller, queue_name,
project=self.project,
client_uuid=client_uuid, num=10)
stats = self.controller.stats(queue_name,
self.project)['messages']
self.assertEqual(stats['total'], 10)
# Delete 5 messages
self.message_controller.bulk_delete(queue_name,
msg_keys[0:5] + ['invalid'],
self.project)
stats = self.controller.stats(queue_name,
self.project)['messages']
self.assertEqual(stats['total'], 5)
def test_queue_count_on_delete(self):
self.addCleanup(self.controller.delete, 'test-queue',
project=self.project)
queue_name = 'test-queue'
client_uuid = uuid.uuid4()
created = self.controller.create(queue_name, project=self.project)
self.assertTrue(created)
# Create 10 messages.
msg_keys = _insert_fixtures(self.message_controller, queue_name,
project=self.project,
client_uuid=client_uuid, num=10)
stats = self.controller.stats(queue_name,
self.project)['messages']
self.assertEqual(stats['total'], 10)
# Delete 1 message
self.message_controller.delete(queue_name, msg_keys[0],
self.project)
stats = self.controller.stats(queue_name,
self.project)['messages']
self.assertEqual(stats['total'], 9)
def test_queue_count_on_claim_delete(self):
self.addCleanup(self.controller.delete, 'test-queue',
project=self.project)
queue_name = 'test-queue'
client_uuid = uuid.uuid4()
created = self.controller.create(queue_name, project=self.project)
self.assertTrue(created)
# Create 15 messages.
_insert_fixtures(self.message_controller, queue_name,
project=self.project,
client_uuid=client_uuid, num=15)
stats = self.controller.stats(queue_name,
self.project)['messages']
self.assertEqual(stats['total'], 15)
metadata = {'ttl': 120, 'grace': 60}
# Claim 10 messages
claim_id, _ = self.claim_controller.create(queue_name, metadata,
self.project)
stats = self.controller.stats(queue_name,
self.project)['messages']
self.assertEqual(stats['claimed'], 10)
# Delete the claim
self.claim_controller.delete(queue_name, claim_id,
self.project)
stats = self.controller.stats(queue_name,
self.project)['messages']
self.assertEqual(stats['claimed'], 0)
class MessageControllerTest(ControllerBaseTest):
"""Message Controller base tests.
@ -1164,5 +1277,5 @@ def _insert_fixtures(controller, queue_name, project=None,
'event': 'Event number {0}'.format(n)
}}
controller.post(queue_name, messages(),
project=project, client_uuid=client_uuid)
return controller.post(queue_name, messages(),
project=project, client_uuid=client_uuid)