# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""
Time related utilities and helper functions.
"""

import calendar
import datetime
import functools
import logging
import time

import iso8601
try:
    import zoneinfo
except ImportError:
    # zoneinfo is available in Python >= 3.9
    import pytz
    zoneinfo = None

from oslo_utils import reflection

# ISO 8601 extended time format with microseconds
_ISO8601_TIME_FORMAT_SUBSECOND = '%Y-%m-%dT%H:%M:%S.%f'
_ISO8601_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S'
PERFECT_TIME_FORMAT = _ISO8601_TIME_FORMAT_SUBSECOND

# Largest value accepted by datetime.datetime for the ``second`` field; used
# to clamp leap seconds (60) when unmarshalling.
_MAX_DATETIME_SEC = 59

# Monotonic clock used by StopWatch (never goes backwards on clock changes).
now = time.monotonic


def parse_isotime(timestr):
    """Parse time from ISO 8601 format.

    :param timestr: ISO 8601 formatted string
    :returns: the value returned by ``iso8601.parse_date``
    :raises ValueError: if ``iso8601.parse_date`` raises
                        ``iso8601.ParseError`` or ``TypeError``
    """
    try:
        return iso8601.parse_date(timestr)
    except (iso8601.ParseError, TypeError) as e:
        # Chain the original error so the parser traceback is not lost.
        raise ValueError(str(e)) from e


def parse_strtime(timestr, fmt=PERFECT_TIME_FORMAT):
    """Turn a formatted time back into a datetime.

    :param timestr: the formatted time string
    :param fmt: ``strptime`` format, defaults to ISO 8601 with microseconds
    :returns: the ``datetime.datetime`` produced by ``strptime``
    """
    return datetime.datetime.strptime(timestr, fmt)


def normalize_time(timestamp):
    """Normalize time in arbitrary timezone to UTC naive object.

    :param timestamp: a datetime, naive or timezone-aware
    :returns: ``timestamp`` unchanged when it has no UTC offset, otherwise
              a naive datetime shifted by that offset
    """
    offset = timestamp.utcoffset()
    if offset is None:
        return timestamp
    return timestamp.replace(tzinfo=None) - offset


def is_older_than(before, seconds):
    """Return True if before is older than seconds.

    :param before: datetime, or a string parsed with :func:`parse_isotime`
    :param seconds: age threshold in seconds

    .. versionchanged:: 1.7
       Accept datetime string with timezone information.
       Fix comparison with timezone aware datetime.
    """
    if isinstance(before, str):
        before = parse_isotime(before)

    before = normalize_time(before)

    return utcnow() - before > datetime.timedelta(seconds=seconds)


def is_newer_than(after, seconds):
    """Return True if after is newer than seconds.

    :param after: datetime, or a string parsed with :func:`parse_isotime`
    :param seconds: how far (in seconds) past the current time ``after``
                    must be for this to return True

    .. versionchanged:: 1.7
       Accept datetime string with timezone information.
       Fix comparison with timezone aware datetime.
    """
    if isinstance(after, str):
        after = parse_isotime(after)

    after = normalize_time(after)

    return after - utcnow() > datetime.timedelta(seconds=seconds)


def utcnow_ts(microsecond=False):
    """Timestamp version of our utcnow function.

    See :py:class:`oslo_utils.fixture.TimeFixture`.

    :param microsecond: when true return a float including the sub-second
                        part, otherwise an integer number of seconds

    .. versionchanged:: 1.3
       Added optional *microsecond* parameter.
    """
    if utcnow.override_time is None:
        # NOTE(kgriffs): This is several times faster
        # than going through calendar.timegm(...)
        timestamp = time.time()
        if not microsecond:
            timestamp = int(timestamp)
        return timestamp

    # Named ``current`` so the module-level ``now`` clock is not shadowed.
    current = utcnow()
    timestamp = calendar.timegm(current.timetuple())
    if microsecond:
        timestamp += float(current.microsecond) / 1000000

    return timestamp


def utcnow(with_timezone=False):
    """Overridable version of utils.utcnow that can return a TZ-aware datetime.

    See :py:class:`oslo_utils.fixture.TimeFixture`.

    When an override is set it is returned as-is (*with_timezone* is not
    applied to it); a list override is consumed one element per call.

    :param with_timezone: when true (and no override is active) return a
                          timezone-aware UTC datetime instead of a naive one

    .. versionchanged:: 1.6
       Added *with_timezone* parameter.
    """
    if utcnow.override_time:
        try:
            return utcnow.override_time.pop(0)
        except AttributeError:
            # Not a list: a single constant datetime override.
            return utcnow.override_time
    if with_timezone:
        return datetime.datetime.now(tz=iso8601.iso8601.UTC)
    return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)


utcnow.override_time = None


def set_time_override(override_time=None):
    """Overrides utils.utcnow.

    Make it return a constant time or a list thereof, one at a time.

    See :py:class:`oslo_utils.fixture.TimeFixture`.

    :param override_time: datetime instance or list thereof. If not
                          given, defaults to the current UTC time.
    """
    utcnow.override_time = (
        override_time or
        datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None))


def advance_time_delta(timedelta):
    """Advance overridden time using a datetime.timedelta.

    See :py:class:`oslo_utils.fixture.TimeFixture`.

    A single datetime override is replaced by the advanced value; for a
    sequence override every remaining element is advanced.

    :param timedelta: the ``datetime.timedelta`` to add to the override(s)
    """
    assert utcnow.override_time is not None  # nosec
    override = utcnow.override_time
    try:
        # datetime objects are immutable, so ``dt += timedelta`` inside a
        # loop would only rebind the loop variable; build the new values.
        advanced = [dt + timedelta for dt in override]
    except TypeError:
        # Not iterable: the override is a single datetime.
        utcnow.override_time += timedelta
        return
    if isinstance(override, list):
        # Update in place so the list object utcnow() pops from is kept.
        override[:] = advanced
    else:
        utcnow.override_time = advanced


def advance_time_seconds(seconds):
    """Advance overridden time by seconds.

    See :py:class:`oslo_utils.fixture.TimeFixture`.

    :param seconds: number of seconds to advance by
    """
    advance_time_delta(datetime.timedelta(0, seconds))


def clear_time_override():
    """Remove the overridden time.

    See :py:class:`oslo_utils.fixture.TimeFixture`.
    """
    utcnow.override_time = None


def marshall_now(now=None):
    """Make an rpc-safe datetime with microseconds.

    :param now: datetime to marshall; when falsy :func:`utcnow` is used
    :returns: dict with ``day``, ``month``, ``year``, ``hour``, ``minute``,
              ``second`` and ``microsecond`` keys, plus ``tzname`` when the
              datetime carries timezone information

    .. versionchanged:: 1.6
       Timezone information is now serialized instead of being stripped.
    """
    if not now:
        now = utcnow()
    d = dict(day=now.day, month=now.month, year=now.year, hour=now.hour,
             minute=now.minute, second=now.second,
             microsecond=now.microsecond)
    if now.tzinfo:
        # Need to handle either iso8601 or python UTC format
        tzname = now.tzinfo.tzname(None)
        d['tzname'] = 'UTC' if tzname == 'UTC+00:00' else tzname
    return d


def unmarshall_time(tyme):
    """Unmarshall a datetime dict.

    :param tyme: dict as produced by :func:`marshall_now`
    :returns: a datetime, timezone-aware when ``tyme`` has a ``tzname``

    .. versionchanged:: 1.5
       Drop leap second.

    .. versionchanged:: 1.6
       Added support for timezone information.
    """

    # NOTE(ihrachys): datetime does not support leap seconds,
    # so the best thing we can do for now is dropping them
    # http://bugs.python.org/issue23574
    second = min(tyme['second'], _MAX_DATETIME_SEC)
    dt = datetime.datetime(day=tyme['day'],
                           month=tyme['month'],
                           year=tyme['year'],
                           hour=tyme['hour'],
                           minute=tyme['minute'],
                           second=second,
                           microsecond=tyme['microsecond'])
    tzname = tyme.get('tzname')
    if tzname:
        # Need to handle either iso8601 or python UTC format
        tzname = 'UTC' if tzname == 'UTC+00:00' else tzname

        if zoneinfo:
            tzinfo = zoneinfo.ZoneInfo(tzname)
            dt = dt.replace(tzinfo=tzinfo)
        else:
            # pytz fallback (zoneinfo could not be imported).
            tzinfo = pytz.timezone(tzname)
            dt = tzinfo.localize(dt)

    return dt


def delta_seconds(before, after):
    """Return the difference between two timing objects.

    Compute the difference in seconds between two date, time,
    or datetime objects (as a float, to microsecond resolution).
    """
    delta = after - before
    return delta.total_seconds()


def is_soon(dt, window):
    """Determines if time is going to happen in the next window seconds.

    :param dt: the time
    :param window: minimum seconds to remain to consider the time not soon

    :return: True if expiration is within the given duration
    """
    soon = (utcnow() + datetime.timedelta(seconds=window))
    return normalize_time(dt) <= soon


class Split(object):
    """A *immutable* stopwatch split.

    See: http://en.wikipedia.org/wiki/Stopwatch for what this is/represents.

    .. versionadded:: 1.4
    """

    __slots__ = ['_elapsed', '_length']

    def __init__(self, elapsed, length):
        self._elapsed = elapsed
        self._length = length

    @property
    def elapsed(self):
        """Duration from stopwatch start."""
        return self._elapsed

    @property
    def length(self):
        """Seconds from last split (or the elapsed time if no prior split)."""
        return self._length

    def __repr__(self):
        r = reflection.get_class_name(self, fully_qualified=False)
        r += "(elapsed=%s, length=%s)" % (self._elapsed, self._length)
        return r


def time_it(logger, log_level=logging.DEBUG,
            message="It took %(seconds).02f seconds to"
                    " run function '%(func_name)s'",
            enabled=True, min_duration=0.01):
    """Decorator that will log how long its decorated function takes to run.

    This does **not** output a log if the decorated function fails
    with an exception.

    :param logger: logger instance to use when logging elapsed time
    :param log_level: logger logging level to use when logging elapsed time
    :param message: customized message to use when logging elapsed time,
                    the message may use automatically provided values
                    ``%(seconds)`` and ``%(func_name)`` if it finds those
                    values useful to record
    :param enabled: whether to enable or disable this decorator (useful to
                    decorate a function with this decorator, and then easily
                    be able to switch that decoration off by some config or
                    other value)
    :param min_duration: argument that determines if logging is triggered
                         or not, it is by default set to 0.01 seconds to avoid
                         logging when durations and/or elapsed function call
                         times are less than 0.01 seconds, to disable
                         any ``min_duration`` checks this value should be set
                         to less than or equal to zero or set to none
    """

    def decorator(func):
        if not enabled:
            # Disabled: hand back the function untouched (no wrapper cost).
            return func

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            with StopWatch() as w:
                result = func(*args, **kwargs)
            # Only reached when ``func`` did not raise.
            time_taken = w.elapsed()
            if min_duration is None or time_taken >= min_duration:
                logger.log(log_level, message,
                           {'seconds': time_taken,
                            'func_name': reflection.get_callable_name(func)})
            return result

        return wrapper

    return decorator


class StopWatch(object):
    """A simple timer/stopwatch helper class.

    Inspired by: apache-commons-lang java stopwatch.

    Not thread-safe (when a single watch is mutated by multiple threads at
    the same time). Thread-safe when used by a single thread (not shared) or
    when operations are performed in a thread-safe manner on these objects by
    wrapping those operations with locks.

    .. versionadded:: 1.4
    """
    _STARTED = 'STARTED'
    _STOPPED = 'STOPPED'

    def __init__(self, duration=None):
        """Create a watch that has not been started yet.

        :param duration: optional number of seconds after which the watch
                         is considered expired
        :raises ValueError: if ``duration`` is negative
        """
        if duration is not None and duration < 0:
            raise ValueError("Duration must be greater or equal to"
                             " zero and not %s" % duration)
        self._duration = duration
        self._started_at = None
        self._stopped_at = None
        self._state = None
        self._splits = ()

    def start(self):
        """Starts the watch (if not already started).

        NOTE(harlowja): resets any splits previously captured (if any).
        """
        if self._state == self._STARTED:
            return self
        self._started_at = now()
        self._stopped_at = None
        self._state = self._STARTED
        self._splits = ()
        return self

    @property
    def splits(self):
        """Accessor to all/any splits that have been captured."""
        return self._splits

    def split(self):
        """Captures a split/elapsed since start time (and doesn't stop).

        :returns: the newly captured :class:`Split`
        :raises RuntimeError: if the watch is not in the started state
        """
        if self._state == self._STARTED:
            elapsed = self.elapsed()
            if self._splits:
                length = self._delta_seconds(self._splits[-1].elapsed, elapsed)
            else:
                length = elapsed
            self._splits = self._splits + (Split(elapsed, length),)
            return self._splits[-1]
        else:
            raise RuntimeError("Can not create a split time of a stopwatch"
                               " if it has not been started or if it has been"
                               " stopped")

    def restart(self):
        """Restarts the watch from a started/stopped state."""
        if self._state == self._STARTED:
            self.stop()
        self.start()
        return self

    @staticmethod
    def _delta_seconds(earlier, later):
        # Uses max to avoid the delta/time going backwards (and thus negative).
        return max(0.0, later - earlier)

    def elapsed(self, maximum=None):
        """Returns how many seconds have elapsed.

        :param maximum: optional cap applied to the returned value
        :raises RuntimeError: if the watch was never started
        """
        if self._state not in (self._STARTED, self._STOPPED):
            raise RuntimeError("Can not get the elapsed time of a stopwatch"
                               " if it has not been started/stopped")
        if self._state == self._STOPPED:
            elapsed = self._delta_seconds(self._started_at, self._stopped_at)
        else:
            elapsed = self._delta_seconds(self._started_at, now())
        if maximum is not None and elapsed > maximum:
            elapsed = max(0.0, maximum)
        return elapsed

    def __enter__(self):
        """Starts the watch."""
        self.start()
        return self

    def __exit__(self, type, value, traceback):
        """Stops the watch (ignoring errors if stop fails)."""
        try:
            self.stop()
        except RuntimeError:  # nosec: errors are meant to be ignored
            pass

    def leftover(self, return_none=False):
        """Returns how many seconds are left until the watch expires.

        :param return_none: when ``True`` instead of raising a ``RuntimeError``
                            when no duration has been set this call will
                            return ``None`` instead.
        :type return_none: boolean
        :raises RuntimeError: if the watch is not in the started state, or
                              if it has no duration and ``return_none`` is
                              false
        """
        if self._state != self._STARTED:
            raise RuntimeError("Can not get the leftover time of a stopwatch"
                               " that has not been started")
        if self._duration is None:
            if not return_none:
                raise RuntimeError("Can not get the leftover time of a watch"
                                   " that has no duration")
            return None
        return max(0.0, self._duration - self.elapsed())

    def expired(self):
        """Returns if the watch has expired (ie, duration provided elapsed).

        :raises RuntimeError: if the watch was never started
        """
        if self._state not in (self._STARTED, self._STOPPED):
            raise RuntimeError("Can not check if a stopwatch has expired"
                               " if it has not been started/stopped")
        if self._duration is None:
            return False
        return self.elapsed() > self._duration

    def has_started(self):
        """Returns True if the watch is in a started state."""
        return self._state == self._STARTED

    def has_stopped(self):
        """Returns True if the watch is in a stopped state."""
        return self._state == self._STOPPED

    def resume(self):
        """Resumes the watch from a stopped state.

        NOTE: only the state is flipped back to started; the start time is
        left untouched, so time spent while stopped is included in
        subsequent :meth:`elapsed` results.

        :raises RuntimeError: if the watch is not in the stopped state
        """
        if self._state == self._STOPPED:
            self._state = self._STARTED
            return self
        else:
            raise RuntimeError("Can not resume a stopwatch that has not been"
                               " stopped")

    def stop(self):
        """Stops the watch.

        :raises RuntimeError: if the watch was never started
        """
        if self._state == self._STOPPED:
            return self
        if self._state != self._STARTED:
            raise RuntimeError("Can not stop a stopwatch that has not been"
                               " started")
        self._stopped_at = now()
        self._state = self._STOPPED
        return self