Remove oslo namespace package

Blueprint remove-namespace-packages

Depends-on: I4e1cb4f1caaee61683dc9fb0ffc67e8f9e06dc6d
for openstack/barbican
Depends-on: I52848a1f63cb4a1b30d9e1c32f4bb18c5d9fe80e
for openstack/castellan
Depends-on: Ia21c15e8eca6bf456f7cfe13f815f5ce068601e7
for openstack/designate
Depends-on: I695d2141c00a5ee36e042efbb9bac4e2803c1948
for openstack/ironic
Depends-on: I779d8df5872887d5d8ec44012d792b59b0cf4f64
for openstack/ironic-lib
Depends-on: I977248bec9560bcd730efe6bed741f1dc025fdf3
for openstack/keystonemiddleware
Depends-on: I75e6e15d50ef9830d0581efd4cbcceb3e626f7b7
for openstack/manila
Depends-on: I975592f3694be42d52685ebf606f6d3012caf1a8
for openstack/murano
Depends-on: Icbd2ae016553b5e5c886205e87acf38f3ccec550
for openstack/murano-dashboard
Depends-on: If8a132de65ba1e57ea93f98daac66816a3cefaa8
for openstack/neutron
Depends-on: I9891564317aa3cf84874debdc966e48fb270455e
for openstack/os-brick
Depends-on: Ie11a5d51a6471792e122942dfc9063a0d9d38399
for openstack/python-barbicanclient
Depends-on: I5d325e9397404211df00d9e39195b63d9a2f2e76
for openstack/python-muranoclient
Depends-on: I474dcd9de166b00d02b3586858a28797e97a3231
for openstack/python-neutronclient
Depends-on: I5e4746b58e11bf56957f10517d484c173335dfc9
for openstack/sahara

Change-Id: I9ef9506873605661f394476c96e4a3bbdb85fbed
This commit is contained in:
Doug Hellmann 2015-07-13 16:11:47 +00:00
parent 6c4509587a
commit 6bd180efc3
23 changed files with 0 additions and 2206 deletions

View File

@ -1,15 +0,0 @@
# Copyright 2012 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
__import__('pkg_resources').declare_namespace(__name__)

View File

@ -1,26 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import warnings
def deprecated():
new_name = __name__.replace('.', '_')
warnings.warn(
('The oslo namespace package is deprecated. Please use %s instead.' %
new_name),
DeprecationWarning,
stacklevel=3,
)
deprecated()

View File

@ -1,13 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils.encodeutils import * # noqa

View File

@ -1,13 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils.excutils import * # noqa

View File

@ -1,13 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils.importutils import * # noqa

View File

@ -1,15 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils.netutils import * # noqa
# NOTE(dhellmann): Needed for taskflow.
from oslo_utils.netutils import _ModifiedSplitResult # noqa

View File

@ -1,13 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils.reflection import * # noqa

View File

@ -1,13 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils.strutils import * # noqa

View File

@ -1,13 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils.timeutils import * # noqa

View File

@ -1,13 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils.units import * # noqa

View File

@ -1,13 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils.uuidutils import * # noqa

View File

@ -21,11 +21,7 @@ classifier =
[files]
packages =
oslo
oslo.utils
oslo_utils
namespace_packages =
oslo
[pbr]
warnerrors = true

View File

@ -1,13 +0,0 @@
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -1,55 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import fixtures
import testtools
_TRUE_VALUES = ('true', '1', 'yes')
# FIXME(dhellmann) Update this to use oslo.test library
class TestCase(testtools.TestCase):
"""Test case base class for all unit tests."""
def setUp(self):
"""Run before each test method to initialize test environment."""
super(TestCase, self).setUp()
test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
try:
test_timeout = int(test_timeout)
except ValueError:
# If timeout value is invalid do not set a timeout.
test_timeout = 0
if test_timeout > 0:
self.useFixture(fixtures.Timeout(test_timeout, gentle=True))
self.useFixture(fixtures.NestedTempfile())
self.useFixture(fixtures.TempHomeDir())
if os.environ.get('OS_STDOUT_CAPTURE') in _TRUE_VALUES:
stdout = self.useFixture(fixtures.StringStream('stdout')).stream
self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
if os.environ.get('OS_STDERR_CAPTURE') in _TRUE_VALUES:
stderr = self.useFixture(fixtures.StringStream('stderr')).stream
self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
self.log_fixture = self.useFixture(fixtures.FakeLogger())

View File

@ -1,197 +0,0 @@
# Copyright 2012, Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import time
import mock
from oslotest import base as test_base
from oslotest import moxstubout
from oslo.utils import excutils
mox = moxstubout.mox
class SaveAndReraiseTest(test_base.BaseTestCase):
def test_save_and_reraise_exception(self):
e = None
msg = 'foo'
try:
try:
raise Exception(msg)
except Exception:
with excutils.save_and_reraise_exception():
pass
except Exception as _e:
e = _e
self.assertEqual(str(e), msg)
@mock.patch('logging.getLogger')
def test_save_and_reraise_exception_dropped(self, get_logger_mock):
logger = get_logger_mock()
e = None
msg = 'second exception'
try:
try:
raise Exception('dropped')
except Exception:
with excutils.save_and_reraise_exception():
raise Exception(msg)
except Exception as _e:
e = _e
self.assertEqual(str(e), msg)
self.assertTrue(logger.error.called)
def test_save_and_reraise_exception_no_reraise(self):
"""Test that suppressing the reraise works."""
try:
raise Exception('foo')
except Exception:
with excutils.save_and_reraise_exception() as ctxt:
ctxt.reraise = False
@mock.patch('logging.getLogger')
def test_save_and_reraise_exception_dropped_no_reraise(self,
get_logger_mock):
logger = get_logger_mock()
e = None
msg = 'second exception'
try:
try:
raise Exception('dropped')
except Exception:
with excutils.save_and_reraise_exception(reraise=False):
raise Exception(msg)
except Exception as _e:
e = _e
self.assertEqual(str(e), msg)
self.assertFalse(logger.error.called)
class ForeverRetryUncaughtExceptionsTest(test_base.BaseTestCase):
def setUp(self):
super(ForeverRetryUncaughtExceptionsTest, self).setUp()
moxfixture = self.useFixture(moxstubout.MoxStubout())
self.mox = moxfixture.mox
self.stubs = moxfixture.stubs
@excutils.forever_retry_uncaught_exceptions
def exception_generator(self):
exc = self.exception_to_raise()
while exc is not None:
raise exc
exc = self.exception_to_raise()
def exception_to_raise(self):
return None
def my_time_sleep(self, arg):
pass
def exc_retrier_common_start(self):
self.stubs.Set(time, 'sleep', self.my_time_sleep)
self.mox.StubOutWithMock(logging, 'exception')
self.mox.StubOutWithMock(time, 'time',
use_mock_anything=True)
self.mox.StubOutWithMock(self, 'exception_to_raise')
def exc_retrier_sequence(self, exc_id=None, timestamp=None,
exc_count=None):
self.exception_to_raise().AndReturn(
Exception('unexpected %d' % exc_id))
time.time().AndReturn(timestamp)
if exc_count != 0:
logging.exception(mox.In(
'Unexpected exception occurred %d time(s)' % exc_count))
def exc_retrier_common_end(self):
self.exception_to_raise().AndReturn(None)
self.mox.ReplayAll()
self.exception_generator()
self.addCleanup(self.stubs.UnsetAll)
def test_exc_retrier_1exc_gives_1log(self):
self.exc_retrier_common_start()
self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
self.exc_retrier_common_end()
def test_exc_retrier_same_10exc_1min_gives_1log(self):
self.exc_retrier_common_start()
self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
# By design, the following exception don't get logged because they
# are within the same minute.
for i in range(2, 11):
self.exc_retrier_sequence(exc_id=1, timestamp=i, exc_count=0)
self.exc_retrier_common_end()
def test_exc_retrier_same_2exc_2min_gives_2logs(self):
self.exc_retrier_common_start()
self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
self.exc_retrier_sequence(exc_id=1, timestamp=65, exc_count=1)
self.exc_retrier_common_end()
def test_exc_retrier_same_10exc_2min_gives_2logs(self):
self.exc_retrier_common_start()
self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
self.exc_retrier_sequence(exc_id=1, timestamp=12, exc_count=0)
self.exc_retrier_sequence(exc_id=1, timestamp=23, exc_count=0)
self.exc_retrier_sequence(exc_id=1, timestamp=34, exc_count=0)
self.exc_retrier_sequence(exc_id=1, timestamp=45, exc_count=0)
# The previous 4 exceptions are counted here
self.exc_retrier_sequence(exc_id=1, timestamp=106, exc_count=5)
# Again, the following are not logged due to being within
# the same minute
self.exc_retrier_sequence(exc_id=1, timestamp=117, exc_count=0)
self.exc_retrier_sequence(exc_id=1, timestamp=128, exc_count=0)
self.exc_retrier_sequence(exc_id=1, timestamp=139, exc_count=0)
self.exc_retrier_sequence(exc_id=1, timestamp=150, exc_count=0)
self.exc_retrier_common_end()
def test_exc_retrier_mixed_4exc_1min_gives_2logs(self):
self.exc_retrier_common_start()
self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
# By design, this second 'unexpected 1' exception is not counted. This
# is likely a rare thing and is a sacrifice for code simplicity.
self.exc_retrier_sequence(exc_id=1, timestamp=10, exc_count=0)
self.exc_retrier_sequence(exc_id=2, timestamp=20, exc_count=1)
# Again, trailing exceptions within a minute are not counted.
self.exc_retrier_sequence(exc_id=2, timestamp=30, exc_count=0)
self.exc_retrier_common_end()
def test_exc_retrier_mixed_4exc_2min_gives_2logs(self):
self.exc_retrier_common_start()
self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
# Again, this second exception of the same type is not counted
# for the sake of code simplicity.
self.exc_retrier_sequence(exc_id=1, timestamp=10, exc_count=0)
# The difference between this and the previous case is the log
# is also triggered by more than a minute expiring.
self.exc_retrier_sequence(exc_id=2, timestamp=100, exc_count=1)
self.exc_retrier_sequence(exc_id=2, timestamp=110, exc_count=0)
self.exc_retrier_common_end()
def test_exc_retrier_mixed_4exc_2min_gives_3logs(self):
self.exc_retrier_common_start()
self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
# This time the second 'unexpected 1' exception is counted due
# to the same exception occurring same when the minute expires.
self.exc_retrier_sequence(exc_id=1, timestamp=10, exc_count=0)
self.exc_retrier_sequence(exc_id=1, timestamp=100, exc_count=2)
self.exc_retrier_sequence(exc_id=2, timestamp=110, exc_count=1)
self.exc_retrier_common_end()

