Merge "[FT] Remove unneeded DB migration tests"

This commit is contained in:
Zuul 2021-01-15 10:48:50 +00:00 committed by Gerrit Code Review
commit 1683ada81f
5 changed files with 0 additions and 461 deletions

View File

@@ -1,87 +0,0 @@
# Copyright 2016 Business Cat is Very Serious 13.37
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import collections
from oslo_db.sqlalchemy import utils as db_utils
from oslo_utils import uuidutils
from neutron.tests.functional.db import test_migrations
class HARouterPortMigrationMixin(object):
    """Validates binding_index for RouterL3AgentBinding migration."""

    def _create_so(self, o_type, values):
        """Insert a standardattributes row, then the resource itself.

        The freshly allocated standard_attr_id is written into *values*
        before the resource record is inserted.
        """
        sa_table = db_utils.get_table(self.engine, 'standardattributes')
        rows = self.engine.execute(sa_table.select()).fetchall()
        # next id is one past the largest existing one (0 when empty)
        new_id = 1 + max([0] + [row.id for row in rows])
        self.engine.execute(sa_table.insert().values(
            {'id': new_id, 'resource_type': o_type}))
        values['standard_attr_id'] = new_id
        return self._create_rec(o_type, values)

    def _create_rec(self, o_type, values):
        """Insert *values* into the table whose name equals *o_type*."""
        target = db_utils.get_table(self.engine, o_type)
        self.engine.execute(target.insert().values(values))

    def _make_router_agents_and_bindings(self, router_id):
        """Create one router plus two agents, each bound to the router."""
        self._create_so('routers', {'id': router_id})
        timestamp = '2000-04-06T14:34:23'
        # each router gets a couple of agents
        for _ in range(2):
            agent_id = uuidutils.generate_uuid()
            self._create_rec('agents',
                             {'id': agent_id,
                              'topic': 'x',
                              'agent_type': 'L3',
                              'binary': 'x',
                              'host': agent_id,
                              'created_at': timestamp,
                              'started_at': timestamp,
                              'heartbeat_timestamp': timestamp,
                              'configurations': ''})
            self._create_rec('routerl3agentbindings',
                             {'router_id': router_id,
                              'l3_agent_id': agent_id})

    def _create_ha_routers(self, engine):
        """Populate ten routers, each with its agent bindings."""
        for _ in range(10):
            self._make_router_agents_and_bindings(uuidutils.generate_uuid())

    def _pre_upgrade_2e0d7a8a1586(self, engine):
        self._create_ha_routers(engine)
        # True makes the walk-migrations harness call the matching
        # _check function once the migration has run.
        return True

    def _check_2e0d7a8a1586(self, engine, data):
        bindings_table = db_utils.get_table(engine, 'routerl3agentbindings')
        per_router = collections.defaultdict(list)
        for router_id, _agent, binding_index in engine.execute(
                bindings_table.select()).fetchall():
            per_router[router_id].append(binding_index)
        # every router must end up with binding_index values 1 and 2
        for indices in per_router.values():
            self.assertEqual(list(range(1, 3)), sorted(indices))
class TestHARouterPortMigrationMysql(HARouterPortMigrationMixin,
                                     test_migrations.TestWalkMigrationsMysql):
    # Runs the mixin's migration checks against the MySQL backend.
    pass
class TestHARouterPortMigrationPsql(HARouterPortMigrationMixin,
                                    test_migrations.TestWalkMigrationsPsql):
    # Runs the mixin's migration checks against the PostgreSQL backend.
    pass

View File

