Merge "Add retries to fetching the zookeeper server version"
This commit is contained in:
@@ -25,6 +25,9 @@ futures>=2.1.6
|
||||
# Used for structured input validation
|
||||
jsonschema>=2.0.0,<3.0.0
|
||||
|
||||
# For utility retries...
|
||||
retrying>=1.2.3,!=1.3.0
|
||||
|
||||
# For common utilities
|
||||
oslo.utils>=1.2.0 # Apache-2.0
|
||||
oslo.serialization>=1.2.0 # Apache-2.0
|
||||
|
||||
@@ -19,6 +19,9 @@ stevedore>=1.1.0 # Apache-2.0
|
||||
# Used for structured input validation
|
||||
jsonschema>=2.0.0,<3.0.0
|
||||
|
||||
# For utility retries...
|
||||
retrying>=1.2.3,!=1.3.0
|
||||
|
||||
# For common utilities
|
||||
oslo.utils>=1.2.0 # Apache-2.0
|
||||
oslo.serialization>=1.2.0 # Apache-2.0
|
||||
|
||||
56
taskflow/tests/unit/test_utils_kazoo_utils.py
Normal file
56
taskflow/tests/unit/test_utils_kazoo_utils.py
Normal file
@@ -0,0 +1,56 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from kazoo import client
|
||||
from kazoo import exceptions as k_exc
|
||||
from six.moves import range as compat_range
|
||||
|
||||
from taskflow import test
|
||||
from taskflow.test import mock
|
||||
from taskflow.utils import kazoo_utils as ku
|
||||
|
||||
_FAKE_ZK_VER = (3, 4, 0)
|
||||
|
||||
|
||||
def _iter_succeed_after(attempts):
|
||||
for i in compat_range(0, attempts):
|
||||
if i % 2 == 0:
|
||||
yield ValueError("Broken")
|
||||
else:
|
||||
yield AttributeError("Broken")
|
||||
yield _FAKE_ZK_VER
|
||||
|
||||
|
||||
class KazooUtilTest(test.TestCase):
|
||||
def test_flakey_version_fetch_fail(self):
|
||||
m = mock.create_autospec(client.KazooClient, instance=True)
|
||||
m.server_version.side_effect = _iter_succeed_after(11)
|
||||
self.assertRaises(k_exc.KazooException,
|
||||
ku.fetch_server_version, m, 10)
|
||||
self.assertEqual(10, m.server_version.call_count)
|
||||
|
||||
def test_flakey_version_fetch_fail_truncated(self):
|
||||
m = mock.create_autospec(client.KazooClient, instance=True)
|
||||
m.server_version.side_effect = [None, [], "", [1]]
|
||||
self.assertRaises(k_exc.KazooException,
|
||||
ku.fetch_server_version, m, 4)
|
||||
self.assertEqual(4, m.server_version.call_count)
|
||||
|
||||
def test_flakey_version_fetch_pass(self):
|
||||
m = mock.create_autospec(client.KazooClient, instance=True)
|
||||
m.server_version.side_effect = _iter_succeed_after(4)
|
||||
self.assertEqual((3, 4, 0), ku.fetch_server_version(m, 5))
|
||||
self.assertEqual(5, m.server_version.call_count)
|
||||
@@ -17,6 +17,7 @@
|
||||
from kazoo import client
|
||||
from kazoo import exceptions as k_exc
|
||||
from oslo_utils import reflection
|
||||
import retrying
|
||||
import six
|
||||
from six.moves import zip as compat_zip
|
||||
|
||||
@@ -138,7 +139,32 @@ def finalize_client(client):
|
||||
pass
|
||||
|
||||
|
||||
def check_compatible(client, min_version=None, max_version=None):
|
||||
def fetch_server_version(client, fetch_attempts):
|
||||
"""Fetches the server version but also handles its apparent flakiness.
|
||||
|
||||
The issue @ https://github.com/python-zk/kazoo/issues/274 explains
|
||||
why this happens and how it may become better at some point in the
|
||||
future; once that issue is addressed we should be able to handle this
|
||||
better...
|
||||
"""
|
||||
# If for some reason the parsed version is not composed of a 'major.minor'
|
||||
# version we likely got a truncated value back and we should try again...
|
||||
retry_on_result = lambda version: not version or len(version) <= 1
|
||||
retry_on_exception = lambda exc: isinstance(exc, (AttributeError,
|
||||
ValueError))
|
||||
r = retrying.Retrying(retry_on_exception=retry_on_exception,
|
||||
stop_max_attempt_number=fetch_attempts,
|
||||
retry_on_result=retry_on_result)
|
||||
try:
|
||||
return r.call(client.server_version)
|
||||
except (AttributeError, ValueError, retrying.RetryError):
|
||||
raise k_exc.KazooException("Unable to fetch useable server"
|
||||
" version after trying %s times"
|
||||
% (fetch_attempts))
|
||||
|
||||
|
||||
def check_compatible(client, min_version=None, max_version=None,
|
||||
version_fetch_attempts=3):
|
||||
"""Checks if a kazoo client is backed by a zookeeper server version.
|
||||
|
||||
This check will verify that the zookeeper server version that the client
|
||||
@@ -148,7 +174,7 @@ def check_compatible(client, min_version=None, max_version=None):
|
||||
"""
|
||||
server_version = None
|
||||
if min_version:
|
||||
server_version = tuple((int(a) for a in client.server_version()))
|
||||
server_version = fetch_server_version(client, version_fetch_attempts)
|
||||
min_version = tuple((int(a) for a in min_version))
|
||||
if server_version < min_version:
|
||||
pretty_server_version = ".".join([str(a) for a in server_version])
|
||||
@@ -159,7 +185,8 @@ def check_compatible(client, min_version=None, max_version=None):
|
||||
min_version))
|
||||
if max_version:
|
||||
if server_version is None:
|
||||
server_version = tuple((int(a) for a in client.server_version()))
|
||||
server_version = fetch_server_version(client,
|
||||
version_fetch_attempts)
|
||||
max_version = tuple((int(a) for a in max_version))
|
||||
if server_version > max_version:
|
||||
pretty_server_version = ".".join([str(a) for a in server_version])
|
||||
|
||||
Reference in New Issue
Block a user