Moving Common Agent into separate module

Moving the CommonAgent and all it's unittests into a speparate module.

Closes-Bug: #1468803

Change-Id: Ifccc6ee1a77eef3928ad326cd5857092aeef4a17
This commit is contained in:
Andreas Scheuring 2016-01-28 10:28:43 +01:00
parent c4ec972987
commit 7d153a671b
4 changed files with 889 additions and 818 deletions

View File

@ -0,0 +1,405 @@
# Copyright (c) 2016 IBM Corp.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import sys
import time
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import loopingcall
from oslo_service import service
from neutron._i18n import _LE, _LI
from neutron.agent.l2.extensions import manager as ext_manager
from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.common import config as common_config
from neutron.common import constants
from neutron.common import topics
from neutron import context
from neutron.plugins.ml2.drivers.agent import _agent_manager_base as amb
from neutron.plugins.ml2.drivers.agent import config as cagt_config # noqa
LOG = logging.getLogger(__name__)
LB_AGENT_BINARY = 'neutron-linuxbridge-agent'
BRIDGE_NAME_PREFIX = "brq"
VXLAN_INTERFACE_PREFIX = "vxlan-"
class CommonAgentLoop(service.Service):
def __init__(self, manager, polling_interval,
quitting_rpc_timeout, agent_type, agent_binary):
"""Constructor.
:param manager: the manager object containing the impl specifics
:param polling_interval: interval (secs) to poll DB.
:param quitting_rpc_timeout: timeout in seconds for rpc calls after
stop is called.
:param agent_type: Specifies the type of the agent
:param agent_binary: The agent binary string
"""
super(CommonAgentLoop, self).__init__()
self.mgr = manager
self._validate_manager_class()
self.polling_interval = polling_interval
self.quitting_rpc_timeout = quitting_rpc_timeout
self.agent_type = agent_type
self.agent_binary = agent_binary
def _validate_manager_class(self):
if not isinstance(self.mgr,
amb.CommonAgentManagerBase):
LOG.error(_LE("Manager class must inherit from "
"CommonAgentManagerBase to ensure CommonAgent "
"works properly."))
sys.exit(1)
def start(self):
self.prevent_arp_spoofing = cfg.CONF.AGENT.prevent_arp_spoofing
# stores all configured ports on agent
self.network_ports = collections.defaultdict(list)
# flag to do a sync after revival
self.fullsync = False
self.context = context.get_admin_context_without_session()
self.setup_rpc()
self.init_extension_manager(self.connection)
configurations = {'extensions': self.ext_manager.names()}
configurations.update(self.mgr.get_agent_configurations())
self.agent_state = {
'binary': self.agent_binary,
'host': cfg.CONF.host,
'topic': constants.L2_AGENT_TOPIC,
'configurations': configurations,
'agent_type': self.agent_type,
'start_flag': True}
report_interval = cfg.CONF.AGENT.report_interval
if report_interval:
heartbeat = loopingcall.FixedIntervalLoopingCall(
self._report_state)
heartbeat.start(interval=report_interval)
self.daemon_loop()
def stop(self, graceful=True):
LOG.info(_LI("Stopping %s agent."), self.agent_type)
if graceful and self.quitting_rpc_timeout:
self.set_rpc_timeout(self.quitting_rpc_timeout)
super(CommonAgentLoop, self).stop(graceful)
def reset(self):
common_config.setup_logging()
def _report_state(self):
try:
devices = len(self.mgr.get_all_devices())
self.agent_state.get('configurations')['devices'] = devices
agent_status = self.state_rpc.report_state(self.context,
self.agent_state,
True)
if agent_status == constants.AGENT_REVIVED:
LOG.info(_LI('Agent has just been revived. '
'Doing a full sync.'))
self.fullsync = True
self.agent_state.pop('start_flag', None)
except Exception:
LOG.exception(_LE("Failed reporting state!"))
def _validate_rpc_endpoints(self):
if not isinstance(self.endpoints[0],
amb.CommonAgentManagerRpcCallBackBase):
LOG.error(_LE("RPC Callback class must inherit from "
"CommonAgentManagerRpcCallBackBase to ensure "
"CommonAgent works properly."))
sys.exit(1)
def setup_rpc(self):
self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)
self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
self.sg_agent = sg_rpc.SecurityGroupAgentRpc(
self.context, self.sg_plugin_rpc, defer_refresh_firewall=True)
self.agent_id = self.mgr.get_agent_id()
LOG.info(_LI("RPC agent_id: %s"), self.agent_id)
self.topic = topics.AGENT
self.state_rpc = agent_rpc.PluginReportStateAPI(topics.REPORTS)
# RPC network init
# Handle updates from service
self.rpc_callbacks = self.mgr.get_rpc_callbacks(self.context, self,
self.sg_agent)
self.endpoints = [self.rpc_callbacks]
self._validate_rpc_endpoints()
# Define the listening consumers for the agent
consumers = self.mgr.get_rpc_consumers()
self.connection = agent_rpc.create_consumers(self.endpoints,
self.topic,
consumers)
def init_extension_manager(self, connection):
ext_manager.register_opts(cfg.CONF)
self.ext_manager = (
ext_manager.AgentExtensionsManager(cfg.CONF))
self.ext_manager.initialize(
connection, self.mgr.get_extension_driver_type())
def _clean_network_ports(self, device):
for netid, ports_list in self.network_ports.items():
for port_data in ports_list:
if device == port_data['device']:
ports_list.remove(port_data)
if ports_list == []:
self.network_ports.pop(netid)
return port_data['port_id']
def _update_network_ports(self, network_id, port_id, device):
self._clean_network_ports(device)
self.network_ports[network_id].append({
"port_id": port_id,
"device": device
})
def process_network_devices(self, device_info):
resync_a = False
resync_b = False
self.sg_agent.setup_port_filters(device_info.get('added'),
device_info.get('updated'))
# Updated devices are processed the same as new ones, as their
# admin_state_up may have changed. The set union prevents duplicating
# work when a device is new and updated in the same polling iteration.
devices_added_updated = (set(device_info.get('added'))
| set(device_info.get('updated')))
if devices_added_updated:
resync_a = self.treat_devices_added_updated(devices_added_updated)
if device_info.get('removed'):
resync_b = self.treat_devices_removed(device_info['removed'])
# If one of the above operations fails => resync with plugin
return (resync_a | resync_b)
def treat_devices_added_updated(self, devices):
try:
devices_details_list = self.plugin_rpc.get_devices_details_list(
self.context, devices, self.agent_id)
except Exception:
LOG.exception(_LE("Unable to get port details for %s"), devices)
# resync is needed
return True
for device_details in devices_details_list:
device = device_details['device']
LOG.debug("Port %s added", device)
if 'port_id' in device_details:
LOG.info(_LI("Port %(device)s updated. Details: %(details)s"),
{'device': device, 'details': device_details})
if self.prevent_arp_spoofing:
self.mgr.setup_arp_spoofing_protection(device,
device_details)
segment = amb.NetworkSegment(
device_details.get('network_type'),
device_details['physical_network'],
device_details.get('segmentation_id')
)
network_id = device_details['network_id']
self.rpc_callbacks.add_network(network_id, segment)
interface_plugged = self.mgr.plug_interface(
network_id, segment,
device, device_details['device_owner'])
# REVISIT(scheuran): Changed the way how ports admin_state_up
# is implemented.
#
# Old lb implementation:
# - admin_state_up: ensure that tap is plugged into bridge
# - admin_state_down: remove tap from bridge
# New lb implementation:
# - admin_state_up: set tap device state to up
# - admin_state_down: set tap device state to down
#
# However both approaches could result in races with
# nova/libvirt and therefore to an invalid system state in the
# scenario, where an instance is booted with a port configured
# with admin_state_up = False:
#
# Libvirt does the following actions in exactly
# this order (see libvirt virnetdevtap.c)
# 1) Create the tap device, set its MAC and MTU
# 2) Plug the tap into the bridge
# 3) Set the tap online
#
# Old lb implementation:
# A race could occur, if the lb agent removes the tap device
# right after step 1). Then libvirt will add it to the bridge
# again in step 2).
# New lb implementation:
# The race could occur if the lb-agent sets the taps device
# state to down right after step 2). In step 3) libvirt
# might set it to up again.
#
# This is not an issue if an instance is booted with a port
# configured with admin_state_up = True. Libvirt would just
# set the tap device up again.
#
# This refactoring is recommended for the following reasons:
# 1) An existing race with libvirt caused by the behavior of
# the old implementation. See Bug #1312016
# 2) The new code is much more readable
if interface_plugged:
self.mgr.ensure_port_admin_state(
device, device_details['admin_state_up'])
# update plugin about port status if admin_state is up
if device_details['admin_state_up']:
if interface_plugged:
self.plugin_rpc.update_device_up(self.context,
device,
self.agent_id,
cfg.CONF.host)
else:
self.plugin_rpc.update_device_down(self.context,
device,
self.agent_id,
cfg.CONF.host)
self._update_network_ports(device_details['network_id'],
device_details['port_id'],
device_details['device'])
self.ext_manager.handle_port(self.context, device_details)
else:
LOG.info(_LI("Device %s not defined on plugin"), device)
# no resync is needed
return False
def treat_devices_removed(self, devices):
resync = False
self.sg_agent.remove_devices_filter(devices)
for device in devices:
LOG.info(_LI("Attachment %s removed"), device)
details = None
try:
details = self.plugin_rpc.update_device_down(self.context,
device,
self.agent_id,
cfg.CONF.host)
except Exception:
LOG.exception(_LE("Error occurred while removing port %s"),
device)
resync = True
if details and details['exists']:
LOG.info(_LI("Port %s updated."), device)
else:
LOG.debug("Device %s not defined on plugin", device)
port_id = self._clean_network_ports(device)
self.ext_manager.delete_port(self.context,
{'device': device,
'port_id': port_id})
if self.prevent_arp_spoofing:
self.mgr.delete_arp_spoofing_protection(devices)
return resync
def scan_devices(self, previous, sync):
device_info = {}
updated_devices = self.rpc_callbacks.get_and_clear_updated_devices()
current_devices = self.mgr.get_all_devices()
device_info['current'] = current_devices
if previous is None:
# This is the first iteration of daemon_loop().
previous = {'added': set(),
'current': set(),
'updated': set(),
'removed': set()}
# clear any orphaned ARP spoofing rules (e.g. interface was
# manually deleted)
if self.prevent_arp_spoofing:
self.mgr.delete_unreferenced_arp_protection(current_devices)
if sync:
# This is the first iteration, or the previous one had a problem.
# Re-add all existing devices.
device_info['added'] = current_devices
# Retry cleaning devices that may not have been cleaned properly.
# And clean any that disappeared since the previous iteration.
device_info['removed'] = (previous['removed'] | previous['current']
- current_devices)
# Retry updating devices that may not have been updated properly.
# And any that were updated since the previous iteration.
# Only update devices that currently exist.
device_info['updated'] = (previous['updated'] | updated_devices
& current_devices)
else:
device_info['added'] = current_devices - previous['current']
device_info['removed'] = previous['current'] - current_devices
device_info['updated'] = updated_devices & current_devices
return device_info
def _device_info_has_changes(self, device_info):
return (device_info.get('added')
or device_info.get('updated')
or device_info.get('removed'))
def daemon_loop(self):
LOG.info(_LI("LinuxBridge Agent RPC Daemon Started!"))
device_info = None
sync = True
while True:
start = time.time()
if self.fullsync:
sync = True
self.fullsync = False
if sync:
LOG.info(_LI("Agent out of sync with plugin!"))
device_info = self.scan_devices(previous=device_info, sync=sync)
sync = False
if (self._device_info_has_changes(device_info)
or self.sg_agent.firewall_refresh_needed()):
LOG.debug("Agent loop found changes! %s", device_info)
try:
sync = self.process_network_devices(device_info)
except Exception:
LOG.exception(_LE("Error in agent loop. Devices info: %s"),
device_info)
sync = True
# sleep till end of polling interval
elapsed = (time.time() - start)
if (elapsed < self.polling_interval):
time.sleep(self.polling_interval - elapsed)
else:
LOG.debug("Loop iteration exceeded interval "
"(%(polling_interval)s vs. %(elapsed)s)!",
{'polling_interval': self.polling_interval,
'elapsed': elapsed})
def set_rpc_timeout(self, timeout):
for rpc_api in (self.plugin_rpc, self.sg_plugin_rpc,
self.state_rpc):
rpc_api.client.timeout = timeout

