diff --git a/qinling/api/controllers/v1/environment.py b/qinling/api/controllers/v1/environment.py deleted file mode 100644 index 34b5e8b3..00000000 --- a/qinling/api/controllers/v1/environment.py +++ /dev/null @@ -1,52 +0,0 @@ -# Copyright 2017 Catalyst IT Limited -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from oslo_config import cfg -from oslo_log import log as logging -from pecan import rest -import wsmeext.pecan as wsme_pecan - -from qinling.api.controllers.v1 import resources -from qinling.api.controllers.v1 import types -from qinling.engine import rpc -from qinling.utils import rest_utils - -LOG = logging.getLogger(__name__) - - -class EnvironmentsController(rest.RestController): - def __init__(self, *args, **kwargs): - self.engine_client = rpc.get_engine_client() - - super(EnvironmentsController, self).__init__(*args, **kwargs) - - @rest_utils.wrap_wsme_controller_exception - @wsme_pecan.wsexpose(resources.Environment, types.uuid) - def get(self, id): - LOG.info("Fetch environment [id=%s]", id) - - @rest_utils.wrap_wsme_controller_exception - @wsme_pecan.wsexpose( - resources.Environment, - body=resources.Environment, - status_code=201 - ) - def post(self, env): - LOG.info("Create environment. [environment=%s]", env) - - self.engine_client.create_environment() - - return resources.Environment.from_dict( - {'id': '123', 'name': 'python2.7'} - ) diff --git a/qinling/api/controllers/v1/execution.py b/qinling/api/controllers/v1/execution.py new file mode 100644 index 00000000..e69de29b diff --git a/qinling/api/controllers/v1/resources.py b/qinling/api/controllers/v1/resources.py index b6c55a21..9dea5e63 100644 --- a/qinling/api/controllers/v1/resources.py +++ b/qinling/api/controllers/v1/resources.py @@ -13,6 +13,8 @@ # limitations under the License. import json + +import wsme from wsme import types as wtypes from qinling.api.controllers.v1 import types @@ -209,36 +211,42 @@ class Functions(ResourceList): return sample -class Environment(Resource): +class Runtime(Resource): id = wtypes.text name = wtypes.text + image = wsme.wsattr(wtypes.text, mandatory=True) description = wtypes.text - created_at = wtypes.text - updated_at = wtypes.text + status = wsme.wsattr(wtypes.text, readonly=True) + project_id = wsme.wsattr(wtypes.text, readonly=True) + created_at = wsme.wsattr(wtypes.text, readonly=True) + updated_at = wsme.wsattr(wtypes.text, readonly=True) @classmethod def sample(cls): return cls( id='123e4567-e89b-12d3-a456-426655440000', name='python2.7', + image='lingxiankong/python', + status='available', + project_id='', description='Python 2.7 environment.', created_at='1970-01-01T00:00:00.000000', updated_at='1970-01-01T00:00:00.000000' ) -class Environments(ResourceList): - environments = [Environment] +class Runtimes(ResourceList): + runtimes = [Runtime] def __init__(self, **kwargs): self._type = 'environments' - super(Environments, self).__init__(**kwargs) + super(Runtimes, self).__init__(**kwargs) @classmethod def sample(cls): sample = cls() - sample.environments = [Environment.sample()] + sample.runtimes = [Runtime.sample()] sample.next = ( "http://localhost:7070/v1/environments?" "sort_keys=id,name&sort_dirs=asc,desc&limit=10&" diff --git a/qinling/api/controllers/v1/root.py b/qinling/api/controllers/v1/root.py index 89f906ab..43d12fc9 100644 --- a/qinling/api/controllers/v1/root.py +++ b/qinling/api/controllers/v1/root.py @@ -16,9 +16,9 @@ import pecan from wsme import types as wtypes import wsmeext.pecan as wsme_pecan -from qinling.api.controllers.v1 import environment from qinling.api.controllers.v1 import function from qinling.api.controllers.v1 import resources +from qinling.api.controllers.v1 import runtime class RootResource(resources.Resource): @@ -34,7 +34,7 @@ class Controller(object): """API root controller for version 1.""" functions = function.FunctionsController() - environments = environment.EnvironmentsController() + runtimes = runtime.RuntimesController() @wsme_pecan.wsexpose(RootResource) def index(self): diff --git a/qinling/api/controllers/v1/route.py b/qinling/api/controllers/v1/route.py new file mode 100644 index 00000000..e69de29b diff --git a/qinling/api/controllers/v1/runtime.py b/qinling/api/controllers/v1/runtime.py new file mode 100644 index 00000000..ce5196ee --- /dev/null +++ b/qinling/api/controllers/v1/runtime.py @@ -0,0 +1,84 @@ +# Copyright 2017 Catalyst IT Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_config import cfg +from oslo_log import log as logging +from pecan import rest +import wsmeext.pecan as wsme_pecan + +from qinling.api.controllers.v1 import resources +from qinling.api.controllers.v1 import types +from qinling.db import api as db_api +from qinling import rpc +from qinling.utils import rest_utils + +LOG = logging.getLogger(__name__) + + +class RuntimesController(rest.RestController): + def __init__(self, *args, **kwargs): + self.engine_client = rpc.get_engine_client() + + super(RuntimesController, self).__init__(*args, **kwargs) + + @rest_utils.wrap_wsme_controller_exception + @wsme_pecan.wsexpose(resources.Runtime, types.uuid) + def get(self, id): + LOG.info("Fetch runtime [id=%s]", id) + + runtime_db = db_api.get_runtime(id) + + return resources.Runtime.from_dict(runtime_db.to_dict()) + + @rest_utils.wrap_wsme_controller_exception + @wsme_pecan.wsexpose(resources.Runtimes) + def get_all(self): + LOG.info("Get all runtimes.") + + runtimes = [resources.Runtime.from_dict(db_model.to_dict()) + for db_model in db_api.get_runtimes()] + + return resources.Runtimes(runtimes=runtimes) + + + @rest_utils.wrap_wsme_controller_exception + @wsme_pecan.wsexpose( + resources.Runtime, + body=resources.Runtime, + status_code=201 + ) + def post(self, runtime): + params = runtime.to_dict() + + LOG.info("Creating runtime. [runtime=%s]", params) + + params.update({'status': 'creating'}) + + db_model = db_api.create_runtime(params) + self.engine_client.create_runtime(db_model.id) + + return resources.Runtime.from_dict(db_model.to_dict()) + + @rest_utils.wrap_wsme_controller_exception + @wsme_pecan.wsexpose(None, types.uuid, status_code=204) + def delete(self, id): + """Delete runtime.""" + + LOG.info("Delete runtime [id=%s]", id) + + with db_api.transaction(): + runtime_db = db_api.get_runtime(id) + runtime_db.status = 'deleting' + + self.engine_client.delete_runtime(id) diff --git a/qinling/cmd/launch.py b/qinling/cmd/launch.py index e60bdb19..61943e18 100644 --- a/qinling/cmd/launch.py +++ b/qinling/cmd/launch.py @@ -38,9 +38,9 @@ from oslo_log import log as logging # noqa from oslo_service import service # noqa from qinling.api import service as api_service # noqa -from qinling.engine import rpc # noqa -from qinling.engine import service as eng_service # noqa from qinling import config # noqa +from qinling.engine import service as eng_service # noqa +from qinling import rpc # noqa from qinling import version # noqa CONF = cfg.CONF diff --git a/qinling/config.py b/qinling/config.py index 2fedd26b..28630d90 100644 --- a/qinling/config.py +++ b/qinling/config.py @@ -28,6 +28,7 @@ launch_opt = cfg.ListOpt( help='Specifies which qinling server to start by the launch script.' ) +API_GROUP = 'api' api_opts = [ cfg.StrOpt('host', default='0.0.0.0', help='Qinling API server host.'), cfg.PortOpt('port', default=7070, help='Qinling API server port.'), @@ -45,6 +46,7 @@ api_opts = [ ) ] +PECAN_GROUP = 'pecan' pecan_opts = [ cfg.StrOpt( 'root', @@ -69,6 +71,7 @@ pecan_opts = [ ) ] +ENGINE_GROUP = 'engine' engine_opts = [ cfg.StrOpt( 'host', @@ -82,8 +85,15 @@ engine_opts = [ default='qinling_engine', help='The message topic that the engine listens on.' ), + cfg.StrOpt( + 'orchestrator', + default='kubernetes', + choices=['kubernetes', 'swarm'], + help='The container orchestrator.' + ), ] +STORAGE_GROUP = 'storage' storage_opts = [ cfg.StrOpt( 'file_system_dir', @@ -92,17 +102,39 @@ storage_opts = [ ) ] +KUBERNETES_GROUP = 'kubernetes' +kubernetes_opts = [ + cfg.StrOpt( + 'namespace', + default='qinling', + help='Resources scope created by Qinling.' + ), + cfg.IntOpt( + 'replicas', + default=3, + help='Number of desired replicas in deployment.' + ), + cfg.StrOpt( + 'kube_host', + help='Kubernetes server address.' + ), + cfg.StrOpt( + 'volume_name', + default='functiondir', + help='Name of the volume shared between worker container and utility ' + 'container.' + ), +] + CONF = cfg.CONF -API_GROUP = 'api' -PECAN_GROUP = 'pecan' -ENGINE_GROUP = 'engine' -STORAGE_GROUP = 'storage' + CLI_OPTS = [launch_opt] CONF.register_opts(api_opts, group=API_GROUP) CONF.register_opts(pecan_opts, group=PECAN_GROUP) CONF.register_opts(engine_opts, group=ENGINE_GROUP) CONF.register_opts(storage_opts, group=STORAGE_GROUP) +CONF.register_opts(kubernetes_opts, group=KUBERNETES_GROUP) CONF.register_cli_opts(CLI_OPTS) default_group_opts = itertools.chain( @@ -117,6 +149,7 @@ def list_opts(): (PECAN_GROUP, pecan_opts), (ENGINE_GROUP, engine_opts), (STORAGE_GROUP, storage_opts), + (KUBERNETES_GROUP, kubernetes_opts), (None, default_group_opts) ] diff --git a/qinling/db/api.py b/qinling/db/api.py index 8b61de46..cdcfb9ba 100644 --- a/qinling/db/api.py +++ b/qinling/db/api.py @@ -83,3 +83,26 @@ def update_function(id, values): def delete_function(id): IMPL.delete_function(id) + + +# Function + + +def create_runtime(values): + return IMPL.create_runtime(values) + + +def get_runtime(id): + return IMPL.get_runtime(id) + + +def get_runtimes(): + return IMPL.get_runtimes() + + +def delete_runtime(id): + return IMPL.delete_runtime(id) + + +def update_runtime(id, values): + return IMPL.update_runtime(id, values) diff --git a/qinling/db/base.py b/qinling/db/base.py index 9038f763..ff09cea3 100644 --- a/qinling/db/base.py +++ b/qinling/db/base.py @@ -37,7 +37,7 @@ def _get_facade(): return _FACADE -def get_session(expire_on_commit=True, autocommit=False): +def get_session(expire_on_commit=False, autocommit=False): """Helper method to grab session.""" facade = _get_facade() return facade.get_session(expire_on_commit=expire_on_commit, diff --git a/qinling/db/sqlalchemy/api.py b/qinling/db/sqlalchemy/api.py index 9050615f..5171c8a6 100644 --- a/qinling/db/sqlalchemy/api.py +++ b/qinling/db/sqlalchemy/api.py @@ -18,10 +18,14 @@ import threading from oslo_config import cfg from oslo_db import exception as oslo_db_exc +from oslo_db.sqlalchemy import utils as db_utils from oslo_log import log as logging import sqlalchemy as sa +from qinling import context from qinling.db import base as db_base +from qinling.db.sqlalchemy import filters as db_filters +from qinling.db.sqlalchemy import model_base from qinling.db.sqlalchemy import models from qinling import exceptions as exc @@ -98,6 +102,86 @@ def transaction(): end_tx() +def _secure_query(model, *columns): + query = db_base.model_query(model, columns) + + if not issubclass(model, model_base.QinlingSecureModelBase): + return query + + query = query.filter(model.project_id == context.get_ctx().projectid) + + return query + + +def _paginate_query(model, limit=None, marker=None, sort_keys=None, + sort_dirs=None, query=None): + if not query: + query = _secure_query(model) + + sort_keys = sort_keys if sort_keys else [] + + if 'id' not in sort_keys: + sort_keys.append('id') + sort_dirs.append('asc') if sort_dirs else None + + query = db_utils.paginate_query( + query, + model, + limit, + sort_keys, + marker=marker, + sort_dirs=sort_dirs + ) + + return query + + +def _get_collection(model, insecure=False, limit=None, marker=None, + sort_keys=None, sort_dirs=None, fields=None, **filters): + columns = ( + tuple([getattr(model, f) for f in fields if hasattr(model, f)]) + if fields else () + ) + + query = (db_base.model_query(model, *columns) if insecure + else _secure_query(model, *columns)) + query = db_filters.apply_filters(query, model, **filters) + + query = _paginate_query( + model, + limit, + marker, + sort_keys, + sort_dirs, + query + ) + + try: + return query.all() + except Exception as e: + raise exc.DBError( + "Failed when querying database, error type: %s, " + "error message: %s" % (e.__class__.__name__, str(e)) + ) + + +def _get_collection_sorted_by_time(model, insecure=False, fields=None, + sort_keys=['created_at'], **kwargs): + return _get_collection( + model=model, + insecure=insecure, + sort_keys=sort_keys, + fields=fields, + **kwargs + ) + + +def _get_db_object_by_id(model, id, insecure=False): + query = db_base.model_query(model) if insecure else _secure_query(model) + + return query.filter_by(id=id).first() + + @db_base.session_aware() def get_function(id): pass @@ -132,3 +216,40 @@ def update_function(id, values): @db_base.session_aware() def delete_function(id): pass + + +@db_base.session_aware() +def create_runtime(values, session=None): + runtime = models.Runtime() + runtime.update(values.copy()) + + try: + runtime.save(session=session) + except oslo_db_exc.DBDuplicateEntry as e: + raise exc.DBError( + "Duplicate entry for Runtime: %s" % e.columns + ) + + return runtime + + +@db_base.session_aware() +def get_runtime(id, session=None): + runtime = _get_db_object_by_id(models.Runtime, id) + + if not runtime: + raise exc.DBEntityNotFoundError("Runtime not found [id=%s]" % id) + + return runtime + + +@db_base.session_aware() +def get_runtimes(session=None, **kwargs): + return _get_collection_sorted_by_time(models.Runtime, **kwargs) + + +@db_base.session_aware() +def delete_runtime(id, session=None): + runtime = get_runtime(id) + + session.delete(runtime) diff --git a/qinling/db/sqlalchemy/filters.py b/qinling/db/sqlalchemy/filters.py new file mode 100644 index 00000000..84faa920 --- /dev/null +++ b/qinling/db/sqlalchemy/filters.py @@ -0,0 +1,67 @@ +# Copyright 2016 NEC Corporation. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import sqlalchemy as sa + + +def apply_filters(query, model, **filters): + filter_dict = {} + + for key, value in filters.items(): + column_attr = getattr(model, key) + + if isinstance(value, dict): + if 'in' in value: + query = query.filter(column_attr.in_(value['in'])) + elif 'nin' in value: + query = query.filter(~column_attr.in_(value['nin'])) + elif 'neq' in value: + query = query.filter(column_attr != value['neq']) + elif 'gt' in value: + query = query.filter(column_attr > value['gt']) + elif 'gte' in value: + query = query.filter(column_attr >= value['gte']) + elif 'lt' in value: + query = query.filter(column_attr < value['lt']) + elif 'lte' in value: + query = query.filter(column_attr <= value['lte']) + elif 'eq' in value: + query = query.filter(column_attr == value['eq']) + elif 'has' in value: + like_pattern = '%{0}%'.format(value['has']) + + query = query.filter(column_attr.like(like_pattern)) + else: + filter_dict[key] = value + + # We need to handle tag case seprately. As tag datatype is MutableList. + # TODO(hparekh): Need to think how can we get rid of this. + tags = filters.pop('tags', None) + + # To match the tag list, a resource must contain at least all of the + # tags present in the filter parameter. + if tags: + tag_attr = getattr(model, 'tags') + + if not isinstance(tags, list): + expr = tag_attr.contains(tags) + else: + expr = sa.and_(*[tag_attr.contains(tag) for tag in tags]) + + query = query.filter(expr) + + if filter_dict: + query = query.filter_by(**filter_dict) + + return query diff --git a/qinling/db/sqlalchemy/migration/__init__.py b/qinling/db/sqlalchemy/migration/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/qinling/db/sqlalchemy/migration/alembic.ini b/qinling/db/sqlalchemy/migration/alembic.ini new file mode 100644 index 00000000..7a00cabc --- /dev/null +++ b/qinling/db/sqlalchemy/migration/alembic.ini @@ -0,0 +1,58 @@ +# A generic, single database configuration. + +[alembic] +# path to migration scripts +script_location = qinling/db/sqlalchemy/migration/alembic_migrations + +# template used to generate migration files +# file_template = %%(rev)s_%%(slug)s + +# max length of characters to apply to the +# "slug" field +#truncate_slug_length = 40 + +# set to 'true' to run the environment during +# the 'revision' command, regardless of autogenerate +# revision_environment = false + +# set to 'true' to allow .pyc and .pyo files without +# a source .py file to be detected as revisions in the +# versions/ directory +# sourceless = false + +sqlalchemy.url = + +# Logging configuration +[loggers] +keys = root,sqlalchemy,alembic + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARN +handlers = console +qualname = + +[logger_sqlalchemy] +level = WARN +handlers = +qualname = sqlalchemy.engine + +[logger_alembic] +level = INFO +handlers = +qualname = alembic + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(levelname)-5.5s [%(name)s] %(message)s +datefmt = %H:%M:%S \ No newline at end of file diff --git a/qinling/db/sqlalchemy/migration/alembic_migrations/README.md b/qinling/db/sqlalchemy/migration/alembic_migrations/README.md new file mode 100644 index 00000000..09e9e442 --- /dev/null +++ b/qinling/db/sqlalchemy/migration/alembic_migrations/README.md @@ -0,0 +1,61 @@ +The migrations in `alembic_migrations/versions` contain the changes needed to migrate +between Qinling database revisions. A migration occurs by executing a script that +details the changes needed to upgrade the database. The migration scripts +are ordered so that multiple scripts can run sequentially. The scripts are executed by +Qinling's migration wrapper which uses the Alembic library to manage the migration. Qinling +supports migration from Pike or later. + +You can upgrade to the latest database version via: +``` +qinling-db-manage --config-file /path/to/qinling.conf upgrade head +``` + +To check the current database version: +``` +qinling-db-manage --config-file /path/to/qinling.conf current +``` + +To create a script to run the migration offline: +``` +qinling-db-manage --config-file /path/to/qinling.conf upgrade head --sql +``` + +To run the offline migration between specific migration versions: +``` +qinling-db-manage --config-file /path/to/qinling.conf upgrade : --sql +``` + +Upgrade the database incrementally: +``` +qinling-db-manage --config-file /path/to/qinling.conf upgrade --delta <# of revs> +``` + +Or, upgrade the database to one newer revision: +``` +qinling-db-manage --config-file /path/to/qinling.conf upgrade +1 +``` + +Create new revision: +``` +qinling-db-manage --config-file /path/to/qinling.conf revision -m "description of revision" --autogenerate +``` + +Create a blank file: +``` +qinling-db-manage --config-file /path/to/qinling.conf revision -m "description of revision" +``` + +This command does not perform any migrations, it only sets the revision. +Revision may be any existing revision. Use this command carefully. +``` +qinling-db-manage --config-file /path/to/qinling.conf stamp +``` + +To verify that the timeline does branch, you can run this command: +``` +qinling-db-manage --config-file /path/to/qinling.conf check_migration +``` + +If the migration path has branch, you can find the branch point via: +``` +qinling-db-manage --config-file /path/to/qinling.conf history \ No newline at end of file diff --git a/qinling/db/sqlalchemy/migration/alembic_migrations/__init__.py b/qinling/db/sqlalchemy/migration/alembic_migrations/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/qinling/db/sqlalchemy/migration/alembic_migrations/env.py b/qinling/db/sqlalchemy/migration/alembic_migrations/env.py new file mode 100644 index 00000000..c1eee756 --- /dev/null +++ b/qinling/db/sqlalchemy/migration/alembic_migrations/env.py @@ -0,0 +1,84 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import with_statement +from alembic import context +from logging import config as c +from oslo_utils import importutils +from sqlalchemy import create_engine +from sqlalchemy import pool + +from qinling.db.sqlalchemy import model_base + + +importutils.try_import('qinling.db.sqlalchemy.models') + +# This is the Alembic Config object, which provides +# access to the values within the .ini file in use. +config = context.config +qinling_config = config.qinling_config + +# Interpret the config file for Python logging. +# This line sets up loggers basically. +c.fileConfig(config.config_file_name) + +# Add your model's MetaData object here for 'autogenerate' support. +target_metadata = model_base.QinlingSecureModelBase.metadata + + +def run_migrations_offline(): + """Run migrations in 'offline' mode. + + This configures the context with just a URL + and not an Engine, though an Engine is acceptable + here as well. By skipping the Engine creation + we don't even need a DBAPI to be available. + + Calls to context.execute() here emit the given string to the + script output. + + """ + context.configure(url=qinling_config.database.connection) + + with context.begin_transaction(): + context.run_migrations() + + +def run_migrations_online(): + """Run migrations in 'online' mode. + + In this scenario we need to create an Engine + and associate a connection with the context. + + """ + engine = create_engine( + qinling_config.database.connection, + poolclass=pool.NullPool + ) + + connection = engine.connect() + context.configure( + connection=connection, + target_metadata=target_metadata + ) + + try: + with context.begin_transaction(): + context.run_migrations() + finally: + connection.close() + + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() diff --git a/qinling/db/sqlalchemy/migration/alembic_migrations/script.py.mako b/qinling/db/sqlalchemy/migration/alembic_migrations/script.py.mako new file mode 100644 index 00000000..efbb28a3 --- /dev/null +++ b/qinling/db/sqlalchemy/migration/alembic_migrations/script.py.mako @@ -0,0 +1,33 @@ +# Copyright ${create_date.year} OpenStack Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""${message} + +Revision ID: ${up_revision} +Revises: ${down_revision} +Create Date: ${create_date} + +""" + +# revision identifiers, used by Alembic. +revision = ${repr(up_revision)} +down_revision = ${repr(down_revision)} + +from alembic import op +import sqlalchemy as sa +${imports if imports else ""} + + +def upgrade(): + ${upgrades if upgrades else "pass"} \ No newline at end of file diff --git a/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py b/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py new file mode 100644 index 00000000..5227409a --- /dev/null +++ b/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py @@ -0,0 +1,45 @@ +# Copyright 2017 OpenStack Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Pike release + +Revision ID: 001 +Revises: None +Create Date: 2017-05-03 12:02:51.935368 + +""" + +# revision identifiers, used by Alembic. +revision = '001' +down_revision = None + +from alembic import op +import sqlalchemy as sa + +from qinling.db.sqlalchemy import types as st + + +def upgrade(): + op.create_table( + 'runtime', + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.Column('project_id', sa.String(length=80), nullable=False), + sa.Column('id', sa.String(length=36), nullable=False), + sa.Column('name', sa.String(length=255), nullable=True), + sa.Column('description', sa.String(length=255), nullable=True), + sa.Column('image', sa.String(length=255), nullable=False), + sa.Column('status', sa.String(length=32), nullable=False), + sa.PrimaryKeyConstraint('id'), + ) diff --git a/qinling/db/sqlalchemy/migration/alembic_migrations/versions/__init__.py b/qinling/db/sqlalchemy/migration/alembic_migrations/versions/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/qinling/db/sqlalchemy/migration/cli.py b/qinling/db/sqlalchemy/migration/cli.py new file mode 100644 index 00000000..b660aecb --- /dev/null +++ b/qinling/db/sqlalchemy/migration/cli.py @@ -0,0 +1,123 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Starter script for qinling-db-manage.""" + +import os +import sys + +from alembic import command as alembic_cmd +from alembic import config as alembic_cfg +from alembic import util as alembic_u +from oslo_config import cfg +from oslo_utils import importutils +import six + +from qinling import config + +# We need to import mistral.api.app to +# make sure we register all needed options. +importutils.try_import('qinling.api.app') + +CONF = cfg.CONF + + +def do_alembic_command(config, cmd, *args, **kwargs): + try: + getattr(alembic_cmd, cmd)(config, *args, **kwargs) + except alembic_u.CommandError as e: + alembic_u.err(six.text_type(e)) + + +def do_check_migration(config, _cmd): + do_alembic_command(config, 'branches') + + +def do_upgrade(config, cmd): + if not CONF.command.revision and not CONF.command.delta: + raise SystemExit('You must provide a revision or relative delta') + + revision = CONF.command.revision + + if CONF.command.delta: + sign = '+' if CONF.command.name == 'upgrade' else '-' + revision = sign + str(CONF.command.delta) + + do_alembic_command(config, cmd, revision, sql=CONF.command.sql) + + +def do_stamp(config, cmd): + do_alembic_command( + config, cmd, + CONF.command.revision, + sql=CONF.command.sql + ) + + +def do_revision(config, cmd): + do_alembic_command( + config, cmd, + message=CONF.command.message, + autogenerate=CONF.command.autogenerate, + sql=CONF.command.sql + ) + + +def add_command_parsers(subparsers): + for name in ['current', 'history', 'branches']: + parser = subparsers.add_parser(name) + parser.set_defaults(func=do_alembic_command) + + parser = subparsers.add_parser('upgrade') + parser.add_argument('--delta', type=int) + parser.add_argument('--sql', action='store_true') + parser.add_argument('revision', nargs='?') + parser.set_defaults(func=do_upgrade) + + parser = subparsers.add_parser('stamp') + parser.add_argument('--sql', action='store_true') + parser.add_argument('revision', nargs='?') + parser.set_defaults(func=do_stamp) + + parser = subparsers.add_parser('revision') + parser.add_argument('-m', '--message') + parser.add_argument('--autogenerate', action='store_true') + parser.add_argument('--sql', action='store_true') + parser.set_defaults(func=do_revision) + + +command_opt = cfg.SubCommandOpt('command', + title='Command', + help='Available commands', + handler=add_command_parsers) + +CONF.register_cli_opt(command_opt) + + +def main(): + config = alembic_cfg.Config( + os.path.join(os.path.dirname(__file__), 'alembic.ini') + ) + config.set_main_option( + 'script_location', + 'qinling.db.sqlalchemy.migration:alembic_migrations' + ) + # attach the Qinling conf to the Alembic conf + config.qinling_config = CONF + + CONF(project='qinling') + CONF.command.func(config, CONF.command.name) + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/qinling/db/sqlalchemy/model_base.py b/qinling/db/sqlalchemy/model_base.py index 313565ff..0e328683 100644 --- a/qinling/db/sqlalchemy/model_base.py +++ b/qinling/db/sqlalchemy/model_base.py @@ -20,6 +20,8 @@ import sqlalchemy as sa from sqlalchemy.ext import declarative from sqlalchemy.orm import attributes +from qinling import context + def id_column(): return sa.Column( @@ -29,6 +31,10 @@ def id_column(): ) +def get_project_id(): + return context.get_ctx().projectid + + class _QinlingModelBase(oslo_models.ModelBase, oslo_models.TimestampMixin): """Base class for all Qinling SQLAlchemy DB Models.""" @@ -109,4 +115,8 @@ class QinlingSecureModelBase(QinlingModelBase): __abstract__ = True id = id_column() - project_id = sa.Column(sa.String(80), nullable=False) + project_id = sa.Column( + sa.String(80), + nullable=False, + default=get_project_id + ) diff --git a/qinling/db/sqlalchemy/models.py b/qinling/db/sqlalchemy/models.py index 1753fa42..a4986037 100644 --- a/qinling/db/sqlalchemy/models.py +++ b/qinling/db/sqlalchemy/models.py @@ -33,3 +33,12 @@ class Function(model_base.QinlingSecureModelBase): provider = sa.Column(sa.String(32), nullable=False) package = sa.Column(sa.Boolean, nullable=False) code = sa.Column(st.JsonLongDictType(), nullable=False) + + +class Runtime(model_base.QinlingSecureModelBase): + __tablename__ = 'runtime' + + name = sa.Column(sa.String(255)) + description = sa.Column(sa.String(255)) + image = sa.Column(sa.String(255), nullable=False) + status = sa.Column(sa.String(32), nullable=False) diff --git a/qinling/engine/base.py b/qinling/engine/base.py deleted file mode 100644 index 7ba3d811..00000000 --- a/qinling/engine/base.py +++ /dev/null @@ -1,26 +0,0 @@ -# Copyright 2017 Catalyst IT Limited -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import abc - -import six - - -@six.add_metaclass(abc.ABCMeta) -class Engine(object): - """Engine interface.""" - - @abc.abstractmethod - def create_environment(self): - raise NotImplementedError diff --git a/qinling/engine/default_engine.py b/qinling/engine/default_engine.py index 269d5c53..c941e510 100644 --- a/qinling/engine/default_engine.py +++ b/qinling/engine/default_engine.py @@ -15,11 +15,53 @@ from oslo_config import cfg from oslo_log import log as logging -from qinling.engine import base +from qinling.db import api as db_api +from qinling import exceptions as exc LOG = logging.getLogger(__name__) -class DefaultEngine(base.Engine): - def create_environment(self, ctx): - LOG.info('Received request.') +class DefaultEngine(object): + def __init__(self, orchestrator): + self.orchestrator = orchestrator + + def create_runtime(self, ctx, runtime_id): + LOG.info('Start to create runtime, id=%s', runtime_id) + + with db_api.transaction(): + runtime = db_api.get_runtime(runtime_id) + identifier = '%s-%s' % (runtime_id, runtime.name) + labels = {'runtime_name': runtime.name, 'runtime_id': runtime_id} + + try: + self.orchestrator.create_pool( + identifier, + runtime.image, + labels=labels, + ) + + runtime.status = 'available' + except Exception as e: + LOG.exception( + 'Failed to create pool for runtime %s. Error: %s', + runtime_id, + str(e) + ) + + runtime.status = 'error' + + raise exc.OrchestratorException('Failed to create pool.') + + def delete_runtime(self, ctx, runtime_id): + LOG.info('Start to delete runtime, id=%s', runtime_id) + + with db_api.transaction(): + runtime = db_api.get_runtime(runtime_id) + identifier = '%s-%s' % (runtime_id, runtime.name) + labels = {'runtime_name': runtime.name, 'runtime_id': runtime_id} + + self.orchestrator.delete_pool(identifier, labels=labels) + + db_api.delete_runtime(runtime_id) + + LOG.info('Runtime %s deleted.', runtime_id) diff --git a/qinling/engine/service.py b/qinling/engine/service.py index 26daa767..f0d6c98d 100644 --- a/qinling/engine/service.py +++ b/qinling/engine/service.py @@ -19,9 +19,11 @@ from oslo_service import service from qinling.db import api as db_api from qinling.engine import default_engine as engine -from qinling.engine import rpc +from qinling.orchestrator import base as orchestra_base +from qinling import rpc LOG = logging.getLogger(__name__) +CONF = cfg.CONF class EngineService(service.Service): @@ -31,11 +33,13 @@ class EngineService(service.Service): self.server = None def start(self): - topic = cfg.CONF.engine.topic - server = cfg.CONF.engine.host - transport = messaging.get_transport(cfg.CONF) + orchestrator = orchestra_base.load_orchestrator(CONF) + + topic = CONF.engine.topic + server = CONF.engine.host + transport = messaging.get_transport(CONF) target = messaging.Target(topic=topic, server=server, fanout=False) - endpoints = [engine.DefaultEngine()] + endpoints = [engine.DefaultEngine(orchestrator)] self.server = messaging.get_rpc_server( transport, target, diff --git a/qinling/exceptions.py b/qinling/exceptions.py index 6ed117e8..14be4317 100644 --- a/qinling/exceptions.py +++ b/qinling/exceptions.py @@ -68,6 +68,16 @@ class DBError(QinlingException): http_code = 400 +class DBEntityNotFoundError(DBError): + http_code = 404 + message = "Object not found" + + class ApplicationContextNotFoundException(QinlingException): http_code = 400 message = "Application context not found" + + +class OrchestratorException(QinlingException): + http_code = 500 + message = "Orchestrator error." diff --git a/qinling/orchestrator/base.py b/qinling/orchestrator/base.py index e69de29b..8695c910 100644 --- a/qinling/orchestrator/base.py +++ b/qinling/orchestrator/base.py @@ -0,0 +1,55 @@ +# Copyright 2017 Catalyst IT Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc + +import six +from stevedore import driver + +from qinling import exceptions as exc + +ORCHESTRATOR = None + + +@six.add_metaclass(abc.ABCMeta) +class OrchestratorBase(object): + """OrchestratorBase interface.""" + + @abc.abstractmethod + def create_pool(self, name, image, **kwargs): + raise NotImplementedError + + @abc.abstractmethod + def delete_pool(self, name, **kwargs): + raise NotImplementedError + + +def load_orchestrator(conf): + global ORCHESTRATOR + + if not ORCHESTRATOR: + try: + mgr = driver.DriverManager('qinling.orchestrator', + conf.engine.orchestrator, + invoke_on_load=True, + invoke_args=[conf]) + + ORCHESTRATOR = mgr.driver + except Exception as e: + raise exc.OrchestratorException( + 'Failed to load orchestrator: %s. Error: %s' % + (conf.engine.orchestrator, str(e)) + ) + + return ORCHESTRATOR diff --git a/qinling/orchestrator/kubernetes/manager.py b/qinling/orchestrator/kubernetes/manager.py new file mode 100644 index 00000000..20374db6 --- /dev/null +++ b/qinling/orchestrator/kubernetes/manager.py @@ -0,0 +1,140 @@ +# Copyright 2017 Catalyst IT Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +import jinja2 +from kubernetes import config +from kubernetes import client +from kubernetes.client import models +from kubernetes.client.rest import ApiException +from oslo_config import cfg +from oslo_log import log as logging +import yaml + +from qinling.orchestrator import base +from qinling.utils import common + +LOG = logging.getLogger(__name__) + +TEMPLATES_DIR = (os.path.dirname(os.path.realpath(__file__)) + '/templates/') + + +class KubernetesManager(base.OrchestratorBase): + def __init__(self, conf): + self.conf = conf + + client.Configuration().host = self.conf.kubernetes.kube_host + self.v1 = client.CoreV1Api() + self.v1extention = client.ExtensionsV1beta1Api() + + # Create namespace if not exists + self._ensure_namespace() + + # Get templates. + template_loader = jinja2.FileSystemLoader( + searchpath=os.path.dirname(TEMPLATES_DIR) + ) + jinja_env = jinja2.Environment( + loader=template_loader, autoescape=True, trim_blocks=True + ) + + self.deployment_template = jinja_env.get_template('deployment.j2') + + def _ensure_namespace(self): + ret = self.v1.list_namespace() + cur_names = [i.metadata.name for i in ret.items] + + if self.conf.kubernetes.namespace not in cur_names: + LOG.info('Creating namespace: %s', self.conf.kubernetes.namespace) + + namespace_body = { + 'apiVersion': 'v1', + 'kind': 'Namespace', + 'metadata': { + 'name': self.conf.kubernetes.namespace, + 'labels': { + 'name': self.conf.kubernetes.namespace + } + }, + } + + self.v1.create_namespace(namespace_body) + + LOG.info('Namespace %s created.', self.conf.kubernetes.namespace) + + def create_pool(self, name, image, labels=None): + deployment_body = self.deployment_template.render( + { + "name": name, + "labels": labels if labels else {}, + "replicas": self.conf.kubernetes.replicas, + "volume_name": self.conf.kubernetes.volume_name, + "container_name": 'worker', + "image": image, + } + ) + + LOG.info( + "Creating deployment for runtime %s: \n%s", name, deployment_body + ) + + self.v1extention.create_namespaced_deployment( + body=yaml.safe_load(deployment_body), + namespace=self.conf.kubernetes.namespace + ) + + LOG.info("Deployment for runtime %s created.", name) + + def delete_pool(self, name, labels=None): + """Delete all resources belong to the deployment.""" + + LOG.info("Deleting deployment %s", name) + + selector = common.convert_dict_to_string(labels) + + self.v1.delete_collection_namespaced_pod( + self.conf.kubernetes.namespace, + label_selector=selector + ) + + LOG.info("Pods in deployment %s deleted.", name) + + self.v1extention.delete_collection_namespaced_replica_set( + self.conf.kubernetes.namespace, + label_selector=selector + ) + + LOG.info("ReplicaSets in deployment %s deleted.", name) + + ret = self.v1.list_namespaced_service( + self.conf.kubernetes.namespace, label_selector=selector + ) + names = [i.metadata.name for i in ret.items] + for name in names: + self.v1.delete_namespaced_service( + name, + self.conf.kubernetes.namespace, + models.v1_delete_options.V1DeleteOptions() + ) + + LOG.info("Services in deployment %s deleted.", name) + + self.v1extention.delete_collection_namespaced_deployment( + self.conf.kubernetes.namespace, + label_selector=selector, + field_selector='metadata.name=%s' % name + ) + + LOG.info("Deployment %s deleted.", name) diff --git a/qinling/orchestrator/kubernetes/templates/deployment.j2 b/qinling/orchestrator/kubernetes/templates/deployment.j2 new file mode 100644 index 00000000..7c722331 --- /dev/null +++ b/qinling/orchestrator/kubernetes/templates/deployment.j2 @@ -0,0 +1,41 @@ +apiVersion: extensions/v1beta1 +kind: Deployment +metadata: + name: {{ name }} + labels: +{% for key, value in labels.items() %} + {{ key}}: {{ value }} +{% endfor %} +spec: + replicas: {{ replicas }} + selector: + matchLabels: +{% for key, value in labels.items() %} + {{ key}}: {{ value }} +{% endfor %} + template: + metadata: + labels: +{% for key, value in labels.items() %} + {{ key}}: {{ value }} +{% endfor %} + spec: + volumes: + - name: {{ volume_name }} + emptyDir: {} + containers: + - name: {{ container_name }} + image: {{ image }} + imagePullPolicy: IfNotPresent + volumeMounts: + - name: {{ volume_name }} + mountPath: /function + - name: fetcher + image: fission/fetcher + imagePullPolicy: IfNotPresent + volumeMounts: + - name: {{ volume_name }} + mountPath: /function + command: + - /fetcher + - /function \ No newline at end of file diff --git a/qinling/orchestrator/swarm/manager.py b/qinling/orchestrator/swarm/manager.py new file mode 100644 index 00000000..e69de29b diff --git a/qinling/engine/rpc.py b/qinling/rpc.py similarity index 87% rename from qinling/engine/rpc.py rename to qinling/rpc.py index 7904a4fd..e9c877f8 100644 --- a/qinling/engine/rpc.py +++ b/qinling/rpc.py @@ -18,7 +18,6 @@ import oslo_messaging as messaging from oslo_messaging.rpc import client from qinling import context as ctx -from qinling.engine import base from qinling import exceptions as exc LOG = logging.getLogger(__name__) @@ -111,7 +110,7 @@ class ContextSerializer(messaging.Serializer): return qinling_ctx -class EngineClient(base.Engine): +class EngineClient(object): """RPC Engine client.""" def __init__(self, transport): @@ -122,15 +121,26 @@ class EngineClient(base.Engine): serializer = ContextSerializer( messaging.serializer.JsonPayloadSerializer()) + self.topic = cfg.CONF.engine.topic + self._client = messaging.RPCClient( transport, - messaging.Target(topic=cfg.CONF.engine.topic), + messaging.Target(topic=self.topic), serializer=serializer ) @wrap_messaging_exception - def create_environment(self): - return self._client.cast( + def create_runtime(self, id): + return self._client.prepare(topic=self.topic, server=None).cast( ctx.get_ctx(), - 'create_environment' + 'create_runtime', + runtime_id=id + ) + + @wrap_messaging_exception + def delete_runtime(self, id): + return self._client.prepare(topic=self.topic, server=None).cast( + ctx.get_ctx(), + 'delete_runtime', + runtime_id=id ) diff --git a/qinling/utils/common.py b/qinling/utils/common.py new file mode 100644 index 00000000..44874816 --- /dev/null +++ b/qinling/utils/common.py @@ -0,0 +1,18 @@ +# Copyright 2017 Catalyst IT Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +def convert_dict_to_string(d): + temp_list = ['%s=%s' % (k, v) for k, v in d.items()] + + return ','.join(temp_list) diff --git a/requirements.txt b/requirements.txt index 3b6e2f71..e0079061 100644 --- a/requirements.txt +++ b/requirements.txt @@ -22,3 +22,5 @@ SQLAlchemy>=1.0.10,!=1.1.5,!=1.1.6,!=1.1.7,!=1.1.8 # MIT sqlalchemy-migrate>=0.9.6 # Apache-2.0 stevedore>=1.20.0 # Apache-2.0 WSME>=0.8 # MIT +kubernetes>=1.0.0b1 # Apache-2.0 +PyYAML>=3.10.0 # MIT diff --git a/setup.cfg b/setup.cfg index ea889fae..3adc8729 100644 --- a/setup.cfg +++ b/setup.cfg @@ -25,9 +25,13 @@ packages = [entry_points] console_scripts = qinling-server = qinling.cmd.launch:main + qinling-db-manage = qinling.db.sqlalchemy.migration.cli:main + +qinling.orchestrator = + kubernetes = qinling.orchestrator.kubernetes.manager:KubernetesManager oslo.config.opts = - mistral.config = mistral.config:list_opts + qinling.config = qinling.config:list_opts [build_sphinx] all-files = 1