From d8a3e64f23effd55da4c1938e1caf6cac6e4c18c Mon Sep 17 00:00:00 2001 From: Tom Cammann Date: Wed, 25 May 2016 11:13:43 +0100 Subject: [PATCH] Remove redundant utils code Magnum common/utils.py and object/utils.py contains many unnecessary functions which were first imported when Magnum was created/copied from Ironic. These redundant methods bloat the codebase and create a maintenance burden and as such should be removed. Change-Id: I02f198d90ffa408ad87e2e26ec82b165e92f28e8 --- magnum/common/utils.py | 321 ---------------------- magnum/objects/utils.py | 134 --------- magnum/tests/unit/common/test_utils.py | 304 -------------------- magnum/tests/unit/objects/test_objects.py | 66 ----- magnum/tests/unit/objects/utils.py | 73 +++++ 5 files changed, 73 insertions(+), 825 deletions(-) delete mode 100644 magnum/objects/utils.py diff --git a/magnum/common/utils.py b/magnum/common/utils.py index 6774846ec4..dc2682afc6 100644 --- a/magnum/common/utils.py +++ b/magnum/common/utils.py @@ -19,24 +19,18 @@ """Utilities and helper functions.""" import contextlib -import errno -import hashlib import os import random import re import shutil import tempfile -import netaddr from oslo_concurrency import processutils from oslo_config import cfg from oslo_log import log as logging -from oslo_utils import excutils -import paramiko import six from magnum.common import exception -from magnum.i18n import _ from magnum.i18n import _LE from magnum.i18n import _LW @@ -127,75 +121,6 @@ def trycmd(*args, **kwargs): return processutils.trycmd(*args, **kwargs) -def ssh_connect(connection): - """Method to connect to a remote system using ssh protocol. - - :param connection: a dict of connection parameters. - :returns: paramiko.SSHClient -- an active ssh connection. - :raises: SSHConnectFailed - - """ - try: - ssh = paramiko.SSHClient() - ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - key_contents = connection.get('key_contents') - if key_contents: - data = six.moves.StringIO(key_contents) - if "BEGIN RSA PRIVATE" in key_contents: - pkey = paramiko.RSAKey.from_private_key(data) - elif "BEGIN DSA PRIVATE" in key_contents: - pkey = paramiko.DSSKey.from_private_key(data) - else: - # Can't include the key contents - secure material. - raise ValueError(_("Invalid private key")) - else: - pkey = None - ssh.connect(connection.get('host'), - username=connection.get('username'), - password=connection.get('password'), - port=connection.get('port', 22), - pkey=pkey, - key_filename=connection.get('key_filename'), - timeout=connection.get('timeout', 10)) - - # send TCP keepalive packets every 20 seconds - ssh.get_transport().set_keepalive(20) - except Exception as e: - LOG.debug("SSH connect failed: %s", e) - raise exception.SSHConnectFailed(host=connection.get('host')) - - return ssh - - -def generate_uid(topic, size=8): - characters = '01234567890abcdefghijklmnopqrstuvwxyz' - choices = [random.choice(characters) for _x in range(size)] - return '%s-%s' % (topic, ''.join(choices)) - - -def random_alnum(size=32): - characters = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ' - return ''.join(random.choice(characters) for _ in range(size)) - - -def delete_if_exists(pathname): - """delete a file, but ignore file not found error.""" - - try: - os.unlink(pathname) - except OSError as e: - if e.errno == errno.ENOENT: - return - else: - raise - - -def is_valid_boolstr(val): - """Check if the provided string is a valid bool string or not.""" - boolstrs = ('true', 'false', 'yes', 'no', 'y', 'n', '1', '0') - return str(val).lower() in boolstrs - - def is_valid_mac(address): """Verify the format of a MAC address. @@ -228,135 +153,6 @@ def validate_and_normalize_mac(address): return address.lower() -def is_valid_ipv4(address): - """Verify that address represents a valid IPv4 address.""" - try: - return netaddr.valid_ipv4(address) - except Exception: - return False - - -def is_valid_ipv6(address): - try: - return netaddr.valid_ipv6(address) - except Exception: - return False - - -def is_valid_ipv6_cidr(address): - try: - str(netaddr.IPNetwork(address, version=6).cidr) - return True - except Exception: - return False - - -def get_shortened_ipv6(address): - addr = netaddr.IPAddress(address, version=6) - return str(addr.ipv6()) - - -def get_shortened_ipv6_cidr(address): - net = netaddr.IPNetwork(address, version=6) - return str(net.cidr) - - -def is_valid_cidr(address): - """Check if the provided ipv4 or ipv6 address is a valid CIDR address.""" - try: - # Validate the correct CIDR Address - netaddr.IPNetwork(address) - except netaddr.core.AddrFormatError: - return False - except UnboundLocalError: - # NOTE(MotoKen): work around bug in netaddr 0.7.5 (see detail in - # https://github.com/drkjam/netaddr/issues/2) - return False - - # Prior validation partially verify /xx part - # Verify it here - ip_segment = address.split('/') - - if (len(ip_segment) <= 1 or - ip_segment[1] == ''): - return False - - return True - - -def get_ip_version(network): - """Returns the IP version of a network (IPv4 or IPv6). - - :raises: AddrFormatError if invalid network. - """ - if netaddr.IPNetwork(network).version == 6: - return "IPv6" - elif netaddr.IPNetwork(network).version == 4: - return "IPv4" - - -def convert_to_list_dict(lst, label): - """Convert a value or list into a list of dicts.""" - if not lst: - return None - if not isinstance(lst, list): - lst = [lst] - return [{label: x} for x in lst] - - -def sanitize_hostname(hostname): - """Return a hostname which conforms to RFC-952 and RFC-1123 specs.""" - hostname = six.text_type(hostname) - - hostname = re.sub('[ _]', '-', hostname) - hostname = re.sub('[^a-zA-Z0-9_.-]+', '', hostname) - hostname = hostname.lower() - hostname = hostname.strip('.-') - - return hostname - - -def read_cached_file(filename, cache_info, reload_func=None): - """Read from a file if it has been modified. - - :param cache_info: dictionary to hold opaque cache. - :param reload_func: optional function to be called with data when - file is reloaded due to a modification. - - :returns: data from file - - """ - mtime = os.path.getmtime(filename) - if not cache_info or mtime != cache_info.get('mtime'): - LOG.debug("Reloading cached file %s", filename) - with open(filename) as fap: - cache_info['data'] = fap.read() - cache_info['mtime'] = mtime - if reload_func: - reload_func(cache_info['data']) - return cache_info['data'] - - -def file_open(*args, **kwargs): - """Open file - - see built-in file() documentation for more details - - Note: The reason this is kept in a separate module is to easily - be able to provide a stub module that doesn't alter system - state at all (for unit tests) - """ - return file(*args, **kwargs) - - -def hash_file(file_like_object): - """Generate a hash for the contents of a file.""" - checksum = hashlib.sha1() - for chunk in iter(lambda: six.b(file_like_object.read(32768)), b''): - checksum.update(chunk) - return checksum.hexdigest() - - @contextlib.contextmanager def tempdir(**kwargs): tempfile.tempdir = CONF.tempdir @@ -370,54 +166,6 @@ def tempdir(**kwargs): LOG.error(_LE('Could not remove tmpdir: %s'), e) -def mkfs(fs, path, label=None): - """Format a file or block device - - :param fs: Filesystem type (examples include 'swap', 'ext3', 'ext4' - 'btrfs', etc.) - :param path: Path to file or block device to format - :param label: Volume label to use - """ - if fs == 'swap': - args = ['mkswap'] - else: - args = ['mkfs', '-t', fs] - # add -F to force no interactive execute on non-block device. - if fs in ('ext3', 'ext4'): - args.extend(['-F']) - if label: - if fs in ('msdos', 'vfat'): - label_opt = '-n' - else: - label_opt = '-L' - args.extend([label_opt, label]) - args.append(path) - try: - execute(*args, run_as_root=True, use_standard_locale=True) - except processutils.ProcessExecutionError as e: - with excutils.save_and_reraise_exception() as ctx: - if os.strerror(errno.ENOENT) in e.stderr: - ctx.reraise = False - LOG.exception(_LE('Failed to make file system. ' - 'File system %s is not supported.'), fs) - raise exception.FileSystemNotSupported(fs=fs) - else: - LOG.exception(_LE('Failed to create a file system ' - 'in %(path)s. Error: %(error)s'), - {'path': path, 'error': e}) - - -def unlink_without_raise(path): - try: - os.unlink(path) - except OSError as e: - if e.errno == errno.ENOENT: - return - else: - LOG.warning(_LW("Failed to unlink %(path)s, error: %(e)s"), - {'path': path, 'e': e}) - - def rmtree_without_raise(path): try: if os.path.isdir(path): @@ -427,23 +175,6 @@ def rmtree_without_raise(path): {'path': path, 'e': e}) -def write_to_file(path, contents): - with open(path, 'w') as f: - f.write(contents) - - -def create_link_without_raise(source, link): - try: - os.symlink(source, link) - except OSError as e: - if e.errno == errno.EEXIST: - return - else: - LOG.warning(_LW("Failed to create symlink from %(source)s to " - "%(link)s, error: %(e)s"), - {'source': source, 'link': link, 'e': e}) - - def safe_rstrip(value, chars=None): """Removes trailing characters from a string if that does not make it empty @@ -462,47 +193,6 @@ def safe_rstrip(value, chars=None): return value.rstrip(chars) or value -def mount(src, dest, *args): - """Mounts a device/image file on specified location. - - :param src: the path to the source file for mounting - :param dest: the path where it needs to be mounted. - :param args: a tuple containing the arguments to be - passed to mount command. - :raises: processutils.ProcessExecutionError if it failed - to run the process. - """ - args = ('mount', ) + args + (src, dest) - execute(*args, run_as_root=True, check_exit_code=[0]) - - -def umount(loc, *args): - """Umounts a mounted location. - - :param loc: the path to be unmounted. - :param args: a tuple containing the argumnets to be - passed to the umount command. - :raises: processutils.ProcessExecutionError if it failed - to run the process. - """ - args = ('umount', ) + args + (loc, ) - execute(*args, run_as_root=True, check_exit_code=[0]) - - -def dd(src, dst, *args): - """Execute dd from src to dst. - - :param src: the input file for dd command. - :param dst: the output file for dd command. - :param args: a tuple containing the arguments to be - passed to dd command. - :raises: processutils.ProcessExecutionError if it failed - to run the process. - """ - execute('dd', 'if=%s' % src, 'of=%s' % dst, *args, - run_as_root=True, check_exit_code=[0]) - - def is_name_safe(name): """Checks whether the name is valid or not. @@ -519,17 +209,6 @@ def is_name_safe(name): return True -def raise_exception_invalid_scheme(url): - valid_schemes = ['http', 'https'] - - if not isinstance(url, six.string_types): - raise exception.Urllib2InvalidScheme(url=url) - - scheme = url.split(':')[0] - if scheme not in valid_schemes: - raise exception.Urllib2InvalidScheme(url=url) - - def get_k8s_quantity(quantity): """This function is used to get k8s quantity. diff --git a/magnum/objects/utils.py b/magnum/objects/utils.py deleted file mode 100644 index 3e75797430..0000000000 --- a/magnum/objects/utils.py +++ /dev/null @@ -1,134 +0,0 @@ -# Copyright 2013 IBM Corp. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""Utility methods for objects""" - -import ast -import datetime - -import iso8601 -import netaddr -from oslo_utils import timeutils -import six - -from magnum.i18n import _ - - -def datetime_or_none(dt): - """Validate a datetime or None value.""" - if dt is None: - return None - elif isinstance(dt, datetime.datetime): - if dt.utcoffset() is None: - # NOTE(danms): Legacy objects from sqlalchemy are stored in UTC, - # but are returned without a timezone attached. - # As a transitional aid, assume a tz-naive object is in UTC. - return dt.replace(tzinfo=iso8601.iso8601.Utc()) - else: - return dt - raise ValueError(_("A datetime.datetime is required here")) - - -def datetime_or_str_or_none(val): - if isinstance(val, six.string_types): - return timeutils.parse_isotime(val) - return datetime_or_none(val) - - -def int_or_none(val): - """Attempt to parse an integer value, or None.""" - if val is None: - return val - else: - return int(val) - - -def str_or_none(val): - """Attempt to stringify a value to unicode, or None.""" - if val is None: - return val - else: - return six.text_type(val) - - -def dict_or_none(val): - """Attempt to dictify a value, or None.""" - if val is None: - return {} - elif isinstance(val, six.string_types): - return dict(ast.literal_eval(val)) - else: - try: - return dict(val) - except ValueError: - return {} - - -def list_or_none(val): - """Attempt to listify a value, or None.""" - if val is None: - return [] - elif isinstance(val, six.string_types): - return list(ast.literal_eval(val)) - else: - try: - return list(val) - except ValueError: - return [] - - -def ip_or_none(version): - """Return a version-specific IP address validator.""" - def validator(val, version=version): - if val is None: - return val - else: - return netaddr.IPAddress(val, version=version) - return validator - - -def nested_object_or_none(objclass): - def validator(val, objclass=objclass): - if val is None or isinstance(val, objclass): - return val - raise ValueError(_("An object of class %s is required here") - % objclass) - return validator - - -def dt_serializer(name): - """Return a datetime serializer for a named attribute.""" - def serializer(self, name=name): - if getattr(self, name) is not None: - return datetime.datetime.isoformat(getattr(self, name)) - else: - return None - return serializer - - -def dt_deserializer(instance, val): - """A deserializer method for datetime attributes.""" - if val is None: - return None - else: - return timeutils.parse_isotime(val) - - -def obj_serializer(name): - def serializer(self, name=name): - if getattr(self, name) is not None: - return getattr(self, name).obj_to_primitive() - else: - return None - return serializer diff --git a/magnum/tests/unit/common/test_utils.py b/magnum/tests/unit/common/test_utils.py index 5438d1d6e2..7ff2f6d859 100644 --- a/magnum/tests/unit/common/test_utils.py +++ b/magnum/tests/unit/common/test_utils.py @@ -14,17 +14,13 @@ # under the License. import errno -import hashlib import os import os.path import shutil import tempfile import mock -import netaddr from oslo_concurrency import processutils -import six -import six.moves.builtins as __builtin__ from magnum.common import exception from magnum.common import utils @@ -33,95 +29,6 @@ from magnum.tests import base class UtilsTestCase(base.TestCase): - def test_random_alnum(self): - s = utils.random_alnum(10) - self.assertEqual(10, len(s)) - s = utils.random_alnum(100) - self.assertEqual(100, len(s)) - - def test_unlink(self): - with mock.patch.object(os, "unlink") as unlink_mock: - unlink_mock.return_value = None - utils.unlink_without_raise("/fake/path") - unlink_mock.assert_called_once_with("/fake/path") - - def test_unlink_ENOENT(self): - with mock.patch.object(os, "unlink") as unlink_mock: - unlink_mock.side_effect = OSError(errno.ENOENT) - utils.unlink_without_raise("/fake/path") - unlink_mock.assert_called_once_with("/fake/path") - - def test_create_link(self): - with mock.patch.object(os, "symlink") as symlink_mock: - symlink_mock.return_value = None - utils.create_link_without_raise("/fake/source", "/fake/link") - symlink_mock.assert_called_once_with("/fake/source", "/fake/link") - - def test_create_link_EEXIST(self): - with mock.patch.object(os, "symlink") as symlink_mock: - symlink_mock.side_effect = OSError(errno.EEXIST) - utils.create_link_without_raise("/fake/source", "/fake/link") - symlink_mock.assert_called_once_with("/fake/source", "/fake/link") - - def test_generate_uid(self): - topic = 'test' - size = 8 - s = utils.generate_uid(topic) - self.assertEqual(len(topic) + size + 1, len(s)) - self.assertEqual(topic, s[:len(topic)]) - size = 22 - s = utils.generate_uid(topic, size) - self.assertEqual(len(topic) + size + 1, len(s)) - - def test_valid_ipv4(self): - self.assertTrue(utils.is_valid_ipv4('10.0.0.1')) - self.assertTrue(utils.is_valid_ipv4('255.255.255.255')) - - def test_invalid_ipv4(self): - self.assertFalse(utils.is_valid_ipv4('')) - self.assertFalse(utils.is_valid_ipv4('x.x.x.x')) - self.assertFalse(utils.is_valid_ipv4('256.256.256.256')) - self.assertFalse(utils.is_valid_ipv4( - 'AA42:0000:0000:0000:0202:B3FF:FE1E:8329')) - - def test_valid_ipv6(self): - self.assertTrue(utils.is_valid_ipv6( - 'AA42:0000:0000:0000:0202:B3FF:FE1E:8329')) - self.assertTrue(utils.is_valid_ipv6( - 'AA42::0202:B3FF:FE1E:8329')) - - def test_invalid_ipv6(self): - self.assertFalse(utils.is_valid_ipv6('')) - self.assertFalse(utils.is_valid_ipv6('10.0.0.1')) - self.assertFalse(utils.is_valid_ipv6('AA42::0202:B3FF:FE1E:')) - - def test_valid_cidr(self): - self.assertTrue(utils.is_valid_cidr('10.0.0.0/24')) - self.assertTrue(utils.is_valid_cidr('10.0.0.1/32')) - self.assertTrue(utils.is_valid_cidr('0.0.0.0/0')) - - def test_invalid_cidr(self): - self.assertFalse(utils.is_valid_cidr('10.0.0.1')) - self.assertFalse(utils.is_valid_cidr('10.0.0.1/33')) - - def test_valid_network(self): - self.assertEqual('IPv4', utils.get_ip_version('10.0.0.1')) - self.assertEqual('IPv6', utils.get_ip_version( - 'AA42:0000:0000:0000:0202:B3FF:FE1E:8329')) - - def test_invalid_network(self): - self.assertRaises(netaddr.core.AddrFormatError, - utils.get_ip_version, 'x.x.x.x') - - def test_convert_to_list_dict(self): - self.assertIsNone(utils.convert_to_list_dict(None, 'fred')) - self.assertIsNone(utils.convert_to_list_dict('', 'fred')) - self.assertEqual([{'fred': 'list'}], - utils.convert_to_list_dict('list', 'fred')) - self.assertEqual([{'fred': 'first'}, {'fred': 'second'}], - utils.convert_to_list_dict(['first', 'second'], - 'fred')) - def test_get_k8s_quantity(self): self.assertEqual(1024000.0, utils.get_k8s_quantity('1000Ki')) self.assertEqual(0.001, utils.get_k8s_quantity('1E-3')) @@ -276,134 +183,6 @@ grep foo utils.execute('foo', run_as_root=False) execute_mock.assert_called_once_with('foo', run_as_root=False) - -class GenericUtilsTestCase(base.TestCase): - def test_hostname_unicode_sanitization(self): - hostname = u"\u7684.test.example.com" - self.assertEqual("test.example.com", - utils.sanitize_hostname(hostname)) - - def test_hostname_sanitize_periods(self): - hostname = "....test.example.com..." - self.assertEqual("test.example.com", - utils.sanitize_hostname(hostname)) - - def test_hostname_sanitize_dashes(self): - hostname = "----test.example.com---" - self.assertEqual("test.example.com", - utils.sanitize_hostname(hostname)) - - def test_hostname_sanitize_characters(self): - hostname = "(#@&$!(@*--#&91)(__=+--test-host.example!!.com-0+" - self.assertEqual("91----test-host.example.com-0", - utils.sanitize_hostname(hostname)) - - def test_hostname_translate(self): - hostname = "<}\x1fh\x10e\x08l\x02l\x05o\x12!{>" - self.assertEqual("hello", utils.sanitize_hostname(hostname)) - - def test_read_cached_file(self): - with mock.patch.object(os.path, "getmtime") as getmtime_mock: - getmtime_mock.return_value = 1 - - cache_data = {"data": 1123, "mtime": 1} - data = utils.read_cached_file("/this/is/a/fake", cache_data) - self.assertEqual(cache_data["data"], data) - getmtime_mock.assert_called_once_with(mock.ANY) - - def test_read_modified_cached_file(self): - with mock.patch.object(os.path, "getmtime") as getmtime_mock: - with mock.patch.object(__builtin__, 'open') as open_mock: - getmtime_mock.return_value = 2 - fake_contents = "lorem ipsum" - fake_file = mock.Mock() - fake_file.read.return_value = fake_contents - fake_context_manager = mock.MagicMock() - fake_context_manager.__enter__.return_value = fake_file - fake_context_manager.__exit__.return_value = None - open_mock.return_value = fake_context_manager - - cache_data = {"data": 1123, "mtime": 1} - self.reload_called = False - - def test_reload(reloaded_data): - self.assertEqual(fake_contents, reloaded_data) - self.reload_called = True - - data = utils.read_cached_file("/this/is/a/fake", - cache_data, - reload_func=test_reload) - - self.assertEqual(fake_contents, data) - self.assertTrue(self.reload_called) - getmtime_mock.assert_called_once_with(mock.ANY) - open_mock.assert_called_once_with(mock.ANY) - fake_file.read.assert_called_once_with() - fake_context_manager.__exit__.assert_called_once_with(mock.ANY, - mock.ANY, - mock.ANY) - fake_context_manager.__enter__.assert_called_once_with() - - def test_hash_file(self): - data = 'Mary had a little lamb, its fleece as white as snow' - flo = six.StringIO(data) - h1 = utils.hash_file(flo) - h2 = hashlib.sha1(six.b(data)).hexdigest() - self.assertEqual(h1, h2) - - def test_is_valid_boolstr(self): - self.assertTrue(utils.is_valid_boolstr('true')) - self.assertTrue(utils.is_valid_boolstr('false')) - self.assertTrue(utils.is_valid_boolstr('yes')) - self.assertTrue(utils.is_valid_boolstr('no')) - self.assertTrue(utils.is_valid_boolstr('y')) - self.assertTrue(utils.is_valid_boolstr('n')) - self.assertTrue(utils.is_valid_boolstr('1')) - self.assertTrue(utils.is_valid_boolstr('0')) - - self.assertFalse(utils.is_valid_boolstr('maybe')) - self.assertFalse(utils.is_valid_boolstr('only on tuesdays')) - - def test_is_valid_ipv6_cidr(self): - self.assertTrue(utils.is_valid_ipv6_cidr("2600::/64")) - self.assertTrue(utils.is_valid_ipv6_cidr( - "abcd:ef01:2345:6789:abcd:ef01:192.168.254.254/48")) - self.assertTrue(utils.is_valid_ipv6_cidr( - "0000:0000:0000:0000:0000:0000:0000:0001/32")) - self.assertTrue(utils.is_valid_ipv6_cidr( - "0000:0000:0000:0000:0000:0000:0000:0001")) - self.assertFalse(utils.is_valid_ipv6_cidr("foo")) - self.assertFalse(utils.is_valid_ipv6_cidr("127.0.0.1")) - - def test_get_shortened_ipv6(self): - self.assertEqual("abcd:ef01:2345:6789:abcd:ef01:c0a8:fefe", - utils.get_shortened_ipv6( - "abcd:ef01:2345:6789:abcd:ef01:192.168.254.254")) - self.assertEqual("::1", - utils.get_shortened_ipv6( - "0000:0000:0000:0000:0000:0000:0000:0001")) - self.assertEqual("caca::caca:0:babe:201:102", - utils.get_shortened_ipv6( - "caca:0000:0000:caca:0000:babe:0201:0102")) - self.assertRaises(netaddr.AddrFormatError, utils.get_shortened_ipv6, - "127.0.0.1") - self.assertRaises(netaddr.AddrFormatError, utils.get_shortened_ipv6, - "failure") - - def test_get_shortened_ipv6_cidr(self): - self.assertEqual("2600::/64", - utils.get_shortened_ipv6_cidr( - "2600:0000:0000:0000:0000:0000:0000:0000/64")) - self.assertEqual("2600::/64", - utils.get_shortened_ipv6_cidr( - "2600::1/64")) - self.assertRaises(netaddr.AddrFormatError, - utils.get_shortened_ipv6_cidr, - "127.0.0.1") - self.assertRaises(netaddr.AddrFormatError, - utils.get_shortened_ipv6_cidr, - "failure") - def test_is_valid_mac(self): self.assertTrue(utils.is_valid_mac("52:54:00:cf:2d:31")) self.assertTrue(utils.is_valid_mac(u"52:54:00:cf:2d:31")) @@ -446,57 +225,6 @@ class GenericUtilsTestCase(base.TestCase): self.assertEqual(value, utils.safe_rstrip(value)) -class MkfsTestCase(base.TestCase): - - @mock.patch.object(utils, 'execute') - def test_mkfs(self, execute_mock): - utils.mkfs('ext4', '/my/block/dev') - utils.mkfs('msdos', '/my/msdos/block/dev') - utils.mkfs('swap', '/my/swap/block/dev') - - expected = [mock.call('mkfs', '-t', 'ext4', '-F', '/my/block/dev', - run_as_root=True, - use_standard_locale=True), - mock.call('mkfs', '-t', 'msdos', '/my/msdos/block/dev', - run_as_root=True, - use_standard_locale=True), - mock.call('mkswap', '/my/swap/block/dev', - run_as_root=True, - use_standard_locale=True)] - self.assertEqual(expected, execute_mock.call_args_list) - - @mock.patch.object(utils, 'execute') - def test_mkfs_with_label(self, execute_mock): - utils.mkfs('ext4', '/my/block/dev', 'ext4-vol') - utils.mkfs('msdos', '/my/msdos/block/dev', 'msdos-vol') - utils.mkfs('swap', '/my/swap/block/dev', 'swap-vol') - - expected = [mock.call('mkfs', '-t', 'ext4', '-F', '-L', 'ext4-vol', - '/my/block/dev', run_as_root=True, - use_standard_locale=True), - mock.call('mkfs', '-t', 'msdos', '-n', 'msdos-vol', - '/my/msdos/block/dev', run_as_root=True, - use_standard_locale=True), - mock.call('mkswap', '-L', 'swap-vol', - '/my/swap/block/dev', run_as_root=True, - use_standard_locale=True)] - self.assertEqual(expected, execute_mock.call_args_list) - - @mock.patch.object(utils, 'execute', - side_effect=processutils.ProcessExecutionError( - stderr=os.strerror(errno.ENOENT))) - def test_mkfs_with_unsupported_fs(self, execute_mock): - self.assertRaises(exception.FileSystemNotSupported, - utils.mkfs, 'foo', '/my/block/dev') - - @mock.patch.object(utils, 'execute', - side_effect=processutils.ProcessExecutionError( - stderr='fake')) - def test_mkfs_with_unexpected_error(self, execute_mock): - self.assertRaises(processutils.ProcessExecutionError, utils.mkfs, - 'ext4', '/my/block/dev', 'ext4-vol') - - class TempFilesTestCase(base.TestCase): def test_tempdir(self): @@ -540,38 +268,6 @@ class TempFilesTestCase(base.TestCase): self.assertTrue(log_mock.error.called) -class Urllib2_invalid_scheme(base.TestCase): - def test_raise_exception_invalid_scheme_file(self): - self.assertRaises( - exception.Urllib2InvalidScheme, - utils.raise_exception_invalid_scheme, - 'file:///etc/passwd') - - def test_raise_exception_invalid_scheme_starting_colon(self): - self.assertRaises( - exception.Urllib2InvalidScheme, - utils.raise_exception_invalid_scheme, - ':///etc/passwd') - - def test_raise_exception_invalid_scheme_None(self): - self.assertRaises( - exception.Urllib2InvalidScheme, - utils.raise_exception_invalid_scheme, - None) - - def test_raise_exception_invalid_scheme_empty_string(self): - self.assertRaises( - exception.Urllib2InvalidScheme, - utils.raise_exception_invalid_scheme, - '') - - def test_raise_exception_invalid_scheme_http(self): - utils.raise_exception_invalid_scheme(url='http://www.openstack.org') - - def test_raise_exception_invalid_scheme_https(self): - utils.raise_exception_invalid_scheme(url='https://www.openstack.org') - - class GeneratePasswordTestCase(base.TestCase): def test_generate_password(self): password = utils.generate_password(length=12) diff --git a/magnum/tests/unit/objects/test_objects.py b/magnum/tests/unit/objects/test_objects.py index 3a43e9ce34..4405834397 100644 --- a/magnum/tests/unit/objects/test_objects.py +++ b/magnum/tests/unit/objects/test_objects.py @@ -15,17 +15,13 @@ import datetime import gettext -import iso8601 import mock -import netaddr -from oslo_utils import timeutils from oslo_versionedobjects import fields from oslo_versionedobjects import fixture from magnum.common import context as magnum_context from magnum.common import exception from magnum.objects import base -from magnum.objects import utils from magnum.tests import base as test_base gettext.install('magnum') @@ -93,68 +89,6 @@ class TestSubclassedObject(MyObj): fields = {'new_field': fields.StringField()} -class TestUtils(test_base.TestCase): - - def test_datetime_or_none(self): - naive_dt = timeutils.utcnow() - dt = timeutils.parse_isotime(datetime.datetime.isoformat(naive_dt)) - self.assertEqual(dt, utils.datetime_or_none(dt)) - self.assertEqual(naive_dt.replace(tzinfo=iso8601.iso8601.Utc()), - utils.datetime_or_none(dt)) - self.assertIsNone(utils.datetime_or_none(None)) - self.assertRaises(ValueError, utils.datetime_or_none, 'foo') - - def test_datetime_or_str_or_none(self): - dts = datetime.datetime.isoformat(timeutils.utcnow()) - dt = timeutils.parse_isotime(dts) - self.assertEqual(dt, utils.datetime_or_str_or_none(dt)) - self.assertIsNone(utils.datetime_or_str_or_none(None)) - self.assertEqual(dt, utils.datetime_or_str_or_none(dts)) - self.assertRaises(ValueError, utils.datetime_or_str_or_none, 'foo') - - def test_int_or_none(self): - self.assertEqual(1, utils.int_or_none(1)) - self.assertEqual(1, utils.int_or_none('1')) - self.assertIsNone(utils.int_or_none(None)) - self.assertRaises(ValueError, utils.int_or_none, 'foo') - - def test_str_or_none(self): - class Obj(object): - pass - self.assertEqual('foo', utils.str_or_none('foo')) - self.assertEqual('1', utils.str_or_none(1)) - self.assertIsNone(utils.str_or_none(None)) - - def test_ip_or_none(self): - ip4 = netaddr.IPAddress('1.2.3.4', 4) - ip6 = netaddr.IPAddress('1::2', 6) - self.assertEqual(ip4, utils.ip_or_none(4)('1.2.3.4')) - self.assertEqual(ip6, utils.ip_or_none(6)('1::2')) - self.assertIsNone(utils.ip_or_none(4)(None)) - self.assertIsNone(utils.ip_or_none(6)(None)) - self.assertRaises(netaddr.AddrFormatError, utils.ip_or_none(4), 'foo') - self.assertRaises(netaddr.AddrFormatError, utils.ip_or_none(6), 'foo') - - def test_dt_serializer(self): - class Obj(object): - foo = utils.dt_serializer('bar') - - obj = Obj() - obj.bar = timeutils.parse_isotime('1955-11-05T00:00:00Z') - self.assertEqual('1955-11-05T00:00:00+00:00', obj.foo()) - obj.bar = None - self.assertIsNone(obj.foo()) - obj.bar = 'foo' - self.assertRaises(TypeError, obj.foo) - - def test_dt_deserializer(self): - dt = timeutils.parse_isotime('1955-11-05T00:00:00Z') - self.assertEqual(dt, utils.dt_deserializer(None, - datetime.datetime.isoformat(dt))) - self.assertIsNone(utils.dt_deserializer(None, None)) - self.assertRaises(ValueError, utils.dt_deserializer, None, 'foo') - - class _TestObject(object): def test_hydration_type_error(self): primitive = {'magnum_object.name': 'MyObj', diff --git a/magnum/tests/unit/objects/utils.py b/magnum/tests/unit/objects/utils.py index 381d1bc97b..0918c6ecbb 100644 --- a/magnum/tests/unit/objects/utils.py +++ b/magnum/tests/unit/objects/utils.py @@ -15,7 +15,15 @@ """Magnum object test utilities.""" +import datetime + +import iso8601 +import netaddr +from oslo_utils import timeutils +import six + from magnum.common import exception +from magnum.i18n import _ from magnum import objects from magnum.tests.unit.db import utils as db_utils @@ -222,3 +230,68 @@ def get_test_container(context, **kw): for key in db_container: setattr(container, key, db_container[key]) return container + + +def datetime_or_none(dt): + """Validate a datetime or None value.""" + if dt is None: + return None + elif isinstance(dt, datetime.datetime): + if dt.utcoffset() is None: + # NOTE(danms): Legacy objects from sqlalchemy are stored in UTC, + # but are returned without a timezone attached. + # As a transitional aid, assume a tz-naive object is in UTC. + return dt.replace(tzinfo=iso8601.iso8601.Utc()) + else: + return dt + raise ValueError(_("A datetime.datetime is required here")) + + +def datetime_or_str_or_none(val): + if isinstance(val, six.string_types): + return timeutils.parse_isotime(val) + return datetime_or_none(val) + + +def int_or_none(val): + """Attempt to parse an integer value, or None.""" + if val is None: + return val + else: + return int(val) + + +def str_or_none(val): + """Attempt to stringify a value to unicode, or None.""" + if val is None: + return val + else: + return six.text_type(val) + + +def ip_or_none(version): + """Return a version-specific IP address validator.""" + def validator(val, version=version): + if val is None: + return val + else: + return netaddr.IPAddress(val, version=version) + return validator + + +def dt_serializer(name): + """Return a datetime serializer for a named attribute.""" + def serializer(self, name=name): + if getattr(self, name) is not None: + return datetime.datetime.isoformat(getattr(self, name)) + else: + return None + return serializer + + +def dt_deserializer(instance, val): + """A deserializer method for datetime attributes.""" + if val is None: + return None + else: + return timeutils.parse_isotime(val)