[OVN] Protect the AgentCache iteration by copying the local cache

During the execution of ``AgentCache.__iter__``, the content of
"self.agents" can change; the Neutron server can attend to an event
to add or delete an OVN agent. If that happens, the iteration will
fail with the error:
  "RuntimeError: dictionary changed size during iteration"

To avoid this, before iterating the list of agents, the cache
singleton creates a copy of the local cache. The aim of making this
copy is to avoid using locks between threads.

Closes-Bug: #1976292
Change-Id: Icf92685579409282bad0a80ba42531c93738e0b1
This commit is contained in:
Rodolfo Alonso Hernandez 2022-05-15 22:48:46 +00:00 committed by Rodolfo Alonso
parent f966ed8c15
commit a71fe45d96
3 changed files with 76 additions and 3 deletions

View File

@ -13,6 +13,7 @@
#
import abc
import copy
import datetime
from oslo_config import cfg
@ -229,7 +230,10 @@ class AgentCache:
self.driver = driver
def __iter__(self):
return iter(self.agents.values())
# Copying self.agents will avoid any issue during the iteration if an
# agent is added or deleted.
_agents = copy.copy(self.agents)
return iter(_agents.values())
def __getitem__(self, key):
return self.agents[key]
@ -253,12 +257,12 @@ class AgentCache:
agent_ids = {cls.id_from_chassis_private(chassis_private)
for cls in NeutronAgent.types.values()}
# Return the cached agents of agent_ids whose keys are in the cache
return (self.agents[id_] for id_ in agent_ids & self.agents.keys())
return (agent for agent in self if agent.agent_id in agent_ids)
def get_agents(self, filters=None):
filters = filters or {}
agent_list = []
for agent in self.agents.values():
for agent in self:
agent_dict = agent.as_dict()
if all(agent_dict[k] in v for k, v in filters.items()):
agent_list.append(agent)

View File

@ -0,0 +1,69 @@
# Copyright 2022 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
import eventlet
from neutron.common.ovn import constants as ovn_const
from neutron.plugins.ml2.drivers.ovn.agent import neutron_agent
from neutron.tests import base
from neutron.tests.unit import fake_resources as fakes
class AgentCacheTestCase(base.BaseTestCase):
def setUp(self):
super().setUp()
self.agent_cache = neutron_agent.AgentCache(driver=mock.ANY)
self.addCleanup(self._clean_agent_cache)
self.names_ref = []
for i in range(10): # Add 10 agents.
chassis_private = fakes.FakeOvsdbRow.create_one_ovsdb_row(
attrs={'name': 'chassis' + str(i)})
self.agent_cache.update(ovn_const.OVN_CONTROLLER_AGENT,
chassis_private)
self.names_ref.append('chassis' + str(i))
def _clean_agent_cache(self):
self.agent_cache.agents = {}
def _list_agents(self):
self.names_read = []
for idx, agent in enumerate(self.agent_cache):
self.names_read.append(agent.agent_id)
if idx == 5: # Swap to "_add_and_delete_agents" thread.
eventlet.sleep(0)
def _add_and_delete_agents(self):
del self.agent_cache['chassis8']
chassis_private = fakes.FakeOvsdbRow.create_one_ovsdb_row(
attrs={'name': 'chassis10'})
self.agent_cache.update(ovn_const.OVN_CONTROLLER_AGENT,
chassis_private)
def test_update_while_iterating_agents(self):
pool = eventlet.GreenPool(2)
pool.spawn(self._list_agents)
pool.spawn(self._add_and_delete_agents)
pool.waitall()
self.assertEqual(self.names_ref, self.names_read)
def test_agents_by_chassis_private(self):
chassis_private = fakes.FakeOvsdbRow.create_one_ovsdb_row(
attrs={'name': 'chassis5'})
agents = self.agent_cache.agents_by_chassis_private(chassis_private)
agents = list(agents)
self.assertEqual(1, len(agents))
self.assertEqual('chassis5', agents[0].agent_id)