add IGMP snooping application

this application provides the simple example of IGMP snooping.

the module "igmplib" mainly offers 2 functions:
- listening on in the IGMP conversation between multicast routers and hosts
- emulating of the querier function of multicast servers
the former operates a switch as a snooping switch and controls transmission of an unnecessary multicasting packet.
the latter realizes the IGMP conversation in the environment without multicast routers.

the module "simple_switch_igmp" is a variation of "simple_switch".
the switch receives the "EventPacketIn" event instead of the "EventOFPPacketIn" event from the module "igmplib" in order to except IGMP.

Signed-off-by: Yuichi Ito <ito.yuichi0@gmail.com>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
Yuichi Ito 2013-12-26 10:00:40 +09:00 committed by FUJITA Tomonori
parent 5611b9e96e
commit ffc1f82554
2 changed files with 914 additions and 0 deletions

View File

@ -0,0 +1,104 @@
# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import struct
from ryu.base import app_manager
from ryu.controller.handler import MAIN_DISPATCHER
from ryu.controller.handler import set_ev_cls
from ryu.ofproto import ofproto_v1_0
from ryu.lib import addrconv
from ryu.lib import igmplib
from ryu.lib.dpid import str_to_dpid
class SimpleSwitchIgmp(app_manager.RyuApp):
OFP_VERSIONS = [ofproto_v1_0.OFP_VERSION]
_CONTEXTS = {'igmplib': igmplib.IgmpLib}
def __init__(self, *args, **kwargs):
super(SimpleSwitchIgmp, self).__init__(*args, **kwargs)
self.mac_to_port = {}
self._snoop = kwargs['igmplib']
# if you want a switch to operate as a querier,
# set up as follows:
self._snoop.set_querier_mode(
dpid=str_to_dpid('0000000000000001'), server_port=2)
# dpid the datapath id that will operate as a querier.
# server_port a port number which connect to the multicast
# server.
#
# NOTE: you can set up only the one querier.
# when you called this method several times,
# only the last one becomes effective.
def add_flow(self, datapath, in_port, dst, actions):
ofproto = datapath.ofproto
parser = datapath.ofproto_parser
match = parser.OFPMatch(in_port=in_port,
dl_dst=addrconv.mac.text_to_bin(dst))
mod = parser.OFPFlowMod(
datapath=datapath, match=match, cookie=0,
command=ofproto.OFPFC_ADD, actions=actions)
datapath.send_msg(mod)
@set_ev_cls(igmplib.EventPacketIn, MAIN_DISPATCHER)
def _packet_in_handler(self, ev):
msg = ev.msg
datapath = msg.datapath
ofproto = datapath.ofproto
(dst_, src_, _eth_type) = struct.unpack_from(
'!6s6sH', buffer(msg.data), 0)
src = addrconv.mac.bin_to_text(src_)
dst = addrconv.mac.bin_to_text(dst_)
dpid = datapath.id
self.mac_to_port.setdefault(dpid, {})
self.logger.info("packet in %s %s %s %s",
dpid, src, dst, msg.in_port)
# learn a mac address to avoid FLOOD next time.
self.mac_to_port[dpid][src] = msg.in_port
if dst in self.mac_to_port[dpid]:
out_port = self.mac_to_port[dpid][dst]
else:
out_port = ofproto.OFPP_FLOOD
actions = [datapath.ofproto_parser.OFPActionOutput(out_port)]
# install a flow to avoid packet_in next time
if out_port != ofproto.OFPP_FLOOD:
self.add_flow(datapath, msg.in_port, dst, actions)
out = datapath.ofproto_parser.OFPPacketOut(
datapath=datapath, buffer_id=msg.buffer_id, in_port=msg.in_port,
actions=actions)
datapath.send_msg(out)
@set_ev_cls(igmplib.EventMulticastGroupStateChanged,
MAIN_DISPATCHER)
def _status_changed(self, ev):
msg = {
igmplib.MG_GROUP_ADDED: 'Multicast Group Added',
igmplib.MG_MEMBER_CHANGED: 'Multicast Group Member Changed',
igmplib.MG_GROUP_REMOVED: 'Multicast Group Removed',
}
self.logger.info("%s: [%s] querier:[%s] hosts:%s",
msg.get(ev.reason), ev.address, ev.src,
ev.dsts)

