fix(mongodb): Marker generation creates a bottleneck under heavy load

This patch changes markers so that they are generated using a per-queue
side counter. A heuristic is used to mitigate a race condition. Due to the
new semantics, partial inserts are no longer possible due to collisions,
which ended up simplifying the retry logic for posting messages.

As a consequence of this patch, the last message posted no longer needs
to remain in the queue indefinitely, rendering marconi-gc unnecessary,
and so it has been removed.

Also, since the mongod GC worker runs once a minute, the queries no longer
filter out expired-but-not-yet-gc'd messages; on average, a message may
live more than 30 seconds passed it's expected lifetime, but I do not
think that this will harm or complicate any application building on top of
Marconi, practically speaking. That being said, it is worth calling out
in documentation.

Closes-Bug: #1218602
Change-Id: I34e24e7dd7c4e017c84eb5929ce37ad4c9e5266a
This commit is contained in:
kgriffs 2013-09-16 18:19:14 -05:00
parent 0ee1ab8e75
commit 1512db43ec
12 changed files with 399 additions and 341 deletions

View File

@ -59,19 +59,6 @@ database = marconi
# at the same instant. # at the same instant.
;max_retry_jitter = 0.005 ;max_retry_jitter = 0.005
# Frequency of message garbage collections, in seconds
;gc_interval = 300
# Threshold of number of expired messages to reach in a given
# queue, before performing the GC. Useful for reducing frequent
# locks on the DB for non-busy queues, or for worker queues
# which process jobs quickly enough to keep the number of in-
# flight messages low.
#
# Note: The higher this number, the larger the memory-mapped DB
# files will be.
;gc_threshold = 1000
[drivers:proxy:storage:mongodb] [drivers:proxy:storage:mongodb]
uri = mongodb://localhost:27017 uri = mongodb://localhost:27017
database = marconi_proxy database = marconi_proxy

View File

@ -1,61 +0,0 @@
# Copyright (c) 2013 Rackspace Hosting, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
import random
import sys
import time
from marconi.common import cli
from marconi.common import config
from marconi.openstack.common import log as logging
from marconi.queues import bootstrap
PROJECT_CFG = config.project('marconi')
LOG = logging.getLogger(__name__)
@cli.runnable
def run():
"""Entry point to start marconi-gc.
Operators should run 2-3 instances on different
boxes for fault-tolerance.
Note: This call blocks until the process is killed
or interrupted.
"""
try:
info = _(u'Starting marconi-gc')
print(info + _(u'. Use CTRL+C to exit...\n'))
LOG.info(info)
boot = bootstrap.Bootstrap(cli_args=sys.argv[1:])
storage_driver = boot.storage
gc_interval = storage_driver.gc_interval
# NOTE(kgriffs): Don't want all garbage collector
# instances running at the same time (will peg the DB).
offset = random.random() * gc_interval
time.sleep(offset)
while True:
storage_driver.gc()
time.sleep(gc_interval)
except NotImplementedError as ex:
print(_(u'The configured storage driver does not support GC.\n'))
LOG.exception(ex)

View File

