Instantiating datasource service by synchronizer

datasource synchronizer takes care of creating and deleting
datasource services. All API managing datasource services calls
synchronizer_datasources to prevent unexpected race conditions.

Closes-Bugs: #1588167
Change-Id: I09e308132ad9ff00303e77eb06b7d1a41464bd52
This commit is contained in:
Masahito Muroi 2016-07-08 18:41:49 +09:00 committed by Eric K
parent b7c2307da1
commit d7a81b9208
5 changed files with 74 additions and 23 deletions

View File

@ -23,6 +23,7 @@ import sqlalchemy as sa
from sqlalchemy.orm import exc as db_exc
from congress.db import api as db
from congress.db import db_ds_table_data as table_data
from congress.db import model_base
@ -66,6 +67,14 @@ def delete_datasource(id_, session=None):
Datasource.id == id_).delete()
def delete_datasource_with_data(id_, session=None):
session = session or db.get_session()
with session.begin(subtransactions=True):
deleted = delete_datasource(id_, session)
table_data.delete_ds_table_data(id_, session=session)
return deleted
def get_datasource_name(name_or_id, session=None):
session = session or db.get_session()
datasource_obj = get_datasource(name_or_id, session)

View File

@ -32,9 +32,7 @@ from oslo_utils import strutils
from oslo_utils import uuidutils
from congress.datasources import constants
from congress.db import api as db
from congress.db import datasources as datasources_db
from congress.db import db_ds_table_data
from congress.dse2 import control_bus
from congress import exception
@ -153,7 +151,6 @@ class DseNode(object):
return {'node_id': self.node_id, 'instance': str(self.instance)}
# Note(thread-safety): blocking function
@lockutils.synchronized('register_service')
def register_service(self, service):
assert service.node is None
if self.service_object(service.service_id):
@ -603,6 +600,7 @@ class DseNode(object):
except Exception:
LOG.exception("synchronize_datasources failed")
@lockutils.synchronized('congress_synchronize_datasources')
def synchronize_datasources(self):
LOG.info("Synchronizing running datasources")
added = 0
@ -623,8 +621,8 @@ class DseNode(object):
continue
if active_ds is None:
# service is not up, create the service
LOG.debug("registering %s service on node %s",
configured_ds['name'], self.node_id)
LOG.info("registering %s service on node %s",
configured_ds['name'], self.node_id)
service = self.create_datasource_service(configured_ds)
self.register_service(service)
added = added + 1
@ -727,6 +725,9 @@ class DseNode(object):
enabled=req['enabled'])
except db_exc.DBDuplicateEntry:
raise exception.DatasourceNameInUse(value=req['name'])
except db_exc.DBError:
LOG.exception('Creating a new datasource failed.')
raise exception.DatasourceCreationError(value=req['name'])
new_id = datasource['id']
try:
@ -745,12 +746,12 @@ class DseNode(object):
'created in the node')
except Exception:
LOG.exception(
'Unexpected exception encountered while synchronizing new '
'datasource %s.', req['name'])
'Unexpected exception encountered while registering '
'new datasource %s.', req['name'])
if update_db:
# Note(thread-safety): blocking call
datasources_db.delete_datasource(new_id)
raise exception.DatasourceCreationError(value=req['name'])
datasources_db.delete_datasource(req['id'])
msg = ("Datasource service: %s creation fails." % req['name'])
raise exception.DatasourceCreationError(message=msg)
new_item = dict(item)
new_item['id'] = new_id
@ -832,22 +833,23 @@ class DseNode(object):
return service
# Note(thread-safety): blocking function
# FIXME(thread-safety): make sure unregister_service succeeds even if
# service already unregistered
def delete_datasource(self, datasource, update_db=True):
LOG.debug("Deleting %s datasource ", datasource['name'])
datasource_id = datasource['id']
session = db.get_session()
with session.begin(subtransactions=True):
if update_db:
# Note(thread-safety): blocking call
result = datasources_db.delete_datasource(
datasource_id, session)
if not result:
raise exception.DatasourceNotFound(id=datasource_id)
db_ds_table_data.delete_ds_table_data(ds_id=datasource_id)
if update_db:
# Note(thread-safety): blocking call
self.unregister_service(datasource['name'])
result = datasources_db.delete_datasource_with_data(datasource_id)
if not result:
raise exception.DatasourceNotFound(id=datasource_id)
# Note(thread-safety): blocking call
try:
self.synchronize_datasources()
except Exception:
msg = ('failed to synchronize_datasource after '
'deleting datasource: %s' % datasource_id)
LOG.exception(msg)
raise exception.DataServiceError(msg)
class DseNodeEndpoints (object):

View File

@ -2153,7 +2153,7 @@ class DseRuntime (Runtime, DataService):
new_tables = self.tablenames(body_only=True)
self.update_table_subscriptions(old_tables, new_tables)
@lockutils.synchronized('synchronize_policies')
@lockutils.synchronized('congress_synchronize_policies')
def synchronize_policies(self):
LOG.info("Synchronizing policies on node %s", self.node.node_id)
# Read policies from DB.

View File

@ -19,6 +19,7 @@ from __future__ import absolute_import
from oslo_utils import uuidutils
from congress.db import datasources
from congress.db import db_ds_table_data
from congress.tests import base
@ -55,6 +56,23 @@ class TestDbDatasource(base.SqlTestCase):
def test_delete_non_existing_datasource(self):
self.assertFalse(datasources.delete_datasource('no_id'))
def test_delete_datasource_with_data(self):
id_ = uuidutils.generate_uuid()
datasources.add_datasource(
id_=id_,
name="hiya",
driver="foo",
config='{user: foo}',
description="hello",
enabled=True)
db_ds_table_data.store_ds_table_data(
ds_id=id_,
tablename='bar',
tabledata=set([('a1', 'b1'), ('a2', 'b2'), ('a3', 'a4')])
)
self.assertTrue(datasources.delete_datasource_with_data(id_))
self.assertEqual(db_ds_table_data.get_ds_table_data(id_), [])
def test_get_datasource_by_name(self):
id_ = uuidutils.generate_uuid()
datasources.add_datasource(

View File

@ -17,9 +17,13 @@ from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
import mock
from oslo_config import cfg
from oslo_db import exception as db_exc
cfg.CONF.distributed_architecture = True
from congress.db import datasources as datasource_db
from congress.dse2 import dse_node
from congress import exception as congressException
from congress.tests.api import base as api_base
from congress.tests import base
@ -61,6 +65,24 @@ class TestDataSource(base.SqlTestCase):
req['name'], 'get_status', {'source_id': None, 'params': None})
self.assertIsNotNone(obj)
@mock.patch.object(datasource_db, 'add_datasource')
def test_add_datasource_db_error(self, add_ds):
add_ds.side_effect = db_exc.DBError('Error in db.')
req = self._get_datasource_request()
self.assertRaises(congressException.DatasourceCreationError,
self.dseNode.add_datasource, req)
@mock.patch.object(dse_node.DseNode, 'register_service')
def test_add_datasource_synchronizer_error(self, register_ds):
register_ds.side_effect = Exception('Error in registering service')
req = self._get_datasource_request()
self.assertRaises(congressException.DatasourceCreationError,
self.dseNode.add_datasource, req)
ds = datasource_db.get_datasource_by_name(req['name'])
self.assertIsNone(ds)
def test_get_datasource(self):
req = self._get_datasource_request()
ds = self.dseNode.add_datasource(req)