Add context for all SQL transactions

Follow the last efforts to make Neutron compatible with sqlalchemy-20
(see [1]), and use the necessary context for all SQL transactions.

Also make tempest job non-voting until [2] merges.

[1] https://review.opendev.org/c/openstack/neutron-lib/+/828738
[2] https://review.opendev.org/c/openstack/neutron-tempest-plugin/+/842113

Closes-Bug: #1973757
Change-Id: I5aef8683df55b0825181e57257f0b71887315e52
This commit is contained in:
elajkat 2022-05-17 14:14:58 +02:00
parent ee460a3c06
commit f59837263c
5 changed files with 160 additions and 134 deletions

View File

@ -6,10 +6,17 @@
- openstack-python3-zed-jobs-neutron
check:
jobs:
- neutron-tempest-plugin-tap-as-a-service
gate:
jobs:
- neutron-tempest-plugin-tap-as-a-service
# TODO(lajoskatona): make the job non-voting till
# https://review.opendev.org/c/openstack/neutron-tempest-plugin/+/842113
# is not merged
- neutron-tempest-plugin-tap-as-a-service:
voting: false
# TODO(lajoskatona): remove job from gate till
# https://review.opendev.org/c/openstack/neutron-tempest-plugin/+/842113
# is not merged
# gate:
# jobs:
# - neutron-tempest-plugin-tap-as-a-service
periodic-weekly:
jobs:
- openstack-tox-py39

View File

