Merge "API routing to intranode PE"

This commit is contained in:
Jenkins 2016-07-28 22:53:27 +00:00 committed by Gerrit Code Review
commit 1c38124e75
13 changed files with 156 additions and 50 deletions

View File

@ -34,11 +34,15 @@ class APIModel(object):
self.datasource_mgr = datasource_mgr
self.bus = bus
self.name = name
self.dse_long_timeout = cfg.CONF.dse_long_timeout
# Note(thread-safety): blocking function
def invoke_rpc(self, caller, name, kwargs):
def invoke_rpc(self, caller, name, kwargs, timeout=None):
if self.dist_arch:
return self.bus.rpc(caller, name, kwargs)
local = (caller is self.engine and
self.bus.node.service_object(self.engine) is not None)
return self.bus.rpc(
caller, name, kwargs, timeout=timeout, local=local)
else:
func = getattr(caller, name, None)
if func:

View File

@ -184,7 +184,8 @@ class PolicyModel(base.APIModel):
'action_theory': actions, 'delta': delta,
'trace': trace, 'as_list': True}
# Note(thread-safety): blocking call
result = self.invoke_rpc(self.engine, 'simulate', args)
result = self.invoke_rpc(self.engine, 'simulate', args,
timeout=self.dse_long_timeout)
except exception.PolicyException as e:
(num, desc) = error_codes.get('simulate_error')
raise webservice.DataModelException(num, desc + "::" + str(e))

View File

@ -83,8 +83,15 @@ class RowModel(base.APIModel):
try:
args = {'table_id': table_id, 'source_id': source_id,
'trace': gen_trace}
# Note(thread-safety): blocking call
result = self.invoke_rpc(caller, 'get_row_data', args)
if caller is self.engine:
# allow extra time for row policy engine query
# Note(thread-safety): blocking call
result = self.invoke_rpc(
caller, 'get_row_data', args,
timeout=self.dse_long_timeout)
else:
# Note(thread-safety): blocking call
result = self.invoke_rpc(caller, 'get_row_data', args)
except exception.CongressException as e:
m = ("Error occurred while processing source_id '%s' for row "
"data of the table '%s'" % (source_id, table_id))

View File

@ -108,7 +108,8 @@ class RuleModel(base.APIModel):
'rule_name': item.get('name'),
'comment': item.get('comment')}
# Note(thread-safety): blocking call
return self.invoke_rpc(self.engine, 'persistent_insert_rule', args)
return self.invoke_rpc(self.engine, 'persistent_insert_rule', args,
timeout=self.dse_long_timeout)
except exception.CongressException as e:
raise webservice.DataModelException.create(e)
@ -131,6 +132,7 @@ class RuleModel(base.APIModel):
try:
args = {'id_': id_, 'policy_name_or_id': self.policy_name(context)}
# Note(thread-safety): blocking call
return self.invoke_rpc(self.engine, 'persistent_delete_rule', args)
return self.invoke_rpc(self.engine, 'persistent_delete_rule', args,
timeout=self.dse_long_timeout)
except exception.CongressException as e:
raise webservice.DataModelException.create(e)

View File

@ -118,7 +118,7 @@ class DseNodeControlBus(DataService):
cls=HeartbeatEncoder)
# Note(thread-safety): blocking call
self.node.broadcast_service_rpc(self.service_id, 'accept_heartbeat',
args=args)
{'args': args})
def _call_heartbeat_callbacks(self):
for service in self.node.get_services():

View File

@ -181,10 +181,9 @@ class DataService(object):
self._rpc_server.wait()
# Note(thread-safety): blocking function
def rpc(self, service, action, kwargs=None):
if kwargs is None:
kwargs = {}
return self.node.invoke_service_rpc(service, action, **kwargs)
def rpc(self, service, action, kwargs=None, timeout=None, local=False):
return self.node.invoke_service_rpc(
service, action, kwargs, timeout=timeout, local=local)
# Will be removed once the reference of node exists in api
# Note(thread-safety): blocking function
@ -418,3 +417,7 @@ class DataServiceEndPoints (object):
return self.service.get_last_published_data_with_seqnum(table)
except AttributeError:
pass
def ping(self, client_ctxt, **args):
"""Echo args"""
return args

View File

