From 76f6e063db982cd45a2b9c75d9a01e1533f905d1 Mon Sep 17 00:00:00 2001
From: Prashanth Raghu
Date: Wed, 20 Aug 2014 22:20:29 +0800
Subject: [PATCH] Implements queue, message and claim controllers for Redis
This patch implements the standard controllers for the redis
storage driver. It has been tested against a localhost Redis
server with ZAQAR_TEST_REDIS=1.
Change-Id: Ib7c100afd11a0410c3f241c1925d5aaf172ce6a8
Partially-Implements: blueprint redis-storage-driver
---
etc/zaqar.conf.sample | 20 +
setup.cfg | 3 +
tests/etc/wsgi_redis.conf | 15 +
tests/etc/wsgi_redis_pooled.conf | 11 +
tests/unit/common/storage/test_utils.py | 18 +-
tests/unit/queues/storage/test_impl_redis.py | 277 +++++++++++
zaqar/queues/storage/errors.py | 26 +-
zaqar/queues/storage/mongodb/messages.py | 4 +-
zaqar/queues/storage/redis/__init__.py | 0
zaqar/queues/storage/redis/claims.py | 368 ++++++++++++++
zaqar/queues/storage/redis/controllers.py | 22 +
zaqar/queues/storage/redis/driver.py | 107 ++++
zaqar/queues/storage/redis/messages.py | 483 +++++++++++++++++++
zaqar/queues/storage/redis/models.py | 124 +++++
zaqar/queues/storage/redis/options.py | 40 ++
zaqar/queues/storage/redis/queues.py | 253 ++++++++++
zaqar/queues/storage/redis/utils.py | 193 ++++++++
zaqar/queues/storage/utils.py | 2 +-
zaqar/queues/transport/wsgi/v1_0/messages.py | 9 +-
zaqar/queues/transport/wsgi/v1_1/messages.py | 9 +-
zaqar/tests/__init__.py | 1 +
zaqar/tests/helpers.py | 17 +
zaqar/tests/queues/storage/base.py | 117 ++++-
23 files changed, 2083 insertions(+), 36 deletions(-)
create mode 100644 tests/etc/wsgi_redis.conf
create mode 100644 tests/etc/wsgi_redis_pooled.conf
create mode 100644 tests/unit/queues/storage/test_impl_redis.py
create mode 100644 zaqar/queues/storage/redis/__init__.py
create mode 100644 zaqar/queues/storage/redis/claims.py
create mode 100644 zaqar/queues/storage/redis/controllers.py
create mode 100644 zaqar/queues/storage/redis/driver.py
create mode 100644 zaqar/queues/storage/redis/messages.py
create mode 100644 zaqar/queues/storage/redis/models.py
create mode 100644 zaqar/queues/storage/redis/options.py
create mode 100644 zaqar/queues/storage/redis/queues.py
create mode 100644 zaqar/queues/storage/redis/utils.py
diff --git a/etc/zaqar.conf.sample b/etc/zaqar.conf.sample
index 218be7f53..e19745d23 100644
--- a/etc/zaqar.conf.sample
+++ b/etc/zaqar.conf.sample
@@ -142,6 +142,26 @@
# Storage driver to use. (string value)
#storage=sqlite
+[drivers:storage:redis]
+
+#
+# Options defined in zaqar.queues.storage.redis
+#
+
+# Redis Server URI. Socket file based connectors are supported.
+# Example for socket file connector: redis:/tmp/redis.sock'
+#uri=
+
+# Maximum number of times to retry an operation that failed
+# due to a primary node failover. (integer value)
+#max_reconnect_attempts=10
+
+# Base sleep interval between attempts to reconnect after a
+# primary node failover. The actual sleep time increases
+# exponentially (power of 2) each time the operation is
+# retried. (floating point value)
+#reconnect_sleep=0.02
+
[drivers:storage:mongodb]
diff --git a/setup.cfg b/setup.cfg
index ac53d0c05..9e711de54 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -41,12 +41,14 @@ zaqar.queues.data.storage =
sqlite = zaqar.queues.storage.sqlalchemy.driver:DataDriver
sqlalchemy = zaqar.queues.storage.sqlalchemy.driver:DataDriver
mongodb = zaqar.queues.storage.mongodb.driver:DataDriver
+ redis = zaqar.queues.storage.redis.driver:DataDriver
faulty = zaqar.tests.faulty_storage:DataDriver
zaqar.queues.control.storage =
sqlite = zaqar.queues.storage.sqlalchemy.driver:ControlDriver
sqlalchemy = zaqar.queues.storage.sqlalchemy.driver:ControlDriver
mongodb = zaqar.queues.storage.mongodb.driver:ControlDriver
+ redis = zaqar.queues.storage.redis.driver:ControlDriver
faulty = zaqar.tests.faulty_storage:ControlDriver
zaqar.queues.transport =
@@ -60,6 +62,7 @@ oslo.config.opts =
zaqar.queues.storage.pipeline = zaqar.queues.storage.pipeline:_config_options
zaqar.queues.storage.pooling = zaqar.queues.storage.pooling:_config_options
zaqar.queues.storage.mongodb = zaqar.queues.storage.mongodb.options:_config_options
+ zaqar.queues.storage.redis = zaqar.queues.storage.redis.options:_config_option
zaqar.queues.storage.sqlalchemy = zaqar.queues.storage.sqlalchemy.options:_config_options
zaqar.queues.transport.wsgi = zaqar.queues.transport.wsgi.driver:_config_options
zaqar.queues.transport.base = zaqar.queues.transport.base:_config_options
diff --git a/tests/etc/wsgi_redis.conf b/tests/etc/wsgi_redis.conf
new file mode 100644
index 000000000..5673aac6f
--- /dev/null
+++ b/tests/etc/wsgi_redis.conf
@@ -0,0 +1,15 @@
+[DEFAULT]
+debug = False
+verbose = False
+
+[drivers]
+transport = wsgi
+storage = redis
+
+[drivers:transport:wsgi]
+port = 8888
+
+[drivers:storage:redis]
+uri = redis://127.0.0.1:6379
+max_reconnect_attempts = 3
+reconnect_sleep = 1
\ No newline at end of file
diff --git a/tests/etc/wsgi_redis_pooled.conf b/tests/etc/wsgi_redis_pooled.conf
new file mode 100644
index 000000000..104e7e026
--- /dev/null
+++ b/tests/etc/wsgi_redis_pooled.conf
@@ -0,0 +1,11 @@
+[DEFAULT]
+pooling = True
+
+[drivers]
+transport = wsgi
+storage = redis
+
+[drivers:storage:redis]
+uri = redis://127.0.0.1:6379
+max_reconnect_attempts = 3
+reconnect_sleep = 1
\ No newline at end of file
diff --git a/tests/unit/common/storage/test_utils.py b/tests/unit/common/storage/test_utils.py
index 594ba5b5e..9285cd998 100644
--- a/tests/unit/common/storage/test_utils.py
+++ b/tests/unit/common/storage/test_utils.py
@@ -29,11 +29,15 @@ class TestUtils(testing.TestBase):
def test_can_connect_suceeds_if_good_uri_sqlite(self):
self.assertTrue(utils.can_connect('sqlite://:memory:'))
- @ddt.data(
- 'mongodb://localhost:27018', # wrong port
- 'localhost:27017', # missing scheme
- 'redis://localhost:6379' # not supported with default install
- )
+ def test_can_connect_fails_if_bad_uri_missing_schema(self):
+ self.assertFalse(utils.can_connect('localhost:27017'))
+
@testing.requires_mongodb
- def test_can_connect_fails_if_bad_uri(self, uri):
- self.assertFalse(utils.can_connect(uri))
+ def test_can_connect_fails_if_bad_uri_mongodb(self):
+ self.assertFalse(utils.can_connect('mongodb://localhost:8080'))
+ self.assertFalse(utils.can_connect('mongodb://example.com:27017'))
+
+ @testing.requires_redis
+ def test_can_connect_fails_if_bad_uri_redis(self):
+ self.assertFalse(utils.can_connect('redis://localhost:8080'))
+ self.assertFalse(utils.can_connect('redis://example.com:6379'))
diff --git a/tests/unit/queues/storage/test_impl_redis.py b/tests/unit/queues/storage/test_impl_redis.py
new file mode 100644
index 000000000..f2e2046fb
--- /dev/null
+++ b/tests/unit/queues/storage/test_impl_redis.py
@@ -0,0 +1,277 @@
+# Copyright (c) 2014 Prashanth Raghu.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+import time
+import uuid
+
+import redis
+
+from zaqar.openstack.common.cache import cache as oslo_cache
+from zaqar.openstack.common import timeutils
+from zaqar.queues import storage
+from zaqar.queues.storage.redis import controllers
+from zaqar.queues.storage.redis import driver
+from zaqar.queues.storage.redis import messages
+from zaqar.queues.storage.redis import options
+from zaqar.queues.storage.redis import utils
+from zaqar import tests as testing
+from zaqar.tests.queues.storage import base
+
+
+def _create_sample_message(now=None, claimed=False, body=None):
+ if now is None:
+ now = timeutils.utcnow_ts()
+
+ if claimed:
+ claim_id = uuid.uuid4()
+ claim_expires = now + 300
+ else:
+ claim_id = None
+ claim_expires = now
+
+ if body is None:
+ body = {}
+
+ return messages.Message(
+ ttl=60,
+ created=now,
+ client_uuid=uuid.uuid4(),
+ claim_id=claim_id,
+ claim_expires=claim_expires,
+ body=body
+ )
+
+
+class RedisUtilsTest(testing.TestBase):
+
+ config_file = 'wsgi_redis.conf'
+
+ def setUp(self):
+ super(RedisUtilsTest, self).setUp()
+
+ self.conf.register_opts(options.REDIS_OPTIONS,
+ group=options.REDIS_GROUP)
+
+ self.redis_conf = self.conf[options.REDIS_GROUP]
+
+ MockDriver = collections.namedtuple('MockDriver', 'redis_conf')
+
+ self.driver = MockDriver(self.redis_conf)
+
+ def test_scope_queue_name(self):
+ self.assertEqual(utils.scope_queue_name('my-q'), '.my-q')
+ self.assertEqual(utils.scope_queue_name('my-q', None), '.my-q')
+ self.assertEqual(utils.scope_queue_name('my-q', '123'), '123.my-q')
+ self.assertEqual(utils.scope_queue_name('my-q_1', '123'), '123.my-q_1')
+
+ self.assertEqual(utils.scope_queue_name(), '.')
+ self.assertEqual(utils.scope_queue_name(None, '123'), '123.')
+
+ def test_scope_messages_set(self):
+ self.assertEqual(utils.scope_message_ids_set('my-q'), '.my-q.')
+ self.assertEqual(utils.scope_message_ids_set('my-q', 'p'), 'p.my-q.')
+ self.assertEqual(utils.scope_message_ids_set('my-q', 'p', 's'),
+ 'p.my-q.s')
+
+ self.assertEqual(utils.scope_message_ids_set(None), '..')
+ self.assertEqual(utils.scope_message_ids_set(None, '123'), '123..')
+ self.assertEqual(utils.scope_message_ids_set(None, None, 's'), '..s')
+
+ def test_normalize_none_str(self):
+ self.assertEqual(utils.normalize_none_str('my-q'), 'my-q')
+ self.assertEqual(utils.normalize_none_str(None), '')
+
+ def test_msg_claimed_filter(self):
+ now = timeutils.utcnow_ts()
+
+ unclaimed_msg = _create_sample_message()
+ self.assertFalse(utils.msg_claimed_filter(unclaimed_msg, now))
+
+ claimed_msg = _create_sample_message(claimed=True)
+ self.assertTrue(utils.msg_claimed_filter(claimed_msg, now))
+
+ # NOTE(kgriffs): Has a claim ID, but the claim is expired
+ claimed_msg.claim_expires = now - 60
+ self.assertFalse(utils.msg_claimed_filter(claimed_msg, now))
+
+ def test_descope_queue_name(self):
+ self.assertEqual(utils.descope_queue_name('p.q'), 'q')
+ self.assertEqual(utils.descope_queue_name('.q'), 'q')
+ self.assertEqual(utils.descope_queue_name('.'), '')
+
+ def test_msg_echo_filter(self):
+ msg = _create_sample_message()
+ self.assertTrue(utils.msg_echo_filter(msg, msg.client_uuid))
+
+ alt_uuid = utils.generate_uuid()
+ self.assertFalse(utils.msg_echo_filter(msg, alt_uuid))
+
+ def test_basic_message(self):
+ now = timeutils.utcnow_ts()
+ body = {
+ 'msg': 'Hello Earthlings!',
+ 'unicode': u'ab\u00e7',
+ 'bytes': b'ab\xc3\xa7',
+ b'ab\xc3\xa7': 'one, two, three',
+ u'ab\u00e7': 'one, two, three',
+ }
+
+ msg = _create_sample_message(now=now, body=body)
+ basic_msg = msg.to_basic(now + 5)
+
+ self.assertEqual(basic_msg['id'], msg.id)
+ self.assertEqual(basic_msg['age'], 5)
+ self.assertEqual(basic_msg['body'], body)
+ self.assertEqual(basic_msg['ttl'], msg.ttl)
+
+ def test_retries_on_connection_error(self):
+ num_calls = [0]
+
+ @utils.retries_on_connection_error
+ def _raises_connection_error(self):
+ num_calls[0] += 1
+ raise redis.exceptions.ConnectionError
+
+ self.assertRaises(redis.exceptions.ConnectionError,
+ _raises_connection_error, self)
+ self.assertEqual(num_calls, [self.redis_conf.max_reconnect_attempts])
+
+
+@testing.requires_redis
+class RedisDriverTest(testing.TestBase):
+
+ config_file = 'wsgi_redis.conf'
+
+ def test_db_instance(self):
+ cache = oslo_cache.get_cache()
+ redis_driver = driver.DataDriver(self.conf, cache)
+
+ self.assertTrue(isinstance(redis_driver.connection, redis.StrictRedis))
+
+
+@testing.requires_redis
+class RedisQueuesTest(base.QueueControllerTest):
+
+ driver_class = driver.DataDriver
+ config_file = 'wsgi_redis.conf'
+ controller_class = controllers.QueueController
+
+ def setUp(self):
+ super(RedisQueuesTest, self).setUp()
+ self.connection = self.driver.connection
+ self.msg_controller = self.driver.message_controller
+
+ def tearDown(self):
+ super(RedisQueuesTest, self).tearDown()
+ self.connection.flushdb()
+
+ def test_inc_counter(self):
+ queue_name = 'inc-counter'
+ self.controller.create(queue_name)
+ self.controller._inc_counter(queue_name, None, 10)
+
+ scoped_q_name = utils.scope_queue_name(queue_name)
+ count = self.controller._get_queue_info(scoped_q_name, b'c', int)[0]
+ self.assertEqual(count, 10)
+
+ def test_inc_claimed(self):
+ self.addCleanup(self.controller.delete, 'test-queue',
+ project=self.project)
+
+ queue_name = 'inc-claimed'
+
+ self.controller.create(queue_name)
+ self.controller._inc_claimed(queue_name, None, 10)
+
+ scoped_q_name = utils.scope_queue_name(queue_name)
+ claimed = self.controller._get_queue_info(scoped_q_name,
+ b'cl', int)[0]
+ self.assertEqual(claimed, 10)
+
+
+@testing.requires_redis
+class RedisMessagesTest(base.MessageControllerTest):
+ driver_class = driver.DataDriver
+ config_file = 'wsgi_redis.conf'
+ controller_class = controllers.MessageController
+
+ def setUp(self):
+ super(RedisMessagesTest, self).setUp()
+ self.connection = self.driver.connection
+ self.q_controller = self.driver.queue_controller
+
+ def tearDown(self):
+ super(RedisMessagesTest, self).tearDown()
+ self.connection.flushdb()
+
+ def test_get_count(self):
+ queue_name = 'get-count'
+ self.q_controller.create(queue_name)
+
+ msgs = [{
+ 'ttl': 300,
+ 'body': 'di mo fy'
+ } for i in range(0, 10)]
+
+ client_id = uuid.uuid4()
+ # Creating 10 messages
+ self.controller.post(queue_name, msgs, client_id)
+
+ messages_set_id = utils.scope_message_ids_set(queue_name, None,
+ 'messages')
+
+ num_msg = self.controller._get_count(messages_set_id)
+ self.assertEqual(num_msg, 10)
+
+ def test_empty_queue_exception(self):
+ queue_name = 'empty-queue-test'
+ self.q_controller.create(queue_name)
+
+ self.assertRaises(storage.errors.QueueIsEmpty,
+ self.controller.first, queue_name)
+
+
+@testing.requires_redis
+class RedisClaimsTest(base.ClaimControllerTest):
+ driver_class = driver.DataDriver
+ config_file = 'wsgi_redis.conf'
+ controller_class = controllers.ClaimController
+
+ def setUp(self):
+ super(RedisClaimsTest, self).setUp()
+ self.connection = self.driver.connection
+ self.q_controller = self.driver.queue_controller
+
+ def tearDown(self):
+ super(RedisClaimsTest, self).tearDown()
+ self.connection.flushdb()
+
+ def test_claim_doesnt_exist(self):
+ queue_name = 'no-such-claim'
+ epoch = '000000000000000000000000'
+ self.q_controller.create(queue_name)
+ self.assertRaises(storage.errors.ClaimDoesNotExist,
+ self.controller.get, queue_name,
+ epoch, project=None)
+
+ claim_id, messages = self.controller.create(queue_name, {'ttl': 2,
+ 'grace': 0},
+ project=None)
+
+ # Lets let it expire
+ time.sleep(2)
+ self.assertRaises(storage.errors.ClaimDoesNotExist,
+ self.controller.update, queue_name,
+ claim_id, {}, project=None)
\ No newline at end of file
diff --git a/zaqar/queues/storage/errors.py b/zaqar/queues/storage/errors.py
index 7b47bb8c6..34e6aa51c 100644
--- a/zaqar/queues/storage/errors.py
+++ b/zaqar/queues/storage/errors.py
@@ -42,26 +42,34 @@ class Conflict(ExceptionBase):
class MessageConflict(Conflict):
msg_format = (u'Message could not be enqueued due to a conflict '
- u'with another message that is already in '
+ u'with one or more other messages that are already in '
u'queue {queue} for project {project}')
- def __init__(self, queue, project, message_ids):
+ def __init__(self, queue, project):
"""Initializes the error with contextual information.
:param queue: name of the queue to which the message was posted
:param project: name of the project to which the queue belongs
- :param message_ids: list of IDs for messages successfully
- posted. Note that these must be in the same order as the
- list of messages originally submitted to be enqueued.
"""
super(MessageConflict, self).__init__(queue=queue, project=project)
- self._succeeded_ids = message_ids
- @property
- def succeeded_ids(self):
- return self._succeeded_ids
+
+class ClaimConflict(Conflict):
+
+ msg_format = (u'Messages could not be claimed due to a conflict '
+ u'with another parallel claim that is already in '
+ u'queue {queue} for project {project}')
+
+ def __init__(self, queue, project):
+ """Initializes the error with contextual information.
+
+ :param queue: name of the queue to which the message was posted
+ :param project: name of the project to which the queue belongs
+ """
+
+ super(ClaimConflict, self).__init__(queue=queue, project=project)
class QueueDoesNotExist(DoesNotExist):
diff --git a/zaqar/queues/storage/mongodb/messages.py b/zaqar/queues/storage/mongodb/messages.py
index 72e37036a..bb5244af8 100644
--- a/zaqar/queues/storage/mongodb/messages.py
+++ b/zaqar/queues/storage/mongodb/messages.py
@@ -668,9 +668,7 @@ class MessageController(storage.Message):
queue=queue_name,
project=project))
- succeeded_ids = []
- raise errors.MessageConflict(queue_name, project,
- succeeded_ids)
+ raise errors.MessageConflict(queue_name, project)
@utils.raises_conn_error
@utils.retries_on_autoreconnect
diff --git a/zaqar/queues/storage/redis/__init__.py b/zaqar/queues/storage/redis/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/zaqar/queues/storage/redis/claims.py b/zaqar/queues/storage/redis/claims.py
new file mode 100644
index 000000000..e1348126a
--- /dev/null
+++ b/zaqar/queues/storage/redis/claims.py
@@ -0,0 +1,368 @@
+# Copyright (c) 2014 Prashanth Raghu.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import functools
+
+import msgpack
+import redis
+
+from zaqar.common import decorators
+from zaqar.openstack.common import log as logging
+from zaqar.openstack.common import timeutils
+from zaqar.queues import storage
+from zaqar.queues.storage import errors
+from zaqar.queues.storage.redis import messages
+from zaqar.queues.storage.redis import utils
+
+LOG = logging.getLogger(__name__)
+
+QUEUE_CLAIMS_SUFFIX = 'claims'
+CLAIM_MESSAGES_SUFFIX = 'messages'
+
+RETRY_CLAIM_TIMEOUT = 10
+
+
+class ClaimController(storage.Claim):
+ """Implements claim resource operations using Redis.
+
+ Redis Data Structures:
+ ----------------------
+ Claims list (Redis set) contains claim ids
+
+ Key:
+
+ Name Field
+ -------------------------
+ claim_ids m
+
+ Claimed Messages (Redis set) contains the list of
+ message ids stored per claim
+
+ Key: _messages
+
+ Claim info(Redis Hash):
+
+ Key:
+
+ Name Field
+ -------------------------
+ ttl -> t
+ id -> id
+ expires -> e
+ """
+ def __init__(self, *args, **kwargs):
+ super(ClaimController, self).__init__(*args, **kwargs)
+ self._client = self.driver.connection
+
+ self._packer = msgpack.Packer(encoding='utf-8',
+ use_bin_type=True).pack
+ self._unpacker = functools.partial(msgpack.unpackb, encoding='utf-8')
+
+ def _get_claim_info(self, claim_id, fields, transform=int):
+ """Get one or more fields from the claim Info."""
+
+ values = self._client.hmget(claim_id, fields)
+ return [transform(v) for v in values] if transform else values
+
+ def _exists(self, queue, claim_id, project):
+ client = self._client
+ claims_set_key = utils.scope_claims_set(queue, project,
+ QUEUE_CLAIMS_SUFFIX)
+
+ # Return False if no such claim exists
+ # TODO(prashanthr_): Discuss the feasibility of a bloom filter.
+ if not client.sismember(claims_set_key, claim_id):
+ return False
+
+ expires = self._get_claim_info(claim_id, b'e')[0]
+ now = timeutils.utcnow_ts()
+
+ if now > expires:
+ return False
+
+ return True
+
+ def _get_claimed_message_keys(self, claim_id):
+ return self._client.lrange(claim_id, 0, -1)
+
+ @decorators.lazy_property(write=False)
+ def _message_ctrl(self):
+ return self.driver.message_controller
+
+ @decorators.lazy_property(write=False)
+ def _queue_ctrl(self):
+ return self.driver.queue_controller
+
+ @utils.raises_conn_error
+ @utils.retries_on_connection_error
+ def get(self, queue, claim_id, project=None):
+ if not self._exists(queue, claim_id, project):
+ raise errors.ClaimDoesNotExist(queue, project, claim_id)
+
+ claim_msgs_key = utils.scope_claim_messages(claim_id,
+ CLAIM_MESSAGES_SUFFIX)
+
+ # basic_messages
+ msg_keys = self._get_claimed_message_keys(claim_msgs_key)
+
+ with self._client.pipeline() as pipe:
+ for key in msg_keys:
+ pipe.hgetall(key)
+
+ raw_messages = pipe.execute()
+
+ now = timeutils.utcnow_ts()
+ basic_messages = [messages.Message.from_redis(msg).to_basic(now)
+ for msg in raw_messages if msg]
+
+ # claim_meta
+ now = timeutils.utcnow_ts()
+ expires, ttl = self._get_claim_info(claim_id, [b'e', b't'])
+ update_time = expires - ttl
+ age = now - update_time
+
+ claim_meta = {
+ 'age': age,
+ 'ttl': ttl,
+ 'id': claim_id,
+ }
+
+ return claim_meta, basic_messages
+
+ @utils.raises_conn_error
+ @utils.retries_on_connection_error
+ def create(self, queue, metadata, project=None,
+ limit=storage.DEFAULT_MESSAGES_PER_CLAIM):
+
+ ttl = int(metadata.get('ttl', 60))
+ grace = int(metadata.get('grace', 60))
+ msg_ttl = ttl + grace
+
+ claim_id = utils.generate_uuid()
+ claim_key = utils.scope_claim_messages(claim_id,
+ CLAIM_MESSAGES_SUFFIX)
+
+ claims_set_key = utils.scope_claims_set(queue, project,
+ QUEUE_CLAIMS_SUFFIX)
+
+ counter_key = self._queue_ctrl._claim_counter_key(queue, project)
+
+ with self._client.pipeline() as pipe:
+
+ start_ts = timeutils.utcnow_ts()
+
+ # NOTE(kgriffs): Retry the operation if another transaction
+ # completes before this one, in which case it will have
+ # claimed the same messages the current thread is trying
+ # to claim, and therefoe we must try for another batch.
+ #
+ # This loop will eventually time out if we can't manage to
+ # claim any messages due to other threads continually beating
+ # us to the punch.
+
+ # TODO(kgriffs): Would it be beneficial (or harmful) to
+ # introducce a backoff sleep in between retries?
+ while (timeutils.utcnow_ts() - start_ts) < RETRY_CLAIM_TIMEOUT:
+
+ # NOTE(kgriffs): The algorithm for claiming messages:
+ #
+ # 1. Get a batch of messages that are currently active.
+ # 2. For each active message in the batch, extend its
+ # lifetime IFF it would otherwise expire before the
+ # claim itself does.
+ # 3. Associate the claim with each message
+ # 4. Create a claim record with details such as TTL
+ # and expiration time.
+ # 5. Add the claim's ID to a set to facilitate fast
+ # existence checks.
+
+ results = self._message_ctrl._active(queue, project=project,
+ limit=limit)
+
+ cursor = next(results)
+ msg_list = list(cursor)
+
+ # NOTE(kgriffs): If there are no active messages to
+ # claim, simply return an empty list.
+ if not msg_list:
+ return (None, iter([]))
+
+ basic_messages = []
+
+ try:
+ # TODO(kgriffs): Is it faster/better to do this all
+ # in a Lua script instead of using an app-layer
+ # transaction?
+
+ # NOTE(kgriffs): Abort the entire transaction if
+ # another request beats us to the punch. We detect
+ # this by putting a watch on the key that will have
+ # one of its fields updated as the final step of
+ # the transaction.
+ pipe.watch(counter_key)
+ pipe.multi()
+
+ now = timeutils.utcnow_ts()
+
+ claim_expires = now + ttl
+ msg_expires = claim_expires + grace
+
+ # Associate the claim with each message
+ for msg in msg_list:
+ msg.claim_id = claim_id
+ msg.claim_expires = claim_expires
+
+ if _msg_would_expire(msg, msg_expires):
+ msg.ttl = msg_ttl
+ msg.expires = msg_expires
+
+ pipe.rpush(claim_key, msg.id)
+
+ # TODO(kgriffs): Rather than writing back the
+ # entire message, only set the fields that
+ # have changed.
+ msg.to_redis(pipe)
+
+ basic_messages.append(msg.to_basic(now))
+
+ # Create the claim
+ claim_info = {
+ 'id': claim_id,
+ 't': ttl,
+ 'e': claim_expires
+ }
+
+ pipe.hmset(claim_id, claim_info)
+
+ # NOTE(kgriffs): Add the claim ID to a set so that
+ # existence checks can be performed quickly.
+ pipe.sadd(claims_set_key, claim_id)
+
+ # NOTE(kgriffs): Update a counter that facilitates
+ # the queue stats calculation.
+ self._queue_ctrl._inc_claimed(queue, project,
+ len(msg_list),
+ pipe=pipe)
+
+ pipe.execute()
+ return claim_id, basic_messages
+
+ except redis.exceptions.WatchError:
+ continue
+
+ raise errors.ClaimConflict(queue, project)
+
+ @utils.raises_conn_error
+ @utils.retries_on_connection_error
+ def update(self, queue, claim_id, metadata, project=None):
+ if not self._exists(queue, claim_id, project):
+ raise errors.ClaimDoesNotExist(claim_id, queue, project)
+
+ now = timeutils.utcnow_ts()
+
+ claim_ttl = int(metadata.get('ttl', 60))
+ claim_expires = now + claim_ttl
+
+ grace = int(metadata.get('grace', 60))
+ msg_ttl = claim_ttl + grace
+ msg_expires = claim_expires + grace
+
+ claim_messages = utils.scope_claim_messages(claim_id,
+ CLAIM_MESSAGES_SUFFIX)
+
+ msg_keys = self._get_claimed_message_keys(claim_messages)
+
+ with self._client.pipeline() as pipe:
+ for key in msg_keys:
+ pipe.hgetall(key)
+
+ claimed_msgs = pipe.execute()
+
+ claim_info = {
+ 't': claim_ttl,
+ 'e': claim_expires,
+ }
+
+ with self._client.pipeline() as pipe:
+ for msg in claimed_msgs:
+ if msg:
+ msg = messages.Message.from_redis(msg)
+ msg.claim_id = claim_id
+ msg.claim_expires = claim_expires
+
+ if _msg_would_expire(msg, msg_expires):
+ msg.ttl = msg_ttl
+ msg.expires = msg_expires
+
+ # TODO(kgriffs): Rather than writing back the
+ # entire message, only set the fields that
+ # have changed.
+ msg.to_redis(pipe)
+
+ # Update the claim id and claim expiration info
+ # for all the messages.
+ pipe.hmset(claim_id, claim_info)
+
+ pipe.execute()
+
+ @utils.raises_conn_error
+ @utils.retries_on_connection_error
+ def delete(self, queue, claim_id, project=None):
+ # NOTE(prashanthr_): Return silently when the claim
+ # does not exist
+ if not self._exists(queue, claim_id, project):
+ return
+
+ now = timeutils.utcnow_ts()
+ claim_messages_key = utils.scope_claim_messages(claim_id,
+ CLAIM_MESSAGES_SUFFIX)
+
+ msg_keys = self._get_claimed_message_keys(claim_messages_key)
+
+ with self._client.pipeline() as pipe:
+ for msg_key in msg_keys:
+ pipe.hgetall(msg_key)
+
+ claimed_msgs = pipe.execute()
+
+ # Update the claim id and claim expiration info
+ # for all the messages.
+ claims_set_key = utils.scope_claims_set(queue, project,
+ QUEUE_CLAIMS_SUFFIX)
+
+ with self._client.pipeline() as pipe:
+ pipe.srem(claims_set_key, claim_id)
+ pipe.delete(claim_id)
+ pipe.delete(claim_messages_key)
+
+ for msg in claimed_msgs:
+ if msg:
+ msg = messages.Message.from_redis(msg)
+ msg.claim_id = None
+ msg.claim_expires = now
+
+ # TODO(kgriffs): Rather than writing back the
+ # entire message, only set the fields that
+ # have changed.
+ msg.to_redis(pipe)
+
+ self._queue_ctrl._inc_claimed(queue, project,
+ -1 * len(claimed_msgs),
+ pipe=pipe)
+
+ pipe.execute()
+
+
+def _msg_would_expire(message, now):
+ return message.expires < now
diff --git a/zaqar/queues/storage/redis/controllers.py b/zaqar/queues/storage/redis/controllers.py
new file mode 100644
index 000000000..07b55cb4c
--- /dev/null
+++ b/zaqar/queues/storage/redis/controllers.py
@@ -0,0 +1,22 @@
+# Copyright (c) 2014 Prashanth Raghu
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from zaqar.queues.storage.redis import claims
+from zaqar.queues.storage.redis import messages
+from zaqar.queues.storage.redis import queues
+
+
+QueueController = queues.QueueController
+MessageController = messages.MessageController
+ClaimController = claims.ClaimController
diff --git a/zaqar/queues/storage/redis/driver.py b/zaqar/queues/storage/redis/driver.py
new file mode 100644
index 000000000..083625cc8
--- /dev/null
+++ b/zaqar/queues/storage/redis/driver.py
@@ -0,0 +1,107 @@
+# Copyright (c) 2014 Prashanth Raghu.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import redis
+from six.moves import urllib
+
+from zaqar.common import decorators
+from zaqar.openstack.common import log as logging
+from zaqar.queues import storage
+from zaqar.queues.storage.redis import controllers
+from zaqar.queues.storage.redis import options
+
+
+LOG = logging.getLogger(__name__)
+
+
+def _get_redis_client(conf):
+ # TODO(prashanthr_): Add SSL support
+ parsed_url = urllib.parse.urlparse(conf.uri)
+
+ if parsed_url.hostname:
+ return redis.StrictRedis(host=parsed_url.hostname,
+ port=parsed_url.port)
+ else:
+ return redis.StrictRedis(unix_socket_path=parsed_url.path)
+
+
+class DataDriver(storage.DataDriverBase):
+
+ def __init__(self, conf, cache):
+ super(DataDriver, self).__init__(conf, cache)
+
+ opts = options.REDIS_OPTIONS
+
+ if 'dynamic' in conf:
+ names = conf[options.REDIS_GROUP].keys()
+ opts = filter(lambda x: x.name not in names, opts)
+
+ self.conf.register_opts(opts,
+ group=options.REDIS_GROUP)
+ self.redis_conf = self.conf[options.REDIS_GROUP]
+
+ def is_alive(self):
+ try:
+ return self.connection.ping()
+ except redis.exceptions.ConnectionError:
+ return False
+
+ def _health(self):
+ KPI = {}
+ KPI['storage_reachable'] = self.is_alive()
+ KPI['operation_status'] = self._get_operation_status()
+
+ # TODO(kgriffs): Add metrics re message volume
+ return KPI
+
+ @decorators.lazy_property(write=False)
+ def connection(self):
+ """Redis client connection instance."""
+ return _get_redis_client(self.redis_conf)
+
+ @decorators.lazy_property(write=False)
+ def queue_controller(self):
+ return controllers.QueueController(self)
+
+ @decorators.lazy_property(write=False)
+ def message_controller(self):
+ return controllers.MessageController(self)
+
+ @decorators.lazy_property(write=False)
+ def claim_controller(self):
+ return controllers.ClaimController(self)
+
+
+class ControlDriver(storage.ControlDriverBase):
+
+ def __init__(self, conf, cache):
+ super(ControlDriver, self).__init__(conf, cache)
+
+ self.conf.register_opts(options.REDIS_OPTIONS,
+ group=options.REDIS_GROUP)
+
+ self.redis_conf = self.conf[options.REDIS_GROUP]
+
+ @decorators.lazy_property(write=False)
+ def connection(self):
+ """Redis client connection instance."""
+ return _get_redis_client(self.redis_conf)
+
+ @property
+ def pools_controller(self):
+ return None
+
+ @property
+ def catalogue_controller(self):
+ return None
diff --git a/zaqar/queues/storage/redis/messages.py b/zaqar/queues/storage/redis/messages.py
new file mode 100644
index 000000000..1978cae93
--- /dev/null
+++ b/zaqar/queues/storage/redis/messages.py
@@ -0,0 +1,483 @@
+# Copyright (c) 2014 Prashanth Raghu.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import functools
+import uuid
+
+import redis
+
+from zaqar.common import decorators
+from zaqar.openstack.common import strutils
+from zaqar.openstack.common import timeutils
+from zaqar.queues import storage
+from zaqar.queues.storage import errors
+from zaqar.queues.storage.redis import models
+from zaqar.queues.storage.redis import utils
+
+Message = models.Message
+
+
+MESSAGE_IDS_SUFFIX = 'messages'
+# The rank counter is an atomic index to rank messages
+# in a FIFO manner.
+MESSAGE_RANK_COUNTER_SUFFIX = 'rank_counter'
+
+# NOTE(kgriffs): This value, in seconds, should be at least less than the
+# minimum allowed TTL for messages (60 seconds).
+RETRY_POST_TIMEOUT = 10
+
+
+class MessageController(storage.Message):
+ """Implements message resource operations using Redis.
+
+ Messages are scoped by project + queue.
+
+ Redis Data Structures:
+ ----------------------
+ 1. Message id's list (Redis sorted set)
+
+ Each queue in the system has a set of message ids currently
+ in the queue. The list is sorted based on a ranking which is
+ incremented atomically using the counter(MESSAGE_RANK_COUNTER_SUFFIX)
+ also stored in the database for every queue.
+
+ Key:
+
+ 2. Messages(Redis Hash):
+
+ Scoped by the UUID of the message, the redis datastructure
+ has the following information.
+
+
+ Name Field
+ -----------------------------
+ id -> id
+ ttl -> t
+ expires -> e
+ body -> b
+ claim -> c
+ claim expiry time -> c.e
+ client uuid -> u
+ created time -> cr
+ """
+
+ def __init__(self, *args, **kwargs):
+ super(MessageController, self).__init__(*args, **kwargs)
+ self._client = self.driver.connection
+
+ @decorators.lazy_property(write=False)
+ def _queue_ctrl(self):
+ return self.driver.queue_controller
+
+ @decorators.lazy_property(write=False)
+ def _claim_ctrl(self):
+ return self.driver.claim_controller
+
+ def _active(self, queue_name, marker=None, echo=False,
+ client_uuid=None, project=None,
+ limit=None):
+ return self._list(queue_name, project=project, marker=marker,
+ echo=echo, client_uuid=client_uuid,
+ include_claimed=False,
+ limit=limit, to_basic=False)
+
+ def _get_count(self, msgset_key):
+ """Get num messages in a Queue.
+
+ Return the number of messages in a queue scoped by
+ queue and project.
+ """
+ return self._client.zcard(msgset_key)
+
+ @utils.raises_conn_error
+ @utils.retries_on_connection_error
+ def _delete_queue_messages(self, queue, project, pipe):
+ """Method to remove all the messages belonging to a queue.
+
+ Will be referenced from the QueueController.
+ The pipe to execute deletion will be passed from the QueueController
+ executing the operation.
+ """
+ client = self._client
+ msgset_key = utils.scope_message_ids_set(queue, project,
+ MESSAGE_IDS_SUFFIX)
+ message_ids = client.zrange(msgset_key, 0, -1)
+
+ pipe.delete(msgset_key)
+ for msg_id in message_ids:
+ pipe.delete(msg_id)
+
+ # TODO(prashanthr_): Looking for better ways to solve the issue.
+ def _find_first_unclaimed(self, queue, project, limit):
+ """Find the first unclaimed message in the queue."""
+
+ msgset_key = utils.scope_message_ids_set(queue, project,
+ MESSAGE_IDS_SUFFIX)
+ marker = 0
+ now = timeutils.utcnow_ts()
+
+ # NOTE(prashanthr_): This will not be an infinite loop.
+ while True:
+ msg_keys = self._client.zrange(msgset_key, marker,
+ marker + limit)
+ if msg_keys:
+ messages = [Message.from_redis(self._client.hgetall(msg_key))
+ for msg_key in msg_keys]
+
+ for msg in messages:
+ if not utils.msg_claimed_filter(msg, now):
+ return msg.id
+ else:
+ return None
+
+ def _exists(self, key):
+ """Check if message exists in the Queue.
+
+ Helper function which checks if a particular message_id
+ exists in the sorted set of the queues message ids.
+ """
+ return self._client.exists(key)
+
+ def _get_first_message_id(self, queue, project, sort):
+ """Fetch head/tail of the Queue.
+
+ Helper function to get the first message in the queue
+ sort > 0 get from the left else from the right.
+ """
+ msgset_key = utils.scope_message_ids_set(queue, project,
+ MESSAGE_IDS_SUFFIX)
+
+ sorter = self._client.zrange if sort == 1 else self._client.zrevrange
+ message_ids = sorter(msgset_key, 0, 0)
+ return message_ids[0] if message_ids else None
+
+ def _get(self, message_id):
+ msg = self._client.hgetall(message_id)
+ return Message.from_redis(msg) if msg else None
+
+ def _get_claim(self, message_id):
+ claim = self._client.hmget(message_id, 'c', 'c.e')
+
+ if claim == [None, None]:
+ # NOTE(kgriffs): message_id was not found
+ return None
+
+ return {
+ # NOTE(kgriffs): A "None" claim is serialized as an empty str
+ 'id': strutils.safe_decode(claim[0]) or None,
+ 'expires': int(claim[1]),
+ }
+
+ def _list(self, queue, project=None, marker=None,
+ limit=storage.DEFAULT_MESSAGES_PER_PAGE,
+ echo=False, client_uuid=None,
+ include_claimed=False, to_basic=True):
+
+ if not self._queue_ctrl.exists(queue, project):
+ raise errors.QueueDoesNotExist(queue,
+ project)
+
+ msgset_key = utils.scope_message_ids_set(queue,
+ project,
+ MESSAGE_IDS_SUFFIX)
+ client = self._client
+
+ with self._client.pipeline() as pipe:
+ # NOTE(prashanthr_): Iterate through the queue to find the first
+ # unclaimed message.
+ if not marker and not include_claimed:
+ marker = self._find_first_unclaimed(queue, project, limit)
+ start = client.zrank(msgset_key, marker) or 0
+ else:
+ rank = client.zrank(msgset_key, marker)
+ start = rank + 1 if rank else 0
+
+ message_ids = client.zrange(msgset_key, start,
+ start + (limit - 1))
+
+ for msg_id in message_ids:
+ pipe.hgetall(msg_id)
+
+ messages = pipe.execute()
+
+ # NOTE(prashanthr_): Build a list of filters for checking
+ # the following:
+ #
+ # 1. Message is expired
+ # 2. Message is claimed
+ # 3. Message should not be echoed
+ #
+ now = timeutils.utcnow_ts()
+ filters = [functools.partial(utils.msg_expired_filter, now=now)]
+
+ if not include_claimed:
+ filters.append(functools.partial(utils.msg_claimed_filter,
+ now=now))
+
+ if not echo:
+ filters.append(functools.partial(utils.msg_echo_filter,
+ client_uuid=client_uuid))
+
+ marker = {}
+
+ yield _filter_messages(messages, pipe, filters, to_basic, marker)
+ yield marker['next']
+
+ @utils.raises_conn_error
+ @utils.retries_on_connection_error
+ def list(self, queue, project=None, marker=None,
+ limit=storage.DEFAULT_MESSAGES_PER_PAGE,
+ echo=False, client_uuid=None,
+ include_claimed=False):
+
+ return self._list(queue, project, marker, limit, echo,
+ client_uuid, include_claimed)
+
+ @utils.raises_conn_error
+ @utils.retries_on_connection_error
+ def first(self, queue, project=None, sort=1):
+ if sort not in (1, -1):
+ raise ValueError(u'sort must be either 1 (ascending) '
+ u'or -1 (descending)')
+
+ message_id = self._get_first_message_id(queue, project, sort)
+ if not message_id:
+ raise errors.QueueIsEmpty(queue, project)
+
+ now = timeutils.utcnow_ts()
+ return self._get(message_id).to_basic(now, include_created=True)
+
+ @utils.raises_conn_error
+ @utils.retries_on_connection_error
+ def get(self, queue, message_id, project=None):
+ if not self._queue_ctrl.exists(queue, project):
+ raise errors.QueueDoesNotExist(queue, project)
+
+ message = self._get(message_id)
+ now = timeutils.utcnow_ts()
+
+ if message and not utils.msg_expired_filter(message, now):
+ return message.to_basic(now)
+ else:
+ raise errors.MessageDoesNotExist(message_id, queue, project)
+
+ @utils.raises_conn_error
+ @utils.retries_on_connection_error
+ def bulk_get(self, queue, message_ids, project=None):
+ if not self._queue_ctrl.exists(queue, project):
+ raise errors.QueueDoesNotExist(queue, project)
+
+ # NOTE(prashanthr_): Pipelining is used here purely
+ # for performance.
+ with self._client.pipeline() as pipe:
+ for mid in message_ids:
+ pipe.hgetall(mid)
+
+ messages = pipe.execute()
+
+ # NOTE(kgriffs): Skip messages that may have been deleted
+ now = timeutils.utcnow_ts()
+ return (Message.from_redis(msg).to_basic(now)
+ for msg in messages if msg)
+
+ @utils.raises_conn_error
+ @utils.retries_on_connection_error
+ def post(self, queue, messages, client_uuid, project=None):
+ if not self._queue_ctrl.exists(queue, project):
+ raise errors.QueueDoesNotExist(queue, project)
+
+ msgset_key = utils.scope_message_ids_set(queue, project,
+ MESSAGE_IDS_SUFFIX)
+
+ counter_key = utils.scope_queue_index(queue, project,
+ MESSAGE_RANK_COUNTER_SUFFIX)
+
+ with self._client.pipeline() as pipe:
+
+ start_ts = timeutils.utcnow_ts()
+
+ # NOTE(kgriffs): Retry the operation if another transaction
+ # completes before this one, in which case it may have
+ # posted messages with the same rank counter the current
+ # thread is trying to use, which would cause messages
+ # to get out of order and introduce the risk of a client
+ # missing a message while reading from the queue.
+ #
+ # This loop will eventually time out if we can't manage to
+ # post any messages due to other threads continually beating
+ # us to the punch.
+
+ # TODO(kgriffs): Would it be beneficial (or harmful) to
+ # introducce a backoff sleep in between retries?
+ while (timeutils.utcnow_ts() - start_ts) < RETRY_POST_TIMEOUT:
+ now = timeutils.utcnow_ts()
+ prepared_messages = [
+ Message(
+ ttl=msg['ttl'],
+ created=now,
+ client_uuid=client_uuid,
+ claim_id=None,
+ claim_expires=now,
+ body=msg.get('body', {}),
+ )
+
+ for msg in messages
+ ]
+
+ try:
+ pipe.watch(counter_key)
+
+ rank_counter = pipe.get(counter_key)
+ rank_counter = int(rank_counter) if rank_counter else 0
+
+ pipe.multi()
+
+ keys = []
+ for i, msg in enumerate(prepared_messages):
+ msg.to_redis(pipe)
+ pipe.zadd(msgset_key, rank_counter + i, msg.id)
+ keys.append(msg.id)
+
+ pipe.incrby(counter_key, len(keys))
+ self._queue_ctrl._inc_counter(queue, project,
+ len(prepared_messages),
+ pipe=pipe)
+
+ pipe.execute()
+ return keys
+
+ except redis.exceptions.WatchError:
+ continue
+
+ raise errors.MessageConflict(queue, project)
+
+ @utils.raises_conn_error
+ @utils.retries_on_connection_error
+ def delete(self, queue, message_id, project=None, claim=None):
+ if not self._queue_ctrl.exists(queue, project):
+ raise errors.QueueDoesNotExist(queue, project)
+
+ # TODO(kgriffs): Create decorator for validating claim and message
+ # IDs, since those are not checked at the transport layer. This
+ # decorator should be applied to all relevant methods.
+ if claim is not None:
+ try:
+ uuid.UUID(claim)
+ except ValueError:
+ raise errors.ClaimDoesNotExist(queue, project, claim)
+
+ msg_claim = self._get_claim(message_id)
+
+ # NOTE(kgriffs): The message does not exist, so
+ # it is essentially "already deleted".
+ if msg_claim is None:
+ return
+
+ now = timeutils.utcnow_ts()
+ is_claimed = msg_claim['id'] and (now < msg_claim['expires'])
+
+ if claim is None:
+ if is_claimed:
+ raise errors.MessageIsClaimed(message_id)
+
+ elif not is_claimed:
+ raise errors.MessageNotClaimed(message_id)
+
+ elif msg_claim['id'] != claim:
+ if not self._claim_ctrl._exists(queue, claim, project):
+ raise errors.ClaimDoesNotExist(queue, project, claim)
+
+ raise errors.MessageNotClaimedBy(message_id, claim)
+
+ msgset_key = utils.scope_message_ids_set(queue, project,
+ MESSAGE_IDS_SUFFIX)
+ with self._client.pipeline() as pipe:
+ results = pipe.delete(message_id).zrem(msgset_key,
+ message_id).execute()
+
+ # NOTE(prashanthr_): results[0] is 1 when the delete is
+ # successful. Hence we use that case to identify successful
+ # deletes.
+ if results[0] == 1:
+ self._queue_ctrl._inc_counter(queue, project, -1)
+
+ @utils.raises_conn_error
+ @utils.retries_on_connection_error
+ def bulk_delete(self, queue, message_ids, project=None):
+ if not self._queue_ctrl.exists(queue, project):
+ raise errors.QueueDoesNotExist(queue,
+ project)
+
+ msgset_key = utils.scope_message_ids_set(queue, project,
+ MESSAGE_IDS_SUFFIX)
+
+ with self._client.pipeline() as pipe:
+ for message_id in message_ids:
+ pipe.delete(message_id).zrem(msgset_key, message_id)
+
+ results = pipe.execute()
+
+ # NOTE(prashanthr_): None is returned for the cases where
+ # the message might not exist or has been deleted/expired.
+ # Hence we calculate the number of deletes as the
+ # total number of message ids - number of failed deletes.
+ amount = -1 * (len(results) - results.count(0)) / 2
+ self._queue_ctrl._inc_counter(queue, project, int(amount))
+
+ @utils.raises_conn_error
+ @utils.retries_on_connection_error
+ def pop(self, queue, limit, project=None):
+ # Pop is implemented as a chain of the following operations:
+ # 1. Create a claim.
+ # 2. Delete the messages claimed.
+ # 3. Delete the claim.
+
+ claim_id, messages = self._claim_ctrl.create(
+ queue, dict(ttl=1, grace=0), project, limit=limit)
+
+ message_ids = [message['id'] for message in messages]
+ self.bulk_delete(queue, message_ids, project)
+ # NOTE(prashanthr_): Creating a claim controller reference
+ # causes a recursive reference. Hence, using the reference
+ # from the driver.
+ self._claim_ctrl.delete(queue, claim_id, project)
+ return messages
+
+
+def _filter_messages(messages, pipe, filters, to_basic, marker):
+ """Create a filtering iterator over a list of messages.
+
+ The function accepts a list of filters to be filtered
+ before the the message can be included as a part of the reply.
+ """
+ now = timeutils.utcnow_ts()
+
+ for msg in messages:
+ # NOTE(kgriffs): Message may have been deleted, so
+ # check each value to ensure we got a message back
+ if msg:
+ msg = Message.from_redis(msg)
+
+ # NOTE(kgriffs): Check to see if any of the filters
+ # indiciate that this message should be skipped.
+ for should_skip in filters:
+ if should_skip(msg):
+ break
+ else:
+ marker['next'] = msg.id
+
+ if to_basic:
+ yield msg.to_basic(now)
+ else:
+ yield msg
diff --git a/zaqar/queues/storage/redis/models.py b/zaqar/queues/storage/redis/models.py
new file mode 100644
index 000000000..74f9bc612
--- /dev/null
+++ b/zaqar/queues/storage/redis/models.py
@@ -0,0 +1,124 @@
+# Copyright (c) 2014 Prashanth Raghu.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import functools
+import uuid
+
+import msgpack
+
+from zaqar.openstack.common import strutils
+from zaqar.openstack.common import timeutils
+
+
+_pack = msgpack.Packer(encoding='utf-8', use_bin_type=True).pack
+_unpack = functools.partial(msgpack.unpackb, encoding='utf-8')
+
+
+# TODO(kgriffs): Make similar classes for claims and queues
+class Message(object):
+ """Message is used to organize,store and retrieve messages from redis.
+
+ Message class helps organize,store and retrieve messages in a version
+ compatible manner.
+
+ :param id: Message ID in the form of a hexadecimal UUID. If not
+ given, one will be automatically generated.
+ :param ttl: Message TTL in seconds
+ :param created: Message creation time as a UNIX timestamp
+ :param client_uuid: UUID of the client that posted the message
+ :param claim_id: If claimed, the UUID of the claim. Set to None
+ for messages that have never been claimed.
+ :param claim_expires: Claim expiration as a UNIX timestamp
+ :param body: Message payload. Must be serializable to mspack.
+ """
+ message_data = {}
+
+ __slots__ = (
+ 'id',
+ 'ttl',
+ 'created',
+ 'expires',
+ 'client_uuid',
+ 'claim_id',
+ 'claim_expires',
+ 'body',
+ )
+
+ def __init__(self, **kwargs):
+ self.id = kwargs.get('id', str(uuid.uuid4()))
+ self.ttl = kwargs['ttl']
+ self.created = kwargs['created']
+ self.expires = kwargs.get('expires', self.created + self.ttl)
+
+ self.client_uuid = str(kwargs['client_uuid'])
+
+ self.claim_id = kwargs.get('claim_id')
+ self.claim_expires = kwargs['claim_expires']
+
+ self.body = kwargs['body']
+
+ @property
+ def created_iso(self):
+ return timeutils.iso8601_from_timestamp(self.created)
+
+ @staticmethod
+ def from_redis(doc):
+ claim_id = doc[b'c']
+ if claim_id:
+ claim_id = strutils.safe_decode(claim_id)
+ else:
+ claim_id = None
+
+ # NOTE(kgriffs): Under Py3K, redis-py converts all strings
+ # into binary. Woohoo!
+ return Message(
+ id=strutils.safe_decode(doc[b'id']),
+ ttl=int(doc[b't']),
+ created=int(doc[b'cr']),
+ expires=int(doc[b'e']),
+
+ client_uuid=strutils.safe_decode(doc[b'u']),
+
+ claim_id=claim_id,
+ claim_expires=int(doc[b'c.e']),
+
+ body=_unpack(doc[b'b']),
+ )
+
+ def to_redis(self, pipe):
+ doc = {
+ 'id': self.id,
+ 't': self.ttl,
+ 'cr': self.created,
+ 'e': self.expires,
+ 'u': self.client_uuid,
+ 'c': self.claim_id or '',
+ 'c.e': self.claim_expires,
+ 'b': _pack(self.body),
+ }
+
+ pipe.hmset(self.id, doc)
+
+ def to_basic(self, now, include_created=False):
+ basic_msg = {
+ 'id': self.id,
+ 'age': now - self.created,
+ 'ttl': self.ttl,
+ 'body': self.body
+ }
+
+ if include_created:
+ basic_msg['created'] = self.created_iso
+
+ return basic_msg
diff --git a/zaqar/queues/storage/redis/options.py b/zaqar/queues/storage/redis/options.py
new file mode 100644
index 000000000..f768f8cf7
--- /dev/null
+++ b/zaqar/queues/storage/redis/options.py
@@ -0,0 +1,40 @@
+# Copyright (c) 2014 Prashanth Raghu.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Redis storage driver configuration options."""
+
+from oslo.config import cfg
+
+
+REDIS_OPTIONS = (
+ cfg.StrOpt('uri', default="redis://127.0.0.1:6379",
+ help=('Redis Server URI. Can also use a '
+ 'socket file based connector. '
+ 'Ex: redis:/tmp/redis.sock')),
+
+ cfg.IntOpt('max_reconnect_attempts', default=10,
+ help=('Maximum number of times to retry an operation that '
+ 'failed due to a redis node failover.')),
+
+ cfg.FloatOpt('reconnect_sleep', default=1,
+ help=('Base sleep interval between attempts to reconnect '
+ 'after a redis node failover. '))
+
+)
+
+REDIS_GROUP = 'drivers:storage:redis'
+
+
+def _config_options():
+ return [(REDIS_GROUP, REDIS_OPTIONS)]
diff --git a/zaqar/queues/storage/redis/queues.py b/zaqar/queues/storage/redis/queues.py
new file mode 100644
index 000000000..61098698d
--- /dev/null
+++ b/zaqar/queues/storage/redis/queues.py
@@ -0,0 +1,253 @@
+# Copyright (c) 2014 Prashanth Raghu.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import functools
+
+import msgpack
+import redis
+
+from zaqar.common import decorators
+from zaqar.openstack.common import log as logging
+from zaqar.openstack.common import timeutils
+from zaqar.queues import storage
+from zaqar.queues.storage import errors
+from zaqar.queues.storage.redis import messages
+from zaqar.queues.storage.redis import utils
+
+LOG = logging.getLogger(__name__)
+
+QUEUES_SET_STORE_NAME = 'queues_set'
+MESSAGE_IDS_SUFFIX = 'messages'
+
+
+class QueueController(storage.Queue):
+ """Implements queue resource operations using Redis.
+
+ Queues are scoped by project, which is prefixed to the
+ queue name.
+
+ Queues (Redis sorted set):
+
+ Key: queues_set
+
+ Id Value
+ ---------------------------------
+ name ->
+
+
+ The set helps faster existence checks, while the list helps
+ paginated retrieval of queues.
+
+ Queue Information (Redis hash):
+
+ Key:
+
+ Name Field
+ -------------------------------
+ count -> c
+ num_msgs_claimed -> cl
+ metadata -> m
+ creation timestamp -> t
+ """
+
+ def __init__(self, *args, **kwargs):
+ super(QueueController, self).__init__(*args, **kwargs)
+ self._client = self.driver.connection
+ self._packer = msgpack.Packer(encoding='utf-8',
+ use_bin_type=True).pack
+ self._unpacker = functools.partial(msgpack.unpackb, encoding='utf-8')
+
+ @decorators.lazy_property(write=False)
+ def _message_ctrl(self):
+ return self.driver.message_controller
+
+ def _claim_counter_key(self, name, project):
+ return utils.scope_queue_name(name, project)
+
+ def _inc_counter(self, name, project, amount=1, pipe=None):
+ queue_key = utils.scope_queue_name(name, project)
+
+ client = pipe if pipe is not None else self._client
+ client.hincrby(queue_key, 'c', amount)
+
+ def _inc_claimed(self, name, project, amount=1, pipe=None):
+ queue_key = utils.scope_queue_name(name, project)
+
+ client = pipe if pipe is not None else self._client
+ client.hincrby(queue_key, 'cl', amount)
+
+ # TODO(kgriffs): Reimplement in Lua; this is way too expensive!
+ def _get_expired_message_count(self, name, project):
+ """Calculate the number of expired messages in the queue.
+
+ Used to compute the stats on the queue.
+ Method has O(n) complexity as we iterate the entire list of
+ messages.
+ """
+
+ messages_set_key = utils.scope_message_ids_set(name, project,
+ MESSAGE_IDS_SUFFIX)
+
+ with self._client.pipeline() as pipe:
+ for msg_key in self._client.zrange(messages_set_key, 0, -1):
+ pipe.hgetall(msg_key)
+
+ raw_messages = pipe.execute()
+
+ expired = 0
+ now = timeutils.utcnow_ts()
+
+ for msg in raw_messages:
+ if msg:
+ msg = messages.Message.from_redis(msg)
+ if utils.msg_expired_filter(msg, now):
+ expired += 1
+
+ return expired
+
+ def _get_queue_info(self, queue_key, fields, transform=str):
+ """Get one or more fields from Queue Info."""
+
+ values = self._client.hmget(queue_key, fields)
+ return [transform(v) for v in values] if transform else values
+
+ @utils.raises_conn_error
+ @utils.retries_on_connection_error
+ def list(self, project=None, marker=None,
+ limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False):
+ client = self._client
+ qset_key = utils.scope_queue_name(QUEUES_SET_STORE_NAME, project)
+ marker = utils.scope_queue_name(marker, project)
+ rank = client.zrank(qset_key, marker)
+ start = rank + 1 if rank else 0
+
+ cursor = (q for q in client.zrange(qset_key, start,
+ start + limit - 1))
+ marker_next = {}
+
+ def denormalizer(info, name):
+ queue = {'name': utils.descope_queue_name(name)}
+ marker_next['next'] = queue['name']
+ if detailed:
+ queue['metadata'] = info[1]
+
+ return queue
+
+ yield utils.QueueListCursor(self._client, cursor, denormalizer)
+ yield marker_next and marker_next['next']
+
+ def get(self, name, project=None):
+ """Obtain the metadata from the queue."""
+ return self.get_metadata(name, project)
+
+ @utils.raises_conn_error
+ def create(self, name, metadata=None, project=None):
+ # TODO(prashanthr_): Implement as a lua script.
+ queue_key = utils.scope_queue_name(name, project)
+ qset_key = utils.scope_queue_name(QUEUES_SET_STORE_NAME, project)
+
+ # Check if the queue already exists.
+ if self.exists(name, project):
+ return False
+
+ queue = {
+ 'c': 0,
+ 'cl': 0,
+ 'm': self._packer(metadata or {}),
+ 't': timeutils.utcnow_ts()
+ }
+
+ # Pipeline ensures atomic inserts.
+ with self._client.pipeline() as pipe:
+ pipe.zadd(qset_key, 1, queue_key).hmset(queue_key, queue)
+
+ try:
+ pipe.execute()
+ except redis.exceptions.ResponseError:
+ return False
+
+ return True
+
+ @utils.raises_conn_error
+ @utils.retries_on_connection_error
+ def exists(self, name, project=None):
+ # TODO(prashanthr_): Cache this lookup
+ queue_key = utils.scope_queue_name(name, project)
+ qset_key = utils.scope_queue_name(QUEUES_SET_STORE_NAME, project)
+
+ return self._client.zrank(qset_key, queue_key) is not None
+
+ @utils.raises_conn_error
+ @utils.retries_on_connection_error
+ def set_metadata(self, name, metadata, project=None):
+ if not self.exists(name, project):
+ raise errors.QueueDoesNotExist(name, project)
+
+ key = utils.scope_queue_name(name, project)
+ fields = {'m': self._packer(metadata)}
+
+ self._client.hmset(key, fields)
+
+ @utils.raises_conn_error
+ @utils.retries_on_connection_error
+ def get_metadata(self, name, project=None):
+ if not self.exists(name, project):
+ raise errors.QueueDoesNotExist(name, project)
+
+ queue_key = utils.scope_queue_name(name, project)
+ metadata = self._get_queue_info(queue_key, b'm', None)[0]
+
+ return self._unpacker(metadata)
+
+ @utils.raises_conn_error
+ @utils.retries_on_connection_error
+ def delete(self, name, project=None):
+ queue_key = utils.scope_queue_name(name, project)
+ qset_key = utils.scope_queue_name(QUEUES_SET_STORE_NAME, project)
+
+ # NOTE(prashanthr_): Pipelining is used to mitigate race conditions
+ with self._client.pipeline() as pipe:
+ pipe.zrem(qset_key, queue_key)
+ pipe.delete(queue_key)
+ self._message_ctrl._delete_queue_messages(name, project, pipe)
+
+ pipe.execute()
+
+ @utils.raises_conn_error
+ @utils.retries_on_connection_error
+ def stats(self, name, project=None):
+ if not self.exists(name, project=project):
+ raise errors.QueueDoesNotExist(name, project)
+
+ queue_key = utils.scope_queue_name(name, project)
+
+ claimed, total = self._get_queue_info(queue_key, [b'cl', b'c'], int)
+ expired = self._get_expired_message_count(name, project)
+
+ message_stats = {
+ 'claimed': claimed,
+ 'free': total - claimed - expired,
+ 'total': total
+ }
+
+ try:
+ newest = self._message_ctrl.first(name, project, -1)
+ oldest = self._message_ctrl.first(name, project, 1)
+ except errors.QueueIsEmpty:
+ pass
+ else:
+ message_stats['newest'] = newest
+ message_stats['oldest'] = oldest
+
+ return {'messages': message_stats}
diff --git a/zaqar/queues/storage/redis/utils.py b/zaqar/queues/storage/redis/utils.py
new file mode 100644
index 000000000..bb6d58e86
--- /dev/null
+++ b/zaqar/queues/storage/redis/utils.py
@@ -0,0 +1,193 @@
+# Copyright (c) 2014 Prashanth Raghu.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import functools
+import sys
+import time
+import uuid
+
+import redis
+import six
+
+from zaqar.i18n import _
+from zaqar.openstack.common import log as logging
+from zaqar.openstack.common import strutils
+from zaqar.queues.storage import errors
+
+LOG = logging.getLogger(__name__)
+
+
+def descope_queue_name(scoped_name):
+ """Descope Queue name with '.'.
+
+ Returns the queue name from the scoped name
+ which is of the form project-id.queue-name
+ """
+
+ return scoped_name.split('.')[1]
+
+
+def normalize_none_str(string_or_none):
+ """Returns '' IFF given value is None, passthrough otherwise.
+
+ This function normalizes None to the empty string to facilitate
+ string concatenation when a variable could be None.
+ """
+
+ # TODO(prashanthr_) : Try to reuse this utility. Violates DRY
+ return '' if string_or_none is None else string_or_none
+
+
+def generate_uuid():
+ return str(uuid.uuid4())
+
+
+def scope_queue_name(queue=None, project=None):
+ """Returns a scoped name for a queue based on project and queue.
+
+ If only the project name is specified, a scope signifying "all queues"
+ for that project is returned. If neither queue nor project are
+ specified, a scope for "all global queues" is returned, which
+ is to be interpreted as excluding queues scoped by project.
+
+ :returns: '{project}.{queue}' if project and queue are given,
+ '{project}.' if ONLY project is given, '.{queue}' if ONLY
+ queue is given, and '.' if neither are given.
+ """
+
+ # TODO(prashanthr_) : Try to reuse this utility. Violates DRY
+ return normalize_none_str(project) + '.' + normalize_none_str(queue)
+
+# NOTE(prashanthr_): Aliase the scope_queue_name function
+# to be used in the pools and claims controller as similar
+# functionality is required to scope redis id's.
+scope_pool_catalogue = scope_claim_messages = scope_queue_name
+
+
+def scope_message_ids_set(queue=None, project=None, message_suffix=''):
+ """Scope messages set with '.'
+
+ Returns a scoped name for the list of messages in the form
+ project-id_queue-name_suffix
+ """
+
+ return (normalize_none_str(project) + '.' +
+ normalize_none_str(queue) + '.' +
+ message_suffix)
+
+# NOTE(prashanthr_): Aliasing the scope_message_ids_set function
+# to be used in the pools and claims controller as similar
+# functionality is required to scope redis id's.
+scope_queue_catalogue = scope_claims_set = scope_message_ids_set
+scope_queue_index = scope_message_ids_set
+
+
+def raises_conn_error(func):
+ """Handles the Redis ConnectionFailure error.
+
+ This decorator catches Redis's ConnectionError
+ and raises Marconi's ConnectionError instead.
+ """
+
+ # Note(prashanthr_) : Try to reuse this utility. Violates DRY
+ # Can pass exception type into the decorator and create a
+ # storage level utility.
+
+ @functools.wraps(func)
+ def wrapper(*args, **kwargs):
+ try:
+ return func(*args, **kwargs)
+ except redis.exceptions.ConnectionError as ex:
+ LOG.exception(ex)
+ raise errors.ConnectionError()
+
+ return wrapper
+
+
+def retries_on_connection_error(func):
+ """Causes the wrapped function to be re-called on ConnectionError.
+
+ This decorator catches Redis ConnectionError and retries
+ the function call.
+
+ .. Note::
+ Assumes that the decorated function has defined self.driver.redis_cinf
+ so that `max_reconnect_attempts` and `reconnect_sleep` can be taken
+ into account.
+
+ .. Warning:: The decorated function must be idempotent.
+ """
+
+ @functools.wraps(func)
+ def wrapper(self, *args, **kwargs):
+ # TODO(prashanthr_) : Try to reuse this utility. Violates DRY
+ # Can pass config parameters into the decorator and create a
+ # storage level utility.
+
+ max_attemps = self.driver.redis_conf.max_reconnect_attempts
+ sleep_sec = self.driver.redis_conf.reconnect_sleep
+
+ for attempt in range(max_attemps):
+ try:
+ return func(self, *args, **kwargs)
+ except redis.exceptions.ConnectionError:
+ ex = sys.exc_info()[1]
+ LOG.warn(_(u'Caught ConnectionError, retrying the '
+ 'call to {0}').format(func))
+
+ time.sleep(sleep_sec * (2 ** attempt))
+ else:
+ LOG.error(_(u'Caught ConnectionError, maximum attempts '
+ 'to {0} exceeded.').format(func))
+ raise ex
+
+ return wrapper
+
+
+def msg_claimed_filter(message, now):
+ """Return True IFF the message is currently claimed."""
+
+ return message.claim_id and (now < message.claim_expires)
+
+
+def msg_echo_filter(message, client_uuid):
+ """Return True IFF the specified client posted the message."""
+
+ return message.client_uuid == six.text_type(client_uuid)
+
+
+def msg_expired_filter(message, now):
+ """Return True IFF the message has expired."""
+
+ return message.expires <= now
+
+
+class QueueListCursor(object):
+
+ def __init__(self, client, queues, denormalizer):
+ self.queue_iter = queues
+ self.denormalizer = denormalizer
+ self.client = client
+
+ def __iter__(self):
+ return self
+
+ @raises_conn_error
+ def next(self):
+ curr = next(self.queue_iter)
+ queue = self.client.hmget(curr, ['c', 'm'])
+ return self.denormalizer(queue, strutils.safe_decode(curr))
+
+ def __next__(self):
+ return self.next()
\ No newline at end of file
diff --git a/zaqar/queues/storage/utils.py b/zaqar/queues/storage/utils.py
index d01e23b3d..1f5bf4c63 100644
--- a/zaqar/queues/storage/utils.py
+++ b/zaqar/queues/storage/utils.py
@@ -120,7 +120,7 @@ def keyify(key, iterable):
def can_connect(uri):
- """Given a URI, verifies whether its possible to connect to it.
+ """Given a URI, verifies whether it's possible to connect to it.
:param uri: connection string to a storage endpoint
:type uri: six.text_type
diff --git a/zaqar/queues/transport/wsgi/v1_0/messages.py b/zaqar/queues/transport/wsgi/v1_0/messages.py
index 2cfb8b3c5..d9fdaa46b 100644
--- a/zaqar/queues/transport/wsgi/v1_0/messages.py
+++ b/zaqar/queues/transport/wsgi/v1_0/messages.py
@@ -169,13 +169,8 @@ class CollectionResource(object):
except storage_errors.MessageConflict as ex:
LOG.exception(ex)
- message_ids = ex.succeeded_ids
-
- if not message_ids:
- # TODO(kgriffs): Include error code that is different
- # from the code used in the generic case, below.
- description = _(u'No messages could be enqueued.')
- raise wsgi_errors.HTTPServiceUnavailable(description)
+ description = _(u'No messages could be enqueued.')
+ raise wsgi_errors.HTTPServiceUnavailable(description)
except Exception as ex:
LOG.exception(ex)
diff --git a/zaqar/queues/transport/wsgi/v1_1/messages.py b/zaqar/queues/transport/wsgi/v1_1/messages.py
index bd9514ad8..5a3a878a0 100644
--- a/zaqar/queues/transport/wsgi/v1_1/messages.py
+++ b/zaqar/queues/transport/wsgi/v1_1/messages.py
@@ -190,13 +190,8 @@ class CollectionResource(object):
except storage_errors.MessageConflict as ex:
LOG.exception(ex)
- message_ids = ex.succeeded_ids
-
- if not message_ids:
- # TODO(kgriffs): Include error code that is different
- # from the code used in the generic case, below.
- description = _(u'No messages could be enqueued.')
- raise wsgi_errors.HTTPServiceUnavailable(description)
+ description = _(u'No messages could be enqueued.')
+ raise wsgi_errors.HTTPServiceUnavailable(description)
except Exception as ex:
LOG.exception(ex)
diff --git a/zaqar/tests/__init__.py b/zaqar/tests/__init__.py
index 049642c98..07ed21229 100644
--- a/zaqar/tests/__init__.py
+++ b/zaqar/tests/__init__.py
@@ -25,4 +25,5 @@ RUN_SLOW_TESTS = not SKIP_SLOW_TESTS
expect = helpers.expect
is_slow = helpers.is_slow
requires_mongodb = helpers.requires_mongodb
+requires_redis = helpers.requires_redis
TestBase = base.TestBase
diff --git a/zaqar/tests/helpers.py b/zaqar/tests/helpers.py
index 5424bbb9a..032c01732 100644
--- a/zaqar/tests/helpers.py
+++ b/zaqar/tests/helpers.py
@@ -24,6 +24,7 @@ import testtools
SKIP_SLOW_TESTS = os.environ.get('ZAQAR_TEST_SLOW') is None
SKIP_MONGODB_TESTS = os.environ.get('ZAQAR_TEST_MONGODB') is None
+SKIP_REDIS_TESTS = os.environ.get('ZAQAR_TEST_REDIS') is None
@contextlib.contextmanager
@@ -205,6 +206,22 @@ def requires_mongodb(test_case):
return testtools.skipIf(SKIP_MONGODB_TESTS, reason)(test_case)
+def requires_redis(test_case):
+ """Decorator to flag a test case as being dependent on Redis.
+
+ Redis-specific tests will be skipped unless the MARCONI_TEST_REDIS
+ environment variable is set. If the variable is set, the tests will
+ assume that redis is running and listening on localhost.
+ """
+
+ reason = ('Skipping tests that require Redis. Ensure '
+ 'Redis is running on localhost and then set '
+ 'ZAQAR_TEST_REDIS in order to enable tests '
+ 'that are specific to this storage backend. ')
+
+ return testtools.skipIf(SKIP_REDIS_TESTS, reason)(test_case)
+
+
def is_slow(condition=lambda self: True):
"""Decorator to flag slow tests.
diff --git a/zaqar/tests/queues/storage/base.py b/zaqar/tests/queues/storage/base.py
index 21622e7e0..172a543c0 100644
--- a/zaqar/tests/queues/storage/base.py
+++ b/zaqar/tests/queues/storage/base.py
@@ -226,6 +226,119 @@ class QueueControllerTest(ControllerBaseTest):
self.assertNotIn('newest', message_stats)
self.assertNotIn('oldest', message_stats)
+ def test_queue_count_on_bulk_delete(self):
+ self.addCleanup(self.controller.delete, 'test-queue',
+ project=self.project)
+ queue_name = 'test-queue'
+ client_uuid = uuid.uuid4()
+
+ created = self.controller.create(queue_name, project=self.project)
+ self.assertTrue(created)
+
+ # Create 10 messages.
+ msg_keys = _insert_fixtures(self.message_controller, queue_name,
+ project=self.project,
+ client_uuid=client_uuid, num=10)
+
+ stats = self.controller.stats(queue_name,
+ self.project)['messages']
+ self.assertEqual(stats['total'], 10)
+
+ # Delete 5 messages
+ self.message_controller.bulk_delete(queue_name, msg_keys[0:5],
+ self.project)
+
+ stats = self.controller.stats(queue_name,
+ self.project)['messages']
+ self.assertEqual(stats['total'], 5)
+
+ def test_queue_count_on_bulk_delete_with_invalid_id(self):
+ self.addCleanup(self.controller.delete, 'test-queue',
+ project=self.project)
+ queue_name = 'test-queue'
+ client_uuid = uuid.uuid4()
+
+ created = self.controller.create(queue_name, project=self.project)
+ self.assertTrue(created)
+
+ # Create 10 messages.
+ msg_keys = _insert_fixtures(self.message_controller, queue_name,
+ project=self.project,
+ client_uuid=client_uuid, num=10)
+
+ stats = self.controller.stats(queue_name,
+ self.project)['messages']
+ self.assertEqual(stats['total'], 10)
+
+ # Delete 5 messages
+ self.message_controller.bulk_delete(queue_name,
+ msg_keys[0:5] + ['invalid'],
+ self.project)
+
+ stats = self.controller.stats(queue_name,
+ self.project)['messages']
+ self.assertEqual(stats['total'], 5)
+
+ def test_queue_count_on_delete(self):
+ self.addCleanup(self.controller.delete, 'test-queue',
+ project=self.project)
+ queue_name = 'test-queue'
+ client_uuid = uuid.uuid4()
+
+ created = self.controller.create(queue_name, project=self.project)
+ self.assertTrue(created)
+
+ # Create 10 messages.
+ msg_keys = _insert_fixtures(self.message_controller, queue_name,
+ project=self.project,
+ client_uuid=client_uuid, num=10)
+
+ stats = self.controller.stats(queue_name,
+ self.project)['messages']
+ self.assertEqual(stats['total'], 10)
+
+ # Delete 1 message
+ self.message_controller.delete(queue_name, msg_keys[0],
+ self.project)
+ stats = self.controller.stats(queue_name,
+ self.project)['messages']
+ self.assertEqual(stats['total'], 9)
+
+ def test_queue_count_on_claim_delete(self):
+ self.addCleanup(self.controller.delete, 'test-queue',
+ project=self.project)
+ queue_name = 'test-queue'
+ client_uuid = uuid.uuid4()
+
+ created = self.controller.create(queue_name, project=self.project)
+ self.assertTrue(created)
+
+ # Create 15 messages.
+ _insert_fixtures(self.message_controller, queue_name,
+ project=self.project,
+ client_uuid=client_uuid, num=15)
+
+ stats = self.controller.stats(queue_name,
+ self.project)['messages']
+ self.assertEqual(stats['total'], 15)
+
+ metadata = {'ttl': 120, 'grace': 60}
+ # Claim 10 messages
+ claim_id, _ = self.claim_controller.create(queue_name, metadata,
+ self.project)
+
+ stats = self.controller.stats(queue_name,
+ self.project)['messages']
+ self.assertEqual(stats['claimed'], 10)
+
+ # Delete the claim
+ self.claim_controller.delete(queue_name, claim_id,
+ self.project)
+ stats = self.controller.stats(queue_name,
+ self.project)['messages']
+
+ self.assertEqual(stats['claimed'], 0)
+
class MessageControllerTest(ControllerBaseTest):
"""Message Controller base tests.
@@ -1164,5 +1277,5 @@ def _insert_fixtures(controller, queue_name, project=None,
'event': 'Event number {0}'.format(n)
}}
- controller.post(queue_name, messages(),
- project=project, client_uuid=client_uuid)
+ return controller.post(queue_name, messages(),
+ project=project, client_uuid=client_uuid)