Add duration for image type function

For image type function execution, the response message should be
similar to package type function, which includes the duration
information. There is no output field in the result for image type
function. The output can be checked by query the execution log.

Change-Id: I02dbd53db4f8bee3696b810de1df7bb2c77c20b3
Story: 2003145
Task: 23280
This commit is contained in:
Lingxian Kong 2018-09-04 23:38:40 +12:00
parent 5f7ba972b6
commit bcc7a125ae
6 changed files with 123 additions and 87 deletions

View File

@ -182,7 +182,7 @@ class DefaultEngine(object):
is_image_source=is_image_source) is_image_source=is_image_source)
return return
if source == constants.IMAGE_FUNCTION: if is_image_source:
image = function.code['image'] image = function.code['image']
# Be consistent with k8s naming convention # Be consistent with k8s naming convention
identifier = ('%s-%s' % identifier = ('%s-%s' %
@ -211,8 +211,8 @@ class DefaultEngine(object):
utils.handle_execution_exception(execution_id, str(e)) utils.handle_execution_exception(execution_id, str(e))
return return
# For image type function, read the worker log; For package type # For image type function, wait for its completion and retrieve the
# function, invoke and get log # worker log; For package type function, invoke and get log
success, res = self.orchestrator.run_execution( success, res = self.orchestrator.run_execution(
execution_id, execution_id,
function_id, function_id,

View File

@ -130,16 +130,8 @@ def db_set_execution_status(execution_id, execution_status, logs, res):
def finish_execution(execution_id, success, res, is_image_source=False): def finish_execution(execution_id, success, res, is_image_source=False):
logs = '' logs = res.pop('logs', '')
if is_image_source: success = success and res.pop('success', True)
# If the function is created from docker image, the result is
# direct output, here we convert to a dict to fit into the db
# schema.
res = {'output': res}
else:
# Execution log is only available for non-image source execution.
logs = res.pop('logs', '')
success = success and res.pop('success')
LOG.debug( LOG.debug(
'Finished execution %s, success: %s', execution_id, success 'Finished execution %s, success: %s', execution_id, success

View File

@ -509,28 +509,40 @@ class KubernetesManager(base.OrchestratorBase):
self.conf.kubernetes.namespace self.conf.kubernetes.namespace
) )
status = pod.status.phase status = pod.status.phase
return True if status == 'Succeeded' else False
if status == 'Succeeded':
return pod
raise exc.OrchestratorException()
duration = 0
try: try:
r = tenacity.Retrying( r = tenacity.Retrying(
wait=tenacity.wait_fixed(1), wait=tenacity.wait_fixed(1),
stop=tenacity.stop_after_delay(180), stop=tenacity.stop_after_delay(180),
retry=tenacity.retry_if_result( retry=tenacity.retry_if_exception_type(
lambda result: result is False) exc.OrchestratorException)
) )
r.call(_wait_complete) pod = r.call(_wait_complete)
except Exception as e:
LOG.exception(
"Failed to get pod output, pod: %s, error: %s",
identifier, str(e)
)
return False, {'error': 'Function execution failed.'}
output = self.v1.read_namespaced_pod_log( statuses = pod.status.container_statuses
for s in statuses:
if hasattr(s.state, "terminated"):
end_time = s.state.terminated.finished_at
start_time = s.state.terminated.started_at
delta = end_time - start_time
duration = delta.seconds
break
except Exception:
LOG.exception("Failed to wait for pod %s", identifier)
return False, {'error': 'Function execution failed.',
'duration': duration}
log = self.v1.read_namespaced_pod_log(
identifier, identifier,
self.conf.kubernetes.namespace, self.conf.kubernetes.namespace,
) )
return True, output return True, {'duration': duration, 'logs': log}
def delete_function(self, function_id, version, labels=None): def delete_function(self, function_id, version, labels=None):
"""Delete related resources for function. """Delete related resources for function.

View File

@ -242,8 +242,9 @@ class TestDefaultEngine(base.DbTestCase):
self.orchestrator.prepare_execution.return_value = ( self.orchestrator.prepare_execution.return_value = (
mock.Mock(), None) mock.Mock(), None)
self.orchestrator.run_execution.side_effect = [ self.orchestrator.run_execution.side_effect = [
(True, 'success result'), (True, {'duration': 5, 'logs': 'fake log'}),
(False, 'failed result')] (False, {'duration': 0, 'error': 'Function execution failed.'})
]
# Create two executions, with different results # Create two executions, with different results
self.default_engine.create_execution( self.default_engine.create_execution(
@ -302,12 +303,15 @@ class TestDefaultEngine(base.DbTestCase):
execution_1 = db_api.get_execution(execution_1_id) execution_1 = db_api.get_execution(execution_1_id)
execution_2 = db_api.get_execution(execution_2_id) execution_2 = db_api.get_execution(execution_2_id)
self.assertEqual(execution_1.status, status.SUCCESS) self.assertEqual(status.SUCCESS, execution_1.status)
self.assertEqual(execution_1.logs, '') self.assertEqual('fake log', execution_1.logs)
self.assertEqual(execution_1.result, {'output': 'success result'}) self.assertEqual({"duration": 5}, execution_1.result)
self.assertEqual(execution_2.status, status.FAILED) self.assertEqual(status.FAILED, execution_2.status)
self.assertEqual(execution_2.logs, '') self.assertEqual('', execution_2.logs)
self.assertEqual(execution_2.result, {'output': 'failed result'}) self.assertEqual(
{'duration': 0, 'error': 'Function execution failed.'},
execution_2.result
)
@mock.patch('qinling.utils.etcd_util.get_service_url') @mock.patch('qinling.utils.etcd_util.get_service_url')
def test_create_execution_prepare_execution_exception( def test_create_execution_prepare_execution_exception(

View File

@ -12,10 +12,11 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import mock import datetime
import testtools import testtools
import yaml import yaml
import mock
from oslo_config import cfg from oslo_config import cfg
from qinling import config from qinling import config
@ -616,10 +617,62 @@ class TestKubernetesManager(base.DbTestCase):
def test_run_execution_image_type_function(self): def test_run_execution_image_type_function(self):
pod = mock.Mock() pod = mock.Mock()
status = mock.Mock()
status.state.terminated.finished_at = datetime.datetime(2018, 9, 4, 10,
1, 50)
status.state.terminated.started_at = datetime.datetime(2018, 9, 4, 10,
1, 40)
pod.status.phase = 'Succeeded' pod.status.phase = 'Succeeded'
pod.status.container_statuses = [status]
self.k8s_v1_api.read_namespaced_pod.return_value = pod self.k8s_v1_api.read_namespaced_pod.return_value = pod
fake_output = 'fake output' fake_log = 'fake log'
self.k8s_v1_api.read_namespaced_pod_log.return_value = fake_output self.k8s_v1_api.read_namespaced_pod_log.return_value = fake_log
execution_id = common.generate_unicode_uuid()
function_id = common.generate_unicode_uuid()
identifier = 'fake_identifier'
result, output = self.manager.run_execution(execution_id, function_id,
0, identifier=identifier)
self.k8s_v1_api.read_namespaced_pod.assert_called_once_with(
identifier, self.fake_namespace)
self.k8s_v1_api.read_namespaced_pod_log.assert_called_once_with(
identifier, self.fake_namespace)
self.assertTrue(result)
expected_output = {'duration': 10, 'logs': fake_log}
self.assertEqual(expected_output, output)
def test_run_execution_image_type_function_retry(self):
pod1 = mock.Mock()
pod1.status.phase = ''
pod2 = mock.Mock()
status = mock.Mock()
status.state.terminated.finished_at = datetime.datetime(2018, 9, 4, 10,
1, 50)
status.state.terminated.started_at = datetime.datetime(2018, 9, 4, 10,
1, 40)
pod2.status.phase = 'Succeeded'
pod2.status.container_statuses = [status]
self.k8s_v1_api.read_namespaced_pod.side_effect = [pod1, pod2]
fake_log = 'fake log'
self.k8s_v1_api.read_namespaced_pod_log.return_value = fake_log
execution_id = common.generate_unicode_uuid()
function_id = common.generate_unicode_uuid()
result, output = self.manager.run_execution(execution_id, function_id,
0)
self.assertEqual(2, self.k8s_v1_api.read_namespaced_pod.call_count)
self.k8s_v1_api.read_namespaced_pod_log.assert_called_once_with(
None, self.fake_namespace)
self.assertTrue(result)
expected_output = {'duration': 10, 'logs': fake_log}
self.assertEqual(expected_output, output)
def test_run_execution_image_type_function_read_pod_exception(self):
self.k8s_v1_api.read_namespaced_pod.side_effect = RuntimeError
execution_id = common.generate_unicode_uuid() execution_id = common.generate_unicode_uuid()
function_id = common.generate_unicode_uuid() function_id = common.generate_unicode_uuid()
@ -628,10 +681,14 @@ class TestKubernetesManager(base.DbTestCase):
self.k8s_v1_api.read_namespaced_pod.assert_called_once_with( self.k8s_v1_api.read_namespaced_pod.assert_called_once_with(
None, self.fake_namespace) None, self.fake_namespace)
self.k8s_v1_api.read_namespaced_pod_log.assert_called_once_with( self.k8s_v1_api.read_namespaced_pod_log.assert_not_called()
None, self.fake_namespace) self.assertFalse(result)
self.assertTrue(result)
self.assertEqual(fake_output, output) expected_output = {
'error': 'Function execution failed.',
'duration': 0
}
self.assertEqual(expected_output, output)
@mock.patch('qinling.engine.utils.url_request') @mock.patch('qinling.engine.utils.url_request')
def test_run_execution_version_0(self, mock_request): def test_run_execution_version_0(self, mock_request):
@ -662,40 +719,6 @@ class TestKubernetesManager(base.DbTestCase):
self.manager.session, 'FAKE_URL/execute', body=data self.manager.session, 'FAKE_URL/execute', body=data
) )
def test_run_execution_no_service_url_retry(self):
pod1 = mock.Mock()
pod1.status.phase = ''
pod2 = mock.Mock()
pod2.status.phase = 'Succeeded'
self.k8s_v1_api.read_namespaced_pod.side_effect = [pod1, pod2]
fake_output = 'fake output'
self.k8s_v1_api.read_namespaced_pod_log.return_value = fake_output
execution_id = common.generate_unicode_uuid()
function_id = common.generate_unicode_uuid()
result, output = self.manager.run_execution(execution_id, function_id,
0)
self.assertEqual(2, self.k8s_v1_api.read_namespaced_pod.call_count)
self.k8s_v1_api.read_namespaced_pod_log.assert_called_once_with(
None, self.fake_namespace)
self.assertTrue(result)
self.assertEqual(fake_output, output)
def test_run_execution_no_service_url_read_pod_exception(self):
self.k8s_v1_api.read_namespaced_pod.side_effect = RuntimeError
execution_id = common.generate_unicode_uuid()
function_id = common.generate_unicode_uuid()
result, output = self.manager.run_execution(execution_id, function_id,
0)
self.k8s_v1_api.read_namespaced_pod.assert_called_once_with(
None, self.fake_namespace)
self.k8s_v1_api.read_namespaced_pod_log.assert_not_called()
self.assertFalse(result)
self.assertEqual({'error': 'Function execution failed.'}, output)
def test_delete_function(self): def test_delete_function(self):
# Deleting namespaced service is also tested in this. # Deleting namespaced service is also tested in this.
svc1 = mock.Mock() svc1 = mock.Mock()

View File

@ -264,6 +264,24 @@ class ExecutionsTest(base.BaseQinlingTest):
self.assertEqual(200, resp.status) self.assertEqual(200, resp.status)
self.assertEqual(2, len(body['workers'])) self.assertEqual(2, len(body['workers']))
@decorators.idempotent_id('d0598868-e45d-11e7-9125-00224d6b7bc1')
def test_image_function_execution(self):
function_id = self.create_function(image=True)
resp, body = self.client.create_execution(function_id,
input='Qinling')
self.assertEqual(201, resp.status)
execution_id = body['id']
self.addCleanup(self.client.delete_resource, 'executions',
execution_id, ignore_notfound=True)
self.assertEqual('success', body['status'])
self.assertIn('duration', jsonutils.loads(body['result']))
resp, body = self.client.get_execution_log(execution_id)
self.assertEqual(200, resp.status)
self.assertIn('Qinling', body)
@decorators.idempotent_id('ccfe67ce-e467-11e7-916c-00224d6b7bc1') @decorators.idempotent_id('ccfe67ce-e467-11e7-916c-00224d6b7bc1')
def test_python_execution_positional_args(self): def test_python_execution_positional_args(self):
package = self.create_package( package = self.create_package(
@ -320,19 +338,6 @@ class ExecutionsTest(base.BaseQinlingTest):
'too much resource consumption', result['output'] 'too much resource consumption', result['output']
) )
@decorators.idempotent_id('d0598868-e45d-11e7-9125-00224d6b7bc1')
def test_execution_image_function(self):
function_id = self.create_function(image=True)
resp, body = self.client.create_execution(function_id,
input='Qinling')
self.assertEqual(201, resp.status)
execution_id = body['id']
self.addCleanup(self.client.delete_resource, 'executions',
execution_id, ignore_notfound=True)
self.assertEqual('success', body['status'])
self.assertIn('Qinling', jsonutils.loads(body['result'])['output'])
@decorators.idempotent_id('2b5f0787-b82d-4fc4-af76-cf86d389a76b') @decorators.idempotent_id('2b5f0787-b82d-4fc4-af76-cf86d389a76b')
def test_python_execution_memory_limit_non_image(self): def test_python_execution_memory_limit_non_image(self):
"""In this case, the following steps are taken: """In this case, the following steps are taken: