cd698f2664
In module netaddr valid_ipv4/valid_ipv6 raise exception AddrFormatError if address is specified as empty string. Methods is_valid_ipv4/is_valid_ipv6 catch the exception and return boolean value. So we'd better use them from oslo.utils. This commit also adds empty string case in unit test. Change-Id: If58889bb322686c252b4fe89a4ddf9469121028c
930 lines
34 KiB
Python
930 lines
34 KiB
Python
# Copyright 2011 Justin Santa Barbara
|
|
# Copyright 2014 NetApp, Inc.
|
|
# Copyright 2014 Mirantis, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import datetime
|
|
import errno
|
|
import os
|
|
import os.path
|
|
import socket
|
|
import tempfile
|
|
import time
|
|
import uuid
|
|
|
|
import ddt
|
|
import mock
|
|
from oslo_config import cfg
|
|
from oslo_utils import timeutils
|
|
import paramiko
|
|
from six.moves import builtins
|
|
|
|
import manila
|
|
from manila.common import constants
|
|
from manila import context
|
|
from manila.db import api as db
|
|
from manila import exception
|
|
from manila import test
|
|
from manila import utils
|
|
|
|
CONF = cfg.CONF
|
|
|
|
|
|
class GetFromPathTestCase(test.TestCase):
|
|
def test_tolerates_nones(self):
|
|
f = utils.get_from_path
|
|
|
|
input = []
|
|
self.assertEqual([], f(input, "a"))
|
|
self.assertEqual([], f(input, "a/b"))
|
|
self.assertEqual([], f(input, "a/b/c"))
|
|
|
|
input = [None]
|
|
self.assertEqual([], f(input, "a"))
|
|
self.assertEqual([], f(input, "a/b"))
|
|
self.assertEqual([], f(input, "a/b/c"))
|
|
|
|
input = [{'a': None}]
|
|
self.assertEqual([], f(input, "a"))
|
|
self.assertEqual([], f(input, "a/b"))
|
|
self.assertEqual([], f(input, "a/b/c"))
|
|
|
|
input = [{'a': {'b': None}}]
|
|
self.assertEqual([{'b': None}], f(input, "a"))
|
|
self.assertEqual([], f(input, "a/b"))
|
|
self.assertEqual([], f(input, "a/b/c"))
|
|
|
|
input = [{'a': {'b': {'c': None}}}]
|
|
self.assertEqual([{'b': {'c': None}}], f(input, "a"))
|
|
self.assertEqual([{'c': None}], f(input, "a/b"))
|
|
self.assertEqual([], f(input, "a/b/c"))
|
|
|
|
input = [{'a': {'b': {'c': None}}}, {'a': None}]
|
|
self.assertEqual([{'b': {'c': None}}], f(input, "a"))
|
|
self.assertEqual([{'c': None}], f(input, "a/b"))
|
|
self.assertEqual([], f(input, "a/b/c"))
|
|
|
|
input = [{'a': {'b': {'c': None}}}, {'a': {'b': None}}]
|
|
self.assertEqual([{'b': {'c': None}}, {'b': None}], f(input, "a"))
|
|
self.assertEqual([{'c': None}], f(input, "a/b"))
|
|
self.assertEqual([], f(input, "a/b/c"))
|
|
|
|
def test_does_select(self):
|
|
f = utils.get_from_path
|
|
|
|
input = [{'a': 'a_1'}]
|
|
self.assertEqual(['a_1'], f(input, "a"))
|
|
self.assertEqual([], f(input, "a/b"))
|
|
self.assertEqual([], f(input, "a/b/c"))
|
|
|
|
input = [{'a': {'b': 'b_1'}}]
|
|
self.assertEqual([{'b': 'b_1'}], f(input, "a"))
|
|
self.assertEqual(['b_1'], f(input, "a/b"))
|
|
self.assertEqual([], f(input, "a/b/c"))
|
|
|
|
input = [{'a': {'b': {'c': 'c_1'}}}]
|
|
self.assertEqual([{'b': {'c': 'c_1'}}], f(input, "a"))
|
|
self.assertEqual([{'c': 'c_1'}], f(input, "a/b"))
|
|
self.assertEqual(['c_1'], f(input, "a/b/c"))
|
|
|
|
input = [{'a': {'b': {'c': 'c_1'}}}, {'a': None}]
|
|
self.assertEqual([{'b': {'c': 'c_1'}}], f(input, "a"))
|
|
self.assertEqual([{'c': 'c_1'}], f(input, "a/b"))
|
|
self.assertEqual(['c_1'], f(input, "a/b/c"))
|
|
|
|
input = [{'a': {'b': {'c': 'c_1'}}},
|
|
{'a': {'b': None}}]
|
|
self.assertEqual([{'b': {'c': 'c_1'}}, {'b': None}], f(input, "a"))
|
|
self.assertEqual([{'c': 'c_1'}], f(input, "a/b"))
|
|
self.assertEqual(['c_1'], f(input, "a/b/c"))
|
|
|
|
input = [{'a': {'b': {'c': 'c_1'}}},
|
|
{'a': {'b': {'c': 'c_2'}}}]
|
|
self.assertEqual([{'b': {'c': 'c_1'}}, {'b': {'c': 'c_2'}}],
|
|
f(input, "a"))
|
|
self.assertEqual([{'c': 'c_1'}, {'c': 'c_2'}], f(input, "a/b"))
|
|
self.assertEqual(['c_1', 'c_2'], f(input, "a/b/c"))
|
|
|
|
self.assertEqual([], f(input, "a/b/c/d"))
|
|
self.assertEqual([], f(input, "c/a/b/d"))
|
|
self.assertEqual([], f(input, "i/r/t"))
|
|
|
|
def test_flattens_lists(self):
|
|
f = utils.get_from_path
|
|
|
|
input = [{'a': [1, 2, 3]}]
|
|
self.assertEqual([1, 2, 3], f(input, "a"))
|
|
self.assertEqual([], f(input, "a/b"))
|
|
self.assertEqual([], f(input, "a/b/c"))
|
|
|
|
input = [{'a': {'b': [1, 2, 3]}}]
|
|
self.assertEqual([{'b': [1, 2, 3]}], f(input, "a"))
|
|
self.assertEqual([1, 2, 3], f(input, "a/b"))
|
|
self.assertEqual([], f(input, "a/b/c"))
|
|
|
|
input = [{'a': {'b': [1, 2, 3]}}, {'a': {'b': [4, 5, 6]}}]
|
|
self.assertEqual([1, 2, 3, 4, 5, 6], f(input, "a/b"))
|
|
self.assertEqual([], f(input, "a/b/c"))
|
|
|
|
input = [{'a': [{'b': [1, 2, 3]}, {'b': [4, 5, 6]}]}]
|
|
self.assertEqual([1, 2, 3, 4, 5, 6], f(input, "a/b"))
|
|
self.assertEqual([], f(input, "a/b/c"))
|
|
|
|
input = [{'a': [1, 2, {'b': 'b_1'}]}]
|
|
self.assertEqual([1, 2, {'b': 'b_1'}], f(input, "a"))
|
|
self.assertEqual(['b_1'], f(input, "a/b"))
|
|
|
|
def test_bad_xpath(self):
|
|
f = utils.get_from_path
|
|
|
|
self.assertRaises(exception.Error, f, [], None)
|
|
self.assertRaises(exception.Error, f, [], "")
|
|
self.assertRaises(exception.Error, f, [], "/")
|
|
self.assertRaises(exception.Error, f, [], "/a")
|
|
self.assertRaises(exception.Error, f, [], "/a/")
|
|
self.assertRaises(exception.Error, f, [], "//")
|
|
self.assertRaises(exception.Error, f, [], "//a")
|
|
self.assertRaises(exception.Error, f, [], "a//a")
|
|
self.assertRaises(exception.Error, f, [], "a//a/")
|
|
self.assertRaises(exception.Error, f, [], "a/a/")
|
|
|
|
def test_real_failure1(self):
|
|
# Real world failure case...
|
|
# We weren't coping when the input was a Dictionary instead of a List
|
|
# This led to test_accepts_dictionaries
|
|
f = utils.get_from_path
|
|
|
|
inst = {'fixed_ip': {'floating_ips': [{'address': '1.2.3.4'}],
|
|
'address': '192.168.0.3'},
|
|
'hostname': ''}
|
|
|
|
private_ips = f(inst, 'fixed_ip/address')
|
|
public_ips = f(inst, 'fixed_ip/floating_ips/address')
|
|
self.assertEqual(['192.168.0.3'], private_ips)
|
|
self.assertEqual(['1.2.3.4'], public_ips)
|
|
|
|
def test_accepts_dictionaries(self):
|
|
f = utils.get_from_path
|
|
|
|
input = {'a': [1, 2, 3]}
|
|
self.assertEqual([1, 2, 3], f(input, "a"))
|
|
self.assertEqual([], f(input, "a/b"))
|
|
self.assertEqual([], f(input, "a/b/c"))
|
|
|
|
input = {'a': {'b': [1, 2, 3]}}
|
|
self.assertEqual([{'b': [1, 2, 3]}], f(input, "a"))
|
|
self.assertEqual([1, 2, 3], f(input, "a/b"))
|
|
self.assertEqual([], f(input, "a/b/c"))
|
|
|
|
input = {'a': [{'b': [1, 2, 3]}, {'b': [4, 5, 6]}]}
|
|
self.assertEqual([1, 2, 3, 4, 5, 6], f(input, "a/b"))
|
|
self.assertEqual([], f(input, "a/b/c"))
|
|
|
|
input = {'a': [1, 2, {'b': 'b_1'}]}
|
|
self.assertEqual([1, 2, {'b': 'b_1'}], f(input, "a"))
|
|
self.assertEqual(['b_1'], f(input, "a/b"))
|
|
|
|
|
|
@ddt.ddt
|
|
class GenericUtilsTestCase(test.TestCase):
|
|
def test_read_cached_file(self):
|
|
cache_data = {"data": 1123, "mtime": 1}
|
|
with mock.patch.object(os.path, "getmtime", mock.Mock(return_value=1)):
|
|
data = utils.read_cached_file("/this/is/a/fake", cache_data)
|
|
self.assertEqual(cache_data["data"], data)
|
|
os.path.getmtime.assert_called_once_with("/this/is/a/fake")
|
|
|
|
def test_read_modified_cached_file(self):
|
|
with mock.patch.object(os.path, "getmtime", mock.Mock(return_value=2)):
|
|
fake_contents = "lorem ipsum"
|
|
fake_file = mock.Mock()
|
|
fake_file.read = mock.Mock(return_value=fake_contents)
|
|
fake_context_manager = mock.Mock()
|
|
fake_context_manager.__enter__ = mock.Mock(return_value=fake_file)
|
|
fake_context_manager.__exit__ = mock.Mock()
|
|
with mock.patch.object(
|
|
builtins, 'open',
|
|
mock.Mock(return_value=fake_context_manager)):
|
|
cache_data = {"data": 1123, "mtime": 1}
|
|
self.reload_called = False
|
|
|
|
def test_reload(reloaded_data):
|
|
self.assertEqual(fake_contents, reloaded_data)
|
|
self.reload_called = True
|
|
|
|
data = utils.read_cached_file("/this/is/a/fake",
|
|
cache_data,
|
|
reload_func=test_reload)
|
|
self.assertEqual(fake_contents, data)
|
|
self.assertTrue(self.reload_called)
|
|
fake_file.read.assert_called_once_with()
|
|
fake_context_manager.__enter__.assert_any_call()
|
|
builtins.open.assert_called_once_with("/this/is/a/fake")
|
|
os.path.getmtime.assert_called_once_with("/this/is/a/fake")
|
|
|
|
def test_read_file_as_root(self):
|
|
def fake_execute(*args, **kwargs):
|
|
if args[1] == 'bad':
|
|
raise exception.ProcessExecutionError
|
|
return 'fakecontents', None
|
|
|
|
self.mock_object(utils, 'execute', fake_execute)
|
|
contents = utils.read_file_as_root('good')
|
|
self.assertEqual('fakecontents', contents)
|
|
self.assertRaises(exception.FileNotFound,
|
|
utils.read_file_as_root, 'bad')
|
|
|
|
def test_temporary_chown(self):
|
|
def fake_execute(*args, **kwargs):
|
|
if args[0] == 'chown':
|
|
fake_execute.uid = args[1]
|
|
self.mock_object(utils, 'execute', fake_execute)
|
|
|
|
with tempfile.NamedTemporaryFile() as f:
|
|
with utils.temporary_chown(f.name, owner_uid=2):
|
|
self.assertEqual(2, fake_execute.uid)
|
|
self.assertEqual(fake_execute.uid, os.getuid())
|
|
|
|
def test_service_is_up(self):
|
|
fts_func = datetime.datetime.fromtimestamp
|
|
fake_now = 1000
|
|
down_time = 5
|
|
self.flags(service_down_time=down_time)
|
|
with mock.patch.object(timeutils, 'utcnow',
|
|
mock.Mock(return_value=fts_func(fake_now))):
|
|
|
|
# Up (equal)
|
|
service = {'updated_at': fts_func(fake_now - down_time),
|
|
'created_at': fts_func(fake_now - down_time)}
|
|
result = utils.service_is_up(service)
|
|
self.assertTrue(result)
|
|
timeutils.utcnow.assert_called_once_with()
|
|
|
|
with mock.patch.object(timeutils, 'utcnow',
|
|
mock.Mock(return_value=fts_func(fake_now))):
|
|
# Up
|
|
service = {'updated_at': fts_func(fake_now - down_time + 1),
|
|
'created_at': fts_func(fake_now - down_time + 1)}
|
|
result = utils.service_is_up(service)
|
|
self.assertTrue(result)
|
|
timeutils.utcnow.assert_called_once_with()
|
|
|
|
with mock.patch.object(timeutils, 'utcnow',
|
|
mock.Mock(return_value=fts_func(fake_now))):
|
|
# Down
|
|
service = {'updated_at': fts_func(fake_now - down_time - 1),
|
|
'created_at': fts_func(fake_now - down_time - 1)}
|
|
result = utils.service_is_up(service)
|
|
self.assertFalse(result)
|
|
timeutils.utcnow.assert_called_once_with()
|
|
|
|
def test_is_ipv6_configured0(self):
|
|
fake_fd = mock.Mock()
|
|
fake_fd.read.return_value = 'test'
|
|
with mock.patch('six.moves.builtins.open',
|
|
mock.Mock(return_value=fake_fd)) as open:
|
|
self.assertTrue(utils.is_ipv6_configured())
|
|
|
|
open.assert_called_once_with('/proc/net/if_inet6')
|
|
fake_fd.read.assert_called_once_with(32)
|
|
|
|
def test_is_ipv6_configured1(self):
|
|
fake_fd = mock.Mock()
|
|
fake_fd.read.return_value = ''
|
|
with mock.patch(
|
|
'six.moves.builtins.open', mock.Mock(return_value=fake_fd)):
|
|
self.assertFalse(utils.is_ipv6_configured())
|
|
|
|
def test_is_ipv6_configured2(self):
|
|
with mock.patch('six.moves.builtins.open',
|
|
mock.Mock(side_effect=IOError(
|
|
errno.ENOENT, 'Fake no such file error.'))):
|
|
self.assertFalse(utils.is_ipv6_configured())
|
|
|
|
def test_is_ipv6_configured3(self):
|
|
with mock.patch('six.moves.builtins.open',
|
|
mock.Mock(side_effect=IOError(
|
|
errno.EPERM, 'Fake no such file error.'))):
|
|
self.assertRaises(IOError, utils.is_ipv6_configured)
|
|
|
|
def test_is_eventlet_bug105(self):
|
|
fake_dns = mock.Mock()
|
|
fake_dns.getaddrinfo.side_effect = socket.gaierror(errno.EBADF)
|
|
with mock.patch.dict('sys.modules', {
|
|
'eventlet.support.greendns': fake_dns}):
|
|
self.assertTrue(utils.is_eventlet_bug105())
|
|
self.assertTrue(fake_dns.getaddrinfo.called)
|
|
|
|
def test_is_eventlet_bug105_neg(self):
|
|
fake_dns = mock.Mock()
|
|
fake_dns.getaddrinfo.return_value = [
|
|
(socket.AF_INET6, socket.SOCK_STREAM, 0, '', (u'127.0.0.1', 80)),
|
|
]
|
|
with mock.patch.dict('sys.modules', {
|
|
'eventlet.support.greendns': fake_dns}):
|
|
self.assertFalse(utils.is_eventlet_bug105())
|
|
fake_dns.getaddrinfo.assert_called_once_with('::1', 80)
|
|
|
|
@ddt.data(['ssh', '-D', 'my_name@name_of_remote_computer'],
|
|
['echo', '"quoted arg with space"'],
|
|
['echo', "'quoted arg with space'"])
|
|
def test_check_ssh_injection(self, cmd):
|
|
cmd_list = cmd
|
|
self.assertIsNone(utils.check_ssh_injection(cmd_list))
|
|
|
|
@ddt.data(['ssh', 'my_name@ name_of_remote_computer'],
|
|
['||', 'my_name@name_of_remote_computer'],
|
|
['cmd', 'virus;ls'],
|
|
['cmd', '"arg\"withunescaped"'],
|
|
['cmd', 'virus;"quoted argument"'],
|
|
['echo', '"quoted argument";rm -rf'],
|
|
['echo', "'quoted argument `rm -rf`'"],
|
|
['echo', '"quoted";virus;"quoted"'],
|
|
['echo', '"quoted";virus;\'quoted\''])
|
|
def test_check_ssh_injection_on_error0(self, cmd):
|
|
self.assertRaises(exception.SSHInjectionThreat,
|
|
utils.check_ssh_injection, cmd)
|
|
|
|
@ddt.data(
|
|
(("3G", "G"), 3.0),
|
|
(("4.1G", "G"), 4.1),
|
|
(("5.23G", "G"), 5.23),
|
|
(("9728M", "G"), 9.5),
|
|
(("8192K", "G"), 0.0078125),
|
|
(("2T", "G"), 2048.0),
|
|
(("2.1T", "G"), 2150.4),
|
|
(("3P", "G"), 3145728.0),
|
|
(("3.4P", "G"), 3565158.4),
|
|
(("9728M", "M"), 9728.0),
|
|
(("9728.2381T", "T"), 9728.2381),
|
|
(("0", "G"), 0.0),
|
|
(("512", "M"), 0.00048828125),
|
|
(("2097152.", "M"), 2.0),
|
|
((".1024", "K"), 0.0001),
|
|
(("2048G", "T"), 2.0),
|
|
(("65536G", "P"), 0.0625),
|
|
)
|
|
@ddt.unpack
|
|
def test_translate_string_size_to_float_positive(self, request, expected):
|
|
actual = utils.translate_string_size_to_float(*request)
|
|
self.assertEqual(expected, actual)
|
|
|
|
@ddt.data(
|
|
(None, "G"),
|
|
("fake", "G"),
|
|
("1fake", "G"),
|
|
("2GG", "G"),
|
|
("1KM", "G"),
|
|
("K1M", "G"),
|
|
("M1K", "G"),
|
|
("", "G"),
|
|
(23, "G"),
|
|
(23.0, "G"),
|
|
)
|
|
@ddt.unpack
|
|
def test_translate_string_size_to_float_negative(self, string, multiplier):
|
|
actual = utils.translate_string_size_to_float(string, multiplier)
|
|
self.assertIsNone(actual)
|
|
|
|
|
|
class MonkeyPatchTestCase(test.TestCase):
|
|
"""Unit test for utils.monkey_patch()."""
|
|
def setUp(self):
|
|
super(MonkeyPatchTestCase, self).setUp()
|
|
self.example_package = 'manila.tests.monkey_patch_example.'
|
|
self.flags(
|
|
monkey_patch=True,
|
|
monkey_patch_modules=[self.example_package + 'example_a' + ':'
|
|
+ self.example_package
|
|
+ 'example_decorator'])
|
|
|
|
def test_monkey_patch(self):
|
|
utils.monkey_patch()
|
|
manila.tests.monkey_patch_example.CALLED_FUNCTION = []
|
|
from manila.tests.monkey_patch_example import example_a
|
|
from manila.tests.monkey_patch_example import example_b
|
|
|
|
self.assertEqual('Example function', example_a.example_function_a())
|
|
exampleA = example_a.ExampleClassA()
|
|
exampleA.example_method()
|
|
ret_a = exampleA.example_method_add(3, 5)
|
|
self.assertEqual(8, ret_a)
|
|
|
|
self.assertEqual('Example function', example_b.example_function_b())
|
|
exampleB = example_b.ExampleClassB()
|
|
exampleB.example_method()
|
|
ret_b = exampleB.example_method_add(3, 5)
|
|
|
|
self.assertEqual(8, ret_b)
|
|
package_a = self.example_package + 'example_a.'
|
|
self.assertTrue(package_a + 'example_function_a'
|
|
in manila.tests.monkey_patch_example.CALLED_FUNCTION)
|
|
|
|
self.assertTrue(package_a + 'ExampleClassA.example_method'
|
|
in manila.tests.monkey_patch_example.CALLED_FUNCTION)
|
|
self.assertTrue(package_a + 'ExampleClassA.example_method_add'
|
|
in manila.tests.monkey_patch_example.CALLED_FUNCTION)
|
|
package_b = self.example_package + 'example_b.'
|
|
self.assertFalse(package_b + 'example_function_b'
|
|
in manila.tests.monkey_patch_example.CALLED_FUNCTION)
|
|
self.assertFalse(package_b + 'ExampleClassB.example_method'
|
|
in manila.tests.monkey_patch_example.CALLED_FUNCTION)
|
|
self.assertFalse(package_b + 'ExampleClassB.example_method_add'
|
|
in manila.tests.monkey_patch_example.CALLED_FUNCTION)
|
|
|
|
|
|
class FakeSSHClient(object):
|
|
|
|
def __init__(self):
|
|
self.id = uuid.uuid4()
|
|
self.transport = FakeTransport()
|
|
|
|
def set_missing_host_key_policy(self, policy):
|
|
pass
|
|
|
|
def connect(self, ip, port=22, username=None, password=None,
|
|
key_filename=None, look_for_keys=None, timeout=10):
|
|
pass
|
|
|
|
def get_transport(self):
|
|
return self.transport
|
|
|
|
def close(self):
|
|
pass
|
|
|
|
def __call__(self, *args, **kwargs):
|
|
pass
|
|
|
|
|
|
class FakeSock(object):
|
|
def settimeout(self, timeout):
|
|
pass
|
|
|
|
|
|
class FakeTransport(object):
|
|
|
|
def __init__(self):
|
|
self.active = True
|
|
self.sock = FakeSock()
|
|
|
|
def set_keepalive(self, timeout):
|
|
pass
|
|
|
|
def is_active(self):
|
|
return self.active
|
|
|
|
|
|
class SSHPoolTestCase(test.TestCase):
|
|
"""Unit test for SSH Connection Pool."""
|
|
|
|
def test_single_ssh_connect(self):
|
|
with mock.patch.object(paramiko, "SSHClient",
|
|
mock.Mock(return_value=FakeSSHClient())):
|
|
sshpool = utils.SSHPool("127.0.0.1", 22, 10, "test",
|
|
password="test", min_size=1, max_size=1)
|
|
with sshpool.item() as ssh:
|
|
first_id = ssh.id
|
|
|
|
with sshpool.item() as ssh:
|
|
second_id = ssh.id
|
|
|
|
self.assertEqual(first_id, second_id)
|
|
paramiko.SSHClient.assert_called_once_with()
|
|
|
|
def test_create_ssh_with_password(self):
|
|
fake_ssh_client = mock.Mock()
|
|
ssh_pool = utils.SSHPool("127.0.0.1", 22, 10, "test",
|
|
password="test")
|
|
with mock.patch.object(paramiko, "SSHClient",
|
|
return_value=fake_ssh_client):
|
|
ssh_pool.create()
|
|
|
|
fake_ssh_client.connect.assert_called_once_with(
|
|
"127.0.0.1", port=22, username="test",
|
|
password="test", key_filename=None, look_for_keys=False,
|
|
timeout=10)
|
|
|
|
def test_create_ssh_with_key(self):
|
|
path_to_private_key = "/fakepath/to/privatekey"
|
|
fake_ssh_client = mock.Mock()
|
|
ssh_pool = utils.SSHPool("127.0.0.1", 22, 10, "test",
|
|
privatekey="/fakepath/to/privatekey")
|
|
with mock.patch.object(paramiko, "SSHClient",
|
|
return_value=fake_ssh_client):
|
|
ssh_pool.create()
|
|
fake_ssh_client.connect.assert_called_once_with(
|
|
"127.0.0.1", port=22, username="test", password=None,
|
|
key_filename=path_to_private_key, look_for_keys=False,
|
|
timeout=10)
|
|
|
|
def test_create_ssh_with_nothing(self):
|
|
fake_ssh_client = mock.Mock()
|
|
ssh_pool = utils.SSHPool("127.0.0.1", 22, 10, "test")
|
|
with mock.patch.object(paramiko, "SSHClient",
|
|
return_value=fake_ssh_client):
|
|
ssh_pool.create()
|
|
fake_ssh_client.connect.assert_called_once_with(
|
|
"127.0.0.1", port=22, username="test", password=None,
|
|
key_filename=None, look_for_keys=True,
|
|
timeout=10)
|
|
|
|
def test_create_ssh_error_connecting(self):
|
|
attrs = {'connect.side_effect': paramiko.SSHException, }
|
|
fake_ssh_client = mock.Mock(**attrs)
|
|
ssh_pool = utils.SSHPool("127.0.0.1", 22, 10, "test")
|
|
with mock.patch.object(paramiko, "SSHClient",
|
|
return_value=fake_ssh_client):
|
|
self.assertRaises(exception.SSHException, ssh_pool.create)
|
|
fake_ssh_client.connect.assert_called_once_with(
|
|
"127.0.0.1", port=22, username="test", password=None,
|
|
key_filename=None, look_for_keys=True,
|
|
timeout=10)
|
|
|
|
def test_closed_reopend_ssh_connections(self):
|
|
with mock.patch.object(paramiko, "SSHClient",
|
|
mock.Mock(return_value=FakeSSHClient())):
|
|
sshpool = utils.SSHPool("127.0.0.1", 22, 10, "test",
|
|
password="test", min_size=1, max_size=2)
|
|
with sshpool.item() as ssh:
|
|
first_id = ssh.id
|
|
with sshpool.item() as ssh:
|
|
second_id = ssh.id
|
|
# Close the connection and test for a new connection
|
|
ssh.get_transport().active = False
|
|
self.assertEqual(first_id, second_id)
|
|
paramiko.SSHClient.assert_called_once_with()
|
|
|
|
# Expected new ssh pool
|
|
with mock.patch.object(paramiko, "SSHClient",
|
|
mock.Mock(return_value=FakeSSHClient())):
|
|
with sshpool.item() as ssh:
|
|
third_id = ssh.id
|
|
self.assertNotEqual(first_id, third_id)
|
|
paramiko.SSHClient.assert_called_once_with()
|
|
|
|
|
|
class CidrToNetmaskTestCase(test.TestCase):
|
|
"""Unit test for cidr to netmask."""
|
|
|
|
def test_cidr_to_netmask_01(self):
|
|
cidr = '10.0.0.0/0'
|
|
expected_netmask = '0.0.0.0'
|
|
result = utils.cidr_to_netmask(cidr)
|
|
self.assertEqual(expected_netmask, result)
|
|
|
|
def test_cidr_to_netmask_02(self):
|
|
cidr = '10.0.0.0/24'
|
|
expected_netmask = '255.255.255.0'
|
|
result = utils.cidr_to_netmask(cidr)
|
|
self.assertEqual(expected_netmask, result)
|
|
|
|
def test_cidr_to_netmask_03(self):
|
|
cidr = '10.0.0.0/5'
|
|
expected_netmask = '248.0.0.0'
|
|
result = utils.cidr_to_netmask(cidr)
|
|
self.assertEqual(expected_netmask, result)
|
|
|
|
def test_cidr_to_netmask_04(self):
|
|
cidr = '10.0.0.0/32'
|
|
expected_netmask = '255.255.255.255'
|
|
result = utils.cidr_to_netmask(cidr)
|
|
self.assertEqual(expected_netmask, result)
|
|
|
|
def test_cidr_to_netmask_05(self):
|
|
cidr = '10.0.0.1'
|
|
expected_netmask = '255.255.255.255'
|
|
result = utils.cidr_to_netmask(cidr)
|
|
self.assertEqual(expected_netmask, result)
|
|
|
|
def test_cidr_to_netmask_invalid_01(self):
|
|
cidr = '10.0.0.0/33'
|
|
self.assertRaises(exception.InvalidInput, utils.cidr_to_netmask, cidr)
|
|
|
|
def test_cidr_to_netmask_invalid_02(self):
|
|
cidr = ''
|
|
self.assertRaises(exception.InvalidInput, utils.cidr_to_netmask, cidr)
|
|
|
|
def test_cidr_to_netmask_invalid_03(self):
|
|
cidr = '10.0.0.0/33'
|
|
self.assertRaises(exception.InvalidInput, utils.cidr_to_netmask, cidr)
|
|
|
|
def test_cidr_to_netmask_invalid_04(self):
|
|
cidr = '10.0.0.555/33'
|
|
self.assertRaises(exception.InvalidInput, utils.cidr_to_netmask, cidr)
|
|
|
|
|
|
@ddt.ddt
|
|
class IsValidIPVersion(test.TestCase):
|
|
"""Test suite for function 'is_valid_ip_address'."""
|
|
|
|
@ddt.data('0.0.0.0', '255.255.255.255', '192.168.0.1')
|
|
def test_valid_v4(self, addr):
|
|
for vers in (4, '4'):
|
|
self.assertTrue(utils.is_valid_ip_address(addr, vers))
|
|
|
|
@ddt.data(
|
|
'2001:cdba:0000:0000:0000:0000:3257:9652',
|
|
'2001:cdba:0:0:0:0:3257:9652',
|
|
'2001:cdba::3257:9652')
|
|
def test_valid_v6(self, addr):
|
|
for vers in (6, '6'):
|
|
self.assertTrue(utils.is_valid_ip_address(addr, vers))
|
|
|
|
@ddt.data(
|
|
{'addr': '1.1.1.1', 'vers': 3},
|
|
{'addr': '1.1.1.1', 'vers': 5},
|
|
{'addr': '1.1.1.1', 'vers': 7},
|
|
{'addr': '2001:cdba::3257:9652', 'vers': '3'},
|
|
{'addr': '2001:cdba::3257:9652', 'vers': '5'},
|
|
{'addr': '2001:cdba::3257:9652', 'vers': '7'})
|
|
@ddt.unpack
|
|
def test_provided_invalid_version(self, addr, vers):
|
|
self.assertRaises(
|
|
exception.ManilaException, utils.is_valid_ip_address, addr, vers)
|
|
|
|
def test_provided_none_version(self):
|
|
self.assertRaises(TypeError, utils.is_valid_ip_address, '', None)
|
|
|
|
@ddt.data(None, 'fake', '1.1.1.1')
|
|
def test_provided_invalid_v6_address(self, addr):
|
|
for vers in (6, '6'):
|
|
self.assertFalse(utils.is_valid_ip_address(addr, vers))
|
|
|
|
@ddt.data(None, 'fake', '255.255.255.256', '2001:cdba::3257:9652', '')
|
|
def test_provided_invalid_v4_address(self, addr):
|
|
for vers in (4, '4'):
|
|
self.assertFalse(utils.is_valid_ip_address(addr, vers))
|
|
|
|
|
|
class Comparable(utils.ComparableMixin):
|
|
def __init__(self, value):
|
|
self.value = value
|
|
|
|
def _cmpkey(self):
|
|
return self.value
|
|
|
|
|
|
class TestComparableMixin(test.TestCase):
|
|
|
|
def setUp(self):
|
|
super(TestComparableMixin, self).setUp()
|
|
self.one = Comparable(1)
|
|
self.two = Comparable(2)
|
|
|
|
def test_lt(self):
|
|
self.assertTrue(self.one < self.two)
|
|
self.assertFalse(self.two < self.one)
|
|
self.assertFalse(self.one < self.one)
|
|
|
|
def test_le(self):
|
|
self.assertTrue(self.one <= self.two)
|
|
self.assertFalse(self.two <= self.one)
|
|
self.assertTrue(self.one <= self.one)
|
|
|
|
def test_eq(self):
|
|
self.assertFalse(self.one == self.two)
|
|
self.assertFalse(self.two == self.one)
|
|
self.assertTrue(self.one == self.one)
|
|
|
|
def test_ge(self):
|
|
self.assertFalse(self.one >= self.two)
|
|
self.assertTrue(self.two >= self.one)
|
|
self.assertTrue(self.one >= self.one)
|
|
|
|
def test_gt(self):
|
|
self.assertFalse(self.one > self.two)
|
|
self.assertTrue(self.two > self.one)
|
|
self.assertFalse(self.one > self.one)
|
|
|
|
def test_ne(self):
|
|
self.assertTrue(self.one != self.two)
|
|
self.assertTrue(self.two != self.one)
|
|
self.assertFalse(self.one != self.one)
|
|
|
|
def test_compare(self):
|
|
self.assertEqual(NotImplemented,
|
|
self.one._compare(1, self.one._cmpkey))
|
|
|
|
|
|
class TestRetryDecorator(test.TestCase):
|
|
def setUp(self):
|
|
super(TestRetryDecorator, self).setUp()
|
|
|
|
def test_no_retry_required(self):
|
|
self.counter = 0
|
|
|
|
with mock.patch.object(time, 'sleep') as mock_sleep:
|
|
@utils.retry(exception.ManilaException,
|
|
interval=2,
|
|
retries=3,
|
|
backoff_rate=2)
|
|
def succeeds():
|
|
self.counter += 1
|
|
return 'success'
|
|
|
|
ret = succeeds()
|
|
self.assertFalse(mock_sleep.called)
|
|
self.assertEqual('success', ret)
|
|
self.assertEqual(1, self.counter)
|
|
|
|
def test_no_retry_required_random(self):
|
|
self.counter = 0
|
|
|
|
with mock.patch.object(time, 'sleep') as mock_sleep:
|
|
@utils.retry(exception.ManilaException,
|
|
interval=2,
|
|
retries=3,
|
|
backoff_rate=2,
|
|
wait_random=True)
|
|
def succeeds():
|
|
self.counter += 1
|
|
return 'success'
|
|
|
|
ret = succeeds()
|
|
self.assertFalse(mock_sleep.called)
|
|
self.assertEqual('success', ret)
|
|
self.assertEqual(1, self.counter)
|
|
|
|
def test_retries_once_random(self):
|
|
self.counter = 0
|
|
interval = 2
|
|
backoff_rate = 2
|
|
retries = 3
|
|
|
|
with mock.patch.object(time, 'sleep') as mock_sleep:
|
|
@utils.retry(exception.ManilaException,
|
|
interval,
|
|
retries,
|
|
backoff_rate,
|
|
wait_random=True)
|
|
def fails_once():
|
|
self.counter += 1
|
|
if self.counter < 2:
|
|
raise exception.ManilaException(data='fake')
|
|
else:
|
|
return 'success'
|
|
|
|
ret = fails_once()
|
|
self.assertEqual('success', ret)
|
|
self.assertEqual(2, self.counter)
|
|
self.assertEqual(1, mock_sleep.call_count)
|
|
self.assertTrue(mock_sleep.called)
|
|
|
|
def test_retries_once(self):
|
|
self.counter = 0
|
|
interval = 2
|
|
backoff_rate = 2
|
|
retries = 3
|
|
|
|
with mock.patch.object(time, 'sleep') as mock_sleep:
|
|
@utils.retry(exception.ManilaException,
|
|
interval,
|
|
retries,
|
|
backoff_rate)
|
|
def fails_once():
|
|
self.counter += 1
|
|
if self.counter < 2:
|
|
raise exception.ManilaException(data='fake')
|
|
else:
|
|
return 'success'
|
|
|
|
ret = fails_once()
|
|
self.assertEqual('success', ret)
|
|
self.assertEqual(2, self.counter)
|
|
self.assertEqual(1, mock_sleep.call_count)
|
|
mock_sleep.assert_called_with(interval * backoff_rate)
|
|
|
|
def test_limit_is_reached(self):
|
|
self.counter = 0
|
|
retries = 3
|
|
interval = 2
|
|
backoff_rate = 4
|
|
|
|
with mock.patch.object(time, 'sleep') as mock_sleep:
|
|
@utils.retry(exception.ManilaException,
|
|
interval,
|
|
retries,
|
|
backoff_rate)
|
|
def always_fails():
|
|
self.counter += 1
|
|
raise exception.ManilaException(data='fake')
|
|
|
|
self.assertRaises(exception.ManilaException,
|
|
always_fails)
|
|
self.assertEqual(retries, self.counter)
|
|
|
|
expected_sleep_arg = []
|
|
|
|
for i in range(retries):
|
|
if i > 0:
|
|
interval *= backoff_rate
|
|
expected_sleep_arg.append(float(interval))
|
|
|
|
mock_sleep.assert_has_calls(map(mock.call, expected_sleep_arg))
|
|
|
|
def test_wrong_exception_no_retry(self):
|
|
|
|
with mock.patch.object(time, 'sleep') as mock_sleep:
|
|
@utils.retry(exception.ManilaException)
|
|
def raise_unexpected_error():
|
|
raise ValueError("value error")
|
|
|
|
self.assertRaises(ValueError, raise_unexpected_error)
|
|
self.assertFalse(mock_sleep.called)
|
|
|
|
|
|
@ddt.ddt
|
|
class RequireDriverInitializedTestCase(test.TestCase):
|
|
|
|
@ddt.data(True, False)
|
|
def test_require_driver_initialized(self, initialized):
|
|
|
|
class FakeDriver(object):
|
|
@property
|
|
def initialized(self):
|
|
return initialized
|
|
|
|
class FakeException(Exception):
|
|
pass
|
|
|
|
class FakeManager(object):
|
|
driver = FakeDriver()
|
|
|
|
@utils.require_driver_initialized
|
|
def call_me(self):
|
|
raise FakeException(
|
|
"Should be raised only if manager.driver.initialized "
|
|
"('%s') is equal to 'True'." % initialized)
|
|
|
|
if initialized:
|
|
expected_exception = FakeException
|
|
else:
|
|
expected_exception = exception.DriverNotInitialized
|
|
|
|
self.assertRaises(expected_exception, FakeManager().call_me)
|
|
|
|
|
|
@ddt.ddt
|
|
class ShareMigrationHelperTestCase(test.TestCase):
|
|
"""Tests DataMigrationHelper."""
|
|
|
|
def setUp(self):
|
|
super(ShareMigrationHelperTestCase, self).setUp()
|
|
self.context = context.get_admin_context()
|
|
|
|
def test_wait_for_access_update(self):
|
|
sid = 1
|
|
fake_share_instances = [
|
|
{'id': sid, 'access_rules_status': constants.STATUS_OUT_OF_SYNC},
|
|
{'id': sid, 'access_rules_status': constants.STATUS_ACTIVE},
|
|
]
|
|
|
|
self.mock_object(time, 'sleep')
|
|
self.mock_object(db, 'share_instance_get',
|
|
mock.Mock(side_effect=fake_share_instances))
|
|
|
|
utils.wait_for_access_update(self.context, db,
|
|
fake_share_instances[0], 1)
|
|
|
|
db.share_instance_get.assert_has_calls(
|
|
[mock.call(mock.ANY, sid), mock.call(mock.ANY, sid)]
|
|
)
|
|
time.sleep.assert_called_once_with(1)
|
|
|
|
@ddt.data(
|
|
(
|
|
{'id': '1', 'access_rules_status': constants.STATUS_ERROR},
|
|
exception.ShareMigrationFailed
|
|
),
|
|
(
|
|
{'id': '1', 'access_rules_status': constants.STATUS_OUT_OF_SYNC},
|
|
exception.ShareMigrationFailed
|
|
),
|
|
)
|
|
@ddt.unpack
|
|
def test_wait_for_access_update_invalid(self, fake_instance, expected_exc):
|
|
self.mock_object(time, 'sleep')
|
|
self.mock_object(db, 'share_instance_get',
|
|
mock.Mock(return_value=fake_instance))
|
|
|
|
now = time.time()
|
|
timeout = now + 100
|
|
|
|
self.mock_object(time, 'time',
|
|
mock.Mock(side_effect=[now, timeout]))
|
|
|
|
self.assertRaises(expected_exc,
|
|
utils.wait_for_access_update, self.context,
|
|
db, fake_instance, 1)
|