From 7ba5676ec8a9e1ddad16f6ba25ad52609309f88a Mon Sep 17 00:00:00 2001 From: Lingxian Kong Date: Thu, 7 Dec 2017 22:38:13 +1300 Subject: [PATCH] Function execution concurrency improvement Increase pod based on the execution rate in qinling-engine so it's qinling's responsibility to decide when and how to autoscale pod allocation. This behavior is configurable, the admin user can also scale up the pod via qinling API manually. Change-Id: Ie0b01481405daf10f495fa9d6389a624a82f0385 Implements: blueprint qingling-autoscaling --- qinling/config.py | 10 ++++++ qinling/db/api.py | 4 +++ qinling/db/sqlalchemy/api.py | 16 ++++++++++ qinling/engine/default_engine.py | 37 ++++++++++++++++++---- qinling/orchestrator/kubernetes/manager.py | 16 ++++++++-- 5 files changed, 73 insertions(+), 10 deletions(-) diff --git a/qinling/config.py b/qinling/config.py index 816bb4e6..47fd5e44 100644 --- a/qinling/config.py +++ b/qinling/config.py @@ -98,6 +98,16 @@ engine_opts = [ 'function_service_expiration', default=300, help='Maximum service time in seconds for function in orchestrator.' + ), + cfg.IntOpt( + 'function_concurrency', + default=10, + help='Maximum number of concurrent executions per function.' + ), + cfg.BoolOpt( + 'enable_autoscaling', + default=True, + help='Enables autoscaling capability for function execution.' ) ] diff --git a/qinling/db/api.py b/qinling/db/api.py index 16567137..50f090d3 100644 --- a/qinling/db/api.py +++ b/qinling/db/api.py @@ -178,6 +178,10 @@ def get_function_workers(function_id): return IMPL.get_function_workers(function_id) +def acquire_worker_lock(id): + return IMPL.acquire_worker_lock(id) + + def create_job(values): return IMPL.create_job(values) diff --git a/qinling/db/sqlalchemy/api.py b/qinling/db/sqlalchemy/api.py index 49dc0755..0cf3816a 100644 --- a/qinling/db/sqlalchemy/api.py +++ b/qinling/db/sqlalchemy/api.py @@ -27,6 +27,7 @@ from qinling.db import base as db_base from qinling.db.sqlalchemy import filters as db_filters from qinling.db.sqlalchemy import model_base from qinling.db.sqlalchemy import models +from qinling.db.sqlalchemy import sqlite_lock from qinling import exceptions as exc from qinling import status @@ -449,6 +450,21 @@ def delete_function_workers(function_id, session=None): session.delete(worker) +@db_base.session_aware() +def acquire_worker_lock(function_id, session=None): + # Expire all so all objects queried after lock is acquired + # will be up-to-date from the DB and not from cache. + session.expire_all() + + if db_base.get_driver_name() == 'sqlite': + # In case of 'sqlite' we need to apply a manual lock. + sqlite_lock.acquire_lock(function_id, session) + + return _secure_query( + models.FunctionWorkers).with_for_update().filter( + models.FunctionWorkers.function_id == function_id).all() + + @db_base.session_aware() def create_job(values, session=None): job = models.Job() diff --git a/qinling/engine/default_engine.py b/qinling/engine/default_engine.py index bb22c8a5..3189ae7b 100644 --- a/qinling/engine/default_engine.py +++ b/qinling/engine/default_engine.py @@ -93,6 +93,9 @@ class DefaultEngine(object): execution_id, function_id, runtime_id, input ) + if CONF.engine.enable_autoscaling: + self.function_load_check(function_id, runtime_id) + # FIXME(kong): Make the transaction scope smaller. with db_api.transaction(): execution = db_api.get_execution(execution_id) @@ -225,13 +228,12 @@ class DefaultEngine(object): count=count ) - with db_api.transaction(): - for name in worker_names: - worker = { - 'function_id': function_id, - 'worker_name': name - } - db_api.create_function_worker(worker) + for name in worker_names: + worker = { + 'function_id': function_id, + 'worker_name': name + } + db_api.create_function_worker(worker) LOG.info('Finished scaling up function %s.', function_id) @@ -251,3 +253,24 @@ class DefaultEngine(object): db_api.delete_function_worker(worker.worker_name) LOG.info('Finished scaling up function %s.', function_id) + + def function_load_check(self, function_id, runtime_id): + with db_api.transaction(): + db_api.acquire_worker_lock(function_id) + + running_execs = db_api.get_executions( + function_id=function_id, status=status.RUNNING + ) + workers = db_api.get_function_workers(function_id) + + concurrency = (len(running_execs) or 1) / (len(workers) or 1) + + if concurrency > CONF.engine.function_concurrency: + LOG.warning( + 'Scale up function %s because of high concurrency, current' + ' concurrency: %s', + function_id, concurrency + ) + + # TODO(kong): The inscrease step could be configurable + self.scaleup_function(None, function_id, runtime_id, 1) diff --git a/qinling/orchestrator/kubernetes/manager.py b/qinling/orchestrator/kubernetes/manager.py index ded947f9..80041397 100644 --- a/qinling/orchestrator/kubernetes/manager.py +++ b/qinling/orchestrator/kubernetes/manager.py @@ -220,9 +220,18 @@ class KubernetesManager(base.OrchestratorBase): return True - def _choose_available_pod(self, labels, count=1): + def _choose_available_pod(self, labels, count=1, function_id=None): selector = common.convert_dict_to_string(labels) + # If there is already a pod for function, reuse it. + if function_id: + ret = self.v1.list_namespaced_pod( + self.conf.kubernetes.namespace, + label_selector='function_id=%s' % function_id + ) + if len(ret.items) > 0: + return ret.items[:count] + ret = self.v1.list_namespaced_pod( self.conf.kubernetes.namespace, label_selector='!function_id,%s' % selector @@ -356,10 +365,11 @@ class KubernetesManager(base.OrchestratorBase): self._create_pod(image, identifier, labels, input) return identifier, None else: - pod = self._choose_available_pod(labels) + pod = self._choose_available_pod(labels, function_id=function_id) if not pod: - raise exc.OrchestratorException('No worker available.') + LOG.critical('No worker available.') + raise exc.OrchestratorException('Execution preparation failed.') try: pod_name, url = self._prepare_pod(