HA race condition test for DHCP scheduling

Add fullstack testcase for DHCP HA scheduling with fake scheduler
that selects new agent with some extra sleep for every scheduling.

Change-Id: I1045992ce9a18b37dd7dcdb46063698fad983932
Related-Bug: #1535554
changes/87/683987/7
elajkat 3 years ago
parent 19bc090b1b
commit 0041860e97
  1. 44
      neutron/tests/base.py
  2. 4
      neutron/tests/fullstack/resources/config.py
  3. 4
      neutron/tests/fullstack/resources/environment.py
  4. 0
      neutron/tests/fullstack/schedulers/__init__.py
  5. 52
      neutron/tests/fullstack/schedulers/dhcp.py
  6. 40
      neutron/tests/fullstack/test_dhcp_agent.py
  7. 46
      neutron/tests/functional/services/portforwarding/test_port_forwarding.py

@ -23,6 +23,7 @@ import inspect
import logging
import os
import os.path
import threading
import eventlet.timeout
import fixtures
@ -479,6 +480,49 @@ class BaseTestCase(DietTestCase):
self.config(group='AGENT',
root_helper_daemon=get_rootwrap_daemon_cmd())
def _simulate_concurrent_requests_process_and_raise(self, calls, args):
class SimpleThread(threading.Thread):
def __init__(self, q):
super(SimpleThread, self).__init__()
self.q = q
self.exception = None
def run(self):
try:
while not self.q.empty():
item = None
try:
item = self.q.get(False)
func, func_args = item[0], item[1]
func(*func_args)
except six.moves.queue.Empty:
pass
finally:
if item:
self.q.task_done()
except Exception as e:
self.exception = e
def get_exception(self):
return self.exception
q = six.moves.queue.Queue()
for func, func_args in zip(calls, args):
q.put_nowait((func, func_args))
threads = []
for z in range(len(calls)):
t = SimpleThread(q)
threads.append(t)
t.start()
q.join()
for t in threads:
e = t.get_exception()
if e:
raise e
class PluginFixture(fixtures.Fixture):

@ -122,6 +122,10 @@ class NeutronConfigFixture(ConfigFixture):
env_desc.placement_port
}
})
if env_desc.dhcp_scheduler_class:
self.config['DEFAULT']['dhcp_agents_per_network'] = '1'
self.config['DEFAULT']['network_scheduler_driver'] = (
env_desc.dhcp_scheduler_class)
net_helpers.set_local_port_range(CLIENT_CONN_PORT_START,
CLIENT_CONN_PORT_END)

@ -39,7 +39,8 @@ class EnvironmentDescription(object):
agent_down_time=75, router_scheduler=None,
global_mtu=constants.DEFAULT_NETWORK_MTU,
debug_iptables=False, log=False, report_bandwidths=False,
has_placement=False, placement_port=None):
has_placement=False, placement_port=None,
dhcp_scheduler_class=None,):
self.network_type = network_type
self.l2_pop = l2_pop
self.qos = qos
@ -55,6 +56,7 @@ class EnvironmentDescription(object):
self.report_bandwidths = report_bandwidths
self.has_placement = has_placement
self.placement_port = placement_port
self.dhcp_scheduler_class = dhcp_scheduler_class
if self.qos:
self.service_plugins += ',qos'
if self.log:

@ -0,0 +1,52 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from neutron.scheduler import base_scheduler
from neutron.scheduler import dhcp_agent_scheduler
class AlwaysTheOtherAgentScheduler(base_scheduler.BaseChanceScheduler,
dhcp_agent_scheduler.AutoScheduler):
"""Choose always different agent that the ones selected previously
This dhcp agent scheduler intended use is only in fullstack tests.
The goal is to ensure the concurrently running schedulings to select
different agents so the over-scheduling becomes visible in the number
of agents scheduled to the network.
To use this scheduler initialize your EnvironmentDescription with
dhcp_scheduler_class='neutron.tests.fullstack.test_dhcp_agent.'
'AlwaysTheOtherAgentScheduler'
"""
def __init__(self):
self.last_selected_agent_ids = []
super(AlwaysTheOtherAgentScheduler, self).__init__(
dhcp_agent_scheduler.DhcpFilter())
def select(self, plugin, context, resource_hostable_agents,
resource_hosted_agents, num_agents_needed):
possible_agents = []
for agent in resource_hostable_agents:
if agent.id in self.last_selected_agent_ids:
continue
else:
possible_agents.append(agent)
num_agents = min(len(possible_agents), num_agents_needed)
self.last_selected_agent_ids = [
ag.id for ag in possible_agents[0:num_agents]]
# Note(lajoskatona): To make the race window big enough let's delay
# the actual scheduling.
time.sleep(5)
return possible_agents[0:num_agents_needed]

