Merge "Bulk up port context retrieval"
This commit is contained in:
commit
f1091b73a3
|
@ -908,7 +908,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
filters=net_filters)
|
||||
}
|
||||
segments_by_netid = segments_db.get_networks_segments(
|
||||
context, nets_by_netid.keys())
|
||||
context, list(nets_by_netid.keys()))
|
||||
netctxs_by_netid = {
|
||||
net_id: driver_context.NetworkContext(
|
||||
self, context, nets_by_netid[net_id],
|
||||
|
@ -1551,6 +1551,52 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
|
||||
return self._bind_port_if_needed(port_context)
|
||||
|
||||
@utils.transaction_guard
|
||||
@db_api.retry_if_session_inactive(context_var_name='plugin_context')
|
||||
def get_bound_ports_contexts(self, plugin_context, dev_ids, host=None):
|
||||
result = {}
|
||||
with db_api.context_manager.reader.using(plugin_context):
|
||||
dev_to_full_pids = db.partial_port_ids_to_full_ids(
|
||||
plugin_context, dev_ids)
|
||||
# get all port objects for IDs
|
||||
port_dbs_by_id = db.get_port_db_objects(
|
||||
plugin_context, dev_to_full_pids.values())
|
||||
# get all networks for PortContext construction
|
||||
netctxs_by_netid = self.get_network_contexts(
|
||||
plugin_context,
|
||||
{p.network_id for p in port_dbs_by_id.values()})
|
||||
for dev_id in dev_ids:
|
||||
port_id = dev_to_full_pids.get(dev_id)
|
||||
port_db = port_dbs_by_id.get(port_id)
|
||||
if (not port_id or not port_db or
|
||||
port_db.network_id not in netctxs_by_netid):
|
||||
result[dev_id] = None
|
||||
continue
|
||||
port = self._make_port_dict(port_db)
|
||||
if port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
|
||||
binding = db.get_distributed_port_binding_by_host(
|
||||
plugin_context, port['id'], host)
|
||||
bindlevelhost_match = host
|
||||
else:
|
||||
binding = port_db.port_binding
|
||||
bindlevelhost_match = binding.host if binding else None
|
||||
if not binding:
|
||||
LOG.info(_LI("Binding info for port %s was not found, "
|
||||
"it might have been deleted already."),
|
||||
port_id)
|
||||
result[dev_id] = None
|
||||
continue
|
||||
levels = [l for l in port_db.binding_levels
|
||||
if l.host == bindlevelhost_match]
|
||||
levels = sorted(levels, key=lambda l: l.level)
|
||||
network_ctx = netctxs_by_netid.get(port_db.network_id)
|
||||
port_context = driver_context.PortContext(
|
||||
self, plugin_context, port, network_ctx, binding, levels)
|
||||
result[dev_id] = port_context
|
||||
|
||||
return {d: self._bind_port_if_needed(pctx) if pctx else None
|
||||
for d, pctx in result.items()}
|
||||
|
||||
@utils.transaction_guard
|
||||
@db_api.retry_if_session_inactive()
|
||||
def update_port_status(self, context, port_id, status, host=None,
|
||||
|
|
|
@ -82,13 +82,22 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
|
|||
{'device': device, 'agent_id': agent_id})
|
||||
return {'device': device}
|
||||
|
||||
segment = port_context.bottom_bound_segment
|
||||
port = port_context.current
|
||||
# caching information about networks for future use
|
||||
if cached_networks is not None:
|
||||
if port['network_id'] not in cached_networks:
|
||||
cached_networks[port['network_id']] = (
|
||||
port_context.network.current)
|
||||
return self._get_device_details(rpc_context, agent_id=agent_id,
|
||||
host=host, device=device,
|
||||
port_context=port_context)
|
||||
|
||||
def _get_device_details(self, rpc_context, agent_id, host, device,
|
||||
port_context):
|
||||
segment = port_context.bottom_bound_segment
|
||||
port = port_context.current
|
||||
plugin = directory.get_plugin()
|
||||
port_id = port_context.current['id']
|
||||
|
||||
if not segment:
|
||||
LOG.warning(_LW("Device %(device)s requested by agent "
|
||||
|
@ -149,17 +158,31 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
|
|||
**kwargs):
|
||||
devices = []
|
||||
failed_devices = []
|
||||
cached_networks = {}
|
||||
for device in kwargs.pop('devices', []):
|
||||
devices_to_fetch = kwargs.pop('devices', [])
|
||||
plugin = directory.get_plugin()
|
||||
host = kwargs.get('host')
|
||||
bound_contexts = plugin.get_bound_ports_contexts(rpc_context,
|
||||
devices_to_fetch,
|
||||
host)
|
||||
for device in devices_to_fetch:
|
||||
if not bound_contexts.get(device):
|
||||
# unbound bound
|
||||
LOG.debug("Device %(device)s requested by agent "
|
||||
"%(agent_id)s not found in database",
|
||||
{'device': device,
|
||||
'agent_id': kwargs.get('agent_id')})
|
||||
devices.append({'device': device})
|
||||
continue
|
||||
try:
|
||||
devices.append(self.get_device_details(
|
||||
devices.append(self._get_device_details(
|
||||
rpc_context,
|
||||
agent_id=kwargs.get('agent_id'),
|
||||
host=host,
|
||||
device=device,
|
||||
cached_networks=cached_networks,
|
||||
**kwargs))
|
||||
port_context=bound_contexts[device]))
|
||||
except Exception:
|
||||
LOG.error(_LE("Failed to get details for device %s"),
|
||||
device)
|
||||
LOG.exception(_LE("Failed to get details for device %s"),
|
||||
device)
|
||||
failed_devices.append(device)
|
||||
|
||||
return {'devices': devices,
|
||||
|
|
|
@ -132,15 +132,6 @@ class PortBindingTestCase(test_plugin.NeutronDbPluginV2TestCase):
|
|||
cached_networks=cached_networks)
|
||||
self.assertFalse(self.plugin.get_network.called)
|
||||
|
||||
def test_get_bound_port_context_cache_miss(self):
|
||||
ctx = context.get_admin_context()
|
||||
with self.port(name='name') as port:
|
||||
some_network = {'id': u'2ac23560-7638-44e2-9875-c1888b02af72'}
|
||||
self.plugin.get_network = mock.Mock(return_value=some_network)
|
||||
self.plugin.get_bound_port_context(ctx, port['port']['id'],
|
||||
cached_networks={})
|
||||
self.assertEqual(1, self.plugin.get_network.call_count)
|
||||
|
||||
def _test_update_port_binding(self, host, new_host=None):
|
||||
with mock.patch.object(self.plugin,
|
||||
'_notify_port_updated') as notify_mock:
|
||||
|
|
|
@ -166,13 +166,13 @@ class RpcCallbacksTestCase(base.BaseTestCase):
|
|||
def _test_get_devices_list(self, callback, side_effect, expected):
|
||||
devices = [1, 2, 3, 4, 5]
|
||||
kwargs = {'host': 'fake_host', 'agent_id': 'fake_agent_id'}
|
||||
with mock.patch.object(self.callbacks, 'get_device_details',
|
||||
with mock.patch.object(self.callbacks, '_get_device_details',
|
||||
side_effect=side_effect) as f:
|
||||
res = callback('fake_context', devices=devices, **kwargs)
|
||||
self.assertEqual(expected, res)
|
||||
self.assertEqual(len(devices), f.call_count)
|
||||
calls = [mock.call('fake_context', device=i,
|
||||
cached_networks={}, **kwargs)
|
||||
port_context=mock.ANY, **kwargs)
|
||||
for i in devices]
|
||||
f.assert_has_calls(calls)
|
||||
|
||||
|
|
Loading…
Reference in New Issue