Bulk up port context retrieval

With the switch to subquery relationships, individual get_port calls
can get expensive with large numbers of ports
(100ms per port in my dev environment). This patch bulks up the
retrieval of the port contexts so one set of queries covers all
of the devices in an RPC call.

Partial-Bug: #1665215
Change-Id: I63757e143b23c24c349be98dc5a09115b8709a25
(cherry picked from commit 529da4e583)
changes/95/466395/3
Kevin Benton 6 years ago committed by Ihar Hrachyshka
parent ce0e4b25d5
commit 2a32ae9271
  1. 46
      neutron/plugins/ml2/plugin.py
  2. 39
      neutron/plugins/ml2/rpc.py
  3. 9
      neutron/tests/unit/plugins/ml2/test_port_binding.py
  4. 4
      neutron/tests/unit/plugins/ml2/test_rpc.py

@ -1616,6 +1616,52 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
return self._bind_port_if_needed(port_context)
@utils.transaction_guard
@db_api.retry_if_session_inactive(context_var_name='plugin_context')
def get_bound_ports_contexts(self, plugin_context, dev_ids, host=None):
    """Return a dict mapping each device ID to its bound PortContext.

    Bulk counterpart of the single-port context lookup: one set of DB
    queries inside a single reader transaction covers every device in
    an RPC call, instead of one round trip per port.

    :param plugin_context: plugin request context used for all DB reads
        and for the eventual port binding.
    :param dev_ids: iterable of device IDs reported by the agent; each
        may be a partial (truncated) port UUID.
    :param host: agent host; selects the host-specific binding for
        distributed (DVR) ports and filters binding levels.
    :returns: dict of dev_id -> PortContext, with None for any device
        that could not be resolved to a bound port.
    """
    result = {}
    with db_api.context_manager.reader.using(plugin_context):
        # resolve possibly-truncated device IDs into full port UUIDs
        dev_to_full_pids = db.partial_port_ids_to_full_ids(
            plugin_context, dev_ids)
        # get all port objects for IDs
        port_dbs_by_id = db.get_port_db_objects(
            plugin_context, dev_to_full_pids.values())
        # get all networks for PortContext construction
        netctxs_by_netid = self.get_network_contexts(
            plugin_context,
            {p.network_id for p in port_dbs_by_id.values()})
        for dev_id in dev_ids:
            port_id = dev_to_full_pids.get(dev_id)
            port_db = port_dbs_by_id.get(port_id)
            if (not port_id or not port_db or
                    port_db.network_id not in netctxs_by_netid):
                # device ID did not resolve to a usable port/network
                result[dev_id] = None
                continue
            port = self._make_port_dict(port_db)
            if port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
                # DVR ports have one binding per host, so the bulk
                # fetch above can't supply it; look up this host's
                # binding individually
                binding = db.get_distributed_port_binding_by_host(
                    plugin_context, port['id'], host)
                bindlevelhost_match = host
            else:
                binding = port_db.port_binding
                bindlevelhost_match = binding.host if binding else None
            if not binding:
                LOG.info(_LI("Binding info for port %s was not found, "
                             "it might have been deleted already."),
                         port_id)
                result[dev_id] = None
                continue
            # keep only the binding levels for the relevant host,
            # ordered top (level 0) first
            levels = [l for l in port_db.binding_levels
                      if l.host == bindlevelhost_match]
            levels = sorted(levels, key=lambda l: l.level)
            network_ctx = netctxs_by_netid.get(port_db.network_id)
            port_context = driver_context.PortContext(
                self, plugin_context, port, network_ctx, binding, levels)
            result[dev_id] = port_context
    # binding (if still needed) happens outside the reader transaction
    return {d: self._bind_port_if_needed(pctx) if pctx else None
            for d, pctx in result.items()}
