Ensure default IP policy on subnets

* Added API validation
* Added database migrations

RM7212
This commit is contained in:
Amir Sadoughi
2014-07-26 14:55:33 -05:00
parent 005dd2ccc4
commit c57582bd21
11 changed files with 780 additions and 55 deletions

View File

@@ -0,0 +1,106 @@
"""Ensure default IP policy for subnets with IP policies
Revision ID: 45a07fac3d38
Revises: 2748e48cee3a
Create Date: 2014-07-05 12:49:51.815631
"""
# revision identifiers, used by Alembic.
revision = '45a07fac3d38'
down_revision = '2748e48cee3a'
import logging
from alembic import op
from sqlalchemy.sql import column, select, table
import netaddr
from neutron.openstack.common import timeutils
from neutron.openstack.common import uuidutils
import sqlalchemy as sa
LOG = logging.getLogger("alembic.migration")
def upgrade():
ip_policy = table('quark_ip_policy',
column('id', sa.String(length=36)),
column('tenant_id', sa.String(length=255)),
column('created_at', sa.DateTime()))
ip_policy_cidrs = table('quark_ip_policy_cidrs',
column('id', sa.String(length=36)),
column('created_at', sa.DateTime()),
column('ip_policy_id', sa.String(length=36)),
column('cidr', sa.String(length=64)))
subnets = table('quark_subnets',
column('_cidr', sa.String(length=64)),
column('ip_policy_id', sa.String(length=36)))
connection = op.get_bind()
# 1. Get all ip_policy_cidrs for subnets with an ip_policy.
j = subnets.outerjoin(
ip_policy_cidrs,
subnets.c.ip_policy_id == ip_policy_cidrs.c.ip_policy_id)
q = select([subnets.c.ip_policy_id, subnets.c._cidr,
ip_policy_cidrs.c.id, ip_policy_cidrs.c.cidr]).select_from(
j).where(subnets.c.ip_policy_id != None).order_by( # noqa
subnets.c.ip_policy_id)
data = connection.execute(q).fetchall()
if data is None:
return
# 2. Check ip_policy_cidrs contains default ip policy for subnet.
ipp_to_update = dict()
def _test_change_needed(ipp_id, s, ipp):
if s is None or ipp is None:
return
updated = False
last = netaddr.IPAddress(subnet.broadcast)
first = netaddr.IPAddress(subnet.network)
if last not in ipp:
updated = True
ipp.add(last)
if first not in ipp:
updated = True
ipp.add(first)
if updated:
ipp_to_update[ipp_id] = ipp
prev_ip_policy_id = ''
subnet, ip_policy = None, None
for ip_policy_id, cidr, ippc_id, ippc_cidr in data:
if ip_policy_id != prev_ip_policy_id:
_test_change_needed(prev_ip_policy_id, subnet, ip_policy)
subnet, ip_policy = netaddr.IPNetwork(cidr), netaddr.IPSet()
ip_policy |= netaddr.IPSet([ippc_cidr] if ippc_cidr else [])
prev_ip_policy_id = ip_policy_id
_test_change_needed(prev_ip_policy_id, subnet, ip_policy)
if not ipp_to_update.keys():
return
LOG.info("IP Policy IDs to update: %s", ipp_to_update.keys())
# 3. Delete ip_policy_cidrs for ip_policy_ids to be updated.
connection.execute(ip_policy_cidrs.delete().where(
ip_policy_cidrs.c.ip_policy_id.in_(ipp_to_update.keys())))
# 4. Insert ip_policy_cidrs for ip_policy_ids to be updated.
vals = [dict(id=uuidutils.generate_uuid(),
created_at=timeutils.utcnow(),
ip_policy_id=key,
cidr=str(x.cidr))
for key in ipp_to_update.keys()
for x in ipp_to_update[key].iter_cidrs()]
if not vals:
return
LOG.info("IP Policy CIDR IDs to insert: %s", [v["id"] for v in vals])
connection.execute(ip_policy_cidrs.insert(), *vals)
def downgrade():
raise NotImplementedError()

View File

@@ -0,0 +1,84 @@
"""Ensure default IP policy exists for subnets without IP policies
Revision ID: 552b213c2b8c
Revises: 45a07fac3d38
Create Date: 2014-07-25 15:07:07.418971
"""
# revision identifiers, used by Alembic.
revision = '552b213c2b8c'
down_revision = '45a07fac3d38'
import logging
from quark.plugin_modules import ip_policies
from alembic import op
from sqlalchemy.sql import column, select, table
from neutron.openstack.common import timeutils
from neutron.openstack.common import uuidutils
import sqlalchemy as sa
LOG = logging.getLogger("alembic.migration")
def upgrade():
ip_policy = table('quark_ip_policy',
column('id', sa.String(length=36)),
column('tenant_id', sa.String(length=255)),
column('created_at', sa.DateTime()))
ip_policy_cidrs = table('quark_ip_policy_cidrs',
column('id', sa.String(length=36)),
column('created_at', sa.DateTime()),
column('ip_policy_id', sa.String(length=36)),
column('cidr', sa.String(length=64)))
subnets = table('quark_subnets',
column('id', sa.String(length=36)),
column('_cidr', sa.String(length=64)),
column('tenant_id', sa.String(length=255)),
column('ip_policy_id', sa.String(length=36)))
connection = op.get_bind()
# 1. Find all subnets without ip_policy.
data = connection.execute(select([
subnets.c.id, subnets.c._cidr, subnets.c.tenant_id]).where(
subnets.c.ip_policy_id == None)).fetchall() # noqa
if not data:
return
LOG.info("Subnet IDs without IP policies: %s", [d[0] for d in data])
# 2. Insert ip_policy rows with id.
vals = [dict(id=uuidutils.generate_uuid(),
created_at=timeutils.utcnow(),
tenant_id=tenant_id)
for id, cidr, tenant_id in data]
LOG.info("IP Policy IDs to insert: %s", [v["id"] for v in vals])
connection.execute(ip_policy.insert(), *vals)
# 3. Insert default ip_policy_cidrs for those ip_policy's.
vals2 = []
for ((id, cidr, tenant_id), ip_policy) in zip(data, vals):
cidrs = []
ip_policies.ensure_default_policy(cidrs, [dict(cidr=cidr)])
for cidr in cidrs:
vals2.append(dict(id=uuidutils.generate_uuid(),
created_at=timeutils.utcnow(),
ip_policy_id=ip_policy["id"],
cidr=str(cidr)))
LOG.info("IP Policy CIDR IDs to insert: %s", [v["id"] for v in vals2])
connection.execute(ip_policy_cidrs.insert(), *vals2)
# 4. Set ip_policy_id rows in quark_subnets.
for ((id, cidr, tenant_id), ip_policy) in zip(data, vals):
connection.execute(subnets.update().values(
ip_policy_id=ip_policy["id"]).where(
subnets.c.id == id))
def downgrade():
raise NotImplementedError()

