Implements provider subnets

JIRA:NCP-1580

Implements hooks for showing provider subnets, which are merely aliases
for subnets owned by provider networks. Unlike provider networks, they
are merely placeholders and will never be related in the schema to any
other object.
This commit is contained in:
Matt Dietz
2015-10-23 16:55:46 -05:00
parent aed03d3929
commit 9c44acb83e
12 changed files with 327 additions and 60 deletions

View File

@@ -0,0 +1,52 @@
import sys
from neutron.common import config
from neutron.db import api as neutron_db_api
from oslo_config import cfg
from quark.db import models
def main():
config.init(sys.argv[1:])
if not cfg.CONF.config_file:
sys.exit(_("ERROR: Unable to find configuration file via the default"
" search paths (~/.neutron/, ~/, /etc/neutron/, /etc/) and"
" the '--config-file' option!"))
config.setup_logging()
session = neutron_db_api.get_session()
subnets = [
models.Subnet(name="public_v4",
network_id="00000000-0000-0000-0000-000000000000",
id="00000000-0000-0000-0000-000000000000",
tenant_id="rackspace",
segment_id="rackspace",
do_not_use=True,
_cidr="0.0.0.0/0"),
models.Subnet(name="public_v6",
network_id="00000000-0000-0000-0000-000000000000",
id="11111111-1111-1111-1111-111111111111",
tenant_id="rackspace",
segment_id="rackspace",
do_not_use=True,
_cidr="::/0"),
models.Subnet(name="private_v4",
network_id="11111111-1111-1111-1111-111111111111",
id="22222222-2222-2222-2222-222222222222",
tenant_id="rackspace",
segment_id="rackspace",
do_not_use=True,
_cidr="0.0.0.0/0"),
models.Subnet(name="private_v6",
network_id="11111111-1111-1111-1111-111111111111",
id="33333333-3333-3333-3333-333333333333",
tenant_id="rackspace",
segment_id="rackspace",
do_not_use=True,
_cidr="::/0")]
session.bulk_save_objects(subnets)
if __name__ == "__main__":
main()

View File

