diff --git a/etc/marconi.conf-sample b/etc/marconi.conf-sample index b70a2a546..e546e7036 100644 --- a/etc/marconi.conf-sample +++ b/etc/marconi.conf-sample @@ -8,7 +8,7 @@ # Log to this file! log_file = /var/log/marconi/server.log -;auth_strategy = +;auth_strategy = # ================= Syslog Options ============================ @@ -59,19 +59,6 @@ database = marconi # at the same instant. ;max_retry_jitter = 0.005 -# Frequency of message garbage collections, in seconds -;gc_interval = 300 - -# Threshold of number of expired messages to reach in a given -# queue, before performing the GC. Useful for reducing frequent -# locks on the DB for non-busy queues, or for worker queues -# which process jobs quickly enough to keep the number of in- -# flight messages low. -# -# Note: The higher this number, the larger the memory-mapped DB -# files will be. -;gc_threshold = 1000 - [drivers:proxy:storage:mongodb] uri = mongodb://localhost:27017 database = marconi_proxy @@ -102,4 +89,4 @@ database = marconi_proxy # caching mechanism [oslo_cache] cache_backend = memory -;cache_prefix = my_prefix \ No newline at end of file +;cache_prefix = my_prefix diff --git a/marconi/cmd/gc.py b/marconi/cmd/gc.py deleted file mode 100644 index 92f4e4633..000000000 --- a/marconi/cmd/gc.py +++ /dev/null @@ -1,61 +0,0 @@ -# Copyright (c) 2013 Rackspace Hosting, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# -# See the License for the specific language governing permissions and -# limitations under the License. - -import random -import sys -import time - -from marconi.common import cli -from marconi.common import config -from marconi.openstack.common import log as logging -from marconi.queues import bootstrap - -PROJECT_CFG = config.project('marconi') -LOG = logging.getLogger(__name__) - - -@cli.runnable -def run(): - """Entry point to start marconi-gc. - - Operators should run 2-3 instances on different - boxes for fault-tolerance. - - Note: This call blocks until the process is killed - or interrupted. - """ - - try: - info = _(u'Starting marconi-gc') - print(info + _(u'. Use CTRL+C to exit...\n')) - LOG.info(info) - - boot = bootstrap.Bootstrap(cli_args=sys.argv[1:]) - storage_driver = boot.storage - gc_interval = storage_driver.gc_interval - - # NOTE(kgriffs): Don't want all garbage collector - # instances running at the same time (will peg the DB). - offset = random.random() * gc_interval - time.sleep(offset) - - while True: - storage_driver.gc() - time.sleep(gc_interval) - - except NotImplementedError as ex: - print(_(u'The configured storage driver does not support GC.\n')) - LOG.exception(ex) diff --git a/marconi/queues/storage/mongodb/claims.py b/marconi/queues/storage/mongodb/claims.py index 48220afd6..578c4ac42 100644 --- a/marconi/queues/storage/mongodb/claims.py +++ b/marconi/queues/storage/mongodb/claims.py @@ -21,6 +21,8 @@ Field Mappings: letter of their long name. """ +import datetime + from bson import objectid from marconi.common import config @@ -129,8 +131,9 @@ class ClaimController(storage.ClaimBase): now = timeutils.utcnow_ts() claim_expires = now + ttl - message_expires = claim_expires + grace message_ttl = ttl + grace + message_expiration = datetime.datetime.utcfromtimestamp( + claim_expires + grace) meta = { 'id': oid, @@ -171,10 +174,10 @@ class ClaimController(storage.ClaimBase): # This sets the expiration time to # `expires` on messages that would # expire before claim. - new_values = {'e': message_expires, 't': message_ttl} + new_values = {'e': message_expiration, 't': message_ttl} msg_ctrl._col.update({'q': queue, 'p': project, - 'e': {'$lt': message_expires}, + 'e': {'$lt': message_expiration}, 'c.id': oid}, {'$set': new_values}, upsert=False, multi=True) diff --git a/marconi/queues/storage/mongodb/driver.py b/marconi/queues/storage/mongodb/driver.py index f7f56a6f7..1456c8d7a 100644 --- a/marconi/queues/storage/mongodb/driver.py +++ b/marconi/queues/storage/mongodb/driver.py @@ -31,28 +31,29 @@ class Driver(storage.DriverBase): def __init__(self): # Lazy instantiation self._database = None + self._connection = None + + def _connect(self): + if options.CFG.uri and 'replicaSet' in options.CFG.uri: + self._connection = pymongo.MongoReplicaSetClient(options.CFG.uri) + else: + self._connection = pymongo.MongoClient(options.CFG.uri) @property def db(self): """Property for lazy instantiation of mongodb's database.""" if self._database is None: - if options.CFG.uri and 'replicaSet' in options.CFG.uri: - conn = pymongo.MongoReplicaSetClient(options.CFG.uri) - else: - conn = pymongo.MongoClient(options.CFG.uri) - - self._database = conn[options.CFG.database] + self._database = self.connection[options.CFG.database] return self._database - def gc(self): - LOG.info(_(u'Performing garbage collection.')) + @property + def connection(self): + """Property for lazy instantiation of mongodb's client connection.""" + if self._connection is None: + self._connect() - try: - self.message_controller.remove_expired() - except pymongo.errors.ConnectionFailure as ex: - # Better luck next time... - LOG.exception(ex) + return self._connection @property def gc_interval(self): diff --git a/marconi/queues/storage/mongodb/messages.py b/marconi/queues/storage/mongodb/messages.py index 0d50a9949..ca08d06be 100644 --- a/marconi/queues/storage/mongodb/messages.py +++ b/marconi/queues/storage/mongodb/messages.py @@ -21,6 +21,7 @@ Field Mappings: letter of their long name. """ +import datetime import time import pymongo.errors @@ -39,9 +40,25 @@ CFG = config.namespace('limits:storage').from_options( default_message_paging=10, ) +# NOTE(kgriffs): This value, in seconds, should be at least less than the +# minimum allowed TTL for messages (60 seconds). Make it 45 to allow for +# some fudge room. +MAX_RETRY_POST_DURATION = 45 + +# NOTE(kgriffs): It is extremely unlikely that all workers would somehow hang +# for more than 5 seconds, without a single one being able to succeed in +# posting some messages and incrementing the counter, thus allowing the other +# producers to succeed in turn. +COUNTER_STALL_WINDOW = 5 + # For hinting ID_INDEX_FIELDS = [('_id', 1)] +# For removing expired messages +TTL_INDEX_FIELDS = [ + ('e', 1) +] + # NOTE(kgriffs): This index is for listing messages, usually # filtering out claimed ones. ACTIVE_INDEX_FIELDS = [ @@ -67,11 +84,11 @@ CLAIMED_INDEX_FIELDS = [ ('c.e', 1), ] -# Index used for _next_marker() and also to ensure uniqueness. +# Index used to ensure uniqueness. MARKER_INDEX_FIELDS = [ ('p', 1), ('q', 1), - ('k', -1) + ('k', 1) ] @@ -94,7 +111,7 @@ class MessageController(storage.MessageBase): # Cache for convenience and performance (avoids extra lookups and # recreating the range for every request.) - self._queue_controller = self.driver.queue_controller + self._queue_ctrl = self.driver.queue_controller self._db = self.driver.db self._retry_range = range(options.CFG.max_attempts) @@ -111,6 +128,11 @@ class MessageController(storage.MessageBase): def _ensure_indexes(self): """Ensures that all indexes are created.""" + self._col.ensure_index(TTL_INDEX_FIELDS, + name='ttl', + expireAfterSeconds=0, + background=True) + self._col.ensure_index(ACTIVE_INDEX_FIELDS, name='active', background=True) @@ -135,36 +157,6 @@ class MessageController(storage.MessageBase): unique=True, background=True) - def _next_marker(self, queue_name, project=None): - """Retrieves the next message marker for a given queue. - - This helper is used to generate monotonic pagination - markers that are saved as part of the message - document. Simply taking the max of the current message - markers works, since Marconi always leaves the most recent - message in the queue (new queues always return 1). - - Note 1: Markers are scoped per-queue and so are *not* - globally unique or globally ordered. - - Note 2: If two or more requests to this method are made - in parallel, this method will return the same - marker. This is done intentionally so that the caller - can detect a parallel message post, allowing it to - mitigate race conditions between producer and - observer clients. - - :param queue_name: Determines the scope for the marker - :param project: Queue's project - :returns: next message marker as an integer - """ - - document = self._col.find_one({'p': project, 'q': queue_name}, - sort=[('k', -1)], - fields={'k': 1, '_id': 0}) - - return 1 if document is None else (document['k'] + 1) - def _backoff_sleep(self, attempt): """Sleep between retries using a jitter algorithm. @@ -180,38 +172,6 @@ class MessageController(storage.MessageBase): time.sleep(seconds) - def _remove_expired(self, queue_name, project): - """Removes all expired messages except for the most recent - in each queue. - - This method is used in lieu of mongo's TTL index since we - must always leave at least one message in the queue for - calculating the next marker. - - :param queue_name: name for the queue from which to remove - expired messages - :param project: Project queue_name belong's too - """ - - # Get the message with the highest marker, and leave - # it in the queue - head = self._col.find_one({'p': project, 'q': queue_name}, - sort=[('k', -1)], fields={'k': 1}) - - if head is None: - # Assume queue was just deleted via a parallel request - LOG.debug(_(u'Queue %s is empty or missing.') % queue_name) - return - - query = { - 'p': project, - 'q': queue_name, - 'k': {'$ne': head['k']}, - 'e': {'$lte': timeutils.utcnow_ts()}, - } - - self._col.remove(query, w=0) - def _purge_queue(self, queue_name, project=None): """Removes all messages from the queue. @@ -267,9 +227,6 @@ class MessageController(storage.MessageBase): # queue and project 'p': project, 'q': queue_name, - - # The messages cannot be expired - 'e': {'$gt': now}, } if not echo: @@ -309,9 +266,6 @@ class MessageController(storage.MessageBase): # Messages must belong to this queue 'p': project, 'q': queue_name, - - # The messages can not be expired - 'e': {'$gt': timeutils.utcnow_ts()}, } if not include_claimed: @@ -399,31 +353,6 @@ class MessageController(storage.MessageBase): {'$set': {'c': {'id': None, 'e': now}}}, upsert=False, multi=True) - def remove_expired(self): - """Removes all expired messages except for the most recent - in each queue. - - This method is used in lieu of mongo's TTL index since we - must always leave at least one message in the queue for - calculating the next marker. - - Warning: This method is expensive, since it must perform - separate queries for each queue, due to the requirement that - it must leave at least one message in each queue, and it - is impractical to send a huge list of _id's to filter out - in a single call. That being said, this is somewhat mitigated - by the fact that remove() is run on each queue seperately, - thereby reducing the duration that any given lock is held. - """ - - # TODO(kgriffs): Optimize first by batching the .removes, second - # by setting a 'last inserted ID' in the queue collection for - # each message inserted (TBD, may cause problematic side-effect), - # and third, by changing the marker algorithm such that it no - # longer depends on retaining the last message in the queue! - for name, project in self._queue_controller._get_np(): - self._remove_expired(name, project) - def list(self, queue_name, project=None, marker=None, limit=None, echo=False, client_uuid=None, include_claimed=False): @@ -469,7 +398,6 @@ class MessageController(storage.MessageBase): '_id': mid, 'p': project, 'q': queue_name, - 'e': {'$gt': now} } message = list(self._col.find(query).limit(1).hint(ID_INDEX_FIELDS)) @@ -493,7 +421,6 @@ class MessageController(storage.MessageBase): '_id': {'$in': message_ids}, 'p': project, 'q': queue_name, - 'e': {'$gt': now}, } # NOTE(flaper87): Should this query @@ -508,19 +435,20 @@ class MessageController(storage.MessageBase): @utils.raises_conn_error def post(self, queue_name, messages, client_uuid, project=None): now = timeutils.utcnow_ts() + now_dt = datetime.datetime.utcfromtimestamp(now) - if not self._queue_controller.exists(queue_name, project): + if not self._queue_ctrl.exists(queue_name, project): raise exceptions.QueueDoesNotExist(queue_name, project) # Set the next basis marker for the first attempt. - next_marker = self._next_marker(queue_name, project) + next_marker = self._queue_ctrl._get_counter(queue_name, project) prepared_messages = [ { 't': message['ttl'], 'q': queue_name, 'p': project, - 'e': now + message['ttl'], + 'e': now_dt + datetime.timedelta(seconds=message['ttl']), 'u': client_uuid, 'c': {'id': None, 'e': now}, 'b': message['body'] if 'body' in message else {}, @@ -530,38 +458,47 @@ class MessageController(storage.MessageBase): for index, message in enumerate(messages) ] - # Results are aggregated across all attempts - # NOTE(kgriffs): Using lazy instantiation... - aggregated_results = None - # Use a retry range for sanity, although we expect # to rarely, if ever, reach the maximum number of # retries. + # + # NOTE(kgriffs): With the default configuration (100 ms + # max sleep, 1000 max attempts), the max stall time + # before the operation is abandoned is 49.95 seconds. for attempt in self._retry_range: try: ids = self._col.insert(prepared_messages) - # NOTE(kgriffs): Only use aggregated results if we must, - # which saves some cycles on the happy path. - if aggregated_results: - aggregated_results.extend(ids) - ids = aggregated_results - # Log a message if we retried, for debugging perf issues if attempt != 0: message = _(u'%(attempts)d attempt(s) required to post ' u'%(num_messages)d messages to queue ' - u'%(queue_name)s and project %(project)s') - message %= dict(queue_name=queue_name, attempts=attempt+1, + u'"%(queue)s" under project %(project)s') + message %= dict(queue=queue_name, attempts=attempt+1, num_messages=len(ids), project=project) LOG.debug(message) + # Update the counter in preparation for the next batch + # + # NOTE(kgriffs): Due to the unique index on the messages + # collection, competing inserts will fail as a whole, + # and keep retrying until the counter is incremented + # such that the competing marker's will start at a + # unique number, 1 past the max of the messages just + # inserted above. + self._queue_ctrl._inc_counter(queue_name, project, + amount=len(ids)) + return map(str, ids) except pymongo.errors.DuplicateKeyError as ex: # Try again with the remaining messages + # TODO(kgriffs): Record stats of how often retries happen, + # and how many attempts, on average, are required to insert + # messages. + # NOTE(kgriffs): This can be used in conjunction with the # log line, above, that is emitted after all messages have # been posted, to guage how long it is taking for messages @@ -570,43 +507,76 @@ class MessageController(storage.MessageBase): # TODO(kgriffs): Add transaction ID to help match up loglines if attempt == 0: message = _(u'First attempt failed while ' - u'adding messages to queue %s ' - u'for current request') % queue_name + u'adding messages to queue ' + u'"%(queue)s" under project %(project)s') + message %= dict(queue=queue_name, project=project) LOG.debug(message) - # TODO(kgriffs): Record stats of how often retries happen, - # and how many attempts, on average, are required to insert - # messages. + # NOTE(kgriffs): Never retry past the point that competing + # messages expire and are GC'd, since once they are gone, + # the unique index no longer protects us from getting out + # of order, which could cause an observer to miss this + # message. The code below provides a sanity-check to ensure + # this situation can not happen. + elapsed = timeutils.utcnow_ts() - now + if elapsed > MAX_RETRY_POST_DURATION: + message = _(u'Exceeded maximum retry duration for queue ' + u'"%(queue)s" under project %(project)s') + message %= dict(queue=queue_name, project=project) - # NOTE(kgriffs): Slice prepared_messages. We have to interpret - # the error message to get the duplicate key, which gives - # us the marker that had a dupe, allowing us to extrapolate - # how many messages were consumed, since markers are monotonic - # counters. - duplicate_marker = utils.dup_marker_from_error(str(ex)) - failed_index = duplicate_marker - next_marker - - # Put the successful one's IDs into aggregated_results. - succeeded_messages = prepared_messages[:failed_index] - succeeded_ids = [msg['_id'] for msg in succeeded_messages] - - # Results are aggregated across all attempts - if aggregated_results is None: - aggregated_results = succeeded_ids - else: - aggregated_results.extend(succeeded_ids) - - # Retry the remaining messages with a new sequence - # of markers. - prepared_messages = prepared_messages[failed_index:] - next_marker = self._next_marker(queue_name, project) - for index, message in enumerate(prepared_messages): - message['k'] = next_marker + index + LOG.warning(message) + break # Chill out for a moment to mitigate thrashing/thundering self._backoff_sleep(attempt) + # NOTE(kgriffs): Perhaps we failed because a worker crashed + # after inserting messages, but before incrementing the + # counter; that would cause all future requests to stall, + # since they would keep getting the same base marker that is + # conflicting with existing messages, until the messages that + # "won" expire, at which time we would end up reusing markers, + # and that could make some messages invisible to an observer + # that is querying with a marker that is large than the ones + # being reused. + # + # To mitigate this, we apply a heuristic to determine whether + # a counter has stalled. We attempt to increment the counter, + # but only if it hasn't been updated for a few seconds, which + # should mean that nobody is left to update it! + # + # Note that we increment one at a time until the logjam is + # broken, since we don't know how many messages were posted + # by the worker before it crashed. + next_marker = self._queue_ctrl._inc_counter( + queue_name, project, window=COUNTER_STALL_WINDOW) + + # Retry the entire batch with a new sequence of markers. + # + # NOTE(kgriffs): Due to the unique index, and how + # MongoDB works with batch requests, we will never + # end up with a partially-successful update. The first + # document in the batch will fail to insert, and the + # remainder of the documents will not be attempted. + if next_marker is None: + # NOTE(kgriffs): Usually we will end up here, since + # it should be rare that a counter becomes stalled. + next_marker = self._queue_ctrl._get_counter( + queue_name, project) + else: + message = (u'Detected a stalled message counter for ' + u'queue "%(queue)s" under project %(project)s. ' + u'The counter was incremented to %(value)d.') + message %= dict(queue=queue_name, + project=project, + value=next_marker) + + LOG.warning(message) + + for index, message in enumerate(prepared_messages): + message['k'] = next_marker + index + except Exception as ex: # TODO(kgriffs): Query the DB to get the last marker that # made it, and extrapolate from there to figure out what @@ -616,13 +586,13 @@ class MessageController(storage.MessageBase): raise message = _(u'Hit maximum number of attempts (%(max)s) for queue ' - u'%(id)s in project %(project)s') - message %= dict(max=options.CFG.max_attempts, id=queue_name, + u'"%(queue)s" under project %(project)s') + message %= dict(max=options.CFG.max_attempts, queue=queue_name, project=project) LOG.warning(message) - succeeded_ids = map(str, aggregated_results) + succeeded_ids = [] raise exceptions.MessageConflict(queue_name, project, succeeded_ids) @utils.raises_conn_error @@ -647,7 +617,6 @@ class MessageController(storage.MessageBase): return now = timeutils.utcnow_ts() - query['e'] = {'$gt': now} message = self._col.find_one(query) if message is None: diff --git a/marconi/queues/storage/mongodb/queues.py b/marconi/queues/storage/mongodb/queues.py index 4726c883c..2fac32d0a 100644 --- a/marconi/queues/storage/mongodb/queues.py +++ b/marconi/queues/storage/mongodb/queues.py @@ -40,13 +40,20 @@ class QueueController(storage.QueueBase): """Implements queue resource operations using MongoDB. Queues: - Name Field - ------------------ - name -> n - project -> p - counter -> c - metadata -> m + Name Field + ------------------- + name -> n + project -> p + msg counter -> c + metadata -> m + + Message Counter: + + Name Field + ------------------- + value -> v + modified ts -> t """ def __init__(self, *args, **kwargs): @@ -76,6 +83,89 @@ class QueueController(storage.QueueBase): cursor = self._col.find({}, fields={'n': 1, 'p': 1}) return ((doc['n'], doc['p']) for doc in cursor) + def _get_counter(self, name, project=None): + """Retrieves the current message counter value for a given queue. + + This helper is used to generate monotonic pagination + markers that are saved as part of the message + document. + + Note 1: Markers are scoped per-queue and so are *not* + globally unique or globally ordered. + + Note 2: If two or more requests to this method are made + in parallel, this method will return the same counter + value. This is done intentionally so that the caller + can detect a parallel message post, allowing it to + mitigate race conditions between producer and + observer clients. + + :param name: Name of the queue to which the counter is scoped + :param project: Queue's project + :returns: current message counter as an integer + """ + + doc = self._col.find_one({'p': project, 'n': name}, + fields={'c.v': 1, '_id': 0}) + + if doc is None: + raise exceptions.QueueDoesNotExist(name, project) + + return doc['c']['v'] + + def _inc_counter(self, name, project=None, amount=1, window=None): + """Increments the message counter and returns the new value. + + :param name: Name of the queue to which the counter is scoped + :param project: Queue's project name + :param amount: (Default 1) Amount by which to increment the counter + :param window: (Default None) A time window, in seconds, that + must have elapsed since the counter was last updated, in + order to increment the counter. + + :returns: Updated message counter value, or None if window + was specified, and the counter has already been updated + within the specified time period. + + :raises: storage.exceptions.QueueDoesNotExist + """ + now = timeutils.utcnow_ts() + + update = {'$inc': {'c.v': amount}, '$set': {'c.t': now}} + query = {'p': project, 'n': name} + if window is not None: + threshold = now - window + query['c.t'] = {'$lt': threshold} + + while True: + try: + doc = self._col.find_and_modify(query, update, new=True, + fields={'c.v': 1, '_id': 0}) + break + except pymongo.errors.AutoReconnect as ex: + LOG.exception(ex) + + if doc is None: + if window is None: + # NOTE(kgriffs): Since we did not filter by a time window, + # the queue should have been found and updated. Perhaps + # the queue has been deleted? + message = _(u'Failed to increment the message ' + u'counter for queue %(name)s and ' + u'project %(project)s') + message %= dict(name=name, project=project) + + LOG.warning(message) + + raise exceptions.QueueDoesNotExist(name, project) + + # NOTE(kgriffs): Assume the queue existed, but the counter + # was recently updated, causing the range query on 'c.t' to + # exclude the record. + return None + + return doc['c']['v'] + #----------------------------------------------------------------------- # Interface #----------------------------------------------------------------------- @@ -116,7 +206,12 @@ class QueueController(storage.QueueBase): @utils.raises_conn_error def create(self, name, project=None): try: - self._col.insert({'p': project, 'n': name, 'm': {}, 'c': 1}) + # NOTE(kgriffs): Start counting at 1, and assume the first + # message ever posted will succeed and set t to a UNIX + # "modified at" timestamp. + counter = {'v': 1, 't': 0} + + self._col.insert({'p': project, 'n': name, 'm': {}, 'c': counter}) except pymongo.errors.DuplicateKeyError: return False diff --git a/marconi/queues/storage/mongodb/utils.py b/marconi/queues/storage/mongodb/utils.py index 88847bcd1..c40cf341a 100644 --- a/marconi/queues/storage/mongodb/utils.py +++ b/marconi/queues/storage/mongodb/utils.py @@ -17,21 +17,16 @@ import collections import datetime import functools import random -import re from bson import errors as berrors from bson import objectid from bson import tz_util from pymongo import errors -from marconi.common import exceptions import marconi.openstack.common.log as logging from marconi.openstack.common import timeutils from marconi.queues.storage import exceptions as storage_exceptions - -DUP_MARKER_REGEX = re.compile(r'\$queue_marker.*?:\s(\d+)') - # BSON ObjectId gives TZ-aware datetime, so we generate a # TZ-aware UNIX epoch for convenience. EPOCH = datetime.datetime.utcfromtimestamp(0).replace(tzinfo=tz_util.utc) @@ -39,24 +34,6 @@ EPOCH = datetime.datetime.utcfromtimestamp(0).replace(tzinfo=tz_util.utc) LOG = logging.getLogger(__name__) -def dup_marker_from_error(error_message): - """Extracts the duplicate marker from a MongoDB error string. - - :param error_message: raw error message string returned - by mongod on a duplicate key error. - - :raises: marconi.common.exceptions.PatternNotFound - :returns: extracted marker as an integer - """ - match = DUP_MARKER_REGEX.findall(error_message) - if not match: - description = (u'Error message could not be parsed: %s' % - error_message) - raise exceptions.PatternNotFound(description) - - return int(match[-1]) - - def cached_gen(iterable): """Converts the iterable into a caching generator. diff --git a/marconi/tests/__init__.py b/marconi/tests/__init__.py index 2f5d176fa..87b51ff79 100644 --- a/marconi/tests/__init__.py +++ b/marconi/tests/__init__.py @@ -18,5 +18,9 @@ from marconi.tests import base from marconi.tests import helpers -TestBase = base.TestBase +SKIP_SLOW_TESTS = helpers.SKIP_SLOW_TESTS +RUN_SLOW_TESTS = not SKIP_SLOW_TESTS + expect = helpers.expect +is_slow = helpers.is_slow +TestBase = base.TestBase diff --git a/marconi/tests/helpers.py b/marconi/tests/helpers.py index 861e3da1f..0a32fdb1f 100644 --- a/marconi/tests/helpers.py +++ b/marconi/tests/helpers.py @@ -14,10 +14,14 @@ # limitations under the License. import contextlib +import functools +import os import uuid import six +SKIP_SLOW_TESTS = os.environ.get('MARCONI_TEST_SLOW') is None + @contextlib.contextmanager def expect(*exc_type): @@ -120,3 +124,30 @@ def entries(controller, count): for p, q, _, _ in spec: controller.delete(p, q) + + +def is_slow(condition=lambda self: True): + """Decorator to flag slow tests. + + Slow tests will be skipped if MARCONI_TEST_SLOW is set, and + condition(self) returns True. + + :param condition: Function that returns True IFF the test will be slow; + useful for child classes which may modify the behavior of a test + such that it may or may not be slow. + """ + + def decorator(func): + @functools.wraps(func) + def wrapper(self): + if SKIP_SLOW_TESTS and condition(self): + msg = ('Skipping slow test. Set MARCONI_TEST_SLOW ' + 'to enable slow tests.') + + self.skipTest(msg) + + func(self) + + return wrapper + + return decorator diff --git a/marconi/tests/queues/storage/base.py b/marconi/tests/queues/storage/base.py index 956764014..8d5478ba1 100644 --- a/marconi/tests/queues/storage/base.py +++ b/marconi/tests/queues/storage/base.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import datetime import time from testtools import matchers @@ -43,6 +44,10 @@ class ControllerBaseTest(testing.TestBase): self.driver = self.driver_class() self.controller = self.controller_class(self.driver) + def tearDown(self): + timeutils.clear_time_override() + super(ControllerBaseTest, self).tearDown() + class QueueControllerTest(ControllerBaseTest): """Queue Controller base tests.""" @@ -53,10 +58,6 @@ class QueueControllerTest(ControllerBaseTest): self.message_controller = self.driver.message_controller self.claim_controller = self.driver.claim_controller - def tearDown(self): - timeutils.clear_time_override() - super(QueueControllerTest, self).tearDown() - def test_list(self): num = 15 for queue in xrange(num): @@ -132,21 +133,18 @@ class QueueControllerTest(ControllerBaseTest): self.assertNotEqual(oldest, newest) - # NOTE(kgriffs): Ensure "now" is different enough + # NOTE(kgriffs): Ensure is different enough # for the next comparison to work. - timeutils.set_time_override() - timeutils.advance_time_seconds(60) + soon = timeutils.utcnow() + datetime.timedelta(seconds=60) for message_stat in (oldest, newest): created_iso = message_stat['created'] created = timeutils.parse_isotime(created_iso) self.assertThat(timeutils.normalize_time(created), - matchers.LessThan(timeutils.utcnow())) + matchers.LessThan(soon)) self.assertIn('id', message_stat) - timeutils.clear_time_override() - self.assertThat(oldest['created'], matchers.LessThan(newest['created'])) @@ -188,6 +186,9 @@ class MessageControllerTest(ControllerBaseTest): queue_name = 'test_queue' controller_base_class = storage.MessageBase + # Specifies how often expired messages are purged, in sec. + gc_interval = 0 + def setUp(self): super(MessageControllerTest, self).setUp() @@ -342,7 +343,8 @@ class MessageControllerTest(ControllerBaseTest): project=self.project, claim=cid) - def test_expired_message(self): + @testing.is_slow(condition=lambda self: self.gc_interval != 0) + def test_expired_messages(self): messages = [{'body': 3.14, 'ttl': 0}] [msgid] = self.controller.post(self.queue_name, messages, @@ -353,14 +355,16 @@ class MessageControllerTest(ControllerBaseTest): project=self.project, client_uuid='my_uuid') + time.sleep(self.gc_interval) + with testing.expect(storage.exceptions.DoesNotExist): self.controller.get(self.queue_name, msgid, project=self.project) - countof = self.queue_controller.stats(self.queue_name, - project=self.project) + stats = self.queue_controller.stats(self.queue_name, + project=self.project) - self.assertEquals(countof['messages']['free'], 0) + self.assertEquals(stats['messages']['free'], 0) def test_bad_id(self): # NOTE(cpp-cabrera): A malformed ID should result in an empty diff --git a/setup.cfg b/setup.cfg index fbd9a626a..8366afd00 100644 --- a/setup.cfg +++ b/setup.cfg @@ -24,7 +24,6 @@ packages = [entry_points] console_scripts = - marconi-gc = marconi.cmd.gc:run marconi-server = marconi.cmd.server:run marconi.storage = diff --git a/tests/unit/queues/storage/test_impl_mongodb.py b/tests/unit/queues/storage/test_impl_mongodb.py index 30d6b1d5e..81453740f 100644 --- a/tests/unit/queues/storage/test_impl_mongodb.py +++ b/tests/unit/queues/storage/test_impl_mongodb.py @@ -21,8 +21,9 @@ from pymongo import cursor import pymongo.errors from testtools import matchers -from marconi.common import exceptions +from marconi.openstack.common import timeutils from marconi.queues import storage +from marconi.queues.storage import exceptions from marconi.queues.storage import mongodb from marconi.queues.storage.mongodb import controllers from marconi.queues.storage.mongodb import options as mongodb_options @@ -33,28 +34,6 @@ from marconi.tests.queues.storage import base class MongodbUtilsTest(testing.TestBase): - def test_dup_marker_from_error(self): - error_message = ('E11000 duplicate key error index: ' - 'marconi.messages.$queue_marker dup key: ' - '{ : "queue", : "project", : 3 }') - - marker = utils.dup_marker_from_error(error_message) - self.assertEquals(marker, 3) - - error_message = ('E11000 duplicate key error index: ' - 'marconi.messages.$x_y dup key: ' - '{ : "queue", : "project", : 3 }') - - self.assertRaises(exceptions.PatternNotFound, - utils.dup_marker_from_error, error_message) - - error_message = ('E11000 duplicate key error index: ' - 'marconi.messages.$queue_marker dup key: ' - '{ : ObjectId("51adff46b100eb85d8a93a2d") }') - - self.assertRaises(exceptions.PatternNotFound, - utils.dup_marker_from_error, error_message) - def test_calculate_backoff(self): sec = utils.calculate_backoff(0, 10, 2, 0) self.assertEquals(sec, 0) @@ -111,6 +90,7 @@ class MongodbQueueTests(base.QueueControllerTest): def tearDown(self): self.controller._col.drop() + self.message_controller._col.drop() super(MongodbQueueTests, self).tearDown() def test_indexes(self): @@ -143,6 +123,9 @@ class MongodbMessageTests(base.MessageControllerTest): driver_class = mongodb.Driver controller_class = controllers.MessageController + # NOTE(kgriffs): MongoDB's TTL scavenger only runs once a minute + gc_interval = 60 + def setUp(self): if not os.environ.get('MONGODB_TEST_LIVE'): self.skipTest('No MongoDB instance running') @@ -152,11 +135,9 @@ class MongodbMessageTests(base.MessageControllerTest): def tearDown(self): self.controller._col.drop() + self.queue_controller._col.drop() super(MongodbMessageTests, self).tearDown() - def _count_expired(self, queue, project=None): - return self.controller._count_expired(queue, project) - def test_indexes(self): col = self.controller._col indexes = col.index_information() @@ -165,58 +146,121 @@ class MongodbMessageTests(base.MessageControllerTest): self.assertIn('queue_marker', indexes) self.assertIn('counting', indexes) - def test_next_marker(self): + def test_message_counter(self): queue_name = 'marker_test' iterations = 10 self.queue_controller.create(queue_name) - seed_marker1 = self.controller._next_marker(queue_name) + seed_marker1 = self.queue_controller._get_counter(queue_name) self.assertEqual(seed_marker1, 1, 'First marker is 1') for i in range(iterations): self.controller.post(queue_name, [{'ttl': 60}], 'uuid') - marker1 = self.controller._next_marker(queue_name) - marker2 = self.controller._next_marker(queue_name) - marker3 = self.controller._next_marker(queue_name) + + marker1 = self.queue_controller._get_counter(queue_name) + marker2 = self.queue_controller._get_counter(queue_name) + marker3 = self.queue_controller._get_counter(queue_name) + self.assertEqual(marker1, marker2) self.assertEqual(marker2, marker3) - self.assertEqual(marker1, i + 2) - def test_remove_expired(self): - num_projects = 10 - num_queues = 10 - messages_per_queue = 100 + new_value = self.queue_controller._inc_counter(queue_name) + self.assertIsNotNone(new_value) - projects = ['gc-test-project-{0}'.format(i) - for i in range(num_projects)] + value_before = self.queue_controller._get_counter(queue_name) + new_value = self.queue_controller._inc_counter(queue_name) + self.assertIsNotNone(new_value) + value_after = self.queue_controller._get_counter(queue_name) + self.assertEquals(value_after, value_before + 1) - queue_names = ['gc-test-{0}'.format(i) for i in range(num_queues)] - client_uuid = 'b623c53c-cf75-11e2-84e1-a1187188419e' - messages = [{'ttl': 0, 'body': str(i)} - for i in range(messages_per_queue)] + value_before = value_after + new_value = self.queue_controller._inc_counter(queue_name, amount=7) + value_after = self.queue_controller._get_counter(queue_name) + self.assertEquals(value_after, value_before + 7) + self.assertEquals(value_after, new_value) - for project in projects: - for queue in queue_names: - self.queue_controller.create(queue, project) - self.controller.post(queue, messages, client_uuid, project) + reference_value = value_after - self.controller.remove_expired() + unchanged = self.queue_controller._inc_counter(queue_name, window=10) + self.assertIsNone(unchanged) - for project in projects: - for queue in queue_names: - query = {'q': queue, 'p': project} + # TODO(kgriffs): Pass utcnow to work around bug + # in set_time_override until we merge the fix in + # from upstream. + timeutils.set_time_override(timeutils.utcnow()) - cursor = self.driver.db.messages.find(query) - count = cursor.count() + timeutils.advance_time_seconds(10) + changed = self.queue_controller._inc_counter(queue_name, window=5) + self.assertEquals(changed, reference_value + 1) + timeutils.clear_time_override() - # Expect that the most recent message for each queue - # will not be removed. - self.assertEquals(count, 1) + def test_race_condition_on_post(self): + queue_name = 'marker_test' + self.queue_controller.create(queue_name) - message = next(cursor) - self.assertEquals(message['k'], messages_per_queue) + expected_messages = [ + { + 'ttl': 60, + 'body': { + 'event': 'BackupStarted', + 'backupId': 'c378813c-3f0b-11e2-ad92-7823d2b0f3ce', + }, + }, + { + 'ttl': 60, + 'body': { + 'event': 'BackupStarted', + 'backupId': 'd378813c-3f0b-11e2-ad92-7823d2b0f3ce', + }, + }, + { + 'ttl': 60, + 'body': { + 'event': 'BackupStarted', + 'backupId': 'e378813c-3f0b-11e2-ad92-7823d2b0f3ce', + }, + }, + ] + + uuid = '97b64000-2526-11e3-b088-d85c1300734c' + + # NOTE(kgriffs): Patch _inc_counter so it is a noop, so that + # the second time we post, we will get a collision. This simulates + # what happens when we have parallel requests and the "winning" + # requests hasn't gotten around to calling _inc_counter before the + # "losing" request attempts to insert it's batch of messages. + with mock.patch.object(mongodb.queues.QueueController, + '_inc_counter', autospec=True) as method: + + method.return_value = 2 + messages = expected_messages[:1] + created = list(self.controller.post(queue_name, messages, uuid)) + self.assertEquals(len(created), 1) + + # Force infinite retries + if testing.RUN_SLOW_TESTS: + method.return_value = None + + with testing.expect(exceptions.MessageConflict): + self.controller.post(queue_name, messages, uuid) + + created = list(self.controller.post(queue_name, + expected_messages[1:], + uuid)) + + self.assertEquals(len(created), 2) + + expected_ids = [m['body']['backupId'] for m in expected_messages] + + interaction = self.controller.list(queue_name, client_uuid=uuid, + echo=True) + actual_messages = list(next(interaction)) + self.assertEquals(len(actual_messages), len(expected_messages)) + actual_ids = [m['body']['backupId'] for m in actual_messages] + + self.assertEquals(actual_ids, expected_ids) def test_empty_queue_exception(self): queue_name = 'empty-queue-test' @@ -244,6 +288,11 @@ class MongodbClaimTests(base.ClaimControllerTest): super(MongodbClaimTests, self).setUp() self.load_conf('wsgi_mongodb.conf') + def tearDown(self): + self.message_controller._col.drop() + self.queue_controller._col.drop() + super(MongodbClaimTests, self).tearDown() + def test_claim_doesnt_exist(self): """Verifies that operations fail on expired/missing claims.