base/app_manager: create/destroy RyuApp instances dynamically

allow RyuManager to create/destroy RyuApp instances dynamically
and register/unregister event observer dynamically.

Cc: yuta-hamada <yuta.hamada.z02@gimal.com>
Signed-off-by: Isaku Yamahata <yamahata@valinux.co.jp>
Signed-off-by: YAMAMOTO Takashi <yamamoto@valinux.co.jp>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
Isaku Yamahata 2013-11-22 16:45:59 +09:00 committed by FUJITA Tomonori
parent 2a10dfbd9c
commit e45f382e51

View File

@ -22,6 +22,7 @@ import sys
from ryu import utils from ryu import utils
from ryu.controller.handler import register_instance, get_dependent_services from ryu.controller.handler import register_instance, get_dependent_services
from ryu.controller.controller import Datapath from ryu.controller.controller import Datapath
from ryu.controller import event
from ryu.controller.event import EventRequestBase, EventReplyBase from ryu.controller.event import EventRequestBase, EventReplyBase
from ryu.lib import hub from ryu.lib import hub
@ -41,6 +42,10 @@ def register_app(app):
register_instance(app) register_instance(app)
def unregister_app(app):
SERVICE_BRICKS.pop(app.name)
class RyuApp(object): class RyuApp(object):
""" """
Base class for Ryu network application Base class for Ryu network application
@ -65,12 +70,23 @@ class RyuApp(object):
self.replies = hub.Queue() self.replies = hub.Queue()
self.logger = logging.getLogger(self.name) self.logger = logging.getLogger(self.name)
# prevent accidental creation of instances of this class outside RyuApp
class _EventThreadStop(event.EventBase):
pass
self._event_stop = _EventThreadStop()
self.is_active = True
def start(self): def start(self):
""" """
Hook that is called after startup initialization is done. Hook that is called after startup initialization is done.
""" """
self.threads.append(hub.spawn(self._event_loop)) self.threads.append(hub.spawn(self._event_loop))
def stop(self):
self.is_active = False
self._send_event(self._event_stop, None)
hub.joinall(self.threads)
def register_handler(self, ev_cls, handler): def register_handler(self, ev_cls, handler):
assert callable(handler) assert callable(handler)
self.event_handlers.setdefault(ev_cls, []) self.event_handlers.setdefault(ev_cls, [])
@ -81,6 +97,14 @@ class RyuApp(object):
ev_cls_observers = self.observers.setdefault(ev_cls, {}) ev_cls_observers = self.observers.setdefault(ev_cls, {})
ev_cls_observers.setdefault(name, set()).update(states) ev_cls_observers.setdefault(name, set()).update(states)
def unregister_observer(self, ev_cls, name):
observers = self.observers.get(ev_cls, {})
observers.pop(name)
def unregister_observer_all_event(self, name):
for observers in self.observers.values():
observers.pop(name, None)
def get_handlers(self, ev, state=None): def get_handlers(self, ev, state=None):
handlers = self.event_handlers.get(ev.__class__, []) handlers = self.event_handlers.get(ev.__class__, [])
if state is None: if state is None:
@ -109,8 +133,10 @@ class RyuApp(object):
return self.replies.get() return self.replies.get()
def _event_loop(self): def _event_loop(self):
while True: while self.is_active or not self.events.empty():
ev, state = self.events.get() ev, state = self.events.get()
if ev == self._event_stop:
continue
handlers = self.get_handlers(ev, state) handlers = self.get_handlers(ev, state)
for handler in handlers: for handler in handlers:
handler(ev) handler(ev)
@ -210,26 +236,7 @@ class AppManager(object):
register_app(context) register_app(context)
return self.contexts return self.contexts
def instantiate_apps(self, *args, **kwargs): def _update_bricks(self):
for app_name, cls in self.applications_cls.items():
# for now, only single instance of a given module
# Do we need to support multiple instances?
# Yes, maybe for slicing.
LOG.info('instantiating app %s', app_name)
if hasattr(cls, 'OFP_VERSIONS'):
for k in Datapath.supported_ofp_version.keys():
if not k in cls.OFP_VERSIONS:
del Datapath.supported_ofp_version[k]
assert len(Datapath.supported_ofp_version), \
'No OpenFlow version is available'
assert app_name not in self.applications
app = cls(*args, **kwargs)
register_app(app)
self.applications[app_name] = app
for i in SERVICE_BRICKS.values(): for i in SERVICE_BRICKS.values():
for _k, m in inspect.getmembers(i, inspect.ismethod): for _k, m in inspect.getmembers(i, inspect.ismethod):
if not hasattr(m, 'observer'): if not hasattr(m, 'observer'):
@ -247,22 +254,78 @@ class AppManager(object):
brick.register_observer(m.ev_cls, i.name, brick.register_observer(m.ev_cls, i.name,
m.dispatchers) m.dispatchers)
@staticmethod
def _report_brick(name, app):
LOG.debug("BRICK %s" % name)
for ev_cls, list_ in app.observers.items():
LOG.debug(" PROVIDES %s TO %s" % (ev_cls.__name__, list_))
for ev_cls in app.event_handlers.keys():
LOG.debug(" CONSUMES %s" % (ev_cls.__name__,))
@staticmethod
def report_bricks():
for brick, i in SERVICE_BRICKS.items(): for brick, i in SERVICE_BRICKS.items():
LOG.debug("BRICK %s" % brick) AppManager._report_brick(brick, i)
for ev_cls, list in i.observers.items():
LOG.debug(" PROVIDES %s TO %s" % (ev_cls.__name__, list)) def _instantiate(self, app_name, cls, *args, **kwargs):
for ev_cls in i.event_handlers.keys(): # for now, only single instance of a given module
LOG.debug(" CONSUMES %s" % (ev_cls.__name__,)) # Do we need to support multiple instances?
# Yes, maybe for slicing.
LOG.info('instantiating app %s of %s', app_name, cls.__name__)
if hasattr(cls, 'OFP_VERSIONS'):
for k in Datapath.supported_ofp_version.keys():
if not k in cls.OFP_VERSIONS:
del Datapath.supported_ofp_version[k]
assert len(Datapath.supported_ofp_version), \
'No OpenFlow version is available'
if app_name is not None:
assert app_name not in self.applications
app = cls(*args, **kwargs)
register_app(app)
assert app.name not in self.applications
self.applications[app.name] = app
return app
def instantiate(self, cls, *args, **kwargs):
app = self._instantiate(None, cls, *args, **kwargs)
self._update_bricks()
self._report_brick(app.name, app)
return app
def instantiate_apps(self, *args, **kwargs):
for app_name, cls in self.applications_cls.items():
self._instantiate(app_name, cls, *args, **kwargs)
self._update_bricks()
self.report_bricks()
for app in self.applications.values(): for app in self.applications.values():
app.start() app.start()
@staticmethod
def _close(app):
close_method = getattr(app, 'close', None)
if callable(close_method):
close_method()
def uninstantiate(self, name):
app = self.applications.pop(name)
unregister_app(app)
for app_ in SERVICE_BRICKS.values():
app_.unregister_observer_all_event(name)
app.stop()
self._close(app)
events = app.events
if not events.empty():
app.logger.debug('%s events remians %d', app.name, events.qsize())
def close(self): def close(self):
def close_all(close_dict): def close_all(close_dict):
for app in close_dict.values(): for app in close_dict.values():
close_method = getattr(app, 'close', None) self._close(app)
if callable(close_method):
close_method()
close_dict.clear() close_dict.clear()
close_all(self.applications) close_all(self.applications)