Complete basic workflow(Finally!)

- Create runtime
- Create function
- Invoke function(i.e. create execution)
This commit is contained in:
Lingxian Kong 2017-05-09 23:47:12 +12:00
parent 7b6deac2c1
commit df32a9f412
35 changed files with 1068 additions and 120 deletions

@ -1,8 +1,8 @@
===============================
=======
qinling
===============================
=======
Function as a Service
Function as a Service (Documentation needs to be added, please stay tuned!)
Please fill here a long description which must be at least 3 lines wrapped on
80 cols, so that distribution package maintainers can use it in their packages.

@ -1,12 +1,175 @@
============
Qinling Installation Guide
==========================
Prerequisites
-------------
It is necessary to install some specific system libs for installing Qinling.
They can be installed on most popular operating system using their package
manager (for Ubuntu - *apt*, for Fedora - *dnf*, CentOS - *yum*, for Mac OS -
*brew* or *macports*).
The list of needed packages is shown below:
1. **python-dev**
2. **python-setuptools**
3. **python-pip**
4. **libffi-dev**
5. **libxslt1-dev (or libxslt-dev)**
6. **libxml2-dev**
7. **libyaml-dev**
8. **libssl-dev**
In case of Ubuntu, just run::
$ apt-get install -y python-dev python-setuptools python-pip libffi-dev libxslt1-dev \
libxml2-dev libyaml-dev libssl-dev
**NOTE:** **Qinling can be used without authentication at all or it can work
with OpenStack.** In case of OpenStack, it works **only on Keystone v3**, make
sure **Keystone v3** is installed.
Installation
============
------------
At the command line::
First of all, clone the repo and go to the repo directory::
$ pip install qinling
$ git clone https://github.com/openstack/qinling.git
$ cd qinling
Or, if you have virtualenvwrapper installed::
Generate config::
$ mkvirtualenv qinling
$ pip install qinling
$ tox -egenconfig
Configure Qinling as needed. The configuration file is located in
``etc/qinling.conf.sample``. You will need to modify the configuration options
and then copy it into ``/etc/qinling/qinling.conf``.
For details see :doc:`Qinling Configuration Guide </guides/configuration_guide>`
**Virtualenv installation**::
$ tox
This will install necessary virtual environments and run all the project tests.
Installing virtual environments may take significant time (~10-15 mins).
**Local installation**::
$ pip install -e .
or::
$ pip install -r requirements.txt
$ python setup.py install
**NOTE**: Differences *pip install -e* and *setup.py install*. **pip install -e**
works very similarly to **setup.py install** or the EasyInstall tool, except
that it doesnt actually install anything. Instead, it creates a special
.egg-link file in the deployment directory, that links to your projects
source code.
Before the first run
--------------------
After installation you will see **qinling-server** and **qinling-db-manage** commands
in your environment, either in system or virtual environment.
**NOTE**: In case of using **virtualenv**, all Qinling related commands available via
**tox -evenv --**. For example, *qinling-server* is available via
*tox -evenv -- qinling-server*.
**qinling-db-manage** command can be used for migrations.
For updating the database to the latest revision type::
$ qinling-db-manage --config-file <path-to-qinling.conf> upgrade head
Before starting Qinling server, run *qinling-db-manage populate* command.
It prepares the DB, creates in it with all standard actions and standard
workflows which Qinling provides for all Qinling users.::
$ qinling-db-manage --config-file <path-to-qinling.conf> populate
For more detailed information about *qinling-db-manage* script please see :doc:`Qinling Upgrade Guide </guides/upgrade_guide>`.
**NOTE**: For users who want a dry run with **SQLite** database backend(not
used in production), *qinling-db-manage* is not recommended for database
initialization because of `SQLite limitations <http://www.sqlite.org/omitted.html>`_.
Please use sync_db script described below instead for database initialization.
**If you use virtualenv**::
$ tools/sync_db.sh --config-file <path-to-qinling.conf>
**Or run sync_db directly**::
$ python tools/sync_db.py --config-file <path-to-qinling.conf>
Running Qinling API server
--------------------------
To run Qinling API server perform the following command in a shell::
$ qinling-server --server api --config-file <path-to-qinling.conf>
Running Qinling Engines
-----------------------
To run Qinling Engine perform the following command in a shell::
$ qinling-server --server engine --config-file <path-to-qinling.conf>
Running Qinling Task Executors
------------------------------
To run Qinling Task Executor instance perform the following command in a shell::
$ qinling-server --server executor --config-file <path-to-qinling.conf>
Note that at least one Engine instance and one Executor instance should be
running so that workflow tasks are processed by Qinling.
Running Multiple Qinling Servers Under the Same Process
-------------------------------------------------------
To run more than one server (API, Engine, or Task Executor) on the same process,
perform the following command in a shell::
$ qinling-server --server api,engine --config-file <path-to-qinling.conf>
The --server command line option can be a comma delimited list. The valid
options are "all" (by default if not specified) or any combination of "api",
"engine", and "executor". It's important to note that the "fake" transport for
the rpc_backend defined in the config file should only be used if "all" the
Qinling servers are launched on the same process. Otherwise, messages do not
get delivered if the Qinling servers are launched on different processes
because the "fake" transport is using an in process queue.
Qinling And Docker
------------------
Please first refer `installation steps for docker <https://docs.docker.com/installation/>`_.
To build the image from the qinling source, change directory to the root
directory of the Qinling git repository and run::
$ docker build -t <Name of image> .
In case you want pre-built image, you can download it from `openstack tarballs source <https://tarballs.openstack.org/qinling/images/qinling-docker.tar.gz>`_.
To load this image to docker registry, please run following command::
$ docker load -i '<path of qinling-docker.tar.gz>'
The Qinling Docker image is configured to store the database in the user's home
directory. For persistence of these data, you may want to keep this directory
outside of the container. This may be done by the following steps::
$ sudo mkdir '<user-defined-directory>'
$ docker run -it -v '<user-defined-directory>':/home/qinling <Name of image>
More about docker: https://www.docker.com/
**NOTE:** This docker image uses **SQLite** database. So, it cannot be used for
production environment. If you want to use this for production environment,
then put customized qinling.conf to '<user-defined-directory>'.
Qinling Client Installation
---------------------------
Please refer to :doc:`Qinling Client / CLI Guide </guides/mistralclient_guide>`

