From 854d2d81fceeb60dde62aacf6356fcb7a75affce Mon Sep 17 00:00:00 2001 From: Masahito Muroi Date: Fri, 30 Sep 2016 00:09:13 +0900 Subject: [PATCH] HeartBeat packet includes subscribed_table info HeartBeat packet has service's info that includes list of suscriber. Change-Id: I82dcb41a2cc935bd35c2f9997c6ab73845aa71f7 Closes-Bug: 1628440 --- congress/dse2/control_bus.py | 5 ++++- congress/dse2/data_service.py | 2 +- congress/tests/dse2/test_data_service.py | 16 ++++++++++++++++ 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/congress/dse2/control_bus.py b/congress/dse2/control_bus.py index 2932a730b..7304c1409 100644 --- a/congress/dse2/control_bus.py +++ b/congress/dse2/control_bus.py @@ -41,7 +41,7 @@ def drop_cast_echos(wrapped): class HeartbeatEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, set): - return 0 # suppress sets + return list(obj) # Let the base class default method handle all other cases return json.JSONEncoder.default(self, obj) @@ -114,6 +114,9 @@ class DseNodeControlBus(data_service.DataService): args = json.dumps( {'services': [s.info.to_dict() for s in self.node.get_services(True)], + # FIXME(ekcs): suppress subscriber details for each subscribed + # table to avoid unnecessary network traffic. Only binary + # information needed over HB. 'subscribed_tables': self.node.subscriptions}, cls=HeartbeatEncoder) # Note(thread-safety): blocking call diff --git a/congress/dse2/data_service.py b/congress/dse2/data_service.py index 4401e12a5..07a5310cb 100644 --- a/congress/dse2/data_service.py +++ b/congress/dse2/data_service.py @@ -151,7 +151,7 @@ class DataService(object): service_id=self.service_id, node_id=self.node.node_id if self.node else None, published_tables=None, - subscribed_tables=None, + subscribed_tables=self._published_tables_with_subscriber, rpc_endpoints_info=None, ) diff --git a/congress/tests/dse2/test_data_service.py b/congress/tests/dse2/test_data_service.py index fadcd8250..1f222a1f3 100644 --- a/congress/tests/dse2/test_data_service.py +++ b/congress/tests/dse2/test_data_service.py @@ -87,6 +87,22 @@ class TestDataService(base.TestCase): self.assertEqual(ds._running, False, "Stopped service is marked as not running") + def test_service_info(self): + ds = data_service.DataService("svc1") + ds.node = mock.MagicMock() + ds.node.node_id = 'node-id' + ds._published_tables_with_subscriber = set(['table1']) + + expected_result = { + 'service_id': 'svc1', + 'node_id': 'node-id', + 'published_tables': [], + 'subscribed_tables': set(['table1']), + 'rpc_endpoints_info': [] + } + + self.assertEqual(expected_result, ds.info.to_dict()) + # TODO(pballand): replace with congress unit test framework when convenient if __name__ == '__main__':