diff --git a/runtimes/python3/Dockerfile b/runtimes/python3/Dockerfile
new file mode 100644
index 00000000..35670e79
--- /dev/null
+++ b/runtimes/python3/Dockerfile
@@ -0,0 +1,26 @@
+FROM phusion/baseimage:0.9.22
+LABEL maintainer="anlin.kong@gmail.com"
+
+# We need to use non-root user to execute functions and root user to set
+# resource limits.
+USER root
+RUN useradd -Ms /bin/bash qinling
+
+# The apt package lists are removed in the same layer to keep the image small.
+RUN apt-get update && \
+    apt-get -y install python3-dev python3-setuptools libffi-dev libxslt1-dev libxml2-dev libyaml-dev libssl-dev python3-pip && \
+    pip3 install --no-cache-dir -U pip setuptools uwsgi && \
+    rm -rf /var/lib/apt/lists/*
+
+COPY . /app
+WORKDIR /app
+# pip3 is used here as well so that the requirements are installed for the
+# same Python 3 interpreter as uwsgi above.
+RUN pip3 install --no-cache-dir -r requirements.txt && \
+    chmod 0750 custom-entrypoint.sh && \
+    mkdir /qinling_cgroup && \
+    mkdir -p /var/lock/qinling && \
+    mkdir -p /var/qinling/packages && \
+    chown -R qinling:qinling /app /var/qinling/packages
+
+CMD ["/bin/bash", "custom-entrypoint.sh"]
diff --git a/runtimes/python3/cglimit.py b/runtimes/python3/cglimit.py
new file mode 100644
index 00000000..0b63ac85
--- /dev/null
+++ b/runtimes/python3/cglimit.py
@@ -0,0 +1,124 @@
+# Copyright 2018 Catalyst IT Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import os
+import sys
+
+from flask import Flask
+from flask import make_response
+from flask import request
+from oslo_concurrency import lockutils
+
+app = Flask(__name__)
+ch = logging.StreamHandler(sys.stdout)
+ch.setLevel(logging.DEBUG)
+ch.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
+del app.logger.handlers[:]
+app.logger.addHandler(ch)
+
+# Deployer can specify cfs_period_us default value here.
+PERIOD = 100000
+
+# Maps the QOS_CLASS environment variable to the cgroup folder used below
+# 'kubepods'; a 'Guaranteed' pod has no extra folder level.
+QOS_CGROUP_DIRS = {
+    'BestEffort': 'besteffort',
+    'Burstable': 'burstable',
+    'Guaranteed': '',
+}
+
+
+def log(message, level="info"):
+    """Write a message to the Flask application logger.
+
+    :param message: text to log.
+    :param level: name of the logger method to call, e.g. "info".
+    """
+    getattr(app.logger, level)(message)
+
+
+@lockutils.synchronized('set_limitation', external=True,
+                        lock_path='/var/lock/qinling')
+def _cgroup_limit(cpu, memory_size, pid):
+    """Modify 'cgroup' files to set resource limits.
+
+    Each pod(worker) will have cgroup folders on the host cgroup filesystem,
+    like '/sys/fs/cgroup/<resource>/kubepods/<qos_class>/pod<pod_uid>/',
+    to limit memory and cpu resources that can be used in pod. This service
+    looks them up below '/qinling_cgroup' and creates a sub-folder named
+    after HOSTNAME in them for the limited process.
+
+    For more information about cgroup, please see [1], about sharing PID
+    namespaces in kubernetes, please see also [2].
+
+    :param cpu: cpu amount; the quota written is ``cpu * PERIOD // 1000``,
+        so 1000 corresponds to one full cfs period.
+    :param memory_size: value written to 'memory.limit_in_bytes'.
+    :param pid: id of the process that is added to the 'tasks' files.
+
+    Return None if successful otherwise a Flask.Response object.
+
+    [1]https://access.redhat.com/documentation/en-us/red_hat_enterprise_linux/6/html/resource_management_guide/sec-creating_cgroups
+    [2]https://github.com/kubernetes/kubernetes/pull/51634
+    """
+    hostname = os.getenv('HOSTNAME')
+    pod_id = os.getenv('POD_UID')
+    qos_class = QOS_CGROUP_DIRS.get(os.getenv('QOS_CLASS'))
+
+    # HOSTNAME is part of the paths built below; without this check a
+    # missing value would make os.path.join() raise TypeError.
+    if not hostname or not pod_id or qos_class is None:
+        return make_response("Failed to get current worker information", 500)
+
+    pod_dir = 'pod%s' % pod_id
+    memory_base_path = os.path.join('/qinling_cgroup', 'memory', 'kubepods',
+                                    qos_class, pod_dir)
+    cpu_base_path = os.path.join('/qinling_cgroup', 'cpu', 'kubepods',
+                                 qos_class, pod_dir)
+    memory_path = os.path.join(memory_base_path, hostname)
+    cpu_path = os.path.join(cpu_base_path, hostname)
+
+    try:
+        # The worker folder is only created inside an existing pod folder.
+        # If the pod folder is missing, the writes below fail and the error
+        # is reported to the caller.
+        for base_path, path in ((memory_base_path, memory_path),
+                                (cpu_base_path, cpu_path)):
+            if os.path.isdir(base_path) and not os.path.isdir(path):
+                os.makedirs(path)
+
+        # set cpu and memory resource limits
+        with open('%s/memory.limit_in_bytes' % memory_path, 'w') as f:
+            f.write('%d' % int(memory_size))
+        with open('%s/cpu.cfs_period_us' % cpu_path, 'w') as f:
+            f.write('%d' % PERIOD)
+        with open('%s/cpu.cfs_quota_us' % cpu_path, 'w') as f:
+            # Integer division, '/' would produce a float on Python 3.
+            f.write('%d' % (int(cpu) * PERIOD // 1000))
+
+        # add pid to 'tasks' files
+        with open('%s/tasks' % memory_path, 'w') as f:
+            f.write('%d' % int(pid))
+        with open('%s/tasks' % cpu_path, 'w') as f:
+            f.write('%d' % int(pid))
+    except Exception as e:
+        return make_response("Failed to modify cgroup files: %s"
+                             % str(e), 500)
+
+
+@app.route('/cglimit', methods=['POST'])
+def cglimit():
+    """Set resource limitations for execution.
+
+    Only root user has jurisdiction to modify all cgroup files.
+
+    :param cpu: cpu resource that execution can use in total.
+    :param memory_size: RAM resource that execution can use in total.
+    :param pid: id of the process the limits are applied to.
+
+    Currently swap ought to be disabled in kubernetes.
+
+    Responds with 400 when the JSON body is missing or lacks one of the
+    parameters above, with the error response of ``_cgroup_limit`` when
+    setting the limits failed, and with 'pidlimited' otherwise.
+    """
+    params = request.get_json(silent=True)
+    required = ('cpu', 'memory_size', 'pid')
+    if not isinstance(params, dict) or any(k not in params for k in required):
+        return make_response(
+            "Parameters %s are required" % ', '.join(required), 400)
+
+    log("Set resource limits request received, params: %s" % params)
+
+    resp = _cgroup_limit(params['cpu'], params['memory_size'], params['pid'])
+
+    return resp if resp else 'pidlimited'
diff --git a/runtimes/python3/custom-entrypoint.sh b/runtimes/python3/custom-entrypoint.sh
new file mode 100644
index 00000000..f0404fb7
--- /dev/null
+++ b/runtimes/python3/custom-entrypoint.sh
@@ -0,0 +1,6 @@
+#!/usr/bin/env bash
+# This is expected to run as root.
+
+uwsgi --http :9090 --uid qinling --wsgi-file server.py --callable app --master --processes 5 --threads 1 &
+
+uwsgi --http 127.0.0.1:9092 --uid root --wsgi-file cglimit.py --callable app --master --processes 1 --threads 1
diff --git a/runtimes/python3/requirements.txt b/runtimes/python3/requirements.txt
new file mode 100644
index 00000000..d1e6381a
--- /dev/null
+++ b/runtimes/python3/requirements.txt
@@ -0,0 +1,11 @@
+Flask>=0.10,!=0.11,<1.0 # BSD
+python-openstackclient>=3.3.0,!=3.10.0 # Apache-2.0
+python-neutronclient>=6.3.0 # Apache-2.0
+python-swiftclient>=3.2.0 # Apache-2.0
+python-ceilometerclient>=2.5.0 # Apache-2.0
+python-zaqarclient>=1.0.0 # Apache-2.0
+python-octaviaclient>=1.0.0 # Apache-2.0
+python-mistralclient>=3.1.0 # Apache-2.0
+keystoneauth1>=2.21.0 # Apache-2.0
+openstacksdk>=0.9.19
+oslo.concurrency>=3.25.0 # Apache-2.0
diff --git a/runtimes/python3/server.py b/runtimes/python3/server.py
new file mode 100644
index 00000000..1e9e939e
--- /dev/null
+++ b/runtimes/python3/server.py
@@ -0,0 +1,243 @@
+# Copyright 2017 Catalyst IT Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import importlib
+import json
+from multiprocessing import Manager
+from multiprocessing import Process
+import os
+import resource
+import sys
+import time
+import traceback
+
+from flask import Flask
+from flask import request
+from flask import Response
+from keystoneauth1.identity import generic
+from keystoneauth1 import session
+import requests
+
+app = Flask(__name__)
+
+DOWNLOAD_ERROR = "Failed to download function package from %s, error: %s"
+INVOKE_ERROR = "Function execution failed because of too much resource " \
+               "consumption"
+
+
+def _print_trace():
+    """Print the traceback of the exception currently being handled."""
+    print(traceback.format_exc())
+
+
+def _set_ulimit():
+    """Limit resources usage for the current process and/or its children.
+
+    Only the soft limits are changed. A value above the current hard limit
+    is capped to it, because setrlimit() raises ValueError when the soft
+    limit exceeds the hard one.
+
+    Refer to https://docs.python.org/3/library/resource.html
+    """
+    customized_limits = {
+        resource.RLIMIT_NOFILE: 1024,
+        resource.RLIMIT_NPROC: 128,
+        # TODO(lxkong): 500M by default, need to be configurable in future.
+        resource.RLIMIT_FSIZE: 524288000
+    }
+    for limit, soft in customized_limits.items():
+        _, hard = resource.getrlimit(limit)
+        if hard != resource.RLIM_INFINITY:
+            soft = min(soft, hard)
+        resource.setrlimit(limit, (soft, hard))
+
+
+def _get_responce(output, duration, logs, success, code):
+    """Build the JSON response that describes an execution.
+
+    :param output: function result or error message.
+    :param duration: execution time in seconds.
+    :param logs: content of the execution log.
+    :param success: whether the execution succeeded.
+    :param code: HTTP status code of the response.
+    """
+    return Response(
+        response=json.dumps(
+            {
+                'output': output,
+                'duration': duration,
+                'logs': logs,
+                'success': success
+            }
+        ),
+        status=code,
+        mimetype='application/json'
+    )
+
+
+def _invoke_function(execution_id, zip_file_dir, module_name, method, arg,
+                     input, return_dict, rlimit):
+    """This function is supposed to be running in a child process.
+
+    HOSTNAME will be used to create cgroup directory related to worker.
+
+    Current execution pid will be added to cgroup tasks file, and then all
+    its child processes will be automatically added to this 'cgroup'.
+
+    Once executions exceed the cgroup limit, they will be killed by OOMKill
+    and this subprocess will exit with number(-9).
+
+    :param execution_id: used to name the '<execution_id>.out' log file.
+    :param zip_file_dir: directory put in front of sys.path for the import.
+    :param module_name: module that contains the function.
+    :param method: name of the function inside the module.
+    :param arg: optional positional argument, skipped when falsy.
+    :param input: keyword arguments passed to the function.
+    :param return_dict: shared dict that receives 'result' and 'success'.
+    :param rlimit: dict with the 'cpu' and 'memory_size' values to request.
+    """
+    # Redirect the output first, so that everything below ends up in the
+    # log file which the parent process reads after the execution.
+    sys.stdout = open("%s.out" % execution_id, "w")
+
+    # Set resource limit for current sub-process
+    _set_ulimit()
+
+    # Set cpu and memory limits to cgroup by calling cglimit service. A
+    # service that can not be reached is handled like a failed request.
+    limited = False
+    try:
+        root_resp = requests.post(
+            'http://localhost:9092/cglimit',
+            json={
+                'cpu': rlimit['cpu'],
+                'memory_size': rlimit['memory_size'],
+                'pid': os.getpid()
+            }
+        )
+        limited = root_resp.ok
+    except requests.RequestException:
+        _print_trace()
+
+    if not limited:
+        print('WARN: Resource limiting failed, run in unlimit mode.')
+
+    print(('Start execution: %s' % execution_id))
+
+    sys.path.insert(0, zip_file_dir)
+    try:
+        module = importlib.import_module(module_name)
+        func = getattr(module, method)
+        return_dict['result'] = func(arg, **input) if arg else func(**input)
+        return_dict['success'] = True
+    except Exception as e:
+        _print_trace()
+
+        # A non-zero exit code makes the parent process answer with the
+        # resource consumption error.
+        if isinstance(e, OSError) and 'Resource' in str(e):
+            sys.exit(1)
+
+        return_dict['result'] = str(e)
+        return_dict['success'] = False
+    finally:
+        print(('Finished execution: %s' % execution_id))
+
+
+@app.route('/execute', methods=['POST'])
+def execute():
+    """Invoke function.
+
+    Several things need to handle in this function:
+    - Save the function log
+    - Capture the function internal exception
+    - Deal with process execution error (The process may be killed for some
+      reason, e.g. unlimited memory allocation)
+    - Deal with os error for process (e.g. Resource temporarily unavailable)
+
+    The 'execution_id', 'cpu' and 'memory_size' request parameters are
+    mandatory, all the other ones are optional.
+    """
+    params = request.get_json() or {}
+    input = params.get('input') or {}
+    execution_id = params['execution_id']
+    download_url = params.get('download_url')
+    function_id = params.get('function_id')
+    entry = params.get('entry')
+    request_id = params.get('request_id')
+    trust_id = params.get('trust_id')
+    auth_url = params.get('auth_url')
+    username = params.get('username')
+    password = params.get('password')
+    zip_file_dir = '/var/qinling/packages/%s' % function_id
+    rlimit = {
+        'cpu': params['cpu'],
+        'memory_size': params['memory_size']
+    }
+
+    function_module, function_method = 'main', 'main'
+    if entry:
+        # The entry has to look like '<module>.<method>'.
+        module_part, dot, method_part = entry.rpartition('.')
+        if not (dot and module_part and method_part):
+            return _get_responce(
+                "Invalid entry: %s, expected <module>.<method>" % entry,
+                0, '', False, 500
+            )
+        function_module, function_method = module_part, method_part
+
+    print((
+        'Request received, request_id: %s, execution_id: %s, input: %s, '
+        'auth_url: %s' %
+        (request_id, execution_id, input, auth_url)
+    ))
+
+    ####################################################################
+    #
+    # Download function package by calling sidecar service. We don't check the
+    # zip file existence here to avoid using partial file during downloading.
+    #
+    ####################################################################
+    resp = requests.post(
+        'http://localhost:9091/download',
+        json={
+            'download_url': download_url,
+            'function_id': function_id,
+            'token': params.get('token')
+        }
+    )
+    if not resp.ok:
+        # resp.content is bytes, which json.dumps() can not serialize.
+        return _get_responce(resp.text, 0, '', False, 500)
+
+    ####################################################################
+    #
+    # Provide an openstack session to user's function
+    #
+    ####################################################################
+    os_session = None
+    if auth_url:
+        auth = generic.Password(
+            username=username,
+            password=password,
+            auth_url=auth_url,
+            trust_id=trust_id,
+            user_domain_name='Default'
+        )
+        # NOTE(review): verify=False presumably turns off TLS certificate
+        # verification for this session - confirm this is intended.
+        os_session = session.Session(auth=auth, verify=False)
+    input.update({'context': {'os_session': os_session}})
+
+    ####################################################################
+    #
+    # Create a new process to run user's function
+    #
+    ####################################################################
+    function_arg = input.pop('__function_input', None)
+    manager = Manager()
+    try:
+        return_dict = manager.dict()
+        return_dict['success'] = False
+        start = time.time()
+
+        # Run the function in a separate process to avoid messing up the log
+        p = Process(
+            target=_invoke_function,
+            args=(execution_id, zip_file_dir, function_module,
+                  function_method, function_arg, input, return_dict, rlimit)
+        )
+        p.start()
+        p.join()
+
+        ################################################################
+        #
+        # Get execution output(log, duration, etc.)
+        #
+        ################################################################
+        duration = round(time.time() - start, 3)
+
+        # Process was killed unexpectedly or finished with error.
+        if p.exitcode != 0:
+            output = INVOKE_ERROR
+            success = False
+        else:
+            output = return_dict.get('result')
+            success = return_dict['success']
+    finally:
+        # Stop the manager's server process right away instead of leaving
+        # it to garbage collection.
+        manager.shutdown()
+
+    # Execution log
+    log_file = '%s.out' % execution_id
+    try:
+        with open(log_file) as f:
+            logs = f.read()
+        os.remove(log_file)
+    except OSError:
+        # The log is missing or unreadable, e.g. because the child process
+        # died before creating it.
+        logs = ''
+
+    return _get_responce(output, duration, logs, success, 200)
+
+
+@app.route('/ping')
+def ping():
+    """Answer the liveness check."""
+    return 'pong'