Add runtimes REST API

- Add db layer and db migration support
- Add orchestrator layer, currently only kubernetes supported.
- Now, create/get/delete runtime work fine
This commit is contained in:
Lingxian Kong 2017-05-05 01:29:56 +12:00
parent de9c3e2f7c
commit 7b6deac2c1
35 changed files with 1118 additions and 111 deletions

View File

@ -1,52 +0,0 @@
# Copyright 2017 Catalyst IT Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from oslo_log import log as logging
from pecan import rest
import wsmeext.pecan as wsme_pecan
from qinling.api.controllers.v1 import resources
from qinling.api.controllers.v1 import types
from qinling.engine import rpc
from qinling.utils import rest_utils
LOG = logging.getLogger(__name__)
class EnvironmentsController(rest.RestController):
    """REST controller for /v1/environments (superseded by runtimes)."""

    def __init__(self, *args, **kwargs):
        # RPC client used to forward work to the engine service.
        self.engine_client = rpc.get_engine_client()
        super(EnvironmentsController, self).__init__(*args, **kwargs)

    @rest_utils.wrap_wsme_controller_exception
    @wsme_pecan.wsexpose(resources.Environment, types.uuid)
    def get(self, id):
        """Fetch one environment by id.

        NOTE(review): only logs and returns nothing — a stub, not a real
        lookup.
        """
        LOG.info("Fetch environment [id=%s]", id)

    @rest_utils.wrap_wsme_controller_exception
    @wsme_pecan.wsexpose(
        resources.Environment,
        body=resources.Environment,
        status_code=201
    )
    def post(self, env):
        """Create an environment.

        Fires the engine RPC, then returns a hard-coded placeholder
        resource rather than anything derived from the request.
        """
        LOG.info("Create environment. [environment=%s]", env)
        self.engine_client.create_environment()
        return resources.Environment.from_dict(
            {'id': '123', 'name': 'python2.7'}
        )

View File

View File

@ -13,6 +13,8 @@
# limitations under the License.
import json
import wsme
from wsme import types as wtypes
from qinling.api.controllers.v1 import types
@ -209,36 +211,42 @@ class Functions(ResourceList):
return sample
class Environment(Resource):
class Runtime(Resource):
    """API representation of a function runtime.

    Only ``image`` must be supplied by the client; status, project_id and
    the timestamps are server-managed (readonly).
    """

    id = wtypes.text
    name = wtypes.text
    # The container image backing the runtime; the only mandatory field.
    image = wsme.wsattr(wtypes.text, mandatory=True)
    description = wtypes.text
    # Server-managed attributes: clients may read but never set them.
    status = wsme.wsattr(wtypes.text, readonly=True)
    project_id = wsme.wsattr(wtypes.text, readonly=True)
    # Removed the duplicate plain-text created_at/updated_at definitions
    # that were immediately shadowed by these readonly versions.
    created_at = wsme.wsattr(wtypes.text, readonly=True)
    updated_at = wsme.wsattr(wtypes.text, readonly=True)

    @classmethod
    def sample(cls):
        """Return an example Runtime used in API documentation."""
        return cls(
            id='123e4567-e89b-12d3-a456-426655440000',
            name='python2.7',
            image='lingxiankong/python',
            status='available',
            project_id='<default-project>',
            description='Python 2.7 environment.',
            created_at='1970-01-01T00:00:00.000000',
            updated_at='1970-01-01T00:00:00.000000'
        )
