Remove redundant utils code

Magnum common/utils.py and object/utils.py contains many unnecessary
functions which were first imported when Magnum was created/copied from
Ironic. These redundant methods bloat the codebase and create a
maintenance burden and as such should be removed.

Change-Id: I02f198d90ffa408ad87e2e26ec82b165e92f28e8
This commit is contained in:
Tom Cammann 2016-05-25 11:13:43 +01:00
parent 988593e11f
commit d8a3e64f23
5 changed files with 73 additions and 825 deletions

View File

@ -19,24 +19,18 @@
"""Utilities and helper functions."""
import contextlib
import errno
import hashlib
import os
import random
import re
import shutil
import tempfile
import netaddr
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
import paramiko
import six
from magnum.common import exception
from magnum.i18n import _
from magnum.i18n import _LE
from magnum.i18n import _LW
@ -127,75 +121,6 @@ def trycmd(*args, **kwargs):
return processutils.trycmd(*args, **kwargs)
def ssh_connect(connection):
"""Method to connect to a remote system using ssh protocol.
:param connection: a dict of connection parameters.
:returns: paramiko.SSHClient -- an active ssh connection.
:raises: SSHConnectFailed
"""
try:
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
key_contents = connection.get('key_contents')
if key_contents:
data = six.moves.StringIO(key_contents)
if "BEGIN RSA PRIVATE" in key_contents:
pkey = paramiko.RSAKey.from_private_key(data)
elif "BEGIN DSA PRIVATE" in key_contents:
pkey = paramiko.DSSKey.from_private_key(data)
else:
# Can't include the key contents - secure material.
raise ValueError(_("Invalid private key"))
else:
pkey = None
ssh.connect(connection.get('host'),
username=connection.get('username'),
password=connection.get('password'),
port=connection.get('port', 22),
pkey=pkey,
key_filename=connection.get('key_filename'),
timeout=connection.get('timeout', 10))
# send TCP keepalive packets every 20 seconds
ssh.get_transport().set_keepalive(20)
except Exception as e:
LOG.debug("SSH connect failed: %s", e)
raise exception.SSHConnectFailed(host=connection.get('host'))
return ssh
def generate_uid(topic, size=8):
characters = '01234567890abcdefghijklmnopqrstuvwxyz'
choices = [random.choice(characters) for _x in range(size)]
return '%s-%s' % (topic, ''.join(choices))
def random_alnum(size=32):
characters = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ'
return ''.join(random.choice(characters) for _ in range(size))
def delete_if_exists(pathname):
"""delete a file, but ignore file not found error."""
try:
os.unlink(pathname)
except OSError as e:
if e.errno == errno.ENOENT:
return
else:
raise
def is_valid_boolstr(val):
"""Check if the provided string is a valid bool string or not."""
boolstrs = ('true', 'false', 'yes', 'no', 'y', 'n', '1', '0')
return str(val).lower() in boolstrs
def is_valid_mac(address):
"""Verify the format of a MAC address.
@ -228,135 +153,6 @@ def validate_and_normalize_mac(address):
return address.lower()
def is_valid_ipv4(address):
"""Verify that address represents a valid IPv4 address."""
try:
return netaddr.valid_ipv4(address)
except Exception:
return False
def is_valid_ipv6(address):
try:
return netaddr.valid_ipv6(address)
except Exception:
return False
def is_valid_ipv6_cidr(address):
try:
str(netaddr.IPNetwork(address, version=6).cidr)
return True
except Exception:
return False
def get_shortened_ipv6(address):
addr = netaddr.IPAddress(address, version=6)
return str(addr.ipv6())
def get_shortened_ipv6_cidr(address):
net = netaddr.IPNetwork(address, version=6)
return str(net.cidr)
def is_valid_cidr(address):
"""Check if the provided ipv4 or ipv6 address is a valid CIDR address."""
try:
# Validate the correct CIDR Address
netaddr.IPNetwork(address)
except netaddr.core.AddrFormatError:
return False
except UnboundLocalError:
# NOTE(MotoKen): work around bug in netaddr 0.7.5 (see detail in
# https://github.com/drkjam/netaddr/issues/2)
return False
# Prior validation partially verify /xx part
# Verify it here
ip_segment = address.split('/')
if (len(ip_segment) <= 1 or
ip_segment[1] == ''):
return False
return True
def get_ip_version(network):
"""Returns the IP version of a network (IPv4 or IPv6).
:raises: AddrFormatError if invalid network.
"""
if netaddr.IPNetwork(network).version == 6:
return "IPv6"
elif netaddr.IPNetwork(network).version == 4:
return "IPv4"
def convert_to_list_dict(lst, label):
"""Convert a value or list into a list of dicts."""
if not lst:
return None
if not isinstance(lst, list):
lst = [lst]
return [{label: x} for x in lst]
def sanitize_hostname(hostname):
"""Return a hostname which conforms to RFC-952 and RFC-1123 specs."""
hostname = six.text_type(hostname)
hostname = re.sub('[ _]', '-', hostname)
hostname = re.sub('[^a-zA-Z0-9_.-]+', '', hostname)
hostname = hostname.lower()
hostname = hostname.strip('.-')
return hostname
def read_cached_file(filename, cache_info, reload_func=None):
"""Read from a file if it has been modified.
:param cache_info: dictionary to hold opaque cache.
:param reload_func: optional function to be called with data when
file is reloaded due to a modification.
:returns: data from file
"""
mtime = os.path.getmtime(filename)
if not cache_info or mtime != cache_info.get('mtime'):
LOG.debug("Reloading cached file %s", filename)
with open(filename) as fap:
cache_info['data'] = fap.read()
cache_info['mtime'] = mtime
if reload_func:
reload_func(cache_info['data'])
return cache_info['data']
def file_open(*args, **kwargs):
"""Open file
see built-in file() documentation for more details
Note: The reason this is kept in a separate module is to easily
be able to provide a stub module that doesn't alter system
state at all (for unit tests)
"""
return file(*args, **kwargs)
def hash_file(file_like_object):
"""Generate a hash for the contents of a file."""
checksum = hashlib.sha1()
for chunk in iter(lambda: six.b(file_like_object.read(32768)), b''):
checksum.update(chunk)
return checksum.hexdigest()
@contextlib.contextmanager
def tempdir(**kwargs):
tempfile.tempdir = CONF.tempdir
@ -370,54 +166,6 @@ def tempdir(**kwargs):
LOG.error(_LE('Could not remove tmpdir: %s'), e)
def mkfs(fs, path, label=None):
"""Format a file or block device
:param fs: Filesystem type (examples include 'swap', 'ext3', 'ext4'
'btrfs', etc.)
:param path: Path to file or block device to format
:param label: Volume label to use
"""
if fs == 'swap':
args = ['mkswap']
else:
args = ['mkfs', '-t', fs]
# add -F to force no interactive execute on non-block device.
if fs in ('ext3', 'ext4'):
args.extend(['-F'])
if label:
if fs in ('msdos', 'vfat'):
label_opt = '-n'
else:
label_opt = '-L'
args.extend([label_opt, label])
args.append(path)
try:
execute(*args, run_as_root=True, use_standard_locale=True)
except processutils.ProcessExecutionError as e:
with excutils.save_and_reraise_exception() as ctx:
if os.strerror(errno.ENOENT) in e.stderr:
ctx.reraise = False
LOG.exception(_LE('Failed to make file system. '
'File system %s is not supported.'), fs)
raise exception.FileSystemNotSupported(fs=fs)
else:
LOG.exception(_LE('Failed to create a file system '
'in %(path)s. Error: %(error)s'),
{'path': path, 'error': e})
def unlink_without_raise(path):
try:
os.unlink(path)
except OSError as e:
if e.errno == errno.ENOENT:
return
else:
LOG.warning(_LW("Failed to unlink %(path)s, error: %(e)s"),
{'path': path, 'e': e})
def rmtree_without_raise(path):
try:
if os.path.isdir(path):
@ -427,23 +175,6 @@ def rmtree_without_raise(path):
{'path': path, 'e': e})
def write_to_file(path, contents):
with open(path, 'w') as f:
f.write(contents)
def create_link_without_raise(source, link):
try:
os.symlink(source, link)
except OSError as e:
if e.errno == errno.EEXIST:
return
else:
LOG.warning(_LW("Failed to create symlink from %(source)s to "
"%(link)s, error: %(e)s"),
{'source': source, 'link': link, 'e': e})
def safe_rstrip(value, chars=None):
"""Removes trailing characters from a string if that does not make it empty
@ -462,47 +193,6 @@ def safe_rstrip(value, chars=None):
return value.rstrip(chars) or value
def mount(src, dest, *args):
"""Mounts a device/image file on specified location.
:param src: the path to the source file for mounting
:param dest: the path where it needs to be mounted.
:param args: a tuple containing the arguments to be
passed to mount command.
:raises: processutils.ProcessExecutionError if it failed
to run the process.
"""
args = ('mount', ) + args + (src, dest)
execute(*args, run_as_root=True, check_exit_code=[0])
def umount(loc, *args):
"""Umounts a mounted location.
:param loc: the path to be unmounted.
:param args: a tuple containing the argumnets to be
passed to the umount command.
:raises: processutils.ProcessExecutionError if it failed
to run the process.
"""
args = ('umount', ) + args + (loc, )
execute(*args, run_as_root=True, check_exit_code=[0])
def dd(src, dst, *args):
"""Execute dd from src to dst.
:param src: the input file for dd command.
:param dst: the output file for dd command.
:param args: a tuple containing the arguments to be
passed to dd command.
:raises: processutils.ProcessExecutionError if it failed
to run the process.
"""
execute('dd', 'if=%s' % src, 'of=%s' % dst, *args,
run_as_root=True, check_exit_code=[0])
def is_name_safe(name):
"""Checks whether the name is valid or not.
@ -519,17 +209,6 @@ def is_name_safe(name):
return True
def raise_exception_invalid_scheme(url):
valid_schemes = ['http', 'https']
if not isinstance(url, six.string_types):
raise exception.Urllib2InvalidScheme(url=url)
scheme = url.split(':')[0]
if scheme not in valid_schemes:
raise exception.Urllib2InvalidScheme(url=url)
def get_k8s_quantity(quantity):
"""This function is used to get k8s quantity.

