topology: support link discovery
event.py: add link events. switches.py: add link discovery (only of1.0). dumper.py: add handler for link events. TODO: support other OpenFlow version. Signed-off-by: YAMADA Hideki <yamada.hideki@po.ntts.co.jp> Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
parent
443891b1c4
commit
7d5a68cdc2
@ -41,6 +41,7 @@ from ryu import version
|
||||
from ryu.app import wsgi
|
||||
from ryu.base.app_manager import AppManager
|
||||
from ryu.controller import controller
|
||||
from ryu.topology import switches
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
@ -21,7 +21,7 @@ import time
|
||||
|
||||
from ryu.base import app_manager
|
||||
from ryu.controller.handler import set_ev_handler
|
||||
from ryu.topology import switches, event
|
||||
from ryu.topology import event
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@ -34,8 +34,15 @@ class DiscoveryEventDumper(app_manager.RyuApp):
|
||||
super(DiscoveryEventDumper, self).__init__()
|
||||
|
||||
# For testing when sync and async request.
|
||||
# self.threads.append(gevent.spawn_later(0, self._request_sync, 5))
|
||||
self.threads.append(gevent.spawn_later(0, self._request_async, 10))
|
||||
# self.threads.append(
|
||||
# gevent.spawn_later(0, self._switch_request_sync, 5))
|
||||
# self.threads.append(
|
||||
# gevent.spawn_later(0, self._switch_request_async, 10))
|
||||
#
|
||||
# self.threads.append(
|
||||
# gevent.spawn_later(0, self._link_request_sync, 5))
|
||||
# self.threads.append(
|
||||
# gevent.spawn_later(0, self._link_request_async, 10))
|
||||
|
||||
self.is_active = True
|
||||
|
||||
@ -59,22 +66,30 @@ class DiscoveryEventDumper(app_manager.RyuApp):
|
||||
def port_modify_handler(self, ev):
|
||||
LOG.debug(ev)
|
||||
|
||||
def _request_sync(self, interval):
|
||||
@set_ev_handler(event.EventLinkAdd)
|
||||
def link_add_handler(self, ev):
|
||||
LOG.debug(ev)
|
||||
|
||||
@set_ev_handler(event.EventLinkDelete)
|
||||
def link_del_handler(self, ev):
|
||||
LOG.debug(ev)
|
||||
|
||||
def _switch_request_sync(self, interval):
|
||||
while self.is_active:
|
||||
request = event.EventSwitchRequest()
|
||||
LOG.debug('request sync %s thread(%s)',
|
||||
LOG.debug('switch_request sync %s thread(%s)',
|
||||
request, id(gevent.getcurrent()))
|
||||
reply = self.send_request(request)
|
||||
LOG.debug('reply sync %s', reply)
|
||||
LOG.debug('switch_reply sync %s', reply)
|
||||
if len(reply.switches) > 0:
|
||||
for sw in reply.switches:
|
||||
LOG.debug(' %s', sw)
|
||||
gevent.sleep(interval)
|
||||
|
||||
def _request_async(self, interval):
|
||||
def _switch_request_async(self, interval):
|
||||
while self.is_active:
|
||||
request = event.EventSwitchRequest()
|
||||
LOG.debug('request async %s thread(%s)',
|
||||
LOG.debug('switch_request async %s thread(%s)',
|
||||
request, id(gevent.getcurrent()))
|
||||
self.send_event(request.dst, request)
|
||||
|
||||
@ -86,7 +101,7 @@ class DiscoveryEventDumper(app_manager.RyuApp):
|
||||
i += 1
|
||||
LOG.debug(' thread is busy... %s/%s thread(%s)',
|
||||
i, busy, id(gevent.getcurrent()))
|
||||
LOG.debug(' thread yield to reply handler. thread(%s)',
|
||||
LOG.debug(' thread yield to switch_reply handler. thread(%s)',
|
||||
id(gevent.getcurrent()))
|
||||
|
||||
# yield
|
||||
@ -98,7 +113,51 @@ class DiscoveryEventDumper(app_manager.RyuApp):
|
||||
|
||||
@set_ev_handler(event.EventSwitchReply)
|
||||
def switch_reply_handler(self, reply):
|
||||
LOG.debug('reply async %s', reply)
|
||||
LOG.debug('switch_reply async %s', reply)
|
||||
if len(reply.switches) > 0:
|
||||
for sw in reply.switches:
|
||||
LOG.debug(' %s', sw)
|
||||
|
||||
def _link_request_sync(self, interval):
|
||||
while self.is_active:
|
||||
request = event.EventLinkRequest()
|
||||
LOG.debug('link_request sync %s thread(%s)',
|
||||
request, id(gevent.getcurrent()))
|
||||
reply = self.send_request(request)
|
||||
LOG.debug('link_reply sync %s', reply)
|
||||
if len(reply.links) > 0:
|
||||
for link in reply.links:
|
||||
LOG.debug(' %s', link)
|
||||
gevent.sleep(interval)
|
||||
|
||||
def _link_request_async(self, interval):
|
||||
while self.is_active:
|
||||
request = event.EventLinkRequest()
|
||||
LOG.debug('link_request async %s thread(%s)',
|
||||
request, id(gevent.getcurrent()))
|
||||
self.send_event(request.dst, request)
|
||||
|
||||
start = time.time()
|
||||
busy = interval / 2
|
||||
i = 0
|
||||
while i < busy:
|
||||
if time.time() > start + i:
|
||||
i += 1
|
||||
LOG.debug(' thread is busy... %s/%s thread(%s)',
|
||||
i, busy, id(gevent.getcurrent()))
|
||||
LOG.debug(' thread yield to link_reply handler. thread(%s)',
|
||||
id(gevent.getcurrent()))
|
||||
|
||||
# yield
|
||||
gevent.sleep(0)
|
||||
|
||||
LOG.debug(' thread get back. thread(%s)',
|
||||
id(gevent.getcurrent()))
|
||||
gevent.sleep(interval - busy)
|
||||
|
||||
@set_ev_handler(event.EventLinkReply)
|
||||
def link_reply_handler(self, reply):
|
||||
LOG.debug('link_reply async %s', reply)
|
||||
if len(reply.links) > 0:
|
||||
for link in reply.links:
|
||||
LOG.debug(' %s', link)
|
||||
|
@ -84,3 +84,45 @@ class EventSwitchReply(event.EventReplyBase):
|
||||
def __str__(self):
|
||||
return 'EventSwitchReply<dst=%s, %s>' % \
|
||||
(self.dst, self.switches)
|
||||
|
||||
|
||||
class EventLinkBase(event.EventBase):
|
||||
def __init__(self, link):
|
||||
super(EventLinkBase, self).__init__()
|
||||
self.link = link
|
||||
|
||||
def __str__(self):
|
||||
return '%s<%s>' % (self.__class__.__name__, self.link)
|
||||
|
||||
|
||||
class EventLinkAdd(EventLinkBase):
|
||||
def __init__(self, link):
|
||||
super(EventLinkAdd, self).__init__(link)
|
||||
|
||||
|
||||
class EventLinkDelete(EventLinkBase):
|
||||
def __init__(self, link):
|
||||
super(EventLinkDelete, self).__init__(link)
|
||||
|
||||
|
||||
class EventLinkRequest(event.EventRequestBase):
|
||||
# If dpid is None, reply all list
|
||||
def __init__(self, dpid=None):
|
||||
super(EventLinkRequest, self).__init__()
|
||||
self.dst = 'switches'
|
||||
self.dpid = dpid
|
||||
|
||||
def __str__(self):
|
||||
return 'EventLinkRequest<src=%s, dpid=%s>' % \
|
||||
(self.src, self.dpid)
|
||||
|
||||
|
||||
class EventLinkReply(event.EventReplyBase):
|
||||
def __init__(self, dst, dpid, links):
|
||||
super(EventLinkReply, self).__init__(dst)
|
||||
self.dpid = dpid
|
||||
self.links = links
|
||||
|
||||
def __str__(self):
|
||||
return 'EventLinkReply<dst=%s, dpid=%s, links=%s>' % \
|
||||
(self.dst, self.dpid, len(self.links))
|
||||
|
@ -14,16 +14,43 @@
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import gevent
|
||||
import struct
|
||||
import time
|
||||
from oslo.config import cfg
|
||||
|
||||
from ryu.topology import event
|
||||
from ryu.base import app_manager
|
||||
from ryu.controller import ofp_event
|
||||
from ryu.controller.handler import set_ev_cls
|
||||
from ryu.controller.handler import MAIN_DISPATCHER, DEAD_DISPATCHER
|
||||
from ryu.exception import RyuException
|
||||
from ryu.lib.mac import DONTCARE
|
||||
from ryu.lib.dpid import dpid_to_str, str_to_dpid
|
||||
from ryu.lib.packet import packet, ethernet, lldp
|
||||
from ryu.ofproto.ether import ETH_TYPE_LLDP
|
||||
from ryu.ofproto import ofproto_v1_0
|
||||
from ryu.ofproto import nx_match
|
||||
from ryu.ofproto import ofproto_v1_2
|
||||
from ryu.ofproto import ofproto_v1_3
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
CONF.register_cli_opts([
|
||||
cfg.BoolOpt('observe-links', default=False,
|
||||
help='observe link discovery events.'),
|
||||
cfg.BoolOpt('install-lldp-flow', default=True,
|
||||
help='link discovery: explicitly install flow entry '
|
||||
'to send lldp packet to controller'),
|
||||
cfg.BoolOpt('explicit-drop', default=True,
|
||||
help='link discovery: explicitly drop lldp packet in')
|
||||
])
|
||||
|
||||
|
||||
class Port(object):
|
||||
# This is data class passed by EventPortXXX
|
||||
def __init__(self, dpid, ofproto, ofpport):
|
||||
@ -91,6 +118,27 @@ class Switch(object):
|
||||
return msg
|
||||
|
||||
|
||||
class Link(object):
|
||||
# This is data class passed by EventLinkXXX
|
||||
def __init__(self, src, dst):
|
||||
super(Link, self).__init__()
|
||||
self.src = src
|
||||
self.dst = dst
|
||||
|
||||
# this type is used for key value of LinkState
|
||||
def __eq__(self, other):
|
||||
return self.src == other.src and self.dst == other.dst
|
||||
|
||||
def __ne__(self, other):
|
||||
return not self.__eq__(other)
|
||||
|
||||
def __hash__(self):
|
||||
return hash((self.src, self.dst))
|
||||
|
||||
def __str__(self):
|
||||
return 'Link: %s to %s' % (self.src, self.dst)
|
||||
|
||||
|
||||
class PortState(dict):
|
||||
# dict: int port_no -> OFPPort port
|
||||
# OFPPort is defined in ryu.ofproto.ofproto_v1_X_parser
|
||||
@ -107,17 +155,297 @@ class PortState(dict):
|
||||
self[port_no] = port
|
||||
|
||||
|
||||
class PortData(object):
|
||||
def __init__(self, is_down, lldp_data):
|
||||
super(PortData, self).__init__()
|
||||
self.is_down = is_down
|
||||
self.lldp_data = lldp_data
|
||||
self.timestamp = None
|
||||
self.sent = 0
|
||||
|
||||
def lldp_sent(self):
|
||||
self.timestamp = time.time()
|
||||
self.sent += 1
|
||||
|
||||
def lldp_received(self):
|
||||
self.sent = 0
|
||||
|
||||
def lldp_dropped(self):
|
||||
return self.sent
|
||||
|
||||
def clear_timestamp(self):
|
||||
self.timestamp = None
|
||||
|
||||
def set_down(self, is_down):
|
||||
self.is_down = is_down
|
||||
|
||||
def __str__(self):
|
||||
return 'PortData<live=%s, timestamp=%s, sent=%d>' \
|
||||
% (not self.is_down, self.timestamp, self.sent)
|
||||
|
||||
|
||||
class PortDataState(dict):
|
||||
# dict: Port class -> PortData class
|
||||
# slimed down version of OrderedDict as python 2.6 doesn't support it.
|
||||
_PREV = 0
|
||||
_NEXT = 1
|
||||
_KEY = 2
|
||||
|
||||
def __init__(self):
|
||||
super(PortDataState, self).__init__()
|
||||
self._root = root = [] # sentinel node
|
||||
root[:] = [root, root, None] # [_PREV, _NEXT, _KEY]
|
||||
# doubly linked list
|
||||
self._map = {}
|
||||
|
||||
def _remove_key(self, key):
|
||||
link_prev, link_next, key = self._map.pop(key)
|
||||
link_prev[self._NEXT] = link_next
|
||||
link_next[self._PREV] = link_prev
|
||||
|
||||
def _append_key(self, key):
|
||||
root = self._root
|
||||
last = root[self._PREV]
|
||||
last[self._NEXT] = root[self._PREV] = self._map[key] = [last, root,
|
||||
key]
|
||||
|
||||
def _prepend_key(self, key):
|
||||
root = self._root
|
||||
first = root[self._NEXT]
|
||||
first[self._PREV] = root[self._NEXT] = self._map[key] = [root, first,
|
||||
key]
|
||||
|
||||
def _move_last_key(self, key):
|
||||
self._remove_key(key)
|
||||
self._append_key(key)
|
||||
|
||||
def _move_front_key(self, key):
|
||||
self._remove_key(key)
|
||||
self._prepend_key(key)
|
||||
|
||||
def add_port(self, port, lldp_data):
|
||||
if port not in self:
|
||||
self._prepend_key(port)
|
||||
self[port] = PortData(port.is_down(), lldp_data)
|
||||
else:
|
||||
self[port].is_down = port.is_down()
|
||||
|
||||
def lldp_sent(self, port):
|
||||
port_data = self[port]
|
||||
port_data.lldp_sent()
|
||||
self._move_last_key(port)
|
||||
return port_data
|
||||
|
||||
def lldp_received(self, port):
|
||||
self[port].lldp_received()
|
||||
|
||||
def move_front(self, port):
|
||||
port_data = self.get(port, None)
|
||||
if port_data is not None:
|
||||
port_data.clear_timestamp()
|
||||
self._move_front_key(port)
|
||||
|
||||
def set_down(self, port):
|
||||
is_down = port.is_down()
|
||||
port_data = self[port]
|
||||
port_data.set_down(is_down)
|
||||
port_data.clear_timestamp()
|
||||
if not is_down:
|
||||
self._move_front_key(port)
|
||||
return is_down
|
||||
|
||||
def get_port(self, port):
|
||||
return self[port]
|
||||
|
||||
def del_port(self, port):
|
||||
del self[port]
|
||||
self._remove_key(port)
|
||||
|
||||
def __iter__(self):
|
||||
root = self._root
|
||||
curr = root[self._NEXT]
|
||||
while curr is not root:
|
||||
yield curr[self._KEY]
|
||||
curr = curr[self._NEXT]
|
||||
|
||||
def clear(self):
|
||||
for node in self._map.itervalues():
|
||||
del node[:]
|
||||
root = self._root
|
||||
root[:] = [root, root, None]
|
||||
self._map.clear()
|
||||
dict.clear(self)
|
||||
|
||||
def items(self):
|
||||
'od.items() -> list of (key, value) pairs in od'
|
||||
return [(key, self[key]) for key in self]
|
||||
|
||||
def iteritems(self):
|
||||
'od.iteritems -> an iterator over the (key, value) pairs in od'
|
||||
for k in self:
|
||||
yield (k, self[k])
|
||||
|
||||
|
||||
class LinkState(dict):
|
||||
# dict: Link class -> timestamp
|
||||
def __init__(self):
|
||||
super(LinkState, self).__init__()
|
||||
self._map = {}
|
||||
|
||||
def get_peer(self, src):
|
||||
return self._map.get(src, None)
|
||||
|
||||
def update_link(self, src, dst):
|
||||
link = Link(src, dst)
|
||||
|
||||
self[link] = time.time()
|
||||
self._map[src] = dst
|
||||
|
||||
# return if the reverse link is also up or not
|
||||
rev_link = Link(dst, src)
|
||||
return rev_link in self
|
||||
|
||||
def link_down(self, link):
|
||||
del self[link]
|
||||
del self._map[link.src]
|
||||
|
||||
def rev_link_set_timestamp(self, rev_link, timestamp):
|
||||
# rev_link may or may not in LinkSet
|
||||
if rev_link in self:
|
||||
self[rev_link] = timestamp
|
||||
|
||||
def port_deleted(self, src):
|
||||
dst = self.get_peer(src)
|
||||
if dst is None:
|
||||
raise KeyError()
|
||||
|
||||
link = Link(src, dst)
|
||||
rev_link = Link(dst, src)
|
||||
del self[link]
|
||||
del self._map[src]
|
||||
# reverse link might not exist
|
||||
self.pop(rev_link, None)
|
||||
rev_link_dst = self._map.pop(dst, None)
|
||||
|
||||
return dst, rev_link_dst
|
||||
|
||||
|
||||
class LLDPPacket(object):
|
||||
# make a LLDP packet for link discovery.
|
||||
|
||||
CHASSIS_ID_PREFIX = 'dpid:'
|
||||
CHASSIS_ID_PREFIX_LEN = len(CHASSIS_ID_PREFIX)
|
||||
CHASSIS_ID_FMT = CHASSIS_ID_PREFIX + '%s'
|
||||
|
||||
PORT_ID_STR = '!I' # uint32_t
|
||||
PORT_ID_SIZE = 4
|
||||
|
||||
class LLDPUnknownFormat(RyuException):
|
||||
message = '%(msg)s'
|
||||
|
||||
@staticmethod
|
||||
def lldp_packet(dpid, port_no, dl_addr, ttl):
|
||||
pkt = packet.Packet()
|
||||
|
||||
dst = lldp.LLDP_MAC_NEAREST_BRIDGE
|
||||
src = dl_addr
|
||||
ethertype = ETH_TYPE_LLDP
|
||||
eth_pkt = ethernet.ethernet(dst, src, ethertype)
|
||||
pkt.add_protocol(eth_pkt)
|
||||
|
||||
tlv_chassis_id = lldp.ChassisID(
|
||||
subtype=lldp.ChassisID.SUB_LOCALLY_ASSIGNED,
|
||||
chassis_id=LLDPPacket.CHASSIS_ID_FMT %
|
||||
dpid_to_str(dpid))
|
||||
|
||||
tlv_port_id = lldp.PortID(subtype=lldp.PortID.SUB_PORT_COMPONENT,
|
||||
port_id=struct.pack(
|
||||
LLDPPacket.PORT_ID_STR,
|
||||
port_no))
|
||||
|
||||
tlv_ttl = lldp.TTL(ttl=ttl)
|
||||
tlv_end = lldp.End()
|
||||
|
||||
tlvs = (tlv_chassis_id, tlv_port_id, tlv_ttl, tlv_end)
|
||||
lldp_pkt = lldp.lldp(tlvs)
|
||||
pkt.add_protocol(lldp_pkt)
|
||||
|
||||
pkt.serialize()
|
||||
return pkt.data
|
||||
|
||||
@staticmethod
|
||||
def lldp_parse(data):
|
||||
pkt = packet.Packet(data)
|
||||
eth_pkt = pkt.next()
|
||||
assert type(eth_pkt) == ethernet.ethernet
|
||||
|
||||
lldp_pkt = pkt.next()
|
||||
if type(lldp_pkt) != lldp.lldp:
|
||||
raise LLDPPacket.LLDPUnknownFormat()
|
||||
|
||||
tlv_chassis_id = lldp_pkt.tlvs[0]
|
||||
if tlv_chassis_id.subtype != lldp.ChassisID.SUB_LOCALLY_ASSIGNED:
|
||||
raise LLDPPacket.LLDPUnknownFormat(
|
||||
msg='unknown chassis id subtype %d' % tlv_chassis_id.subtype)
|
||||
chassis_id = tlv_chassis_id.chassis_id
|
||||
if not chassis_id.startswith(LLDPPacket.CHASSIS_ID_PREFIX):
|
||||
raise LLDPPacket.LLDPUnknownFormat(
|
||||
msg='unknown chassis id format %s' % chassis_id)
|
||||
src_dpid = str_to_dpid(chassis_id[LLDPPacket.CHASSIS_ID_PREFIX_LEN:])
|
||||
|
||||
tlv_port_id = lldp_pkt.tlvs[1]
|
||||
if tlv_port_id.subtype != lldp.PortID.SUB_PORT_COMPONENT:
|
||||
raise LLDPPacket.LLDPUnknownFormat(
|
||||
msg='unknown port id subtype %d' % tlv_port_id.subtype)
|
||||
port_id = tlv_port_id.port_id
|
||||
if len(port_id) != LLDPPacket.PORT_ID_SIZE:
|
||||
raise LLDPPacket.LLDPUnknownFormat(
|
||||
msg='unknown port id %d' % port_id)
|
||||
(src_port_no, ) = struct.unpack(LLDPPacket.PORT_ID_STR, port_id)
|
||||
|
||||
return src_dpid, src_port_no
|
||||
|
||||
|
||||
class Switches(app_manager.RyuApp):
|
||||
_EVENTS = [event.EventSwitchEnter, event.EventSwitchLeave,
|
||||
event.EventPortAdd, event.EventPortDelete,
|
||||
event.EventPortModify]
|
||||
event.EventPortModify,
|
||||
event.EventLinkAdd, event.EventLinkDelete]
|
||||
|
||||
DEFAULT_TTL = 120 # unused. ignored.
|
||||
LLDP_PACKET_LEN = len(LLDPPacket.lldp_packet(0, 0, DONTCARE, 0))
|
||||
|
||||
LLDP_SEND_GUARD = .05
|
||||
LLDP_SEND_PERIOD_PER_PORT = .9
|
||||
TIMEOUT_CHECK_PERIOD = 5.
|
||||
LINK_TIMEOUT = TIMEOUT_CHECK_PERIOD * 2
|
||||
LINK_LLDP_DROP = 5
|
||||
|
||||
def __init__(self):
|
||||
super(Switches, self).__init__()
|
||||
|
||||
self.name = 'switches'
|
||||
self.dps = {} # datapath_id => class Datapath
|
||||
self.dps = {} # datapath_id => Datapath class
|
||||
self.port_state = {} # datapath_id => ports
|
||||
self.ports = PortDataState() # Port class -> PortData class
|
||||
self.links = LinkState() # Link class -> timestamp
|
||||
self.is_active = True
|
||||
|
||||
self.link_discovery = CONF.observe_links
|
||||
if self.link_discovery:
|
||||
self.install_flow = CONF.install_lldp_flow
|
||||
self.explicit_drop = CONF.explicit_drop
|
||||
self.lldp_event = gevent.event.Event()
|
||||
self.link_event = gevent.event.Event()
|
||||
self.threads.append(gevent.spawn_later(0, self.lldp_loop))
|
||||
self.threads.append(gevent.spawn_later(0, self.link_loop))
|
||||
|
||||
def close(self):
|
||||
self.is_active = False
|
||||
if self.link_discovery:
|
||||
self.lldp_event.set()
|
||||
self.link_event.set()
|
||||
gevent.joinall(self.threads)
|
||||
|
||||
def _register(self, dp):
|
||||
assert dp.id is not None
|
||||
@ -133,12 +461,41 @@ class Switches(app_manager.RyuApp):
|
||||
del self.dps[dp.id]
|
||||
del self.port_state[dp.id]
|
||||
|
||||
def _get_switch(self, dp):
|
||||
switch = Switch(dp)
|
||||
for ofpport in self.port_state[dp.id].itervalues():
|
||||
def _get_switch(self, dpid):
|
||||
if dpid in self.dps:
|
||||
switch = Switch(self.dps[dpid])
|
||||
for ofpport in self.port_state[dpid].itervalues():
|
||||
switch.add_port(ofpport)
|
||||
return switch
|
||||
|
||||
def _get_port(self, dpid, port_no):
|
||||
switch = self._get_switch(dpid)
|
||||
if switch:
|
||||
for p in switch.ports:
|
||||
if p.port_no == port_no:
|
||||
return p
|
||||
|
||||
def _port_added(self, port):
|
||||
lldp_data = LLDPPacket.lldp_packet(
|
||||
port.dpid, port.port_no, port.hw_addr, self.DEFAULT_TTL)
|
||||
self.ports.add_port(port, lldp_data)
|
||||
# LOG.debug('_port_added dpid=%s, port_no=%s, live=%s',
|
||||
# port.dpid, port.port_no, port.is_live())
|
||||
|
||||
def _link_down(self, port):
|
||||
try:
|
||||
dst, rev_link_dst = self.links.port_deleted(port)
|
||||
except KeyError:
|
||||
# LOG.debug('key error. src=%s, dst=%s',
|
||||
# port, self.links.get_peer(port))
|
||||
return
|
||||
link = Link(port, dst)
|
||||
self.send_event_to_observers(event.EventLinkDelete(link))
|
||||
if rev_link_dst:
|
||||
rev_link = Link(dst, rev_link_dst)
|
||||
self.send_event_to_observers(event.EventLinkDelete(rev_link))
|
||||
self.ports.move_front(dst)
|
||||
|
||||
@set_ev_cls(ofp_event.EventOFPStateChange,
|
||||
[MAIN_DISPATCHER, DEAD_DISPATCHER])
|
||||
def state_change_handler(self, ev):
|
||||
@ -148,19 +505,54 @@ class Switches(app_manager.RyuApp):
|
||||
|
||||
if ev.state == MAIN_DISPATCHER:
|
||||
self._register(dp)
|
||||
switch = self._get_switch(dp)
|
||||
switch = self._get_switch(dp.id)
|
||||
LOG.debug('register %s', switch)
|
||||
self.send_event_to_observers(event.EventSwitchEnter(switch))
|
||||
|
||||
if not self.link_discovery:
|
||||
return
|
||||
|
||||
if self.install_flow:
|
||||
ofproto = dp.ofproto
|
||||
ofproto_parser = dp.ofproto_parser
|
||||
|
||||
# TODO:XXX need other versions
|
||||
if ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION:
|
||||
rule = nx_match.ClsRule()
|
||||
rule.set_dl_dst(lldp.LLDP_MAC_NEAREST_BRIDGE)
|
||||
rule.set_dl_type(ETH_TYPE_LLDP)
|
||||
actions = [ofproto_parser.OFPActionOutput(
|
||||
ofproto.OFPP_CONTROLLER, self.LLDP_PACKET_LEN)]
|
||||
dp.send_flow_mod(
|
||||
rule=rule, cookie=0, command=ofproto.OFPFC_ADD,
|
||||
idle_timeout=0, hard_timeout=0, actions=actions)
|
||||
else:
|
||||
LOG.error('cannot install flow. unsupported version. %x',
|
||||
dp.ofproto.OFP_VERSION)
|
||||
|
||||
for port in switch.ports:
|
||||
if not port.is_reserved():
|
||||
self._port_added(port)
|
||||
self.lldp_event.set()
|
||||
|
||||
elif ev.state == DEAD_DISPATCHER:
|
||||
# dp.id is None when datapath dies before handshake
|
||||
if dp.id is None:
|
||||
return
|
||||
switch = self._get_switch(dp)
|
||||
switch = self._get_switch(dp.id)
|
||||
self._unregister(dp)
|
||||
LOG.debug('unregister %s', switch)
|
||||
self.send_event_to_observers(event.EventSwitchLeave(switch))
|
||||
|
||||
if not self.link_discovery:
|
||||
return
|
||||
|
||||
for port in switch.ports:
|
||||
if not port.is_reserved():
|
||||
self.ports.del_port(port)
|
||||
self._link_down(port)
|
||||
self.lldp_event.set()
|
||||
|
||||
@set_ev_cls(ofp_event.EventOFPPortStatus, MAIN_DISPATCHER)
|
||||
def port_status_handler(self, ev):
|
||||
msg = ev.msg
|
||||
@ -176,6 +568,14 @@ class Switches(app_manager.RyuApp):
|
||||
self.send_event_to_observers(
|
||||
event.EventPortAdd(Port(dp.id, dp.ofproto, ofpport)))
|
||||
|
||||
if not self.link_discovery:
|
||||
return
|
||||
|
||||
port = self._get_port(dp.id, ofpport.port_no)
|
||||
if port and not port.is_reserved():
|
||||
self._port_added(port)
|
||||
self.lldp_event.set()
|
||||
|
||||
elif reason == dp.ofproto.OFPPR_DELETE:
|
||||
#LOG.debug('A port was deleted.' +
|
||||
# '(datapath id = %s, port number = %s)',
|
||||
@ -184,6 +584,15 @@ class Switches(app_manager.RyuApp):
|
||||
self.send_event_to_observers(
|
||||
event.EventPortDelete(Port(dp.id, dp.ofproto, ofpport)))
|
||||
|
||||
if not self.link_discovery:
|
||||
return
|
||||
|
||||
port = self._get_port(dp.id, ofpport.port_no)
|
||||
if port and not port.is_reserved():
|
||||
self.ports.del_port(port)
|
||||
self._link_down(port)
|
||||
self.lldp_event.set()
|
||||
|
||||
else:
|
||||
assert reason == dp.ofproto.OFPPR_MODIFY
|
||||
#LOG.debug('A port was modified.' +
|
||||
@ -193,18 +602,184 @@ class Switches(app_manager.RyuApp):
|
||||
self.send_event_to_observers(
|
||||
event.EventPortModify(Port(dp.id, dp.ofproto, ofpport)))
|
||||
|
||||
if not self.link_discovery:
|
||||
return
|
||||
|
||||
port = self._get_port(dp.id, ofpport.port_no)
|
||||
if port and not port.is_reserved():
|
||||
if self.ports.set_down(port):
|
||||
self._link_down(port)
|
||||
self.lldp_event.set()
|
||||
|
||||
@staticmethod
|
||||
def _drop_packet(msg):
|
||||
if msg.buffer_id == 0xffffffff:
|
||||
return # TODO:use constant instead of -1
|
||||
|
||||
dp = msg.datapath
|
||||
# TODO:XXX
|
||||
if dp.ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION:
|
||||
dp.send_packet_out(dp.id, msg.in_port, [])
|
||||
else:
|
||||
LOG.error('cannot drop_packet. unsupported version. %x',
|
||||
dp.ofproto.OFP_VERSION)
|
||||
|
||||
@set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)
|
||||
def packet_in_handler(self, ev):
|
||||
if not self.link_discovery:
|
||||
return
|
||||
|
||||
msg = ev.msg
|
||||
try:
|
||||
src_dpid, src_port_no = LLDPPacket.lldp_parse(msg.data)
|
||||
except LLDPPacket.LLDPUnknownFormat as e:
|
||||
# This handler can receive all the packtes which can be
|
||||
# not-LLDP packet. Ignore it silently
|
||||
return
|
||||
else:
|
||||
dst_dpid = msg.datapath.id
|
||||
dst_port_no = msg.in_port
|
||||
|
||||
src = self._get_port(src_dpid, src_port_no)
|
||||
if not src or src.dpid == dst_dpid:
|
||||
return
|
||||
|
||||
dst = self._get_port(dst_dpid, dst_port_no)
|
||||
if not dst:
|
||||
return
|
||||
|
||||
old_peer = self.links.get_peer(src)
|
||||
# LOG.debug("Packet-In")
|
||||
# LOG.debug(" src=%s", src)
|
||||
# LOG.debug(" dst=%s", dst)
|
||||
# LOG.debug(" old_peer=%s", old_peer)
|
||||
if old_peer and old_peer != dst:
|
||||
old_link = Link(src, old_peer)
|
||||
self.send_event_to_observers(event.EventLinkDelete(old_link))
|
||||
|
||||
link = Link(src, dst)
|
||||
if not link in self.links:
|
||||
self.send_event_to_observers(event.EventLinkAdd(link))
|
||||
|
||||
if not self.links.update_link(src, dst):
|
||||
# reverse link is not detected yet.
|
||||
# So schedule the check early because it's very likely it's up
|
||||
try:
|
||||
self.ports.lldp_received(dst)
|
||||
except KeyError as e:
|
||||
# There are races between EventOFPPacketIn and
|
||||
# EventDPPortAdd. So packet-in event can happend before
|
||||
# port add event. In that case key error can happend.
|
||||
# LOG.debug('lldp_received: KeyError %s', e)
|
||||
pass
|
||||
else:
|
||||
self.ports.move_front(dst)
|
||||
self.lldp_event.set()
|
||||
if self.explicit_drop:
|
||||
self._drop_packet(msg)
|
||||
|
||||
def send_lldp_packet(self, port):
|
||||
try:
|
||||
port_data = self.ports.lldp_sent(port)
|
||||
except KeyError as e:
|
||||
# ports can be modified during our sleep in self.lldp_loop()
|
||||
# LOG.debug('send_lldp: KeyError %s', e)
|
||||
return
|
||||
if port_data.is_down:
|
||||
return
|
||||
|
||||
dp = self.dps.get(port.dpid, None)
|
||||
if dp is None:
|
||||
# datapath was already deleted
|
||||
return
|
||||
|
||||
# LOG.debug('lldp sent dpid=%s, port_no=%d', dp.id, port.port_no)
|
||||
# TODO:XXX
|
||||
if dp.ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION:
|
||||
actions = [dp.ofproto_parser.OFPActionOutput(port.port_no)]
|
||||
dp.send_packet_out(actions=actions, data=port_data.lldp_data)
|
||||
else:
|
||||
LOG.error('cannot send lldp packet. unsupported version. %x',
|
||||
dp.ofproto.OFP_VERSION)
|
||||
|
||||
def lldp_loop(self):
|
||||
while self.is_active:
|
||||
self.lldp_event.clear()
|
||||
|
||||
now = time.time()
|
||||
timeout = None
|
||||
ports_now = []
|
||||
ports = []
|
||||
for (key, data) in self.ports.items():
|
||||
if data.timestamp is None:
|
||||
ports_now.append(key)
|
||||
continue
|
||||
|
||||
expire = data.timestamp + self.LLDP_SEND_PERIOD_PER_PORT
|
||||
if expire <= now:
|
||||
ports.append(key)
|
||||
continue
|
||||
|
||||
timeout = expire - now
|
||||
break
|
||||
|
||||
for port in ports_now:
|
||||
self.send_lldp_packet(port)
|
||||
for port in ports:
|
||||
self.send_lldp_packet(port)
|
||||
gevent.sleep(self.LLDP_SEND_GUARD) # don't burst
|
||||
|
||||
if timeout is not None and ports:
|
||||
timeout = 0 # We have already slept
|
||||
# LOG.debug('lldp sleep %s', timeout)
|
||||
self.lldp_event.wait(timeout=timeout)
|
||||
|
||||
def link_loop(self):
|
||||
while self.is_active:
|
||||
self.link_event.clear()
|
||||
|
||||
now = time.time()
|
||||
deleted = []
|
||||
for (link, timestamp) in self.links.items():
|
||||
# LOG.debug('%s timestamp %d (now %d)', link, timestamp, now)
|
||||
if timestamp + self.LINK_TIMEOUT < now:
|
||||
src = link.src
|
||||
if src in self.ports:
|
||||
port_data = self.ports.get_port(src)
|
||||
# LOG.debug('port_data %s', port_data)
|
||||
if port_data.lldp_dropped() > self.LINK_LLDP_DROP:
|
||||
deleted.append(link)
|
||||
|
||||
for link in deleted:
|
||||
self.links.link_down(link)
|
||||
# LOG.debug('delete %s', link)
|
||||
self.send_event_to_observers(event.EventLinkDelete(link))
|
||||
|
||||
dst = link.dst
|
||||
rev_link = Link(dst, link.src)
|
||||
if rev_link not in deleted:
|
||||
# It is very likely that the reverse link is also
|
||||
# disconnected. Check it early.
|
||||
expire = now - self.LINK_TIMEOUT
|
||||
self.links.rev_link_set_timestamp(rev_link, expire)
|
||||
if dst in self.ports:
|
||||
self.ports.move_front(dst)
|
||||
self.lldp_event.set()
|
||||
|
||||
self.link_event.wait(timeout=self.TIMEOUT_CHECK_PERIOD)
|
||||
|
||||
@set_ev_cls(event.EventSwitchRequest)
|
||||
def switch_request_handler(self, req):
|
||||
LOG.debug(req)
|
||||
# LOG.debug(req)
|
||||
dpid = req.dpid
|
||||
|
||||
switches = []
|
||||
if dpid is None:
|
||||
# reply all list
|
||||
for dp in self.dps.itervalues():
|
||||
switches.append(self._get_switch(dp))
|
||||
switches.append(self._get_switch(dp.id))
|
||||
elif dpid in self.dps:
|
||||
switches.append(self._get_switch(self.dps[dpid]))
|
||||
switches.append(self._get_switch(dpid))
|
||||
|
||||
rep = event.EventSwitchReply(req.src, switches)
|
||||
if req.sync:
|
||||
@ -212,10 +787,33 @@ class Switches(app_manager.RyuApp):
|
||||
else:
|
||||
self.send_event(req.src, rep)
|
||||
|
||||
@set_ev_cls(event.EventLinkRequest)
|
||||
def link_request_handler(self, req):
|
||||
# LOG.debug(req)
|
||||
dpid = req.dpid
|
||||
|
||||
def get(app, dpid=None):
|
||||
if dpid is None:
|
||||
links = self.links
|
||||
else:
|
||||
links = [link for link in self.links if link.src.dpid == dpid]
|
||||
rep = event.EventLinkReply(req.src, dpid, links)
|
||||
if req.sync:
|
||||
self.send_reply(rep)
|
||||
else:
|
||||
self.send_event(req.src, rep)
|
||||
|
||||
|
||||
def get_switch(app, dpid=None):
|
||||
return app.send_request(event.EventSwitchRequest(dpid))
|
||||
|
||||
|
||||
def get_all(app):
|
||||
return get(app)
|
||||
def get_all_switch(app):
|
||||
return get_switch(app)
|
||||
|
||||
|
||||
def get_link(app, dpid=None):
|
||||
return app.send_request(event.EventLinkRequest(dpid))
|
||||
|
||||
|
||||
def get_all_link(app):
|
||||
return get_link(app)
|
||||
|
Loading…
Reference in New Issue
Block a user