# Copyright 2012-2013 OpenStack Foundation # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. # import time import uuid import mock from osc_lib import exceptions from osc_lib.tests import fakes from osc_lib.tests import utils as test_utils from osc_lib import utils PASSWORD = "Pa$$w0rd" WASSPORD = "Wa$$p0rd" DROWSSAP = "dr0w$$aP" class FakeOddballResource(fakes.FakeResource): def get(self, attr): """get() is needed for utils.find_resource()""" if attr == 'id': return self.id elif attr == 'name': return self.name else: return None class TestUtils(test_utils.TestCase): def _get_test_items(self): item1 = {'a': 1, 'b': 2} item2 = {'a': 1, 'b': 3} item3 = {'a': 2, 'b': 2} item4 = {'a': 2, 'b': 1} return [item1, item2, item3, item4] def test_find_min_match_no_sort(self): items = self._get_test_items() sort_str = None flair = {} expect_items = items self.assertEqual( expect_items, list(utils.find_min_match(items, sort_str, **flair)), ) def test_find_min_match_no_flair(self): items = self._get_test_items() sort_str = 'b' flair = {} expect_items = [items[3], items[0], items[2], items[1]] self.assertEqual( expect_items, utils.find_min_match(items, sort_str, **flair), ) def test_find_min_match_a2(self): items = self._get_test_items() sort_str = 'b' flair = {'a': 2} expect_items = [items[3], items[2]] self.assertEqual( expect_items, utils.find_min_match(items, sort_str, **flair), ) def test_find_min_match_b2(self): items = self._get_test_items() sort_str = 'b' flair = {'b': 2} expect_items = [items[0], items[2], items[1]] self.assertEqual( expect_items, utils.find_min_match(items, sort_str, **flair), ) def test_find_min_match_b5(self): items = self._get_test_items() sort_str = 'b' flair = {'b': 5} expect_items = [] self.assertEqual( expect_items, utils.find_min_match(items, sort_str, **flair), ) def test_find_min_match_a2_b2(self): items = self._get_test_items() sort_str = 'b' flair = {'a': 2, 'b': 2} expect_items = [items[2]] self.assertEqual( expect_items, utils.find_min_match(items, sort_str, **flair), ) def test_get_password_good(self): with mock.patch("getpass.getpass", return_value=PASSWORD): mock_stdin = mock.Mock() mock_stdin.isatty = mock.Mock() mock_stdin.isatty.return_value = True self.assertEqual(PASSWORD, utils.get_password(mock_stdin)) def test_get_password_bad_once(self): answers = [PASSWORD, WASSPORD, DROWSSAP, DROWSSAP] with mock.patch("getpass.getpass", side_effect=answers): mock_stdin = mock.Mock() mock_stdin.isatty = mock.Mock() mock_stdin.isatty.return_value = True self.assertEqual(DROWSSAP, utils.get_password(mock_stdin)) def test_get_password_no_tty(self): mock_stdin = mock.Mock() mock_stdin.isatty = mock.Mock() mock_stdin.isatty.return_value = False self.assertRaises(exceptions.CommandError, utils.get_password, mock_stdin) def test_get_password_cntrl_d(self): with mock.patch("getpass.getpass", side_effect=EOFError()): mock_stdin = mock.Mock() mock_stdin.isatty = mock.Mock() mock_stdin.isatty.return_value = True self.assertRaises(exceptions.CommandError, utils.get_password, mock_stdin) def test_sort_items_with_one_key(self): items = self._get_test_items() sort_str = 'b' expect_items = [items[3], items[0], items[2], items[1]] self.assertEqual(expect_items, utils.sort_items(items, sort_str)) def test_sort_items_with_multiple_keys(self): items = self._get_test_items() sort_str = 'a,b' expect_items = [items[0], items[1], items[3], items[2]] self.assertEqual(expect_items, utils.sort_items(items, sort_str)) def test_sort_items_all_with_direction(self): items = self._get_test_items() sort_str = 'a:desc,b:desc' expect_items = [items[2], items[3], items[1], items[0]] self.assertEqual(expect_items, utils.sort_items(items, sort_str)) def test_sort_items_some_with_direction(self): items = self._get_test_items() sort_str = 'a,b:desc' expect_items = [items[1], items[0], items[2], items[3]] self.assertEqual(expect_items, utils.sort_items(items, sort_str)) def test_sort_items_with_object(self): item1 = mock.Mock(a=1, b=2) item2 = mock.Mock(a=1, b=3) item3 = mock.Mock(a=2, b=2) item4 = mock.Mock(a=2, b=1) items = [item1, item2, item3, item4] sort_str = 'b,a' expect_items = [item4, item1, item3, item2] self.assertEqual(expect_items, utils.sort_items(items, sort_str)) def test_sort_items_with_empty_key(self): items = self._get_test_items() sort_srt = '' self.assertEqual(items, utils.sort_items(items, sort_srt)) sort_srt = None self.assertEqual(items, utils.sort_items(items, sort_srt)) def test_sort_items_with_invalid_key(self): items = self._get_test_items() sort_str = 'c' self.assertRaises(exceptions.CommandError, utils.sort_items, items, sort_str) def test_sort_items_with_invalid_direction(self): items = self._get_test_items() sort_str = 'a:bad_dir' self.assertRaises(exceptions.CommandError, utils.sort_items, items, sort_str) @mock.patch.object(time, 'sleep') def test_wait_for_delete_ok(self, mock_sleep): # Tests the normal flow that the resource is deleted with a 404 coming # back on the 2nd iteration of the wait loop. resource = mock.MagicMock(status='ACTIVE', progress=None) mock_get = mock.Mock(side_effect=[resource, exceptions.NotFound(404)]) manager = mock.MagicMock(get=mock_get) res_id = str(uuid.uuid4()) callback = mock.Mock() self.assertTrue(utils.wait_for_delete(manager, res_id, callback=callback)) mock_sleep.assert_called_once_with(5) callback.assert_called_once_with(0) @mock.patch.object(time, 'sleep') def test_wait_for_delete_timeout(self, mock_sleep): # Tests that we fail if the resource is not deleted before the timeout. resource = mock.MagicMock(status='ACTIVE') mock_get = mock.Mock(return_value=resource) manager = mock.MagicMock(get=mock_get) res_id = str(uuid.uuid4()) self.assertFalse(utils.wait_for_delete(manager, res_id, sleep_time=1, timeout=1)) mock_sleep.assert_called_once_with(1) @mock.patch.object(time, 'sleep') def test_wait_for_delete_error(self, mock_sleep): # Tests that we fail if the resource goes to error state while waiting. resource = mock.MagicMock(status='ERROR') mock_get = mock.Mock(return_value=resource) manager = mock.MagicMock(get=mock_get) res_id = str(uuid.uuid4()) self.assertFalse(utils.wait_for_delete(manager, res_id)) mock_sleep.assert_not_called() @mock.patch.object(time, 'sleep') def test_wait_for_delete_error_with_overrides(self, mock_sleep): # Tests that we fail if the resource is my_status=failed resource = mock.MagicMock(my_status='FAILED') mock_get = mock.Mock(return_value=resource) manager = mock.MagicMock(get=mock_get) res_id = str(uuid.uuid4()) self.assertFalse(utils.wait_for_delete(manager, res_id, status_field='my_status', error_status=['failed'])) mock_sleep.assert_not_called() @mock.patch.object(time, 'sleep') def test_wait_for_delete_error_with_overrides_exception(self, mock_sleep): # Tests that we succeed if the resource is specific exception mock_get = mock.Mock(side_effect=Exception) manager = mock.MagicMock(get=mock_get) res_id = str(uuid.uuid4()) self.assertTrue(utils.wait_for_delete(manager, res_id, exception_name=['Exception'])) mock_sleep.assert_not_called() @mock.patch.object(time, 'sleep') def test_wait_for_status_ok(self, mock_sleep): # Tests the normal flow that the resource is status=active resource = mock.MagicMock(status='ACTIVE') status_f = mock.Mock(return_value=resource) res_id = str(uuid.uuid4()) self.assertTrue(utils.wait_for_status(status_f, res_id,)) mock_sleep.assert_not_called() @mock.patch.object(time, 'sleep') def test_wait_for_status_ok_with_overrides(self, mock_sleep): # Tests the normal flow that the resource is status=complete resource = mock.MagicMock(my_status='COMPLETE') status_f = mock.Mock(return_value=resource) res_id = str(uuid.uuid4()) self.assertTrue(utils.wait_for_status(status_f, res_id, status_field='my_status', success_status=['complete'])) mock_sleep.assert_not_called() @mock.patch.object(time, 'sleep') def test_wait_for_status_error(self, mock_sleep): # Tests that we fail if the resource is status=error resource = mock.MagicMock(status='ERROR') status_f = mock.Mock(return_value=resource) res_id = str(uuid.uuid4()) self.assertFalse(utils.wait_for_status(status_f, res_id)) mock_sleep.assert_not_called() @mock.patch.object(time, 'sleep') def test_wait_for_status_error_with_overrides(self, mock_sleep): # Tests that we fail if the resource is my_status=failed resource = mock.MagicMock(my_status='FAILED') status_f = mock.Mock(return_value=resource) res_id = str(uuid.uuid4()) self.assertFalse(utils.wait_for_status(status_f, res_id, status_field='my_status', error_status=['failed'])) mock_sleep.assert_not_called() def test_build_kwargs_dict_value_set(self): self.assertEqual({'arg_bla': 'bla'}, utils.build_kwargs_dict('arg_bla', 'bla')) def test_build_kwargs_dict_value_None(self): self.assertEqual({}, utils.build_kwargs_dict('arg_bla', None)) def test_build_kwargs_dict_value_empty_str(self): self.assertEqual({}, utils.build_kwargs_dict('arg_bla', '')) def test_is_ascii_bytes(self): self.assertFalse(utils.is_ascii(b'\xe2')) def test_is_ascii_string(self): self.assertFalse(utils.is_ascii(u'\u2665')) def test_format_size(self): self.assertEqual("999", utils.format_size(999)) self.assertEqual("100K", utils.format_size(100000)) self.assertEqual("2M", utils.format_size(2000000)) self.assertEqual( "16.4M", utils.format_size(16361280) ) self.assertEqual( "1.6G", utils.format_size(1576395005) ) self.assertEqual("0", utils.format_size(None)) def test_backward_compat_col_lister(self): fake_col_headers = ['ID', 'Name', 'Size'] columns = ['Display Name'] column_map = {'Display Name': 'Name'} results = utils.backward_compat_col_lister(fake_col_headers, columns, column_map) self.assertIsInstance(results, list) self.assertIn('Display Name', results) self.assertNotIn('Name', results) self.assertIn('ID', results) self.assertIn('Size', results) def test_backward_compat_col_lister_no_specify_column(self): fake_col_headers = ['ID', 'Name', 'Size'] columns = [] column_map = {'Display Name': 'Name'} results = utils.backward_compat_col_lister(fake_col_headers, columns, column_map) self.assertIsInstance(results, list) self.assertNotIn('Display Name', results) self.assertIn('Name', results) self.assertIn('ID', results) self.assertIn('Size', results) def test_backward_compat_col_lister_with_tuple_headers(self): fake_col_headers = ('ID', 'Name', 'Size') columns = ['Display Name'] column_map = {'Display Name': 'Name'} results = utils.backward_compat_col_lister(fake_col_headers, columns, column_map) self.assertIsInstance(results, list) self.assertIn('Display Name', results) self.assertNotIn('Name', results) self.assertIn('ID', results) self.assertIn('Size', results) def test_backward_compat_col_showone(self): fake_object = {'id': 'fake-id', 'name': 'fake-name', 'size': 'fake-size'} columns = ['display_name'] column_map = {'display_name': 'name'} results = utils.backward_compat_col_showone(fake_object, columns, column_map) self.assertIsInstance(results, dict) self.assertIn('display_name', results) self.assertIn('id', results) self.assertNotIn('name', results) self.assertIn('size', results) def test_backward_compat_col_showone_no_specify_column(self): fake_object = {'id': 'fake-id', 'name': 'fake-name', 'size': 'fake-size'} columns = [] column_map = {'display_name': 'name'} results = utils.backward_compat_col_showone(fake_object, columns, column_map) self.assertIsInstance(results, dict) self.assertNotIn('display_name', results) self.assertIn('id', results) self.assertIn('name', results) self.assertIn('size', results) class NoUniqueMatch(Exception): pass class TestFindResource(test_utils.TestCase): def setUp(self): super(TestFindResource, self).setUp() self.name = 'legos' self.expected = mock.Mock() self.manager = mock.Mock() self.manager.resource_class = mock.Mock() self.manager.resource_class.__name__ = 'lego' def test_find_resource_get_int(self): self.manager.get = mock.Mock(return_value=self.expected) result = utils.find_resource(self.manager, 1) self.assertEqual(self.expected, result) self.manager.get.assert_called_with(1) def test_find_resource_get_int_string(self): self.manager.get = mock.Mock(return_value=self.expected) result = utils.find_resource(self.manager, "2") self.assertEqual(self.expected, result) self.manager.get.assert_called_with("2") def test_find_resource_get_name_and_domain(self): name = 'admin' domain_id = '30524568d64447fbb3fa8b7891c10dd6' # NOTE(stevemar): we need an iterable side-effect because the same # function (manager.get()) is used twice, the first time an exception # will happen, then the result will be found, but only after using # the domain ID as a query arg side_effect = [Exception('Boom!'), self.expected] self.manager.get = mock.Mock(side_effect=side_effect) result = utils.find_resource(self.manager, name, domain_id=domain_id) self.assertEqual(self.expected, result) self.manager.get.assert_called_with(name, domain_id=domain_id) def test_find_resource_get_uuid(self): uuid = '9a0dc2a0-ad0d-11e3-a5e2-0800200c9a66' self.manager.get = mock.Mock(return_value=self.expected) result = utils.find_resource(self.manager, uuid) self.assertEqual(self.expected, result) self.manager.get.assert_called_with(uuid) def test_find_resource_get_whatever(self): self.manager.get = mock.Mock(return_value=self.expected) result = utils.find_resource(self.manager, 'whatever') self.assertEqual(self.expected, result) self.manager.get.assert_called_with('whatever') def test_find_resource_find(self): self.manager.get = mock.Mock(side_effect=Exception('Boom!')) self.manager.find = mock.Mock(return_value=self.expected) result = utils.find_resource(self.manager, self.name) self.assertEqual(self.expected, result) self.manager.get.assert_called_with(self.name) self.manager.find.assert_called_with(name=self.name) def test_find_resource_find_not_found(self): self.manager.get = mock.Mock(side_effect=Exception('Boom!')) self.manager.find = mock.Mock( side_effect=exceptions.NotFound(404, "2") ) result = self.assertRaises(exceptions.CommandError, utils.find_resource, self.manager, self.name) self.assertEqual("No lego with a name or ID of 'legos' exists.", str(result)) self.manager.get.assert_called_with(self.name) self.manager.find.assert_called_with(name=self.name) def test_find_resource_list_forbidden(self): self.manager.get = mock.Mock(side_effect=Exception('Boom!')) self.manager.find = mock.Mock(side_effect=Exception('Boom!')) self.manager.list = mock.Mock( side_effect=exceptions.Forbidden(403) ) self.assertRaises(exceptions.Forbidden, utils.find_resource, self.manager, self.name) self.manager.list.assert_called_with() def test_find_resource_find_no_unique(self): self.manager.get = mock.Mock(side_effect=Exception('Boom!')) self.manager.find = mock.Mock(side_effect=NoUniqueMatch()) result = self.assertRaises(exceptions.CommandError, utils.find_resource, self.manager, self.name) self.assertEqual("More than one lego exists with the name 'legos'.", str(result)) self.manager.get.assert_called_with(self.name) self.manager.find.assert_called_with(name=self.name) def test_find_resource_silly_resource(self): # We need a resource with no resource_class for this test, start fresh self.manager = mock.Mock() self.manager.get = mock.Mock(side_effect=Exception('Boom!')) self.manager.find = mock.Mock( side_effect=AttributeError( "'Controller' object has no attribute 'find'", ) ) silly_resource = FakeOddballResource( None, {'id': '12345', 'name': self.name}, loaded=True, ) self.manager.list = mock.Mock( return_value=[silly_resource, ], ) result = utils.find_resource(self.manager, self.name) self.assertEqual(silly_resource, result) self.manager.get.assert_called_with(self.name) self.manager.find.assert_called_with(name=self.name) def test_find_resource_silly_resource_not_found(self): # We need a resource with no resource_class for this test, start fresh self.manager = mock.Mock() self.manager.get = mock.Mock(side_effect=Exception('Boom!')) self.manager.find = mock.Mock( side_effect=AttributeError( "'Controller' object has no attribute 'find'", ) ) self.manager.list = mock.Mock(return_value=[]) result = self.assertRaises(exceptions.CommandError, utils.find_resource, self.manager, self.name) self.assertEqual("Could not find resource legos", str(result)) self.manager.get.assert_called_with(self.name) self.manager.find.assert_called_with(name=self.name) def test_find_resource_silly_resource_no_unique_match(self): # We need a resource with no resource_class for this test, start fresh self.manager = mock.Mock() self.manager.get = mock.Mock(side_effect=Exception('Boom!')) self.manager.find = mock.Mock( side_effect=AttributeError( "'Controller' object has no attribute 'find'", ) ) silly_resource = FakeOddballResource( None, {'id': '12345', 'name': self.name}, loaded=True, ) silly_resource_same = FakeOddballResource( None, {'id': 'abcde', 'name': self.name}, loaded=True, ) self.manager.list = mock.Mock(return_value=[silly_resource, silly_resource_same]) result = self.assertRaises(exceptions.CommandError, utils.find_resource, self.manager, self.name) self.assertEqual("More than one resource exists " "with the name or ID 'legos'.", str(result)) self.manager.get.assert_called_with(self.name) self.manager.find.assert_called_with(name=self.name) def test_format_dict(self): expected = "a='b', c='d', e='f'" self.assertEqual(expected, utils.format_dict({'a': 'b', 'c': 'd', 'e': 'f'})) self.assertEqual(expected, utils.format_dict({'e': 'f', 'c': 'd', 'a': 'b'})) self.assertIsNone(utils.format_dict(None)) def test_format_dict_of_list(self): expected = "a=a1, a2; b=b1, b2; c=c1, c2; e=" self.assertEqual(expected, utils.format_dict_of_list({'a': ['a2', 'a1'], 'b': ['b2', 'b1'], 'c': ['c1', 'c2'], 'd': None, 'e': []}) ) self.assertEqual(expected, utils.format_dict_of_list({'c': ['c1', 'c2'], 'a': ['a2', 'a1'], 'b': ['b2', 'b1'], 'e': []}) ) self.assertIsNone(utils.format_dict_of_list(None)) def test_format_dict_of_list_with_separator(self): expected = "a=a1, a2\nb=b1, b2\nc=c1, c2\ne=" self.assertEqual(expected, utils.format_dict_of_list({'a': ['a2', 'a1'], 'b': ['b2', 'b1'], 'c': ['c1', 'c2'], 'd': None, 'e': []}, separator='\n') ) self.assertEqual(expected, utils.format_dict_of_list({'c': ['c1', 'c2'], 'a': ['a2', 'a1'], 'b': ['b2', 'b1'], 'e': []}, separator='\n') ) self.assertIsNone(utils.format_dict_of_list(None, separator='\n')) def test_format_list(self): expected = 'a, b, c' self.assertEqual(expected, utils.format_list(['a', 'b', 'c'])) self.assertEqual(expected, utils.format_list(['c', 'b', 'a'])) self.assertIsNone(utils.format_list(None)) def test_format_list_of_dicts(self): expected = "a='b', c='d'\ne='f'" sorted_data = [{'a': 'b', 'c': 'd'}, {'e': 'f'}] unsorted_data = [{'c': 'd', 'a': 'b'}, {'e': 'f'}] self.assertEqual(expected, utils.format_list_of_dicts(sorted_data)) self.assertEqual(expected, utils.format_list_of_dicts(unsorted_data)) self.assertEqual('', utils.format_list_of_dicts([])) self.assertEqual('', utils.format_list_of_dicts([{}])) self.assertIsNone(utils.format_list_of_dicts(None)) def test_format_list_separator(self): expected = 'a\nb\nc' actual_pre_sorted = utils.format_list(['a', 'b', 'c'], separator='\n') actual_unsorted = utils.format_list(['c', 'b', 'a'], separator='\n') self.assertEqual(expected, actual_pre_sorted) self.assertEqual(expected, actual_unsorted)