Remove unused functions in utils
Change-Id: I83057f5bcb61c566407f11c6f5304de05455a563
This commit is contained in:
parent
34a1879e6b
commit
6dccb17ace
@ -16,10 +16,7 @@
|
||||
|
||||
import datetime
|
||||
import errno
|
||||
import os
|
||||
import os.path
|
||||
import socket
|
||||
import tempfile
|
||||
import time
|
||||
import uuid
|
||||
|
||||
@ -28,7 +25,6 @@ import mock
|
||||
from oslo_config import cfg
|
||||
from oslo_utils import timeutils
|
||||
import paramiko
|
||||
from six.moves import builtins
|
||||
|
||||
import manila
|
||||
from manila.common import constants
|
||||
@ -41,221 +37,8 @@ from manila import utils
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
class GetFromPathTestCase(test.TestCase):
|
||||
def test_tolerates_nones(self):
|
||||
f = utils.get_from_path
|
||||
|
||||
input = []
|
||||
self.assertEqual([], f(input, "a"))
|
||||
self.assertEqual([], f(input, "a/b"))
|
||||
self.assertEqual([], f(input, "a/b/c"))
|
||||
|
||||
input = [None]
|
||||
self.assertEqual([], f(input, "a"))
|
||||
self.assertEqual([], f(input, "a/b"))
|
||||
self.assertEqual([], f(input, "a/b/c"))
|
||||
|
||||
input = [{'a': None}]
|
||||
self.assertEqual([], f(input, "a"))
|
||||
self.assertEqual([], f(input, "a/b"))
|
||||
self.assertEqual([], f(input, "a/b/c"))
|
||||
|
||||
input = [{'a': {'b': None}}]
|
||||
self.assertEqual([{'b': None}], f(input, "a"))
|
||||
self.assertEqual([], f(input, "a/b"))
|
||||
self.assertEqual([], f(input, "a/b/c"))
|
||||
|
||||
input = [{'a': {'b': {'c': None}}}]
|
||||
self.assertEqual([{'b': {'c': None}}], f(input, "a"))
|
||||
self.assertEqual([{'c': None}], f(input, "a/b"))
|
||||
self.assertEqual([], f(input, "a/b/c"))
|
||||
|
||||
input = [{'a': {'b': {'c': None}}}, {'a': None}]
|
||||
self.assertEqual([{'b': {'c': None}}], f(input, "a"))
|
||||
self.assertEqual([{'c': None}], f(input, "a/b"))
|
||||
self.assertEqual([], f(input, "a/b/c"))
|
||||
|
||||
input = [{'a': {'b': {'c': None}}}, {'a': {'b': None}}]
|
||||
self.assertEqual([{'b': {'c': None}}, {'b': None}], f(input, "a"))
|
||||
self.assertEqual([{'c': None}], f(input, "a/b"))
|
||||
self.assertEqual([], f(input, "a/b/c"))
|
||||
|
||||
def test_does_select(self):
|
||||
f = utils.get_from_path
|
||||
|
||||
input = [{'a': 'a_1'}]
|
||||
self.assertEqual(['a_1'], f(input, "a"))
|
||||
self.assertEqual([], f(input, "a/b"))
|
||||
self.assertEqual([], f(input, "a/b/c"))
|
||||
|
||||
input = [{'a': {'b': 'b_1'}}]
|
||||
self.assertEqual([{'b': 'b_1'}], f(input, "a"))
|
||||
self.assertEqual(['b_1'], f(input, "a/b"))
|
||||
self.assertEqual([], f(input, "a/b/c"))
|
||||
|
||||
input = [{'a': {'b': {'c': 'c_1'}}}]
|
||||
self.assertEqual([{'b': {'c': 'c_1'}}], f(input, "a"))
|
||||
self.assertEqual([{'c': 'c_1'}], f(input, "a/b"))
|
||||
self.assertEqual(['c_1'], f(input, "a/b/c"))
|
||||
|
||||
input = [{'a': {'b': {'c': 'c_1'}}}, {'a': None}]
|
||||
self.assertEqual([{'b': {'c': 'c_1'}}], f(input, "a"))
|
||||
self.assertEqual([{'c': 'c_1'}], f(input, "a/b"))
|
||||
self.assertEqual(['c_1'], f(input, "a/b/c"))
|
||||
|
||||
input = [{'a': {'b': {'c': 'c_1'}}},
|
||||
{'a': {'b': None}}]
|
||||
self.assertEqual([{'b': {'c': 'c_1'}}, {'b': None}], f(input, "a"))
|
||||
self.assertEqual([{'c': 'c_1'}], f(input, "a/b"))
|
||||
self.assertEqual(['c_1'], f(input, "a/b/c"))
|
||||
|
||||
input = [{'a': {'b': {'c': 'c_1'}}},
|
||||
{'a': {'b': {'c': 'c_2'}}}]
|
||||
self.assertEqual([{'b': {'c': 'c_1'}}, {'b': {'c': 'c_2'}}],
|
||||
f(input, "a"))
|
||||
self.assertEqual([{'c': 'c_1'}, {'c': 'c_2'}], f(input, "a/b"))
|
||||
self.assertEqual(['c_1', 'c_2'], f(input, "a/b/c"))
|
||||
|
||||
self.assertEqual([], f(input, "a/b/c/d"))
|
||||
self.assertEqual([], f(input, "c/a/b/d"))
|
||||
self.assertEqual([], f(input, "i/r/t"))
|
||||
|
||||
def test_flattens_lists(self):
|
||||
f = utils.get_from_path
|
||||
|
||||
input = [{'a': [1, 2, 3]}]
|
||||
self.assertEqual([1, 2, 3], f(input, "a"))
|
||||
self.assertEqual([], f(input, "a/b"))
|
||||
self.assertEqual([], f(input, "a/b/c"))
|
||||
|
||||
input = [{'a': {'b': [1, 2, 3]}}]
|
||||
self.assertEqual([{'b': [1, 2, 3]}], f(input, "a"))
|
||||
self.assertEqual([1, 2, 3], f(input, "a/b"))
|
||||
self.assertEqual([], f(input, "a/b/c"))
|
||||
|
||||
input = [{'a': {'b': [1, 2, 3]}}, {'a': {'b': [4, 5, 6]}}]
|
||||
self.assertEqual([1, 2, 3, 4, 5, 6], f(input, "a/b"))
|
||||
self.assertEqual([], f(input, "a/b/c"))
|
||||
|
||||
input = [{'a': [{'b': [1, 2, 3]}, {'b': [4, 5, 6]}]}]
|
||||
self.assertEqual([1, 2, 3, 4, 5, 6], f(input, "a/b"))
|
||||
self.assertEqual([], f(input, "a/b/c"))
|
||||
|
||||
input = [{'a': [1, 2, {'b': 'b_1'}]}]
|
||||
self.assertEqual([1, 2, {'b': 'b_1'}], f(input, "a"))
|
||||
self.assertEqual(['b_1'], f(input, "a/b"))
|
||||
|
||||
def test_bad_xpath(self):
|
||||
f = utils.get_from_path
|
||||
|
||||
self.assertRaises(exception.Error, f, [], None)
|
||||
self.assertRaises(exception.Error, f, [], "")
|
||||
self.assertRaises(exception.Error, f, [], "/")
|
||||
self.assertRaises(exception.Error, f, [], "/a")
|
||||
self.assertRaises(exception.Error, f, [], "/a/")
|
||||
self.assertRaises(exception.Error, f, [], "//")
|
||||
self.assertRaises(exception.Error, f, [], "//a")
|
||||
self.assertRaises(exception.Error, f, [], "a//a")
|
||||
self.assertRaises(exception.Error, f, [], "a//a/")
|
||||
self.assertRaises(exception.Error, f, [], "a/a/")
|
||||
|
||||
def test_real_failure1(self):
|
||||
# Real world failure case...
|
||||
# We weren't coping when the input was a Dictionary instead of a List
|
||||
# This led to test_accepts_dictionaries
|
||||
f = utils.get_from_path
|
||||
|
||||
inst = {'fixed_ip': {'floating_ips': [{'address': '1.2.3.4'}],
|
||||
'address': '192.168.0.3'},
|
||||
'hostname': ''}
|
||||
|
||||
private_ips = f(inst, 'fixed_ip/address')
|
||||
public_ips = f(inst, 'fixed_ip/floating_ips/address')
|
||||
self.assertEqual(['192.168.0.3'], private_ips)
|
||||
self.assertEqual(['1.2.3.4'], public_ips)
|
||||
|
||||
def test_accepts_dictionaries(self):
|
||||
f = utils.get_from_path
|
||||
|
||||
input = {'a': [1, 2, 3]}
|
||||
self.assertEqual([1, 2, 3], f(input, "a"))
|
||||
self.assertEqual([], f(input, "a/b"))
|
||||
self.assertEqual([], f(input, "a/b/c"))
|
||||
|
||||
input = {'a': {'b': [1, 2, 3]}}
|
||||
self.assertEqual([{'b': [1, 2, 3]}], f(input, "a"))
|
||||
self.assertEqual([1, 2, 3], f(input, "a/b"))
|
||||
self.assertEqual([], f(input, "a/b/c"))
|
||||
|
||||
input = {'a': [{'b': [1, 2, 3]}, {'b': [4, 5, 6]}]}
|
||||
self.assertEqual([1, 2, 3, 4, 5, 6], f(input, "a/b"))
|
||||
self.assertEqual([], f(input, "a/b/c"))
|
||||
|
||||
input = {'a': [1, 2, {'b': 'b_1'}]}
|
||||
self.assertEqual([1, 2, {'b': 'b_1'}], f(input, "a"))
|
||||
self.assertEqual(['b_1'], f(input, "a/b"))
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
class GenericUtilsTestCase(test.TestCase):
|
||||
def test_read_cached_file(self):
|
||||
cache_data = {"data": 1123, "mtime": 1}
|
||||
with mock.patch.object(os.path, "getmtime", mock.Mock(return_value=1)):
|
||||
data = utils.read_cached_file("/this/is/a/fake", cache_data)
|
||||
self.assertEqual(cache_data["data"], data)
|
||||
os.path.getmtime.assert_called_once_with("/this/is/a/fake")
|
||||
|
||||
def test_read_modified_cached_file(self):
|
||||
with mock.patch.object(os.path, "getmtime", mock.Mock(return_value=2)):
|
||||
fake_contents = "lorem ipsum"
|
||||
fake_file = mock.Mock()
|
||||
fake_file.read = mock.Mock(return_value=fake_contents)
|
||||
fake_context_manager = mock.Mock()
|
||||
fake_context_manager.__enter__ = mock.Mock(return_value=fake_file)
|
||||
fake_context_manager.__exit__ = mock.Mock()
|
||||
with mock.patch.object(
|
||||
builtins, 'open',
|
||||
mock.Mock(return_value=fake_context_manager)):
|
||||
cache_data = {"data": 1123, "mtime": 1}
|
||||
self.reload_called = False
|
||||
|
||||
def test_reload(reloaded_data):
|
||||
self.assertEqual(fake_contents, reloaded_data)
|
||||
self.reload_called = True
|
||||
|
||||
data = utils.read_cached_file("/this/is/a/fake",
|
||||
cache_data,
|
||||
reload_func=test_reload)
|
||||
self.assertEqual(fake_contents, data)
|
||||
self.assertTrue(self.reload_called)
|
||||
fake_file.read.assert_called_once_with()
|
||||
fake_context_manager.__enter__.assert_any_call()
|
||||
builtins.open.assert_called_once_with("/this/is/a/fake")
|
||||
os.path.getmtime.assert_called_once_with("/this/is/a/fake")
|
||||
|
||||
def test_read_file_as_root(self):
|
||||
def fake_execute(*args, **kwargs):
|
||||
if args[1] == 'bad':
|
||||
raise exception.ProcessExecutionError
|
||||
return 'fakecontents', None
|
||||
|
||||
self.mock_object(utils, 'execute', fake_execute)
|
||||
contents = utils.read_file_as_root('good')
|
||||
self.assertEqual('fakecontents', contents)
|
||||
self.assertRaises(exception.FileNotFound,
|
||||
utils.read_file_as_root, 'bad')
|
||||
|
||||
def test_temporary_chown(self):
|
||||
def fake_execute(*args, **kwargs):
|
||||
if args[0] == 'chown':
|
||||
fake_execute.uid = args[1]
|
||||
self.mock_object(utils, 'execute', fake_execute)
|
||||
|
||||
with tempfile.NamedTemporaryFile() as f:
|
||||
with utils.temporary_chown(f.name, owner_uid=2):
|
||||
self.assertEqual(2, fake_execute.uid)
|
||||
self.assertEqual(fake_execute.uid, os.getuid())
|
||||
|
||||
def test_service_is_up(self):
|
||||
fts_func = datetime.datetime.fromtimestamp
|
||||
fake_now = 1000
|
||||
|
100
manila/utils.py
100
manila/utils.py
@ -218,57 +218,6 @@ class LazyPluggable(object):
|
||||
return getattr(backend, key)
|
||||
|
||||
|
||||
def get_from_path(items, path):
|
||||
"""Returns a list of items matching the specified path.
|
||||
|
||||
Takes an XPath-like expression e.g. prop1/prop2/prop3, and for each item
|
||||
in items, looks up items[prop1][prop2][prop3]. Like XPath, if any of the
|
||||
intermediate results are lists it will treat each list item individually.
|
||||
A 'None' in items or any child expressions will be ignored, this function
|
||||
will not throw because of None (anywhere) in items. The returned list
|
||||
will contain no None values.
|
||||
|
||||
"""
|
||||
if path is None:
|
||||
raise exception.Error('Invalid mini_xpath')
|
||||
|
||||
(first_token, sep, remainder) = path.partition('/')
|
||||
|
||||
if first_token == '':
|
||||
raise exception.Error('Invalid mini_xpath')
|
||||
|
||||
results = []
|
||||
|
||||
if items is None:
|
||||
return results
|
||||
|
||||
if not isinstance(items, list):
|
||||
# Wrap single objects in a list
|
||||
items = [items]
|
||||
|
||||
for item in items:
|
||||
if item is None:
|
||||
continue
|
||||
get_method = getattr(item, 'get', None)
|
||||
if get_method is None:
|
||||
continue
|
||||
child = get_method(first_token)
|
||||
if child is None:
|
||||
continue
|
||||
if isinstance(child, list):
|
||||
# Flatten intermediate lists
|
||||
for x in child:
|
||||
results.append(x)
|
||||
else:
|
||||
results.append(child)
|
||||
|
||||
if not sep:
|
||||
# No more tokens
|
||||
return results
|
||||
else:
|
||||
return get_from_path(results, remainder)
|
||||
|
||||
|
||||
def is_eventlet_bug105():
|
||||
"""Check if eventlet support IPv6 addresses.
|
||||
|
||||
@ -341,26 +290,6 @@ def monkey_patch():
|
||||
decorator("%s.%s" % (module, key), func))
|
||||
|
||||
|
||||
def read_cached_file(filename, cache_info, reload_func=None):
|
||||
"""Read from a file if it has been modified.
|
||||
|
||||
:param cache_info: dictionary to hold opaque cache.
|
||||
:param reload_func: optional function to be called with data when
|
||||
file is reloaded due to a modification.
|
||||
|
||||
:returns: data from file
|
||||
|
||||
"""
|
||||
mtime = os.path.getmtime(filename)
|
||||
if not cache_info or mtime != cache_info.get('mtime'):
|
||||
with open(filename) as fap:
|
||||
cache_info['data'] = fap.read()
|
||||
cache_info['mtime'] = mtime
|
||||
if reload_func:
|
||||
reload_func(cache_info['data'])
|
||||
return cache_info['data']
|
||||
|
||||
|
||||
def file_open(*args, **kwargs):
|
||||
"""Open file
|
||||
|
||||
@ -391,35 +320,6 @@ def validate_service_host(context, host):
|
||||
return service
|
||||
|
||||
|
||||
def read_file_as_root(file_path):
|
||||
"""Secure helper to read file as root."""
|
||||
try:
|
||||
out, _err = execute('cat', file_path, run_as_root=True)
|
||||
return out
|
||||
except exception.ProcessExecutionError:
|
||||
raise exception.FileNotFound(file_path=file_path)
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def temporary_chown(path, owner_uid=None):
|
||||
"""Temporarily chown a path.
|
||||
|
||||
:params owner_uid: UID of temporary owner (defaults to current user)
|
||||
"""
|
||||
if owner_uid is None:
|
||||
owner_uid = os.getuid()
|
||||
|
||||
orig_uid = os.stat(path).st_uid
|
||||
|
||||
if orig_uid != owner_uid:
|
||||
execute('chown', owner_uid, path, run_as_root=True)
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
if orig_uid != owner_uid:
|
||||
execute('chown', orig_uid, path, run_as_root=True)
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def tempdir(**kwargs):
|
||||
tmpdir = tempfile.mkdtemp(**kwargs)
|
||||
|
Loading…
Reference in New Issue
Block a user