View File

@ -1,121 +0,0 @@
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import sys
from oslotest import base as test_base
from oslo.utils import importutils
class ImportUtilsTest(test_base.BaseTestCase):
# NOTE(jkoelker) There has GOT to be a way to test this. But mocking
# __import__ is the devil. Right now we just make
# sure we can import something from the stdlib
def test_import_class(self):
dt = importutils.import_class('datetime.datetime')
self.assertEqual(sys.modules['datetime'].datetime, dt)
def test_import_bad_class(self):
self.assertRaises(ImportError, importutils.import_class,
'lol.u_mad.brah')
def test_import_module(self):
dt = importutils.import_module('datetime')
self.assertEqual(sys.modules['datetime'], dt)
def test_import_object_optional_arg_not_present(self):
obj = importutils.import_object('oslo_utils.tests.fake.FakeDriver')
self.assertEqual(obj.__class__.__name__, 'FakeDriver')
def test_import_object_optional_arg_present(self):
obj = importutils.import_object('oslo_utils.tests.fake.FakeDriver',
first_arg=False)
self.assertEqual(obj.__class__.__name__, 'FakeDriver')
def test_import_object_required_arg_not_present(self):
# arg 1 isn't optional here
self.assertRaises(TypeError, importutils.import_object,
'oslo_utils.tests.fake.FakeDriver2')
def test_import_object_required_arg_present(self):
obj = importutils.import_object('oslo_utils.tests.fake.FakeDriver2',
first_arg=False)
self.assertEqual(obj.__class__.__name__, 'FakeDriver2')
# namespace tests
def test_import_object_ns_optional_arg_not_present(self):
obj = importutils.import_object_ns('oslo_utils',
'tests.fake.FakeDriver')
self.assertEqual(obj.__class__.__name__, 'FakeDriver')
def test_import_object_ns_optional_arg_present(self):
obj = importutils.import_object_ns('oslo_utils',
'tests.fake.FakeDriver',
first_arg=False)
self.assertEqual(obj.__class__.__name__, 'FakeDriver')
def test_import_object_ns_required_arg_not_present(self):
# arg 1 isn't optional here
self.assertRaises(TypeError, importutils.import_object_ns,
'oslo_utils', 'tests.fake.FakeDriver2')
def test_import_object_ns_required_arg_present(self):
obj = importutils.import_object_ns('oslo_utils',
'tests.fake.FakeDriver2',
first_arg=False)
self.assertEqual(obj.__class__.__name__, 'FakeDriver2')
# namespace tests
def test_import_object_ns_full_optional_arg_not_present(self):
obj = importutils.import_object_ns('tests2',
'oslo_utils.tests.fake.FakeDriver')
self.assertEqual(obj.__class__.__name__, 'FakeDriver')
def test_import_object_ns_full_optional_arg_present(self):
obj = importutils.import_object_ns('tests2',
'oslo_utils.tests.fake.FakeDriver',
first_arg=False)
self.assertEqual(obj.__class__.__name__, 'FakeDriver')
def test_import_object_ns_full_required_arg_not_present(self):
# arg 1 isn't optional here
self.assertRaises(TypeError, importutils.import_object_ns,
'tests2', 'oslo_utils.tests.fake.FakeDriver2')
def test_import_object_ns_full_required_arg_present(self):
obj = importutils.import_object_ns('tests2',
'oslo_utils.tests.fake.FakeDriver2',
first_arg=False)
self.assertEqual(obj.__class__.__name__, 'FakeDriver2')
def test_import_object(self):
dt = importutils.import_object('datetime.time')
self.assertTrue(isinstance(dt, sys.modules['datetime'].time))
def test_import_object_with_args(self):
dt = importutils.import_object('datetime.datetime', 2012, 4, 5)
self.assertTrue(isinstance(dt, sys.modules['datetime'].datetime))
self.assertEqual(dt, datetime.datetime(2012, 4, 5))
def test_try_import(self):
dt = importutils.try_import('datetime')
self.assertEqual(sys.modules['datetime'], dt)
def test_try_import_returns_default(self):
foo = importutils.try_import('foo.bar')
self.assertIsNone(foo)

View File

