Adds support for a new sub resources:

* /ip_addresses/<id>/ports
  * /ip_addresses/<id>/ports/<id>

While these new resources are essentially the same construct as /ports they
have their own view and thus produce different output.

Current version does not include the code for quotas and policy.

Big changes:

  * new resources
  * POST /ip_addresses with device_ids > 1 now makes IP shared (did not before)
This commit is contained in:
Justin Hammond
2015-05-24 17:24:14 -05:00
parent 002e1a21f2
commit 3da9c3e032
10 changed files with 568 additions and 17 deletions

View File

@@ -31,6 +31,13 @@ attr_dict[RESOURCE_NAME] = {'allow_post': True,
'allow_put': True,
'is_visible': True}
SUB_RESOURCE_ATTRIBUTE_MAP = {
'ports': {
'parent': {'collection_name': 'ip_addresses',
'member_name': 'ip_address'}
}
}
LOG = logging.getLogger(__name__)
@@ -62,6 +69,8 @@ class IpAddressesController(wsgi.Controller):
raise webob.exc.HTTPNotFound()
except exceptions.Conflict:
raise webob.exc.HTTPConflict()
except exceptions.BadRequest:
raise webob.exc.HTTPBadRequest()
def update(self, request, id, body=None):
body = self._deserialize(request.body, request.get_content_type())
@@ -71,6 +80,67 @@ class IpAddressesController(wsgi.Controller):
except exceptions.NotFound:
raise webob.exc.HTTPNotFound()
def delete(self, request, id):
context = request.context
try:
return self._plugin.delete_ip_address(context, id)
except exceptions.NotFound:
raise webob.exc.HTTPNotFound()
except exceptions.BadRequest:
raise webob.exc.HTTPBadRequest()
class IpAddressPortController(wsgi.Controller):
def __init__(self, plugin):
self._resource_name = RESOURCE_NAME
self._plugin = plugin
def _clean_query_string(self, request, filters):
clean_list = ['id', 'device_id', 'service']
for clean in clean_list:
if clean in request.GET:
filters[clean] = request.GET[clean]
del request.GET[clean]
def index(self, ip_address_id, request):
context = request.context
filters = {}
self._clean_query_string(request, filters)
fx = self._plugin.get_ports_for_ip_address
try:
ports = fx(context, ip_address_id, filters=filters, **request.GET)
return {"ports": ports}
except exceptions.NotFound:
raise webob.exc.HTTPNotFound()
def create(self, request, **kwargs):
raise webob.exc.HTTPNotImplemented()
def show(self, ip_address_id, request, id):
context = request.context
# TODO(jlh): need to ensure ip_address_id is used to filter port
try:
return {"port":
self._plugin.get_port_for_ip_address(context,
ip_address_id, id)}
except exceptions.NotFound:
raise webob.exc.HTTPNotFound()
def update(self, ip_address_id, request, id, body=None):
body = self._deserialize(request.body, request.get_content_type())
try:
return {"port": self._plugin.update_port_for_ip(request.context,
ip_address_id,
id, body)}
except exceptions.NotFound:
raise webob.exc.HTTPNotFound()
except exceptions.BadRequest:
raise webob.exc.HTTPBadRequest()
def delete(self, request, id, **kwargs):
raise webob.exc.HTTPNotImplemented()
class Ip_addresses(object):
"""IP Addresses support."""
@@ -104,7 +174,16 @@ class Ip_addresses(object):
@classmethod
def get_resources(cls):
"""Returns Ext Resources."""
controller = IpAddressesController(manager.NeutronManager.get_plugin())
return [extensions.ResourceExtension(
Ip_addresses.get_alias(),
controller)]
ip_controller = IpAddressesController(
manager.NeutronManager.get_plugin())
ip_port_controller = IpAddressPortController(
manager.NeutronManager.get_plugin())
resources = []
resources.append(extensions.ResourceExtension(
Ip_addresses.get_alias(),
ip_controller))
parent = {'collection_name': 'ip_addresses',
'member_name': 'ip_address'}
resources.append(extensions.ResourceExtension(
'ports', ip_port_controller, parent=parent))
return resources

