# -*- coding: utf-8 -*- # Copyright 2013 Mirantis, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import mock import requests from nailgun.test import base from nailgun.utils import camel_to_snake_case from nailgun.utils import compact from nailgun.utils import dict_merge from nailgun.utils import dict_update from nailgun.utils import flatten from nailgun.utils import get_lines from nailgun.utils import grouper from nailgun.utils import parse_bool from nailgun.utils import remove_key_from_dict from nailgun.utils import text_format_safe from nailgun.utils import traverse from nailgun.utils.debian import get_apt_preferences_line from nailgun.utils.debian import get_release_file from nailgun.utils.debian import parse_release_file from nailgun.utils.fake_generator import FakeNodesGenerator class TestUtils(base.BaseIntegrationTest): def test_dict_merge(self): custom = {"coord": [10, 10], "pass": "qwerty", "dict": {"body": "solid", "color": "black", "dict": {"stuff": "hz"}}} common = {"parent": None, "dict": {"transparency": 100, "dict": {"another_stuff": "hz"}}} result = dict_merge(custom, common) self.assertEqual(result, {"coord": [10, 10], "parent": None, "pass": "qwerty", "dict": {"body": "solid", "color": "black", "transparency": 100, "dict": {"stuff": "hz", "another_stuff": "hz"}}}) def test_dict_update_first_level(self): target = { 'a': {'b': 1}, 'c': {'d': {'e': {'f': 2}}} } update1 = { 'a': {'b': 2}, 'c': {'d': {'g': 2}} } dict_update(target, update1, level=1) self.assertEqual({'b': 2}, target['a']) self.assertEqual({'d': {'g': 2}}, target['c']) def test_dict_update_2nd_levels(self): target = { 'a': {'b': 1}, 'c': {'d': {'e': {'f': {'k': 2}}, 'l': 1}} } update1 = { 'a': {'b': 2}, 'c': {'d': {'e': {'g': 2}}} } dict_update(target, update1, level=2) self.assertEqual({'b': 2}, target['a']) self.assertEqual({'d': {'e': {'g': 2}, 'l': 1}}, target['c']) def test_dict_update_all_levels(self): target = { 'a': {'b': 1}, 'c': {'d': {'e': {'f': 2}}} } update1 = { 'a': {'b': 2}, 'c': {'d': {'g': 2}} } dict_update(target, update1) self.assertEqual({'b': 2}, target['a']) self.assertEqual({'d': {'g': 2, 'e': {'f': 2}}}, target['c']) def test_camel_case_to_snake_case(self): self.assertTrue( camel_to_snake_case('TestCase') == 'test_case') self.assertTrue( camel_to_snake_case('TTestCase') == 't_test_case') def test_compact(self): self.assertListEqual( compact([1, '', 5, False, None, False, 'test']), [1, 5, 'test']) def test_flatten(self): self.assertListEqual( flatten([7, 5, [3, [4, 5], 1], 2]), [7, 5, 3, [4, 5], 1, 2]) def test_grouper(self): self.assertEqual( list(grouper([0, 1, 2, 3], 2)), [(0, 1), (2, 3)]) self.assertEqual( list(grouper([0, 1, 2, 3, 4, 5], 3)), [(0, 1, 2), (3, 4, 5)]) self.assertEqual( list(grouper([0, 1, 2, 3, 4], 3)), [(0, 1, 2), (3, 4, None)]) self.assertEqual( list(grouper([0, 1, 2, 3, 4], 3, 'x')), [(0, 1, 2), (3, 4, 'x')]) def test_get_lines(self): empty = "" empty_multiline = "\n\n\n" non_empty = "abc\nfoo\nbar" mixed = "abc\n\nfoo\n\n\nbar" self.assertEqual(get_lines(empty), []) self.assertEqual(get_lines(empty_multiline), []) self.assertEqual(get_lines(non_empty), ['abc', 'foo', 'bar']) self.assertEqual(get_lines(mixed), ['abc', 'foo', 'bar']) def test_parse_bool(self): true_values = ['1', 't', 'T', 'TRUE', 'True', 'true'] false_values = ['0', 'f', 'F', 'FALSE', 'False', 'false'] for value in true_values: self.assertTrue(parse_bool(value)) for value in false_values: self.assertFalse(parse_bool(value)) self.assertRaises(ValueError, parse_bool, 'tru') self.assertRaises(ValueError, parse_bool, 'fals') def test_remove_key_from_dict(self): input_dict = { 'spam': { 'egg': { 'key_to_remove': 'a' } }, 'key_to_remove': 'b' } output_dict = remove_key_from_dict(input_dict, 'key_to_remove') self.assertDictEqual({'spam': {'egg': {}}}, output_dict) class TestTraverse(base.BaseUnitTest): class TestGenerator(object): @classmethod def test(cls, arg=None): return 'testvalue' @classmethod def evaluate(cls, func, arg=None): return getattr(cls, func)(arg) data = { 'foo': { 'generator': 'test', }, 'bar': 'test {a} string', 'baz': 42, 'regex': { 'source': 'test {a} string', 'error': 'an {a} error' }, 'list': [ { 'x': 'a {a} a', }, { 'y': 'b 42 b', } ] } def test_wo_formatting_context(self): result = traverse( self.data, keywords={'generator': self.TestGenerator.evaluate} ) self.assertEqual(result, { 'foo': 'testvalue', 'bar': 'test {a} string', 'baz': 42, 'regex': { 'source': 'test {a} string', 'error': 'an {a} error' }, 'list': [ { 'x': 'a {a} a', }, { 'y': 'b 42 b', } ]}) def test_w_formatting_context(self): result = traverse( self.data, formatter_context={'a': 13}, keywords={'generator': self.TestGenerator.evaluate} ) self.assertEqual(result, { 'foo': 'testvalue', 'bar': 'test 13 string', 'baz': 42, 'regex': { 'source': 'test {a} string', 'error': 'an {a} error' }, 'list': [ { 'x': 'a 13 a', }, { 'y': 'b 42 b', } ]}) def test_formatter_returns_informative_error(self): with self.assertRaisesRegexp(ValueError, '{a}'): traverse( self.data, formatter_context={'b': 13}, keywords={'generator': self.TestGenerator.evaluate} ) def test_w_safe_formatting_context(self): data = self.data.copy() data['bar'] = 'test {b} value' result = traverse( data, text_format_safe, {'a': 13}, keywords={'generator': self.TestGenerator.evaluate} ) self.assertEqual(result, { 'foo': 'testvalue', 'bar': 'test {b} value', 'baz': 42, 'regex': { 'source': 'test {a} string', 'error': 'an {a} error' }, 'list': [ { 'x': 'a 13 a', }, { 'y': 'b 42 b', } ]}) def test_custom_keywords(self): data = { 'key1': {'exp1': 'name1', 'exp1_arg': 'arg1'}, 'key2': {'exp2': 'name2'}, } generator1 = mock.MagicMock(return_value='val1') generator2 = mock.MagicMock(return_value='val2') result = traverse(data, keywords={ 'exp1': generator1, 'exp2': generator2 }) self.assertEqual({'key1': 'val1', 'key2': 'val2'}, result) generator1.assert_called_once_with('name1', 'arg1') generator2.assert_called_once_with('name2') class TestGetDebianReleaseFile(base.BaseUnitTest): @mock.patch('nailgun.utils.debian.requests.get') def test_normal_ubuntu_repo(self, m_get): get_release_file({ 'name': 'myrepo', 'uri': 'http://some-uri.com/path', 'suite': 'mysuite', 'section': 'main university', }) m_get.assert_called_with( 'http://some-uri.com/path/dists/mysuite/Release') @mock.patch('nailgun.utils.debian.requests.get') def test_flat_ubuntu_repo(self, m_get): testcases = [ # (suite, uri) ('/', 'http://some-uri.com/deb/Release'), ('/path', 'http://some-uri.com/deb/path/Release'), ('path', 'http://some-uri.com/deb/path/Release'), ] for suite, uri in testcases: get_release_file({ 'name': 'myrepo', 'uri': 'http://some-uri.com/deb', 'suite': suite, 'section': '', }) m_get.assert_called_with(uri) @mock.patch('nailgun.utils.debian.requests.get') def test_do_not_silence_http_errors(self, m_get): r = requests.Response() r.status_code = 404 m_get.return_value = r self.assertRaises(requests.exceptions.HTTPError, get_release_file, { 'name': 'myrepo', 'uri': 'http://some-uri.com/path', 'suite': 'mysuite', 'section': 'main university', }) @mock.patch('nailgun.utils.debian.requests.get') def test_do_not_retry_on_404(self, m_get): r = requests.Response() r.status_code = 404 m_get.return_value = r self.assertRaises(requests.exceptions.HTTPError, get_release_file, { 'name': 'myrepo', 'uri': 'http://some-uri.com/path', 'suite': 'mysuite', 'section': 'main university', }, retries=3) self.assertEqual(m_get.call_count, 1) @mock.patch('nailgun.utils.debian.requests.get') def test_do_retry_on_error(self, m_get): r = requests.Response() r.status_code = 500 m_get.return_value = r self.assertRaises(requests.exceptions.HTTPError, get_release_file, { 'name': 'myrepo', 'uri': 'http://some-uri.com/path', 'suite': 'mysuite', 'section': 'main university', }, retries=3) self.assertEqual(m_get.call_count, 3) @mock.patch('nailgun.utils.debian.requests.get') def test_returns_content_if_http_ok(self, m_get): r = requests.Response() r._content = 'content' r.status_code = 200 m_get.return_value = r content = get_release_file({ 'name': 'myrepo', 'uri': 'http://some-uri.com/path', 'suite': 'mysuite', 'section': 'main university', }) self.assertEqual(content, 'content') class TestParseDebianReleaseFile(base.BaseUnitTest): _deb_release_info = ''' Origin: TestOrigin Label: TestLabel Archive: test-archive Codename: testcodename Date: Thu, 08 May 2014 14:19:09 UTC Architectures: amd64 i386 Components: main restricted universe multiverse Description: Test Description MD5Sum: ead1cbf42ed119c50bf3aab28b5b6351 934 main/binary-amd64/Packages 52d605b4217be64f461751f233dd9a8f 96 main/binary-amd64/Release SHA1: 28c4460e3aaf1b93f11911fdc4ff23c28809af89 934 main/binary-amd64/Packages d03d716bceaba35f91726c096e2a9a8c23ccc766 96 main/binary-amd64/Release ''' def test_parse(self): deb_release = parse_release_file(self._deb_release_info) self.assertEqual(deb_release, { 'Origin': 'TestOrigin', 'Label': 'TestLabel', 'Archive': 'test-archive', 'Codename': 'testcodename', 'Date': 'Thu, 08 May 2014 14:19:09 UTC', 'Architectures': 'amd64 i386', 'Components': 'main restricted universe multiverse', 'Description': 'Test Description', 'MD5Sum': [ { 'md5sum': 'ead1cbf42ed119c50bf3aab28b5b6351', 'size': '934', 'name': 'main/binary-amd64/Packages', }, { 'md5sum': '52d605b4217be64f461751f233dd9a8f', 'size': '96', 'name': 'main/binary-amd64/Release', } ], 'SHA1': [ { 'sha1': '28c4460e3aaf1b93f11911fdc4ff23c28809af89', 'size': '934', 'name': 'main/binary-amd64/Packages', }, { 'sha1': 'd03d716bceaba35f91726c096e2a9a8c23ccc766', 'size': '96', 'name': 'main/binary-amd64/Release', } ], }) class TestGetAptPreferencesLine(base.BaseUnitTest): _deb_release = { 'Origin': 'TestOrigin', 'Label': 'TestLabel', 'Archive': 'test-archive', 'Version': '42.42', 'Codename': 'testcodename', 'Date': 'Thu, 08 May 2014 14:19:09 UTC', 'Architectures': 'amd64 i386', 'Components': 'main restricted universe multiverse', 'Description': 'Test Description', } def test_all_rules(self): pin = get_apt_preferences_line(self._deb_release) self.assertItemsEqual(pin.split(','), [ 'o=TestOrigin', 'l=TestLabel', 'a=test-archive', 'v=42.42', 'n=testcodename', ]) def test_not_all_rules(self): deb_release = self._deb_release.copy() del deb_release['Codename'] del deb_release['Label'] del deb_release['Version'] pin = get_apt_preferences_line(deb_release) self.assertItemsEqual(pin.split(','), [ 'o=TestOrigin', 'a=test-archive', ]) def test_suite_is_synonym_for_archive(self): deb_release = self._deb_release.copy() deb_release['Suite'] = 'test-suite' del deb_release['Archive'] pin = get_apt_preferences_line(deb_release) conditions = pin.split(',') self.assertIn('a=test-suite', conditions) class TestFakeNodeGenerator(base.BaseUnitTest): def setUp(self): self.generator = FakeNodesGenerator() def test_generate_fake_nodes_with_total_count(self): total_nodes_count = 8 generated_nodes = self.generator.generate_fake_nodes(total_nodes_count) self.assertEqual(total_nodes_count, len(generated_nodes)) def test_generate_fake_nodes_with_error_offline_nodes(self): total_nodes_count = 7 error_nodes_count = 2 offline_nodes_count = 1 offloading_ifaces_nodes_count = 3 generated_nodes = self.generator.generate_fake_nodes( total_nodes_count, error_nodes_count, offline_nodes_count, offloading_ifaces_nodes_count ) self.assertEqual(total_nodes_count, len(generated_nodes)) generated_error_nodes = [n for n in generated_nodes if n['fields']['status'] == 'error'] self.assertEqual(error_nodes_count, len(generated_error_nodes)) generated_offline_nodes = [n for n in generated_nodes if not n['fields']['online']] self.assertEqual(offline_nodes_count, len(generated_offline_nodes)) offloading_ifaces_nodes = [ n for n in generated_nodes if n['fields']['meta']['interfaces'][0].get('offloading_modes') ] self.assertEqual(offloading_ifaces_nodes_count, len(offloading_ifaces_nodes)) def test_generate_fake_nodes_with_wrong_params(self): total_nodes_count = 4 error_nodes_count = 2 offline_nodes_count = 5 generated_nodes = self.generator.generate_fake_nodes( total_nodes_count, error_nodes_count, offline_nodes_count) self.assertEqual(total_nodes_count, len(generated_nodes)) actual_error_nodes = len( [n for n in generated_nodes if n['fields']['status'] == 'error']) actual_offline_nodes = len( [n for n in generated_nodes if not n['fields']['online']]) self.assertTrue( total_nodes_count >= actual_error_nodes + actual_offline_nodes) def test_generate_fake_node_common_structure(self): pk = 123 sample_node_fields = { 'status': 'discover', 'name': 'Supermicro X9DRW', 'hostname': 'node-1', 'ip': '172.18.67.168', 'online': True, 'labels': {}, 'pending_addition': False, 'platform_name': 'X9DRW', 'mac': '00:25:90:6a:b1:10', 'timestamp': '', 'progress': 0, 'pending_deletion': False, 'os_platform': 'ubuntu', 'manufacturer': 'Supermicro', 'meta': { 'cpu': {}, 'interfaces': [], 'disks': [], 'system': {}, 'memory': [], 'numa_topology': { 'distances': [], 'numa_nodes': [], 'supported_hugepages': [], }, }, } generated_node = self.generator.generate_fake_node(pk) self.assertEqual(generated_node.get('pk'), pk) self.assertEqual(generated_node.get('model'), 'nailgun.node') generated_node_fields = generated_node.get('fields', {}) self.assertItemsEqual(sample_node_fields.keys(), generated_node_fields.keys()) self.assertItemsEqual(sample_node_fields['meta'].keys(), generated_node_fields.get('meta', {}).keys()) self.assertItemsEqual( sample_node_fields['meta']['numa_topology'], generated_node_fields.get('meta', {}).get('numa_topology', {}) ) self.assertFalse(generated_node_fields.get('pending_deletion')) self.assertFalse(generated_node_fields.get('pending_addition')) self.assertEqual(generated_node_fields.get('progress'), 0) def test_generate_fake_node_with_params(self): pk = 123 is_online = False generated_node = self.generator.generate_fake_node( pk, is_online=is_online, use_offload_iface=True) self.assertEqual(pk, generated_node.get('pk')) generated_node_fields = generated_node.get('fields') self.assertEqual(is_online, generated_node_fields.get('online')) self.assertEqual('discover', generated_node_fields.get('status')) self.assertIn('offloading_modes', generated_node_fields['meta'].get('interfaces')[0]) pk = 321 generated_node = self.generator.generate_fake_node( pk, is_error=True, use_offload_iface=False) self.assertEqual(pk, generated_node.get('pk')) generated_node_fields = generated_node.get('fields') self.assertTrue(generated_node_fields.get('online')) self.assertEqual('error', generated_node_fields.get('status')) self.assertNotIn('offloading_modes', generated_node_fields['meta'].get('interfaces')[0]) def test_generate_fake_node_interface_meta(self): generated_node_fields = self.generator.generate_fake_node(1)['fields'] known_mac = generated_node_fields['mac'] known_ip = generated_node_fields['ip'] ifaces = generated_node_fields['meta']['interfaces'] suitable_ifaces = [ x['name'] for x in ifaces if x.get('ip') == known_ip or x.get('mac') == known_mac ] self.assertEqual(len(set(suitable_ifaces)), 1) def test_generate_fake_node_disk_suffixes(self): count = 500 disk_suffixes = list(self.generator._get_disk_suffixes(count)) self.assertEqual(len(set(disk_suffixes)), count) self.assertEqual(disk_suffixes[27], 'ab') def test_generate_disks_meta_with_non_zero_size(self): memory_random_side_effect = [ { 'model': 'Virtual Floppy0', 'name': 'sde', 'disk': 'sde', 'size': 0 }, { 'model': 'Virtual HDisk0', 'name': 'sdf', 'disk': 'sdf', 'size': 0 }, { 'model': 'TOSHIBA MK1002TS', 'name': 'sda', 'disk': 'sda', 'size': 1000204886016 } ] with mock.patch('random.choice', side_effect=memory_random_side_effect) as mock_random: disks_meta = self.generator._generate_disks_meta(1) self.assertNotEqual(disks_meta[0]['size'], 0) self.assertEqual(mock_random.call_count, 3)