Expose exception config in nsxlib
The user will be able to specify an exception config object that defines which exceptions bring an endpoint down, and which exceptions trigger a retry. This change removes exception handling from the client class, which hopefully makes the code more readable and easier to follow. Change-Id: If4dd5c01e4bc83c9704347c2c7c8638c5ac1d72c
This commit is contained in:
parent
12f0edff2f
commit
b02092b252
|
@ -19,7 +19,6 @@ import unittest
|
|||
import mock
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_utils import uuidutils
|
||||
from requests import exceptions as requests_exceptions
|
||||
from requests import models
|
||||
|
||||
from vmware_nsxlib import v3
|
||||
|
@ -181,12 +180,6 @@ class MemoryMockAPIProvider(nsx_cluster.AbstractHTTPProvider):
|
|||
# all callers use the same backing
|
||||
return self._store
|
||||
|
||||
def is_connection_exception(self, exception):
|
||||
return isinstance(exception, requests_exceptions.ConnectionError)
|
||||
|
||||
def is_timeout_exception(self, exception):
|
||||
return isinstance(exception, requests_exceptions.Timeout)
|
||||
|
||||
|
||||
class NsxClientTestCase(NsxLibTestCase):
|
||||
|
||||
|
@ -396,12 +389,14 @@ class NsxClientTestCase(NsxLibTestCase):
|
|||
return client
|
||||
|
||||
def new_mocked_cluster(self, conf_managers, validate_conn_func,
|
||||
concurrent_connections=None):
|
||||
concurrent_connections=None, exceptions=None):
|
||||
mock_provider = mock.Mock()
|
||||
mock_provider.default_scheme = 'https'
|
||||
mock_provider.validate_connection = validate_conn_func
|
||||
|
||||
nsxlib_config = get_default_nsxlib_config()
|
||||
if exceptions:
|
||||
nsxlib_config.exception_config = exceptions
|
||||
if concurrent_connections:
|
||||
nsxlib_config.concurrent_connections = concurrent_connections
|
||||
nsxlib_config.http_provider = mock_provider
|
||||
|
|
|
@ -363,7 +363,7 @@ class NsxV3APIClientTestCase(nsxlib_testcase.NsxClientTestCase):
|
|||
def test_raise_error(self):
|
||||
api = self.new_mocked_client(client.NSX3Client)
|
||||
with self.assertRaises(nsxlib_exc.ManagerError) as e:
|
||||
api._raise_error(requests.codes.INTERNAL_SERVER_ERROR, 'GET', '')
|
||||
api._raise_error('GET', requests.codes.INTERNAL_SERVER_ERROR, '')
|
||||
self.assertEqual(e.exception.status_code,
|
||||
requests.codes.INTERNAL_SERVER_ERROR)
|
||||
|
||||
|
|
|
@ -16,16 +16,20 @@
|
|||
import unittest
|
||||
|
||||
import mock
|
||||
from requests import codes
|
||||
from requests import exceptions as requests_exceptions
|
||||
from requests import models
|
||||
import six.moves.urllib.parse as urlparse
|
||||
|
||||
from oslo_serialization import jsonutils
|
||||
|
||||
from vmware_nsxlib.tests.unit.v3 import mocks
|
||||
from vmware_nsxlib.tests.unit.v3 import nsxlib_testcase
|
||||
from vmware_nsxlib import v3
|
||||
from vmware_nsxlib.v3 import client
|
||||
from vmware_nsxlib.v3 import client_cert
|
||||
from vmware_nsxlib.v3 import cluster
|
||||
from vmware_nsxlib.v3 import config
|
||||
from vmware_nsxlib.v3 import exceptions as nsxlib_exc
|
||||
|
||||
|
||||
|
@ -37,6 +41,10 @@ def _validate_conn_down(*args, **kwargs):
|
|||
raise requests_exceptions.ConnectionError()
|
||||
|
||||
|
||||
def _validate_server_busy(*args, **kwargs):
    # Validation stub: unconditionally signals a busy backend server.
    busy = nsxlib_exc.ServerBusy(details="Test")
    raise busy