@ -21,6 +21,8 @@ Field Mappings:
letter of their long name. letter of their long name.
""" """
import datetime
from bson import objectid from bson import objectid
from marconi.common import config from marconi.common import config
@ -129,8 +131,9 @@ class ClaimController(storage.ClaimBase):
now = timeutils.utcnow_ts() now = timeutils.utcnow_ts()
claim_expires = now + ttl claim_expires = now + ttl
message_expires = claim_expires + grace
message_ttl = ttl + grace message_ttl = ttl + grace
message_expiration = datetime.datetime.utcfromtimestamp(
claim_expires + grace)
meta = { meta = {
'id': oid, 'id': oid,
@ -171,10 +174,10 @@ class ClaimController(storage.ClaimBase):
# This sets the expiration time to # This sets the expiration time to
# `expires` on messages that would # `expires` on messages that would
# expire before claim. # expire before claim.
new_values = {'e': message_expires, 't': message_ttl} new_values = {'e': message_expiration, 't': message_ttl}
msg_ctrl._col.update({'q': queue, msg_ctrl._col.update({'q': queue,
'p': project, 'p': project,
'e': {'$lt': message_expires}, 'e': {'$lt': message_expiration},
'c.id': oid}, 'c.id': oid},
{'$set': new_values}, {'$set': new_values},
upsert=False, multi=True) upsert=False, multi=True)

View File

@ -31,28 +31,29 @@ class Driver(storage.DriverBase):
def __init__(self): def __init__(self):
# Lazy instantiation # Lazy instantiation
self._database = None self._database = None
self._connection = None
def _connect(self):
if options.CFG.uri and 'replicaSet' in options.CFG.uri:
self._connection = pymongo.MongoReplicaSetClient(options.CFG.uri)
else:
self._connection = pymongo.MongoClient(options.CFG.uri)
@property @property
def db(self): def db(self):
"""Property for lazy instantiation of mongodb's database.""" """Property for lazy instantiation of mongodb's database."""
if self._database is None: if self._database is None:
if options.CFG.uri and 'replicaSet' in options.CFG.uri: self._database = self.connection[options.CFG.database]
conn = pymongo.MongoReplicaSetClient(options.CFG.uri)
else:
conn = pymongo.MongoClient(options.CFG.uri)
self._database = conn[options.CFG.database]
return self._database return self._database
def gc(self): @property
LOG.info(_(u'Performing garbage collection.')) def connection(self):
"""Property for lazy instantiation of mongodb's client connection."""
if self._connection is None:
self._connect()
try: return self._connection
self.message_controller.remove_expired()
except pymongo.errors.ConnectionFailure as ex:
# Better luck next time...
LOG.exception(ex)
@property @property
def gc_interval(self): def gc_interval(self):

View File

@ -21,6 +21,7 @@ Field Mappings:
letter of their long name. letter of their long name.
""" """
import datetime
import time import time
import pymongo.errors import pymongo.errors
@ -39,9 +40,25 @@ CFG = config.namespace('limits:storage').from_options(
default_message_paging=10, default_message_paging=10,
) )
# NOTE(kgriffs): This value, in seconds, should be at least less than the
# minimum allowed TTL for messages (60 seconds). Make it 45 to allow for
# some fudge room.
MAX_RETRY_POST_DURATION = 45
# NOTE(kgriffs): It is extremely unlikely that all workers would somehow hang
# for more than 5 seconds, without a single one being able to succeed in
# posting some messages and incrementing the counter, thus allowing the other
# producers to succeed in turn.
COUNTER_STALL_WINDOW = 5
# For hinting # For hinting
ID_INDEX_FIELDS = [('_id', 1)] ID_INDEX_FIELDS = [('_id', 1)]
# For removing expired messages
TTL_INDEX_FIELDS = [
('e', 1)
]
# NOTE(kgriffs): This index is for listing messages, usually # NOTE(kgriffs): This index is for listing messages, usually
# filtering out claimed ones. # filtering out claimed ones.
ACTIVE_INDEX_FIELDS = [ ACTIVE_INDEX_FIELDS = [
@ -67,11 +84,11 @@ CLAIMED_INDEX_FIELDS = [
('c.e', 1), ('c.e', 1),
] ]
# Index used for _next_marker() and also to ensure uniqueness. # Index used to ensure uniqueness.
MARKER_INDEX_FIELDS = [ MARKER_INDEX_FIELDS = [
('p', 1), ('p', 1),
('q', 1), ('q', 1),
('k', -1) ('k', 1)
] ]
@ -94,7 +111,7 @@ class MessageController(storage.MessageBase):
# Cache for convenience and performance (avoids extra lookups and # Cache for convenience and performance (avoids extra lookups and
# recreating the range for every request.) # recreating the range for every request.)
self._queue_controller = self.driver.queue_controller self._queue_ctrl = self.driver.queue_controller
self._db = self.driver.db self._db = self.driver.db
self._retry_range = range(options.CFG.max_attempts) self._retry_range = range(options.CFG.max_attempts)
@ -111,6 +128,11 @@ class MessageController(storage.MessageBase):
def _ensure_indexes(self): def _ensure_indexes(self):
"""Ensures that all indexes are created.""" """Ensures that all indexes are created."""
self._col.ensure_index(TTL_INDEX_FIELDS,
name='ttl',
expireAfterSeconds=0,
background=True)
self._col.ensure_index(ACTIVE_INDEX_FIELDS, self._col.ensure_index(ACTIVE_INDEX_FIELDS,
name='active', name='active',
background=True) background=True)
@ -135,36 +157,6 @@ class MessageController(storage.MessageBase):
unique=True, unique=True,
background=True) background=True)
def _next_marker(self, queue_name, project=None):
"""Retrieves the next message marker for a given queue.
This helper is used to generate monotonic pagination
markers that are saved as part of the message
document. Simply taking the max of the current message
markers works, since Marconi always leaves the most recent
message in the queue (new queues always return 1).
Note 1: Markers are scoped per-queue and so are *not*
globally unique or globally ordered.
Note 2: If two or more requests to this method are made
in parallel, this method will return the same
marker. This is done intentionally so that the caller
can detect a parallel message post, allowing it to
mitigate race conditions between producer and
observer clients.
:param queue_name: Determines the scope for the marker
:param project: Queue's project
:returns: next message marker as an integer
"""
document = self._col.find_one({'p': project, 'q': queue_name},
sort=[('k', -1)],
fields={'k': 1, '_id': 0})
return 1 if document is None else (document['k'] + 1)
def _backoff_sleep(self, attempt): def _backoff_sleep(self, attempt):
"""Sleep between retries using a jitter algorithm. """Sleep between retries using a jitter algorithm.
@ -180,38 +172,6 @@ class MessageController(storage.MessageBase):
time.sleep(seconds) time.sleep(seconds)
def _remove_expired(self, queue_name, project):
"""Removes all expired messages except for the most recent
in each queue.
This method is used in lieu of mongo's TTL index since we
must always leave at least one message in the queue for
calculating the next marker.
:param queue_name: name for the queue from which to remove
expired messages
:param project: Project queue_name belong's too
"""
# Get the message with the highest marker, and leave
# it in the queue
head = self._col.find_one({'p': project, 'q': queue_name},
sort=[('k', -1)], fields={'k': 1})
if head is None:
# Assume queue was just deleted via a parallel request
LOG.debug(_(u'Queue %s is empty or missing.') % queue_name)
return
query = {
'p': project,
'q': queue_name,
'k': {'$ne': head['k']},
'e': {'$lte': timeutils.utcnow_ts()},
}
self._col.remove(query, w=0)
def _purge_queue(self, queue_name, project=None): def _purge_queue(self, queue_name, project=None):
"""Removes all messages from the queue. """Removes all messages from the queue.
@ -267,9 +227,6 @@ class MessageController(storage.MessageBase):
# queue and project # queue and project
'p': project, 'p': project,
'q': queue_name, 'q': queue_name,
# The messages cannot be expired
'e': {'$gt': now},
} }
if not echo: if not echo:
@ -309,9 +266,6 @@ class MessageController(storage.MessageBase):
# Messages must belong to this queue # Messages must belong to this queue
'p': project, 'p': project,
'q': queue_name, 'q': queue_name,
# The messages can not be expired
'e': {'$gt': timeutils.utcnow_ts()},
} }
if not include_claimed: if not include_claimed:
@ -399,31 +353,6 @@ class MessageController(storage.MessageBase):
{'$set': {'c': {'id': None, 'e': now}}}, {'$set': {'c': {'id': None, 'e': now}}},
upsert=False, multi=True) upsert=False, multi=True)
def remove_expired(self):
"""Removes all expired messages except for the most recent
in each queue.
This method is used in lieu of mongo's TTL index since we
must always leave at least one message in the queue for
calculating the next marker.
Warning: This method is expensive, since it must perform
separate queries for each queue, due to the requirement that
it must leave at least one message in each queue, and it
is impractical to send a huge list of _id's to filter out
in a single call. That being said, this is somewhat mitigated
by the fact that remove() is run on each queue seperately,
thereby reducing the duration that any given lock is held.
"""
# TODO(kgriffs): Optimize first by batching the .removes, second
# by setting a 'last inserted ID' in the queue collection for
# each message inserted (TBD, may cause problematic side-effect),
# and third, by changing the marker algorithm such that it no
# longer depends on retaining the last message in the queue!
for name, project in self._queue_controller._get_np():
self._remove_expired(name, project)
def list(self, queue_name, project=None, marker=None, limit=None, def list(self, queue_name, project=None, marker=None, limit=None,
echo=False, client_uuid=None, include_claimed=False): echo=False, client_uuid=None, include_claimed=False):
@ -469,7 +398,6 @@ class MessageController(storage.MessageBase):
'_id': mid, '_id': mid,
'p': project, 'p': project,
'q': queue_name, 'q': queue_name,
'e': {'$gt': now}
} }
message = list(self._col.find(query).limit(1).hint(ID_INDEX_FIELDS)) message = list(self._col.find(query).limit(1).hint(ID_INDEX_FIELDS))
@ -493,7 +421,6 @@ class MessageController(storage.MessageBase):
'_id': {'$in': message_ids}, '_id': {'$in': message_ids},
'p': project, 'p': project,
'q': queue_name, 'q': queue_name,
'e': {'$gt': now},
} }
# NOTE(flaper87): Should this query # NOTE(flaper87): Should this query
@ -508,19 +435,20 @@ class MessageController(storage.MessageBase):
@utils.raises_conn_error @utils.raises_conn_error
def post(self, queue_name, messages, client_uuid, project=None): def post(self, queue_name, messages, client_uuid, project=None):
now = timeutils.utcnow_ts() now = timeutils.utcnow_ts()
now_dt = datetime.datetime.utcfromtimestamp(now)
if not self._queue_controller.exists(queue_name, project): if not self._queue_ctrl.exists(queue_name, project):
raise exceptions.QueueDoesNotExist(queue_name, project) raise exceptions.QueueDoesNotExist(queue_name, project)
# Set the next basis marker for the first attempt. # Set the next basis marker for the first attempt.
next_marker = self._next_marker(queue_name, project) next_marker = self._queue_ctrl._get_counter(queue_name, project)
prepared_messages = [ prepared_messages = [
{ {
't': message['ttl'], 't': message['ttl'],
'q': queue_name, 'q': queue_name,
'p': project, 'p': project,
'e': now + message['ttl'], 'e': now_dt + datetime.timedelta(seconds=message['ttl']),
'u': client_uuid, 'u': client_uuid,
'c': {'id': None, 'e': now}, 'c': {'id': None, 'e': now},
'b': message['body'] if 'body' in message else {}, 'b': message['body'] if 'body' in message else {},
@ -530,38 +458,47 @@ class MessageController(storage.MessageBase):
for index, message in enumerate(messages) for index, message in enumerate(messages)
] ]
# Results are aggregated across all attempts
# NOTE(kgriffs): Using lazy instantiation...
aggregated_results = None
# Use a retry range for sanity, although we expect # Use a retry range for sanity, although we expect
# to rarely, if ever, reach the maximum number of # to rarely, if ever, reach the maximum number of
# retries. # retries.
#
# NOTE(kgriffs): With the default configuration (100 ms
# max sleep, 1000 max attempts), the max stall time
# before the operation is abandoned is 49.95 seconds.
for attempt in self._retry_range: for attempt in self._retry_range:
try: try:
ids = self._col.insert(prepared_messages) ids = self._col.insert(prepared_messages)
# NOTE(kgriffs): Only use aggregated results if we must,
# which saves some cycles on the happy path.
if aggregated_results:
aggregated_results.extend(ids)
ids = aggregated_results
# Log a message if we retried, for debugging perf issues # Log a message if we retried, for debugging perf issues
if attempt != 0: if attempt != 0:
message = _(u'%(attempts)d attempt(s) required to post ' message = _(u'%(attempts)d attempt(s) required to post '
u'%(num_messages)d messages to queue ' u'%(num_messages)d messages to queue '
u'%(queue_name)s and project %(project)s') u'"%(queue)s" under project %(project)s')
message %= dict(queue_name=queue_name, attempts=attempt+1, message %= dict(queue=queue_name, attempts=attempt+1,
num_messages=len(ids), project=project) num_messages=len(ids), project=project)
LOG.debug(message) LOG.debug(message)
# Update the counter in preparation for the next batch
#
# NOTE(kgriffs): Due to the unique index on the messages
# collection, competing inserts will fail as a whole,
# and keep retrying until the counter is incremented
# such that the competing marker's will start at a
# unique number, 1 past the max of the messages just
# inserted above.
self._queue_ctrl._inc_counter(queue_name, project,
amount=len(ids))
return map(str, ids) return map(str, ids)
except pymongo.errors.DuplicateKeyError as ex: except pymongo.errors.DuplicateKeyError as ex:
# Try again with the remaining messages # Try again with the remaining messages
# TODO(kgriffs): Record stats of how often retries happen,
# and how many attempts, on average, are required to insert
# messages.
# NOTE(kgriffs): This can be used in conjunction with the # NOTE(kgriffs): This can be used in conjunction with the
# log line, above, that is emitted after all messages have # log line, above, that is emitted after all messages have
# been posted, to guage how long it is taking for messages # been posted, to guage how long it is taking for messages
@ -570,43 +507,76 @@ class MessageController(storage.MessageBase):
# TODO(kgriffs): Add transaction ID to help match up loglines # TODO(kgriffs): Add transaction ID to help match up loglines
if attempt == 0: if attempt == 0:
message = _(u'First attempt failed while ' message = _(u'First attempt failed while '
u'adding messages to queue %s ' u'adding messages to queue '
u'for current request') % queue_name u'"%(queue)s" under project %(project)s')
message %= dict(queue=queue_name, project=project)
LOG.debug(message) LOG.debug(message)
# TODO(kgriffs): Record stats of how often retries happen, # NOTE(kgriffs): Never retry past the point that competing
# and how many attempts, on average, are required to insert # messages expire and are GC'd, since once they are gone,
# messages. # the unique index no longer protects us from getting out
# of order, which could cause an observer to miss this
# message. The code below provides a sanity-check to ensure
# this situation can not happen.
elapsed = timeutils.utcnow_ts() - now
if elapsed > MAX_RETRY_POST_DURATION:
message = _(u'Exceeded maximum retry duration for queue '
u'"%(queue)s" under project %(project)s')
message %= dict(queue=queue_name, project=project)
# NOTE(kgriffs): Slice prepared_messages. We have to interpret LOG.warning(message)
# the error message to get the duplicate key, which gives break
# us the marker that had a dupe, allowing us to extrapolate
# how many messages were consumed, since markers are monotonic
# counters.
duplicate_marker = utils.dup_marker_from_error(str(ex))
failed_index = duplicate_marker - next_marker
# Put the successful one's IDs into aggregated_results.
succeeded_messages = prepared_messages[:failed_index]
succeeded_ids = [msg['_id'] for msg in succeeded_messages]
# Results are aggregated across all attempts
if aggregated_results is None:
aggregated_results = succeeded_ids
else:
aggregated_results.extend(succeeded_ids)
# Retry the remaining messages with a new sequence
# of markers.
prepared_messages = prepared_messages[failed_index:]
next_marker = self._next_marker(queue_name, project)
for index, message in enumerate(prepared_messages):
message['k'] = next_marker + index
# Chill out for a moment to mitigate thrashing/thundering # Chill out for a moment to mitigate thrashing/thundering
self._backoff_sleep(attempt) self._backoff_sleep(attempt)
# NOTE(kgriffs): Perhaps we failed because a worker crashed
# after inserting messages, but before incrementing the
# counter; that would cause all future requests to stall,
# since they would keep getting the same base marker that is
# conflicting with existing messages, until the messages that
# "won" expire, at which time we would end up reusing markers,
# and that could make some messages invisible to an observer
# that is querying with a marker that is large than the ones
# being reused.
#
# To mitigate this, we apply a heuristic to determine whether
# a counter has stalled. We attempt to increment the counter,
# but only if it hasn't been updated for a few seconds, which
# should mean that nobody is left to update it!
#
# Note that we increment one at a time until the logjam is
# broken, since we don't know how many messages were posted
# by the worker before it crashed.
next_marker = self._queue_ctrl._inc_counter(
queue_name, project, window=COUNTER_STALL_WINDOW)
# Retry the entire batch with a new sequence of markers.
#
# NOTE(kgriffs): Due to the unique index, and how
# MongoDB works with batch requests, we will never
# end up with a partially-successful update. The first
# document in the batch will fail to insert, and the
# remainder of the documents will not be attempted.
if next_marker is None:
# NOTE(kgriffs): Usually we will end up here, since
# it should be rare that a counter becomes stalled.
next_marker = self._queue_ctrl._get_counter(
queue_name, project)
else:
message = (u'Detected a stalled message counter for '
u'queue "%(queue)s" under project %(project)s. '
u'The counter was incremented to %(value)d.')
message %= dict(queue=queue_name,
project=project,
value=next_marker)
LOG.warning(message)
for index, message in enumerate(prepared_messages):
message['k'] = next_marker + index
except Exception as ex: except Exception as ex:
# TODO(kgriffs): Query the DB to get the last marker that # TODO(kgriffs): Query the DB to get the last marker that
# made it, and extrapolate from there to figure out what # made it, and extrapolate from there to figure out what
@ -616,13 +586,13 @@ class MessageController(storage.MessageBase):
raise raise
message = _(u'Hit maximum number of attempts (%(max)s) for queue ' message = _(u'Hit maximum number of attempts (%(max)s) for queue '
u'%(id)s in project %(project)s') u'"%(queue)s" under project %(project)s')
message %= dict(max=options.CFG.max_attempts, id=queue_name, message %= dict(max=options.CFG.max_attempts, queue=queue_name,
project=project) project=project)
LOG.warning(message) LOG.warning(message)
succeeded_ids = map(str, aggregated_results) succeeded_ids = []
raise exceptions.MessageConflict(queue_name, project, succeeded_ids) raise exceptions.MessageConflict(queue_name, project, succeeded_ids)
@utils.raises_conn_error @utils.raises_conn_error
@ -647,7 +617,6 @@ class MessageController(storage.MessageBase):
return return
now = timeutils.utcnow_ts() now = timeutils.utcnow_ts()
query['e'] = {'$gt': now}
message = self._col.find_one(query) message = self._col.find_one(query)
if message is None: if message is None:

View File

@ -40,13 +40,20 @@ class QueueController(storage.QueueBase):
"""Implements queue resource operations using MongoDB. """Implements queue resource operations using MongoDB.
Queues: Queues:
Name Field Name Field
------------------ -------------------
name -> n name -> n
project -> p project -> p
counter -> c msg counter -> c
metadata -> m metadata -> m
Message Counter:
Name Field
-------------------
value -> v
modified ts -> t
""" """
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
@ -76,6 +83,89 @@ class QueueController(storage.QueueBase):
cursor = self._col.find({}, fields={'n': 1, 'p': 1}) cursor = self._col.find({}, fields={'n': 1, 'p': 1})
return ((doc['n'], doc['p']) for doc in cursor) return ((doc['n'], doc['p']) for doc in cursor)
def _get_counter(self, name, project=None):
"""Retrieves the current message counter value for a given queue.
This helper is used to generate monotonic pagination
markers that are saved as part of the message
document.
Note 1: Markers are scoped per-queue and so are *not*
globally unique or globally ordered.
Note 2: If two or more requests to this method are made
in parallel, this method will return the same counter
value. This is done intentionally so that the caller
can detect a parallel message post, allowing it to
mitigate race conditions between producer and
observer clients.
:param name: Name of the queue to which the counter is scoped
:param project: Queue's project
:returns: current message counter as an integer
"""
doc = self._col.find_one({'p': project, 'n': name},
fields={'c.v': 1, '_id': 0})
if doc is None:
raise exceptions.QueueDoesNotExist(name, project)
return doc['c']['v']
def _inc_counter(self, name, project=None, amount=1, window=None):
"""Increments the message counter and returns the new value.
:param name: Name of the queue to which the counter is scoped
:param project: Queue's project name
:param amount: (Default 1) Amount by which to increment the counter
:param window: (Default None) A time window, in seconds, that
must have elapsed since the counter was last updated, in
order to increment the counter.
:returns: Updated message counter value, or None if window
was specified, and the counter has already been updated
within the specified time period.
:raises: storage.exceptions.QueueDoesNotExist
"""
now = timeutils.utcnow_ts()
update = {'$inc': {'c.v': amount}, '$set': {'c.t': now}}
query = {'p': project, 'n': name}
if window is not None:
threshold = now - window
query['c.t'] = {'$lt': threshold}
while True:
try:
doc = self._col.find_and_modify(query, update, new=True,
fields={'c.v': 1, '_id': 0})
break
except pymongo.errors.AutoReconnect as ex:
LOG.exception(ex)
if doc is None:
if window is None:
# NOTE(kgriffs): Since we did not filter by a time window,
# the queue should have been found and updated. Perhaps
# the queue has been deleted?
message = _(u'Failed to increment the message '
u'counter for queue %(name)s and '
u'project %(project)s')
message %= dict(name=name, project=project)
LOG.warning(message)
raise exceptions.QueueDoesNotExist(name, project)
# NOTE(kgriffs): Assume the queue existed, but the counter
# was recently updated, causing the range query on 'c.t' to
# exclude the record.
return None
return doc['c']['v']
#----------------------------------------------------------------------- #-----------------------------------------------------------------------
# Interface # Interface
#----------------------------------------------------------------------- #-----------------------------------------------------------------------
@ -116,7 +206,12 @@ class QueueController(storage.QueueBase):
@utils.raises_conn_error @utils.raises_conn_error
def create(self, name, project=None): def create(self, name, project=None):
try: try:
self._col.insert({'p': project, 'n': name, 'm': {}, 'c': 1}) # NOTE(kgriffs): Start counting at 1, and assume the first
# message ever posted will succeed and set t to a UNIX
# "modified at" timestamp.
counter = {'v': 1, 't': 0}
self._col.insert({'p': project, 'n': name, 'm': {}, 'c': counter})
except pymongo.errors.DuplicateKeyError: except pymongo.errors.DuplicateKeyError:
return False return False

View File

@ -17,21 +17,16 @@ import collections
import datetime import datetime
import functools import functools
import random import random
import re
from bson import errors as berrors from bson import errors as berrors
from bson import objectid from bson import objectid
from bson import tz_util from bson import tz_util
from pymongo import errors from pymongo import errors
from marconi.common import exceptions
import marconi.openstack.common.log as logging import marconi.openstack.common.log as logging
from marconi.openstack.common import timeutils from marconi.openstack.common import timeutils
from marconi.queues.storage import exceptions as storage_exceptions from marconi.queues.storage import exceptions as storage_exceptions
DUP_MARKER_REGEX = re.compile(r'\$queue_marker.*?:\s(\d+)')
# BSON ObjectId gives TZ-aware datetime, so we generate a # BSON ObjectId gives TZ-aware datetime, so we generate a
# TZ-aware UNIX epoch for convenience. # TZ-aware UNIX epoch for convenience.
EPOCH = datetime.datetime.utcfromtimestamp(0).replace(tzinfo=tz_util.utc) EPOCH = datetime.datetime.utcfromtimestamp(0).replace(tzinfo=tz_util.utc)
@ -39,24 +34,6 @@ EPOCH = datetime.datetime.utcfromtimestamp(0).replace(tzinfo=tz_util.utc)
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
def dup_marker_from_error(error_message):
"""Extracts the duplicate marker from a MongoDB error string.
:param error_message: raw error message string returned
by mongod on a duplicate key error.
:raises: marconi.common.exceptions.PatternNotFound
:returns: extracted marker as an integer
"""
match = DUP_MARKER_REGEX.findall(error_message)
if not match:
description = (u'Error message could not be parsed: %s' %
error_message)
raise exceptions.PatternNotFound(description)
return int(match[-1])
def cached_gen(iterable): def cached_gen(iterable):
"""Converts the iterable into a caching generator. """Converts the iterable into a caching generator.

View File

@ -18,5 +18,9 @@
from marconi.tests import base from marconi.tests import base
from marconi.tests import helpers from marconi.tests import helpers
TestBase = base.TestBase SKIP_SLOW_TESTS = helpers.SKIP_SLOW_TESTS
RUN_SLOW_TESTS = not SKIP_SLOW_TESTS
expect = helpers.expect expect = helpers.expect
is_slow = helpers.is_slow
TestBase = base.TestBase

View File

@ -14,10 +14,14 @@
# limitations under the License. # limitations under the License.
import contextlib import contextlib
import functools
import os
import uuid import uuid
import six import six
SKIP_SLOW_TESTS = os.environ.get('MARCONI_TEST_SLOW') is None
@contextlib.contextmanager @contextlib.contextmanager
def expect(*exc_type): def expect(*exc_type):
@ -120,3 +124,30 @@ def entries(controller, count):
for p, q, _, _ in spec: for p, q, _, _ in spec:
controller.delete(p, q) controller.delete(p, q)
def is_slow(condition=lambda self: True):
"""Decorator to flag slow tests.
Slow tests will be skipped if MARCONI_TEST_SLOW is set, and
condition(self) returns True.
:param condition: Function that returns True IFF the test will be slow;
useful for child classes which may modify the behavior of a test
such that it may or may not be slow.
"""
def decorator(func):
@functools.wraps(func)
def wrapper(self):
if SKIP_SLOW_TESTS and condition(self):
msg = ('Skipping slow test. Set MARCONI_TEST_SLOW '
'to enable slow tests.')
self.skipTest(msg)
func(self)
return wrapper
return decorator

View File

@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import datetime
import time import time
from testtools import matchers from testtools import matchers
@ -43,6 +44,10 @@ class ControllerBaseTest(testing.TestBase):
self.driver = self.driver_class() self.driver = self.driver_class()
self.controller = self.controller_class(self.driver) self.controller = self.controller_class(self.driver)
def tearDown(self):
timeutils.clear_time_override()
super(ControllerBaseTest, self).tearDown()
class QueueControllerTest(ControllerBaseTest): class QueueControllerTest(ControllerBaseTest):
"""Queue Controller base tests.""" """Queue Controller base tests."""
@ -53,10 +58,6 @@ class QueueControllerTest(ControllerBaseTest):
self.message_controller = self.driver.message_controller self.message_controller = self.driver.message_controller
self.claim_controller = self.driver.claim_controller self.claim_controller = self.driver.claim_controller
def tearDown(self):
timeutils.clear_time_override()
super(QueueControllerTest, self).tearDown()
def test_list(self): def test_list(self):
num = 15 num = 15
for queue in xrange(num): for queue in xrange(num):
@ -132,21 +133,18 @@ class QueueControllerTest(ControllerBaseTest):
self.assertNotEqual(oldest, newest) self.assertNotEqual(oldest, newest)
# NOTE(kgriffs): Ensure "now" is different enough # NOTE(kgriffs): Ensure is different enough
# for the next comparison to work. # for the next comparison to work.
timeutils.set_time_override() soon = timeutils.utcnow() + datetime.timedelta(seconds=60)
timeutils.advance_time_seconds(60)
for message_stat in (oldest, newest): for message_stat in (oldest, newest):
created_iso = message_stat['created'] created_iso = message_stat['created']
created = timeutils.parse_isotime(created_iso) created = timeutils.parse_isotime(created_iso)
self.assertThat(timeutils.normalize_time(created), self.assertThat(timeutils.normalize_time(created),
matchers.LessThan(timeutils.utcnow())) matchers.LessThan(soon))
self.assertIn('id', message_stat) self.assertIn('id', message_stat)
timeutils.clear_time_override()
self.assertThat(oldest['created'], self.assertThat(oldest['created'],
matchers.LessThan(newest['created'])) matchers.LessThan(newest['created']))
@ -188,6 +186,9 @@ class MessageControllerTest(ControllerBaseTest):
queue_name = 'test_queue' queue_name = 'test_queue'
controller_base_class = storage.MessageBase controller_base_class = storage.MessageBase
# Specifies how often expired messages are purged, in sec.
gc_interval = 0
def setUp(self): def setUp(self):
super(MessageControllerTest, self).setUp() super(MessageControllerTest, self).setUp()
@ -342,7 +343,8 @@ class MessageControllerTest(ControllerBaseTest):
project=self.project, project=self.project,
claim=cid) claim=cid)
def test_expired_message(self): @testing.is_slow(condition=lambda self: self.gc_interval != 0)
def test_expired_messages(self):
messages = [{'body': 3.14, 'ttl': 0}] messages = [{'body': 3.14, 'ttl': 0}]
[msgid] = self.controller.post(self.queue_name, messages, [msgid] = self.controller.post(self.queue_name, messages,
@ -353,14 +355,16 @@ class MessageControllerTest(ControllerBaseTest):
project=self.project, project=self.project,
client_uuid='my_uuid') client_uuid='my_uuid')
time.sleep(self.gc_interval)
with testing.expect(storage.exceptions.DoesNotExist): with testing.expect(storage.exceptions.DoesNotExist):
self.controller.get(self.queue_name, msgid, self.controller.get(self.queue_name, msgid,
project=self.project) project=self.project)
countof = self.queue_controller.stats(self.queue_name, stats = self.queue_controller.stats(self.queue_name,
project=self.project) project=self.project)
self.assertEquals(countof['messages']['free'], 0) self.assertEquals(stats['messages']['free'], 0)
def test_bad_id(self): def test_bad_id(self):
# NOTE(cpp-cabrera): A malformed ID should result in an empty # NOTE(cpp-cabrera): A malformed ID should result in an empty

