Add support for deleting ml2/ovn agents
This adds support for deleting OVN controller/metadata agents. Behavior is undefined if the agents are still actually up as per the Agent API docs. As part of this, it is necessary to be able to tell all workers that the agent is gone. This can't be done by deleting the Chassis, because ovn-controller deletes the Chassis if it is stopped gracefully and we need to still display those agents as down until ovn-controller is restarted. This also means we can't write a value to the Chassis marking the agent as 'deleted' because the Chassis may not be there. And of course you can't use the cache because then other workers won't see that the agent is deleted. Due to the hash ring implementation, we also cannot naively just send some pre-defined event that all workers can listen for to update their status of the agent. Only one worker would process the event. So we need some kind of GLOBAL event type that is processed by all workers. When the hash ring implementation was done, the agent API implementation was redesigned to work around moving from having a single OVN Worker to having distributed events. That implementation relied on marking the agents 'alive' in the OVSDB. With large numbers of Chassis entries, this induces significant load, with 2 DB writes per Chassis per cfg.CONF.agent_down_time / 2 seconds (37 by default). This patch reverts that change and goes back to using events to store agent information in the cache, but adds support for "GLOBAL" events that are run on each worker that uses a particular connection. Change-Id: I4581848ad3e176fa576f80a752f2f062c974c2d1
This commit is contained in:
parent
a2a29e2d62
commit
da3ce73198
@ -15,11 +15,11 @@
|
||||
import collections
|
||||
import functools
|
||||
import re
|
||||
import uuid
|
||||
|
||||
from neutron_lib import constants as n_const
|
||||
from oslo_concurrency import lockutils
|
||||
from oslo_log import log
|
||||
from oslo_utils import uuidutils
|
||||
from ovsdbapp.backend.ovs_idl import event as row_event
|
||||
from ovsdbapp.backend.ovs_idl import vlog
|
||||
import tenacity
|
||||
@ -261,8 +261,10 @@ class MetadataAgent(object):
|
||||
# NOTE(lucasagomes): db_add() will not overwrite the UUID if
|
||||
# it's already set.
|
||||
table = ('Chassis_Private' if self.has_chassis_private else 'Chassis')
|
||||
ext_ids = {
|
||||
ovn_const.OVN_AGENT_METADATA_ID_KEY: uuidutils.generate_uuid()}
|
||||
chassis_id = uuid.UUID(self._get_own_chassis_name())
|
||||
# Generate unique, but consistent metadata id for chassis name
|
||||
agent_id = uuid.uuid5(chassis_id, 'metadata_agent')
|
||||
ext_ids = {ovn_const.OVN_AGENT_METADATA_ID_KEY: str(agent_id)}
|
||||
self.sb_idl.db_add(table, self.chassis, 'external_ids',
|
||||
ext_ids).execute(check_error=True)
|
||||
|
||||
|
@ -14,10 +14,13 @@
|
||||
|
||||
import abc
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_utils import timeutils
|
||||
|
||||
from neutron._i18n import _
|
||||
from neutron.common.ovn import constants as ovn_const
|
||||
from neutron.common.ovn import utils as ovn_utils
|
||||
from neutron.common import utils
|
||||
|
||||
|
||||
class NeutronAgent(abc.ABC):
|
||||
@ -27,26 +30,26 @@ class NeutronAgent(abc.ABC):
|
||||
# Register the subclasses to be looked up by their type
|
||||
NeutronAgent.types[cls.agent_type] = cls
|
||||
|
||||
def __init__(self, chassis_private):
|
||||
self.chassis_private = chassis_private
|
||||
self.chassis = self.get_chassis(chassis_private)
|
||||
def __init__(self, chassis_private, driver, updated_at=None):
|
||||
self.driver = driver
|
||||
self.set_down = False
|
||||
self.update(chassis_private, updated_at)
|
||||
|
||||
@staticmethod
|
||||
def get_chassis(chassis_private):
|
||||
try:
|
||||
return chassis_private.chassis[0]
|
||||
except (AttributeError, IndexError):
|
||||
# No Chassis_Private support, just use Chassis
|
||||
return chassis_private
|
||||
def update(self, chassis_private, updated_at=None, clear_down=False):
|
||||
self.chassis_private = chassis_private
|
||||
self.updated_at = updated_at or timeutils.utcnow(with_timezone=True)
|
||||
if clear_down:
|
||||
self.set_down = False
|
||||
|
||||
@property
|
||||
def updated_at(self):
|
||||
def chassis(self):
|
||||
try:
|
||||
return timeutils.parse_isotime(self.chassis.external_ids[self.key])
|
||||
except KeyError:
|
||||
return timeutils.utcnow(with_timezone=True)
|
||||
return self.chassis_private.chassis[0]
|
||||
except (AttributeError, IndexError):
|
||||
# No Chassis_Private support, just use Chassis
|
||||
return self.chassis_private
|
||||
|
||||
def as_dict(self, alive):
|
||||
def as_dict(self):
|
||||
return {
|
||||
'binary': self.binary,
|
||||
'host': self.chassis.hostname,
|
||||
@ -62,39 +65,62 @@ class NeutronAgent(abc.ABC):
|
||||
'start_flag': True,
|
||||
'agent_type': self.agent_type,
|
||||
'id': self.agent_id,
|
||||
'alive': alive,
|
||||
'alive': self.alive,
|
||||
'admin_state_up': True}
|
||||
|
||||
@classmethod
|
||||
def from_type(cls, _type, chassis_private):
|
||||
return cls.types[_type](chassis_private)
|
||||
|
||||
@staticmethod
|
||||
def matches_chassis(chassis):
|
||||
"""Is this Agent type found on the passed in chassis?"""
|
||||
@property
|
||||
def alive(self):
|
||||
if self.set_down:
|
||||
return False
|
||||
# TODO(twilson) Determine if we can go back to just checking:
|
||||
# if self.driver._nb_ovn.nb_global.nb_cfg == self.nb_cfg:
|
||||
if self.driver._nb_ovn.nb_global.nb_cfg - self.nb_cfg <= 1:
|
||||
return True
|
||||
now = timeutils.utcnow(with_timezone=True)
|
||||
if (now - self.updated_at).total_seconds() < cfg.CONF.agent_down_time:
|
||||
# down, but not yet timed out
|
||||
return True
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
def agents_from_chassis(cls, chassis_private):
|
||||
return [AgentCls(chassis_private)
|
||||
for AgentCls in cls.types.values()
|
||||
if AgentCls.matches_chassis(cls.get_chassis(chassis_private))]
|
||||
def from_type(cls, _type, chassis_private, driver, updated_at=None):
|
||||
return cls.types[_type](chassis_private, driver, updated_at)
|
||||
|
||||
@property
|
||||
@abc.abstractmethod
|
||||
def agent_type(self):
|
||||
pass
|
||||
|
||||
@property
|
||||
@abc.abstractmethod
|
||||
def binary(self):
|
||||
pass
|
||||
|
||||
@property
|
||||
@abc.abstractmethod
|
||||
def nb_cfg(self):
|
||||
pass
|
||||
|
||||
@property
|
||||
@abc.abstractmethod
|
||||
def agent_id(self):
|
||||
pass
|
||||
|
||||
|
||||
class ControllerAgent(NeutronAgent):
|
||||
agent_type = ovn_const.OVN_CONTROLLER_AGENT
|
||||
binary = 'ovn-controller'
|
||||
key = ovn_const.OVN_LIVENESS_CHECK_EXT_ID_KEY
|
||||
|
||||
@staticmethod # it is by default, but this makes pep8 happy
|
||||
def __new__(cls, chassis_private, driver, updated_at=None):
|
||||
if ('enable-chassis-as-gw' in
|
||||
chassis_private.external_ids.get('ovn-cms-options', [])):
|
||||
cls = ControllerGatewayAgent
|
||||
return super().__new__(cls)
|
||||
|
||||
@staticmethod
|
||||
def matches_chassis(chassis):
|
||||
return ('enable-chassis-as-gw' not in
|
||||
chassis.external_ids.get('ovn-cms-options', []))
|
||||
def id_from_chassis_private(chassis_private):
|
||||
return chassis_private.name
|
||||
|
||||
@property
|
||||
def nb_cfg(self):
|
||||
@ -102,7 +128,7 @@ class ControllerAgent(NeutronAgent):
|
||||
|
||||
@property
|
||||
def agent_id(self):
|
||||
return self.chassis_private.name
|
||||
return self.id_from_chassis_private(self.chassis_private)
|
||||
|
||||
@property
|
||||
def description(self):
|
||||
@ -113,28 +139,76 @@ class ControllerAgent(NeutronAgent):
|
||||
class ControllerGatewayAgent(ControllerAgent):
|
||||
agent_type = ovn_const.OVN_CONTROLLER_GW_AGENT
|
||||
|
||||
@staticmethod
|
||||
def matches_chassis(chassis):
|
||||
return ('enable-chassis-as-gw' in
|
||||
chassis.external_ids.get('ovn-cms-options', []))
|
||||
|
||||
|
||||
class MetadataAgent(NeutronAgent):
|
||||
agent_type = ovn_const.OVN_METADATA_AGENT
|
||||
binary = 'neutron-ovn-metadata-agent'
|
||||
key = ovn_const.METADATA_LIVENESS_CHECK_EXT_ID_KEY
|
||||
|
||||
@property
|
||||
def alive(self):
|
||||
# If ovn-controller is down, then metadata agent is down even
|
||||
# if the metadata-agent binary is updating external_ids.
|
||||
try:
|
||||
if not AgentCache()[self.chassis_private.name].alive:
|
||||
return False
|
||||
except KeyError:
|
||||
return False
|
||||
return super().alive
|
||||
|
||||
@property
|
||||
def nb_cfg(self):
|
||||
return int(self.chassis_private.external_ids.get(
|
||||
ovn_const.OVN_AGENT_METADATA_SB_CFG_KEY, 0))
|
||||
|
||||
@staticmethod
|
||||
def id_from_chassis_private(chassis_private):
|
||||
return chassis_private.external_ids.get(
|
||||
ovn_const.OVN_AGENT_METADATA_ID_KEY)
|
||||
|
||||
@property
|
||||
def agent_id(self):
|
||||
return self.chassis_private.external_ids.get(
|
||||
ovn_const.OVN_AGENT_METADATA_ID_KEY)
|
||||
return self.id_from_chassis_private(self.chassis_private)
|
||||
|
||||
@property
|
||||
def description(self):
|
||||
return self.chassis_private.external_ids.get(
|
||||
ovn_const.OVN_AGENT_METADATA_DESC_KEY, '')
|
||||
|
||||
|
||||
@utils.SingletonDecorator
|
||||
class AgentCache:
|
||||
def __init__(self, driver=None):
|
||||
# This is just to make pylint happy because it doesn't like calls to
|
||||
# AgentCache() with no arguments, despite init only being called the
|
||||
# first time--and we do really want a driver passed in.
|
||||
if driver is None:
|
||||
raise ValueError(_("driver cannot be None"))
|
||||
self.agents = {}
|
||||
self.driver = driver
|
||||
|
||||
def __iter__(self):
|
||||
return iter(self.agents.values())
|
||||
|
||||
def __getitem__(self, key):
|
||||
return self.agents[key]
|
||||
|
||||
def update(self, agent_type, row, updated_at=None, clear_down=False):
|
||||
cls = NeutronAgent.types[agent_type]
|
||||
try:
|
||||
agent = self.agents[cls.id_from_chassis_private(row)]
|
||||
agent.update(row, updated_at=updated_at, clear_down=clear_down)
|
||||
except KeyError:
|
||||
agent = NeutronAgent.from_type(agent_type, row, self.driver,
|
||||
updated_at=updated_at)
|
||||
self.agents[agent.agent_id] = agent
|
||||
return agent
|
||||
|
||||
def __delitem__(self, agent_id):
|
||||
del self.agents[agent_id]
|
||||
|
||||
def agents_by_chassis_private(self, chassis_private):
|
||||
# Get unique agent ids based on the chassis_private
|
||||
agent_ids = {cls.id_from_chassis_private(chassis_private)
|
||||
for cls in NeutronAgent.types.values()}
|
||||
# Return the cached agents of agent_ids whose keys are in the cache
|
||||
return (self.agents[id_] for id_ in agent_ids & self.agents.keys())
|
||||
|
@ -36,7 +36,6 @@ from oslo_config import cfg
|
||||
from oslo_db import exception as os_db_exc
|
||||
from oslo_log import log
|
||||
from oslo_utils import timeutils
|
||||
from ovsdbapp.backend.ovs_idl import idlutils
|
||||
|
||||
from neutron._i18n import _
|
||||
from neutron.common.ovn import acl as ovn_acl
|
||||
@ -64,7 +63,6 @@ import neutron.wsgi
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
METADATA_READY_WAIT_TIMEOUT = 15
|
||||
AGENTS = {}
|
||||
|
||||
|
||||
class MetadataServiceReadyWaitTimeoutException(Exception):
|
||||
@ -277,15 +275,12 @@ class OVNMechanismDriver(api.MechanismDriver):
|
||||
self.node_uuid = ovn_hash_ring_db.add_node(admin_context,
|
||||
self.hash_ring_group)
|
||||
|
||||
n_agent.AgentCache(self) # Initialize singleton agent cache
|
||||
self._nb_ovn, self._sb_ovn = impl_idl_ovn.get_ovn_idls(self, trigger)
|
||||
|
||||
if self._sb_ovn.is_table_present('Chassis_Private'):
|
||||
self.agent_chassis_table = 'Chassis_Private'
|
||||
|
||||
# AGENTS must be populated after fork so if ovn-controller is stopped
|
||||
# before a worker handles a get_agents request, we still show agents
|
||||
populate_agents(self)
|
||||
|
||||
# Override agents API methods
|
||||
self.patch_plugin_merge("get_agents", get_agents)
|
||||
self.patch_plugin_choose("get_agent", get_agent)
|
||||
@ -1130,38 +1125,6 @@ class OVNMechanismDriver(api.MechanismDriver):
|
||||
" neutron-ovn-metadata-agent status/logs.",
|
||||
port_id)
|
||||
|
||||
def agent_alive(self, agent, update_db):
|
||||
# Allow a maximum of 1 difference between expected and read values
|
||||
# to avoid false positives.
|
||||
if self._nb_ovn.nb_global.nb_cfg - agent.nb_cfg <= 1:
|
||||
if update_db:
|
||||
self.mark_agent_alive(agent)
|
||||
return True
|
||||
|
||||
now = timeutils.utcnow(with_timezone=True)
|
||||
if (now - agent.updated_at).total_seconds() < cfg.CONF.agent_down_time:
|
||||
# down, but not yet timed out
|
||||
return True
|
||||
return False
|
||||
|
||||
def mark_agent_alive(self, agent):
|
||||
# Update the time of our successful check
|
||||
value = timeutils.utcnow(with_timezone=True).isoformat()
|
||||
self._sb_ovn.db_set(
|
||||
self.agent_chassis_table, agent.chassis_private.uuid,
|
||||
('external_ids', {agent.key: value})).execute(check_error=True)
|
||||
|
||||
def agents_from_chassis(self, chassis_private, update_db=True):
|
||||
agent_dict = {}
|
||||
# For each Chassis there will possibly be a Metadata agent and either
|
||||
# a Controller or Controller Gateway agent.
|
||||
for agent in n_agent.NeutronAgent.agents_from_chassis(chassis_private):
|
||||
if not agent.agent_id:
|
||||
continue
|
||||
alive = self.agent_alive(agent, update_db)
|
||||
agent_dict[agent.agent_id] = agent.as_dict(alive)
|
||||
return agent_dict
|
||||
|
||||
def patch_plugin_merge(self, method_name, new_fn, op=operator.add):
|
||||
old_method = getattr(self._plugin, method_name)
|
||||
|
||||
@ -1228,42 +1191,22 @@ class OVNMechanismDriver(api.MechanismDriver):
|
||||
return azs
|
||||
|
||||
|
||||
def populate_agents(driver):
|
||||
for ch in driver._sb_ovn.tables[driver.agent_chassis_table].rows.values():
|
||||
# update the cache, rows are hashed on uuid but it is the name that
|
||||
# stays consistent across ovn-controller restarts
|
||||
AGENTS.update({ch.name: ch})
|
||||
|
||||
|
||||
def get_agents(self, context, filters=None, fields=None, _driver=None):
|
||||
update_db = _driver.ping_all_chassis()
|
||||
_driver.ping_all_chassis()
|
||||
filters = filters or {}
|
||||
agent_list = []
|
||||
populate_agents(_driver)
|
||||
for ch in AGENTS.values():
|
||||
for agent in _driver.agents_from_chassis(ch, update_db).values():
|
||||
if all(agent[k] in v for k, v in filters.items()):
|
||||
agent_list.append(agent)
|
||||
for agent in n_agent.AgentCache():
|
||||
agent_dict = agent.as_dict()
|
||||
if all(agent_dict[k] in v for k, v in filters.items()):
|
||||
agent_list.append(agent_dict)
|
||||
return agent_list
|
||||
|
||||
|
||||
def get_agent(self, context, id, fields=None, _driver=None):
|
||||
chassis = None
|
||||
try:
|
||||
# look up Chassis by *name*, which the id attribute is
|
||||
chassis = _driver._sb_ovn.lookup(_driver.agent_chassis_table, id)
|
||||
except idlutils.RowNotFound:
|
||||
# If the UUID is not found, check for the metadata agent ID
|
||||
for ch in _driver._sb_ovn.tables[
|
||||
_driver.agent_chassis_table].rows.values():
|
||||
metadata_agent_id = ch.external_ids.get(
|
||||
ovn_const.OVN_AGENT_METADATA_ID_KEY)
|
||||
if id == metadata_agent_id:
|
||||
chassis = ch
|
||||
break
|
||||
else:
|
||||
return n_agent.AgentCache()[id].as_dict()
|
||||
except KeyError:
|
||||
raise n_exc.agent.AgentNotFound(id=id)
|
||||
return _driver.agents_from_chassis(chassis)[id]
|
||||
|
||||
|
||||
def update_agent(self, context, id, agent, _driver=None):
|
||||
@ -1289,9 +1232,28 @@ def update_agent(self, context, id, agent, _driver=None):
|
||||
|
||||
|
||||
def delete_agent(self, context, id, _driver=None):
|
||||
get_agent(self, None, id, _driver=_driver)
|
||||
raise n_exc.BadRequest(resource='agent',
|
||||
msg='OVN agents cannot be deleted')
|
||||
# raise AgentNotFound if this isn't an ml2/ovn-related agent
|
||||
agent = get_agent(self, None, id, _driver=_driver)
|
||||
|
||||
# NOTE(twilson) According to the API docs, an agent must be disabled
|
||||
# before deletion. Otherwise, behavior seems to be undefined. We could
|
||||
# check that alive=False before allowing deletion, but depending on the
|
||||
# agent_down_time setting, that could take quite a while.
|
||||
# If ovn-controller is up, the Chassis will be recreated and so the agent
|
||||
# will still show as up. The recreated Chassis will cause all kinds of
|
||||
# events to fire. But again, undefined behavior.
|
||||
chassis_name = agent['configurations']['chassis_name']
|
||||
_driver._sb_ovn.chassis_del(chassis_name, if_exists=True).execute(
|
||||
check_error=True)
|
||||
# Send a specific event that all API workers can get to delete the agent
|
||||
# from their caches. Ideally we could send a single transaction that both
|
||||
# created and deleted the key, but alas python-ovs is too "smart"
|
||||
_driver._sb_ovn.db_set(
|
||||
'SB_Global', '.', ('external_ids', {'delete_agent': str(id)})).execute(
|
||||
check_error=True)
|
||||
_driver._sb_ovn.db_remove(
|
||||
'SB_Global', '.', 'external_ids', delete_agent=str(id),
|
||||
if_exists=True).execute(check_error=True)
|
||||
|
||||
|
||||
def create_default_drop_port_group(nb_idl):
|
||||
|
@ -34,6 +34,7 @@ from neutron.common.ovn import hash_ring_manager
|
||||
from neutron.common.ovn import utils
|
||||
from neutron.conf.plugins.ml2.drivers.ovn import ovn_conf
|
||||
from neutron.db import ovn_hash_ring_db
|
||||
from neutron.plugins.ml2.drivers.ovn.agent import neutron_agent as n_agent
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
@ -213,6 +214,92 @@ class PortBindingChassisUpdateEvent(row_event.RowEvent):
|
||||
self.driver.set_port_status_up(row.logical_port)
|
||||
|
||||
|
||||
class ChassisAgentEvent(BaseEvent):
|
||||
GLOBAL = True
|
||||
|
||||
# NOTE (twilson) Do not run new transactions out of a GLOBAL Event since
|
||||
# it will be running on every single process, and you almost certainly
|
||||
# don't want to insert/update/delete something a bajillion times.
|
||||
def __init__(self, driver):
|
||||
self.driver = driver
|
||||
super().__init__()
|
||||
|
||||
@property
|
||||
def table(self):
|
||||
# It probably doesn't matter, but since agent_chassis_table changes
|
||||
# in post_fork_initialize(), resolve this at runtime
|
||||
return self.driver.agent_chassis_table
|
||||
|
||||
@table.setter
|
||||
def table(self, value):
|
||||
pass
|
||||
|
||||
|
||||
class ChassisAgentDownEvent(ChassisAgentEvent):
|
||||
events = (BaseEvent.ROW_DELETE,)
|
||||
|
||||
def run(self, event, row, old):
|
||||
for agent in n_agent.AgentCache().agents_by_chassis_private(row):
|
||||
agent.set_down = True
|
||||
|
||||
def match_fn(self, event, row, old=None):
|
||||
return True
|
||||
|
||||
|
||||
class ChassisAgentDeleteEvent(ChassisAgentEvent):
|
||||
events = (BaseEvent.ROW_UPDATE,)
|
||||
table = 'SB_Global'
|
||||
|
||||
def match_fn(self, event, row, old=None):
|
||||
try:
|
||||
return (old.external_ids.get('delete_agent') !=
|
||||
row.external_ids['delete_agent'])
|
||||
except (AttributeError, KeyError):
|
||||
return False
|
||||
|
||||
def run(self, event, row, old):
|
||||
del n_agent.AgentCache()[row.external_ids['delete_agent']]
|
||||
|
||||
|
||||
class ChassisAgentWriteEvent(ChassisAgentEvent):
|
||||
events = (BaseEvent.ROW_CREATE, BaseEvent.ROW_UPDATE)
|
||||
|
||||
def match_fn(self, event, row, old=None):
|
||||
return event == self.ROW_CREATE or getattr(old, 'nb_cfg', False)
|
||||
|
||||
def run(self, event, row, old):
|
||||
n_agent.AgentCache().update(ovn_const.OVN_CONTROLLER_AGENT, row,
|
||||
clear_down=event == self.ROW_CREATE)
|
||||
|
||||
|
||||
class ChassisMetadataAgentWriteEvent(ChassisAgentEvent):
|
||||
events = (BaseEvent.ROW_CREATE, BaseEvent.ROW_UPDATE)
|
||||
|
||||
@staticmethod
|
||||
def _metadata_nb_cfg(row):
|
||||
return int(
|
||||
row.external_ids.get(ovn_const.OVN_AGENT_METADATA_SB_CFG_KEY, -1))
|
||||
|
||||
@staticmethod
|
||||
def agent_id(row):
|
||||
return row.external_ids.get(ovn_const.OVN_AGENT_METADATA_ID_KEY)
|
||||
|
||||
def match_fn(self, event, row, old=None):
|
||||
if not self.agent_id(row):
|
||||
# Don't create a cached object with an agent_id of 'None'
|
||||
return False
|
||||
if event == self.ROW_CREATE:
|
||||
return True
|
||||
try:
|
||||
return self._metadata_nb_cfg(row) != self._metadata_nb_cfg(old)
|
||||
except (AttributeError, KeyError):
|
||||
return False
|
||||
|
||||
def run(self, event, row, old):
|
||||
n_agent.AgentCache().update(ovn_const.OVN_METADATA_AGENT, row,
|
||||
clear_down=True)
|
||||
|
||||
|
||||
class PortBindingChassisEvent(row_event.RowEvent):
|
||||
"""Port_Binding update event - set chassis for chassisredirect port.
|
||||
|
||||
@ -359,8 +446,24 @@ class NeutronPgDropPortGroupCreated(row_event.WaitEvent):
|
||||
|
||||
class OvnDbNotifyHandler(row_event.RowEventHandler):
|
||||
def __init__(self, driver):
|
||||
super(OvnDbNotifyHandler, self).__init__()
|
||||
self.driver = driver
|
||||
super(OvnDbNotifyHandler, self).__init__()
|
||||
try:
|
||||
self._lock = self._RowEventHandler__lock
|
||||
self._watched_events = self._RowEventHandler__watched_events
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
def notify(self, event, row, updates=None, global_=False):
|
||||
matching = self.matching_events(event, row, updates, global_)
|
||||
for match in matching:
|
||||
self.notifications.put((match, event, row, updates))
|
||||
|
||||
def matching_events(self, event, row, updates, global_=False):
|
||||
with self._lock:
|
||||
return tuple(t for t in self._watched_events
|
||||
if getattr(t, 'GLOBAL', False) == global_ and
|
||||
self.match(t, event, row, updates))
|
||||
|
||||
|
||||
class Ml2OvnIdlBase(connection.OvsdbIdl):
|
||||
@ -448,12 +551,12 @@ class OvnIdlDistributedLock(BaseOvnIdl):
|
||||
self._last_touch = None
|
||||
|
||||
def notify(self, event, row, updates=None):
|
||||
self.notify_handler.notify(event, row, updates, global_=True)
|
||||
try:
|
||||
target_node = self._hash_ring.get_node(str(row.uuid))
|
||||
except exceptions.HashRingIsEmpty as e:
|
||||
LOG.error('HashRing is empty, error: %s', e)
|
||||
return
|
||||
|
||||
if target_node != self._node_uuid:
|
||||
return
|
||||
|
||||
@ -530,6 +633,14 @@ class OvnNbIdl(OvnIdlDistributedLock):
|
||||
|
||||
class OvnSbIdl(OvnIdlDistributedLock):
|
||||
|
||||
def __init__(self, driver, remote, schema):
|
||||
super(OvnSbIdl, self).__init__(driver, remote, schema)
|
||||
self.notify_handler.watch_events([
|
||||
ChassisAgentDeleteEvent(self.driver),
|
||||
ChassisAgentDownEvent(self.driver),
|
||||
ChassisAgentWriteEvent(self.driver),
|
||||
ChassisMetadataAgentWriteEvent(self.driver)])
|
||||
|
||||
@classmethod
|
||||
def from_server(cls, connection_string, schema_name, driver):
|
||||
_check_and_set_ssl_files(schema_name)
|
||||
@ -541,6 +652,7 @@ class OvnSbIdl(OvnIdlDistributedLock):
|
||||
helper.register_table('Port_Binding')
|
||||
helper.register_table('Datapath_Binding')
|
||||
helper.register_table('MAC_Binding')
|
||||
helper.register_columns('SB_Global', ['external_ids'])
|
||||
return cls(driver, connection_string, helper)
|
||||
|
||||
def post_connect(self):
|
||||
|
@ -15,7 +15,6 @@
|
||||
from unittest import mock
|
||||
|
||||
import fixtures as og_fixtures
|
||||
from oslo_config import cfg
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from neutron.common.ovn import constants as ovn_const
|
||||
@ -70,6 +69,10 @@ class DistributedLockTestEvent(event.WaitEvent):
|
||||
self.event.set()
|
||||
|
||||
|
||||
class GlobalTestEvent(DistributedLockTestEvent):
|
||||
GLOBAL = True
|
||||
|
||||
|
||||
class TestNBDbMonitor(base.TestOVNFunctionalBase):
|
||||
|
||||
def setUp(self):
|
||||
@ -198,15 +201,12 @@ class TestNBDbMonitor(base.TestOVNFunctionalBase):
|
||||
self._test_port_binding_and_status(port['id'], 'bind', 'ACTIVE')
|
||||
self._test_port_binding_and_status(port['id'], 'unbind', 'DOWN')
|
||||
|
||||
def test_distributed_lock(self):
|
||||
api_workers = 11
|
||||
cfg.CONF.set_override('api_workers', api_workers)
|
||||
row_event = DistributedLockTestEvent()
|
||||
def _create_workers(self, row_event, worker_num):
|
||||
self.mech_driver._nb_ovn.idl.notify_handler.watch_event(row_event)
|
||||
worker_list = [self.mech_driver._nb_ovn, ]
|
||||
worker_list = [self.mech_driver._nb_ovn]
|
||||
|
||||
# Create 10 fake workers
|
||||
for _ in range(api_workers - len(worker_list)):
|
||||
for _ in range(worker_num):
|
||||
node_uuid = uuidutils.generate_uuid()
|
||||
db_hash_ring.add_node(
|
||||
self.context, ovn_const.HASH_RING_ML2_GROUP, node_uuid)
|
||||
@ -228,11 +228,17 @@ class TestNBDbMonitor(base.TestOVNFunctionalBase):
|
||||
|
||||
# Assert we have 11 active workers in the ring
|
||||
self.assertEqual(
|
||||
11, len(db_hash_ring.get_active_nodes(
|
||||
worker_num + 1,
|
||||
len(db_hash_ring.get_active_nodes(
|
||||
self.context,
|
||||
interval=ovn_const.HASH_RING_NODES_TIMEOUT,
|
||||
group_name=ovn_const.HASH_RING_ML2_GROUP)))
|
||||
|
||||
return worker_list
|
||||
|
||||
def test_distributed_lock(self):
|
||||
row_event = DistributedLockTestEvent()
|
||||
self._create_workers(row_event, worker_num=10)
|
||||
# Trigger the event
|
||||
self.create_port()
|
||||
|
||||
@ -242,6 +248,30 @@ class TestNBDbMonitor(base.TestOVNFunctionalBase):
|
||||
# Assert that only one worker handled the event
|
||||
self.assertEqual(1, row_event.COUNTER)
|
||||
|
||||
def test_global_events(self):
|
||||
worker_num = 10
|
||||
distributed_event = DistributedLockTestEvent()
|
||||
global_event = GlobalTestEvent()
|
||||
worker_list = self._create_workers(distributed_event, worker_num)
|
||||
for worker in worker_list:
|
||||
worker.idl.notify_handler.watch_event(global_event)
|
||||
|
||||
# This should generate one distributed even handled by a single worker
|
||||
# and one global event, that should be handled by all workers
|
||||
self.create_port()
|
||||
|
||||
# Wait for the distributed event to complete
|
||||
self.assertTrue(distributed_event.wait())
|
||||
|
||||
# Assert that only one worker handled the distributed event
|
||||
self.assertEqual(1, distributed_event.COUNTER)
|
||||
|
||||
n_utils.wait_until_true(
|
||||
lambda: global_event.COUNTER == worker_num + 1,
|
||||
exception=Exception(
|
||||
"Fanout event didn't get handled expected %d times" %
|
||||
(worker_num + 1)))
|
||||
|
||||
|
||||
class TestNBDbMonitorOverTcp(TestNBDbMonitor):
|
||||
def get_ovsdb_server_protocol(self):
|
||||
|
@ -17,8 +17,10 @@ from unittest import mock
|
||||
|
||||
from neutron_lib.api.definitions import portbindings
|
||||
from neutron_lib import constants
|
||||
from neutron_lib.exceptions import agent as agent_exc
|
||||
from oslo_config import cfg
|
||||
from oslo_utils import uuidutils
|
||||
from ovsdbapp.backend.ovs_idl import event
|
||||
from ovsdbapp.tests.functional import base as ovs_base
|
||||
|
||||
from neutron.common.ovn import constants as ovn_const
|
||||
@ -744,23 +746,75 @@ class TestProvnetPorts(base.TestOVNFunctionalBase):
|
||||
self.assertIsNone(ovn_localnetport)
|
||||
|
||||
|
||||
class AgentWaitEvent(event.WaitEvent):
|
||||
"""Wait for a list of Chassis to be created"""
|
||||
|
||||
ONETIME = False
|
||||
|
||||
def __init__(self, driver, chassis_names):
|
||||
table = driver.agent_chassis_table
|
||||
events = (self.ROW_CREATE,)
|
||||
self.chassis_names = chassis_names
|
||||
super().__init__(events, table, None)
|
||||
self.event_name = "AgentWaitEvent"
|
||||
|
||||
def match_fn(self, event, row, old):
|
||||
return row.name in self.chassis_names
|
||||
|
||||
def run(self, event, row, old):
|
||||
self.chassis_names.remove(row.name)
|
||||
if not self.chassis_names:
|
||||
self.event.set()
|
||||
|
||||
|
||||
class TestAgentApi(base.TestOVNFunctionalBase):
|
||||
TEST_AGENT = 'test'
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.host = 'test-host'
|
||||
self.controller_agent = self.add_fake_chassis(self.host)
|
||||
self.host = n_utils.get_rand_name(prefix='testhost-')
|
||||
self.plugin = self.mech_driver._plugin
|
||||
agent = {'agent_type': 'test', 'binary': '/bin/test',
|
||||
'host': self.host, 'topic': 'test_topic'}
|
||||
_, status = self.plugin.create_or_update_agent(self.context, agent)
|
||||
self.test_agent = status['id']
|
||||
mock.patch.object(self.mech_driver, 'ping_all_chassis',
|
||||
return_value=False).start()
|
||||
|
||||
def test_agent_show_non_ovn(self):
|
||||
self.assertTrue(self.plugin.get_agent(self.context, self.test_agent))
|
||||
metadata_agent_id = uuidutils.generate_uuid()
|
||||
# To be *mostly* sure the agent cache has been updated, we need to
|
||||
# wait for the Chassis events to run. So add a new event that should
|
||||
# run afterthey do and wait for it. I've only had to do this when
|
||||
# adding *a bunch* of Chassis at a time, but better safe than sorry.
|
||||
chassis_name = uuidutils.generate_uuid()
|
||||
agent_event = AgentWaitEvent(self.mech_driver, [chassis_name])
|
||||
self.sb_api.idl.notify_handler.watch_event(agent_event)
|
||||
|
||||
def test_agent_show_ovn_controller(self):
|
||||
self.assertTrue(self.plugin.get_agent(self.context,
|
||||
self.controller_agent))
|
||||
self.chassis = self.add_fake_chassis(self.host, name=chassis_name,
|
||||
external_ids={
|
||||
ovn_const.OVN_AGENT_METADATA_ID_KEY: metadata_agent_id})
|
||||
|
||||
self.assertTrue(agent_event.wait())
|
||||
|
||||
self.agent_types = {
|
||||
self.TEST_AGENT: self._create_test_agent(),
|
||||
ovn_const.OVN_CONTROLLER_AGENT: self.chassis,
|
||||
ovn_const.OVN_METADATA_AGENT: metadata_agent_id,
|
||||
}
|
||||
|
||||
def _create_test_agent(self):
|
||||
agent = {'agent_type': self.TEST_AGENT, 'binary': '/bin/test',
|
||||
'host': self.host, 'topic': 'test_topic'}
|
||||
_, status = self.plugin.create_or_update_agent(self.context, agent)
|
||||
return status['id']
|
||||
|
||||
def test_agent_show(self):
|
||||
for agent_id in self.agent_types.values():
|
||||
self.assertTrue(self.plugin.get_agent(self.context, agent_id))
|
||||
|
||||
def test_agent_list(self):
|
||||
agent_ids = [a['id'] for a in self.plugin.get_agents(
|
||||
self.context, filters={'host': self.host})]
|
||||
self.assertCountEqual(list(self.agent_types.values()), agent_ids)
|
||||
|
||||
def test_agent_delete(self):
|
||||
for agent_id in self.agent_types.values():
|
||||
self.plugin.delete_agent(self.context, agent_id)
|
||||
self.assertRaises(agent_exc.AgentNotFound, self.plugin.get_agent,
|
||||
self.context, agent_id)
|
||||
|
@ -217,13 +217,18 @@ class TestOvnIdlDistributedLock(base.BaseTestCase):
|
||||
hash_ring_manager.HashRingManager,
|
||||
'get_node', return_value=self.node_uuid).start()
|
||||
|
||||
def _assert_has_notify_calls(self):
|
||||
self.idl.notify_handler.notify.assert_has_calls([
|
||||
mock.call(self.fake_event, self.fake_row, None, global_=True),
|
||||
mock.call(self.fake_event, self.fake_row, None)])
|
||||
self.assertEqual(2, len(self.idl.notify_handler.mock_calls))
|
||||
|
||||
@mock.patch.object(ovn_hash_ring_db, 'touch_node')
|
||||
def test_notify(self, mock_touch_node):
|
||||
self.idl.notify(self.fake_event, self.fake_row)
|
||||
|
||||
mock_touch_node.assert_called_once_with(mock.ANY, self.node_uuid)
|
||||
self.idl.notify_handler.notify.assert_called_once_with(
|
||||
self.fake_event, self.fake_row, None)
|
||||
self._assert_has_notify_calls()
|
||||
|
||||
@mock.patch.object(ovn_hash_ring_db, 'touch_node')
|
||||
def test_notify_skip_touch_node(self, mock_touch_node):
|
||||
@ -233,8 +238,7 @@ class TestOvnIdlDistributedLock(base.BaseTestCase):
|
||||
|
||||
# Assert that touch_node() wasn't called
|
||||
self.assertFalse(mock_touch_node.called)
|
||||
self.idl.notify_handler.notify.assert_called_once_with(
|
||||
self.fake_event, self.fake_row, None)
|
||||
self._assert_has_notify_calls()
|
||||
|
||||
@mock.patch.object(ovn_hash_ring_db, 'touch_node')
|
||||
def test_notify_last_touch_expired(self, mock_touch_node):
|
||||
@ -250,8 +254,7 @@ class TestOvnIdlDistributedLock(base.BaseTestCase):
|
||||
|
||||
# Assert that touch_node() was invoked
|
||||
mock_touch_node.assert_called_once_with(mock.ANY, self.node_uuid)
|
||||
self.idl.notify_handler.notify.assert_called_once_with(
|
||||
self.fake_event, self.fake_row, None)
|
||||
self._assert_has_notify_calls()
|
||||
|
||||
@mock.patch.object(ovsdb_monitor.LOG, 'exception')
|
||||
@mock.patch.object(ovn_hash_ring_db, 'touch_node')
|
||||
@ -264,14 +267,14 @@ class TestOvnIdlDistributedLock(base.BaseTestCase):
|
||||
mock_touch_node.assert_called_once_with(mock.ANY, self.node_uuid)
|
||||
# Assert we are logging the exception
|
||||
self.assertTrue(mock_log.called)
|
||||
self.idl.notify_handler.notify.assert_called_once_with(
|
||||
self.fake_event, self.fake_row, None)
|
||||
self._assert_has_notify_calls()
|
||||
|
||||
def test_notify_different_node(self):
|
||||
self.mock_get_node.return_value = 'different-node-uuid'
|
||||
self.idl.notify('fake-event', self.fake_row)
|
||||
# Assert that notify() wasn't called for a different node uuid
|
||||
self.assertFalse(self.idl.notify_handler.notify.called)
|
||||
self.idl.notify_handler.notify.assert_called_once_with(
|
||||
self.fake_event, self.fake_row, None, global_=True)
|
||||
|
||||
|
||||
class TestPortBindingChassisUpdateEvent(base.BaseTestCase):
|
||||
@ -420,8 +423,9 @@ class TestOvnNbIdlNotifyHandler(test_mech_driver.OVNMechanismDriverTestCase):
|
||||
self.idl.notify_handler.notify = mock.Mock()
|
||||
self.idl.notify("create", row)
|
||||
# Assert that if the target_node returned by the ring is different
|
||||
# than this driver's node_uuid, notify() won't be called
|
||||
self.assertFalse(self.idl.notify_handler.notify.called)
|
||||
# than this driver's node_uuid, only global notify() won't be called
|
||||
self.idl.notify_handler.notify.assert_called_once_with(
|
||||
"create", row, None, global_=True)
|
||||
|
||||
|
||||
class TestOvnSbIdlNotifyHandler(test_mech_driver.OVNMechanismDriverTestCase):
|
||||
@ -432,6 +436,7 @@ class TestOvnSbIdlNotifyHandler(test_mech_driver.OVNMechanismDriverTestCase):
|
||||
super(TestOvnSbIdlNotifyHandler, self).setUp()
|
||||
sb_helper = ovs_idl.SchemaHelper(schema_json=OVN_SB_SCHEMA)
|
||||
sb_helper.register_table('Chassis')
|
||||
self.driver.agent_chassis_table = 'Chassis'
|
||||
self.sb_idl = ovsdb_monitor.OvnSbIdl(self.driver, "remote", sb_helper)
|
||||
self.sb_idl.post_connect()
|
||||
self.chassis_table = self.sb_idl.tables.get('Chassis')
|
||||
|
@ -87,6 +87,11 @@ class TestOVNMechanismDriver(test_plugin.Ml2PluginV2TestCase):
|
||||
super(TestOVNMechanismDriver, self).setUp()
|
||||
mm = directory.get_plugin().mechanism_manager
|
||||
self.mech_driver = mm.mech_drivers['ovn'].obj
|
||||
neutron_agent.AgentCache(self.mech_driver)
|
||||
# Because AgentCache is a singleton and we get a new mech_driver each
|
||||
# setUp(), override the AgentCache driver.
|
||||
neutron_agent.AgentCache().driver = self.mech_driver
|
||||
|
||||
self.mech_driver._nb_ovn = fakes.FakeOvsdbNbOvnIdl()
|
||||
self.mech_driver._sb_ovn = fakes.FakeOvsdbSbOvnIdl()
|
||||
self.mech_driver._ovn_client._qos_driver = mock.Mock()
|
||||
@ -1724,73 +1729,75 @@ class TestOVNMechanismDriver(test_plugin.Ml2PluginV2TestCase):
|
||||
self.plugin.update_port_status.assert_called_once_with(
|
||||
fake_context, fake_port['id'], const.PORT_STATUS_ACTIVE)
|
||||
|
||||
def _add_chassis_agent(self, nb_cfg, agent_type, updated_at=None):
|
||||
updated_at = updated_at or datetime.datetime.utcnow()
|
||||
def _add_chassis(self, nb_cfg):
|
||||
chassis_private = mock.Mock()
|
||||
chassis_private.nb_cfg = nb_cfg
|
||||
chassis_private.uuid = uuid.uuid4()
|
||||
chassis_private.name = str(uuid.uuid4())
|
||||
return chassis_private
|
||||
|
||||
def _add_chassis_agent(self, nb_cfg, agent_type, chassis_private=None,
|
||||
updated_at=None):
|
||||
updated_at = updated_at or timeutils.utcnow(with_timezone=True)
|
||||
chassis_private = chassis_private or self._add_chassis(nb_cfg)
|
||||
chassis_private.external_ids = {
|
||||
ovn_const.OVN_LIVENESS_CHECK_EXT_ID_KEY:
|
||||
datetime.datetime.isoformat(updated_at)}
|
||||
if agent_type == ovn_const.OVN_METADATA_AGENT:
|
||||
chassis_private.external_ids.update({
|
||||
ovn_const.OVN_AGENT_METADATA_SB_CFG_KEY: nb_cfg,
|
||||
ovn_const.METADATA_LIVENESS_CHECK_EXT_ID_KEY:
|
||||
datetime.datetime.isoformat(updated_at)})
|
||||
ovn_const.OVN_AGENT_METADATA_ID_KEY: str(uuid.uuid4())})
|
||||
chassis_private.chassis = [chassis_private]
|
||||
|
||||
return neutron_agent.NeutronAgent.from_type(
|
||||
agent_type, chassis_private)
|
||||
return neutron_agent.AgentCache().update(agent_type, chassis_private,
|
||||
updated_at)
|
||||
|
||||
def test_agent_alive_true(self):
|
||||
chassis_private = self._add_chassis(5)
|
||||
for agent_type in (ovn_const.OVN_CONTROLLER_AGENT,
|
||||
ovn_const.OVN_METADATA_AGENT):
|
||||
self.mech_driver._nb_ovn.nb_global.nb_cfg = 5
|
||||
agent = self._add_chassis_agent(5, agent_type)
|
||||
self.assertTrue(self.mech_driver.agent_alive(agent,
|
||||
update_db=True))
|
||||
# Assert that each Chassis has been updated in the SB database
|
||||
self.assertEqual(2, self.sb_ovn.db_set.call_count)
|
||||
agent = self._add_chassis_agent(5, agent_type, chassis_private)
|
||||
self.assertTrue(agent.alive, "Agent of type %s alive=%s" %
|
||||
(agent.agent_type, agent.alive))
|
||||
|
||||
def test_agent_alive_true_one_diff(self):
|
||||
# Agent should be reported as alive when the nb_cfg delta is 1
|
||||
# even if the last update time was old enough.
|
||||
nb_cfg = 5
|
||||
chassis_private = self._add_chassis(nb_cfg)
|
||||
for agent_type in (ovn_const.OVN_CONTROLLER_AGENT,
|
||||
ovn_const.OVN_METADATA_AGENT):
|
||||
self.mech_driver._nb_ovn.nb_global.nb_cfg = 5
|
||||
self.mech_driver._nb_ovn.nb_global.nb_cfg = nb_cfg + 1
|
||||
now = timeutils.utcnow()
|
||||
updated_at = now - datetime.timedelta(cfg.CONF.agent_down_time + 1)
|
||||
agent = self._add_chassis_agent(4, agent_type, updated_at)
|
||||
self.assertTrue(self.mech_driver.agent_alive(agent,
|
||||
update_db=True))
|
||||
agent = self._add_chassis_agent(nb_cfg, agent_type,
|
||||
chassis_private, updated_at)
|
||||
self.assertTrue(agent.alive, "Agent of type %s alive=%s" %
|
||||
(agent.agent_type, agent.alive))
|
||||
|
||||
def test_agent_alive_not_timed_out(self):
|
||||
nb_cfg = 3
|
||||
chassis_private = self._add_chassis(nb_cfg)
|
||||
for agent_type in (ovn_const.OVN_CONTROLLER_AGENT,
|
||||
ovn_const.OVN_METADATA_AGENT):
|
||||
self.mech_driver._nb_ovn.nb_global.nb_cfg = 5
|
||||
agent = self._add_chassis_agent(3, agent_type)
|
||||
self.assertTrue(self.mech_driver.agent_alive(
|
||||
agent, update_db=True),
|
||||
"Agent type %s is not alive" % agent_type)
|
||||
self.mech_driver._nb_ovn.nb_global.nb_cfg = nb_cfg + 2
|
||||
agent = self._add_chassis_agent(nb_cfg, agent_type,
|
||||
chassis_private)
|
||||
self.assertTrue(agent.alive, "Agent of type %s alive=%s" %
|
||||
(agent.agent_type, agent.alive))
|
||||
|
||||
def test_agent_alive_timed_out(self):
|
||||
nb_cfg = 3
|
||||
chassis_private = self._add_chassis(nb_cfg)
|
||||
for agent_type in (ovn_const.OVN_CONTROLLER_AGENT,
|
||||
ovn_const.OVN_METADATA_AGENT):
|
||||
self.mech_driver._nb_ovn.nb_global.nb_cfg = 5
|
||||
now = timeutils.utcnow()
|
||||
self.mech_driver._nb_ovn.nb_global.nb_cfg = nb_cfg + 2
|
||||
now = timeutils.utcnow(with_timezone=True)
|
||||
updated_at = now - datetime.timedelta(cfg.CONF.agent_down_time + 1)
|
||||
agent = self._add_chassis_agent(3, agent_type, updated_at)
|
||||
self.assertFalse(self.mech_driver.agent_alive(agent,
|
||||
update_db=True))
|
||||
|
||||
def test_agent_alive_true_skip_db_update(self):
|
||||
for agent_type in (ovn_const.OVN_CONTROLLER_AGENT,
|
||||
ovn_const.OVN_METADATA_AGENT):
|
||||
self.mech_driver._nb_ovn.nb_global.nb_cfg = 5
|
||||
agent = self._add_chassis_agent(5, agent_type)
|
||||
self.assertTrue(self.mech_driver.agent_alive(agent,
|
||||
update_db=False))
|
||||
self.sb_ovn.db_set.assert_not_called()
|
||||
agent = self._add_chassis_agent(nb_cfg, agent_type,
|
||||
chassis_private, updated_at)
|
||||
self.assertFalse(agent.alive, "Agent of type %s alive=%s" %
|
||||
(agent.agent_type, agent.alive))
|
||||
|
||||
def _test__update_dnat_entry_if_needed(self, up=True):
|
||||
ovn_conf.cfg.CONF.set_override(
|
||||
|
@ -0,0 +1,7 @@
|
||||
---
|
||||
features:
|
||||
- |
|
||||
Add support for deleting ML2/OVN agents. Previously, deleting an agent
|
||||
would return a Bad Request error. In addition to deleting the agent,
|
||||
this change also drastically improves the scalability of the ML2/OVN
|
||||
agent handling code.
|
Loading…
Reference in New Issue
Block a user