Adds network model and network info cache.

The next merge will prepopulate the cache, and use the model to keep the
cache up to date.
I realize "cache" is a bit of a stretch for what this is doing.

blueprint network-info-model
blueprint compute-network-info

Change-Id: I0f0f4ba3de1310e1ff89239dab6ea8e24c85f2c8
This commit is contained in:
Trey Morris
2011-12-01 16:54:40 -06:00
committed by Trey Morris
parent 10c829ff39
commit d8e84937c1
8 changed files with 855 additions and 0 deletions

View File

@@ -649,6 +649,48 @@ def instance_get_id_to_uuid_mapping(context, ids):
###################
def instance_info_cache_create(context, values):
"""Create a new instance cache record in the table.
:param context: = request context object
:param values: = dict containing column values
"""
return IMPL.instance_info_cache_create(context, values)
def instance_info_cache_get(context, instance_id, session=None):
"""Gets an instance info cache from the table.
:param instance_id: = id of the info cache's instance
:param session: = optional session object
"""
return IMPL.instance_info_cache_get(context, instance_id, session=None)
def instance_info_cache_update(context, instance_id, values,
session=None):
"""Update an instance info cache record in the table.
:param instance_id: = id of info cache's instance
:param values: = dict containing column values to update
"""
return IMPL.instance_info_cache_update(context, instance_id, values,
session)
def instance_info_cache_delete_by_instance_id(context, instance_id,
session=None):
"""Deletes an existing instance_info_cache record
:param instance_id: = id of the instance tied to the cache record
"""
return IMPL.instance_info_cache_delete_by_instance_id(context, instance_id,
session)
###################
def key_pair_create(context, values):
"""Create a key_pair from the values dictionary."""
return IMPL.key_pair_create(context, values)

View File

@@ -1132,6 +1132,8 @@ def instance_destroy(context, instance_id):
update({'deleted': True,
'deleted_at': utils.utcnow(),
'updated_at': literal_column('updated_at')})
instance_info_cache_delete_by_instance_id(context, instance_id,
session=session)
@require_context
@@ -1557,6 +1559,75 @@ def instance_get_id_to_uuid_mapping(context, ids):
###################
@require_context
def instance_info_cache_create(context, values):
"""Create a new instance cache record in the table.
:param context: = request context object
:param values: = dict containing column values
"""
info_cache = models.InstanceInfoCache()
info_cache['id'] = str(utils.gen_uuid())
info_cache.update(values)
session = get_session()
with session.begin():
info_cache.save(session=session)
return info_cache
@require_context
def instance_info_cache_get(context, instance_id, session=None):
"""Gets an instance info cache from the table.
:param instance_id: = id of the info cache's instance
:param session: = optional session object
"""
session = session or get_session()
info_cache = session.query(models.InstanceInfoCache).\
filter_by(instance_id=instance_id).\
first()
return info_cache
@require_context
def instance_info_cache_update(context, instance_id, values,
session=None):
"""Update an instance info cache record in the table.
:param instance_id: = id of info cache's instance
:param values: = dict containing column values to update
:param session: = optional session object
"""
session = session or get_session()
info_cache = instance_info_cache_get(context, instance_id,
session=session)
values['updated_at'] = literal_column('updated_at')
if info_cache:
info_cache.update(values)
info_cache.save(session=session)
return info_cache
@require_context
def instance_info_cache_delete_by_instance_id(context, instance_id,
session=None):
"""Deletes an existing instance_info_cache record
:param instance_id: = id of the instance tied to the cache record
:param session: = optional session object
"""
values = {'deleted': True,
'deleted_at': utils.utcnow()}
instance_info_cache_update(context, instance_id, values, session)
###################
@require_context
def key_pair_create(context, values):
key_pair_ref = models.KeyPair()

View File

