Migrate Listener to the new model framework
Change-Id: Iad0344832f8d7eba05afeced49854445a93b477c Partially-Implements: bp refactor-nb-api
This commit is contained in:
parent
2d52ded02d
commit
f6d0ce4888
@ -167,36 +167,6 @@ class NbApi(object):
|
||||
self.publisher.send_event(update)
|
||||
eventlet.sleep(0)
|
||||
|
||||
def get_neutron_listener(self, id):
|
||||
try:
|
||||
listener = self.driver.get_key(db_models.Listener.table_name, id)
|
||||
return db_models.Listener(listener)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def get_all_neutron_listeners(self):
|
||||
listeners = self.driver.get_all_entries(db_models.Listener.table_name)
|
||||
return [db_models.Listener(l) for l in listeners]
|
||||
|
||||
def create_neutron_listener(self, id, **columns):
|
||||
listener = {
|
||||
'id': id
|
||||
}
|
||||
listener.update(columns)
|
||||
listener_json = jsonutils.dumps(listener)
|
||||
self.driver.create_key(db_models.Listener.table_name, id,
|
||||
listener_json)
|
||||
|
||||
def update_neutron_listener(self, id, **columns):
|
||||
listener_json = self.driver.get_key(db_models.Listener.table_name, id)
|
||||
listener = jsonutils.loads(listener_json)
|
||||
listener.update(columns)
|
||||
listener_json = jsonutils.dumps(listener)
|
||||
self.driver.set_key(db_models.Listener.table_name, id, listener_json)
|
||||
|
||||
def delete_neutron_listener(self, host):
|
||||
self.driver.delete_key(db_models.Listener.table_name, host)
|
||||
|
||||
def register_notification_callback(self, controller):
|
||||
self.controller = controller
|
||||
LOG.info("DB configuration sync finished, waiting for changes")
|
||||
|
@ -17,7 +17,6 @@ NbDbObject = legacy.NbDbObject
|
||||
UniqueKeyMixin = legacy.UniqueKeyMixin
|
||||
Floatingip = legacy.Floatingip
|
||||
AllowedAddressPairsActivePort = legacy.AllowedAddressPairsActivePort
|
||||
Listener = legacy.Listener
|
||||
OvsPort = legacy.OvsPort
|
||||
|
||||
UNIQUE_KEY = legacy.UNIQUE_KEY
|
||||
|
@ -44,3 +44,27 @@ class Publisher(mf.ModelBase, mixins.Name):
|
||||
@classmethod
|
||||
def on_get_all_post(self, instances):
|
||||
return [o for o in instances if not o.is_stale()]
|
||||
|
||||
|
||||
@mf.register_model
|
||||
@mf.construct_nb_db_model
|
||||
class Listener(mf.ModelBase):
|
||||
table_name = "listener"
|
||||
|
||||
timestamp = df_fields.TimestampField()
|
||||
ppid = fields.IntField()
|
||||
|
||||
@property
|
||||
def topic(self):
|
||||
return 'listener_{id}'.format(id=self.id)
|
||||
|
||||
def update_timestamp(self):
|
||||
self.timestamp = time.time()
|
||||
|
||||
def on_create_pre(self):
|
||||
super(Listener, self).on_create_pre()
|
||||
self.update_timestamp()
|
||||
|
||||
def on_update_pre(self):
|
||||
super(Listener, self).on_update_pre()
|
||||
self.update_timestamp()
|
||||
|
@ -166,21 +166,6 @@ class AllowedAddressPairsActivePort(NbDbObject):
|
||||
return True
|
||||
|
||||
|
||||
@register_model_class
|
||||
class Listener(NbDbObject):
|
||||
|
||||
table_name = "listener"
|
||||
|
||||
def get_topic(self):
|
||||
return 'listener' + '_' + self.inner_obj['id']
|
||||
|
||||
def get_timestamp(self):
|
||||
return self.inner_obj['timestamp']
|
||||
|
||||
def get_ppid(self):
|
||||
return self.inner_obj['ppid']
|
||||
|
||||
|
||||
class OvsPort(object):
|
||||
|
||||
TYPE_VM = 'vm'
|
||||
|
@ -27,6 +27,7 @@ from oslo_log import log
|
||||
from dragonflow.common import utils as df_utils
|
||||
from dragonflow.db import db_common
|
||||
from dragonflow.db import models
|
||||
from dragonflow.db.models import core
|
||||
from dragonflow.db.models import l2
|
||||
from dragonflow.db.neutron import lockedobjects_db as lock_db
|
||||
from dragonflow.db import neutron_notifier_api
|
||||
@ -54,11 +55,11 @@ class NbApiNeutronNotifier(neutron_notifier_api.NeutronNotifierDriver):
|
||||
|
||||
@lock_db.wrap_db_lock(lock_db.RESOURCE_NEUTRON_LISTENER)
|
||||
def create_heart_beat_reporter(self, host):
|
||||
listener = self.nb_api.get_neutron_listener(host)
|
||||
if not listener:
|
||||
listener = self.nb_api.get(core.Listener(id=host))
|
||||
if listener is None:
|
||||
self._create_heart_beat_reporter(host)
|
||||
else:
|
||||
ppid = listener.get_ppid()
|
||||
ppid = listener.ppid
|
||||
my_ppid = os.getppid()
|
||||
LOG.info("Listener %(l)s exists, my ppid is %(ppid)s",
|
||||
{'l': listener, 'ppid': my_ppid})
|
||||
@ -66,14 +67,18 @@ class NbApiNeutronNotifier(neutron_notifier_api.NeutronNotifierDriver):
|
||||
# equal to my_ppid. I tried to set api_worker=1, still multiple
|
||||
# neutron-server processes were created.
|
||||
if ppid != my_ppid:
|
||||
self.nb_api.delete_neutron_listener(host)
|
||||
self.nb_api.delete(listener)
|
||||
self._create_heart_beat_reporter(host)
|
||||
|
||||
def _create_heart_beat_reporter(self, host):
|
||||
listener = core.Listener(
|
||||
id=host,
|
||||
ppid=os.getppid(),
|
||||
)
|
||||
self.nb_api.register_listener_callback(self.notify_neutron_server,
|
||||
'listener_' + host)
|
||||
LOG.info("Register listener %s", host)
|
||||
self.heart_beat_reporter = HeartBeatReporter(self.nb_api)
|
||||
listener.topic)
|
||||
LOG.info("Register listener %s", listener.id)
|
||||
self.heart_beat_reporter = HeartBeatReporter(self.nb_api, listener)
|
||||
self.heart_beat_reporter.daemonize()
|
||||
|
||||
def notify_port_status(self, ovs_port, status):
|
||||
@ -86,7 +91,7 @@ class NbApiNeutronNotifier(neutron_notifier_api.NeutronNotifierDriver):
|
||||
fip.get_id(), 'update', status)
|
||||
|
||||
def _send_event(self, table, key, action, value):
|
||||
listeners = self.nb_api.get_all_neutron_listeners()
|
||||
listeners = self.nb_api.get_all(core.Listener)
|
||||
listeners_num = len(listeners)
|
||||
if listeners_num > 1:
|
||||
# Sort by timestamp and choose from the latest ones randomly.
|
||||
@ -98,14 +103,14 @@ class NbApiNeutronNotifier(neutron_notifier_api.NeutronNotifierDriver):
|
||||
# one is chosen. For users, do not need to figure out what is
|
||||
# the best report interval. A big interval increase the possility a
|
||||
# dead one is chosen, while a small one may affect the performance
|
||||
listeners.sort(key=lambda l: l.get_timestamp(), reverse=True)
|
||||
listeners.sort(key=lambda l: l.timestamp, reverse=True)
|
||||
selected = random.choice(listeners[:len(listeners) / 2])
|
||||
elif listeners_num == 1:
|
||||
selected = listeners[0]
|
||||
else:
|
||||
LOG.warning("No neutron listener found")
|
||||
return
|
||||
topic = selected.get_topic()
|
||||
topic = selected.topic
|
||||
update = db_common.DbUpdate(table, key, action, value, topic=topic)
|
||||
LOG.info("Publish to neutron %s", topic)
|
||||
self.nb_api.publisher.send_event(update)
|
||||
@ -125,8 +130,9 @@ class NbApiNeutronNotifier(neutron_notifier_api.NeutronNotifierDriver):
|
||||
class HeartBeatReporter(object):
|
||||
"""Updates heartbeat timestamp periodically with a random delay."""
|
||||
|
||||
def __init__(self, api_nb):
|
||||
def __init__(self, api_nb, listener):
|
||||
self.api_nb = api_nb
|
||||
self.listener = listener
|
||||
self._daemon = df_utils.DFDaemon()
|
||||
|
||||
def daemonize(self):
|
||||
@ -136,11 +142,7 @@ class HeartBeatReporter(object):
|
||||
return self._daemon.stop()
|
||||
|
||||
def run(self):
|
||||
listener = cfg.CONF.host
|
||||
ppid = os.getppid()
|
||||
self.api_nb.create_neutron_listener(listener,
|
||||
timestamp=int(time.time()),
|
||||
ppid=ppid)
|
||||
self.api_nb.create(self.listener)
|
||||
|
||||
cfg_interval = cfg.CONF.df.neutron_listener_report_interval
|
||||
delay = cfg.CONF.df.neutron_listener_report_delay
|
||||
@ -151,9 +153,7 @@ class HeartBeatReporter(object):
|
||||
# throughput and pressure for df-db in a big scale
|
||||
interval = random.randint(cfg_interval, cfg_interval + delay)
|
||||
time.sleep(interval)
|
||||
timestamp = int(time.time())
|
||||
self.api_nb.update_neutron_listener(listener,
|
||||
timestamp=timestamp,
|
||||
ppid=ppid)
|
||||
self.api_nb.update(self.listener)
|
||||
except Exception:
|
||||
LOG.exception("Failed to report heart beat for %s", listener)
|
||||
LOG.exception("Failed to report heart beat for %s",
|
||||
self.listener)
|
||||
|
@ -14,7 +14,6 @@ import copy
|
||||
|
||||
import mock
|
||||
|
||||
from dragonflow.db import models as db_models
|
||||
from dragonflow.db.models import l2
|
||||
from dragonflow.db.models import l3
|
||||
from dragonflow.tests.fullstack import test_base
|
||||
@ -91,50 +90,3 @@ class Test_API_NB(test_base.DFTestBase):
|
||||
self.assertIsNotNone(lrouter1.unique_key)
|
||||
|
||||
self.assertNotEqual(lrouter.unique_key, lrouter1.unique_key)
|
||||
|
||||
def test_create_listener(self):
|
||||
# prepare
|
||||
fake_listener1 = db_models.Listener("{}")
|
||||
fake_listener1.inner_obj = {"id": "fake_host1",
|
||||
"timestamp": 1,
|
||||
"ppid": -1}
|
||||
|
||||
fake_listener2 = db_models.Listener("{}")
|
||||
fake_listener2.inner_obj = {"id": "fake_host2",
|
||||
"timestamp": 2,
|
||||
"ppid": -2}
|
||||
|
||||
# test creating
|
||||
self.nb_api.create_neutron_listener('fake_host1',
|
||||
timestamp=1,
|
||||
ppid=-1)
|
||||
self.nb_api.create_neutron_listener('fake_host2',
|
||||
timestamp=2,
|
||||
ppid=-2)
|
||||
|
||||
listeners = self.nb_api.get_all_neutron_listeners()
|
||||
self.assertIn(fake_listener1, listeners)
|
||||
self.assertIn(fake_listener2, listeners)
|
||||
|
||||
# test updating timestamp
|
||||
self.nb_api.update_neutron_listener('fake_host1',
|
||||
timestamp=11)
|
||||
listener1 = self.nb_api.get_neutron_listener('fake_host1')
|
||||
self.assertEqual(listener1.get_timestamp(), 11)
|
||||
self.assertEqual(listener1.get_ppid(), -1)
|
||||
|
||||
# test updating timestamp and ppid
|
||||
self.nb_api.update_neutron_listener('fake_host2',
|
||||
timestamp=22,
|
||||
ppid=-22)
|
||||
listener2 = self.nb_api.get_neutron_listener('fake_host2')
|
||||
self.assertEqual(listener2.get_timestamp(), 22)
|
||||
self.assertEqual(listener2.get_ppid(), -22)
|
||||
|
||||
# test deleting
|
||||
self.nb_api.delete_neutron_listener('fake_host1')
|
||||
self.nb_api.delete_neutron_listener('fake_host2')
|
||||
listener1 = self.nb_api.get_neutron_listener('fake_host1')
|
||||
listener2 = self.nb_api.get_neutron_listener('fake_host2')
|
||||
self.assertIsNone(listener1)
|
||||
self.assertIsNone(listener2)
|
||||
|
@ -10,14 +10,11 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
|
||||
import mock
|
||||
from oslo_config import cfg
|
||||
from oslo_serialization import jsonutils
|
||||
|
||||
from dragonflow.common import utils as df_utils
|
||||
from dragonflow.db import models
|
||||
from dragonflow.db.models import core
|
||||
from dragonflow.db.models import l2
|
||||
from dragonflow.tests import base as tests_base
|
||||
from dragonflow.tests.common import utils
|
||||
@ -35,27 +32,28 @@ class TestNbApiNeutronNotifier(tests_base.BaseTestCase):
|
||||
self.notifier = df_utils.load_driver(
|
||||
cfg.CONF.df.neutron_notifier,
|
||||
df_utils.DF_NEUTRON_NOTIFIER_DRIVER_NAMESPACE)
|
||||
self.notifier.nb_api = mock.Mock()
|
||||
|
||||
def test_create_heart_beat_reporter(self):
|
||||
nb_api = mock.Mock()
|
||||
self.notifier.nb_api = nb_api
|
||||
nb_api.get_neutron_listener.return_value = None
|
||||
self.notifier.create_heart_beat_reporter('fake_host')
|
||||
self.assertTrue(nb_api.register_listener_callback.called)
|
||||
getppid_patch = mock.patch('os.getppid', return_value=1)
|
||||
self.addCleanup(getppid_patch.stop)
|
||||
getppid_patch.start()
|
||||
|
||||
nb_api.reset_mock()
|
||||
listener = {'id': 'fake_host', 'ppid': 'fake_ppid'}
|
||||
nb_api.get_neutron_listener.return_value = models.Listener(
|
||||
jsonutils.dumps(listener))
|
||||
def test_create_new_heart_beat_reporter(self):
|
||||
self.notifier.nb_api.get.return_value = None
|
||||
self.notifier.create_heart_beat_reporter('fake_host')
|
||||
self.assertTrue(nb_api.register_listener_callback.called)
|
||||
self.notifier.nb_api.register_listener_callback.assert_called_once()
|
||||
|
||||
nb_api.reset_mock()
|
||||
listener = {'id': 'fake_host', 'ppid': os.getppid()}
|
||||
nb_api.get_neutron_listener.return_value = models.Listener(
|
||||
jsonutils.dumps(listener))
|
||||
def test_replace_heart_beat_reporter(self):
|
||||
listener = core.Listener(id='fake_host', ppid=6)
|
||||
self.notifier.nb_api.get.return_value = listener
|
||||
self.notifier.create_heart_beat_reporter('fake_host')
|
||||
self.assertFalse(nb_api.register_listener_callback.called)
|
||||
self.notifier.nb_api.register_listener_callback.assert_called_once()
|
||||
|
||||
def test_valid_heart_beat_reporter_exists(self):
|
||||
listener = core.Listener(id='fake_host', ppid=1)
|
||||
self.notifier.nb_api.get.return_value = listener
|
||||
self.notifier.create_heart_beat_reporter('fake_host')
|
||||
self.notifier.nb_api.register_listener_callback.assert_not_called()
|
||||
|
||||
def test_notify_neutron_server(self):
|
||||
core_plugin = mock.Mock()
|
||||
|
Loading…
Reference in New Issue
Block a user