Merge "delete stale ovsdb table entries from mysql db"

changes/18/332318/1
Jenkins 7 years ago committed by Gerrit Code Review
commit 89a6cf1072

@ -505,3 +505,66 @@ def get_all_ucast_mac_remote_by_ls(context, record_dict):
return session.query(models.UcastMacsRemotes).filter_by(
ovsdb_identifier=record_dict['ovsdb_identifier'],
logical_switch_id=record_dict['logical_switch_id']).all()
def delete_all_physical_locators_by_ovsdb_identifier(context,
ovsdb_identifier):
"""Delete all physical locators based on ovsdb identifier."""
session = context.session
with session.begin(subtransactions=True):
session.query(models.PhysicalLocators).filter_by(
ovsdb_identifier=ovsdb_identifier).delete()
def delete_all_physical_switches_by_ovsdb_identifier(context,
ovsdb_identifier):
"""Delete all physical switches based on ovsdb identifier."""
session = context.session
with session.begin(subtransactions=True):
session.query(models.PhysicalSwitches).filter_by(
ovsdb_identifier=ovsdb_identifier).delete()
def delete_all_physical_ports_by_ovsdb_identifier(context,
ovsdb_identifier):
"""Delete all physical ports based on ovsdb identifier."""
session = context.session
with session.begin(subtransactions=True):
session.query(models.PhysicalPorts).filter_by(
ovsdb_identifier=ovsdb_identifier).delete()
def delete_all_logical_switches_by_ovsdb_identifier(context,
ovsdb_identifier):
"""Delete all physical switches based on ovsdb identifier."""
session = context.session
with session.begin(subtransactions=True):
session.query(models.LogicalSwitches).filter_by(
ovsdb_identifier=ovsdb_identifier).delete()
def delete_all_ucast_macs_locals_by_ovsdb_identifier(context,
ovsdb_identifier):
"""Delete all ucast mac locals based on ovsdb identifier."""
session = context.session
with session.begin(subtransactions=True):
session.query(models.UcastMacsLocals).filter_by(
ovsdb_identifier=ovsdb_identifier).delete()
def delete_all_ucast_macs_remotes_by_ovsdb_identifier(context,
ovsdb_identifier):
"""Delete all ucast mac remotes based on ovsdb identifier."""
session = context.session
with session.begin(subtransactions=True):
session.query(models.UcastMacsRemotes).filter_by(
ovsdb_identifier=ovsdb_identifier).delete()
def delete_all_vlan_bindings_by_ovsdb_identifier(context,
ovsdb_identifier):
"""Delete all vlan bindings based on ovsdb identifier."""
session = context.session
with session.begin(subtransactions=True):
session.query(models.VlanBindings).filter_by(
ovsdb_identifier=ovsdb_identifier).delete()

@ -28,10 +28,11 @@ class L2GatewayAgentApi(object):
target = messaging.Target(topic=topic, version=self.API_VERSION)
self.client = n_rpc.get_client(target)
def update_ovsdb_changes(self, context, ovsdb_data):
def update_ovsdb_changes(self, context, activity, ovsdb_data):
cctxt = self.client.prepare()
return cctxt.cast(context,
'update_ovsdb_changes',
activity=activity,
ovsdb_data=ovsdb_data)
def notify_ovsdb_states(self, context, ovsdb_states):

@ -362,6 +362,7 @@ class OVSDBManager(base_agent_manager.BaseAgentManager):
ovsdb_identifier,
op_method)
def agent_to_plugin_rpc(self, ovsdb_data):
def agent_to_plugin_rpc(self, activity, ovsdb_data):
self.plugin_rpc.update_ovsdb_changes(ctx.get_admin_context(),
activity,
ovsdb_data)

@ -29,6 +29,10 @@ from networking_l2gw.services.l2gateway import exceptions
LOG = logging.getLogger(__name__)
class Activity:
Initial, Update = range(2)
class OVSDBMonitor(base_connection.BaseConnection):
"""Monitors OVSDB servers."""
def __init__(self, conf, gw_config, callback, mgr=None):
@ -135,7 +139,8 @@ class OVSDBMonitor(base_connection.BaseConnection):
params_list = message.get('params')
param_dict = params_list[1]
self._process_tables(param_dict, data_dict)
self.rpc_callback(self._form_ovsdb_data(data_dict, addr))
self.rpc_callback(Activity.Update,
self._form_ovsdb_data(data_dict, addr))
def _process_tables(self, param_dict, data_dict):
# Process all the tables one by one.
@ -246,7 +251,8 @@ class OVSDBMonitor(base_connection.BaseConnection):
data_dict = self._initialize_data_dict()
try:
self._process_tables(result_dict, data_dict)
self.rpc_callback(self._form_ovsdb_data(data_dict, addr))
self.rpc_callback(Activity.Initial,
self._form_ovsdb_data(data_dict, addr))
except Exception as e:
LOG.exception(_LE("_process_monitor_msg:ERROR %s "), e)

