Fix Gate HA test Failure

This commit fixes the HA issue happening from time to time.
The problem is with the explicit call to create/delete the
datasource policy in add_datasource/delete_datasource, which
causes an error if the synchronizer runs between the two statements.

So, this commit removes the explicit calls to create/delete
policies and lets the synchronizer take care of them in its next
periodic iteration.

An explicit call to the synchronizer has not been added for the following reasons:
1. In the HA case, even if an explicit call to the synchronizer is
added, it will only run on one PE node, and the other nodes will
need to be synchronized later (or we would need to broadcast the call);
also, if the synchronizer periodic task is running in a
different thread, making an explicit call again might
cause a race condition.

Change-Id: I76e8c859104eaa5d966c9c146dcf0f6bec372d71
This commit is contained in:
Anusha Ramineni 2016-07-29 09:49:21 +05:30
parent 1c38124e75
commit bcb7683132
9 changed files with 16 additions and 25 deletions

View File

@ -27,7 +27,6 @@ from congress.api import base
from congress.api import error_codes
from congress.api import webservice
from congress import exception
from congress import utils
LOG = logging.getLogger(__name__)
@ -99,9 +98,7 @@ class DatasourceModel(base.APIModel):
if self.dist_arch:
# Note(thread-safety): blocking call
obj = self.bus.add_datasource(item=item)
# Note(thread-safety): blocking call
utils.create_datasource_policy(self.bus, obj['name'],
self.engine)
# Let PE synchronizer take care of creating the policy.
else:
# Note(thread-safety): blocking call
obj = self.datasource_mgr.add_datasource(item=item)
@ -126,8 +123,6 @@ class DatasourceModel(base.APIModel):
if self.dist_arch:
# Note(thread-safety): blocking call
datasource = self.bus.get_datasource(ds_id)
args = {'name': datasource['name'],
'disallow_dangling_refs': True}
# FIXME(thread-safety):
# by the time greenthread resumes, the
# returned datasource name could refer to a totally different
@ -147,7 +142,7 @@ class DatasourceModel(base.APIModel):
# Note(thread-safety): blocking call
self.bus.delete_datasource(datasource)
self.invoke_rpc(self.engine, 'delete_policy', args)
# Let PE synchronizer take care of deleting the policy
else:
# Note(thread-safety): blocking call
self.datasource_mgr.delete_datasource(ds_id)

View File

@ -66,9 +66,6 @@ core_opts = [
cfg.ListOpt('drivers',
default=[],
help=_("List of driver class paths to import.")),
cfg.BoolOpt('enable_synchronizer', default=True,
help='enable to synchronize policies and datasources across '
'nodes. Must be enabled for HA'),
cfg.IntOpt('datasource_sync_period', default=0,
help='The number of seconds to wait between synchronizing '
'datasource config from the database '),

View File

@ -540,8 +540,7 @@ class DseNode(object):
if self._running:
self.sync_thread = eventlet.spawn_n(self.periodic_tasks.start)
@periodics.periodic(spacing=(cfg.CONF.datasource_sync_period or 60),
enabled=cfg.CONF.enable_synchronizer)
@periodics.periodic(spacing=(cfg.CONF.datasource_sync_period or 60))
def synchronize(self):
try:
self.synchronize_datasources()

View File

@ -314,11 +314,10 @@ def create2(node, policy_engine=True, datasources=True, api=True):
# node.unregister_service(ds)
# start synchronizer
if cfg.CONF.enable_synchronizer:
if policy_engine:
services[ENGINE_SERVICE_NAME].start_policy_synchronizer()
if datasources:
node.start_datasource_synchronizer()
if policy_engine:
services[ENGINE_SERVICE_NAME].start_policy_synchronizer()
if datasources:
node.start_datasource_synchronizer()
return services

View File

@ -2110,8 +2110,7 @@ class Dse2Runtime(DseRuntime):
super(Dse2Runtime, self).stop()
@periodics.periodic(spacing=(cfg.CONF.datasource_sync_period or 60),
run_immediately=True,
enabled=cfg.CONF.enable_synchronizer)
run_immediately=True)
def synchronize(self):
try:
self.synchronize_policies()

View File

@ -1,5 +1,2 @@
[DEFAULT]
enable_synchronizer = False
[database]
connection = 'sqlite://'

View File

@ -12,7 +12,8 @@
# License for the specific language governing permissions and limitations
# under the License.
#
from futurist import periodics
import mock
from oslo_config import cfg
from congress import harness
@ -33,7 +34,6 @@ def setup_config(with_fake_datasource=True, node_id='testnode',
cfg.CONF.set_override(
'drivers',
['congress.tests.fake_datasource.FakeDataSource'])
cfg.CONF.set_override('enable_synchronizer', False)
if same_partition_as_node is None:
node = helper.make_dsenode_new_partition(node_id)
@ -41,7 +41,8 @@ def setup_config(with_fake_datasource=True, node_id='testnode',
node = helper.make_dsenode_same_partition(
same_partition_as_node, node_id)
services = harness.create2(node=node)
with mock.patch.object(periodics, 'PeriodicWorker', autospec=True):
services = harness.create2(node=node)
# Always register engine and fake datasource
# engine = Dse2Runtime('engine')

View File

@ -73,6 +73,7 @@ class TestDatasourceModel(base.SqlTestCase):
datasource3 = self._get_datasource_request()
datasource3['name'] = 'datasource-test-3'
self.datasource_model.add_item(datasource3, {})
self.engine.synchronize_policies()
obj = self.engine.policy_object('datasource-test-3')
self.assertIsNotNone(obj.schema)
self.assertEqual('datasource-test-3', obj.name)
@ -86,9 +87,11 @@ class TestDatasourceModel(base.SqlTestCase):
datasource = self._get_datasource_request()
datasource['name'] = 'test-datasource'
d_id, dinfo = self.datasource_model.add_item(datasource, {})
self.engine.synchronize_policies()
self.assertTrue(self.engine.assert_policy_exists('test-datasource'))
context = {'ds_id': d_id}
self.datasource_model.delete_item(None, {}, context=context)
self.engine.synchronize_policies()
self.assertRaises(exception.PolicyRuntimeException,
self.engine.assert_policy_exists, 'test-datasource')
self.assertRaises(exception.DatasourceNotFound,

View File

@ -99,6 +99,7 @@ class TestCongress(BaseTestPolicyCongress):
def test_policy_datasource(self):
self.create_policy('alpha')
self.create_fake_datasource('fake')
self.engine.synchronize_policies()
data = self.node.service_object('fake')
data.state = {'fake_table': set([(1, 2)])}