
When a primary MongoDB node fails over to a secondary, pymongo raises an
AutoReconnect error. Let's catch that and retry the operation so that we
truly are Highly Available (in the sense that the user will never notice
the few ms of "downtime" caused by a failover). This is particularly
important when hosting the backend on a DBaaS that routinely fails over
the master as a way of compacting shards.

NOTE: In order to get all MongoDB tests green, a tiny unrelated bug in
test_shards was fixed as part of this patch.

Closes-Bug: 1214973
Change-Id: Ibf172e30ec6e7fa0bbb8fdcebda9e985d1e49714
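The retry behavior is applied through the @utils.retries_on_autoreconnect decorator seen on the controller methods below; its implementation lives in the mongodb utils module and is not part of this file. As a rough, illustrative sketch only (the attempt count, sleep constant, and backoff shape here are assumptions, not the real configuration-driven values), the decorator amounts to:

import functools
import time

import pymongo.errors

# Illustrative constants; the real decorator reads its limits from the
# mongodb driver configuration.
MAX_RECONNECT_ATTEMPTS = 10
RECONNECT_SLEEP_SEC = 0.02


def retries_on_autoreconnect(func):
    """Retry the wrapped operation when a replica-set failover occurs.

    pymongo raises AutoReconnect while a new primary is being elected;
    retrying with a short backoff hides the brief outage from callers.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        for attempt in range(MAX_RECONNECT_ATTEMPTS):
            try:
                return func(*args, **kwargs)
            except pymongo.errors.AutoReconnect:
                if attempt == MAX_RECONNECT_ATTEMPTS - 1:
                    raise  # out of retries; surface the error to the caller
                # Simple linear backoff; the real helper sleeps with a
                # configurable, jittered delay.
                time.sleep(RECONNECT_SLEEP_SEC * (attempt + 1))

    return wrapper

In the file below this decorator is stacked with @utils.raises_conn_error, so a failure that outlasts the retry budget still surfaces promptly instead of hanging the request.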
749 lines | 28 KiB | Python
# Copyright (c) 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Implements the MongoDB storage controller for messages.

Field Mappings:
    In order to reduce the disk / memory space used,
    field names will be, most of the time, the first
    letter of their long name.
"""

import datetime
import time

from bson import objectid
import pymongo.errors
import pymongo.read_preferences

from marconi.openstack.common.gettextutils import _
import marconi.openstack.common.log as logging
from marconi.openstack.common import timeutils
from marconi.queues import storage
from marconi.queues.storage import errors
from marconi.queues.storage.mongodb import utils


LOG = logging.getLogger(__name__)

# NOTE(kgriffs): This value, in seconds, should be at least less than the
# minimum allowed TTL for messages (60 seconds). Make it 45 to allow for
# some fudge room.
MAX_RETRY_POST_DURATION = 45

# NOTE(kgriffs): It is extremely unlikely that all workers would somehow hang
# for more than 5 seconds, without a single one being able to succeed in
# posting some messages and incrementing the counter, thus allowing the other
# producers to succeed in turn.
COUNTER_STALL_WINDOW = 5

# For hinting
ID_INDEX_FIELDS = [('_id', 1)]

# For removing expired messages
TTL_INDEX_FIELDS = [
    ('e', 1),
]

# NOTE(cpp-cabrera): to unify use of project/queue across mongodb
# storage impls.
PROJ_QUEUE = utils.PROJ_QUEUE_KEY

# NOTE(kgriffs): This index is for listing messages, usually
# filtering out claimed ones.
ACTIVE_INDEX_FIELDS = [
    (PROJ_QUEUE, 1),  # Project will be unique, so put first
    ('k', 1),  # Used for sorting and paging, must come before range queries
    ('c.e', 1),  # Used for filtering out claimed messages

    # NOTE(kgriffs): We do not include 'u' and 'tx' here on
    # purpose. It was found experimentally that adding 'u' did
    # not improve performance, and so it was left out in order
    # to reduce index size and make updating the index
    # faster. When 'tx' was added, it was assumed that it would
    # follow a similar performance pattern to 'u', since by
    # the time you traverse the index down past the fields
    # listed above, there is very little left to scan, esp.
    # considering all queries are limited (limit=) to a fairly
    # small number.
    #
    # TODO(kgriffs): The extrapolation wrt 'tx' needs to be
    # proven empirically.
]

# For counting
COUNTING_INDEX_FIELDS = [
    (PROJ_QUEUE, 1),  # Project will be unique, so put first
    ('c.e', 1),  # Used for filtering out claimed messages
]

# Index used for claims
CLAIMED_INDEX_FIELDS = [
    (PROJ_QUEUE, 1),
    ('c.id', 1),
    ('k', 1),
    ('c.e', 1),
]

# Index used to ensure uniqueness.
MARKER_INDEX_FIELDS = [
    (PROJ_QUEUE, 1),
    ('k', 1),
]

TRANSACTION_INDEX_FIELDS = [
    ('tx', 1),
]


class MessageController(storage.Message):
    """Implements message resource operations using MongoDB.

    Messages are scoped by project + queue.

    Messages:
        Name                Field
        -------------------------
        scope            ->   p_q
        ttl              ->     t
        expires          ->     e
        marker           ->     k
        body             ->     b
        claim            ->     c
        client uuid      ->     u
        transaction      ->    tx
    """

    def __init__(self, *args, **kwargs):
        super(MessageController, self).__init__(*args, **kwargs)

        # Cache for convenience and performance
        self._num_partitions = self.driver.mongodb_conf.partitions
        self._queue_ctrl = self.driver.queue_controller
        self._retry_range = range(self.driver.mongodb_conf.max_attempts)

        # Create a list of 'messages' collections, one for each database
        # partition, ordered by partition number.
        #
        # NOTE(kgriffs): Order matters, since it is used to lookup the
        # collection by partition number. For example, self._collections[2]
        # would provide access to marconi_p2.messages (partition numbers are
        # zero-based).
        self._collections = [db.messages
                             for db in self.driver.message_databases]

        # Ensure indexes are initialized before any queries are performed
        for collection in self._collections:
            self._ensure_indexes(collection)

    #-----------------------------------------------------------------------
    # Helpers
    #-----------------------------------------------------------------------

    def _ensure_indexes(self, collection):
        """Ensures that all indexes are created."""

        collection.ensure_index(TTL_INDEX_FIELDS,
                                name='ttl',
                                expireAfterSeconds=0,
                                background=True)

        collection.ensure_index(ACTIVE_INDEX_FIELDS,
                                name='active',
                                background=True)

        collection.ensure_index(CLAIMED_INDEX_FIELDS,
                                name='claimed',
                                background=True)

        collection.ensure_index(COUNTING_INDEX_FIELDS,
                                name='counting',
                                background=True)

        # NOTE(kgriffs): This index must be unique so that
        # inserting a message with the same marker to the
        # same queue will fail; this is used to detect a
        # race condition which can cause an observer client
        # to miss a message when there is more than one
        # producer posting messages to the same queue, in
        # parallel.
        collection.ensure_index(MARKER_INDEX_FIELDS,
                                name='queue_marker',
                                unique=True,
                                background=True)

        collection.ensure_index(TRANSACTION_INDEX_FIELDS,
                                name='transaction',
                                background=True)

    def _collection(self, queue_name, project=None):
        """Get a partitioned collection instance."""
        return self._collections[utils.get_partition(self._num_partitions,
                                                     queue_name, project)]

    def _backoff_sleep(self, attempt):
        """Sleep between retries using a jitter algorithm.

        Mitigates thrashing between multiple parallel requests, and
        creates backpressure on clients to slow down the rate
        at which they submit requests.

        :param attempt: current attempt number, zero-based
        """
        conf = self.driver.mongodb_conf
        seconds = utils.calculate_backoff(attempt, conf.max_attempts,
                                          conf.max_retry_sleep,
                                          conf.max_retry_jitter)

        time.sleep(seconds)

    def _purge_queue(self, queue_name, project=None):
        """Removes all messages from the queue.

        Warning: Only use this when deleting the queue; otherwise
        you can cause a side-effect of resetting the marker counter
        which can cause clients to miss tons of messages.

        If the queue does not exist, this method fails silently.

        :param queue_name: name of the queue to purge
        :param project: ID of the project to which the queue belongs
        """
        scope = utils.scope_queue_name(queue_name, project)
        collection = self._collection(queue_name, project)
        collection.remove({PROJ_QUEUE: scope}, w=0)

    def _list(self, queue_name, project=None, marker=None,
              echo=False, client_uuid=None, fields=None,
              include_claimed=False, sort=1, limit=None):
        """Message document listing helper.

        :param queue_name: Name of the queue to list
        :param project: (Default None) Project `queue_name` belongs to. If
            not specified, queries the "global" namespace/project.
        :param marker: (Default None) Message marker from which to start
            iterating. If not specified, starts with the first message
            available in the queue.
        :param echo: (Default False) Whether to return messages that match
            client_uuid
        :param client_uuid: (Default None) UUID for the client that
            originated this request
        :param fields: (Default None) Fields to include in emitted
            documents
        :param include_claimed: (Default False) Whether to include
            claimed messages, not just active ones
        :param sort: (Default 1) Sort order for the listing. Pass 1 for
            ascending (oldest message first), or -1 for descending (newest
            message first).
        :param limit: (Default None) The maximum number of messages
            to list. The results may include fewer messages than the
            requested `limit` if not enough are available. If limit is
            not specified, all matching messages are returned.

        :returns: Generator yielding up to `limit` messages.
        """

        if sort not in (1, -1):
            raise ValueError(u'sort must be either 1 (ascending) '
                             u'or -1 (descending)')

        now = timeutils.utcnow_ts()

        query = {
            # Messages must belong to this queue and project.
            PROJ_QUEUE: utils.scope_queue_name(queue_name, project),

            # NOTE(kgriffs): Messages must be finalized (i.e., must not
            # be part of an unfinalized transaction).
            #
            # See also the note wrt 'tx' within the definition
            # of ACTIVE_INDEX_FIELDS.
            'tx': None,
        }

        if not echo:
            query['u'] = {'$ne': client_uuid}

        if marker is not None:
            query['k'] = {'$gt': marker}

        collection = self._collection(queue_name, project)

        if not include_claimed:
            # Only include messages that are not part of
            # any claim, or are part of an expired claim.
            query['c.e'] = {'$lte': now}

        # Construct the request
        cursor = collection.find(query, fields=fields, sort=[('k', sort)])

        if limit is not None:
            cursor.limit(limit)

        # NOTE(flaper87): Suggest the index to use for this query to
        # ensure the most performant one is chosen.
        return cursor.hint(ACTIVE_INDEX_FIELDS)

    #-----------------------------------------------------------------------
    # "Friends" interface
    #-----------------------------------------------------------------------

    def _count(self, queue_name, project=None, include_claimed=False):
        """Return total number of messages in a queue.

        This method is designed to very quickly count the number
        of messages in a given queue. Expired messages are not
        counted, of course. If the queue does not exist, the
        count will always be 0.

        Note: Some expired messages may be included in the count if
            they haven't been GC'd yet. This is done for performance.
        """
        query = {
            # Messages must belong to this queue and project.
            PROJ_QUEUE: utils.scope_queue_name(queue_name, project),

            # NOTE(kgriffs): Messages must be finalized (i.e., must not
            # be part of an unfinalized transaction).
            #
            # See also the note wrt 'tx' within the definition
            # of ACTIVE_INDEX_FIELDS.
            'tx': None,
        }

        if not include_claimed:
            # Exclude messages that are claimed
            query['c.e'] = {'$lte': timeutils.utcnow_ts()}

        collection = self._collection(queue_name, project)
        return collection.find(query).hint(COUNTING_INDEX_FIELDS).count()

    def _active(self, queue_name, marker=None, echo=False,
                client_uuid=None, fields=None, project=None,
                limit=None):

        return self._list(queue_name, project=project, marker=marker,
                          echo=echo, client_uuid=client_uuid,
                          fields=fields, include_claimed=False,
                          limit=limit)

    def _claimed(self, queue_name, claim_id,
                 expires=None, limit=None, project=None):

        if claim_id is None:
            claim_id = {'$ne': None}

        query = {
            PROJ_QUEUE: utils.scope_queue_name(queue_name, project),
            'c.id': claim_id,
            'c.e': {'$gt': expires or timeutils.utcnow_ts()},
        }

        # NOTE(kgriffs): Claimed messages must be queried from
        # the primary to avoid a race condition caused by the
        # multi-phased "create claim" algorithm.
        preference = pymongo.read_preferences.ReadPreference.PRIMARY
        collection = self._collection(queue_name, project)
        msgs = collection.find(query, sort=[('k', 1)],
                               read_preference=preference).hint(
            CLAIMED_INDEX_FIELDS
        )

        if limit is not None:
            msgs = msgs.limit(limit)

        now = timeutils.utcnow_ts()

        def denormalizer(msg):
            doc = _basic_message(msg, now)
            doc['claim'] = msg['c']

            return doc

        return utils.HookedCursor(msgs, denormalizer)

    def _unclaim(self, queue_name, claim_id, project=None):
        cid = utils.to_oid(claim_id)

        # NOTE(cpp-cabrera): early abort - avoid a DB query if we're handling
        # an invalid ID
        if cid is None:
            return

        # NOTE(cpp-cabrera): unclaim by setting the claim ID to None
        # and the claim expiration time to now
        now = timeutils.utcnow_ts()
        scope = utils.scope_queue_name(queue_name, project)
        collection = self._collection(queue_name, project)

        collection.update({PROJ_QUEUE: scope, 'c.id': cid},
                          {'$set': {'c': {'id': None, 'e': now}}},
                          upsert=False, multi=True)

    #-----------------------------------------------------------------------
    # Public interface
    #-----------------------------------------------------------------------

    def list(self, queue_name, project=None, marker=None,
             limit=storage.DEFAULT_MESSAGES_PER_PAGE,
             echo=False, client_uuid=None, include_claimed=False):

        if marker is not None:
            try:
                marker = int(marker)
            except ValueError:
                yield iter([])

        messages = self._list(queue_name, project=project, marker=marker,
                              client_uuid=client_uuid, echo=echo,
                              include_claimed=include_claimed, limit=limit)

        marker_id = {}

        now = timeutils.utcnow_ts()

        # NOTE (kgriffs) @utils.raises_conn_error not needed on this
        # function, since utils.HookedCursor already has it.
        def denormalizer(msg):
            marker_id['next'] = msg['k']

            return _basic_message(msg, now)

        yield utils.HookedCursor(messages, denormalizer)
        yield str(marker_id['next'])

    @utils.raises_conn_error
    @utils.retries_on_autoreconnect
    def first(self, queue_name, project=None, sort=1):
        cursor = self._list(queue_name, project=project,
                            include_claimed=True, sort=sort,
                            limit=1)
        try:
            message = next(cursor)
        except StopIteration:
            raise errors.QueueIsEmpty(queue_name, project)

        return message

    @utils.raises_conn_error
    @utils.retries_on_autoreconnect
    def get(self, queue_name, message_id, project=None):
        mid = utils.to_oid(message_id)
        if mid is None:
            raise errors.MessageDoesNotExist(message_id, queue_name,
                                             project)

        now = timeutils.utcnow_ts()

        query = {
            '_id': mid,
            PROJ_QUEUE: utils.scope_queue_name(queue_name, project),
        }

        collection = self._collection(queue_name, project)
        message = list(collection.find(query).limit(1).hint(ID_INDEX_FIELDS))

        if not message:
            raise errors.MessageDoesNotExist(message_id, queue_name,
                                             project)

        return _basic_message(message[0], now)

    @utils.raises_conn_error
    @utils.retries_on_autoreconnect
    def bulk_get(self, queue_name, message_ids, project=None):
        message_ids = [mid for mid in map(utils.to_oid, message_ids) if mid]
        if not message_ids:
            return iter([])

        now = timeutils.utcnow_ts()

        # Base query, always check expire time
        query = {
            '_id': {'$in': message_ids},
            PROJ_QUEUE: utils.scope_queue_name(queue_name, project),
        }

        collection = self._collection(queue_name, project)

        # NOTE(flaper87): Should this query
        # be sorted?
        messages = collection.find(query).hint(ID_INDEX_FIELDS)

        def denormalizer(msg):
            return _basic_message(msg, now)

        return utils.HookedCursor(messages, denormalizer)

    @utils.raises_conn_error
    @utils.retries_on_autoreconnect
    def post(self, queue_name, messages, client_uuid, project=None):
        # NOTE(flaper87): This method should be safe to retry on
        # autoreconnect, since we have a 2-step insert for messages.
        # The worst-case scenario is that we'll increase the counter
        # several times and we'd end up with some non-active messages.

        if not self._queue_ctrl.exists(queue_name, project):
            raise errors.QueueDoesNotExist(queue_name, project)

        now = timeutils.utcnow_ts()
        now_dt = datetime.datetime.utcfromtimestamp(now)
        collection = self._collection(queue_name, project)

        # Set the next basis marker for the first attempt.
        next_marker = self._queue_ctrl._get_counter(queue_name, project)

        # Unique transaction ID to facilitate atomic batch inserts
        transaction = objectid.ObjectId()

        prepared_messages = [
            {
                PROJ_QUEUE: utils.scope_queue_name(queue_name, project),
                't': message['ttl'],
                'e': now_dt + datetime.timedelta(seconds=message['ttl']),
                'u': client_uuid,
                'c': {'id': None, 'e': now},
                'b': message['body'] if 'body' in message else {},
                'k': next_marker + index,
                'tx': transaction,
            }

            for index, message in enumerate(messages)
        ]

        # NOTE(kgriffs): Don't take the time to do a 2-phase insert
        # if there is no way for it to partially succeed.
        if len(prepared_messages) == 1:
            transaction = None
            prepared_messages[0]['tx'] = None

        # Use a retry range for sanity, although we expect
        # to rarely, if ever, reach the maximum number of
        # retries.
        #
        # NOTE(kgriffs): With the default configuration (100 ms
        # max sleep, 1000 max attempts), the max stall time
        # before the operation is abandoned is 49.95 seconds.
        for attempt in self._retry_range:
            try:
                ids = collection.insert(prepared_messages)

                # Log a message if we retried, for debugging perf issues
                if attempt != 0:
                    msgtmpl = _(u'%(attempts)d attempt(s) required to post '
                                u'%(num_messages)d messages to queue '
                                u'"%(queue)s" under project %(project)s')

                    LOG.debug(msgtmpl,
                              dict(queue=queue_name,
                                   attempts=attempt + 1,
                                   num_messages=len(ids),
                                   project=project))

                # Update the counter in preparation for the next batch
                #
                # NOTE(kgriffs): Due to the unique index on the messages
                # collection, competing inserts will fail as a whole,
                # and keep retrying until the counter is incremented
                # such that the competing markers will start at a
                # unique number, 1 past the max of the messages just
                # inserted above.
                self._queue_ctrl._inc_counter(queue_name, project,
                                              amount=len(ids))

                # NOTE(kgriffs): Finalize the insert once we can say that
                # all the messages made it. This makes bulk inserts
                # atomic, assuming queries filter out any non-finalized
                # messages.
                if transaction is not None:
                    collection.update({'tx': transaction},
                                      {'$set': {'tx': None}},
                                      upsert=False, multi=True)

                return map(str, ids)

            except pymongo.errors.DuplicateKeyError as ex:
                # TODO(kgriffs): Record stats of how often retries happen,
                # and how many attempts, on average, are required to insert
                # messages.

                # NOTE(kgriffs): This can be used in conjunction with the
                # log line, above, that is emitted after all messages have
                # been posted, to gauge how long it is taking for messages
                # to be posted to a given queue, or overall.
                #
                # TODO(kgriffs): Add transaction ID to help match up loglines
                if attempt == 0:
                    msgtmpl = _(u'First attempt failed while '
                                u'adding messages to queue '
                                u'"%(queue)s" under project %(project)s')

                    LOG.debug(msgtmpl, dict(queue=queue_name, project=project))

                # NOTE(kgriffs): Never retry past the point that competing
                # messages expire and are GC'd, since once they are gone,
                # the unique index no longer protects us from getting out
                # of order, which could cause an observer to miss this
                # message. The code below provides a sanity-check to ensure
                # this situation can not happen.
                elapsed = timeutils.utcnow_ts() - now
                if elapsed > MAX_RETRY_POST_DURATION:
                    msgtmpl = _(u'Exceeded maximum retry duration for queue '
                                u'"%(queue)s" under project %(project)s')

                    LOG.warning(msgtmpl,
                                dict(queue=queue_name, project=project))
                    break

                # Chill out for a moment to mitigate thrashing/thundering
                self._backoff_sleep(attempt)

                # NOTE(kgriffs): Perhaps we failed because a worker crashed
                # after inserting messages, but before incrementing the
                # counter; that would cause all future requests to stall,
                # since they would keep getting the same base marker that is
                # conflicting with existing messages, until the messages that
                # "won" expire, at which time we would end up reusing markers,
                # and that could make some messages invisible to an observer
                # that is querying with a marker that is larger than the ones
                # being reused.
                #
                # To mitigate this, we apply a heuristic to determine whether
                # a counter has stalled. We attempt to increment the counter,
                # but only if it hasn't been updated for a few seconds, which
                # should mean that nobody is left to update it!
                #
                # Note that we increment one at a time until the logjam is
                # broken, since we don't know how many messages were posted
                # by the worker before it crashed.
                next_marker = self._queue_ctrl._inc_counter(
                    queue_name, project, window=COUNTER_STALL_WINDOW)

                # Retry the entire batch with a new sequence of markers.
                #
                # NOTE(kgriffs): Due to the unique index, and how
                # MongoDB works with batch requests, we will never
                # end up with a partially-successful update. The first
                # document in the batch will fail to insert, and the
                # remainder of the documents will not be attempted.
                if next_marker is None:
                    # NOTE(kgriffs): Usually we will end up here, since
                    # it should be rare that a counter becomes stalled.
                    next_marker = self._queue_ctrl._get_counter(
                        queue_name, project)
                else:
                    msgtmpl = (u'Detected a stalled message counter for '
                               u'queue "%(queue)s" under project %(project)s. '
                               u'The counter was incremented to %(value)d.')

                    LOG.warning(msgtmpl,
                                dict(queue=queue_name,
                                     project=project,
                                     value=next_marker))

                for index, message in enumerate(prepared_messages):
                    message['k'] = next_marker + index

            except Exception as ex:
                LOG.exception(ex)
                raise

        msgtmpl = _(u'Hit maximum number of attempts (%(max)s) for queue '
                    u'"%(queue)s" under project %(project)s')

        LOG.warning(msgtmpl,
                    dict(max=self.driver.mongodb_conf.max_attempts,
                         queue=queue_name,
                         project=project))

        succeeded_ids = []
        raise errors.MessageConflict(queue_name, project,
                                     succeeded_ids)

    @utils.raises_conn_error
    @utils.retries_on_autoreconnect
    def delete(self, queue_name, message_id, project=None, claim=None):
        # NOTE(cpp-cabrera): return early - this is an invalid message
        # id so we won't be able to find it anyway
        mid = utils.to_oid(message_id)
        if mid is None:
            return

        collection = self._collection(queue_name, project)

        query = {
            '_id': mid,
            PROJ_QUEUE: utils.scope_queue_name(queue_name, project),
        }

        # NOTE(cpp-cabrera): return early - the user gave us an
        # invalid claim id and that renders the rest of this
        # request moot
        cid = utils.to_oid(claim)
        if cid is None:
            return

        now = timeutils.utcnow_ts()
        cursor = collection.find(query).hint(ID_INDEX_FIELDS)

        try:
            message = next(cursor)
        except StopIteration:
            return

        is_claimed = (message['c']['id'] is not None and
                      message['c']['e'] > now)

        if claim is None:
            if is_claimed:
                raise errors.MessageIsClaimed(message_id)

        else:
            if message['c']['id'] != cid:
                # NOTE(kgriffs): Read from primary in case the message
                # was just barely claimed, and claim hasn't made it to
                # the secondary.
                pref = pymongo.read_preferences.ReadPreference.PRIMARY
                message = collection.find_one(query, read_preference=pref)

                if message['c']['id'] != cid:
                    raise errors.MessageIsClaimedBy(message_id, claim)

        collection.remove(query['_id'], w=0)

    @utils.raises_conn_error
    @utils.retries_on_autoreconnect
    def bulk_delete(self, queue_name, message_ids, project=None):
        message_ids = [mid for mid in map(utils.to_oid, message_ids) if mid]
        query = {
            '_id': {'$in': message_ids},
            PROJ_QUEUE: utils.scope_queue_name(queue_name, project),
        }

        collection = self._collection(queue_name, project)
        collection.remove(query, w=0)


def _basic_message(msg, now):
    oid = msg['_id']
    age = now - utils.oid_ts(oid)

    return {
        'id': str(oid),
        'age': int(age),
        'ttl': msg['t'],
        'body': msg['b'],
    }