From e5ad33c4281056f37edd0cbd9d3ba853b374ce50 Mon Sep 17 00:00:00 2001 From: Matt Dietz Date: Tue, 23 Jul 2013 21:11:05 +0000 Subject: [PATCH] Moves subnets into a submodule Refactors plugin.py by moving subnets functions and their associated tests into submodules for better plugin readability. --- quark/plugin.py | 266 +------- quark/plugin_modules/subnets.py | 281 +++++++++ quark/tests/plugin_modules/test_subnets.py | 670 +++++++++++++++++++++ quark/tests/test_quark_plugin.py | 647 -------------------- 4 files changed, 973 insertions(+), 891 deletions(-) create mode 100644 quark/plugin_modules/subnets.py create mode 100644 quark/tests/plugin_modules/test_subnets.py diff --git a/quark/plugin.py b/quark/plugin.py index e219ac4..8ef5590 100644 --- a/quark/plugin.py +++ b/quark/plugin.py @@ -16,14 +16,11 @@ """ v2 Neutron Plug-in API Quark Implementation """ -import netaddr from oslo.config import cfg from sqlalchemy.orm import sessionmaker, scoped_session from zope import sqlalchemy as zsa -#FIXME(mdietz): remove once all resources have moved into submods -from neutron.common import config as neutron_cfg from neutron.common import exceptions from neutron.db import api as neutron_db_api from neutron.extensions import providernet as pnet @@ -45,6 +42,7 @@ from quark.plugin_modules import mac_address_ranges from quark.plugin_modules import ports from quark.plugin_modules import routes from quark.plugin_modules import security_groups +from quark.plugin_modules import subnets from quark import plugin_views as v from quark import utils @@ -88,244 +86,6 @@ class Plugin(neutron_plugin_base_v2.NeutronPluginBaseV2, self.ipam_reuse_after = CONF.QUARK.ipam_reuse_after neutron_db_api.register_models(base=models.BASEV2) - def _validate_subnet_cidr(self, context, network_id, new_subnet_cidr): - """Validate the CIDR for a subnet. - - Verifies the specified CIDR does not overlap with the ones defined - for the other subnets specified for this network, or with any other - CIDR if overlapping IPs are disabled. - - """ - if neutron_cfg.cfg.CONF.allow_overlapping_ips: - return - - new_subnet_ipset = netaddr.IPSet([new_subnet_cidr]) - - # Using admin context here, in case we actually share networks later - subnet_list = db_api.subnet_find(context.elevated(), - network_id=network_id) - for subnet in subnet_list: - if (netaddr.IPSet([subnet.cidr]) & new_subnet_ipset): - # don't give out details of the overlapping subnet - err_msg = (_("Requested subnet with cidr: %(cidr)s for " - "network: %(network_id)s overlaps with another " - "subnet") % - {'cidr': new_subnet_cidr, - 'network_id': network_id}) - LOG.error(_("Validation for CIDR: %(new_cidr)s failed - " - "overlaps with subnet %(subnet_id)s " - "(CIDR: %(cidr)s)"), - {'new_cidr': new_subnet_cidr, - 'subnet_id': subnet.id, - 'cidr': subnet.cidr}) - raise exceptions.InvalidInput(error_message=err_msg) - - def create_subnet(self, context, subnet): - """Create a subnet. - - Create a subnet which represents a range of IP addresses - that can be allocated to devices - - : param context: neutron api request context - : param subnet: dictionary describing the subnet, with keys - as listed in the RESOURCE_ATTRIBUTE_MAP object in - neutron/api/v2/attributes.py. All keys will be populated. - """ - LOG.info("create_subnet for tenant %s" % context.tenant_id) - net_id = subnet["subnet"]["network_id"] - - net = db_api.network_find(context, id=net_id, scope=db_api.ONE) - if not net: - raise exceptions.NetworkNotFound(net_id=net_id) - - sub_attrs = subnet["subnet"] - - self._validate_subnet_cidr(context, net_id, sub_attrs["cidr"]) - - cidr = netaddr.IPNetwork(sub_attrs["cidr"]) - gateway_ip = utils.pop_param(sub_attrs, "gateway_ip", str(cidr[1])) - dns_ips = utils.pop_param(sub_attrs, "dns_nameservers", []) - host_routes = utils.pop_param(sub_attrs, "host_routes", []) - allocation_pools = utils.pop_param(sub_attrs, "allocation_pools", []) - - new_subnet = db_api.subnet_create(context, **sub_attrs) - - default_route = None - for route in host_routes: - netaddr_route = netaddr.IPNetwork(route["destination"]) - if netaddr_route.value == routes.DEFAULT_ROUTE.value: - default_route = route - gateway_ip = default_route["nexthop"] - new_subnet["routes"].append(db_api.route_create( - context, cidr=route["destination"], gateway=route["nexthop"])) - - if default_route is None: - new_subnet["routes"].append(db_api.route_create( - context, cidr=str(routes.DEFAULT_ROUTE), gateway=gateway_ip)) - - for dns_ip in dns_ips: - new_subnet["dns_nameservers"].append(db_api.dns_create( - context, ip=netaddr.IPAddress(dns_ip))) - - if allocation_pools: - exclude = netaddr.IPSet([cidr]) - for p in allocation_pools: - x = netaddr.IPSet(netaddr.IPRange(p["start"], p["end"])) - exclude = exclude - x - new_subnet["ip_policy"] = db_api.ip_policy_create(context, - exclude=exclude) - # HACK(amir): force backref for ip_policy - if not new_subnet["network"]: - new_subnet["network"] = net - subnet_dict = v._make_subnet_dict(new_subnet, - default_route=routes.DEFAULT_ROUTE) - subnet_dict["gateway_ip"] = gateway_ip - return subnet_dict - - def update_subnet(self, context, id, subnet): - """Update values of a subnet. - - : param context: neutron api request context - : param id: UUID representing the subnet to update. - : param subnet: dictionary with keys indicating fields to update. - valid keys are those that have a value of True for 'allow_put' - as listed in the RESOURCE_ATTRIBUTE_MAP object in - neutron/api/v2/attributes.py. - """ - LOG.info("update_subnet %s for tenant %s" % - (id, context.tenant_id)) - - subnet_db = db_api.subnet_find(context, id=id, scope=db_api.ONE) - if not subnet_db: - raise exceptions.SubnetNotFound(id=id) - - s = subnet["subnet"] - - dns_ips = s.pop("dns_nameservers", []) - host_routes = s.pop("host_routes", []) - gateway_ip = s.pop("gateway_ip", None) - - if gateway_ip: - default_route = None - for route in host_routes: - netaddr_route = netaddr.IPNetwork(route["destination"]) - if netaddr_route.value == routes.DEFAULT_ROUTE.value: - default_route = route - break - if default_route is None: - route_model = db_api.route_find( - context, cidr=str(routes.DEFAULT_ROUTE), subnet_id=id, - scope=db_api.ONE) - if route_model: - db_api.route_update(context, route_model, - gateway=gateway_ip) - else: - db_api.route_create(context, - cidr=str(routes.DEFAULT_ROUTE), - gateway=gateway_ip, subnet_id=id) - - if dns_ips: - subnet_db["dns_nameservers"] = [] - for dns_ip in dns_ips: - subnet_db["dns_nameservers"].append(db_api.dns_create( - context, - ip=netaddr.IPAddress(dns_ip))) - - if host_routes: - subnet_db["routes"] = [] - for route in host_routes: - subnet_db["routes"].append(db_api.route_create( - context, cidr=route["destination"], gateway=route["nexthop"])) - - subnet = db_api.subnet_update(context, subnet_db, **s) - return v._make_subnet_dict(subnet, default_route=routes.DEFAULT_ROUTE) - - def get_subnet(self, context, id, fields=None): - """Retrieve a subnet. - - : param context: neutron api request context - : param id: UUID representing the subnet to fetch. - : param fields: a list of strings that are valid keys in a - subnet dictionary as listed in the RESOURCE_ATTRIBUTE_MAP - object in neutron/api/v2/attributes.py. Only these fields - will be returned. - """ - LOG.info("get_subnet %s for tenant %s with fields %s" % - (id, context.tenant_id, fields)) - subnet = db_api.subnet_find(context, id=id, scope=db_api.ONE) - if not subnet: - raise exceptions.SubnetNotFound(subnet_id=id) - - # Check the network_id against the strategies - net_id = subnet["network_id"] - net_id = STRATEGY.get_parent_network(net_id) - subnet["network_id"] = net_id - - return v._make_subnet_dict(subnet, default_route=routes.DEFAULT_ROUTE) - - def get_subnets(self, context, filters=None, fields=None): - """Retrieve a list of subnets. - - The contents of the list depends on the identity of the user - making the request (as indicated by the context) as well as any - filters. - : param context: neutron api request context - : param filters: a dictionary with keys that are valid keys for - a subnet as listed in the RESOURCE_ATTRIBUTE_MAP object - in neutron/api/v2/attributes.py. Values in this dictiontary - are an iterable containing values that will be used for an exact - match comparison for that value. Each result returned by this - function will have matched one of the values for each key in - filters. - : param fields: a list of strings that are valid keys in a - subnet dictionary as listed in the RESOURCE_ATTRIBUTE_MAP - object in neutron/api/v2/attributes.py. Only these fields - will be returned. - """ - LOG.info("get_subnets for tenant %s with filters %s fields %s" % - (context.tenant_id, filters, fields)) - subnets = db_api.subnet_find(context, **filters) - return v._make_subnets_list(subnets, fields=fields, - default_route=routes.DEFAULT_ROUTE) - - def get_subnets_count(self, context, filters=None): - """Return the number of subnets. - - The result depends on the identity of the user making the request - (as indicated by the context) as well as any filters. - : param context: neutron api request context - : param filters: a dictionary with keys that are valid keys for - a network as listed in the RESOURCE_ATTRIBUTE_MAP object - in neutron/api/v2/attributes.py. Values in this dictiontary - are an iterable containing values that will be used for an exact - match comparison for that value. Each result returned by this - function will have matched one of the values for each key in - filters. - - NOTE: this method is optional, as it was not part of the originally - defined plugin API. - """ - LOG.info("get_subnets_count for tenant %s with filters %s" % - (context.tenant_id, filters)) - return db_api.subnet_count_all(context, **filters) - - def _delete_subnet(self, context, subnet): - if subnet.allocated_ips: - raise exceptions.SubnetInUse(subnet_id=subnet["id"]) - db_api.subnet_delete(context, subnet) - - def delete_subnet(self, context, id): - """Delete a subnet. - - : param context: neutron api request context - : param id: UUID representing the subnet to delete. - """ - LOG.info("delete_subnet %s for tenant %s" % (id, context.tenant_id)) - subnet = db_api.subnet_find(context, id=id, scope=db_api.ONE) - if not subnet: - raise exceptions.SubnetNotFound(subnet_id=id) - self._delete_subnet(context, subnet) - def _adapt_provider_nets(self, context, network): #TODO(mdietz) going to ignore all the boundary and network # type checking for now. @@ -366,14 +126,14 @@ class Plugin(neutron_plugin_base_v2.NeutronPluginBaseV2, phys_type=pnet_type, phys_net=phys_net, segment_id=seg_id) - subnets = net_attrs.pop("subnets", []) + subs = net_attrs.pop("subnets", []) net_attrs["id"] = net_uuid net_attrs["tenant_id"] = context.tenant_id new_net = db_api.network_create(context, **net_attrs) new_subnets = [] - for sub in subnets: + for sub in subs: sub["subnet"]["network_id"] = new_net["id"] sub["subnet"]["tenant_id"] = context.tenant_id s = db_api.subnet_create(context, **sub["subnet"]) @@ -483,7 +243,7 @@ class Plugin(neutron_plugin_base_v2.NeutronPluginBaseV2, raise exceptions.NetworkInUse(net_id=id) self.net_driver.delete_network(context, id) for subnet in net["subnets"]: - self._delete_subnet(context, subnet) + subnets._delete_subnet(context, subnet) db_api.network_delete(context, net) def get_mac_address_range(self, context, id, fields=None): @@ -594,3 +354,21 @@ class Plugin(neutron_plugin_base_v2.NeutronPluginBaseV2, def delete_route(self, context, id): routes.delete_route(context, id) + + def create_subnet(self, context, subnet): + return subnets.create_subnet(context, subnet) + + def update_subnet(self, context, id, subnet): + return subnets.update_subnet(context, id, subnet) + + def get_subnet(self, context, id, fields=None): + return subnets.get_subnet(context, id, fields) + + def get_subnets(self, context, filters=None, fields=None): + return subnets.get_subnets(context, filters, fields) + + def get_subnets_count(self, context, filters=None): + return subnets.get_subnets_count(context, filters) + + def delete_subnet(self, context, id): + return subnets.delete_subnet(context, id) diff --git a/quark/plugin_modules/subnets.py b/quark/plugin_modules/subnets.py new file mode 100644 index 0000000..424f2e3 --- /dev/null +++ b/quark/plugin_modules/subnets.py @@ -0,0 +1,281 @@ +# Copyright 2013 Openstack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import netaddr + +from neutron.common import config as neutron_cfg +from neutron.common import exceptions +from neutron.openstack.common import importutils +from neutron.openstack.common import log as logging +from oslo.config import cfg + +from quark.db import api as db_api +from quark import network_strategy +from quark.plugin_modules import routes +from quark import plugin_views as v +from quark import utils + +CONF = cfg.CONF +DEFAULT_ROUTE = netaddr.IPNetwork("0.0.0.0/0") +LOG = logging.getLogger("neutron.quark") +STRATEGY = network_strategy.STRATEGY + +ipam_driver = (importutils.import_class(CONF.QUARK.ipam_driver))() + + +def _validate_subnet_cidr(context, network_id, new_subnet_cidr): + """Validate the CIDR for a subnet. + + Verifies the specified CIDR does not overlap with the ones defined + for the other subnets specified for this network, or with any other + CIDR if overlapping IPs are disabled. + + """ + if neutron_cfg.cfg.CONF.allow_overlapping_ips: + return + + new_subnet_ipset = netaddr.IPSet([new_subnet_cidr]) + + # Using admin context here, in case we actually share networks later + subnet_list = db_api.subnet_find(context.elevated(), + network_id=network_id) + for subnet in subnet_list: + if (netaddr.IPSet([subnet.cidr]) & new_subnet_ipset): + # don't give out details of the overlapping subnet + err_msg = (_("Requested subnet with cidr: %(cidr)s for " + "network: %(network_id)s overlaps with another " + "subnet") % + {'cidr': new_subnet_cidr, + 'network_id': network_id}) + LOG.error(_("Validation for CIDR: %(new_cidr)s failed - " + "overlaps with subnet %(subnet_id)s " + "(CIDR: %(cidr)s)"), + {'new_cidr': new_subnet_cidr, + 'subnet_id': subnet.id, + 'cidr': subnet.cidr}) + raise exceptions.InvalidInput(error_message=err_msg) + + +def create_subnet(context, subnet): + """Create a subnet. + + Create a subnet which represents a range of IP addresses + that can be allocated to devices + + : param context: neutron api request context + : param subnet: dictionary describing the subnet, with keys + as listed in the RESOURCE_ATTRIBUTE_MAP object in + neutron/api/v2/attributes.py. All keys will be populated. + """ + LOG.info("create_subnet for tenant %s" % context.tenant_id) + net_id = subnet["subnet"]["network_id"] + + net = db_api.network_find(context, id=net_id, scope=db_api.ONE) + if not net: + raise exceptions.NetworkNotFound(net_id=net_id) + + sub_attrs = subnet["subnet"] + + _validate_subnet_cidr(context, net_id, sub_attrs["cidr"]) + + cidr = netaddr.IPNetwork(sub_attrs["cidr"]) + gateway_ip = utils.pop_param(sub_attrs, "gateway_ip", str(cidr[1])) + dns_ips = utils.pop_param(sub_attrs, "dns_nameservers", []) + host_routes = utils.pop_param(sub_attrs, "host_routes", []) + allocation_pools = utils.pop_param(sub_attrs, "allocation_pools", []) + + new_subnet = db_api.subnet_create(context, **sub_attrs) + + default_route = None + for route in host_routes: + netaddr_route = netaddr.IPNetwork(route["destination"]) + if netaddr_route.value == routes.DEFAULT_ROUTE.value: + default_route = route + gateway_ip = default_route["nexthop"] + new_subnet["routes"].append(db_api.route_create( + context, cidr=route["destination"], gateway=route["nexthop"])) + + if default_route is None: + new_subnet["routes"].append(db_api.route_create( + context, cidr=str(routes.DEFAULT_ROUTE), gateway=gateway_ip)) + + for dns_ip in dns_ips: + new_subnet["dns_nameservers"].append(db_api.dns_create( + context, ip=netaddr.IPAddress(dns_ip))) + + if allocation_pools: + exclude = netaddr.IPSet([cidr]) + for p in allocation_pools: + x = netaddr.IPSet(netaddr.IPRange(p["start"], p["end"])) + exclude = exclude - x + new_subnet["ip_policy"] = db_api.ip_policy_create(context, + exclude=exclude) + # HACK(amir): force backref for ip_policy + if not new_subnet["network"]: + new_subnet["network"] = net + subnet_dict = v._make_subnet_dict(new_subnet, + default_route=routes.DEFAULT_ROUTE) + subnet_dict["gateway_ip"] = gateway_ip + return subnet_dict + + +def update_subnet(context, id, subnet): + """Update values of a subnet. + + : param context: neutron api request context + : param id: UUID representing the subnet to update. + : param subnet: dictionary with keys indicating fields to update. + valid keys are those that have a value of True for 'allow_put' + as listed in the RESOURCE_ATTRIBUTE_MAP object in + neutron/api/v2/attributes.py. + """ + LOG.info("update_subnet %s for tenant %s" % + (id, context.tenant_id)) + + subnet_db = db_api.subnet_find(context, id=id, scope=db_api.ONE) + if not subnet_db: + raise exceptions.SubnetNotFound(id=id) + + s = subnet["subnet"] + + dns_ips = s.pop("dns_nameservers", []) + host_routes = s.pop("host_routes", []) + gateway_ip = s.pop("gateway_ip", None) + + if gateway_ip: + default_route = None + for route in host_routes: + netaddr_route = netaddr.IPNetwork(route["destination"]) + if netaddr_route.value == routes.DEFAULT_ROUTE.value: + default_route = route + break + if default_route is None: + route_model = db_api.route_find( + context, cidr=str(routes.DEFAULT_ROUTE), subnet_id=id, + scope=db_api.ONE) + if route_model: + db_api.route_update(context, route_model, + gateway=gateway_ip) + else: + db_api.route_create(context, + cidr=str(routes.DEFAULT_ROUTE), + gateway=gateway_ip, subnet_id=id) + + if dns_ips: + subnet_db["dns_nameservers"] = [] + for dns_ip in dns_ips: + subnet_db["dns_nameservers"].append(db_api.dns_create( + context, + ip=netaddr.IPAddress(dns_ip))) + + if host_routes: + subnet_db["routes"] = [] + for route in host_routes: + subnet_db["routes"].append(db_api.route_create( + context, cidr=route["destination"], gateway=route["nexthop"])) + + subnet = db_api.subnet_update(context, subnet_db, **s) + return v._make_subnet_dict(subnet, default_route=routes.DEFAULT_ROUTE) + + +def get_subnet(context, id, fields=None): + """Retrieve a subnet. + + : param context: neutron api request context + : param id: UUID representing the subnet to fetch. + : param fields: a list of strings that are valid keys in a + subnet dictionary as listed in the RESOURCE_ATTRIBUTE_MAP + object in neutron/api/v2/attributes.py. Only these fields + will be returned. + """ + LOG.info("get_subnet %s for tenant %s with fields %s" % + (id, context.tenant_id, fields)) + subnet = db_api.subnet_find(context, id=id, scope=db_api.ONE) + if not subnet: + raise exceptions.SubnetNotFound(subnet_id=id) + + # Check the network_id against the strategies + net_id = subnet["network_id"] + net_id = STRATEGY.get_parent_network(net_id) + subnet["network_id"] = net_id + + return v._make_subnet_dict(subnet, default_route=routes.DEFAULT_ROUTE) + + +def get_subnets(context, filters=None, fields=None): + """Retrieve a list of subnets. + + The contents of the list depends on the identity of the user + making the request (as indicated by the context) as well as any + filters. + : param context: neutron api request context + : param filters: a dictionary with keys that are valid keys for + a subnet as listed in the RESOURCE_ATTRIBUTE_MAP object + in neutron/api/v2/attributes.py. Values in this dictiontary + are an iterable containing values that will be used for an exact + match comparison for that value. Each result returned by this + function will have matched one of the values for each key in + filters. + : param fields: a list of strings that are valid keys in a + subnet dictionary as listed in the RESOURCE_ATTRIBUTE_MAP + object in neutron/api/v2/attributes.py. Only these fields + will be returned. + """ + LOG.info("get_subnets for tenant %s with filters %s fields %s" % + (context.tenant_id, filters, fields)) + subnets = db_api.subnet_find(context, **filters) + return v._make_subnets_list(subnets, fields=fields, + default_route=routes.DEFAULT_ROUTE) + + +def get_subnets_count(context, filters=None): + """Return the number of subnets. + + The result depends on the identity of the user making the request + (as indicated by the context) as well as any filters. + : param context: neutron api request context + : param filters: a dictionary with keys that are valid keys for + a network as listed in the RESOURCE_ATTRIBUTE_MAP object + in neutron/api/v2/attributes.py. Values in this dictiontary + are an iterable containing values that will be used for an exact + match comparison for that value. Each result returned by this + function will have matched one of the values for each key in + filters. + + NOTE: this method is optional, as it was not part of the originally + defined plugin API. + """ + LOG.info("get_subnets_count for tenant %s with filters %s" % + (context.tenant_id, filters)) + return db_api.subnet_count_all(context, **filters) + + +def _delete_subnet(context, subnet): + if subnet.allocated_ips: + raise exceptions.SubnetInUse(subnet_id=subnet["id"]) + db_api.subnet_delete(context, subnet) + + +def delete_subnet(context, id): + """Delete a subnet. + + : param context: neutron api request context + : param id: UUID representing the subnet to delete. + """ + LOG.info("delete_subnet %s for tenant %s" % (id, context.tenant_id)) + subnet = db_api.subnet_find(context, id=id, scope=db_api.ONE) + if not subnet: + raise exceptions.SubnetNotFound(subnet_id=id) + _delete_subnet(context, subnet) diff --git a/quark/tests/plugin_modules/test_subnets.py b/quark/tests/plugin_modules/test_subnets.py new file mode 100644 index 0000000..7033d30 --- /dev/null +++ b/quark/tests/plugin_modules/test_subnets.py @@ -0,0 +1,670 @@ +# Copyright 2013 Openstack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import contextlib +import copy +import uuid + +import mock +from neutron.api.v2 import attributes as neutron_attrs +from neutron.common import exceptions +from oslo.config import cfg + +from quark.db import models +from quark.tests import test_quark_plugin + + +class TestQuarkGetSubnetCount(test_quark_plugin.TestQuarkPlugin): + def test_get_subnet_count(self): + """This isn't really testable.""" + with mock.patch("quark.db.api.subnet_count_all"): + self.plugin.get_subnets_count(self.context, {}) + + +class TestQuarkGetSubnets(test_quark_plugin.TestQuarkPlugin): + @contextlib.contextmanager + def _stubs(self, subnets=None, routes=None): + if routes is None: + routes = [] + route_models = [] + for route in routes: + r = models.Route() + r.update(route) + route_models.append(r) + + if isinstance(subnets, list): + subnet_models = [] + for subnet in subnets: + s_dict = subnet.copy() + s_dict["routes"] = route_models + s = models.Subnet(network=models.Network()) + s.update(s_dict) + subnet_models.append(s) + elif subnets: + mod = models.Subnet(network=models.Network()) + mod.update(subnets) + mod["routes"] = route_models + subnet_models = mod + else: + subnet_models = None + + with mock.patch("quark.db.api.subnet_find") as subnet_find: + subnet_find.return_value = subnet_models + yield + + def test_subnets_list(self): + subnet_id = str(uuid.uuid4()) + route = dict(id=1, cidr="0.0.0.0/0", gateway="192.168.0.1") + + subnet = dict(id=subnet_id, network_id=1, name=subnet_id, + tenant_id=self.context.tenant_id, ip_version=4, + cidr="192.168.0.0/24", gateway_ip="192.168.0.1", + dns_nameservers=[], + enable_dhcp=None) + expected_route = dict(destination=route["cidr"], + nexthop=route["gateway"]) + + with self._stubs(subnets=[subnet], routes=[route]): + res = self.plugin.get_subnets(self.context, {}, {}) + # Compare routes separately + routes = res[0].pop("host_routes") + for key in subnet.keys(): + self.assertEqual(res[0][key], subnet[key]) + for key in expected_route.keys(): + self.assertEqual(routes[0][key], expected_route[key]) + + def test_subnet_show_fail(self): + with self._stubs(): + with self.assertRaises(exceptions.SubnetNotFound): + self.plugin.get_subnet(self.context, 1) + + def test_subnet_show(self): + subnet_id = str(uuid.uuid4()) + route = dict(id=1, cidr="0.0.0.0/0", gateway="192.168.0.1", + subnet_id=subnet_id) + + expected_route = dict(destination=route["cidr"], + nexthop=route["gateway"]) + + subnet = dict(id=subnet_id, network_id=1, name=subnet_id, + tenant_id=self.context.tenant_id, ip_version=4, + cidr="192.168.0.0/24", gateway_ip="192.168.0.1", + dns_nameservers=[], + enable_dhcp=None) + + with self._stubs(subnets=subnet, routes=[route]): + res = self.plugin.get_subnet(self.context, subnet_id) + + # Compare routes separately + routes = res.pop("host_routes") + for key in subnet.keys(): + self.assertEqual(res[key], subnet[key]) + for key in expected_route.keys(): + self.assertEqual(routes[0][key], expected_route[key]) + + +class TestQuarkCreateSubnetOverlapping(test_quark_plugin.TestQuarkPlugin): + @contextlib.contextmanager + def _stubs(self, subnets=None): + if subnets is None: + subnets = [] + subnet_models = [] + for subnet in subnets: + s = models.Subnet() + s.update(subnet) + subnet_models.append(s) + network = models.Network() + network.update(dict(id=1, subnets=subnet_models)) + with contextlib.nested( + mock.patch("quark.db.api.network_find"), + mock.patch("quark.db.api.subnet_find"), + mock.patch("quark.db.api.subnet_create") + ) as (net_find, subnet_find, subnet_create): + net_find.return_value = network + subnet_find.return_value = subnet_models + subnet_create.return_value = models.Subnet( + network=models.Network(), + cidr="192.168.1.1/24") + yield subnet_create + + def test_create_subnet_overlapping_true(self): + cfg.CONF.set_override('allow_overlapping_ips', True) + with self._stubs() as subnet_create: + s = dict(subnet=dict( + gateway_ip=neutron_attrs.ATTR_NOT_SPECIFIED, + dns_nameservers=neutron_attrs.ATTR_NOT_SPECIFIED, + cidr="192.168.1.1/8", + network_id=1)) + self.plugin.create_subnet(self.context, s) + self.assertEqual(subnet_create.call_count, 1) + + def test_create_subnet_overlapping_false(self): + cfg.CONF.set_override('allow_overlapping_ips', False) + with self._stubs() as subnet_create: + s = dict(subnet=dict( + gateway_ip=neutron_attrs.ATTR_NOT_SPECIFIED, + dns_nameservers=neutron_attrs.ATTR_NOT_SPECIFIED, + cidr="192.168.1.1/8", + network_id=1)) + self.plugin.create_subnet(self.context, s) + self.assertEqual(subnet_create.call_count, 1) + + def test_create_subnet_overlapping_conflict(self): + cfg.CONF.set_override('allow_overlapping_ips', False) + with self._stubs(subnets=[dict(cidr="192.168.10.1/24")]): + with self.assertRaises(exceptions.InvalidInput): + s = dict(subnet=dict(cidr="192.168.1.1/8", + network_id=1)) + self.plugin.create_subnet(self.context, s) + + +class TestQuarkCreateSubnetAllocationPools(test_quark_plugin.TestQuarkPlugin): + @contextlib.contextmanager + def _stubs(self, subnet): + s = models.Subnet(network=models.Network(id=1, subnets=[])) + s.update(subnet) + + with contextlib.nested( + mock.patch("quark.db.api.network_find"), + mock.patch("quark.db.api.subnet_find"), + mock.patch("quark.db.api.subnet_create") + ) as (net_find, subnet_find, subnet_create): + net_find.return_value = s["network"] + subnet_find.return_value = [] + subnet_create.return_value = s + yield subnet_create + + def test_create_subnet_allocation_pools_zero(self): + s = dict(subnet=dict( + cidr="192.168.1.1/24", + network_id=1)) + with self._stubs(s["subnet"]) as subnet_create: + resp = self.plugin.create_subnet(self.context, s) + self.assertEqual(subnet_create.call_count, 1) + self.assertEqual(resp["allocation_pools"], + [dict(start="192.168.1.0", end="192.168.1.255")]) + + def test_create_subnet_allocation_pools_one(self): + pools = [dict(start="192.168.1.10", end="192.168.1.20")] + s = dict(subnet=dict( + allocation_pools=pools, + cidr="192.168.1.1/24", + network_id=1)) + with self._stubs(s["subnet"]) as subnet_create: + resp = self.plugin.create_subnet(self.context, s) + self.assertEqual(subnet_create.call_count, 1) + self.assertEqual(resp["allocation_pools"], pools) + + def test_create_subnet_allocation_pools_two(self): + pools = [dict(start="192.168.1.10", end="192.168.1.20"), + dict(start="192.168.1.40", end="192.168.1.50")] + s = dict(subnet=dict( + allocation_pools=pools, + cidr="192.168.1.1/24", + network_id=1)) + with self._stubs(s["subnet"]) as subnet_create: + resp = self.plugin.create_subnet(self.context, s) + self.assertEqual(subnet_create.call_count, 1) + self.assertEqual(resp["allocation_pools"], pools) + + +# TODO(amir): Refactor the tests to test individual subnet attributes. +# * copy.deepcopy was necessary to maintain tests on keys, which is a bit ugly. +# * workaround is also in place for lame ATTR_NOT_SPECIFIED object() +class TestQuarkCreateSubnet(test_quark_plugin.TestQuarkPlugin): + @contextlib.contextmanager + def _stubs(self, subnet=None, network=None, routes=None, dns=None): + subnet_mod = models.Subnet(network=models.Network()) + dns_ips = subnet.pop("dns_nameservers", []) + host_routes = subnet.pop("host_routes", []) + subnet_mod.update(subnet) + subnet["dns_nameservers"] = dns_ips + subnet["host_routes"] = host_routes + routes = routes or [] + dns = dns or [] + route_models = [models.Route(**r) for r in routes] + dns_models = [models.DNSNameserver(**d) for d in dns] + + with contextlib.nested( + mock.patch("quark.db.api.subnet_create"), + mock.patch("quark.db.api.network_find"), + mock.patch("quark.db.api.dns_create"), + mock.patch("quark.db.api.route_create"), + ) as (subnet_create, net_find, dns_create, route_create): + subnet_create.return_value = subnet_mod + net_find.return_value = network + route_create.side_effect = route_models + dns_create.side_effect = dns_models + yield subnet_create, dns_create, route_create + + def test_create_subnet(self): + routes = [dict(cidr="0.0.0.0/0", gateway="0.0.0.0")] + subnet = dict( + subnet=dict(network_id=1, + tenant_id=self.context.tenant_id, ip_version=4, + cidr="172.16.0.0/24", gateway_ip="0.0.0.0", + dns_nameservers=neutron_attrs.ATTR_NOT_SPECIFIED, + host_routes=neutron_attrs.ATTR_NOT_SPECIFIED, + enable_dhcp=None)) + network = dict(network_id=1) + with self._stubs( + subnet=subnet["subnet"], + network=network, + routes=routes + ) as (subnet_create, dns_create, route_create): + dns_nameservers = subnet["subnet"].pop("dns_nameservers") + host_routes = subnet["subnet"].pop("host_routes") + subnet_request = copy.deepcopy(subnet) + subnet_request["subnet"]["dns_nameservers"] = dns_nameservers + subnet_request["subnet"]["host_routes"] = host_routes + res = self.plugin.create_subnet(self.context, + subnet_request) + self.assertEqual(subnet_create.call_count, 1) + self.assertEqual(dns_create.call_count, 0) + self.assertEqual(route_create.call_count, 1) + for key in subnet["subnet"].keys(): + if key == "host_routes": + self.assertEqual(res[key][0]["destination"], "0.0.0.0/0") + self.assertEqual(res[key][0]["nexthop"], "0.0.0.0") + else: + self.assertEqual(res[key], subnet["subnet"][key]) + + def test_create_subnet_no_network_fails(self): + subnet = dict(subnet=dict(network_id=1)) + with self._stubs(subnet=dict(), network=None): + with self.assertRaises(exceptions.NetworkNotFound): + self.plugin.create_subnet(self.context, subnet) + + def test_create_subnet_no_gateway_ip_defaults(self): + routes = [dict(cidr="0.0.0.0/0", gateway="172.16.0.1")] + subnet = dict( + subnet=dict(network_id=1, + tenant_id=self.context.tenant_id, ip_version=4, + cidr="172.16.0.0/24", + gateway_ip=neutron_attrs.ATTR_NOT_SPECIFIED, + dns_nameservers=neutron_attrs.ATTR_NOT_SPECIFIED, + enable_dhcp=None)) + network = dict(network_id=1) + with self._stubs( + subnet=subnet["subnet"], + network=network, + routes=routes + ) as (subnet_create, dns_create, route_create): + dns_nameservers = subnet["subnet"].pop("dns_nameservers") + gateway_ip = subnet["subnet"].pop("gateway_ip") + subnet_request = copy.deepcopy(subnet) + subnet_request["subnet"]["dns_nameservers"] = dns_nameservers + subnet_request["subnet"]["gateway_ip"] = gateway_ip + res = self.plugin.create_subnet(self.context, subnet_request) + self.assertEqual(subnet_create.call_count, 1) + self.assertEqual(dns_create.call_count, 0) + self.assertEqual(route_create.call_count, 1) + for key in subnet["subnet"].keys(): + if key == "gateway_ip": + self.assertEqual(res[key], "172.16.0.1") + elif key == "host_routes": + self.assertEqual(res[key][0]["destination"], "0.0.0.0/0") + self.assertEqual(res[key][0]["nexthop"], "172.16.0.1") + else: + self.assertEqual(res[key], subnet["subnet"][key]) + + def test_create_subnet_dns_nameservers(self): + routes = [dict(cidr="0.0.0.0/0", gateway="0.0.0.0")] + dns_ns = [dict(ip="4.2.2.1"), dict(ip="4.2.2.2")] + subnet = dict( + subnet=dict(network_id=1, + tenant_id=self.context.tenant_id, ip_version=4, + cidr="172.16.0.0/24", gateway_ip="0.0.0.0", + dns_nameservers=["4.2.2.1", "4.2.2.2"], + enable_dhcp=None)) + network = dict(network_id=1) + with self._stubs( + subnet=subnet["subnet"], + network=network, + routes=routes, + dns=dns_ns + ) as (subnet_create, dns_create, route_create): + res = self.plugin.create_subnet(self.context, + copy.deepcopy(subnet)) + self.assertEqual(subnet_create.call_count, 1) + self.assertEqual(dns_create.call_count, 2) + self.assertEqual(route_create.call_count, 1) + for key in subnet["subnet"].keys(): + if key == "host_routes": + self.assertEqual(res[key][0]["destination"], "0.0.0.0/0") + self.assertEqual(res[key][0]["nexthop"], "0.0.0.0") + else: + self.assertEqual(res[key], subnet["subnet"][key]) + + def test_create_subnet_routes(self): + routes = [dict(cidr="1.1.1.1/8", gateway="172.16.0.4"), + dict(cidr="0.0.0.0/0", gateway="0.0.0.0")] + subnet = dict( + subnet=dict(network_id=1, + tenant_id=self.context.tenant_id, ip_version=4, + cidr="172.16.0.0/24", gateway_ip="0.0.0.0", + dns_nameservers=neutron_attrs.ATTR_NOT_SPECIFIED, + host_routes=[{"destination": "1.1.1.1/8", + "nexthop": "172.16.0.4"}], + enable_dhcp=None)) + network = dict(network_id=1) + with self._stubs( + subnet=subnet["subnet"], + network=network, + routes=routes + ) as (subnet_create, dns_create, route_create): + dns_nameservers = subnet["subnet"].pop("dns_nameservers") + subnet_request = copy.deepcopy(subnet) + subnet_request["subnet"]["dns_nameservers"] = dns_nameservers + res = self.plugin.create_subnet(self.context, subnet_request) + self.assertEqual(subnet_create.call_count, 1) + self.assertEqual(dns_create.call_count, 0) + self.assertEqual(route_create.call_count, 2) + for key in subnet["subnet"].keys(): + if key == "host_routes": + res_tuples = [(r["destination"], r["nexthop"]) + for r in res[key]] + self.assertIn(("1.1.1.1/8", "172.16.0.4"), res_tuples) + self.assertIn(("0.0.0.0/0", "0.0.0.0"), res_tuples) + self.assertEqual(2, len(res_tuples)) + else: + self.assertEqual(res[key], subnet["subnet"][key]) + + def test_create_subnet_default_route(self): + routes = [dict(cidr="0.0.0.0/0", gateway="172.16.0.4")] + subnet = dict( + subnet=dict(network_id=1, + tenant_id=self.context.tenant_id, ip_version=4, + cidr="172.16.0.0/24", + gateway_ip=neutron_attrs.ATTR_NOT_SPECIFIED, + dns_nameservers=neutron_attrs.ATTR_NOT_SPECIFIED, + host_routes=[{"destination": "0.0.0.0/0", + "nexthop": "172.16.0.4"}], + enable_dhcp=None)) + network = dict(network_id=1) + with self._stubs( + subnet=subnet["subnet"], + network=network, + routes=routes + ) as (subnet_create, dns_create, route_create): + dns_nameservers = subnet["subnet"].pop("dns_nameservers") + gateway_ip = subnet["subnet"].pop("gateway_ip") + subnet_request = copy.deepcopy(subnet) + subnet_request["subnet"]["dns_nameservers"] = dns_nameservers + subnet_request["subnet"]["gateway_ip"] = gateway_ip + res = self.plugin.create_subnet(self.context, subnet_request) + self.assertEqual(subnet_create.call_count, 1) + self.assertEqual(dns_create.call_count, 0) + self.assertEqual(route_create.call_count, 1) + for key in subnet["subnet"].keys(): + if key == "host_routes": + res_tuples = [(r["destination"], r["nexthop"]) + for r in res[key]] + self.assertEqual([("0.0.0.0/0", "172.16.0.4")], res_tuples) + elif key == "gateway_ip": + self.assertEqual(res[key], "172.16.0.4") + else: + self.assertEqual(res[key], subnet["subnet"][key]) + + def test_create_subnet_default_route_gateway_ip(self): + """If default route (host_routes) and gateway_ip are both provided, + then host_route takes precedence. + """ + routes = [dict(cidr="0.0.0.0/0", gateway="172.16.0.4")] + subnet = dict( + subnet=dict(network_id=1, + tenant_id=self.context.tenant_id, ip_version=4, + cidr="172.16.0.0/24", + gateway_ip="172.16.0.3", + dns_nameservers=neutron_attrs.ATTR_NOT_SPECIFIED, + host_routes=[{"destination": "0.0.0.0/0", + "nexthop": "172.16.0.4"}], + enable_dhcp=None)) + network = dict(network_id=1) + with self._stubs( + subnet=subnet["subnet"], + network=network, + routes=routes + ) as (subnet_create, dns_create, route_create): + dns_nameservers = subnet["subnet"].pop("dns_nameservers") + subnet_request = copy.deepcopy(subnet) + subnet_request["subnet"]["dns_nameservers"] = dns_nameservers + res = self.plugin.create_subnet(self.context, subnet_request) + self.assertEqual(subnet_create.call_count, 1) + self.assertEqual(dns_create.call_count, 0) + self.assertEqual(route_create.call_count, 1) + for key in subnet["subnet"].keys(): + if key == "host_routes": + res_tuples = [(r["destination"], r["nexthop"]) + for r in res[key]] + self.assertEqual([("0.0.0.0/0", "172.16.0.4")], res_tuples) + elif key == "gateway_ip": + self.assertEqual(res[key], "172.16.0.4") + else: + self.assertEqual(res[key], subnet["subnet"][key]) + + +class TestQuarkUpdateSubnet(test_quark_plugin.TestQuarkPlugin): + DEFAULT_ROUTE = [dict(destination="0.0.0.0/0", + nexthop="172.16.0.1")] + + @contextlib.contextmanager + def _stubs(self, host_routes=None, new_routes=None, find_routes=True, + new_dns_servers=None): + if host_routes is None: + host_routes = [] + if new_routes: + new_routes = [models.Route(cidr=r["destination"], + gateway=r["nexthop"], + subnet_id=1) + for r in new_routes] + if new_dns_servers: + new_dns_servers = [models.DNSNameserver( + ip=ip, + subnet_id=1) for ip in new_dns_servers] + + subnet = dict( + id=1, + network_id=1, + tenant_id=self.context.tenant_id, ip_version=4, + cidr="172.16.0.0/24", + host_routes=host_routes, + dns_nameservers=["4.2.2.1", "4.2.2.2"], + enable_dhcp=None) + + dns_ips = subnet.pop("dns_nameservers", []) + host_routes = subnet.pop("host_routes", []) + subnet_mod = models.Subnet() + + subnet_mod.update(subnet) + + subnet_mod["dns_nameservers"] = [models.DNSNameserver(ip=ip) + for ip in dns_ips] + subnet_mod["routes"] = [models.Route(cidr=r["destination"], + gateway=r["nexthop"], + subnet_id=subnet_mod["id"]) + for r in host_routes] + with contextlib.nested( + mock.patch("quark.db.api.subnet_find"), + mock.patch("quark.db.api.subnet_update"), + mock.patch("quark.db.api.dns_create"), + mock.patch("quark.db.api.route_find"), + mock.patch("quark.db.api.route_update"), + mock.patch("quark.db.api.route_create"), + ) as (subnet_find, subnet_update, + dns_create, + route_find, route_update, route_create): + subnet_find.return_value = subnet_mod + route_find.return_value = subnet_mod["routes"][0] \ + if subnet_mod["routes"] and find_routes else None + new_subnet_mod = models.Subnet(network=models.Network()) + new_subnet_mod.update(subnet_mod) + if new_routes: + new_subnet_mod["routes"] = new_routes + if new_dns_servers: + new_subnet_mod["dns_nameservers"] = new_dns_servers + subnet_update.return_value = new_subnet_mod + yield dns_create, route_update, route_create + + def test_update_subnet_not_found(self): + with self.assertRaises(exceptions.SubnetNotFound): + self.plugin.update_subnet(self.context, 1, {}) + + def test_update_subnet_dns_nameservers(self): + new_dns_servers = ["1.1.1.2"] + with self._stubs( + host_routes=self.DEFAULT_ROUTE, + new_dns_servers=new_dns_servers + ) as (dns_create, route_update, route_create): + req = dict(subnet=dict(dns_nameservers=new_dns_servers)) + res = self.plugin.update_subnet(self.context, + 1, + req) + self.assertEqual(dns_create.call_count, 1) + self.assertEqual(route_create.call_count, 0) + self.assertEqual(res["dns_nameservers"], new_dns_servers) + + def test_update_subnet_routes(self): + new_routes = [dict(destination="10.0.0.0/24", + nexthop="1.1.1.1")] + with self._stubs( + host_routes=self.DEFAULT_ROUTE, + new_routes=new_routes + ) as (dns_create, route_update, route_create): + req = dict(subnet=dict( + host_routes=new_routes)) + res = self.plugin.update_subnet(self.context, 1, req) + self.assertEqual(dns_create.call_count, 0) + self.assertEqual(route_create.call_count, 1) + self.assertEqual(len(res["host_routes"]), 1) + self.assertEqual(res["host_routes"][0]["destination"], + "10.0.0.0/24") + self.assertEqual(res["host_routes"][0]["nexthop"], + "1.1.1.1") + self.assertIsNone(res["gateway_ip"]) + + def test_update_subnet_gateway_ip_with_default_route_in_db(self): + with self._stubs( + host_routes=self.DEFAULT_ROUTE, + new_routes=[dict(destination="0.0.0.0/0", nexthop="1.2.3.4")] + ) as (dns_create, route_update, route_create): + req = dict(subnet=dict(gateway_ip="1.2.3.4")) + res = self.plugin.update_subnet(self.context, 1, req) + self.assertEqual(dns_create.call_count, 0) + self.assertEqual(route_create.call_count, 0) + self.assertEqual(route_update.call_count, 1) + self.assertEqual(len(res["host_routes"]), 1) + self.assertEqual(res["host_routes"][0]["destination"], + "0.0.0.0/0") + self.assertEqual(res["host_routes"][0]["nexthop"], + "1.2.3.4") + self.assertEqual(res["gateway_ip"], "1.2.3.4") + + def test_update_subnet_gateway_ip_with_non_default_route_in_db(self): + with self._stubs( + host_routes=[dict(destination="1.1.1.1/8", nexthop="9.9.9.9")], + find_routes=False, + new_routes=[dict(destination="1.1.1.1/8", nexthop="9.9.9.9"), + dict(destination="0.0.0.0/0", nexthop="1.2.3.4")] + ) as (dns_create, route_update, route_create): + req = dict(subnet=dict(gateway_ip="1.2.3.4")) + res = self.plugin.update_subnet(self.context, 1, req) + self.assertEqual(dns_create.call_count, 0) + self.assertEqual(route_create.call_count, 1) + + self.assertEqual(res["gateway_ip"], "1.2.3.4") + + self.assertEqual(len(res["host_routes"]), 2) + res_tuples = [(r["destination"], r["nexthop"]) + for r in res["host_routes"]] + self.assertIn(("0.0.0.0/0", "1.2.3.4"), res_tuples) + self.assertIn(("1.1.1.1/8", "9.9.9.9"), res_tuples) + + def test_update_subnet_gateway_ip_without_default_route_in_db(self): + with self._stubs( + host_routes=None, + new_routes=[dict(destination="0.0.0.0/0", nexthop="1.2.3.4")] + ) as (dns_create, route_update, route_create): + req = dict(subnet=dict(gateway_ip="1.2.3.4")) + res = self.plugin.update_subnet(self.context, 1, req) + self.assertEqual(dns_create.call_count, 0) + self.assertEqual(route_create.call_count, 1) + self.assertEqual(len(res["host_routes"]), 1) + self.assertEqual(res["host_routes"][0]["destination"], + "0.0.0.0/0") + self.assertEqual(res["host_routes"][0]["nexthop"], + "1.2.3.4") + self.assertEqual(res["gateway_ip"], "1.2.3.4") + + def test_update_subnet_gateway_ip_with_default_route_in_args(self): + new_routes = [dict(destination="0.0.0.0/0", + nexthop="4.3.2.1")] + with self._stubs( + host_routes=self.DEFAULT_ROUTE, + new_routes=new_routes + ) as (dns_create, route_update, route_create): + req = dict(subnet=dict( + host_routes=new_routes, + gateway_ip="1.2.3.4")) + res = self.plugin.update_subnet(self.context, 1, req) + self.assertEqual(dns_create.call_count, 0) + self.assertEqual(route_create.call_count, 1) + self.assertEqual(len(res["host_routes"]), 1) + self.assertEqual(res["host_routes"][0]["destination"], + "0.0.0.0/0") + self.assertEqual(res["host_routes"][0]["nexthop"], + "4.3.2.1") + self.assertEqual(res["gateway_ip"], "4.3.2.1") + + +class TestQuarkDeleteSubnet(test_quark_plugin.TestQuarkPlugin): + @contextlib.contextmanager + def _stubs(self, subnet, ips): + ip_mods = [] + subnet_mod = None + if subnet: + subnet_mod = models.Subnet() + subnet_mod.update(subnet) + for ip in ips: + ip_mod = models.IPAddress() + ip_mod.update(ip) + ip_mods.append(ip_mod) + + db_mod = "quark.db.api" + with contextlib.nested( + mock.patch("%s.subnet_find" % db_mod), + mock.patch("%s.subnet_delete" % db_mod) + ) as (sub_find, sub_delete): + if subnet_mod: + subnet_mod.allocated_ips = ip_mods + sub_find.return_value = subnet_mod + yield sub_delete + + def test_delete_subnet(self): + subnet = dict(id=1) + with self._stubs(subnet=subnet, ips=[]) as sub_delete: + self.plugin.delete_subnet(self.context, 1) + self.assertTrue(sub_delete.called) + + def test_delete_subnet_no_subnet_fails(self): + with self._stubs(subnet=None, ips=[]): + with self.assertRaises(exceptions.SubnetNotFound): + self.plugin.delete_subnet(self.context, 1) + + def test_delete_subnet_has_allocated_ips_fails(self): + subnet = dict(id=1) + with self._stubs(subnet=subnet, ips=[{}]): + with self.assertRaises(exceptions.SubnetInUse): + self.plugin.delete_subnet(self.context, 1) diff --git a/quark/tests/test_quark_plugin.py b/quark/tests/test_quark_plugin.py index 7343602..75bb76e 100644 --- a/quark/tests/test_quark_plugin.py +++ b/quark/tests/test_quark_plugin.py @@ -13,12 +13,9 @@ # License for the specific language governing permissions and limitations # under the License. -import uuid import contextlib -import copy import mock -from neutron.api.v2 import attributes as neutron_attrs from neutron.common import exceptions from neutron.db import api as db_api from oslo.config import cfg @@ -42,13 +39,6 @@ class TestQuarkPlugin(test_base.TestBase): db_api.clear_db() -class TestQuarkGetSubnetCount(TestQuarkPlugin): - def test_get_subnet_count(self): - """This isn't really testable.""" - with mock.patch("quark.db.api.subnet_count_all"): - self.plugin.get_subnets_count(self.context, {}) - - class TestQuarkAPIExtensions(TestQuarkPlugin): """Adds coverage for appending the API extension path.""" def test_append_quark_extensions(self): @@ -69,643 +59,6 @@ class TestQuarkAPIExtensions(TestQuarkPlugin): "apple:banana:carrot") -class TestQuarkGetSubnets(TestQuarkPlugin): - @contextlib.contextmanager - def _stubs(self, subnets=None, routes=None): - if routes is None: - routes = [] - route_models = [] - for route in routes: - r = models.Route() - r.update(route) - route_models.append(r) - - if isinstance(subnets, list): - subnet_models = [] - for subnet in subnets: - s_dict = subnet.copy() - s_dict["routes"] = route_models - s = models.Subnet(network=models.Network()) - s.update(s_dict) - subnet_models.append(s) - elif subnets: - mod = models.Subnet(network=models.Network()) - mod.update(subnets) - mod["routes"] = route_models - subnet_models = mod - else: - subnet_models = None - - with mock.patch("quark.db.api.subnet_find") as subnet_find: - subnet_find.return_value = subnet_models - yield - - def test_subnets_list(self): - subnet_id = str(uuid.uuid4()) - route = dict(id=1, cidr="0.0.0.0/0", gateway="192.168.0.1") - - subnet = dict(id=subnet_id, network_id=1, name=subnet_id, - tenant_id=self.context.tenant_id, ip_version=4, - cidr="192.168.0.0/24", gateway_ip="192.168.0.1", - dns_nameservers=[], - enable_dhcp=None) - expected_route = dict(destination=route["cidr"], - nexthop=route["gateway"]) - - with self._stubs(subnets=[subnet], routes=[route]): - res = self.plugin.get_subnets(self.context, {}, {}) - # Compare routes separately - routes = res[0].pop("host_routes") - for key in subnet.keys(): - self.assertEqual(res[0][key], subnet[key]) - for key in expected_route.keys(): - self.assertEqual(routes[0][key], expected_route[key]) - - def test_subnet_show_fail(self): - with self._stubs(): - with self.assertRaises(exceptions.SubnetNotFound): - self.plugin.get_subnet(self.context, 1) - - def test_subnet_show(self): - subnet_id = str(uuid.uuid4()) - route = dict(id=1, cidr="0.0.0.0/0", gateway="192.168.0.1", - subnet_id=subnet_id) - - expected_route = dict(destination=route["cidr"], - nexthop=route["gateway"]) - - subnet = dict(id=subnet_id, network_id=1, name=subnet_id, - tenant_id=self.context.tenant_id, ip_version=4, - cidr="192.168.0.0/24", gateway_ip="192.168.0.1", - dns_nameservers=[], - enable_dhcp=None) - - with self._stubs(subnets=subnet, routes=[route]): - res = self.plugin.get_subnet(self.context, subnet_id) - - # Compare routes separately - routes = res.pop("host_routes") - for key in subnet.keys(): - self.assertEqual(res[key], subnet[key]) - for key in expected_route.keys(): - self.assertEqual(routes[0][key], expected_route[key]) - - -class TestQuarkCreateSubnetOverlapping(TestQuarkPlugin): - @contextlib.contextmanager - def _stubs(self, subnets=None): - if subnets is None: - subnets = [] - subnet_models = [] - for subnet in subnets: - s = models.Subnet() - s.update(subnet) - subnet_models.append(s) - network = models.Network() - network.update(dict(id=1, subnets=subnet_models)) - with contextlib.nested( - mock.patch("quark.db.api.network_find"), - mock.patch("quark.db.api.subnet_find"), - mock.patch("quark.db.api.subnet_create") - ) as (net_find, subnet_find, subnet_create): - net_find.return_value = network - subnet_find.return_value = subnet_models - subnet_create.return_value = models.Subnet( - network=models.Network(), - cidr="192.168.1.1/24") - yield subnet_create - - def test_create_subnet_overlapping_true(self): - cfg.CONF.set_override('allow_overlapping_ips', True) - with self._stubs() as subnet_create: - s = dict(subnet=dict( - gateway_ip=neutron_attrs.ATTR_NOT_SPECIFIED, - dns_nameservers=neutron_attrs.ATTR_NOT_SPECIFIED, - cidr="192.168.1.1/8", - network_id=1)) - self.plugin.create_subnet(self.context, s) - self.assertEqual(subnet_create.call_count, 1) - - def test_create_subnet_overlapping_false(self): - cfg.CONF.set_override('allow_overlapping_ips', False) - with self._stubs() as subnet_create: - s = dict(subnet=dict( - gateway_ip=neutron_attrs.ATTR_NOT_SPECIFIED, - dns_nameservers=neutron_attrs.ATTR_NOT_SPECIFIED, - cidr="192.168.1.1/8", - network_id=1)) - self.plugin.create_subnet(self.context, s) - self.assertEqual(subnet_create.call_count, 1) - - def test_create_subnet_overlapping_conflict(self): - cfg.CONF.set_override('allow_overlapping_ips', False) - with self._stubs(subnets=[dict(cidr="192.168.10.1/24")]): - with self.assertRaises(exceptions.InvalidInput): - s = dict(subnet=dict(cidr="192.168.1.1/8", - network_id=1)) - self.plugin.create_subnet(self.context, s) - - -class TestQuarkCreateSubnetAllocationPools(TestQuarkPlugin): - @contextlib.contextmanager - def _stubs(self, subnet): - s = models.Subnet(network=models.Network(id=1, subnets=[])) - s.update(subnet) - - with contextlib.nested( - mock.patch("quark.db.api.network_find"), - mock.patch("quark.db.api.subnet_find"), - mock.patch("quark.db.api.subnet_create") - ) as (net_find, subnet_find, subnet_create): - net_find.return_value = s["network"] - subnet_find.return_value = [] - subnet_create.return_value = s - yield subnet_create - - def test_create_subnet_allocation_pools_zero(self): - s = dict(subnet=dict( - cidr="192.168.1.1/24", - network_id=1)) - with self._stubs(s["subnet"]) as subnet_create: - resp = self.plugin.create_subnet(self.context, s) - self.assertEqual(subnet_create.call_count, 1) - self.assertEqual(resp["allocation_pools"], - [dict(start="192.168.1.0", end="192.168.1.255")]) - - def test_create_subnet_allocation_pools_one(self): - pools = [dict(start="192.168.1.10", end="192.168.1.20")] - s = dict(subnet=dict( - allocation_pools=pools, - cidr="192.168.1.1/24", - network_id=1)) - with self._stubs(s["subnet"]) as subnet_create: - resp = self.plugin.create_subnet(self.context, s) - self.assertEqual(subnet_create.call_count, 1) - self.assertEqual(resp["allocation_pools"], pools) - - def test_create_subnet_allocation_pools_two(self): - pools = [dict(start="192.168.1.10", end="192.168.1.20"), - dict(start="192.168.1.40", end="192.168.1.50")] - s = dict(subnet=dict( - allocation_pools=pools, - cidr="192.168.1.1/24", - network_id=1)) - with self._stubs(s["subnet"]) as subnet_create: - resp = self.plugin.create_subnet(self.context, s) - self.assertEqual(subnet_create.call_count, 1) - self.assertEqual(resp["allocation_pools"], pools) - - -# TODO(amir): Refactor the tests to test individual subnet attributes. -# * copy.deepcopy was necessary to maintain tests on keys, which is a bit ugly. -# * workaround is also in place for lame ATTR_NOT_SPECIFIED object() -class TestQuarkCreateSubnet(TestQuarkPlugin): - @contextlib.contextmanager - def _stubs(self, subnet=None, network=None, routes=None, dns=None): - subnet_mod = models.Subnet(network=models.Network()) - dns_ips = subnet.pop("dns_nameservers", []) - host_routes = subnet.pop("host_routes", []) - subnet_mod.update(subnet) - subnet["dns_nameservers"] = dns_ips - subnet["host_routes"] = host_routes - routes = routes or [] - dns = dns or [] - route_models = [models.Route(**r) for r in routes] - dns_models = [models.DNSNameserver(**d) for d in dns] - - with contextlib.nested( - mock.patch("quark.db.api.subnet_create"), - mock.patch("quark.db.api.network_find"), - mock.patch("quark.db.api.dns_create"), - mock.patch("quark.db.api.route_create"), - ) as (subnet_create, net_find, dns_create, route_create): - subnet_create.return_value = subnet_mod - net_find.return_value = network - route_create.side_effect = route_models - dns_create.side_effect = dns_models - yield subnet_create, dns_create, route_create - - def test_create_subnet(self): - routes = [dict(cidr="0.0.0.0/0", gateway="0.0.0.0")] - subnet = dict( - subnet=dict(network_id=1, - tenant_id=self.context.tenant_id, ip_version=4, - cidr="172.16.0.0/24", gateway_ip="0.0.0.0", - dns_nameservers=neutron_attrs.ATTR_NOT_SPECIFIED, - host_routes=neutron_attrs.ATTR_NOT_SPECIFIED, - enable_dhcp=None)) - network = dict(network_id=1) - with self._stubs( - subnet=subnet["subnet"], - network=network, - routes=routes - ) as (subnet_create, dns_create, route_create): - dns_nameservers = subnet["subnet"].pop("dns_nameservers") - host_routes = subnet["subnet"].pop("host_routes") - subnet_request = copy.deepcopy(subnet) - subnet_request["subnet"]["dns_nameservers"] = dns_nameservers - subnet_request["subnet"]["host_routes"] = host_routes - res = self.plugin.create_subnet(self.context, - subnet_request) - self.assertEqual(subnet_create.call_count, 1) - self.assertEqual(dns_create.call_count, 0) - self.assertEqual(route_create.call_count, 1) - for key in subnet["subnet"].keys(): - if key == "host_routes": - self.assertEqual(res[key][0]["destination"], "0.0.0.0/0") - self.assertEqual(res[key][0]["nexthop"], "0.0.0.0") - else: - self.assertEqual(res[key], subnet["subnet"][key]) - - def test_create_subnet_no_network_fails(self): - subnet = dict(subnet=dict(network_id=1)) - with self._stubs(subnet=dict(), network=None): - with self.assertRaises(exceptions.NetworkNotFound): - self.plugin.create_subnet(self.context, subnet) - - def test_create_subnet_no_gateway_ip_defaults(self): - routes = [dict(cidr="0.0.0.0/0", gateway="172.16.0.1")] - subnet = dict( - subnet=dict(network_id=1, - tenant_id=self.context.tenant_id, ip_version=4, - cidr="172.16.0.0/24", - gateway_ip=neutron_attrs.ATTR_NOT_SPECIFIED, - dns_nameservers=neutron_attrs.ATTR_NOT_SPECIFIED, - enable_dhcp=None)) - network = dict(network_id=1) - with self._stubs( - subnet=subnet["subnet"], - network=network, - routes=routes - ) as (subnet_create, dns_create, route_create): - dns_nameservers = subnet["subnet"].pop("dns_nameservers") - gateway_ip = subnet["subnet"].pop("gateway_ip") - subnet_request = copy.deepcopy(subnet) - subnet_request["subnet"]["dns_nameservers"] = dns_nameservers - subnet_request["subnet"]["gateway_ip"] = gateway_ip - res = self.plugin.create_subnet(self.context, subnet_request) - self.assertEqual(subnet_create.call_count, 1) - self.assertEqual(dns_create.call_count, 0) - self.assertEqual(route_create.call_count, 1) - for key in subnet["subnet"].keys(): - if key == "gateway_ip": - self.assertEqual(res[key], "172.16.0.1") - elif key == "host_routes": - self.assertEqual(res[key][0]["destination"], "0.0.0.0/0") - self.assertEqual(res[key][0]["nexthop"], "172.16.0.1") - else: - self.assertEqual(res[key], subnet["subnet"][key]) - - def test_create_subnet_dns_nameservers(self): - routes = [dict(cidr="0.0.0.0/0", gateway="0.0.0.0")] - dns_ns = [dict(ip="4.2.2.1"), dict(ip="4.2.2.2")] - subnet = dict( - subnet=dict(network_id=1, - tenant_id=self.context.tenant_id, ip_version=4, - cidr="172.16.0.0/24", gateway_ip="0.0.0.0", - dns_nameservers=["4.2.2.1", "4.2.2.2"], - enable_dhcp=None)) - network = dict(network_id=1) - with self._stubs( - subnet=subnet["subnet"], - network=network, - routes=routes, - dns=dns_ns - ) as (subnet_create, dns_create, route_create): - res = self.plugin.create_subnet(self.context, - copy.deepcopy(subnet)) - self.assertEqual(subnet_create.call_count, 1) - self.assertEqual(dns_create.call_count, 2) - self.assertEqual(route_create.call_count, 1) - for key in subnet["subnet"].keys(): - if key == "host_routes": - self.assertEqual(res[key][0]["destination"], "0.0.0.0/0") - self.assertEqual(res[key][0]["nexthop"], "0.0.0.0") - else: - self.assertEqual(res[key], subnet["subnet"][key]) - - def test_create_subnet_routes(self): - routes = [dict(cidr="1.1.1.1/8", gateway="172.16.0.4"), - dict(cidr="0.0.0.0/0", gateway="0.0.0.0")] - subnet = dict( - subnet=dict(network_id=1, - tenant_id=self.context.tenant_id, ip_version=4, - cidr="172.16.0.0/24", gateway_ip="0.0.0.0", - dns_nameservers=neutron_attrs.ATTR_NOT_SPECIFIED, - host_routes=[{"destination": "1.1.1.1/8", - "nexthop": "172.16.0.4"}], - enable_dhcp=None)) - network = dict(network_id=1) - with self._stubs( - subnet=subnet["subnet"], - network=network, - routes=routes - ) as (subnet_create, dns_create, route_create): - dns_nameservers = subnet["subnet"].pop("dns_nameservers") - subnet_request = copy.deepcopy(subnet) - subnet_request["subnet"]["dns_nameservers"] = dns_nameservers - res = self.plugin.create_subnet(self.context, subnet_request) - self.assertEqual(subnet_create.call_count, 1) - self.assertEqual(dns_create.call_count, 0) - self.assertEqual(route_create.call_count, 2) - for key in subnet["subnet"].keys(): - if key == "host_routes": - res_tuples = [(r["destination"], r["nexthop"]) - for r in res[key]] - self.assertIn(("1.1.1.1/8", "172.16.0.4"), res_tuples) - self.assertIn(("0.0.0.0/0", "0.0.0.0"), res_tuples) - self.assertEqual(2, len(res_tuples)) - else: - self.assertEqual(res[key], subnet["subnet"][key]) - - def test_create_subnet_default_route(self): - routes = [dict(cidr="0.0.0.0/0", gateway="172.16.0.4")] - subnet = dict( - subnet=dict(network_id=1, - tenant_id=self.context.tenant_id, ip_version=4, - cidr="172.16.0.0/24", - gateway_ip=neutron_attrs.ATTR_NOT_SPECIFIED, - dns_nameservers=neutron_attrs.ATTR_NOT_SPECIFIED, - host_routes=[{"destination": "0.0.0.0/0", - "nexthop": "172.16.0.4"}], - enable_dhcp=None)) - network = dict(network_id=1) - with self._stubs( - subnet=subnet["subnet"], - network=network, - routes=routes - ) as (subnet_create, dns_create, route_create): - dns_nameservers = subnet["subnet"].pop("dns_nameservers") - gateway_ip = subnet["subnet"].pop("gateway_ip") - subnet_request = copy.deepcopy(subnet) - subnet_request["subnet"]["dns_nameservers"] = dns_nameservers - subnet_request["subnet"]["gateway_ip"] = gateway_ip - res = self.plugin.create_subnet(self.context, subnet_request) - self.assertEqual(subnet_create.call_count, 1) - self.assertEqual(dns_create.call_count, 0) - self.assertEqual(route_create.call_count, 1) - for key in subnet["subnet"].keys(): - if key == "host_routes": - res_tuples = [(r["destination"], r["nexthop"]) - for r in res[key]] - self.assertEqual([("0.0.0.0/0", "172.16.0.4")], res_tuples) - elif key == "gateway_ip": - self.assertEqual(res[key], "172.16.0.4") - else: - self.assertEqual(res[key], subnet["subnet"][key]) - - def test_create_subnet_default_route_gateway_ip(self): - """If default route (host_routes) and gateway_ip are both provided, - then host_route takes precedence. - """ - routes = [dict(cidr="0.0.0.0/0", gateway="172.16.0.4")] - subnet = dict( - subnet=dict(network_id=1, - tenant_id=self.context.tenant_id, ip_version=4, - cidr="172.16.0.0/24", - gateway_ip="172.16.0.3", - dns_nameservers=neutron_attrs.ATTR_NOT_SPECIFIED, - host_routes=[{"destination": "0.0.0.0/0", - "nexthop": "172.16.0.4"}], - enable_dhcp=None)) - network = dict(network_id=1) - with self._stubs( - subnet=subnet["subnet"], - network=network, - routes=routes - ) as (subnet_create, dns_create, route_create): - dns_nameservers = subnet["subnet"].pop("dns_nameservers") - subnet_request = copy.deepcopy(subnet) - subnet_request["subnet"]["dns_nameservers"] = dns_nameservers - res = self.plugin.create_subnet(self.context, subnet_request) - self.assertEqual(subnet_create.call_count, 1) - self.assertEqual(dns_create.call_count, 0) - self.assertEqual(route_create.call_count, 1) - for key in subnet["subnet"].keys(): - if key == "host_routes": - res_tuples = [(r["destination"], r["nexthop"]) - for r in res[key]] - self.assertEqual([("0.0.0.0/0", "172.16.0.4")], res_tuples) - elif key == "gateway_ip": - self.assertEqual(res[key], "172.16.0.4") - else: - self.assertEqual(res[key], subnet["subnet"][key]) - - -class TestQuarkUpdateSubnet(TestQuarkPlugin): - DEFAULT_ROUTE = [dict(destination="0.0.0.0/0", - nexthop="172.16.0.1")] - - @contextlib.contextmanager - def _stubs(self, host_routes=None, new_routes=None, find_routes=True, - new_dns_servers=None): - if host_routes is None: - host_routes = [] - if new_routes: - new_routes = [models.Route(cidr=r["destination"], - gateway=r["nexthop"], - subnet_id=1) - for r in new_routes] - if new_dns_servers: - new_dns_servers = [models.DNSNameserver( - ip=ip, - subnet_id=1) for ip in new_dns_servers] - - subnet = dict( - id=1, - network_id=1, - tenant_id=self.context.tenant_id, ip_version=4, - cidr="172.16.0.0/24", - host_routes=host_routes, - dns_nameservers=["4.2.2.1", "4.2.2.2"], - enable_dhcp=None) - - dns_ips = subnet.pop("dns_nameservers", []) - host_routes = subnet.pop("host_routes", []) - subnet_mod = models.Subnet() - - subnet_mod.update(subnet) - - subnet_mod["dns_nameservers"] = [models.DNSNameserver(ip=ip) - for ip in dns_ips] - subnet_mod["routes"] = [models.Route(cidr=r["destination"], - gateway=r["nexthop"], - subnet_id=subnet_mod["id"]) - for r in host_routes] - with contextlib.nested( - mock.patch("quark.db.api.subnet_find"), - mock.patch("quark.db.api.subnet_update"), - mock.patch("quark.db.api.dns_create"), - mock.patch("quark.db.api.route_find"), - mock.patch("quark.db.api.route_update"), - mock.patch("quark.db.api.route_create"), - ) as (subnet_find, subnet_update, - dns_create, - route_find, route_update, route_create): - subnet_find.return_value = subnet_mod - route_find.return_value = subnet_mod["routes"][0] \ - if subnet_mod["routes"] and find_routes else None - new_subnet_mod = models.Subnet(network=models.Network()) - new_subnet_mod.update(subnet_mod) - if new_routes: - new_subnet_mod["routes"] = new_routes - if new_dns_servers: - new_subnet_mod["dns_nameservers"] = new_dns_servers - subnet_update.return_value = new_subnet_mod - yield dns_create, route_update, route_create - - def test_update_subnet_not_found(self): - with self.assertRaises(exceptions.SubnetNotFound): - self.plugin.update_subnet(self.context, 1, {}) - - def test_update_subnet_dns_nameservers(self): - new_dns_servers = ["1.1.1.2"] - with self._stubs( - host_routes=self.DEFAULT_ROUTE, - new_dns_servers=new_dns_servers - ) as (dns_create, route_update, route_create): - req = dict(subnet=dict(dns_nameservers=new_dns_servers)) - res = self.plugin.update_subnet(self.context, - 1, - req) - self.assertEqual(dns_create.call_count, 1) - self.assertEqual(route_create.call_count, 0) - self.assertEqual(res["dns_nameservers"], new_dns_servers) - - def test_update_subnet_routes(self): - new_routes = [dict(destination="10.0.0.0/24", - nexthop="1.1.1.1")] - with self._stubs( - host_routes=self.DEFAULT_ROUTE, - new_routes=new_routes - ) as (dns_create, route_update, route_create): - req = dict(subnet=dict( - host_routes=new_routes)) - res = self.plugin.update_subnet(self.context, 1, req) - self.assertEqual(dns_create.call_count, 0) - self.assertEqual(route_create.call_count, 1) - self.assertEqual(len(res["host_routes"]), 1) - self.assertEqual(res["host_routes"][0]["destination"], - "10.0.0.0/24") - self.assertEqual(res["host_routes"][0]["nexthop"], - "1.1.1.1") - self.assertIsNone(res["gateway_ip"]) - - def test_update_subnet_gateway_ip_with_default_route_in_db(self): - with self._stubs( - host_routes=self.DEFAULT_ROUTE, - new_routes=[dict(destination="0.0.0.0/0", nexthop="1.2.3.4")] - ) as (dns_create, route_update, route_create): - req = dict(subnet=dict(gateway_ip="1.2.3.4")) - res = self.plugin.update_subnet(self.context, 1, req) - self.assertEqual(dns_create.call_count, 0) - self.assertEqual(route_create.call_count, 0) - self.assertEqual(route_update.call_count, 1) - self.assertEqual(len(res["host_routes"]), 1) - self.assertEqual(res["host_routes"][0]["destination"], - "0.0.0.0/0") - self.assertEqual(res["host_routes"][0]["nexthop"], - "1.2.3.4") - self.assertEqual(res["gateway_ip"], "1.2.3.4") - - def test_update_subnet_gateway_ip_with_non_default_route_in_db(self): - with self._stubs( - host_routes=[dict(destination="1.1.1.1/8", nexthop="9.9.9.9")], - find_routes=False, - new_routes=[dict(destination="1.1.1.1/8", nexthop="9.9.9.9"), - dict(destination="0.0.0.0/0", nexthop="1.2.3.4")] - ) as (dns_create, route_update, route_create): - req = dict(subnet=dict(gateway_ip="1.2.3.4")) - res = self.plugin.update_subnet(self.context, 1, req) - self.assertEqual(dns_create.call_count, 0) - self.assertEqual(route_create.call_count, 1) - - self.assertEqual(res["gateway_ip"], "1.2.3.4") - - self.assertEqual(len(res["host_routes"]), 2) - res_tuples = [(r["destination"], r["nexthop"]) - for r in res["host_routes"]] - self.assertIn(("0.0.0.0/0", "1.2.3.4"), res_tuples) - self.assertIn(("1.1.1.1/8", "9.9.9.9"), res_tuples) - - def test_update_subnet_gateway_ip_without_default_route_in_db(self): - with self._stubs( - host_routes=None, - new_routes=[dict(destination="0.0.0.0/0", nexthop="1.2.3.4")] - ) as (dns_create, route_update, route_create): - req = dict(subnet=dict(gateway_ip="1.2.3.4")) - res = self.plugin.update_subnet(self.context, 1, req) - self.assertEqual(dns_create.call_count, 0) - self.assertEqual(route_create.call_count, 1) - self.assertEqual(len(res["host_routes"]), 1) - self.assertEqual(res["host_routes"][0]["destination"], - "0.0.0.0/0") - self.assertEqual(res["host_routes"][0]["nexthop"], - "1.2.3.4") - self.assertEqual(res["gateway_ip"], "1.2.3.4") - - def test_update_subnet_gateway_ip_with_default_route_in_args(self): - new_routes = [dict(destination="0.0.0.0/0", - nexthop="4.3.2.1")] - with self._stubs( - host_routes=self.DEFAULT_ROUTE, - new_routes=new_routes - ) as (dns_create, route_update, route_create): - req = dict(subnet=dict( - host_routes=new_routes, - gateway_ip="1.2.3.4")) - res = self.plugin.update_subnet(self.context, 1, req) - self.assertEqual(dns_create.call_count, 0) - self.assertEqual(route_create.call_count, 1) - self.assertEqual(len(res["host_routes"]), 1) - self.assertEqual(res["host_routes"][0]["destination"], - "0.0.0.0/0") - self.assertEqual(res["host_routes"][0]["nexthop"], - "4.3.2.1") - self.assertEqual(res["gateway_ip"], "4.3.2.1") - - -class TestQuarkDeleteSubnet(TestQuarkPlugin): - @contextlib.contextmanager - def _stubs(self, subnet, ips): - ip_mods = [] - subnet_mod = None - if subnet: - subnet_mod = models.Subnet() - subnet_mod.update(subnet) - for ip in ips: - ip_mod = models.IPAddress() - ip_mod.update(ip) - ip_mods.append(ip_mod) - - db_mod = "quark.db.api" - with contextlib.nested( - mock.patch("%s.subnet_find" % db_mod), - mock.patch("%s.subnet_delete" % db_mod) - ) as (sub_find, sub_delete): - if subnet_mod: - subnet_mod.allocated_ips = ip_mods - sub_find.return_value = subnet_mod - yield sub_delete - - def test_delete_subnet(self): - subnet = dict(id=1) - with self._stubs(subnet=subnet, ips=[]) as sub_delete: - self.plugin.delete_subnet(self.context, 1) - self.assertTrue(sub_delete.called) - - def test_delete_subnet_no_subnet_fails(self): - with self._stubs(subnet=None, ips=[]): - with self.assertRaises(exceptions.SubnetNotFound): - self.plugin.delete_subnet(self.context, 1) - - def test_delete_subnet_has_allocated_ips_fails(self): - subnet = dict(id=1) - with self._stubs(subnet=subnet, ips=[{}]): - with self.assertRaises(exceptions.SubnetInUse): - self.plugin.delete_subnet(self.context, 1) - - class TestQuarkGetNetworks(TestQuarkPlugin): @contextlib.contextmanager def _stubs(self, nets=None, subnets=None):