diff --git a/qinling/db/api.py b/qinling/db/api.py
index 614c1305..f55cf828 100644
--- a/qinling/db/api.py
+++ b/qinling/db/api.py
@@ -166,6 +166,14 @@ def create_function_worker(values):
     return IMPL.create_function_worker(values)


+def delete_function_workers(id):
+    return IMPL.delete_function_workers(id)
+
+
+def get_function_workers(function_id):
+    return IMPL.get_function_workers(function_id)
+
+
 def create_job(values):
     return IMPL.create_job(values)

diff --git a/qinling/db/sqlalchemy/api.py b/qinling/db/sqlalchemy/api.py
index 9527ad96..3f687c8f 100644
--- a/qinling/db/sqlalchemy/api.py
+++ b/qinling/db/sqlalchemy/api.py
@@ -410,6 +410,23 @@ def create_function_worker(values, session=None):
     return mapping


+@db_base.session_aware()
+def get_function_workers(function_id, session=None):
+    workers = db_base.model_query(
+        models.FunctionWorkers
+    ).filter_by(function_id=function_id).all()
+
+    return workers
+
+
+@db_base.session_aware()
+def delete_function_workers(id, session=None):
+    workers = get_function_workers(id)
+
+    for worker in workers:
+        session.delete(worker)
+
+
 @db_base.session_aware()
 def create_job(values, session=None):
     job = models.Job()
diff --git a/qinling/engine/default_engine.py b/qinling/engine/default_engine.py
index 01f07828..7c91f436 100644
--- a/qinling/engine/default_engine.py
+++ b/qinling/engine/default_engine.py
@@ -89,6 +89,7 @@ class DefaultEngine(object):
             execution_id, function_id, runtime_id, input
         )

+        # FIXME(kong): Make the transaction range smaller.
         with db_api.transaction():
             execution = db_api.get_execution(execution_id)
             function = db_api.get_function(function_id)
@@ -102,17 +103,14 @@ class DefaultEngine(object):

                 data = {'input': input, 'execution_id': execution_id}
                 r = requests.post(func_url, json=data)
-
-                # logs = self.orchestrator.get_execution_log(
-                #     execution_id,
-                #     worker_name=function.service.worker_name,
-                # )
+                res = r.json()

                 LOG.debug('Finished execution %s', execution_id)

-                execution.status = status.SUCCESS
-                execution.output = r.json()
-                # execution.logs = logs
+                success = res.pop('success')
+                execution.status = status.SUCCESS if success else status.FAILED
+                execution.logs = res.pop('logs', '')
+                execution.output = res
                 return

             source = function.code['source']
@@ -150,10 +148,14 @@ class DefaultEngine(object):
             logs = ''
             # Execution log is only available for non-image source execution.
             if service_url:
-                logs = self.orchestrator.get_execution_log(
-                    execution_id,
-                    worker_name=worker_name,
-                )
+                logs = output.pop('logs', '')
+                success = output.pop('success')
+            else:
+                # If the function is created from a docker image, the output
+                # is the function's direct output. Convert it to a dict to
+                # fit into the db schema.
+                output = {'output': output}
+                success = True

             LOG.debug(
                 'Finished execution. execution_id=%s, output=%s',
@@ -162,7 +164,7 @@ class DefaultEngine(object):
             )
             execution.output = output
             execution.logs = logs
-            execution.status = status.SUCCESS
+            execution.status = status.SUCCESS if success else status.FAILED

             # No service is created in orchestrator for single container.
             if not image:
diff --git a/qinling/orchestrator/base.py b/qinling/orchestrator/base.py
index f6c1e77b..7829332e 100644
--- a/qinling/orchestrator/base.py
+++ b/qinling/orchestrator/base.py
@@ -46,10 +46,6 @@ class OrchestratorBase(object):
     def run_execution(self, execution_id, function_id, **kwargs):
         raise NotImplementedError

-    @abc.abstractmethod
-    def get_execution_log(self, execution_id, **kwargs):
-        raise NotImplementedError
-
     @abc.abstractmethod
     def delete_function(self, function_id, **kwargs):
         raise NotImplementedError
diff --git a/qinling/orchestrator/kubernetes/manager.py b/qinling/orchestrator/kubernetes/manager.py
index ed6793c6..9e47e4d3 100644
--- a/qinling/orchestrator/kubernetes/manager.py
+++ b/qinling/orchestrator/kubernetes/manager.py
@@ -387,21 +387,6 @@ class KubernetesManager(base.OrchestratorBase):

         return output

-    def get_execution_log(self, execution_id, worker_name=None):
-        logs = self.v1.read_namespaced_pod_log(
-            worker_name,
-            self.conf.kubernetes.namespace,
-        )
-
-        b_index = logs.index('Start execution: %s' % execution_id)
-        end_string = 'Finished execution: %s' % execution_id
-        e_index = logs.index(end_string)
-        e_index += len(end_string)
-
-        execution_log = logs[b_index:e_index]
-
-        return execution_log
-
     def delete_function(self, function_id, labels=None):
         selector = common.convert_dict_to_string(labels)

diff --git a/qinling/services/periodics.py b/qinling/services/periodics.py
index cc4d4e53..4ec8a3fc 100644
--- a/qinling/services/periodics.py
+++ b/qinling/services/periodics.py
@@ -42,27 +42,26 @@ def handle_function_service_expiration(ctx, engine_client, orchestrator):
     expiry_time = datetime.utcnow() - delta

     results = db_api.get_functions(
-        fields=['id'],
         sort_keys=['updated_at'],
         insecure=True,
         updated_at={'lte': expiry_time}
     )
-
-    expiry_ids = [ret.id for ret in results]
-
-    if not expiry_ids:
+    if len(results) == 0:
         return

-    mappings = db_api.get_function_service_mappings(
-        function_id={'in': expiry_ids}
-    )
+    for func_db in results:
+        with db_api.transaction():
+            LOG.info(
+                'Deleting service mapping and workers for function %s',
+                func_db.id
+            )

-    with db_api.transaction():
-        for m in mappings:
-            LOG.info('Deleting service mapping for function %s', m.function_id)
+            # Delete resources related to the function
+            engine_client.delete_function(func_db.id)

-            engine_client.delete_function(m.function_id)
-            db_api.delete_function_service_mapping(m.function_id)
+            # Delete service mapping and worker records
+            db_api.delete_function_service_mapping(func_db.id)
+            db_api.delete_function_workers(func_db.id)


 def handle_job(engine_client):
diff --git a/qinling/status.py b/qinling/status.py
index 1d09a228..0e5af521 100644
--- a/qinling/status.py
+++ b/qinling/status.py
@@ -22,3 +22,4 @@ DONE = 'done'
 PAUSED = 'paused'
 CANCELLED = 'cancelled'
 SUCCESS = 'success'
+FAILED = 'failed'
diff --git a/runtimes/python2/server.py b/runtimes/python2/server.py
index ec33d92e..61e0f625 100644
--- a/runtimes/python2/server.py
+++ b/runtimes/python2/server.py
@@ -15,6 +15,8 @@
 import importlib
 import json
 import logging
+from multiprocessing import Manager
+from multiprocessing import Process
 import os
 import sys
 import time
@@ -31,13 +33,6 @@ app = Flask(__name__)
 zip_file = ''
 function_module = 'main'
 function_method = 'main'
-zip_imported = False
-
-# By default sys.stdout is usually line buffered for tty devices and fully
-# buffered for other files. We need to change it to unbuffered to get execution
-# log properly.
-unbuffered = os.fdopen(sys.stdout.fileno(), 'w', 0)
-sys.stdout = unbuffered


 @app.route('/download', methods=['POST'])
@@ -77,41 +72,70 @@ def download():
     return 'success'


-@app.route('/execute', methods=['POST'])
-def execute():
-    global zip_imported
-    global zip_file
-    global function_module
-    global function_method
+def _invoke_function(execution_id, zip_file, module_name, method, input,
+                     return_dict):
+    """This function is supposed to run in a child process."""
+    sys.path.insert(0, zip_file)
+    sys.stdout = open("%s.out" % execution_id, "w", 0)

-    if not zip_imported:
-        sys.path.insert(0, zip_file)
-        zip_imported = True
-
-    params = request.get_json() or {}
-    input = params.get('input') or {}
-    execution_id = params['execution_id']
     print('Start execution: %s' % execution_id)
-    app.logger.debug('Invoking function with input: %s' % input)

-    start = time.time()
     try:
-        module = importlib.import_module(function_module)
-        func = getattr(module, function_method)
-        result = func(**input)
+        module = importlib.import_module(module_name)
+        func = getattr(module, method)
+        return_dict['result'] = func(**input)
     except Exception as e:
-        result = str(e)
+        return_dict['result'] = str(e)
+        return_dict['success'] = False

         # Print stacktrace
         exc_type, exc_value, exc_traceback = sys.exc_info()
         lines = traceback.format_exception(exc_type, exc_value, exc_traceback)
-        app.logger.debug(''.join(line for line in lines))
+        print(''.join(line for line in lines))
     finally:
         print('Finished execution: %s' % execution_id)

-    duration = time.time() - start
+
+@app.route('/execute', methods=['POST'])
+def execute():
+    global zip_file
+    global function_module
+    global function_method
+
+    params = request.get_json() or {}
+    input = params.get('input') or {}
+    execution_id = params['execution_id']
+
+    manager = Manager()
+    return_dict = manager.dict()
+    return_dict['success'] = True
+    start = time.time()
+
+    # Run the function in a separate process to avoid messing up the log
+    p = Process(
+        target=_invoke_function,
+        args=(execution_id, zip_file, function_module, function_method,
+              input, return_dict)
+    )
+    p.start()
+    p.join()
+
+    duration = round(time.time() - start, 3)
+
+    with open('%s.out' % execution_id) as f:
+        logs = f.read()
+
+    os.remove('%s.out' % execution_id)
+
     return Response(
-        response=json.dumps({'output': result, 'duration': duration}),
+        response=json.dumps(
+            {
+                'output': return_dict.get('result'),
+                'duration': duration,
+                'logs': logs,
+                'success': return_dict['success']
+            }
+        ),
         status=200,
         mimetype='application/json'
     )
@@ -131,4 +155,6 @@ def setup_logger(loglevel):

 setup_logger(logging.DEBUG)
 app.logger.info("Starting server")
+
+# Just for testing purposes
 app.run(host='0.0.0.0', port='9090', threaded=True)
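Reviewer note: the engine and the runtime now share an implicit response contract. The /execute reply carries 'output', 'duration', 'logs' and 'success', and the engine pops 'success' and 'logs' before storing the remainder as the execution output. A self-contained sketch of that parsing, using a made-up payload (the dict literal below is illustrative, not the output of real code):

    # Illustrative payload shaped like the reply from runtimes/python2/server.py.
    res = {
        'output': 'Hello, world!',
        'duration': 0.102,
        'logs': 'Start execution: abc\nFinished execution: abc\n',
        'success': True,
    }

    # Mirrors the engine-side parsing in qinling/engine/default_engine.py:
    # 'success' decides the execution status, 'logs' is stored separately,
    # and whatever remains becomes the execution output.
    success = res.pop('success')
    logs = res.pop('logs', '')
    status = 'success' if success else 'failed'
    print(status, logs, res)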
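Reviewer note: the per-execution log capture works because the child process gets its own sys.stdout, so the Flask parent's stream is never redirected. A standalone sketch of the same pattern (the demo function and file name are illustrative; the third argument to open(), which the patch uses for unbuffered output, is Python 2 only and omitted here):

    import os
    import sys
    from multiprocessing import Manager, Process


    def _work(execution_id, return_dict):
        # Redirect this child's stdout to a per-execution file, the way
        # _invoke_function does in the patched server.py.
        sys.stdout = open('%s.out' % execution_id, 'w')
        print('Start execution: %s' % execution_id)
        try:
            return_dict['result'] = 6 * 7  # stand-in for the user function
        except Exception as e:
            return_dict['result'] = str(e)
            return_dict['success'] = False
        finally:
            print('Finished execution: %s' % execution_id)
            sys.stdout.close()


    if __name__ == '__main__':
        manager = Manager()
        return_dict = manager.dict()
        return_dict['success'] = True

        p = Process(target=_work, args=('demo-1', return_dict))
        p.start()
        p.join()

        # The parent collects the captured logs exactly like /execute does.
        with open('demo-1.out') as f:
            print('captured: %r' % f.read())
        os.remove('demo-1.out')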
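Reviewer note: with get_execution_log() removed from both the orchestrator base class and the Kubernetes manager, logs can only come back in the HTTP response, so this is worth exercising end to end. A hedged sketch, assuming a patched runtime is listening locally on port 9090 and a function package has already been downloaded (URL and execution id are illustrative):

    import requests

    resp = requests.post(
        'http://127.0.0.1:9090/execute',
        json={'input': {}, 'execution_id': 'demo-execution-1'},
    )
    body = resp.json()

    # 'logs' and 'success' are the two fields this patch adds to the reply.
    print(body['success'], body['duration'])
    print(body['logs'])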