Files
python-ganttclient/nova/tests/test_quantum.py
Jason Kölker 3fc6879894 Standardize logging delaration and use
* Make modules use getLogger(__name__) and log to the result

Change-Id: Ib6d69b4be140ec89affc86ed11e65e422d551df1
2012-02-14 13:02:43 -06:00

509 lines
19 KiB
Python

# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 Nicira, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nova import context
from nova import db
from nova.db.sqlalchemy import models
from nova.db.sqlalchemy.session import get_session
from nova import exception
from nova import log as logging
from nova.network.quantum import manager as quantum_manager
from nova.network.quantum import melange_connection
from nova import test
from nova import utils
from nova.network import manager
LOG = logging.getLogger(__name__)
# this class can be used for unit functional/testing on nova,
# as it does not actually make remote calls to the Quantum service
class FakeQuantumClientConnection(object):
def __init__(self):
self.nets = {}
def get_networks_for_tenant(self, tenant_id):
net_ids = []
for net_id, n in self.nets.items():
if n['tenant-id'] == tenant_id:
net_ids.append(net_id)
return {'networks': net_ids}
def create_network(self, tenant_id, network_name, **kwargs):
uuid = str(utils.gen_uuid())
self.nets[uuid] = {'net-name': network_name,
'tenant-id': tenant_id,
'ports': {}}
return uuid
def delete_network(self, tenant_id, net_id):
if self.nets[net_id]['tenant-id'] == tenant_id:
del self.nets[net_id]
def network_exists(self, tenant_id, net_id):
try:
return self.nets[net_id]['tenant-id'] == tenant_id
except KeyError:
return False
def get_network_name(self, tenant_id, net_id):
return self.nets[net_id]['net-name']
def _confirm_not_attached(self, interface_id):
for n in self.nets.values():
for p in n['ports'].values():
if p['attachment-id'] == interface_id:
raise Exception(_("interface '%s' is already attached" %
interface_id))
def create_and_attach_port(self, tenant_id, net_id, interface_id,
**kwargs):
if not self.network_exists(tenant_id, net_id):
raise Exception(
_("network %(net_id)s does not exist for tenant %(tenant_id)"
% locals()))
self._confirm_not_attached(interface_id)
uuid = str(utils.gen_uuid())
self.nets[net_id]['ports'][uuid] = \
{"port-state": "ACTIVE",
"attachment-id": interface_id}
def detach_and_delete_port(self, tenant_id, net_id, port_id):
if not self.network_exists(tenant_id, net_id):
raise exception.NotFound(
_("network %(net_id)s does not exist "
"for tenant %(tenant_id)s" % locals()))
del self.nets[net_id]['ports'][port_id]
def get_port_by_attachment(self, tenant_id, net_id, attachment_id):
for nid, n in self.nets.items():
if nid == net_id and n['tenant-id'] == tenant_id:
for port_id, p in n['ports'].items():
if p['attachment-id'] == attachment_id:
return port_id
return None
def get_attached_ports(self, tenant_id, net_id):
ports = []
for nid, n in self.nets.items():
if nid == net_id and n['tenant-id'] == tenant_id:
for port_id, p in n['ports'].items():
ports.append({'port-id': port_id,
'attachment': p['attachment-id']})
return ports
def get_networks(self, tenant_id):
nets = []
for nid, n in self.nets.items():
if n['tenant-id'] == tenant_id:
x = {'id': nid}
nets.append(x)
return {'networks': nets}
networks = [{'label': 'project1-net1',
'injected': False,
'multi_host': False,
'cidr': '192.168.0.0/24',
'cidr_v6': '2001:1db8::/64',
'gateway_v6': '2001:1db8::1',
'netmask_v6': '64',
'netmask': '255.255.255.0',
'bridge': None,
'bridge_interface': None,
'gateway': '192.168.0.1',
'broadcast': '192.168.0.255',
'dns1': '192.168.0.1',
'dns2': '192.168.0.2',
'vlan': None,
'host': None,
'vpn_public_address': None,
'project_id': 'fake_project1',
'priority': 1},
{'label': 'project2-net1',
'injected': False,
'multi_host': False,
'cidr': '192.168.1.0/24',
'cidr_v6': '2001:1db9::/64',
'gateway_v6': '2001:1db9::1',
'netmask_v6': '64',
'netmask': '255.255.255.0',
'bridge': None,
'bridge_interface': None,
'gateway': '192.168.1.1',
'broadcast': '192.168.1.255',
'dns1': '192.168.0.1',
'dns2': '192.168.0.2',
'vlan': None,
'host': None,
'project_id': 'fake_project2',
'priority': 1},
{'label': "public",
'injected': False,
'multi_host': False,
'cidr': '10.0.0.0/24',
'cidr_v6': '2001:1dba::/64',
'gateway_v6': '2001:1dba::1',
'netmask_v6': '64',
'netmask': '255.255.255.0',
'bridge': None,
'bridge_interface': None,
'gateway': '10.0.0.1',
'broadcast': '10.0.0.255',
'dns1': '10.0.0.1',
'dns2': '10.0.0.2',
'vlan': None,
'host': None,
'project_id': None,
'priority': 0},
{'label': "project2-net2",
'injected': False,
'multi_host': False,
'cidr': '9.0.0.0/24',
'cidr_v6': '2001:1dbb::/64',
'gateway_v6': '2001:1dbb::1',
'netmask_v6': '64',
'netmask': '255.255.255.0',
'bridge': None,
'bridge_interface': None,
'gateway': '9.0.0.1',
'broadcast': '9.0.0.255',
'dns1': '9.0.0.1',
'dns2': '9.0.0.2',
'vlan': None,
'host': None,
'project_id': "fake_project2",
'priority': 2}]
# this is a base class to be used by all other Quantum Test classes
class QuantumNovaTestCase(test.TestCase):
def setUp(self):
super(QuantumNovaTestCase, self).setUp()
self.net_man = quantum_manager.QuantumManager(
ipam_lib="nova.network.quantum.nova_ipam_lib",
q_conn=FakeQuantumClientConnection())
# Tests seem to create some networks by default, which
# we don't want. So we delete them.
ctx = context.RequestContext('user1', 'fake_project1').elevated()
for n in db.network_get_all(ctx):
db.network_delete_safe(ctx, n['id'])
# Other unit tests (e.g., test_compute.py) have a nasty
# habit of of creating fixed IPs and not cleaning up, which
# can confuse these tests, so we remove all existing fixed
# ips before starting.
session = get_session()
result = session.query(models.FixedIp).all()
with session.begin():
for fip_ref in result:
session.delete(fip_ref)
def _create_network(self, n):
ctx = context.RequestContext('user1', n['project_id'])
nwks = self.net_man.create_networks(
ctx,
label=n['label'], cidr=n['cidr'],
multi_host=n['multi_host'],
num_networks=1, network_size=256,
cidr_v6=n['cidr_v6'],
gateway=n['gateway'],
gateway_v6=n['gateway_v6'], bridge=None,
bridge_interface=None, dns1=n['dns1'],
dns2=n['dns2'],
project_id=n['project_id'],
priority=n['priority'])
n['uuid'] = nwks[0]['uuid']
class QuantumNovaIPAMTestCase(QuantumNovaTestCase):
def test_create_and_delete_nets(self):
self._create_nets()
self._delete_nets()
def _create_nets(self):
for n in networks:
self._create_network(n)
def _delete_nets(self):
for n in networks:
ctx = context.RequestContext('user1', n['project_id'])
db_nets = db.network_get_all(ctx.elevated())
for x in db_nets:
if x['label'] == n['label']:
n['uuid'] = x['uuid']
self.net_man.delete_network(ctx, None, n['uuid'])
def test_allocate_and_deallocate_instance_static(self):
self._create_nets()
project_id = "fake_project1"
ctx = context.RequestContext('user1', project_id)
instance_ref = db.instance_create(ctx,
{"project_id": project_id})
def func(arg1, arg2):
pass
def func2(arg1, arg2, arg3):
pass
def func1(arg1):
pass
self.net_man.driver.update_dhcp_hostfile_with_text = func
self.net_man.driver.restart_dhcp = func2
self.net_man.driver.kill_dhcp = func1
nw_info = self.net_man.allocate_for_instance(ctx.elevated(),
instance_id=instance_ref['id'], host="",
rxtx_factor=3,
project_id=project_id)
self.assertEquals(len(nw_info), 2)
cidrs = ['10.', '192.']
addrs = ['10.', '192.']
cidrs_v6 = ['2001:1dba:', '2001:1db8:']
addrs_v6 = ['2001:1dba:', '2001:1db8:']
def check_for_startswith(choices, choice):
for v in choices:
if choice.startswith(v):
choices.remove(v)
return True
return False
# we don't know which order the NICs will be in until we
# introduce the notion of priority
for vif in nw_info:
for subnet in vif['network']['subnets']:
cidr = subnet['cidr'].lower()
if subnet['version'] == 4:
# v4 cidr
self.assertTrue(check_for_startswith(cidrs, cidr))
# v4 address
address = subnet['ips'][0]['address']
self.assertTrue(check_for_startswith(addrs, address))
else:
# v6 cidr
self.assertTrue(check_for_startswith(cidrs_v6, cidr))
# v6 address
address = subnet['ips'][0]['address']
self.assertTrue(check_for_startswith(addrs_v6, address))
self.net_man.deallocate_for_instance(ctx,
instance_id=instance_ref['id'],
project_id=project_id)
self._delete_nets()
def test_allocate_and_deallocate_instance_dynamic(self):
self._create_nets()
project_id = "fake_project2"
ctx = context.RequestContext('user1', project_id)
net_ids = self.net_man.q_conn.get_networks_for_tenant(project_id)
requested_networks = [(net_id, None) for net_id in
net_ids['networks']]
self.net_man.validate_networks(ctx, requested_networks)
instance_ref = db.instance_create(ctx,
{"project_id": project_id})
def func(arg1, arg2):
pass
def func1(arg1):
pass
def func2(arg1, arg2, arg3):
pass
self.net_man.driver.update_dhcp_hostfile_with_text = func
self.net_man.driver.restart_dhcp = func2
self.net_man.driver.kill_dhcp = func1
nw_info = self.net_man.allocate_for_instance(ctx,
instance_id=instance_ref['id'], host="",
rxtx_factor=3,
project_id=project_id,
requested_networks=requested_networks)
self.assertEquals(len(nw_info), 2)
cidrs = ['9.', '192.']
addrs = ['9.', '192.']
cidrs_v6 = ['2001:1dbb:', '2001:1db9:']
addrs_v6 = ['2001:1dbb:', '2001:1db9:']
def check_for_startswith(choices, choice):
for v in choices:
if choice.startswith(v):
choices.remove(v)
return True
# we don't know which order the NICs will be in until we
# introduce the notion of priority
for vif in nw_info:
for subnet in vif['network']['subnets']:
cidr = subnet['cidr'].lower()
if subnet['version'] == 4:
# v4 cidr
self.assertTrue(check_for_startswith(cidrs, cidr))
# v4 address
address = subnet['ips'][0]['address']
self.assertTrue(check_for_startswith(addrs, address))
else:
# v6 cidr
self.assertTrue(check_for_startswith(cidrs_v6, cidr))
# v6 address
address = subnet['ips'][0]['address']
self.assertTrue(check_for_startswith(addrs_v6, address))
self.net_man.deallocate_for_instance(ctx,
instance_id=instance_ref['id'],
project_id=project_id)
self._delete_nets()
def test_validate_bad_network(self):
ctx = context.RequestContext('user1', 'fake_project1')
self.assertRaises(exception.NetworkNotFound,
self.net_man.validate_networks, ctx, [("", None)])
class QuantumNovaMACGenerationTestCase(QuantumNovaTestCase):
def test_local_mac_address_creation(self):
self.flags(use_melange_mac_generation=False)
fake_mac = "ab:cd:ef:ab:cd:ef"
self.stubs.Set(utils, "generate_mac_address",
lambda: fake_mac)
project_id = "fake_project1"
ctx = context.RequestContext('user1', project_id)
self._create_network(networks[0])
net_ids = self.net_man.q_conn.get_networks_for_tenant(project_id)
requested_networks = [(net_id, None) for net_id in net_ids['networks']]
instance_ref = db.api.instance_create(ctx,
{"project_id": project_id})
nw_info = self.net_man.allocate_for_instance(ctx,
instance_id=instance_ref['id'], host="",
rxtx_factor=3,
project_id=project_id,
requested_networks=requested_networks)
self.assertEqual(nw_info[0]['address'], fake_mac)
def test_melange_mac_address_creation(self):
self.flags(use_melange_mac_generation=True)
fake_mac = "ab:cd:ef:ab:cd:ef"
self.stubs.Set(melange_connection.MelangeConnection, "create_vif",
lambda w, x, y, z: fake_mac)
project_id = "fake_project1"
ctx = context.RequestContext('user1', project_id)
self._create_network(networks[0])
net_ids = self.net_man.q_conn.get_networks_for_tenant(project_id)
requested_networks = [(net_id, None) for net_id in net_ids['networks']]
instance_ref = db.api.instance_create(ctx,
{"project_id": project_id})
nw_info = self.net_man.allocate_for_instance(ctx,
instance_id=instance_ref['id'], host="",
rxtx_factor=3,
project_id=project_id,
requested_networks=requested_networks)
self.assertEqual(nw_info[0]['address'], fake_mac)
class QuantumNovaPortSecurityTestCase(QuantumNovaTestCase):
def test_port_securty(self):
self.flags(use_melange_mac_generation=True)
self.flags(quantum_use_port_security=True)
fake_mac = "ab:cd:ef:ab:cd:ef"
self.stubs.Set(melange_connection.MelangeConnection, "create_vif",
lambda w, x, y, z: fake_mac)
project_id = "fake_project1"
ctx = context.RequestContext('user1', project_id)
self._create_network(networks[0])
net_ids = self.net_man.q_conn.get_networks_for_tenant(project_id)
requested_networks = [(net_id, None) for net_id in net_ids['networks']]
instance_ref = db.api.instance_create(ctx,
{"project_id": project_id})
oldfunc = self.net_man.q_conn.create_and_attach_port
# Make sure we get the appropriate mac set in allowed_address_pairs
# if port security is enabled.
def _instrumented_create_and_attach_port(tenant_id, net_id,
interface_id, **kwargs):
self.assertTrue('allowed_address_pairs' in kwargs.keys())
pairs = kwargs['allowed_address_pairs']
self.assertTrue(pairs[0]['mac_address'] == fake_mac)
self.net_man.q_conn.create_and_attach_port = oldfunc
return oldfunc(tenant_id, net_id, interface_id, **kwargs)
self.net_man.q_conn.create_and_attach_port = \
_instrumented_create_and_attach_port
nw_info = self.net_man.allocate_for_instance(ctx,
instance_id=instance_ref['id'], host="",
rxtx_factor=3,
project_id=project_id,
requested_networks=requested_networks)
self.assertEqual(nw_info[0]['address'], fake_mac)
def test_port_securty_negative(self):
self.flags(use_melange_mac_generation=True)
self.flags(quantum_use_port_security=False)
fake_mac = "ab:cd:ef:ab:cd:ef"
self.stubs.Set(melange_connection.MelangeConnection, "create_vif",
lambda w, x, y, z: fake_mac)
project_id = "fake_project1"
ctx = context.RequestContext('user1', project_id)
self._create_network(networks[0])
net_ids = self.net_man.q_conn.get_networks_for_tenant(project_id)
requested_networks = [(net_id, None) for net_id in net_ids['networks']]
instance_ref = db.api.instance_create(ctx,
{"project_id": project_id})
oldfunc = self.net_man.q_conn.create_and_attach_port
# Make sure no pairs are passed in if port security is turned off
def _instrumented_create_and_attach_port(tenant_id, net_id,
interface_id, **kwargs):
self.assertTrue('allowed_address_pairs' in kwargs.keys())
pairs = kwargs['allowed_address_pairs']
self.assertTrue(len(pairs) == 0)
self.net_man.q_conn.create_and_attach_port = oldfunc
return oldfunc(tenant_id, net_id, interface_id, **kwargs)
self.net_man.q_conn.create_and_attach_port = \
_instrumented_create_and_attach_port
nw_info = self.net_man.allocate_for_instance(ctx,
instance_id=instance_ref['id'], host="",
rxtx_factor=3,
project_id=project_id,
requested_networks=requested_networks)
self.assertEqual(nw_info[0]['address'], fake_mac)