Browse Source

Merge "Check metadata iptables chains during functional test"

changes/46/142946/5
Jenkins 7 years ago
committed by Gerrit Code Review
parent
commit
0a46f3633e
  1. 12
      neutron/agent/linux/iptables_manager.py
  2. 18
      neutron/tests/functional/agent/test_l3_agent.py

12
neutron/agent/linux/iptables_manager.py

@ -242,9 +242,6 @@ class IptablesTable(object):
return [rule for rule in self.rules
if rule.chain == chain and rule.wrap == wrap]
def is_chain_empty(self, chain, wrap=True):
return not self._get_chain_rules(chain, wrap)
def empty_chain(self, chain, wrap=True):
"""Remove all rules from a chain."""
chained_rules = self._get_chain_rules(chain, wrap)
@ -362,12 +359,15 @@ class IptablesManager(object):
self.ipv4['nat'].add_chain('float-snat')
self.ipv4['nat'].add_rule('snat', '-j $float-snat')
def is_chain_empty(self, table, chain, ip_version=4, wrap=True):
def get_chain(self, table, chain, ip_version=4, wrap=True):
try:
requested_table = {4: self.ipv4, 6: self.ipv6}[ip_version][table]
except KeyError:
return True
return requested_table.is_chain_empty(chain, wrap)
return []
return requested_table._get_chain_rules(chain, wrap)
def is_chain_empty(self, table, chain, ip_version=4, wrap=True):
return not self.get_chain(table, chain, ip_version, wrap)
def defer_apply_on(self):
self.iptables_apply_deferred = True

18
neutron/tests/functional/agent/test_l3_agent.py

@ -287,6 +287,7 @@ class L3AgentTestCase(L3AgentTestFramework):
self._assert_floating_ips(router)
self._assert_snat_chains(router)
self._assert_floating_ip_chains(router)
self._assert_metadata_chains(router)
if enable_ha:
self._assert_ha_device(router)
@ -344,6 +345,23 @@ class L3AgentTestCase(L3AgentTestFramework):
self.assertFalse(router.iptables_manager.is_chain_empty(
'nat', 'float-snat'))
def _get_rule(self, iptables_manager, table, chain, predicate):
rules = iptables_manager.get_chain(table, chain)
result = next(rule for rule in rules if predicate(rule))
return result
def _assert_metadata_chains(self, router):
metadata_port_filter = lambda rule: (
str(self.agent.conf.metadata_port) in rule.rule)
self.assertTrue(self._get_rule(router.iptables_manager,
'nat',
'PREROUTING',
metadata_port_filter))
self.assertTrue(self._get_rule(router.iptables_manager,
'filter',
'INPUT',
metadata_port_filter))
def _assert_router_does_not_exist(self, router):
# If the namespace assertion succeeds
# then the devices and iptable rules have also been deleted,

Loading…
Cancel
Save