Moves subnets into a submodule

Refactors plugin.py by moving subnets functions and their associated
tests into submodules for better plugin readability.
This commit is contained in:
Matt Dietz
2013-07-23 21:11:05 +00:00
parent 726db17775
commit e5ad33c428
4 changed files with 973 additions and 891 deletions

View File

@@ -16,14 +16,11 @@
""" """
v2 Neutron Plug-in API Quark Implementation v2 Neutron Plug-in API Quark Implementation
""" """
import netaddr
from oslo.config import cfg from oslo.config import cfg
from sqlalchemy.orm import sessionmaker, scoped_session from sqlalchemy.orm import sessionmaker, scoped_session
from zope import sqlalchemy as zsa from zope import sqlalchemy as zsa
#FIXME(mdietz): remove once all resources have moved into submods
from neutron.common import config as neutron_cfg
from neutron.common import exceptions from neutron.common import exceptions
from neutron.db import api as neutron_db_api from neutron.db import api as neutron_db_api
from neutron.extensions import providernet as pnet from neutron.extensions import providernet as pnet
@@ -45,6 +42,7 @@ from quark.plugin_modules import mac_address_ranges
from quark.plugin_modules import ports from quark.plugin_modules import ports
from quark.plugin_modules import routes from quark.plugin_modules import routes
from quark.plugin_modules import security_groups from quark.plugin_modules import security_groups
from quark.plugin_modules import subnets
from quark import plugin_views as v from quark import plugin_views as v
from quark import utils from quark import utils
@@ -88,244 +86,6 @@ class Plugin(neutron_plugin_base_v2.NeutronPluginBaseV2,
self.ipam_reuse_after = CONF.QUARK.ipam_reuse_after self.ipam_reuse_after = CONF.QUARK.ipam_reuse_after
neutron_db_api.register_models(base=models.BASEV2) neutron_db_api.register_models(base=models.BASEV2)
def _validate_subnet_cidr(self, context, network_id, new_subnet_cidr):
"""Validate the CIDR for a subnet.
Verifies the specified CIDR does not overlap with the ones defined
for the other subnets specified for this network, or with any other
CIDR if overlapping IPs are disabled.
"""
if neutron_cfg.cfg.CONF.allow_overlapping_ips:
return
new_subnet_ipset = netaddr.IPSet([new_subnet_cidr])
# Using admin context here, in case we actually share networks later
subnet_list = db_api.subnet_find(context.elevated(),
network_id=network_id)
for subnet in subnet_list:
if (netaddr.IPSet([subnet.cidr]) & new_subnet_ipset):
# don't give out details of the overlapping subnet
err_msg = (_("Requested subnet with cidr: %(cidr)s for "
"network: %(network_id)s overlaps with another "
"subnet") %
{'cidr': new_subnet_cidr,
'network_id': network_id})
LOG.error(_("Validation for CIDR: %(new_cidr)s failed - "
"overlaps with subnet %(subnet_id)s "
"(CIDR: %(cidr)s)"),
{'new_cidr': new_subnet_cidr,
'subnet_id': subnet.id,
'cidr': subnet.cidr})
raise exceptions.InvalidInput(error_message=err_msg)
def create_subnet(self, context, subnet):
"""Create a subnet.
Create a subnet which represents a range of IP addresses
that can be allocated to devices
: param context: neutron api request context
: param subnet: dictionary describing the subnet, with keys
as listed in the RESOURCE_ATTRIBUTE_MAP object in
neutron/api/v2/attributes.py. All keys will be populated.
"""
LOG.info("create_subnet for tenant %s" % context.tenant_id)
net_id = subnet["subnet"]["network_id"]
net = db_api.network_find(context, id=net_id, scope=db_api.ONE)
if not net:
raise exceptions.NetworkNotFound(net_id=net_id)
sub_attrs = subnet["subnet"]
self._validate_subnet_cidr(context, net_id, sub_attrs["cidr"])
cidr = netaddr.IPNetwork(sub_attrs["cidr"])
gateway_ip = utils.pop_param(sub_attrs, "gateway_ip", str(cidr[1]))
dns_ips = utils.pop_param(sub_attrs, "dns_nameservers", [])
host_routes = utils.pop_param(sub_attrs, "host_routes", [])
allocation_pools = utils.pop_param(sub_attrs, "allocation_pools", [])
new_subnet = db_api.subnet_create(context, **sub_attrs)
default_route = None
for route in host_routes:
netaddr_route = netaddr.IPNetwork(route["destination"])
if netaddr_route.value == routes.DEFAULT_ROUTE.value:
default_route = route
gateway_ip = default_route["nexthop"]
new_subnet["routes"].append(db_api.route_create(
context, cidr=route["destination"], gateway=route["nexthop"]))
if default_route is None:
new_subnet["routes"].append(db_api.route_create(
context, cidr=str(routes.DEFAULT_ROUTE), gateway=gateway_ip))
for dns_ip in dns_ips:
new_subnet["dns_nameservers"].append(db_api.dns_create(
context, ip=netaddr.IPAddress(dns_ip)))
if allocation_pools:
exclude = netaddr.IPSet([cidr])
for p in allocation_pools:
x = netaddr.IPSet(netaddr.IPRange(p["start"], p["end"]))
exclude = exclude - x
new_subnet["ip_policy"] = db_api.ip_policy_create(context,
exclude=exclude)
# HACK(amir): force backref for ip_policy
if not new_subnet["network"]:
new_subnet["network"] = net
subnet_dict = v._make_subnet_dict(new_subnet,
default_route=routes.DEFAULT_ROUTE)
subnet_dict["gateway_ip"] = gateway_ip
return subnet_dict
def update_subnet(self, context, id, subnet):
"""Update values of a subnet.
: param context: neutron api request context
: param id: UUID representing the subnet to update.
: param subnet: dictionary with keys indicating fields to update.
valid keys are those that have a value of True for 'allow_put'
as listed in the RESOURCE_ATTRIBUTE_MAP object in
neutron/api/v2/attributes.py.
"""
LOG.info("update_subnet %s for tenant %s" %
(id, context.tenant_id))
subnet_db = db_api.subnet_find(context, id=id, scope=db_api.ONE)
if not subnet_db:
raise exceptions.SubnetNotFound(id=id)
s = subnet["subnet"]
dns_ips = s.pop("dns_nameservers", [])
host_routes = s.pop("host_routes", [])
gateway_ip = s.pop("gateway_ip", None)
if gateway_ip:
default_route = None
for route in host_routes:
netaddr_route = netaddr.IPNetwork(route["destination"])
if netaddr_route.value == routes.DEFAULT_ROUTE.value:
default_route = route
break
if default_route is None:
route_model = db_api.route_find(
context, cidr=str(routes.DEFAULT_ROUTE), subnet_id=id,
scope=db_api.ONE)
if route_model:
db_api.route_update(context, route_model,
gateway=gateway_ip)
else:
db_api.route_create(context,
cidr=str(routes.DEFAULT_ROUTE),
gateway=gateway_ip, subnet_id=id)
if dns_ips:
subnet_db["dns_nameservers"] = []
for dns_ip in dns_ips:
subnet_db["dns_nameservers"].append(db_api.dns_create(
context,
ip=netaddr.IPAddress(dns_ip)))
if host_routes:
subnet_db["routes"] = []
for route in host_routes:
subnet_db["routes"].append(db_api.route_create(
context, cidr=route["destination"], gateway=route["nexthop"]))
subnet = db_api.subnet_update(context, subnet_db, **s)
return v._make_subnet_dict(subnet, default_route=routes.DEFAULT_ROUTE)
def get_subnet(self, context, id, fields=None):
"""Retrieve a subnet.
: param context: neutron api request context
: param id: UUID representing the subnet to fetch.
: param fields: a list of strings that are valid keys in a
subnet dictionary as listed in the RESOURCE_ATTRIBUTE_MAP
object in neutron/api/v2/attributes.py. Only these fields
will be returned.
"""
LOG.info("get_subnet %s for tenant %s with fields %s" %
(id, context.tenant_id, fields))
subnet = db_api.subnet_find(context, id=id, scope=db_api.ONE)
if not subnet:
raise exceptions.SubnetNotFound(subnet_id=id)
# Check the network_id against the strategies
net_id = subnet["network_id"]
net_id = STRATEGY.get_parent_network(net_id)
subnet["network_id"] = net_id
return v._make_subnet_dict(subnet, default_route=routes.DEFAULT_ROUTE)
def get_subnets(self, context, filters=None, fields=None):
"""Retrieve a list of subnets.
The contents of the list depends on the identity of the user
making the request (as indicated by the context) as well as any
filters.
: param context: neutron api request context
: param filters: a dictionary with keys that are valid keys for
a subnet as listed in the RESOURCE_ATTRIBUTE_MAP object
in neutron/api/v2/attributes.py. Values in this dictiontary
are an iterable containing values that will be used for an exact
match comparison for that value. Each result returned by this
function will have matched one of the values for each key in
filters.
: param fields: a list of strings that are valid keys in a
subnet dictionary as listed in the RESOURCE_ATTRIBUTE_MAP
object in neutron/api/v2/attributes.py. Only these fields
will be returned.
"""
LOG.info("get_subnets for tenant %s with filters %s fields %s" %
(context.tenant_id, filters, fields))
subnets = db_api.subnet_find(context, **filters)
return v._make_subnets_list(subnets, fields=fields,
default_route=routes.DEFAULT_ROUTE)
def get_subnets_count(self, context, filters=None):
"""Return the number of subnets.
The result depends on the identity of the user making the request
(as indicated by the context) as well as any filters.
: param context: neutron api request context
: param filters: a dictionary with keys that are valid keys for
a network as listed in the RESOURCE_ATTRIBUTE_MAP object
in neutron/api/v2/attributes.py. Values in this dictiontary
are an iterable containing values that will be used for an exact
match comparison for that value. Each result returned by this
function will have matched one of the values for each key in
filters.
NOTE: this method is optional, as it was not part of the originally
defined plugin API.
"""
LOG.info("get_subnets_count for tenant %s with filters %s" %
(context.tenant_id, filters))
return db_api.subnet_count_all(context, **filters)
def _delete_subnet(self, context, subnet):
if subnet.allocated_ips:
raise exceptions.SubnetInUse(subnet_id=subnet["id"])
db_api.subnet_delete(context, subnet)
def delete_subnet(self, context, id):
"""Delete a subnet.
: param context: neutron api request context
: param id: UUID representing the subnet to delete.
"""
LOG.info("delete_subnet %s for tenant %s" % (id, context.tenant_id))
subnet = db_api.subnet_find(context, id=id, scope=db_api.ONE)
if not subnet:
raise exceptions.SubnetNotFound(subnet_id=id)
self._delete_subnet(context, subnet)
def _adapt_provider_nets(self, context, network): def _adapt_provider_nets(self, context, network):
#TODO(mdietz) going to ignore all the boundary and network #TODO(mdietz) going to ignore all the boundary and network
# type checking for now. # type checking for now.
@@ -366,14 +126,14 @@ class Plugin(neutron_plugin_base_v2.NeutronPluginBaseV2,
phys_type=pnet_type, phys_type=pnet_type,
phys_net=phys_net, segment_id=seg_id) phys_net=phys_net, segment_id=seg_id)
subnets = net_attrs.pop("subnets", []) subs = net_attrs.pop("subnets", [])
net_attrs["id"] = net_uuid net_attrs["id"] = net_uuid
net_attrs["tenant_id"] = context.tenant_id net_attrs["tenant_id"] = context.tenant_id
new_net = db_api.network_create(context, **net_attrs) new_net = db_api.network_create(context, **net_attrs)
new_subnets = [] new_subnets = []
for sub in subnets: for sub in subs:
sub["subnet"]["network_id"] = new_net["id"] sub["subnet"]["network_id"] = new_net["id"]
sub["subnet"]["tenant_id"] = context.tenant_id sub["subnet"]["tenant_id"] = context.tenant_id
s = db_api.subnet_create(context, **sub["subnet"]) s = db_api.subnet_create(context, **sub["subnet"])
@@ -483,7 +243,7 @@ class Plugin(neutron_plugin_base_v2.NeutronPluginBaseV2,
raise exceptions.NetworkInUse(net_id=id) raise exceptions.NetworkInUse(net_id=id)
self.net_driver.delete_network(context, id) self.net_driver.delete_network(context, id)
for subnet in net["subnets"]: for subnet in net["subnets"]:
self._delete_subnet(context, subnet) subnets._delete_subnet(context, subnet)
db_api.network_delete(context, net) db_api.network_delete(context, net)
def get_mac_address_range(self, context, id, fields=None): def get_mac_address_range(self, context, id, fields=None):
@@ -594,3 +354,21 @@ class Plugin(neutron_plugin_base_v2.NeutronPluginBaseV2,
def delete_route(self, context, id): def delete_route(self, context, id):
routes.delete_route(context, id) routes.delete_route(context, id)
def create_subnet(self, context, subnet):
return subnets.create_subnet(context, subnet)
def update_subnet(self, context, id, subnet):
return subnets.update_subnet(context, id, subnet)
def get_subnet(self, context, id, fields=None):
return subnets.get_subnet(context, id, fields)
def get_subnets(self, context, filters=None, fields=None):
return subnets.get_subnets(context, filters, fields)
def get_subnets_count(self, context, filters=None):
return subnets.get_subnets_count(context, filters)
def delete_subnet(self, context, id):
return subnets.delete_subnet(context, id)

View File

@@ -0,0 +1,281 @@
# Copyright 2013 Openstack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import netaddr
from neutron.common import config as neutron_cfg
from neutron.common import exceptions
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from oslo.config import cfg
from quark.db import api as db_api
from quark import network_strategy
from quark.plugin_modules import routes
from quark import plugin_views as v
from quark import utils
CONF = cfg.CONF
DEFAULT_ROUTE = netaddr.IPNetwork("0.0.0.0/0")
LOG = logging.getLogger("neutron.quark")
STRATEGY = network_strategy.STRATEGY
ipam_driver = (importutils.import_class(CONF.QUARK.ipam_driver))()
def _validate_subnet_cidr(context, network_id, new_subnet_cidr):
"""Validate the CIDR for a subnet.
Verifies the specified CIDR does not overlap with the ones defined
for the other subnets specified for this network, or with any other
CIDR if overlapping IPs are disabled.
"""
if neutron_cfg.cfg.CONF.allow_overlapping_ips:
return
new_subnet_ipset = netaddr.IPSet([new_subnet_cidr])
# Using admin context here, in case we actually share networks later
subnet_list = db_api.subnet_find(context.elevated(),
network_id=network_id)
for subnet in subnet_list:
if (netaddr.IPSet([subnet.cidr]) & new_subnet_ipset):
# don't give out details of the overlapping subnet
err_msg = (_("Requested subnet with cidr: %(cidr)s for "
"network: %(network_id)s overlaps with another "
"subnet") %
{'cidr': new_subnet_cidr,
'network_id': network_id})
LOG.error(_("Validation for CIDR: %(new_cidr)s failed - "
"overlaps with subnet %(subnet_id)s "
"(CIDR: %(cidr)s)"),
{'new_cidr': new_subnet_cidr,
'subnet_id': subnet.id,
'cidr': subnet.cidr})
raise exceptions.InvalidInput(error_message=err_msg)
def create_subnet(context, subnet):
"""Create a subnet.
Create a subnet which represents a range of IP addresses
that can be allocated to devices
: param context: neutron api request context
: param subnet: dictionary describing the subnet, with keys
as listed in the RESOURCE_ATTRIBUTE_MAP object in
neutron/api/v2/attributes.py. All keys will be populated.
"""
LOG.info("create_subnet for tenant %s" % context.tenant_id)
net_id = subnet["subnet"]["network_id"]
net = db_api.network_find(context, id=net_id, scope=db_api.ONE)
if not net:
raise exceptions.NetworkNotFound(net_id=net_id)
sub_attrs = subnet["subnet"]
_validate_subnet_cidr(context, net_id, sub_attrs["cidr"])
cidr = netaddr.IPNetwork(sub_attrs["cidr"])
gateway_ip = utils.pop_param(sub_attrs, "gateway_ip", str(cidr[1]))
dns_ips = utils.pop_param(sub_attrs, "dns_nameservers", [])
host_routes = utils.pop_param(sub_attrs, "host_routes", [])
allocation_pools = utils.pop_param(sub_attrs, "allocation_pools", [])
new_subnet = db_api.subnet_create(context, **sub_attrs)
default_route = None
for route in host_routes:
netaddr_route = netaddr.IPNetwork(route["destination"])
if netaddr_route.value == routes.DEFAULT_ROUTE.value:
default_route = route
gateway_ip = default_route["nexthop"]
new_subnet["routes"].append(db_api.route_create(
context, cidr=route["destination"], gateway=route["nexthop"]))
if default_route is None:
new_subnet["routes"].append(db_api.route_create(
context, cidr=str(routes.DEFAULT_ROUTE), gateway=gateway_ip))
for dns_ip in dns_ips:
new_subnet["dns_nameservers"].append(db_api.dns_create(
context, ip=netaddr.IPAddress(dns_ip)))
if allocation_pools:
exclude = netaddr.IPSet([cidr])
for p in allocation_pools:
x = netaddr.IPSet(netaddr.IPRange(p["start"], p["end"]))
exclude = exclude - x
new_subnet["ip_policy"] = db_api.ip_policy_create(context,
exclude=exclude)
# HACK(amir): force backref for ip_policy
if not new_subnet["network"]:
new_subnet["network"] = net
subnet_dict = v._make_subnet_dict(new_subnet,
default_route=routes.DEFAULT_ROUTE)
subnet_dict["gateway_ip"] = gateway_ip
return subnet_dict
def update_subnet(context, id, subnet):
"""Update values of a subnet.
: param context: neutron api request context
: param id: UUID representing the subnet to update.
: param subnet: dictionary with keys indicating fields to update.
valid keys are those that have a value of True for 'allow_put'
as listed in the RESOURCE_ATTRIBUTE_MAP object in
neutron/api/v2/attributes.py.
"""
LOG.info("update_subnet %s for tenant %s" %
(id, context.tenant_id))
subnet_db = db_api.subnet_find(context, id=id, scope=db_api.ONE)
if not subnet_db:
raise exceptions.SubnetNotFound(id=id)
s = subnet["subnet"]
dns_ips = s.pop("dns_nameservers", [])
host_routes = s.pop("host_routes", [])
gateway_ip = s.pop("gateway_ip", None)
if gateway_ip:
default_route = None
for route in host_routes:
netaddr_route = netaddr.IPNetwork(route["destination"])
if netaddr_route.value == routes.DEFAULT_ROUTE.value:
default_route = route
break
if default_route is None:
route_model = db_api.route_find(
context, cidr=str(routes.DEFAULT_ROUTE), subnet_id=id,
scope=db_api.ONE)
if route_model:
db_api.route_update(context, route_model,
gateway=gateway_ip)
else:
db_api.route_create(context,
cidr=str(routes.DEFAULT_ROUTE),
gateway=gateway_ip, subnet_id=id)
if dns_ips:
subnet_db["dns_nameservers"] = []
for dns_ip in dns_ips:
subnet_db["dns_nameservers"].append(db_api.dns_create(
context,
ip=netaddr.IPAddress(dns_ip)))
if host_routes:
subnet_db["routes"] = []
for route in host_routes:
subnet_db["routes"].append(db_api.route_create(
context, cidr=route["destination"], gateway=route["nexthop"]))
subnet = db_api.subnet_update(context, subnet_db, **s)
return v._make_subnet_dict(subnet, default_route=routes.DEFAULT_ROUTE)
def get_subnet(context, id, fields=None):
"""Retrieve a subnet.
: param context: neutron api request context
: param id: UUID representing the subnet to fetch.
: param fields: a list of strings that are valid keys in a
subnet dictionary as listed in the RESOURCE_ATTRIBUTE_MAP
object in neutron/api/v2/attributes.py. Only these fields
will be returned.
"""
LOG.info("get_subnet %s for tenant %s with fields %s" %
(id, context.tenant_id, fields))
subnet = db_api.subnet_find(context, id=id, scope=db_api.ONE)
if not subnet:
raise exceptions.SubnetNotFound(subnet_id=id)
# Check the network_id against the strategies
net_id = subnet["network_id"]
net_id = STRATEGY.get_parent_network(net_id)
subnet["network_id"] = net_id
return v._make_subnet_dict(subnet, default_route=routes.DEFAULT_ROUTE)
def get_subnets(context, filters=None, fields=None):
"""Retrieve a list of subnets.
The contents of the list depends on the identity of the user
making the request (as indicated by the context) as well as any
filters.
: param context: neutron api request context
: param filters: a dictionary with keys that are valid keys for
a subnet as listed in the RESOURCE_ATTRIBUTE_MAP object
in neutron/api/v2/attributes.py. Values in this dictiontary
are an iterable containing values that will be used for an exact
match comparison for that value. Each result returned by this
function will have matched one of the values for each key in
filters.
: param fields: a list of strings that are valid keys in a
subnet dictionary as listed in the RESOURCE_ATTRIBUTE_MAP
object in neutron/api/v2/attributes.py. Only these fields
will be returned.
"""
LOG.info("get_subnets for tenant %s with filters %s fields %s" %
(context.tenant_id, filters, fields))
subnets = db_api.subnet_find(context, **filters)
return v._make_subnets_list(subnets, fields=fields,
default_route=routes.DEFAULT_ROUTE)
def get_subnets_count(context, filters=None):
"""Return the number of subnets.
The result depends on the identity of the user making the request
(as indicated by the context) as well as any filters.
: param context: neutron api request context
: param filters: a dictionary with keys that are valid keys for
a network as listed in the RESOURCE_ATTRIBUTE_MAP object
in neutron/api/v2/attributes.py. Values in this dictiontary
are an iterable containing values that will be used for an exact
match comparison for that value. Each result returned by this
function will have matched one of the values for each key in
filters.
NOTE: this method is optional, as it was not part of the originally
defined plugin API.
"""
LOG.info("get_subnets_count for tenant %s with filters %s" %
(context.tenant_id, filters))
return db_api.subnet_count_all(context, **filters)
def _delete_subnet(context, subnet):
if subnet.allocated_ips:
raise exceptions.SubnetInUse(subnet_id=subnet["id"])
db_api.subnet_delete(context, subnet)
def delete_subnet(context, id):
"""Delete a subnet.
: param context: neutron api request context
: param id: UUID representing the subnet to delete.
"""
LOG.info("delete_subnet %s for tenant %s" % (id, context.tenant_id))
subnet = db_api.subnet_find(context, id=id, scope=db_api.ONE)
if not subnet:
raise exceptions.SubnetNotFound(subnet_id=id)
_delete_subnet(context, subnet)

View File

@@ -0,0 +1,670 @@
# Copyright 2013 Openstack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import copy
import uuid
import mock
from neutron.api.v2 import attributes as neutron_attrs
from neutron.common import exceptions
from oslo.config import cfg
from quark.db import models
from quark.tests import test_quark_plugin
class TestQuarkGetSubnetCount(test_quark_plugin.TestQuarkPlugin):
def test_get_subnet_count(self):
"""This isn't really testable."""
with mock.patch("quark.db.api.subnet_count_all"):
self.plugin.get_subnets_count(self.context, {})
class TestQuarkGetSubnets(test_quark_plugin.TestQuarkPlugin):
@contextlib.contextmanager
def _stubs(self, subnets=None, routes=None):
if routes is None:
routes = []
route_models = []
for route in routes:
r = models.Route()
r.update(route)
route_models.append(r)
if isinstance(subnets, list):
subnet_models = []
for subnet in subnets:
s_dict = subnet.copy()
s_dict["routes"] = route_models
s = models.Subnet(network=models.Network())
s.update(s_dict)
subnet_models.append(s)
elif subnets:
mod = models.Subnet(network=models.Network())
mod.update(subnets)
mod["routes"] = route_models
subnet_models = mod
else:
subnet_models = None
with mock.patch("quark.db.api.subnet_find") as subnet_find:
subnet_find.return_value = subnet_models
yield
def test_subnets_list(self):
subnet_id = str(uuid.uuid4())
route = dict(id=1, cidr="0.0.0.0/0", gateway="192.168.0.1")
subnet = dict(id=subnet_id, network_id=1, name=subnet_id,
tenant_id=self.context.tenant_id, ip_version=4,
cidr="192.168.0.0/24", gateway_ip="192.168.0.1",
dns_nameservers=[],
enable_dhcp=None)
expected_route = dict(destination=route["cidr"],
nexthop=route["gateway"])
with self._stubs(subnets=[subnet], routes=[route]):
res = self.plugin.get_subnets(self.context, {}, {})
# Compare routes separately
routes = res[0].pop("host_routes")
for key in subnet.keys():
self.assertEqual(res[0][key], subnet[key])
for key in expected_route.keys():
self.assertEqual(routes[0][key], expected_route[key])
def test_subnet_show_fail(self):
with self._stubs():
with self.assertRaises(exceptions.SubnetNotFound):
self.plugin.get_subnet(self.context, 1)
def test_subnet_show(self):
subnet_id = str(uuid.uuid4())
route = dict(id=1, cidr="0.0.0.0/0", gateway="192.168.0.1",
subnet_id=subnet_id)
expected_route = dict(destination=route["cidr"],
nexthop=route["gateway"])
subnet = dict(id=subnet_id, network_id=1, name=subnet_id,
tenant_id=self.context.tenant_id, ip_version=4,
cidr="192.168.0.0/24", gateway_ip="192.168.0.1",
dns_nameservers=[],
enable_dhcp=None)
with self._stubs(subnets=subnet, routes=[route]):
res = self.plugin.get_subnet(self.context, subnet_id)
# Compare routes separately
routes = res.pop("host_routes")
for key in subnet.keys():
self.assertEqual(res[key], subnet[key])
for key in expected_route.keys():
self.assertEqual(routes[0][key], expected_route[key])
class TestQuarkCreateSubnetOverlapping(test_quark_plugin.TestQuarkPlugin):
@contextlib.contextmanager
def _stubs(self, subnets=None):
if subnets is None:
subnets = []
subnet_models = []
for subnet in subnets:
s = models.Subnet()
s.update(subnet)
subnet_models.append(s)
network = models.Network()
network.update(dict(id=1, subnets=subnet_models))
with contextlib.nested(
mock.patch("quark.db.api.network_find"),
mock.patch("quark.db.api.subnet_find"),
mock.patch("quark.db.api.subnet_create")
) as (net_find, subnet_find, subnet_create):
net_find.return_value = network
subnet_find.return_value = subnet_models
subnet_create.return_value = models.Subnet(
network=models.Network(),
cidr="192.168.1.1/24")
yield subnet_create
def test_create_subnet_overlapping_true(self):
cfg.CONF.set_override('allow_overlapping_ips', True)
with self._stubs() as subnet_create:
s = dict(subnet=dict(
gateway_ip=neutron_attrs.ATTR_NOT_SPECIFIED,
dns_nameservers=neutron_attrs.ATTR_NOT_SPECIFIED,
cidr="192.168.1.1/8",
network_id=1))
self.plugin.create_subnet(self.context, s)
self.assertEqual(subnet_create.call_count, 1)
def test_create_subnet_overlapping_false(self):
cfg.CONF.set_override('allow_overlapping_ips', False)
with self._stubs() as subnet_create:
s = dict(subnet=dict(
gateway_ip=neutron_attrs.ATTR_NOT_SPECIFIED,
dns_nameservers=neutron_attrs.ATTR_NOT_SPECIFIED,
cidr="192.168.1.1/8",
network_id=1))
self.plugin.create_subnet(self.context, s)
self.assertEqual(subnet_create.call_count, 1)
def test_create_subnet_overlapping_conflict(self):
cfg.CONF.set_override('allow_overlapping_ips', False)
with self._stubs(subnets=[dict(cidr="192.168.10.1/24")]):
with self.assertRaises(exceptions.InvalidInput):
s = dict(subnet=dict(cidr="192.168.1.1/8",
network_id=1))
self.plugin.create_subnet(self.context, s)
class TestQuarkCreateSubnetAllocationPools(test_quark_plugin.TestQuarkPlugin):
@contextlib.contextmanager
def _stubs(self, subnet):
s = models.Subnet(network=models.Network(id=1, subnets=[]))
s.update(subnet)
with contextlib.nested(
mock.patch("quark.db.api.network_find"),
mock.patch("quark.db.api.subnet_find"),
mock.patch("quark.db.api.subnet_create")
) as (net_find, subnet_find, subnet_create):
net_find.return_value = s["network"]
subnet_find.return_value = []
subnet_create.return_value = s
yield subnet_create
def test_create_subnet_allocation_pools_zero(self):
s = dict(subnet=dict(
cidr="192.168.1.1/24",
network_id=1))
with self._stubs(s["subnet"]) as subnet_create:
resp = self.plugin.create_subnet(self.context, s)
self.assertEqual(subnet_create.call_count, 1)
self.assertEqual(resp["allocation_pools"],
[dict(start="192.168.1.0", end="192.168.1.255")])
def test_create_subnet_allocation_pools_one(self):
pools = [dict(start="192.168.1.10", end="192.168.1.20")]
s = dict(subnet=dict(
allocation_pools=pools,
cidr="192.168.1.1/24",
network_id=1))
with self._stubs(s["subnet"]) as subnet_create:
resp = self.plugin.create_subnet(self.context, s)
self.assertEqual(subnet_create.call_count, 1)
self.assertEqual(resp["allocation_pools"], pools)
def test_create_subnet_allocation_pools_two(self):
pools = [dict(start="192.168.1.10", end="192.168.1.20"),
dict(start="192.168.1.40", end="192.168.1.50")]
s = dict(subnet=dict(
allocation_pools=pools,
cidr="192.168.1.1/24",
network_id=1))
with self._stubs(s["subnet"]) as subnet_create:
resp = self.plugin.create_subnet(self.context, s)
self.assertEqual(subnet_create.call_count, 1)
self.assertEqual(resp["allocation_pools"], pools)
# TODO(amir): Refactor the tests to test individual subnet attributes.
# * copy.deepcopy was necessary to maintain tests on keys, which is a bit ugly.
# * workaround is also in place for lame ATTR_NOT_SPECIFIED object()
class TestQuarkCreateSubnet(test_quark_plugin.TestQuarkPlugin):
@contextlib.contextmanager
def _stubs(self, subnet=None, network=None, routes=None, dns=None):
subnet_mod = models.Subnet(network=models.Network())
dns_ips = subnet.pop("dns_nameservers", [])
host_routes = subnet.pop("host_routes", [])
subnet_mod.update(subnet)
subnet["dns_nameservers"] = dns_ips
subnet["host_routes"] = host_routes
routes = routes or []
dns = dns or []
route_models = [models.Route(**r) for r in routes]
dns_models = [models.DNSNameserver(**d) for d in dns]
with contextlib.nested(
mock.patch("quark.db.api.subnet_create"),
mock.patch("quark.db.api.network_find"),
mock.patch("quark.db.api.dns_create"),
mock.patch("quark.db.api.route_create"),
) as (subnet_create, net_find, dns_create, route_create):
subnet_create.return_value = subnet_mod
net_find.return_value = network
route_create.side_effect = route_models
dns_create.side_effect = dns_models
yield subnet_create, dns_create, route_create
def test_create_subnet(self):
routes = [dict(cidr="0.0.0.0/0", gateway="0.0.0.0")]
subnet = dict(
subnet=dict(network_id=1,
tenant_id=self.context.tenant_id, ip_version=4,
cidr="172.16.0.0/24", gateway_ip="0.0.0.0",
dns_nameservers=neutron_attrs.ATTR_NOT_SPECIFIED,
host_routes=neutron_attrs.ATTR_NOT_SPECIFIED,
enable_dhcp=None))
network = dict(network_id=1)
with self._stubs(
subnet=subnet["subnet"],
network=network,
routes=routes
) as (subnet_create, dns_create, route_create):
dns_nameservers = subnet["subnet"].pop("dns_nameservers")
host_routes = subnet["subnet"].pop("host_routes")
subnet_request = copy.deepcopy(subnet)
subnet_request["subnet"]["dns_nameservers"] = dns_nameservers
subnet_request["subnet"]["host_routes"] = host_routes
res = self.plugin.create_subnet(self.context,
subnet_request)
self.assertEqual(subnet_create.call_count, 1)
self.assertEqual(dns_create.call_count, 0)
self.assertEqual(route_create.call_count, 1)
for key in subnet["subnet"].keys():
if key == "host_routes":
self.assertEqual(res[key][0]["destination"], "0.0.0.0/0")
self.assertEqual(res[key][0]["nexthop"], "0.0.0.0")
else:
self.assertEqual(res[key], subnet["subnet"][key])
def test_create_subnet_no_network_fails(self):
subnet = dict(subnet=dict(network_id=1))
with self._stubs(subnet=dict(), network=None):
with self.assertRaises(exceptions.NetworkNotFound):
self.plugin.create_subnet(self.context, subnet)
def test_create_subnet_no_gateway_ip_defaults(self):
routes = [dict(cidr="0.0.0.0/0", gateway="172.16.0.1")]
subnet = dict(
subnet=dict(network_id=1,
tenant_id=self.context.tenant_id, ip_version=4,
cidr="172.16.0.0/24",
gateway_ip=neutron_attrs.ATTR_NOT_SPECIFIED,
dns_nameservers=neutron_attrs.ATTR_NOT_SPECIFIED,
enable_dhcp=None))
network = dict(network_id=1)
with self._stubs(
subnet=subnet["subnet"],
network=network,
routes=routes
) as (subnet_create, dns_create, route_create):
dns_nameservers = subnet["subnet"].pop("dns_nameservers")
gateway_ip = subnet["subnet"].pop("gateway_ip")
subnet_request = copy.deepcopy(subnet)
subnet_request["subnet"]["dns_nameservers"] = dns_nameservers
subnet_request["subnet"]["gateway_ip"] = gateway_ip
res = self.plugin.create_subnet(self.context, subnet_request)
self.assertEqual(subnet_create.call_count, 1)
self.assertEqual(dns_create.call_count, 0)
self.assertEqual(route_create.call_count, 1)
for key in subnet["subnet"].keys():
if key == "gateway_ip":
self.assertEqual(res[key], "172.16.0.1")
elif key == "host_routes":
self.assertEqual(res[key][0]["destination"], "0.0.0.0/0")
self.assertEqual(res[key][0]["nexthop"], "172.16.0.1")
else:
self.assertEqual(res[key], subnet["subnet"][key])
def test_create_subnet_dns_nameservers(self):
routes = [dict(cidr="0.0.0.0/0", gateway="0.0.0.0")]
dns_ns = [dict(ip="4.2.2.1"), dict(ip="4.2.2.2")]
subnet = dict(
subnet=dict(network_id=1,
tenant_id=self.context.tenant_id, ip_version=4,
cidr="172.16.0.0/24", gateway_ip="0.0.0.0",
dns_nameservers=["4.2.2.1", "4.2.2.2"],
enable_dhcp=None))
network = dict(network_id=1)
with self._stubs(
subnet=subnet["subnet"],
network=network,
routes=routes,
dns=dns_ns
) as (subnet_create, dns_create, route_create):
res = self.plugin.create_subnet(self.context,
copy.deepcopy(subnet))
self.assertEqual(subnet_create.call_count, 1)
self.assertEqual(dns_create.call_count, 2)
self.assertEqual(route_create.call_count, 1)
for key in subnet["subnet"].keys():
if key == "host_routes":
self.assertEqual(res[key][0]["destination"], "0.0.0.0/0")
self.assertEqual(res[key][0]["nexthop"], "0.0.0.0")
else:
self.assertEqual(res[key], subnet["subnet"][key])
def test_create_subnet_routes(self):
routes = [dict(cidr="1.1.1.1/8", gateway="172.16.0.4"),
dict(cidr="0.0.0.0/0", gateway="0.0.0.0")]
subnet = dict(
subnet=dict(network_id=1,
tenant_id=self.context.tenant_id, ip_version=4,
cidr="172.16.0.0/24", gateway_ip="0.0.0.0",
dns_nameservers=neutron_attrs.ATTR_NOT_SPECIFIED,
host_routes=[{"destination": "1.1.1.1/8",
"nexthop": "172.16.0.4"}],
enable_dhcp=None))
network = dict(network_id=1)
with self._stubs(
subnet=subnet["subnet"],
network=network,
routes=routes
) as (subnet_create, dns_create, route_create):
dns_nameservers = subnet["subnet"].pop("dns_nameservers")
subnet_request = copy.deepcopy(subnet)
subnet_request["subnet"]["dns_nameservers"] = dns_nameservers
res = self.plugin.create_subnet(self.context, subnet_request)
self.assertEqual(subnet_create.call_count, 1)
self.assertEqual(dns_create.call_count, 0)
self.assertEqual(route_create.call_count, 2)
for key in subnet["subnet"].keys():
if key == "host_routes":
res_tuples = [(r["destination"], r["nexthop"])
for r in res[key]]
self.assertIn(("1.1.1.1/8", "172.16.0.4"), res_tuples)
self.assertIn(("0.0.0.0/0", "0.0.0.0"), res_tuples)
self.assertEqual(2, len(res_tuples))
else:
self.assertEqual(res[key], subnet["subnet"][key])
def test_create_subnet_default_route(self):
routes = [dict(cidr="0.0.0.0/0", gateway="172.16.0.4")]
subnet = dict(
subnet=dict(network_id=1,
tenant_id=self.context.tenant_id, ip_version=4,
cidr="172.16.0.0/24",
gateway_ip=neutron_attrs.ATTR_NOT_SPECIFIED,
dns_nameservers=neutron_attrs.ATTR_NOT_SPECIFIED,
host_routes=[{"destination": "0.0.0.0/0",
"nexthop": "172.16.0.4"}],
enable_dhcp=None))
network = dict(network_id=1)
with self._stubs(
subnet=subnet["subnet"],
network=network,
routes=routes
) as (subnet_create, dns_create, route_create):
dns_nameservers = subnet["subnet"].pop("dns_nameservers")
gateway_ip = subnet["subnet"].pop("gateway_ip")
subnet_request = copy.deepcopy(subnet)
subnet_request["subnet"]["dns_nameservers"] = dns_nameservers
subnet_request["subnet"]["gateway_ip"] = gateway_ip
res = self.plugin.create_subnet(self.context, subnet_request)
self.assertEqual(subnet_create.call_count, 1)
self.assertEqual(dns_create.call_count, 0)
self.assertEqual(route_create.call_count, 1)
for key in subnet["subnet"].keys():
if key == "host_routes":
res_tuples = [(r["destination"], r["nexthop"])
for r in res[key]]
self.assertEqual([("0.0.0.0/0", "172.16.0.4")], res_tuples)
elif key == "gateway_ip":
self.assertEqual(res[key], "172.16.0.4")
else:
self.assertEqual(res[key], subnet["subnet"][key])
def test_create_subnet_default_route_gateway_ip(self):
"""If default route (host_routes) and gateway_ip are both provided,
then host_route takes precedence.
"""
routes = [dict(cidr="0.0.0.0/0", gateway="172.16.0.4")]
subnet = dict(
subnet=dict(network_id=1,
tenant_id=self.context.tenant_id, ip_version=4,
cidr="172.16.0.0/24",
gateway_ip="172.16.0.3",
dns_nameservers=neutron_attrs.ATTR_NOT_SPECIFIED,
host_routes=[{"destination": "0.0.0.0/0",
"nexthop": "172.16.0.4"}],
enable_dhcp=None))
network = dict(network_id=1)
with self._stubs(
subnet=subnet["subnet"],
network=network,
routes=routes
) as (subnet_create, dns_create, route_create):
dns_nameservers = subnet["subnet"].pop("dns_nameservers")
subnet_request = copy.deepcopy(subnet)
subnet_request["subnet"]["dns_nameservers"] = dns_nameservers
res = self.plugin.create_subnet(self.context, subnet_request)
self.assertEqual(subnet_create.call_count, 1)
self.assertEqual(dns_create.call_count, 0)
self.assertEqual(route_create.call_count, 1)
for key in subnet["subnet"].keys():
if key == "host_routes":
res_tuples = [(r["destination"], r["nexthop"])
for r in res[key]]
self.assertEqual([("0.0.0.0/0", "172.16.0.4")], res_tuples)
elif key == "gateway_ip":
self.assertEqual(res[key], "172.16.0.4")
else:
self.assertEqual(res[key], subnet["subnet"][key])
class TestQuarkUpdateSubnet(test_quark_plugin.TestQuarkPlugin):
DEFAULT_ROUTE = [dict(destination="0.0.0.0/0",
nexthop="172.16.0.1")]
@contextlib.contextmanager
def _stubs(self, host_routes=None, new_routes=None, find_routes=True,
new_dns_servers=None):
if host_routes is None:
host_routes = []
if new_routes:
new_routes = [models.Route(cidr=r["destination"],
gateway=r["nexthop"],
subnet_id=1)
for r in new_routes]
if new_dns_servers:
new_dns_servers = [models.DNSNameserver(
ip=ip,
subnet_id=1) for ip in new_dns_servers]
subnet = dict(
id=1,
network_id=1,
tenant_id=self.context.tenant_id, ip_version=4,
cidr="172.16.0.0/24",
host_routes=host_routes,
dns_nameservers=["4.2.2.1", "4.2.2.2"],
enable_dhcp=None)
dns_ips = subnet.pop("dns_nameservers", [])
host_routes = subnet.pop("host_routes", [])
subnet_mod = models.Subnet()
subnet_mod.update(subnet)
subnet_mod["dns_nameservers"] = [models.DNSNameserver(ip=ip)
for ip in dns_ips]
subnet_mod["routes"] = [models.Route(cidr=r["destination"],
gateway=r["nexthop"],
subnet_id=subnet_mod["id"])
for r in host_routes]
with contextlib.nested(
mock.patch("quark.db.api.subnet_find"),
mock.patch("quark.db.api.subnet_update"),
mock.patch("quark.db.api.dns_create"),
mock.patch("quark.db.api.route_find"),
mock.patch("quark.db.api.route_update"),
mock.patch("quark.db.api.route_create"),
) as (subnet_find, subnet_update,
dns_create,
route_find, route_update, route_create):
subnet_find.return_value = subnet_mod
route_find.return_value = subnet_mod["routes"][0] \
if subnet_mod["routes"] and find_routes else None
new_subnet_mod = models.Subnet(network=models.Network())
new_subnet_mod.update(subnet_mod)
if new_routes:
new_subnet_mod["routes"] = new_routes
if new_dns_servers:
new_subnet_mod["dns_nameservers"] = new_dns_servers
subnet_update.return_value = new_subnet_mod
yield dns_create, route_update, route_create
def test_update_subnet_not_found(self):
with self.assertRaises(exceptions.SubnetNotFound):
self.plugin.update_subnet(self.context, 1, {})
def test_update_subnet_dns_nameservers(self):
new_dns_servers = ["1.1.1.2"]
with self._stubs(
host_routes=self.DEFAULT_ROUTE,
new_dns_servers=new_dns_servers
) as (dns_create, route_update, route_create):
req = dict(subnet=dict(dns_nameservers=new_dns_servers))
res = self.plugin.update_subnet(self.context,
1,
req)
self.assertEqual(dns_create.call_count, 1)
self.assertEqual(route_create.call_count, 0)
self.assertEqual(res["dns_nameservers"], new_dns_servers)
def test_update_subnet_routes(self):
new_routes = [dict(destination="10.0.0.0/24",
nexthop="1.1.1.1")]
with self._stubs(
host_routes=self.DEFAULT_ROUTE,
new_routes=new_routes
) as (dns_create, route_update, route_create):
req = dict(subnet=dict(
host_routes=new_routes))
res = self.plugin.update_subnet(self.context, 1, req)
self.assertEqual(dns_create.call_count, 0)
self.assertEqual(route_create.call_count, 1)
self.assertEqual(len(res["host_routes"]), 1)
self.assertEqual(res["host_routes"][0]["destination"],
"10.0.0.0/24")
self.assertEqual(res["host_routes"][0]["nexthop"],
"1.1.1.1")
self.assertIsNone(res["gateway_ip"])
def test_update_subnet_gateway_ip_with_default_route_in_db(self):
with self._stubs(
host_routes=self.DEFAULT_ROUTE,
new_routes=[dict(destination="0.0.0.0/0", nexthop="1.2.3.4")]
) as (dns_create, route_update, route_create):
req = dict(subnet=dict(gateway_ip="1.2.3.4"))
res = self.plugin.update_subnet(self.context, 1, req)
self.assertEqual(dns_create.call_count, 0)
self.assertEqual(route_create.call_count, 0)
self.assertEqual(route_update.call_count, 1)
self.assertEqual(len(res["host_routes"]), 1)
self.assertEqual(res["host_routes"][0]["destination"],
"0.0.0.0/0")
self.assertEqual(res["host_routes"][0]["nexthop"],
"1.2.3.4")
self.assertEqual(res["gateway_ip"], "1.2.3.4")
def test_update_subnet_gateway_ip_with_non_default_route_in_db(self):
with self._stubs(
host_routes=[dict(destination="1.1.1.1/8", nexthop="9.9.9.9")],
find_routes=False,
new_routes=[dict(destination="1.1.1.1/8", nexthop="9.9.9.9"),
dict(destination="0.0.0.0/0", nexthop="1.2.3.4")]
) as (dns_create, route_update, route_create):
req = dict(subnet=dict(gateway_ip="1.2.3.4"))
res = self.plugin.update_subnet(self.context, 1, req)
self.assertEqual(dns_create.call_count, 0)
self.assertEqual(route_create.call_count, 1)
self.assertEqual(res["gateway_ip"], "1.2.3.4")
self.assertEqual(len(res["host_routes"]), 2)
res_tuples = [(r["destination"], r["nexthop"])
for r in res["host_routes"]]
self.assertIn(("0.0.0.0/0", "1.2.3.4"), res_tuples)
self.assertIn(("1.1.1.1/8", "9.9.9.9"), res_tuples)
def test_update_subnet_gateway_ip_without_default_route_in_db(self):
with self._stubs(
host_routes=None,
new_routes=[dict(destination="0.0.0.0/0", nexthop="1.2.3.4")]
) as (dns_create, route_update, route_create):
req = dict(subnet=dict(gateway_ip="1.2.3.4"))
res = self.plugin.update_subnet(self.context, 1, req)
self.assertEqual(dns_create.call_count, 0)
self.assertEqual(route_create.call_count, 1)
self.assertEqual(len(res["host_routes"]), 1)
self.assertEqual(res["host_routes"][0]["destination"],
"0.0.0.0/0")
self.assertEqual(res["host_routes"][0]["nexthop"],
"1.2.3.4")
self.assertEqual(res["gateway_ip"], "1.2.3.4")
def test_update_subnet_gateway_ip_with_default_route_in_args(self):
new_routes = [dict(destination="0.0.0.0/0",
nexthop="4.3.2.1")]
with self._stubs(
host_routes=self.DEFAULT_ROUTE,
new_routes=new_routes
) as (dns_create, route_update, route_create):
req = dict(subnet=dict(
host_routes=new_routes,
gateway_ip="1.2.3.4"))
res = self.plugin.update_subnet(self.context, 1, req)
self.assertEqual(dns_create.call_count, 0)
self.assertEqual(route_create.call_count, 1)
self.assertEqual(len(res["host_routes"]), 1)
self.assertEqual(res["host_routes"][0]["destination"],
"0.0.0.0/0")
self.assertEqual(res["host_routes"][0]["nexthop"],
"4.3.2.1")
self.assertEqual(res["gateway_ip"], "4.3.2.1")
class TestQuarkDeleteSubnet(test_quark_plugin.TestQuarkPlugin):
@contextlib.contextmanager
def _stubs(self, subnet, ips):
ip_mods = []
subnet_mod = None
if subnet:
subnet_mod = models.Subnet()
subnet_mod.update(subnet)
for ip in ips:
ip_mod = models.IPAddress()
ip_mod.update(ip)
ip_mods.append(ip_mod)
db_mod = "quark.db.api"
with contextlib.nested(
mock.patch("%s.subnet_find" % db_mod),
mock.patch("%s.subnet_delete" % db_mod)
) as (sub_find, sub_delete):
if subnet_mod:
subnet_mod.allocated_ips = ip_mods
sub_find.return_value = subnet_mod
yield sub_delete
def test_delete_subnet(self):
subnet = dict(id=1)
with self._stubs(subnet=subnet, ips=[]) as sub_delete:
self.plugin.delete_subnet(self.context, 1)
self.assertTrue(sub_delete.called)
def test_delete_subnet_no_subnet_fails(self):
with self._stubs(subnet=None, ips=[]):
with self.assertRaises(exceptions.SubnetNotFound):
self.plugin.delete_subnet(self.context, 1)
def test_delete_subnet_has_allocated_ips_fails(self):
subnet = dict(id=1)
with self._stubs(subnet=subnet, ips=[{}]):
with self.assertRaises(exceptions.SubnetInUse):
self.plugin.delete_subnet(self.context, 1)

View File

@@ -13,12 +13,9 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import uuid
import contextlib import contextlib
import copy
import mock import mock
from neutron.api.v2 import attributes as neutron_attrs
from neutron.common import exceptions from neutron.common import exceptions
from neutron.db import api as db_api from neutron.db import api as db_api
from oslo.config import cfg from oslo.config import cfg
@@ -42,13 +39,6 @@ class TestQuarkPlugin(test_base.TestBase):
db_api.clear_db() db_api.clear_db()
class TestQuarkGetSubnetCount(TestQuarkPlugin):
def test_get_subnet_count(self):
"""This isn't really testable."""
with mock.patch("quark.db.api.subnet_count_all"):
self.plugin.get_subnets_count(self.context, {})
class TestQuarkAPIExtensions(TestQuarkPlugin): class TestQuarkAPIExtensions(TestQuarkPlugin):
"""Adds coverage for appending the API extension path.""" """Adds coverage for appending the API extension path."""
def test_append_quark_extensions(self): def test_append_quark_extensions(self):
@@ -69,643 +59,6 @@ class TestQuarkAPIExtensions(TestQuarkPlugin):
"apple:banana:carrot") "apple:banana:carrot")
class TestQuarkGetSubnets(TestQuarkPlugin):
@contextlib.contextmanager
def _stubs(self, subnets=None, routes=None):
if routes is None:
routes = []
route_models = []
for route in routes:
r = models.Route()
r.update(route)
route_models.append(r)
if isinstance(subnets, list):
subnet_models = []
for subnet in subnets:
s_dict = subnet.copy()
s_dict["routes"] = route_models
s = models.Subnet(network=models.Network())
s.update(s_dict)
subnet_models.append(s)
elif subnets:
mod = models.Subnet(network=models.Network())
mod.update(subnets)
mod["routes"] = route_models
subnet_models = mod
else:
subnet_models = None
with mock.patch("quark.db.api.subnet_find") as subnet_find:
subnet_find.return_value = subnet_models
yield
def test_subnets_list(self):
subnet_id = str(uuid.uuid4())
route = dict(id=1, cidr="0.0.0.0/0", gateway="192.168.0.1")
subnet = dict(id=subnet_id, network_id=1, name=subnet_id,
tenant_id=self.context.tenant_id, ip_version=4,
cidr="192.168.0.0/24", gateway_ip="192.168.0.1",
dns_nameservers=[],
enable_dhcp=None)
expected_route = dict(destination=route["cidr"],
nexthop=route["gateway"])
with self._stubs(subnets=[subnet], routes=[route]):
res = self.plugin.get_subnets(self.context, {}, {})
# Compare routes separately
routes = res[0].pop("host_routes")
for key in subnet.keys():
self.assertEqual(res[0][key], subnet[key])
for key in expected_route.keys():
self.assertEqual(routes[0][key], expected_route[key])
def test_subnet_show_fail(self):
with self._stubs():
with self.assertRaises(exceptions.SubnetNotFound):
self.plugin.get_subnet(self.context, 1)
def test_subnet_show(self):
subnet_id = str(uuid.uuid4())
route = dict(id=1, cidr="0.0.0.0/0", gateway="192.168.0.1",
subnet_id=subnet_id)
expected_route = dict(destination=route["cidr"],
nexthop=route["gateway"])
subnet = dict(id=subnet_id, network_id=1, name=subnet_id,
tenant_id=self.context.tenant_id, ip_version=4,
cidr="192.168.0.0/24", gateway_ip="192.168.0.1",
dns_nameservers=[],
enable_dhcp=None)
with self._stubs(subnets=subnet, routes=[route]):
res = self.plugin.get_subnet(self.context, subnet_id)
# Compare routes separately
routes = res.pop("host_routes")
for key in subnet.keys():
self.assertEqual(res[key], subnet[key])
for key in expected_route.keys():
self.assertEqual(routes[0][key], expected_route[key])
class TestQuarkCreateSubnetOverlapping(TestQuarkPlugin):
@contextlib.contextmanager
def _stubs(self, subnets=None):
if subnets is None:
subnets = []
subnet_models = []
for subnet in subnets:
s = models.Subnet()
s.update(subnet)
subnet_models.append(s)
network = models.Network()
network.update(dict(id=1, subnets=subnet_models))
with contextlib.nested(
mock.patch("quark.db.api.network_find"),
mock.patch("quark.db.api.subnet_find"),
mock.patch("quark.db.api.subnet_create")
) as (net_find, subnet_find, subnet_create):
net_find.return_value = network
subnet_find.return_value = subnet_models
subnet_create.return_value = models.Subnet(
network=models.Network(),
cidr="192.168.1.1/24")
yield subnet_create
def test_create_subnet_overlapping_true(self):
cfg.CONF.set_override('allow_overlapping_ips', True)
with self._stubs() as subnet_create:
s = dict(subnet=dict(
gateway_ip=neutron_attrs.ATTR_NOT_SPECIFIED,
dns_nameservers=neutron_attrs.ATTR_NOT_SPECIFIED,
cidr="192.168.1.1/8",
network_id=1))
self.plugin.create_subnet(self.context, s)
self.assertEqual(subnet_create.call_count, 1)
def test_create_subnet_overlapping_false(self):
cfg.CONF.set_override('allow_overlapping_ips', False)
with self._stubs() as subnet_create:
s = dict(subnet=dict(
gateway_ip=neutron_attrs.ATTR_NOT_SPECIFIED,
dns_nameservers=neutron_attrs.ATTR_NOT_SPECIFIED,
cidr="192.168.1.1/8",
network_id=1))
self.plugin.create_subnet(self.context, s)
self.assertEqual(subnet_create.call_count, 1)
def test_create_subnet_overlapping_conflict(self):
cfg.CONF.set_override('allow_overlapping_ips', False)
with self._stubs(subnets=[dict(cidr="192.168.10.1/24")]):
with self.assertRaises(exceptions.InvalidInput):
s = dict(subnet=dict(cidr="192.168.1.1/8",
network_id=1))
self.plugin.create_subnet(self.context, s)
class TestQuarkCreateSubnetAllocationPools(TestQuarkPlugin):
@contextlib.contextmanager
def _stubs(self, subnet):
s = models.Subnet(network=models.Network(id=1, subnets=[]))
s.update(subnet)
with contextlib.nested(
mock.patch("quark.db.api.network_find"),
mock.patch("quark.db.api.subnet_find"),
mock.patch("quark.db.api.subnet_create")
) as (net_find, subnet_find, subnet_create):
net_find.return_value = s["network"]
subnet_find.return_value = []
subnet_create.return_value = s
yield subnet_create
def test_create_subnet_allocation_pools_zero(self):
s = dict(subnet=dict(
cidr="192.168.1.1/24",
network_id=1))
with self._stubs(s["subnet"]) as subnet_create:
resp = self.plugin.create_subnet(self.context, s)
self.assertEqual(subnet_create.call_count, 1)
self.assertEqual(resp["allocation_pools"],
[dict(start="192.168.1.0", end="192.168.1.255")])
def test_create_subnet_allocation_pools_one(self):
pools = [dict(start="192.168.1.10", end="192.168.1.20")]
s = dict(subnet=dict(
allocation_pools=pools,
cidr="192.168.1.1/24",
network_id=1))
with self._stubs(s["subnet"]) as subnet_create:
resp = self.plugin.create_subnet(self.context, s)
self.assertEqual(subnet_create.call_count, 1)
self.assertEqual(resp["allocation_pools"], pools)
def test_create_subnet_allocation_pools_two(self):
pools = [dict(start="192.168.1.10", end="192.168.1.20"),
dict(start="192.168.1.40", end="192.168.1.50")]
s = dict(subnet=dict(
allocation_pools=pools,
cidr="192.168.1.1/24",
network_id=1))
with self._stubs(s["subnet"]) as subnet_create:
resp = self.plugin.create_subnet(self.context, s)
self.assertEqual(subnet_create.call_count, 1)
self.assertEqual(resp["allocation_pools"], pools)
# TODO(amir): Refactor the tests to test individual subnet attributes.
# * copy.deepcopy was necessary to maintain tests on keys, which is a bit ugly.
# * workaround is also in place for lame ATTR_NOT_SPECIFIED object()
class TestQuarkCreateSubnet(TestQuarkPlugin):
@contextlib.contextmanager
def _stubs(self, subnet=None, network=None, routes=None, dns=None):
subnet_mod = models.Subnet(network=models.Network())
dns_ips = subnet.pop("dns_nameservers", [])
host_routes = subnet.pop("host_routes", [])
subnet_mod.update(subnet)
subnet["dns_nameservers"] = dns_ips
subnet["host_routes"] = host_routes
routes = routes or []
dns = dns or []
route_models = [models.Route(**r) for r in routes]
dns_models = [models.DNSNameserver(**d) for d in dns]
with contextlib.nested(
mock.patch("quark.db.api.subnet_create"),
mock.patch("quark.db.api.network_find"),
mock.patch("quark.db.api.dns_create"),
mock.patch("quark.db.api.route_create"),
) as (subnet_create, net_find, dns_create, route_create):
subnet_create.return_value = subnet_mod
net_find.return_value = network
route_create.side_effect = route_models
dns_create.side_effect = dns_models
yield subnet_create, dns_create, route_create
def test_create_subnet(self):
routes = [dict(cidr="0.0.0.0/0", gateway="0.0.0.0")]
subnet = dict(
subnet=dict(network_id=1,
tenant_id=self.context.tenant_id, ip_version=4,
cidr="172.16.0.0/24", gateway_ip="0.0.0.0",
dns_nameservers=neutron_attrs.ATTR_NOT_SPECIFIED,
host_routes=neutron_attrs.ATTR_NOT_SPECIFIED,
enable_dhcp=None))
network = dict(network_id=1)
with self._stubs(
subnet=subnet["subnet"],
network=network,
routes=routes
) as (subnet_create, dns_create, route_create):
dns_nameservers = subnet["subnet"].pop("dns_nameservers")
host_routes = subnet["subnet"].pop("host_routes")
subnet_request = copy.deepcopy(subnet)
subnet_request["subnet"]["dns_nameservers"] = dns_nameservers
subnet_request["subnet"]["host_routes"] = host_routes
res = self.plugin.create_subnet(self.context,
subnet_request)
self.assertEqual(subnet_create.call_count, 1)
self.assertEqual(dns_create.call_count, 0)
self.assertEqual(route_create.call_count, 1)
for key in subnet["subnet"].keys():
if key == "host_routes":
self.assertEqual(res[key][0]["destination"], "0.0.0.0/0")
self.assertEqual(res[key][0]["nexthop"], "0.0.0.0")
else:
self.assertEqual(res[key], subnet["subnet"][key])
def test_create_subnet_no_network_fails(self):
subnet = dict(subnet=dict(network_id=1))
with self._stubs(subnet=dict(), network=None):
with self.assertRaises(exceptions.NetworkNotFound):
self.plugin.create_subnet(self.context, subnet)
def test_create_subnet_no_gateway_ip_defaults(self):
routes = [dict(cidr="0.0.0.0/0", gateway="172.16.0.1")]
subnet = dict(
subnet=dict(network_id=1,
tenant_id=self.context.tenant_id, ip_version=4,
cidr="172.16.0.0/24",
gateway_ip=neutron_attrs.ATTR_NOT_SPECIFIED,
dns_nameservers=neutron_attrs.ATTR_NOT_SPECIFIED,
enable_dhcp=None))
network = dict(network_id=1)
with self._stubs(
subnet=subnet["subnet"],
network=network,
routes=routes
) as (subnet_create, dns_create, route_create):
dns_nameservers = subnet["subnet"].pop("dns_nameservers")
gateway_ip = subnet["subnet"].pop("gateway_ip")
subnet_request = copy.deepcopy(subnet)
subnet_request["subnet"]["dns_nameservers"] = dns_nameservers
subnet_request["subnet"]["gateway_ip"] = gateway_ip
res = self.plugin.create_subnet(self.context, subnet_request)
self.assertEqual(subnet_create.call_count, 1)
self.assertEqual(dns_create.call_count, 0)
self.assertEqual(route_create.call_count, 1)
for key in subnet["subnet"].keys():
if key == "gateway_ip":
self.assertEqual(res[key], "172.16.0.1")
elif key == "host_routes":
self.assertEqual(res[key][0]["destination"], "0.0.0.0/0")
self.assertEqual(res[key][0]["nexthop"], "172.16.0.1")
else:
self.assertEqual(res[key], subnet["subnet"][key])
def test_create_subnet_dns_nameservers(self):
routes = [dict(cidr="0.0.0.0/0", gateway="0.0.0.0")]
dns_ns = [dict(ip="4.2.2.1"), dict(ip="4.2.2.2")]
subnet = dict(
subnet=dict(network_id=1,
tenant_id=self.context.tenant_id, ip_version=4,
cidr="172.16.0.0/24", gateway_ip="0.0.0.0",
dns_nameservers=["4.2.2.1", "4.2.2.2"],
enable_dhcp=None))
network = dict(network_id=1)
with self._stubs(
subnet=subnet["subnet"],
network=network,
routes=routes,
dns=dns_ns
) as (subnet_create, dns_create, route_create):
res = self.plugin.create_subnet(self.context,
copy.deepcopy(subnet))
self.assertEqual(subnet_create.call_count, 1)
self.assertEqual(dns_create.call_count, 2)
self.assertEqual(route_create.call_count, 1)
for key in subnet["subnet"].keys():
if key == "host_routes":
self.assertEqual(res[key][0]["destination"], "0.0.0.0/0")
self.assertEqual(res[key][0]["nexthop"], "0.0.0.0")
else:
self.assertEqual(res[key], subnet["subnet"][key])
def test_create_subnet_routes(self):
routes = [dict(cidr="1.1.1.1/8", gateway="172.16.0.4"),
dict(cidr="0.0.0.0/0", gateway="0.0.0.0")]
subnet = dict(
subnet=dict(network_id=1,
tenant_id=self.context.tenant_id, ip_version=4,
cidr="172.16.0.0/24", gateway_ip="0.0.0.0",
dns_nameservers=neutron_attrs.ATTR_NOT_SPECIFIED,
host_routes=[{"destination": "1.1.1.1/8",
"nexthop": "172.16.0.4"}],
enable_dhcp=None))
network = dict(network_id=1)
with self._stubs(
subnet=subnet["subnet"],
network=network,
routes=routes
) as (subnet_create, dns_create, route_create):
dns_nameservers = subnet["subnet"].pop("dns_nameservers")
subnet_request = copy.deepcopy(subnet)
subnet_request["subnet"]["dns_nameservers"] = dns_nameservers
res = self.plugin.create_subnet(self.context, subnet_request)
self.assertEqual(subnet_create.call_count, 1)
self.assertEqual(dns_create.call_count, 0)
self.assertEqual(route_create.call_count, 2)
for key in subnet["subnet"].keys():
if key == "host_routes":
res_tuples = [(r["destination"], r["nexthop"])
for r in res[key]]
self.assertIn(("1.1.1.1/8", "172.16.0.4"), res_tuples)
self.assertIn(("0.0.0.0/0", "0.0.0.0"), res_tuples)
self.assertEqual(2, len(res_tuples))
else:
self.assertEqual(res[key], subnet["subnet"][key])
def test_create_subnet_default_route(self):
routes = [dict(cidr="0.0.0.0/0", gateway="172.16.0.4")]
subnet = dict(
subnet=dict(network_id=1,
tenant_id=self.context.tenant_id, ip_version=4,
cidr="172.16.0.0/24",
gateway_ip=neutron_attrs.ATTR_NOT_SPECIFIED,
dns_nameservers=neutron_attrs.ATTR_NOT_SPECIFIED,
host_routes=[{"destination": "0.0.0.0/0",
"nexthop": "172.16.0.4"}],
enable_dhcp=None))
network = dict(network_id=1)
with self._stubs(
subnet=subnet["subnet"],
network=network,
routes=routes
) as (subnet_create, dns_create, route_create):
dns_nameservers = subnet["subnet"].pop("dns_nameservers")
gateway_ip = subnet["subnet"].pop("gateway_ip")
subnet_request = copy.deepcopy(subnet)
subnet_request["subnet"]["dns_nameservers"] = dns_nameservers
subnet_request["subnet"]["gateway_ip"] = gateway_ip
res = self.plugin.create_subnet(self.context, subnet_request)
self.assertEqual(subnet_create.call_count, 1)
self.assertEqual(dns_create.call_count, 0)
self.assertEqual(route_create.call_count, 1)
for key in subnet["subnet"].keys():
if key == "host_routes":
res_tuples = [(r["destination"], r["nexthop"])
for r in res[key]]
self.assertEqual([("0.0.0.0/0", "172.16.0.4")], res_tuples)
elif key == "gateway_ip":
self.assertEqual(res[key], "172.16.0.4")
else:
self.assertEqual(res[key], subnet["subnet"][key])
def test_create_subnet_default_route_gateway_ip(self):
"""If default route (host_routes) and gateway_ip are both provided,
then host_route takes precedence.
"""
routes = [dict(cidr="0.0.0.0/0", gateway="172.16.0.4")]
subnet = dict(
subnet=dict(network_id=1,
tenant_id=self.context.tenant_id, ip_version=4,
cidr="172.16.0.0/24",
gateway_ip="172.16.0.3",
dns_nameservers=neutron_attrs.ATTR_NOT_SPECIFIED,
host_routes=[{"destination": "0.0.0.0/0",
"nexthop": "172.16.0.4"}],
enable_dhcp=None))
network = dict(network_id=1)
with self._stubs(
subnet=subnet["subnet"],
network=network,
routes=routes
) as (subnet_create, dns_create, route_create):
dns_nameservers = subnet["subnet"].pop("dns_nameservers")
subnet_request = copy.deepcopy(subnet)
subnet_request["subnet"]["dns_nameservers"] = dns_nameservers
res = self.plugin.create_subnet(self.context, subnet_request)
self.assertEqual(subnet_create.call_count, 1)
self.assertEqual(dns_create.call_count, 0)
self.assertEqual(route_create.call_count, 1)
for key in subnet["subnet"].keys():
if key == "host_routes":
res_tuples = [(r["destination"], r["nexthop"])
for r in res[key]]
self.assertEqual([("0.0.0.0/0", "172.16.0.4")], res_tuples)
elif key == "gateway_ip":
self.assertEqual(res[key], "172.16.0.4")
else:
self.assertEqual(res[key], subnet["subnet"][key])
class TestQuarkUpdateSubnet(TestQuarkPlugin):
DEFAULT_ROUTE = [dict(destination="0.0.0.0/0",
nexthop="172.16.0.1")]
@contextlib.contextmanager
def _stubs(self, host_routes=None, new_routes=None, find_routes=True,
new_dns_servers=None):
if host_routes is None:
host_routes = []
if new_routes:
new_routes = [models.Route(cidr=r["destination"],
gateway=r["nexthop"],
subnet_id=1)
for r in new_routes]
if new_dns_servers:
new_dns_servers = [models.DNSNameserver(
ip=ip,
subnet_id=1) for ip in new_dns_servers]
subnet = dict(
id=1,
network_id=1,
tenant_id=self.context.tenant_id, ip_version=4,
cidr="172.16.0.0/24",
host_routes=host_routes,
dns_nameservers=["4.2.2.1", "4.2.2.2"],
enable_dhcp=None)
dns_ips = subnet.pop("dns_nameservers", [])
host_routes = subnet.pop("host_routes", [])
subnet_mod = models.Subnet()
subnet_mod.update(subnet)
subnet_mod["dns_nameservers"] = [models.DNSNameserver(ip=ip)
for ip in dns_ips]
subnet_mod["routes"] = [models.Route(cidr=r["destination"],
gateway=r["nexthop"],
subnet_id=subnet_mod["id"])
for r in host_routes]
with contextlib.nested(
mock.patch("quark.db.api.subnet_find"),
mock.patch("quark.db.api.subnet_update"),
mock.patch("quark.db.api.dns_create"),
mock.patch("quark.db.api.route_find"),
mock.patch("quark.db.api.route_update"),
mock.patch("quark.db.api.route_create"),
) as (subnet_find, subnet_update,
dns_create,
route_find, route_update, route_create):
subnet_find.return_value = subnet_mod
route_find.return_value = subnet_mod["routes"][0] \
if subnet_mod["routes"] and find_routes else None
new_subnet_mod = models.Subnet(network=models.Network())
new_subnet_mod.update(subnet_mod)
if new_routes:
new_subnet_mod["routes"] = new_routes
if new_dns_servers:
new_subnet_mod["dns_nameservers"] = new_dns_servers
subnet_update.return_value = new_subnet_mod
yield dns_create, route_update, route_create
def test_update_subnet_not_found(self):
with self.assertRaises(exceptions.SubnetNotFound):
self.plugin.update_subnet(self.context, 1, {})
def test_update_subnet_dns_nameservers(self):
new_dns_servers = ["1.1.1.2"]
with self._stubs(
host_routes=self.DEFAULT_ROUTE,
new_dns_servers=new_dns_servers
) as (dns_create, route_update, route_create):
req = dict(subnet=dict(dns_nameservers=new_dns_servers))
res = self.plugin.update_subnet(self.context,
1,
req)
self.assertEqual(dns_create.call_count, 1)
self.assertEqual(route_create.call_count, 0)
self.assertEqual(res["dns_nameservers"], new_dns_servers)
def test_update_subnet_routes(self):
new_routes = [dict(destination="10.0.0.0/24",
nexthop="1.1.1.1")]
with self._stubs(
host_routes=self.DEFAULT_ROUTE,
new_routes=new_routes
) as (dns_create, route_update, route_create):
req = dict(subnet=dict(
host_routes=new_routes))
res = self.plugin.update_subnet(self.context, 1, req)
self.assertEqual(dns_create.call_count, 0)
self.assertEqual(route_create.call_count, 1)
self.assertEqual(len(res["host_routes"]), 1)
self.assertEqual(res["host_routes"][0]["destination"],
"10.0.0.0/24")
self.assertEqual(res["host_routes"][0]["nexthop"],
"1.1.1.1")
self.assertIsNone(res["gateway_ip"])
def test_update_subnet_gateway_ip_with_default_route_in_db(self):
with self._stubs(
host_routes=self.DEFAULT_ROUTE,
new_routes=[dict(destination="0.0.0.0/0", nexthop="1.2.3.4")]
) as (dns_create, route_update, route_create):
req = dict(subnet=dict(gateway_ip="1.2.3.4"))
res = self.plugin.update_subnet(self.context, 1, req)
self.assertEqual(dns_create.call_count, 0)
self.assertEqual(route_create.call_count, 0)
self.assertEqual(route_update.call_count, 1)
self.assertEqual(len(res["host_routes"]), 1)
self.assertEqual(res["host_routes"][0]["destination"],
"0.0.0.0/0")
self.assertEqual(res["host_routes"][0]["nexthop"],
"1.2.3.4")
self.assertEqual(res["gateway_ip"], "1.2.3.4")
def test_update_subnet_gateway_ip_with_non_default_route_in_db(self):
with self._stubs(
host_routes=[dict(destination="1.1.1.1/8", nexthop="9.9.9.9")],
find_routes=False,
new_routes=[dict(destination="1.1.1.1/8", nexthop="9.9.9.9"),
dict(destination="0.0.0.0/0", nexthop="1.2.3.4")]
) as (dns_create, route_update, route_create):
req = dict(subnet=dict(gateway_ip="1.2.3.4"))
res = self.plugin.update_subnet(self.context, 1, req)
self.assertEqual(dns_create.call_count, 0)
self.assertEqual(route_create.call_count, 1)
self.assertEqual(res["gateway_ip"], "1.2.3.4")
self.assertEqual(len(res["host_routes"]), 2)
res_tuples = [(r["destination"], r["nexthop"])
for r in res["host_routes"]]
self.assertIn(("0.0.0.0/0", "1.2.3.4"), res_tuples)
self.assertIn(("1.1.1.1/8", "9.9.9.9"), res_tuples)
def test_update_subnet_gateway_ip_without_default_route_in_db(self):
with self._stubs(
host_routes=None,
new_routes=[dict(destination="0.0.0.0/0", nexthop="1.2.3.4")]
) as (dns_create, route_update, route_create):
req = dict(subnet=dict(gateway_ip="1.2.3.4"))
res = self.plugin.update_subnet(self.context, 1, req)
self.assertEqual(dns_create.call_count, 0)
self.assertEqual(route_create.call_count, 1)
self.assertEqual(len(res["host_routes"]), 1)
self.assertEqual(res["host_routes"][0]["destination"],
"0.0.0.0/0")
self.assertEqual(res["host_routes"][0]["nexthop"],
"1.2.3.4")
self.assertEqual(res["gateway_ip"], "1.2.3.4")
def test_update_subnet_gateway_ip_with_default_route_in_args(self):
new_routes = [dict(destination="0.0.0.0/0",
nexthop="4.3.2.1")]
with self._stubs(
host_routes=self.DEFAULT_ROUTE,
new_routes=new_routes
) as (dns_create, route_update, route_create):
req = dict(subnet=dict(
host_routes=new_routes,
gateway_ip="1.2.3.4"))
res = self.plugin.update_subnet(self.context, 1, req)
self.assertEqual(dns_create.call_count, 0)
self.assertEqual(route_create.call_count, 1)
self.assertEqual(len(res["host_routes"]), 1)
self.assertEqual(res["host_routes"][0]["destination"],
"0.0.0.0/0")
self.assertEqual(res["host_routes"][0]["nexthop"],
"4.3.2.1")
self.assertEqual(res["gateway_ip"], "4.3.2.1")
class TestQuarkDeleteSubnet(TestQuarkPlugin):
@contextlib.contextmanager
def _stubs(self, subnet, ips):
ip_mods = []
subnet_mod = None
if subnet:
subnet_mod = models.Subnet()
subnet_mod.update(subnet)
for ip in ips:
ip_mod = models.IPAddress()
ip_mod.update(ip)
ip_mods.append(ip_mod)
db_mod = "quark.db.api"
with contextlib.nested(
mock.patch("%s.subnet_find" % db_mod),
mock.patch("%s.subnet_delete" % db_mod)
) as (sub_find, sub_delete):
if subnet_mod:
subnet_mod.allocated_ips = ip_mods
sub_find.return_value = subnet_mod
yield sub_delete
def test_delete_subnet(self):
subnet = dict(id=1)
with self._stubs(subnet=subnet, ips=[]) as sub_delete:
self.plugin.delete_subnet(self.context, 1)
self.assertTrue(sub_delete.called)
def test_delete_subnet_no_subnet_fails(self):
with self._stubs(subnet=None, ips=[]):
with self.assertRaises(exceptions.SubnetNotFound):
self.plugin.delete_subnet(self.context, 1)
def test_delete_subnet_has_allocated_ips_fails(self):
subnet = dict(id=1)
with self._stubs(subnet=subnet, ips=[{}]):
with self.assertRaises(exceptions.SubnetInUse):
self.plugin.delete_subnet(self.context, 1)
class TestQuarkGetNetworks(TestQuarkPlugin): class TestQuarkGetNetworks(TestQuarkPlugin):
@contextlib.contextmanager @contextlib.contextmanager
def _stubs(self, nets=None, subnets=None): def _stubs(self, nets=None, subnets=None):