View File

@ -19,34 +19,29 @@
# Based on the structure of the OpenVSwitch agent in the
# Neutron OpenVSwitch Plugin.
import collections
import sys
import time
import netaddr
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
from oslo_service import loopingcall
from oslo_service import service
from oslo_utils import excutils
from six import moves
from neutron._i18n import _LE, _LI, _LW
from neutron.agent.l2.extensions import manager as ext_manager
from neutron.agent.linux import bridge_lib
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils
from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.common import config as common_config
from neutron.common import constants
from neutron.common import exceptions
from neutron.common import topics
from neutron.common import utils as n_utils
from neutron import context
from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2.drivers.agent import _agent_manager_base as amb
from neutron.plugins.ml2.drivers.agent import _common_agent as ca
from neutron.plugins.ml2.drivers.agent import config as cagt_config # noqa
from neutron.plugins.ml2.drivers.l2pop.rpc_manager \
import l2population_rpc as l2pop_rpc
@ -838,371 +833,6 @@ class LinuxBridgeRpcCallbacks(
getattr(self, method)(context, values)
class CommonAgentLoop(service.Service):
def __init__(self, manager, polling_interval,
quitting_rpc_timeout, agent_type, agent_binary):
"""Constructor.
:param manager: the manager object containing the impl specifics
:param polling_interval: interval (secs) to poll DB.
:param quitting_rpc_timeout: timeout in seconds for rpc calls after
stop is called.
:param agent_type: Specifies the type of the agent
:param agent_binary: The agent binary string
"""
super(CommonAgentLoop, self).__init__()
self.mgr = manager
self._validate_manager_class()
self.polling_interval = polling_interval
self.quitting_rpc_timeout = quitting_rpc_timeout
self.agent_type = agent_type
self.agent_binary = agent_binary
def _validate_manager_class(self):
if not isinstance(self.mgr,
amb.CommonAgentManagerBase):
LOG.error(_LE("Manager class must inherit from "
"CommonAgentManagerBase to ensure CommonAgent "
"works properly."))
sys.exit(1)
def start(self):
self.prevent_arp_spoofing = cfg.CONF.AGENT.prevent_arp_spoofing
# stores all configured ports on agent
self.network_ports = collections.defaultdict(list)
# flag to do a sync after revival
self.fullsync = False
self.context = context.get_admin_context_without_session()
self.setup_rpc()
self.init_extension_manager(self.connection)
configurations = {'extensions': self.ext_manager.names()}
configurations.update(self.mgr.get_agent_configurations())
self.agent_state = {
'binary': self.agent_binary,
'host': cfg.CONF.host,
'topic': constants.L2_AGENT_TOPIC,
'configurations': configurations,
'agent_type': self.agent_type,
'start_flag': True}
report_interval = cfg.CONF.AGENT.report_interval
if report_interval:
heartbeat = loopingcall.FixedIntervalLoopingCall(
self._report_state)
heartbeat.start(interval=report_interval)
self.daemon_loop()
def stop(self, graceful=True):
LOG.info(_LI("Stopping %s agent."), self.agent_type)
if graceful and self.quitting_rpc_timeout:
self.set_rpc_timeout(self.quitting_rpc_timeout)
super(CommonAgentLoop, self).stop(graceful)
def reset(self):
common_config.setup_logging()
def _report_state(self):
try:
devices = len(self.mgr.get_all_devices())
self.agent_state.get('configurations')['devices'] = devices
agent_status = self.state_rpc.report_state(self.context,
self.agent_state,
True)
if agent_status == constants.AGENT_REVIVED:
LOG.info(_LI('Agent has just been revived. '
'Doing a full sync.'))
self.fullsync = True
self.agent_state.pop('start_flag', None)
except Exception:
LOG.exception(_LE("Failed reporting state!"))
def _validate_rpc_endpoints(self):
if not isinstance(self.endpoints[0],
amb.CommonAgentManagerRpcCallBackBase):
LOG.error(_LE("RPC Callback class must inherit from "
"CommonAgentManagerRpcCallBackBase to ensure "
"CommonAgent works properly."))
sys.exit(1)
def setup_rpc(self):
self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)
self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
self.sg_agent = sg_rpc.SecurityGroupAgentRpc(
self.context, self.sg_plugin_rpc, defer_refresh_firewall=True)
self.agent_id = self.mgr.get_agent_id()
LOG.info(_LI("RPC agent_id: %s"), self.agent_id)
self.topic = topics.AGENT
self.state_rpc = agent_rpc.PluginReportStateAPI(topics.REPORTS)
# RPC network init
# Handle updates from service
self.rpc_callbacks = self.mgr.get_rpc_callbacks(self.context, self,
self.sg_agent)
self.endpoints = [self.rpc_callbacks]
self._validate_rpc_endpoints()
# Define the listening consumers for the agent
consumers = self.mgr.get_rpc_consumers()
self.connection = agent_rpc.create_consumers(self.endpoints,
self.topic,
consumers)
def init_extension_manager(self, connection):
ext_manager.register_opts(cfg.CONF)
self.ext_manager = (
ext_manager.AgentExtensionsManager(cfg.CONF))
self.ext_manager.initialize(
connection, self.mgr.get_extension_driver_type())
def _clean_network_ports(self, device):
for netid, ports_list in self.network_ports.items():
for port_data in ports_list:
if device == port_data['device']:
ports_list.remove(port_data)
if ports_list == []:
self.network_ports.pop(netid)
return port_data['port_id']
def _update_network_ports(self, network_id, port_id, device):
self._clean_network_ports(device)
self.network_ports[network_id].append({
"port_id": port_id,
"device": device
})
def process_network_devices(self, device_info):
resync_a = False
resync_b = False
self.sg_agent.setup_port_filters(device_info.get('added'),
device_info.get('updated'))
# Updated devices are processed the same as new ones, as their
# admin_state_up may have changed. The set union prevents duplicating
# work when a device is new and updated in the same polling iteration.
devices_added_updated = (set(device_info.get('added'))
| set(device_info.get('updated')))
if devices_added_updated:
resync_a = self.treat_devices_added_updated(devices_added_updated)
if device_info.get('removed'):
resync_b = self.treat_devices_removed(device_info['removed'])
# If one of the above operations fails => resync with plugin
return (resync_a | resync_b)
def treat_devices_added_updated(self, devices):
try:
devices_details_list = self.plugin_rpc.get_devices_details_list(
self.context, devices, self.agent_id)
except Exception:
LOG.exception(_LE("Unable to get port details for %s"), devices)
# resync is needed
return True
for device_details in devices_details_list:
device = device_details['device']
LOG.debug("Port %s added", device)
if 'port_id' in device_details:
LOG.info(_LI("Port %(device)s updated. Details: %(details)s"),
{'device': device, 'details': device_details})
if self.prevent_arp_spoofing:
self.mgr.setup_arp_spoofing_protection(device,
device_details)
segment = amb.NetworkSegment(
device_details.get('network_type'),
device_details['physical_network'],
device_details.get('segmentation_id')
)
network_id = device_details['network_id']
self.rpc_callbacks.add_network(network_id, segment)
interface_plugged = self.mgr.plug_interface(
network_id, segment,
device, device_details['device_owner'])
# REVISIT(scheuran): Changed the way how ports admin_state_up
# is implemented.
#
# Old lb implementation:
# - admin_state_up: ensure that tap is plugged into bridge
# - admin_state_down: remove tap from bridge
# New lb implementation:
# - admin_state_up: set tap device state to up
# - admin_state_down: set tap device state to down
#
# However both approaches could result in races with
# nova/libvirt and therefore to an invalid system state in the
# scenario, where an instance is booted with a port configured
# with admin_state_up = False:
#
# Libvirt does the following actions in exactly
# this order (see libvirt virnetdevtap.c)
# 1) Create the tap device, set its MAC and MTU
# 2) Plug the tap into the bridge
# 3) Set the tap online
#
# Old lb implementation:
# A race could occur, if the lb agent removes the tap device
# right after step 1). Then libvirt will add it to the bridge
# again in step 2).
# New lb implementation:
# The race could occur if the lb-agent sets the taps device
# state to down right after step 2). In step 3) libvirt
# might set it to up again.
#
# This is not an issue if an instance is booted with a port
# configured with admin_state_up = True. Libvirt would just
# set the tap device up again.
#
# This refactoring is recommended for the following reasons:
# 1) An existing race with libvirt caused by the behavior of
# the old implementation. See Bug #1312016
# 2) The new code is much more readable
if interface_plugged:
self.mgr.ensure_port_admin_state(
device,
device_details['admin_state_up'])
# update plugin about port status if admin_state is up
if device_details['admin_state_up']:
if interface_plugged:
self.plugin_rpc.update_device_up(self.context,
device,
self.agent_id,
cfg.CONF.host)
else:
self.plugin_rpc.update_device_down(self.context,
device,
self.agent_id,
cfg.CONF.host)
self._update_network_ports(device_details['network_id'],
device_details['port_id'],
device_details['device'])
self.ext_manager.handle_port(self.context, device_details)
else:
LOG.info(_LI("Device %s not defined on plugin"), device)
# no resync is needed
return False
def treat_devices_removed(self, devices):
resync = False
self.sg_agent.remove_devices_filter(devices)
for device in devices:
LOG.info(_LI("Attachment %s removed"), device)
details = None
try:
details = self.plugin_rpc.update_device_down(self.context,
device,
self.agent_id,
cfg.CONF.host)
except Exception:
LOG.exception(_LE("Error occurred while removing port %s"),
device)
resync = True
if details and details['exists']:
LOG.info(_LI("Port %s updated."), device)
else:
LOG.debug("Device %s not defined on plugin", device)
port_id = self._clean_network_ports(device)
self.ext_manager.delete_port(self.context,
{'device': device,
'port_id': port_id})
if self.prevent_arp_spoofing:
self.mgr.delete_arp_spoofing_protection(devices)
return resync
def scan_devices(self, previous, sync):
device_info = {}
updated_devices = self.rpc_callbacks.get_and_clear_updated_devices()
current_devices = self.mgr.get_all_devices()
device_info['current'] = current_devices
if previous is None:
# This is the first iteration of daemon_loop().
previous = {'added': set(),
'current': set(),
'updated': set(),
'removed': set()}
# clear any orphaned ARP spoofing rules (e.g. interface was
# manually deleted)
if self.prevent_arp_spoofing:
self.mgr.delete_unreferenced_arp_protection(current_devices)
if sync:
# This is the first iteration, or the previous one had a problem.
# Re-add all existing devices.
device_info['added'] = current_devices
# Retry cleaning devices that may not have been cleaned properly.
# And clean any that disappeared since the previous iteration.
device_info['removed'] = (previous['removed'] | previous['current']
- current_devices)
# Retry updating devices that may not have been updated properly.
# And any that were updated since the previous iteration.
# Only update devices that currently exist.
device_info['updated'] = (previous['updated'] | updated_devices
& current_devices)
else:
device_info['added'] = current_devices - previous['current']
device_info['removed'] = previous['current'] - current_devices
device_info['updated'] = updated_devices & current_devices
return device_info
def _device_info_has_changes(self, device_info):
return (device_info.get('added')
or device_info.get('updated')
or device_info.get('removed'))
def daemon_loop(self):
LOG.info(_LI("LinuxBridge Agent RPC Daemon Started!"))
device_info = None
sync = True
while True:
start = time.time()
if self.fullsync:
sync = True
self.fullsync = False
if sync:
LOG.info(_LI("Agent out of sync with plugin!"))
device_info = self.scan_devices(previous=device_info, sync=sync)
sync = False
if (self._device_info_has_changes(device_info)
or self.sg_agent.firewall_refresh_needed()):
LOG.debug("Agent loop found changes! %s", device_info)
try:
sync = self.process_network_devices(device_info)
except Exception:
LOG.exception(_LE("Error in agent loop. Devices info: %s"),
device_info)
sync = True
# sleep till end of polling interval
elapsed = (time.time() - start)
if (elapsed < self.polling_interval):
time.sleep(self.polling_interval - elapsed)
else:
LOG.debug("Loop iteration exceeded interval "
"(%(polling_interval)s vs. %(elapsed)s)!",
{'polling_interval': self.polling_interval,
'elapsed': elapsed})
def set_rpc_timeout(self, timeout):
for rpc_api in (self.plugin_rpc, self.sg_plugin_rpc,
self.state_rpc):
rpc_api.client.timeout = timeout
def main():
common_config.init(sys.argv[1:])
@ -1229,9 +859,9 @@ def main():
polling_interval = cfg.CONF.AGENT.polling_interval
quitting_rpc_timeout = cfg.CONF.AGENT.quitting_rpc_timeout
agent = CommonAgentLoop(manager, polling_interval, quitting_rpc_timeout,
constants.AGENT_TYPE_LINUXBRIDGE,
LB_AGENT_BINARY)
agent = ca.CommonAgentLoop(manager, polling_interval, quitting_rpc_timeout,
constants.AGENT_TYPE_LINUXBRIDGE,
LB_AGENT_BINARY)
LOG.info(_LI("Agent initialized successfully, now running... "))
launcher = service.launch(cfg.CONF, agent)
launcher.wait()

