Improve function execution performance

- Improve/Simplify python runtime
- Use non-locking for communication with db when invoking function.
- Add config option to run job handler.
- Pre-fetch runtime when query function in db.

Change-Id: I9e791aecf03d5bf64a39548e6c9bad1b678882af
This commit is contained in:
Lingxian Kong 2017-12-05 23:52:37 +13:00
parent 01be0d2a1f
commit c19c4dca00
17 changed files with 284 additions and 244 deletions

View File

@ -47,6 +47,7 @@ def setup_app(config=None):
db_api.setup_db()
if cfg.CONF.api.enable_job_handler:
LOG.info('Starting periodic tasks...')
periodics.start_job_handler()

View File

@ -75,7 +75,7 @@ class ExecutionsController(rest.RestController):
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(resources.Execution, types.uuid)
def get(self, id):
LOG.info("Fetch resource.", resource={'type': self.type, 'id': id})
LOG.info("Get resource.", resource={'type': self.type, 'id': id})
execution_db = db_api.get_execution(id)

View File

@ -69,7 +69,7 @@ class FunctionsController(rest.RestController):
@rest_utils.wrap_pecan_controller_exception
@pecan.expose()
def get(self, id):
LOG.info("Fetch resource.", resource={'type': self.type, 'id': id})
LOG.info("Get resource.", resource={'type': self.type, 'id': id})
download = strutils.bool_from_string(
pecan.request.GET.get('download', False)

View File

@ -84,7 +84,7 @@ class JobsController(rest.RestController):
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(resources.Job, types.uuid)
def get(self, id):
LOG.info("Fetch resource.", resource={'type': self.type, 'id': id})
LOG.info("Get resource.", resource={'type': self.type, 'id': id})
job_db = db_api.get_job(id)
return resources.Job.from_dict(job_db.to_dict())

View File

@ -42,7 +42,7 @@ class RuntimesController(rest.RestController):
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(resources.Runtime, types.uuid)
def get(self, id):
LOG.info("Fetch resource.", resource={'type': self.type, 'id': id})
LOG.info("Get resource.", resource={'type': self.type, 'id': id})
runtime_db = db_api.get_runtime(id)

View File

@ -41,7 +41,12 @@ api_opts = [
help='Number of workers for Qinling API service '
'default is equal to the number of CPUs available if that can '
'be determined, else a default worker count of 1 is returned.'
)
),
cfg.BoolOpt(
'enable_job_handler',
default=True,
help='Enable job handler.'
),
]
PECAN_GROUP = 'pecan'

View File

@ -19,6 +19,7 @@ import threading
from oslo_config import cfg
from oslo_db import exception as oslo_db_exc
from oslo_db.sqlalchemy import utils as db_utils
from oslo_log import log as logging
import sqlalchemy as sa
from qinling import context
@ -30,6 +31,7 @@ from qinling import exceptions as exc
from qinling import status
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
_SCHEMA_LOCK = threading.RLock()
_initialized = False
@ -366,14 +368,11 @@ def create_function_service_mapping(values, session=None):
mapping = models.FunctionServiceMapping()
mapping.update(values.copy())
# Ignore duplicate error for FunctionServiceMapping
try:
mapping.save(session=session)
except oslo_db_exc.DBDuplicateEntry as e:
raise exc.DBError(
"Duplicate entry for FunctionServiceMapping: %s" % e.columns
)
return mapping
except oslo_db_exc.DBDuplicateEntry:
session.close()
@db_base.session_aware()
@ -412,14 +411,11 @@ def create_function_worker(values, session=None):
mapping = models.FunctionWorkers()
mapping.update(values.copy())
# Ignore duplicate error for FunctionWorkers
try:
mapping.save(session=session)
except oslo_db_exc.DBDuplicateEntry as e:
raise exc.DBError(
"Duplicate entry for FunctionWorkers: %s" % e.columns
)
return mapping
except oslo_db_exc.DBDuplicateEntry:
session.close()
@db_base.session_aware()

View File

@ -102,6 +102,7 @@ def upgrade():
sa.Column('function_id', sa.String(length=36), nullable=False),
sa.Column('worker_name', sa.String(length=255), nullable=False),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('function_id', 'worker_name'),
sa.ForeignKeyConstraint(
['function_id'], [u'functions.id'], ondelete='CASCADE'
),

View File

