From e886447d29d81ca17db513dfce46e1784641b97e Mon Sep 17 00:00:00 2001 From: Jiangyuan Date: Sun, 13 May 2018 14:22:51 +0800 Subject: [PATCH] Runtime implementation for non-image type function. Using 'cgroup' in runtime server to limit cpu and memory resources. Each pod in deployment will has respective 'cgroup' directory on host. Under '/sys/fs/cgroup/xx/kubepods//pod' directory, use HOSTNAME to create a new cgroup folder, which will only be used to limit function execution. Then we do not need to care about how and when to delete this new folder on host. Use 'openstackqinling/python-runtime:0.0.3' as the new python runtime image. Story: 2001586 Task: 14415 Change-Id: Id04a72c4f4a3c559dc7c746688b13ef93656d125 --- devstack/settings | 2 +- qinling/engine/default_engine.py | 3 +- qinling/engine/utils.py | 6 +- qinling/orchestrator/kubernetes/manager.py | 10 +- .../kubernetes/templates/deployment.j2 | 12 ++ .../tests/unit/engine/test_default_engine.py | 8 +- .../orchestrator/kubernetes/test_manager.py | 7 +- qinling_tempest_plugin/config.py | 2 +- runtimes/python2/Dockerfile | 8 +- runtimes/python2/cglimit.py | 124 ++++++++++++++++++ runtimes/python2/custom-entrypoint.sh | 6 + runtimes/python2/requirements.txt | 1 + runtimes/python2/server.py | 34 ++++- 13 files changed, 202 insertions(+), 21 deletions(-) create mode 100644 runtimes/python2/cglimit.py create mode 100644 runtimes/python2/custom-entrypoint.sh diff --git a/devstack/settings b/devstack/settings index 821a97f0..b991fa35 100644 --- a/devstack/settings +++ b/devstack/settings @@ -22,7 +22,7 @@ QINLING_CONF_FILE=${QINLING_CONF_DIR}/qinling.conf QINLING_POLICY_FILE=${QINLING_CONF_DIR}/policy.json QINLING_AUTH_CACHE_DIR=${QINLING_AUTH_CACHE_DIR:-/var/cache/qinling} QINLING_FUNCTION_STORAGE_DIR=${QINLING_FUNCTION_STORAGE_DIR:-/opt/qinling/funtion/packages} -QINLING_PYTHON_RUNTIME_IMAGE=${QINLING_PYTHON_RUNTIME_IMAGE:-openstackqinling/python-runtime:0.0.2} +QINLING_PYTHON_RUNTIME_IMAGE=${QINLING_PYTHON_RUNTIME_IMAGE:-openstackqinling/python-runtime:0.0.3} QINLING_NODEJS_RUNTIME_IMAGE=${QINLING_NODEJS_RUNTIME_IMAGE:-openstackqinling/nodejs-runtime:0.0.1} QINLING_SIDECAR_IMAGE=${QINLING_SIDECAR_IMAGE:-openstackqinling/sidecar:0.0.1} diff --git a/qinling/engine/default_engine.py b/qinling/engine/default_engine.py index ba9d00ef..75a6dc91 100644 --- a/qinling/engine/default_engine.py +++ b/qinling/engine/default_engine.py @@ -160,7 +160,7 @@ class DefaultEngine(object): data = utils.get_request_data( CONF, function_id, function_version, execution_id, - input, function.entry, function.trust_id, + rlimit, input, function.entry, function.trust_id, self.qinling_endpoint ) success, res = utils.url_request( @@ -206,6 +206,7 @@ class DefaultEngine(object): execution_id, function_id, function_version, + rlimit=rlimit if svc_url else None, input=input, identifier=identifier, service_url=svc_url, diff --git a/qinling/engine/utils.py b/qinling/engine/utils.py index e1ef85fb..7db0fb3b 100644 --- a/qinling/engine/utils.py +++ b/qinling/engine/utils.py @@ -74,8 +74,8 @@ def url_request(request_session, url, body=None): return False, {'error': 'Internal service error.'} -def get_request_data(conf, function_id, version, execution_id, input, entry, - trust_id, qinling_endpoint): +def get_request_data(conf, function_id, version, execution_id, rlimit, input, + entry, trust_id, qinling_endpoint): """Prepare the request body should send to the worker.""" ctx = context.get_ctx() @@ -94,6 +94,8 @@ def get_request_data(conf, function_id, version, execution_id, input, entry, data = { 'execution_id': execution_id, + 'cpu': rlimit['cpu'], + 'memory_size': rlimit['memory_size'], 'input': input, 'function_id': function_id, 'function_version': version, diff --git a/qinling/orchestrator/kubernetes/manager.py b/qinling/orchestrator/kubernetes/manager.py index 392a5ebb..65ffc5c9 100644 --- a/qinling/orchestrator/kubernetes/manager.py +++ b/qinling/orchestrator/kubernetes/manager.py @@ -421,9 +421,9 @@ class KubernetesManager(base.OrchestratorBase): self.delete_function(function_id, version, labels) raise exc.OrchestratorException('Execution preparation failed.') - def run_execution(self, execution_id, function_id, version, input=None, - identifier=None, service_url=None, entry='main.main', - trust_id=None): + def run_execution(self, execution_id, function_id, version, rlimit=None, + input=None, identifier=None, service_url=None, + entry='main.main', trust_id=None): """Run execution. Return a tuple including the result and the output. @@ -431,8 +431,8 @@ class KubernetesManager(base.OrchestratorBase): if service_url: func_url = '%s/execute' % service_url data = utils.get_request_data( - self.conf, function_id, version, execution_id, input, entry, - trust_id, self.qinling_endpoint + self.conf, function_id, version, execution_id, rlimit, input, + entry, trust_id, self.qinling_endpoint ) LOG.debug( 'Invoke function %s(version %s), url: %s, data: %s', diff --git a/qinling/orchestrator/kubernetes/templates/deployment.j2 b/qinling/orchestrator/kubernetes/templates/deployment.j2 index 17822d96..bcb2a7b8 100644 --- a/qinling/orchestrator/kubernetes/templates/deployment.j2 +++ b/qinling/orchestrator/kubernetes/templates/deployment.j2 @@ -25,6 +25,9 @@ spec: volumes: - name: package-folder emptyDir: {} + - name: cgroup-folder + hostPath: + path: /sys/fs/cgroup containers: - name: {{ container_name }} image: {{ image }} @@ -34,6 +37,15 @@ spec: volumeMounts: - name: package-folder mountPath: /var/qinling/packages + - name: cgroup-folder + mountPath: /qinling_cgroup + env: + - name: POD_UID + valueFrom: + fieldRef: + fieldPath: metadata.uid + - name: QOS_CLASS + value: "BestEffort" - name: sidecar image: {{ sidecar_image }} imagePullPolicy: IfNotPresent diff --git a/qinling/tests/unit/engine/test_default_engine.py b/qinling/tests/unit/engine/test_default_engine.py index c23b9935..22ddb841 100644 --- a/qinling/tests/unit/engine/test_default_engine.py +++ b/qinling/tests/unit/engine/test_default_engine.py @@ -261,6 +261,7 @@ class TestDefaultEngine(base.DbTestCase): mock.call(execution_1_id, function_id, 0, + rlimit=None, input=None, identifier=mock.ANY, service_url=None, @@ -269,6 +270,7 @@ class TestDefaultEngine(base.DbTestCase): mock.call(execution_2_id, function_id, 0, + rlimit=None, input='input', identifier=mock.ANY, service_url=None, @@ -357,8 +359,8 @@ class TestDefaultEngine(base.DbTestCase): identifier=runtime_id, labels={'runtime_id': runtime_id}, input=None) self.orchestrator.run_execution.assert_called_once_with( - execution_id, function_id, 0, input=None, identifier=runtime_id, - service_url='svc_url', entry=function.entry, + execution_id, function_id, 0, rlimit=self.rlimit, input=None, + identifier=runtime_id, service_url='svc_url', entry=function.entry, trust_id=function.trust_id) execution = db_api.get_execution(execution_id) @@ -419,7 +421,7 @@ class TestDefaultEngine(base.DbTestCase): function_id, 0, runtime_id) etcd_util_get_service_url_mock.assert_called_once_with(function_id, 0) engine_utils_get_request_data_mock.assert_called_once_with( - mock.ANY, function_id, 0, execution_id, + mock.ANY, function_id, 0, execution_id, self.rlimit, 'input', function.entry, function.trust_id, self.qinling_endpoint) engine_utils_url_request_mock.assert_called_once_with( diff --git a/qinling/tests/unit/orchestrator/kubernetes/test_manager.py b/qinling/tests/unit/orchestrator/kubernetes/test_manager.py index 22160647..77c41162 100644 --- a/qinling/tests/unit/orchestrator/kubernetes/test_manager.py +++ b/qinling/tests/unit/orchestrator/kubernetes/test_manager.py @@ -1,4 +1,4 @@ -# Copyright 2018 AWCloud Software Co., Ltd. +# Copyright 2018 AWCloud Software Co., Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -601,13 +601,16 @@ class TestKubernetesManager(base.DbTestCase): function_id = common.generate_unicode_uuid() result, output = self.manager.run_execution( - execution_id, function_id, 0, service_url='FAKE_URL' + execution_id, function_id, 0, rlimit=self.rlimit, + service_url='FAKE_URL' ) download_url = ('http://127.0.0.1:7070/v1/functions/%s?download=true' % function_id) data = { 'execution_id': execution_id, + 'cpu': self.rlimit['cpu'], + 'memory_size': self.rlimit['memory_size'], 'input': None, 'function_id': function_id, 'function_version': 0, diff --git a/qinling_tempest_plugin/config.py b/qinling_tempest_plugin/config.py index f7e8b364..13ae87ee 100644 --- a/qinling_tempest_plugin/config.py +++ b/qinling_tempest_plugin/config.py @@ -41,7 +41,7 @@ QinlingGroup = [ 'publicURL', 'adminURL', 'internalURL'], help="The endpoint type to use for the qinling service."), cfg.StrOpt("python_runtime_image", - default="openstackqinling/python-runtime:0.0.2", + default="openstackqinling/python-runtime:0.0.3", help="The Python runtime being used in the tests."), cfg.StrOpt("nodejs_runtime_image", default="openstackqinling/nodejs-runtime:0.0.1", diff --git a/runtimes/python2/Dockerfile b/runtimes/python2/Dockerfile index 20a6faea..7620169a 100644 --- a/runtimes/python2/Dockerfile +++ b/runtimes/python2/Dockerfile @@ -1,7 +1,7 @@ FROM phusion/baseimage:0.9.22 MAINTAINER anlin.kong@gmail.com -# We need to use non-root user because root user is not affected by ulimit. +# We need to use non-root user to execute functions and root user to set resource limits. USER root RUN useradd -Ms /bin/bash qinling @@ -12,8 +12,10 @@ RUN apt-get update && \ COPY . /app WORKDIR /app RUN pip install --no-cache-dir -r requirements.txt && \ + chmod 0750 custom-entrypoint.sh && \ + mkdir /qinling_cgroup && \ + mkdir -p /var/lock/qinling && \ mkdir -p /var/qinling/packages && \ chown -R qinling:qinling /app /var/qinling/packages -# uwsgi --http :9090 --uid qinling --wsgi-file server.py --callable app --master --processes 5 --threads 1 -CMD ["/usr/local/bin/uwsgi", "--http", ":9090", "--uid", "qinling", "--wsgi-file", "server.py", "--callable", "app", "--master", "--processes", "5", "--threads", "1"] +CMD ["/bin/bash", "custom-entrypoint.sh"] diff --git a/runtimes/python2/cglimit.py b/runtimes/python2/cglimit.py new file mode 100644 index 00000000..0b63ac85 --- /dev/null +++ b/runtimes/python2/cglimit.py @@ -0,0 +1,124 @@ +# Copyright 2018 Catalyst IT Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os +import sys + +from flask import Flask +from flask import make_response +from flask import request +from oslo_concurrency import lockutils + +app = Flask(__name__) +ch = logging.StreamHandler(sys.stdout) +ch.setLevel(logging.DEBUG) +ch.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) +del app.logger.handlers[:] +app.logger.addHandler(ch) + +# Deployer can specify cfs_period_us default value here. +PERIOD = 100000 + + +def log(message, level="info"): + global app + log_func = getattr(app.logger, level) + log_func(message) + + +@lockutils.synchronized('set_limitation', external=True, + lock_path='/var/lock/qinling') +def _cgroup_limit(cpu, memory_size, pid): + """Modify 'cgroup' files to set resource limits. + + Each pod(worker) will have cgroup folders on the host cgroup filesystem, + like '/sys/fs/cgroup//kubepods//pod/', + to limit memory and cpu resources that can be used in pod. + + For more information about cgroup, please see [1], about sharing PID + namespaces in kubernetes, please see also [2]. + + Return None if successful otherwise a Flask.Response object. + + [1]https://access.redhat.com/documentation/en-us/red_hat_enterprise_linux/6/html/resource_management_guide/sec-creating_cgroups + [2]https://github.com/kubernetes/kubernetes/pull/51634 + """ + hostname = os.getenv('HOSTNAME') + pod_id = os.getenv('POD_UID') + qos_class = None + if os.getenv('QOS_CLASS') == 'BestEffort': + qos_class = 'besteffort' + elif os.getenv('QOS_CLASS') == 'Burstable': + qos_class = 'burstable' + elif os.getenv('QOS_CLASS') == 'Guaranteed': + qos_class = '' + + if not pod_id or qos_class is None: + return make_response("Failed to get current worker information", 500) + + memory_base_path = os.path.join('/qinling_cgroup', 'memory', 'kubepods', + qos_class, 'pod%s' % pod_id) + cpu_base_path = os.path.join('/qinling_cgroup', 'cpu', 'kubepods', + qos_class, 'pod%s' % pod_id) + memory_path = os.path.join(memory_base_path, hostname) + cpu_path = os.path.join(cpu_base_path, hostname) + + if os.path.isdir(memory_base_path): + if not os.path.isdir(memory_path): + os.makedirs(memory_path) + + if os.path.isdir(cpu_base_path): + if not os.path.isdir(cpu_path): + os.makedirs(cpu_path) + + try: + # set cpu and memory resource limits + with open('%s/memory.limit_in_bytes' % memory_path, 'w') as f: + f.write('%d' % int(memory_size)) + with open('%s/cpu.cfs_period_us' % cpu_path, 'w') as f: + f.write('%d' % PERIOD) + with open('%s/cpu.cfs_quota_us' % cpu_path, 'w') as f: + f.write('%d' % ((int(cpu)*PERIOD/1000))) + + # add pid to 'tasks' files + with open('%s/tasks' % memory_path, 'w') as f: + f.write('%d' % pid) + with open('%s/tasks' % cpu_path, 'w') as f: + f.write('%d' % pid) + except Exception as e: + return make_response("Failed to modify cgroup files: %s" + % str(e), 500) + + +@app.route('/cglimit', methods=['POST']) +def cglimit(): + """Set resource limitations for execution. + + Only root user has jurisdiction to modify all cgroup files. + + :param cpu: cpu resource that execution can use in total. + :param memory_size: RAM resource that execution can use in total. + + Currently swap ought to be disabled in kubernetes. + """ + params = request.get_json() + cpu = params['cpu'] + memory_size = params['memory_size'] + pid = params['pid'] + log("Set resource limits request received, params: %s" % params) + + resp = _cgroup_limit(cpu, memory_size, pid) + + return resp if resp else 'pidlimited' diff --git a/runtimes/python2/custom-entrypoint.sh b/runtimes/python2/custom-entrypoint.sh new file mode 100644 index 00000000..fa310a39 --- /dev/null +++ b/runtimes/python2/custom-entrypoint.sh @@ -0,0 +1,6 @@ +#!/usr/bin/env bash +# This is expected to run as root. + +uwsgi --http :9090 --uid qinling --wsgi-file server.py --callable app --master --processes 5 --threads 1 & + +uwsgi --http :9092 --uid root --wsgi-file cglimit.py --callable app --master --processes 1 --threads 1 diff --git a/runtimes/python2/requirements.txt b/runtimes/python2/requirements.txt index 32dddd28..d1e6381a 100644 --- a/runtimes/python2/requirements.txt +++ b/runtimes/python2/requirements.txt @@ -8,3 +8,4 @@ python-octaviaclient>=1.0.0 # Apache-2.0 python-mistralclient>=3.1.0 # Apache-2.0 keystoneauth1>=2.21.0 # Apache-2.0 openstacksdk>=0.9.19 +oslo.concurrency>=3.25.0 # Apache-2.0 diff --git a/runtimes/python2/server.py b/runtimes/python2/server.py index bc230d1f..a1b5eceb 100644 --- a/runtimes/python2/server.py +++ b/runtimes/python2/server.py @@ -73,11 +73,35 @@ def _get_responce(output, duration, logs, success, code): def _invoke_function(execution_id, zip_file, module_name, method, arg, input, - return_dict): - """Thie function is supposed to be running in a child process.""" + return_dict, rlimit): + """Thie function is supposed to be running in a child process. + + HOSTNAME will be used to create cgroup directory related to worker. + + Current execution pid will be added to cgroup tasks file, and then all + its child processes will be automatically added to this 'cgroup'. + + Once executions exceed the cgroup limit, they will be killed by OOMKill + and this subprocess will exit with number(-9). + """ # Set resource limit for current sub-process _set_ulimit() + # Set cpu and memory limits to cgroup by calling cglimit service + pid = os.getpid() + root_resp = requests.post( + 'http://localhost:9092/cglimit', + json={ + 'cpu': rlimit['cpu'], + 'memory_size': rlimit['memory_size'], + 'pid': pid + } + ) + if not root_resp.ok: + return_dict['result'] = root_resp.content + return_dict['success'] = False + sys.exit(0) + sys.path.insert(0, zip_file) sys.stdout = open("%s.out" % execution_id, "w", 0) @@ -123,6 +147,10 @@ def execute(): username = params.get('username') password = params.get('password') zip_file = '/var/qinling/packages/%s.zip' % function_id + rlimit = { + 'cpu': params['cpu'], + 'memory_size': params['memory_size'] + } function_module, function_method = 'main', 'main' if entry: @@ -182,7 +210,7 @@ def execute(): p = Process( target=_invoke_function, args=(execution_id, zip_file, function_module, function_method, - input.pop('__function_input', None), input, return_dict) + input.pop('__function_input', None), input, return_dict, rlimit) ) p.start() p.join()