From df32a9f412cc40260fb10a8fe88a5bc96f05dc1b Mon Sep 17 00:00:00 2001 From: Lingxian Kong Date: Tue, 9 May 2017 23:47:12 +1200 Subject: [PATCH] Complete basic workflow(Finally!) - Create runtime - Create function - Invoke function(i.e. create execution) --- README.rst | 6 +- doc/source/installation.rst | 177 +++++++++++++++++- qinling/api/controllers/v1/execution.py | 106 +++++++++++ qinling/api/controllers/v1/function.py | 70 +++++-- qinling/api/controllers/v1/resources.py | 61 +++++- qinling/api/controllers/v1/root.py | 2 + qinling/api/controllers/v1/route.py | 0 qinling/api/controllers/v1/runtime.py | 12 +- qinling/cmd/launch.py | 18 +- qinling/config.py | 16 +- qinling/context.py | 2 +- qinling/db/api.py | 31 +++ qinling/db/sqlalchemy/api.py | 86 ++++++++- .../alembic_migrations/versions/001_pike.py | 62 ++++++ qinling/db/sqlalchemy/migration/cli.py | 2 - qinling/db/sqlalchemy/model_base.py | 4 +- qinling/db/sqlalchemy/models.py | 36 +++- qinling/engine/default_engine.py | 41 +++- qinling/exceptions.py | 9 + qinling/orchestrator/base.py | 8 + qinling/orchestrator/kubernetes/manager.py | 153 +++++++++++++-- .../kubernetes/templates/deployment.j2 | 35 ++-- .../kubernetes/templates/service.j2 | 17 ++ qinling/orchestrator/swarm/manager.py | 0 qinling/rpc.py | 12 ++ qinling/storage/base.py | 31 ++- qinling/storage/file_system.py | 39 ++++ qinling/utils/common.py | 7 + runtimes/python2.7/Dockerfile | 13 ++ runtimes/python2.7/README.md | 31 +++ runtimes/python2.7/requirements.txt | 3 + runtimes/python2.7/server.py | 91 +++++++++ setup.cfg | 3 + test-requirements.txt | 1 - .../config-generator.qinling.conf | 3 +- 35 files changed, 1068 insertions(+), 120 deletions(-) delete mode 100644 qinling/api/controllers/v1/route.py create mode 100644 qinling/orchestrator/kubernetes/templates/service.j2 delete mode 100644 qinling/orchestrator/swarm/manager.py create mode 100644 runtimes/python2.7/Dockerfile create mode 100644 runtimes/python2.7/README.md create mode 100644 runtimes/python2.7/requirements.txt create mode 100644 runtimes/python2.7/server.py rename tools/{ => config}/config-generator.qinling.conf (88%) diff --git a/README.rst b/README.rst index cd7fe8ce..ef6e355a 100644 --- a/README.rst +++ b/README.rst @@ -1,8 +1,8 @@ -=============================== +======= qinling -=============================== +======= -Function as a Service +Function as a Service (Documentation needs to be added, please stay tuned!) Please fill here a long description which must be at least 3 lines wrapped on 80 cols, so that distribution package maintainers can use it in their packages. diff --git a/doc/source/installation.rst b/doc/source/installation.rst index a8c20616..d4bb70b8 100644 --- a/doc/source/installation.rst +++ b/doc/source/installation.rst @@ -1,12 +1,175 @@ -============ +Qinling Installation Guide +========================== + +Prerequisites +------------- + +It is necessary to install some specific system libs for installing Qinling. +They can be installed on most popular operating system using their package +manager (for Ubuntu - *apt*, for Fedora - *dnf*, CentOS - *yum*, for Mac OS - +*brew* or *macports*). +The list of needed packages is shown below: + +1. **python-dev** +2. **python-setuptools** +3. **python-pip** +4. **libffi-dev** +5. **libxslt1-dev (or libxslt-dev)** +6. **libxml2-dev** +7. **libyaml-dev** +8. **libssl-dev** + +In case of Ubuntu, just run:: + + $ apt-get install -y python-dev python-setuptools python-pip libffi-dev libxslt1-dev \ + libxml2-dev libyaml-dev libssl-dev + +**NOTE:** **Qinling can be used without authentication at all or it can work +with OpenStack.** In case of OpenStack, it works **only on Keystone v3**, make +sure **Keystone v3** is installed. + Installation -============ +------------ -At the command line:: +First of all, clone the repo and go to the repo directory:: - $ pip install qinling + $ git clone https://github.com/openstack/qinling.git + $ cd qinling -Or, if you have virtualenvwrapper installed:: +Generate config:: - $ mkvirtualenv qinling - $ pip install qinling + $ tox -egenconfig + +Configure Qinling as needed. The configuration file is located in +``etc/qinling.conf.sample``. You will need to modify the configuration options +and then copy it into ``/etc/qinling/qinling.conf``. +For details see :doc:`Qinling Configuration Guide ` + +**Virtualenv installation**:: + + $ tox + +This will install necessary virtual environments and run all the project tests. +Installing virtual environments may take significant time (~10-15 mins). + +**Local installation**:: + + $ pip install -e . + +or:: + + $ pip install -r requirements.txt + $ python setup.py install + +**NOTE**: Differences *pip install -e* and *setup.py install*. **pip install -e** +works very similarly to **setup.py install** or the EasyInstall tool, except +that it doesn’t actually install anything. Instead, it creates a special +.egg-link file in the deployment directory, that links to your project’s +source code. + +Before the first run +-------------------- + +After installation you will see **qinling-server** and **qinling-db-manage** commands +in your environment, either in system or virtual environment. + +**NOTE**: In case of using **virtualenv**, all Qinling related commands available via +**tox -evenv --**. For example, *qinling-server* is available via +*tox -evenv -- qinling-server*. + +**qinling-db-manage** command can be used for migrations. + +For updating the database to the latest revision type:: + + $ qinling-db-manage --config-file upgrade head + +Before starting Qinling server, run *qinling-db-manage populate* command. +It prepares the DB, creates in it with all standard actions and standard +workflows which Qinling provides for all Qinling users.:: + + $ qinling-db-manage --config-file populate + +For more detailed information about *qinling-db-manage* script please see :doc:`Qinling Upgrade Guide `. + +**NOTE**: For users who want a dry run with **SQLite** database backend(not +used in production), *qinling-db-manage* is not recommended for database +initialization because of `SQLite limitations `_. +Please use sync_db script described below instead for database initialization. + +**If you use virtualenv**:: + + $ tools/sync_db.sh --config-file + +**Or run sync_db directly**:: + + $ python tools/sync_db.py --config-file + +Running Qinling API server +-------------------------- + +To run Qinling API server perform the following command in a shell:: + + $ qinling-server --server api --config-file + +Running Qinling Engines +----------------------- + +To run Qinling Engine perform the following command in a shell:: + + $ qinling-server --server engine --config-file + +Running Qinling Task Executors +------------------------------ +To run Qinling Task Executor instance perform the following command in a shell:: + + $ qinling-server --server executor --config-file + +Note that at least one Engine instance and one Executor instance should be +running so that workflow tasks are processed by Qinling. + +Running Multiple Qinling Servers Under the Same Process +------------------------------------------------------- +To run more than one server (API, Engine, or Task Executor) on the same process, +perform the following command in a shell:: + + $ qinling-server --server api,engine --config-file + +The --server command line option can be a comma delimited list. The valid +options are "all" (by default if not specified) or any combination of "api", +"engine", and "executor". It's important to note that the "fake" transport for +the rpc_backend defined in the config file should only be used if "all" the +Qinling servers are launched on the same process. Otherwise, messages do not +get delivered if the Qinling servers are launched on different processes +because the "fake" transport is using an in process queue. + +Qinling And Docker +------------------ +Please first refer `installation steps for docker `_. +To build the image from the qinling source, change directory to the root +directory of the Qinling git repository and run:: + + $ docker build -t . + +In case you want pre-built image, you can download it from `openstack tarballs source `_. + +To load this image to docker registry, please run following command:: + + $ docker load -i '' + +The Qinling Docker image is configured to store the database in the user's home +directory. For persistence of these data, you may want to keep this directory +outside of the container. This may be done by the following steps:: + + $ sudo mkdir '' + $ docker run -it -v '':/home/qinling + +More about docker: https://www.docker.com/ + +**NOTE:** This docker image uses **SQLite** database. So, it cannot be used for +production environment. If you want to use this for production environment, +then put customized qinling.conf to ''. + +Qinling Client Installation +--------------------------- + +Please refer to :doc:`Qinling Client / CLI Guide ` diff --git a/qinling/api/controllers/v1/execution.py b/qinling/api/controllers/v1/execution.py index e69de29b..d078f36a 100644 --- a/qinling/api/controllers/v1/execution.py +++ b/qinling/api/controllers/v1/execution.py @@ -0,0 +1,106 @@ +# Copyright 2017 Catalyst IT Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_log import log as logging +from pecan import rest +import requests +import wsmeext.pecan as wsme_pecan + +from qinling.api.controllers.v1 import resources +from qinling.api.controllers.v1 import types +from qinling.db import api as db_api +from qinling import exceptions as exc +from qinling import rpc +from qinling.utils import rest_utils + +LOG = logging.getLogger(__name__) + + +class ExecutionsController(rest.RestController): + def __init__(self, *args, **kwargs): + self.engine_client = rpc.get_engine_client() + + super(ExecutionsController, self).__init__(*args, **kwargs) + + @rest_utils.wrap_wsme_controller_exception + @wsme_pecan.wsexpose( + resources.Execution, + body=resources.Execution, + status_code=201 + ) + def post(self, execution): + params = execution.to_dict() + + LOG.info("Creating execution. [execution=%s]", params) + + function_id = params['function_id'] + + # Check if the service url is existing. + try: + mapping = db_api.get_function_service_mapping(function_id) + LOG.debug('Found Service url for function: %s', function_id) + + func_url = '%s/execute' % mapping.service_url + LOG.info('Invoke function %s, url: %s', function_id, func_url) + + r = requests.post(func_url, data=params.get('input')) + params.update( + {'status': 'success', 'output': {'result': r.json()}} + ) + db_model = db_api.create_execution(params) + + return resources.Execution.from_dict(db_model.to_dict()) + except exc.DBEntityNotFoundError: + pass + + func = db_api.get_function(function_id) + runtime_id = func.runtime_id + params.update({'status': 'running'}) + + db_model = db_api.create_execution(params) + + self.engine_client.create_execution( + db_model.id, function_id, runtime_id, input=params.get('input') + ) + + updated_db = db_api.get_execution(db_model.id) + + return resources.Execution.from_dict(updated_db.to_dict()) + + @rest_utils.wrap_wsme_controller_exception + @wsme_pecan.wsexpose(resources.Executions) + def get_all(self): + LOG.info("Get all executions.") + + executions = [resources.Execution.from_dict(db_model.to_dict()) + for db_model in db_api.get_executions()] + + return resources.Executions(executions=executions) + + @rest_utils.wrap_wsme_controller_exception + @wsme_pecan.wsexpose(resources.Execution, types.uuid) + def get(self, id): + LOG.info("Fetch execution [id=%s]", id) + + execution_db = db_api.get_execution(id) + + return resources.Execution.from_dict(execution_db.to_dict()) + + @rest_utils.wrap_wsme_controller_exception + @wsme_pecan.wsexpose(None, types.uuid, status_code=204) + def delete(self, id): + """Delete the specified Execution.""" + LOG.info("Delete execution [id=%s]", id) + + return db_api.delete_execution(id) diff --git a/qinling/api/controllers/v1/function.py b/qinling/api/controllers/v1/function.py index cfb37538..f74606d6 100644 --- a/qinling/api/controllers/v1/function.py +++ b/qinling/api/controllers/v1/function.py @@ -13,11 +13,14 @@ # limitations under the License. import json +import os from oslo_config import cfg from oslo_log import log as logging +from oslo_utils import strutils import pecan from pecan import rest +from webob.static import FileIter import wsmeext.pecan as wsme_pecan from qinling.api.controllers.v1 import resources @@ -30,66 +33,99 @@ from qinling.utils import rest_utils LOG = logging.getLogger(__name__) -POST_REQUIRED = set(['name', 'runtime', 'code']) +POST_REQUIRED = set(['name', 'runtime_id', 'code']) class FunctionsController(rest.RestController): def __init__(self, *args, **kwargs): - self.storage_provider = storage_base.load_storage_providers(cfg.CONF) + self.storage_provider = storage_base.load_storage_provider(cfg.CONF) super(FunctionsController, self).__init__(*args, **kwargs) - @rest_utils.wrap_wsme_controller_exception - @wsme_pecan.wsexpose(resources.Function, types.uuid) + @rest_utils.wrap_pecan_controller_exception + @pecan.expose() def get(self, id): LOG.info("Fetch function [id=%s]", id) + download = strutils.bool_from_string( + pecan.request.GET.get('download', False) + ) func_db = db_api.get_function(id) + ctx = context.get_ctx() - return resources.Function.from_dict(func_db.to_dict()) + if not download: + pecan.override_template('json') + return resources.Function.from_dict(func_db.to_dict()).to_dict() + else: + f = self.storage_provider.retrieve( + ctx.projectid, + id, + ) - def get_data(self, id): - pass + pecan.response.app_iter = FileIter(f) + pecan.response.headers['Content-Type'] = 'application/zip' + pecan.response.headers['Content-Disposition'] = ( + 'attachment; filename="%s"' % os.path.basename(f.name) + ) @rest_utils.wrap_pecan_controller_exception - @pecan.expose() + @pecan.expose('json') def post(self, **kwargs): - LOG.info("Create function, params=%s", kwargs) + LOG.info("Creating function, params=%s", kwargs) if not POST_REQUIRED.issubset(set(kwargs.keys())): raise exc.InputException( 'Required param is missing. Required: %s' % POST_REQUIRED ) + runtime = db_api.get_runtime(kwargs['runtime_id']) + if runtime.status != 'available': + raise exc.InputException( + 'Runtime %s not available.' % kwargs['runtime_id'] + ) + values = { 'name': kwargs['name'], - 'runtime': kwargs['runtime'], + 'description': kwargs.get('description', None), + 'runtime_id': kwargs['runtime_id'], 'code': json.loads(kwargs['code']), - 'storage': 'local' + 'entry': kwargs.get('entry', 'main'), } if values['code'].get('package', False): data = kwargs['package'].file.read() ctx = context.get_ctx() + with db_api.transaction(): func_db = db_api.create_function(values) - self.storage_provider[values['storage']].store( + self.storage_provider.store( ctx.projectid, - values['name'], + func_db.id, data ) pecan.response.status = 201 - return resources.Function.from_dict(func_db.to_dict()) + return resources.Function.from_dict(func_db.to_dict()).to_dict() @rest_utils.wrap_wsme_controller_exception @wsme_pecan.wsexpose(resources.Functions) def get_all(self): LOG.info("Get all functions.") - funcs = resources.Functions() - funcs.functions = [] + functions = [resources.Function.from_dict(db_model.to_dict()) + for db_model in db_api.get_functions()] - return funcs + return resources.Functions(functions=functions) + + @rest_utils.wrap_wsme_controller_exception + @wsme_pecan.wsexpose(None, types.uuid, status_code=204) + def delete(self, id): + """Delete the specified function.""" + LOG.info("Delete function [id=%s]", id) + + with db_api.transaction(): + db_api.delete_function(id) + + self.storage_provider.delete(context.get_ctx().projectid, id) diff --git a/qinling/api/controllers/v1/resources.py b/qinling/api/controllers/v1/resources.py index 9dea5e63..83171a57 100644 --- a/qinling/api/controllers/v1/resources.py +++ b/qinling/api/controllers/v1/resources.py @@ -19,8 +19,6 @@ from wsme import types as wtypes from qinling.api.controllers.v1 import types -PROVIDER_TYPES = wtypes.Enum(str, 'docker', 'fission') - class Resource(wtypes.Base): """REST API Resource.""" @@ -166,11 +164,11 @@ class Function(Resource): id = wtypes.text name = wtypes.text description = wtypes.text - memorysize = int + memory_size = int timeout = int - runtime = wtypes.text + runtime_id = types.uuid code = types.jsontype - provider = PROVIDER_TYPES + entry = wtypes.text created_at = wtypes.text updated_at = wtypes.text @@ -180,11 +178,11 @@ class Function(Resource): id='123e4567-e89b-12d3-a456-426655440000', name='hello_world', description='this is the first function.', - memorysize=1, + memory_size=1, timeout=1, - runtime='python2.7', + runtime_id='123e4567-e89b-12d3-a456-426655440001', code={'zip': True}, - provider='docker', + entry='main', created_at='1970-01-01T00:00:00.000000', updated_at='1970-01-01T00:00:00.000000' ) @@ -228,7 +226,7 @@ class Runtime(Resource): name='python2.7', image='lingxiankong/python', status='available', - project_id='', + project_id='default', description='Python 2.7 environment.', created_at='1970-01-01T00:00:00.000000', updated_at='1970-01-01T00:00:00.000000' @@ -254,3 +252,48 @@ class Runtimes(ResourceList): ) return sample + + +class Execution(Resource): + id = types.uuid + function_id = wsme.wsattr(types.uuid, mandatory=True) + status = wsme.wsattr(wtypes.text, readonly=True) + sync = bool + input = types.jsontype + output = wsme.wsattr(types.jsontype, readonly=True) + created_at = wsme.wsattr(wtypes.text, readonly=True) + updated_at = wsme.wsattr(wtypes.text, readonly=True) + + @classmethod + def sample(cls): + return cls( + id='123e4567-e89b-12d3-a456-426655440000', + function_id='123e4567-e89b-12d3-a456-426655440000', + status='success', + sync=True, + input={'data': 'hello, world'}, + output={'result': 'hello, world'}, + created_at='1970-01-01T00:00:00.000000', + updated_at='1970-01-01T00:00:00.000000' + ) + + +class Executions(ResourceList): + executions = [Execution] + + def __init__(self, **kwargs): + self._type = 'executions' + + super(Executions, self).__init__(**kwargs) + + @classmethod + def sample(cls): + sample = cls() + sample.executions = [Execution.sample()] + sample.next = ( + "http://localhost:7070/v1/executions?" + "sort_keys=id,name&sort_dirs=asc,desc&limit=10&" + "marker=123e4567-e89b-12d3-a456-426655440000" + ) + + return sample diff --git a/qinling/api/controllers/v1/root.py b/qinling/api/controllers/v1/root.py index 43d12fc9..6329d5bd 100644 --- a/qinling/api/controllers/v1/root.py +++ b/qinling/api/controllers/v1/root.py @@ -16,6 +16,7 @@ import pecan from wsme import types as wtypes import wsmeext.pecan as wsme_pecan +from qinling.api.controllers.v1 import execution from qinling.api.controllers.v1 import function from qinling.api.controllers.v1 import resources from qinling.api.controllers.v1 import runtime @@ -35,6 +36,7 @@ class Controller(object): functions = function.FunctionsController() runtimes = runtime.RuntimesController() + executions = execution.ExecutionsController() @wsme_pecan.wsexpose(RootResource) def index(self): diff --git a/qinling/api/controllers/v1/route.py b/qinling/api/controllers/v1/route.py deleted file mode 100644 index e69de29b..00000000 diff --git a/qinling/api/controllers/v1/runtime.py b/qinling/api/controllers/v1/runtime.py index ce5196ee..b5f5bc6c 100644 --- a/qinling/api/controllers/v1/runtime.py +++ b/qinling/api/controllers/v1/runtime.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from oslo_config import cfg from oslo_log import log as logging from pecan import rest import wsmeext.pecan as wsme_pecan @@ -20,6 +19,7 @@ import wsmeext.pecan as wsme_pecan from qinling.api.controllers.v1 import resources from qinling.api.controllers.v1 import types from qinling.db import api as db_api +from qinling import exceptions as exc from qinling import rpc from qinling.utils import rest_utils @@ -51,7 +51,6 @@ class RuntimesController(rest.RestController): return resources.Runtimes(runtimes=runtimes) - @rest_utils.wrap_wsme_controller_exception @wsme_pecan.wsexpose( resources.Runtime, @@ -79,6 +78,15 @@ class RuntimesController(rest.RestController): with db_api.transaction(): runtime_db = db_api.get_runtime(id) + + # Runtime can not be deleted if still associate with functions. + funcs = db_api.get_functions(runtime_id={'eq': id}) + if len(funcs): + raise exc.NotAllowedException( + 'Runtime %s is still in use.' % id + ) + runtime_db.status = 'deleting' + # Clean related resources asynchronously self.engine_client.delete_runtime(id) diff --git a/qinling/cmd/launch.py b/qinling/cmd/launch.py index 61943e18..1f2746b2 100644 --- a/qinling/cmd/launch.py +++ b/qinling/cmd/launch.py @@ -23,7 +23,7 @@ eventlet.monkey_patch( thread=False if '--use-debugger' in sys.argv else True, time=True) -import os # noqa +import os # If ../qingling/__init__.py exists, add ../ to Python search path, so that # it will override what happens to be installed in /usr/(local/)lib/python... @@ -33,15 +33,15 @@ POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]), if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'qinling', '__init__.py')): sys.path.insert(0, POSSIBLE_TOPDIR) -from oslo_config import cfg # noqa -from oslo_log import log as logging # noqa -from oslo_service import service # noqa +from oslo_config import cfg +from oslo_log import log as logging +from oslo_service import service -from qinling.api import service as api_service # noqa -from qinling import config # noqa -from qinling.engine import service as eng_service # noqa -from qinling import rpc # noqa -from qinling import version # noqa +from qinling.api import service as api_service +from qinling import config +from qinling.engine import service as eng_service +from qinling import rpc +from qinling import version CONF = cfg.CONF diff --git a/qinling/config.py b/qinling/config.py index 28630d90..b69efa86 100644 --- a/qinling/config.py +++ b/qinling/config.py @@ -99,7 +99,13 @@ storage_opts = [ 'file_system_dir', default='/opt/qinling/funtion/packages', help='Directory to store funtion packages.' - ) + ), + cfg.StrOpt( + 'provider', + default='local', + choices=['local', 'swift'], + help='Storage provider for function code package.' + ), ] KUBERNETES_GROUP = 'kubernetes' @@ -118,11 +124,9 @@ kubernetes_opts = [ 'kube_host', help='Kubernetes server address.' ), - cfg.StrOpt( - 'volume_name', - default='functiondir', - help='Name of the volume shared between worker container and utility ' - 'container.' + cfg.IPOpt( + 'qinling_service_address', + help='Qinling API service ip address.' ), ] diff --git a/qinling/context.py b/qinling/context.py index 566d548d..0115c836 100644 --- a/qinling/context.py +++ b/qinling/context.py @@ -26,7 +26,7 @@ ALLOWED_WITHOUT_AUTH = ['/', '/v1/'] CTX_THREAD_LOCAL_NAME = "QINLING_APP_CTX_THREAD_LOCAL" -DEFAULT_PROJECT_ID = "" +DEFAULT_PROJECT_ID = "default" def authenticate(req): diff --git a/qinling/db/api.py b/qinling/db/api.py index cdcfb9ba..cf7d009c 100644 --- a/qinling/db/api.py +++ b/qinling/db/api.py @@ -106,3 +106,34 @@ def delete_runtime(id): def update_runtime(id, values): return IMPL.update_runtime(id, values) + + +# Execution + + +def create_execution(values): + return IMPL.create_execution(values) + + +def get_execution(id): + return IMPL.get_execution(id) + + +def get_executions(): + return IMPL.get_executions() + + +def delete_execution(id): + return IMPL.delete_execution(id) + + +def update_execution(id, values): + return IMPL.update_execution(id, values) + + +def create_function_service_mapping(values): + return IMPL.create_function_service_mapping(values) + + +def get_function_service_mapping(function_id): + return IMPL.get_function_service_mapping(function_id) diff --git a/qinling/db/sqlalchemy/api.py b/qinling/db/sqlalchemy/api.py index 5171c8a6..87cb6773 100644 --- a/qinling/db/sqlalchemy/api.py +++ b/qinling/db/sqlalchemy/api.py @@ -183,14 +183,18 @@ def _get_db_object_by_id(model, id, insecure=False): @db_base.session_aware() -def get_function(id): - pass +def get_function(id, session=None): + function = _get_db_object_by_id(models.Function, id) + + if not function: + raise exc.DBEntityNotFoundError("Function not found [id=%s]" % id) + + return function @db_base.session_aware() -def get_functions(limit=None, marker=None, sort_keys=None, - sort_dirs=None, fields=None, **kwargs): - pass +def get_functions(session=None, **kwargs): + return _get_collection_sorted_by_time(models.Function, **kwargs) @db_base.session_aware() @@ -214,8 +218,10 @@ def update_function(id, values): @db_base.session_aware() -def delete_function(id): - pass +def delete_function(id, session=None): + function = get_function(id) + + session.delete(function) @db_base.session_aware() @@ -253,3 +259,69 @@ def delete_runtime(id, session=None): runtime = get_runtime(id) session.delete(runtime) + + +@db_base.session_aware() +def create_execution(values, session=None): + execution = models.Execution() + execution.update(values.copy()) + + try: + execution.save(session=session) + except oslo_db_exc.DBDuplicateEntry as e: + raise exc.DBError( + "Duplicate entry for Execution: %s" % e.columns + ) + + return execution + + +@db_base.session_aware() +def get_execution(id, session=None): + execution = _get_db_object_by_id(models.Execution, id) + + if not execution: + raise exc.DBEntityNotFoundError("Execution not found [id=%s]" % id) + + return execution + + +@db_base.session_aware() +def get_executions(session=None, **kwargs): + return _get_collection_sorted_by_time(models.Execution, **kwargs) + + +@db_base.session_aware() +def delete_execution(id, session=None): + execution = get_execution(id) + + session.delete(execution) + + +@db_base.session_aware() +def create_function_service_mapping(values, session=None): + mapping = models.FunctionServiceMapping() + mapping.update(values.copy()) + + try: + mapping.save(session=session) + except oslo_db_exc.DBDuplicateEntry as e: + raise exc.DBError( + "Duplicate entry for FunctionServiceMapping: %s" % e.columns + ) + + return mapping + + +@db_base.session_aware() +def get_function_service_mapping(function_id, session=None): + mapping = db_base.model_query( + models.FunctionServiceMapping + ).filter_by(function_id=function_id).first() + + if not mapping: + raise exc.DBEntityNotFoundError( + "FunctionServiceMapping not found [function_id=%s]" % function_id + ) + + return mapping diff --git a/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py b/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py index 5227409a..2031c432 100644 --- a/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py +++ b/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py @@ -24,13 +24,58 @@ Create Date: 2017-05-03 12:02:51.935368 revision = '001' down_revision = None +import re + from alembic import op import sqlalchemy as sa +from sqlalchemy.ext.compiler import compiles +from sqlalchemy.schema import CreateTable from qinling.db.sqlalchemy import types as st +@compiles(CreateTable) +def _add_if_not_exists(element, compiler, **kw): + output = compiler.visit_create_table(element, **kw) + if element.element.info.get("check_ifexists"): + output = re.sub( + "^\s*CREATE TABLE", "CREATE TABLE IF NOT EXISTS", output, re.S) + return output + + def upgrade(): + op.create_table( + 'function', + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.Column('project_id', sa.String(length=80), nullable=False), + sa.Column('id', sa.String(length=36), nullable=False), + sa.Column('name', sa.String(length=255), nullable=True), + sa.Column('description', sa.String(length=255), nullable=True), + sa.Column('runtime_id', sa.String(length=36), nullable=False), + sa.Column('memory_size', sa.Integer, nullable=True), + sa.Column('timeout', sa.Integer, nullable=True), + sa.Column('code', st.JsonLongDictType(), nullable=False), + sa.Column('entry', sa.String(length=80), nullable=False), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('name', 'project_id'), + info={"check_ifexists": True} + ) + + op.create_table( + 'function_service_mapping', + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.Column('function_id', sa.String(length=36), nullable=False), + sa.Column('service_url', sa.String(length=255), nullable=False), + sa.PrimaryKeyConstraint('function_id'), + sa.UniqueConstraint('function_id', 'service_url'), + sa.ForeignKeyConstraint( + ['function_id'], [u'function.id'], ondelete='CASCADE' + ), + info={"check_ifexists": True} + ) + op.create_table( 'runtime', sa.Column('created_at', sa.DateTime(), nullable=True), @@ -42,4 +87,21 @@ def upgrade(): sa.Column('image', sa.String(length=255), nullable=False), sa.Column('status', sa.String(length=32), nullable=False), sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('name'), + info={"check_ifexists": True} + ) + + op.create_table( + 'execution', + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.Column('project_id', sa.String(length=80), nullable=False), + sa.Column('id', sa.String(length=36), nullable=False), + sa.Column('function_id', sa.String(length=36), nullable=False), + sa.Column('status', sa.String(length=32), nullable=False), + sa.Column('sync', sa.BOOLEAN, nullable=False), + sa.Column('input', st.JsonLongDictType(), nullable=True), + sa.Column('output', st.JsonLongDictType(), nullable=True), + sa.PrimaryKeyConstraint('id'), + info={"check_ifexists": True} ) diff --git a/qinling/db/sqlalchemy/migration/cli.py b/qinling/db/sqlalchemy/migration/cli.py index b660aecb..643a250e 100644 --- a/qinling/db/sqlalchemy/migration/cli.py +++ b/qinling/db/sqlalchemy/migration/cli.py @@ -23,8 +23,6 @@ from oslo_config import cfg from oslo_utils import importutils import six -from qinling import config - # We need to import mistral.api.app to # make sure we register all needed options. importutils.try_import('qinling.api.app') diff --git a/qinling/db/sqlalchemy/model_base.py b/qinling/db/sqlalchemy/model_base.py index 0e328683..2e653943 100644 --- a/qinling/db/sqlalchemy/model_base.py +++ b/qinling/db/sqlalchemy/model_base.py @@ -15,19 +15,19 @@ import six from oslo_db.sqlalchemy import models as oslo_models -from oslo_utils import uuidutils import sqlalchemy as sa from sqlalchemy.ext import declarative from sqlalchemy.orm import attributes from qinling import context +from qinling.utils import common def id_column(): return sa.Column( sa.String(36), primary_key=True, - default=uuidutils.generate_uuid() + default=common.generate_unicode_uuid ) diff --git a/qinling/db/sqlalchemy/models.py b/qinling/db/sqlalchemy/models.py index a4986037..ca94b974 100644 --- a/qinling/db/sqlalchemy/models.py +++ b/qinling/db/sqlalchemy/models.py @@ -27,12 +27,26 @@ class Function(model_base.QinlingSecureModelBase): name = sa.Column(sa.String(255), nullable=False) description = sa.Column(sa.String(255)) - runtime = sa.Column(sa.String(32), nullable=False) - memorysize = sa.Column(sa.Integer, nullable=False) - timeout = sa.Column(sa.Integer, nullable=False) - provider = sa.Column(sa.String(32), nullable=False) - package = sa.Column(sa.Boolean, nullable=False) + runtime_id = sa.Column(sa.String(36), nullable=False) + memory_size = sa.Column(sa.Integer) + timeout = sa.Column(sa.Integer) code = sa.Column(st.JsonLongDictType(), nullable=False) + entry = sa.Column(sa.String(80), nullable=False) + + +class FunctionServiceMapping(model_base.QinlingModelBase): + __tablename__ = 'function_service_mapping' + + __table_args__ = ( + sa.UniqueConstraint('function_id', 'service_url'), + ) + + function_id = sa.Column( + sa.String(36), + sa.ForeignKey(Function.id, ondelete='CASCADE'), + primary_key=True, + ) + service_url = sa.Column(sa.String(255), nullable=False) class Runtime(model_base.QinlingSecureModelBase): @@ -42,3 +56,15 @@ class Runtime(model_base.QinlingSecureModelBase): description = sa.Column(sa.String(255)) image = sa.Column(sa.String(255), nullable=False) status = sa.Column(sa.String(32), nullable=False) + + sa.UniqueConstraint('name') + + +class Execution(model_base.QinlingSecureModelBase): + __tablename__ = 'execution' + + function_id = sa.Column(sa.String(36), nullable=False) + status = sa.Column(sa.String(32), nullable=False) + sync = sa.Column(sa.BOOLEAN, default=True) + input = sa.Column(st.JsonLongDictType()) + output = sa.Column(st.JsonLongDictType()) diff --git a/qinling/engine/default_engine.py b/qinling/engine/default_engine.py index c941e510..040cd191 100644 --- a/qinling/engine/default_engine.py +++ b/qinling/engine/default_engine.py @@ -12,11 +12,9 @@ # License for the specific language governing permissions and limitations # under the License. -from oslo_config import cfg from oslo_log import log as logging from qinling.db import api as db_api -from qinling import exceptions as exc LOG = logging.getLogger(__name__) @@ -50,8 +48,6 @@ class DefaultEngine(object): runtime.status = 'error' - raise exc.OrchestratorException('Failed to create pool.') - def delete_runtime(self, ctx, runtime_id): LOG.info('Start to delete runtime, id=%s', runtime_id) @@ -65,3 +61,40 @@ class DefaultEngine(object): db_api.delete_runtime(runtime_id) LOG.info('Runtime %s deleted.', runtime_id) + + def create_execution(self, ctx, execution_id, function_id, runtime_id, + input=None): + LOG.info( + 'Creating execution. execution_id=%s, function_id=%s, ' + 'runtime_id=%s', + execution_id, function_id, runtime_id + ) + + with db_api.transaction(): + execution = db_api.get_execution(execution_id) + runtime = db_api.get_runtime(runtime_id) + identifier = '%s-%s' % (runtime_id, runtime.name) + labels = {'runtime_name': runtime.name, 'runtime_id': runtime_id} + + service_url = self.orchestrator.prepare_execution( + function_id, identifier=identifier, labels=labels + ) + + output = self.orchestrator.run_execution( + function_id, input=input, service_url=service_url + ) + + LOG.debug( + 'Finished execution. execution_id=%s, output=%s', + execution_id, + output + ) + + execution.output = output + execution.status = 'success' + + mapping = { + 'function_id': function_id, + 'service_url': service_url + } + db_api.create_function_service_mapping(mapping) diff --git a/qinling/exceptions.py b/qinling/exceptions.py index 14be4317..1009b815 100644 --- a/qinling/exceptions.py +++ b/qinling/exceptions.py @@ -78,6 +78,15 @@ class ApplicationContextNotFoundException(QinlingException): message = "Application context not found" +class StorageNotFoundException(QinlingException): + http_code = 404 + message = "Storage file not found" + + +class StorageProviderException(QinlingException): + http_code = 500 + + class OrchestratorException(QinlingException): http_code = 500 message = "Orchestrator error." diff --git a/qinling/orchestrator/base.py b/qinling/orchestrator/base.py index 8695c910..ddb641e5 100644 --- a/qinling/orchestrator/base.py +++ b/qinling/orchestrator/base.py @@ -34,6 +34,14 @@ class OrchestratorBase(object): def delete_pool(self, name, **kwargs): raise NotImplementedError + @abc.abstractmethod + def prepare_execution(self, function_id, **kwargs): + raise NotImplementedError + + @abc.abstractmethod + def run_execution(self, function_id, **kwargs): + raise NotImplementedError + def load_orchestrator(conf): global ORCHESTRATOR diff --git a/qinling/orchestrator/kubernetes/manager.py b/qinling/orchestrator/kubernetes/manager.py index 20374db6..27ffbe90 100644 --- a/qinling/orchestrator/kubernetes/manager.py +++ b/qinling/orchestrator/kubernetes/manager.py @@ -13,16 +13,15 @@ # limitations under the License. import os +import time import jinja2 -from kubernetes import config from kubernetes import client -from kubernetes.client import models -from kubernetes.client.rest import ApiException -from oslo_config import cfg from oslo_log import log as logging +import requests import yaml +from qinling import exceptions as exc from qinling.orchestrator import base from qinling.utils import common @@ -47,10 +46,12 @@ class KubernetesManager(base.OrchestratorBase): searchpath=os.path.dirname(TEMPLATES_DIR) ) jinja_env = jinja2.Environment( - loader=template_loader, autoescape=True, trim_blocks=True + loader=template_loader, autoescape=True, trim_blocks=True, + lstrip_blocks=True ) self.deployment_template = jinja_env.get_template('deployment.j2') + self.service_template = jinja_env.get_template('service.j2') def _ensure_namespace(self): ret = self.v1.list_namespace() @@ -80,7 +81,6 @@ class KubernetesManager(base.OrchestratorBase): "name": name, "labels": labels if labels else {}, "replicas": self.conf.kubernetes.replicas, - "volume_name": self.conf.kubernetes.volume_name, "container_name": 'worker', "image": image, } @@ -104,13 +104,6 @@ class KubernetesManager(base.OrchestratorBase): selector = common.convert_dict_to_string(labels) - self.v1.delete_collection_namespaced_pod( - self.conf.kubernetes.namespace, - label_selector=selector - ) - - LOG.info("Pods in deployment %s deleted.", name) - self.v1extention.delete_collection_namespaced_replica_set( self.conf.kubernetes.namespace, label_selector=selector @@ -122,11 +115,10 @@ class KubernetesManager(base.OrchestratorBase): self.conf.kubernetes.namespace, label_selector=selector ) names = [i.metadata.name for i in ret.items] - for name in names: + for svc_name in names: self.v1.delete_namespaced_service( - name, + svc_name, self.conf.kubernetes.namespace, - models.v1_delete_options.V1DeleteOptions() ) LOG.info("Services in deployment %s deleted.", name) @@ -137,4 +129,133 @@ class KubernetesManager(base.OrchestratorBase): field_selector='metadata.name=%s' % name ) + # Should delete pods after deleting deployment to avoid pods are + # recreated by k8s. + self.v1.delete_collection_namespaced_pod( + self.conf.kubernetes.namespace, + label_selector=selector + ) + + LOG.info("Pods in deployment %s deleted.", name) LOG.info("Deployment %s deleted.", name) + + def _choose_available_pod(self, labels): + selector = common.convert_dict_to_string(labels) + + ret = self.v1.list_namespaced_pod( + self.conf.kubernetes.namespace, + label_selector='!function_id,%s' % selector + ) + + if len(ret.items) == 0: + return None + + # Choose the last available one by default. + pod = ret.items[-1] + + return pod + + def _prepare_pod(self, pod, deployment_name, function_id, service_labels): + """Pod preparation. + + 1. Update pod labels. + 2. Expose service and trigger package download. + """ + name = pod.metadata.name + + LOG.info( + 'Prepare pod %s in deployment %s for function %s', + name, deployment_name, function_id + ) + + # Update pod label. + pod_labels = pod.metadata.labels or {} + pod_labels.update({'function_id': function_id}) + body = { + 'metadata': { + 'labels': pod_labels + } + } + self.v1.patch_namespaced_pod( + name, self.conf.kubernetes.namespace, body + ) + + LOG.debug('Labels updated for pod %s', name) + + # Create service for the choosen pod. + service_name = "service-%s" % function_id + service_body = self.service_template.render( + { + "service_name": service_name, + "labels": service_labels, + "selector": pod_labels + } + ) + ret = self.v1.create_namespaced_service( + self.conf.kubernetes.namespace, yaml.safe_load(service_body) + ) + node_port = ret.spec.ports[0].node_port + + LOG.debug( + 'Service created for pod %s, service name: %s, node port: %s', + name, service_name, node_port + ) + + # Get external ip address for an arbitary node. + ret = self.v1.list_node() + addresses = ret.items[0].status.addresses + node_ip = None + for addr in addresses: + if addr.type == 'ExternalIP': + node_ip = addr.address + + # FIXME: test purpose using minikube + if not node_ip: + for addr in addresses: + if addr.type == 'InternalIP': + node_ip = addr.address + + # Download code package into container. + pod_service_url = 'http://%s:%s' % (node_ip, node_port) + request_url = '%s/download' % pod_service_url + download_url = ( + 'http://%s:%s/v1/functions/%s?download=true' % + (self.conf.kubernetes.qinling_service_address, + self.conf.api.port, function_id) + ) + data = {'download_url': download_url, 'function_id': function_id} + + LOG.debug( + 'Send request to pod %s, request_url: %s, data: %s', + name, request_url, data + ) + + # TODO(kong): Here we sleep some time to avoid 'Failed to establish a + # new connection' error for some reason. Needs to find a better + # solution. + time.sleep(1) + r = requests.post(request_url, data=data) + + if r.status_code != requests.codes.ok: + raise exc.OrchestratorException( + 'Failed to download function code package.' + ) + + return pod_service_url + + def prepare_execution(self, function_id, identifier=None, labels=None): + pod = self._choose_available_pod(labels) + + if not pod: + raise exc.OrchestratorException('No pod available.') + + return self._prepare_pod(pod, identifier, function_id, labels) + + def run_execution(self, function_id, input=None, service_url=None): + func_url = '%s/execute' % service_url + + LOG.info('Invoke function %s, url: %s', function_id, func_url) + + r = requests.post(func_url, data=input) + + return {'result': r.json()} diff --git a/qinling/orchestrator/kubernetes/templates/deployment.j2 b/qinling/orchestrator/kubernetes/templates/deployment.j2 index 7c722331..8e509c3c 100644 --- a/qinling/orchestrator/kubernetes/templates/deployment.j2 +++ b/qinling/orchestrator/kubernetes/templates/deployment.j2 @@ -3,39 +3,26 @@ kind: Deployment metadata: name: {{ name }} labels: -{% for key, value in labels.items() %} - {{ key}}: {{ value }} -{% endfor %} + {% for key, value in labels.items() %} + {{ key }}: {{ value }} + {% endfor %} spec: replicas: {{ replicas }} selector: matchLabels: -{% for key, value in labels.items() %} - {{ key}}: {{ value }} -{% endfor %} + {% for key, value in labels.items() %} + {{ key }}: {{ value }} + {% endfor %} template: metadata: labels: -{% for key, value in labels.items() %} - {{ key}}: {{ value }} -{% endfor %} + {% for key, value in labels.items() %} + {{ key }}: {{ value }} + {% endfor %} spec: - volumes: - - name: {{ volume_name }} - emptyDir: {} containers: - name: {{ container_name }} image: {{ image }} imagePullPolicy: IfNotPresent - volumeMounts: - - name: {{ volume_name }} - mountPath: /function - - name: fetcher - image: fission/fetcher - imagePullPolicy: IfNotPresent - volumeMounts: - - name: {{ volume_name }} - mountPath: /function - command: - - /fetcher - - /function \ No newline at end of file + ports: + - containerPort: 9090 diff --git a/qinling/orchestrator/kubernetes/templates/service.j2 b/qinling/orchestrator/kubernetes/templates/service.j2 new file mode 100644 index 00000000..64bdf3ed --- /dev/null +++ b/qinling/orchestrator/kubernetes/templates/service.j2 @@ -0,0 +1,17 @@ +apiVersion: v1 +kind: Service +metadata: + name: {{ service_name }} + labels: + {% for key, value in labels.items() %} + {{ key }}: {{ value }} + {% endfor %} +spec: + type: NodePort + selector: + {% for key, value in selector.items() %} + {{ key}}: "{{ value }}" + {% endfor %} + ports: + - protocol: TCP + port: 9090 diff --git a/qinling/orchestrator/swarm/manager.py b/qinling/orchestrator/swarm/manager.py deleted file mode 100644 index e69de29b..00000000 diff --git a/qinling/rpc.py b/qinling/rpc.py index e9c877f8..36b16d88 100644 --- a/qinling/rpc.py +++ b/qinling/rpc.py @@ -144,3 +144,15 @@ class EngineClient(object): 'delete_runtime', runtime_id=id ) + + @wrap_messaging_exception + def create_execution(self, execution_id, function_id, runtime_id, + input=None): + return self._client.prepare(topic=self.topic, server=None).call( + ctx.get_ctx(), + 'create_execution', + execution_id=execution_id, + function_id=function_id, + runtime_id=runtime_id, + input=input + ) diff --git a/qinling/storage/base.py b/qinling/storage/base.py index 629d9671..3ebfc378 100644 --- a/qinling/storage/base.py +++ b/qinling/storage/base.py @@ -15,8 +15,11 @@ import abc import six +from stevedore import driver -STORAGE_PROVIDER_MAPPING = {} +from qinling import exceptions as exc + +STORAGE_PROVIDER = None @six.add_metaclass(abc.ABCMeta) @@ -31,8 +34,28 @@ class PackageStorage(object): def retrieve(self, project_id, function): raise NotImplementedError + @abc.abstractmethod + def delete(self, project_id, function): + raise NotImplementedError -def load_storage_providers(conf): - global STORAGE_PROVIDER_MAPPING - return STORAGE_PROVIDER_MAPPING +def load_storage_provider(conf): + global STORAGE_PROVIDER + + if not STORAGE_PROVIDER: + try: + mgr = driver.DriverManager( + 'qinling.storage.provider', + conf.storage.provider, + invoke_on_load=True, + invoke_args=[conf] + ) + + STORAGE_PROVIDER = mgr.driver + except Exception as e: + raise exc.StorageProviderException( + 'Failed to load storage provider: %s. Error: %s' % + (conf.storage.provider, str(e)) + ) + + return STORAGE_PROVIDER diff --git a/qinling/storage/file_system.py b/qinling/storage/file_system.py index c5b9998e..f962754c 100644 --- a/qinling/storage/file_system.py +++ b/qinling/storage/file_system.py @@ -12,10 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. +import os + from oslo_config import cfg from oslo_log import log as logging from oslo_utils import fileutils +from qinling import exceptions as exc from qinling.storage import base LOG = logging.getLogger(__name__) @@ -33,7 +36,43 @@ class FileSystemStorage(base.PackageStorage): 'Store package, function: %s, project: %s', function, project_id ) + project_path = os.path.join(CONF.storage.file_system_dir, project_id) + fileutils.ensure_tree(project_path) + + func_zip = os.path.join(project_path, '%s.zip' % function) + with open(func_zip, 'wb') as fd: + fd.write(data) + def retrieve(self, project_id, function): LOG.info( 'Get package data, function: %s, project: %s', function, project_id ) + + func_zip = os.path.join( + CONF.storage.file_system_dir, + '%s/%s.zip' % (project_id, function) + ) + + if not os.path.exists(func_zip): + raise exc.StorageNotFoundException( + 'Package of function %s for project %s not found.' % + (function, project_id) + ) + + f = open(func_zip, 'rb') + + return f + + def delete(self, project_id, function): + LOG.info( + 'Delete package data, function: %s, project: %s', function, + project_id + ) + + func_zip = os.path.join( + CONF.storage.file_system_dir, + '%s/%s.zip' % (project_id, function) + ) + + if os.path.exists(func_zip): + os.remove(func_zip) diff --git a/qinling/utils/common.py b/qinling/utils/common.py index 44874816..43f9835e 100644 --- a/qinling/utils/common.py +++ b/qinling/utils/common.py @@ -12,7 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. +from oslo_utils import uuidutils + + def convert_dict_to_string(d): temp_list = ['%s=%s' % (k, v) for k, v in d.items()] return ','.join(temp_list) + + +def generate_unicode_uuid(): + return uuidutils.generate_uuid() diff --git a/runtimes/python2.7/Dockerfile b/runtimes/python2.7/Dockerfile new file mode 100644 index 00000000..b33653be --- /dev/null +++ b/runtimes/python2.7/Dockerfile @@ -0,0 +1,13 @@ +FROM alpine:3.5 + +RUN apk update +RUN apk add --no-cache python2 python2-dev build-base py2-pip +RUN pip install --upgrade pip +RUN rm -r /root/.cache + +COPY . /app +WORKDIR /app +RUN pip install -r requirements.txt + +ENTRYPOINT ["python"] +CMD ["server.py"] diff --git a/runtimes/python2.7/README.md b/runtimes/python2.7/README.md new file mode 100644 index 00000000..3529849e --- /dev/null +++ b/runtimes/python2.7/README.md @@ -0,0 +1,31 @@ +# Qinling: Python Environment + +This is the Python environment for Qinling. + +It's a Docker image containing a Python 2.7 runtime, along with a +dynamic loader. A few common dependencies are included in the +requirements.txt file. + +## Customizing this image + +To add package dependencies, edit requirements.txt to add what you +need, and rebuild this image (instructions below). + +You also may want to customize what's available to the function in its +request context. You can do this by editing server.py (see the +comment in that file about customizing request context). + +## Rebuilding and pushing the image + +You'll need access to a Docker registry to push the image: you can +sign up for Docker hub at hub.docker.com, or use registries from +gcr.io, quay.io, etc. Let's assume you're using a docker hub account +called USER. Build and push the image to the the registry: + +``` + docker build -t USER/python-env . && docker push USER/python-env +``` + +## Using the image in Qinling + +TBD diff --git a/runtimes/python2.7/requirements.txt b/runtimes/python2.7/requirements.txt new file mode 100644 index 00000000..5ff6fc84 --- /dev/null +++ b/runtimes/python2.7/requirements.txt @@ -0,0 +1,3 @@ +Flask>=0.11.1 +httplib2 +requests>=2.7.0 diff --git a/runtimes/python2.7/server.py b/runtimes/python2.7/server.py new file mode 100644 index 00000000..6b43e309 --- /dev/null +++ b/runtimes/python2.7/server.py @@ -0,0 +1,91 @@ +# Copyright 2017 Catalyst IT Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import logging +import sys +import time +import zipfile +import zipimport + +from flask import abort +from flask import Flask +from flask import request +from flask import Response +import requests + +app = Flask(__name__) +file_name = '' + + +@app.route('/download', methods=['POST']) +def download(): + service_url = request.form['download_url'] + function_id = request.form['function_id'] + + global file_name + file_name = '%s.zip' % function_id + + app.logger.info('Request received, service_url:%s' % service_url) + + r = requests.get(service_url, stream=True) + + with open(file_name, 'wb') as fd: + for chunk in r.iter_content(chunk_size=128): + fd.write(chunk) + + if not zipfile.is_zipfile(file_name): + abort(500) + + app.logger.info('Code package downloaded to %s' % file_name) + + return 'success' + + +@app.route('/execute', methods=['POST']) +def execute(): + global file_name + importer = zipimport.zipimporter(file_name) + module = importer.load_module('main') + + start = time.time() + try: + result = module.main() + except Exception as e: + result = str(e) + + duration = time.time() - start + + return Response( + response=json.dumps({'output': result, 'duration': duration}), + status=200, + mimetype='application/json' + ) + + +def setup_logger(loglevel): + global app + root = logging.getLogger() + root.setLevel(loglevel) + ch = logging.StreamHandler(sys.stdout) + ch.setLevel(loglevel) + ch.setFormatter( + logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') + ) + app.logger.addHandler(ch) + + +setup_logger(logging.DEBUG) +app.logger.info("Starting server") +app.run(host='0.0.0.0', port='9090') diff --git a/setup.cfg b/setup.cfg index 3adc8729..6cb1d32b 100644 --- a/setup.cfg +++ b/setup.cfg @@ -27,6 +27,9 @@ console_scripts = qinling-server = qinling.cmd.launch:main qinling-db-manage = qinling.db.sqlalchemy.migration.cli:main +qinling.storage.provider: + local = qinling.storage.file_system:FileSystemStorage + qinling.orchestrator = kubernetes = qinling.orchestrator.kubernetes.manager:KubernetesManager diff --git a/test-requirements.txt b/test-requirements.txt index 8e86954d..78affe49 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -3,7 +3,6 @@ # process, which may cause wedges in the gate later. hacking>=0.12.0,<0.13 # Apache-2.0 - coverage>=4.0 # Apache-2.0 python-subunit>=0.0.18 # Apache-2.0/BSD sphinx>=1.5.1 # BSD diff --git a/tools/config-generator.qinling.conf b/tools/config/config-generator.qinling.conf similarity index 88% rename from tools/config-generator.qinling.conf rename to tools/config/config-generator.qinling.conf index 4d7bbe30..3d57c9a4 100644 --- a/tools/config-generator.qinling.conf +++ b/tools/config/config-generator.qinling.conf @@ -1,6 +1,7 @@ [DEFAULT] namespace = qinling.config -namespace = oslo.messaging namespace = keystonemiddleware.auth_token +namespace = oslo.messaging namespace = oslo.log namespace = oslo.policy +namespace = oslo.db