View File

@ -1,134 +0,0 @@
# Copyright 2013 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Utility methods for objects"""
import ast
import datetime
import iso8601
import netaddr
from oslo_utils import timeutils
import six
from magnum.i18n import _
def datetime_or_none(dt):
"""Validate a datetime or None value."""
if dt is None:
return None
elif isinstance(dt, datetime.datetime):
if dt.utcoffset() is None:
# NOTE(danms): Legacy objects from sqlalchemy are stored in UTC,
# but are returned without a timezone attached.
# As a transitional aid, assume a tz-naive object is in UTC.
return dt.replace(tzinfo=iso8601.iso8601.Utc())
else:
return dt
raise ValueError(_("A datetime.datetime is required here"))
def datetime_or_str_or_none(val):
if isinstance(val, six.string_types):
return timeutils.parse_isotime(val)
return datetime_or_none(val)
def int_or_none(val):
"""Attempt to parse an integer value, or None."""
if val is None:
return val
else:
return int(val)
def str_or_none(val):
"""Attempt to stringify a value to unicode, or None."""
if val is None:
return val
else:
return six.text_type(val)
def dict_or_none(val):
"""Attempt to dictify a value, or None."""
if val is None:
return {}
elif isinstance(val, six.string_types):
return dict(ast.literal_eval(val))
else:
try:
return dict(val)
except ValueError:
return {}
def list_or_none(val):
"""Attempt to listify a value, or None."""
if val is None:
return []
elif isinstance(val, six.string_types):
return list(ast.literal_eval(val))
else:
try:
return list(val)
except ValueError:
return []
def ip_or_none(version):
"""Return a version-specific IP address validator."""
def validator(val, version=version):
if val is None:
return val
else:
return netaddr.IPAddress(val, version=version)
return validator
def nested_object_or_none(objclass):
def validator(val, objclass=objclass):
if val is None or isinstance(val, objclass):
return val
raise ValueError(_("An object of class %s is required here")
% objclass)
return validator
def dt_serializer(name):
"""Return a datetime serializer for a named attribute."""
def serializer(self, name=name):
if getattr(self, name) is not None:
return datetime.datetime.isoformat(getattr(self, name))
else:
return None
return serializer
def dt_deserializer(instance, val):
"""A deserializer method for datetime attributes."""
if val is None:
return None
else:
return timeutils.parse_isotime(val)
def obj_serializer(name):
def serializer(self, name=name):
if getattr(self, name) is not None:
return getattr(self, name).obj_to_primitive()
else:
return None
return serializer

