Raise specific exceptions instead of tenacity.RetryError

Story: 2003032
Task: 23060
Change-Id: I1b3222c3395ea8e5df46ab7455d919f3d0e606ea
This commit is contained in:
Hunt Xu 2018-07-18 18:19:42 +08:00
parent 69a61e5b55
commit f76ffc91d6
11 changed files with 143 additions and 18 deletions

View File

@ -56,12 +56,16 @@ class FunctionVersionsController(rest.RestController):
@tenacity.retry( @tenacity.retry(
wait=tenacity.wait_fixed(1), wait=tenacity.wait_fixed(1),
stop=tenacity.stop_after_attempt(30), stop=tenacity.stop_after_attempt(30),
retry=(tenacity.retry_if_result(lambda result: result is False)) reraise=True,
retry=tenacity.retry_if_exception_type(exc.EtcdLockException)
) )
def _create_function_version(self, project_id, function_id, **kwargs): def _create_function_version(self, project_id, function_id, **kwargs):
with etcd_util.get_function_version_lock(function_id) as lock: with etcd_util.get_function_version_lock(function_id) as lock:
if not lock.is_acquired(): if not lock.is_acquired():
return False raise exc.EtcdLockException(
"Etcd: failed to acquire version lock for function %s." %
function_id
)
with db_api.transaction(): with db_api.transaction():
# Get latest function package md5 and version number # Get latest function package md5 and version number
@ -132,8 +136,15 @@ class FunctionVersionsController(rest.RestController):
} }
# Try to create a new function version within lock and db transaction # Try to create a new function version within lock and db transaction
version = self._create_function_version(ctx.project_id, function_id, try:
**values) version = self._create_function_version(
ctx.project_id, function_id, **values
)
except exc.EtcdLockException as e:
LOG.exception(str(e))
# Reraise a generic exception as the end users should not know
# the underlying details.
raise exc.QinlingException('Internal server error.')
return resources.FunctionVersion.from_db_obj(version) return resources.FunctionVersion.from_db_obj(version)

View File

