Support Image type function timeout

Story: 2002174
Task: 26342

Change-Id: Id8c141ea46d3e4ee7a35c99a04f1e40583f6e3dc
This commit is contained in:
Lingxian Kong 2018-09-11 12:03:50 +12:00
parent 16aeaf899f
commit 83a0e27d59
10 changed files with 156 additions and 49 deletions

View File

@ -20,7 +20,6 @@ from qinling.db import api as db_api
from qinling.engine import utils from qinling.engine import utils
from qinling import exceptions as exc from qinling import exceptions as exc
from qinling import status from qinling import status
from qinling.utils import common
from qinling.utils import constants from qinling.utils import constants
from qinling.utils import etcd_util from qinling.utils import etcd_util
@ -184,19 +183,15 @@ class DefaultEngine(object):
if is_image_source: if is_image_source:
image = function.code['image'] image = function.code['image']
# Be consistent with k8s naming convention identifier = ('%s-%s' % (execution_id, function_id))[:63]
identifier = ('%s-%s' %
(common.generate_unicode_uuid(dashed=False),
function_id)
)[:63]
else: else:
identifier = runtime_id identifier = runtime_id
labels = {'runtime_id': runtime_id} labels = {'runtime_id': runtime_id}
try: try:
# For image function, it will be executed inside this method; for # For image function, it will be executed inside this method;
# package type function it only sets up underlying resources and # For package type function it only sets up underlying resources
# get a service url. If the service url is already created # and get a service url. If the service url is already created
# beforehand, nothing happens. # beforehand, nothing happens.
_, svc_url = self.orchestrator.prepare_execution( _, svc_url = self.orchestrator.prepare_execution(
function_id, function_id,
@ -212,7 +207,8 @@ class DefaultEngine(object):
return return
# For image type function, wait for its completion and retrieve the # For image type function, wait for its completion and retrieve the
# worker log; For package type function, invoke and get log # worker log;
# For package type function, invoke and get log
success, res = self.orchestrator.run_execution( success, res = self.orchestrator.run_execution(
execution_id, execution_id,
function_id, function_id,
@ -222,7 +218,8 @@ class DefaultEngine(object):
identifier=identifier, identifier=identifier,
service_url=svc_url, service_url=svc_url,
entry=function.entry, entry=function.entry,
trust_id=function.trust_id trust_id=function.trust_id,
timeout=function.timeout
) )
utils.finish_execution(execution_id, success, res, utils.finish_execution(execution_id, success, res,

View File

@ -46,7 +46,7 @@ def url_request(request_session, url, body=None):
LOG.exception( LOG.exception(
"Failed to request url %s, error: %s", ping_url, str(e) "Failed to request url %s, error: %s", ping_url, str(e)
) )
return False, {'error': 'Function execution failed.'} return False, {'output': 'Function execution failed.'}
for a in six.moves.xrange(10): for a in six.moves.xrange(10):
res = None res = None
@ -67,12 +67,12 @@ def url_request(request_session, url, body=None):
LOG.error("Response status: %s, content: %s", LOG.error("Response status: %s, content: %s",
res.status_code, res.content) res.status_code, res.content)
return False, {'error': 'Function execution timeout.'} return False, {'output': 'Function execution timeout.'}
LOG.exception("Could not connect to function service. Reason: %s", LOG.exception("Could not connect to function service. Reason: %s",
exception) exception)
return False, {'error': 'Internal service error.'} return False, {'output': 'Internal service error.'}
def get_request_data(conf, function_id, version, execution_id, rlimit, input, def get_request_data(conf, function_id, version, execution_id, rlimit, input,
@ -149,5 +149,7 @@ def handle_execution_exception(execution_id, exc_str):
'Error running execution %s: %s', execution_id, exc_str 'Error running execution %s: %s', execution_id, exc_str
) )
db_set_execution_status( db_set_execution_status(
execution_id, status.ERROR, '', {'error': 'Function execution failed.'} execution_id, status.ERROR,
'',
{'output': 'Function execution failed.'}
) )

View File

@ -121,3 +121,8 @@ class SwiftException(QinlingException):
class EtcdLockException(QinlingException): class EtcdLockException(QinlingException):
http_code = 409 http_code = 409
message = 'Etcd lock failed' message = 'Etcd lock failed'
class TimeoutException(QinlingException):
http_code = 500
message = 'Function execution timeout'

View File

@ -414,10 +414,14 @@ class KubernetesManager(base.OrchestratorBase):
"Creating pod %s for image function:\n%s", pod_name, pod_body "Creating pod %s for image function:\n%s", pod_name, pod_body
) )
self.v1.create_namespaced_pod( try:
self.conf.kubernetes.namespace, self.v1.create_namespaced_pod(
body=yaml.safe_load(pod_body), self.conf.kubernetes.namespace,
) body=yaml.safe_load(pod_body),
)
except Exception:
LOG.exception("Failed to create pod.")
raise exc.OrchestratorException('Execution preparation failed.')
def _update_pod_label(self, pod, new_label): def _update_pod_label(self, pod, new_label):
name = pod.metadata.name name = pod.metadata.name
@ -463,6 +467,7 @@ class KubernetesManager(base.OrchestratorBase):
) )
self._create_pod(image, rlimit, identifier, labels, input) self._create_pod(image, rlimit, identifier, labels, input)
return identifier, None return identifier, None
else: else:
pods = self._choose_available_pods(labels, function_id=function_id, pods = self._choose_available_pods(labels, function_id=function_id,
@ -513,15 +518,16 @@ class KubernetesManager(base.OrchestratorBase):
if status == 'Succeeded': if status == 'Succeeded':
return pod return pod
raise exc.OrchestratorException() raise exc.TimeoutException()
duration = 0 duration = 0
try: try:
r = tenacity.Retrying( r = tenacity.Retrying(
wait=tenacity.wait_fixed(1), wait=tenacity.wait_fixed(1),
stop=tenacity.stop_after_delay(180), stop=tenacity.stop_after_delay(timeout),
retry=tenacity.retry_if_exception_type( retry=tenacity.retry_if_exception_type(
exc.OrchestratorException) exc.TimeoutException),
reraise=True
) )
pod = r.call(_wait_complete) pod = r.call(_wait_complete)
@ -533,15 +539,31 @@ class KubernetesManager(base.OrchestratorBase):
delta = end_time - start_time delta = end_time - start_time
duration = delta.seconds duration = delta.seconds
break break
except exc.TimeoutException:
LOG.exception(
"Timeout for function execution %s, pod %s",
execution_id, identifier
)
self.v1.delete_namespaced_pod(
identifier,
self.conf.kubernetes.namespace,
{}
)
LOG.debug('Pod %s deleted.', identifier)
return False, {'output': 'Function execution timeout.',
'duration': timeout}
except Exception: except Exception:
LOG.exception("Failed to wait for pod %s", identifier) LOG.exception("Failed to wait for pod %s", identifier)
return False, {'error': 'Function execution failed.', return False, {'output': 'Function execution failed.',
'duration': duration} 'duration': duration}
log = self.v1.read_namespaced_pod_log( log = self.v1.read_namespaced_pod_log(
identifier, identifier,
self.conf.kubernetes.namespace, self.conf.kubernetes.namespace,
) )
return True, {'duration': duration, 'logs': log} return True, {'duration': duration, 'logs': log}
def delete_function(self, function_id, version, labels=None): def delete_function(self, function_id, version, labels=None):

View File

@ -18,7 +18,7 @@ spec:
{% if input %} {% if input %}
args: args:
{% for item in input %} {% for item in input %}
- {{ item }} - "{{ item | safe }}"
{% endfor %} {% endfor %}
{% endif %} {% endif %}
restartPolicy: Never restartPolicy: Never

View File

@ -243,7 +243,7 @@ class TestDefaultEngine(base.DbTestCase):
mock.Mock(), None) mock.Mock(), None)
self.orchestrator.run_execution.side_effect = [ self.orchestrator.run_execution.side_effect = [
(True, {'duration': 5, 'logs': 'fake log'}), (True, {'duration': 5, 'logs': 'fake log'}),
(False, {'duration': 0, 'error': 'Function execution failed.'}) (False, {'duration': 0, 'output': 'Function execution failed.'})
] ]
# Create two executions, with different results # Create two executions, with different results
@ -287,7 +287,8 @@ class TestDefaultEngine(base.DbTestCase):
identifier=mock.ANY, identifier=mock.ANY,
service_url=None, service_url=None,
entry=function.entry, entry=function.entry,
trust_id=function.trust_id), trust_id=function.trust_id,
timeout=function.timeout),
mock.call(execution_2_id, mock.call(execution_2_id,
function_id, function_id,
0, 0,
@ -296,7 +297,8 @@ class TestDefaultEngine(base.DbTestCase):
identifier=mock.ANY, identifier=mock.ANY,
service_url=None, service_url=None,
entry=function.entry, entry=function.entry,
trust_id=function.trust_id) trust_id=function.trust_id,
timeout=function.timeout)
] ]
self.orchestrator.run_execution.assert_has_calls(run_calls) self.orchestrator.run_execution.assert_has_calls(run_calls)
@ -309,7 +311,7 @@ class TestDefaultEngine(base.DbTestCase):
self.assertEqual(status.FAILED, execution_2.status) self.assertEqual(status.FAILED, execution_2.status)
self.assertEqual('', execution_2.logs) self.assertEqual('', execution_2.logs)
self.assertEqual( self.assertEqual(
{'duration': 0, 'error': 'Function execution failed.'}, {'duration': 0, 'output': 'Function execution failed.'},
execution_2.result execution_2.result
) )
@ -348,10 +350,11 @@ class TestDefaultEngine(base.DbTestCase):
mock.Mock(), execution_id, function_id, 0, runtime_id) mock.Mock(), execution_id, function_id, 0, runtime_id)
execution = db_api.get_execution(execution_id) execution = db_api.get_execution(execution_id)
self.assertEqual(execution.status, status.ERROR)
self.assertEqual(execution.logs, '') self.assertEqual(status.ERROR, execution.status)
self.assertEqual(execution.result, self.assertEqual('', execution.logs)
{'error': 'Function execution failed.'}) self.assertEqual({'output': 'Function execution failed.'},
execution.result)
@mock.patch('qinling.utils.etcd_util.get_service_url') @mock.patch('qinling.utils.etcd_util.get_service_url')
def test_create_execution_package_type_function( def test_create_execution_package_type_function(
@ -385,7 +388,7 @@ class TestDefaultEngine(base.DbTestCase):
self.orchestrator.run_execution.assert_called_once_with( self.orchestrator.run_execution.assert_called_once_with(
execution_id, function_id, 0, rlimit=self.rlimit, input=None, execution_id, function_id, 0, rlimit=self.rlimit, input=None,
identifier=runtime_id, service_url='svc_url', entry=function.entry, identifier=runtime_id, service_url='svc_url', entry=function.entry,
trust_id=function.trust_id) trust_id=function.trust_id, timeout=function.timeout)
execution = db_api.get_execution(execution_id) execution = db_api.get_execution(execution_id)
@ -410,10 +413,10 @@ class TestDefaultEngine(base.DbTestCase):
execution = db_api.get_execution(execution_id) execution = db_api.get_execution(execution_id)
self.assertEqual(execution.status, status.ERROR) self.assertEqual(status.ERROR, execution.status)
self.assertEqual(execution.logs, '') self.assertEqual('', execution.logs)
self.assertEqual(execution.result, self.assertEqual({'output': 'Function execution failed.'},
{'error': 'Function execution failed.'}) execution.result)
@mock.patch('qinling.engine.utils.get_request_data') @mock.patch('qinling.engine.utils.get_request_data')
@mock.patch('qinling.engine.utils.url_request') @mock.patch('qinling.engine.utils.url_request')

View File

@ -500,6 +500,24 @@ class TestKubernetesManager(base.DbTestCase):
self.k8s_v1_api.create_namespaced_pod.assert_called_once_with( self.k8s_v1_api.create_namespaced_pod.assert_called_once_with(
self.fake_namespace, body=yaml.safe_load(pod_body)) self.fake_namespace, body=yaml.safe_load(pod_body))
def test_prepare_execution_with_image_pod_failed(self):
function_id = common.generate_unicode_uuid()
image = self.rand_name('image', prefix=self.prefix)
identifier = (
'%s-%s' % (common.generate_unicode_uuid(dashed=True), function_id)
)[:63]
self.k8s_v1_api.create_namespaced_pod.side_effect = RuntimeError
self.assertRaises(
exc.OrchestratorException,
self.manager.prepare_execution,
function_id,
0,
rlimit=self.rlimit,
image=image,
identifier=identifier,
)
def test_prepare_execution_not_image_no_worker_available(self): def test_prepare_execution_not_image_no_worker_available(self):
ret_pods = mock.Mock() ret_pods = mock.Mock()
ret_pods.items = [] ret_pods.items = []
@ -661,7 +679,7 @@ class TestKubernetesManager(base.DbTestCase):
function_id = common.generate_unicode_uuid() function_id = common.generate_unicode_uuid()
result, output = self.manager.run_execution(execution_id, function_id, result, output = self.manager.run_execution(execution_id, function_id,
0) 0, timeout=5)
self.assertEqual(2, self.k8s_v1_api.read_namespaced_pod.call_count) self.assertEqual(2, self.k8s_v1_api.read_namespaced_pod.call_count)
self.k8s_v1_api.read_namespaced_pod_log.assert_called_once_with( self.k8s_v1_api.read_namespaced_pod_log.assert_called_once_with(
@ -671,13 +689,34 @@ class TestKubernetesManager(base.DbTestCase):
expected_output = {'duration': 10, 'logs': fake_log} expected_output = {'duration': 10, 'logs': fake_log}
self.assertEqual(expected_output, output) self.assertEqual(expected_output, output)
def test_run_execution_image_type_function_timeout(self):
execution_id = common.generate_unicode_uuid()
function_id = common.generate_unicode_uuid()
pod1 = mock.Mock()
pod1.status.phase = ''
self.k8s_v1_api.read_namespaced_pod.return_value = pod1
result, output = self.manager.run_execution(
execution_id, function_id, 0,
identifier='fake_identifier',
timeout=1
)
self.assertFalse(result)
expected_output = {
'output': 'Function execution timeout.',
'duration': 1
}
self.assertEqual(expected_output, output)
def test_run_execution_image_type_function_read_pod_exception(self): def test_run_execution_image_type_function_read_pod_exception(self):
self.k8s_v1_api.read_namespaced_pod.side_effect = RuntimeError self.k8s_v1_api.read_namespaced_pod.side_effect = RuntimeError
execution_id = common.generate_unicode_uuid() execution_id = common.generate_unicode_uuid()
function_id = common.generate_unicode_uuid() function_id = common.generate_unicode_uuid()
result, output = self.manager.run_execution(execution_id, function_id, result, output = self.manager.run_execution(execution_id, function_id,
0) 0, timeout=5)
self.k8s_v1_api.read_namespaced_pod.assert_called_once_with( self.k8s_v1_api.read_namespaced_pod.assert_called_once_with(
None, self.fake_namespace) None, self.fake_namespace)
@ -685,7 +724,7 @@ class TestKubernetesManager(base.DbTestCase):
self.assertFalse(result) self.assertFalse(result)
expected_output = { expected_output = {
'error': 'Function execution failed.', 'output': 'Function execution failed.',
'duration': 0 'duration': 0
} }
self.assertEqual(expected_output, output) self.assertEqual(expected_output, output)

View File

@ -11,9 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import json
from oslo_log import log as logging from oslo_log import log as logging
from oslo_serialization import jsonutils
from qinling.db import api as db_api from qinling.db import api as db_api
from qinling.db.sqlalchemy import models from qinling.db.sqlalchemy import models
@ -112,7 +111,12 @@ def create_execution(engine_client, params):
# input in params should be a string. # input in params should be a string.
if input: if input:
try: try:
params['input'] = json.loads(input) function_input = jsonutils.loads(input)
# If input is e.g. '6', result of jsonutils.loads is 6 which can
# not be stored in db.
if type(function_input) == int:
raise ValueError
params['input'] = function_input
except ValueError: except ValueError:
params['input'] = {'__function_input': input} params['input'] = {'__function_input': input}

View File

@ -91,7 +91,7 @@ class ExecutionsTest(base.BaseQinlingTest):
self.assertEqual(201, resp.status) self.assertEqual(201, resp.status)
self.assertEqual('error', body['status']) self.assertEqual('error', body['status'])
result = jsonutils.loads(body['result']) result = jsonutils.loads(body['result'])
self.assertEqual('Function execution failed.', result['error']) self.assertEqual('Function execution failed.', result['output'])
@decorators.idempotent_id('2199d1e6-de7d-4345-8745-a8184d6022b1') @decorators.idempotent_id('2199d1e6-de7d-4345-8745-a8184d6022b1')
def test_get_all_admin(self): def test_get_all_admin(self):
@ -266,7 +266,8 @@ class ExecutionsTest(base.BaseQinlingTest):
@decorators.idempotent_id('d0598868-e45d-11e7-9125-00224d6b7bc1') @decorators.idempotent_id('d0598868-e45d-11e7-9125-00224d6b7bc1')
def test_image_function_execution(self): def test_image_function_execution(self):
function_id = self.create_function(image=True) function_id = self.create_function(
image="openstackqinling/alpine-test")
resp, body = self.client.create_execution(function_id, resp, body = self.client.create_execution(function_id,
input='Qinling') input='Qinling')
@ -282,6 +283,42 @@ class ExecutionsTest(base.BaseQinlingTest):
self.assertEqual(200, resp.status) self.assertEqual(200, resp.status)
self.assertIn('Qinling', body) self.assertIn('Qinling', body)
@decorators.idempotent_id('ab962144-d5b1-11e8-978f-026f8338c1e5')
def test_image_function_execution_timeout(self):
function_id = self.create_function(image="lingxiankong/sleep")
resp, body = self.client.create_execution(function_id,
input='6')
self.assertEqual(201, resp.status)
self.addCleanup(self.client.delete_resource, 'executions',
body['id'], ignore_notfound=True)
self.assertEqual('failed', body['status'])
result = jsonutils.loads(body['result'])
self.assertGreaterEqual(result['duration'], 5)
self.assertIn(
'Function execution timeout', result['output']
)
# Update function timeout
resp, _ = self.client.update_function(
function_id,
timeout=10
)
self.assertEqual(200, resp.status_code)
resp, body = self.client.create_execution(function_id,
input='6')
self.assertEqual(201, resp.status)
self.addCleanup(self.client.delete_resource, 'executions',
body['id'], ignore_notfound=True)
self.assertEqual('success', body['status'])
result = jsonutils.loads(body['result'])
self.assertGreaterEqual(result['duration'], 6)
@decorators.idempotent_id('ccfe67ce-e467-11e7-916c-00224d6b7bc1') @decorators.idempotent_id('ccfe67ce-e467-11e7-916c-00224d6b7bc1')
def test_python_execution_positional_args(self): def test_python_execution_positional_args(self):
package = self.create_package( package = self.create_package(
@ -313,7 +350,6 @@ class ExecutionsTest(base.BaseQinlingTest):
self.assertEqual('failed', body['status']) self.assertEqual('failed', body['status'])
result = jsonutils.loads(body['result']) result = jsonutils.loads(body['result'])
self.assertNotIn('error', result)
self.assertIn( self.assertIn(
'Too many open files', result['output'] 'Too many open files', result['output']
) )
@ -333,7 +369,6 @@ class ExecutionsTest(base.BaseQinlingTest):
self.assertEqual('failed', body['status']) self.assertEqual('failed', body['status'])
result = jsonutils.loads(body['result']) result = jsonutils.loads(body['result'])
self.assertNotIn('error', result)
self.assertIn( self.assertIn(
'too much resource consumption', result['output'] 'too much resource consumption', result['output']
) )

View File

@ -118,7 +118,7 @@ class BaseQinlingTest(test.BaseTestCase):
self.addCleanup(os.remove, zip_file) self.addCleanup(os.remove, zip_file)
return zip_file return zip_file
def create_function(self, package_path=None, image=False, def create_function(self, package_path=None, image=None,
md5sum=None, timeout=None): md5sum=None, timeout=None):
function_name = data_utils.rand_name( function_name = data_utils.rand_name(
'function', 'function',
@ -145,7 +145,7 @@ class BaseQinlingTest(test.BaseTestCase):
) )
else: else:
resp, body = self.client.create_function( resp, body = self.client.create_function(
{"source": "image", "image": "openstackqinling/alpine-test"}, {"source": "image", "image": image},
None, None,
name=function_name, name=function_name,
) )