@@ -0,0 +1,62 @@
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
from sqlalchemy import *
from migrate import *
from nova import log as logging
from nova import utils
meta = MetaData()
# instance info cache table to add to DB
instance_info_caches = Table('instance_info_caches', meta,
Column('created_at', DateTime(timezone=False),
default=utils.utcnow()),
Column('updated_at', DateTime(timezone=False),
onupdate=utils.utcnow()),
Column('deleted_at', DateTime(timezone=False)),
Column('deleted', Boolean(create_constraint=True, name=None)),
Column('id', Integer(), primary_key=True),
Column('network_info', Text()),
Column('instance_id', String(36),
ForeignKey('instances.uuid'),
nullable=False,
unique=True),
mysql_engine='InnoDB')
def upgrade(migrate_engine):
meta.bind = migrate_engine
# load instances for fk
instances = Table('instances', meta, autoload=True)
# create instance_info_caches table
try:
instance_info_caches.create()
except Exception:
logging.error(_("Table |%s| not created!"), repr(instance_info_caches))
raise
def downgrade(migrate_engine):
try:
instance_info_caches.drop()
except Exception:
logging.error(_("instance_info_caches tables not dropped"))
raise

View File

@@ -257,6 +257,27 @@ class Instance(BASE, NovaBase):
progress = Column(Integer)
class InstanceInfoCache(BASE, NovaBase):
"""
Represents a cache of information about an instance
"""
__tablename__ = 'instance_info_caches'
id = Column(String(36), primary_key=True)
# text column used for storing a json object of network data for api
network_info = Column(Text)
# this is all uuid based, we have them might as well start using them
instance_id = Column(String(36), ForeignKey('instances.uuid'),
nullable=False, unique=True)
instance = relationship(Instance,
backref=backref('info_cache', uselist=False),
foreign_keys=instance_id,
primaryjoin='and_('
'InstanceInfoCache.instance_id == Instance.uuid,'
'InstanceInfoCache.deleted == False)')
class VirtualStorageArray(BASE, NovaBase):
"""
Represents a virtual storage array supplying block storage to instances.

View File

@@ -317,6 +317,10 @@ class InvalidCPUInfo(Invalid):
message = _("Unacceptable CPU info") + ": %(reason)s"
class InvalidIpAddressError(Invalid):
message = _("%(address)s is not a valid IP v4/6 address.")
class InvalidVLANTag(Invalid):
message = _("VLAN tag is not appropriate for the port group "
"%(bridge)s. Expected VLAN tag is %(tag)s, "

255
nova/network/model.py Normal file
View File

@@ -0,0 +1,255 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import netaddr
import types
from nova import exception
class Model(dict):
"""Defines some necessary structures for most of the network models"""
def __repr__(self):
return self.__class__.__name__ + '(' + dict.__repr__(self) + ')'
def set_meta(self, kwargs):
# pull meta out of kwargs if it's there
self['meta'] = kwargs.pop('meta', {})
# update meta with any additional kwargs that may exist
self['meta'].update(kwargs)
class IP(Model):
"""Represents an IP address in Nova"""
def __init__(self, address=None, type=None, **kwargs):
super(IP, self).__init__()
self['address'] = address
self['type'] = type
self['version'] = kwargs.pop('version', None)
self.set_meta(kwargs)
# determine version from address if not passed in
if self['address'] and not self['version']:
try:
self['version'] = netaddr.IPAddress(self['address']).version
except netaddr.AddrFormatError, e:
raise exception.InvalidIpAddressError(self['address'])
def __eq__(self, other):
return self['address'] == other['address']
@classmethod
def hydrate(cls, ip):
if ip:
return IP(**ip)
return None
class FixedIP(IP):
"""Represents a Fixed IP address in Nova"""
def __init__(self, floating_ips=None, **kwargs):
super(FixedIP, self).__init__(**kwargs)
self['floating_ips'] = floating_ips or []
if not self['type']:
self['type'] = 'fixed'
def add_floating_ip(self, floating_ip):
if floating_ip not in self['floating_ips']:
self['floating_ips'].append(floating_ip)
def floating_ip_addresses(self):
return [ip['address'] for ip in self['floating_ips']]
@classmethod
def hydrate(cls, fixed_ip):
fixed_ip = FixedIP(**fixed_ip)
fixed_ip['floating_ips'] = [IP.hydrate(floating_ip)
for floating_ip in fixed_ip['floating_ips']]
return fixed_ip
class Route(Model):
"""Represents an IP Route in Nova"""
def __init__(self, cidr=None, gateway=None, interface=None, **kwargs):
super(Route, self).__init__()
self['cidr'] = cidr
self['gateway'] = gateway
self['interface'] = interface
self.set_meta(kwargs)
@classmethod
def hydrate(cls, route):
route = Route(**route)
route['gateway'] = IP.hydrate(route['gateway'])
return route
class Subnet(Model):
"""Represents a Subnet in Nova"""
def __init__(self, cidr=None, dns=None, gateway=None, ips=None,
routes=None, **kwargs):
super(Subnet, self).__init__()
self['cidr'] = cidr
self['dns'] = dns or []
self['gateway'] = gateway
self['ips'] = ips or []
self['routes'] = routes or []
self['version'] = kwargs.pop('version', None)
self.set_meta(kwargs)
if self['cidr'] and not self['version']:
self['version'] = netaddr.IPNetwork(self['cidr']).version
def __eq__(self, other):
return self['cidr'] == other['cidr']
def add_route(self, new_route):
if new_route not in self['routes']:
self['routes'].append(new_route)
def add_dns(self, dns):
if dns not in self['dns']:
self['dns'].append(dns)
def add_ip(self, ip):
if ip not in self['ips']:
self['ips'].append(ip)
@classmethod
def hydrate(cls, subnet):
subnet = Subnet(**subnet)
subnet['dns'] = [IP.hydrate(dns) for dns in subnet['dns']]
subnet['ips'] = [FixedIP.hydrate(ip) for ip in subnet['ips']]
subnet['routes'] = [Route.hydrate(route) for route in subnet['routes']]
subnet['gateway'] = IP.hydrate(subnet['gateway'])
return subnet
class Network(Model):
"""Represents a Network in Nova"""
def __init__(self, id=None, bridge=None, label=None,
subnets=None, **kwargs):
super(Network, self).__init__()
self['id'] = id
self['bridge'] = bridge
self['label'] = label
self['subnets'] = subnets or []
self.set_meta(kwargs)
def add_subnet(self, subnet):
if subnet not in self['subnets']:
self['subnets'].append(subnet)
@classmethod
def hydrate(cls, network):
if network:
network = Network(**network)
network['subnets'] = [Subnet.hydrate(subnet)
for subnet in network['subnets']]
return network
class VIF(Model):
"""Represents a Virtual Interface in Nova"""
def __init__(self, id=None, address=None, network=None, **kwargs):
super(VIF, self).__init__()
self['id'] = id
self['address'] = address
self['network'] = network or None
self.set_meta(kwargs)
def __eq__(self, other):
return self['id'] == other['id']
def fixed_ips(self):
return [fixed_ip for subnet in self['network']['subnets']
for fixed_ip in subnet['ips']]
def floating_ips(self):
return [floating_ip for fixed_ip in self.fixed_ips()
for floating_ip in fixed_ip['floating_ips']]
def labeled_ips(self):
""" returns the list of all IPs in this flat structure:
{'network_label': 'my_network',
'network_id': 'n8v29837fn234782f08fjxk3ofhb84',
'ips': [{'address': '123.123.123.123',
'version': 4,
'type: 'fixed',
'meta': {...}},
{'address': '124.124.124.124',
'version': 4,
'type': 'floating',
'meta': {...}},
{'address': 'fe80::4',
'version': 6,
'type': 'fixed',
'meta': {...}}]"""
if self['network']:
# remove unecessary fields on fixed_ips
ips = [IP(**ip) for ip in self.fixed_ips()]
for ip in ips:
# remove floating ips from IP, since this is a flat structure
# of all IPs
del ip['meta']['floating_ips']
# add floating ips to list (if any)
ips.extend(self.floating_ips())
return {'network_label': self['network']['label'],
'network_id': self['network']['id'],
'ips': ips}
return []
@classmethod
def hydrate(cls, vif):
vif = VIF(**vif)
vif['network'] = Network.hydrate(vif['network'])
return vif
class NetworkInfo(list):
"""Stores and manipulates network information for a Nova instance"""
# NetworkInfo is a list of VIFs
def fixed_ips(self):
"""Returns all fixed_ips without floating_ips attached"""
return [ip for vif in self for ip in vif.fixed_ips()]
def floating_ips(self):
"""Returns all floating_ips"""
return [ip for vif in self for ip in vif.floating_ips()]
@classmethod
def hydrate(cls, network_info):
if isinstance(network_info, types.StringTypes):
network_info = json.loads(network_info)
return NetworkInfo([VIF.hydrate(vif) for vif in network_info])
def as_cache(self):
return json.dumps(self)

View File

@@ -0,0 +1,72 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nova.network import model
def new_ip(ip_dict=None):
new_ip = dict(address='192.168.1.100')
ip_dict = ip_dict or {}
new_ip.update(ip_dict)
return model.FixedIP(**new_ip)
def new_route(route_dict=None):
new_route = dict(
cidr='0.0.0.0/24',
gateway=new_ip(dict(address='192.168.1.1')),
interface='eth0')
route_dict = route_dict or {}
new_route.update(route_dict)
return model.Route(**new_route)
def new_subnet(subnet_dict=None):
new_subnet = dict(
cidr='255.255.255.0',
dns=[new_ip(dict(address='1.2.3.4')),
new_ip(dict(address='2.3.4.5'))],
gateway=new_ip(dict(address='192.168.1.1')),
ips=[new_ip(dict(address='192.168.1.100')),
new_ip(dict(address='192.168.1.101'))],
routes=[new_route()],
version=4)
subnet_dict = subnet_dict or {}
new_subnet.update(subnet_dict)
return model.Subnet(**new_subnet)
def new_network(network_dict=None):
new_net = dict(
id=1,
bridge='br0',
label='public',
subnets=[new_subnet(), new_subnet(dict(cidr='255.255.255.255'))])
network_dict = network_dict or {}
new_net.update(network_dict)
return model.Network(**new_net)
def new_vif(vif_dict=None):
vif = dict(
id=1,
address='aa:aa:aa:aa:aa:aa',
network=new_network())
vif_dict = vif_dict or {}
vif.update(vif_dict)
return model.VIF(**vif)

View File

@@ -0,0 +1,328 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
from nova import exception
from nova import log as logging
from nova.network import model
from nova import test
from nova.tests import fake_network_cache_model
LOG = logging.getLogger('nova.tests.network')
class RouteTests(test.TestCase):
def test_create_route_with_attrs(self):
route = fake_network_cache_model.new_route()
ip = fake_network_cache_model.new_ip(dict(address='192.168.1.1'))
self.assertEqual(route['cidr'], '0.0.0.0/24')
self.assertEqual(route['gateway']['address'], '192.168.1.1')
self.assertEqual(route['interface'], 'eth0')
def test_routes_equal(self):
route1 = fake_network_cache_model.new_route()
route2 = fake_network_cache_model.new_route()
self.assertEqual(route1, route2)
def test_routes_not_equal(self):
route1 = fake_network_cache_model.new_route()
route2 = fake_network_cache_model.new_route(dict(cidr='1.1.1.1/24'))
self.assertNotEqual(route1, route2)
def test_hydrate(self):
route = model.Route.hydrate(
{'gateway': fake_network_cache_model.new_ip(
dict(address='192.168.1.1'))})
self.assertEqual(route['cidr'], None)
self.assertEqual(route['gateway']['address'], '192.168.1.1')
self.assertEqual(route['interface'], None)
class FixedIPTests(test.TestCase):
def test_createnew_fixed_ip_with_attrs(self):
fixed_ip = model.FixedIP(address='192.168.1.100')
self.assertEqual(fixed_ip['address'], '192.168.1.100')
self.assertEqual(fixed_ip['floating_ips'], [])
self.assertEqual(fixed_ip['type'], 'fixed')
self.assertEqual(fixed_ip['version'], 4)
def test_create_fixed_ipv6(self):
fixed_ip = model.FixedIP(address='::1')
self.assertEqual(fixed_ip['address'], '::1')
self.assertEqual(fixed_ip['floating_ips'], [])
self.assertEqual(fixed_ip['type'], 'fixed')
self.assertEqual(fixed_ip['version'], 6)
def test_create_fixed_bad_ip_fails(self):
self.assertRaises(exception.InvalidIpAddressError,
model.FixedIP,
address='picklespicklespickles')
def test_equate_two_fixed_ips(self):
fixed_ip = model.FixedIP(address='::1')
fixed_ip2 = model.FixedIP(address='::1')
self.assertEqual(fixed_ip, fixed_ip2)
def test_equate_two_dissimilar_fixed_ips_fails(self):
fixed_ip = model.FixedIP(address='::1')
fixed_ip2 = model.FixedIP(address='::2')
self.assertNotEqual(fixed_ip, fixed_ip2)
def test_hydrate(self):
fixed_ip = model.FixedIP.hydrate({})
self.assertEqual(fixed_ip['floating_ips'], [])
self.assertEqual(fixed_ip['address'], None)
self.assertEqual(fixed_ip['type'], 'fixed')
self.assertEqual(fixed_ip['version'], None)
def test_add_floating_ip(self):
fixed_ip = model.FixedIP(address='192.168.1.100')
fixed_ip.add_floating_ip('192.168.1.101')
self.assertEqual(fixed_ip['floating_ips'], ['192.168.1.101'])
def test_add_floating_ip_repeatedly_only_one_instance(self):
fixed_ip = model.FixedIP(address='192.168.1.100')
for i in xrange(10):
fixed_ip.add_floating_ip('192.168.1.101')
self.assertEqual(fixed_ip['floating_ips'], ['192.168.1.101'])
class SubnetTests(test.TestCase):
def test_create_subnet_with_attrs(self):
subnet = fake_network_cache_model.new_subnet()
route1 = fake_network_cache_model.new_route()
self.assertEqual(subnet['cidr'], '255.255.255.0')
self.assertEqual(subnet['dns'],
[fake_network_cache_model.new_ip(dict(address='1.2.3.4')),
fake_network_cache_model.new_ip(dict(address='2.3.4.5'))])
self.assertEqual(subnet['gateway']['address'], '192.168.1.1')
self.assertEqual(subnet['ips'],
[fake_network_cache_model.new_ip(
dict(address='192.168.1.100')),
fake_network_cache_model.new_ip(
dict(address='192.168.1.101'))])
self.assertEqual(subnet['routes'], [route1])
self.assertEqual(subnet['version'], 4)
def test_add_route(self):
subnet = fake_network_cache_model.new_subnet()
route1 = fake_network_cache_model.new_route()
route2 = fake_network_cache_model.new_route({'cidr': '1.1.1.1/24'})
subnet.add_route(route2)
self.assertEqual(subnet['routes'], [route1, route2])
def test_add_route_a_lot(self):
subnet = fake_network_cache_model.new_subnet()
route1 = fake_network_cache_model.new_route()
route2 = fake_network_cache_model.new_route({'cidr': '1.1.1.1/24'})
for i in xrange(10):
subnet.add_route(route2)
self.assertEqual(subnet['routes'], [route1, route2])
def test_add_dns(self):
subnet = fake_network_cache_model.new_subnet()
dns = fake_network_cache_model.new_ip(dict(address='9.9.9.9'))
subnet.add_dns(dns)
self.assertEqual(subnet['dns'],
[fake_network_cache_model.new_ip(dict(address='1.2.3.4')),
fake_network_cache_model.new_ip(dict(address='2.3.4.5')),
fake_network_cache_model.new_ip(dict(address='9.9.9.9'))])
def test_add_dns_a_lot(self):
subnet = fake_network_cache_model.new_subnet()
for i in xrange(10):
subnet.add_dns(fake_network_cache_model.new_ip(
dict(address='9.9.9.9')))
self.assertEqual(subnet['dns'],
[fake_network_cache_model.new_ip(dict(address='1.2.3.4')),
fake_network_cache_model.new_ip(dict(address='2.3.4.5')),
fake_network_cache_model.new_ip(dict(address='9.9.9.9'))])
def test_add_ip(self):
subnet = fake_network_cache_model.new_subnet()
subnet.add_ip(fake_network_cache_model.new_ip(
dict(address='192.168.1.102')))
self.assertEqual(subnet['ips'],
[fake_network_cache_model.new_ip(
dict(address='192.168.1.100')),
fake_network_cache_model.new_ip(
dict(address='192.168.1.101')),
fake_network_cache_model.new_ip(
dict(address='192.168.1.102'))])
def test_add_ip_a_lot(self):
subnet = fake_network_cache_model.new_subnet()
for i in xrange(10):
subnet.add_ip(fake_network_cache_model.new_ip(
dict(address='192.168.1.102')))
self.assertEqual(subnet['ips'],
[fake_network_cache_model.new_ip(
dict(address='192.168.1.100')),
fake_network_cache_model.new_ip(
dict(address='192.168.1.101')),
fake_network_cache_model.new_ip(
dict(address='192.168.1.102'))])
def test_hydrate(self):
subnet_dict = {
'cidr': '255.255.255.0',
'dns': [fake_network_cache_model.new_ip(dict(address='1.1.1.1'))],
'ips': [fake_network_cache_model.new_ip(dict(address='2.2.2.2'))],
'routes': [fake_network_cache_model.new_route()],
'version': 4,
'gateway': fake_network_cache_model.new_ip(
dict(address='3.3.3.3'))}
subnet = model.Subnet.hydrate(subnet_dict)
self.assertEqual(subnet['cidr'], '255.255.255.0')
self.assertEqual(subnet['dns'], [fake_network_cache_model.new_ip(
dict(address='1.1.1.1'))])
self.assertEqual(subnet['gateway']['address'], '3.3.3.3')
self.assertEqual(subnet['ips'], [fake_network_cache_model.new_ip(
dict(address='2.2.2.2'))])
self.assertEqual(subnet['routes'], [
fake_network_cache_model.new_route()])
self.assertEqual(subnet['version'], 4)
class NetworkTests(test.TestCase):
def test_create_network(self):
network = fake_network_cache_model.new_network()
self.assertEqual(network['id'], 1)
self.assertEqual(network['bridge'], 'br0')
self.assertEqual(network['label'], 'public')
self.assertEqual(network['subnets'],
[fake_network_cache_model.new_subnet(),
fake_network_cache_model.new_subnet(
dict(cidr='255.255.255.255'))])
def test_add_subnet(self):
network = fake_network_cache_model.new_network()
network.add_subnet(fake_network_cache_model.new_subnet(
dict(cidr='0.0.0.0')))
self.assertEqual(network['subnets'],
[fake_network_cache_model.new_subnet(),
fake_network_cache_model.new_subnet(
dict(cidr='255.255.255.255')),
fake_network_cache_model.new_subnet(dict(cidr='0.0.0.0'))])
def test_add_subnet_a_lot(self):
network = fake_network_cache_model.new_network()
for i in xrange(10):
network.add_subnet(fake_network_cache_model.new_subnet(
dict(cidr='0.0.0.0')))
self.assertEqual(network['subnets'],
[fake_network_cache_model.new_subnet(),
fake_network_cache_model.new_subnet(
dict(cidr='255.255.255.255')),
fake_network_cache_model.new_subnet(dict(cidr='0.0.0.0'))])
def test_hydrate(self):
new_network = dict(
id=1,
bridge='br0',
label='public',
subnets=[fake_network_cache_model.new_subnet(),
fake_network_cache_model.new_subnet(
dict(cidr='255.255.255.255'))])
network = model.Network.hydrate(fake_network_cache_model.new_network())
self.assertEqual(network['id'], 1)
self.assertEqual(network['bridge'], 'br0')
self.assertEqual(network['label'], 'public')
self.assertEqual(network['subnets'],
[fake_network_cache_model.new_subnet(),
fake_network_cache_model.new_subnet(
dict(cidr='255.255.255.255'))])
class VIFTests(test.TestCase):
def test_create_vif(self):
vif = fake_network_cache_model.new_vif()
self.assertEqual(vif['id'], 1)
self.assertEqual(vif['address'], 'aa:aa:aa:aa:aa:aa')
self.assertEqual(vif['network'],
fake_network_cache_model.new_network())
def test_vif_get_fixed_ips(self):
vif = fake_network_cache_model.new_vif()
fixed_ips = vif.fixed_ips()
ips = [fake_network_cache_model.new_ip(dict(address='192.168.1.100')),
fake_network_cache_model.new_ip(
dict(address='192.168.1.101'))] * 2
self.assertEqual(fixed_ips, ips)
def test_vif_get_floating_ips(self):
vif = fake_network_cache_model.new_vif()
vif['network']['subnets'][0]['ips'][0].add_floating_ip('192.168.1.1')
floating_ips = vif.floating_ips()
self.assertEqual(floating_ips, ['192.168.1.1'])
def test_vif_get_labeled_ips(self):
vif = fake_network_cache_model.new_vif()
labeled_ips = vif.labeled_ips()
ip_dict = {
'network_id': 1,
'ips': [fake_network_cache_model.new_ip(
{'address': '192.168.1.100'}),
fake_network_cache_model.new_ip(
{'address': '192.168.1.101'})] * 2,
'network_label': 'public'}
self.assertEqual(labeled_ips, ip_dict)
def test_hydrate(self):
new_vif = dict(
id=1,
address='127.0.0.1',
network=fake_network_cache_model.new_network())
vif = model.VIF.hydrate(fake_network_cache_model.new_vif())
self.assertEqual(vif['id'], 1)
self.assertEqual(vif['address'], 'aa:aa:aa:aa:aa:aa')
self.assertEqual(vif['network'],
fake_network_cache_model.new_network())
class NetworkInfoTests(test.TestCase):
def test_create_model(self):
ninfo = model.NetworkInfo([fake_network_cache_model.new_vif(),
fake_network_cache_model.new_vif(
{'address':'bb:bb:bb:bb:bb:bb'})])
self.assertEqual(ninfo.fixed_ips(),
[fake_network_cache_model.new_ip({'address': '192.168.1.100'}),
fake_network_cache_model.new_ip(
{'address': '192.168.1.101'})] * 4)
def test_get_floating_ips(self):
vif = fake_network_cache_model.new_vif()
vif['network']['subnets'][0]['ips'][0].add_floating_ip('192.168.1.1')
ninfo = model.NetworkInfo([vif,
fake_network_cache_model.new_vif(
{'address':'bb:bb:bb:bb:bb:bb'})])
self.assertEqual(ninfo.floating_ips(), ['192.168.1.1'])
def test_hydrate(self):
ninfo = model.NetworkInfo([fake_network_cache_model.new_vif(),
fake_network_cache_model.new_vif(
{'address':'bb:bb:bb:bb:bb:bb'})])
deserialized = model.NetworkInfo.hydrate(ninfo)
self.assertEqual(ninfo.fixed_ips(),
[fake_network_cache_model.new_ip({'address': '192.168.1.100'}),
fake_network_cache_model.new_ip(
{'address': '192.168.1.101'})] * 4)