Replace deprecated functions in datetime

The datetime.utcfromtimestamp() and datetime.utcnow()
are deprecated in Python 3.12.
Replace datetime.utcfromtimestamp() with datetime.fromtimestamp().
Replace datetime.utcnow() with oslo_utils.timeutils.utcnow() or
datetime.now().

This removes glance's in-tree 'delta_seconds' and 'utcnow'
implementation and replaces the delta_seconds implementation
with oslo_utils.timeutils.delta_seconds().

Change-Id: I509594dd29a8e50ad94b3c53e8f55037b3548add
Signed-off-by: Takashi Natsume <takanattie@gmail.com>
This commit is contained in:
Takashi Natsume 2024-10-05 17:21:32 +09:00
parent acab9351a1
commit 1f8a84cdcc
16 changed files with 107 additions and 132 deletions

View File

@ -126,14 +126,16 @@ def list_cached(args):
if last_accessed == 0:
last_accessed = "N/A"
else:
last_accessed = datetime.datetime.utcfromtimestamp(
last_accessed).isoformat()
last_accessed = datetime.datetime.fromtimestamp(
last_accessed, tz=datetime.timezone.utc).replace(
tzinfo=None).isoformat()
pretty_table.add_row((
image['image_id'],
last_accessed,
datetime.datetime.utcfromtimestamp(
image['last_modified']).isoformat(),
image['last_modified'], tz=datetime.timezone.utc).replace(
tzinfo=None).isoformat(),
image['size'],
image['hits']))

View File

@ -21,6 +21,7 @@ import datetime
import iso8601
from oslo_utils import encodeutils
from oslo_utils import timeutils
# ISO 8601 extended time format with microseconds
_ISO8601_TIME_FORMAT_SUBSECOND = '%Y-%m-%dT%H:%M:%S.%f'
@ -31,7 +32,7 @@ PERFECT_TIME_FORMAT = _ISO8601_TIME_FORMAT_SUBSECOND
def isotime(at=None, subsecond=False):
"""Stringify time in ISO 8601 format."""
if not at:
at = utcnow()
at = timeutils.utcnow()
st = at.strftime(_ISO8601_TIME_FORMAT
if not subsecond
else _ISO8601_TIME_FORMAT_SUBSECOND)
@ -51,19 +52,6 @@ def parse_isotime(timestr):
raise ValueError(encodeutils.exception_to_unicode(e))
def utcnow(with_timezone=False):
"""Overridable version of utils.utcnow that can return a TZ-aware datetime.
"""
if utcnow.override_time:
try:
return utcnow.override_time.pop(0)
except AttributeError:
return utcnow.override_time
if with_timezone:
return datetime.datetime.now(tz=iso8601.iso8601.UTC)
return datetime.datetime.utcnow()
def normalize_time(timestamp):
"""Normalize time in arbitrary timezone to UTC naive object."""
offset = timestamp.utcoffset()
@ -74,17 +62,5 @@ def normalize_time(timestamp):
def iso8601_from_timestamp(timestamp, microsecond=False):
"""Returns an iso8601 formatted date from timestamp."""
return isotime(datetime.datetime.utcfromtimestamp(timestamp), microsecond)
utcnow.override_time = None
def delta_seconds(before, after):
"""Return the difference between two timing objects.
Compute the difference in seconds between two date, time, or
datetime objects (as a float, to microsecond resolution).
"""
delta = after - before
return datetime.timedelta.total_seconds(delta)
return isotime(datetime.datetime.fromtimestamp(
timestamp, tz=datetime.timezone.utc).replace(tzinfo=None), microsecond)

View File