@ -0,0 +1,106 @@
# Copyright 2017 Catalyst IT Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log as logging
from pecan import rest
import requests
import wsmeext.pecan as wsme_pecan
from qinling.api.controllers.v1 import resources
from qinling.api.controllers.v1 import types
from qinling.db import api as db_api
from qinling import exceptions as exc
from qinling import rpc
from qinling.utils import rest_utils
LOG = logging.getLogger(__name__)
class ExecutionsController(rest.RestController):
def __init__(self, *args, **kwargs):
self.engine_client = rpc.get_engine_client()
super(ExecutionsController, self).__init__(*args, **kwargs)
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(
resources.Execution,
body=resources.Execution,
status_code=201
)
def post(self, execution):
params = execution.to_dict()
LOG.info("Creating execution. [execution=%s]", params)
function_id = params['function_id']
# Check if the service url is existing.
try:
mapping = db_api.get_function_service_mapping(function_id)
LOG.debug('Found Service url for function: %s', function_id)
func_url = '%s/execute' % mapping.service_url
LOG.info('Invoke function %s, url: %s', function_id, func_url)
r = requests.post(func_url, data=params.get('input'))
params.update(
{'status': 'success', 'output': {'result': r.json()}}
)
db_model = db_api.create_execution(params)
return resources.Execution.from_dict(db_model.to_dict())
except exc.DBEntityNotFoundError:
pass
func = db_api.get_function(function_id)
runtime_id = func.runtime_id
params.update({'status': 'running'})
db_model = db_api.create_execution(params)
self.engine_client.create_execution(
db_model.id, function_id, runtime_id, input=params.get('input')
)
updated_db = db_api.get_execution(db_model.id)
return resources.Execution.from_dict(updated_db.to_dict())
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(resources.Executions)
def get_all(self):
LOG.info("Get all executions.")
executions = [resources.Execution.from_dict(db_model.to_dict())
for db_model in db_api.get_executions()]
return resources.Executions(executions=executions)
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(resources.Execution, types.uuid)
def get(self, id):
LOG.info("Fetch execution [id=%s]", id)
execution_db = db_api.get_execution(id)
return resources.Execution.from_dict(execution_db.to_dict())
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(None, types.uuid, status_code=204)
def delete(self, id):
"""Delete the specified Execution."""
LOG.info("Delete execution [id=%s]", id)
return db_api.delete_execution(id)

