Merge "dse2 sequenced, differential pub-sub"

Jenkins 2016-06-10 09:36:49 +00:00 committed by Gerrit Code Review
commit 18c6627ca4
7 changed files with 580 additions and 19 deletions

View File

@@ -1289,9 +1289,10 @@ class PollingDataSourceDriver(DataSourceDriver):
                 # late (or dies and comes back up), DSE can automatically
                 # send the full table.
                 if tablename in self.state:
-                    self.publish(tablename, self.state[tablename])
+                    self.publish(
+                        tablename, self.state[tablename], use_snapshot=False)
                 else:
-                    self.publish(tablename, set())
+                    self.publish(tablename, set(), use_snapshot=False)
         except Exception as e:
             self.last_error = e
             LOG.exception("Datasource driver raised exception")
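
The change above switches the poll path from full-table snapshots to differential publications: with use_snapshot=False, the DataService layer diffs the new table against the last published copy and broadcasts only the rows that changed. A minimal sketch of that delta computation (table_delta is an illustrative name, not a function in this commit), assuming rows are hashable tuples:

    # Sketch: diff the previously published table against the new one.
    def table_delta(last_published, new_data):
        to_add = list(set(new_data) - set(last_published))
        to_del = list(set(last_published) - set(new_data))
        return [to_add, to_del]

    # Only (3,) was added and (1,) removed; unchanged rows are not resent.
    assert table_delta({(1,), (2,)}, {(2,), (3,)}) == [[(3,)], [(1,)]]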

View File

@@ -489,7 +489,9 @@ class deepSix(greenthread.GreenThread):
                                   self.reqtimeout,
                                   corruuid))
 
-    def publish(self, dataindex, newdata, key=''):
+    def publish(self, dataindex, newdata, key='', use_snapshot=False):
+        # Note(ekcs): use_snapshot param is ignored.
+        # Accepted here on temporary basis for dse1+2 compatibility.
         self.log_debug("publishing to dataindex %s with data %s",
                        dataindex, strutils.mask_password(newdata, "****"))
         if dataindex not in self.pubdata:

View File

@@ -18,6 +18,12 @@ from oslo_serialization import jsonutils as json
 
 LOG = logging.getLogger(__name__)
 
+import six
+if six.PY2:
+    import Queue as queue_package
+else:
+    import queue as queue_package
+
 
 class DataServiceInfo(object):
     """Metadata for DataService on the DSE.
@@ -89,6 +95,11 @@ class DataService(object):
 
     # TODO(pballand): make default methods for pub/subscribed tables
     def __init__(self, service_id):
+        # Note(ekcs): temporary setting to disable use of diffs and sequencing
+        # to avoid muddying the process of a first dse2 system test.
+        # TODO(ekcs,dse2): remove when differential update is standard
+        self.always_snapshot = True
+
         self.service_id = service_id
         self.node = None
         self._rpc_server = None
@@ -97,6 +108,16 @@ class DataService(object):
         self._running = False
         self._published_tables_with_subscriber = set()
 
+        # data structures for sequenced data updates for reliable pub-sub
+        # msg queues for msgs to be processed
+        self.msg_queues = {}  # {publisher -> {table -> msg queue}}
+        # last received & processed seqnum
+        self.receiver_seqnums = {}  # {publisher -> {table -> seqnum}}
+        # last sent seqnum
+        self.sender_seqnums = {}  # {table -> seqnum}
+        # last published data
+        self._last_published_data = {}  # {table -> data}
+
     def add_rpc_endpoint(self, endpt):
         self._rpc_endpoints.append(endpt)
@@ -178,27 +199,156 @@ class DataService(object):
     def delete_datasource(self, datasource):
         return self.node.delete_datasource(datasource)
 
-    def publish(self, table, data):
-        self.node.publish_table(self.service_id, table, data)
+    def publish(self, table, data, use_snapshot=True):
+        if self.always_snapshot:
+            self.node.publish_table(self.service_id, table, data)
+            return
+
+        def get_differential_and_set_last_published_data():
+            if table in self._last_published_data:
+                to_add = list(
+                    set(data) - set(self._last_published_data[table]))
+                to_del = list(
+                    set(self._last_published_data[table]) - set(data))
+                self._last_published_data[table] = data
+            else:
+                self._last_published_data[table] = data
+                to_add = data
+                to_del = []
+            return [to_add, to_del]
+
+        def increment_get_seqnum():
+            if table not in self.sender_seqnums:
+                self.sender_seqnums[table] = 0
+            else:
+                self.sender_seqnums[table] = self.sender_seqnums[table] + 1
+            return self.sender_seqnums[table]
+
+        if not use_snapshot:
+            data = get_differential_and_set_last_published_data()
+            if len(data[0]) == 0 and len(data[1]) == 0:
+                return
+
+        seqnum = increment_get_seqnum()
+        self.node.publish_table_sequenced(
+            self.service_id, table, data, use_snapshot, seqnum)
 
     def subscribe(self, service, table):
-        data = self.node.subscribe_table(self.service_id, service, table)
-        self.receive_data(service, table, data)
+        if self.always_snapshot:
+            data = self.node.subscribe_table(self.service_id, service, table)
+            self.receive_data(service, table, data, is_snapshot=True)
+            return
+        (seqnum, data) = self.node.subscribe_table(
+            self.service_id, service, table)
+        self.receive_data_sequenced(
+            service, table, data, seqnum, is_snapshot=True)
 
     def unsubscribe(self, service, table):
         self.node.unsubscribe_table(self.service_id, service, table)
+        self._clear_msg_queue(service, table)
+        self._clear_receiver_seqnum(service, table)
 
-    def receive_data(self, publisher, table, data):
+    def _clear_msg_queue(self, publisher, table):
+        if publisher in self.msg_queues:
+            if table in self.msg_queues[publisher]:
+                del self.msg_queues[publisher][table]
+
+    def _clear_receiver_seqnum(self, publisher, table):
+        if publisher in self.receiver_seqnums:
+            if table in self.receiver_seqnums[publisher]:
+                del self.receiver_seqnums[publisher][table]
+
+    def receive_data_sequenced(
+            self, publisher, table, data, seqnum, is_snapshot=False):
+        """Method called when sequenced publication data arrives."""
+        # TODO(ekcs): allow opting out of sequenced processing (per table)
+        # TODO(ekcs): re-subscribe when update missing for too long
+        def set_seqnum():
+            if publisher not in self.receiver_seqnums:
+                self.receiver_seqnums[publisher] = {}
+            self.receiver_seqnums[publisher][table] = seqnum
+
+        def clear_msg_queue():
+            self._clear_msg_queue(publisher, table)
+
+        def add_to_msg_queue():
+            if publisher not in self.msg_queues:
+                self.msg_queues[publisher] = {}
+            if table not in self.msg_queues[publisher]:
+                self.msg_queues[publisher][table] = \
+                    queue_package.PriorityQueue()
+            self.msg_queues[publisher][table].put(
+                (seqnum, is_snapshot, data))
+            assert self.msg_queues[publisher][table].qsize() > 0
+
+        def process_queued_msg():
+            try:
+                s, i, d = self.msg_queues[publisher][table].get_nowait()
+                self.receive_data_sequenced(publisher, table, d, s, i)
+            except queue_package.Empty:
+                pass
+            except KeyError:
+                pass
+
+        # if no seqnum process immediately
+        if seqnum is None:
+            self.receive_data(publisher, table, data, is_snapshot)
+
+        # if first data update received on this table
+        elif (publisher not in self.receiver_seqnums or
+              table not in self.receiver_seqnums[publisher]):
+            if is_snapshot:
+                # set sequence number and process data
+                set_seqnum()
+                self.receive_data(publisher, table, data, is_snapshot)
+                process_queued_msg()
+            else:
+                # queue
+                add_to_msg_queue()
+
+        # if re-initialization
+        elif seqnum == 0:  # initial snapshot or reset
+            # set sequence number and process data
+            set_seqnum()
+            clear_msg_queue()
+            self.receive_data(publisher, table, data, is_snapshot)
+
+        else:
+            # if seqnum is old, ignore
+            if seqnum <= self.receiver_seqnums[publisher][table]:
+                process_queued_msg()
+
+            # if seqnum next, process all in sequence
+            elif seqnum == self.receiver_seqnums[publisher][table] + 1:
+                set_seqnum()
+                self.receive_data(publisher, table, data, is_snapshot)
+                process_queued_msg()
+
+            # if seqnum future, queue for future
+            elif seqnum > self.receiver_seqnums[publisher][table] + 1:
+                add_to_msg_queue()
+
+    def receive_data(self, publisher, table, data, is_snapshot=True):
         """Method called when publication data arrives.
 
         Instances will override this method.
         """
-        data = self.node.to_set_of_tuples(data)
+        if is_snapshot:
+            data = self.node.to_set_of_tuples(data)
+        else:
+            data = (self.node.to_set_of_tuples(data[0]),
+                    self.node.to_set_of_tuples(data[1]))
 
         self.last_msg = {}
         self.last_msg['data'] = data
         self.last_msg['publisher'] = publisher
         self.last_msg['table'] = table
 
         if not hasattr(self, 'receive_data_history'):
            self.receive_data_history = []
         self.receive_data_history.append(self.last_msg)
 
     def subscription_list(self):
         """Method that returns subscription list.
@@ -221,6 +371,13 @@ class DataService(object):
         LOG.info('subscriber_list is duplicated in the new architecture.')
         return []
 
+    def get_last_published_data_with_seqnum(self, table):
+        """Method that returns the current seqnum & data for given table."""
+        if table not in self.sender_seqnums:
+            self.sender_seqnums[table] = 0
+            self._last_published_data[table] = self.get_snapshot(table)
+        return (self.sender_seqnums[table], self._last_published_data[table])
+
     def get_snapshot(self, table):
         """Method that returns the current data for the given table.
@@ -241,3 +398,10 @@ class DataServiceEndPoints (object):
             return self.service.get_snapshot(table)
         except AttributeError:
             pass
+
+    def get_last_published_data_with_seqnum(self, context, table):
+        """Function called on a node when an RPC request is sent."""
+        try:
+            return self.service.get_last_published_data_with_seqnum(table)
+        except AttributeError:
+            pass
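
The heart of the reliable pub-sub is receive_data_sequenced: within each (publisher, table) stream, a snapshot or seqnum 0 (re)initializes the stream, the next expected seqnum is applied immediately, stale seqnums are dropped, and future seqnums wait in a PriorityQueue until the gap fills, after which processing recurses to drain the queue in order. A self-contained sketch of that discipline under simplified assumptions (one table, Python 3 queue module; SequencedReceiver and apply_update are illustrative stand-ins for DataService and receive_data, not names from this commit):

    import queue

    class SequencedReceiver(object):
        """Apply (seqnum, data) updates in order, buffering out-of-order ones."""
        def __init__(self, apply_update):
            self.apply_update = apply_update   # stands in for receive_data
            self.seqnum = None                 # last processed seqnum
            self.pending = queue.PriorityQueue()

        def receive(self, seqnum, data, is_snapshot=False):
            if seqnum is None:                     # unsequenced: apply now
                self.apply_update(data)
            elif self.seqnum is None:              # first message on stream
                if is_snapshot:
                    self.seqnum = seqnum           # snapshot starts the stream
                    self.apply_update(data)
                    self._retry_pending()
                else:
                    self.pending.put((seqnum, is_snapshot, data))
            elif seqnum == 0:                      # re-initialization / reset
                self.seqnum = 0
                self.pending = queue.PriorityQueue()   # drop buffered msgs
                self.apply_update(data)
            elif seqnum <= self.seqnum:            # stale or duplicate: drop
                self._retry_pending()
            elif seqnum == self.seqnum + 1:        # next expected: apply
                self.seqnum = seqnum
                self.apply_update(data)
                self._retry_pending()
            else:                                  # from the future: buffer
                self.pending.put((seqnum, is_snapshot, data))

        def _retry_pending(self):
            try:
                s, i, d = self.pending.get_nowait()
            except queue.Empty:
                return
            self.receive(s, d, i)                  # recursion drains in order

For example, receiving seqnums 2 then 1 buffers both; when a snapshot with seqnum 0 arrives, updates 0, 1, and 2 are applied in order, which is the behavior test_receive_data_out_of_order below exercises.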

View File

@@ -89,6 +89,11 @@ class DseNode(object):
 
     def __init__(self, messaging_config, node_id, node_rpc_endpoints,
                  partition_id=None):
+        # Note(ekcs): temporary setting to disable use of diffs and sequencing
+        # to avoid muddying the process of a first dse2 system test.
+        # TODO(ekcs,dse2): remove when differential update is standard
+        self.always_snapshot = False
+
         self.messaging_config = messaging_config
         self.node_id = node_id
         self.node_rpc_endpoints = node_rpc_endpoints
@@ -133,6 +138,8 @@ class DseNode(object):
 
     def register_service(self, service):
         assert service.node is None
+        service.always_snapshot = self.always_snapshot
         service.node = self
         self._services.append(service)
         service._target = self.service_rpc_target(service.service_id,
@@ -300,6 +307,8 @@ class DseNode(object):
         client = messaging.RPCClient(self.transport, target)
         client.cast(self.context, method, **kwargs)
 
+    # Note(ekcs): non-sequenced publish retained to simplify rollout of dse2
+    # to be replaced by handle_publish_sequenced
     def publish_table(self, publisher, table, data):
         """Invoke RPC method on all instances of service_id.
@@ -318,6 +327,26 @@ class DseNode(object):
         self.broadcast_node_rpc("handle_publish", publisher=publisher,
                                 table=table, data=data)
 
+    def publish_table_sequenced(
+            self, publisher, table, data, is_snapshot, seqnum):
+        """Invoke RPC method on all instances of service_id.
+
+        Args:
+            publisher: The ID of the service publishing the data.
+            table: The name of the table being published.
+            data: The table data (a full snapshot or a differential update).
+            is_snapshot: True if data is a snapshot, False if a differential.
+            seqnum: The sequence number of this publication.
+
+        Returns:
+            None - Methods are invoked asynchronously and results are dropped.
+
+        Raises: RemoteError, MessageDeliveryFailure
+        """
+        LOG.trace("<%s> Publishing from '%s' table %s: %s",
+                  self.node_id, publisher, table, data)
+        self.broadcast_node_rpc(
+            "handle_publish_sequenced", publisher=publisher, table=table,
+            data=data, is_snapshot=is_snapshot, seqnum=seqnum)
 
     def table_subscribers(self, publisher, table):
         """List services on this node that subscribe to publisher/table."""
         return self.subscriptions.get(
@@ -333,11 +362,15 @@ class DseNode(object):
             self.subscriptions[publisher][table] = set()
         self.subscriptions[publisher][table].add(subscriber)
 
-        snapshot = self.invoke_service_rpc(
-            publisher, "get_snapshot", table=table)
         # oslo returns [] instead of set(), so handle that case directly
-        return self.to_set_of_tuples(snapshot)
+        if self.always_snapshot:
+            snapshot = self.invoke_service_rpc(
+                publisher, "get_snapshot", table=table)
+            return self.to_set_of_tuples(snapshot)
+        else:
+            snapshot_seqnum = self.invoke_service_rpc(
+                publisher, "get_last_published_data_with_seqnum", table=table)
+            return snapshot_seqnum
 
     def get_subscription(self, service_id):
         """Return publisher/tables subscribed by service: service_id
@@ -589,6 +622,8 @@ class DseNodeEndpoints (object):
     def __init__(self, dsenode):
         self.node = dsenode
 
+    # Note(ekcs): non-sequenced publish retained to simplify rollout of dse2
+    # to be replaced by handle_publish_sequenced
     def handle_publish(self, context, publisher, table, data):
         """Function called on the node when a publication is sent.
@@ -596,4 +631,15 @@ class DseNodeEndpoints (object):
         """
         for s in self.node.table_subscribers(publisher, table):
             self.node.service_object(s).receive_data(
-                publisher=publisher, table=table, data=data)
+                publisher=publisher, table=table, data=data, is_snapshot=True)
+
+    def handle_publish_sequenced(
+            self, context, publisher, table, data, is_snapshot, seqnum):
+        """Function called on the node when a publication is sent.
+
+        Forwards the publication to all of the relevant services.
+        """
+        for s in self.node.table_subscribers(publisher, table):
+            self.node.service_object(s).receive_data_sequenced(
+                publisher=publisher, table=table, data=data, seqnum=seqnum,
+                is_snapshot=is_snapshot)
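
On the node side, subscribe_table now has two modes: with always_snapshot it fetches a plain snapshot, and otherwise it calls get_last_published_data_with_seqnum so the subscriber starts from the publisher's current state and sequence number, while every later publication fans out to subscribers through handle_publish_sequenced. A worked example of the resulting message flow (all values hypothetical), showing the data shapes a subscriber sees:

    # (event, seqnum, data, is_snapshot) as seen by one subscriber
    timeline = [
        ('subscribe', 0, [(1,), (2,)], True),   # initial snapshot via RPC
        ('publish',   1, [[(3,)], []], False),  # differential: add (3,)
        ('publish',   2, [[], [(1,)]], False),  # differential: remove (1,)
    ]
    state = set()
    for _, seqnum, data, is_snapshot in timeline:
        if is_snapshot:
            state = set(data)                   # snapshot replaces the table
        else:
            state = (state | set(data[0])) - set(data[1])
    assert state == {(2,), (3,)}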

View File

@@ -2074,13 +2074,28 @@ class Dse2Runtime(DseRuntime):
     def service_exists(self, service_name):
         return self.is_valid_service(service_name)
 
-    def receive_data(self, publisher, table, data):
+    def receive_data(self, publisher, table, data, is_snapshot=False):
         """Event handler for when a dataservice publishes data.
 
         That data can either be a full-table snapshot (a list of tuples)
         or a delta (a pair [to_add, to_del] of lists of row tuples).
         """
         self.log("received data msg for %s:%s", publisher, table)
+        if not is_snapshot:
+            to_add = data[0]
+            to_del = data[1]
+            result = []
+            for row in to_del:
+                formula = compile.Literal.create_from_table_tuple(table, row)
+                event = compile.Event(formula=formula, insert=False)
+                result.append(event)
+            for row in to_add:
+                formula = compile.Literal.create_from_table_tuple(table, row)
+                event = compile.Event(formula=formula, insert=True)
+                result.append(event)
+            self.receive_data_update(publisher, table, result)
+            return
+
         # if empty data, assume it is an init msg, since noop otherwise
         if len(data) == 0:
             self.receive_data_full(publisher, table, data)
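
For a differential payload, the engine unpacks [to_add, to_del] and turns every row into a delete or insert Event (deletes first, as above) before handing the whole batch to receive_data_update. A condensed illustration of that mapping, with plain tuples standing in for compile.Event and compile.Literal (delta_to_events is an illustrative name, not part of this commit):

    # Sketch: map a differential payload to an ordered list of events.
    def delta_to_events(table, data):
        to_add, to_del = data
        events = [('delete', (table, row)) for row in to_del]
        events += [('insert', (table, row)) for row in to_add]
        return events

    # A delta [[(3,)], [(2,)]] becomes a delete of p(2) plus an insert of p(3).
    assert delta_to_events('p', [[(3,)], [(2,)]]) == [
        ('delete', ('p', (2,))), ('insert', ('p', (3,)))]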

View File

@@ -217,6 +217,7 @@ class TestDSE(base.TestCase):
     def test_datasource_poll(self):
         node = helper.make_dsenode_new_partition('testnode')
+        node.always_snapshot = True  # Note(ekcs): this test expects snapshot
         pub = FakeDataSource('pub')
         sub = FakeDataSource('sub')
         node.register_service(pub)
@@ -229,20 +230,66 @@ class TestDSE(base.TestCase):
             lambda: sub.last_msg['data'], set(pub.state['fake_table']))
         self.assertFalse(hasattr(pub, "last_msg"))
 
-    def test_policy(self):
+    def test_policy_data(self):
+        """Test policy correctly processes initial data snapshot."""
         node = helper.make_dsenode_new_partition('testnode')
+        node.always_snapshot = False
         data = FakeDataSource('data')
         engine = Dse2Runtime('engine')
         node.register_service(data)
         node.register_service(engine)
 
-        engine.create_policy('alpha')
+        engine.create_policy('policy1')
         engine.create_policy('data')
-        self.insert_rule(engine, 'p(x) :- data:fake_table(x)', 'alpha')
+        self.insert_rule(engine, 'p(x) :- data:fake_table(x)', 'policy1')
         data.state = {'fake_table': set([(1,), (2,)])}
         data.poll()
         helper.retry_check_db_equal(
-            engine, 'p(x)', 'p(1) p(2)', target='alpha')
+            engine, 'p(x)', 'p(1) p(2)', target='policy1')
         self.assertFalse(hasattr(engine, "last_msg"))
 
+    def test_policy_data_update(self):
+        """Test policy correctly processes initial data snapshot and update."""
+        node = helper.make_dsenode_new_partition('testnode')
+        node.always_snapshot = False
+        data = FakeDataSource('data')
+        engine = Dse2Runtime('engine')
+        node.register_service(data)
+        node.register_service(engine)
+
+        engine.create_policy('policy1')
+        engine.create_policy('data')
+        self.insert_rule(engine, 'p(x) :- data:fake_table(x)', 'policy1')
+        data.state = {'fake_table': set([(1,), (2,)])}
+        data.poll()
+        helper.retry_check_db_equal(
+            engine, 'p(x)', 'p(1) p(2)', target='policy1')
+        data.state = {'fake_table': set([(1,), (2,), (3,)])}
+        data.poll()
+        helper.retry_check_db_equal(
+            engine, 'p(x)', 'p(1) p(2) p(3)', target='policy1')
+        self.assertFalse(hasattr(engine, "last_msg"))
+
+    def test_policy_data_late_sub(self):
+        """Test policy correctly processes data on late subscribe."""
+        node = helper.make_dsenode_new_partition('testnode')
+        node.always_snapshot = False
+        data = FakeDataSource('data')
+        engine = Dse2Runtime('engine')
+        node.register_service(data)
+        node.register_service(engine)
+
+        engine.create_policy('policy1')
+        engine.create_policy('data')
+        data.state = {'fake_table': set([(1,), (2,)])}
+        data.poll()
+        self.insert_rule(engine, 'p(x) :- data:fake_table(x)', 'policy1')
+        helper.retry_check_db_equal(
+            engine, 'p(x)', 'p(1) p(2)', target='policy1')
+        data.state = {'fake_table': set([(1,), (2,), (3,)])}
+        data.poll()
+        helper.retry_check_db_equal(
+            engine, 'p(x)', 'p(1) p(2) p(3)', target='policy1')
+        self.assertFalse(hasattr(engine, "last_msg"))
+
     def insert_rule(self, engine, statement, target=None):

View File

@@ -21,6 +21,10 @@ cfg.CONF.distributed_architecture = True
 from congress.tests.policy_engines.test_agnostic import TestRuntime
 from congress.tests2.api import base as api_base
 from congress.policy_engines import agnostic
+from congress.tests import helper
+
+import sys
+
 
 class TestDse2Runtime(TestRuntime):
     def setUp(self):
@@ -67,3 +71,285 @@ class TestDse2Runtime(TestRuntime):
         # loaded rule is enabled and subscribes the table
         subscriptions = engine2.subscription_list()
         self.assertEqual([('nova', 'services')], subscriptions)
+
+
+class TestAgnostic(TestRuntime):
+    def test_receive_data_no_sequence_num(self):
+        '''Test receiving data without sequence numbers'''
+        run = agnostic.Dse2Runtime('engine')
+        run.always_snapshot = False
+        run.create_policy('datasource1')
+
+        # initialize with full table
+        run.receive_data_sequenced(
+            publisher='datasource1', table='p',
+            data=[[1], [2]], seqnum=None, is_snapshot=True)
+        actual = run.select('p(x)')
+        correct = 'p(1) p(2)'
+        self.assertTrue(helper.db_equal(actual, correct))
+
+        # add data
+        run.receive_data_sequenced(
+            publisher='datasource1', table='p',
+            data=[[[3], [4]], []], seqnum=None, is_snapshot=False)
+        actual = run.select('p(x)')
+        correct = 'p(1) p(2) p(3) p(4)'
+        self.assertTrue(helper.db_equal(actual, correct))
+
+        # remove data
+        run.receive_data_sequenced(
+            publisher='datasource1', table='p',
+            data=[[], [[2], [4]]], seqnum=None, is_snapshot=False)
+        actual = run.select('p(x)')
+        correct = 'p(1) p(3)'
+        self.assertTrue(helper.db_equal(actual, correct))
+
+        # add & remove data
+        run.receive_data_sequenced(
+            publisher='datasource1', table='p',
+            data=[[[4]], [[3]]], seqnum=None, is_snapshot=False)
+        actual = run.select('p(x)')
+        correct = 'p(1) p(4)'
+        self.assertTrue(helper.db_equal(actual, correct))
+
+        # re-initialize with full table
+        run.receive_data_sequenced(
+            publisher='datasource1', table='p',
+            data=[[1], [2]], seqnum=None, is_snapshot=True)
+        actual = run.select('p(x)')
+        correct = 'p(1) p(2)'
+        self.assertTrue(helper.db_equal(actual, correct))
+
+    def test_receive_data_in_order(self):
+        '''Test receiving data with sequence numbers, in order'''
+        run = agnostic.Dse2Runtime('engine')
+        run.always_snapshot = False
+        run.create_policy('datasource1')
+
+        # initialize with full table
+        run.receive_data_sequenced(
+            publisher='datasource1', table='p',
+            data=[[1], [2]], seqnum=0, is_snapshot=True)
+        actual = run.select('p(x)')
+        correct = 'p(1) p(2)'
+        self.assertTrue(helper.db_equal(actual, correct))
+
+        # add data
+        run.receive_data_sequenced(
+            publisher='datasource1', table='p',
+            data=[[[3], [4]], []], seqnum=1, is_snapshot=False)
+        actual = run.select('p(x)')
+        correct = 'p(1) p(2) p(3) p(4)'
+        self.assertTrue(helper.db_equal(actual, correct))
+
+        # remove data
+        run.receive_data_sequenced(
+            publisher='datasource1', table='p',
+            data=[[], [[2], [4]]], seqnum=2, is_snapshot=False)
+        actual = run.select('p(x)')
+        correct = 'p(1) p(3)'
+        self.assertTrue(helper.db_equal(actual, correct))
+
+        # add & remove data
+        run.receive_data_sequenced(
+            publisher='datasource1', table='p',
+            data=[[[4]], [[3]]], seqnum=3, is_snapshot=False)
+        actual = run.select('p(x)')
+        correct = 'p(1) p(4)'
+        self.assertTrue(helper.db_equal(actual, correct))
+
+        # re-initialize with full table
+        run.receive_data_sequenced(
+            publisher='datasource1', table='p',
+            data=[[1], [2]], seqnum=4, is_snapshot=True)
+        actual = run.select('p(x)')
+        correct = 'p(1) p(2)'
+        self.assertTrue(helper.db_equal(actual, correct))
+
+    def test_receive_data_out_of_order(self):
+        '''Test receiving data with sequence numbers, out of order'''
+        run = agnostic.Dse2Runtime('engine')
+        run.always_snapshot = False
+        run.create_policy('datasource1')
+
+        # update with lower seqnum than init snapshot is ignored
+        run.receive_data_sequenced(
+            publisher='datasource1', table='p',
+            data=[[[10]], []], seqnum=3, is_snapshot=False)
+
+        # add & remove data
+        run.receive_data_sequenced(
+            publisher='datasource1', table='p',
+            data=[[[4]], [[3]]], seqnum=7, is_snapshot=False)
+        actual = run.select('p(x)')
+        correct = ''
+        self.assertTrue(helper.db_equal(actual, correct))
+
+        # remove data
+        run.receive_data_sequenced(
+            publisher='datasource1', table='p',
+            data=[[], [[2], [4]]], seqnum=6, is_snapshot=False)
+        actual = run.select('p(x)')
+        correct = ''
+        self.assertTrue(helper.db_equal(actual, correct))
+
+        # add data
+        run.receive_data_sequenced(
+            publisher='datasource1', table='p',
+            data=[[[3], [4]], []], seqnum=5, is_snapshot=False)
+        actual = run.select('p(x)')
+        correct = ''
+        self.assertTrue(helper.db_equal(actual, correct))
+
+        # initialize with full table
+        run.receive_data_sequenced(
+            publisher='datasource1', table='p',
+            data=[[1], [2]], seqnum=4, is_snapshot=True)
+        actual = run.select('p(x)')
+        correct = 'p(1) p(4)'
+        self.assertTrue(helper.db_equal(actual, correct))
+
+    def test_receive_data_arbitrary_start(self):
+        '''Test receiving data with arbitrary starting sequence number'''
+        run = agnostic.Dse2Runtime('engine')
+        run.always_snapshot = False
+        run.create_policy('datasource1')
+
+        run.receive_data_sequenced(
+            publisher='datasource1', table='p',
+            data=[[1], [2]], seqnum=1234, is_snapshot=True)
+        actual = run.select('p(x)')
+        correct = 'p(1) p(2)'
+        self.assertTrue(helper.db_equal(actual, correct))
+
+    def test_receive_data_duplicate_sequence_number(self):
+        '''Test receiving data with duplicate sequence number
+
+        Only one message (arbitrary) should be processed.
+        '''
+        run = agnostic.Dse2Runtime('engine')
+        run.always_snapshot = False
+        run.create_policy('datasource1')
+
+        # send three updates with the same seqnum
+        run.receive_data_sequenced(
+            publisher='datasource1', table='p',
+            data=[[[1]], []], seqnum=1, is_snapshot=False)
+        run.receive_data_sequenced(
+            publisher='datasource1', table='p',
+            data=[[[2]], []], seqnum=1, is_snapshot=False)
+        run.receive_data_sequenced(
+            publisher='datasource1', table='p',
+            data=[[[3]], []], seqnum=1, is_snapshot=False)
+
+        # start with empty data
+        run.receive_data_sequenced(
+            publisher='datasource1', table='p',
+            data=[], seqnum=0, is_snapshot=True)
+
+        # exactly one of the three updates should be applied
+        actual = run.select('p(x)')
+        correct1 = 'p(1)'
+        correct2 = 'p(2)'
+        correct3 = 'p(3)'
+        self.assertTrue(
+            helper.db_equal(actual, correct1) or
+            helper.db_equal(actual, correct2) or
+            helper.db_equal(actual, correct3))
+
+    def test_receive_data_sequence_number_max_int(self):
+        '''Test receiving data when sequence number goes over max int'''
+        run = agnostic.Dse2Runtime('engine')
+        run.always_snapshot = False
+        run.create_policy('datasource1')
+
+        run.receive_data_sequenced(
+            publisher='datasource1', table='p',
+            data=[[1], [2]], seqnum=sys.maxsize, is_snapshot=True)
+        actual = run.select('p(x)')
+        correct = 'p(1) p(2)'
+        self.assertTrue(helper.db_equal(actual, correct))
+
+        run.receive_data_sequenced(
+            publisher='datasource1', table='p',
+            data=[[], [[2]]], seqnum=sys.maxsize + 1, is_snapshot=False)
+        actual = run.select('p(x)')
+        correct = 'p(1)'
+        self.assertTrue(helper.db_equal(actual, correct))
+
+        # test out-of-sequence update ignored
+        run.receive_data_sequenced(
+            publisher='datasource1', table='p',
+            data=[[[2]], []], seqnum=sys.maxsize, is_snapshot=False)
+        actual = run.select('p(x)')
+        correct = 'p(1)'
+        self.assertTrue(helper.db_equal(actual, correct))
+
+        run.receive_data_sequenced(
+            publisher='datasource1', table='p',
+            data=[[[4]], []], seqnum=sys.maxsize + 3, is_snapshot=False)
+        actual = run.select('p(x)')
+        correct = 'p(1)'
+        self.assertTrue(helper.db_equal(actual, correct))
+
+        run.receive_data_sequenced(
+            publisher='datasource1', table='p',
+            data=[[[3]], []], seqnum=sys.maxsize + 2, is_snapshot=False)
+        actual = run.select('p(x)')
+        correct = 'p(1) p(3) p(4)'
+        self.assertTrue(helper.db_equal(actual, correct))
+
+    def test_receive_data_multiple_tables(self):
+        '''Test receiving data with sequence numbers, multiple tables'''
+        run = agnostic.Dse2Runtime('engine')
+        run.always_snapshot = False
+        run.create_policy('datasource1')
+
+        # initialize p with full table
+        run.receive_data_sequenced(
+            publisher='datasource1', table='p',
+            data=[[1]], seqnum=0, is_snapshot=True)
+        actual = run.select('p(x)')
+        correct = 'p(1)'
+        self.assertTrue(helper.db_equal(actual, correct))
+
+        # add data to p
+        run.receive_data_sequenced(
+            publisher='datasource1', table='p',
+            data=[[[2]], []], seqnum=1, is_snapshot=False)
+        actual = run.select('p(x)')
+        correct = 'p(1) p(2)'
+        self.assertTrue(helper.db_equal(actual, correct))
+
+        # add data to q
+        run.receive_data_sequenced(
+            publisher='datasource1', table='q',
+            data=[[[2]], []], seqnum=1, is_snapshot=False)
+        actual = run.select('q(x)')
+        correct = ''  # not applied until q is initialized
+        self.assertTrue(helper.db_equal(actual, correct))
+
+        # initialize q with full table
+        run.receive_data_sequenced(
+            publisher='datasource1', table='q',
+            data=[[1]], seqnum=0, is_snapshot=True)
+        actual = run.select('q(x)')
+        correct = 'q(1) q(2)'  # both initial data and queued update applied
+        self.assertTrue(helper.db_equal(actual, correct))
+
+        # add data to q
+        run.receive_data_sequenced(
+            publisher='datasource1', table='q',
+            data=[[[3]], []], seqnum=2, is_snapshot=False)
+        actual = run.select('q(x)')
+        correct = 'q(1) q(2) q(3)'
+        self.assertTrue(helper.db_equal(actual, correct))
+
+        # add data to p
+        run.receive_data_sequenced(
+            publisher='datasource1', table='p',
+            data=[[[3]], []], seqnum=2, is_snapshot=False)
+        actual = run.select('p(x)')
+        correct = 'p(1) p(2) p(3)'
+        self.assertTrue(helper.db_equal(actual, correct))
+
+    # TODO(ekcs): receive data multiple publishers
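
One detail the max-int test leans on: Python integers have arbitrary precision, so sender seqnums can keep incrementing past sys.maxsize without wrapping, and the seqnum + 1 comparisons in receive_data_sequenced stay correct with no overflow handling. A one-line check of that assumption:

    import sys
    assert sys.maxsize + 1 > sys.maxsize  # no wraparound for Python ints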