@ -92,7 +92,8 @@ class DefaultEngine(object):
@tenacity.retry( @tenacity.retry(
wait=tenacity.wait_fixed(1), wait=tenacity.wait_fixed(1),
stop=tenacity.stop_after_attempt(30), stop=tenacity.stop_after_attempt(30),
retry=(tenacity.retry_if_result(lambda result: result is False)) reraise=True,
retry=tenacity.retry_if_exception_type(exc.EtcdLockException)
) )
def function_load_check(self, function_id, version, runtime_id): def function_load_check(self, function_id, version, runtime_id):
"""Check function load and scale the workers if needed. """Check function load and scale the workers if needed.
@ -101,7 +102,10 @@ class DefaultEngine(object):
""" """
with etcd_util.get_worker_lock(function_id, version) as lock: with etcd_util.get_worker_lock(function_id, version) as lock:
if not lock.is_acquired(): if not lock.is_acquired():
return False raise exc.EtcdLockException(
'Etcd: failed to get worker lock for function %s'
'(version %s).' % (function_id, version)
)
workers = etcd_util.get_workers(function_id, version) workers = etcd_util.get_workers(function_id, version)
running_execs = db_api.get_executions( running_execs = db_api.get_executions(
@ -149,7 +153,10 @@ class DefaultEngine(object):
svc_url = self.function_load_check(function_id, svc_url = self.function_load_check(function_id,
function_version, function_version,
runtime_id) runtime_id)
except exc.OrchestratorException as e: except (
exc.OrchestratorException,
exc.EtcdLockException
) as e:
utils.handle_execution_exception(execution_id, str(e)) utils.handle_execution_exception(execution_id, str(e))
return return

View File

@ -38,6 +38,7 @@ def url_request(request_session, url, body=None):
r = tenacity.Retrying( r = tenacity.Retrying(
wait=tenacity.wait_fixed(1), wait=tenacity.wait_fixed(1),
stop=tenacity.stop_after_attempt(30), stop=tenacity.stop_after_attempt(30),
reraise=True,
retry=tenacity.retry_if_exception_type(IOError) retry=tenacity.retry_if_exception_type(IOError)
) )
r.call(request_session.get, ping_url, timeout=(3, 3), verify=False) r.call(request_session.get, ping_url, timeout=(3, 3), verify=False)
@ -150,9 +151,10 @@ def finish_execution(execution_id, success, res, is_image_source=False):
def handle_execution_exception(execution_id, exc_str): def handle_execution_exception(execution_id, exc_str):
LOG.error( # This method should be called from an exception handler
LOG.exception(
'Error running execution %s: %s', execution_id, exc_str 'Error running execution %s: %s', execution_id, exc_str
) )
db_set_execution_status( db_set_execution_status(
execution_id, status.ERROR, '', {'EngineError': exc_str} execution_id, status.ERROR, '', {'error': 'Function execution failed.'}
) )

View File

@ -116,3 +116,8 @@ class TrustFailedException(QinlingException):
class SwiftException(QinlingException): class SwiftException(QinlingException):
http_code = 500 http_code = 500
message = "Failed to communicate with Swift." message = "Failed to communicate with Swift."
class EtcdLockException(QinlingException):
http_code = 409
message = 'Etcd lock failed'

View File

@ -90,7 +90,8 @@ class KubernetesManager(base.OrchestratorBase):
@tenacity.retry( @tenacity.retry(
wait=tenacity.wait_fixed(2), wait=tenacity.wait_fixed(2),
stop=tenacity.stop_after_delay(600), stop=tenacity.stop_after_delay(600),
retry=tenacity.retry_if_result(lambda result: not result) reraise=True,
retry=tenacity.retry_if_exception_type(exc.OrchestratorException)
) )
def _wait_deployment_available(self, name): def _wait_deployment_available(self, name):
ret = self.v1extension.read_namespaced_deployment( ret = self.v1extension.read_namespaced_deployment(
@ -98,10 +99,11 @@ class KubernetesManager(base.OrchestratorBase):
self.conf.kubernetes.namespace self.conf.kubernetes.namespace
) )
if not ret.status.replicas: if (
return False not ret.status.replicas or
ret.status.replicas != ret.status.available_replicas
return ret.status.replicas == ret.status.available_replicas ):
raise exc.OrchestratorException('Deployment %s not ready.' % name)
def get_pool(self, name): def get_pool(self, name):
total = 0 total = 0

View File

@ -101,6 +101,19 @@ class TestFunctionVersionController(base.APITest):
self.assertEqual(403, resp.status_int) self.assertEqual(403, resp.status_int)
@mock.patch('qinling.utils.etcd_util.get_function_version_lock')
def test_post_etcd_lock_failed(self, mock_etcd_lock):
lock = mock.Mock()
mock_etcd_lock.return_value.__enter__.return_value = lock
lock.is_acquired.return_value = False
body = {'description': 'new version'}
resp = self.app.post_json('/v1/functions/%s/versions' % self.func_id,
body, expect_errors=True)
self.assertEqual(500, resp.status_int)
self.assertEqual("Internal server error.", resp.json['faultstring'])
def test_get_all(self): def test_get_all(self):
db_api.increase_function_version(self.func_id, 0, db_api.increase_function_version(self.func_id, 0,
description="version 1") description="version 1")

View File

@ -198,6 +198,25 @@ class TestDefaultEngine(base.DbTestCase):
self.assertEqual(3, lock.is_acquired.call_count) self.assertEqual(3, lock.is_acquired.call_count)
mock_getworkers.assert_called_once_with(function_id, 0) mock_getworkers.assert_called_once_with(function_id, 0)
@mock.patch('qinling.utils.etcd_util.get_worker_lock')
def test_function_load_check_failed_to_get_worker_lock(self, mock_getlock):
function = self.create_function()
function_id = function.id
runtime_id = function.runtime_id
function_version = 0
lock = mock.Mock()
# Lock is never acquired.
lock.is_acquired.return_value = False
mock_getlock.return_value.__enter__.return_value = lock
self.assertRaisesRegex(
exc.EtcdLockException,
"^Etcd: failed to get worker lock for function %s"
"\(version %s\)\.$" % (function_id, function_version),
self.default_engine.function_load_check,
function_id, function_version, runtime_id
)
@mock.patch('qinling.utils.etcd_util.get_service_url') @mock.patch('qinling.utils.etcd_util.get_service_url')
def test_create_execution_image_type_function(self, mock_svc_url): def test_create_execution_image_type_function(self, mock_svc_url):
"""Create 2 executions for an image type function.""" """Create 2 executions for an image type function."""
@ -327,7 +346,7 @@ class TestDefaultEngine(base.DbTestCase):
self.assertEqual(execution.status, status.ERROR) self.assertEqual(execution.status, status.ERROR)
self.assertEqual(execution.logs, '') self.assertEqual(execution.logs, '')
self.assertEqual(execution.result, self.assertEqual(execution.result,
{'EngineError': 'Exception in prepare_execution'}) {'error': 'Function execution failed.'})
@mock.patch('qinling.utils.etcd_util.get_service_url') @mock.patch('qinling.utils.etcd_util.get_service_url')
def test_create_execution_package_type_function( def test_create_execution_package_type_function(
@ -389,7 +408,7 @@ class TestDefaultEngine(base.DbTestCase):
self.assertEqual(execution.status, status.ERROR) self.assertEqual(execution.status, status.ERROR)
self.assertEqual(execution.logs, '') self.assertEqual(execution.logs, '')
self.assertEqual(execution.result, self.assertEqual(execution.result,
{'EngineError': 'Exception in scaleup_function'}) {'error': 'Function execution failed.'})
@mock.patch('qinling.engine.utils.get_request_data') @mock.patch('qinling.engine.utils.get_request_data')
@mock.patch('qinling.engine.utils.url_request') @mock.patch('qinling.engine.utils.url_request')

View File

@ -188,8 +188,11 @@ class TestKubernetesManager(base.DbTestCase):
fake_deployment_name = self.rand_name('deployment', prefix=self.prefix) fake_deployment_name = self.rand_name('deployment', prefix=self.prefix)
fake_image = self.rand_name('image', prefix=self.prefix) fake_image = self.rand_name('image', prefix=self.prefix)
self.manager.create_pool(fake_deployment_name, fake_image) self.assertRaisesRegex(
exc.OrchestratorException,
"^Deployment %s not ready\.$" % fake_deployment_name,
self.manager.create_pool,
fake_deployment_name, fake_image)
self.assertLess( self.assertLess(
200, # Default timeout is 600s with wait interval set to 2s. 200, # Default timeout is 600s with wait interval set to 2s.
self.k8s_v1_ext.read_namespaced_deployment.call_count) self.k8s_v1_ext.read_namespaced_deployment.call_count)

View File

@ -45,4 +45,10 @@ QinlingGroup = [
cfg.StrOpt("nodejs_runtime_image", cfg.StrOpt("nodejs_runtime_image",
default="openstackqinling/nodejs-runtime:0.0.1", default="openstackqinling/nodejs-runtime:0.0.1",
help="The NodeJS runtime being used in the tests."), help="The NodeJS runtime being used in the tests."),
cfg.StrOpt('etcd_host',
default='127.0.0.1',
help='Etcd service host address.'),
cfg.PortOpt('etcd_port',
default=2379,
help='Etcd service port.'),
] ]

View File

@ -11,16 +11,20 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from concurrent import futures from concurrent import futures
import etcd3gw
import json import json
import futurist import futurist
from oslo_serialization import jsonutils from oslo_serialization import jsonutils
from tempest import config
from tempest.lib import decorators from tempest.lib import decorators
from tempest.lib import exceptions from tempest.lib import exceptions
from qinling_tempest_plugin.tests import base from qinling_tempest_plugin.tests import base
CONF = config.CONF
INVOKE_ERROR = "Function execution failed because of too much resource " \ INVOKE_ERROR = "Function execution failed because of too much resource " \
"consumption" "consumption"
@ -64,6 +68,32 @@ class ExecutionsTest(base.BaseQinlingTest):
resp = self.client.delete_resource('executions', execution_id_2) resp = self.client.delete_resource('executions', execution_id_2)
self.assertEqual(204, resp.status) self.assertEqual(204, resp.status)
@decorators.idempotent_id('6a388918-86eb-4e10-88e2-0032a7df38e9')
def test_create_execution_worker_lock_failed(self):
"""test_create_execution_worker_lock_failed
When creating an execution, the qinling-engine will check the load
and try to scaleup the function if needed. A lock is required when
doing this check.
In this test we acquire the lock manually, so that qinling will fail
to acquire the lock.
"""
function_id = self.create_function()
etcd3_client = etcd3gw.client(host=CONF.qinling.etcd_host,
port=CONF.qinling.etcd_port)
lock_id = "function_worker_%s_%s" % (function_id, 0)
with etcd3_client.lock(id=lock_id):
resp, body = self.client.create_execution(
function_id, input='{"name": "Qinling"}'
)
self.assertEqual(201, resp.status)
self.assertEqual('error', body['status'])
result = jsonutils.loads(body['result'])
self.assertEqual('Function execution failed.', result['error'])
@decorators.idempotent_id('2199d1e6-de7d-4345-8745-a8184d6022b1') @decorators.idempotent_id('2199d1e6-de7d-4345-8745-a8184d6022b1')
def test_get_all_admin(self): def test_get_all_admin(self):
"""Admin user can get executions of other projects""" """Admin user can get executions of other projects"""

View File

@ -11,12 +11,17 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import etcd3gw
from tempest import config
from tempest.lib import decorators from tempest.lib import decorators
from tempest.lib import exceptions from tempest.lib import exceptions
import tenacity import tenacity
from qinling_tempest_plugin.tests import base from qinling_tempest_plugin.tests import base
CONF = config.CONF
class FunctionVersionsTest(base.BaseQinlingTest): class FunctionVersionsTest(base.BaseQinlingTest):
name_prefix = 'FunctionVersionsTest' name_prefix = 'FunctionVersionsTest'
@ -87,6 +92,28 @@ class FunctionVersionsTest(base.BaseQinlingTest):
function_id function_id
) )
@decorators.idempotent_id('78dc5552-fcb8-4b27-86f7-5f3d96143934')
def test_create_version_lock_failed(self):
"""test_create_version_lock_failed
Creating a function requires a lock. If qinling failed to acquire the
lock then an error would be returned after some retries.
In this test we acquire the lock manually, so that qinling will fail
to acquire the lock.
"""
function_id = self.create_function()
etcd3_client = etcd3gw.client(host=CONF.qinling.etcd_host,
port=CONF.qinling.etcd_port)
lock_id = "function_version_%s" % function_id
with etcd3_client.lock(id=lock_id):
self.assertRaises(
exceptions.ServerFault,
self.client.create_function_version,
function_id
)
@decorators.idempotent_id('43c06f41-d116-43a7-a61c-115f7591b22e') @decorators.idempotent_id('43c06f41-d116-43a7-a61c-115f7591b22e')
def test_get_by_admin(self): def test_get_by_admin(self):
"""Admin user can get normal user's function version.""" """Admin user can get normal user's function version."""