@@ -1,142 +0,0 @@
# Copyright 2016 Infoblox Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from neutron_lib import constants
from oslo_db.sqlalchemy import utils as db_utils
from oslo_utils import uuidutils
from neutron.tests.functional.db import test_migrations
class MigrationToPluggableIpamMixin(object):
    """Validates data migration to Pluggable IPAM."""

    # Counter handing out unique standardattributes primary keys;
    # incremented once per _gen_attr_id call.
    _standard_attribute_id = 0

    def _gen_attr_id(self, engine, resource_type):
        """Insert a standardattributes row and return its new id.

        The parameter was renamed from ``type`` so it no longer shadows
        the ``type`` builtin; all callers in this class pass it
        positionally.
        """
        self._standard_attribute_id += 1
        standardattributes = db_utils.get_table(engine, 'standardattributes')
        engine.execute(standardattributes.insert().values({
            'id': self._standard_attribute_id,
            'resource_type': resource_type}))
        return self._standard_attribute_id

    def _create_subnets(self, engine, data):
        """Create subnets and saves subnet id in data.

        For each CIDR key in *data*, creates a network, a subnet and any
        listed allocation pools / IP allocations.
        """
        networks = db_utils.get_table(engine, 'networks')
        subnets = db_utils.get_table(engine, 'subnets')
        pools = db_utils.get_table(engine, 'ipallocationpools')
        allocations = db_utils.get_table(engine, 'ipallocations')
        for cidr in data:
            # ':' only ever appears in IPv6 literals
            ip_version = (constants.IP_VERSION_6 if ':' in cidr else
                          constants.IP_VERSION_4)
            # Save generated id in incoming dict to simplify validations
            network_id = uuidutils.generate_uuid()
            network_dict = dict(
                id=network_id,
                standard_attr_id=self._gen_attr_id(engine, 'networks'))
            engine.execute(networks.insert().values(network_dict))
            data[cidr]['id'] = uuidutils.generate_uuid()
            subnet_dict = dict(id=data[cidr]['id'],
                               cidr=cidr,
                               ip_version=ip_version,
                               standard_attr_id=self._gen_attr_id(engine,
                                                                  'subnets'),
                               network_id=network_id)
            engine.execute(subnets.insert().values(subnet_dict))
            if data[cidr].get('pools'):
                for pool in data[cidr]['pools']:
                    pool_dict = dict(id=uuidutils.generate_uuid(),
                                     first_ip=pool['first_ip'],
                                     last_ip=pool['last_ip'],
                                     subnet_id=data[cidr]['id'])
                    engine.execute(pools.insert().values(pool_dict))
            if data[cidr].get('allocations'):
                for ip in data[cidr]['allocations']:
                    ip_dict = dict(ip_address=ip,
                                   subnet_id=data[cidr]['id'],
                                   network_id=network_id)
                    engine.execute(allocations.insert().values(ip_dict))

    def _pre_upgrade_3b935b28e7a0(self, engine):
        """Seed v4 and v6 subnets with pools/allocations before migrating."""
        data = {
            '172.23.0.0/16': {
                'pools': [{'first_ip': '172.23.0.2',
                           'last_ip': '172.23.255.254'}],
                'allocations': ('172.23.0.2', '172.23.245.2')},
            '192.168.40.0/24': {
                'pools': [{'first_ip': '192.168.40.2',
                           'last_ip': '192.168.40.100'},
                          {'first_ip': '192.168.40.105',
                           'last_ip': '192.168.40.150'},
                          {'first_ip': '192.168.40.155',
                           'last_ip': '192.168.40.157'},
                          ],
                'allocations': ('192.168.40.2', '192.168.40.3',
                                '192.168.40.15', '192.168.40.60')},
            'fafc:babc::/64': {
                'pools': [{'first_ip': 'fafc:babc::2',
                           'last_ip': 'fafc:babc::6:fe00',
                           }],
                'allocations': ('fafc:babc::3',)}}
        self._create_subnets(engine, data)
        # Returning the seeded dict makes the harness pass it to _check.
        return data

    def _check_3b935b28e7a0(self, engine, data):
        """Verify every subnet, pool and allocation moved to the ipam tables."""
        subnets = db_utils.get_table(engine, 'ipamsubnets')
        pools = db_utils.get_table(engine, 'ipamallocationpools')
        allocations = db_utils.get_table(engine, 'ipamallocations')
        ipam_subnets = engine.execute(subnets.select()).fetchall()
        # Count of ipam subnets should match count of usual subnets
        self.assertEqual(len(data), len(ipam_subnets))
        neutron_to_ipam_id = {subnet.neutron_subnet_id: subnet.id
                              for subnet in ipam_subnets}
        for cidr in data:
            self.assertIn(data[cidr]['id'], neutron_to_ipam_id)
            ipam_subnet_id = neutron_to_ipam_id[data[cidr]['id']]
            # Validate ip allocations are migrated correctly
            ipam_allocations = engine.execute(allocations.select().where(
                allocations.c.ipam_subnet_id == ipam_subnet_id)).fetchall()
            for ipam_allocation in ipam_allocations:
                self.assertIn(ipam_allocation.ip_address,
                              data[cidr]['allocations'])
            self.assertEqual(len(data[cidr]['allocations']),
                             len(ipam_allocations))
            # Validate allocation pools are migrated correctly
            ipam_pools = engine.execute(pools.select().where(
                pools.c.ipam_subnet_id == ipam_subnet_id)).fetchall()
            # Convert to dict for easier lookup
            pool_dict = {pool.first_ip: pool.last_ip for pool in ipam_pools}
            for p in data[cidr]['pools']:
                self.assertIn(p['first_ip'], pool_dict)
                self.assertEqual(p['last_ip'], pool_dict[p['first_ip']])
            self.assertEqual(len(data[cidr]['pools']),
                             len(ipam_pools))