@ -44,11 +44,11 @@ class L2GatewayOVSDBCallbacks(object):
self.plugin = plugin
self.ovsdb = None
def update_ovsdb_changes(self, context, ovsdb_data):
def update_ovsdb_changes(self, context, activity, ovsdb_data):
"""RPC to update the changes from OVSDB in the database."""
self.ovsdb = self.get_ovsdbdata_object(
ovsdb_data.get(n_const.OVSDB_IDENTIFIER))
self.ovsdb.update_ovsdb_changes(context, ovsdb_data)
self.ovsdb.update_ovsdb_changes(context, activity, ovsdb_data)
def notify_ovsdb_states(self, context, ovsdb_states):
"""RPC to notify the OVSDB servers connection state."""
@ -74,9 +74,27 @@ class OVSDBData(object):
self.core_plugin = manager.NeutronManager.get_plugin()
self.tunnel_call = tunnel_calls.Tunnel_Calls()
def update_ovsdb_changes(self, context, ovsdb_data):
def _cleanup_all_ovsdb_tables(self, context, ovsdb_identifier):
db.delete_all_physical_locators_by_ovsdb_identifier(
context, ovsdb_identifier)
db.delete_all_physical_switches_by_ovsdb_identifier(
context, ovsdb_identifier)
db.delete_all_physical_ports_by_ovsdb_identifier(
context, ovsdb_identifier)
db.delete_all_logical_switches_by_ovsdb_identifier(
context, ovsdb_identifier)
db.delete_all_ucast_macs_locals_by_ovsdb_identifier(
context, ovsdb_identifier)
db.delete_all_ucast_macs_remotes_by_ovsdb_identifier(
context, ovsdb_identifier)
db.delete_all_vlan_bindings_by_ovsdb_identifier(
context, ovsdb_identifier)
def update_ovsdb_changes(self, context, activity, ovsdb_data):
"""RPC to update the changes from OVSDB in the database."""
ovsdb_identifier = ovsdb_data.get('ovsdb_identifier')
if not activity:
self._cleanup_all_ovsdb_tables(context, ovsdb_identifier)
for item, value in ovsdb_data.items():
lookup = self.entry_table.get(item, None)
if lookup:

@ -35,10 +35,12 @@ class L2GatewayAgentApiTestCase(base.BaseTestCase):
def test_update_ovsdb_changes(self):
cctxt = mock.Mock()
context = mock.Mock()
fake_activity = 1
self.agent_rpc.client.prepare.return_value = cctxt
self.agent_rpc.update_ovsdb_changes(context, mock.ANY)
self.agent_rpc.update_ovsdb_changes(context, fake_activity, mock.ANY)
cctxt.cast.assert_called_with(
context, 'update_ovsdb_changes', ovsdb_data=mock.ANY)
context, 'update_ovsdb_changes',
activity=fake_activity, ovsdb_data=mock.ANY)
def test_notify_ovsdb_states(self):
cctxt = mock.Mock()

@ -37,13 +37,15 @@ class TestL2GatewayOVSDBCallbacks(object):
self.context = context.get_admin_context()
def test_update_ovsdb_changes(self):
fake_activity = 1
fake_ovsdb_data = {n_const.OVSDB_IDENTIFIER: 'fake_id'}
with mock.patch.object(data, 'OVSDBData') as ovs_data:
self.l2gw_callbacks.update_ovsdb_changes(self.context,
fake_activity,
fake_ovsdb_data)
ovsdb_return_value = ovs_data.return_value
ovsdb_return_value.update_ovsdb_changes.assert_called_with(
self.context, fake_ovsdb_data)
self.context, fake_activity, fake_ovsdb_data)
def test_notify_ovsdb_states(self):
fake_ovsdb_states = {'ovsdb1': 'connected'}
@ -83,6 +85,7 @@ class TestOVSDBData(base.BaseTestCase):
def test_update_ovsdb_changes(self):
fake_dict = {}
fake_activity = 1
fake_remote_mac = {'uuid': '123456',
'mac': 'mac123',
'ovsdb_identifier': 'host1',
@ -182,7 +185,7 @@ class TestOVSDBData(base.BaseTestCase):
'deleted_local_macs': process_deleted_local_macs,
'deleted_remote_macs': process_deleted_remote_macs}
self.ovsdb_data.update_ovsdb_changes(
self.context, fake_ovsdb_data)
self.context, fake_activity, fake_ovsdb_data)
process_new_logical_switches.assert_called_with(
self.context, fake_new_logical_switches)

Loading…
Cancel
Save