Avoid writing segments to the DB repeatedly
When:
* the segments service plugin is enabled and
* we have multiple rpc worker processes (as in the sum of rpc_workers
and rpc_state_report_workers, since both kind processes agent
state_reports) and
* many ovs-agents report physnets,
then rabbitmq dispatches the state_report messages between the workers
in a round robin fashion, therefore eventually the state_reports of the
same agent will hit all rpc workers.
Unfortunately all worker processes have a 'reported_hosts' set to
remember from which host it has seen agent reports already. But right
after a server start when that set is still empty, each worker will
unconditionally write the received physnet-segment information into
the db. This means we multiply the load on the db and rpc workers by
a factor of the rpc worker count.
This patch tries to reduce the load on the db by adding another early
return before the unconditional db write.
Change-Id: I935186b6ee95f0cae8dc05869d9742c8fb3353c3
Closes-Bug: #1952730
(cherry picked from commit 176503e610
)
This commit is contained in:
parent
ad9c486659
commit
dcb372b041
|
@ -386,6 +386,7 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
|
||||||
agent = self._get_agent_by_type_and_host(
|
agent = self._get_agent_by_type_and_host(
|
||||||
context, agent_state['agent_type'], agent_state['host'])
|
context, agent_state['agent_type'], agent_state['host'])
|
||||||
agent_state_orig = copy.deepcopy(agent_state)
|
agent_state_orig = copy.deepcopy(agent_state)
|
||||||
|
agent_state_previous = copy.deepcopy(agent)
|
||||||
if not agent.is_active:
|
if not agent.is_active:
|
||||||
status = agent_consts.AGENT_REVIVED
|
status = agent_consts.AGENT_REVIVED
|
||||||
if 'resource_versions' not in agent_state:
|
if 'resource_versions' not in agent_state:
|
||||||
|
@ -406,6 +407,7 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
|
||||||
event_type = events.AFTER_UPDATE
|
event_type = events.AFTER_UPDATE
|
||||||
except agent_exc.AgentNotFoundByTypeHost:
|
except agent_exc.AgentNotFoundByTypeHost:
|
||||||
agent_state_orig = None
|
agent_state_orig = None
|
||||||
|
agent_state_previous = None
|
||||||
greenthread.sleep(0)
|
greenthread.sleep(0)
|
||||||
res['created_at'] = current_time
|
res['created_at'] = current_time
|
||||||
res['started_at'] = current_time
|
res['started_at'] = current_time
|
||||||
|
@ -430,7 +432,7 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
|
||||||
'plugin': self,
|
'plugin': self,
|
||||||
'status': status
|
'status': status
|
||||||
},
|
},
|
||||||
states=(agent_state_orig, ),
|
states=(agent_state_orig, agent_state_previous),
|
||||||
desired_state=agent_state,
|
desired_state=agent_state,
|
||||||
resource_id=agent.id
|
resource_id=agent.id
|
||||||
))
|
))
|
||||||
|
|
|
@ -304,6 +304,11 @@ def _update_segment_host_mapping_for_agent(resource, event, trigger,
|
||||||
if host in reported_hosts and not start_flag:
|
if host in reported_hosts and not start_flag:
|
||||||
return
|
return
|
||||||
reported_hosts.add(host)
|
reported_hosts.add(host)
|
||||||
|
if (len(payload.states) > 1 and
|
||||||
|
payload.states[1] is not None and
|
||||||
|
agent.get('configurations') == payload.states[1].get(
|
||||||
|
'configurations')):
|
||||||
|
return
|
||||||
segments = get_segments_with_phys_nets(context, phys_nets)
|
segments = get_segments_with_phys_nets(context, phys_nets)
|
||||||
current_segment_ids = {
|
current_segment_ids = {
|
||||||
segment['id'] for segment in segments
|
segment['id'] for segment in segments
|
||||||
|
|
|
@ -965,9 +965,9 @@ class TestMl2HostSegmentMappingAgentServerSynch(HostSegmentMappingTestCase):
|
||||||
self._register_agent(host, mappings={physical_network: 'br-eth-1'},
|
self._register_agent(host, mappings={physical_network: 'br-eth-1'},
|
||||||
plugin=self.plugin, start_flag=True)
|
plugin=self.plugin, start_flag=True)
|
||||||
self.assertIn(host, db.reported_hosts)
|
self.assertIn(host, db.reported_hosts)
|
||||||
self.assertEqual(2, mock_function.call_count)
|
self.assertEqual(1, mock_function.call_count)
|
||||||
expected_call = mock.call(mock.ANY, host, set())
|
expected_call = mock.call(mock.ANY, host, set())
|
||||||
mock_function.assert_has_calls([expected_call, expected_call])
|
mock_function.assert_has_calls([expected_call])
|
||||||
|
|
||||||
@mock.patch(mock_path)
|
@mock.patch(mock_path)
|
||||||
def test_no_starting_agent_is_not_processed(self, mock_function):
|
def test_no_starting_agent_is_not_processed(self, mock_function):
|
||||||
|
|
Loading…
Reference in New Issue