From 14789ab0c8c2c74d21590179cf0dbc3106a45292 Mon Sep 17 00:00:00 2001 From: Lingxian Kong Date: Thu, 18 May 2017 17:19:13 +1200 Subject: [PATCH] Support docker image as function code Allow user to create function using her own docker image including all the code and dependencies inside. The image needs to have an script running as entry point, expecting user's input as params. This allow users to execute functions written in any language. --- qinling/api/controllers/v1/function.py | 41 +++++++--- .../alembic_migrations/versions/001_pike.py | 2 +- qinling/db/sqlalchemy/models.py | 2 +- qinling/engine/default_engine.py | 56 ++++++++++--- qinling/orchestrator/base.py | 4 + qinling/orchestrator/kubernetes/manager.py | 78 +++++++++++++++++-- .../orchestrator/kubernetes/templates/pod.j2 | 20 +++++ qinling/rpc.py | 9 +++ qinling/utils/common.py | 4 +- qinling/utils/openstack/keystone.py | 2 - 10 files changed, 183 insertions(+), 35 deletions(-) create mode 100644 qinling/orchestrator/kubernetes/templates/pod.j2 diff --git a/qinling/api/controllers/v1/function.py b/qinling/api/controllers/v1/function.py index 052e5587..ae23487a 100644 --- a/qinling/api/controllers/v1/function.py +++ b/qinling/api/controllers/v1/function.py @@ -29,6 +29,7 @@ from qinling.api.controllers.v1 import types from qinling import context from qinling.db import api as db_api from qinling import exceptions as exc +from qinling import rpc from qinling.storage import base as storage_base from qinling.utils.openstack import swift as swift_util from qinling.utils import rest_utils @@ -36,13 +37,14 @@ from qinling.utils import rest_utils LOG = logging.getLogger(__name__) CONF = cfg.CONF -POST_REQUIRED = set(['name', 'runtime_id', 'code']) +POST_REQUIRED = set(['name', 'code']) CODE_SOURCE = set(['package', 'swift', 'image']) class FunctionsController(rest.RestController): def __init__(self, *args, **kwargs): self.storage_provider = storage_base.load_storage_provider(CONF) + self.engine_client = rpc.get_engine_client() super(FunctionsController, self).__init__(*args, **kwargs) @@ -69,6 +71,13 @@ class FunctionsController(rest.RestController): container = func_db.code['swift']['container'] obj = func_db.code['swift']['object'] f = swift_util.download_object(container, obj) + else: + msg = 'Download image function is not allowed.' + pecan.abort( + status_code=405, + detail=msg, + headers={'Server-Error-Message': msg} + ) pecan.response.app_iter = (f if isinstance(f, collections.Iterable) else FileIter(f)) @@ -82,21 +91,17 @@ class FunctionsController(rest.RestController): def post(self, **kwargs): LOG.info("Creating function, params=%s", kwargs) + # When using image to create function, runtime_id is not a required + # param. if not POST_REQUIRED.issubset(set(kwargs.keys())): raise exc.InputException( 'Required param is missing. Required: %s' % POST_REQUIRED ) - runtime = db_api.get_runtime(kwargs['runtime_id']) - if runtime.status != 'available': - raise exc.InputException( - 'Runtime %s not available.' % kwargs['runtime_id'] - ) - values = { 'name': kwargs['name'], - 'description': kwargs.get('description', None), - 'runtime_id': kwargs['runtime_id'], + 'description': kwargs.get('description'), + 'runtime_id': kwargs.get('runtime_id'), 'code': json.loads(kwargs['code']), 'entry': kwargs.get('entry', 'main'), } @@ -105,9 +110,19 @@ class FunctionsController(rest.RestController): if not source or source not in CODE_SOURCE: raise exc.InputException( 'Invalid code source specified, available sources: %s' % - CODE_SOURCE + ', '.join(CODE_SOURCE) ) + if source != 'image': + if not kwargs.get('runtime_id'): + raise exc.InputException('"runtime_id" must be specified.') + + runtime = db_api.get_runtime(kwargs['runtime_id']) + if runtime.status != 'available': + raise exc.InputException( + 'Runtime %s is not available.' % kwargs['runtime_id'] + ) + store = False if values['code']['source'] == 'package': store = True @@ -124,12 +139,12 @@ class FunctionsController(rest.RestController): if not swift_util.check_object(container, object): raise exc.InputException('Object does not exist in Swift.') - ctx = context.get_ctx() - with db_api.transaction(): func_db = db_api.create_function(values) if store: + ctx = context.get_ctx() + self.storage_provider.store( ctx.projectid, func_db.id, @@ -161,6 +176,8 @@ class FunctionsController(rest.RestController): if source == 'package': self.storage_provider.delete(context.get_ctx().projectid, id) + if source == 'image': + self.engine_client.delete_function(id, func_db.name) # This will also delete function service mapping as well. db_api.delete_function(id) diff --git a/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py b/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py index 2031c432..8b394cd1 100644 --- a/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py +++ b/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py @@ -52,7 +52,7 @@ def upgrade(): sa.Column('id', sa.String(length=36), nullable=False), sa.Column('name', sa.String(length=255), nullable=True), sa.Column('description', sa.String(length=255), nullable=True), - sa.Column('runtime_id', sa.String(length=36), nullable=False), + sa.Column('runtime_id', sa.String(length=36), nullable=True), sa.Column('memory_size', sa.Integer, nullable=True), sa.Column('timeout', sa.Integer, nullable=True), sa.Column('code', st.JsonLongDictType(), nullable=False), diff --git a/qinling/db/sqlalchemy/models.py b/qinling/db/sqlalchemy/models.py index ca94b974..e6010533 100644 --- a/qinling/db/sqlalchemy/models.py +++ b/qinling/db/sqlalchemy/models.py @@ -27,7 +27,7 @@ class Function(model_base.QinlingSecureModelBase): name = sa.Column(sa.String(255), nullable=False) description = sa.Column(sa.String(255)) - runtime_id = sa.Column(sa.String(36), nullable=False) + runtime_id = sa.Column(sa.String(36), nullable=True) memory_size = sa.Column(sa.Integer) timeout = sa.Column(sa.Integer) code = sa.Column(st.JsonLongDictType(), nullable=False) diff --git a/qinling/engine/default_engine.py b/qinling/engine/default_engine.py index 040cd191..cb3d8d3e 100644 --- a/qinling/engine/default_engine.py +++ b/qinling/engine/default_engine.py @@ -15,6 +15,7 @@ from oslo_log import log as logging from qinling.db import api as db_api +from qinling.utils import common LOG = logging.getLogger(__name__) @@ -72,16 +73,42 @@ class DefaultEngine(object): with db_api.transaction(): execution = db_api.get_execution(execution_id) - runtime = db_api.get_runtime(runtime_id) - identifier = '%s-%s' % (runtime_id, runtime.name) - labels = {'runtime_name': runtime.name, 'runtime_id': runtime_id} + function = db_api.get_function(function_id) + + source = function.code['source'] + image = None + identifier = None + labels = None + + if source == 'image': + image = function.code['image'] + identifier = ('%s-%s' % + (common.generate_unicode_uuid(dashed=False), + function_id) + )[:63] + labels = { + 'function_name': function.name, 'function_id': function_id + } + else: + runtime = db_api.get_runtime(runtime_id) + identifier = ('%s-%s' % (runtime_id, runtime.name))[:63] + labels = { + 'runtime_name': runtime.name, 'runtime_id': runtime_id + } service_url = self.orchestrator.prepare_execution( - function_id, identifier=identifier, labels=labels + function_id, + image=image, + identifier=identifier, + labels=labels, + input=input ) output = self.orchestrator.run_execution( - function_id, input=input, service_url=service_url + function_id, + input=input, + identifier=identifier, + service_url=service_url, ) LOG.debug( @@ -93,8 +120,17 @@ class DefaultEngine(object): execution.output = output execution.status = 'success' - mapping = { - 'function_id': function_id, - 'service_url': service_url - } - db_api.create_function_service_mapping(mapping) + if not image: + mapping = { + 'function_id': function_id, + 'service_url': service_url + } + db_api.create_function_service_mapping(mapping) + + def delete_function(self, ctx, function_id, function_name): + LOG.info('Start to delete function, id=%s', function_id) + + labels = { + 'function_name': function_name, 'function_id': function_id + } + self.orchestrator.delete_function(labels) diff --git a/qinling/orchestrator/base.py b/qinling/orchestrator/base.py index ddb641e5..9ff083d0 100644 --- a/qinling/orchestrator/base.py +++ b/qinling/orchestrator/base.py @@ -42,6 +42,10 @@ class OrchestratorBase(object): def run_execution(self, function_id, **kwargs): raise NotImplementedError + @abc.abstractmethod + def delete_function(self, labels): + raise NotImplementedError + def load_orchestrator(conf): global ORCHESTRATOR diff --git a/qinling/orchestrator/kubernetes/manager.py b/qinling/orchestrator/kubernetes/manager.py index 8b5d0325..529323d1 100644 --- a/qinling/orchestrator/kubernetes/manager.py +++ b/qinling/orchestrator/kubernetes/manager.py @@ -53,6 +53,7 @@ class KubernetesManager(base.OrchestratorBase): self.deployment_template = jinja_env.get_template('deployment.j2') self.service_template = jinja_env.get_template('service.j2') + self.pod_template = jinja_env.get_template('pod.j2') def _ensure_namespace(self): ret = self.v1.list_namespace() @@ -249,19 +250,82 @@ class KubernetesManager(base.OrchestratorBase): return pod_service_url - def prepare_execution(self, function_id, identifier=None, labels=None): - pod = self._choose_available_pod(labels) + def _create_pod(self, image, pod_name, labels, input): + pod_body = self.pod_template.render( + { + "pod_name": pod_name, + "labels": labels, + "pod_image": image, + "input": input + } + ) + + LOG.info( + "Creating pod %s for image function:\n%s", pod_name, pod_body + ) + + self.v1.create_namespaced_pod( + self.conf.kubernetes.namespace, + body=yaml.safe_load(pod_body), + ) + + def prepare_execution(self, function_id, image=None, identifier=None, + labels=None, input=None): + """Prepare service URL for function. + + For image function, create a single pod with input, so the function + will be executed. + + For normal function, choose a pod from the pool and expose a service, + return the service URL. + """ + pod = None + + if image: + self._create_pod(image, identifier, labels, input) + return None + else: + pod = self._choose_available_pod(labels) if not pod: raise exc.OrchestratorException('No pod available.') return self._prepare_pod(pod, identifier, function_id, labels) - def run_execution(self, function_id, input=None, service_url=None): - func_url = '%s/execute' % service_url + def run_execution(self, function_id, input=None, identifier=None, + service_url=None): + if service_url: + func_url = '%s/execute' % service_url + LOG.info('Invoke function %s, url: %s', function_id, func_url) - LOG.info('Invoke function %s, url: %s', function_id, func_url) + r = requests.post(func_url, data=input) - r = requests.post(func_url, data=input) + return {'result': r.json()} + else: + status = None - return {'result': r.json()} + # Wait for execution to be finished. + # TODO(kong): Do not retry infinitely. + while status != 'Succeeded': + pod = self.v1.read_namespaced_pod( + identifier, + self.conf.kubernetes.namespace + ) + status = pod.status.phase + + time.sleep(0.5) + + output = self.v1.read_namespaced_pod_log( + identifier, + self.conf.kubernetes.namespace, + ) + + return {'result': output} + + def delete_function(self, labels): + selector = common.convert_dict_to_string(labels) + + self.v1.delete_collection_namespaced_pod( + self.conf.kubernetes.namespace, + label_selector=selector + ) diff --git a/qinling/orchestrator/kubernetes/templates/pod.j2 b/qinling/orchestrator/kubernetes/templates/pod.j2 new file mode 100644 index 00000000..06bc199f --- /dev/null +++ b/qinling/orchestrator/kubernetes/templates/pod.j2 @@ -0,0 +1,20 @@ +apiVersion: v1 +kind: Pod +metadata: + name: {{ pod_name }} + labels: + {% for key, value in labels.items() %} + {{ key }}: {{ value }} + {% endfor %} +spec: + containers: + - name: {{ pod_name }} + image: {{ pod_image }} + imagePullPolicy: IfNotPresent + {% if input %} + args: + {% for item in input %} + - {{ item }} + {% endfor %} + {% endif %} + restartPolicy: Never diff --git a/qinling/rpc.py b/qinling/rpc.py index 36b16d88..8acab36e 100644 --- a/qinling/rpc.py +++ b/qinling/rpc.py @@ -156,3 +156,12 @@ class EngineClient(object): runtime_id=runtime_id, input=input ) + + @wrap_messaging_exception + def delete_function(self, id, name): + return self._client.prepare(topic=self.topic, server=None).cast( + ctx.get_ctx(), + 'delete_function', + function_id=id, + function_name=name + ) diff --git a/qinling/utils/common.py b/qinling/utils/common.py index be8a7e12..eafdc739 100644 --- a/qinling/utils/common.py +++ b/qinling/utils/common.py @@ -23,8 +23,8 @@ def convert_dict_to_string(d): return ','.join(temp_list) -def generate_unicode_uuid(): - return uuidutils.generate_uuid() +def generate_unicode_uuid(dashed=True): + return uuidutils.generate_uuid(dashed=dashed) def disable_ssl_warnings(func): diff --git a/qinling/utils/openstack/keystone.py b/qinling/utils/openstack/keystone.py index 6e07275a..0f6d8aef 100644 --- a/qinling/utils/openstack/keystone.py +++ b/qinling/utils/openstack/keystone.py @@ -21,8 +21,6 @@ from qinling import context CONF = cfg.CONF -KS_SESSION = None - def _get_user_keystone_session(): ctx = context.get_ctx()