Changed the association setup for ip/port

Fixes JIRA:NCP-1608
This commit is contained in:
Justin Hammond
2015-07-24 23:46:52 -05:00
parent 482d01b6e7
commit 59907d11d1
11 changed files with 551 additions and 418 deletions

View File

@@ -147,9 +147,14 @@ def _model_query(context, model, filters, fields=None):
datetime.timedelta(seconds=value))
# NOTE(asadoughi): should this allow for deallocated_at = null?
model_filters.append(model.deallocated_at <= reuse)
elif key == "port_id":
if model == models.PortIpAssociation:
model_filters.append(model.port_id == value)
elif key == "tenant_id":
if model == models.IPAddress:
model_filters.append(model.used_by_tenant_id.in_(value))
elif model == models.PortIpAssociation:
pass
else:
model_filters.append(model.tenant_id.in_(value))
@@ -233,6 +238,13 @@ def port_create(context, **port_dict):
return port
@scoped
def ip_port_association_find(context, **filters):
query = context.session.query(models.PortIpAssociation)
model_filters = _model_query(context, models.PortIpAssociation, filters)
return query.filter(*model_filters)
def port_disassociate_ip(context, ports, address):
assocs_to_remove = [assoc for assoc in address.associations
if assoc.port in ports]
@@ -247,11 +259,10 @@ def port_disassociate_ip(context, ports, address):
def port_associate_ip(context, ports, address, enable_port=None):
for port in ports:
assoc = models.PortIpAssociation()
assoc.port_id = port.id
assoc.ip_address_id = address.id
assoc.port = port
assoc.ip_address = address
assoc.enabled = port.id in enable_port if enable_port else False
address.associations.append(assoc)
context.session.add(address)
context.session.add(assoc)
return address
@@ -319,8 +330,8 @@ def ip_address_find(context, lock_mode=False, **filters):
models.Port.device_id.in_(filters["device_id"])))
if filters.get("service"):
model_filters.append(models.IPAddress.ports.any(
models.Port.service == filters["service"]))
model_filters.append(models.IPAddress.associations.any(
models.PortIpAssociation.service == filters["service"]))
if filters.get("port_id"):
model_filters.append(models.IPAddress.ports.any(

View File

@@ -129,6 +129,8 @@ port_ip_association_table = sa.Table(
primary_key=True),
sa.Column("enabled", sa.Boolean(), default=True, nullable=False,
server_default='1'),
sa.Column("service", sa.String(255), default='none', nullable=True,
server_default='none'),
**TABLE_KWARGS)
@@ -170,6 +172,31 @@ class IPAddress(BASEV2, models.HasId):
sa.ForeignKey("quark_transactions.id"),
nullable=True)
def is_shared(self):
return self.address_type == ip_types.SHARED
def has_shared_owner(self):
for assoc in self["associations"]:
if assoc.service != 'none' and assoc.service is not None:
return True
return False
def get_shared_owner(self):
for assoc in self["associations"]:
if assoc.service != 'none' and assoc.service is not None:
return assoc.port
return None
def set_service_for_port(self, port, service):
for assoc in self["associations"]:
if assoc.port_id == port["id"]:
assoc.service = service
def get_service_for_port(self, port):
for assoc in self["associations"]:
if assoc.port_id == port["id"]:
return assoc.service
def enabled_for_port(self, port):
for assoc in self["associations"]:
if assoc.port_id == port["id"]:
@@ -405,7 +432,6 @@ class Port(BASEV2, models.HasTenant, models.HasId):
device_owner = sa.Column(sa.String(255))
bridge = sa.Column(sa.String(255))
associations = orm.relationship(PortIpAssociation, backref="port")
service = sa.Column(sa.String(255), default="none")
@declarative.declared_attr
def ip_addresses(cls):
@@ -415,7 +441,7 @@ class Port(BASEV2, models.HasTenant, models.HasId):
return orm.relationship(IPAddress, primaryjoin=primaryjoin,
secondaryjoin=secondaryjoin,
secondary=port_ip_association_table,
backref="ports",
backref='ports',
order_by='IPAddress.allocated_at')
@declarative.declared_attr

View File