@ -20,6 +20,7 @@ from sqlalchemy import orm
from sqlalchemy.orm import exc
from neutron_lib import constants
from neutron_lib.db import api as db_api
from neutron_lib.db import model_base
from neutron_lib.db import model_query
from neutron_lib.db import utils as db_utils
@ -86,12 +87,16 @@ class Taas_db_Mixin(taas.TaasPluginBase):
def _core_plugin(self):
return directory.get_plugin()
@db_api.retry_if_session_inactive()
@db_api.CONTEXT_READER
def _get_tap_service(self, context, id):
try:
return model_query.get_by_id(context, TapService, id)
except exc.NoResultFound:
raise taas.TapServiceNotFound(tap_id=id)
@db_api.retry_if_session_inactive()
@db_api.CONTEXT_READER
def _get_tap_id_association(self, context, tap_service_id):
try:
query = model_query.query_with_hooks(context, TapIdAssociation)
@ -100,10 +105,11 @@ class Taas_db_Mixin(taas.TaasPluginBase):
except exc.NoResultFound:
raise taas.TapServiceNotFound(tap_id=tap_service_id)
@db_api.CONTEXT_READER
def _get_tap_flow(self, context, id):
try:
return model_query.get_by_id(context, TapFlow, id)
except Exception:
except exc.NoResultFound:
raise taas.TapFlowNotFound(flow_id=id)
def _make_tap_service_dict(self, tap_service, fields=None):
@ -135,20 +141,21 @@ class Taas_db_Mixin(taas.TaasPluginBase):
return db_utils.resource_fields(res, fields)
@db_api.retry_if_session_inactive()
@db_api.CONTEXT_WRITER
def create_tap_service(self, context, tap_service):
LOG.debug("create_tap_service() called")
t_s = tap_service['tap_service']
tenant_id = t_s['tenant_id']
with context.session.begin(subtransactions=True):
tap_service_db = TapService(
id=uuidutils.generate_uuid(),
tenant_id=tenant_id,
name=t_s['name'],
description=t_s['description'],
port_id=t_s['port_id'],
status=constants.DOWN,
)
context.session.add(tap_service_db)
tap_service_db = TapService(
id=uuidutils.generate_uuid(),
tenant_id=tenant_id,
name=t_s['name'],
description=t_s['description'],
port_id=t_s['port_id'],
status=constants.DOWN,
)
context.session.add(tap_service_db)
return self._make_tap_service_dict(tap_service_db)
@ -183,41 +190,45 @@ class Taas_db_Mixin(taas.TaasPluginBase):
# not found
raise taas.TapServiceLimitReached()
@db_api.retry_if_session_inactive()
@db_api.CONTEXT_WRITER
def create_tap_id_association(self, context, tap_service_id):
LOG.debug("create_tap_id_association() called")
# create the TapIdAssociation object
with context.session.begin(subtransactions=True):
# allocate Taas id.
# if conflict happened, it will raise db.DBDuplicateEntry.
# this will be retry request again in neutron controller framework.
# so we just make sure TapIdAssociation field taas_id is unique
tap_id_association_db = self._allocate_taas_id_with_tap_service_id(
context, tap_service_id)
# allocate Taas id.
# if conflict happened, it will raise db.DBDuplicateEntry.
# this will be retry request again in neutron controller framework.
# so we just make sure TapIdAssociation field taas_id is unique
tap_id_association_db = self._allocate_taas_id_with_tap_service_id(
context, tap_service_id)
return self._make_tap_id_association_dict(tap_id_association_db)
@db_api.retry_if_session_inactive()
@db_api.CONTEXT_WRITER
def create_tap_flow(self, context, tap_flow):
LOG.debug("create_tap_flow() called")
t_f = tap_flow['tap_flow']
tenant_id = t_f['tenant_id']
# TODO(Vinay): Check for the tenant_id validation
# TODO(Vinay): Check for the source port validation
with context.session.begin(subtransactions=True):
tap_flow_db = TapFlow(
id=uuidutils.generate_uuid(),
tenant_id=tenant_id,
name=t_f['name'],
description=t_f['description'],
tap_service_id=t_f['tap_service_id'],
source_port=t_f['source_port'],
direction=t_f['direction'],
status=constants.DOWN,
vlan_filter=t_f['vlan_filter'],
)
context.session.add(tap_flow_db)
tap_flow_db = TapFlow(
id=uuidutils.generate_uuid(),
tenant_id=tenant_id,
name=t_f['name'],
description=t_f['description'],
tap_service_id=t_f['tap_service_id'],
source_port=t_f['source_port'],
direction=t_f['direction'],
status=constants.DOWN,
vlan_filter=t_f['vlan_filter'],
)
context.session.add(tap_flow_db)
return self._make_tap_flow_dict(tap_flow_db)
@db_api.retry_if_session_inactive()
@db_api.CONTEXT_WRITER
def delete_tap_service(self, context, id):
LOG.debug("delete_tap_service() called")
@ -226,9 +237,9 @@ class Taas_db_Mixin(taas.TaasPluginBase):
if not count:
raise taas.TapServiceNotFound(tap_id=id)
@db_api.CONTEXT_WRITER
def delete_tap_flow(self, context, id):
LOG.debug("delete_tap_flow() called")
count = context.session.query(TapFlow).filter_by(id=id).delete()
if not count:
@ -246,12 +257,15 @@ class Taas_db_Mixin(taas.TaasPluginBase):
t_a = self._get_tap_id_association(context, tap_service_id)
return self._make_tap_id_association_dict(t_a)
@db_api.CONTEXT_READER
def get_tap_flow(self, context, id, fields=None):
LOG.debug("get_tap_flow() called")
t_f = self._get_tap_flow(context, id)
return self._make_tap_flow_dict(t_f, fields)
@db_api.retry_if_session_inactive()
@db_api.CONTEXT_READER
def get_tap_services(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
@ -260,6 +274,8 @@ class Taas_db_Mixin(taas.TaasPluginBase):
self._make_tap_service_dict,
filters=filters, fields=fields)
@db_api.retry_if_session_inactive()
@db_api.CONTEXT_READER
def get_tap_flows(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
@ -268,24 +284,27 @@ class Taas_db_Mixin(taas.TaasPluginBase):
self._make_tap_flow_dict,
filters=filters, fields=fields)
def _get_port_details(self, context, port_id):
with context.session.begin(subtransactions=True):
port = self._core_plugin().get_port(context, port_id)
@db_api.retry_if_session_inactive()
@db_api.CONTEXT_READER
def get_port_details(self, context, port_id):
port = self._core_plugin().get_port(context, port_id)
return port
@db_api.retry_if_session_inactive()
@db_api.CONTEXT_WRITER
def update_tap_service(self, context, id, tap_service):
LOG.debug("update_tap_service() called")
t_s = tap_service['tap_service']
with context.session.begin(subtransactions=True):
tap_service_db = self._get_tap_service(context, id)
tap_service_db.update(t_s)
tap_service_db = self._get_tap_service(context, id)
tap_service_db.update(t_s)
return self._make_tap_service_dict(tap_service_db)
@db_api.retry_if_session_inactive()
@db_api.CONTEXT_WRITER
def update_tap_flow(self, context, id, tap_flow):
LOG.debug("update_tap_flow() called")
t_f = tap_flow['tap_flow']
with context.session.begin(subtransactions=True):
tap_flow_db = self._get_tap_flow(context, id)
tap_flow_db.update(t_f)
tap_flow_db = self._get_tap_flow(context, id)
tap_flow_db.update(t_f)
return self._make_tap_flow_dict(tap_flow_db)

View File

