diff --git a/qinling/engine/default_engine.py b/qinling/engine/default_engine.py index 303c1318..c3ab1c77 100644 --- a/qinling/engine/default_engine.py +++ b/qinling/engine/default_engine.py @@ -182,7 +182,7 @@ class DefaultEngine(object): is_image_source=is_image_source) return - if source == constants.IMAGE_FUNCTION: + if is_image_source: image = function.code['image'] # Be consistent with k8s naming convention identifier = ('%s-%s' % @@ -211,8 +211,8 @@ class DefaultEngine(object): utils.handle_execution_exception(execution_id, str(e)) return - # For image type function, read the worker log; For package type - # function, invoke and get log + # For image type function, wait for its completion and retrieve the + # worker log; For package type function, invoke and get log success, res = self.orchestrator.run_execution( execution_id, function_id, diff --git a/qinling/engine/utils.py b/qinling/engine/utils.py index 1a6de7f4..05408ee0 100644 --- a/qinling/engine/utils.py +++ b/qinling/engine/utils.py @@ -130,16 +130,8 @@ def db_set_execution_status(execution_id, execution_status, logs, res): def finish_execution(execution_id, success, res, is_image_source=False): - logs = '' - if is_image_source: - # If the function is created from docker image, the result is - # direct output, here we convert to a dict to fit into the db - # schema. - res = {'output': res} - else: - # Execution log is only available for non-image source execution. - logs = res.pop('logs', '') - success = success and res.pop('success') + logs = res.pop('logs', '') + success = success and res.pop('success', True) LOG.debug( 'Finished execution %s, success: %s', execution_id, success diff --git a/qinling/orchestrator/kubernetes/manager.py b/qinling/orchestrator/kubernetes/manager.py index e8ccff1e..fac48187 100644 --- a/qinling/orchestrator/kubernetes/manager.py +++ b/qinling/orchestrator/kubernetes/manager.py @@ -509,28 +509,40 @@ class KubernetesManager(base.OrchestratorBase): self.conf.kubernetes.namespace ) status = pod.status.phase - return True if status == 'Succeeded' else False + if status == 'Succeeded': + return pod + + raise exc.OrchestratorException() + + duration = 0 try: r = tenacity.Retrying( wait=tenacity.wait_fixed(1), stop=tenacity.stop_after_delay(180), - retry=tenacity.retry_if_result( - lambda result: result is False) + retry=tenacity.retry_if_exception_type( + exc.OrchestratorException) ) - r.call(_wait_complete) - except Exception as e: - LOG.exception( - "Failed to get pod output, pod: %s, error: %s", - identifier, str(e) - ) - return False, {'error': 'Function execution failed.'} + pod = r.call(_wait_complete) - output = self.v1.read_namespaced_pod_log( + statuses = pod.status.container_statuses + for s in statuses: + if hasattr(s.state, "terminated"): + end_time = s.state.terminated.finished_at + start_time = s.state.terminated.started_at + delta = end_time - start_time + duration = delta.seconds + break + except Exception: + LOG.exception("Failed to wait for pod %s", identifier) + return False, {'error': 'Function execution failed.', + 'duration': duration} + + log = self.v1.read_namespaced_pod_log( identifier, self.conf.kubernetes.namespace, ) - return True, output + return True, {'duration': duration, 'logs': log} def delete_function(self, function_id, version, labels=None): """Delete related resources for function. diff --git a/qinling/tests/unit/engine/test_default_engine.py b/qinling/tests/unit/engine/test_default_engine.py index e3e484a8..f51af844 100644 --- a/qinling/tests/unit/engine/test_default_engine.py +++ b/qinling/tests/unit/engine/test_default_engine.py @@ -242,8 +242,9 @@ class TestDefaultEngine(base.DbTestCase): self.orchestrator.prepare_execution.return_value = ( mock.Mock(), None) self.orchestrator.run_execution.side_effect = [ - (True, 'success result'), - (False, 'failed result')] + (True, {'duration': 5, 'logs': 'fake log'}), + (False, {'duration': 0, 'error': 'Function execution failed.'}) + ] # Create two executions, with different results self.default_engine.create_execution( @@ -302,12 +303,15 @@ class TestDefaultEngine(base.DbTestCase): execution_1 = db_api.get_execution(execution_1_id) execution_2 = db_api.get_execution(execution_2_id) - self.assertEqual(execution_1.status, status.SUCCESS) - self.assertEqual(execution_1.logs, '') - self.assertEqual(execution_1.result, {'output': 'success result'}) - self.assertEqual(execution_2.status, status.FAILED) - self.assertEqual(execution_2.logs, '') - self.assertEqual(execution_2.result, {'output': 'failed result'}) + self.assertEqual(status.SUCCESS, execution_1.status) + self.assertEqual('fake log', execution_1.logs) + self.assertEqual({"duration": 5}, execution_1.result) + self.assertEqual(status.FAILED, execution_2.status) + self.assertEqual('', execution_2.logs) + self.assertEqual( + {'duration': 0, 'error': 'Function execution failed.'}, + execution_2.result + ) @mock.patch('qinling.utils.etcd_util.get_service_url') def test_create_execution_prepare_execution_exception( diff --git a/qinling/tests/unit/orchestrator/kubernetes/test_manager.py b/qinling/tests/unit/orchestrator/kubernetes/test_manager.py index 0b2186dc..ab82aab7 100644 --- a/qinling/tests/unit/orchestrator/kubernetes/test_manager.py +++ b/qinling/tests/unit/orchestrator/kubernetes/test_manager.py @@ -12,10 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -import mock +import datetime import testtools import yaml +import mock from oslo_config import cfg from qinling import config @@ -616,10 +617,62 @@ class TestKubernetesManager(base.DbTestCase): def test_run_execution_image_type_function(self): pod = mock.Mock() + status = mock.Mock() + status.state.terminated.finished_at = datetime.datetime(2018, 9, 4, 10, + 1, 50) + status.state.terminated.started_at = datetime.datetime(2018, 9, 4, 10, + 1, 40) pod.status.phase = 'Succeeded' + pod.status.container_statuses = [status] self.k8s_v1_api.read_namespaced_pod.return_value = pod - fake_output = 'fake output' - self.k8s_v1_api.read_namespaced_pod_log.return_value = fake_output + fake_log = 'fake log' + self.k8s_v1_api.read_namespaced_pod_log.return_value = fake_log + execution_id = common.generate_unicode_uuid() + function_id = common.generate_unicode_uuid() + identifier = 'fake_identifier' + + result, output = self.manager.run_execution(execution_id, function_id, + 0, identifier=identifier) + + self.k8s_v1_api.read_namespaced_pod.assert_called_once_with( + identifier, self.fake_namespace) + self.k8s_v1_api.read_namespaced_pod_log.assert_called_once_with( + identifier, self.fake_namespace) + self.assertTrue(result) + + expected_output = {'duration': 10, 'logs': fake_log} + self.assertEqual(expected_output, output) + + def test_run_execution_image_type_function_retry(self): + pod1 = mock.Mock() + pod1.status.phase = '' + pod2 = mock.Mock() + status = mock.Mock() + status.state.terminated.finished_at = datetime.datetime(2018, 9, 4, 10, + 1, 50) + status.state.terminated.started_at = datetime.datetime(2018, 9, 4, 10, + 1, 40) + pod2.status.phase = 'Succeeded' + pod2.status.container_statuses = [status] + self.k8s_v1_api.read_namespaced_pod.side_effect = [pod1, pod2] + fake_log = 'fake log' + self.k8s_v1_api.read_namespaced_pod_log.return_value = fake_log + execution_id = common.generate_unicode_uuid() + function_id = common.generate_unicode_uuid() + + result, output = self.manager.run_execution(execution_id, function_id, + 0) + + self.assertEqual(2, self.k8s_v1_api.read_namespaced_pod.call_count) + self.k8s_v1_api.read_namespaced_pod_log.assert_called_once_with( + None, self.fake_namespace) + self.assertTrue(result) + + expected_output = {'duration': 10, 'logs': fake_log} + self.assertEqual(expected_output, output) + + def test_run_execution_image_type_function_read_pod_exception(self): + self.k8s_v1_api.read_namespaced_pod.side_effect = RuntimeError execution_id = common.generate_unicode_uuid() function_id = common.generate_unicode_uuid() @@ -628,10 +681,14 @@ class TestKubernetesManager(base.DbTestCase): self.k8s_v1_api.read_namespaced_pod.assert_called_once_with( None, self.fake_namespace) - self.k8s_v1_api.read_namespaced_pod_log.assert_called_once_with( - None, self.fake_namespace) - self.assertTrue(result) - self.assertEqual(fake_output, output) + self.k8s_v1_api.read_namespaced_pod_log.assert_not_called() + self.assertFalse(result) + + expected_output = { + 'error': 'Function execution failed.', + 'duration': 0 + } + self.assertEqual(expected_output, output) @mock.patch('qinling.engine.utils.url_request') def test_run_execution_version_0(self, mock_request): @@ -662,40 +719,6 @@ class TestKubernetesManager(base.DbTestCase): self.manager.session, 'FAKE_URL/execute', body=data ) - def test_run_execution_no_service_url_retry(self): - pod1 = mock.Mock() - pod1.status.phase = '' - pod2 = mock.Mock() - pod2.status.phase = 'Succeeded' - self.k8s_v1_api.read_namespaced_pod.side_effect = [pod1, pod2] - fake_output = 'fake output' - self.k8s_v1_api.read_namespaced_pod_log.return_value = fake_output - execution_id = common.generate_unicode_uuid() - function_id = common.generate_unicode_uuid() - - result, output = self.manager.run_execution(execution_id, function_id, - 0) - - self.assertEqual(2, self.k8s_v1_api.read_namespaced_pod.call_count) - self.k8s_v1_api.read_namespaced_pod_log.assert_called_once_with( - None, self.fake_namespace) - self.assertTrue(result) - self.assertEqual(fake_output, output) - - def test_run_execution_no_service_url_read_pod_exception(self): - self.k8s_v1_api.read_namespaced_pod.side_effect = RuntimeError - execution_id = common.generate_unicode_uuid() - function_id = common.generate_unicode_uuid() - - result, output = self.manager.run_execution(execution_id, function_id, - 0) - - self.k8s_v1_api.read_namespaced_pod.assert_called_once_with( - None, self.fake_namespace) - self.k8s_v1_api.read_namespaced_pod_log.assert_not_called() - self.assertFalse(result) - self.assertEqual({'error': 'Function execution failed.'}, output) - def test_delete_function(self): # Deleting namespaced service is also tested in this. svc1 = mock.Mock() diff --git a/qinling_tempest_plugin/tests/api/test_executions.py b/qinling_tempest_plugin/tests/api/test_executions.py index 4db0631e..efaef199 100644 --- a/qinling_tempest_plugin/tests/api/test_executions.py +++ b/qinling_tempest_plugin/tests/api/test_executions.py @@ -264,6 +264,24 @@ class ExecutionsTest(base.BaseQinlingTest): self.assertEqual(200, resp.status) self.assertEqual(2, len(body['workers'])) + @decorators.idempotent_id('d0598868-e45d-11e7-9125-00224d6b7bc1') + def test_image_function_execution(self): + function_id = self.create_function(image=True) + resp, body = self.client.create_execution(function_id, + input='Qinling') + + self.assertEqual(201, resp.status) + execution_id = body['id'] + self.addCleanup(self.client.delete_resource, 'executions', + execution_id, ignore_notfound=True) + + self.assertEqual('success', body['status']) + self.assertIn('duration', jsonutils.loads(body['result'])) + + resp, body = self.client.get_execution_log(execution_id) + self.assertEqual(200, resp.status) + self.assertIn('Qinling', body) + @decorators.idempotent_id('ccfe67ce-e467-11e7-916c-00224d6b7bc1') def test_python_execution_positional_args(self): package = self.create_package( @@ -320,19 +338,6 @@ class ExecutionsTest(base.BaseQinlingTest): 'too much resource consumption', result['output'] ) - @decorators.idempotent_id('d0598868-e45d-11e7-9125-00224d6b7bc1') - def test_execution_image_function(self): - function_id = self.create_function(image=True) - resp, body = self.client.create_execution(function_id, - input='Qinling') - - self.assertEqual(201, resp.status) - execution_id = body['id'] - self.addCleanup(self.client.delete_resource, 'executions', - execution_id, ignore_notfound=True) - self.assertEqual('success', body['status']) - self.assertIn('Qinling', jsonutils.loads(body['result'])['output']) - @decorators.idempotent_id('2b5f0787-b82d-4fc4-af76-cf86d389a76b') def test_python_execution_memory_limit_non_image(self): """In this case, the following steps are taken: