Avoid coreference between current state and _last_published_data
Previously, all data source drivers replace the entire state on each poll or push update (doctor). The DSE publish code relied on this situation to optimize performance by skipping a copy when setting _last_published_data. The coreference introduced did not matter because the original state was never edited, only replaced. But beginning with the webhook drivers (monasca webhook and vitrage), the drivers edit the current state. This a bug was introduced where sometimes an update would not be published to messaging bus because of an incorrect _last_published_data due to coreference. This patch fixes the bug. In addition, the _last_published_data elements are stored as set rather than list to save the need to convert to set each time a diff is needed. The times when conversion back to list is needed to publish on the messaging bus (used for snapshot publish only) is expected to be far fewer than the times the conversion to set is needed. A few additionalogging lines are also introbuced for improved debuggability. Change-Id: I332a844beaf8dec4450bb1ade6ce7e41cc3d4d91
This commit is contained in:
parent
e75c7bd893
commit
5b9ae41024
@ -226,17 +226,23 @@ class DataService(object):
|
||||
|
||||
# Note(thread-safety): blocking function
|
||||
def publish(self, table, data, use_snapshot=False):
|
||||
LOG.debug('Publishing table %s', table)
|
||||
LOG.trace('Parameters: table: %s, data: %s, use_snapshot: %s',
|
||||
table, data, use_snapshot)
|
||||
LOG.trace('Last published data %s', self._last_published_data)
|
||||
data = set(data) # make a copy to avoid co-reference
|
||||
|
||||
def get_differential_and_set_last_published_data():
|
||||
if table in self._last_published_data:
|
||||
to_add = list(
|
||||
set(data) - set(self._last_published_data[table]))
|
||||
to_del = list(
|
||||
set(self._last_published_data[table]) - set(data))
|
||||
LOG.trace('Diff against last published data %s',
|
||||
self._last_published_data[table])
|
||||
to_add = list(data - self._last_published_data[table])
|
||||
to_del = list(self._last_published_data[table] - data)
|
||||
self._last_published_data[table] = data
|
||||
else:
|
||||
self._last_published_data[table] = data
|
||||
to_add = copy.copy(data)
|
||||
to_add = list(data)
|
||||
to_del = []
|
||||
self._last_published_data[table] = data
|
||||
return [to_add, to_del]
|
||||
|
||||
def increment_get_seqnum():
|
||||
@ -248,6 +254,7 @@ class DataService(object):
|
||||
|
||||
if not use_snapshot:
|
||||
data = get_differential_and_set_last_published_data()
|
||||
LOG.debug('Differential data to publish %s', data)
|
||||
if len(data[0]) == 0 and len(data[1]) == 0:
|
||||
return
|
||||
|
||||
@ -445,8 +452,10 @@ class DataService(object):
|
||||
"""Method that returns the current seqnum & data for given table."""
|
||||
if table not in self.sender_seqnums:
|
||||
self.sender_seqnums[table] = 0
|
||||
self._last_published_data[table] = self.get_snapshot(table)
|
||||
return (self.sender_seqnums[table], self._last_published_data[table])
|
||||
self._last_published_data[table] = set(self.get_snapshot(table))
|
||||
# make a copy to avoid co-reference
|
||||
return (self.sender_seqnums[table],
|
||||
list(self._last_published_data[table]))
|
||||
|
||||
def get_snapshot(self, table):
|
||||
"""Method that returns the current data for the given table.
|
||||
|
@ -55,9 +55,9 @@ class TestDSE(base.TestCase):
|
||||
test1.subscribe('test2', 'p')
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: hasattr(test1, 'last_msg'), True)
|
||||
test2.publish('p', 42, use_snapshot=True)
|
||||
test2.publish('p', [42], use_snapshot=True)
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: test1.last_msg['data'], 42)
|
||||
lambda: test1.last_msg['data'], [42])
|
||||
self.assertFalse(hasattr(test2, "last_msg"))
|
||||
node.stop()
|
||||
|
||||
@ -73,9 +73,9 @@ class TestDSE(base.TestCase):
|
||||
test2.subscribe('test1', 'p')
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: hasattr(test2, 'last_msg'), True)
|
||||
test1.publish('p', 42, use_snapshot=True)
|
||||
test1.publish('p', [42], use_snapshot=True)
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: test2.last_msg['data'], 42)
|
||||
lambda: test2.last_msg['data'], [42])
|
||||
self.assertFalse(hasattr(test1, "last_msg"))
|
||||
node.stop()
|
||||
|
||||
@ -91,9 +91,9 @@ class TestDSE(base.TestCase):
|
||||
test1.unsubscribe('test2', 'q') # unsub from q should not affect p
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: hasattr(test1, 'last_msg'), True)
|
||||
test2.publish('p', 42, use_snapshot=True)
|
||||
test2.publish('p', [42], use_snapshot=True)
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: test1.last_msg['data'], 42)
|
||||
lambda: test1.last_msg['data'], [42])
|
||||
self.assertFalse(hasattr(test2, "last_msg"))
|
||||
node.stop()
|
||||
|
||||
@ -106,9 +106,9 @@ class TestDSE(base.TestCase):
|
||||
self.assertFalse(hasattr(test1, "last_msg"))
|
||||
test2 = fake_datasource.FakeDataSource('test2')
|
||||
node.register_service(test2)
|
||||
test2.publish('p', 42, use_snapshot=True)
|
||||
test2.publish('p', [42], use_snapshot=True)
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: test1.last_msg['data'], 42)
|
||||
lambda: test1.last_msg['data'], [42])
|
||||
self.assertFalse(hasattr(test2, "last_msg"))
|
||||
node.stop()
|
||||
node.wait()
|
||||
@ -124,9 +124,9 @@ class TestDSE(base.TestCase):
|
||||
test1.subscribe('test2', 'p')
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: hasattr(test1, 'last_msg'), True)
|
||||
test2.publish('p', 42, use_snapshot=True)
|
||||
test2.publish('p', [42], use_snapshot=True)
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: test1.last_msg['data'], 42)
|
||||
lambda: test1.last_msg['data'], [42])
|
||||
self.assertFalse(hasattr(test2, "last_msg"))
|
||||
node1.stop()
|
||||
node2.stop()
|
||||
@ -144,9 +144,9 @@ class TestDSE(base.TestCase):
|
||||
test1.unsubscribe('test2', 'q') # unsub from q should not affect p
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: hasattr(test1, 'last_msg'), True)
|
||||
test2.publish('p', 42, use_snapshot=True)
|
||||
test2.publish('p', [42], use_snapshot=True)
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: test1.last_msg['data'], 42)
|
||||
lambda: test1.last_msg['data'], [42])
|
||||
self.assertFalse(hasattr(test2, "last_msg"))
|
||||
node1.stop()
|
||||
node2.stop()
|
||||
@ -164,9 +164,9 @@ class TestDSE(base.TestCase):
|
||||
test1.subscribe('test3', 'p')
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: hasattr(test1, 'last_msg'), True)
|
||||
test3.publish('p', 42, use_snapshot=True)
|
||||
test3.publish('p', [42], use_snapshot=True)
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: test1.last_msg['data'], 42)
|
||||
lambda: test1.last_msg['data'], [42])
|
||||
self.assertFalse(hasattr(test2, "last_msg"))
|
||||
self.assertFalse(hasattr(test3, "last_msg"))
|
||||
node1.stop()
|
||||
@ -197,9 +197,9 @@ class TestDSE(base.TestCase):
|
||||
nova.subscribe('test', 'p')
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: hasattr(nova, 'last_msg'), True)
|
||||
test.publish('p', 42, use_snapshot=True)
|
||||
test.publish('p', [42], use_snapshot=True)
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: nova.last_msg['data'], 42)
|
||||
lambda: nova.last_msg['data'], [42])
|
||||
self.assertFalse(hasattr(test, "last_msg"))
|
||||
node.stop()
|
||||
|
||||
@ -215,15 +215,15 @@ class TestDSE(base.TestCase):
|
||||
nova.subscribe('test', 'p')
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: hasattr(nova, 'last_msg'), True)
|
||||
test.publish('p', 42, use_snapshot=True)
|
||||
test.publish('p', [42], use_snapshot=True)
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: nova.last_msg['data'], 42)
|
||||
lambda: nova.last_msg['data'], [42])
|
||||
self.assertFalse(hasattr(test, "last_msg"))
|
||||
nova.unsubscribe('test', 'p')
|
||||
test.publish('p', 43, use_snapshot=True)
|
||||
test.publish('p', [43], use_snapshot=True)
|
||||
# hard to test that the message is never delivered
|
||||
time.sleep(0.2)
|
||||
self.assertEqual(nova.last_msg['data'], 42)
|
||||
self.assertEqual(nova.last_msg['data'], [42])
|
||||
node.stop()
|
||||
|
||||
@mock.patch.object(nova_client, 'Client', spec_set=True, autospec=True)
|
||||
@ -238,9 +238,9 @@ class TestDSE(base.TestCase):
|
||||
test.subscribe('nova', 'p')
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: hasattr(test, 'last_msg'), True)
|
||||
nova.publish('p', 42, use_snapshot=True)
|
||||
nova.publish('p', [42], use_snapshot=True)
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: test.last_msg['data'], 42)
|
||||
lambda: test.last_msg['data'], [42])
|
||||
self.assertFalse(hasattr(nova, "last_msg"))
|
||||
node.stop()
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user