View File

@@ -85,7 +85,7 @@ def _model_attrs(model):
def _model_query(context, model, filters, fields=None):
filters = filters or {}
model_filters = []
eq_filters = ["address", "cidr", "deallocated", "ip_version",
eq_filters = ["address", "cidr", "deallocated", "ip_version", "service",
"mac_address_range_id", "transaction_id"]
in_filters = ["device_id", "device_owner", "group_id", "id", "mac_address",
"name", "network_id", "segment_id", "subnet_id",
@@ -295,6 +295,10 @@ def ip_address_create(context, **address_dict):
return ip_address
def ip_address_delete(context, addr):
context.session.delete(addr)
@scoped
def ip_address_find(context, lock_mode=False, **filters):
query = context.session.query(models.IPAddress)

View File

@@ -0,0 +1,24 @@
"""Added service column to port table
Revision ID: 4dbf83f37bc0
Revises: 33e9e23ba761
Create Date: 2015-05-26 13:27:38.995202
"""
# revision identifiers, used by Alembic.
revision = '4dbf83f37bc0'
down_revision = '1bdc1b574beb'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.add_column('quark_ports', sa.Column('service', sa.String(length=255),
nullable=False,
server_default="none"))
def downgrade():
op.drop_column('quark_ports', 'service')

View File

@@ -1 +1 @@
1bdc1b574beb
4dbf83f37bc0

View File

@@ -403,6 +403,7 @@ class Port(BASEV2, models.HasTenant, models.HasId):
device_owner = sa.Column(sa.String(255))
bridge = sa.Column(sa.String(255))
associations = orm.relationship(PortIpAssociation, backref="port")
service = sa.Column(sa.String(255), default="none")
@declarative.declared_attr
def ip_addresses(cls):

View File

@@ -33,10 +33,14 @@ class AmbigiousLswitchCount(exceptions.NeutronException):
message = _("Too many lswitches for network %(net_id)s.")
class IpAddressNotFound(exceptions.NeutronException):
class IpAddressNotFound(exceptions.NotFound):
message = _("IP Address %(addr_id)s not found.")
class PortRequiresDisassociation(exceptions.BadRequest):
message = _("Port requires disassociation before IP can be deleted")
class RouteConflict(exceptions.NeutronException):
message = _("Route overlaps existing route %(route_id)s with %(cidr)s")

View File

@@ -243,6 +243,10 @@ class Plugin(neutron_plugin_base_v2.NeutronPluginBaseV2,
def update_ip_address(self, context, id, ip_address):
return ip_addresses.update_ip_address(context, id, ip_address)
@sessioned
def delete_ip_address(self, context, id):
return ip_addresses.delete_ip_address(context, id)
@sessioned
def create_port(self, context, port):
self._fix_missing_tenant_id(context, port["port"])
@@ -256,12 +260,29 @@ class Plugin(neutron_plugin_base_v2.NeutronPluginBaseV2,
def update_port(self, context, id, port):
return ports.update_port(context, id, port)
@sessioned
def update_port_for_ip(self, context, ip_id, id, port):
return ip_addresses.update_port_for_ip_address(context, ip_id, id,
port)
@sessioned
def get_ports(self, context, limit=None, page_reverse=False, sorts=None,
marker=None, filters=None, fields=None):
return ports.get_ports(context, limit, sorts, marker, page_reverse,
filters, fields)
@sessioned
def get_ports_for_ip_address(self, context, ip, limit=None,
page_reverse=False, sorts=None, marker=None,
filters=None, fields=None):
return ip_addresses.get_ports_for_ip_address(context, ip, limit, sorts,
marker, page_reverse,
filters, fields)
@sessioned
def get_port_for_ip_address(self, context, ip_id, id, fields=None):
return ip_addresses.get_port_for_ip_address(context, ip_id, id, fields)
@sessioned
def get_ports_count(self, context, filters=None):
return ports.get_ports_count(context, filters)

View File

@@ -23,6 +23,7 @@ from quark.db import api as db_api
from quark.db import ip_types
from quark import exceptions as quark_exceptions
from quark import ipam
from quark.plugin_modules import ports as port_module
from quark import plugin_views as v
CONF = cfg.CONF
@@ -75,7 +76,15 @@ def validate_port_ip_quotas(context, ports):
def _shared_ip_request(ip_address):
port_ids = ip_address.get('ip_address', {}).get('port_ids', [])
return len(port_ids) > 1
device_ids = ip_address.get('ip_address', {}).get('device_ids', [])
return len(port_ids) > 1 or len(device_ids) > 1
def _shared_ip_and_active(iptype, ports, except_port=None):
if(iptype == ip_types.SHARED and any(p.service != 'none' and
p.id != except_port for p in ports)):
return True
return False
def _can_be_shared(address_model):
@@ -85,8 +94,8 @@ def _can_be_shared(address_model):
def create_ip_address(context, body):
LOG.info("create_ip_address for tenant %s" % context.tenant_id)
address_type = (ip_types.SHARED if _shared_ip_request(body)
else ip_types.FIXED)
iptype = (ip_types.SHARED if _shared_ip_request(body)
else ip_types.FIXED)
ip_dict = body.get("ip_address")
port_ids = ip_dict.get('port_ids', [])
network_id = ip_dict.get('network_id')
@@ -139,7 +148,11 @@ def create_ip_address(context, body):
version=ip_version,
ip_addresses=[ip_address]
if ip_address else [],
address_type=address_type)
address_type=iptype)
# Ensure that there are no ports whose service is set to something other
# than 'none' because nova will not be able to know it was disassociated
if _shared_ip_and_active(iptype, ports):
raise quark_exceptions.PortRequiresDisassociation()
with context.session.begin():
new_address = db_api.port_associate_ip(context, ports,
new_addresses[0])
@@ -161,8 +174,7 @@ def _raise_if_shared_and_enabled(address_request, address_model):
def update_ip_address(context, id, ip_address):
LOG.info("update_ip_address %s for tenant %s" %
(id, context.tenant_id))
LOG.info("update_ip_address %s for tenant %s" % (id, context.tenant_id))
ports = []
with context.session.begin():
address = db_api.ip_address_find(
@@ -208,3 +220,118 @@ def update_ip_address(context, id, ip_address):
context, address)
return v._make_ip_dict(address)
return v._make_ip_dict(new_address)
def delete_ip_address(context, id):
"""Delete an ip address.
: param context: neutron api request context
: param id: UUID representing the ip address to delete.
"""
LOG.info("delete_ip_address %s for tenant %s" % (id, context.tenant_id))
with context.session.begin():
ip_address = db_api.ip_address_find(
context, id=id, tenant_id=context.tenant_id, scope=db_api.ONE)
if not ip_address or ip_address.deallocated:
raise quark_exceptions.IpAddressNotFound(addr_id=id)
if _shared_ip_and_active(ip_address.address_type, ip_address.ports):
raise quark_exceptions.PortRequiresDisassociation()
db_api.update_port_associations_for_ip(context, [], ip_address)
ipam_driver.deallocate_ip_address(context, ip_address)
def get_ports_for_ip_address(context, ip_id, limit=None, sorts=None,
marker=None, page_reverse=False, filters=None,
fields=None):
"""Retrieve a list of ports.
The contents of the list depends on the identity of the user
making the request (as indicated by the context) as well as any
filters.
: param context: neutron api request context
: param filters: a dictionary with keys that are valid keys for
a port as listed in the RESOURCE_ATTRIBUTE_MAP object
in neutron/api/v2/attributes.py. Values in this dictionary
are an iterable containing values that will be used for an exact
match comparison for that value. Each result returned by this
function will have matched one of the values for each key in
filters.
: param fields: a list of strings that are valid keys in a
port dictionary as listed in the RESOURCE_ATTRIBUTE_MAP
object in neutron/api/v2/attributes.py. Only these fields
will be returned.
"""
LOG.info("get_ports for tenant %s filters %s fields %s" %
(context.tenant_id, filters, fields))
addr = db_api.ip_address_find(context, id=ip_id, scope=db_api.ONE)
if not addr:
raise quark_exceptions.IpAddressNotFound(addr_id=ip_id)
if filters is None:
filters = {}
filters['ip_address'] = ip_id
ports = db_api.port_find(context, limit, sorts, marker,
fields=fields, join_security_groups=True,
**filters)
return v._make_ip_ports_list(ports, fields)
def get_port_for_ip_address(context, ip_id, id, fields=None):
"""Retrieve a port.
: param context: neutron api request context
: param id: UUID representing the port to fetch.
: param fields: a list of strings that are valid keys in a
port dictionary as listed in the RESOURCE_ATTRIBUTE_MAP
object in neutron/api/v2/attributes.py. Only these fields
will be returned.
"""
LOG.info("get_port %s for tenant %s fields %s" %
(id, context.tenant_id, fields))
addr = db_api.ip_address_find(context, id=ip_id, scope=db_api.ONE)
if not addr:
raise quark_exceptions.IpAddressNotFound(addr_id=ip_id)
filters = {'ip_address': ip_id}
results = db_api.port_find(context, id=id, fields=fields,
scope=db_api.ONE, **filters)
if not results:
raise exceptions.PortNotFound(port_id=id, net_id='')
return v._make_port_for_ip_dict(results)
def update_port_for_ip_address(context, ip_id, id, port):
"""Update values of a port.
: param context: neutron api request context
: param ip_id: UUID representing the ip associated with port to update
: param id: UUID representing the port to update.
: param port: dictionary with keys indicating fields to update.
valid keys are those that have a value of True for 'allow_put'
as listed in the RESOURCE_ATTRIBUTE_MAP object in
neutron/api/v2/attributes.py.
"""
LOG.info("update_port %s for tenant %s" % (id, context.tenant_id))
addr = db_api.ip_address_find(context, id=ip_id, scope=db_api.ONE)
sanitize_list = ['service']
port_dict = {k: port['port'][k] for k in sanitize_list}
if not addr:
raise quark_exceptions.IpAddressNotFound(addr_id=ip_id)
ports = addr.ports
iptype = addr.address_type
if _shared_ip_and_active(iptype, ports, except_port=id):
raise quark_exceptions.PortRequiresDisassociation()
new_port = {"port": port_dict}
port_ret = port_module.update_port(context, id, new_port)
return v._make_port_for_ip_dict(port_ret)

View File

@@ -41,7 +41,11 @@ quark_view_opts = [
cfg.BoolOpt('show_subnet_ip_policy_id',
default=True,
help=_('Controls whether or not to show ip_policy_id for'
'subnets'))
'subnets')),
cfg.BoolOpt('show_port_service',
default=False,
help=_('Controls whether or not to show service for'
'ports'))
]
CONF.register_opts(quark_view_opts, "QUARK")
@@ -151,6 +155,13 @@ def _make_security_group_rule_dict(security_rule, fields=None):
return res
def _ip_port_dict(port, fields=None):
res = {"id": port.get("id"),
"device_id": port.get("device_id"),
"service": port.get("service")}
return res
def _port_dict(port, fields=None):
res = {"id": port.get("id"),
"name": port.get("name"),
@@ -164,6 +175,9 @@ def _port_dict(port, fields=None):
"device_id": port.get("device_id"),
"device_owner": port.get("device_owner")}
if CONF.QUARK.show_port_service:
res['service'] = port.get("service")
if "mac_address" in res and res["mac_address"]:
mac = str(netaddr.EUI(res["mac_address"])).replace('-', ':')
res["mac_address"] = mac
@@ -185,6 +199,11 @@ def _make_port_address_dict(ip, port, fields=None):
return ip_addr
def _make_port_for_ip_dict(port, fields=None):
res = _ip_port_dict(port)
return res
def _make_port_dict(port, fields=None):
res = _port_dict(port)
res["fixed_ips"] = [_make_port_address_dict(ip, port, fields)
@@ -194,6 +213,14 @@ def _make_port_dict(port, fields=None):
return res
def _make_ip_ports_list(query, fields=None):
ports = []
for port in query:
port_dict = _ip_port_dict(port, fields)
ports.append(port_dict)
return ports
def _make_ports_list(query, fields=None):
ports = []
for port in query:

View File

@@ -221,8 +221,8 @@ class TestQuarkSharedIPAddressCreate(test_quark_plugin.TestQuarkPlugin):
def test_create_ip_address_address_type_shared(self, mock_dbapi, mock_ipam,
*args):
cfg.CONF.set_override('ipam_reuse_after', 100, "QUARK")
ports = [dict(id=1, network_id=2, ip_addresses=[]),
dict(id=2, network_id=2, ip_addresses=[])]
ports = [dict(id=1, network_id=2, ip_addresses=[], service="none"),
dict(id=2, network_id=2, ip_addresses=[], service="none")]
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4, used_by_tenant_id=1)
port_models = [models.Port(**p) for p in ports]
@@ -277,6 +277,38 @@ class TestQuarkSharedIPAddressCreate(test_quark_plugin.TestQuarkPlugin):
version=ip_address['version'], ip_addresses=[],
address_type="fixed")
def test_fail_to_make_shared_with_active_port(self, mock_dbapi, mock_ipam,
*args):
cfg.CONF.set_override('ipam_reuse_after', 100, "QUARK")
ports = [dict(id=1, network_id=2, ip_addresses=[], service="active"),
dict(id=2, network_id=2, ip_addresses=[], service="none")]
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4, used_by_tenant_id=1)
port_models = [models.Port(**p) for p in ports]
ip_model = models.IPAddress()
ip_model.update(ip)
mock_dbapi.port_find.side_effect = port_models
mock_ipam.allocate_ip_address.side_effect = (
self._alloc_stub(ip_model))
ip_address = {"network_id": ip["network_id"],
"version": 4, 'device_ids': [2],
"port_ids": [pm.id for pm in port_models]}
qe = quark_exceptions
with self.assertRaises(qe.PortRequiresDisassociation):
self.plugin.create_ip_address(self.context,
dict(ip_address=ip_address))
# NOTE(thomasem): Having to assert that [ip_model] was passed instead
# of an empty list due to the expected behavior of this method being
# that it mutates the passed in list. So, after it's run, the list
# has already been mutated and it's a reference to that list that
# we're checking. This method ought to be changed to return the new
# IP and let the caller mutate the list, not the other way around.
mock_ipam.allocate_ip_address.assert_called_once_with(
self.context, [ip_model], ip['network_id'], None, 100,
version=ip_address['version'], ip_addresses=[],
address_type="shared")
class TestQuarkSharedIPAddressPortsValid(test_quark_plugin.TestQuarkPlugin):
def test_validate_ports_on_network_raise_segment(self):
@@ -699,4 +731,236 @@ class TestQuarkGetIpAddresses(test_quark_plugin.TestQuarkPlugin):
self.assertEqual(ips[i]["id"], addr["id"])
self.assertEqual(ips[i]["subnet_id"], addr["subnet_id"])
self.assertEqual(ips[i]["address_readable"], addr["address"])
self.assertEqual(addr["port_ids"][0], port["id"])
@mock.patch("quark.plugin_modules.ip_addresses"
".validate_ports_on_network_and_same_segment")
@mock.patch("quark.plugin_modules.ip_addresses.ipam_driver")
@mock.patch("quark.plugin_modules.ip_addresses.db_api")
class TestQuarkGetIpAddressPort(test_quark_plugin.TestQuarkPlugin):
def _alloc_stub(self, ip_model):
def _alloc_ip(context, addr, *args, **kwargs):
addr.append(ip_model)
return _alloc_ip
def test_get_ip_address_ports(self, mock_dbapi, mock_ipam, *args):
port = dict(mac_address="AA:BB:CC:DD:EE:FF", network_id=1,
tenant_id=self.context.tenant_id, device_id=2,
bridge="xenbr0", device_owner='network:dhcp',
service='none', id=100)
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4)
port_model = models.Port()
port_model.update(port)
ip_model = models.IPAddress()
ip_model.update(ip)
mock_dbapi.port_find.return_value = [port_model]
mock_ipam.allocate_ip_address.side_effect = (
self._alloc_stub(ip_model))
res = self.plugin.get_ports_for_ip_address(self.context, 1)[0]
self.assertEqual(port["id"], res["id"])
self.assertEqual(port["service"], res["service"])
self.assertEqual(port["device_id"], res["device_id"])
self.assertFalse('mac_address' in res)
self.assertFalse('network_id' in res)
self.assertFalse('bridge' in res)
self.assertFalse('tenant_id' in res)
def test_get_ip_address_port(self, mock_dbapi, mock_ipam, *args):
port = dict(mac_address="AA:BB:CC:DD:EE:FF", network_id=1,
tenant_id=self.context.tenant_id, device_id=2,
bridge="xenbr0", device_owner='network:dhcp',
service='none', id=100)
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4)
port_model = models.Port()
port_model.update(port)
ip_model = models.IPAddress()
ip_model.update(ip)
mock_dbapi.port_find.return_value = port_model
mock_ipam.allocate_ip_address.side_effect = (
self._alloc_stub(ip_model))
res = self.plugin.get_port_for_ip_address(self.context, 1, 100)
self.assertEqual(port["id"], res["id"])
self.assertEqual(port["service"], res["service"])
self.assertEqual(port["device_id"], res["device_id"])
self.assertFalse('mac_address' in res)
self.assertFalse('network_id' in res)
self.assertFalse('bridge' in res)
self.assertFalse('tenant_id' in res)
def test_deleting_inactive_shared_ip(self, mock_dbapi, mock_ipam, *args):
port = dict(id=100, service='none', network_id=2,
backend_key="derp", device_id="y")
port2 = dict(id=101, service='none', network_id=2,
backend_key="derp", device_id="x")
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4, address_type="shared")
port_model = models.Port()
port_model2 = models.Port()
port_model.update(port)
port_model2.update(port2)
ip_model = models.IPAddress()
ip_model.update(ip)
ip_model.ports = [port_model, port_model2]
mock_dbapi.port_find.return_value = port_model
mock_dbapi.ip_address_find.return_value = ip_model
mock_ipam.allocate_ip_address.side_effect = (
self._alloc_stub(ip_model))
self.plugin.delete_ip_address(self.context, 1)
self.assertFalse(mock_dbapi.ip_address_delete.called)
self.assertTrue(mock_ipam.deallocate_ip_address.called)
def test_get_ip_address_no_ip_fails(self, mock_dbapi, mock_ipam, *args):
mock_dbapi.ip_address_find.return_value = []
with self.assertRaises(quark_exceptions.IpAddressNotFound):
self.plugin.get_port_for_ip_address(self.context, 123, 100)
def test_get_ip_address_no_port_fails(self, mock_dbapi, mock_ipam, *args):
mock_dbapi.port_find.return_value = []
with self.assertRaises(exceptions.PortNotFound):
self.plugin.get_port_for_ip_address(self.context, 123, 100)
def test_bad_request_when_deleting_active_shared_ip(self, mock_dbapi,
mock_ipam, *args):
port = dict(id=100, service='considered_active', network_id=2,
backend_key="derp", device_id="y")
port2 = dict(id=101, service='considered_active', network_id=2,
backend_key="derp", device_id="x")
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4, address_type="shared")
port_model = models.Port()
port_model2 = models.Port()
port_model.update(port)
port_model2.update(port2)
ip_model = models.IPAddress()
ip_model.update(ip)
ip_model.ports = [port_model, port_model2]
mock_dbapi.port_find.return_value = port_model
mock_dbapi.ip_address_find.return_value = ip_model
mock_ipam.allocate_ip_address.side_effect = (
self._alloc_stub(ip_model))
qe = quark_exceptions
with self.assertRaises(qe.PortRequiresDisassociation):
self.plugin.delete_ip_address(self.context, 1)
def test_bad_request_deleting_single_active_shared_ip(self, mock_dbapi,
mock_ipam, *args):
port = dict(id=100, service='none', network_id=2,
backend_key="derp", device_id="y")
port2 = dict(id=101, service='considered_active', network_id=2,
backend_key="derp", device_id="x")
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4, address_type="shared")
port_model = models.Port()
port_model2 = models.Port()
port_model.update(port)
port_model2.update(port2)
ip_model = models.IPAddress()
ip_model.update(ip)
ip_model.ports = [port_model, port_model2]
mock_dbapi.port_find.return_value = port_model
mock_dbapi.ip_address_find.return_value = ip_model
mock_ipam.allocate_ip_address.side_effect = (
self._alloc_stub(ip_model))
qe = quark_exceptions
with self.assertRaises(qe.PortRequiresDisassociation):
self.plugin.delete_ip_address(self.context, 1)
def test_update_port_service_inactive_ip(self, mock_dbapi, mock_ipam,
*args):
port = dict(id=100, service='none', network_id=2,
backend_key="derp", device_id="y")
port2 = dict(id=101, service='none', network_id=2,
backend_key="derp", device_id="x")
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4, address_type="shared")
port_model = models.Port()
port_model2 = models.Port()
port_model.update(port)
port_model2.update(port2)
ip_model = models.IPAddress()
ip_model.update(ip)
ip_model.ports = [port_model, port_model2]
mock_port_update = patch('quark.plugin_modules.ports.update_port')
self.addCleanup(mock_port_update.stop)
mock_port_update = mock_port_update.start()
mock_dbapi.port_find.return_value = port_model
mock_dbapi.ip_address_find.return_value = ip_model
mock_ipam.allocate_ip_address.side_effect = (
self._alloc_stub(ip_model))
port_update = dict(service='derp')
port_update = {'port': port_update}
self.plugin.update_port_for_ip(self.context, 1, 100, port_update)
self.assertTrue(mock_port_update.called)
self.assertTrue(mock_port_update.called_once_with(
self.context, 100, port_update))
def test_fail_to_update_service_with_active_shared_ip(self, mock_dbapi,
mock_ipam, *args):
port = dict(id=100, service='none', network_id=2,
backend_key="derp", device_id="y")
port2 = dict(id=101, service='compute', network_id=2,
backend_key="derp", device_id="x")
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4, address_type="shared")
port_model = models.Port()
port_model2 = models.Port()
port_model.update(port)
port_model2.update(port2)
ip_model = models.IPAddress()
ip_model.update(ip)
ip_model.ports = [port_model, port_model2]
mock_port_update = patch('quark.plugin_modules.ports.update_port')
self.addCleanup(mock_port_update.stop)
mock_port_update = mock_port_update.start()
mock_dbapi.port_find.return_value = port_model
mock_dbapi.ip_address_find.return_value = ip_model
mock_ipam.allocate_ip_address.side_effect = (
self._alloc_stub(ip_model))
port_update = dict(service='derp')
port_update = {'port': port_update}
qe = quark_exceptions
with self.assertRaises(qe.PortRequiresDisassociation):
self.plugin.update_port_for_ip(self.context, 1, 100, port_update)
def test_update_shared_ip_deactivate(self, mock_dbapi, mock_ipam, *args):
port = dict(id=100, service='compute', network_id=2,
backend_key="derp", device_id="y")
port2 = dict(id=101, service='none', network_id=2,
backend_key="derp", device_id="x")
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4, address_type="shared")
port_model = models.Port()
port_model2 = models.Port()
port_model.update(port)
port_model2.update(port2)
ip_model = models.IPAddress()
ip_model.update(ip)
ip_model.ports = [port_model, port_model2]
mock_port_update = patch('quark.plugin_modules.ports.update_port')
self.addCleanup(mock_port_update.stop)
mock_port_update = mock_port_update.start()
mock_dbapi.port_find.return_value = port_model
mock_dbapi.ip_address_find.return_value = ip_model
mock_ipam.allocate_ip_address.side_effect = (
self._alloc_stub(ip_model))
port_update = dict(service='derp')
port_update = {'port': port_update}
self.plugin.update_port_for_ip(self.context, 1, 100, port_update)
self.assertTrue(mock_port_update.called)
self.assertTrue(mock_port_update.called_once_with(
self.context, 100, port_update))