Moves IP Address into their own module

Refactors the plugin to move the IP Address calls and associated tests
into separate submodules for better readability.
This commit is contained in:
Matt Dietz
2013-07-22 18:31:19 +00:00
parent c7fa791506
commit eeb3a884c1
3 changed files with 376 additions and 96 deletions

View File

@@ -41,6 +41,7 @@ from quark.db import api as db_api
from quark.db import models
from quark import exceptions as quark_exceptions
from quark import network_strategy
from quark.plugin_modules import ip_addresses
from quark.plugin_modules import ip_policies
from quark.plugin_modules import mac_address_ranges
from quark.plugin_modules import security_groups
@@ -847,102 +848,6 @@ class Plugin(neutron_plugin_base_v2.NeutronPluginBaseV2,
raise quark_exceptions.RouteNotFound(route_id=id)
db_api.route_delete(context, route)
def get_ip_addresses(self, context, **filters):
LOG.info("get_ip_addresses for tenant %s" % context.tenant_id)
filters["_deallocated"] = False
addrs = db_api.ip_address_find(context, scope=db_api.ALL, **filters)
return [v._make_ip_dict(ip) for ip in addrs]
def get_ip_address(self, context, id):
LOG.info("get_ip_address %s for tenant %s" %
(id, context.tenant_id))
addr = db_api.ip_address_find(context, id=id, scope=db_api.ONE)
if not addr:
raise quark_exceptions.IpAddressNotFound(addr_id=id)
return v._make_ip_dict(addr)
def create_ip_address(self, context, ip_address):
LOG.info("create_ip_address for tenant %s" % context.tenant_id)
port = None
ip_dict = ip_address["ip_address"]
port_ids = ip_dict.get('port_ids')
network_id = ip_dict.get('network_id')
device_ids = ip_dict.get('device_ids')
ip_version = ip_dict.get('version')
ip_address = ip_dict.get('ip_address')
ports = []
if device_ids and not network_id:
raise exceptions.BadRequest(
resource="ip_addresses",
msg="network_id is required if device_ids are supplied.")
if network_id and device_ids:
for device_id in device_ids:
port = db_api.port_find(
context, network_id=network_id, device_id=device_id,
tenant_id=context.tenant_id, scope=db_api.ONE)
ports.append(port)
elif port_ids:
for port_id in port_ids:
port = db_api.port_find(context, id=port_id,
tenant_id=context.tenant_id,
scope=db_api.ONE)
ports.append(port)
if not ports:
raise exceptions.PortNotFound(port_id=port_ids,
net_id=network_id)
address = self.ipam_driver.allocate_ip_address(
context,
port['network_id'],
port['id'],
self.ipam_reuse_after,
ip_version,
ip_address)
for port in ports:
port["ip_addresses"].append(address)
return v._make_ip_dict(address)
def update_ip_address(self, context, id, ip_address):
LOG.info("update_ip_address %s for tenant %s" %
(id, context.tenant_id))
address = db_api.ip_address_find(
context, id=id, tenant_id=context.tenant_id, scope=db_api.ONE)
if not address:
raise exceptions.NotFound(
message="No IP address found with id=%s" % id)
old_ports = address['ports']
port_ids = ip_address['ip_address'].get('port_ids')
if port_ids is None:
return v._make_ip_dict(address)
for port in old_ports:
port['ip_addresses'].remove(address)
if port_ids:
ports = db_api.port_find(
context, tenant_id=context.tenant_id, id=port_ids,
scope=db_api.ALL)
# NOTE: could be considered inefficient because we're converting
# to a list to check length. Maybe revisit
if len(ports) != len(port_ids):
raise exceptions.NotFound(
message="No ports not found with ids=%s" % port_ids)
for port in ports:
port['ip_addresses'].extend([address])
else:
address["deallocated"] = 1
return v._make_ip_dict(address)
def get_mac_address_range(self, context, id, fields=None):
return mac_address_ranges.get_mac_address_range(context, id, fields)
@@ -1003,3 +908,15 @@ class Plugin(neutron_plugin_base_v2.NeutronPluginBaseV2,
def delete_ip_policy(self, context, id):
return ip_policies.delete_ip_policy(context, id)
def get_ip_addresses(self, context, **filters):
return ip_addresses.get_ip_addresses(context, **filters)
def get_ip_address(self, context, id):
return ip_addresses.get_ip_address(context, id)
def create_ip_address(self, context, ip_address):
return ip_addresses.create_ip_address(context, ip_address)
def update_ip_address(self, context, id, ip_address):
return ip_addresses.update_ip_address(context, id, ip_address)

View File

@@ -0,0 +1,128 @@
# Copyright 2013 Openstack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.common import exceptions
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from oslo.config import cfg
from quark.db import api as db_api
from quark import exceptions as quark_exceptions
from quark import plugin_views as v
CONF = cfg.CONF
LOG = logging.getLogger("neutron.quark")
ipam_driver = (importutils.import_class(CONF.QUARK.ipam_driver))()
def get_ip_addresses(context, **filters):
LOG.info("get_ip_addresses for tenant %s" % context.tenant_id)
filters["_deallocated"] = False
addrs = db_api.ip_address_find(context, scope=db_api.ALL, **filters)
return [v._make_ip_dict(ip) for ip in addrs]
def get_ip_address(context, id):
LOG.info("get_ip_address %s for tenant %s" %
(id, context.tenant_id))
addr = db_api.ip_address_find(context, id=id, scope=db_api.ONE)
if not addr:
raise quark_exceptions.IpAddressNotFound(addr_id=id)
return v._make_ip_dict(addr)
def create_ip_address(context, ip_address):
LOG.info("create_ip_address for tenant %s" % context.tenant_id)
port = None
ip_dict = ip_address["ip_address"]
port_ids = ip_dict.get('port_ids')
network_id = ip_dict.get('network_id')
device_ids = ip_dict.get('device_ids')
ip_version = ip_dict.get('version')
ip_address = ip_dict.get('ip_address')
ports = []
if device_ids and not network_id:
raise exceptions.BadRequest(
resource="ip_addresses",
msg="network_id is required if device_ids are supplied.")
if network_id and device_ids:
for device_id in device_ids:
port = db_api.port_find(
context, network_id=network_id, device_id=device_id,
tenant_id=context.tenant_id, scope=db_api.ONE)
ports.append(port)
elif port_ids:
for port_id in port_ids:
port = db_api.port_find(context, id=port_id,
tenant_id=context.tenant_id,
scope=db_api.ONE)
ports.append(port)
if not ports:
raise exceptions.PortNotFound(port_id=port_ids,
net_id=network_id)
address = ipam_driver.allocate_ip_address(
context,
port['network_id'],
port['id'],
CONF.QUARK.ipam_reuse_after,
ip_version,
ip_address)
for port in ports:
port["ip_addresses"].append(address)
return v._make_ip_dict(address)
def update_ip_address(context, id, ip_address):
LOG.info("update_ip_address %s for tenant %s" %
(id, context.tenant_id))
address = db_api.ip_address_find(
context, id=id, tenant_id=context.tenant_id, scope=db_api.ONE)
if not address:
raise exceptions.NotFound(
message="No IP address found with id=%s" % id)
old_ports = address['ports']
port_ids = ip_address['ip_address'].get('port_ids')
if port_ids is None:
return v._make_ip_dict(address)
for port in old_ports:
port['ip_addresses'].remove(address)
if port_ids:
ports = db_api.port_find(
context, tenant_id=context.tenant_id, id=port_ids,
scope=db_api.ALL)
# NOTE: could be considered inefficient because we're converting
# to a list to check length. Maybe revisit
if len(ports) != len(port_ids):
raise exceptions.NotFound(
message="No ports not found with ids=%s" % port_ids)
for port in ports:
port['ip_addresses'].extend([address])
else:
address["deallocated"] = 1
return v._make_ip_dict(address)

View File

@@ -0,0 +1,235 @@
# Copyright 2013 Openstack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import mock
from neutron.common import exceptions
from quark.db import models
from quark import exceptions as quark_exceptions
from quark.tests import test_quark_plugin
class TestIpAddresses(test_quark_plugin.TestQuarkPlugin):
@contextlib.contextmanager
def _stubs(self, port, addr):
port_model = None
addr_model = None
if port:
port_model = models.Port()
port_model.update(port)
if addr:
addr_model = models.IPAddress()
addr_model.update(addr)
with contextlib.nested(
mock.patch("quark.db.api.port_find"),
mock.patch("quark.ipam.QuarkIpam.allocate_ip_address")
) as (port_find, alloc_ip):
port_find.return_value = port_model
alloc_ip.return_value = addr_model
yield
def test_create_ip_address_by_network_and_device(self):
port = dict(id=1, network_id=2, ip_addresses=[])
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4,
tenant_id=self.context.tenant_id)
with self._stubs(port=port, addr=ip):
ip_address = dict(network_id=ip["network_id"],
device_ids=[4])
response = self.plugin.create_ip_address(
self.context, dict(ip_address=ip_address))
self.assertIsNotNone(response["id"])
self.assertEqual(response["network_id"], ip_address["network_id"])
self.assertEqual(response["device_ids"], [""])
self.assertEqual(response["port_ids"], [port["id"]])
self.assertEqual(response["subnet_id"], ip["subnet_id"])
self.assertEqual(response["tenant_id"], self.context.tenant_id)
self.assertFalse(response["shared"])
self.assertEqual(response["version"], 4)
self.assertEqual(response["address"], "192.168.1.100")
def test_create_ip_address_with_port(self):
port = dict(id=1, network_id=2, ip_addresses=[])
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4)
with self._stubs(port=port, addr=ip):
ip_address = dict(port_ids=[port["id"]])
response = self.plugin.create_ip_address(
self.context, dict(ip_address=ip_address))
self.assertIsNotNone(response['id'])
self.assertEqual(response['network_id'], ip["network_id"])
self.assertEqual(response['port_ids'], [port["id"]])
self.assertEqual(response['subnet_id'], ip['id'])
def test_create_ip_address_by_device_no_network_fails(self):
with self._stubs(port={}, addr=None):
ip_address = dict(device_ids=[4])
with self.assertRaises(exceptions.BadRequest):
self.plugin.create_ip_address(self.context,
dict(ip_address=ip_address))
def test_create_ip_address_invalid_network_and_device(self):
with self._stubs(port=None, addr=None):
with self.assertRaises(exceptions.PortNotFound):
ip_address = {'ip_address': {'network_id': 'fake',
'device_id': 'fake'}}
self.plugin.create_ip_address(self.context, ip_address)
def test_create_ip_address_invalid_port(self):
with self._stubs(port=None, addr=None):
with self.assertRaises(exceptions.PortNotFound):
ip_address = {'ip_address': {'port_id': 'fake'}}
self.plugin.create_ip_address(self.context, ip_address)
class TestQuarkUpdateIPAddress(test_quark_plugin.TestQuarkPlugin):
@contextlib.contextmanager
def _stubs(self, ports, addr, addr_ports=False):
port_models = []
addr_model = None
for port in ports:
port_model = models.Port()
port_model.update(port)
port_models.append(port_model)
if addr:
addr_model = models.IPAddress()
addr_model.update(addr)
if addr_ports:
addr_model.ports = port_models
db_mod = "quark.db.api"
with contextlib.nested(
mock.patch("%s.port_find" % db_mod),
mock.patch("%s.ip_address_find" % db_mod),
) as (port_find, ip_find):
port_find.return_value = port_models
ip_find.return_value = addr_model
yield
def test_update_ip_address_does_not_exist(self):
with self._stubs(ports=[], addr=None):
with self.assertRaises(exceptions.NotFound):
self.plugin.update_ip_address(self.context,
'no_ip_address_id',
{'ip_address': {'port_ids': []}})
def test_update_ip_address_port_not_found(self):
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4)
with self._stubs(ports=[], addr=ip):
with self.assertRaises(exceptions.NotFound):
ip_address = {'ip_address': {'port_ids': ['fake']}}
self.plugin.update_ip_address(self.context,
ip["id"],
ip_address)
def test_update_ip_address_specify_ports(self):
port = dict(id=1, network_id=2, ip_addresses=[])
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4)
with self._stubs(ports=[port], addr=ip):
ip_address = {'ip_address': {'port_ids': [port['id']]}}
response = self.plugin.update_ip_address(self.context,
ip['id'],
ip_address)
self.assertEqual(response['port_ids'], [port['id']])
def test_update_ip_address_no_ports(self):
port = dict(id=1, network_id=2, ip_addresses=[])
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4)
with self._stubs(ports=[port], addr=ip):
ip_address = {'ip_address': {}}
response = self.plugin.update_ip_address(self.context,
ip['id'],
ip_address)
self.assertEqual(response['port_ids'], [])
def test_update_ip_address_empty_ports_delete(self):
port = dict(id=1, network_id=2, ip_addresses=[])
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4)
with self._stubs(ports=[port], addr=ip, addr_ports=True):
ip_address = {'ip_address': {'port_ids': []}}
response = self.plugin.update_ip_address(self.context,
ip['id'],
ip_address)
self.assertEqual(response['port_ids'], [])
class TestQuarkGetIpAddresses(test_quark_plugin.TestQuarkPlugin):
@contextlib.contextmanager
def _stubs(self, ips, ports):
with mock.patch("quark.db.api.ip_address_find") as ip_find:
ip_models = []
port_models = []
for port in ports:
p = models.Port()
p.update(port)
port_models.append(p)
if isinstance(ips, list):
for ip in ips:
version = ip.pop("version")
ip_mod = models.IPAddress()
ip_mod.update(ip)
ip_mod.version = version
ip_mod.ports = port_models
ip_models.append(ip_mod)
ip_find.return_value = ip_models
else:
if ips:
version = ips.pop("version")
ip_mod = models.IPAddress()
ip_mod.update(ips)
ip_mod.version = version
ip_mod.ports = port_models
ip_find.return_value = ip_mod
else:
ip_find.return_value = ips
yield
def test_get_ip_addresses(self):
port = dict(id=100, device_id="foobar")
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4)
with self._stubs(ips=[ip], ports=[port]):
res = self.plugin.get_ip_addresses(self.context)
addr_res = res[0]
self.assertEqual(ip["id"], addr_res["id"])
self.assertEqual(ip["subnet_id"], addr_res["subnet_id"])
self.assertEqual(ip["address_readable"], addr_res["address"])
self.assertEqual(addr_res["port_ids"][0], port["id"])
self.assertEqual(addr_res["device_ids"][0], port["device_id"])
def test_get_ip_address(self):
port = dict(id=100)
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
subnet_id=1, network_id=2, version=4)
with self._stubs(ips=ip, ports=[port]):
res = self.plugin.get_ip_address(self.context, 1)
self.assertEqual(ip["id"], res["id"])
self.assertEqual(ip["subnet_id"], res["subnet_id"])
self.assertEqual(ip["address_readable"], res["address"])
self.assertEqual(res["port_ids"][0], port["id"])
def test_get_ip_address_no_ip_fails(self):
port = dict(id=100)
with self._stubs(ips=None, ports=[port]):
with self.assertRaises(quark_exceptions.IpAddressNotFound):
self.plugin.get_ip_address(self.context, 1)