From c74ac2faac412c76add41b9654577cf1ebda71a8 Mon Sep 17 00:00:00 2001 From: Lingxian Kong Date: Mon, 28 Aug 2017 15:51:12 +1200 Subject: [PATCH] Add scale up ability for function execution Provide an API to perform the scale-up operation, leaving container monitoring to the underlying orchestrator. Although k8s already supports HPA, we should not rely on a specific container orchestrator's capabilities for general-purpose use. Partially implements: blueprint qingling-autoscaling Change-Id: Iff1ff646a6df263b2770b8cebc74e80ab18c7613 --- qinling/api/controllers/v1/function.py | 23 +++ qinling/config.py | 5 + qinling/db/api.py | 4 + qinling/db/sqlalchemy/api.py | 15 +++ .../alembic_migrations/versions/001_pike.py | 93 +++++++++++-------- qinling/db/sqlalchemy/models.py | 25 ++++- qinling/engine/default_engine.py | 35 +++++-- qinling/orchestrator/base.py | 4 + qinling/orchestrator/kubernetes/manager.py | 88 +++++++++++++----- qinling/rpc.py | 9 ++ 10 files changed, 229 insertions(+), 72 deletions(-) diff --git a/qinling/api/controllers/v1/function.py b/qinling/api/controllers/v1/function.py index 8f80e534..920d1247 100644 --- a/qinling/api/controllers/v1/function.py +++ b/qinling/api/controllers/v1/function.py @@ -43,6 +43,11 @@ UPDATE_ALLOWED = set(['name', 'description', 'entry']) class FunctionsController(rest.RestController): + _custom_actions = { + 'scale_up': ['POST'], + 'scale_down': ['POST'], + } + def __init__(self, *args, **kwargs): self.storage_provider = storage_base.load_storage_provider(CONF) self.engine_client = rpc.get_engine_client() @@ -216,3 +221,21 @@ class FunctionsController(rest.RestController): self.engine_client.delete_function(id) return resources.Function.from_dict(func_db.to_dict()) + + @rest_utils.wrap_wsme_controller_exception + @wsme_pecan.wsexpose( + None, + types.uuid, + status_code=202 + ) + def scale_up(self, id): + """Scale up the containers for function execution. + + This is an admin-only operation. 
The number of added containers is defined + in config file. + """ + func_db = db_api.get_function(id) + + LOG.info('Starting to scale up function %s', id) + + self.engine_client.scaleup_function(id, runtime_id=func_db.runtime_id) diff --git a/qinling/config.py b/qinling/config.py index 1b5695c9..be9ae366 100644 --- a/qinling/config.py +++ b/qinling/config.py @@ -132,6 +132,11 @@ kubernetes_opts = [ 'qinling_service_address', help='Qinling API service ip address.' ), + cfg.IntOpt( + 'scale_step', + default=1, + help='Number of pods for function scale up.' + ), ] CONF = cfg.CONF diff --git a/qinling/db/api.py b/qinling/db/api.py index d70125cd..614c1305 100644 --- a/qinling/db/api.py +++ b/qinling/db/api.py @@ -162,6 +162,10 @@ def delete_function_service_mapping(id): return IMPL.delete_function_service_mapping(id) +def create_function_worker(values): + return IMPL.create_function_worker(values) + + def create_job(values): return IMPL.create_job(values) diff --git a/qinling/db/sqlalchemy/api.py b/qinling/db/sqlalchemy/api.py index 6a36cd7a..9527ad96 100644 --- a/qinling/db/sqlalchemy/api.py +++ b/qinling/db/sqlalchemy/api.py @@ -395,6 +395,21 @@ def delete_function_service_mapping(id, session=None): session.delete(mapping) +@db_base.session_aware() +def create_function_worker(values, session=None): + mapping = models.FunctionWorkers() + mapping.update(values.copy()) + + try: + mapping.save(session=session) + except oslo_db_exc.DBDuplicateEntry as e: + raise exc.DBError( + "Duplicate entry for FunctionWorkers: %s" % e.columns + ) + + return mapping + + @db_base.session_aware() def create_job(values, session=None): job = models.Job() diff --git a/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py b/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py index 88608b95..a7c8bcc5 100644 --- a/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py +++ 
b/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py @@ -45,43 +45,7 @@ def _add_if_not_exists(element, compiler, **kw): def upgrade(): op.create_table( - 'function', - sa.Column('created_at', sa.DateTime(), nullable=True), - sa.Column('updated_at', sa.DateTime(), nullable=True), - sa.Column('project_id', sa.String(length=80), nullable=False), - sa.Column('id', sa.String(length=36), nullable=False), - sa.Column('name', sa.String(length=255), nullable=True), - sa.Column('description', sa.String(length=255), nullable=True), - sa.Column('runtime_id', sa.String(length=36), nullable=True), - sa.Column('memory_size', sa.Integer, nullable=True), - sa.Column('timeout', sa.Integer, nullable=True), - sa.Column('code', st.JsonLongDictType(), nullable=False), - sa.Column('entry', sa.String(length=80), nullable=False), - sa.Column('count', sa.Integer, nullable=False), - sa.PrimaryKeyConstraint('id'), - sa.UniqueConstraint('name', 'project_id'), - sa.ForeignKeyConstraint(['runtime_id'], [u'runtime.id']), - info={"check_ifexists": True} - ) - - op.create_table( - 'function_service_mapping', - sa.Column('id', sa.String(length=36), nullable=False), - sa.Column('created_at', sa.DateTime(), nullable=True), - sa.Column('updated_at', sa.DateTime(), nullable=True), - sa.Column('function_id', sa.String(length=36), nullable=False), - sa.Column('service_url', sa.String(length=255), nullable=False), - sa.Column('worker_name', sa.String(length=255), nullable=False), - sa.PrimaryKeyConstraint('id'), - sa.UniqueConstraint('function_id', 'service_url'), - sa.ForeignKeyConstraint( - ['function_id'], [u'function.id'], ondelete='CASCADE' - ), - info={"check_ifexists": True} - ) - - op.create_table( - 'runtime', + 'runtimes', sa.Column('created_at', sa.DateTime(), nullable=True), sa.Column('updated_at', sa.DateTime(), nullable=True), sa.Column('project_id', sa.String(length=80), nullable=False), @@ -96,7 +60,56 @@ def upgrade(): ) op.create_table( - 'execution', + 'functions', 
+ sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.Column('project_id', sa.String(length=80), nullable=False), + sa.Column('id', sa.String(length=36), nullable=False), + sa.Column('name', sa.String(length=255), nullable=True), + sa.Column('description', sa.String(length=255), nullable=True), + sa.Column('runtime_id', sa.String(length=36), nullable=True), + sa.Column('memory_size', sa.Integer, nullable=True), + sa.Column('timeout', sa.Integer, nullable=True), + sa.Column('code', st.JsonLongDictType(), nullable=False), + sa.Column('entry', sa.String(length=80), nullable=False), + sa.Column('count', sa.Integer, nullable=False), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('name', 'project_id'), + sa.ForeignKeyConstraint(['runtime_id'], [u'runtimes.id']), + info={"check_ifexists": True} + ) + + op.create_table( + 'function_service_mappings', + sa.Column('id', sa.String(length=36), nullable=False), + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.Column('function_id', sa.String(length=36), nullable=False), + sa.Column('service_url', sa.String(length=255), nullable=False), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('function_id', 'service_url'), + sa.ForeignKeyConstraint( + ['function_id'], [u'functions.id'], ondelete='CASCADE' + ), + info={"check_ifexists": True} + ) + + op.create_table( + 'function_workers', + sa.Column('id', sa.String(length=36), nullable=False), + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.Column('function_id', sa.String(length=36), nullable=False), + sa.Column('worker_name', sa.String(length=255), nullable=False), + sa.PrimaryKeyConstraint('id'), + sa.ForeignKeyConstraint( + ['function_id'], [u'functions.id'], ondelete='CASCADE' + ), + info={"check_ifexists": True} + ) + + op.create_table( + 'executions', 
sa.Column('created_at', sa.DateTime(), nullable=True), sa.Column('updated_at', sa.DateTime(), nullable=True), sa.Column('project_id', sa.String(length=80), nullable=False), @@ -113,7 +126,7 @@ def upgrade(): ) op.create_table( - 'job', + 'jobs', sa.Column('created_at', sa.DateTime(), nullable=True), sa.Column('updated_at', sa.DateTime(), nullable=True), sa.Column('project_id', sa.String(length=80), nullable=False), @@ -128,6 +141,6 @@ def upgrade(): sa.Column('count', sa.Integer(), nullable=True), sa.Column('trust_id', sa.String(length=80), nullable=True), sa.PrimaryKeyConstraint('id'), - sa.ForeignKeyConstraint(['function_id'], [u'function.id']), + sa.ForeignKeyConstraint(['function_id'], [u'functions.id']), info={"check_ifexists": True} ) diff --git a/qinling/db/sqlalchemy/models.py b/qinling/db/sqlalchemy/models.py index 4c5cca8f..5df35be5 100644 --- a/qinling/db/sqlalchemy/models.py +++ b/qinling/db/sqlalchemy/models.py @@ -21,7 +21,7 @@ from qinling.utils import common class Runtime(model_base.QinlingSecureModelBase): - __tablename__ = 'runtime' + __tablename__ = 'runtimes' __table_args__ = ( sa.UniqueConstraint('image', 'project_id'), @@ -34,7 +34,7 @@ class Runtime(model_base.QinlingSecureModelBase): class Function(model_base.QinlingSecureModelBase): - __tablename__ = 'function' + __tablename__ = 'functions' __table_args__ = ( sa.UniqueConstraint('name', 'project_id'), @@ -54,7 +54,7 @@ class Function(model_base.QinlingSecureModelBase): class FunctionServiceMapping(model_base.QinlingModelBase): - __tablename__ = 'function_service_mapping' + __tablename__ = 'function_service_mappings' __table_args__ = ( sa.UniqueConstraint('function_id', 'service_url'), @@ -66,11 +66,21 @@ class FunctionServiceMapping(model_base.QinlingModelBase): sa.ForeignKey(Function.id, ondelete='CASCADE'), ) service_url = sa.Column(sa.String(255), nullable=False) + + +class FunctionWorkers(model_base.QinlingModelBase): + __tablename__ = 'function_workers' + + id = model_base.id_column() 
+ function_id = sa.Column( + sa.String(36), + sa.ForeignKey(Function.id, ondelete='CASCADE'), + ) worker_name = sa.Column(sa.String(255), nullable=False) class Execution(model_base.QinlingSecureModelBase): - __tablename__ = 'execution' + __tablename__ = 'executions' function_id = sa.Column(sa.String(36), nullable=False) status = sa.Column(sa.String(32), nullable=False) @@ -82,7 +92,7 @@ class Execution(model_base.QinlingSecureModelBase): class Job(model_base.QinlingSecureModelBase): - __tablename__ = 'job' + __tablename__ = 'jobs' name = sa.Column(sa.String(255), nullable=True) pattern = sa.Column(sa.String(32), nullable=True) @@ -111,6 +121,11 @@ Function.service = relationship( uselist=False, cascade="all, delete-orphan" ) +# Delete workers automatically when deleting function. +Function.workers = relationship( + "FunctionWorkers", + cascade="all, delete-orphan" +) Runtime.functions = relationship("Function", back_populates="runtime") diff --git a/qinling/engine/default_engine.py b/qinling/engine/default_engine.py index 8ca5a423..01f07828 100644 --- a/qinling/engine/default_engine.py +++ b/qinling/engine/default_engine.py @@ -103,16 +103,16 @@ class DefaultEngine(object): data = {'input': input, 'execution_id': execution_id} r = requests.post(func_url, json=data) - logs = self.orchestrator.get_execution_log( - execution_id, - worker_name=function.service.worker_name, - ) + # logs = self.orchestrator.get_execution_log( + # execution_id, + # worker_name=function.service.worker_name, + # ) LOG.debug('Finished execution %s', execution_id) execution.status = status.SUCCESS execution.output = r.json() - execution.logs = logs + # execution.logs = logs return source = function.code['source'] @@ -169,9 +169,13 @@ class DefaultEngine(object): mapping = { 'function_id': function_id, 'service_url': service_url, - 'worker_name': worker_name } db_api.create_function_service_mapping(mapping) + worker = { + 'function_id': function_id, + 'worker_name': worker_name + } + 
db_api.create_function_worker(worker) def delete_function(self, ctx, function_id): resource = {'type': 'function', 'id': function_id} @@ -181,3 +185,22 @@ class DefaultEngine(object): self.orchestrator.delete_function(function_id, labels=labels) LOG.info('Deleted.', resource=resource) + + def scaleup_function(self, ctx, function_id, runtime_id): + function = db_api.get_function(function_id) + + worker_names = self.orchestrator.scaleup_function( + function_id, + identifier=runtime_id, + entry=function.entry + ) + + with db_api.transaction(): + for name in worker_names: + worker = { + 'function_id': function_id, + 'worker_name': name + } + db_api.create_function_worker(worker) + + LOG.info('Finished scaling up function %s.', function_id) diff --git a/qinling/orchestrator/base.py b/qinling/orchestrator/base.py index d7432fbe..f6c1e77b 100644 --- a/qinling/orchestrator/base.py +++ b/qinling/orchestrator/base.py @@ -54,6 +54,10 @@ class OrchestratorBase(object): def delete_function(self, function_id, **kwargs): raise NotImplementedError + @abc.abstractmethod + def scaleup_function(self, function_id, **kwargs): + raise NotImplementedError + def load_orchestrator(conf): global ORCHESTRATOR diff --git a/qinling/orchestrator/kubernetes/manager.py b/qinling/orchestrator/kubernetes/manager.py index 4afa3118..ed6793c6 100644 --- a/qinling/orchestrator/kubernetes/manager.py +++ b/qinling/orchestrator/kubernetes/manager.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import copy import os import time @@ -195,7 +196,7 @@ class KubernetesManager(base.OrchestratorBase): return True - def _choose_available_pod(self, labels): + def _choose_available_pod(self, labels, count=1): selector = common.convert_dict_to_string(labels) ret = self.v1.list_namespaced_pod( @@ -206,18 +207,17 @@ class KubernetesManager(base.OrchestratorBase): if len(ret.items) == 0: return None - # Choose the last available one by default. 
- pod = ret.items[-1] + return ret.items[-count:] - return pod - - def _prepare_pod(self, pod, deployment_name, function_id, labels, entry): + def _prepare_pod(self, pod, deployment_name, function_id, labels=None, + entry=None, actual_function=None): """Pod preparation. 1. Update pod labels. 2. Expose service and trigger package download. """ name = pod.metadata.name + actual_function = actual_function or function_id LOG.info( 'Prepare pod %s in deployment %s for function %s', @@ -225,18 +225,7 @@ class KubernetesManager(base.OrchestratorBase): ) # Update pod label. - pod_labels = pod.metadata.labels or {} - pod_labels.update({'function_id': function_id}) - body = { - 'metadata': { - 'labels': pod_labels - } - } - self.v1.patch_namespaced_pod( - name, self.conf.kubernetes.namespace, body - ) - - LOG.debug('Labels updated for pod %s', name) + pod_labels = self._update_pod_label(pod, {'function_id': function_id}) # Create service for the chosen pod. service_name = "service-%s" % function_id @@ -278,12 +267,12 @@ class KubernetesManager(base.OrchestratorBase): download_url = ( 'http://%s:%s/v1/functions/%s?download=true' % (self.conf.kubernetes.qinling_service_address, - self.conf.api.port, function_id) + self.conf.api.port, actual_function) ) data = { 'download_url': download_url, - 'function_id': function_id, + 'function_id': actual_function, 'entry': entry, 'token': context.get_ctx().auth_token, } @@ -325,6 +314,24 @@ class KubernetesManager(base.OrchestratorBase): body=yaml.safe_load(pod_body), ) + def _update_pod_label(self, pod, new_label=None): + name = pod.metadata.name + + pod_labels = copy.deepcopy(pod.metadata.labels) or {} + pod_labels.update(new_label) + body = { + 'metadata': { + 'labels': pod_labels + } + } + self.v1.patch_namespaced_pod( + name, self.conf.kubernetes.namespace, body + ) + + LOG.debug('Labels updated for pod %s', name) + + return pod_labels + def prepare_execution(self, function_id, image=None, identifier=None, labels=None, input=None, 
entry='main.main'): """Prepare service URL for function. @@ -346,7 +353,8 @@ class KubernetesManager(base.OrchestratorBase): if not pod: raise exc.OrchestratorException('No pod available.') - return self._prepare_pod(pod, identifier, function_id, labels, entry) + return self._prepare_pod(pod[0], identifier, function_id, labels, + entry) def run_execution(self, execution_id, function_id, input=None, identifier=None, service_url=None): @@ -415,3 +423,41 @@ class KubernetesManager(base.OrchestratorBase): ) LOG.info("Pod for function %s deleted.", function_id) + + def scaleup_function(self, function_id, identifier=None, + entry='main.main'): + pod_names = [] + labels = {'runtime_id': identifier} + pods = self._choose_available_pod( + labels, count=self.conf.kubernetes.scale_step + ) + + if not pods: + raise exc.OrchestratorException('Not enough pods available.') + + temp_function = '%s-temp' % function_id + for pod in pods: + self._prepare_pod(pod, identifier, temp_function, labels, entry, + actual_function=function_id) + + # Delete temporary service + selector = common.convert_dict_to_string( + {'function_id': temp_function} + ) + ret = self.v1.list_namespaced_service( + self.conf.kubernetes.namespace, label_selector=selector + ) + svc_names = [i.metadata.name for i in ret.items] + for svc_name in svc_names: + self.v1.delete_namespaced_service( + svc_name, + self.conf.kubernetes.namespace, + ) + + # Modify pod labels to fit into correct service + self._update_pod_label(pod, {'function_id': function_id}) + + pod_names.append(pod.metadata.name) + + LOG.info('Pods scaled up for function %s: %s', function_id, pod_names) + return pod_names diff --git a/qinling/rpc.py b/qinling/rpc.py index e8835450..ea33c9c2 100644 --- a/qinling/rpc.py +++ b/qinling/rpc.py @@ -186,3 +186,12 @@ class EngineClient(object): 'delete_function', function_id=id ) + + @wrap_messaging_exception + def scaleup_function(self, id, runtime_id): + return self._client.prepare(topic=self.topic, 
server=None).cast( + ctx.get_ctx(), + 'scaleup_function', + function_id=id, + runtime_id=runtime_id, + )