Merge "Introduce connection testers module"

This commit is contained in:
Jenkins 2015-07-18 00:56:51 +00:00 committed by Gerrit Code Review
commit 6c7c4c2b33
5 changed files with 324 additions and 28 deletions

View File

@ -19,6 +19,10 @@ import contextlib
import six
INGRESS_DIRECTION = 'ingress'
EGRESS_DIRECTION = 'egress'
@six.add_metaclass(abc.ABCMeta)
class FirewallDriver(object):
"""Firewall Driver base class.

View File

@ -32,24 +32,22 @@ from neutron.i18n import _LI
LOG = logging.getLogger(__name__)
SG_CHAIN = 'sg-chain'
INGRESS_DIRECTION = 'ingress'
EGRESS_DIRECTION = 'egress'
SPOOF_FILTER = 'spoof-filter'
CHAIN_NAME_PREFIX = {INGRESS_DIRECTION: 'i',
EGRESS_DIRECTION: 'o',
CHAIN_NAME_PREFIX = {firewall.INGRESS_DIRECTION: 'i',
firewall.EGRESS_DIRECTION: 'o',
SPOOF_FILTER: 's'}
DIRECTION_IP_PREFIX = {'ingress': 'source_ip_prefix',
'egress': 'dest_ip_prefix'}
IPSET_DIRECTION = {INGRESS_DIRECTION: 'src',
EGRESS_DIRECTION: 'dst'}
DIRECTION_IP_PREFIX = {firewall.INGRESS_DIRECTION: 'source_ip_prefix',
firewall.EGRESS_DIRECTION: 'dest_ip_prefix'}
IPSET_DIRECTION = {firewall.INGRESS_DIRECTION: 'src',
firewall.EGRESS_DIRECTION: 'dst'}
LINUX_DEV_LEN = 14
comment_rule = iptables_manager.comment_rule
class IptablesFirewallDriver(firewall.FirewallDriver):
"""Driver which enforces security groups through iptables rules."""
IPTABLES_DIRECTION = {INGRESS_DIRECTION: 'physdev-out',
EGRESS_DIRECTION: 'physdev-in'}
IPTABLES_DIRECTION = {firewall.INGRESS_DIRECTION: 'physdev-out',
firewall.EGRESS_DIRECTION: 'physdev-in'}
def __init__(self, namespace=None):
self.iptables = iptables_manager.IptablesManager(
@ -180,14 +178,14 @@ class IptablesFirewallDriver(firewall.FirewallDriver):
def _setup_chains_apply(self, ports, unfiltered_ports):
self._add_chain_by_name_v4v6(SG_CHAIN)
for port in ports.values():
self._setup_chain(port, INGRESS_DIRECTION)
self._setup_chain(port, EGRESS_DIRECTION)
self._setup_chain(port, firewall.INGRESS_DIRECTION)
self._setup_chain(port, firewall.EGRESS_DIRECTION)
self.iptables.ipv4['filter'].add_rule(SG_CHAIN, '-j ACCEPT')
self.iptables.ipv6['filter'].add_rule(SG_CHAIN, '-j ACCEPT')
for port in unfiltered_ports.values():
self._add_accept_rule_port_sec(port, INGRESS_DIRECTION)
self._add_accept_rule_port_sec(port, EGRESS_DIRECTION)
self._add_accept_rule_port_sec(port, firewall.INGRESS_DIRECTION)
self._add_accept_rule_port_sec(port, firewall.EGRESS_DIRECTION)
def _remove_chains(self):
"""Remove ingress and egress chain for a port."""
@ -197,12 +195,12 @@ class IptablesFirewallDriver(firewall.FirewallDriver):
def _remove_chains_apply(self, ports, unfiltered_ports):
for port in ports.values():
self._remove_chain(port, INGRESS_DIRECTION)
self._remove_chain(port, EGRESS_DIRECTION)
self._remove_chain(port, firewall.INGRESS_DIRECTION)
self._remove_chain(port, firewall.EGRESS_DIRECTION)
self._remove_chain(port, SPOOF_FILTER)
for port in unfiltered_ports.values():
self._remove_rule_port_sec(port, INGRESS_DIRECTION)
self._remove_rule_port_sec(port, EGRESS_DIRECTION)
self._remove_rule_port_sec(port, firewall.INGRESS_DIRECTION)
self._remove_rule_port_sec(port, firewall.EGRESS_DIRECTION)
self._remove_chain_by_name_v4v6(SG_CHAIN)
def _setup_chain(self, port, DIRECTION):
@ -263,7 +261,7 @@ class IptablesFirewallDriver(firewall.FirewallDriver):
else:
self._remove_rule_from_chain_v4v6('FORWARD', jump_rule, jump_rule)
if direction == EGRESS_DIRECTION:
if direction == firewall.EGRESS_DIRECTION:
jump_rule = ['-m physdev --%s %s --physdev-is-bridged '
'-j ACCEPT' % (self.IPTABLES_DIRECTION[direction],
device)]
@ -300,7 +298,7 @@ class IptablesFirewallDriver(firewall.FirewallDriver):
self._add_rules_to_chain_v4v6(SG_CHAIN, jump_rule, jump_rule,
comment=ic.SG_TO_VM_SG)
if direction == EGRESS_DIRECTION:
if direction == firewall.EGRESS_DIRECTION:
self._add_rules_to_chain_v4v6('INPUT', jump_rule, jump_rule,
comment=ic.INPUT_TO_SG)
@ -458,11 +456,11 @@ class IptablesFirewallDriver(firewall.FirewallDriver):
ipv4_iptables_rules = []
ipv6_iptables_rules = []
# include fixed egress/ingress rules
if direction == EGRESS_DIRECTION:
if direction == firewall.EGRESS_DIRECTION:
self._add_fixed_egress_rules(port,
ipv4_iptables_rules,
ipv6_iptables_rules)
elif direction == INGRESS_DIRECTION:
elif direction == firewall.INGRESS_DIRECTION:
ipv6_iptables_rules += self._accept_inbound_icmpv6()
# include IPv4 and IPv6 iptable rules from security group
ipv4_iptables_rules += self._convert_sgr_to_iptables_rules(
@ -717,7 +715,7 @@ class OVSHybridIptablesFirewallDriver(IptablesFirewallDriver):
return ('qvb' + port['device'])[:LINUX_DEV_LEN]
def _get_jump_rule(self, port, direction):
if direction == INGRESS_DIRECTION:
if direction == firewall.INGRESS_DIRECTION:
device = self._get_br_device_name(port)
else:
device = self._get_device_name(port)
@ -740,11 +738,13 @@ class OVSHybridIptablesFirewallDriver(IptablesFirewallDriver):
def _add_chain(self, port, direction):
super(OVSHybridIptablesFirewallDriver, self)._add_chain(port,
direction)
if direction in [INGRESS_DIRECTION, EGRESS_DIRECTION]:
if direction in [firewall.INGRESS_DIRECTION,
firewall.EGRESS_DIRECTION]:
self._add_raw_chain_rules(port, direction)
def _remove_chain(self, port, direction):
super(OVSHybridIptablesFirewallDriver, self)._remove_chain(port,
direction)
if direction in [INGRESS_DIRECTION, EGRESS_DIRECTION]:
if direction in [firewall.INGRESS_DIRECTION,
firewall.EGRESS_DIRECTION]:
self._remove_raw_chain_rules(port, direction)

View File

@ -0,0 +1,265 @@
# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
import fixtures
from neutron.agent import firewall
from neutron.tests.common import machine_fixtures
from neutron.tests.common import net_helpers
class ConnectionTesterException(Exception):
pass
def _validate_direction(f):
@functools.wraps(f)
def wrap(self, direction, *args, **kwargs):
if direction not in (firewall.INGRESS_DIRECTION,
firewall.EGRESS_DIRECTION):
raise ConnectionTesterException('Unknown direction %s' % direction)
return f(self, direction, *args, **kwargs)
return wrap
class ConnectionTester(fixtures.Fixture):
"""Base class for testers
This class implements API for various methods for testing connectivity. The
concrete implementation relies on how encapsulated resources are
configured. That means child classes should define resources by themselves
(e.g. endpoints connected through linux bridge or ovs bridge).
"""
UDP = net_helpers.NetcatTester.UDP
TCP = net_helpers.NetcatTester.TCP
ICMP = 'icmp'
ARP = 'arp'
INGRESS = firewall.INGRESS_DIRECTION
EGRESS = firewall.EGRESS_DIRECTION
def _setUp(self):
self._protocol_to_method = {
self.UDP: self._test_transport_connectivity,
self.TCP: self._test_transport_connectivity,
self.ICMP: self._test_icmp_connectivity,
self.ARP: self._test_arp_connectivity}
self._nc_testers = dict()
self.addCleanup(self.cleanup)
def cleanup(self):
for nc in self._nc_testers.values():
nc.stop_processes()
@property
def vm_namespace(self):
return self._vm.namespace
@property
def vm_ip_address(self):
return self._vm.ip
@property
def vm_ip_cidr(self):
return self._vm.ip_cidr
@vm_ip_cidr.setter
def vm_ip_cidr(self, ip_cidr):
self._vm.ip_cidr = ip_cidr
@property
def vm_mac_address(self):
return self._vm.port.link.address
@vm_mac_address.setter
def vm_mac_address(self, mac_address):
self._vm.mac_address = mac_address
@property
def peer_namespace(self):
return self._peer.namespace
@property
def peer_ip_address(self):
return self._peer.ip
def flush_arp_tables(self):
"""Flush arptables in all used namespaces"""
for machine in (self._peer, self._vm):
machine.port.neigh.flush(4, 'all')
def _test_transport_connectivity(self, direction, protocol, src_port,
dst_port):
nc_tester = self._create_nc_tester(direction, protocol, src_port,
dst_port)
try:
nc_tester.test_connectivity()
except RuntimeError as exc:
raise ConnectionTesterException(
"%s connection over %s protocol with %s source port and "
"%s destination port can't be established: %s" % (
direction, protocol, src_port, dst_port, exc))
@_validate_direction
def _get_namespace_and_address(self, direction):
if direction == self.INGRESS:
return self.peer_namespace, self.vm_ip_address
return self.vm_namespace, self.peer_ip_address
def _test_icmp_connectivity(self, direction, protocol, src_port, dst_port):
src_namespace, ip_address = self._get_namespace_and_address(direction)
try:
net_helpers.assert_ping(src_namespace, ip_address)
except RuntimeError:
raise ConnectionTesterException(
"ICMP packets can't get from %s namespace to %s address" % (
src_namespace, ip_address))
def _test_arp_connectivity(self, direction, protocol, src_port, dst_port):
src_namespace, ip_address = self._get_namespace_and_address(direction)
try:
net_helpers.assert_arping(src_namespace, ip_address)
except RuntimeError:
raise ConnectionTesterException(
"ARP queries to %s address have no response from %s namespace"
% (ip_address, src_namespace))
@_validate_direction
def assert_connection(self, direction, protocol, src_port=None,
dst_port=None):
testing_method = self._protocol_to_method[protocol]
testing_method(direction, protocol, src_port, dst_port)
def assert_no_connection(self, direction, protocol, src_port=None,
dst_port=None):
try:
self.assert_connection(direction, protocol, src_port, dst_port)
except ConnectionTesterException:
pass
else:
dst_port_info = str()
src_port_info = str()
if dst_port is not None:
dst_port_info = " and destionation port %d" % dst_port
if src_port is not None:
src_port_info = " and source port %d" % src_port
raise ConnectionTesterException("%s connection with %s protocol%s"
"%s was established but it "
"shouldn't be possible" % (
direction, protocol,
src_port_info, dst_port_info))
@_validate_direction
def assert_established_connection(self, direction, protocol, src_port=None,
dst_port=None):
nc_params = (direction, protocol, src_port, dst_port)
nc_tester = self._nc_testers.get(nc_params)
if nc_tester:
if nc_tester.is_established:
nc_tester.test_connectivity()
else:
raise ConnectionTesterException(
'%s connection with protocol %s, source port %s and '
'destination port %s is not established' % nc_params)
else:
raise ConnectionTesterException(
"Attempting to test established %s connection with protocol %s"
", source port %s and destination port %s that hasn't been "
"established yet by calling establish_connection()"
% nc_params)
def assert_no_established_connection(self, direction, protocol,
src_port=None, dst_port=None):
try:
self.assert_established_connection(direction, protocol, src_port,
dst_port)
except ConnectionTesterException:
pass
else:
raise ConnectionTesterException(
'Established %s connection with protocol %s, source port %s, '
'destination port %s can still send packets throught' % (
direction, protocol, src_port, dst_port))
@_validate_direction
def establish_connection(self, direction, protocol, src_port=None,
dst_port=None):
nc_tester = self._create_nc_tester(direction, protocol, src_port,
dst_port)
nc_tester.establish_connection()
self.addCleanup(nc_tester.stop_processes)
def _create_nc_tester(self, direction, protocol, src_port, dst_port):
"""Create netcat tester
If there already exists a netcat tester that has established
connection, exception is raised.
"""
nc_key = (direction, protocol, src_port, dst_port)
nc_tester = self._nc_testers.get(nc_key)
if nc_tester and nc_tester.is_established:
raise ConnectionTesterException(
'%s connection using %s protocol, source port %s and '
'destination port %s is already established' % (
direction, protocol, src_port, dst_port))
if direction == self.INGRESS:
client_ns = self.peer_namespace
server_ns = self.vm_namespace
server_addr = self.vm_ip_address
else:
client_ns = self.vm_namespace
server_ns = self.peer_namespace
server_addr = self.peer_ip_address
server_port = dst_port or net_helpers.get_free_namespace_port(
protocol, server_ns)
nc_tester = net_helpers.NetcatTester(client_namespace=client_ns,
server_namespace=server_ns,
address=server_addr,
protocol=protocol,
src_port=src_port,
dst_port=server_port)
self._nc_testers[nc_key] = nc_tester
return nc_tester
class LinuxBridgeConnectionTester(ConnectionTester):
"""Tester with linux bridge in the middle
Both endpoints are placed in their separated namespace connected to
bridge's namespace via veth pair.
"""
def _setUp(self):
super(LinuxBridgeConnectionTester, self)._setUp()
self._bridge = self.useFixture(net_helpers.LinuxBridgeFixture()).bridge
self._peer, self._vm = self.useFixture(
machine_fixtures.PeerMachines(self._bridge)).machines
@property
def bridge_namespace(self):
return self._bridge.namespace
@property
def vm_port_id(self):
return net_helpers.VethFixture.get_peer_name(self._vm.port.name)
def flush_arp_tables(self):
self._bridge.neigh.flush(4, 'all')
super(LinuxBridgeConnectionTester, self).flush_arp_tables()

View File

@ -39,8 +39,7 @@ class FakeMachine(fixtures.Fixture):
def __init__(self, bridge, ip_cidr, gateway_ip=None):
super(FakeMachine, self).__init__()
self.bridge = bridge
self.ip_cidr = ip_cidr
self.ip = self.ip_cidr.partition('/')[0]
self._ip_cidr = ip_cidr
self.gateway_ip = gateway_ip
def _setUp(self):
@ -50,11 +49,35 @@ class FakeMachine(fixtures.Fixture):
self.port = self.useFixture(
net_helpers.PortFixture.get(self.bridge, self.namespace)).port
self.port.addr.add(self.ip_cidr)
self.port.addr.add(self._ip_cidr)
if self.gateway_ip:
net_helpers.set_namespace_gateway(self.port, self.gateway_ip)
@property
def ip(self):
return self._ip_cidr.partition('/')[0]
@property
def ip_cidr(self):
return self._ip_cidr
@ip_cidr.setter
def ip_cidr(self, ip_cidr):
self.port.addr.add(ip_cidr)
self.port.addr.delete(self._ip_cidr)
self._ip_cidr = ip_cidr
@property
def mac_address(self):
return self.port.link.address
@mac_address.setter
def mac_address(self, mac_address):
self.port.link.set_down()
self.port.link.set_address(mac_address)
self.port.link.set_up()
def execute(self, *args, **kwargs):
ns_ip_wrapper = ip_lib.IPWrapper(self.namespace)
return ns_ip_wrapper.netns.execute(*args, **kwargs)

View File

@ -275,6 +275,10 @@ class NetcatTester(object):
address=self.server_address,
listen=True)
@property
def is_established(self):
return bool(self._client_process and not self._client_process.poll())
def establish_connection(self):
if self._client_process:
raise RuntimeError('%(proto)s connection to $(ip_addr)s is already'