diff --git a/qinling/api/controllers/v1/function.py b/qinling/api/controllers/v1/function.py index 8f80e534..920d1247 100644 --- a/qinling/api/controllers/v1/function.py +++ b/qinling/api/controllers/v1/function.py @@ -43,6 +43,11 @@ UPDATE_ALLOWED = set(['name', 'description', 'entry']) class FunctionsController(rest.RestController): + _custom_actions = { + 'scale_up': ['POST'], + 'scale_down': ['POST'], + } + def __init__(self, *args, **kwargs): self.storage_provider = storage_base.load_storage_provider(CONF) self.engine_client = rpc.get_engine_client() @@ -216,3 +221,21 @@ class FunctionsController(rest.RestController): self.engine_client.delete_function(id) return resources.Function.from_dict(func_db.to_dict()) + + @rest_utils.wrap_wsme_controller_exception + @wsme_pecan.wsexpose( + None, + types.uuid, + status_code=202 + ) + def scale_up(self, id): + """Scale up the containers for function execution. + + This is admin only operation. The number of added containers is defined + in config file. + """ + func_db = db_api.get_function(id) + + LOG.info('Starting to scale up function %s', id) + + self.engine_client.scaleup_function(id, runtime_id=func_db.runtime_id) diff --git a/qinling/config.py b/qinling/config.py index 1b5695c9..be9ae366 100644 --- a/qinling/config.py +++ b/qinling/config.py @@ -132,6 +132,11 @@ kubernetes_opts = [ 'qinling_service_address', help='Qinling API service ip address.' ), + cfg.IntOpt( + 'scale_step', + default=1, + help='Number of pods for function scale up.' + ), ] CONF = cfg.CONF diff --git a/qinling/db/api.py b/qinling/db/api.py index d70125cd..614c1305 100644 --- a/qinling/db/api.py +++ b/qinling/db/api.py @@ -162,6 +162,10 @@ def delete_function_service_mapping(id): return IMPL.delete_function_service_mapping(id) +def create_function_worker(values): + return IMPL.create_function_worker(values) + + def create_job(values): return IMPL.create_job(values) diff --git a/qinling/db/sqlalchemy/api.py b/qinling/db/sqlalchemy/api.py index 6a36cd7a..9527ad96 100644 --- a/qinling/db/sqlalchemy/api.py +++ b/qinling/db/sqlalchemy/api.py @@ -395,6 +395,21 @@ def delete_function_service_mapping(id, session=None): session.delete(mapping) +@db_base.session_aware() +def create_function_worker(values, session=None): + mapping = models.FunctionWorkers() + mapping.update(values.copy()) + + try: + mapping.save(session=session) + except oslo_db_exc.DBDuplicateEntry as e: + raise exc.DBError( + "Duplicate entry for FunctionWorkers: %s" % e.columns + ) + + return mapping + + @db_base.session_aware() def create_job(values, session=None): job = models.Job() diff --git a/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py b/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py index 88608b95..a7c8bcc5 100644 --- a/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py +++ b/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py @@ -45,43 +45,7 @@ def _add_if_not_exists(element, compiler, **kw): def upgrade(): op.create_table( - 'function', - sa.Column('created_at', sa.DateTime(), nullable=True), - sa.Column('updated_at', sa.DateTime(), nullable=True), - sa.Column('project_id', sa.String(length=80), nullable=False), - sa.Column('id', sa.String(length=36), nullable=False), - sa.Column('name', sa.String(length=255), nullable=True), - sa.Column('description', sa.String(length=255), nullable=True), - sa.Column('runtime_id', sa.String(length=36), nullable=True), - sa.Column('memory_size', sa.Integer, nullable=True), - sa.Column('timeout', sa.Integer, nullable=True), - sa.Column('code', st.JsonLongDictType(), nullable=False), - sa.Column('entry', sa.String(length=80), nullable=False), - sa.Column('count', sa.Integer, nullable=False), - sa.PrimaryKeyConstraint('id'), - sa.UniqueConstraint('name', 'project_id'), - sa.ForeignKeyConstraint(['runtime_id'], [u'runtime.id']), - info={"check_ifexists": True} - ) - - op.create_table( - 'function_service_mapping', - sa.Column('id', sa.String(length=36), nullable=False), - sa.Column('created_at', sa.DateTime(), nullable=True), - sa.Column('updated_at', sa.DateTime(), nullable=True), - sa.Column('function_id', sa.String(length=36), nullable=False), - sa.Column('service_url', sa.String(length=255), nullable=False), - sa.Column('worker_name', sa.String(length=255), nullable=False), - sa.PrimaryKeyConstraint('id'), - sa.UniqueConstraint('function_id', 'service_url'), - sa.ForeignKeyConstraint( - ['function_id'], [u'function.id'], ondelete='CASCADE' - ), - info={"check_ifexists": True} - ) - - op.create_table( - 'runtime', + 'runtimes', sa.Column('created_at', sa.DateTime(), nullable=True), sa.Column('updated_at', sa.DateTime(), nullable=True), sa.Column('project_id', sa.String(length=80), nullable=False), @@ -96,7 +60,56 @@ def upgrade(): ) op.create_table( - 'execution', + 'functions', + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.Column('project_id', sa.String(length=80), nullable=False), + sa.Column('id', sa.String(length=36), nullable=False), + sa.Column('name', sa.String(length=255), nullable=True), + sa.Column('description', sa.String(length=255), nullable=True), + sa.Column('runtime_id', sa.String(length=36), nullable=True), + sa.Column('memory_size', sa.Integer, nullable=True), + sa.Column('timeout', sa.Integer, nullable=True), + sa.Column('code', st.JsonLongDictType(), nullable=False), + sa.Column('entry', sa.String(length=80), nullable=False), + sa.Column('count', sa.Integer, nullable=False), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('name', 'project_id'), + sa.ForeignKeyConstraint(['runtime_id'], [u'runtimes.id']), + info={"check_ifexists": True} + ) + + op.create_table( + 'function_service_mappings', + sa.Column('id', sa.String(length=36), nullable=False), + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.Column('function_id', sa.String(length=36), nullable=False), + sa.Column('service_url', sa.String(length=255), nullable=False), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('function_id', 'service_url'), + sa.ForeignKeyConstraint( + ['function_id'], [u'functions.id'], ondelete='CASCADE' + ), + info={"check_ifexists": True} + ) + + op.create_table( + 'function_workers', + sa.Column('id', sa.String(length=36), nullable=False), + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.Column('function_id', sa.String(length=36), nullable=False), + sa.Column('worker_name', sa.String(length=255), nullable=False), + sa.PrimaryKeyConstraint('id'), + sa.ForeignKeyConstraint( + ['function_id'], [u'functions.id'], ondelete='CASCADE' + ), + info={"check_ifexists": True} + ) + + op.create_table( + 'executions', sa.Column('created_at', sa.DateTime(), nullable=True), sa.Column('updated_at', sa.DateTime(), nullable=True), sa.Column('project_id', sa.String(length=80), nullable=False), @@ -113,7 +126,7 @@ def upgrade(): ) op.create_table( - 'job', + 'jobs', sa.Column('created_at', sa.DateTime(), nullable=True), sa.Column('updated_at', sa.DateTime(), nullable=True), sa.Column('project_id', sa.String(length=80), nullable=False), @@ -128,6 +141,6 @@ def upgrade(): sa.Column('count', sa.Integer(), nullable=True), sa.Column('trust_id', sa.String(length=80), nullable=True), sa.PrimaryKeyConstraint('id'), - sa.ForeignKeyConstraint(['function_id'], [u'function.id']), + sa.ForeignKeyConstraint(['function_id'], [u'functions.id']), info={"check_ifexists": True} ) diff --git a/qinling/db/sqlalchemy/models.py b/qinling/db/sqlalchemy/models.py index 4c5cca8f..5df35be5 100644 --- a/qinling/db/sqlalchemy/models.py +++ b/qinling/db/sqlalchemy/models.py @@ -21,7 +21,7 @@ from qinling.utils import common class Runtime(model_base.QinlingSecureModelBase): - __tablename__ = 'runtime' + __tablename__ = 'runtimes' __table_args__ = ( sa.UniqueConstraint('image', 'project_id'), @@ -34,7 +34,7 @@ class Runtime(model_base.QinlingSecureModelBase): class Function(model_base.QinlingSecureModelBase): - __tablename__ = 'function' + __tablename__ = 'functions' __table_args__ = ( sa.UniqueConstraint('name', 'project_id'), @@ -54,7 +54,7 @@ class Function(model_base.QinlingSecureModelBase): class FunctionServiceMapping(model_base.QinlingModelBase): - __tablename__ = 'function_service_mapping' + __tablename__ = 'function_service_mappings' __table_args__ = ( sa.UniqueConstraint('function_id', 'service_url'), @@ -66,11 +66,21 @@ class FunctionServiceMapping(model_base.QinlingModelBase): sa.ForeignKey(Function.id, ondelete='CASCADE'), ) service_url = sa.Column(sa.String(255), nullable=False) + + +class FunctionWorkers(model_base.QinlingModelBase): + __tablename__ = 'function_workers' + + id = model_base.id_column() + function_id = sa.Column( + sa.String(36), + sa.ForeignKey(Function.id, ondelete='CASCADE'), + ) worker_name = sa.Column(sa.String(255), nullable=False) class Execution(model_base.QinlingSecureModelBase): - __tablename__ = 'execution' + __tablename__ = 'executions' function_id = sa.Column(sa.String(36), nullable=False) status = sa.Column(sa.String(32), nullable=False) @@ -82,7 +92,7 @@ class Execution(model_base.QinlingSecureModelBase): class Job(model_base.QinlingSecureModelBase): - __tablename__ = 'job' + __tablename__ = 'jobs' name = sa.Column(sa.String(255), nullable=True) pattern = sa.Column(sa.String(32), nullable=True) @@ -111,6 +121,11 @@ Function.service = relationship( uselist=False, cascade="all, delete-orphan" ) +# Delete workers automatically when deleting function. +Function.workers = relationship( + "FunctionWorkers", + cascade="all, delete-orphan" +) Runtime.functions = relationship("Function", back_populates="runtime") diff --git a/qinling/engine/default_engine.py b/qinling/engine/default_engine.py index 8ca5a423..01f07828 100644 --- a/qinling/engine/default_engine.py +++ b/qinling/engine/default_engine.py @@ -103,16 +103,16 @@ class DefaultEngine(object): data = {'input': input, 'execution_id': execution_id} r = requests.post(func_url, json=data) - logs = self.orchestrator.get_execution_log( - execution_id, - worker_name=function.service.worker_name, - ) + # logs = self.orchestrator.get_execution_log( + # execution_id, + # worker_name=function.service.worker_name, + # ) LOG.debug('Finished execution %s', execution_id) execution.status = status.SUCCESS execution.output = r.json() - execution.logs = logs + # execution.logs = logs return source = function.code['source'] @@ -169,9 +169,13 @@ class DefaultEngine(object): mapping = { 'function_id': function_id, 'service_url': service_url, - 'worker_name': worker_name } db_api.create_function_service_mapping(mapping) + worker = { + 'function_id': function_id, + 'worker_name': worker_name + } + db_api.create_function_worker(worker) def delete_function(self, ctx, function_id): resource = {'type': 'function', 'id': function_id} @@ -181,3 +185,22 @@ class DefaultEngine(object): self.orchestrator.delete_function(function_id, labels=labels) LOG.info('Deleted.', resource=resource) + + def scaleup_function(self, ctx, function_id, runtime_id): + function = db_api.get_function(function_id) + + worker_names = self.orchestrator.scaleup_function( + function_id, + identifier=runtime_id, + entry=function.entry + ) + + with db_api.transaction(): + for name in worker_names: + worker = { + 'function_id': function_id, + 'worker_name': name + } + db_api.create_function_worker(worker) + + LOG.info('Finished scaling up function %s.', function_id) diff --git a/qinling/orchestrator/base.py b/qinling/orchestrator/base.py index d7432fbe..f6c1e77b 100644 --- a/qinling/orchestrator/base.py +++ b/qinling/orchestrator/base.py @@ -54,6 +54,10 @@ class OrchestratorBase(object): def delete_function(self, function_id, **kwargs): raise NotImplementedError + @abc.abstractmethod + def scaleup_function(self, function_id, **kwargs): + raise NotImplementedError + def load_orchestrator(conf): global ORCHESTRATOR diff --git a/qinling/orchestrator/kubernetes/manager.py b/qinling/orchestrator/kubernetes/manager.py index 4afa3118..ed6793c6 100644 --- a/qinling/orchestrator/kubernetes/manager.py +++ b/qinling/orchestrator/kubernetes/manager.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import copy import os import time @@ -195,7 +196,7 @@ class KubernetesManager(base.OrchestratorBase): return True - def _choose_available_pod(self, labels): + def _choose_available_pod(self, labels, count=1): selector = common.convert_dict_to_string(labels) ret = self.v1.list_namespaced_pod( @@ -206,18 +207,17 @@ class KubernetesManager(base.OrchestratorBase): if len(ret.items) == 0: return None - # Choose the last available one by default. - pod = ret.items[-1] + return ret.items[-count:] - return pod - - def _prepare_pod(self, pod, deployment_name, function_id, labels, entry): + def _prepare_pod(self, pod, deployment_name, function_id, labels=None, + entry=None, actual_function=None): """Pod preparation. 1. Update pod labels. 2. Expose service and trigger package download. """ name = pod.metadata.name + actual_function = actual_function or function_id LOG.info( 'Prepare pod %s in deployment %s for function %s', @@ -225,18 +225,7 @@ class KubernetesManager(base.OrchestratorBase): ) # Update pod label. - pod_labels = pod.metadata.labels or {} - pod_labels.update({'function_id': function_id}) - body = { - 'metadata': { - 'labels': pod_labels - } - } - self.v1.patch_namespaced_pod( - name, self.conf.kubernetes.namespace, body - ) - - LOG.debug('Labels updated for pod %s', name) + pod_labels = self._update_pod_label(pod, {'function_id': function_id}) # Create service for the chosen pod. service_name = "service-%s" % function_id @@ -278,12 +267,12 @@ class KubernetesManager(base.OrchestratorBase): download_url = ( 'http://%s:%s/v1/functions/%s?download=true' % (self.conf.kubernetes.qinling_service_address, - self.conf.api.port, function_id) + self.conf.api.port, actual_function) ) data = { 'download_url': download_url, - 'function_id': function_id, + 'function_id': actual_function, 'entry': entry, 'token': context.get_ctx().auth_token, } @@ -325,6 +314,24 @@ class KubernetesManager(base.OrchestratorBase): body=yaml.safe_load(pod_body), ) + def _update_pod_label(self, pod, new_label=None): + name = pod.metadata.name + + pod_labels = copy.deepcopy(pod.metadata.labels) or {} + pod_labels.update(new_label) + body = { + 'metadata': { + 'labels': pod_labels + } + } + self.v1.patch_namespaced_pod( + name, self.conf.kubernetes.namespace, body + ) + + LOG.debug('Labels updated for pod %s', name) + + return pod_labels + def prepare_execution(self, function_id, image=None, identifier=None, labels=None, input=None, entry='main.main'): """Prepare service URL for function. @@ -346,7 +353,8 @@ class KubernetesManager(base.OrchestratorBase): if not pod: raise exc.OrchestratorException('No pod available.') - return self._prepare_pod(pod, identifier, function_id, labels, entry) + return self._prepare_pod(pod[0], identifier, function_id, labels, + entry) def run_execution(self, execution_id, function_id, input=None, identifier=None, service_url=None): @@ -415,3 +423,41 @@ class KubernetesManager(base.OrchestratorBase): ) LOG.info("Pod for function %s deleted.", function_id) + + def scaleup_function(self, function_id, identifier=None, + entry='main.main'): + pod_names = [] + labels = {'runtime_id': identifier} + pods = self._choose_available_pod( + labels, count=self.conf.kubernetes.scale_step + ) + + if not pods: + raise exc.OrchestratorException('Not enough pods available.') + + temp_function = '%s-temp' % function_id + for pod in pods: + self._prepare_pod(pod, identifier, temp_function, labels, entry, + actual_function=function_id) + + # Delete temporary service + selector = common.convert_dict_to_string( + {'function_id': temp_function} + ) + ret = self.v1.list_namespaced_service( + self.conf.kubernetes.namespace, label_selector=selector + ) + svc_names = [i.metadata.name for i in ret.items] + for svc_name in svc_names: + self.v1.delete_namespaced_service( + svc_name, + self.conf.kubernetes.namespace, + ) + + # Modify pod labels to fit into correct service + self._update_pod_label(pod, {'function_id': function_id}) + + pod_names.append(pod.metadata.name) + + LOG.info('Pods scaled up for function %s: %s', function_id, pod_names) + return pod_names diff --git a/qinling/rpc.py b/qinling/rpc.py index e8835450..ea33c9c2 100644 --- a/qinling/rpc.py +++ b/qinling/rpc.py @@ -186,3 +186,12 @@ class EngineClient(object): 'delete_function', function_id=id ) + + @wrap_messaging_exception + def scaleup_function(self, id, runtime_id): + return self._client.prepare(topic=self.topic, server=None).cast( + ctx.get_ctx(), + 'scaleup_function', + function_id=id, + runtime_id=runtime_id, + )