Check DB only to detect duplicate DS name on add
Now that we have moved to modifying DB first and letting in-mem catch up on sync we should only consult DB as the official record The extraneous check of whether the ds exists in-mem may have been causing testHA failure when attempting to create ds 'fake' because 'fake' was removed from DB but sometimes remains in-mem. Additionally, prepend __ to internal service_ids to avoid breaking compatibility with users who may have named datasource 'engine' Change-Id: Iffff446693605c43f7d741ee111b13e79d30b30f
This commit is contained in:
parent
40b3b3473b
commit
aeb5d6f79e
@ -41,7 +41,7 @@ def get_id_from_context(context):
|
|||||||
ds_name = db_datasources.get_datasource_name(context.get('ds_id'))
|
ds_name = db_datasources.get_datasource_name(context.get('ds_id'))
|
||||||
return ds_name, context.get('ds_id')
|
return ds_name, context.get('ds_id')
|
||||||
elif 'policy_id' in context:
|
elif 'policy_id' in context:
|
||||||
return base.ENGINE_SERVICE, context.get('policy_id')
|
return base.ENGINE_SERVICE_ID, context.get('policy_id')
|
||||||
else:
|
else:
|
||||||
msg = "Internal error: context %s should have included " % str(context)
|
msg = "Internal error: context %s should have included " % str(context)
|
||||||
"either ds_id or policy_id"
|
"either ds_id or policy_id"
|
||||||
|
@ -19,7 +19,7 @@ from __future__ import absolute_import
|
|||||||
|
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
|
|
||||||
ENGINE_SERVICE = 'engine'
|
ENGINE_SERVICE_ID = '__engine'
|
||||||
|
|
||||||
|
|
||||||
class APIModel(object):
|
class APIModel(object):
|
||||||
@ -32,7 +32,8 @@ class APIModel(object):
|
|||||||
|
|
||||||
# Note(thread-safety): blocking function
|
# Note(thread-safety): blocking function
|
||||||
def invoke_rpc(self, caller, name, kwargs, timeout=None):
|
def invoke_rpc(self, caller, name, kwargs, timeout=None):
|
||||||
local = (caller is ENGINE_SERVICE and
|
local = (caller is ENGINE_SERVICE_ID and
|
||||||
self.bus.node.service_object(ENGINE_SERVICE) is not None)
|
self.bus.node.service_object(
|
||||||
|
ENGINE_SERVICE_ID) is not None)
|
||||||
return self.bus.rpc(
|
return self.bus.rpc(
|
||||||
caller, name, kwargs, timeout=timeout, local=local)
|
caller, name, kwargs, timeout=timeout, local=local)
|
||||||
|
@ -149,7 +149,7 @@ class DatasourceModel(base.APIModel):
|
|||||||
# TODO(ekcs): perhaps keep execution synchronous when explicitly
|
# TODO(ekcs): perhaps keep execution synchronous when explicitly
|
||||||
# called via API
|
# called via API
|
||||||
# Note(thread-safety): blocking call
|
# Note(thread-safety): blocking call
|
||||||
self.invoke_rpc(base.ENGINE_SERVICE, 'execute_action', args)
|
self.invoke_rpc(base.ENGINE_SERVICE_ID, 'execute_action', args)
|
||||||
except exception.PolicyException as e:
|
except exception.PolicyException as e:
|
||||||
(num, desc) = error_codes.get('execute_error')
|
(num, desc) = error_codes.get('execute_error')
|
||||||
raise webservice.DataModelException(num, desc + "::" + str(e))
|
raise webservice.DataModelException(num, desc + "::" + str(e))
|
||||||
|
@ -46,7 +46,7 @@ class PolicyModel(base.APIModel):
|
|||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
# Note(thread-safety): blocking call
|
# Note(thread-safety): blocking call
|
||||||
return {"results": self.invoke_rpc(base.ENGINE_SERVICE,
|
return {"results": self.invoke_rpc(base.ENGINE_SERVICE_ID,
|
||||||
'persistent_get_policies',
|
'persistent_get_policies',
|
||||||
{})}
|
{})}
|
||||||
except exception.CongressException as e:
|
except exception.CongressException as e:
|
||||||
@ -67,7 +67,7 @@ class PolicyModel(base.APIModel):
|
|||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
# Note(thread-safety): blocking call
|
# Note(thread-safety): blocking call
|
||||||
return self.invoke_rpc(base.ENGINE_SERVICE,
|
return self.invoke_rpc(base.ENGINE_SERVICE_ID,
|
||||||
'persistent_get_policy',
|
'persistent_get_policy',
|
||||||
{'id_': id_})
|
{'id_': id_})
|
||||||
except exception.CongressException as e:
|
except exception.CongressException as e:
|
||||||
@ -96,7 +96,7 @@ class PolicyModel(base.APIModel):
|
|||||||
try:
|
try:
|
||||||
# Note(thread-safety): blocking call
|
# Note(thread-safety): blocking call
|
||||||
policy_metadata = self.invoke_rpc(
|
policy_metadata = self.invoke_rpc(
|
||||||
base.ENGINE_SERVICE, 'persistent_create_policy',
|
base.ENGINE_SERVICE_ID, 'persistent_create_policy',
|
||||||
{'name': name,
|
{'name': name,
|
||||||
'abbr': item.get('abbreviation'),
|
'abbr': item.get('abbreviation'),
|
||||||
'kind': item.get('kind'),
|
'kind': item.get('kind'),
|
||||||
@ -139,7 +139,7 @@ class PolicyModel(base.APIModel):
|
|||||||
KeyError: Item with specified id_ not present.
|
KeyError: Item with specified id_ not present.
|
||||||
"""
|
"""
|
||||||
# Note(thread-safety): blocking call
|
# Note(thread-safety): blocking call
|
||||||
return self.invoke_rpc(base.ENGINE_SERVICE,
|
return self.invoke_rpc(base.ENGINE_SERVICE_ID,
|
||||||
'persistent_delete_policy',
|
'persistent_delete_policy',
|
||||||
{'name_or_id': id_})
|
{'name_or_id': id_})
|
||||||
|
|
||||||
@ -176,8 +176,8 @@ class PolicyModel(base.APIModel):
|
|||||||
'action_theory': actions, 'delta': delta,
|
'action_theory': actions, 'delta': delta,
|
||||||
'trace': trace, 'as_list': True}
|
'trace': trace, 'as_list': True}
|
||||||
# Note(thread-safety): blocking call
|
# Note(thread-safety): blocking call
|
||||||
result = self.invoke_rpc(base.ENGINE_SERVICE, 'simulate', args,
|
result = self.invoke_rpc(base.ENGINE_SERVICE_ID, 'simulate',
|
||||||
timeout=self.dse_long_timeout)
|
args, timeout=self.dse_long_timeout)
|
||||||
except exception.PolicyException as e:
|
except exception.PolicyException as e:
|
||||||
(num, desc) = error_codes.get('simulate_error')
|
(num, desc) = error_codes.get('simulate_error')
|
||||||
raise webservice.DataModelException(num, desc + "::" + str(e))
|
raise webservice.DataModelException(num, desc + "::" + str(e))
|
||||||
@ -209,7 +209,7 @@ class PolicyModel(base.APIModel):
|
|||||||
'action': action,
|
'action': action,
|
||||||
'action_args': action_args}
|
'action_args': action_args}
|
||||||
# Note(thread-safety): blocking call
|
# Note(thread-safety): blocking call
|
||||||
self.invoke_rpc(base.ENGINE_SERVICE, 'execute_action', args)
|
self.invoke_rpc(base.ENGINE_SERVICE_ID, 'execute_action', args)
|
||||||
except exception.PolicyException as e:
|
except exception.PolicyException as e:
|
||||||
(num, desc) = error_codes.get('execute_error')
|
(num, desc) = error_codes.get('execute_error')
|
||||||
raise webservice.DataModelException(num, desc + "::" + str(e))
|
raise webservice.DataModelException(num, desc + "::" + str(e))
|
||||||
|
@ -78,7 +78,7 @@ class RowModel(base.APIModel):
|
|||||||
try:
|
try:
|
||||||
args = {'table_id': table_id, 'source_id': source_id,
|
args = {'table_id': table_id, 'source_id': source_id,
|
||||||
'trace': gen_trace}
|
'trace': gen_trace}
|
||||||
if caller is base.ENGINE_SERVICE:
|
if caller is base.ENGINE_SERVICE_ID:
|
||||||
# allow extra time for row policy engine query
|
# allow extra time for row policy engine query
|
||||||
# Note(thread-safety): blocking call
|
# Note(thread-safety): blocking call
|
||||||
result = self.invoke_rpc(
|
result = self.invoke_rpc(
|
||||||
@ -93,7 +93,7 @@ class RowModel(base.APIModel):
|
|||||||
LOG.exception(m)
|
LOG.exception(m)
|
||||||
raise webservice.DataModelException.create(e)
|
raise webservice.DataModelException.create(e)
|
||||||
|
|
||||||
if gen_trace and caller is base.ENGINE_SERVICE:
|
if gen_trace and caller is base.ENGINE_SERVICE_ID:
|
||||||
# DSE2 returns lists instead of tuples, so correct that.
|
# DSE2 returns lists instead of tuples, so correct that.
|
||||||
results = [{'data': tuple(x['data'])} for x in result[0]]
|
results = [{'data': tuple(x['data'])} for x in result[0]]
|
||||||
return {'results': results,
|
return {'results': results,
|
||||||
|
@ -48,7 +48,7 @@ class RuleModel(base.APIModel):
|
|||||||
try:
|
try:
|
||||||
args = {'id_': id_, 'policy_name': self.policy_name(context)}
|
args = {'id_': id_, 'policy_name': self.policy_name(context)}
|
||||||
# Note(thread-safety): blocking call
|
# Note(thread-safety): blocking call
|
||||||
return self.invoke_rpc(base.ENGINE_SERVICE,
|
return self.invoke_rpc(base.ENGINE_SERVICE_ID,
|
||||||
'persistent_get_rule', args)
|
'persistent_get_rule', args)
|
||||||
except exception.CongressException as e:
|
except exception.CongressException as e:
|
||||||
raise webservice.DataModelException.create(e)
|
raise webservice.DataModelException.create(e)
|
||||||
@ -69,7 +69,7 @@ class RuleModel(base.APIModel):
|
|||||||
try:
|
try:
|
||||||
args = {'policy_name': self.policy_name(context)}
|
args = {'policy_name': self.policy_name(context)}
|
||||||
# Note(thread-safety): blocking call
|
# Note(thread-safety): blocking call
|
||||||
rules = self.invoke_rpc(base.ENGINE_SERVICE,
|
rules = self.invoke_rpc(base.ENGINE_SERVICE_ID,
|
||||||
'persistent_get_rules', args)
|
'persistent_get_rules', args)
|
||||||
return {'results': rules}
|
return {'results': rules}
|
||||||
except exception.CongressException as e:
|
except exception.CongressException as e:
|
||||||
@ -101,7 +101,7 @@ class RuleModel(base.APIModel):
|
|||||||
'rule_name': item.get('name'),
|
'rule_name': item.get('name'),
|
||||||
'comment': item.get('comment')}
|
'comment': item.get('comment')}
|
||||||
# Note(thread-safety): blocking call
|
# Note(thread-safety): blocking call
|
||||||
return self.invoke_rpc(base.ENGINE_SERVICE,
|
return self.invoke_rpc(base.ENGINE_SERVICE_ID,
|
||||||
'persistent_insert_rule', args,
|
'persistent_insert_rule', args,
|
||||||
timeout=self.dse_long_timeout)
|
timeout=self.dse_long_timeout)
|
||||||
except exception.CongressException as e:
|
except exception.CongressException as e:
|
||||||
@ -126,7 +126,7 @@ class RuleModel(base.APIModel):
|
|||||||
try:
|
try:
|
||||||
args = {'id_': id_, 'policy_name_or_id': self.policy_name(context)}
|
args = {'id_': id_, 'policy_name_or_id': self.policy_name(context)}
|
||||||
# Note(thread-safety): blocking call
|
# Note(thread-safety): blocking call
|
||||||
return self.invoke_rpc(base.ENGINE_SERVICE,
|
return self.invoke_rpc(base.ENGINE_SERVICE_ID,
|
||||||
'persistent_delete_rule', args,
|
'persistent_delete_rule', args,
|
||||||
timeout=self.dse_long_timeout)
|
timeout=self.dse_long_timeout)
|
||||||
except exception.CongressException as e:
|
except exception.CongressException as e:
|
||||||
|
@ -32,6 +32,7 @@ from oslo_utils import importutils
|
|||||||
from oslo_utils import strutils
|
from oslo_utils import strutils
|
||||||
from oslo_utils import uuidutils
|
from oslo_utils import uuidutils
|
||||||
|
|
||||||
|
from congress.api import base as api_base
|
||||||
from congress.datasources import constants
|
from congress.datasources import constants
|
||||||
from congress.db import datasources as datasources_db
|
from congress.db import datasources as datasources_db
|
||||||
from congress.dse2 import control_bus
|
from congress.dse2 import control_bus
|
||||||
@ -718,8 +719,8 @@ class DseNode(object):
|
|||||||
|
|
||||||
# check the request has valid information
|
# check the request has valid information
|
||||||
self.validate_create_datasource(req)
|
self.validate_create_datasource(req)
|
||||||
if self.is_valid_service(req['name']):
|
if (len(req['name']) == 0 or req['name'][0] == '_'):
|
||||||
raise exception.DatasourceNameInUse(value=req['name'])
|
raise exception.InvalidDatasourceName(value=req['name'])
|
||||||
|
|
||||||
new_id = req['id']
|
new_id = req['id']
|
||||||
LOG.debug("adding datasource %s", req['name'])
|
LOG.debug("adding datasource %s", req['name'])
|
||||||
@ -748,7 +749,7 @@ class DseNode(object):
|
|||||||
# immediate synch policies on local PE if present
|
# immediate synch policies on local PE if present
|
||||||
# otherwise wait for regularly scheduled synch
|
# otherwise wait for regularly scheduled synch
|
||||||
# TODO(dse2): use finer-grained method to synch specific policies
|
# TODO(dse2): use finer-grained method to synch specific policies
|
||||||
engine = self.service_object('engine')
|
engine = self.service_object(api_base.ENGINE_SERVICE_ID)
|
||||||
if engine is not None:
|
if engine is not None:
|
||||||
engine.synchronize_policies()
|
engine.synchronize_policies()
|
||||||
# TODO(dse2): also broadcast to all PE nodes to synch
|
# TODO(dse2): also broadcast to all PE nodes to synch
|
||||||
|
@ -212,6 +212,11 @@ class DatasourceNameInUse(Conflict):
|
|||||||
msg_fmt = _("Datasource already in use with name %(value)s")
|
msg_fmt = _("Datasource already in use with name %(value)s")
|
||||||
|
|
||||||
|
|
||||||
|
class InvalidDatasourceName(BadConfig):
|
||||||
|
msg_fmt = _("Datasource name %(value) is invalid. Cannot be empty or "
|
||||||
|
"start with underscore.")
|
||||||
|
|
||||||
|
|
||||||
class DatasourceNotFound(NotFound):
|
class DatasourceNotFound(NotFound):
|
||||||
msg_fmt = _("Datasource not found %(id)s")
|
msg_fmt = _("Datasource not found %(id)s")
|
||||||
|
|
||||||
|
@ -29,6 +29,7 @@ from oslo_log import log as logging
|
|||||||
|
|
||||||
from congress.api import action_model
|
from congress.api import action_model
|
||||||
from congress.api import application
|
from congress.api import application
|
||||||
|
from congress.api import base as api_base
|
||||||
from congress.api import datasource_model
|
from congress.api import datasource_model
|
||||||
from congress.api import policy_model
|
from congress.api import policy_model
|
||||||
from congress.api import router
|
from congress.api import router
|
||||||
@ -44,7 +45,6 @@ from congress.policy_engines import agnostic
|
|||||||
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
ENGINE_SERVICE_NAME = 'engine'
|
|
||||||
|
|
||||||
|
|
||||||
def create2(node, policy_engine=True, datasources=True, api=True):
|
def create2(node, policy_engine=True, datasources=True, api=True):
|
||||||
@ -69,9 +69,9 @@ def create2(node, policy_engine=True, datasources=True, api=True):
|
|||||||
if policy_engine:
|
if policy_engine:
|
||||||
LOG.info("Registering congress PolicyEngine service on node %s",
|
LOG.info("Registering congress PolicyEngine service on node %s",
|
||||||
node.node_id)
|
node.node_id)
|
||||||
services[ENGINE_SERVICE_NAME] = create_policy_engine()
|
services[api_base.ENGINE_SERVICE_ID] = create_policy_engine()
|
||||||
node.register_service(services[ENGINE_SERVICE_NAME])
|
node.register_service(services[api_base.ENGINE_SERVICE_ID])
|
||||||
initialize_policy_engine(services[ENGINE_SERVICE_NAME])
|
initialize_policy_engine(services[api_base.ENGINE_SERVICE_ID])
|
||||||
|
|
||||||
if datasources:
|
if datasources:
|
||||||
LOG.info("Registering congress datasource services on node %s",
|
LOG.info("Registering congress datasource services on node %s",
|
||||||
@ -82,7 +82,7 @@ def create2(node, policy_engine=True, datasources=True, api=True):
|
|||||||
# for ds in services['datasources']:
|
# for ds in services['datasources']:
|
||||||
# try:
|
# try:
|
||||||
# utils.create_datasource_policy(ds, ds.name,
|
# utils.create_datasource_policy(ds, ds.name,
|
||||||
# ENGINE_SERVICE_NAME)
|
# api_base.ENGINE_SERVICE_ID)
|
||||||
# except (exception.BadConfig,
|
# except (exception.BadConfig,
|
||||||
# exception.DatasourceNameInUse,
|
# exception.DatasourceNameInUse,
|
||||||
# exception.DriverNotFound,
|
# exception.DriverNotFound,
|
||||||
@ -92,7 +92,7 @@ def create2(node, policy_engine=True, datasources=True, api=True):
|
|||||||
|
|
||||||
# start synchronizer and other periodic tasks
|
# start synchronizer and other periodic tasks
|
||||||
if policy_engine:
|
if policy_engine:
|
||||||
services[ENGINE_SERVICE_NAME].start_policy_synchronizer()
|
services[api_base.ENGINE_SERVICE_ID].start_policy_synchronizer()
|
||||||
if datasources:
|
if datasources:
|
||||||
node.start_periodic_tasks()
|
node.start_periodic_tasks()
|
||||||
return services
|
return services
|
||||||
@ -126,7 +126,7 @@ def create_api_models(bus):
|
|||||||
|
|
||||||
def create_policy_engine():
|
def create_policy_engine():
|
||||||
"""Create policy engine and initialize it using the api models."""
|
"""Create policy engine and initialize it using the api models."""
|
||||||
engine = agnostic.DseRuntime(ENGINE_SERVICE_NAME)
|
engine = agnostic.DseRuntime(api_base.ENGINE_SERVICE_ID)
|
||||||
engine.debug_mode() # should take this out for production
|
engine.debug_mode() # should take this out for production
|
||||||
return engine
|
return engine
|
||||||
|
|
||||||
|
@ -16,6 +16,7 @@ from futurist import periodics
|
|||||||
import mock
|
import mock
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
|
|
||||||
|
from congress.api import base as api_base
|
||||||
from congress.common import config
|
from congress.common import config
|
||||||
from congress import harness
|
from congress import harness
|
||||||
from congress.tests import fake_datasource
|
from congress.tests import fake_datasource
|
||||||
@ -60,7 +61,7 @@ def setup_config(with_fake_datasource=True, node_id='testnode',
|
|||||||
engine_service = None
|
engine_service = None
|
||||||
api_service = None
|
api_service = None
|
||||||
if policy:
|
if policy:
|
||||||
engine_service = services[harness.ENGINE_SERVICE_NAME]
|
engine_service = services[api_base.ENGINE_SERVICE_ID]
|
||||||
if api:
|
if api:
|
||||||
api_service = services['api']
|
api_service = services['api']
|
||||||
|
|
||||||
|
@ -45,7 +45,7 @@ class TestAPIUtils(base.SqlTestCase):
|
|||||||
|
|
||||||
def test_get_id_from_context_policy_id(self):
|
def test_get_id_from_context_policy_id(self):
|
||||||
context = {'policy_id': 'policy id'}
|
context = {'policy_id': 'policy id'}
|
||||||
expected = ('engine', 'policy id')
|
expected = ('__engine', 'policy id')
|
||||||
result = api_utils.get_id_from_context(context)
|
result = api_utils.get_id_from_context(context)
|
||||||
self.assertEqual(expected, result)
|
self.assertEqual(expected, result)
|
||||||
|
|
||||||
|
@ -24,6 +24,7 @@ from oslo_config import cfg
|
|||||||
cfg.CONF.datasource_sync_period = 0
|
cfg.CONF.datasource_sync_period = 0
|
||||||
from oslo_messaging import conffixture
|
from oslo_messaging import conffixture
|
||||||
|
|
||||||
|
from congress.api import base as api_base
|
||||||
from congress.datalog import compile
|
from congress.datalog import compile
|
||||||
from congress.datasources import nova_driver
|
from congress.datasources import nova_driver
|
||||||
from congress import exception as congressException
|
from congress import exception as congressException
|
||||||
@ -268,7 +269,7 @@ class TestDSE(base.TestCase):
|
|||||||
node = helper.make_dsenode_new_partition('testnode')
|
node = helper.make_dsenode_new_partition('testnode')
|
||||||
node.always_snapshot = False
|
node.always_snapshot = False
|
||||||
data = fake_datasource.FakeDataSource('data')
|
data = fake_datasource.FakeDataSource('data')
|
||||||
engine = agnostic.DseRuntime('engine')
|
engine = agnostic.DseRuntime(api_base.ENGINE_SERVICE_ID)
|
||||||
node.register_service(data)
|
node.register_service(data)
|
||||||
node.register_service(engine)
|
node.register_service(engine)
|
||||||
|
|
||||||
@ -286,7 +287,7 @@ class TestDSE(base.TestCase):
|
|||||||
node = helper.make_dsenode_new_partition('testnode')
|
node = helper.make_dsenode_new_partition('testnode')
|
||||||
node.always_snapshot = False
|
node.always_snapshot = False
|
||||||
data = fake_datasource.FakeDataSource('data')
|
data = fake_datasource.FakeDataSource('data')
|
||||||
engine = agnostic.DseRuntime('engine')
|
engine = agnostic.DseRuntime(api_base.ENGINE_SERVICE_ID)
|
||||||
node.register_service(data)
|
node.register_service(data)
|
||||||
node.register_service(engine)
|
node.register_service(engine)
|
||||||
|
|
||||||
@ -308,7 +309,7 @@ class TestDSE(base.TestCase):
|
|||||||
node = helper.make_dsenode_new_partition('testnode')
|
node = helper.make_dsenode_new_partition('testnode')
|
||||||
node.always_snapshot = False
|
node.always_snapshot = False
|
||||||
data = fake_datasource.FakeDataSource('data')
|
data = fake_datasource.FakeDataSource('data')
|
||||||
engine = agnostic.DseRuntime('engine')
|
engine = agnostic.DseRuntime(api_base.ENGINE_SERVICE_ID)
|
||||||
node.register_service(data)
|
node.register_service(data)
|
||||||
node.register_service(engine)
|
node.register_service(engine)
|
||||||
|
|
||||||
|
@ -17,8 +17,9 @@ import sys
|
|||||||
|
|
||||||
import mock
|
import mock
|
||||||
|
|
||||||
|
from congress.api import base as api_base
|
||||||
from congress.policy_engines import agnostic
|
from congress.policy_engines import agnostic
|
||||||
from congress.tests.api import base as api_base
|
from congress.tests.api import base as tests_api_base
|
||||||
from congress.tests import base
|
from congress.tests import base
|
||||||
from congress.tests import helper
|
from congress.tests import helper
|
||||||
|
|
||||||
@ -44,7 +45,7 @@ class TestDseRuntime(base.SqlTestCase):
|
|||||||
]
|
]
|
||||||
patched_persisted_rules.return_value = persisted_rule
|
patched_persisted_rules.return_value = persisted_rule
|
||||||
|
|
||||||
services = api_base.setup_config()
|
services = tests_api_base.setup_config()
|
||||||
engine2 = services['engine']
|
engine2 = services['engine']
|
||||||
node = services['node']
|
node = services['node']
|
||||||
|
|
||||||
@ -73,7 +74,7 @@ class TestDseRuntime(base.SqlTestCase):
|
|||||||
class TestAgnostic(base.TestCase):
|
class TestAgnostic(base.TestCase):
|
||||||
def test_receive_data_no_sequence_num(self):
|
def test_receive_data_no_sequence_num(self):
|
||||||
'''Test receiving data without sequence numbers'''
|
'''Test receiving data without sequence numbers'''
|
||||||
run = agnostic.DseRuntime('engine')
|
run = agnostic.DseRuntime(api_base.ENGINE_SERVICE_ID)
|
||||||
run.always_snapshot = False
|
run.always_snapshot = False
|
||||||
run.create_policy('datasource1')
|
run.create_policy('datasource1')
|
||||||
|
|
||||||
@ -119,7 +120,7 @@ class TestAgnostic(base.TestCase):
|
|||||||
|
|
||||||
def test_receive_data_in_order(self):
|
def test_receive_data_in_order(self):
|
||||||
'''Test receiving data with sequence numbers, in order'''
|
'''Test receiving data with sequence numbers, in order'''
|
||||||
run = agnostic.DseRuntime('engine')
|
run = agnostic.DseRuntime(api_base.ENGINE_SERVICE_ID)
|
||||||
run.always_snapshot = False
|
run.always_snapshot = False
|
||||||
run.create_policy('datasource1')
|
run.create_policy('datasource1')
|
||||||
|
|
||||||
@ -165,7 +166,7 @@ class TestAgnostic(base.TestCase):
|
|||||||
|
|
||||||
def test_receive_data_out_of_order(self):
|
def test_receive_data_out_of_order(self):
|
||||||
'''Test receiving data with sequence numbers, out of order'''
|
'''Test receiving data with sequence numbers, out of order'''
|
||||||
run = agnostic.DseRuntime('engine')
|
run = agnostic.DseRuntime(api_base.ENGINE_SERVICE_ID)
|
||||||
run.always_snapshot = False
|
run.always_snapshot = False
|
||||||
run.create_policy('datasource1')
|
run.create_policy('datasource1')
|
||||||
|
|
||||||
@ -208,7 +209,7 @@ class TestAgnostic(base.TestCase):
|
|||||||
|
|
||||||
def test_receive_data_arbitrary_start(self):
|
def test_receive_data_arbitrary_start(self):
|
||||||
'''Test receiving data with arbitrary starting sequence number'''
|
'''Test receiving data with arbitrary starting sequence number'''
|
||||||
run = agnostic.DseRuntime('engine')
|
run = agnostic.DseRuntime(api_base.ENGINE_SERVICE_ID)
|
||||||
run.always_snapshot = False
|
run.always_snapshot = False
|
||||||
run.create_policy('datasource1')
|
run.create_policy('datasource1')
|
||||||
run.receive_data_sequenced(
|
run.receive_data_sequenced(
|
||||||
@ -223,7 +224,7 @@ class TestAgnostic(base.TestCase):
|
|||||||
|
|
||||||
Only one message (arbitrary) should be processed.
|
Only one message (arbitrary) should be processed.
|
||||||
'''
|
'''
|
||||||
run = agnostic.DseRuntime('engine')
|
run = agnostic.DseRuntime(api_base.ENGINE_SERVICE_ID)
|
||||||
run.always_snapshot = False
|
run.always_snapshot = False
|
||||||
run.create_policy('datasource1')
|
run.create_policy('datasource1')
|
||||||
|
|
||||||
@ -255,7 +256,7 @@ class TestAgnostic(base.TestCase):
|
|||||||
|
|
||||||
def test_receive_data_sequence_number_max_int(self):
|
def test_receive_data_sequence_number_max_int(self):
|
||||||
'''Test receiving data when sequence number goes over max int'''
|
'''Test receiving data when sequence number goes over max int'''
|
||||||
run = agnostic.DseRuntime('engine')
|
run = agnostic.DseRuntime(api_base.ENGINE_SERVICE_ID)
|
||||||
run.always_snapshot = False
|
run.always_snapshot = False
|
||||||
run.create_policy('datasource1')
|
run.create_policy('datasource1')
|
||||||
|
|
||||||
@ -297,7 +298,7 @@ class TestAgnostic(base.TestCase):
|
|||||||
|
|
||||||
def test_receive_data_multiple_tables(self):
|
def test_receive_data_multiple_tables(self):
|
||||||
'''Test receiving data with sequence numbers, multiple tables'''
|
'''Test receiving data with sequence numbers, multiple tables'''
|
||||||
run = agnostic.DseRuntime('engine')
|
run = agnostic.DseRuntime(api_base.ENGINE_SERVICE_ID)
|
||||||
run.always_snapshot = False
|
run.always_snapshot = False
|
||||||
run.create_policy('datasource1')
|
run.create_policy('datasource1')
|
||||||
|
|
||||||
|
@ -22,6 +22,7 @@ import tenacity
|
|||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
|
|
||||||
|
from congress.api import base as api_base
|
||||||
from congress.datalog import base
|
from congress.datalog import base
|
||||||
from congress.datalog import compile
|
from congress.datalog import compile
|
||||||
from congress import harness
|
from congress import harness
|
||||||
@ -206,7 +207,7 @@ class TestDsePerformance(testbase.SqlTestCase):
|
|||||||
'datasource': self.cage.service_object('api-datasource'),
|
'datasource': self.cage.service_object('api-datasource'),
|
||||||
'status': self.cage.service_object('api-status'),
|
'status': self.cage.service_object('api-status'),
|
||||||
'schema': self.cage.service_object('api-schema')}
|
'schema': self.cage.service_object('api-schema')}
|
||||||
self.engine = self.cage.service_object('engine')
|
self.engine = self.cage.service_object(api_base.ENGINE_SERVICE_ID)
|
||||||
|
|
||||||
@tenacity.retry(wait=tenacity.wait_fixed(0.1))
|
@tenacity.retry(wait=tenacity.wait_fixed(0.1))
|
||||||
def wait_til_query_nonempty(self, query, policy):
|
def wait_til_query_nonempty(self, query, policy):
|
||||||
|
@ -22,6 +22,7 @@ import eventlet
|
|||||||
from mox3 import mox
|
from mox3 import mox
|
||||||
from six.moves import range
|
from six.moves import range
|
||||||
|
|
||||||
|
from congress.api import base as api_base
|
||||||
from congress.datalog import compile
|
from congress.datalog import compile
|
||||||
from congress.dse import dataobj
|
from congress.dse import dataobj
|
||||||
from congress import harness
|
from congress import harness
|
||||||
@ -41,7 +42,7 @@ class BenchmarkDatasource(base.Benchmark):
|
|||||||
'module': helper.data_module_path('benchmark_driver.py'),
|
'module': helper.data_module_path('benchmark_driver.py'),
|
||||||
'poll_time': 0}}
|
'poll_time': 0}}
|
||||||
cage = harness.create(helper.root_path(), None, config)
|
cage = harness.create(helper.root_path(), None, config)
|
||||||
engine = cage.service_object('engine')
|
engine = cage.service_object(api_base.ENGINE_SERVICE_ID)
|
||||||
api = {'policy': cage.service_object('api-policy'),
|
api = {'policy': cage.service_object('api-policy'),
|
||||||
'rule': cage.service_object('api-rule'),
|
'rule': cage.service_object('api-rule'),
|
||||||
'table': cage.service_object('api-table'),
|
'table': cage.service_object('api-table'),
|
||||||
|
@ -28,11 +28,11 @@ import mock
|
|||||||
import neutronclient.v2_0
|
import neutronclient.v2_0
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
|
|
||||||
|
from congress.api import base as api_base
|
||||||
from congress.common import config
|
from congress.common import config
|
||||||
from congress.datasources import neutronv2_driver
|
from congress.datasources import neutronv2_driver
|
||||||
from congress.datasources import nova_driver
|
from congress.datasources import nova_driver
|
||||||
from congress import harness
|
from congress.tests.api import base as tests_api_base
|
||||||
from congress.tests.api import base as api_base
|
|
||||||
from congress.tests import base
|
from congress.tests import base
|
||||||
from congress.tests.datasources import test_neutron_driver as test_neutron
|
from congress.tests.datasources import test_neutron_driver as test_neutron
|
||||||
from congress.tests import helper
|
from congress.tests import helper
|
||||||
@ -45,7 +45,7 @@ class BaseTestPolicyCongress(base.SqlTestCase):
|
|||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(BaseTestPolicyCongress, self).setUp()
|
super(BaseTestPolicyCongress, self).setUp()
|
||||||
self.services = api_base.setup_config(with_fake_datasource=False)
|
self.services = tests_api_base.setup_config(with_fake_datasource=False)
|
||||||
self.api = self.services['api']
|
self.api = self.services['api']
|
||||||
self.node = self.services['node']
|
self.node = self.services['node']
|
||||||
self.engine = self.services['engine']
|
self.engine = self.services['engine']
|
||||||
@ -87,8 +87,8 @@ class TestCongress(BaseTestPolicyCongress):
|
|||||||
|
|
||||||
def test_startup(self):
|
def test_startup(self):
|
||||||
self.assertIsNotNone(self.services['api'])
|
self.assertIsNotNone(self.services['api'])
|
||||||
self.assertIsNotNone(self.services[harness.ENGINE_SERVICE_NAME])
|
self.assertIsNotNone(self.services['engine'])
|
||||||
self.assertIsNotNone(self.services[harness.ENGINE_SERVICE_NAME].node)
|
self.assertIsNotNone(self.services['engine'].node)
|
||||||
|
|
||||||
def test_policy(self):
|
def test_policy(self):
|
||||||
self.create_policy('alpha')
|
self.create_policy('alpha')
|
||||||
@ -187,7 +187,7 @@ class APILocalRouting(BaseTestPolicyCongress):
|
|||||||
super(APILocalRouting, self).setUp()
|
super(APILocalRouting, self).setUp()
|
||||||
|
|
||||||
# set up second API+PE node
|
# set up second API+PE node
|
||||||
self.services = api_base.setup_config(
|
self.services = tests_api_base.setup_config(
|
||||||
with_fake_datasource=False, node_id='testnode2',
|
with_fake_datasource=False, node_id='testnode2',
|
||||||
same_partition_as_node=self.node)
|
same_partition_as_node=self.node)
|
||||||
self.api2 = self.services['api']
|
self.api2 = self.services['api']
|
||||||
@ -227,7 +227,7 @@ class APILocalRouting(BaseTestPolicyCongress):
|
|||||||
|
|
||||||
def test_internode_pe_routing(self):
|
def test_internode_pe_routing(self):
|
||||||
'''test reach internode PE when intranode PE not available'''
|
'''test reach internode PE when intranode PE not available'''
|
||||||
self.node.unregister_service('engine')
|
self.node.unregister_service(api_base.ENGINE_SERVICE_ID)
|
||||||
result = self.api['api-row'].get_items(
|
result = self.api['api-row'].get_items(
|
||||||
{}, {'policy_id': 'policy', 'table_id': 'p'})
|
{}, {'policy_id': 'policy', 'table_id': 'p'})
|
||||||
self.assertEqual(len(result['results']), 2)
|
self.assertEqual(len(result['results']), 2)
|
||||||
|
Loading…
Reference in New Issue
Block a user