View File

@ -24,7 +24,6 @@ packages =
[entry_points] [entry_points]
console_scripts = console_scripts =
marconi-gc = marconi.cmd.gc:run
marconi-server = marconi.cmd.server:run marconi-server = marconi.cmd.server:run
marconi.storage = marconi.storage =

View File

@ -21,8 +21,9 @@ from pymongo import cursor
import pymongo.errors import pymongo.errors
from testtools import matchers from testtools import matchers
from marconi.common import exceptions from marconi.openstack.common import timeutils
from marconi.queues import storage from marconi.queues import storage
from marconi.queues.storage import exceptions
from marconi.queues.storage import mongodb from marconi.queues.storage import mongodb
from marconi.queues.storage.mongodb import controllers from marconi.queues.storage.mongodb import controllers
from marconi.queues.storage.mongodb import options as mongodb_options from marconi.queues.storage.mongodb import options as mongodb_options
@ -33,28 +34,6 @@ from marconi.tests.queues.storage import base
class MongodbUtilsTest(testing.TestBase): class MongodbUtilsTest(testing.TestBase):
def test_dup_marker_from_error(self):
error_message = ('E11000 duplicate key error index: '
'marconi.messages.$queue_marker dup key: '
'{ : "queue", : "project", : 3 }')
marker = utils.dup_marker_from_error(error_message)
self.assertEquals(marker, 3)
error_message = ('E11000 duplicate key error index: '
'marconi.messages.$x_y dup key: '
'{ : "queue", : "project", : 3 }')
self.assertRaises(exceptions.PatternNotFound,
utils.dup_marker_from_error, error_message)
error_message = ('E11000 duplicate key error index: '
'marconi.messages.$queue_marker dup key: '
'{ : ObjectId("51adff46b100eb85d8a93a2d") }')
self.assertRaises(exceptions.PatternNotFound,
utils.dup_marker_from_error, error_message)
def test_calculate_backoff(self): def test_calculate_backoff(self):
sec = utils.calculate_backoff(0, 10, 2, 0) sec = utils.calculate_backoff(0, 10, 2, 0)
self.assertEquals(sec, 0) self.assertEquals(sec, 0)
@ -111,6 +90,7 @@ class MongodbQueueTests(base.QueueControllerTest):
def tearDown(self): def tearDown(self):
self.controller._col.drop() self.controller._col.drop()
self.message_controller._col.drop()
super(MongodbQueueTests, self).tearDown() super(MongodbQueueTests, self).tearDown()
def test_indexes(self): def test_indexes(self):
@ -143,6 +123,9 @@ class MongodbMessageTests(base.MessageControllerTest):
driver_class = mongodb.Driver driver_class = mongodb.Driver
controller_class = controllers.MessageController controller_class = controllers.MessageController
# NOTE(kgriffs): MongoDB's TTL scavenger only runs once a minute
gc_interval = 60
def setUp(self): def setUp(self):
if not os.environ.get('MONGODB_TEST_LIVE'): if not os.environ.get('MONGODB_TEST_LIVE'):
self.skipTest('No MongoDB instance running') self.skipTest('No MongoDB instance running')
@ -152,11 +135,9 @@ class MongodbMessageTests(base.MessageControllerTest):
def tearDown(self): def tearDown(self):
self.controller._col.drop() self.controller._col.drop()
self.queue_controller._col.drop()
super(MongodbMessageTests, self).tearDown() super(MongodbMessageTests, self).tearDown()
def _count_expired(self, queue, project=None):
return self.controller._count_expired(queue, project)
def test_indexes(self): def test_indexes(self):
col = self.controller._col col = self.controller._col
indexes = col.index_information() indexes = col.index_information()
@ -165,58 +146,121 @@ class MongodbMessageTests(base.MessageControllerTest):
self.assertIn('queue_marker', indexes) self.assertIn('queue_marker', indexes)
self.assertIn('counting', indexes) self.assertIn('counting', indexes)
def test_next_marker(self): def test_message_counter(self):
queue_name = 'marker_test' queue_name = 'marker_test'
iterations = 10 iterations = 10
self.queue_controller.create(queue_name) self.queue_controller.create(queue_name)
seed_marker1 = self.controller._next_marker(queue_name) seed_marker1 = self.queue_controller._get_counter(queue_name)
self.assertEqual(seed_marker1, 1, 'First marker is 1') self.assertEqual(seed_marker1, 1, 'First marker is 1')
for i in range(iterations): for i in range(iterations):
self.controller.post(queue_name, [{'ttl': 60}], 'uuid') self.controller.post(queue_name, [{'ttl': 60}], 'uuid')
marker1 = self.controller._next_marker(queue_name)
marker2 = self.controller._next_marker(queue_name) marker1 = self.queue_controller._get_counter(queue_name)
marker3 = self.controller._next_marker(queue_name) marker2 = self.queue_controller._get_counter(queue_name)
marker3 = self.queue_controller._get_counter(queue_name)
self.assertEqual(marker1, marker2) self.assertEqual(marker1, marker2)
self.assertEqual(marker2, marker3) self.assertEqual(marker2, marker3)
self.assertEqual(marker1, i + 2) self.assertEqual(marker1, i + 2)
def test_remove_expired(self): new_value = self.queue_controller._inc_counter(queue_name)
num_projects = 10 self.assertIsNotNone(new_value)
num_queues = 10
messages_per_queue = 100
projects = ['gc-test-project-{0}'.format(i) value_before = self.queue_controller._get_counter(queue_name)
for i in range(num_projects)] new_value = self.queue_controller._inc_counter(queue_name)
self.assertIsNotNone(new_value)
value_after = self.queue_controller._get_counter(queue_name)
self.assertEquals(value_after, value_before + 1)
queue_names = ['gc-test-{0}'.format(i) for i in range(num_queues)] value_before = value_after
client_uuid = 'b623c53c-cf75-11e2-84e1-a1187188419e' new_value = self.queue_controller._inc_counter(queue_name, amount=7)
messages = [{'ttl': 0, 'body': str(i)} value_after = self.queue_controller._get_counter(queue_name)
for i in range(messages_per_queue)] self.assertEquals(value_after, value_before + 7)
self.assertEquals(value_after, new_value)
for project in projects: reference_value = value_after
for queue in queue_names:
self.queue_controller.create(queue, project)
self.controller.post(queue, messages, client_uuid, project)
self.controller.remove_expired() unchanged = self.queue_controller._inc_counter(queue_name, window=10)
self.assertIsNone(unchanged)
for project in projects: # TODO(kgriffs): Pass utcnow to work around bug
for queue in queue_names: # in set_time_override until we merge the fix in
query = {'q': queue, 'p': project} # from upstream.
timeutils.set_time_override(timeutils.utcnow())
cursor = self.driver.db.messages.find(query) timeutils.advance_time_seconds(10)
count = cursor.count() changed = self.queue_controller._inc_counter(queue_name, window=5)
self.assertEquals(changed, reference_value + 1)
timeutils.clear_time_override()
# Expect that the most recent message for each queue def test_race_condition_on_post(self):
# will not be removed. queue_name = 'marker_test'
self.assertEquals(count, 1) self.queue_controller.create(queue_name)
message = next(cursor) expected_messages = [
self.assertEquals(message['k'], messages_per_queue) {
'ttl': 60,
'body': {
'event': 'BackupStarted',
'backupId': 'c378813c-3f0b-11e2-ad92-7823d2b0f3ce',
},
},
{
'ttl': 60,
'body': {
'event': 'BackupStarted',
'backupId': 'd378813c-3f0b-11e2-ad92-7823d2b0f3ce',
},
},
{
'ttl': 60,
'body': {
'event': 'BackupStarted',
'backupId': 'e378813c-3f0b-11e2-ad92-7823d2b0f3ce',
},
},
]
uuid = '97b64000-2526-11e3-b088-d85c1300734c'
# NOTE(kgriffs): Patch _inc_counter so it is a noop, so that
# the second time we post, we will get a collision. This simulates
# what happens when we have parallel requests and the "winning"
# requests hasn't gotten around to calling _inc_counter before the
# "losing" request attempts to insert it's batch of messages.
with mock.patch.object(mongodb.queues.QueueController,
'_inc_counter', autospec=True) as method:
method.return_value = 2
messages = expected_messages[:1]
created = list(self.controller.post(queue_name, messages, uuid))
self.assertEquals(len(created), 1)
# Force infinite retries
if testing.RUN_SLOW_TESTS:
method.return_value = None
with testing.expect(exceptions.MessageConflict):
self.controller.post(queue_name, messages, uuid)
created = list(self.controller.post(queue_name,
expected_messages[1:],
uuid))
self.assertEquals(len(created), 2)
expected_ids = [m['body']['backupId'] for m in expected_messages]
interaction = self.controller.list(queue_name, client_uuid=uuid,
echo=True)
actual_messages = list(next(interaction))
self.assertEquals(len(actual_messages), len(expected_messages))
actual_ids = [m['body']['backupId'] for m in actual_messages]
self.assertEquals(actual_ids, expected_ids)
def test_empty_queue_exception(self): def test_empty_queue_exception(self):
queue_name = 'empty-queue-test' queue_name = 'empty-queue-test'
@ -244,6 +288,11 @@ class MongodbClaimTests(base.ClaimControllerTest):
super(MongodbClaimTests, self).setUp() super(MongodbClaimTests, self).setUp()
self.load_conf('wsgi_mongodb.conf') self.load_conf('wsgi_mongodb.conf')
def tearDown(self):
self.message_controller._col.drop()
self.queue_controller._col.drop()
super(MongodbClaimTests, self).tearDown()
def test_claim_doesnt_exist(self): def test_claim_doesnt_exist(self):
"""Verifies that operations fail on expired/missing claims. """Verifies that operations fail on expired/missing claims.