diff --git a/qinling/api/controllers/v1/execution.py b/qinling/api/controllers/v1/execution.py index 6262693b..4c320e7b 100644 --- a/qinling/api/controllers/v1/execution.py +++ b/qinling/api/controllers/v1/execution.py @@ -13,6 +13,7 @@ # limitations under the License. from oslo_log import log as logging +import pecan from pecan import rest import wsmeext.pecan as wsme_pecan @@ -26,7 +27,19 @@ from qinling.utils import rest_utils LOG = logging.getLogger(__name__) +class ExecutionLogController(rest.RestController): + @rest_utils.wrap_pecan_controller_exception + @pecan.expose(content_type='text/plain') + def get_all(self, execution_id): + LOG.info("Get logs for execution %s.", execution_id) + execution_db = db_api.get_execution(execution_id) + + return execution_db.logs + + class ExecutionsController(rest.RestController): + log = ExecutionLogController() + def __init__(self, *args, **kwargs): self.engine_client = rpc.get_engine_client() self.type = 'execution' diff --git a/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py b/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py index bf633fb7..88608b95 100644 --- a/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py +++ b/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py @@ -71,6 +71,7 @@ def upgrade(): sa.Column('updated_at', sa.DateTime(), nullable=True), sa.Column('function_id', sa.String(length=36), nullable=False), sa.Column('service_url', sa.String(length=255), nullable=False), + sa.Column('worker_name', sa.String(length=255), nullable=False), sa.PrimaryKeyConstraint('id'), sa.UniqueConstraint('function_id', 'service_url'), sa.ForeignKeyConstraint( @@ -106,6 +107,7 @@ def upgrade(): sa.Column('sync', sa.BOOLEAN, nullable=False), sa.Column('input', st.JsonLongDictType(), nullable=True), sa.Column('output', st.JsonLongDictType(), nullable=True), + sa.Column('logs', sa.Text(), nullable=True), sa.PrimaryKeyConstraint('id'), info={"check_ifexists": True} ) diff --git a/qinling/db/sqlalchemy/models.py b/qinling/db/sqlalchemy/models.py index 8e037b41..e4f2a8af 100644 --- a/qinling/db/sqlalchemy/models.py +++ b/qinling/db/sqlalchemy/models.py @@ -66,6 +66,7 @@ class FunctionServiceMapping(model_base.QinlingModelBase): sa.ForeignKey(Function.id, ondelete='CASCADE'), ) service_url = sa.Column(sa.String(255), nullable=False) + worker_name = sa.Column(sa.String(255), nullable=False) class Execution(model_base.QinlingSecureModelBase): @@ -77,6 +78,7 @@ class Execution(model_base.QinlingSecureModelBase): input = sa.Column(st.JsonLongDictType()) output = sa.Column(st.JsonLongDictType()) description = sa.Column(sa.String(255)) + logs = sa.Column(sa.Text(), nullable=True) class Job(model_base.QinlingSecureModelBase): diff --git a/qinling/engine/default_engine.py b/qinling/engine/default_engine.py index d9583b15..8ca5a423 100644 --- a/qinling/engine/default_engine.py +++ b/qinling/engine/default_engine.py @@ -100,10 +100,19 @@ class DefaultEngine(object): function_id, func_url ) - data = {'input': input} + data = {'input': input, 'execution_id': execution_id} r = requests.post(func_url, json=data) + + logs = self.orchestrator.get_execution_log( + execution_id, + worker_name=function.service.worker_name, + ) + + LOG.debug('Finished execution %s', execution_id) + execution.status = status.SUCCESS execution.output = r.json() + execution.logs = logs return source = function.code['source'] @@ -122,7 +131,7 @@ class DefaultEngine(object): identifier = runtime_id labels = {'runtime_id': runtime_id} - service_url = self.orchestrator.prepare_execution( + worker_name, service_url = self.orchestrator.prepare_execution( function_id, image=image, identifier=identifier, @@ -131,25 +140,36 @@ class DefaultEngine(object): entry=function.entry ) output = self.orchestrator.run_execution( + execution_id, function_id, input=input, identifier=identifier, service_url=service_url, ) + logs = '' + # Execution log is only available for non-image source execution. + if service_url: + logs = self.orchestrator.get_execution_log( + execution_id, + worker_name=worker_name, + ) + LOG.debug( 'Finished execution. execution_id=%s, output=%s', execution_id, output ) execution.output = output + execution.logs = logs execution.status = status.SUCCESS # No service is created in orchestrator for single container. if not image: mapping = { 'function_id': function_id, - 'service_url': service_url + 'service_url': service_url, + 'worker_name': worker_name } db_api.create_function_service_mapping(mapping) diff --git a/qinling/orchestrator/base.py b/qinling/orchestrator/base.py index bcd81df6..d7432fbe 100644 --- a/qinling/orchestrator/base.py +++ b/qinling/orchestrator/base.py @@ -43,7 +43,11 @@ class OrchestratorBase(object): raise NotImplementedError @abc.abstractmethod - def run_execution(self, function_id, **kwargs): + def run_execution(self, execution_id, function_id, **kwargs): + raise NotImplementedError + + @abc.abstractmethod + def get_execution_log(self, execution_id, **kwargs): raise NotImplementedError @abc.abstractmethod diff --git a/qinling/orchestrator/kubernetes/manager.py b/qinling/orchestrator/kubernetes/manager.py index 67b4e9d5..02b0be30 100644 --- a/qinling/orchestrator/kubernetes/manager.py +++ b/qinling/orchestrator/kubernetes/manager.py @@ -304,7 +304,7 @@ class KubernetesManager(base.OrchestratorBase): 'Failed to download function code package.' ) - return pod_service_url + return name, pod_service_url def _create_pod(self, image, pod_name, labels, input): pod_body = self.pod_template.render( @@ -339,7 +339,7 @@ class KubernetesManager(base.OrchestratorBase): if image: self._create_pod(image, identifier, labels, input) - return None + return identifier, None else: pod = self._choose_available_pod(labels) @@ -348,11 +348,11 @@ class KubernetesManager(base.OrchestratorBase): return self._prepare_pod(pod, identifier, function_id, labels, entry) - def run_execution(self, function_id, input=None, identifier=None, - service_url=None): + def run_execution(self, execution_id, function_id, input=None, + identifier=None, service_url=None): if service_url: func_url = '%s/execute' % service_url - data = {'input': input} + data = {'input': input, 'execution_id': execution_id} LOG.info('Invoke function %s, url: %s', function_id, func_url) @@ -379,6 +379,21 @@ class KubernetesManager(base.OrchestratorBase): return output + def get_execution_log(self, execution_id, worker_name=None): + logs = self.v1.read_namespaced_pod_log( + worker_name, + self.conf.kubernetes.namespace, + ) + + b_index = logs.index('Start execution: %s' % execution_id) + end_string = 'Finished execution: %s' % execution_id + e_index = logs.index(end_string) + e_index += len(end_string) + + execution_log = logs[b_index:e_index] + + return execution_log + def delete_function(self, function_id, labels=None): selector = common.convert_dict_to_string(labels) diff --git a/runtimes/python2/server.py b/runtimes/python2/server.py index 08c1fd4e..3d899388 100644 --- a/runtimes/python2/server.py +++ b/runtimes/python2/server.py @@ -15,6 +15,7 @@ import importlib import json import logging +import os import sys import time import traceback @@ -32,6 +33,13 @@ function_module = 'main' function_method = 'main' +# By default sys.stdout is usually line buffered for tty devices and fully +# buffered for other files. We need to change it to unbuffered to get execution +# log properly. +unbuffered = os.fdopen(sys.stdout.fileno(), 'w', 0) +sys.stdout = unbuffered + + @app.route('/download', methods=['POST']) def download(): params = request.get_json() or {} @@ -77,6 +85,8 @@ def execute(): params = request.get_json() or {} input = params.get('input') or {} + execution_id = params['execution_id'] + print('Start execution: %s' % execution_id) app.logger.debug('Invoking function with input: %s' % input) start = time.time() @@ -92,6 +102,8 @@ def execute(): exc_type, exc_value, exc_traceback = sys.exc_info() lines = traceback.format_exception(exc_type, exc_value, exc_traceback) app.logger.debug(''.join(line for line in lines)) + finally: + print('Finished execution: %s' % execution_id) duration = time.time() - start return Response(