View File

@@ -1 +1 @@
2748e48cee3a
552b213c2b8c

View File

@@ -376,20 +376,9 @@ class IPPolicy(BASEV2, models.HasId, models.HasTenant):
@staticmethod
def get_ip_policy_cidrs(subnet):
ip_policy = subnet["ip_policy"] or {}
subnet_cidr = netaddr.IPNetwork(subnet["cidr"])
network_ip = subnet_cidr.network
broadcast_ip = subnet_cidr.broadcast
prefix_len = '32' if subnet_cidr.version == 4 else '128'
default_policy_cidrs = ["%s/%s" % (network_ip, prefix_len),
"%s/%s" % (broadcast_ip, prefix_len)]
ip_policy_cidrs = []
ip_policies = ip_policy.get("exclude", [])
if ip_policies:
ip_policy_cidrs = [ip_policy_cidr.cidr
for ip_policy_cidr in ip_policies]
ip_policy_cidrs = ip_policy_cidrs + default_policy_cidrs
ip_policy_cidrs = [ip_policy_cidr.cidr
for ip_policy_cidr in ip_policies]
return netaddr.IPSet(ip_policy_cidrs)

View File

@@ -57,6 +57,7 @@ def create_ip_policy(context, ip_policy):
if not subnets:
raise exceptions.SubnetNotFound(id=subnet_ids)
if ip_policy_cidrs:
ensure_default_policy(ip_policy_cidrs, subnets)
_validate_cidrs_fit_into_subnets(ip_policy_cidrs, subnets)
models.extend(subnets)
@@ -68,6 +69,7 @@ def create_ip_policy(context, ip_policy):
subnets = [subnet for net in nets
for subnet in net.get("subnets", [])]
if ip_policy_cidrs and subnets:
ensure_default_policy(ip_policy_cidrs, subnets)
_validate_cidrs_fit_into_subnets(ip_policy_cidrs, subnets)
models.extend(nets)
@@ -121,7 +123,9 @@ def update_ip_policy(context, id, ip_policy):
context, id=subnet_ids, scope=db_api.ALL)
if len(subnets) != len(subnet_ids):
raise exceptions.SubnetNotFound(id=subnet_ids)
# FIXME(asadoughi): cannot update exclude w/o updating subnet_ids
if ip_policy_cidrs:
ensure_default_policy(ip_policy_cidrs, subnets)
_validate_cidrs_fit_into_subnets(ip_policy_cidrs, subnets)
models.extend(subnets)
@@ -135,6 +139,7 @@ def update_ip_policy(context, id, ip_policy):
subnets = [subnet for net in nets
for subnet in net.get("subnets", [])]
if ip_policy_cidrs and subnets:
ensure_default_policy(ip_policy_cidrs, subnets)
_validate_cidrs_fit_into_subnets(ip_policy_cidrs, subnets)
models.extend(nets)
@@ -171,3 +176,17 @@ def _validate_cidrs_fit_into_subnets(cidrs, subnets):
resource="ip_policy",
msg="CIDR %s not in subnet CIDR %s"
% (cidr, subnet_cidr))
def ensure_default_policy(cidrs, subnets):
policy_cidrs = netaddr.IPSet(cidrs)
for subnet in subnets:
subnet_cidr = netaddr.IPNetwork(subnet["cidr"])
network_ip = subnet_cidr.network
broadcast_ip = subnet_cidr.broadcast
prefix_len = '32' if subnet_cidr.version == 4 else '128'
default_policy_cidrs = ["%s/%s" % (network_ip, prefix_len),
"%s/%s" % (broadcast_ip, prefix_len)]
for cidr in default_policy_cidrs:
if netaddr.IPNetwork(cidr) not in policy_cidrs:
cidrs.append(cidr)

View File

@@ -23,9 +23,9 @@ from neutron.openstack.common import timeutils
from oslo.config import cfg
from quark.db import api as db_api
from quark.db import models as models
from quark import exceptions as q_exc
from quark import network_strategy
from quark.plugin_modules import ip_policies
from quark.plugin_modules import routes
from quark import plugin_views as v
from quark import utils
@@ -151,8 +151,6 @@ def _get_exclude_cidrs_from_allocation_pools(subnet_db, allocation_pools):
cidrset -= netaddr.IPSet(netaddr.IPRange(
netaddr.IPAddress(start),
netaddr.IPAddress(end)).cidrs())
default_cidrset = models.IPPolicy.get_ip_policy_cidrs(subnet_db)
cidrset.update(default_cidrset)
cidrs = [str(x.cidr) for x in cidrset.iter_cidrs()]
return cidrs
@@ -232,12 +230,14 @@ def create_subnet(context, subnet):
new_subnet["dns_nameservers"].append(db_api.dns_create(
context, ip=netaddr.IPAddress(dns_ip)))
if isinstance(allocation_pools, list) and allocation_pools:
cidrs = []
if isinstance(allocation_pools, list):
_validate_allocation_pools(allocation_pools, sub_attrs["cidr"])
cidrs = _get_exclude_cidrs_from_allocation_pools(
new_subnet, allocation_pools)
new_subnet["ip_policy"] = db_api.ip_policy_create(context,
exclude=cidrs)
ip_policies.ensure_default_policy(cidrs, [new_subnet])
new_subnet["ip_policy"] = db_api.ip_policy_create(context,
exclude=cidrs)
subnet_dict = v._make_subnet_dict(new_subnet)
subnet_dict["gateway_ip"] = gateway_ip
@@ -314,16 +314,13 @@ def update_subnet(context, id, subnet):
subnet_db["routes"].append(db_api.route_create(
context, cidr=route["destination"], gateway=route["nexthop"]))
if isinstance(allocation_pools, list) and allocation_pools:
if isinstance(allocation_pools, list):
_validate_allocation_pools(allocation_pools, subnet_db["cidr"])
cidrs = _get_exclude_cidrs_from_allocation_pools(
subnet_db, allocation_pools)
if subnet_db["ip_policy"]:
subnet_db["ip_policy"] = db_api.ip_policy_update(
context, subnet_db["ip_policy"], exclude=cidrs)
else:
subnet_db["ip_policy"] = db_api.ip_policy_create(
context, exclude=cidrs)
ip_policies.ensure_default_policy(cidrs, [subnet_db])
subnet_db["ip_policy"] = db_api.ip_policy_update(
context, subnet_db["ip_policy"], exclude=cidrs)
subnet = db_api.subnet_update(context, subnet_db, **s)
return v._make_subnet_dict(subnet)

