Files
python-ganttclient/nova/tests/test_network.py
Soren Hansen 8867e8898f Merge trunk
2011-03-14 14:21:44 +01:00

538 lines
24 KiB
Python

# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit Tests for network code
"""
import IPy
import os
import time
from nova import context
from nova import db
from nova import exception
from nova import flags
from nova import log as logging
from nova import test
from nova import utils
from nova.auth import manager
from nova.network import linux_net
FLAGS = flags.FLAGS
LOG = logging.getLogger('nova.tests.network')
class IptablesManagerTestCase(test.TestCase):
sample_filter = ['#Generated by iptables-save on Fri Feb 18 15:17:05 2011',
'*filter',
':INPUT ACCEPT [2223527:305688874]',
':FORWARD ACCEPT [0:0]',
':OUTPUT ACCEPT [2172501:140856656]',
':nova-compute-FORWARD - [0:0]',
':nova-compute-INPUT - [0:0]',
':nova-compute-local - [0:0]',
':nova-compute-OUTPUT - [0:0]',
':nova-filter-top - [0:0]',
'-A FORWARD -j nova-filter-top ',
'-A OUTPUT -j nova-filter-top ',
'-A nova-filter-top -j nova-compute-local ',
'-A INPUT -j nova-compute-INPUT ',
'-A OUTPUT -j nova-compute-OUTPUT ',
'-A FORWARD -j nova-compute-FORWARD ',
'-A INPUT -i virbr0 -p udp -m udp --dport 53 -j ACCEPT ',
'-A INPUT -i virbr0 -p tcp -m tcp --dport 53 -j ACCEPT ',
'-A INPUT -i virbr0 -p udp -m udp --dport 67 -j ACCEPT ',
'-A INPUT -i virbr0 -p tcp -m tcp --dport 67 -j ACCEPT ',
'-A FORWARD -s 192.168.122.0/24 -i virbr0 -j ACCEPT ',
'-A FORWARD -i virbr0 -o virbr0 -j ACCEPT ',
'-A FORWARD -o virbr0 -j REJECT --reject-with '
'icmp-port-unreachable ',
'-A FORWARD -i virbr0 -j REJECT --reject-with '
'icmp-port-unreachable ',
'COMMIT',
'# Completed on Fri Feb 18 15:17:05 2011']
sample_nat = ['# Generated by iptables-save on Fri Feb 18 15:17:05 2011',
'*nat',
':PREROUTING ACCEPT [3936:762355]',
':INPUT ACCEPT [2447:225266]',
':OUTPUT ACCEPT [63491:4191863]',
':POSTROUTING ACCEPT [63112:4108641]',
':nova-compute-OUTPUT - [0:0]',
':nova-compute-floating-ip-snat - [0:0]',
':nova-compute-SNATTING - [0:0]',
':nova-compute-PREROUTING - [0:0]',
':nova-compute-POSTROUTING - [0:0]',
':nova-postrouting-bottom - [0:0]',
'-A PREROUTING -j nova-compute-PREROUTING ',
'-A OUTPUT -j nova-compute-OUTPUT ',
'-A POSTROUTING -j nova-compute-POSTROUTING ',
'-A POSTROUTING -j nova-postrouting-bottom ',
'-A nova-postrouting-bottom -j nova-compute-SNATTING ',
'-A nova-compute-SNATTING -j nova-compute-floating-ip-snat ',
'COMMIT',
'# Completed on Fri Feb 18 15:17:05 2011']
def setUp(self):
super(IptablesManagerTestCase, self).setUp()
self.manager = linux_net.IptablesManager()
def test_filter_rules_are_wrapped(self):
current_lines = self.sample_filter
table = self.manager.ipv4['filter']
table.add_rule('FORWARD', '-s 1.2.3.4/5 -j DROP')
new_lines = self.manager._modify_rules(current_lines, table)
self.assertTrue('-A run_tests.py-FORWARD '
'-s 1.2.3.4/5 -j DROP' in new_lines)
table.remove_rule('FORWARD', '-s 1.2.3.4/5 -j DROP')
new_lines = self.manager._modify_rules(current_lines, table)
self.assertTrue('-A run_tests.py-FORWARD '
'-s 1.2.3.4/5 -j DROP' not in new_lines)
def test_nat_rules(self):
current_lines = self.sample_nat
new_lines = self.manager._modify_rules(current_lines,
self.manager.ipv4['nat'])
for line in [':nova-compute-OUTPUT - [0:0]',
':nova-compute-floating-ip-snat - [0:0]',
':nova-compute-SNATTING - [0:0]',
':nova-compute-PREROUTING - [0:0]',
':nova-compute-POSTROUTING - [0:0]']:
self.assertTrue(line in new_lines, "One of nova-compute's chains "
"went missing.")
seen_lines = set()
for line in new_lines:
line = line.strip()
self.assertTrue(line not in seen_lines,
"Duplicate line: %s" % line)
seen_lines.add(line)
last_postrouting_line = ''
for line in new_lines:
if line.startswith('-A POSTROUTING'):
last_postrouting_line = line
self.assertTrue('-j nova-postrouting-bottom' in last_postrouting_line,
"Last POSTROUTING rule does not jump to "
"nova-postouting-bottom: %s" % last_postrouting_line)
for chain in ['POSTROUTING', 'PREROUTING', 'OUTPUT']:
self.assertTrue('-A %s -j run_tests.py-%s' \
% (chain, chain) in new_lines,
"Built-in chain %s not wrapped" % (chain,))
def test_filter_rules(self):
current_lines = self.sample_filter
new_lines = self.manager._modify_rules(current_lines,
self.manager.ipv4['filter'])
for line in [':nova-compute-FORWARD - [0:0]',
':nova-compute-INPUT - [0:0]',
':nova-compute-local - [0:0]',
':nova-compute-OUTPUT - [0:0]']:
self.assertTrue(line in new_lines, "One of nova-compute's chains"
" went missing.")
seen_lines = set()
for line in new_lines:
line = line.strip()
self.assertTrue(line not in seen_lines,
"Duplicate line: %s" % line)
seen_lines.add(line)
for chain in ['FORWARD', 'OUTPUT']:
for line in new_lines:
if line.startswith('-A %s' % chain):
self.assertTrue('-j nova-filter-top' in line,
"First %s rule does not "
"jump to nova-filter-top" % chain)
break
self.assertTrue('-A nova-filter-top '
'-j run_tests.py-local' in new_lines,
"nova-filter-top does not jump to wrapped local chain")
for chain in ['INPUT', 'OUTPUT', 'FORWARD']:
self.assertTrue('-A %s -j run_tests.py-%s' \
% (chain, chain) in new_lines,
"Built-in chain %s not wrapped" % (chain,))
class NetworkTestCase(test.TestCase):
"""Test cases for network code"""
def setUp(self):
super(NetworkTestCase, self).setUp()
# NOTE(vish): if you change these flags, make sure to change the
# flags in the corresponding section in nova-dhcpbridge
self.flags(connection_type='fake',
fake_call=True,
fake_network=True)
self.manager = manager.AuthManager()
self.user = self.manager.create_user('netuser', 'netuser', 'netuser')
self.projects = []
self.network = utils.import_object(FLAGS.network_manager)
self.context = context.RequestContext(project=None, user=self.user)
for i in range(FLAGS.num_networks):
name = 'project%s' % i
project = self.manager.create_project(name, 'netuser', name)
self.projects.append(project)
# create the necessary network data for the project
user_context = context.RequestContext(project=self.projects[i],
user=self.user)
host = self.network.get_network_host(user_context.elevated())
instance_ref = self._create_instance(0)
self.instance_id = instance_ref['id']
instance_ref = self._create_instance(1)
self.instance2_id = instance_ref['id']
def tearDown(self):
# TODO(termie): this should really be instantiating clean datastores
# in between runs, one failure kills all the tests
db.instance_destroy(context.get_admin_context(), self.instance_id)
db.instance_destroy(context.get_admin_context(), self.instance2_id)
for project in self.projects:
self.manager.delete_project(project)
self.manager.delete_user(self.user)
super(NetworkTestCase, self).tearDown()
def _create_instance(self, project_num, mac=None):
if not mac:
mac = utils.generate_mac()
project = self.projects[project_num]
self.context._project = project
self.context.project_id = project.id
return db.instance_create(self.context,
{'project_id': project.id,
'mac_address': mac})
def _create_address(self, project_num, instance_id=None):
"""Create an address in given project num"""
if instance_id is None:
instance_id = self.instance_id
self.context._project = self.projects[project_num]
self.context.project_id = self.projects[project_num].id
return self.network.allocate_fixed_ip(self.context, instance_id)
def _deallocate_address(self, project_num, address):
self.context._project = self.projects[project_num]
self.context.project_id = self.projects[project_num].id
self.network.deallocate_fixed_ip(self.context, address)
def test_private_ipv6(self):
"""Make sure ipv6 is OK"""
if FLAGS.use_ipv6:
instance_ref = self._create_instance(0)
address = self._create_address(0, instance_ref['id'])
network_ref = db.project_get_network(
context.get_admin_context(),
self.context.project_id)
address_v6 = db.instance_get_fixed_address_v6(
context.get_admin_context(),
instance_ref['id'])
self.assertEqual(instance_ref['mac_address'],
utils.to_mac(address_v6))
instance_ref2 = db.fixed_ip_get_instance_v6(
context.get_admin_context(),
address_v6)
self.assertEqual(instance_ref['id'], instance_ref2['id'])
self.assertEqual(address_v6,
utils.to_global_ipv6(
network_ref['cidr_v6'],
instance_ref['mac_address']))
self._deallocate_address(0, address)
db.instance_destroy(context.get_admin_context(),
instance_ref['id'])
def test_public_network_association(self):
"""Makes sure that we can allocaate a public ip"""
# TODO(vish): better way of adding floating ips
self.context._project = self.projects[0]
self.context.project_id = self.projects[0].id
pubnet = IPy.IP(flags.FLAGS.floating_range)
address = str(pubnet[0])
try:
db.floating_ip_get_by_address(context.get_admin_context(), address)
except exception.NotFound:
db.floating_ip_create(context.get_admin_context(),
{'address': address,
'host': FLAGS.host})
float_addr = self.network.allocate_floating_ip(self.context,
self.projects[0].id)
fix_addr = self._create_address(0)
lease_ip(fix_addr)
self.assertEqual(float_addr, str(pubnet[0]))
self.network.associate_floating_ip(self.context, float_addr, fix_addr)
address = db.instance_get_floating_address(context.get_admin_context(),
self.instance_id)
self.assertEqual(address, float_addr)
self.network.disassociate_floating_ip(self.context, float_addr)
address = db.instance_get_floating_address(context.get_admin_context(),
self.instance_id)
self.assertEqual(address, None)
self.network.deallocate_floating_ip(self.context, float_addr)
self.network.deallocate_fixed_ip(self.context, fix_addr)
release_ip(fix_addr)
db.floating_ip_destroy(context.get_admin_context(), float_addr)
def test_allocate_deallocate_fixed_ip(self):
"""Makes sure that we can allocate and deallocate a fixed ip"""
address = self._create_address(0)
self.assertTrue(is_allocated_in_project(address, self.projects[0].id))
lease_ip(address)
self._deallocate_address(0, address)
# Doesn't go away until it's dhcp released
self.assertTrue(is_allocated_in_project(address, self.projects[0].id))
release_ip(address)
self.assertFalse(is_allocated_in_project(address, self.projects[0].id))
def test_side_effects(self):
"""Ensures allocating and releasing has no side effects"""
address = self._create_address(0)
address2 = self._create_address(1, self.instance2_id)
self.assertTrue(is_allocated_in_project(address, self.projects[0].id))
self.assertTrue(is_allocated_in_project(address2, self.projects[1].id))
self.assertFalse(is_allocated_in_project(address, self.projects[1].id))
# Addresses are allocated before they're issued
lease_ip(address)
lease_ip(address2)
self._deallocate_address(0, address)
release_ip(address)
self.assertFalse(is_allocated_in_project(address, self.projects[0].id))
# First address release shouldn't affect the second
self.assertTrue(is_allocated_in_project(address2, self.projects[1].id))
self._deallocate_address(1, address2)
release_ip(address2)
self.assertFalse(is_allocated_in_project(address2,
self.projects[1].id))
def test_subnet_edge(self):
"""Makes sure that private ips don't overlap"""
first = self._create_address(0)
lease_ip(first)
instance_ids = []
for i in range(1, FLAGS.num_networks):
instance_ref = self._create_instance(i, mac=utils.generate_mac())
instance_ids.append(instance_ref['id'])
address = self._create_address(i, instance_ref['id'])
instance_ref = self._create_instance(i, mac=utils.generate_mac())
instance_ids.append(instance_ref['id'])
address2 = self._create_address(i, instance_ref['id'])
instance_ref = self._create_instance(i, mac=utils.generate_mac())
instance_ids.append(instance_ref['id'])
address3 = self._create_address(i, instance_ref['id'])
lease_ip(address)
lease_ip(address2)
lease_ip(address3)
self.context._project = self.projects[i]
self.context.project_id = self.projects[i].id
self.assertFalse(is_allocated_in_project(address,
self.projects[0].id))
self.assertFalse(is_allocated_in_project(address2,
self.projects[0].id))
self.assertFalse(is_allocated_in_project(address3,
self.projects[0].id))
self.network.deallocate_fixed_ip(self.context, address)
self.network.deallocate_fixed_ip(self.context, address2)
self.network.deallocate_fixed_ip(self.context, address3)
release_ip(address)
release_ip(address2)
release_ip(address3)
for instance_id in instance_ids:
db.instance_destroy(context.get_admin_context(), instance_id)
self.context._project = self.projects[0]
self.context.project_id = self.projects[0].id
self.network.deallocate_fixed_ip(self.context, first)
self._deallocate_address(0, first)
release_ip(first)
def test_vpn_ip_and_port_looks_valid(self):
"""Ensure the vpn ip and port are reasonable"""
self.assert_(self.projects[0].vpn_ip)
self.assert_(self.projects[0].vpn_port >= FLAGS.vpn_start)
self.assert_(self.projects[0].vpn_port <= FLAGS.vpn_start +
FLAGS.num_networks)
def test_too_many_networks(self):
"""Ensure error is raised if we run out of networks"""
projects = []
networks_left = (FLAGS.num_networks -
db.network_count(context.get_admin_context()))
for i in range(networks_left):
project = self.manager.create_project('many%s' % i, self.user)
projects.append(project)
db.project_get_network(context.get_admin_context(), project.id)
project = self.manager.create_project('last', self.user)
projects.append(project)
self.assertRaises(db.NoMoreNetworks,
db.project_get_network,
context.get_admin_context(),
project.id)
for project in projects:
self.manager.delete_project(project)
def test_ips_are_reused(self):
"""Makes sure that ip addresses that are deallocated get reused"""
address = self._create_address(0)
lease_ip(address)
self.network.deallocate_fixed_ip(self.context, address)
release_ip(address)
address2 = self._create_address(0)
self.assertEqual(address, address2)
lease_ip(address)
self.network.deallocate_fixed_ip(self.context, address2)
release_ip(address)
def test_available_ips(self):
"""Make sure the number of available ips for the network is correct
The number of available IP addresses depends on the test
environment's setup.
Network size is set in test fixture's setUp method.
There are ips reserved at the bottom and top of the range.
services (network, gateway, CloudPipe, broadcast)
"""
network = db.project_get_network(context.get_admin_context(),
self.projects[0].id)
net_size = flags.FLAGS.network_size
admin_context = context.get_admin_context()
total_ips = (db.network_count_available_ips(admin_context,
network['id']) +
db.network_count_reserved_ips(admin_context,
network['id']) +
db.network_count_allocated_ips(admin_context,
network['id']))
self.assertEqual(total_ips, net_size)
def test_too_many_addresses(self):
"""Test for a NoMoreAddresses exception when all fixed ips are used.
"""
admin_context = context.get_admin_context()
network = db.project_get_network(admin_context, self.projects[0].id)
num_available_ips = db.network_count_available_ips(admin_context,
network['id'])
addresses = []
instance_ids = []
for i in range(num_available_ips):
instance_ref = self._create_instance(0)
instance_ids.append(instance_ref['id'])
address = self._create_address(0, instance_ref['id'])
addresses.append(address)
lease_ip(address)
ip_count = db.network_count_available_ips(context.get_admin_context(),
network['id'])
self.assertEqual(ip_count, 0)
self.assertRaises(db.NoMoreAddresses,
self.network.allocate_fixed_ip,
self.context,
'foo')
for i in range(num_available_ips):
self.network.deallocate_fixed_ip(self.context, addresses[i])
release_ip(addresses[i])
db.instance_destroy(context.get_admin_context(), instance_ids[i])
ip_count = db.network_count_available_ips(context.get_admin_context(),
network['id'])
self.assertEqual(ip_count, num_available_ips)
def test_dhcp_lease_output(self):
admin_ctxt = context.get_admin_context()
address = self._create_address(0, self.instance_id)
lease_ip(address)
network_ref = db.network_get_by_instance(admin_ctxt, self.instance_id)
leases = linux_net.get_dhcp_leases(context.get_admin_context(),
network_ref['id'])
for line in leases.split('\n'):
seconds, mac, ip, hostname, client_id = line.split(' ')
self.assertTrue(int(seconds) > time.time(), 'Lease expires in '
'the past')
octets = mac.split(':')
self.assertEqual(len(octets), 6, "Wrong number of octets "
"in %s" % (max,))
for octet in octets:
self.assertEqual(len(octet), 2, "Oddly sized octet: %s"
% (octet,))
# This will throw an exception if the octet is invalid
int(octet, 16)
# And this will raise an exception in case of an invalid IP
IPy.IP(ip)
release_ip(address)
def is_allocated_in_project(address, project_id):
"""Returns true if address is in specified project"""
project_net = db.project_get_network(context.get_admin_context(),
project_id)
network = db.fixed_ip_get_network(context.get_admin_context(), address)
instance = db.fixed_ip_get_instance(context.get_admin_context(), address)
# instance exists until release
return instance is not None and network['id'] == project_net['id']
def binpath(script):
"""Returns the absolute path to a script in bin"""
return os.path.abspath(os.path.join(__file__, "../../../bin", script))
def lease_ip(private_ip):
"""Run add command on dhcpbridge"""
network_ref = db.fixed_ip_get_network(context.get_admin_context(),
private_ip)
instance_ref = db.fixed_ip_get_instance(context.get_admin_context(),
private_ip)
cmd = (binpath('nova-dhcpbridge'), 'add',
instance_ref['mac_address'],
private_ip, 'fake')
env = {'DNSMASQ_INTERFACE': network_ref['bridge'],
'TESTING': '1',
'FLAGFILE': FLAGS.dhcpbridge_flagfile}
(out, err) = utils.execute(*cmd, addl_env=env)
LOG.debug("ISSUE_IP: %s, %s ", out, err)
def release_ip(private_ip):
"""Run del command on dhcpbridge"""
network_ref = db.fixed_ip_get_network(context.get_admin_context(),
private_ip)
instance_ref = db.fixed_ip_get_instance(context.get_admin_context(),
private_ip)
cmd = (binpath('nova-dhcpbridge'), 'del',
instance_ref['mac_address'],
private_ip, 'fake')
env = {'DNSMASQ_INTERFACE': network_ref['bridge'],
'TESTING': '1',
'FLAGFILE': FLAGS.dhcpbridge_flagfile}
(out, err) = utils.execute(*cmd, addl_env=env)
LOG.debug("RELEASE_IP: %s, %s ", out, err)