From aeb5d6f79eb76ba1948c42f0547f50307ffce8c1 Mon Sep 17 00:00:00 2001 From: Eric K Date: Mon, 12 Sep 2016 20:37:05 -0700 Subject: [PATCH] Check DB only to detect duplicate DS name on add Now that we have moved to modifying DB first and letting in-mem catch up on sync we should only consult DB as the official record The extraneous check of whether the ds exists in-mem may have been causing testHA failure when attempting to create ds 'fake' because 'fake' was removed from DB but sometimes remains in-mem. Additionally, prepend __ to internal service_ids to avoid breaking compatibility with users who may have named datasource 'engine' Change-Id: Iffff446693605c43f7d741ee111b13e79d30b30f --- congress/api/api_utils.py | 2 +- congress/api/base.py | 7 ++++--- congress/api/datasource_model.py | 2 +- congress/api/policy_model.py | 14 +++++++------- congress/api/row_model.py | 4 ++-- congress/api/rule_model.py | 8 ++++---- congress/dse2/dse_node.py | 7 ++++--- congress/exception.py | 5 +++++ congress/harness.py | 14 +++++++------- congress/tests/api/base.py | 3 ++- congress/tests/api/test_api_utils.py | 2 +- congress/tests/dse2/test_dse2.py | 7 ++++--- .../policy_engines/test_agnostic_dse2.py | 19 ++++++++++--------- .../test_agnostic_performance.py | 3 ++- congress/tests/test_benchmark_updates.py | 3 ++- congress/tests/test_congress.py | 14 +++++++------- 16 files changed, 63 insertions(+), 51 deletions(-) diff --git a/congress/api/api_utils.py b/congress/api/api_utils.py index f0f1c1c4a..a31f6db7f 100644 --- a/congress/api/api_utils.py +++ b/congress/api/api_utils.py @@ -41,7 +41,7 @@ def get_id_from_context(context): ds_name = db_datasources.get_datasource_name(context.get('ds_id')) return ds_name, context.get('ds_id') elif 'policy_id' in context: - return base.ENGINE_SERVICE, context.get('policy_id') + return base.ENGINE_SERVICE_ID, context.get('policy_id') else: msg = "Internal error: context %s should have included " % str(context) "either ds_id or policy_id" diff --git a/congress/api/base.py b/congress/api/base.py index 5f1a84b58..c76e71c66 100644 --- a/congress/api/base.py +++ b/congress/api/base.py @@ -19,7 +19,7 @@ from __future__ import absolute_import from oslo_config import cfg -ENGINE_SERVICE = 'engine' +ENGINE_SERVICE_ID = '__engine' class APIModel(object): @@ -32,7 +32,8 @@ class APIModel(object): # Note(thread-safety): blocking function def invoke_rpc(self, caller, name, kwargs, timeout=None): - local = (caller is ENGINE_SERVICE and - self.bus.node.service_object(ENGINE_SERVICE) is not None) + local = (caller is ENGINE_SERVICE_ID and + self.bus.node.service_object( + ENGINE_SERVICE_ID) is not None) return self.bus.rpc( caller, name, kwargs, timeout=timeout, local=local) diff --git a/congress/api/datasource_model.py b/congress/api/datasource_model.py index 3cdf37f21..4009181ff 100644 --- a/congress/api/datasource_model.py +++ b/congress/api/datasource_model.py @@ -149,7 +149,7 @@ class DatasourceModel(base.APIModel): # TODO(ekcs): perhaps keep execution synchronous when explicitly # called via API # Note(thread-safety): blocking call - self.invoke_rpc(base.ENGINE_SERVICE, 'execute_action', args) + self.invoke_rpc(base.ENGINE_SERVICE_ID, 'execute_action', args) except exception.PolicyException as e: (num, desc) = error_codes.get('execute_error') raise webservice.DataModelException(num, desc + "::" + str(e)) diff --git a/congress/api/policy_model.py b/congress/api/policy_model.py index 28a39cb8a..ebddb101c 100644 --- a/congress/api/policy_model.py +++ b/congress/api/policy_model.py @@ -46,7 +46,7 @@ class PolicyModel(base.APIModel): """ try: # Note(thread-safety): blocking call - return {"results": self.invoke_rpc(base.ENGINE_SERVICE, + return {"results": self.invoke_rpc(base.ENGINE_SERVICE_ID, 'persistent_get_policies', {})} except exception.CongressException as e: @@ -67,7 +67,7 @@ class PolicyModel(base.APIModel): """ try: # Note(thread-safety): blocking call - return self.invoke_rpc(base.ENGINE_SERVICE, + return self.invoke_rpc(base.ENGINE_SERVICE_ID, 'persistent_get_policy', {'id_': id_}) except exception.CongressException as e: @@ -96,7 +96,7 @@ class PolicyModel(base.APIModel): try: # Note(thread-safety): blocking call policy_metadata = self.invoke_rpc( - base.ENGINE_SERVICE, 'persistent_create_policy', + base.ENGINE_SERVICE_ID, 'persistent_create_policy', {'name': name, 'abbr': item.get('abbreviation'), 'kind': item.get('kind'), @@ -139,7 +139,7 @@ class PolicyModel(base.APIModel): KeyError: Item with specified id_ not present. """ # Note(thread-safety): blocking call - return self.invoke_rpc(base.ENGINE_SERVICE, + return self.invoke_rpc(base.ENGINE_SERVICE_ID, 'persistent_delete_policy', {'name_or_id': id_}) @@ -176,8 +176,8 @@ class PolicyModel(base.APIModel): 'action_theory': actions, 'delta': delta, 'trace': trace, 'as_list': True} # Note(thread-safety): blocking call - result = self.invoke_rpc(base.ENGINE_SERVICE, 'simulate', args, - timeout=self.dse_long_timeout) + result = self.invoke_rpc(base.ENGINE_SERVICE_ID, 'simulate', + args, timeout=self.dse_long_timeout) except exception.PolicyException as e: (num, desc) = error_codes.get('simulate_error') raise webservice.DataModelException(num, desc + "::" + str(e)) @@ -209,7 +209,7 @@ class PolicyModel(base.APIModel): 'action': action, 'action_args': action_args} # Note(thread-safety): blocking call - self.invoke_rpc(base.ENGINE_SERVICE, 'execute_action', args) + self.invoke_rpc(base.ENGINE_SERVICE_ID, 'execute_action', args) except exception.PolicyException as e: (num, desc) = error_codes.get('execute_error') raise webservice.DataModelException(num, desc + "::" + str(e)) diff --git a/congress/api/row_model.py b/congress/api/row_model.py index 0965b853a..f77f7247c 100644 --- a/congress/api/row_model.py +++ b/congress/api/row_model.py @@ -78,7 +78,7 @@ class RowModel(base.APIModel): try: args = {'table_id': table_id, 'source_id': source_id, 'trace': gen_trace} - if caller is base.ENGINE_SERVICE: + if caller is base.ENGINE_SERVICE_ID: # allow extra time for row policy engine query # Note(thread-safety): blocking call result = self.invoke_rpc( @@ -93,7 +93,7 @@ class RowModel(base.APIModel): LOG.exception(m) raise webservice.DataModelException.create(e) - if gen_trace and caller is base.ENGINE_SERVICE: + if gen_trace and caller is base.ENGINE_SERVICE_ID: # DSE2 returns lists instead of tuples, so correct that. results = [{'data': tuple(x['data'])} for x in result[0]] return {'results': results, diff --git a/congress/api/rule_model.py b/congress/api/rule_model.py index 3e31ab22c..1de782f14 100644 --- a/congress/api/rule_model.py +++ b/congress/api/rule_model.py @@ -48,7 +48,7 @@ class RuleModel(base.APIModel): try: args = {'id_': id_, 'policy_name': self.policy_name(context)} # Note(thread-safety): blocking call - return self.invoke_rpc(base.ENGINE_SERVICE, + return self.invoke_rpc(base.ENGINE_SERVICE_ID, 'persistent_get_rule', args) except exception.CongressException as e: raise webservice.DataModelException.create(e) @@ -69,7 +69,7 @@ class RuleModel(base.APIModel): try: args = {'policy_name': self.policy_name(context)} # Note(thread-safety): blocking call - rules = self.invoke_rpc(base.ENGINE_SERVICE, + rules = self.invoke_rpc(base.ENGINE_SERVICE_ID, 'persistent_get_rules', args) return {'results': rules} except exception.CongressException as e: @@ -101,7 +101,7 @@ class RuleModel(base.APIModel): 'rule_name': item.get('name'), 'comment': item.get('comment')} # Note(thread-safety): blocking call - return self.invoke_rpc(base.ENGINE_SERVICE, + return self.invoke_rpc(base.ENGINE_SERVICE_ID, 'persistent_insert_rule', args, timeout=self.dse_long_timeout) except exception.CongressException as e: @@ -126,7 +126,7 @@ class RuleModel(base.APIModel): try: args = {'id_': id_, 'policy_name_or_id': self.policy_name(context)} # Note(thread-safety): blocking call - return self.invoke_rpc(base.ENGINE_SERVICE, + return self.invoke_rpc(base.ENGINE_SERVICE_ID, 'persistent_delete_rule', args, timeout=self.dse_long_timeout) except exception.CongressException as e: diff --git a/congress/dse2/dse_node.py b/congress/dse2/dse_node.py index e197abc39..61af7a3f4 100644 --- a/congress/dse2/dse_node.py +++ b/congress/dse2/dse_node.py @@ -32,6 +32,7 @@ from oslo_utils import importutils from oslo_utils import strutils from oslo_utils import uuidutils +from congress.api import base as api_base from congress.datasources import constants from congress.db import datasources as datasources_db from congress.dse2 import control_bus @@ -718,8 +719,8 @@ class DseNode(object): # check the request has valid information self.validate_create_datasource(req) - if self.is_valid_service(req['name']): - raise exception.DatasourceNameInUse(value=req['name']) + if (len(req['name']) == 0 or req['name'][0] == '_'): + raise exception.InvalidDatasourceName(value=req['name']) new_id = req['id'] LOG.debug("adding datasource %s", req['name']) @@ -748,7 +749,7 @@ class DseNode(object): # immediate synch policies on local PE if present # otherwise wait for regularly scheduled synch # TODO(dse2): use finer-grained method to synch specific policies - engine = self.service_object('engine') + engine = self.service_object(api_base.ENGINE_SERVICE_ID) if engine is not None: engine.synchronize_policies() # TODO(dse2): also broadcast to all PE nodes to synch diff --git a/congress/exception.py b/congress/exception.py index 964d59527..71fb04cef 100644 --- a/congress/exception.py +++ b/congress/exception.py @@ -212,6 +212,11 @@ class DatasourceNameInUse(Conflict): msg_fmt = _("Datasource already in use with name %(value)s") +class InvalidDatasourceName(BadConfig): + msg_fmt = _("Datasource name %(value) is invalid. Cannot be empty or " + "start with underscore.") + + class DatasourceNotFound(NotFound): msg_fmt = _("Datasource not found %(id)s") diff --git a/congress/harness.py b/congress/harness.py index 216864406..ca76fb5e3 100644 --- a/congress/harness.py +++ b/congress/harness.py @@ -29,6 +29,7 @@ from oslo_log import log as logging from congress.api import action_model from congress.api import application +from congress.api import base as api_base from congress.api import datasource_model from congress.api import policy_model from congress.api import router @@ -44,7 +45,6 @@ from congress.policy_engines import agnostic LOG = logging.getLogger(__name__) -ENGINE_SERVICE_NAME = 'engine' def create2(node, policy_engine=True, datasources=True, api=True): @@ -69,9 +69,9 @@ def create2(node, policy_engine=True, datasources=True, api=True): if policy_engine: LOG.info("Registering congress PolicyEngine service on node %s", node.node_id) - services[ENGINE_SERVICE_NAME] = create_policy_engine() - node.register_service(services[ENGINE_SERVICE_NAME]) - initialize_policy_engine(services[ENGINE_SERVICE_NAME]) + services[api_base.ENGINE_SERVICE_ID] = create_policy_engine() + node.register_service(services[api_base.ENGINE_SERVICE_ID]) + initialize_policy_engine(services[api_base.ENGINE_SERVICE_ID]) if datasources: LOG.info("Registering congress datasource services on node %s", @@ -82,7 +82,7 @@ def create2(node, policy_engine=True, datasources=True, api=True): # for ds in services['datasources']: # try: # utils.create_datasource_policy(ds, ds.name, - # ENGINE_SERVICE_NAME) + # api_base.ENGINE_SERVICE_ID) # except (exception.BadConfig, # exception.DatasourceNameInUse, # exception.DriverNotFound, @@ -92,7 +92,7 @@ def create2(node, policy_engine=True, datasources=True, api=True): # start synchronizer and other periodic tasks if policy_engine: - services[ENGINE_SERVICE_NAME].start_policy_synchronizer() + services[api_base.ENGINE_SERVICE_ID].start_policy_synchronizer() if datasources: node.start_periodic_tasks() return services @@ -126,7 +126,7 @@ def create_api_models(bus): def create_policy_engine(): """Create policy engine and initialize it using the api models.""" - engine = agnostic.DseRuntime(ENGINE_SERVICE_NAME) + engine = agnostic.DseRuntime(api_base.ENGINE_SERVICE_ID) engine.debug_mode() # should take this out for production return engine diff --git a/congress/tests/api/base.py b/congress/tests/api/base.py index 12006a1cc..de491d901 100644 --- a/congress/tests/api/base.py +++ b/congress/tests/api/base.py @@ -16,6 +16,7 @@ from futurist import periodics import mock from oslo_config import cfg +from congress.api import base as api_base from congress.common import config from congress import harness from congress.tests import fake_datasource @@ -60,7 +61,7 @@ def setup_config(with_fake_datasource=True, node_id='testnode', engine_service = None api_service = None if policy: - engine_service = services[harness.ENGINE_SERVICE_NAME] + engine_service = services[api_base.ENGINE_SERVICE_ID] if api: api_service = services['api'] diff --git a/congress/tests/api/test_api_utils.py b/congress/tests/api/test_api_utils.py index 060813a32..5dd89d434 100644 --- a/congress/tests/api/test_api_utils.py +++ b/congress/tests/api/test_api_utils.py @@ -45,7 +45,7 @@ class TestAPIUtils(base.SqlTestCase): def test_get_id_from_context_policy_id(self): context = {'policy_id': 'policy id'} - expected = ('engine', 'policy id') + expected = ('__engine', 'policy id') result = api_utils.get_id_from_context(context) self.assertEqual(expected, result) diff --git a/congress/tests/dse2/test_dse2.py b/congress/tests/dse2/test_dse2.py index 131acf38b..1a6419ee9 100644 --- a/congress/tests/dse2/test_dse2.py +++ b/congress/tests/dse2/test_dse2.py @@ -24,6 +24,7 @@ from oslo_config import cfg cfg.CONF.datasource_sync_period = 0 from oslo_messaging import conffixture +from congress.api import base as api_base from congress.datalog import compile from congress.datasources import nova_driver from congress import exception as congressException @@ -268,7 +269,7 @@ class TestDSE(base.TestCase): node = helper.make_dsenode_new_partition('testnode') node.always_snapshot = False data = fake_datasource.FakeDataSource('data') - engine = agnostic.DseRuntime('engine') + engine = agnostic.DseRuntime(api_base.ENGINE_SERVICE_ID) node.register_service(data) node.register_service(engine) @@ -286,7 +287,7 @@ class TestDSE(base.TestCase): node = helper.make_dsenode_new_partition('testnode') node.always_snapshot = False data = fake_datasource.FakeDataSource('data') - engine = agnostic.DseRuntime('engine') + engine = agnostic.DseRuntime(api_base.ENGINE_SERVICE_ID) node.register_service(data) node.register_service(engine) @@ -308,7 +309,7 @@ class TestDSE(base.TestCase): node = helper.make_dsenode_new_partition('testnode') node.always_snapshot = False data = fake_datasource.FakeDataSource('data') - engine = agnostic.DseRuntime('engine') + engine = agnostic.DseRuntime(api_base.ENGINE_SERVICE_ID) node.register_service(data) node.register_service(engine) diff --git a/congress/tests/policy_engines/test_agnostic_dse2.py b/congress/tests/policy_engines/test_agnostic_dse2.py index 4caedf11e..19eaf2edf 100644 --- a/congress/tests/policy_engines/test_agnostic_dse2.py +++ b/congress/tests/policy_engines/test_agnostic_dse2.py @@ -17,8 +17,9 @@ import sys import mock +from congress.api import base as api_base from congress.policy_engines import agnostic -from congress.tests.api import base as api_base +from congress.tests.api import base as tests_api_base from congress.tests import base from congress.tests import helper @@ -44,7 +45,7 @@ class TestDseRuntime(base.SqlTestCase): ] patched_persisted_rules.return_value = persisted_rule - services = api_base.setup_config() + services = tests_api_base.setup_config() engine2 = services['engine'] node = services['node'] @@ -73,7 +74,7 @@ class TestDseRuntime(base.SqlTestCase): class TestAgnostic(base.TestCase): def test_receive_data_no_sequence_num(self): '''Test receiving data without sequence numbers''' - run = agnostic.DseRuntime('engine') + run = agnostic.DseRuntime(api_base.ENGINE_SERVICE_ID) run.always_snapshot = False run.create_policy('datasource1') @@ -119,7 +120,7 @@ class TestAgnostic(base.TestCase): def test_receive_data_in_order(self): '''Test receiving data with sequence numbers, in order''' - run = agnostic.DseRuntime('engine') + run = agnostic.DseRuntime(api_base.ENGINE_SERVICE_ID) run.always_snapshot = False run.create_policy('datasource1') @@ -165,7 +166,7 @@ class TestAgnostic(base.TestCase): def test_receive_data_out_of_order(self): '''Test receiving data with sequence numbers, out of order''' - run = agnostic.DseRuntime('engine') + run = agnostic.DseRuntime(api_base.ENGINE_SERVICE_ID) run.always_snapshot = False run.create_policy('datasource1') @@ -208,7 +209,7 @@ class TestAgnostic(base.TestCase): def test_receive_data_arbitrary_start(self): '''Test receiving data with arbitrary starting sequence number''' - run = agnostic.DseRuntime('engine') + run = agnostic.DseRuntime(api_base.ENGINE_SERVICE_ID) run.always_snapshot = False run.create_policy('datasource1') run.receive_data_sequenced( @@ -223,7 +224,7 @@ class TestAgnostic(base.TestCase): Only one message (arbitrary) should be processed. ''' - run = agnostic.DseRuntime('engine') + run = agnostic.DseRuntime(api_base.ENGINE_SERVICE_ID) run.always_snapshot = False run.create_policy('datasource1') @@ -255,7 +256,7 @@ class TestAgnostic(base.TestCase): def test_receive_data_sequence_number_max_int(self): '''Test receiving data when sequence number goes over max int''' - run = agnostic.DseRuntime('engine') + run = agnostic.DseRuntime(api_base.ENGINE_SERVICE_ID) run.always_snapshot = False run.create_policy('datasource1') @@ -297,7 +298,7 @@ class TestAgnostic(base.TestCase): def test_receive_data_multiple_tables(self): '''Test receiving data with sequence numbers, multiple tables''' - run = agnostic.DseRuntime('engine') + run = agnostic.DseRuntime(api_base.ENGINE_SERVICE_ID) run.always_snapshot = False run.create_policy('datasource1') diff --git a/congress/tests/policy_engines/test_agnostic_performance.py b/congress/tests/policy_engines/test_agnostic_performance.py index feca48ecc..0baca2960 100644 --- a/congress/tests/policy_engines/test_agnostic_performance.py +++ b/congress/tests/policy_engines/test_agnostic_performance.py @@ -22,6 +22,7 @@ import tenacity from oslo_config import cfg from oslo_log import log as logging +from congress.api import base as api_base from congress.datalog import base from congress.datalog import compile from congress import harness @@ -206,7 +207,7 @@ class TestDsePerformance(testbase.SqlTestCase): 'datasource': self.cage.service_object('api-datasource'), 'status': self.cage.service_object('api-status'), 'schema': self.cage.service_object('api-schema')} - self.engine = self.cage.service_object('engine') + self.engine = self.cage.service_object(api_base.ENGINE_SERVICE_ID) @tenacity.retry(wait=tenacity.wait_fixed(0.1)) def wait_til_query_nonempty(self, query, policy): diff --git a/congress/tests/test_benchmark_updates.py b/congress/tests/test_benchmark_updates.py index cdd623719..f4f922ccc 100644 --- a/congress/tests/test_benchmark_updates.py +++ b/congress/tests/test_benchmark_updates.py @@ -22,6 +22,7 @@ import eventlet from mox3 import mox from six.moves import range +from congress.api import base as api_base from congress.datalog import compile from congress.dse import dataobj from congress import harness @@ -41,7 +42,7 @@ class BenchmarkDatasource(base.Benchmark): 'module': helper.data_module_path('benchmark_driver.py'), 'poll_time': 0}} cage = harness.create(helper.root_path(), None, config) - engine = cage.service_object('engine') + engine = cage.service_object(api_base.ENGINE_SERVICE_ID) api = {'policy': cage.service_object('api-policy'), 'rule': cage.service_object('api-rule'), 'table': cage.service_object('api-table'), diff --git a/congress/tests/test_congress.py b/congress/tests/test_congress.py index d10dba7f7..c53a67f13 100644 --- a/congress/tests/test_congress.py +++ b/congress/tests/test_congress.py @@ -28,11 +28,11 @@ import mock import neutronclient.v2_0 from oslo_log import log as logging +from congress.api import base as api_base from congress.common import config from congress.datasources import neutronv2_driver from congress.datasources import nova_driver -from congress import harness -from congress.tests.api import base as api_base +from congress.tests.api import base as tests_api_base from congress.tests import base from congress.tests.datasources import test_neutron_driver as test_neutron from congress.tests import helper @@ -45,7 +45,7 @@ class BaseTestPolicyCongress(base.SqlTestCase): def setUp(self): super(BaseTestPolicyCongress, self).setUp() - self.services = api_base.setup_config(with_fake_datasource=False) + self.services = tests_api_base.setup_config(with_fake_datasource=False) self.api = self.services['api'] self.node = self.services['node'] self.engine = self.services['engine'] @@ -87,8 +87,8 @@ class TestCongress(BaseTestPolicyCongress): def test_startup(self): self.assertIsNotNone(self.services['api']) - self.assertIsNotNone(self.services[harness.ENGINE_SERVICE_NAME]) - self.assertIsNotNone(self.services[harness.ENGINE_SERVICE_NAME].node) + self.assertIsNotNone(self.services['engine']) + self.assertIsNotNone(self.services['engine'].node) def test_policy(self): self.create_policy('alpha') @@ -187,7 +187,7 @@ class APILocalRouting(BaseTestPolicyCongress): super(APILocalRouting, self).setUp() # set up second API+PE node - self.services = api_base.setup_config( + self.services = tests_api_base.setup_config( with_fake_datasource=False, node_id='testnode2', same_partition_as_node=self.node) self.api2 = self.services['api'] @@ -227,7 +227,7 @@ class APILocalRouting(BaseTestPolicyCongress): def test_internode_pe_routing(self): '''test reach internode PE when intranode PE not available''' - self.node.unregister_service('engine') + self.node.unregister_service(api_base.ENGINE_SERVICE_ID) result = self.api['api-row'].get_items( {}, {'policy_id': 'policy', 'table_id': 'p'}) self.assertEqual(len(result['results']), 2)