View File

@@ -20,6 +20,7 @@ from neutron.common import exceptions
from quark import exceptions as quark_exceptions
from quark.plugin_modules import ip_policies as ippol
from quark.tests import test_base
from quark.tests import test_quark_plugin
@@ -190,7 +191,8 @@ class TestQuarkCreateIpPolicies(test_quark_plugin.TestQuarkPlugin):
self.assertEqual(len(resp.keys()), 6)
self.assertEqual(resp["subnet_ids"], [1])
self.assertEqual(resp["network_ids"], [])
self.assertEqual(resp["exclude"], ["::/128"])
self.assertEqual(resp["exclude"],
["::/128", "::ffff:ffff:ffff:ffff/128"])
self.assertEqual(resp["name"], "foo")
self.assertEqual(resp["tenant_id"], 1)
@@ -210,7 +212,7 @@ class TestQuarkCreateIpPolicies(test_quark_plugin.TestQuarkPlugin):
self.assertEqual(len(resp.keys()), 6)
self.assertEqual(resp["subnet_ids"], [1])
self.assertEqual(resp["network_ids"], [])
self.assertEqual(resp["exclude"], ["0.0.0.0/24"])
self.assertEqual(resp["exclude"], ["0.0.0.0/24", "0.0.255.255/32"])
self.assertEqual(resp["name"], "foo")
self.assertEqual(resp["tenant_id"], 1)
@@ -282,14 +284,12 @@ class TestQuarkUpdateIpPolicies(test_quark_plugin.TestQuarkPlugin):
def test_update_ip_policy_networks_not_found(self):
ipp = dict(id=1, networks=[])
with self._stubs(ipp) as (ip_policy_update):
with self._stubs(ipp):
with self.assertRaises(exceptions.NetworkNotFound):
self.plugin.update_ip_policy(
self.context,
1,
dict(ip_policy=dict(network_ids=[100])))
self.fai("HI")
self.assertEqual(ip_policy_update.called, 0)
def test_update_ip_policy_networks(self):
ipp = dict(id=1, networks=[dict()],
@@ -304,6 +304,38 @@ class TestQuarkUpdateIpPolicies(test_quark_plugin.TestQuarkPlugin):
dict(ip_policy=dict(network_ids=[100])))
self.assertEqual(ip_policy_update.called, 1)
def test_update_ip_policy_exclude_v4(self):
subnets = [dict(id=100, cidr="0.0.0.0/16")]
ipp = dict(id=1, subnets=subnets,
exclude=["0.0.0.0/24"],
name="foo", tenant_id=1)
with self._stubs(ipp, subnets=subnets) as (ip_policy_update):
self.plugin.update_ip_policy(
self.context,
1,
dict(ip_policy=dict(subnet_ids=[100], exclude=["0.0.0.1/32"])))
ip_policy_update.assert_called_once_with(
self.context,
ipp,
subnet_ids=[100],
exclude=["0.0.0.1/32", "0.0.0.0/32", "0.0.255.255/32"])
def test_update_ip_policy_exclude_v6(self):
subnets = [dict(id=100, cidr="::/64")]
ipp = dict(id=1, subnets=subnets,
exclude=["::/128"],
name="foo", tenant_id=1)
with self._stubs(ipp, subnets=subnets) as (ip_policy_update):
self.plugin.update_ip_policy(
self.context,
1,
dict(ip_policy=dict(subnet_ids=[100], exclude=["::1/128"])))
ip_policy_update.assert_called_once_with(
self.context,
ipp,
subnet_ids=[100],
exclude=["::1/128", "::/128", "::ffff:ffff:ffff:ffff/128"])
class TestQuarkDeleteIpPolicies(test_quark_plugin.TestQuarkPlugin):
@contextlib.contextmanager
@@ -407,3 +439,70 @@ class TestQuarkValidateCIDRsFitsIntoSubnets(test_quark_plugin.TestQuarkPlugin):
["::/127"],
[dict(id=1, cidr="::/96"),
dict(id=1, cidr="::/128")])
class TestQuarkEnsureDefaultPolicy(test_base.TestBase):
def test_no_cidrs_no_subnets(self):
cidrs = []
subnets = []
self.assertIsNone(ippol.ensure_default_policy(cidrs, subnets))
self.assertEqual(cidrs, [])
self.assertEqual(subnets, [])
def test_no_cidrs_v4(self):
cidrs = []
subnets = [dict(cidr="192.168.10.1/24")]
self.assertIsNone(ippol.ensure_default_policy(cidrs, subnets))
self.assertEqual(cidrs, ["192.168.10.0/32", "192.168.10.255/32"])
self.assertEqual(subnets, [dict(cidr="192.168.10.1/24")])
def test_no_subnets_v4(self):
cidrs = ["192.168.10.0/32", "192.168.10.255/32"]
subnets = []
self.assertIsNone(ippol.ensure_default_policy(cidrs, subnets))
self.assertEqual(cidrs, ["192.168.10.0/32", "192.168.10.255/32"])
self.assertEqual(subnets, [])
def test_cidrs_without_default_cidrs_v4(self):
cidrs = ["192.168.10.20/32", "192.168.10.40/32"]
subnets = [dict(cidr="192.168.10.1/24")]
self.assertIsNone(ippol.ensure_default_policy(cidrs, subnets))
self.assertEqual(cidrs, ["192.168.10.20/32", "192.168.10.40/32",
"192.168.10.0/32", "192.168.10.255/32"])
self.assertEqual(subnets, [dict(cidr="192.168.10.1/24")])
def test_cidrs_with_default_cidrs_v4(self):
cidrs = ["192.168.10.0/32", "192.168.10.255/32"]
subnets = [dict(cidr="192.168.10.1/24")]
self.assertIsNone(ippol.ensure_default_policy(cidrs, subnets))
self.assertEqual(cidrs, ["192.168.10.0/32", "192.168.10.255/32"])
self.assertEqual(subnets, [dict(cidr="192.168.10.1/24")])
def test_no_cidrs_v6(self):
cidrs = []
subnets = [dict(cidr="::/64")]
self.assertIsNone(ippol.ensure_default_policy(cidrs, subnets))
self.assertEqual(cidrs, ["::/128", "::ffff:ffff:ffff:ffff/128"])
self.assertEqual(subnets, [dict(cidr="::/64")])
def test_no_subnets_v6(self):
cidrs = ["::/128", "::ffff:ffff:ffff:ffff/128"]
subnets = []
self.assertIsNone(ippol.ensure_default_policy(cidrs, subnets))
self.assertEqual(cidrs, ["::/128", "::ffff:ffff:ffff:ffff/128"])
self.assertEqual(subnets, [])
def test_cidrs_without_default_cidrs_v6(self):
cidrs = ["::10/128", "::20/128"]
subnets = [dict(cidr="::/64")]
self.assertIsNone(ippol.ensure_default_policy(cidrs, subnets))
self.assertEqual(cidrs, ["::10/128", "::20/128",
"::/128", "::ffff:ffff:ffff:ffff/128"])
self.assertEqual(subnets, [dict(cidr="::/64")])
def test_cidrs_with_default_cidrs_v6(self):
cidrs = ["::/128", "::ffff:ffff:ffff:ffff/128"]
subnets = [dict(cidr="::/64")]
self.assertIsNone(ippol.ensure_default_policy(cidrs, subnets))
self.assertEqual(cidrs, ["::/128", "::ffff:ffff:ffff:ffff/128"])
self.assertEqual(subnets, [dict(cidr="::/64")])