@ -38,7 +38,10 @@ class Function(model_base.QinlingSecureModelBase):
runtime_id = sa.Column(
sa.String(36), sa.ForeignKey(Runtime.id), nullable=True
)
runtime = relationship('Runtime', back_populates="functions")
# We want to get runtime info when we query function
runtime = relationship(
'Runtime', back_populates="functions", innerjoin=True, lazy='joined'
)
memory_size = sa.Column(sa.Integer)
timeout = sa.Column(sa.Integer)
code = sa.Column(st.JsonLongDictType(), nullable=False)
@ -65,6 +68,10 @@ class FunctionServiceMapping(model_base.QinlingModelBase):
class FunctionWorkers(model_base.QinlingModelBase):
__tablename__ = 'function_workers'
__table_args__ = (
sa.UniqueConstraint('function_id', 'worker_name'),
)
id = model_base.id_column()
function_id = sa.Column(
sa.String(36),

View File

@ -16,6 +16,7 @@ from oslo_config import cfg
from oslo_log import log as logging
import requests
from qinling import context
from qinling.db import api as db_api
from qinling.engine import utils
from qinling import status
@ -99,12 +100,35 @@ class DefaultEngine(object):
if function.service:
func_url = '%s/execute' % function.service.service_url
LOG.debug(
'Found service url for function: %s, url: %s',
function_id, func_url
)
data = {'input': input, 'execution_id': execution_id}
download_url = (
'http://%s:%s/v1/functions/%s?download=true' %
(CONF.kubernetes.qinling_service_address,
CONF.api.port, function_id)
)
data = {
'execution_id': execution_id,
'input': input,
'function_id': function_id,
'entry': function.entry,
'download_url': download_url,
}
if CONF.pecan.auth_enable:
data.update(
{
'token': context.get_ctx().auth_token,
'auth_url': CONF.keystone_authtoken.auth_uri,
'username': CONF.keystone_authtoken.username,
'password': CONF.keystone_authtoken.password,
'trust_id': function.trust_id
}
)
success, res = utils.url_request(
self.session, func_url, body=data
)
@ -141,8 +165,6 @@ class DefaultEngine(object):
identifier=identifier,
labels=labels,
input=input,
entry=function.entry,
trust_id=function.trust_id
)
success, res = self.orchestrator.run_execution(
execution_id,
@ -150,6 +172,8 @@ class DefaultEngine(object):
input=input,
identifier=identifier,
service_url=service_url,
entry=function.entry,
trust_id=function.trust_id
)
logs = ''
@ -195,12 +219,9 @@ class DefaultEngine(object):
LOG.info('Deleted.', resource=resource)
def scaleup_function(self, ctx, function_id, runtime_id, count=1):
function = db_api.get_function(function_id)
worker_names = self.orchestrator.scaleup_function(
function_id,
identifier=runtime_id,
entry=function.entry,
count=count
)

View File

@ -11,18 +11,33 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from oslo_log import log as logging
import requests
import six
LOG = logging.getLogger(__name__)
def url_request(request_session, url, body=None):
"""Send request to a service url."""
exception = None
for a in six.moves.xrange(10):
try:
# Default execution duration is 3min, could be configurable
r = request_session.post(url, json=body, timeout=(3, 180))
return True, r.json()
except requests.ConnectionError as e:
exception = e
LOG.warning("Could not connect to service. Retrying.")
time.sleep(1)
except Exception:
LOG.exception("Failed to request url %s", url)
return False, {'error': 'Function execution timeout.'}
LOG.exception("Could not connect to function service. Reason: %s",
exception)
return False, {'error': 'Internal service error.'}

View File

