Separate wait_until to standalone function

It makes wait_until more usable outside of unittest.TestCase classes.
Part of this patch is renaming pinger module to helpers thus we can have
helpers module with useful utilities for testing.

Change-Id: Ie8e4854c23aa3739b0df618db1b3944466d64bd2
Related-Bug: 1243216
This commit is contained in:
Jakub Libosvar 2014-09-30 18:13:50 +02:00
parent ef86198181
commit f1277a4036
4 changed files with 32 additions and 18 deletions

View File

@ -19,7 +19,7 @@ from neutron.agent.linux import ovs_lib
from neutron.agent.linux import utils
from neutron.common import constants as n_const
from neutron.openstack.common import uuidutils
from neutron.tests.functional.agent.linux import pinger
from neutron.tests.functional.agent.linux import helpers
from neutron.tests.functional import base as functional_base
@ -97,7 +97,7 @@ class BaseIPVethTestCase(BaseLinuxTestCase):
def setUp(self):
super(BaseIPVethTestCase, self).setUp()
self.check_sudo_enabled()
self.pinger = pinger.Pinger(self)
self.pinger = helpers.Pinger(self)
@staticmethod
def _set_ip_up(device, cidr, broadcast, ip_version=4):

View File

@ -12,6 +12,23 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
def wait_until_true(predicate, timeout=60, sleep=1, exception=None):
"""
Wait until callable predicate is evaluated as True
:param predicate: Callable deciding whether waiting should continue.
Best practice is to instantiate predicate with functools.partial()
:param timeout: Timeout in seconds how long should function wait.
:param sleep: Polling interval for results in seconds.
:param exception: Exception class for eventlet.Timeout.
(see doc for eventlet.Timeout for more information)
"""
with eventlet.timeout.Timeout(timeout, exception):
while not predicate():
eventlet.sleep(sleep)
class Pinger(object):

View File

@ -14,6 +14,7 @@
# under the License.
import copy
import functools
import fixtures
import mock
@ -29,6 +30,7 @@ from neutron.openstack.common import log as logging
from neutron.openstack.common import uuidutils
from neutron.tests.common.agents import l3_agent as l3_test_agent
from neutron.tests.functional.agent.linux import base
from neutron.tests.functional.agent.linux import helpers
from neutron.tests.unit import test_l3_agent
LOG = logging.getLogger(__name__)
@ -233,16 +235,19 @@ class L3AgentTestCase(L3AgentTestFramework):
router = self.manage_router(self.agent, router_info)
if enable_ha:
self.wait_until(lambda: router.ha_state == 'master')
helpers.wait_until_true(lambda: router.ha_state == 'master')
# Keepalived notifies of a state transition when it starts,
# not when it ends. Thus, we have to wait until keepalived finishes
# configuring everything. We verify this by waiting until the last
# device has an IP address.
device = router.router[l3_constants.INTERFACE_KEY][-1]
self.wait_until(self.device_exists_with_ip_mac, device,
self.agent.get_internal_device_name,
router.ns_name)
device_exists = functools.partial(
self.device_exists_with_ip_mac,
device,
self.agent.get_internal_device_name,
router.ns_name)
helpers.wait_until_true(device_exists)
self.assertTrue(self._namespace_exists(router))
self.assertTrue(self._metadata_proxy_exists(self.agent.conf, router))
@ -347,8 +352,8 @@ class L3HATestFramework(L3AgentTestFramework):
router2 = self.manage_router(self.failover_agent, router_info_2)
self.wait_until(lambda: router1.ha_state == 'master')
self.wait_until(lambda: router2.ha_state == 'backup')
helpers.wait_until_true(lambda: router1.ha_state == 'master')
helpers.wait_until_true(lambda: router2.ha_state == 'backup')
device_name = self.agent.get_ha_device_name(
router1.router[l3_constants.HA_INTERFACE_KEY]['id'])
@ -356,5 +361,5 @@ class L3HATestFramework(L3AgentTestFramework):
router1.ns_name)
ha_device.link.set_down()
self.wait_until(lambda: router2.ha_state == 'master')
self.wait_until(lambda: router1.ha_state == 'fault')
helpers.wait_until_true(lambda: router2.ha_state == 'master')
helpers.wait_until_true(lambda: router1.ha_state == 'fault')

View File

@ -14,14 +14,11 @@
# under the License.
import os
import time
from neutron.tests import base
SUDO_CMD = 'sudo -n'
TIMEOUT = 60
SLEEP_INTERVAL = 1
class BaseSudoTestCase(base.BaseTestCase):
@ -58,8 +55,3 @@ class BaseSudoTestCase(base.BaseTestCase):
def check_sudo_enabled(self):
if not self.sudo_enabled:
self.skipTest('testing with sudo is not enabled')
def wait_until(self, predicate, *args, **kwargs):
with self.assert_max_execution_time(TIMEOUT):
while not predicate(*args, **kwargs):
time.sleep(SLEEP_INTERVAL)