Start and Stop DatasourceDriver in new architecture

PollingDatasourceDriver starts polling once the driver has been
initialized and will never stop polling. This patch enables
PollingDatasourceDriver to start polling after its start() is
called and to stop polling after its stop() is called.

Change-Id: I395e5ddbfa6907cc26d4636eeb1ecbb954baae8d
This commit is contained in:
Masahito Muroi 2016-05-30 14:11:43 +09:00 committed by Anusha Ramineni
parent dd5892787f
commit dfff8458f1
11 changed files with 138 additions and 60 deletions

View File

@ -1121,6 +1121,7 @@ class DataSourceDriver(deepsix.deepSix):
def request_refresh(self):
raise NotImplementedError('request_refresh() is not implemented.')
@utils.removed_in_dse2
def cleanup(self):
"""Cleanup this object in preparation for elimination."""
pass
@ -1254,18 +1255,31 @@ class PollingDataSourceDriver(DataSourceDriver):
Every instance of this class must call the method at the end of
__init__()
"""
LOG.debug("start to poll from datasource %s", self.name)
self.worker_greenthread = eventlet.spawn(self.poll_loop,
self.poll_time)
if self._running:
self.worker_greenthread = eventlet.spawn(self.poll_loop,
self.poll_time)
self.initialized = True
def cleanup(self):
"""Delete worker thread if created."""
def start(self):
super(PollingDataSourceDriver, self).start()
if not self.worker_greenthread:
self.worker_greenthread = eventlet.spawn(self.poll_loop,
self.poll_time)
def stop(self):
self.stop_polling_thread()
super(PollingDataSourceDriver, self).stop()
def stop_polling_thread(self):
if self.worker_greenthread is not None:
eventlet.greenthread.kill(self.worker_greenthread)
self.worker_greenthread = None
self.log_info("killed worker thread")
@utils.removed_in_dse2
def cleanup(self):
"""Delete worker thread if created."""
self.stop_polling_thread()
super(PollingDataSourceDriver, self).cleanup()
def get_last_updated_time(self):
@ -1323,9 +1337,8 @@ class PollingDataSourceDriver(DataSourceDriver):
:param poll_time: is the amount of time (in seconds) to wait between
polling rounds.
"""
# todo(dse2) replace self.running with self._running since self.running
# is defined in deepsix and deepsix2, a placeholder.
while self.running:
LOG.debug("start to poll from datasource %s", self.name)
while self._running:
if poll_time:
if self.last_updated_time is None:
self.poll()

View File

@ -402,7 +402,7 @@ class d6Cage(deepsix.deepSix):
self.d6reload(msg)
def router_loop(self):
while self.running:
while self._running:
msg = self.dataPath.get()
self.routemsg(msg)
self.dataPath.task_done()

View File

@ -48,7 +48,7 @@ class deepSix(greenthread.GreenThread):
self.keys = keyList
self.running = True
self._running = True
self.pubdata = {}
self.subdata = {}
@ -216,7 +216,7 @@ class deepSix(greenthread.GreenThread):
except Exception as errmsg:
self.log("error stopping timer thread: %s", errmsg)
self.running = False
self._running = False
self.keys = {}
keydata = {}
@ -527,7 +527,7 @@ class deepSix(greenthread.GreenThread):
# self.running will be set to False when processing a shutdown a
# message
while self.running:
while self._running:
if self.inbox:
msg = self.inbox.get()
self.receive(msg)

View File

@ -28,7 +28,6 @@ class deepSix(DataService):
def __init__(self, name, keys, inbox=None, dataPath=None):
DataService.__init__(self, name)
self.name = name
self.running = True
def log_info(self, msg, *args):
LOG.info(msg, *args)

View File

