From bebc99cb4469ebfe3b2888aab9436c32a8d518f8 Mon Sep 17 00:00:00 2001 From: Hongbin Lu Date: Mon, 29 Dec 2014 05:06:48 +0000 Subject: [PATCH] Add more tests for magnum/common Test cases were ported from Nova and Ironic. Implements: blueprint increase-test-coverage Change-Id: Ibd262ac84af3167077c9d2c4fd6c1ea872b9b303 --- magnum/tests/common/test_context.py | 55 +++ magnum/tests/common/test_safeutils.py | 99 +++++ magnum/tests/common/test_utils.py | 501 ++++++++++++++++++++++++++ 3 files changed, 655 insertions(+) create mode 100644 magnum/tests/common/test_context.py create mode 100644 magnum/tests/common/test_safeutils.py create mode 100644 magnum/tests/common/test_utils.py diff --git a/magnum/tests/common/test_context.py b/magnum/tests/common/test_context.py new file mode 100644 index 0000000000..bba28c41f4 --- /dev/null +++ b/magnum/tests/common/test_context.py @@ -0,0 +1,55 @@ +# Copyright 2015 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from magnum.common import context +from magnum.tests import base + + +class ContextTestCase(base.TestCase): + + def test_context(self): + ctx = context.RequestContext(auth_token='auth_token1', + auth_url='auth_url1', + domain_id='domain_id1', + domain_name='domain_name1', + user='user1', tenant='tenant1', + is_admin=True, is_public_api=True, + read_only=True, show_deleted=True, + request_id='request_id1', + trust_id='trust_id1', + auth_token_info='auth_token_info1') + self.assertEqual("auth_token1", ctx.auth_token) + self.assertEqual("auth_url1", ctx.auth_url) + self.assertEqual("domain_id1", ctx.domain_id) + self.assertEqual("domain_name1", ctx.domain_name) + self.assertEqual("user1", ctx.user) + self.assertEqual("tenant1", ctx.tenant) + self.assertTrue(ctx.is_admin) + self.assertTrue(ctx.is_public_api) + self.assertTrue(ctx.read_only) + self.assertTrue(ctx.show_deleted) + self.assertEqual("request_id1", ctx.request_id) + self.assertEqual("trust_id1", ctx.trust_id) + self.assertEqual("auth_token_info1", ctx.auth_token_info) + + def test_to_dict_from_dict(self): + ctx = context.RequestContext(is_admin=True, user='foo', + tenant='foo') + self.assertTrue(ctx.is_admin) + self.assertIsNotNone(ctx.user) + self.assertIsNotNone(ctx.tenant) + ctx2 = context.RequestContext.from_dict(ctx.to_dict()) + self.assertTrue(ctx2.is_admin) + self.assertIsNone(ctx2.user) + self.assertIsNone(ctx2.tenant) \ No newline at end of file diff --git a/magnum/tests/common/test_safeutils.py b/magnum/tests/common/test_safeutils.py new file mode 100644 index 0000000000..1af8fc6a6e --- /dev/null +++ b/magnum/tests/common/test_safeutils.py @@ -0,0 +1,99 @@ +# Copyright 2011 Justin Santa Barbara +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from magnum.common import safe_utils +from magnum.tests import base + + +class GetCallArgsTestCase(base.BaseTestCase): + + def _test_func(self, instance, red=None, blue=None): + pass + + def test_all_kwargs(self): + args = () + kwargs = {'instance': {'uuid': 1}, 'red': 3, 'blue': 4} + callargs = safe_utils.getcallargs(self._test_func, *args, **kwargs) + # implicit self counts as an arg + self.assertEqual(4, len(callargs)) + self.assertIn('instance', callargs) + self.assertEqual({'uuid': 1}, callargs['instance']) + self.assertIn('red', callargs) + self.assertEqual(3, callargs['red']) + self.assertIn('blue', callargs) + self.assertEqual(4, callargs['blue']) + + def test_all_args(self): + args = ({'uuid': 1}, 3, 4) + kwargs = {} + callargs = safe_utils.getcallargs(self._test_func, *args, **kwargs) + # implicit self counts as an arg + self.assertEqual(4, len(callargs)) + self.assertIn('instance', callargs) + self.assertEqual({'uuid': 1}, callargs['instance']) + self.assertIn('red', callargs) + self.assertEqual(3, callargs['red']) + self.assertIn('blue', callargs) + self.assertEqual(4, callargs['blue']) + + def test_mixed_args(self): + args = ({'uuid': 1}, 3) + kwargs = {'blue': 4} + callargs = safe_utils.getcallargs(self._test_func, *args, **kwargs) + # implicit self counts as an arg + self.assertEqual(4, len(callargs)) + self.assertIn('instance', callargs) + self.assertEqual({'uuid': 1}, callargs['instance']) + self.assertIn('red', callargs) + self.assertEqual(3, callargs['red']) + self.assertIn('blue', callargs) + self.assertEqual(4, callargs['blue']) + + def test_partial_kwargs(self): + args = () + kwargs = {'instance': {'uuid': 1}, 'red': 3} + callargs = safe_utils.getcallargs(self._test_func, *args, **kwargs) + # implicit self counts as an arg + self.assertEqual(4, len(callargs)) + self.assertIn('instance', callargs) + self.assertEqual({'uuid': 1}, callargs['instance']) + self.assertIn('red', callargs) + self.assertEqual(3, callargs['red']) + self.assertIn('blue', callargs) + self.assertIsNone(callargs['blue']) + + def test_partial_args(self): + args = ({'uuid': 1}, 3) + kwargs = {} + callargs = safe_utils.getcallargs(self._test_func, *args, **kwargs) + # implicit self counts as an arg + self.assertEqual(4, len(callargs)) + self.assertIn('instance', callargs) + self.assertEqual({'uuid': 1}, callargs['instance']) + self.assertIn('red', callargs) + self.assertEqual(3, callargs['red']) + self.assertIn('blue', callargs) + self.assertIsNone(callargs['blue']) + + def test_partial_mixed_args(self): + args = (3,) + kwargs = {'instance': {'uuid': 1}} + callargs = safe_utils.getcallargs(self._test_func, *args, **kwargs) + self.assertEqual(4, len(callargs)) + self.assertIn('instance', callargs) + self.assertEqual({'uuid': 1}, callargs['instance']) + self.assertIn('red', callargs) + self.assertEqual(3, callargs['red']) + self.assertIn('blue', callargs) + self.assertIsNone(callargs['blue']) \ No newline at end of file diff --git a/magnum/tests/common/test_utils.py b/magnum/tests/common/test_utils.py new file mode 100644 index 0000000000..f8e18cafd7 --- /dev/null +++ b/magnum/tests/common/test_utils.py @@ -0,0 +1,501 @@ +# Copyright 2011 Justin Santa Barbara +# Copyright 2012 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import errno +import hashlib +import os +import os.path +import shutil +import tempfile +import uuid + +import mock +import netaddr +from oslo.config import cfg +from oslo_concurrency import processutils +import six +import six.moves.builtins as __builtin__ + +from magnum.common import exception +from magnum.common import utils +from magnum.tests import base + +CONF = cfg.CONF + + +class UtilsTestCase(base.TestCase): + + def test_random_alnum(self): + s = utils.random_alnum(10) + self.assertEqual(10, len(s)) + s = utils.random_alnum(100) + self.assertEqual(100, len(s)) + + def test_unlink(self): + with mock.patch.object(os, "unlink") as unlink_mock: + unlink_mock.return_value = None + utils.unlink_without_raise("/fake/path") + unlink_mock.assert_called_once_with("/fake/path") + + def test_unlink_ENOENT(self): + with mock.patch.object(os, "unlink") as unlink_mock: + unlink_mock.side_effect = OSError(errno.ENOENT) + utils.unlink_without_raise("/fake/path") + unlink_mock.assert_called_once_with("/fake/path") + + def test_create_link(self): + with mock.patch.object(os, "symlink") as symlink_mock: + symlink_mock.return_value = None + utils.create_link_without_raise("/fake/source", "/fake/link") + symlink_mock.assert_called_once_with("/fake/source", "/fake/link") + + def test_create_link_EEXIST(self): + with mock.patch.object(os, "symlink") as symlink_mock: + symlink_mock.side_effect = OSError(errno.EEXIST) + utils.create_link_without_raise("/fake/source", "/fake/link") + symlink_mock.assert_called_once_with("/fake/source", "/fake/link") + + +class ExecuteTestCase(base.TestCase): + + def test_retry_on_failure(self): + fd, tmpfilename = tempfile.mkstemp() + _, tmpfilename2 = tempfile.mkstemp() + try: + fp = os.fdopen(fd, 'w+') + fp.write('''#!/bin/sh +# If stdin fails to get passed during one of the runs, make a note. +if ! grep -q foo +then + echo 'failure' > "$1" +fi +# If stdin has failed to get passed during this or a previous run, exit early. +if grep failure "$1" +then + exit 1 +fi +runs="$(cat $1)" +if [ -z "$runs" ] +then + runs=0 +fi +runs=$(($runs + 1)) +echo $runs > "$1" +exit 1 +''') + fp.close() + os.chmod(tmpfilename, 0o755) + try: + self.assertRaises(processutils.ProcessExecutionError, + utils.execute, + tmpfilename, tmpfilename2, attempts=10, + process_input='foo', + delay_on_retry=False) + except OSError as e: + if e.errno == errno.EACCES: + self.skipTest("Permissions error detected. " + "Are you running with a noexec /tmp?") + else: + raise + fp = open(tmpfilename2, 'r') + runs = fp.read() + fp.close() + self.assertNotEqual(runs.strip(), 'failure', 'stdin did not ' + 'always get passed ' + 'correctly') + runs = int(runs.strip()) + self.assertEqual(10, runs, + 'Ran %d times instead of 10.' % (runs,)) + finally: + os.unlink(tmpfilename) + os.unlink(tmpfilename2) + + def test_unknown_kwargs_raises_error(self): + self.assertRaises(processutils.UnknownArgumentError, + utils.execute, + '/usr/bin/env', 'true', + this_is_not_a_valid_kwarg=True) + + def test_check_exit_code_boolean(self): + utils.execute('/usr/bin/env', 'false', check_exit_code=False) + self.assertRaises(processutils.ProcessExecutionError, + utils.execute, + '/usr/bin/env', 'false', check_exit_code=True) + + def test_no_retry_on_success(self): + fd, tmpfilename = tempfile.mkstemp() + _, tmpfilename2 = tempfile.mkstemp() + try: + fp = os.fdopen(fd, 'w+') + fp.write('''#!/bin/sh +# If we've already run, bail out. +grep -q foo "$1" && exit 1 +# Mark that we've run before. +echo foo > "$1" +# Check that stdin gets passed correctly. +grep foo +''') + fp.close() + os.chmod(tmpfilename, 0o755) + try: + utils.execute(tmpfilename, + tmpfilename2, + process_input='foo', + attempts=2) + except OSError as e: + if e.errno == errno.EACCES: + self.skipTest("Permissions error detected. " + "Are you running with a noexec /tmp?") + else: + raise + finally: + os.unlink(tmpfilename) + os.unlink(tmpfilename2) + + @mock.patch.object(processutils, 'execute') + @mock.patch.object(os.environ, 'copy', return_value={}) + def test_execute_use_standard_locale_no_env_variables(self, env_mock, + execute_mock): + utils.execute('foo', use_standard_locale=True) + execute_mock.assert_called_once_with('foo', + env_variables={'LC_ALL': 'C'}) + + @mock.patch.object(processutils, 'execute') + def test_execute_use_standard_locale_with_env_variables(self, + execute_mock): + utils.execute('foo', use_standard_locale=True, + env_variables={'foo': 'bar'}) + execute_mock.assert_called_once_with('foo', + env_variables={'LC_ALL': 'C', + 'foo': 'bar'}) + + @mock.patch.object(processutils, 'execute') + def test_execute_not_use_standard_locale(self, execute_mock): + utils.execute('foo', use_standard_locale=False, + env_variables={'foo': 'bar'}) + execute_mock.assert_called_once_with('foo', + env_variables={'foo': 'bar'}) + + def test_execute_get_root_helper(self): + with mock.patch.object(processutils, 'execute') as execute_mock: + helper = utils._get_root_helper() + utils.execute('foo', run_as_root=True) + execute_mock.assert_called_once_with('foo', run_as_root=True, + root_helper=helper) + + def test_execute_without_root_helper(self): + with mock.patch.object(processutils, 'execute') as execute_mock: + utils.execute('foo', run_as_root=False) + execute_mock.assert_called_once_with('foo', run_as_root=False) + + +class GenericUtilsTestCase(base.TestCase): + def test_hostname_unicode_sanitization(self): + hostname = u"\u7684.test.example.com" + self.assertEqual("test.example.com", + utils.sanitize_hostname(hostname)) + + def test_hostname_sanitize_periods(self): + hostname = "....test.example.com..." + self.assertEqual("test.example.com", + utils.sanitize_hostname(hostname)) + + def test_hostname_sanitize_dashes(self): + hostname = "----test.example.com---" + self.assertEqual("test.example.com", + utils.sanitize_hostname(hostname)) + + def test_hostname_sanitize_characters(self): + hostname = "(#@&$!(@*--#&91)(__=+--test-host.example!!.com-0+" + self.assertEqual("91----test-host.example.com-0", + utils.sanitize_hostname(hostname)) + + def test_hostname_translate(self): + hostname = "<}\x1fh\x10e\x08l\x02l\x05o\x12!{>" + self.assertEqual("hello", utils.sanitize_hostname(hostname)) + + def test_read_cached_file(self): + with mock.patch.object(os.path, "getmtime") as getmtime_mock: + getmtime_mock.return_value = 1 + + cache_data = {"data": 1123, "mtime": 1} + data = utils.read_cached_file("/this/is/a/fake", cache_data) + self.assertEqual(cache_data["data"], data) + getmtime_mock.assert_called_once_with(mock.ANY) + + def test_read_modified_cached_file(self): + with mock.patch.object(os.path, "getmtime") as getmtime_mock: + with mock.patch.object(__builtin__, 'open') as open_mock: + getmtime_mock.return_value = 2 + fake_contents = "lorem ipsum" + fake_file = mock.Mock() + fake_file.read.return_value = fake_contents + fake_context_manager = mock.MagicMock() + fake_context_manager.__enter__.return_value = fake_file + fake_context_manager.__exit__.return_value = None + open_mock.return_value = fake_context_manager + + cache_data = {"data": 1123, "mtime": 1} + self.reload_called = False + + def test_reload(reloaded_data): + self.assertEqual(fake_contents, reloaded_data) + self.reload_called = True + + data = utils.read_cached_file("/this/is/a/fake", + cache_data, + reload_func=test_reload) + + self.assertEqual(fake_contents, data) + self.assertTrue(self.reload_called) + getmtime_mock.assert_called_once_with(mock.ANY) + open_mock.assert_called_once_with(mock.ANY) + fake_file.read.assert_called_once_with() + fake_context_manager.__exit__.assert_called_once_with(mock.ANY, + mock.ANY, + mock.ANY) + fake_context_manager.__enter__.assert_called_once_with() + + def test_hash_file(self): + data = 'Mary had a little lamb, its fleece as white as snow' + flo = six.StringIO(data) + h1 = utils.hash_file(flo) + h2 = hashlib.sha1(data).hexdigest() + self.assertEqual(h1, h2) + + def test_is_valid_boolstr(self): + self.assertTrue(utils.is_valid_boolstr('true')) + self.assertTrue(utils.is_valid_boolstr('false')) + self.assertTrue(utils.is_valid_boolstr('yes')) + self.assertTrue(utils.is_valid_boolstr('no')) + self.assertTrue(utils.is_valid_boolstr('y')) + self.assertTrue(utils.is_valid_boolstr('n')) + self.assertTrue(utils.is_valid_boolstr('1')) + self.assertTrue(utils.is_valid_boolstr('0')) + + self.assertFalse(utils.is_valid_boolstr('maybe')) + self.assertFalse(utils.is_valid_boolstr('only on tuesdays')) + + def test_is_valid_ipv6_cidr(self): + self.assertTrue(utils.is_valid_ipv6_cidr("2600::/64")) + self.assertTrue(utils.is_valid_ipv6_cidr( + "abcd:ef01:2345:6789:abcd:ef01:192.168.254.254/48")) + self.assertTrue(utils.is_valid_ipv6_cidr( + "0000:0000:0000:0000:0000:0000:0000:0001/32")) + self.assertTrue(utils.is_valid_ipv6_cidr( + "0000:0000:0000:0000:0000:0000:0000:0001")) + self.assertFalse(utils.is_valid_ipv6_cidr("foo")) + self.assertFalse(utils.is_valid_ipv6_cidr("127.0.0.1")) + + def test_get_shortened_ipv6(self): + self.assertEqual("abcd:ef01:2345:6789:abcd:ef01:c0a8:fefe", + utils.get_shortened_ipv6( + "abcd:ef01:2345:6789:abcd:ef01:192.168.254.254")) + self.assertEqual("::1", utils.get_shortened_ipv6( + "0000:0000:0000:0000:0000:0000:0000:0001")) + self.assertEqual("caca::caca:0:babe:201:102", + utils.get_shortened_ipv6( + "caca:0000:0000:caca:0000:babe:0201:0102")) + self.assertRaises(netaddr.AddrFormatError, utils.get_shortened_ipv6, + "127.0.0.1") + self.assertRaises(netaddr.AddrFormatError, utils.get_shortened_ipv6, + "failure") + + def test_get_shortened_ipv6_cidr(self): + self.assertEqual("2600::/64", utils.get_shortened_ipv6_cidr( + "2600:0000:0000:0000:0000:0000:0000:0000/64")) + self.assertEqual("2600::/64", utils.get_shortened_ipv6_cidr( + "2600::1/64")) + self.assertRaises(netaddr.AddrFormatError, + utils.get_shortened_ipv6_cidr, + "127.0.0.1") + self.assertRaises(netaddr.AddrFormatError, + utils.get_shortened_ipv6_cidr, + "failure") + + def test_is_valid_mac(self): + self.assertTrue(utils.is_valid_mac("52:54:00:cf:2d:31")) + self.assertTrue(utils.is_valid_mac(u"52:54:00:cf:2d:31")) + self.assertFalse(utils.is_valid_mac("127.0.0.1")) + self.assertFalse(utils.is_valid_mac("not:a:mac:address")) + self.assertFalse(utils.is_valid_mac("52-54-00-cf-2d-31")) + self.assertFalse(utils.is_valid_mac("aa bb cc dd ee ff")) + self.assertTrue(utils.is_valid_mac("AA:BB:CC:DD:EE:FF")) + self.assertFalse(utils.is_valid_mac("AA BB CC DD EE FF")) + self.assertFalse(utils.is_valid_mac("AA-BB-CC-DD-EE-FF")) + + def test_validate_and_normalize_mac(self): + mac = 'AA:BB:CC:DD:EE:FF' + with mock.patch.object(utils, 'is_valid_mac') as m_mock: + m_mock.return_value = True + self.assertEqual(mac.lower(), + utils.validate_and_normalize_mac(mac)) + + def test_validate_and_normalize_mac_invalid_format(self): + with mock.patch.object(utils, 'is_valid_mac') as m_mock: + m_mock.return_value = False + self.assertRaises(exception.InvalidMAC, + utils.validate_and_normalize_mac, 'invalid-mac') + + def test_safe_rstrip(self): + value = '/test/' + rstripped_value = '/test' + not_rstripped = '/' + + self.assertEqual(rstripped_value, utils.safe_rstrip(value, '/')) + self.assertEqual(not_rstripped, utils.safe_rstrip(not_rstripped, '/')) + + def test_safe_rstrip_not_raises_exceptions(self): + # Supplying an integer should normally raise an exception because it + # does not save the rstrip() method. + value = 10 + + # In the case of raising an exception safe_rstrip() should return the + # original value. + self.assertEqual(value, utils.safe_rstrip(value)) + + +class MkfsTestCase(base.TestCase): + + @mock.patch.object(utils, 'execute') + def test_mkfs(self, execute_mock): + utils.mkfs('ext4', '/my/block/dev') + utils.mkfs('msdos', '/my/msdos/block/dev') + utils.mkfs('swap', '/my/swap/block/dev') + + expected = [mock.call('mkfs', '-t', 'ext4', '-F', '/my/block/dev', + run_as_root=True, + use_standard_locale=True), + mock.call('mkfs', '-t', 'msdos', '/my/msdos/block/dev', + run_as_root=True, + use_standard_locale=True), + mock.call('mkswap', '/my/swap/block/dev', + run_as_root=True, + use_standard_locale=True)] + self.assertEqual(expected, execute_mock.call_args_list) + + @mock.patch.object(utils, 'execute') + def test_mkfs_with_label(self, execute_mock): + utils.mkfs('ext4', '/my/block/dev', 'ext4-vol') + utils.mkfs('msdos', '/my/msdos/block/dev', 'msdos-vol') + utils.mkfs('swap', '/my/swap/block/dev', 'swap-vol') + + expected = [mock.call('mkfs', '-t', 'ext4', '-F', '-L', 'ext4-vol', + '/my/block/dev', run_as_root=True, + use_standard_locale=True), + mock.call('mkfs', '-t', 'msdos', '-n', 'msdos-vol', + '/my/msdos/block/dev', run_as_root=True, + use_standard_locale=True), + mock.call('mkswap', '-L', 'swap-vol', + '/my/swap/block/dev', run_as_root=True, + use_standard_locale=True)] + self.assertEqual(expected, execute_mock.call_args_list) + + @mock.patch.object(utils, 'execute', + side_effect=processutils.ProcessExecutionError( + stderr=os.strerror(errno.ENOENT))) + def test_mkfs_with_unsupported_fs(self, execute_mock): + self.assertRaises(exception.FileSystemNotSupported, + utils.mkfs, 'foo', '/my/block/dev') + + @mock.patch.object(utils, 'execute', + side_effect=processutils.ProcessExecutionError( + stderr='fake')) + def test_mkfs_with_unexpected_error(self, execute_mock): + self.assertRaises(processutils.ProcessExecutionError, utils.mkfs, + 'ext4', '/my/block/dev', 'ext4-vol') + + +class IntLikeTestCase(base.TestCase): + + def test_is_int_like(self): + self.assertTrue(utils.is_int_like(1)) + self.assertTrue(utils.is_int_like("1")) + self.assertTrue(utils.is_int_like("514")) + self.assertTrue(utils.is_int_like("0")) + + self.assertFalse(utils.is_int_like(1.1)) + self.assertFalse(utils.is_int_like("1.1")) + self.assertFalse(utils.is_int_like("1.1.1")) + self.assertFalse(utils.is_int_like(None)) + self.assertFalse(utils.is_int_like("0.")) + self.assertFalse(utils.is_int_like("aaaaaa")) + self.assertFalse(utils.is_int_like("....")) + self.assertFalse(utils.is_int_like("1g")) + self.assertFalse( + utils.is_int_like("0cc3346e-9fef-4445-abe6-5d2b2690ec64")) + self.assertFalse(utils.is_int_like("a1")) + + +class UUIDTestCase(base.TestCase): + + def test_generate_uuid(self): + uuid_string = utils.generate_uuid() + self.assertIsInstance(uuid_string, str) + self.assertEqual(36, len(uuid_string)) + # make sure there are 4 dashes + self.assertEqual(32, len(uuid_string.replace('-', ''))) + + def test_is_uuid_like(self): + self.assertTrue(utils.is_uuid_like(str(uuid.uuid4()))) + + def test_id_is_uuid_like(self): + self.assertFalse(utils.is_uuid_like(1234567)) + + def test_name_is_uuid_like(self): + self.assertFalse(utils.is_uuid_like('zhongyueluo')) + + +class TempFilesTestCase(base.TestCase): + + def test_tempdir(self): + + dirname = None + with utils.tempdir() as tempdir: + self.assertTrue(os.path.isdir(tempdir)) + dirname = tempdir + self.assertFalse(os.path.exists(dirname)) + + @mock.patch.object(shutil, 'rmtree') + @mock.patch.object(tempfile, 'mkdtemp') + def test_tempdir_mocked(self, mkdtemp_mock, rmtree_mock): + + self.config(tempdir='abc') + mkdtemp_mock.return_value = 'temp-dir' + kwargs = {'a': 'b'} + + with utils.tempdir(**kwargs) as tempdir: + self.assertEqual('temp-dir', tempdir) + tempdir_created = tempdir + + mkdtemp_mock.assert_called_once_with(**kwargs) + rmtree_mock.assert_called_once_with(tempdir_created) + + @mock.patch.object(utils, 'LOG') + @mock.patch.object(shutil, 'rmtree') + @mock.patch.object(tempfile, 'mkdtemp') + def test_tempdir_mocked_error_on_rmtree(self, mkdtemp_mock, rmtree_mock, + log_mock): + + self.config(tempdir='abc') + mkdtemp_mock.return_value = 'temp-dir' + rmtree_mock.side_effect = OSError + + with utils.tempdir() as tempdir: + self.assertEqual('temp-dir', tempdir) + tempdir_created = tempdir + + rmtree_mock.assert_called_once_with(tempdir_created) + self.assertTrue(log_mock.error.called) \ No newline at end of file