@ -13,11 +13,14 @@
# limitations under the License.
import json
import os
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import strutils
import pecan
from pecan import rest
from webob.static import FileIter
import wsmeext.pecan as wsme_pecan
from qinling.api.controllers.v1 import resources
@ -30,66 +33,99 @@ from qinling.utils import rest_utils
LOG = logging.getLogger(__name__)
POST_REQUIRED = set(['name', 'runtime', 'code'])
POST_REQUIRED = set(['name', 'runtime_id', 'code'])
class FunctionsController(rest.RestController):
def __init__(self, *args, **kwargs):
self.storage_provider = storage_base.load_storage_providers(cfg.CONF)
self.storage_provider = storage_base.load_storage_provider(cfg.CONF)
super(FunctionsController, self).__init__(*args, **kwargs)
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(resources.Function, types.uuid)
@rest_utils.wrap_pecan_controller_exception
@pecan.expose()
def get(self, id):
LOG.info("Fetch function [id=%s]", id)
download = strutils.bool_from_string(
pecan.request.GET.get('download', False)
)
func_db = db_api.get_function(id)
ctx = context.get_ctx()
return resources.Function.from_dict(func_db.to_dict())
if not download:
pecan.override_template('json')
return resources.Function.from_dict(func_db.to_dict()).to_dict()
else:
f = self.storage_provider.retrieve(
ctx.projectid,
id,
)
def get_data(self, id):
pass
pecan.response.app_iter = FileIter(f)
pecan.response.headers['Content-Type'] = 'application/zip'
pecan.response.headers['Content-Disposition'] = (
'attachment; filename="%s"' % os.path.basename(f.name)
)
@rest_utils.wrap_pecan_controller_exception
@pecan.expose()
@pecan.expose('json')
def post(self, **kwargs):
LOG.info("Create function, params=%s", kwargs)
LOG.info("Creating function, params=%s", kwargs)
if not POST_REQUIRED.issubset(set(kwargs.keys())):
raise exc.InputException(
'Required param is missing. Required: %s' % POST_REQUIRED
)
runtime = db_api.get_runtime(kwargs['runtime_id'])
if runtime.status != 'available':
raise exc.InputException(
'Runtime %s not available.' % kwargs['runtime_id']
)
values = {
'name': kwargs['name'],
'runtime': kwargs['runtime'],
'description': kwargs.get('description', None),
'runtime_id': kwargs['runtime_id'],
'code': json.loads(kwargs['code']),
'storage': 'local'
'entry': kwargs.get('entry', 'main'),
}
if values['code'].get('package', False):
data = kwargs['package'].file.read()
ctx = context.get_ctx()
with db_api.transaction():
func_db = db_api.create_function(values)
self.storage_provider[values['storage']].store(
self.storage_provider.store(
ctx.projectid,
values['name'],
func_db.id,
data
)
pecan.response.status = 201
return resources.Function.from_dict(func_db.to_dict())
return resources.Function.from_dict(func_db.to_dict()).to_dict()
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(resources.Functions)
def get_all(self):
LOG.info("Get all functions.")
funcs = resources.Functions()
funcs.functions = []
functions = [resources.Function.from_dict(db_model.to_dict())
for db_model in db_api.get_functions()]
return funcs
return resources.Functions(functions=functions)
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(None, types.uuid, status_code=204)
def delete(self, id):
"""Delete the specified function."""
LOG.info("Delete function [id=%s]", id)
with db_api.transaction():
db_api.delete_function(id)
self.storage_provider.delete(context.get_ctx().projectid, id)

@ -19,8 +19,6 @@ from wsme import types as wtypes
from qinling.api.controllers.v1 import types
PROVIDER_TYPES = wtypes.Enum(str, 'docker', 'fission')
class Resource(wtypes.Base):
"""REST API Resource."""
@ -166,11 +164,11 @@ class Function(Resource):
id = wtypes.text
name = wtypes.text
description = wtypes.text
memorysize = int
memory_size = int
timeout = int
runtime = wtypes.text
runtime_id = types.uuid
code = types.jsontype
provider = PROVIDER_TYPES
entry = wtypes.text
created_at = wtypes.text
updated_at = wtypes.text
@ -180,11 +178,11 @@ class Function(Resource):
id='123e4567-e89b-12d3-a456-426655440000',
name='hello_world',
description='this is the first function.',
memorysize=1,
memory_size=1,
timeout=1,
runtime='python2.7',
runtime_id='123e4567-e89b-12d3-a456-426655440001',
code={'zip': True},
provider='docker',
entry='main',
created_at='1970-01-01T00:00:00.000000',
updated_at='1970-01-01T00:00:00.000000'
)
@ -228,7 +226,7 @@ class Runtime(Resource):
name='python2.7',
image='lingxiankong/python',
status='available',
project_id='<default-project>',
project_id='default',
description='Python 2.7 environment.',
created_at='1970-01-01T00:00:00.000000',
updated_at='1970-01-01T00:00:00.000000'
@ -254,3 +252,48 @@ class Runtimes(ResourceList):
)
return sample
class Execution(Resource):
id = types.uuid
function_id = wsme.wsattr(types.uuid, mandatory=True)
status = wsme.wsattr(wtypes.text, readonly=True)
sync = bool
input = types.jsontype
output = wsme.wsattr(types.jsontype, readonly=True)
created_at = wsme.wsattr(wtypes.text, readonly=True)
updated_at = wsme.wsattr(wtypes.text, readonly=True)
@classmethod
def sample(cls):
return cls(
id='123e4567-e89b-12d3-a456-426655440000',
function_id='123e4567-e89b-12d3-a456-426655440000',
status='success',
sync=True,
input={'data': 'hello, world'},
output={'result': 'hello, world'},
created_at='1970-01-01T00:00:00.000000',
updated_at='1970-01-01T00:00:00.000000'
)
class Executions(ResourceList):
executions = [Execution]
def __init__(self, **kwargs):
self._type = 'executions'
super(Executions, self).__init__(**kwargs)
@classmethod
def sample(cls):
sample = cls()
sample.executions = [Execution.sample()]
sample.next = (
"http://localhost:7070/v1/executions?"
"sort_keys=id,name&sort_dirs=asc,desc&limit=10&"
"marker=123e4567-e89b-12d3-a456-426655440000"
)
return sample

@ -16,6 +16,7 @@ import pecan
from wsme import types as wtypes
import wsmeext.pecan as wsme_pecan
from qinling.api.controllers.v1 import execution
from qinling.api.controllers.v1 import function
from qinling.api.controllers.v1 import resources
from qinling.api.controllers.v1 import runtime
@ -35,6 +36,7 @@ class Controller(object):
functions = function.FunctionsController()
runtimes = runtime.RuntimesController()
executions = execution.ExecutionsController()
@wsme_pecan.wsexpose(RootResource)
def index(self):