class Environments(ResourceList):
environments = [Environment]
class Runtimes(ResourceList):
runtimes = [Runtime]
def __init__(self, **kwargs):
self._type = 'environments'
super(Environments, self).__init__(**kwargs)
super(Runtimes, self).__init__(**kwargs)
@classmethod
def sample(cls):
sample = cls()
sample.environments = [Environment.sample()]
sample.runtimes = [Runtime.sample()]
sample.next = (
"http://localhost:7070/v1/environments?"
"sort_keys=id,name&sort_dirs=asc,desc&limit=10&"

View File

@ -16,9 +16,9 @@ import pecan
from wsme import types as wtypes
import wsmeext.pecan as wsme_pecan
from qinling.api.controllers.v1 import environment
from qinling.api.controllers.v1 import function
from qinling.api.controllers.v1 import resources
from qinling.api.controllers.v1 import runtime
class RootResource(resources.Resource):
@ -34,7 +34,7 @@ class Controller(object):
"""API root controller for version 1."""
functions = function.FunctionsController()
environments = environment.EnvironmentsController()
runtimes = runtime.RuntimesController()
@wsme_pecan.wsexpose(RootResource)
def index(self):

View File

View File

@ -0,0 +1,84 @@
# Copyright 2017 Catalyst IT Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from oslo_log import log as logging
from pecan import rest
import wsmeext.pecan as wsme_pecan
from qinling.api.controllers.v1 import resources
from qinling.api.controllers.v1 import types
from qinling.db import api as db_api
from qinling import rpc
from qinling.utils import rest_utils
LOG = logging.getLogger(__name__)
class RuntimesController(rest.RestController):
    """REST controller for the /v1/runtimes collection."""

    def __init__(self, *args, **kwargs):
        # RPC client used to hand runtime create/delete work to the engine.
        self.engine_client = rpc.get_engine_client()
        super(RuntimesController, self).__init__(*args, **kwargs)

    @rest_utils.wrap_wsme_controller_exception
    @wsme_pecan.wsexpose(resources.Runtime, types.uuid)
    def get(self, id):
        """Return a single runtime.

        db_api.get_runtime raises DBEntityNotFoundError when the id does
        not exist; the wrapper decorator converts it to an HTTP error.
        """
        LOG.info("Fetch runtime [id=%s]", id)
        runtime_db = db_api.get_runtime(id)
        return resources.Runtime.from_dict(runtime_db.to_dict())

    @rest_utils.wrap_wsme_controller_exception
    @wsme_pecan.wsexpose(resources.Runtimes)
    def get_all(self):
        """Return the list of all visible runtimes."""
        LOG.info("Get all runtimes.")
        runtimes = [resources.Runtime.from_dict(db_model.to_dict())
                    for db_model in db_api.get_runtimes()]
        return resources.Runtimes(runtimes=runtimes)

    @rest_utils.wrap_wsme_controller_exception
    @wsme_pecan.wsexpose(
        resources.Runtime,
        body=resources.Runtime,
        status_code=201
    )
    def post(self, runtime):
        """Create a runtime.

        Persists the record with status 'creating' first, then asks the
        engine to build the backing pool; the engine later flips the
        status to 'available' or 'error'.
        """
        params = runtime.to_dict()
        LOG.info("Creating runtime. [runtime=%s]", params)
        params.update({'status': 'creating'})
        db_model = db_api.create_runtime(params)
        self.engine_client.create_runtime(db_model.id)
        return resources.Runtime.from_dict(db_model.to_dict())

    @rest_utils.wrap_wsme_controller_exception
    @wsme_pecan.wsexpose(None, types.uuid, status_code=204)
    def delete(self, id):
        """Delete runtime.

        Marks the row 'deleting' inside a transaction before notifying
        the engine, which performs the actual pool teardown and row
        removal (see DefaultEngine.delete_runtime).
        """
        LOG.info("Delete runtime [id=%s]", id)
        with db_api.transaction():
            runtime_db = db_api.get_runtime(id)
            runtime_db.status = 'deleting'
            # NOTE(review): the engine RPC is issued while the transaction
            # is still open — confirm the 'deleting' status is committed
            # before the engine reads the row.
            self.engine_client.delete_runtime(id)

View File

@ -38,9 +38,9 @@ from oslo_log import log as logging # noqa
from oslo_service import service # noqa
from qinling.api import service as api_service # noqa
from qinling.engine import rpc # noqa
from qinling.engine import service as eng_service # noqa
from qinling import config # noqa
from qinling.engine import service as eng_service # noqa
from qinling import rpc # noqa
from qinling import version # noqa
CONF = cfg.CONF

View File

@ -28,6 +28,7 @@ launch_opt = cfg.ListOpt(
help='Specifies which qinling server to start by the launch script.'
)
API_GROUP = 'api'
api_opts = [
cfg.StrOpt('host', default='0.0.0.0', help='Qinling API server host.'),
cfg.PortOpt('port', default=7070, help='Qinling API server port.'),
@ -45,6 +46,7 @@ api_opts = [
)
]
PECAN_GROUP = 'pecan'
pecan_opts = [
cfg.StrOpt(
'root',
@ -69,6 +71,7 @@ pecan_opts = [
)
]
ENGINE_GROUP = 'engine'
engine_opts = [
cfg.StrOpt(
'host',
@ -82,8 +85,15 @@ engine_opts = [
default='qinling_engine',
help='The message topic that the engine listens on.'
),
cfg.StrOpt(
'orchestrator',
default='kubernetes',
choices=['kubernetes', 'swarm'],
help='The container orchestrator.'
),
]
STORAGE_GROUP = 'storage'
storage_opts = [
cfg.StrOpt(
'file_system_dir',
@ -92,17 +102,39 @@ storage_opts = [
)
]
KUBERNETES_GROUP = 'kubernetes'
kubernetes_opts = [
cfg.StrOpt(
'namespace',
default='qinling',
help='Resources scope created by Qinling.'
),
cfg.IntOpt(
'replicas',
default=3,
help='Number of desired replicas in deployment.'
),
cfg.StrOpt(
'kube_host',
help='Kubernetes server address.'
),
cfg.StrOpt(
'volume_name',
default='functiondir',
help='Name of the volume shared between worker container and utility '
'container.'
),
]
CONF = cfg.CONF
API_GROUP = 'api'
PECAN_GROUP = 'pecan'
ENGINE_GROUP = 'engine'
STORAGE_GROUP = 'storage'
CLI_OPTS = [launch_opt]
CONF.register_opts(api_opts, group=API_GROUP)
CONF.register_opts(pecan_opts, group=PECAN_GROUP)
CONF.register_opts(engine_opts, group=ENGINE_GROUP)
CONF.register_opts(storage_opts, group=STORAGE_GROUP)
CONF.register_opts(kubernetes_opts, group=KUBERNETES_GROUP)
CONF.register_cli_opts(CLI_OPTS)
default_group_opts = itertools.chain(
@ -117,6 +149,7 @@ def list_opts():
(PECAN_GROUP, pecan_opts),
(ENGINE_GROUP, engine_opts),
(STORAGE_GROUP, storage_opts),
(KUBERNETES_GROUP, kubernetes_opts),
(None, default_group_opts)
]

View File

@ -83,3 +83,26 @@ def update_function(id, values):
def delete_function(id):
IMPL.delete_function(id)
# Runtime
def create_runtime(values):
    """Create a runtime record via the active DB backend (IMPL)."""
    return IMPL.create_runtime(values)
def get_runtime(id):
    """Return one runtime by id via the active DB backend (IMPL)."""
    return IMPL.get_runtime(id)
def get_runtimes():
    """Return all visible runtimes via the active DB backend (IMPL)."""
    return IMPL.get_runtimes()
def delete_runtime(id):
    """Delete one runtime by id via the active DB backend (IMPL)."""
    return IMPL.delete_runtime(id)
def update_runtime(id, values):
    """Update a runtime's fields via the active DB backend (IMPL)."""
    return IMPL.update_runtime(id, values)

View File

@ -37,7 +37,7 @@ def _get_facade():
return _FACADE
def get_session(expire_on_commit=True, autocommit=False):
def get_session(expire_on_commit=False, autocommit=False):
"""Helper method to grab session."""
facade = _get_facade()
return facade.get_session(expire_on_commit=expire_on_commit,

View File

@ -18,10 +18,14 @@ import threading
from oslo_config import cfg
from oslo_db import exception as oslo_db_exc
from oslo_db.sqlalchemy import utils as db_utils
from oslo_log import log as logging
import sqlalchemy as sa
from qinling import context
from qinling.db import base as db_base
from qinling.db.sqlalchemy import filters as db_filters
from qinling.db.sqlalchemy import model_base
from qinling.db.sqlalchemy import models
from qinling import exceptions as exc
@ -98,6 +102,86 @@ def transaction():
end_tx()
def _secure_query(model, *columns):
    """Build a model query, scoped to the caller's project when the model
    is a secure (project-owned) one."""
    query = db_base.model_query(model, columns)

    if issubclass(model, model_base.QinlingSecureModelBase):
        query = query.filter(
            model.project_id == context.get_ctx().projectid
        )

    return query
def _paginate_query(model, limit=None, marker=None, sort_keys=None,
                    sort_dirs=None, query=None):
    """Apply sorting and pagination to a query.

    :param limit: maximum number of rows to return.
    :param marker: last-seen row for keyset pagination.
    :param sort_keys: column names to sort by; 'id' is always appended as
        a tie-breaker.
    :param sort_dirs: per-key directions matching sort_keys.
    :param query: existing query to extend; defaults to a secure query.
    """
    if not query:
        query = _secure_query(model)

    # Work on copies so the caller's lists (including another function's
    # mutable default argument) are never modified in place.
    sort_keys = list(sort_keys) if sort_keys else []
    sort_dirs = list(sort_dirs) if sort_dirs else None

    if 'id' not in sort_keys:
        # Guarantee a deterministic total order for keyset pagination.
        sort_keys.append('id')
        if sort_dirs:
            sort_dirs.append('asc')

    query = db_utils.paginate_query(
        query,
        model,
        limit,
        sort_keys,
        marker=marker,
        sort_dirs=sort_dirs
    )

    return query
def _get_collection(model, insecure=False, limit=None, marker=None,
                    sort_keys=None, sort_dirs=None, fields=None, **filters):
    """Query a filtered, paginated collection of model rows.

    :param insecure: when True, skip the project_id scoping that
        _secure_query applies.
    :param fields: optional column names to select instead of full model
        objects; names the model lacks are silently dropped.
    :param filters: column filters handed to db_filters.apply_filters.
    """
    columns = (
        tuple([getattr(model, f) for f in fields if hasattr(model, f)])
        if fields else ()
    )
    query = (db_base.model_query(model, *columns) if insecure
             else _secure_query(model, *columns))
    query = db_filters.apply_filters(query, model, **filters)
    query = _paginate_query(
        model,
        limit,
        marker,
        sort_keys,
        sort_dirs,
        query
    )

    try:
        return query.all()
    except Exception as e:
        # Deliberately broad: surface any backend failure as a qinling
        # DBError carrying the original type and message.
        raise exc.DBError(
            "Failed when querying database, error type: %s, "
            "error message: %s" % (e.__class__.__name__, str(e))
        )
def _get_collection_sorted_by_time(model, insecure=False, fields=None,
                                   sort_keys=None, **kwargs):
    """Return a collection of *model* rows sorted by created_at.

    The default sort key is resolved inside the body instead of using a
    mutable default argument (``sort_keys=['created_at']``), which would
    be a single shared list across all calls and is mutated downstream
    by _paginate_query.
    """
    if sort_keys is None:
        sort_keys = ['created_at']

    return _get_collection(
        model=model,
        insecure=insecure,
        sort_keys=sort_keys,
        fields=fields,
        **kwargs
    )
def _get_db_object_by_id(model, id, insecure=False):
    """Fetch a single row by primary key, or None if absent."""
    if insecure:
        query = db_base.model_query(model)
    else:
        query = _secure_query(model)

    return query.filter_by(id=id).first()
@db_base.session_aware()
def get_function(id):
    # TODO: still a placeholder — the function API has no DB lookup yet.
    pass
@ -132,3 +216,40 @@ def update_function(id, values):
@db_base.session_aware()
def delete_function(id):
pass
@db_base.session_aware()
def create_runtime(values, session=None):
    """Insert a new runtime row.

    :param values: column values; copied before use so the caller's dict
        is not mutated by the model.
    :raises exc.DBError: on a duplicate entry.
    """
    runtime = models.Runtime()
    runtime.update(values.copy())

    try:
        runtime.save(session=session)
    except oslo_db_exc.DBDuplicateEntry as e:
        raise exc.DBError(
            "Duplicate entry for Runtime: %s" % e.columns
        )

    return runtime
@db_base.session_aware()
def get_runtime(id, session=None):
    """Return the runtime with the given id.

    :raises exc.DBEntityNotFoundError: when no such runtime exists.
    """
    runtime = _get_db_object_by_id(models.Runtime, id)

    if runtime is None:
        raise exc.DBEntityNotFoundError("Runtime not found [id=%s]" % id)

    return runtime
@db_base.session_aware()
def get_runtimes(session=None, **kwargs):
    """Return runtime rows sorted by created_at (project-scoped)."""
    return _get_collection_sorted_by_time(models.Runtime, **kwargs)
@db_base.session_aware()
def delete_runtime(id, session=None):
    """Delete one runtime row; get_runtime raises if it does not exist."""
    # NOTE(review): get_runtime is itself session_aware — presumably it
    # participates in this call's session/transaction; confirm in db_base.
    runtime = get_runtime(id)
    session.delete(runtime)

View File

@ -0,0 +1,67 @@
# Copyright 2016 NEC Corporation. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sqlalchemy as sa
def apply_filters(query, model, **filters):
    """Apply column filters to a SQLAlchemy query.

    Dict-valued filters select an operator ('in', 'nin', 'neq', 'gt',
    'gte', 'lt', 'lte', 'eq', 'has'); plain values are collected and
    applied with filter_by().
    """
    # We need to handle the tag case separately, as the tag datatype is
    # MutableList. Pop it BEFORE the generic loop: leaving 'tags' in
    # ``filters`` would hit getattr(model, 'tags') in the loop (an error
    # for models without a tags column) and also collect it into
    # filter_dict, applying a second, bogus filter_by(tags=...) below.
    # TODO(hparekh): Need to think how can we get rid of this.
    tags = filters.pop('tags', None)

    filter_dict = {}

    for key, value in filters.items():
        column_attr = getattr(model, key)

        if isinstance(value, dict):
            if 'in' in value:
                query = query.filter(column_attr.in_(value['in']))
            elif 'nin' in value:
                query = query.filter(~column_attr.in_(value['nin']))
            elif 'neq' in value:
                query = query.filter(column_attr != value['neq'])
            elif 'gt' in value:
                query = query.filter(column_attr > value['gt'])
            elif 'gte' in value:
                query = query.filter(column_attr >= value['gte'])
            elif 'lt' in value:
                query = query.filter(column_attr < value['lt'])
            elif 'lte' in value:
                query = query.filter(column_attr <= value['lte'])
            elif 'eq' in value:
                query = query.filter(column_attr == value['eq'])
            elif 'has' in value:
                like_pattern = '%{0}%'.format(value['has'])
                query = query.filter(column_attr.like(like_pattern))
        else:
            filter_dict[key] = value

    # To match the tag list, a resource must contain at least all of the
    # tags present in the filter parameter.
    if tags:
        tag_attr = getattr(model, 'tags')

        if not isinstance(tags, list):
            expr = tag_attr.contains(tags)
        else:
            expr = sa.and_(*[tag_attr.contains(tag) for tag in tags])

        query = query.filter(expr)

    if filter_dict:
        query = query.filter_by(**filter_dict)

    return query

View File

@ -0,0 +1,58 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = qinling/db/sqlalchemy/migration/alembic_migrations
# template used to generate migration files
# file_template = %%(rev)s_%%(slug)s
# max length of characters to apply to the
# "slug" field
#truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
sqlalchemy.url =
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

View File

@ -0,0 +1,61 @@
The migrations in `alembic_migrations/versions` contain the changes needed to migrate
between Qinling database revisions. A migration occurs by executing a script that
details the changes needed to upgrade the database. The migration scripts
are ordered so that multiple scripts can run sequentially. The scripts are executed by
Qinling's migration wrapper which uses the Alembic library to manage the migration. Qinling
supports migration from Pike or later.
You can upgrade to the latest database version via:
```
qinling-db-manage --config-file /path/to/qinling.conf upgrade head
```
To check the current database version:
```
qinling-db-manage --config-file /path/to/qinling.conf current
```
To create a script to run the migration offline:
```
qinling-db-manage --config-file /path/to/qinling.conf upgrade head --sql
```
To run the offline migration between specific migration versions:
```
qinling-db-manage --config-file /path/to/qinling.conf upgrade <start version>:<end version> --sql
```
Upgrade the database incrementally:
```
qinling-db-manage --config-file /path/to/qinling.conf upgrade --delta <# of revs>
```
Or, upgrade the database to one newer revision:
```
qinling-db-manage --config-file /path/to/qinling.conf upgrade +1
```
Create new revision:
```
qinling-db-manage --config-file /path/to/qinling.conf revision -m "description of revision" --autogenerate
```
Create a blank file:
```
qinling-db-manage --config-file /path/to/qinling.conf revision -m "description of revision"
```
This command does not perform any migrations, it only sets the revision.
Revision may be any existing revision. Use this command carefully.
```
qinling-db-manage --config-file /path/to/qinling.conf stamp <revision>
```
To verify whether the migration timeline has branched, you can run this command:
```
qinling-db-manage --config-file /path/to/qinling.conf check_migration
```
If the migration path has branch, you can find the branch point via:
```
qinling-db-manage --config-file /path/to/qinling.conf history

View File

@ -0,0 +1,84 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import with_statement
from alembic import context
from logging import config as c
from oslo_utils import importutils
from sqlalchemy import create_engine
from sqlalchemy import pool
from qinling.db.sqlalchemy import model_base
importutils.try_import('qinling.db.sqlalchemy.models')
# This is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
qinling_config = config.qinling_config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
c.fileConfig(config.config_file_name)
# Add your model's MetaData object here for 'autogenerate' support.
target_metadata = model_base.QinlingSecureModelBase.metadata
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well. By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.
    """
    # The DB URL comes from the Qinling config attached by
    # qinling-db-manage (config.qinling_config), not from alembic.ini.
    context.configure(url=qinling_config.database.connection)

    with context.begin_transaction():
        context.run_migrations()
def run_migrations_online():
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.
    """
    # NullPool: migration runs are one-shot, so no connection pooling.
    engine = create_engine(
        qinling_config.database.connection,
        poolclass=pool.NullPool
    )

    connection = engine.connect()
    context.configure(
        connection=connection,
        target_metadata=target_metadata
    )

    try:
        with context.begin_transaction():
            context.run_migrations()
    finally:
        # Always release the connection, even if a migration fails.
        connection.close()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

View File

@ -0,0 +1,33 @@
# Copyright ${create_date.year} OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision}
Create Date: ${create_date}
"""
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
def upgrade():
${upgrades if upgrades else "pass"}

View File

@ -0,0 +1,45 @@
# Copyright 2017 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Pike release
Revision ID: 001
Revises: None
Create Date: 2017-05-03 12:02:51.935368
"""
# revision identifiers, used by Alembic.
revision = '001'
down_revision = None
from alembic import op
import sqlalchemy as sa
from qinling.db.sqlalchemy import types as st
def upgrade():
    """Create the initial 'runtime' table (Pike release, revision 001)."""
    op.create_table(
        'runtime',
        sa.Column('created_at', sa.DateTime(), nullable=True),
        sa.Column('updated_at', sa.DateTime(), nullable=True),
        sa.Column('project_id', sa.String(length=80), nullable=False),
        sa.Column('id', sa.String(length=36), nullable=False),
        sa.Column('name', sa.String(length=255), nullable=True),
        sa.Column('description', sa.String(length=255), nullable=True),
        # image and status are required by the model (see models.Runtime).
        sa.Column('image', sa.String(length=255), nullable=False),
        sa.Column('status', sa.String(length=32), nullable=False),
        sa.PrimaryKeyConstraint('id'),
    )

View File

@ -0,0 +1,123 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Starter script for qinling-db-manage."""
import os
import sys
from alembic import command as alembic_cmd
from alembic import config as alembic_cfg
from alembic import util as alembic_u
from oslo_config import cfg
from oslo_utils import importutils
import six
from qinling import config
# We need to import qinling.api.app to
# make sure we register all needed options.
importutils.try_import('qinling.api.app')
CONF = cfg.CONF
def do_alembic_command(config, cmd, *args, **kwargs):
    """Dispatch *cmd* to the alembic.command module by name."""
    try:
        getattr(alembic_cmd, cmd)(config, *args, **kwargs)
    except alembic_u.CommandError as e:
        # alembic_u.err reports the error and exits non-zero.
        alembic_u.err(six.text_type(e))
def do_check_migration(config, _cmd):
    """Report branch points in the migration history ('branches')."""
    do_alembic_command(config, 'branches')
def do_upgrade(config, cmd):
    """Upgrade the schema to a revision or by a relative delta."""
    command = CONF.command

    if not (command.revision or command.delta):
        raise SystemExit('You must provide a revision or relative delta')

    revision = command.revision

    if command.delta:
        # A delta is expressed to alembic as '+N' (upgrade) or '-N'.
        sign = '+' if command.name == 'upgrade' else '-'
        revision = sign + str(command.delta)

    do_alembic_command(config, cmd, revision, sql=command.sql)
def do_stamp(config, cmd):
    """Mark the database as being at a revision without migrating."""
    do_alembic_command(
        config, cmd,
        CONF.command.revision,
        sql=CONF.command.sql
    )
def do_revision(config, cmd):
    """Create a new (optionally autogenerated) revision script."""
    do_alembic_command(
        config, cmd,
        message=CONF.command.message,
        autogenerate=CONF.command.autogenerate,
        sql=CONF.command.sql
    )
def add_command_parsers(subparsers):
    """Register the qinling-db-manage sub-commands with oslo.config."""
    # Pass-through commands that map directly to alembic commands.
    for name in ['current', 'history', 'branches']:
        parser = subparsers.add_parser(name)
        parser.set_defaults(func=do_alembic_command)

    parser = subparsers.add_parser('upgrade')
    parser.add_argument('--delta', type=int)
    parser.add_argument('--sql', action='store_true')
    parser.add_argument('revision', nargs='?')
    parser.set_defaults(func=do_upgrade)

    parser = subparsers.add_parser('stamp')
    parser.add_argument('--sql', action='store_true')
    parser.add_argument('revision', nargs='?')
    parser.set_defaults(func=do_stamp)

    parser = subparsers.add_parser('revision')
    parser.add_argument('-m', '--message')
    parser.add_argument('--autogenerate', action='store_true')
    parser.add_argument('--sql', action='store_true')
    parser.set_defaults(func=do_revision)
command_opt = cfg.SubCommandOpt('command',
title='Command',
help='Available commands',
handler=add_command_parsers)
CONF.register_cli_opt(command_opt)
def main():
    """Entry point: build the alembic config and run the chosen command."""
    # NOTE(review): this local deliberately shadows the imported
    # 'qinling.config' module name within this function.
    config = alembic_cfg.Config(
        os.path.join(os.path.dirname(__file__), 'alembic.ini')
    )
    config.set_main_option(
        'script_location',
        'qinling.db.sqlalchemy.migration:alembic_migrations'
    )
    # attach the Qinling conf to the Alembic conf
    config.qinling_config = CONF

    # Parse CLI/config-file options before invoking the sub-command.
    CONF(project='qinling')
    CONF.command.func(config, CONF.command.name)
if __name__ == '__main__':
sys.exit(main())

View File

@ -20,6 +20,8 @@ import sqlalchemy as sa
from sqlalchemy.ext import declarative
from sqlalchemy.orm import attributes
from qinling import context
def id_column():
return sa.Column(
@ -29,6 +31,10 @@ def id_column():
)
def get_project_id():
    """Column default: the project id from the current request context."""
    return context.get_ctx().projectid
class _QinlingModelBase(oslo_models.ModelBase, oslo_models.TimestampMixin):
"""Base class for all Qinling SQLAlchemy DB Models."""
@ -109,4 +115,8 @@ class QinlingSecureModelBase(QinlingModelBase):
__abstract__ = True
id = id_column()
project_id = sa.Column(sa.String(80), nullable=False)
project_id = sa.Column(
sa.String(80),
nullable=False,
default=get_project_id
)

View File

@ -33,3 +33,12 @@ class Function(model_base.QinlingSecureModelBase):
provider = sa.Column(sa.String(32), nullable=False)
package = sa.Column(sa.Boolean, nullable=False)
code = sa.Column(st.JsonLongDictType(), nullable=False)
class Runtime(model_base.QinlingSecureModelBase):
    """SQLAlchemy model for a function runtime (container pool)."""

    __tablename__ = 'runtime'

    # Optional display name and description.
    name = sa.Column(sa.String(255))
    description = sa.Column(sa.String(255))
    # Container image backing the pool; required.
    image = sa.Column(sa.String(255), nullable=False)
    # Lifecycle state: 'creating', 'available', 'error', 'deleting'.
    status = sa.Column(sa.String(32), nullable=False)

View File

@ -1,26 +0,0 @@
# Copyright 2017 Catalyst IT Limited
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class Engine(object):
    """Engine interface (replaced by the orchestrator-backed engine)."""

    @abc.abstractmethod
    def create_environment(self):
        raise NotImplementedError

View File

@ -15,11 +15,53 @@
from oslo_config import cfg
from oslo_log import log as logging
from qinling.engine import base
from qinling.db import api as db_api
from qinling import exceptions as exc
LOG = logging.getLogger(__name__)
class DefaultEngine(base.Engine):
def create_environment(self, ctx):
LOG.info('Received request.')
class DefaultEngine(object):
    """Engine-side handler for runtime lifecycle RPC requests.

    Delegates the container work to the configured orchestrator (an
    object providing create_pool/delete_pool).
    """

    def __init__(self, orchestrator):
        self.orchestrator = orchestrator

    def create_runtime(self, ctx, runtime_id):
        """Build the backing pool for a runtime and record the outcome."""
        LOG.info('Start to create runtime, id=%s', runtime_id)

        with db_api.transaction():
            runtime = db_api.get_runtime(runtime_id)
            # Pool name combines id and name so it is both unique and
            # recognizable in the orchestrator.
            identifier = '%s-%s' % (runtime_id, runtime.name)
            labels = {'runtime_name': runtime.name, 'runtime_id': runtime_id}

            try:
                self.orchestrator.create_pool(
                    identifier,
                    runtime.image,
                    labels=labels,
                )
                # Mutating the row inside the transaction persists the new
                # status on commit.
                runtime.status = 'available'
            except Exception as e:
                LOG.exception(
                    'Failed to create pool for runtime %s. Error: %s',
                    runtime_id,
                    str(e)
                )
                runtime.status = 'error'
                # NOTE(review): raising inside the transaction context may
                # roll it back, in which case the 'error' status set above
                # is never persisted — confirm db_api.transaction()'s
                # failure semantics.
                raise exc.OrchestratorException('Failed to create pool.')

    def delete_runtime(self, ctx, runtime_id):
        """Tear down a runtime's pool and remove its DB record."""
        LOG.info('Start to delete runtime, id=%s', runtime_id)

        with db_api.transaction():
            runtime = db_api.get_runtime(runtime_id)
            identifier = '%s-%s' % (runtime_id, runtime.name)
            labels = {'runtime_name': runtime.name, 'runtime_id': runtime_id}

            self.orchestrator.delete_pool(identifier, labels=labels)
            db_api.delete_runtime(runtime_id)

            LOG.info('Runtime %s deleted.', runtime_id)

View File

@ -19,9 +19,11 @@ from oslo_service import service
from qinling.db import api as db_api
from qinling.engine import default_engine as engine
from qinling.engine import rpc
from qinling.orchestrator import base as orchestra_base
from qinling import rpc
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class EngineService(service.Service):
@ -31,11 +33,13 @@ class EngineService(service.Service):
self.server = None
def start(self):
topic = cfg.CONF.engine.topic
server = cfg.CONF.engine.host
transport = messaging.get_transport(cfg.CONF)
orchestrator = orchestra_base.load_orchestrator(CONF)
topic = CONF.engine.topic
server = CONF.engine.host
transport = messaging.get_transport(CONF)
target = messaging.Target(topic=topic, server=server, fanout=False)
endpoints = [engine.DefaultEngine()]
endpoints = [engine.DefaultEngine(orchestrator)]
self.server = messaging.get_rpc_server(
transport,
target,

View File

@ -68,6 +68,16 @@ class DBError(QinlingException):
http_code = 400
class DBEntityNotFoundError(DBError):
    # Lookup for a missing row; maps to HTTP 404.
    http_code = 404
    message = "Object not found"
class ApplicationContextNotFoundException(QinlingException):
    # Raised when no request context is set; maps to HTTP 400.
    http_code = 400
    message = "Application context not found"
class OrchestratorException(QinlingException):
    # Failure in the container orchestrator layer; maps to HTTP 500.
    http_code = 500
    message = "Orchestrator error."

View File

@ -0,0 +1,55 @@
# Copyright 2017 Catalyst IT Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import six
from stevedore import driver
from qinling import exceptions as exc
ORCHESTRATOR = None
@six.add_metaclass(abc.ABCMeta)
class OrchestratorBase(object):
    """OrchestratorBase interface.

    Concrete drivers (e.g. Kubernetes) manage pools of worker containers.
    """

    @abc.abstractmethod
    def create_pool(self, name, image, **kwargs):
        raise NotImplementedError

    @abc.abstractmethod
    def delete_pool(self, name, **kwargs):
        raise NotImplementedError
def load_orchestrator(conf):
    """Load (once) and return the orchestrator driver named in the config.

    :raises exc.OrchestratorException: when the stevedore driver cannot
        be loaded.
    """
    global ORCHESTRATOR

    # Module-level cache: the driver is loaded at most once per process.
    if ORCHESTRATOR:
        return ORCHESTRATOR

    try:
        mgr = driver.DriverManager(
            'qinling.orchestrator',
            conf.engine.orchestrator,
            invoke_on_load=True,
            invoke_args=[conf]
        )
        ORCHESTRATOR = mgr.driver
    except Exception as e:
        raise exc.OrchestratorException(
            'Failed to load orchestrator: %s. Error: %s' %
            (conf.engine.orchestrator, str(e))
        )

    return ORCHESTRATOR

View File

@ -0,0 +1,140 @@
# Copyright 2017 Catalyst IT Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import jinja2
from kubernetes import config
from kubernetes import client
from kubernetes.client import models
from kubernetes.client.rest import ApiException
from oslo_config import cfg
from oslo_log import log as logging
import yaml
from qinling.orchestrator import base
from qinling.utils import common
LOG = logging.getLogger(__name__)
TEMPLATES_DIR = (os.path.dirname(os.path.realpath(__file__)) + '/templates/')
class KubernetesManager(base.OrchestratorBase):
    """Orchestrator driver that manages worker pools on Kubernetes.

    A "pool" is realized as a Deployment (plus the ReplicaSets, Pods and
    Services it owns) inside the namespace configured by
    ``conf.kubernetes.namespace``.
    """

    def __init__(self, conf):
        self.conf = conf

        # NOTE(review): this sets the host on a throwaway Configuration
        # instance; whether that mutates the client's shared default config
        # depends on the kubernetes client version — TODO confirm.
        client.Configuration().host = self.conf.kubernetes.kube_host
        self.v1 = client.CoreV1Api()
        # NOTE: attribute name keeps the original spelling ("extention")
        # since it is part of the instance's attribute surface.
        self.v1extention = client.ExtensionsV1beta1Api()

        # Create namespace if it does not exist yet.
        self._ensure_namespace()

        # Load the deployment template once; it is reused for every pool.
        template_loader = jinja2.FileSystemLoader(
            searchpath=os.path.dirname(TEMPLATES_DIR)
        )
        jinja_env = jinja2.Environment(
            loader=template_loader, autoescape=True, trim_blocks=True
        )
        self.deployment_template = jinja_env.get_template('deployment.j2')

    def _ensure_namespace(self):
        """Create the configured namespace if it is not already present."""
        namespace = self.conf.kubernetes.namespace
        ret = self.v1.list_namespace()
        cur_names = [i.metadata.name for i in ret.items]

        if namespace not in cur_names:
            LOG.info('Creating namespace: %s', namespace)
            namespace_body = {
                'apiVersion': 'v1',
                'kind': 'Namespace',
                'metadata': {
                    'name': namespace,
                    'labels': {
                        'name': namespace
                    }
                },
            }
            self.v1.create_namespace(namespace_body)
            LOG.info('Namespace %s created.', namespace)

    def create_pool(self, name, image, labels=None):
        """Create a Deployment named *name* running *image*.

        :param name: deployment name.
        :param image: container image for the worker container.
        :param labels: optional dict of labels applied to the deployment.
        """
        deployment_body = self.deployment_template.render(
            {
                "name": name,
                "labels": labels if labels else {},
                "replicas": self.conf.kubernetes.replicas,
                "volume_name": self.conf.kubernetes.volume_name,
                "container_name": 'worker',
                "image": image,
            }
        )

        LOG.info(
            "Creating deployment for runtime %s: \n%s", name, deployment_body
        )

        self.v1extention.create_namespaced_deployment(
            body=yaml.safe_load(deployment_body),
            namespace=self.conf.kubernetes.namespace
        )

        LOG.info("Deployment for runtime %s created.", name)

    def delete_pool(self, name, labels=None):
        """Delete all resources belonging to the deployment *name*.

        Pods, ReplicaSets and Services matching *labels* are removed first,
        then the Deployment itself.
        """
        LOG.info("Deleting deployment %s", name)

        selector = common.convert_dict_to_string(labels)

        self.v1.delete_collection_namespaced_pod(
            self.conf.kubernetes.namespace,
            label_selector=selector
        )
        LOG.info("Pods in deployment %s deleted.", name)

        self.v1extention.delete_collection_namespaced_replica_set(
            self.conf.kubernetes.namespace,
            label_selector=selector
        )
        LOG.info("ReplicaSets in deployment %s deleted.", name)

        ret = self.v1.list_namespaced_service(
            self.conf.kubernetes.namespace, label_selector=selector
        )
        # BUG FIX: the original loop reused "name" as its loop variable,
        # clobbering the deployment name — the log lines below and the
        # field_selector used to delete the deployment then referred to the
        # last service name instead of the deployment.
        service_names = [i.metadata.name for i in ret.items]
        for svc_name in service_names:
            self.v1.delete_namespaced_service(
                svc_name,
                self.conf.kubernetes.namespace,
                models.v1_delete_options.V1DeleteOptions()
            )
        LOG.info("Services in deployment %s deleted.", name)

        self.v1extention.delete_collection_namespaced_deployment(
            self.conf.kubernetes.namespace,
            label_selector=selector,
            field_selector='metadata.name=%s' % name
        )
        LOG.info("Deployment %s deleted.", name)

View File

@ -0,0 +1,41 @@
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
name: {{ name }}
labels:
{% for key, value in labels.items() %}
{{ key }}: {{ value }}
{% endfor %}
spec:
replicas: {{ replicas }}
selector:
matchLabels:
{% for key, value in labels.items() %}
{{ key }}: {{ value }}
{% endfor %}
template:
metadata:
labels:
{% for key, value in labels.items() %}
{{ key }}: {{ value }}
{% endfor %}
spec:
volumes:
- name: {{ volume_name }}
emptyDir: {}
containers:
- name: {{ container_name }}
image: {{ image }}
imagePullPolicy: IfNotPresent
volumeMounts:
- name: {{ volume_name }}
mountPath: /function
- name: fetcher
image: fission/fetcher
imagePullPolicy: IfNotPresent
volumeMounts:
- name: {{ volume_name }}
mountPath: /function
command:
- /fetcher
- /function

View File

View File

@ -18,7 +18,6 @@ import oslo_messaging as messaging
from oslo_messaging.rpc import client
from qinling import context as ctx
from qinling.engine import base
from qinling import exceptions as exc
LOG = logging.getLogger(__name__)
@ -111,7 +110,7 @@ class ContextSerializer(messaging.Serializer):
return qinling_ctx
class EngineClient(base.Engine):
class EngineClient(object):
"""RPC Engine client."""
def __init__(self, transport):
@ -122,15 +121,26 @@ class EngineClient(base.Engine):
serializer = ContextSerializer(
messaging.serializer.JsonPayloadSerializer())
self.topic = cfg.CONF.engine.topic
self._client = messaging.RPCClient(
transport,
messaging.Target(topic=cfg.CONF.engine.topic),
messaging.Target(topic=self.topic),
serializer=serializer
)
@wrap_messaging_exception
def create_environment(self):
return self._client.cast(
def create_runtime(self, id):
return self._client.prepare(topic=self.topic, server=None).cast(
ctx.get_ctx(),
'create_environment'
'create_runtime',
runtime_id=id
)
@wrap_messaging_exception
def delete_runtime(self, id):
    """Fire-and-forget RPC asking the engine to delete a runtime.

    :param id: identifier of the runtime to delete.
    """
    target = self._client.prepare(topic=self.topic, server=None)
    return target.cast(ctx.get_ctx(), 'delete_runtime', runtime_id=id)

18
qinling/utils/common.py Normal file
View File

@ -0,0 +1,18 @@
# Copyright 2017 Catalyst IT Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
def convert_dict_to_string(d):
    """Serialize a dict into a ``k1=v1,k2=v2`` label-selector style string."""
    return ','.join('%s=%s' % (key, value) for key, value in d.items())

View File

@ -22,3 +22,5 @@ SQLAlchemy>=1.0.10,!=1.1.5,!=1.1.6,!=1.1.7,!=1.1.8 # MIT
sqlalchemy-migrate>=0.9.6 # Apache-2.0
stevedore>=1.20.0 # Apache-2.0
WSME>=0.8 # MIT
kubernetes>=1.0.0b1 # Apache-2.0
PyYAML>=3.10.0 # MIT

View File

@ -25,9 +25,13 @@ packages =
[entry_points]
console_scripts =
qinling-server = qinling.cmd.launch:main
qinling-db-manage = qinling.db.sqlalchemy.migration.cli:main
qinling.orchestrator =
kubernetes = qinling.orchestrator.kubernetes.manager:KubernetesManager
oslo.config.opts =
qinling.config = qinling.config:list_opts
[build_sphinx]
all-files = 1