From d7a81b9208fc257752840d4d1591f90d66b1028b Mon Sep 17 00:00:00 2001 From: Masahito Muroi Date: Fri, 8 Jul 2016 18:41:49 +0900 Subject: [PATCH] Instantiating datasource service by synchronizer datasource synchronizer takes care of creating and deleting datasource services. All API managing datasource services calls synchronizer_datasources to prevent unexpected race conditions. Closes-Bugs: #1588167 Change-Id: I09e308132ad9ff00303e77eb06b7d1a41464bd52 --- congress/db/datasources.py | 9 +++++ congress/dse2/dse_node.py | 46 ++++++++++++++------------ congress/policy_engines/agnostic.py | 2 +- congress/tests/db/test_datasources.py | 18 ++++++++++ congress/tests/dse2/test_datasource.py | 22 ++++++++++++ 5 files changed, 74 insertions(+), 23 deletions(-) diff --git a/congress/db/datasources.py b/congress/db/datasources.py index 0c42faeef..ffcd213cd 100644 --- a/congress/db/datasources.py +++ b/congress/db/datasources.py @@ -23,6 +23,7 @@ import sqlalchemy as sa from sqlalchemy.orm import exc as db_exc from congress.db import api as db +from congress.db import db_ds_table_data as table_data from congress.db import model_base @@ -66,6 +67,14 @@ def delete_datasource(id_, session=None): Datasource.id == id_).delete() +def delete_datasource_with_data(id_, session=None): + session = session or db.get_session() + with session.begin(subtransactions=True): + deleted = delete_datasource(id_, session) + table_data.delete_ds_table_data(id_, session=session) + return deleted + + def get_datasource_name(name_or_id, session=None): session = session or db.get_session() datasource_obj = get_datasource(name_or_id, session) diff --git a/congress/dse2/dse_node.py b/congress/dse2/dse_node.py index 834ecdd8c..2941fe3c8 100644 --- a/congress/dse2/dse_node.py +++ b/congress/dse2/dse_node.py @@ -32,9 +32,7 @@ from oslo_utils import strutils from oslo_utils import uuidutils from congress.datasources import constants -from congress.db import api as db from congress.db import datasources as datasources_db -from congress.db import db_ds_table_data from congress.dse2 import control_bus from congress import exception @@ -153,7 +151,6 @@ class DseNode(object): return {'node_id': self.node_id, 'instance': str(self.instance)} # Note(thread-safety): blocking function - @lockutils.synchronized('register_service') def register_service(self, service): assert service.node is None if self.service_object(service.service_id): @@ -603,6 +600,7 @@ class DseNode(object): except Exception: LOG.exception("synchronize_datasources failed") + @lockutils.synchronized('congress_synchronize_datasources') def synchronize_datasources(self): LOG.info("Synchronizing running datasources") added = 0 @@ -623,8 +621,8 @@ class DseNode(object): continue if active_ds is None: # service is not up, create the service - LOG.debug("registering %s service on node %s", - configured_ds['name'], self.node_id) + LOG.info("registering %s service on node %s", + configured_ds['name'], self.node_id) service = self.create_datasource_service(configured_ds) self.register_service(service) added = added + 1 @@ -727,6 +725,9 @@ class DseNode(object): enabled=req['enabled']) except db_exc.DBDuplicateEntry: raise exception.DatasourceNameInUse(value=req['name']) + except db_exc.DBError: + LOG.exception('Creating a new datasource failed.') + raise exception.DatasourceCreationError(value=req['name']) new_id = datasource['id'] try: @@ -745,12 +746,12 @@ class DseNode(object): 'created in the node') except Exception: LOG.exception( - 'Unexpected exception encountered while synchronizing new ' - 'datasource %s.', req['name']) + 'Unexpected exception encountered while registering ' + 'new datasource %s.', req['name']) if update_db: - # Note(thread-safety): blocking call - datasources_db.delete_datasource(new_id) - raise exception.DatasourceCreationError(value=req['name']) + datasources_db.delete_datasource(req['id']) + msg = ("Datasource service: %s creation fails." % req['name']) + raise exception.DatasourceCreationError(message=msg) new_item = dict(item) new_item['id'] = new_id @@ -832,22 +833,23 @@ class DseNode(object): return service # Note(thread-safety): blocking function - # FIXME(thread-safety): make sure unregister_service succeeds even if - # service already unregistered def delete_datasource(self, datasource, update_db=True): LOG.debug("Deleting %s datasource ", datasource['name']) datasource_id = datasource['id'] - session = db.get_session() - with session.begin(subtransactions=True): - if update_db: - # Note(thread-safety): blocking call - result = datasources_db.delete_datasource( - datasource_id, session) - if not result: - raise exception.DatasourceNotFound(id=datasource_id) - db_ds_table_data.delete_ds_table_data(ds_id=datasource_id) + if update_db: # Note(thread-safety): blocking call - self.unregister_service(datasource['name']) + result = datasources_db.delete_datasource_with_data(datasource_id) + if not result: + raise exception.DatasourceNotFound(id=datasource_id) + + # Note(thread-safety): blocking call + try: + self.synchronize_datasources() + except Exception: + msg = ('failed to synchronize_datasource after ' + 'deleting datasource: %s' % datasource_id) + LOG.exception(msg) + raise exception.DataServiceError(msg) class DseNodeEndpoints (object): diff --git a/congress/policy_engines/agnostic.py b/congress/policy_engines/agnostic.py index 58692b48f..b7d18b8f4 100644 --- a/congress/policy_engines/agnostic.py +++ b/congress/policy_engines/agnostic.py @@ -2153,7 +2153,7 @@ class DseRuntime (Runtime, DataService): new_tables = self.tablenames(body_only=True) self.update_table_subscriptions(old_tables, new_tables) - @lockutils.synchronized('synchronize_policies') + @lockutils.synchronized('congress_synchronize_policies') def synchronize_policies(self): LOG.info("Synchronizing policies on node %s", self.node.node_id) # Read policies from DB. diff --git a/congress/tests/db/test_datasources.py b/congress/tests/db/test_datasources.py index 7193b1726..32d024187 100644 --- a/congress/tests/db/test_datasources.py +++ b/congress/tests/db/test_datasources.py @@ -19,6 +19,7 @@ from __future__ import absolute_import from oslo_utils import uuidutils from congress.db import datasources +from congress.db import db_ds_table_data from congress.tests import base @@ -55,6 +56,23 @@ class TestDbDatasource(base.SqlTestCase): def test_delete_non_existing_datasource(self): self.assertFalse(datasources.delete_datasource('no_id')) + def test_delete_datasource_with_data(self): + id_ = uuidutils.generate_uuid() + datasources.add_datasource( + id_=id_, + name="hiya", + driver="foo", + config='{user: foo}', + description="hello", + enabled=True) + db_ds_table_data.store_ds_table_data( + ds_id=id_, + tablename='bar', + tabledata=set([('a1', 'b1'), ('a2', 'b2'), ('a3', 'a4')]) + ) + self.assertTrue(datasources.delete_datasource_with_data(id_)) + self.assertEqual(db_ds_table_data.get_ds_table_data(id_), []) + def test_get_datasource_by_name(self): id_ = uuidutils.generate_uuid() datasources.add_datasource( diff --git a/congress/tests/dse2/test_datasource.py b/congress/tests/dse2/test_datasource.py index 2f6bc7e38..6390dad15 100644 --- a/congress/tests/dse2/test_datasource.py +++ b/congress/tests/dse2/test_datasource.py @@ -17,9 +17,13 @@ from __future__ import print_function from __future__ import division from __future__ import absolute_import +import mock from oslo_config import cfg +from oslo_db import exception as db_exc cfg.CONF.distributed_architecture = True +from congress.db import datasources as datasource_db +from congress.dse2 import dse_node from congress import exception as congressException from congress.tests.api import base as api_base from congress.tests import base @@ -61,6 +65,24 @@ class TestDataSource(base.SqlTestCase): req['name'], 'get_status', {'source_id': None, 'params': None}) self.assertIsNotNone(obj) + @mock.patch.object(datasource_db, 'add_datasource') + def test_add_datasource_db_error(self, add_ds): + add_ds.side_effect = db_exc.DBError('Error in db.') + + req = self._get_datasource_request() + self.assertRaises(congressException.DatasourceCreationError, + self.dseNode.add_datasource, req) + + @mock.patch.object(dse_node.DseNode, 'register_service') + def test_add_datasource_synchronizer_error(self, register_ds): + register_ds.side_effect = Exception('Error in registering service') + + req = self._get_datasource_request() + self.assertRaises(congressException.DatasourceCreationError, + self.dseNode.add_datasource, req) + ds = datasource_db.get_datasource_by_name(req['name']) + self.assertIsNone(ds) + def test_get_datasource(self): req = self._get_datasource_request() ds = self.dseNode.add_datasource(req)