@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from oslo_log import log as logging
from pecan import rest
import wsmeext.pecan as wsme_pecan
@ -20,6 +19,7 @@ import wsmeext.pecan as wsme_pecan
from qinling.api.controllers.v1 import resources
from qinling.api.controllers.v1 import types
from qinling.db import api as db_api
from qinling import exceptions as exc
from qinling import rpc
from qinling.utils import rest_utils
@ -51,7 +51,6 @@ class RuntimesController(rest.RestController):
return resources.Runtimes(runtimes=runtimes)
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(
resources.Runtime,
@ -79,6 +78,15 @@ class RuntimesController(rest.RestController):
with db_api.transaction():
runtime_db = db_api.get_runtime(id)
# Runtime can not be deleted if still associate with functions.
funcs = db_api.get_functions(runtime_id={'eq': id})
if len(funcs):
raise exc.NotAllowedException(
'Runtime %s is still in use.' % id
)
runtime_db.status = 'deleting'
# Clean related resources asynchronously
self.engine_client.delete_runtime(id)

@ -23,7 +23,7 @@ eventlet.monkey_patch(
thread=False if '--use-debugger' in sys.argv else True,
time=True)
import os # noqa
import os
# If ../qingling/__init__.py exists, add ../ to Python search path, so that
# it will override what happens to be installed in /usr/(local/)lib/python...
@ -33,15 +33,15 @@ POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'qinling', '__init__.py')):
sys.path.insert(0, POSSIBLE_TOPDIR)
from oslo_config import cfg # noqa
from oslo_log import log as logging # noqa
from oslo_service import service # noqa
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import service
from qinling.api import service as api_service # noqa
from qinling import config # noqa
from qinling.engine import service as eng_service # noqa
from qinling import rpc # noqa
from qinling import version # noqa
from qinling.api import service as api_service
from qinling import config
from qinling.engine import service as eng_service
from qinling import rpc
from qinling import version
CONF = cfg.CONF

@ -99,7 +99,13 @@ storage_opts = [
'file_system_dir',
default='/opt/qinling/funtion/packages',
help='Directory to store funtion packages.'
)
),
cfg.StrOpt(
'provider',
default='local',
choices=['local', 'swift'],
help='Storage provider for function code package.'
),
]
KUBERNETES_GROUP = 'kubernetes'
@ -118,11 +124,9 @@ kubernetes_opts = [
'kube_host',
help='Kubernetes server address.'
),
cfg.StrOpt(
'volume_name',
default='functiondir',
help='Name of the volume shared between worker container and utility '
'container.'
cfg.IPOpt(
'qinling_service_address',
help='Qinling API service ip address.'
),
]

@ -26,7 +26,7 @@ ALLOWED_WITHOUT_AUTH = ['/', '/v1/']
CTX_THREAD_LOCAL_NAME = "QINLING_APP_CTX_THREAD_LOCAL"
DEFAULT_PROJECT_ID = "<default-project>"
DEFAULT_PROJECT_ID = "default"
def authenticate(req):

@ -106,3 +106,34 @@ def delete_runtime(id):
def update_runtime(id, values):
return IMPL.update_runtime(id, values)
# Execution
def create_execution(values):
return IMPL.create_execution(values)
def get_execution(id):
return IMPL.get_execution(id)
def get_executions():
return IMPL.get_executions()
def delete_execution(id):
return IMPL.delete_execution(id)
def update_execution(id, values):
return IMPL.update_execution(id, values)
def create_function_service_mapping(values):
return IMPL.create_function_service_mapping(values)
def get_function_service_mapping(function_id):
return IMPL.get_function_service_mapping(function_id)

@ -183,14 +183,18 @@ def _get_db_object_by_id(model, id, insecure=False):
@db_base.session_aware()
def get_function(id):
pass
def get_function(id, session=None):
function = _get_db_object_by_id(models.Function, id)
if not function:
raise exc.DBEntityNotFoundError("Function not found [id=%s]" % id)
return function
@db_base.session_aware()
def get_functions(limit=None, marker=None, sort_keys=None,
sort_dirs=None, fields=None, **kwargs):
pass
def get_functions(session=None, **kwargs):
return _get_collection_sorted_by_time(models.Function, **kwargs)
@db_base.session_aware()
@ -214,8 +218,10 @@ def update_function(id, values):
@db_base.session_aware()
def delete_function(id):
pass
def delete_function(id, session=None):
function = get_function(id)
session.delete(function)
@db_base.session_aware()
@ -253,3 +259,69 @@ def delete_runtime(id, session=None):
runtime = get_runtime(id)
session.delete(runtime)
@db_base.session_aware()
def create_execution(values, session=None):
execution = models.Execution()
execution.update(values.copy())
try:
execution.save(session=session)
except oslo_db_exc.DBDuplicateEntry as e:
raise exc.DBError(
"Duplicate entry for Execution: %s" % e.columns
)
return execution
@db_base.session_aware()
def get_execution(id, session=None):
execution = _get_db_object_by_id(models.Execution, id)
if not execution:
raise exc.DBEntityNotFoundError("Execution not found [id=%s]" % id)
return execution
@db_base.session_aware()
def get_executions(session=None, **kwargs):
return _get_collection_sorted_by_time(models.Execution, **kwargs)
@db_base.session_aware()
def delete_execution(id, session=None):
execution = get_execution(id)
session.delete(execution)
@db_base.session_aware()
def create_function_service_mapping(values, session=None):
mapping = models.FunctionServiceMapping()
mapping.update(values.copy())
try:
mapping.save(session=session)
except oslo_db_exc.DBDuplicateEntry as e:
raise exc.DBError(
"Duplicate entry for FunctionServiceMapping: %s" % e.columns
)
return mapping
@db_base.session_aware()
def get_function_service_mapping(function_id, session=None):
mapping = db_base.model_query(
models.FunctionServiceMapping
).filter_by(function_id=function_id).first()
if not mapping:
raise exc.DBEntityNotFoundError(
"FunctionServiceMapping not found [function_id=%s]" % function_id
)
return mapping

