From 5b9ae410249b96f522cfa545da5cfb296a1fc26b Mon Sep 17 00:00:00 2001 From: Eric K Date: Mon, 17 Dec 2018 17:31:27 -0800 Subject: [PATCH] Avoid coreference between current state and _last_published_data Previously, all data source drivers replaced the entire state on each poll or push update (doctor). The DSE publish code relied on this situation to optimize performance by skipping a copy when setting _last_published_data. The coreference introduced did not matter because the original state was never edited, only replaced. But beginning with the webhook drivers (monasca webhook and vitrage), the drivers edit the current state. Thus a bug was introduced where sometimes an update would not be published to the messaging bus because of an incorrect _last_published_data due to coreference. This patch fixes the bug. In addition, the _last_published_data elements are stored as sets rather than lists to save the need to convert to set each time a diff is needed. The times when conversion back to a list is needed to publish on the messaging bus (used for snapshot publish only) are expected to be far fewer than the times the conversion to set is needed. A few additional logging lines are also introduced for improved debuggability. 
Change-Id: I332a844beaf8dec4450bb1ade6ce7e41cc3d4d91 --- congress/dse2/data_service.py | 25 ++++++++++++------ congress/tests/dse2/test_dse2.py | 44 ++++++++++++++++---------------- 2 files changed, 39 insertions(+), 30 deletions(-) diff --git a/congress/dse2/data_service.py b/congress/dse2/data_service.py index 87540135a..33b41cd2a 100644 --- a/congress/dse2/data_service.py +++ b/congress/dse2/data_service.py @@ -226,17 +226,23 @@ class DataService(object): # Note(thread-safety): blocking function def publish(self, table, data, use_snapshot=False): + LOG.debug('Publishing table %s', table) + LOG.trace('Parameters: table: %s, data: %s, use_snapshot: %s', + table, data, use_snapshot) + LOG.trace('Last published data %s', self._last_published_data) + data = set(data) # make a copy to avoid co-reference + def get_differential_and_set_last_published_data(): if table in self._last_published_data: - to_add = list( - set(data) - set(self._last_published_data[table])) - to_del = list( - set(self._last_published_data[table]) - set(data)) + LOG.trace('Diff against last published data %s', + self._last_published_data[table]) + to_add = list(data - self._last_published_data[table]) + to_del = list(self._last_published_data[table] - data) self._last_published_data[table] = data else: - self._last_published_data[table] = data - to_add = copy.copy(data) + to_add = list(data) to_del = [] + self._last_published_data[table] = data return [to_add, to_del] def increment_get_seqnum(): @@ -248,6 +254,7 @@ class DataService(object): if not use_snapshot: data = get_differential_and_set_last_published_data() + LOG.debug('Differential data to publish %s', data) if len(data[0]) == 0 and len(data[1]) == 0: return @@ -445,8 +452,10 @@ class DataService(object): """Method that returns the current seqnum & data for given table.""" if table not in self.sender_seqnums: self.sender_seqnums[table] = 0 - self._last_published_data[table] = self.get_snapshot(table) - return (self.sender_seqnums[table], 
self._last_published_data[table]) + self._last_published_data[table] = set(self.get_snapshot(table)) + # make a copy to avoid co-reference + return (self.sender_seqnums[table], + list(self._last_published_data[table])) def get_snapshot(self, table): """Method that returns the current data for the given table. diff --git a/congress/tests/dse2/test_dse2.py b/congress/tests/dse2/test_dse2.py index 5c469e14c..c16a8d2a0 100644 --- a/congress/tests/dse2/test_dse2.py +++ b/congress/tests/dse2/test_dse2.py @@ -55,9 +55,9 @@ class TestDSE(base.TestCase): test1.subscribe('test2', 'p') helper.retry_check_function_return_value( lambda: hasattr(test1, 'last_msg'), True) - test2.publish('p', 42, use_snapshot=True) + test2.publish('p', [42], use_snapshot=True) helper.retry_check_function_return_value( - lambda: test1.last_msg['data'], 42) + lambda: test1.last_msg['data'], [42]) self.assertFalse(hasattr(test2, "last_msg")) node.stop() @@ -73,9 +73,9 @@ class TestDSE(base.TestCase): test2.subscribe('test1', 'p') helper.retry_check_function_return_value( lambda: hasattr(test2, 'last_msg'), True) - test1.publish('p', 42, use_snapshot=True) + test1.publish('p', [42], use_snapshot=True) helper.retry_check_function_return_value( - lambda: test2.last_msg['data'], 42) + lambda: test2.last_msg['data'], [42]) self.assertFalse(hasattr(test1, "last_msg")) node.stop() @@ -91,9 +91,9 @@ class TestDSE(base.TestCase): test1.unsubscribe('test2', 'q') # unsub from q should not affect p helper.retry_check_function_return_value( lambda: hasattr(test1, 'last_msg'), True) - test2.publish('p', 42, use_snapshot=True) + test2.publish('p', [42], use_snapshot=True) helper.retry_check_function_return_value( - lambda: test1.last_msg['data'], 42) + lambda: test1.last_msg['data'], [42]) self.assertFalse(hasattr(test2, "last_msg")) node.stop() @@ -106,9 +106,9 @@ class TestDSE(base.TestCase): self.assertFalse(hasattr(test1, "last_msg")) test2 = fake_datasource.FakeDataSource('test2') node.register_service(test2) 
- test2.publish('p', 42, use_snapshot=True) + test2.publish('p', [42], use_snapshot=True) helper.retry_check_function_return_value( - lambda: test1.last_msg['data'], 42) + lambda: test1.last_msg['data'], [42]) self.assertFalse(hasattr(test2, "last_msg")) node.stop() node.wait() @@ -124,9 +124,9 @@ class TestDSE(base.TestCase): test1.subscribe('test2', 'p') helper.retry_check_function_return_value( lambda: hasattr(test1, 'last_msg'), True) - test2.publish('p', 42, use_snapshot=True) + test2.publish('p', [42], use_snapshot=True) helper.retry_check_function_return_value( - lambda: test1.last_msg['data'], 42) + lambda: test1.last_msg['data'], [42]) self.assertFalse(hasattr(test2, "last_msg")) node1.stop() node2.stop() @@ -144,9 +144,9 @@ class TestDSE(base.TestCase): test1.unsubscribe('test2', 'q') # unsub from q should not affect p helper.retry_check_function_return_value( lambda: hasattr(test1, 'last_msg'), True) - test2.publish('p', 42, use_snapshot=True) + test2.publish('p', [42], use_snapshot=True) helper.retry_check_function_return_value( - lambda: test1.last_msg['data'], 42) + lambda: test1.last_msg['data'], [42]) self.assertFalse(hasattr(test2, "last_msg")) node1.stop() node2.stop() @@ -164,9 +164,9 @@ class TestDSE(base.TestCase): test1.subscribe('test3', 'p') helper.retry_check_function_return_value( lambda: hasattr(test1, 'last_msg'), True) - test3.publish('p', 42, use_snapshot=True) + test3.publish('p', [42], use_snapshot=True) helper.retry_check_function_return_value( - lambda: test1.last_msg['data'], 42) + lambda: test1.last_msg['data'], [42]) self.assertFalse(hasattr(test2, "last_msg")) self.assertFalse(hasattr(test3, "last_msg")) node1.stop() @@ -197,9 +197,9 @@ class TestDSE(base.TestCase): nova.subscribe('test', 'p') helper.retry_check_function_return_value( lambda: hasattr(nova, 'last_msg'), True) - test.publish('p', 42, use_snapshot=True) + test.publish('p', [42], use_snapshot=True) helper.retry_check_function_return_value( - lambda: 
nova.last_msg['data'], 42) + lambda: nova.last_msg['data'], [42]) self.assertFalse(hasattr(test, "last_msg")) node.stop() @@ -215,15 +215,15 @@ class TestDSE(base.TestCase): nova.subscribe('test', 'p') helper.retry_check_function_return_value( lambda: hasattr(nova, 'last_msg'), True) - test.publish('p', 42, use_snapshot=True) + test.publish('p', [42], use_snapshot=True) helper.retry_check_function_return_value( - lambda: nova.last_msg['data'], 42) + lambda: nova.last_msg['data'], [42]) self.assertFalse(hasattr(test, "last_msg")) nova.unsubscribe('test', 'p') - test.publish('p', 43, use_snapshot=True) + test.publish('p', [43], use_snapshot=True) # hard to test that the message is never delivered time.sleep(0.2) - self.assertEqual(nova.last_msg['data'], 42) + self.assertEqual(nova.last_msg['data'], [42]) node.stop() @mock.patch.object(nova_client, 'Client', spec_set=True, autospec=True) @@ -238,9 +238,9 @@ class TestDSE(base.TestCase): test.subscribe('nova', 'p') helper.retry_check_function_return_value( lambda: hasattr(test, 'last_msg'), True) - nova.publish('p', 42, use_snapshot=True) + nova.publish('p', [42], use_snapshot=True) helper.retry_check_function_return_value( - lambda: test.last_msg['data'], 42) + lambda: test.last_msg['data'], [42]) self.assertFalse(hasattr(nova, "last_msg")) node.stop()