use mutate operation instead of update operation

for insert/delete vlan_bindings column in physical_port table.
updating the vlan_bindings, which requires the previous data
from neutron db for a port, which is a overhead operation,and also causing
inconsistent entries in ovsdb server.
use mutate operation so that only the vlan_bindings which is generated
in request is inserted/deleted by mutating the vlan_binding column.
so that vlan_bindings in ovsdb is consistent.

Closes-Bug: 1542185

Change-Id: I8e0d3c19f74ae2906236fd71befacc0ff501023a
changes/13/302513/6
vikas 7 years ago
parent 48df6c1dc2
commit 4104c4f837

@ -329,7 +329,7 @@ class OVSDBManager(base_agent_manager.BaseAgentManager):
def update_connection_to_gateway(self, context, ovsdb_identifier,
logical_switch_dict, locator_dicts,
mac_dicts, port_dicts):
mac_dicts, port_dicts, op_method):
"""Handle RPC cast from plugin.
Handle RPC cast from plugin to connect/disconnect a network
@ -341,6 +341,7 @@ class OVSDBManager(base_agent_manager.BaseAgentManager):
mac_dicts,
port_dicts,
ovsdb_identifier,
op_method,
False)
elif ((self.enable_manager) and (
not self.l2gw_agent_type == n_const.MONITOR)):
@ -350,7 +351,7 @@ class OVSDBManager(base_agent_manager.BaseAgentManager):
logical_switch_dict,
locator_dicts,
mac_dicts,
port_dicts, ovsdb_identifier, False)
port_dicts, ovsdb_identifier, op_method, False)
elif not self.enable_manager:
if self._is_valid_request(ovsdb_identifier):
with self._open_connection(ovsdb_identifier) as ovsdb_fd:
@ -358,7 +359,8 @@ class OVSDBManager(base_agent_manager.BaseAgentManager):
locator_dicts,
mac_dicts,
port_dicts,
ovsdb_identifier)
ovsdb_identifier,
op_method)
def agent_to_plugin_rpc(self, ovsdb_data):
self.plugin_rpc.update_ovsdb_changes(ctx.get_admin_context(),

@ -215,6 +215,7 @@ class OVSDBWriter(base_connection.BaseConnection):
def update_connection_to_gateway(self, logical_switch_dict,
locator_dicts, mac_dicts,
port_dicts, ovsdb_identifier,
op_method,
rcv_required=True):
"""Updates Physical Port's VNI to VLAN binding."""
# Form the JSON Query so as to update the physical port with the
@ -222,7 +223,8 @@ class OVSDBWriter(base_connection.BaseConnection):
update_dicts = self._get_bindings_to_update(logical_switch_dict,
locator_dicts,
mac_dicts,
port_dicts)
port_dicts,
op_method)
op_id = str(random.getrandbits(128))
query = {"method": "transact",
"params": update_dicts,
@ -266,7 +268,7 @@ class OVSDBWriter(base_connection.BaseConnection):
return
def _get_bindings_to_update(self, l_switch_dict, locator_dicts,
mac_dicts, port_dicts):
mac_dicts, port_dicts, op_method):
# For connection-create, there are two cases to be handled
# Case 1: VMs exist in a network on compute nodes.
# Connection request will contain locators, ports, MACs and
@ -335,7 +337,7 @@ class OVSDBWriter(base_connection.BaseConnection):
params)
# Use ports
self._form_ports(ls_list, port_list, params)
self._form_ports(ls_list, port_list, params, op_method)
params.append(commit_dict)
return params
@ -366,7 +368,7 @@ class OVSDBWriter(base_connection.BaseConnection):
ls_list)
params.append(query)
def _form_ports(self, ls_list, port_list, params):
def _form_ports(self, ls_list, port_list, params, op_method):
for port in port_list:
port_vlan_bindings = []
outer_list = []
@ -381,11 +383,22 @@ class OVSDBWriter(base_connection.BaseConnection):
outer_list.append([vlan_binding.vlan,
ls_list])
port_vlan_bindings.append(outer_list)
update_dict = {"op": "update",
"table": "Physical_Port",
"where": [["_uuid", "==",
["uuid", port.uuid]]],
"row": {"vlan_bindings": port_vlan_bindings}}
if op_method == 'CREATE':
update_dict = {"op": "mutate",
"table": "Physical_Port",
"where": [["_uuid", "==",
["uuid", port.uuid]]],
"mutations": [["vlan_bindings",
"insert",
port_vlan_bindings]]}
elif op_method == 'DELETE':
update_dict = {"op": "mutate",
"table": "Physical_Port",
"where": [["_uuid", "==",
["uuid", port.uuid]]],
"mutations": [["vlan_bindings",
"delete",
port_vlan_bindings]]}
params.append(update_dict)
def _get_physical_locator_dict(self, locator):

