Browse Source

Remove ebtables_driver/manager dead code

Previous changes[1] have been merged as enablers[2] to fix the bug
1274034 but an alternative solution has been choosen and now we can
consider the introduced code as dead code.

This changes removes [2], associated tests and rootwrap filters.

[1] I9ef57a86b1a1c1fa4ba1a034c920f23cb40072c0
    I3c66e92cbe8883dcad843ad243388def3a96dbe5
[2] neutron.agent.linux.ebtables_driver
    neutron.agent.linux.ebtables_manager

Closes-Bug: #1493422
Related-Bug: #1274034
Change-Id: I61e38fc0d8cf8e79252aabc19a70240be57e4a32
tags/7.0.0.0rc1
Cedric Brandily 4 years ago
parent
commit
b62b92da9b
6 changed files with 0 additions and 1023 deletions
  1. +0
    -2
      etc/neutron/rootwrap.d/ebtables.filters
  2. +0
    -290
      neutron/agent/linux/ebtables_driver.py
  3. +0
    -253
      neutron/agent/linux/ebtables_manager.py
  4. +0
    -127
      neutron/tests/functional/agent/linux/test_ebtables_driver.py
  5. +0
    -191
      neutron/tests/unit/agent/linux/test_ebtables_driver.py
  6. +0
    -160
      neutron/tests/unit/agent/linux/test_ebtables_manager.py

+ 0
- 2
etc/neutron/rootwrap.d/ebtables.filters View File

@@ -8,6 +8,4 @@

[Filters]

# neutron/agent/linux/ebtables_driver.py
ebtables: CommandFilter, ebtables, root
ebtablesEnv: EnvFilter, ebtables, root, EBTABLES_ATOMIC_FILE=

+ 0
- 290
neutron/agent/linux/ebtables_driver.py View File

@@ -1,290 +0,0 @@
# Copyright (c) 2015 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#

"""Implement ebtables rules using linux utilities."""

import re

from retrying import retry

from oslo_config import cfg
from oslo_log import log as logging

from neutron.common import utils

ebtables_opts = [
cfg.StrOpt('ebtables_path',
default='$state_path/ebtables-',
help=_('Location of temporary ebtables table files.')),
]

CONF = cfg.CONF
CONF.register_opts(ebtables_opts)

LOG = logging.getLogger(__name__)

# Collection of regexes to parse ebtables output
_RE_FIND_BRIDGE_TABLE_NAME = re.compile(r'^Bridge table:[\s]*([a-z]+)$')
# get chain name, nunmber of entries and policy name.
_RE_FIND_BRIDGE_CHAIN_INFO = re.compile(
r'^Bridge chain:[\s]*(.*),[\s]*entries:[\s]*[0-9]+,[\s]*'
r'policy:[\s]*([A-Z]+)$')
_RE_FIND_BRIDGE_RULE_COUNTERS = re.compile(
r',[\s]*pcnt[\s]*=[\s]*([0-9]+)[\s]*--[\s]*bcnt[\s]*=[\s]*([0-9]+)$')
_RE_FIND_COMMIT_STATEMENT = re.compile(r'^COMMIT$')
_RE_FIND_COMMENTS_AND_BLANKS = re.compile(r'^#|^$')
_RE_FIND_APPEND_RULE = re.compile(r'-A (\S+) ')

# Regexes to parse ebtables rule file input
_RE_RULES_FIND_TABLE_NAME = re.compile(r'^\*([a-z]+)$')
_RE_RULES_FIND_CHAIN_NAME = re.compile(r'^:(.*)[\s]+([A-Z]+)$')
_RE_RULES_FIND_RULE_LINE = re.compile(r'^\[([0-9]+):([0-9]+)\]')


def _process_ebtables_output(lines):
"""Process raw output of ebtables rule listing file.

Empty lines and comments removed, ebtables listing output converted
into ebtables rules.

For example, if the raw ebtables list lines (input to this function) are:

Bridge table: filter
Bridge chain: INPUT, entries: 0, policy: ACCEPT
Bridge chain: FORWARD, entries: 0, policy: ACCEPT
Bridge chain: OUTPUT, entries: 0, policy: ACCEPT

The output then will be:

*filter
:INPUT ACCEPT
:FORWARD ACCEPT
:OUTPUT ACCEPT
COMMIT

Key point: ebtables rules listing output is not the same as the rules
format for setting new rules.

"""
table = None
chain = ''
chains = []
rules = []

for line in lines:
if _RE_FIND_COMMENTS_AND_BLANKS.search(line):
continue
match = _RE_FIND_BRIDGE_RULE_COUNTERS.search(line)
if table and match:
rules.append('[%s:%s] -A %s %s' % (match.group(1),
match.group(2),
chain,
line[:match.start()].strip()))
match = _RE_FIND_BRIDGE_CHAIN_INFO.search(line)
if match:
chains.append(':%s %s' % (match.group(1), match.group(2)))
chain = match.group(1)
continue
match = _RE_FIND_BRIDGE_TABLE_NAME.search(line)
if match:
table = '*%s' % match.group(1)
continue
return [table] + chains + rules + ['COMMIT']