@ -24,13 +24,58 @@ Create Date: 2017-05-03 12:02:51.935368
revision = '001'
down_revision = None
import re
from alembic import op
import sqlalchemy as sa
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.schema import CreateTable
from qinling.db.sqlalchemy import types as st
@compiles(CreateTable)
def _add_if_not_exists(element, compiler, **kw):
output = compiler.visit_create_table(element, **kw)
if element.element.info.get("check_ifexists"):
output = re.sub(
"^\s*CREATE TABLE", "CREATE TABLE IF NOT EXISTS", output, re.S)
return output
def upgrade():
op.create_table(
'function',
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('project_id', sa.String(length=80), nullable=False),
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('name', sa.String(length=255), nullable=True),
sa.Column('description', sa.String(length=255), nullable=True),
sa.Column('runtime_id', sa.String(length=36), nullable=False),
sa.Column('memory_size', sa.Integer, nullable=True),
sa.Column('timeout', sa.Integer, nullable=True),
sa.Column('code', st.JsonLongDictType(), nullable=False),
sa.Column('entry', sa.String(length=80), nullable=False),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('name', 'project_id'),
info={"check_ifexists": True}
)
op.create_table(
'function_service_mapping',
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('function_id', sa.String(length=36), nullable=False),
sa.Column('service_url', sa.String(length=255), nullable=False),
sa.PrimaryKeyConstraint('function_id'),
sa.UniqueConstraint('function_id', 'service_url'),
sa.ForeignKeyConstraint(
['function_id'], [u'function.id'], ondelete='CASCADE'
),
info={"check_ifexists": True}
)
op.create_table(
'runtime',
sa.Column('created_at', sa.DateTime(), nullable=True),
@ -42,4 +87,21 @@ def upgrade():
sa.Column('image', sa.String(length=255), nullable=False),
sa.Column('status', sa.String(length=32), nullable=False),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('name'),
info={"check_ifexists": True}
)
op.create_table(
'execution',
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('project_id', sa.String(length=80), nullable=False),
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('function_id', sa.String(length=36), nullable=False),
sa.Column('status', sa.String(length=32), nullable=False),
sa.Column('sync', sa.BOOLEAN, nullable=False),
sa.Column('input', st.JsonLongDictType(), nullable=True),
sa.Column('output', st.JsonLongDictType(), nullable=True),
sa.PrimaryKeyConstraint('id'),
info={"check_ifexists": True}
)

@ -23,8 +23,6 @@ from oslo_config import cfg
from oslo_utils import importutils
import six
from qinling import config
# We need to import mistral.api.app to
# make sure we register all needed options.
importutils.try_import('qinling.api.app')

@ -15,19 +15,19 @@
import six
from oslo_db.sqlalchemy import models as oslo_models
from oslo_utils import uuidutils
import sqlalchemy as sa
from sqlalchemy.ext import declarative
from sqlalchemy.orm import attributes
from qinling import context
from qinling.utils import common
def id_column():
return sa.Column(
sa.String(36),
primary_key=True,
default=uuidutils.generate_uuid()
default=common.generate_unicode_uuid
)

@ -27,12 +27,26 @@ class Function(model_base.QinlingSecureModelBase):
name = sa.Column(sa.String(255), nullable=False)
description = sa.Column(sa.String(255))
runtime = sa.Column(sa.String(32), nullable=False)
memorysize = sa.Column(sa.Integer, nullable=False)
timeout = sa.Column(sa.Integer, nullable=False)
provider = sa.Column(sa.String(32), nullable=False)
package = sa.Column(sa.Boolean, nullable=False)
runtime_id = sa.Column(sa.String(36), nullable=False)
memory_size = sa.Column(sa.Integer)
timeout = sa.Column(sa.Integer)
code = sa.Column(st.JsonLongDictType(), nullable=False)
entry = sa.Column(sa.String(80), nullable=False)
class FunctionServiceMapping(model_base.QinlingModelBase):
__tablename__ = 'function_service_mapping'
__table_args__ = (
sa.UniqueConstraint('function_id', 'service_url'),
)
function_id = sa.Column(
sa.String(36),
sa.ForeignKey(Function.id, ondelete='CASCADE'),
primary_key=True,
)
service_url = sa.Column(sa.String(255), nullable=False)
class Runtime(model_base.QinlingSecureModelBase):
@ -42,3 +56,15 @@ class Runtime(model_base.QinlingSecureModelBase):
description = sa.Column(sa.String(255))
image = sa.Column(sa.String(255), nullable=False)
status = sa.Column(sa.String(32), nullable=False)
sa.UniqueConstraint('name')
class Execution(model_base.QinlingSecureModelBase):
__tablename__ = 'execution'
function_id = sa.Column(sa.String(36), nullable=False)
status = sa.Column(sa.String(32), nullable=False)
sync = sa.Column(sa.BOOLEAN, default=True)
input = sa.Column(st.JsonLongDictType())
output = sa.Column(st.JsonLongDictType())