View File

@@ -250,6 +250,18 @@ class TestQuarkCreateSubnetAllocationPools(test_quark_plugin.TestQuarkPlugin):
self.assertEqual(resp["allocation_pools"],
[dict(start="192.168.1.1", end="192.168.1.254")])
def test_create_subnet_allocation_pools_zero_v6(self):
s = dict(subnet=dict(
cidr="2607:f0d0:1002:51::0/64",
network_id=1))
with self._stubs(s["subnet"]) as (subnet_create):
resp = self.plugin.create_subnet(self.context, s)
self.assertEqual(subnet_create.call_count, 1)
self.assertEqual(
resp["allocation_pools"],
[dict(start="2607:f0d0:1002:51::1",
end="2607:f0d0:1002:51:ffff:ffff:ffff:fffe")])
def test_create_subnet_allocation_pools_one(self):
pools = [dict(start="192.168.1.10", end="192.168.1.20")]
s = dict(subnet=dict(
@@ -319,6 +331,7 @@ class TestQuarkCreateSubnetAllocationPools(test_quark_plugin.TestQuarkPlugin):
self.assertEqual(resp["allocation_pools"], pools)
def test_create_subnet_allocation_pools_empty_list(self):
# Empty allocation_pools list yields subnet completely blocked out.
pools = []
s = dict(subnet=dict(
allocation_pools=pools,
@@ -327,8 +340,7 @@ class TestQuarkCreateSubnetAllocationPools(test_quark_plugin.TestQuarkPlugin):
with self._stubs(s["subnet"]) as (subnet_create):
resp = self.plugin.create_subnet(self.context, s)
self.assertEqual(subnet_create.call_count, 1)
expected_pools = [{'start': '192.168.1.1',
'end': '192.168.1.254'}]
expected_pools = []
self.assertEqual(resp["allocation_pools"], expected_pools)
@@ -760,7 +772,10 @@ class TestQuarkUpdateSubnet(test_quark_plugin.TestQuarkPlugin):
dns_ips = subnet.pop("dns_nameservers", [])
host_routes = subnet.pop("host_routes", [])
subnet_mod = models.Subnet()
subnet_mod = models.Subnet(
ip_policy=models.IPPolicy(
exclude=[models.IPPolicyCIDR(cidr="172.16.0.0/32"),
models.IPPolicyCIDR(cidr="172.16.0.255/32")]))
subnet_mod.update(subnet)
subnet_mod["dns_nameservers"] = [models.DNSNameserver(ip=ip)
@@ -949,7 +964,7 @@ class TestQuarkUpdateSubnet(test_quark_plugin.TestQuarkPlugin):
resp = self.plugin.update_subnet(self.context, 1, s)
self.assertEqual(resp["allocation_pools"], pools)
def test_create_subnet_allocation_pools_four(self):
def test_update_subnet_allocation_pools_four(self):
pools = [dict(start="2607:f0d0:1002:51::a",
end="2607:f0d0:1002:51:ffff:ffff:ffff:fffe")]
s = dict(subnet=dict(allocation_pools=pools))
@@ -963,14 +978,14 @@ class TestQuarkUpdateSubnet(test_quark_plugin.TestQuarkPlugin):
self.assertEqual(resp["allocation_pools"], pools)
def test_update_subnet_allocation_pools_empty_list(self):
# Empty allocation_pools list yields subnet completely blocked out.
pools = []
s = dict(subnet=dict(allocation_pools=pools))
with self._stubs(
new_ip_policy=[]
) as (dns_create, route_update, route_create):
resp = self.plugin.update_subnet(self.context, 1, s)
expected_pools = [{'start': '172.16.0.1',
'end': '172.16.0.254'}]
expected_pools = []
self.assertEqual(resp["allocation_pools"], expected_pools)
@@ -1173,6 +1188,8 @@ class TestQuarkCreateSubnetAttrFilters(test_quark_plugin.TestQuarkPlugin):
"next_auto_assign_ip": 10}}
with self._stubs() as (subnet_create, net_find):
subnet_create.return_value = models.Subnet(
cidr=subnet["subnet"]["cidr"])
self.plugin.create_subnet(self.context, subnet)
self.assertEqual(subnet_create.call_count, 1)
subnet_create.assert_called_once_with(
@@ -1192,6 +1209,8 @@ class TestQuarkCreateSubnetAttrFilters(test_quark_plugin.TestQuarkPlugin):
admin_ctx = self.context.elevated()
with self._stubs() as (subnet_create, net_find):
subnet_create.return_value = models.Subnet(
cidr=subnet["subnet"]["cidr"])
self.plugin.create_subnet(admin_ctx, subnet)
self.assertEqual(subnet_create.call_count, 1)
subnet_create.assert_called_once_with(

View File

@@ -28,8 +28,7 @@ class TestDBModels(test_base.TestBase):
cidr="0.0.0.0/24", first_ip=0, last_ip=255,
network=dict(ip_policy=None), ip_policy=None)
ip_policy_rules = models.IPPolicy.get_ip_policy_cidrs(subnet)
self.assertEqual(ip_policy_rules,
IPSet(['0.0.0.0/32', '0.0.0.255/32']))
self.assertEqual(ip_policy_rules, IPSet())
def test_get_ip_policy_cidrs_v6(self):
subnet = dict(id=1, ip_version=6, next_auto_assign_ip=0,
@@ -38,7 +37,4 @@ class TestDBModels(test_base.TestBase):
last_ip=337623910929368631717566993311207522303L,
network=dict(ip_policy=None), ip_policy=None)
ip_policy_rules = models.IPPolicy.get_ip_policy_cidrs(subnet)
self.assertEqual(
ip_policy_rules,
IPSet(["fc00::/128",
"fdff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/128"]))
self.assertEqual(ip_policy_rules, IPSet())

View File

@@ -730,7 +730,7 @@ class QuarkIpamTestBothRequiredIpAllocation(QuarkIpamBaseTest):
self.assertEqual(address[1]["version"], 6)
def test_allocate_allocate_ip_unsatisfied_strategy_fails(self):
old_override = cfg.CONF.QUARK.v6_allocation_attempts
old_override = cfg.CONF.QUARK.ip_address_retry_max
cfg.CONF.set_override('ip_address_retry_max', 1, 'QUARK')
subnet4 = dict(id=1, first_ip=0, last_ip=255,
@@ -740,7 +740,10 @@ class QuarkIpamTestBothRequiredIpAllocation(QuarkIpamBaseTest):
subnet6 = dict(id=1, first_ip=self.v6_fip.value,
last_ip=self.v6_lip.value, cidr="feed::/104",
ip_version=6, next_auto_assign_ip=-2,
network=dict(ip_policy=None), ip_policy=None)
network=dict(ip_policy=None),
ip_policy=dict(exclude=[
models.IPPolicyCIDR(cidr="feed::/128"),
models.IPPolicyCIDR(cidr="feed::ff:ffff/128")]))
with self._stubs(subnets=[[(subnet4, 0)], [(subnet6, 0)]],
addresses=[None, None, None, None]):
@@ -932,7 +935,8 @@ class QuarkNewIPAddressAllocation(QuarkIpamBaseTest):
subnet = dict(id=1, first_ip=0, last_ip=255,
cidr="0.0.0.0/24", ip_version=4,
next_auto_assign_ip=0, network=dict(ip_policy=None),
ip_policy=None)
ip_policy=dict(exclude=[
models.IPPolicyCIDR(cidr="0.0.0.0/32")]))
with self._stubs(subnets=[(subnet, 0)], addresses=[None, None]):
address = []
self.ipam.allocate_ip_address(self.context, address, 0, 0, 0,
@@ -944,11 +948,13 @@ class QuarkNewIPAddressAllocation(QuarkIpamBaseTest):
subnet1 = dict(id=1, first_ip=0, last_ip=0,
cidr="0.0.0.0/32", ip_version=4,
next_auto_assign_ip=0, network=dict(ip_policy=None),
ip_policy=None)
ip_policy=dict(exclude=[
models.IPPolicyCIDR(cidr="0.0.0.0/32")]))
subnet2 = dict(id=2, first_ip=256, last_ip=512,
cidr="0.0.1.0/24", ip_version=4,
next_auto_assign_ip=256, network=dict(ip_policy=None),
ip_policy=None)
ip_policy=dict(exclude=[
models.IPPolicyCIDR(cidr="0.0.1.0/32")]))
subnets = [(subnet1, 1), (subnet2, 0)]
with self._stubs(subnets=subnets, addresses=[None, None]):
address = []
@@ -963,9 +969,11 @@ class QuarkNewIPAddressAllocation(QuarkIpamBaseTest):
self.ipam.allocate_ip_address(self.context, [], 0, 0, 0)
def test_allocate_ip_no_available_subnet_fails(self):
subnet1 = dict(id=1, first_ip=0, last_ip=0,
subnet1 = dict(id=1, first_ip=0, last_ip=0, next_auto_assign_ip=0,
cidr="0.0.0.0/32", ip_version=4,
network=dict(ip_policy=None), ip_policy=None)
network=dict(ip_policy=None),
ip_policy=dict(exclude=[
models.IPPolicyCIDR(cidr="0.0.0.0/32")]))
with self._stubs(subnets=[(subnet1, 1)]):
with self.assertRaises(exceptions.IpAddressGenerationFailure):
self.ipam.allocate_ip_address(self.context, [], 0, 0, 0)
@@ -974,11 +982,13 @@ class QuarkNewIPAddressAllocation(QuarkIpamBaseTest):
subnet1 = dict(id=1, first_ip=0, last_ip=255,
cidr="0.0.0.0/24", ip_version=4,
next_auto_assign_ip=0, network=dict(ip_policy=None),
ip_policy=None)
ip_policy=dict(exclude=[
models.IPPolicyCIDR(cidr="0.0.0.0/32")]))
subnet2 = dict(id=2, first_ip=256, last_ip=510,
cidr="0.0.1.0/24", ip_version=4,
next_auto_assign_ip=0, network=dict(ip_policy=None),
ip_policy=None)
ip_policy=dict(exclude=[
models.IPPolicyCIDR(cidr="0.0.1.0/32")]))
subnets = [(subnet1, 1), (subnet2, 1)]
with self._stubs(subnets=subnets, addresses=[None, None]):
address = []
@@ -1201,7 +1211,8 @@ class TestQuarkIpPoliciesIpAllocation(QuarkIpamBaseTest):
cidr="192.168.0.0/24", ip_version=4,
next_auto_assign_ip=first,
network=dict(ip_policy=None),
ip_policy=None)
ip_policy=dict(exclude=[
models.IPPolicyCIDR(cidr="192.168.0.0/32")]))
with self._stubs(subnets=[(subnet, 0)], addresses=[None, None]):
address = []
self.ipam.allocate_ip_address(self.context, address, 0, 0, 0,
@@ -1220,6 +1231,8 @@ class TestQuarkIpPoliciesIpAllocation(QuarkIpamBaseTest):
version=4)
def test_ip_policy_on_subnet(self):
old_override = cfg.CONF.QUARK.ip_address_retry_max
cfg.CONF.set_override('ip_address_retry_max', 3, 'QUARK')
subnet = dict(id=1, first_ip=0, last_ip=255,
cidr="0.0.0.0/24", ip_version=4,
next_auto_assign_ip=0, network=dict(ip_policy=None),
@@ -1231,6 +1244,7 @@ class TestQuarkIpPoliciesIpAllocation(QuarkIpamBaseTest):
version=4)
self.assertEqual(address[0]["address"],
netaddr.IPAddress("::ffff:0.0.0.2").value)
cfg.CONF.set_override('ip_address_retry_max', old_override, 'QUARK')
def test_ip_policy_on_both_subnet_preferred(self):
net = dict(ip_policy=dict(exclude=[
@@ -1239,7 +1253,7 @@ class TestQuarkIpPoliciesIpAllocation(QuarkIpamBaseTest):
cidr="0.0.0.0/24", ip_version=4,
next_auto_assign_ip=0, network=net,
ip_policy=dict(exclude=[
models.IPPolicyCIDR(cidr="0.0.0.254/31")]))
models.IPPolicyCIDR(cidr="0.0.0.0/32")]))
with self._stubs(subnets=[(subnet, 0)], addresses=[None, None]):
address = []
self.ipam.allocate_ip_address(self.context, address, 0, 0, 0,

View File

@@ -219,3 +219,405 @@ class Test2748e48cee3a(BaseMigrationTest):
alembic_command.upgrade(self.config, '2748e48cee3a')
with self.assertRaises(NotImplementedError):
alembic_command.downgrade(self.config, '1284c81cf727')
class Test45a07fac3d38(BaseMigrationTest):
def setUp(self):
super(Test45a07fac3d38, self).setUp()
alembic_command.upgrade(self.config, '2748e48cee3a')
self.ip_policy_cidrs = table(
'quark_ip_policy_cidrs',
column('id', sa.String(length=36)),
column('created_at', sa.DateTime()),
column('ip_policy_id', sa.String(length=36)),
column('cidr', sa.String(length=64)))
self.subnets = table(
'quark_subnets',
column('id', sa.String(length=36)),
column('_cidr', sa.String(length=64)),
column('ip_policy_id', sa.String(length=36)))
def test_upgrade_no_subnets_no_ip_policy_cidrs(self):
alembic_command.upgrade(self.config, '45a07fac3d38')
results = self.connection.execute(
select([self.ip_policy_cidrs])).fetchall()
self.assertEqual(len(results), 0)
def test_upgrade_with_subnets_no_ip_policy(self):
self.connection.execute(
self.subnets.insert(),
dict(id="000", _cidr="192.168.10.0/24", ip_policy_id=None))
alembic_command.upgrade(self.config, '45a07fac3d38')
results = self.connection.execute(
select([self.ip_policy_cidrs])).fetchall()
self.assertEqual(len(results), 0)
def test_upgrade_with_subnets_no_ip_policy_cidrs(self):
self.connection.execute(
self.subnets.insert(),
dict(id="000", _cidr="192.168.10.0/24", ip_policy_id="111"))
alembic_command.upgrade(self.config, '45a07fac3d38')
results = self.connection.execute(
select([self.ip_policy_cidrs])).fetchall()
self.assertEqual(len(results), 2)
default_cidrs = ["192.168.10.0/32", "192.168.10.255/32"]
self.assertIn(results[0]["cidr"], default_cidrs)
self.assertIn(results[1]["cidr"], default_cidrs)
self.assertNotEqual(results[0]["cidr"], results[1]["cidr"])
def test_upgrade_with_subnets_non_default_ip_policy_cidrs(self):
self.connection.execute(
self.subnets.insert(),
dict(id="000", _cidr="192.168.10.0/24", ip_policy_id="111"))
self.connection.execute(
self.ip_policy_cidrs.insert(),
dict(id="222", created_at=datetime.date(1970, 1, 1),
ip_policy_id="111", cidr="192.168.10.13/32"))
with contextlib.nested(
mock.patch("neutron.openstack.common.timeutils"),
mock.patch("neutron.openstack.common.uuidutils")
) as (tu, uuid):
uuid.generate_uuid.side_effect = (1, 2, 3)
tu.utcnow.return_value = datetime.datetime(1970, 1, 1)
alembic_command.upgrade(self.config, '45a07fac3d38')
results = self.connection.execute(
select([self.ip_policy_cidrs])).fetchall()
self.assertEqual(len(results), 3)
default_cidrs = ["192.168.10.0/32", "192.168.10.255/32",
"192.168.10.13/32"]
for result in results:
self.assertIn(result["cidr"], default_cidrs)
self.assertGreaterEqual(int(result["id"]), 1)
self.assertLessEqual(int(result["id"]), 3)
self.assertEqual(result["created_at"], tu.utcnow.return_value)
self.assertNotEqual(results[0]["cidr"], results[1]["cidr"])
self.assertNotEqual(results[0]["cidr"], results[2]["cidr"])
self.assertNotEqual(results[1]["cidr"], results[2]["cidr"])
def test_upgrade_with_subnets_non_default_ip_policy_cidrs_v6(self):
self.connection.execute(
self.subnets.insert(),
dict(id="000", _cidr="fd00::/64", ip_policy_id="111"))
self.connection.execute(
self.ip_policy_cidrs.insert(),
dict(id="222", created_at=datetime.date(1970, 1, 1),
ip_policy_id="111", cidr="fd00::3/128"))
with contextlib.nested(
mock.patch("neutron.openstack.common.timeutils"),
mock.patch("neutron.openstack.common.uuidutils")
) as (tu, uuid):
uuid.generate_uuid.side_effect = (1, 2, 3)
tu.utcnow.return_value = datetime.datetime(1970, 1, 1)
alembic_command.upgrade(self.config, '45a07fac3d38')
results = self.connection.execute(
select([self.ip_policy_cidrs])).fetchall()
self.assertEqual(len(results), 3)
default_cidrs = ["fd00::/128", "fd00::3/128",
"fd00::ffff:ffff:ffff:ffff/128"]
for result in results:
self.assertIn(result["cidr"], default_cidrs)
self.assertGreaterEqual(int(result["id"]), 1)
self.assertLessEqual(int(result["id"]), 3)
self.assertEqual(result["created_at"], tu.utcnow.return_value)
self.assertNotEqual(results[0]["cidr"], results[1]["cidr"])
self.assertNotEqual(results[0]["cidr"], results[2]["cidr"])
self.assertNotEqual(results[1]["cidr"], results[2]["cidr"])
def test_upgrade_with_subnets_default_ip_policy_cidrs(self):
self.connection.execute(
self.subnets.insert(),
dict(id="000", _cidr="192.168.10.0/24", ip_policy_id="111"))
dt = datetime.datetime(1970, 1, 1)
self.connection.execute(
self.ip_policy_cidrs.insert(),
dict(id="222", created_at=dt,
ip_policy_id="111", cidr="192.168.10.0/32"),
dict(id="223", created_at=dt,
ip_policy_id="111", cidr="192.168.10.255/32"))
alembic_command.upgrade(self.config, '45a07fac3d38')
results = self.connection.execute(
select([self.ip_policy_cidrs])).fetchall()
self.assertEqual(len(results), 2)
default_cidrs = ["192.168.10.0/32", "192.168.10.255/32"]
self.assertIn(results[0]["cidr"], default_cidrs)
self.assertIn(results[1]["cidr"], default_cidrs)
self.assertTrue(results[0]["id"] == "222" or results[0]["id"] == "223")
self.assertTrue(results[1]["id"] == "222" or results[1]["id"] == "223")
self.assertEqual(results[0]["created_at"], dt)
self.assertEqual(results[1]["created_at"], dt)
def test_upgrade_bulk(self):
self.connection.execute(
self.subnets.insert(),
dict(id="000", _cidr="192.168.10.0/24", ip_policy_id=None),
dict(id="001", _cidr="192.168.10.0/24", ip_policy_id="111"),
dict(id="002", _cidr="192.168.10.0/24", ip_policy_id="112"),
dict(id="003", _cidr="192.168.10.0/24", ip_policy_id="113"))
dt = datetime.datetime(1970, 1, 1)
self.connection.execute(
self.ip_policy_cidrs.insert(),
dict(id="221", created_at=dt,
ip_policy_id="112", cidr="192.168.10.13/32"),
dict(id="222", created_at=dt,
ip_policy_id="113", cidr="192.168.10.0/32"),
dict(id="223", created_at=dt,
ip_policy_id="113", cidr="192.168.10.255/32"))
alembic_command.upgrade(self.config, '45a07fac3d38')
results = self.connection.execute(
select([self.ip_policy_cidrs]).where(
self.ip_policy_cidrs.c.ip_policy_id == None)).fetchall() # noqa
self.assertEqual(len(results), 0)
results = self.connection.execute(
select([self.ip_policy_cidrs]).where(
self.ip_policy_cidrs.c.ip_policy_id == "111")).fetchall()
self.assertEqual(len(results), 2)
default_cidrs = ["192.168.10.0/32", "192.168.10.255/32"]
self.assertIn(results[0]["cidr"], default_cidrs)
self.assertIn(results[1]["cidr"], default_cidrs)
self.assertNotEqual(results[0]["cidr"], results[1]["cidr"])
results = self.connection.execute(
select([self.ip_policy_cidrs]).where(
self.ip_policy_cidrs.c.ip_policy_id == "112")).fetchall()
self.assertEqual(len(results), 3)
default_cidrs = ["192.168.10.0/32", "192.168.10.255/32",
"192.168.10.13/32"]
for result in results:
self.assertIn(result["cidr"], default_cidrs)
self.assertNotEqual(results[0]["cidr"], results[1]["cidr"])
self.assertNotEqual(results[0]["cidr"], results[2]["cidr"])
self.assertNotEqual(results[1]["cidr"], results[2]["cidr"])
results = self.connection.execute(
select([self.ip_policy_cidrs]).where(
self.ip_policy_cidrs.c.ip_policy_id == "113")).fetchall()
self.assertEqual(len(results), 2)
default_cidrs = ["192.168.10.0/32", "192.168.10.255/32"]
self.assertIn(results[0]["cidr"], default_cidrs)
self.assertIn(results[1]["cidr"], default_cidrs)
self.assertTrue(results[0]["id"] == "222" or results[0]["id"] == "223")
self.assertTrue(results[1]["id"] == "222" or results[1]["id"] == "223")
self.assertEqual(results[0]["created_at"], dt)
self.assertEqual(results[1]["created_at"], dt)
def test_downgrade(self):
alembic_command.upgrade(self.config, '45a07fac3d38')
with self.assertRaises(NotImplementedError):
alembic_command.downgrade(self.config, '2748e48cee3a')
class Test552b213c2b8c(BaseMigrationTest):
def setUp(self):
super(Test552b213c2b8c, self).setUp()
alembic_command.upgrade(self.config, '45a07fac3d38')
self.ip_policy = table(
'quark_ip_policy',
column('id', sa.String(length=36)),
column('tenant_id', sa.String(length=255)),
column('created_at', sa.DateTime()))
self.ip_policy_cidrs = table(
'quark_ip_policy_cidrs',
column('id', sa.String(length=36)),
column('created_at', sa.DateTime()),
column('ip_policy_id', sa.String(length=36)),
column('cidr', sa.String(length=64)))
self.subnets = table(
'quark_subnets',
column('id', sa.String(length=36)),
column('tenant_id', sa.String(length=255)),
column('_cidr', sa.String(length=64)),
column('ip_policy_id', sa.String(length=36)))
def test_upgrade_no_subnets(self):
alembic_command.upgrade(self.config, '552b213c2b8c')
results = self.connection.execute(
select([self.ip_policy_cidrs])).fetchall()
self.assertEqual(len(results), 0)
def test_upgrade_subnets_with_ip_policy(self):
dt = datetime.datetime(1970, 1, 1)
self.connection.execute(
self.subnets.insert(),
dict(id="000", tenant_id="foo", _cidr="192.168.10.0/24",
ip_policy_id="111"))
self.connection.execute(
self.ip_policy.insert(),
dict(id="111", tenant_id="foo", created_at=dt))
self.connection.execute(
self.ip_policy_cidrs.insert(),
dict(id="221", created_at=dt,
ip_policy_id="111", cidr="192.168.10.13/32"))
alembic_command.upgrade(self.config, '552b213c2b8c')
results = self.connection.execute(
select([self.ip_policy])).fetchall()
self.assertEqual(len(results), 1)
result = results[0]
self.assertEqual(result["id"], "111")
self.assertEqual(result["tenant_id"], "foo")
self.assertEqual(result["created_at"], dt)
results = self.connection.execute(
select([self.ip_policy_cidrs])).fetchall()
self.assertEqual(len(results), 1)
result = results[0]
self.assertEqual(result["id"], "221")
self.assertEqual(result["created_at"], dt)
self.assertEqual(result["ip_policy_id"], "111")
self.assertEqual(result["cidr"], "192.168.10.13/32")
results = self.connection.execute(
select([self.subnets])).fetchall()
self.assertEqual(len(results), 1)
self.assertEqual(results[0]["ip_policy_id"], "111")
def test_upgrade_subnets_no_ip_policy(self):
self.connection.execute(
self.subnets.insert(),
dict(id="000", tenant_id="foo", _cidr="192.168.10.0/24",
ip_policy_id=None))
with contextlib.nested(
mock.patch("neutron.openstack.common.timeutils"),
mock.patch("neutron.openstack.common.uuidutils")
) as (tu, uuid):
dt = datetime.datetime(1970, 1, 1)
tu.utcnow.return_value = dt
uuid.generate_uuid.side_effect = ("666", "667", "668")
alembic_command.upgrade(self.config, '552b213c2b8c')
results = self.connection.execute(
select([self.ip_policy])).fetchall()
self.assertEqual(len(results), 1)
result = results[0]
self.assertEqual(result["id"], "666")
self.assertEqual(result["tenant_id"], "foo")
self.assertEqual(result["created_at"], dt)
results = self.connection.execute(
select([self.ip_policy_cidrs])).fetchall()
self.assertEqual(len(results), 2)
for result in results:
self.assertIn(result["id"], ("667", "668"))
self.assertEqual(result["created_at"], dt)
self.assertEqual(result["ip_policy_id"], "666")
self.assertIn(result["cidr"],
("192.168.10.0/32", "192.168.10.255/32"))
self.assertNotEqual(results[0]["cidr"], results[1]["cidr"])
results = self.connection.execute(
select([self.subnets])).fetchall()
self.assertEqual(len(results), 1)
self.assertEqual(results[0]["ip_policy_id"], "666")
def test_upgrade_subnets_no_ip_policy_v6(self):
self.connection.execute(
self.subnets.insert(),
dict(id="000", tenant_id="foo", _cidr="fd00::/64",
ip_policy_id=None))
with contextlib.nested(
mock.patch("neutron.openstack.common.timeutils"),
mock.patch("neutron.openstack.common.uuidutils")
) as (tu, uuid):
dt = datetime.datetime(1970, 1, 1)
tu.utcnow.return_value = dt
uuid.generate_uuid.side_effect = ("666", "667", "668")
alembic_command.upgrade(self.config, '552b213c2b8c')
results = self.connection.execute(
select([self.ip_policy])).fetchall()
self.assertEqual(len(results), 1)
result = results[0]
self.assertEqual(result["id"], "666")
self.assertEqual(result["tenant_id"], "foo")
self.assertEqual(result["created_at"], dt)
results = self.connection.execute(
select([self.ip_policy_cidrs])).fetchall()
self.assertEqual(len(results), 2)
for result in results:
self.assertIn(result["id"], ("667", "668"))
self.assertEqual(result["created_at"], dt)
self.assertEqual(result["ip_policy_id"], "666")
self.assertIn(result["cidr"],
("fd00::/128",
"fd00::ffff:ffff:ffff:ffff/128"))
self.assertNotEqual(results[0]["cidr"], results[1]["cidr"])
results = self.connection.execute(
select([self.subnets])).fetchall()
self.assertEqual(len(results), 1)
self.assertEqual(results[0]["ip_policy_id"], "666")
def test_upgrade_bulk(self):
dt = datetime.datetime(1970, 1, 1)
self.connection.execute(
self.subnets.insert(),
dict(id="000", tenant_id="foo", _cidr="192.168.10.0/24",
ip_policy_id="111"),
dict(id="001", tenant_id="foo", _cidr="192.168.10.0/24",
ip_policy_id=None),
dict(id="002", tenant_id="foo", _cidr="fd00::/64",
ip_policy_id=None))
self.connection.execute(
self.ip_policy.insert(),
dict(id="111", tenant_id="foo", created_at=dt))
self.connection.execute(
self.ip_policy_cidrs.insert(),
dict(id="221", created_at=dt,
ip_policy_id="111", cidr="192.168.10.13/32"))
with contextlib.nested(
mock.patch("neutron.openstack.common.timeutils"),
mock.patch("neutron.openstack.common.uuidutils")
) as (tu, uuid):
tu.utcnow.return_value = dt
uuid.generate_uuid.side_effect = ("5", "6", "7", "8", "9", "10")
alembic_command.upgrade(self.config, '552b213c2b8c')
results = self.connection.execute(
select([self.ip_policy]).where(
self.ip_policy.c.id == "111")).fetchall()
self.assertEqual(len(results), 1)
result = results[0]
self.assertEqual(result["id"], "111")
self.assertEqual(result["tenant_id"], "foo")
self.assertEqual(result["created_at"], dt)
results = self.connection.execute(
select([self.ip_policy_cidrs]).where(
self.ip_policy_cidrs.c.ip_policy_id == "111")).fetchall()
self.assertEqual(len(results), 1)
result = results[0]
self.assertEqual(result["id"], "221")
self.assertEqual(result["created_at"], dt)
self.assertEqual(result["ip_policy_id"], "111")
self.assertEqual(result["cidr"], "192.168.10.13/32")
results = self.connection.execute(
select([self.subnets]).where(
self.subnets.c.ip_policy_id == "111")).fetchall()
self.assertEqual(len(results), 1)
self.assertEqual(results[0]["ip_policy_id"], "111")
results = self.connection.execute(
select([self.ip_policy]).where(
self.ip_policy.c.id != "111")).fetchall()
self.assertEqual(len(results), 2)
for result in results:
self.assertIn(int(result["id"]), range(5, 11))
self.assertEqual(result["tenant_id"], "foo")
self.assertEqual(result["created_at"], dt)
results = self.connection.execute(
select([self.ip_policy_cidrs]).where(
self.ip_policy_cidrs.c.ip_policy_id != "111")).fetchall()
self.assertEqual(len(results), 4)
for result in results:
self.assertIn(int(result["id"]), range(5, 11))
self.assertEqual(result["created_at"], dt)
self.assertIn(int(result["ip_policy_id"]), range(5, 11))
self.assertIn(result["cidr"], (
"192.168.10.0/32", "192.168.10.255/32",
"fd00::/128", "fd00::ffff:ffff:ffff:ffff/128"))
results = self.connection.execute(
select([self.subnets]).where(
self.subnets.c.ip_policy_id != "111")).fetchall()
self.assertEqual(len(results), 2)
for subnet in results:
self.assertIn(int(subnet["ip_policy_id"]), range(5, 11))
def test_downgrade(self):
alembic_command.upgrade(self.config, '552b213c2b8c')
with self.assertRaises(NotImplementedError):
alembic_command.downgrade(self.config, '45a07fac3d38')