@ -1,204 +0,0 @@
# Copyright 2012 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import socket
import mock
from oslotest import base as test_base
from oslo.utils import netutils
class NetworkUtilsTest(test_base.BaseTestCase):
def test_no_host(self):
result = netutils.urlsplit('http://')
self.assertEqual('', result.netloc)
self.assertEqual(None, result.port)
self.assertEqual(None, result.hostname)
self.assertEqual('http', result.scheme)
def test_parse_host_port(self):
self.assertEqual(('server01', 80),
netutils.parse_host_port('server01:80'))
self.assertEqual(('server01', None),
netutils.parse_host_port('server01'))
self.assertEqual(('server01', 1234),
netutils.parse_host_port('server01',
default_port=1234))
self.assertEqual(('::1', 80),
netutils.parse_host_port('[::1]:80'))
self.assertEqual(('::1', None),
netutils.parse_host_port('[::1]'))
self.assertEqual(('::1', 1234),
netutils.parse_host_port('[::1]',
default_port=1234))
self.assertEqual(('2001:db8:85a3::8a2e:370:7334', 1234),
netutils.parse_host_port(
'2001:db8:85a3::8a2e:370:7334',
default_port=1234))
def test_urlsplit(self):
result = netutils.urlsplit('rpc://myhost?someparam#somefragment')
self.assertEqual(result.scheme, 'rpc')
self.assertEqual(result.netloc, 'myhost')
self.assertEqual(result.path, '')
self.assertEqual(result.query, 'someparam')
self.assertEqual(result.fragment, 'somefragment')
result = netutils.urlsplit(
'rpc://myhost/mypath?someparam#somefragment',
allow_fragments=False)
self.assertEqual(result.scheme, 'rpc')
self.assertEqual(result.netloc, 'myhost')
self.assertEqual(result.path, '/mypath')
self.assertEqual(result.query, 'someparam#somefragment')
self.assertEqual(result.fragment, '')
result = netutils.urlsplit(
'rpc://user:pass@myhost/mypath?someparam#somefragment',
allow_fragments=False)
self.assertEqual(result.scheme, 'rpc')
self.assertEqual(result.netloc, 'user:pass@myhost')
self.assertEqual(result.path, '/mypath')
self.assertEqual(result.query, 'someparam#somefragment')
self.assertEqual(result.fragment, '')
def test_urlsplit_ipv6(self):
ipv6_url = 'http://[::1]:443/v2.0/'
result = netutils.urlsplit(ipv6_url)
self.assertEqual(result.scheme, 'http')
self.assertEqual(result.netloc, '[::1]:443')
self.assertEqual(result.path, '/v2.0/')
self.assertEqual(result.hostname, '::1')
self.assertEqual(result.port, 443)
ipv6_url = 'http://user:pass@[::1]/v2.0/'
result = netutils.urlsplit(ipv6_url)
self.assertEqual(result.scheme, 'http')
self.assertEqual(result.netloc, 'user:pass@[::1]')
self.assertEqual(result.path, '/v2.0/')
self.assertEqual(result.hostname, '::1')
self.assertEqual(result.port, None)
ipv6_url = 'https://[2001:db8:85a3::8a2e:370:7334]:1234/v2.0/xy?ab#12'
result = netutils.urlsplit(ipv6_url)
self.assertEqual(result.scheme, 'https')
self.assertEqual(result.netloc, '[2001:db8:85a3::8a2e:370:7334]:1234')
self.assertEqual(result.path, '/v2.0/xy')
self.assertEqual(result.hostname, '2001:db8:85a3::8a2e:370:7334')
self.assertEqual(result.port, 1234)
self.assertEqual(result.query, 'ab')
self.assertEqual(result.fragment, '12')
def test_urlsplit_params(self):
test_url = "http://localhost/?a=b&c=d"
result = netutils.urlsplit(test_url)
self.assertEqual({'a': 'b', 'c': 'd'}, result.params())
self.assertEqual({'a': 'b', 'c': 'd'}, result.params(collapse=False))
test_url = "http://localhost/?a=b&a=c&a=d"
result = netutils.urlsplit(test_url)
self.assertEqual({'a': 'd'}, result.params())
self.assertEqual({'a': ['b', 'c', 'd']}, result.params(collapse=False))
test_url = "http://localhost"
result = netutils.urlsplit(test_url)
self.assertEqual({}, result.params())
test_url = "http://localhost?"
result = netutils.urlsplit(test_url)
self.assertEqual({}, result.params())
def test_set_tcp_keepalive(self):
mock_sock = mock.Mock()
netutils.set_tcp_keepalive(mock_sock, True, 100, 10, 5)
calls = [
mock.call.setsockopt(socket.SOL_SOCKET,
socket.SO_KEEPALIVE, True),
]
if hasattr(socket, 'TCP_KEEPIDLE'):
calls += [
mock.call.setsockopt(socket.IPPROTO_TCP,
socket.TCP_KEEPIDLE, 100)
]
if hasattr(socket, 'TCP_KEEPINTVL'):
calls += [
mock.call.setsockopt(socket.IPPROTO_TCP,
socket.TCP_KEEPINTVL, 10),
]
if hasattr(socket, 'TCP_KEEPCNT'):
calls += [
mock.call.setsockopt(socket.IPPROTO_TCP,
socket.TCP_KEEPCNT, 5)
]
mock_sock.assert_has_calls(calls)
mock_sock.reset_mock()
netutils.set_tcp_keepalive(mock_sock, False)
self.assertEqual(1, len(mock_sock.mock_calls))
def test_is_valid_ipv4(self):
self.assertTrue(netutils.is_valid_ipv4('42.42.42.42'))
self.assertFalse(netutils.is_valid_ipv4('-1.11.11.11'))
self.assertFalse(netutils.is_valid_ipv4(''))
def test_is_valid_ipv6(self):
self.assertTrue(netutils.is_valid_ipv6('::1'))
self.assertFalse(netutils.is_valid_ipv6(
'1fff::a88:85a3::172.31.128.1'))
self.assertFalse(netutils.is_valid_ipv6(''))
def test_is_valid_ip(self):
self.assertTrue(netutils.is_valid_ip('127.0.0.1'))
self.assertTrue(netutils.is_valid_ip('2001:db8::ff00:42:8329'))
self.assertFalse(netutils.is_valid_ip('256.0.0.0'))
self.assertFalse(netutils.is_valid_ip('::1.2.3.'))
self.assertFalse(netutils.is_valid_ip(''))
def test_valid_port(self):
valid_inputs = [1, '1', 2, '3', '5', 8, 13, 21,
'80', '3246', '65535']
for input_str in valid_inputs:
self.assertTrue(netutils.is_valid_port(input_str))
def test_valid_port_fail(self):
invalid_inputs = ['-32768', '0', 0, '65536', 528491, '528491',
'528.491', 'thirty-seven', None]
for input_str in invalid_inputs:
self.assertFalse(netutils.is_valid_port(input_str))
def test_get_my_ip(self):
sock_attrs = {
'return_value.getsockname.return_value': ['1.2.3.4', '']}
with mock.patch('socket.socket', **sock_attrs):
addr = netutils.get_my_ipv4()
self.assertEqual(addr, '1.2.3.4')
@mock.patch('socket.socket')
@mock.patch('oslo_utils.netutils._get_my_ipv4_address')
def test_get_my_ip_socket_error(self, ip, mock_socket):
mock_socket.side_effect = socket.error
ip.return_value = '1.2.3.4'
addr = netutils.get_my_ipv4()
self.assertEqual(addr, '1.2.3.4')

View File

@ -1,279 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslotest import base as test_base
import six
import testtools
from oslo.utils import reflection
if six.PY3:
RUNTIME_ERROR_CLASSES = ['RuntimeError', 'Exception',
'BaseException', 'object']
else:
RUNTIME_ERROR_CLASSES = ['RuntimeError', 'StandardError', 'Exception',
'BaseException', 'object']
def dummy_decorator(f):
@six.wraps(f)
def wrapper(*args, **kwargs):
return f(*args, **kwargs)
return wrapper
def mere_function(a, b):
pass
def function_with_defs(a, b, optional=None):
pass
def function_with_kwargs(a, b, **kwargs):
pass
class Class(object):
def method(self, c, d):
pass
@staticmethod
def static_method(e, f):
pass
@classmethod
def class_method(cls, g, h):
pass
class CallableClass(object):
def __call__(self, i, j):
pass
class ClassWithInit(object):
def __init__(self, k, l):
pass
class CallbackEqualityTest(test_base.BaseTestCase):
def test_different_simple_callbacks(self):
def a():
pass
def b():
pass
self.assertFalse(reflection.is_same_callback(a, b))
def test_static_instance_callbacks(self):
class A(object):
@staticmethod
def b(a, b, c):
pass
a = A()
b = A()
self.assertTrue(reflection.is_same_callback(a.b, b.b))
def test_different_instance_callbacks(self):
class A(object):
def b(self):
pass
def __eq__(self, other):
return True
b = A()
c = A()
self.assertFalse(reflection.is_same_callback(b.b, c.b))
self.assertTrue(reflection.is_same_callback(b.b, c.b, strict=False))
class GetCallableNameTest(test_base.BaseTestCase):
def test_mere_function(self):
name = reflection.get_callable_name(mere_function)
self.assertEqual('.'.join((__name__, 'mere_function')), name)
def test_method(self):
name = reflection.get_callable_name(Class.method)
self.assertEqual('.'.join((__name__, 'Class', 'method')), name)
def test_instance_method(self):
name = reflection.get_callable_name(Class().method)
self.assertEqual('.'.join((__name__, 'Class', 'method')), name)
def test_static_method(self):
name = reflection.get_callable_name(Class.static_method)
if six.PY3:
self.assertEqual('.'.join((__name__, 'Class', 'static_method')),
name)
else:
# NOTE(imelnikov): static method are just functions, class name
# is not recorded anywhere in them.
self.assertEqual('.'.join((__name__, 'static_method')), name)
def test_class_method(self):
name = reflection.get_callable_name(Class.class_method)
self.assertEqual('.'.join((__name__, 'Class', 'class_method')), name)
def test_constructor(self):
name = reflection.get_callable_name(Class)
self.assertEqual('.'.join((__name__, 'Class')), name)
def test_callable_class(self):
name = reflection.get_callable_name(CallableClass())
self.assertEqual('.'.join((__name__, 'CallableClass')), name)
def test_callable_class_call(self):
name = reflection.get_callable_name(CallableClass().__call__)
self.assertEqual('.'.join((__name__, 'CallableClass',
'__call__')), name)
# These extended/special case tests only work on python 3, due to python 2
# being broken/incorrect with regard to these special cases...
@testtools.skipIf(not six.PY3, 'python 3.x is not currently available')
class GetCallableNameTestExtended(test_base.BaseTestCase):
# Tests items in http://legacy.python.org/dev/peps/pep-3155/
class InnerCallableClass(object):
def __call__(self):
pass
def test_inner_callable_class(self):
obj = self.InnerCallableClass()
name = reflection.get_callable_name(obj.__call__)
expected_name = '.'.join((__name__, 'GetCallableNameTestExtended',
'InnerCallableClass', '__call__'))
self.assertEqual(expected_name, name)
def test_inner_callable_function(self):
def a():
def b():
pass
return b
name = reflection.get_callable_name(a())
expected_name = '.'.join((__name__, 'GetCallableNameTestExtended',
'test_inner_callable_function', '<locals>',
'a', '<locals>', 'b'))
self.assertEqual(expected_name, name)
def test_inner_class(self):
obj = self.InnerCallableClass()
name = reflection.get_callable_name(obj)
expected_name = '.'.join((__name__,
'GetCallableNameTestExtended',
'InnerCallableClass'))
self.assertEqual(expected_name, name)
class GetCallableArgsTest(test_base.BaseTestCase):
def test_mere_function(self):
result = reflection.get_callable_args(mere_function)
self.assertEqual(['a', 'b'], result)
def test_function_with_defaults(self):
result = reflection.get_callable_args(function_with_defs)
self.assertEqual(['a', 'b', 'optional'], result)
def test_required_only(self):
result = reflection.get_callable_args(function_with_defs,
required_only=True)
self.assertEqual(['a', 'b'], result)
def test_method(self):
result = reflection.get_callable_args(Class.method)
self.assertEqual(['self', 'c', 'd'], result)
def test_instance_method(self):
result = reflection.get_callable_args(Class().method)
self.assertEqual(['c', 'd'], result)
def test_class_method(self):
result = reflection.get_callable_args(Class.class_method)
self.assertEqual(['g', 'h'], result)
def test_class_constructor(self):
result = reflection.get_callable_args(ClassWithInit)
self.assertEqual(['k', 'l'], result)
def test_class_with_call(self):
result = reflection.get_callable_args(CallableClass())
self.assertEqual(['i', 'j'], result)
def test_decorators_work(self):
@dummy_decorator
def special_fun(x, y):
pass
result = reflection.get_callable_args(special_fun)
self.assertEqual(['x', 'y'], result)
class AcceptsKwargsTest(test_base.BaseTestCase):
def test_no_kwargs(self):
self.assertEqual(False, reflection.accepts_kwargs(mere_function))
def test_with_kwargs(self):
self.assertEqual(True, reflection.accepts_kwargs(function_with_kwargs))
class GetClassNameTest(test_base.BaseTestCase):
def test_std_exception(self):
name = reflection.get_class_name(RuntimeError)
self.assertEqual('RuntimeError', name)
def test_class(self):
name = reflection.get_class_name(Class)
self.assertEqual('.'.join((__name__, 'Class')), name)
def test_instance(self):
name = reflection.get_class_name(Class())
self.assertEqual('.'.join((__name__, 'Class')), name)
def test_int(self):
name = reflection.get_class_name(42)
self.assertEqual('int', name)
class GetAllClassNamesTest(test_base.BaseTestCase):
def test_std_class(self):
names = list(reflection.get_all_class_names(RuntimeError))
self.assertEqual(RUNTIME_ERROR_CLASSES, names)
def test_std_class_up_to(self):
names = list(reflection.get_all_class_names(RuntimeError,
up_to=Exception))
self.assertEqual(RUNTIME_ERROR_CLASSES[:-2], names)

