From e45f382e51c9a3d57a6d8c01cec9e70f7ca364fd Mon Sep 17 00:00:00 2001 From: Isaku Yamahata Date: Fri, 22 Nov 2013 16:45:59 +0900 Subject: [PATCH] base/app_manager: create/destroy RyuApp instances dynamically allow RyuManager to create/destroy RyuApp instances dynamically and register/unregister event observer dynamically. Cc: yuta-hamada Signed-off-by: Isaku Yamahata Signed-off-by: YAMAMOTO Takashi Signed-off-by: FUJITA Tomonori --- ryu/base/app_manager.py | 121 ++++++++++++++++++++++++++++++---------- 1 file changed, 92 insertions(+), 29 deletions(-) diff --git a/ryu/base/app_manager.py b/ryu/base/app_manager.py index 883ca49c..1cd7e9f6 100644 --- a/ryu/base/app_manager.py +++ b/ryu/base/app_manager.py @@ -22,6 +22,7 @@ import sys from ryu import utils from ryu.controller.handler import register_instance, get_dependent_services from ryu.controller.controller import Datapath +from ryu.controller import event from ryu.controller.event import EventRequestBase, EventReplyBase from ryu.lib import hub @@ -41,6 +42,10 @@ def register_app(app): register_instance(app) +def unregister_app(app): + SERVICE_BRICKS.pop(app.name) + + class RyuApp(object): """ Base class for Ryu network application @@ -65,12 +70,23 @@ class RyuApp(object): self.replies = hub.Queue() self.logger = logging.getLogger(self.name) + # prevent accidental creation of instances of this class outside RyuApp + class _EventThreadStop(event.EventBase): + pass + self._event_stop = _EventThreadStop() + self.is_active = True + def start(self): """ Hook that is called after startup initialization is done. """ self.threads.append(hub.spawn(self._event_loop)) + def stop(self): + self.is_active = False + self._send_event(self._event_stop, None) + hub.joinall(self.threads) + def register_handler(self, ev_cls, handler): assert callable(handler) self.event_handlers.setdefault(ev_cls, []) @@ -81,6 +97,14 @@ class RyuApp(object): ev_cls_observers = self.observers.setdefault(ev_cls, {}) ev_cls_observers.setdefault(name, set()).update(states) + def unregister_observer(self, ev_cls, name): + observers = self.observers.get(ev_cls, {}) + observers.pop(name) + + def unregister_observer_all_event(self, name): + for observers in self.observers.values(): + observers.pop(name, None) + def get_handlers(self, ev, state=None): handlers = self.event_handlers.get(ev.__class__, []) if state is None: @@ -109,8 +133,10 @@ class RyuApp(object): return self.replies.get() def _event_loop(self): - while True: + while self.is_active or not self.events.empty(): ev, state = self.events.get() + if ev == self._event_stop: + continue handlers = self.get_handlers(ev, state) for handler in handlers: handler(ev) @@ -210,26 +236,7 @@ class AppManager(object): register_app(context) return self.contexts - def instantiate_apps(self, *args, **kwargs): - for app_name, cls in self.applications_cls.items(): - # for now, only single instance of a given module - # Do we need to support multiple instances? - # Yes, maybe for slicing. - LOG.info('instantiating app %s', app_name) - - if hasattr(cls, 'OFP_VERSIONS'): - for k in Datapath.supported_ofp_version.keys(): - if not k in cls.OFP_VERSIONS: - del Datapath.supported_ofp_version[k] - - assert len(Datapath.supported_ofp_version), \ - 'No OpenFlow version is available' - - assert app_name not in self.applications - app = cls(*args, **kwargs) - register_app(app) - self.applications[app_name] = app - + def _update_bricks(self): for i in SERVICE_BRICKS.values(): for _k, m in inspect.getmembers(i, inspect.ismethod): if not hasattr(m, 'observer'): @@ -247,22 +254,78 @@ class AppManager(object): brick.register_observer(m.ev_cls, i.name, m.dispatchers) + @staticmethod + def _report_brick(name, app): + LOG.debug("BRICK %s" % name) + for ev_cls, list_ in app.observers.items(): + LOG.debug(" PROVIDES %s TO %s" % (ev_cls.__name__, list_)) + for ev_cls in app.event_handlers.keys(): + LOG.debug(" CONSUMES %s" % (ev_cls.__name__,)) + + @staticmethod + def report_bricks(): for brick, i in SERVICE_BRICKS.items(): - LOG.debug("BRICK %s" % brick) - for ev_cls, list in i.observers.items(): - LOG.debug(" PROVIDES %s TO %s" % (ev_cls.__name__, list)) - for ev_cls in i.event_handlers.keys(): - LOG.debug(" CONSUMES %s" % (ev_cls.__name__,)) + AppManager._report_brick(brick, i) + + def _instantiate(self, app_name, cls, *args, **kwargs): + # for now, only single instance of a given module + # Do we need to support multiple instances? + # Yes, maybe for slicing. + LOG.info('instantiating app %s of %s', app_name, cls.__name__) + + if hasattr(cls, 'OFP_VERSIONS'): + for k in Datapath.supported_ofp_version.keys(): + if not k in cls.OFP_VERSIONS: + del Datapath.supported_ofp_version[k] + + assert len(Datapath.supported_ofp_version), \ + 'No OpenFlow version is available' + + if app_name is not None: + assert app_name not in self.applications + app = cls(*args, **kwargs) + register_app(app) + assert app.name not in self.applications + self.applications[app.name] = app + return app + + def instantiate(self, cls, *args, **kwargs): + app = self._instantiate(None, cls, *args, **kwargs) + self._update_bricks() + self._report_brick(app.name, app) + return app + + def instantiate_apps(self, *args, **kwargs): + for app_name, cls in self.applications_cls.items(): + self._instantiate(app_name, cls, *args, **kwargs) + + self._update_bricks() + self.report_bricks() for app in self.applications.values(): app.start() + @staticmethod + def _close(app): + close_method = getattr(app, 'close', None) + if callable(close_method): + close_method() + + def uninstantiate(self, name): + app = self.applications.pop(name) + unregister_app(app) + for app_ in SERVICE_BRICKS.values(): + app_.unregister_observer_all_event(name) + app.stop() + self._close(app) + events = app.events + if not events.empty(): + app.logger.debug('%s events remians %d', app.name, events.qsize()) + def close(self): def close_all(close_dict): for app in close_dict.values(): - close_method = getattr(app, 'close', None) - if callable(close_method): - close_method() + self._close(app) close_dict.clear() close_all(self.applications)