def _match_rule_line(table, line):
match = _RE_RULES_FIND_RULE_LINE.search(line)
if table and match:
args = line[match.end():].split()
res = [(table, args)]
if int(match.group(1)) > 0 and int(match.group(2)) > 0:
p = _RE_FIND_APPEND_RULE
rule = p.sub(r'-C \1 %s %s ', line[match.end() + 1:])
args = (rule % (match.group(1), match.group(2))).split()
res.append((table, args))
return table, res
else:
return table, None


def _match_chain_name(table, tables, line):
match = _RE_RULES_FIND_CHAIN_NAME.search(line)
if table and match:
if match.group(1) not in tables[table]:
args = ['-N', match.group(1), '-P', match.group(2)]
else:
args = ['-P', match.group(1), match.group(2)]
return table, (table, args)
else:
return table, None


def _match_table_name(table, line):
match = _RE_RULES_FIND_TABLE_NAME.search(line)
if match:
# Initialize with current kernel table if we just start out
table = match.group(1)
return table, (table, ['--atomic-init'])
else:
return table, None


def _match_commit_statement(table, line):
match = _RE_FIND_COMMIT_STATEMENT.search(line)
if table and match:
# Conclude by issuing the commit command
return (table, ['--atomic-commit'])
else:
return None


def _process_ebtables_input(lines):
"""Import text ebtables rules. Similar to iptables-restore.

Was based on:
http://sourceforge.net/p/ebtables/code/ci/
3730ceb7c0a81781679321bfbf9eaa39cfcfb04e/tree/userspace/ebtables2/
ebtables-save?format=raw

The function prepares and returns a list of tuples, each tuple consisting
of a table name and ebtables arguments. The caller can then repeatedly call
ebtables on that table with those arguments to get the rules applied.

For example, this input:

*filter
:INPUT ACCEPT
:FORWARD ACCEPT
:OUTPUT ACCEPT
:neutron-nwfilter-spoofing-fallb ACCEPT
:neutron-nwfilter-OUTPUT ACCEPT
:neutron-nwfilter-INPUT ACCEPT
:neutron-nwfilter-FORWARD ACCEPT
[0:0] -A INPUT -j neutron-nwfilter-INPUT
[0:0] -A OUTPUT -j neutron-nwfilter-OUTPUT
[0:0] -A FORWARD -j neutron-nwfilter-FORWARD
[0:0] -A neutron-nwfilter-spoofing-fallb -j DROP
COMMIT

... produces this output:

('filter', ['--atomic-init'])
('filter', ['-P', 'INPUT', 'ACCEPT'])
('filter', ['-P', 'FORWARD', 'ACCEPT'])
('filter', ['-P', 'OUTPUT', 'ACCEPT'])
('filter', ['-N', 'neutron-nwfilter-spoofing-fallb', '-P', 'ACCEPT'])
('filter', ['-N', 'neutron-nwfilter-OUTPUT', '-P', 'ACCEPT'])
('filter', ['-N', 'neutron-nwfilter-INPUT', '-P', 'ACCEPT'])
('filter', ['-N', 'neutron-nwfilter-FORWARD', '-P', 'ACCEPT'])
('filter', ['-A', 'INPUT', '-j', 'neutron-nwfilter-INPUT'])
('filter', ['-A', 'OUTPUT', '-j', 'neutron-nwfilter-OUTPUT'])
('filter', ['-A', 'FORWARD', '-j', 'neutron-nwfilter-FORWARD'])
('filter', ['-A', 'neutron-nwfilter-spoofing-fallb', '-j', 'DROP'])
('filter', ['--atomic-commit'])

"""
tables = {'filter': ['INPUT', 'FORWARD', 'OUTPUT'],
'nat': ['PREROUTING', 'OUTPUT', 'POSTROUTING'],
'broute': ['BROUTING']}
table = None

ebtables_args = list()
for line in lines.splitlines():
if _RE_FIND_COMMENTS_AND_BLANKS.search(line):
continue
table, res = _match_rule_line(table, line)
if res:
ebtables_args.extend(res)
continue
table, res = _match_chain_name(table, tables, line)
if res:
ebtables_args.append(res)
continue
table, res = _match_table_name(table, line)
if res:
ebtables_args.append(res)
continue
res = _match_commit_statement(table, line)
if res:
ebtables_args.append(res)
continue

return ebtables_args


@retry(wait_exponential_multiplier=1000, wait_exponential_max=10000,
stop_max_delay=10000)
def _cmd_retry(func, *args, **kwargs):
return func(*args, **kwargs)