@ -43,7 +43,12 @@ LOG = logging.getLogger(__name__)
_dse_opts = [
cfg.StrOpt('bus_id', default='bus',
help='Unique ID of this DSE bus')
help='Unique ID of this DSE bus'),
cfg.IntOpt('dse_ping_timeout', default=5,
help='RPC short timeout in seconds; used to ping destination'),
cfg.IntOpt('dse_long_timeout', default=120,
help='RPC long timeout in seconds; used on potentially long '
'running requests such as datasource action and PE row query'),
]
cfg.CONF.register_opts(_dse_opts)
@ -237,7 +242,7 @@ class DseNode(object):
return service_id in self.get_global_service_names(hidden=True)
# Note(thread-safety): blocking function
def invoke_node_rpc(self, node_id, method, **kwargs):
def invoke_node_rpc(self, node_id, method, kwargs=None, timeout=None):
"""Invoke RPC method on a DSE Node.
Args:
@ -250,13 +255,15 @@ class DseNode(object):
Raises: MessagingTimeout, RemoteError, MessageDeliveryFailure
"""
if kwargs is None:
kwargs = {}
target = self.node_rpc_target(server=node_id)
LOG.trace("<%s> Invoking RPC '%s' on %s", self.node_id, method, target)
client = messaging.RPCClient(self.transport, target)
client = messaging.RPCClient(self.transport, target, timeout=timeout)
return client.call(self.context, method, **kwargs)
# Note(thread-safety): blocking function
def broadcast_node_rpc(self, method, **kwargs):
def broadcast_node_rpc(self, method, kwargs=None):
"""Invoke RPC method on all DSE Nodes.
Args:
@ -268,13 +275,16 @@ class DseNode(object):
Raises: RemoteError, MessageDeliveryFailure
"""
if kwargs is None:
kwargs = {}
target = self.node_rpc_target(fanout=True)
LOG.trace("<%s> Casting RPC '%s' on %s", self.node_id, method, target)
client = messaging.RPCClient(self.transport, target)
client.cast(self.context, method, **kwargs)
# Note(thread-safety): blocking function
def invoke_service_rpc(self, service_id, method, **kwargs):
def invoke_service_rpc(
self, service_id, method, kwargs=None, timeout=None, local=False):
"""Invoke RPC method on a DSE Service.
Args:
@ -287,23 +297,36 @@ class DseNode(object):
Raises: MessagingTimeout, RemoteError, MessageDeliveryFailure, NotFound
"""
target = self.service_rpc_target(service_id)
LOG.trace("<%s> Invoking RPC '%s' on %s", self.node_id, method, target)
client = messaging.RPCClient(self.transport, target)
# Using the control bus to check if the service exists before
# running the RPC doesn't always work, either because of bugs
# or nondeterminism--not clear which.
target = self.service_rpc_target(
service_id, server=(self.node_id if local else None))
LOG.trace("<%s> Preparing to invoking RPC '%s' on %s",
self.node_id, method, target)
client = messaging.RPCClient(self.transport, target, timeout=timeout)
if not self.is_valid_service(service_id):
try:
# First ping the destination to fail fast if unresponsive
LOG.trace("<%s> Checking responsiveness before invoking RPC "
"'%s' on %s", self.node_id, method, target)
client.prepare(timeout=cfg.CONF.dse_ping_timeout).call(
self.context, 'ping')
except messaging_exceptions.MessagingTimeout:
msg = "service '%s' could not be found"
raise exception.NotFound(msg % service_id)
if kwargs is None:
kwargs = {}
try:
LOG.trace(
"<%s> Invoking RPC '%s' on %s", self.node_id, method, target)
result = client.call(self.context, method, **kwargs)
except messaging_exceptions.MessagingTimeout:
msg = "service '%s' could not be found"
msg = "Request to service '%s' timed out"
raise exception.NotFound(msg % service_id)
LOG.trace("<%s> RPC call returned: %s", self.node_id, result)
return result
# Note(thread-safety): blocking function
def broadcast_service_rpc(self, service_id, method, **kwargs):
"""Invoke RPC method on all insances of service_id.
def broadcast_service_rpc(self, service_id, method, kwargs=None):
"""Invoke RPC method on all instances of service_id.
Args:
service_id: The ID of the data service on which to invoke the call.
@ -315,6 +338,8 @@ class DseNode(object):
Raises: RemoteError, MessageDeliveryFailure
"""
if kwargs is None:
kwargs = {}
if not self.is_valid_service(service_id):
msg = "service '%s' is not a registered service"
raise exception.NotFound(msg % service_id)
@ -342,8 +367,9 @@ class DseNode(object):
"""
LOG.trace("<%s> Publishing from '%s' table %s: %s",
self.node_id, publisher, table, data)
self.broadcast_node_rpc("handle_publish", publisher=publisher,
table=table, data=data)
self.broadcast_node_rpc(
"handle_publish",
{'publisher': publisher, 'table': table, 'data': data})
# Note(thread-safety): blocking function
def publish_table_sequenced(
@ -363,8 +389,9 @@ class DseNode(object):
LOG.trace("<%s> Publishing from '%s' table %s: %s",
self.node_id, publisher, table, data)
self.broadcast_node_rpc(
"handle_publish_sequenced", publisher=publisher, table=table,
data=data, is_snapshot=is_snapshot, seqnum=seqnum)
"handle_publish_sequenced",
{'publisher': publisher, 'table': table,
'data': data, 'is_snapshot': is_snapshot, 'seqnum': seqnum})
def table_subscribers(self, publisher, table):
"""List services on this node that subscribes to publisher/table."""
@ -386,12 +413,13 @@ class DseNode(object):
if self.always_snapshot:
# Note(thread-safety): blocking call
snapshot = self.invoke_service_rpc(
publisher, "get_snapshot", table=table)
publisher, "get_snapshot", {'table': table})
return self.to_set_of_tuples(snapshot)
else:
# Note(thread-safety): blocking call
snapshot_seqnum = self.invoke_service_rpc(
publisher, "get_last_published_data_with_seqnum", table=table)
publisher, "get_last_published_data_with_seqnum",
{'table': table})
return snapshot_seqnum
def get_subscription(self, service_id):