class TestMigrationToPluggableIpamMysql(MigrationToPluggableIpamMixin,
                                        test_migrations.TestWalkMigrationsMysql):
    # Runs the pluggable-IPAM migration checks against the MySQL backend.
    pass
class TestMigrationToPluggableIpamPsql(MigrationToPluggableIpamMixin,
                                       test_migrations.TestWalkMigrationsPsql):
    # Runs the pluggable-IPAM migration checks against the PostgreSQL backend.
    pass

View File

@@ -1,70 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_db.sqlalchemy import utils as db_utils
from oslo_utils import uuidutils
from neutron.tests.functional.db import test_migrations
def _create_record_with_sa(engine, resource_type, attributes):
    """Create a record with standard attributes."""
    sa = db_utils.get_table(engine, 'standardattributes')
    result = engine.execute(
        sa.insert().values({'resource_type': resource_type}))
    # link the resource row to the freshly inserted standardattributes row
    attributes['standard_attr_id'] = result.inserted_primary_key[0]
    target = db_utils.get_table(engine, resource_type)
    engine.execute(target.insert().values(attributes))
class NetworkSegmentNameAndDescriptionMixin(object):
    """Validates migration that adds name and description."""

    def _pre_upgrade_97c25b0d2353(self, engine):
        # Create a network for segments to belong to
        net_id = uuidutils.generate_uuid()
        _create_record_with_sa(engine, 'networks',
                               {'id': net_id, 'name': '97c25b0d2353'})
        # Create some segments with old model
        segments = db_utils.get_table(engine, 'networksegments')
        for _ in range(5):
            segment_row = {'id': uuidutils.generate_uuid(),
                           'network_id': net_id,
                           'network_type': 'flat'}
            engine.execute(segments.insert().values(segment_row))
        # True makes the harness run the _check function after migrating.
        return True

    def _check_97c25b0d2353(self, engine, data):
        segments = db_utils.get_table(engine, 'networksegments')
        stdattrs = db_utils.get_table(engine, 'standardattributes')
        for seg in engine.execute(segments.select()).fetchall():
            # Each pre-existing segment must have gained a stdattr record
            attr_id = seg.standard_attr_id
            matches = engine.execute(stdattrs.select().where(
                stdattrs.c.id == attr_id)).fetchall()
            self.assertEqual(1, len(matches))
            # And the old segment must now accept a name
            engine.execute(segments.update().values(name='Zeus').where(
                segments.c.standard_attr_id == attr_id))
class TestNetworkSegmentNameDescMySql(NetworkSegmentNameAndDescriptionMixin,
                                      test_migrations.TestWalkMigrationsMysql):
    # Runs the segment name/description migration checks on MySQL.
    pass
class TestNetworkSegmentNameDescPsql(NetworkSegmentNameAndDescriptionMixin,
                                     test_migrations.TestWalkMigrationsPsql):
    # Runs the segment name/description migration checks on PostgreSQL.
    pass

View File