View File

@ -1,594 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import math
import mock
from oslotest import base as test_base
import six
import testscenarios
from oslo.utils import strutils
from oslo.utils import units
load_tests = testscenarios.load_tests_apply_scenarios
class StrUtilsTest(test_base.BaseTestCase):
def test_bool_bool_from_string(self):
self.assertTrue(strutils.bool_from_string(True))
self.assertFalse(strutils.bool_from_string(False))
def test_bool_bool_from_string_default(self):
self.assertTrue(strutils.bool_from_string('', default=True))
self.assertFalse(strutils.bool_from_string('wibble', default=False))
def _test_bool_from_string(self, c):
self.assertTrue(strutils.bool_from_string(c('true')))
self.assertTrue(strutils.bool_from_string(c('TRUE')))
self.assertTrue(strutils.bool_from_string(c('on')))
self.assertTrue(strutils.bool_from_string(c('On')))
self.assertTrue(strutils.bool_from_string(c('yes')))
self.assertTrue(strutils.bool_from_string(c('YES')))
self.assertTrue(strutils.bool_from_string(c('yEs')))
self.assertTrue(strutils.bool_from_string(c('1')))
self.assertTrue(strutils.bool_from_string(c('T')))
self.assertTrue(strutils.bool_from_string(c('t')))
self.assertTrue(strutils.bool_from_string(c('Y')))
self.assertTrue(strutils.bool_from_string(c('y')))
self.assertFalse(strutils.bool_from_string(c('false')))
self.assertFalse(strutils.bool_from_string(c('FALSE')))
self.assertFalse(strutils.bool_from_string(c('off')))
self.assertFalse(strutils.bool_from_string(c('OFF')))
self.assertFalse(strutils.bool_from_string(c('no')))
self.assertFalse(strutils.bool_from_string(c('0')))
self.assertFalse(strutils.bool_from_string(c('42')))
self.assertFalse(strutils.bool_from_string(c(
'This should not be True')))
self.assertFalse(strutils.bool_from_string(c('F')))
self.assertFalse(strutils.bool_from_string(c('f')))
self.assertFalse(strutils.bool_from_string(c('N')))
self.assertFalse(strutils.bool_from_string(c('n')))
# Whitespace should be stripped
self.assertTrue(strutils.bool_from_string(c(' 1 ')))
self.assertTrue(strutils.bool_from_string(c(' true ')))
self.assertFalse(strutils.bool_from_string(c(' 0 ')))
self.assertFalse(strutils.bool_from_string(c(' false ')))
def test_bool_from_string(self):
self._test_bool_from_string(lambda s: s)
def test_unicode_bool_from_string(self):
self._test_bool_from_string(six.text_type)
self.assertFalse(strutils.bool_from_string(u'使用', strict=False))
exc = self.assertRaises(ValueError, strutils.bool_from_string,
u'使用', strict=True)
expected_msg = (u"Unrecognized value '使用', acceptable values are:"
u" '0', '1', 'f', 'false', 'n', 'no', 'off', 'on',"
u" 't', 'true', 'y', 'yes'")
self.assertEqual(expected_msg, six.text_type(exc))
def test_other_bool_from_string(self):
self.assertFalse(strutils.bool_from_string(None))
self.assertFalse(strutils.bool_from_string(mock.Mock()))
def test_int_bool_from_string(self):
self.assertTrue(strutils.bool_from_string(1))
self.assertFalse(strutils.bool_from_string(-1))
self.assertFalse(strutils.bool_from_string(0))
self.assertFalse(strutils.bool_from_string(2))
def test_strict_bool_from_string(self):
# None isn't allowed in strict mode
exc = self.assertRaises(ValueError, strutils.bool_from_string, None,
strict=True)
expected_msg = ("Unrecognized value 'None', acceptable values are:"
" '0', '1', 'f', 'false', 'n', 'no', 'off', 'on',"
" 't', 'true', 'y', 'yes'")
self.assertEqual(expected_msg, str(exc))
# Unrecognized strings aren't allowed
self.assertFalse(strutils.bool_from_string('Other', strict=False))
exc = self.assertRaises(ValueError, strutils.bool_from_string, 'Other',
strict=True)
expected_msg = ("Unrecognized value 'Other', acceptable values are:"
" '0', '1', 'f', 'false', 'n', 'no', 'off', 'on',"
" 't', 'true', 'y', 'yes'")
self.assertEqual(expected_msg, str(exc))
# Unrecognized numbers aren't allowed
exc = self.assertRaises(ValueError, strutils.bool_from_string, 2,
strict=True)
expected_msg = ("Unrecognized value '2', acceptable values are:"
" '0', '1', 'f', 'false', 'n', 'no', 'off', 'on',"
" 't', 'true', 'y', 'yes'")
self.assertEqual(expected_msg, str(exc))
# False-like values are allowed
self.assertFalse(strutils.bool_from_string('f', strict=True))
self.assertFalse(strutils.bool_from_string('false', strict=True))
self.assertFalse(strutils.bool_from_string('off', strict=True))
self.assertFalse(strutils.bool_from_string('n', strict=True))
self.assertFalse(strutils.bool_from_string('no', strict=True))
self.assertFalse(strutils.bool_from_string('0', strict=True))
self.assertTrue(strutils.bool_from_string('1', strict=True))
# Avoid font-similarity issues (one looks like lowercase-el, zero like
# oh, etc...)
for char in ('O', 'o', 'L', 'l', 'I', 'i'):
self.assertRaises(ValueError, strutils.bool_from_string, char,
strict=True)
def test_int_from_bool_as_string(self):
self.assertEqual(1, strutils.int_from_bool_as_string(True))
self.assertEqual(0, strutils.int_from_bool_as_string(False))
def test_slugify(self):
to_slug = strutils.to_slug
self.assertRaises(TypeError, to_slug, True)
self.assertEqual(six.u("hello"), to_slug("hello"))
self.assertEqual(six.u("two-words"), to_slug("Two Words"))
self.assertEqual(six.u("ma-any-spa-ce-es"),
to_slug("Ma-any\t spa--ce- es"))
self.assertEqual(six.u("excamation"), to_slug("exc!amation!"))
self.assertEqual(six.u("ampserand"), to_slug("&ampser$and"))
self.assertEqual(six.u("ju5tnum8er"), to_slug("ju5tnum8er"))
self.assertEqual(six.u("strip-"), to_slug(" strip - "))
self.assertEqual(six.u("perche"), to_slug(six.b("perch\xc3\xa9")))
self.assertEqual(six.u("strange"),
to_slug("\x80strange", errors="ignore"))
class StringToBytesTest(test_base.BaseTestCase):
_unit_system = [
('si', dict(unit_system='SI')),
('iec', dict(unit_system='IEC')),
('invalid_unit_system', dict(unit_system='KKK', assert_error=True)),
]
_sign = [
('no_sign', dict(sign='')),
('positive', dict(sign='+')),
('negative', dict(sign='-')),
('invalid_sign', dict(sign='~', assert_error=True)),
]
_magnitude = [
('integer', dict(magnitude='79')),
('decimal', dict(magnitude='7.9')),
('decimal_point_start', dict(magnitude='.9')),
('decimal_point_end', dict(magnitude='79.', assert_error=True)),
('invalid_literal', dict(magnitude='7.9.9', assert_error=True)),
('garbage_value', dict(magnitude='asdf', assert_error=True)),
]
_unit_prefix = [
('no_unit_prefix', dict(unit_prefix='')),
('k', dict(unit_prefix='k')),
('K', dict(unit_prefix='K')),
('M', dict(unit_prefix='M')),
('G', dict(unit_prefix='G')),
('T', dict(unit_prefix='T')),
('Ki', dict(unit_prefix='Ki')),
('Mi', dict(unit_prefix='Mi')),
('Gi', dict(unit_prefix='Gi')),
('Ti', dict(unit_prefix='Ti')),
('invalid_unit_prefix', dict(unit_prefix='B', assert_error=True)),
]
_unit_suffix = [
('b', dict(unit_suffix='b')),
('bit', dict(unit_suffix='bit')),
('B', dict(unit_suffix='B')),
('invalid_unit_suffix', dict(unit_suffix='Kg', assert_error=True)),
]
_return_int = [
('return_dec', dict(return_int=False)),
('return_int', dict(return_int=True)),
]
@classmethod
def generate_scenarios(cls):
cls.scenarios = testscenarios.multiply_scenarios(cls._unit_system,
cls._sign,
cls._magnitude,
cls._unit_prefix,
cls._unit_suffix,
cls._return_int)
def test_string_to_bytes(self):
def _get_quantity(sign, magnitude, unit_suffix):
res = float('%s%s' % (sign, magnitude))
if unit_suffix in ['b', 'bit']:
res /= 8
return res
def _get_constant(unit_prefix, unit_system):
if not unit_prefix:
return 1
elif unit_system == 'SI':
res = getattr(units, unit_prefix)
elif unit_system == 'IEC':
if unit_prefix.endswith('i'):
res = getattr(units, unit_prefix)
else:
res = getattr(units, '%si' % unit_prefix)
return res
text = ''.join([self.sign, self.magnitude, self.unit_prefix,
self.unit_suffix])
err_si = self.unit_system == 'SI' and (self.unit_prefix == 'K' or
self.unit_prefix.endswith('i'))
err_iec = self.unit_system == 'IEC' and self.unit_prefix == 'k'
if getattr(self, 'assert_error', False) or err_si or err_iec:
self.assertRaises(ValueError, strutils.string_to_bytes,
text, unit_system=self.unit_system,
return_int=self.return_int)
return
quantity = _get_quantity(self.sign, self.magnitude, self.unit_suffix)
constant = _get_constant(self.unit_prefix, self.unit_system)
expected = quantity * constant
actual = strutils.string_to_bytes(text, unit_system=self.unit_system,
return_int=self.return_int)
if self.return_int:
self.assertEqual(actual, int(math.ceil(expected)))
else:
self.assertAlmostEqual(actual, expected)
StringToBytesTest.generate_scenarios()
class MaskPasswordTestCase(test_base.BaseTestCase):
def test_json(self):
# Test 'adminPass' w/o spaces
payload = """{'adminPass':'mypassword'}"""
expected = """{'adminPass':'***'}"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'adminPass' with spaces
payload = """{ 'adminPass' : 'mypassword' }"""
expected = """{ 'adminPass' : '***' }"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'admin_pass' w/o spaces
payload = """{'admin_pass':'mypassword'}"""
expected = """{'admin_pass':'***'}"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'admin_pass' with spaces
payload = """{ 'admin_pass' : 'mypassword' }"""
expected = """{ 'admin_pass' : '***' }"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'admin_password' w/o spaces
payload = """{'admin_password':'mypassword'}"""
expected = """{'admin_password':'***'}"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'admin_password' with spaces
payload = """{ 'admin_password' : 'mypassword' }"""
expected = """{ 'admin_password' : '***' }"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'password' w/o spaces
payload = """{'password':'mypassword'}"""
expected = """{'password':'***'}"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'password' with spaces
payload = """{ 'password' : 'mypassword' }"""
expected = """{ 'password' : '***' }"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'auth_password' w/o spaces
payload = """{'auth_password':'mypassword'}"""
expected = """{'auth_password':'***'}"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'auth_password' with spaces
payload = """{ 'auth_password' : 'mypassword' }"""
expected = """{ 'auth_password' : '***' }"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'secret_uuid' w/o spaces
payload = """{'secret_uuid':'myuuid'}"""
expected = """{'secret_uuid':'***'}"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'secret_uuid' with spaces
payload = """{ 'secret_uuid' : 'myuuid' }"""
expected = """{ 'secret_uuid' : '***' }"""
self.assertEqual(expected, strutils.mask_password(payload))
def test_xml(self):
# Test 'adminPass' w/o spaces
payload = """<adminPass>mypassword</adminPass>"""
expected = """<adminPass>***</adminPass>"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'adminPass' with spaces
payload = """<adminPass>
mypassword
</adminPass>"""
expected = """<adminPass>***</adminPass>"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'admin_pass' w/o spaces
payload = """<admin_pass>mypassword</admin_pass>"""
expected = """<admin_pass>***</admin_pass>"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'admin_pass' with spaces
payload = """<admin_pass>
mypassword
</admin_pass>"""
expected = """<admin_pass>***</admin_pass>"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'admin_password' w/o spaces
payload = """<admin_password>mypassword</admin_password>"""
expected = """<admin_password>***</admin_password>"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'admin_password' with spaces
payload = """<admin_password>
mypassword
</admin_password>"""
expected = """<admin_password>***</admin_password>"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'password' w/o spaces
payload = """<password>mypassword</password>"""
expected = """<password>***</password>"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'password' with spaces
payload = """<password>
mypassword
</password>"""
expected = """<password>***</password>"""
self.assertEqual(expected, strutils.mask_password(payload))
def test_xml_attribute(self):
# Test 'adminPass' w/o spaces
payload = """adminPass='mypassword'"""
expected = """adminPass='***'"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'adminPass' with spaces
payload = """adminPass = 'mypassword'"""
expected = """adminPass = '***'"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'adminPass' with double quotes
payload = """adminPass = "mypassword\""""
expected = """adminPass = "***\""""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'admin_pass' w/o spaces
payload = """admin_pass='mypassword'"""
expected = """admin_pass='***'"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'admin_pass' with spaces
payload = """admin_pass = 'mypassword'"""
expected = """admin_pass = '***'"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'admin_pass' with double quotes
payload = """admin_pass = "mypassword\""""
expected = """admin_pass = "***\""""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'admin_password' w/o spaces
payload = """admin_password='mypassword'"""
expected = """admin_password='***'"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'admin_password' with spaces
payload = """admin_password = 'mypassword'"""
expected = """admin_password = '***'"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'admin_password' with double quotes
payload = """admin_password = "mypassword\""""
expected = """admin_password = "***\""""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'password' w/o spaces
payload = """password='mypassword'"""
expected = """password='***'"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'password' with spaces
payload = """password = 'mypassword'"""
expected = """password = '***'"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'password' with double quotes
payload = """password = "mypassword\""""
expected = """password = "***\""""
self.assertEqual(expected, strutils.mask_password(payload))
def test_json_message(self):
payload = """body: {"changePassword": {"adminPass": "1234567"}}"""
expected = """body: {"changePassword": {"adminPass": "***"}}"""
self.assertEqual(expected, strutils.mask_password(payload))
payload = """body: {"rescue": {"admin_pass": "1234567"}}"""
expected = """body: {"rescue": {"admin_pass": "***"}}"""
self.assertEqual(expected, strutils.mask_password(payload))
payload = """body: {"rescue": {"admin_password": "1234567"}}"""
expected = """body: {"rescue": {"admin_password": "***"}}"""
self.assertEqual(expected, strutils.mask_password(payload))
payload = """body: {"rescue": {"password": "1234567"}}"""
expected = """body: {"rescue": {"password": "***"}}"""
self.assertEqual(expected, strutils.mask_password(payload))
def test_xml_message(self):
payload = """<?xml version="1.0" encoding="UTF-8"?>
<rebuild
xmlns="http://docs.openstack.org/compute/api/v1.1"
name="foobar"
imageRef="http://openstack.example.com/v1.1/32278/images/70a599e0-31e7"
accessIPv4="1.2.3.4"
accessIPv6="fe80::100"
adminPass="seekr3t">
<metadata>
<meta key="My Server Name">Apache1</meta>
</metadata>
</rebuild>"""
expected = """<?xml version="1.0" encoding="UTF-8"?>
<rebuild
xmlns="http://docs.openstack.org/compute/api/v1.1"
name="foobar"
imageRef="http://openstack.example.com/v1.1/32278/images/70a599e0-31e7"
accessIPv4="1.2.3.4"
accessIPv6="fe80::100"
adminPass="***">
<metadata>
<meta key="My Server Name">Apache1</meta>
</metadata>
</rebuild>"""
self.assertEqual(expected, strutils.mask_password(payload))
payload = """<?xml version="1.0" encoding="UTF-8"?>
<rescue xmlns="http://docs.openstack.org/compute/api/v1.1"
admin_pass="MySecretPass"/>"""
expected = """<?xml version="1.0" encoding="UTF-8"?>
<rescue xmlns="http://docs.openstack.org/compute/api/v1.1"
admin_pass="***"/>"""
self.assertEqual(expected, strutils.mask_password(payload))
payload = """<?xml version="1.0" encoding="UTF-8"?>
<rescue xmlns="http://docs.openstack.org/compute/api/v1.1"
admin_password="MySecretPass"/>"""
expected = """<?xml version="1.0" encoding="UTF-8"?>
<rescue xmlns="http://docs.openstack.org/compute/api/v1.1"
admin_password="***"/>"""
self.assertEqual(expected, strutils.mask_password(payload))
payload = """<?xml version="1.0" encoding="UTF-8"?>
<rescue xmlns="http://docs.openstack.org/compute/api/v1.1"
password="MySecretPass"/>"""
expected = """<?xml version="1.0" encoding="UTF-8"?>
<rescue xmlns="http://docs.openstack.org/compute/api/v1.1"
password="***"/>"""
self.assertEqual(expected, strutils.mask_password(payload))
def test_mask_password(self):
payload = "test = 'password' : 'aaaaaa'"
expected = "test = 'password' : '111'"
self.assertEqual(expected,
strutils.mask_password(payload, secret='111'))
payload = 'mysqld --password "aaaaaa"'
expected = 'mysqld --password "****"'
self.assertEqual(expected,
strutils.mask_password(payload, secret='****'))
payload = 'mysqld --password aaaaaa'
expected = 'mysqld --password ???'
self.assertEqual(expected,
strutils.mask_password(payload, secret='???'))
payload = 'mysqld --password = "aaaaaa"'
expected = 'mysqld --password = "****"'
self.assertEqual(expected,
strutils.mask_password(payload, secret='****'))
payload = "mysqld --password = 'aaaaaa'"
expected = "mysqld --password = '****'"
self.assertEqual(expected,
strutils.mask_password(payload, secret='****'))
payload = "mysqld --password = aaaaaa"
expected = "mysqld --password = ****"
self.assertEqual(expected,
strutils.mask_password(payload, secret='****'))
payload = "test = password = aaaaaa"
expected = "test = password = 111"
self.assertEqual(expected,
strutils.mask_password(payload, secret='111'))
payload = "test = password= aaaaaa"
expected = "test = password= 111"
self.assertEqual(expected,
strutils.mask_password(payload, secret='111'))
payload = "test = password =aaaaaa"
expected = "test = password =111"
self.assertEqual(expected,
strutils.mask_password(payload, secret='111'))
payload = "test = password=aaaaaa"
expected = "test = password=111"
self.assertEqual(expected,
strutils.mask_password(payload, secret='111'))
payload = 'test = "original_password" : "aaaaaaaaa"'
expected = 'test = "original_password" : "***"'
self.assertEqual(expected, strutils.mask_password(payload))
payload = 'test = "param1" : "value"'
expected = 'test = "param1" : "value"'
self.assertEqual(expected, strutils.mask_password(payload))
payload = """{'adminPass':'mypassword'}"""
payload = six.text_type(payload)
expected = """{'adminPass':'***'}"""
self.assertEqual(expected, strutils.mask_password(payload))
payload = ("test = 'node.session.auth.password','-v','mypassword',"
"'nomask'")
expected = ("test = 'node.session.auth.password','-v','***',"
"'nomask'")
self.assertEqual(expected, strutils.mask_password(payload))
payload = ("test = 'node.session.auth.password', '--password', "
"'mypassword', 'nomask'")
expected = ("test = 'node.session.auth.password', '--password', "
"'***', 'nomask'")
self.assertEqual(expected, strutils.mask_password(payload))
payload = ("test = 'node.session.auth.password', '--password', "
"'mypassword'")
expected = ("test = 'node.session.auth.password', '--password', "
"'***'")
self.assertEqual(expected, strutils.mask_password(payload))
payload = "test = node.session.auth.password -v mypassword nomask"
expected = "test = node.session.auth.password -v *** nomask"
self.assertEqual(expected, strutils.mask_password(payload))
payload = ("test = node.session.auth.password --password mypassword "
"nomask")
expected = ("test = node.session.auth.password --password *** "
"nomask")
self.assertEqual(expected, strutils.mask_password(payload))
payload = ("test = node.session.auth.password --password mypassword")
expected = ("test = node.session.auth.password --password ***")
self.assertEqual(expected, strutils.mask_password(payload))
payload = "test = cmd --password my\xe9\x80\x80pass"
expected = ("test = cmd --password ***")
self.assertEqual(expected, strutils.mask_password(payload))
class IsIntLikeTestCase(test_base.BaseTestCase):
def test_is_int_like_true(self):
self.assertTrue(strutils.is_int_like(1))
self.assertTrue(strutils.is_int_like("1"))
self.assertTrue(strutils.is_int_like("514"))
self.assertTrue(strutils.is_int_like("0"))
def test_is_int_like_false(self):
self.assertFalse(strutils.is_int_like(1.1))
self.assertFalse(strutils.is_int_like("1.1"))
self.assertFalse(strutils.is_int_like("1.1.1"))
self.assertFalse(strutils.is_int_like(None))
self.assertFalse(strutils.is_int_like("0."))
self.assertFalse(strutils.is_int_like("aaaaaa"))
self.assertFalse(strutils.is_int_like("...."))
self.assertFalse(strutils.is_int_like("1g"))
self.assertFalse(
strutils.is_int_like("0cc3346e-9fef-4445-abe6-5d2b2690ec64"))
self.assertFalse(strutils.is_int_like("a1"))
# NOTE(viktors): 12e3 - is a float number
self.assertFalse(strutils.is_int_like("12e3"))
# NOTE(viktors): Check integer numbers with base not 10
self.assertFalse(strutils.is_int_like("0o51"))
self.assertFalse(strutils.is_int_like("0xDEADBEEF"))