810
ryu/lib/igmplib.py Normal file
View File

@ -0,0 +1,810 @@
# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import struct
from ryu.base import app_manager
from ryu.controller import event
from ryu.controller import ofp_event
from ryu.controller.handler import DEAD_DISPATCHER
from ryu.controller.handler import MAIN_DISPATCHER
from ryu.controller.handler import set_ev_cls
from ryu.ofproto import ether
from ryu.ofproto import inet
from ryu.ofproto import ofproto_v1_0
from ryu.ofproto import ofproto_v1_2
from ryu.ofproto import ofproto_v1_3
from ryu.lib import addrconv
from ryu.lib import hub
from ryu.lib.dpid import dpid_to_str
from ryu.lib.packet import packet
from ryu.lib.packet import ethernet
from ryu.lib.packet import ipv4
from ryu.lib.packet import igmp
class EventPacketIn(event.EventBase):
"""a PacketIn event class using except IGMP."""
def __init__(self, msg):
"""initialization."""
super(EventPacketIn, self).__init__()
self.msg = msg
MG_GROUP_ADDED = 1
MG_MEMBER_CHANGED = 2
MG_GROUP_REMOVED = 3
class EventMulticastGroupStateChanged(event.EventBase):
"""a event class that notifies the changes of the statuses of the
multicast groups."""
def __init__(self, reason, address, src, dsts):
"""
========= =====================================================
Attribute Description
========= =====================================================
reason why the event occurs. use one of MG_*.
address a multicast group address.
src a port number in which a querier exists.
dsts a list of port numbers in which the members exist.
========= =====================================================
"""
super(EventMulticastGroupStateChanged, self).__init__()
self.reason = reason
self.address = address
self.src = src
self.dsts = dsts
class IgmpLib(app_manager.RyuApp):
"""IGMP snooping library."""
#-------------------------------------------------------------------
# PUBLIC METHODS
#-------------------------------------------------------------------
def __init__(self):
"""initialization."""
super(IgmpLib, self).__init__()
self.name = 'igmplib'
self._querier = IgmpQuerier()
self._snooper = IgmpSnooper(self.send_event_to_observers)
def set_querier_mode(self, dpid, server_port):
"""set a datapath id and server port number to the instance
of IgmpQuerier.
============ ==================================================
Attribute Description
============ ==================================================
dpid the datapath id that will operate as a querier.
server_port the port number linked to the multicasting server.
============ ==================================================
"""
self._querier.set_querier_mode(dpid, server_port)
#-------------------------------------------------------------------
# PUBLIC METHODS ( EVENT HANDLERS )
#-------------------------------------------------------------------
@set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)
def packet_in_handler(self, evt):
"""PacketIn event handler. when the received packet was IGMP,
proceed it. otherwise, send a event."""
msg = evt.msg
dpid = msg.datapath.id
req_pkt = packet.Packet(msg.data)
req_igmp = req_pkt.get_protocol(igmp.igmp)
if req_igmp:
if self._querier.dpid == dpid:
self._querier.packet_in_handler(req_igmp, msg)
else:
self._snooper.packet_in_handler(req_pkt, req_igmp, msg)
else:
self.send_event_to_observers(EventPacketIn(msg))
@set_ev_cls(ofp_event.EventOFPStateChange,
[MAIN_DISPATCHER, DEAD_DISPATCHER])
def state_change_handler(self, evt):
"""StateChange event handler."""
datapath = evt.datapath
assert datapath is not None
if datapath.id == self._querier.dpid:
if evt.state == MAIN_DISPATCHER:
self._querier.start_loop(datapath)
elif evt.state == DEAD_DISPATCHER:
self._querier.stop_loop()
class IgmpBase(object):
"""IGMP abstract class library."""
#-------------------------------------------------------------------
# PUBLIC METHODS
#-------------------------------------------------------------------
def __init__(self):
self._set_flow_func = {
ofproto_v1_0.OFP_VERSION: self._set_flow_entry_v1_0,
ofproto_v1_2.OFP_VERSION: self._set_flow_entry_v1_2,
ofproto_v1_3.OFP_VERSION: self._set_flow_entry_v1_2,
}
self._del_flow_func = {
ofproto_v1_0.OFP_VERSION: self._del_flow_entry_v1_0,
ofproto_v1_2.OFP_VERSION: self._del_flow_entry_v1_2,
ofproto_v1_3.OFP_VERSION: self._del_flow_entry_v1_2,
}
#-------------------------------------------------------------------
# PROTECTED METHODS ( RELATED TO OPEN FLOW PROTOCOL )
#-------------------------------------------------------------------
def _set_flow_entry_v1_0(self, datapath, actions, in_port, dst,
src=None):
ofproto = datapath.ofproto
parser = datapath.ofproto_parser
match = parser.OFPMatch(
dl_type=ether.ETH_TYPE_IP, in_port=in_port,
nw_src=self._ipv4_text_to_int(src),
nw_dst=self._ipv4_text_to_int(dst))
mod = parser.OFPFlowMod(
datapath=datapath, match=match, cookie=0,
command=ofproto.OFPFC_ADD, actions=actions)
datapath.send_msg(mod)
def _set_flow_entry_v1_2(self, datapath, actions, in_port, dst,
src=None):
ofproto = datapath.ofproto
parser = datapath.ofproto_parser
match = parser.OFPMatch(
eth_type=ether.ETH_TYPE_IP, in_port=in_port, ipv4_dst=dst)
if src is not None:
match.append_field(ofproto.OXM_OF_IPV4_SRC, src)
inst = [parser.OFPInstructionActions(
ofproto.OFPIT_APPLY_ACTIONS, actions)]
mod = parser.OFPFlowMod(
datapath=datapath, command=ofproto.OFPFC_ADD,
priority=65535, match=match, instructions=inst)
datapath.send_msg(mod)
def _set_flow_entry(self, datapath, actions, in_port, dst, src=None):
"""set a flow entry."""
set_flow = self._set_flow_func.get(datapath.ofproto.OFP_VERSION)
assert set_flow
set_flow(datapath, actions, in_port, dst, src)
def _del_flow_entry_v1_0(self, datapath, in_port, dst, src=None):
ofproto = datapath.ofproto
parser = datapath.ofproto_parser
match = parser.OFPMatch(
dl_type=ether.ETH_TYPE_IP, in_port=in_port,
nw_src=self._ipv4_text_to_int(src),
nw_dst=self._ipv4_text_to_int(dst))
mod = parser.OFPFlowMod(
datapath=datapath, match=match, cookie=0,
command=ofproto.OFPFC_DELETE)
datapath.send_msg(mod)
def _del_flow_entry_v1_2(self, datapath, in_port, dst, src=None):
ofproto = datapath.ofproto
parser = datapath.ofproto_parser
match = parser.OFPMatch(
eth_type=ether.ETH_TYPE_IP, in_port=in_port, ipv4_dst=dst)
if src is not None:
match.append_field(ofproto.OXM_OF_IPV4_SRC, src)
mod = parser.OFPFlowMod(
datapath=datapath, command=ofproto.OFPFC_DELETE,
out_port=ofproto.OFPP_ANY, out_group=ofproto.OFPG_ANY,
match=match)
datapath.send_msg(mod)
def _del_flow_entry(self, datapath, in_port, dst, src=None):
"""remove a flow entry."""
del_flow = self._del_flow_func.get(datapath.ofproto.OFP_VERSION)
assert del_flow
del_flow(datapath, in_port, dst, src)
def _do_packet_out(self, datapath, data, in_port, actions):
"""send a packet."""
ofproto = datapath.ofproto
parser = datapath.ofproto_parser
out = parser.OFPPacketOut(
datapath=datapath, buffer_id=ofproto.OFP_NO_BUFFER,
data=data, in_port=in_port, actions=actions)
datapath.send_msg(out)
#-------------------------------------------------------------------
# PROTECTED METHODS ( OTHERS )
#-------------------------------------------------------------------
def _ipv4_text_to_int(self, ip_text):
"""convert ip v4 string to integer."""
if ip_text is None:
return None
assert isinstance(ip_text, str)
return struct.unpack('!I', addrconv.ipv4.text_to_bin(ip_text))[0]
class IgmpQuerier(IgmpBase):
"""IGMP querier emulation class library.
this querier is a simplified implementation, and is not based on RFC,
for example as following points:
- ignore some constant values
- does not send a specific QUERY in response to LEAVE
- and so on
"""
#-------------------------------------------------------------------
# PUBLIC METHODS
#-------------------------------------------------------------------
def __init__(self):
"""initialization."""
super(IgmpQuerier, self).__init__()
self.name = "IgmpQuerier"
self.logger = logging.getLogger(self.name)
self.dpid = None
self.server_port = None
self._datapath = None
self._querier_thread = None
# the structure of self._macst
#
# +-------+------------------+
# | group | port: True/False |
# | +------------------+
# | |... |
# +-------+------------------+
# | ... |
# +--------------------------+
#
# group multicast address.
# port a port number which connect to the group member.
# the value indicates that whether a flow entry
# was registered.
self._mcast = {}
self._set_logger()
def set_querier_mode(self, dpid, server_port):
"""set the datapath to work as a querier. note that you can set
up only the one querier. when you called this method several
times, only the last one becomes effective."""
self.dpid = dpid
self.server_port = server_port
if self._querier_thread:
hub.kill(self._querier_thread)
self._querier_thread = None
def packet_in_handler(self, req_igmp, msg):
"""the process when the querier received IGMP."""
ofproto = msg.datapath.ofproto
if ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION:
in_port = msg.in_port
else:
in_port = msg.match['in_port']
if (igmp.IGMP_TYPE_REPORT_V1 == req_igmp.msgtype or
igmp.IGMP_TYPE_REPORT_V2 == req_igmp.msgtype):
self._do_report(req_igmp, in_port, msg)
elif igmp.IGMP_TYPE_LEAVE == req_igmp.msgtype:
self._do_leave(req_igmp, in_port, msg)
def start_loop(self, datapath):
"""start QUERY thread."""
self._datapath = datapath
self._querier_thread = hub.spawn(self._send_query)
self.logger.info("started a querier.")
def stop_loop(self):
"""stop QUERY thread."""
hub.kill(self._querier_thread)
self._querier_thread = None
self._datapath = None
self.logger.info("stopped a querier.")
#-------------------------------------------------------------------
# PRIVATE METHODS ( RELATED TO IGMP )
#-------------------------------------------------------------------
def _send_query(self):
""" send a QUERY message periodically."""
timeout = 60
ofproto = self._datapath.ofproto
parser = self._datapath.ofproto_parser
if ofproto_v1_0.OFP_VERSION == ofproto.OFP_VERSION:
send_port = ofproto.OFPP_NONE
else:
send_port = ofproto.OFPP_ANY
# create a general query.
res_igmp = igmp.igmp(
msgtype=igmp.IGMP_TYPE_QUERY,
maxresp=igmp.QUERY_RESPONSE_INTERVAL * 10,
csum=0,
address='0.0.0.0')
res_ipv4 = ipv4.ipv4(
total_length=len(ipv4.ipv4()) + len(res_igmp),
proto=inet.IPPROTO_IGMP, ttl=1,
src='0.0.0.0',
dst=igmp.MULTICAST_IP_ALL_HOST)
res_ether = ethernet.ethernet(
dst=igmp.MULTICAST_MAC_ALL_HOST,
src=self._datapath.ports[ofproto.OFPP_LOCAL].hw_addr,
ethertype=ether.ETH_TYPE_IP)
res_pkt = packet.Packet()
res_pkt.add_protocol(res_ether)
res_pkt.add_protocol(res_ipv4)
res_pkt.add_protocol(res_igmp)
res_pkt.serialize()
actions = [parser.OFPActionOutput(ofproto.OFPP_FLOOD)]
while True:
# reset reply status.
for status in self._mcast.values():
for port in status.keys():
status[port] = False
# send a general query to the host that sent this message.
self._do_packet_out(
self._datapath, res_pkt.data, send_port, actions)
hub.sleep(igmp.QUERY_RESPONSE_INTERVAL)
# QUERY timeout expired.
del_groups = []
for group, status in self._mcast.items():
del_ports = []
actions = []
for port in status.keys():
if not status[port]:
del_ports.append(port)
else:
actions.append(parser.OFPActionOutput(port))
if len(actions) and len(del_ports):
self._set_flow_entry(
self._datapath, actions, self.server_port, group)
if not len(actions):
self._del_flow_entry(
self._datapath, self.server_port, group)
del_groups.append(group)
if len(del_ports):
for port in del_ports:
self._del_flow_entry(self._datapath, port, group)
for port in del_ports:
del status[port]
for group in del_groups:
del self._mcast[group]
rest_time = timeout - igmp.QUERY_RESPONSE_INTERVAL
hub.sleep(rest_time)
def _do_report(self, report, in_port, msg):
"""the process when the querier received a REPORT message."""
datapath = msg.datapath
ofproto = datapath.ofproto
parser = datapath.ofproto_parser
if ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION:
size = 65535
else:
size = ofproto.OFPCML_MAX
update = False
self._mcast.setdefault(report.address, {})
if not in_port in self._mcast[report.address]:
update = True
self._mcast[report.address][in_port] = True
if update:
actions = []
for port in self._mcast[report.address]:
actions.append(parser.OFPActionOutput(port))
self._set_flow_entry(
datapath, actions, self.server_port, report.address)
self._set_flow_entry(
datapath,
[parser.OFPActionOutput(ofproto.OFPP_CONTROLLER, size)],
in_port, report.address)
def _do_leave(self, leave, in_port, msg):
"""the process when the querier received a LEAVE message."""
datapath = msg.datapath
parser = datapath.ofproto_parser
self._mcast.setdefault(leave.address, {})
if in_port in self._mcast[leave.address]:
self._del_flow_entry(
datapath, in_port, leave.address)
del self._mcast[leave.address][in_port]
actions = []
for port in self._mcast[leave.address]:
actions.append(parser.OFPActionOutput(port))
if len(actions):
self._set_flow_entry(
datapath, actions, self.server_port, leave.address)
else:
self._del_flow_entry(
datapath, self.server_port, leave.address)
#-------------------------------------------------------------------
# PRIVATE METHODS ( OTHERS )
#-------------------------------------------------------------------
def _set_logger(self):
"""change log format."""
self.logger.propagate = False
hdl = logging.StreamHandler()
fmt_str = '[querier][%(levelname)s] %(message)s'
hdl.setFormatter(logging.Formatter(fmt_str))
self.logger.addHandler(hdl)
class IgmpSnooper(IgmpBase):
"""IGMP snooping class library."""
#-------------------------------------------------------------------
# PUBLIC METHODS
#-------------------------------------------------------------------
def __init__(self, send_event):
"""initialization."""
super(IgmpSnooper, self).__init__()
self.name = "IgmpSnooper"
self.logger = logging.getLogger(self.name)
self._send_event = send_event
# the structure of self._to_querier
#
# +------+--------------+
# | dpid | 'port': port |
# | +--------------+
# | | 'ip': ip |
# | +--------------+
# | | 'mac': mac |
# +------+--------------+
# | ... |
# +---------------------+
#
# dpid datapath id.
# port a port number which connect to the querier.
# ip IP address of the querier.
# mac MAC address of the querier.
self._to_querier = {}
# the structure of self._to_hosts
#
# +------+-------+---------------------------------+
# | dpid | group | 'replied': True/False |
# | | +---------------------------------+
# | | | 'leave': leave |
# | | +-----------+--------+------------+
# | | | 'ports' | portno | 'out': out |
# | | | | +------------+
# | | | | | 'in': in |
# | | | +--------+------------+
# | | | | ... |
# | +-------+-----------+---------------------+
# | | ... |
# +------+-----------------------------------------+
# | ... |
# +------------------------------------------------+
#
# dpid datapath id.
# group multicast address.
# replied the value indicates whether a REPORT message was
# replied.
# leave a LEAVE message.
# portno a port number which has joined to the multicast
# group.
# out the value indicates whether a flow entry for the
# packet outputted to the port was registered.
# in the value indicates whether a flow entry for the
# packet inputted from the port was registered.
self._to_hosts = {}
self._set_logger()
def packet_in_handler(self, req_pkt, req_igmp, msg):
"""the process when the snooper received IGMP."""
dpid = msg.datapath.id
ofproto = msg.datapath.ofproto
if ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION:
in_port = msg.in_port
else:
in_port = msg.match['in_port']
log = "SW=%s PORT=%d IGMP received. " % (
dpid_to_str(dpid), in_port)
self.logger.debug(str(req_igmp))
if igmp.IGMP_TYPE_QUERY == req_igmp.msgtype:
self.logger.info(log + "[QUERY]")
(req_ipv4, ) = req_pkt.get_protocols(ipv4.ipv4)
(req_eth, ) = req_pkt.get_protocols(ethernet.ethernet)
self._do_query(req_igmp, req_ipv4, req_eth, in_port, msg)
elif (igmp.IGMP_TYPE_REPORT_V1 == req_igmp.msgtype or
igmp.IGMP_TYPE_REPORT_V2 == req_igmp.msgtype):
self.logger.info(log + "[REPORT]")
self._do_report(req_igmp, in_port, msg)
elif igmp.IGMP_TYPE_LEAVE == req_igmp.msgtype:
self.logger.info(log + "[LEAVE]")
self._do_leave(req_igmp, in_port, msg)
elif igmp.IGMP_TYPE_REPORT_V3 == req_igmp.msgtype:
self.logger.info(log + "V3 is not supported yet.")
self._do_flood(in_port, msg)
else:
self.logger.info(log + "[unknown type:%d]",
req_igmp.msgtype)
self._do_flood(in_port, msg)
#-------------------------------------------------------------------
# PRIVATE METHODS ( RELATED TO IGMP )
#-------------------------------------------------------------------
def _do_query(self, query, iph, eth, in_port, msg):
"""the process when the snooper received a QUERY message."""
datapath = msg.datapath
dpid = datapath.id
ofproto = datapath.ofproto
parser = datapath.ofproto_parser
# learn the querier.
self._to_querier[dpid] = {
'port': in_port,
'ip': iph.src,
'mac': eth.src
}
# set the timeout time.
timeout = igmp.QUERY_RESPONSE_INTERVAL
if query.maxresp:
timeout = query.maxresp / 10
self._to_hosts.setdefault(dpid, {})
if '0.0.0.0' == query.address:
# general query. reset all reply status.
for group in self._to_hosts[dpid].values():
group['replied'] = False
group['leave'] = None
else:
# specific query. reset the reply status of the specific
# group.
group = self._to_hosts[dpid].get(query.address)
if group:
group['replied'] = False
group['leave'] = None
actions = [parser.OFPActionOutput(ofproto.OFPP_FLOOD)]
self._do_packet_out(
datapath, msg.data, in_port, actions)
# wait for REPORT messages.
hub.spawn(self._do_timeout_for_query, timeout, datapath)
def _do_report(self, report, in_port, msg):
"""the process when the snooper received a REPORT message."""
datapath = msg.datapath
dpid = datapath.id
ofproto = datapath.ofproto
parser = datapath.ofproto_parser
if ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION:
size = 65535
else:
size = ofproto.OFPCML_MAX
# check whether the querier port has been specified.
outport = None
value = self._to_querier.get(dpid)
if value:
outport = value['port']
# send a event when the multicast group address is new.
self._to_hosts.setdefault(dpid, {})
if not self._to_hosts[dpid].get(report.address):
self._send_event(
EventMulticastGroupStateChanged(
MG_GROUP_ADDED, report.address, outport, []))
self._to_hosts[dpid].setdefault(
report.address,
{'replied': False, 'leave': None, 'ports': {}})
# set a flow entry from a host to the controller when
# a host sent a REPORT message.
if not self._to_hosts[dpid][report.address]['ports'].get(
in_port):
self._to_hosts[dpid][report.address]['ports'][
in_port] = {'out': False, 'in': False}
self._set_flow_entry(
datapath,
[parser.OFPActionOutput(ofproto.OFPP_CONTROLLER, size)],
in_port, report.address)
if not self._to_hosts[dpid][report.address]['ports'][
in_port]['out']:
self._to_hosts[dpid][report.address]['ports'][
in_port]['out'] = True
if not outport:
self.logger.info("no querier exists.")
return
# set a flow entry from a multicast server to hosts.
if not self._to_hosts[dpid][report.address]['ports'][
in_port]['in']:
actions = []
ports = []
for port in self._to_hosts[dpid][report.address]['ports']:
actions.append(parser.OFPActionOutput(port))
ports.append(port)
self._send_event(
EventMulticastGroupStateChanged(
MG_MEMBER_CHANGED, report.address, outport, ports))
self._set_flow_entry(
datapath, actions, outport, report.address)
self._to_hosts[dpid][report.address]['ports'][
in_port]['in'] = True
# send a REPORT message to the querier if this message arrived
# first after a QUERY message was sent.
if not self._to_hosts[dpid][report.address]['replied']:
actions = [parser.OFPActionOutput(outport, size)]
self._do_packet_out(datapath, msg.data, in_port, actions)
self._to_hosts[dpid][report.address]['replied'] = True
def _do_leave(self, leave, in_port, msg):
"""the process when the snooper received a LEAVE message."""
datapath = msg.datapath
dpid = datapath.id
ofproto = datapath.ofproto
parser = datapath.ofproto_parser
# check whether the querier port has been specified.
if not self._to_querier.get(dpid):
self.logger.info("no querier exists.")
return
# save this LEAVE message and reset the condition of the port
# that received this message.
self._to_hosts.setdefault(dpid, {})
self._to_hosts[dpid].setdefault(
leave.address,
{'replied': False, 'leave': None, 'ports': {}})
self._to_hosts[dpid][leave.address]['leave'] = msg
self._to_hosts[dpid][leave.address]['ports'][in_port] = {
'out': False, 'in': False}
# create a specific query.
timeout = igmp.LAST_MEMBER_QUERY_INTERVAL
res_igmp = igmp.igmp(
msgtype=igmp.IGMP_TYPE_QUERY,
maxresp=timeout * 10,
csum=0,
address=leave.address)
res_ipv4 = ipv4.ipv4(
total_length=len(ipv4.ipv4()) + len(res_igmp),
proto=inet.IPPROTO_IGMP, ttl=1,
src=self._to_querier[dpid]['ip'],
dst=igmp.MULTICAST_IP_ALL_HOST)
res_ether = ethernet.ethernet(
dst=igmp.MULTICAST_MAC_ALL_HOST,
src=self._to_querier[dpid]['mac'],
ethertype=ether.ETH_TYPE_IP)
res_pkt = packet.Packet()
res_pkt.add_protocol(res_ether)
res_pkt.add_protocol(res_ipv4)
res_pkt.add_protocol(res_igmp)
res_pkt.serialize()
# send a specific query to the host that sent this message.
actions = [parser.OFPActionOutput(ofproto.OFPP_IN_PORT)]
self._do_packet_out(datapath, res_pkt.data, in_port, actions)
# wait for REPORT messages.
hub.spawn(self._do_timeout_for_leave, timeout, datapath,
leave.address, in_port)
def _do_flood(self, in_port, msg):
"""the process when the snooper received a message of the
outside for processing. """
datapath = msg.datapath
ofproto = datapath.ofproto
parser = datapath.ofproto_parser
actions = [parser.OFPActionOutput(ofproto.OFPP_FLOOD)]
self._do_packet_out(datapath, msg.data, in_port, actions)
def _do_timeout_for_query(self, timeout, datapath):
"""the process when the QUERY from the querier timeout expired."""
dpid = datapath.id
hub.sleep(timeout)
outport = self._to_querier[dpid]['port']
remove_dsts = []
for dst in self._to_hosts[dpid]:
if not self._to_hosts[dpid][dst]['replied']:
# if no REPORT message sent from any members of
# the group, remove flow entries about the group and
# send a LEAVE message if exists.
self._remove_multicast_group(datapath, outport, dst)
remove_dsts.append(dst)
for dst in remove_dsts:
del self._to_hosts[dpid][dst]
def _do_timeout_for_leave(self, timeout, datapath, dst, in_port):
"""the process when the QUERY from the switch timeout expired."""
parser = datapath.ofproto_parser
dpid = datapath.id
hub.sleep(timeout)
outport = self._to_querier[dpid]['port']
if self._to_hosts[dpid][dst]['ports'][in_port]['out']:
return
del self._to_hosts[dpid][dst]['ports'][in_port]
self._del_flow_entry(datapath, in_port, dst)
actions = []
ports = []
for port in self._to_hosts[dpid][dst]['ports']:
actions.append(parser.OFPActionOutput(port))
ports.append(port)
if len(actions):
self._send_event(
EventMulticastGroupStateChanged(
MG_MEMBER_CHANGED, dst, outport, ports))
self._set_flow_entry(
datapath, actions, outport, dst)
self._to_hosts[dpid][dst]['leave'] = None
else:
self._remove_multicast_group(datapath, outport, dst)
del self._to_hosts[dpid][dst]
def _remove_multicast_group(self, datapath, outport, dst):
"""remove flow entries about the group and send a LEAVE message
if exists."""
ofproto = datapath.ofproto
parser = datapath.ofproto_parser
dpid = datapath.id
self._send_event(
EventMulticastGroupStateChanged(
MG_GROUP_REMOVED, dst, outport, []))
self._del_flow_entry(datapath, outport, dst)
for port in self._to_hosts[dpid][dst]['ports']:
self._del_flow_entry(datapath, port, dst)
leave = self._to_hosts[dpid][dst]['leave']
if leave:
if ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION:
in_port = leave.in_port
else:
in_port = leave.match['in_port']
actions = [parser.OFPActionOutput(outport)]
self._do_packet_out(
datapath, leave.data, in_port, actions)
#-------------------------------------------------------------------
# PRIVATE METHODS ( OTHERS )
#-------------------------------------------------------------------
def _set_logger(self):
"""change log format."""
self.logger.propagate = False
hdl = logging.StreamHandler()
fmt_str = '[snoop][%(levelname)s] %(message)s'
hdl.setFormatter(logging.Formatter(fmt_str))
self.logger.addHandler(hdl)