diff --git a/congress/api/base.py b/congress/api/base.py index c76e71c66..3c6ffc8b3 100644 --- a/congress/api/base.py +++ b/congress/api/base.py @@ -20,6 +20,7 @@ from __future__ import absolute_import from oslo_config import cfg ENGINE_SERVICE_ID = '__engine' +DS_MANAGER_SERVICE_ID = '_ds_manager' class APIModel(object): diff --git a/congress/api/datasource_model.py b/congress/api/datasource_model.py index 6b8e7fb77..5cd79bebd 100644 --- a/congress/api/datasource_model.py +++ b/congress/api/datasource_model.py @@ -25,7 +25,6 @@ from congress.api import api_utils from congress.api import base from congress.api import error_codes from congress.api import webservice -from congress.dse2 import dse_node from congress import exception LOG = logging.getLogger(__name__) @@ -86,7 +85,7 @@ class DatasourceModel(base.APIModel): obj = None try: # Note(thread-safety): blocking call - obj = self.invoke_rpc(dse_node.DS_MANAGER_SERVICE_ID, + obj = self.invoke_rpc(base.DS_MANAGER_SERVICE_ID, 'add_datasource', {'items': item}, timeout=self.dse_long_timeout) @@ -114,7 +113,7 @@ class DatasourceModel(base.APIModel): # delete a different datasource # Fix: check UUID of datasource before operating. # Abort if mismatch - self.invoke_rpc(dse_node.DS_MANAGER_SERVICE_ID, + self.invoke_rpc(base.DS_MANAGER_SERVICE_ID, 'delete_datasource', {'datasource': datasource}, timeout=self.dse_long_timeout) diff --git a/congress/common/config.py b/congress/common/config.py index 4356e0ba3..11c629786 100644 --- a/congress/common/config.py +++ b/congress/common/config.py @@ -25,6 +25,7 @@ from oslo_log import log as logging from oslo_middleware import cors from oslo_policy import opts as policy_opts +from congress.dse2 import dse_node from congress import version LOG = logging.getLogger(__name__) @@ -82,6 +83,9 @@ core_opts = [ # Register the configuration options cfg.CONF.register_opts(core_opts) +# Register dse opts +cfg.CONF.register_opts(dse_node.dse_opts, group='dse') + policy_opts.set_defaults(cfg.CONF, 'policy.json') logging.register_options(cfg.CONF) diff --git a/congress/dse2/datasource_manager.py b/congress/dse2/datasource_manager.py new file mode 100644 index 000000000..2d83d3bd2 --- /dev/null +++ b/congress/dse2/datasource_manager.py @@ -0,0 +1,134 @@ +# Copyright (c) 2016 . All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +from oslo_db import exception as db_exc +from oslo_log import log as logging + +from congress.api import base as api_base +from congress.db import datasources as datasources_db +from congress.dse2 import data_service +from congress import exception +from congress.synchronizer import datasource_synchronizer + +LOG = logging.getLogger(__name__) + + +class DSManagerService(data_service.DataService): + """A proxy service to datasource managing methods in dse_node.""" + def __init__(self, service_id): + super(DSManagerService, self).__init__(service_id) + self.synchronizer = None + self.add_rpc_endpoint(DSManagerEndpoints(self)) + + def register_synchronizer(self): + self.synchronizer = datasource_synchronizer.DatasourceSynchronizer( + self.node) + self.synchronizer.start() + + def start(self): + super(DSManagerService, self).start() + self.register_synchronizer() + + def stop(self): + if self.synchronizer: + self.synchronizer.stop() + super(DSManagerService, self).stop() + + # Note(thread-safety): blocking function + def add_datasource(self, item, deleted=False, update_db=True): + req = self.make_datasource_dict(item) + + # check the request has valid information + self.node.validate_create_datasource(req) + if (len(req['name']) == 0 or req['name'][0] == '_'): + raise exception.InvalidDatasourceName(value=req['name']) + + new_id = req['id'] + LOG.debug("adding datasource %s", req['name']) + if update_db: + LOG.debug("updating db") + try: + # Note(thread-safety): blocking call + datasource = datasources_db.add_datasource( + id_=req['id'], + name=req['name'], + driver=req['driver'], + config=req['config'], + description=req['description'], + enabled=req['enabled']) + except db_exc.DBDuplicateEntry: + raise exception.DatasourceNameInUse(value=req['name']) + except db_exc.DBError: + LOG.exception('Creating a new datasource failed.') + raise exception.DatasourceCreationError(value=req['name']) + + new_id = datasource['id'] + try: + self.synchronizer.sync_datasource(req['name']) + # immediate synch policies on local PE if present + # otherwise wait for regularly scheduled synch + engine = self.node.service_object(api_base.ENGINE_SERVICE_ID) + if engine is not None: + engine.synchronizer.sync_one_policy(req['name']) + # TODO(dse2): also broadcast to all PE nodes to synch + except exception.DataServiceError: + LOG.exception('the datasource service is already ' + 'created in the node') + except Exception: + LOG.exception( + 'Unexpected exception encountered while registering ' + 'new datasource %s.', req['name']) + if update_db: + datasources_db.delete_datasource(req['id']) + msg = ("Datasource service: %s creation fails." % req['name']) + raise exception.DatasourceCreationError(message=msg) + + new_item = dict(item) + new_item['id'] = new_id + return self.node.make_datasource_dict(new_item) + + # Note(thread-safety): blocking function + def delete_datasource(self, datasource, update_db=True): + LOG.debug("Deleting %s datasource ", datasource['name']) + datasource_id = datasource['id'] + if update_db: + # Note(thread-safety): blocking call + result = datasources_db.delete_datasource_with_data(datasource_id) + if not result: + raise exception.DatasourceNotFound(id=datasource_id) + + # Note(thread-safety): blocking call + try: + self.synchronizer.sync_datasource(datasource['name']) + # If local PE exists.. sync + engine = self.node.service_object(api_base.ENGINE_SERVICE_ID) + if engine: + engine.synchronizer.sync_one_policy(datasource['name']) + except Exception: + msg = ('failed to synchronize_datasource after ' + 'deleting datasource: %s' % datasource_id) + LOG.exception(msg) + raise exception.DataServiceError(msg) + + +class DSManagerEndpoints(object): + + def __init__(self, service): + self.ds_manager = service + + def add_datasource(self, context, items): + return self.ds_manager.add_datasource(items) + + def delete_datasource(self, context, datasource): + return self.ds_manager.delete_datasource(datasource) diff --git a/congress/dse2/dse_node.py b/congress/dse2/dse_node.py index 5ea525f7e..c75e7d748 100644 --- a/congress/dse2/dse_node.py +++ b/congress/dse2/dse_node.py @@ -20,10 +20,7 @@ import six import eventlet eventlet.monkey_patch() # for using oslo.messaging w/ eventlet executor -from futurist import periodics -from oslo_concurrency import lockutils from oslo_config import cfg -from oslo_db import exception as db_exc from oslo_log import log as logging import oslo_messaging as messaging from oslo_messaging import exceptions as messaging_exceptions @@ -32,19 +29,17 @@ from oslo_utils import importutils from oslo_utils import strutils from oslo_utils import uuidutils -from congress.api import base as api_base from congress.datalog import compile as datalog_compile from congress.datasources import constants from congress.db import datasources as datasources_db from congress.dse2 import control_bus -from congress.dse2 import data_service from congress import exception LOG = logging.getLogger(__name__) -_dse_opts = [ +dse_opts = [ cfg.StrOpt('bus_id', default='bus', help='Unique ID of this DSE bus'), cfg.IntOpt('ping_timeout', default=5, @@ -63,7 +58,7 @@ _dse_opts = [ help='The number of seconds to retry execute action before ' 'giving up. Zero or negative value means never give up.'), ] -cfg.CONF.register_opts(_dse_opts, group='dse') +# cfg.CONF.register_opts(_dse_opts, group='dse') class DseNode(object): @@ -261,7 +256,6 @@ class DseNode(object): return LOG.info("Stopping DSE node '%s'", self.node_id) - self.stop_datasource_synchronizer() for s in self._services: s.stop() self._rpc_server.stop() @@ -592,98 +586,6 @@ class DseNode(object): results.append(result) return results - # TODO(ekcs): should we start and stop these along with self.{start, stop}? - def start_periodic_tasks(self): - callables = [(self.synchronize, None, {}), - (self._check_resub_all, None, {})] - self.periodic_tasks = periodics.PeriodicWorker(callables) - if self._running: - self.sync_thread = eventlet.spawn_n(self.periodic_tasks.start) - - def stop_datasource_synchronizer(self): - if self.periodic_tasks: - self.periodic_tasks.stop() - self.periodic_tasks.wait() - self.periodic_tasks = None - if self.sync_thread: - eventlet.greenthread.kill(self.sync_thread) - self.sync_thread = None - - @periodics.periodic(spacing=cfg.CONF.datasource_sync_period) - def synchronize(self): - try: - self.synchronize_datasources() - except Exception: - LOG.exception("synchronize_datasources failed") - - @lockutils.synchronized('congress_synchronize_datasources') - def synchronize_datasources(self): - if not cfg.CONF.datasources: - LOG.debug("Skipping datasource synchronization on a " - "non-datasources node") - return - LOG.info("Synchronizing running datasources") - added = 0 - removed = 0 - datasources = self.get_datasources(filter_secret=False) - db_datasources = [] - # Look for datasources in the db, but not in the services. - for configured_ds in datasources: - db_datasources.append(configured_ds['id']) - active_ds = self.service_object(uuid_=configured_ds['id']) - # If datasource is not enabled, unregister the service - if not configured_ds['enabled']: - if active_ds: - LOG.debug("unregistering %s service, datasource disabled " - "in DB.", active_ds.service_id) - self.unregister_service(active_ds.service_id) - removed = removed + 1 - continue - if active_ds is None: - # service is not up, create the service - LOG.info("registering %s service on node %s", - configured_ds['name'], self.node_id) - service = self.create_datasource_service(configured_ds) - self.register_service(service) - added = added + 1 - - # Unregister the services which are not in DB - active_ds_services = [s for s in self._services - if getattr(s, 'type', '') == 'datasource_driver'] - db_datasources_set = set(db_datasources) - stale_services = [s for s in active_ds_services - if s.ds_id not in db_datasources_set] - for s in stale_services: - LOG.debug("unregistering %s service, datasource not found in DB ", - s.service_id) - self.unregister_service(uuid_=s.ds_id) - removed = removed + 1 - - LOG.info("synchronize_datasources, added %d removed %d on node %s", - added, removed, self.node_id) - - # This might be required once we support update datasource config - - # if not self._config_eq(configured_ds, active_ds): - # LOG.debug('configured and active disagree: %s %s', - # strutils.mask_password(active_ds), - # strutils.mask_password(configured_ds)) - - # LOG.info('Reloading datasource: %s', - # strutils.mask_password(configured_ds)) - # self.delete_datasource(configured_ds['name'], - # update_db=False) - # self.add_datasource(configured_ds, update_db=False) - - # def _config_eq(self, db_config, active_config): - # return (db_config['name'] == active_config.service_id and - # db_config['config'] == active_config.service_info['args']) - - @periodics.periodic(spacing=cfg.CONF.dse.time_to_resub) - def _check_resub_all(self): - for s in self._services: - s.check_resub_all() - def delete_missing_driver_datasources(self): removed = 0 for datasource in datasources_db.get_datasources(): @@ -721,59 +623,6 @@ class DseNode(object): if key in fields)) return resource - # Note(thread-safety): blocking function - def add_datasource(self, item, deleted=False, update_db=True): - req = self.make_datasource_dict(item) - - # check the request has valid information - self.validate_create_datasource(req) - if (len(req['name']) == 0 or req['name'][0] == '_'): - raise exception.InvalidDatasourceName(value=req['name']) - - new_id = req['id'] - LOG.debug("adding datasource %s", req['name']) - if update_db: - LOG.debug("updating db") - try: - # Note(thread-safety): blocking call - datasource = datasources_db.add_datasource( - id_=req['id'], - name=req['name'], - driver=req['driver'], - config=req['config'], - description=req['description'], - enabled=req['enabled']) - except db_exc.DBDuplicateEntry: - raise exception.DatasourceNameInUse(value=req['name']) - except db_exc.DBError: - LOG.exception('Creating a new datasource failed.') - raise exception.DatasourceCreationError(value=req['name']) - - new_id = datasource['id'] - try: - self.synchronize_datasources() - # immediate synch policies on local PE if present - # otherwise wait for regularly scheduled synch - engine = self.service_object(api_base.ENGINE_SERVICE_ID) - if engine is not None: - engine.synchronizer.sync_one_policy(req['name']) - # TODO(dse2): also broadcast to all PE nodes to synch - except exception.DataServiceError: - LOG.exception('the datasource service is already ' - 'created in the node') - except Exception: - LOG.exception( - 'Unexpected exception encountered while registering ' - 'new datasource %s.', req['name']) - if update_db: - datasources_db.delete_datasource(req['id']) - msg = ("Datasource service: %s creation fails." % req['name']) - raise exception.DatasourceCreationError(message=msg) - - new_item = dict(item) - new_item['id'] = new_id - return self.make_datasource_dict(new_item) - def validate_create_datasource(self, req): name = req['name'] if not datalog_compile.string_is_servicename(name): @@ -852,29 +701,6 @@ class DseNode(object): raise exception.DataServiceError(msg % class_path) return service - # Note(thread-safety): blocking function - def delete_datasource(self, datasource, update_db=True): - LOG.debug("Deleting %s datasource ", datasource['name']) - datasource_id = datasource['id'] - if update_db: - # Note(thread-safety): blocking call - result = datasources_db.delete_datasource_with_data(datasource_id) - if not result: - raise exception.DatasourceNotFound(id=datasource_id) - - # Note(thread-safety): blocking call - try: - self.synchronize_datasources() - # If local PE exists.. sync - engine = self.service_object(api_base.ENGINE_SERVICE_ID) - if engine: - engine.synchronizer.sync_one_policy(datasource['name']) - except Exception: - msg = ('failed to synchronize_datasource after ' - 'deleting datasource: %s' % datasource_id) - LOG.exception(msg) - raise exception.DataServiceError(msg) - class DseNodeEndpoints (object): """Collection of RPC endpoints that the DseNode exposes on the bus. @@ -907,25 +733,3 @@ class DseNodeEndpoints (object): self.node.service_object(s).receive_data_sequenced( publisher=publisher, table=table, data=data, seqnum=seqnum, is_snapshot=is_snapshot) - - -DS_MANAGER_SERVICE_ID = '_ds_manager' - - -class DSManagerService(data_service.DataService): - """A proxy service to datasource managing methods in dse_node.""" - def __init__(self, service_id): - super(DSManagerService, self).__init__(DS_MANAGER_SERVICE_ID) - self.add_rpc_endpoint(DSManagerEndpoints(self)) - - -class DSManagerEndpoints(object): - - def __init__(self, service): - self.service = service - - def add_datasource(self, context, items): - return self.service.node.add_datasource(items) - - def delete_datasource(self, context, datasource): - return self.service.node.delete_datasource(datasource) diff --git a/congress/harness.py b/congress/harness.py index 30bb56274..58ab00166 100644 --- a/congress/harness.py +++ b/congress/harness.py @@ -35,6 +35,7 @@ from congress.api import status_model from congress.api.system import driver_model from congress.api import table_model from congress.db import datasources as db_datasources +from congress.dse2 import datasource_manager as ds_manager from congress.dse2 import dse_node from congress import exception from congress.policy_engines import agnostic @@ -75,9 +76,9 @@ def create2(node_id=None, bus_id=None, existing_node=None, LOG.info("Registering congress datasource services on node %s", node.node_id) services['datasources'] = create_datasources(node) - node.start_periodic_tasks() - node.register_service( - dse_node.DSManagerService(dse_node.DS_MANAGER_SERVICE_ID)) + services['ds_manager'] = ds_manager.DSManagerService( + api_base.DS_MANAGER_SERVICE_ID) + node.register_service(services['ds_manager']) if policy_engine: LOG.info("Registering congress PolicyEngine service on node %s", diff --git a/congress/synchronizer/datasource_synchronizer.py b/congress/synchronizer/datasource_synchronizer.py new file mode 100644 index 000000000..9d3973251 --- /dev/null +++ b/congress/synchronizer/datasource_synchronizer.py @@ -0,0 +1,137 @@ +# Copyright (c) 2016 NEC Corp. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import eventlet +from futurist import periodics +from oslo_concurrency import lockutils +from oslo_config import cfg +from oslo_log import log as logging + +from congress.db import datasources + +LOG = logging.getLogger(__name__) + +SYNCHRONIZER_SERVICE_ID = '_datasource_synchronizer' + + +class DatasourceSynchronizer(object): + + def __init__(self, node): + self.name = SYNCHRONIZER_SERVICE_ID + self.sync_thread = None + self.periodic_tasks = None + self.node = node + + def start(self): + callables = [(self.synchronize_all_datasources, None, {}), + (self._check_resub_all, None, {})] + self.periodic_tasks = periodics.PeriodicWorker(callables) + self.sync_thread = eventlet.spawn_n(self.periodic_tasks.start) + LOG.info("started datasource synchronizer on node %s", + self.node.node_id) + + def stop(self): + if self.periodic_tasks: + self.periodic_tasks.stop() + self.periodic_tasks.wait() + self.periodic_tasks = None + if self.sync_thread: + eventlet.greenthread.kill(self.sync_thread) + self.sync_thread = None + + @periodics.periodic(spacing=cfg.CONF.dse.time_to_resub) + def _check_resub_all(self): + LOG.info("Running periodic resub on node %s", self.node.node_id) + for s in self.node.get_services(True): + s.check_resub_all() + + @lockutils.synchronized('congress_synchronize_datasources') + def sync_datasource(self, ds_name): + if not cfg.CONF.datasources: + LOG.info("sync not supported on non-datasource node") + return + datasource = datasources.get_datasource_by_name(ds_name) + service_obj = self.node.service_object(ds_name) + + if datasource and not service_obj: + # register service with data node + service = self.node.create_datasource_service(datasource) + self.node.register_service(service) + LOG.info("service %s registered by synchronizer", ds_name) + return + if service_obj and datasource is None: + # unregister, datasource not present in DB + self.node.unregister_service(ds_name) + LOG.info("service %s unregistered by synchronizer", ds_name) + return + + @lockutils.synchronized('congress_synchronize_datasources') + @periodics.periodic(spacing=cfg.CONF.datasource_sync_period) + def synchronize_all_datasources(self): + LOG.info("synchronizing datasources on node %s", self.node.node_id) + added = 0 + removed = 0 + datasources = self.node.get_datasources(filter_secret=False) + db_datasources = [] + # Look for datasources in the db, but not in the services. + for configured_ds in datasources: + db_datasources.append(configured_ds['id']) + active_ds = self.node.service_object(uuid_=configured_ds['id']) + # If datasource is not enabled, unregister the service + if not configured_ds['enabled']: + if active_ds: + LOG.info("unregistering %s service, datasource disabled " + "in DB.", active_ds.service_id) + self.node.unregister_service(active_ds.service_id) + removed = removed + 1 + continue + if active_ds is None: + # service is not up, create the service + LOG.info("registering %s service on node %s", + configured_ds['name'], self.node.node_id) + service = self.node.create_datasource_service(configured_ds) + self.node.register_service(service) + added = added + 1 + + # Unregister the services which are not in DB + active_ds_services = [s for s in self.node.get_services(True) + if getattr(s, 'type', '') == 'datasource_driver'] + db_datasources_set = set(db_datasources) + stale_services = [s for s in active_ds_services + if s.ds_id not in db_datasources_set] + for s in stale_services: + LOG.info("unregistering %s service, datasource not found in DB ", + s.service_id) + self.node.unregister_service(uuid_=s.ds_id) + removed = removed + 1 + + LOG.info("synchronized datasources, added %d removed %d on node %s", + added, removed, self.node.node_id) + + # This might be required once we support update datasource config + # if not self._config_eq(configured_ds, active_ds): + # LOG.debug('configured and active disagree: %s %s', + # strutils.mask_password(active_ds), + # strutils.mask_password(configured_ds)) + + # LOG.info('Reloading datasource: %s', + # strutils.mask_password(configured_ds)) + # self.delete_datasource(configured_ds['name'], + # update_db=False) + # self.add_datasource(configured_ds, update_db=False) + + # def _config_eq(self, db_config, active_config): + # return (db_config['name'] == active_config.service_id and + # db_config['config'] == active_config.service_info['args']) diff --git a/congress/tests/api/base.py b/congress/tests/api/base.py index f6a308007..698a8328e 100644 --- a/congress/tests/api/base.py +++ b/congress/tests/api/base.py @@ -65,6 +65,8 @@ def setup_config(with_fake_datasource=True, node_id='testnode', engine_service = services[api_base.ENGINE_SERVICE_ID] if api: api_service = services['api'] + if datasources: + ds_manager = services['ds_manager'] return {'node': node, 'engine': engine_service, 'data': data, - 'api': api_service} + 'api': api_service, 'ds_manager': ds_manager} diff --git a/congress/tests/api/test_datasource_model.py b/congress/tests/api/test_datasource_model.py index 76d4a7946..4fa15884a 100644 --- a/congress/tests/api/test_datasource_model.py +++ b/congress/tests/api/test_datasource_model.py @@ -37,8 +37,9 @@ class TestDatasourceModel(base.SqlTestCase): self.data = services['data'] self.node = services['node'] self.engine = services['engine'] + self.ds_manager = services['ds_manager'] self.datasource = self._get_datasource_request() - self.node.add_datasource(self.datasource) + self.ds_manager.add_datasource(self.datasource) def tearDown(self): super(TestDatasourceModel, self).tearDown() @@ -61,7 +62,7 @@ class TestDatasourceModel(base.SqlTestCase): self.assertEqual(1, len(dinfo)) datasource2 = self._get_datasource_request() datasource2['name'] = 'datasource2' - self.node.add_datasource(datasource2) + self.ds_manager.add_datasource(datasource2) dinfo = self.datasource_model.get_items(None)['results'] self.assertEqual(2, len(dinfo)) del dinfo[0]['id'] diff --git a/congress/tests/api/test_driver_model.py b/congress/tests/api/test_driver_model.py index 66acf4f85..d20360577 100644 --- a/congress/tests/api/test_driver_model.py +++ b/congress/tests/api/test_driver_model.py @@ -27,8 +27,9 @@ class TestDriverModel(base.SqlTestCase): super(TestDriverModel, self).setUp() services = api_base.setup_config() self.node = services['node'] + self.ds_manager = services['ds_manager'] - self.node.add_datasource(self._get_datasource_request()) + self.ds_manager.add_datasource(self._get_datasource_request()) self.driver_model = services['api']['api-system'] def _get_datasource_request(self): diff --git a/congress/tests/dse2/test_datasource.py b/congress/tests/dse2/test_datasource.py index 3581ecb96..6da7d55a8 100644 --- a/congress/tests/dse2/test_datasource.py +++ b/congress/tests/dse2/test_datasource.py @@ -33,8 +33,10 @@ class TestDataSource(base.SqlTestCase): def setUp(self): super(TestDataSource, self).setUp() - self.dseNode = api_base.setup_config(with_fake_datasource=False, - api=False, policy=False)['node'] + config = api_base.setup_config(with_fake_datasource=False, api=False, + policy=False) + self.dseNode = config['node'] + self.ds_manager = config['ds_manager'] def _get_datasource_request(self): # leave ID out--generated during creation @@ -50,7 +52,7 @@ class TestDataSource(base.SqlTestCase): def test_add_datasource(self): req = self._get_datasource_request() - result = self.dseNode.add_datasource(req) + result = self.ds_manager.add_datasource(req) # test equality of return value except for 'id' field del(result['id']) self.assertEqual(req, result) @@ -70,7 +72,7 @@ class TestDataSource(base.SqlTestCase): req = self._get_datasource_request() self.assertRaises(congressException.DatasourceCreationError, - self.dseNode.add_datasource, req) + self.ds_manager.add_datasource, req) @mock.patch.object(dse_node.DseNode, 'register_service') def test_add_datasource_synchronizer_error(self, register_ds): @@ -78,13 +80,13 @@ class TestDataSource(base.SqlTestCase): req = self._get_datasource_request() self.assertRaises(congressException.DatasourceCreationError, - self.dseNode.add_datasource, req) + self.ds_manager.add_datasource, req) ds = datasource_db.get_datasource_by_name(req['name']) self.assertIsNone(ds) def test_get_datasource(self): req = self._get_datasource_request() - ds = self.dseNode.add_datasource(req) + ds = self.ds_manager.add_datasource(req) result = self.dseNode.get_datasource(ds['id']) # test equality except for 'id' field del(result['id']) @@ -92,7 +94,7 @@ class TestDataSource(base.SqlTestCase): def test_get_datasources(self): req = self._get_datasource_request() - self.dseNode.add_datasource(req) + self.ds_manager.add_datasource(req) result = self.dseNode.get_datasources() self.assertEqual(len(result), 1) result = result[0] @@ -103,10 +105,10 @@ class TestDataSource(base.SqlTestCase): def test_get_datasources2(self): req1 = self._get_datasource_request() req1['name'] = 'datasource1' - result1 = self.dseNode.add_datasource(req1) + result1 = self.ds_manager.add_datasource(req1) req2 = self._get_datasource_request() req2['name'] = 'datasource2' - result2 = self.dseNode.add_datasource(req2) + result2 = self.ds_manager.add_datasource(req2) # check results of add_datasource for key, value in req1.items(): self.assertEqual(value, result1[key]) @@ -132,7 +134,7 @@ class TestDataSource(base.SqlTestCase): def test_get_datasources_hide_secret(self): req = self._get_datasource_request() - self.dseNode.add_datasource(req) + self.ds_manager.add_datasource(req) result = self.dseNode.get_datasources(filter_secret=True) result = result[0] # check equality except that 'config'/'password' is hidden @@ -142,14 +144,14 @@ class TestDataSource(base.SqlTestCase): def test_create_datasource_duplicate_name(self): req = self._get_datasource_request() - self.dseNode.add_datasource(req) + self.ds_manager.add_datasource(req) self.assertRaises(congressException.DatasourceNameInUse, - self.dseNode.add_datasource, req) + self.ds_manager.add_datasource, req) def test_delete_datasource(self): req = self._get_datasource_request() - result = self.dseNode.add_datasource(req) - self.dseNode.delete_datasource(result) + result = self.ds_manager.add_datasource(req) + self.ds_manager.delete_datasource(result) # check that service is actually deleted services = self.dseNode.get_services() self.assertEqual(len(services), 0) @@ -183,7 +185,7 @@ class TestDataSource(base.SqlTestCase): req = self._get_datasource_request() req['id'] = 'fake-id' self.assertRaises(congressException.DatasourceNotFound, - self.dseNode.delete_datasource, req) + self.ds_manager.delete_datasource, req) # TODO(dse2): Doesn't seem like we need this (or it will be moved to API). # def test_get_driver_schema(self): diff --git a/congress/tests/dse2/test_dse2.py b/congress/tests/dse2/test_dse2.py index 02b9586e5..f3f16a0cd 100644 --- a/congress/tests/dse2/test_dse2.py +++ b/congress/tests/dse2/test_dse2.py @@ -30,6 +30,7 @@ from congress.datalog import compile from congress.datasources import nova_driver from congress import exception as congressException from congress.policy_engines import agnostic +from congress.tests.api import base as test_api_base from congress.tests import base from congress.tests import fake_datasource from congress.tests import helper @@ -245,12 +246,14 @@ class TestDSE(base.TestCase): node.stop() def test_auto_resub(self): - node = helper.make_dsenode_new_partition('testnode') + config = test_api_base.setup_config(with_fake_datasource=False, + api=False, policy=False) + node = config['node'] + config['ds_manager'].synchronizer.start() sub = fake_datasource.FakeDataSource('sub') pub = fake_datasource.FakeDataSource('pub') node.register_service(sub) node.register_service(pub) - node.start_periodic_tasks() sub.subscribe('pub', 'p') helper.retry_check_function_return_value( diff --git a/congress/tests/dse2/test_dse_node.py b/congress/tests/dse2/test_dse_node.py index 7835da6d1..e14960ccb 100644 --- a/congress/tests/dse2/test_dse_node.py +++ b/congress/tests/dse2/test_dse_node.py @@ -20,7 +20,7 @@ from oslo_config import cfg from oslo_messaging import conffixture from congress.dse2 import data_service -from congress.dse2 import dse_node +from congress.dse2 import datasource_manager as ds_manager from congress.tests import base from congress.tests import helper @@ -302,31 +302,31 @@ class TestDSManagerService(base.TestCase): super(TestDSManagerService, self).setUp() def test_ds_manager_endpoints_add_ds(self): - ds_manager_service = dse_node.DSManagerService('test_mgr') + ds_manager_service = ds_manager.DSManagerService('test_mgr') node_mock = mock.MagicMock() - node_mock.add_datasource = mock.MagicMock() - node_mock.add_datasource.return_value = 'add_datasource' + ds_manager_service.add_datasource = mock.MagicMock() + ds_manager_service.add_datasource.return_value = 'add_datasource' ds_manager_service.node = node_mock - endpoints = dse_node.DSManagerEndpoints(ds_manager_service) + endpoints = ds_manager.DSManagerEndpoints(ds_manager_service) expect_ret = 'add_datasource' self.assertEqual(expect_ret, endpoints.add_datasource('context', {})) - node_mock.add_datasource.assert_called_with({}) + ds_manager_service.add_datasource.assert_called_with({}) def test_ds_manager_endpoints_delete_ds(self): - ds_manager_service = dse_node.DSManagerService('test_mgr') + ds_manager_service = ds_manager.DSManagerService('test_mgr') node_mock = mock.MagicMock() - node_mock.delete_datasource = mock.MagicMock() - node_mock.delete_datasource.return_value = 'delete_datasource' + ds_manager_service.delete_datasource = mock.MagicMock() + ds_manager_service.delete_datasource.return_value = 'delete_datasource' ds_manager_service.node = node_mock - endpoints = dse_node.DSManagerEndpoints(ds_manager_service) + endpoints = ds_manager.DSManagerEndpoints(ds_manager_service) expect_ret = 'delete_datasource' self.assertEqual(expect_ret, endpoints.delete_datasource('context', 'ds-id')) - node_mock.delete_datasource.assert_called_with('ds-id') + ds_manager_service.delete_datasource.assert_called_with('ds-id') # Leave this to make manual testing with RabbitMQ easy