Merge "Add duration for image type function"
This commit is contained in:
commit
85b0fdd1ee
|
@ -182,7 +182,7 @@ class DefaultEngine(object):
|
||||||
is_image_source=is_image_source)
|
is_image_source=is_image_source)
|
||||||
return
|
return
|
||||||
|
|
||||||
if source == constants.IMAGE_FUNCTION:
|
if is_image_source:
|
||||||
image = function.code['image']
|
image = function.code['image']
|
||||||
# Be consistent with k8s naming convention
|
# Be consistent with k8s naming convention
|
||||||
identifier = ('%s-%s' %
|
identifier = ('%s-%s' %
|
||||||
|
@ -211,8 +211,8 @@ class DefaultEngine(object):
|
||||||
utils.handle_execution_exception(execution_id, str(e))
|
utils.handle_execution_exception(execution_id, str(e))
|
||||||
return
|
return
|
||||||
|
|
||||||
# For image type function, read the worker log; For package type
|
# For image type function, wait for its completion and retrieve the
|
||||||
# function, invoke and get log
|
# worker log; For package type function, invoke and get log
|
||||||
success, res = self.orchestrator.run_execution(
|
success, res = self.orchestrator.run_execution(
|
||||||
execution_id,
|
execution_id,
|
||||||
function_id,
|
function_id,
|
||||||
|
|
|
@ -130,16 +130,8 @@ def db_set_execution_status(execution_id, execution_status, logs, res):
|
||||||
|
|
||||||
|
|
||||||
def finish_execution(execution_id, success, res, is_image_source=False):
|
def finish_execution(execution_id, success, res, is_image_source=False):
|
||||||
logs = ''
|
logs = res.pop('logs', '')
|
||||||
if is_image_source:
|
success = success and res.pop('success', True)
|
||||||
# If the function is created from docker image, the result is
|
|
||||||
# direct output, here we convert to a dict to fit into the db
|
|
||||||
# schema.
|
|
||||||
res = {'output': res}
|
|
||||||
else:
|
|
||||||
# Execution log is only available for non-image source execution.
|
|
||||||
logs = res.pop('logs', '')
|
|
||||||
success = success and res.pop('success')
|
|
||||||
|
|
||||||
LOG.debug(
|
LOG.debug(
|
||||||
'Finished execution %s, success: %s', execution_id, success
|
'Finished execution %s, success: %s', execution_id, success
|
||||||
|
|
|
@ -509,28 +509,40 @@ class KubernetesManager(base.OrchestratorBase):
|
||||||
self.conf.kubernetes.namespace
|
self.conf.kubernetes.namespace
|
||||||
)
|
)
|
||||||
status = pod.status.phase
|
status = pod.status.phase
|
||||||
return True if status == 'Succeeded' else False
|
|
||||||
|
|
||||||
|
if status == 'Succeeded':
|
||||||
|
return pod
|
||||||
|
|
||||||
|
raise exc.OrchestratorException()
|
||||||
|
|
||||||
|
duration = 0
|
||||||
try:
|
try:
|
||||||
r = tenacity.Retrying(
|
r = tenacity.Retrying(
|
||||||
wait=tenacity.wait_fixed(1),
|
wait=tenacity.wait_fixed(1),
|
||||||
stop=tenacity.stop_after_delay(180),
|
stop=tenacity.stop_after_delay(180),
|
||||||
retry=tenacity.retry_if_result(
|
retry=tenacity.retry_if_exception_type(
|
||||||
lambda result: result is False)
|
exc.OrchestratorException)
|
||||||
)
|
)
|
||||||
r.call(_wait_complete)
|
pod = r.call(_wait_complete)
|
||||||
except Exception as e:
|
|
||||||
LOG.exception(
|
|
||||||
"Failed to get pod output, pod: %s, error: %s",
|
|
||||||
identifier, str(e)
|
|
||||||
)
|
|
||||||
return False, {'error': 'Function execution failed.'}
|
|
||||||
|
|
||||||
output = self.v1.read_namespaced_pod_log(
|
statuses = pod.status.container_statuses
|
||||||
|
for s in statuses:
|
||||||
|
if hasattr(s.state, "terminated"):
|
||||||
|
end_time = s.state.terminated.finished_at
|
||||||
|
start_time = s.state.terminated.started_at
|
||||||
|
delta = end_time - start_time
|
||||||
|
duration = delta.seconds
|
||||||
|
break
|
||||||
|
except Exception:
|
||||||
|
LOG.exception("Failed to wait for pod %s", identifier)
|
||||||
|
return False, {'error': 'Function execution failed.',
|
||||||
|
'duration': duration}
|
||||||
|
|
||||||
|
log = self.v1.read_namespaced_pod_log(
|
||||||
identifier,
|
identifier,
|
||||||
self.conf.kubernetes.namespace,
|
self.conf.kubernetes.namespace,
|
||||||
)
|
)
|
||||||
return True, output
|
return True, {'duration': duration, 'logs': log}
|
||||||
|
|
||||||
def delete_function(self, function_id, version, labels=None):
|
def delete_function(self, function_id, version, labels=None):
|
||||||
"""Delete related resources for function.
|
"""Delete related resources for function.
|
||||||
|
|
|
@ -242,8 +242,9 @@ class TestDefaultEngine(base.DbTestCase):
|
||||||
self.orchestrator.prepare_execution.return_value = (
|
self.orchestrator.prepare_execution.return_value = (
|
||||||
mock.Mock(), None)
|
mock.Mock(), None)
|
||||||
self.orchestrator.run_execution.side_effect = [
|
self.orchestrator.run_execution.side_effect = [
|
||||||
(True, 'success result'),
|
(True, {'duration': 5, 'logs': 'fake log'}),
|
||||||
(False, 'failed result')]
|
(False, {'duration': 0, 'error': 'Function execution failed.'})
|
||||||
|
]
|
||||||
|
|
||||||
# Create two executions, with different results
|
# Create two executions, with different results
|
||||||
self.default_engine.create_execution(
|
self.default_engine.create_execution(
|
||||||
|
@ -302,12 +303,15 @@ class TestDefaultEngine(base.DbTestCase):
|
||||||
execution_1 = db_api.get_execution(execution_1_id)
|
execution_1 = db_api.get_execution(execution_1_id)
|
||||||
execution_2 = db_api.get_execution(execution_2_id)
|
execution_2 = db_api.get_execution(execution_2_id)
|
||||||
|
|
||||||
self.assertEqual(execution_1.status, status.SUCCESS)
|
self.assertEqual(status.SUCCESS, execution_1.status)
|
||||||
self.assertEqual(execution_1.logs, '')
|
self.assertEqual('fake log', execution_1.logs)
|
||||||
self.assertEqual(execution_1.result, {'output': 'success result'})
|
self.assertEqual({"duration": 5}, execution_1.result)
|
||||||
self.assertEqual(execution_2.status, status.FAILED)
|
self.assertEqual(status.FAILED, execution_2.status)
|
||||||
self.assertEqual(execution_2.logs, '')
|
self.assertEqual('', execution_2.logs)
|
||||||
self.assertEqual(execution_2.result, {'output': 'failed result'})
|
self.assertEqual(
|
||||||
|
{'duration': 0, 'error': 'Function execution failed.'},
|
||||||
|
execution_2.result
|
||||||
|
)
|
||||||
|
|
||||||
@mock.patch('qinling.utils.etcd_util.get_service_url')
|
@mock.patch('qinling.utils.etcd_util.get_service_url')
|
||||||
def test_create_execution_prepare_execution_exception(
|
def test_create_execution_prepare_execution_exception(
|
||||||
|
|
|
@ -12,10 +12,11 @@
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
import mock
|
import datetime
|
||||||
import testtools
|
import testtools
|
||||||
import yaml
|
import yaml
|
||||||
|
|
||||||
|
import mock
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
|
|
||||||
from qinling import config
|
from qinling import config
|
||||||
|
@ -616,10 +617,62 @@ class TestKubernetesManager(base.DbTestCase):
|
||||||
|
|
||||||
def test_run_execution_image_type_function(self):
|
def test_run_execution_image_type_function(self):
|
||||||
pod = mock.Mock()
|
pod = mock.Mock()
|
||||||
|
status = mock.Mock()
|
||||||
|
status.state.terminated.finished_at = datetime.datetime(2018, 9, 4, 10,
|
||||||
|
1, 50)
|
||||||
|
status.state.terminated.started_at = datetime.datetime(2018, 9, 4, 10,
|
||||||
|
1, 40)
|
||||||
pod.status.phase = 'Succeeded'
|
pod.status.phase = 'Succeeded'
|
||||||
|
pod.status.container_statuses = [status]
|
||||||
self.k8s_v1_api.read_namespaced_pod.return_value = pod
|
self.k8s_v1_api.read_namespaced_pod.return_value = pod
|
||||||
fake_output = 'fake output'
|
fake_log = 'fake log'
|
||||||
self.k8s_v1_api.read_namespaced_pod_log.return_value = fake_output
|
self.k8s_v1_api.read_namespaced_pod_log.return_value = fake_log
|
||||||
|
execution_id = common.generate_unicode_uuid()
|
||||||
|
function_id = common.generate_unicode_uuid()
|
||||||
|
identifier = 'fake_identifier'
|
||||||
|
|
||||||
|
result, output = self.manager.run_execution(execution_id, function_id,
|
||||||
|
0, identifier=identifier)
|
||||||
|
|
||||||
|
self.k8s_v1_api.read_namespaced_pod.assert_called_once_with(
|
||||||
|
identifier, self.fake_namespace)
|
||||||
|
self.k8s_v1_api.read_namespaced_pod_log.assert_called_once_with(
|
||||||
|
identifier, self.fake_namespace)
|
||||||
|
self.assertTrue(result)
|
||||||
|
|
||||||
|
expected_output = {'duration': 10, 'logs': fake_log}
|
||||||
|
self.assertEqual(expected_output, output)
|
||||||
|
|
||||||
|
def test_run_execution_image_type_function_retry(self):
|
||||||
|
pod1 = mock.Mock()
|
||||||
|
pod1.status.phase = ''
|
||||||
|
pod2 = mock.Mock()
|
||||||
|
status = mock.Mock()
|
||||||
|
status.state.terminated.finished_at = datetime.datetime(2018, 9, 4, 10,
|
||||||
|
1, 50)
|
||||||
|
status.state.terminated.started_at = datetime.datetime(2018, 9, 4, 10,
|
||||||
|
1, 40)
|
||||||
|
pod2.status.phase = 'Succeeded'
|
||||||
|
pod2.status.container_statuses = [status]
|
||||||
|
self.k8s_v1_api.read_namespaced_pod.side_effect = [pod1, pod2]
|
||||||
|
fake_log = 'fake log'
|
||||||
|
self.k8s_v1_api.read_namespaced_pod_log.return_value = fake_log
|
||||||
|
execution_id = common.generate_unicode_uuid()
|
||||||
|
function_id = common.generate_unicode_uuid()
|
||||||
|
|
||||||
|
result, output = self.manager.run_execution(execution_id, function_id,
|
||||||
|
0)
|
||||||
|
|
||||||
|
self.assertEqual(2, self.k8s_v1_api.read_namespaced_pod.call_count)
|
||||||
|
self.k8s_v1_api.read_namespaced_pod_log.assert_called_once_with(
|
||||||
|
None, self.fake_namespace)
|
||||||
|
self.assertTrue(result)
|
||||||
|
|
||||||
|
expected_output = {'duration': 10, 'logs': fake_log}
|
||||||
|
self.assertEqual(expected_output, output)
|
||||||
|
|
||||||
|
def test_run_execution_image_type_function_read_pod_exception(self):
|
||||||
|
self.k8s_v1_api.read_namespaced_pod.side_effect = RuntimeError
|
||||||
execution_id = common.generate_unicode_uuid()
|
execution_id = common.generate_unicode_uuid()
|
||||||
function_id = common.generate_unicode_uuid()
|
function_id = common.generate_unicode_uuid()
|
||||||
|
|
||||||
|
@ -628,10 +681,14 @@ class TestKubernetesManager(base.DbTestCase):
|
||||||
|
|
||||||
self.k8s_v1_api.read_namespaced_pod.assert_called_once_with(
|
self.k8s_v1_api.read_namespaced_pod.assert_called_once_with(
|
||||||
None, self.fake_namespace)
|
None, self.fake_namespace)
|
||||||
self.k8s_v1_api.read_namespaced_pod_log.assert_called_once_with(
|
self.k8s_v1_api.read_namespaced_pod_log.assert_not_called()
|
||||||
None, self.fake_namespace)
|
self.assertFalse(result)
|
||||||
self.assertTrue(result)
|
|
||||||
self.assertEqual(fake_output, output)
|
expected_output = {
|
||||||
|
'error': 'Function execution failed.',
|
||||||
|
'duration': 0
|
||||||
|
}
|
||||||
|
self.assertEqual(expected_output, output)
|
||||||
|
|
||||||
@mock.patch('qinling.engine.utils.url_request')
|
@mock.patch('qinling.engine.utils.url_request')
|
||||||
def test_run_execution_version_0(self, mock_request):
|
def test_run_execution_version_0(self, mock_request):
|
||||||
|
@ -662,40 +719,6 @@ class TestKubernetesManager(base.DbTestCase):
|
||||||
self.manager.session, 'FAKE_URL/execute', body=data
|
self.manager.session, 'FAKE_URL/execute', body=data
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_run_execution_no_service_url_retry(self):
|
|
||||||
pod1 = mock.Mock()
|
|
||||||
pod1.status.phase = ''
|
|
||||||
pod2 = mock.Mock()
|
|
||||||
pod2.status.phase = 'Succeeded'
|
|
||||||
self.k8s_v1_api.read_namespaced_pod.side_effect = [pod1, pod2]
|
|
||||||
fake_output = 'fake output'
|
|
||||||
self.k8s_v1_api.read_namespaced_pod_log.return_value = fake_output
|
|
||||||
execution_id = common.generate_unicode_uuid()
|
|
||||||
function_id = common.generate_unicode_uuid()
|
|
||||||
|
|
||||||
result, output = self.manager.run_execution(execution_id, function_id,
|
|
||||||
0)
|
|
||||||
|
|
||||||
self.assertEqual(2, self.k8s_v1_api.read_namespaced_pod.call_count)
|
|
||||||
self.k8s_v1_api.read_namespaced_pod_log.assert_called_once_with(
|
|
||||||
None, self.fake_namespace)
|
|
||||||
self.assertTrue(result)
|
|
||||||
self.assertEqual(fake_output, output)
|
|
||||||
|
|
||||||
def test_run_execution_no_service_url_read_pod_exception(self):
|
|
||||||
self.k8s_v1_api.read_namespaced_pod.side_effect = RuntimeError
|
|
||||||
execution_id = common.generate_unicode_uuid()
|
|
||||||
function_id = common.generate_unicode_uuid()
|
|
||||||
|
|
||||||
result, output = self.manager.run_execution(execution_id, function_id,
|
|
||||||
0)
|
|
||||||
|
|
||||||
self.k8s_v1_api.read_namespaced_pod.assert_called_once_with(
|
|
||||||
None, self.fake_namespace)
|
|
||||||
self.k8s_v1_api.read_namespaced_pod_log.assert_not_called()
|
|
||||||
self.assertFalse(result)
|
|
||||||
self.assertEqual({'error': 'Function execution failed.'}, output)
|
|
||||||
|
|
||||||
def test_delete_function(self):
|
def test_delete_function(self):
|
||||||
# Deleting namespaced service is also tested in this.
|
# Deleting namespaced service is also tested in this.
|
||||||
svc1 = mock.Mock()
|
svc1 = mock.Mock()
|
||||||
|
|
|
@ -264,6 +264,24 @@ class ExecutionsTest(base.BaseQinlingTest):
|
||||||
self.assertEqual(200, resp.status)
|
self.assertEqual(200, resp.status)
|
||||||
self.assertEqual(2, len(body['workers']))
|
self.assertEqual(2, len(body['workers']))
|
||||||
|
|
||||||
|
@decorators.idempotent_id('d0598868-e45d-11e7-9125-00224d6b7bc1')
|
||||||
|
def test_image_function_execution(self):
|
||||||
|
function_id = self.create_function(image=True)
|
||||||
|
resp, body = self.client.create_execution(function_id,
|
||||||
|
input='Qinling')
|
||||||
|
|
||||||
|
self.assertEqual(201, resp.status)
|
||||||
|
execution_id = body['id']
|
||||||
|
self.addCleanup(self.client.delete_resource, 'executions',
|
||||||
|
execution_id, ignore_notfound=True)
|
||||||
|
|
||||||
|
self.assertEqual('success', body['status'])
|
||||||
|
self.assertIn('duration', jsonutils.loads(body['result']))
|
||||||
|
|
||||||
|
resp, body = self.client.get_execution_log(execution_id)
|
||||||
|
self.assertEqual(200, resp.status)
|
||||||
|
self.assertIn('Qinling', body)
|
||||||
|
|
||||||
@decorators.idempotent_id('ccfe67ce-e467-11e7-916c-00224d6b7bc1')
|
@decorators.idempotent_id('ccfe67ce-e467-11e7-916c-00224d6b7bc1')
|
||||||
def test_python_execution_positional_args(self):
|
def test_python_execution_positional_args(self):
|
||||||
package = self.create_package(
|
package = self.create_package(
|
||||||
|
@ -320,19 +338,6 @@ class ExecutionsTest(base.BaseQinlingTest):
|
||||||
'too much resource consumption', result['output']
|
'too much resource consumption', result['output']
|
||||||
)
|
)
|
||||||
|
|
||||||
@decorators.idempotent_id('d0598868-e45d-11e7-9125-00224d6b7bc1')
|
|
||||||
def test_execution_image_function(self):
|
|
||||||
function_id = self.create_function(image=True)
|
|
||||||
resp, body = self.client.create_execution(function_id,
|
|
||||||
input='Qinling')
|
|
||||||
|
|
||||||
self.assertEqual(201, resp.status)
|
|
||||||
execution_id = body['id']
|
|
||||||
self.addCleanup(self.client.delete_resource, 'executions',
|
|
||||||
execution_id, ignore_notfound=True)
|
|
||||||
self.assertEqual('success', body['status'])
|
|
||||||
self.assertIn('Qinling', jsonutils.loads(body['result'])['output'])
|
|
||||||
|
|
||||||
@decorators.idempotent_id('2b5f0787-b82d-4fc4-af76-cf86d389a76b')
|
@decorators.idempotent_id('2b5f0787-b82d-4fc4-af76-cf86d389a76b')
|
||||||
def test_python_execution_memory_limit_non_image(self):
|
def test_python_execution_memory_limit_non_image(self):
|
||||||
"""In this case, the following steps are taken:
|
"""In this case, the following steps are taken:
|
||||||
|
|
Loading…
Reference in New Issue