From 1d3f4bfb50aa6b9b194aa7de0eb8f1eeeabfa5e2 Mon Sep 17 00:00:00 2001 From: Anusha Ramineni Date: Fri, 25 Nov 2016 14:59:49 +0530 Subject: [PATCH] Refactor datasource synchronizer This commit adds the following : 1. Move datasource_synchronizer to seperate class 2. Moved add_datasource and delete_datasource to DSManagerService class TODO(ramineni): move other datasource management related operations to DSManagerService class 3. Add support to sync single datasource 4. synchronizer to be started as part of DSManagerService to avoid running synchronizer on non-datasource node. Partially-Implements blueprint refactor-synchronizer Change-Id: I643a0c299695a794469553cc5fb73e8d1bceec7d --- congress/api/base.py | 1 + congress/api/datasource_model.py | 5 +- congress/common/config.py | 4 + congress/dse2/datasource_manager.py | 134 ++++++++++++ congress/dse2/dse_node.py | 200 +----------------- congress/harness.py | 7 +- .../synchronizer/datasource_synchronizer.py | 137 ++++++++++++ congress/tests/api/base.py | 4 +- congress/tests/api/test_datasource_model.py | 5 +- congress/tests/api/test_driver_model.py | 3 +- congress/tests/dse2/test_datasource.py | 32 +-- congress/tests/dse2/test_dse2.py | 7 +- congress/tests/dse2/test_dse_node.py | 22 +- 13 files changed, 325 insertions(+), 236 deletions(-) create mode 100644 congress/dse2/datasource_manager.py create mode 100644 congress/synchronizer/datasource_synchronizer.py diff --git a/congress/api/base.py b/congress/api/base.py index c76e71c66..3c6ffc8b3 100644 --- a/congress/api/base.py +++ b/congress/api/base.py @@ -20,6 +20,7 @@ from __future__ import absolute_import from oslo_config import cfg ENGINE_SERVICE_ID = '__engine' +DS_MANAGER_SERVICE_ID = '_ds_manager' class APIModel(object): diff --git a/congress/api/datasource_model.py b/congress/api/datasource_model.py index 6b8e7fb77..5cd79bebd 100644 --- a/congress/api/datasource_model.py +++ b/congress/api/datasource_model.py @@ -25,7 +25,6 @@ from congress.api import api_utils from congress.api import base from congress.api import error_codes from congress.api import webservice -from congress.dse2 import dse_node from congress import exception LOG = logging.getLogger(__name__) @@ -86,7 +85,7 @@ class DatasourceModel(base.APIModel): obj = None try: # Note(thread-safety): blocking call - obj = self.invoke_rpc(dse_node.DS_MANAGER_SERVICE_ID, + obj = self.invoke_rpc(base.DS_MANAGER_SERVICE_ID, 'add_datasource', {'items': item}, timeout=self.dse_long_timeout) @@ -114,7 +113,7 @@ class DatasourceModel(base.APIModel): # delete a different datasource # Fix: check UUID of datasource before operating. # Abort if mismatch - self.invoke_rpc(dse_node.DS_MANAGER_SERVICE_ID, + self.invoke_rpc(base.DS_MANAGER_SERVICE_ID, 'delete_datasource', {'datasource': datasource}, timeout=self.dse_long_timeout) diff --git a/congress/common/config.py b/congress/common/config.py index 4356e0ba3..11c629786 100644 --- a/congress/common/config.py +++ b/congress/common/config.py @@ -25,6 +25,7 @@ from oslo_log import log as logging from oslo_middleware import cors from oslo_policy import opts as policy_opts +from congress.dse2 import dse_node from congress import version LOG = logging.getLogger(__name__) @@ -82,6 +83,9 @@ core_opts = [ # Register the configuration options cfg.CONF.register_opts(core_opts) +# Register dse opts +cfg.CONF.register_opts(dse_node.dse_opts, group='dse') + policy_opts.set_defaults(cfg.CONF, 'policy.json') logging.register_options(cfg.CONF) diff --git a/congress/dse2/datasource_manager.py b/congress/dse2/datasource_manager.py new file mode 100644 index 000000000..2d83d3bd2 --- /dev/null +++ b/congress/dse2/datasource_manager.py @@ -0,0 +1,134 @@ +# Copyright (c) 2016 . All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +from oslo_db import exception as db_exc +from oslo_log import log as logging + +from congress.api import base as api_base +from congress.db import datasources as datasources_db +from congress.dse2 import data_service +from congress import exception +from congress.synchronizer import datasource_synchronizer + +LOG = logging.getLogger(__name__) + + +class DSManagerService(data_service.DataService): + """A proxy service to datasource managing methods in dse_node.""" + def __init__(self, service_id): + super(DSManagerService, self).__init__(service_id) + self.synchronizer = None + self.add_rpc_endpoint(DSManagerEndpoints(self)) + + def register_synchronizer(self): + self.synchronizer = datasource_synchronizer.DatasourceSynchronizer( + self.node) + self.synchronizer.start() + + def start(self): + super(DSManagerService, self).start() + self.register_synchronizer() + + def stop(self): + if self.synchronizer: + self.synchronizer.stop() + super(DSManagerService, self).stop() + + # Note(thread-safety): blocking function + def add_datasource(self, item, deleted=False, update_db=True): + req = self.make_datasource_dict(item) + + # check the request has valid information + self.node.validate_create_datasource(req) + if (len(req['name']) == 0 or req['name'][0] == '_'): + raise exception.InvalidDatasourceName(value=req['name']) + + new_id = req['id'] + LOG.debug("adding datasource %s", req['name']) + if update_db: + LOG.debug("updating db") + try: + # Note(thread-safety): blocking call + datasource = datasources_db.add_datasource( + id_=req['id'], + name=req['name'], + driver=req['driver'], + config=req['config'], + description=req['description'], + enabled=req['enabled']) + except db_exc.DBDuplicateEntry: + raise exception.DatasourceNameInUse(value=req['name']) + except db_exc.DBError: + LOG.exception('Creating a new datasource failed.') + raise exception.DatasourceCreationError(value=req['name']) + + new_id = datasource['id'] + try: + self.synchronizer.sync_datasource(req['name']) + # immediate synch policies on local PE if present + # otherwise wait for regularly scheduled synch + engine = self.node.service_object(api_base.ENGINE_SERVICE_ID) + if engine is not None: + engine.synchronizer.sync_one_policy(req['name']) + # TODO(dse2): also broadcast to all PE nodes to synch + except exception.DataServiceError: + LOG.exception('the datasource service is already ' + 'created in the node') + except Exception: + LOG.exception( + 'Unexpected exception encountered while registering ' + 'new datasource %s.', req['name']) + if update_db: + datasources_db.delete_datasource(req['id']) + msg = ("Datasource service: %s creation fails." % req['name']) + raise exception.DatasourceCreationError(message=msg) + + new_item = dict(item) + new_item['id'] = new_id + return self.node.make_datasource_dict(new_item) + + # Note(thread-safety): blocking function + def delete_datasource(self, datasource, update_db=True): + LOG.debug("Deleting %s datasource ", datasource['name']) + datasource_id = datasource['id'] + if update_db: + # Note(thread-safety): blocking call + result = datasources_db.delete_datasource_with_data(datasource_id) + if not result: + raise exception.DatasourceNotFound(id=datasource_id) + + # Note(thread-safety): blocking call + try: + self.synchronizer.sync_datasource(datasource['name']) + # If local PE exists.. sync + engine = self.node.service_object(api_base.ENGINE_SERVICE_ID) + if engine: + engine.synchronizer.sync_one_policy(datasource['name']) + except Exception: + msg = ('failed to synchronize_datasource after ' + 'deleting datasource: %s' % datasource_id) + LOG.exception(msg) + raise exception.DataServiceError(msg) + + +class DSManagerEndpoints(object): + + def __init__(self, service): + self.ds_manager = service + + def add_datasource(self, context, items): + return self.ds_manager.add_datasource(items) + + def delete_datasource(self, context, datasource): + return self.ds_manager.delete_datasource(datasource) diff --git a/congress/dse2/dse_node.py b/congress/dse2/dse_node.py index 5ea525f7e..c75e7d748 100644 --- a/congress/dse2/dse_node.py +++ b/congress/dse2/dse_node.py @@ -20,10 +20,7 @@ import six import eventlet eventlet.monkey_patch() # for using oslo.messaging w/ eventlet executor -from futurist import periodics -from oslo_concurrency import lockutils from oslo_config import cfg -from oslo_db import exception as db_exc from oslo_log import log as logging import oslo_messaging as messaging from oslo_messaging import exceptions as messaging_exceptions @@ -32,19 +29,17 @@ from oslo_utils import importutils from oslo_utils import strutils from oslo_utils import uuidutils -from congress.api import base as api_base from congress.datalog import compile as datalog_compile from congress.datasources import constants from congress.db import datasources as datasources_db from congress.dse2 import control_bus -from congress.dse2 import data_service from congress import exception LOG = logging.getLogger(__name__) -_dse_opts = [ +dse_opts = [ cfg.StrOpt('bus_id', default='bus', help='Unique ID of this DSE bus'), cfg.IntOpt('ping_timeout', default=5, @@ -63,7 +58,7 @@ _dse_opts = [ help='The number of seconds to retry execute action before ' 'giving up. Zero or negative value means never give up.'), ] -cfg.CONF.register_opts(_dse_opts, group='dse') +# cfg.CONF.register_opts(_dse_opts, group='dse') class DseNode(object): @@ -261,7 +256,6 @@ class DseNode(object): return LOG.info("Stopping DSE node '%s'", self.node_id) - self.stop_datasource_synchronizer() for s in self._services: s.stop() self._rpc_server.stop() @@ -592,98 +586,6 @@ class DseNode(object): results.append(result) return results - # TODO(ekcs): should we start and stop these along with self.{start, stop}? - def start_periodic_tasks(self): - callables = [(self.synchronize, None, {}), - (self._check_resub_all, None, {})] - self.periodic_tasks = periodics.PeriodicWorker(callables) - if self._running: - self.sync_thread = eventlet.spawn_n(self.periodic_tasks.start) - - def stop_datasource_synchronizer(self): - if self.periodic_tasks: - self.periodic_tasks.stop() - self.periodic_tasks.wait() - self.periodic_tasks = None - if self.sync_thread: - eventlet.greenthread.kill(self.sync_thread) - self.sync_thread = None - - @periodics.periodic(spacing=cfg.CONF.datasource_sync_period) - def synchronize(self): - try: - self.synchronize_datasources() - except Exception: - LOG.exception("synchronize_datasources failed") - - @lockutils.synchronized('congress_synchronize_datasources') - def synchronize_datasources(self): - if not cfg.CONF.datasources: - LOG.debug("Skipping datasource synchronization on a " - "non-datasources node") - return - LOG.info("Synchronizing running datasources") - added = 0 - removed = 0 - datasources = self.get_datasources(filter_secret=False) - db_datasources = [] - # Look for datasources in the db, but not in the services. - for configured_ds in datasources: - db_datasources.append(configured_ds['id']) - active_ds = self.service_object(uuid_=configured_ds['id']) - # If datasource is not enabled, unregister the service - if not configured_ds['enabled']: - if active_ds: - LOG.debug("unregistering %s service, datasource disabled " - "in DB.", active_ds.service_id) - self.unregister_service(active_ds.service_id) - removed = removed + 1 - continue - if active_ds is None: - # service is not up, create the service - LOG.info("registering %s service on node %s", - configured_ds['name'], self.node_id) - service = self.create_datasource_service(configured_ds) - self.register_service(service) - added = added + 1 - - # Unregister the services which are not in DB - active_ds_services = [s for s in self._services - if getattr(s, 'type', '') == 'datasource_driver'] - db_datasources_set = set(db_datasources) - stale_services = [s for s in active_ds_services - if s.ds_id not in db_datasources_set] - for s in stale_services: - LOG.debug("unregistering %s service, datasource not found in DB ", - s.service_id) - self.unregister_service(uuid_=s.ds_id) - removed = removed + 1 - - LOG.info("synchronize_datasources, added %d removed %d on node %s", - added, removed, self.node_id) - - # This might be required once we support update datasource config - - # if not self._config_eq(configured_ds, active_ds): - # LOG.debug('configured and active disagree: %s %s', - # strutils.mask_password(active_ds), - # strutils.mask_password(configured_ds)) - - # LOG.info('Reloading datasource: %s', - # strutils.mask_password(configured_ds)) - # self.delete_datasource(configured_ds['name'], - # update_db=False) - # self.add_datasource(configured_ds, update_db=False) - - # def _config_eq(self, db_config, active_config): - # return (db_config['name'] == active_config.service_id and - # db_config['config'] == active_config.service_info['args']) - - @periodics.periodic(spacing=cfg.CONF.dse.time_to_resub) - def _check_resub_all(self): - for s in self._services: - s.check_resub_all() - def delete_missing_driver_datasources(self): removed = 0 for datasource in datasources_db.get_datasources(): @@ -721,59 +623,6 @@ class DseNode(object): if key in fields)) return resource - # Note(thread-safety): blocking function - def add_datasource(self, item, deleted=False, update_db=True): - req = self.make_datasource_dict(item) - - # check the request has valid information - self.validate_create_datasource(req) - if (len(req['name']) == 0 or req['name'][0] == '_'): - raise exception.InvalidDatasourceName(value=req['name']) - - new_id = req['id'] - LOG.debug("adding datasource %s", req['name']) - if update_db: - LOG.debug("updating db") - try: - # Note(thread-safety): blocking call - datasource = datasources_db.add_datasource( - id_=req['id'], - name=req['name'], - driver=req['driver'], - config=req['config'], - description=req['description'], - enabled=req['enabled']) - except db_exc.DBDuplicateEntry: - raise exception.DatasourceNameInUse(value=req['name']) - except db_exc.DBError: - LOG.exception('Creating a new datasource failed.') - raise exception.DatasourceCreationError(value=req['name']) - - new_id = datasource['id'] - try: - self.synchronize_datasources() - # immediate synch policies on local PE if present - # otherwise wait for regularly scheduled synch - engine = self.service_object(api_base.ENGINE_SERVICE_ID) - if engine is not None: - engine.synchronizer.sync_one_policy(req['name']) - # TODO(dse2): also broadcast to all PE nodes to synch - except exception.DataServiceError: - LOG.exception('the datasource service is already ' - 'created in the node') - except Exception: - LOG.exception( - 'Unexpected exception encountered while registering ' - 'new datasource %s.', req['name']) - if update_db: - datasources_db.delete_datasource(req['id']) - msg = ("Datasource service: %s creation fails." % req['name']) - raise exception.DatasourceCreationError(message=msg) - - new_item = dict(item) - new_item['id'] = new_id - return self.make_datasource_dict(new_item) - def validate_create_datasource(self, req): name = req['name'] if not datalog_compile.string_is_servicename(name): @@ -852,29 +701,6 @@ class DseNode(object): raise exception.DataServiceError(msg % class_path) return service - # Note(thread-safety): blocking function - def delete_datasource(self, datasource, update_db=True): - LOG.debug("Deleting %s datasource ", datasource['name']) - datasource_id = datasource['id'] - if update_db: - # Note(thread-safety): blocking call - result = datasources_db.delete_datasource_with_data(datasource_id) - if not result: - raise exception.DatasourceNotFound(id=datasource_id) - - # Note(thread-safety): blocking call - try: - self.synchronize_datasources() - # If local PE exists.. sync - engine = self.service_object(api_base.ENGINE_SERVICE_ID) - if engine: - engine.synchronizer.sync_one_policy(datasource['name']) - except Exception: - msg = ('failed to synchronize_datasource after ' - 'deleting datasource: %s' % datasource_id) - LOG.exception(msg) - raise exception.DataServiceError(msg) - class DseNodeEndpoints (object): """Collection of RPC endpoints that the DseNode exposes on the bus. @@ -907,25 +733,3 @@ class DseNodeEndpoints (object): self.node.service_object(s).receive_data_sequenced( publisher=publisher, table=table, data=data, seqnum=seqnum, is_snapshot=is_snapshot) - - -DS_MANAGER_SERVICE_ID = '_ds_manager' - - -class DSManagerService(data_service.DataService): - """A proxy service to datasource managing methods in dse_node.""" - def __init__(self, service_id): - super(DSManagerService, self).__init__(DS_MANAGER_SERVICE_ID) - self.add_rpc_endpoint(DSManagerEndpoints(self)) - - -class DSManagerEndpoints(object): - - def __init__(self, service): - self.service = service - - def add_datasource(self, context, items): - return self.service.node.add_datasource(items) - - def delete_datasource(self, context, datasource): - return self.service.node.delete_datasource(datasource) diff --git a/congress/harness.py b/congress/harness.py index 30bb56274..58ab00166 100644 --- a/congress/harness.py +++ b/congress/harness.py @@ -35,6 +35,7 @@ from congress.api import status_model from congress.api.system import driver_model from congress.api import table_model from congress.db import datasources as db_datasources +from congress.dse2 import datasource_manager as ds_manager from congress.dse2 import dse_node from congress import exception from congress.policy_engines import agnostic @@ -75,9 +76,9 @@ def create2(node_id=None, bus_id=None, existing_node=None, LOG.info("Registering congress datasource services on node %s", node.node_id) services['datasources'] = create_datasources(node) - node.start_periodic_tasks() - node.register_service( - dse_node.DSManagerService(dse_node.DS_MANAGER_SERVICE_ID)) + services['ds_manager'] = ds_manager.DSManagerService( + api_base.DS_MANAGER_SERVICE_ID) + node.register_service(services['ds_manager']) if policy_engine: LOG.info("Registering congress PolicyEngine service on node %s", diff --git a/congress/synchronizer/datasource_synchronizer.py b/congress/synchronizer/datasource_synchronizer.py new file mode 100644 index 000000000..9d3973251 --- /dev/null +++ b/congress/synchronizer/datasource_synchronizer.py @@ -0,0 +1,137 @@ +# Copyright (c) 2016 NEC Corp. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import eventlet +from futurist import periodics +from oslo_concurrency import lockutils +from oslo_config import cfg +from oslo_log import log as logging + +from congress.db import datasources + +LOG = logging.getLogger(__name__) + +SYNCHRONIZER_SERVICE_ID = '_datasource_synchronizer' + + +class DatasourceSynchronizer(object): + + def __init__(self, node): + self.name = SYNCHRONIZER_SERVICE_ID + self.sync_thread = None + self.periodic_tasks = None + self.node = node + + def start(self): + callables = [(self.synchronize_all_datasources, None, {}), + (self._check_resub_all, None, {})] + self.periodic_tasks = periodics.PeriodicWorker(callables) + self.sync_thread = eventlet.spawn_n(self.periodic_tasks.start) + LOG.info("started datasource synchronizer on node %s", + self.node.node_id) + + def stop(self): + if self.periodic_tasks: + self.periodic_tasks.stop() + self.periodic_tasks.wait() + self.periodic_tasks = None + if self.sync_thread: + eventlet.greenthread.kill(self.sync_thread) + self.sync_thread = None + + @periodics.periodic(spacing=cfg.CONF.dse.time_to_resub) + def _check_resub_all(self): + LOG.info("Running periodic resub on node %s", self.node.node_id) + for s in self.node.get_services(True): + s.check_resub_all() + + @lockutils.synchronized('congress_synchronize_datasources') + def sync_datasource(self, ds_name): + if not cfg.CONF.datasources: + LOG.info("sync not supported on non-datasource node") + return + datasource = datasources.get_datasource_by_name(ds_name) + service_obj = self.node.service_object(ds_name) + + if datasource and not service_obj: + # register service with data node + service = self.node.create_datasource_service(datasource) + self.node.register_service(service) + LOG.info("service %s registered by synchronizer", ds_name) + return + if service_obj and datasource is None: + # unregister, datasource not present in DB + self.node.unregister_service(ds_name) + LOG.info("service %s unregistered by synchronizer", ds_name) + return + + @lockutils.synchronized('congress_synchronize_datasources') + @periodics.periodic(spacing=cfg.CONF.datasource_sync_period) + def synchronize_all_datasources(self): + LOG.info("synchronizing datasources on node %s", self.node.node_id) + added = 0 + removed = 0 + datasources = self.node.get_datasources(filter_secret=False) + db_datasources = [] + # Look for datasources in the db, but not in the services. + for configured_ds in datasources: + db_datasources.append(configured_ds['id']) + active_ds = self.node.service_object(uuid_=configured_ds['id']) + # If datasource is not enabled, unregister the service + if not configured_ds['enabled']: + if active_ds: + LOG.info("unregistering %s service, datasource disabled " + "in DB.", active_ds.service_id) + self.node.unregister_service(active_ds.service_id) + removed = removed + 1 + continue + if active_ds is None: + # service is not up, create the service + LOG.info("registering %s service on node %s", + configured_ds['name'], self.node.node_id) + service = self.node.create_datasource_service(configured_ds) + self.node.register_service(service) + added = added + 1 + + # Unregister the services which are not in DB + active_ds_services = [s for s in self.node.get_services(True) + if getattr(s, 'type', '') == 'datasource_driver'] + db_datasources_set = set(db_datasources) + stale_services = [s for s in active_ds_services + if s.ds_id not in db_datasources_set] + for s in stale_services: + LOG.info("unregistering %s service, datasource not found in DB ", + s.service_id) + self.node.unregister_service(uuid_=s.ds_id) + removed = removed + 1 + + LOG.info("synchronized datasources, added %d removed %d on node %s", + added, removed, self.node.node_id) + + # This might be required once we support update datasource config + # if not self._config_eq(configured_ds, active_ds): + # LOG.debug('configured and active disagree: %s %s', + # strutils.mask_password(active_ds), + # strutils.mask_password(configured_ds)) + + # LOG.info('Reloading datasource: %s', + # strutils.mask_password(configured_ds)) + # self.delete_datasource(configured_ds['name'], + # update_db=False) + # self.add_datasource(configured_ds, update_db=False) + + # def _config_eq(self, db_config, active_config): + # return (db_config['name'] == active_config.service_id and + # db_config['config'] == active_config.service_info['args']) diff --git a/congress/tests/api/base.py b/congress/tests/api/base.py index f6a308007..698a8328e 100644 --- a/congress/tests/api/base.py +++ b/congress/tests/api/base.py @@ -65,6 +65,8 @@ def setup_config(with_fake_datasource=True, node_id='testnode', engine_service = services[api_base.ENGINE_SERVICE_ID] if api: api_service = services['api'] + if datasources: + ds_manager = services['ds_manager'] return {'node': node, 'engine': engine_service, 'data': data, - 'api': api_service} + 'api': api_service, 'ds_manager': ds_manager} diff --git a/congress/tests/api/test_datasource_model.py b/congress/tests/api/test_datasource_model.py index 76d4a7946..4fa15884a 100644 --- a/congress/tests/api/test_datasource_model.py +++ b/congress/tests/api/test_datasource_model.py @@ -37,8 +37,9 @@ class TestDatasourceModel(base.SqlTestCase): self.data = services['data'] self.node = services['node'] self.engine = services['engine'] + self.ds_manager = services['ds_manager'] self.datasource = self._get_datasource_request() - self.node.add_datasource(self.datasource) + self.ds_manager.add_datasource(self.datasource) def tearDown(self): super(TestDatasourceModel, self).tearDown() @@ -61,7 +62,7 @@ class TestDatasourceModel(base.SqlTestCase): self.assertEqual(1, len(dinfo)) datasource2 = self._get_datasource_request() datasource2['name'] = 'datasource2' - self.node.add_datasource(datasource2) + self.ds_manager.add_datasource(datasource2) dinfo = self.datasource_model.get_items(None)['results'] self.assertEqual(2, len(dinfo)) del dinfo[0]['id'] diff --git a/congress/tests/api/test_driver_model.py b/congress/tests/api/test_driver_model.py index 66acf4f85..d20360577 100644 --- a/congress/tests/api/test_driver_model.py +++ b/congress/tests/api/test_driver_model.py @@ -27,8 +27,9 @@ class TestDriverModel(base.SqlTestCase): super(TestDriverModel, self).setUp() services = api_base.setup_config() self.node = services['node'] + self.ds_manager = services['ds_manager'] - self.node.add_datasource(self._get_datasource_request()) + self.ds_manager.add_datasource(self._get_datasource_request()) self.driver_model = services['api']['api-system'] def _get_datasource_request(self): diff --git a/congress/tests/dse2/test_datasource.py b/congress/tests/dse2/test_datasource.py index 3581ecb96..6da7d55a8 100644 --- a/congress/tests/dse2/test_datasource.py +++ b/congress/tests/dse2/test_datasource.py @@ -33,8 +33,10 @@ class TestDataSource(base.SqlTestCase): def setUp(self): super(TestDataSource, self).setUp() - self.dseNode = api_base.setup_config(with_fake_datasource=False, - api=False, policy=False)['node'] + config = api_base.setup_config(with_fake_datasource=False, api=False, + policy=False) + self.dseNode = config['node'] + self.ds_manager = config['ds_manager'] def _get_datasource_request(self): # leave ID out--generated during creation @@ -50,7 +52,7 @@ class TestDataSource(base.SqlTestCase): def test_add_datasource(self): req = self._get_datasource_request() - result = self.dseNode.add_datasource(req) + result = self.ds_manager.add_datasource(req) # test equality of return value except for 'id' field del(result['id']) self.assertEqual(req, result) @@ -70,7 +72,7 @@ class TestDataSource(base.SqlTestCase): req = self._get_datasource_request() self.assertRaises(congressException.DatasourceCreationError, - self.dseNode.add_datasource, req) + self.ds_manager.add_datasource, req) @mock.patch.object(dse_node.DseNode, 'register_service') def test_add_datasource_synchronizer_error(self, register_ds): @@ -78,13 +80,13 @@ class TestDataSource(base.SqlTestCase): req = self._get_datasource_request() self.assertRaises(congressException.DatasourceCreationError, - self.dseNode.add_datasource, req) + self.ds_manager.add_datasource, req) ds = datasource_db.get_datasource_by_name(req['name']) self.assertIsNone(ds) def test_get_datasource(self): req = self._get_datasource_request() - ds = self.dseNode.add_datasource(req) + ds = self.ds_manager.add_datasource(req) result = self.dseNode.get_datasource(ds['id']) # test equality except for 'id' field del(result['id']) @@ -92,7 +94,7 @@ class TestDataSource(base.SqlTestCase): def test_get_datasources(self): req = self._get_datasource_request() - self.dseNode.add_datasource(req) + self.ds_manager.add_datasource(req) result = self.dseNode.get_datasources() self.assertEqual(len(result), 1) result = result[0] @@ -103,10 +105,10 @@ class TestDataSource(base.SqlTestCase): def test_get_datasources2(self): req1 = self._get_datasource_request() req1['name'] = 'datasource1' - result1 = self.dseNode.add_datasource(req1) + result1 = self.ds_manager.add_datasource(req1) req2 = self._get_datasource_request() req2['name'] = 'datasource2' - result2 = self.dseNode.add_datasource(req2) + result2 = self.ds_manager.add_datasource(req2) # check results of add_datasource for key, value in req1.items(): self.assertEqual(value, result1[key]) @@ -132,7 +134,7 @@ class TestDataSource(base.SqlTestCase): def test_get_datasources_hide_secret(self): req = self._get_datasource_request() - self.dseNode.add_datasource(req) + self.ds_manager.add_datasource(req) result = self.dseNode.get_datasources(filter_secret=True) result = result[0] # check equality except that 'config'/'password' is hidden @@ -142,14 +144,14 @@ class TestDataSource(base.SqlTestCase): def test_create_datasource_duplicate_name(self): req = self._get_datasource_request() - self.dseNode.add_datasource(req) + self.ds_manager.add_datasource(req) self.assertRaises(congressException.DatasourceNameInUse, - self.dseNode.add_datasource, req) + self.ds_manager.add_datasource, req) def test_delete_datasource(self): req = self._get_datasource_request() - result = self.dseNode.add_datasource(req) - self.dseNode.delete_datasource(result) + result = self.ds_manager.add_datasource(req) + self.ds_manager.delete_datasource(result) # check that service is actually deleted services = self.dseNode.get_services() self.assertEqual(len(services), 0) @@ -183,7 +185,7 @@ class TestDataSource(base.SqlTestCase): req = self._get_datasource_request() req['id'] = 'fake-id' self.assertRaises(congressException.DatasourceNotFound, - self.dseNode.delete_datasource, req) + self.ds_manager.delete_datasource, req) # TODO(dse2): Doesn't seem like we need this (or it will be moved to API). # def test_get_driver_schema(self): diff --git a/congress/tests/dse2/test_dse2.py b/congress/tests/dse2/test_dse2.py index 02b9586e5..f3f16a0cd 100644 --- a/congress/tests/dse2/test_dse2.py +++ b/congress/tests/dse2/test_dse2.py @@ -30,6 +30,7 @@ from congress.datalog import compile from congress.datasources import nova_driver from congress import exception as congressException from congress.policy_engines import agnostic +from congress.tests.api import base as test_api_base from congress.tests import base from congress.tests import fake_datasource from congress.tests import helper @@ -245,12 +246,14 @@ class TestDSE(base.TestCase): node.stop() def test_auto_resub(self): - node = helper.make_dsenode_new_partition('testnode') + config = test_api_base.setup_config(with_fake_datasource=False, + api=False, policy=False) + node = config['node'] + config['ds_manager'].synchronizer.start() sub = fake_datasource.FakeDataSource('sub') pub = fake_datasource.FakeDataSource('pub') node.register_service(sub) node.register_service(pub) - node.start_periodic_tasks() sub.subscribe('pub', 'p') helper.retry_check_function_return_value( diff --git a/congress/tests/dse2/test_dse_node.py b/congress/tests/dse2/test_dse_node.py index 7835da6d1..e14960ccb 100644 --- a/congress/tests/dse2/test_dse_node.py +++ b/congress/tests/dse2/test_dse_node.py @@ -20,7 +20,7 @@ from oslo_config import cfg from oslo_messaging import conffixture from congress.dse2 import data_service -from congress.dse2 import dse_node +from congress.dse2 import datasource_manager as ds_manager from congress.tests import base from congress.tests import helper @@ -302,31 +302,31 @@ class TestDSManagerService(base.TestCase): super(TestDSManagerService, self).setUp() def test_ds_manager_endpoints_add_ds(self): - ds_manager_service = dse_node.DSManagerService('test_mgr') + ds_manager_service = ds_manager.DSManagerService('test_mgr') node_mock = mock.MagicMock() - node_mock.add_datasource = mock.MagicMock() - node_mock.add_datasource.return_value = 'add_datasource' + ds_manager_service.add_datasource = mock.MagicMock() + ds_manager_service.add_datasource.return_value = 'add_datasource' ds_manager_service.node = node_mock - endpoints = dse_node.DSManagerEndpoints(ds_manager_service) + endpoints = ds_manager.DSManagerEndpoints(ds_manager_service) expect_ret = 'add_datasource' self.assertEqual(expect_ret, endpoints.add_datasource('context', {})) - node_mock.add_datasource.assert_called_with({}) + ds_manager_service.add_datasource.assert_called_with({}) def test_ds_manager_endpoints_delete_ds(self): - ds_manager_service = dse_node.DSManagerService('test_mgr') + ds_manager_service = ds_manager.DSManagerService('test_mgr') node_mock = mock.MagicMock() - node_mock.delete_datasource = mock.MagicMock() - node_mock.delete_datasource.return_value = 'delete_datasource' + ds_manager_service.delete_datasource = mock.MagicMock() + ds_manager_service.delete_datasource.return_value = 'delete_datasource' ds_manager_service.node = node_mock - endpoints = dse_node.DSManagerEndpoints(ds_manager_service) + endpoints = ds_manager.DSManagerEndpoints(ds_manager_service) expect_ret = 'delete_datasource' self.assertEqual(expect_ret, endpoints.delete_datasource('context', 'ds-id')) - node_mock.delete_datasource.assert_called_with('ds-id') + ds_manager_service.delete_datasource.assert_called_with('ds-id') # Leave this to make manual testing with RabbitMQ easy