From 2a32ae927147ebb07a188b569c264bca80bbb116 Mon Sep 17 00:00:00 2001 From: Kevin Benton Date: Wed, 15 Feb 2017 22:27:53 -0800 Subject: [PATCH] Bulk up port context retrieval With the switch to subquery relationships, individual get_port calls can get expensive with large numbers of ports (100ms per port in my dev environment). This patch bulks up the retrieval of the port contexts so one set of queries covers all of the devices in an RPC call. Partial-Bug: #1665215 Change-Id: I63757e143b23c24c349be98dc5a09115b8709a25 (cherry picked from commit 529da4e583aefe7640d8559987282f223f3ad06b) --- neutron/plugins/ml2/plugin.py | 46 +++++++++++++++++++ neutron/plugins/ml2/rpc.py | 39 ++++++++++++---- .../unit/plugins/ml2/test_port_binding.py | 9 ---- neutron/tests/unit/plugins/ml2/test_rpc.py | 4 +- 4 files changed, 79 insertions(+), 19 deletions(-) diff --git a/neutron/plugins/ml2/plugin.py b/neutron/plugins/ml2/plugin.py index 303fc9e1966..d63c144ecd6 100644 --- a/neutron/plugins/ml2/plugin.py +++ b/neutron/plugins/ml2/plugin.py @@ -1616,6 +1616,52 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, return self._bind_port_if_needed(port_context) + @utils.transaction_guard + @db_api.retry_if_session_inactive(context_var_name='plugin_context') + def get_bound_ports_contexts(self, plugin_context, dev_ids, host=None): + result = {} + with db_api.context_manager.reader.using(plugin_context): + dev_to_full_pids = db.partial_port_ids_to_full_ids( + plugin_context, dev_ids) + # get all port objects for IDs + port_dbs_by_id = db.get_port_db_objects( + plugin_context, dev_to_full_pids.values()) + # get all networks for PortContext construction + netctxs_by_netid = self.get_network_contexts( + plugin_context, + {p.network_id for p in port_dbs_by_id.values()}) + for dev_id in dev_ids: + port_id = dev_to_full_pids.get(dev_id) + port_db = port_dbs_by_id.get(port_id) + if (not port_id or not port_db or + port_db.network_id not in netctxs_by_netid): + result[dev_id] = None + continue + port = self._make_port_dict(port_db) + if port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE: + binding = db.get_distributed_port_binding_by_host( + plugin_context, port['id'], host) + bindlevelhost_match = host + else: + binding = port_db.port_binding + bindlevelhost_match = binding.host if binding else None + if not binding: + LOG.info(_LI("Binding info for port %s was not found, " + "it might have been deleted already."), + port_id) + result[dev_id] = None + continue + levels = [l for l in port_db.binding_levels + if l.host == bindlevelhost_match] + levels = sorted(levels, key=lambda l: l.level) + network_ctx = netctxs_by_netid.get(port_db.network_id) + port_context = driver_context.PortContext( + self, plugin_context, port, network_ctx, binding, levels) + result[dev_id] = port_context + + return {d: self._bind_port_if_needed(pctx) if pctx else None + for d, pctx in result.items()} + @utils.transaction_guard @db_api.retry_if_session_inactive() def update_port_status(self, context, port_id, status, host=None, diff --git a/neutron/plugins/ml2/rpc.py b/neutron/plugins/ml2/rpc.py index 61b34c252f1..4a0f1d06bb2 100644 --- a/neutron/plugins/ml2/rpc.py +++ b/neutron/plugins/ml2/rpc.py @@ -82,13 +82,22 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin): {'device': device, 'agent_id': agent_id}) return {'device': device} - segment = port_context.bottom_bound_segment port = port_context.current # caching information about networks for future use if cached_networks is not None: if port['network_id'] not in cached_networks: cached_networks[port['network_id']] = ( port_context.network.current) + return self._get_device_details(rpc_context, agent_id=agent_id, + host=host, device=device, + port_context=port_context) + + def _get_device_details(self, rpc_context, agent_id, host, device, + port_context): + segment = port_context.bottom_bound_segment + port = port_context.current + plugin = directory.get_plugin() + port_id = port_context.current['id'] if not segment: LOG.warning(_LW("Device %(device)s requested by agent " @@ -149,17 +158,31 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin): **kwargs): devices = [] failed_devices = [] - cached_networks = {} - for device in kwargs.pop('devices', []): + devices_to_fetch = kwargs.pop('devices', []) + plugin = directory.get_plugin() + host = kwargs.get('host') + bound_contexts = plugin.get_bound_ports_contexts(rpc_context, + devices_to_fetch, + host) + for device in devices_to_fetch: + if not bound_contexts.get(device): + # unbound bound + LOG.debug("Device %(device)s requested by agent " + "%(agent_id)s not found in database", + {'device': device, + 'agent_id': kwargs.get('agent_id')}) + devices.append({'device': device}) + continue try: - devices.append(self.get_device_details( + devices.append(self._get_device_details( rpc_context, + agent_id=kwargs.get('agent_id'), + host=host, device=device, - cached_networks=cached_networks, - **kwargs)) + port_context=bound_contexts[device])) except Exception: - LOG.error(_LE("Failed to get details for device %s"), - device) + LOG.exception(_LE("Failed to get details for device %s"), + device) failed_devices.append(device) return {'devices': devices, diff --git a/neutron/tests/unit/plugins/ml2/test_port_binding.py b/neutron/tests/unit/plugins/ml2/test_port_binding.py index dc21aea6d0c..aa3dfcc8d39 100644 --- a/neutron/tests/unit/plugins/ml2/test_port_binding.py +++ b/neutron/tests/unit/plugins/ml2/test_port_binding.py @@ -131,15 +131,6 @@ class PortBindingTestCase(test_plugin.NeutronDbPluginV2TestCase): cached_networks=cached_networks) self.assertFalse(self.plugin.get_network.called) - def test_get_bound_port_context_cache_miss(self): - ctx = context.get_admin_context() - with self.port(name='name') as port: - some_network = {'id': u'2ac23560-7638-44e2-9875-c1888b02af72'} - self.plugin.get_network = mock.Mock(return_value=some_network) - self.plugin.get_bound_port_context(ctx, port['port']['id'], - cached_networks={}) - self.assertEqual(1, self.plugin.get_network.call_count) - def _test_update_port_binding(self, host, new_host=None): with mock.patch.object(self.plugin, '_notify_port_updated') as notify_mock: diff --git a/neutron/tests/unit/plugins/ml2/test_rpc.py b/neutron/tests/unit/plugins/ml2/test_rpc.py index f0c7332f4ee..1e09012857b 100644 --- a/neutron/tests/unit/plugins/ml2/test_rpc.py +++ b/neutron/tests/unit/plugins/ml2/test_rpc.py @@ -166,13 +166,13 @@ class RpcCallbacksTestCase(base.BaseTestCase): def _test_get_devices_list(self, callback, side_effect, expected): devices = [1, 2, 3, 4, 5] kwargs = {'host': 'fake_host', 'agent_id': 'fake_agent_id'} - with mock.patch.object(self.callbacks, 'get_device_details', + with mock.patch.object(self.callbacks, '_get_device_details', side_effect=side_effect) as f: res = callback('fake_context', devices=devices, **kwargs) self.assertEqual(expected, res) self.assertEqual(len(devices), f.call_count) calls = [mock.call('fake_context', device=i, - cached_networks={}, **kwargs) + port_context=mock.ANY, **kwargs) for i in devices] f.assert_has_calls(calls)