def run_ebtables(namespace, execute, table, args):
"""Run ebtables utility, with retry if necessary.

Provide table name and list of additional arguments to ebtables.

"""
cmd = ['ebtables', '-t', table]
if CONF.ebtables_path:
f = '%s%s' % (CONF.ebtables_path, table)
cmd += ['--atomic-file', f]
cmd += args
if namespace:
cmd = ['ip', 'netns', 'exec', namespace] + cmd
# TODO(jbrendel): The root helper is used for every ebtables command,
# but as we use an atomic file we only need root for
# init and commit commands.
# But the generated file by init ebtables command is
# only readable and writable by root.
#
# We retry the execution of ebtables in case of failure. Known issue:
# See bug: https://bugs.launchpad.net/nova/+bug/1316621
# See patch: https://review.openstack.org/#/c/140514/3
return _cmd_retry(execute, cmd, **{"run_as_root": True})


def run_ebtables_multiple(namespace, execute, arg_list):
"""Run ebtables utility multiple times.

Similar to run(), but runs ebtables for every element in arg_list.
Each arg_list element is a tuple containing the table name and a list
of ebtables arguments.

"""
for table, args in arg_list:
run_ebtables(namespace, execute, table, args)


@utils.synchronized('ebtables', external=True)
def ebtables_save(execute, tables_names, namespace=None):
"""Generate text output of the ebtables rules.

Based on:
http://sourceforge.net/p/ebtables/code/ci/master/tree/userspace/ebtables2/
ebtables-save?format=raw

"""
raw_outputs = (run_ebtables(namespace, execute,
t, ['-L', '--Lc']).splitlines() for t in tables_names)
parsed_outputs = (_process_ebtables_output(lines) for lines in raw_outputs)
return '\n'.join(l for lines in parsed_outputs for l in lines)


@utils.synchronized('ebtables', external=True)
def ebtables_restore(lines, execute, namespace=None):
"""Import text ebtables rules and apply."""
ebtables_args = _process_ebtables_input(lines)
run_ebtables_multiple(namespace, execute, ebtables_args)

+ 0
- 253
neutron/agent/linux/ebtables_manager.py View File

@@ -1,253 +0,0 @@
# Copyright (c) 2015 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#

"""
Implement a manager for ebtables rules.

NOTE: The ebtables manager contains a lot of duplicated or very similar code
from the iptables manager. An option would have been to refactor the
iptables manager so that ebtables and iptables manager can share common
code. However, the iptables manager was considered too brittle and
in need for a larger re-work or full replacement in the future.
Therefore, it was decided not to do any refactoring for now and to accept
the code duplication.

"""

import inspect
import os

from oslo_log import log as logging

from neutron.i18n import _LW


LOG = logging.getLogger(__name__)


MAX_CHAIN_LEN_EBTABLES = 31
# NOTE(jbrendel): ebtables supports chain names of up to 31 characters, and
# we add up to 12 characters to prefix_chain which is used
# as a prefix, so we limit it to 19 characters.
POSTROUTING_STR = '-POSTROUTING'
MAX_LEN_PREFIX_CHAIN = MAX_CHAIN_LEN_EBTABLES - len(POSTROUTING_STR)

# When stripping or calculating string lengths, sometimes a '-' which separates
# name components needs to be considered.
DASH_STR_LEN = 1


def binary_name():
"""Grab the name of the binary we're running in."""
return os.path.basename(inspect.stack()[-1][1])


def _get_prefix_chain(prefix_chain=None):
"""Determine the prefix chain."""
if prefix_chain:
return prefix_chain[:MAX_LEN_PREFIX_CHAIN]
else:
return binary_name()[:MAX_LEN_PREFIX_CHAIN]


def get_chain_name(chain_name, wrap=True, prefix_chain=None):
"""Determine the chain name."""
if wrap:
# Get the possible chain name length in function of the prefix name
# length.
chain_len = (MAX_CHAIN_LEN_EBTABLES -
(len(_get_prefix_chain(prefix_chain)) + DASH_STR_LEN))
return chain_name[:chain_len]
else:
return chain_name[:MAX_CHAIN_LEN_EBTABLES]


class EbtablesRule(object):
"""An ebtables rule.

You shouldn't need to use this class directly, it's only used by
EbtablesManager.

"""

def __init__(self, chain, rule, wrap=True, top=False,
prefix_chain=None):
self.prefix_chain = _get_prefix_chain(prefix_chain)
self.chain = get_chain_name(chain, wrap, prefix_chain)
self.rule = rule
self.wrap = wrap
self.top = top

def __eq__(self, other):
return ((self.chain == other.chain) and
(self.rule == other.rule) and
(self.top == other.top) and
(self.wrap == other.wrap))

def __ne__(self, other):
return not self == other