@ -76,7 +76,7 @@ class Synchronizer(deepsix.deepSix):
:param poll_time: is the amount of time (in seconds) to wait between
successful polling rounds.
"""
while self.running:
while self._running:
if poll_time:
if self.last_poll_time is None:
self.do_poll()

View File

@ -147,7 +147,8 @@ def datasource_openstack_args():
return {'username': '',
'password': '',
'auth_url': '',
'tenant_name': ''}
'tenant_name': '',
'poll_time': 1}
def pause(factor=1):
@ -380,8 +381,10 @@ def retry_check_function_return_value(f, expected_value):
@retrying.retry(stop_max_attempt_number=10, wait_fixed=500)
def retry_check_function_return_value_not_eq(f, value):
"""Check if function f does not return expected value."""
if f() == value:
raise Exception("Actual value '%s' not different from '%s'" % value)
result = f()
if result == value:
raise Exception("Actual value '%s' should be different "
"from '%s'" % (result, value))
@retrying.retry(stop_max_attempt_number=10, wait_fixed=500)

View File

View File

@ -0,0 +1,76 @@
# Copyright (c) 2016 Styra, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import eventlet
import mock
from oslo_config import cfg
cfg.CONF.distributed_architecture = True
from congress.datasources import datasource_driver
from congress.tests import base
from congress.tests.datasources import test_datasource_driver
from congress.tests.datasources import test_driver
class TestDS(test_datasource_driver.TestDatasourceDriver):
pass
class TestPollingDataSourceDriver(base.TestCase):
class TestDriver(datasource_driver.PollingDataSourceDriver):
def __init__(self):
super(TestPollingDataSourceDriver.TestDriver, self).__init__(
'', '', None, None, None)
self.node = 'node'
self._rpc_server = mock.MagicMock()
self._init_end_start_poll()
def setUp(self):
super(TestPollingDataSourceDriver, self).setUp()
@mock.patch.object(eventlet, 'spawn')
def test_init_consistence(self, mock_spawn):
test_driver = TestPollingDataSourceDriver.TestDriver()
mock_spawn.assert_not_called()
self.assertIsNone(test_driver.worker_greenthread)
test_driver.start()
mock_spawn.assert_called_once_with(test_driver.poll_loop,
test_driver.poll_time)
self.assertTrue(test_driver.initialized)
self.assertIsNotNone(test_driver.worker_greenthread)
@mock.patch.object(eventlet.greenthread, 'kill')
@mock.patch.object(eventlet, 'spawn')
def test_cleanup(self, mock_spawn, mock_kill):
dummy_thread = dict()
mock_spawn.return_value = dummy_thread
test_driver = TestPollingDataSourceDriver.TestDriver()
test_driver.start()
self.assertEqual(test_driver.worker_greenthread, dummy_thread)
test_driver.stop()
mock_kill.assert_called_once_with(dummy_thread)
self.assertIsNone(test_driver.worker_greenthread)
class TestExecution(test_datasource_driver.TestExecutionDriver):
pass
class TestDriver(test_driver.TestDriver):
pass

View File

@ -149,15 +149,26 @@ class TestCongress(base.SqlTestCase):
self.assertEqual(len(ds), 0)
def test_datasource_request_refresh(self):
# Remember that neutron does not poll automatically here, which
# is why this test actually testing request_refresh
# neutron polls automatically here, which is why register_service
# starts its service.
neutron = self.neutronv2
LOG.info("neutron.state: %s", neutron.state)
self.assertEqual(len(neutron.state['ports']), 0)
# TODO(thinrichs): Seems we can't test the datasource API at all.
# api['datasource-model'].request_refresh_action(
# {}, context, helper.FakeRequest({}))
neutron.stop()
self.assertEqual(neutron.refresh_request_queue.qsize(), 0)
neutron.request_refresh()
self.assertEqual(neutron.refresh_request_queue.qsize(), 1)
neutron.start()
neutron.request_refresh()
f = lambda: neutron.refresh_request_queue.qsize()
helper.retry_check_function_return_value(f, 0)
def test_datasource_poll(self):
neutron = self.neutronv2
neutron.stop()
neutron._translate_ports({'ports': []})
self.assertEqual(len(neutron.state['ports']), 0)
neutron.start()
f = lambda: len(neutron.state['ports'])
helper.retry_check_function_return_value_not_eq(f, 0)

View File

@ -1,36 +0,0 @@
# Copyright (c) 2016 Styra, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from oslo_config import cfg
cfg.CONF.distributed_architecture = True
from congress.tests.datasources import test_datasource_driver
from congress.tests.datasources import test_driver
class TestDS(test_datasource_driver.TestDatasourceDriver):
pass
class TestPolling(test_datasource_driver.TestPollingDataSourceDriver):
pass
class TestExecution(test_datasource_driver.TestExecutionDriver):
pass
class TestDriver(test_driver.TestDriver):
pass

View File

@ -21,6 +21,7 @@ from __future__ import division
from __future__ import absolute_import
import contextlib
import functools
import json
import os
import shutil
@ -88,6 +89,17 @@ def get_root_path():
return os.path.dirname(os.path.dirname(__file__))
def removed_in_dse2(wrapped):
@functools.wraps(wrapped)
def wrapper(*args, **kwargs):
if cfg.CONF.distributed_architecture:
LOG.error('%s is called in dse2' % wrapped.__name__)
raise Exception('inappropriate function is called.')
else:
return wrapped(*args, **kwargs)
return wrapper
class Location (object):
"""A location in the program source code."""