@ -12,11 +12,9 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from oslo_log import log as logging
from qinling.db import api as db_api
from qinling import exceptions as exc
LOG = logging.getLogger(__name__)
@ -50,8 +48,6 @@ class DefaultEngine(object):
runtime.status = 'error'
raise exc.OrchestratorException('Failed to create pool.')
def delete_runtime(self, ctx, runtime_id):
LOG.info('Start to delete runtime, id=%s', runtime_id)
@ -65,3 +61,40 @@ class DefaultEngine(object):
db_api.delete_runtime(runtime_id)
LOG.info('Runtime %s deleted.', runtime_id)
def create_execution(self, ctx, execution_id, function_id, runtime_id,
input=None):
LOG.info(
'Creating execution. execution_id=%s, function_id=%s, '
'runtime_id=%s',
execution_id, function_id, runtime_id
)
with db_api.transaction():
execution = db_api.get_execution(execution_id)
runtime = db_api.get_runtime(runtime_id)
identifier = '%s-%s' % (runtime_id, runtime.name)
labels = {'runtime_name': runtime.name, 'runtime_id': runtime_id}
service_url = self.orchestrator.prepare_execution(
function_id, identifier=identifier, labels=labels
)
output = self.orchestrator.run_execution(
function_id, input=input, service_url=service_url
)
LOG.debug(
'Finished execution. execution_id=%s, output=%s',
execution_id,
output
)
execution.output = output
execution.status = 'success'
mapping = {
'function_id': function_id,
'service_url': service_url
}
db_api.create_function_service_mapping(mapping)

@ -78,6 +78,15 @@ class ApplicationContextNotFoundException(QinlingException):
message = "Application context not found"
class StorageNotFoundException(QinlingException):
http_code = 404
message = "Storage file not found"
class StorageProviderException(QinlingException):
http_code = 500
class OrchestratorException(QinlingException):
http_code = 500
message = "Orchestrator error."

@ -34,6 +34,14 @@ class OrchestratorBase(object):
def delete_pool(self, name, **kwargs):
raise NotImplementedError
@abc.abstractmethod
def prepare_execution(self, function_id, **kwargs):
raise NotImplementedError
@abc.abstractmethod
def run_execution(self, function_id, **kwargs):
raise NotImplementedError
def load_orchestrator(conf):
global ORCHESTRATOR

@ -13,16 +13,15 @@
# limitations under the License.
import os
import time
import jinja2
from kubernetes import config
from kubernetes import client
from kubernetes.client import models
from kubernetes.client.rest import ApiException
from oslo_config import cfg
from oslo_log import log as logging
import requests
import yaml
from qinling import exceptions as exc
from qinling.orchestrator import base
from qinling.utils import common
@ -47,10 +46,12 @@ class KubernetesManager(base.OrchestratorBase):
searchpath=os.path.dirname(TEMPLATES_DIR)
)
jinja_env = jinja2.Environment(
loader=template_loader, autoescape=True, trim_blocks=True
loader=template_loader, autoescape=True, trim_blocks=True,
lstrip_blocks=True
)
self.deployment_template = jinja_env.get_template('deployment.j2')
self.service_template = jinja_env.get_template('service.j2')
def _ensure_namespace(self):
ret = self.v1.list_namespace()
@ -80,7 +81,6 @@ class KubernetesManager(base.OrchestratorBase):
"name": name,
"labels": labels if labels else {},
"replicas": self.conf.kubernetes.replicas,
"volume_name": self.conf.kubernetes.volume_name,
"container_name": 'worker',
"image": image,
}
@ -104,13 +104,6 @@ class KubernetesManager(base.OrchestratorBase):
selector = common.convert_dict_to_string(labels)
self.v1.delete_collection_namespaced_pod(
self.conf.kubernetes.namespace,
label_selector=selector
)
LOG.info("Pods in deployment %s deleted.", name)
self.v1extention.delete_collection_namespaced_replica_set(
self.conf.kubernetes.namespace,
label_selector=selector
@ -122,11 +115,10 @@ class KubernetesManager(base.OrchestratorBase):
self.conf.kubernetes.namespace, label_selector=selector
)
names = [i.metadata.name for i in ret.items]
for name in names:
for svc_name in names:
self.v1.delete_namespaced_service(
name,
svc_name,
self.conf.kubernetes.namespace,
models.v1_delete_options.V1DeleteOptions()
)
LOG.info("Services in deployment %s deleted.", name)
@ -137,4 +129,133 @@ class KubernetesManager(base.OrchestratorBase):
field_selector='metadata.name=%s' % name
)
# Should delete pods after deleting deployment to avoid pods are
# recreated by k8s.
self.v1.delete_collection_namespaced_pod(
self.conf.kubernetes.namespace,
label_selector=selector
)
LOG.info("Pods in deployment %s deleted.", name)
LOG.info("Deployment %s deleted.", name)
def _choose_available_pod(self, labels):
selector = common.convert_dict_to_string(labels)
ret = self.v1.list_namespaced_pod(
self.conf.kubernetes.namespace,
label_selector='!function_id,%s' % selector
)
if len(ret.items) == 0:
return None
# Choose the last available one by default.
pod = ret.items[-1]
return pod
def _prepare_pod(self, pod, deployment_name, function_id, service_labels):
"""Pod preparation.
1. Update pod labels.
2. Expose service and trigger package download.
"""
name = pod.metadata.name
LOG.info(
'Prepare pod %s in deployment %s for function %s',
name, deployment_name, function_id
)
# Update pod label.
pod_labels = pod.metadata.labels or {}
pod_labels.update({'function_id': function_id})
body = {
'metadata': {
'labels': pod_labels
}
}
self.v1.patch_namespaced_pod(
name, self.conf.kubernetes.namespace, body
)
LOG.debug('Labels updated for pod %s', name)
# Create service for the choosen pod.
service_name = "service-%s" % function_id
service_body = self.service_template.render(
{
"service_name": service_name,
"labels": service_labels,
"selector": pod_labels
}
)
ret = self.v1.create_namespaced_service(
self.conf.kubernetes.namespace, yaml.safe_load(service_body)
)
node_port = ret.spec.ports[0].node_port
LOG.debug(
'Service created for pod %s, service name: %s, node port: %s',
name, service_name, node_port
)
# Get external ip address for an arbitary node.
ret = self.v1.list_node()
addresses = ret.items[0].status.addresses
node_ip = None
for addr in addresses:
if addr.type == 'ExternalIP':
node_ip = addr.address
# FIXME: test purpose using minikube
if not node_ip:
for addr in addresses:
if addr.type == 'InternalIP':
node_ip = addr.address
# Download code package into container.
pod_service_url = 'http://%s:%s' % (node_ip, node_port)
request_url = '%s/download' % pod_service_url
download_url = (
'http://%s:%s/v1/functions/%s?download=true' %
(self.conf.kubernetes.qinling_service_address,
self.conf.api.port, function_id)
)
data = {'download_url': download_url, 'function_id': function_id}
LOG.debug(
'Send request to pod %s, request_url: %s, data: %s',
name, request_url, data
)
# TODO(kong): Here we sleep some time to avoid 'Failed to establish a
# new connection' error for some reason. Needs to find a better
# solution.
time.sleep(1)
r = requests.post(request_url, data=data)
if r.status_code != requests.codes.ok:
raise exc.OrchestratorException(
'Failed to download function code package.'
)
return pod_service_url
def prepare_execution(self, function_id, identifier=None, labels=None):
pod = self._choose_available_pod(labels)
if not pod:
raise exc.OrchestratorException('No pod available.')
return self._prepare_pod(pod, identifier, function_id, labels)
def run_execution(self, function_id, input=None, service_url=None):
func_url = '%s/execute' % service_url
LOG.info('Invoke function %s, url: %s', function_id, func_url)
r = requests.post(func_url, data=input)
return {'result': r.json()}

