diff --git a/qinling/api/app.py b/qinling/api/app.py index 55504f52..4efe44e8 100644 --- a/qinling/api/app.py +++ b/qinling/api/app.py @@ -17,6 +17,7 @@ import pecan from qinling.api import access_control from qinling import context as ctx +from qinling.db import api as db_api def get_pecan_config(): @@ -40,6 +41,8 @@ def setup_app(config=None): config = get_pecan_config() app_conf = dict(config.app) + db_api.setup_db() + app = pecan.make_app( app_conf.pop('root'), hooks=lambda: [ctx.ContextHook(), ctx.AuthHook()], diff --git a/qinling/api/controllers/v1/environment.py b/qinling/api/controllers/v1/environment.py new file mode 100644 index 00000000..34b5e8b3 --- /dev/null +++ b/qinling/api/controllers/v1/environment.py @@ -0,0 +1,52 @@ +# Copyright 2017 Catalyst IT Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_config import cfg +from oslo_log import log as logging +from pecan import rest +import wsmeext.pecan as wsme_pecan + +from qinling.api.controllers.v1 import resources +from qinling.api.controllers.v1 import types +from qinling.engine import rpc +from qinling.utils import rest_utils + +LOG = logging.getLogger(__name__) + + +class EnvironmentsController(rest.RestController): + def __init__(self, *args, **kwargs): + self.engine_client = rpc.get_engine_client() + + super(EnvironmentsController, self).__init__(*args, **kwargs) + + @rest_utils.wrap_wsme_controller_exception + @wsme_pecan.wsexpose(resources.Environment, types.uuid) + def get(self, id): + LOG.info("Fetch environment [id=%s]", id) + + @rest_utils.wrap_wsme_controller_exception + @wsme_pecan.wsexpose( + resources.Environment, + body=resources.Environment, + status_code=201 + ) + def post(self, env): + LOG.info("Create environment. [environment=%s]", env) + + self.engine_client.create_environment() + + return resources.Environment.from_dict( + {'id': '123', 'name': 'python2.7'} + ) diff --git a/qinling/api/controllers/v1/function.py b/qinling/api/controllers/v1/function.py index 3edfefd2..cfb37538 100644 --- a/qinling/api/controllers/v1/function.py +++ b/qinling/api/controllers/v1/function.py @@ -14,13 +14,18 @@ import json +from oslo_config import cfg from oslo_log import log as logging import pecan from pecan import rest import wsmeext.pecan as wsme_pecan from qinling.api.controllers.v1 import resources +from qinling.api.controllers.v1 import types +from qinling import context +from qinling.db import api as db_api from qinling import exceptions as exc +from qinling.storage import base as storage_base from qinling.utils import rest_utils LOG = logging.getLogger(__name__) @@ -29,13 +34,26 @@ POST_REQUIRED = set(['name', 'runtime', 'code']) class FunctionsController(rest.RestController): + def __init__(self, *args, **kwargs): + self.storage_provider = storage_base.load_storage_providers(cfg.CONF) + + super(FunctionsController, self).__init__(*args, **kwargs) + + @rest_utils.wrap_wsme_controller_exception + @wsme_pecan.wsexpose(resources.Function, types.uuid) + def get(self, id): + LOG.info("Fetch function [id=%s]", id) + + func_db = db_api.get_function(id) + + return resources.Function.from_dict(func_db.to_dict()) + + def get_data(self, id): + pass + @rest_utils.wrap_pecan_controller_exception @pecan.expose() def post(self, **kwargs): - """Create a new function. - - :param func: Function object. - """ LOG.info("Create function, params=%s", kwargs) if not POST_REQUIRED.issubset(set(kwargs.keys())): @@ -43,18 +61,28 @@ class FunctionsController(rest.RestController): 'Required param is missing. Required: %s' % POST_REQUIRED ) - func = resources.Function() + values = { + 'name': kwargs['name'], + 'runtime': kwargs['runtime'], + 'code': json.loads(kwargs['code']), + 'storage': 'local' + } - func.name = kwargs['name'] - func.runtime = kwargs['runtime'] - func.code = json.loads(kwargs['code']) - - if func.code.get('package', False): + if values['code'].get('package', False): data = kwargs['package'].file.read() - print data + + ctx = context.get_ctx() + with db_api.transaction(): + func_db = db_api.create_function(values) + + self.storage_provider[values['storage']].store( + ctx.projectid, + values['name'], + data + ) pecan.response.status = 201 - return func.to_json() + return resources.Function.from_dict(func_db.to_dict()) @rest_utils.wrap_wsme_controller_exception @wsme_pecan.wsexpose(resources.Functions) diff --git a/qinling/api/controllers/v1/resources.py b/qinling/api/controllers/v1/resources.py index af8e4e87..b6c55a21 100644 --- a/qinling/api/controllers/v1/resources.py +++ b/qinling/api/controllers/v1/resources.py @@ -161,8 +161,6 @@ class Link(Resource): class Function(Resource): - """Function resource.""" - id = wtypes.text name = wtypes.text description = wtypes.text @@ -191,8 +189,6 @@ class Function(Resource): class Functions(ResourceList): - """A collection of Function resources.""" - functions = [Function] def __init__(self, **kwargs): @@ -211,3 +207,42 @@ class Functions(ResourceList): ) return sample + + +class Environment(Resource): + id = wtypes.text + name = wtypes.text + description = wtypes.text + created_at = wtypes.text + updated_at = wtypes.text + + @classmethod + def sample(cls): + return cls( + id='123e4567-e89b-12d3-a456-426655440000', + name='python2.7', + description='Python 2.7 environment.', + created_at='1970-01-01T00:00:00.000000', + updated_at='1970-01-01T00:00:00.000000' + ) + + +class Environments(ResourceList): + environments = [Environment] + + def __init__(self, **kwargs): + self._type = 'environments' + + super(Environments, self).__init__(**kwargs) + + @classmethod + def sample(cls): + sample = cls() + sample.environments = [Environment.sample()] + sample.next = ( + "http://localhost:7070/v1/environments?" + "sort_keys=id,name&sort_dirs=asc,desc&limit=10&" + "marker=123e4567-e89b-12d3-a456-426655440000" + ) + + return sample diff --git a/qinling/api/controllers/v1/root.py b/qinling/api/controllers/v1/root.py index 0df47226..89f906ab 100644 --- a/qinling/api/controllers/v1/root.py +++ b/qinling/api/controllers/v1/root.py @@ -16,6 +16,7 @@ import pecan from wsme import types as wtypes import wsmeext.pecan as wsme_pecan +from qinling.api.controllers.v1 import environment from qinling.api.controllers.v1 import function from qinling.api.controllers.v1 import resources @@ -33,6 +34,7 @@ class Controller(object): """API root controller for version 1.""" functions = function.FunctionsController() + environments = environment.EnvironmentsController() @wsme_pecan.wsexpose(RootResource) def index(self): diff --git a/qinling/cmd/launch.py b/qinling/cmd/launch.py index 6899ed1e..e60bdb19 100644 --- a/qinling/cmd/launch.py +++ b/qinling/cmd/launch.py @@ -38,6 +38,8 @@ from oslo_log import log as logging # noqa from oslo_service import service # noqa from qinling.api import service as api_service # noqa +from qinling.engine import rpc # noqa +from qinling.engine import service as eng_service # noqa from qinling import config # noqa from qinling import version # noqa @@ -45,15 +47,21 @@ CONF = cfg.CONF def launch_api(): - launcher = service.ProcessLauncher(cfg.CONF) - server = api_service.WSGIService('qinling_api') - - launcher.launch_service(server, workers=server.workers) - + launcher = service.launch(CONF, server, workers=server.workers) launcher.wait() +def launch_engine(): + try: + server = eng_service.EngineService() + launcher = service.launch(CONF, server) + launcher.wait() + except RuntimeError as e: + sys.stderr.write("ERROR: %s\n" % e) + sys.exit(1) + + def launch_any(options): # Launch the servers on different threads. threads = [eventlet.spawn(LAUNCH_OPTIONS[option]) @@ -64,6 +72,7 @@ def launch_any(options): LAUNCH_OPTIONS = { 'api': launch_api, + 'engine': launch_engine } QINLING_TITLE = r""" @@ -129,16 +138,16 @@ def main(): logging.setup(CONF, 'Qingling') + # Initialize RPC configuration. + rpc.get_transport() + if cfg.CONF.server == ['all']: - # Launch all servers. launch_any(LAUNCH_OPTIONS.keys()) else: - # Validate launch option. if set(cfg.CONF.server) - set(LAUNCH_OPTIONS.keys()): raise Exception('Valid options are all or any combination of ' ', '.join(LAUNCH_OPTIONS.keys())) - # Launch distinct set of server(s). launch_any(set(cfg.CONF.server)) except RuntimeError as excp: diff --git a/qinling/config.py b/qinling/config.py index 6705d1dd..2fedd26b 100644 --- a/qinling/config.py +++ b/qinling/config.py @@ -69,13 +69,40 @@ pecan_opts = [ ) ] +engine_opts = [ + cfg.StrOpt( + 'host', + default='0.0.0.0', + help='Name of the engine node. This can be an opaque ' + 'identifier. It is not necessarily a hostname, ' + 'FQDN, or IP address.' + ), + cfg.StrOpt( + 'topic', + default='qinling_engine', + help='The message topic that the engine listens on.' + ), +] + +storage_opts = [ + cfg.StrOpt( + 'file_system_dir', + default='/opt/qinling/funtion/packages', + help='Directory to store funtion packages.' + ) +] + CONF = cfg.CONF API_GROUP = 'api' PECAN_GROUP = 'pecan' +ENGINE_GROUP = 'engine' +STORAGE_GROUP = 'storage' CLI_OPTS = [launch_opt] CONF.register_opts(api_opts, group=API_GROUP) CONF.register_opts(pecan_opts, group=PECAN_GROUP) +CONF.register_opts(engine_opts, group=ENGINE_GROUP) +CONF.register_opts(storage_opts, group=STORAGE_GROUP) CONF.register_cli_opts(CLI_OPTS) default_group_opts = itertools.chain( @@ -88,6 +115,8 @@ def list_opts(): return [ (API_GROUP, api_opts), (PECAN_GROUP, pecan_opts), + (ENGINE_GROUP, engine_opts), + (STORAGE_GROUP, storage_opts), (None, default_group_opts) ] diff --git a/qinling/context.py b/qinling/context.py index afdb9a77..566d548d 100644 --- a/qinling/context.py +++ b/qinling/context.py @@ -18,10 +18,16 @@ import pecan from pecan import hooks from qinling import exceptions as exc +from qinling.utils import thread_local CONF = cfg.CONF + ALLOWED_WITHOUT_AUTH = ['/', '/v1/'] +CTX_THREAD_LOCAL_NAME = "QINLING_APP_CTX_THREAD_LOCAL" + +DEFAULT_PROJECT_ID = "" + def authenticate(req): # Refer to: @@ -61,9 +67,39 @@ class AuthHook(hooks.PecanHook): ) +def has_ctx(): + return thread_local.has_thread_local(CTX_THREAD_LOCAL_NAME) + + +def get_ctx(): + if not has_ctx(): + raise exc.ApplicationContextNotFoundException() + + return thread_local.get_thread_local(CTX_THREAD_LOCAL_NAME) + + +def set_ctx(new_ctx): + thread_local.set_thread_local(CTX_THREAD_LOCAL_NAME, new_ctx) + + +class Context(oslo_context.RequestContext): + _session = None + + def __init__(self, is_admin=False, **kwargs): + super(Context, self).__init__(is_admin=is_admin, **kwargs) + + @property + def projectid(self): + if CONF.pecan.auth_enable: + return self.project_id + else: + return DEFAULT_PROJECT_ID + + class ContextHook(hooks.PecanHook): - def on_route(self, state): - context_obj = oslo_context.RequestContext.from_environ( - state.request.environ - ) - state.request.context['qinling_context'] = context_obj + def before(self, state): + context_obj = Context.from_environ(state.request.environ) + set_ctx(context_obj) + + def after(self, state): + set_ctx(None) diff --git a/qinling/db/__init__.py b/qinling/db/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/qinling/db/api.py b/qinling/db/api.py new file mode 100644 index 00000000..8b61de46 --- /dev/null +++ b/qinling/db/api.py @@ -0,0 +1,85 @@ +# Copyright 2017 Catalyst IT Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import contextlib + +from oslo_db import api as db_api + + +_BACKEND_MAPPING = { + 'sqlalchemy': 'qinling.db.sqlalchemy.api', +} + +IMPL = db_api.DBAPI('sqlalchemy', backend_mapping=_BACKEND_MAPPING) + + +def setup_db(): + IMPL.setup_db() + + +def drop_db(): + IMPL.drop_db() + + +def start_tx(): + IMPL.start_tx() + + +def commit_tx(): + IMPL.commit_tx() + + +def rollback_tx(): + IMPL.rollback_tx() + + +def end_tx(): + IMPL.end_tx() + + +@contextlib.contextmanager +def transaction(): + with IMPL.transaction(): + yield + + +# Function + + +def get_function(id): + return IMPL.get_function(id) + + +def get_functions(limit=None, marker=None, sort_keys=None, + sort_dirs=None, fields=None, **kwargs): + return IMPL.get_functions( + limit=limit, + marker=marker, + sort_keys=sort_keys, + sort_dirs=sort_dirs, + fields=fields, + **kwargs + ) + + +def create_function(values): + return IMPL.create_function(values) + + +def update_function(id, values): + return IMPL.update_function(id, values) + + +def delete_function(id): + IMPL.delete_function(id) diff --git a/qinling/db/base.py b/qinling/db/base.py new file mode 100644 index 00000000..9038f763 --- /dev/null +++ b/qinling/db/base.py @@ -0,0 +1,184 @@ +# Copyright 2017 Catalyst IT Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import functools + +from oslo_config import cfg +from oslo_db import options +from oslo_db.sqlalchemy import session as db_session + +from qinling.db.sqlalchemy import sqlite_lock +from qinling import exceptions as exc +from qinling.utils import thread_local + +# Note(dzimine): sqlite only works for basic testing. +options.set_defaults(cfg.CONF, connection="sqlite:///qinling.sqlite") + +_FACADE = None + +_DB_SESSION_THREAD_LOCAL_NAME = "db_sql_alchemy_session" + + +def _get_facade(): + global _FACADE + if _FACADE is None: + _FACADE = db_session.EngineFacade.from_config(cfg.CONF, sqlite_fk=True) + return _FACADE + + +def get_session(expire_on_commit=True, autocommit=False): + """Helper method to grab session.""" + facade = _get_facade() + return facade.get_session(expire_on_commit=expire_on_commit, + autocommit=autocommit) + + +def get_engine(): + facade = _get_facade() + return facade.get_engine() + + +def _get_thread_local_session(): + return thread_local.get_thread_local(_DB_SESSION_THREAD_LOCAL_NAME) + + +def _get_or_create_thread_local_session(): + ses = _get_thread_local_session() + + if ses: + return ses, False + + ses = get_session() + _set_thread_local_session(ses) + + return ses, True + + +def _set_thread_local_session(session): + thread_local.set_thread_local(_DB_SESSION_THREAD_LOCAL_NAME, session) + + +def start_tx(): + """Starts transaction. + + Opens new database session and starts new transaction assuming + there wasn't any opened sessions within the same thread. + """ + if _get_thread_local_session(): + raise exc.DBError( + "Database transaction has already been started." + ) + + _set_thread_local_session(get_session()) + + +def commit_tx(): + """Commits previously started database transaction.""" + ses = _get_thread_local_session() + + if not ses: + raise exc.DBError( + "Nothing to commit. Database transaction" + " has not been previously started." + ) + + ses.commit() + + +def rollback_tx(): + """Rolls back previously started database transaction.""" + ses = _get_thread_local_session() + + if not ses: + raise exc.DBError( + "Nothing to roll back. Database transaction has not been started." + ) + + ses.rollback() + + +def end_tx(): + """Ends transaction. + + Ends current database transaction. + It rolls back all uncommitted changes and closes database session. + """ + ses = _get_thread_local_session() + + if not ses: + raise exc.DBError( + "Database transaction has not been started." + ) + + if ses.dirty: + rollback_tx() + + release_locks_if_sqlite(ses) + + ses.close() + _set_thread_local_session(None) + + +def session_aware(): + """Decorator for methods working within db session.""" + + def _decorator(func): + @functools.wraps(func) + def _within_session(*args, **kw): + ses, created = _get_or_create_thread_local_session() + + try: + kw['session'] = ses + + result = func(*args, **kw) + + if created: + ses.commit() + + return result + except Exception: + if created: + ses.rollback() + raise + finally: + if created: + _set_thread_local_session(None) + ses.close() + + return _within_session + + return _decorator + + +@session_aware() +def get_driver_name(session=None): + return session.bind.url.drivername + + +def release_locks_if_sqlite(session): + if get_driver_name() == 'sqlite': + sqlite_lock.release_locks(session) + + +@session_aware() +def model_query(model, columns=(), session=None): + """Query helper. + + :param model: Base model to query. + :param columns: Optional. Which columns to be queried. + """ + if columns: + return session.query(*columns) + + return session.query(model) diff --git a/qinling/db/sqlalchemy/__init__.py b/qinling/db/sqlalchemy/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/qinling/db/sqlalchemy/api.py b/qinling/db/sqlalchemy/api.py new file mode 100644 index 00000000..9050615f --- /dev/null +++ b/qinling/db/sqlalchemy/api.py @@ -0,0 +1,134 @@ +# Copyright 2017 Catalyst IT Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import contextlib +import sys +import threading + +from oslo_config import cfg +from oslo_db import exception as oslo_db_exc +from oslo_log import log as logging +import sqlalchemy as sa + +from qinling.db import base as db_base +from qinling.db.sqlalchemy import models +from qinling import exceptions as exc + +CONF = cfg.CONF +LOG = logging.getLogger(__name__) + +_SCHEMA_LOCK = threading.RLock() +_initialized = False + + +def get_backend(): + """Consumed by openstack common code. + + The backend is this module itself. + :return Name of db backend. + """ + return sys.modules[__name__] + + +def setup_db(): + global _initialized + + with _SCHEMA_LOCK: + if _initialized: + return + + try: + models.Function.metadata.create_all(db_base.get_engine()) + + _initialized = True + except sa.exc.OperationalError as e: + raise exc.DBError("Failed to setup database: %s" % str(e)) + + +def drop_db(): + global _initialized + + with _SCHEMA_LOCK: + if not _initialized: + return + + try: + models.Function.metadata.drop_all(db_base.get_engine()) + + _initialized = False + except Exception as e: + raise exc.DBError("Failed to drop database: %s" % str(e)) + + +def start_tx(): + db_base.start_tx() + + +def commit_tx(): + db_base.commit_tx() + + +def rollback_tx(): + db_base.rollback_tx() + + +def end_tx(): + db_base.end_tx() + + +@contextlib.contextmanager +def transaction(): + start_tx() + + try: + yield + commit_tx() + finally: + end_tx() + + +@db_base.session_aware() +def get_function(id): + pass + + +@db_base.session_aware() +def get_functions(limit=None, marker=None, sort_keys=None, + sort_dirs=None, fields=None, **kwargs): + pass + + +@db_base.session_aware() +def create_function(values, session=None): + func = models.Function() + func.update(values.copy()) + + try: + func.save(session=session) + except oslo_db_exc.DBDuplicateEntry as e: + raise exc.DBError( + "Duplicate entry for Function: %s" % e.columns + ) + + return func + + +@db_base.session_aware() +def update_function(id, values): + pass + + +@db_base.session_aware() +def delete_function(id): + pass diff --git a/qinling/db/sqlalchemy/model_base.py b/qinling/db/sqlalchemy/model_base.py new file mode 100644 index 00000000..313565ff --- /dev/null +++ b/qinling/db/sqlalchemy/model_base.py @@ -0,0 +1,112 @@ +# Copyright 2017 Catalyst IT Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import six + +from oslo_db.sqlalchemy import models as oslo_models +from oslo_utils import uuidutils +import sqlalchemy as sa +from sqlalchemy.ext import declarative +from sqlalchemy.orm import attributes + + +def id_column(): + return sa.Column( + sa.String(36), + primary_key=True, + default=uuidutils.generate_uuid() + ) + + +class _QinlingModelBase(oslo_models.ModelBase, oslo_models.TimestampMixin): + """Base class for all Qinling SQLAlchemy DB Models.""" + + __table__ = None + + __hash__ = object.__hash__ + + def __init__(self, **kwargs): + for key, value in kwargs.items(): + setattr(self, key, value) + + def __eq__(self, other): + if type(self) is not type(other): + return False + + for col in self.__table__.columns: + # In case of single table inheritance a class attribute + # corresponding to a table column may not exist so we need + # to skip these attributes. + if (hasattr(self, col.name) and hasattr(other, col.name) and + getattr(self, col.name) != getattr(other, col.name)): + return False + + return True + + def __ne__(self, other): + return not self.__eq__(other) + + def to_dict(self): + """sqlalchemy based automatic to_dict method.""" + d = {} + + # If a column is unloaded at this point, it is + # probably deferred. We do not want to access it + # here and thereby cause it to load. + unloaded = attributes.instance_state(self).unloaded + + for col in self.__table__.columns: + if col.name not in unloaded and hasattr(self, col.name): + d[col.name] = getattr(self, col.name) + + datetime_to_str(d, 'created_at') + datetime_to_str(d, 'updated_at') + + return d + + def get_clone(self): + """Clones current object, loads all fields and returns the result.""" + m = self.__class__() + + for col in self.__table__.columns: + if hasattr(self, col.name): + setattr(m, col.name, getattr(self, col.name)) + + setattr(m, 'created_at', getattr(self, 'created_at').isoformat(' ')) + + updated_at = getattr(self, 'updated_at') + if updated_at: + setattr(m, 'updated_at', updated_at.isoformat(' ')) + + return m + + def __repr__(self): + return '%s %s' % (type(self).__name__, self.to_dict().__repr__()) + + +def datetime_to_str(dct, attr_name): + if (dct.get(attr_name) is not None and + not isinstance(dct.get(attr_name), six.string_types)): + dct[attr_name] = dct[attr_name].isoformat(' ') + + +QinlingModelBase = declarative.declarative_base(cls=_QinlingModelBase) + + +class QinlingSecureModelBase(QinlingModelBase): + """Base class for all secure models.""" + __abstract__ = True + + id = id_column() + project_id = sa.Column(sa.String(80), nullable=False) diff --git a/qinling/db/sqlalchemy/models.py b/qinling/db/sqlalchemy/models.py new file mode 100644 index 00000000..1753fa42 --- /dev/null +++ b/qinling/db/sqlalchemy/models.py @@ -0,0 +1,35 @@ +# Copyright 2017 Catalyst IT Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sqlalchemy as sa + +from qinling.db.sqlalchemy import model_base +from qinling.db.sqlalchemy import types as st + + +class Function(model_base.QinlingSecureModelBase): + __tablename__ = 'function' + + __table_args__ = ( + sa.UniqueConstraint('name', 'project_id'), + ) + + name = sa.Column(sa.String(255), nullable=False) + description = sa.Column(sa.String(255)) + runtime = sa.Column(sa.String(32), nullable=False) + memorysize = sa.Column(sa.Integer, nullable=False) + timeout = sa.Column(sa.Integer, nullable=False) + provider = sa.Column(sa.String(32), nullable=False) + package = sa.Column(sa.Boolean, nullable=False) + code = sa.Column(st.JsonLongDictType(), nullable=False) diff --git a/qinling/db/sqlalchemy/sqlite_lock.py b/qinling/db/sqlalchemy/sqlite_lock.py new file mode 100644 index 00000000..04c90a5d --- /dev/null +++ b/qinling/db/sqlalchemy/sqlite_lock.py @@ -0,0 +1,54 @@ +# Copyright 2017 Catalyst IT Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from eventlet import semaphore + + +_mutex = semaphore.Semaphore() +_locks = {} + + +def acquire_lock(obj_id, session): + with _mutex: + if obj_id not in _locks: + _locks[obj_id] = (session, semaphore.BoundedSemaphore(1)) + + tup = _locks.get(obj_id) + + tup[1].acquire() + + # Make sure to update the dictionary once the lock is acquired + # to adjust session ownership. + _locks[obj_id] = (session, tup[1]) + + +def release_locks(session): + with _mutex: + for obj_id, tup in _locks.items(): + if tup[0] is session: + tup[1].release() + + +def get_locks(): + return _locks + + +def cleanup(): + with _mutex: + # NOTE: For the sake of simplicity we assume that we remove stale locks + # after all tests because this kind of locking can only be used with + # sqlite database. Supporting fully dynamically allocated (and removed) + # locks is much more complex task. If this method is not called after + # tests it will cause a memory leak. + _locks.clear() diff --git a/qinling/db/sqlalchemy/types.py b/qinling/db/sqlalchemy/types.py new file mode 100644 index 00000000..bb76e0c1 --- /dev/null +++ b/qinling/db/sqlalchemy/types.py @@ -0,0 +1,94 @@ +# Copyright 2017 Catalyst IT Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# This module implements SQLAlchemy-based types for dict and list +# expressed by json-strings +# + +from oslo_serialization import jsonutils +import sqlalchemy as sa +from sqlalchemy.dialects import mysql +from sqlalchemy.ext import mutable + + +class JsonEncoded(sa.TypeDecorator): + """Represents an immutable structure as a json-encoded string.""" + + impl = sa.Text + + def process_bind_param(self, value, dialect): + if value is not None: + value = jsonutils.dumps(value) + return value + + def process_result_value(self, value, dialect): + if value is not None: + value = jsonutils.loads(value) + return value + + +class MutableList(mutable.Mutable, list): + @classmethod + def coerce(cls, key, value): + """Convert plain lists to MutableList.""" + if not isinstance(value, MutableList): + if isinstance(value, list): + return MutableList(value) + + # this call will raise ValueError + return mutable.Mutable.coerce(key, value) + return value + + def __add__(self, value): + """Detect list add events and emit change events.""" + list.__add__(self, value) + self.changed() + + def append(self, value): + """Detect list add events and emit change events.""" + list.append(self, value) + self.changed() + + def __setitem__(self, key, value): + """Detect list set events and emit change events.""" + list.__setitem__(self, key, value) + self.changed() + + def __delitem__(self, i): + """Detect list del events and emit change events.""" + list.__delitem__(self, i) + self.changed() + + +def JsonDictType(): + """Returns an SQLAlchemy Column Type suitable to store a Json dict.""" + return mutable.MutableDict.as_mutable(JsonEncoded) + + +def JsonListType(): + """Returns an SQLAlchemy Column Type suitable to store a Json array.""" + return MutableList.as_mutable(JsonEncoded) + + +def LongText(): + # TODO(rakhmerov): Need to do for postgres. + return sa.Text().with_variant(mysql.LONGTEXT(), 'mysql') + + +class JsonEncodedLongText(JsonEncoded): + impl = LongText() + + +def JsonLongDictType(): + return mutable.MutableDict.as_mutable(JsonEncodedLongText) diff --git a/qinling/engine/__init__.py b/qinling/engine/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/qinling/engine/base.py b/qinling/engine/base.py new file mode 100644 index 00000000..7ba3d811 --- /dev/null +++ b/qinling/engine/base.py @@ -0,0 +1,26 @@ +# Copyright 2017 Catalyst IT Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc + +import six + + +@six.add_metaclass(abc.ABCMeta) +class Engine(object): + """Engine interface.""" + + @abc.abstractmethod + def create_environment(self): + raise NotImplementedError diff --git a/qinling/engine/default_engine.py b/qinling/engine/default_engine.py new file mode 100644 index 00000000..269d5c53 --- /dev/null +++ b/qinling/engine/default_engine.py @@ -0,0 +1,25 @@ +# Copyright 2017 Catalyst IT Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_config import cfg +from oslo_log import log as logging + +from qinling.engine import base + +LOG = logging.getLogger(__name__) + + +class DefaultEngine(base.Engine): + def create_environment(self, ctx): + LOG.info('Received request.') diff --git a/qinling/engine/rpc.py b/qinling/engine/rpc.py new file mode 100644 index 00000000..7904a4fd --- /dev/null +++ b/qinling/engine/rpc.py @@ -0,0 +1,136 @@ +# Copyright 2017 Catalyst IT Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_config import cfg +from oslo_log import log as logging +import oslo_messaging as messaging +from oslo_messaging.rpc import client + +from qinling import context as ctx +from qinling.engine import base +from qinling import exceptions as exc + +LOG = logging.getLogger(__name__) + +_TRANSPORT = None +_ENGINE_CLIENT = None + + +def cleanup(): + """Intended to be used by tests to recreate all RPC related objects.""" + + global _TRANSPORT + global _ENGINE_CLIENT + + _TRANSPORT = None + _ENGINE_CLIENT = None + + +def get_transport(): + global _TRANSPORT + + if not _TRANSPORT: + _TRANSPORT = messaging.get_transport(cfg.CONF) + + return _TRANSPORT + + +def get_engine_client(): + global _ENGINE_CLIENT + + if not _ENGINE_CLIENT: + _ENGINE_CLIENT = EngineClient(get_transport()) + + return _ENGINE_CLIENT + + +def _wrap_exception_and_reraise(exception): + message = "%s: %s" % (exception.__class__.__name__, exception.args[0]) + + raise exc.QinlingException(message) + + +def wrap_messaging_exception(method): + """This decorator unwrap remote error in one of QinlingException. + + oslo.messaging has different behavior on raising exceptions + when fake or rabbit transports are used. In case of rabbit + transport it raises wrapped RemoteError which forwards directly + to API. Wrapped RemoteError contains one of QinlingException raised + remotely on Engine and for correct exception interpretation we + need to unwrap and raise given exception and manually send it to + API layer. + """ + def decorator(*args, **kwargs): + try: + return method(*args, **kwargs) + except exc.QinlingException: + raise + except (client.RemoteError, Exception) as e: + if hasattr(e, 'exc_type') and hasattr(exc, e.exc_type): + exc_cls = getattr(exc, e.exc_type) + raise exc_cls(e.value) + + _wrap_exception_and_reraise(e) + + return decorator + + +class ContextSerializer(messaging.Serializer): + def __init__(self, base): + self._base = base + + def serialize_entity(self, context, entity): + if not self._base: + return entity + return self._base.serialize_entity(context, entity) + + def deserialize_entity(self, context, entity): + if not self._base: + return entity + return self._base.deserialize_entity(context, entity) + + def serialize_context(self, context): + return context.to_dict() + + def deserialize_context(self, context): + qinling_ctx = ctx.Context.from_dict(context) + ctx.set_ctx(qinling_ctx) + + return qinling_ctx + + +class EngineClient(base.Engine): + """RPC Engine client.""" + + def __init__(self, transport): + """Constructs an RPC client for engine. + + :param transport: Messaging transport. + """ + serializer = ContextSerializer( + messaging.serializer.JsonPayloadSerializer()) + + self._client = messaging.RPCClient( + transport, + messaging.Target(topic=cfg.CONF.engine.topic), + serializer=serializer + ) + + @wrap_messaging_exception + def create_environment(self): + return self._client.cast( + ctx.get_ctx(), + 'create_environment' + ) diff --git a/qinling/engine/service.py b/qinling/engine/service.py new file mode 100644 index 00000000..26daa767 --- /dev/null +++ b/qinling/engine/service.py @@ -0,0 +1,72 @@ +# Copyright 2017 Catalyst IT Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_config import cfg +from oslo_log import log as logging +import oslo_messaging as messaging +from oslo_service import service + +from qinling.db import api as db_api +from qinling.engine import default_engine as engine +from qinling.engine import rpc + +LOG = logging.getLogger(__name__) + + +class EngineService(service.Service): + def __init__(self): + super(EngineService, self).__init__() + + self.server = None + + def start(self): + topic = cfg.CONF.engine.topic + server = cfg.CONF.engine.host + transport = messaging.get_transport(cfg.CONF) + target = messaging.Target(topic=topic, server=server, fanout=False) + endpoints = [engine.DefaultEngine()] + self.server = messaging.get_rpc_server( + transport, + target, + endpoints, + executor='eventlet', + serializer=rpc.ContextSerializer( + messaging.serializer.JsonPayloadSerializer()) + ) + + db_api.setup_db() + + LOG.info('Starting engine...') + self.server.start() + + super(EngineService, self).start() + + def stop(self, graceful=False): + if self.server: + LOG.info('Stopping engine...') + self.server.stop() + if graceful: + LOG.info( + 'Consumer successfully stopped. Waiting for final ' + 'messages to be processed...' + ) + self.server.wait() + + super(EngineService, self).stop(graceful=graceful) + + def reset(self): + if self.server: + self.server.reset() + + super(EngineService, self).reset() diff --git a/qinling/exceptions.py b/qinling/exceptions.py index b17969a7..6ed117e8 100644 --- a/qinling/exceptions.py +++ b/qinling/exceptions.py @@ -62,3 +62,12 @@ class UnauthorizedException(QinlingException): class NotAllowedException(QinlingException): http_code = 403 message = "Operation not allowed" + + +class DBError(QinlingException): + http_code = 400 + + +class ApplicationContextNotFoundException(QinlingException): + http_code = 400 + message = "Application context not found" diff --git a/qinling/orchestrator/__init__.py b/qinling/orchestrator/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/qinling/orchestrator/base.py b/qinling/orchestrator/base.py new file mode 100644 index 00000000..e69de29b diff --git a/qinling/orchestrator/kubernetes/__init__.py b/qinling/orchestrator/kubernetes/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/qinling/orchestrator/swarm/__init__.py b/qinling/orchestrator/swarm/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/qinling/storage/__init__.py b/qinling/storage/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/qinling/storage/base.py b/qinling/storage/base.py new file mode 100644 index 00000000..629d9671 --- /dev/null +++ b/qinling/storage/base.py @@ -0,0 +1,38 @@ +# Copyright 2017 Catalyst IT Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc + +import six + +STORAGE_PROVIDER_MAPPING = {} + + +@six.add_metaclass(abc.ABCMeta) +class PackageStorage(object): + """PackageStorage interface.""" + + @abc.abstractmethod + def store(self, project_id, funtion, data): + raise NotImplementedError + + @abc.abstractmethod + def retrieve(self, project_id, function): + raise NotImplementedError + + +def load_storage_providers(conf): + global STORAGE_PROVIDER_MAPPING + + return STORAGE_PROVIDER_MAPPING diff --git a/qinling/storage/file_system.py b/qinling/storage/file_system.py new file mode 100644 index 00000000..c5b9998e --- /dev/null +++ b/qinling/storage/file_system.py @@ -0,0 +1,39 @@ +# Copyright 2017 Catalyst IT Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_config import cfg +from oslo_log import log as logging +from oslo_utils import fileutils + +from qinling.storage import base + +LOG = logging.getLogger(__name__) +CONF = cfg.CONF + + +class FileSystemStorage(base.PackageStorage): + """Interact with file system for function package storage.""" + + def __init__(self, *args, **kwargs): + fileutils.ensure_tree(CONF.storage.file_system_dir) + + def store(self, project_id, function, data): + LOG.info( + 'Store package, function: %s, project: %s', function, project_id + ) + + def retrieve(self, project_id, function): + LOG.info( + 'Get package data, function: %s, project: %s', function, project_id + ) diff --git a/qinling/utils/thread_local.py b/qinling/utils/thread_local.py new file mode 100644 index 00000000..6eda3c86 --- /dev/null +++ b/qinling/utils/thread_local.py @@ -0,0 +1,67 @@ +# Copyright 2017 Catalyst IT Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import threading + +from eventlet import corolocal + +_th_loc_storage = threading.local() + + +def _get_greenlet_local_storage(): + greenlet_id = corolocal.get_ident() + + greenlet_locals = getattr(_th_loc_storage, "greenlet_locals", None) + + if not greenlet_locals: + greenlet_locals = {} + _th_loc_storage.greenlet_locals = greenlet_locals + + if greenlet_id in greenlet_locals: + return greenlet_locals[greenlet_id] + else: + return None + + +def has_thread_local(var_name): + gl_storage = _get_greenlet_local_storage() + return gl_storage and var_name in gl_storage + + +def get_thread_local(var_name): + if not has_thread_local(var_name): + return None + + return _get_greenlet_local_storage()[var_name] + + +def set_thread_local(var_name, val): + if val is None and has_thread_local(var_name): + gl_storage = _get_greenlet_local_storage() + + # Delete variable from greenlet local storage. + if gl_storage: + del gl_storage[var_name] + + # Delete the entire greenlet local storage from thread local storage. + if gl_storage and len(gl_storage) == 0: + del _th_loc_storage.greenlet_locals[corolocal.get_ident()] + + if val is not None: + gl_storage = _get_greenlet_local_storage() + if not gl_storage: + gl_storage = _th_loc_storage.greenlet_locals[ + corolocal.get_ident()] = {} + + gl_storage[var_name] = val diff --git a/requirements.txt b/requirements.txt index 2706764f..3b6e2f71 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,6 +8,7 @@ eventlet!=0.18.3,>=0.18.2 # MIT keystonemiddleware>=4.12.0 # Apache-2.0 oslo.concurrency>=3.8.0 # Apache-2.0 oslo.config>=3.22.0 # Apache-2.0 +oslo.db>=4.19.0 # Apache-2.0 oslo.messaging>=5.19.0 # Apache-2.0 oslo.policy>=1.17.0 # Apache-2.0 oslo.utils>=3.20.0 # Apache-2.0 @@ -17,5 +18,7 @@ oslo.service>=1.10.0 # Apache-2.0 pecan!=1.0.2,!=1.0.3,!=1.0.4,!=1.2,>=1.0.0 # BSD setuptools!=24.0.0,!=34.0.0,!=34.0.1,!=34.0.2,!=34.0.3,!=34.1.0,!=34.1.1,!=34.2.0,!=34.3.0,!=34.3.1,!=34.3.2,>=16.0 # PSF/ZPL six>=1.9.0 # MIT +SQLAlchemy>=1.0.10,!=1.1.5,!=1.1.6,!=1.1.7,!=1.1.8 # MIT +sqlalchemy-migrate>=0.9.6 # Apache-2.0 stevedore>=1.20.0 # Apache-2.0 WSME>=0.8 # MIT