Check metadata iptables chains during functional test

The L3 agent functional test creates a router and asserts that
the proxy process is up. It will now also check that the NAT
and filter rules were added via the iptables manager.

This is to allow us to move the metadata management out of the
L3 agent and into its own L3 agent driver.

Change-Id: Iaa9b475a4294db96a9645829d362e090b61ed3a2
Partially-implements: blueprint restructure-l3-agent
This commit is contained in:
Assaf Muller 2014-11-23 14:32:19 +02:00
parent 2e93af3cfd
commit 24a703c386
2 changed files with 24 additions and 6 deletions

View File

@ -242,9 +242,6 @@ class IptablesTable(object):
return [rule for rule in self.rules
if rule.chain == chain and rule.wrap == wrap]
def is_chain_empty(self, chain, wrap=True):
return not self._get_chain_rules(chain, wrap)
def empty_chain(self, chain, wrap=True):
"""Remove all rules from a chain."""
chained_rules = self._get_chain_rules(chain, wrap)
@ -362,12 +359,15 @@ class IptablesManager(object):
self.ipv4['nat'].add_chain('float-snat')
self.ipv4['nat'].add_rule('snat', '-j $float-snat')
def is_chain_empty(self, table, chain, ip_version=4, wrap=True):
def get_chain(self, table, chain, ip_version=4, wrap=True):
try:
requested_table = {4: self.ipv4, 6: self.ipv6}[ip_version][table]
except KeyError:
return True
return requested_table.is_chain_empty(chain, wrap)
return []
return requested_table._get_chain_rules(chain, wrap)
def is_chain_empty(self, table, chain, ip_version=4, wrap=True):
return not self.get_chain(table, chain, ip_version, wrap)
def defer_apply_on(self):
self.iptables_apply_deferred = True

View File

@ -252,6 +252,7 @@ class L3AgentTestCase(L3AgentTestFramework):
self._assert_floating_ips(router)
self._assert_snat_chains(router)
self._assert_floating_ip_chains(router)
self._assert_metadata_chains(router)
if enable_ha:
self._assert_ha_device(router)
@ -309,6 +310,23 @@ class L3AgentTestCase(L3AgentTestFramework):
self.assertFalse(router.iptables_manager.is_chain_empty(
'nat', 'float-snat'))
def _get_rule(self, iptables_manager, table, chain, predicate):
rules = iptables_manager.get_chain(table, chain)
result = next(rule for rule in rules if predicate(rule))
return result
def _assert_metadata_chains(self, router):
metadata_port_filter = lambda rule: (
str(self.agent.conf.metadata_port) in rule.rule)
self.assertTrue(self._get_rule(router.iptables_manager,
'nat',
'PREROUTING',
metadata_port_filter))
self.assertTrue(self._get_rule(router.iptables_manager,
'filter',
'INPUT',
metadata_port_filter))
def _assert_router_does_not_exist(self, router):
# If the namespace assertion succeeds
# then the devices and iptable rules have also been deleted,