def __str__(self):
if self.wrap:
chain = '%s-%s' % (self.prefix_chain, self.chain)
else:
chain = self.chain
return '-A %s %s' % (chain, self.rule)


class EbtablesTable(object):
"""An ebtables table."""

def __init__(self, prefix_chain=None):
self.rules = []
self.rules_to_remove = []
self.chains = set()
self.unwrapped_chains = set()
self.chains_to_remove = set()
self.prefix_chain = _get_prefix_chain(prefix_chain)

def add_chain(self, name, wrap=True):
"""Adds a named chain to the table.

The chain name is wrapped to be unique for the component creating
it, so different components of Neutron can safely create identically
named chains without interfering with one another.

At the moment, its wrapped name is <prefix chain>-<chain name>,
so if neutron-server creates a chain named 'OUTPUT', it'll actually
end up named 'neutron-server-OUTPUT'.

"""
name = get_chain_name(name, wrap, self.prefix_chain)
if wrap:
self.chains.add(name)
else:
self.unwrapped_chains.add(name)

def _select_chain_set(self, wrap):
if wrap:
return self.chains
else:
return self.unwrapped_chains

def ensure_remove_chain(self, name, wrap=True):
"""Ensure the chain is removed.

This removal "cascades". All rule in the chain are removed, as are
all rules in other chains that jump to it.
"""
self.remove_chain(name, wrap, log_not_found=False)

def remove_chain(self, name, wrap=True, log_not_found=True):
"""Remove named chain.

This removal "cascades". All rules in the chain are removed, as are
all rules in other chains that jump to it.

If the chain is not found then this is merely logged.

"""
name = get_chain_name(name, wrap, self.prefix_chain)
chain_set = self._select_chain_set(wrap)

if name not in chain_set:
if log_not_found:
LOG.warn(_LW('Attempted to remove chain %s '
'which does not exist'), name)
return

chain_set.remove(name)

if not wrap:
# non-wrapped chains and rules need to be dealt with specially,
# so we keep a list of them to be iterated over in apply()
self.chains_to_remove.add(name)

# first, add rules to remove that have a matching chain name
self.rules_to_remove += [r for r in self.rules if r.chain == name]

# next, remove rules from list that have a matching chain name
self.rules = [r for r in self.rules if r.chain != name]

if not wrap:
jump_snippet = '-j %s' % name
# next, add rules to remove that have a matching jump chain
self.rules_to_remove += [r for r in self.rules
if jump_snippet in r.rule]
else:
jump_snippet = '-j %s-%s' % (self.prefix_chain, name)

# finally, remove rules from list that have a matching jump chain
self.rules = [r for r in self.rules
if jump_snippet not in r.rule]

def add_rule(self, chain, rule, wrap=True, top=False):
"""Add a rule to the table.

This is just like what you'd feed to ebtables, just without
the '-A <chain name>' bit at the start.

However, if you need to jump to one of your wrapped chains,
prepend its name with a '$' which will ensure the wrapping
is applied correctly.

"""
chain = get_chain_name(chain, wrap, self.prefix_chain)
if wrap and chain not in self.chains:
raise LookupError(_('Unknown chain: %r') % chain)

if '$' in rule:
rule = ' '.join(map(self._wrap_target_chain, rule.split(' ')))

self.rules.append(EbtablesRule(chain, rule, wrap, top,
self.prefix_chain))

def remove_rule(self, chain, rule, wrap=True, top=False):
"""Remove a rule from a chain.

However, if the rule jumps to one of your wrapped chains,
prepend its name with a '$' which will ensure the wrapping
is applied correctly.
"""
chain = get_chain_name(chain, wrap, self.prefix_chain)
if '$' in rule:
rule = ' '.join(map(self._wrap_target_chain, rule.split(' ')))

try:
self.rules.remove(EbtablesRule(chain, rule, wrap, top,
self.prefix_chain))
if not wrap:
self.rules_to_remove.append(
EbtablesRule(chain, rule, wrap, top,
self.prefix_chain))
except ValueError:
LOG.warn(_LW('Tried to remove rule that was not there:'
' %(chain)r %(rule)r %(wrap)r %(top)r'),
{'chain': chain, 'rule': rule,
'top': top, 'wrap': wrap})

def _wrap_target_chain(self, s):
if s.startswith('$'):
return ('%s-%s' % (self.prefix_chain, s[1:]))
return s

def empty_chain(self, chain, wrap=True):
"""Remove all rules from a chain."""
chain = get_chain_name(chain, wrap, self.prefix_chain)
chained_rules = [rule for rule in self.rules
if rule.chain == chain and rule.wrap == wrap]
for rule in chained_rules:
self.rules.remove(rule)

+ 0
- 127
neutron/tests/functional/agent/linux/test_ebtables_driver.py View File

