Prepare Table Model API for the new distributed architecture

Table Model API uses a python referece to both policy engine class
and datasource_manager class to call its methods. In the new architecture,
API model needs to use RPC calling methods in policy engine and
dataosurce_manager.

This patch changes Table Model API to use rpc style method calling.

Patially-Implements: blueprint dist-api-rpcify-table

Change-Id: I66bbda3b6a5a6f7b76c0c95bf3cb1ed9e3cc6640
This commit is contained in:
Masahito Muroi 2015-08-23 22:15:35 +09:00
parent 959525df52
commit cd18b5e802
10 changed files with 268 additions and 127 deletions

View File

@ -12,6 +12,12 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from congress.api import webservice
LOG = logging.getLogger(__name__)
def create_table_dict(tablename, schema):
# FIXME(arosen): Should not be returning None
@ -20,3 +26,15 @@ def create_table_dict(tablename, schema):
for x in schema[tablename]]
return {'table_id': tablename,
'columns': cols}
def get_id_from_context(context, datasource_mgr, policy_engine):
if 'ds_id' in context:
return datasource_mgr, context.get('ds_id')
elif 'policy_id' in context:
return policy_engine, context.get('policy_id')
else:
msg = "Internal error: context %s should have included " % str(context)
"either ds_id or policy_id"
LOG.exception(msg)
raise webservice.DataModelException('404', msg)

View File

@ -15,9 +15,9 @@
from oslo_log import log as logging
from congress.api import webservice
from congress.api import api_utils
from congress.dse import deepsix
from congress.managers import datasource as datasource_manager
from congress import exception
LOG = logging.getLogger(__name__)
@ -29,12 +29,19 @@ def d6service(name, keys, inbox, datapath, args):
class TableModel(deepsix.deepSix):
"""Model for handling API requests about Tables."""
def __init__(self, name, keys, inbox=None, dataPath=None,
policy_engine=None):
policy_engine=None, datasource_mgr=None):
super(TableModel, self).__init__(name, keys, inbox=inbox,
dataPath=dataPath)
self.datasource_mgr = datasource_manager.DataSourceManager()
self.datasource_mgr = datasource_mgr
self.engine = policy_engine
def rpc(self, caller, name, *args, **kwargs):
func = getattr(caller, name, None)
if func:
return func(*args, **kwargs)
raise exception.CongressException('method: %s is not defined in %s' %
(name, caller.__name__))
def get_item(self, id_, params, context=None):
"""Retrieve item with id id_ from model.
@ -48,41 +55,16 @@ class TableModel(deepsix.deepSix):
The matching item or None if item with id_ does not exist.
"""
# table defined by data-source
if 'ds_id' in context:
datasource_id = context['ds_id']
if datasource_id in self.engine.d6cage.getservices().keys():
datasource = self.engine.d6cage.getservice(name=datasource_id)
else:
datasource = self.engine.d6cage.getservice(id_=datasource_id)
caller, source_id = api_utils.get_id_from_context(context,
self.datasource_mgr,
self.engine)
if not datasource:
LOG.info("data-source %s not found", datasource_id)
return None
tablename = self.rpc(caller, 'get_tablename', source_id, id_)
if tablename:
return {'id': tablename}
service_obj = self.engine.d6cage.service_object(datasource['name'])
tablename = context['table_id']
if tablename not in service_obj.state:
LOG.info("data-source %s does not have table %s",
datasource_id, tablename)
return None
return {'id': id_}
# table defined by policy
elif 'policy_id' in context:
policy_name = context['policy_id']
if policy_name not in self.engine.theory:
return None
tables = self.engine.theory[policy_name].tablenames()
tablename = context['table_id']
if tablename not in tables:
return None
return {'id': id_}
# should not happen
else:
raise Exception("Internal error: context %s should have included "
"either ds_id or policy_id" % str(context))
LOG.info('source id %s or table id %s is not found',
source_id, id_)
def get_items(self, params, context=None):
"""Get items in model.
@ -98,38 +80,17 @@ class TableModel(deepsix.deepSix):
"""
LOG.info('get_items has context %s', context)
# data-source
if 'ds_id' in context:
# FIXME(arosen): this file needs refactoring.
datasource = context.get('ds_id')
try:
datasource = self.datasource_mgr.get_datasource(
datasource)
except datasource_manager.DatasourceNotFound as e:
raise webservice.DataModelException(e.code, e.message)
caller, source_id = api_utils.get_id_from_context(context,
self.datasource_mgr,
self.engine)
service_name = context['ds_id']
service_obj = self.engine.d6cage.service_object(datasource['name'])
if service_obj is None:
LOG.info("data-source %s not found", service_name)
return []
LOG.info("data-source %s found", service_name)
results = [{'id': x} for x in service_obj.state.keys()]
tablenames = self.rpc(caller, 'get_tablenames', source_id)
# when the source_id doesn't have any table, 'tablenames' is set([])
# when the source_id doesn't exist 'tablenames' is None
if isinstance(tablenames, set):
return {'results': [{'id': x} for x in tablenames]}
# policy
elif 'policy_id' in context:
policy_name = context['policy_id']
if policy_name not in self.engine.theory:
LOG.info("policy %s not found", policy_name)
return None
results = [{'id': x}
for x in self.engine.theory[policy_name].tablenames()]
# should not happen
else:
LOG.error("Blackhole for table context %s", context)
results = []
return {'results': results}
LOG.info('source id %s not found', source_id)
# Tables can only be created/updated/deleted by writing policy
# or by adding new data sources. Once we have internal data sources

