diff --git a/qinling/api/controllers/v1/function.py b/qinling/api/controllers/v1/function.py index 04930f4b..8566c48c 100644 --- a/qinling/api/controllers/v1/function.py +++ b/qinling/api/controllers/v1/function.py @@ -34,6 +34,7 @@ from qinling import exceptions as exc from qinling import rpc from qinling.storage import base as storage_base from qinling.utils import constants +from qinling.utils import etcd_util from qinling.utils.openstack import keystone as keystone_util from qinling.utils.openstack import swift as swift_util from qinling.utils import rest_utils @@ -51,12 +52,14 @@ class FunctionWorkerController(rest.RestController): @wsme_pecan.wsexpose(resources.FunctionWorkers, types.uuid) def get_all(self, function_id): acl.enforce('function_worker:get_all', context.get_ctx()) - LOG.info("Get workers for function %s.", function_id) - db_workers = db_api.get_function_workers(function_id) - workers = [resources.FunctionWorker.from_dict(db_model.to_dict()) - for db_model in db_workers] + workers = etcd_util.get_workers(function_id, CONF) + workers = [ + resources.FunctionWorker.from_dict( + {'function_id': function_id, 'worker_name': w} + ) for w in workers + ] return resources.FunctionWorkers(workers=workers) @@ -254,6 +257,8 @@ class FunctionsController(rest.RestController): if func_db.trust_id: keystone_util.delete_trust(func_db.trust_id) + # Delete etcd keys + etcd_util.delete_function(id) + - # This will also delete function service mapping as well. db_api.delete_function(id) @@ -313,9 +319,9 @@ class FunctionsController(rest.RestController): self._check_swift(swift_info.get('container'), swift_info.get('object')) - # Delete allocated resources in orchestrator. - db_api.delete_function_service_mapping(id) + # Delete allocated resources in orchestrator and etcd keys. self.engine_client.delete_function(id) + etcd_util.delete_function(id) func_db = db_api.update_function(id, values) @@ -363,14 +369,14 @@ class FunctionsController(rest.RestController): """ acl.enforce('function:scale_down', context.get_ctx()) - func_db = db_api.get_function(id) + db_api.get_function(id) + workers = etcd_util.get_workers(id) params = scale.to_dict() - if len(func_db.workers) <= 1: + if len(workers) <= 1: LOG.info('No need to scale down function %s', id) return LOG.info('Starting to scale down function %s, params: %s', id, params) - self.engine_client.scaledown_function(id, count=params['count']) @rest_utils.wrap_wsme_controller_exception @@ -386,5 +392,6 @@ class FunctionsController(rest.RestController): db_api.get_function(id) LOG.info('Starting to detach function %s', id) - # Delete all resources created by orchestrator asynchronously. + # Delete allocated resources in orchestrator and etcd keys. self.engine_client.delete_function(id) + etcd_util.delete_function(id) diff --git a/qinling/api/controllers/v1/resources.py b/qinling/api/controllers/v1/resources.py index bc4a1797..8efcfa36 100644 --- a/qinling/api/controllers/v1/resources.py +++ b/qinling/api/controllers/v1/resources.py @@ -214,7 +214,6 @@ class Functions(ResourceList): class FunctionWorker(Resource): - id = wtypes.text function_id = wsme.wsattr(types.uuid, readonly=True) worker_name = wtypes.text diff --git a/qinling/api/controllers/v1/root.py b/qinling/api/controllers/v1/root.py index d29563c5..3256fdee 100644 --- a/qinling/api/controllers/v1/root.py +++ b/qinling/api/controllers/v1/root.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # limitations under the License. - import pecan from wsme import types as wtypes import wsmeext.pecan as wsme_pecan @@ -28,13 +27,11 @@ class RootResource(resources.Resource): It references all other resources belonging to the API. """ - uri = wtypes.text class Controller(object): """API root controller for version 1.""" - functions = function.FunctionsController() runtimes = runtime.RuntimesController() executions = execution.ExecutionsController() diff --git a/qinling/api/controllers/v1/runtime.py b/qinling/api/controllers/v1/runtime.py index e4c47e55..dcf6f83c 100644 --- a/qinling/api/controllers/v1/runtime.py +++ b/qinling/api/controllers/v1/runtime.py @@ -24,6 +24,7 @@ from qinling.db import api as db_api from qinling import exceptions as exc from qinling import rpc from qinling import status +from qinling.utils import etcd_util from qinling.utils import rest_utils LOG = logging.getLogger(__name__) @@ -143,16 +144,13 @@ class RuntimesController(rest.RestController): ) func_ids = [func.id for func in db_funcs] - mappings = db_api.get_function_service_mappings( - insecure=True, function_id={'in': func_ids} - ) - if mappings: - raise exc.NotAllowedException( - 'Runtime %s is still in use by functions.' % id - ) + for func_id in func_ids: + if etcd_util.get_service_url(func_id): + raise exc.NotAllowedException( + 'Runtime %s is still in use by functions.' % id + ) values['status'] = status.UPGRADING - self.engine_client.update_runtime( id, image=values['image'], diff --git a/qinling/cmd/launch.py b/qinling/cmd/launch.py index e85c0613..6a6aa154 100644 --- a/qinling/cmd/launch.py +++ b/qinling/cmd/launch.py @@ -47,27 +47,29 @@ CONF = cfg.CONF def launch_api(): - server = api_service.WSGIService('qinling_api') - launcher = service.launch(CONF, server, workers=server.workers) - launcher.wait() + try: + server = api_service.WSGIService('qinling_api') + launcher = service.launch(CONF, server, workers=server.workers) + return launcher + except Exception as e: + sys.stderr.write("ERROR: %s\n" % e) + sys.exit(1) def launch_engine(): try: server = eng_service.EngineService() launcher = service.launch(CONF, server) - launcher.wait() - except RuntimeError as e: + return launcher + except Exception as e: sys.stderr.write("ERROR: %s\n" % e) sys.exit(1) def launch_any(options): - # Launch the servers on different threads. - threads = [eventlet.spawn(LAUNCH_OPTIONS[option]) - for option in options] - - [thread.wait() for thread in threads] + launchers = [LAUNCH_OPTIONS[option]() for option in options] + for l in launchers: + l.wait() LAUNCH_OPTIONS = { diff --git a/qinling/config.py b/qinling/config.py index 47fd5e44..f1a2f023 100644 --- a/qinling/config.py +++ b/qinling/config.py @@ -96,19 +96,14 @@ engine_opts = [ ), cfg.IntOpt( 'function_service_expiration', - default=300, + default=3600, help='Maximum service time in seconds for function in orchestrator.' ), cfg.IntOpt( 'function_concurrency', - default=10, + default=3, help='Maximum number of concurrent executions per function.' ), - cfg.BoolOpt( - 'enable_autoscaling', - default=True, - help='Enables autoscaling capability for function execution.' - ) ] STORAGE_GROUP = 'storage' @@ -134,7 +129,7 @@ kubernetes_opts = [ ), cfg.IntOpt( 'replicas', - default=3, + default=5, help='Number of desired replicas in deployment.' ), cfg.StrOpt( @@ -148,6 +143,18 @@ default='127.0.0.1', help='Qinling API service ip address.'
), + cfg.StrOpt( + 'log_devel', + default='INFO', + choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], + help='Log level for kubernetes operations.' + ), +] + +ETCD_GROUP = 'etcd' +etcd_opts = [ + cfg.StrOpt('host', default='127.0.0.1', help='Etcd service host address.'), + cfg.PortOpt('port', default=2379, help='Etcd service port.'), ] @@ -163,37 +170,36 @@ def list_opts(): (ENGINE_GROUP, engine_opts), (STORAGE_GROUP, storage_opts), (KUBERNETES_GROUP, kubernetes_opts), + (ETCD_GROUP, etcd_opts), (None, [launch_opt]) ] return keystone_middleware_opts + keystone_loading_opts + qinling_opts -_DEFAULT_LOG_LEVELS = [ - 'eventlet.wsgi.server=WARN', - 'oslo_service.periodic_task=INFO', - 'oslo_service.loopingcall=INFO', - 'oslo_db=WARN', - 'oslo_concurrency.lockutils=WARN', - 'kubernetes.client.rest=DEBUG', - 'keystoneclient=INFO', - 'requests.packages.urllib3.connectionpool=CRITICAL', - 'urllib3.connectionpool=CRITICAL' -] - - def parse_args(args=None, usage=None, default_config_files=None): - default_log_levels = log.get_default_log_levels() - default_log_levels.extend(_DEFAULT_LOG_LEVELS) - log.set_defaults(default_log_levels=default_log_levels) - log.register_options(CONF) - CLI_OPTS = [launch_opt] CONF.register_cli_opts(CLI_OPTS) for group, options in list_opts(): CONF.register_opts(list(options), group) + _DEFAULT_LOG_LEVELS = [ + 'eventlet.wsgi.server=WARN', + 'oslo_service.periodic_task=INFO', + 'oslo_service.loopingcall=INFO', + 'oslo_db=WARN', + 'oslo_concurrency.lockutils=WARN', + 'kubernetes.client.rest=%s' % CONF.kubernetes.log_devel, + 'keystoneclient=INFO', + 'requests.packages.urllib3.connectionpool=CRITICAL', + 'urllib3.connectionpool=CRITICAL', + ] + default_log_levels = log.get_default_log_levels() + default_log_levels.extend(_DEFAULT_LOG_LEVELS) + log.set_defaults(default_log_levels=default_log_levels) + log.register_options(CONF) + CONF( args=args, project='qinling', diff --git a/qinling/db/api.py b/qinling/db/api.py index 3c38f24a..7516972d 100644 --- a/qinling/db/api.py +++ b/qinling/db/api.py @@ -146,42 +146,6 @@ def delete_executions(**kwargs): return IMPL.delete_executions(**kwargs) -def create_function_service_mapping(values): - return IMPL.create_function_service_mapping(values) - - -def get_function_service_mapping(function_id): - return IMPL.get_function_service_mapping(function_id) - - -def get_function_service_mappings(**kwargs): - return IMPL.get_function_service_mappings(**kwargs) - - -def delete_function_service_mapping(id): - return IMPL.delete_function_service_mapping(id) - - -def create_function_worker(values): - return IMPL.create_function_worker(values) - - -def delete_function_worker(name): - return IMPL.delete_function_worker(name) - - -def delete_function_workers(function_id): - return IMPL.delete_function_workers(function_id) - - -def get_function_workers(function_id): - return IMPL.get_function_workers(function_id) - - -def acquire_worker_lock(id): - return IMPL.acquire_worker_lock(id) - - def create_job(values): return IMPL.create_job(values) diff --git a/qinling/db/base.py b/qinling/db/base.py index 10e77fc9..92138da4 100644 --- a/qinling/db/base.py +++ b/qinling/db/base.py @@ -19,7 +19,6 @@ from oslo_db import options as db_options from oslo_db.sqlalchemy import session as db_session from qinling import context -from qinling.db.sqlalchemy import sqlite_lock from qinling import exceptions as exc from qinling.utils import thread_local @@ -123,8 +122,6 @@ def end_tx(): if ses.dirty: rollback_tx() - release_locks_if_sqlite(ses) - ses.close() 
_set_thread_local_session(None) @@ -176,16 +173,6 @@ def insecure_aware(): return _decorator -@session_aware() -def get_driver_name(session=None): - return session.bind.url.drivername - - -def release_locks_if_sqlite(session): - if get_driver_name() == 'sqlite': - sqlite_lock.release_locks(session) - - @session_aware() def model_query(model, columns=(), session=None): """Query helper. diff --git a/qinling/db/sqlalchemy/api.py b/qinling/db/sqlalchemy/api.py index c3bdb2f6..c3ae8b54 100644 --- a/qinling/db/sqlalchemy/api.py +++ b/qinling/db/sqlalchemy/api.py @@ -27,7 +27,6 @@ from qinling.db import base as db_base from qinling.db.sqlalchemy import filters as db_filters from qinling.db.sqlalchemy import model_base from qinling.db.sqlalchemy import models -from qinling.db.sqlalchemy import sqlite_lock from qinling import exceptions as exc from qinling import status @@ -355,6 +354,14 @@ def get_execution(id, insecure=None, session=None): return execution +@db_base.session_aware() +def update_execution(id, values, session=None): + execution = get_execution(id) + execution.update(values.copy()) + + return execution + + @db_base.session_aware() def get_executions(session=None, **kwargs): return _get_collection_sorted_by_time(models.Execution, **kwargs) @@ -373,109 +380,6 @@ def delete_executions(session=None, insecure=None, **kwargs): return _delete_all(models.Execution, insecure=insecure, **kwargs) -@db_base.session_aware() -def create_function_service_mapping(values, session=None): - mapping = models.FunctionServiceMapping() - mapping.update(values.copy()) - - # Ignore duplicate error for FunctionServiceMapping - try: - mapping.save(session=session) - except oslo_db_exc.DBDuplicateEntry: - session.close() - - -@db_base.session_aware() -def get_function_service_mapping(function_id, session=None): - mapping = db_base.model_query( - models.FunctionServiceMapping - ).filter_by(function_id=function_id).first() - - if not mapping: - raise exc.DBEntityNotFoundError( - "FunctionServiceMapping not found [function_id=%s]" % function_id - ) - - return mapping - - -@db_base.session_aware() -def get_function_service_mappings(session=None, **kwargs): - return _get_collection_sorted_by_time( - models.FunctionServiceMapping, **kwargs - ) - - -@db_base.session_aware() -def delete_function_service_mapping(id, session=None): - try: - mapping = get_function_service_mapping(id) - except exc.DBEntityNotFoundError: - return - - session.delete(mapping) - - -@db_base.session_aware() -def create_function_worker(values, session=None): - worker = models.FunctionWorkers() - worker.update(values.copy()) - - # Ignore duplicate error for FunctionWorkers - try: - worker.save(session=session) - except oslo_db_exc.DBDuplicateEntry: - session.close() - - return worker - - -@db_base.session_aware() -def get_function_workers(function_id, session=None): - workers = db_base.model_query( - models.FunctionWorkers - ).filter_by(function_id=function_id).all() - - return workers - - -@db_base.session_aware() -def delete_function_worker(worker_name, session=None): - worker = db_base.model_query( - models.FunctionWorkers - ).filter_by(worker_name=worker_name).first() - - if not worker: - raise exc.DBEntityNotFoundError( - "FunctionWorker not found [worker_name=%s]" % worker_name - ) - - session.delete(worker) - - -@db_base.session_aware() -def delete_function_workers(function_id, session=None): - workers = get_function_workers(function_id) - - for worker in workers: - session.delete(worker) - - -@db_base.session_aware() -def 
acquire_worker_lock(function_id, session=None): - # Expire all so all objects queried after lock is acquired - # will be up-to-date from the DB and not from cache. - session.expire_all() - - if db_base.get_driver_name() == 'sqlite': - # In case of 'sqlite' we need to apply a manual lock. - sqlite_lock.acquire_lock(function_id, session) - - return _secure_query( - models.FunctionWorkers).with_for_update().filter( - models.FunctionWorkers.function_id == function_id).all() - - @db_base.session_aware() def create_job(values, session=None): job = models.Job() diff --git a/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py b/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py index 4ab02d7c..6d4048a6 100644 --- a/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py +++ b/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py @@ -79,36 +79,6 @@ def upgrade(): info={"check_ifexists": True} ) - op.create_table( - 'function_service_mappings', - sa.Column('id', sa.String(length=36), nullable=False), - sa.Column('created_at', sa.DateTime(), nullable=True), - sa.Column('updated_at', sa.DateTime(), nullable=True), - sa.Column('function_id', sa.String(length=36), nullable=False), - sa.Column('service_url', sa.String(length=255), nullable=False), - sa.PrimaryKeyConstraint('id'), - sa.UniqueConstraint('function_id', 'service_url'), - sa.ForeignKeyConstraint( - ['function_id'], [u'functions.id'], ondelete='CASCADE' - ), - info={"check_ifexists": True} - ) - - op.create_table( - 'function_workers', - sa.Column('id', sa.String(length=36), nullable=False), - sa.Column('created_at', sa.DateTime(), nullable=True), - sa.Column('updated_at', sa.DateTime(), nullable=True), - sa.Column('function_id', sa.String(length=36), nullable=False), - sa.Column('worker_name', sa.String(length=255), nullable=False), - sa.PrimaryKeyConstraint('id'), - sa.UniqueConstraint('function_id', 'worker_name'), - sa.ForeignKeyConstraint( - ['function_id'], [u'functions.id'], ondelete='CASCADE' - ), - info={"check_ifexists": True} - ) - op.create_table( 'executions', sa.Column('created_at', sa.DateTime(), nullable=True), diff --git a/qinling/db/sqlalchemy/models.py b/qinling/db/sqlalchemy/models.py index 54bb686f..8e537100 100644 --- a/qinling/db/sqlalchemy/models.py +++ b/qinling/db/sqlalchemy/models.py @@ -50,36 +50,6 @@ class Function(model_base.QinlingSecureModelBase): trust_id = sa.Column(sa.String(80)) -class FunctionServiceMapping(model_base.QinlingModelBase): - __tablename__ = 'function_service_mappings' - - __table_args__ = ( - sa.UniqueConstraint('function_id', 'service_url'), - ) - - id = model_base.id_column() - function_id = sa.Column( - sa.String(36), - sa.ForeignKey(Function.id, ondelete='CASCADE'), - ) - service_url = sa.Column(sa.String(255), nullable=False) - - -class FunctionWorkers(model_base.QinlingModelBase): - __tablename__ = 'function_workers' - - __table_args__ = ( - sa.UniqueConstraint('function_id', 'worker_name'), - ) - - id = model_base.id_column() - function_id = sa.Column( - sa.String(36), - sa.ForeignKey(Function.id, ondelete='CASCADE'), - ) - worker_name = sa.Column(sa.String(255), nullable=False) - - class Execution(model_base.QinlingSecureModelBase): __tablename__ = 'executions' @@ -115,21 +85,6 @@ class Job(model_base.QinlingSecureModelBase): return d -# Delete service mapping automatically when deleting function. 
-Function.service = relationship( - "FunctionServiceMapping", - uselist=False, - lazy='subquery', - cascade="all, delete-orphan" -) -# Delete workers automatically when deleting function. -Function.workers = relationship( - "FunctionWorkers", - order_by="FunctionWorkers.created_at", - lazy='subquery', - cascade="all, delete-orphan" -) - Runtime.functions = relationship("Function", back_populates="runtime") # Only get jobs diff --git a/qinling/db/sqlalchemy/sqlite_lock.py b/qinling/db/sqlalchemy/sqlite_lock.py deleted file mode 100644 index 04c90a5d..00000000 --- a/qinling/db/sqlalchemy/sqlite_lock.py +++ /dev/null @@ -1,54 +0,0 @@ -# Copyright 2017 Catalyst IT Limited -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from eventlet import semaphore - - -_mutex = semaphore.Semaphore() -_locks = {} - - -def acquire_lock(obj_id, session): - with _mutex: - if obj_id not in _locks: - _locks[obj_id] = (session, semaphore.BoundedSemaphore(1)) - - tup = _locks.get(obj_id) - - tup[1].acquire() - - # Make sure to update the dictionary once the lock is acquired - # to adjust session ownership. - _locks[obj_id] = (session, tup[1]) - - -def release_locks(session): - with _mutex: - for obj_id, tup in _locks.items(): - if tup[0] is session: - tup[1].release() - - -def get_locks(): - return _locks - - -def cleanup(): - with _mutex: - # NOTE: For the sake of simplicity we assume that we remove stale locks - # after all tests because this kind of locking can only be used with - # sqlite database. Supporting fully dynamically allocated (and removed) - # locks is much more complex task. If this method is not called after - # tests it will cause a memory leak. - _locks.clear() diff --git a/qinling/engine/default_engine.py b/qinling/engine/default_engine.py index a699eb54..f6d53423 100644 --- a/qinling/engine/default_engine.py +++ b/qinling/engine/default_engine.py @@ -11,17 +11,17 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. 
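Note: the default_engine.py changes below replace the DB/sqlite worker lock with an etcd lock and poll the load check through tenacity, retrying while the check returns False and stopping as soon as it yields a service URL (or None). A minimal standalone sketch of that retry-until-non-False pattern, with a stub standing in for the real etcd/DB work (the stub and URL are illustrative only, not part of the patch):

    import tenacity

    # tenacity re-invokes the callable while it returns False (e.g. the etcd
    # lock could not be acquired); any other result, a URL or None, ends the
    # loop. 30 attempts x 1s mirrors the decorator on function_load_check.
    @tenacity.retry(
        wait=tenacity.wait_fixed(1),
        stop=tenacity.stop_after_attempt(30),
        retry=tenacity.retry_if_result(lambda result: result is False))
    def load_check(attempts=[0]):
        attempts[0] += 1
        # Pretend the lock is contended on the first two attempts.
        return False if attempts[0] < 3 else 'http://10.0.0.31:9090'

    print(load_check())  # retries twice, then returns the URL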
- from oslo_config import cfg from oslo_log import log as logging import requests +import tenacity -from qinling import context from qinling.db import api as db_api from qinling.engine import utils from qinling import status from qinling.utils import common from qinling.utils import constants +from qinling.utils import etcd_util LOG = logging.getLogger(__name__) CONF = cfg.CONF @@ -85,6 +85,32 @@ class DefaultEngine(object): LOG.info('Rollbacked.', resource=resource) + @tenacity.retry( + wait=tenacity.wait_fixed(1), + stop=tenacity.stop_after_attempt(30), + retry=(tenacity.retry_if_result(lambda result: result is False)) + ) + def function_load_check(self, function_id, runtime_id): + with etcd_util.get_worker_lock() as lock: + if not lock.is_acquired(): + return False + + workers = etcd_util.get_workers(function_id) + running_execs = db_api.get_executions( + function_id=function_id, status=status.RUNNING + ) + concurrency = (len(running_execs) or 1) / (len(workers) or 1) + if (len(workers) == 0 or + concurrency > CONF.engine.function_concurrency): + LOG.info( + 'Scale up function %s. Current concurrency: %s, execution ' + 'number %s, worker number %s', + function_id, concurrency, len(running_execs), len(workers) + ) + + # NOTE(kong): The increase step could be configurable + return self.scaleup_function(None, function_id, runtime_id, 1) + def create_execution(self, ctx, execution_id, function_id, runtime_id, input=None): LOG.info( @@ -93,123 +119,100 @@ class DefaultEngine(object): execution_id, function_id, runtime_id, input ) - if CONF.engine.enable_autoscaling: - self.function_load_check(function_id, runtime_id) + function = db_api.get_function(function_id) + source = function.code['source'] + image = None + identifier = None + labels = None + svc_url = None - # FIXME(kong): Make the transaction scope smaller. 
- with db_api.transaction(): - execution = db_api.get_execution(execution_id) - function = db_api.get_function(function_id) + # Auto scale workers if needed + if source != constants.IMAGE_FUNCTION: + svc_url = self.function_load_check(function_id, runtime_id) - if function.service: - func_url = '%s/execute' % function.service.service_url - - LOG.debug( - 'Found service url for function: %s, url: %s', - function_id, func_url - ) - - download_url = ( - 'http://%s:%s/v1/functions/%s?download=true' % - (CONF.kubernetes.qinling_service_address, - CONF.api.port, function_id) - ) - data = { - 'execution_id': execution_id, - 'input': input, - 'function_id': function_id, - 'entry': function.entry, - 'download_url': download_url, - } - if CONF.pecan.auth_enable: - data.update( - { - 'token': context.get_ctx().auth_token, - 'auth_url': CONF.keystone_authtoken.auth_uri, - 'username': CONF.keystone_authtoken.username, - 'password': CONF.keystone_authtoken.password, - 'trust_id': function.trust_id - } - ) - - success, res = utils.url_request( - self.session, func_url, body=data - ) - success = success and res.pop('success') - - LOG.debug( - 'Finished execution %s, success: %s', execution_id, success - ) - - execution.status = status.SUCCESS if success else status.FAILED - execution.logs = res.pop('logs', '') - execution.output = res - return - - source = function.code['source'] - image = None - identifier = None - labels = None - - if source == constants.IMAGE_FUNCTION: - image = function.code['image'] - identifier = ('%s-%s' % - (common.generate_unicode_uuid(dashed=False), - function_id) - )[:63] - labels = {'function_id': function_id} - else: - identifier = runtime_id - labels = {'runtime_id': runtime_id} - - worker_name, service_url = self.orchestrator.prepare_execution( - function_id, - image=image, - identifier=identifier, - labels=labels, - input=input, - ) - success, res = self.orchestrator.run_execution( - execution_id, - function_id, - input=input, - identifier=identifier, - service_url=service_url, - entry=function.entry, - trust_id=function.trust_id + temp_url = etcd_util.get_service_url(function_id) + svc_url = svc_url or temp_url + if svc_url: + func_url = '%s/execute' % svc_url + LOG.debug( + 'Found service url for function: %s, execution: %s, url: %s', + function_id, execution_id, func_url ) - logs = '' - # Execution log is only available for non-image source execution. - if service_url: - logs = res.pop('logs', '') - success = success and res.pop('success') - else: - # If the function is created from docker image, the output is - # direct output, here we convert to a dict to fit into the db - # schema. - res = {'output': res} + data = utils.get_request_data( + CONF, function_id, execution_id, + input, function.entry, function.trust_id + ) + success, res = utils.url_request( + self.session, func_url, body=data + ) + success = success and res.pop('success') LOG.debug( 'Finished execution %s, success: %s', execution_id, success ) - execution.output = res - execution.logs = logs - execution.status = status.SUCCESS if success else status.FAILED + db_api.update_execution( + execution_id, + { + 'status': status.SUCCESS if success else status.FAILED, + 'logs': res.pop('logs', ''), + 'output': res + } + ) + return - # No service is created in orchestrator for single container. 
- if not image: - mapping = { - 'function_id': function_id, - 'service_url': service_url, + if source == constants.IMAGE_FUNCTION: + image = function.code['image'] + identifier = ('%s-%s' % + (common.generate_unicode_uuid(dashed=False), + function_id) + )[:63] + labels = {'function_id': function_id} + else: + identifier = runtime_id + labels = {'runtime_id': runtime_id} + + _, svc_url = self.orchestrator.prepare_execution( + function_id, + image=image, + identifier=identifier, + labels=labels, + input=input, + ) + success, res = self.orchestrator.run_execution( + execution_id, + function_id, + input=input, + identifier=identifier, + service_url=svc_url, + entry=function.entry, + trust_id=function.trust_id + ) + + logs = '' + # Execution log is only available for non-image source execution. + if svc_url: + logs = res.pop('logs', '') + success = success and res.pop('success') + else: + # If the function is created from docker image, the output is + # direct output, here we convert to a dict to fit into the db + # schema. + res = {'output': res} + + LOG.debug( + 'Finished execution %s, success: %s', execution_id, success + ) + + db_api.update_execution( + execution_id, + { + 'status': status.SUCCESS if success else status.FAILED, + 'logs': logs, + 'output': res } - db_api.create_function_service_mapping(mapping) - worker = { - 'function_id': function_id, - 'worker_name': worker_name - } - db_api.create_function_worker(worker) + ) def delete_function(self, ctx, function_id): """Deletes underlying resources allocated for function.""" @@ -219,60 +222,32 @@ class DefaultEngine(object): labels = {'function_id': function_id} self.orchestrator.delete_function(function_id, labels=labels) - db_api.delete_function_workers(function_id) - LOG.info('Deleted.', resource=resource) def scaleup_function(self, ctx, function_id, runtime_id, count=1): - worker_names = self.orchestrator.scaleup_function( + worker_names, service_url = self.orchestrator.scaleup_function( function_id, identifier=runtime_id, count=count ) for name in worker_names: - worker = { - 'function_id': function_id, - 'worker_name': name - } - db_api.create_function_worker(worker) + etcd_util.create_worker(function_id, name) + + etcd_util.create_service_url(function_id, service_url) LOG.info('Finished scaling up function %s.', function_id) + return service_url def scaledown_function(self, ctx, function_id, count=1): - func_db = db_api.get_function(function_id) + workers = etcd_util.get_workers(function_id) worker_deleted_num = ( - count if len(func_db.workers) > count else len(func_db.workers) - 1 + count if len(workers) > count else len(workers) - 1 ) - workers = func_db.workers[:worker_deleted_num] + workers = workers[:worker_deleted_num] - with db_api.transaction(): - for worker in workers: - LOG.debug('Removing worker %s', worker.worker_name) - self.orchestrator.delete_worker( - worker.worker_name, - ) - db_api.delete_function_worker(worker.worker_name) + for worker in workers: + LOG.debug('Removing worker %s', worker) + self.orchestrator.delete_worker(worker) - LOG.info('Finished scaling up function %s.', function_id) + LOG.info('Finished scaling down function %s.', function_id) - - def function_load_check(self, function_id, runtime_id): - with db_api.transaction(): - db_api.acquire_worker_lock(function_id) - - running_execs = db_api.get_executions( - function_id=function_id, status=status.RUNNING - ) - workers = db_api.get_function_workers(function_id) - - concurrency = (len(running_execs) or 1) / (len(workers) or 1) - - if concurrency > CONF.engine.function_concurrency: - LOG.warning( - 'Scale up 
function %s because of high concurrency, current' - ' concurrency: %s', - function_id, concurrency - ) - - # TODO(kong): The inscrease step could be configurable - self.scaleup_function(None, function_id, runtime_id, 1) diff --git a/qinling/engine/service.py b/qinling/engine/service.py index e8eb3a83..dd530b92 100644 --- a/qinling/engine/service.py +++ b/qinling/engine/service.py @@ -31,7 +31,6 @@ CONF = cfg.CONF class EngineService(service.Service): def __init__(self): super(EngineService, self).__init__() - self.server = None def start(self): diff --git a/qinling/engine/utils.py b/qinling/engine/utils.py index 169b6450..24a4787b 100644 --- a/qinling/engine/utils.py +++ b/qinling/engine/utils.py @@ -16,6 +16,9 @@ import time from oslo_log import log as logging import requests import six +import tenacity + +from qinling import context LOG = logging.getLogger(__name__) @@ -24,20 +27,66 @@ def url_request(request_session, url, body=None): """Send request to a service url.""" exception = None + # Send ping request first to make sure the url works + try: + temp = url.split('/') + temp[-1] = 'ping' + ping_url = '/'.join(temp) + r = tenacity.Retrying( + wait=tenacity.wait_fixed(0.5), + stop=tenacity.stop_after_attempt(5), + retry=tenacity.retry_if_exception_type(IOError)) + r.call(request_session.get, ping_url, timeout=(3, 3)) + except Exception as e: + LOG.exception( + "Failed to request url %s, error: %s", ping_url, str(e) + ) + return False, {'error': 'Function execution failed.'} + for a in six.moves.xrange(10): try: - # Default execution duration is 3min, could be configurable + # Default execution max duration is 3min, could be configurable r = request_session.post(url, json=body, timeout=(3, 180)) return True, r.json() except requests.ConnectionError as e: exception = e - LOG.warning("Could not connect to service. Retrying.") + # NOTE(kong): Could be configurable time.sleep(1) - except Exception: - LOG.exception("Failed to request url %s", url) + except Exception as e: + LOG.exception("Failed to request url %s, error: %s", url, str(e)) return False, {'error': 'Function execution timeout.'} LOG.exception("Could not connect to function service. 
Reason: %s", exception) return False, {'error': 'Internal service error.'} + + +def get_request_data(conf, function_id, execution_id, input, entry, trust_id): + ctx = context.get_ctx() + + download_url = ( + 'http://%s:%s/v1/functions/%s?download=true' % + (conf.kubernetes.qinling_service_address, + conf.api.port, function_id) + ) + data = { + 'execution_id': execution_id, + 'input': input, + 'function_id': function_id, + 'entry': entry, + 'download_url': download_url, + 'request_id': ctx.request_id, + } + if conf.pecan.auth_enable: + data.update( + { + 'token': ctx.auth_token, + 'auth_url': conf.keystone_authtoken.auth_uri, + 'username': conf.keystone_authtoken.username, + 'password': conf.keystone_authtoken.password, + 'trust_id': trust_id + } + ) + + return data diff --git a/qinling/orchestrator/kubernetes/manager.py b/qinling/orchestrator/kubernetes/manager.py index a1192343..9ce0c914 100644 --- a/qinling/orchestrator/kubernetes/manager.py +++ b/qinling/orchestrator/kubernetes/manager.py @@ -23,7 +23,6 @@ import requests import tenacity import yaml -from qinling import context from qinling.engine import utils from qinling import exceptions as exc from qinling.orchestrator import base @@ -389,30 +388,9 @@ class KubernetesManager(base.OrchestratorBase): trust_id=None): if service_url: func_url = '%s/execute' % service_url - download_url = ( - 'http://%s:%s/v1/functions/%s?download=true' % - (self.conf.kubernetes.qinling_service_address, - self.conf.api.port, function_id) + data = utils.get_request_data( + self.conf, function_id, execution_id, input, entry, trust_id ) - - data = { - 'execution_id': execution_id, - 'input': input, - 'function_id': function_id, - 'entry': entry, - 'download_url': download_url, - 'trust_id': trust_id - } - if self.conf.pecan.auth_enable: - data.update( - { - 'token': context.get_ctx().auth_token, - 'auth_url': self.conf.keystone_authtoken.auth_uri, - 'username': self.conf.keystone_authtoken.username, - 'password': self.conf.keystone_authtoken.password, - } - ) - LOG.debug( 'Invoke function %s, url: %s, data: %s', function_id, func_url, data @@ -466,13 +444,13 @@ class KubernetesManager(base.OrchestratorBase): raise exc.OrchestratorException('Not enough workers available.') for pod in pods: - pod_name, _ = self._prepare_pod( + pod_name, service_url = self._prepare_pod( pod, identifier, function_id, labels ) pod_names.append(pod_name) LOG.info('Pods scaled up for function %s: %s', function_id, pod_names) - return pod_names + return pod_names, service_url def delete_worker(self, pod_name, **kwargs): self.v1.delete_namespaced_pod( diff --git a/qinling/services/periodics.py b/qinling/services/periodics.py index 3b063841..914d14b2 100644 --- a/qinling/services/periodics.py +++ b/qinling/services/periodics.py @@ -26,6 +26,7 @@ from qinling.db.sqlalchemy import models from qinling import rpc from qinling import status from qinling.utils import constants +from qinling.utils import etcd_util from qinling.utils import executions from qinling.utils import jobs from qinling.utils.openstack import keystone as keystone_utils @@ -50,21 +51,19 @@ def handle_function_service_expiration(ctx, engine_client, orchestrator): return for func_db in results: - if not func_db.service: + if not etcd_util.get_service_url(func_db.id): continue - with db_api.transaction(): - LOG.info( - 'Deleting service mapping and workers for function %s', - func_db.id - ) + LOG.info( + 'Deleting service mapping and workers for function %s', + func_db.id + ) - # Delete resources related to the function 
- engine_client.delete_function(func_db.id) + # Delete resources related to the function + engine_client.delete_function(func_db.id) - # Delete service mapping and worker records - db_api.delete_function_service_mapping(func_db.id) - db_api.delete_function_workers(func_db.id) + # Delete etcd keys + etcd_util.delete_function(func_db.id) def handle_job(engine_client): diff --git a/qinling/tests/unit/api/controllers/v1/test_function.py b/qinling/tests/unit/api/controllers/v1/test_function.py index d4061e0a..92f03258 100644 --- a/qinling/tests/unit/api/controllers/v1/test_function.py +++ b/qinling/tests/unit/api/controllers/v1/test_function.py @@ -103,9 +103,10 @@ class TestFunctionController(base.APITest): self.assertEqual(200, resp.status_int) self.assertEqual('new_name', resp.json['name']) + @mock.patch('qinling.utils.etcd_util.delete_function') @mock.patch('qinling.storage.file_system.FileSystemStorage.store') @mock.patch('qinling.rpc.EngineClient.delete_function') - def test_put_package(self, mock_delete_func, mock_store): + def test_put_package(self, mock_delete_func, mock_store, mock_etcd_del): db_func = self.create_function( runtime_id=self.runtime_id, prefix=TEST_CASE_NAME ) @@ -120,10 +121,12 @@ class TestFunctionController(base.APITest): self.assertEqual(200, resp.status_int) self.assertEqual(1, mock_store.call_count) mock_delete_func.assert_called_once_with(db_func.id) + mock_etcd_del.assert_called_once_with(db_func.id) + @mock.patch('qinling.utils.etcd_util.delete_function') @mock.patch('qinling.rpc.EngineClient.delete_function') @mock.patch('qinling.storage.file_system.FileSystemStorage.delete') - def test_delete(self, mock_delete, mock_delete_func): + def test_delete(self, mock_delete, mock_delete_func, mock_etcd_delete): db_func = self.create_function( runtime_id=self.runtime_id, prefix=TEST_CASE_NAME ) @@ -134,6 +137,7 @@ class TestFunctionController(base.APITest): unit_base.DEFAULT_PROJECT_ID, db_func.id ) mock_delete_func.assert_called_once_with(db_func.id) + mock_etcd_delete.assert_called_once_with(db_func.id) def test_delete_with_running_job(self): db_func = self.create_function( diff --git a/qinling/tests/unit/api/controllers/v1/test_function_worker.py b/qinling/tests/unit/api/controllers/v1/test_function_worker.py index 79825055..31f1397d 100644 --- a/qinling/tests/unit/api/controllers/v1/test_function_worker.py +++ b/qinling/tests/unit/api/controllers/v1/test_function_worker.py @@ -11,37 +11,29 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+import uuid + +import mock -from qinling.db import api as db_api from qinling.tests.unit.api import base TEST_CASE_NAME = 'TestFunctionWorkerController' class TestFunctionWorkerController(base.APITest): - def setUp(self): - super(TestFunctionWorkerController, self).setUp() - - db_func = self.create_function(prefix=TEST_CASE_NAME) - self.function_id = db_func.id - - def test_get_all_workers(self): - db_worker = db_api.create_function_worker( - { - 'function_id': self.function_id, - 'worker_name': 'worker_1', - } - ) - expected = { - "id": db_worker.id, - "function_id": self.function_id, - "worker_name": "worker_1", - } - - resp = self.app.get('/v1/functions/%s/workers' % self.function_id) + @mock.patch('qinling.utils.etcd_util.get_workers') + def test_get_all_workers(self, mock_get_workers): + function_id = str(uuid.uuid4()) + mock_get_workers.return_value = ['test_worker0', 'test_worker1'] + resp = self.app.get('/v1/functions/%s/workers' % function_id) self.assertEqual(200, resp.status_int) - actual = self._assert_single_item( - resp.json['workers'], id=db_worker.id + self._assert_multiple_items( + resp.json['workers'], 2, function_id=function_id + ) + self._assert_single_item( + resp.json['workers'], worker_name='test_worker0' + ) + self._assert_single_item( + resp.json['workers'], worker_name='test_worker1' ) - self._assertDictContainsSubset(actual, expected) diff --git a/qinling/tests/unit/api/controllers/v1/test_runtime.py b/qinling/tests/unit/api/controllers/v1/test_runtime.py index 30a95746..b2b57dbc 100644 --- a/qinling/tests/unit/api/controllers/v1/test_runtime.py +++ b/qinling/tests/unit/api/controllers/v1/test_runtime.py @@ -117,8 +117,9 @@ class TestRuntimeController(base.APITest): self.assertEqual(409, resp.status_int) + @mock.patch('qinling.utils.etcd_util.get_service_url') @mock.patch('qinling.rpc.EngineClient.update_runtime') - def test_put_image(self, mock_update_runtime): + def test_put_image(self, mock_update_runtime, mock_etcd_url): resp = self.app.put_json( '/v1/runtimes/%s' % self.runtime_id, {'image': 'new_image'} ) diff --git a/qinling/tests/unit/base.py b/qinling/tests/unit/base.py index b9c8cafa..efc125c3 100644 --- a/qinling/tests/unit/base.py +++ b/qinling/tests/unit/base.py @@ -22,7 +22,6 @@ from oslotest import base from qinling import config from qinling import context as auth_context from qinling.db import api as db_api -from qinling.db.sqlalchemy import sqlite_lock from qinling import status DEFAULT_PROJECT_ID = 'default' @@ -137,6 +136,7 @@ class DbTestCase(BaseTest): (config.ENGINE_GROUP, config.engine_opts), (config.STORAGE_GROUP, config.storage_opts), (config.KUBERNETES_GROUP, config.kubernetes_opts), + (config.ETCD_GROUP, config.etcd_opts), (None, [config.launch_opt]) ] for group, options in qinling_opts: @@ -160,7 +160,6 @@ class DbTestCase(BaseTest): def _clean_db(self): db_api.delete_all() - sqlite_lock.cleanup() def create_runtime(self, prefix=None): runtime = db_api.create_runtime( diff --git a/qinling/utils/etcd_util.py b/qinling/utils/etcd_util.py new file mode 100644 index 00000000..38620b18 --- /dev/null +++ b/qinling/utils/etcd_util.py @@ -0,0 +1,66 @@ +# Copyright 2017 Catalyst IT Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import uuid + +import etcd3gw +from oslo_config import cfg + +CONF = cfg.CONF +CLIENT = None + + +def get_client(conf=None): + global CLIENT + conf = conf or CONF + + if not CLIENT: + CLIENT = etcd3gw.client(host=conf.etcd.host, port=conf.etcd.port) + + return CLIENT + + +def get_worker_lock(): + client = get_client() + return client.lock(id='function_worker') + + +def create_worker(function_id, worker): + client = get_client() + client.create( + '%s/worker_%s' % (function_id, str(uuid.uuid4())), + worker + ) + + +def get_workers(function_id, conf=None): + client = get_client(conf) + values = client.get_prefix('%s/worker' % function_id) + workers = [w[0] for w in values] + return workers + + +def delete_function(function_id): + client = get_client() + client.delete_prefix(function_id) + + +def create_service_url(function_id, url): + client = get_client() + client.create('%s/service_url' % function_id, url) + + +def get_service_url(function_id): + values = get_client().get('%s/service_url' % function_id) + return values[0] if values else None diff --git a/qinling_tempest_plugin/tests/api/test_executions.py b/qinling_tempest_plugin/tests/api/test_executions.py index 74be3967..43da2875 100644 --- a/qinling_tempest_plugin/tests/api/test_executions.py +++ b/qinling_tempest_plugin/tests/api/test_executions.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.
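Note: the etcd_util module above keeps every record for a function under a key prefix of the function id, one '<function_id>/service_url' key plus a '<function_id>/worker_<uuid>' key per worker, so delete_function() can drop all of them with a single delete_prefix() call. A rough usage sketch against a local etcd gateway (the function id and URL are made up):

    import etcd3gw

    client = etcd3gw.client(host='127.0.0.1', port=2379)
    function_id = 'd3a0c2b1-9a34-4b6f-8f2a-1e5d9c7b4a10'

    # create() writes only if the key does not exist yet, so a concurrent
    # scale-up cannot clobber an already-published service url.
    client.create('%s/service_url' % function_id, 'http://10.0.0.31:9090')

    urls = client.get('%s/service_url' % function_id)    # [] when absent
    workers = [v for v, _meta in
               client.get_prefix('%s/worker' % function_id)]

    client.delete_prefix(function_id)   # what delete_function() amounts to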
+from concurrent import futures import os import pkg_resources import tempfile @@ -183,7 +184,7 @@ class ExecutionsTest(base.BaseQinlingTest): self.assertIn('Hello, OpenStack', body) @decorators.idempotent_id('f22097dc-37db-484d-83d3-3a97e72ec576') - def test_execution_concurrency(self): + def test_execution_concurrency_no_scale(self): self._create_function(name='test_python_sleep.py') def _create_execution(): @@ -191,11 +192,11 @@ class ExecutionsTest(base.BaseQinlingTest): return resp, body futs = [] - with futurist.GreenThreadPoolExecutor(max_workers=4) as executor: + with futurist.ThreadPoolExecutor(max_workers=10) as executor: for _ in range(3): fut = executor.submit(_create_execution) futs.append(fut) - for f in futs: + for f in futures.as_completed(futs): # Wait until we get the response resp, body = f.result() @@ -209,6 +210,33 @@ class ExecutionsTest(base.BaseQinlingTest): self.assertEqual(200, resp.status) self.assertEqual(1, len(body['workers'])) + @decorators.idempotent_id('a5ed173a-19b7-4c92-ac78-c8862ad1d1d2') + def test_execution_concurrency_scale_up(self): + self.await_runtime_available(self.runtime_id) + self._create_function(name='test_python_sleep.py') + + def _create_execution(): + resp, body = self.client.create_execution(self.function_id) + return resp, body + + futs = [] + with futurist.ThreadPoolExecutor(max_workers=10) as executor: + for _ in range(6): + fut = executor.submit(_create_execution) + futs.append(fut) + for f in futures.as_completed(futs): + # Wait until we get the response + resp, body = f.result() + + self.assertEqual(201, resp.status) + self.addCleanup(self.client.delete_resource, 'executions', + body['id'], ignore_notfound=True) + self.assertEqual('success', body['status']) + + resp, body = self.admin_client.get_function_workers(self.function_id) + self.assertEqual(200, resp.status) + self.assertEqual(2, len(body['workers'])) + @decorators.idempotent_id('a948382a-84af-4f0e-ad08-4297345e302c') def test_python_execution_file_limit(self): self._create_function(name='test_python_file_limit.py') diff --git a/requirements.txt b/requirements.txt index ce724245..76dc4a45 100644 --- a/requirements.txt +++ b/requirements.txt @@ -30,3 +30,4 @@ croniter>=0.3.4 # MIT License python-dateutil>=2.4.2 # BSD tenacity>=3.2.1 # Apache-2.0 PyMySQL>=0.7.6 # MIT License +etcd3gw>=0.2.0 # Apache-2.0 diff --git a/runtimes/python2/server.py b/runtimes/python2/server.py index b8d0ad81..8480777f 100644 --- a/runtimes/python2/server.py +++ b/runtimes/python2/server.py @@ -98,6 +98,7 @@ def execute(): download_url = params.get('download_url') function_id = params.get('function_id') entry = params.get('entry') + request_id = params.get('request_id') trust_id = params.get('trust_id') auth_url = params.get('auth_url') username = params.get('username') @@ -109,8 +110,9 @@ def execute(): function_module, function_method = tuple(entry.rsplit('.', 1)) app.logger.info( - 'Request received, execution_id:%s, input: %s, auth_url: %s' % - (execution_id, input, auth_url) + 'Request received, request_id: %s, execution_id: %s, input: %s, ' + 'auth_url: %s' % + (request_id, execution_id, input, auth_url) ) while downloading: @@ -213,6 +215,11 @@ def execute(): ) +@app.route('/ping') +def ping(): + return 'pong' + + setup_logger(logging.DEBUG) app.logger.info("Starting server")
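Note: the new /ping endpoint in the runtime server pairs with the ping-first logic added to qinling/engine/utils.py: before posting an execution, the engine rewrites the execute URL into a ping URL and probes it with short retries, so requests are not fired at a pod that is still starting. A condensed sketch of that probe (the worker URL is hypothetical):

    import requests
    import tenacity

    def probe(execute_url):
        # Swap the last path segment ('execute') for 'ping', as url_request
        # now does.
        parts = execute_url.split('/')
        parts[-1] = 'ping'
        ping_url = '/'.join(parts)

        # requests.ConnectionError subclasses IOError, so connection refusals
        # during pod startup are retried; other errors fail fast.
        retry = tenacity.Retrying(
            wait=tenacity.wait_fixed(0.5),
            stop=tenacity.stop_after_attempt(5),
            retry=tenacity.retry_if_exception_type(IOError))
        return retry.call(requests.get, ping_url, timeout=(3, 3))

    # probe('http://10.0.0.31:9090/execute')  # returns 'pong' once the pod is up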