@ -17,6 +17,7 @@
from neutron_lib.api.definitions import portbindings
from neutron_lib import constants
from neutron_lib.db import api as db_api
from neutron_lib import exceptions as n_exc
from neutron_lib import rpc as n_rpc
@ -55,7 +56,7 @@ class TaasCallbacks(object):
for ts in active_tss:
# If tap-service port is bound to a different host than the one
# which sent this RPC, then continue.
ts_port = self.plugin._get_port_details(
ts_port = self.plugin.get_port_details(
context, ts['port_id'])
if ts_port['binding:host_id'] != host:
continue
@ -90,6 +91,7 @@ class TaasCallbacks(object):
super(TaasPlugin, self.plugin).delete_tap_flow(
context, tf['id'])
@db_api.CONTEXT_WRITER
def set_tap_service_status(self, context, msg, status, host=None):
"""Handle Rpc from Agent to set the status of Tap resources."""
LOG.info("In RPC Call to set tap service status: Host=%s, "
@ -98,25 +100,24 @@ class TaasCallbacks(object):
# Clear the resource from DB once agent indicates successful deletion
# by mech driver.
if status == constants.INACTIVE:
with context.session.begin(subtransactions=True):
ts = self.plugin.get_tap_service(context, msg['id'])
driver_context = sd_context.TapServiceContext(self.plugin,
context,
ts)
super(TaasPlugin, self.plugin).delete_tap_service(context,
msg['id'])
self.plugin.driver.delete_tap_service_postcommit(
driver_context)
ts = self.plugin.get_tap_service(context, msg['id'])
driver_context = sd_context.TapServiceContext(self.plugin,
context,
ts)
super(TaasPlugin, self.plugin).delete_tap_service(context,
msg['id'])
self.plugin.driver.delete_tap_service_postcommit(
driver_context)
return
with context.session.begin(subtransactions=True):
ts = self.plugin.get_tap_service(context, msg['id'])
ts['status'] = status
super(TaasPlugin, self.plugin).update_tap_service(
context,
msg['id'],
{'tap_service': ts})
ts = self.plugin.get_tap_service(context, msg['id'])
ts['status'] = status
super(TaasPlugin, self.plugin).update_tap_service(
context,
msg['id'],
{'tap_service': ts})
@db_api.CONTEXT_WRITER
def set_tap_flow_status(self, context, msg, status, host=None):
"""Handle Rpc from Agent to set the status of Tap resources."""
LOG.info("In RPC Call to set tap flow status: Host=%s, "
@ -125,22 +126,20 @@ class TaasCallbacks(object):
# Clear the resource from DB once agent indicates successful deletion
# by mech driver.
if status == constants.INACTIVE:
with context.session.begin(subtransactions=True):
tf = self.plugin.get_tap_flow(context, msg['id'])
driver_context = sd_context.TapFlowContext(self.plugin,
context,
tf)
super(TaasPlugin, self.plugin).delete_tap_flow(context,
msg['id'])
self.plugin.driver.delete_tap_flow_postcommit(driver_context)
tf = self.plugin.get_tap_flow(context, msg['id'])
driver_context = sd_context.TapFlowContext(self.plugin,
context,
tf)
super(TaasPlugin, self.plugin).delete_tap_flow(context,
msg['id'])
self.plugin.driver.delete_tap_flow_postcommit(driver_context)
return
with context.session.begin(subtransactions=True):
tf = self.plugin.get_tap_flow(context, msg['id'])
tf['status'] = status
super(TaasPlugin, self.plugin).update_tap_flow(context,
msg['id'],
{'tap_flow': tf})
tf = self.plugin.get_tap_flow(context, msg['id'])
tf['status'] = status
super(TaasPlugin, self.plugin).update_tap_flow(context,
msg['id'],
{'tap_flow': tf})
class TaasRpcDriver(service_drivers.TaasBaseDriver):
@ -187,8 +186,8 @@ class TaasRpcDriver(service_drivers.TaasBaseDriver):
ts = context.tap_service
tap_id_association = context.tap_id_association
taas_vlan_id = tap_id_association['taas_id']
port = self.service_plugin._get_port_details(context._plugin_context,
ts['port_id'])
port = self.service_plugin.get_port_details(context._plugin_context,
ts['port_id'])
host = port['binding:host_id']
rpc_msg = {'tap_service': ts,
@ -210,7 +209,7 @@ class TaasRpcDriver(service_drivers.TaasBaseDriver):
taas_vlan_id = tap_id_association['taas_id']
try:
port = self.service_plugin._get_port_details(
port = self.service_plugin.get_port_details(
context._plugin_context,
ts['port_id'])
host = port['binding:host_id']
@ -238,14 +237,14 @@ class TaasRpcDriver(service_drivers.TaasBaseDriver):
tf = context.tap_flow
taas_id = self._get_taas_id(context._plugin_context, tf)
# Extract the host where the source port is located
port = self.service_plugin._get_port_details(context._plugin_context,
tf['source_port'])
port = self.service_plugin.get_port_details(context._plugin_context,
tf['source_port'])
host = port['binding:host_id']
port_mac = port['mac_address']
# Extract the tap-service port
ts = self.service_plugin.get_tap_service(context._plugin_context,
tf['tap_service_id'])
ts_port = self.service_plugin._get_port_details(
ts_port = self.service_plugin.get_port_details(
context._plugin_context, ts['port_id'])
# Send RPC message to both the source port host and
@ -264,14 +263,14 @@ class TaasRpcDriver(service_drivers.TaasBaseDriver):
tf = context.tap_flow
taas_id = self._get_taas_id(context._plugin_context, tf)
# Extract the host where the source port is located
port = self.service_plugin._get_port_details(context._plugin_context,
tf['source_port'])
port = self.service_plugin.get_port_details(context._plugin_context,
tf['source_port'])
host = port['binding:host_id']
port_mac = port['mac_address']
# Extract the tap-service port
ts = self.service_plugin.get_tap_service(context._plugin_context,
tf['tap_service_id'])
ts_port = self.service_plugin._get_port_details(
ts_port = self.service_plugin.get_port_details(
context._plugin_context, ts['port_id'])
src_vlans_list = []
@ -286,7 +285,7 @@ class TaasRpcDriver(service_drivers.TaasBaseDriver):
fields=['source_port', 'vlan_filter'])
for tap_flow in active_tfs:
source_port = self.service_plugin._get_port_details(
source_port = self.service_plugin.get_port_details(
context._plugin_context, tap_flow['source_port'])
LOG.debug("taas: active TF's source_port %(source_port)s",

View File

@ -21,6 +21,7 @@ from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from neutron_lib import constants
from neutron_lib.db import api as db_api
from neutron_lib import exceptions as n_exc
from neutron_taas.common import constants as taas_consts
@ -70,6 +71,7 @@ class TaasPlugin(taas_db.Taas_db_Mixin):
raise n_exc.Invalid("Error retrieving driver for provider %s" %
provider)
@db_api.CONTEXT_WRITER
def create_tap_service(self, context, tap_service):
LOG.debug("create_tap_service() called")
@ -78,7 +80,7 @@ class TaasPlugin(taas_db.Taas_db_Mixin):
port_id = t_s['port_id']
# Get port details
port = self._get_port_details(context, port_id)
port = self.get_port_details(context, port_id)
# Check if the port is owned by the tenant.
if port['tenant_id'] != tenant_id:
@ -93,11 +95,10 @@ class TaasPlugin(taas_db.Taas_db_Mixin):
LOG.debug("Host could not be found, Port Binding disbaled!")
# Create tap service in the db model
with context.session.begin(subtransactions=True):
ts = super(TaasPlugin, self).create_tap_service(context,
tap_service)
driver_context = sd_context.TapServiceContext(self, context, ts)
self.driver.create_tap_service_precommit(driver_context)
ts = super(TaasPlugin, self).create_tap_service(context,
tap_service)
driver_context = sd_context.TapServiceContext(self, context, ts)
self.driver.create_tap_service_precommit(driver_context)
try:
self.driver.create_tap_service_postcommit(driver_context)
@ -109,6 +110,7 @@ class TaasPlugin(taas_db.Taas_db_Mixin):
return ts
@db_api.CONTEXT_WRITER
def delete_tap_service(self, context, id):
LOG.debug("delete_tap_service() called")
@ -121,25 +123,25 @@ class TaasPlugin(taas_db.Taas_db_Mixin):
for t_f in t_f_collection:
self.delete_tap_flow(context, t_f['id'])
with context.session.begin(subtransactions=True):
ts = self.get_tap_service(context, id)
driver_context = sd_context.TapServiceContext(self, context, ts)
if ts['status'] == constants.ACTIVE:
ts['status'] = constants.PENDING_DELETE
super(TaasPlugin, self).update_tap_service(
context, id, {'tap_service': ts})
method = self.driver.delete_tap_service_precommit
else:
super(TaasPlugin, self).delete_tap_service(context, id)
method = self.driver.delete_tap_service_postcommit
ts = self.get_tap_service(context, id)
driver_context = sd_context.TapServiceContext(self, context, ts)
if ts['status'] == constants.ACTIVE:
ts['status'] = constants.PENDING_DELETE
super(TaasPlugin, self).update_tap_service(
context, id, {'tap_service': ts})
method = self.driver.delete_tap_service_precommit
else:
super(TaasPlugin, self).delete_tap_service(context, id)
method = self.driver.delete_tap_service_postcommit
try:
method(driver_context)
except Exception:
with excutils.save_and_reraise_exception():
LOG.error("Failed to delete tap service on driver. "
"tap_sevice: %s", id)
try:
method(driver_context)
except Exception:
with excutils.save_and_reraise_exception():
LOG.error("Failed to delete tap service on driver. "
"tap_sevice: %s", id)
@db_api.CONTEXT_WRITER
def create_tap_flow(self, context, tap_flow):
LOG.debug("create_tap_flow() called")
@ -156,10 +158,9 @@ class TaasPlugin(taas_db.Taas_db_Mixin):
raise taas_ex.TapServiceNotBelongToTenant()
# create tap flow in the db model
with context.session.begin(subtransactions=True):
tf = super(TaasPlugin, self).create_tap_flow(context, tap_flow)
driver_context = sd_context.TapFlowContext(self, context, tf)
self.driver.create_tap_flow_precommit(driver_context)
tf = super(TaasPlugin, self).create_tap_flow(context, tap_flow)
driver_context = sd_context.TapFlowContext(self, context, tf)
self.driver.create_tap_flow_precommit(driver_context)
try:
self.driver.create_tap_flow_postcommit(driver_context)
@ -171,27 +172,27 @@ class TaasPlugin(taas_db.Taas_db_Mixin):
return tf
@db_api.CONTEXT_WRITER
def delete_tap_flow(self, context, id):
LOG.debug("delete_tap_flow() called")
with context.session.begin(subtransactions=True):
tf = self.get_tap_flow(context, id)
driver_context = sd_context.TapFlowContext(self, context, tf)
if tf['status'] == constants.ACTIVE:
tf['status'] = constants.PENDING_DELETE
super(TaasPlugin, self).update_tap_flow(context, id,
{'tap_flow': tf})
method = self.driver.delete_tap_flow_precommit
else:
super(TaasPlugin, self).delete_tap_flow(context, id)
method = self.driver.delete_tap_flow_postcommit
tf = self.get_tap_flow(context, id)
driver_context = sd_context.TapFlowContext(self, context, tf)
if tf['status'] == constants.ACTIVE:
tf['status'] = constants.PENDING_DELETE
super(TaasPlugin, self).update_tap_flow(context, id,
{'tap_flow': tf})
method = self.driver.delete_tap_flow_precommit
else:
super(TaasPlugin, self).delete_tap_flow(context, id)
method = self.driver.delete_tap_flow_postcommit
try:
method(driver_context)
except Exception:
with excutils.save_and_reraise_exception():
LOG.error("Failed to delete tap flow on driver. "
"tap_flow: %s", id)
try:
method(driver_context)
except Exception:
with excutils.save_and_reraise_exception():
LOG.error("Failed to delete tap flow on driver. "
"tap_flow: %s", id)
@registry.receives(resources.PORT, [events.PRECOMMIT_DELETE])
def handle_delete_port(self, resource, event, trigger, payload):

View File

@ -87,7 +87,7 @@ class TestTaasPlugin(testlib_api.SqlTestCase):
req = {
'tap_service': self._tap_service,
}
with mock.patch.object(self._plugin, '_get_port_details',
with mock.patch.object(self._plugin, 'get_port_details',
return_value=self._port_details):
self._plugin.create_tap_service(self._context, req)
self._tap_service['id'] = mock.ANY
@ -119,7 +119,7 @@ class TestTaasPlugin(testlib_api.SqlTestCase):
req = {
'tap_flow': self._tap_flow,
}
with mock.patch.object(self._plugin, '_get_port_details',
with mock.patch.object(self._plugin, 'get_port_details',
return_value=self._port_details):
self._plugin.create_tap_flow(self._context, req)
self._tap_flow['id'] = mock.ANY
@ -198,7 +198,7 @@ class TestTaasPlugin(testlib_api.SqlTestCase):
req = {
'tap_service': self._tap_service,
}
with mock.patch.object(self._plugin, '_get_port_details',
with mock.patch.object(self._plugin, 'get_port_details',
return_value=self._port_details):
self._plugin.create_tap_service(self._context, req)
@ -269,7 +269,7 @@ class TestTaasPlugin(testlib_api.SqlTestCase):
req = {
'tap_flow': self._tap_flow,
}
with mock.patch.object(self._plugin, '_get_port_details',
with mock.patch.object(self._plugin, 'get_port_details',
return_value=self._port_details):
self._plugin.create_tap_flow(self._context, req)