View File

@ -14,17 +14,13 @@
# under the License.
import errno
import hashlib
import os
import os.path
import shutil
import tempfile
import mock
import netaddr
from oslo_concurrency import processutils
import six
import six.moves.builtins as __builtin__
from magnum.common import exception
from magnum.common import utils
@ -33,95 +29,6 @@ from magnum.tests import base
class UtilsTestCase(base.TestCase):
def test_random_alnum(self):
s = utils.random_alnum(10)
self.assertEqual(10, len(s))
s = utils.random_alnum(100)
self.assertEqual(100, len(s))
def test_unlink(self):
with mock.patch.object(os, "unlink") as unlink_mock:
unlink_mock.return_value = None
utils.unlink_without_raise("/fake/path")
unlink_mock.assert_called_once_with("/fake/path")
def test_unlink_ENOENT(self):
with mock.patch.object(os, "unlink") as unlink_mock:
unlink_mock.side_effect = OSError(errno.ENOENT)
utils.unlink_without_raise("/fake/path")
unlink_mock.assert_called_once_with("/fake/path")
def test_create_link(self):
with mock.patch.object(os, "symlink") as symlink_mock:
symlink_mock.return_value = None
utils.create_link_without_raise("/fake/source", "/fake/link")
symlink_mock.assert_called_once_with("/fake/source", "/fake/link")
def test_create_link_EEXIST(self):
with mock.patch.object(os, "symlink") as symlink_mock:
symlink_mock.side_effect = OSError(errno.EEXIST)
utils.create_link_without_raise("/fake/source", "/fake/link")
symlink_mock.assert_called_once_with("/fake/source", "/fake/link")
def test_generate_uid(self):
topic = 'test'
size = 8
s = utils.generate_uid(topic)
self.assertEqual(len(topic) + size + 1, len(s))
self.assertEqual(topic, s[:len(topic)])
size = 22
s = utils.generate_uid(topic, size)
self.assertEqual(len(topic) + size + 1, len(s))
def test_valid_ipv4(self):
self.assertTrue(utils.is_valid_ipv4('10.0.0.1'))
self.assertTrue(utils.is_valid_ipv4('255.255.255.255'))
def test_invalid_ipv4(self):
self.assertFalse(utils.is_valid_ipv4(''))
self.assertFalse(utils.is_valid_ipv4('x.x.x.x'))
self.assertFalse(utils.is_valid_ipv4('256.256.256.256'))
self.assertFalse(utils.is_valid_ipv4(
'AA42:0000:0000:0000:0202:B3FF:FE1E:8329'))
def test_valid_ipv6(self):
self.assertTrue(utils.is_valid_ipv6(
'AA42:0000:0000:0000:0202:B3FF:FE1E:8329'))
self.assertTrue(utils.is_valid_ipv6(
'AA42::0202:B3FF:FE1E:8329'))
def test_invalid_ipv6(self):
self.assertFalse(utils.is_valid_ipv6(''))
self.assertFalse(utils.is_valid_ipv6('10.0.0.1'))
self.assertFalse(utils.is_valid_ipv6('AA42::0202:B3FF:FE1E:'))
def test_valid_cidr(self):
self.assertTrue(utils.is_valid_cidr('10.0.0.0/24'))
self.assertTrue(utils.is_valid_cidr('10.0.0.1/32'))
self.assertTrue(utils.is_valid_cidr('0.0.0.0/0'))
def test_invalid_cidr(self):
self.assertFalse(utils.is_valid_cidr('10.0.0.1'))
self.assertFalse(utils.is_valid_cidr('10.0.0.1/33'))
def test_valid_network(self):
self.assertEqual('IPv4', utils.get_ip_version('10.0.0.1'))
self.assertEqual('IPv6', utils.get_ip_version(
'AA42:0000:0000:0000:0202:B3FF:FE1E:8329'))
def test_invalid_network(self):
self.assertRaises(netaddr.core.AddrFormatError,
utils.get_ip_version, 'x.x.x.x')
def test_convert_to_list_dict(self):
self.assertIsNone(utils.convert_to_list_dict(None, 'fred'))
self.assertIsNone(utils.convert_to_list_dict('', 'fred'))
self.assertEqual([{'fred': 'list'}],
utils.convert_to_list_dict('list', 'fred'))
self.assertEqual([{'fred': 'first'}, {'fred': 'second'}],
utils.convert_to_list_dict(['first', 'second'],
'fred'))
def test_get_k8s_quantity(self):
self.assertEqual(1024000.0, utils.get_k8s_quantity('1000Ki'))
self.assertEqual(0.001, utils.get_k8s_quantity('1E-3'))
@ -276,134 +183,6 @@ grep foo
utils.execute('foo', run_as_root=False)
execute_mock.assert_called_once_with('foo', run_as_root=False)
class GenericUtilsTestCase(base.TestCase):
def test_hostname_unicode_sanitization(self):
hostname = u"\u7684.test.example.com"
self.assertEqual("test.example.com",
utils.sanitize_hostname(hostname))
def test_hostname_sanitize_periods(self):
hostname = "....test.example.com..."
self.assertEqual("test.example.com",
utils.sanitize_hostname(hostname))
def test_hostname_sanitize_dashes(self):
hostname = "----test.example.com---"
self.assertEqual("test.example.com",
utils.sanitize_hostname(hostname))
def test_hostname_sanitize_characters(self):
hostname = "(#@&$!(@*--#&91)(__=+--test-host.example!!.com-0+"
self.assertEqual("91----test-host.example.com-0",
utils.sanitize_hostname(hostname))
def test_hostname_translate(self):
hostname = "<}\x1fh\x10e\x08l\x02l\x05o\x12!{>"
self.assertEqual("hello", utils.sanitize_hostname(hostname))
def test_read_cached_file(self):
with mock.patch.object(os.path, "getmtime") as getmtime_mock:
getmtime_mock.return_value = 1
cache_data = {"data": 1123, "mtime": 1}
data = utils.read_cached_file("/this/is/a/fake", cache_data)
self.assertEqual(cache_data["data"], data)
getmtime_mock.assert_called_once_with(mock.ANY)
def test_read_modified_cached_file(self):
with mock.patch.object(os.path, "getmtime") as getmtime_mock:
with mock.patch.object(__builtin__, 'open') as open_mock:
getmtime_mock.return_value = 2
fake_contents = "lorem ipsum"
fake_file = mock.Mock()
fake_file.read.return_value = fake_contents
fake_context_manager = mock.MagicMock()
fake_context_manager.__enter__.return_value = fake_file
fake_context_manager.__exit__.return_value = None
open_mock.return_value = fake_context_manager
cache_data = {"data": 1123, "mtime": 1}
self.reload_called = False
def test_reload(reloaded_data):
self.assertEqual(fake_contents, reloaded_data)
self.reload_called = True
data = utils.read_cached_file("/this/is/a/fake",
cache_data,
reload_func=test_reload)
self.assertEqual(fake_contents, data)
self.assertTrue(self.reload_called)
getmtime_mock.assert_called_once_with(mock.ANY)
open_mock.assert_called_once_with(mock.ANY)
fake_file.read.assert_called_once_with()
fake_context_manager.__exit__.assert_called_once_with(mock.ANY,
mock.ANY,
mock.ANY)
fake_context_manager.__enter__.assert_called_once_with()
def test_hash_file(self):
data = 'Mary had a little lamb, its fleece as white as snow'
flo = six.StringIO(data)
h1 = utils.hash_file(flo)
h2 = hashlib.sha1(six.b(data)).hexdigest()
self.assertEqual(h1, h2)
def test_is_valid_boolstr(self):
self.assertTrue(utils.is_valid_boolstr('true'))
self.assertTrue(utils.is_valid_boolstr('false'))
self.assertTrue(utils.is_valid_boolstr('yes'))
self.assertTrue(utils.is_valid_boolstr('no'))
self.assertTrue(utils.is_valid_boolstr('y'))
self.assertTrue(utils.is_valid_boolstr('n'))
self.assertTrue(utils.is_valid_boolstr('1'))
self.assertTrue(utils.is_valid_boolstr('0'))
self.assertFalse(utils.is_valid_boolstr('maybe'))
self.assertFalse(utils.is_valid_boolstr('only on tuesdays'))
def test_is_valid_ipv6_cidr(self):
self.assertTrue(utils.is_valid_ipv6_cidr("2600::/64"))
self.assertTrue(utils.is_valid_ipv6_cidr(
"abcd:ef01:2345:6789:abcd:ef01:192.168.254.254/48"))
self.assertTrue(utils.is_valid_ipv6_cidr(
"0000:0000:0000:0000:0000:0000:0000:0001/32"))
self.assertTrue(utils.is_valid_ipv6_cidr(
"0000:0000:0000:0000:0000:0000:0000:0001"))
self.assertFalse(utils.is_valid_ipv6_cidr("foo"))
self.assertFalse(utils.is_valid_ipv6_cidr("127.0.0.1"))
def test_get_shortened_ipv6(self):
self.assertEqual("abcd:ef01:2345:6789:abcd:ef01:c0a8:fefe",
utils.get_shortened_ipv6(
"abcd:ef01:2345:6789:abcd:ef01:192.168.254.254"))
self.assertEqual("::1",
utils.get_shortened_ipv6(
"0000:0000:0000:0000:0000:0000:0000:0001"))
self.assertEqual("caca::caca:0:babe:201:102",
utils.get_shortened_ipv6(
"caca:0000:0000:caca:0000:babe:0201:0102"))
self.assertRaises(netaddr.AddrFormatError, utils.get_shortened_ipv6,
"127.0.0.1")
self.assertRaises(netaddr.AddrFormatError, utils.get_shortened_ipv6,
"failure")
def test_get_shortened_ipv6_cidr(self):
self.assertEqual("2600::/64",
utils.get_shortened_ipv6_cidr(
"2600:0000:0000:0000:0000:0000:0000:0000/64"))
self.assertEqual("2600::/64",
utils.get_shortened_ipv6_cidr(
"2600::1/64"))
self.assertRaises(netaddr.AddrFormatError,
utils.get_shortened_ipv6_cidr,
"127.0.0.1")
self.assertRaises(netaddr.AddrFormatError,
utils.get_shortened_ipv6_cidr,
"failure")
def test_is_valid_mac(self):
self.assertTrue(utils.is_valid_mac("52:54:00:cf:2d:31"))
self.assertTrue(utils.is_valid_mac(u"52:54:00:cf:2d:31"))
@ -446,57 +225,6 @@ class GenericUtilsTestCase(base.TestCase):
self.assertEqual(value, utils.safe_rstrip(value))
class MkfsTestCase(base.TestCase):
@mock.patch.object(utils, 'execute')
def test_mkfs(self, execute_mock):
utils.mkfs('ext4', '/my/block/dev')
utils.mkfs('msdos', '/my/msdos/block/dev')
utils.mkfs('swap', '/my/swap/block/dev')
expected = [mock.call('mkfs', '-t', 'ext4', '-F', '/my/block/dev',
run_as_root=True,
use_standard_locale=True),
mock.call('mkfs', '-t', 'msdos', '/my/msdos/block/dev',
run_as_root=True,
use_standard_locale=True),
mock.call('mkswap', '/my/swap/block/dev',
run_as_root=True,
use_standard_locale=True)]
self.assertEqual(expected, execute_mock.call_args_list)
@mock.patch.object(utils, 'execute')
def test_mkfs_with_label(self, execute_mock):
utils.mkfs('ext4', '/my/block/dev', 'ext4-vol')
utils.mkfs('msdos', '/my/msdos/block/dev', 'msdos-vol')
utils.mkfs('swap', '/my/swap/block/dev', 'swap-vol')
expected = [mock.call('mkfs', '-t', 'ext4', '-F', '-L', 'ext4-vol',
'/my/block/dev', run_as_root=True,
use_standard_locale=True),
mock.call('mkfs', '-t', 'msdos', '-n', 'msdos-vol',
'/my/msdos/block/dev', run_as_root=True,
use_standard_locale=True),
mock.call('mkswap', '-L', 'swap-vol',
'/my/swap/block/dev', run_as_root=True,
use_standard_locale=True)]
self.assertEqual(expected, execute_mock.call_args_list)
@mock.patch.object(utils, 'execute',
side_effect=processutils.ProcessExecutionError(
stderr=os.strerror(errno.ENOENT)))
def test_mkfs_with_unsupported_fs(self, execute_mock):
self.assertRaises(exception.FileSystemNotSupported,
utils.mkfs, 'foo', '/my/block/dev')
@mock.patch.object(utils, 'execute',
side_effect=processutils.ProcessExecutionError(
stderr='fake'))
def test_mkfs_with_unexpected_error(self, execute_mock):
self.assertRaises(processutils.ProcessExecutionError, utils.mkfs,
'ext4', '/my/block/dev', 'ext4-vol')
class TempFilesTestCase(base.TestCase):
def test_tempdir(self):
@ -540,38 +268,6 @@ class TempFilesTestCase(base.TestCase):
self.assertTrue(log_mock.error.called)
class Urllib2_invalid_scheme(base.TestCase):
def test_raise_exception_invalid_scheme_file(self):
self.assertRaises(
exception.Urllib2InvalidScheme,
utils.raise_exception_invalid_scheme,
'file:///etc/passwd')
def test_raise_exception_invalid_scheme_starting_colon(self):
self.assertRaises(
exception.Urllib2InvalidScheme,
utils.raise_exception_invalid_scheme,
':///etc/passwd')
def test_raise_exception_invalid_scheme_None(self):
self.assertRaises(
exception.Urllib2InvalidScheme,
utils.raise_exception_invalid_scheme,
None)
def test_raise_exception_invalid_scheme_empty_string(self):
self.assertRaises(
exception.Urllib2InvalidScheme,
utils.raise_exception_invalid_scheme,
'')
def test_raise_exception_invalid_scheme_http(self):
utils.raise_exception_invalid_scheme(url='http://www.openstack.org')
def test_raise_exception_invalid_scheme_https(self):
utils.raise_exception_invalid_scheme(url='https://www.openstack.org')
class GeneratePasswordTestCase(base.TestCase):
def test_generate_password(self):
password = utils.generate_password(length=12)

