Heartbeat update of subscribed tables

Added list of target:tables subscribed by each node in the heartbeat update.
On each heartbeat, each dseNode updates each contained DataService on
which of its published tables have subscribers.

Partial-Bug: #1544126

Change-Id: I186410df29b5c31fe8f2f7e50a9c895c36d6fa45
This commit is contained in:
Tim Hinrichs 2016-02-23 14:44:41 -08:00 committed by Eric K
parent fc95b412e1
commit a656c9bb64
4 changed files with 160 additions and 25 deletions

View File

@ -38,6 +38,14 @@ def drop_cast_echos(wrapped):
return wrapper
class HeartbeatEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, set):
return 0 # suppress sets
# Let the base class default method handle all other cases
return json.JSONEncoder.default(self, obj)
class _DseControlBusEndpoint(object):
def __init__(self, dse_bus):
self.dse_bus = dse_bus
@ -47,11 +55,16 @@ class _DseControlBusEndpoint(object):
LOG.debug("<%s> Accepted heartbeat: context=%s, args='%s'",
self.dse_bus.node.node_id, client_ctxt, args)
hb = json.loads(args)
# convert dict to set
for target in hb['subscribed_tables']:
hb['subscribed_tables'][target] = set(
hb['subscribed_tables'][target])
peer_id = client_ctxt['node_id']
new_status = {
'node_id': peer_id,
'instance': client_ctxt['instance'],
'services': hb['services'],
'subscribed_tables': hb['subscribed_tables']
}
old_status = self.dse_bus.peers.get(peer_id)
@ -97,13 +110,16 @@ class DseNodeControlBus(DataService):
def _publish_heartbeat(self):
args = json.dumps(
{'services': [s.info.to_dict()
for s in self.node.get_services(True)]})
for s in self.node.get_services(True)],
'subscribed_tables': self.node.subscriptions},
cls=HeartbeatEncoder)
self.node.broadcast_service_rpc(self.service_id, 'accept_heartbeat',
args=args)
def _heartbeat_loop(self):
while self._running:
self._publish_heartbeat()
self.node._update_tables_with_subscriber()
eventlet.sleep(self.HEARTBEAT_INTERVAL)
def _refresh_peers(self):

View File

