diff --git a/MANIFEST.in b/MANIFEST.in index a17ee1f02..24294fd6e 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,3 +1,5 @@ +recursive-include zaqar *.lua + exclude .gitignore exclude .gitreview diff --git a/tests/unit/queues/storage/test_impl_redis.py b/tests/unit/queues/storage/test_impl_redis.py index ec4dc4f30..1ea834832 100644 --- a/tests/unit/queues/storage/test_impl_redis.py +++ b/tests/unit/queues/storage/test_impl_redis.py @@ -175,6 +175,21 @@ class RedisDriverTest(testing.TestBase): self.assertTrue(isinstance(redis_driver.connection, redis.StrictRedis)) + def test_version_match(self): + cache = oslo_cache.get_cache() + + with mock.patch('redis.StrictRedis.info') as info: + info.return_value = {'redis_version': '2.4.6'} + self.assertRaises(RuntimeError, driver.DataDriver, + self.conf, cache) + + info.return_value = {'redis_version': '2.11'} + + try: + driver.DataDriver(self.conf, cache) + except RuntimeError: + self.fail('version match failed') + @testing.requires_redis class RedisQueuesTest(base.QueueControllerTest): diff --git a/zaqar/queues/storage/redis/claims.py b/zaqar/queues/storage/redis/claims.py index 3941fb09f..b465f41e6 100644 --- a/zaqar/queues/storage/redis/claims.py +++ b/zaqar/queues/storage/redis/claims.py @@ -16,13 +16,13 @@ import functools import msgpack from oslo.utils import timeutils -import redis from zaqar.common import decorators from zaqar.openstack.common import log as logging from zaqar.queues import storage from zaqar.queues.storage import errors from zaqar.queues.storage.redis import messages +from zaqar.queues.storage.redis import scripting from zaqar.queues.storage.redis import utils LOG = logging.getLogger(__name__) @@ -41,7 +41,7 @@ RETRY_CLAIM_TIMEOUT = 10 COUNTING_BATCH_SIZE = 100 -class ClaimController(storage.Claim): +class ClaimController(storage.Claim, scripting.Mixin): """Implements claim resource operations using Redis. Redis Data Structures: @@ -78,6 +78,8 @@ class ClaimController(storage.Claim): +----------------+---------+ """ + script_names = ['claim_messages'] + def __init__(self, *args, **kwargs): super(ClaimController, self).__init__(*args, **kwargs) self._client = self.driver.connection @@ -100,6 +102,17 @@ class ClaimController(storage.Claim): values = self._client.hmget(claim_id, fields) return [transform(v) for v in values] if transform else values + def _claim_messages(self, msgset_key, now, limit, + claim_id, claim_expires, msg_ttl, msg_expires): + + # NOTE(kgriffs): A watch on a pipe could also be used, but that + # is less efficient and predictable, based on our experience in + # having to do something similar in the MongoDB driver. + func = self._scripts['claim_messages'] + + args = [now, limit, claim_id, claim_expires, msg_ttl, msg_expires] + return func(keys=[msgset_key], args=args) + def _exists(self, queue, claim_id, project): client = self._client claims_set_key = utils.scope_claims_set(queue, project, @@ -238,134 +251,60 @@ class ClaimController(storage.Claim): claim_ttl = int(metadata.get('ttl', 60)) grace = int(metadata.get('grace', 60)) + + now = timeutils.utcnow_ts() msg_ttl = claim_ttl + grace + claim_expires = now + claim_ttl + msg_expires = claim_expires + grace claim_id = utils.generate_uuid() - claim_msgs_key = utils.scope_claim_messages(claim_id, - CLAIM_MESSAGES_SUFFIX) + claimed_msgs = [] - claims_set_key = utils.scope_claims_set(queue, project, - QUEUE_CLAIMS_SUFFIX) + # NOTE(kgriffs): Claim some messages + msgset_key = utils.msgset_key(queue, project) + claimed_ids = self._claim_messages(msgset_key, now, limit, + claim_id, claim_expires, + msg_ttl, msg_expires) - with self._client.pipeline() as pipe: - # NOTE(kgriffs): Retry the operation if another transaction - # completes before this one, in which case it will have - # claimed the same messages the current thread is trying - # to claim, and therefoe we must try for another batch. - # - # This loop will eventually time out if we can't manage to - # claim any messages due to other threads continually beating - # us to the punch. + if claimed_ids: + claimed_msgs = messages.Message.from_redis_bulk(claimed_ids, + self._client) + claimed_msgs = [msg.to_basic(now) for msg in claimed_msgs] - # TODO(kgriffs): Would it be beneficial (or harmful) to - # introducce a backoff sleep in between retries? + # NOTE(kgriffs): Perist claim records + with self._client.pipeline() as pipe: + claim_msgs_key = utils.scope_claim_messages( + claim_id, CLAIM_MESSAGES_SUFFIX) - start_ts = timeutils.utcnow_ts() - while (timeutils.utcnow_ts() - start_ts) < RETRY_CLAIM_TIMEOUT: + for mid in claimed_ids: + pipe.rpush(claim_msgs_key, mid) - # NOTE(kgriffs): The algorithm for claiming messages: + pipe.expire(claim_msgs_key, claim_ttl) + + claim_info = { + 'id': claim_id, + 't': claim_ttl, + 'e': claim_expires, + 'n': len(claimed_ids), + } + + pipe.hmset(claim_id, claim_info) + pipe.expire(claim_id, claim_ttl) + + # NOTE(kgriffs): Add the claim ID to a set so that + # existence checks can be performed quickly. This + # is also used as a watch key in order to gaurd + # against race conditions. # - # 1. Get a batch of messages that are currently active. - # 2. For each active message in the batch, extend its - # lifetime IFF it would otherwise expire before the - # claim itself does. - # 3. Associate the claim with each message - # 4. Create a claim record with details such as TTL - # and expiration time. - # 5. Add the claim's ID to a set to facilitate fast - # existence checks. + # A sorted set is used to facilitate cleaning + # up the IDs of expired claims. + claims_set_key = utils.scope_claims_set(queue, project, + QUEUE_CLAIMS_SUFFIX) - try: - # TODO(kgriffs): Is it faster/better to do this all - # in a Lua script instead of using an app-layer - # transaction? Lua requires Redis 2.6 or better. + pipe.zadd(claims_set_key, claim_expires, claim_id) + pipe.execute() - # NOTE(kgriffs): Abort the entire transaction if - # another request beats us to the punch. We detect - # this by putting a watch on the key that will have - # one of its fields updated as the final step of - # the transaction. - # - # No other request to list active messages can - # proceed while this current transaction is in - # progress; therefore, it is not possible for - # a different process to get some active messages - # while the pipeline commands have partway - # completed. Either the other process will query - # for active messages at the same moment as - # the current proc and get the exact same set, - # or its request will have to wait while the - # current process performs the transaction in - # its entirety. - pipe.watch(claims_set_key) - pipe.multi() - - results = self._message_ctrl._active( - queue, project=project, limit=limit) - - cursor = next(results) - msg_list = list(cursor) - num_messages = len(msg_list) - - # NOTE(kgriffs): If there are no active messages to - # claim, simply return an empty list. - if not msg_list: - return (None, iter([])) - - basic_messages = [] - - now = timeutils.utcnow_ts() - - claim_expires = now + claim_ttl - msg_expires = claim_expires + grace - - # Associate the claim with each message - for msg in msg_list: - msg.claim_id = claim_id - msg.claim_expires = claim_expires - - if _msg_would_expire(msg, msg_expires): - msg.ttl = msg_ttl - msg.expires = msg_expires - - pipe.rpush(claim_msgs_key, msg.id) - - # TODO(kgriffs): Rather than writing back the - # entire message, only set the fields that - # have changed. - msg.to_redis(pipe, include_body=False) - - basic_messages.append(msg.to_basic(now)) - - pipe.expire(claim_msgs_key, claim_ttl) - - # Create the claim - claim_info = { - 'id': claim_id, - 't': claim_ttl, - 'e': claim_expires, - 'n': num_messages, - } - - pipe.hmset(claim_id, claim_info) - pipe.expire(claim_id, claim_ttl) - - # NOTE(kgriffs): Add the claim ID to a set so that - # existence checks can be performed quickly. This - # is also used as a watch key in order to gaurd - # against race conditions. - # - # A sorted set is used to facilitate cleaning - # up the IDs of expired claims. - pipe.zadd(claims_set_key, claim_expires, claim_id) - pipe.execute() - - return claim_id, basic_messages - - except redis.exceptions.WatchError: - continue - - raise errors.ClaimConflict(queue, project) + return claim_id, claimed_msgs @utils.raises_conn_error @utils.retries_on_connection_error diff --git a/zaqar/queues/storage/redis/driver.py b/zaqar/queues/storage/redis/driver.py index cfa81cf07..91689eae5 100644 --- a/zaqar/queues/storage/redis/driver.py +++ b/zaqar/queues/storage/redis/driver.py @@ -16,6 +16,7 @@ import redis from six.moves import urllib from zaqar.common import decorators +from zaqar.i18n import _ from zaqar.openstack.common import log as logging from zaqar.queues import storage from zaqar.queues.storage.redis import controllers @@ -51,6 +52,13 @@ class DataDriver(storage.DataDriverBase): group=options.REDIS_GROUP) self.redis_conf = self.conf[options.REDIS_GROUP] + server_version = self.connection.info()['redis_version'] + if tuple(map(int, server_version.split('.'))) < (2, 6): + msg = _('The Redis driver requires redis-server>=2.6, ' + '%s found') % server_version + + raise RuntimeError(msg) + def is_alive(self): try: return self.connection.ping() diff --git a/zaqar/queues/storage/redis/messages.py b/zaqar/queues/storage/redis/messages.py index a7d7fd868..985afdf3d 100644 --- a/zaqar/queues/storage/redis/messages.py +++ b/zaqar/queues/storage/redis/messages.py @@ -29,7 +29,6 @@ Message = models.Message MessageEnvelope = models.MessageEnvelope -MESSAGE_IDS_SUFFIX = 'messages' MSGSET_INDEX_KEY = 'msgset_index' # The rank counter is an atomic index to rank messages @@ -112,14 +111,6 @@ class MessageController(storage.Message): def _claim_ctrl(self): return self.driver.claim_controller - def _active(self, queue_name, marker=None, echo=False, - client_uuid=None, project=None, - limit=None): - return self._list(queue_name, project=project, marker=marker, - echo=echo, client_uuid=client_uuid, - include_claimed=False, - limit=limit, to_basic=False) - def _count(self, queue, project): """Return total number of messages in a queue. @@ -127,22 +118,13 @@ class MessageController(storage.Message): they haven't been GC'd yet. This is done for performance. """ - msgset_key = utils.scope_message_ids_set(queue, project, - MESSAGE_IDS_SUFFIX) - - return self._client.zcard(msgset_key) + return self._client.zcard(utils.msgset_key(queue, project)) def _create_msgset(self, queue, project, pipe): - msgset_key = utils.scope_message_ids_set(queue, project, - MESSAGE_IDS_SUFFIX) - - pipe.zadd(MSGSET_INDEX_KEY, 1, msgset_key) + pipe.zadd(MSGSET_INDEX_KEY, 1, utils.msgset_key(queue, project)) def _delete_msgset(self, queue, project, pipe): - msgset_key = utils.scope_message_ids_set(queue, project, - MESSAGE_IDS_SUFFIX) - - pipe.zrem(MSGSET_INDEX_KEY, msgset_key) + pipe.zrem(MSGSET_INDEX_KEY, utils.msgset_key(queue, project)) @utils.raises_conn_error @utils.retries_on_connection_error @@ -154,8 +136,7 @@ class MessageController(storage.Message): executing the operation. """ client = self._client - msgset_key = utils.scope_message_ids_set(queue, project, - MESSAGE_IDS_SUFFIX) + msgset_key = utils.msgset_key(queue, project) message_ids = client.zrange(msgset_key, 0, -1) pipe.delete(msgset_key) @@ -166,8 +147,7 @@ class MessageController(storage.Message): def _find_first_unclaimed(self, queue, project, limit): """Find the first unclaimed message in the queue.""" - msgset_key = utils.scope_message_ids_set(queue, project, - MESSAGE_IDS_SUFFIX) + msgset_key = utils.msgset_key(queue, project) now = timeutils.utcnow_ts() # TODO(kgriffs): Generalize this paging pattern (DRY) @@ -198,8 +178,7 @@ class MessageController(storage.Message): Helper function to get the first message in the queue sort > 0 get from the left else from the right. """ - msgset_key = utils.scope_message_ids_set(queue, project, - MESSAGE_IDS_SUFFIX) + msgset_key = utils.msgset_key(queue, project) zrange = self._client.zrange if sort == 1 else self._client.zrevrange message_ids = zrange(msgset_key, 0, 0) @@ -242,9 +221,7 @@ class MessageController(storage.Message): raise errors.QueueDoesNotExist(queue, project) - msgset_key = utils.scope_message_ids_set(queue, - project, - MESSAGE_IDS_SUFFIX) + msgset_key = utils.msgset_key(queue, project) client = self._client if not marker and not include_claimed: @@ -419,9 +396,7 @@ class MessageController(storage.Message): if not self._queue_ctrl.exists(queue, project): raise errors.QueueDoesNotExist(queue, project) - msgset_key = utils.scope_message_ids_set(queue, project, - MESSAGE_IDS_SUFFIX) - + msgset_key = utils.msgset_key(queue, project) counter_key = utils.scope_queue_index(queue, project, MESSAGE_RANK_COUNTER_SUFFIX) @@ -518,8 +493,8 @@ class MessageController(storage.Message): raise errors.MessageNotClaimedBy(message_id, claim) - msgset_key = utils.scope_message_ids_set(queue, project, - MESSAGE_IDS_SUFFIX) + msgset_key = utils.msgset_key(queue, project) + with self._client.pipeline() as pipe: pipe.delete(message_id) pipe.zrem(msgset_key, message_id) @@ -538,8 +513,7 @@ class MessageController(storage.Message): raise errors.QueueDoesNotExist(queue, project) - msgset_key = utils.scope_message_ids_set(queue, project, - MESSAGE_IDS_SUFFIX) + msgset_key = utils.msgset_key(queue, project) with self._client.pipeline() as pipe: for mid in message_ids: diff --git a/zaqar/queues/storage/redis/scripting.py b/zaqar/queues/storage/redis/scripting.py new file mode 100644 index 000000000..0d343d952 --- /dev/null +++ b/zaqar/queues/storage/redis/scripting.py @@ -0,0 +1,40 @@ +# Copyright (c) 2014 Rackspace Hosting, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +from zaqar.common import decorators + + +class Mixin(object): + script_names = [] + + @decorators.lazy_property(write=False) + def _scripts(self): + scripts = {} + + for name in self.script_names: + script = _read_script(name) + scripts[name] = self._client.register_script(script) + + return scripts + + +def _read_script(script_name): + folder = os.path.abspath(os.path.dirname(__file__)) + filename = os.path.join(folder, 'scripts', script_name + '.lua') + + with open(filename, 'r') as script_file: + return script_file.read() \ No newline at end of file diff --git a/zaqar/queues/storage/redis/scripts/claim_messages.lua b/zaqar/queues/storage/redis/scripts/claim_messages.lua new file mode 100644 index 000000000..fb1d36a3c --- /dev/null +++ b/zaqar/queues/storage/redis/scripts/claim_messages.lua @@ -0,0 +1,88 @@ +--[[ + +Copyright (c) 2014 Rackspace Hosting, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +implied. +See the License for the specific language governing permissions and +limitations under the License. + +--]] + +-- Read params +local msgset_key = KEYS[1] + +local now = tonumber(ARGV[1]) +local limit = tonumber(ARGV[2]) +local claim_id = ARGV[3] +local claim_expires = tonumber(ARGV[4]) +local msg_ttl = tonumber(ARGV[5]) +local msg_expires = tonumber(ARGV[6]) + +-- Scan for up to 'limit' unclaimed messages +local BATCH_SIZE = 100 + +local start = 0 +local claimed_msgs = {} + +local found_unclaimed = false + +while (#claimed_msgs < limit) do + local stop = (start + BATCH_SIZE - 1) + local msg_ids = redis.call('ZRANGE', msgset_key, start, stop) + + if (#msg_ids == 0) then + break + end + + start = start + BATCH_SIZE + + -- TODO(kgriffs): Try moving claimed IDs to a different set + -- to avoid scanning through already-claimed messages. + for i, mid in ipairs(msg_ids) do + -- NOTE(kgriffs): Since execution of this script can not + -- happen in parallel, once we find the first unclaimed + -- message, the remaining messages will always be + -- unclaimed as well. + + if not found_unclaimed then + local msg = redis.call('HMGET', mid, 'c', 'c.e') + + if msg[1] == '' or tonumber(msg[2]) <= now then + found_unclaimed = true + end + end + + if found_unclaimed then + local msg_expires_prev = redis.call('HGET', mid, 'e') + + -- Found an unclaimed message, so claim it + redis.call('HMSET', mid, + 'c', claim_id, + 'c.e', claim_expires) + + -- Will the message expire early? + if tonumber(msg_expires_prev) < msg_expires then + redis.call('HMSET', mid, + 't', msg_ttl, + 'e', msg_expires) + end + + claimed_msgs[#claimed_msgs + 1] = mid + + if (#claimed_msgs == limit) then + break + end + end + end +end + +return claimed_msgs \ No newline at end of file diff --git a/zaqar/queues/storage/redis/utils.py b/zaqar/queues/storage/redis/utils.py index 2f68c0898..d663649cc 100644 --- a/zaqar/queues/storage/redis/utils.py +++ b/zaqar/queues/storage/redis/utils.py @@ -26,6 +26,7 @@ from zaqar.openstack.common import log as logging from zaqar.queues.storage import errors LOG = logging.getLogger(__name__) +MESSAGE_IDS_SUFFIX = 'messages' def descope_queue_name(scoped_name): @@ -104,6 +105,10 @@ scope_queue_catalogue = scope_claims_set = scope_message_ids_set scope_queue_index = scope_message_ids_set +def msgset_key(queue, project=None): + return scope_message_ids_set(queue, project, MESSAGE_IDS_SUFFIX) + + def raises_conn_error(func): """Handles the Redis ConnectionFailure error.