diff --git a/qinling/api/controllers/v1/function_version.py b/qinling/api/controllers/v1/function_version.py index c8db3149..25847d28 100644 --- a/qinling/api/controllers/v1/function_version.py +++ b/qinling/api/controllers/v1/function_version.py @@ -56,12 +56,16 @@ class FunctionVersionsController(rest.RestController): @tenacity.retry( wait=tenacity.wait_fixed(1), stop=tenacity.stop_after_attempt(30), - retry=(tenacity.retry_if_result(lambda result: result is False)) + reraise=True, + retry=tenacity.retry_if_exception_type(exc.EtcdLockException) ) def _create_function_version(self, project_id, function_id, **kwargs): with etcd_util.get_function_version_lock(function_id) as lock: if not lock.is_acquired(): - return False + raise exc.EtcdLockException( + "Etcd: failed to acquire version lock for function %s." % + function_id + ) with db_api.transaction(): # Get latest function package md5 and version number @@ -132,8 +136,15 @@ class FunctionVersionsController(rest.RestController): } # Try to create a new function version within lock and db transaction - version = self._create_function_version(ctx.project_id, function_id, - **values) + try: + version = self._create_function_version( + ctx.project_id, function_id, **values + ) + except exc.EtcdLockException as e: + LOG.exception(str(e)) + # Reraise a generic exception as the end users should not know + # the underlying details. + raise exc.QinlingException('Internal server error.') return resources.FunctionVersion.from_db_obj(version) diff --git a/qinling/engine/default_engine.py b/qinling/engine/default_engine.py index bb8c81dc..f9da3365 100644 --- a/qinling/engine/default_engine.py +++ b/qinling/engine/default_engine.py @@ -92,7 +92,8 @@ class DefaultEngine(object): @tenacity.retry( wait=tenacity.wait_fixed(1), stop=tenacity.stop_after_attempt(30), - retry=(tenacity.retry_if_result(lambda result: result is False)) + reraise=True, + retry=tenacity.retry_if_exception_type(exc.EtcdLockException) ) def function_load_check(self, function_id, version, runtime_id): """Check function load and scale the workers if needed. @@ -101,7 +102,10 @@ class DefaultEngine(object): """ with etcd_util.get_worker_lock(function_id, version) as lock: if not lock.is_acquired(): - return False + raise exc.EtcdLockException( + 'Etcd: failed to get worker lock for function %s' + '(version %s).' % (function_id, version) + ) workers = etcd_util.get_workers(function_id, version) running_execs = db_api.get_executions( @@ -149,7 +153,10 @@ class DefaultEngine(object): svc_url = self.function_load_check(function_id, function_version, runtime_id) - except exc.OrchestratorException as e: + except ( + exc.OrchestratorException, + exc.EtcdLockException + ) as e: utils.handle_execution_exception(execution_id, str(e)) return diff --git a/qinling/engine/utils.py b/qinling/engine/utils.py index 7db0fb3b..1a6de7f4 100644 --- a/qinling/engine/utils.py +++ b/qinling/engine/utils.py @@ -38,6 +38,7 @@ def url_request(request_session, url, body=None): r = tenacity.Retrying( wait=tenacity.wait_fixed(1), stop=tenacity.stop_after_attempt(30), + reraise=True, retry=tenacity.retry_if_exception_type(IOError) ) r.call(request_session.get, ping_url, timeout=(3, 3), verify=False) @@ -150,9 +151,10 @@ def finish_execution(execution_id, success, res, is_image_source=False): def handle_execution_exception(execution_id, exc_str): - LOG.error( + # This method should be called from an exception handler + LOG.exception( 'Error running execution %s: %s', execution_id, exc_str ) db_set_execution_status( - execution_id, status.ERROR, '', {'EngineError': exc_str} + execution_id, status.ERROR, '', {'error': 'Function execution failed.'} ) diff --git a/qinling/exceptions.py b/qinling/exceptions.py index 15a31117..e8db68b9 100644 --- a/qinling/exceptions.py +++ b/qinling/exceptions.py @@ -116,3 +116,8 @@ class TrustFailedException(QinlingException): class SwiftException(QinlingException): http_code = 500 message = "Failed to communicate with Swift." + + +class EtcdLockException(QinlingException): + http_code = 409 + message = 'Etcd lock failed' diff --git a/qinling/orchestrator/kubernetes/manager.py b/qinling/orchestrator/kubernetes/manager.py index fe3900c4..ff74f54b 100644 --- a/qinling/orchestrator/kubernetes/manager.py +++ b/qinling/orchestrator/kubernetes/manager.py @@ -90,7 +90,8 @@ class KubernetesManager(base.OrchestratorBase): @tenacity.retry( wait=tenacity.wait_fixed(2), stop=tenacity.stop_after_delay(600), - retry=tenacity.retry_if_result(lambda result: not result) + reraise=True, + retry=tenacity.retry_if_exception_type(exc.OrchestratorException) ) def _wait_deployment_available(self, name): ret = self.v1extension.read_namespaced_deployment( @@ -98,10 +99,11 @@ class KubernetesManager(base.OrchestratorBase): self.conf.kubernetes.namespace ) - if not ret.status.replicas: - return False - - return ret.status.replicas == ret.status.available_replicas + if ( + not ret.status.replicas or + ret.status.replicas != ret.status.available_replicas + ): + raise exc.OrchestratorException('Deployment %s not ready.' % name) def get_pool(self, name): total = 0 diff --git a/qinling/tests/unit/api/controllers/v1/test_function_version.py b/qinling/tests/unit/api/controllers/v1/test_function_version.py index 62f27d86..048920f9 100644 --- a/qinling/tests/unit/api/controllers/v1/test_function_version.py +++ b/qinling/tests/unit/api/controllers/v1/test_function_version.py @@ -101,6 +101,19 @@ class TestFunctionVersionController(base.APITest): self.assertEqual(403, resp.status_int) + @mock.patch('qinling.utils.etcd_util.get_function_version_lock') + def test_post_etcd_lock_failed(self, mock_etcd_lock): + lock = mock.Mock() + mock_etcd_lock.return_value.__enter__.return_value = lock + lock.is_acquired.return_value = False + + body = {'description': 'new version'} + resp = self.app.post_json('/v1/functions/%s/versions' % self.func_id, + body, expect_errors=True) + + self.assertEqual(500, resp.status_int) + self.assertEqual("Internal server error.", resp.json['faultstring']) + def test_get_all(self): db_api.increase_function_version(self.func_id, 0, description="version 1") diff --git a/qinling/tests/unit/engine/test_default_engine.py b/qinling/tests/unit/engine/test_default_engine.py index f730b9a0..3e6feb27 100644 --- a/qinling/tests/unit/engine/test_default_engine.py +++ b/qinling/tests/unit/engine/test_default_engine.py @@ -198,6 +198,25 @@ class TestDefaultEngine(base.DbTestCase): self.assertEqual(3, lock.is_acquired.call_count) mock_getworkers.assert_called_once_with(function_id, 0) + @mock.patch('qinling.utils.etcd_util.get_worker_lock') + def test_function_load_check_failed_to_get_worker_lock(self, mock_getlock): + function = self.create_function() + function_id = function.id + runtime_id = function.runtime_id + function_version = 0 + lock = mock.Mock() + # Lock is never acquired. + lock.is_acquired.return_value = False + mock_getlock.return_value.__enter__.return_value = lock + + self.assertRaisesRegex( + exc.EtcdLockException, + "^Etcd: failed to get worker lock for function %s" + "\(version %s\)\.$" % (function_id, function_version), + self.default_engine.function_load_check, + function_id, function_version, runtime_id + ) + @mock.patch('qinling.utils.etcd_util.get_service_url') def test_create_execution_image_type_function(self, mock_svc_url): """Create 2 executions for an image type function.""" @@ -327,7 +346,7 @@ class TestDefaultEngine(base.DbTestCase): self.assertEqual(execution.status, status.ERROR) self.assertEqual(execution.logs, '') self.assertEqual(execution.result, - {'EngineError': 'Exception in prepare_execution'}) + {'error': 'Function execution failed.'}) @mock.patch('qinling.utils.etcd_util.get_service_url') def test_create_execution_package_type_function( @@ -389,7 +408,7 @@ class TestDefaultEngine(base.DbTestCase): self.assertEqual(execution.status, status.ERROR) self.assertEqual(execution.logs, '') self.assertEqual(execution.result, - {'EngineError': 'Exception in scaleup_function'}) + {'error': 'Function execution failed.'}) @mock.patch('qinling.engine.utils.get_request_data') @mock.patch('qinling.engine.utils.url_request') diff --git a/qinling/tests/unit/orchestrator/kubernetes/test_manager.py b/qinling/tests/unit/orchestrator/kubernetes/test_manager.py index 25d7f164..4bca6e72 100644 --- a/qinling/tests/unit/orchestrator/kubernetes/test_manager.py +++ b/qinling/tests/unit/orchestrator/kubernetes/test_manager.py @@ -188,8 +188,11 @@ class TestKubernetesManager(base.DbTestCase): fake_deployment_name = self.rand_name('deployment', prefix=self.prefix) fake_image = self.rand_name('image', prefix=self.prefix) - self.manager.create_pool(fake_deployment_name, fake_image) - + self.assertRaisesRegex( + exc.OrchestratorException, + "^Deployment %s not ready\.$" % fake_deployment_name, + self.manager.create_pool, + fake_deployment_name, fake_image) self.assertLess( 200, # Default timeout is 600s with wait interval set to 2s. self.k8s_v1_ext.read_namespaced_deployment.call_count) diff --git a/qinling_tempest_plugin/config.py b/qinling_tempest_plugin/config.py index 8e7dac16..a6d3edb5 100644 --- a/qinling_tempest_plugin/config.py +++ b/qinling_tempest_plugin/config.py @@ -45,4 +45,10 @@ QinlingGroup = [ cfg.StrOpt("nodejs_runtime_image", default="openstackqinling/nodejs-runtime:0.0.1", help="The NodeJS runtime being used in the tests."), + cfg.StrOpt('etcd_host', + default='127.0.0.1', + help='Etcd service host address.'), + cfg.PortOpt('etcd_port', + default=2379, + help='Etcd service port.'), ] diff --git a/qinling_tempest_plugin/tests/api/test_executions.py b/qinling_tempest_plugin/tests/api/test_executions.py index 174df339..0879bc85 100644 --- a/qinling_tempest_plugin/tests/api/test_executions.py +++ b/qinling_tempest_plugin/tests/api/test_executions.py @@ -11,16 +11,20 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + from concurrent import futures +import etcd3gw import json import futurist from oslo_serialization import jsonutils +from tempest import config from tempest.lib import decorators from tempest.lib import exceptions from qinling_tempest_plugin.tests import base +CONF = config.CONF INVOKE_ERROR = "Function execution failed because of too much resource " \ "consumption" @@ -64,6 +68,32 @@ class ExecutionsTest(base.BaseQinlingTest): resp = self.client.delete_resource('executions', execution_id_2) self.assertEqual(204, resp.status) + @decorators.idempotent_id('6a388918-86eb-4e10-88e2-0032a7df38e9') + def test_create_execution_worker_lock_failed(self): + """test_create_execution_worker_lock_failed + + When creating an execution, the qinling-engine will check the load + and try to scaleup the function if needed. A lock is required when + doing this check. + + In this test we acquire the lock manually, so that qinling will fail + to acquire the lock. + """ + function_id = self.create_function() + + etcd3_client = etcd3gw.client(host=CONF.qinling.etcd_host, + port=CONF.qinling.etcd_port) + lock_id = "function_worker_%s_%s" % (function_id, 0) + with etcd3_client.lock(id=lock_id): + resp, body = self.client.create_execution( + function_id, input='{"name": "Qinling"}' + ) + + self.assertEqual(201, resp.status) + self.assertEqual('error', body['status']) + result = jsonutils.loads(body['result']) + self.assertEqual('Function execution failed.', result['error']) + @decorators.idempotent_id('2199d1e6-de7d-4345-8745-a8184d6022b1') def test_get_all_admin(self): """Admin user can get executions of other projects""" diff --git a/qinling_tempest_plugin/tests/api/test_function_versions.py b/qinling_tempest_plugin/tests/api/test_function_versions.py index 7c84862b..210ca8e9 100644 --- a/qinling_tempest_plugin/tests/api/test_function_versions.py +++ b/qinling_tempest_plugin/tests/api/test_function_versions.py @@ -11,12 +11,17 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +import etcd3gw +from tempest import config from tempest.lib import decorators from tempest.lib import exceptions import tenacity from qinling_tempest_plugin.tests import base +CONF = config.CONF + class FunctionVersionsTest(base.BaseQinlingTest): name_prefix = 'FunctionVersionsTest' @@ -87,6 +92,28 @@ class FunctionVersionsTest(base.BaseQinlingTest): function_id ) + @decorators.idempotent_id('78dc5552-fcb8-4b27-86f7-5f3d96143934') + def test_create_version_lock_failed(self): + """test_create_version_lock_failed + + Creating a function requires a lock. If qinling failed to acquire the + lock then an error would be returned after some retries. + + In this test we acquire the lock manually, so that qinling will fail + to acquire the lock. + """ + function_id = self.create_function() + + etcd3_client = etcd3gw.client(host=CONF.qinling.etcd_host, + port=CONF.qinling.etcd_port) + lock_id = "function_version_%s" % function_id + with etcd3_client.lock(id=lock_id): + self.assertRaises( + exceptions.ServerFault, + self.client.create_function_version, + function_id + ) + @decorators.idempotent_id('43c06f41-d116-43a7-a61c-115f7591b22e') def test_get_by_admin(self): """Admin user can get normal user's function version."""