187 lines
		
	
	
		
			5.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			187 lines
		
	
	
		
			5.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # vim: tabstop=4 shiftwidth=4 softtabstop=4
 | |
| 
 | |
| # Copyright 2011 OpenStack Foundation.
 | |
| # All Rights Reserved.
 | |
| #
 | |
| #    Licensed under the Apache License, Version 2.0 (the "License"); you may
 | |
| #    not use this file except in compliance with the License. You may obtain
 | |
| #    a copy of the License at
 | |
| #
 | |
| #         http://www.apache.org/licenses/LICENSE-2.0
 | |
| #
 | |
| #    Unless required by applicable law or agreed to in writing, software
 | |
| #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 | |
| #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 | |
| #    License for the specific language governing permissions and limitations
 | |
| #    under the License.
 | |
| 
 | |
| """
 | |
| Time related utilities and helper functions.
 | |
| """
 | |
| 
 | |
| import calendar
 | |
| import datetime
 | |
| 
 | |
| import iso8601
 | |
| 
 | |
| 
 | |
# ISO 8601 extended time format with microseconds
_ISO8601_TIME_FORMAT_SUBSECOND = '%Y-%m-%dT%H:%M:%S.%f'
# ISO 8601 extended time format with whole-second resolution
_ISO8601_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S'
# Default format of strtime() and parse_strtime(); keeps the microseconds
PERFECT_TIME_FORMAT = _ISO8601_TIME_FORMAT_SUBSECOND
 | |
| 
 | |
| 
 | |
def isotime(at=None, subsecond=False):
    """Render a time as an ISO 8601 string.

    :param at: the datetime to render; utcnow() is used when it is falsy
    :param subsecond: when true, include the microseconds field
    :returns: the formatted time followed by 'Z' when the zone name is
        'UTC' (or when ``at`` carries no tzinfo), otherwise followed by
        the zone name reported by ``at.tzinfo``
    """
    if not at:
        at = utcnow()

    if subsecond:
        layout = _ISO8601_TIME_FORMAT_SUBSECOND
    else:
        layout = _ISO8601_TIME_FORMAT
    stamp = at.strftime(layout)

    # Times without tzinfo are treated as UTC.
    if at.tzinfo:
        zone = at.tzinfo.tzname(None)
    else:
        zone = 'UTC'

    if zone == 'UTC':
        return stamp + 'Z'
    return stamp + zone
 | |
| 
 | |
| 
 | |
def parse_isotime(timestr):
    """Parse time from ISO 8601 format.

    :param timestr: the value handed to ``iso8601.parse_date``
    :returns: the result of ``iso8601.parse_date(timestr)``
    :raises ValueError: if ``iso8601.parse_date`` raises
        ``iso8601.ParseError`` or ``TypeError``; the message of the
        original exception is carried over
    """
    try:
        return iso8601.parse_date(timestr)
    except (iso8601.ParseError, TypeError) as e:
        # BaseException.message was deprecated by PEP 352 and does not
        # exist on Python 3; str(e) gives the message on every version.
        raise ValueError(str(e))
 | |
| 
 | |
| 
 | |
def strtime(at=None, fmt=PERFECT_TIME_FORMAT):
    """Format a time as a string.

    :param at: the datetime to format; utcnow() is used when it is falsy
    :param fmt: strftime format string, PERFECT_TIME_FORMAT by default
    :returns: ``at`` rendered with ``fmt``
    """
    moment = at or utcnow()
    return moment.strftime(fmt)
 | |
| 
 | |
| 
 | |
def parse_strtime(timestr, fmt=PERFECT_TIME_FORMAT):
    """Parse a formatted time string into a datetime.

    :param timestr: the string to parse
    :param fmt: strptime format string, PERFECT_TIME_FORMAT by default
    :returns: the datetime produced by ``datetime.datetime.strptime``
    """
    parsed = datetime.datetime.strptime(timestr, fmt)
    return parsed
 | |
| 
 | |
| 
 | |
def normalize_time(timestamp):
    """Convert a time in an arbitrary timezone to a naive UTC datetime.

    :param timestamp: the datetime to normalize
    :returns: ``timestamp`` itself when its utcoffset() is None,
        otherwise a datetime without tzinfo with the UTC offset
        subtracted
    """
    utc_offset = timestamp.utcoffset()
    if utc_offset is not None:
        naive = timestamp.replace(tzinfo=None)
        return naive - utc_offset
    # No offset available: nothing to shift.
    return timestamp
 | |
| 
 | |
| 
 | |
def is_older_than(before, seconds):
    """Return True if before is older than seconds.

    :param before: a datetime, or a string that parse_strtime() can read
        with its default format; a parsed string has its tzinfo dropped
    :param seconds: the age threshold, in seconds
    :returns: True if ``utcnow() - before`` is greater than ``seconds``
        seconds
    """
    # (str, type(u'')) is (str, unicode) on Python 2 -- the same types
    # basestring matched -- and plain str on Python 3, where the
    # basestring builtin no longer exists and would raise NameError.
    if isinstance(before, (str, type(u''))):
        before = parse_strtime(before).replace(tzinfo=None)
    return utcnow() - before > datetime.timedelta(seconds=seconds)
 | |
| 
 | |
| 
 | |
def is_newer_than(after, seconds):
    """Return True if after is newer than seconds.

    :param after: a datetime, or a string that parse_strtime() can read
        with its default format; a parsed string has its tzinfo dropped
    :param seconds: the threshold, in seconds
    :returns: True if ``after - utcnow()`` is greater than ``seconds``
        seconds
    """
    # (str, type(u'')) is (str, unicode) on Python 2 -- the same types
    # basestring matched -- and plain str on Python 3, where the
    # basestring builtin no longer exists and would raise NameError.
    if isinstance(after, (str, type(u''))):
        after = parse_strtime(after).replace(tzinfo=None)
    return after - utcnow() > datetime.timedelta(seconds=seconds)
 | |