View File

@ -0,0 +1,480 @@
# Copyright (c) 2016 IBM Corp.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_config import cfg
from neutron.agent.linux import bridge_lib
from neutron.common import constants
from neutron.plugins.ml2.drivers.agent import _agent_manager_base as amb
from neutron.plugins.ml2.drivers.agent import _common_agent as ca
from neutron.tests import base
LOCAL_IP = '192.168.0.33'
LOCAL_IPV6 = '2001:db8:1::33'
VXLAN_GROUPV6 = 'ff05::/120'
PORT_1 = 'abcdef01-12ddssdfds-fdsfsd'
DEVICE_1 = 'tapabcdef01-12'
NETWORK_ID = '57653b20-ed5b-4ed0-a31d-06f84e3fd909'
BRIDGE_MAPPING_VALUE = 'br-eth2'
BRIDGE_MAPPINGS = {'physnet0': BRIDGE_MAPPING_VALUE}
INTERFACE_MAPPINGS = {'physnet1': 'eth1'}
FAKE_DEFAULT_DEV = mock.Mock()
FAKE_DEFAULT_DEV.name = 'eth1'
PORT_DATA = {
"port_id": PORT_1,
"device": DEVICE_1
}
class TestCommonAgentLoop(base.BaseTestCase):
def setUp(self):
super(TestCommonAgentLoop, self).setUp()
# disable setting up periodic state reporting
cfg.CONF.set_override('report_interval', 0, 'AGENT')
cfg.CONF.set_override('prevent_arp_spoofing', False, 'AGENT')
cfg.CONF.set_default('firewall_driver',
'neutron.agent.firewall.NoopFirewallDriver',
group='SECURITYGROUP')
cfg.CONF.set_override('local_ip', LOCAL_IP, 'VXLAN')
self.get_bridge_names_p = mock.patch.object(bridge_lib,
'get_bridge_names')
self.get_bridge_names = self.get_bridge_names_p.start()
self.get_bridge_names.return_value = ["br-int", "brq1"]
manager = mock.Mock()
manager.get_all_devices.return_value = []
manager.get_agent_configurations.return_value = {}
manager.get_rpc_consumers.return_value = []
with mock.patch.object(ca.CommonAgentLoop, '_validate_manager_class'),\
mock.patch.object(ca.CommonAgentLoop, '_validate_rpc_endpoints'):
self.agent = ca.CommonAgentLoop(manager, 0, 10, 'fake_agent',
'foo-binary')
with mock.patch.object(self.agent, "daemon_loop"):
self.agent.start()
def test_treat_devices_removed_with_existed_device(self):
agent = self.agent
agent.mgr.ensure_port_admin_state = mock.Mock()
devices = [DEVICE_1]
agent.network_ports[NETWORK_ID].append(PORT_DATA)
with mock.patch.object(agent.plugin_rpc,
"update_device_down") as fn_udd,\
mock.patch.object(agent.sg_agent,
"remove_devices_filter") as fn_rdf,\
mock.patch.object(agent.ext_manager,
"delete_port") as ext_mgr_delete_port:
fn_udd.return_value = {'device': DEVICE_1,
'exists': True}
with mock.patch.object(ca.LOG, 'info') as log:
resync = agent.treat_devices_removed(devices)
self.assertEqual(2, log.call_count)
self.assertFalse(resync)
self.assertTrue(fn_udd.called)
self.assertTrue(fn_rdf.called)
self.assertTrue(ext_mgr_delete_port.called)
self.assertTrue(
PORT_DATA not in agent.network_ports[NETWORK_ID]
)
def test_treat_devices_removed_with_not_existed_device(self):
agent = self.agent
devices = [DEVICE_1]
agent.network_ports[NETWORK_ID].append(PORT_DATA)
with mock.patch.object(agent.plugin_rpc,
"update_device_down") as fn_udd,\
mock.patch.object(agent.sg_agent,
"remove_devices_filter") as fn_rdf,\
mock.patch.object(agent.ext_manager,
"delete_port") as ext_mgr_delete_port:
fn_udd.return_value = {'device': DEVICE_1,
'exists': False}
with mock.patch.object(ca.LOG, 'debug') as log:
resync = agent.treat_devices_removed(devices)
self.assertEqual(1, log.call_count)
self.assertFalse(resync)
self.assertTrue(fn_udd.called)
self.assertTrue(fn_rdf.called)
self.assertTrue(ext_mgr_delete_port.called)
self.assertTrue(
PORT_DATA not in agent.network_ports[NETWORK_ID]
)
def test_treat_devices_removed_failed(self):
agent = self.agent
devices = [DEVICE_1]
agent.network_ports[NETWORK_ID].append(PORT_DATA)
with mock.patch.object(agent.plugin_rpc,
"update_device_down") as fn_udd,\
mock.patch.object(agent.sg_agent,
"remove_devices_filter") as fn_rdf,\
mock.patch.object(agent.ext_manager,
"delete_port") as ext_mgr_delete_port:
fn_udd.side_effect = Exception()
resync = agent.treat_devices_removed(devices)
self.assertTrue(resync)
self.assertTrue(fn_udd.called)
self.assertTrue(fn_rdf.called)
self.assertTrue(ext_mgr_delete_port.called)
self.assertTrue(
PORT_DATA not in agent.network_ports[NETWORK_ID]
)
def test_treat_devices_removed_with_prevent_arp_spoofing_true(self):
agent = self.agent
agent.prevent_arp_spoofing = True
agent._ensure_port_admin_state = mock.Mock()
devices = [DEVICE_1]
with mock.patch.object(agent.plugin_rpc,
"update_device_down") as fn_udd,\
mock.patch.object(agent.sg_agent,
"remove_devices_filter"):
fn_udd.return_value = {'device': DEVICE_1,
'exists': True}
with mock.patch.object(agent.mgr,
'delete_arp_spoofing_protection') as de_arp:
agent.treat_devices_removed(devices)
de_arp.assert_called_with(devices)
def _test_scan_devices(self, previous, updated,
fake_current, expected, sync):
self.agent.mgr = mock.Mock()
self.agent.mgr.get_all_devices.return_value = fake_current
self.agent.rpc_callbacks.get_and_clear_updated_devices.return_value =\
updated
results = self.agent.scan_devices(previous, sync)
self.assertEqual(expected, results)
def test_scan_devices_no_changes(self):
previous = {'current': set([1, 2]),
'updated': set(),
'added': set(),
'removed': set()}
fake_current = set([1, 2])
updated = set()
expected = {'current': set([1, 2]),
'updated': set(),
'added': set(),
'removed': set()}
self._test_scan_devices(previous, updated, fake_current, expected,
sync=False)
def test_scan_devices_added_removed(self):
previous = {'current': set([1, 2]),
'updated': set(),
'added': set(),
'removed': set()}
fake_current = set([2, 3])
updated = set()
expected = {'current': set([2, 3]),
'updated': set(),
'added': set([3]),
'removed': set([1])}
self._test_scan_devices(previous, updated, fake_current, expected,
sync=False)
def test_scan_devices_removed_retried_on_sync(self):
previous = {'current': set([2, 3]),
'updated': set(),
'added': set(),
'removed': set([1])}
fake_current = set([2, 3])
updated = set()
expected = {'current': set([2, 3]),
'updated': set(),
'added': set([2, 3]),
'removed': set([1])}
self._test_scan_devices(previous, updated, fake_current, expected,
sync=True)
def test_scan_devices_vanished_removed_on_sync(self):
previous = {'current': set([2, 3]),
'updated': set(),
'added': set(),
'removed': set([1])}
# Device 2 disappeared.
fake_current = set([3])
updated = set()
# Device 1 should be retried.
expected = {'current': set([3]),
'updated': set(),
'added': set([3]),
'removed': set([1, 2])}
self._test_scan_devices(previous, updated, fake_current, expected,
sync=True)
def test_scan_devices_updated(self):
previous = {'current': set([1, 2]),
'updated': set(),
'added': set(),
'removed': set()}
fake_current = set([1, 2])
updated = set([1])
expected = {'current': set([1, 2]),
'updated': set([1]),
'added': set(),
'removed': set()}
self._test_scan_devices(previous, updated, fake_current, expected,
sync=False)
def test_scan_devices_updated_non_existing(self):
previous = {'current': set([1, 2]),
'updated': set(),
'added': set(),
'removed': set()}
fake_current = set([1, 2])
updated = set([3])
expected = {'current': set([1, 2]),
'updated': set(),
'added': set(),
'removed': set()}
self._test_scan_devices(previous, updated, fake_current, expected,
sync=False)
def test_scan_devices_updated_deleted_concurrently(self):
previous = {
'current': set([1, 2]),
'updated': set(),
'added': set(),
'removed': set()
}
# Device 2 disappeared.
fake_current = set([1])
# Device 2 got an concurrent update via network_update
updated = set([2])
expected = {
'current': set([1]),
'updated': set(),
'added': set(),
'removed': set([2])
}
self._test_scan_devices(
previous, updated, fake_current, expected, sync=False
)
def test_scan_devices_updated_on_sync(self):
previous = {'current': set([1, 2]),
'updated': set([1]),
'added': set(),
'removed': set()}
fake_current = set([1, 2])
updated = set([2])
expected = {'current': set([1, 2]),
'updated': set([1, 2]),
'added': set([1, 2]),
'removed': set()}
self._test_scan_devices(previous, updated, fake_current, expected,
sync=True)
def test_scan_devices_with_prevent_arp_spoofing_true(self):
self.agent.prevent_arp_spoofing = True
previous = None
fake_current = set([1, 2])
updated = set()
expected = {'current': set([1, 2]),
'updated': set(),
'added': set([1, 2]),
'removed': set()}
self._test_scan_devices(previous, updated, fake_current, expected,
sync=False)
self.agent.mgr.delete_unreferenced_arp_protection.assert_called_with(
fake_current)
def test_process_network_devices(self):
agent = self.agent
device_info = {'current': set(),
'added': set(['tap3', 'tap4']),
'updated': set(['tap2', 'tap3']),
'removed': set(['tap1'])}
agent.sg_agent.setup_port_filters = mock.Mock()
agent.treat_devices_added_updated = mock.Mock(return_value=False)
agent.treat_devices_removed = mock.Mock(return_value=False)
agent.process_network_devices(device_info)
agent.sg_agent.setup_port_filters.assert_called_with(
device_info['added'],
device_info['updated'])
agent.treat_devices_added_updated.assert_called_with(set(['tap2',
'tap3',
'tap4']))
agent.treat_devices_removed.assert_called_with(set(['tap1']))
def test_treat_devices_added_updated_no_local_interface(self):
agent = self.agent
mock_details = {'device': 'dev123',
'port_id': 'port123',
'network_id': 'net123',
'admin_state_up': True,
'network_type': 'vlan',
'segmentation_id': 100,
'physical_network': 'physnet1',
'device_owner': constants.DEVICE_OWNER_NETWORK_PREFIX}
agent.ext_manager = mock.Mock()
agent.plugin_rpc = mock.Mock()
agent.plugin_rpc.get_devices_details_list.return_value = [mock_details]
agent.mgr = mock.Mock()
agent.mgr.plug_interface.return_value = False
agent.mgr.ensure_port_admin_state = mock.Mock()
agent.treat_devices_added_updated(set(['tap1']))
self.assertFalse(agent.mgr.ensure_port_admin_state.called)
def test_treat_devices_added_updated_admin_state_up_true(self):
agent = self.agent
mock_details = {'device': 'dev123',
'port_id': 'port123',
'network_id': 'net123',
'admin_state_up': True,
'network_type': 'vlan',
'segmentation_id': 100,
'physical_network': 'physnet1',
'device_owner': constants.DEVICE_OWNER_NETWORK_PREFIX}
mock_port_data = {
'port_id': mock_details['port_id'],
'device': mock_details['device']
}
agent.ext_manager = mock.Mock()
agent.plugin_rpc = mock.Mock()
agent.plugin_rpc.get_devices_details_list.return_value = [mock_details]
agent.mgr = mock.Mock()
agent.mgr.plug_interface.return_value = True
agent.mgr.ensure_port_admin_state = mock.Mock()
mock_segment = amb.NetworkSegment(mock_details['network_type'],
mock_details['physical_network'],
mock_details['segmentation_id'])
with mock.patch('neutron.plugins.ml2.drivers.agent.'
'_agent_manager_base.NetworkSegment',
return_value=mock_segment):
resync_needed = agent.treat_devices_added_updated(set(['tap1']))
self.assertFalse(resync_needed)
agent.rpc_callbacks.add_network.assert_called_with('net123',
mock_segment)
agent.mgr.plug_interface.assert_called_with(
'net123', mock_segment, 'dev123',
constants.DEVICE_OWNER_NETWORK_PREFIX)
self.assertTrue(agent.plugin_rpc.update_device_up.called)
self.assertTrue(agent.ext_manager.handle_port.called)
self.assertTrue(mock_port_data in agent.network_ports[
mock_details['network_id']]
)
def test_treat_devices_added_updated_prevent_arp_spoofing_true(self):
agent = self.agent
agent.prevent_arp_spoofing = True
mock_details = {'device': 'dev123',
'port_id': 'port123',
'network_id': 'net123',
'admin_state_up': True,
'network_type': 'vlan',
'segmentation_id': 100,
'physical_network': 'physnet1',
'device_owner': constants.DEVICE_OWNER_NETWORK_PREFIX}
agent.plugin_rpc = mock.Mock()
agent.plugin_rpc.get_devices_details_list.return_value = [mock_details]
agent.mgr = mock.Mock()
agent.mgr.plug_interface.return_value = True
with mock.patch.object(agent.mgr,
'setup_arp_spoofing_protection') as set_arp:
agent.treat_devices_added_updated(set(['tap1']))
set_arp.assert_called_with(mock_details['device'], mock_details)
def test_set_rpc_timeout(self):
self.agent.stop()
for rpc_client in (self.agent.plugin_rpc.client,
self.agent.sg_plugin_rpc.client,
self.agent.state_rpc.client):
self.assertEqual(cfg.CONF.AGENT.quitting_rpc_timeout,
rpc_client.timeout)
def test_set_rpc_timeout_no_value(self):
self.agent.quitting_rpc_timeout = None
with mock.patch.object(self.agent, 'set_rpc_timeout') as mock_set_rpc:
self.agent.stop()
self.assertFalse(mock_set_rpc.called)
def test_report_state_revived(self):
with mock.patch.object(self.agent.state_rpc,
"report_state") as report_st:
report_st.return_value = constants.AGENT_REVIVED
self.agent._report_state()
self.assertTrue(self.agent.fullsync)
def test_update_network_ports(self):
port_1_data = PORT_DATA
NETWORK_2_ID = 'fake_second_network'
port_2_data = {
'port_id': 'fake_port_2',
'device': 'fake_port_2_device_name'
}
self.agent.network_ports[NETWORK_ID].append(
port_1_data
)
self.agent.network_ports[NETWORK_ID].append(
port_2_data
)
#check update port:
self.agent._update_network_ports(
NETWORK_2_ID, port_2_data['port_id'], port_2_data['device']
)
self.assertTrue(
port_2_data not in self.agent.network_ports[NETWORK_ID]
)
self.assertTrue(
port_2_data in self.agent.network_ports[NETWORK_2_ID]
)
def test_clean_network_ports(self):
port_1_data = PORT_DATA
port_2_data = {
'port_id': 'fake_port_2',
'device': 'fake_port_2_device_name'
}
self.agent.network_ports[NETWORK_ID].append(
port_1_data
)
self.agent.network_ports[NETWORK_ID].append(
port_2_data
)
#check removing port from network when other ports are still there:
cleaned_port_id = self.agent._clean_network_ports(DEVICE_1)
self.assertTrue(
NETWORK_ID in self.agent.network_ports.keys()
)
self.assertTrue(
port_1_data not in self.agent.network_ports[NETWORK_ID]
)
self.assertTrue(
port_2_data in self.agent.network_ports[NETWORK_ID]
)
self.assertEqual(PORT_1, cleaned_port_id)
#and now remove last port from network:
cleaned_port_id = self.agent._clean_network_ports(
port_2_data['device']
)
self.assertTrue(
NETWORK_ID not in self.agent.network_ports.keys()
)
self.assertEqual(port_2_data['port_id'], cleaned_port_id)

