Merge "Expose exception config in nsxlib"
This commit is contained in:
commit
782ab2c1ed
|
@ -19,7 +19,6 @@ import unittest
|
|||
import mock
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_utils import uuidutils
|
||||
from requests import exceptions as requests_exceptions
|
||||
from requests import models
|
||||
|
||||
from vmware_nsxlib import v3
|
||||
|
@ -181,12 +180,6 @@ class MemoryMockAPIProvider(nsx_cluster.AbstractHTTPProvider):
|
|||
# all callers use the same backing
|
||||
return self._store
|
||||
|
||||
def is_connection_exception(self, exception):
|
||||
return isinstance(exception, requests_exceptions.ConnectionError)
|
||||
|
||||
def is_timeout_exception(self, exception):
|
||||
return isinstance(exception, requests_exceptions.Timeout)
|
||||
|
||||
|
||||
class NsxClientTestCase(NsxLibTestCase):
|
||||
|
||||
|
@ -396,12 +389,14 @@ class NsxClientTestCase(NsxLibTestCase):
|
|||
return client
|
||||
|
||||
def new_mocked_cluster(self, conf_managers, validate_conn_func,
|
||||
concurrent_connections=None):
|
||||
concurrent_connections=None, exceptions=None):
|
||||
mock_provider = mock.Mock()
|
||||
mock_provider.default_scheme = 'https'
|
||||
mock_provider.validate_connection = validate_conn_func
|
||||
|
||||
nsxlib_config = get_default_nsxlib_config()
|
||||
if exceptions:
|
||||
nsxlib_config.exception_config = exceptions
|
||||
if concurrent_connections:
|
||||
nsxlib_config.concurrent_connections = concurrent_connections
|
||||
nsxlib_config.http_provider = mock_provider
|
||||
|
|
|
@ -363,7 +363,7 @@ class NsxV3APIClientTestCase(nsxlib_testcase.NsxClientTestCase):
|
|||
def test_raise_error(self):
|
||||
api = self.new_mocked_client(client.NSX3Client)
|
||||
with self.assertRaises(nsxlib_exc.ManagerError) as e:
|
||||
api._raise_error(requests.codes.INTERNAL_SERVER_ERROR, 'GET', '')
|
||||
api._raise_error('GET', requests.codes.INTERNAL_SERVER_ERROR, '')
|
||||
self.assertEqual(e.exception.status_code,
|
||||
requests.codes.INTERNAL_SERVER_ERROR)
|
||||
|
||||
|
|
|
@ -16,16 +16,20 @@
|
|||
import unittest
|
||||
|
||||
import mock
|
||||
from requests import codes
|
||||
from requests import exceptions as requests_exceptions
|
||||
from requests import models
|
||||
import six.moves.urllib.parse as urlparse
|
||||
|
||||
from oslo_serialization import jsonutils
|
||||
|
||||
from vmware_nsxlib.tests.unit.v3 import mocks
|
||||
from vmware_nsxlib.tests.unit.v3 import nsxlib_testcase
|
||||
from vmware_nsxlib import v3
|
||||
from vmware_nsxlib.v3 import client
|
||||
from vmware_nsxlib.v3 import client_cert
|
||||
from vmware_nsxlib.v3 import cluster
|
||||
from vmware_nsxlib.v3 import config
|
||||
from vmware_nsxlib.v3 import exceptions as nsxlib_exc
|
||||
|
||||
|
||||
|
@ -37,6 +41,10 @@ def _validate_conn_down(*args, **kwargs):
|
|||
raise requests_exceptions.ConnectionError()
|
||||
|
||||
|
||||
def _validate_server_busy(*args, **kwargs):
|
||||
raise nsxlib_exc.ServerBusy(details="Test")
|
||||
|
||||
|
||||
def get_sess_create_resp():
|
||||
sess_create_response = models.Response()
|
||||
sess_create_response.status_code = 200
|
||||
|
@ -282,9 +290,10 @@ class NsxV3ClusteredAPIWithClientCertTestCase(NsxV3ClusteredAPITestCase):
|
|||
|
||||
class ClusteredAPITestCase(nsxlib_testcase.NsxClientTestCase):
|
||||
|
||||
def _test_health(self, validate_fn, expected_health):
|
||||
def _test_health(self, validate_fn, expected_health, exceptions=None):
|
||||
conf_managers = ['8.9.10.11', '9.10.11.12']
|
||||
api = self.new_mocked_cluster(conf_managers, validate_fn)
|
||||
api = self.new_mocked_cluster(conf_managers, validate_fn,
|
||||
exceptions=exceptions)
|
||||
|
||||
self.assertEqual(expected_health, api.health)
|
||||
|
||||
|
@ -312,6 +321,16 @@ class ClusteredAPITestCase(nsxlib_testcase.NsxClientTestCase):
|
|||
self.assertRaises(nsxlib_exc.ServiceClusterUnavailable,
|
||||
api.get, 'api/v1/transport-zones')
|
||||
|
||||
def test_cluster_validate_with_exception_retry(self):
|
||||
self._test_health(_validate_server_busy, cluster.ClusterHealth.GREEN)
|
||||
|
||||
def test_cluster_validate_with_exception_no_retry(self):
|
||||
exceptions = config.ExceptionConfig()
|
||||
exceptions.retriables = []
|
||||
self._test_health(_validate_server_busy,
|
||||
cluster.ClusterHealth.RED,
|
||||
exceptions)
|
||||
|
||||
def test_cluster_proxy_stale_revision(self):
|
||||
|
||||
def stale_revision():
|
||||
|
@ -322,6 +341,93 @@ class ClusteredAPITestCase(nsxlib_testcase.NsxClientTestCase):
|
|||
self.assertRaises(nsxlib_exc.StaleRevision,
|
||||
api.get, 'api/v1/transport-zones')
|
||||
|
||||
def test_cluster_down_and_retry_on_server_error(self):
|
||||
|
||||
def server_error():
|
||||
return mocks.MockRequestsResponse(
|
||||
codes.INTERNAL_SERVER_ERROR,
|
||||
jsonutils.dumps({'error_message': 'test', 'error_code': 98}))
|
||||
|
||||
def all_well():
|
||||
return mocks.MockRequestsResponse(
|
||||
codes.OK,
|
||||
jsonutils.dumps({'id': 'test'}))
|
||||
|
||||
conf_managers = ['8.9.10.11', '9.10.11.12', '10.11.12.13']
|
||||
exceptions = config.ExceptionConfig()
|
||||
exceptions.ground_triggers.append(nsxlib_exc.CannotConnectToServer)
|
||||
api = self.mock_nsx_clustered_api(
|
||||
nsx_api_managers=conf_managers,
|
||||
session_response=[server_error, all_well])
|
||||
api.nsxlib_config.exception_config = exceptions
|
||||
|
||||
api.get('api/v1/transport-zones')
|
||||
# first manager should go down, second one is confirmed as UP
|
||||
# after retry
|
||||
self.assertEqual(cluster.ClusterHealth.ORANGE, api.health)
|
||||
|
||||
def test_max_retry_attempts_on_server_error(self):
|
||||
|
||||
def server_error():
|
||||
return mocks.MockRequestsResponse(
|
||||
codes.INTERNAL_SERVER_ERROR,
|
||||
jsonutils.dumps({'error_message': 'test', 'error_code': 98}))
|
||||
|
||||
conf_managers = ['8.9.10.11', '9.10.11.12', '10.11.12.13']
|
||||
exceptions = config.ExceptionConfig()
|
||||
exceptions.ground_triggers.append(nsxlib_exc.CannotConnectToServer)
|
||||
max_attempts = 4
|
||||
api = self.mock_nsx_clustered_api(
|
||||
nsx_api_managers=conf_managers,
|
||||
max_attempts=max_attempts,
|
||||
session_response=[server_error for i in range(0, max_attempts)])
|
||||
api.nsxlib_config.cluster_unavailable_retry = False
|
||||
api.nsxlib_config.exception_config = exceptions
|
||||
|
||||
self.assertRaises(nsxlib_exc.ServiceClusterUnavailable,
|
||||
api.get, 'api/v1/transport-zones')
|
||||
self.assertEqual(cluster.ClusterHealth.RED, api.health)
|
||||
|
||||
def test_max_retry_attempts_on_retriable_error(self):
|
||||
|
||||
def server_error():
|
||||
return mocks.MockRequestsResponse(
|
||||
codes.INTERNAL_SERVER_ERROR,
|
||||
jsonutils.dumps({'error_message': 'test', 'error_code': 98}))
|
||||
|
||||
conf_managers = ['8.9.10.11', '9.10.11.12', '10.11.12.13']
|
||||
exceptions = config.ExceptionConfig()
|
||||
max_attempts = 2
|
||||
api = self.mock_nsx_clustered_api(
|
||||
nsx_api_managers=conf_managers,
|
||||
max_attempts=max_attempts,
|
||||
session_response=[server_error for i in range(0, max_attempts)])
|
||||
api.nsxlib_config.exception_config = exceptions
|
||||
|
||||
self.assertRaises(nsxlib_exc.CannotConnectToServer,
|
||||
api.get, 'api/v1/transport-zones')
|
||||
# This exception does not ground endpoint
|
||||
self.assertEqual(cluster.ClusterHealth.GREEN, api.health)
|
||||
|
||||
def test_non_retriable_error(self):
|
||||
|
||||
def server_error():
|
||||
return mocks.MockRequestsResponse(
|
||||
codes.INTERNAL_SERVER_ERROR,
|
||||
jsonutils.dumps({'error_message': 'test', 'error_code': 99}))
|
||||
|
||||
conf_managers = ['8.9.10.11', '9.10.11.12', '10.11.12.13']
|
||||
exceptions = config.ExceptionConfig()
|
||||
max_attempts = 2
|
||||
api = self.mock_nsx_clustered_api(
|
||||
nsx_api_managers=conf_managers,
|
||||
max_attempts=max_attempts,
|
||||
session_response=[server_error for i in range(0, max_attempts)])
|
||||
api.nsxlib_config.exception_config = exceptions
|
||||
|
||||
api.get('api/v1/transport-zones')
|
||||
self.assertEqual(cluster.ClusterHealth.GREEN, api.health)
|
||||
|
||||
def test_cluster_proxy_connection_establish_error(self):
|
||||
|
||||
def connect_timeout():
|
||||
|
|
|
@ -305,6 +305,43 @@ class TestNsxV3Utils(nsxlib_testcase.NsxClientTestCase):
|
|||
self.assertRaises(exceptions.NsxLibInvalidInput, func_to_fail, 99)
|
||||
self.assertEqual(max_retries, total_count['val'])
|
||||
|
||||
def test_retry_random_upon_exception_result_retry(self):
|
||||
total_count = {'val': 0}
|
||||
max_retries = 3
|
||||
|
||||
@utils.retry_random_upon_exception_result(max_retries)
|
||||
def func_to_fail():
|
||||
total_count['val'] = total_count['val'] + 1
|
||||
return exceptions.NsxLibInvalidInput(error_message='foo')
|
||||
|
||||
self.assertRaises(exceptions.NsxLibInvalidInput, func_to_fail)
|
||||
self.assertEqual(max_retries, total_count['val'])
|
||||
|
||||
def test_retry_random_upon_exception_result_no_retry(self):
|
||||
total_count = {'val': 0}
|
||||
|
||||
@utils.retry_random_upon_exception_result(3)
|
||||
def func_to_fail():
|
||||
total_count['val'] = total_count['val'] + 1
|
||||
raise exceptions.NsxLibInvalidInput(error_message='foo')
|
||||
|
||||
self.assertRaises(exceptions.NsxLibInvalidInput, func_to_fail)
|
||||
# should not retry since exception is raised, and not returned
|
||||
self.assertEqual(1, total_count['val'])
|
||||
|
||||
def test_retry_random_upon_exception_result_no_retry2(self):
|
||||
total_count = {'val': 0}
|
||||
ret_val = 42
|
||||
|
||||
@utils.retry_random_upon_exception_result(3)
|
||||
def func_to_fail():
|
||||
total_count['val'] = total_count['val'] + 1
|
||||
return ret_val
|
||||
|
||||
self.assertEqual(ret_val, func_to_fail())
|
||||
# should not retry since no exception is returned
|
||||
self.assertEqual(1, total_count['val'])
|
||||
|
||||
@mock.patch.object(utils, '_update_max_nsgroups_criteria_tags')
|
||||
@mock.patch.object(utils, '_update_max_tags')
|
||||
@mock.patch.object(utils, '_update_tag_length')
|
||||
|
|
|
@ -29,6 +29,42 @@ LOG = log.getLogger(__name__)
|
|||
NULL_CURSOR_PREFIX = '0000'
|
||||
|
||||
|
||||
def get_http_error_details(response):
|
||||
msg = response.json() if response.content else ''
|
||||
error_code = None
|
||||
related_error_codes = []
|
||||
|
||||
if isinstance(msg, dict) and 'error_message' in msg:
|
||||
error_code = msg.get('error_code')
|
||||
related_errors = [error['error_message'] for error in
|
||||
msg.get('related_errors', [])]
|
||||
related_error_codes = [str(error['error_code']) for error in
|
||||
msg.get('related_errors', []) if
|
||||
error.get('error_code')]
|
||||
msg = msg['error_message']
|
||||
if related_errors:
|
||||
msg += " relatedErrors: %s" % ' '.join(related_errors)
|
||||
|
||||
return {'status_code': response.status_code,
|
||||
'error_code': error_code,
|
||||
'related_error_codes': related_error_codes,
|
||||
'details': msg}
|
||||
|
||||
|
||||
def init_http_exception_from_response(response):
|
||||
if not response:
|
||||
return None
|
||||
|
||||
error_details = get_http_error_details(response)
|
||||
if not error_details['error_code']:
|
||||
return None
|
||||
|
||||
error = http_error_to_exception(error_details['status_code'],
|
||||
error_details['error_code'])
|
||||
|
||||
return error(manager='', **error_details)
|
||||
|
||||
|
||||
def http_error_to_exception(status_code, error_code):
|
||||
errors = {
|
||||
requests.codes.NOT_FOUND:
|
||||
|
@ -99,7 +135,7 @@ class RESTClient(object):
|
|||
def list(self, resource='', headers=None, silent=False):
|
||||
return self.url_list(resource, headers=headers, silent=silent)
|
||||
|
||||
def get(self, uuid, headers=None, silent=False, with_retries=True):
|
||||
def get(self, uuid, headers=None, silent=False, with_retries=False):
|
||||
return self.url_get(uuid, headers=headers, silent=silent,
|
||||
with_retries=with_retries)
|
||||
|
||||
|
@ -132,7 +168,7 @@ class RESTClient(object):
|
|||
cursor = page.get('cursor', NULL_CURSOR_PREFIX)
|
||||
return concatenate_response
|
||||
|
||||
def url_get(self, url, headers=None, silent=False, with_retries=True):
|
||||
def url_get(self, url, headers=None, silent=False, with_retries=False):
|
||||
return self._rest_call(url, method='GET', headers=headers,
|
||||
silent=silent, with_retries=with_retries)
|
||||
|
||||
|
@ -151,10 +187,10 @@ class RESTClient(object):
|
|||
def url_patch(self, url, body, headers=None):
|
||||
return self._rest_call(url, method='PATCH', body=body, headers=headers)
|
||||
|
||||
def _raise_error(self, status_code, operation, result_msg,
|
||||
def _raise_error(self, operation, status_code, details,
|
||||
error_code=None, related_error_codes=None):
|
||||
error = http_error_to_exception(status_code, error_code)
|
||||
raise error(manager='', operation=operation, details=result_msg,
|
||||
raise error(manager='', operation=operation, details=details,
|
||||
error_code=error_code,
|
||||
related_error_codes=related_error_codes,
|
||||
status_code=status_code)
|
||||
|
@ -171,22 +207,9 @@ class RESTClient(object):
|
|||
for code in expected]),
|
||||
'body': result_msg})
|
||||
|
||||
error_code = None
|
||||
related_error_codes = []
|
||||
if isinstance(result_msg, dict) and 'error_message' in result_msg:
|
||||
error_code = result_msg.get('error_code')
|
||||
related_errors = [error['error_message'] for error in
|
||||
result_msg.get('related_errors', [])]
|
||||
related_error_codes = [str(error['error_code']) for error in
|
||||
result_msg.get('related_errors', []) if
|
||||
error.get('error_code')]
|
||||
result_msg = result_msg['error_message']
|
||||
if related_errors:
|
||||
result_msg += " relatedErrors: %s" % ' '.join(
|
||||
related_errors)
|
||||
self._raise_error(result.status_code, operation, result_msg,
|
||||
error_code=error_code,
|
||||
related_error_codes=related_error_codes)
|
||||
error_details = get_http_error_details(result)
|
||||
|
||||
self._raise_error(operation, **error_details)
|
||||
|
||||
@classmethod
|
||||
def merge_headers(cls, *headers):
|
||||
|
@ -322,35 +345,21 @@ class NSX3Client(JSONRESTClient):
|
|||
default_headers=default_headers,
|
||||
client_obj=client_obj)
|
||||
|
||||
def _raise_error(self, status_code, operation, result_msg,
|
||||
def _raise_error(self, operation, status_code, details,
|
||||
error_code=None, related_error_codes=None):
|
||||
"""Override the Rest client errors to add the manager IPs"""
|
||||
error = http_error_to_exception(status_code, error_code)
|
||||
raise error(manager=self.nsx_api_managers,
|
||||
operation=operation,
|
||||
details=result_msg,
|
||||
details=details,
|
||||
error_code=error_code,
|
||||
related_error_codes=related_error_codes,
|
||||
status_code=status_code)
|
||||
|
||||
def _rest_call(self, url, **kwargs):
|
||||
if kwargs.get('with_retries', True):
|
||||
# Retry on "607: Persistence layer is currently reconfiguring"
|
||||
# and on "98: Cannot connect to server"
|
||||
retry_codes = [exceptions.APITransactionAborted,
|
||||
exceptions.CannotConnectToServer]
|
||||
if self.rate_limit_retry:
|
||||
# If too many requests are handled by the nsx at the same time,
|
||||
# error "429: Too Many Requests" or "503: Server Unavailable"
|
||||
# will be returned.
|
||||
retry_codes.append(exceptions.ServerBusy)
|
||||
LOG.warning("with_retries setting is deprecated and will be "
|
||||
"removed. Please use exceptions setting in nsxlib "
|
||||
"config instead")
|
||||
|
||||
# the client is expected to retry after a random 400-600 milli,
|
||||
# and later exponentially until 5 seconds wait
|
||||
@utils.retry_random_upon_exception(
|
||||
tuple(retry_codes), max_attempts=self.max_attempts)
|
||||
def _rest_call_with_retry(self, url, **kwargs):
|
||||
return super(NSX3Client, self)._rest_call(url, **kwargs)
|
||||
return _rest_call_with_retry(self, url, **kwargs)
|
||||
else:
|
||||
return super(NSX3Client, self)._rest_call(url, **kwargs)
|
||||
return super(NSX3Client, self)._rest_call(url, **kwargs)
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
import abc
|
||||
import contextlib
|
||||
import copy
|
||||
|
@ -31,7 +32,6 @@ from oslo_log import log
|
|||
from oslo_service import loopingcall
|
||||
import requests
|
||||
from requests import adapters
|
||||
from requests import exceptions as requests_exceptions
|
||||
import six
|
||||
import six.moves.urllib.parse as urlparse
|
||||
import urllib3
|
||||
|
@ -78,20 +78,6 @@ class AbstractHTTPProvider(object):
|
|||
"""
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def is_connection_exception(self, exception):
|
||||
"""Determine if the given exception is related to connection failure.
|
||||
|
||||
Return True if it's a connection exception and False otherwise.
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def is_timeout_exception(self, exception):
|
||||
"""Determine if the given exception is related to timeout.
|
||||
|
||||
Return True if it's a timeout exception and False otherwise.
|
||||
"""
|
||||
|
||||
|
||||
class TimeoutSession(requests.Session):
|
||||
"""Extends requests.Session to support timeout at the session level."""
|
||||
|
@ -245,15 +231,6 @@ class NSXRequestsHTTPProvider(AbstractHTTPProvider):
|
|||
|
||||
return session
|
||||
|
||||
def is_connection_exception(self, exception):
|
||||
return isinstance(exception, requests_exceptions.ConnectionError)
|
||||
|
||||
def is_timeout_exception(self, exception):
|
||||
return isinstance(exception, requests_exceptions.Timeout)
|
||||
|
||||
def is_conn_open_exception(self, exception):
|
||||
return isinstance(exception, requests_exceptions.ConnectTimeout)
|
||||
|
||||
def get_default_headers(self, session, provider, allow_overwrite_header,
|
||||
token_provider=None):
|
||||
"""Get the default headers that should be added to future requests"""
|
||||
|
@ -586,7 +563,12 @@ class ClusteredAPI(object):
|
|||
# regenerate connection pool based on token
|
||||
endpoint.regenerate_pool()
|
||||
except Exception as e:
|
||||
endpoint.set_state(EndpointState.DOWN)
|
||||
if self.nsxlib_config.exception_config.should_retry(e):
|
||||
LOG.info("Exception is retriable, endpoint stays UP")
|
||||
endpoint.set_state(EndpointState.UP)
|
||||
else:
|
||||
endpoint.set_state(EndpointState.DOWN)
|
||||
|
||||
LOG.warning("Failed to validate API cluster endpoint "
|
||||
"'%(ep)s' due to: %(err)s",
|
||||
{'ep': endpoint, 'err': e})
|
||||
|
@ -667,6 +649,20 @@ class ClusteredAPI(object):
|
|||
# slow rate at once per 33 seconds by default.
|
||||
yield EndpointConnection(endpoint, conn, conn_wait, rate_wait)
|
||||
|
||||
def _raise_http_exception_if_needed(self, response):
|
||||
# We need to inspect http codes to understand whether
|
||||
# this error is relevant for endpoint-level decisions, such
|
||||
# as ground endpoint or retry with next endpoint
|
||||
exc = nsx_client.init_http_exception_from_response(response)
|
||||
if not exc:
|
||||
# This exception is irrelevant for endpoint decisions
|
||||
return
|
||||
|
||||
exc_config = self.nsxlib_config.exception_config
|
||||
if (exc_config.should_ground_endpoint(exc) or
|
||||
exc_config.should_retry(exc)):
|
||||
raise exc
|
||||
|
||||
def _proxy_stub(self, proxy_for):
|
||||
def _call_proxy(url, *args, **kwargs):
|
||||
return self._proxy(proxy_for, url, *args, **kwargs)
|
||||
|
@ -674,7 +670,8 @@ class ClusteredAPI(object):
|
|||
|
||||
def _proxy(self, proxy_for, uri, *args, **kwargs):
|
||||
|
||||
@utils.retry_upon_none_result(self.nsxlib_config.max_attempts)
|
||||
@utils.retry_random_upon_exception_result(
|
||||
max_attempts=self.nsxlib_config.max_attempts)
|
||||
def _proxy_internal(proxy_for, uri, *args, **kwargs):
|
||||
# proxy http request call to an avail endpoint
|
||||
with self.endpoint_connection() as conn_data:
|
||||
|
@ -702,22 +699,29 @@ class ClusteredAPI(object):
|
|||
response = do_request(url, *args, **kwargs)
|
||||
endpoint.set_state(EndpointState.UP)
|
||||
|
||||
# for some status codes, we need to bring the cluster
|
||||
# down or retry API call
|
||||
self._raise_http_exception_if_needed(response)
|
||||
|
||||
return response
|
||||
except Exception as e:
|
||||
LOG.warning("Request failed due to: %s", e)
|
||||
if (not self._http_provider.is_connection_exception(e) and
|
||||
not self._http_provider.is_timeout_exception(e)):
|
||||
# only trap and retry connection & timeout errors
|
||||
raise e
|
||||
if self._http_provider.is_conn_open_exception(e):
|
||||
# unable to establish new connection - endpoint is
|
||||
# inaccessible
|
||||
exc_config = self.nsxlib_config.exception_config
|
||||
if exc_config.should_ground_endpoint(e):
|
||||
# consider endpoint inaccessible and move to next
|
||||
# endpoint
|
||||
endpoint.set_state(EndpointState.DOWN)
|
||||
|
||||
LOG.info("Connection to %s failed, checking additional "
|
||||
"connections and endpoints" % url)
|
||||
# this might be a result of server closing connection
|
||||
# return None so it will retry upto max_attempts.
|
||||
elif not exc_config.should_retry(e):
|
||||
LOG.info("Exception %s is configured as not retriable",
|
||||
e)
|
||||
raise e
|
||||
|
||||
# Returning the exception instead of raising it will cause
|
||||
# decarator to retry. If retry attempts is exceeded, this
|
||||
# same exception will be raised due to overriden reraise
|
||||
# method of RetryAttemptsExceeded
|
||||
return e
|
||||
|
||||
return _proxy_internal(proxy_for, uri, *args, **kwargs)
|
||||
|
||||
|
|
|
@ -15,9 +15,45 @@
|
|||
|
||||
from oslo_log import log
|
||||
|
||||
from requests import exceptions as requests_exceptions
|
||||
|
||||
from vmware_nsxlib.v3 import exceptions as v3_exceptions
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class ExceptionConfig(object):
|
||||
|
||||
def __init__(self):
|
||||
# When hit during API call, these exceptions will mark
|
||||
# endpoint as DOWN immediately
|
||||
# This setting has no effect on keepalive validation
|
||||
self.ground_triggers = [requests_exceptions.ConnectionError,
|
||||
requests_exceptions.Timeout]
|
||||
|
||||
# When hit during API call, these exceptions will be
|
||||
# retried with next available endpoint
|
||||
# When hit during validation, these exception will not
|
||||
# mark endpoint as DOWN
|
||||
self.retriables = [v3_exceptions.APITransactionAborted,
|
||||
v3_exceptions.CannotConnectToServer,
|
||||
v3_exceptions.ServerBusy]
|
||||
|
||||
def should_ground_endpoint(self, ex):
|
||||
for exception in self.ground_triggers:
|
||||
if isinstance(ex, exception):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def should_retry(self, ex):
|
||||
for exception in self.retriables:
|
||||
if isinstance(ex, exception):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
class NsxLibConfig(object):
|
||||
"""Class holding all the configuration parameters used by the nsxlib code.
|
||||
|
||||
|
@ -131,7 +167,8 @@ class NsxLibConfig(object):
|
|||
allow_passthrough=False,
|
||||
realization_max_attempts=50,
|
||||
realization_wait_sec=1.0,
|
||||
api_rate_limit_per_endpoint=None):
|
||||
api_rate_limit_per_endpoint=None,
|
||||
exception_config=None):
|
||||
|
||||
self.nsx_api_managers = nsx_api_managers
|
||||
self._username = username
|
||||
|
@ -160,6 +197,7 @@ class NsxLibConfig(object):
|
|||
self.realization_max_attempts = realization_max_attempts
|
||||
self.realization_wait_sec = realization_wait_sec
|
||||
self.api_rate_limit_per_endpoint = api_rate_limit_per_endpoint
|
||||
self.exception_config = exception_config or ExceptionConfig()
|
||||
|
||||
if len(nsx_api_managers) == 1 and not self.cluster_unavailable_retry:
|
||||
LOG.warning("When only one endpoint is provided, keepalive probes"
|
||||
|
|
|
@ -26,7 +26,7 @@ from tenacity import _utils as tenacity_utils
|
|||
|
||||
from vmware_nsxlib._i18n import _
|
||||
from vmware_nsxlib.v3 import constants
|
||||
from vmware_nsxlib.v3 import exceptions as nsxlib_exceptions
|
||||
from vmware_nsxlib.v3 import exceptions as nsxlib_exc
|
||||
from vmware_nsxlib.v3 import nsx_constants
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
@ -105,7 +105,7 @@ def update_tag_limits(limits):
|
|||
def _validate_resource_type_length(resource_type):
|
||||
# Add in a validation to ensure that we catch this at build time
|
||||
if len(resource_type) > MAX_RESOURCE_TYPE_LEN:
|
||||
raise nsxlib_exceptions.NsxLibInvalidInput(
|
||||
raise nsxlib_exc.NsxLibInvalidInput(
|
||||
error_message=(_('Resource type cannot exceed %(max_len)s '
|
||||
'characters: %(resource_type)s') %
|
||||
{'max_len': MAX_RESOURCE_TYPE_LEN,
|
||||
|
@ -215,6 +215,26 @@ def retry_upon_none_result(max_attempts, delay=0.5, max_delay=10,
|
|||
before=_log_before_retry, after=_log_after_retry)
|
||||
|
||||
|
||||
class RetryAttemptsExceeded(tenacity.RetryError):
|
||||
def reraise(self):
|
||||
raise self.last_attempt.result()
|
||||
|
||||
|
||||
# Retry when exception is returned by decorated function.
|
||||
# If retry attempts are exceeded, reraise the last exception.
|
||||
# This is achieved by overriding reraise method of RetryAttemptsExceeded
|
||||
def retry_random_upon_exception_result(max_attempts, delay=0.5, max_delay=10):
|
||||
wait_func = tenacity.wait_random_exponential(
|
||||
multiplier=delay, max=max_delay)
|
||||
return tenacity.retry(reraise=True,
|
||||
retry_error_cls=RetryAttemptsExceeded,
|
||||
retry=tenacity.retry_if_result(
|
||||
lambda x: isinstance(x, Exception)),
|
||||
wait=wait_func,
|
||||
stop=tenacity.stop_after_attempt(max_attempts),
|
||||
before=_log_before_retry, after=_log_after_retry)
|
||||
|
||||
|
||||
def list_match(list1, list2):
|
||||
# Check if list1 and list2 have identical elements, but relaxed on
|
||||
# dict elements where list1's dict element can be a subset of list2's
|
||||
|
@ -432,7 +452,7 @@ class NsxLibApiBase(object):
|
|||
# NSX has, we will get a 412: Precondition Failed.
|
||||
# In that case we need to re-fetch, patch the response and send
|
||||
# it again with the new revision_id
|
||||
@retry_upon_exception(nsxlib_exceptions.StaleRevision,
|
||||
@retry_upon_exception(nsxlib_exc.StaleRevision,
|
||||
max_attempts=self.max_attempts)
|
||||
def do_update():
|
||||
return self._internal_update_resource(
|
||||
|
@ -458,7 +478,7 @@ class NsxLibApiBase(object):
|
|||
|
||||
def _delete_by_path_with_retry(self, path):
|
||||
# Using internal method so we can access max_attempts in the decorator
|
||||
@retry_upon_exception(nsxlib_exceptions.StaleRevision,
|
||||
@retry_upon_exception(nsxlib_exc.StaleRevision,
|
||||
max_attempts=self.max_attempts)
|
||||
def _do_delete():
|
||||
self.client.delete(path)
|
||||
|
@ -467,7 +487,7 @@ class NsxLibApiBase(object):
|
|||
|
||||
def _create_with_retry(self, resource, body=None, headers=None):
|
||||
# Using internal method so we can access max_attempts in the decorator
|
||||
@retry_upon_exception(nsxlib_exceptions.StaleRevision,
|
||||
@retry_upon_exception(nsxlib_exc.StaleRevision,
|
||||
max_attempts=self.max_attempts)
|
||||
def _do_create():
|
||||
return self.client.create(resource, body, headers=headers)
|
||||
|
@ -489,11 +509,11 @@ class NsxLibApiBase(object):
|
|||
if len(matched_results) == 0:
|
||||
err_msg = (_("Could not find %(resource)s %(name)s") %
|
||||
{'name': name_or_id, 'resource': resource})
|
||||
raise nsxlib_exceptions.ManagerError(details=err_msg)
|
||||
raise nsxlib_exc.ManagerError(details=err_msg)
|
||||
elif len(matched_results) > 1:
|
||||
err_msg = (_("Found multiple %(resource)s named %(name)s") %
|
||||
{'name': name_or_id, 'resource': resource})
|
||||
raise nsxlib_exceptions.ManagerError(details=err_msg)
|
||||
raise nsxlib_exc.ManagerError(details=err_msg)
|
||||
|
||||
return matched_results[0].get('id')
|
||||
|
||||
|
@ -579,24 +599,24 @@ def validate_icmp_params(icmp_type, icmp_code, icmp_version=4, strict=False):
|
|||
if icmp_type:
|
||||
if (strict and icmp_type not in
|
||||
constants.IPV4_ICMP_STRICT_TYPES):
|
||||
raise nsxlib_exceptions.InvalidInput(
|
||||
raise nsxlib_exc.InvalidInput(
|
||||
operation='create_rule',
|
||||
arg_val=icmp_type,
|
||||
arg_name='icmp_type')
|
||||
if icmp_type not in constants.IPV4_ICMP_TYPES:
|
||||
raise nsxlib_exceptions.InvalidInput(
|
||||
raise nsxlib_exc.InvalidInput(
|
||||
operation='create_rule',
|
||||
arg_val=icmp_type,
|
||||
arg_name='icmp_type')
|
||||
if (icmp_code and strict and icmp_code not in
|
||||
constants.IPV4_ICMP_STRICT_TYPES[icmp_type]):
|
||||
raise nsxlib_exceptions.InvalidInput(
|
||||
raise nsxlib_exc.InvalidInput(
|
||||
operation='create_rule',
|
||||
arg_val=icmp_code,
|
||||
arg_name='icmp_code for this icmp_type')
|
||||
if (icmp_code and icmp_code not in
|
||||
constants.IPV4_ICMP_TYPES[icmp_type]):
|
||||
raise nsxlib_exceptions.InvalidInput(
|
||||
raise nsxlib_exc.InvalidInput(
|
||||
operation='create_rule',
|
||||
arg_val=icmp_code,
|
||||
arg_name='icmp_code for this icmp_type')
|
||||
|
@ -610,7 +630,7 @@ def get_l4_protocol_name(protocol_number):
|
|||
try:
|
||||
protocol_number = int(protocol_number)
|
||||
except ValueError:
|
||||
raise nsxlib_exceptions.InvalidInput(
|
||||
raise nsxlib_exc.InvalidInput(
|
||||
operation='create_rule',
|
||||
arg_val=protocol_number,
|
||||
arg_name='protocol')
|
||||
|
|
Loading…
Reference in New Issue