Nothing was using it. Change-Id: I5219ec6f6bce88b57bda7808720c1744951cc186 Signed-off-by: Stephen Finucane <sfinucan@redhat.com>
319 lines
10 KiB
Python
319 lines
10 KiB
Python
# Copyright 2011 OpenStack Foundation.
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import errno
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import shutil
|
|
import stat
|
|
import tempfile
|
|
import time
|
|
from unittest import mock
|
|
import uuid
|
|
import yaml
|
|
|
|
from oslo_utils import fileutils
|
|
from oslo_utils.tests import base as test_base
|
|
|
|
TEST_PERMISSIONS = stat.S_IRWXU
|
|
|
|
|
|
class EnsureTree(test_base.BaseTestCase):
|
|
def test_ensure_tree(self):
|
|
tmpdir = tempfile.mkdtemp()
|
|
try:
|
|
testdir = f'{tmpdir}/foo/bar/baz'
|
|
fileutils.ensure_tree(testdir, TEST_PERMISSIONS)
|
|
self.assertTrue(os.path.isdir(testdir))
|
|
self.assertEqual(
|
|
os.stat(testdir).st_mode, TEST_PERMISSIONS | stat.S_IFDIR
|
|
)
|
|
finally:
|
|
if os.path.exists(tmpdir):
|
|
shutil.rmtree(tmpdir)
|
|
|
|
|
|
class DeleteIfExists(test_base.BaseTestCase):
|
|
def test_file_present(self):
|
|
tmpfile = tempfile.mktemp()
|
|
|
|
open(tmpfile, 'w')
|
|
fileutils.delete_if_exists(tmpfile)
|
|
self.assertFalse(os.path.exists(tmpfile))
|
|
|
|
def test_file_absent(self):
|
|
tmpfile = tempfile.mktemp()
|
|
|
|
fileutils.delete_if_exists(tmpfile)
|
|
self.assertFalse(os.path.exists(tmpfile))
|
|
|
|
def test_dir_present(self):
|
|
tmpdir = tempfile.mktemp()
|
|
os.mkdir(tmpdir)
|
|
|
|
fileutils.delete_if_exists(tmpdir, remove=os.rmdir)
|
|
self.assertFalse(os.path.exists(tmpdir))
|
|
|
|
def test_file_error(self):
|
|
def errm(path):
|
|
raise OSError(errno.EINVAL, '')
|
|
|
|
tmpfile = tempfile.mktemp()
|
|
|
|
open(tmpfile, 'w')
|
|
self.assertRaises(OSError, fileutils.delete_if_exists, tmpfile, errm)
|
|
os.unlink(tmpfile)
|
|
|
|
|
|
class RemovePathOnError(test_base.BaseTestCase):
|
|
def test_error(self):
|
|
tmpfile = tempfile.mktemp()
|
|
open(tmpfile, 'w')
|
|
|
|
try:
|
|
with fileutils.remove_path_on_error(tmpfile):
|
|
raise Exception
|
|
except Exception:
|
|
self.assertFalse(os.path.exists(tmpfile))
|
|
|
|
def test_no_error(self):
|
|
tmpfile = tempfile.mktemp()
|
|
open(tmpfile, 'w')
|
|
|
|
with fileutils.remove_path_on_error(tmpfile):
|
|
pass
|
|
self.assertTrue(os.path.exists(tmpfile))
|
|
os.unlink(tmpfile)
|
|
|
|
def test_remove(self):
|
|
tmpfile = tempfile.mktemp()
|
|
open(tmpfile, 'w')
|
|
|
|
try:
|
|
with fileutils.remove_path_on_error(tmpfile, remove=lambda x: x):
|
|
raise Exception
|
|
except Exception:
|
|
self.assertTrue(os.path.exists(tmpfile))
|
|
os.unlink(tmpfile)
|
|
|
|
def test_remove_dir(self):
|
|
tmpdir = tempfile.mktemp()
|
|
os.mkdir(tmpdir)
|
|
|
|
try:
|
|
with fileutils.remove_path_on_error(
|
|
tmpdir, lambda path: fileutils.delete_if_exists(path, os.rmdir)
|
|
):
|
|
raise Exception
|
|
except Exception:
|
|
self.assertFalse(os.path.exists(tmpdir))
|
|
|
|
|
|
class WriteToTempfileTestCase(test_base.BaseTestCase):
|
|
def setUp(self):
|
|
super().setUp()
|
|
self.content = b'testing123'
|
|
|
|
def check_file_content(self, path):
|
|
with open(path) as fd:
|
|
ans = fd.read()
|
|
self.assertEqual(self.content, ans.encode("latin-1"))
|
|
|
|
def test_file_without_path_and_suffix(self):
|
|
res = fileutils.write_to_tempfile(self.content)
|
|
self.assertTrue(os.path.exists(res))
|
|
|
|
(basepath, tmpfile) = os.path.split(res)
|
|
self.assertTrue(basepath.startswith(tempfile.gettempdir()))
|
|
self.assertTrue(tmpfile.startswith('tmp'))
|
|
|
|
self.check_file_content(res)
|
|
|
|
def test_file_with_not_existing_path(self):
|
|
random_dir = uuid.uuid4().hex
|
|
path = f'/tmp/{random_dir}/test1'
|
|
res = fileutils.write_to_tempfile(self.content, path=path)
|
|
self.assertTrue(os.path.exists(res))
|
|
(basepath, tmpfile) = os.path.split(res)
|
|
self.assertEqual(basepath, path)
|
|
self.assertTrue(tmpfile.startswith('tmp'))
|
|
|
|
self.check_file_content(res)
|
|
shutil.rmtree('/tmp/' + random_dir)
|
|
|
|
def test_file_with_not_default_suffix(self):
|
|
suffix = '.conf'
|
|
res = fileutils.write_to_tempfile(self.content, suffix=suffix)
|
|
self.assertTrue(os.path.exists(res))
|
|
|
|
(basepath, tmpfile) = os.path.split(res)
|
|
self.assertTrue(basepath.startswith(tempfile.gettempdir()))
|
|
self.assertTrue(tmpfile.startswith('tmp'))
|
|
self.assertTrue(tmpfile.endswith('.conf'))
|
|
|
|
self.check_file_content(res)
|
|
|
|
def test_file_with_not_existing_path_and_not_default_suffix(self):
|
|
suffix = '.txt'
|
|
random_dir = uuid.uuid4().hex
|
|
path = f'/tmp/{random_dir}/test2'
|
|
res = fileutils.write_to_tempfile(
|
|
self.content, path=path, suffix=suffix
|
|
)
|
|
self.assertTrue(os.path.exists(res))
|
|
(basepath, tmpfile) = os.path.split(res)
|
|
self.assertTrue(tmpfile.startswith('tmp'))
|
|
self.assertEqual(basepath, path)
|
|
self.assertTrue(tmpfile.endswith(suffix))
|
|
|
|
self.check_file_content(res)
|
|
shutil.rmtree('/tmp/' + random_dir)
|
|
|
|
def test_file_with_not_default_prefix(self):
|
|
prefix = 'test'
|
|
res = fileutils.write_to_tempfile(self.content, prefix=prefix)
|
|
self.assertTrue(os.path.exists(res))
|
|
|
|
(basepath, tmpfile) = os.path.split(res)
|
|
self.assertTrue(tmpfile.startswith(prefix))
|
|
self.assertTrue(basepath.startswith(tempfile.gettempdir()))
|
|
|
|
self.check_file_content(res)
|
|
|
|
|
|
class TestComputeFileChecksum(test_base.BaseTestCase):
|
|
def setUp(self):
|
|
super().setUp()
|
|
self.content = b'fake_content'
|
|
|
|
def check_file_content(self, content, path):
|
|
with open(path) as fd:
|
|
ans = fd.read()
|
|
self.assertEqual(content, ans.encode("latin-1"))
|
|
|
|
def test_compute_checksum_default_algorithm(self):
|
|
path = fileutils.write_to_tempfile(self.content)
|
|
self.assertTrue(os.path.exists(path))
|
|
self.check_file_content(self.content, path)
|
|
|
|
expected_checksum = hashlib.sha256()
|
|
expected_checksum.update(self.content)
|
|
|
|
actual_checksum = fileutils.compute_file_checksum(path)
|
|
|
|
self.assertEqual(expected_checksum.hexdigest(), actual_checksum)
|
|
|
|
def test_compute_checksum_sleep_0_called(self):
|
|
path = fileutils.write_to_tempfile(self.content)
|
|
self.assertTrue(os.path.exists(path))
|
|
self.check_file_content(self.content, path)
|
|
|
|
expected_checksum = hashlib.sha256()
|
|
expected_checksum.update(self.content)
|
|
|
|
with mock.patch.object(time, "sleep") as sleep_mock:
|
|
actual_checksum = fileutils.compute_file_checksum(
|
|
path, read_chunksize=4
|
|
)
|
|
|
|
sleep_mock.assert_has_calls([mock.call(0)] * 3)
|
|
# Just to make sure that there were exactly 3 calls
|
|
self.assertEqual(3, sleep_mock.call_count)
|
|
self.assertEqual(expected_checksum.hexdigest(), actual_checksum)
|
|
|
|
def test_compute_checksum_named_algorithm(self):
|
|
path = fileutils.write_to_tempfile(self.content)
|
|
self.assertTrue(os.path.exists(path))
|
|
self.check_file_content(self.content, path)
|
|
|
|
expected_checksum = hashlib.sha512()
|
|
expected_checksum.update(self.content)
|
|
|
|
actual_checksum = fileutils.compute_file_checksum(
|
|
path, algorithm='sha512'
|
|
)
|
|
|
|
self.assertEqual(expected_checksum.hexdigest(), actual_checksum)
|
|
|
|
def test_compute_checksum_invalid_algorithm(self):
|
|
path = fileutils.write_to_tempfile(self.content)
|
|
self.assertTrue(os.path.exists(path))
|
|
self.check_file_content(self.content, path)
|
|
|
|
self.assertRaises(
|
|
ValueError, fileutils.compute_file_checksum, path, algorithm='foo'
|
|
)
|
|
|
|
def test_file_does_not_exist(self):
|
|
random_file_name = uuid.uuid4().hex
|
|
path = os.path.join('/tmp', random_file_name)
|
|
self.assertRaises(IOError, fileutils.compute_file_checksum, path)
|
|
|
|
def test_generic_io_error(self):
|
|
tempdir = tempfile.mkdtemp()
|
|
self.assertRaises(IOError, fileutils.compute_file_checksum, tempdir)
|
|
|
|
|
|
class LastBytesTestCase(test_base.BaseTestCase):
|
|
"""Test the last_bytes() utility method."""
|
|
|
|
def setUp(self):
|
|
super().setUp()
|
|
self.content = b'1234567890'
|
|
|
|
def test_truncated(self):
|
|
res = fileutils.write_to_tempfile(self.content)
|
|
self.assertTrue(os.path.exists(res))
|
|
out, unread_bytes = fileutils.last_bytes(res, 5)
|
|
self.assertEqual(b'67890', out)
|
|
self.assertGreater(unread_bytes, 0)
|
|
|
|
def test_read_all(self):
|
|
res = fileutils.write_to_tempfile(self.content)
|
|
self.assertTrue(os.path.exists(res))
|
|
out, unread_bytes = fileutils.last_bytes(res, 1000)
|
|
self.assertEqual(b'1234567890', out)
|
|
self.assertEqual(0, unread_bytes)
|
|
|
|
def test_non_exist_file(self):
|
|
self.assertRaises(
|
|
IOError, fileutils.last_bytes, 'non_exist_file', 1000
|
|
)
|
|
|
|
|
|
class FileTypeTestCase(test_base.BaseTestCase):
|
|
"""Test the is_yaml() and is_json() utility methods."""
|
|
|
|
def setUp(self):
|
|
super().setUp()
|
|
data = {'name': 'test', 'website': 'example.com'}
|
|
temp_dir = tempfile.mkdtemp()
|
|
self.json_file = tempfile.mktemp(dir=temp_dir)
|
|
self.yaml_file = tempfile.mktemp(dir=temp_dir)
|
|
|
|
with open(self.json_file, 'w') as fh:
|
|
json.dump(data, fh)
|
|
with open(self.yaml_file, 'w') as fh:
|
|
yaml.dump(data, fh)
|
|
|
|
def test_is_json(self):
|
|
self.assertTrue(fileutils.is_json(self.json_file))
|
|
self.assertFalse(fileutils.is_json(self.yaml_file))
|
|
|
|
def test_is_yaml(self):
|
|
self.assertTrue(fileutils.is_yaml(self.yaml_file))
|
|
self.assertFalse(fileutils.is_yaml(self.json_file))
|