@utils.transaction_guard
@db_api.retry_if_session_inactive()
def update_port_status(self, context, port_id, status, host=None,

@ -82,13 +82,22 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
{'device': device, 'agent_id': agent_id})
return {'device': device}
segment = port_context.bottom_bound_segment
port = port_context.current
# caching information about networks for future use
if cached_networks is not None:
if port['network_id'] not in cached_networks:
cached_networks[port['network_id']] = (
port_context.network.current)
return self._get_device_details(rpc_context, agent_id=agent_id,
host=host, device=device,
port_context=port_context)
def _get_device_details(self, rpc_context, agent_id, host, device,
port_context):
segment = port_context.bottom_bound_segment
port = port_context.current
plugin = directory.get_plugin()
port_id = port_context.current['id']
if not segment:
LOG.warning(_LW("Device %(device)s requested by agent "
@ -149,17 +158,31 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
**kwargs):
devices = []
failed_devices = []
cached_networks = {}
for device in kwargs.pop('devices', []):
devices_to_fetch = kwargs.pop('devices', [])
plugin = directory.get_plugin()
host = kwargs.get('host')
bound_contexts = plugin.get_bound_ports_contexts(rpc_context,
devices_to_fetch,
host)
for device in devices_to_fetch:
if not bound_contexts.get(device):
# no bound context: port missing from the DB or not bound
LOG.debug("Device %(device)s requested by agent "
"%(agent_id)s not found in database",
{'device': device,
'agent_id': kwargs.get('agent_id')})
devices.append({'device': device})
continue
try:
devices.append(self.get_device_details(
devices.append(self._get_device_details(
rpc_context,
agent_id=kwargs.get('agent_id'),
host=host,
device=device,
cached_networks=cached_networks,
**kwargs))
port_context=bound_contexts[device]))
except Exception:
LOG.error(_LE("Failed to get details for device %s"),
device)
LOG.exception(_LE("Failed to get details for device %s"),
device)
failed_devices.append(device)
return {'devices': devices,

@ -131,15 +131,6 @@ class PortBindingTestCase(test_plugin.NeutronDbPluginV2TestCase):
cached_networks=cached_networks)
self.assertFalse(self.plugin.get_network.called)
def test_get_bound_port_context_cache_miss(self):
    """An empty network cache forces one get_network DB lookup."""
    admin_ctx = context.get_admin_context()
    with self.port(name='name') as port:
        fake_net = {'id': u'2ac23560-7638-44e2-9875-c1888b02af72'}
        self.plugin.get_network = mock.Mock(return_value=fake_net)
        port_id = port['port']['id']
        self.plugin.get_bound_port_context(admin_ctx, port_id,
                                           cached_networks={})
        self.assertEqual(1, self.plugin.get_network.call_count)
def _test_update_port_binding(self, host, new_host=None):
with mock.patch.object(self.plugin,
'_notify_port_updated') as notify_mock:

@ -166,13 +166,13 @@ class RpcCallbacksTestCase(base.BaseTestCase):
def _test_get_devices_list(self, callback, side_effect, expected):
    """Run a bulk device-details callback with _get_device_details
    mocked and verify both the aggregate result and the per-device
    calls (each carrying a port_context).

    Fix: the pasted source retained both the pre-change lines
    ('get_device_details', cached_networks={}) and the post-change
    lines ('_get_device_details', port_context=mock.ANY), producing
    duplicated, conflicting statements; only the post-change version
    is kept.
    """
    devices = [1, 2, 3, 4, 5]
    kwargs = {'host': 'fake_host', 'agent_id': 'fake_agent_id'}
    with mock.patch.object(self.callbacks, '_get_device_details',
                           side_effect=side_effect) as f:
        res = callback('fake_context', devices=devices, **kwargs)
        self.assertEqual(expected, res)
        # one worker call per requested device
        self.assertEqual(len(devices), f.call_count)
        calls = [mock.call('fake_context', device=i,
                           port_context=mock.ANY, **kwargs)
                 for i in devices]
        f.assert_has_calls(calls)

Loading…
Cancel
Save