Truncate IPPolicyCIDR's outside of Subnet._cidr
RM7210
This commit is contained in:
@@ -20,8 +20,9 @@ logging_config.fileConfig(config.config_file_name)
|
||||
# for 'autogenerate' support
|
||||
target_metadata = models.BASEV2.metadata
|
||||
# FIXME: https://bitbucket.org/zzzeek/alembic/issue/38
|
||||
table_names = set([tbl.name for tbl in target_metadata.sorted_tables])
|
||||
for t in quota_driver.quota_db.Quota.metadata.tables.values():
|
||||
if t.name == "quotas":
|
||||
if t.name == "quotas" and t.name not in table_names:
|
||||
t.tometadata(target_metadata)
|
||||
|
||||
# other values from the config, defined by the needs of env.py,
|
||||
|
||||
@@ -0,0 +1,94 @@
|
||||
"""Truncate IPPolicyCIDR outside of Subnet._cidr
|
||||
|
||||
Revision ID: 2748e48cee3a
|
||||
Revises: 4358d1b8cc75
|
||||
Create Date: 2014-06-27 05:46:09.430767
|
||||
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '2748e48cee3a'
|
||||
down_revision = '1284c81cf727'
|
||||
|
||||
import logging
|
||||
|
||||
from alembic import op
|
||||
from sqlalchemy.sql import column, select, table
|
||||
import netaddr
|
||||
from neutron.openstack.common import timeutils
|
||||
from neutron.openstack.common import uuidutils
|
||||
import sqlalchemy as sa
|
||||
|
||||
LOG = logging.getLogger("alembic.migration")
|
||||
|
||||
|
||||
def upgrade():
|
||||
ip_policy_cidrs = table('quark_ip_policy_cidrs',
|
||||
column('id', sa.String(length=36)),
|
||||
column('created_at', sa.DateTime()),
|
||||
column('ip_policy_id', sa.String(length=36)),
|
||||
column('cidr', sa.String(length=64)))
|
||||
subnets = table('quark_subnets',
|
||||
column('_cidr', sa.String(length=64)),
|
||||
column('ip_policy_id', sa.String(length=36)))
|
||||
|
||||
connection = op.get_bind()
|
||||
|
||||
# 1. Find `quark_ip_policy_cidrs` rows.
|
||||
data = connection.execute(select([
|
||||
subnets.c.ip_policy_id, subnets.c._cidr,
|
||||
ip_policy_cidrs.c.id, ip_policy_cidrs.c.cidr]).where(
|
||||
subnets.c.ip_policy_id == ip_policy_cidrs.c.ip_policy_id).order_by(
|
||||
subnets.c.ip_policy_id)).fetchall()
|
||||
if data is None:
|
||||
return
|
||||
|
||||
# 2. Accumulate with `quark_ip_policy_cidrs` rows are outside of the
|
||||
# subnet's cidr.
|
||||
ipp_to_update = dict()
|
||||
|
||||
def _test_change_needed(ipp_id, s, ipp):
|
||||
if s is None or ipp is None:
|
||||
return
|
||||
diff = ipp - s
|
||||
if diff.size > 0:
|
||||
ipp_to_update[ipp_id] = ipp & s
|
||||
|
||||
prev_ip_policy_id = ''
|
||||
subnet, ip_policy = None, None
|
||||
for ip_policy_id, cidr, ippc_id, ippc_cidr in data:
|
||||
if ip_policy_id != prev_ip_policy_id:
|
||||
_test_change_needed(prev_ip_policy_id, subnet, ip_policy)
|
||||
subnet, ip_policy = netaddr.IPSet([cidr]), netaddr.IPSet()
|
||||
ip_policy |= netaddr.IPSet([ippc_cidr])
|
||||
prev_ip_policy_id = ip_policy_id
|
||||
_test_change_needed(prev_ip_policy_id, subnet, ip_policy)
|
||||
|
||||
if not ipp_to_update.keys():
|
||||
return
|
||||
|
||||
LOG.info("IP Policy IDs to update: %s", ipp_to_update.keys())
|
||||
|
||||
# 3. Delete `quark_ip_policy_cidrs` rows that need to be replaced with rows
|
||||
# that are inside of the subnet's cidr.
|
||||
connection.execute(ip_policy_cidrs.delete().where(
|
||||
ip_policy_cidrs.c.ip_policy_id.in_(ipp_to_update.keys())))
|
||||
|
||||
# 4. Insert `quark_ip_policy_cidrs` rows with cidrs that are inside the
|
||||
# subnet's cidr.
|
||||
vals = [dict(id=uuidutils.generate_uuid(),
|
||||
created_at=timeutils.utcnow(),
|
||||
ip_policy_id=key,
|
||||
cidr=str(x.cidr))
|
||||
for key in ipp_to_update.keys()
|
||||
for x in ipp_to_update[key].iter_cidrs()]
|
||||
if not vals:
|
||||
return
|
||||
|
||||
LOG.info("IP Policy CIDR IDs to insert: %s", [v["id"] for v in vals])
|
||||
|
||||
connection.execute(ip_policy_cidrs.insert(), *vals)
|
||||
|
||||
|
||||
def downgrade():
|
||||
raise NotImplementedError()
|
||||
@@ -1 +1 @@
|
||||
1284c81cf727
|
||||
2748e48cee3a
|
||||
|
||||
@@ -390,10 +390,7 @@ class IPPolicy(BASEV2, models.HasId, models.HasTenant):
|
||||
for ip_policy_cidr in ip_policies]
|
||||
|
||||
ip_policy_cidrs = ip_policy_cidrs + default_policy_cidrs
|
||||
|
||||
ip_set = netaddr.IPSet(ip_policy_cidrs)
|
||||
|
||||
return ip_set & netaddr.IPSet([subnet_cidr])
|
||||
return netaddr.IPSet(ip_policy_cidrs)
|
||||
|
||||
|
||||
class IPPolicyCIDR(BASEV2, models.HasId):
|
||||
|
||||
@@ -70,6 +70,75 @@ def _validate_subnet_cidr(context, network_id, new_subnet_cidr):
|
||||
raise exceptions.InvalidInput(error_message=err_msg)
|
||||
|
||||
|
||||
# Note(asadoughi): Copied from neutron/db/db_base_plugin_v2.py
|
||||
def _validate_allocation_pools(ip_pools, subnet_cidr):
|
||||
"""Validate IP allocation pools.
|
||||
|
||||
Verify start and end address for each allocation pool are valid,
|
||||
ie: constituted by valid and appropriately ordered IP addresses.
|
||||
Also, verify pools do not overlap among themselves.
|
||||
Finally, verify that each range fall within the subnet's CIDR.
|
||||
"""
|
||||
subnet = netaddr.IPNetwork(subnet_cidr)
|
||||
subnet_first_ip = netaddr.IPAddress(subnet.first + 1)
|
||||
subnet_last_ip = netaddr.IPAddress(subnet.last - 1)
|
||||
|
||||
LOG.debug(_("Performing IP validity checks on allocation pools"))
|
||||
ip_sets = []
|
||||
for ip_pool in ip_pools:
|
||||
try:
|
||||
start_ip = netaddr.IPAddress(ip_pool['start'])
|
||||
end_ip = netaddr.IPAddress(ip_pool['end'])
|
||||
except netaddr.AddrFormatError:
|
||||
LOG.info(_("Found invalid IP address in pool: "
|
||||
"%(start)s - %(end)s:"),
|
||||
{'start': ip_pool['start'],
|
||||
'end': ip_pool['end']})
|
||||
raise exceptions.InvalidAllocationPool(pool=ip_pool)
|
||||
if (start_ip.version != subnet.version or
|
||||
end_ip.version != subnet.version):
|
||||
LOG.info(_("Specified IP addresses do not match "
|
||||
"the subnet IP version"))
|
||||
raise exceptions.InvalidAllocationPool(pool=ip_pool)
|
||||
if end_ip < start_ip:
|
||||
LOG.info(_("Start IP (%(start)s) is greater than end IP "
|
||||
"(%(end)s)"),
|
||||
{'start': ip_pool['start'], 'end': ip_pool['end']})
|
||||
raise exceptions.InvalidAllocationPool(pool=ip_pool)
|
||||
if start_ip < subnet_first_ip or end_ip > subnet_last_ip:
|
||||
LOG.info(_("Found pool larger than subnet "
|
||||
"CIDR:%(start)s - %(end)s"),
|
||||
{'start': ip_pool['start'],
|
||||
'end': ip_pool['end']})
|
||||
raise exceptions.OutOfBoundsAllocationPool(
|
||||
pool=ip_pool,
|
||||
subnet_cidr=subnet_cidr)
|
||||
# Valid allocation pool
|
||||
# Create an IPSet for it for easily verifying overlaps
|
||||
ip_sets.append(netaddr.IPSet(netaddr.IPRange(
|
||||
ip_pool['start'],
|
||||
ip_pool['end']).cidrs()))
|
||||
|
||||
LOG.debug(_("Checking for overlaps among allocation pools "
|
||||
"and gateway ip"))
|
||||
ip_ranges = ip_pools[:]
|
||||
|
||||
# Use integer cursors as an efficient way for implementing
|
||||
# comparison and avoiding comparing the same pair twice
|
||||
for l_cursor in range(len(ip_sets)):
|
||||
for r_cursor in range(l_cursor + 1, len(ip_sets)):
|
||||
if ip_sets[l_cursor] & ip_sets[r_cursor]:
|
||||
l_range = ip_ranges[l_cursor]
|
||||
r_range = ip_ranges[r_cursor]
|
||||
LOG.info(_("Found overlapping ranges: %(l_range)s and "
|
||||
"%(r_range)s"),
|
||||
{'l_range': l_range, 'r_range': r_range})
|
||||
raise exceptions.OverlappingAllocationPools(
|
||||
pool_1=l_range,
|
||||
pool_2=r_range,
|
||||
subnet_cidr=subnet_cidr)
|
||||
|
||||
|
||||
def _get_exclude_cidrs_from_allocation_pools(subnet_db, allocation_pools):
|
||||
subnet_net = netaddr.IPNetwork(subnet_db["cidr"])
|
||||
cidrset = netaddr.IPSet(
|
||||
@@ -164,6 +233,7 @@ def create_subnet(context, subnet):
|
||||
context, ip=netaddr.IPAddress(dns_ip)))
|
||||
|
||||
if isinstance(allocation_pools, list) and allocation_pools:
|
||||
_validate_allocation_pools(allocation_pools, sub_attrs["cidr"])
|
||||
cidrs = _get_exclude_cidrs_from_allocation_pools(
|
||||
new_subnet, allocation_pools)
|
||||
new_subnet["ip_policy"] = db_api.ip_policy_create(context,
|
||||
@@ -245,6 +315,7 @@ def update_subnet(context, id, subnet):
|
||||
context, cidr=route["destination"], gateway=route["nexthop"]))
|
||||
|
||||
if isinstance(allocation_pools, list) and allocation_pools:
|
||||
_validate_allocation_pools(allocation_pools, subnet_db["cidr"])
|
||||
cidrs = _get_exclude_cidrs_from_allocation_pools(
|
||||
subnet_db, allocation_pools)
|
||||
if subnet_db["ip_policy"]:
|
||||
|
||||
@@ -261,6 +261,26 @@ class TestQuarkCreateSubnetAllocationPools(test_quark_plugin.TestQuarkPlugin):
|
||||
self.assertEqual(subnet_create.call_count, 1)
|
||||
self.assertEqual(resp["allocation_pools"], pools)
|
||||
|
||||
def test_create_subnet_allocation_pools_invalid_outside(self):
|
||||
pools = [dict(start="192.168.0.10", end="192.168.0.20")]
|
||||
s = dict(subnet=dict(
|
||||
allocation_pools=pools,
|
||||
cidr="192.168.1.1/24",
|
||||
network_id=1))
|
||||
with self._stubs(s["subnet"]):
|
||||
with self.assertRaises(exceptions.OutOfBoundsAllocationPool):
|
||||
self.plugin.create_subnet(self.context, s)
|
||||
|
||||
def test_create_subnet_allocation_pools_invalid_overlaps(self):
|
||||
pools = [dict(start="192.168.0.255", end="192.168.1.20")]
|
||||
s = dict(subnet=dict(
|
||||
allocation_pools=pools,
|
||||
cidr="192.168.1.1/24",
|
||||
network_id=1))
|
||||
with self._stubs(s["subnet"]):
|
||||
with self.assertRaises(exceptions.OutOfBoundsAllocationPool):
|
||||
self.plugin.create_subnet(self.context, s)
|
||||
|
||||
def test_create_subnet_allocation_pools_two(self):
|
||||
pools = [dict(start="192.168.1.10", end="192.168.1.20"),
|
||||
dict(start="192.168.1.40", end="192.168.1.50")]
|
||||
@@ -880,6 +900,20 @@ class TestQuarkUpdateSubnet(test_quark_plugin.TestQuarkPlugin):
|
||||
self.assertEqual(resp["allocation_pools"],
|
||||
[dict(start="172.16.0.1", end="172.16.0.254")])
|
||||
|
||||
def test_update_subnet_allocation_pools_invalid_outside(self):
|
||||
pools = [dict(start="172.16.1.10", end="172.16.1.20")]
|
||||
s = dict(subnet=dict(allocation_pools=pools))
|
||||
with self._stubs() as (dns_create, route_update, route_create):
|
||||
with self.assertRaises(exceptions.OutOfBoundsAllocationPool):
|
||||
self.plugin.update_subnet(self.context, 1, s)
|
||||
|
||||
def test_create_subnet_allocation_pools_invalid_overlaps(self):
|
||||
pools = [dict(start="172.15.255.255", end="172.16.0.20")]
|
||||
s = dict(subnet=dict(allocation_pools=pools))
|
||||
with self._stubs() as (dns_create, route_update, route_create):
|
||||
with self.assertRaises(exceptions.OutOfBoundsAllocationPool):
|
||||
self.plugin.update_subnet(self.context, 1, s)
|
||||
|
||||
def test_update_subnet_allocation_pools_one(self):
|
||||
pools = [dict(start="172.16.0.10", end="172.16.0.20")]
|
||||
s = dict(subnet=dict(allocation_pools=pools))
|
||||
|
||||
@@ -13,7 +13,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
from netaddr import IPSet
|
||||
|
||||
from quark.db import models
|
||||
@@ -43,14 +42,3 @@ class TestDBModels(test_base.TestBase):
|
||||
ip_policy_rules,
|
||||
IPSet(["fc00::/128",
|
||||
"fdff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/128"]))
|
||||
|
||||
def test_get_ip_policy_cidrs_intersection(self):
|
||||
ip_policy_cidr = mock.MagicMock()
|
||||
ip_policy_cidr.cidr = "10.10.10.1/24"
|
||||
subnet = dict(id=1, ip_version=4, next_auto_assign_ip=0,
|
||||
cidr="0.0.0.0/24", first_ip=0, last_ip=255,
|
||||
network=dict(ip_policy=None),
|
||||
ip_policy=dict(exclude=[ip_policy_cidr]))
|
||||
ip_policy_rules = models.IPPolicy.get_ip_policy_cidrs(subnet)
|
||||
self.assertEqual(ip_policy_rules,
|
||||
IPSet(['0.0.0.0/32', '0.0.0.255/32']))
|
||||
|
||||
221
quark/tests/test_migrations.py
Normal file
221
quark/tests/test_migrations.py
Normal file
@@ -0,0 +1,221 @@
|
||||
import contextlib
|
||||
import datetime
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
from alembic import command as alembic_command
|
||||
from alembic import config as alembic_config
|
||||
import mock
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy import pool
|
||||
from sqlalchemy.sql import column
|
||||
from sqlalchemy.sql import select
|
||||
from sqlalchemy.sql import table
|
||||
|
||||
import quark.db.migration
|
||||
from quark.tests import test_base
|
||||
|
||||
|
||||
class BaseMigrationTest(test_base.TestBase):
|
||||
def setUp(self):
|
||||
self.config = alembic_config.Config(
|
||||
os.path.join(quark.db.migration.__path__[0], 'alembic.ini'))
|
||||
self.config.set_main_option('script_location',
|
||||
'quark.db.migration:alembic')
|
||||
self.fileno, self.filepath = tempfile.mkstemp()
|
||||
secret_cfg = mock.MagicMock()
|
||||
secret_cfg.database.connection = "sqlite:///" + self.filepath
|
||||
self.config.neutron_config = secret_cfg
|
||||
|
||||
engine = create_engine(
|
||||
self.config.neutron_config.database.connection,
|
||||
poolclass=pool.NullPool)
|
||||
self.connection = engine.connect()
|
||||
|
||||
def tearDown(self):
|
||||
self.connection.close()
|
||||
os.unlink(self.filepath)
|
||||
|
||||
|
||||
class Test2748e48cee3a(BaseMigrationTest):
|
||||
def setUp(self):
|
||||
super(Test2748e48cee3a, self).setUp()
|
||||
alembic_command.upgrade(self.config, '1284c81cf727')
|
||||
self.ip_policy_cidrs = table(
|
||||
'quark_ip_policy_cidrs',
|
||||
column('id', sa.String(length=36)),
|
||||
column('created_at', sa.DateTime()),
|
||||
column('ip_policy_id', sa.String(length=36)),
|
||||
column('cidr', sa.String(length=64)))
|
||||
self.subnets = table(
|
||||
'quark_subnets',
|
||||
column('id', sa.String(length=36)),
|
||||
column('_cidr', sa.String(length=64)),
|
||||
column('ip_policy_id', sa.String(length=36)))
|
||||
|
||||
def test_upgrade_no_ip_policy_cidr(self):
|
||||
self.connection.execute(
|
||||
self.subnets.insert(),
|
||||
dict(id="000", _cidr="192.168.10.0/24", ip_policy_id=None))
|
||||
|
||||
alembic_command.upgrade(self.config, '2748e48cee3a')
|
||||
results = self.connection.execute(
|
||||
select([self.ip_policy_cidrs])).fetchall()
|
||||
self.assertEqual(len(results), 0)
|
||||
|
||||
def test_upgrade_ip_policy_cidr_inside(self):
|
||||
self.connection.execute(
|
||||
self.subnets.insert(),
|
||||
dict(id="000", _cidr="192.168.10.0/24", ip_policy_id="111"))
|
||||
dt = datetime.datetime(1970, 1, 1)
|
||||
self.connection.execute(
|
||||
self.ip_policy_cidrs.insert(),
|
||||
dict(id="222", created_at=dt,
|
||||
ip_policy_id="111", cidr="192.168.10.0/32"))
|
||||
|
||||
alembic_command.upgrade(self.config, '2748e48cee3a')
|
||||
results = self.connection.execute(
|
||||
select([self.ip_policy_cidrs])).fetchall()
|
||||
self.assertEqual(len(results), 1)
|
||||
result = results[0]
|
||||
self.assertEqual(result["id"], "222")
|
||||
self.assertEqual(result["created_at"], dt)
|
||||
self.assertEqual(result["ip_policy_id"], "111")
|
||||
self.assertEqual(result["cidr"], "192.168.10.0/32")
|
||||
|
||||
def test_upgrade_ip_policy_cidr_overlaps(self):
|
||||
self.connection.execute(
|
||||
self.subnets.insert(),
|
||||
dict(id="000", _cidr="192.168.10.0/24", ip_policy_id="111"))
|
||||
self.connection.execute(
|
||||
self.ip_policy_cidrs.insert(),
|
||||
dict(id="222", created_at=datetime.date(1970, 1, 1),
|
||||
ip_policy_id="111", cidr="192.168.10.0/16"))
|
||||
|
||||
with contextlib.nested(
|
||||
mock.patch("neutron.openstack.common.timeutils"),
|
||||
mock.patch("neutron.openstack.common.uuidutils")
|
||||
) as (tu, uuid):
|
||||
tu.utcnow.return_value = datetime.datetime(2004, 2, 14)
|
||||
uuid.generate_uuid.return_value = "foo"
|
||||
alembic_command.upgrade(self.config, '2748e48cee3a')
|
||||
results = self.connection.execute(
|
||||
select([self.ip_policy_cidrs])).fetchall()
|
||||
self.assertEqual(len(results), 1)
|
||||
result = results[0]
|
||||
self.assertEqual(result["id"], uuid.generate_uuid.return_value)
|
||||
self.assertEqual(result["created_at"], tu.utcnow.return_value)
|
||||
self.assertEqual(result["ip_policy_id"], "111")
|
||||
self.assertEqual(result["cidr"], "192.168.10.0/24")
|
||||
|
||||
def test_upgrade_ip_policy_cidr_overlaps_v6(self):
|
||||
self.connection.execute(
|
||||
self.subnets.insert(),
|
||||
dict(id="000", _cidr="fd00::/8", ip_policy_id="111"))
|
||||
self.connection.execute(
|
||||
self.ip_policy_cidrs.insert(),
|
||||
dict(id="222", created_at=datetime.date(1970, 1, 1),
|
||||
ip_policy_id="111", cidr="fd00::/7"))
|
||||
|
||||
with contextlib.nested(
|
||||
mock.patch("neutron.openstack.common.timeutils"),
|
||||
mock.patch("neutron.openstack.common.uuidutils")
|
||||
) as (tu, uuid):
|
||||
tu.utcnow.return_value = datetime.datetime(2004, 2, 14)
|
||||
uuid.generate_uuid.return_value = "foo"
|
||||
alembic_command.upgrade(self.config, '2748e48cee3a')
|
||||
results = self.connection.execute(
|
||||
select([self.ip_policy_cidrs])).fetchall()
|
||||
self.assertEqual(len(results), 1)
|
||||
result = results[0]
|
||||
self.assertEqual(result["id"], uuid.generate_uuid.return_value)
|
||||
self.assertEqual(result["created_at"], tu.utcnow.return_value)
|
||||
self.assertEqual(result["ip_policy_id"], "111")
|
||||
self.assertEqual(result["cidr"], "fd00::/8")
|
||||
|
||||
def test_upgrade_ip_policy_cidr_outside(self):
|
||||
self.connection.execute(
|
||||
self.subnets.insert(),
|
||||
dict(id="000", _cidr="192.168.10.0/24", ip_policy_id="111"))
|
||||
self.connection.execute(
|
||||
self.ip_policy_cidrs.insert(),
|
||||
dict(id="222", created_at=datetime.date(1970, 1, 1),
|
||||
ip_policy_id="111", cidr="0.0.0.0/24"))
|
||||
|
||||
alembic_command.upgrade(self.config, '2748e48cee3a')
|
||||
results = self.connection.execute(
|
||||
select([self.ip_policy_cidrs])).fetchall()
|
||||
self.assertEqual(len(results), 0)
|
||||
|
||||
def test_upgrade_bulk(self):
|
||||
self.connection.execute(
|
||||
self.subnets.insert(),
|
||||
dict(id="000", _cidr="192.168.10.0/24", ip_policy_id=None),
|
||||
dict(id="001", _cidr="192.168.10.0/24", ip_policy_id="111"),
|
||||
dict(id="002", _cidr="192.168.10.0/24", ip_policy_id="112"),
|
||||
dict(id="003", _cidr="192.168.10.0/24", ip_policy_id="113"))
|
||||
dt = datetime.datetime(1970, 1, 1)
|
||||
self.connection.execute(
|
||||
self.ip_policy_cidrs.insert(),
|
||||
dict(id="221", created_at=dt, ip_policy_id="111",
|
||||
cidr="192.168.10.0/32"),
|
||||
dict(id="222", created_at=dt, ip_policy_id="112",
|
||||
cidr="192.168.10.0/16"),
|
||||
dict(id="223", created_at=dt, ip_policy_id="113",
|
||||
cidr="0.0.0.0/24"))
|
||||
|
||||
with contextlib.nested(
|
||||
mock.patch("neutron.openstack.common.timeutils"),
|
||||
mock.patch("neutron.openstack.common.uuidutils")
|
||||
) as (tu, uuid):
|
||||
tu.utcnow.return_value = datetime.datetime(2004, 2, 14)
|
||||
uuid.generate_uuid.return_value = "foo"
|
||||
alembic_command.upgrade(self.config, '2748e48cee3a')
|
||||
results = self.connection.execute(
|
||||
select([self.ip_policy_cidrs])).fetchall()
|
||||
self.assertEqual(len(results), 2)
|
||||
result = results[0] if results[0]["id"] == "foo" else results[1]
|
||||
self.assertEqual(result["id"], uuid.generate_uuid.return_value)
|
||||
self.assertEqual(result["created_at"], tu.utcnow.return_value)
|
||||
self.assertEqual(result["ip_policy_id"], "112")
|
||||
self.assertEqual(result["cidr"], "192.168.10.0/24")
|
||||
result = results[0] if results[0]["id"] != "foo" else results[1]
|
||||
self.assertEqual(result["id"], "221")
|
||||
self.assertEqual(result["created_at"], dt)
|
||||
self.assertEqual(result["ip_policy_id"], "111")
|
||||
self.assertEqual(result["cidr"], "192.168.10.0/32")
|
||||
|
||||
def test_upgrade_multiple_ip_policy_cidrs(self):
|
||||
self.connection.execute(
|
||||
self.subnets.insert(),
|
||||
dict(id="000", _cidr="192.168.10.0/24", ip_policy_id="111"))
|
||||
self.connection.execute(
|
||||
self.ip_policy_cidrs.insert(),
|
||||
dict(id="221", created_at=datetime.date(1970, 1, 1),
|
||||
ip_policy_id="111", cidr="0.0.0.0/24"),
|
||||
dict(id="222", created_at=datetime.date(1970, 1, 1),
|
||||
ip_policy_id="111", cidr="192.168.10.255/32"),
|
||||
dict(id="223", created_at=datetime.date(1970, 1, 1),
|
||||
ip_policy_id="111", cidr="192.168.10.0/23"))
|
||||
|
||||
with contextlib.nested(
|
||||
mock.patch("neutron.openstack.common.timeutils"),
|
||||
mock.patch("neutron.openstack.common.uuidutils")
|
||||
) as (tu, uuid):
|
||||
tu.utcnow.return_value = datetime.datetime(2004, 2, 14)
|
||||
uuid.generate_uuid.return_value = "foo"
|
||||
alembic_command.upgrade(self.config, '2748e48cee3a')
|
||||
results = self.connection.execute(
|
||||
select([self.ip_policy_cidrs])).fetchall()
|
||||
self.assertEqual(len(results), 1)
|
||||
result = results[0]
|
||||
self.assertEqual(result["id"], uuid.generate_uuid.return_value)
|
||||
self.assertEqual(result["created_at"], tu.utcnow.return_value)
|
||||
self.assertEqual(result["ip_policy_id"], "111")
|
||||
self.assertEqual(result["cidr"], "192.168.10.0/24")
|
||||
|
||||
def test_downgrade(self):
|
||||
alembic_command.upgrade(self.config, '2748e48cee3a')
|
||||
with self.assertRaises(NotImplementedError):
|
||||
alembic_command.downgrade(self.config, '1284c81cf727')
|
||||
Reference in New Issue
Block a user