Merge "Runtime implementation for non-image type function."

This commit is contained in:
Zuul 2018-06-07 05:42:50 +00:00 committed by Gerrit Code Review
commit 1daf7ea48a
13 changed files with 202 additions and 21 deletions

View File

@ -22,7 +22,7 @@ QINLING_CONF_FILE=${QINLING_CONF_DIR}/qinling.conf
QINLING_POLICY_FILE=${QINLING_CONF_DIR}/policy.json QINLING_POLICY_FILE=${QINLING_CONF_DIR}/policy.json
QINLING_AUTH_CACHE_DIR=${QINLING_AUTH_CACHE_DIR:-/var/cache/qinling} QINLING_AUTH_CACHE_DIR=${QINLING_AUTH_CACHE_DIR:-/var/cache/qinling}
QINLING_FUNCTION_STORAGE_DIR=${QINLING_FUNCTION_STORAGE_DIR:-/opt/qinling/funtion/packages} QINLING_FUNCTION_STORAGE_DIR=${QINLING_FUNCTION_STORAGE_DIR:-/opt/qinling/funtion/packages}
QINLING_PYTHON_RUNTIME_IMAGE=${QINLING_PYTHON_RUNTIME_IMAGE:-openstackqinling/python-runtime:0.0.2} QINLING_PYTHON_RUNTIME_IMAGE=${QINLING_PYTHON_RUNTIME_IMAGE:-openstackqinling/python-runtime:0.0.3}
QINLING_NODEJS_RUNTIME_IMAGE=${QINLING_NODEJS_RUNTIME_IMAGE:-openstackqinling/nodejs-runtime:0.0.1} QINLING_NODEJS_RUNTIME_IMAGE=${QINLING_NODEJS_RUNTIME_IMAGE:-openstackqinling/nodejs-runtime:0.0.1}
QINLING_SIDECAR_IMAGE=${QINLING_SIDECAR_IMAGE:-openstackqinling/sidecar:0.0.1} QINLING_SIDECAR_IMAGE=${QINLING_SIDECAR_IMAGE:-openstackqinling/sidecar:0.0.1}

View File

