d512e4763d
pycrypto is unmaintained for a while and requirements team advises using pycryptodome or cryptography libraries instead (see openstack-dev thread [0]). troveclient uses pycrypto to generate random data/content in its tests which does not require any specific cryptographic need. This commit replaces it by os.urandom from Python standard library. [0] http://lists.openstack.org/pipermail/openstack-dev/2017-March/113568.html Change-Id: Ia43c5883a51e8412c4a238d127c13db39e863da1
99 lines
3.9 KiB
Python
99 lines
3.9 KiB
Python
# Copyright 2011 OpenStack Foundation
|
|
# Copyright 2013 Rackspace Hosting
|
|
# Copyright 2013 Hewlett-Packard Development Company, L.P.
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import os
|
|
import tempfile
|
|
import testtools
|
|
|
|
from troveclient import utils
|
|
|
|
|
|
class UtilsTest(testtools.TestCase):
|
|
|
|
def func(self):
|
|
pass
|
|
|
|
def test_add_hookable_mixin(self):
|
|
hook_type = "hook_type"
|
|
mixin = utils.HookableMixin()
|
|
mixin.add_hook(hook_type, self.func)
|
|
self.assertIn(hook_type, mixin._hooks_map)
|
|
self.assertIn(self.func, mixin._hooks_map[hook_type])
|
|
|
|
def test_run_hookable_mixin(self):
|
|
hook_type = "hook_type"
|
|
mixin = utils.HookableMixin()
|
|
mixin.add_hook(hook_type, self.func)
|
|
mixin.run_hooks(hook_type)
|
|
|
|
def test_environment(self):
|
|
self.assertEqual('', utils.env())
|
|
self.assertEqual('passing', utils.env(default='passing'))
|
|
|
|
os.environ['test_abc'] = 'passing'
|
|
self.assertEqual('passing', utils.env('test_abc'))
|
|
self.assertEqual('', utils.env('test_abcd'))
|
|
|
|
def test_encode_decode_data(self):
|
|
text_data_str = 'This is a text string'
|
|
try:
|
|
text_data_bytes = bytes('This is a byte stream', 'utf-8')
|
|
except TypeError:
|
|
text_data_bytes = bytes('This is a byte stream')
|
|
random_data_str = os.urandom(12)
|
|
random_data_bytes = bytearray(os.urandom(12))
|
|
special_char_str = '\x00\xFF\x00\xFF\xFF\x00'
|
|
special_char_bytes = bytearray(
|
|
[ord(item) for item in special_char_str])
|
|
data = [text_data_str,
|
|
text_data_bytes,
|
|
random_data_str,
|
|
random_data_bytes,
|
|
special_char_str,
|
|
special_char_bytes]
|
|
|
|
for datum in data:
|
|
# the deserialized data is always a bytearray
|
|
try:
|
|
expected_deserialized = bytearray(
|
|
[ord(item) for item in datum])
|
|
except TypeError:
|
|
expected_deserialized = bytearray(
|
|
[item for item in datum])
|
|
serialized_data = utils.encode_data(datum)
|
|
self.assertIsNotNone(serialized_data, "'%s' serialized is None" %
|
|
datum)
|
|
deserialized_data = utils.decode_data(serialized_data)
|
|
self.assertIsNotNone(deserialized_data, "'%s' deserialized is None"
|
|
% datum)
|
|
self.assertEqual(expected_deserialized, deserialized_data,
|
|
"Serialize/Deserialize failed")
|
|
# Now we write the data to a file and read it back in
|
|
# to make sure the round-trip doesn't change anything.
|
|
with tempfile.NamedTemporaryFile() as temp_file:
|
|
with open(temp_file.name, 'wb') as fh_w:
|
|
fh_w.write(
|
|
bytearray([ord(item) for item in serialized_data]))
|
|
with open(temp_file.name, 'rb') as fh_r:
|
|
new_serialized_data = fh_r.read()
|
|
new_deserialized_data = utils.decode_data(
|
|
new_serialized_data)
|
|
self.assertIsNotNone(new_deserialized_data,
|
|
"'%s' deserialized is None" % datum)
|
|
self.assertEqual(expected_deserialized, new_deserialized_data,
|
|
"Serialize/Deserialize with files failed")
|