diff --git a/qinling/config.py b/qinling/config.py
index 816bb4e6..47fd5e44 100644
--- a/qinling/config.py
+++ b/qinling/config.py
@@ -98,6 +98,16 @@ engine_opts = [
         'function_service_expiration',
         default=300,
         help='Maximum service time in seconds for function in orchestrator.'
+    ),
+    cfg.IntOpt(
+        'function_concurrency',
+        default=10,
+        help='Maximum number of concurrent executions per function.'
+    ),
+    cfg.BoolOpt(
+        'enable_autoscaling',
+        default=True,
+        help='Enables autoscaling capability for function execution.'
     )
 ]

diff --git a/qinling/db/api.py b/qinling/db/api.py
index 16567137..50f090d3 100644
--- a/qinling/db/api.py
+++ b/qinling/db/api.py
@@ -178,6 +178,10 @@ def get_function_workers(function_id):
     return IMPL.get_function_workers(function_id)


+def acquire_worker_lock(id):
+    return IMPL.acquire_worker_lock(id)
+
+
 def create_job(values):
     return IMPL.create_job(values)

diff --git a/qinling/db/sqlalchemy/api.py b/qinling/db/sqlalchemy/api.py
index 49dc0755..0cf3816a 100644
--- a/qinling/db/sqlalchemy/api.py
+++ b/qinling/db/sqlalchemy/api.py
@@ -27,6 +27,7 @@ from qinling.db import base as db_base
 from qinling.db.sqlalchemy import filters as db_filters
 from qinling.db.sqlalchemy import model_base
 from qinling.db.sqlalchemy import models
+from qinling.db.sqlalchemy import sqlite_lock
 from qinling import exceptions as exc
 from qinling import status

@@ -449,6 +450,21 @@ def delete_function_workers(function_id, session=None):
         session.delete(worker)


+@db_base.session_aware()
+def acquire_worker_lock(function_id, session=None):
+    # Expire all so all objects queried after lock is acquired
+    # will be up-to-date from the DB and not from cache.
+    session.expire_all()
+
+    if db_base.get_driver_name() == 'sqlite':
+        # In case of 'sqlite' we need to apply a manual lock.
+        sqlite_lock.acquire_lock(function_id, session)
+
+    return _secure_query(
+        models.FunctionWorkers).with_for_update().filter(
+        models.FunctionWorkers.function_id == function_id).all()
+
+
 @db_base.session_aware()
 def create_job(values, session=None):
     job = models.Job()
diff --git a/qinling/engine/default_engine.py b/qinling/engine/default_engine.py
index bb22c8a5..3189ae7b 100644
--- a/qinling/engine/default_engine.py
+++ b/qinling/engine/default_engine.py
@@ -93,6 +93,9 @@ class DefaultEngine(object):
             execution_id, function_id, runtime_id, input
         )

+        if CONF.engine.enable_autoscaling:
+            self.function_load_check(function_id, runtime_id)
+
         # FIXME(kong): Make the transaction scope smaller.
         with db_api.transaction():
             execution = db_api.get_execution(execution_id)
@@ -225,13 +228,12 @@
             count=count
         )

-        with db_api.transaction():
-            for name in worker_names:
-                worker = {
-                    'function_id': function_id,
-                    'worker_name': name
-                }
-                db_api.create_function_worker(worker)
+        for name in worker_names:
+            worker = {
+                'function_id': function_id,
+                'worker_name': name
+            }
+            db_api.create_function_worker(worker)

         LOG.info('Finished scaling up function %s.', function_id)

@@ -251,3 +253,24 @@
             db_api.delete_function_worker(worker.worker_name)

         LOG.info('Finished scaling up function %s.', function_id)
+
+    def function_load_check(self, function_id, runtime_id):
+        with db_api.transaction():
+            db_api.acquire_worker_lock(function_id)
+
+            running_execs = db_api.get_executions(
+                function_id=function_id, status=status.RUNNING
+            )
+            workers = db_api.get_function_workers(function_id)
+
+            concurrency = (len(running_execs) or 1) / (len(workers) or 1)
+
+            if concurrency > CONF.engine.function_concurrency:
+                LOG.warning(
+                    'Scale up function %s because of high concurrency, current'
+                    ' concurrency: %s',
+                    function_id, concurrency
+                )
+
+                # TODO(kong): The increase step could be configurable
+                self.scaleup_function(None, function_id, runtime_id, 1)
diff --git a/qinling/orchestrator/kubernetes/manager.py b/qinling/orchestrator/kubernetes/manager.py
index ded947f9..80041397 100644
--- a/qinling/orchestrator/kubernetes/manager.py
+++ b/qinling/orchestrator/kubernetes/manager.py
@@ -220,9 +220,18 @@

         return True

-    def _choose_available_pod(self, labels, count=1):
+    def _choose_available_pod(self, labels, count=1, function_id=None):
         selector = common.convert_dict_to_string(labels)

+        # If there is already a pod for function, reuse it.
+        if function_id:
+            ret = self.v1.list_namespaced_pod(
+                self.conf.kubernetes.namespace,
+                label_selector='function_id=%s' % function_id
+            )
+            if len(ret.items) > 0:
+                return ret.items[:count]
+
         ret = self.v1.list_namespaced_pod(
             self.conf.kubernetes.namespace,
             label_selector='!function_id,%s' % selector
@@ -356,10 +365,11 @@
             self._create_pod(image, identifier, labels, input)
             return identifier, None
         else:
-            pod = self._choose_available_pod(labels)
+            pod = self._choose_available_pod(labels, function_id=function_id)

             if not pod:
-                raise exc.OrchestratorException('No worker available.')
+                LOG.critical('No worker available.')
+                raise exc.OrchestratorException('Execution preparation failed.')

             try:
                 pod_name, url = self._prepare_pod(
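
For reference, both new options are registered in the engine options group, so a deployment would tune them under [engine] in the service configuration file (typically qinling.conf). A minimal sketch, using the defaults introduced by this patch:

    [engine]
    # Skip the per-execution load check entirely when set to False.
    enable_autoscaling = True
    # Running executions allowed per worker before the engine scales up.
    function_concurrency = 10

With these defaults, function_load_check takes the per-function worker lock, divides the number of RUNNING executions by the number of workers, and calls scaleup_function for one extra pod whenever that ratio exceeds 10.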