@ -160,7 +160,7 @@ class DefaultEngine(object):
data = utils.get_request_data( data = utils.get_request_data(
CONF, function_id, function_version, execution_id, CONF, function_id, function_version, execution_id,
input, function.entry, function.trust_id, rlimit, input, function.entry, function.trust_id,
self.qinling_endpoint self.qinling_endpoint
) )
success, res = utils.url_request( success, res = utils.url_request(
@ -206,6 +206,7 @@ class DefaultEngine(object):
execution_id, execution_id,
function_id, function_id,
function_version, function_version,
rlimit=rlimit if svc_url else None,
input=input, input=input,
identifier=identifier, identifier=identifier,
service_url=svc_url, service_url=svc_url,

View File

@ -74,8 +74,8 @@ def url_request(request_session, url, body=None):
return False, {'error': 'Internal service error.'} return False, {'error': 'Internal service error.'}
def get_request_data(conf, function_id, version, execution_id, input, entry, def get_request_data(conf, function_id, version, execution_id, rlimit, input,
trust_id, qinling_endpoint): entry, trust_id, qinling_endpoint):
"""Prepare the request body should send to the worker.""" """Prepare the request body should send to the worker."""
ctx = context.get_ctx() ctx = context.get_ctx()
@ -94,6 +94,8 @@ def get_request_data(conf, function_id, version, execution_id, input, entry,
data = { data = {
'execution_id': execution_id, 'execution_id': execution_id,
'cpu': rlimit['cpu'],
'memory_size': rlimit['memory_size'],
'input': input, 'input': input,
'function_id': function_id, 'function_id': function_id,
'function_version': version, 'function_version': version,

View File

@ -421,9 +421,9 @@ class KubernetesManager(base.OrchestratorBase):
self.delete_function(function_id, version, labels) self.delete_function(function_id, version, labels)
raise exc.OrchestratorException('Execution preparation failed.') raise exc.OrchestratorException('Execution preparation failed.')
def run_execution(self, execution_id, function_id, version, input=None, def run_execution(self, execution_id, function_id, version, rlimit=None,
identifier=None, service_url=None, entry='main.main', input=None, identifier=None, service_url=None,
trust_id=None): entry='main.main', trust_id=None):
"""Run execution. """Run execution.
Return a tuple including the result and the output. Return a tuple including the result and the output.
@ -431,8 +431,8 @@ class KubernetesManager(base.OrchestratorBase):
if service_url: if service_url:
func_url = '%s/execute' % service_url func_url = '%s/execute' % service_url
data = utils.get_request_data( data = utils.get_request_data(
self.conf, function_id, version, execution_id, input, entry, self.conf, function_id, version, execution_id, rlimit, input,
trust_id, self.qinling_endpoint entry, trust_id, self.qinling_endpoint
) )
LOG.debug( LOG.debug(
'Invoke function %s(version %s), url: %s, data: %s', 'Invoke function %s(version %s), url: %s, data: %s',

View File

@ -25,6 +25,9 @@ spec:
volumes: volumes:
- name: package-folder - name: package-folder
emptyDir: {} emptyDir: {}
- name: cgroup-folder
hostPath:
path: /sys/fs/cgroup
containers: containers:
- name: {{ container_name }} - name: {{ container_name }}
image: {{ image }} image: {{ image }}
@ -34,6 +37,15 @@ spec:
volumeMounts: volumeMounts:
- name: package-folder - name: package-folder
mountPath: /var/qinling/packages mountPath: /var/qinling/packages
- name: cgroup-folder
mountPath: /qinling_cgroup
env:
- name: POD_UID
valueFrom:
fieldRef:
fieldPath: metadata.uid
- name: QOS_CLASS
value: "BestEffort"
- name: sidecar - name: sidecar
image: {{ sidecar_image }} image: {{ sidecar_image }}
imagePullPolicy: IfNotPresent imagePullPolicy: IfNotPresent

View File

@ -261,6 +261,7 @@ class TestDefaultEngine(base.DbTestCase):
mock.call(execution_1_id, mock.call(execution_1_id,
function_id, function_id,
0, 0,
rlimit=None,
input=None, input=None,
identifier=mock.ANY, identifier=mock.ANY,
service_url=None, service_url=None,
@ -269,6 +270,7 @@ class TestDefaultEngine(base.DbTestCase):
mock.call(execution_2_id, mock.call(execution_2_id,
function_id, function_id,
0, 0,
rlimit=None,
input='input', input='input',
identifier=mock.ANY, identifier=mock.ANY,
service_url=None, service_url=None,
@ -357,8 +359,8 @@ class TestDefaultEngine(base.DbTestCase):
identifier=runtime_id, labels={'runtime_id': runtime_id}, identifier=runtime_id, labels={'runtime_id': runtime_id},
input=None) input=None)
self.orchestrator.run_execution.assert_called_once_with( self.orchestrator.run_execution.assert_called_once_with(
execution_id, function_id, 0, input=None, identifier=runtime_id, execution_id, function_id, 0, rlimit=self.rlimit, input=None,
service_url='svc_url', entry=function.entry, identifier=runtime_id, service_url='svc_url', entry=function.entry,
trust_id=function.trust_id) trust_id=function.trust_id)
execution = db_api.get_execution(execution_id) execution = db_api.get_execution(execution_id)
@ -419,7 +421,7 @@ class TestDefaultEngine(base.DbTestCase):
function_id, 0, runtime_id) function_id, 0, runtime_id)
etcd_util_get_service_url_mock.assert_called_once_with(function_id, 0) etcd_util_get_service_url_mock.assert_called_once_with(function_id, 0)
engine_utils_get_request_data_mock.assert_called_once_with( engine_utils_get_request_data_mock.assert_called_once_with(
mock.ANY, function_id, 0, execution_id, mock.ANY, function_id, 0, execution_id, self.rlimit,
'input', function.entry, function.trust_id, 'input', function.entry, function.trust_id,
self.qinling_endpoint) self.qinling_endpoint)
engine_utils_url_request_mock.assert_called_once_with( engine_utils_url_request_mock.assert_called_once_with(

View File

@ -1,4 +1,4 @@
# Copyright 2018 AWCloud Software Co., Ltd. # Copyright 2018 AWCloud Software Co., Ltd
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -601,13 +601,16 @@ class TestKubernetesManager(base.DbTestCase):
function_id = common.generate_unicode_uuid() function_id = common.generate_unicode_uuid()
result, output = self.manager.run_execution( result, output = self.manager.run_execution(
execution_id, function_id, 0, service_url='FAKE_URL' execution_id, function_id, 0, rlimit=self.rlimit,
service_url='FAKE_URL'
) )
download_url = ('http://127.0.0.1:7070/v1/functions/%s?download=true' download_url = ('http://127.0.0.1:7070/v1/functions/%s?download=true'
% function_id) % function_id)
data = { data = {
'execution_id': execution_id, 'execution_id': execution_id,
'cpu': self.rlimit['cpu'],
'memory_size': self.rlimit['memory_size'],
'input': None, 'input': None,
'function_id': function_id, 'function_id': function_id,
'function_version': 0, 'function_version': 0,

View File

@ -41,7 +41,7 @@ QinlingGroup = [
'publicURL', 'adminURL', 'internalURL'], 'publicURL', 'adminURL', 'internalURL'],
help="The endpoint type to use for the qinling service."), help="The endpoint type to use for the qinling service."),
cfg.StrOpt("python_runtime_image", cfg.StrOpt("python_runtime_image",
default="openstackqinling/python-runtime:0.0.2", default="openstackqinling/python-runtime:0.0.3",
help="The Python runtime being used in the tests."), help="The Python runtime being used in the tests."),
cfg.StrOpt("nodejs_runtime_image", cfg.StrOpt("nodejs_runtime_image",
default="openstackqinling/nodejs-runtime:0.0.1", default="openstackqinling/nodejs-runtime:0.0.1",

View File

@ -1,7 +1,7 @@
FROM phusion/baseimage:0.9.22 FROM phusion/baseimage:0.9.22
MAINTAINER anlin.kong@gmail.com MAINTAINER anlin.kong@gmail.com
# We need to use non-root user because root user is not affected by ulimit. # We need to use non-root user to execute functions and root user to set resource limits.
USER root USER root
RUN useradd -Ms /bin/bash qinling RUN useradd -Ms /bin/bash qinling
@ -12,8 +12,10 @@ RUN apt-get update && \
COPY . /app COPY . /app
WORKDIR /app WORKDIR /app
RUN pip install --no-cache-dir -r requirements.txt && \ RUN pip install --no-cache-dir -r requirements.txt && \
chmod 0750 custom-entrypoint.sh && \
mkdir /qinling_cgroup && \
mkdir -p /var/lock/qinling && \
mkdir -p /var/qinling/packages && \ mkdir -p /var/qinling/packages && \
chown -R qinling:qinling /app /var/qinling/packages chown -R qinling:qinling /app /var/qinling/packages
# uwsgi --http :9090 --uid qinling --wsgi-file server.py --callable app --master --processes 5 --threads 1 CMD ["/bin/bash", "custom-entrypoint.sh"]
CMD ["/usr/local/bin/uwsgi", "--http", ":9090", "--uid", "qinling", "--wsgi-file", "server.py", "--callable", "app", "--master", "--processes", "5", "--threads", "1"]

124
runtimes/python2/cglimit.py Normal file
View File

@ -0,0 +1,124 @@
# Copyright 2018 Catalyst IT Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import sys
from flask import Flask
from flask import make_response
from flask import request
from oslo_concurrency import lockutils
app = Flask(__name__)
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.DEBUG)
ch.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
del app.logger.handlers[:]
app.logger.addHandler(ch)
# Deployer can specify cfs_period_us default value here.
PERIOD = 100000
def log(message, level="info"):
global app
log_func = getattr(app.logger, level)
log_func(message)
@lockutils.synchronized('set_limitation', external=True,
lock_path='/var/lock/qinling')
def _cgroup_limit(cpu, memory_size, pid):
"""Modify 'cgroup' files to set resource limits.
Each pod(worker) will have cgroup folders on the host cgroup filesystem,
like '/sys/fs/cgroup/<resource_type>/kubepods/<qos_class>/pod<pod_id>/',
to limit memory and cpu resources that can be used in pod.
For more information about cgroup, please see [1], about sharing PID
namespaces in kubernetes, please see also [2].
Return None if successful otherwise a Flask.Response object.
[1]https://access.redhat.com/documentation/en-us/red_hat_enterprise_linux/6/html/resource_management_guide/sec-creating_cgroups
[2]https://github.com/kubernetes/kubernetes/pull/51634
"""
hostname = os.getenv('HOSTNAME')
pod_id = os.getenv('POD_UID')
qos_class = None
if os.getenv('QOS_CLASS') == 'BestEffort':
qos_class = 'besteffort'
elif os.getenv('QOS_CLASS') == 'Burstable':
qos_class = 'burstable'
elif os.getenv('QOS_CLASS') == 'Guaranteed':
qos_class = ''
if not pod_id or qos_class is None:
return make_response("Failed to get current worker information", 500)
memory_base_path = os.path.join('/qinling_cgroup', 'memory', 'kubepods',
qos_class, 'pod%s' % pod_id)
cpu_base_path = os.path.join('/qinling_cgroup', 'cpu', 'kubepods',
qos_class, 'pod%s' % pod_id)
memory_path = os.path.join(memory_base_path, hostname)
cpu_path = os.path.join(cpu_base_path, hostname)
if os.path.isdir(memory_base_path):
if not os.path.isdir(memory_path):
os.makedirs(memory_path)
if os.path.isdir(cpu_base_path):
if not os.path.isdir(cpu_path):
os.makedirs(cpu_path)
try:
# set cpu and memory resource limits
with open('%s/memory.limit_in_bytes' % memory_path, 'w') as f:
f.write('%d' % int(memory_size))
with open('%s/cpu.cfs_period_us' % cpu_path, 'w') as f:
f.write('%d' % PERIOD)
with open('%s/cpu.cfs_quota_us' % cpu_path, 'w') as f:
f.write('%d' % ((int(cpu)*PERIOD/1000)))
# add pid to 'tasks' files
with open('%s/tasks' % memory_path, 'w') as f:
f.write('%d' % pid)
with open('%s/tasks' % cpu_path, 'w') as f:
f.write('%d' % pid)
except Exception as e:
return make_response("Failed to modify cgroup files: %s"
% str(e), 500)
@app.route('/cglimit', methods=['POST'])
def cglimit():
"""Set resource limitations for execution.
Only root user has jurisdiction to modify all cgroup files.
:param cpu: cpu resource that execution can use in total.
:param memory_size: RAM resource that execution can use in total.
Currently swap ought to be disabled in kubernetes.
"""
params = request.get_json()
cpu = params['cpu']
memory_size = params['memory_size']
pid = params['pid']
log("Set resource limits request received, params: %s" % params)
resp = _cgroup_limit(cpu, memory_size, pid)
return resp if resp else 'pidlimited'

View File

@ -0,0 +1,6 @@
#!/usr/bin/env bash
# This is expected to run as root.
uwsgi --http :9090 --uid qinling --wsgi-file server.py --callable app --master --processes 5 --threads 1 &
uwsgi --http :9092 --uid root --wsgi-file cglimit.py --callable app --master --processes 1 --threads 1

View File

@ -8,3 +8,4 @@ python-octaviaclient>=1.0.0 # Apache-2.0
python-mistralclient>=3.1.0 # Apache-2.0 python-mistralclient>=3.1.0 # Apache-2.0
keystoneauth1>=2.21.0 # Apache-2.0 keystoneauth1>=2.21.0 # Apache-2.0
openstacksdk>=0.9.19 openstacksdk>=0.9.19
oslo.concurrency>=3.25.0 # Apache-2.0

View File

@ -73,11 +73,35 @@ def _get_responce(output, duration, logs, success, code):
def _invoke_function(execution_id, zip_file, module_name, method, arg, input, def _invoke_function(execution_id, zip_file, module_name, method, arg, input,
return_dict): return_dict, rlimit):
"""Thie function is supposed to be running in a child process.""" """Thie function is supposed to be running in a child process.
HOSTNAME will be used to create cgroup directory related to worker.
Current execution pid will be added to cgroup tasks file, and then all
its child processes will be automatically added to this 'cgroup'.
Once executions exceed the cgroup limit, they will be killed by OOMKill
and this subprocess will exit with number(-9).
"""
# Set resource limit for current sub-process # Set resource limit for current sub-process
_set_ulimit() _set_ulimit()
# Set cpu and memory limits to cgroup by calling cglimit service
pid = os.getpid()
root_resp = requests.post(
'http://localhost:9092/cglimit',
json={
'cpu': rlimit['cpu'],
'memory_size': rlimit['memory_size'],
'pid': pid
}
)
if not root_resp.ok:
return_dict['result'] = root_resp.content
return_dict['success'] = False
sys.exit(0)
sys.path.insert(0, zip_file) sys.path.insert(0, zip_file)
sys.stdout = open("%s.out" % execution_id, "w", 0) sys.stdout = open("%s.out" % execution_id, "w", 0)
@ -123,6 +147,10 @@ def execute():
username = params.get('username') username = params.get('username')
password = params.get('password') password = params.get('password')
zip_file = '/var/qinling/packages/%s.zip' % function_id zip_file = '/var/qinling/packages/%s.zip' % function_id
rlimit = {
'cpu': params['cpu'],
'memory_size': params['memory_size']
}
function_module, function_method = 'main', 'main' function_module, function_method = 'main', 'main'
if entry: if entry:
@ -182,7 +210,7 @@ def execute():
p = Process( p = Process(
target=_invoke_function, target=_invoke_function,
args=(execution_id, zip_file, function_module, function_method, args=(execution_id, zip_file, function_module, function_method,
input.pop('__function_input', None), input, return_dict) input.pop('__function_input', None), input, return_dict, rlimit)
) )
p.start() p.start()
p.join() p.join()