Support function execution log query

After execution is finished, Qinling provides API to query execution
log for audit, debug or other purposes.

Implements: blueprint qinling-logs
Change-Id: Ie92619bf238c8e72e15c350c65bfbbb88a065f97
This commit is contained in:
Lingxian Kong 2017-08-13 00:33:49 +12:00
parent 35e652f43a
commit 089f41de2b
7 changed files with 77 additions and 9 deletions

View File

@ -13,6 +13,7 @@
# limitations under the License. # limitations under the License.
from oslo_log import log as logging from oslo_log import log as logging
import pecan
from pecan import rest from pecan import rest
import wsmeext.pecan as wsme_pecan import wsmeext.pecan as wsme_pecan
@ -26,7 +27,19 @@ from qinling.utils import rest_utils
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class ExecutionLogController(rest.RestController):
@rest_utils.wrap_pecan_controller_exception
@pecan.expose(content_type='text/plain')
def get_all(self, execution_id):
LOG.info("Get logs for execution %s.", execution_id)
execution_db = db_api.get_execution(execution_id)
return execution_db.logs
class ExecutionsController(rest.RestController): class ExecutionsController(rest.RestController):
log = ExecutionLogController()
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self.engine_client = rpc.get_engine_client() self.engine_client = rpc.get_engine_client()
self.type = 'execution' self.type = 'execution'

View File

@ -71,6 +71,7 @@ def upgrade():
sa.Column('updated_at', sa.DateTime(), nullable=True), sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('function_id', sa.String(length=36), nullable=False), sa.Column('function_id', sa.String(length=36), nullable=False),
sa.Column('service_url', sa.String(length=255), nullable=False), sa.Column('service_url', sa.String(length=255), nullable=False),
sa.Column('worker_name', sa.String(length=255), nullable=False),
sa.PrimaryKeyConstraint('id'), sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('function_id', 'service_url'), sa.UniqueConstraint('function_id', 'service_url'),
sa.ForeignKeyConstraint( sa.ForeignKeyConstraint(
@ -106,6 +107,7 @@ def upgrade():
sa.Column('sync', sa.BOOLEAN, nullable=False), sa.Column('sync', sa.BOOLEAN, nullable=False),
sa.Column('input', st.JsonLongDictType(), nullable=True), sa.Column('input', st.JsonLongDictType(), nullable=True),
sa.Column('output', st.JsonLongDictType(), nullable=True), sa.Column('output', st.JsonLongDictType(), nullable=True),
sa.Column('logs', sa.Text(), nullable=True),
sa.PrimaryKeyConstraint('id'), sa.PrimaryKeyConstraint('id'),
info={"check_ifexists": True} info={"check_ifexists": True}
) )

View File

@ -66,6 +66,7 @@ class FunctionServiceMapping(model_base.QinlingModelBase):
sa.ForeignKey(Function.id, ondelete='CASCADE'), sa.ForeignKey(Function.id, ondelete='CASCADE'),
) )
service_url = sa.Column(sa.String(255), nullable=False) service_url = sa.Column(sa.String(255), nullable=False)
worker_name = sa.Column(sa.String(255), nullable=False)
class Execution(model_base.QinlingSecureModelBase): class Execution(model_base.QinlingSecureModelBase):
@ -77,6 +78,7 @@ class Execution(model_base.QinlingSecureModelBase):
input = sa.Column(st.JsonLongDictType()) input = sa.Column(st.JsonLongDictType())
output = sa.Column(st.JsonLongDictType()) output = sa.Column(st.JsonLongDictType())
description = sa.Column(sa.String(255)) description = sa.Column(sa.String(255))
logs = sa.Column(sa.Text(), nullable=True)
class Job(model_base.QinlingSecureModelBase): class Job(model_base.QinlingSecureModelBase):

View File

@ -100,10 +100,19 @@ class DefaultEngine(object):
function_id, func_url function_id, func_url
) )
data = {'input': input} data = {'input': input, 'execution_id': execution_id}
r = requests.post(func_url, json=data) r = requests.post(func_url, json=data)
logs = self.orchestrator.get_execution_log(
execution_id,
worker_name=function.service.worker_name,
)
LOG.debug('Finished execution %s', execution_id)
execution.status = status.SUCCESS execution.status = status.SUCCESS
execution.output = r.json() execution.output = r.json()
execution.logs = logs
return return
source = function.code['source'] source = function.code['source']
@ -122,7 +131,7 @@ class DefaultEngine(object):
identifier = runtime_id identifier = runtime_id
labels = {'runtime_id': runtime_id} labels = {'runtime_id': runtime_id}
service_url = self.orchestrator.prepare_execution( worker_name, service_url = self.orchestrator.prepare_execution(
function_id, function_id,
image=image, image=image,
identifier=identifier, identifier=identifier,
@ -131,25 +140,36 @@ class DefaultEngine(object):
entry=function.entry entry=function.entry
) )
output = self.orchestrator.run_execution( output = self.orchestrator.run_execution(
execution_id,
function_id, function_id,
input=input, input=input,
identifier=identifier, identifier=identifier,
service_url=service_url, service_url=service_url,
) )
logs = ''
# Execution log is only available for non-image source execution.
if service_url:
logs = self.orchestrator.get_execution_log(
execution_id,
worker_name=worker_name,
)
LOG.debug( LOG.debug(
'Finished execution. execution_id=%s, output=%s', 'Finished execution. execution_id=%s, output=%s',
execution_id, execution_id,
output output
) )
execution.output = output execution.output = output
execution.logs = logs
execution.status = status.SUCCESS execution.status = status.SUCCESS
# No service is created in orchestrator for single container. # No service is created in orchestrator for single container.
if not image: if not image:
mapping = { mapping = {
'function_id': function_id, 'function_id': function_id,
'service_url': service_url 'service_url': service_url,
'worker_name': worker_name
} }
db_api.create_function_service_mapping(mapping) db_api.create_function_service_mapping(mapping)

