add LACP application
this application provides the simple example of link aggregation using LACP. the module "lacplib" controls exchange of LACP packets and watches the status of the slave i/fs. the status changes if the i/fs went into a LAG or timeout to exchange LACP occurred. the module sends a "EventSlaveStateChanged" event when the status changed. the module "simple_switch_lacp" is a variation of "simple_switch". the switch receives the "EventPacketIn" event instead of the "EventOFPPacketIn" event from the module "lacplib" in order to except LACP. when the module received "EventSlaveStateChanged" event, the module resets flow entries. to run: ryu-manager ryu/app/simple_switch_lacp.py Signed-off-by: Yuichi Ito <ito.yuichi0@gmal.com> Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
parent
46d7a9805c
commit
2ca49a222b
115
ryu/app/simple_switch_lacp.py
Normal file
115
ryu/app/simple_switch_lacp.py
Normal file
@ -0,0 +1,115 @@
|
||||
# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import struct
|
||||
|
||||
from ryu.base import app_manager
|
||||
from ryu.controller.handler import set_ev_cls
|
||||
from ryu.ofproto import ofproto_v1_0
|
||||
from ryu.lib import addrconv
|
||||
from ryu.lib import lacplib
|
||||
from ryu.lib.dpid import str_to_dpid
|
||||
|
||||
|
||||
class SimpleSwitchLacp(app_manager.RyuApp):
|
||||
OFP_VERSIONS = [ofproto_v1_0.OFP_VERSION]
|
||||
_CONTEXTS = {'lacplib': lacplib.LacpLib}
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(SimpleSwitchLacp, self).__init__(*args, **kwargs)
|
||||
self.mac_to_port = {}
|
||||
self._lacp = kwargs['lacplib']
|
||||
# in this sample application, bonding i/fs of the switchs
|
||||
# shall be set up as follows:
|
||||
# - the port 1 and 2 of the datapath 1 face the slave i/fs.
|
||||
# - the port 3, 4 and 5 of the datapath 1 face the others.
|
||||
# - the port 1 and 2 of the datapath 2 face the others.
|
||||
self._lacp.add(
|
||||
dpid=str_to_dpid('0000000000000001'), ports=[1, 2])
|
||||
self._lacp.add(
|
||||
dpid=str_to_dpid('0000000000000001'), ports=[3, 4, 5])
|
||||
self._lacp.add(
|
||||
dpid=str_to_dpid('0000000000000002'), ports=[1, 2])
|
||||
|
||||
def add_flow(self, datapath, in_port, dst, actions):
|
||||
ofproto = datapath.ofproto
|
||||
parser = datapath.ofproto_parser
|
||||
|
||||
match = parser.OFPMatch(in_port=in_port,
|
||||
dl_dst=addrconv.mac.text_to_bin(dst))
|
||||
mod = parser.OFPFlowMod(
|
||||
datapath=datapath, match=match, cookie=0,
|
||||
command=ofproto.OFPFC_ADD, actions=actions)
|
||||
datapath.send_msg(mod)
|
||||
|
||||
def del_flow(self, datapath, dst):
|
||||
ofproto = datapath.ofproto
|
||||
parser = datapath.ofproto_parser
|
||||
|
||||
match = parser.OFPMatch(dl_dst=addrconv.mac.text_to_bin(dst))
|
||||
mod = parser.OFPFlowMod(
|
||||
datapath=datapath, match=match, cookie=0,
|
||||
command=ofproto.OFPFC_DELETE)
|
||||
datapath.send_msg(mod)
|
||||
|
||||
@set_ev_cls(lacplib.EventPacketIn, lacplib.LAG_EV_DISPATCHER)
|
||||
def _packet_in_handler(self, ev):
|
||||
msg = ev.msg
|
||||
datapath = msg.datapath
|
||||
ofproto = datapath.ofproto
|
||||
|
||||
(dst_, src_, _eth_type) = struct.unpack_from(
|
||||
'!6s6sH', buffer(msg.data), 0)
|
||||
src = addrconv.mac.bin_to_text(src_)
|
||||
dst = addrconv.mac.bin_to_text(dst_)
|
||||
|
||||
dpid = datapath.id
|
||||
self.mac_to_port.setdefault(dpid, {})
|
||||
|
||||
self.logger.info("packet in %s %s %s %s",
|
||||
dpid, src, dst, msg.in_port)
|
||||
|
||||
# learn a mac address to avoid FLOOD next time.
|
||||
self.mac_to_port[dpid][src] = msg.in_port
|
||||
|
||||
if dst in self.mac_to_port[dpid]:
|
||||
out_port = self.mac_to_port[dpid][dst]
|
||||
else:
|
||||
out_port = ofproto.OFPP_FLOOD
|
||||
|
||||
actions = [datapath.ofproto_parser.OFPActionOutput(out_port)]
|
||||
|
||||
# install a flow to avoid packet_in next time
|
||||
if out_port != ofproto.OFPP_FLOOD:
|
||||
self.add_flow(datapath, msg.in_port, dst, actions)
|
||||
|
||||
out = datapath.ofproto_parser.OFPPacketOut(
|
||||
datapath=datapath, buffer_id=msg.buffer_id, in_port=msg.in_port,
|
||||
actions=actions)
|
||||
datapath.send_msg(out)
|
||||
|
||||
@set_ev_cls(lacplib.EventSlaveStateChanged, lacplib.LAG_EV_DISPATCHER)
|
||||
def _slave_state_changed_handler(self, ev):
|
||||
datapath = ev.datapath
|
||||
dpid = datapath.id
|
||||
port_no = ev.port
|
||||
enabled = ev.enabled
|
||||
self.logger.info("slave state changed port: %d enabled: %s",
|
||||
port_no, enabled)
|
||||
if dpid in self.mac_to_port:
|
||||
for mac in self.mac_to_port[dpid]:
|
||||
self.del_flow(datapath, mac)
|
||||
del self.mac_to_port[dpid]
|
||||
self.mac_to_port.setdefault(dpid, {})
|
302
ryu/lib/lacplib.py
Normal file
302
ryu/lib/lacplib.py
Normal file
@ -0,0 +1,302 @@
|
||||
# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
|
||||
from ryu.base import app_manager
|
||||
from ryu.controller import event
|
||||
from ryu.controller import ofp_event
|
||||
from ryu.controller.handler import MAIN_DISPATCHER
|
||||
from ryu.controller.handler import set_ev_cls
|
||||
from ryu.ofproto import ether
|
||||
from ryu.lib import addrconv
|
||||
from ryu.lib.dpid import dpid_to_str
|
||||
from ryu.lib.packet import packet
|
||||
from ryu.lib.packet import ethernet
|
||||
from ryu.lib.packet import slow
|
||||
|
||||
|
||||
LAG_EV_DISPATCHER = "lacplib"
|
||||
|
||||
|
||||
class EventPacketIn(event.EventBase):
|
||||
"""a PacketIn event class using except LACP."""
|
||||
def __init__(self, msg):
|
||||
"""initialization."""
|
||||
super(EventPacketIn, self).__init__()
|
||||
self.msg = msg
|
||||
|
||||
|
||||
class EventSlaveStateChanged(event.EventBase):
|
||||
"""a event class that notifies the changes of the statuses of the
|
||||
slave i/fs."""
|
||||
def __init__(self, datapath, port, enabled):
|
||||
"""initialization."""
|
||||
super(EventSlaveStateChanged, self).__init__()
|
||||
self.datapath = datapath
|
||||
self.port = port
|
||||
self.enabled = enabled
|
||||
|
||||
|
||||
class LacpLib(app_manager.RyuApp):
|
||||
"""LACP exchange library. this works only in a PASSIVE mode."""
|
||||
|
||||
#-------------------------------------------------------------------
|
||||
# PUBLIC METHODS
|
||||
#-------------------------------------------------------------------
|
||||
def __init__(self):
|
||||
"""initialization."""
|
||||
super(LacpLib, self).__init__()
|
||||
self.name = 'lacplib'
|
||||
self.bonds = []
|
||||
self._set_logger()
|
||||
|
||||
def add(self, dpid, ports):
|
||||
"""add a setting of a bonding i/f.
|
||||
'add' method takes the correspondig args in this order.
|
||||
|
||||
========= =====================================================
|
||||
Attribute Description
|
||||
========= =====================================================
|
||||
dpid an integer value that means datapath id.
|
||||
|
||||
ports a list of integer values that means the ports face
|
||||
with the slave i/fs.
|
||||
========= =====================================================
|
||||
|
||||
if you want to use multi LAG, call 'add' method more than once.
|
||||
"""
|
||||
assert isinstance(ports, list)
|
||||
assert 2 <= len(ports)
|
||||
ifs = {}
|
||||
for port in ports:
|
||||
ifs[port] = {'enabled': False, 'timeout': 0}
|
||||
bond = {}
|
||||
bond[dpid] = ifs
|
||||
self.bonds.append(bond)
|
||||
|
||||
#-------------------------------------------------------------------
|
||||
# PUBLIC METHODS ( EVENT HANDLERS )
|
||||
#-------------------------------------------------------------------
|
||||
@set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)
|
||||
def packet_in_handler(self, evt):
|
||||
"""PacketIn event handler. when the received packet was LACP,
|
||||
proceed it. otherwise, send a event."""
|
||||
req_pkt = packet.Packet(evt.msg.data)
|
||||
if slow.lacp in req_pkt:
|
||||
(req_lacp, ) = req_pkt.get_protocols(slow.lacp)
|
||||
(req_eth, ) = req_pkt.get_protocols(ethernet.ethernet)
|
||||
self._do_lacp(req_lacp, req_eth.src, evt.msg)
|
||||
else:
|
||||
self.send_event_to_observers(EventPacketIn(evt.msg))
|
||||
|
||||
@set_ev_cls(ofp_event.EventOFPFlowRemoved, MAIN_DISPATCHER)
|
||||
def flow_removed_handler(self, evt):
|
||||
"""FlowRemoved event handler. when the removed flow entry was
|
||||
for LACP, set the status of the slave i/f to enabled, and
|
||||
send a event that LACP exchange timeout has occurred."""
|
||||
msg = evt.msg
|
||||
datapath = msg.datapath
|
||||
dpid = datapath.id
|
||||
match = msg.match
|
||||
port = match.in_port
|
||||
if ether.ETH_TYPE_SLOW != match.dl_type:
|
||||
return
|
||||
self.logger.info(
|
||||
"SW=%s PORT=%d LACP exchange timeout has occurred.",
|
||||
dpid_to_str(dpid), port)
|
||||
self._set_slave_enabled(dpid, port, False)
|
||||
self._set_slave_timeout(dpid, port, 0)
|
||||
self.send_event_to_observers(
|
||||
EventSlaveStateChanged(datapath, port, False))
|
||||
|
||||
#-------------------------------------------------------------------
|
||||
# PRIVATE METHODS ( RELATED TO LACP )
|
||||
#-------------------------------------------------------------------
|
||||
def _do_lacp(self, req_lacp, src, msg):
|
||||
"""packet-in process when the received packet is LACP."""
|
||||
datapath = msg.datapath
|
||||
dpid = datapath.id
|
||||
port = msg.in_port
|
||||
ofproto = datapath.ofproto
|
||||
parser = datapath.ofproto_parser
|
||||
|
||||
self.logger.info("SW=%s PORT=%d LACP received.",
|
||||
dpid_to_str(dpid), port)
|
||||
self.logger.debug(str(req_lacp))
|
||||
|
||||
# when LACP arrived at disabled port, update the status of
|
||||
# the slave i/f and reset all flow entries except for LACP.
|
||||
if not self._get_slave_enabled(dpid, port):
|
||||
self.logger.info(
|
||||
"SW=%s PORT=%d the slave i/f has just been up.",
|
||||
dpid_to_str(dpid), port)
|
||||
self._set_slave_enabled(dpid, port, True)
|
||||
self.send_event_to_observers(
|
||||
EventSlaveStateChanged(datapath, port, True))
|
||||
|
||||
# set the idle_timeout time using the actor state of the
|
||||
# received packet.
|
||||
if req_lacp.LACP_STATE_SHORT_TIMEOUT == \
|
||||
req_lacp.actor_state_timeout:
|
||||
idle_timeout = req_lacp.SHORT_TIMEOUT_TIME
|
||||
else:
|
||||
idle_timeout = req_lacp.LONG_TIMEOUT_TIME
|
||||
|
||||
# when the timeout time has changed, update the timeout time of
|
||||
# the slave i/f and re-enter a flow entry for the packet from
|
||||
# the slave i/f with idle_timeout.
|
||||
if idle_timeout != self._get_slave_timeout(dpid, port):
|
||||
self.logger.info(
|
||||
"SW=%s PORT=%d the timeout time has changed.",
|
||||
dpid_to_str(dpid), port)
|
||||
self._set_slave_timeout(dpid, port, idle_timeout)
|
||||
self._set_flow_entry_packet_in(src, port, idle_timeout,
|
||||
msg)
|
||||
|
||||
# create a response packet.
|
||||
res_pkt = self._create_response(datapath, port, req_lacp)
|
||||
|
||||
# packet-out the response packet.
|
||||
out_port = ofproto.OFPP_IN_PORT
|
||||
actions = [parser.OFPActionOutput(out_port)]
|
||||
out = datapath.ofproto_parser.OFPPacketOut(
|
||||
datapath=datapath, buffer_id=ofproto.OFP_NO_BUFFER,
|
||||
data=res_pkt.data, in_port=port, actions=actions)
|
||||
datapath.send_msg(out)
|
||||
|
||||
def _create_response(self, datapath, port, req):
|
||||
"""create a packet including LACP."""
|
||||
src = datapath.ports[port].hw_addr
|
||||
res_ether = ethernet.ethernet(
|
||||
slow.SLOW_PROTOCOL_MULTICAST, src, ether.ETH_TYPE_SLOW)
|
||||
res_lacp = self._create_lacp(datapath, port, req)
|
||||
res_pkt = packet.Packet()
|
||||
res_pkt.add_protocol(res_ether)
|
||||
res_pkt.add_protocol(res_lacp)
|
||||
res_pkt.serialize()
|
||||
return res_pkt
|
||||
|
||||
def _create_lacp(self, datapath, port, req):
|
||||
"""create a LACP packet."""
|
||||
actor_system = datapath.ports[datapath.ofproto.OFPP_LOCAL].hw_addr
|
||||
res = slow.lacp(
|
||||
actor_system_priority=0xffff,
|
||||
actor_system=actor_system,
|
||||
actor_key=req.actor_key,
|
||||
actor_port_priority=0xff,
|
||||
actor_port=port,
|
||||
actor_state_activity=req.LACP_STATE_PASSIVE,
|
||||
actor_state_timeout=req.actor_state_timeout,
|
||||
actor_state_aggregation=req.actor_state_aggregation,
|
||||
actor_state_synchronization=req.actor_state_synchronization,
|
||||
actor_state_collecting=req.actor_state_collecting,
|
||||
actor_state_distributing=req.actor_state_distributing,
|
||||
actor_state_defaulted=req.LACP_STATE_OPERATIONAL_PARTNER,
|
||||
actor_state_expired=req.LACP_STATE_NOT_EXPIRED,
|
||||
partner_system_priority=req.actor_system_priority,
|
||||
partner_system=req.actor_system,
|
||||
partner_key=req.actor_key,
|
||||
partner_port_priority=req.actor_port_priority,
|
||||
partner_port=req.actor_port,
|
||||
partner_state_activity=req.actor_state_activity,
|
||||
partner_state_timeout=req.actor_state_timeout,
|
||||
partner_state_aggregation=req.actor_state_aggregation,
|
||||
partner_state_synchronization=req.actor_state_synchronization,
|
||||
partner_state_collecting=req.actor_state_collecting,
|
||||
partner_state_distributing=req.actor_state_distributing,
|
||||
partner_state_defaulted=req.actor_state_defaulted,
|
||||
partner_state_expired=req.actor_state_expired,
|
||||
collector_max_delay=0)
|
||||
self.logger.info("SW=%s PORT=%d LACP sent.",
|
||||
dpid_to_str(datapath.id), port)
|
||||
self.logger.debug(str(res))
|
||||
return res
|
||||
|
||||
def _get_slave_enabled(self, dpid, port):
|
||||
"""get whether a slave i/f at some port of some datapath is
|
||||
enable or not."""
|
||||
slave = self._get_slave(dpid, port)
|
||||
if slave:
|
||||
return slave['enabled']
|
||||
else:
|
||||
return False
|
||||
|
||||
def _set_slave_enabled(self, dpid, port, enabled):
|
||||
"""set whether a slave i/f at some port of some datapath is
|
||||
enable or not."""
|
||||
slave = self._get_slave(dpid, port)
|
||||
if slave:
|
||||
slave['enabled'] = enabled
|
||||
|
||||
def _get_slave_timeout(self, dpid, port):
|
||||
"""get the timeout time at some port of some datapath."""
|
||||
slave = self._get_slave(dpid, port)
|
||||
if slave:
|
||||
return slave['timeout']
|
||||
else:
|
||||
return 0
|
||||
|
||||
def _set_slave_timeout(self, dpid, port, timeout):
|
||||
"""set the timeout time at some port of some datapath."""
|
||||
slave = self._get_slave(dpid, port)
|
||||
if slave:
|
||||
slave['timeout'] = timeout
|
||||
|
||||
def _get_slave(self, dpid, port):
|
||||
"""get slave i/f at some port of some datapath."""
|
||||
result = None
|
||||
for bond in self.bonds:
|
||||
if dpid in bond:
|
||||
if port in bond[dpid]:
|
||||
result = bond[dpid][port]
|
||||
break
|
||||
return result
|
||||
|
||||
#-------------------------------------------------------------------
|
||||
# PRIVATE METHODS ( RELATED TO OPEN FLOW PROTOCOL )
|
||||
#-------------------------------------------------------------------
|
||||
def _set_flow_entry_packet_in(self, src, port, timeout, msg):
|
||||
"""enter a flow entry for the packet from the slave i/f
|
||||
with idle_timeout."""
|
||||
datapath = msg.datapath
|
||||
ofproto = datapath.ofproto
|
||||
parser = datapath.ofproto_parser
|
||||
|
||||
wildcards = ofproto.OFPFW_ALL
|
||||
wildcards &= ~ofproto.OFPFW_IN_PORT
|
||||
wildcards &= ~ofproto.OFPFW_DL_SRC
|
||||
wildcards &= ~ofproto.OFPFW_DL_TYPE
|
||||
match = parser.OFPMatch(wildcards=wildcards, in_port=port,
|
||||
dl_src=addrconv.mac.text_to_bin(src),
|
||||
dl_type=ether.ETH_TYPE_SLOW)
|
||||
actions = [parser.OFPActionOutput(
|
||||
ofproto.OFPP_CONTROLLER, ofproto.OFP_MSG_SIZE_MAX)]
|
||||
mod = parser.OFPFlowMod(
|
||||
datapath=datapath, match=match, cookie=0,
|
||||
command=ofproto.OFPFC_ADD, idle_timeout=timeout,
|
||||
flags=ofproto.OFPFF_SEND_FLOW_REM, actions=actions)
|
||||
datapath.send_msg(mod)
|
||||
|
||||
#-------------------------------------------------------------------
|
||||
# PRIVATE METHODS ( OTHERS )
|
||||
#-------------------------------------------------------------------
|
||||
def _set_logger(self):
|
||||
"""change log format."""
|
||||
self.logger.propagate = False
|
||||
hdl = logging.StreamHandler()
|
||||
fmt_str = '[LACP][%(levelname)s] %(message)s'
|
||||
hdl.setFormatter(logging.Formatter(fmt_str))
|
||||
self.logger.addHandler(hdl)
|
Loading…
Reference in New Issue
Block a user