View File

@ -106,450 +106,6 @@ class TestLinuxBridge(base.BaseTestCase):
self.assertTrue(vxlan_bridge_func.called)
class TestCommonAgentLoop(base.BaseTestCase):
def setUp(self):
super(TestCommonAgentLoop, self).setUp()
# disable setting up periodic state reporting
cfg.CONF.set_override('report_interval', 0, 'AGENT')
cfg.CONF.set_override('prevent_arp_spoofing', False, 'AGENT')
cfg.CONF.set_default('firewall_driver',
'neutron.agent.firewall.NoopFirewallDriver',
group='SECURITYGROUP')
cfg.CONF.set_override('local_ip', LOCAL_IP, 'VXLAN')
self.get_bridge_names_p = mock.patch.object(bridge_lib,
'get_bridge_names')
self.get_bridge_names = self.get_bridge_names_p.start()
self.get_bridge_names.return_value = ["br-int", "brq1"]
manager = mock.Mock()
manager.get_all_devices.return_value = []
manager.get_agent_configurations.return_value = {}
manager.get_rpc_consumers.return_value = []
with mock.patch.object(linuxbridge_neutron_agent.CommonAgentLoop,
'_validate_manager_class'), \
mock.patch.object(linuxbridge_neutron_agent.CommonAgentLoop,
'_validate_rpc_endpoints'):
self.agent = linuxbridge_neutron_agent.CommonAgentLoop(
manager, 0, 10, 'fake_agent', 'foo-binary')
with mock.patch.object(self.agent, "daemon_loop"):
self.agent.start()
def test_treat_devices_removed_with_existed_device(self):
agent = self.agent
agent.mgr.ensure_port_admin_state = mock.Mock()
devices = [DEVICE_1]
agent.network_ports[NETWORK_ID].append(PORT_DATA)
with mock.patch.object(agent.plugin_rpc,
"update_device_down") as fn_udd,\
mock.patch.object(agent.sg_agent,
"remove_devices_filter") as fn_rdf,\
mock.patch.object(agent.ext_manager,
"delete_port") as ext_mgr_delete_port:
fn_udd.return_value = {'device': DEVICE_1,
'exists': True}
with mock.patch.object(linuxbridge_neutron_agent.LOG,
'info') as log:
resync = agent.treat_devices_removed(devices)
self.assertEqual(2, log.call_count)
self.assertFalse(resync)
self.assertTrue(fn_udd.called)
self.assertTrue(fn_rdf.called)
self.assertTrue(ext_mgr_delete_port.called)
self.assertTrue(
PORT_DATA not in agent.network_ports[NETWORK_ID]
)
def test_treat_devices_removed_with_not_existed_device(self):
agent = self.agent
devices = [DEVICE_1]
agent.network_ports[NETWORK_ID].append(PORT_DATA)
with mock.patch.object(agent.plugin_rpc,
"update_device_down") as fn_udd,\
mock.patch.object(agent.sg_agent,
"remove_devices_filter") as fn_rdf,\
mock.patch.object(agent.ext_manager,
"delete_port") as ext_mgr_delete_port:
fn_udd.return_value = {'device': DEVICE_1,
'exists': False}
with mock.patch.object(linuxbridge_neutron_agent.LOG,
'debug') as log:
resync = agent.treat_devices_removed(devices)
self.assertEqual(1, log.call_count)
self.assertFalse(resync)
self.assertTrue(fn_udd.called)
self.assertTrue(fn_rdf.called)
self.assertTrue(ext_mgr_delete_port.called)
self.assertTrue(
PORT_DATA not in agent.network_ports[NETWORK_ID]
)
def test_treat_devices_removed_failed(self):
agent = self.agent
devices = [DEVICE_1]
agent.network_ports[NETWORK_ID].append(PORT_DATA)
with mock.patch.object(agent.plugin_rpc,
"update_device_down") as fn_udd,\
mock.patch.object(agent.sg_agent,
"remove_devices_filter") as fn_rdf,\
mock.patch.object(agent.ext_manager,
"delete_port") as ext_mgr_delete_port:
fn_udd.side_effect = Exception()
resync = agent.treat_devices_removed(devices)
self.assertTrue(resync)
self.assertTrue(fn_udd.called)
self.assertTrue(fn_rdf.called)
self.assertTrue(ext_mgr_delete_port.called)
self.assertTrue(
PORT_DATA not in agent.network_ports[NETWORK_ID]
)
def test_treat_devices_removed_with_prevent_arp_spoofing_true(self):
agent = self.agent
agent.prevent_arp_spoofing = True
agent._ensure_port_admin_state = mock.Mock()
devices = [DEVICE_1]
with mock.patch.object(agent.plugin_rpc,
"update_device_down") as fn_udd,\
mock.patch.object(agent.sg_agent,
"remove_devices_filter"):
fn_udd.return_value = {'device': DEVICE_1,
'exists': True}
with mock.patch.object(agent.mgr,
'delete_arp_spoofing_protection') as de_arp:
agent.treat_devices_removed(devices)
de_arp.assert_called_with(devices)
def _test_scan_devices(self, previous, updated,
fake_current, expected, sync):
self.agent.mgr = mock.Mock()
self.agent.mgr.get_all_devices.return_value = fake_current
self.agent.rpc_callbacks.get_and_clear_updated_devices.return_value =\
updated
results = self.agent.scan_devices(previous, sync)
self.assertEqual(expected, results)
def test_scan_devices_no_changes(self):
previous = {'current': set([1, 2]),
'updated': set(),
'added': set(),
'removed': set()}
fake_current = set([1, 2])
updated = set()
expected = {'current': set([1, 2]),
'updated': set(),
'added': set(),
'removed': set()}
self._test_scan_devices(previous, updated, fake_current, expected,
sync=False)
def test_scan_devices_added_removed(self):
previous = {'current': set([1, 2]),
'updated': set(),
'added': set(),
'removed': set()}
fake_current = set([2, 3])
updated = set()
expected = {'current': set([2, 3]),
'updated': set(),
'added': set([3]),
'removed': set([1])}
self._test_scan_devices(previous, updated, fake_current, expected,
sync=False)
def test_scan_devices_removed_retried_on_sync(self):
previous = {'current': set([2, 3]),
'updated': set(),
'added': set(),
'removed': set([1])}
fake_current = set([2, 3])
updated = set()
expected = {'current': set([2, 3]),
'updated': set(),
'added': set([2, 3]),
'removed': set([1])}
self._test_scan_devices(previous, updated, fake_current, expected,
sync=True)
def test_scan_devices_vanished_removed_on_sync(self):
previous = {'current': set([2, 3]),
'updated': set(),
'added': set(),
'removed': set([1])}
# Device 2 disappeared.
fake_current = set([3])
updated = set()
# Device 1 should be retried.
expected = {'current': set([3]),
'updated': set(),
'added': set([3]),
'removed': set([1, 2])}
self._test_scan_devices(previous, updated, fake_current, expected,
sync=True)
def test_scan_devices_updated(self):
previous = {'current': set([1, 2]),
'updated': set(),
'added': set(),
'removed': set()}
fake_current = set([1, 2])
updated = set([1])
expected = {'current': set([1, 2]),
'updated': set([1]),
'added': set(),
'removed': set()}
self._test_scan_devices(previous, updated, fake_current, expected,
sync=False)
def test_scan_devices_updated_non_existing(self):
previous = {'current': set([1, 2]),
'updated': set(),
'added': set(),
'removed': set()}
fake_current = set([1, 2])
updated = set([3])
expected = {'current': set([1, 2]),
'updated': set(),
'added': set(),
'removed': set()}
self._test_scan_devices(previous, updated, fake_current, expected,
sync=False)
def test_scan_devices_updated_deleted_concurrently(self):
previous = {
'current': set([1, 2]),
'updated': set(),
'added': set(),
'removed': set()
}
# Device 2 disappeared.
fake_current = set([1])
# Device 2 got an concurrent update via network_update
updated = set([2])
expected = {
'current': set([1]),
'updated': set(),
'added': set(),
'removed': set([2])
}
self._test_scan_devices(
previous, updated, fake_current, expected, sync=False
)
def test_scan_devices_updated_on_sync(self):
previous = {'current': set([1, 2]),
'updated': set([1]),
'added': set(),
'removed': set()}
fake_current = set([1, 2])
updated = set([2])
expected = {'current': set([1, 2]),
'updated': set([1, 2]),
'added': set([1, 2]),
'removed': set()}
self._test_scan_devices(previous, updated, fake_current, expected,
sync=True)
def test_scan_devices_with_prevent_arp_spoofing_true(self):
self.agent.prevent_arp_spoofing = True
previous = None
fake_current = set([1, 2])
updated = set()
expected = {'current': set([1, 2]),
'updated': set(),
'added': set([1, 2]),
'removed': set()}
self._test_scan_devices(previous, updated, fake_current, expected,
sync=False)
self.agent.mgr.delete_unreferenced_arp_protection.assert_called_with(
fake_current)
def test_process_network_devices(self):
agent = self.agent
device_info = {'current': set(),
'added': set(['tap3', 'tap4']),
'updated': set(['tap2', 'tap3']),
'removed': set(['tap1'])}
agent.sg_agent.setup_port_filters = mock.Mock()
agent.treat_devices_added_updated = mock.Mock(return_value=False)
agent.treat_devices_removed = mock.Mock(return_value=False)
agent.process_network_devices(device_info)
agent.sg_agent.setup_port_filters.assert_called_with(
device_info['added'],
device_info['updated'])
agent.treat_devices_added_updated.assert_called_with(set(['tap2',
'tap3',
'tap4']))
agent.treat_devices_removed.assert_called_with(set(['tap1']))
def test_treat_devices_added_updated_no_local_interface(self):
agent = self.agent
mock_details = {'device': 'dev123',
'port_id': 'port123',
'network_id': 'net123',
'admin_state_up': True,
'network_type': 'vlan',
'segmentation_id': 100,
'physical_network': 'physnet1',
'device_owner': constants.DEVICE_OWNER_NETWORK_PREFIX}
agent.ext_manager = mock.Mock()
agent.plugin_rpc = mock.Mock()
agent.plugin_rpc.get_devices_details_list.return_value = [mock_details]
agent.mgr = mock.Mock()
agent.mgr.plug_interface.return_value = False
agent.mgr.ensure_port_admin_state = mock.Mock()
agent.treat_devices_added_updated(set(['tap1']))
self.assertFalse(agent.mgr.ensure_port_admin_state.called)
def test_treat_devices_added_updated_admin_state_up_true(self):
agent = self.agent
mock_details = {'device': 'dev123',
'port_id': 'port123',
'network_id': 'net123',
'admin_state_up': True,
'network_type': 'vlan',
'segmentation_id': 100,
'physical_network': 'physnet1',
'device_owner': constants.DEVICE_OWNER_NETWORK_PREFIX}
mock_port_data = {
'port_id': mock_details['port_id'],
'device': mock_details['device']
}
agent.ext_manager = mock.Mock()
agent.plugin_rpc = mock.Mock()
agent.plugin_rpc.get_devices_details_list.return_value = [mock_details]
agent.mgr = mock.Mock()
agent.mgr.plug_interface.return_value = True
agent.mgr.ensure_port_admin_state = mock.Mock()
mock_segment = amb.NetworkSegment(mock_details['network_type'],
mock_details['physical_network'],
mock_details['segmentation_id'])
with mock.patch('neutron.plugins.ml2.drivers.agent.'
'_agent_manager_base.NetworkSegment',
return_value=mock_segment):
resync_needed = agent.treat_devices_added_updated(set(['tap1']))
self.assertFalse(resync_needed)
agent.rpc_callbacks.add_network.assert_called_with('net123',
mock_segment)
agent.mgr.plug_interface.assert_called_with(
'net123', mock_segment, 'dev123',
constants.DEVICE_OWNER_NETWORK_PREFIX)
self.assertTrue(agent.plugin_rpc.update_device_up.called)
self.assertTrue(agent.ext_manager.handle_port.called)
self.assertTrue(mock_port_data in agent.network_ports[
mock_details['network_id']]
)
def test_treat_devices_added_updated_prevent_arp_spoofing_true(self):
agent = self.agent
agent.prevent_arp_spoofing = True
mock_details = {'device': 'dev123',
'port_id': 'port123',
'network_id': 'net123',
'admin_state_up': True,
'network_type': 'vlan',
'segmentation_id': 100,
'physical_network': 'physnet1',
'device_owner': constants.DEVICE_OWNER_NETWORK_PREFIX}
agent.plugin_rpc = mock.Mock()
agent.plugin_rpc.get_devices_details_list.return_value = [mock_details]
agent.mgr = mock.Mock()
agent.mgr.plug_interface.return_value = True
with mock.patch.object(agent.mgr,
'setup_arp_spoofing_protection') as set_arp:
agent.treat_devices_added_updated(set(['tap1']))
set_arp.assert_called_with(mock_details['device'], mock_details)
def test_set_rpc_timeout(self):
self.agent.stop()
for rpc_client in (self.agent.plugin_rpc.client,
self.agent.sg_plugin_rpc.client,
self.agent.state_rpc.client):
self.assertEqual(cfg.CONF.AGENT.quitting_rpc_timeout,
rpc_client.timeout)
def test_set_rpc_timeout_no_value(self):
self.agent.quitting_rpc_timeout = None
with mock.patch.object(self.agent, 'set_rpc_timeout') as mock_set_rpc:
self.agent.stop()
self.assertFalse(mock_set_rpc.called)
def test_report_state_revived(self):
with mock.patch.object(self.agent.state_rpc,
"report_state") as report_st:
report_st.return_value = constants.AGENT_REVIVED
self.agent._report_state()
self.assertTrue(self.agent.fullsync)
def test_update_network_ports(self):
port_1_data = PORT_DATA
NETWORK_2_ID = 'fake_second_network'
port_2_data = {
'port_id': 'fake_port_2',
'device': 'fake_port_2_device_name'
}
self.agent.network_ports[NETWORK_ID].append(
port_1_data
)
self.agent.network_ports[NETWORK_ID].append(
port_2_data
)
#check update port:
self.agent._update_network_ports(
NETWORK_2_ID, port_2_data['port_id'], port_2_data['device']
)
self.assertTrue(
port_2_data not in self.agent.network_ports[NETWORK_ID]
)
self.assertTrue(
port_2_data in self.agent.network_ports[NETWORK_2_ID]
)
def test_clean_network_ports(self):
port_1_data = PORT_DATA
port_2_data = {
'port_id': 'fake_port_2',
'device': 'fake_port_2_device_name'
}
self.agent.network_ports[NETWORK_ID].append(
port_1_data
)
self.agent.network_ports[NETWORK_ID].append(
port_2_data
)
#check removing port from network when other ports are still there:
cleaned_port_id = self.agent._clean_network_ports(DEVICE_1)
self.assertTrue(
NETWORK_ID in self.agent.network_ports.keys()
)
self.assertTrue(
port_1_data not in self.agent.network_ports[NETWORK_ID]
)
self.assertTrue(
port_2_data in self.agent.network_ports[NETWORK_ID]
)
self.assertEqual(PORT_1, cleaned_port_id)
#and now remove last port from network:
cleaned_port_id = self.agent._clean_network_ports(
port_2_data['device']
)
self.assertTrue(
NETWORK_ID not in self.agent.network_ports.keys()
)
self.assertEqual(port_2_data['port_id'], cleaned_port_id)
class TestLinuxBridgeManager(base.BaseTestCase):
def setUp(self):
super(TestLinuxBridgeManager, self).setUp()