@@ -1,127 +0,0 @@
# Copyright (c) 2015 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from neutron.agent.linux import bridge_lib
from neutron.agent.linux import ebtables_driver
from neutron.tests.common import machine_fixtures
from neutron.tests.common import net_helpers
from neutron.tests.functional import base


NO_FILTER_APPLY = (
"*filter\n"
":INPUT ACCEPT\n"
":FORWARD ACCEPT\n"
":OUTPUT ACCEPT\n"
":neutron-nwfilter-OUTPUT ACCEPT\n"
":neutron-nwfilter-INPUT ACCEPT\n"
":neutron-nwfilter-FORWARD ACCEPT\n"
":neutron-nwfilter-spoofing-fallb ACCEPT\n"
"[0:0] -A INPUT -j neutron-nwfilter-INPUT\n"
"[0:0] -A FORWARD -j neutron-nwfilter-FORWARD\n"
"[2:140] -A OUTPUT -j neutron-nwfilter-OUTPUT\n"
"[0:0] -A neutron-nwfilter-spoofing-fallb -j DROP\n"
"COMMIT")

FILTER_APPLY_TEMPLATE = (
"*filter\n"
":INPUT ACCEPT\n"
":FORWARD ACCEPT\n"
":OUTPUT ACCEPT\n"
":neutron-nwfilter-OUTPUT ACCEPT\n"
":neutron-nwfilter-isome-port-id ACCEPT\n"
":neutron-nwfilter-i-arp-some-por ACCEPT\n"
":neutron-nwfilter-i-ip-some-port ACCEPT\n"
":neutron-nwfilter-spoofing-fallb ACCEPT\n"
":neutron-nwfilter-INPUT ACCEPT\n"
":neutron-nwfilter-FORWARD ACCEPT\n"
"[0:0] -A neutron-nwfilter-OUTPUT -j neutron-nwfilter-isome-port-id\n"
"[0:0] -A INPUT -j neutron-nwfilter-INPUT\n"
"[2:140] -A OUTPUT -j neutron-nwfilter-OUTPUT\n"
"[0:0] -A FORWARD -j neutron-nwfilter-FORWARD\n"
"[0:0] -A neutron-nwfilter-spoofing-fallb -j DROP\n"
"[0:0] -A neutron-nwfilter-i-arp-some-por "
"-p arp --arp-opcode 2 --arp-mac-src %(mac_addr)s "
"--arp-ip-src %(ip_addr)s -j RETURN\n"
"[0:0] -A neutron-nwfilter-i-arp-some-por -p ARP --arp-op Request "
"-j ACCEPT\n"
"[0:0] -A neutron-nwfilter-i-arp-some-por "
"-j neutron-nwfilter-spoofing-fallb\n"
"[0:0] -A neutron-nwfilter-isome-port-id "
"-p arp -j neutron-nwfilter-i-arp-some-por\n"
"[0:0] -A neutron-nwfilter-i-ip-some-port "
"-s %(mac_addr)s -p IPv4 --ip-source %(ip_addr)s -j RETURN\n"
"[0:0] -A neutron-nwfilter-i-ip-some-port "
"-j neutron-nwfilter-spoofing-fallb\n"
"[0:0] -A neutron-nwfilter-isome-port-id "
"-p IPv4 -j neutron-nwfilter-i-ip-some-port\n"
"COMMIT")


class EbtablesLowLevelTestCase(base.BaseSudoTestCase):

def setUp(self):
super(EbtablesLowLevelTestCase, self).setUp()

bridge = self.useFixture(net_helpers.VethBridgeFixture()).bridge
self.source, self.destination = self.useFixture(
machine_fixtures.PeerMachines(bridge)).machines

# Extract MAC and IP address of one of my interfaces
self.mac = self.source.port.link.address
self.addr = self.source.ip

# Pick one of the namespaces and setup a bridge for the local ethernet
# interface there, because ebtables only works on bridged interfaces.
dev_mybridge = bridge_lib.BridgeDevice.addbr(
'mybridge', self.source.namespace)
dev_mybridge.addif(self.source.port.name)

# Take the IP addrss off one of the interfaces and apply it to the
# bridge interface instead.
self.source.port.addr.delete(self.source.ip_cidr)
dev_mybridge.link.set_up()
dev_mybridge.addr.add(self.source.ip_cidr)

def _test_basic_port_filter_wrong_mac(self):
# Setup filter with wrong IP/MAC address pair. Basic filters only allow
# packets with specified address combinations, thus all packets will
# be dropped.
mac_ip_pair = dict(mac_addr="11:11:11:22:22:22", ip_addr=self.addr)
filter_apply = FILTER_APPLY_TEMPLATE % mac_ip_pair
ebtables_driver.ebtables_restore(filter_apply,
self.source.execute)
self.source.assert_no_ping(self.destination.ip)