View File

@ -1,359 +0,0 @@
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import calendar
import datetime
import time
import iso8601
import mock
from oslotest import base as test_base
from testtools import matchers
from oslo.utils import timeutils
class TimeUtilsTest(test_base.BaseTestCase):
def setUp(self):
super(TimeUtilsTest, self).setUp()
self.skynet_self_aware_time_str = '1997-08-29T06:14:00Z'
self.skynet_self_aware_time_ms_str = '1997-08-29T06:14:00.000123Z'
self.skynet_self_aware_time = datetime.datetime(1997, 8, 29, 6, 14, 0)
self.skynet_self_aware_ms_time = datetime.datetime(
1997, 8, 29, 6, 14, 0, 123)
self.one_minute_before = datetime.datetime(1997, 8, 29, 6, 13, 0)
self.one_minute_after = datetime.datetime(1997, 8, 29, 6, 15, 0)
self.skynet_self_aware_time_perfect_str = '1997-08-29T06:14:00.000000'
self.skynet_self_aware_time_perfect = datetime.datetime(1997, 8, 29,
6, 14, 0)
self.addCleanup(timeutils.clear_time_override)
def test_isotime(self):
with mock.patch('datetime.datetime') as datetime_mock:
datetime_mock.utcnow.return_value = self.skynet_self_aware_time
dt = timeutils.isotime()
self.assertEqual(dt, self.skynet_self_aware_time_str)
def test_isotimei_micro_second_precision(self):
with mock.patch('datetime.datetime') as datetime_mock:
datetime_mock.utcnow.return_value = self.skynet_self_aware_ms_time
dt = timeutils.isotime(subsecond=True)
self.assertEqual(dt, self.skynet_self_aware_time_ms_str)
def test_parse_isotime(self):
expect = timeutils.parse_isotime(self.skynet_self_aware_time_str)
skynet_self_aware_time_utc = self.skynet_self_aware_time.replace(
tzinfo=iso8601.iso8601.UTC)
self.assertEqual(skynet_self_aware_time_utc, expect)
def test_parse_isotime_micro_second_precision(self):
expect = timeutils.parse_isotime(self.skynet_self_aware_time_ms_str)
skynet_self_aware_time_ms_utc = self.skynet_self_aware_ms_time.replace(
tzinfo=iso8601.iso8601.UTC)
self.assertEqual(skynet_self_aware_time_ms_utc, expect)
def test_strtime(self):
expect = timeutils.strtime(self.skynet_self_aware_time_perfect)
self.assertEqual(self.skynet_self_aware_time_perfect_str, expect)
def test_parse_strtime(self):
perfect_time_format = self.skynet_self_aware_time_perfect_str
expect = timeutils.parse_strtime(perfect_time_format)
self.assertEqual(self.skynet_self_aware_time_perfect, expect)
def test_strtime_and_back(self):
orig_t = datetime.datetime(1997, 8, 29, 6, 14, 0)
s = timeutils.strtime(orig_t)
t = timeutils.parse_strtime(s)
self.assertEqual(orig_t, t)
def _test_is_older_than(self, fn):
strptime = datetime.datetime.strptime
with mock.patch('datetime.datetime') as datetime_mock:
datetime_mock.utcnow.return_value = self.skynet_self_aware_time
datetime_mock.strptime = strptime
expect_true = timeutils.is_older_than(fn(self.one_minute_before),
59)
self.assertTrue(expect_true)
expect_false = timeutils.is_older_than(fn(self.one_minute_before),
60)
self.assertFalse(expect_false)
expect_false = timeutils.is_older_than(fn(self.one_minute_before),
61)
self.assertFalse(expect_false)
def test_is_older_than_datetime(self):
self._test_is_older_than(lambda x: x)
def test_is_older_than_str(self):
self._test_is_older_than(timeutils.strtime)
def test_is_older_than_aware(self):
"""Tests sending is_older_than an 'aware' datetime."""
self._test_is_older_than(lambda x: x.replace(
tzinfo=iso8601.iso8601.UTC))
def _test_is_newer_than(self, fn):
strptime = datetime.datetime.strptime
with mock.patch('datetime.datetime') as datetime_mock:
datetime_mock.utcnow.return_value = self.skynet_self_aware_time
datetime_mock.strptime = strptime
expect_true = timeutils.is_newer_than(fn(self.one_minute_after),
59)
self.assertTrue(expect_true)
expect_false = timeutils.is_newer_than(fn(self.one_minute_after),
60)
self.assertFalse(expect_false)
expect_false = timeutils.is_newer_than(fn(self.one_minute_after),
61)
self.assertFalse(expect_false)
def test_is_newer_than_datetime(self):
self._test_is_newer_than(lambda x: x)
def test_is_newer_than_str(self):
self._test_is_newer_than(timeutils.strtime)
def test_is_newer_than_aware(self):
"""Tests sending is_newer_than an 'aware' datetime."""
self._test_is_newer_than(lambda x: x.replace(
tzinfo=iso8601.iso8601.UTC))
def test_set_time_override_using_default(self):
now = timeutils.utcnow_ts()
# NOTE(kgriffs): Normally it's bad form to sleep in a unit test,
# but this is the only way to test that set_time_override defaults
# to setting the override to the current time.
time.sleep(1)
timeutils.set_time_override()
overriden_now = timeutils.utcnow_ts()
self.assertThat(now, matchers.LessThan(overriden_now))
def test_utcnow_ts(self):
skynet_self_aware_ts = 872835240
skynet_dt = datetime.datetime.utcfromtimestamp(skynet_self_aware_ts)
self.assertEqual(self.skynet_self_aware_time, skynet_dt)
# NOTE(kgriffs): timeutils.utcnow_ts() uses time.time()
# IFF time override is not set.
with mock.patch('time.time') as time_mock:
time_mock.return_value = skynet_self_aware_ts
ts = timeutils.utcnow_ts()
self.assertEqual(ts, skynet_self_aware_ts)
timeutils.set_time_override(skynet_dt)
ts = timeutils.utcnow_ts()
self.assertEqual(ts, skynet_self_aware_ts)
def test_utcnow_ts_microsecond(self):
skynet_self_aware_ts = 872835240.000123
skynet_dt = datetime.datetime.utcfromtimestamp(skynet_self_aware_ts)
self.assertEqual(self.skynet_self_aware_ms_time, skynet_dt)
# NOTE(kgriffs): timeutils.utcnow_ts() uses time.time()
# IFF time override is not set.
with mock.patch('time.time') as time_mock:
time_mock.return_value = skynet_self_aware_ts
ts = timeutils.utcnow_ts(microsecond=True)
self.assertEqual(ts, skynet_self_aware_ts)
timeutils.set_time_override(skynet_dt)
ts = timeutils.utcnow_ts(microsecond=True)
self.assertEqual(ts, skynet_self_aware_ts)
def test_utcnow(self):
timeutils.set_time_override(mock.sentinel.utcnow)
self.assertEqual(timeutils.utcnow(), mock.sentinel.utcnow)
timeutils.clear_time_override()
self.assertFalse(timeutils.utcnow() == mock.sentinel.utcnow)
self.assertTrue(timeutils.utcnow())
self.assertEqual(timeutils.utcnow(True).tzinfo,
iso8601.iso8601.UTC)
def test_advance_time_delta(self):
timeutils.set_time_override(self.one_minute_before)
timeutils.advance_time_delta(datetime.timedelta(seconds=60))
self.assertEqual(timeutils.utcnow(), self.skynet_self_aware_time)
def test_advance_time_seconds(self):
timeutils.set_time_override(self.one_minute_before)
timeutils.advance_time_seconds(60)
self.assertEqual(timeutils.utcnow(), self.skynet_self_aware_time)
def test_marshall_time(self):
now = timeutils.utcnow()
binary = timeutils.marshall_now(now)
backagain = timeutils.unmarshall_time(binary)
self.assertEqual(now, backagain)
def test_delta_seconds(self):
before = timeutils.utcnow()
after = before + datetime.timedelta(days=7, seconds=59,
microseconds=123456)
self.assertAlmostEquals(604859.123456,
timeutils.delta_seconds(before, after))
def test_total_seconds(self):
delta = datetime.timedelta(days=1, hours=2, minutes=3, seconds=4.5)
self.assertAlmostEquals(93784.5,
timeutils.total_seconds(delta))
def test_iso8601_from_timestamp(self):
utcnow = timeutils.utcnow()
iso = timeutils.isotime(utcnow)
ts = calendar.timegm(utcnow.timetuple())
self.assertEqual(iso, timeutils.iso8601_from_timestamp(ts))
def test_is_soon(self):
expires = timeutils.utcnow() + datetime.timedelta(minutes=5)
self.assertFalse(timeutils.is_soon(expires, 120))
self.assertTrue(timeutils.is_soon(expires, 300))
self.assertTrue(timeutils.is_soon(expires, 600))
with mock.patch('datetime.datetime') as datetime_mock:
datetime_mock.utcnow.return_value = self.skynet_self_aware_time
expires = timeutils.utcnow()
self.assertTrue(timeutils.is_soon(expires, 0))
class TestIso8601Time(test_base.BaseTestCase):
def _instaneous(self, timestamp, yr, mon, day, hr, minute, sec, micro):
self.assertEqual(timestamp.year, yr)
self.assertEqual(timestamp.month, mon)
self.assertEqual(timestamp.day, day)
self.assertEqual(timestamp.hour, hr)
self.assertEqual(timestamp.minute, minute)
self.assertEqual(timestamp.second, sec)
self.assertEqual(timestamp.microsecond, micro)
def _do_test(self, time_str, yr, mon, day, hr, minute, sec, micro, shift):
DAY_SECONDS = 24 * 60 * 60
timestamp = timeutils.parse_isotime(time_str)
self._instaneous(timestamp, yr, mon, day, hr, minute, sec, micro)
offset = timestamp.tzinfo.utcoffset(None)
self.assertEqual(offset.seconds + offset.days * DAY_SECONDS, shift)
def test_zulu(self):
time_str = '2012-02-14T20:53:07Z'
self._do_test(time_str, 2012, 2, 14, 20, 53, 7, 0, 0)
def test_zulu_micros(self):
time_str = '2012-02-14T20:53:07.123Z'
self._do_test(time_str, 2012, 2, 14, 20, 53, 7, 123000, 0)
def test_offset_east(self):
time_str = '2012-02-14T20:53:07+04:30'
offset = 4.5 * 60 * 60
self._do_test(time_str, 2012, 2, 14, 20, 53, 7, 0, offset)
def test_offset_east_micros(self):
time_str = '2012-02-14T20:53:07.42+04:30'
offset = 4.5 * 60 * 60
self._do_test(time_str, 2012, 2, 14, 20, 53, 7, 420000, offset)
def test_offset_west(self):
time_str = '2012-02-14T20:53:07-05:30'
offset = -5.5 * 60 * 60
self._do_test(time_str, 2012, 2, 14, 20, 53, 7, 0, offset)
def test_offset_west_micros(self):
time_str = '2012-02-14T20:53:07.654321-05:30'
offset = -5.5 * 60 * 60
self._do_test(time_str, 2012, 2, 14, 20, 53, 7, 654321, offset)
def test_compare(self):
zulu = timeutils.parse_isotime('2012-02-14T20:53:07')
east = timeutils.parse_isotime('2012-02-14T20:53:07-01:00')
west = timeutils.parse_isotime('2012-02-14T20:53:07+01:00')
self.assertTrue(east > west)
self.assertTrue(east > zulu)
self.assertTrue(zulu > west)
def test_compare_micros(self):
zulu = timeutils.parse_isotime('2012-02-14T20:53:07.6544')
east = timeutils.parse_isotime('2012-02-14T19:53:07.654321-01:00')
west = timeutils.parse_isotime('2012-02-14T21:53:07.655+01:00')
self.assertTrue(east < west)
self.assertTrue(east < zulu)
self.assertTrue(zulu < west)
def test_zulu_roundtrip(self):
time_str = '2012-02-14T20:53:07Z'
zulu = timeutils.parse_isotime(time_str)
self.assertEqual(zulu.tzinfo, iso8601.iso8601.UTC)
self.assertEqual(timeutils.isotime(zulu), time_str)
def test_east_roundtrip(self):
time_str = '2012-02-14T20:53:07-07:00'
east = timeutils.parse_isotime(time_str)
self.assertEqual(east.tzinfo.tzname(None), '-07:00')
self.assertEqual(timeutils.isotime(east), time_str)
def test_west_roundtrip(self):
time_str = '2012-02-14T20:53:07+11:30'
west = timeutils.parse_isotime(time_str)
self.assertEqual(west.tzinfo.tzname(None), '+11:30')
self.assertEqual(timeutils.isotime(west), time_str)
def test_now_roundtrip(self):
time_str = timeutils.isotime()
now = timeutils.parse_isotime(time_str)
self.assertEqual(now.tzinfo, iso8601.iso8601.UTC)
self.assertEqual(timeutils.isotime(now), time_str)
def test_zulu_normalize(self):
time_str = '2012-02-14T20:53:07Z'
zulu = timeutils.parse_isotime(time_str)
normed = timeutils.normalize_time(zulu)
self._instaneous(normed, 2012, 2, 14, 20, 53, 7, 0)
def test_east_normalize(self):
time_str = '2012-02-14T20:53:07-07:00'
east = timeutils.parse_isotime(time_str)
normed = timeutils.normalize_time(east)
self._instaneous(normed, 2012, 2, 15, 3, 53, 7, 0)
def test_west_normalize(self):
time_str = '2012-02-14T20:53:07+21:00'
west = timeutils.parse_isotime(time_str)
normed = timeutils.normalize_time(west)
self._instaneous(normed, 2012, 2, 13, 23, 53, 7, 0)
def test_normalize_aware_to_naive(self):
dt = datetime.datetime(2011, 2, 14, 20, 53, 7)
time_str = '2011-02-14T20:53:07+21:00'
aware = timeutils.parse_isotime(time_str)
naive = timeutils.normalize_time(aware)
self.assertTrue(naive < dt)
def test_normalize_zulu_aware_to_naive(self):
dt = datetime.datetime(2011, 2, 14, 20, 53, 7)
time_str = '2011-02-14T19:53:07Z'
aware = timeutils.parse_isotime(time_str)
naive = timeutils.normalize_time(aware)
self.assertTrue(naive < dt)
def test_normalize_naive(self):
dt = datetime.datetime(2011, 2, 14, 20, 53, 7)
dtn = datetime.datetime(2011, 2, 14, 19, 53, 7)
naive = timeutils.normalize_time(dtn)
self.assertTrue(naive < dt)

