
# (source listing: 725 lines, 29 KiB)

# Copyright (c) 2016 VMware, Inc. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import six
from oslo_config import cfg
from oslo_db import exception as db_exc
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_messaging import exceptions as messaging_exceptions
from oslo_messaging.rpc import dispatcher
from oslo_utils import importutils
from oslo_utils import strutils
from oslo_utils import uuidutils
from congress.datalog import compile as datalog_compile
from congress.datasources import constants
from congress.db import datasources as datasources_db
from congress.dse2 import control_bus
from congress import exception
LOG = logging.getLogger(__name__)
class DseNode(object):
    """Addressable entity participating on the DSE message bus.

    The Data Services Engine (DSE) is comprised of one or more DseNode
    instances that each may run one or more DataService instances. All
    communication between data services uses the DseNode interface.

    Attributes:

    - node_id: The unique ID of this node on the DSE.
    - messaging_config: Configuration options for the message bus. See
      oslo.messaging for more details.
    - node_rpc_endpoints: List of object instances exposing a remotely
      invokable interface.
    """

    EXCHANGE = 'congress'
    CONTROL_TOPIC = 'congress-control'
    SERVICE_TOPIC_PREFIX = 'congress-service-'

    def node_rpc_target(self, namespace=None, server=None, fanout=False):
        """Return the oslo.messaging Target for node-level RPCs.

        The topic is the partition-specific control topic, so only nodes
        sharing the same partition (bus) see each other's messages.

        :param namespace: oslo.messaging namespace of the target.
        :param server: ID of a single node to address, or None.
        :param fanout: If True, address every node on the partition.
        """
        # NOTE(review): the keyword arguments after ``exchange`` were missing
        # from this copy and were restored from the method's parameters;
        # verify against upstream (it may also pass an RPC ``version``).
        return messaging.Target(exchange=self.EXCHANGE,
                                topic=self._add_partition(self.CONTROL_TOPIC),
                                namespace=namespace,
                                server=server,
                                fanout=fanout)

    def service_rpc_target(self, service_id, namespace=None, server=None,
                           fanout=False):
        """Return the oslo.messaging Target for RPCs to a data service.

        :param service_id: ID of the data service being addressed.
        :param namespace: oslo.messaging namespace of the target.
        :param server: ID of the node whose instance of the service should
            receive the message, or None to let the bus choose.
        :param fanout: If True, address every instance of the service.
        """
        topic = self._add_partition(self.SERVICE_TOPIC_PREFIX + service_id)
        # NOTE(review): keyword arguments after ``exchange`` restored from
        # the method's parameters; verify against upstream.
        return messaging.Target(exchange=self.EXCHANGE,
                                topic=topic,
                                namespace=namespace,
                                server=server,
                                fanout=fanout)

    def _add_partition(self, topic, partition_id=None):
        """Create a seed-specific version of an oslo-messaging topic."""
        partition_id = partition_id or self.partition_id
        if partition_id is None:
            return topic
        return topic + "-" + str(partition_id)

    def __init__(self, messaging_config, node_id, node_rpc_endpoints,
                 partition_id=None):
        # Note(ekcs): temporary setting to disable use of diffs and sequencing
        #   to avoid muddying the process of a first dse2 system test.
        # TODO(ekcs,dse2): remove when differential update is standard
        self.always_snapshot = False
        self.messaging_config = messaging_config
        self.node_id = node_id
        self.node_rpc_endpoints = node_rpc_endpoints
        # unique identifier shared by all nodes that can communicate
        self.partition_id = partition_id or cfg.CONF.dse.bus_id or "bus"
        # NOTE(review): restored -- otherwise the handle_publish* endpoints
        # of DseNodeEndpoints are never exposed on the node RPC server.
        self.node_rpc_endpoints.append(DseNodeEndpoints(self))
        self._running = False
        self._services = []
        # uuid to help recognize node_id clash
        self.instance = uuidutils.generate_uuid()
        # TODO(dse2): add detection and logging/rectifying for node_id clash?
        access_policy = dispatcher.DefaultRPCAccessPolicy
        self.context = self._message_context()
        # Remote exceptions raised from the listed modules are deserialized
        # into their original types on this side of the bus.
        self.transport = messaging.get_rpc_transport(
            self.messaging_config,
            allowed_remote_exmods=[exception.__name__, dispatcher.__name__,
                                   db_exc.__name__, ])
        self._rpctarget = self.node_rpc_target(self.node_id, self.node_id)
        self._rpc_server = messaging.get_rpc_server(
            self.transport, self._rpctarget, self.node_rpc_endpoints,
            executor='eventlet', access_policy=access_policy)

        # # keep track of what publisher/tables local services subscribe to
        # subscribers indexed by publisher and table:
        # {publisher_id ->
        #     {table_name -> set_of_subscriber_ids}}
        self.subscriptions = {}

        # Note(ekcs): A little strange that _control_bus starts before self?
        self._control_bus = control_bus.DseNodeControlBus(self)
        # NOTE(review): restored -- presumably the control bus must be
        # registered like any other service so its RPC server runs.
        self.register_service(self._control_bus)
        # load configured drivers
        self.loaded_drivers = self.load_drivers()
        self.periodic_tasks = None
        self.sync_thread = None
        # NOTE(review): this copy shows no self.start() call here; callers
        # are assumed to start the node explicitly -- confirm upstream.

    def __del__(self):
        # Best-effort shutdown on garbage collection. Skip it when __init__
        # failed early (no _running yet) or the node never ran: waiting on
        # RPC servers that were never started is not safe.
        if getattr(self, '_running', False):
            self.stop()
            self.wait()

    def __repr__(self):
        return self.__class__.__name__ + "<%s>" % self.node_id

    def _message_context(self):
        """Return the RPC context identifying this node instance."""
        return {'node_id': self.node_id, 'instance': str(self.instance)}

    # Note(thread-safety): blocking function
    def register_service(self, service):
        """Attach a DataService to this node and create its RPC server.

        The service's RPC server is started right away only when the node is
        already running; otherwise start() starts it later.

        :raises: DataServiceError if a service with the same service_id is
            already registered on this node.
        """
        assert service.node is None
        if self.service_object(service.service_id):
            msg = ('Service %s already exists on the node %s'
                   % (service.service_id, self.node_id))
            raise exception.DataServiceError(msg)
        access_policy = dispatcher.DefaultRPCAccessPolicy
        service.always_snapshot = self.always_snapshot
        service.node = self
        # NOTE(review): restored -- _services is otherwise never populated,
        # yet start(), get_services() and service_object() read it.
        self._services.append(service)
        # Bind the service's server to this node so that local=True calls
        # in invoke_service_rpc (server=self.node_id) reach it.
        service._target = self.service_rpc_target(service.service_id,
                                                  server=self.node_id)
        service._rpc_server = messaging.get_rpc_server(
            self.transport, service._target, service.rpc_endpoints(),
            executor='eventlet', access_policy=access_policy)
        if self._running:
            service.start()
            LOG.debug('<%s> Service %s RPC Server listening on %s',
                      self.node_id, service.service_id, service._target)
            service._rpc_server.start()

    # Note(thread-safety): blocking function
    def unregister_service(self, service_id=None, uuid_=None):
        r"""Unregister service from DseNode matching on service_id or uuid\_

        Only one should be supplied. No-op if no matching service found.
        """
        LOG.debug("unregistering service %s on node %s", service_id,
                  self.node_id)
        service = self.service_object(service_id=service_id, uuid_=uuid_)
        if service is None:
            return
        # NOTE(review): the statements below were missing from this copy and
        # were restored; verify against upstream.
        self._services.remove(service)
        service.stop()
        # Note(thread-safety): blocking call
        service.wait()

    def get_services(self, hidden=False):
        """Return all local service objects.

        Services whose ID starts with '_' are hidden unless hidden=True.
        """
        if hidden:
            return self._services
        return [s for s in self._services if s.service_id[0] != '_']

    def get_global_service_names(self, hidden=False):
        """Return names of all services on all nodes.

        Names starting with '_' are left out unless hidden=True, for peer
        services as well as local ones.
        """
        names = {s.service_id for s in self.get_services(hidden=hidden)}
        # Also, check services registered on other nodes
        for node in self.dse_status()['peers'].values():
            names.update(srv['service_id'] for srv in node['services'])
        if not hidden:
            names = {name for name in names if not name.startswith('_')}
        return names

    def service_object(self, service_id=None, uuid_=None):
        r"""Return the service object requested.

        Search by service_id or uuid\_ (only one should be supplied).
        None if not found.

        :raises: TypeError if both or neither of the arguments are given.
        """
        if service_id is not None:
            if uuid_ is not None:
                raise TypeError('service_object() cannot accept both args '
                                'service_id and uuid_')
            for s in self._services:
                if s.service_id == service_id:
                    return s
        elif uuid_ is not None:
            for s in self._services:
                if getattr(s, 'ds_id', None) == uuid_:
                    return s
        else:
            raise TypeError('service_object() requires service_id or '
                            'uuid_ argument, but neither is given.')
        return None

    def start(self):
        """Start the node RPC server and every registered service.

        Calling start() on a node that is already running is a no-op.
        """
        if self._running:
            return
        LOG.debug("<%s> DSE Node '%s' starting with %s services...",
                  self.node_id, self.node_id, len(self._services))

        # Start Node RPC server
        self._rpc_server.start()
        LOG.debug('<%s> Node RPC Server listening on %s',
                  self.node_id, self._rpctarget)

        # Start Service RPC server(s)
        for s in self._services:
            s.start()
            LOG.debug('<%s> Service %s RPC Server listening on %s',
                      self.node_id, s.service_id, s._target)
            s._rpc_server.start()

        self._running = True

    def stop(self):
        """Stop all services and RPC servers; no-op if not running."""
        if self._running is False:
            return

        LOG.info("Stopping DSE node '%s'", self.node_id)
        # NOTE(review): loop body restored to mirror start(); verify.
        for s in self._services:
            s._rpc_server.stop()
            s.stop()
        self._rpc_server.stop()
        self._running = False

    # Note(thread-safety): blocking function
    def wait(self):
        """Block until every RPC server of this node has shut down."""
        for s in self._services:
            # Note(thread-safety): blocking call
            s._rpc_server.wait()
        # Note(thread-safety): blocking call
        self._rpc_server.wait()

    def dse_status(self):
        """Return latest observation of DSE status."""
        return self._control_bus.dse_status()

    def is_valid_service(self, service_id):
        """Return True if service_id is registered on any node."""
        return service_id in self.get_global_service_names(hidden=True)

    # Note(thread-safety): blocking function
    def invoke_node_rpc(self, node_id, method, kwargs=None, timeout=None):
        """Invoke RPC method on a DSE Node.

        :param: node_id: The ID of the node on which to invoke the call.
        :param: method: The method name to call.
        :param: kwargs: A dict of method arguments.
        :param: timeout: Seconds to wait for the reply (None: oslo.messaging
            default).

        :returns: The result of the method invocation.

        :raises: MessagingTimeout, RemoteError, MessageDeliveryFailure
        """
        if kwargs is None:
            kwargs = {}
        target = self.node_rpc_target(server=node_id)
        LOG.trace("<%s> Invoking RPC '%s' on %s", self.node_id, method, target)
        client = messaging.RPCClient(self.transport, target, timeout=timeout)
        return client.call(self.context, method, **kwargs)

    # Note(thread-safety): blocking function
    def broadcast_node_rpc(self, method, kwargs=None):
        """Invoke RPC method on all DSE Nodes.

        :param: method: The method name to call.
        :param: kwargs: A dict of method arguments.

        :returns: None

        Methods are invoked asynchronously and results are dropped.

        :raises: RemoteError, MessageDeliveryFailure
        """
        if kwargs is None:
            kwargs = {}
        target = self.node_rpc_target(fanout=True)
        LOG.trace("<%s> Casting RPC '%s' on %s", self.node_id, method, target)
        client = messaging.RPCClient(self.transport, target)
        client.cast(self.context, method, **kwargs)

    # Note(thread-safety): blocking function
    def invoke_service_rpc(
            self, service_id, method, kwargs=None, timeout=None, local=False,
            retry=None):
        """Invoke RPC method on a DSE Service.

        :param: service_id: The ID of the data service on which to invoke the
            call.
        :param: method: The method name to call.
        :param: kwargs: A dict of method arguments.
        :param: timeout: Seconds to wait for the reply (None: oslo.messaging
            default).
        :param: local: If True, address this node's own instance of the
            service.
        :param: retry: Passed through to the oslo.messaging RPCClient.

        :returns: The result of the method invocation.

        :raises: MessagingTimeout, RemoteError, MessageDeliveryFailure,
            NotFound, BadRequest
        """
        target = self.service_rpc_target(
            service_id, server=(self.node_id if local else None))
        LOG.trace("<%s> Preparing to invoking RPC '%s' on %s",
                  self.node_id, method, target)
        # NOTE(review): ``retry`` restored from a truncated signature/call.
        client = messaging.RPCClient(self.transport, target, timeout=timeout,
                                     retry=retry)
        if not self.is_valid_service(service_id):
            try:
                # First ping the destination to fail fast if unresponsive
                LOG.trace("<%s> Checking responsiveness before invoking RPC "
                          "'%s' on %s", self.node_id, method, target)
                # NOTE(review): restored as a plain call; upstream may have
                # used client.prepare(timeout=<short ping timeout>).
                client.call(self.context, 'ping')
            except (messaging_exceptions.MessagingTimeout,
                    messaging_exceptions.MessageDeliveryFailure):
                msg = "service '%s' could not be found"
                raise exception.NotFound(msg % service_id)
        if kwargs is None:
            kwargs = {}
        try:
            LOG.trace(
                "<%s> Invoking RPC '%s' on %s", self.node_id, method, target)
            result = client.call(self.context, method, **kwargs)
        except dispatcher.NoSuchMethod:
            msg = "Method %s not supported for datasource %s"
            LOG.exception(msg, method, service_id)
            raise exception.BadRequest(msg % (method, service_id))
        except (messaging_exceptions.MessagingTimeout,
                messaging_exceptions.MessageDeliveryFailure):
            msg = "Request to service '%s' timed out"
            raise exception.NotFound(msg % service_id)
        LOG.trace("<%s> RPC call returned: %s", self.node_id, result)
        return result

    # Note(thread-safety): blocking function
    def broadcast_service_rpc(self, service_id, method, kwargs=None):
        """Invoke RPC method on all instances of service_id.

        :param: service_id: The ID of the data service on which to invoke the
            call.
        :param: method: The method name to call.
        :param: kwargs: A dict of method arguments.

        :returns: None - Methods are invoked asynchronously and results are
            dropped.

        :raises: NotFound, RemoteError, MessageDeliveryFailure
        """
        if kwargs is None:
            kwargs = {}
        if not self.is_valid_service(service_id):
            msg = "service '%s' is not a registered service"
            raise exception.NotFound(msg % service_id)

        target = self.service_rpc_target(service_id, fanout=True)
        LOG.trace("<%s> Casting RPC '%s' on %s", self.node_id, method, target)
        client = messaging.RPCClient(self.transport, target)
        client.cast(self.context, method, **kwargs)

    # Note(ekcs): non-sequenced publish retained to simplify rollout of dse2
    #   to be replaced by handle_publish_sequenced
    # Note(thread-safety): blocking function
    def publish_table(self, publisher, table, data):
        """Broadcast the contents of publisher's table to every node.

        :param publisher: ID of the publishing data service.
        :param table: Name of the published table.
        :param data: Table contents.

        :returns: None. The RPC is a cast, so results are dropped.

        :raises: RemoteError, MessageDeliveryFailure
        """
        LOG.trace("<%s> Publishing from '%s' table %s: %s",
                  self.node_id, publisher, table, data)
        self.broadcast_node_rpc(
            "handle_publish",
            {'publisher': publisher, 'table': table, 'data': data})

    # Note(thread-safety): blocking function
    def publish_table_sequenced(
            self, publisher, table, data, is_snapshot, seqnum):
        """Broadcast a sequenced table publication to every node.

        :param publisher: ID of the publishing data service.
        :param table: Name of the published table.
        :param data: Table contents or update, depending on is_snapshot.
        :param is_snapshot: Whether data is a full snapshot.
        :param seqnum: Sequence number of this publication.

        :returns: None. The RPC is a cast, so results are dropped.

        :raises: RemoteError, MessageDeliveryFailure
        """
        LOG.trace("<%s> Publishing from '%s' table %s: %s",
                  self.node_id, publisher, table, data)
        self.broadcast_node_rpc(
            "handle_publish_sequenced",
            {'publisher': publisher, 'table': table,
             'data': data, 'is_snapshot': is_snapshot, 'seqnum': seqnum})

    def table_subscribers(self, publisher, table):
        """List services on this node that subscribes to publisher/table."""
        return self.subscriptions.get(
            publisher, {}).get(table, [])

    # Note(thread-safety): blocking function
    def subscribe_table(self, subscriber, publisher, table):
        """Prepare local service to receive publications from target/table.

        :returns: When always_snapshot is set, the publisher's current
            snapshot of the table as a set of tuples; otherwise the result
            of the publisher's get_last_published_data_with_seqnum RPC.
        """
        # data structure: {publisher -> {table -> set-of-subscribers}}
        LOG.trace("subscribing %s to %s:%s", subscriber, publisher, table)
        self.subscriptions.setdefault(publisher, {}).setdefault(
            table, set()).add(subscriber)

        # oslo returns [] instead of set(), so handle that case directly
        if self.always_snapshot:
            # Note(thread-safety): blocking call
            snapshot = self.invoke_service_rpc(
                publisher, "get_snapshot", {'table': table})
            return self.to_set_of_tuples(snapshot)
        # Note(thread-safety): blocking call
        snapshot_seqnum = self.invoke_service_rpc(
            publisher, "get_last_published_data_with_seqnum",
            {'table': table})
        return snapshot_seqnum

    def get_subscription(self, service_id):
        """Return publisher/tables subscribed by service: service_id

        Return data structure:
        {publisher_id -> set of tables}
        """
        result = {}
        for publisher, tables in self.subscriptions.items():
            for table, subscribers in tables.items():
                if service_id in subscribers:
                    result.setdefault(publisher, set()).add(table)
        return result

    def get_subscribers(self, service_id):
        """List of services subscribed to this service."""
        tables = self.subscriptions.get(service_id, None)
        if not tables:
            # no subscribers
            return []
        return list(set().union(*tables.values()))

    def to_set_of_tuples(self, snapshot):
        """Convert rows to a set of tuples; return snapshot as-is if not."""
        try:
            return set([tuple(x) for x in snapshot])
        except TypeError:
            return snapshot

    def unsubscribe_table(self, subscriber, publisher, table):
        """Remove subscription for local service to target/table.

        Empty per-table and per-publisher entries are pruned.

        :returns: False if there was no subscription entry for
            publisher/table, otherwise None.
        """
        if publisher not in self.subscriptions:
            return False
        if table not in self.subscriptions[publisher]:
            return False
        self.subscriptions[publisher][table].discard(subscriber)
        if len(self.subscriptions[publisher][table]) == 0:
            del self.subscriptions[publisher][table]

        if len(self.subscriptions[publisher]) == 0:
            del self.subscriptions[publisher]

    def _update_tables_with_subscriber(self):
        """Notify local services of tables gaining/losing all subscribers."""
        # not thread-safe: assumes each dseNode is single-threaded
        peers = self.dse_status()['peers']
        for s in self.get_services():
            sid = s.service_id
            # first, include subscriptions within the node, if any
            tables_with_subs = set(self.subscriptions.get(sid, {}))
            # then add subscriptions from other nodes; update() accepts
            # lists too, in case the status was serialized without sets
            for peer_id in peers:
                if sid in peers[peer_id]['subscribed_tables']:
                    tables_with_subs.update(
                        peers[peer_id]['subscribed_tables'][sid])
            # call DataService hooks
            if hasattr(s, 'on_first_subs'):
                added = tables_with_subs - s._published_tables_with_subscriber
                if len(added) > 0:
                    s.on_first_subs(added)
            if hasattr(s, 'on_no_subs'):
                removed = \
                    s._published_tables_with_subscriber - tables_with_subs
                if len(removed) > 0:
                    s.on_no_subs(removed)
            s._published_tables_with_subscriber = tables_with_subs

    # Driver CRUD.  Maybe belongs in a subclass of DseNode?
    # Note(thread-safety): blocking function?
    def load_drivers(self):
        """Load all configured drivers and check no name conflict

        :returns: dict mapping driver id -> driver info (with 'module' set
            to the configured class path).
        :raises: BadConfig if two configured drivers share an id.
        """
        result = {}
        for driver_path in cfg.CONF.drivers:
            # Note(thread-safety): blocking call?
            obj = importutils.import_class(driver_path)
            driver = obj.get_datasource_info()
            if driver['id'] in result:
                # NOTE(review): ``_`` is not imported by this module, so the
                # message is not passed through it (that raised NameError).
                raise exception.BadConfig("There is a driver loaded already "
                                          "with the driver name of %s"
                                          % driver['id'])
            driver['module'] = driver_path
            result[driver['id']] = driver
        return result

    def get_driver_info(self, driver_name):
        """Return info for a loaded driver; raise DriverNotFound if absent."""
        driver = self.loaded_drivers.get(driver_name)
        if not driver:
            raise exception.DriverNotFound(id=driver_name)
        return driver

    def get_drivers_info(self):
        """Return the dict of all loaded drivers keyed by driver id."""
        return self.loaded_drivers

    def get_driver_schema(self, drivername):
        """Return the schema exposed by the named driver's class."""
        driver = self.get_driver_info(drivername)
        # Note(thread-safety): blocking call?
        obj = importutils.import_class(driver['module'])
        return obj.get_schema()

    # Datasource CRUD.  Maybe belongs in a subclass of DseNode?
    # Note(thread-safety): blocking function
    def get_datasource(self, id_):
        """Return the created datasource.

        :raises: DatasourceNotFound if no datasource with id_ is in the DB.
        """
        # Note(thread-safety): blocking call
        result = datasources_db.get_datasource(id_)
        if not result:
            raise exception.DatasourceNotFound(id=id_)
        return self.make_datasource_dict(result)

    # Note(thread-safety): blocking function
    def get_datasources(self, filter_secret=False):
        """Return the created datasources as recorded in the DB.

        This returns what datasources the database contains, not the
        datasources that this server instance is running.

        :param filter_secret: If True, replace the values of config fields
            the driver lists as 'secret' with "<hidden>".
        """
        results = []
        for datasource in datasources_db.get_datasources():
            result = self.make_datasource_dict(datasource)
            if filter_secret:
                # driver_info knows which fields should be secret
                driver_info = self.get_driver_info(result['driver'])
                config = result['config']
                # Drivers without a 'secret' list, and datasources without
                # a config, have nothing to mask.
                if config is not None:
                    for hide_field in driver_info.get('secret', ()):
                        config[hide_field] = "<hidden>"
            results.append(result)
        return results

    def delete_missing_driver_datasources(self):
        """Delete DB datasources whose driver is not loaded on this node."""
        removed = 0
        for datasource in datasources_db.get_datasources():
            try:
                self.get_driver_info(datasource.driver)
            except exception.DriverNotFound:
                # NOTE(review): the deletion call was missing from this copy
                # and was restored; verify the datasources_db function name.
                datasources_db.delete_datasource_with_data(datasource.id)
                removed += 1
                LOG.debug("Datasource driver '%s' not found, deleting the "
                          "datasource '%s' from DB ", datasource.driver,
                          datasource.name)

        LOG.info("Datasource cleanup completed, removed %d datasources",
                 removed)

    def make_datasource_dict(self, req, fields=None):
        """Normalize a datasource record/request into a plain dict.

        A missing id gets a fresh UUID; a JSON-string config is decoded.

        :param fields: Optional collection of keys to keep.
        """
        result = {'id': req.get('id') or uuidutils.generate_uuid(),
                  'name': req.get('name'),
                  'driver': req.get('driver'),
                  'description': req.get('description'),
                  'type': None,
                  'enabled': req.get('enabled', True)}
        # NOTE(arosen): we store the config as a string in the db so
        # here we serialize it back when returning it.
        if isinstance(req.get('config'), six.string_types):
            result['config'] = json.loads(req['config'])
        else:
            result['config'] = req.get('config')

        return self._fields(result, fields)

    def _fields(self, resource, fields):
        """Return resource restricted to fields (all keys if fields empty)."""
        if fields:
            return {key: item for key, item in resource.items()
                    if key in fields}
        return resource

    def validate_create_datasource(self, req):
        """Check a create-datasource request against the loaded drivers.

        :returns: the matching loaded driver info.
        :raises: InvalidDatasourceName, InvalidDriverOption,
            MissingRequiredConfigOptions, InvalidDriver
        """
        name = req['name']
        if not datalog_compile.string_is_servicename(name):
            raise exception.InvalidDatasourceName(value=name)

        driver = req['driver']
        config = req['config'] or {}

        loaded_driver = next(
            (d for d in self.loaded_drivers.values() if d['id'] == driver),
            None)
        if loaded_driver is None:
            # If we get here no datasource driver match was found.
            raise exception.InvalidDriver(driver=driver)

        specified_options = set(config.keys())
        valid_options = set(loaded_driver['config'].keys())
        # Check that all the specified options passed in are
        # valid configuration options that the driver exposes.
        invalid_options = specified_options - valid_options
        if invalid_options:
            # NOTE(review): kwarg name restored; verify against exception.
            raise exception.InvalidDriverOption(
                invalid_options=invalid_options)

        # check that all the required options are passed in
        required_options = set(
            [k for k, v in loaded_driver['config'].items()
             if v == constants.REQUIRED])
        missing_options = required_options - specified_options
        if missing_options:
            missing_options = ', '.join(missing_options)
            # NOTE(review): kwarg name restored; verify against exception.
            raise exception.MissingRequiredConfigOptions(
                missing_options=missing_options)
        return loaded_driver

    # Note (thread-safety): blocking function
    def create_datasource_service(self, datasource):
        """Instantiate (but do not register) a DataService for a datasource.

        :param datasource: datasource record accepted by
            make_datasource_dict (uses its 'id', 'name', 'driver', 'config'
            and 'enabled' entries).
        :returns: the new service instance, or None if the datasource is
            disabled.
        :raises: DriverNotFound if the datasource's driver is not loaded;
            DataServiceError if the driver class cannot be imported or
            instantiated.
        """
        # get the driver info for the datasource
        ds_dict = self.make_datasource_dict(datasource)
        if not ds_dict['enabled']:
            LOG.info("datasource %s not enabled, skip loading",
                     ds_dict['name'])
            return None

        driver_info = self.get_driver_info(ds_dict['driver'])
        # split class_path into module and class name
        class_path = driver_info['module']
        module_name, _sep, class_name = class_path.rpartition(".")

        if ds_dict['config'] is None:
            args = {'ds_id': ds_dict['id']}
        else:
            args = dict(ds_dict['config'], ds_id=ds_dict['id'])
        kwargs = {'name': ds_dict['name'], 'args': args}
        # Mask passwords from the config before it reaches the log.
        LOG.info("creating service %s with class %s and args %s",
                 ds_dict['name'], module_name,
                 strutils.mask_password(kwargs, "****"))

        # import the module
        try:
            # Note(thread-safety): blocking call?
            module = importutils.import_module(module_name)
            service = getattr(module, class_name)(**kwargs)
        except Exception:
            msg = ("Error loading instance of module '%s'")
            LOG.exception(msg, class_path)
            raise exception.DataServiceError(msg % class_path)
        return service
class DseNodeEndpoints (object):
"""Collection of RPC endpoints that the DseNode exposes on the bus.
Must be a separate class since all public methods of a given
class are assumed to be valid RPC endpoints.
def __init__(self, dsenode):
self.node = dsenode
# Note(ekcs): non-sequenced publish retained to simplify rollout of dse2
# to be replaced by handle_publish_sequenced
def handle_publish(self, context, publisher, table, data):
"""Function called on the node when a publication is sent.
Forwards the publication to all of the relevant services.
for s in self.node.table_subscribers(publisher, table):
publisher=publisher, table=table, data=data, is_snapshot=True)
def handle_publish_sequenced(
self, context, publisher, table, data, is_snapshot, seqnum):
"""Function called on the node when a publication is sent.
Forwards the publication to all of the relevant services.
for s in self.node.table_subscribers(publisher, table):
publisher=publisher, table=table, data=data, seqnum=seqnum,