Introduce 'lazy_tables' flag to nova datasource driver
Usually, datasources pull data for all tables defined in the driver. It sometime causes a performance issue when no policy rule refers the table but the table takes long time to pull data. This patch introduces 'lazy_tables' flag to PollingDataSourceDriver which suspends to pull data until another service subscribes the table. Admin can specify the flag only when they create a new datasource and only for dse2 now. This allows nova_driver to accept lazy_tables flag as a first implementation. Sample command: $ openstack congress datasource create --config lazy_tables=["flavors"] \ --config <some option for openstack> nova nova Patially implements blueprint: selectable-translator Change-Id: I525c374e4da82dd5704c480f11762de291367519
This commit is contained in:
parent
28812457c0
commit
f8a33953e9
@ -320,8 +320,7 @@ class DataSourceDriver(deepsix.deepSix):
|
||||
self._table_deps = {}
|
||||
|
||||
# setup translators here for datasource drivers that set TRANSLATORS.
|
||||
for translator in self.TRANSLATORS:
|
||||
self.register_translator(translator)
|
||||
self.initialize_translators()
|
||||
|
||||
# Make sure all data structures above are set up *before* calling
|
||||
# this because it will publish info to the bus.
|
||||
@ -445,6 +444,10 @@ class DataSourceDriver(deepsix.deepSix):
|
||||
raise exception.InvalidTranslationType(msg)
|
||||
self._validate_by_translation_type(translator, related_tables)
|
||||
|
||||
def initialize_translators(self):
|
||||
for translator in self.TRANSLATORS:
|
||||
self.register_translator(translator)
|
||||
|
||||
def register_translator(self, translator):
|
||||
"""Registers translator with congress and validates its schema."""
|
||||
related_tables = []
|
||||
@ -1267,6 +1270,15 @@ class PollingDataSourceDriver(DataSourceDriver):
|
||||
|
||||
self.poll_time = poll_time
|
||||
|
||||
self.lazy_tables = args.get('lazy_tables', [])
|
||||
self.validate_lazy_tables()
|
||||
|
||||
# a dict for update method
|
||||
# key: root table name
|
||||
# value: update method
|
||||
# ex: {'servers': <pointer for the updating method>}
|
||||
self.update_methods = {}
|
||||
|
||||
self.refresh_request_queue = eventlet.Queue(maxsize=1)
|
||||
self.worker_greenthread = None
|
||||
|
||||
@ -1284,6 +1296,37 @@ class PollingDataSourceDriver(DataSourceDriver):
|
||||
self.poll_time)
|
||||
self.initialized = True
|
||||
|
||||
def add_update_method(self, method, translator):
|
||||
if translator[self.TABLE_NAME] in self.update_methods:
|
||||
raise exception.Conflict('A method has already registered for '
|
||||
'the table %s.' %
|
||||
translator[self.TABLE_NAME])
|
||||
self.update_methods[translator[self.TABLE_NAME]] = method
|
||||
|
||||
def validate_lazy_tables(self):
|
||||
"""Check all the lazy_tables is root table name."""
|
||||
root_table_names = [t[self.TABLE_NAME] for t in self.TRANSLATORS]
|
||||
invalid_table = [t for t in self.lazy_tables
|
||||
if t not in root_table_names]
|
||||
if invalid_table:
|
||||
LOG.info('Invalid table name in lazy_tables config: %s')
|
||||
msg = ("Invalid lazy tables: %s. Accepted tables for datasource "
|
||||
"%s are %s." % (invalid_table, self.name, root_table_names))
|
||||
raise exception.BadRequest(msg)
|
||||
|
||||
def initialize_translators(self):
|
||||
"""Register translators for polling and define tables.
|
||||
|
||||
This registers a translator and defines tables for subscribers.
|
||||
When a table name in root translator is specified as a lazy
|
||||
it skips registering the translator and doesn't define the table.
|
||||
"""
|
||||
for translator in self.TRANSLATORS:
|
||||
if translator[self.TABLE_NAME] not in self.lazy_tables:
|
||||
LOG.debug('register translator: %s'
|
||||
% translator[self.TABLE_NAME])
|
||||
self.register_translator(translator)
|
||||
|
||||
def start(self):
|
||||
super(PollingDataSourceDriver, self).start()
|
||||
if not self.worker_greenthread:
|
||||
@ -1304,9 +1347,33 @@ class PollingDataSourceDriver(DataSourceDriver):
|
||||
self.worker_greenthread = None
|
||||
self.log_info("killed worker thread")
|
||||
|
||||
def get_snapshot(self, table_name):
|
||||
"""Return a snapshot of table."""
|
||||
if (table_name in [t[self.TABLE_NAME] for t in self.TRANSLATORS] and
|
||||
table_name not in self._table_deps):
|
||||
new_translator = next(t for t in self.TRANSLATORS
|
||||
if t[self.TABLE_NAME] == table_name)
|
||||
self.register_translator(new_translator)
|
||||
self.update_methods[table_name]()
|
||||
|
||||
return super(PollingDataSourceDriver, self).get_snapshot(table_name)
|
||||
|
||||
def get_row_data(self, table_id, *args, **kwargs):
|
||||
if table_id not in self.state and table_id in self.lazy_tables:
|
||||
raise exception.LazyTable(lazy_table=table_id)
|
||||
|
||||
return super(PollingDataSourceDriver, self).get_row_data(table_id,
|
||||
*args,
|
||||
**kwargs)
|
||||
|
||||
def get_last_updated_time(self):
|
||||
return self.last_updated_time
|
||||
|
||||
def update_from_datasource(self):
|
||||
for registered_table in self._table_deps:
|
||||
LOG.debug('update table %s.' % registered_table)
|
||||
self.update_methods[registered_table]()
|
||||
|
||||
# Note(thread-safety): blocking function
|
||||
def poll(self):
|
||||
"""Periodically called to update new info.
|
||||
|
@ -187,6 +187,7 @@ class NovaDriver(datasource_driver.PollingDataSourceDriver,
|
||||
'e.g. meta1=val1 meta2=val2'}],
|
||||
"A wrapper for servers.set_meta()")
|
||||
self.add_executable_client_methods(self.nova_client, 'novaclient.v2.')
|
||||
self.initialize_update_methods()
|
||||
self._init_end_start_poll()
|
||||
|
||||
@staticmethod
|
||||
@ -197,19 +198,36 @@ class NovaDriver(datasource_driver.PollingDataSourceDriver,
|
||||
'OpenStack Compute aka nova.')
|
||||
result['config'] = ds_utils.get_openstack_required_config()
|
||||
result['config']['api_version'] = constants.OPTIONAL
|
||||
result['config']['lazy_tables'] = constants.OPTIONAL
|
||||
result['secret'] = ['password']
|
||||
return result
|
||||
|
||||
def update_from_datasource(self):
|
||||
servers = self.nova_client.servers.list(
|
||||
detailed=True, search_opts={"all_tenants": 1})
|
||||
self._translate_servers(servers)
|
||||
self._translate_flavors(self.nova_client.flavors.list())
|
||||
self._translate_hosts(self.nova_client.hosts.list())
|
||||
self._translate_floating_ips(self.nova_client.floating_ips.list())
|
||||
self._translate_services(self.nova_client.services.list())
|
||||
self._translate_availability_zones(
|
||||
def initialize_update_methods(self):
|
||||
servers_method = lambda: self._translate_servers(
|
||||
self.nova_client.servers.list(
|
||||
detailed=True, search_opts={"all_tenants": 1}))
|
||||
self.add_update_method(servers_method, self.servers_translator)
|
||||
|
||||
flavors_method = lambda: self._translate_flavors(
|
||||
self.nova_client.flavors.list())
|
||||
self.add_update_method(flavors_method, self.flavors_translator)
|
||||
|
||||
hosts_method = lambda: self._translate_hosts(
|
||||
self.nova_client.hosts.list())
|
||||
self.add_update_method(hosts_method, self.hosts_translator)
|
||||
|
||||
floating_ips_method = lambda: self._translate_floating_ips(
|
||||
self.nova_client.floating_ips.list())
|
||||
self.add_update_method(floating_ips_method,
|
||||
self.floating_ips_translator)
|
||||
|
||||
services_method = lambda: self._translate_services(
|
||||
self.nova_client.services.list())
|
||||
self.add_update_method(services_method, self.services_translator)
|
||||
|
||||
az_method = lambda: self._translate_availability_zones(
|
||||
self.nova_client.availability_zones.list())
|
||||
self.add_update_method(az_method, self.availability_zones_translator)
|
||||
|
||||
@ds_utils.update_state_on_changed(SERVERS)
|
||||
def _translate_servers(self, obj):
|
||||
|
@ -156,6 +156,10 @@ class DanglingReference(Conflict):
|
||||
pass
|
||||
|
||||
|
||||
class LazyTable(BadRequest):
|
||||
msg_fmt = _("table %(lazy_table)s is a lazy table and is not subscribed.")
|
||||
|
||||
|
||||
# NOTE(thinrichs): The following represent different kinds of
|
||||
# exceptions: the policy compiler and the policy runtime, respectively.
|
||||
class PolicyException(CongressException):
|
||||
|
@ -30,6 +30,7 @@ from congress.db import db_ds_table_data
|
||||
from congress import exception
|
||||
from congress.tests import base
|
||||
from congress.tests.datasources import util
|
||||
from congress.tests import fake_datasource
|
||||
from congress.tests import helper
|
||||
|
||||
|
||||
@ -1759,6 +1760,58 @@ class TestPollingDataSourceDriver(base.TestCase):
|
||||
mock_kill.assert_called_once_with(dummy_thread)
|
||||
self.assertIsNone(test_driver.worker_greenthread)
|
||||
|
||||
def test_evaluate_lazy_table(self):
|
||||
args = {'lazy_tables': ['fake_table']}
|
||||
test_driver = fake_datasource.FakeDataSource(args=args)
|
||||
|
||||
self.assertTrue('fake_table' not in test_driver._table_deps)
|
||||
test_driver.update_from_datasource()
|
||||
self.assertEqual(test_driver.update_number, 0)
|
||||
|
||||
test_driver.get_snapshot('fake_table')
|
||||
|
||||
self.assertTrue('fake_table' in test_driver._table_deps)
|
||||
test_driver.update_from_datasource()
|
||||
# update happens twice before the check. First one is in get_snapshot.
|
||||
self.assertEqual(test_driver.update_number, 2)
|
||||
|
||||
def test_add_update_method(self):
|
||||
class TestDriver(datasource_driver.PollingDataSourceDriver):
|
||||
test_translator = {
|
||||
'table-name': 'test'
|
||||
}
|
||||
|
||||
def __init__(self):
|
||||
super(TestDriver, self).__init__('', '', None, None, None)
|
||||
self.add_update_method(self.update_method,
|
||||
self.test_translator)
|
||||
|
||||
def update_method(self):
|
||||
pass
|
||||
|
||||
test_driver = TestDriver()
|
||||
self.assertEqual(test_driver.update_methods['test'],
|
||||
test_driver.update_method)
|
||||
|
||||
def test_add_duplicated_update_method(self):
|
||||
class TestDriver(datasource_driver.PollingDataSourceDriver):
|
||||
test_translator = {
|
||||
'table-name': 'test'
|
||||
}
|
||||
|
||||
def __init__(self):
|
||||
super(TestDriver, self).__init__('', '', None, None, None)
|
||||
self.add_update_method(self.update_method,
|
||||
self.test_translator)
|
||||
|
||||
def update_method(self):
|
||||
pass
|
||||
|
||||
test_driver = TestDriver()
|
||||
self.assertRaises(exception.Conflict, test_driver.add_update_method,
|
||||
test_driver.update_method,
|
||||
test_driver.test_translator)
|
||||
|
||||
|
||||
class TestPushedDriver(base.SqlTestCase):
|
||||
class TestDriver(datasource_driver.PushedDataSourceDriver):
|
||||
|
@ -53,6 +53,9 @@ class FakeDataSource(datasource_driver.PollingDataSourceDriver,
|
||||
[{'name': 'server_id',
|
||||
'description': 'server to act'}],
|
||||
'fake action')
|
||||
|
||||
self.update_number = 0
|
||||
self.initialize_update_method()
|
||||
self.exec_history = []
|
||||
self._init_end_start_poll()
|
||||
|
||||
@ -65,8 +68,12 @@ class FakeDataSource(datasource_driver.PollingDataSourceDriver,
|
||||
result['secret'] = ['password']
|
||||
return result
|
||||
|
||||
def update_from_datasource(self):
|
||||
def initialize_update_method(self):
|
||||
self.add_update_method(self.update_fake_table, self.fake_translator)
|
||||
|
||||
def update_fake_table(self):
|
||||
LOG.info("fake:: update_from_datasource")
|
||||
self.update_number += 1
|
||||
|
||||
def execute(self, action, action_args):
|
||||
self.exec_history.append((action, action_args))
|
||||
|
@ -172,9 +172,10 @@ class TestHA(manager_congress.ScenarioPolicyBase):
|
||||
item = {'id': None,
|
||||
'name': 'fake',
|
||||
'driver': 'fake_datasource',
|
||||
'config': '{"username":"fakeu", "tenant_name": "faket",' +
|
||||
'"password": "fakep",' +
|
||||
'"auth_url": "http://127.0.0.1:5000/v2"}',
|
||||
'config': {"username": "fakeu",
|
||||
"tenant_name": "faket",
|
||||
"password": "fakep",
|
||||
"auth_url": "http://127.0.0.1:5000/v2"},
|
||||
'description': 'bar',
|
||||
'enabled': True}
|
||||
ret = client.create_datasource(item)
|
||||
|
Loading…
Reference in New Issue
Block a user