@ -3,39 +3,26 @@ kind: Deployment
metadata:
name: {{ name }}
labels:
{% for key, value in labels.items() %}
{{ key}}: {{ value }}
{% endfor %}
{% for key, value in labels.items() %}
{{ key }}: {{ value }}
{% endfor %}
spec:
replicas: {{ replicas }}
selector:
matchLabels:
{% for key, value in labels.items() %}
{{ key}}: {{ value }}
{% endfor %}
{% for key, value in labels.items() %}
{{ key }}: {{ value }}
{% endfor %}
template:
metadata:
labels:
{% for key, value in labels.items() %}
{{ key}}: {{ value }}
{% endfor %}
{% for key, value in labels.items() %}
{{ key }}: {{ value }}
{% endfor %}
spec:
volumes:
- name: {{ volume_name }}
emptyDir: {}
containers:
- name: {{ container_name }}
image: {{ image }}
imagePullPolicy: IfNotPresent
volumeMounts:
- name: {{ volume_name }}
mountPath: /function
- name: fetcher
image: fission/fetcher
imagePullPolicy: IfNotPresent
volumeMounts:
- name: {{ volume_name }}
mountPath: /function
command:
- /fetcher
- /function
ports:
- containerPort: 9090

@ -0,0 +1,17 @@
apiVersion: v1
kind: Service
metadata:
name: {{ service_name }}
labels:
{% for key, value in labels.items() %}
{{ key }}: {{ value }}
{% endfor %}
spec:
type: NodePort
selector:
{% for key, value in selector.items() %}
{{ key}}: "{{ value }}"
{% endfor %}
ports:
- protocol: TCP
port: 9090

@ -144,3 +144,15 @@ class EngineClient(object):
'delete_runtime',
runtime_id=id
)
@wrap_messaging_exception
def create_execution(self, execution_id, function_id, runtime_id,
input=None):
return self._client.prepare(topic=self.topic, server=None).call(
ctx.get_ctx(),
'create_execution',
execution_id=execution_id,
function_id=function_id,
runtime_id=runtime_id,
input=input
)

@ -15,8 +15,11 @@
import abc
import six
from stevedore import driver
STORAGE_PROVIDER_MAPPING = {}
from qinling import exceptions as exc
STORAGE_PROVIDER = None
@six.add_metaclass(abc.ABCMeta)
@ -31,8 +34,28 @@ class PackageStorage(object):
def retrieve(self, project_id, function):
raise NotImplementedError
@abc.abstractmethod
def delete(self, project_id, function):
raise NotImplementedError
def load_storage_providers(conf):
global STORAGE_PROVIDER_MAPPING
return STORAGE_PROVIDER_MAPPING
def load_storage_provider(conf):
global STORAGE_PROVIDER
if not STORAGE_PROVIDER:
try:
mgr = driver.DriverManager(
'qinling.storage.provider',
conf.storage.provider,
invoke_on_load=True,
invoke_args=[conf]
)
STORAGE_PROVIDER = mgr.driver
except Exception as e:
raise exc.StorageProviderException(
'Failed to load storage provider: %s. Error: %s' %
(conf.storage.provider, str(e))
)
return STORAGE_PROVIDER

