Pulls routes out into a submodule
Refactors plugin.py and associated tests to pull routes out into their own submodule for improved plugin readability.
This commit is contained in:
@@ -38,19 +38,18 @@ from neutron import neutron_plugin_base_v2
|
||||
from quark.api import extensions
|
||||
from quark.db import api as db_api
|
||||
from quark.db import models
|
||||
from quark import exceptions as quark_exceptions
|
||||
from quark import network_strategy
|
||||
from quark.plugin_modules import ip_addresses
|
||||
from quark.plugin_modules import ip_policies
|
||||
from quark.plugin_modules import mac_address_ranges
|
||||
from quark.plugin_modules import ports
|
||||
from quark.plugin_modules import routes
|
||||
from quark.plugin_modules import security_groups
|
||||
from quark import plugin_views as v
|
||||
from quark import utils
|
||||
|
||||
LOG = logging.getLogger("neutron.quark")
|
||||
CONF = cfg.CONF
|
||||
DEFAULT_ROUTE = netaddr.IPNetwork("0.0.0.0/0")
|
||||
STRATEGY = network_strategy.STRATEGY
|
||||
|
||||
|
||||
@@ -146,15 +145,15 @@ class Plugin(neutron_plugin_base_v2.NeutronPluginBaseV2,
|
||||
cidr = netaddr.IPNetwork(sub_attrs["cidr"])
|
||||
gateway_ip = utils.pop_param(sub_attrs, "gateway_ip", str(cidr[1]))
|
||||
dns_ips = utils.pop_param(sub_attrs, "dns_nameservers", [])
|
||||
routes = utils.pop_param(sub_attrs, "host_routes", [])
|
||||
host_routes = utils.pop_param(sub_attrs, "host_routes", [])
|
||||
allocation_pools = utils.pop_param(sub_attrs, "allocation_pools", [])
|
||||
|
||||
new_subnet = db_api.subnet_create(context, **sub_attrs)
|
||||
|
||||
default_route = None
|
||||
for route in routes:
|
||||
for route in host_routes:
|
||||
netaddr_route = netaddr.IPNetwork(route["destination"])
|
||||
if netaddr_route.value == DEFAULT_ROUTE.value:
|
||||
if netaddr_route.value == routes.DEFAULT_ROUTE.value:
|
||||
default_route = route
|
||||
gateway_ip = default_route["nexthop"]
|
||||
new_subnet["routes"].append(db_api.route_create(
|
||||
@@ -162,7 +161,7 @@ class Plugin(neutron_plugin_base_v2.NeutronPluginBaseV2,
|
||||
|
||||
if default_route is None:
|
||||
new_subnet["routes"].append(db_api.route_create(
|
||||
context, cidr=str(DEFAULT_ROUTE), gateway=gateway_ip))
|
||||
context, cidr=str(routes.DEFAULT_ROUTE), gateway=gateway_ip))
|
||||
|
||||
for dns_ip in dns_ips:
|
||||
new_subnet["dns_nameservers"].append(db_api.dns_create(
|
||||
@@ -179,7 +178,7 @@ class Plugin(neutron_plugin_base_v2.NeutronPluginBaseV2,
|
||||
if not new_subnet["network"]:
|
||||
new_subnet["network"] = net
|
||||
subnet_dict = v._make_subnet_dict(new_subnet,
|
||||
default_route=DEFAULT_ROUTE)
|
||||
default_route=routes.DEFAULT_ROUTE)
|
||||
subnet_dict["gateway_ip"] = gateway_ip
|
||||
return subnet_dict
|
||||
|
||||
@@ -203,25 +202,26 @@ class Plugin(neutron_plugin_base_v2.NeutronPluginBaseV2,
|
||||
s = subnet["subnet"]
|
||||
|
||||
dns_ips = s.pop("dns_nameservers", [])
|
||||
routes = s.pop("host_routes", [])
|
||||
host_routes = s.pop("host_routes", [])
|
||||
gateway_ip = s.pop("gateway_ip", None)
|
||||
|
||||
if gateway_ip:
|
||||
default_route = None
|
||||
for route in routes:
|
||||
for route in host_routes:
|
||||
netaddr_route = netaddr.IPNetwork(route["destination"])
|
||||
if netaddr_route.value == DEFAULT_ROUTE.value:
|
||||
if netaddr_route.value == routes.DEFAULT_ROUTE.value:
|
||||
default_route = route
|
||||
break
|
||||
if default_route is None:
|
||||
route_model = db_api.route_find(
|
||||
context, cidr=str(DEFAULT_ROUTE), subnet_id=id,
|
||||
context, cidr=str(routes.DEFAULT_ROUTE), subnet_id=id,
|
||||
scope=db_api.ONE)
|
||||
if route_model:
|
||||
db_api.route_update(context, route_model,
|
||||
gateway=gateway_ip)
|
||||
else:
|
||||
db_api.route_create(context, cidr=str(DEFAULT_ROUTE),
|
||||
db_api.route_create(context,
|
||||
cidr=str(routes.DEFAULT_ROUTE),
|
||||
gateway=gateway_ip, subnet_id=id)
|
||||
|
||||
if dns_ips:
|
||||
@@ -231,14 +231,14 @@ class Plugin(neutron_plugin_base_v2.NeutronPluginBaseV2,
|
||||
context,
|
||||
ip=netaddr.IPAddress(dns_ip)))
|
||||
|
||||
if routes:
|
||||
if host_routes:
|
||||
subnet_db["routes"] = []
|
||||
for route in routes:
|
||||
for route in host_routes:
|
||||
subnet_db["routes"].append(db_api.route_create(
|
||||
context, cidr=route["destination"], gateway=route["nexthop"]))
|
||||
|
||||
subnet = db_api.subnet_update(context, subnet_db, **s)
|
||||
return v._make_subnet_dict(subnet, default_route=DEFAULT_ROUTE)
|
||||
return v._make_subnet_dict(subnet, default_route=routes.DEFAULT_ROUTE)
|
||||
|
||||
def get_subnet(self, context, id, fields=None):
|
||||
"""Retrieve a subnet.
|
||||
@@ -261,7 +261,7 @@ class Plugin(neutron_plugin_base_v2.NeutronPluginBaseV2,
|
||||
net_id = STRATEGY.get_parent_network(net_id)
|
||||
subnet["network_id"] = net_id
|
||||
|
||||
return v._make_subnet_dict(subnet, default_route=DEFAULT_ROUTE)
|
||||
return v._make_subnet_dict(subnet, default_route=routes.DEFAULT_ROUTE)
|
||||
|
||||
def get_subnets(self, context, filters=None, fields=None):
|
||||
"""Retrieve a list of subnets.
|
||||
@@ -286,7 +286,7 @@ class Plugin(neutron_plugin_base_v2.NeutronPluginBaseV2,
|
||||
(context.tenant_id, filters, fields))
|
||||
subnets = db_api.subnet_find(context, **filters)
|
||||
return v._make_subnets_list(subnets, fields=fields,
|
||||
default_route=DEFAULT_ROUTE)
|
||||
default_route=routes.DEFAULT_ROUTE)
|
||||
|
||||
def get_subnets_count(self, context, filters=None):
|
||||
"""Return the number of subnets.
|
||||
@@ -486,51 +486,6 @@ class Plugin(neutron_plugin_base_v2.NeutronPluginBaseV2,
|
||||
self._delete_subnet(context, subnet)
|
||||
db_api.network_delete(context, net)
|
||||
|
||||
def get_route(self, context, id):
|
||||
LOG.info("get_route %s for tenant %s" % (id, context.tenant_id))
|
||||
route = db_api.route_find(context, id=id, scope=db_api.ONE)
|
||||
if not route:
|
||||
raise quark_exceptions.RouteNotFound(route_id=id)
|
||||
return v._make_route_dict(route)
|
||||
|
||||
def get_routes(self, context):
|
||||
LOG.info("get_routes for tenant %s" % context.tenant_id)
|
||||
routes = db_api.route_find(context)
|
||||
return [v._make_route_dict(r) for r in routes]
|
||||
|
||||
def create_route(self, context, route):
|
||||
LOG.info("create_route for tenant %s" % context.tenant_id)
|
||||
route = route["route"]
|
||||
subnet_id = route["subnet_id"]
|
||||
subnet = db_api.subnet_find(context, id=subnet_id, scope=db_api.ONE)
|
||||
if not subnet:
|
||||
raise exceptions.SubnetNotFound(subnet_id=subnet_id)
|
||||
|
||||
# TODO(anyone): May want to denormalize the cidr values into columns
|
||||
# to achieve single db lookup on conflict check
|
||||
route_cidr = netaddr.IPNetwork(route["cidr"])
|
||||
subnet_routes = db_api.route_find(context, subnet_id=subnet_id,
|
||||
scope=db_api.ALL)
|
||||
for sub_route in subnet_routes:
|
||||
sub_route_cidr = netaddr.IPNetwork(sub_route["cidr"])
|
||||
if sub_route_cidr.value == DEFAULT_ROUTE.value:
|
||||
continue
|
||||
if route_cidr in sub_route_cidr or sub_route_cidr in route_cidr:
|
||||
raise quark_exceptions.RouteConflict(
|
||||
route_id=sub_route["id"], cidr=str(route_cidr))
|
||||
new_route = db_api.route_create(context, **route)
|
||||
return v._make_route_dict(new_route)
|
||||
|
||||
def delete_route(self, context, id):
|
||||
#TODO(mdietz): This is probably where we check to see that someone is
|
||||
# admin and only filter on tenant if they aren't. Correct
|
||||
# for all the above later
|
||||
LOG.info("delete_route %s for tenant %s" % (id, context.tenant_id))
|
||||
route = db_api.route_find(context, id, scope=db_api.ONE)
|
||||
if not route:
|
||||
raise quark_exceptions.RouteNotFound(route_id=id)
|
||||
db_api.route_delete(context, route)
|
||||
|
||||
def get_mac_address_range(self, context, id, fields=None):
|
||||
return mac_address_ranges.get_mac_address_range(context, id, fields)
|
||||
|
||||
@@ -627,3 +582,15 @@ class Plugin(neutron_plugin_base_v2.NeutronPluginBaseV2,
|
||||
|
||||
def disassociate_port(self, context, id, ip_address_id):
|
||||
return ports.disassociate_port(context, id, ip_address_id)
|
||||
|
||||
def get_route(self, context, id):
|
||||
return routes.get_route(context, id)
|
||||
|
||||
def get_routes(self, context):
|
||||
return routes.get_routes(context)
|
||||
|
||||
def create_route(self, context, route):
|
||||
return routes.create_route(context, route)
|
||||
|
||||
def delete_route(self, context, id):
|
||||
routes.delete_route(context, id)
|
||||
|
80
quark/plugin_modules/routes.py
Normal file
80
quark/plugin_modules/routes.py
Normal file
@@ -0,0 +1,80 @@
|
||||
# Copyright 2013 Openstack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import netaddr
|
||||
|
||||
from neutron.common import exceptions
|
||||
from neutron.openstack.common import importutils
|
||||
from neutron.openstack.common import log as logging
|
||||
from oslo.config import cfg
|
||||
|
||||
from quark.db import api as db_api
|
||||
from quark import exceptions as quark_exceptions
|
||||
from quark import plugin_views as v
|
||||
|
||||
CONF = cfg.CONF
|
||||
DEFAULT_ROUTE = netaddr.IPNetwork("0.0.0.0/0")
|
||||
LOG = logging.getLogger("neutron.quark")
|
||||
|
||||
ipam_driver = (importutils.import_class(CONF.QUARK.ipam_driver))()
|
||||
|
||||
|
||||
def get_route(context, id):
|
||||
LOG.info("get_route %s for tenant %s" % (id, context.tenant_id))
|
||||
route = db_api.route_find(context, id=id, scope=db_api.ONE)
|
||||
if not route:
|
||||
raise quark_exceptions.RouteNotFound(route_id=id)
|
||||
return v._make_route_dict(route)
|
||||
|
||||
|
||||
def get_routes(context):
|
||||
LOG.info("get_routes for tenant %s" % context.tenant_id)
|
||||
routes = db_api.route_find(context)
|
||||
return [v._make_route_dict(r) for r in routes]
|
||||
|
||||
|
||||
def create_route(context, route):
|
||||
LOG.info("create_route for tenant %s" % context.tenant_id)
|
||||
route = route["route"]
|
||||
subnet_id = route["subnet_id"]
|
||||
subnet = db_api.subnet_find(context, id=subnet_id, scope=db_api.ONE)
|
||||
if not subnet:
|
||||
raise exceptions.SubnetNotFound(subnet_id=subnet_id)
|
||||
|
||||
# TODO(anyone): May want to denormalize the cidr values into columns
|
||||
# to achieve single db lookup on conflict check
|
||||
route_cidr = netaddr.IPNetwork(route["cidr"])
|
||||
subnet_routes = db_api.route_find(context, subnet_id=subnet_id,
|
||||
scope=db_api.ALL)
|
||||
for sub_route in subnet_routes:
|
||||
sub_route_cidr = netaddr.IPNetwork(sub_route["cidr"])
|
||||
if sub_route_cidr.value == DEFAULT_ROUTE.value:
|
||||
continue
|
||||
if route_cidr in sub_route_cidr or sub_route_cidr in route_cidr:
|
||||
raise quark_exceptions.RouteConflict(
|
||||
route_id=sub_route["id"], cidr=str(route_cidr))
|
||||
new_route = db_api.route_create(context, **route)
|
||||
return v._make_route_dict(new_route)
|
||||
|
||||
|
||||
def delete_route(context, id):
|
||||
#TODO(mdietz): This is probably where we check to see that someone is
|
||||
# admin and only filter on tenant if they aren't. Correct
|
||||
# for all the above later
|
||||
LOG.info("delete_route %s for tenant %s" % (id, context.tenant_id))
|
||||
route = db_api.route_find(context, id, scope=db_api.ONE)
|
||||
if not route:
|
||||
raise quark_exceptions.RouteNotFound(route_id=id)
|
||||
db_api.route_delete(context, route)
|
133
quark/tests/plugin_modules/test_routes.py
Normal file
133
quark/tests/plugin_modules/test_routes.py
Normal file
@@ -0,0 +1,133 @@
|
||||
# Copyright 2013 Openstack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
|
||||
import mock
|
||||
from neutron.common import exceptions
|
||||
|
||||
from quark import exceptions as quark_exceptions
|
||||
from quark.tests import test_quark_plugin
|
||||
|
||||
|
||||
class TestQuarkGetRoutes(test_quark_plugin.TestQuarkPlugin):
|
||||
@contextlib.contextmanager
|
||||
def _stubs(self, routes):
|
||||
with mock.patch("quark.db.api.route_find") as route_find:
|
||||
route_find.return_value = routes
|
||||
yield
|
||||
|
||||
def test_get_routes(self):
|
||||
route = dict(id=1, cidr="192.168.0.0/24", gateway="192.168.0.1",
|
||||
subnet_id=2)
|
||||
with self._stubs(routes=[route]):
|
||||
res = self.plugin.get_routes(self.context)
|
||||
for key in route.keys():
|
||||
self.assertEqual(res[0][key], route[key])
|
||||
|
||||
def test_get_route(self):
|
||||
route = dict(id=1, cidr="192.168.0.0/24", gateway="192.168.0.1",
|
||||
subnet_id=2)
|
||||
with self._stubs(routes=route):
|
||||
res = self.plugin.get_route(self.context, 1)
|
||||
for key in route.keys():
|
||||
self.assertEqual(res[key], route[key])
|
||||
|
||||
def test_get_route_not_found_fails(self):
|
||||
with self._stubs(routes=None):
|
||||
with self.assertRaises(quark_exceptions.RouteNotFound):
|
||||
self.plugin.get_route(self.context, 1)
|
||||
|
||||
|
||||
class TestQuarkCreateRoutes(test_quark_plugin.TestQuarkPlugin):
|
||||
@contextlib.contextmanager
|
||||
def _stubs(self, create_route, find_routes, subnet):
|
||||
db_mod = "quark.db.api"
|
||||
with contextlib.nested(
|
||||
mock.patch("%s.route_create" % db_mod),
|
||||
mock.patch("%s.route_find" % db_mod),
|
||||
mock.patch("%s.subnet_find" % db_mod)
|
||||
) as (route_create, route_find, subnet_find):
|
||||
route_create.return_value = create_route
|
||||
route_find.return_value = find_routes
|
||||
subnet_find.return_value = subnet
|
||||
yield
|
||||
|
||||
def test_create_route(self):
|
||||
subnet = dict(id=2)
|
||||
create_route = dict(id=1, cidr="172.16.0.0/24", gateway="172.16.0.1",
|
||||
subnet_id=subnet["id"])
|
||||
route = dict(id=1, cidr="192.168.0.0/24", gateway="192.168.0.1",
|
||||
subnet_id=subnet["id"])
|
||||
with self._stubs(create_route=create_route, find_routes=[route],
|
||||
subnet=subnet):
|
||||
res = self.plugin.create_route(self.context,
|
||||
dict(route=create_route))
|
||||
for key in create_route.keys():
|
||||
self.assertEqual(res[key], create_route[key])
|
||||
|
||||
def test_create_route_no_subnet_fails(self):
|
||||
subnet = dict(id=2)
|
||||
route = dict(id=1, cidr="192.168.0.0/24", gateway="192.168.0.1",
|
||||
subnet_id=subnet["id"])
|
||||
with self._stubs(create_route=route, find_routes=[], subnet=None):
|
||||
with self.assertRaises(exceptions.SubnetNotFound):
|
||||
self.plugin.create_route(self.context, dict(route=route))
|
||||
|
||||
def test_create_no_other_routes(self):
|
||||
subnet = dict(id=2)
|
||||
create_route = dict(id=1, cidr="192.168.0.0/24", gateway="192.168.0.1",
|
||||
subnet_id=subnet["id"])
|
||||
with self._stubs(create_route=create_route, find_routes=[],
|
||||
subnet=subnet):
|
||||
res = self.plugin.create_route(self.context,
|
||||
dict(route=create_route))
|
||||
self.assertEqual(res["cidr"], create_route["cidr"])
|
||||
|
||||
def test_create_conflicting_route_raises(self):
|
||||
subnet = dict(id=2)
|
||||
create_route = dict(id=1, cidr="192.168.0.0/24", gateway="192.168.0.1",
|
||||
subnet_id=subnet["id"])
|
||||
route = dict(id=1, cidr="192.168.0.0/24", gateway="192.168.0.1",
|
||||
subnet_id=subnet["id"])
|
||||
with self._stubs(create_route=create_route, find_routes=[route],
|
||||
subnet=subnet):
|
||||
with self.assertRaises(quark_exceptions.RouteConflict):
|
||||
self.plugin.create_route(self.context,
|
||||
dict(route=create_route))
|
||||
|
||||
|
||||
class TestQuarkDeleteRoutes(test_quark_plugin.TestQuarkPlugin):
|
||||
@contextlib.contextmanager
|
||||
def _stubs(self, route):
|
||||
db_mod = "quark.db.api"
|
||||
with contextlib.nested(
|
||||
mock.patch("%s.route_delete" % db_mod),
|
||||
mock.patch("%s.route_find" % db_mod),
|
||||
) as (route_delete, route_find):
|
||||
route_find.return_value = route
|
||||
yield route_delete
|
||||
|
||||
def test_delete_route(self):
|
||||
route = dict(id=1, cidr="192.168.0.0/24", gateway="192.168.0.1",
|
||||
subnet_id=2)
|
||||
with self._stubs(route=route) as route_delete:
|
||||
self.plugin.delete_route(self.context, 1)
|
||||
self.assertTrue(route_delete.called)
|
||||
|
||||
def test_delete_route_not_found_fails(self):
|
||||
with self._stubs(route=None):
|
||||
with self.assertRaises(quark_exceptions.RouteNotFound):
|
||||
self.plugin.delete_route(self.context, 1)
|
@@ -24,7 +24,6 @@ from neutron.db import api as db_api
|
||||
from oslo.config import cfg
|
||||
|
||||
from quark.db import models
|
||||
from quark import exceptions as quark_exceptions
|
||||
import quark.plugin
|
||||
|
||||
from quark.tests import test_base
|
||||
@@ -898,114 +897,3 @@ class TestQuarkCreateNetwork(TestQuarkPlugin):
|
||||
self.assertEqual(net["subnets"], [])
|
||||
self.assertEqual(net["shared"], False)
|
||||
self.assertEqual(net["tenant_id"], 0)
|
||||
|
||||
|
||||
class TestQuarkGetRoutes(TestQuarkPlugin):
|
||||
@contextlib.contextmanager
|
||||
def _stubs(self, routes):
|
||||
with mock.patch("quark.db.api.route_find") as route_find:
|
||||
route_find.return_value = routes
|
||||
yield
|
||||
|
||||
def test_get_routes(self):
|
||||
route = dict(id=1, cidr="192.168.0.0/24", gateway="192.168.0.1",
|
||||
subnet_id=2)
|
||||
with self._stubs(routes=[route]):
|
||||
res = self.plugin.get_routes(self.context)
|
||||
for key in route.keys():
|
||||
self.assertEqual(res[0][key], route[key])
|
||||
|
||||
def test_get_route(self):
|
||||
route = dict(id=1, cidr="192.168.0.0/24", gateway="192.168.0.1",
|
||||
subnet_id=2)
|
||||
with self._stubs(routes=route):
|
||||
res = self.plugin.get_route(self.context, 1)
|
||||
for key in route.keys():
|
||||
self.assertEqual(res[key], route[key])
|
||||
|
||||
def test_get_route_not_found_fails(self):
|
||||
with self._stubs(routes=None):
|
||||
with self.assertRaises(quark_exceptions.RouteNotFound):
|
||||
self.plugin.get_route(self.context, 1)
|
||||
|
||||
|
||||
class TestQuarkCreateRoutes(TestQuarkPlugin):
|
||||
@contextlib.contextmanager
|
||||
def _stubs(self, create_route, find_routes, subnet):
|
||||
db_mod = "quark.db.api"
|
||||
with contextlib.nested(
|
||||
mock.patch("%s.route_create" % db_mod),
|
||||
mock.patch("%s.route_find" % db_mod),
|
||||
mock.patch("%s.subnet_find" % db_mod)
|
||||
) as (route_create, route_find, subnet_find):
|
||||
route_create.return_value = create_route
|
||||
route_find.return_value = find_routes
|
||||
subnet_find.return_value = subnet
|
||||
yield
|
||||
|
||||
def test_create_route(self):
|
||||
subnet = dict(id=2)
|
||||
create_route = dict(id=1, cidr="172.16.0.0/24", gateway="172.16.0.1",
|
||||
subnet_id=subnet["id"])
|
||||
route = dict(id=1, cidr="192.168.0.0/24", gateway="192.168.0.1",
|
||||
subnet_id=subnet["id"])
|
||||
with self._stubs(create_route=create_route, find_routes=[route],
|
||||
subnet=subnet):
|
||||
res = self.plugin.create_route(self.context,
|
||||
dict(route=create_route))
|
||||
for key in create_route.keys():
|
||||
self.assertEqual(res[key], create_route[key])
|
||||
|
||||
def test_create_route_no_subnet_fails(self):
|
||||
subnet = dict(id=2)
|
||||
route = dict(id=1, cidr="192.168.0.0/24", gateway="192.168.0.1",
|
||||
subnet_id=subnet["id"])
|
||||
with self._stubs(create_route=route, find_routes=[], subnet=None):
|
||||
with self.assertRaises(exceptions.SubnetNotFound):
|
||||
self.plugin.create_route(self.context, dict(route=route))
|
||||
|
||||
def test_create_no_other_routes(self):
|
||||
subnet = dict(id=2)
|
||||
create_route = dict(id=1, cidr="192.168.0.0/24", gateway="192.168.0.1",
|
||||
subnet_id=subnet["id"])
|
||||
with self._stubs(create_route=create_route, find_routes=[],
|
||||
subnet=subnet):
|
||||
res = self.plugin.create_route(self.context,
|
||||
dict(route=create_route))
|
||||
self.assertEqual(res["cidr"], create_route["cidr"])
|
||||
|
||||
def test_create_conflicting_route_raises(self):
|
||||
subnet = dict(id=2)
|
||||
create_route = dict(id=1, cidr="192.168.0.0/24", gateway="192.168.0.1",
|
||||
subnet_id=subnet["id"])
|
||||
route = dict(id=1, cidr="192.168.0.0/24", gateway="192.168.0.1",
|
||||
subnet_id=subnet["id"])
|
||||
with self._stubs(create_route=create_route, find_routes=[route],
|
||||
subnet=subnet):
|
||||
with self.assertRaises(quark_exceptions.RouteConflict):
|
||||
self.plugin.create_route(self.context,
|
||||
dict(route=create_route))
|
||||
|
||||
|
||||
class TestQuarkDeleteRoutes(TestQuarkPlugin):
|
||||
@contextlib.contextmanager
|
||||
def _stubs(self, route):
|
||||
db_mod = "quark.db.api"
|
||||
with contextlib.nested(
|
||||
mock.patch("%s.route_delete" % db_mod),
|
||||
mock.patch("%s.route_find" % db_mod),
|
||||
) as (route_delete, route_find):
|
||||
route_find.return_value = route
|
||||
yield route_delete
|
||||
|
||||
def test_delete_route(self):
|
||||
route = dict(id=1, cidr="192.168.0.0/24", gateway="192.168.0.1",
|
||||
subnet_id=2)
|
||||
with self._stubs(route=route) as route_delete:
|
||||
self.plugin.delete_route(self.context, 1)
|
||||
self.assertTrue(route_delete.called)
|
||||
|
||||
def test_delete_route_not_found_fails(self):
|
||||
with self._stubs(route=None):
|
||||
with self.assertRaises(quark_exceptions.RouteNotFound):
|
||||
self.plugin.delete_route(self.context, 1)
|
||||
|
Reference in New Issue
Block a user