Merge "engine: set execution status upon exceptions"
This commit is contained in:
commit
8a8216836c
@ -18,6 +18,7 @@ import tenacity
|
||||
|
||||
from qinling.db import api as db_api
|
||||
from qinling.engine import utils
|
||||
from qinling import exceptions as exc
|
||||
from qinling import status
|
||||
from qinling.utils import common
|
||||
from qinling.utils import constants
|
||||
@ -123,10 +124,15 @@ class DefaultEngine(object):
|
||||
identifier = None
|
||||
labels = None
|
||||
svc_url = None
|
||||
is_image_source = source == constants.IMAGE_FUNCTION
|
||||
|
||||
# Auto scale workers if needed
|
||||
if source != constants.IMAGE_FUNCTION:
|
||||
svc_url = self.function_load_check(function_id, runtime_id)
|
||||
if not is_image_source:
|
||||
try:
|
||||
svc_url = self.function_load_check(function_id, runtime_id)
|
||||
except exc.OrchestratorException as e:
|
||||
utils.handle_execution_exception(execution_id, str(e))
|
||||
return
|
||||
|
||||
temp_url = etcd_util.get_service_url(function_id)
|
||||
svc_url = svc_url or temp_url
|
||||
@ -145,20 +151,9 @@ class DefaultEngine(object):
|
||||
success, res = utils.url_request(
|
||||
self.session, func_url, body=data
|
||||
)
|
||||
success = success and res.pop('success')
|
||||
|
||||
LOG.debug(
|
||||
'Finished execution %s, success: %s', execution_id, success
|
||||
)
|
||||
|
||||
db_api.update_execution(
|
||||
execution_id,
|
||||
{
|
||||
'status': status.SUCCESS if success else status.FAILED,
|
||||
'logs': res.pop('logs', ''),
|
||||
'result': res
|
||||
}
|
||||
)
|
||||
utils.finish_execution(
|
||||
execution_id, success, res, is_image_source=is_image_source)
|
||||
return
|
||||
|
||||
if source == constants.IMAGE_FUNCTION:
|
||||
@ -171,13 +166,18 @@ class DefaultEngine(object):
|
||||
identifier = runtime_id
|
||||
labels = {'runtime_id': runtime_id}
|
||||
|
||||
_, svc_url = self.orchestrator.prepare_execution(
|
||||
function_id,
|
||||
image=image,
|
||||
identifier=identifier,
|
||||
labels=labels,
|
||||
input=input,
|
||||
)
|
||||
try:
|
||||
_, svc_url = self.orchestrator.prepare_execution(
|
||||
function_id,
|
||||
image=image,
|
||||
identifier=identifier,
|
||||
labels=labels,
|
||||
input=input,
|
||||
)
|
||||
except exc.OrchestratorException as e:
|
||||
utils.handle_execution_exception(execution_id, str(e))
|
||||
return
|
||||
|
||||
success, res = self.orchestrator.run_execution(
|
||||
execution_id,
|
||||
function_id,
|
||||
@ -188,29 +188,8 @@ class DefaultEngine(object):
|
||||
trust_id=function.trust_id
|
||||
)
|
||||
|
||||
logs = ''
|
||||
# Execution log is only available for non-image source execution.
|
||||
if svc_url:
|
||||
logs = res.pop('logs', '')
|
||||
success = success and res.pop('success')
|
||||
else:
|
||||
# If the function is created from docker image, the result is
|
||||
# direct output, here we convert to a dict to fit into the db
|
||||
# schema.
|
||||
res = {'output': res}
|
||||
|
||||
LOG.debug(
|
||||
'Finished execution %s, success: %s', execution_id, success
|
||||
)
|
||||
|
||||
db_api.update_execution(
|
||||
execution_id,
|
||||
{
|
||||
'status': status.SUCCESS if success else status.FAILED,
|
||||
'logs': logs,
|
||||
'result': res
|
||||
}
|
||||
)
|
||||
utils.finish_execution(
|
||||
execution_id, success, res, is_image_source=is_image_source)
|
||||
|
||||
def delete_function(self, ctx, function_id):
|
||||
"""Deletes underlying resources allocated for function."""
|
||||
|
@ -19,6 +19,8 @@ import six
|
||||
import tenacity
|
||||
|
||||
from qinling import context
|
||||
from qinling.db import api as db_api
|
||||
from qinling import status
|
||||
from qinling.utils import constants
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -99,3 +101,42 @@ def get_request_data(conf, function_id, execution_id, input, entry, trust_id,
|
||||
)
|
||||
|
||||
return data
|
||||
|
||||
|
||||
def db_set_execution_status(execution_id, execution_status, logs, res):
|
||||
db_api.update_execution(
|
||||
execution_id,
|
||||
{
|
||||
'status': execution_status,
|
||||
'logs': logs,
|
||||
'result': res
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def finish_execution(execution_id, success, res, is_image_source=False):
|
||||
logs = ''
|
||||
if is_image_source:
|
||||
# If the function is created from docker image, the result is
|
||||
# direct output, here we convert to a dict to fit into the db
|
||||
# schema.
|
||||
res = {'output': res}
|
||||
else:
|
||||
# Execution log is only available for non-image source execution.
|
||||
logs = res.pop('logs', '')
|
||||
success = success and res.pop('success')
|
||||
|
||||
LOG.debug(
|
||||
'Finished execution %s, success: %s', execution_id, success
|
||||
)
|
||||
db_set_execution_status(
|
||||
execution_id, status.SUCCESS if success else status.FAILED,
|
||||
logs, res
|
||||
)
|
||||
|
||||
|
||||
def handle_execution_exception(execution_id, exc_str):
|
||||
LOG.error(
|
||||
'Error running execution %s: %s', execution_id, exc_str
|
||||
)
|
||||
db_set_execution_status(execution_id, status.ERROR, '', {})
|
||||
|
@ -14,6 +14,8 @@
|
||||
|
||||
import mock
|
||||
|
||||
from qinling import exceptions as exc
|
||||
from qinling import status
|
||||
from qinling.tests.unit.api import base
|
||||
|
||||
TEST_CASE_NAME = 'TestExecutionController'
|
||||
@ -39,6 +41,17 @@ class TestExecutionController(base.APITest):
|
||||
|
||||
self.assertEqual(1, resp.json.get('count'))
|
||||
|
||||
@mock.patch('qinling.rpc.EngineClient.create_execution')
|
||||
def test_create_rpc_error(self, mock_create_execution):
|
||||
mock_create_execution.side_effect = exc.QinlingException
|
||||
body = {
|
||||
'function_id': self.func_id,
|
||||
}
|
||||
resp = self.app.post_json('/v1/executions', body)
|
||||
|
||||
self.assertEqual(201, resp.status_int)
|
||||
self.assertEqual(status.ERROR, resp.json.get('status'))
|
||||
|
||||
@mock.patch('qinling.rpc.EngineClient.create_execution')
|
||||
def test_get(self, mock_create_execution):
|
||||
body = {
|
||||
|
@ -297,6 +297,42 @@ class TestDefaultEngine(base.DbTestCase):
|
||||
self.assertEqual(execution_2.logs, '')
|
||||
self.assertEqual(execution_2.result, {'output': 'failed result'})
|
||||
|
||||
@mock.patch('qinling.utils.etcd_util.get_service_url')
|
||||
def test_create_execution_prepare_execution_exception(
|
||||
self,
|
||||
etcd_util_get_service_url_mock
|
||||
):
|
||||
function = self.create_function(prefix='TestDefaultEngine')
|
||||
function_id = function.id
|
||||
runtime_id = function.runtime_id
|
||||
db_api.update_function(
|
||||
function_id,
|
||||
{
|
||||
'code': {
|
||||
'source': constants.IMAGE_FUNCTION,
|
||||
'image': self.rand_name('image',
|
||||
prefix='TestDefaultEngine')
|
||||
}
|
||||
}
|
||||
)
|
||||
function = db_api.get_function(function_id)
|
||||
execution = self.create_execution(
|
||||
function_id=function_id, prefix='TestDefaultEngine')
|
||||
execution_id = execution.id
|
||||
prepare_execution = self.orchestrator.prepare_execution
|
||||
prepare_execution.side_effect = exc.OrchestratorException(
|
||||
'Exception in prepare_execution'
|
||||
)
|
||||
etcd_util_get_service_url_mock.return_value = None
|
||||
|
||||
self.default_engine.create_execution(
|
||||
mock.Mock(), execution_id, function_id, runtime_id)
|
||||
|
||||
execution = db_api.get_execution(execution_id)
|
||||
self.assertEqual(execution.status, status.ERROR)
|
||||
self.assertEqual(execution.logs, '')
|
||||
self.assertEqual(execution.result, {})
|
||||
|
||||
@mock.patch('qinling.utils.etcd_util.get_service_url')
|
||||
def test_create_execution_not_image_source(
|
||||
self,
|
||||
@ -335,6 +371,27 @@ class TestDefaultEngine(base.DbTestCase):
|
||||
self.assertEqual(execution.logs, 'execution log')
|
||||
self.assertEqual(execution.result, {'output': 'success output'})
|
||||
|
||||
def test_create_execution_not_image_source_scaleup_exception(self):
|
||||
function = self.create_function(prefix='TestDefaultEngine')
|
||||
function_id = function.id
|
||||
runtime_id = function.runtime_id
|
||||
execution = self.create_execution(
|
||||
function_id=function_id, prefix='TestDefaultEngine')
|
||||
execution_id = execution.id
|
||||
self.default_engine.function_load_check = mock.Mock(
|
||||
side_effect=exc.OrchestratorException(
|
||||
'Exception in scaleup_function'
|
||||
)
|
||||
)
|
||||
|
||||
self.default_engine.create_execution(
|
||||
mock.Mock(), execution_id, function_id, runtime_id)
|
||||
|
||||
execution = db_api.get_execution(execution_id)
|
||||
self.assertEqual(execution.status, status.ERROR)
|
||||
self.assertEqual(execution.logs, '')
|
||||
self.assertEqual(execution.result, {})
|
||||
|
||||
@mock.patch('qinling.engine.utils.get_request_data')
|
||||
@mock.patch('qinling.engine.utils.url_request')
|
||||
@mock.patch('qinling.utils.etcd_util.get_service_url')
|
||||
|
@ -75,10 +75,22 @@ def create_execution(engine_client, params):
|
||||
params.update({'status': status.RUNNING})
|
||||
db_model = db_api.create_execution(params)
|
||||
|
||||
engine_client.create_execution(
|
||||
db_model.id, function_id, runtime_id,
|
||||
input=params.get('input'), is_sync=is_sync
|
||||
)
|
||||
try:
|
||||
engine_client.create_execution(
|
||||
db_model.id, function_id, runtime_id,
|
||||
input=params.get('input'), is_sync=is_sync
|
||||
)
|
||||
except exc.QinlingException:
|
||||
# Catch RPC errors for executions:
|
||||
# - for RemoteError in an RPC call, the execution status would be
|
||||
# handled in the engine side;
|
||||
# - for other exceptions in an RPC call or cast, the execution status
|
||||
# would remain RUNNING so we should update it.
|
||||
db_model = db_api.get_execution(db_model.id)
|
||||
if db_model.status == status.RUNNING:
|
||||
db_model = db_api.update_execution(db_model.id,
|
||||
{'status': status.ERROR})
|
||||
return db_model
|
||||
|
||||
if is_sync:
|
||||
# The execution should already be updated by engine service for sync
|
||||
|
Loading…
x
Reference in New Issue
Block a user