@ -12,10 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import fileutils
from qinling import exceptions as exc
from qinling.storage import base
LOG = logging.getLogger(__name__)
@ -33,7 +36,43 @@ class FileSystemStorage(base.PackageStorage):
'Store package, function: %s, project: %s', function, project_id
)
project_path = os.path.join(CONF.storage.file_system_dir, project_id)
fileutils.ensure_tree(project_path)
func_zip = os.path.join(project_path, '%s.zip' % function)
with open(func_zip, 'wb') as fd:
fd.write(data)
def retrieve(self, project_id, function):
LOG.info(
'Get package data, function: %s, project: %s', function, project_id
)
func_zip = os.path.join(
CONF.storage.file_system_dir,
'%s/%s.zip' % (project_id, function)
)
if not os.path.exists(func_zip):
raise exc.StorageNotFoundException(
'Package of function %s for project %s not found.' %
(function, project_id)
)
f = open(func_zip, 'rb')
return f
def delete(self, project_id, function):
LOG.info(
'Delete package data, function: %s, project: %s', function,
project_id
)
func_zip = os.path.join(
CONF.storage.file_system_dir,
'%s/%s.zip' % (project_id, function)
)
if os.path.exists(func_zip):
os.remove(func_zip)

@ -12,7 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_utils import uuidutils
def convert_dict_to_string(d):
temp_list = ['%s=%s' % (k, v) for k, v in d.items()]
return ','.join(temp_list)
def generate_unicode_uuid():
return uuidutils.generate_uuid()

@ -0,0 +1,13 @@
FROM alpine:3.5
RUN apk update
RUN apk add --no-cache python2 python2-dev build-base py2-pip
RUN pip install --upgrade pip
RUN rm -r /root/.cache
COPY . /app
WORKDIR /app
RUN pip install -r requirements.txt
ENTRYPOINT ["python"]
CMD ["server.py"]

@ -0,0 +1,31 @@
# Qinling: Python Environment
This is the Python environment for Qinling.
It's a Docker image containing a Python 2.7 runtime, along with a
dynamic loader. A few common dependencies are included in the
requirements.txt file.
## Customizing this image
To add package dependencies, edit requirements.txt to add what you
need, and rebuild this image (instructions below).
You also may want to customize what's available to the function in its
request context. You can do this by editing server.py (see the
comment in that file about customizing request context).
## Rebuilding and pushing the image
You'll need access to a Docker registry to push the image: you can
sign up for Docker hub at hub.docker.com, or use registries from
gcr.io, quay.io, etc. Let's assume you're using a docker hub account
called USER. Build and push the image to the the registry:
```
docker build -t USER/python-env . && docker push USER/python-env
```
## Using the image in Qinling
TBD

@ -0,0 +1,3 @@
Flask>=0.11.1
httplib2
requests>=2.7.0

@ -0,0 +1,91 @@
# Copyright 2017 Catalyst IT Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import sys
import time
import zipfile
import zipimport
from flask import abort
from flask import Flask
from flask import request
from flask import Response
import requests
app = Flask(__name__)
file_name = ''
@app.route('/download', methods=['POST'])
def download():
service_url = request.form['download_url']
function_id = request.form['function_id']
global file_name
file_name = '%s.zip' % function_id
app.logger.info('Request received, service_url:%s' % service_url)
r = requests.get(service_url, stream=True)
with open(file_name, 'wb') as fd:
for chunk in r.iter_content(chunk_size=128):
fd.write(chunk)
if not zipfile.is_zipfile(file_name):
abort(500)
app.logger.info('Code package downloaded to %s' % file_name)
return 'success'
@app.route('/execute', methods=['POST'])
def execute():
global file_name
importer = zipimport.zipimporter(file_name)
module = importer.load_module('main')
start = time.time()
try:
result = module.main()
except Exception as e:
result = str(e)
duration = time.time() - start
return Response(
response=json.dumps({'output': result, 'duration': duration}),
status=200,
mimetype='application/json'
)
def setup_logger(loglevel):
global app
root = logging.getLogger()
root.setLevel(loglevel)
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(loglevel)
ch.setFormatter(
logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
)
app.logger.addHandler(ch)
setup_logger(logging.DEBUG)
app.logger.info("Starting server")
app.run(host='0.0.0.0', port='9090')

@ -27,6 +27,9 @@ console_scripts =
qinling-server = qinling.cmd.launch:main
qinling-db-manage = qinling.db.sqlalchemy.migration.cli:main
qinling.storage.provider:
local = qinling.storage.file_system:FileSystemStorage
qinling.orchestrator =
kubernetes = qinling.orchestrator.kubernetes.manager:KubernetesManager

@ -3,7 +3,6 @@
# process, which may cause wedges in the gate later.
hacking>=0.12.0,<0.13 # Apache-2.0
coverage>=4.0 # Apache-2.0
python-subunit>=0.0.18 # Apache-2.0/BSD
sphinx>=1.5.1 # BSD

@ -1,6 +1,7 @@
[DEFAULT]
namespace = qinling.config
namespace = oslo.messaging
namespace = keystonemiddleware.auth_token
namespace = oslo.messaging
namespace = oslo.log
namespace = oslo.policy
namespace = oslo.db