# Assure that ping will work once we unfilter the instance
ebtables_driver.ebtables_restore(NO_FILTER_APPLY,
self.source.execute)
self.source.assert_ping(self.destination.ip)

def _test_basic_port_filter_correct_mac(self):
# Use the correct IP/MAC address pair for this one.
mac_ip_pair = dict(mac_addr=self.mac, ip_addr=self.addr)

filter_apply = FILTER_APPLY_TEMPLATE % mac_ip_pair
ebtables_driver.ebtables_restore(filter_apply,
self.source.execute)

self.source.assert_ping(self.destination.ip)

def test_ebtables_filtering(self):
# Cannot parallelize those tests. Therefore need to call them
# in order from a single function.
self._test_basic_port_filter_wrong_mac()
self._test_basic_port_filter_correct_mac()

+ 0
- 191
neutron/tests/unit/agent/linux/test_ebtables_driver.py View File

@@ -1,191 +0,0 @@
# Copyright (c) 2015 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#

import mock

from oslo_config import cfg

from neutron.agent.linux import ebtables_driver as eb
from neutron.cmd.sanity.checks import ebtables_supported
from neutron.tests import base


TABLES_NAMES = ['filter', 'nat', 'broute']

CONF = cfg.CONF


class EbtablesDriverLowLevelInputTestCase(base.BaseTestCase):

def test_match_rule_line(self):
self.assertEqual((None, None), eb._match_rule_line(None, "foo"))

rule_line = "[0:1] foobar blah bar"
self.assertEqual(('mytab', [('mytab', ['foobar', 'blah', 'bar'])]),
eb._match_rule_line("mytab", rule_line))

rule_line = "[2:3] foobar -A BAR -j BLAH"
self.assertEqual(
('mytab',
[('mytab', ['foobar', '-A', 'BAR', '-j', 'BLAH']),
('mytab', ['foobar', '-C', 'BAR', '2', '3', '-j', 'BLAH'])]),
eb._match_rule_line("mytab", rule_line))

def test_match_chain_name(self):
self.assertEqual((None, None), eb._match_chain_name(None, None, "foo"))

rule_line = ":neutron-nwfilter-OUTPUT ACCEPT"
tables = {"mytab": []}
self.assertEqual(
('mytab',
('mytab', ['-N', 'neutron-nwfilter-OUTPUT', '-P', 'ACCEPT'])),
eb._match_chain_name("mytab", tables, rule_line))

rule_line = ":neutron-nwfilter-OUTPUT ACCEPT"
tables = {"mytab": ['neutron-nwfilter-OUTPUT']}
self.assertEqual(
('mytab',
('mytab', ['-P', 'neutron-nwfilter-OUTPUT', 'ACCEPT'])),
eb._match_chain_name("mytab", tables, rule_line))

def test_match_table_name(self):
self.assertEqual((None, None), eb._match_table_name(None, "foo"))

rule_line = "*filter"
self.assertEqual(('filter', ('filter', ['--atomic-init'])),
eb._match_table_name("mytab", rule_line))

def test_commit_statement(self):
self.assertEqual(None, eb._match_commit_statement(None, "foo"))

rule_line = "COMMIT"
self.assertEqual(('mytab', ['--atomic-commit']),
eb._match_commit_statement("mytab", rule_line))

def test_ebtables_input_parse_comment(self):
# Comments and empty lines are stripped, nothing should be left.
test_input = ("# Here is a comment\n"
"\n"
"# We just had an empty line.\n")
res = eb._process_ebtables_input(test_input)
self.assertEqual(list(), res)

def test_ebtables_input_parse_start(self):
# Starting
test_input = "*filter"
res = eb._process_ebtables_input(test_input)
self.assertEqual([('filter', ['--atomic-init'])], res)

def test_ebtables_input_parse_commit(self):
# COMMIT without first starting a table should result in nothing,
test_input = "COMMIT"
res = eb._process_ebtables_input(test_input)
self.assertEqual(list(), res)

test_input = "*filter\nCOMMIT"
res = eb._process_ebtables_input(test_input)
self.assertEqual([('filter', ['--atomic-init']),
('filter', ['--atomic-commit'])],
res)

def test_ebtables_input_parse_rule(self):
test_input = "*filter\n[0:0] -A INPUT -j neutron-nwfilter-INPUT"
res = eb._process_ebtables_input(test_input)
self.assertEqual([('filter', ['--atomic-init']),
('filter',
['-A', 'INPUT', '-j', 'neutron-nwfilter-INPUT'])],
res)

def test_ebtables_input_parse_chain(self):
test_input = "*filter\n:foobar ACCEPT"
res = eb._process_ebtables_input(test_input)
self.assertEqual([('filter', ['--atomic-init']),
('filter', ['-N', 'foobar', '-P', 'ACCEPT'])],
res)