|
||||
|
||||
|
||||
def get_sess_create_resp():
|
||||
sess_create_response = models.Response()
|
||||
sess_create_response.status_code = 200
|
||||
|
@ -282,9 +290,10 @@ class NsxV3ClusteredAPIWithClientCertTestCase(NsxV3ClusteredAPITestCase):
|
|||
|
||||
class ClusteredAPITestCase(nsxlib_testcase.NsxClientTestCase):
|
||||
|
||||
def _test_health(self, validate_fn, expected_health):
|
||||
def _test_health(self, validate_fn, expected_health, exceptions=None):
|
||||
conf_managers = ['8.9.10.11', '9.10.11.12']
|
||||
api = self.new_mocked_cluster(conf_managers, validate_fn)
|
||||
api = self.new_mocked_cluster(conf_managers, validate_fn,
|
||||
exceptions=exceptions)
|
||||
|
||||
self.assertEqual(expected_health, api.health)
|
||||
|
||||
|
@ -312,6 +321,16 @@ class ClusteredAPITestCase(nsxlib_testcase.NsxClientTestCase):
|
|||
self.assertRaises(nsxlib_exc.ServiceClusterUnavailable,
|
||||
api.get, 'api/v1/transport-zones')
|
||||
|
||||
def test_cluster_validate_with_exception_retry(self):
|
||||
self._test_health(_validate_server_busy, cluster.ClusterHealth.GREEN)
|
||||
|
||||
def test_cluster_validate_with_exception_no_retry(self):
|
||||
exceptions = config.ExceptionConfig()
|
||||
exceptions.retriables = []
|
||||
self._test_health(_validate_server_busy,
|
||||
cluster.ClusterHealth.RED,
|
||||
exceptions)
|
||||
|
||||
def test_cluster_proxy_stale_revision(self):
|
||||
|
||||
def stale_revision():
|
||||
|
@ -322,6 +341,93 @@ class ClusteredAPITestCase(nsxlib_testcase.NsxClientTestCase):
|
|||
self.assertRaises(nsxlib_exc.StaleRevision,
|
||||
api.get, 'api/v1/transport-zones')
|
||||
|
||||
def test_cluster_down_and_retry_on_server_error(self):
|
||||
|
||||
def server_error():
|
||||
return mocks.MockRequestsResponse(
|
||||
codes.INTERNAL_SERVER_ERROR,
|
||||
jsonutils.dumps({'error_message': 'test', 'error_code': 98}))
|
||||
|
||||
def all_well():
|
||||
return mocks.MockRequestsResponse(
|
||||
codes.OK,
|
||||
jsonutils.dumps({'id': 'test'}))
|
||||
|
||||
conf_managers = ['8.9.10.11', '9.10.11.12', '10.11.12.13']
|
||||
exceptions = config.ExceptionConfig()
|
||||
exceptions.ground_triggers.append(nsxlib_exc.CannotConnectToServer)
|
||||
api = self.mock_nsx_clustered_api(
|
||||
nsx_api_managers=conf_managers,
|
||||
session_response=[server_error, all_well])
|
||||
api.nsxlib_config.exception_config = exceptions
|
||||
|
||||
api.get('api/v1/transport-zones')
|
||||
# first manager should go down, second one is confirmed as UP
|
||||
# after retry
|
||||
self.assertEqual(cluster.ClusterHealth.ORANGE, api.health)
|
||||
|
||||
def test_max_retry_attempts_on_server_error(self):
|
||||
|
||||
def server_error():
|
||||
return mocks.MockRequestsResponse(
|
||||
codes.INTERNAL_SERVER_ERROR,
|
||||
jsonutils.dumps({'error_message': 'test', 'error_code': 98}))
|
||||
|
||||
conf_managers = ['8.9.10.11', '9.10.11.12', '10.11.12.13']
|
||||
exceptions = config.ExceptionConfig()
|
||||
exceptions.ground_triggers.append(nsxlib_exc.CannotConnectToServer)
|
||||
max_attempts = 4
|
||||
api = self.mock_nsx_clustered_api(
|
||||
nsx_api_managers=conf_managers,
|
||||
max_attempts=max_attempts,
|
||||
session_response=[server_error for i in range(0, max_attempts)])
|
||||
api.nsxlib_config.cluster_unavailable_retry = False
|
||||
api.nsxlib_config.exception_config = exceptions
|
||||
|
||||
self.assertRaises(nsxlib_exc.ServiceClusterUnavailable,
|
||||
api.get, 'api/v1/transport-zones')
|
||||
self.assertEqual(cluster.ClusterHealth.RED, api.health)
|
||||
|
||||
def test_max_retry_attempts_on_retriable_error(self):
|
||||
|
||||
def server_error():
|
||||
return mocks.MockRequestsResponse(
|
||||
codes.INTERNAL_SERVER_ERROR,
|
||||
jsonutils.dumps({'error_message': 'test', 'error_code': 98}))
|
||||
|
||||
conf_managers = ['8.9.10.11', '9.10.11.12', '10.11.12.13']
|
||||
exceptions = config.ExceptionConfig()
|
||||
max_attempts = 2
|
||||
api = self.mock_nsx_clustered_api(
|
||||
nsx_api_managers=conf_managers,
|
||||
max_attempts=max_attempts,
|
||||
session_response=[server_error for i in range(0, max_attempts)])
|
||||
api.nsxlib_config.exception_config = exceptions
|
||||
|
||||
self.assertRaises(nsxlib_exc.CannotConnectToServer,
|
||||
api.get, 'api/v1/transport-zones')
|
||||
# This exception does not ground endpoint
|
||||
self.assertEqual(cluster.ClusterHealth.GREEN, api.health)
|
||||
|
||||
def test_non_retriable_error(self):
|
||||
|
||||
def server_error():
|
||||
return mocks.MockRequestsResponse(
|
||||
codes.INTERNAL_SERVER_ERROR,
|
||||
jsonutils.dumps({'error_message': 'test', 'error_code': 99}))
|
||||
|
||||
conf_managers = ['8.9.10.11', '9.10.11.12', '10.11.12.13']
|
||||
exceptions = config.ExceptionConfig()
|
||||
max_attempts = 2
|
||||
api = self.mock_nsx_clustered_api(
|
||||
nsx_api_managers=conf_managers,
|
||||
max_attempts=max_attempts,
|
||||
session_response=[server_error for i in range(0, max_attempts)])
|
||||
api.nsxlib_config.exception_config = exceptions
|
||||
|
||||
api.get('api/v1/transport-zones')
|
||||
self.assertEqual(cluster.ClusterHealth.GREEN, api.health)
|
||||
|
||||
def test_cluster_proxy_connection_establish_error(self):
|
||||
|
||||
def connect_timeout():
|
||||
|
|
|
@ -305,6 +305,43 @@ class TestNsxV3Utils(nsxlib_testcase.NsxClientTestCase):
|
|||
self.assertRaises(exceptions.NsxLibInvalidInput, func_to_fail, 99)
|
||||
self.assertEqual(max_retries, total_count['val'])
|
||||
|
||||
def test_retry_random_upon_exception_result_retry(self):
|
||||
total_count = {'val': 0}
|
||||
max_retries = 3
|
||||
|
||||
@utils.retry_random_upon_exception_result(max_retries)
|
||||
def func_to_fail():
|
||||
total_count['val'] = total_count['val'] + 1
|
||||
return exceptions.NsxLibInvalidInput(error_message='foo')
|
||||
|
||||
self.assertRaises(exceptions.NsxLibInvalidInput, func_to_fail)
|
||||
self.assertEqual(max_retries, total_count['val'])
|
||||
|
||||
def test_retry_random_upon_exception_result_no_retry(self):
|
||||
total_count = {'val': 0}
|
||||
|
||||
@utils.retry_random_upon_exception_result(3)
|
||||
def func_to_fail():
|
||||
total_count['val'] = total_count['val'] + 1
|
||||
raise exceptions.NsxLibInvalidInput(error_message='foo')
|
||||
|
||||
self.assertRaises(exceptions.NsxLibInvalidInput, func_to_fail)
|
||||
# should not retry since exception is raised, and not returned
|
||||
self.assertEqual(1, total_count['val'])
|
||||
|
||||
def test_retry_random_upon_exception_result_no_retry2(self):
|
||||
total_count = {'val': 0}
|
||||
ret_val = 42
|
||||
|
||||
@utils.retry_random_upon_exception_result(3)
|
||||
def func_to_fail():
|
||||
total_count['val'] = total_count['val'] + 1
|
||||
return ret_val
|
||||
|
||||
self.assertEqual(ret_val, func_to_fail())
|
||||
# should not retry since no exception is returned
|
||||
self.assertEqual(1, total_count['val'])
|
||||
|
||||
@mock.patch.object(utils, '_update_max_nsgroups_criteria_tags')
|
||||
@mock.patch.object(utils, '_update_max_tags')
|
||||
@mock.patch.object(utils, '_update_tag_length')
|
||||
|
|
|
@ -29,6 +29,42 @@ LOG = log.getLogger(__name__)
|
|||
NULL_CURSOR_PREFIX = '0000'
|
||||
|
||||
|
||||
def get_http_error_details(response):
    """Extract error information from an HTTP *response* object.

    Returns a dict with keys 'status_code', 'error_code',
    'related_error_codes' and 'details', suitable for passing as keyword
    arguments to nsxlib HTTP exception constructors.

    When the body is a JSON dict containing 'error_message', the message
    of every related error is appended to the details string and related
    error codes are collected; otherwise 'error_code' is None and
    'details' is the raw decoded body (or '' for an empty body).
    """
    msg = response.json() if response.content else ''
    error_code = None
    related_error_codes = []

    if isinstance(msg, dict) and 'error_message' in msg:
        error_code = msg.get('error_code')
        # Collect related messages and codes in a single pass instead of
        # iterating msg['related_errors'] twice.
        # NOTE(review): assumes each related error has an 'error_message'
        # key, as the original comprehension did.
        related_errors = []
        for error in msg.get('related_errors', []):
            related_errors.append(error['error_message'])
            if error.get('error_code'):
                related_error_codes.append(str(error['error_code']))
        msg = msg['error_message']
        if related_errors:
            msg += " relatedErrors: %s" % ' '.join(related_errors)

    return {'status_code': response.status_code,
            'error_code': error_code,
            'related_error_codes': related_error_codes,
            'details': msg}
