diff --git a/congress/api/base.py b/congress/api/base.py index 5152a59c5..14ec9a82e 100644 --- a/congress/api/base.py +++ b/congress/api/base.py @@ -34,11 +34,15 @@ class APIModel(object): self.datasource_mgr = datasource_mgr self.bus = bus self.name = name + self.dse_long_timeout = cfg.CONF.dse_long_timeout # Note(thread-safety): blocking function - def invoke_rpc(self, caller, name, kwargs): + def invoke_rpc(self, caller, name, kwargs, timeout=None): if self.dist_arch: - return self.bus.rpc(caller, name, kwargs) + local = (caller is self.engine and + self.bus.node.service_object(self.engine) is not None) + return self.bus.rpc( + caller, name, kwargs, timeout=timeout, local=local) else: func = getattr(caller, name, None) if func: diff --git a/congress/api/policy_model.py b/congress/api/policy_model.py index f2e3f5233..bc7cd15cd 100644 --- a/congress/api/policy_model.py +++ b/congress/api/policy_model.py @@ -184,7 +184,8 @@ class PolicyModel(base.APIModel): 'action_theory': actions, 'delta': delta, 'trace': trace, 'as_list': True} # Note(thread-safety): blocking call - result = self.invoke_rpc(self.engine, 'simulate', args) + result = self.invoke_rpc(self.engine, 'simulate', args, + timeout=self.dse_long_timeout) except exception.PolicyException as e: (num, desc) = error_codes.get('simulate_error') raise webservice.DataModelException(num, desc + "::" + str(e)) diff --git a/congress/api/row_model.py b/congress/api/row_model.py index b7bf19ca8..b3ce9ac49 100644 --- a/congress/api/row_model.py +++ b/congress/api/row_model.py @@ -83,8 +83,15 @@ class RowModel(base.APIModel): try: args = {'table_id': table_id, 'source_id': source_id, 'trace': gen_trace} - # Note(thread-safety): blocking call - result = self.invoke_rpc(caller, 'get_row_data', args) + if caller is self.engine: + # allow extra time for row policy engine query + # Note(thread-safety): blocking call + result = self.invoke_rpc( + caller, 'get_row_data', args, + timeout=self.dse_long_timeout) + else: + # 
Note(thread-safety): blocking call + result = self.invoke_rpc(caller, 'get_row_data', args) except exception.CongressException as e: m = ("Error occurred while processing source_id '%s' for row " "data of the table '%s'" % (source_id, table_id)) diff --git a/congress/api/rule_model.py b/congress/api/rule_model.py index 7927b57e8..1725415dc 100644 --- a/congress/api/rule_model.py +++ b/congress/api/rule_model.py @@ -108,7 +108,8 @@ class RuleModel(base.APIModel): 'rule_name': item.get('name'), 'comment': item.get('comment')} # Note(thread-safety): blocking call - return self.invoke_rpc(self.engine, 'persistent_insert_rule', args) + return self.invoke_rpc(self.engine, 'persistent_insert_rule', args, + timeout=self.dse_long_timeout) except exception.CongressException as e: raise webservice.DataModelException.create(e) @@ -131,6 +132,7 @@ class RuleModel(base.APIModel): try: args = {'id_': id_, 'policy_name_or_id': self.policy_name(context)} # Note(thread-safety): blocking call - return self.invoke_rpc(self.engine, 'persistent_delete_rule', args) + return self.invoke_rpc(self.engine, 'persistent_delete_rule', args, + timeout=self.dse_long_timeout) except exception.CongressException as e: raise webservice.DataModelException.create(e) diff --git a/congress/dse2/control_bus.py b/congress/dse2/control_bus.py index 3966caeef..f2134a6f4 100644 --- a/congress/dse2/control_bus.py +++ b/congress/dse2/control_bus.py @@ -118,7 +118,7 @@ class DseNodeControlBus(DataService): cls=HeartbeatEncoder) # Note(thread-safety): blocking call self.node.broadcast_service_rpc(self.service_id, 'accept_heartbeat', - args=args) + {'args': args}) def _call_heartbeat_callbacks(self): for service in self.node.get_services(): diff --git a/congress/dse2/data_service.py b/congress/dse2/data_service.py index 12e4273c5..4eb91b239 100644 --- a/congress/dse2/data_service.py +++ b/congress/dse2/data_service.py @@ -181,10 +181,9 @@ class DataService(object): self._rpc_server.wait() # Note(thread-safety): 
blocking function - def rpc(self, service, action, kwargs=None): - if kwargs is None: - kwargs = {} - return self.node.invoke_service_rpc(service, action, **kwargs) + def rpc(self, service, action, kwargs=None, timeout=None, local=False): + return self.node.invoke_service_rpc( + service, action, kwargs, timeout=timeout, local=local) # Will be removed once the reference of node exists in api # Note(thread-safety): blocking function @@ -418,3 +417,7 @@ class DataServiceEndPoints (object): return self.service.get_last_published_data_with_seqnum(table) except AttributeError: pass + + def ping(self, client_ctxt, **args): + """Echo args""" + return args diff --git a/congress/dse2/dse_node.py b/congress/dse2/dse_node.py index 69b5ec9d7..8167bfe17 100644 --- a/congress/dse2/dse_node.py +++ b/congress/dse2/dse_node.py @@ -43,7 +43,12 @@ LOG = logging.getLogger(__name__) _dse_opts = [ cfg.StrOpt('bus_id', default='bus', - help='Unique ID of this DSE bus') + help='Unique ID of this DSE bus'), + cfg.IntOpt('dse_ping_timeout', default=5, + help='RPC short timeout in seconds; used to ping destination'), + cfg.IntOpt('dse_long_timeout', default=120, + help='RPC long timeout in seconds; used on potentially long ' + 'running requests such as datasource action and PE row query'), ] cfg.CONF.register_opts(_dse_opts) @@ -237,7 +242,7 @@ class DseNode(object): return service_id in self.get_global_service_names(hidden=True) # Note(thread-safety): blocking function - def invoke_node_rpc(self, node_id, method, **kwargs): + def invoke_node_rpc(self, node_id, method, kwargs=None, timeout=None): """Invoke RPC method on a DSE Node. 
Args: @@ -250,13 +255,15 @@ class DseNode(object): Raises: MessagingTimeout, RemoteError, MessageDeliveryFailure """ + if kwargs is None: + kwargs = {} target = self.node_rpc_target(server=node_id) LOG.trace("<%s> Invoking RPC '%s' on %s", self.node_id, method, target) - client = messaging.RPCClient(self.transport, target) + client = messaging.RPCClient(self.transport, target, timeout=timeout) return client.call(self.context, method, **kwargs) # Note(thread-safety): blocking function - def broadcast_node_rpc(self, method, **kwargs): + def broadcast_node_rpc(self, method, kwargs=None): """Invoke RPC method on all DSE Nodes. Args: @@ -268,13 +275,16 @@ class DseNode(object): Raises: RemoteError, MessageDeliveryFailure """ + if kwargs is None: + kwargs = {} target = self.node_rpc_target(fanout=True) LOG.trace("<%s> Casting RPC '%s' on %s", self.node_id, method, target) client = messaging.RPCClient(self.transport, target) client.cast(self.context, method, **kwargs) # Note(thread-safety): blocking function - def invoke_service_rpc(self, service_id, method, **kwargs): + def invoke_service_rpc( + self, service_id, method, kwargs=None, timeout=None, local=False): """Invoke RPC method on a DSE Service. Args: @@ -287,23 +297,36 @@ class DseNode(object): Raises: MessagingTimeout, RemoteError, MessageDeliveryFailure, NotFound """ - target = self.service_rpc_target(service_id) - LOG.trace("<%s> Invoking RPC '%s' on %s", self.node_id, method, target) - client = messaging.RPCClient(self.transport, target) - # Using the control bus to check if the service exists before - # running the RPC doesn't always work, either because of bugs - # or nondeterminism--not clear which. 
+ target = self.service_rpc_target( + service_id, server=(self.node_id if local else None)) + LOG.trace("<%s> Preparing to invoking RPC '%s' on %s", + self.node_id, method, target) + client = messaging.RPCClient(self.transport, target, timeout=timeout) + if not self.is_valid_service(service_id): + try: + # First ping the destination to fail fast if unresponsive + LOG.trace("<%s> Checking responsiveness before invoking RPC " + "'%s' on %s", self.node_id, method, target) + client.prepare(timeout=cfg.CONF.dse_ping_timeout).call( + self.context, 'ping') + except messaging_exceptions.MessagingTimeout: + msg = "service '%s' could not be found" + raise exception.NotFound(msg % service_id) + if kwargs is None: + kwargs = {} try: + LOG.trace( + "<%s> Invoking RPC '%s' on %s", self.node_id, method, target) result = client.call(self.context, method, **kwargs) except messaging_exceptions.MessagingTimeout: - msg = "service '%s' could not be found" + msg = "Request to service '%s' timed out" raise exception.NotFound(msg % service_id) LOG.trace("<%s> RPC call returned: %s", self.node_id, result) return result # Note(thread-safety): blocking function - def broadcast_service_rpc(self, service_id, method, **kwargs): - """Invoke RPC method on all insances of service_id. + def broadcast_service_rpc(self, service_id, method, kwargs=None): + """Invoke RPC method on all instances of service_id. Args: service_id: The ID of the data service on which to invoke the call. 
@@ -315,6 +338,8 @@ class DseNode(object): Raises: RemoteError, MessageDeliveryFailure """ + if kwargs is None: + kwargs = {} if not self.is_valid_service(service_id): msg = "service '%s' is not a registered service" raise exception.NotFound(msg % service_id) @@ -342,8 +367,9 @@ class DseNode(object): """ LOG.trace("<%s> Publishing from '%s' table %s: %s", self.node_id, publisher, table, data) - self.broadcast_node_rpc("handle_publish", publisher=publisher, - table=table, data=data) + self.broadcast_node_rpc( + "handle_publish", + {'publisher': publisher, 'table': table, 'data': data}) # Note(thread-safety): blocking function def publish_table_sequenced( @@ -363,8 +389,9 @@ class DseNode(object): LOG.trace("<%s> Publishing from '%s' table %s: %s", self.node_id, publisher, table, data) self.broadcast_node_rpc( - "handle_publish_sequenced", publisher=publisher, table=table, - data=data, is_snapshot=is_snapshot, seqnum=seqnum) + "handle_publish_sequenced", + {'publisher': publisher, 'table': table, + 'data': data, 'is_snapshot': is_snapshot, 'seqnum': seqnum}) def table_subscribers(self, publisher, table): """List services on this node that subscribes to publisher/table.""" @@ -386,12 +413,13 @@ class DseNode(object): if self.always_snapshot: # Note(thread-safety): blocking call snapshot = self.invoke_service_rpc( - publisher, "get_snapshot", table=table) + publisher, "get_snapshot", {'table': table}) return self.to_set_of_tuples(snapshot) else: # Note(thread-safety): blocking call snapshot_seqnum = self.invoke_service_rpc( - publisher, "get_last_published_data_with_seqnum", table=table) + publisher, "get_last_published_data_with_seqnum", + {'table': table}) return snapshot_seqnum def get_subscription(self, service_id): diff --git a/congress/policy_engines/agnostic.py b/congress/policy_engines/agnostic.py index 0a689217b..f10a660a7 100644 --- a/congress/policy_engines/agnostic.py +++ b/congress/policy_engines/agnostic.py @@ -2140,9 +2140,10 @@ class 
Dse2Runtime(DseRuntime): # TODO(ramineni): This is called only during execute_action, added # the same function name for compatibility with old arch args = {'action': action, 'action_args': args} - # Note(thread-safety): blocking call - return self.rpc(service_name, 'request_execute', args) + # long timeout (dse_long_timeout) since actions can take a while + return self.rpc(service_name, 'request_execute', args, + timeout=cfg.CONF.dse_long_timeout) def service_exists(self, service_name): return self.is_valid_service(service_name) diff --git a/congress/tests2/api/base.py b/congress/tests2/api/base.py index 43a3b499f..5103ee589 100644 --- a/congress/tests2/api/base.py +++ b/congress/tests2/api/base.py @@ -20,7 +20,8 @@ from congress.tests import fake_datasource from congress.tests import helper -def setup_config(with_fake_datasource=True): +def setup_config(with_fake_datasource=True, node_id='testnode', + same_partition_as_node=None): """Setup DseNode for testing. :param services is an array of DataServices @@ -34,7 +35,12 @@ def setup_config(with_fake_datasource=True): ['congress.tests.fake_datasource.FakeDataSource']) cfg.CONF.set_override('enable_synchronizer', False) - node = helper.make_dsenode_new_partition("testnode") + if same_partition_as_node is None: + node = helper.make_dsenode_new_partition(node_id) + else: + node = helper.make_dsenode_same_partition( + same_partition_as_node, node_id) + services = harness.create2(node=node) # Always register engine and fake datasource diff --git a/congress/tests2/dse2/test_datasource.py b/congress/tests2/dse2/test_datasource.py index fb098919c..5c79502e1 100644 --- a/congress/tests2/dse2/test_datasource.py +++ b/congress/tests2/dse2/test_datasource.py @@ -60,7 +60,7 @@ class TestDataSource(base.SqlTestCase): self.assertIsInstance(services[0], fake_datasource.FakeDataSource) obj = self.dseNode.invoke_service_rpc( - req['name'], 'get_status', source_id=None, params=None) + req['name'], 'get_status', {'source_id': None, 
'params': None}) self.assertIsNotNone(obj) def test_get_datasource(self): @@ -136,7 +136,7 @@ class TestDataSource(base.SqlTestCase): self.assertEqual(len(services), 0) self.assertRaises( congressException.NotFound, self.dseNode.invoke_service_rpc, - req['name'], 'get_status', source_id=None, params=None) + req['name'], 'get_status', {'source_id': None, 'params': None}) # TODO(thinrichs): test that we've actually removed # the row from the DB diff --git a/congress/tests2/dse2/test_dse2.py b/congress/tests2/dse2/test_dse2.py index afa0e401f..f0abe3aab 100644 --- a/congress/tests2/dse2/test_dse2.py +++ b/congress/tests2/dse2/test_dse2.py @@ -305,13 +305,13 @@ class TestDSE(base.TestCase): test1 = FakeDataSource('test1') node.register_service(test1) obj = node.invoke_service_rpc( - 'test1', 'get_status', source_id=None, params=None) + 'test1', 'get_status', {'source_id': None, 'params': None}) self.assertIsNotNone(obj) node.unregister_service('test1') helper.retry_til_exception( congressException.NotFound, lambda: node.invoke_service_rpc( - 'test1', 'get_status', source_id=None, params=None)) + 'test1', 'get_status', {'source_id': None, 'params': None})) def _create_node_with_services(self, nodes, services, num, partition_id): nid = 'cbd_node%s' % num diff --git a/congress/tests2/dse2/test_dse_node.py b/congress/tests2/dse2/test_dse_node.py index 7224f49bb..bce28201f 100644 --- a/congress/tests2/dse2/test_dse_node.py +++ b/congress/tests2/dse2/test_dse_node.py @@ -43,6 +43,9 @@ class _PingRpcEndpoint(object): self.ping_received_from = [] def ping(self, client_ctxt, **args): + return args + + def ping_test(self, client_ctxt, **args): self.ping_receive_count += 1 self.ping_received_from.append(client_ctxt) return args @@ -147,7 +150,7 @@ class TestDseNode(base.TestCase): for j, target in enumerate(nodes): scount = endpoints[j].ping_receive_count args = {'arg1': 1, 'arg2': 'a'} - ret = source.invoke_node_rpc(target.node_id, 'ping', **args) + ret = 
source.invoke_node_rpc(target.node_id, 'ping_test', args) self.assertEqual(ret, args, "Ping echoed arguments") ecount = endpoints[j].ping_receive_count self.assertEqual(ecount - scount, 1, @@ -176,7 +179,7 @@ class TestDseNode(base.TestCase): scounts = [] for j, target in enumerate(nodes): scounts.append(endpoints[j].ping_receive_count) - source.broadcast_node_rpc('ping', arg1=1, arg2='a') + source.broadcast_node_rpc('ping_test', {'arg1': 1, 'arg2': 'a'}) eventlet.sleep(0.5) # wait for async delivery for j, target in enumerate(nodes): ecount = endpoints[j].ping_receive_count @@ -209,8 +212,8 @@ class TestDseNode(base.TestCase): ep = nodes[j]._services[-1].endpoints[0] scount = ep.ping_receive_count args = {'arg1': 1, 'arg2': 'a'} - ret = source.invoke_service_rpc(service.service_id, 'ping', - **args) + ret = source.invoke_service_rpc( + service.service_id, 'ping_test', args) self.assertEqual(ret, args, "Ping echoed arguments") ecount = ep.ping_receive_count self.assertEqual(ecount - scount, 1, @@ -241,7 +244,8 @@ class TestDseNode(base.TestCase): for j, target in enumerate(nodes): ep = nodes[j]._services[-1].endpoints[0] scounts.append(ep.ping_receive_count) - source.broadcast_service_rpc('tbsr_svc', 'ping', arg1=1, arg2='a') + source.broadcast_service_rpc( + 'tbsr_svc', 'ping_test', {'arg1': 1, 'arg2': 'a'}) eventlet.sleep(0.5) # wait for async delivery for j, target in enumerate(nodes): ep = nodes[j]._services[-1].endpoints[0] diff --git a/congress/tests2/test_congress.py b/congress/tests2/test_congress.py index d7e095249..595fa1cd8 100644 --- a/congress/tests2/test_congress.py +++ b/congress/tests2/test_congress.py @@ -31,7 +31,6 @@ cfg.CONF.distributed_architecture = True import neutronclient.v2_0 from oslo_log import log as logging -from congress.common import config from congress.datasources import neutronv2_driver from congress.datasources import nova_driver from congress import harness @@ -81,10 +80,6 @@ class TestCongress(BaseTestPolicyCongress): """Setup 
tests that use multiple mock neutron instances.""" super(TestCongress, self).setUp() - def setup_config(self): - args = ['--config-file', helper.etcdir('congress.conf.test')] - config.init(args) - def test_startup(self): self.assertIsNotNone(self.services['api']) self.assertIsNotNone(self.services[harness.ENGINE_SERVICE_NAME]) @@ -179,6 +174,61 @@ class TestCongress(BaseTestPolicyCongress): helper.retry_check_function_return_value_not_eq(f, 0) +class APILocalRouting(BaseTestPolicyCongress): + + def setUp(self): + super(APILocalRouting, self).setUp() + + # set up second API+PE node + self.services = api_base.setup_config( + with_fake_datasource=False, node_id='testnode2', + same_partition_as_node=self.node) + self.api2 = self.services['api'] + self.node2 = self.services['node'] + self.engine2 = self.services['engine'] + self.data = self.services['data'] + + # add different data to two PE instances + # going directly to agnostic not via API to make sure faulty API + # routing (subject of the test) would not affect test accuracy + self.engine.create_policy('policy') + self.engine2.create_policy('policy') + self.engine.insert('p(1) :- NOT q()', 'policy') + # self.engine1.insert('p(1)', 'policy') + self.engine2.insert('p(2) :- NOT q()', 'policy') + self.engine2.insert('p(3) :- NOT q()', 'policy') + + def test_intranode_pe_routing(self): + for i in range(0, 5): # run multiple times (non-determinism) + result = self.api['api-row'].get_items( + {}, {'policy_id': 'policy', 'table_id': 'p'}) + self.assertEqual(len(result['results']), 1) + result = self.api2['api-row'].get_items( + {}, {'policy_id': 'policy', 'table_id': 'p'}) + self.assertEqual(len(result['results']), 2) + + def test_non_PE_service_reachable(self): + # intranode + result = self.api['api-row'].get_items( + {}, {'ds_id': 'neutron', 'table_id': 'ports'}) + self.assertEqual(len(result['results']), 1) + + # internode + result = self.api2['api-row'].get_items( + {}, {'ds_id': 'neutron', 'table_id': 'ports'}) + 
self.assertEqual(len(result['results']), 1) + + def test_internode_pe_routing(self): + '''test reach internode PE when intranode PE not available''' + self.node.unregister_service('engine') + result = self.api['api-row'].get_items( + {}, {'policy_id': 'policy', 'table_id': 'p'}) + self.assertEqual(len(result['results']), 2) + result = self.api2['api-row'].get_items( + {}, {'policy_id': 'policy', 'table_id': 'p'}) + self.assertEqual(len(result['results']), 2) + + class TestPolicyExecute(BaseTestPolicyCongress): def setUp(self):