@@ -1,102 +0,0 @@
# Copyright 2016 Business Cat is Very Serious
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from neutron_lib import constants
from oslo_db.sqlalchemy import utils as db_utils
from oslo_utils import uuidutils
from neutron.tests.functional.db import test_migrations
class HARouterPortMigrationMixin(object):
    """Validates HA port to router port migration."""

    def _create_so(self, o_type, values):
        """create standard attr object."""
        stan = db_utils.get_table(self.engine, 'standardattributes')
        # find next available id taking into account existing records
        rec_ids = [r.id for r in self.engine.execute(stan.select()).fetchall()]
        next_id = max([0] + rec_ids) + 1
        self.engine.execute(stan.insert().values({'id': next_id,
                                                  'resource_type': o_type}))
        # the resource row must reference the stdattr row just created
        values['standard_attr_id'] = next_id
        return self._create_rec(o_type, values)

    def _create_rec(self, o_type, values):
        # Insert values into the table named o_type (no stdattr handling).
        otable = db_utils.get_table(self.engine, o_type)
        self.engine.execute(otable.insert().values(values))

    def _make_router_agents_and_ports(self, router_id, network_id,
                                      add_binding):
        # Create one router and, per agent, a port, the agent itself and an
        # ha_router_agent_port_bindings row.  When add_binding is True a
        # routerports row is created too (simulating already-patched data).
        self._create_so('routers', {'id': router_id})
        # each router gets a couple of agents
        for _ in range(2):
            port_id = uuidutils.generate_uuid()
            self._create_so('ports', {'id': port_id, 'network_id': network_id,
                                      'mac_address': port_id[0:31],
                                      'admin_state_up': True,
                                      'device_id': router_id,
                                      'device_owner': 'network',
                                      'status': 'ACTIVE'})
            agent_id = uuidutils.generate_uuid()
            timestamp = '2000-04-06T14:34:23'
            self._create_rec('agents', {'id': agent_id, 'topic': 'x',
                                        'agent_type': 'L3',
                                        'binary': 'x',
                                        'host': agent_id,
                                        'created_at': timestamp,
                                        'started_at': timestamp,
                                        'heartbeat_timestamp': timestamp,
                                        'configurations': ''})
            self._create_rec('ha_router_agent_port_bindings',
                             {'port_id': port_id, 'router_id': router_id,
                              'l3_agent_id': agent_id})
            if add_binding:
                ptype = constants.DEVICE_OWNER_ROUTER_HA_INTF
                self._create_rec('routerports',
                                 {'router_id': router_id, 'port_id': port_id,
                                  'port_type': ptype})

    def _create_ha_routers_with_ports(self, engine):
        # Build 20 routers total: 10 without routerports rows and 10 with.
        network_id = uuidutils.generate_uuid()
        self._create_so('networks', {'id': network_id})
        unpatched_router_ids = [uuidutils.generate_uuid() for i in range(10)]
        for rid in unpatched_router_ids:
            self._make_router_agents_and_ports(rid, network_id, False)
        # make half of the routers already have routerport bindings to simulate
        # a back-port of Ifd3e007aaf2a2ed8123275aa3a9f540838e3c003
        patched_router_ids = [uuidutils.generate_uuid() for i in range(10)]
        for rid in patched_router_ids:
            self._make_router_agents_and_ports(rid, network_id, True)

    def _pre_upgrade_a8b517cff8ab(self, engine):
        self._create_ha_routers_with_ports(engine)
        return True  # return True so check function is invoked after migrate

    def _check_a8b517cff8ab(self, engine, data):
        rp = db_utils.get_table(engine, 'routerports')
        # just ensuring the correct count of routerport records is enough.
        # 20 routers * 2 ports per router
        self.assertEqual(40, len(engine.execute(rp.select()).fetchall()))
class TestHARouterPortMigrationMysql(HARouterPortMigrationMixin,
                                     test_migrations.TestWalkMigrationsMysql):
    # Runs the HA-port migration checks against the MySQL backend.
    pass
class TestHARouterPortMigrationPsql(HARouterPortMigrationMixin,
                                    test_migrations.TestWalkMigrationsPsql):
    # Runs the HA-port migration checks against the PostgreSQL backend.
    pass

View File

@@ -1,60 +0,0 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from oslo_db.sqlalchemy import utils as db_utils
from oslo_utils import uuidutils
from neutron.tests.functional.db import test_migrations
class QosStandardAttrMixin(object):
    """Validates qos standard attr migration."""

    def _create_qos_pol(self, pol_id, description):
        """Insert one qos_policies row with the given id and description."""
        table = db_utils.get_table(self.engine, 'qos_policies')
        self.engine.execute(table.insert().values(
            {'id': pol_id, 'description': description}))

    def _create_policies_with_descriptions(self, engine):
        """Create ten policies whose descriptions embed their own ids."""
        for _ in range(10):
            pol_id = uuidutils.generate_uuid()
            self._create_qos_pol(pol_id, 'description-%s' % pol_id)

    def _pre_upgrade_b12a3ef66e62(self, engine):
        self._create_policies_with_descriptions(engine)
        # True tells the migration walker to invoke _check afterwards.
        return True

    def _check_b12a3ef66e62(self, engine, data):
        policies = db_utils.get_table(engine, 'qos_policies')
        stdattrs = db_utils.get_table(engine, 'standardattributes')
        for pol in engine.execute(policies.select()).fetchall():
            # a standardattributes row must exist for every policy
            matching = engine.execute(stdattrs.select().where(
                stdattrs.c.id == pol.standard_attr_id)).fetchall()
            self.assertEqual(1, len(matching))
            # and the old description content must have moved over
            self.assertEqual('description-%s' % pol.id,
                             matching[0].description)
class TestQosStandardAttrMysql(QosStandardAttrMixin,
                               test_migrations.TestWalkMigrationsMysql):
    # Runs the QoS standard-attribute migration checks on MySQL.
    pass
class TestQosStandardAttrPsql(QosStandardAttrMixin,
                              test_migrations.TestWalkMigrationsPsql):
    # Runs the QoS standard-attribute migration checks on PostgreSQL.
    pass