diff --git a/neutron/tests/unit/plugins/ml2/drivers/l2pop/test_db.py b/neutron/tests/unit/plugins/ml2/drivers/l2pop/test_db.py index 4ac33caf420..3029a0f2747 100644 --- a/neutron/tests/unit/plugins/ml2/drivers/l2pop/test_db.py +++ b/neutron/tests/unit/plugins/ml2/drivers/l2pop/test_db.py @@ -16,6 +16,7 @@ import netaddr from neutron_lib.api.definitions import portbindings from neutron_lib import constants from neutron_lib import context +from neutron_lib.db import api as db_api from neutron_lib.tests import tools from neutron_lib.utils import net from oslo_utils import uuidutils @@ -53,7 +54,7 @@ class TestL2PopulationDBTestCase(testlib_api.SqlTestCase): network_obj.Network(self.ctx, id=network_id).create() def _create_router(self, distributed=True, ha=False): - with self.ctx.session.begin(subtransactions=True): + with db_api.CONTEXT_WRITER.using(self.ctx): self.ctx.session.add(l3_models.Router(id=TEST_ROUTER_ID)) l3_objs.RouterExtraAttributes( self.ctx, @@ -67,7 +68,7 @@ class TestL2PopulationDBTestCase(testlib_api.SqlTestCase): # Tests should test that host3 is not a HA agent host. helpers.register_l3_agent(HOST_3) helpers.register_ovs_agent(HOST_3, tunneling_ip=HOST_3_TUNNELING_IP) - with self.ctx.session.begin(subtransactions=True): + with db_api.CONTEXT_WRITER.using(self.ctx): network_obj.Network(self.ctx, id=TEST_HA_NETWORK_ID).create() self._create_router(distributed=distributed, ha=True) for state, host in [(constants.HA_ROUTER_STATE_ACTIVE, HOST), @@ -100,7 +101,7 @@ class TestL2PopulationDBTestCase(testlib_api.SqlTestCase): self.assertIsNone(agent) def _setup_port_binding(self, **kwargs): - with self.ctx.session.begin(subtransactions=True): + with db_api.CONTEXT_WRITER.using(self.ctx): mac = netaddr.EUI( net.get_random_mac('fa:16:3e:00:00:00'.split(':')), dialect=netaddr.mac_unix_expanded) diff --git a/neutron/tests/unit/plugins/ml2/test_db.py b/neutron/tests/unit/plugins/ml2/test_db.py index 79992150ca3..94d23c34a5b 100644 --- a/neutron/tests/unit/plugins/ml2/test_db.py +++ b/neutron/tests/unit/plugins/ml2/test_db.py @@ -328,7 +328,7 @@ class Ml2DvrDBTestCase(testlib_api.SqlTestCase): return ports def _setup_neutron_router(self): - with self.ctx.session.begin(subtransactions=True): + with db_api.CONTEXT_WRITER.using(self.ctx): router = l3_models.Router() self.ctx.session.add(router) return router diff --git a/neutron/tests/unit/plugins/ml2/test_ovo_rpc.py b/neutron/tests/unit/plugins/ml2/test_ovo_rpc.py index 964a0471861..831868db2a3 100644 --- a/neutron/tests/unit/plugins/ml2/test_ovo_rpc.py +++ b/neutron/tests/unit/plugins/ml2/test_ovo_rpc.py @@ -14,6 +14,7 @@ from unittest import mock from neutron_lib import context +from neutron_lib.db import api as db_api from neutron_lib.plugins import directory from neutron.objects import network @@ -94,7 +95,7 @@ class OVOServerRpcInterfaceTestCase(test_plugin.Ml2PluginV2TestCase): def test_transaction_state_error_doesnt_notify(self): # running in a transaction should cause it to skip notification since # fresh reads aren't possible. - with self.ctx.session.begin(): + with db_api.CONTEXT_WRITER.using(self.ctx): self.plugin.create_security_group( self.ctx, {'security_group': {'tenant_id': 'test', 'description': 'desc', diff --git a/neutron/tests/unit/plugins/ml2/test_plugin.py b/neutron/tests/unit/plugins/ml2/test_plugin.py index dd0a92f1ee2..a9e00c12327 100644 --- a/neutron/tests/unit/plugins/ml2/test_plugin.py +++ b/neutron/tests/unit/plugins/ml2/test_plugin.py @@ -2390,26 +2390,26 @@ class TestMl2PortBinding(Ml2PluginV2TestCase, self._check_port_binding_profile(port, profile) def test_update_port_binding_host_id_none(self): - with self.port() as port: - plugin = directory.get_plugin() - binding = p_utils.get_port_binding_by_status_and_host( - plugin._get_port(self.context, - port['port']['id']).port_bindings, - constants.ACTIVE) - with self.context.session.begin(subtransactions=True): + with db_api.CONTEXT_WRITER.using(self.context): + with self.port() as port: + plugin = directory.get_plugin() + binding = p_utils.get_port_binding_by_status_and_host( + plugin._get_port(self.context, + port['port']['id']).port_bindings, + constants.ACTIVE) binding.host = 'test' - mech_context = driver_context.PortContext( - plugin, self.context, port['port'], - plugin.get_network(self.context, port['port']['network_id']), - binding, None) - with mock.patch('neutron.plugins.ml2.plugin.Ml2Plugin.' - '_update_port_dict_binding') as update_mock: - attrs = {portbindings.HOST_ID: None} - self.assertEqual('test', binding.host) - with self.context.session.begin(subtransactions=True): + mech_context = driver_context.PortContext( + plugin, self.context, port['port'], + plugin.get_network( + self.context, port['port']['network_id']), + binding, None) + with mock.patch('neutron.plugins.ml2.plugin.Ml2Plugin.' + '_update_port_dict_binding') as update_mock: + attrs = {portbindings.HOST_ID: None} + self.assertEqual('test', binding.host) plugin._process_port_binding(mech_context, attrs) - self.assertTrue(update_mock.mock_calls) - self.assertEqual('', binding.host) + self.assertTrue(update_mock.mock_calls) + self.assertEqual('', binding.host) def test_update_port_binding_host_id_not_changed(self): with self.port() as port: diff --git a/neutron/tests/unit/plugins/ml2/test_port_binding.py b/neutron/tests/unit/plugins/ml2/test_port_binding.py index 8c4c09d65ee..dda6e595a71 100644 --- a/neutron/tests/unit/plugins/ml2/test_port_binding.py +++ b/neutron/tests/unit/plugins/ml2/test_port_binding.py @@ -19,6 +19,7 @@ from neutron_lib.api.definitions import portbindings from neutron_lib.api.definitions import portbindings_extended as pbe_ext from neutron_lib import constants as const from neutron_lib import context +from neutron_lib.db import api as db_api from neutron_lib import exceptions from neutron_lib.plugins import directory from neutron_lib.plugins import utils @@ -119,7 +120,7 @@ class PortBindingTestCase(test_plugin.NeutronDbPluginV2TestCase): ctx = context.get_admin_context() with self.port(name='name') as port: # emulating concurrent binding deletion - with ctx.session.begin(): + with db_api.CONTEXT_WRITER.using(ctx): for item in (ctx.session.query(ml2_models.PortBinding). filter_by(port_id=port['port']['id'])): ctx.session.delete(item)