Separate Polling Methods in DataSourceDriver to PollingDataSourceDriver
For implementing push type datasource driver, methods related to polling feature in DataSourceDriver should be separated to PollingDataSourceDriver. This patch adds PollingDataSourceDriver which is subclass of DataSourceDriver and has polling features for some datasource services. Partially implements blueprint: push-type-datasource-driver Change-Id: I92cc2f53fa2206fa0f41021eb1278c51b502b509
This commit is contained in:
@@ -26,7 +26,7 @@ def d6service(name, keys, inbox, datapath, args):
|
||||
return BenchmarkDriver(name, keys, inbox, datapath, args)
|
||||
|
||||
|
||||
class BenchmarkDriver(datasource_driver.DataSourceDriver):
|
||||
class BenchmarkDriver(datasource_driver.PollingDataSourceDriver):
|
||||
BENCHTABLE = 'benchtable'
|
||||
value_trans = {'translation-type': 'VALUE'}
|
||||
translator = {
|
||||
|
||||
@@ -39,7 +39,7 @@ def d6service(name, keys, inbox, datapath, args):
|
||||
# into DataSourceDriver. E.g. change all the classes to Driver instead of
|
||||
# NeutronDriver, CeilometerDriver, etc. and move the d6instantiate function
|
||||
# to DataSourceDriver.
|
||||
class CeilometerDriver(datasource_driver.DataSourceDriver,
|
||||
class CeilometerDriver(datasource_driver.PollingDataSourceDriver,
|
||||
datasource_driver.ExecutionDriver):
|
||||
METERS = "meters"
|
||||
ALARMS = "alarms"
|
||||
|
||||
@@ -23,7 +23,7 @@ def d6service(name, keys, inbox, datapath, args):
|
||||
return CinderDriver(name, keys, inbox, datapath, args)
|
||||
|
||||
|
||||
class CinderDriver(datasource_driver.DataSourceDriver,
|
||||
class CinderDriver(datasource_driver.PollingDataSourceDriver,
|
||||
datasource_driver.ExecutionDriver):
|
||||
VOLUMES = "volumes"
|
||||
SNAPSHOTS = "snapshots"
|
||||
|
||||
@@ -29,7 +29,7 @@ def d6service(name, keys, inbox, datapath, args):
|
||||
return CloudFoundryV2Driver(name, keys, inbox, datapath, args)
|
||||
|
||||
|
||||
class CloudFoundryV2Driver(datasource_driver.DataSourceDriver,
|
||||
class CloudFoundryV2Driver(datasource_driver.PollingDataSourceDriver,
|
||||
datasource_driver.ExecutionDriver):
|
||||
ORGANIZATIONS = 'organizations'
|
||||
SERVICE_BINDINGS = 'service_bindings'
|
||||
|
||||
@@ -12,9 +12,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
# TODO(thinrichs): not all datasources poll, though for now that's the only
|
||||
# option. Create PollingDataSourceDriver subclass to handle the polling
|
||||
# logic.
|
||||
|
||||
import eventlet
|
||||
from oslo_log import log as logging
|
||||
@@ -275,23 +272,10 @@ class DataSourceDriver(deepsix.deepSix):
|
||||
|
||||
def __init__(self, name, keys, inbox, datapath, args):
|
||||
self.initialized = False
|
||||
if args is None:
|
||||
args = dict()
|
||||
|
||||
if 'poll_time' in args:
|
||||
poll_time = int(args['poll_time'])
|
||||
else:
|
||||
poll_time = 10
|
||||
|
||||
# a number of tests rely on polling being disabled if there's no inbox
|
||||
# provided to the deepSix base class so clamp to zero here in that case
|
||||
self.poll_time = poll_time if inbox is not None else 0
|
||||
|
||||
self.last_poll_time = None
|
||||
self.last_updated_time = None
|
||||
self.last_error = None
|
||||
self.worker_greenthread = None
|
||||
self.number_of_updates = 0
|
||||
self.poller_greenthread = None
|
||||
self.refresh_request_queue = eventlet.Queue(maxsize=1)
|
||||
|
||||
# a dictionary from tablename to the SET of tuples, both currently
|
||||
# and in the past.
|
||||
@@ -327,23 +311,6 @@ class DataSourceDriver(deepsix.deepSix):
|
||||
# this because it will publish info to the bus.
|
||||
super(DataSourceDriver, self).__init__(name, keys, inbox, datapath)
|
||||
|
||||
def _init_end_start_poll(self):
|
||||
"""Mark initializes the success and launch poll loop.
|
||||
|
||||
Every instance of this class must call the method at the end of
|
||||
__init__()
|
||||
"""
|
||||
LOG.debug("start to poll from datasource %s", self.name)
|
||||
self.poller_greenthread = eventlet.spawn(self.poll_loop,
|
||||
self.poll_time)
|
||||
self.initialized = True
|
||||
|
||||
def cleanup(self):
|
||||
"""Cleanup this object in preparation for elimination."""
|
||||
if hasattr(self, "poller_greenthread"):
|
||||
eventlet.greenthread.kill(self.poller_greenthread)
|
||||
self.log_info("killed poller thread")
|
||||
|
||||
def _make_tmp_state(self, root_table_name, row_data):
|
||||
tmp_state = {}
|
||||
# init all related tables to empty set
|
||||
@@ -639,24 +606,6 @@ class DataSourceDriver(deepsix.deepSix):
|
||||
col_map[name] = index
|
||||
return col_map
|
||||
|
||||
def get_last_updated_time(self):
|
||||
return self.last_poll_time
|
||||
|
||||
def get_status(self):
|
||||
d = {}
|
||||
d['last_updated'] = str(self.last_poll_time)
|
||||
d['last_error'] = str(self.last_error)
|
||||
d['number_of_updates'] = str(self.number_of_updates)
|
||||
d['initialized'] = str(self.initialized)
|
||||
d['subscriptions'] = [(value.key, value.dataindex)
|
||||
for value in self.subdata.values()]
|
||||
d['subscribers'] = [(name, pubdata.dataindex)
|
||||
for pubdata in self.pubdata.values()
|
||||
for name in pubdata.subscribers]
|
||||
|
||||
# d['inbox_size'] = str(len(self.inbox))
|
||||
return d
|
||||
|
||||
def state_set_diff(self, state1, state2, table=None):
|
||||
"""Return STATE1 - STATE2.
|
||||
|
||||
@@ -1075,35 +1024,6 @@ class DataSourceDriver(deepsix.deepSix):
|
||||
raise exception.InvalidParamException(
|
||||
"Param (%s) must be in translator" % cls.TRANSLATION_TYPE)
|
||||
|
||||
def poll(self):
|
||||
"""Periodically called to update new info.
|
||||
|
||||
Function called periodically to grab new information, compute
|
||||
deltas, and publish those deltas.
|
||||
"""
|
||||
self.log_info("polling")
|
||||
self.prior_state = dict(self.state) # copying self.state
|
||||
self.last_error = None # non-None only when last poll errored
|
||||
try:
|
||||
self.update_from_datasource() # sets self.state
|
||||
tablenames = set(self.state.keys()) | set(self.prior_state.keys())
|
||||
for tablename in tablenames:
|
||||
# publishing full table and using prepush_processing to send
|
||||
# only deltas. Useful so that if policy engine subscribes
|
||||
# late (or dies and comes back up), DSE can automatically
|
||||
# send the full table.
|
||||
if tablename in self.state:
|
||||
self.publish(tablename, self.state[tablename])
|
||||
else:
|
||||
self.publish(tablename, set())
|
||||
except Exception as e:
|
||||
self.last_error = e
|
||||
LOG.exception("Datasource driver raised exception")
|
||||
|
||||
self.last_poll_time = datetime.datetime.now()
|
||||
self.number_of_updates += 1
|
||||
self.log_info("finished polling")
|
||||
|
||||
def prepush_processor(self, data, dataindex, type=None):
|
||||
"""Called before push.
|
||||
|
||||
@@ -1153,12 +1073,98 @@ class DataSourceDriver(deepsix.deepSix):
|
||||
dataindex, text)
|
||||
return result
|
||||
|
||||
def request_refresh(self):
|
||||
raise NotImplementedError('request_refresh() is not implemented.')
|
||||
|
||||
def cleanup(self):
|
||||
"""Cleanup this object in preparation for elimination."""
|
||||
if hasattr(self, "worker_greenthread"):
|
||||
eventlet.greenthread.kill(self.worker_greenthread)
|
||||
self.log_info("killed worker thread")
|
||||
|
||||
def get_status(self):
|
||||
d = {}
|
||||
d['last_updated'] = str(self.last_updated_time)
|
||||
d['last_error'] = str(self.last_error)
|
||||
d['number_of_updates'] = str(self.number_of_updates)
|
||||
d['initialized'] = str(self.initialized)
|
||||
d['subscriptions'] = [(value.key, value.dataindex)
|
||||
for value in self.subdata.values()]
|
||||
d['subscribers'] = [(name, pubdata.dataindex)
|
||||
for pubdata in self.pubdata.values()
|
||||
for name in pubdata.subscribers]
|
||||
|
||||
return d
|
||||
|
||||
def empty_credentials(self):
|
||||
return {'username': '',
|
||||
'password': '',
|
||||
'auth_url': '',
|
||||
'tenant_name': ''}
|
||||
|
||||
|
||||
class PollingDataSourceDriver(DataSourceDriver):
|
||||
def __init__(self, name, keys, inbox, datapath, args):
|
||||
if args is None:
|
||||
args = dict()
|
||||
|
||||
if 'poll_time' in args:
|
||||
poll_time = int(args['poll_time'])
|
||||
else:
|
||||
poll_time = 10
|
||||
|
||||
# a number of tests rely on polling being disabled if there's no inbox
|
||||
# provided to the deepSix base class so clamp to zero here in that case
|
||||
self.poll_time = poll_time if inbox is not None else 0
|
||||
|
||||
self.refresh_request_queue = eventlet.Queue(maxsize=1)
|
||||
|
||||
super(PollingDataSourceDriver, self).__init__(name, keys, inbox,
|
||||
datapath, args)
|
||||
|
||||
def _init_end_start_poll(self):
|
||||
"""Mark initializes the success and launch poll loop.
|
||||
|
||||
Every instance of this class must call the method at the end of
|
||||
__init__()
|
||||
"""
|
||||
LOG.debug("start to poll from datasource %s", self.name)
|
||||
self.worker_greenthread = eventlet.spawn(self.poll_loop,
|
||||
self.poll_time)
|
||||
self.initialized = True
|
||||
|
||||
def get_last_updated_time(self):
|
||||
return self.last_updated_time
|
||||
|
||||
def poll(self):
|
||||
"""Periodically called to update new info.
|
||||
|
||||
Function called periodically to grab new information, compute
|
||||
deltas, and publish those deltas.
|
||||
"""
|
||||
self.log_info("polling")
|
||||
self.prior_state = dict(self.state) # copying self.state
|
||||
self.last_error = None # non-None only when last poll errored
|
||||
try:
|
||||
self.update_from_datasource() # sets self.state
|
||||
tablenames = set(self.state.keys()) | set(self.prior_state.keys())
|
||||
for tablename in tablenames:
|
||||
# publishing full table and using prepush_processing to send
|
||||
# only deltas. Useful so that if policy engine subscribes
|
||||
# late (or dies and comes back up), DSE can automatically
|
||||
# send the full table.
|
||||
if tablename in self.state:
|
||||
self.publish(tablename, self.state[tablename])
|
||||
else:
|
||||
self.publish(tablename, set())
|
||||
except Exception as e:
|
||||
self.last_error = e
|
||||
LOG.exception("Datasource driver raised exception")
|
||||
|
||||
self.last_updated_time = datetime.datetime.now()
|
||||
self.number_of_updates += 1
|
||||
self.log_info("finished polling")
|
||||
|
||||
def request_refresh(self):
|
||||
"""Request a refresh of this service's data."""
|
||||
try:
|
||||
@@ -1183,7 +1189,7 @@ class DataSourceDriver(deepsix.deepSix):
|
||||
"""
|
||||
while self.running:
|
||||
if poll_time:
|
||||
if self.last_poll_time is None:
|
||||
if self.last_updated_time is None:
|
||||
self.poll()
|
||||
else:
|
||||
try:
|
||||
|
||||
@@ -27,7 +27,7 @@ def d6service(name, keys, inbox, datapath, args):
|
||||
return GlanceV2Driver(name, keys, inbox, datapath, args)
|
||||
|
||||
|
||||
class GlanceV2Driver(datasource_driver.DataSourceDriver,
|
||||
class GlanceV2Driver(datasource_driver.PollingDataSourceDriver,
|
||||
datasource_driver.ExecutionDriver):
|
||||
|
||||
IMAGES = "images"
|
||||
|
||||
@@ -25,7 +25,7 @@ def d6service(name, keys, inbox, datapath, args):
|
||||
return HeatV1Driver(name, keys, inbox, datapath, args)
|
||||
|
||||
|
||||
class HeatV1Driver(datasource_driver.DataSourceDriver,
|
||||
class HeatV1Driver(datasource_driver.PollingDataSourceDriver,
|
||||
datasource_driver.ExecutionDriver):
|
||||
|
||||
STACKS = "stacks"
|
||||
|
||||
@@ -28,7 +28,7 @@ def d6service(name, keys, inbox, datapath, args):
|
||||
return IronicDriver(name, keys, inbox, datapath, args)
|
||||
|
||||
|
||||
class IronicDriver(datasource_driver.DataSourceDriver,
|
||||
class IronicDriver(datasource_driver.PollingDataSourceDriver,
|
||||
datasource_driver.ExecutionDriver):
|
||||
CHASSISES = "chassises"
|
||||
NODES = "nodes"
|
||||
|
||||
@@ -25,7 +25,7 @@ def d6service(name, keys, inbox, datapath, args):
|
||||
return d
|
||||
|
||||
|
||||
class KeystoneDriver(datasource_driver.DataSourceDriver,
|
||||
class KeystoneDriver(datasource_driver.PollingDataSourceDriver,
|
||||
datasource_driver.ExecutionDriver):
|
||||
# Table names
|
||||
USERS = "users"
|
||||
|
||||
@@ -35,7 +35,7 @@ def d6service(name, keys, inbox, datapath, args):
|
||||
return MuranoDriver(name, keys, inbox, datapath, args)
|
||||
|
||||
|
||||
class MuranoDriver(datasource_driver.DataSourceDriver,
|
||||
class MuranoDriver(datasource_driver.PollingDataSourceDriver,
|
||||
datasource_driver.ExecutionDriver):
|
||||
OBJECTS = "objects"
|
||||
PARENT_TYPES = "parent_types"
|
||||
|
||||
@@ -26,7 +26,7 @@ def d6service(name, keys, inbox, datapath, args):
|
||||
return NeutronDriver(name, keys, inbox, datapath, args)
|
||||
|
||||
|
||||
class NeutronDriver(datasource_driver.DataSourceDriver,
|
||||
class NeutronDriver(datasource_driver.PollingDataSourceDriver,
|
||||
datasource_driver.ExecutionDriver):
|
||||
|
||||
NETWORKS = "networks"
|
||||
|
||||
@@ -28,7 +28,7 @@ def d6service(name, keys, inbox, datapath, args):
|
||||
return NeutronV2Driver(name, keys, inbox, datapath, args)
|
||||
|
||||
|
||||
class NeutronV2Driver(datasource_driver.DataSourceDriver,
|
||||
class NeutronV2Driver(datasource_driver.PollingDataSourceDriver,
|
||||
datasource_driver.ExecutionDriver):
|
||||
|
||||
NETWORKS = 'networks'
|
||||
|
||||
@@ -28,7 +28,7 @@ def d6service(name, keys, inbox, datapath, args):
|
||||
return NovaDriver(name, keys, inbox, datapath, args)
|
||||
|
||||
|
||||
class NovaDriver(datasource_driver.DataSourceDriver,
|
||||
class NovaDriver(datasource_driver.PollingDataSourceDriver,
|
||||
datasource_driver.ExecutionDriver):
|
||||
SERVERS = "servers"
|
||||
FLAVORS = "flavors"
|
||||
|
||||
@@ -45,7 +45,7 @@ def d6service(name, keys, inbox, datapath, args):
|
||||
return PlexxiDriver(name, keys, inbox, datapath, args)
|
||||
|
||||
|
||||
class PlexxiDriver(datasource_driver.DataSourceDriver,
|
||||
class PlexxiDriver(datasource_driver.PollingDataSourceDriver,
|
||||
datasource_driver.ExecutionDriver):
|
||||
HOSTS = "hosts"
|
||||
HOST_MACS = HOSTS + '.macs'
|
||||
|
||||
@@ -26,7 +26,7 @@ def d6service(name, keys, inbox, datapath, args):
|
||||
return SwiftDriver(name, keys, inbox, datapath, args)
|
||||
|
||||
|
||||
class SwiftDriver(datasource_driver.DataSourceDriver,
|
||||
class SwiftDriver(datasource_driver.PollingDataSourceDriver,
|
||||
datasource_driver.ExecutionDriver):
|
||||
|
||||
CONTAINERS = "containers"
|
||||
|
||||
@@ -34,7 +34,7 @@ def d6service(name, keys, inbox, datapath, args):
|
||||
return VCenterDriver(name, keys, inbox, datapath, args)
|
||||
|
||||
|
||||
class VCenterDriver(datasource_driver.DataSourceDriver,
|
||||
class VCenterDriver(datasource_driver.PollingDataSourceDriver,
|
||||
datasource_driver.ExecutionDriver):
|
||||
|
||||
HOSTS = "hosts"
|
||||
|
||||
@@ -32,7 +32,7 @@ def d6service(name, keys, inbox, datapath, args):
|
||||
return PerformanceTestDriver(name, keys, inbox, datapath, args)
|
||||
|
||||
|
||||
class PerformanceTestDriver(datasource_driver.DataSourceDriver):
|
||||
class PerformanceTestDriver(datasource_driver.PollingDataSourceDriver):
|
||||
TABLE = 'p'
|
||||
|
||||
# This is the most common per-value translator, so define it once here.
|
||||
|
||||
@@ -1552,17 +1552,6 @@ class TestDatasourceDriver(base.TestCase):
|
||||
'level10': ['level10']}
|
||||
self.assertEqual(expected_table_deps, driver._table_deps)
|
||||
|
||||
@mock.patch.object(eventlet, 'spawn')
|
||||
def test_init_consistence(self, mock_spawn):
|
||||
class TestDriver(datasource_driver.DataSourceDriver):
|
||||
def __init__(self):
|
||||
super(TestDriver, self).__init__('', '', None, None, None)
|
||||
self._init_end_start_poll()
|
||||
test_driver = TestDriver()
|
||||
mock_spawn.assert_called_once_with(test_driver.poll_loop,
|
||||
test_driver.poll_time)
|
||||
self.assertTrue(test_driver.initialized)
|
||||
|
||||
@mock.patch.object(eventlet, 'spawn')
|
||||
def test_init_consistence_with_exception(self, mock_spawn):
|
||||
class TestDriver(datasource_driver.DataSourceDriver):
|
||||
@@ -1685,6 +1674,22 @@ class TestDatasourceDriver(base.TestCase):
|
||||
self.assertEqual([], expected_ret)
|
||||
|
||||
|
||||
class TestPollingDataSourceDriver(base.TestCase):
|
||||
def setUp(self):
|
||||
super(TestPollingDataSourceDriver, self).setUp()
|
||||
|
||||
@mock.patch.object(eventlet, 'spawn')
|
||||
def test_init_consistence(self, mock_spawn):
|
||||
class TestDriver(datasource_driver.PollingDataSourceDriver):
|
||||
def __init__(self):
|
||||
super(TestDriver, self).__init__('', '', None, None, None)
|
||||
self._init_end_start_poll()
|
||||
test_driver = TestDriver()
|
||||
mock_spawn.assert_called_once_with(test_driver.poll_loop,
|
||||
test_driver.poll_time)
|
||||
self.assertTrue(test_driver.initialized)
|
||||
|
||||
|
||||
class TestExecutionDriver(base.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
|
||||
@@ -32,7 +32,7 @@ def d6service(name, keys, inbox, datapath, args):
|
||||
return TestDriver(name, keys, inbox, datapath, args)
|
||||
|
||||
|
||||
class TestDriver(datasource_driver.DataSourceDriver):
|
||||
class TestDriver(datasource_driver.PollingDataSourceDriver):
|
||||
def __init__(self, name='', keys='', inbox=None, datapath=None, args=None):
|
||||
if args is None:
|
||||
args = self._empty_openstack_credentials()
|
||||
|
||||
@@ -26,7 +26,7 @@ def d6service(name, keys, inbox, datapath, args):
|
||||
return FakeDataSource(name, keys, inbox, datapath, args)
|
||||
|
||||
|
||||
class FakeDataSource(datasource_driver.DataSourceDriver,
|
||||
class FakeDataSource(datasource_driver.PollingDataSourceDriver,
|
||||
datasource_driver.ExecutionDriver):
|
||||
|
||||
value_trans = {'translation-type': 'VALUE'}
|
||||
|
||||
Reference in New Issue
Block a user