From 20d10bc97cdd019cf3ec972b8dd2c3acbfa3d49d Mon Sep 17 00:00:00 2001 From: Hunt Xu Date: Tue, 13 Mar 2018 18:03:20 +0800 Subject: [PATCH] tests: add unit tests for qinling.orchestrator This commit adds unit tests for exposed methods in qinling.orchestrator. This commit only covers branches and exceptions found in the exposed methods. Those in internal helper functions would be covered by a following patch. Change-Id: I47df26435d93a3dc310314252cd20e624b4d0bb2 --- qinling/tests/unit/orchestrator/__init__.py | 0 .../unit/orchestrator/kubernetes/__init__.py | 0 .../orchestrator/kubernetes/test_manager.py | 533 ++++++++++++++++++ 3 files changed, 533 insertions(+) create mode 100644 qinling/tests/unit/orchestrator/__init__.py create mode 100644 qinling/tests/unit/orchestrator/kubernetes/__init__.py create mode 100644 qinling/tests/unit/orchestrator/kubernetes/test_manager.py diff --git a/qinling/tests/unit/orchestrator/__init__.py b/qinling/tests/unit/orchestrator/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/qinling/tests/unit/orchestrator/kubernetes/__init__.py b/qinling/tests/unit/orchestrator/kubernetes/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/qinling/tests/unit/orchestrator/kubernetes/test_manager.py b/qinling/tests/unit/orchestrator/kubernetes/test_manager.py new file mode 100644 index 00000000..54532b7a --- /dev/null +++ b/qinling/tests/unit/orchestrator/kubernetes/test_manager.py @@ -0,0 +1,533 @@ +# Copyright 2018 AWCloud Software Co., Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock +import yaml + +from oslo_config import cfg + +from qinling import config +from qinling import exceptions as exc +from qinling.orchestrator.kubernetes import manager as k8s_manager +from qinling.tests.unit import base +from qinling.utils import common + +CONF = cfg.CONF +SERVICE_PORT = 9090 +SERVICE_ADDRESS_EXTERNAL = '1.2.3.4' + + +class TestKubernetesManager(base.BaseTest): + def setUp(self): + super(TestKubernetesManager, self).setUp() + CONF.register_opts(config.kubernetes_opts, config.KUBERNETES_GROUP) + self.conf = CONF + self.qinling_endpoint = 'http://127.0.0.1:7070' + self.k8s_v1_api = mock.Mock() + self.k8s_v1_ext = mock.Mock() + clients = {'v1': self.k8s_v1_api, + 'v1extention': self.k8s_v1_ext} + mock.patch( + 'qinling.orchestrator.kubernetes.utils.get_k8s_clients', + return_value=clients + ).start() + self.fake_namespace = self.rand_name('namespace', + prefix='TestKubernetesManager') + self.override_config('namespace', self.fake_namespace, + config.KUBERNETES_GROUP) + namespace = mock.Mock() + namespace.metadata.name = self.fake_namespace + namespaces = mock.Mock() + namespaces.items = [namespace] + self.k8s_v1_api.list_namespace.return_value = namespaces + self.manager = k8s_manager.KubernetesManager( + self.conf, self.qinling_endpoint) + + def _create_service(self): + port = mock.Mock() + port.node_port = SERVICE_PORT + service = mock.Mock() + service.spec.ports = [port] + return service + + def _create_nodes_with_external_ip(self): + addr1 = mock.Mock() + addr1.type = 'UNKNOWN TYPE' + addr2 = mock.Mock() + addr2.type = 'ExternalIP' + addr2.address = SERVICE_ADDRESS_EXTERNAL + item = mock.Mock() + item.status.addresses = [addr1, addr2] + nodes = mock.Mock() + nodes.items = [item] + return nodes + + def test_create_pool(self): + ret = mock.Mock() + ret.status.replicas = 5 + ret.status.available_replicas = 5 + self.k8s_v1_ext.read_namespaced_deployment.return_value = ret + fake_replicas = 5 + self.override_config('replicas', fake_replicas, + config.KUBERNETES_GROUP) + fake_deployment_name = self.rand_name('deployment', + prefix='TestKubernetesManager') + fake_image = self.rand_name('image', prefix='TestKubernetesManager') + + self.manager.create_pool(fake_deployment_name, fake_image) + + deployment_body = self.manager.deployment_template.render( + { + 'name': fake_deployment_name, + 'labels': {'runtime_id': fake_deployment_name}, + 'replicas': fake_replicas, + 'container_name': 'worker', + 'image': fake_image + } + ) + self.k8s_v1_ext.create_namespaced_deployment.assert_called_once_with( + body=yaml.safe_load(deployment_body), + namespace=self.fake_namespace) + self.k8s_v1_ext.read_namespaced_deployment.assert_called_once_with( + fake_deployment_name, self.fake_namespace) + + def test_delete_pool(self): + # Deleting namespaced service is also tested in this. + svc1 = mock.Mock() + svc1_name = self.rand_name('service', prefix='TestKubernetesManager') + svc1.metadata.name = svc1_name + svc2 = mock.Mock() + svc2_name = self.rand_name('service', prefix='TestKubernetesManager') + svc2.metadata.name = svc2_name + services = mock.Mock() + services.items = [svc1, svc2] + self.k8s_v1_api.list_namespaced_service.return_value = services + fake_deployment_name = self.rand_name('deployment', + prefix='TestKubernetesManager') + + self.manager.delete_pool(fake_deployment_name) + + del_rep = self.k8s_v1_ext.delete_collection_namespaced_replica_set + del_rep.assert_called_once_with( + self.fake_namespace, + label_selector='runtime_id=%s' % fake_deployment_name) + delete_service_calls = [ + mock.call(svc1_name, self.fake_namespace), + mock.call(svc2_name, self.fake_namespace), + ] + self.k8s_v1_api.delete_namespaced_service.assert_has_calls( + delete_service_calls) + self.assertEqual( + 2, self.k8s_v1_api.delete_namespaced_service.call_count) + del_dep = self.k8s_v1_ext.delete_collection_namespaced_deployment + del_dep.assert_called_once_with( + self.fake_namespace, + label_selector='runtime_id=%s' % fake_deployment_name, + field_selector='metadata.name=%s' % fake_deployment_name) + del_pod = self.k8s_v1_api.delete_collection_namespaced_pod + del_pod.assert_called_once_with( + self.fake_namespace, + label_selector='runtime_id=%s' % fake_deployment_name) + + def test_update_pool(self): + fake_deployment_name = self.rand_name('deployment', + prefix='TestKubernetesManager') + image = self.rand_name('image', prefix='TestKubernetesManager') + body = { + 'spec': { + 'template': { + 'spec': { + 'containers': [ + { + 'name': 'worker', + 'image': image + } + ] + } + } + } + } + ret = mock.Mock() + ret.status.unavailable_replicas = 0 + self.k8s_v1_ext.read_namespaced_deployment_status.return_value = ret + + update_result = self.manager.update_pool(fake_deployment_name, + image=image) + + self.assertTrue(update_result) + self.k8s_v1_ext.patch_namespaced_deployment.assert_called_once_with( + fake_deployment_name, self.fake_namespace, body) + read_status = self.k8s_v1_ext.read_namespaced_deployment_status + read_status.assert_called_once_with(fake_deployment_name, + self.fake_namespace) + + def test_update_pool_retry(self): + fake_deployment_name = self.rand_name('deployment', + prefix='TestKubernetesManager') + image = self.rand_name('image', prefix='TestKubernetesManager') + ret1 = mock.Mock() + ret1.status.unavailable_replicas = 1 + ret2 = mock.Mock() + ret2.status.unavailable_replicas = 0 + self.k8s_v1_ext.read_namespaced_deployment_status.side_effect = [ + ret1, ret2] + + update_result = self.manager.update_pool(fake_deployment_name, + image=image) + + self.assertTrue(update_result) + self.k8s_v1_ext.patch_namespaced_deployment.assert_called_once_with( + fake_deployment_name, self.fake_namespace, mock.ANY) + read_status = self.k8s_v1_ext.read_namespaced_deployment_status + self.assertEqual(2, read_status.call_count) + + def test_update_pool_rollback(self): + fake_deployment_name = self.rand_name('deployment', + prefix='TestKubernetesManager') + image = self.rand_name('image', prefix='TestKubernetesManager') + ret = mock.Mock() + ret.status.unavailable_replicas = 1 + self.k8s_v1_ext.read_namespaced_deployment_status.return_value = ret + rollback_body = { + "name": fake_deployment_name, + "rollbackTo": { + "revision": 0 + } + } + + update_result = self.manager.update_pool(fake_deployment_name, + image=image) + + self.assertFalse(update_result) + self.k8s_v1_ext.patch_namespaced_deployment.assert_called_once_with( + fake_deployment_name, self.fake_namespace, mock.ANY) + read_status = self.k8s_v1_ext.read_namespaced_deployment_status + self.assertEqual(5, read_status.call_count) + rollback = self.k8s_v1_ext.create_namespaced_deployment_rollback + rollback.assert_called_once_with( + fake_deployment_name, self.fake_namespace, rollback_body) + + def test_prepare_execution(self): + pod = mock.Mock() + pod.metadata.name = self.rand_name('pod', + prefix='TestKubernetesManager') + pod.metadata.labels = {'pod1_key1': 'pod1_value1'} + list_pod_ret = mock.Mock() + list_pod_ret.items = [pod] + self.k8s_v1_api.list_namespaced_pod.return_value = list_pod_ret + self.k8s_v1_api.create_namespaced_service.return_value = ( + self._create_service() + ) + self.k8s_v1_api.list_node.return_value = ( + self._create_nodes_with_external_ip() + ) + runtime_id = common.generate_unicode_uuid() + function_id = common.generate_unicode_uuid() + + pod_names, service_url = self.manager.prepare_execution( + function_id, image=None, identifier=runtime_id, + labels={'runtime_id': runtime_id}) + + self.assertEqual(pod.metadata.name, pod_names) + self.assertEqual( + 'http://%s:%s' % (SERVICE_ADDRESS_EXTERNAL, SERVICE_PORT), + service_url) + + # in _choose_available_pods + self.k8s_v1_api.list_namespaced_pod.assert_called_once_with( + self.fake_namespace, + label_selector='function_id=%s' % function_id) + + # in _prepare_pod -> _update_pod_label + pod_labels = {'pod1_key1': 'pod1_value1', 'function_id': function_id} + body = {'metadata': {'labels': pod_labels}} + self.k8s_v1_api.patch_namespaced_pod.assert_called_once_with( + pod.metadata.name, self.fake_namespace, body) + + # in _prepare_pod + service_body = self.manager.service_template.render( + { + 'service_name': 'service-%s' % function_id, + 'labels': {'function_id': function_id, + 'runtime_id': runtime_id}, + 'selector': pod_labels + } + ) + self.k8s_v1_api.create_namespaced_service.assert_called_once_with( + self.fake_namespace, yaml.safe_load(service_body)) + + def test_prepare_execution_with_image(self): + function_id = common.generate_unicode_uuid() + image = self.rand_name('image', prefix='TestKubernetesManager') + identifier = ('%s-%s' % ( + common.generate_unicode_uuid(dashed=False), + function_id) + )[:63] + + pod_name, url = self.manager.prepare_execution( + function_id, image=image, identifier=identifier) + + self.assertEqual(identifier, pod_name) + self.assertIsNone(url) + + # in _create_pod + pod_body = self.manager.pod_template.render( + { + 'pod_name': identifier, + 'labels': {'function_id': function_id}, + 'pod_image': image, + 'input': [] + } + ) + self.k8s_v1_api.create_namespaced_pod.assert_called_once_with( + self.fake_namespace, body=yaml.safe_load(pod_body)) + + def test_prepare_execution_no_worker_available(self): + ret_pods = mock.Mock() + ret_pods.items = [] + self.k8s_v1_api.list_namespaced_pod.return_value = ret_pods + function_id = common.generate_unicode_uuid() + runtime_id = common.generate_unicode_uuid() + labels = {'runtime_id': runtime_id} + + self.assertRaisesRegex( + exc.OrchestratorException, + "^Execution preparation failed\.$", + self.manager.prepare_execution, + function_id, image=None, identifier=runtime_id, labels=labels) + + # in _choose_available_pods + list_calls = [ + mock.call(self.fake_namespace, + label_selector='function_id=%s' % function_id), + mock.call(self.fake_namespace, + label_selector='!function_id,runtime_id=%s' % runtime_id) + ] + self.k8s_v1_api.list_namespaced_pod.assert_has_calls(list_calls) + self.assertEqual(2, self.k8s_v1_api.list_namespaced_pod.call_count) + + def test_prepare_execution_pod_preparation_failed(self): + pod = mock.Mock() + pod.metadata.name = self.rand_name('pod', + prefix='TestKubernetesManager') + pod.metadata.labels = None + ret_pods = mock.Mock() + ret_pods.items = [pod] + self.k8s_v1_api.list_namespaced_pod.return_value = ret_pods + exception = RuntimeError() + exception.status = 500 + self.k8s_v1_api.create_namespaced_service.side_effect = exception + function_id = common.generate_unicode_uuid() + runtime_id = common.generate_unicode_uuid() + + with mock.patch.object( + self.manager, 'delete_function' + ) as delete_function_mock: + self.assertRaisesRegex( + exc.OrchestratorException, + '^Execution preparation failed\.$', + self.manager.prepare_execution, + function_id, image=None, identifier=runtime_id, + labels={'runtime_id': runtime_id}) + + delete_function_mock.assert_called_once_with( + function_id, + {'runtime_id': runtime_id, 'function_id': function_id}) + + def test_run_execution(self): + pod = mock.Mock() + pod.status.phase = 'Succeeded' + self.k8s_v1_api.read_namespaced_pod.return_value = pod + fake_output = 'fake output' + self.k8s_v1_api.read_namespaced_pod_log.return_value = fake_output + execution_id = common.generate_unicode_uuid() + function_id = common.generate_unicode_uuid() + + result, output = self.manager.run_execution(execution_id, function_id) + + self.k8s_v1_api.read_namespaced_pod.assert_called_once_with( + None, self.fake_namespace) + self.k8s_v1_api.read_namespaced_pod_log.assert_called_once_with( + None, self.fake_namespace) + self.assertTrue(result) + self.assertEqual(fake_output, output) + + @mock.patch('qinling.engine.utils.get_request_data') + @mock.patch('qinling.engine.utils.url_request') + def test_run_execution_with_service_url(self, url_request_mock, + get_request_data_mock): + fake_output = 'fake output' + url_request_mock.return_value = (True, 'fake output') + fake_data = 'some data' + get_request_data_mock.return_value = fake_data + execution_id = common.generate_unicode_uuid() + function_id = common.generate_unicode_uuid() + + result, output = self.manager.run_execution( + execution_id, function_id, service_url='FAKE_URL') + + get_request_data_mock.assert_called_once_with( + self.conf, function_id, execution_id, None, 'main.main', None, + self.qinling_endpoint) + url_request_mock.assert_called_once_with( + self.manager.session, 'FAKE_URL/execute', body=fake_data) + self.assertTrue(result) + self.assertEqual(fake_output, output) + + def test_run_execution_retry(self): + pod1 = mock.Mock() + pod1.status.phase = '' + pod2 = mock.Mock() + pod2.status.phase = 'Succeeded' + self.k8s_v1_api.read_namespaced_pod.side_effect = [pod1, pod2] + fake_output = 'fake output' + self.k8s_v1_api.read_namespaced_pod_log.return_value = fake_output + execution_id = common.generate_unicode_uuid() + function_id = common.generate_unicode_uuid() + + result, output = self.manager.run_execution(execution_id, function_id) + + self.assertEqual(2, self.k8s_v1_api.read_namespaced_pod.call_count) + self.k8s_v1_api.read_namespaced_pod_log.assert_called_once_with( + None, self.fake_namespace) + self.assertTrue(result) + self.assertEqual(fake_output, output) + + def test_run_execution_failed(self): + self.k8s_v1_api.read_namespaced_pod.side_effect = RuntimeError + execution_id = common.generate_unicode_uuid() + function_id = common.generate_unicode_uuid() + + result, output = self.manager.run_execution(execution_id, function_id) + + self.k8s_v1_api.read_namespaced_pod.assert_called_once_with( + None, self.fake_namespace) + self.k8s_v1_api.read_namespaced_pod_log.assert_not_called() + self.assertFalse(result) + self.assertEqual({'error': 'Function execution failed.'}, output) + + def test_delete_function(self): + # Deleting namespaced service is also tested in this. + svc1 = mock.Mock() + svc1_name = self.rand_name('service', prefix='TestKubernetesManager') + svc1.metadata.name = svc1_name + svc2 = mock.Mock() + svc2_name = self.rand_name('service', prefix='TestKubernetesManager') + svc2.metadata.name = svc2_name + services = mock.Mock() + services.items = [svc1, svc2] + self.k8s_v1_api.list_namespaced_service.return_value = services + function_id = common.generate_unicode_uuid() + + self.manager.delete_function(function_id) + + self.k8s_v1_api.list_namespaced_service.assert_called_once_with( + self.fake_namespace, label_selector='function_id=%s' % function_id) + delete_service_calls = [ + mock.call(svc1_name, self.fake_namespace), + mock.call(svc2_name, self.fake_namespace) + ] + self.k8s_v1_api.delete_namespaced_service.assert_has_calls( + delete_service_calls) + self.assertEqual( + 2, self.k8s_v1_api.delete_namespaced_service.call_count) + delete_pod = self.k8s_v1_api.delete_collection_namespaced_pod + delete_pod.assert_called_once_with( + self.fake_namespace, label_selector='function_id=%s' % function_id) + + def test_delete_function_with_labels(self): + services = mock.Mock() + services.items = [] + labels = {'key1': 'value1', 'key2': 'value2'} + selector = common.convert_dict_to_string(labels) + self.k8s_v1_api.list_namespaced_service.return_value = services + function_id = common.generate_unicode_uuid() + + self.manager.delete_function(function_id, labels=labels) + + self.k8s_v1_api.list_namespaced_service.assert_called_once_with( + self.fake_namespace, label_selector=selector) + self.k8s_v1_api.delete_namespaced_service.assert_not_called() + delete_pod = self.k8s_v1_api.delete_collection_namespaced_pod + delete_pod.assert_called_once_with( + self.fake_namespace, label_selector=selector) + + def test_scaleup_function(self): + pod = mock.Mock() + pod.metadata.name = self.rand_name('pod', + prefix='TestKubernetesManager') + pod.metadata.labels = {'pod1_key1': 'pod1_value1'} + list_pod_ret = mock.Mock() + list_pod_ret.items = [pod] + self.k8s_v1_api.list_namespaced_pod.return_value = list_pod_ret + self.k8s_v1_api.create_namespaced_service.return_value = ( + self._create_service() + ) + self.k8s_v1_api.list_node.return_value = ( + self._create_nodes_with_external_ip() + ) + runtime_id = common.generate_unicode_uuid() + function_id = common.generate_unicode_uuid() + + pod_names, service_url = self.manager.scaleup_function( + function_id, identifier=runtime_id) + + self.assertEqual([pod.metadata.name], pod_names) + self.assertEqual( + 'http://%s:%s' % (SERVICE_ADDRESS_EXTERNAL, SERVICE_PORT), + service_url) + + # in _choose_available_pods + self.k8s_v1_api.list_namespaced_pod.assert_called_once_with( + self.fake_namespace, + label_selector='!function_id,runtime_id=%s' % runtime_id) + + # in _prepare_pod -> _update_pod_label + pod_labels = {'pod1_key1': 'pod1_value1', 'function_id': function_id} + body = {'metadata': {'labels': pod_labels}} + self.k8s_v1_api.patch_namespaced_pod.assert_called_once_with( + pod.metadata.name, self.fake_namespace, body) + + # in _prepare_pod + service_body = self.manager.service_template.render( + { + 'service_name': 'service-%s' % function_id, + 'labels': {'function_id': function_id, + 'runtime_id': runtime_id}, + 'selector': pod_labels + } + ) + self.k8s_v1_api.create_namespaced_service.assert_called_once_with( + self.fake_namespace, yaml.safe_load(service_body)) + + def test_scaleup_function_not_enough_workers(self): + runtime_id = common.generate_unicode_uuid() + function_id = common.generate_unicode_uuid() + ret_pods = mock.Mock() + ret_pods.items = [mock.Mock()] + self.k8s_v1_api.list_namespaced_pod.return_value = ret_pods + + self.assertRaisesRegex( + exc.OrchestratorException, + "^Not enough workers available\.$", + self.manager.scaleup_function, + function_id, identifier=runtime_id, count=2) + + def test_delete_worker(self): + pod_name = self.rand_name('pod', prefix='TestKubernetesManager') + + self.manager.delete_worker(pod_name) + + self.k8s_v1_api.delete_namespaced_pod.assert_called_once_with( + pod_name, self.fake_namespace, {})