@ -43,3 +43,5 @@ L2_GATEWAY_SERVICE_PLUGIN = "Neutron L2 gateway Service Plugin"
PORT_FAULT_STATUS_UP = "UP"
SWITCH_FAULT_STATUS_UP = "UP"
VXLAN = "vxlan"
CREATE = "CREATE"
DELETE = "DELETE"

@ -120,6 +120,10 @@ class L2GatewayServiceDriverError(exceptions.NeutronException):
message = _("%(method)s failed.")
class InvalidMethod(exceptions.NeutronException):
message = _("invalid method '%(op_method)s'")
base.FAULT_MAP.update({L2GatewayInUse: web_exc.HTTPConflict,
L2GatewayPortInUse: web_exc.HTTPConflict,
L2GatewayConnectionExists: web_exc.HTTPConflict,

@ -15,6 +15,7 @@
from neutron.common import rpc as n_rpc
from networking_l2gw.services.l2gateway.common import constants as n_const
from networking_l2gw.services.l2gateway import exceptions as l2gw_exc
from oslo_log import log as logging
@ -80,10 +81,17 @@ class L2gatewayAgentApi(object):
ovsdb_identifier=ovsdb_identifier,
logical_switch_uuid=logical_switch_uuid)
def _validate_request_op_method(self, context, op_method):
"""validate the method in the request."""
method_list = [n_const.CREATE, n_const.DELETE]
if op_method not in method_list:
raise l2gw_exc.InvalidMethod(op_method=op_method)
def update_connection_to_gateway(self, context, ovsdb_identifier,
ls_dict, locator_list, mac_dict,
port_dict):
port_dict, op_method):
"""RPC to update the connection to gateway."""
self._validate_request_op_method(context, op_method)
cctxt = self.client.prepare()
try:
return cctxt.call(context,
@ -92,7 +100,8 @@ class L2gatewayAgentApi(object):
logical_switch_dict=ls_dict,
locator_dicts=locator_list,
mac_dicts=mac_dict,
port_dicts=port_dict)
port_dicts=port_dict,
op_method=op_method)
except messaging.MessagingTimeout:
message = _("Communication error with the L2 gateway agent")
raise l2gw_exc.OVSDBError(message=message)

@ -433,10 +433,6 @@ class L2gwRpcDriver(service_drivers.L2gwDriver):
'uuid': pp_dict['uuid']}
raise l2gw_exc.L2GatewayDuplicateSegmentationID(message=msg
)
vlan_dict = {'vlan': vlan_binding.get('vlan'),
'logical_switch_uuid': vlan_binding.get(
'logical_switch_uuid')}
port_list.append(vlan_dict)
physical_port = self._get_dict(
ovsdb_schema.PhysicalPort(
uuid=pp_dict.get('uuid'),
@ -449,17 +445,9 @@ class L2gwRpcDriver(service_drivers.L2gwDriver):
vlan_id = gw_connection.get('segmentation_id')
if not vlan_id:
vlan_id = interface.get('segmentation_id')
for vlan_binding in vlan_bindings:
if ((vlan_binding.get('vlan') == vlan_id) and (
vlan_binding.get(
'logical_switch_uuid') == logical_switch_uuid)):
continue
else:
vlan_dict = {
'vlan': vlan_binding.get('vlan'),
'logical_switch_uuid':
vlan_binding.get('logical_switch_uuid')}
port_list.append(vlan_dict)
vlan_dict = {'vlan': vlan_id,
'logical_switch_uuid': logical_switch_uuid}
port_list.append(vlan_dict)
physical_port = self._get_dict(
ovsdb_schema.PhysicalPort(
uuid=pp_dict.get('uuid'),
@ -646,7 +634,7 @@ class L2gwRpcDriver(service_drivers.L2gwDriver):
locator.pop('ovsdb_identifier')
self.agent_rpc.update_connection_to_gateway(
context, ovsdb_identifier, ls_dict, locator_list, mac_dict,
port_dict)
port_dict, 'CREATE')
def _get_identifer_list(self, context, gw_connection):
identifier_list = []
@ -722,7 +710,7 @@ class L2gwRpcDriver(service_drivers.L2gwDriver):
"DELETE", gw_connection_ovsdb_set))
self.agent_rpc.update_connection_to_gateway(
context, ovsdb_identifier, ls_dict, locator_list, mac_dict,
port_dict)
port_dict, 'DELETE')
# call delete vif_from_gateway for ovsdb_id_set
self._remove_vm_macs(context, network_id, ovsdb_id_set)

