Refactor datasource synchronizer
This commit adds the following : 1. Move datasource_synchronizer to seperate class 2. Moved add_datasource and delete_datasource to DSManagerService class TODO(ramineni): move other datasource management related operations to DSManagerService class 3. Add support to sync single datasource 4. synchronizer to be started as part of DSManagerService to avoid running synchronizer on non-datasource node. Partially-Implements blueprint refactor-synchronizer Change-Id: I643a0c299695a794469553cc5fb73e8d1bceec7d
This commit is contained in:
parent
dc134c4bb1
commit
1d3f4bfb50
@ -20,6 +20,7 @@ from __future__ import absolute_import
|
||||
from oslo_config import cfg
|
||||
|
||||
ENGINE_SERVICE_ID = '__engine'
|
||||
DS_MANAGER_SERVICE_ID = '_ds_manager'
|
||||
|
||||
|
||||
class APIModel(object):
|
||||
|
@ -25,7 +25,6 @@ from congress.api import api_utils
|
||||
from congress.api import base
|
||||
from congress.api import error_codes
|
||||
from congress.api import webservice
|
||||
from congress.dse2 import dse_node
|
||||
from congress import exception
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -86,7 +85,7 @@ class DatasourceModel(base.APIModel):
|
||||
obj = None
|
||||
try:
|
||||
# Note(thread-safety): blocking call
|
||||
obj = self.invoke_rpc(dse_node.DS_MANAGER_SERVICE_ID,
|
||||
obj = self.invoke_rpc(base.DS_MANAGER_SERVICE_ID,
|
||||
'add_datasource',
|
||||
{'items': item},
|
||||
timeout=self.dse_long_timeout)
|
||||
@ -114,7 +113,7 @@ class DatasourceModel(base.APIModel):
|
||||
# delete a different datasource
|
||||
# Fix: check UUID of datasource before operating.
|
||||
# Abort if mismatch
|
||||
self.invoke_rpc(dse_node.DS_MANAGER_SERVICE_ID,
|
||||
self.invoke_rpc(base.DS_MANAGER_SERVICE_ID,
|
||||
'delete_datasource',
|
||||
{'datasource': datasource},
|
||||
timeout=self.dse_long_timeout)
|
||||
|
@ -25,6 +25,7 @@ from oslo_log import log as logging
|
||||
from oslo_middleware import cors
|
||||
from oslo_policy import opts as policy_opts
|
||||
|
||||
from congress.dse2 import dse_node
|
||||
from congress import version
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -82,6 +83,9 @@ core_opts = [
|
||||
# Register the configuration options
|
||||
cfg.CONF.register_opts(core_opts)
|
||||
|
||||
# Register dse opts
|
||||
cfg.CONF.register_opts(dse_node.dse_opts, group='dse')
|
||||
|
||||
policy_opts.set_defaults(cfg.CONF, 'policy.json')
|
||||
logging.register_options(cfg.CONF)
|
||||
|
||||
|
134
congress/dse2/datasource_manager.py
Normal file
134
congress/dse2/datasource_manager.py
Normal file
@ -0,0 +1,134 @@
|
||||
# Copyright (c) 2016 . All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
from oslo_db import exception as db_exc
|
||||
from oslo_log import log as logging
|
||||
|
||||
from congress.api import base as api_base
|
||||
from congress.db import datasources as datasources_db
|
||||
from congress.dse2 import data_service
|
||||
from congress import exception
|
||||
from congress.synchronizer import datasource_synchronizer
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DSManagerService(data_service.DataService):
|
||||
"""A proxy service to datasource managing methods in dse_node."""
|
||||
def __init__(self, service_id):
|
||||
super(DSManagerService, self).__init__(service_id)
|
||||
self.synchronizer = None
|
||||
self.add_rpc_endpoint(DSManagerEndpoints(self))
|
||||
|
||||
def register_synchronizer(self):
|
||||
self.synchronizer = datasource_synchronizer.DatasourceSynchronizer(
|
||||
self.node)
|
||||
self.synchronizer.start()
|
||||
|
||||
def start(self):
|
||||
super(DSManagerService, self).start()
|
||||
self.register_synchronizer()
|
||||
|
||||
def stop(self):
|
||||
if self.synchronizer:
|
||||
self.synchronizer.stop()
|
||||
super(DSManagerService, self).stop()
|
||||
|
||||
# Note(thread-safety): blocking function
|
||||
def add_datasource(self, item, deleted=False, update_db=True):
|
||||
req = self.make_datasource_dict(item)
|
||||
|
||||
# check the request has valid information
|
||||
self.node.validate_create_datasource(req)
|
||||
if (len(req['name']) == 0 or req['name'][0] == '_'):
|
||||
raise exception.InvalidDatasourceName(value=req['name'])
|
||||
|
||||
new_id = req['id']
|
||||
LOG.debug("adding datasource %s", req['name'])
|
||||
if update_db:
|
||||
LOG.debug("updating db")
|
||||
try:
|
||||
# Note(thread-safety): blocking call
|
||||
datasource = datasources_db.add_datasource(
|
||||
id_=req['id'],
|
||||
name=req['name'],
|
||||
driver=req['driver'],
|
||||
config=req['config'],
|
||||
description=req['description'],
|
||||
enabled=req['enabled'])
|
||||
except db_exc.DBDuplicateEntry:
|
||||
raise exception.DatasourceNameInUse(value=req['name'])
|
||||
except db_exc.DBError:
|
||||
LOG.exception('Creating a new datasource failed.')
|
||||
raise exception.DatasourceCreationError(value=req['name'])
|
||||
|
||||
new_id = datasource['id']
|
||||
try:
|
||||
self.synchronizer.sync_datasource(req['name'])
|
||||
# immediate synch policies on local PE if present
|
||||
# otherwise wait for regularly scheduled synch
|
||||
engine = self.node.service_object(api_base.ENGINE_SERVICE_ID)
|
||||
if engine is not None:
|
||||
engine.synchronizer.sync_one_policy(req['name'])
|
||||
# TODO(dse2): also broadcast to all PE nodes to synch
|
||||
except exception.DataServiceError:
|
||||
LOG.exception('the datasource service is already '
|
||||
'created in the node')
|
||||
except Exception:
|
||||
LOG.exception(
|
||||
'Unexpected exception encountered while registering '
|
||||
'new datasource %s.', req['name'])
|
||||
if update_db:
|
||||
datasources_db.delete_datasource(req['id'])
|
||||
msg = ("Datasource service: %s creation fails." % req['name'])
|
||||
raise exception.DatasourceCreationError(message=msg)
|
||||
|
||||
new_item = dict(item)
|
||||
new_item['id'] = new_id
|
||||
return self.node.make_datasource_dict(new_item)
|
||||
|
||||
# Note(thread-safety): blocking function
|
||||
def delete_datasource(self, datasource, update_db=True):
|
||||
LOG.debug("Deleting %s datasource ", datasource['name'])
|
||||
datasource_id = datasource['id']
|
||||
if update_db:
|
||||
# Note(thread-safety): blocking call
|
||||
result = datasources_db.delete_datasource_with_data(datasource_id)
|
||||
if not result:
|
||||
raise exception.DatasourceNotFound(id=datasource_id)
|
||||
|
||||
# Note(thread-safety): blocking call
|
||||
try:
|
||||
self.synchronizer.sync_datasource(datasource['name'])
|
||||
# If local PE exists.. sync
|
||||
engine = self.node.service_object(api_base.ENGINE_SERVICE_ID)
|
||||
if engine:
|
||||
engine.synchronizer.sync_one_policy(datasource['name'])
|
||||
except Exception:
|
||||
msg = ('failed to synchronize_datasource after '
|
||||
'deleting datasource: %s' % datasource_id)
|
||||
LOG.exception(msg)
|
||||
raise exception.DataServiceError(msg)
|
||||
|
||||
|
||||
class DSManagerEndpoints(object):
|
||||
|
||||
def __init__(self, service):
|
||||
self.ds_manager = service
|
||||
|
||||
def add_datasource(self, context, items):
|
||||
return self.ds_manager.add_datasource(items)
|
||||
|
||||
def delete_datasource(self, context, datasource):
|
||||
return self.ds_manager.delete_datasource(datasource)
|
@ -20,10 +20,7 @@ import six
|
||||
import eventlet
|
||||
eventlet.monkey_patch() # for using oslo.messaging w/ eventlet executor
|
||||
|
||||
from futurist import periodics
|
||||
from oslo_concurrency import lockutils
|
||||
from oslo_config import cfg
|
||||
from oslo_db import exception as db_exc
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging as messaging
|
||||
from oslo_messaging import exceptions as messaging_exceptions
|
||||
@ -32,19 +29,17 @@ from oslo_utils import importutils
|
||||
from oslo_utils import strutils
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from congress.api import base as api_base
|
||||
from congress.datalog import compile as datalog_compile
|
||||
from congress.datasources import constants
|
||||
from congress.db import datasources as datasources_db
|
||||
from congress.dse2 import control_bus
|
||||
from congress.dse2 import data_service
|
||||
from congress import exception
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
_dse_opts = [
|
||||
dse_opts = [
|
||||
cfg.StrOpt('bus_id', default='bus',
|
||||
help='Unique ID of this DSE bus'),
|
||||
cfg.IntOpt('ping_timeout', default=5,
|
||||
@ -63,7 +58,7 @@ _dse_opts = [
|
||||
help='The number of seconds to retry execute action before '
|
||||
'giving up. Zero or negative value means never give up.'),
|
||||
]
|
||||
cfg.CONF.register_opts(_dse_opts, group='dse')
|
||||
# cfg.CONF.register_opts(_dse_opts, group='dse')
|
||||
|
||||
|
||||
class DseNode(object):
|
||||
@ -261,7 +256,6 @@ class DseNode(object):
|
||||
return
|
||||
|
||||
LOG.info("Stopping DSE node '%s'", self.node_id)
|
||||
self.stop_datasource_synchronizer()
|
||||
for s in self._services:
|
||||
s.stop()
|
||||
self._rpc_server.stop()
|
||||
@ -592,98 +586,6 @@ class DseNode(object):
|
||||
results.append(result)
|
||||
return results
|
||||
|
||||
# TODO(ekcs): should we start and stop these along with self.{start, stop}?
|
||||
def start_periodic_tasks(self):
|
||||
callables = [(self.synchronize, None, {}),
|
||||
(self._check_resub_all, None, {})]
|
||||
self.periodic_tasks = periodics.PeriodicWorker(callables)
|
||||
if self._running:
|
||||
self.sync_thread = eventlet.spawn_n(self.periodic_tasks.start)
|
||||
|
||||
def stop_datasource_synchronizer(self):
|
||||
if self.periodic_tasks:
|
||||
self.periodic_tasks.stop()
|
||||
self.periodic_tasks.wait()
|
||||
self.periodic_tasks = None
|
||||
if self.sync_thread:
|
||||
eventlet.greenthread.kill(self.sync_thread)
|
||||
self.sync_thread = None
|
||||
|
||||
@periodics.periodic(spacing=cfg.CONF.datasource_sync_period)
|
||||
def synchronize(self):
|
||||
try:
|
||||
self.synchronize_datasources()
|
||||
except Exception:
|
||||
LOG.exception("synchronize_datasources failed")
|
||||
|
||||
@lockutils.synchronized('congress_synchronize_datasources')
|
||||
def synchronize_datasources(self):
|
||||
if not cfg.CONF.datasources:
|
||||
LOG.debug("Skipping datasource synchronization on a "
|
||||
"non-datasources node")
|
||||
return
|
||||
LOG.info("Synchronizing running datasources")
|
||||
added = 0
|
||||
removed = 0
|
||||
datasources = self.get_datasources(filter_secret=False)
|
||||
db_datasources = []
|
||||
# Look for datasources in the db, but not in the services.
|
||||
for configured_ds in datasources:
|
||||
db_datasources.append(configured_ds['id'])
|
||||
active_ds = self.service_object(uuid_=configured_ds['id'])
|
||||
# If datasource is not enabled, unregister the service
|
||||
if not configured_ds['enabled']:
|
||||
if active_ds:
|
||||
LOG.debug("unregistering %s service, datasource disabled "
|
||||
"in DB.", active_ds.service_id)
|
||||
self.unregister_service(active_ds.service_id)
|
||||
removed = removed + 1
|
||||
continue
|
||||
if active_ds is None:
|
||||
# service is not up, create the service
|
||||
LOG.info("registering %s service on node %s",
|
||||
configured_ds['name'], self.node_id)
|
||||
service = self.create_datasource_service(configured_ds)
|
||||
self.register_service(service)
|
||||
added = added + 1
|
||||
|
||||
# Unregister the services which are not in DB
|
||||
active_ds_services = [s for s in self._services
|
||||
if getattr(s, 'type', '') == 'datasource_driver']
|
||||
db_datasources_set = set(db_datasources)
|
||||
stale_services = [s for s in active_ds_services
|
||||
if s.ds_id not in db_datasources_set]
|
||||
for s in stale_services:
|
||||
LOG.debug("unregistering %s service, datasource not found in DB ",
|
||||
s.service_id)
|
||||
self.unregister_service(uuid_=s.ds_id)
|
||||
removed = removed + 1
|
||||
|
||||
LOG.info("synchronize_datasources, added %d removed %d on node %s",
|
||||
added, removed, self.node_id)
|
||||
|
||||
# This might be required once we support update datasource config
|
||||
|
||||
# if not self._config_eq(configured_ds, active_ds):
|
||||
# LOG.debug('configured and active disagree: %s %s',
|
||||
# strutils.mask_password(active_ds),
|
||||
# strutils.mask_password(configured_ds))
|
||||
|
||||
# LOG.info('Reloading datasource: %s',
|
||||
# strutils.mask_password(configured_ds))
|
||||
# self.delete_datasource(configured_ds['name'],
|
||||
# update_db=False)
|
||||
# self.add_datasource(configured_ds, update_db=False)
|
||||
|
||||
# def _config_eq(self, db_config, active_config):
|
||||
# return (db_config['name'] == active_config.service_id and
|
||||
# db_config['config'] == active_config.service_info['args'])
|
||||
|
||||
@periodics.periodic(spacing=cfg.CONF.dse.time_to_resub)
|
||||
def _check_resub_all(self):
|
||||
for s in self._services:
|
||||
s.check_resub_all()
|
||||
|
||||
def delete_missing_driver_datasources(self):
|
||||
removed = 0
|
||||
for datasource in datasources_db.get_datasources():
|
||||
@ -721,59 +623,6 @@ class DseNode(object):
|
||||
if key in fields))
|
||||
return resource
|
||||
|
||||
# Note(thread-safety): blocking function
|
||||
def add_datasource(self, item, deleted=False, update_db=True):
|
||||
req = self.make_datasource_dict(item)
|
||||
|
||||
# check the request has valid information
|
||||
self.validate_create_datasource(req)
|
||||
if (len(req['name']) == 0 or req['name'][0] == '_'):
|
||||
raise exception.InvalidDatasourceName(value=req['name'])
|
||||
|
||||
new_id = req['id']
|
||||
LOG.debug("adding datasource %s", req['name'])
|
||||
if update_db:
|
||||
LOG.debug("updating db")
|
||||
try:
|
||||
# Note(thread-safety): blocking call
|
||||
datasource = datasources_db.add_datasource(
|
||||
id_=req['id'],
|
||||
name=req['name'],
|
||||
driver=req['driver'],
|
||||
config=req['config'],
|
||||
description=req['description'],
|
||||
enabled=req['enabled'])
|
||||
except db_exc.DBDuplicateEntry:
|
||||
raise exception.DatasourceNameInUse(value=req['name'])
|
||||
except db_exc.DBError:
|
||||
LOG.exception('Creating a new datasource failed.')
|
||||
raise exception.DatasourceCreationError(value=req['name'])
|
||||
|
||||
new_id = datasource['id']
|
||||
try:
|
||||
self.synchronize_datasources()
|
||||
# immediate synch policies on local PE if present
|
||||
# otherwise wait for regularly scheduled synch
|
||||
engine = self.service_object(api_base.ENGINE_SERVICE_ID)
|
||||
if engine is not None:
|
||||
engine.synchronizer.sync_one_policy(req['name'])
|
||||
# TODO(dse2): also broadcast to all PE nodes to synch
|
||||
except exception.DataServiceError:
|
||||
LOG.exception('the datasource service is already '
|
||||
'created in the node')
|
||||
except Exception:
|
||||
LOG.exception(
|
||||
'Unexpected exception encountered while registering '
|
||||
'new datasource %s.', req['name'])
|
||||
if update_db:
|
||||
datasources_db.delete_datasource(req['id'])
|
||||
msg = ("Datasource service: %s creation fails." % req['name'])
|
||||
raise exception.DatasourceCreationError(message=msg)
|
||||
|
||||
new_item = dict(item)
|
||||
new_item['id'] = new_id
|
||||
return self.make_datasource_dict(new_item)
|
||||
|
||||
def validate_create_datasource(self, req):
|
||||
name = req['name']
|
||||
if not datalog_compile.string_is_servicename(name):
|
||||
@ -852,29 +701,6 @@ class DseNode(object):
|
||||
raise exception.DataServiceError(msg % class_path)
|
||||
return service
|
||||
|
||||
# Note(thread-safety): blocking function
|
||||
def delete_datasource(self, datasource, update_db=True):
|
||||
LOG.debug("Deleting %s datasource ", datasource['name'])
|
||||
datasource_id = datasource['id']
|
||||
if update_db:
|
||||
# Note(thread-safety): blocking call
|
||||
result = datasources_db.delete_datasource_with_data(datasource_id)
|
||||
if not result:
|
||||
raise exception.DatasourceNotFound(id=datasource_id)
|
||||
|
||||
# Note(thread-safety): blocking call
|
||||
try:
|
||||
self.synchronize_datasources()
|
||||
# If local PE exists.. sync
|
||||
engine = self.service_object(api_base.ENGINE_SERVICE_ID)
|
||||
if engine:
|
||||
engine.synchronizer.sync_one_policy(datasource['name'])
|
||||
except Exception:
|
||||
msg = ('failed to synchronize_datasource after '
|
||||
'deleting datasource: %s' % datasource_id)
|
||||
LOG.exception(msg)
|
||||
raise exception.DataServiceError(msg)
|
||||
|
||||
|
||||
class DseNodeEndpoints (object):
|
||||
"""Collection of RPC endpoints that the DseNode exposes on the bus.
|
||||
@ -907,25 +733,3 @@ class DseNodeEndpoints (object):
|
||||
self.node.service_object(s).receive_data_sequenced(
|
||||
publisher=publisher, table=table, data=data, seqnum=seqnum,
|
||||
is_snapshot=is_snapshot)
|
||||
|
||||
|
||||
DS_MANAGER_SERVICE_ID = '_ds_manager'
|
||||
|
||||
|
||||
class DSManagerService(data_service.DataService):
|
||||
"""A proxy service to datasource managing methods in dse_node."""
|
||||
def __init__(self, service_id):
|
||||
super(DSManagerService, self).__init__(DS_MANAGER_SERVICE_ID)
|
||||
self.add_rpc_endpoint(DSManagerEndpoints(self))
|
||||
|
||||
|
||||
class DSManagerEndpoints(object):
|
||||
|
||||
def __init__(self, service):
|
||||
self.service = service
|
||||
|
||||
def add_datasource(self, context, items):
|
||||
return self.service.node.add_datasource(items)
|
||||
|
||||
def delete_datasource(self, context, datasource):
|
||||
return self.service.node.delete_datasource(datasource)
|
||||
|
@ -35,6 +35,7 @@ from congress.api import status_model
|
||||
from congress.api.system import driver_model
|
||||
from congress.api import table_model
|
||||
from congress.db import datasources as db_datasources
|
||||
from congress.dse2 import datasource_manager as ds_manager
|
||||
from congress.dse2 import dse_node
|
||||
from congress import exception
|
||||
from congress.policy_engines import agnostic
|
||||
@ -75,9 +76,9 @@ def create2(node_id=None, bus_id=None, existing_node=None,
|
||||
LOG.info("Registering congress datasource services on node %s",
|
||||
node.node_id)
|
||||
services['datasources'] = create_datasources(node)
|
||||
node.start_periodic_tasks()
|
||||
node.register_service(
|
||||
dse_node.DSManagerService(dse_node.DS_MANAGER_SERVICE_ID))
|
||||
services['ds_manager'] = ds_manager.DSManagerService(
|
||||
api_base.DS_MANAGER_SERVICE_ID)
|
||||
node.register_service(services['ds_manager'])
|
||||
|
||||
if policy_engine:
|
||||
LOG.info("Registering congress PolicyEngine service on node %s",
|
||||
|
137
congress/synchronizer/datasource_synchronizer.py
Normal file
137
congress/synchronizer/datasource_synchronizer.py
Normal file
@ -0,0 +1,137 @@
|
||||
# Copyright (c) 2016 NEC Corp. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
import eventlet
|
||||
from futurist import periodics
|
||||
from oslo_concurrency import lockutils
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
|
||||
from congress.db import datasources
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
SYNCHRONIZER_SERVICE_ID = '_datasource_synchronizer'
|
||||
|
||||
|
||||
class DatasourceSynchronizer(object):
|
||||
|
||||
def __init__(self, node):
|
||||
self.name = SYNCHRONIZER_SERVICE_ID
|
||||
self.sync_thread = None
|
||||
self.periodic_tasks = None
|
||||
self.node = node
|
||||
|
||||
def start(self):
|
||||
callables = [(self.synchronize_all_datasources, None, {}),
|
||||
(self._check_resub_all, None, {})]
|
||||
self.periodic_tasks = periodics.PeriodicWorker(callables)
|
||||
self.sync_thread = eventlet.spawn_n(self.periodic_tasks.start)
|
||||
LOG.info("started datasource synchronizer on node %s",
|
||||
self.node.node_id)
|
||||
|
||||
def stop(self):
|
||||
if self.periodic_tasks:
|
||||
self.periodic_tasks.stop()
|
||||
self.periodic_tasks.wait()
|
||||
self.periodic_tasks = None
|
||||
if self.sync_thread:
|
||||
eventlet.greenthread.kill(self.sync_thread)
|
||||
self.sync_thread = None
|
||||
|
||||
@periodics.periodic(spacing=cfg.CONF.dse.time_to_resub)
|
||||
def _check_resub_all(self):
|
||||
LOG.info("Running periodic resub on node %s", self.node.node_id)
|
||||
for s in self.node.get_services(True):
|
||||
s.check_resub_all()
|
||||
|
||||
@lockutils.synchronized('congress_synchronize_datasources')
|
||||
def sync_datasource(self, ds_name):
|
||||
if not cfg.CONF.datasources:
|
||||
LOG.info("sync not supported on non-datasource node")
|
||||
return
|
||||
datasource = datasources.get_datasource_by_name(ds_name)
|
||||
service_obj = self.node.service_object(ds_name)
|
||||
|
||||
if datasource and not service_obj:
|
||||
# register service with data node
|
||||
service = self.node.create_datasource_service(datasource)
|
||||
self.node.register_service(service)
|
||||
LOG.info("service %s registered by synchronizer", ds_name)
|
||||
return
|
||||
if service_obj and datasource is None:
|
||||
# unregister, datasource not present in DB
|
||||
self.node.unregister_service(ds_name)
|
||||
LOG.info("service %s unregistered by synchronizer", ds_name)
|
||||
return
|
||||
|
||||
@lockutils.synchronized('congress_synchronize_datasources')
|
||||
@periodics.periodic(spacing=cfg.CONF.datasource_sync_period)
|
||||
def synchronize_all_datasources(self):
|
||||
LOG.info("synchronizing datasources on node %s", self.node.node_id)
|
||||
added = 0
|
||||
removed = 0
|
||||
datasources = self.node.get_datasources(filter_secret=False)
|
||||
db_datasources = []
|
||||
# Look for datasources in the db, but not in the services.
|
||||
for configured_ds in datasources:
|
||||
db_datasources.append(configured_ds['id'])
|
||||
active_ds = self.node.service_object(uuid_=configured_ds['id'])
|
||||
# If datasource is not enabled, unregister the service
|
||||
if not configured_ds['enabled']:
|
||||
if active_ds:
|
||||
LOG.info("unregistering %s service, datasource disabled "
|
||||
"in DB.", active_ds.service_id)
|
||||
self.node.unregister_service(active_ds.service_id)
|
||||
removed = removed + 1
|
||||
continue
|
||||
if active_ds is None:
|
||||
# service is not up, create the service
|
||||
LOG.info("registering %s service on node %s",
|
||||
configured_ds['name'], self.node.node_id)
|
||||
service = self.node.create_datasource_service(configured_ds)
|
||||
self.node.register_service(service)
|
||||
added = added + 1
|
||||
|
||||
# Unregister the services which are not in DB
|
||||
active_ds_services = [s for s in self.node.get_services(True)
|
||||
if getattr(s, 'type', '') == 'datasource_driver']
|
||||
db_datasources_set = set(db_datasources)
|
||||
stale_services = [s for s in active_ds_services
|
||||
if s.ds_id not in db_datasources_set]
|
||||
for s in stale_services:
|
||||
LOG.info("unregistering %s service, datasource not found in DB ",
|
||||
s.service_id)
|
||||
self.node.unregister_service(uuid_=s.ds_id)
|
||||
removed = removed + 1
|
||||
|
||||
LOG.info("synchronized datasources, added %d removed %d on node %s",
|
||||
added, removed, self.node.node_id)
|
||||
|
||||
# This might be required once we support update datasource config
|
||||
# if not self._config_eq(configured_ds, active_ds):
|
||||
# LOG.debug('configured and active disagree: %s %s',
|
||||
# strutils.mask_password(active_ds),
|
||||
# strutils.mask_password(configured_ds))
|
||||
|
||||
# LOG.info('Reloading datasource: %s',
|
||||
# strutils.mask_password(configured_ds))
|
||||
# self.delete_datasource(configured_ds['name'],
|
||||
# update_db=False)
|
||||
# self.add_datasource(configured_ds, update_db=False)
|
||||
|
||||
# def _config_eq(self, db_config, active_config):
|
||||
# return (db_config['name'] == active_config.service_id and
|
||||
# db_config['config'] == active_config.service_info['args'])
|
@ -65,6 +65,8 @@ def setup_config(with_fake_datasource=True, node_id='testnode',
|
||||
engine_service = services[api_base.ENGINE_SERVICE_ID]
|
||||
if api:
|
||||
api_service = services['api']
|
||||
if datasources:
|
||||
ds_manager = services['ds_manager']
|
||||
|
||||
return {'node': node, 'engine': engine_service, 'data': data,
|
||||
'api': api_service}
|
||||
'api': api_service, 'ds_manager': ds_manager}
|
||||
|
@ -37,8 +37,9 @@ class TestDatasourceModel(base.SqlTestCase):
|
||||
self.data = services['data']
|
||||
self.node = services['node']
|
||||
self.engine = services['engine']
|
||||
self.ds_manager = services['ds_manager']
|
||||
self.datasource = self._get_datasource_request()
|
||||
self.node.add_datasource(self.datasource)
|
||||
self.ds_manager.add_datasource(self.datasource)
|
||||
|
||||
def tearDown(self):
|
||||
super(TestDatasourceModel, self).tearDown()
|
||||
@ -61,7 +62,7 @@ class TestDatasourceModel(base.SqlTestCase):
|
||||
self.assertEqual(1, len(dinfo))
|
||||
datasource2 = self._get_datasource_request()
|
||||
datasource2['name'] = 'datasource2'
|
||||
self.node.add_datasource(datasource2)
|
||||
self.ds_manager.add_datasource(datasource2)
|
||||
dinfo = self.datasource_model.get_items(None)['results']
|
||||
self.assertEqual(2, len(dinfo))
|
||||
del dinfo[0]['id']
|
||||
|
@ -27,8 +27,9 @@ class TestDriverModel(base.SqlTestCase):
|
||||
super(TestDriverModel, self).setUp()
|
||||
services = api_base.setup_config()
|
||||
self.node = services['node']
|
||||
self.ds_manager = services['ds_manager']
|
||||
|
||||
self.node.add_datasource(self._get_datasource_request())
|
||||
self.ds_manager.add_datasource(self._get_datasource_request())
|
||||
self.driver_model = services['api']['api-system']
|
||||
|
||||
def _get_datasource_request(self):
|
||||
|
@ -33,8 +33,10 @@ class TestDataSource(base.SqlTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestDataSource, self).setUp()
|
||||
self.dseNode = api_base.setup_config(with_fake_datasource=False,
|
||||
api=False, policy=False)['node']
|
||||
config = api_base.setup_config(with_fake_datasource=False, api=False,
|
||||
policy=False)
|
||||
self.dseNode = config['node']
|
||||
self.ds_manager = config['ds_manager']
|
||||
|
||||
def _get_datasource_request(self):
|
||||
# leave ID out--generated during creation
|
||||
@ -50,7 +52,7 @@ class TestDataSource(base.SqlTestCase):
|
||||
|
||||
def test_add_datasource(self):
|
||||
req = self._get_datasource_request()
|
||||
result = self.dseNode.add_datasource(req)
|
||||
result = self.ds_manager.add_datasource(req)
|
||||
# test equality of return value except for 'id' field
|
||||
del(result['id'])
|
||||
self.assertEqual(req, result)
|
||||
@ -70,7 +72,7 @@ class TestDataSource(base.SqlTestCase):
|
||||
|
||||
req = self._get_datasource_request()
|
||||
self.assertRaises(congressException.DatasourceCreationError,
|
||||
self.dseNode.add_datasource, req)
|
||||
self.ds_manager.add_datasource, req)
|
||||
|
||||
@mock.patch.object(dse_node.DseNode, 'register_service')
|
||||
def test_add_datasource_synchronizer_error(self, register_ds):
|
||||
@ -78,13 +80,13 @@ class TestDataSource(base.SqlTestCase):
|
||||
|
||||
req = self._get_datasource_request()
|
||||
self.assertRaises(congressException.DatasourceCreationError,
|
||||
self.dseNode.add_datasource, req)
|
||||
self.ds_manager.add_datasource, req)
|
||||
ds = datasource_db.get_datasource_by_name(req['name'])
|
||||
self.assertIsNone(ds)
|
||||
|
||||
def test_get_datasource(self):
|
||||
req = self._get_datasource_request()
|
||||
ds = self.dseNode.add_datasource(req)
|
||||
ds = self.ds_manager.add_datasource(req)
|
||||
result = self.dseNode.get_datasource(ds['id'])
|
||||
# test equality except for 'id' field
|
||||
del(result['id'])
|
||||
@ -92,7 +94,7 @@ class TestDataSource(base.SqlTestCase):
|
||||
|
||||
def test_get_datasources(self):
|
||||
req = self._get_datasource_request()
|
||||
self.dseNode.add_datasource(req)
|
||||
self.ds_manager.add_datasource(req)
|
||||
result = self.dseNode.get_datasources()
|
||||
self.assertEqual(len(result), 1)
|
||||
result = result[0]
|
||||
@ -103,10 +105,10 @@ class TestDataSource(base.SqlTestCase):
|
||||
def test_get_datasources2(self):
|
||||
req1 = self._get_datasource_request()
|
||||
req1['name'] = 'datasource1'
|
||||
result1 = self.dseNode.add_datasource(req1)
|
||||
result1 = self.ds_manager.add_datasource(req1)
|
||||
req2 = self._get_datasource_request()
|
||||
req2['name'] = 'datasource2'
|
||||
result2 = self.dseNode.add_datasource(req2)
|
||||
result2 = self.ds_manager.add_datasource(req2)
|
||||
# check results of add_datasource
|
||||
for key, value in req1.items():
|
||||
self.assertEqual(value, result1[key])
|
||||
@ -132,7 +134,7 @@ class TestDataSource(base.SqlTestCase):
|
||||
|
||||
def test_get_datasources_hide_secret(self):
|
||||
req = self._get_datasource_request()
|
||||
self.dseNode.add_datasource(req)
|
||||
self.ds_manager.add_datasource(req)
|
||||
result = self.dseNode.get_datasources(filter_secret=True)
|
||||
result = result[0]
|
||||
# check equality except that 'config'/'password' is hidden
|
||||
@ -142,14 +144,14 @@ class TestDataSource(base.SqlTestCase):
|
||||
|
||||
def test_create_datasource_duplicate_name(self):
|
||||
req = self._get_datasource_request()
|
||||
self.dseNode.add_datasource(req)
|
||||
self.ds_manager.add_datasource(req)
|
||||
self.assertRaises(congressException.DatasourceNameInUse,
|
||||
self.dseNode.add_datasource, req)
|
||||
self.ds_manager.add_datasource, req)
|
||||
|
||||
def test_delete_datasource(self):
|
||||
req = self._get_datasource_request()
|
||||
result = self.dseNode.add_datasource(req)
|
||||
self.dseNode.delete_datasource(result)
|
||||
result = self.ds_manager.add_datasource(req)
|
||||
self.ds_manager.delete_datasource(result)
|
||||
# check that service is actually deleted
|
||||
services = self.dseNode.get_services()
|
||||
self.assertEqual(len(services), 0)
|
||||
@ -183,7 +185,7 @@ class TestDataSource(base.SqlTestCase):
|
||||
req = self._get_datasource_request()
|
||||
req['id'] = 'fake-id'
|
||||
self.assertRaises(congressException.DatasourceNotFound,
|
||||
self.dseNode.delete_datasource, req)
|
||||
self.ds_manager.delete_datasource, req)
|
||||
|
||||
# TODO(dse2): Doesn't seem like we need this (or it will be moved to API).
|
||||
# def test_get_driver_schema(self):
|
||||
|
@ -30,6 +30,7 @@ from congress.datalog import compile
|
||||
from congress.datasources import nova_driver
|
||||
from congress import exception as congressException
|
||||
from congress.policy_engines import agnostic
|
||||
from congress.tests.api import base as test_api_base
|
||||
from congress.tests import base
|
||||
from congress.tests import fake_datasource
|
||||
from congress.tests import helper
|
||||
@ -245,12 +246,14 @@ class TestDSE(base.TestCase):
|
||||
node.stop()
|
||||
|
||||
def test_auto_resub(self):
|
||||
node = helper.make_dsenode_new_partition('testnode')
|
||||
config = test_api_base.setup_config(with_fake_datasource=False,
|
||||
api=False, policy=False)
|
||||
node = config['node']
|
||||
config['ds_manager'].synchronizer.start()
|
||||
sub = fake_datasource.FakeDataSource('sub')
|
||||
pub = fake_datasource.FakeDataSource('pub')
|
||||
node.register_service(sub)
|
||||
node.register_service(pub)
|
||||
node.start_periodic_tasks()
|
||||
sub.subscribe('pub', 'p')
|
||||
|
||||
helper.retry_check_function_return_value(
|
||||
|
@ -20,7 +20,7 @@ from oslo_config import cfg
|
||||
from oslo_messaging import conffixture
|
||||
|
||||
from congress.dse2 import data_service
|
||||
from congress.dse2 import dse_node
|
||||
from congress.dse2 import datasource_manager as ds_manager
|
||||
from congress.tests import base
|
||||
from congress.tests import helper
|
||||
|
||||
@ -302,31 +302,31 @@ class TestDSManagerService(base.TestCase):
|
||||
super(TestDSManagerService, self).setUp()
|
||||
|
||||
def test_ds_manager_endpoints_add_ds(self):
|
||||
ds_manager_service = dse_node.DSManagerService('test_mgr')
|
||||
ds_manager_service = ds_manager.DSManagerService('test_mgr')
|
||||
node_mock = mock.MagicMock()
|
||||
node_mock.add_datasource = mock.MagicMock()
|
||||
node_mock.add_datasource.return_value = 'add_datasource'
|
||||
ds_manager_service.add_datasource = mock.MagicMock()
|
||||
ds_manager_service.add_datasource.return_value = 'add_datasource'
|
||||
ds_manager_service.node = node_mock
|
||||
endpoints = dse_node.DSManagerEndpoints(ds_manager_service)
|
||||
endpoints = ds_manager.DSManagerEndpoints(ds_manager_service)
|
||||
|
||||
expect_ret = 'add_datasource'
|
||||
self.assertEqual(expect_ret, endpoints.add_datasource('context', {}))
|
||||
|
||||
node_mock.add_datasource.assert_called_with({})
|
||||
ds_manager_service.add_datasource.assert_called_with({})
|
||||
|
||||
def test_ds_manager_endpoints_delete_ds(self):
|
||||
ds_manager_service = dse_node.DSManagerService('test_mgr')
|
||||
ds_manager_service = ds_manager.DSManagerService('test_mgr')
|
||||
node_mock = mock.MagicMock()
|
||||
node_mock.delete_datasource = mock.MagicMock()
|
||||
node_mock.delete_datasource.return_value = 'delete_datasource'
|
||||
ds_manager_service.delete_datasource = mock.MagicMock()
|
||||
ds_manager_service.delete_datasource.return_value = 'delete_datasource'
|
||||
ds_manager_service.node = node_mock
|
||||
endpoints = dse_node.DSManagerEndpoints(ds_manager_service)
|
||||
endpoints = ds_manager.DSManagerEndpoints(ds_manager_service)
|
||||
|
||||
expect_ret = 'delete_datasource'
|
||||
self.assertEqual(expect_ret,
|
||||
endpoints.delete_datasource('context', 'ds-id'))
|
||||
|
||||
node_mock.delete_datasource.assert_called_with('ds-id')
|
||||
ds_manager_service.delete_datasource.assert_called_with('ds-id')
|
||||
|
||||
|
||||
# Leave this to make manual testing with RabbitMQ easy
|
||||
|
Loading…
Reference in New Issue
Block a user