View File

@ -1,54 +0,0 @@
# Copyright (c) 2012 Intel Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from oslotest import base as test_base
from oslo.utils import uuidutils
class UUIDUtilsTest(test_base.BaseTestCase):
def test_generate_uuid(self):
uuid_string = uuidutils.generate_uuid()
self.assertTrue(isinstance(uuid_string, str))
self.assertEqual(len(uuid_string), 36)
# make sure there are 4 dashes
self.assertEqual(len(uuid_string.replace('-', '')), 32)
def test_is_uuid_like(self):
self.assertTrue(uuidutils.is_uuid_like(str(uuid.uuid4())))
self.assertTrue(uuidutils.is_uuid_like(
'{12345678-1234-5678-1234-567812345678}'))
self.assertTrue(uuidutils.is_uuid_like(
'12345678123456781234567812345678'))
self.assertTrue(uuidutils.is_uuid_like(
'urn:uuid:12345678-1234-5678-1234-567812345678'))
self.assertTrue(uuidutils.is_uuid_like(
'urn:bbbaaaaa-aaaa-aaaa-aabb-bbbbbbbbbbbb'))
self.assertTrue(uuidutils.is_uuid_like(
'uuid:bbbaaaaa-aaaa-aaaa-aabb-bbbbbbbbbbbb'))
self.assertTrue(uuidutils.is_uuid_like(
'{}---bbb---aaa--aaa--aaa-----aaa---aaa--bbb-bbb---bbb-bbb-bb-{}'))
def test_is_uuid_like_insensitive(self):
self.assertTrue(uuidutils.is_uuid_like(str(uuid.uuid4()).upper()))
def test_id_is_uuid_like(self):
self.assertFalse(uuidutils.is_uuid_like(1234567))
def test_name_is_uuid_like(self):
self.assertFalse(uuidutils.is_uuid_like('zhongyueluo'))

