Do not call update_device_list in large sets

The OVS agent can process ports in large sets, and every one of
those ports then needs its status or attributes updated in the DB.
The neutron server, however, is centralized: it may be busy with
other work, or the database processing itself may be
time-consuming. As a result, the OVS agent sometimes receives an
RPC timeout exception, and a full sync is triggered in the next
RPC loop. The restart time then keeps getting longer and longer.

Add a default step size so that ports are updated in smaller
batches, reducing the probability of an RPC timeout.

Related-Bug: #1813703
Related-Bug: #1813704
Related-Bug: #1813706
Related-Bug: #1813707

Change-Id: Ie37f4a4869969e235ce16b73cdfcbdc98626823e
This commit is contained in:
LIU Yulong 2019-02-21 16:39:50 +08:00 committed by LIU Yulong
parent 29ad31be10
commit 8408af4f17
3 changed files with 37 additions and 5 deletions

View File

@ -155,10 +155,31 @@ class PluginApi(object):
def update_device_list(self, context, devices_up, devices_down,
                       agent_id, host, agent_restarted=False):
    """Send 'update_device_list' to the server in fixed-size batches.

    Instead of one RPC call carrying every device, the two device
    collections are sliced into chunks of at most
    ``n_const.RPC_RES_PROCESSING_STEP`` entries and one call is issued
    per chunk; the per-chunk replies are concatenated into a single
    result.

    :param context: request context forwarded to every RPC call.
    :param devices_up: iterable of devices to report as up.
    :param devices_down: iterable of devices to report as down.
    :param agent_id: forwarded unchanged to every RPC call.
    :param host: forwarded unchanged to every RPC call.
    :param agent_restarted: forwarded unchanged to every RPC call.
    :returns: dict with the keys 'devices_up', 'failed_devices_up',
        'devices_down' and 'failed_devices_down', each a list built by
        extending with the matching entry of every reply (a key
        missing from a reply contributes nothing).
    """
    cctxt = self.client.prepare(version='1.5')
    batch_size = n_const.RPC_RES_PROCESSING_STEP

    # Materialize both inputs so they can be measured and sliced,
    # whatever kind of iterable the caller handed in.
    up_list = list(devices_up)
    down_list = list(devices_down)

    merged = {'devices_up': [],
              'failed_devices_up': [],
              'devices_down': [],
              'failed_devices_down': []}

    # Both lists are walked with the same window, so the number of
    # calls is driven by the longer one; the shorter one simply yields
    # empty slices once exhausted. When both are empty the range is
    # empty and no RPC call is made at all.
    longest = max(len(up_list), len(down_list))
    for start in range(0, longest, batch_size):
        stop = start + batch_size
        reply = cctxt.call(context, 'update_device_list',
                           devices_up=up_list[start:stop],
                           devices_down=down_list[start:stop],
                           agent_id=agent_id, host=host,
                           agent_restarted=agent_restarted)
        for key, collected in merged.items():
            collected.extend(reply.get(key, []))

    return merged
def tunnel_sync(self, context, tunnel_ip, tunnel_type=None, host=None): def tunnel_sync(self, context, tunnel_ip, tunnel_type=None, host=None):
cctxt = self.client.prepare(version='1.4') cctxt = self.client.prepare(version='1.4')

View File

@ -274,3 +274,6 @@ RP_INVENTORY_DEFAULTS = 'resource_provider_inventory_defaults'
# for the restart success rate. # for the restart success rate.
# [1] http://paste.openstack.org/show/745685/ # [1] http://paste.openstack.org/show/745685/
AGENT_RES_PROCESSING_STEP = 100 AGENT_RES_PROCESSING_STEP = 100
# Maximum number of resources carried by a single RPC call when
# neutron splits a large data set into several smaller calls (used
# as the slice size for the device lists in update_device_list).
RPC_RES_PROCESSING_STEP = 20

View File

@ -327,9 +327,17 @@ class RpcCallbacksTestCase(base.BaseTestCase):
class RpcApiTestCase(base.BaseTestCase): class RpcApiTestCase(base.BaseTestCase):
def _test_rpc_api(self, rpcapi, topic, method, rpc_method, **kwargs): def _test_rpc_api(self, rpcapi, topic, method, rpc_method, **kwargs):
if method == "update_device_list":
expected = {'devices_up': [],
'failed_devices_up': [],
'devices_down': [],
'failed_devices_down': []}
else:
expected = 'foo'
ctxt = oslo_context.RequestContext(user_id='fake_user', ctxt = oslo_context.RequestContext(user_id='fake_user',
tenant='fake_project') tenant='fake_project')
expected_retval = 'foo' if rpc_method == 'call' else None expected_retval = expected if rpc_method == 'call' else None
expected_version = kwargs.pop('version', None) expected_version = kwargs.pop('version', None)
fanout = kwargs.pop('fanout', False) fanout = kwargs.pop('fanout', False)