def test_ebtables_input_parse_all_together(self):
test_input = \
("*filter\n"
":INPUT ACCEPT\n"
":FORWARD ACCEPT\n"
":OUTPUT ACCEPT\n"
":neutron-nwfilter-spoofing-fallb ACCEPT\n"
":neutron-nwfilter-OUTPUT ACCEPT\n"
":neutron-nwfilter-INPUT ACCEPT\n"
":neutron-nwfilter-FORWARD ACCEPT\n"
"[0:0] -A INPUT -j neutron-nwfilter-INPUT\n"
"[0:0] -A OUTPUT -j neutron-nwfilter-OUTPUT\n"
"[0:0] -A FORWARD -j neutron-nwfilter-FORWARD\n"
"[0:0] -A neutron-nwfilter-spoofing-fallb -j DROP\n"
"COMMIT")
observed_res = eb._process_ebtables_input(test_input)
TNAME = 'filter'
expected_res = [
(TNAME, ['--atomic-init']),
(TNAME, ['-P', 'INPUT', 'ACCEPT']),
(TNAME, ['-P', 'FORWARD', 'ACCEPT']),
(TNAME, ['-P', 'OUTPUT', 'ACCEPT']),
(TNAME, ['-N', 'neutron-nwfilter-spoofing-fallb', '-P', 'ACCEPT']),
(TNAME, ['-N', 'neutron-nwfilter-OUTPUT', '-P', 'ACCEPT']),
(TNAME, ['-N', 'neutron-nwfilter-INPUT', '-P', 'ACCEPT']),
(TNAME, ['-N', 'neutron-nwfilter-FORWARD', '-P', 'ACCEPT']),
(TNAME, ['-A', 'INPUT', '-j', 'neutron-nwfilter-INPUT']),
(TNAME, ['-A', 'OUTPUT', '-j', 'neutron-nwfilter-OUTPUT']),
(TNAME, ['-A', 'FORWARD', '-j', 'neutron-nwfilter-FORWARD']),
(TNAME, ['-A', 'neutron-nwfilter-spoofing-fallb', '-j', 'DROP']),
(TNAME, ['--atomic-commit'])]

self.assertEqual(expected_res, observed_res)


class EbtablesDriverLowLevelOutputTestCase(base.BaseTestCase):

def test_ebtables_save_and_restore(self):
test_output = ('Bridge table: filter\n'
'Bridge chain: INPUT, entries: 1, policy: ACCEPT\n'
'-j CONTINUE , pcnt = 0 -- bcnt = 0\n'
'Bridge chain: FORWARD, entries: 1, policy: ACCEPT\n'
'-j CONTINUE , pcnt = 0 -- bcnt = 1\n'
'Bridge chain: OUTPUT, entries: 1, policy: ACCEPT\n'
'-j CONTINUE , pcnt = 1 -- bcnt = 1').split('\n')

observed_res = eb._process_ebtables_output(test_output)
expected_res = ['*filter',
':INPUT ACCEPT',
':FORWARD ACCEPT',
':OUTPUT ACCEPT',
'[0:0] -A INPUT -j CONTINUE',
'[0:1] -A FORWARD -j CONTINUE',
'[1:1] -A OUTPUT -j CONTINUE',
'COMMIT']
self.assertEqual(expected_res, observed_res)


class EbtablesDriverTestCase(base.BaseTestCase):

def setUp(self):
super(EbtablesDriverTestCase, self).setUp()
self.root_helper = 'sudo'
self.ebtables_path = CONF.ebtables_path
self.execute_p = mock.patch('neutron.agent.linux.utils.execute')
self.execute = self.execute_p.start()

def test_ebtables_sanity_check(self):
self.assertTrue(ebtables_supported())
self.execute.assert_has_calls([mock.call(['ebtables', '--version'])])

self.execute.side_effect = RuntimeError
self.assertFalse(ebtables_supported())

+ 0
- 160
neutron/tests/unit/agent/linux/test_ebtables_manager.py View File

@@ -1,160 +0,0 @@
# Copyright (c) 2015 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#

import mock

from neutron.agent.linux import ebtables_manager as em

from neutron.tests import base

LONG_NAME = "1234567890" * 3


class EbtablesManagerBaseTestCase(base.BaseTestCase):
def setUp(self):
super(EbtablesManagerBaseTestCase, self).setUp()
mock.patch.object(em, "binary_name", return_value="binary").start()


class EbtablesChainNameTestCase(EbtablesManagerBaseTestCase):

def test_get_prefix_chain(self):
# Fake the binary name to a known value for this test.
# Testing prefix chain name
self.assertEqual(em._get_prefix_chain(), "binary")
self.assertEqual(em._get_prefix_chain("some-name"),
"some-name")
self.assertEqual(em._get_prefix_chain(LONG_NAME),
LONG_NAME[:em.MAX_LEN_PREFIX_CHAIN])

