diff --git a/devstack/plugin.sh b/devstack/plugin.sh index 098930ed..f6e0227f 100755 --- a/devstack/plugin.sh +++ b/devstack/plugin.sh @@ -24,8 +24,10 @@ function install_k8s { source tools/gate/setup_gate.sh popd - # Pre-pull the default docker image for python runtime + # Pre-pull the default docker image for python runtime and image function + # test. sudo docker pull $QINLING_PYTHON_RUNTIME_IMAGE + sudo docker pull openstackqinling/alpine-test } diff --git a/qinling/api/controllers/v1/function.py b/qinling/api/controllers/v1/function.py index 8566c48c..635f2dc7 100644 --- a/qinling/api/controllers/v1/function.py +++ b/qinling/api/controllers/v1/function.py @@ -165,15 +165,19 @@ class FunctionsController(rest.RestController): ) store = False - if values['code']['source'] == constants.PACKAGE_FUNCTION: + create_trust = True + if source == constants.PACKAGE_FUNCTION: store = True data = kwargs['package'].file.read() - elif values['code']['source'] == constants.SWIFT_FUNCTION: + elif source == constants.SWIFT_FUNCTION: swift_info = values['code'].get('swift', {}) self._check_swift(swift_info.get('container'), swift_info.get('object')) + else: + create_trust = False + values['entry'] = None - if cfg.CONF.pecan.auth_enable: + if cfg.CONF.pecan.auth_enable and create_trust: try: values['trust_id'] = keystone_util.create_trust().id LOG.debug('Trust %s created', values['trust_id']) @@ -187,7 +191,6 @@ class FunctionsController(rest.RestController): if store: ctx = context.get_ctx() - self.storage_provider.store( ctx.projectid, func_db.id, @@ -219,7 +222,6 @@ class FunctionsController(rest.RestController): project_id=project_id, ) LOG.info("Get all %ss. filters=%s", self.type, filters) - db_functions = db_api.get_functions(insecure=all_projects, **filters) functions = [resources.Function.from_dict(db_model.to_dict()) for db_model in db_functions] diff --git a/qinling/api/controllers/v1/resources.py b/qinling/api/controllers/v1/resources.py index 8efcfa36..9d5368ed 100644 --- a/qinling/api/controllers/v1/resources.py +++ b/qinling/api/controllers/v1/resources.py @@ -279,12 +279,28 @@ class Execution(Resource): description = wtypes.text status = wsme.wsattr(wtypes.text, readonly=True) sync = bool - input = types.jsontype - output = wsme.wsattr(types.jsontype, readonly=True) + input = wtypes.text + result = wsme.wsattr(types.jsontype, readonly=True) project_id = wsme.wsattr(wtypes.text, readonly=True) created_at = wsme.wsattr(wtypes.text, readonly=True) updated_at = wsme.wsattr(wtypes.text, readonly=True) + @classmethod + def from_dict(cls, d): + obj = cls() + + for key, val in d.items(): + if key == 'input' and val: + if val.get('__function_input'): + setattr(obj, key, val.get('__function_input')) + else: + setattr(obj, key, json.dumps(val)) + continue + if hasattr(obj, key): + setattr(obj, key, val) + + return obj + @classmethod def sample(cls): return cls( @@ -294,7 +310,7 @@ class Execution(Resource): status='success', sync=True, input={'data': 'hello, world'}, - output={'result': 'hello, world'}, + result={'result': 'hello, world'}, project_id='default', created_at='1970-01-01T00:00:00.000000', updated_at='1970-01-01T00:00:00.000000' diff --git a/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py b/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py index 6d4048a6..3642c58d 100644 --- a/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py +++ b/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py @@ -71,7 +71,7 @@ def upgrade(): sa.Column('memory_size', sa.Integer, nullable=True), sa.Column('timeout', sa.Integer, nullable=True), sa.Column('code', st.JsonLongDictType(), nullable=False), - sa.Column('entry', sa.String(length=80), nullable=False), + sa.Column('entry', sa.String(length=80), nullable=True), sa.Column('count', sa.Integer, nullable=False), sa.Column('trust_id', sa.String(length=80), nullable=True), sa.PrimaryKeyConstraint('id'), @@ -90,7 +90,7 @@ def upgrade(): sa.Column('status', sa.String(length=32), nullable=False), sa.Column('sync', sa.BOOLEAN, nullable=False), sa.Column('input', st.JsonLongDictType(), nullable=True), - sa.Column('output', st.JsonLongDictType(), nullable=True), + sa.Column('result', st.JsonLongDictType(), nullable=True), sa.Column('logs', sa.Text(), nullable=True), sa.PrimaryKeyConstraint('id'), info={"check_ifexists": True} diff --git a/qinling/db/sqlalchemy/models.py b/qinling/db/sqlalchemy/models.py index 8e537100..9c536c32 100644 --- a/qinling/db/sqlalchemy/models.py +++ b/qinling/db/sqlalchemy/models.py @@ -40,12 +40,12 @@ class Function(model_base.QinlingSecureModelBase): ) # We want to get runtime info when we query function runtime = relationship( - 'Runtime', back_populates="functions", innerjoin=True, lazy='joined' + 'Runtime', back_populates="functions", innerjoin=True, lazy='select' ) memory_size = sa.Column(sa.Integer) timeout = sa.Column(sa.Integer) code = sa.Column(st.JsonLongDictType(), nullable=False) - entry = sa.Column(sa.String(80), nullable=False) + entry = sa.Column(sa.String(80), nullable=True) count = sa.Column(sa.Integer, default=0) trust_id = sa.Column(sa.String(80)) @@ -57,7 +57,7 @@ class Execution(model_base.QinlingSecureModelBase): status = sa.Column(sa.String(32), nullable=False) sync = sa.Column(sa.BOOLEAN, default=True) input = sa.Column(st.JsonLongDictType()) - output = sa.Column(st.JsonLongDictType()) + result = sa.Column(st.JsonLongDictType()) description = sa.Column(sa.String(255)) logs = sa.Column(sa.Text(), nullable=True) diff --git a/qinling/engine/default_engine.py b/qinling/engine/default_engine.py index f6d53423..5768a942 100644 --- a/qinling/engine/default_engine.py +++ b/qinling/engine/default_engine.py @@ -157,7 +157,7 @@ class DefaultEngine(object): { 'status': status.SUCCESS if success else status.FAILED, 'logs': res.pop('logs', ''), - 'output': res + 'result': res } ) return @@ -196,7 +196,7 @@ class DefaultEngine(object): logs = res.pop('logs', '') success = success and res.pop('success') else: - # If the function is created from docker image, the output is + # If the function is created from docker image, the result is # direct output, here we convert to a dict to fit into the db # schema. res = {'output': res} @@ -210,7 +210,7 @@ class DefaultEngine(object): { 'status': status.SUCCESS if success else status.FAILED, 'logs': logs, - 'output': res + 'result': res } ) diff --git a/qinling/engine/service.py b/qinling/engine/service.py index dd530b92..21062e2b 100644 --- a/qinling/engine/service.py +++ b/qinling/engine/service.py @@ -35,22 +35,18 @@ class EngineService(service.Service): def start(self): orchestrator = orchestra_base.load_orchestrator(CONF) - db_api.setup_db() - LOG.info('Starting periodic tasks...') - periodics.start_function_mapping_handler(orchestrator) - topic = CONF.engine.topic server = CONF.engine.host transport = messaging.get_rpc_transport(CONF) target = messaging.Target(topic=topic, server=server, fanout=False) - endpoints = [engine.DefaultEngine(orchestrator)] + endpoint = engine.DefaultEngine(orchestrator) access_policy = dispatcher.DefaultRPCAccessPolicy self.server = messaging.get_rpc_server( transport, target, - endpoints, + [endpoint], executor='eventlet', access_policy=access_policy, serializer=rpc.ContextSerializer( @@ -58,6 +54,9 @@ class EngineService(service.Service): ) ) + LOG.info('Starting function mapping periodic task...') + periodics.start_function_mapping_handler(endpoint) + LOG.info('Starting engine...') self.server.start() diff --git a/qinling/orchestrator/kubernetes/manager.py b/qinling/orchestrator/kubernetes/manager.py index 9ce0c914..b5fc8e6f 100644 --- a/qinling/orchestrator/kubernetes/manager.py +++ b/qinling/orchestrator/kubernetes/manager.py @@ -13,6 +13,7 @@ # limitations under the License. import copy +import json import os import time @@ -315,12 +316,19 @@ class KubernetesManager(base.OrchestratorBase): return pod_name, pod_service_url def _create_pod(self, image, pod_name, labels, input): + if not input: + input_list = [] + elif input.get('__function_input'): + input_list = input.get('__function_input').split() + else: + input_list = [json.dumps(input)] + pod_body = self.pod_template.render( { "pod_name": pod_name, "labels": labels, "pod_image": image, - "input": input + "input": input_list } ) @@ -386,6 +394,7 @@ class KubernetesManager(base.OrchestratorBase): def run_execution(self, execution_id, function_id, input=None, identifier=None, service_url=None, entry='main.main', trust_id=None): + """Run execution and get output.""" if service_url: func_url = '%s/execute' % service_url data = utils.get_request_data( @@ -398,24 +407,34 @@ class KubernetesManager(base.OrchestratorBase): return utils.url_request(self.session, func_url, body=data) else: - status = None - - # Wait for execution to be finished. - # TODO(kong): Do not retry infinitely. - while status != 'Succeeded': + def _wait_complete(): pod = self.v1.read_namespaced_pod( identifier, self.conf.kubernetes.namespace ) status = pod.status.phase - time.sleep(0.5) + return True if status == 'Succeeded' else False + + try: + r = tenacity.Retrying( + wait=tenacity.wait_fixed(1), + stop=tenacity.stop_after_delay(180), + retry=tenacity.retry_if_result( + lambda result: result is False) + ) + r.call(_wait_complete) + except Exception as e: + LOG.exception( + "Failed to get pod output, pod: %s, error: %s", + identifier, str(e) + ) + return False, {'error': 'Function execution failed.'} output = self.v1.read_namespaced_pod_log( identifier, self.conf.kubernetes.namespace, ) - - return output + return True, output def delete_function(self, function_id, labels=None): selector = common.convert_dict_to_string(labels) diff --git a/qinling/services/periodics.py b/qinling/services/periodics.py index 914d14b2..849a4536 100644 --- a/qinling/services/periodics.py +++ b/qinling/services/periodics.py @@ -36,9 +36,13 @@ CONF = cfg.CONF _periodic_tasks = {} -def handle_function_service_expiration(ctx, engine_client, orchestrator): - context.set_ctx(ctx) +def handle_function_service_expiration(ctx, engine): + """Clean up resources related to expired functions. + If it's image function, we will rely on the orchestrator itself to do the + image clean up, e.g. image collection feature in kubernetes. + """ + context.set_ctx(ctx) delta = timedelta(seconds=CONF.engine.function_service_expiration) expiry_time = datetime.utcnow() - delta @@ -60,8 +64,7 @@ def handle_function_service_expiration(ctx, engine_client, orchestrator): ) # Delete resources related to the function - engine_client.delete_function(func_db.id) - + engine.delete_function(func_db.id) # Delete etcd keys etcd_util.delete_function(func_db.id) @@ -144,16 +147,13 @@ def handle_job(engine_client): context.set_ctx(None) -def start_function_mapping_handler(orchestrator): +def start_function_mapping_handler(engine): tg = threadgroup.ThreadGroup(1) - engine_client = rpc.get_engine_client() - tg.add_timer( 300, handle_function_service_expiration, ctx=context.Context(), - engine_client=engine_client, - orchestrator=orchestrator + engine=engine, ) _periodic_tasks[constants.PERIODIC_FUNC_MAPPING_HANDLER] = tg diff --git a/qinling/utils/executions.py b/qinling/utils/executions.py index 21989f83..72b76e15 100644 --- a/qinling/utils/executions.py +++ b/qinling/utils/executions.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import json from oslo_log import log as logging @@ -23,13 +24,14 @@ LOG = logging.getLogger(__name__) def _update_function_db(function_id): - # NOTE(kong): Store function info in cache? - func_db = db_api.get_function(function_id) - runtime_db = func_db.runtime - if runtime_db and runtime_db.status != status.AVAILABLE: - raise exc.RuntimeNotAvailableException( - 'Runtime %s is not available.' % func_db.runtime_id - ) + with db_api.transaction(): + # NOTE(kong): Store function info in cache? + func_db = db_api.get_function(function_id) + runtime_db = func_db.runtime + if runtime_db and runtime_db.status != status.AVAILABLE: + raise exc.RuntimeNotAvailableException( + 'Runtime %s is not available.' % func_db.runtime_id + ) # Function update is done using UPDATE ... FROM ... WHERE # non-locking clause. @@ -59,6 +61,12 @@ def _update_function_db(function_id): def create_execution(engine_client, params): function_id = params['function_id'] is_sync = params.get('sync', True) + input = params.get('input') + if input: + try: + params['input'] = json.loads(input) + except ValueError: + params['input'] = {'__function_input': input} runtime_id = _update_function_db(function_id) diff --git a/qinling_tempest_plugin/functions/test_python_positional_args.py b/qinling_tempest_plugin/functions/test_python_positional_args.py new file mode 100644 index 00000000..b4b75fc5 --- /dev/null +++ b/qinling_tempest_plugin/functions/test_python_positional_args.py @@ -0,0 +1,17 @@ +# Copyright 2017 Catalyst IT Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def main(name, **kwargs): + return 'Hello, %s' % name diff --git a/qinling_tempest_plugin/tests/api/test_executions.py b/qinling_tempest_plugin/tests/api/test_executions.py index 43da2875..cf55323b 100644 --- a/qinling_tempest_plugin/tests/api/test_executions.py +++ b/qinling_tempest_plugin/tests/api/test_executions.py @@ -88,7 +88,7 @@ class ExecutionsTest(base.BaseQinlingTest): self._create_function() resp, body = self.client.create_execution(self.function_id, - input={'name': 'Qinling'}) + input='{"name": "Qinling"}') self.assertEqual(201, resp.status) @@ -117,8 +117,9 @@ class ExecutionsTest(base.BaseQinlingTest): """Admin user can get executions of other projects""" self._create_function() - resp, body = self.client.create_execution(self.function_id, - input={'name': 'Qinling'}) + resp, body = self.client.create_execution( + self.function_id, input='{"name": "Qinling"}' + ) self.assertEqual(201, resp.status) execution_id = body['id'] @@ -160,15 +161,14 @@ class ExecutionsTest(base.BaseQinlingTest): execution_id, ignore_notfound=True) self.assertEqual('running', body['status']) - self.await_execution_success(execution_id) @decorators.idempotent_id('6cb47b1d-a8c6-48f2-a92f-c4f613c33d1c') def test_execution_log(self): self._create_function() - - resp, body = self.client.create_execution(self.function_id, - input={'name': 'OpenStack'}) + resp, body = self.client.create_execution( + self.function_id, input='{"name": "OpenStack"}' + ) self.assertEqual(201, resp.status) self.addCleanup(self.client.delete_resource, 'executions', @@ -237,6 +237,20 @@ class ExecutionsTest(base.BaseQinlingTest): self.assertEqual(200, resp.status) self.assertEqual(2, len(body['workers'])) + @decorators.idempotent_id('ccfe67ce-e467-11e7-916c-00224d6b7bc1') + def test_python_execution_positional_args(self): + self._create_function(name='test_python_positional_args.py') + resp, body = self.client.create_execution(self.function_id, + input='Qinling') + + self.assertEqual(201, resp.status) + self.addCleanup(self.client.delete_resource, 'executions', + body['id'], ignore_notfound=True) + self.assertEqual('success', body['status']) + + result = jsonutils.loads(body['result']) + self.assertIn('Qinling', result['output']) + @decorators.idempotent_id('a948382a-84af-4f0e-ad08-4297345e302c') def test_python_execution_file_limit(self): self._create_function(name='test_python_file_limit.py') @@ -248,9 +262,9 @@ class ExecutionsTest(base.BaseQinlingTest): body['id'], ignore_notfound=True) self.assertEqual('failed', body['status']) - output = jsonutils.loads(body['output']) + result = jsonutils.loads(body['result']) self.assertIn( - 'Too many open files', output['output'] + 'Too many open files', result['output'] ) @decorators.idempotent_id('bf6f8f35-fa88-469b-8878-7aa85a8ce5ab') @@ -264,7 +278,20 @@ class ExecutionsTest(base.BaseQinlingTest): body['id'], ignore_notfound=True) self.assertEqual('failed', body['status']) - output = jsonutils.loads(body['output']) + result = jsonutils.loads(body['result']) self.assertIn( - 'too much resource consumption', output['output'] + 'too much resource consumption', result['output'] ) + + @decorators.idempotent_id('d0598868-e45d-11e7-9125-00224d6b7bc1') + def test_execution_image_function(self): + function_id = self.create_function(image=True) + resp, body = self.client.create_execution(function_id, + input='Qinling') + + self.assertEqual(201, resp.status) + execution_id = body['id'] + self.addCleanup(self.client.delete_resource, 'executions', + execution_id, ignore_notfound=True) + self.assertEqual('success', body['status']) + self.assertIn('Qinling', jsonutils.loads(body['result'])['output']) diff --git a/qinling_tempest_plugin/tests/api/test_functions.py b/qinling_tempest_plugin/tests/api/test_functions.py index 8045712b..ddd7efdb 100644 --- a/qinling_tempest_plugin/tests/api/test_functions.py +++ b/qinling_tempest_plugin/tests/api/test_functions.py @@ -152,8 +152,9 @@ class FunctionsTest(base.BaseQinlingTest): def test_detach(self): """Admin only operation.""" function_id = self.create_function(self.python_zip_file) - resp, _ = self.client.create_execution(function_id, - input={'name': 'Qinling'}) + resp, _ = self.client.create_execution( + function_id, input='{"name": "Qinling"}' + ) self.assertEqual(201, resp.status) resp, body = self.admin_client.get_function_workers(function_id) diff --git a/qinling_tempest_plugin/tests/base.py b/qinling_tempest_plugin/tests/base.py index 6a9fcc7b..5b9c04dc 100644 --- a/qinling_tempest_plugin/tests/base.py +++ b/qinling_tempest_plugin/tests/base.py @@ -68,24 +68,31 @@ class BaseQinlingTest(test.BaseTestCase): self.assertEqual(200, resp.status) self.assertEqual('success', body['status']) - def create_function(self, package_path): + def create_function(self, package_path=None, image=False): function_name = data_utils.rand_name('function', prefix=self.name_prefix) - base_name, _ = os.path.splitext(package_path) - module_name = os.path.basename(base_name) - with open(package_path, 'rb') as package_data: + if not image: + base_name, _ = os.path.splitext(package_path) + module_name = os.path.basename(base_name) + with open(package_path, 'rb') as package_data: + resp, body = self.client.create_function( + {"source": "package"}, + self.runtime_id, + name=function_name, + package_data=package_data, + entry='%s.main' % module_name + ) + self.addCleanup(os.remove, package_path) + else: resp, body = self.client.create_function( - {"source": "package"}, - self.runtime_id, + {"source": "image", "image": "openstackqinling/alpine-test"}, + None, name=function_name, - package_data=package_data, - entry='%s.main' % module_name ) self.assertEqual(201, resp.status_code) function_id = body['id'] - self.addCleanup(os.remove, package_path) self.addCleanup(self.client.delete_resource, 'functions', function_id, ignore_notfound=True) diff --git a/qinling_tempest_plugin/tests/scenario/test_basic_ops.py b/qinling_tempest_plugin/tests/scenario/test_basic_ops.py index 2adbb5ea..867da7ad 100644 --- a/qinling_tempest_plugin/tests/scenario/test_basic_ops.py +++ b/qinling_tempest_plugin/tests/scenario/test_basic_ops.py @@ -93,8 +93,9 @@ class BasicOpsTest(base.BaseQinlingTest): function_id, ignore_notfound=True) # Invoke function - resp, body = self.client.create_execution(function_id, - input={'name': 'Qinling'}) + resp, body = self.client.create_execution( + function_id, input='{"name": "Qinling"}' + ) self.assertEqual(201, resp.status) self.assertEqual('success', body['status']) diff --git a/runtimes/python2/server.py b/runtimes/python2/server.py index 8480777f..27bb7824 100644 --- a/runtimes/python2/server.py +++ b/runtimes/python2/server.py @@ -53,7 +53,7 @@ def _print_trace(): print(''.join(line for line in lines)) -def _invoke_function(execution_id, zip_file, module_name, method, input, +def _invoke_function(execution_id, zip_file, module_name, method, arg, input, return_dict): """Thie function is supposed to be running in a child process.""" sys.path.insert(0, zip_file) @@ -64,7 +64,7 @@ def _invoke_function(execution_id, zip_file, module_name, method, input, try: module = importlib.import_module(module_name) func = getattr(module, method) - return_dict['result'] = func(**input) + return_dict['result'] = func(arg, **input) if arg else func(**input) return_dict['success'] = True except Exception as e: _print_trace() @@ -180,7 +180,7 @@ def execute(): p = Process( target=_invoke_function, args=(execution_id, zip_file, function_module, function_method, - input, return_dict) + input.pop('__function_input', None), input, return_dict) ) p.start() p.join() @@ -224,4 +224,4 @@ setup_logger(logging.DEBUG) app.logger.info("Starting server") # Just for testing purpose -app.run(host='0.0.0.0', port='9090', threaded=True) +app.run(host='0.0.0.0', port=9090, threaded=True) diff --git a/tools/clear_resources.sh b/tools/clear_resources.sh index 6ec09754..0db81143 100644 --- a/tools/clear_resources.sh +++ b/tools/clear_resources.sh @@ -15,7 +15,7 @@ function delete_resources(){ ids=$(openstack function execution list -f yaml -c Id | awk '{print $3}') for id in $ids do - openstack function execution delete $id + openstack function execution delete --execution $id done # Delete functions