@ -83,6 +83,8 @@ class DataService(object):
Attributes:
service_id: A unique ID of the service.
_published_tables_with_subscriber: A set of tables published by self
that has subscribers
"""
# TODO(pballand): make default methods for pub/subscribed tables
@ -91,6 +93,7 @@ class DataService(object):
self.node = None
self._rpc_endpoints = [DataServiceEndPoints(self)]
self._running = False
self._published_tables_with_subscriber = set()
def add_rpc_endpoint(self, endpt):
self._rpc_endpoints.append(endpt)

View File

@ -107,10 +107,15 @@ class DseNode(object):
executor='eventlet')
self._service_rpc_servers = {} # {service_id => (rpcserver, target)}
# # keep track of what publisher/tables local services subscribe to
# subscribers indexed by publisher and table:
# {publisher_id ->
# {table_name -> set_of_subscriber_ids}}
self.subscriptions = {}
# Note(ekcs): A little strange that _control_bus starts before self?
self._control_bus = DseNodeControlBus(self)
self.register_service(self._control_bus)
# keep track of which local services subscribed to which other services
self.subscribers = {}
# load configured drivers
self.loaded_drivers = self.load_drivers()
self.start()
@ -320,28 +325,42 @@ class DseNode(object):
self.broadcast_node_rpc("handle_publish", publisher=publisher,
table=table, data=data)
def table_subscribers(self, target, table):
"""List all services on this node that subscribed to target/table."""
return [s for s in self.subscribers
if (target in self.subscribers[s] and
table in self.subscribers[s][target])]
def table_subscribers(self, publisher, table):
"""List services on this node that subscribes to publisher/table."""
return self.subscriptions.get(
publisher, {}).get(table, [])
def subscribe_table(self, service, target, table):
def subscribe_table(self, subscriber, publisher, table):
"""Prepare local service to receives publications from target/table."""
# data structure: {service -> {target -> set-of-tables}
LOG.trace("subscribing %s to %s:%s", service, target, table)
if service not in self.subscribers:
self.subscribers[service] = {}
if target not in self.subscribers[service]:
self.subscribers[service][target] = set()
self.subscribers[service][target].add(table)
LOG.trace("subscribing %s to %s:%s", subscriber, publisher, table)
if publisher not in self.subscriptions:
self.subscriptions[publisher] = {}
if table not in self.subscriptions[publisher]:
self.subscriptions[publisher][table] = set()
self.subscriptions[publisher][table].add(subscriber)
snapshot = self.invoke_service_rpc(
target, "get_snapshot", table=table)
publisher, "get_snapshot", table=table)
# oslo returns [] instead of set(), so handle that case directly
return self.to_set_of_tuples(snapshot)
def get_subscription(self, service_id):
return self.subscribers.get(service_id, {})
"""Return publisher/tables subscribed by service: service_id
Return data structure:
{publisher_id -> set of tables}
"""
result = {}
for publisher in self.subscriptions:
for table in self.subscriptions[publisher]:
if service_id in self.subscriptions[publisher][table]:
try:
result[publisher].add(table)
except KeyError:
result[publisher] = set([table])
return result
def to_set_of_tuples(self, snapshot):
try:
@ -349,17 +368,31 @@ class DseNode(object):
except TypeError:
return snapshot
def unsubscribe_table(self, service, target, table):
def unsubscribe_table(self, subscriber, publisher, table):
"""Remove subscription for local service to target/table."""
if service not in self.subscribers:
if publisher not in self.subscriptions:
return False
if target not in self.subscribers[service]:
if table not in self.subscriptions[publisher]:
return False
self.subscribers[service][target].discard(table)
if len(self.subscribers[service][target]) == 0:
del self.subscribers[service][target]
if len(self.subscribers[service]) == 0:
del self.subscribers[service]
self.subscriptions[publisher][table].discard(subscriber)
if len(self.subscriptions[publisher][table]) == 0:
del self.subscriptions[publisher][table]
if len(self.subscriptions[publisher]) == 0:
del self.subscriptions[publisher]
def _update_tables_with_subscriber(self):
# not thread-safe: assumes each dseNode is single-threaded
peers = self.dse_status()['peers']
for s in self.get_services():
sid = s.service_id
# first, include subscriptions within the node, if any
tables_with_subs = set(self.subscriptions.get(sid, {}))
# then add subscriptions from other nodes
for peer_id in peers:
if sid in peers[peer_id]['subscribed_tables']:
tables_with_subs |= peers[
peer_id]['subscribed_tables'][sid]
s._published_tables_with_subscriber = tables_with_subs
# Driver CRUD. Maybe belongs in a subclass of DseNode?

View File

@ -289,3 +289,86 @@ class TestDSE(base.TestCase):
congressException.NotFound,
lambda: node.invoke_service_rpc(
'test1', 'get_status', source_id=None, params=None))
def _create_node_with_services(self, nodes, services, num, partition_id):
nid = 'cbd_node%s' % num
nodes.append(DseNode(self.messaging_config, nid, [], partition_id))
ns = []
for s in range(num):
# intentionally starting different number services
ns.append(FakeDataSource('cbd-%d_svc-%d' % (num, s)))
nodes[-1].register_service(ns[-1])
services.append(ns)
return nodes[-1]
def test_subs_list_update_aggregated_by_service(self):
part = helper.get_new_partition()
nodes = []
services = []
num_nodes = 3
for i in range(num_nodes):
n = self._create_node_with_services(nodes, services, i, part)
n.start()
# add subscriptions
for i in range(2, num_nodes):
for s2 in services[i]:
for s1 in services[i-1]:
s1.subscribe(s2.service_id, 'table-A')
s2.subscribe(s1.service_id, 'table-B')
services[1][0].subscribe(services[2][0].service_id, 'table-C')
services[2][1].subscribe(services[2][0].service_id, 'table-D')
# constructed expected results
expected_subbed_tables = {}
expected_subbed_tables[nodes[1].node_id] = {}
expected_subbed_tables[nodes[2].node_id] = {}
expected_subbed_tables[nodes[1].node_id][
services[1][0].service_id] = set(['table-B'])
expected_subbed_tables[nodes[2].node_id][
services[2][0].service_id] = set(['table-A', 'table-C', 'table-D'])
expected_subbed_tables[nodes[2].node_id][
services[2][1].service_id] = set(['table-A'])
# validate
def _validate_subbed_tables(node):
for s in node.get_services():
sid = s.service_id
subscribed_tables = node.service_object(
sid)._published_tables_with_subscriber
self.assertEqual(
subscribed_tables,
expected_subbed_tables[node.node_id][sid],
'%s has incorrect subscribed tables list' % sid)
return True
for n in nodes:
helper.retry_check_function_return_value(
lambda: _validate_subbed_tables(n), True)
# selectively unsubscribe
services[1][0].unsubscribe(services[2][0].service_id, 'table-A')
# note that services[2][1] still subscribes to 'table-B'
services[2][0].unsubscribe(services[1][0].service_id, 'table-B')
# extraneous unsubscribe
services[2][0].unsubscribe(services[1][0].service_id, 'table-None')
# update expected results
expected_subbed_tables[nodes[2].node_id][
services[2][0].service_id] = set(['table-C', 'table-D'])
for n in nodes:
helper.retry_check_function_return_value(
lambda: _validate_subbed_tables(n), True)
# resubscribe
services[1][0].subscribe(services[2][0].service_id, 'table-A')
services[2][0].subscribe(services[1][0].service_id, 'table-B')
# update expected results
expected_subbed_tables[nodes[2].node_id][
services[2][0].service_id] = set(['table-A', 'table-C', 'table-D'])
for n in nodes:
helper.retry_check_function_return_value(
lambda: _validate_subbed_tables(n), True)