def test_get_chain_name(self):
# Testing full chain name
prefix_chain = "some-other-name"
self.assertEqual(em.get_chain_name(chain_name="foobar",
prefix_chain=prefix_chain),
"foobar")

is_name = em.get_chain_name(chain_name=LONG_NAME,
wrap=True,
prefix_chain=prefix_chain)
should_name = (LONG_NAME[:em.MAX_CHAIN_LEN_EBTABLES -
len(prefix_chain) - 1])
self.assertEqual(is_name, should_name)
self.assertEqual(em.get_chain_name(chain_name=LONG_NAME,
wrap=False,
prefix_chain=prefix_chain),
LONG_NAME)
should_name = LONG_NAME[:-len("bar")]
self.assertEqual(em.get_chain_name(chain_name=LONG_NAME,
wrap=True,
prefix_chain="bar"),
should_name)
self.assertEqual(em.get_chain_name(chain_name=LONG_NAME,
wrap=False,
prefix_chain="bar"),
LONG_NAME)


class EbtablesRuleTestCase(EbtablesManagerBaseTestCase):

def test_basic_ops(self):
r1 = em.EbtablesRule("chain-name", "some-rule", wrap=True, top=False,
prefix_chain="foobar")
r2 = em.EbtablesRule("chain-name", "some-rule", wrap=True, top=False,
prefix_chain="foobar")
r3 = em.EbtablesRule("chain-name", "some-rule", wrap=True, top=True,
prefix_chain="foobar")
self.assertEqual(r1, r2)
self.assertNotEqual(r1, r3)

self.assertEqual("-A foobar-chain-name some-rule", str(r1))


class EbtablesTableTestCase(EbtablesManagerBaseTestCase):

def setUp(self):
super(EbtablesTableTestCase, self).setUp()
self.et = em.EbtablesTable()

def test_add_chain(self):
# Wrapped and un-wrapped chains are maintained separately, thus same
# name is possible.
self.et.add_chain("bar" + LONG_NAME, wrap=False)
self.et.add_chain("baz" + LONG_NAME, wrap=False)
self.et.add_chain("baz" + LONG_NAME)
self.et.add_chain("foo" + LONG_NAME)

self.assertEqual(set(['baz123456789012345678901',
'foo123456789012345678901']),
self.et._select_chain_set(wrap=True))
self.assertEqual(set(['bar1234567890123456789012345678',
'baz1234567890123456789012345678']),
self.et._select_chain_set(wrap=False))

def test_add_remove_rule(self):
# Adding some rules to a chain
self.et.add_chain("foobar")
self.et.add_rule("foobar", "some rule text")
self.assertEqual("-A binary-foobar some rule text",
str(self.et.rules[0]))
self.assertEqual(1, len(self.et.rules))

self.et.add_rule("foobar", "another rule")
self.assertEqual(2, len(self.et.rules))
self.assertEqual("-A binary-foobar some rule text",
str(self.et.rules[0]))
self.assertEqual("-A binary-foobar another rule",
str(self.et.rules[1]))

# Removing one of the rules, testing the state of the remaining rule
# list.
self.et.remove_rule("foobar", "some rule text")
self.assertEqual(1, len(self.et.rules))
self.assertEqual("-A binary-foobar another rule",
str(self.et.rules[0]))

# Testing emptying of a chain
self.et.add_rule("foobar", "yet another rule")
self.assertEqual(2, len(self.et.rules))
self.et.empty_chain("foobar")
self.assertEqual(0, len(self.et.rules))

def test_remove_chain(self):
self.et.add_chain("foobar")
self.et.add_rule("foobar", "some rule text")
self.et.add_rule("foobar", "yet another rule")
self.et.ensure_remove_chain("foobar")
self.assertEqual(0, len(self.et.rules))
self.assertEqual(0, len(self.et.chains))

# Testing the 'cascading' remove: If rules of chain A point to chain B
# and chain B is removed then those rules of chain A also need to be
# removed.
self.et.add_chain("chain-A")
self.et.add_rule("chain-A", "some rule text")
self.et.add_chain("chain-B")
self.et.add_rule("chain-B", "another rule")
# Now add the rule to chain-A with chain-B as jump target
self.et.add_rule("chain-A", "jumpyjump -j binary-chain-B")
self.assertEqual(2, len(self.et.chains))
self.assertEqual(3, len(self.et.rules))
# Remove chain-B, making the jump rule in chain-A invalid. This should
# trigger the cascading deletion of the rules.
self.et.ensure_remove_chain("chain-B")
self.assertEqual(1, len(self.et.chains))
self.assertEqual(1, len(self.et.rules))
self.assertEqual("-A binary-chain-A some rule text",
str(self.et.rules[0]))

Loading…
Cancel
Save