Browse Source

Fix logs query for executions

After function autoscaling support, it's impossible for qinling-engine
to get execution log because it doesn't know which pod it is talking
to.

So, it's neccessary for the runtime to return execution logs to
qinling engine.

The qinling client is not affected.

Change-Id: I96dfd00cc83d8b8a5e8c601ee3800b1ef1a45b1b
changes/22/498622/3
Lingxian Kong 4 years ago
parent
commit
f2189a0468
  1. 8
      qinling/db/api.py
  2. 17
      qinling/db/sqlalchemy/api.py
  3. 28
      qinling/engine/default_engine.py
  4. 4
      qinling/orchestrator/base.py
  5. 15
      qinling/orchestrator/kubernetes/manager.py
  6. 25
      qinling/services/periodics.py
  7. 1
      qinling/status.py
  8. 82
      runtimes/python2/server.py

8
qinling/db/api.py

@ -166,6 +166,14 @@ def create_function_worker(values):
return IMPL.create_function_worker(values)
def delete_function_workers(id):
return IMPL.delete_function_workers(id)
def get_function_workers(function_id):
return IMPL.get_function_workers(function_id)
def create_job(values):
return IMPL.create_job(values)

17
qinling/db/sqlalchemy/api.py

@ -410,6 +410,23 @@ def create_function_worker(values, session=None):
return mapping
@db_base.session_aware()
def get_function_workers(function_id, session=None):
workers = db_base.model_query(
models.FunctionWorkers
).filter_by(function_id=function_id).all()
return workers
@db_base.session_aware()
def delete_function_workers(id, session=None):
workers = get_function_workers(id)
for worker in workers:
session.delete(worker)
@db_base.session_aware()
def create_job(values, session=None):
job = models.Job()

28
qinling/engine/default_engine.py

@ -89,6 +89,7 @@ class DefaultEngine(object):
execution_id, function_id, runtime_id, input
)
# FIXME(kong): Make the transaction range smaller.
with db_api.transaction():
execution = db_api.get_execution(execution_id)
function = db_api.get_function(function_id)
@ -102,17 +103,14 @@ class DefaultEngine(object):
data = {'input': input, 'execution_id': execution_id}
r = requests.post(func_url, json=data)
# logs = self.orchestrator.get_execution_log(
# execution_id,
# worker_name=function.service.worker_name,
# )
res = r.json()
LOG.debug('Finished execution %s', execution_id)
execution.status = status.SUCCESS
execution.output = r.json()
# execution.logs = logs
success = res.pop('success')
execution.status = status.SUCCESS if success else status.FAILED
execution.logs = res.pop('logs', '')
execution.output = res
return
source = function.code['source']
@ -150,10 +148,14 @@ class DefaultEngine(object):
logs = ''
# Execution log is only available for non-image source execution.
if service_url:
logs = self.orchestrator.get_execution_log(
execution_id,
worker_name=worker_name,
)
logs = output.pop('logs', '')
success = output.pop('success')
else:
# If the function is created from docker image, the output is
# direct output, here we convert to a dict to fit into the db
# schema.
output = {'output': output}
success = True
LOG.debug(
'Finished execution. execution_id=%s, output=%s',
@ -162,7 +164,7 @@ class DefaultEngine(object):
)
execution.output = output
execution.logs = logs
execution.status = status.SUCCESS
execution.status = status.SUCCESS if success else status.FAILED
# No service is created in orchestrator for single container.
if not image:

4
qinling/orchestrator/base.py

@ -46,10 +46,6 @@ class OrchestratorBase(object):
def run_execution(self, execution_id, function_id, **kwargs):
raise NotImplementedError
@abc.abstractmethod
def get_execution_log(self, execution_id, **kwargs):
raise NotImplementedError
@abc.abstractmethod
def delete_function(self, function_id, **kwargs):
raise NotImplementedError

15
qinling/orchestrator/kubernetes/manager.py

@ -387,21 +387,6 @@ class KubernetesManager(base.OrchestratorBase):
return output
def get_execution_log(self, execution_id, worker_name=None):
logs = self.v1.read_namespaced_pod_log(
worker_name,
self.conf.kubernetes.namespace,
)
b_index = logs.index('Start execution: %s' % execution_id)
end_string = 'Finished execution: %s' % execution_id
e_index = logs.index(end_string)
e_index += len(end_string)
execution_log = logs[b_index:e_index]
return execution_log
def delete_function(self, function_id, labels=None):
selector = common.convert_dict_to_string(labels)

