Merge "Function timeout support for python runtime"

This commit is contained in:
Zuul 2018-09-07 06:54:15 +00:00 committed by Gerrit Code Review
commit cd688170da
8 changed files with 77 additions and 12 deletions

View File

@ -22,7 +22,7 @@ QINLING_CONF_FILE=${QINLING_CONF_DIR}/qinling.conf
QINLING_POLICY_FILE=${QINLING_CONF_DIR}/policy.json QINLING_POLICY_FILE=${QINLING_CONF_DIR}/policy.json
QINLING_AUTH_CACHE_DIR=${QINLING_AUTH_CACHE_DIR:-/var/cache/qinling} QINLING_AUTH_CACHE_DIR=${QINLING_AUTH_CACHE_DIR:-/var/cache/qinling}
QINLING_FUNCTION_STORAGE_DIR=${QINLING_FUNCTION_STORAGE_DIR:-/opt/qinling/function/packages} QINLING_FUNCTION_STORAGE_DIR=${QINLING_FUNCTION_STORAGE_DIR:-/opt/qinling/function/packages}
QINLING_PYTHON_RUNTIME_IMAGE=${QINLING_PYTHON_RUNTIME_IMAGE:-openstackqinling/python3-runtime:0.0.1} QINLING_PYTHON_RUNTIME_IMAGE=${QINLING_PYTHON_RUNTIME_IMAGE:-openstackqinling/python3-runtime:0.0.2}
QINLING_NODEJS_RUNTIME_IMAGE=${QINLING_NODEJS_RUNTIME_IMAGE:-openstackqinling/nodejs-runtime:0.0.1} QINLING_NODEJS_RUNTIME_IMAGE=${QINLING_NODEJS_RUNTIME_IMAGE:-openstackqinling/nodejs-runtime:0.0.1}
QINLING_SIDECAR_IMAGE=${QINLING_SIDECAR_IMAGE:-openstackqinling/sidecar:0.0.2} QINLING_SIDECAR_IMAGE=${QINLING_SIDECAR_IMAGE:-openstackqinling/sidecar:0.0.2}

View File