@ -286,15 +286,16 @@ class TestManager(base.BaseTestCase):
cfg.CONF.set_override('enable_manager', True, 'ovsdb')
self.l2gw_agent_manager.__init__()
self.l2gw_agent_manager.l2gw_agent_type = n_const.MONITOR
fake_op_method = 'CREATE'
with mock.patch.object(ovsdb_common_class,
'OVSDB_commom_class') as mock_ovsdb_common:
self.l2gw_agent_manager.ovsdb_fd = mock_ovsdb_common.return_value
self.l2gw_agent_manager.update_connection_to_gateway(
self.context, mock.Mock(), mock.Mock(), mock.Mock(),
mock.Mock(), mock.Mock())
mock.Mock(), mock.Mock(), fake_op_method)
(self.l2gw_agent_manager.ovsdb_fd.update_connection_to_gateway.
assert_called_with(mock.ANY, mock.ANY, mock.ANY, mock.ANY,
mock.ANY, False))
mock.ANY, fake_op_method, False))
def test_update_connection_to_gateway_for_transact_agent(self):
"""Test case to test update_connection_to_gateway
@ -304,6 +305,7 @@ class TestManager(base.BaseTestCase):
cfg.CONF.set_override('enable_manager', True, 'ovsdb')
self.l2gw_agent_manager.__init__()
self.l2gw_agent_manager.l2gw_agent_type = ''
fake_op_method = 'CREATE'
with contextlib.nested(
mock.patch.object(ovsdb_common_class, 'OVSDB_commom_class'),
mock.patch.object(manager.OVSDBManager,
@ -313,11 +315,11 @@ class TestManager(base.BaseTestCase):
self.l2gw_agent_manager.ovsdb_fd.ovsdb_conn_list = ['fake_ip']
self.l2gw_agent_manager.update_connection_to_gateway(
self.context, 'fake_ip', mock.Mock(), mock.Mock(),
mock.Mock(), mock.Mock())
mock.Mock(), mock.Mock(), fake_op_method)
self.assertTrue(mock_open_conn.called)
(self.l2gw_agent_manager.ovsdb_fd.update_connection_to_gateway.
assert_called_with(mock.ANY, mock.ANY, mock.ANY, mock.ANY,
'fake_ip', False))
'fake_ip', fake_op_method, False))
def test_update_connection_to_gateway_for_enable_manager_false(self):
"""Test case to test update_connection_to_gateway with
@ -326,6 +328,7 @@ class TestManager(base.BaseTestCase):
"""
cfg.CONF.set_override('enable_manager', False, 'ovsdb')
self.l2gw_agent_manager.__init__()
fake_op_method = 'CREATE'
with contextlib.nested(
mock.patch.object(self.l2gw_agent_manager,
'_is_valid_request', return_value=True),
@ -333,7 +336,8 @@ class TestManager(base.BaseTestCase):
)) as (mock_valid_req, mock_ovsdb_fd):
self.l2gw_agent_manager.update_connection_to_gateway(
self.context, 'fake_ovsdb_id', "fake_logical_switch_dict",
"fake_locator_dicts", "fake_mac_dicts", "fake_port_dicts")
"fake_locator_dicts", "fake_mac_dicts", "fake_port_dicts",
fake_op_method)
ovsdb_sock_fd = mock_ovsdb_fd.return_value
mock_valid_req.assert_called_with('fake_ovsdb_id')
(ovsdb_sock_fd.update_connection_to_gateway.
@ -341,7 +345,7 @@ class TestManager(base.BaseTestCase):
"fake_locator_dicts",
"fake_mac_dicts",
"fake_port_dicts",
"fake_ovsdb_id"))
"fake_ovsdb_id", fake_op_method))
def test_delete_network_for_monitor_agent(self):
"""Test case to test delete_network with enable_manager."""

@ -372,6 +372,7 @@ class TestOVSDBWriter(base.BaseTestCase):
def test_get_bindings_to_update1(self):
"""Test case to test _get_bindings_to_update."""
fake_op_method = 'CREATE'
with contextlib.nested(
mock.patch.object(ovsdb_writer.OVSDBWriter,
'_form_logical_switch'),
@ -405,12 +406,14 @@ class TestOVSDBWriter(base.BaseTestCase):
'vlan_bindings':
'vlan_bindings',
'port_fault_status':
'port_fault_status'}])
'port_fault_status'}],
fake_op_method)
form_ls.assert_called_with(ls, mock.ANY)
form_pl.assert_called_with(['uuid', ls.uuid], [pl],
mock.ANY, mock.ANY)
form_pp.assert_called_with(['uuid', ls.uuid], [pp], mock.ANY)
form_pp.assert_called_with(['uuid', ls.uuid], [pp], mock.ANY,
fake_op_method)
def test_get_physical_locator_dict(self):
"""Test case to test _get_physical_locator_dict."""

@ -98,6 +98,12 @@ class TestL2GatewayAgentApi(base.BaseTestCase):
self.context, 'delete_network', ovsdb_identifier=mock.ANY,
logical_switch_uuid=mock.ANY)
def test_validate_request_op_method(self):
self.assertRaises(l2gw_exc.InvalidMethod,
self.plugin_rpc._validate_request_op_method,
self.context,
'fake_op_method')
def test_update_connection_to_gateway(self):
cctxt = mock.Mock()
fake_ovsdb_identifier = 'fake_ovsdb_id'
@ -105,17 +111,36 @@ class TestL2GatewayAgentApi(base.BaseTestCase):
fake_physical_locator_list = []
fake_mac_dicts = [{}]
fake_port_dicts = [{}]
fake_op_method = 'DELETE'
self.plugin_rpc.client.prepare.return_value = cctxt
self.plugin_rpc.update_connection_to_gateway(
self.context, fake_ovsdb_identifier, fake_logical_switch,
fake_physical_locator_list, fake_mac_dicts, fake_port_dicts)
fake_physical_locator_list, fake_mac_dicts, fake_port_dicts,
fake_op_method)
cctxt.call.assert_called_with(
self.context, 'update_connection_to_gateway',
ovsdb_identifier=fake_ovsdb_identifier,
logical_switch_dict=fake_logical_switch,
locator_dicts=fake_physical_locator_list,
mac_dicts=fake_mac_dicts,
port_dicts=fake_port_dicts)
port_dicts=fake_port_dicts,
op_method=fake_op_method)
def test_update_connection_to_gateway_with_invalid_op_method(self):
cctxt = mock.Mock()
fake_ovsdb_identifier = 'fake_ovsdb_id'
fake_logical_switch = {}
fake_physical_locator_list = []
fake_mac_dicts = [{}]
fake_port_dicts = [{}]
fake_op_method = 'create_delete_op'
self.plugin_rpc.client.prepare.return_value = cctxt
self.assertRaises(
l2gw_exc.InvalidMethod,
self.plugin_rpc.update_connection_to_gateway,
self.context, fake_ovsdb_identifier, fake_logical_switch,
fake_physical_locator_list, fake_mac_dicts, fake_port_dicts,
fake_op_method)
def test_update_connection_to_gateway_with_error(self):
cctxt = mock.Mock()
@ -124,6 +149,7 @@ class TestL2GatewayAgentApi(base.BaseTestCase):
fake_physical_locator_list = []
fake_mac_dicts = [{}]
fake_port_dicts = [{}]
fake_op_method = 'CREATE'
self.plugin_rpc.client.prepare.return_value = cctxt
# Test with a timeout exception
@ -134,7 +160,8 @@ class TestL2GatewayAgentApi(base.BaseTestCase):
l2gw_exc.OVSDBError,
self.plugin_rpc.update_connection_to_gateway,
self.context, fake_ovsdb_identifier, fake_logical_switch,
fake_physical_locator_list, fake_mac_dicts, fake_port_dicts)
fake_physical_locator_list, fake_mac_dicts, fake_port_dicts,
fake_op_method)
# Test with a remote exception
with mock.patch.object(cctxt,
@ -144,4 +171,5 @@ class TestL2GatewayAgentApi(base.BaseTestCase):
l2gw_exc.OVSDBError,
self.plugin_rpc.update_connection_to_gateway,
self.context, fake_ovsdb_identifier, fake_logical_switch,
fake_physical_locator_list, fake_mac_dicts, fake_port_dicts)
fake_physical_locator_list, fake_mac_dicts, fake_port_dicts,
fake_op_method)

@ -226,7 +226,7 @@ class TestL2gwRpcDriver(base.BaseTestCase):
def test_generate_port_list_for_delete(self):
fake_connection = {'l2_gateway_id': 'fake_l2gw_id',
'network_id': 'fake_network_id',
'segmentation_id': 200L}
'segmentation_id': 100}
fake_method = 'DELETE'
fake_interface = {'interface_name': 'fake_interface_name'}
fake_pp_dict = {'interface_name': 'fake_interface_name',
@ -234,7 +234,7 @@ class TestL2gwRpcDriver(base.BaseTestCase):
'physical_switch_id': 'fake_uuid',
'logical_switch_name': 'fake_network_id',
'uuid': 'fake_uuid'}
fake_vlan_binding = {'vlan': 100L,
fake_vlan_binding = {'vlan': 100,
'logical_switch_uuid': 'fake_uuid'}
fake_vlan_binding_list = [fake_vlan_binding]
@ -255,7 +255,7 @@ class TestL2gwRpcDriver(base.BaseTestCase):
get_vlan):
port = self.plugin._generate_port_list(
self.context, fake_method, 100L, fake_interface,
fake_pp_dict, 'fake_uuid1', fake_connection)
fake_pp_dict, 'fake_uuid', fake_connection)
get_vlan.assert_called_with(self.context, fake_pp_dict)
self.assertEqual(port, phys_port_dict)

Loading…
Cancel
Save