Function execution concurrency improvement

Increase the number of pods based on the execution rate in
qinling-engine, so it is qinling's responsibility to decide when and
how to autoscale pod allocation. This behavior is configurable; the
admin user can also scale up the pods manually via the qinling API.

Change-Id: Ie0b01481405daf10f495fa9d6389a624a82f0385
Implements: blueprint qingling-autoscaling
Lingxian Kong 2017-12-07 22:38:13 +13:00
parent c19c4dca00
commit 7ba5676ec8
5 changed files with 73 additions and 10 deletions
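The commit message notes that an operator can also scale a function up manually
through the qinling API. As a rough, hypothetical illustration only (the
endpoint, port, and payload below are assumptions based on the existing
scale-up support, not something this diff defines; function_id and token are
placeholders):

    import requests

    # Hypothetical manual scale-up call; the URL, token handling and payload
    # are assumptions for illustration and are not part of this commit.
    resp = requests.post(
        'http://qinling-api:7070/v1/functions/%s/scale_up' % function_id,
        headers={'X-Auth-Token': token},
        json={'count': 1},
    )
    resp.raise_for_status()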


@@ -98,6 +98,16 @@ engine_opts = [
         'function_service_expiration',
         default=300,
         help='Maximum service time in seconds for function in orchestrator.'
+    ),
+    cfg.IntOpt(
+        'function_concurrency',
+        default=10,
+        help='Maximum number of concurrent executions per function.'
+    ),
+    cfg.BoolOpt(
+        'enable_autoscaling',
+        default=True,
+        help='Enables autoscaling capability for function execution.'
     )
 ]
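For reference, a minimal sketch of how an operator might set the new options in
qinling.conf; the [engine] section name follows the CONF.engine.* lookups used
in the engine changes below, and the values shown are simply the defaults
introduced by this patch:

    [engine]
    # Scale up once running executions per worker exceed this number.
    function_concurrency = 10
    # Set to False to skip the per-execution load check entirely.
    enable_autoscaling = True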


@@ -178,6 +178,10 @@ def get_function_workers(function_id):
     return IMPL.get_function_workers(function_id)
 
 
+def acquire_worker_lock(id):
+    return IMPL.acquire_worker_lock(id)
+
+
 def create_job(values):
     return IMPL.create_job(values)


@@ -27,6 +27,7 @@ from qinling.db import base as db_base
 from qinling.db.sqlalchemy import filters as db_filters
 from qinling.db.sqlalchemy import model_base
 from qinling.db.sqlalchemy import models
+from qinling.db.sqlalchemy import sqlite_lock
 from qinling import exceptions as exc
 from qinling import status
@@ -449,6 +450,21 @@ def delete_function_workers(function_id, session=None):
         session.delete(worker)
 
 
+@db_base.session_aware()
+def acquire_worker_lock(function_id, session=None):
+    # Expire all so all objects queried after lock is acquired
+    # will be up-to-date from the DB and not from cache.
+    session.expire_all()
+
+    if db_base.get_driver_name() == 'sqlite':
+        # In case of 'sqlite' we need to apply a manual lock.
+        sqlite_lock.acquire_lock(function_id, session)
+
+    return _secure_query(
+        models.FunctionWorkers).with_for_update().filter(
+        models.FunctionWorkers.function_id == function_id).all()
+
+
 @db_base.session_aware()
 def create_job(values, session=None):
     job = models.Job()
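The row lock taken by acquire_worker_lock is only held for the lifetime of the
surrounding database transaction, so callers are expected to wrap it the way
the engine does below. A minimal usage sketch, with function_id as a
placeholder:

    from qinling.db import api as db_api

    with db_api.transaction():
        # Blocks other engine processes checking the same function until
        # this transaction ends, then returns the locked worker rows.
        workers = db_api.acquire_worker_lock(function_id)
        # ... inspect current load and decide whether to scale up ...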


@@ -93,6 +93,9 @@ class DefaultEngine(object):
             execution_id, function_id, runtime_id, input
         )
 
+        if CONF.engine.enable_autoscaling:
+            self.function_load_check(function_id, runtime_id)
+
         # FIXME(kong): Make the transaction scope smaller.
         with db_api.transaction():
             execution = db_api.get_execution(execution_id)
@@ -225,7 +228,6 @@
             count=count
         )
 
-        with db_api.transaction():
-            for name in worker_names:
-                worker = {
-                    'function_id': function_id,
+        for name in worker_names:
+            worker = {
+                'function_id': function_id,
@@ -251,3 +253,24 @@ class DefaultEngine(object):
             db_api.delete_function_worker(worker.worker_name)
 
         LOG.info('Finished scaling up function %s.', function_id)
+
+    def function_load_check(self, function_id, runtime_id):
+        with db_api.transaction():
+            db_api.acquire_worker_lock(function_id)
+
+            running_execs = db_api.get_executions(
+                function_id=function_id, status=status.RUNNING
+            )
+            workers = db_api.get_function_workers(function_id)
+            concurrency = (len(running_execs) or 1) / (len(workers) or 1)
+
+            if concurrency > CONF.engine.function_concurrency:
+                LOG.warning(
+                    'Scale up function %s because of high concurrency, current'
+                    ' concurrency: %s',
+                    function_id, concurrency
+                )
+
+                # TODO(kong): The increase step could be configurable
+                self.scaleup_function(None, function_id, runtime_id, 1)
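To make the threshold concrete: with the default function_concurrency of 10, a
function with 30 executions in RUNNING state served by 2 workers gives
concurrency = 30 / 2 = 15, which exceeds the limit, so one extra worker is
requested. The "or 1" guards keep the division defined when a function has no
running executions or no workers yet, and the worker lock acquired above
ensures only one engine process performs this check for a given function at a
time.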


@@ -220,9 +220,18 @@ class KubernetesManager(base.OrchestratorBase):
         return True
 
-    def _choose_available_pod(self, labels, count=1):
+    def _choose_available_pod(self, labels, count=1, function_id=None):
         selector = common.convert_dict_to_string(labels)
 
+        # If there is already a pod for function, reuse it.
+        if function_id:
+            ret = self.v1.list_namespaced_pod(
+                self.conf.kubernetes.namespace,
+                label_selector='function_id=%s' % function_id
+            )
+            if len(ret.items) > 0:
+                return ret.items[:count]
+
         ret = self.v1.list_namespaced_pod(
             self.conf.kubernetes.namespace,
             label_selector='!function_id,%s' % selector
@@ -356,10 +365,11 @@ class KubernetesManager(base.OrchestratorBase):
             self._create_pod(image, identifier, labels, input)
             return identifier, None
         else:
-            pod = self._choose_available_pod(labels)
+            pod = self._choose_available_pod(labels, function_id=function_id)
 
         if not pod:
-            raise exc.OrchestratorException('No worker available.')
+            LOG.critical('No worker available.')
+            raise exc.OrchestratorException('Execution preparation failed.')
 
         try:
             pod_name, url = self._prepare_pod(
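Pod selection in _choose_available_pod is now a two-step lookup: pods already
labelled with this function's function_id are reused first, and only then does
the manager fall back to unlabelled pods from the shared runtime pool. A
standalone sketch of the two selectors, assuming the official kubernetes Python
client, an illustrative 'qinling' namespace, and a runtime_id label (the real
code uses self.conf.kubernetes.namespace and the labels passed in by the
caller; function_id and runtime_id are placeholders):

    from kubernetes import client, config

    config.load_kube_config()
    v1 = client.CoreV1Api()

    # Pods already dedicated to this function: reuse them if any exist.
    dedicated = v1.list_namespaced_pod(
        'qinling', label_selector='function_id=%s' % function_id)

    # Otherwise pick from pods that have no function_id label yet,
    # narrowed by the runtime's own labels (selector shown is illustrative).
    free = v1.list_namespaced_pod(
        'qinling', label_selector='!function_id,runtime_id=%s' % runtime_id)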