@ -40,7 +40,7 @@ QinlingGroup = [
'publicURL', 'adminURL', 'internalURL'], 'publicURL', 'adminURL', 'internalURL'],
help="The endpoint type to use for the qinling service."), help="The endpoint type to use for the qinling service."),
cfg.StrOpt("python_runtime_image", cfg.StrOpt("python_runtime_image",
default="openstackqinling/python3-runtime:0.0.1", default="openstackqinling/python3-runtime:0.0.2",
help="The Python runtime being used in the tests."), help="The Python runtime being used in the tests."),
cfg.StrOpt("nodejs_runtime_image", cfg.StrOpt("nodejs_runtime_image",
default="openstackqinling/nodejs-runtime:0.0.1", default="openstackqinling/nodejs-runtime:0.0.1",

View File

@ -14,5 +14,5 @@
import time import time
def main(seconds=5, **kwargs): def main(seconds=4, **kwargs):
time.sleep(seconds) time.sleep(seconds)

View File

@ -339,7 +339,7 @@ class ExecutionsTest(base.BaseQinlingTest):
) )
@decorators.idempotent_id('2b5f0787-b82d-4fc4-af76-cf86d389a76b') @decorators.idempotent_id('2b5f0787-b82d-4fc4-af76-cf86d389a76b')
def test_python_execution_memory_limit_non_image(self): def test_python_execution_memory_limit(self):
"""In this case, the following steps are taken: """In this case, the following steps are taken:
1. Create a function that requires ~80M memory to run. 1. Create a function that requires ~80M memory to run.
@ -390,7 +390,7 @@ class ExecutionsTest(base.BaseQinlingTest):
self.assertEqual(4, output) self.assertEqual(4, output)
@decorators.idempotent_id('ed714f98-29fe-4e8d-b6ee-9730f92bddea') @decorators.idempotent_id('ed714f98-29fe-4e8d-b6ee-9730f92bddea')
def test_python_execution_cpu_limit_non_image(self): def test_python_execution_cpu_limit(self):
"""In this case, the following steps are taken: """In this case, the following steps are taken:
1. Create a function that takes some time to finish (calculating the 1. Create a function that takes some time to finish (calculating the
@ -408,7 +408,7 @@ class ExecutionsTest(base.BaseQinlingTest):
package = self.create_package( package = self.create_package(
name='python/test_python_cpu_limit.py' name='python/test_python_cpu_limit.py'
) )
function_id = self.create_function(package_path=package) function_id = self.create_function(package_path=package, timeout=180)
# Invoke function # Invoke function
resp, body = self.client.create_execution(function_id) resp, body = self.client.create_execution(function_id)
@ -486,3 +486,27 @@ class ExecutionsTest(base.BaseQinlingTest):
self.assertEqual('success', body['status']) self.assertEqual('success', body['status'])
result = json.loads(body['result']) result = json.loads(body['result'])
self.assertEqual(page_sha256, result['output']) self.assertEqual(page_sha256, result['output'])
@decorators.idempotent_id('b05e3bac-b23f-11e8-9679-00224d6b7bc1')
def test_python_execution_timeout(self):
package = self.create_package(
name='python/test_python_sleep.py'
)
function_id = self.create_function(package_path=package)
resp, body = self.client.create_execution(
function_id,
input='{"seconds": 7}'
)
self.assertEqual(201, resp.status)
self.addCleanup(self.client.delete_resource, 'executions',
body['id'], ignore_notfound=True)
self.assertEqual('failed', body['status'])
result = jsonutils.loads(body['result'])
self.assertGreater(result['duration'], 5)
self.assertIn(
'Function execution timeout', result['output']
)

View File

@ -9,3 +9,4 @@ python-mistralclient>=3.1.0 # Apache-2.0
keystoneauth1>=2.21.0 # Apache-2.0 keystoneauth1>=2.21.0 # Apache-2.0
openstacksdk>=0.9.19 openstacksdk>=0.9.19
oslo.concurrency>=3.25.0 # Apache-2.0 oslo.concurrency>=3.25.0 # Apache-2.0
psutil>=5.4.7 # BSD

View File

@ -27,6 +27,7 @@ from flask import request
from flask import Response from flask import Response
from keystoneauth1.identity import generic from keystoneauth1.identity import generic
from keystoneauth1 import session from keystoneauth1 import session
import psutil
import requests import requests
app = Flask(__name__) app = Flask(__name__)
@ -34,6 +35,7 @@ app = Flask(__name__)
DOWNLOAD_ERROR = "Failed to download function package from %s, error: %s" DOWNLOAD_ERROR = "Failed to download function package from %s, error: %s"
INVOKE_ERROR = "Function execution failed because of too much resource " \ INVOKE_ERROR = "Function execution failed because of too much resource " \
"consumption" "consumption"
TIMEOUT_ERROR = "Function execution timeout."
def _print_trace(): def _print_trace():
@ -73,6 +75,17 @@ def _get_responce(output, duration, logs, success, code):
) )
def _killtree(pid, including_parent=True):
parent = psutil.Process(pid)
for child in parent.children(recursive=True):
print("kill child %s" % child)
child.kill()
if including_parent:
print("kill parent %s" % parent)
parent.kill()
def _invoke_function(execution_id, zip_file_dir, module_name, method, arg, def _invoke_function(execution_id, zip_file_dir, module_name, method, arg,
input, return_dict, rlimit): input, return_dict, rlimit):
"""Thie function is supposed to be running in a child process. """Thie function is supposed to be running in a child process.
@ -146,6 +159,7 @@ def execute():
auth_url = params.get('auth_url') auth_url = params.get('auth_url')
username = params.get('username') username = params.get('username')
password = params.get('password') password = params.get('password')
timeout = params.get('timeout')
zip_file_dir = '/var/qinling/packages/%s' % function_id zip_file_dir = '/var/qinling/packages/%s' % function_id
rlimit = { rlimit = {
'cpu': params['cpu'], 'cpu': params['cpu'],
@ -206,25 +220,31 @@ def execute():
return_dict['success'] = False return_dict['success'] = False
start = time.time() start = time.time()
# Run the function in a separate process to avoid messing up the log # Run the function in a separate process to avoid messing up the log. If
# the timeout is reached, kill all the subprocesses.
p = Process( p = Process(
target=_invoke_function, target=_invoke_function,
args=(execution_id, zip_file_dir, function_module, function_method, args=(execution_id, zip_file_dir, function_module, function_method,
input.pop('__function_input', None), input, return_dict, rlimit) input.pop('__function_input', None), input, return_dict, rlimit)
) )
timed_out = False
p.start() p.start()
p.join() p.join(timeout)
if p.is_alive():
_killtree(p.pid)
timed_out = True
#################################################################### ####################################################################
# #
# Get execution output(log, duration, etc.) # Get execution result(log, duration, etc.)
# #
#################################################################### ####################################################################
duration = round(time.time() - start, 3) duration = round(time.time() - start, 3)
# Process was killed unexpectedly or finished with error. # Process was killed unexpectedly or finished with error.
if p.exitcode != 0: if p.exitcode != 0:
output = INVOKE_ERROR output = TIMEOUT_ERROR if timed_out else INVOKE_ERROR
success = False success = False
else: else:
output = return_dict.get('result') output = return_dict.get('result')

View File

@ -9,3 +9,4 @@ python-mistralclient>=3.1.0 # Apache-2.0
keystoneauth1>=2.21.0 # Apache-2.0 keystoneauth1>=2.21.0 # Apache-2.0
openstacksdk>=0.9.19 openstacksdk>=0.9.19
oslo.concurrency>=3.25.0 # Apache-2.0 oslo.concurrency>=3.25.0 # Apache-2.0
psutil>=5.4.7 # BSD

View File

@ -27,6 +27,7 @@ from flask import request
from flask import Response from flask import Response
from keystoneauth1.identity import generic from keystoneauth1.identity import generic
from keystoneauth1 import session from keystoneauth1 import session
import psutil
import requests import requests
app = Flask(__name__) app = Flask(__name__)
@ -34,6 +35,7 @@ app = Flask(__name__)
DOWNLOAD_ERROR = "Failed to download function package from %s, error: %s" DOWNLOAD_ERROR = "Failed to download function package from %s, error: %s"
INVOKE_ERROR = "Function execution failed because of too much resource " \ INVOKE_ERROR = "Function execution failed because of too much resource " \
"consumption" "consumption"
TIMEOUT_ERROR = "Function execution timeout."
def _print_trace(): def _print_trace():
@ -73,6 +75,17 @@ def _get_responce(output, duration, logs, success, code):
) )
def _killtree(pid, including_parent=True):
parent = psutil.Process(pid)
for child in parent.children(recursive=True):
print("kill child %s" % child)
child.kill()
if including_parent:
print("kill parent %s" % parent)
parent.kill()
def _invoke_function(execution_id, zip_file_dir, module_name, method, arg, def _invoke_function(execution_id, zip_file_dir, module_name, method, arg,
input, return_dict, rlimit): input, return_dict, rlimit):
"""Thie function is supposed to be running in a child process. """Thie function is supposed to be running in a child process.
@ -146,6 +159,7 @@ def execute():
auth_url = params.get('auth_url') auth_url = params.get('auth_url')
username = params.get('username') username = params.get('username')
password = params.get('password') password = params.get('password')
timeout = params.get('timeout')
zip_file_dir = '/var/qinling/packages/%s' % function_id zip_file_dir = '/var/qinling/packages/%s' % function_id
rlimit = { rlimit = {
'cpu': params['cpu'], 'cpu': params['cpu'],
@ -212,8 +226,13 @@ def execute():
args=(execution_id, zip_file_dir, function_module, function_method, args=(execution_id, zip_file_dir, function_module, function_method,
input.pop('__function_input', None), input, return_dict, rlimit) input.pop('__function_input', None), input, return_dict, rlimit)
) )
timed_out = False
p.start() p.start()
p.join() p.join(timeout)
if p.is_alive():
_killtree(p.pid)
timed_out = True
#################################################################### ####################################################################
# #
@ -224,7 +243,7 @@ def execute():
# Process was killed unexpectedly or finished with error. # Process was killed unexpectedly or finished with error.
if p.exitcode != 0: if p.exitcode != 0:
output = INVOKE_ERROR output = TIMEOUT_ERROR if timed_out else INVOKE_ERROR
success = False success = False
else: else:
output = return_dict.get('result') output = return_dict.get('result')