@ -20,7 +20,6 @@ import jinja2
from kubernetes import client
from oslo_log import log as logging
import requests
import six
import tenacity
import yaml
@ -234,19 +233,18 @@ class KubernetesManager(base.OrchestratorBase):
return ret.items[-count:]
def _prepare_pod(self, pod, deployment_name, function_id, labels=None,
entry=None, trust_id=None, actual_function=None):
def _prepare_pod(self, pod, deployment_name, function_id, labels=None):
"""Pod preparation.
1. Update pod labels.
2. Expose service and trigger package download.
2. Expose service.
"""
name = pod.metadata.name
actual_function = actual_function or function_id
pod_name = pod.metadata.name
labels = labels or {}
LOG.info(
'Prepare pod %s in deployment %s for function %s',
name, deployment_name, function_id
pod_name, deployment_name, function_id
)
# Update pod label.
@ -255,6 +253,8 @@ class KubernetesManager(base.OrchestratorBase):
# Create service for the chosen pod.
service_name = "service-%s" % function_id
labels.update({'function_id': function_id})
# TODO(kong): Make the service type configurable.
service_body = self.service_template.render(
{
"service_name": service_name,
@ -262,19 +262,32 @@ class KubernetesManager(base.OrchestratorBase):
"selector": pod_labels
}
)
try:
ret = self.v1.create_namespaced_service(
self.conf.kubernetes.namespace, yaml.safe_load(service_body)
)
node_port = ret.spec.ports[0].node_port
LOG.debug(
'Service created for pod %s, service name: %s, node port: %s',
name, service_name, node_port
'Service created for pod %s, service name: %s',
pod_name, service_name
)
except Exception as e:
# Service already exists
if e.status == 409:
LOG.debug(
'Service already exists for pod %s, service name: %s',
pod_name, service_name
)
time.sleep(1)
ret = self.v1.read_namespaced_service(
service_name, self.conf.kubernetes.namespace
)
else:
raise
# Get external ip address for an arbitrary node.
ret = self.v1.list_node()
addresses = ret.items[0].status.addresses
node_port = ret.spec.ports[0].node_port
nodes = self.v1.list_node()
addresses = nodes.items[0].status.addresses
node_ip = None
for addr in addresses:
if addr.type == 'ExternalIP':
@ -286,53 +299,9 @@ class KubernetesManager(base.OrchestratorBase):
if addr.type == 'InternalIP':
node_ip = addr.address
# Download code package into container.
pod_service_url = 'http://%s:%s' % (node_ip, node_port)
request_url = '%s/download' % pod_service_url
download_url = (
'http://%s:%s/v1/functions/%s?download=true' %
(self.conf.kubernetes.qinling_service_address,
self.conf.api.port, actual_function)
)
data = {
'download_url': download_url,
'function_id': actual_function,
'entry': entry,
'trust_id': trust_id
}
if self.conf.pecan.auth_enable:
data.update(
{
'token': context.get_ctx().auth_token,
'auth_url': self.conf.keystone_authtoken.auth_uri,
'username': self.conf.keystone_authtoken.username,
'password': self.conf.keystone_authtoken.password,
}
)
LOG.info(
'Send request to pod %s, request_url: %s', name, request_url
)
exception = None
for a in six.moves.xrange(10):
try:
r = self.session.post(request_url, json=data)
if r.status_code != requests.codes.ok:
raise exc.OrchestratorException(
'Failed to download function code package.'
)
return name, pod_service_url
except (requests.ConnectionError, requests.Timeout) as e:
exception = e
LOG.warning("Could not connect to service. Retrying.")
time.sleep(1)
raise exc.OrchestratorException(
'Could not connect to function service. Reason: %s', exception
)
return pod_name, pod_service_url
def _create_pod(self, image, pod_name, labels, input):
pod_body = self.pod_template.render(
@ -372,8 +341,7 @@ class KubernetesManager(base.OrchestratorBase):
return pod_labels
def prepare_execution(self, function_id, image=None, identifier=None,
labels=None, input=None, entry='main.main',
trust_id=None):
labels=None, input=None):
"""Prepare service URL for function.
For image function, create a single pod with input, so the function
@ -395,7 +363,7 @@ class KubernetesManager(base.OrchestratorBase):
try:
pod_name, url = self._prepare_pod(
pod[0], identifier, function_id, labels, entry, trust_id
pod[0], identifier, function_id, labels
)
return pod_name, url
except Exception:
@ -404,14 +372,38 @@ class KubernetesManager(base.OrchestratorBase):
raise exc.OrchestratorException('Execution preparation failed.')
def run_execution(self, execution_id, function_id, input=None,
identifier=None, service_url=None):
identifier=None, service_url=None, entry='main.main',
trust_id=None):
if service_url:
func_url = '%s/execute' % service_url
download_url = (
'http://%s:%s/v1/functions/%s?download=true' %
(self.conf.kubernetes.qinling_service_address,
self.conf.api.port, function_id)
)
data = {
'input': input,
'execution_id': execution_id,
'input': input,
'function_id': function_id,
'entry': entry,
'download_url': download_url,
'trust_id': trust_id
}
LOG.debug('Invoke function %s, url: %s', function_id, func_url)
if self.conf.pecan.auth_enable:
data.update(
{
'token': context.get_ctx().auth_token,
'auth_url': self.conf.keystone_authtoken.auth_uri,
'username': self.conf.keystone_authtoken.username,
'password': self.conf.keystone_authtoken.password,
}
)
LOG.debug(
'Invoke function %s, url: %s, data: %s',
function_id, func_url, data
)
return utils.url_request(self.session, func_url, body=data)
else:
@ -425,7 +417,6 @@ class KubernetesManager(base.OrchestratorBase):
self.conf.kubernetes.namespace
)
status = pod.status.phase
time.sleep(0.5)
output = self.v1.read_namespaced_pod_log(
@ -453,47 +444,26 @@ class KubernetesManager(base.OrchestratorBase):
label_selector=selector
)
def scaleup_function(self, function_id, identifier=None,
entry='main.main', count=1):
def scaleup_function(self, function_id, identifier=None, count=1):
pod_names = []
labels = {'runtime_id': identifier}
pods = self._choose_available_pod(
labels, count=count
)
pods = self._choose_available_pod(labels, count=count)
if not pods:
raise exc.OrchestratorException('Not enough workers available.')
temp_function = '%s-temp' % function_id
for pod in pods:
self._prepare_pod(pod, identifier, temp_function, labels, entry,
actual_function=function_id)
# Delete temporary service
selector = common.convert_dict_to_string(
{'function_id': temp_function}
pod_name, _ = self._prepare_pod(
pod, identifier, function_id, labels
)
ret = self.v1.list_namespaced_service(
self.conf.kubernetes.namespace, label_selector=selector
)
svc_names = [i.metadata.name for i in ret.items]
for svc_name in svc_names:
self.v1.delete_namespaced_service(
svc_name,
self.conf.kubernetes.namespace,
)
# Modify pod labels to fit into correct service
self._update_pod_label(pod, {'function_id': function_id})
pod_names.append(pod.metadata.name)
pod_names.append(pod_name)
LOG.info('Pods scaled up for function %s: %s', function_id, pod_names)
return pod_names
def delete_worker(self, worker_name, **kwargs):
def delete_worker(self, pod_name, **kwargs):
self.v1.delete_namespaced_pod(
worker_name,
pod_name,
self.conf.kubernetes.namespace,
{}
)

View File

@ -28,8 +28,8 @@ spec:
- containerPort: 9090
resources:
limits:
cpu: "0.5"
memory: 512Mi
cpu: "0.3"
memory: 256Mi
requests:
cpu: "0.1"
memory: 128Mi
memory: 64Mi

View File

@ -20,8 +20,8 @@ spec:
restartPolicy: Never
resources:
limits:
cpu: "0.5"
memory: 512Mi
cpu: "0.3"
memory: 256Mi
requests:
cpu: "0.1"
memory: 128Mi
memory: 64Mi

View File

@ -12,16 +12,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log as logging
from qinling.db import api as db_api
from qinling.db.sqlalchemy import models
from qinling import exceptions as exc
from qinling import status
LOG = logging.getLogger(__name__)
def create_execution(engine_client, params):
function_id = params['function_id']
is_sync = params.get('sync', True)
with db_api.transaction():
def _update_function_db(function_id):
# NOTE(kong): Store function info in cache?
func_db = db_api.get_function(function_id)
runtime_db = func_db.runtime
if runtime_db and runtime_db.status != status.AVAILABLE:
@ -29,15 +31,42 @@ def create_execution(engine_client, params):
'Runtime %s is not available.' % func_db.runtime_id
)
# Increase function invoke count, the updated_at field will be also
# updated.
func_db.count = func_db.count + 1
# Function update is done using UPDATE ... FROM ... WHERE
# non-locking clause.
while func_db:
count = func_db.count
modified = db_api.conditional_update(
models.Function,
{
'count': count + 1,
},
{
'id': function_id,
'count': count
},
insecure=True,
)
if not modified:
LOG.warning("Retrying to update function count.")
func_db = db_api.get_function(function_id)
continue
else:
break
return func_db.runtime_id
def create_execution(engine_client, params):
function_id = params['function_id']
is_sync = params.get('sync', True)
runtime_id = _update_function_db(function_id)
params.update({'status': status.RUNNING})
db_model = db_api.create_execution(params)
engine_client.create_execution(
db_model.id, function_id, func_db.runtime_id,
db_model.id, function_id, runtime_id,
input=params.get('input'), is_sync=is_sync
)

View File

@ -29,3 +29,4 @@ python-swiftclient>=3.2.0 # Apache-2.0
croniter>=0.3.4 # MIT License
python-dateutil>=2.4.2 # BSD
tenacity>=3.2.1 # Apache-2.0
PyMySQL>=0.7.6 # MIT License

View File

@ -23,7 +23,6 @@ import time
import traceback
import zipfile
from flask import abort
from flask import Flask
from flask import request
from flask import Response
@ -32,60 +31,20 @@ from keystoneauth1 import session
import requests
app = Flask(__name__)
zip_file = ''
function_module = 'main'
function_method = 'main'
auth_url = None
username = None
password = None
trust_id = None
downloaded = False
downloading = False
@app.route('/download', methods=['POST'])
def download():
params = request.get_json() or {}
download_url = params.get('download_url')
function_id = params.get('function_id')
entry = params.get('entry')
global auth_url
global username
global password
global trust_id
token = params.get('token')
auth_url = params.get('auth_url')
username = params.get('username')
password = params.get('password')
trust_id = params.get('trust_id')
headers = {}
if token:
headers = {'X-Auth-Token': token}
global zip_file
zip_file = '%s.zip' % function_id
app.logger.info(
'Request received, download_url:%s, headers: %s, entry: %s' %
(download_url, headers, entry)
def setup_logger(loglevel):
global app
root = logging.getLogger()
root.setLevel(loglevel)
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(loglevel)
ch.setFormatter(
logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
)
# Get function code package from Qinling service.
r = requests.get(download_url, headers=headers, stream=True)
with open(zip_file, 'wb') as fd:
for chunk in r.iter_content(chunk_size=65535):
fd.write(chunk)
if not zipfile.is_zipfile(zip_file):
abort(500)
app.logger.info('Code package downloaded to %s' % zip_file)
global function_module
global function_method
function_module, function_method = tuple(entry.rsplit('.', 1))
return 'success'
app.logger.addHandler(ch)
def _print_trace():
@ -108,13 +67,13 @@ def _invoke_function(execution_id, zip_file, module_name, method, input,
return_dict['result'] = func(**input)
return_dict['success'] = True
except Exception as e:
_print_trace()
if isinstance(e, OSError) and 'Resource' in str(e):
sys.exit(1)
return_dict['result'] = str(e)
return_dict['success'] = False
_print_trace()
finally:
print('Finished execution: %s' % execution_id)
@ -130,23 +89,71 @@ def execute():
reason, e.g. unlimited memory allocation)
- Deal with os error for process (e.g. Resource temporarily unavailable)
"""
global zip_file
global function_module
global function_method
global auth_url
global username
global password
global trust_id
global downloading
global downloaded
params = request.get_json() or {}
input = params.get('input') or {}
execution_id = params['execution_id']
download_url = params.get('download_url')
function_id = params.get('function_id')
entry = params.get('entry')
trust_id = params.get('trust_id')
auth_url = params.get('auth_url')
username = params.get('username')
password = params.get('password')
zip_file = '%s.zip' % function_id
function_module, function_method = 'main', 'main'
if entry:
function_module, function_method = tuple(entry.rsplit('.', 1))
app.logger.info(
'Request received, execution_id:%s, input: %s, auth_url: %s, '
'username: %s, trust_id: %s' %
(execution_id, input, auth_url, username, trust_id)
'Request received, execution_id:%s, input: %s, auth_url: %s' %
(execution_id, input, auth_url)
)
while downloading:
# wait
time.sleep(3)
# download function package
if not downloading and not downloaded:
downloading = True
token = params.get('token')
headers = {}
if token:
headers = {'X-Auth-Token': token}
app.logger.info(
'Downloading function, download_url:%s, entry: %s' %
(download_url, entry)
)
# Get function code package from Qinling service.
r = requests.get(download_url, headers=headers, stream=True)
with open(zip_file, 'wb') as fd:
for chunk in r.iter_content(chunk_size=65535):
fd.write(chunk)
app.logger.info('Downloaded function package to %s' % zip_file)
downloading = False
downloaded = True
if downloaded:
if not zipfile.is_zipfile(zip_file):
return Response(
response=json.dumps(
{
'output': 'The function package is incorrect.',
'duration': 0,
'logs': '',
'success': False
}
),
status=500,
mimetype='application/json'
)
# Provide an openstack session to user's function
@ -160,7 +167,6 @@ def execute():
user_domain_name='Default'
)
os_session = session.Session(auth=auth, verify=False)
input.update({'context': {'os_session': os_session}})
manager = Manager()
@ -207,18 +213,6 @@ def execute():
)
def setup_logger(loglevel):
global app
root = logging.getLogger()
root.setLevel(loglevel)
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(loglevel)
ch.setFormatter(
logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
)
app.logger.addHandler(ch)
setup_logger(logging.DEBUG)
app.logger.info("Starting server")