diff --git a/etc/zaqar.conf.sample b/etc/zaqar.conf.sample
index 218be7f53..e19745d23 100644
--- a/etc/zaqar.conf.sample
+++ b/etc/zaqar.conf.sample
@@ -142,6 +142,26 @@
 # Storage driver to use. (string value)
 #storage=sqlite
+[drivers:storage:redis]
+
+#
+# Options defined in zaqar.queues.storage.redis
+#
+
+# Redis Server URI. Socket file based connectors are supported.
+# Example for socket file connector: redis:/tmp/redis.sock (string value)
+#uri=
+
+# Maximum number of times to retry an operation that failed
+# due to a primary node failover. (integer value)
+#max_reconnect_attempts=10
+
+# Base sleep interval between attempts to reconnect after a
+# primary node failover. The actual sleep time increases
+# exponentially (power of 2) each time the operation is
+# retried. (floating point value)
+#reconnect_sleep=0.02
+
 [drivers:storage:mongodb]
diff --git a/setup.cfg b/setup.cfg
index 7e507b675..2a1b3724d 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -41,12 +41,14 @@ zaqar.queues.data.storage =
     sqlite = zaqar.queues.storage.sqlalchemy.driver:DataDriver
     sqlalchemy = zaqar.queues.storage.sqlalchemy.driver:DataDriver
     mongodb = zaqar.queues.storage.mongodb.driver:DataDriver
+    redis = zaqar.queues.storage.redis.driver:DataDriver
     faulty = zaqar.tests.faulty_storage:DataDriver
 
 zaqar.queues.control.storage =
     sqlite = zaqar.queues.storage.sqlalchemy.driver:ControlDriver
     sqlalchemy = zaqar.queues.storage.sqlalchemy.driver:ControlDriver
     mongodb = zaqar.queues.storage.mongodb.driver:ControlDriver
+    redis = zaqar.queues.storage.redis.driver:ControlDriver
     faulty = zaqar.tests.faulty_storage:ControlDriver
 
 zaqar.queues.transport =
@@ -60,6 +62,7 @@ oslo.config.opts =
     zaqar.queues.storage.pipeline = zaqar.queues.storage.pipeline:_config_options
     zaqar.queues.storage.pooling = zaqar.queues.storage.pooling:_config_options
     zaqar.queues.storage.mongodb = zaqar.queues.storage.mongodb.options:_config_options
+    zaqar.queues.storage.redis = zaqar.queues.storage.redis.options:_config_options
     zaqar.queues.storage.sqlalchemy = zaqar.queues.storage.sqlalchemy.options:_config_options
     zaqar.queues.transport.wsgi = zaqar.queues.transport.wsgi.driver:_config_options
     zaqar.queues.transport.base = zaqar.queues.transport.base:_config_options
diff --git a/tests/etc/wsgi_redis.conf b/tests/etc/wsgi_redis.conf
new file mode 100644
index 000000000..5673aac6f
--- /dev/null
+++ b/tests/etc/wsgi_redis.conf
@@ -0,0 +1,15 @@
+[DEFAULT]
+debug = False
+verbose = False
+
+[drivers]
+transport = wsgi
+storage = redis
+
+[drivers:transport:wsgi]
+port = 8888
+
+[drivers:storage:redis]
+uri = redis://127.0.0.1:6379
+max_reconnect_attempts = 3
+reconnect_sleep = 1
\ No newline at end of file
diff --git a/tests/etc/wsgi_redis_pooled.conf b/tests/etc/wsgi_redis_pooled.conf
new file mode 100644
index 000000000..104e7e026
--- /dev/null
+++ b/tests/etc/wsgi_redis_pooled.conf
@@ -0,0 +1,11 @@
+[DEFAULT]
+pooling = True
+
+[drivers]
+transport = wsgi
+storage = redis
+
+[drivers:storage:redis]
+uri = redis://127.0.0.1:6379
+max_reconnect_attempts = 3
+reconnect_sleep = 1
\ No newline at end of file
diff --git a/tests/unit/common/storage/test_utils.py b/tests/unit/common/storage/test_utils.py
index 594ba5b5e..9285cd998 100644
--- a/tests/unit/common/storage/test_utils.py
+++ b/tests/unit/common/storage/test_utils.py
@@ -29,11 +29,15 @@ class TestUtils(testing.TestBase):
     def test_can_connect_suceeds_if_good_uri_sqlite(self):
         self.assertTrue(utils.can_connect('sqlite://:memory:'))
 
-    @ddt.data(
-        'mongodb://localhost:27018',  # wrong port
-        'localhost:27017',  #
missing scheme - 'redis://localhost:6379' # not supported with default install - ) + def test_can_connect_fails_if_bad_uri_missing_schema(self): + self.assertFalse(utils.can_connect('localhost:27017')) + @testing.requires_mongodb - def test_can_connect_fails_if_bad_uri(self, uri): - self.assertFalse(utils.can_connect(uri)) + def test_can_connect_fails_if_bad_uri_mongodb(self): + self.assertFalse(utils.can_connect('mongodb://localhost:8080')) + self.assertFalse(utils.can_connect('mongodb://example.com:27017')) + + @testing.requires_redis + def test_can_connect_fails_if_bad_uri_redis(self): + self.assertFalse(utils.can_connect('redis://localhost:8080')) + self.assertFalse(utils.can_connect('redis://example.com:6379')) diff --git a/tests/unit/queues/storage/test_impl_redis.py b/tests/unit/queues/storage/test_impl_redis.py new file mode 100644 index 000000000..f2e2046fb --- /dev/null +++ b/tests/unit/queues/storage/test_impl_redis.py @@ -0,0 +1,277 @@ +# Copyright (c) 2014 Prashanth Raghu. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections +import time +import uuid + +import redis + +from zaqar.openstack.common.cache import cache as oslo_cache +from zaqar.openstack.common import timeutils +from zaqar.queues import storage +from zaqar.queues.storage.redis import controllers +from zaqar.queues.storage.redis import driver +from zaqar.queues.storage.redis import messages +from zaqar.queues.storage.redis import options +from zaqar.queues.storage.redis import utils +from zaqar import tests as testing +from zaqar.tests.queues.storage import base + + +def _create_sample_message(now=None, claimed=False, body=None): + if now is None: + now = timeutils.utcnow_ts() + + if claimed: + claim_id = uuid.uuid4() + claim_expires = now + 300 + else: + claim_id = None + claim_expires = now + + if body is None: + body = {} + + return messages.Message( + ttl=60, + created=now, + client_uuid=uuid.uuid4(), + claim_id=claim_id, + claim_expires=claim_expires, + body=body + ) + + +class RedisUtilsTest(testing.TestBase): + + config_file = 'wsgi_redis.conf' + + def setUp(self): + super(RedisUtilsTest, self).setUp() + + self.conf.register_opts(options.REDIS_OPTIONS, + group=options.REDIS_GROUP) + + self.redis_conf = self.conf[options.REDIS_GROUP] + + MockDriver = collections.namedtuple('MockDriver', 'redis_conf') + + self.driver = MockDriver(self.redis_conf) + + def test_scope_queue_name(self): + self.assertEqual(utils.scope_queue_name('my-q'), '.my-q') + self.assertEqual(utils.scope_queue_name('my-q', None), '.my-q') + self.assertEqual(utils.scope_queue_name('my-q', '123'), '123.my-q') + self.assertEqual(utils.scope_queue_name('my-q_1', '123'), '123.my-q_1') + + self.assertEqual(utils.scope_queue_name(), '.') + self.assertEqual(utils.scope_queue_name(None, '123'), '123.') + + def test_scope_messages_set(self): + self.assertEqual(utils.scope_message_ids_set('my-q'), '.my-q.') + self.assertEqual(utils.scope_message_ids_set('my-q', 'p'), 'p.my-q.') + self.assertEqual(utils.scope_message_ids_set('my-q', 'p', 
+                                                      's'),
+                         'p.my-q.s')
+
+        self.assertEqual(utils.scope_message_ids_set(None), '..')
+        self.assertEqual(utils.scope_message_ids_set(None, '123'), '123..')
+        self.assertEqual(utils.scope_message_ids_set(None, None, 's'), '..s')
+
+    def test_normalize_none_str(self):
+        self.assertEqual(utils.normalize_none_str('my-q'), 'my-q')
+        self.assertEqual(utils.normalize_none_str(None), '')
+
+    def test_msg_claimed_filter(self):
+        now = timeutils.utcnow_ts()
+
+        unclaimed_msg = _create_sample_message()
+        self.assertFalse(utils.msg_claimed_filter(unclaimed_msg, now))
+
+        claimed_msg = _create_sample_message(claimed=True)
+        self.assertTrue(utils.msg_claimed_filter(claimed_msg, now))
+
+        # NOTE(kgriffs): Has a claim ID, but the claim is expired
+        claimed_msg.claim_expires = now - 60
+        self.assertFalse(utils.msg_claimed_filter(claimed_msg, now))
+
+    def test_descope_queue_name(self):
+        self.assertEqual(utils.descope_queue_name('p.q'), 'q')
+        self.assertEqual(utils.descope_queue_name('.q'), 'q')
+        self.assertEqual(utils.descope_queue_name('.'), '')
+
+    def test_msg_echo_filter(self):
+        msg = _create_sample_message()
+        self.assertTrue(utils.msg_echo_filter(msg, msg.client_uuid))
+
+        alt_uuid = utils.generate_uuid()
+        self.assertFalse(utils.msg_echo_filter(msg, alt_uuid))
+
+    def test_basic_message(self):
+        now = timeutils.utcnow_ts()
+        body = {
+            'msg': 'Hello Earthlings!',
+            'unicode': u'ab\u00e7',
+            'bytes': b'ab\xc3\xa7',
+            b'ab\xc3\xa7': 'one, two, three',
+            u'ab\u00e7': 'one, two, three',
+        }
+
+        msg = _create_sample_message(now=now, body=body)
+        basic_msg = msg.to_basic(now + 5)
+
+        self.assertEqual(basic_msg['id'], msg.id)
+        self.assertEqual(basic_msg['age'], 5)
+        self.assertEqual(basic_msg['body'], body)
+        self.assertEqual(basic_msg['ttl'], msg.ttl)
+
+    def test_retries_on_connection_error(self):
+        num_calls = [0]
+
+        @utils.retries_on_connection_error
+        def _raises_connection_error(self):
+            num_calls[0] += 1
+            raise redis.exceptions.ConnectionError
+
+        self.assertRaises(redis.exceptions.ConnectionError,
+                          _raises_connection_error, self)
+        self.assertEqual(num_calls, [self.redis_conf.max_reconnect_attempts])
+
+
+@testing.requires_redis
+class RedisDriverTest(testing.TestBase):
+
+    config_file = 'wsgi_redis.conf'
+
+    def test_db_instance(self):
+        cache = oslo_cache.get_cache()
+        redis_driver = driver.DataDriver(self.conf, cache)
+
+        self.assertTrue(isinstance(redis_driver.connection, redis.StrictRedis))
+
+
+@testing.requires_redis
+class RedisQueuesTest(base.QueueControllerTest):
+
+    driver_class = driver.DataDriver
+    config_file = 'wsgi_redis.conf'
+    controller_class = controllers.QueueController
+
+    def setUp(self):
+        super(RedisQueuesTest, self).setUp()
+        self.connection = self.driver.connection
+        self.msg_controller = self.driver.message_controller
+
+    def tearDown(self):
+        super(RedisQueuesTest, self).tearDown()
+        self.connection.flushdb()
+
+    def test_inc_counter(self):
+        queue_name = 'inc-counter'
+        self.controller.create(queue_name)
+        self.controller._inc_counter(queue_name, None, 10)
+
+        scoped_q_name = utils.scope_queue_name(queue_name)
+        count = self.controller._get_queue_info(scoped_q_name, b'c', int)[0]
+        self.assertEqual(count, 10)
+
+    def test_inc_claimed(self):
+        self.addCleanup(self.controller.delete, 'inc-claimed',
+                        project=self.project)
+
+        queue_name = 'inc-claimed'
+
+        self.controller.create(queue_name)
+        self.controller._inc_claimed(queue_name, None, 10)
+
+        scoped_q_name = utils.scope_queue_name(queue_name)
+        claimed = self.controller._get_queue_info(scoped_q_name,
+                                                  b'cl', int)[0]
+        self.assertEqual(claimed, 10)
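For context, the two counter helpers these tests exercise reduce to plain HINCRBY calls on the queue's info hash. A standalone sketch against redis-py (key name illustrative, following the '<project>.<queue>' convention from utils.scope_queue_name):

    import redis

    client = redis.StrictRedis(host='127.0.0.1', port=6379)
    queue_key = '123.my-queue'  # '<project>.<queue>'

    client.hincrby(queue_key, 'c', 10)   # total-messages counter
    client.hincrby(queue_key, 'cl', 10)  # claimed-messages counter
    print(client.hmget(queue_key, ['c', 'cl']))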
+
+
+@testing.requires_redis
+class RedisMessagesTest(base.MessageControllerTest):
+    driver_class = driver.DataDriver
+    config_file = 'wsgi_redis.conf'
+    controller_class = controllers.MessageController
+
+    def setUp(self):
+        super(RedisMessagesTest, self).setUp()
+        self.connection = self.driver.connection
+        self.q_controller = self.driver.queue_controller
+
+    def tearDown(self):
+        super(RedisMessagesTest, self).tearDown()
+        self.connection.flushdb()
+
+    def test_get_count(self):
+        queue_name = 'get-count'
+        self.q_controller.create(queue_name)
+
+        msgs = [{
+            'ttl': 300,
+            'body': 'di mo fy'
+        } for _ in range(10)]
+
+        client_id = uuid.uuid4()
+        # Creating 10 messages
+        self.controller.post(queue_name, msgs, client_id)
+
+        messages_set_id = utils.scope_message_ids_set(queue_name, None,
+                                                      'messages')
+
+        num_msg = self.controller._get_count(messages_set_id)
+        self.assertEqual(num_msg, 10)
+
+    def test_empty_queue_exception(self):
+        queue_name = 'empty-queue-test'
+        self.q_controller.create(queue_name)
+
+        self.assertRaises(storage.errors.QueueIsEmpty,
+                          self.controller.first, queue_name)
+
+
+@testing.requires_redis
+class RedisClaimsTest(base.ClaimControllerTest):
+    driver_class = driver.DataDriver
+    config_file = 'wsgi_redis.conf'
+    controller_class = controllers.ClaimController
+
+    def setUp(self):
+        super(RedisClaimsTest, self).setUp()
+        self.connection = self.driver.connection
+        self.q_controller = self.driver.queue_controller
+
+    def tearDown(self):
+        super(RedisClaimsTest, self).tearDown()
+        self.connection.flushdb()
+
+    def test_claim_doesnt_exist(self):
+        queue_name = 'no-such-claim'
+        epoch = '000000000000000000000000'
+        self.q_controller.create(queue_name)
+        self.assertRaises(storage.errors.ClaimDoesNotExist,
+                          self.controller.get, queue_name,
+                          epoch, project=None)
+
+        claim_id, messages = self.controller.create(queue_name, {'ttl': 2,
+                                                    'grace': 0},
+                                                    project=None)
+
+        # Let's let it expire
+        time.sleep(2)
+        self.assertRaises(storage.errors.ClaimDoesNotExist,
+                          self.controller.update, queue_name,
+                          claim_id, {}, project=None)
\ No newline at end of file
diff --git a/zaqar/queues/storage/errors.py b/zaqar/queues/storage/errors.py
index 7b47bb8c6..34e6aa51c 100644
--- a/zaqar/queues/storage/errors.py
+++ b/zaqar/queues/storage/errors.py
@@ -42,26 +42,34 @@ class Conflict(ExceptionBase):
 class MessageConflict(Conflict):
 
     msg_format = (u'Message could not be enqueued due to a conflict '
-                  u'with another message that is already in '
+                  u'with one or more other messages that are already in '
                   u'queue {queue} for project {project}')
 
-    def __init__(self, queue, project, message_ids):
+    def __init__(self, queue, project):
         """Initializes the error with contextual information.
 
         :param queue: name of the queue to which the message was posted
         :param project: name of the project to which the queue belongs
-        :param message_ids: list of IDs for messages successfully
-            posted. Note that these must be in the same order as the
-            list of messages originally submitted to be enqueued.
         """
 
         super(MessageConflict, self).__init__(queue=queue, project=project)
-        self._succeeded_ids = message_ids
 
-    @property
-    def succeeded_ids(self):
-        return self._succeeded_ids
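With succeeded_ids gone, the exception carries only queue/project context. Assuming ExceptionBase interpolates msg_format with its constructor kwargs (as the other storage errors in this module do), raising it reduces to:

    from zaqar.queues.storage import errors

    raise errors.MessageConflict('orders', 'project-1')
    # renders: 'Message could not be enqueued due to a conflict with one or
    # more other messages that are already in queue orders for project project-1'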
+
+
+class ClaimConflict(Conflict):
+
+    msg_format = (u'Messages could not be claimed due to a conflict '
+                  u'with another parallel claim that is already in '
+                  u'queue {queue} for project {project}')
+
+    def __init__(self, queue, project):
+        """Initializes the error with contextual information.
+
+        :param queue: name of the queue the claim was made against
+        :param project: name of the project to which the queue belongs
+        """
+
+        super(ClaimConflict, self).__init__(queue=queue, project=project)
 
 
 class QueueDoesNotExist(DoesNotExist):
diff --git a/zaqar/queues/storage/mongodb/messages.py b/zaqar/queues/storage/mongodb/messages.py
index 72e37036a..bb5244af8 100644
--- a/zaqar/queues/storage/mongodb/messages.py
+++ b/zaqar/queues/storage/mongodb/messages.py
@@ -668,9 +668,7 @@ class MessageController(storage.Message):
                       queue=queue_name,
                       project=project))
 
-        succeeded_ids = []
-        raise errors.MessageConflict(queue_name, project,
-                                     succeeded_ids)
+        raise errors.MessageConflict(queue_name, project)
 
     @utils.raises_conn_error
     @utils.retries_on_autoreconnect
diff --git a/zaqar/queues/storage/redis/__init__.py b/zaqar/queues/storage/redis/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/zaqar/queues/storage/redis/claims.py b/zaqar/queues/storage/redis/claims.py
new file mode 100644
index 000000000..e1348126a
--- /dev/null
+++ b/zaqar/queues/storage/redis/claims.py
@@ -0,0 +1,368 @@
+# Copyright (c) 2014 Prashanth Raghu.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import functools
+
+import msgpack
+import redis
+
+from zaqar.common import decorators
+from zaqar.openstack.common import log as logging
+from zaqar.openstack.common import timeutils
+from zaqar.queues import storage
+from zaqar.queues.storage import errors
+from zaqar.queues.storage.redis import messages
+from zaqar.queues.storage.redis import utils
+
+LOG = logging.getLogger(__name__)
+
+QUEUE_CLAIMS_SUFFIX = 'claims'
+CLAIM_MESSAGES_SUFFIX = 'messages'
+
+RETRY_CLAIM_TIMEOUT = 10
+
+
+class ClaimController(storage.Claim):
+    """Implements claim resource operations using Redis.
+
+    Redis Data Structures:
+    ----------------------
+    Claim IDs (Redis set) contains the claim ids for a queue
+
+    Key: <project_id>.<queue_name>.claims
+
+        Name                Field
+        -------------------------
+        claim_ids           m
+
+    Claimed Messages (Redis list) contains the list of
+    message ids stored per claim
+
+    Key: messages.<claim_id>
+
+    Claim info (Redis hash):
+
+    Key: <claim_id>
+
+        Name                Field
+        -------------------------
+        ttl             ->  t
+        id              ->  id
+        expires         ->  e
+    """
+    def __init__(self, *args, **kwargs):
+        super(ClaimController, self).__init__(*args, **kwargs)
+        self._client = self.driver.connection
+
+        self._packer = msgpack.Packer(encoding='utf-8',
+                                      use_bin_type=True).pack
+        self._unpacker = functools.partial(msgpack.unpackb, encoding='utf-8')
+
+    def _get_claim_info(self, claim_id, fields, transform=int):
+        """Get one or more fields from the claim info."""
+
+        values = self._client.hmget(claim_id, fields)
+        return [transform(v) for v in values] if transform else values
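To make the key layout above concrete, a claim holding two messages looks roughly like this at the Redis level (illustrative redis-cli session; ids and timestamps are made up):

    SMEMBERS proj-1.orders.claims        # -> {"8d203d10"}      claim id set
    LRANGE messages.8d203d10 0 -1        # -> ["msg-a", "msg-b"]  claimed ids
    HGETALL 8d203d10                     # -> id: 8d203d10, t: 60, e: 1404161294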
+    def _exists(self, queue, claim_id, project):
+        client = self._client
+        claims_set_key = utils.scope_claims_set(queue, project,
+                                                QUEUE_CLAIMS_SUFFIX)
+
+        # Return False if no such claim exists
+        # TODO(prashanthr_): Discuss the feasibility of a bloom filter.
+        if not client.sismember(claims_set_key, claim_id):
+            return False
+
+        expires = self._get_claim_info(claim_id, b'e')[0]
+        now = timeutils.utcnow_ts()
+
+        if now > expires:
+            return False
+
+        return True
+
+    def _get_claimed_message_keys(self, claim_id):
+        return self._client.lrange(claim_id, 0, -1)
+
+    @decorators.lazy_property(write=False)
+    def _message_ctrl(self):
+        return self.driver.message_controller
+
+    @decorators.lazy_property(write=False)
+    def _queue_ctrl(self):
+        return self.driver.queue_controller
+
+    @utils.raises_conn_error
+    @utils.retries_on_connection_error
+    def get(self, queue, claim_id, project=None):
+        if not self._exists(queue, claim_id, project):
+            raise errors.ClaimDoesNotExist(queue, project, claim_id)
+
+        claim_msgs_key = utils.scope_claim_messages(claim_id,
+                                                    CLAIM_MESSAGES_SUFFIX)
+
+        # basic_messages
+        msg_keys = self._get_claimed_message_keys(claim_msgs_key)
+
+        with self._client.pipeline() as pipe:
+            for key in msg_keys:
+                pipe.hgetall(key)
+
+            raw_messages = pipe.execute()
+
+        now = timeutils.utcnow_ts()
+        basic_messages = [messages.Message.from_redis(msg).to_basic(now)
+                          for msg in raw_messages if msg]
+
+        # claim_meta
+        expires, ttl = self._get_claim_info(claim_id, [b'e', b't'])
+        update_time = expires - ttl
+        age = now - update_time
+
+        claim_meta = {
+            'age': age,
+            'ttl': ttl,
+            'id': claim_id,
+        }
+
+        return claim_meta, basic_messages
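create() below retries an optimistic transaction: it WATCHes the queue's claim counter key and lets Redis abort the MULTI block if a competing writer commits first. The generic redis-py pattern it follows, as a standalone sketch (names hypothetical):

    import redis

    client = redis.StrictRedis()

    def add_if_unchanged(key, delta):
        with client.pipeline() as pipe:
            while True:
                try:
                    pipe.watch(key)                # abort if key changes under us
                    current = int(pipe.get(key) or 0)
                    pipe.multi()
                    pipe.set(key, current + delta)
                    pipe.execute()                 # raises WatchError if it did
                    return
                except redis.exceptions.WatchError:
                    continue                       # lost the race; try again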
+
+    @utils.raises_conn_error
+    @utils.retries_on_connection_error
+    def create(self, queue, metadata, project=None,
+               limit=storage.DEFAULT_MESSAGES_PER_CLAIM):
+
+        ttl = int(metadata.get('ttl', 60))
+        grace = int(metadata.get('grace', 60))
+        msg_ttl = ttl + grace
+
+        claim_id = utils.generate_uuid()
+        claim_key = utils.scope_claim_messages(claim_id,
+                                               CLAIM_MESSAGES_SUFFIX)
+
+        claims_set_key = utils.scope_claims_set(queue, project,
+                                                QUEUE_CLAIMS_SUFFIX)
+
+        counter_key = self._queue_ctrl._claim_counter_key(queue, project)
+
+        with self._client.pipeline() as pipe:
+
+            start_ts = timeutils.utcnow_ts()
+
+            # NOTE(kgriffs): Retry the operation if another transaction
+            # completes before this one, in which case it will have
+            # claimed the same messages the current thread is trying
+            # to claim, and therefore we must try for another batch.
+            #
+            # This loop will eventually time out if we can't manage to
+            # claim any messages due to other threads continually beating
+            # us to the punch.
+
+            # TODO(kgriffs): Would it be beneficial (or harmful) to
+            # introduce a backoff sleep in between retries?
+            while (timeutils.utcnow_ts() - start_ts) < RETRY_CLAIM_TIMEOUT:
+
+                # NOTE(kgriffs): The algorithm for claiming messages:
+                #
+                # 1. Get a batch of messages that are currently active.
+                # 2. For each active message in the batch, extend its
+                #    lifetime IFF it would otherwise expire before the
+                #    claim itself does.
+                # 3. Associate the claim with each message
+                # 4. Create a claim record with details such as TTL
+                #    and expiration time.
+                # 5. Add the claim's ID to a set to facilitate fast
+                #    existence checks.
+
+                results = self._message_ctrl._active(queue, project=project,
+                                                     limit=limit)
+
+                cursor = next(results)
+                msg_list = list(cursor)
+
+                # NOTE(kgriffs): If there are no active messages to
+                # claim, simply return an empty list.
+                if not msg_list:
+                    return (None, iter([]))
+
+                basic_messages = []
+
+                try:
+                    # TODO(kgriffs): Is it faster/better to do this all
+                    # in a Lua script instead of using an app-layer
+                    # transaction?
+
+                    # NOTE(kgriffs): Abort the entire transaction if
+                    # another request beats us to the punch. We detect
+                    # this by putting a watch on the key that will have
+                    # one of its fields updated as the final step of
+                    # the transaction.
+                    pipe.watch(counter_key)
+                    pipe.multi()
+
+                    now = timeutils.utcnow_ts()
+
+                    claim_expires = now + ttl
+                    msg_expires = claim_expires + grace
+
+                    # Associate the claim with each message
+                    for msg in msg_list:
+                        msg.claim_id = claim_id
+                        msg.claim_expires = claim_expires
+
+                        if _msg_would_expire(msg, msg_expires):
+                            msg.ttl = msg_ttl
+                            msg.expires = msg_expires
+
+                        pipe.rpush(claim_key, msg.id)
+
+                        # TODO(kgriffs): Rather than writing back the
+                        # entire message, only set the fields that
+                        # have changed.
+                        msg.to_redis(pipe)
+
+                        basic_messages.append(msg.to_basic(now))
+
+                    # Create the claim
+                    claim_info = {
+                        'id': claim_id,
+                        't': ttl,
+                        'e': claim_expires
+                    }
+
+                    pipe.hmset(claim_id, claim_info)
+
+                    # NOTE(kgriffs): Add the claim ID to a set so that
+                    # existence checks can be performed quickly.
+                    pipe.sadd(claims_set_key, claim_id)
+
+                    # NOTE(kgriffs): Update a counter that facilitates
+                    # the queue stats calculation.
+                    self._queue_ctrl._inc_claimed(queue, project,
+                                                  len(msg_list),
+                                                  pipe=pipe)
+
+                    pipe.execute()
+                    return claim_id, basic_messages
+
+                except redis.exceptions.WatchError:
+                    continue
+
+        raise errors.ClaimConflict(queue, project)
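A usage sketch of the three claim operations that follow, as the base tests drive them (driver setup elided; queue and project names arbitrary):

    claim_id, msgs = controller.create('orders', {'ttl': 60, 'grace': 30},
                                       project='proj-1')
    controller.update('orders', claim_id, {'ttl': 120, 'grace': 30},
                      project='proj-1')
    controller.delete('orders', claim_id, project='proj-1')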
+
+    @utils.raises_conn_error
+    @utils.retries_on_connection_error
+    def update(self, queue, claim_id, metadata, project=None):
+        if not self._exists(queue, claim_id, project):
+            raise errors.ClaimDoesNotExist(queue, project, claim_id)
+
+        now = timeutils.utcnow_ts()
+
+        claim_ttl = int(metadata.get('ttl', 60))
+        claim_expires = now + claim_ttl
+
+        grace = int(metadata.get('grace', 60))
+        msg_ttl = claim_ttl + grace
+        msg_expires = claim_expires + grace
+
+        claim_messages = utils.scope_claim_messages(claim_id,
+                                                    CLAIM_MESSAGES_SUFFIX)
+
+        msg_keys = self._get_claimed_message_keys(claim_messages)
+
+        with self._client.pipeline() as pipe:
+            for key in msg_keys:
+                pipe.hgetall(key)
+
+            claimed_msgs = pipe.execute()
+
+        claim_info = {
+            't': claim_ttl,
+            'e': claim_expires,
+        }
+
+        with self._client.pipeline() as pipe:
+            for msg in claimed_msgs:
+                if msg:
+                    msg = messages.Message.from_redis(msg)
+                    msg.claim_id = claim_id
+                    msg.claim_expires = claim_expires
+
+                    if _msg_would_expire(msg, msg_expires):
+                        msg.ttl = msg_ttl
+                        msg.expires = msg_expires
+
+                    # TODO(kgriffs): Rather than writing back the
+                    # entire message, only set the fields that
+                    # have changed.
+                    msg.to_redis(pipe)
+
+            # Update the claim record itself
+            pipe.hmset(claim_id, claim_info)
+
+            pipe.execute()
+
+    @utils.raises_conn_error
+    @utils.retries_on_connection_error
+    def delete(self, queue, claim_id, project=None):
+        # NOTE(prashanthr_): Return silently when the claim
+        # does not exist
+        if not self._exists(queue, claim_id, project):
+            return
+
+        now = timeutils.utcnow_ts()
+        claim_messages_key = utils.scope_claim_messages(claim_id,
+                                                        CLAIM_MESSAGES_SUFFIX)
+
+        msg_keys = self._get_claimed_message_keys(claim_messages_key)
+
+        with self._client.pipeline() as pipe:
+            for msg_key in msg_keys:
+                pipe.hgetall(msg_key)
+
+            claimed_msgs = pipe.execute()
+
+        claims_set_key = utils.scope_claims_set(queue, project,
+                                                QUEUE_CLAIMS_SUFFIX)
+
+        with self._client.pipeline() as pipe:
+            pipe.srem(claims_set_key, claim_id)
+            pipe.delete(claim_id)
+            pipe.delete(claim_messages_key)
+
+            # Update the claim id and claim expiration info
+            # for all the messages.
+            for msg in claimed_msgs:
+                if msg:
+                    msg = messages.Message.from_redis(msg)
+                    msg.claim_id = None
+                    msg.claim_expires = now
+
+                    # TODO(kgriffs): Rather than writing back the
+                    # entire message, only set the fields that
+                    # have changed.
+                    msg.to_redis(pipe)
+
+            self._queue_ctrl._inc_claimed(queue, project,
+                                          -1 * len(claimed_msgs),
+                                          pipe=pipe)
+
+            pipe.execute()
+
+
+def _msg_would_expire(message, deadline):
+    return message.expires < deadline
diff --git a/zaqar/queues/storage/redis/controllers.py b/zaqar/queues/storage/redis/controllers.py
new file mode 100644
index 000000000..07b55cb4c
--- /dev/null
+++ b/zaqar/queues/storage/redis/controllers.py
@@ -0,0 +1,22 @@
+# Copyright (c) 2014 Prashanth Raghu
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from zaqar.queues.storage.redis import claims
+from zaqar.queues.storage.redis import messages
+from zaqar.queues.storage.redis import queues
+
+
+QueueController = queues.QueueController
+MessageController = messages.MessageController
+ClaimController = claims.ClaimController
diff --git a/zaqar/queues/storage/redis/driver.py b/zaqar/queues/storage/redis/driver.py
new file mode 100644
index 000000000..083625cc8
--- /dev/null
+++ b/zaqar/queues/storage/redis/driver.py
@@ -0,0 +1,107 @@
+# Copyright (c) 2014 Prashanth Raghu.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
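For reference, the two URI forms accepted by the connection helper below parse as follows (illustrative values):

    >>> from six.moves import urllib
    >>> urllib.parse.urlparse('redis://127.0.0.1:6379').hostname
    '127.0.0.1'
    >>> urllib.parse.urlparse('redis:/tmp/redis.sock').hostname is None
    True
    >>> urllib.parse.urlparse('redis:/tmp/redis.sock').path
    '/tmp/redis.sock'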
+ +import redis +from six.moves import urllib + +from zaqar.common import decorators +from zaqar.openstack.common import log as logging +from zaqar.queues import storage +from zaqar.queues.storage.redis import controllers +from zaqar.queues.storage.redis import options + + +LOG = logging.getLogger(__name__) + + +def _get_redis_client(conf): + # TODO(prashanthr_): Add SSL support + parsed_url = urllib.parse.urlparse(conf.uri) + + if parsed_url.hostname: + return redis.StrictRedis(host=parsed_url.hostname, + port=parsed_url.port) + else: + return redis.StrictRedis(unix_socket_path=parsed_url.path) + + +class DataDriver(storage.DataDriverBase): + + def __init__(self, conf, cache): + super(DataDriver, self).__init__(conf, cache) + + opts = options.REDIS_OPTIONS + + if 'dynamic' in conf: + names = conf[options.REDIS_GROUP].keys() + opts = filter(lambda x: x.name not in names, opts) + + self.conf.register_opts(opts, + group=options.REDIS_GROUP) + self.redis_conf = self.conf[options.REDIS_GROUP] + + def is_alive(self): + try: + return self.connection.ping() + except redis.exceptions.ConnectionError: + return False + + def _health(self): + KPI = {} + KPI['storage_reachable'] = self.is_alive() + KPI['operation_status'] = self._get_operation_status() + + # TODO(kgriffs): Add metrics re message volume + return KPI + + @decorators.lazy_property(write=False) + def connection(self): + """Redis client connection instance.""" + return _get_redis_client(self.redis_conf) + + @decorators.lazy_property(write=False) + def queue_controller(self): + return controllers.QueueController(self) + + @decorators.lazy_property(write=False) + def message_controller(self): + return controllers.MessageController(self) + + @decorators.lazy_property(write=False) + def claim_controller(self): + return controllers.ClaimController(self) + + +class ControlDriver(storage.ControlDriverBase): + + def __init__(self, conf, cache): + super(ControlDriver, self).__init__(conf, cache) + + self.conf.register_opts(options.REDIS_OPTIONS, + group=options.REDIS_GROUP) + + self.redis_conf = self.conf[options.REDIS_GROUP] + + @decorators.lazy_property(write=False) + def connection(self): + """Redis client connection instance.""" + return _get_redis_client(self.redis_conf) + + @property + def pools_controller(self): + return None + + @property + def catalogue_controller(self): + return None diff --git a/zaqar/queues/storage/redis/messages.py b/zaqar/queues/storage/redis/messages.py new file mode 100644 index 000000000..1978cae93 --- /dev/null +++ b/zaqar/queues/storage/redis/messages.py @@ -0,0 +1,483 @@ +# Copyright (c) 2014 Prashanth Raghu. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+import functools
+import uuid
+
+import redis
+
+from zaqar.common import decorators
+from zaqar.openstack.common import strutils
+from zaqar.openstack.common import timeutils
+from zaqar.queues import storage
+from zaqar.queues.storage import errors
+from zaqar.queues.storage.redis import models
+from zaqar.queues.storage.redis import utils
+
+Message = models.Message
+
+
+MESSAGE_IDS_SUFFIX = 'messages'
+# The rank counter is an atomic index to rank messages
+# in a FIFO manner.
+MESSAGE_RANK_COUNTER_SUFFIX = 'rank_counter'
+
+# NOTE(kgriffs): This value, in seconds, should be less than the
+# minimum allowed TTL for messages (60 seconds).
+RETRY_POST_TIMEOUT = 10
+
+
+class MessageController(storage.Message):
+    """Implements message resource operations using Redis.
+
+    Messages are scoped by project + queue.
+
+    Redis Data Structures:
+    ----------------------
+    1. Message IDs list (Redis sorted set)
+
+    Each queue in the system has a set of message ids currently
+    in the queue. The list is sorted based on a ranking which is
+    incremented atomically using the counter (MESSAGE_RANK_COUNTER_SUFFIX)
+    also stored in the database for every queue.
+
+    Key: <project_id>.<queue_name>.messages
+
+    2. Messages (Redis hash):
+
+    Scoped by the UUID of the message, the Redis data structure
+    has the following information.
+
+        Name                    Field
+        -----------------------------
+        id                  ->  id
+        ttl                 ->  t
+        expires             ->  e
+        body                ->  b
+        claim               ->  c
+        claim expiry time   ->  c.e
+        client uuid         ->  u
+        created time        ->  cr
+    """
+
+    def __init__(self, *args, **kwargs):
+        super(MessageController, self).__init__(*args, **kwargs)
+        self._client = self.driver.connection
+
+    @decorators.lazy_property(write=False)
+    def _queue_ctrl(self):
+        return self.driver.queue_controller
+
+    @decorators.lazy_property(write=False)
+    def _claim_ctrl(self):
+        return self.driver.claim_controller
+
+    def _active(self, queue_name, marker=None, echo=False,
+                client_uuid=None, project=None,
+                limit=None):
+        return self._list(queue_name, project=project, marker=marker,
+                          echo=echo, client_uuid=client_uuid,
+                          include_claimed=False,
+                          limit=limit, to_basic=False)
+
+    def _get_count(self, msgset_key):
+        """Get the number of messages in a queue scoped by queue and project."""
+        return self._client.zcard(msgset_key)
+
+    @utils.raises_conn_error
+    @utils.retries_on_connection_error
+    def _delete_queue_messages(self, queue, project, pipe):
+        """Remove all the messages belonging to a queue.
+
+        Referenced from the QueueController; the pipe used to execute
+        the deletion is passed in by the caller.
+        """
+        client = self._client
+        msgset_key = utils.scope_message_ids_set(queue, project,
+                                                 MESSAGE_IDS_SUFFIX)
+        message_ids = client.zrange(msgset_key, 0, -1)
+
+        pipe.delete(msgset_key)
+        for msg_id in message_ids:
+            pipe.delete(msg_id)
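A concrete picture of the per-message hash described in the class docstring (field values illustrative; the body is msgpack-packed):

    # HGETALL 1d5e28f8c0de4a3f9d6aeb1a0b2c3d4e
    #   id  -> 1d5e28f8c0de4a3f9d6aeb1a0b2c3d4e
    #   t   -> 300           # ttl
    #   cr  -> 1404161234    # created (UNIX ts)
    #   e   -> 1404161534    # expires = cr + t
    #   u   -> <client uuid>
    #   c   -> ''            # claim id; empty string means unclaimed
    #   c.e -> 1404161234    # claim expiry
    #   b   -> <msgpack-packed body>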
+
+    # TODO(prashanthr_): Looking for better ways to solve the issue.
+    def _find_first_unclaimed(self, queue, project, limit):
+        """Find the first unclaimed message in the queue."""
+
+        msgset_key = utils.scope_message_ids_set(queue, project,
+                                                 MESSAGE_IDS_SUFFIX)
+        marker = 0
+        now = timeutils.utcnow_ts()
+
+        # NOTE(prashanthr_): This will not loop forever, since the
+        # marker is advanced past each batch that contains no
+        # unclaimed messages.
+        while True:
+            msg_keys = self._client.zrange(msgset_key, marker,
+                                           marker + limit)
+            if msg_keys:
+                messages = [Message.from_redis(self._client.hgetall(msg_key))
+                            for msg_key in msg_keys]
+
+                for msg in messages:
+                    if not utils.msg_claimed_filter(msg, now):
+                        return msg.id
+
+                marker += len(msg_keys)
+            else:
+                return None
+
+    def _exists(self, key):
+        """Check if a message exists.
+
+        Helper function which checks whether a particular message_id
+        still has a backing hash in Redis.
+        """
+        return self._client.exists(key)
+
+    def _get_first_message_id(self, queue, project, sort):
+        """Fetch the head or tail of the queue.
+
+        Helper function to get the first message id in the queue;
+        sort == 1 fetches from the head of the sorted set, any other
+        value fetches from the tail.
+        """
+        msgset_key = utils.scope_message_ids_set(queue, project,
+                                                 MESSAGE_IDS_SUFFIX)
+
+        sorter = self._client.zrange if sort == 1 else self._client.zrevrange
+        message_ids = sorter(msgset_key, 0, 0)
+        return message_ids[0] if message_ids else None
+
+    def _get(self, message_id):
+        msg = self._client.hgetall(message_id)
+        return Message.from_redis(msg) if msg else None
+
+    def _get_claim(self, message_id):
+        claim = self._client.hmget(message_id, 'c', 'c.e')
+
+        if claim == [None, None]:
+            # NOTE(kgriffs): message_id was not found
+            return None
+
+        return {
+            # NOTE(kgriffs): A "None" claim is serialized as an empty str
+            'id': strutils.safe_decode(claim[0]) or None,
+            'expires': int(claim[1]),
+        }
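The listing path below pages through the sorted set by rank; the core pattern, isolated (note the rank check must be against None rather than falsiness, since rank 0 is a valid position):

    rank = client.zrank(msgset_key, marker)       # None if marker is unknown
    start = rank + 1 if rank is not None else 0   # resume just after the marker
    page = client.zrange(msgset_key, start, start + limit - 1)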
+
+    def _list(self, queue, project=None, marker=None,
+              limit=storage.DEFAULT_MESSAGES_PER_PAGE,
+              echo=False, client_uuid=None,
+              include_claimed=False, to_basic=True):
+
+        if not self._queue_ctrl.exists(queue, project):
+            raise errors.QueueDoesNotExist(queue,
+                                           project)
+
+        msgset_key = utils.scope_message_ids_set(queue,
+                                                 project,
+                                                 MESSAGE_IDS_SUFFIX)
+        client = self._client
+
+        with self._client.pipeline() as pipe:
+            # NOTE(prashanthr_): Iterate through the queue to find the first
+            # unclaimed message.
+            if not marker and not include_claimed:
+                marker = self._find_first_unclaimed(queue, project, limit)
+                start = client.zrank(msgset_key, marker) or 0
+            else:
+                rank = client.zrank(msgset_key, marker)
+                start = rank + 1 if rank is not None else 0
+
+            message_ids = client.zrange(msgset_key, start,
+                                        start + (limit - 1))
+
+            for msg_id in message_ids:
+                pipe.hgetall(msg_id)
+
+            messages = pipe.execute()
+
+        # NOTE(prashanthr_): Build a list of filters for checking
+        # the following:
+        #
+        #     1. Message is expired
+        #     2. Message is claimed
+        #     3. Message should not be echoed
+        #
+        now = timeutils.utcnow_ts()
+        filters = [functools.partial(utils.msg_expired_filter, now=now)]
+
+        if not include_claimed:
+            filters.append(functools.partial(utils.msg_claimed_filter,
+                                             now=now))
+
+        if not echo:
+            filters.append(functools.partial(utils.msg_echo_filter,
+                                             client_uuid=client_uuid))
+
+        marker = {}
+
+        yield _filter_messages(messages, pipe, filters, to_basic, marker)
+        yield marker['next']
+
+    @utils.raises_conn_error
+    @utils.retries_on_connection_error
+    def list(self, queue, project=None, marker=None,
+             limit=storage.DEFAULT_MESSAGES_PER_PAGE,
+             echo=False, client_uuid=None,
+             include_claimed=False):
+
+        return self._list(queue, project, marker, limit, echo,
+                          client_uuid, include_claimed)
+
+    @utils.raises_conn_error
+    @utils.retries_on_connection_error
+    def first(self, queue, project=None, sort=1):
+        if sort not in (1, -1):
+            raise ValueError(u'sort must be either 1 (ascending) '
+                             u'or -1 (descending)')
+
+        message_id = self._get_first_message_id(queue, project, sort)
+        if not message_id:
+            raise errors.QueueIsEmpty(queue, project)
+
+        now = timeutils.utcnow_ts()
+        return self._get(message_id).to_basic(now, include_created=True)
+
+    @utils.raises_conn_error
+    @utils.retries_on_connection_error
+    def get(self, queue, message_id, project=None):
+        if not self._queue_ctrl.exists(queue, project):
+            raise errors.QueueDoesNotExist(queue, project)
+
+        message = self._get(message_id)
+        now = timeutils.utcnow_ts()
+
+        if message and not utils.msg_expired_filter(message, now):
+            return message.to_basic(now)
+        else:
+            raise errors.MessageDoesNotExist(message_id, queue, project)
+
+    @utils.raises_conn_error
+    @utils.retries_on_connection_error
+    def bulk_get(self, queue, message_ids, project=None):
+        if not self._queue_ctrl.exists(queue, project):
+            raise errors.QueueDoesNotExist(queue, project)
+
+        # NOTE(prashanthr_): Pipelining is used here purely
+        # for performance.
+        with self._client.pipeline() as pipe:
+            for mid in message_ids:
+                pipe.hgetall(mid)
+
+            messages = pipe.execute()
+
+        # NOTE(kgriffs): Skip messages that may have been deleted
+        now = timeutils.utcnow_ts()
+        return (Message.from_redis(msg).to_basic(now)
+                for msg in messages if msg)
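post() below keeps FIFO order with a per-queue rank counter. Conceptually, the batch insert is (using the old redis-py 2.x StrictRedis zadd(key, score, member) signature, as this patch does):

    base = int(client.get(counter_key) or 0)
    for i, msg_id in enumerate(batch_ids):
        client.zadd(msgset_key, base + i, msg_id)  # score encodes arrival order
    client.incrby(counter_key, len(batch_ids))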
+
+    @utils.raises_conn_error
+    @utils.retries_on_connection_error
+    def post(self, queue, messages, client_uuid, project=None):
+        if not self._queue_ctrl.exists(queue, project):
+            raise errors.QueueDoesNotExist(queue, project)
+
+        msgset_key = utils.scope_message_ids_set(queue, project,
+                                                 MESSAGE_IDS_SUFFIX)
+
+        counter_key = utils.scope_queue_index(queue, project,
+                                              MESSAGE_RANK_COUNTER_SUFFIX)
+
+        with self._client.pipeline() as pipe:
+
+            start_ts = timeutils.utcnow_ts()
+
+            # NOTE(kgriffs): Retry the operation if another transaction
+            # completes before this one, in which case it may have
+            # posted messages with the same rank counter the current
+            # thread is trying to use, which would cause messages
+            # to get out of order and introduce the risk of a client
+            # missing a message while reading from the queue.
+            #
+            # This loop will eventually time out if we can't manage to
+            # post any messages due to other threads continually beating
+            # us to the punch.
+
+            # TODO(kgriffs): Would it be beneficial (or harmful) to
+            # introduce a backoff sleep in between retries?
+            while (timeutils.utcnow_ts() - start_ts) < RETRY_POST_TIMEOUT:
+                now = timeutils.utcnow_ts()
+                prepared_messages = [
+                    Message(
+                        ttl=msg['ttl'],
+                        created=now,
+                        client_uuid=client_uuid,
+                        claim_id=None,
+                        claim_expires=now,
+                        body=msg.get('body', {}),
+                    )
+
+                    for msg in messages
+                ]
+
+                try:
+                    pipe.watch(counter_key)
+
+                    rank_counter = pipe.get(counter_key)
+                    rank_counter = int(rank_counter) if rank_counter else 0
+
+                    pipe.multi()
+
+                    keys = []
+                    for i, msg in enumerate(prepared_messages):
+                        msg.to_redis(pipe)
+                        pipe.zadd(msgset_key, rank_counter + i, msg.id)
+                        keys.append(msg.id)
+
+                    pipe.incrby(counter_key, len(keys))
+                    self._queue_ctrl._inc_counter(queue, project,
+                                                  len(prepared_messages),
+                                                  pipe=pipe)
+
+                    pipe.execute()
+                    return keys
+
+                except redis.exceptions.WatchError:
+                    continue
+
+        raise errors.MessageConflict(queue, project)
+
+    @utils.raises_conn_error
+    @utils.retries_on_connection_error
+    def delete(self, queue, message_id, project=None, claim=None):
+        if not self._queue_ctrl.exists(queue, project):
+            raise errors.QueueDoesNotExist(queue, project)
+
+        # TODO(kgriffs): Create decorator for validating claim and message
+        # IDs, since those are not checked at the transport layer. This
+        # decorator should be applied to all relevant methods.
+        if claim is not None:
+            try:
+                uuid.UUID(claim)
+            except ValueError:
+                raise errors.ClaimDoesNotExist(queue, project, claim)
+
+        msg_claim = self._get_claim(message_id)
+
+        # NOTE(kgriffs): The message does not exist, so
+        # it is essentially "already deleted".
+        if msg_claim is None:
+            return
+
+        now = timeutils.utcnow_ts()
+        is_claimed = msg_claim['id'] and (now < msg_claim['expires'])
+
+        if claim is None:
+            if is_claimed:
+                raise errors.MessageIsClaimed(message_id)
+
+        elif not is_claimed:
+            raise errors.MessageNotClaimed(message_id)
+
+        elif msg_claim['id'] != claim:
+            if not self._claim_ctrl._exists(queue, claim, project):
+                raise errors.ClaimDoesNotExist(queue, project, claim)
+
+            raise errors.MessageNotClaimedBy(message_id, claim)
+
+        msgset_key = utils.scope_message_ids_set(queue, project,
+                                                 MESSAGE_IDS_SUFFIX)
+        with self._client.pipeline() as pipe:
+            results = pipe.delete(message_id).zrem(msgset_key,
+                                                   message_id).execute()
+
+        # NOTE(prashanthr_): results[0] is 1 when the delete is
+        # successful. Hence we use that case to identify successful
+        # deletes.
+        if results[0] == 1:
+            self._queue_ctrl._inc_counter(queue, project, -1)
+
+    @utils.raises_conn_error
+    @utils.retries_on_connection_error
+    def bulk_delete(self, queue, message_ids, project=None):
+        if not self._queue_ctrl.exists(queue, project):
+            raise errors.QueueDoesNotExist(queue,
+                                           project)
+
+        msgset_key = utils.scope_message_ids_set(queue, project,
+                                                 MESSAGE_IDS_SUFFIX)
+
+        with self._client.pipeline() as pipe:
+            for message_id in message_ids:
+                pipe.delete(message_id).zrem(msgset_key, message_id)
+
+            results = pipe.execute()
+
+        # NOTE(prashanthr_): A zero result is returned for the cases
+        # where the message might not exist or has been deleted/expired.
+        # Hence we calculate the number of deletes as the
+        # total number of message ids - number of failed deletes.
+        amount = -1 * (len(results) - results.count(0)) / 2
+        self._queue_ctrl._inc_counter(queue, project, int(amount))
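A worked example of the counter arithmetic in bulk_delete above, with a hypothetical results list for three messages (one already gone):

    results = [1, 1, 0, 0, 1, 1]   # (DEL, ZREM) result pairs per message
    amount = -1 * (len(results) - results.count(0)) / 2   # -> -2.0
    # i.e. only the two messages actually removed decrement the counter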
+
+    @utils.raises_conn_error
+    @utils.retries_on_connection_error
+    def pop(self, queue, limit, project=None):
+        # Pop is implemented as a chain of the following operations:
+        # 1. Create a claim.
+        # 2. Delete the messages claimed.
+        # 3. Delete the claim.
+
+        claim_id, messages = self._claim_ctrl.create(
+            queue, dict(ttl=1, grace=0), project, limit=limit)
+
+        message_ids = [message['id'] for message in messages]
+        self.bulk_delete(queue, message_ids, project)
+        # NOTE(prashanthr_): Creating a claim controller reference
+        # causes a recursive reference. Hence, using the reference
+        # from the driver.
+        self._claim_ctrl.delete(queue, claim_id, project)
+        return messages
+
+
+def _filter_messages(messages, pipe, filters, to_basic, marker):
+    """Create a filtering iterator over a list of messages.
+
+    The function accepts a list of filters that a message must pass
+    before it can be included as part of the reply.
+    """
+    now = timeutils.utcnow_ts()
+
+    for msg in messages:
+        # NOTE(kgriffs): Message may have been deleted, so
+        # check each value to ensure we got a message back
+        if msg:
+            msg = Message.from_redis(msg)
+
+            # NOTE(kgriffs): Check to see if any of the filters
+            # indicate that this message should be skipped.
+            for should_skip in filters:
+                if should_skip(msg):
+                    break
+            else:
+                marker['next'] = msg.id
+
+                if to_basic:
+                    yield msg.to_basic(now)
+                else:
+                    yield msg
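The model that the message controller relies on round-trips like this (timestamps illustrative; pipe is any redis-py pipeline):

    msg = Message(ttl=300, created=1404161234, client_uuid='client-1',
                  claim_id=None, claim_expires=1404161234, body={'k': 'v'})
    msg.to_redis(pipe)            # writes the hash laid out above
    msg.to_basic(now=1404161239)  # {'id': msg.id, 'age': 5, 'ttl': 300, 'body': {'k': 'v'}}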
+ """ + message_data = {} + + __slots__ = ( + 'id', + 'ttl', + 'created', + 'expires', + 'client_uuid', + 'claim_id', + 'claim_expires', + 'body', + ) + + def __init__(self, **kwargs): + self.id = kwargs.get('id', str(uuid.uuid4())) + self.ttl = kwargs['ttl'] + self.created = kwargs['created'] + self.expires = kwargs.get('expires', self.created + self.ttl) + + self.client_uuid = str(kwargs['client_uuid']) + + self.claim_id = kwargs.get('claim_id') + self.claim_expires = kwargs['claim_expires'] + + self.body = kwargs['body'] + + @property + def created_iso(self): + return timeutils.iso8601_from_timestamp(self.created) + + @staticmethod + def from_redis(doc): + claim_id = doc[b'c'] + if claim_id: + claim_id = strutils.safe_decode(claim_id) + else: + claim_id = None + + # NOTE(kgriffs): Under Py3K, redis-py converts all strings + # into binary. Woohoo! + return Message( + id=strutils.safe_decode(doc[b'id']), + ttl=int(doc[b't']), + created=int(doc[b'cr']), + expires=int(doc[b'e']), + + client_uuid=strutils.safe_decode(doc[b'u']), + + claim_id=claim_id, + claim_expires=int(doc[b'c.e']), + + body=_unpack(doc[b'b']), + ) + + def to_redis(self, pipe): + doc = { + 'id': self.id, + 't': self.ttl, + 'cr': self.created, + 'e': self.expires, + 'u': self.client_uuid, + 'c': self.claim_id or '', + 'c.e': self.claim_expires, + 'b': _pack(self.body), + } + + pipe.hmset(self.id, doc) + + def to_basic(self, now, include_created=False): + basic_msg = { + 'id': self.id, + 'age': now - self.created, + 'ttl': self.ttl, + 'body': self.body + } + + if include_created: + basic_msg['created'] = self.created_iso + + return basic_msg diff --git a/zaqar/queues/storage/redis/options.py b/zaqar/queues/storage/redis/options.py new file mode 100644 index 000000000..f768f8cf7 --- /dev/null +++ b/zaqar/queues/storage/redis/options.py @@ -0,0 +1,40 @@ +# Copyright (c) 2014 Prashanth Raghu. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Redis storage driver configuration options.""" + +from oslo.config import cfg + + +REDIS_OPTIONS = ( + cfg.StrOpt('uri', default="redis://127.0.0.1:6379", + help=('Redis Server URI. Can also use a ' + 'socket file based connector. ' + 'Ex: redis:/tmp/redis.sock')), + + cfg.IntOpt('max_reconnect_attempts', default=10, + help=('Maximum number of times to retry an operation that ' + 'failed due to a redis node failover.')), + + cfg.FloatOpt('reconnect_sleep', default=1, + help=('Base sleep interval between attempts to reconnect ' + 'after a redis node failover. ')) + +) + +REDIS_GROUP = 'drivers:storage:redis' + + +def _config_options(): + return [(REDIS_GROUP, REDIS_OPTIONS)] diff --git a/zaqar/queues/storage/redis/queues.py b/zaqar/queues/storage/redis/queues.py new file mode 100644 index 000000000..61098698d --- /dev/null +++ b/zaqar/queues/storage/redis/queues.py @@ -0,0 +1,253 @@ +# Copyright (c) 2014 Prashanth Raghu. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
diff --git a/zaqar/queues/storage/redis/queues.py b/zaqar/queues/storage/redis/queues.py
new file mode 100644
index 000000000..61098698d
--- /dev/null
+++ b/zaqar/queues/storage/redis/queues.py
@@ -0,0 +1,253 @@
+# Copyright (c) 2014 Prashanth Raghu.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import functools
+
+import msgpack
+import redis
+
+from zaqar.common import decorators
+from zaqar.openstack.common import log as logging
+from zaqar.openstack.common import timeutils
+from zaqar.queues import storage
+from zaqar.queues.storage import errors
+from zaqar.queues.storage.redis import messages
+from zaqar.queues.storage.redis import utils
+
+LOG = logging.getLogger(__name__)
+
+QUEUES_SET_STORE_NAME = 'queues_set'
+MESSAGE_IDS_SUFFIX = 'messages'
+
+
+class QueueController(storage.Queue):
+    """Implements queue resource operations using Redis.
+
+    Queues are scoped by project, which is prefixed to the
+    queue name.
+
+    Queues (Redis sorted set):
+
+    Key: queues_set
+
+        Id                              Value
+        ---------------------------------
+        name                      ->    <project_id>.<queue_name>
+
+    The sorted set enables fast existence checks, while its ordering
+    supports paginated retrieval of queues.
+
+    Queue Information (Redis hash):
+
+    Key: <project_id>.<queue_name>
+
+        Name                      Field
+        -------------------------------
+        count                 ->  c
+        num_msgs_claimed      ->  cl
+        metadata              ->  m
+        creation timestamp    ->  t
+    """
+
+    def __init__(self, *args, **kwargs):
+        super(QueueController, self).__init__(*args, **kwargs)
+        self._client = self.driver.connection
+        self._packer = msgpack.Packer(encoding='utf-8',
+                                      use_bin_type=True).pack
+        self._unpacker = functools.partial(msgpack.unpackb, encoding='utf-8')
+
+    @decorators.lazy_property(write=False)
+    def _message_ctrl(self):
+        return self.driver.message_controller
+
+    def _claim_counter_key(self, name, project):
+        return utils.scope_queue_name(name, project)
+
+    def _inc_counter(self, name, project, amount=1, pipe=None):
+        queue_key = utils.scope_queue_name(name, project)
+
+        client = pipe if pipe is not None else self._client
+        client.hincrby(queue_key, 'c', amount)
+
+    def _inc_claimed(self, name, project, amount=1, pipe=None):
+        queue_key = utils.scope_queue_name(name, project)
+
+        client = pipe if pipe is not None else self._client
+        client.hincrby(queue_key, 'cl', amount)
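At the Redis level, the catalogue operations described above come down to a handful of sorted-set commands (illustrative redis-cli session):

    ZADD queues_set 1 proj-1.orders     # create() registers the scoped name
    ZRANK queues_set proj-1.orders      # position used to resume listing
    ZRANGE queues_set 0 9               # first page of queue keys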
+ """ + + messages_set_key = utils.scope_message_ids_set(name, project, + MESSAGE_IDS_SUFFIX) + + with self._client.pipeline() as pipe: + for msg_key in self._client.zrange(messages_set_key, 0, -1): + pipe.hgetall(msg_key) + + raw_messages = pipe.execute() + + expired = 0 + now = timeutils.utcnow_ts() + + for msg in raw_messages: + if msg: + msg = messages.Message.from_redis(msg) + if utils.msg_expired_filter(msg, now): + expired += 1 + + return expired + + def _get_queue_info(self, queue_key, fields, transform=str): + """Get one or more fields from Queue Info.""" + + values = self._client.hmget(queue_key, fields) + return [transform(v) for v in values] if transform else values + + @utils.raises_conn_error + @utils.retries_on_connection_error + def list(self, project=None, marker=None, + limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False): + client = self._client + qset_key = utils.scope_queue_name(QUEUES_SET_STORE_NAME, project) + marker = utils.scope_queue_name(marker, project) + rank = client.zrank(qset_key, marker) + start = rank + 1 if rank else 0 + + cursor = (q for q in client.zrange(qset_key, start, + start + limit - 1)) + marker_next = {} + + def denormalizer(info, name): + queue = {'name': utils.descope_queue_name(name)} + marker_next['next'] = queue['name'] + if detailed: + queue['metadata'] = info[1] + + return queue + + yield utils.QueueListCursor(self._client, cursor, denormalizer) + yield marker_next and marker_next['next'] + + def get(self, name, project=None): + """Obtain the metadata from the queue.""" + return self.get_metadata(name, project) + + @utils.raises_conn_error + def create(self, name, metadata=None, project=None): + # TODO(prashanthr_): Implement as a lua script. + queue_key = utils.scope_queue_name(name, project) + qset_key = utils.scope_queue_name(QUEUES_SET_STORE_NAME, project) + + # Check if the queue already exists. + if self.exists(name, project): + return False + + queue = { + 'c': 0, + 'cl': 0, + 'm': self._packer(metadata or {}), + 't': timeutils.utcnow_ts() + } + + # Pipeline ensures atomic inserts. 
+ with self._client.pipeline() as pipe: + pipe.zadd(qset_key, 1, queue_key).hmset(queue_key, queue) + + try: + pipe.execute() + except redis.exceptions.ResponseError: + return False + + return True + + @utils.raises_conn_error + @utils.retries_on_connection_error + def exists(self, name, project=None): + # TODO(prashanthr_): Cache this lookup + queue_key = utils.scope_queue_name(name, project) + qset_key = utils.scope_queue_name(QUEUES_SET_STORE_NAME, project) + + return self._client.zrank(qset_key, queue_key) is not None + + @utils.raises_conn_error + @utils.retries_on_connection_error + def set_metadata(self, name, metadata, project=None): + if not self.exists(name, project): + raise errors.QueueDoesNotExist(name, project) + + key = utils.scope_queue_name(name, project) + fields = {'m': self._packer(metadata)} + + self._client.hmset(key, fields) + + @utils.raises_conn_error + @utils.retries_on_connection_error + def get_metadata(self, name, project=None): + if not self.exists(name, project): + raise errors.QueueDoesNotExist(name, project) + + queue_key = utils.scope_queue_name(name, project) + metadata = self._get_queue_info(queue_key, b'm', None)[0] + + return self._unpacker(metadata) + + @utils.raises_conn_error + @utils.retries_on_connection_error + def delete(self, name, project=None): + queue_key = utils.scope_queue_name(name, project) + qset_key = utils.scope_queue_name(QUEUES_SET_STORE_NAME, project) + + # NOTE(prashanthr_): Pipelining is used to mitigate race conditions + with self._client.pipeline() as pipe: + pipe.zrem(qset_key, queue_key) + pipe.delete(queue_key) + self._message_ctrl._delete_queue_messages(name, project, pipe) + + pipe.execute() + + @utils.raises_conn_error + @utils.retries_on_connection_error + def stats(self, name, project=None): + if not self.exists(name, project=project): + raise errors.QueueDoesNotExist(name, project) + + queue_key = utils.scope_queue_name(name, project) + + claimed, total = self._get_queue_info(queue_key, [b'cl', b'c'], int) + expired = self._get_expired_message_count(name, project) + + message_stats = { + 'claimed': claimed, + 'free': total - claimed - expired, + 'total': total + } + + try: + newest = self._message_ctrl.first(name, project, -1) + oldest = self._message_ctrl.first(name, project, 1) + except errors.QueueIsEmpty: + pass + else: + message_stats['newest'] = newest + message_stats['oldest'] = oldest + + return {'messages': message_stats} diff --git a/zaqar/queues/storage/redis/utils.py b/zaqar/queues/storage/redis/utils.py new file mode 100644 index 000000000..bb6d58e86 --- /dev/null +++ b/zaqar/queues/storage/redis/utils.py @@ -0,0 +1,193 @@ +# Copyright (c) 2014 Prashanth Raghu. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+import functools
+import sys
+import time
+import uuid
+
+import redis
+import six
+
+from zaqar.i18n import _
+from zaqar.openstack.common import log as logging
+from zaqar.openstack.common import strutils
+from zaqar.queues.storage import errors
+
+LOG = logging.getLogger(__name__)
+
+
+def descope_queue_name(scoped_name):
+    """Descope a queue name scoped with '.'.
+
+    Returns the queue name from the scoped name,
+    which is of the form project-id.queue-name.
+    """
+
+    return scoped_name.split('.')[1]
+
+
+def normalize_none_str(string_or_none):
+    """Returns '' IFF given value is None, passthrough otherwise.
+
+    This function normalizes None to the empty string to facilitate
+    string concatenation when a variable could be None.
+    """
+
+    # TODO(prashanthr_): Try to reuse this utility. Violates DRY
+    return '' if string_or_none is None else string_or_none
+
+
+def generate_uuid():
+    return str(uuid.uuid4())
+
+
+def scope_queue_name(queue=None, project=None):
+    """Returns a scoped name for a queue based on project and queue.
+
+    If only the project name is specified, a scope signifying "all queues"
+    for that project is returned. If neither queue nor project are
+    specified, a scope for "all global queues" is returned, which
+    is to be interpreted as excluding queues scoped by project.
+
+    :returns: '{project}.{queue}' if project and queue are given,
+        '{project}.' if ONLY project is given, '.{queue}' if ONLY
+        queue is given, and '.' if neither are given.
+    """
+
+    # TODO(prashanthr_): Try to reuse this utility. Violates DRY
+    return normalize_none_str(project) + '.' + normalize_none_str(queue)
+
+# NOTE(prashanthr_): Alias the scope_queue_name function for use
+# in the pools and claims controllers, since similar functionality
+# is required to scope Redis ids.
+scope_pool_catalogue = scope_claim_messages = scope_queue_name
+
+
+def scope_message_ids_set(queue=None, project=None, message_suffix=''):
+    """Scope messages set with '.'.
+
+    Returns a scoped name for the set of message ids, in the form
+    project-id.queue-name.suffix
+    """
+
+    return (normalize_none_str(project) + '.' +
+            normalize_none_str(queue) + '.' +
+            message_suffix)
+
+# NOTE(prashanthr_): Alias the scope_message_ids_set function for use
+# in the pools and claims controllers, since similar functionality
+# is required to scope Redis ids.
+scope_queue_catalogue = scope_claims_set = scope_message_ids_set
+scope_queue_index = scope_message_ids_set
+
+
+def raises_conn_error(func):
+    """Handles the Redis ConnectionError.
+
+    This decorator catches Redis's ConnectionError
+    and raises Zaqar's ConnectionError instead.
+    """
+
+    # NOTE(prashanthr_): Try to reuse this utility. Violates DRY
+    # Can pass exception type into the decorator and create a
+    # storage level utility.
+
+    @functools.wraps(func)
+    def wrapper(*args, **kwargs):
+        try:
+            return func(*args, **kwargs)
+        except redis.exceptions.ConnectionError as ex:
+            LOG.exception(ex)
+            raise errors.ConnectionError()
+
+    return wrapper
+
+
+def retries_on_connection_error(func):
+    """Causes the wrapped function to be re-called on ConnectionError.
+
+    This decorator catches Redis ConnectionError and retries
+    the function call.
+
+    .. Note::
+        Assumes that the decorated function has defined self.driver.redis_conf
+        so that `max_reconnect_attempts` and `reconnect_sleep` can be taken
+        into account.
+
+    .. Warning:: The decorated function must be idempotent.
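+
+    .. Note::
+        The sleep interval between retries grows exponentially:
+        reconnect_sleep * (2 ** attempt) seconds.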
+ """ + + @functools.wraps(func) + def wrapper(self, *args, **kwargs): + # TODO(prashanthr_) : Try to reuse this utility. Violates DRY + # Can pass config parameters into the decorator and create a + # storage level utility. + + max_attemps = self.driver.redis_conf.max_reconnect_attempts + sleep_sec = self.driver.redis_conf.reconnect_sleep + + for attempt in range(max_attemps): + try: + return func(self, *args, **kwargs) + except redis.exceptions.ConnectionError: + ex = sys.exc_info()[1] + LOG.warn(_(u'Caught ConnectionError, retrying the ' + 'call to {0}').format(func)) + + time.sleep(sleep_sec * (2 ** attempt)) + else: + LOG.error(_(u'Caught ConnectionError, maximum attempts ' + 'to {0} exceeded.').format(func)) + raise ex + + return wrapper + + +def msg_claimed_filter(message, now): + """Return True IFF the message is currently claimed.""" + + return message.claim_id and (now < message.claim_expires) + + +def msg_echo_filter(message, client_uuid): + """Return True IFF the specified client posted the message.""" + + return message.client_uuid == six.text_type(client_uuid) + + +def msg_expired_filter(message, now): + """Return True IFF the message has expired.""" + + return message.expires <= now + + +class QueueListCursor(object): + + def __init__(self, client, queues, denormalizer): + self.queue_iter = queues + self.denormalizer = denormalizer + self.client = client + + def __iter__(self): + return self + + @raises_conn_error + def next(self): + curr = next(self.queue_iter) + queue = self.client.hmget(curr, ['c', 'm']) + return self.denormalizer(queue, strutils.safe_decode(curr)) + + def __next__(self): + return self.next() \ No newline at end of file diff --git a/zaqar/queues/storage/utils.py b/zaqar/queues/storage/utils.py index d01e23b3d..1f5bf4c63 100644 --- a/zaqar/queues/storage/utils.py +++ b/zaqar/queues/storage/utils.py @@ -120,7 +120,7 @@ def keyify(key, iterable): def can_connect(uri): - """Given a URI, verifies whether its possible to connect to it. + """Given a URI, verifies whether it's possible to connect to it. :param uri: connection string to a storage endpoint :type uri: six.text_type diff --git a/zaqar/queues/transport/wsgi/v1_0/messages.py b/zaqar/queues/transport/wsgi/v1_0/messages.py index 2cfb8b3c5..d9fdaa46b 100644 --- a/zaqar/queues/transport/wsgi/v1_0/messages.py +++ b/zaqar/queues/transport/wsgi/v1_0/messages.py @@ -169,13 +169,8 @@ class CollectionResource(object): except storage_errors.MessageConflict as ex: LOG.exception(ex) - message_ids = ex.succeeded_ids - - if not message_ids: - # TODO(kgriffs): Include error code that is different - # from the code used in the generic case, below. - description = _(u'No messages could be enqueued.') - raise wsgi_errors.HTTPServiceUnavailable(description) + description = _(u'No messages could be enqueued.') + raise wsgi_errors.HTTPServiceUnavailable(description) except Exception as ex: LOG.exception(ex) diff --git a/zaqar/queues/transport/wsgi/v1_1/messages.py b/zaqar/queues/transport/wsgi/v1_1/messages.py index bd9514ad8..5a3a878a0 100644 --- a/zaqar/queues/transport/wsgi/v1_1/messages.py +++ b/zaqar/queues/transport/wsgi/v1_1/messages.py @@ -190,13 +190,8 @@ class CollectionResource(object): except storage_errors.MessageConflict as ex: LOG.exception(ex) - message_ids = ex.succeeded_ids - - if not message_ids: - # TODO(kgriffs): Include error code that is different - # from the code used in the generic case, below. 
-                description = _(u'No messages could be enqueued.')
-                raise wsgi_errors.HTTPServiceUnavailable(description)
+            description = _(u'No messages could be enqueued.')
+            raise wsgi_errors.HTTPServiceUnavailable(description)
 
         except Exception as ex:
             LOG.exception(ex)
diff --git a/zaqar/tests/__init__.py b/zaqar/tests/__init__.py
index 049642c98..07ed21229 100644
--- a/zaqar/tests/__init__.py
+++ b/zaqar/tests/__init__.py
@@ -25,4 +25,5 @@ RUN_SLOW_TESTS = not SKIP_SLOW_TESTS
 expect = helpers.expect
 is_slow = helpers.is_slow
 requires_mongodb = helpers.requires_mongodb
+requires_redis = helpers.requires_redis
 TestBase = base.TestBase
diff --git a/zaqar/tests/helpers.py b/zaqar/tests/helpers.py
index 5424bbb9a..032c01732 100644
--- a/zaqar/tests/helpers.py
+++ b/zaqar/tests/helpers.py
@@ -24,6 +24,7 @@ import testtools
 
 SKIP_SLOW_TESTS = os.environ.get('ZAQAR_TEST_SLOW') is None
 SKIP_MONGODB_TESTS = os.environ.get('ZAQAR_TEST_MONGODB') is None
+SKIP_REDIS_TESTS = os.environ.get('ZAQAR_TEST_REDIS') is None
 
 
 @contextlib.contextmanager
@@ -205,6 +206,22 @@ def requires_mongodb(test_case):
     return testtools.skipIf(SKIP_MONGODB_TESTS, reason)(test_case)
 
 
+def requires_redis(test_case):
+    """Decorator to flag a test case as being dependent on Redis.
+
+    Redis-specific tests will be skipped unless the ZAQAR_TEST_REDIS
+    environment variable is set. If the variable is set, the tests will
+    assume that Redis is running and listening on localhost.
+    """
+
+    reason = ('Skipping tests that require Redis. Ensure '
+              'Redis is running on localhost and then set '
+              'ZAQAR_TEST_REDIS in order to enable tests '
+              'that are specific to this storage backend.')
+
+    return testtools.skipIf(SKIP_REDIS_TESTS, reason)(test_case)
+
+
 def is_slow(condition=lambda self: True):
     """Decorator to flag slow tests.
diff --git a/zaqar/tests/queues/storage/base.py b/zaqar/tests/queues/storage/base.py
index db3e48dc0..aeff50da2 100644
--- a/zaqar/tests/queues/storage/base.py
+++ b/zaqar/tests/queues/storage/base.py
@@ -226,6 +226,119 @@ class QueueControllerTest(ControllerBaseTest):
         self.assertNotIn('newest', message_stats)
         self.assertNotIn('oldest', message_stats)
 
+    def test_queue_count_on_bulk_delete(self):
+        self.addCleanup(self.controller.delete, 'test-queue',
+                        project=self.project)
+        queue_name = 'test-queue'
+        client_uuid = uuid.uuid4()
+
+        created = self.controller.create(queue_name, project=self.project)
+        self.assertTrue(created)
+
+        # Create 10 messages.
+        msg_keys = _insert_fixtures(self.message_controller, queue_name,
+                                    project=self.project,
+                                    client_uuid=client_uuid, num=10)
+
+        stats = self.controller.stats(queue_name,
+                                      self.project)['messages']
+        self.assertEqual(stats['total'], 10)
+
+        # Delete 5 messages
+        self.message_controller.bulk_delete(queue_name, msg_keys[0:5],
+                                            self.project)
+
+        stats = self.controller.stats(queue_name,
+                                      self.project)['messages']
+        self.assertEqual(stats['total'], 5)
+
+    def test_queue_count_on_bulk_delete_with_invalid_id(self):
+        self.addCleanup(self.controller.delete, 'test-queue',
+                        project=self.project)
+        queue_name = 'test-queue'
+        client_uuid = uuid.uuid4()
+
+        created = self.controller.create(queue_name, project=self.project)
+        self.assertTrue(created)
+
+        # Create 10 messages.
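+        # NOTE: _insert_fixtures returns the ids of the posted messages,
+        # allowing a subset of them to be passed to bulk_delete below.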
+        msg_keys = _insert_fixtures(self.message_controller, queue_name,
+                                    project=self.project,
+                                    client_uuid=client_uuid, num=10)
+
+        stats = self.controller.stats(queue_name,
+                                      self.project)['messages']
+        self.assertEqual(stats['total'], 10)
+
+        # Delete 5 messages
+        self.message_controller.bulk_delete(queue_name,
+                                            msg_keys[0:5] + ['invalid'],
+                                            self.project)
+
+        stats = self.controller.stats(queue_name,
+                                      self.project)['messages']
+        self.assertEqual(stats['total'], 5)
+
+    def test_queue_count_on_delete(self):
+        self.addCleanup(self.controller.delete, 'test-queue',
+                        project=self.project)
+        queue_name = 'test-queue'
+        client_uuid = uuid.uuid4()
+
+        created = self.controller.create(queue_name, project=self.project)
+        self.assertTrue(created)
+
+        # Create 10 messages.
+        msg_keys = _insert_fixtures(self.message_controller, queue_name,
+                                    project=self.project,
+                                    client_uuid=client_uuid, num=10)
+
+        stats = self.controller.stats(queue_name,
+                                      self.project)['messages']
+        self.assertEqual(stats['total'], 10)
+
+        # Delete 1 message
+        self.message_controller.delete(queue_name, msg_keys[0],
+                                       self.project)
+        stats = self.controller.stats(queue_name,
+                                      self.project)['messages']
+        self.assertEqual(stats['total'], 9)
+
+    def test_queue_count_on_claim_delete(self):
+        self.addCleanup(self.controller.delete, 'test-queue',
+                        project=self.project)
+        queue_name = 'test-queue'
+        client_uuid = uuid.uuid4()
+
+        created = self.controller.create(queue_name, project=self.project)
+        self.assertTrue(created)
+
+        # Create 15 messages.
+        _insert_fixtures(self.message_controller, queue_name,
+                         project=self.project,
+                         client_uuid=client_uuid, num=15)
+
+        stats = self.controller.stats(queue_name,
+                                      self.project)['messages']
+        self.assertEqual(stats['total'], 15)
+
+        metadata = {'ttl': 120, 'grace': 60}
+        # Claim 10 messages (the default claim limit)
+        claim_id, _ = self.claim_controller.create(queue_name, metadata,
+                                                   self.project)
+
+        stats = self.controller.stats(queue_name,
+                                      self.project)['messages']
+        self.assertEqual(stats['claimed'], 10)
+
+        # Delete the claim
+        self.claim_controller.delete(queue_name, claim_id,
+                                     self.project)
+        stats = self.controller.stats(queue_name,
+                                      self.project)['messages']
+
+        self.assertEqual(stats['claimed'], 0)
+
 
 class MessageControllerTest(ControllerBaseTest):
     """Message Controller base tests.
@@ -1174,5 +1287,5 @@ def _insert_fixtures(controller, queue_name, project=None,
             'event': 'Event number {0}'.format(n)
         }}
 
-    controller.post(queue_name, messages(),
-                    project=project, client_uuid=client_uuid)
+    return controller.post(queue_name, messages(),
+                           project=project, client_uuid=client_uuid)
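
As a sanity check on the retry behavior introduced in utils.py, the following standalone sketch (not part of the patch) reproduces the backoff schedule computed by retries_on_connection_error; the config values are illustrative assumptions, not requirements:

    # Sketch: mirrors time.sleep(sleep_sec * (2 ** attempt)) in
    # retries_on_connection_error. Illustrative values only.
    max_reconnect_attempts = 3
    reconnect_sleep = 1.0

    for attempt in range(max_reconnect_attempts):
        delay = reconnect_sleep * (2 ** attempt)
        print('attempt {0}: sleep {1:.2f}s on ConnectionError'.format(
            attempt, delay))

    # attempt 0 sleeps 1.00s, attempt 1 sleeps 2.00s, and attempt 2
    # sleeps 4.00s before the decorator gives up and re-raises.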