diff --git a/devstack/settings b/devstack/settings index 821a97f0..b991fa35 100644 --- a/devstack/settings +++ b/devstack/settings @@ -22,7 +22,7 @@ QINLING_CONF_FILE=${QINLING_CONF_DIR}/qinling.conf QINLING_POLICY_FILE=${QINLING_CONF_DIR}/policy.json QINLING_AUTH_CACHE_DIR=${QINLING_AUTH_CACHE_DIR:-/var/cache/qinling} QINLING_FUNCTION_STORAGE_DIR=${QINLING_FUNCTION_STORAGE_DIR:-/opt/qinling/funtion/packages} -QINLING_PYTHON_RUNTIME_IMAGE=${QINLING_PYTHON_RUNTIME_IMAGE:-openstackqinling/python-runtime:0.0.2} +QINLING_PYTHON_RUNTIME_IMAGE=${QINLING_PYTHON_RUNTIME_IMAGE:-openstackqinling/python-runtime:0.0.3} QINLING_NODEJS_RUNTIME_IMAGE=${QINLING_NODEJS_RUNTIME_IMAGE:-openstackqinling/nodejs-runtime:0.0.1} QINLING_SIDECAR_IMAGE=${QINLING_SIDECAR_IMAGE:-openstackqinling/sidecar:0.0.1} diff --git a/qinling/engine/default_engine.py b/qinling/engine/default_engine.py index ba9d00ef..75a6dc91 100644 --- a/qinling/engine/default_engine.py +++ b/qinling/engine/default_engine.py @@ -160,7 +160,7 @@ class DefaultEngine(object): data = utils.get_request_data( CONF, function_id, function_version, execution_id, - input, function.entry, function.trust_id, + rlimit, input, function.entry, function.trust_id, self.qinling_endpoint ) success, res = utils.url_request( @@ -206,6 +206,7 @@ class DefaultEngine(object): execution_id, function_id, function_version, + rlimit=rlimit if svc_url else None, input=input, identifier=identifier, service_url=svc_url, diff --git a/qinling/engine/utils.py b/qinling/engine/utils.py index e1ef85fb..7db0fb3b 100644 --- a/qinling/engine/utils.py +++ b/qinling/engine/utils.py @@ -74,8 +74,8 @@ def url_request(request_session, url, body=None): return False, {'error': 'Internal service error.'} -def get_request_data(conf, function_id, version, execution_id, input, entry, - trust_id, qinling_endpoint): +def get_request_data(conf, function_id, version, execution_id, rlimit, input, + entry, trust_id, qinling_endpoint): """Prepare the request body should send to the worker.""" ctx = context.get_ctx() @@ -94,6 +94,8 @@ def get_request_data(conf, function_id, version, execution_id, input, entry, data = { 'execution_id': execution_id, + 'cpu': rlimit['cpu'], + 'memory_size': rlimit['memory_size'], 'input': input, 'function_id': function_id, 'function_version': version, diff --git a/qinling/orchestrator/kubernetes/manager.py b/qinling/orchestrator/kubernetes/manager.py index 392a5ebb..65ffc5c9 100644 --- a/qinling/orchestrator/kubernetes/manager.py +++ b/qinling/orchestrator/kubernetes/manager.py @@ -421,9 +421,9 @@ class KubernetesManager(base.OrchestratorBase): self.delete_function(function_id, version, labels) raise exc.OrchestratorException('Execution preparation failed.') - def run_execution(self, execution_id, function_id, version, input=None, - identifier=None, service_url=None, entry='main.main', - trust_id=None): + def run_execution(self, execution_id, function_id, version, rlimit=None, + input=None, identifier=None, service_url=None, + entry='main.main', trust_id=None): """Run execution. Return a tuple including the result and the output. @@ -431,8 +431,8 @@ class KubernetesManager(base.OrchestratorBase): if service_url: func_url = '%s/execute' % service_url data = utils.get_request_data( - self.conf, function_id, version, execution_id, input, entry, - trust_id, self.qinling_endpoint + self.conf, function_id, version, execution_id, rlimit, input, + entry, trust_id, self.qinling_endpoint ) LOG.debug( 'Invoke function %s(version %s), url: %s, data: %s', diff --git a/qinling/orchestrator/kubernetes/templates/deployment.j2 b/qinling/orchestrator/kubernetes/templates/deployment.j2 index 17822d96..bcb2a7b8 100644 --- a/qinling/orchestrator/kubernetes/templates/deployment.j2 +++ b/qinling/orchestrator/kubernetes/templates/deployment.j2 @@ -25,6 +25,9 @@ spec: volumes: - name: package-folder emptyDir: {} + - name: cgroup-folder + hostPath: + path: /sys/fs/cgroup containers: - name: {{ container_name }} image: {{ image }} @@ -34,6 +37,15 @@ spec: volumeMounts: - name: package-folder mountPath: /var/qinling/packages + - name: cgroup-folder + mountPath: /qinling_cgroup + env: + - name: POD_UID + valueFrom: + fieldRef: + fieldPath: metadata.uid + - name: QOS_CLASS + value: "BestEffort" - name: sidecar image: {{ sidecar_image }} imagePullPolicy: IfNotPresent diff --git a/qinling/tests/unit/engine/test_default_engine.py b/qinling/tests/unit/engine/test_default_engine.py index c23b9935..22ddb841 100644 --- a/qinling/tests/unit/engine/test_default_engine.py +++ b/qinling/tests/unit/engine/test_default_engine.py @@ -261,6 +261,7 @@ class TestDefaultEngine(base.DbTestCase): mock.call(execution_1_id, function_id, 0, + rlimit=None, input=None, identifier=mock.ANY, service_url=None, @@ -269,6 +270,7 @@ class TestDefaultEngine(base.DbTestCase): mock.call(execution_2_id, function_id, 0, + rlimit=None, input='input', identifier=mock.ANY, service_url=None, @@ -357,8 +359,8 @@ class TestDefaultEngine(base.DbTestCase): identifier=runtime_id, labels={'runtime_id': runtime_id}, input=None) self.orchestrator.run_execution.assert_called_once_with( - execution_id, function_id, 0, input=None, identifier=runtime_id, - service_url='svc_url', entry=function.entry, + execution_id, function_id, 0, rlimit=self.rlimit, input=None, + identifier=runtime_id, service_url='svc_url', entry=function.entry, trust_id=function.trust_id) execution = db_api.get_execution(execution_id) @@ -419,7 +421,7 @@ class TestDefaultEngine(base.DbTestCase): function_id, 0, runtime_id) etcd_util_get_service_url_mock.assert_called_once_with(function_id, 0) engine_utils_get_request_data_mock.assert_called_once_with( - mock.ANY, function_id, 0, execution_id, + mock.ANY, function_id, 0, execution_id, self.rlimit, 'input', function.entry, function.trust_id, self.qinling_endpoint) engine_utils_url_request_mock.assert_called_once_with( diff --git a/qinling/tests/unit/orchestrator/kubernetes/test_manager.py b/qinling/tests/unit/orchestrator/kubernetes/test_manager.py index 22160647..77c41162 100644 --- a/qinling/tests/unit/orchestrator/kubernetes/test_manager.py +++ b/qinling/tests/unit/orchestrator/kubernetes/test_manager.py @@ -1,4 +1,4 @@ -# Copyright 2018 AWCloud Software Co., Ltd. +# Copyright 2018 AWCloud Software Co., Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -601,13 +601,16 @@ class TestKubernetesManager(base.DbTestCase): function_id = common.generate_unicode_uuid() result, output = self.manager.run_execution( - execution_id, function_id, 0, service_url='FAKE_URL' + execution_id, function_id, 0, rlimit=self.rlimit, + service_url='FAKE_URL' ) download_url = ('http://127.0.0.1:7070/v1/functions/%s?download=true' % function_id) data = { 'execution_id': execution_id, + 'cpu': self.rlimit['cpu'], + 'memory_size': self.rlimit['memory_size'], 'input': None, 'function_id': function_id, 'function_version': 0, diff --git a/qinling_tempest_plugin/config.py b/qinling_tempest_plugin/config.py index f7e8b364..13ae87ee 100644 --- a/qinling_tempest_plugin/config.py +++ b/qinling_tempest_plugin/config.py @@ -41,7 +41,7 @@ QinlingGroup = [ 'publicURL', 'adminURL', 'internalURL'], help="The endpoint type to use for the qinling service."), cfg.StrOpt("python_runtime_image", - default="openstackqinling/python-runtime:0.0.2", + default="openstackqinling/python-runtime:0.0.3", help="The Python runtime being used in the tests."), cfg.StrOpt("nodejs_runtime_image", default="openstackqinling/nodejs-runtime:0.0.1", diff --git a/runtimes/python2/Dockerfile b/runtimes/python2/Dockerfile index 20a6faea..7620169a 100644 --- a/runtimes/python2/Dockerfile +++ b/runtimes/python2/Dockerfile @@ -1,7 +1,7 @@ FROM phusion/baseimage:0.9.22 MAINTAINER anlin.kong@gmail.com -# We need to use non-root user because root user is not affected by ulimit. +# We need to use non-root user to execute functions and root user to set resource limits. USER root RUN useradd -Ms /bin/bash qinling @@ -12,8 +12,10 @@ RUN apt-get update && \ COPY . /app WORKDIR /app RUN pip install --no-cache-dir -r requirements.txt && \ + chmod 0750 custom-entrypoint.sh && \ + mkdir /qinling_cgroup && \ + mkdir -p /var/lock/qinling && \ mkdir -p /var/qinling/packages && \ chown -R qinling:qinling /app /var/qinling/packages -# uwsgi --http :9090 --uid qinling --wsgi-file server.py --callable app --master --processes 5 --threads 1 -CMD ["/usr/local/bin/uwsgi", "--http", ":9090", "--uid", "qinling", "--wsgi-file", "server.py", "--callable", "app", "--master", "--processes", "5", "--threads", "1"] +CMD ["/bin/bash", "custom-entrypoint.sh"] diff --git a/runtimes/python2/cglimit.py b/runtimes/python2/cglimit.py new file mode 100644 index 00000000..0b63ac85 --- /dev/null +++ b/runtimes/python2/cglimit.py @@ -0,0 +1,124 @@ +# Copyright 2018 Catalyst IT Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os +import sys + +from flask import Flask +from flask import make_response +from flask import request +from oslo_concurrency import lockutils + +app = Flask(__name__) +ch = logging.StreamHandler(sys.stdout) +ch.setLevel(logging.DEBUG) +ch.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) +del app.logger.handlers[:] +app.logger.addHandler(ch) + +# Deployer can specify cfs_period_us default value here. +PERIOD = 100000 + + +def log(message, level="info"): + global app + log_func = getattr(app.logger, level) + log_func(message) + + +@lockutils.synchronized('set_limitation', external=True, + lock_path='/var/lock/qinling') +def _cgroup_limit(cpu, memory_size, pid): + """Modify 'cgroup' files to set resource limits. + + Each pod(worker) will have cgroup folders on the host cgroup filesystem, + like '/sys/fs/cgroup//kubepods//pod/', + to limit memory and cpu resources that can be used in pod. + + For more information about cgroup, please see [1], about sharing PID + namespaces in kubernetes, please see also [2]. + + Return None if successful otherwise a Flask.Response object. + + [1]https://access.redhat.com/documentation/en-us/red_hat_enterprise_linux/6/html/resource_management_guide/sec-creating_cgroups + [2]https://github.com/kubernetes/kubernetes/pull/51634 + """ + hostname = os.getenv('HOSTNAME') + pod_id = os.getenv('POD_UID') + qos_class = None + if os.getenv('QOS_CLASS') == 'BestEffort': + qos_class = 'besteffort' + elif os.getenv('QOS_CLASS') == 'Burstable': + qos_class = 'burstable' + elif os.getenv('QOS_CLASS') == 'Guaranteed': + qos_class = '' + + if not pod_id or qos_class is None: + return make_response("Failed to get current worker information", 500) + + memory_base_path = os.path.join('/qinling_cgroup', 'memory', 'kubepods', + qos_class, 'pod%s' % pod_id) + cpu_base_path = os.path.join('/qinling_cgroup', 'cpu', 'kubepods', + qos_class, 'pod%s' % pod_id) + memory_path = os.path.join(memory_base_path, hostname) + cpu_path = os.path.join(cpu_base_path, hostname) + + if os.path.isdir(memory_base_path): + if not os.path.isdir(memory_path): + os.makedirs(memory_path) + + if os.path.isdir(cpu_base_path): + if not os.path.isdir(cpu_path): + os.makedirs(cpu_path) + + try: + # set cpu and memory resource limits + with open('%s/memory.limit_in_bytes' % memory_path, 'w') as f: + f.write('%d' % int(memory_size)) + with open('%s/cpu.cfs_period_us' % cpu_path, 'w') as f: + f.write('%d' % PERIOD) + with open('%s/cpu.cfs_quota_us' % cpu_path, 'w') as f: + f.write('%d' % ((int(cpu)*PERIOD/1000))) + + # add pid to 'tasks' files + with open('%s/tasks' % memory_path, 'w') as f: + f.write('%d' % pid) + with open('%s/tasks' % cpu_path, 'w') as f: + f.write('%d' % pid) + except Exception as e: + return make_response("Failed to modify cgroup files: %s" + % str(e), 500) + + +@app.route('/cglimit', methods=['POST']) +def cglimit(): + """Set resource limitations for execution. + + Only root user has jurisdiction to modify all cgroup files. + + :param cpu: cpu resource that execution can use in total. + :param memory_size: RAM resource that execution can use in total. + + Currently swap ought to be disabled in kubernetes. + """ + params = request.get_json() + cpu = params['cpu'] + memory_size = params['memory_size'] + pid = params['pid'] + log("Set resource limits request received, params: %s" % params) + + resp = _cgroup_limit(cpu, memory_size, pid) + + return resp if resp else 'pidlimited' diff --git a/runtimes/python2/custom-entrypoint.sh b/runtimes/python2/custom-entrypoint.sh new file mode 100644 index 00000000..fa310a39 --- /dev/null +++ b/runtimes/python2/custom-entrypoint.sh @@ -0,0 +1,6 @@ +#!/usr/bin/env bash +# This is expected to run as root. + +uwsgi --http :9090 --uid qinling --wsgi-file server.py --callable app --master --processes 5 --threads 1 & + +uwsgi --http :9092 --uid root --wsgi-file cglimit.py --callable app --master --processes 1 --threads 1 diff --git a/runtimes/python2/requirements.txt b/runtimes/python2/requirements.txt index 32dddd28..d1e6381a 100644 --- a/runtimes/python2/requirements.txt +++ b/runtimes/python2/requirements.txt @@ -8,3 +8,4 @@ python-octaviaclient>=1.0.0 # Apache-2.0 python-mistralclient>=3.1.0 # Apache-2.0 keystoneauth1>=2.21.0 # Apache-2.0 openstacksdk>=0.9.19 +oslo.concurrency>=3.25.0 # Apache-2.0 diff --git a/runtimes/python2/server.py b/runtimes/python2/server.py index bc230d1f..a1b5eceb 100644 --- a/runtimes/python2/server.py +++ b/runtimes/python2/server.py @@ -73,11 +73,35 @@ def _get_responce(output, duration, logs, success, code): def _invoke_function(execution_id, zip_file, module_name, method, arg, input, - return_dict): - """Thie function is supposed to be running in a child process.""" + return_dict, rlimit): + """Thie function is supposed to be running in a child process. + + HOSTNAME will be used to create cgroup directory related to worker. + + Current execution pid will be added to cgroup tasks file, and then all + its child processes will be automatically added to this 'cgroup'. + + Once executions exceed the cgroup limit, they will be killed by OOMKill + and this subprocess will exit with number(-9). + """ # Set resource limit for current sub-process _set_ulimit() + # Set cpu and memory limits to cgroup by calling cglimit service + pid = os.getpid() + root_resp = requests.post( + 'http://localhost:9092/cglimit', + json={ + 'cpu': rlimit['cpu'], + 'memory_size': rlimit['memory_size'], + 'pid': pid + } + ) + if not root_resp.ok: + return_dict['result'] = root_resp.content + return_dict['success'] = False + sys.exit(0) + sys.path.insert(0, zip_file) sys.stdout = open("%s.out" % execution_id, "w", 0) @@ -123,6 +147,10 @@ def execute(): username = params.get('username') password = params.get('password') zip_file = '/var/qinling/packages/%s.zip' % function_id + rlimit = { + 'cpu': params['cpu'], + 'memory_size': params['memory_size'] + } function_module, function_method = 'main', 'main' if entry: @@ -182,7 +210,7 @@ def execute(): p = Process( target=_invoke_function, args=(execution_id, zip_file, function_module, function_method, - input.pop('__function_input', None), input, return_dict) + input.pop('__function_input', None), input, return_dict, rlimit) ) p.start() p.join()