Separate db_change_logic loop from nb_api
To be able to handle pubsub events (namely through db_change_callback) the nb_api forced the user to call the process_changes() method which is a blocking loop. This is unusable for applications that do other things or have event loops of their own. In this patch this logic was moved to the local_controller as it is specific to this use-case, and now the nb_api exposes a register_db_change_callback method for this use case. As a result of this change, there is no need to use the is_external_app flag, as all it does is makes sure the subscriber is not initialized, which happens now if the register_db_change_callback is not called. Change-Id: I2b02fde8018f0ac209175beadf75bbb20a6480d6
This commit is contained in:
parent
f414212d79
commit
d1050491cd
@ -179,7 +179,7 @@ def add_object_from_json(json_str, table):
|
||||
:param table: table name where object should be added
|
||||
:return: None
|
||||
"""
|
||||
nb_api = api_nb.NbApi.get_instance(False, True)
|
||||
nb_api = api_nb.NbApi.get_instance(False)
|
||||
try:
|
||||
model = model_framework.get_model(table)
|
||||
except KeyError:
|
||||
|
@ -279,7 +279,7 @@ def start(is_service):
|
||||
"""main method"""
|
||||
df_config.init(sys.argv)
|
||||
df_utils.config_parse()
|
||||
nb_api = api_nb.NbApi.get_instance(False, True)
|
||||
nb_api = api_nb.NbApi.get_instance(False)
|
||||
if is_service:
|
||||
df_service.register_service('df-skydive-service', nb_api)
|
||||
service_manager = cotyledon.ServiceManager()
|
||||
|
@ -81,7 +81,7 @@ def main():
|
||||
config.init(sys.argv[1:])
|
||||
config.setup_logging()
|
||||
environment_setup()
|
||||
nb_api = api_nb.NbApi.get_instance(False, True)
|
||||
nb_api = api_nb.NbApi.get_instance(False)
|
||||
service_instance = metadata_service.DFMetadataProxyHandler(
|
||||
cfg.CONF, nb_api)
|
||||
df_service.register_service('df-metadata-service', nb_api)
|
||||
|
@ -165,7 +165,7 @@ class BGPService(service.Service):
|
||||
|
||||
def main():
|
||||
df_config.init(sys.argv)
|
||||
nb_api = api_nb.NbApi.get_instance(False, True)
|
||||
nb_api = api_nb.NbApi.get_instance(False)
|
||||
server = BGPService(nb_api)
|
||||
df_service.register_service('df-bgp-service', nb_api)
|
||||
service.launch(cfg.CONF, server).wait()
|
||||
|
@ -14,7 +14,9 @@
|
||||
# under the License.
|
||||
|
||||
import sys
|
||||
import time
|
||||
|
||||
from eventlet import queue
|
||||
from oslo_log import log
|
||||
from oslo_service import loopingcall
|
||||
from ryu.app.ofctl import service as of_service
|
||||
@ -47,9 +49,11 @@ class DfLocalController(object):
|
||||
|
||||
def __init__(self, chassis_name, nb_api):
|
||||
self.db_store = db_store.get_instance()
|
||||
self._queue = queue.PriorityQueue()
|
||||
|
||||
self.chassis_name = chassis_name
|
||||
self.nb_api = nb_api
|
||||
self.nb_api.set_db_change_callback(self.db_change_callback)
|
||||
self.ip = cfg.CONF.df.local_ip
|
||||
# Virtual tunnel port support multiple tunnel types together
|
||||
self.tunnel_types = cfg.CONF.df.tunnel_types
|
||||
@ -67,6 +71,7 @@ class DfLocalController(object):
|
||||
nb_api=self.nb_api,
|
||||
vswitch_api=self.vswitch_api,
|
||||
neutron_server_notifier=self.neutron_notifier,
|
||||
db_change_callback=self.db_change_callback
|
||||
)
|
||||
# The OfctlService is needed to support the 'get_flows' method
|
||||
self.open_flow_service = app_mgr.instantiate(of_service.OfctlService)
|
||||
@ -85,8 +90,21 @@ class DfLocalController(object):
|
||||
self.sync_rate_limiter = df_utils.RateLimiter(
|
||||
max_rate=1, time_unit=db_common.DB_SYNC_MINIMUM_INTERVAL)
|
||||
|
||||
def db_change_callback(self, table, key, action, value, topic=None):
|
||||
update = db_common.DbUpdate(table, key, action, value, topic=topic)
|
||||
LOG.debug("Pushing Update to Queue: %s", update)
|
||||
self._queue.put(update)
|
||||
time.sleep(0)
|
||||
|
||||
def process_changes(self):
|
||||
while True:
|
||||
next_update = self._queue.get(block=True)
|
||||
LOG.debug("Event update: %s", next_update)
|
||||
self.nb_api._notification_cb(next_update)
|
||||
self._queue.task_done()
|
||||
|
||||
def run(self):
|
||||
self.vswitch_api.initialize(self.nb_api)
|
||||
self.vswitch_api.initialize(self.db_change_callback)
|
||||
self.nb_api.register_notification_callback(self._handle_update)
|
||||
if cfg.CONF.df.enable_neutron_notifier:
|
||||
self.neutron_notifier.initialize(nb_api=self.nb_api,
|
||||
@ -118,10 +136,10 @@ class DfLocalController(object):
|
||||
self._register_models()
|
||||
self.register_chassis()
|
||||
self.sync()
|
||||
self.nb_api.process_changes()
|
||||
self.process_changes()
|
||||
|
||||
def _submit_sync_event(self):
|
||||
self.nb_api.db_change_callback(None, None,
|
||||
self.db_change_callback(None, None,
|
||||
ctrl_const.CONTROLLER_SYNC, None)
|
||||
|
||||
def _register_models(self):
|
||||
@ -300,7 +318,7 @@ class DfLocalController(object):
|
||||
action = update.action
|
||||
if action == ctrl_const.CONTROLLER_REINITIALIZE:
|
||||
self.db_store.clear()
|
||||
self.vswitch_api.initialize(self.nb_api)
|
||||
self.vswitch_api.initialize(self.db_change_callback)
|
||||
self.sync()
|
||||
elif action == ctrl_const.CONTROLLER_SYNC:
|
||||
self.sync()
|
||||
|
@ -147,7 +147,10 @@ class PublisherService(object):
|
||||
|
||||
def main():
|
||||
df_config.init(sys.argv)
|
||||
nb_api = api_nb.NbApi.get_instance(False, True)
|
||||
# PATCH(snapiri): Disable pub_sub as it creates a publisher in nb_api
|
||||
# which collides with the publisher we create here.
|
||||
cfg.CONF.set_override('enable_df_pub_sub', False, group='df')
|
||||
nb_api = api_nb.NbApi.get_instance(False)
|
||||
service = PublisherService(nb_api)
|
||||
df_service.register_service('df-publisher-service', nb_api)
|
||||
service.initialize()
|
||||
|
@ -38,6 +38,7 @@ class RyuDFAdapter(ofp_handler.OFPHandler):
|
||||
OF_AUTO_PORT_DESC_STATS_REQ_VER = 0x04
|
||||
|
||||
def __init__(self, vswitch_api, nb_api,
|
||||
db_change_callback,
|
||||
neutron_server_notifier=None):
|
||||
super(RyuDFAdapter, self).__init__()
|
||||
self.dispatcher = dispatcher.AppDispatcher(cfg.CONF.df.apps_list)
|
||||
@ -47,6 +48,7 @@ class RyuDFAdapter(ofp_handler.OFPHandler):
|
||||
self._datapath = None
|
||||
self.table_handlers = {}
|
||||
self.first_connect = True
|
||||
self.db_change_callback = db_change_callback
|
||||
|
||||
@property
|
||||
def datapath(self):
|
||||
@ -111,11 +113,11 @@ class RyuDFAdapter(ofp_handler.OFPHandler):
|
||||
if not self.first_connect:
|
||||
# For reconnecting to the ryu controller, df needs a full sync
|
||||
# in case any resource added during the disconnection.
|
||||
self.nb_api.db_change_callback(None, None,
|
||||
self.db_change_callback(None, None,
|
||||
constants.CONTROLLER_REINITIALIZE,
|
||||
None)
|
||||
self.first_connect = False
|
||||
self.vswitch_api.initialize(self.nb_api)
|
||||
self.vswitch_api.initialize(self.db_change_callback)
|
||||
|
||||
def _send_port_desc_stats_request(self, datapath):
|
||||
ofp_parser = datapath.ofproto_parser
|
||||
|
@ -17,7 +17,6 @@
|
||||
import time
|
||||
import traceback
|
||||
|
||||
from eventlet import queue
|
||||
from jsonmodels import errors
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
@ -60,7 +59,6 @@ class NbApi(object):
|
||||
super(NbApi, self).__init__()
|
||||
self.driver = db_driver
|
||||
self.controller = None
|
||||
self._queue = queue.PriorityQueue()
|
||||
self.use_pubsub = use_pubsub
|
||||
self.publisher = None
|
||||
self.subscriber = None
|
||||
@ -73,7 +71,7 @@ class NbApi(object):
|
||||
self.pub_sub_use_multiproc = cfg.CONF.df.pub_sub_use_multiproc
|
||||
|
||||
@staticmethod
|
||||
def get_instance(is_neutron_server, is_external_app=False):
|
||||
def get_instance(is_neutron_server):
|
||||
global _nb_api
|
||||
if _nb_api is None:
|
||||
nb_driver = df_utils.load_driver(
|
||||
@ -82,18 +80,16 @@ class NbApi(object):
|
||||
# Do not use pubsub for external apps - this causes issues with
|
||||
# threads and other issues.
|
||||
use_pubsub = cfg.CONF.df.enable_df_pub_sub
|
||||
if is_external_app:
|
||||
use_pubsub = False
|
||||
nb_api = NbApi(
|
||||
nb_driver,
|
||||
use_pubsub=use_pubsub,
|
||||
is_neutron_server=is_neutron_server)
|
||||
ip, port = get_db_ip_port()
|
||||
nb_api.initialize(db_ip=ip, db_port=port)
|
||||
nb_api._initialize(db_ip=ip, db_port=port)
|
||||
_nb_api = nb_api
|
||||
return _nb_api
|
||||
|
||||
def initialize(self, db_ip='127.0.0.1', db_port=4001):
|
||||
def _initialize(self, db_ip='127.0.0.1', db_port=4001):
|
||||
self.driver.initialize(db_ip, db_port, config=cfg.CONF.df)
|
||||
if self.use_pubsub:
|
||||
self.publisher = self._get_publisher()
|
||||
@ -113,14 +109,19 @@ class NbApi(object):
|
||||
if "active_port_detection" in cfg.CONF.df.apps_list:
|
||||
self.publisher.initialize()
|
||||
|
||||
def set_db_change_callback(self, db_change_callback):
|
||||
if self.use_pubsub and not self.is_neutron_server:
|
||||
# NOTE(gampel) we want to start queuing event as soon
|
||||
# as possible
|
||||
self._start_subscriber()
|
||||
if not self.subscriber.is_running:
|
||||
self._start_subscriber(db_change_callback)
|
||||
# Register for DB Failover detection in NB Plugin
|
||||
self.subscriber.set_subscriber_for_failover(
|
||||
self.subscriber,
|
||||
self.db_change_callback)
|
||||
db_change_callback)
|
||||
self.subscriber.register_hamsg_for_db()
|
||||
else:
|
||||
LOG.warning('Subscriber is already initialized, ignoring call')
|
||||
|
||||
def close(self):
|
||||
if self.publisher:
|
||||
@ -152,8 +153,8 @@ class NbApi(object):
|
||||
df_utils.DF_PUBSUB_DRIVER_NAMESPACE)
|
||||
return pub_sub_driver.get_subscriber()
|
||||
|
||||
def _start_subscriber(self):
|
||||
self.subscriber.initialize(self.db_change_callback)
|
||||
def _start_subscriber(self, db_change_callback):
|
||||
self.subscriber.initialize(db_change_callback)
|
||||
self.subscriber.register_topic(db_common.SEND_ALL_TOPIC)
|
||||
publishers_ips = cfg.CONF.df.publishers_ips
|
||||
uris = {'%s://%s:%s' % (
|
||||
@ -200,19 +201,6 @@ class NbApi(object):
|
||||
self.subscriber.register_topic(topic)
|
||||
self.subscriber.daemonize()
|
||||
|
||||
def db_change_callback(self, table, key, action, value, topic=None):
|
||||
update = db_common.DbUpdate(table, key, action, value, topic=topic)
|
||||
LOG.debug("Pushing Update to Queue: %s", update)
|
||||
self._queue.put(update)
|
||||
time.sleep(0)
|
||||
|
||||
def process_changes(self):
|
||||
while True:
|
||||
next_update = self._queue.get(block=True)
|
||||
LOG.debug("Event update: %s", next_update)
|
||||
self._notification_cb(next_update)
|
||||
self._queue.task_done()
|
||||
|
||||
def create(self, obj, skip_send_event=False):
|
||||
"""Create the provided object in the database and publish an event
|
||||
about its creation.
|
||||
|
@ -96,9 +96,9 @@ def _is_ovsport_update_valid(action, ovsport):
|
||||
|
||||
|
||||
class DFIdl(idl.Idl):
|
||||
def __init__(self, nb_api, remote, schema):
|
||||
def __init__(self, remote, schema, db_change_callback):
|
||||
super(DFIdl, self).__init__(remote, schema)
|
||||
self.nb_api = nb_api
|
||||
self.db_change_callback = db_change_callback
|
||||
|
||||
def notify(self, event, row, updates=None):
|
||||
if not row or not hasattr(row, '_table'):
|
||||
@ -109,7 +109,7 @@ class DFIdl(idl.Idl):
|
||||
local_interface = ovs.OvsPort.from_idl_row(row)
|
||||
action = event if event != 'update' else 'set'
|
||||
if _is_ovsport_update_valid(action, local_interface):
|
||||
self.nb_api.db_change_callback(
|
||||
self.db_change_callback(
|
||||
local_interface.table_name,
|
||||
local_interface.id,
|
||||
action,
|
||||
@ -117,7 +117,8 @@ class DFIdl(idl.Idl):
|
||||
)
|
||||
|
||||
|
||||
def df_idl_from_server(nb_api, connection_string, schema_name):
|
||||
def df_idl_from_server(connection_string, schema_name,
|
||||
db_change_callback):
|
||||
"""Create the Idl instance by pulling the schema from OVSDB server"""
|
||||
helper = idlutils.get_schema_helper(connection_string, schema_name)
|
||||
tables = ovsdb_monitor_table_filter_default
|
||||
@ -126,7 +127,7 @@ def df_idl_from_server(nb_api, connection_string, schema_name):
|
||||
helper.register_table(table_name)
|
||||
else:
|
||||
helper.register_columns(table_name, columns)
|
||||
return DFIdl(nb_api, connection_string, helper)
|
||||
return DFIdl(connection_string, helper, db_change_callback)
|
||||
|
||||
|
||||
class DFOvsdbApi(impl_idl.OvsdbIdl):
|
||||
@ -136,8 +137,9 @@ class DFOvsdbApi(impl_idl.OvsdbIdl):
|
||||
class OvsdbIdl has defined lots of command. Dragonflow can use
|
||||
them. And Dragonflow can extend its own commands in this class.
|
||||
"""
|
||||
def __init__(self, nb_api, db_connection, timeout):
|
||||
idl = df_idl_from_server(nb_api, db_connection, 'Open_vSwitch')
|
||||
def __init__(self, db_connection, timeout, db_change_callback):
|
||||
idl = df_idl_from_server(db_connection, 'Open_vSwitch',
|
||||
db_change_callback)
|
||||
type(self).ovsdb_connection = None
|
||||
ovsdb_connection = connection.Connection(idl, timeout)
|
||||
super(DFOvsdbApi, self).__init__(ovsdb_connection)
|
||||
|
@ -54,16 +54,17 @@ class OvsApi(object):
|
||||
else:
|
||||
vlog.Vlog.init()
|
||||
|
||||
def initialize(self, nb_api):
|
||||
def initialize(self, db_change_callback):
|
||||
db_connection = ('%s:%s:%s' % (self.protocol, self.ip, self.port))
|
||||
|
||||
nb_api.db_change_callback(None, None,
|
||||
db_change_callback(None, None,
|
||||
constants.CONTROLLER_OVS_SYNC_STARTED, None)
|
||||
|
||||
self.ovsdb = impl_idl.DFOvsdbApi(
|
||||
nb_api, db_connection, self.vsctl_timeout)
|
||||
db_connection, self.vsctl_timeout,
|
||||
db_change_callback)
|
||||
|
||||
nb_api.db_change_callback(None, None,
|
||||
db_change_callback(None, None,
|
||||
constants.CONTROLLER_OVS_SYNC_FINISHED, None)
|
||||
|
||||
def _db_get_val(self, table, record, column, check_error=False,
|
||||
|
@ -13,7 +13,9 @@
|
||||
import os
|
||||
import random
|
||||
import string
|
||||
import time
|
||||
|
||||
from eventlet import queue
|
||||
from oslo_log import log
|
||||
|
||||
from dragonflow.common import utils as df_utils
|
||||
@ -57,14 +59,19 @@ class DFTestBase(base.BaseTestCase):
|
||||
self.conf = cfg.CONF.df
|
||||
self.integration_bridge = self.conf.integration_bridge
|
||||
|
||||
self._queue = queue.PriorityQueue()
|
||||
self.nb_api = api_nb.NbApi.get_instance(False)
|
||||
# As we are running in the same process over and over,
|
||||
# do not perform redundant calls to the subscriber
|
||||
if not self.nb_api.subscriber.is_running:
|
||||
self.nb_api.set_db_change_callback(self._db_change_callback)
|
||||
|
||||
self.mgt_ip = self.conf.management_ip
|
||||
self.__objects_to_close = []
|
||||
self.addCleanup(self._close_stored_objects)
|
||||
|
||||
self.vswitch_api = utils.OvsTestApi(self.mgt_ip)
|
||||
self.vswitch_api.initialize(self.nb_api)
|
||||
self.vswitch_api.initialize(self._db_change_callback)
|
||||
|
||||
if cfg.CONF.df.enable_selective_topology_distribution:
|
||||
self.start_subscribing()
|
||||
@ -73,6 +80,11 @@ class DFTestBase(base.BaseTestCase):
|
||||
self._publish_log_event('started')
|
||||
self.addCleanup(self._publish_log_event, 'finished')
|
||||
|
||||
def _db_change_callback(self, table, key, action, value, topic=None):
|
||||
update = db_common.DbUpdate(table, key, action, value, topic=topic)
|
||||
self._queue.put(update)
|
||||
time.sleep(0)
|
||||
|
||||
def _publish_log_event(self, event):
|
||||
global _publisher
|
||||
if _publisher is None:
|
||||
|
@ -64,22 +64,22 @@ class TestOvsdbMonitor(test_base.DFTestBase):
|
||||
return True
|
||||
|
||||
def _get_wanted_vm_online(self, mac):
|
||||
while self.nb_api._queue.qsize() > 0:
|
||||
self.next_update = self.nb_api._queue.get()
|
||||
while self._queue.qsize() > 0:
|
||||
self.next_update = self._queue.get()
|
||||
if self._check_wanted_vm_online(self.next_update, mac):
|
||||
return True
|
||||
return False
|
||||
|
||||
def _get_wanted_vm_offline(self, mac):
|
||||
while self.nb_api._queue.qsize() > 0:
|
||||
self.next_update = self.nb_api._queue.get()
|
||||
while self._queue.qsize() > 0:
|
||||
self.next_update = self._queue.get()
|
||||
if self._check_wanted_vm_offline(self.next_update, mac):
|
||||
return True
|
||||
return False
|
||||
|
||||
def _get_all_wanted_vms_online(self, mac1, mac2):
|
||||
while self.nb_api._queue.qsize() > 0:
|
||||
self.next_update = self.nb_api._queue.get()
|
||||
while self._queue.qsize() > 0:
|
||||
self.next_update = self._queue.get()
|
||||
if self._check_wanted_vm_online(self.next_update, mac1):
|
||||
self.set_wanted_vms.add(mac1)
|
||||
if len(self.set_wanted_vms) == 2:
|
||||
@ -93,7 +93,7 @@ class TestOvsdbMonitor(test_base.DFTestBase):
|
||||
return False
|
||||
|
||||
def test_notify_message(self):
|
||||
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
network = objects.NetworkTestObj(self.neutron, self.nb_api)
|
||||
network_id = network.create()
|
||||
subnet = self.store(objects.SubnetTestObj(self.neutron, self.nb_api,
|
||||
network_id))
|
||||
|
@ -374,7 +374,7 @@ class TestDbTableMonitors(PubSubTestBase):
|
||||
self.namespace.events = []
|
||||
self.namespace.has_values = False
|
||||
self.publisher = self._get_server_publisher()
|
||||
self.subscriber = self._get_subscriber(self._db_change_callback)
|
||||
self.subscriber = self._get_subscriber(self._pubsub_change_callback)
|
||||
self.monitor = self._create_monitor('chassis')
|
||||
|
||||
def tearDown(self):
|
||||
@ -384,7 +384,7 @@ class TestDbTableMonitors(PubSubTestBase):
|
||||
self._stop_publisher(self.publisher)
|
||||
super(TestDbTableMonitors, self).tearDown()
|
||||
|
||||
def _db_change_callback(self, table, key, action, value, topic):
|
||||
def _pubsub_change_callback(self, table, key, action, value, topic):
|
||||
self.namespace.events.append({
|
||||
'table': table,
|
||||
'key': key,
|
||||
|
@ -33,16 +33,18 @@ class TestRyuBaseApp(test_base.DFTestBase):
|
||||
ryu_cfg.CONF.ofp_listen_host = cfg.CONF.df_ryu.of_listen_address
|
||||
ryu_cfg.CONF.ofp_tcp_listen_port = cfg.CONF.df_ryu.of_listen_port + 1
|
||||
app_mgr = app_manager.AppManager.get_instance()
|
||||
self.open_flow_app = app_mgr.instantiate(ryu_base_app.RyuDFAdapter,
|
||||
self.open_flow_app = app_mgr.instantiate(
|
||||
ryu_base_app.RyuDFAdapter,
|
||||
vswitch_api=mock.Mock(),
|
||||
nb_api=mock.Mock())
|
||||
nb_api=mock.Mock(),
|
||||
db_change_callback=self._db_change_callback)
|
||||
self.open_flow_app.load = mock.Mock()
|
||||
self.addCleanup(app_mgr.uninstantiate, self.open_flow_app.name)
|
||||
|
||||
test_controller = ('tcp:' + cfg.CONF.df_ryu.of_listen_address + ':' +
|
||||
str(cfg.CONF.df_ryu.of_listen_port + 1))
|
||||
self.vswitch_api = vswitch_impl.OvsApi(self.mgt_ip)
|
||||
self.vswitch_api.initialize(self.nb_api)
|
||||
self.vswitch_api.initialize(self._db_change_callback)
|
||||
cur_controllers = self.vswitch_api.ovsdb.get_controller(
|
||||
self.integration_bridge).execute()
|
||||
cur_controllers.append(test_controller)
|
||||
|
@ -79,9 +79,10 @@ class DFAppTestBase(tests_base.BaseTestCase):
|
||||
self.vswitch_api = self.controller.vswitch_api = mock.MagicMock()
|
||||
kwargs = dict(
|
||||
nb_api=self.controller.nb_api,
|
||||
vswitch_api=self.controller.vswitch_api,
|
||||
vswitch_api=self.controller.vswitch_api
|
||||
)
|
||||
self.controller.open_flow_app = ryu_base_app.RyuDFAdapter(**kwargs)
|
||||
self.controller.open_flow_app = ryu_base_app.RyuDFAdapter(
|
||||
db_change_callback=self.controller.db_change_callback, **kwargs)
|
||||
self.open_flow_app = self.controller.open_flow_app
|
||||
self.datapath = self.open_flow_app._datapath = mock.Mock()
|
||||
self.open_flow_app.load(self.controller.open_flow_app, **kwargs)
|
||||
|
@ -31,7 +31,8 @@ class TestRyuDFAdapter(tests_base.BaseTestCase):
|
||||
super(TestRyuDFAdapter, self).setUp()
|
||||
self.ryu_df_adapter = ryu_base_app.RyuDFAdapter(
|
||||
vswitch_api=mock.Mock(),
|
||||
nb_api=mock.Mock())
|
||||
nb_api=mock.Mock(),
|
||||
db_change_callback=mock.Mock())
|
||||
self.mock_app = mock.Mock(spec=[
|
||||
'router_updated',
|
||||
'router_deleted',
|
||||
|
Loading…
Reference in New Issue
Block a user