@@ -23,7 +23,6 @@ from quark.db import api as db_api
from quark.db import ip_types
from quark import exceptions as quark_exceptions
from quark import ipam
from quark.plugin_modules import ports as port_module
from quark import plugin_views as v
CONF = cfg.CONF
@@ -88,9 +87,8 @@ def _shared_ip_request(ip_address):
return len(port_ids) > 1 or len(device_ids) > 1
def _shared_ip_and_active(iptype, ports, except_port=None):
if(iptype == ip_types.SHARED and any(p.service != 'none' and
p.id != except_port for p in ports)):
def _shared_ip_and_active(ip_address, except_port=None):
if ip_address.is_shared() and ip_address.has_shared_owner():
return True
return False
@@ -169,13 +167,9 @@ def create_ip_address(context, body):
if ip_address else [],
segment_id=segment_id,
address_type=iptype)
# Ensure that there are no ports whose service is set to something other
# than 'none' because nova will not be able to know it was disassociated
if _shared_ip_and_active(iptype, ports):
raise quark_exceptions.PortRequiresDisassociation()
with context.session.begin():
new_address = db_api.port_associate_ip(context, ports,
new_addresses[0])
address = new_addresses[0]
new_address = db_api.port_associate_ip(context, ports, address)
return v._make_ip_dict(new_address)
@@ -255,7 +249,7 @@ def delete_ip_address(context, id):
if not ip_address or ip_address.deallocated:
raise quark_exceptions.IpAddressNotFound(addr_id=id)
if _shared_ip_and_active(ip_address.address_type, ip_address.ports):
if _shared_ip_and_active(ip_address):
raise quark_exceptions.PortRequiresDisassociation()
db_api.update_port_associations_for_ip(context, [], ip_address)
@@ -298,7 +292,7 @@ def get_ports_for_ip_address(context, ip_id, limit=None, sorts=None,
ports = db_api.port_find(context, limit, sorts, marker,
fields=fields, join_security_groups=True,
**filters)
return v._make_ip_ports_list(ports, fields)
return v._make_ip_ports_list(addr, ports, fields)
def get_port_for_ip_address(context, ip_id, id, fields=None):
@@ -324,7 +318,7 @@ def get_port_for_ip_address(context, ip_id, id, fields=None):
if not results:
raise exceptions.PortNotFound(port_id=id, net_id='')
return v._make_port_for_ip_dict(results)
return v._make_port_for_ip_dict(addr, results)
def update_port_for_ip_address(context, ip_id, id, port):
@@ -339,19 +333,19 @@ def update_port_for_ip_address(context, ip_id, id, port):
neutron/api/v2/attributes.py.
"""
LOG.info("update_port %s for tenant %s" % (id, context.tenant_id))
addr = db_api.ip_address_find(context, id=ip_id, scope=db_api.ONE)
sanitize_list = ['service']
port_dict = {k: port['port'][k] for k in sanitize_list}
addr = db_api.ip_address_find(context, id=ip_id, scope=db_api.ONE)
if not addr:
raise quark_exceptions.IpAddressNotFound(addr_id=ip_id)
port_db = db_api.port_find(context, id=id, scope=db_api.ONE)
if not port_db:
raise quark_exceptions.PortNotFound(port_id=id)
port_dict = {k: port['port'][k] for k in sanitize_list}
require_da = False
ports = addr.ports
iptype = addr.address_type
service = port_dict.get('service')
if require_da and _shared_ip_and_active(iptype, ports, except_port=id):
if require_da and _shared_ip_and_active(addr, except_port=id):
raise quark_exceptions.PortRequiresDisassociation()
new_port = {"port": port_dict}
port_ret = port_module.update_port(context, id, new_port)
return v._make_port_for_ip_dict(port_ret)
addr.set_service_for_port(port_db, service)
return v._make_port_for_ip_dict(addr, port_db)

View File

@@ -155,10 +155,11 @@ def _make_security_group_rule_dict(security_rule, fields=None):
return res
def _ip_port_dict(port, fields=None):
def _ip_port_dict(ip, port, fields=None):
service = ip.get_service_for_port(port)
res = {"id": port.get("id"),
"device_id": port.get("device_id"),
"service": port.get("service")}
"service": service}
return res
@@ -199,28 +200,30 @@ def _make_port_address_dict(ip, port, fields=None):
return ip_addr
def _make_port_for_ip_dict(port, fields=None):
res = _ip_port_dict(port)
def _make_port_for_ip_dict(ip, port, fields=None):
res = _ip_port_dict(ip, port)
return res
def _ip_is_fixed(port, ip):
at = ip.get('address_type')
return (not at or at == ip_types.FIXED or (at == ip_types.SHARED and
port.service != "none"))
return (not at or at in (ip_types.FIXED, ip_types.SHARED))
def _make_port_dict(port, fields=None):
res = _port_dict(port)
ips = []
for assoc in port.associations:
ips.append(assoc.ip_address)
res["fixed_ips"] = [_make_port_address_dict(ip, port, fields)
for ip in port.ip_addresses if _ip_is_fixed(port, ip)]
for ip in ips if _ip_is_fixed(port, ip)]
return res
def _make_ip_ports_list(query, fields=None):
def _make_ip_ports_list(ip, query, fields=None):
ports = []
for port in query:
port_dict = _ip_port_dict(port, fields)
port_dict = _ip_port_dict(ip, port, fields)
ports.append(port_dict)
return ports

View File

@@ -14,11 +14,15 @@ class BaseFunctionalTest(test_base.TestBase):
self.context = context.Context('fake', 'fake', is_admin=False)
cfg.CONF.set_override('connection', 'sqlite://', 'database')
configure_mappers()
engine = neutron_db_api.get_engine()
models.BASEV2.metadata.create_all(engine)
quota_driver.quota_db.Quota.metadata.create_all(engine)
# Must set the neutron's facade to none before each test
# otherwise the data will be shared between tests
neutron_db_api._FACADE = None
self.engine = neutron_db_api.get_engine()
models.BASEV2.metadata.create_all(self.engine)
quota_driver.quota_db.Quota.metadata.create_all(self.engine)
def tearDown(self):
engine = neutron_db_api.get_engine()
models.BASEV2.metadata.drop_all(engine)
quota_driver.quota_db.Quota.metadata.drop_all(engine)
self.context = None
neutron_db_api._FACADE = None
models.BASEV2.metadata.drop_all(self.engine)
quota_driver.quota_db.Quota.metadata.drop_all(self.engine)

View File

@@ -4,6 +4,7 @@ import netaddr
import contextlib
from oslo_config import cfg
from quark.db import api as db_api
from quark.db import ip_types
import quark.ipam
import quark.plugin
@@ -73,7 +74,16 @@ class QuarkSharedIPs(MySqlBaseFunctionalTest):
with self._stubs(self.network, self.subnet, self.ports_info2) as (
net, sub, ports):
for p in ports:
self.assertEqual('none', p.get('service'))
port_db = db_api.port_find(self.context, id=p['id'],
scope=db_api.ONE)
assocs = db_api.ip_port_association_find(self.context,
scope=db_api.ALL,
port_id=p['id'])
self.assertEqual(1, len(p.get('fixed_ips')))
self.assertEqual(1, len(port_db.ip_addresses))
ip_db = port_db.ip_addresses[0]
self.assertEqual('none', ip_db.get_service_for_port(port_db))
self.assertEqual(1, len(assocs))
port_ids = [ports[0]['id'], ports[1]['id']]
shared_ip = {'ip_address': dict(port_ids=port_ids,
@@ -95,7 +105,16 @@ class QuarkSharedIPs(MySqlBaseFunctionalTest):
with self._stubs(self.network, self.subnet, self.ports_info2) as (
net, sub, ports):
for p in ports:
self.assertEqual('none', p.get('service'))
port_db = db_api.port_find(self.context, id=p['id'],
scope=db_api.ONE)
assocs = db_api.ip_port_association_find(self.context,
scope=db_api.ALL,
port_id=p['id'])
self.assertEqual(1, len(p.get('fixed_ips')))
self.assertEqual(1, len(port_db.ip_addresses))
ip_db = port_db.ip_addresses[0]
self.assertEqual('none', ip_db.get_service_for_port(port_db))
self.assertEqual(1, len(assocs))
device_ids = [ports[0]['device_id'], ports[1]['device_id']]
shared_ip = {'ip_address': dict(device_ids=device_ids,
@@ -108,22 +127,35 @@ class QuarkSharedIPs(MySqlBaseFunctionalTest):
self.assertEqual(2, len(ports_ip))
port = port_api.get_port(self.context, ports[0]['id'])
self.assertEqual(1, len(port['fixed_ips']))
self.assertEqual(2, len(port['fixed_ips']))
port_ip_update = ip_api.update_port_for_ip_address
updated_port = port_ip_update(self.context, ip['id'],
ports[0]['id'], _make_body('derp'))
self.assertEqual('derp', updated_port.get('service'))
port = port_api.get_port(self.context, ports[0]['id'])
self.assertEqual(2, len(port['fixed_ips']))
port = ip_api.get_port_for_ip_address(self.context, ip['id'],
ports[0]['id'])
self.assertEqual('derp', port.get('service'))
port = ip_api.get_port_for_ip_address(self.context, ip['id'],
ports[1]['id'])
self.assertEqual('none', port.get('service'))
def test_create_shared_ips_with_device_ids(self):
with self._stubs(self.network, self.subnet, self.ports_info2) as (
net, sub, ports):
for p in ports:
self.assertEqual('none', p.get('service'))
port_db = db_api.port_find(self.context, id=p['id'],
scope=db_api.ONE)
assocs = db_api.ip_port_association_find(self.context,
scope=db_api.ALL,
port_id=p['id'])
self.assertEqual(1, len(p.get('fixed_ips')))
self.assertEqual(1, len(port_db.ip_addresses))
ip_db = port_db.ip_addresses[0]
self.assertEqual('none', ip_db.get_service_for_port(port_db))
self.assertEqual(1, len(assocs))
device_ids = [ports[0]['device_id'], ports[1]['device_id']]
shared_ip = {'ip_address': dict(device_ids=device_ids,
@@ -144,42 +176,63 @@ class QuarkSharedIPs(MySqlBaseFunctionalTest):
with self._stubs(self.network, self.subnet, self.ports_info4) as (
net, sub, ports):
for p in ports:
self.assertEqual('none', p.get('service'))
port_ids1 = [ports[0]['id'], ports[1]['id']]
port_ids2 = [ports[2]['id'], ports[3]['id']]
filters = dict(device_id='a')
ips = ip_api.get_ip_addresses(self.context, **filters)
self.assertEqual(1, len(ips))
shared_ip1 = {'ip_address': dict(port_ids=port_ids1,
network_id=net['id'],
version=4)}
ip1 = ip_api.create_ip_address(self.context, shared_ip1)
updated_port = port_api.update_port(self.context, ports[0]['id'],
_make_body('derp'))
self.assertEqual('derp', updated_port.get('service'))
for p in ports:
if p['id'] != ports[0]['id']:
self.assertEqual('none', p.get('service'))
self.assertEqual(2, len(ip1['port_ids']))
shared_ip2 = {'ip_address': dict(port_ids=port_ids2,
network_id=net['id'],
version=4)}
ip2 = ip_api.create_ip_address(self.context, shared_ip2)
self.assertEqual(2, len(ip2['port_ids']))
ports_ip = ip_api.get_ports_for_ip_address(self.context, ip1['id'])
self.assertEqual(2, len(ports_ip))
ports_ip = ip_api.get_ports_for_ip_address(self.context, ip2['id'])
self.assertEqual(2, len(ports_ip))
filters = dict(device_id='a', service='derp')
filters = dict(device_id='a')
ips = ip_api.get_ip_addresses(self.context, **filters)
self.assertEqual(2, len(ips))
filters = dict(device_id='x')
ips = ip_api.get_ip_addresses(self.context, **filters)
self.assertEqual(0, len(ips))
filters = dict(device_id='a', service='derp')
ips = ip_api.get_ip_addresses(self.context, **filters)
self.assertEqual(0, len(ips))
filters = dict(service='derp')
ips = ip_api.get_ip_addresses(self.context, **filters)
self.assertEqual(0, len(ips))
filters = dict(device_id='a', service='none')
ips = ip_api.get_ip_addresses(self.context, **filters)
self.assertEqual(2, len(ips))
port_ip_update = ip_api.update_port_for_ip_address
updated_port = port_ip_update(self.context, ip1['id'],
ports[0]['id'], _make_body('derp'))
self.assertEqual('derp', updated_port.get('service'))
filters = dict(device_id='a', service='derp')
ips = ip_api.get_ip_addresses(self.context, **filters)
self.assertEqual(1, len(ips))
filters = dict(device_id='a', service='derp',
type=ip_types.FIXED)
ips = ip_api.get_ip_addresses(self.context, **filters)
self.assertEqual(1, len(ips))
self.assertEqual(0, len(ips))
filters = dict(device_id='a', service='derp',
type=ip_types.SHARED)

View File

@@ -0,0 +1,206 @@
# Copyright 2014 Openstack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for# the specific language governing permissions and limitations
# under the License.
import mock
import netaddr
import contextlib
from oslo_config import cfg
from quark.db import ip_types
from quark import exceptions as q_exceptions
import quark.ipam
import quark.plugin
import quark.plugin_modules.ip_addresses as ip_api
import quark.plugin_modules.mac_address_ranges as macrng_api
import quark.plugin_modules.networks as network_api
import quark.plugin_modules.ports as port_api
import quark.plugin_modules.subnets as subnet_api
from quark.tests.functional.base import BaseFunctionalTest
class QuarkSharedIPs(BaseFunctionalTest):
def __init__(self, *args, **kwargs):
super(QuarkSharedIPs, self).__init__(*args, **kwargs)
self.disassociate_exception = q_exceptions.PortRequiresDisassociation
self.cidr = "192.168.2.0/24"
self.ip_network = netaddr.IPNetwork(self.cidr)
network = dict(name="public", tenant_id="fake", network_plugin="BASE")
self.network = {"network": network}
subnet = dict(ip_version=4, next_auto_assign_ip=2,
cidr=self.cidr, first_ip=self.ip_network.first,
last_ip=self.ip_network.last, ip_policy=None,
tenant_id="fake")
self.subnet = {"subnet": subnet}
port1 = {'port': dict(device_id='a')}
port2 = {'port': dict(device_id='b')}
port3 = {'port': dict(device_id='c')}
port4 = {'port': dict(device_id='d')}
self.ports_info2 = [port1, port2]
self.ports_info4 = [port1, port2, port3, port4]
def setUp(self):
super(QuarkSharedIPs, self).setUp()
self.old_show_port_service = cfg.CONF.QUARK.show_port_service
cfg.CONF.set_override('show_port_service', True, 'QUARK')
def tearDown(self):
super(QuarkSharedIPs, self).tearDown()
cfg.CONF.set_override('show_port_service', self.old_show_port_service,
'QUARK')
@contextlib.contextmanager
def _stubs(self, network_info, subnet_info, ports_info):
self.ipam = quark.ipam.QuarkIpamANY()
with contextlib.nested(
mock.patch("neutron.common.rpc.get_notifier"),
mock.patch("neutron.quota.QUOTAS.limit_check")):
net = network_api.create_network(self.context, network_info)
mac = {'mac_address_range': dict(cidr="AA:BB:CC")}
self.context.is_admin = True
macrng_api.create_mac_address_range(self.context, mac)
self.context.is_admin = False
subnet_info['subnet']['network_id'] = net['id']
sub = subnet_api.create_subnet(self.context, subnet_info)
ports = []
for port_info in ports_info:
port_info['port']['network_id'] = net['id']
ports.append(port_api.create_port(self.context, port_info))
yield net, sub, ports
def test_create_shared_ips_with_port_ids(self):
def _make_body(ip):
fix_ip = dict(ip_address=ip, subnet_id=sub['id'])
port_info = {"port": dict(fixed_ips=[fix_ip])}
return port_info
with self._stubs(self.network, self.subnet, self.ports_info2) as (
net, sub, ports):
port_ids = [ports[0]['id'], ports[1]['id']]
shared_ip = {'ip_address': dict(port_ids=port_ids,
network_id=net['id'],
version=4)}
ip = ip_api.create_ip_address(self.context, shared_ip)
self.assertEqual(ip_types.SHARED, ip['type'])
ports_ip = ip_api.get_ports_for_ip_address(self.context, ip['id'])
self.assertEqual(2, len(ports_ip))
def test_shared_ip_in_fixed_ip_list(self):
def _make_body(service):
body = dict(service=service)
port_info = {"port": dict(body)}
return port_info
with self._stubs(self.network, self.subnet, self.ports_info2) as (
net, sub, ports):
for port in ports:
self.assertEqual(1, len(port['fixed_ips']))
port_ids = [ports[0]['id'], ports[1]['id']]
shared_ip = {'ip_address': dict(port_ids=port_ids,
network_id=net['id'],
version=4)}
p_id = ports[0]['id']
ip = ip_api.create_ip_address(self.context, shared_ip)
self.assertEqual(ip_types.SHARED, ip['type'])
ports_ip = ip_api.get_ports_for_ip_address(self.context, ip['id'])
self.assertEqual(2, len(ports_ip))
port_ip_update = ip_api.update_port_for_ip_address
updated_port = port_ip_update(self.context, ip['id'],
p_id, _make_body('derp'))
self.assertEqual('derp', updated_port.get('service'))
port = port_api.get_port(self.context, p_id)
self.assertEqual(2, len(port['fixed_ips']))
def test_ip_port_list_has_services(self):
def _make_body(service):
body = dict(service=service)
port_info = {"port": dict(body)}
return port_info
with self._stubs(self.network, self.subnet, self.ports_info2) as (
net, sub, ports):
for port in ports:
self.assertEqual(1, len(port['fixed_ips']))
device_ids = [ports[0]['device_id'], ports[1]['device_id']]
shared_ip = {'ip_address': dict(device_ids=device_ids,
network_id=net['id'],
version=4)}
ip = ip_api.create_ip_address(self.context, shared_ip)
port_ip_update = ip_api.update_port_for_ip_address
port_ip_update(self.context, ip['id'],
ports[0]['id'], _make_body('derp'))
ports_ip = ip_api.get_ports_for_ip_address(self.context, ip['id'])
self.assertEqual(2, len(ports_ip))
for port in ports_ip:
self.assertTrue('service' in port)
self.assertTrue('device_id' in port)
self.assertTrue('id' in port)
self.assertTrue(port['service'] in ('derp', 'none'),
'Service is: %s' % str(port['service']))
def test_can_delete_ip_without_active_port(self):
def _make_body(service):
body = dict(service=service)
port_info = {"port": dict(body)}
return port_info
with self._stubs(self.network, self.subnet, self.ports_info2) as (
net, sub, ports):
device_ids = [ports[0]['device_id'], ports[1]['device_id']]
shared_ip = {'ip_address': dict(device_ids=device_ids,
network_id=net['id'],
version=4)}
ip = ip_api.create_ip_address(self.context, shared_ip)
ip_api.delete_ip_address(self.context, ip['id'])
with self.assertRaises(q_exceptions.IpAddressNotFound):
ip_api.get_ip_address(self.context, ip['id'])
def test_cannot_delete_ip_with_active_port(self):
def _make_body(service):
body = dict(service=service)
port_info = {"port": dict(body)}
return port_info
with self._stubs(self.network, self.subnet, self.ports_info2) as (
net, sub, ports):
device_ids = [ports[0]['device_id'], ports[1]['device_id']]
shared_ip = {'ip_address': dict(device_ids=device_ids,
network_id=net['id'],
version=4)}
ip = ip_api.create_ip_address(self.context, shared_ip)
port_ip_update = ip_api.update_port_for_ip_address
port_ip_update(self.context, ip['id'],
ports[0]['id'], _make_body('derp'))
with self.assertRaises(self.disassociate_exception):
ip_api.delete_ip_address(self.context, ip['id'])

View File

@@ -325,3 +325,175 @@ class QuarkFindPortsFilterByDeviceOwner(BaseFunctionalTest):
db_api.port_delete(self.context, port_mod1)
db_api.port_delete(self.context, port_mod2)
db_api.port_delete(self.context, port_mod3)
class QuarkPortFixedIPOperations(BaseFunctionalTest):
def __init__(self, *args, **kwargs):
super(QuarkPortFixedIPOperations, self).__init__(*args, **kwargs)
cidr = "192.168.10.0/24"
ip_network = netaddr.IPNetwork(cidr)
cidr_v6 = "2001:db8::/32"
ip_network_v6 = netaddr.IPNetwork(cidr_v6)
# some default stuff
network = dict(name="public", tenant_id="make",
network_plugin="BASE",
ipam_strategy="ANY")
self.net_info = {"network": network}
subnet_v4 = dict(ip_version=4, next_auto_assign_ip=2,
cidr=cidr, first_ip=ip_network.first,
last_ip=ip_network.last, ip_policy=None,
tenant_id="fake")
subnet_v6 = dict(ip_version=6, next_auto_assign_ip=2,
cidr=cidr_v6, first_ip=ip_network_v6.first,
last_ip=ip_network_v6.last, ip_policy=None,
tenant_id="fake")
self.sub_info = {"subnet": subnet_v4}
self.sub_info_v6 = {"subnet": subnet_v6}
@contextlib.contextmanager
def _stubs(self, network_info, subnet_info):
with contextlib.nested(
mock.patch("neutron.common.rpc.get_notifier"),
mock.patch("neutron.quota.QUOTAS.limit_check")):
mac = {'mac_address_range': dict(cidr="AA:BB:CC")}
self.context.is_admin = True
macrng_api.create_mac_address_range(self.context, mac)
self.context.is_admin = False
network = network_api.create_network(self.context, network_info)
subnet_info['subnet']['network_id'] = network['id']
subnet = subnet_api.create_subnet(self.context, subnet_info)
yield network, subnet
def test_create_port_single_fixed_ip(self):
with self._stubs(self.net_info, self.sub_info) as (network, subnet):
fixed_ips = [dict(subnet_id=subnet['id'], enabled=True,
ip_address="192.168.10.45")]
port = dict(port=dict(network_id=network['id'],
tenant_id=self.context.tenant_id,
device_id=2,
fixed_ips=fixed_ips))
expected = {'status': "ACTIVE",
'device_owner': None,
'network_id': network["id"],
'tenant_id': self.context.tenant_id,
'admin_state_up': True,
'fixed_ips': fixed_ips,
'device_id': 2}
result = port_api.create_port(self.context, port)
for key in expected.keys():
self.assertEqual(result[key], expected[key],
"Mismatch on %s" % key)
def test_create_port_multiple_fixed_ipv4(self):
with self._stubs(self.net_info, self.sub_info) as (network, subnet):
fixed_ips = [dict(subnet_id=subnet['id'], enabled=True,
ip_address="192.168.10.45"),
dict(subnet_id=subnet['id'], enabled=True,
ip_address="192.168.10.199")]
port = dict(port=dict(network_id=network['id'],
tenant_id=self.context.tenant_id,
device_id=2,
fixed_ips=fixed_ips))
expected = {'status': "ACTIVE",
'device_owner': None,
'network_id': network["id"],
'tenant_id': self.context.tenant_id,
'admin_state_up': True,
'fixed_ips': fixed_ips,
'device_id': 2}
result = port_api.create_port(self.context, port)
for key in expected.keys():
if key != 'fixed_ips':
self.assertEqual(result[key], expected[key],
"Mismatch on %s" % key)
for ip in result['fixed_ips']:
self.assertTrue(ip in expected['fixed_ips'])
def test_create_port_multiple_fixed_ipv6(self):
with self._stubs(self.net_info, self.sub_info_v6) as (network, subnet):
ipv6a = "2001:db8::10"
ipv6b = "2001:db8::15"
fixed_ips = [dict(subnet_id=subnet['id'], enabled=True,
ip_address=ipv6a),
dict(subnet_id=subnet['id'], enabled=True,
ip_address=ipv6b)]
port = dict(port=dict(network_id=network['id'],
tenant_id=self.context.tenant_id,
device_id=2,
fixed_ips=fixed_ips))
expected = {'status': "ACTIVE",
'device_owner': None,
'network_id': network["id"],
'tenant_id': self.context.tenant_id,
'admin_state_up': True,
'fixed_ips': fixed_ips,
'device_id': 2}
result = port_api.create_port(self.context, port)
for key in expected.keys():
if key != 'fixed_ips':
self.assertEqual(result[key], expected[key],
"Mismatch on %s" % key)
for ip in result['fixed_ips']:
self.assertTrue(ip in expected['fixed_ips'])
def test_update_port_multiple_fixed_ipv4(self):
with self._stubs(self.net_info, self.sub_info) as (network, subnet):
fixed_ips = [dict(subnet_id=subnet['id'], enabled=True,
ip_address="192.168.10.45"),
dict(subnet_id=subnet['id'], enabled=True,
ip_address="192.168.10.199")]
port = dict(port=dict(network_id=network['id'],
tenant_id=self.context.tenant_id,
device_id=2,
fixed_ips=fixed_ips))
expected = {'status': "ACTIVE",
'device_owner': None,
'network_id': network["id"],
'tenant_id': self.context.tenant_id,
'admin_state_up': True,
'fixed_ips': fixed_ips,
'device_id': '2'}
result = port_api.create_port(self.context, port)
fixed_ips = [dict(subnet_id=subnet['id'], enabled=True,
ip_address="192.168.10.236"),
dict(subnet_id=subnet['id'], enabled=True,
ip_address="192.168.10.42")]
new_port = dict(port=dict(fixed_ips=fixed_ips))
result = port_api.update_port(self.context, result['id'], new_port)
for key in expected.keys():
if key != 'fixed_ips':
self.assertEqual(result[key], expected[key],
"Mismatch on %s" % key)
for ip in result['fixed_ips']:
self.assertTrue(ip in fixed_ips,
'%s not in %s' % (ip, expected['fixed_ips']))
def test_port_show(self):
with self._stubs(self.net_info, self.sub_info) as (network, subnet):
fixed_ips = [dict(subnet_id=subnet['id'], enabled=True,
ip_address="192.168.10.45"),
dict(subnet_id=subnet['id'], enabled=True,
ip_address="192.168.10.199")]
port = dict(port=dict(network_id=network['id'],
tenant_id=self.context.tenant_id,
device_id=2,
fixed_ips=fixed_ips))
expected = {'status': "ACTIVE",
'device_owner': None,
'network_id': network["id"],
'tenant_id': self.context.tenant_id,
'admin_state_up': True,
'fixed_ips': fixed_ips,
'device_id': 2}
result = port_api.create_port(self.context, port)
result = port_api.get_port(self.context, result['id'])
for key in expected.keys():
if key != 'fixed_ips':
self.assertEqual(result[key], expected[key],
"Mismatch on %s" % key)
for ip in result['fixed_ips']:
self.assertTrue(ip in fixed_ips,
'%s not in %s' % (ip, expected['fixed_ips']))

View File

@@ -228,8 +228,8 @@ class TestQuarkSharedIPAddressCreate(test_quark_plugin.TestQuarkPlugin):
def test_create_ip_address_address_type_shared(self, mock_dbapi, mock_ipam,
*args):
cfg.CONF.set_override('ipam_reuse_after', 100, "QUARK")
ports = [dict(id=1, network_id=2, ip_addresses=[], service="none"),
dict(id=2, network_id=2, ip_addresses=[], service="none")]
ports = [dict(id=1, network_id=2, ip_addresses=[]),
dict(id=2, network_id=2, ip_addresses=[])]
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4, tenant_id=1)
port_models = [models.Port(**p) for p in ports]
@@ -284,38 +284,6 @@ class TestQuarkSharedIPAddressCreate(test_quark_plugin.TestQuarkPlugin):
version=ip_address['version'], ip_addresses=[],
segment_id=None, address_type="fixed")
def test_fail_to_make_shared_with_active_port(self, mock_dbapi, mock_ipam,
*args):
cfg.CONF.set_override('ipam_reuse_after', 100, "QUARK")
ports = [dict(id=1, network_id=2, ip_addresses=[], service="active"),
dict(id=2, network_id=2, ip_addresses=[], service="none")]
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4, used_by_tenant_id=1)
port_models = [models.Port(**p) for p in ports]
ip_model = models.IPAddress()
ip_model.update(ip)
mock_dbapi.port_find.side_effect = port_models
mock_ipam.allocate_ip_address.side_effect = (
self._alloc_stub(ip_model))
ip_address = {"network_id": ip["network_id"],
"version": 4, 'device_ids': [2],
"port_ids": [pm.id for pm in port_models]}
qe = quark_exceptions
with self.assertRaises(qe.PortRequiresDisassociation):
self.plugin.create_ip_address(self.context,
dict(ip_address=ip_address))
# NOTE(thomasem): Having to assert that [ip_model] was passed instead
# of an empty list due to the expected behavior of this method being
# that it mutates the passed in list. So, after it's run, the list
# has already been mutated and it's a reference to that list that
# we're checking. This method ought to be changed to return the new
# IP and let the caller mutate the list, not the other way around.
mock_ipam.allocate_ip_address.assert_called_once_with(
self.context, [ip_model], ip['network_id'], None, 100,
version=ip_address['version'], ip_addresses=[],
address_type="shared", segment_id=None)
class TestQuarkSharedIPAddressPortsValid(test_quark_plugin.TestQuarkPlugin):
def test_validate_ports_on_network_raise_segment(self):
@@ -753,8 +721,7 @@ class TestQuarkGetIpAddressPort(test_quark_plugin.TestQuarkPlugin):
def test_get_ip_address_ports(self, mock_dbapi, mock_ipam, *args):
port = dict(mac_address="AA:BB:CC:DD:EE:FF", network_id=1,
tenant_id=self.context.tenant_id, device_id=2,
bridge="xenbr0", device_owner='network:dhcp',
service='none', id=100)
bridge="xenbr0", device_owner='network:dhcp', id=100)
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4)
port_model = models.Port()
@@ -773,7 +740,6 @@ class TestQuarkGetIpAddressPort(test_quark_plugin.TestQuarkPlugin):
ip_address_id=[1])
self.assertEqual(port["id"], res["id"])
self.assertEqual(port["service"], res["service"])
self.assertEqual(port["device_id"], res["device_id"])
self.assertFalse('mac_address' in res)
self.assertFalse('network_id' in res)
@@ -783,12 +749,10 @@ class TestQuarkGetIpAddressPort(test_quark_plugin.TestQuarkPlugin):
def test_get_ip_address_port(self, mock_dbapi, mock_ipam, *args):
port = dict(mac_address="AA:BB:CC:DD:EE:FF", network_id=1,
tenant_id=self.context.tenant_id, device_id=2,
bridge="xenbr0", device_owner='network:dhcp',
service='none', id=100)
bridge="xenbr0", device_owner='network:dhcp', id=100)
port2 = dict(mac_address="AA:BB:CC:DD:EE:FF", network_id=1,
tenant_id=self.context.tenant_id, device_id=2,
bridge="xenbr0", device_owner='network:dhcp',
service='none', id=100)
bridge="xenbr0", device_owner='network:dhcp', id=100)
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4)
port_model = models.Port()
@@ -812,7 +776,6 @@ class TestQuarkGetIpAddressPort(test_quark_plugin.TestQuarkPlugin):
id=100, ip_address_id=[1],
scope=mock_dbapi.ONE)
self.assertEqual(port["id"], res["id"])
self.assertEqual(port["service"], res["service"])
self.assertEqual(port["device_id"], res["device_id"])
self.assertFalse('mac_address' in res)
self.assertFalse('network_id' in res)
@@ -820,9 +783,9 @@ class TestQuarkGetIpAddressPort(test_quark_plugin.TestQuarkPlugin):
self.assertFalse('tenant_id' in res)
def test_deleting_inactive_shared_ip(self, mock_dbapi, mock_ipam, *args):
port = dict(id=100, service='none', network_id=2,
port = dict(id=100, network_id=2,
backend_key="derp", device_id="y")
port2 = dict(id=101, service='none', network_id=2,
port2 = dict(id=101, network_id=2,
backend_key="derp", device_id="x")
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4, address_type="shared")
@@ -852,59 +815,11 @@ class TestQuarkGetIpAddressPort(test_quark_plugin.TestQuarkPlugin):
with self.assertRaises(exceptions.PortNotFound):
self.plugin.get_port_for_ip_address(self.context, 123, 100)
def test_bad_request_when_deleting_active_shared_ip(self, mock_dbapi,
mock_ipam, *args):
port = dict(id=100, service='considered_active', network_id=2,
backend_key="derp", device_id="y")
port2 = dict(id=101, service='considered_active', network_id=2,
backend_key="derp", device_id="x")
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4, address_type="shared")
port_model = models.Port()
port_model2 = models.Port()
port_model.update(port)
port_model2.update(port2)
ip_model = models.IPAddress()
ip_model.update(ip)
ip_model.ports = [port_model, port_model2]
mock_dbapi.port_find.return_value = port_model
mock_dbapi.ip_address_find.return_value = ip_model
mock_ipam.allocate_ip_address.side_effect = (
self._alloc_stub(ip_model))
qe = quark_exceptions
with self.assertRaises(qe.PortRequiresDisassociation):
self.plugin.delete_ip_address(self.context, 1)
def test_bad_request_deleting_single_active_shared_ip(self, mock_dbapi,
mock_ipam, *args):
port = dict(id=100, service='none', network_id=2,
backend_key="derp", device_id="y")
port2 = dict(id=101, service='considered_active', network_id=2,
backend_key="derp", device_id="x")
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4, address_type="shared")
port_model = models.Port()
port_model2 = models.Port()
port_model.update(port)
port_model2.update(port2)
ip_model = models.IPAddress()
ip_model.update(ip)
ip_model.ports = [port_model, port_model2]
mock_dbapi.port_find.return_value = port_model
mock_dbapi.ip_address_find.return_value = ip_model
mock_ipam.allocate_ip_address.side_effect = (
self._alloc_stub(ip_model))
qe = quark_exceptions
with self.assertRaises(qe.PortRequiresDisassociation):
self.plugin.delete_ip_address(self.context, 1)
def test_update_port_service_inactive_ip(self, mock_dbapi, mock_ipam,
*args):
port = dict(id=100, service='none', network_id=2,
port = dict(id=100, network_id=2,
backend_key="derp", device_id="y")
port2 = dict(id=101, service='none', network_id=2,
port2 = dict(id=101, network_id=2,
backend_key="derp", device_id="x")
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4, address_type="shared")
@@ -916,7 +831,8 @@ class TestQuarkGetIpAddressPort(test_quark_plugin.TestQuarkPlugin):
ip_model.update(ip)
ip_model.ports = [port_model, port_model2]
mock_port_update = patch('quark.plugin_modules.ports.update_port')
ip_mod = 'quark.db.models.IPAddress'
mock_port_update = patch('%s.set_service_for_port' % ip_mod)
self.addCleanup(mock_port_update.stop)
mock_port_update = mock_port_update.start()
@@ -934,7 +850,7 @@ class TestQuarkGetIpAddressPort(test_quark_plugin.TestQuarkPlugin):
def test_update_shared_ip_deactivate(self, mock_dbapi, mock_ipam, *args):
port = dict(id=100, service='compute', network_id=2,
backend_key="derp", device_id="y")
port2 = dict(id=101, service='none', network_id=2,
port2 = dict(id=101, network_id=2,
backend_key="derp", device_id="x")
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4, address_type="shared")
@@ -946,7 +862,8 @@ class TestQuarkGetIpAddressPort(test_quark_plugin.TestQuarkPlugin):
ip_model.update(ip)
ip_model.ports = [port_model, port_model2]
mock_port_update = patch('quark.plugin_modules.ports.update_port')
ip_mod = 'quark.db.models.IPAddress'
mock_port_update = patch('%s.set_service_for_port' % ip_mod)
self.addCleanup(mock_port_update.stop)
mock_port_update = mock_port_update.start()

View File

@@ -107,28 +107,6 @@ class TestQuarkGetPorts(test_quark_plugin.TestQuarkPlugin):
self.assertEqual(fixed_ips[0]["ip_address"],
ip["address_readable"])
def test_port_show(self):
ip = dict(id=1, address=netaddr.IPAddress("192.168.1.100").value,
address_readable="192.168.1.100", subnet_id=1, network_id=2,
version=4)
port = dict(mac_address="AA:BB:CC:DD:EE:FF", network_id=1,
tenant_id=self.context.tenant_id, device_id=2)
expected = {'status': "ACTIVE",
'device_owner': None,
'mac_address': 'AA:BB:CC:DD:EE:FF',
'network_id': 1,
'tenant_id': self.context.tenant_id,
'admin_state_up': None,
'device_id': 2}
with self._stubs(ports=port, addrs=[ip]):
result = self.plugin.get_port(self.context, 1)
fixed_ips = result.pop("fixed_ips")
for key in expected.keys():
self.assertEqual(result[key], expected[key])
self.assertEqual(fixed_ips[0]["subnet_id"], ip["subnet_id"])
self.assertEqual(fixed_ips[0]["ip_address"],
ip["address_readable"])
def test_port_show_with_int_mac(self):
port = dict(mac_address=int('AABBCCDDEEFF', 16), network_id=1,
tenant_id=self.context.tenant_id, device_id=2)
@@ -194,165 +172,6 @@ class TestQuarkGetPortsByIPAddress(test_quark_plugin.TestQuarkPlugin):
fields=None)
class TestQuarkCreatePort(test_quark_plugin.TestQuarkPlugin):
@contextlib.contextmanager
def _stubs(self, port=None, ip_addresses=None):
ip_models = [models.IPAddress(**ip) for ip in ip_addresses]
mac = {"address": "AA:BB:CC:DD:EE:FF"}
network = {"network_plugin": "BASE",
"ipam_strategy": "ANY",
"id": 1,
"tenant_id": self.context.tenant_id}
del(port['fixed_ips'])
port['ip_addresses'] = ip_models
port_model = models.Port(**port)
with contextlib.nested(
mock.patch("quark.db.api.network_find"),
mock.patch("quark.db.api.port_count_all"),
mock.patch("quark.db.api.port_create"),
mock.patch("quark.db.api.port_find"),
mock.patch("quark.ipam.QuarkIpam.allocate_ip_address"),
mock.patch("quark.ipam.QuarkIpam.allocate_mac_address")
) as (network_find, port_count, port_create, port_find, allocate_ip,
allocate_mac):
def allocate_ip_effect(context, addresses, *args, **kwargs):
for ip_model in ip_models:
ip_model.enabled_for_port = lambda x: True
addresses.append(ip_models)
network_find.return_value = network
port_count.return_value = 0
port_create.return_value = port_model
port_find.return_value = None
allocate_ip.side_effect = allocate_ip_effect
allocate_mac.return_value = mac
yield port_create
def test_create_port_with_2_fixed_ipv4_ips(self):
ip1 = {"address": netaddr.IPAddress("1.0.0.2").ipv6().value,
"address_readable": "1.0.0.2",
"network_id": 1,
"subnet_id": "subnet1",
"version": 4}
ip2 = {"address": netaddr.IPAddress("2.0.0.2").ipv6().value,
"address_readable": "2.0.0.2",
"network_id": 1,
"subnet_id": "subnet2",
"version": 4}
fixed_ips = [{"ip_address": ip1['address_readable'],
"subnet_id": ip1['subnet_id'],
"enabled": True},
{"ip_address": ip2['address_readable'],
"subnet_id": ip2['subnet_id'],
"enabled": True}]
port = {"port":
{'fixed_ips': fixed_ips,
'id': "11111111-2222-3333-4444-555555555555",
'tenant_id': self.context.tenant_id,
'mac_address': "AA:BB:CC:DD:EE:FF",
'name': "fakeport",
'network_id': 1}}
expected = {'admin_state_up': None,
'device_id': None,
'device_owner': None,
'fixed_ips': fixed_ips,
'id': "11111111-2222-3333-4444-555555555555",
'mac_address': "AA:BB:CC:DD:EE:FF",
'name': "fakeport",
'network_id': 1,
'security_groups': [],
'status': "ACTIVE",
'tenant_id': self.context.tenant_id}
with self._stubs(port=port.get('port'), ip_addresses=(ip1, ip2)) as (
port_create):
result = self.plugin.create_port(self.context, port)
self.assertTrue(port_create.called)
self.assertEqual(result, expected)
def test_create_port_with_2_fixed_ipv6_ips(self):
ip1 = {"address": netaddr.IPAddress("feed::1").ipv6().value,
"address_readable": "feed::1",
"network_id": 1,
"subnet_id": "subnet1",
"version": 6}
ip2 = {"address": netaddr.IPAddress("feef::1").ipv6().value,
"address_readable": "feef::1",
"network_id": 1,
"subnet_id": "subnet2",
"version": 6}
fixed_ips = [{"ip_address": ip1['address_readable'],
"subnet_id": ip1['subnet_id'],
"enabled": True},
{"ip_address": ip2['address_readable'],
"subnet_id": ip2['subnet_id'],
"enabled": True}]
port = {"port":
{'fixed_ips': fixed_ips,
'id': "11111111-2222-3333-4444-555555555555",
'tenant_id': self.context.tenant_id,
'mac_address': "AA:BB:CC:DD:EE:FF",
'name': "fakeport",
'network_id': 1}}
expected = {'admin_state_up': None,
'device_id': None,
'device_owner': None,
'fixed_ips': fixed_ips,
'id': "11111111-2222-3333-4444-555555555555",
'mac_address': "AA:BB:CC:DD:EE:FF",
'name': "fakeport",
'network_id': 1,
'security_groups': [],
'status': "ACTIVE",
'tenant_id': self.context.tenant_id}
with self._stubs(port=port.get('port'), ip_addresses=(ip1, ip2)) as (
port_create):
result = self.plugin.create_port(self.context, port)
self.assertTrue(port_create.called)
self.assertEqual(result, expected)
def test_create_port_mixed_ipv4_and_ipv6_fixed_ips(self):
ip1 = {"address": netaddr.IPAddress("1.0.0.2").ipv6().value,
"address_readable": "1.0.0.2",
"network_id": 1,
"subnet_id": "subnet1",
"version": 4}
ip2 = {"address": netaddr.IPAddress("feed::1").ipv6().value,
"address_readable": "feed::1",
"network_id": 1,
"subnet_id": "subnet2",
"version": 6}
fixed_ips = [{"ip_address": ip1['address_readable'],
"subnet_id": ip1['subnet_id'],
"enabled": True},
{"ip_address": ip2['address_readable'],
"subnet_id": ip2['subnet_id'],
"enabled": True}]
port = {"port":
{'fixed_ips': fixed_ips,
'id': "11111111-2222-3333-4444-555555555555",
'tenant_id': self.context.tenant_id,
'mac_address': "AA:BB:CC:DD:EE:FF",
'name': "fakeport",
'network_id': 1}}
expected = {'admin_state_up': None,
'device_id': None,
'device_owner': None,
'fixed_ips': fixed_ips,
'id': "11111111-2222-3333-4444-555555555555",
'mac_address': "AA:BB:CC:DD:EE:FF",
'name': "fakeport",
'network_id': 1,
'security_groups': [],
'status': "ACTIVE",
'tenant_id': self.context.tenant_id}
with self._stubs(port=port.get('port'), ip_addresses=(ip1, ip2)) as (
port_create):
result = self.plugin.create_port(self.context, port)
self.assertTrue(port_create.called)
self.assertEqual(result, expected)
class TestQuarkCreatePortFailure(test_quark_plugin.TestQuarkPlugin):
@contextlib.contextmanager
def _stubs(self, port=None, network=None, addr=None, mac=None):
@@ -612,35 +431,6 @@ class TestQuarkCreatePortsSameDevBadRequest(test_quark_plugin.TestQuarkPlugin):
for key in expected.keys():
self.assertEqual(result[key], expected[key])
def test_create_port_fixed_ip(self):
network = dict(id='1', tenant_id=self.context.tenant_id)
mac = dict(address="AA:BB:CC:DD:EE:FF")
subnet = dict(id=1, network_id=network["id"])
ip = mock.MagicMock()
ip.get = lambda x, *y: 1 if x == "subnet_id" else None
ip.formatted = lambda: "192.168.10.45"
ip.enabled_for_port = lambda x: True
fixed_ips = [dict(subnet_id=1, enabled=True,
ip_address="192.168.10.45")]
port = dict(port=dict(mac_address=mac["address"], network_id='1',
tenant_id=self.context.tenant_id, device_id=2,
fixed_ips=fixed_ips, ip_addresses=[ip]))
expected = {'status': "ACTIVE",
'device_owner': None,
'mac_address': mac["address"],
'network_id': network["id"],
'tenant_id': self.context.tenant_id,
'admin_state_up': None,
'fixed_ips': fixed_ips,
'device_id': 2}
with self._stubs(port=port["port"], network=network, addr=ip,
mac=mac, subnet=subnet) as port_create:
result = self.plugin.create_port(self.context, port)
self.assertTrue(port_create.called)
for key in expected.keys():
self.assertEqual(result[key], expected[key])
@mock.patch("quark.network_strategy.JSONStrategy.is_parent_network")
def test_create_providernet_port_fixed_ip_not_authorized(self, is_parent):
is_parent.return_value = True
@@ -925,48 +715,6 @@ class TestQuarkUpdatePort(test_quark_plugin.TestQuarkPlugin):
self.plugin.update_port(self.context, 1, new_port)
self.assertEqual(alloc_ip.call_count, 1)
def test_update_port_multiple_fixed_ips(self):
ip1 = netaddr.IPAddress("1.1.1.1")
ip2 = netaddr.IPAddress("1.1.1.2")
with self._stubs(
port=dict(id=1, name="myport", mac_address="0:0:0:0:0:1")
) as (port_find, port_update, alloc_ip, dealloc_ip):
class AllocIPMock(object):
def __init__(mock_self):
mock_self.called_once = False
def __call__(mock_self, ctxt, addrs, *args, **kwargs):
# RM10187 fix - assert that the allocate is called with an
# empty list each time. Otherwise any call after the first
# could pass because the requested IP was created but not
# allocated, or fail because the IP hadn't been generated
# but the IP generated on the previous call satisfied the
# IPAM strategy.
self.assertEqual(addrs, [])
if not mock_self.called_once:
addrs.append(models.IPAddress(
address=ip1.value, address_readable=str(ip1)))
mock_self.called_once = True
else:
addrs.append(models.IPAddress(
address=ip2.value, address_readable=str(ip2)))
alloc_ip.side_effect = AllocIPMock()
new_port = dict(port=dict(
fixed_ips=[dict(subnet_id=1,
ip_address=str(ip1)),
dict(subnet_id=1,
ip_address=str(ip2))]))
port_res = self.plugin.update_port(self.context, 1, new_port)
self.assertEqual(alloc_ip.call_count, 2)
self.assertEqual(port_res["fixed_ips"][0]["ip_address"],
str(ip1.ipv6()))
self.assertEqual(port_res["fixed_ips"][1]["ip_address"],
str(ip2.ipv6()))
def test_update_port_goes_over_quota(self):
fixed_ips = {"fixed_ips": [{"subnet_id": 1},
{"subnet_id": 1},

View File

@@ -208,10 +208,9 @@ class TestDBAPI(BaseFunctionalTest):
r = db_api.port_associate_ip(self.context, mock_ports, mock_address)
self.assertEqual(len(r.associations), len(mock_ports))
for assoc, port in zip(r.associations, mock_ports):
self.assertEqual(assoc.port_id, port.id)
self.assertEqual(assoc.ip_address_id, mock_address.id)
self.assertEqual(assoc.port, port)
self.assertEqual(assoc.ip_address, mock_address)
self.assertEqual(assoc.enabled, False)
self.context.session.add.assert_called_once_with(r)
def test_port_associate_ip_enable_port(self):
self.context.session.add = mock.Mock()
@@ -224,10 +223,10 @@ class TestDBAPI(BaseFunctionalTest):
enable_port="1")
self.assertEqual(len(r.associations), 1)
assoc = r.associations[0]
self.assertEqual(assoc.port_id, mock_port.id)
self.assertEqual(assoc.ip_address_id, mock_address.id)
self.assertEqual(assoc.port, mock_port)
self.assertEqual(assoc.ip_address, mock_address)
self.assertEqual(assoc.enabled, True)
self.context.session.add.assert_called_once_with(r)
self.context.session.add.assert_called_once_with(assoc)
def test_port_disassociate_ip(self):
self.context.session.add = mock.Mock()