|
||||
|
||||
|
||||
def init_http_exception_from_response(response):
    """Build an nsxlib HTTP exception instance from *response*.

    Returns None when there is no response, or when the response body
    carries no NSX error code (i.e. the error is not relevant for
    endpoint-level decisions).
    """
    if not response:
        return None

    details = get_http_error_details(response)
    if not details['error_code']:
        return None

    exc_cls = http_error_to_exception(details['status_code'],
                                      details['error_code'])
    return exc_cls(manager='', **details)
|
||||
|
||||
|
||||
def http_error_to_exception(status_code, error_code):
|
||||
errors = {
|
||||
requests.codes.NOT_FOUND:
|
||||
|
@ -99,7 +135,7 @@ class RESTClient(object):
|
|||
def list(self, resource='', headers=None, silent=False):
|
||||
return self.url_list(resource, headers=headers, silent=silent)
|
||||
|
||||
def get(self, uuid, headers=None, silent=False, with_retries=True):
|
||||
def get(self, uuid, headers=None, silent=False, with_retries=False):
|
||||
return self.url_get(uuid, headers=headers, silent=silent,
|
||||
with_retries=with_retries)
|
||||
|
||||
|
@ -132,7 +168,7 @@ class RESTClient(object):
|
|||
cursor = page.get('cursor', NULL_CURSOR_PREFIX)
|
||||
return concatenate_response
|
||||
|
||||
def url_get(self, url, headers=None, silent=False, with_retries=True):
|
||||
def url_get(self, url, headers=None, silent=False, with_retries=False):
|
||||
return self._rest_call(url, method='GET', headers=headers,
|
||||
silent=silent, with_retries=with_retries)
|
||||
|
||||
|
@ -151,10 +187,10 @@ class RESTClient(object):
|
|||
def url_patch(self, url, body, headers=None):
|
||||
return self._rest_call(url, method='PATCH', body=body, headers=headers)
|
||||
|
||||
def _raise_error(self, status_code, operation, result_msg,
|
||||
def _raise_error(self, operation, status_code, details,
|
||||
error_code=None, related_error_codes=None):
|
||||
error = http_error_to_exception(status_code, error_code)
|
||||
raise error(manager='', operation=operation, details=result_msg,
|
||||
raise error(manager='', operation=operation, details=details,
|
||||
error_code=error_code,
|
||||
related_error_codes=related_error_codes,
|
||||
status_code=status_code)
|
||||
|
@ -171,22 +207,9 @@ class RESTClient(object):
|
|||
for code in expected]),
|
||||
'body': result_msg})
|
||||
|
||||
error_code = None
|
||||
related_error_codes = []
|
||||
if isinstance(result_msg, dict) and 'error_message' in result_msg:
|
||||
error_code = result_msg.get('error_code')
|
||||
related_errors = [error['error_message'] for error in
|
||||
result_msg.get('related_errors', [])]
|
||||
related_error_codes = [str(error['error_code']) for error in
|
||||
result_msg.get('related_errors', []) if
|
||||
error.get('error_code')]
|
||||
result_msg = result_msg['error_message']
|
||||
if related_errors:
|
||||
result_msg += " relatedErrors: %s" % ' '.join(
|
||||
related_errors)
|
||||
self._raise_error(result.status_code, operation, result_msg,
|
||||
error_code=error_code,
|
||||
related_error_codes=related_error_codes)
|
||||
error_details = get_http_error_details(result)
|
||||
|
||||
self._raise_error(operation, **error_details)
|
||||
|
||||
@classmethod
|
||||
def merge_headers(cls, *headers):
|
||||
|
@ -322,35 +345,21 @@ class NSX3Client(JSONRESTClient):
|
|||
default_headers=default_headers,
|
||||
client_obj=client_obj)
|
||||
|
||||
def _raise_error(self, status_code, operation, result_msg,
|
||||
def _raise_error(self, operation, status_code, details,
|
||||
error_code=None, related_error_codes=None):
|
||||
"""Override the Rest client errors to add the manager IPs"""
|
||||
error = http_error_to_exception(status_code, error_code)
|
||||
raise error(manager=self.nsx_api_managers,
|
||||
operation=operation,
|
||||
details=result_msg,
|
||||
details=details,
|
||||
error_code=error_code,
|
||||
related_error_codes=related_error_codes,
|
||||
status_code=status_code)
|
||||
|
||||
def _rest_call(self, url, **kwargs):
|
||||
if kwargs.get('with_retries', True):
|
||||
# Retry on "607: Persistence layer is currently reconfiguring"
|
||||
# and on "98: Cannot connect to server"
|
||||
retry_codes = [exceptions.APITransactionAborted,
|
||||
exceptions.CannotConnectToServer]
|
||||
if self.rate_limit_retry:
|
||||
# If too many requests are handled by the nsx at the same time,
|
||||
# error "429: Too Many Requests" or "503: Server Unavailable"
|
||||
# will be returned.
|
||||
retry_codes.append(exceptions.ServerBusy)
|
||||
LOG.warning("with_retries setting is deprecated and will be "
|
||||
"removed. Please use exceptions setting in nsxlib "
|
||||
"config instead")
|
||||
|
||||
# the client is expected to retry after a random 400-600 milli,
|
||||
# and later exponentially until 5 seconds wait
|
||||
@utils.retry_random_upon_exception(
|
||||
tuple(retry_codes), max_attempts=self.max_attempts)
|
||||
def _rest_call_with_retry(self, url, **kwargs):
|
||||
return super(NSX3Client, self)._rest_call(url, **kwargs)
|
||||
return _rest_call_with_retry(self, url, **kwargs)
|
||||
else:
|
||||
return super(NSX3Client, self)._rest_call(url, **kwargs)
|
||||
return super(NSX3Client, self)._rest_call(url, **kwargs)
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
import abc
|
||||
import contextlib
|
||||
import copy
|
||||
|
@ -31,7 +32,6 @@ from oslo_log import log
|
|||
from oslo_service import loopingcall
|
||||
import requests
|
||||
from requests import adapters
|
||||
from requests import exceptions as requests_exceptions
|
||||
import six
|
||||
import six.moves.urllib.parse as urlparse
|
||||
import urllib3
|
||||
|
@ -78,20 +78,6 @@ class AbstractHTTPProvider(object):
|
|||
"""
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def is_connection_exception(self, exception):
|
||||
"""Determine if the given exception is related to connection failure.
|
||||
|
||||
Return True if it's a connection exception and False otherwise.
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def is_timeout_exception(self, exception):
|
||||
"""Determine if the given exception is related to timeout.
|
||||
|
||||
Return True if it's a timeout exception and False otherwise.
|
||||
"""
|
||||
|
||||
|
||||
class TimeoutSession(requests.Session):
|
||||
"""Extends requests.Session to support timeout at the session level."""
|
||||
|
@ -245,15 +231,6 @@ class NSXRequestsHTTPProvider(AbstractHTTPProvider):
|
|||
|
||||
return session
|
||||
|
||||
def is_connection_exception(self, exception):
|
||||
return isinstance(exception, requests_exceptions.ConnectionError)
|
||||
|
||||
def is_timeout_exception(self, exception):
|
||||
return isinstance(exception, requests_exceptions.Timeout)
|
||||
|
||||
def is_conn_open_exception(self, exception):
|
||||
return isinstance(exception, requests_exceptions.ConnectTimeout)
|
||||
|
||||
def get_default_headers(self, session, provider, allow_overwrite_header,
|
||||
token_provider=None):
|
||||
"""Get the default headers that should be added to future requests"""
|
||||
|
@ -586,7 +563,12 @@ class ClusteredAPI(object):
|
|||
# regenerate connection pool based on token
|
||||
endpoint.regenerate_pool()
|
||||
except Exception as e:
|
||||
endpoint.set_state(EndpointState.DOWN)
|
||||
if self.nsxlib_config.exception_config.should_retry(e):
|
||||
LOG.info("Exception is retriable, endpoint stays UP")
|
||||
endpoint.set_state(EndpointState.UP)
|
||||
else:
|
||||
endpoint.set_state(EndpointState.DOWN)
|
||||
|
||||
LOG.warning("Failed to validate API cluster endpoint "
|
||||
"'%(ep)s' due to: %(err)s",
|
||||
{'ep': endpoint, 'err': e})
|
||||
|
@ -667,6 +649,20 @@ class ClusteredAPI(object):
|
|||
# slow rate at once per 33 seconds by default.
|
||||
yield EndpointConnection(endpoint, conn, conn_wait, rate_wait)
|
||||
|
||||
def _raise_http_exception_if_needed(self, response):
    """Raise an exception for HTTP errors that affect endpoint decisions.

    Inspect the response's HTTP error code; when the mapped exception is
    configured either to ground the endpoint or to retry with the next
    endpoint, raise it so the caller can act on it. Other errors are
    ignored here.
    """
    exc = nsx_client.init_http_exception_from_response(response)
    if exc is None:
        # Not an error that maps to an endpoint-level decision.
        return

    exc_config = self.nsxlib_config.exception_config
    ground = exc_config.should_ground_endpoint(exc)
    retry = exc_config.should_retry(exc)
    if ground or retry:
        raise exc
|
||||
|
||||
def _proxy_stub(self, proxy_for):
|
||||
def _call_proxy(url, *args, **kwargs):
|
||||
return self._proxy(proxy_for, url, *args, **kwargs)
|
||||
|
@ -674,7 +670,8 @@ class ClusteredAPI(object):
|
|||
|
||||
def _proxy(self, proxy_for, uri, *args, **kwargs):
|
||||
|
||||
@utils.retry_upon_none_result(self.nsxlib_config.max_attempts)
|
||||
@utils.retry_random_upon_exception_result(
|
||||
max_attempts=self.nsxlib_config.max_attempts)
|
||||
def _proxy_internal(proxy_for, uri, *args, **kwargs):
|
||||
# proxy http request call to an avail endpoint
|
||||
with self.endpoint_connection() as conn_data:
|
||||
|
@ -702,22 +699,29 @@ class ClusteredAPI(object):
|
|||
response = do_request(url, *args, **kwargs)
|
||||
endpoint.set_state(EndpointState.UP)
|
||||
|
||||
# for some status codes, we need to bring the cluster
|
||||
# down or retry API call
|
||||
self._raise_http_exception_if_needed(response)
|
||||
|
||||
return response
|
||||
except Exception as e:
|
||||
LOG.warning("Request failed due to: %s", e)
|
||||
if (not self._http_provider.is_connection_exception(e) and
|
||||
not self._http_provider.is_timeout_exception(e)):
|
||||
# only trap and retry connection & timeout errors
|
||||
raise e
|
||||
if self._http_provider.is_conn_open_exception(e):
|
||||
# unable to establish new connection - endpoint is
|
||||
# inaccessible
|
||||
exc_config = self.nsxlib_config.exception_config
|
||||
if exc_config.should_ground_endpoint(e):
|
||||
# consider endpoint inaccessible and move to next
|
||||
# endpoint
|
||||
endpoint.set_state(EndpointState.DOWN)
|
||||
|
||||
LOG.info("Connection to %s failed, checking additional "
|
||||
"connections and endpoints" % url)
|
||||
# this might be a result of server closing connection
|
||||
# return None so it will retry upto max_attempts.
|
||||
elif not exc_config.should_retry(e):
|
||||
LOG.info("Exception %s is configured as not retriable",
|
||||
e)
|
||||
raise e
|
||||
|
||||
# Returning the exception instead of raising it will cause
|
||||
# decorator to retry. If retry attempts are exceeded, this
|
||||
# same exception will be raised due to overridden
|
||||
# method of RetryAttemptsExceeded
|
||||
return e
|
||||
|
||||
return _proxy_internal(proxy_for, uri, *args, **kwargs)
|
||||
|
||||
|
|
|
@ -16,9 +16,45 @@
|
|||
from oslo_log import log
|
||||
from oslo_log import versionutils
|
||||
|
||||
from requests import exceptions as requests_exceptions
|
||||
|
||||
from vmware_nsxlib.v3 import exceptions as v3_exceptions
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class ExceptionConfig(object):
    """User-tunable mapping of exceptions to endpoint-level actions.

    Users can replace or extend the two exception lists to control how
    the cluster reacts to failures during API calls and keepalive
    validation.
    """

    def __init__(self):
        # When hit during API call, these exceptions will mark
        # endpoint as DOWN immediately.
        # This setting has no effect on keepalive validation.
        self.ground_triggers = [requests_exceptions.ConnectionError,
                                requests_exceptions.Timeout]

        # When hit during API call, these exceptions will be
        # retried with next available endpoint.
        # When hit during validation, these exceptions will not
        # mark endpoint as DOWN.
        self.retriables = [v3_exceptions.APITransactionAborted,
                           v3_exceptions.CannotConnectToServer,
                           v3_exceptions.ServerBusy]

    def should_ground_endpoint(self, ex):
        """Return True if *ex* should bring the endpoint DOWN."""
        # isinstance with a tuple replaces the manual loop; an empty
        # list yields an empty tuple, for which isinstance is False.
        return isinstance(ex, tuple(self.ground_triggers))

    def should_retry(self, ex):
        """Return True if *ex* should be retried on another endpoint."""
        return isinstance(ex, tuple(self.retriables))
|
||||
|
||||
|
||||
class NsxLibConfig(object):
|
||||
"""Class holding all the configuration parameters used by the nsxlib code.
|
||||
|
||||
|
@ -135,7 +171,8 @@ class NsxLibConfig(object):
|
|||
allow_passthrough=False,
|
||||
realization_max_attempts=50,
|
||||
realization_wait_sec=1.0,
|
||||
api_rate_limit_per_endpoint=None):
|
||||
api_rate_limit_per_endpoint=None,
|
||||
exception_config=None):
|
||||
|
||||
self.nsx_api_managers = nsx_api_managers
|
||||
self._username = username
|
||||
|
@ -164,6 +201,7 @@ class NsxLibConfig(object):
|
|||
self.realization_max_attempts = realization_max_attempts
|
||||
self.realization_wait_sec = realization_wait_sec
|
||||
self.api_rate_limit_per_endpoint = api_rate_limit_per_endpoint
|
||||
self.exception_config = exception_config or ExceptionConfig()
|
||||
|
||||
if len(nsx_api_managers) == 1 and not self.cluster_unavailable_retry:
|
||||
LOG.warning("When only one endpoint is provided, keepalive probes"
|
||||
|
|
|
@ -26,7 +26,7 @@ from tenacity import _utils as tenacity_utils
|
|||
|
||||
from vmware_nsxlib._i18n import _
|
||||
from vmware_nsxlib.v3 import constants
|
||||
from vmware_nsxlib.v3 import exceptions as nsxlib_exceptions
|
||||
from vmware_nsxlib.v3 import exceptions as nsxlib_exc
|
||||
from vmware_nsxlib.v3 import nsx_constants
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
@ -105,7 +105,7 @@ def update_tag_limits(limits):
|
|||
def _validate_resource_type_length(resource_type):
|
||||
# Add in a validation to ensure that we catch this at build time
|
||||
if len(resource_type) > MAX_RESOURCE_TYPE_LEN:
|
||||
raise nsxlib_exceptions.NsxLibInvalidInput(
|
||||
raise nsxlib_exc.NsxLibInvalidInput(
|
||||
error_message=(_('Resource type cannot exceed %(max_len)s '
|
||||
'characters: %(resource_type)s') %
|
||||
{'max_len': MAX_RESOURCE_TYPE_LEN,
|
||||
|
@ -215,6 +215,26 @@ def retry_upon_none_result(max_attempts, delay=0.5, max_delay=10,
|
|||
before=_log_before_retry, after=_log_after_retry)
|
||||
|
||||
|
||||
class RetryAttemptsExceeded(tenacity.RetryError):
    """Retry error that re-raises the exception *returned* by the last attempt."""

    def reraise(self):
        raise self.last_attempt.result()


# Retry while the decorated function *returns* (not raises) an exception
# instance. If retry attempts are exceeded, the last returned exception
# is raised via the overridden reraise method of RetryAttemptsExceeded.
def retry_random_upon_exception_result(max_attempts, delay=0.5, max_delay=10):
    return tenacity.retry(
        reraise=True,
        retry_error_cls=RetryAttemptsExceeded,
        retry=tenacity.retry_if_result(
            lambda result: isinstance(result, Exception)),
        wait=tenacity.wait_random_exponential(multiplier=delay,
                                              max=max_delay),
        stop=tenacity.stop_after_attempt(max_attempts),
        before=_log_before_retry,
        after=_log_after_retry)
|
||||
|
||||
|
||||
def list_match(list1, list2):
|
||||
# Check if list1 and list2 have identical elements, but relaxed on
|
||||
# dict elements where list1's dict element can be a subset of list2's
|
||||
|
@ -432,7 +452,7 @@ class NsxLibApiBase(object):
|
|||
# NSX has, we will get a 412: Precondition Failed.
|
||||
# In that case we need to re-fetch, patch the response and send
|
||||
# it again with the new revision_id
|
||||
@retry_upon_exception(nsxlib_exceptions.StaleRevision,
|
||||
@retry_upon_exception(nsxlib_exc.StaleRevision,
|
||||
max_attempts=self.max_attempts)
|
||||
def do_update():
|
||||
return self._internal_update_resource(
|
||||
|
@ -458,7 +478,7 @@ class NsxLibApiBase(object):
|
|||
|
||||
def _delete_by_path_with_retry(self, path):
|
||||
# Using internal method so we can access max_attempts in the decorator
|
||||
@retry_upon_exception(nsxlib_exceptions.StaleRevision,
|
||||
@retry_upon_exception(nsxlib_exc.StaleRevision,
|
||||
max_attempts=self.max_attempts)
|
||||
def _do_delete():
|
||||
self.client.delete(path)
|
||||
|
@ -467,7 +487,7 @@ class NsxLibApiBase(object):
|
|||
|
||||
def _create_with_retry(self, resource, body=None, headers=None):
|
||||
# Using internal method so we can access max_attempts in the decorator
|
||||
@retry_upon_exception(nsxlib_exceptions.StaleRevision,
|
||||
@retry_upon_exception(nsxlib_exc.StaleRevision,
|
||||
max_attempts=self.max_attempts)
|
||||
def _do_create():
|
||||
return self.client.create(resource, body, headers=headers)
|
||||
|
@ -489,11 +509,11 @@ class NsxLibApiBase(object):
|
|||
if len(matched_results) == 0:
|
||||
err_msg = (_("Could not find %(resource)s %(name)s") %
|
||||
{'name': name_or_id, 'resource': resource})
|
||||
raise nsxlib_exceptions.ManagerError(details=err_msg)
|
||||
raise nsxlib_exc.ManagerError(details=err_msg)
|
||||
elif len(matched_results) > 1:
|
||||
err_msg = (_("Found multiple %(resource)s named %(name)s") %
|
||||
{'name': name_or_id, 'resource': resource})
|
||||
raise nsxlib_exceptions.ManagerError(details=err_msg)
|
||||
raise nsxlib_exc.ManagerError(details=err_msg)
|
||||
|
||||
return matched_results[0].get('id')
|
||||
|
||||
|
@ -579,24 +599,24 @@ def validate_icmp_params(icmp_type, icmp_code, icmp_version=4, strict=False):
|
|||
if icmp_type:
|
||||
if (strict and icmp_type not in
|
||||
constants.IPV4_ICMP_STRICT_TYPES):
|
||||
raise nsxlib_exceptions.InvalidInput(
|
||||
raise nsxlib_exc.InvalidInput(
|
||||
operation='create_rule',
|
||||
arg_val=icmp_type,
|
||||
arg_name='icmp_type')
|
||||
if icmp_type not in constants.IPV4_ICMP_TYPES:
|
||||
raise nsxlib_exceptions.InvalidInput(
|
||||
raise nsxlib_exc.InvalidInput(
|
||||
operation='create_rule',
|
||||
arg_val=icmp_type,
|
||||
arg_name='icmp_type')
|
||||
if (icmp_code and strict and icmp_code not in
|
||||
constants.IPV4_ICMP_STRICT_TYPES[icmp_type]):
|
||||
raise nsxlib_exceptions.InvalidInput(
|
||||
raise nsxlib_exc.InvalidInput(
|
||||
operation='create_rule',
|
||||
arg_val=icmp_code,
|
||||
arg_name='icmp_code for this icmp_type')
|
||||
if (icmp_code and icmp_code not in
|
||||
constants.IPV4_ICMP_TYPES[icmp_type]):
|
||||
raise nsxlib_exceptions.InvalidInput(
|
||||
raise nsxlib_exc.InvalidInput(
|
||||
operation='create_rule',
|
||||
arg_val=icmp_code,
|
||||
arg_name='icmp_code for this icmp_type')
|
||||
|
@ -610,7 +630,7 @@ def get_l4_protocol_name(protocol_number):
|
|||
try:
|
||||
protocol_number = int(protocol_number)
|
||||
except ValueError:
|
||||
raise nsxlib_exceptions.InvalidInput(
|
||||
raise nsxlib_exc.InvalidInput(
|
||||
operation='create_rule',
|
||||
arg_val=protocol_number,
|
||||
arg_name='protocol')
|
||||
|
|
Loading…
Reference in New Issue