From ffc1f82554aa64a7e14c49e9c8aad80634319825 Mon Sep 17 00:00:00 2001 From: Yuichi Ito Date: Thu, 26 Dec 2013 10:00:40 +0900 Subject: [PATCH] add IGMP snooping application this application provides the simple example of IGMP snooping. the module "igmplib" mainly offers 2 functions: - listening on in the IGMP conversation between multicast routers and hosts - emulating of the querier function of multicast servers the former operates a switch as a snooping switch and controls transmission of an unnecessary multicasting packet. the latter realizes the IGMP conversation in the environment without multicast routers. the module "simple_switch_igmp" is a variation of "simple_switch". the switch receives the "EventPacketIn" event instead of the "EventOFPPacketIn" event from the module "igmplib" in order to except IGMP. Signed-off-by: Yuichi Ito Signed-off-by: FUJITA Tomonori --- ryu/app/simple_switch_igmp.py | 104 +++++ ryu/lib/igmplib.py | 810 ++++++++++++++++++++++++++++++++++ 2 files changed, 914 insertions(+) create mode 100644 ryu/app/simple_switch_igmp.py create mode 100644 ryu/lib/igmplib.py diff --git a/ryu/app/simple_switch_igmp.py b/ryu/app/simple_switch_igmp.py new file mode 100644 index 00000000..b1b014f0 --- /dev/null +++ b/ryu/app/simple_switch_igmp.py @@ -0,0 +1,104 @@ +# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import struct + +from ryu.base import app_manager +from ryu.controller.handler import MAIN_DISPATCHER +from ryu.controller.handler import set_ev_cls +from ryu.ofproto import ofproto_v1_0 +from ryu.lib import addrconv +from ryu.lib import igmplib +from ryu.lib.dpid import str_to_dpid + + +class SimpleSwitchIgmp(app_manager.RyuApp): + OFP_VERSIONS = [ofproto_v1_0.OFP_VERSION] + _CONTEXTS = {'igmplib': igmplib.IgmpLib} + + def __init__(self, *args, **kwargs): + super(SimpleSwitchIgmp, self).__init__(*args, **kwargs) + self.mac_to_port = {} + self._snoop = kwargs['igmplib'] + # if you want a switch to operate as a querier, + # set up as follows: + self._snoop.set_querier_mode( + dpid=str_to_dpid('0000000000000001'), server_port=2) + # dpid the datapath id that will operate as a querier. + # server_port a port number which connect to the multicast + # server. + # + # NOTE: you can set up only the one querier. + # when you called this method several times, + # only the last one becomes effective. + + def add_flow(self, datapath, in_port, dst, actions): + ofproto = datapath.ofproto + parser = datapath.ofproto_parser + + match = parser.OFPMatch(in_port=in_port, + dl_dst=addrconv.mac.text_to_bin(dst)) + mod = parser.OFPFlowMod( + datapath=datapath, match=match, cookie=0, + command=ofproto.OFPFC_ADD, actions=actions) + datapath.send_msg(mod) + + @set_ev_cls(igmplib.EventPacketIn, MAIN_DISPATCHER) + def _packet_in_handler(self, ev): + msg = ev.msg + datapath = msg.datapath + ofproto = datapath.ofproto + + (dst_, src_, _eth_type) = struct.unpack_from( + '!6s6sH', buffer(msg.data), 0) + src = addrconv.mac.bin_to_text(src_) + dst = addrconv.mac.bin_to_text(dst_) + + dpid = datapath.id + self.mac_to_port.setdefault(dpid, {}) + + self.logger.info("packet in %s %s %s %s", + dpid, src, dst, msg.in_port) + + # learn a mac address to avoid FLOOD next time. + self.mac_to_port[dpid][src] = msg.in_port + + if dst in self.mac_to_port[dpid]: + out_port = self.mac_to_port[dpid][dst] + else: + out_port = ofproto.OFPP_FLOOD + + actions = [datapath.ofproto_parser.OFPActionOutput(out_port)] + + # install a flow to avoid packet_in next time + if out_port != ofproto.OFPP_FLOOD: + self.add_flow(datapath, msg.in_port, dst, actions) + + out = datapath.ofproto_parser.OFPPacketOut( + datapath=datapath, buffer_id=msg.buffer_id, in_port=msg.in_port, + actions=actions) + datapath.send_msg(out) + + @set_ev_cls(igmplib.EventMulticastGroupStateChanged, + MAIN_DISPATCHER) + def _status_changed(self, ev): + msg = { + igmplib.MG_GROUP_ADDED: 'Multicast Group Added', + igmplib.MG_MEMBER_CHANGED: 'Multicast Group Member Changed', + igmplib.MG_GROUP_REMOVED: 'Multicast Group Removed', + } + self.logger.info("%s: [%s] querier:[%s] hosts:%s", + msg.get(ev.reason), ev.address, ev.src, + ev.dsts) diff --git a/ryu/lib/igmplib.py b/ryu/lib/igmplib.py new file mode 100644 index 00000000..1171da3b --- /dev/null +++ b/ryu/lib/igmplib.py @@ -0,0 +1,810 @@ +# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import struct + +from ryu.base import app_manager +from ryu.controller import event +from ryu.controller import ofp_event +from ryu.controller.handler import DEAD_DISPATCHER +from ryu.controller.handler import MAIN_DISPATCHER +from ryu.controller.handler import set_ev_cls +from ryu.ofproto import ether +from ryu.ofproto import inet +from ryu.ofproto import ofproto_v1_0 +from ryu.ofproto import ofproto_v1_2 +from ryu.ofproto import ofproto_v1_3 +from ryu.lib import addrconv +from ryu.lib import hub +from ryu.lib.dpid import dpid_to_str +from ryu.lib.packet import packet +from ryu.lib.packet import ethernet +from ryu.lib.packet import ipv4 +from ryu.lib.packet import igmp + + +class EventPacketIn(event.EventBase): + """a PacketIn event class using except IGMP.""" + def __init__(self, msg): + """initialization.""" + super(EventPacketIn, self).__init__() + self.msg = msg + + +MG_GROUP_ADDED = 1 +MG_MEMBER_CHANGED = 2 +MG_GROUP_REMOVED = 3 + + +class EventMulticastGroupStateChanged(event.EventBase): + """a event class that notifies the changes of the statuses of the + multicast groups.""" + + def __init__(self, reason, address, src, dsts): + """ + ========= ===================================================== + Attribute Description + ========= ===================================================== + reason why the event occurs. use one of MG_*. + address a multicast group address. + src a port number in which a querier exists. + dsts a list of port numbers in which the members exist. + ========= ===================================================== + """ + super(EventMulticastGroupStateChanged, self).__init__() + self.reason = reason + self.address = address + self.src = src + self.dsts = dsts + + +class IgmpLib(app_manager.RyuApp): + """IGMP snooping library.""" + + #------------------------------------------------------------------- + # PUBLIC METHODS + #------------------------------------------------------------------- + def __init__(self): + """initialization.""" + super(IgmpLib, self).__init__() + self.name = 'igmplib' + self._querier = IgmpQuerier() + self._snooper = IgmpSnooper(self.send_event_to_observers) + + def set_querier_mode(self, dpid, server_port): + """set a datapath id and server port number to the instance + of IgmpQuerier. + + ============ ================================================== + Attribute Description + ============ ================================================== + dpid the datapath id that will operate as a querier. + server_port the port number linked to the multicasting server. + ============ ================================================== + """ + self._querier.set_querier_mode(dpid, server_port) + + #------------------------------------------------------------------- + # PUBLIC METHODS ( EVENT HANDLERS ) + #------------------------------------------------------------------- + @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER) + def packet_in_handler(self, evt): + """PacketIn event handler. when the received packet was IGMP, + proceed it. otherwise, send a event.""" + msg = evt.msg + dpid = msg.datapath.id + + req_pkt = packet.Packet(msg.data) + req_igmp = req_pkt.get_protocol(igmp.igmp) + if req_igmp: + if self._querier.dpid == dpid: + self._querier.packet_in_handler(req_igmp, msg) + else: + self._snooper.packet_in_handler(req_pkt, req_igmp, msg) + else: + self.send_event_to_observers(EventPacketIn(msg)) + + @set_ev_cls(ofp_event.EventOFPStateChange, + [MAIN_DISPATCHER, DEAD_DISPATCHER]) + def state_change_handler(self, evt): + """StateChange event handler.""" + datapath = evt.datapath + assert datapath is not None + if datapath.id == self._querier.dpid: + if evt.state == MAIN_DISPATCHER: + self._querier.start_loop(datapath) + elif evt.state == DEAD_DISPATCHER: + self._querier.stop_loop() + + +class IgmpBase(object): + """IGMP abstract class library.""" + + #------------------------------------------------------------------- + # PUBLIC METHODS + #------------------------------------------------------------------- + def __init__(self): + self._set_flow_func = { + ofproto_v1_0.OFP_VERSION: self._set_flow_entry_v1_0, + ofproto_v1_2.OFP_VERSION: self._set_flow_entry_v1_2, + ofproto_v1_3.OFP_VERSION: self._set_flow_entry_v1_2, + } + self._del_flow_func = { + ofproto_v1_0.OFP_VERSION: self._del_flow_entry_v1_0, + ofproto_v1_2.OFP_VERSION: self._del_flow_entry_v1_2, + ofproto_v1_3.OFP_VERSION: self._del_flow_entry_v1_2, + } + + #------------------------------------------------------------------- + # PROTECTED METHODS ( RELATED TO OPEN FLOW PROTOCOL ) + #------------------------------------------------------------------- + def _set_flow_entry_v1_0(self, datapath, actions, in_port, dst, + src=None): + ofproto = datapath.ofproto + parser = datapath.ofproto_parser + + match = parser.OFPMatch( + dl_type=ether.ETH_TYPE_IP, in_port=in_port, + nw_src=self._ipv4_text_to_int(src), + nw_dst=self._ipv4_text_to_int(dst)) + mod = parser.OFPFlowMod( + datapath=datapath, match=match, cookie=0, + command=ofproto.OFPFC_ADD, actions=actions) + datapath.send_msg(mod) + + def _set_flow_entry_v1_2(self, datapath, actions, in_port, dst, + src=None): + ofproto = datapath.ofproto + parser = datapath.ofproto_parser + + match = parser.OFPMatch( + eth_type=ether.ETH_TYPE_IP, in_port=in_port, ipv4_dst=dst) + if src is not None: + match.append_field(ofproto.OXM_OF_IPV4_SRC, src) + inst = [parser.OFPInstructionActions( + ofproto.OFPIT_APPLY_ACTIONS, actions)] + mod = parser.OFPFlowMod( + datapath=datapath, command=ofproto.OFPFC_ADD, + priority=65535, match=match, instructions=inst) + datapath.send_msg(mod) + + def _set_flow_entry(self, datapath, actions, in_port, dst, src=None): + """set a flow entry.""" + set_flow = self._set_flow_func.get(datapath.ofproto.OFP_VERSION) + assert set_flow + set_flow(datapath, actions, in_port, dst, src) + + def _del_flow_entry_v1_0(self, datapath, in_port, dst, src=None): + ofproto = datapath.ofproto + parser = datapath.ofproto_parser + + match = parser.OFPMatch( + dl_type=ether.ETH_TYPE_IP, in_port=in_port, + nw_src=self._ipv4_text_to_int(src), + nw_dst=self._ipv4_text_to_int(dst)) + mod = parser.OFPFlowMod( + datapath=datapath, match=match, cookie=0, + command=ofproto.OFPFC_DELETE) + datapath.send_msg(mod) + + def _del_flow_entry_v1_2(self, datapath, in_port, dst, src=None): + ofproto = datapath.ofproto + parser = datapath.ofproto_parser + + match = parser.OFPMatch( + eth_type=ether.ETH_TYPE_IP, in_port=in_port, ipv4_dst=dst) + if src is not None: + match.append_field(ofproto.OXM_OF_IPV4_SRC, src) + mod = parser.OFPFlowMod( + datapath=datapath, command=ofproto.OFPFC_DELETE, + out_port=ofproto.OFPP_ANY, out_group=ofproto.OFPG_ANY, + match=match) + datapath.send_msg(mod) + + def _del_flow_entry(self, datapath, in_port, dst, src=None): + """remove a flow entry.""" + del_flow = self._del_flow_func.get(datapath.ofproto.OFP_VERSION) + assert del_flow + del_flow(datapath, in_port, dst, src) + + def _do_packet_out(self, datapath, data, in_port, actions): + """send a packet.""" + ofproto = datapath.ofproto + parser = datapath.ofproto_parser + + out = parser.OFPPacketOut( + datapath=datapath, buffer_id=ofproto.OFP_NO_BUFFER, + data=data, in_port=in_port, actions=actions) + datapath.send_msg(out) + + #------------------------------------------------------------------- + # PROTECTED METHODS ( OTHERS ) + #------------------------------------------------------------------- + def _ipv4_text_to_int(self, ip_text): + """convert ip v4 string to integer.""" + if ip_text is None: + return None + assert isinstance(ip_text, str) + return struct.unpack('!I', addrconv.ipv4.text_to_bin(ip_text))[0] + + +class IgmpQuerier(IgmpBase): + """IGMP querier emulation class library. + + this querier is a simplified implementation, and is not based on RFC, + for example as following points: + - ignore some constant values + - does not send a specific QUERY in response to LEAVE + - and so on + """ + + #------------------------------------------------------------------- + # PUBLIC METHODS + #------------------------------------------------------------------- + def __init__(self): + """initialization.""" + super(IgmpQuerier, self).__init__() + self.name = "IgmpQuerier" + self.logger = logging.getLogger(self.name) + self.dpid = None + self.server_port = None + + self._datapath = None + self._querier_thread = None + + # the structure of self._macst + # + # +-------+------------------+ + # | group | port: True/False | + # | +------------------+ + # | |... | + # +-------+------------------+ + # | ... | + # +--------------------------+ + # + # group multicast address. + # port a port number which connect to the group member. + # the value indicates that whether a flow entry + # was registered. + self._mcast = {} + + self._set_logger() + + def set_querier_mode(self, dpid, server_port): + """set the datapath to work as a querier. note that you can set + up only the one querier. when you called this method several + times, only the last one becomes effective.""" + self.dpid = dpid + self.server_port = server_port + if self._querier_thread: + hub.kill(self._querier_thread) + self._querier_thread = None + + def packet_in_handler(self, req_igmp, msg): + """the process when the querier received IGMP.""" + ofproto = msg.datapath.ofproto + if ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION: + in_port = msg.in_port + else: + in_port = msg.match['in_port'] + if (igmp.IGMP_TYPE_REPORT_V1 == req_igmp.msgtype or + igmp.IGMP_TYPE_REPORT_V2 == req_igmp.msgtype): + self._do_report(req_igmp, in_port, msg) + elif igmp.IGMP_TYPE_LEAVE == req_igmp.msgtype: + self._do_leave(req_igmp, in_port, msg) + + def start_loop(self, datapath): + """start QUERY thread.""" + self._datapath = datapath + self._querier_thread = hub.spawn(self._send_query) + self.logger.info("started a querier.") + + def stop_loop(self): + """stop QUERY thread.""" + hub.kill(self._querier_thread) + self._querier_thread = None + self._datapath = None + self.logger.info("stopped a querier.") + + #------------------------------------------------------------------- + # PRIVATE METHODS ( RELATED TO IGMP ) + #------------------------------------------------------------------- + def _send_query(self): + """ send a QUERY message periodically.""" + timeout = 60 + ofproto = self._datapath.ofproto + parser = self._datapath.ofproto_parser + if ofproto_v1_0.OFP_VERSION == ofproto.OFP_VERSION: + send_port = ofproto.OFPP_NONE + else: + send_port = ofproto.OFPP_ANY + + # create a general query. + res_igmp = igmp.igmp( + msgtype=igmp.IGMP_TYPE_QUERY, + maxresp=igmp.QUERY_RESPONSE_INTERVAL * 10, + csum=0, + address='0.0.0.0') + res_ipv4 = ipv4.ipv4( + total_length=len(ipv4.ipv4()) + len(res_igmp), + proto=inet.IPPROTO_IGMP, ttl=1, + src='0.0.0.0', + dst=igmp.MULTICAST_IP_ALL_HOST) + res_ether = ethernet.ethernet( + dst=igmp.MULTICAST_MAC_ALL_HOST, + src=self._datapath.ports[ofproto.OFPP_LOCAL].hw_addr, + ethertype=ether.ETH_TYPE_IP) + res_pkt = packet.Packet() + res_pkt.add_protocol(res_ether) + res_pkt.add_protocol(res_ipv4) + res_pkt.add_protocol(res_igmp) + res_pkt.serialize() + + actions = [parser.OFPActionOutput(ofproto.OFPP_FLOOD)] + + while True: + # reset reply status. + for status in self._mcast.values(): + for port in status.keys(): + status[port] = False + + # send a general query to the host that sent this message. + self._do_packet_out( + self._datapath, res_pkt.data, send_port, actions) + hub.sleep(igmp.QUERY_RESPONSE_INTERVAL) + + # QUERY timeout expired. + del_groups = [] + for group, status in self._mcast.items(): + del_ports = [] + actions = [] + for port in status.keys(): + if not status[port]: + del_ports.append(port) + else: + actions.append(parser.OFPActionOutput(port)) + if len(actions) and len(del_ports): + self._set_flow_entry( + self._datapath, actions, self.server_port, group) + if not len(actions): + self._del_flow_entry( + self._datapath, self.server_port, group) + del_groups.append(group) + if len(del_ports): + for port in del_ports: + self._del_flow_entry(self._datapath, port, group) + for port in del_ports: + del status[port] + for group in del_groups: + del self._mcast[group] + + rest_time = timeout - igmp.QUERY_RESPONSE_INTERVAL + hub.sleep(rest_time) + + def _do_report(self, report, in_port, msg): + """the process when the querier received a REPORT message.""" + datapath = msg.datapath + ofproto = datapath.ofproto + parser = datapath.ofproto_parser + + if ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION: + size = 65535 + else: + size = ofproto.OFPCML_MAX + + update = False + self._mcast.setdefault(report.address, {}) + if not in_port in self._mcast[report.address]: + update = True + self._mcast[report.address][in_port] = True + + if update: + actions = [] + for port in self._mcast[report.address]: + actions.append(parser.OFPActionOutput(port)) + self._set_flow_entry( + datapath, actions, self.server_port, report.address) + self._set_flow_entry( + datapath, + [parser.OFPActionOutput(ofproto.OFPP_CONTROLLER, size)], + in_port, report.address) + + def _do_leave(self, leave, in_port, msg): + """the process when the querier received a LEAVE message.""" + datapath = msg.datapath + parser = datapath.ofproto_parser + + self._mcast.setdefault(leave.address, {}) + if in_port in self._mcast[leave.address]: + self._del_flow_entry( + datapath, in_port, leave.address) + del self._mcast[leave.address][in_port] + actions = [] + for port in self._mcast[leave.address]: + actions.append(parser.OFPActionOutput(port)) + if len(actions): + self._set_flow_entry( + datapath, actions, self.server_port, leave.address) + else: + self._del_flow_entry( + datapath, self.server_port, leave.address) + + #------------------------------------------------------------------- + # PRIVATE METHODS ( OTHERS ) + #------------------------------------------------------------------- + def _set_logger(self): + """change log format.""" + self.logger.propagate = False + hdl = logging.StreamHandler() + fmt_str = '[querier][%(levelname)s] %(message)s' + hdl.setFormatter(logging.Formatter(fmt_str)) + self.logger.addHandler(hdl) + + +class IgmpSnooper(IgmpBase): + """IGMP snooping class library.""" + + #------------------------------------------------------------------- + # PUBLIC METHODS + #------------------------------------------------------------------- + def __init__(self, send_event): + """initialization.""" + super(IgmpSnooper, self).__init__() + self.name = "IgmpSnooper" + self.logger = logging.getLogger(self.name) + self._send_event = send_event + + # the structure of self._to_querier + # + # +------+--------------+ + # | dpid | 'port': port | + # | +--------------+ + # | | 'ip': ip | + # | +--------------+ + # | | 'mac': mac | + # +------+--------------+ + # | ... | + # +---------------------+ + # + # dpid datapath id. + # port a port number which connect to the querier. + # ip IP address of the querier. + # mac MAC address of the querier. + self._to_querier = {} + + # the structure of self._to_hosts + # + # +------+-------+---------------------------------+ + # | dpid | group | 'replied': True/False | + # | | +---------------------------------+ + # | | | 'leave': leave | + # | | +-----------+--------+------------+ + # | | | 'ports' | portno | 'out': out | + # | | | | +------------+ + # | | | | | 'in': in | + # | | | +--------+------------+ + # | | | | ... | + # | +-------+-----------+---------------------+ + # | | ... | + # +------+-----------------------------------------+ + # | ... | + # +------------------------------------------------+ + # + # dpid datapath id. + # group multicast address. + # replied the value indicates whether a REPORT message was + # replied. + # leave a LEAVE message. + # portno a port number which has joined to the multicast + # group. + # out the value indicates whether a flow entry for the + # packet outputted to the port was registered. + # in the value indicates whether a flow entry for the + # packet inputted from the port was registered. + self._to_hosts = {} + + self._set_logger() + + def packet_in_handler(self, req_pkt, req_igmp, msg): + """the process when the snooper received IGMP.""" + dpid = msg.datapath.id + ofproto = msg.datapath.ofproto + if ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION: + in_port = msg.in_port + else: + in_port = msg.match['in_port'] + + log = "SW=%s PORT=%d IGMP received. " % ( + dpid_to_str(dpid), in_port) + self.logger.debug(str(req_igmp)) + if igmp.IGMP_TYPE_QUERY == req_igmp.msgtype: + self.logger.info(log + "[QUERY]") + (req_ipv4, ) = req_pkt.get_protocols(ipv4.ipv4) + (req_eth, ) = req_pkt.get_protocols(ethernet.ethernet) + self._do_query(req_igmp, req_ipv4, req_eth, in_port, msg) + elif (igmp.IGMP_TYPE_REPORT_V1 == req_igmp.msgtype or + igmp.IGMP_TYPE_REPORT_V2 == req_igmp.msgtype): + self.logger.info(log + "[REPORT]") + self._do_report(req_igmp, in_port, msg) + elif igmp.IGMP_TYPE_LEAVE == req_igmp.msgtype: + self.logger.info(log + "[LEAVE]") + self._do_leave(req_igmp, in_port, msg) + elif igmp.IGMP_TYPE_REPORT_V3 == req_igmp.msgtype: + self.logger.info(log + "V3 is not supported yet.") + self._do_flood(in_port, msg) + else: + self.logger.info(log + "[unknown type:%d]", + req_igmp.msgtype) + self._do_flood(in_port, msg) + + #------------------------------------------------------------------- + # PRIVATE METHODS ( RELATED TO IGMP ) + #------------------------------------------------------------------- + def _do_query(self, query, iph, eth, in_port, msg): + """the process when the snooper received a QUERY message.""" + datapath = msg.datapath + dpid = datapath.id + ofproto = datapath.ofproto + parser = datapath.ofproto_parser + + # learn the querier. + self._to_querier[dpid] = { + 'port': in_port, + 'ip': iph.src, + 'mac': eth.src + } + + # set the timeout time. + timeout = igmp.QUERY_RESPONSE_INTERVAL + if query.maxresp: + timeout = query.maxresp / 10 + + self._to_hosts.setdefault(dpid, {}) + if '0.0.0.0' == query.address: + # general query. reset all reply status. + for group in self._to_hosts[dpid].values(): + group['replied'] = False + group['leave'] = None + else: + # specific query. reset the reply status of the specific + # group. + group = self._to_hosts[dpid].get(query.address) + if group: + group['replied'] = False + group['leave'] = None + + actions = [parser.OFPActionOutput(ofproto.OFPP_FLOOD)] + self._do_packet_out( + datapath, msg.data, in_port, actions) + + # wait for REPORT messages. + hub.spawn(self._do_timeout_for_query, timeout, datapath) + + def _do_report(self, report, in_port, msg): + """the process when the snooper received a REPORT message.""" + datapath = msg.datapath + dpid = datapath.id + ofproto = datapath.ofproto + parser = datapath.ofproto_parser + + if ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION: + size = 65535 + else: + size = ofproto.OFPCML_MAX + + # check whether the querier port has been specified. + outport = None + value = self._to_querier.get(dpid) + if value: + outport = value['port'] + + # send a event when the multicast group address is new. + self._to_hosts.setdefault(dpid, {}) + if not self._to_hosts[dpid].get(report.address): + self._send_event( + EventMulticastGroupStateChanged( + MG_GROUP_ADDED, report.address, outport, [])) + self._to_hosts[dpid].setdefault( + report.address, + {'replied': False, 'leave': None, 'ports': {}}) + + # set a flow entry from a host to the controller when + # a host sent a REPORT message. + if not self._to_hosts[dpid][report.address]['ports'].get( + in_port): + self._to_hosts[dpid][report.address]['ports'][ + in_port] = {'out': False, 'in': False} + self._set_flow_entry( + datapath, + [parser.OFPActionOutput(ofproto.OFPP_CONTROLLER, size)], + in_port, report.address) + + if not self._to_hosts[dpid][report.address]['ports'][ + in_port]['out']: + self._to_hosts[dpid][report.address]['ports'][ + in_port]['out'] = True + + if not outport: + self.logger.info("no querier exists.") + return + + # set a flow entry from a multicast server to hosts. + if not self._to_hosts[dpid][report.address]['ports'][ + in_port]['in']: + actions = [] + ports = [] + for port in self._to_hosts[dpid][report.address]['ports']: + actions.append(parser.OFPActionOutput(port)) + ports.append(port) + self._send_event( + EventMulticastGroupStateChanged( + MG_MEMBER_CHANGED, report.address, outport, ports)) + self._set_flow_entry( + datapath, actions, outport, report.address) + self._to_hosts[dpid][report.address]['ports'][ + in_port]['in'] = True + + # send a REPORT message to the querier if this message arrived + # first after a QUERY message was sent. + if not self._to_hosts[dpid][report.address]['replied']: + actions = [parser.OFPActionOutput(outport, size)] + self._do_packet_out(datapath, msg.data, in_port, actions) + self._to_hosts[dpid][report.address]['replied'] = True + + def _do_leave(self, leave, in_port, msg): + """the process when the snooper received a LEAVE message.""" + datapath = msg.datapath + dpid = datapath.id + ofproto = datapath.ofproto + parser = datapath.ofproto_parser + + # check whether the querier port has been specified. + if not self._to_querier.get(dpid): + self.logger.info("no querier exists.") + return + + # save this LEAVE message and reset the condition of the port + # that received this message. + self._to_hosts.setdefault(dpid, {}) + self._to_hosts[dpid].setdefault( + leave.address, + {'replied': False, 'leave': None, 'ports': {}}) + self._to_hosts[dpid][leave.address]['leave'] = msg + self._to_hosts[dpid][leave.address]['ports'][in_port] = { + 'out': False, 'in': False} + + # create a specific query. + timeout = igmp.LAST_MEMBER_QUERY_INTERVAL + res_igmp = igmp.igmp( + msgtype=igmp.IGMP_TYPE_QUERY, + maxresp=timeout * 10, + csum=0, + address=leave.address) + res_ipv4 = ipv4.ipv4( + total_length=len(ipv4.ipv4()) + len(res_igmp), + proto=inet.IPPROTO_IGMP, ttl=1, + src=self._to_querier[dpid]['ip'], + dst=igmp.MULTICAST_IP_ALL_HOST) + res_ether = ethernet.ethernet( + dst=igmp.MULTICAST_MAC_ALL_HOST, + src=self._to_querier[dpid]['mac'], + ethertype=ether.ETH_TYPE_IP) + res_pkt = packet.Packet() + res_pkt.add_protocol(res_ether) + res_pkt.add_protocol(res_ipv4) + res_pkt.add_protocol(res_igmp) + res_pkt.serialize() + + # send a specific query to the host that sent this message. + actions = [parser.OFPActionOutput(ofproto.OFPP_IN_PORT)] + self._do_packet_out(datapath, res_pkt.data, in_port, actions) + + # wait for REPORT messages. + hub.spawn(self._do_timeout_for_leave, timeout, datapath, + leave.address, in_port) + + def _do_flood(self, in_port, msg): + """the process when the snooper received a message of the + outside for processing. """ + datapath = msg.datapath + ofproto = datapath.ofproto + parser = datapath.ofproto_parser + + actions = [parser.OFPActionOutput(ofproto.OFPP_FLOOD)] + self._do_packet_out(datapath, msg.data, in_port, actions) + + def _do_timeout_for_query(self, timeout, datapath): + """the process when the QUERY from the querier timeout expired.""" + dpid = datapath.id + + hub.sleep(timeout) + outport = self._to_querier[dpid]['port'] + + remove_dsts = [] + for dst in self._to_hosts[dpid]: + if not self._to_hosts[dpid][dst]['replied']: + # if no REPORT message sent from any members of + # the group, remove flow entries about the group and + # send a LEAVE message if exists. + self._remove_multicast_group(datapath, outport, dst) + remove_dsts.append(dst) + + for dst in remove_dsts: + del self._to_hosts[dpid][dst] + + def _do_timeout_for_leave(self, timeout, datapath, dst, in_port): + """the process when the QUERY from the switch timeout expired.""" + parser = datapath.ofproto_parser + dpid = datapath.id + + hub.sleep(timeout) + outport = self._to_querier[dpid]['port'] + + if self._to_hosts[dpid][dst]['ports'][in_port]['out']: + return + + del self._to_hosts[dpid][dst]['ports'][in_port] + self._del_flow_entry(datapath, in_port, dst) + actions = [] + ports = [] + for port in self._to_hosts[dpid][dst]['ports']: + actions.append(parser.OFPActionOutput(port)) + ports.append(port) + + if len(actions): + self._send_event( + EventMulticastGroupStateChanged( + MG_MEMBER_CHANGED, dst, outport, ports)) + self._set_flow_entry( + datapath, actions, outport, dst) + self._to_hosts[dpid][dst]['leave'] = None + else: + self._remove_multicast_group(datapath, outport, dst) + del self._to_hosts[dpid][dst] + + def _remove_multicast_group(self, datapath, outport, dst): + """remove flow entries about the group and send a LEAVE message + if exists.""" + ofproto = datapath.ofproto + parser = datapath.ofproto_parser + dpid = datapath.id + + self._send_event( + EventMulticastGroupStateChanged( + MG_GROUP_REMOVED, dst, outport, [])) + self._del_flow_entry(datapath, outport, dst) + for port in self._to_hosts[dpid][dst]['ports']: + self._del_flow_entry(datapath, port, dst) + leave = self._to_hosts[dpid][dst]['leave'] + if leave: + if ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION: + in_port = leave.in_port + else: + in_port = leave.match['in_port'] + actions = [parser.OFPActionOutput(outport)] + self._do_packet_out( + datapath, leave.data, in_port, actions) + + #------------------------------------------------------------------- + # PRIVATE METHODS ( OTHERS ) + #------------------------------------------------------------------- + def _set_logger(self): + """change log format.""" + self.logger.propagate = False + hdl = logging.StreamHandler() + fmt_str = '[snoop][%(levelname)s] %(message)s' + hdl.setFormatter(logging.Formatter(fmt_str)) + self.logger.addHandler(hdl)