@ -20,6 +20,7 @@ import uuid
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import timeutils as oslo_timeutils
from glance.common import exception
from glance.common import timeutils
@ -110,7 +111,7 @@ def _get_session():
@utils.no_4byte_params
def _image_location_format(image_id, value, meta_data, status, deleted=False):
dt = timeutils.utcnow()
dt = oslo_timeutils.utcnow()
return {
'id': str(uuid.uuid4()),
'image_id': image_id,
@ -136,7 +137,7 @@ def _image_property_format(image_id, name, value):
def _image_member_format(image_id, tenant_id, can_share, status='pending',
deleted=False):
dt = timeutils.utcnow()
dt = oslo_timeutils.utcnow()
return {
'id': str(uuid.uuid4()),
'image_id': image_id,
@ -169,7 +170,7 @@ def _format_task_from_db(task_ref, task_info_ref):
def _task_format(task_id, **values):
dt = timeutils.utcnow()
dt = oslo_timeutils.utcnow()
task = {
'id': task_id,
'type': 'import',
@ -215,7 +216,7 @@ def _image_update(image, values, properties):
def _image_format(image_id, **values):
dt = timeutils.utcnow()
dt = oslo_timeutils.utcnow()
image = {
'id': image_id,
'name': None,
@ -558,7 +559,7 @@ def image_property_delete(context, prop_ref, image_ref):
prop = p
if not prop:
raise exception.NotFound()
prop['deleted_at'] = timeutils.utcnow()
prop['deleted_at'] = oslo_timeutils.utcnow()
prop['deleted'] = True
return prop
@ -622,7 +623,7 @@ def image_member_update(context, member_id, values):
for member in DATA['members']:
if member['id'] == member_id:
member.update(values)
member['updated_at'] = timeutils.utcnow()
member['updated_at'] = oslo_timeutils.utcnow()
return copy.deepcopy(member)
else:
raise exception.NotFound()
@ -662,7 +663,7 @@ def image_location_update(context, image_id, location):
raise exception.Invalid(msg)
deleted = location['status'] in ('deleted', 'pending_delete')
updated_time = timeutils.utcnow()
updated_time = oslo_timeutils.utcnow()
delete_time = updated_time if deleted else None
updated = False
@ -696,7 +697,7 @@ def image_location_delete(context, image_id, location_id, status,
for loc in DATA['locations']:
if loc['id'] == location_id and loc['image_id'] == image_id:
deleted = True
delete_time = delete_time or timeutils.utcnow()
delete_time = delete_time or oslo_timeutils.utcnow()
loc.update({"deleted": deleted,
"status": status,
"updated_at": delete_time,
@ -827,7 +828,7 @@ def image_update(context, image_id, image_values, purge_props=False,
# this matches weirdness in the sqlalchemy api
prop['deleted'] = True
image['updated_at'] = timeutils.utcnow()
image['updated_at'] = oslo_timeutils.utcnow()
_image_update(image, image_values,
{k: v for k, v in new_properties.items()
if k not in atomic_props})
@ -843,7 +844,7 @@ def image_update(context, image_id, image_values, purge_props=False,
def image_destroy(context, image_id):
global DATA
try:
delete_time = timeutils.utcnow()
delete_time = oslo_timeutils.utcnow()
DATA['images'][image_id]['deleted'] = True
DATA['images'][image_id]['deleted_at'] = delete_time
@ -976,7 +977,7 @@ def task_update(context, task_id, values):
raise exception.TaskNotFound(task_id=task_id)
task.update(task_values)
task['updated_at'] = timeutils.utcnow()
task['updated_at'] = oslo_timeutils.utcnow()
DATA['tasks'][task_id] = task
task_info = _task_info_update(task['id'], task_info_values)
@ -1017,8 +1018,8 @@ def task_delete(context, task_id):
global DATA
try:
DATA['tasks'][task_id]['deleted'] = True
DATA['tasks'][task_id]['deleted_at'] = timeutils.utcnow()
DATA['tasks'][task_id]['updated_at'] = timeutils.utcnow()
DATA['tasks'][task_id]['deleted_at'] = oslo_timeutils.utcnow()
DATA['tasks'][task_id]['updated_at'] = oslo_timeutils.utcnow()
return copy.deepcopy(DATA['tasks'][task_id])
except KeyError:
LOG.debug("No task found with ID %s", task_id)
@ -1028,7 +1029,7 @@ def task_delete(context, task_id):
def _task_soft_delete(context):
"""Scrub task entities which are expired """
global DATA
now = timeutils.utcnow()
now = oslo_timeutils.utcnow()
tasks = DATA['tasks'].values()
for task in tasks:
@ -1036,7 +1037,7 @@ def _task_soft_delete(context):
and task['expires_at'] <= now):
task['deleted'] = True
task['deleted_at'] = timeutils.utcnow()
task['deleted_at'] = oslo_timeutils.utcnow()
@log_call
@ -1246,7 +1247,7 @@ def metadef_namespace_update(context, namespace_id, values):
DATA['metadef_namespaces'].remove(namespace)
namespace.update(namespace_values)
namespace['updated_at'] = timeutils.utcnow()
namespace['updated_at'] = oslo_timeutils.utcnow()
DATA['metadef_namespaces'].append(namespace)
return namespace
@ -1483,7 +1484,7 @@ def metadef_object_update(context, namespace_name, object_id, values):
DATA['metadef_objects'].remove(object)
object.update(values)
object['updated_at'] = timeutils.utcnow()
object['updated_at'] = oslo_timeutils.utcnow()
DATA['metadef_objects'].append(object)
return object
@ -1611,7 +1612,7 @@ def metadef_property_update(context, namespace_name, property_id, values):
DATA['metadef_properties'].remove(property)
property.update(values)
property['updated_at'] = timeutils.utcnow()
property['updated_at'] = oslo_timeutils.utcnow()
DATA['metadef_properties'].append(property)
return property
@ -1988,7 +1989,7 @@ def metadef_tag_update(context, namespace_name, id, values):
DATA['metadef_tags'].remove(tag)
tag.update(values)
tag['updated_at'] = timeutils.utcnow()
tag['updated_at'] = oslo_timeutils.utcnow()
DATA['metadef_tags'].append(tag)
return tag
@ -2031,8 +2032,8 @@ def _format_association(namespace, resource_type, association_values):
'resource_type': resource_type['id'],
'properties_target': None,
'prefix': None,
'created_at': timeutils.utcnow(),
'updated_at': timeutils.utcnow()
'created_at': oslo_timeutils.utcnow(),
'updated_at': oslo_timeutils.utcnow()
}
association.update(association_values)
@ -2040,7 +2041,7 @@ def _format_association(namespace, resource_type, association_values):
def _format_resource_type(values):
dt = timeutils.utcnow()
dt = oslo_timeutils.utcnow()
resource_type = {
'id': _get_metadef_id(),
'name': values['name'],
@ -2064,7 +2065,7 @@ def _format_property(values):
def _format_namespace(values):
dt = timeutils.utcnow()
dt = oslo_timeutils.utcnow()
namespace = {
'id': _get_metadef_id(),
'namespace': None,
@ -2081,7 +2082,7 @@ def _format_namespace(values):
def _format_object(values):
dt = timeutils.utcnow()
dt = oslo_timeutils.utcnow()
object = {
'id': _get_metadef_id(),
'namespace_id': None,
@ -2097,7 +2098,7 @@ def _format_object(values):
def _format_tag(values):
dt = timeutils.utcnow()
dt = oslo_timeutils.utcnow()
tag = {
'id': _get_metadef_id(),
'namespace_id': None,
@ -2261,8 +2262,8 @@ def insert_cache_details(context, node_reference_url, image_id,
last_modified=None, hits=None):
global DATA
node_reference = node_reference_get_by_url(context, node_reference_url)
accessed = last_accessed or timeutils.utcnow()
modified = last_modified or timeutils.utcnow()
accessed = last_accessed or oslo_timeutils.utcnow()
modified = last_modified or oslo_timeutils.utcnow()
values = {
'last_accessed': accessed,
@ -2287,7 +2288,7 @@ def update_hit_count(context, image_id, node_reference_url):
last_hit_count = get_hit_count(context, image_id, node_reference_url)
node_reference = node_reference_get_by_url(context, node_reference_url)
all_images = DATA['cached_images']
last_accessed = timeutils.utcnow()
last_accessed = oslo_timeutils.utcnow()
values = {
'hits': last_hit_count + 1,
'last_accessed': last_accessed

View File

@ -30,6 +30,7 @@ from oslo_db import exception as db_exception
from oslo_db.sqlalchemy import session as oslo_db_session
from oslo_log import log as logging
from oslo_utils import excutils
from oslo_utils import timeutils as oslo_timeutils
import osprofiler.sqlalchemy
from retrying import retry
import sqlalchemy
@ -936,7 +937,7 @@ def image_set_property_atomic(image_id, name, value):
try:
connection.execute(table.insert(),
dict(deleted=False,
created_at=timeutils.utcnow(),
created_at=oslo_timeutils.utcnow(),
image_id=image_id,
name=name,
value=value))
@ -1035,7 +1036,7 @@ def _image_update(context, session, image_id, values, purge_props=False,
_drop_protected_attrs(models.Image, values)
# NOTE(iccha-sethi): updated_at must be explicitly set in case
# only ImageProperty table was modifited
values['updated_at'] = timeutils.utcnow()
values['updated_at'] = oslo_timeutils.utcnow()
if image_id:
query = session.query(models.Image).filter_by(id=image_id)
@ -1096,7 +1097,7 @@ def image_location_add(context, image_id, location):
@utils.no_4byte_params
def _image_location_add(context, session, image_id, location):
deleted = location['status'] in ('deleted', 'pending_delete')
delete_time = timeutils.utcnow() if deleted else None
delete_time = oslo_timeutils.utcnow() if deleted else None
location_ref = models.ImageLocation(image_id=image_id,
value=location['url'],
meta_data=location['metadata'],
@ -1124,7 +1125,7 @@ def _image_location_update(context, session, image_id, location):
id=loc_id).filter_by(image_id=image_id).one()
deleted = location['status'] in ('deleted', 'pending_delete')
updated_time = timeutils.utcnow()
updated_time = oslo_timeutils.utcnow()
delete_time = updated_time if deleted else None
location_ref.update({"value": location['url'],
@ -1162,7 +1163,7 @@ def _image_location_delete(
location_ref = session.query(models.ImageLocation).filter_by(
id=location_id).filter_by(image_id=image_id).one()
delete_time = delete_time or timeutils.utcnow()
delete_time = delete_time or oslo_timeutils.utcnow()
location_ref.update({"deleted": True,
"status": status,
@ -1280,7 +1281,7 @@ def _image_child_entry_delete_all(
query = session.query(child_model_cls).filter_by(
image_id=image_id).filter_by(deleted=False)
delete_time = delete_time or timeutils.utcnow()
delete_time = delete_time or oslo_timeutils.utcnow()
count = query.update({"deleted": True, "deleted_at": delete_time})
return count
@ -1592,7 +1593,8 @@ def purge_deleted_rows(context, age_in_days, max_rows):
session = get_session()
metadata = MetaData()
engine = get_engine()
deleted_age = timeutils.utcnow() - datetime.timedelta(days=age_in_days)
deleted_age = oslo_timeutils.utcnow() - datetime.timedelta(
days=age_in_days)
tables = []
for model_class in models.__dict__.values():
@ -1691,7 +1693,8 @@ def purge_deleted_rows_from_images(context, age_in_days, max_rows):
session = get_session()
metadata = MetaData()
engine = get_engine()
deleted_age = timeutils.utcnow() - datetime.timedelta(days=age_in_days)
deleted_age = oslo_timeutils.utcnow() - datetime.timedelta(
days=age_in_days)
tbl = 'images'
tab = Table(tbl, metadata, autoload_with=engine)
@ -1836,7 +1839,7 @@ def task_update(context, task_id, values):
task_ref = _task_get(context, session, task_id)
_drop_protected_attrs(models.Task, values)
values['updated_at'] = timeutils.utcnow()
values['updated_at'] = oslo_timeutils.utcnow()
_task_update(context, session, task_ref, values)
@ -1877,10 +1880,10 @@ def _tasks_get_by_image(context, session, image_id):
expires_at = models.Task.expires_at
query = query.filter(sa_sql.or_(expires_at == None,
expires_at >= timeutils.utcnow()))
expires_at >= oslo_timeutils.utcnow()))
updated_at = models.Task.updated_at
query.filter(
updated_at <= (timeutils.utcnow() +
updated_at <= (oslo_timeutils.utcnow() +
datetime.timedelta(hours=CONF.task.task_time_to_live)))
if not context.can_see_deleted:
@ -1919,8 +1922,8 @@ def _task_soft_delete(context, session):
query = (query.filter(models.Task.owner == context.owner)
.filter_by(deleted=False)
.filter(expires_at <= timeutils.utcnow()))
values = {'deleted': True, 'deleted_at': timeutils.utcnow()}
.filter(expires_at <= oslo_timeutils.utcnow()))
values = {'deleted': True, 'deleted_at': oslo_timeutils.utcnow()}
query.update(values)
@ -2537,8 +2540,8 @@ def insert_cache_details(context, node_reference_url, image_id,
last_modified=None, hits=None):
node_reference = node_reference_get_by_url(context, node_reference_url)
session = get_session()
accessed = last_accessed or timeutils.utcnow()
modified = last_modified or timeutils.utcnow()
accessed = last_accessed or oslo_timeutils.utcnow()
modified = last_modified or oslo_timeutils.utcnow()
values = {
'image_id': image_id,
@ -2564,7 +2567,7 @@ def insert_cache_details(context, node_reference_url, image_id,
@utils.no_4byte_params
def update_hit_count(context, image_id, node_reference_url):
session = get_session()
last_accessed = timeutils.utcnow()
last_accessed = oslo_timeutils.utcnow()
with session.begin():
node_id = session.query(models.NodeReference.node_reference_id).filter(

View File

@ -26,12 +26,12 @@ import re
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import encodeutils
from oslo_utils import timeutils
import sqlalchemy
from sqlalchemy import and_
from sqlalchemy.schema import MetaData
from sqlalchemy.sql import select
from glance.common import timeutils
from glance.i18n import _, _LE, _LI, _LW
LOG = logging.getLogger(__name__)

View File

@ -22,6 +22,7 @@ import uuid
from oslo_db.sqlalchemy import models
from oslo_serialization import jsonutils
from oslo_utils import timeutils
from sqlalchemy import BigInteger
from sqlalchemy import Boolean
from sqlalchemy import Column
@ -38,8 +39,6 @@ from sqlalchemy import Text
from sqlalchemy.types import TypeDecorator
from sqlalchemy import UniqueConstraint
from glance.common import timeutils
BASE = declarative_base()

View File

@ -17,6 +17,7 @@ SQLAlchemy models for glance metadata schema
"""
from oslo_db.sqlalchemy import models
from oslo_utils import timeutils
from sqlalchemy import Boolean
from sqlalchemy import Column
from sqlalchemy import DateTime
@ -29,7 +30,6 @@ from sqlalchemy import String
from sqlalchemy import Text
from sqlalchemy import UniqueConstraint
from glance.common import timeutils
from glance.db.sqlalchemy.models import JSONEncodedDict

View File

@ -22,9 +22,9 @@ from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
from oslo_utils import importutils
from oslo_utils import timeutils
from glance.common import exception
from glance.common import timeutils
from glance.i18n import _, _LE, _LI, _LW
LOG = logging.getLogger(__name__)

View File

@ -106,10 +106,12 @@ class Migrate:
"db.", r['image_id'])
# NOTE(abhishekk): Converting dates to be compatible with
# centralized db
last_accessed = datetime.datetime.utcfromtimestamp(
r['last_accessed']).isoformat()
last_modified = datetime.datetime.utcfromtimestamp(
r['last_modified']).isoformat()
last_accessed = datetime.datetime.fromtimestamp(
r['last_accessed'], tz=datetime.timezone.utc).replace(
tzinfo=None).isoformat()
last_modified = datetime.datetime.fromtimestamp(
r['last_modified'], tz=datetime.timezone.utc).replace(
tzinfo=None).isoformat()
# insert into centralized database
self.db_api.insert_cache_details(
self.context, self.node_reference, r['image_id'],

View File

@ -23,6 +23,7 @@ import uuid
from oslo_db import exception as db_exception
from oslo_db.sqlalchemy import utils as sqlalchemyutils
from oslo_utils import timeutils as oslo_timeutils
from sqlalchemy import sql
from glance.common import exception
@ -44,7 +45,7 @@ UUID1, UUID2, UUID3 = sorted([str(uuid.uuid4()) for x in range(3)])
def build_image_fixture(**kwargs):
default_datetime = timeutils.utcnow()
default_datetime = oslo_timeutils.utcnow()
image = {
'id': str(uuid.uuid4()),
'name': 'fake image #2',
@ -71,7 +72,7 @@ def build_image_fixture(**kwargs):
def build_task_fixture(**kwargs):
default_datetime = timeutils.utcnow()
default_datetime = oslo_timeutils.utcnow()
task = {
'id': str(uuid.uuid4()),
'type': 'import',
@ -109,7 +110,7 @@ class TestDriver(test_utils.BaseTestCase):
self.create_images(self.fixtures)
def build_image_fixtures(self):
dt1 = timeutils.utcnow()
dt1 = oslo_timeutils.utcnow()
dt2 = dt1 + datetime.timedelta(microseconds=5)
fixtures = [
{
@ -148,10 +149,11 @@ class DriverTests(object):
fixture = {'name': 'mark', 'size': 12, 'status': 'queued'}
self.db_api.image_create(self.context, fixture)
@mock.patch.object(timeutils, 'utcnow')
@mock.patch.object(oslo_timeutils, 'utcnow')
def test_image_create_defaults(self, mock_utcnow):
mock_utcnow.return_value = datetime.datetime.utcnow()
create_time = timeutils.utcnow()
mock_utcnow.return_value = datetime.datetime.now(
datetime.timezone.utc).replace(tzinfo=None)
create_time = oslo_timeutils.utcnow()
values = {'status': 'queued',
'created_at': create_time,
'updated_at': create_time}
@ -923,7 +925,7 @@ class DriverTests(object):
def test_image_paginate(self):
"""Paginate through a list of images using limit and marker"""
now = timeutils.utcnow()
now = oslo_timeutils.utcnow()
extra_uuids = [(str(uuid.uuid4()),
now + datetime.timedelta(seconds=i * 5))
for i in range(2)]
@ -1242,9 +1244,10 @@ class DriverTests(object):
self.assertRaises(exception.NotFound, self.db_api.image_tag_delete,
self.context, UUID1, 'snap')
@mock.patch.object(timeutils, 'utcnow')
@mock.patch.object(oslo_timeutils, 'utcnow')
def test_image_member_create(self, mock_utcnow):
mock_utcnow.return_value = datetime.datetime.utcnow()
mock_utcnow.return_value = datetime.datetime.now(
datetime.timezone.utc).replace(tzinfo=None)
memberships = self.db_api.image_member_find(self.context)
self.assertEqual([], memberships)
@ -1473,7 +1476,7 @@ class DriverQuotaTests(test_utils.BaseTestCase):
auth_token='%s:%s:user' % (self.owner_id1, self.owner_id1))
self.db_api = db_tests.get_db(self.config)
db_tests.reset_db(self.db_api)
dt1 = timeutils.utcnow()
dt1 = oslo_timeutils.utcnow()
dt2 = dt1 + datetime.timedelta(microseconds=5)
fixtures = [
{
@ -1524,7 +1527,7 @@ class DriverQuotaTests(test_utils.BaseTestCase):
self.assertEqual(total, x)
def test_storage_quota_multiple_locations(self):
dt1 = timeutils.utcnow()
dt1 = oslo_timeutils.utcnow()
sz = 53
new_fixture_dict = {'id': str(uuid.uuid4()), 'created_at': dt1,
'updated_at': dt1, 'size': sz,
@ -1546,7 +1549,7 @@ class DriverQuotaTests(test_utils.BaseTestCase):
# NOTE(flaper87): This needs to be tested for
# soft deleted images as well. Currently there's no
# good way to delete locations.
dt1 = timeutils.utcnow()
dt1 = oslo_timeutils.utcnow()
sz = 53
image_id = str(uuid.uuid4())
new_fixture_dict = {'id': image_id, 'created_at': dt1,
@ -1671,7 +1674,7 @@ class TaskTests(test_utils.BaseTestCase):
self.assertEqual(0, len(tasks))
def test_task_get_all_owned(self):
then = timeutils.utcnow() + datetime.timedelta(days=365)
then = oslo_timeutils.utcnow() + datetime.timedelta(days=365)
TENANT1 = str(uuid.uuid4())
ctxt1 = context.RequestContext(is_admin=False,
tenant=TENANT1,
@ -1699,7 +1702,7 @@ class TaskTests(test_utils.BaseTestCase):
self.assertEqual(sorted(expected), sorted(task_owners))
def test_task_get(self):
expires_at = timeutils.utcnow()
expires_at = oslo_timeutils.utcnow()
image_id = str(uuid.uuid4())
fixture = {
'owner': self.context.owner,
@ -1731,7 +1734,7 @@ class TaskTests(test_utils.BaseTestCase):
def _test_task_get_by_image(self, expired=False, deleted=False,
other_owner=False):
expires_at = timeutils.utcnow()
expires_at = oslo_timeutils.utcnow()
if expired is False:
expires_at += datetime.timedelta(hours=1)
elif expired is None:
@ -1801,7 +1804,7 @@ class TaskTests(test_utils.BaseTestCase):
self.assertEqual(0, len(tasks))
def test_task_get_all(self):
now = timeutils.utcnow()
now = oslo_timeutils.utcnow()
then = now + datetime.timedelta(days=365)
image_id = str(uuid.uuid4())
fixture1 = {
@ -1858,7 +1861,7 @@ class TaskTests(test_utils.BaseTestCase):
self.assertNotIn(key, task)
def test_task_soft_delete(self):
now = timeutils.utcnow()
now = oslo_timeutils.utcnow()
then = now + datetime.timedelta(days=365)
fixture1 = build_task_fixture(id='1', expires_at=now,
@ -2028,7 +2031,7 @@ class DBPurgeTests(test_utils.BaseTestCase):
self.create_images(self.image_fixtures)
def build_fixtures(self):
dt1 = timeutils.utcnow() - datetime.timedelta(days=5)
dt1 = oslo_timeutils.utcnow() - datetime.timedelta(days=5)
dt2 = dt1 + datetime.timedelta(days=1)
dt3 = dt2 + datetime.timedelta(days=1)
fixtures = [
@ -2041,7 +2044,7 @@ class DBPurgeTests(test_utils.BaseTestCase):
{
'created_at': dt1,
'updated_at': dt2,
'deleted_at': timeutils.utcnow(),
'deleted_at': oslo_timeutils.utcnow(),
'deleted': True,
},
{
@ -2116,7 +2119,7 @@ class DBPurgeTests(test_utils.BaseTestCase):
# Add a 4th row in images table and set it deleted 15 days ago
uuidstr = uuid.uuid4().hex
created_time = timeutils.utcnow() - datetime.timedelta(days=20)
created_time = oslo_timeutils.utcnow() - datetime.timedelta(days=20)
deleted_time = created_time + datetime.timedelta(days=5)
images_row_fixture = {
'id': uuidstr,

View File

@ -17,9 +17,9 @@ import http.client
import eventlet
from oslo_serialization import jsonutils as json
from oslo_utils import timeutils
from glance.api.v2 import tasks
from glance.common import timeutils
from glance.tests.integration.v2 import base
TENANT1 = '6838eb7b-6ded-434a-882c-b344c77fe8df'

View File

@ -18,6 +18,7 @@ import datetime
from unittest import mock
import iso8601
from oslo_utils import timeutils as oslo_timeutils
from glance.common import timeutils
from glance.tests import utils as test_utils
@ -39,14 +40,14 @@ class TimeUtilsTest(test_utils.BaseTestCase):
6, 14, 0)
def test_isotime(self):
with mock.patch('datetime.datetime') as datetime_mock:
datetime_mock.utcnow.return_value = self.skynet_self_aware_time
with mock.patch('oslo_utils.timeutils.utcnow') as utcnow_mock:
utcnow_mock.return_value = self.skynet_self_aware_time
dt = timeutils.isotime()
self.assertEqual(dt, self.skynet_self_aware_time_str)
def test_isotimei_micro_second_precision(self):
with mock.patch('datetime.datetime') as datetime_mock:
datetime_mock.utcnow.return_value = self.skynet_self_aware_ms_time
with mock.patch('oslo_utils.timeutils.utcnow') as utcnow_mock:
utcnow_mock.return_value = self.skynet_self_aware_ms_time
dt = timeutils.isotime(subsecond=True)
self.assertEqual(dt, self.skynet_self_aware_time_ms_str)
@ -62,23 +63,8 @@ class TimeUtilsTest(test_utils.BaseTestCase):
tzinfo=iso8601.iso8601.UTC)
self.assertEqual(skynet_self_aware_time_ms_utc, expect)
def test_utcnow(self):
with mock.patch('datetime.datetime') as datetime_mock:
datetime_mock.utcnow.return_value = self.skynet_self_aware_time
self.assertEqual(timeutils.utcnow(), self.skynet_self_aware_time)
self.assertFalse(timeutils.utcnow() == self.skynet_self_aware_time)
self.assertTrue(timeutils.utcnow())
def test_delta_seconds(self):
before = timeutils.utcnow()
after = before + datetime.timedelta(days=7, seconds=59,
microseconds=123456)
self.assertAlmostEqual(604859.123456,
timeutils.delta_seconds(before, after))
def test_iso8601_from_timestamp(self):
utcnow = timeutils.utcnow()
utcnow = oslo_timeutils.utcnow()
iso = timeutils.isotime(utcnow)
ts = calendar.timegm(utcnow.timetuple())
self.assertEqual(iso, timeutils.iso8601_from_timestamp(ts))

View File

@ -20,11 +20,11 @@ import uuid
from oslo_config import cfg
import oslo_utils.importutils
from oslo_utils import timeutils
import glance.async_
from glance.async_ import taskflow_executor
from glance.common import exception
from glance.common import timeutils
from glance import domain
import glance.tests.utils as test_utils
@ -456,7 +456,8 @@ class TestTask(test_utils.BaseTestCase):
@mock.patch.object(timeutils, 'utcnow')
def test_succeed(self, mock_utcnow):
mock_utcnow.return_value = datetime.datetime.utcnow()
mock_utcnow.return_value = datetime.datetime.now(
datetime.timezone.utc).replace(tzinfo=None)
self.task.begin_processing()
self.task.succeed('{"location": "file://home"}')
self.assertEqual('success', self.task.status)
@ -471,7 +472,8 @@ class TestTask(test_utils.BaseTestCase):
@mock.patch.object(timeutils, 'utcnow')
def test_fail(self, mock_utcnow):
mock_utcnow.return_value = datetime.datetime.utcnow()
mock_utcnow.return_value = datetime.datetime.now(
datetime.timezone.utc).replace(tzinfo=None)
self.task.begin_processing()
self.task.fail('{"message": "connection failed"}')
self.assertEqual('failure', self.task.status)

View File

@ -674,9 +674,10 @@ class TestTaskNotifications(utils.BaseTestCase):
self.context,
self.notifier
)
self.patcher = mock.patch.object(timeutils, 'utcnow')
self.patcher = mock.patch('oslo_utils.timeutils.utcnow')
mock_utcnow = self.patcher.start()
mock_utcnow.return_value = datetime.datetime.utcnow()
mock_utcnow.return_value = datetime.datetime.now(
datetime.timezone.utc).replace(tzinfo=None)
def tearDown(self):
super(TestTaskNotifications, self).tearDown()

View File

@ -26,6 +26,7 @@ import glance_store as store
from oslo_config import cfg
from oslo_serialization import jsonutils
from oslo_utils import fixture
from oslo_utils import timeutils
import testtools
import webob
import webob.exc
@ -34,7 +35,6 @@ import glance.api.v2.image_actions
import glance.api.v2.images
from glance.common import exception
from glance.common import store_utils
from glance.common import timeutils
from glance import domain
import glance.notifier
import glance.schema

View File

@ -21,10 +21,10 @@ import uuid
from oslo_config import cfg
from oslo_serialization import jsonutils
from oslo_utils import timeutils
import webob
import glance.api.v2.tasks
from glance.common import timeutils
import glance.domain
import glance.gateway
from glance.tests.unit import base