oslo.utils/oslo_utils/tests/test_fileutils.py

319 lines
10 KiB
Python

# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import errno
import hashlib
import json
import os
import shutil
import stat
import tempfile
import time
from unittest import mock
import uuid
import yaml
from oslotest import base as test_base
from oslo_utils import fileutils
TEST_PERMISSIONS = stat.S_IRWXU
class EnsureTree(test_base.BaseTestCase):
def test_ensure_tree(self):
tmpdir = tempfile.mkdtemp()
try:
testdir = '%s/foo/bar/baz' % (tmpdir,)
fileutils.ensure_tree(testdir, TEST_PERMISSIONS)
self.assertTrue(os.path.isdir(testdir))
self.assertEqual(os.stat(testdir).st_mode,
TEST_PERMISSIONS | stat.S_IFDIR)
finally:
if os.path.exists(tmpdir):
shutil.rmtree(tmpdir)
class DeleteIfExists(test_base.BaseTestCase):
def test_file_present(self):
tmpfile = tempfile.mktemp()
open(tmpfile, 'w')
fileutils.delete_if_exists(tmpfile)
self.assertFalse(os.path.exists(tmpfile))
def test_file_absent(self):
tmpfile = tempfile.mktemp()
fileutils.delete_if_exists(tmpfile)
self.assertFalse(os.path.exists(tmpfile))
def test_dir_present(self):
tmpdir = tempfile.mktemp()
os.mkdir(tmpdir)
fileutils.delete_if_exists(tmpdir, remove=os.rmdir)
self.assertFalse(os.path.exists(tmpdir))
def test_file_error(self):
def errm(path):
raise OSError(errno.EINVAL, '')
tmpfile = tempfile.mktemp()
open(tmpfile, 'w')
self.assertRaises(OSError, fileutils.delete_if_exists, tmpfile, errm)
os.unlink(tmpfile)
class RemovePathOnError(test_base.BaseTestCase):
def test_error(self):
tmpfile = tempfile.mktemp()
open(tmpfile, 'w')
try:
with fileutils.remove_path_on_error(tmpfile):
raise Exception
except Exception:
self.assertFalse(os.path.exists(tmpfile))
def test_no_error(self):
tmpfile = tempfile.mktemp()
open(tmpfile, 'w')
with fileutils.remove_path_on_error(tmpfile):
pass
self.assertTrue(os.path.exists(tmpfile))
os.unlink(tmpfile)
def test_remove(self):
tmpfile = tempfile.mktemp()
open(tmpfile, 'w')
try:
with fileutils.remove_path_on_error(tmpfile, remove=lambda x: x):
raise Exception
except Exception:
self.assertTrue(os.path.exists(tmpfile))
os.unlink(tmpfile)
def test_remove_dir(self):
tmpdir = tempfile.mktemp()
os.mkdir(tmpdir)
try:
with fileutils.remove_path_on_error(
tmpdir,
lambda path: fileutils.delete_if_exists(path, os.rmdir)):
raise Exception
except Exception:
self.assertFalse(os.path.exists(tmpdir))
class WriteToTempfileTestCase(test_base.BaseTestCase):
def setUp(self):
super(WriteToTempfileTestCase, self).setUp()
self.content = 'testing123'.encode('ascii')
def check_file_content(self, path):
with open(path, 'r') as fd:
ans = fd.read()
self.assertEqual(self.content, ans.encode("latin-1"))
def test_file_without_path_and_suffix(self):
res = fileutils.write_to_tempfile(self.content)
self.assertTrue(os.path.exists(res))
(basepath, tmpfile) = os.path.split(res)
self.assertTrue(basepath.startswith(tempfile.gettempdir()))
self.assertTrue(tmpfile.startswith('tmp'))
self.check_file_content(res)
def test_file_with_not_existing_path(self):
random_dir = uuid.uuid4().hex
path = '/tmp/%s/test1' % random_dir
res = fileutils.write_to_tempfile(self.content, path=path)
self.assertTrue(os.path.exists(res))
(basepath, tmpfile) = os.path.split(res)
self.assertEqual(basepath, path)
self.assertTrue(tmpfile.startswith('tmp'))
self.check_file_content(res)
shutil.rmtree('/tmp/' + random_dir)
def test_file_with_not_default_suffix(self):
suffix = '.conf'
res = fileutils.write_to_tempfile(self.content, suffix=suffix)
self.assertTrue(os.path.exists(res))
(basepath, tmpfile) = os.path.split(res)
self.assertTrue(basepath.startswith(tempfile.gettempdir()))
self.assertTrue(tmpfile.startswith('tmp'))
self.assertTrue(tmpfile.endswith('.conf'))
self.check_file_content(res)
def test_file_with_not_existing_path_and_not_default_suffix(self):
suffix = '.txt'
random_dir = uuid.uuid4().hex
path = '/tmp/%s/test2' % random_dir
res = fileutils.write_to_tempfile(self.content,
path=path,
suffix=suffix)
self.assertTrue(os.path.exists(res))
(basepath, tmpfile) = os.path.split(res)
self.assertTrue(tmpfile.startswith('tmp'))
self.assertEqual(basepath, path)
self.assertTrue(tmpfile.endswith(suffix))
self.check_file_content(res)
shutil.rmtree('/tmp/' + random_dir)
def test_file_with_not_default_prefix(self):
prefix = 'test'
res = fileutils.write_to_tempfile(self.content, prefix=prefix)
self.assertTrue(os.path.exists(res))
(basepath, tmpfile) = os.path.split(res)
self.assertTrue(tmpfile.startswith(prefix))
self.assertTrue(basepath.startswith(tempfile.gettempdir()))
self.check_file_content(res)
class TestComputeFileChecksum(test_base.BaseTestCase):
def setUp(self):
super(TestComputeFileChecksum, self).setUp()
self.content = 'fake_content'.encode('ascii')
def check_file_content(self, content, path):
with open(path, 'r') as fd:
ans = fd.read()
self.assertEqual(content, ans.encode("latin-1"))
def test_compute_checksum_default_algorithm(self):
path = fileutils.write_to_tempfile(self.content)
self.assertTrue(os.path.exists(path))
self.check_file_content(self.content, path)
expected_checksum = hashlib.sha256()
expected_checksum.update(self.content)
actual_checksum = fileutils.compute_file_checksum(path)
self.assertEqual(expected_checksum.hexdigest(), actual_checksum)
def test_compute_checksum_sleep_0_called(self):
path = fileutils.write_to_tempfile(self.content)
self.assertTrue(os.path.exists(path))
self.check_file_content(self.content, path)
expected_checksum = hashlib.sha256()
expected_checksum.update(self.content)
with mock.patch.object(time, "sleep") as sleep_mock:
actual_checksum = fileutils.compute_file_checksum(
path, read_chunksize=4)
sleep_mock.assert_has_calls([mock.call(0)] * 3)
# Just to make sure that there were exactly 3 calls
self.assertEqual(3, sleep_mock.call_count)
self.assertEqual(expected_checksum.hexdigest(), actual_checksum)
def test_compute_checksum_named_algorithm(self):
path = fileutils.write_to_tempfile(self.content)
self.assertTrue(os.path.exists(path))
self.check_file_content(self.content, path)
expected_checksum = hashlib.sha512()
expected_checksum.update(self.content)
actual_checksum = fileutils.compute_file_checksum(path,
algorithm='sha512')
self.assertEqual(expected_checksum.hexdigest(), actual_checksum)
def test_compute_checksum_invalid_algorithm(self):
path = fileutils.write_to_tempfile(self.content)
self.assertTrue(os.path.exists(path))
self.check_file_content(self.content, path)
self.assertRaises(ValueError, fileutils.compute_file_checksum,
path, algorithm='foo')
def test_file_does_not_exist(self):
random_file_name = uuid.uuid4().hex
path = os.path.join('/tmp', random_file_name)
self.assertRaises(IOError, fileutils.compute_file_checksum, path)
def test_generic_io_error(self):
tempdir = tempfile.mkdtemp()
self.assertRaises(IOError, fileutils.compute_file_checksum, tempdir)
class LastBytesTestCase(test_base.BaseTestCase):
"""Test the last_bytes() utility method."""
def setUp(self):
super(LastBytesTestCase, self).setUp()
self.content = b'1234567890'
def test_truncated(self):
res = fileutils.write_to_tempfile(self.content)
self.assertTrue(os.path.exists(res))
out, unread_bytes = fileutils.last_bytes(res, 5)
self.assertEqual(b'67890', out)
self.assertGreater(unread_bytes, 0)
def test_read_all(self):
res = fileutils.write_to_tempfile(self.content)
self.assertTrue(os.path.exists(res))
out, unread_bytes = fileutils.last_bytes(res, 1000)
self.assertEqual(b'1234567890', out)
self.assertEqual(0, unread_bytes)
def test_non_exist_file(self):
self.assertRaises(IOError, fileutils.last_bytes,
'non_exist_file', 1000)
class FileTypeTestCase(test_base.BaseTestCase):
"""Test the is_yaml() and is_json() utility methods."""
def setUp(self):
super(FileTypeTestCase, self).setUp()
data = {
'name': 'test',
'website': 'example.com'
}
temp_dir = tempfile.mkdtemp()
self.json_file = tempfile.mktemp(dir=temp_dir)
self.yaml_file = tempfile.mktemp(dir=temp_dir)
with open(self.json_file, 'w') as fh:
json.dump(data, fh)
with open(self.yaml_file, 'w') as fh:
yaml.dump(data, fh)
def test_is_json(self):
self.assertTrue(fileutils.is_json(self.json_file))
self.assertFalse(fileutils.is_json(self.yaml_file))
def test_is_yaml(self):
self.assertTrue(fileutils.is_yaml(self.yaml_file))
self.assertFalse(fileutils.is_yaml(self.json_file))