From f2189a04680ead95aa267edbdad0e641376380c8 Mon Sep 17 00:00:00 2001
From: Lingxian Kong
Date: Tue, 29 Aug 2017 11:36:14 +1200
Subject: [PATCH] Fix logs query for executions

With function autoscaling support in place, qinling-engine can no
longer fetch the execution log itself, because it doesn't know which
pod it is talking to. So, it's necessary for the runtime to return
execution logs to qinling-engine along with the execution result.

The qinling client is not affected.

Change-Id: I96dfd00cc83d8b8a5e8c601ee3800b1ef1a45b1b
---
 qinling/db/api.py                          |  8 +++
 qinling/db/sqlalchemy/api.py               | 17 +++++
 qinling/engine/default_engine.py           | 28 ++++----
 qinling/orchestrator/base.py               |  4 --
 qinling/orchestrator/kubernetes/manager.py | 15 ----
 qinling/services/periodics.py              | 25 ++++---
 qinling/status.py                          |  1 +
 runtimes/python2/server.py                 | 84 ++++++++++++++--------
 8 files changed, 108 insertions(+), 74 deletions(-)

diff --git a/qinling/db/api.py b/qinling/db/api.py
index 614c1305..f55cf828 100644
--- a/qinling/db/api.py
+++ b/qinling/db/api.py
@@ -166,6 +166,14 @@
 def create_function_worker(values):
     return IMPL.create_function_worker(values)
 
 
+def delete_function_workers(function_id):
+    return IMPL.delete_function_workers(function_id)
+
+
+def get_function_workers(function_id):
+    return IMPL.get_function_workers(function_id)
+
+
 def create_job(values):
     return IMPL.create_job(values)
diff --git a/qinling/db/sqlalchemy/api.py b/qinling/db/sqlalchemy/api.py
index 9527ad96..3f687c8f 100644
--- a/qinling/db/sqlalchemy/api.py
+++ b/qinling/db/sqlalchemy/api.py
@@ -410,6 +410,23 @@ def create_function_worker(values, session=None):
     return mapping
 
 
+@db_base.session_aware()
+def get_function_workers(function_id, session=None):
+    workers = db_base.model_query(
+        models.FunctionWorkers
+    ).filter_by(function_id=function_id).all()
+
+    return workers
+
+
+@db_base.session_aware()
+def delete_function_workers(function_id, session=None):
+    workers = get_function_workers(function_id)
+
+    for worker in workers:
+        session.delete(worker)
+
+
 @db_base.session_aware()
 def create_job(values, session=None):
     job = models.Job()
diff --git a/qinling/engine/default_engine.py b/qinling/engine/default_engine.py
index 01f07828..7c91f436 100644
--- a/qinling/engine/default_engine.py
+++ b/qinling/engine/default_engine.py
@@ -89,6 +89,7 @@ class DefaultEngine(object):
             execution_id, function_id, runtime_id, input
         )
 
+        # FIXME(kong): Make the transaction range smaller.
         with db_api.transaction():
             execution = db_api.get_execution(execution_id)
             function = db_api.get_function(function_id)
@@ -102,17 +103,14 @@ class DefaultEngine(object):
                 data = {'input': input, 'execution_id': execution_id}
                 r = requests.post(func_url, json=data)
-
-                # logs = self.orchestrator.get_execution_log(
-                #     execution_id,
-                #     worker_name=function.service.worker_name,
-                # )
+                res = r.json()
 
                 LOG.debug('Finished execution %s', execution_id)
 
-                execution.status = status.SUCCESS
-                execution.output = r.json()
-                # execution.logs = logs
+                success = res.pop('success')
+                execution.status = status.SUCCESS if success else status.FAILED
+                execution.logs = res.pop('logs', '')
+                execution.output = res
                 return
 
             source = function.code['source']
@@ -150,10 +148,14 @@
         logs = ''
         # Execution log is only available for non-image source execution.
         if service_url:
-            logs = self.orchestrator.get_execution_log(
-                execution_id,
-                worker_name=worker_name,
-            )
+            logs = output.pop('logs', '')
+            success = output.pop('success')
+        else:
+            # If the function is created from a docker image, the output is
+            # the raw output; here we convert it to a dict to fit into the
+            # db schema.
+            output = {'output': output}
+            success = True
 
         LOG.debug(
             'Finished execution. execution_id=%s, output=%s',
@@ -162,7 +164,7 @@
         )
         execution.output = output
         execution.logs = logs
-        execution.status = status.SUCCESS
+        execution.status = status.SUCCESS if success else status.FAILED
 
         # No service is created in orchestrator for single container.
         if not image:
diff --git a/qinling/orchestrator/base.py b/qinling/orchestrator/base.py
index f6c1e77b..7829332e 100644
--- a/qinling/orchestrator/base.py
+++ b/qinling/orchestrator/base.py
@@ -46,10 +46,6 @@ class OrchestratorBase(object):
     def run_execution(self, execution_id, function_id, **kwargs):
         raise NotImplementedError
 
-    @abc.abstractmethod
-    def get_execution_log(self, execution_id, **kwargs):
-        raise NotImplementedError
-
     @abc.abstractmethod
     def delete_function(self, function_id, **kwargs):
         raise NotImplementedError
diff --git a/qinling/orchestrator/kubernetes/manager.py b/qinling/orchestrator/kubernetes/manager.py
index ed6793c6..9e47e4d3 100644
--- a/qinling/orchestrator/kubernetes/manager.py
+++ b/qinling/orchestrator/kubernetes/manager.py
@@ -387,21 +387,6 @@ class KubernetesManager(base.OrchestratorBase):
 
         return output
 
-    def get_execution_log(self, execution_id, worker_name=None):
-        logs = self.v1.read_namespaced_pod_log(
-            worker_name,
-            self.conf.kubernetes.namespace,
-        )
-
-        b_index = logs.index('Start execution: %s' % execution_id)
-        end_string = 'Finished execution: %s' % execution_id
-        e_index = logs.index(end_string)
-        e_index += len(end_string)
-
-        execution_log = logs[b_index:e_index]
-
-        return execution_log
-
     def delete_function(self, function_id, labels=None):
         selector = common.convert_dict_to_string(labels)
 
diff --git a/qinling/services/periodics.py b/qinling/services/periodics.py
index cc4d4e53..4ec8a3fc 100644
--- a/qinling/services/periodics.py
+++ b/qinling/services/periodics.py
@@ -42,27 +42,26 @@ def handle_function_service_expiration(ctx, engine_client, orchestrator):
     expiry_time = datetime.utcnow() - delta
 
     results = db_api.get_functions(
-        fields=['id'],
         sort_keys=['updated_at'],
         insecure=True,
         updated_at={'lte': expiry_time}
     )
-
-    expiry_ids = [ret.id for ret in results]
-
-    if not expiry_ids:
+    if not results:
         return
 
-    mappings = db_api.get_function_service_mappings(
-        function_id={'in': expiry_ids}
-    )
+    for func_db in results:
+        with db_api.transaction():
+            LOG.info(
+                'Deleting service mapping and workers for function %s',
+                func_db.id
+            )
 
-    with db_api.transaction():
-        for m in mappings:
-            LOG.info('Deleting service mapping for function %s', m.function_id)
+            # Delete resources related to the function
+            engine_client.delete_function(func_db.id)
 
-            engine_client.delete_function(m.function_id)
-            db_api.delete_function_service_mapping(m.function_id)
+            # Delete service mapping and worker records
+            db_api.delete_function_service_mapping(func_db.id)
+            db_api.delete_function_workers(func_db.id)
diff --git a/qinling/status.py b/qinling/status.py
index 1d09a228..0e5af521 100644
--- a/qinling/status.py
+++ b/qinling/status.py
@@ -22,3 +22,4 @@ DONE = 'done'
 PAUSED = 'paused'
 CANCELLED = 'cancelled'
 SUCCESS = 'success'
+FAILED = 'failed'
diff --git a/runtimes/python2/server.py b/runtimes/python2/server.py
index ec33d92e..61e0f625 100644
--- a/runtimes/python2/server.py
+++ b/runtimes/python2/server.py
@@ -15,6 +15,8 @@
 import importlib
 import json
 import logging
+from multiprocessing import Manager
+from multiprocessing import Process
 import os
 import sys
 import time
@@ -31,13 +33,6 @@ app = Flask(__name__)
 zip_file = ''
 function_module = 'main'
 function_method = 'main'
-zip_imported = False
-
-# By default sys.stdout is usually line buffered for tty devices and fully
-# buffered for other files. We need to change it to unbuffered to get execution
-# log properly.
-unbuffered = os.fdopen(sys.stdout.fileno(), 'w', 0)
-sys.stdout = unbuffered
 
 
 @app.route('/download', methods=['POST'])
@@ -77,41 +72,70 @@ def download():
     return 'success'
 
 
-@app.route('/execute', methods=['POST'])
-def execute():
-    global zip_imported
-    global zip_file
-    global function_module
-    global function_method
+def _invoke_function(execution_id, zip_file, module_name, method, input,
+                     return_dict):
+    """This function is supposed to run in a child process."""
+    sys.path.insert(0, zip_file)
+    sys.stdout = open("%s.out" % execution_id, "w", 0)
 
-    if not zip_imported:
-        sys.path.insert(0, zip_file)
-        zip_imported = True
-
-    params = request.get_json() or {}
-    input = params.get('input') or {}
-    execution_id = params['execution_id']
     print('Start execution: %s' % execution_id)
-    app.logger.debug('Invoking function with input: %s' % input)
 
-    start = time.time()
     try:
-        module = importlib.import_module(function_module)
-        func = getattr(module, function_method)
-        result = func(**input)
+        module = importlib.import_module(module_name)
+        func = getattr(module, method)
+        return_dict['result'] = func(**input)
     except Exception as e:
-        result = str(e)
+        return_dict['result'] = str(e)
+        return_dict['success'] = False
 
         # Print stacktrace
        exc_type, exc_value, exc_traceback = sys.exc_info()
        lines = traceback.format_exception(exc_type, exc_value, exc_traceback)
-        app.logger.debug(''.join(line for line in lines))
+        print(''.join(line for line in lines))
     finally:
        print('Finished execution: %s' % execution_id)

-    duration = time.time() - start
+
+@app.route('/execute', methods=['POST'])
+def execute():
+    global zip_file
+    global function_module
+    global function_method
+
+    params = request.get_json() or {}
+    input = params.get('input') or {}
+    execution_id = params['execution_id']
+
+    manager = Manager()
+    return_dict = manager.dict()
+    return_dict['success'] = True
+    start = time.time()
+
+    # Run the function in a separate process so each execution's log can
+    # be captured separately.
+    p = Process(
+        target=_invoke_function,
+        args=(execution_id, zip_file, function_module, function_method,
+              input, return_dict)
+    )
+    p.start()
+    p.join()
+
+    duration = round(time.time() - start, 3)
+
+    with open('%s.out' % execution_id) as f:
+        logs = f.read()
+
+    os.remove('%s.out' % execution_id)
 
     return Response(
-        response=json.dumps({'output': result, 'duration': duration}),
+        response=json.dumps(
+            {
+                'output': return_dict.get('result'),
+                'duration': duration,
+                'logs': logs,
+                'success': return_dict['success']
+            }
+        ),
         status=200,
         mimetype='application/json'
     )
@@ -131,4 +155,6 @@ def setup_logger(loglevel):
 
 setup_logger(logging.DEBUG)
 app.logger.info("Starting server")
+
+# Just for testing purposes
 app.run(host='0.0.0.0', port='9090', threaded=True)
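
Note on the new engine/runtime contract: the runtime's /execute endpoint
now returns the execution logs and a success flag inline, instead of the
engine scraping pod logs through the Kubernetes API. Below is a minimal
sketch of what a caller of the new endpoint sees, assuming a runtime
container running server.py is reachable locally on port 9090; the
execution id and input payload are made-up illustrations, not values from
the patch:

    import requests

    # Assumption: a python2 runtime serving server.py on localhost:9090.
    resp = requests.post(
        'http://127.0.0.1:9090/execute',
        json={'execution_id': 'abc-123', 'input': {'name': 'qinling'}},
    )
    body = resp.json()

    # The response carries everything qinling-engine needs, so it no
    # longer matters which pod served the request.
    print(body['success'])   # True, or False if the function raised
    print(body['duration'])  # wall-clock seconds, rounded to 3 decimals
    print(body['logs'])      # captured stdout of the child process
    print(body['output'])    # the function's return value, or the error string

This mirrors what default_engine.py does with the response: it pops
'success' and 'logs' out of the JSON body and stores the remainder as the
execution output.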