View File

@ -43,7 +43,11 @@ class OrchestratorBase(object):
raise NotImplementedError raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def run_execution(self, function_id, **kwargs): def run_execution(self, execution_id, function_id, **kwargs):
raise NotImplementedError
@abc.abstractmethod
def get_execution_log(self, execution_id, **kwargs):
raise NotImplementedError raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod

View File

@ -304,7 +304,7 @@ class KubernetesManager(base.OrchestratorBase):
'Failed to download function code package.' 'Failed to download function code package.'
) )
return pod_service_url return name, pod_service_url
def _create_pod(self, image, pod_name, labels, input): def _create_pod(self, image, pod_name, labels, input):
pod_body = self.pod_template.render( pod_body = self.pod_template.render(
@ -339,7 +339,7 @@ class KubernetesManager(base.OrchestratorBase):
if image: if image:
self._create_pod(image, identifier, labels, input) self._create_pod(image, identifier, labels, input)
return None return identifier, None
else: else:
pod = self._choose_available_pod(labels) pod = self._choose_available_pod(labels)
@ -348,11 +348,11 @@ class KubernetesManager(base.OrchestratorBase):
return self._prepare_pod(pod, identifier, function_id, labels, entry) return self._prepare_pod(pod, identifier, function_id, labels, entry)
def run_execution(self, function_id, input=None, identifier=None, def run_execution(self, execution_id, function_id, input=None,
service_url=None): identifier=None, service_url=None):
if service_url: if service_url:
func_url = '%s/execute' % service_url func_url = '%s/execute' % service_url
data = {'input': input} data = {'input': input, 'execution_id': execution_id}
LOG.info('Invoke function %s, url: %s', function_id, func_url) LOG.info('Invoke function %s, url: %s', function_id, func_url)
@ -379,6 +379,21 @@ class KubernetesManager(base.OrchestratorBase):
return output return output
def get_execution_log(self, execution_id, worker_name=None):
logs = self.v1.read_namespaced_pod_log(
worker_name,
self.conf.kubernetes.namespace,
)
b_index = logs.index('Start execution: %s' % execution_id)
end_string = 'Finished execution: %s' % execution_id
e_index = logs.index(end_string)
e_index += len(end_string)
execution_log = logs[b_index:e_index]
return execution_log
def delete_function(self, function_id, labels=None): def delete_function(self, function_id, labels=None):
selector = common.convert_dict_to_string(labels) selector = common.convert_dict_to_string(labels)

View File

@ -15,6 +15,7 @@
import importlib import importlib
import json import json
import logging import logging
import os
import sys import sys
import time import time
import traceback import traceback
@ -32,6 +33,13 @@ function_module = 'main'
function_method = 'main' function_method = 'main'
# By default sys.stdout is usually line buffered for tty devices and fully
# buffered for other files. We need to change it to unbuffered to get execution
# log properly.
unbuffered = os.fdopen(sys.stdout.fileno(), 'w', 0)
sys.stdout = unbuffered
@app.route('/download', methods=['POST']) @app.route('/download', methods=['POST'])
def download(): def download():
params = request.get_json() or {} params = request.get_json() or {}
@ -77,6 +85,8 @@ def execute():
params = request.get_json() or {} params = request.get_json() or {}
input = params.get('input') or {} input = params.get('input') or {}
execution_id = params['execution_id']
print('Start execution: %s' % execution_id)
app.logger.debug('Invoking function with input: %s' % input) app.logger.debug('Invoking function with input: %s' % input)
start = time.time() start = time.time()
@ -92,6 +102,8 @@ def execute():
exc_type, exc_value, exc_traceback = sys.exc_info() exc_type, exc_value, exc_traceback = sys.exc_info()
lines = traceback.format_exception(exc_type, exc_value, exc_traceback) lines = traceback.format_exception(exc_type, exc_value, exc_traceback)
app.logger.debug(''.join(line for line in lines)) app.logger.debug(''.join(line for line in lines))
finally:
print('Finished execution: %s' % execution_id)
duration = time.time() - start duration = time.time() - start
return Response( return Response(