View File

@ -1,61 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import imp
import os
import warnings
import mock
from oslotest import base as test_base
import six
class DeprecationWarningTest(test_base.BaseTestCase):
@mock.patch('warnings.warn')
def test_warning(self, mock_warn):
import oslo.utils
imp.reload(oslo.utils)
self.assertTrue(mock_warn.called)
args = mock_warn.call_args
self.assertIn('oslo_utils', args[0][0])
self.assertIn('deprecated', args[0][0])
self.assertTrue(issubclass(args[0][1], DeprecationWarning))
def test_real_warning(self):
with warnings.catch_warnings(record=True) as warning_msgs:
warnings.resetwarnings()
warnings.simplefilter('always', DeprecationWarning)
import oslo.utils
# Use a separate function to get the stack level correct
# so we know the message points back to this file. This
# corresponds to an import or reload, which isn't working
# inside the test under Python 3.3. That may be due to a
# difference in the import implementation not triggering
# warnings properly when the module is reloaded, or
# because the warnings module is mostly implemented in C
# and something isn't cleanly resetting the global state
# used to track whether a warning needs to be
# emitted. Whatever the cause, we definitely see the
# warnings.warn() being invoked on a reload (see the test
# above) and warnings are reported on the console when we
# run the tests. A simpler test script run outside of
# testr does correctly report the warnings.
def foo():
oslo.utils.deprecated()
foo()
self.assertEqual(1, len(warning_msgs))
msg = warning_msgs[0]
self.assertIn('oslo_utils', six.text_type(msg.message))
self.assertEqual('test_warning.py', os.path.basename(msg.filename))

