diff --git a/marconi/storage/mongodb/claims.py b/marconi/storage/mongodb/claims.py index 12643141d..cfded92ee 100644 --- a/marconi/storage/mongodb/claims.py +++ b/marconi/storage/mongodb/claims.py @@ -21,8 +21,6 @@ Field Mappings: letter of their long name. """ -import datetime - from bson import objectid from marconi.common import config @@ -64,7 +62,7 @@ class ClaimController(storage.ClaimBase): msg_ctrl = self.driver.message_controller # Base query, always check expire time - now = timeutils.utcnow() + now = timeutils.utcnow_ts() cid = utils.to_oid(claim_id) if cid is None: raise exceptions.ClaimDoesNotExist(queue, project, claim_id) @@ -87,8 +85,8 @@ class ClaimController(storage.ClaimBase): project=project)) claim = next(msgs) - update_time = claim['e'] - datetime.timedelta(seconds=claim['t']) - age = timeutils.delta_seconds(update_time, now) + update_time = claim['e'] - claim['t'] + age = update_time - now claim = { 'age': int(age), @@ -128,12 +126,10 @@ class ClaimController(storage.ClaimBase): grace = metadata['grace'] oid = objectid.ObjectId() - now = timeutils.utcnow() - ttl_delta = datetime.timedelta(seconds=ttl) - claim_expires = now + ttl_delta + now = timeutils.utcnow_ts() + claim_expires = now + ttl - grace_delta = datetime.timedelta(seconds=grace) - message_expires = claim_expires + grace_delta + message_expires = claim_expires + grace message_ttl = ttl + grace meta = { @@ -153,7 +149,7 @@ class ClaimController(storage.ClaimBase): if len(ids) == 0: return (None, messages) - now = timeutils.utcnow() + now = timeutils.utcnow_ts() # Set claim field for messages in ids updated = msg_ctrl._col.update({'_id': {'$in': ids}, @@ -196,11 +192,9 @@ class ClaimController(storage.ClaimBase): if cid is None: raise exceptions.ClaimDoesNotExist(claim_id, queue, project) - now = timeutils.utcnow() + now = timeutils.utcnow_ts() ttl = int(metadata.get('ttl', 60)) - ttl_delta = datetime.timedelta(seconds=ttl) - - expires = now + ttl_delta + expires = now + ttl msg_ctrl = self.driver.message_controller claimed = msg_ctrl.claimed(queue, cid, expires=now, diff --git a/marconi/storage/mongodb/messages.py b/marconi/storage/mongodb/messages.py index d8fcb044e..efb2ec99f 100644 --- a/marconi/storage/mongodb/messages.py +++ b/marconi/storage/mongodb/messages.py @@ -21,7 +21,6 @@ Field Mappings: letter of their long name. """ -import datetime import time import pymongo.errors @@ -208,7 +207,7 @@ class MessageController(storage.MessageBase): 'p': project, 'q': queue_name, 'k': {'$ne': head['k']}, - 'e': {'$lte': timeutils.utcnow()}, + 'e': {'$lte': timeutils.utcnow_ts()}, } self._col.remove(query, w=0) @@ -261,7 +260,7 @@ class MessageController(storage.MessageBase): raise ValueError(u'sort must be either 1 (ascending) ' u'or -1 (descending)') - now = timeutils.utcnow() + now = timeutils.utcnow_ts() query = { # Messages must belong to this @@ -312,12 +311,12 @@ class MessageController(storage.MessageBase): 'q': queue_name, # The messages can not be expired - 'e': {'$gt': timeutils.utcnow()}, + 'e': {'$gt': timeutils.utcnow_ts()}, } if not include_claimed: # Exclude messages that are claimed - query['c.e'] = {'$lte': timeutils.utcnow()} + query['c.e'] = {'$lte': timeutils.utcnow_ts()} return self._col.find(query).hint(COUNTING_INDEX_FIELDS).count() @@ -362,7 +361,7 @@ class MessageController(storage.MessageBase): 'p': project, 'q': queue_name, 'c.id': claim_id, - 'c.e': {'$gt': expires or timeutils.utcnow()}, + 'c.e': {'$gt': expires or timeutils.utcnow_ts()}, } # NOTE(kgriffs): Claimed messages bust be queried from @@ -375,7 +374,7 @@ class MessageController(storage.MessageBase): if limit is not None: msgs = msgs.limit(limit) - now = timeutils.utcnow() + now = timeutils.utcnow_ts() def denormalizer(msg): doc = _basic_message(msg, now) @@ -395,7 +394,7 @@ class MessageController(storage.MessageBase): # NOTE(cpp-cabrera): unclaim by setting the claim ID to None # and the claim expiration time to now - now = timeutils.utcnow() + now = timeutils.utcnow_ts() self._col.update({'p': project, 'q': queue_name, 'c.id': cid}, {'$set': {'c': {'id': None, 'e': now}}}, upsert=False, multi=True) @@ -443,7 +442,7 @@ class MessageController(storage.MessageBase): marker_id = {} - now = timeutils.utcnow() + now = timeutils.utcnow_ts() def denormalizer(msg): marker_id['next'] = msg['k'] @@ -464,7 +463,7 @@ class MessageController(storage.MessageBase): raise exceptions.MessageDoesNotExist(message_id, queue_name, project) - now = timeutils.utcnow() + now = timeutils.utcnow_ts() query = { '_id': mid, @@ -487,7 +486,7 @@ class MessageController(storage.MessageBase): if not message_ids: return iter([]) - now = timeutils.utcnow() + now = timeutils.utcnow_ts() # Base query, always check expire time query = { @@ -508,7 +507,7 @@ class MessageController(storage.MessageBase): @utils.raises_conn_error def post(self, queue_name, messages, client_uuid, project=None): - now = timeutils.utcnow() + now = timeutils.utcnow_ts() if not self._queue_controller.exists(queue_name, project): raise exceptions.QueueDoesNotExist(queue_name, project) @@ -521,7 +520,7 @@ class MessageController(storage.MessageBase): 't': message['ttl'], 'q': queue_name, 'p': project, - 'e': now + datetime.timedelta(seconds=message['ttl']), + 'e': now + message['ttl'], 'u': client_uuid, 'c': {'id': None, 'e': now}, 'b': message['body'] if 'body' in message else {}, @@ -647,7 +646,7 @@ class MessageController(storage.MessageBase): if cid is None: return - now = timeutils.utcnow() + now = timeutils.utcnow_ts() query['e'] = {'$gt': now} message = self._col.find_one(query) @@ -681,7 +680,7 @@ class MessageController(storage.MessageBase): def _basic_message(msg, now): oid = msg['_id'] - age = timeutils.delta_seconds(utils.oid_utc(oid), now) + age = utils.oid_ts(oid) - now return { 'id': str(oid), diff --git a/marconi/storage/mongodb/queues.py b/marconi/storage/mongodb/queues.py index f2c752265..5aee9b34b 100644 --- a/marconi/storage/mongodb/queues.py +++ b/marconi/storage/mongodb/queues.py @@ -164,7 +164,7 @@ class QueueController(storage.QueueBase): except exceptions.QueueIsEmpty: pass else: - now = timeutils.utcnow() + now = timeutils.utcnow_ts() message_stats['oldest'] = utils.stat_message(oldest, now) message_stats['newest'] = utils.stat_message(newest, now) diff --git a/marconi/storage/mongodb/utils.py b/marconi/storage/mongodb/utils.py index 5cf86594f..d9d43451c 100644 --- a/marconi/storage/mongodb/utils.py +++ b/marconi/storage/mongodb/utils.py @@ -14,12 +14,14 @@ # limitations under the License. import collections +import datetime import functools import random import re from bson import errors as berrors from bson import objectid +from bson import tz_util from pymongo import errors from marconi.common import exceptions @@ -30,6 +32,10 @@ from marconi.storage import exceptions as storage_exceptions DUP_MARKER_REGEX = re.compile(r'\$queue_marker.*?:\s(\d+)') +# BSON ObjectId gives TZ-aware datetime, so we generate a +# TZ-aware UNIX epoch for convenience. +EPOCH = datetime.datetime.utcfromtimestamp(0).replace(tzinfo=tz_util.utc) + LOG = logging.getLogger(__name__) @@ -118,7 +124,7 @@ def to_oid(obj): """Creates a new ObjectId based on the input. Returns None when TypeError or berrors.InvalidId - is raised by the ObjectID class. + is raised by the ObjectId class. :param obj: Anything that can be passed as an input to `objectid.ObjectId` @@ -129,12 +135,12 @@ def to_oid(obj): return None -def oid_utc(oid): - """Converts an ObjectId to a non-tz-aware datetime. +def oid_ts(oid): + """Converts an ObjectId to a UNIX timestamp. :raises: TypeError if oid isn't an ObjectId """ try: - return timeutils.normalize_time(oid.generation_time) + return timeutils.delta_seconds(EPOCH, oid.generation_time) except AttributeError: raise TypeError(u'Expected ObjectId and got %s' % type(oid)) @@ -142,13 +148,13 @@ def oid_utc(oid): def stat_message(message, now): """Creates a stat document from the given message, relative to now.""" oid = message['_id'] - created = oid_utc(oid) - age = timeutils.delta_seconds(created, now) + created = oid_ts(oid) + age = created - now return { 'id': str(oid), 'age': int(age), - 'created': timeutils.isotime(created), + 'created': timeutils.iso8601_from_timestamp(created), }