View File

@ -15,17 +15,13 @@
import datetime
import gettext
import iso8601
import mock
import netaddr
from oslo_utils import timeutils
from oslo_versionedobjects import fields
from oslo_versionedobjects import fixture
from magnum.common import context as magnum_context
from magnum.common import exception
from magnum.objects import base
from magnum.objects import utils
from magnum.tests import base as test_base
gettext.install('magnum')
@ -93,68 +89,6 @@ class TestSubclassedObject(MyObj):
fields = {'new_field': fields.StringField()}
class TestUtils(test_base.TestCase):
def test_datetime_or_none(self):
naive_dt = timeutils.utcnow()
dt = timeutils.parse_isotime(datetime.datetime.isoformat(naive_dt))
self.assertEqual(dt, utils.datetime_or_none(dt))
self.assertEqual(naive_dt.replace(tzinfo=iso8601.iso8601.Utc()),
utils.datetime_or_none(dt))
self.assertIsNone(utils.datetime_or_none(None))
self.assertRaises(ValueError, utils.datetime_or_none, 'foo')
def test_datetime_or_str_or_none(self):
dts = datetime.datetime.isoformat(timeutils.utcnow())
dt = timeutils.parse_isotime(dts)
self.assertEqual(dt, utils.datetime_or_str_or_none(dt))
self.assertIsNone(utils.datetime_or_str_or_none(None))
self.assertEqual(dt, utils.datetime_or_str_or_none(dts))
self.assertRaises(ValueError, utils.datetime_or_str_or_none, 'foo')
def test_int_or_none(self):
self.assertEqual(1, utils.int_or_none(1))
self.assertEqual(1, utils.int_or_none('1'))
self.assertIsNone(utils.int_or_none(None))
self.assertRaises(ValueError, utils.int_or_none, 'foo')
def test_str_or_none(self):
class Obj(object):
pass
self.assertEqual('foo', utils.str_or_none('foo'))
self.assertEqual('1', utils.str_or_none(1))
self.assertIsNone(utils.str_or_none(None))
def test_ip_or_none(self):
ip4 = netaddr.IPAddress('1.2.3.4', 4)
ip6 = netaddr.IPAddress('1::2', 6)
self.assertEqual(ip4, utils.ip_or_none(4)('1.2.3.4'))
self.assertEqual(ip6, utils.ip_or_none(6)('1::2'))
self.assertIsNone(utils.ip_or_none(4)(None))
self.assertIsNone(utils.ip_or_none(6)(None))
self.assertRaises(netaddr.AddrFormatError, utils.ip_or_none(4), 'foo')
self.assertRaises(netaddr.AddrFormatError, utils.ip_or_none(6), 'foo')
def test_dt_serializer(self):
class Obj(object):
foo = utils.dt_serializer('bar')
obj = Obj()
obj.bar = timeutils.parse_isotime('1955-11-05T00:00:00Z')
self.assertEqual('1955-11-05T00:00:00+00:00', obj.foo())
obj.bar = None
self.assertIsNone(obj.foo())
obj.bar = 'foo'
self.assertRaises(TypeError, obj.foo)
def test_dt_deserializer(self):
dt = timeutils.parse_isotime('1955-11-05T00:00:00Z')
self.assertEqual(dt, utils.dt_deserializer(None,
datetime.datetime.isoformat(dt)))
self.assertIsNone(utils.dt_deserializer(None, None))
self.assertRaises(ValueError, utils.dt_deserializer, None, 'foo')
class _TestObject(object):
def test_hydration_type_error(self):
primitive = {'magnum_object.name': 'MyObj',

View File

@ -15,7 +15,15 @@
"""Magnum object test utilities."""
import datetime
import iso8601
import netaddr
from oslo_utils import timeutils
import six
from magnum.common import exception
from magnum.i18n import _
from magnum import objects
from magnum.tests.unit.db import utils as db_utils
@ -222,3 +230,68 @@ def get_test_container(context, **kw):
for key in db_container:
setattr(container, key, db_container[key])
return container
def datetime_or_none(dt):
"""Validate a datetime or None value."""
if dt is None:
return None
elif isinstance(dt, datetime.datetime):
if dt.utcoffset() is None:
# NOTE(danms): Legacy objects from sqlalchemy are stored in UTC,
# but are returned without a timezone attached.
# As a transitional aid, assume a tz-naive object is in UTC.
return dt.replace(tzinfo=iso8601.iso8601.Utc())
else:
return dt
raise ValueError(_("A datetime.datetime is required here"))
def datetime_or_str_or_none(val):
if isinstance(val, six.string_types):
return timeutils.parse_isotime(val)
return datetime_or_none(val)
def int_or_none(val):
"""Attempt to parse an integer value, or None."""
if val is None:
return val
else:
return int(val)
def str_or_none(val):
"""Attempt to stringify a value to unicode, or None."""
if val is None:
return val
else:
return six.text_type(val)
def ip_or_none(version):
"""Return a version-specific IP address validator."""
def validator(val, version=version):
if val is None:
return val
else:
return netaddr.IPAddress(val, version=version)
return validator
def dt_serializer(name):
"""Return a datetime serializer for a named attribute."""
def serializer(self, name=name):
if getattr(self, name) is not None:
return datetime.datetime.isoformat(getattr(self, name))
else:
return None
return serializer
def dt_deserializer(instance, val):
"""A deserializer method for datetime attributes."""
if val is None:
return None
else:
return timeutils.parse_isotime(val)