View File

@ -1,105 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2014 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslotest import base as test_base
import six
from oslo.utils import encodeutils
class EncodeUtilsTest(test_base.BaseTestCase):
def test_safe_decode(self):
safe_decode = encodeutils.safe_decode
self.assertRaises(TypeError, safe_decode, True)
self.assertEqual(six.u('ni\xf1o'), safe_decode(six.b("ni\xc3\xb1o"),
incoming="utf-8"))
if six.PY2:
# In Python 3, bytes.decode() doesn't support anymore
# bytes => bytes encodings like base64
self.assertEqual(six.u("test"), safe_decode("dGVzdA==",
incoming='base64'))
self.assertEqual(six.u("strange"), safe_decode(six.b('\x80strange'),
errors='ignore'))
self.assertEqual(six.u('\xc0'), safe_decode(six.b('\xc0'),
incoming='iso-8859-1'))
# Forcing incoming to ascii so it falls back to utf-8
self.assertEqual(six.u('ni\xf1o'), safe_decode(six.b('ni\xc3\xb1o'),
incoming='ascii'))
self.assertEqual(six.u('foo'), safe_decode(b'foo'))
def test_safe_encode_none_instead_of_text(self):
self.assertRaises(TypeError, encodeutils.safe_encode, None)
def test_safe_encode_bool_instead_of_text(self):
self.assertRaises(TypeError, encodeutils.safe_encode, True)
def test_safe_encode_int_instead_of_text(self):
self.assertRaises(TypeError, encodeutils.safe_encode, 1)
def test_safe_encode_list_instead_of_text(self):
self.assertRaises(TypeError, encodeutils.safe_encode, [])
def test_safe_encode_dict_instead_of_text(self):
self.assertRaises(TypeError, encodeutils.safe_encode, {})
def test_safe_encode_tuple_instead_of_text(self):
self.assertRaises(TypeError, encodeutils.safe_encode, ('foo', 'bar', ))
def test_safe_encode_py2(self):
if six.PY2:
# In Python 3, str.encode() doesn't support anymore
# text => text encodings like base64
self.assertEqual(
six.b("dGVzdA==\n"),
encodeutils.safe_encode("test", encoding='base64'),
)
else:
self.skipTest("Requires py2.x")
def test_safe_encode_force_incoming_utf8_to_ascii(self):
# Forcing incoming to ascii so it falls back to utf-8
self.assertEqual(
six.b('ni\xc3\xb1o'),
encodeutils.safe_encode(six.b('ni\xc3\xb1o'), incoming='ascii'),
)
def test_safe_encode_same_encoding_different_cases(self):
with mock.patch.object(encodeutils, 'safe_decode', mock.Mock()):
utf8 = encodeutils.safe_encode(
six.u('foo\xf1bar'), encoding='utf-8')
self.assertEqual(
encodeutils.safe_encode(utf8, 'UTF-8', 'utf-8'),
encodeutils.safe_encode(utf8, 'utf-8', 'UTF-8'),
)
self.assertEqual(
encodeutils.safe_encode(utf8, 'UTF-8', 'utf-8'),
encodeutils.safe_encode(utf8, 'utf-8', 'utf-8'),
)
encodeutils.safe_decode.assert_has_calls([])
def test_safe_encode_different_encodings(self):
text = six.u('foo\xc3\xb1bar')
result = encodeutils.safe_encode(
text=text, incoming='utf-8', encoding='iso-8859-1')
self.assertNotEqual(text, result)
self.assertNotEqual(six.b("foo\xf1bar"), result)