LinuxBridge: update status according to admin_state_up

Fixes bug 1099065

In addition to this the agent will only treat ports that exist on the agent.

Change-Id: I927649a45a860421ef0d825015516000475ad08d
This commit is contained in:
Gary Kotton 2013-01-13 09:16:29 +00:00
parent fe0ae7a7f0
commit b56c2c9983
5 changed files with 116 additions and 86 deletions

@ -74,6 +74,12 @@ class PluginApi(proxy.RpcProxy):
agent_id=agent_id), agent_id=agent_id),
topic=self.topic) topic=self.topic)
def update_device_up(self, context, device, agent_id):
return self.call(context,
self.make_msg('update_device_up', device=device,
agent_id=agent_id),
topic=self.topic)
def tunnel_sync(self, context, tunnel_ip): def tunnel_sync(self, context, tunnel_ip):
return self.call(context, return self.call(context,
self.make_msg('tunnel_sync', tunnel_ip=tunnel_ip), self.make_msg('tunnel_sync', tunnel_ip=tunnel_ip),

@ -62,12 +62,16 @@ VLAN_BINDINGS = "vlan_bindings"
PORT_BINDINGS = "port_bindings" PORT_BINDINGS = "port_bindings"
class LinuxBridge: class LinuxBridgeManager:
def __init__(self, interface_mappings, root_helper): def __init__(self, interface_mappings, root_helper):
self.interface_mappings = interface_mappings self.interface_mappings = interface_mappings
self.root_helper = root_helper self.root_helper = root_helper
self.ip = ip_lib.IPWrapper(self.root_helper) self.ip = ip_lib.IPWrapper(self.root_helper)
self.udev = pyudev.Context()
monitor = pyudev.Monitor.from_netlink(self.udev)
monitor.filter_by('net')
def device_exists(self, device): def device_exists(self, device):
"""Check if ethernet device exists.""" """Check if ethernet device exists."""
try: try:
@ -391,38 +395,76 @@ class LinuxBridge:
return return
LOG.debug("Done deleting subinterface %s" % interface) LOG.debug("Done deleting subinterface %s" % interface)
def update_devices(self, registered_devices):
devices = self.udev_get_tap_devices()
if devices == registered_devices:
return
added = devices - registered_devices
removed = registered_devices - devices
return {'current': devices,
'added': added,
'removed': removed}
def udev_get_tap_devices(self):
devices = set()
for device in self.udev.list_devices(subsystem='net'):
name = self.udev_get_name(device)
if self.is_tap_device(name):
devices.add(name)
return devices
def is_tap_device(self, name):
return name.startswith(TAP_INTERFACE_PREFIX)
def udev_get_name(self, device):
return device.sys_name
class LinuxBridgeRpcCallbacks(): class LinuxBridgeRpcCallbacks():
# Set RPC API version to 1.0 by default. # Set RPC API version to 1.0 by default.
RPC_API_VERSION = '1.0' RPC_API_VERSION = '1.0'
def __init__(self, context, linux_br): def __init__(self, context, agent):
self.context = context self.context = context
self.linux_br = linux_br self.agent = agent
def network_delete(self, context, **kwargs): def network_delete(self, context, **kwargs):
LOG.debug("network_delete received") LOG.debug("network_delete received")
network_id = kwargs.get('network_id') network_id = kwargs.get('network_id')
bridge_name = self.linux_br.get_bridge_name(network_id) bridge_name = self.agent.get_bridge_name(network_id)
LOG.debug("Delete %s", bridge_name) LOG.debug("Delete %s", bridge_name)
self.linux_br.delete_vlan_bridge(bridge_name) self.agent.delete_vlan_bridge(bridge_name)
def port_update(self, context, **kwargs): def port_update(self, context, **kwargs):
LOG.debug("port_update received") LOG.debug(_("port_update received"))
# Check port exists on node
port = kwargs.get('port') port = kwargs.get('port')
tap_device_name = self.agent.br_mgr.get_tap_device_name(port['id'])
devices = self.agent.br_mgr.udev_get_tap_devices()
if not tap_device_name in devices:
return
if port['admin_state_up']: if port['admin_state_up']:
vlan_id = kwargs.get('vlan_id') vlan_id = kwargs.get('vlan_id')
physical_network = kwargs.get('physical_network') physical_network = kwargs.get('physical_network')
# create the networking for the port # create the networking for the port
self.linux_br.add_interface(port['network_id'], self.agent.br_mgr.add_interface(port['network_id'],
physical_network, physical_network,
vlan_id, vlan_id,
port['id']) port['id'])
# update plugin about port status
self.agent.plugin_rpc.update_device_up(self.context,
tap_device_name,
self.agent.agent_id)
else: else:
bridge_name = self.linux_br.get_bridge_name(port['network_id']) bridge_name = self.agent.br_mgr.get_bridge_name(
tap_device_name = self.linux_br.get_tap_device_name(port['id']) port['network_id'])
self.linux_br.remove_interface(bridge_name, tap_device_name) self.agent.br_mgr.remove_interface(bridge_name, tap_device_name)
# update plugin about port status
self.agent.plugin_rpc.update_device_down(self.context,
tap_device_name,
self.agent.agent_id)
def create_rpc_dispatcher(self): def create_rpc_dispatcher(self):
'''Get the rpc dispatcher for this manager. '''Get the rpc dispatcher for this manager.
@ -445,18 +487,18 @@ class LinuxBridgeQuantumAgentDB:
self.db_connection_url = db_connection_url self.db_connection_url = db_connection_url
def setup_linux_bridge(self, interface_mappings): def setup_linux_bridge(self, interface_mappings):
self.linux_br = LinuxBridge(interface_mappings, self.root_helper) self.br_mgr = LinuxBridgeManager(interface_mappings, self.root_helper)
def process_port_binding(self, network_id, interface_id, def process_port_binding(self, network_id, interface_id,
physical_network, vlan_id): physical_network, vlan_id):
return self.linux_br.add_interface(network_id, return self.br_mgr.add_interface(network_id,
physical_network, vlan_id, physical_network, vlan_id,
interface_id) interface_id)
def remove_port_binding(self, network_id, interface_id): def remove_port_binding(self, network_id, interface_id):
bridge_name = self.linux_br.get_bridge_name(network_id) bridge_name = self.br_mgr.get_bridge_name(network_id)
tap_device_name = self.linux_br.get_tap_device_name(interface_id) tap_device_name = self.br_mgr.get_tap_device_name(interface_id)
return self.linux_br.remove_interface(bridge_name, tap_device_name) return self.br_mgr.remove_interface(bridge_name, tap_device_name)
def process_unplugged_interfaces(self, plugged_interfaces): def process_unplugged_interfaces(self, plugged_interfaces):
""" """
@ -468,44 +510,29 @@ class LinuxBridgeQuantumAgentDB:
plugged_tap_device_names = [] plugged_tap_device_names = []
plugged_gateway_device_names = [] plugged_gateway_device_names = []
for interface in plugged_interfaces: for interface in plugged_interfaces:
if interface.startswith(GATEWAY_INTERFACE_PREFIX): tap_device_name = self.br_mgr.get_tap_device_name(interface)
""" plugged_tap_device_names.append(tap_device_name)
The name for the gateway devices is set by the linux net
driver, hence we use the name as is
"""
plugged_gateway_device_names.append(interface)
else:
tap_device_name = self.linux_br.get_tap_device_name(interface)
plugged_tap_device_names.append(tap_device_name)
LOG.debug("plugged tap device names %s" % plugged_tap_device_names) LOG.debug("plugged tap device names %s" % plugged_tap_device_names)
for tap_device in self.linux_br.get_all_tap_devices(): for tap_device in self.br_mgr.get_all_tap_devices():
if tap_device not in plugged_tap_device_names: if tap_device not in plugged_tap_device_names:
current_bridge_name = ( current_bridge_name = (
self.linux_br.get_bridge_for_tap_device(tap_device)) self.br_mgr.get_bridge_for_tap_device(tap_device))
if current_bridge_name: if current_bridge_name:
self.linux_br.remove_interface(current_bridge_name, self.br_mgr.remove_interface(current_bridge_name,
tap_device) tap_device)
for gw_device in self.linux_br.get_all_gateway_devices():
if gw_device not in plugged_gateway_device_names:
current_bridge_name = (
self.linux_br.get_bridge_for_tap_device(gw_device))
if current_bridge_name:
self.linux_br.remove_interface(current_bridge_name,
gw_device)
def process_deleted_networks(self, vlan_bindings): def process_deleted_networks(self, vlan_bindings):
current_quantum_networks = vlan_bindings.keys() current_quantum_networks = vlan_bindings.keys()
current_quantum_bridge_names = [] current_quantum_bridge_names = []
for network in current_quantum_networks: for network in current_quantum_networks:
bridge_name = self.linux_br.get_bridge_name(network) bridge_name = self.br_mgr.get_bridge_name(network)
current_quantum_bridge_names.append(bridge_name) current_quantum_bridge_names.append(bridge_name)
quantum_bridges_on_this_host = self.linux_br.get_all_quantum_bridges() quantum_bridges_on_this_host = self.br_mgr.get_all_quantum_bridges()
for bridge in quantum_bridges_on_this_host: for bridge in quantum_bridges_on_this_host:
if bridge not in current_quantum_bridge_names: if bridge not in current_quantum_bridge_names:
self.linux_br.delete_vlan_bridge(bridge) self.br_mgr.delete_vlan_bridge(bridge)
def manage_networks_on_host(self, db, def manage_networks_on_host(self, db,
old_vlan_bindings, old_vlan_bindings,
@ -638,7 +665,7 @@ class LinuxBridgeQuantumAgentRPC:
is_admin=False) is_admin=False)
# Handle updates from service # Handle updates from service
self.callbacks = LinuxBridgeRpcCallbacks(self.context, self.callbacks = LinuxBridgeRpcCallbacks(self.context,
self.linux_br) self.br_mgr)
self.dispatcher = self.callbacks.create_rpc_dispatcher() self.dispatcher = self.callbacks.create_rpc_dispatcher()
# Define the listening consumers for the agent # Define the listening consumers for the agent
consumers = [[topics.PORT, topics.UPDATE], consumers = [[topics.PORT, topics.UPDATE],
@ -646,41 +673,14 @@ class LinuxBridgeQuantumAgentRPC:
self.connection = agent_rpc.create_consumers(self.dispatcher, self.connection = agent_rpc.create_consumers(self.dispatcher,
self.topic, self.topic,
consumers) consumers)
self.udev = pyudev.Context()
monitor = pyudev.Monitor.from_netlink(self.udev)
monitor.filter_by('net')
def setup_linux_bridge(self, interface_mappings): def setup_linux_bridge(self, interface_mappings):
self.linux_br = LinuxBridge(interface_mappings, self.root_helper) self.br_mgr = LinuxBridgeManager(interface_mappings, self.root_helper)
def remove_port_binding(self, network_id, interface_id): def remove_port_binding(self, network_id, interface_id):
bridge_name = self.linux_br.get_bridge_name(network_id) bridge_name = self.br_mgr.get_bridge_name(network_id)
tap_device_name = self.linux_br.get_tap_device_name(interface_id) tap_device_name = self.br_mgr.get_tap_device_name(interface_id)
return self.linux_br.remove_interface(bridge_name, tap_device_name) return self.br_mgr.remove_interface(bridge_name, tap_device_name)
def update_devices(self, registered_devices):
devices = self.udev_get_all_tap_devices()
if devices == registered_devices:
return
added = devices - registered_devices
removed = registered_devices - devices
return {'current': devices,
'added': added,
'removed': removed}
def udev_get_all_tap_devices(self):
devices = set()
for device in self.udev.list_devices(subsystem='net'):
name = self.udev_get_name(device)
if self.is_tap_device(name):
devices.add(name)
return devices
def is_tap_device(self, name):
return name.startswith(TAP_INTERFACE_PREFIX)
def udev_get_name(self, device):
return device.sys_name
def process_network_devices(self, device_info): def process_network_devices(self, device_info):
resync_a = False resync_a = False
@ -708,10 +708,10 @@ class LinuxBridgeQuantumAgentRPC:
LOG.info("Port %s updated. Details: %s", device, details) LOG.info("Port %s updated. Details: %s", device, details)
if details['admin_state_up']: if details['admin_state_up']:
# create the networking for the port # create the networking for the port
self.linux_br.add_interface(details['network_id'], self.br_mgr.add_interface(details['network_id'],
details['physical_network'], details['physical_network'],
details['vlan_id'], details['vlan_id'],
details['port_id']) details['port_id'])
else: else:
self.remove_port_binding(details['network_id'], self.remove_port_binding(details['network_id'],
details['port_id']) details['port_id'])
@ -750,7 +750,7 @@ class LinuxBridgeQuantumAgentRPC:
devices.clear() devices.clear()
sync = False sync = False
device_info = self.update_devices(devices) device_info = self.br_mgr.update_devices(devices)
# notify plugin about device deltas # notify plugin about device deltas
if device_info: if device_info:

@ -72,8 +72,10 @@ class LinuxBridgeRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin):
'network_id': port['network_id'], 'network_id': port['network_id'],
'port_id': port['id'], 'port_id': port['id'],
'admin_state_up': port['admin_state_up']} 'admin_state_up': port['admin_state_up']}
# Set the port status to UP new_status = (q_const.PORT_STATUS_ACTIVE if port['admin_state_up']
db.set_port_status(port['id'], q_const.PORT_STATUS_ACTIVE) else q_const.PORT_STATUS_DOWN)
if port['status'] != new_status:
db.set_port_status(port['id'], new_status)
else: else:
entry = {'device': device} entry = {'device': device}
LOG.debug("%s can not be found in database", device) LOG.debug("%s can not be found in database", device)
@ -89,14 +91,29 @@ class LinuxBridgeRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin):
if port: if port:
entry = {'device': device, entry = {'device': device,
'exists': True} 'exists': True}
# Set port status to DOWN if port['status'] != q_const.PORT_STATUS_DOWN:
db.set_port_status(port['id'], q_const.PORT_STATUS_DOWN) # Set port status to DOWN
db.set_port_status(port['id'], q_const.PORT_STATUS_DOWN)
else: else:
entry = {'device': device, entry = {'device': device,
'exists': False} 'exists': False}
LOG.debug("%s can not be found in database", device) LOG.debug("%s can not be found in database", device)
return entry return entry
def update_device_up(self, rpc_context, **kwargs):
"""Device is up on agent"""
agent_id = kwargs.get('agent_id')
device = kwargs.get('device')
LOG.debug(_("Device %(device)s up %(agent_id)s"),
locals())
port = self.get_port_from_device(device)
if port:
if port['status'] != q_const.PORT_STATUS_ACTIVE:
# Set port status to ACTIVE
db.set_port_status(port['id'], q_const.PORT_STATUS_ACTIVE)
else:
LOG.debug(_("%s can not be found in database"), device)
class AgentNotifierApi(proxy.RpcProxy): class AgentNotifierApi(proxy.RpcProxy):
'''Agent side of the linux bridge rpc API. '''Agent side of the linux bridge rpc API.

@ -30,7 +30,7 @@ class TestLinuxBridge(unittest.TestCase):
interface_mappings = {'physnet1': 'eth1'} interface_mappings = {'physnet1': 'eth1'}
root_helper = cfg.CONF.AGENT.root_helper root_helper = cfg.CONF.AGENT.root_helper
self.linux_bridge = linuxbridge_quantum_agent.LinuxBridge( self.linux_bridge = linuxbridge_quantum_agent.LinuxBridgeManager(
interface_mappings, root_helper) interface_mappings, root_helper)
def test_ensure_physical_in_bridge_invalid(self): def test_ensure_physical_in_bridge_invalid(self):

@ -91,3 +91,10 @@ class rpcApiTestCase(unittest2.TestCase):
'update_device_down', rpc_method='call', 'update_device_down', rpc_method='call',
device='fake_device', device='fake_device',
agent_id='fake_agent_id') agent_id='fake_agent_id')
def test_update_device_up(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_lb_api(rpcapi, topics.PLUGIN,
'update_device_up', rpc_method='call',
device='fake_device',
agent_id='fake_agent_id')