kgriffs 4c93505f8c fix: Checking whether queue exists adds latency
This patch uses oslo.cache to cache the call to
QueueController.exists for a few seconds. A decorator was
added to marconi.common to DRY up the caching logic shared
between QueueController and sharding.Catalog.

MessagePack is used to serialize the cached data, since it
is very fast at encoding and especially decoding, and it
produces very compact representations.

Closes-Bug: #1245573
Change-Id: Id4fd822fd907815ce8da2f03e51f888b6c63ad6b
2014-06-06 18:41:19 -05:00
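
The caching decorator itself lives in marconi.common and is not shown
in this file. The following is only a minimal sketch of the idea, using
msgpack for serialization; the cache object's get/set/unset signatures
and the internal structure here are assumptions for illustration, not
the actual marconi.common.decorators implementation:

    import functools

    import msgpack


    def caches(keygen, ttl, cond=None):
        """Cache the return value of the decorated method.

        :param keygen: maps the method's arguments to a cache key
        :param ttl: seconds to keep the cached value
        :param cond: only cache values for which cond(value) is truthy
        """

        def decorator(method):

            @functools.wraps(method)
            def wrapper(self, *args, **kwargs):
                key = keygen(*args, **kwargs)
                packed = self._cache.get(key)

                if packed is not None:
                    return msgpack.unpackb(packed)

                value = method(self, *args, **kwargs)
                if cond is None or cond(value):
                    self._cache.set(key, msgpack.packb(value), ttl)

                return value

            def purges(method_to_wrap):
                # Companion decorator: after the wrapped method runs
                # (e.g., delete), drop the cached entry so the change
                # becomes visible immediately.
                @functools.wraps(method_to_wrap)
                def purging_wrapper(self, *args, **kwargs):
                    result = method_to_wrap(self, *args, **kwargs)
                    self._cache.unset(keygen(*args, **kwargs))
                    return result

                return purging_wrapper

            # Expose the companion decorator so callers can write
            # @exists.purges, as this file does for delete().
            wrapper.purges = purges
            return wrapper

        return decorator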


# Copyright (c) 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Implements the MongoDB storage controller for queues.
Field Mappings:
In order to reduce the disk / memory space used,
field names will be, most of the time, the first
letter of their long name.
"""
import pymongo.errors
from marconi.common import decorators
from marconi.openstack.common.gettextutils import _
import marconi.openstack.common.log as logging
from marconi.openstack.common import timeutils
from marconi.queues import storage
from marconi.queues.storage import errors
from marconi.queues.storage.mongodb import utils
LOG = logging.getLogger(__name__)
# NOTE(kgriffs): E.g.: 'queuecontroller:exists:5083853/my-queue'
_QUEUE_CACHE_PREFIX = 'queuecontroller:'
# NOTE(kgriffs): This causes some race conditions, but they are
# harmless. If a queue was deleted but the cache still says it
# exists, some messages may get inserted without the client
# getting an error. In that case, those messages would simply
# be orphaned and would eventually expire according to their TTL.
#
# What this means for the client is that they have a bug; they
# deleted a queue and then immediately tried to post messages
# to it. If they keep trying to use the queue, they will
# eventually start getting an error, once the cache entry
# expires, which should clue them in on what happened.
#
# TODO(kgriffs): Make dynamic?
_QUEUE_CACHE_TTL = 5
def _queue_exists_key(queue, project=None):
# NOTE(kgriffs): Use string concatenation for performance,
# also put project first since it is guaranteed to be
# unique, which should reduce lookup time.
return _QUEUE_CACHE_PREFIX + 'exists:' + str(project) + '/' + queue
class QueueController(storage.Queue):
"""Implements queue resource operations using MongoDB.
Queues are scoped by project, which is prefixed to the
queue name.
    Queues:

        Name             Field
        -----------------------
        name         ->  p_q
        msg counter  ->  c
        metadata     ->  m

    Message Counter:

        Name             Field
        -----------------------
        value        ->  v
        modified ts  ->  t
"""
def __init__(self, *args, **kwargs):
super(QueueController, self).__init__(*args, **kwargs)
self._cache = self.driver.cache
self._collection = self.driver.queues_database.queues
        # NOTE(flaper87): This creates a unique index on the scoped
        # name, 'p_q'. Since the project is used as the name's
        # prefix, the index supports queries by project alone as
        # well as by project+name; this is also useful, for example,
        # for retrieving the list of queues for a specific project.
        # Order (project first) matters!
self._collection.ensure_index([('p_q', 1)], unique=True)
#-----------------------------------------------------------------------
# Helpers
#-----------------------------------------------------------------------
def _get(self, name, project=None, fields={'m': 1, '_id': 0}):
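        # NOTE: The mutable default value for `fields` is safe here,
        # since the dict is only read and never modified.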
queue = self._collection.find_one(_get_scoped_query(name, project),
fields=fields)
if queue is None:
raise errors.QueueDoesNotExist(name, project)
return queue
def _get_counter(self, name, project=None):
"""Retrieves the current message counter value for a given queue.
This helper is used to generate monotonic pagination
markers that are saved as part of the message
document.
Note 1: Markers are scoped per-queue and so are *not*
globally unique or globally ordered.
Note 2: If two or more requests to this method are made
in parallel, this method will return the same counter
value. This is done intentionally so that the caller
can detect a parallel message post, allowing it to
mitigate race conditions between producer and
observer clients.
:param name: Name of the queue to which the counter is scoped
:param project: Queue's project
:returns: current message counter as an integer
"""
doc = self._collection.find_one(_get_scoped_query(name, project),
fields={'c.v': 1, '_id': 0})
if doc is None:
raise errors.QueueDoesNotExist(name, project)
return doc['c']['v']
def _inc_counter(self, name, project=None, amount=1, window=None):
"""Increments the message counter and returns the new value.
:param name: Name of the queue to which the counter is scoped
:param project: Queue's project name
:param amount: (Default 1) Amount by which to increment the counter
:param window: (Default None) A time window, in seconds, that
must have elapsed since the counter was last updated, in
order to increment the counter.
:returns: Updated message counter value, or None if window
was specified, and the counter has already been updated
within the specified time period.
:raises: storage.errors.QueueDoesNotExist
"""
now = timeutils.utcnow_ts()
update = {'$inc': {'c.v': amount}, '$set': {'c.t': now}}
query = _get_scoped_query(name, project)
if window is not None:
threshold = now - window
query['c.t'] = {'$lt': threshold}
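        # NOTE: AutoReconnect is raised by pymongo while a replica
        # set is failing over; keep retrying the update until a
        # primary is available again. There is no explicit backoff
        # or retry limit here.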
while True:
try:
doc = self._collection.find_and_modify(
query, update, new=True, fields={'c.v': 1, '_id': 0})
break
except pymongo.errors.AutoReconnect as ex:
LOG.exception(ex)
if doc is None:
if window is None:
# NOTE(kgriffs): Since we did not filter by a time window,
# the queue should have been found and updated. Perhaps
# the queue has been deleted?
message = _(u'Failed to increment the message '
u'counter for queue %(name)s and '
u'project %(project)s')
message %= dict(name=name, project=project)
LOG.warning(message)
raise errors.QueueDoesNotExist(name, project)
# NOTE(kgriffs): Assume the queue existed, but the counter
# was recently updated, causing the range query on 'c.t' to
# exclude the record.
return None
return doc['c']['v']
#-----------------------------------------------------------------------
# Interface
#-----------------------------------------------------------------------
def list(self, project=None, marker=None,
limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False):
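        """List queues, scoped by project.

        This is a two-step generator: it first yields an iterator
        over the requested page of queue records, and then yields
        the marker for the next page (the name of the last queue
        returned). The marker is only meaningful after the first
        yielded iterator has been fully consumed, since it is set
        by the normalizer as records stream by.
        """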
query = utils.scoped_query(marker, project)
fields = {'p_q': 1, '_id': 0}
if detailed:
fields['m'] = 1
cursor = self._collection.find(query, fields=fields)
cursor = cursor.limit(limit).sort('p_q')
marker_name = {}
def normalizer(record):
queue = {'name': utils.descope_queue_name(record['p_q'])}
marker_name['next'] = queue['name']
if detailed:
queue['metadata'] = record['m']
return queue
yield utils.HookedCursor(cursor, normalizer)
yield marker_name and marker_name['next']
@utils.raises_conn_error
@utils.retries_on_autoreconnect
def get_metadata(self, name, project=None):
queue = self._get(name, project)
return queue.get('m', {})
@utils.raises_conn_error
# @utils.retries_on_autoreconnect
def create(self, name, project=None):
        # NOTE(flaper87): If the connection fails after the insert
        # was issued and we retry it, we could end up returning
        # `False` because of the resulting `DuplicateKeyError`, even
        # though the queue was in fact created by this API call.
        #
        # TODO(kgriffs): `retries_on_autoreconnect` is commented out
        # for now due to the above issue; queue creation is less
        # critical to make highly available than the other
        # operations.
try:
# NOTE(kgriffs): Start counting at 1, and assume the first
# message ever posted will succeed and set t to a UNIX
# "modified at" timestamp.
counter = {'v': 1, 't': 0}
scoped_name = utils.scope_queue_name(name, project)
self._collection.insert({'p_q': scoped_name, 'm': {},
'c': counter})
except pymongo.errors.DuplicateKeyError:
return False
else:
return True
# NOTE(kgriffs): Only cache when it exists; if it doesn't exist, and
# someone creates it, we want it to be immediately visible.
@utils.raises_conn_error
@utils.retries_on_autoreconnect
@decorators.caches(_queue_exists_key, _QUEUE_CACHE_TTL, lambda v: v)
def exists(self, name, project=None):
query = _get_scoped_query(name, project)
return self._collection.find_one(query) is not None
@utils.raises_conn_error
@utils.retries_on_autoreconnect
def set_metadata(self, name, metadata, project=None):
rst = self._collection.update(_get_scoped_query(name, project),
{'$set': {'m': metadata}},
multi=False,
manipulate=False)
if not rst['updatedExisting']:
raise errors.QueueDoesNotExist(name, project)
@utils.raises_conn_error
@utils.retries_on_autoreconnect
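    # NOTE: The `caches` decorator attaches a `purges` attribute to
    # the decorated `exists` method; applying it here invalidates
    # the cached existence entry as part of deleting the queue.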
@exists.purges
def delete(self, name, project=None):
self.driver.message_controller._purge_queue(name, project)
self._collection.remove(_get_scoped_query(name, project))
@utils.raises_conn_error
@utils.retries_on_autoreconnect
def stats(self, name, project=None):
if not self.exists(name, project=project):
raise errors.QueueDoesNotExist(name, project)
controller = self.driver.message_controller
active = controller._count(name, project=project,
include_claimed=False)
total = controller._count(name, project=project,
include_claimed=True)
message_stats = {
'claimed': total - active,
'free': active,
'total': total,
}
try:
oldest = controller.first(name, project=project, sort=1)
newest = controller.first(name, project=project, sort=-1)
except errors.QueueIsEmpty:
pass
else:
now = timeutils.utcnow_ts()
message_stats['oldest'] = utils.stat_message(oldest, now)
message_stats['newest'] = utils.stat_message(newest, now)
return {'messages': message_stats}
def _get_scoped_query(name, project):
return {'p_q': utils.scope_queue_name(name, project)}
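
For reference, a rough usage sketch of the cached lookup (hypothetical:
assumes an already-configured mongodb driver instance named `driver`):

    controller = QueueController(driver)
    controller.create('fizbit', project='5083853')

    # The first call hits MongoDB; for the next _QUEUE_CACHE_TTL
    # seconds, repeat calls are answered from the cache.
    assert controller.exists('fizbit', project='5083853')

    # delete() purges the cached entry via @exists.purges, so the
    # deletion is visible immediately (modulo the race described
    # in the NOTE near _QUEUE_CACHE_TTL).
    controller.delete('fizbit', project='5083853')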