diff --git a/qinling/api/controllers/v1/function.py b/qinling/api/controllers/v1/function.py index 052e5587..ae23487a 100644 --- a/qinling/api/controllers/v1/function.py +++ b/qinling/api/controllers/v1/function.py @@ -29,6 +29,7 @@ from qinling.api.controllers.v1 import types from qinling import context from qinling.db import api as db_api from qinling import exceptions as exc +from qinling import rpc from qinling.storage import base as storage_base from qinling.utils.openstack import swift as swift_util from qinling.utils import rest_utils @@ -36,13 +37,14 @@ from qinling.utils import rest_utils LOG = logging.getLogger(__name__) CONF = cfg.CONF -POST_REQUIRED = set(['name', 'runtime_id', 'code']) +POST_REQUIRED = set(['name', 'code']) CODE_SOURCE = set(['package', 'swift', 'image']) class FunctionsController(rest.RestController): def __init__(self, *args, **kwargs): self.storage_provider = storage_base.load_storage_provider(CONF) + self.engine_client = rpc.get_engine_client() super(FunctionsController, self).__init__(*args, **kwargs) @@ -69,6 +71,13 @@ class FunctionsController(rest.RestController): container = func_db.code['swift']['container'] obj = func_db.code['swift']['object'] f = swift_util.download_object(container, obj) + else: + msg = 'Download image function is not allowed.' + pecan.abort( + status_code=405, + detail=msg, + headers={'Server-Error-Message': msg} + ) pecan.response.app_iter = (f if isinstance(f, collections.Iterable) else FileIter(f)) @@ -82,21 +91,17 @@ class FunctionsController(rest.RestController): def post(self, **kwargs): LOG.info("Creating function, params=%s", kwargs) + # When using image to create function, runtime_id is not a required + # param. if not POST_REQUIRED.issubset(set(kwargs.keys())): raise exc.InputException( 'Required param is missing. Required: %s' % POST_REQUIRED ) - runtime = db_api.get_runtime(kwargs['runtime_id']) - if runtime.status != 'available': - raise exc.InputException( - 'Runtime %s not available.' % kwargs['runtime_id'] - ) - values = { 'name': kwargs['name'], - 'description': kwargs.get('description', None), - 'runtime_id': kwargs['runtime_id'], + 'description': kwargs.get('description'), + 'runtime_id': kwargs.get('runtime_id'), 'code': json.loads(kwargs['code']), 'entry': kwargs.get('entry', 'main'), } @@ -105,9 +110,19 @@ class FunctionsController(rest.RestController): if not source or source not in CODE_SOURCE: raise exc.InputException( 'Invalid code source specified, available sources: %s' % - CODE_SOURCE + ', '.join(CODE_SOURCE) ) + if source != 'image': + if not kwargs.get('runtime_id'): + raise exc.InputException('"runtime_id" must be specified.') + + runtime = db_api.get_runtime(kwargs['runtime_id']) + if runtime.status != 'available': + raise exc.InputException( + 'Runtime %s is not available.' % kwargs['runtime_id'] + ) + store = False if values['code']['source'] == 'package': store = True @@ -124,12 +139,12 @@ class FunctionsController(rest.RestController): if not swift_util.check_object(container, object): raise exc.InputException('Object does not exist in Swift.') - ctx = context.get_ctx() - with db_api.transaction(): func_db = db_api.create_function(values) if store: + ctx = context.get_ctx() + self.storage_provider.store( ctx.projectid, func_db.id, @@ -161,6 +176,8 @@ class FunctionsController(rest.RestController): if source == 'package': self.storage_provider.delete(context.get_ctx().projectid, id) + if source == 'image': + self.engine_client.delete_function(id, func_db.name) # This will also delete function service mapping as well. db_api.delete_function(id) diff --git a/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py b/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py index 2031c432..8b394cd1 100644 --- a/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py +++ b/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py @@ -52,7 +52,7 @@ def upgrade(): sa.Column('id', sa.String(length=36), nullable=False), sa.Column('name', sa.String(length=255), nullable=True), sa.Column('description', sa.String(length=255), nullable=True), - sa.Column('runtime_id', sa.String(length=36), nullable=False), + sa.Column('runtime_id', sa.String(length=36), nullable=True), sa.Column('memory_size', sa.Integer, nullable=True), sa.Column('timeout', sa.Integer, nullable=True), sa.Column('code', st.JsonLongDictType(), nullable=False), diff --git a/qinling/db/sqlalchemy/models.py b/qinling/db/sqlalchemy/models.py index ca94b974..e6010533 100644 --- a/qinling/db/sqlalchemy/models.py +++ b/qinling/db/sqlalchemy/models.py @@ -27,7 +27,7 @@ class Function(model_base.QinlingSecureModelBase): name = sa.Column(sa.String(255), nullable=False) description = sa.Column(sa.String(255)) - runtime_id = sa.Column(sa.String(36), nullable=False) + runtime_id = sa.Column(sa.String(36), nullable=True) memory_size = sa.Column(sa.Integer) timeout = sa.Column(sa.Integer) code = sa.Column(st.JsonLongDictType(), nullable=False) diff --git a/qinling/engine/default_engine.py b/qinling/engine/default_engine.py index 040cd191..cb3d8d3e 100644 --- a/qinling/engine/default_engine.py +++ b/qinling/engine/default_engine.py @@ -15,6 +15,7 @@ from oslo_log import log as logging from qinling.db import api as db_api +from qinling.utils import common LOG = logging.getLogger(__name__) @@ -72,16 +73,42 @@ class DefaultEngine(object): with db_api.transaction(): execution = db_api.get_execution(execution_id) - runtime = db_api.get_runtime(runtime_id) - identifier = '%s-%s' % (runtime_id, runtime.name) - labels = {'runtime_name': runtime.name, 'runtime_id': runtime_id} + function = db_api.get_function(function_id) + + source = function.code['source'] + image = None + identifier = None + labels = None + + if source == 'image': + image = function.code['image'] + identifier = ('%s-%s' % + (common.generate_unicode_uuid(dashed=False), + function_id) + )[:63] + labels = { + 'function_name': function.name, 'function_id': function_id + } + else: + runtime = db_api.get_runtime(runtime_id) + identifier = ('%s-%s' % (runtime_id, runtime.name))[:63] + labels = { + 'runtime_name': runtime.name, 'runtime_id': runtime_id + } service_url = self.orchestrator.prepare_execution( - function_id, identifier=identifier, labels=labels + function_id, + image=image, + identifier=identifier, + labels=labels, + input=input ) output = self.orchestrator.run_execution( - function_id, input=input, service_url=service_url + function_id, + input=input, + identifier=identifier, + service_url=service_url, ) LOG.debug( @@ -93,8 +120,17 @@ class DefaultEngine(object): execution.output = output execution.status = 'success' - mapping = { - 'function_id': function_id, - 'service_url': service_url - } - db_api.create_function_service_mapping(mapping) + if not image: + mapping = { + 'function_id': function_id, + 'service_url': service_url + } + db_api.create_function_service_mapping(mapping) + + def delete_function(self, ctx, function_id, function_name): + LOG.info('Start to delete function, id=%s', function_id) + + labels = { + 'function_name': function_name, 'function_id': function_id + } + self.orchestrator.delete_function(labels) diff --git a/qinling/orchestrator/base.py b/qinling/orchestrator/base.py index ddb641e5..9ff083d0 100644 --- a/qinling/orchestrator/base.py +++ b/qinling/orchestrator/base.py @@ -42,6 +42,10 @@ class OrchestratorBase(object): def run_execution(self, function_id, **kwargs): raise NotImplementedError + @abc.abstractmethod + def delete_function(self, labels): + raise NotImplementedError + def load_orchestrator(conf): global ORCHESTRATOR diff --git a/qinling/orchestrator/kubernetes/manager.py b/qinling/orchestrator/kubernetes/manager.py index 8b5d0325..529323d1 100644 --- a/qinling/orchestrator/kubernetes/manager.py +++ b/qinling/orchestrator/kubernetes/manager.py @@ -53,6 +53,7 @@ class KubernetesManager(base.OrchestratorBase): self.deployment_template = jinja_env.get_template('deployment.j2') self.service_template = jinja_env.get_template('service.j2') + self.pod_template = jinja_env.get_template('pod.j2') def _ensure_namespace(self): ret = self.v1.list_namespace() @@ -249,19 +250,82 @@ class KubernetesManager(base.OrchestratorBase): return pod_service_url - def prepare_execution(self, function_id, identifier=None, labels=None): - pod = self._choose_available_pod(labels) + def _create_pod(self, image, pod_name, labels, input): + pod_body = self.pod_template.render( + { + "pod_name": pod_name, + "labels": labels, + "pod_image": image, + "input": input + } + ) + + LOG.info( + "Creating pod %s for image function:\n%s", pod_name, pod_body + ) + + self.v1.create_namespaced_pod( + self.conf.kubernetes.namespace, + body=yaml.safe_load(pod_body), + ) + + def prepare_execution(self, function_id, image=None, identifier=None, + labels=None, input=None): + """Prepare service URL for function. + + For image function, create a single pod with input, so the function + will be executed. + + For normal function, choose a pod from the pool and expose a service, + return the service URL. + """ + pod = None + + if image: + self._create_pod(image, identifier, labels, input) + return None + else: + pod = self._choose_available_pod(labels) if not pod: raise exc.OrchestratorException('No pod available.') return self._prepare_pod(pod, identifier, function_id, labels) - def run_execution(self, function_id, input=None, service_url=None): - func_url = '%s/execute' % service_url + def run_execution(self, function_id, input=None, identifier=None, + service_url=None): + if service_url: + func_url = '%s/execute' % service_url + LOG.info('Invoke function %s, url: %s', function_id, func_url) - LOG.info('Invoke function %s, url: %s', function_id, func_url) + r = requests.post(func_url, data=input) - r = requests.post(func_url, data=input) + return {'result': r.json()} + else: + status = None - return {'result': r.json()} + # Wait for execution to be finished. + # TODO(kong): Do not retry infinitely. + while status != 'Succeeded': + pod = self.v1.read_namespaced_pod( + identifier, + self.conf.kubernetes.namespace + ) + status = pod.status.phase + + time.sleep(0.5) + + output = self.v1.read_namespaced_pod_log( + identifier, + self.conf.kubernetes.namespace, + ) + + return {'result': output} + + def delete_function(self, labels): + selector = common.convert_dict_to_string(labels) + + self.v1.delete_collection_namespaced_pod( + self.conf.kubernetes.namespace, + label_selector=selector + ) diff --git a/qinling/orchestrator/kubernetes/templates/pod.j2 b/qinling/orchestrator/kubernetes/templates/pod.j2 new file mode 100644 index 00000000..06bc199f --- /dev/null +++ b/qinling/orchestrator/kubernetes/templates/pod.j2 @@ -0,0 +1,20 @@ +apiVersion: v1 +kind: Pod +metadata: + name: {{ pod_name }} + labels: + {% for key, value in labels.items() %} + {{ key }}: {{ value }} + {% endfor %} +spec: + containers: + - name: {{ pod_name }} + image: {{ pod_image }} + imagePullPolicy: IfNotPresent + {% if input %} + args: + {% for item in input %} + - {{ item }} + {% endfor %} + {% endif %} + restartPolicy: Never diff --git a/qinling/rpc.py b/qinling/rpc.py index 36b16d88..8acab36e 100644 --- a/qinling/rpc.py +++ b/qinling/rpc.py @@ -156,3 +156,12 @@ class EngineClient(object): runtime_id=runtime_id, input=input ) + + @wrap_messaging_exception + def delete_function(self, id, name): + return self._client.prepare(topic=self.topic, server=None).cast( + ctx.get_ctx(), + 'delete_function', + function_id=id, + function_name=name + ) diff --git a/qinling/utils/common.py b/qinling/utils/common.py index be8a7e12..eafdc739 100644 --- a/qinling/utils/common.py +++ b/qinling/utils/common.py @@ -23,8 +23,8 @@ def convert_dict_to_string(d): return ','.join(temp_list) -def generate_unicode_uuid(): - return uuidutils.generate_uuid() +def generate_unicode_uuid(dashed=True): + return uuidutils.generate_uuid(dashed=dashed) def disable_ssl_warnings(func): diff --git a/qinling/utils/openstack/keystone.py b/qinling/utils/openstack/keystone.py index 6e07275a..0f6d8aef 100644 --- a/qinling/utils/openstack/keystone.py +++ b/qinling/utils/openstack/keystone.py @@ -21,8 +21,6 @@ from qinling import context CONF = cfg.CONF -KS_SESSION = None - def _get_user_keystone_session(): ctx = context.get_ctx()