@@ -566,7 +566,7 @@ def network_find(context, limit=None, sorts=None, marker=None,
defaults = [] defaults = []
provider_query = False provider_query = False
if "id" in filters: if "id" in filters:
ids, defaults = STRATEGY.split_network_ids(context, filters["id"]) ids, defaults = STRATEGY.split_network_ids(filters["id"])
if not ids and defaults and "shared" not in filters: if not ids and defaults and "shared" not in filters:
provider_query = True provider_query = True
if ids: if ids:
@@ -575,7 +575,7 @@ def network_find(context, limit=None, sorts=None, marker=None,
filters.pop("id") filters.pop("id")
if "shared" in filters: if "shared" in filters:
defaults = STRATEGY.get_assignable_networks(context) defaults = STRATEGY.get_provider_networks()
if True in filters["shared"]: if True in filters["shared"]:
if ids: if ids:
defaults = [net for net in ids if net in defaults] defaults = [net for net in ids if net in defaults]
@@ -709,19 +709,64 @@ def subnet_update_set_alloc_pool_cache(context, subnet, cache_data=None):
@scoped @scoped
def subnet_find(context, limit=None, page_reverse=False, sorts=None, def subnet_find(context, limit=None, page_reverse=False, sorts=None,
marker_obj=None, **filters): marker_obj=None, fields=None, **filters):
if "shared" in filters and True in filters["shared"]: ids = []
return [] defaults = []
provider_query = False
if "id" in filters:
ids, defaults = STRATEGY.split_subnet_ids(filters["id"])
if not ids and defaults and "shared" not in filters:
provider_query = True
if ids:
filters["id"] = ids
else:
filters.pop("id")
if "shared" in filters:
defaults = STRATEGY.get_provider_subnets()
if True in filters["shared"]:
if ids:
defaults = [sub for sub in ids if sub in defaults]
filters.pop("id")
if not defaults:
return []
else:
defaults.insert(0, INVERT_DEFAULTS)
filters.pop("shared")
return _subnet_find(context, limit, sorts, marker_obj, page_reverse,
fields, defaults=defaults,
provider_query=provider_query, **filters)
def _subnet_find(context, limit, sorts, marker, page_reverse, fields,
defaults=None, provider_query=False, **filters):
query = context.session.query(models.Subnet) query = context.session.query(models.Subnet)
model_filters = _model_query(context, models.Subnet, filters) model_filters = _model_query(context, models.Subnet, filters, query)
if defaults:
invert_defaults = False
if INVERT_DEFAULTS in defaults:
invert_defaults = True
defaults.pop(0)
if filters and invert_defaults:
query = query.filter(and_(not_(models.Subnet.id.in_(defaults)),
and_(*model_filters)))
elif not provider_query and filters and not invert_defaults:
query = query.filter(or_(models.Network.id.in_(defaults),
and_(*model_filters)))
elif not invert_defaults:
query = query.filter(models.Subnet.id.in_(defaults))
else:
query = query.filter(*model_filters)
if "join_dns" in filters: if "join_dns" in filters:
query = query.options(orm.joinedload(models.Subnet.dns_nameservers)) query = query.options(orm.joinedload(models.Subnet.dns_nameservers))
if "join_routes" in filters: if "join_routes" in filters:
query = query.options(orm.joinedload(models.Subnet.routes)) query = query.options(orm.joinedload(models.Subnet.routes))
return paginate_query(query.filter(*model_filters), models.Subnet, limit,
sorts, marker_obj) return paginate_query(query, models.Subnet, limit, sorts, marker)
def subnet_count_all(context, **filters): def subnet_count_all(context, **filters):

View File

@@ -57,7 +57,7 @@ class UnmanagedDriver(object):
def create_port(self, context, network_id, port_id, **kwargs): def create_port(self, context, network_id, port_id, **kwargs):
LOG.info("create_port %s %s %s" % (context.tenant_id, network_id, LOG.info("create_port %s %s %s" % (context.tenant_id, network_id,
port_id)) port_id))
bridge_name = STRATEGY.get_network(context, network_id)["bridge"] bridge_name = STRATEGY.get_network(network_id)["bridge"]
return {"uuid": port_id, "bridge": bridge_name} return {"uuid": port_id, "bridge": bridge_name}
def update_port(self, context, port_id, **kwargs): def update_port(self, context, port_id, **kwargs):

View File

@@ -30,7 +30,7 @@ CONF.register_opts(quark_opts, "QUARK")
class JSONStrategy(object): class JSONStrategy(object):
def __init__(self, strategy=None): def __init__(self, strategy=None):
self.reverse_strategy = {} self.subnet_strategy = {}
self.strategy = {} self.strategy = {}
if not strategy: if not strategy:
self._compile_strategy(CONF.QUARK.default_net_strategy) self._compile_strategy(CONF.QUARK.default_net_strategy)
@@ -39,25 +39,47 @@ class JSONStrategy(object):
def _compile_strategy(self, strategy): def _compile_strategy(self, strategy):
self.strategy = json.loads(strategy) self.strategy = json.loads(strategy)
for net_id, meta in self.strategy.iteritems():
for subnet_id in meta["subnets"]:
self.subnet_strategy[subnet_id] = net_id
def split_network_ids(self, context, net_ids): def _split(self, func, resource_ids):
assignable = [] provider = []
tenant = [] tenant = []
for net_id in net_ids: for res_id in resource_ids:
if self.is_provider_network(net_id): if func(res_id):
assignable.append(net_id) provider.append(res_id)
else: else:
tenant.append(net_id) tenant.append(res_id)
return tenant, assignable return tenant, provider
def get_network(self, context, net_id): def split_network_ids(self, net_ids):
return self.strategy.get(net_id) return self._split(self.is_provider_network, net_ids)
def get_assignable_networks(self, context): def split_subnet_ids(self, subnet_ids):
return self._split(self.is_provider_subnet, subnet_ids)
def get_provider_networks(self):
return self.strategy.keys() return self.strategy.keys()
def get_provider_subnets(self):
return self.subnet_strategy.keys()
def get_network(self, net_id):
return self.strategy.get(net_id)
def is_provider_network(self, net_id): def is_provider_network(self, net_id):
return self.strategy.get(net_id) is not None return self.strategy.get(net_id) is not None
def is_provider_subnet(self, subnet_id):
return subnet_id in self.subnet_strategy
def subnet_ids_for_network(self, net_id):
if net_id in self.strategy:
return self.strategy.get(net_id)["subnets"]
def get_network_for_subnet(self, subnet_id):
return self.subnet_strategy.get(subnet_id)
STRATEGY = JSONStrategy() STRATEGY = JSONStrategy()

View File

@@ -369,9 +369,10 @@ def get_subnets(context, limit=None, page_reverse=False, sorts=None,
""" """
LOG.info("get_subnets for tenant %s with filters %s fields %s" % LOG.info("get_subnets for tenant %s with filters %s fields %s" %
(context.tenant_id, filters, fields)) (context.tenant_id, filters, fields))
filters = filters or {}
subnets = db_api.subnet_find(context, limit=limit, subnets = db_api.subnet_find(context, limit=limit,
page_reverse=page_reverse, sorts=sorts, page_reverse=page_reverse, sorts=sorts,
marker=marker, marker_obj=marker,
join_dns=True, join_routes=True, **filters) join_dns=True, join_routes=True, **filters)
for subnet in subnets: for subnet in subnets:
cache = subnet.get("_allocation_pool_cache") cache = subnet.get("_allocation_pool_cache")

View File

@@ -71,16 +71,20 @@ def _make_network_dict(network, fields=None):
else: else:
res["subnets"] = [s["id"] for s in network.get("subnets", [])] res["subnets"] = [s["id"] for s in network.get("subnets", [])]
else: else:
res["subnets"] = [] res["subnets"] = STRATEGY.subnet_ids_for_network(network["id"])
return res return res
def _make_subnet_dict(subnet, fields=None): def _make_subnet_dict(subnet, fields=None):
dns_nameservers = [str(netaddr.IPAddress(dns["ip"])) dns_nameservers = [str(netaddr.IPAddress(dns["ip"]))
for dns in subnet.get("dns_nameservers")] for dns in subnet.get("dns_nameservers", [])]
net_id = subnet["network_id"] subnet_id = subnet.get("id")
if STRATEGY.is_provider_subnet(subnet_id):
net_id = STRATEGY.get_network_for_subnet(subnet_id)
else:
net_id = subnet["network_id"]
res = {"id": subnet.get("id"), res = {"id": subnet_id,
"name": subnet.get("name"), "name": subnet.get("name"),
"tenant_id": subnet.get("tenant_id"), "tenant_id": subnet.get("tenant_id"),
"network_id": net_id, "network_id": net_id,
@@ -93,7 +97,8 @@ def _make_subnet_dict(subnet, fields=None):
if CONF.QUARK.show_subnet_ip_policy_id: if CONF.QUARK.show_subnet_ip_policy_id:
res['ip_policy_id'] = subnet.get("ip_policy_id") res['ip_policy_id'] = subnet.get("ip_policy_id")
if CONF.QUARK.show_allocation_pools: if (CONF.QUARK.show_allocation_pools and not
STRATEGY.is_provider_subnet(subnet_id)):
res["allocation_pools"] = subnet.allocation_pools res["allocation_pools"] = subnet.allocation_pools
else: else:
res["allocation_pools"] = [] res["allocation_pools"] = []
@@ -102,11 +107,10 @@ def _make_subnet_dict(subnet, fields=None):
return {"destination": route["cidr"], return {"destination": route["cidr"],
"nexthop": route["gateway"]} "nexthop": route["gateway"]}
# TODO(mdietz): really inefficient, should go away
res["gateway_ip"] = None res["gateway_ip"] = None
res["host_routes"] = [] res["host_routes"] = []
default_found = False default_found = False
for route in subnet["routes"]: for route in subnet.get("routes", []):
netroute = netaddr.IPNetwork(route["cidr"]) netroute = netaddr.IPNetwork(route["cidr"])
if _is_default_route(netroute): if _is_default_route(netroute):
# NOTE(mdietz): This has the potential to find more than one # NOTE(mdietz): This has the potential to find more than one
@@ -252,8 +256,7 @@ def _make_ports_list(query, fields=None):
def _make_subnets_list(query, fields=None): def _make_subnets_list(query, fields=None):
subnets = [] subnets = []
for subnet in query: for subnet in query:
subnet_dict = _make_subnet_dict(subnet, fields=fields) subnets.append(_make_subnet_dict(subnet, fields=fields))
subnets.append(subnet_dict)
return subnets return subnets

View File

@@ -92,9 +92,8 @@ class TestQuarkGetNetworksShared(test_quark_plugin.TestQuarkPlugin):
def setUp(self): def setUp(self):
super(TestQuarkGetNetworksShared, self).setUp() super(TestQuarkGetNetworksShared, self).setUp()
self.strategy = {"public_network": self.strategy = {"public_network":
{"required": True, {"bridge": "xenbr0",
"bridge": "xenbr0", "subnets": ["public_v4", "public_v6"]}}
"children": {"nova": "child_net"}}}
self.strategy_json = json.dumps(self.strategy) self.strategy_json = json.dumps(self.strategy)
self.old = plugin_views.STRATEGY self.old = plugin_views.STRATEGY
plugin_views.STRATEGY = network_strategy.JSONStrategy( plugin_views.STRATEGY = network_strategy.JSONStrategy(
@@ -150,7 +149,7 @@ class TestQuarkGetNetworksShared(test_quark_plugin.TestQuarkPlugin):
""" Includes regression for RM8483. """ """ Includes regression for RM8483. """
for net in ret: for net in ret:
if net['shared']: if net['shared']:
self.assertEqual(0, len(net['subnets'])) self.assertEqual(2, len(net['subnets']))
else: else:
self.assertEqual(1, len(net['subnets'])) self.assertEqual(1, len(net['subnets']))
net_find.assert_called_with(self.context, None, None, None, False, net_find.assert_called_with(self.context, None, None, None, False,

View File

@@ -267,9 +267,11 @@ class TestQuarkCreatePortRM9305(test_quark_plugin.TestQuarkPlugin):
def setUp(self): def setUp(self):
super(TestQuarkCreatePortRM9305, self).setUp() super(TestQuarkCreatePortRM9305, self).setUp()
strategy = {"00000000-0000-0000-0000-000000000000": strategy = {"00000000-0000-0000-0000-000000000000":
{"bridge": "publicnet"}, {"bridge": "publicnet",
"subnets": ["public_v4", "public_v6"]},
"11111111-1111-1111-1111-111111111111": "11111111-1111-1111-1111-111111111111":
{"bridge": "servicenet"}} {"bridge": "servicenet",
"subnets": ["private_v4", "private_v6"]}}
strategy_json = json.dumps(strategy) strategy_json = json.dumps(strategy)
quark_ports.STRATEGY = network_strategy.JSONStrategy(strategy_json) quark_ports.STRATEGY = network_strategy.JSONStrategy(strategy_json)
@@ -936,9 +938,8 @@ class TestQuarkCreatePortOnSharedNetworks(test_quark_plugin.TestQuarkPlugin):
@contextlib.contextmanager @contextlib.contextmanager
def _stubs(self, port=None, network=None, addr=None, mac=None): def _stubs(self, port=None, network=None, addr=None, mac=None):
self.strategy = {"public_network": self.strategy = {"public_network":
{"required": True, {"bridge": "xenbr0",
"bridge": "xenbr0", "subnets": ["public_v4", "public_v6"]}}
"children": {"nova": "child_net"}}}
strategy_json = json.dumps(self.strategy) strategy_json = json.dumps(self.strategy)
quark_ports.STRATEGY = network_strategy.JSONStrategy(strategy_json) quark_ports.STRATEGY = network_strategy.JSONStrategy(strategy_json)

View File

@@ -16,6 +16,7 @@
import contextlib import contextlib
import copy import copy
from datetime import datetime from datetime import datetime
import json
import time import time
import uuid import uuid
@@ -24,8 +25,11 @@ from neutron.api.v2 import attributes as neutron_attrs
from neutron.common import exceptions from neutron.common import exceptions
from oslo_config import cfg from oslo_config import cfg
from quark.db import api as db_api
from quark.db import models from quark.db import models
from quark import exceptions as q_exc from quark import exceptions as q_exc
from quark import network_strategy
from quark import plugin_views
from quark.tests import test_quark_plugin from quark.tests import test_quark_plugin
@@ -1591,3 +1595,90 @@ class TestQuarkUpdateSubnetAttrFilters(test_quark_plugin.TestQuarkPlugin):
tenant_id=subnet["subnet"]["tenant_id"], tenant_id=subnet["subnet"]["tenant_id"],
enable_dhcp=subnet["subnet"]["enable_dhcp"], enable_dhcp=subnet["subnet"]["enable_dhcp"],
do_not_use=subnet["subnet"]["do_not_use"]) do_not_use=subnet["subnet"]["do_not_use"])
class TestQuarkGetSubnetsShared(test_quark_plugin.TestQuarkPlugin):
def setUp(self):
super(TestQuarkGetSubnetsShared, self).setUp()
self.strategy = {"public_network":
{"bridge": "xenbr0",
"subnets": ["public_v4", "public_v6"]}}
self.strategy_json = json.dumps(self.strategy)
self.old = plugin_views.STRATEGY
plugin_views.STRATEGY = network_strategy.JSONStrategy(
self.strategy_json)
cfg.CONF.set_override("default_net_strategy", self.strategy_json,
"QUARK")
def tearDown(self):
plugin_views.STRATEGY = self.old
@contextlib.contextmanager
def _stubs(self, subnets=None):
subnet_mods = []
if isinstance(subnets, list):
for sub in subnets:
subnet_mod = models.Subnet()
subnet_mod.update(sub)
subnet_mods.append(subnet_mod)
db_mod = "quark.db.api"
db_api.STRATEGY = network_strategy.JSONStrategy(self.strategy_json)
network_strategy.STRATEGY = network_strategy.JSONStrategy(
self.strategy_json)
with mock.patch("%s._subnet_find" % db_mod) as subnet_find:
subnet_find.return_value = subnet_mods
yield subnet_find
def test_get_subnets_shared(self):
sub0 = dict(id='public_v4', tenant_id="provider", name="public_v4",
_cidr="0.0.0.0/0", network_id="public_network")
sub1 = dict(id='public_v6', tenant_id="provider", name="public_v6",
_cidr="::/0", network_id="public_network")
with self._stubs(subnets=[sub0, sub1]) as subnet_find:
ret = self.plugin.get_subnets(self.context, None, None, None,
False, {"shared": [True]})
for sub in ret:
self.assertEqual("public_network", sub["network_id"])
subnet_find.assert_called_with(self.context, None, None, False,
None, None,
join_routes=True,
defaults=["public_v4", "public_v6"],
join_dns=True,
provider_query=False)
def test_get_subnets_shared_false(self):
sub0 = dict(id='public_v4', tenant_id="provider", name="public_v4",
_cidr="0.0.0.0/0", network_id="public_network")
sub1 = dict(id='public_v6', tenant_id="provider", name="public_v6",
_cidr="::/0", network_id="public_network")
with self._stubs(subnets=[sub0, sub1]) as subnet_find:
self.plugin.get_subnets(self.context, None, None, None,
False, {"shared": [False]})
invert = db_api.INVERT_DEFAULTS
subnet_find.assert_called_with(self.context, None, None, False,
None, None,
defaults=[invert, "public_v4",
"public_v6"],
provider_query=False,
join_routes=True, join_dns=True)
def test_get_subnets_no_shared(self):
sub0 = dict(id='public_v4', tenant_id="provider", name="public_v4",
_cidr="0.0.0.0/0", network_id="public_network")
sub1 = dict(id='tenant_v4', tenant_id="tenant", name="tenant_v4",
_cidr="0.0.0.0/0", network_id="tenant_network")
with self._stubs(subnets=[sub0, sub1]) as subnet_find:
self.plugin.get_subnets(self.context, None, None, None,
False)
subnet_find.assert_called_with(self.context, None, None, False,
None, None,
defaults=[],
provider_query=False,
join_routes=True, join_dns=True)

View File

@@ -2166,9 +2166,11 @@ class QuarkIpamTestIpAddressFailure(test_base.TestBase):
def setUp(self): def setUp(self):
super(QuarkIpamTestIpAddressFailure, self).setUp() super(QuarkIpamTestIpAddressFailure, self).setUp()
strategy = {"00000000-0000-0000-0000-000000000000": strategy = {"00000000-0000-0000-0000-000000000000":
{"bridge": "publicnet"}, {"bridge": "publicnet",
"subnets": ["public_v4", "public_v6"]},
"11111111-1111-1111-1111-111111111111": "11111111-1111-1111-1111-111111111111":
{"bridge": "servicenet"}} {"bridge": "servicenet",
"subnets": ["private_v4", "private_v6"]}}
strategy_json = json.dumps(strategy) strategy_json = json.dumps(strategy)
quark.ipam.STRATEGY = network_strategy.JSONStrategy(strategy_json) quark.ipam.STRATEGY = network_strategy.JSONStrategy(strategy_json)

View File

@@ -23,35 +23,85 @@ from quark.tests import test_base
class TestJSONStrategy(test_base.TestBase): class TestJSONStrategy(test_base.TestBase):
def setUp(self): def setUp(self):
self.context = None self.strategy = {"public_network": {"bridge": "xenbr0",
self.strategy = {"public_network": "subnets": ["public_v4",
{"required": True, "public_v6"]}}
"bridge": "xenbr0"}}
strategy_json = json.dumps(self.strategy) strategy_json = json.dumps(self.strategy)
cfg.CONF.set_override("default_net_strategy", strategy_json, "QUARK") cfg.CONF.set_override("default_net_strategy", strategy_json, "QUARK")
def test_get_assignable_networks_default_strategy(self):
json_strategy = network_strategy.JSONStrategy()
net_ids = json_strategy.get_assignable_networks(self.context)
self.assertEqual("public_network", net_ids[0])
def test_get_assignable_networks_custom_strategy(self):
custom = {"private_network": self.strategy["public_network"]}
json_strategy = network_strategy.JSONStrategy(json.dumps(custom))
net_ids = json_strategy.get_assignable_networks(self.context)
self.assertEqual("private_network", net_ids[0])
def test_get_network(self): def test_get_network(self):
json_strategy = network_strategy.JSONStrategy() json_strategy = network_strategy.JSONStrategy()
net = json_strategy.get_network(self.context, "public_network") net = json_strategy.get_network("public_network")
self.assertEqual(net["bridge"], "xenbr0") self.assertEqual(net["bridge"], "xenbr0")
def test_split_network_ids(self): def test_split_network_ids(self):
json_strategy = network_strategy.JSONStrategy() json_strategy = network_strategy.JSONStrategy()
net_ids = ["foo_net", "public_network"] net_ids = ["foo_net", "public_network"]
tenant, assignable = json_strategy.split_network_ids(self.context, tenant, assignable = json_strategy.split_network_ids(net_ids)
net_ids)
self.assertTrue("foo_net" in tenant) self.assertTrue("foo_net" in tenant)
self.assertTrue("foo_net" not in assignable) self.assertTrue("foo_net" not in assignable)
self.assertTrue("public_network" not in tenant) self.assertTrue("public_network" not in tenant)
self.assertTrue("public_network" in assignable) self.assertTrue("public_network" in assignable)
def test_split_subnet_ids(self):
json_strategy = network_strategy.JSONStrategy()
subnet_ids = ["tenant_subnet", "public_v6"]
tenant, assignable = json_strategy.split_subnet_ids(subnet_ids)
self.assertTrue("tenant_subnet" in tenant)
self.assertTrue("tenant_subnet" not in assignable)
self.assertTrue("public_v6" not in tenant)
self.assertTrue("public_v6" in assignable)
def test_is_provider_network(self):
json_strategy = network_strategy.JSONStrategy()
self.assertTrue(json_strategy.is_provider_network("public_network"))
def test_is_not_provider_network(self):
json_strategy = network_strategy.JSONStrategy()
self.assertFalse(json_strategy.is_provider_network("tenant_network"))
def test_is_provider_subnet(self):
json_strategy = network_strategy.JSONStrategy()
self.assertTrue(json_strategy.is_provider_subnet("public_v4"))
def test_is_not_provider_subnet(self):
json_strategy = network_strategy.JSONStrategy()
self.assertFalse(json_strategy.is_provider_network("tenant_v4"))
def test_get_provider_networks(self):
json_strategy = network_strategy.JSONStrategy()
expected = "public_network"
nets = json_strategy.get_provider_networks()
self.assertTrue(expected in nets)
self.assertEqual(1, len(nets))
def test_get_provider_subnets(self):
json_strategy = network_strategy.JSONStrategy()
expected = ["public_v4", "public_v6"]
subs = json_strategy.get_provider_subnets()
for sub in expected:
self.assertTrue(sub in subs)
self.assertEqual(2, len(subs))
def test_get_network_for_subnet(self):
json_strategy = network_strategy.JSONStrategy()
net = json_strategy.get_network_for_subnet("public_v4")
self.assertEqual("public_network", net)
def test_get_network_for_subnet_matches_none(self):
json_strategy = network_strategy.JSONStrategy()
net = json_strategy.get_network_for_subnet("tenant_v4")
self.assertIsNone(net)
def test_subnet_ids_for_network(self):
json_strategy = network_strategy.JSONStrategy()
expected = ["public_v4", "public_v6"]
subs = json_strategy.subnet_ids_for_network("public_network")
for sub in expected:
self.assertTrue(sub in subs)
self.assertEqual(2, len(subs))
def test_subnet_ids_for_network_matches_none(self):
json_strategy = network_strategy.JSONStrategy()
subs = json_strategy.subnet_ids_for_network("tenant_network")
self.assertIsNone(subs)

View File

@@ -27,7 +27,8 @@ from quark.tests import test_base
class TestUnmanagedDriver(test_base.TestBase): class TestUnmanagedDriver(test_base.TestBase):
def setUp(self): def setUp(self):
super(TestUnmanagedDriver, self).setUp() super(TestUnmanagedDriver, self).setUp()
self.strategy = {"public_network": {"bridge": "xenbr0"}} self.strategy = {"public_network": {"bridge": "xenbr0",
"subnets": ["public"]}}
strategy_json = json.dumps(self.strategy) strategy_json = json.dumps(self.strategy)
self.driver = unmanaged.UnmanagedDriver() self.driver = unmanaged.UnmanagedDriver()
unmanaged.STRATEGY = network_strategy.JSONStrategy(strategy_json) unmanaged.STRATEGY = network_strategy.JSONStrategy(strategy_json)