Merge "python3 runtime"
This commit is contained in:
commit
2495008aec
21
runtimes/python3/Dockerfile
Normal file
21
runtimes/python3/Dockerfile
Normal file
@ -0,0 +1,21 @@
|
||||
FROM phusion/baseimage:0.9.22
|
||||
MAINTAINER anlin.kong@gmail.com
|
||||
|
||||
# We need to use non-root user to execute functions and root user to set resource limits.
|
||||
USER root
|
||||
RUN useradd -Ms /bin/bash qinling
|
||||
|
||||
RUN apt-get update && \
|
||||
apt-get -y install python3-dev python3-setuptools libffi-dev libxslt1-dev libxml2-dev libyaml-dev libssl-dev python3-pip && \
|
||||
pip3 install -U pip setuptools uwsgi
|
||||
|
||||
COPY . /app
|
||||
WORKDIR /app
|
||||
RUN pip install --no-cache-dir -r requirements.txt && \
|
||||
chmod 0750 custom-entrypoint.sh && \
|
||||
mkdir /qinling_cgroup && \
|
||||
mkdir -p /var/lock/qinling && \
|
||||
mkdir -p /var/qinling/packages && \
|
||||
chown -R qinling:qinling /app /var/qinling/packages
|
||||
|
||||
CMD ["/bin/bash", "custom-entrypoint.sh"]
|
124
runtimes/python3/cglimit.py
Normal file
124
runtimes/python3/cglimit.py
Normal file
@ -0,0 +1,124 @@
|
||||
# Copyright 2018 Catalyst IT Limited
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
|
||||
from flask import Flask
|
||||
from flask import make_response
|
||||
from flask import request
|
||||
from oslo_concurrency import lockutils
|
||||
|
||||
app = Flask(__name__)
|
||||
ch = logging.StreamHandler(sys.stdout)
|
||||
ch.setLevel(logging.DEBUG)
|
||||
ch.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
|
||||
del app.logger.handlers[:]
|
||||
app.logger.addHandler(ch)
|
||||
|
||||
# Deployer can specify cfs_period_us default value here.
|
||||
PERIOD = 100000
|
||||
|
||||
|
||||
def log(message, level="info"):
|
||||
global app
|
||||
log_func = getattr(app.logger, level)
|
||||
log_func(message)
|
||||
|
||||
|
||||
@lockutils.synchronized('set_limitation', external=True,
|
||||
lock_path='/var/lock/qinling')
|
||||
def _cgroup_limit(cpu, memory_size, pid):
|
||||
"""Modify 'cgroup' files to set resource limits.
|
||||
|
||||
Each pod(worker) will have cgroup folders on the host cgroup filesystem,
|
||||
like '/sys/fs/cgroup/<resource_type>/kubepods/<qos_class>/pod<pod_id>/',
|
||||
to limit memory and cpu resources that can be used in pod.
|
||||
|
||||
For more information about cgroup, please see [1], about sharing PID
|
||||
namespaces in kubernetes, please see also [2].
|
||||
|
||||
Return None if successful otherwise a Flask.Response object.
|
||||
|
||||
[1]https://access.redhat.com/documentation/en-us/red_hat_enterprise_linux/6/html/resource_management_guide/sec-creating_cgroups
|
||||
[2]https://github.com/kubernetes/kubernetes/pull/51634
|
||||
"""
|
||||
hostname = os.getenv('HOSTNAME')
|
||||
pod_id = os.getenv('POD_UID')
|
||||
qos_class = None
|
||||
if os.getenv('QOS_CLASS') == 'BestEffort':
|
||||
qos_class = 'besteffort'
|
||||
elif os.getenv('QOS_CLASS') == 'Burstable':
|
||||
qos_class = 'burstable'
|
||||
elif os.getenv('QOS_CLASS') == 'Guaranteed':
|
||||
qos_class = ''
|
||||
|
||||
if not pod_id or qos_class is None:
|
||||
return make_response("Failed to get current worker information", 500)
|
||||
|
||||
memory_base_path = os.path.join('/qinling_cgroup', 'memory', 'kubepods',
|
||||
qos_class, 'pod%s' % pod_id)
|
||||
cpu_base_path = os.path.join('/qinling_cgroup', 'cpu', 'kubepods',
|
||||
qos_class, 'pod%s' % pod_id)
|
||||
memory_path = os.path.join(memory_base_path, hostname)
|
||||
cpu_path = os.path.join(cpu_base_path, hostname)
|
||||
|
||||
if os.path.isdir(memory_base_path):
|
||||
if not os.path.isdir(memory_path):
|
||||
os.makedirs(memory_path)
|
||||
|
||||
if os.path.isdir(cpu_base_path):
|
||||
if not os.path.isdir(cpu_path):
|
||||
os.makedirs(cpu_path)
|
||||
|
||||
try:
|
||||
# set cpu and memory resource limits
|
||||
with open('%s/memory.limit_in_bytes' % memory_path, 'w') as f:
|
||||
f.write('%d' % int(memory_size))
|
||||
with open('%s/cpu.cfs_period_us' % cpu_path, 'w') as f:
|
||||
f.write('%d' % PERIOD)
|
||||
with open('%s/cpu.cfs_quota_us' % cpu_path, 'w') as f:
|
||||
f.write('%d' % ((int(cpu)*PERIOD/1000)))
|
||||
|
||||
# add pid to 'tasks' files
|
||||
with open('%s/tasks' % memory_path, 'w') as f:
|
||||
f.write('%d' % pid)
|
||||
with open('%s/tasks' % cpu_path, 'w') as f:
|
||||
f.write('%d' % pid)
|
||||
except Exception as e:
|
||||
return make_response("Failed to modify cgroup files: %s"
|
||||
% str(e), 500)
|
||||
|
||||
|
||||
@app.route('/cglimit', methods=['POST'])
|
||||
def cglimit():
|
||||
"""Set resource limitations for execution.
|
||||
|
||||
Only root user has jurisdiction to modify all cgroup files.
|
||||
|
||||
:param cpu: cpu resource that execution can use in total.
|
||||
:param memory_size: RAM resource that execution can use in total.
|
||||
|
||||
Currently swap ought to be disabled in kubernetes.
|
||||
"""
|
||||
params = request.get_json()
|
||||
cpu = params['cpu']
|
||||
memory_size = params['memory_size']
|
||||
pid = params['pid']
|
||||
log("Set resource limits request received, params: %s" % params)
|
||||
|
||||
resp = _cgroup_limit(cpu, memory_size, pid)
|
||||
|
||||
return resp if resp else 'pidlimited'
|
6
runtimes/python3/custom-entrypoint.sh
Normal file
6
runtimes/python3/custom-entrypoint.sh
Normal file
@ -0,0 +1,6 @@
|
||||
#!/usr/bin/env bash
|
||||
# This is expected to run as root.
|
||||
|
||||
uwsgi --http :9090 --uid qinling --wsgi-file server.py --callable app --master --processes 5 --threads 1 &
|
||||
|
||||
uwsgi --http 127.0.0.1:9092 --uid root --wsgi-file cglimit.py --callable app --master --processes 1 --threads 1
|
11
runtimes/python3/requirements.txt
Normal file
11
runtimes/python3/requirements.txt
Normal file
@ -0,0 +1,11 @@
|
||||
Flask>=0.10,!=0.11,<1.0 # BSD
|
||||
python-openstackclient>=3.3.0,!=3.10.0 # Apache-2.0
|
||||
python-neutronclient>=6.3.0 # Apache-2.0
|
||||
python-swiftclient>=3.2.0 # Apache-2.0
|
||||
python-ceilometerclient>=2.5.0 # Apache-2.0
|
||||
python-zaqarclient>=1.0.0 # Apache-2.0
|
||||
python-octaviaclient>=1.0.0 # Apache-2.0
|
||||
python-mistralclient>=3.1.0 # Apache-2.0
|
||||
keystoneauth1>=2.21.0 # Apache-2.0
|
||||
openstacksdk>=0.9.19
|
||||
oslo.concurrency>=3.25.0 # Apache-2.0
|
243
runtimes/python3/server.py
Normal file
243
runtimes/python3/server.py
Normal file
@ -0,0 +1,243 @@
|
||||
# Copyright 2017 Catalyst IT Limited
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import importlib
|
||||
import json
|
||||
from multiprocessing import Manager
|
||||
from multiprocessing import Process
|
||||
import os
|
||||
import resource
|
||||
import sys
|
||||
import time
|
||||
import traceback
|
||||
|
||||
from flask import Flask
|
||||
from flask import request
|
||||
from flask import Response
|
||||
from keystoneauth1.identity import generic
|
||||
from keystoneauth1 import session
|
||||
import requests
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
DOWNLOAD_ERROR = "Failed to download function package from %s, error: %s"
|
||||
INVOKE_ERROR = "Function execution failed because of too much resource " \
|
||||
"consumption"
|
||||
|
||||
|
||||
def _print_trace():
|
||||
exc_type, exc_value, exc_traceback = sys.exc_info()
|
||||
lines = traceback.format_exception(exc_type, exc_value, exc_traceback)
|
||||
print((''.join(line for line in lines)))
|
||||
|
||||
|
||||
def _set_ulimit():
|
||||
"""Limit resources usage for the current process and/or its children.
|
||||
|
||||
Refer to https://docs.python.org/2.7/library/resource.html
|
||||
"""
|
||||
customized_limits = {
|
||||
resource.RLIMIT_NOFILE: 1024,
|
||||
resource.RLIMIT_NPROC: 128,
|
||||
# TODO(lxkong): 50M by default, need to be configurable in future.
|
||||
resource.RLIMIT_FSIZE: 524288000
|
||||
}
|
||||
for t, soft in list(customized_limits.items()):
|
||||
_, hard = resource.getrlimit(t)
|
||||
resource.setrlimit(t, (soft, hard))
|
||||
|
||||
|
||||
def _get_responce(output, duration, logs, success, code):
|
||||
return Response(
|
||||
response=json.dumps(
|
||||
{
|
||||
'output': output,
|
||||
'duration': duration,
|
||||
'logs': logs,
|
||||
'success': success
|
||||
}
|
||||
),
|
||||
status=code,
|
||||
mimetype='application/json'
|
||||
)
|
||||
|
||||
|
||||
def _invoke_function(execution_id, zip_file_dir, module_name, method, arg,
|
||||
input, return_dict, rlimit):
|
||||
"""Thie function is supposed to be running in a child process.
|
||||
|
||||
HOSTNAME will be used to create cgroup directory related to worker.
|
||||
|
||||
Current execution pid will be added to cgroup tasks file, and then all
|
||||
its child processes will be automatically added to this 'cgroup'.
|
||||
|
||||
Once executions exceed the cgroup limit, they will be killed by OOMKill
|
||||
and this subprocess will exit with number(-9).
|
||||
"""
|
||||
# Set resource limit for current sub-process
|
||||
_set_ulimit()
|
||||
|
||||
# Set cpu and memory limits to cgroup by calling cglimit service
|
||||
pid = os.getpid()
|
||||
root_resp = requests.post(
|
||||
'http://localhost:9092/cglimit',
|
||||
json={
|
||||
'cpu': rlimit['cpu'],
|
||||
'memory_size': rlimit['memory_size'],
|
||||
'pid': pid
|
||||
}
|
||||
)
|
||||
|
||||
sys.stdout = open("%s.out" % execution_id, "w")
|
||||
|
||||
if not root_resp.ok:
|
||||
print('WARN: Resource limiting failed, run in unlimit mode.')
|
||||
|
||||
print(('Start execution: %s' % execution_id))
|
||||
|
||||
sys.path.insert(0, zip_file_dir)
|
||||
try:
|
||||
module = importlib.import_module(module_name)
|
||||
func = getattr(module, method)
|
||||
return_dict['result'] = func(arg, **input) if arg else func(**input)
|
||||
return_dict['success'] = True
|
||||
except Exception as e:
|
||||
_print_trace()
|
||||
|
||||
if isinstance(e, OSError) and 'Resource' in str(e):
|
||||
sys.exit(1)
|
||||
|
||||
return_dict['result'] = str(e)
|
||||
return_dict['success'] = False
|
||||
finally:
|
||||
print(('Finished execution: %s' % execution_id))
|
||||
|
||||
|
||||
@app.route('/execute', methods=['POST'])
|
||||
def execute():
|
||||
"""Invoke function.
|
||||
|
||||
Several things need to handle in this function:
|
||||
- Save the function log
|
||||
- Capture the function internal exception
|
||||
- Deal with process execution error (The process may be killed for some
|
||||
reason, e.g. unlimited memory allocation)
|
||||
- Deal with os error for process (e.g. Resource temporarily unavailable)
|
||||
"""
|
||||
params = request.get_json() or {}
|
||||
input = params.get('input') or {}
|
||||
execution_id = params['execution_id']
|
||||
download_url = params.get('download_url')
|
||||
function_id = params.get('function_id')
|
||||
entry = params.get('entry')
|
||||
request_id = params.get('request_id')
|
||||
trust_id = params.get('trust_id')
|
||||
auth_url = params.get('auth_url')
|
||||
username = params.get('username')
|
||||
password = params.get('password')
|
||||
zip_file_dir = '/var/qinling/packages/%s' % function_id
|
||||
rlimit = {
|
||||
'cpu': params['cpu'],
|
||||
'memory_size': params['memory_size']
|
||||
}
|
||||
|
||||
function_module, function_method = 'main', 'main'
|
||||
if entry:
|
||||
function_module, function_method = tuple(entry.rsplit('.', 1))
|
||||
|
||||
print((
|
||||
'Request received, request_id: %s, execution_id: %s, input: %s, '
|
||||
'auth_url: %s' %
|
||||
(request_id, execution_id, input, auth_url)
|
||||
))
|
||||
|
||||
####################################################################
|
||||
#
|
||||
# Download function package by calling sidecar service. We don't check the
|
||||
# zip file existence here to avoid using partial file during downloading.
|
||||
#
|
||||
####################################################################
|
||||
resp = requests.post(
|
||||
'http://localhost:9091/download',
|
||||
json={
|
||||
'download_url': download_url,
|
||||
'function_id': function_id,
|
||||
'token': params.get('token')
|
||||
}
|
||||
)
|
||||
if not resp.ok:
|
||||
return _get_responce(resp.content, 0, '', False, 500)
|
||||
|
||||
####################################################################
|
||||
#
|
||||
# Provide an openstack session to user's function
|
||||
#
|
||||
####################################################################
|
||||
os_session = None
|
||||
if auth_url:
|
||||
auth = generic.Password(
|
||||
username=username,
|
||||
password=password,
|
||||
auth_url=auth_url,
|
||||
trust_id=trust_id,
|
||||
user_domain_name='Default'
|
||||
)
|
||||
os_session = session.Session(auth=auth, verify=False)
|
||||
input.update({'context': {'os_session': os_session}})
|
||||
|
||||
####################################################################
|
||||
#
|
||||
# Create a new process to run user's function
|
||||
#
|
||||
####################################################################
|
||||
manager = Manager()
|
||||
return_dict = manager.dict()
|
||||
return_dict['success'] = False
|
||||
start = time.time()
|
||||
|
||||
# Run the function in a separate process to avoid messing up the log
|
||||
p = Process(
|
||||
target=_invoke_function,
|
||||
args=(execution_id, zip_file_dir, function_module, function_method,
|
||||
input.pop('__function_input', None), input, return_dict, rlimit)
|
||||
)
|
||||
p.start()
|
||||
p.join()
|
||||
|
||||
####################################################################
|
||||
#
|
||||
# Get execution output(log, duration, etc.)
|
||||
#
|
||||
####################################################################
|
||||
duration = round(time.time() - start, 3)
|
||||
|
||||
# Process was killed unexpectedly or finished with error.
|
||||
if p.exitcode != 0:
|
||||
output = INVOKE_ERROR
|
||||
success = False
|
||||
else:
|
||||
output = return_dict.get('result')
|
||||
success = return_dict['success']
|
||||
|
||||
# Execution log
|
||||
with open('%s.out' % execution_id) as f:
|
||||
logs = f.read()
|
||||
os.remove('%s.out' % execution_id)
|
||||
|
||||
return _get_responce(output, duration, logs, success, 200)
|
||||
|
||||
|
||||
@app.route('/ping')
|
||||
def ping():
|
||||
return 'pong'
|
Loading…
Reference in New Issue
Block a user