add unit test for check and create default vpc function

Change-Id: I0d42c205067ab09510d9a8b321b0e31919c58ad2
This commit is contained in:
tikitavi 2016-12-31 02:32:24 +03:00
parent 2c0fe86d82
commit 9fd28fbed3
2 changed files with 79 additions and 2 deletions

View File

@ -89,6 +89,7 @@ CIDR_SUBNET_DEFAULT = '172.31.0.0/20'
CIDR_SUBNET_1 = '10.10.1.0/24'
IP_FIRST_SUBNET_1 = '10.10.1.4'
IP_LAST_SUBNET_1 = '10.10.1.254'
IP_GATEWAY_SUBNET_DEFAULT = '172.31.0.1'
IP_GATEWAY_SUBNET_1 = '10.10.1.1'
IP_GATEWAY_SUBNET_2 = '10.10.2.1'
CIDR_SUBNET_2 = '10.10.2.0/24'
@ -487,12 +488,17 @@ EC2_SUBNET_2 = {'subnetId': ID_EC2_SUBNET_2,
'availableIpAddressCount': 253,
'mapPublicIpOnLaunch': False}
OS_SUBNET_HOST_ROUTE_NEXTHOP = '169.254.169.254/32'
OS_SUBNET_DEFAULT = {'id': ID_OS_SUBNET_DEFAULT,
'network_id': ID_OS_NETWORK_DEFAULT,
'name': ID_EC2_SUBNET_DEFAULT,
'ip_version': '4',
'cidr': CIDR_SUBNET_DEFAULT,
'host_routes': [],
'host_routes': [{'nexthop': IP_GATEWAY_SUBNET_DEFAULT,
'destination': '172.31.0.0/16'},
{'nexthop': IP_GATEWAY_SUBNET_DEFAULT,
'destination':
OS_SUBNET_HOST_ROUTE_NEXTHOP}],
'gateway_ip': None}
OS_SUBNET_1 = {'id': ID_OS_SUBNET_1,
'network_id': ID_OS_NETWORK_1,
@ -502,7 +508,7 @@ OS_SUBNET_1 = {'id': ID_OS_SUBNET_1,
'host_routes': [{'nexthop': IP_GATEWAY_SUBNET_1,
'destination': '10.10.0.0/16'},
{'nexthop': IP_GATEWAY_SUBNET_1,
'destination': '169.254.169.254/32'}],
'destination': OS_SUBNET_HOST_ROUTE_NEXTHOP}],
'gateway_ip': IP_GATEWAY_SUBNET_1}
OS_SUBNET_2 = {'id': ID_OS_SUBNET_2,
'network_id': ID_OS_NETWORK_2,

View File

@ -18,6 +18,7 @@ import copy
import mock
from neutronclient.common import exceptions as neutron_exception
from ec2api.api import vpc as vpc_api
from ec2api.tests.unit import base
from ec2api.tests.unit import fakes
from ec2api.tests.unit import matchers
@ -54,6 +55,8 @@ class VpcTestCase(base.ApiTestCase):
mock.ANY, 'rtb',
tools.purge_dict(fakes.DB_ROUTE_TABLE_1,
('id',)))
# checking that no default vpc related stuff is added
self.assertEqual(2, self.db_api.add_item.call_count)
self.db_api.update_item.assert_called_once_with(
mock.ANY,
fakes.DB_VPC_1)
@ -279,3 +282,71 @@ class VpcTestCase(base.ApiTestCase):
self.assertEqual(resp['vpcSet'], [fakes.EC2_VPC_DEFAULT])
self.db_api.add_item.assert_not_called()
class VpcPrivateTestCase(base.BaseTestCase):
def test_check_and_create_default_vpc(self):
context = base.create_context()
nova = self.mock_nova()
neutron = self.mock_neutron()
db_api = self.mock_db()
neutron.create_router.side_effect = (
tools.get_neutron_create('router', fakes.ID_OS_ROUTER_DEFAULT))
neutron.create_network.side_effect = (
tools.get_neutron_create('network', fakes.ID_OS_NETWORK_DEFAULT,
{'status': 'available'}))
neutron.create_subnet.side_effect = (
tools.get_neutron_create('subnet', fakes.ID_OS_SUBNET_DEFAULT))
db_api.add_item.side_effect = (
tools.get_db_api_add_item({
'vpc': fakes.ID_EC2_VPC_DEFAULT,
'rtb': fakes.ID_EC2_ROUTE_TABLE_DEFAULT,
'sg': fakes.ID_EC2_SECURITY_GROUP_DEFAULT,
'subnet': fakes.ID_EC2_SUBNET_DEFAULT}))
db_api.set_mock_items(fakes.DB_ROUTE_TABLE_DEFAULT,
fakes.DB_SUBNET_DEFAULT)
db_api.get_item_by_id.side_effect = (
tools.get_db_api_get_item_by_id(fakes.DB_VPC_DEFAULT,
fakes.DB_ROUTE_TABLE_DEFAULT))
vpc_api._check_and_create_default_vpc(context)
neutron.create_router.assert_called_with({'router': {}})
neutron.update_router.assert_called_once_with(
fakes.ID_OS_ROUTER_DEFAULT,
{'router': {'name': fakes.EC2_VPC_DEFAULT['vpcId']}})
db_api.add_item.assert_any_call(
mock.ANY, 'vpc',
tools.purge_dict(fakes.DB_VPC_DEFAULT,
('id', 'vpc_id', 'route_table_id')))
db_api.add_item.assert_any_call(
mock.ANY, 'rtb',
tools.purge_dict(fakes.DB_ROUTE_TABLE_DEFAULT,
('id',)))
db_api.update_item.assert_called_once_with(
mock.ANY,
fakes.DB_VPC_DEFAULT)
db_api.add_item.assert_any_call(
mock.ANY, 'subnet',
tools.purge_dict(fakes.DB_SUBNET_DEFAULT, ('id',)))
neutron.create_network.assert_called_once_with(
{'network': {'name': 'subnet-0'}})
neutron.update_network.assert_called_once_with(
fakes.ID_OS_NETWORK_DEFAULT,
{'network': {'name': fakes.ID_EC2_SUBNET_DEFAULT}})
neutron.create_subnet.assert_called_once_with(
{'subnet': tools.purge_dict(fakes.OS_SUBNET_DEFAULT,
('id', 'name', 'gateway_ip'))})
neutron.update_subnet.assert_called_once_with(
fakes.ID_OS_SUBNET_DEFAULT,
{'subnet': {'name': fakes.ID_EC2_SUBNET_DEFAULT,
'gateway_ip': None}})
neutron.add_interface_router.assert_called_once_with(
fakes.ID_OS_ROUTER_DEFAULT,
{'subnet_id': fakes.ID_OS_SUBNET_DEFAULT})