Stop using deprecated datetime.utc* functions

Change-Id: I9a205a4191e9b26784e507262cb66a1190c2bc71
This commit is contained in:
Tim Burke 2024-01-04 00:30:32 +00:00 committed by Tim Burke
parent bf7f3ff2f9
commit 603122a32c
6 changed files with 37 additions and 30 deletions

View File

@ -113,7 +113,7 @@ class S3Timestamp(utils.Timestamp):
@property
def s3xmlformat(self):
dt = datetime.datetime.utcfromtimestamp(self.ceil())
dt = datetime.datetime.fromtimestamp(self.ceil(), utils.UTC)
return dt.strftime(self.S3_XML_FORMAT)
@classmethod

View File

@ -114,6 +114,7 @@ from swift.common.utils.timestamp import ( # noqa
EPOCH,
last_modified_date_to_timestamp,
normalize_delete_at_timestamp,
UTC,
)
from swift.common.utils.ipaddrs import ( # noqa
is_valid_ip,
@ -521,22 +522,6 @@ def get_policy_index(req_headers, res_headers):
return str(policy_index) if policy_index is not None else None
class _UTC(datetime.tzinfo):
"""
A tzinfo class for datetime objects that returns a 0 timedelta (UTC time)
"""
def dst(self, dt):
return datetime.timedelta(0)
utcoffset = dst
def tzname(self, dt):
return 'UTC'
UTC = _UTC()
class LogStringFormatter(string.Formatter):
def __init__(self, default='', quote=False):
super(LogStringFormatter, self).__init__()

View File

@ -18,6 +18,7 @@
import datetime
import functools
import math
import sys
import time
import six
@ -189,12 +190,14 @@ class Timestamp(object):
elif us < 0:
t -= 1
us += 1000000
dt = datetime.datetime.utcfromtimestamp(t)
dt = datetime.datetime.fromtimestamp(t, UTC)
dt = dt.replace(microsecond=us)
else:
dt = datetime.datetime.utcfromtimestamp(t)
dt = datetime.datetime.fromtimestamp(t, UTC)
isoformat = dt.isoformat()
# need to drop tzinfo
isoformat = isoformat[:isoformat.index('+')]
# python isoformat() doesn't include msecs when zero
if len(isoformat) < len("1970-01-01T00:00:00.000000"):
isoformat += ".000000"
@ -397,3 +400,21 @@ def normalize_delete_at_timestamp(timestamp, high_precision=False):
"""
fmt = '%016.5f' if high_precision else '%010d'
return fmt % min(max(0, float(timestamp)), 9999999999.99999)
if sys.version_info < (3, 11):
class _UTC(datetime.tzinfo):
"""
A tzinfo class for datetimes that returns a 0 timedelta (UTC time)
"""
def dst(self, dt):
return datetime.timedelta(0)
utcoffset = dst
def tzname(self, dt):
return 'UTC'
UTC = _UTC()
else:
from datetime import UTC

View File

@ -20,7 +20,7 @@ import mock
import time
from contextlib import contextmanager
from swift.common import swob
from swift.common import swob, utils
from swift.common.http import is_success
from swift.common.middleware.s3api.s3api import filter_factory
@ -200,7 +200,7 @@ class S3ApiTestCase(unittest.TestCase):
return email.utils.formatdate(time.time() + skew)
def get_v4_amz_date_header(self, offset=None):
when = datetime.utcnow()
when = datetime.now(utils.UTC)
if offset is not None:
when += offset
return when.strftime('%Y%m%dT%H%M%SZ')

View File

@ -33,7 +33,7 @@ from swift.common.middleware.s3api.utils import Config
from swift.common.middleware.keystoneauth import KeystoneAuth
from swift.common import swob, registry
from swift.common.swob import Request
from swift.common.utils import md5, get_logger
from swift.common.utils import md5, get_logger, UTC
from keystonemiddleware.auth_token import AuthProtocol
from keystoneauth1.access import AccessInfoV2
@ -403,7 +403,7 @@ class TestS3ApiMiddleware(S3ApiTestCase):
'AWSAccessKeyId=test:tester' % expire,
environ={'REQUEST_METHOD': 'GET'},
headers={'Date': self.get_date_header()})
req.headers['Date'] = datetime.utcnow()
req.headers['Date'] = datetime.now(UTC)
req.content_type = 'text/plain'
status, headers, body = self.call_s3api(req)
self.assertEqual(self._get_error_code(body), 'AccessDenied')
@ -415,7 +415,7 @@ class TestS3ApiMiddleware(S3ApiTestCase):
# Set expire to last 32b timestamp value
# This number can't be higher, because it breaks tests on 32b systems
expire = '2147483647' # 19 Jan 2038 03:14:07
utc_date = datetime.utcnow()
utc_date = datetime.now(UTC)
req = Request.blank('/bucket/object?Signature=X&Expires=%s&'
'AWSAccessKeyId=test:tester&Timestamp=%s' %
(expire, utc_date.isoformat().rsplit('.')[0]),
@ -452,7 +452,7 @@ class TestS3ApiMiddleware(S3ApiTestCase):
'AWSAccessKeyId=test:tester' % expire,
environ={'REQUEST_METHOD': 'GET'},
headers={'Date': self.get_date_header()})
req.headers['Date'] = datetime.utcnow()
req.headers['Date'] = datetime.now(UTC)
req.content_type = 'text/plain'
status, headers, body = self.call_s3api(req)
self.assertEqual(self._get_error_code(body), 'AccessDenied')
@ -466,7 +466,7 @@ class TestS3ApiMiddleware(S3ApiTestCase):
'AWSAccessKeyId=test:tester' % expire,
environ={'REQUEST_METHOD': 'GET'},
headers={'Date': self.get_date_header()})
req.headers['Date'] = datetime.utcnow()
req.headers['Date'] = datetime.now(UTC)
req.content_type = 'text/plain'
status, headers, body = self.call_s3api(req)
self.assertEqual(self._get_error_code(body), 'AccessDenied')
@ -479,7 +479,7 @@ class TestS3ApiMiddleware(S3ApiTestCase):
req = Request.blank('/bucket/object?Expires=%s&'
'AWSAccessKeyId=' % expire,
environ={'REQUEST_METHOD': 'GET'})
req.headers['Date'] = datetime.utcnow()
req.headers['Date'] = datetime.now(UTC)
req.content_type = 'text/plain'
status, headers, body = self.call_s3api(req)
self.assertEqual(self._get_error_code(body), 'AccessDenied')

View File

@ -39,7 +39,8 @@ from swift.container.server import gen_resp_headers, ContainerController
from swift.common.direct_client import ClientException
from swift.common import swob
from swift.common.header_key_dict import HeaderKeyDict
from swift.common.utils import split_path, Timestamp, encode_timestamps, mkdirs
from swift.common.utils import split_path, Timestamp, encode_timestamps, \
mkdirs, UTC
from test.debug_logger import debug_logger
from test.unit import FakeRing, fake_http_connect, patch_policies, \
@ -48,8 +49,8 @@ from test.unit.common.middleware import helpers
def timestamp_to_last_modified(timestamp):
return datetime.utcfromtimestamp(
float(Timestamp(timestamp))).strftime('%Y-%m-%dT%H:%M:%S.%f')
dt = datetime.fromtimestamp(float(Timestamp(timestamp)), UTC)
return dt.strftime('%Y-%m-%dT%H:%M:%S.%f')
def container_resp_headers(**kwargs):