From bcc7a125ae48d0057276b8b395d49ce87db4c879 Mon Sep 17 00:00:00 2001 From: Lingxian Kong Date: Tue, 4 Sep 2018 23:38:40 +1200 Subject: [PATCH] Add duration for image type function For image type function execution, the response message should be similar to package type function, which includes the duration information. There is no output field in the result for image type function. The output can be checked by query the execution log. Change-Id: I02dbd53db4f8bee3696b810de1df7bb2c77c20b3 Story: 2003145 Task: 23280 --- qinling/engine/default_engine.py | 6 +- qinling/engine/utils.py | 12 +- qinling/orchestrator/kubernetes/manager.py | 36 ++++-- .../tests/unit/engine/test_default_engine.py | 20 ++-- .../orchestrator/kubernetes/test_manager.py | 105 +++++++++++------- .../tests/api/test_executions.py | 31 +++--- 6 files changed, 123 insertions(+), 87 deletions(-) diff --git a/qinling/engine/default_engine.py b/qinling/engine/default_engine.py index 303c1318..c3ab1c77 100644 --- a/qinling/engine/default_engine.py +++ b/qinling/engine/default_engine.py @@ -182,7 +182,7 @@ class DefaultEngine(object): is_image_source=is_image_source) return - if source == constants.IMAGE_FUNCTION: + if is_image_source: image = function.code['image'] # Be consistent with k8s naming convention identifier = ('%s-%s' % @@ -211,8 +211,8 @@ class DefaultEngine(object): utils.handle_execution_exception(execution_id, str(e)) return - # For image type function, read the worker log; For package type - # function, invoke and get log + # For image type function, wait for its completion and retrieve the + # worker log; For package type function, invoke and get log success, res = self.orchestrator.run_execution( execution_id, function_id, diff --git a/qinling/engine/utils.py b/qinling/engine/utils.py index 1a6de7f4..05408ee0 100644 --- a/qinling/engine/utils.py +++ b/qinling/engine/utils.py @@ -130,16 +130,8 @@ def db_set_execution_status(execution_id, execution_status, logs, res): def finish_execution(execution_id, success, res, is_image_source=False): - logs = '' - if is_image_source: - # If the function is created from docker image, the result is - # direct output, here we convert to a dict to fit into the db - # schema. - res = {'output': res} - else: - # Execution log is only available for non-image source execution. - logs = res.pop('logs', '') - success = success and res.pop('success') + logs = res.pop('logs', '') + success = success and res.pop('success', True) LOG.debug( 'Finished execution %s, success: %s', execution_id, success diff --git a/qinling/orchestrator/kubernetes/manager.py b/qinling/orchestrator/kubernetes/manager.py index e8ccff1e..fac48187 100644 --- a/qinling/orchestrator/kubernetes/manager.py +++ b/qinling/orchestrator/kubernetes/manager.py @@ -509,28 +509,40 @@ class KubernetesManager(base.OrchestratorBase): self.conf.kubernetes.namespace ) status = pod.status.phase - return True if status == 'Succeeded' else False + if status == 'Succeeded': + return pod + + raise exc.OrchestratorException() + + duration = 0 try: r = tenacity.Retrying( wait=tenacity.wait_fixed(1), stop=tenacity.stop_after_delay(180), - retry=tenacity.retry_if_result( - lambda result: result is False) + retry=tenacity.retry_if_exception_type( + exc.OrchestratorException) ) - r.call(_wait_complete) - except Exception as e: - LOG.exception( - "Failed to get pod output, pod: %s, error: %s", - identifier, str(e) - ) - return False, {'error': 'Function execution failed.'} + pod = r.call(_wait_complete) - output = self.v1.read_namespaced_pod_log( + statuses = pod.status.container_statuses + for s in statuses: + if hasattr(s.state, "terminated"): + end_time = s.state.terminated.finished_at + start_time = s.state.terminated.started_at + delta = end_time - start_time + duration = delta.seconds + break + except Exception: + LOG.exception("Failed to wait for pod %s", identifier) + return False, {'error': 'Function execution failed.', + 'duration': duration} + + log = self.v1.read_namespaced_pod_log( identifier, self.conf.kubernetes.namespace, ) - return True, output + return True, {'duration': duration, 'logs': log} def delete_function(self, function_id, version, labels=None): """Delete related resources for function. diff --git a/qinling/tests/unit/engine/test_default_engine.py b/qinling/tests/unit/engine/test_default_engine.py index e3e484a8..f51af844 100644 --- a/qinling/tests/unit/engine/test_default_engine.py +++ b/qinling/tests/unit/engine/test_default_engine.py @@ -242,8 +242,9 @@ class TestDefaultEngine(base.DbTestCase): self.orchestrator.prepare_execution.return_value = ( mock.Mock(), None) self.orchestrator.run_execution.side_effect = [ - (True, 'success result'), - (False, 'failed result')] + (True, {'duration': 5, 'logs': 'fake log'}), + (False, {'duration': 0, 'error': 'Function execution failed.'}) + ] # Create two executions, with different results self.default_engine.create_execution( @@ -302,12 +303,15 @@ class TestDefaultEngine(base.DbTestCase): execution_1 = db_api.get_execution(execution_1_id) execution_2 = db_api.get_execution(execution_2_id) - self.assertEqual(execution_1.status, status.SUCCESS) - self.assertEqual(execution_1.logs, '') - self.assertEqual(execution_1.result, {'output': 'success result'}) - self.assertEqual(execution_2.status, status.FAILED) - self.assertEqual(execution_2.logs, '') - self.assertEqual(execution_2.result, {'output': 'failed result'}) + self.assertEqual(status.SUCCESS, execution_1.status) + self.assertEqual('fake log', execution_1.logs) + self.assertEqual({"duration": 5}, execution_1.result) + self.assertEqual(status.FAILED, execution_2.status) + self.assertEqual('', execution_2.logs) + self.assertEqual( + {'duration': 0, 'error': 'Function execution failed.'}, + execution_2.result + ) @mock.patch('qinling.utils.etcd_util.get_service_url') def test_create_execution_prepare_execution_exception( diff --git a/qinling/tests/unit/orchestrator/kubernetes/test_manager.py b/qinling/tests/unit/orchestrator/kubernetes/test_manager.py index 0b2186dc..ab82aab7 100644 --- a/qinling/tests/unit/orchestrator/kubernetes/test_manager.py +++ b/qinling/tests/unit/orchestrator/kubernetes/test_manager.py @@ -12,10 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -import mock +import datetime import testtools import yaml +import mock from oslo_config import cfg from qinling import config @@ -616,10 +617,62 @@ class TestKubernetesManager(base.DbTestCase): def test_run_execution_image_type_function(self): pod = mock.Mock() + status = mock.Mock() + status.state.terminated.finished_at = datetime.datetime(2018, 9, 4, 10, + 1, 50) + status.state.terminated.started_at = datetime.datetime(2018, 9, 4, 10, + 1, 40) pod.status.phase = 'Succeeded' + pod.status.container_statuses = [status] self.k8s_v1_api.read_namespaced_pod.return_value = pod - fake_output = 'fake output' - self.k8s_v1_api.read_namespaced_pod_log.return_value = fake_output + fake_log = 'fake log' + self.k8s_v1_api.read_namespaced_pod_log.return_value = fake_log + execution_id = common.generate_unicode_uuid() + function_id = common.generate_unicode_uuid() + identifier = 'fake_identifier' + + result, output = self.manager.run_execution(execution_id, function_id, + 0, identifier=identifier) + + self.k8s_v1_api.read_namespaced_pod.assert_called_once_with( + identifier, self.fake_namespace) + self.k8s_v1_api.read_namespaced_pod_log.assert_called_once_with( + identifier, self.fake_namespace) + self.assertTrue(result) + + expected_output = {'duration': 10, 'logs': fake_log} + self.assertEqual(expected_output, output) + + def test_run_execution_image_type_function_retry(self): + pod1 = mock.Mock() + pod1.status.phase = '' + pod2 = mock.Mock() + status = mock.Mock() + status.state.terminated.finished_at = datetime.datetime(2018, 9, 4, 10, + 1, 50) + status.state.terminated.started_at = datetime.datetime(2018, 9, 4, 10, + 1, 40) + pod2.status.phase = 'Succeeded' + pod2.status.container_statuses = [status] + self.k8s_v1_api.read_namespaced_pod.side_effect = [pod1, pod2] + fake_log = 'fake log' + self.k8s_v1_api.read_namespaced_pod_log.return_value = fake_log + execution_id = common.generate_unicode_uuid() + function_id = common.generate_unicode_uuid() + + result, output = self.manager.run_execution(execution_id, function_id, + 0) + + self.assertEqual(2, self.k8s_v1_api.read_namespaced_pod.call_count) + self.k8s_v1_api.read_namespaced_pod_log.assert_called_once_with( + None, self.fake_namespace) + self.assertTrue(result) + + expected_output = {'duration': 10, 'logs': fake_log} + self.assertEqual(expected_output, output) + + def test_run_execution_image_type_function_read_pod_exception(self): + self.k8s_v1_api.read_namespaced_pod.side_effect = RuntimeError execution_id = common.generate_unicode_uuid() function_id = common.generate_unicode_uuid() @@ -628,10 +681,14 @@ class TestKubernetesManager(base.DbTestCase): self.k8s_v1_api.read_namespaced_pod.assert_called_once_with( None, self.fake_namespace) - self.k8s_v1_api.read_namespaced_pod_log.assert_called_once_with( - None, self.fake_namespace) - self.assertTrue(result) - self.assertEqual(fake_output, output) + self.k8s_v1_api.read_namespaced_pod_log.assert_not_called() + self.assertFalse(result) + + expected_output = { + 'error': 'Function execution failed.', + 'duration': 0 + } + self.assertEqual(expected_output, output) @mock.patch('qinling.engine.utils.url_request') def test_run_execution_version_0(self, mock_request): @@ -662,40 +719,6 @@ class TestKubernetesManager(base.DbTestCase): self.manager.session, 'FAKE_URL/execute', body=data ) - def test_run_execution_no_service_url_retry(self): - pod1 = mock.Mock() - pod1.status.phase = '' - pod2 = mock.Mock() - pod2.status.phase = 'Succeeded' - self.k8s_v1_api.read_namespaced_pod.side_effect = [pod1, pod2] - fake_output = 'fake output' - self.k8s_v1_api.read_namespaced_pod_log.return_value = fake_output - execution_id = common.generate_unicode_uuid() - function_id = common.generate_unicode_uuid() - - result, output = self.manager.run_execution(execution_id, function_id, - 0) - - self.assertEqual(2, self.k8s_v1_api.read_namespaced_pod.call_count) - self.k8s_v1_api.read_namespaced_pod_log.assert_called_once_with( - None, self.fake_namespace) - self.assertTrue(result) - self.assertEqual(fake_output, output) - - def test_run_execution_no_service_url_read_pod_exception(self): - self.k8s_v1_api.read_namespaced_pod.side_effect = RuntimeError - execution_id = common.generate_unicode_uuid() - function_id = common.generate_unicode_uuid() - - result, output = self.manager.run_execution(execution_id, function_id, - 0) - - self.k8s_v1_api.read_namespaced_pod.assert_called_once_with( - None, self.fake_namespace) - self.k8s_v1_api.read_namespaced_pod_log.assert_not_called() - self.assertFalse(result) - self.assertEqual({'error': 'Function execution failed.'}, output) - def test_delete_function(self): # Deleting namespaced service is also tested in this. svc1 = mock.Mock() diff --git a/qinling_tempest_plugin/tests/api/test_executions.py b/qinling_tempest_plugin/tests/api/test_executions.py index 4db0631e..efaef199 100644 --- a/qinling_tempest_plugin/tests/api/test_executions.py +++ b/qinling_tempest_plugin/tests/api/test_executions.py @@ -264,6 +264,24 @@ class ExecutionsTest(base.BaseQinlingTest): self.assertEqual(200, resp.status) self.assertEqual(2, len(body['workers'])) + @decorators.idempotent_id('d0598868-e45d-11e7-9125-00224d6b7bc1') + def test_image_function_execution(self): + function_id = self.create_function(image=True) + resp, body = self.client.create_execution(function_id, + input='Qinling') + + self.assertEqual(201, resp.status) + execution_id = body['id'] + self.addCleanup(self.client.delete_resource, 'executions', + execution_id, ignore_notfound=True) + + self.assertEqual('success', body['status']) + self.assertIn('duration', jsonutils.loads(body['result'])) + + resp, body = self.client.get_execution_log(execution_id) + self.assertEqual(200, resp.status) + self.assertIn('Qinling', body) + @decorators.idempotent_id('ccfe67ce-e467-11e7-916c-00224d6b7bc1') def test_python_execution_positional_args(self): package = self.create_package( @@ -320,19 +338,6 @@ class ExecutionsTest(base.BaseQinlingTest): 'too much resource consumption', result['output'] ) - @decorators.idempotent_id('d0598868-e45d-11e7-9125-00224d6b7bc1') - def test_execution_image_function(self): - function_id = self.create_function(image=True) - resp, body = self.client.create_execution(function_id, - input='Qinling') - - self.assertEqual(201, resp.status) - execution_id = body['id'] - self.addCleanup(self.client.delete_resource, 'executions', - execution_id, ignore_notfound=True) - self.assertEqual('success', body['status']) - self.assertIn('Qinling', jsonutils.loads(body['result'])['output']) - @decorators.idempotent_id('2b5f0787-b82d-4fc4-af76-cf86d389a76b') def test_python_execution_memory_limit_non_image(self): """In this case, the following steps are taken: