From 83a0e27d59b7408dbe430659daceeda5346a082c Mon Sep 17 00:00:00 2001 From: Lingxian Kong Date: Tue, 11 Sep 2018 12:03:50 +1200 Subject: [PATCH] Support Image type function timeout Story: 2002174 Task: 26342 Change-Id: Id8c141ea46d3e4ee7a35c99a04f1e40583f6e3dc --- qinling/engine/default_engine.py | 19 ++++---- qinling/engine/utils.py | 10 +++-- qinling/exceptions.py | 5 +++ qinling/orchestrator/kubernetes/manager.py | 38 ++++++++++++---- .../orchestrator/kubernetes/templates/pod.j2 | 2 +- .../tests/unit/engine/test_default_engine.py | 29 ++++++------ .../orchestrator/kubernetes/test_manager.py | 45 +++++++++++++++++-- qinling/utils/executions.py | 10 +++-- .../tests/api/test_executions.py | 43 ++++++++++++++++-- qinling_tempest_plugin/tests/base.py | 4 +- 10 files changed, 156 insertions(+), 49 deletions(-) diff --git a/qinling/engine/default_engine.py b/qinling/engine/default_engine.py index f97182e1..7f0c3184 100644 --- a/qinling/engine/default_engine.py +++ b/qinling/engine/default_engine.py @@ -20,7 +20,6 @@ from qinling.db import api as db_api from qinling.engine import utils from qinling import exceptions as exc from qinling import status -from qinling.utils import common from qinling.utils import constants from qinling.utils import etcd_util @@ -184,19 +183,15 @@ class DefaultEngine(object): if is_image_source: image = function.code['image'] - # Be consistent with k8s naming convention - identifier = ('%s-%s' % - (common.generate_unicode_uuid(dashed=False), - function_id) - )[:63] + identifier = ('%s-%s' % (execution_id, function_id))[:63] else: identifier = runtime_id labels = {'runtime_id': runtime_id} try: - # For image function, it will be executed inside this method; for - # package type function it only sets up underlying resources and - # get a service url. If the service url is already created + # For image function, it will be executed inside this method; + # For package type function it only sets up underlying resources + # and get a service url. If the service url is already created # beforehand, nothing happens. _, svc_url = self.orchestrator.prepare_execution( function_id, @@ -212,7 +207,8 @@ class DefaultEngine(object): return # For image type function, wait for its completion and retrieve the - # worker log; For package type function, invoke and get log + # worker log; + # For package type function, invoke and get log success, res = self.orchestrator.run_execution( execution_id, function_id, @@ -222,7 +218,8 @@ class DefaultEngine(object): identifier=identifier, service_url=svc_url, entry=function.entry, - trust_id=function.trust_id + trust_id=function.trust_id, + timeout=function.timeout ) utils.finish_execution(execution_id, success, res, diff --git a/qinling/engine/utils.py b/qinling/engine/utils.py index 9b7b9fa6..a8264b80 100644 --- a/qinling/engine/utils.py +++ b/qinling/engine/utils.py @@ -46,7 +46,7 @@ def url_request(request_session, url, body=None): LOG.exception( "Failed to request url %s, error: %s", ping_url, str(e) ) - return False, {'error': 'Function execution failed.'} + return False, {'output': 'Function execution failed.'} for a in six.moves.xrange(10): res = None @@ -67,12 +67,12 @@ def url_request(request_session, url, body=None): LOG.error("Response status: %s, content: %s", res.status_code, res.content) - return False, {'error': 'Function execution timeout.'} + return False, {'output': 'Function execution timeout.'} LOG.exception("Could not connect to function service. Reason: %s", exception) - return False, {'error': 'Internal service error.'} + return False, {'output': 'Internal service error.'} def get_request_data(conf, function_id, version, execution_id, rlimit, input, @@ -149,5 +149,7 @@ def handle_execution_exception(execution_id, exc_str): 'Error running execution %s: %s', execution_id, exc_str ) db_set_execution_status( - execution_id, status.ERROR, '', {'error': 'Function execution failed.'} + execution_id, status.ERROR, + '', + {'output': 'Function execution failed.'} ) diff --git a/qinling/exceptions.py b/qinling/exceptions.py index e8db68b9..a17dfeb1 100644 --- a/qinling/exceptions.py +++ b/qinling/exceptions.py @@ -121,3 +121,8 @@ class SwiftException(QinlingException): class EtcdLockException(QinlingException): http_code = 409 message = 'Etcd lock failed' + + +class TimeoutException(QinlingException): + http_code = 500 + message = 'Function execution timeout' diff --git a/qinling/orchestrator/kubernetes/manager.py b/qinling/orchestrator/kubernetes/manager.py index 82c79c86..b3dd4689 100644 --- a/qinling/orchestrator/kubernetes/manager.py +++ b/qinling/orchestrator/kubernetes/manager.py @@ -414,10 +414,14 @@ class KubernetesManager(base.OrchestratorBase): "Creating pod %s for image function:\n%s", pod_name, pod_body ) - self.v1.create_namespaced_pod( - self.conf.kubernetes.namespace, - body=yaml.safe_load(pod_body), - ) + try: + self.v1.create_namespaced_pod( + self.conf.kubernetes.namespace, + body=yaml.safe_load(pod_body), + ) + except Exception: + LOG.exception("Failed to create pod.") + raise exc.OrchestratorException('Execution preparation failed.') def _update_pod_label(self, pod, new_label): name = pod.metadata.name @@ -463,6 +467,7 @@ class KubernetesManager(base.OrchestratorBase): ) self._create_pod(image, rlimit, identifier, labels, input) + return identifier, None else: pods = self._choose_available_pods(labels, function_id=function_id, @@ -513,15 +518,16 @@ class KubernetesManager(base.OrchestratorBase): if status == 'Succeeded': return pod - raise exc.OrchestratorException() + raise exc.TimeoutException() duration = 0 try: r = tenacity.Retrying( wait=tenacity.wait_fixed(1), - stop=tenacity.stop_after_delay(180), + stop=tenacity.stop_after_delay(timeout), retry=tenacity.retry_if_exception_type( - exc.OrchestratorException) + exc.TimeoutException), + reraise=True ) pod = r.call(_wait_complete) @@ -533,15 +539,31 @@ class KubernetesManager(base.OrchestratorBase): delta = end_time - start_time duration = delta.seconds break + except exc.TimeoutException: + LOG.exception( + "Timeout for function execution %s, pod %s", + execution_id, identifier + ) + + self.v1.delete_namespaced_pod( + identifier, + self.conf.kubernetes.namespace, + {} + ) + LOG.debug('Pod %s deleted.', identifier) + + return False, {'output': 'Function execution timeout.', + 'duration': timeout} except Exception: LOG.exception("Failed to wait for pod %s", identifier) - return False, {'error': 'Function execution failed.', + return False, {'output': 'Function execution failed.', 'duration': duration} log = self.v1.read_namespaced_pod_log( identifier, self.conf.kubernetes.namespace, ) + return True, {'duration': duration, 'logs': log} def delete_function(self, function_id, version, labels=None): diff --git a/qinling/orchestrator/kubernetes/templates/pod.j2 b/qinling/orchestrator/kubernetes/templates/pod.j2 index f55326b0..125a6c92 100644 --- a/qinling/orchestrator/kubernetes/templates/pod.j2 +++ b/qinling/orchestrator/kubernetes/templates/pod.j2 @@ -18,7 +18,7 @@ spec: {% if input %} args: {% for item in input %} - - {{ item }} + - "{{ item | safe }}" {% endfor %} {% endif %} restartPolicy: Never diff --git a/qinling/tests/unit/engine/test_default_engine.py b/qinling/tests/unit/engine/test_default_engine.py index 2346087f..e30558b9 100644 --- a/qinling/tests/unit/engine/test_default_engine.py +++ b/qinling/tests/unit/engine/test_default_engine.py @@ -243,7 +243,7 @@ class TestDefaultEngine(base.DbTestCase): mock.Mock(), None) self.orchestrator.run_execution.side_effect = [ (True, {'duration': 5, 'logs': 'fake log'}), - (False, {'duration': 0, 'error': 'Function execution failed.'}) + (False, {'duration': 0, 'output': 'Function execution failed.'}) ] # Create two executions, with different results @@ -287,7 +287,8 @@ class TestDefaultEngine(base.DbTestCase): identifier=mock.ANY, service_url=None, entry=function.entry, - trust_id=function.trust_id), + trust_id=function.trust_id, + timeout=function.timeout), mock.call(execution_2_id, function_id, 0, @@ -296,7 +297,8 @@ class TestDefaultEngine(base.DbTestCase): identifier=mock.ANY, service_url=None, entry=function.entry, - trust_id=function.trust_id) + trust_id=function.trust_id, + timeout=function.timeout) ] self.orchestrator.run_execution.assert_has_calls(run_calls) @@ -309,7 +311,7 @@ class TestDefaultEngine(base.DbTestCase): self.assertEqual(status.FAILED, execution_2.status) self.assertEqual('', execution_2.logs) self.assertEqual( - {'duration': 0, 'error': 'Function execution failed.'}, + {'duration': 0, 'output': 'Function execution failed.'}, execution_2.result ) @@ -348,10 +350,11 @@ class TestDefaultEngine(base.DbTestCase): mock.Mock(), execution_id, function_id, 0, runtime_id) execution = db_api.get_execution(execution_id) - self.assertEqual(execution.status, status.ERROR) - self.assertEqual(execution.logs, '') - self.assertEqual(execution.result, - {'error': 'Function execution failed.'}) + + self.assertEqual(status.ERROR, execution.status) + self.assertEqual('', execution.logs) + self.assertEqual({'output': 'Function execution failed.'}, + execution.result) @mock.patch('qinling.utils.etcd_util.get_service_url') def test_create_execution_package_type_function( @@ -385,7 +388,7 @@ class TestDefaultEngine(base.DbTestCase): self.orchestrator.run_execution.assert_called_once_with( execution_id, function_id, 0, rlimit=self.rlimit, input=None, identifier=runtime_id, service_url='svc_url', entry=function.entry, - trust_id=function.trust_id) + trust_id=function.trust_id, timeout=function.timeout) execution = db_api.get_execution(execution_id) @@ -410,10 +413,10 @@ class TestDefaultEngine(base.DbTestCase): execution = db_api.get_execution(execution_id) - self.assertEqual(execution.status, status.ERROR) - self.assertEqual(execution.logs, '') - self.assertEqual(execution.result, - {'error': 'Function execution failed.'}) + self.assertEqual(status.ERROR, execution.status) + self.assertEqual('', execution.logs) + self.assertEqual({'output': 'Function execution failed.'}, + execution.result) @mock.patch('qinling.engine.utils.get_request_data') @mock.patch('qinling.engine.utils.url_request') diff --git a/qinling/tests/unit/orchestrator/kubernetes/test_manager.py b/qinling/tests/unit/orchestrator/kubernetes/test_manager.py index 662fa510..5f2f8369 100644 --- a/qinling/tests/unit/orchestrator/kubernetes/test_manager.py +++ b/qinling/tests/unit/orchestrator/kubernetes/test_manager.py @@ -500,6 +500,24 @@ class TestKubernetesManager(base.DbTestCase): self.k8s_v1_api.create_namespaced_pod.assert_called_once_with( self.fake_namespace, body=yaml.safe_load(pod_body)) + def test_prepare_execution_with_image_pod_failed(self): + function_id = common.generate_unicode_uuid() + image = self.rand_name('image', prefix=self.prefix) + identifier = ( + '%s-%s' % (common.generate_unicode_uuid(dashed=True), function_id) + )[:63] + self.k8s_v1_api.create_namespaced_pod.side_effect = RuntimeError + + self.assertRaises( + exc.OrchestratorException, + self.manager.prepare_execution, + function_id, + 0, + rlimit=self.rlimit, + image=image, + identifier=identifier, + ) + def test_prepare_execution_not_image_no_worker_available(self): ret_pods = mock.Mock() ret_pods.items = [] @@ -661,7 +679,7 @@ class TestKubernetesManager(base.DbTestCase): function_id = common.generate_unicode_uuid() result, output = self.manager.run_execution(execution_id, function_id, - 0) + 0, timeout=5) self.assertEqual(2, self.k8s_v1_api.read_namespaced_pod.call_count) self.k8s_v1_api.read_namespaced_pod_log.assert_called_once_with( @@ -671,13 +689,34 @@ class TestKubernetesManager(base.DbTestCase): expected_output = {'duration': 10, 'logs': fake_log} self.assertEqual(expected_output, output) + def test_run_execution_image_type_function_timeout(self): + execution_id = common.generate_unicode_uuid() + function_id = common.generate_unicode_uuid() + pod1 = mock.Mock() + pod1.status.phase = '' + self.k8s_v1_api.read_namespaced_pod.return_value = pod1 + + result, output = self.manager.run_execution( + execution_id, function_id, 0, + identifier='fake_identifier', + timeout=1 + ) + + self.assertFalse(result) + + expected_output = { + 'output': 'Function execution timeout.', + 'duration': 1 + } + self.assertEqual(expected_output, output) + def test_run_execution_image_type_function_read_pod_exception(self): self.k8s_v1_api.read_namespaced_pod.side_effect = RuntimeError execution_id = common.generate_unicode_uuid() function_id = common.generate_unicode_uuid() result, output = self.manager.run_execution(execution_id, function_id, - 0) + 0, timeout=5) self.k8s_v1_api.read_namespaced_pod.assert_called_once_with( None, self.fake_namespace) @@ -685,7 +724,7 @@ class TestKubernetesManager(base.DbTestCase): self.assertFalse(result) expected_output = { - 'error': 'Function execution failed.', + 'output': 'Function execution failed.', 'duration': 0 } self.assertEqual(expected_output, output) diff --git a/qinling/utils/executions.py b/qinling/utils/executions.py index 207fde51..ffce1c66 100644 --- a/qinling/utils/executions.py +++ b/qinling/utils/executions.py @@ -11,9 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import json - from oslo_log import log as logging +from oslo_serialization import jsonutils from qinling.db import api as db_api from qinling.db.sqlalchemy import models @@ -112,7 +111,12 @@ def create_execution(engine_client, params): # input in params should be a string. if input: try: - params['input'] = json.loads(input) + function_input = jsonutils.loads(input) + # If input is e.g. '6', result of jsonutils.loads is 6 which can + # not be stored in db. + if type(function_input) == int: + raise ValueError + params['input'] = function_input except ValueError: params['input'] = {'__function_input': input} diff --git a/qinling_tempest_plugin/tests/api/test_executions.py b/qinling_tempest_plugin/tests/api/test_executions.py index 37e245b9..a3e35609 100644 --- a/qinling_tempest_plugin/tests/api/test_executions.py +++ b/qinling_tempest_plugin/tests/api/test_executions.py @@ -91,7 +91,7 @@ class ExecutionsTest(base.BaseQinlingTest): self.assertEqual(201, resp.status) self.assertEqual('error', body['status']) result = jsonutils.loads(body['result']) - self.assertEqual('Function execution failed.', result['error']) + self.assertEqual('Function execution failed.', result['output']) @decorators.idempotent_id('2199d1e6-de7d-4345-8745-a8184d6022b1') def test_get_all_admin(self): @@ -266,7 +266,8 @@ class ExecutionsTest(base.BaseQinlingTest): @decorators.idempotent_id('d0598868-e45d-11e7-9125-00224d6b7bc1') def test_image_function_execution(self): - function_id = self.create_function(image=True) + function_id = self.create_function( + image="openstackqinling/alpine-test") resp, body = self.client.create_execution(function_id, input='Qinling') @@ -282,6 +283,42 @@ class ExecutionsTest(base.BaseQinlingTest): self.assertEqual(200, resp.status) self.assertIn('Qinling', body) + @decorators.idempotent_id('ab962144-d5b1-11e8-978f-026f8338c1e5') + def test_image_function_execution_timeout(self): + function_id = self.create_function(image="lingxiankong/sleep") + resp, body = self.client.create_execution(function_id, + input='6') + + self.assertEqual(201, resp.status) + self.addCleanup(self.client.delete_resource, 'executions', + body['id'], ignore_notfound=True) + self.assertEqual('failed', body['status']) + + result = jsonutils.loads(body['result']) + + self.assertGreaterEqual(result['duration'], 5) + self.assertIn( + 'Function execution timeout', result['output'] + ) + + # Update function timeout + resp, _ = self.client.update_function( + function_id, + timeout=10 + ) + self.assertEqual(200, resp.status_code) + + resp, body = self.client.create_execution(function_id, + input='6') + + self.assertEqual(201, resp.status) + self.addCleanup(self.client.delete_resource, 'executions', + body['id'], ignore_notfound=True) + self.assertEqual('success', body['status']) + + result = jsonutils.loads(body['result']) + self.assertGreaterEqual(result['duration'], 6) + @decorators.idempotent_id('ccfe67ce-e467-11e7-916c-00224d6b7bc1') def test_python_execution_positional_args(self): package = self.create_package( @@ -313,7 +350,6 @@ class ExecutionsTest(base.BaseQinlingTest): self.assertEqual('failed', body['status']) result = jsonutils.loads(body['result']) - self.assertNotIn('error', result) self.assertIn( 'Too many open files', result['output'] ) @@ -333,7 +369,6 @@ class ExecutionsTest(base.BaseQinlingTest): self.assertEqual('failed', body['status']) result = jsonutils.loads(body['result']) - self.assertNotIn('error', result) self.assertIn( 'too much resource consumption', result['output'] ) diff --git a/qinling_tempest_plugin/tests/base.py b/qinling_tempest_plugin/tests/base.py index 5097410e..46326bc2 100644 --- a/qinling_tempest_plugin/tests/base.py +++ b/qinling_tempest_plugin/tests/base.py @@ -118,7 +118,7 @@ class BaseQinlingTest(test.BaseTestCase): self.addCleanup(os.remove, zip_file) return zip_file - def create_function(self, package_path=None, image=False, + def create_function(self, package_path=None, image=None, md5sum=None, timeout=None): function_name = data_utils.rand_name( 'function', @@ -145,7 +145,7 @@ class BaseQinlingTest(test.BaseTestCase): ) else: resp, body = self.client.create_function( - {"source": "image", "image": "openstackqinling/alpine-test"}, + {"source": "image", "image": image}, None, name=function_name, )