dragonflow/dragonflow/db/api_nb.py

280 lines
9.9 KiB
Python

# Copyright (c) 2015 OpenStack Foundation.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import time
import traceback
from jsonmodels import errors
from oslo_config import cfg
from oslo_log import log
from oslo_utils import excutils
import dragonflow.common.exceptions as df_exceptions
from dragonflow.common import utils as df_utils
from dragonflow.db import db_common
from dragonflow.db import model_proxy as mproxy
from dragonflow.db.models import core
LOG = log.getLogger(__name__)
_nb_api = None
def get_db_ip_port():
hosts = cfg.CONF.df.remote_db_hosts
if not hosts:
LOG.warning("Deprecated: remote_db_ip and remote_db_port are "
"deprecated for removal. Use remote_db_hosts instead")
ip = cfg.CONF.df.remote_db_ip
port = cfg.CONF.df.remote_db_port
return ip, port
host = hosts[0]
ip, port = host.split(':')
return ip, port
def _get_topic(obj):
try:
return getattr(obj, 'topic', None)
except errors.ValidationError:
return None
class NbApi(object):
def __init__(self, db_driver):
super(NbApi, self).__init__()
self.driver = db_driver
self.controller = None
self.use_pubsub = cfg.CONF.df.enable_df_pub_sub
self.publisher = None
self.subscriber = None
self.enable_selective_topo_dist = \
cfg.CONF.df.enable_selective_topology_distribution
@staticmethod
def get_instance():
global _nb_api
if _nb_api is None:
nb_driver = df_utils.load_driver(
cfg.CONF.df.nb_db_class,
df_utils.DF_NB_DB_DRIVER_NAMESPACE)
nb_api = NbApi(nb_driver)
ip, port = get_db_ip_port()
nb_api._initialize(db_ip=ip, db_port=port)
_nb_api = nb_api
return _nb_api
def _initialize(self, db_ip='127.0.0.1', db_port=4001):
self.driver.initialize(db_ip, db_port, config=cfg.CONF.df)
if self.use_pubsub:
self.publisher = self._get_publisher()
self.subscriber = self._get_subscriber()
self.publisher.initialize()
# Start a thread to detect DB failover in Plugin
self.publisher.set_publisher_for_failover(
self.publisher,
self.db_recover_callback)
self.publisher.start_detect_for_failover()
def set_db_change_callback(self, db_change_callback):
if self.use_pubsub:
# This is here to not allow multiple subscribers to be started
# under the same process. One should be more than enough.
if not self.subscriber.is_running:
self._start_subscriber(db_change_callback)
# Register for DB Failover detection in NB Plugin
self.subscriber.set_subscriber_for_failover(
self.subscriber,
db_change_callback)
self.subscriber.register_hamsg_for_db()
else:
LOG.warning('Subscriber is already initialized, ignoring call')
def close(self):
if self.publisher:
self.publisher.close()
if self.subscriber:
self.subscriber.close()
def db_recover_callback(self):
# only db with HA func can go in here
self.driver.process_ha()
self.publisher.process_ha()
self.subscriber.process_ha()
self.controller.sync()
def _get_publisher(self):
pub_sub_driver = df_utils.load_driver(
cfg.CONF.df.pub_sub_driver,
df_utils.DF_PUBSUB_DRIVER_NAMESPACE)
return pub_sub_driver.get_publisher()
def _get_subscriber(self):
pub_sub_driver = df_utils.load_driver(
cfg.CONF.df.pub_sub_driver,
df_utils.DF_PUBSUB_DRIVER_NAMESPACE)
return pub_sub_driver.get_subscriber()
def _start_subscriber(self, db_change_callback):
self.subscriber.initialize(db_change_callback)
self.subscriber.register_topic(db_common.SEND_ALL_TOPIC)
publishers_ips = cfg.CONF.df.publishers_ips
uris = {'%s://%s:%s' % (
cfg.CONF.df.publisher_transport,
ip,
cfg.CONF.df.publisher_port) for ip in publishers_ips}
publishers = self.get_all(core.Publisher)
uris |= {publisher.uri for publisher in publishers}
for uri in uris:
self.subscriber.register_listen_address(uri)
self.subscriber.daemonize()
def support_publish_subscribe(self):
return self.use_pubsub
def _send_db_change_event(self, table, key, action, value, topic):
if not self.use_pubsub:
return
if not self.enable_selective_topo_dist or topic is None:
topic = db_common.SEND_ALL_TOPIC
update = db_common.DbUpdate(table, key, action, value, topic=topic)
self.publisher.send_event(update)
time.sleep(0)
def register_notification_callback(self, notification_cb):
self._notification_cb = notification_cb
def register_listener_callback(self, cb, topic):
"""Register a callback for Neutron listener
This method is used to register a callback for Neutron listener
to handle the message from Dragonflow controller. It should only be
called from Neutron side and only once.
:param: a callback method to handle the message from Dragonflow
controller
:param topic: the topic this neutron listener cares about, e.g the
hostname of the node
"""
if not self.use_pubsub:
return
self.subscriber.initialize(cb)
self.subscriber.register_topic(topic)
self.subscriber.daemonize()
def create(self, obj, skip_send_event=False):
"""Create the provided object in the database and publish an event
about its creation.
"""
model = type(obj)
obj.on_create_pre()
serialized_obj = obj.to_json()
topic = _get_topic(obj)
self.driver.create_key(model.table_name, obj.id,
serialized_obj, topic)
if not skip_send_event:
self._send_db_change_event(model.table_name, obj.id, 'create',
serialized_obj, topic)
def update(self, obj, skip_send_event=False):
"""Update the provided object in the database and publish an event
about the change.
This method reads the existing object from the database and updates
any non-empty fields of the provided object. Retrieval happens by
id/topic fields.
"""
model = type(obj)
full_obj = self.get(obj)
db_obj = copy.copy(full_obj)
if full_obj is None:
raise df_exceptions.DBKeyNotFound(key=obj.id)
changed_fields = full_obj.update(obj)
if not changed_fields:
return
full_obj.on_update_pre(db_obj)
serialized_obj = full_obj.to_json()
topic = _get_topic(full_obj)
self.driver.set_key(model.table_name, full_obj.id,
serialized_obj, topic)
if not skip_send_event:
self._send_db_change_event(model.table_name, full_obj.id, 'set',
serialized_obj, topic)
def delete(self, obj, skip_send_event=False):
"""Delete the provided object from the database and publish the event
about its deletion.
The provided object does not have to have all the fields filled,
just the ID / topic (if applicable) of the object we wish to delete.
"""
model = type(obj)
obj.on_delete_pre()
topic = _get_topic(obj)
try:
self.driver.delete_key(model.table_name, obj.id, topic)
except df_exceptions.DBKeyNotFound:
with excutils.save_and_reraise_exception():
LOG.debug(
'Could not find object %(id)s to delete in %(table)s',
{'id': obj.id, 'table': model.table_name})
if not skip_send_event:
self._send_db_change_event(model.table_name, obj.id, 'delete',
None, topic)
def get(self, lean_obj):
"""Retrieve a model instance from the database. This function uses
lean_obj to deduce ID and model type
>>> nb_api.get(Chassis(id="one"))
Chassis(id="One", ip="192.168.121.22", tunnel_types=["vxlan"])
"""
if mproxy.is_model_proxy(lean_obj):
lean_obj = lean_obj.get_proxied_model()(id=lean_obj.id)
model = type(lean_obj)
try:
serialized_obj = self.driver.get_key(
model.table_name,
lean_obj.id,
_get_topic(lean_obj),
)
except df_exceptions.DBKeyNotFound:
exception_tb = traceback.format_exc()
LOG.debug(
'Could not get object %(id)s from table %(table)s',
{'id': lean_obj.id, 'table': model.table_name})
LOG.debug('%s', (exception_tb,))
else:
return model.from_json(serialized_obj)
def get_all(self, model, topic=None):
"""Get all instances of provided model, can be limited to instances
with a specific topic.
"""
all_values = self.driver.get_all_entries(model.table_name, topic)
all_objects = [model.from_json(e) for e in all_values]
return model.on_get_all_post(all_objects)