Merge "Adding ability to retry clients calls"
This commit is contained in:
commit
f96a28023f
@ -127,6 +127,7 @@ def list_opts():
|
|||||||
from sahara.service.edp import job_utils
|
from sahara.service.edp import job_utils
|
||||||
from sahara.service import periodic
|
from sahara.service import periodic
|
||||||
from sahara.utils import cluster_progress_ops as cpo
|
from sahara.utils import cluster_progress_ops as cpo
|
||||||
|
from sahara.utils.openstack import base
|
||||||
from sahara.utils.openstack import heat
|
from sahara.utils.openstack import heat
|
||||||
from sahara.utils.openstack import neutron
|
from sahara.utils.openstack import neutron
|
||||||
from sahara.utils.openstack import nova
|
from sahara.utils.openstack import nova
|
||||||
@ -151,7 +152,8 @@ def list_opts():
|
|||||||
periodic.periodic_opts,
|
periodic.periodic_opts,
|
||||||
proxy.opts,
|
proxy.opts,
|
||||||
cpo.event_log_opts,
|
cpo.event_log_opts,
|
||||||
wsgi.wsgi_opts)),
|
wsgi.wsgi_opts,
|
||||||
|
base.opts)),
|
||||||
(poll_utils.timeouts.name,
|
(poll_utils.timeouts.name,
|
||||||
itertools.chain(poll_utils.timeouts_opts)),
|
itertools.chain(poll_utils.timeouts_opts)),
|
||||||
(api.conductor_group.name,
|
(api.conductor_group.name,
|
||||||
@ -167,7 +169,9 @@ def list_opts():
|
|||||||
(swift.swift_group.name,
|
(swift.swift_group.name,
|
||||||
itertools.chain(swift.opts)),
|
itertools.chain(swift.opts)),
|
||||||
(keystone.keystone_group.name,
|
(keystone.keystone_group.name,
|
||||||
itertools.chain(keystone.ssl_opts))
|
itertools.chain(keystone.ssl_opts)),
|
||||||
|
(base.retries.name,
|
||||||
|
itertools.chain(base.opts))
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
@ -359,3 +359,14 @@ class UpdateFailedException(SaharaException):
|
|||||||
self.message = message
|
self.message = message
|
||||||
self.message = self.message % value
|
self.message = self.message % value
|
||||||
super(UpdateFailedException, self).__init__()
|
super(UpdateFailedException, self).__init__()
|
||||||
|
|
||||||
|
|
||||||
|
class MaxRetriesExceeded(SaharaException):
|
||||||
|
code = "MAX_RETRIES_EXCEEDED"
|
||||||
|
message = _("Operation %(operation)s wasn't executed correctly after "
|
||||||
|
"%(attempts)d attempts")
|
||||||
|
|
||||||
|
def __init__(self, attempts, operation):
|
||||||
|
self.message = self.message % {'operation': operation,
|
||||||
|
'attempts': attempts}
|
||||||
|
super(MaxRetriesExceeded, self).__init__()
|
||||||
|
@ -13,6 +13,15 @@
|
|||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
|
||||||
|
from cinderclient import exceptions as cinder_exc
|
||||||
|
from heatclient import exc as heat_exc
|
||||||
|
from keystoneclient import exceptions as keystone_exc
|
||||||
|
import mock
|
||||||
|
from neutronclient.common import exceptions as neutron_exc
|
||||||
|
from novaclient import exceptions as nova_exc
|
||||||
|
|
||||||
|
from sahara import exceptions as sahara_exc
|
||||||
from sahara.tests.unit import base as testbase
|
from sahara.tests.unit import base as testbase
|
||||||
from sahara.utils.openstack import base
|
from sahara.utils.openstack import base
|
||||||
|
|
||||||
@ -82,3 +91,105 @@ class AuthUrlTest(testbase.SaharaTestCase):
|
|||||||
_assert("https://127.0.0.1:8080/v3/")
|
_assert("https://127.0.0.1:8080/v3/")
|
||||||
_assert("https://127.0.0.1:8080/v42")
|
_assert("https://127.0.0.1:8080/v42")
|
||||||
_assert("https://127.0.0.1:8080/v42/")
|
_assert("https://127.0.0.1:8080/v42/")
|
||||||
|
|
||||||
|
|
||||||
|
class ExecuteWithRetryTest(testbase.SaharaTestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(ExecuteWithRetryTest, self).setUp()
|
||||||
|
self.fake_client_call = mock.MagicMock()
|
||||||
|
self.fake_client_call.__name__ = 'fake_client_call'
|
||||||
|
self.override_config('retries_number', 2, 'retries')
|
||||||
|
|
||||||
|
@mock.patch('sahara.context.sleep')
|
||||||
|
def _check_error_without_retry(self, error, code, m_sleep):
|
||||||
|
self.fake_client_call.side_effect = error(code)
|
||||||
|
|
||||||
|
self.assertRaises(error, base.execute_with_retries,
|
||||||
|
self.fake_client_call)
|
||||||
|
self.assertEqual(1, self.fake_client_call.call_count)
|
||||||
|
self.fake_client_call.reset_mock()
|
||||||
|
|
||||||
|
@mock.patch('sahara.context.sleep')
|
||||||
|
def _check_error_with_retry(self, error, code, m_sleep):
|
||||||
|
self.fake_client_call.side_effect = error(code)
|
||||||
|
|
||||||
|
self.assertRaises(sahara_exc.MaxRetriesExceeded,
|
||||||
|
base.execute_with_retries, self.fake_client_call)
|
||||||
|
self.assertEqual(3, self.fake_client_call.call_count)
|
||||||
|
self.fake_client_call.reset_mock()
|
||||||
|
|
||||||
|
def test_novaclient_calls_without_retry(self):
|
||||||
|
# check that following errors will not be retried
|
||||||
|
self._check_error_without_retry(nova_exc.BadRequest, 400)
|
||||||
|
self._check_error_without_retry(nova_exc.Unauthorized, 401)
|
||||||
|
self._check_error_without_retry(nova_exc.Forbidden, 403)
|
||||||
|
self._check_error_without_retry(nova_exc.NotFound, 404)
|
||||||
|
self._check_error_without_retry(nova_exc.MethodNotAllowed, 405)
|
||||||
|
self._check_error_without_retry(nova_exc.Conflict, 409)
|
||||||
|
self._check_error_without_retry(nova_exc.HTTPNotImplemented, 501)
|
||||||
|
|
||||||
|
def test_novaclient_calls_with_retry(self):
|
||||||
|
# check that following errors will be retried
|
||||||
|
self._check_error_with_retry(nova_exc.OverLimit, 413)
|
||||||
|
self._check_error_with_retry(nova_exc.RateLimit, 429)
|
||||||
|
|
||||||
|
def test_cinderclient_calls_without_retry(self):
|
||||||
|
# check that following errors will not be retried
|
||||||
|
self._check_error_without_retry(cinder_exc.BadRequest, 400)
|
||||||
|
self._check_error_without_retry(cinder_exc.Unauthorized, 401)
|
||||||
|
self._check_error_without_retry(cinder_exc.Forbidden, 403)
|
||||||
|
self._check_error_without_retry(cinder_exc.NotFound, 404)
|
||||||
|
self._check_error_without_retry(nova_exc.HTTPNotImplemented, 501)
|
||||||
|
|
||||||
|
def test_cinderclient_calls_with_retry(self):
|
||||||
|
# check that following error will be retried
|
||||||
|
self._check_error_with_retry(cinder_exc.OverLimit, 413)
|
||||||
|
|
||||||
|
def test_neutronclient_calls_without_retry(self):
|
||||||
|
# check that following errors will not be retried
|
||||||
|
self._check_error_without_retry(neutron_exc.BadRequest, 400)
|
||||||
|
self._check_error_without_retry(neutron_exc.Forbidden, 403)
|
||||||
|
self._check_error_without_retry(neutron_exc.NotFound, 404)
|
||||||
|
self._check_error_without_retry(neutron_exc.Conflict, 409)
|
||||||
|
|
||||||
|
def test_neutronclient_calls_with_retry(self):
|
||||||
|
# check that following errors will be retried
|
||||||
|
self._check_error_with_retry(neutron_exc.InternalServerError, 500)
|
||||||
|
self._check_error_with_retry(neutron_exc.ServiceUnavailable, 503)
|
||||||
|
|
||||||
|
def test_heatclient_calls_without_retry(self):
|
||||||
|
# check that following errors will not be retried
|
||||||
|
self._check_error_without_retry(heat_exc.HTTPBadRequest, 400)
|
||||||
|
self._check_error_without_retry(heat_exc.HTTPUnauthorized, 401)
|
||||||
|
self._check_error_without_retry(heat_exc.HTTPForbidden, 403)
|
||||||
|
self._check_error_without_retry(heat_exc.HTTPNotFound, 404)
|
||||||
|
self._check_error_without_retry(heat_exc.HTTPMethodNotAllowed, 405)
|
||||||
|
self._check_error_without_retry(heat_exc.HTTPConflict, 409)
|
||||||
|
self._check_error_without_retry(heat_exc.HTTPUnsupported, 415)
|
||||||
|
self._check_error_without_retry(heat_exc.HTTPNotImplemented, 501)
|
||||||
|
|
||||||
|
def test_heatclient_calls_with_retry(self):
|
||||||
|
# check that following errors will be retried
|
||||||
|
self._check_error_with_retry(heat_exc.HTTPInternalServerError, 500)
|
||||||
|
self._check_error_with_retry(heat_exc.HTTPBadGateway, 502)
|
||||||
|
self._check_error_with_retry(heat_exc.HTTPServiceUnavailable, 503)
|
||||||
|
|
||||||
|
def test_keystoneclient_calls_without_retry(self):
|
||||||
|
# check that following errors will not be retried
|
||||||
|
self._check_error_without_retry(keystone_exc.BadRequest, 400)
|
||||||
|
self._check_error_without_retry(keystone_exc.Unauthorized, 401)
|
||||||
|
self._check_error_without_retry(keystone_exc.Forbidden, 403)
|
||||||
|
self._check_error_without_retry(keystone_exc.NotFound, 404)
|
||||||
|
self._check_error_without_retry(keystone_exc.MethodNotAllowed, 405)
|
||||||
|
self._check_error_without_retry(keystone_exc.Conflict, 409)
|
||||||
|
self._check_error_without_retry(keystone_exc.UnsupportedMediaType, 415)
|
||||||
|
self._check_error_without_retry(keystone_exc.HttpNotImplemented, 501)
|
||||||
|
|
||||||
|
def test_keystoneclient_calls_with_retry(self):
|
||||||
|
# check that following errors will be retried
|
||||||
|
self._check_error_with_retry(keystone_exc.RequestTimeout, 408)
|
||||||
|
self._check_error_with_retry(keystone_exc.InternalServerError, 500)
|
||||||
|
self._check_error_with_retry(keystone_exc.BadGateway, 502)
|
||||||
|
self._check_error_with_retry(keystone_exc.ServiceUnavailable, 503)
|
||||||
|
self._check_error_with_retry(keystone_exc.GatewayTimeout, 504)
|
||||||
|
@ -14,14 +14,37 @@
|
|||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
|
from oslo_log import log as logging
|
||||||
from oslo_serialization import jsonutils as json
|
from oslo_serialization import jsonutils as json
|
||||||
from six.moves.urllib import parse as urlparse
|
from six.moves.urllib import parse as urlparse
|
||||||
|
|
||||||
from sahara import context
|
from sahara import context
|
||||||
from sahara import exceptions as ex
|
from sahara import exceptions as ex
|
||||||
from sahara.i18n import _
|
from sahara.i18n import _
|
||||||
|
from sahara.i18n import _LE
|
||||||
|
from sahara.i18n import _LW
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# List of the errors, that can be retried
|
||||||
|
ERRORS_TO_RETRY = [408, 413, 429, 500, 502, 503, 504]
|
||||||
|
|
||||||
|
opts = [
|
||||||
|
cfg.IntOpt('retries_number',
|
||||||
|
default=5,
|
||||||
|
help='Number of times to retry the request to client before '
|
||||||
|
'failing'),
|
||||||
|
cfg.IntOpt('retry_after',
|
||||||
|
default=10,
|
||||||
|
help='Time between the retries to client (in seconds).')
|
||||||
|
]
|
||||||
|
|
||||||
|
retries = cfg.OptGroup(name='retries',
|
||||||
|
title='OpenStack clients calls retries')
|
||||||
|
|
||||||
CONF = cfg.CONF
|
CONF = cfg.CONF
|
||||||
|
CONF.register_group(retries)
|
||||||
|
CONF.register_opts(opts, group=retries)
|
||||||
|
|
||||||
|
|
||||||
def url_for(service_catalog, service_type, admin=False, endpoint_type=None):
|
def url_for(service_catalog, service_type, admin=False, endpoint_type=None):
|
||||||
@ -84,3 +107,31 @@ def retrieve_auth_url():
|
|||||||
version = 'v3' if CONF.use_identity_api_v3 else 'v2.0'
|
version = 'v3' if CONF.use_identity_api_v3 else 'v2.0'
|
||||||
|
|
||||||
return "%s://%s:%s/%s/" % (info.scheme, info.hostname, info.port, version)
|
return "%s://%s:%s/%s/" % (info.scheme, info.hostname, info.port, version)
|
||||||
|
|
||||||
|
|
||||||
|
def execute_with_retries(method, *args, **kwargs):
|
||||||
|
attempts = CONF.retries.retries_number + 1
|
||||||
|
while attempts > 0:
|
||||||
|
try:
|
||||||
|
return method(*args, **kwargs)
|
||||||
|
except Exception as e:
|
||||||
|
error_code = getattr(e, 'http_status', None) or getattr(
|
||||||
|
e, 'status_code', None) or getattr(e, 'code', None)
|
||||||
|
if error_code in ERRORS_TO_RETRY:
|
||||||
|
LOG.warning(_LW('Occasional error occured during "{method}" '
|
||||||
|
'execution: {error_msg} ({error_code}). '
|
||||||
|
'Operation will be retried.').format(
|
||||||
|
method=method.__name__,
|
||||||
|
error_msg=e,
|
||||||
|
error_code=error_code))
|
||||||
|
attempts -= 1
|
||||||
|
retry_after = getattr(e, 'retry_after', 0)
|
||||||
|
context.sleep(max(retry_after, CONF.retries.retry_after))
|
||||||
|
else:
|
||||||
|
LOG.error(_LE('Permanent error occured during "{method}" '
|
||||||
|
'execution: {error_msg}.').format(
|
||||||
|
method=method.__name__,
|
||||||
|
error_msg=e))
|
||||||
|
raise e
|
||||||
|
else:
|
||||||
|
raise ex.MaxRetriesExceeded(attempts, method.__name__)
|
||||||
|
Loading…
Reference in New Issue
Block a user