Support docker image as function code

Allow user to create function using her own docker image including
all the code and dependencies inside. The image needs to have an
script running as entry point, expecting user's input as params.

This allow users to execute functions written in any language.
This commit is contained in:
Lingxian Kong 2017-05-18 17:19:13 +12:00
parent 0efa51ae24
commit 14789ab0c8
10 changed files with 183 additions and 35 deletions

View File

@ -29,6 +29,7 @@ from qinling.api.controllers.v1 import types
from qinling import context from qinling import context
from qinling.db import api as db_api from qinling.db import api as db_api
from qinling import exceptions as exc from qinling import exceptions as exc
from qinling import rpc
from qinling.storage import base as storage_base from qinling.storage import base as storage_base
from qinling.utils.openstack import swift as swift_util from qinling.utils.openstack import swift as swift_util
from qinling.utils import rest_utils from qinling.utils import rest_utils
@ -36,13 +37,14 @@ from qinling.utils import rest_utils
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
CONF = cfg.CONF CONF = cfg.CONF
POST_REQUIRED = set(['name', 'runtime_id', 'code']) POST_REQUIRED = set(['name', 'code'])
CODE_SOURCE = set(['package', 'swift', 'image']) CODE_SOURCE = set(['package', 'swift', 'image'])
class FunctionsController(rest.RestController): class FunctionsController(rest.RestController):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self.storage_provider = storage_base.load_storage_provider(CONF) self.storage_provider = storage_base.load_storage_provider(CONF)
self.engine_client = rpc.get_engine_client()
super(FunctionsController, self).__init__(*args, **kwargs) super(FunctionsController, self).__init__(*args, **kwargs)
@ -69,6 +71,13 @@ class FunctionsController(rest.RestController):
container = func_db.code['swift']['container'] container = func_db.code['swift']['container']
obj = func_db.code['swift']['object'] obj = func_db.code['swift']['object']
f = swift_util.download_object(container, obj) f = swift_util.download_object(container, obj)
else:
msg = 'Download image function is not allowed.'
pecan.abort(
status_code=405,
detail=msg,
headers={'Server-Error-Message': msg}
)
pecan.response.app_iter = (f if isinstance(f, collections.Iterable) pecan.response.app_iter = (f if isinstance(f, collections.Iterable)
else FileIter(f)) else FileIter(f))
@ -82,21 +91,17 @@ class FunctionsController(rest.RestController):
def post(self, **kwargs): def post(self, **kwargs):
LOG.info("Creating function, params=%s", kwargs) LOG.info("Creating function, params=%s", kwargs)
# When using image to create function, runtime_id is not a required
# param.
if not POST_REQUIRED.issubset(set(kwargs.keys())): if not POST_REQUIRED.issubset(set(kwargs.keys())):
raise exc.InputException( raise exc.InputException(
'Required param is missing. Required: %s' % POST_REQUIRED 'Required param is missing. Required: %s' % POST_REQUIRED
) )
runtime = db_api.get_runtime(kwargs['runtime_id'])
if runtime.status != 'available':
raise exc.InputException(
'Runtime %s not available.' % kwargs['runtime_id']
)
values = { values = {
'name': kwargs['name'], 'name': kwargs['name'],
'description': kwargs.get('description', None), 'description': kwargs.get('description'),
'runtime_id': kwargs['runtime_id'], 'runtime_id': kwargs.get('runtime_id'),
'code': json.loads(kwargs['code']), 'code': json.loads(kwargs['code']),
'entry': kwargs.get('entry', 'main'), 'entry': kwargs.get('entry', 'main'),
} }
@ -105,9 +110,19 @@ class FunctionsController(rest.RestController):
if not source or source not in CODE_SOURCE: if not source or source not in CODE_SOURCE:
raise exc.InputException( raise exc.InputException(
'Invalid code source specified, available sources: %s' % 'Invalid code source specified, available sources: %s' %
CODE_SOURCE ', '.join(CODE_SOURCE)
) )
if source != 'image':
if not kwargs.get('runtime_id'):
raise exc.InputException('"runtime_id" must be specified.')
runtime = db_api.get_runtime(kwargs['runtime_id'])
if runtime.status != 'available':
raise exc.InputException(
'Runtime %s is not available.' % kwargs['runtime_id']
)
store = False store = False
if values['code']['source'] == 'package': if values['code']['source'] == 'package':
store = True store = True
@ -124,12 +139,12 @@ class FunctionsController(rest.RestController):
if not swift_util.check_object(container, object): if not swift_util.check_object(container, object):
raise exc.InputException('Object does not exist in Swift.') raise exc.InputException('Object does not exist in Swift.')
ctx = context.get_ctx()
with db_api.transaction(): with db_api.transaction():
func_db = db_api.create_function(values) func_db = db_api.create_function(values)
if store: if store:
ctx = context.get_ctx()
self.storage_provider.store( self.storage_provider.store(
ctx.projectid, ctx.projectid,
func_db.id, func_db.id,
@ -161,6 +176,8 @@ class FunctionsController(rest.RestController):
if source == 'package': if source == 'package':
self.storage_provider.delete(context.get_ctx().projectid, id) self.storage_provider.delete(context.get_ctx().projectid, id)
if source == 'image':
self.engine_client.delete_function(id, func_db.name)
# This will also delete function service mapping as well. # This will also delete function service mapping as well.
db_api.delete_function(id) db_api.delete_function(id)

View File

@ -52,7 +52,7 @@ def upgrade():
sa.Column('id', sa.String(length=36), nullable=False), sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('name', sa.String(length=255), nullable=True), sa.Column('name', sa.String(length=255), nullable=True),
sa.Column('description', sa.String(length=255), nullable=True), sa.Column('description', sa.String(length=255), nullable=True),
sa.Column('runtime_id', sa.String(length=36), nullable=False), sa.Column('runtime_id', sa.String(length=36), nullable=True),
sa.Column('memory_size', sa.Integer, nullable=True), sa.Column('memory_size', sa.Integer, nullable=True),
sa.Column('timeout', sa.Integer, nullable=True), sa.Column('timeout', sa.Integer, nullable=True),
sa.Column('code', st.JsonLongDictType(), nullable=False), sa.Column('code', st.JsonLongDictType(), nullable=False),

View File

@ -27,7 +27,7 @@ class Function(model_base.QinlingSecureModelBase):
name = sa.Column(sa.String(255), nullable=False) name = sa.Column(sa.String(255), nullable=False)
description = sa.Column(sa.String(255)) description = sa.Column(sa.String(255))
runtime_id = sa.Column(sa.String(36), nullable=False) runtime_id = sa.Column(sa.String(36), nullable=True)
memory_size = sa.Column(sa.Integer) memory_size = sa.Column(sa.Integer)
timeout = sa.Column(sa.Integer) timeout = sa.Column(sa.Integer)
code = sa.Column(st.JsonLongDictType(), nullable=False) code = sa.Column(st.JsonLongDictType(), nullable=False)

View File

@ -15,6 +15,7 @@
from oslo_log import log as logging from oslo_log import log as logging
from qinling.db import api as db_api from qinling.db import api as db_api
from qinling.utils import common
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -72,16 +73,42 @@ class DefaultEngine(object):
with db_api.transaction(): with db_api.transaction():
execution = db_api.get_execution(execution_id) execution = db_api.get_execution(execution_id)
runtime = db_api.get_runtime(runtime_id) function = db_api.get_function(function_id)
identifier = '%s-%s' % (runtime_id, runtime.name)
labels = {'runtime_name': runtime.name, 'runtime_id': runtime_id} source = function.code['source']
image = None
identifier = None
labels = None
if source == 'image':
image = function.code['image']
identifier = ('%s-%s' %
(common.generate_unicode_uuid(dashed=False),
function_id)
)[:63]
labels = {
'function_name': function.name, 'function_id': function_id
}
else:
runtime = db_api.get_runtime(runtime_id)
identifier = ('%s-%s' % (runtime_id, runtime.name))[:63]
labels = {
'runtime_name': runtime.name, 'runtime_id': runtime_id
}
service_url = self.orchestrator.prepare_execution( service_url = self.orchestrator.prepare_execution(
function_id, identifier=identifier, labels=labels function_id,
image=image,
identifier=identifier,
labels=labels,
input=input
) )
output = self.orchestrator.run_execution( output = self.orchestrator.run_execution(
function_id, input=input, service_url=service_url function_id,
input=input,
identifier=identifier,
service_url=service_url,
) )
LOG.debug( LOG.debug(
@ -93,8 +120,17 @@ class DefaultEngine(object):
execution.output = output execution.output = output
execution.status = 'success' execution.status = 'success'
mapping = { if not image:
'function_id': function_id, mapping = {
'service_url': service_url 'function_id': function_id,
} 'service_url': service_url
db_api.create_function_service_mapping(mapping) }
db_api.create_function_service_mapping(mapping)
def delete_function(self, ctx, function_id, function_name):
LOG.info('Start to delete function, id=%s', function_id)
labels = {
'function_name': function_name, 'function_id': function_id
}
self.orchestrator.delete_function(labels)

View File

@ -42,6 +42,10 @@ class OrchestratorBase(object):
def run_execution(self, function_id, **kwargs): def run_execution(self, function_id, **kwargs):
raise NotImplementedError raise NotImplementedError
@abc.abstractmethod
def delete_function(self, labels):
raise NotImplementedError
def load_orchestrator(conf): def load_orchestrator(conf):
global ORCHESTRATOR global ORCHESTRATOR

View File

@ -53,6 +53,7 @@ class KubernetesManager(base.OrchestratorBase):
self.deployment_template = jinja_env.get_template('deployment.j2') self.deployment_template = jinja_env.get_template('deployment.j2')
self.service_template = jinja_env.get_template('service.j2') self.service_template = jinja_env.get_template('service.j2')
self.pod_template = jinja_env.get_template('pod.j2')
def _ensure_namespace(self): def _ensure_namespace(self):
ret = self.v1.list_namespace() ret = self.v1.list_namespace()
@ -249,19 +250,82 @@ class KubernetesManager(base.OrchestratorBase):
return pod_service_url return pod_service_url
def prepare_execution(self, function_id, identifier=None, labels=None): def _create_pod(self, image, pod_name, labels, input):
pod = self._choose_available_pod(labels) pod_body = self.pod_template.render(
{
"pod_name": pod_name,
"labels": labels,
"pod_image": image,
"input": input
}
)
LOG.info(
"Creating pod %s for image function:\n%s", pod_name, pod_body
)
self.v1.create_namespaced_pod(
self.conf.kubernetes.namespace,
body=yaml.safe_load(pod_body),
)
def prepare_execution(self, function_id, image=None, identifier=None,
labels=None, input=None):
"""Prepare service URL for function.
For image function, create a single pod with input, so the function
will be executed.
For normal function, choose a pod from the pool and expose a service,
return the service URL.
"""
pod = None
if image:
self._create_pod(image, identifier, labels, input)
return None
else:
pod = self._choose_available_pod(labels)
if not pod: if not pod:
raise exc.OrchestratorException('No pod available.') raise exc.OrchestratorException('No pod available.')
return self._prepare_pod(pod, identifier, function_id, labels) return self._prepare_pod(pod, identifier, function_id, labels)
def run_execution(self, function_id, input=None, service_url=None): def run_execution(self, function_id, input=None, identifier=None,
func_url = '%s/execute' % service_url service_url=None):
if service_url:
func_url = '%s/execute' % service_url
LOG.info('Invoke function %s, url: %s', function_id, func_url)
LOG.info('Invoke function %s, url: %s', function_id, func_url) r = requests.post(func_url, data=input)
r = requests.post(func_url, data=input) return {'result': r.json()}
else:
status = None
return {'result': r.json()} # Wait for execution to be finished.
# TODO(kong): Do not retry infinitely.
while status != 'Succeeded':
pod = self.v1.read_namespaced_pod(
identifier,
self.conf.kubernetes.namespace
)
status = pod.status.phase
time.sleep(0.5)
output = self.v1.read_namespaced_pod_log(
identifier,
self.conf.kubernetes.namespace,
)
return {'result': output}
def delete_function(self, labels):
selector = common.convert_dict_to_string(labels)
self.v1.delete_collection_namespaced_pod(
self.conf.kubernetes.namespace,
label_selector=selector
)

View File

@ -0,0 +1,20 @@
apiVersion: v1
kind: Pod
metadata:
name: {{ pod_name }}
labels:
{% for key, value in labels.items() %}
{{ key }}: {{ value }}
{% endfor %}
spec:
containers:
- name: {{ pod_name }}
image: {{ pod_image }}
imagePullPolicy: IfNotPresent
{% if input %}
args:
{% for item in input %}
- {{ item }}
{% endfor %}
{% endif %}
restartPolicy: Never

View File

@ -156,3 +156,12 @@ class EngineClient(object):
runtime_id=runtime_id, runtime_id=runtime_id,
input=input input=input
) )
@wrap_messaging_exception
def delete_function(self, id, name):
return self._client.prepare(topic=self.topic, server=None).cast(
ctx.get_ctx(),
'delete_function',
function_id=id,
function_name=name
)

View File

@ -23,8 +23,8 @@ def convert_dict_to_string(d):
return ','.join(temp_list) return ','.join(temp_list)
def generate_unicode_uuid(): def generate_unicode_uuid(dashed=True):
return uuidutils.generate_uuid() return uuidutils.generate_uuid(dashed=dashed)
def disable_ssl_warnings(func): def disable_ssl_warnings(func):

View File

@ -21,8 +21,6 @@ from qinling import context
CONF = cfg.CONF CONF = cfg.CONF
KS_SESSION = None
def _get_user_keystone_session(): def _get_user_keystone_session():
ctx = context.get_ctx() ctx = context.get_ctx()