View File

@ -584,6 +584,19 @@ class DataSourceDriver(deepsix.deepSix):
cls._get_schema(trans, all_schemas)
return all_schemas
@classmethod
def get_tablename(cls, table_id):
"""Get a table name."""
return table_id if table_id in cls.get_tablenames() else None
@classmethod
def get_tablenames(cls):
"""Get a list of table names.
Returns list of table names the datasource has
"""
return set(cls.get_schema().keys())
def get_column_map(self, tablename):
"""Get mapping of column name to column's integer position.

View File

@ -101,7 +101,8 @@ def create(rootdir, config_override=None):
name="api-table",
moduleName="API-table",
description="API-table DSE instance",
args={'policy_engine': engine})
args={'policy_engine': engine,
'datasource_mgr': datasource_mgr})
# add row api
api_path = os.path.join(src_path, "api/row_model.py")

View File

@ -181,6 +181,45 @@ class DataSourceManager(object):
obj = importutils.import_class(driver['module'])
return obj.get_schema()
@classmethod
def load_module_object(cls, datasource_id_or_name):
datasource = datasources_db.get_datasource(datasource_id_or_name)
# Ideally speaking, it should change datasource_db.get_datasource() to
# be able to retrieve datasource info from db at once. The datasource
# table and the method, however, will be removed in the new
# architecture, so it use this way. Supporting both name and id is
# a backward compatibility.
if not datasource:
datasource = (datasources_db.
get_datasource_by_name(datasource_id_or_name))
if not datasource:
return None
driver = cls.get_driver_info(datasource.driver)
obj = importutils.import_class(driver['module'])
return obj
@classmethod
def get_tablename(cls, datasource_id_or_name, table_id):
obj = cls.load_module_object(datasource_id_or_name)
if obj:
return obj.get_tablename(table_id)
else:
return None
@classmethod
def get_tablenames(cls, datasource_id_or_name):
'''The method to get datasource tablename.'''
# In the new architecture, table model would call datasource_driver's
# get_tablenames() directly using RPC
obj = cls.load_module_object(datasource_id_or_name)
if obj:
return obj.get_tablenames()
else:
return None
@classmethod
def delete_datasource(cls, datasource_id, update_db=True):
datasource = cls.get_datasource(datasource_id)

View File

@ -661,12 +661,32 @@ class Runtime (object):
return self._simulate_obj(query, theory, sequence, action_theory,
delta, trace)
def tablenames(self, body_only=False, include_builtin=False):
def get_tablename(self, th_name, table_name):
tables = self.get_tablenames(th_name)
# when the policy doesn't have any rule 'tables' is set([])
# when the policy doesn't exist 'tables' is None
if tables and table_name in tables:
return table_name
def get_tablenames(self, th_name):
if th_name in self.theory.keys():
return self.tablenames(theory_name=th_name)
def tablenames(self, body_only=False, include_builtin=False,
theory_name=None):
"""Return tablenames occurring in some theory."""
tables = set()
if theory_name:
th = self.theory.get(theory_name, None)
if th:
tables |= set(th.tablenames(body_only=body_only,
include_builtin=include_builtin))
return tables
for th in self.theory.values():
tables |= set(th.tablenames(
body_only=body_only, include_builtin=include_builtin))
tables |= set(th.tablenames(body_only=body_only,
include_builtin=include_builtin))
return tables
def reserved_tablename(self, name):

View File

@ -14,6 +14,7 @@
# limitations under the License.
from congress.api import api_utils
from congress.api import webservice
from congress.tests import base
@ -30,3 +31,26 @@ class TestAPIUtils(base.TestCase):
{'name': 'name', 'description': 'None'}]}
result = api_utils.create_table_dict(table_name, schema)
self.assertEqual(expected, result)
def test_get_id_from_context_ds_id(self):
context = {'ds_id': 'datasource id'}
expected = ('datasource-mgr', 'datasource id')
result = api_utils.get_id_from_context(context,
'datasource-mgr',
'policy-engine')
self.assertEqual(expected, result)
def test_get_id_from_context_policy_id(self):
context = {'policy_id': 'policy id'}
expected = ('policy-engine', 'policy id')
result = api_utils.get_id_from_context(context,
'datasource-mgr',
'policy-engine')
self.assertEqual(expected, result)
def test_get_id_from_context_with_invalid_context(self):
context = {'invalid_id': 'invalid id'}
self.assertRaises(webservice.DataModelException,
api_utils.get_id_from_context,
context, 'datasource-mgr', 'policy-engine')

View File

@ -13,11 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from oslo_config import cfg
from congress.api import table_model
from congress.api import webservice
from congress import harness
from congress.managers import datasource as datasource_manager
from congress.tests import base
@ -37,86 +35,62 @@ class TestTableModel(base.SqlTestCase):
# cage so we don't have to create one here.
self.cage = harness.create(helper.root_path())
self.datasource_mgr = datasource_manager.DataSourceManager
self.datasource_mgr.validate_configured_drivers()
self.ds_mgr = datasource_manager.DataSourceManager
self.ds_mgr.validate_configured_drivers()
req = {'driver': 'fake_datasource',
'name': 'fake_datasource'}
req['config'] = {'auth_url': 'foo',
'username': 'foo',
'password': 'password',
'tenant_name': 'foo'}
self.datasource = self.datasource_mgr.add_datasource(req)
self.datasource = self.ds_mgr.add_datasource(req)
self.engine = self.cage.service_object('engine')
self.api_rule = self.cage.service_object('api-rule')
self.table_model = table_model.TableModel("table_model", {},
policy_engine=self.engine)
policy_engine=self.engine,
datasource_mgr=self.ds_mgr)
def tearDown(self):
super(TestTableModel, self).tearDown()
def test_get_datasource_table_with_id(self):
context = {'ds_id': self.datasource['id'],
'table_id': 'fake-table'}
fake_obj = helper.FakeServiceObj()
fake_obj.state = {'fake-table': set([('data1', 'data2')])}
expected_ret = {'id': self.datasource['id']}
self.engine.d6cage.service_object = mock.Mock()
self.engine.d6cage.service_object.return_value = fake_obj
ret = self.table_model.get_item(self.datasource['id'], {}, context)
'table_id': 'fake_table'}
expected_ret = {'id': 'fake_table'}
ret = self.table_model.get_item('fake_table', {}, context)
self.assertEqual(expected_ret, ret)
def test_get_datasource_table_with_name(self):
context = {'ds_id': self.datasource['name'],
'table_id': 'fake-table'}
fake_obj = helper.FakeServiceObj()
fake_obj.state = {'fake-table': set([('data1', 'data2')])}
expected_ret = {'id': self.datasource['id']}
self.engine.d6cage.service_object = mock.Mock()
self.engine.d6cage.service_object.return_value = fake_obj
ret = self.table_model.get_item(self.datasource['id'],
{}, context)
'table_id': 'fake_table'}
expected_ret = {'id': 'fake_table'}
ret = self.table_model.get_item('fake_table', {}, context)
self.assertEqual(expected_ret, ret)
def test_get_invalid_datasource(self):
context = {'ds_id': 'invalid-id',
'table_id': 'fake-table'}
fake_obj = helper.FakeServiceObj()
fake_obj.state = {'fake-table': set([('data1', 'data2')])}
'table_id': 'fake_table'}
expected_ret = None
self.engine.d6cage.service_object = mock.Mock()
self.engine.d6cage.service_object.return_value = fake_obj
ret = self.table_model.get_item(self.datasource['id'], {}, context)
ret = self.table_model.get_item('fake_table', {}, context)
self.assertEqual(expected_ret, ret)
def test_get_invalid_datasource_table(self):
context = {'ds_id': self.datasource['id'],
'table_id': 'invalid-table'}
fake_obj = helper.FakeServiceObj()
fake_obj.state = {'fake-table': set([('data1', 'data2')])}
expected_ret = None
self.engine.d6cage.service_object = mock.Mock()
self.engine.d6cage.service_object.return_value = fake_obj
ret = self.table_model.get_item(self.datasource['id'], {}, context)
ret = self.table_model.get_item('invalid-table', {}, context)
self.assertEqual(expected_ret, ret)
def test_get_policy_table(self):
context = {'policy_id': self.engine.DEFAULT_THEORY,
'table_id': 'p'}
expected_ret = {'id': self.engine.DEFAULT_THEORY}
expected_ret = {'id': 'p'}
self.api_rule.add_item({'rule': 'p(x) :- q(x)'}, {}, context=context)
self.api_rule.add_item({'rule': 'q(x) :- r(x)'}, {}, context=context)
ret = self.table_model.get_item(self.engine.DEFAULT_THEORY, {},
context)
ret = self.table_model.get_item('p', {}, context)
self.assertEqual(expected_ret, ret)
def test_get_invalid_policy(self):
@ -148,15 +122,8 @@ class TestTableModel(base.SqlTestCase):
self.assertEqual(expected_ret, ret)
def test_get_items_datasource_table(self):
context = {'ds_id': self.datasource['id'],
'table_id': 'fake-table'}
fake_obj = helper.FakeServiceObj()
fake_obj.state = {'fake-table1': set([('data1-1', 'data1-2')]),
'fake-table2': set([('data2-1', 'data2-2')])}
expected_ret = {'results': [{'id': x} for x in fake_obj.state.keys()]}
self.engine.d6cage.service_object = mock.Mock()
self.engine.d6cage.service_object.return_value = fake_obj
context = {'ds_id': self.datasource['id']}
expected_ret = {'results': [{'id': 'fake_table'}]}
ret = self.table_model.get_items({}, context)
self.assertEqual(expected_ret, ret)
@ -164,12 +131,9 @@ class TestTableModel(base.SqlTestCase):
def test_get_items_invalid_datasource(self):
context = {'ds_id': 'invalid-id',
'table_id': 'fake-table'}
fake_obj = helper.FakeServiceObj()
fake_obj.state = {'fake-table1': set([('data1-1', 'data1-2')]),
'fake-table2': set([('data2-1', 'data2-2')])}
self.assertRaises(webservice.DataModelException,
self.table_model.get_items, {}, context)
ret = self.table_model.get_items({}, context)
self.assertIsNone(ret)
def _get_id_list_from_return(self, result):
return [r['id'] for r in result['results']]

View File

@ -1234,6 +1234,82 @@ class TestDatasourceDriver(base.TestCase):
self.assertTrue(schema['testtable'] == ('id_col', 'key'))
self.assertTrue(schema['subtable'] == ('parent_key', 'val'))
def test_get_tablename(self):
class TestDriver(datasource_driver.DataSourceDriver):
translator1 = {
'translation-type': 'HDICT',
'table-name': 'table-name1',
'selector-type': 'DICT_SELECTOR',
'field-translators':
({'fieldname': 'col1', 'translator': self.val_trans},
{'fieldname': 'col2', 'translator': self.val_trans})
}
TRANSLATORS = [translator1]
def __init__(self):
super(TestDriver, self).__init__('', '', None, None, None)
expected_ret = 'table-name1'
ret = TestDriver().get_tablename('table-name1')
self.assertEqual(expected_ret, ret)
def test_get_tablenames(self):
class TestDriver(datasource_driver.DataSourceDriver):
translator1 = {
'translation-type': 'HDICT',
'table-name': 'table-name1',
'selector-type': 'DICT_SELECTOR',
'field-translators':
({'fieldname': 'col1', 'translator': self.val_trans},
{'fieldname': 'col2', 'translator': self.val_trans})
}
translator2 = {
'translation-type': 'HDICT',
'table-name': 'table-name2',
'selector-type': 'DICT_SELECTOR',
'field-translators':
({'fieldname': 'col1', 'translator': self.val_trans},
{'fieldname': 'col2', 'translator': self.val_trans})
}
TRANSLATORS = [translator1, translator2]
def __init__(self):
super(TestDriver, self).__init__('', '', None, None, None)
expected_ret = ['table-name1', 'table-name2']
ret = TestDriver().get_tablenames()
self.assertEqual(set(expected_ret), set(ret))
def test_nested_get_tables(self):
class TestDriver(datasource_driver.DataSourceDriver):
translator2 = {
'translation-type': 'HDICT',
'table-name': 'table-name2',
'selector-type': 'DICT_SELECTOR',
'field-translators':
({'fieldname': 'col1', 'translator': self.val_trans},
{'fieldname': 'col2', 'translator': self.val_trans})
}
translator1 = {
'translation-type': 'HDICT',
'table-name': 'table-name1',
'selector-type': 'DICT_SELECTOR',
'field-translators':
({'fieldname': 'col1', 'translator': self.val_trans},
{'fieldname': 'col2', 'translator': translator2})
}
TRANSLATORS = [translator1]
def __init__(self):
super(TestDriver, self).__init__('', '', None, None, None)
expected_ret = ['table-name1', 'table-name2']
ret = TestDriver().get_tablenames()
self.assertEqual(set(expected_ret), set(ret))
def test_update_state_on_changed(self):
mocked_self = mock.MagicMock()
mocked_self.raw_state = dict()

View File

@ -174,6 +174,18 @@ class TestRuntime(base.TestCase):
# double-check that the error didn't result in an inconsistent state
self.assertEqual(run.select('q(5)'), '')
def test_get_tablename(self):
run = agnostic.Runtime()
run.create_policy('test')
run.insert('p(x) :- q(x)')
run.insert('q(x) :- r(x)')
run.insert('execute[nova:disconnect(x, y)] :- s(x, y)')
tables = run.get_tablename('test', 'p')
self.assertEqual(set(tables), set(['p']))
tables = run.get_tablename('test', 't')
self.assertIsNone(tables)
def test_tablenames(self):
run = agnostic.Runtime()
run.create_policy('test')
@ -207,6 +219,19 @@ class TestRuntime(base.TestCase):
'nonrecursive')
mock_delete.assert_called_once_with(policy_name)
def test_tablenames_theory_name(self):
run = agnostic.Runtime()
run.create_policy('test')
run.create_policy('test2')
run.insert('p(x) :- q(x)', 'test')
run.insert('r(x) :- s(x)', 'test2')
tables = run.tablenames()
self.assertEqual(set(tables), set(['p', 'q', 'r', 's']))
tables = run.tablenames(theory_name='test')
self.assertEqual(set(tables), set(['p', 'q']))
class TestArity(base.TestCase):
def test_same_table_diff_policies(self):