replace dispatcher mechanism

This is purely internal change and no API for applications is
changed. At least, I confirmed that folsom OpenStack plugin works.

With the current dispatcher mechanism, multiple greenlets call
applications' handlers and might be blocked anywhere so we need
various locks to handle that concurrency. This makes things difficult
for application developers.

With this patch, each applications are connected with events. Each
application has the own greenlet(s) to handle events and might send
events to other applications.

If an application registers handlers for some OF events, it subscribes
to OF component and registers the OF events that it's interested. OF
application delivers such OF events to the application and the
application's greenlet executes the handlers.

With this, we can completely remove dispatcher.py and its friends.

Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
FUJITA Tomonori 2013-02-03 21:54:40 +09:00
parent 64e2f97fd9
commit 7578e7d602
6 changed files with 129 additions and 85 deletions

View File

@ -17,6 +17,9 @@
import inspect
import itertools
import logging
import gevent
from gevent.queue import Queue
from ryu import utils
from ryu.controller.handler import register_instance
@ -24,20 +27,11 @@ from ryu.controller.controller import Datapath
LOG = logging.getLogger('ryu.base.app_manager')
SERVICE_BRICKS = {}
class RyuAppContext(object):
"""
Base class for Ryu application context
"""
def __init__(self):
super(RyuAppContext, self).__init__()
def close(self):
"""
teardown method
The method name, close, is chosen for python context manager
"""
pass
def lookup_service_brick(name):
return SERVICE_BRICKS.get(name)
class RyuApp(object):
@ -55,6 +49,45 @@ class RyuApp(object):
def __init__(self, *_args, **_kwargs):
super(RyuApp, self).__init__()
self.name = self.__class__.__name__
self.event_handlers = {}
self.observers = {}
self.threads = []
self.events = Queue()
self.threads.append(gevent.spawn(self._event_loop))
def register_handler(self, ev_cls, handler):
assert callable(handler)
self.event_handlers.setdefault(ev_cls, [])
self.event_handlers[ev_cls].append(handler)
def register_observer(self, ev_cls, name):
self.observers.setdefault(ev_cls, [])
self.observers[ev_cls].append(name)
def get_handlers(self, ev):
return self.event_handlers.get(ev.__class__, [])
def get_observers(self, ev):
return self.observers.get(ev.__class__, [])
def _event_loop(self):
while True:
ev = self.events.get()
handlers = self.get_handlers(ev)
for handler in handlers:
handler(ev)
def _send_event(self, ev):
self.events.put(ev)
def send_event(self, name, ev):
if name in SERVICE_BRICKS:
SERVICE_BRICKS[name]._send_event(ev)
def send_event_to_observers(self, ev):
for observer in self.get_observers(ev):
self.send_event(observer, ev)
def close(self):
"""
@ -102,7 +135,12 @@ class AppManager(object):
def create_contexts(self):
for key, cls in self.contexts_cls.items():
self.contexts[key] = cls()
context = cls()
self.contexts[key] = context
# hack for dpset
if context.__class__.__base__ == RyuApp:
SERVICE_BRICKS[context.name] = context
register_instance(context)
return self.contexts
def instantiate_apps(self, *args, **kwargs):
@ -124,6 +162,15 @@ class AppManager(object):
app = cls(*args, **kwargs)
register_instance(app)
self.applications[app_name] = app
SERVICE_BRICKS[app.name] = app
for key, i in SERVICE_BRICKS.items():
for _k, m in inspect.getmembers(i, inspect.ismethod):
if hasattr(m, 'observer'):
name = m.observer.split('.')[-1]
if name in SERVICE_BRICKS:
brick = SERVICE_BRICKS[name]
brick.register_observer(m.ev_cls, i.name)
def close(self):
def close_all(close_dict):

View File

@ -25,6 +25,8 @@ import ssl
from gevent.server import StreamServer
from gevent.queue import Queue
import ryu.base.app_manager
from ryu.ofproto import ofproto_common
from ryu.ofproto import ofproto_parser
from ryu.ofproto import ofproto_v1_0
@ -35,7 +37,6 @@ from ryu.ofproto import ofproto_v1_3
from ryu.ofproto import ofproto_v1_3_parser
from ryu.ofproto import nx_match
from ryu.controller import dispatcher
from ryu.controller import handler
from ryu.controller import ofp_event
@ -123,25 +124,22 @@ class Datapath(object):
# prevent it from eating memory up
self.send_q = Queue(16)
# circular reference self.ev_q.aux == self
self.ev_q = dispatcher.EventQueue(handler.QUEUE_NAME_OFP_MSG,
handler.HANDSHAKE_DISPATCHER,
self)
self.set_version(max(self.supported_ofp_version))
self.xid = random.randint(0, self.ofproto.MAX_XID)
self.id = None # datapath_id is unknown yet
self.ports = None
self.flow_format = ofproto_v1_0.NXFF_OPENFLOW10
self.ofp_brick = ryu.base.app_manager.lookup_service_brick('ofp_event')
self.set_state(handler.HANDSHAKE_DISPATCHER)
def close(self):
"""
Call this before discarding this datapath object
The circular refernce as self.ev_q.aux == self must be broken.
"""
# tell this datapath is dead
self.ev_q.set_dispatcher(handler.DEAD_DISPATCHER)
self.ev_q.close()
self.set_state(handler.DEAD_DISPATCHER)
def set_state(self, state):
self.state = state
ev = ofp_event.EventOFPStateChange(self)
ev.state = state
self.ofp_brick.send_event_to_observers(ev)
def set_version(self, version):
assert version in self.supported_ofp_version
@ -169,7 +167,13 @@ class Datapath(object):
msg = ofproto_parser.msg(self,
version, msg_type, msg_len, xid, buf)
#LOG.debug('queue msg %s cls %s', msg, msg.__class__)
self.ev_q.queue(ofp_event.ofp_msg_to_ev(msg))
ev = ofp_event.ofp_msg_to_ev(msg)
handlers = self.ofp_brick.get_handlers(ev)
for handler in handlers:
if self.state in handler.dispatchers:
handler(ev)
self.ofp_brick.send_event_to_observers(ev)
buf = buf[required_len:]
required_len = ofproto_common.OFP_HEADER_SIZE
@ -219,10 +223,6 @@ class Datapath(object):
gevent.kill(send_thr)
gevent.joinall([send_thr])
def send_ev(self, ev):
#LOG.debug('send_ev %s', ev)
self.ev_q.queue(ev)
#
# Utility methods for convenience
#

View File

@ -16,8 +16,9 @@
import logging
from ryu.base import app_manager
from ryu.controller import event
from ryu.controller import dispatcher
from ryu.controller import ofp_event
from ryu.controller import dp_type
from ryu.controller import handler
from ryu.controller.handler import set_ev_cls
@ -25,9 +26,7 @@ from ryu.controller.handler import set_ev_cls
LOG = logging.getLogger('ryu.controller.dpset')
QUEUE_NAME_DPSET = 'dpset'
DISPATCHER_NAME_DPSET = 'dpset'
DPSET_EV_DISPATCHER = dispatcher.EventDispatcher(DISPATCHER_NAME_DPSET)
DPSET_EV_DISPATCHER = 'dpset'
class EventDPBase(event.EventBase):
@ -46,18 +45,16 @@ class EventDP(EventDPBase):
# this depends on controller::Datapath and dispatchers in handler
class DPSet(object):
class DPSet(app_manager.RyuApp):
def __init__(self):
super(DPSet, self).__init__()
self.name = 'dpset'
# dp registration and type setting can be occur in any order
# Sometimes the sw_type is set before dp connection
self.dp_types = {}
self.dps = {} # datapath_id => class Datapath
self.ev_q = dispatcher.EventQueue(QUEUE_NAME_DPSET,
DPSET_EV_DISPATCHER)
handler.register_instance(self)
def register(self, dp):
assert dp.id is not None
@ -68,7 +65,7 @@ class DPSet(object):
dp.dp_type = dp_type_
self.dps[dp.id] = dp
self.ev_q.queue(EventDP(dp, True))
self.send_event_to_observers(EventDP(dp, True))
def unregister(self, dp):
if dp.id in self.dps:
@ -76,7 +73,7 @@ class DPSet(object):
assert dp.id not in self.dp_types
self.dp_types[dp.id] = getattr(dp, 'dp_type', dp_type.UNKNOWN)
self.ev_q.queue(EventDP(dp, False))
self.send_event_to_observers(EventDP(dp, False))
def set_type(self, dp_id, dp_type_=dp_type.UNKNOWN):
if dp_id in self.dps:
@ -92,19 +89,14 @@ class DPSet(object):
def get_all(self):
return self.dps.items()
@set_ev_cls(dispatcher.EventDispatcherChange,
dispatcher.QUEUE_EV_DISPATCHER)
@set_ev_cls(ofp_event.EventOFPStateChange,
[handler.MAIN_DISPATCHER, handler.DEAD_DISPATCHER])
def dispacher_change(self, ev):
LOG.debug('dispatcher change q %s dispatcher %s',
ev.ev_q.name, ev.new_dispatcher.name)
if ev.ev_q.name != handler.QUEUE_NAME_OFP_MSG:
return
datapath = ev.ev_q.aux
datapath = ev.datapath
assert datapath is not None
if ev.new_dispatcher.name == handler.DISPATCHER_NAME_OFP_MAIN:
if ev.state == handler.MAIN_DISPATCHER:
LOG.debug('DPSET: register datapath %s', datapath)
self.register(datapath)
elif ev.new_dispatcher.name == handler.DISPATCHER_NAME_OFP_DEAD:
elif ev.state == handler.DEAD_DISPATCHER:
LOG.debug('DPSET: unregister datapath %s', datapath)
self.unregister(datapath)

View File

@ -14,36 +14,39 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import inspect
import logging
from ryu.controller import dispatcher
from ryu.controller import ofp_event
LOG = logging.getLogger('ryu.controller.handler')
QUEUE_NAME_OFP_MSG = 'ofp_msg'
DISPATCHER_NAME_OFP_HANDSHAKE = 'ofp_handshake'
HANDSHAKE_DISPATCHER = dispatcher.EventDispatcher(
DISPATCHER_NAME_OFP_HANDSHAKE)
DISPATCHER_NAME_OFP_CONFIG = 'ofp_config'
CONFIG_DISPATCHER = dispatcher.EventDispatcher(DISPATCHER_NAME_OFP_CONFIG)
DISPATCHER_NAME_OFP_MAIN = 'ofp_main'
MAIN_DISPATCHER = dispatcher.EventDispatcher(DISPATCHER_NAME_OFP_MAIN)
DISPATCHER_NAME_OFP_DEAD = 'ofp_dead'
DEAD_DISPATCHER = dispatcher.EventDispatcher(DISPATCHER_NAME_OFP_DEAD)
# just represent OF datapath state. datapath specific so should be moved.
HANDSHAKE_DISPATCHER = "handshake"
CONFIG_DISPATCHER = "config"
MAIN_DISPATCHER = "main"
DEAD_DISPATCHER = "dead"
# should be named something like 'observe_event'
def set_ev_cls(ev_cls, dispatchers):
def _set_ev_cls_dec(handler):
handler.ev_cls = ev_cls
handler.dispatchers = dispatchers
handler.dispatchers = _listify(dispatchers)
handler.observer = ev_cls.__module__
return handler
return _set_ev_cls_dec
def _is_ev_handler(meth):
def set_ev_handler(ev_cls, dispatchers):
def _set_ev_cls_dec(handler):
handler.ev_cls = ev_cls
handler.dispatchers = _listify(dispatchers)
return handler
return _set_ev_cls_dec
def _is_ev_cls(meth):
return hasattr(meth, 'ev_cls')
@ -58,12 +61,5 @@ def _listify(may_list):
def register_instance(i):
for _k, m in inspect.getmembers(i, inspect.ismethod):
# LOG.debug('instance %s k %s m %s', i, _k, m)
if not _is_ev_handler(m):
continue
_dispatchers = _listify(getattr(m, 'dispatchers', None))
# LOG.debug("_dispatchers %s", _dispatchers)
for d in _dispatchers:
# LOG.debug('register dispatcher %s ev %s k %s m %s',
# d.name, m.ev_cls, _k, m)
d.register_handler(m.ev_cls, m)
if _is_ev_cls(m):
i.register_handler(m.ev_cls, m)

View File

@ -73,3 +73,9 @@ _PARSER_MODULE_LIST = ['ryu.ofproto.ofproto_v1_0_parser',
for m in _PARSER_MODULE_LIST:
# print 'loading module %s' % m
_create_ofp_msg_ev_from_module(m)
class EventOFPStateChange(event.EventBase):
def __init__(self, dp):
super(EventOFPStateChange, self).__init__()
self.datapath = dp

View File

@ -16,10 +16,12 @@
import logging
import ryu.base.app_manager
from ryu import utils
from ryu.base import app_manager
from ryu.controller import handler
from ryu.controller import ofp_event
from ryu.controller.handler import set_ev_cls
from ryu.controller.handler import set_ev_cls, set_ev_handler
from ryu.controller.handler import HANDSHAKE_DISPATCHER, CONFIG_DISPATCHER,\
MAIN_DISPATCHER
@ -39,9 +41,10 @@ LOG = logging.getLogger('ryu.controller.ofp_handler')
# back Echo Reply message.
class OFPHandler(app_manager.RyuApp):
class OFPHandler(ryu.base.app_manager.RyuApp):
def __init__(self, *args, **kwargs):
super(OFPHandler, self).__init__(*args, **kwargs)
self.name = 'ofp_event'
@staticmethod
def hello_failed(datapath, error_desc):
@ -52,7 +55,7 @@ class OFPHandler(app_manager.RyuApp):
error_msg.data = error_desc
datapath.send_msg(error_msg)
@set_ev_cls(ofp_event.EventOFPHello, HANDSHAKE_DISPATCHER)
@set_ev_handler(ofp_event.EventOFPHello, HANDSHAKE_DISPATCHER)
def hello_handler(self, ev):
LOG.debug('hello ev %s', ev)
msg = ev.msg
@ -121,9 +124,9 @@ class OFPHandler(app_manager.RyuApp):
# now move on to config state
LOG.debug('move onto config mode')
datapath.ev_q.set_dispatcher(CONFIG_DISPATCHER)
datapath.set_state(CONFIG_DISPATCHER)
@set_ev_cls(ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER)
@set_ev_handler(ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER)
def switch_features_handler(self, ev):
msg = ev.msg
datapath = msg.datapath
@ -147,10 +150,10 @@ class OFPHandler(app_manager.RyuApp):
datapath.send_msg(set_config)
LOG.debug('move onto main mode')
ev.msg.datapath.ev_q.set_dispatcher(MAIN_DISPATCHER)
ev.msg.datapath.set_state(MAIN_DISPATCHER)
@set_ev_cls(ofp_event.EventOFPEchoRequest,
[HANDSHAKE_DISPATCHER, CONFIG_DISPATCHER, MAIN_DISPATCHER])
@set_ev_handler(ofp_event.EventOFPEchoRequest,
[HANDSHAKE_DISPATCHER, CONFIG_DISPATCHER, MAIN_DISPATCHER])
def echo_request_handler(self, ev):
msg = ev.msg
datapath = msg.datapath
@ -159,8 +162,8 @@ class OFPHandler(app_manager.RyuApp):
echo_reply.data = msg.data
datapath.send_msg(echo_reply)
@set_ev_cls(ofp_event.EventOFPErrorMsg,
[HANDSHAKE_DISPATCHER, CONFIG_DISPATCHER, MAIN_DISPATCHER])
@set_ev_handler(ofp_event.EventOFPErrorMsg,
[HANDSHAKE_DISPATCHER, CONFIG_DISPATCHER, MAIN_DISPATCHER])
def error_msg_handler(self, ev):
msg = ev.msg
LOG.debug('error msg ev %s type 0x%x code 0x%x %s',