| 
 | |
| 
 | |
def utcnow_ts():
    """Return utcnow() as an integer timestamp.

    The struct_time of utcnow() is converted with ``calendar.timegm``.
    """
    now_tuple = utcnow().timetuple()
    return calendar.timegm(now_tuple)
 | |
| 
 | |
| 
 | |
def utcnow():
    """Return the current UTC time, honouring any installed override.

    When ``utcnow.override_time`` is truthy it is used instead of the
    real clock: a value with a ``pop`` method yields its first element
    (removing it), any other value is returned as is. Otherwise the
    result of ``datetime.datetime.utcnow()`` is returned.
    """
    override = utcnow.override_time
    if not override:
        return datetime.datetime.utcnow()
    try:
        # List of times: hand them out one at a time, front first.
        return override.pop(0)
    except AttributeError:
        # Single constant time.
        return override
 | |
| 
 | |
| 
 | |
def iso8601_from_timestamp(timestamp):
    """Return an ISO 8601 formatted date for a timestamp.

    :param timestamp: value accepted by
        ``datetime.datetime.utcfromtimestamp``
    :returns: the result of isotime() for the corresponding UTC datetime
    """
    as_datetime = datetime.datetime.utcfromtimestamp(timestamp)
    return isotime(as_datetime)
 | |
| 
 | |
| 
 | |
# No override installed initially: utcnow() falls through to the real
# datetime.datetime.utcnow() while this attribute is falsy.
utcnow.override_time = None
 | |
| 
 | |
| 
 | |
def set_time_override(override_time=None):
    """
    Override utils.utcnow to return a constant time or a list thereof,
    one at a time.

    :param override_time: a datetime, or a list of datetimes that
        utcnow() hands out front to back. When None (the default), the
        UTC time at the moment of this call is used. Use
        clear_time_override() to remove an override.
    """
    # The default cannot live in the signature: default values are
    # evaluated once, when the function is defined, which would freeze
    # the "current" time at module import rather than at call time.
    if override_time is None:
        override_time = datetime.datetime.utcnow()
    utcnow.override_time = override_time
 | |
| 
 | |
| 
 | |
def advance_time_delta(timedelta):
    """Advance overridden time using a datetime.timedelta.

    :param timedelta: the datetime.timedelta to add to the override
    :raises AssertionError: if no time override is currently set

    If the override is a list of times, every remaining entry is
    advanced; otherwise the single overridden time is advanced.
    """
    override = utcnow.override_time
    assert override is not None
    try:
        advanced = [dt + timedelta for dt in override]
    except TypeError:
        # Not iterable: the override is a single constant time.
        utcnow.override_time = override + timedelta
    else:
        # Datetimes are immutable, so "dt += timedelta" inside a loop only
        # rebinds the loop variable. Write the advanced values back into
        # the same list object that utcnow() pops from.
        override[:] = advanced
 | |
| 
 | |
| 
 | |
def advance_time_seconds(seconds):
    """Advance the overridden time by a number of seconds.

    :param seconds: how many seconds to add; converted to a
        datetime.timedelta and passed to advance_time_delta()
    """
    step = datetime.timedelta(seconds=seconds)
    advance_time_delta(step)
 | |
| 
 | |
| 
 | |
def clear_time_override():
    """Remove the overridden time.

    Resets ``utcnow.override_time`` to None.
    """
    utcnow.override_time = None
 | |
| 
 | |
| 
 | |
def marshall_now(now=None):
    """Make an rpc-safe datetime with microseconds.

    Note: tzinfo is stripped, but not required for relative times.

    :param now: the datetime to marshall; utcnow() is used when it is
        falsy
    :returns: a dict with the keys day, month, year, hour, minute,
        second and microsecond taken from the datetime
    """
    moment = now or utcnow()
    fields = ('day', 'month', 'year', 'hour', 'minute', 'second',
              'microsecond')
    return dict((name, getattr(moment, name)) for name in fields)
 | |
| 
 | |
| 
 | |
def unmarshall_time(tyme):
    """Rebuild a datetime from a marshalled dict.

    :param tyme: mapping with the keys day, month, year, hour, minute,
        second and microsecond
    :returns: the corresponding datetime.datetime
    """
    fields = ('day', 'month', 'year', 'hour', 'minute', 'second',
              'microsecond')
    parts = dict((name, tyme[name]) for name in fields)
    return datetime.datetime(**parts)
 | |
| 
 | |
| 
 | |
def delta_seconds(before, after):
    """
    Return ``after - before`` in seconds, as a float with microsecond
    resolution.

    :param before: the earlier value
    :param after: the later value; ``after - before`` must produce a
        timedelta-like object
    """
    span = after - before
    try:
        return span.total_seconds()
    except AttributeError:
        # total_seconds() is unavailable on this object; compute it from
        # the individual timedelta fields instead.
        whole = span.days * 86400 + span.seconds
        return whole + span.microseconds / 1e6
 | |
| 
 | |
| 
 | |
def is_soon(dt, window):
    """
    Tell whether a time falls within the next ``window`` seconds.

    :params dt: the time
    :params window: minimum seconds to remain to consider the time not soon

    :return: True if the normalized ``dt`` is at or before
             ``utcnow() + window`` seconds
    """
    now = utcnow()
    cutoff = now + datetime.timedelta(seconds=window)
    normalized = normalize_time(dt)
    return normalized <= cutoff
 | 