View File

@ -2140,9 +2140,10 @@ class Dse2Runtime(DseRuntime):
# TODO(ramineni): This is called only during execute_action, added
# the same function name for compatibility with old arch
args = {'action': action, 'action_args': args}
# Note(thread-safety): blocking call
return self.rpc(service_name, 'request_execute', args)
# 60s timeout for action execution because actions can take a while
return self.rpc(service_name, 'request_execute', args,
timeout=cfg.CONF.dse_long_timeout)
def service_exists(self, service_name):
return self.is_valid_service(service_name)

View File

@ -20,7 +20,8 @@ from congress.tests import fake_datasource
from congress.tests import helper
def setup_config(with_fake_datasource=True):
def setup_config(with_fake_datasource=True, node_id='testnode',
same_partition_as_node=None):
"""Setup DseNode for testing.
:param services is an array of DataServices
@ -34,7 +35,12 @@ def setup_config(with_fake_datasource=True):
['congress.tests.fake_datasource.FakeDataSource'])
cfg.CONF.set_override('enable_synchronizer', False)
node = helper.make_dsenode_new_partition("testnode")
if same_partition_as_node is None:
node = helper.make_dsenode_new_partition(node_id)
else:
node = helper.make_dsenode_same_partition(
same_partition_as_node, node_id)
services = harness.create2(node=node)
# Always register engine and fake datasource

View File

@ -60,7 +60,7 @@ class TestDataSource(base.SqlTestCase):
self.assertIsInstance(services[0],
fake_datasource.FakeDataSource)
obj = self.dseNode.invoke_service_rpc(
req['name'], 'get_status', source_id=None, params=None)
req['name'], 'get_status', {'source_id': None, 'params': None})
self.assertIsNotNone(obj)
def test_get_datasource(self):
@ -136,7 +136,7 @@ class TestDataSource(base.SqlTestCase):
self.assertEqual(len(services), 0)
self.assertRaises(
congressException.NotFound, self.dseNode.invoke_service_rpc,
req['name'], 'get_status', source_id=None, params=None)
req['name'], 'get_status', {'source_id': None, 'params': None})
# TODO(thinrichs): test that we've actually removed
# the row from the DB

View File

@ -305,13 +305,13 @@ class TestDSE(base.TestCase):
test1 = FakeDataSource('test1')
node.register_service(test1)
obj = node.invoke_service_rpc(
'test1', 'get_status', source_id=None, params=None)
'test1', 'get_status', {'source_id': None, 'params': None})
self.assertIsNotNone(obj)
node.unregister_service('test1')
helper.retry_til_exception(
congressException.NotFound,
lambda: node.invoke_service_rpc(
'test1', 'get_status', source_id=None, params=None))
'test1', 'get_status', {'source_id': None, 'params': None}))
def _create_node_with_services(self, nodes, services, num, partition_id):
nid = 'cbd_node%s' % num

View File

@ -43,6 +43,9 @@ class _PingRpcEndpoint(object):
self.ping_received_from = []
def ping(self, client_ctxt, **args):
return args
def ping_test(self, client_ctxt, **args):
self.ping_receive_count += 1
self.ping_received_from.append(client_ctxt)
return args
@ -147,7 +150,7 @@ class TestDseNode(base.TestCase):
for j, target in enumerate(nodes):
scount = endpoints[j].ping_receive_count
args = {'arg1': 1, 'arg2': 'a'}
ret = source.invoke_node_rpc(target.node_id, 'ping', **args)
ret = source.invoke_node_rpc(target.node_id, 'ping_test', args)
self.assertEqual(ret, args, "Ping echoed arguments")
ecount = endpoints[j].ping_receive_count
self.assertEqual(ecount - scount, 1,
@ -176,7 +179,7 @@ class TestDseNode(base.TestCase):
scounts = []
for j, target in enumerate(nodes):
scounts.append(endpoints[j].ping_receive_count)
source.broadcast_node_rpc('ping', arg1=1, arg2='a')
source.broadcast_node_rpc('ping_test', {'arg1': 1, 'arg2': 'a'})
eventlet.sleep(0.5) # wait for async delivery
for j, target in enumerate(nodes):
ecount = endpoints[j].ping_receive_count
@ -209,8 +212,8 @@ class TestDseNode(base.TestCase):
ep = nodes[j]._services[-1].endpoints[0]
scount = ep.ping_receive_count
args = {'arg1': 1, 'arg2': 'a'}
ret = source.invoke_service_rpc(service.service_id, 'ping',
**args)
ret = source.invoke_service_rpc(
service.service_id, 'ping_test', args)
self.assertEqual(ret, args, "Ping echoed arguments")
ecount = ep.ping_receive_count
self.assertEqual(ecount - scount, 1,
@ -241,7 +244,8 @@ class TestDseNode(base.TestCase):
for j, target in enumerate(nodes):
ep = nodes[j]._services[-1].endpoints[0]
scounts.append(ep.ping_receive_count)
source.broadcast_service_rpc('tbsr_svc', 'ping', arg1=1, arg2='a')
source.broadcast_service_rpc(
'tbsr_svc', 'ping_test', {'arg1': 1, 'arg2': 'a'})
eventlet.sleep(0.5) # wait for async delivery
for j, target in enumerate(nodes):
ep = nodes[j]._services[-1].endpoints[0]

View File

@ -31,7 +31,6 @@ cfg.CONF.distributed_architecture = True
import neutronclient.v2_0
from oslo_log import log as logging
from congress.common import config
from congress.datasources import neutronv2_driver
from congress.datasources import nova_driver
from congress import harness
@ -81,10 +80,6 @@ class TestCongress(BaseTestPolicyCongress):
"""Setup tests that use multiple mock neutron instances."""
super(TestCongress, self).setUp()
def setup_config(self):
args = ['--config-file', helper.etcdir('congress.conf.test')]
config.init(args)
def test_startup(self):
self.assertIsNotNone(self.services['api'])
self.assertIsNotNone(self.services[harness.ENGINE_SERVICE_NAME])
@ -179,6 +174,61 @@ class TestCongress(BaseTestPolicyCongress):
helper.retry_check_function_return_value_not_eq(f, 0)
class APILocalRouting(BaseTestPolicyCongress):
def setUp(self):
super(APILocalRouting, self).setUp()
# set up second API+PE node
self.services = api_base.setup_config(
with_fake_datasource=False, node_id='testnode2',
same_partition_as_node=self.node)
self.api2 = self.services['api']
self.node2 = self.services['node']
self.engine2 = self.services['engine']
self.data = self.services['data']
# add different data to two PE instances
# going directly to agnostic not via API to make sure faulty API
# routing (subject of the test) would not affect test accuracy
self.engine.create_policy('policy')
self.engine2.create_policy('policy')
self.engine.insert('p(1) :- NOT q()', 'policy')
# self.engine1.insert('p(1)', 'policy')
self.engine2.insert('p(2) :- NOT q()', 'policy')
self.engine2.insert('p(3) :- NOT q()', 'policy')
def test_intranode_pe_routing(self):
for i in range(0, 5): # run multiple times (non-determinism)
result = self.api['api-row'].get_items(
{}, {'policy_id': 'policy', 'table_id': 'p'})
self.assertEqual(len(result['results']), 1)
result = self.api2['api-row'].get_items(
{}, {'policy_id': 'policy', 'table_id': 'p'})
self.assertEqual(len(result['results']), 2)
def test_non_PE_service_reachable(self):
# intranode
result = self.api['api-row'].get_items(
{}, {'ds_id': 'neutron', 'table_id': 'ports'})
self.assertEqual(len(result['results']), 1)
# internode
result = self.api2['api-row'].get_items(
{}, {'ds_id': 'neutron', 'table_id': 'ports'})
self.assertEqual(len(result['results']), 1)
def test_internode_pe_routing(self):
'''test reach internode PE when intranode PE not available'''
self.node.unregister_service('engine')
result = self.api['api-row'].get_items(
{}, {'policy_id': 'policy', 'table_id': 'p'})
self.assertEqual(len(result['results']), 2)
result = self.api2['api-row'].get_items(
{}, {'policy_id': 'policy', 'table_id': 'p'})
self.assertEqual(len(result['results']), 2)
class TestPolicyExecute(BaseTestPolicyCongress):
def setUp(self):