@ -36,6 +36,8 @@ class BaseDhcpAgentTest(base.BaseFullStackTestCase):
(constants.AGENT_TYPE_LINUXBRIDGE,
{'l2_agent_type': constants.AGENT_TYPE_LINUXBRIDGE})
]
boot_vm_for_test = True
dhcp_scheduler_class = None
def setUp(self):
host_descriptions = [
@ -48,12 +50,15 @@ class BaseDhcpAgentTest(base.BaseFullStackTestCase):
environment.EnvironmentDescription(
l2_pop=False,
arp_responder=False,
agent_down_time=self.agent_down_time),
agent_down_time=self.agent_down_time,
dhcp_scheduler_class=self.dhcp_scheduler_class,
),
host_descriptions)
super(BaseDhcpAgentTest, self).setUp(env)
self.project_id = uuidutils.generate_uuid()
self._create_network_subnet_and_vm()
if self.boot_vm_for_test:
self._create_network_subnet_and_vm()
def _spawn_vm(self):
host = random.choice(self.environment.hosts)
@ -193,3 +198,34 @@ class TestDhcpAgentHA(BaseDhcpAgentTest):
# check if new vm will get IP from DHCP agent which is still alive
new_vm = self._spawn_vm()
new_vm.block_until_dhcp_config_done()
class TestDhcpAgentHARaceCondition(BaseDhcpAgentTest):
agent_down_time = 30
number_of_hosts = 2
boot_vm_for_test = False
dhcp_scheduler_class = ('neutron.tests.fullstack.schedulers.dhcp.'
'AlwaysTheOtherAgentScheduler')
def setUp(self):
super(TestDhcpAgentHARaceCondition, self).setUp()
self._create_network_with_multiple_subnets()
def _create_network_with_multiple_subnets(self):
self.network = self.safe_client.create_network(self.project_id)
funcs = []
args = []
for i in range(4):
funcs.append(self.safe_client.create_subnet)
args.append((
self.project_id, self.network['id'], '10.0.%s.0/24' % i,
'10.0.%s.1' % i, 'subnet-test-%s' % i, True
))
self._simulate_concurrent_requests_process_and_raise(funcs, args)
def test_dhcp_agent_ha_with_race_condition(self):
network_dhcp_agents = self.client.list_dhcp_agent_hosting_networks(
self.network['id'])['agents']
self.assertEqual(1, len(network_dhcp_agents))

@ -10,8 +10,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import threading
import mock
from neutron_lib.api.definitions import floating_ip_port_forwarding as apidef
from neutron_lib.callbacks import exceptions as c_exc
@ -20,7 +18,6 @@ from neutron_lib.exceptions import l3 as lib_l3_exc
from neutron_lib.plugins import constants as plugin_constants
from neutron_lib.plugins import directory
from oslo_utils import uuidutils
from six.moves import queue
from neutron.services.portforwarding.common import exceptions as pf_exc
from neutron.services.portforwarding import pf_plugin
@ -375,49 +372,6 @@ class PortForwardingTestCase(PortForwardingTestCaseBase):
self.pf_plugin.delete_floatingip_port_forwarding,
self.context, res['id'], uuidutils.generate_uuid())
def _simulate_concurrent_requests_process_and_raise(
self, funcs, args_list):
class SimpleThread(threading.Thread):
def __init__(self, q):
super(SimpleThread, self).__init__()
self.q = q
self.exception = None
def run(self):
try:
while not self.q.empty():
item = None
try:
item = self.q.get(False)
func, func_args = item[0], item[1]
func(*func_args)
except queue.Empty:
pass
finally:
if item:
self.q.task_done()
except Exception as e:
self.exception = e
def get_exception(self):
return self.exception
q = queue.Queue()
for func, func_args in zip(funcs, args_list):
q.put_nowait((func, func_args))
threads = []
for _ in range(len(funcs)):
t = SimpleThread(q)
threads.append(t)
t.start()
q.join()
for t in threads:
e = t.get_exception()
if e:
raise e
def test_concurrent_create_port_forwarding_delete_fip(self):
func1 = self.pf_plugin.create_floatingip_port_forwarding

Loading…
Cancel
Save