diff --git a/qinling/tests/unit/orchestrator/kubernetes/test_manager.py b/qinling/tests/unit/orchestrator/kubernetes/test_manager.py index 54532b7a..b1a9fece 100644 --- a/qinling/tests/unit/orchestrator/kubernetes/test_manager.py +++ b/qinling/tests/unit/orchestrator/kubernetes/test_manager.py @@ -13,6 +13,7 @@ # limitations under the License. import mock +import testtools import yaml from oslo_config import cfg @@ -26,6 +27,7 @@ from qinling.utils import common CONF = cfg.CONF SERVICE_PORT = 9090 SERVICE_ADDRESS_EXTERNAL = '1.2.3.4' +SERVICE_ADDRESS_INTERNAL = '127.0.0.1' class TestKubernetesManager(base.BaseTest): @@ -73,6 +75,55 @@ class TestKubernetesManager(base.BaseTest): nodes.items = [item] return nodes + def _create_nodes_with_internal_ip(self): + addr1 = mock.Mock() + addr1.type = 'InternalIP' + addr1.address = SERVICE_ADDRESS_INTERNAL + addr2 = mock.Mock() + addr2.type = 'UNKNOWN TYPE' + item = mock.Mock() + item.status.addresses = [addr1, addr2] + nodes = mock.Mock() + nodes.items = [item] + return nodes + + def test__ensure_namespace(self): + # self.manager is not used in this test. + namespaces = mock.Mock() + namespaces.items = [] + self.k8s_v1_api.list_namespace.return_value = namespaces + + k8s_manager.KubernetesManager(self.conf, self.qinling_endpoint) + + namespace_body = { + 'apiVersion': 'v1', + 'kind': 'Namespace', + 'metadata': { + 'name': self.fake_namespace, + 'labels': { + 'name': self.fake_namespace + } + }, + } + # setUp also calls list_namespace. + self.assertEqual(2, self.k8s_v1_api.list_namespace.call_count) + self.k8s_v1_api.create_namespace.assert_called_once_with( + namespace_body) + + def test__ensure_namespace_not_create_namespace(self): + # self.manager is not used in this test. + item = mock.Mock() + item.metadata.name = self.fake_namespace + namespaces = mock.Mock() + namespaces.items = [item] + self.k8s_v1_api.list_namespace.return_value = namespaces + + k8s_manager.KubernetesManager(self.conf, self.qinling_endpoint) + + # setUp also calls list_namespace. + self.assertEqual(2, self.k8s_v1_api.list_namespace.call_count) + self.k8s_v1_api.create_namespace.assert_not_called() + def test_create_pool(self): ret = mock.Mock() ret.status.replicas = 5 @@ -102,6 +153,42 @@ class TestKubernetesManager(base.BaseTest): self.k8s_v1_ext.read_namespaced_deployment.assert_called_once_with( fake_deployment_name, self.fake_namespace) + def test_create_pool_wait_deployment_available(self): + ret1 = mock.Mock() + ret1.status.replicas = 0 + ret2 = mock.Mock() + ret2.status.replicas = 3 + ret2.status.available_replicas = 1 + ret3 = mock.Mock() + ret3.status.replicas = 3 + ret3.status.available_replicas = 3 + self.k8s_v1_ext.read_namespaced_deployment.side_effect = [ + ret1, ret2, ret3 + ] + fake_deployment_name = self.rand_name('deployment', + prefix='TestKubernetesManager') + fake_image = self.rand_name('image', prefix='TestKubernetesManager') + + self.manager.create_pool(fake_deployment_name, fake_image) + + self.assertEqual( + 3, self.k8s_v1_ext.read_namespaced_deployment.call_count) + + @testtools.skip("Default timeout is too long.") + def test_create_pool_wait_deployment_timeout(self): + ret = mock.Mock() + ret.status.replicas = 0 + self.k8s_v1_ext.read_namespaced_deployment.return_value = ret + fake_deployment_name = self.rand_name('deployment', + prefix='TestKubernetesManager') + fake_image = self.rand_name('image', prefix='TestKubernetesManager') + + self.manager.create_pool(fake_deployment_name, fake_image) + + self.assertLess( + 200, # Default timeout is 600s with wait interval set to 2s. + self.k8s_v1_ext.read_namespaced_deployment.call_count) + def test_delete_pool(self): # Deleting namespaced service is also tested in this. svc1 = mock.Mock() @@ -293,6 +380,56 @@ class TestKubernetesManager(base.BaseTest): self.k8s_v1_api.create_namespaced_pod.assert_called_once_with( self.fake_namespace, body=yaml.safe_load(pod_body)) + def test_prepare_execution_with_image_function_input(self): + function_id = common.generate_unicode_uuid() + image = self.rand_name('image', prefix='TestKubernetesManager') + identifier = ('%s-%s' % ( + common.generate_unicode_uuid(dashed=False), + function_id) + )[:63] + fake_input = {'__function_input': 'input_item1 input_item2'} + + pod_name, url = self.manager.prepare_execution( + function_id, image=image, identifier=identifier, + input=fake_input) + + # in _create_pod + pod_body = self.manager.pod_template.render( + { + 'pod_name': identifier, + 'labels': {'function_id': function_id}, + 'pod_image': image, + 'input': ['input_item1', 'input_item2'] + } + ) + self.k8s_v1_api.create_namespaced_pod.assert_called_once_with( + self.fake_namespace, body=yaml.safe_load(pod_body)) + + def test_prepare_execution_with_image_json_input(self): + function_id = common.generate_unicode_uuid() + image = self.rand_name('image', prefix='TestKubernetesManager') + identifier = ('%s-%s' % ( + common.generate_unicode_uuid(dashed=False), + function_id) + )[:63] + fake_input = '["input_item3", "input_item4"]' + + pod_name, url = self.manager.prepare_execution( + function_id, image=image, identifier=identifier, + input=fake_input) + + # in _create_pod + pod_body = self.manager.pod_template.render( + { + 'pod_name': identifier, + 'labels': {'function_id': function_id}, + 'pod_image': image, + 'input': ['input_item3', 'input_item4'] + } + ) + self.k8s_v1_api.create_namespaced_pod.assert_called_once_with( + self.fake_namespace, body=yaml.safe_load(pod_body)) + def test_prepare_execution_no_worker_available(self): ret_pods = mock.Mock() ret_pods.items = [] @@ -317,6 +454,34 @@ class TestKubernetesManager(base.BaseTest): self.k8s_v1_api.list_namespaced_pod.assert_has_calls(list_calls) self.assertEqual(2, self.k8s_v1_api.list_namespaced_pod.call_count) + def test_prepare_execution_service_already_exists(self): + pod = mock.Mock() + pod.metadata.name = self.rand_name('pod', + prefix='TestKubernetesManager') + pod.metadata.labels = {'pod1_key1': 'pod1_value1'} + list_pod_ret = mock.Mock() + list_pod_ret.items = [pod] + self.k8s_v1_api.list_namespaced_pod.return_value = list_pod_ret + exception = RuntimeError() + exception.status = 409 + self.k8s_v1_api.create_namespaced_service.side_effect = exception + self.k8s_v1_api.read_namespaced_service.return_value = ( + self._create_service() + ) + self.k8s_v1_api.list_node.return_value = ( + self._create_nodes_with_external_ip() + ) + runtime_id = common.generate_unicode_uuid() + function_id = common.generate_unicode_uuid() + + pod_names, service_url = self.manager.prepare_execution( + function_id, image=None, identifier=runtime_id, + labels={'runtime_id': runtime_id}) + + # in _prepare_pod + self.k8s_v1_api.read_namespaced_service.assert_called_once_with( + 'service-%s' % function_id, self.fake_namespace) + def test_prepare_execution_pod_preparation_failed(self): pod = mock.Mock() pod.metadata.name = self.rand_name('pod', @@ -345,6 +510,32 @@ class TestKubernetesManager(base.BaseTest): function_id, {'runtime_id': runtime_id, 'function_id': function_id}) + def test_prepare_execution_service_internal_ip(self): + pod = mock.Mock() + pod.metadata.name = self.rand_name('pod', + prefix='TestKubernetesManager') + pod.metadata.labels = {'pod1_key1': 'pod1_value1'} + list_pod_ret = mock.Mock() + list_pod_ret.items = [pod] + self.k8s_v1_api.list_namespaced_pod.return_value = list_pod_ret + self.k8s_v1_api.create_namespaced_service.return_value = ( + self._create_service() + ) + self.k8s_v1_api.list_node.return_value = ( + self._create_nodes_with_internal_ip() + ) + runtime_id = common.generate_unicode_uuid() + function_id = common.generate_unicode_uuid() + + pod_names, service_url = self.manager.prepare_execution( + function_id, image=None, identifier=runtime_id, + labels={'runtime_id': runtime_id}) + + self.assertEqual(pod.metadata.name, pod_names) + self.assertEqual( + 'http://%s:%s' % (SERVICE_ADDRESS_INTERNAL, SERVICE_PORT), + service_url) + def test_run_execution(self): pod = mock.Mock() pod.status.phase = 'Succeeded' @@ -524,6 +715,77 @@ class TestKubernetesManager(base.BaseTest): self.manager.scaleup_function, function_id, identifier=runtime_id, count=2) + def test_scaleup_function_service_already_exists(self): + pod = mock.Mock() + pod.metadata.name = self.rand_name('pod', + prefix='TestKubernetesManager') + pod.metadata.labels = {'pod1_key1': 'pod1_value1'} + list_pod_ret = mock.Mock() + list_pod_ret.items = [pod] + self.k8s_v1_api.list_namespaced_pod.return_value = list_pod_ret + exception = RuntimeError() + exception.status = 409 + self.k8s_v1_api.create_namespaced_service.side_effect = exception + self.k8s_v1_api.read_namespaced_service.return_value = ( + self._create_service() + ) + self.k8s_v1_api.list_node.return_value = ( + self._create_nodes_with_external_ip() + ) + runtime_id = common.generate_unicode_uuid() + function_id = common.generate_unicode_uuid() + + pod_names, service_url = self.manager.scaleup_function( + function_id, identifier=runtime_id) + + # in _prepare_pod + self.k8s_v1_api.read_namespaced_service.assert_called_once_with( + 'service-%s' % function_id, self.fake_namespace) + + def test_scaleup_function_service_create_failed(self): + pod = mock.Mock() + pod.metadata.name = self.rand_name('pod', + prefix='TestKubernetesManager') + pod.metadata.labels = None + list_pod_ret = mock.Mock() + list_pod_ret.items = [pod] + self.k8s_v1_api.list_namespaced_pod.return_value = list_pod_ret + exception = RuntimeError() + exception.status = 500 + self.k8s_v1_api.create_namespaced_service.side_effect = exception + runtime_id = common.generate_unicode_uuid() + function_id = common.generate_unicode_uuid() + + self.assertRaises( + RuntimeError, + self.manager.scaleup_function, + function_id, identifier=runtime_id) + + def test_scaleup_function_service_internal_ip(self): + pod = mock.Mock() + pod.metadata.name = self.rand_name('pod', + prefix='TestKubernetesManager') + pod.metadata.labels = None + list_pod_ret = mock.Mock() + list_pod_ret.items = [pod] + self.k8s_v1_api.list_namespaced_pod.return_value = list_pod_ret + self.k8s_v1_api.create_namespaced_service.return_value = ( + self._create_service() + ) + self.k8s_v1_api.list_node.return_value = ( + self._create_nodes_with_internal_ip() + ) + runtime_id = common.generate_unicode_uuid() + function_id = common.generate_unicode_uuid() + + pod_names, service_url = self.manager.scaleup_function( + function_id, identifier=runtime_id) + + self.assertEqual([pod.metadata.name], pod_names) + self.assertEqual( + 'http://%s:%s' % (SERVICE_ADDRESS_INTERNAL, SERVICE_PORT), + service_url) + def test_delete_worker(self): pod_name = self.rand_name('pod', prefix='TestKubernetesManager')