Allow subscription before data service exists
If service does not respond, initialize to empty data. When the service comes into existence, the data will flow as normal. Preserves pre-DSE2 behavior. Closes-Bug: 1637172 Change-Id: Ic4910525a1c0bc13d79b3773854abdf6fe447908
This commit is contained in:
parent
987ccb15c8
commit
5beae4bd26
|
@ -11,22 +11,22 @@
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
#
|
|
||||||
|
|
||||||
from oslo_log import log as logging
|
|
||||||
from oslo_serialization import jsonutils as json
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
import copy
|
import copy
|
||||||
from oslo_config import cfg
|
import six # thirdparty import first because needed in import of Queue/queue
|
||||||
import six
|
|
||||||
import time
|
import time
|
||||||
if six.PY2:
|
if six.PY2:
|
||||||
import Queue as queue_package
|
import Queue as queue_package
|
||||||
else:
|
else:
|
||||||
import queue as queue_package
|
import queue as queue_package
|
||||||
|
|
||||||
|
from oslo_config import cfg
|
||||||
|
from oslo_log import log as logging
|
||||||
|
from oslo_serialization import jsonutils as json
|
||||||
|
|
||||||
|
from congress import exception
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class DataServiceInfo(object):
|
class DataServiceInfo(object):
|
||||||
"""Metadata for DataService on the DSE.
|
"""Metadata for DataService on the DSE.
|
||||||
|
@ -262,16 +262,20 @@ class DataService(object):
|
||||||
|
|
||||||
# Note(thread-safety): blocking function
|
# Note(thread-safety): blocking function
|
||||||
def subscribe(self, service, table):
|
def subscribe(self, service, table):
|
||||||
if self.always_snapshot:
|
try:
|
||||||
|
if self.always_snapshot:
|
||||||
|
# Note(thread-safety): blocking call
|
||||||
|
data = self.node.subscribe_table(
|
||||||
|
self.service_id, service, table)
|
||||||
|
self.receive_data(service, table, data, is_snapshot=True)
|
||||||
|
return
|
||||||
# Note(thread-safety): blocking call
|
# Note(thread-safety): blocking call
|
||||||
data = self.node.subscribe_table(self.service_id, service, table)
|
(seqnum, data) = self.node.subscribe_table(
|
||||||
self.receive_data(service, table, data, is_snapshot=True)
|
self.service_id, service, table)
|
||||||
return
|
self.receive_data_sequenced(
|
||||||
# Note(thread-safety): blocking call
|
service, table, data, seqnum, is_snapshot=True)
|
||||||
(seqnum, data) = self.node.subscribe_table(
|
except exception.NotFound:
|
||||||
self.service_id, service, table)
|
self.receive_data(service, table, set(), is_snapshot=True)
|
||||||
self.receive_data_sequenced(
|
|
||||||
service, table, data, seqnum, is_snapshot=True)
|
|
||||||
|
|
||||||
def unsubscribe(self, service, table):
|
def unsubscribe(self, service, table):
|
||||||
# Note(thread-safety): it is important to make sure there are no
|
# Note(thread-safety): it is important to make sure there are no
|
||||||
|
|
|
@ -95,6 +95,19 @@ class TestRuleModel(base.SqlTestCase):
|
||||||
# {'rule': 'p(x) :- beta:q(name=x)'},
|
# {'rule': 'p(x) :- beta:q(name=x)'},
|
||||||
# {}, context=self.context)
|
# {}, context=self.context)
|
||||||
|
|
||||||
|
def test_add_rule_with_cross_policy_table(self):
|
||||||
|
test_rule = {
|
||||||
|
"rule": "p(x) :- classification:q(x)",
|
||||||
|
"name": "test-rule-cross",
|
||||||
|
"comment": "test-comment"
|
||||||
|
}
|
||||||
|
test_rule_id, obj = self.rule_model.add_item(test_rule, {},
|
||||||
|
context=self.context)
|
||||||
|
test_rule['id'] = test_rule_id
|
||||||
|
ret = self.rule_model.get_item(test_rule_id, {},
|
||||||
|
context=self.context)
|
||||||
|
self.assertEqual(test_rule, ret)
|
||||||
|
|
||||||
def test_get_items(self):
|
def test_get_items(self):
|
||||||
ret = self.rule_model.get_items({}, context=self.context)
|
ret = self.rule_model.get_items({}, context=self.context)
|
||||||
self.assertTrue(all(p in ret['results']
|
self.assertTrue(all(p in ret['results']
|
||||||
|
|
|
@ -25,6 +25,7 @@ cfg.CONF.datasource_sync_period = 0
|
||||||
from oslo_messaging import conffixture
|
from oslo_messaging import conffixture
|
||||||
|
|
||||||
from congress.api import base as api_base
|
from congress.api import base as api_base
|
||||||
|
from congress.datalog import base as datalog_base
|
||||||
from congress.datalog import compile
|
from congress.datalog import compile
|
||||||
from congress.datasources import nova_driver
|
from congress.datasources import nova_driver
|
||||||
from congress import exception as congressException
|
from congress import exception as congressException
|
||||||
|
@ -95,6 +96,23 @@ class TestDSE(base.TestCase):
|
||||||
self.assertFalse(hasattr(test2, "last_msg"))
|
self.assertFalse(hasattr(test2, "last_msg"))
|
||||||
node.stop()
|
node.stop()
|
||||||
|
|
||||||
|
def test_sub_before_service_exists(self):
|
||||||
|
node = helper.make_dsenode_new_partition('testnode')
|
||||||
|
test1 = fake_datasource.FakeDataSource('test1')
|
||||||
|
node.register_service(test1)
|
||||||
|
|
||||||
|
test1.subscribe('test2', 'p')
|
||||||
|
helper.retry_check_function_return_value(
|
||||||
|
lambda: test1.last_msg['data'], set())
|
||||||
|
test2 = fake_datasource.FakeDataSource('test2')
|
||||||
|
node.register_service(test2)
|
||||||
|
test2.publish('p', 42)
|
||||||
|
helper.retry_check_function_return_value(
|
||||||
|
lambda: test1.last_msg['data'], 42)
|
||||||
|
self.assertFalse(hasattr(test2, "last_msg"))
|
||||||
|
node.stop()
|
||||||
|
node.wait()
|
||||||
|
|
||||||
def test_internode_pubsub(self):
|
def test_internode_pubsub(self):
|
||||||
node1 = helper.make_dsenode_new_partition('testnode1')
|
node1 = helper.make_dsenode_new_partition('testnode1')
|
||||||
test1 = fake_datasource.FakeDataSource('test1')
|
test1 = fake_datasource.FakeDataSource('test1')
|
||||||
|
@ -283,7 +301,7 @@ class TestDSE(base.TestCase):
|
||||||
node.register_service(engine)
|
node.register_service(engine)
|
||||||
|
|
||||||
engine.create_policy('policy1')
|
engine.create_policy('policy1')
|
||||||
engine.create_policy('data')
|
engine.create_policy('data', kind=datalog_base.DATASOURCE_POLICY_TYPE)
|
||||||
self.insert_rule(engine, 'p(x) :- data:fake_table(x)', 'policy1')
|
self.insert_rule(engine, 'p(x) :- data:fake_table(x)', 'policy1')
|
||||||
data.state = {'fake_table': set([(1,), (2,)])}
|
data.state = {'fake_table': set([(1,), (2,)])}
|
||||||
data.poll()
|
data.poll()
|
||||||
|
@ -302,7 +320,7 @@ class TestDSE(base.TestCase):
|
||||||
node.register_service(engine)
|
node.register_service(engine)
|
||||||
|
|
||||||
engine.create_policy('policy1')
|
engine.create_policy('policy1')
|
||||||
engine.create_policy('data')
|
engine.create_policy('data', kind=datalog_base.DATASOURCE_POLICY_TYPE)
|
||||||
self.insert_rule(engine, 'p(x) :- data:fake_table(x)', 'policy1')
|
self.insert_rule(engine, 'p(x) :- data:fake_table(x)', 'policy1')
|
||||||
data.state = {'fake_table': set([(1,), (2,)])}
|
data.state = {'fake_table': set([(1,), (2,)])}
|
||||||
data.poll()
|
data.poll()
|
||||||
|
@ -325,7 +343,7 @@ class TestDSE(base.TestCase):
|
||||||
node.register_service(engine)
|
node.register_service(engine)
|
||||||
|
|
||||||
engine.create_policy('policy1')
|
engine.create_policy('policy1')
|
||||||
engine.create_policy('data')
|
engine.create_policy('data', kind=datalog_base.DATASOURCE_POLICY_TYPE)
|
||||||
data.state = {'fake_table': set([(1,), (2,)])}
|
data.state = {'fake_table': set([(1,), (2,)])}
|
||||||
data.poll()
|
data.poll()
|
||||||
self.insert_rule(engine, 'p(x) :- data:fake_table(x)', 'policy1')
|
self.insert_rule(engine, 'p(x) :- data:fake_table(x)', 'policy1')
|
||||||
|
@ -458,7 +476,7 @@ class TestDSE(base.TestCase):
|
||||||
node.register_service(policy)
|
node.register_service(policy)
|
||||||
node.register_service(policy2)
|
node.register_service(policy2)
|
||||||
|
|
||||||
policy.create_policy('data')
|
policy.create_policy('data', kind=datalog_base.DATASOURCE_POLICY_TYPE)
|
||||||
policy.create_policy('classification')
|
policy.create_policy('classification')
|
||||||
policy.set_schema('data', compile.Schema({'q': (1,)}))
|
policy.set_schema('data', compile.Schema({'q': (1,)}))
|
||||||
policy.insert('p(x):-data:q(x),gt(x,2)', target='classification')
|
policy.insert('p(x):-data:q(x),gt(x,2)', target='classification')
|
||||||
|
|
|
@ -134,6 +134,23 @@ class TestRuntime(base.TestCase):
|
||||||
'p(1)',
|
'p(1)',
|
||||||
'Multipolicy deletion select')
|
'Multipolicy deletion select')
|
||||||
|
|
||||||
|
def test_cross_policy_rule(self):
|
||||||
|
"""Test rule that refer to table from another policy."""
|
||||||
|
run = agnostic.Runtime()
|
||||||
|
run.create_policy('test1')
|
||||||
|
run.create_policy('test2')
|
||||||
|
run.create_policy('test3')
|
||||||
|
run.insert(
|
||||||
|
'p(x) :- test1:q(x),test2:q(x),test3:q(x),q(x) q(1) q(2) q(3)',
|
||||||
|
'test3')
|
||||||
|
run.insert('q(1)', 'test1')
|
||||||
|
run.insert('q(1) q(2)', 'test2')
|
||||||
|
|
||||||
|
self.assertEqual(
|
||||||
|
run.select('p(x)', 'test3'),
|
||||||
|
'p(1)',
|
||||||
|
'Cross-policy rule select')
|
||||||
|
|
||||||
def test_policy_types(self):
|
def test_policy_types(self):
|
||||||
"""Test types for multiple policies."""
|
"""Test types for multiple policies."""
|
||||||
# policy types
|
# policy types
|
||||||
|
|
Loading…
Reference in New Issue