25
qinling/services/periodics.py

@ -42,27 +42,26 @@ def handle_function_service_expiration(ctx, engine_client, orchestrator):
expiry_time = datetime.utcnow() - delta
results = db_api.get_functions(
fields=['id'],
sort_keys=['updated_at'],
insecure=True,
updated_at={'lte': expiry_time}
)
expiry_ids = [ret.id for ret in results]
if not expiry_ids:
if len(results) == 0:
return
mappings = db_api.get_function_service_mappings(
function_id={'in': expiry_ids}
)
for func_db in results:
with db_api.transaction():
LOG.info(
'Deleting service mapping and workers for function %s',
func_db.id
)
with db_api.transaction():
for m in mappings:
LOG.info('Deleting service mapping for function %s', m.function_id)
# Delete resources related to the function
engine_client.delete_function(func_db.id)
engine_client.delete_function(m.function_id)
db_api.delete_function_service_mapping(m.function_id)
# Delete service mapping and worker records
db_api.delete_function_service_mapping(func_db.id)
db_api.delete_function_workers(func_db.id)
def handle_job(engine_client):

1
qinling/status.py

@ -22,3 +22,4 @@ DONE = 'done'
PAUSED = 'paused'
CANCELLED = 'cancelled'
SUCCESS = 'success'
FAILED = 'failed'

82
runtimes/python2/server.py

@ -15,6 +15,8 @@
import importlib
import json
import logging
from multiprocessing import Manager
from multiprocessing import Process
import os
import sys
import time
@ -31,13 +33,6 @@ app = Flask(__name__)
zip_file = ''
function_module = 'main'
function_method = 'main'
zip_imported = False
# By default sys.stdout is usually line buffered for tty devices and fully
# buffered for other files. We need to change it to unbuffered to get execution
# log properly.
unbuffered = os.fdopen(sys.stdout.fileno(), 'w', 0)
sys.stdout = unbuffered
@app.route('/download', methods=['POST'])
@ -77,41 +72,70 @@ def download():
return 'success'
def _invoke_function(execution_id, zip_file, module_name, method, input,
return_dict):
"""Thie function is supposed to be running in a child process."""
sys.path.insert(0, zip_file)
sys.stdout = open("%s.out" % execution_id, "w", 0)
print('Start execution: %s' % execution_id)
try:
module = importlib.import_module(module_name)
func = getattr(module, method)
return_dict['result'] = func(**input)
except Exception as e:
return_dict['result'] = str(e)
return_dict['success'] = False
# Print stacktrace
exc_type, exc_value, exc_traceback = sys.exc_info()
lines = traceback.format_exception(exc_type, exc_value, exc_traceback)
print(''.join(line for line in lines))
finally:
print('Finished execution: %s' % execution_id)
@app.route('/execute', methods=['POST'])
def execute():
global zip_imported
global zip_file
global function_module
global function_method
if not zip_imported:
sys.path.insert(0, zip_file)
zip_imported = True
params = request.get_json() or {}
input = params.get('input') or {}
execution_id = params['execution_id']
print('Start execution: %s' % execution_id)
app.logger.debug('Invoking function with input: %s' % input)
manager = Manager()
return_dict = manager.dict()
return_dict['success'] = True
start = time.time()
try:
module = importlib.import_module(function_module)
func = getattr(module, function_method)
result = func(**input)
except Exception as e:
result = str(e)
# Print stacktrace
exc_type, exc_value, exc_traceback = sys.exc_info()
lines = traceback.format_exception(exc_type, exc_value, exc_traceback)
app.logger.debug(''.join(line for line in lines))
finally:
print('Finished execution: %s' % execution_id)
# Run the function in a separate process to avoid messing up the log
p = Process(
target=_invoke_function,
args=(execution_id, zip_file, function_module, function_method,
input, return_dict)
)
p.start()
p.join()
duration = round(time.time() - start, 3)
with open('%s.out' % execution_id) as f:
logs = f.read()
os.remove('%s.out' % execution_id)
duration = time.time() - start
return Response(
response=json.dumps({'output': result, 'duration': duration}),
response=json.dumps(
{
'output': return_dict.get('result'),
'duration': duration,
'logs': logs,
'success': return_dict['success']
}
),
status=200,
mimetype='application/json'
)
@ -131,4 +155,6 @@ def setup_logger(loglevel):
setup_logger(logging.DEBUG)
app.logger.info("Starting server")
# Just for testing purpose
app.run(host='0.0.0.0', port='9090', threaded=True)
Loading…
Cancel
Save