JSON ingester deployment enhancements

Allows json ingester config YAMLs to use the !ref tag to reference
previously defined reusable structures, which makes deployment much
more convenient.

Allows the devstack plugin to optionally enable the JSON ingester feature.

Renamed and reorganized the JSON ingester config options.

Adds CI jobs (py2 and py3) which set up the JSON ingester.

partially-implements: bp json-data-model
Change-Id: I6391224c99249d16fe943b8f00fe12d1e6b7d8e6
This commit is contained in:
Eric Kao 2019-03-01 14:40:37 -08:00
parent 6244272d89
commit 5da6e463ff
14 changed files with 184 additions and 82 deletions

View File

@ -109,7 +109,9 @@
parent: congress-tempest-replicated
voting: false
vars:
database: postgresql
devstack_services:
mysql: false
postgresql: true
- job:
name: congress-tempest-py2-mysql
@ -124,18 +126,37 @@
database: mysql
- job:
name: congress-tempest-py2-postgresql
parent: congress-tempest-py2
name: congress-tempest-py2-JsonIngester
parent: congress-tempest-base
voting: false
vars:
database: postgresql
devstack_localrc:
ENABLE_CONGRESS_JSON: true
- job:
name: congress-tempest-py3-JsonIngester
parent: congress-tempest-py2-JsonIngester
voting: false
vars:
devstack_localrc:
USE_PYTHON3: true
- job:
name: congress-tempest-py2-postgresql
parent: congress-tempest-base
voting: false
vars:
devstack_services:
mysql: false
postgresql: true
- job:
name: congress-tempest-py3-postgresql
parent: congress-tempest-py3
parent: congress-tempest-py2-postgresql
voting: false
vars:
database: postgresql
devstack_localrc:
USE_PYTHON3: true
- project:
templates:
@ -153,6 +174,8 @@
- congress-tempest-py2-mysql
- congress-tempest-py3-mysql
- congress-tempest-replicated-postgresql
- congress-tempest-py2-JsonIngester
- congress-tempest-py3-JsonIngester
# Note: the above jobs most likely provide sufficient coverage
# - congress-tempest-py2-postgresql
# - congress-tempest-py3-postgresql

View File

@ -154,7 +154,7 @@ class APIRouterV1(object):
resource_mgr.register_handler(webhook_collection_handler)
# Setup /v1/data-sources/<ds_id>/tables/<table_name>/webhook
if cfg.CONF.json_ingester.json_ingester_experimental:
if cfg.CONF.json_ingester.enable:
json_ingester_webhook_path = \
"%s/tables/(?P<table_name>[^/]+)/webhook" % ds_path
json_ingester_webhook_collection_handler = \

View File

@ -107,20 +107,19 @@ cfg.CONF.register_opts(dse_opts, group='dse')
# json ingester opts
json_opts = [
cfg.BoolOpt('json_ingester_experimental', default=False,
# Master switch for the (experimental) JSON ingester feature.
# Note the trailing space after "JSON": adjacent string literals are
# concatenated verbatim, so without it the help text reads "JSONingester".
cfg.BoolOpt('enable', default=False,
            help='Set the flag to True to enable the experimental JSON '
                 'ingester feature.'),
cfg.StrOpt('config_path', default='/etc/congress/json_ingesters',
help=_('The directory for JSON ingester config files.')),
cfg.StrOpt('postgres_host', default='localhost',
help=_('Host name/address of the PostgreSQL server for '
'JSON ingestion.')),
cfg.StrOpt('postgres_database', default='congress',
help=_('Name of PostgreSQL database for JSON ingestion.')),
cfg.StrOpt('postgres_user', default='postgres',
help=_('PostgreSQL user name for JSON ingestion.')),
cfg.StrOpt('postgres_password',
help=_('PostgreSQL password for JSON ingestion.')),
cfg.StrOpt('config_reusables_path',
default='/etc/congress/config_reusables.yaml',
help=_('The path to reusables YAML file for JSON '
'ingesters config.')),
cfg.StrOpt('db_connection',
help='The PostgreSQL connection string to use to connect to '
'the database.',
secret=True),
]
# Register dse opts

View File

@ -32,13 +32,6 @@ from congress.datasources import datasource_utils
LOG = logging.getLogger(__name__)
def _get_config():
return {'host': cfg.CONF.json_ingester.postgres_host,
'database': cfg.CONF.json_ingester.postgres_database,
'user': cfg.CONF.json_ingester.postgres_user,
'password': cfg.CONF.json_ingester.postgres_password}
class ExecApiManager(object):
def __init__(self, configs):
super(ExecApiManager, self).__init__()
@ -53,10 +46,12 @@ class ExecApiManager(object):
# FIXME(json_ingester): validate config
if config.get('allow_exec_api', False) is True:
name = config['name']
self._exec_api_endpoints[name] = config['api_endpoint']
self._exec_api_endpoints[name] = (
config.get('api_endpoint_host', '').rstrip('/') + '/'
+ config.get('api_endpoint_path', '').lstrip('/'))
self._exec_api_sessions[
name] = datasource_utils.get_keystone_session(
config['authentication'])
config['authentication']['config'])
@lockutils.synchronized('congress_json_ingester_exec_api')
def evaluate_and_execute_actions(self):
@ -110,8 +105,6 @@ class ExecApiManager(object):
@staticmethod
def _read_all_execute_tables():
params = _get_config()
def json_rows_to_str_rows(json_rows):
# FIXME(json_ingester): validate; log and drop invalid rows
return [(endpoint, path, method, json.dumps(body, sort_keys=True),
@ -130,7 +123,7 @@ class ExecApiManager(object):
FROM {}.{};"""
conn = None
try:
conn = psycopg2.connect(**params)
conn = psycopg2.connect(cfg.CONF.json_ingester.db_connection)
# repeatable read to make sure all the _exec_api rows from all
# schemas are obtained at the same snapshot
conn.set_session(

View File

@ -37,13 +37,6 @@ from congress import exception
LOG = logging.getLogger(__name__)
def _get_config():
return {'host': cfg.CONF.json_ingester.postgres_host,
'database': cfg.CONF.json_ingester.postgres_database,
'user': cfg.CONF.json_ingester.postgres_user,
'password': cfg.CONF.json_ingester.postgres_password}
class JsonIngester(datasource_driver.PollingDataSourceDriver):
def __init__(self, name, config, exec_manager):
@ -80,7 +73,8 @@ class JsonIngester(datasource_driver.PollingDataSourceDriver):
self._create_schema_and_tables()
self.poll_time = self._config.get('poll_interval', 60)
self._setup_table_key_sets()
self._api_endpoint = self._config.get('api_endpoint')
self._api_endpoint = self._config.get('api_endpoint_host', '').rstrip(
'/') + '/' + self._config.get('api_endpoint_path', '').lstrip('/')
self._initialize_session()
self._initialize_update_methods()
if len(self.update_methods) > 0:
@ -124,8 +118,6 @@ class JsonIngester(datasource_driver.PollingDataSourceDriver):
use_snapshot=use_snapshot)
def _create_schema_and_tables(self):
params = _get_config()
create_schema_statement = """CREATE SCHEMA IF NOT EXISTS {};"""
create_table_statement = """
CREATE TABLE IF NOT EXISTS {}.{}
@ -138,7 +130,7 @@ class JsonIngester(datasource_driver.PollingDataSourceDriver):
"""CREATE INDEX on {}.{} USING GIN (d);"""
conn = None
try:
conn = psycopg2.connect(**params)
conn = psycopg2.connect(cfg.CONF.json_ingester.db_connection)
cur = conn.cursor()
# create schema
cur.execute(
@ -171,8 +163,6 @@ class JsonIngester(datasource_driver.PollingDataSourceDriver):
if new_data == old_data:
return False
params = _get_config()
insert_statement = """INSERT INTO {}.{}
VALUES(%s, %s);"""
delete_all_statement = """DELETE FROM {}.{};"""
@ -180,7 +170,7 @@ class JsonIngester(datasource_driver.PollingDataSourceDriver):
DELETE FROM {}.{} WHERE _key = %s;"""
conn = None
try:
conn = psycopg2.connect(**params)
conn = psycopg2.connect(cfg.CONF.json_ingester.db_connection)
cur = conn.cursor()
if use_snapshot:
to_insert = new_data
@ -226,7 +216,7 @@ class JsonIngester(datasource_driver.PollingDataSourceDriver):
def _initialize_session(self):
if 'authentication' in self._config:
self._session = datasource_utils.get_keystone_session(
self._config['authentication'])
self._config['authentication']['config'])
def _initialize_update_methods(self):
for table_name in self._config['tables']:
@ -328,15 +318,13 @@ class JsonIngester(datasource_driver.PollingDataSourceDriver):
'The supplied key ({}) exceeds the max indexable size ({}) in '
'PostgreSQL.'.format(key_string, PGSQL_MAX_INDEXABLE_SIZE))
params = _get_config()
insert_statement = """INSERT INTO {}.{}
VALUES(%s, %s);"""
delete_tuple_statement = """
DELETE FROM {}.{} WHERE _key = %s;"""
conn = None
try:
conn = psycopg2.connect(**params)
conn = psycopg2.connect(cfg.CONF.json_ingester.db_connection)
cur = conn.cursor()
# delete the appropriate row from table
cur.execute(sql.SQL(delete_tuple_statement).format(

View File

@ -167,7 +167,8 @@ def create_policy_library_service():
def create_json_ingester_datasources(bus):
ds_configs = utils.YamlConfigs(
cfg.CONF.json_ingester.config_path, 'name')
cfg.CONF.json_ingester.config_path, 'name',
cfg.CONF.json_ingester.config_reusables_path)
ds_configs.load_from_files()
exec_manager = exec_api.ExecApiManager(
ds_configs.loaded_structures.values())
@ -217,7 +218,7 @@ def create_datasources(bus):
"be running.", ds.name, ds.driver)
# create json_ingester data sources
if cfg.CONF.json_ingester.json_ingester_experimental:
if cfg.CONF.json_ingester.enable:
create_json_ingester_datasources(bus)
return services

View File

@ -38,7 +38,8 @@ class TestExecApiManager(base.TestCase):
self.test_configs = [
{
"api_endpoint": "test1_url",
"api_endpoint_host": "test1",
"api_endpoint_path": "url",
"tables": {
"flavors": {
"poll": {
@ -62,18 +63,20 @@ class TestExecApiManager(base.TestCase):
}
},
"authentication": {
"username": "admin",
"type": "keystone",
"project_name": "admin",
"password": "password",
"auth_url": "http://127.0.0.1/identity"
"config": {
"username": "admin",
"project_name": "admin",
"password": "password",
"auth_url": "http://127.0.0.1/identity"}
},
"poll_interval": 1,
"name": "test1"
},
{
"allow_exec_api": True,
"api_endpoint": "test2_url",
"api_endpoint_host": "test2",
"api_endpoint_path": "url",
"tables": {
"flavors": {
"poll": {
@ -97,24 +100,27 @@ class TestExecApiManager(base.TestCase):
}
},
"authentication": {
"username": "admin",
"type": "keystone",
"project_name": "admin",
"password": "password",
"auth_url": "http://127.0.0.1/identity"
"config": {
"username": "admin",
"project_name": "admin",
"password": "password",
"auth_url": "http://127.0.0.1/identity"}
},
"poll_interval": 1,
"name": "test2"
},
{
"allow_exec_api": True,
"api_endpoint": "test3_url",
"api_endpoint_host": "test3",
"api_endpoint_path": "url",
"authentication": {
"username": "admin",
"type": "keystone",
"project_name": "admin",
"password": "password",
"auth_url": "http://127.0.0.1/identity"
"config": {
"username": "admin",
"project_name": "admin",
"password": "password",
"auth_url": "http://127.0.0.1/identity"}
},
"name": "test3"
}
@ -173,16 +179,16 @@ class TestExecApiManager(base.TestCase):
2)
self.test_exec_mgr._exec_api_sessions[
'test2'].request.assert_any_call(
endpoint_override='test2_url', url='path2a', method='METHOD2A',
endpoint_override='test2/url', url='path2a', method='METHOD2A',
json=[u'body2a'], params=[u'params2a'], headers=[u'headers2a'],
connect_retries=10, status_code_retries=10)
self.test_exec_mgr._exec_api_sessions[
'test2'].request.assert_any_call(
endpoint_override='test2_url', url='path2b', method='METHOD2B',
endpoint_override='test2/url', url='path2b', method='METHOD2B',
json=[u'body2b'], params=[u'params2b'], headers=[u'headers2b'],
connect_retries=10, status_code_retries=10)
self.test_exec_mgr._exec_api_sessions[
'test3'].request.assert_called_once_with(
endpoint_override='test3_url', url='path3', method='METHOD3',
endpoint_override='test3/url', url='path3', method='METHOD3',
json=[u'body3'], params=[u'params3'], headers=[u'headers3'],
connect_retries=10, status_code_retries=10)

View File

@ -63,11 +63,13 @@ class TestJsonIngester(base.TestCase):
}
},
"authentication": {
"username": "admin",
"type": "keystone",
"project_name": "admin",
"password": "password",
"auth_url": "http://127.0.0.1/identity"
"config": {
"username": "admin",
"project_name": "admin",
"password": "password",
"auth_url": "http://127.0.0.1/identity"
}
},
"poll_interval": 1,
"name": "nova"

View File

@ -1,4 +1,4 @@
# Copyright (c) 2014 VMware
# Copyright (c) 2014, 2019 VMware
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.

View File

@ -165,19 +165,65 @@ def pretty_rule(rule_str):
class YamlConfigs (object):
def __init__(self, dir_path, key_attrib):
def __init__(self, dir_path, key_attrib, reusables_path=None):
self.dir_path = dir_path
self.key_attrib = key_attrib
self.reusables_path = reusables_path
# dictionary of loaded structures
# indexed by the value of each struct[key_attrib]
self.loaded_structures = {}
# dictionary of reusable yaml-style structures
# indexed by unique name
self.reusables = {}
yaml.SafeLoader.add_constructor(
'!ref', self._resolve_reuse_reference_constructor)
def _resolve_reuse_reference_constructor(self, loader, node):
    """Resolve a ``!ref`` YAML node to the reusable structure it names.

    :param loader: the YAML loader invoking this constructor (unused).
    :param node: the tagged YAML node; ``node.value`` must be a string
        naming an entry in ``self.reusables``.
    :returns: the structure stored in ``self.reusables`` under that name.
    :raises yaml.YAMLError: if the node value is not a string, or if no
        reusable with that name has been loaded.
    """
    import six

    ref_name = node.value

    # Reject non-string values first; only a plain string can be a name.
    if not isinstance(ref_name, six.string_types):
        not_a_string = ('Cannot resolve reference {} because the value is '
                        'not a string.')
        raise yaml.YAMLError(not_a_string.format(node))

    if ref_name not in self.reusables:
        not_defined = ('Cannot resolve reference {} because no reusable '
                       'data has been defined with the name "{}". Please '
                       'double check the reference name or the reusables '
                       'file "{}".')
        raise yaml.YAMLError(
            not_defined.format(node, ref_name, self.reusables_path))

    return self.reusables[ref_name]
def load_from_files(self):
'''load YAML config files from directory
return total number of files on which error encountered
return total number of files on which error encountered.
Separately callable apart from __init__ to support reloading changed
files.
'''
# (Re)load the reusables file, if one is configured. Every failure mode
# below is best-effort: log a warning and continue with no reusables.
if self.reusables_path is not None:
    # Reset first so that a reload never keeps stale entries and so that
    # any !ref inside the reusables file itself resolves against nothing.
    self.reusables = {}
    loaded_reusables = {}
    try:
        with open(self.reusables_path, "r") as stream:
            try:
                loaded_reusables = yaml.safe_load(stream)
            except Exception:
                LOG.warning(
                    'Unable to YAML-load reusables file at path %s. '
                    'Proceeding with empty reusables.',
                    self.reusables_path)
    except IOError:
        LOG.warning('Unable to find or open reusables file at path %s.'
                    ' Proceeding with empty reusables.',
                    self.reusables_path)

    if isinstance(loaded_reusables, dict):
        self.reusables = loaded_reusables
    else:
        # e.g. an empty file loads as None and a top-level list loads as
        # a list. Keep the empty dict so later "name in self.reusables"
        # lookups stay well-defined, as the warning text promises.
        LOG.warning('The loaded reusables file does not conform to the'
                    ' expected format (must be a hash at the top '
                    'level). Proceeding with empty reusables. '
                    'Provided structure: %s', loaded_reusables)
def _load_yaml_config_file(full_path):
try:
success_yaml_count = 0
@ -185,7 +231,7 @@ class YamlConfigs (object):
doc_num_in_file = 0
file_error = False
with open(full_path, "r") as stream:
policies = yaml.load_all(stream)
policies = yaml.safe_load_all(stream)
for policy in policies:
doc_num_in_file += 1
# FIXME: validate YAML config

View File

@ -61,6 +61,29 @@ function configure_congress {
iniset $CONGRESS_CONF DEFAULT replicated_policy_engine "$CONGRESS_REPLICATED"
iniset $CONGRESS_CONF DEFAULT transport_url rabbit://$RABBIT_USERID:$RABBIT_PASSWORD@$RABBIT_HOST:5672/
iniset $CONGRESS_CONF database connection `database_connection_url $CONGRESS_DB_NAME`
if [ "$ENABLE_CONGRESS_JSON" == "True" ]; then
    iniset $CONGRESS_CONF json_ingester enable "True"

    # when the main db is not postgres, the devstack function
    # database_connection_url_postgresql returns URL with wrong prefix,
    # so we do a substitution here
    local json_db_url=`database_connection_url_postgresql $CONGRESS_JSON_DB_NAME`
    iniset $CONGRESS_CONF json_ingester db_connection ${json_db_url/?*:\/\//postgresql:\/\/}

    iniset $CONGRESS_CONF json_ingester config_path "$CONGRESS_JSON_CONF_DIR"
    iniset $CONGRESS_CONF json_ingester config_reusables_path "$CONGRESS_JSON_CONF_REUSABLES_PATH"

    # Generate the reusables file referenced via !ref from the ingester
    # configs. The nesting must match the sample reusables file:
    # keystone_admin_auth_config -> {type, config -> {credentials}}.
    # NOTE(review): project_name uses OS_PROJECT_NAME (not OS_PASSWORD);
    # confirm that variable is exported in this devstack environment.
    {
        echo "primary_host: http://$SERVICE_HOST"
        echo "keystone_admin_auth_config:"
        echo "  type: keystone"
        echo "  config:"
        echo "    project_name: $OS_PROJECT_NAME"
        echo "    username: $OS_USERNAME"
        echo "    password: $OS_PASSWORD"
        echo "    auth_url: http://$SERVICE_HOST/identity"
    } > "$CONGRESS_JSON_CONF_REUSABLES_PATH"

    # Install the sample ingester configs into the config directory.
    mkdir -p "$CONGRESS_JSON_CONF_DIR"
    cp -r "$CONGRESS_DIR"/etc/sample_json_ingesters/* "$CONGRESS_JSON_CONF_DIR"
fi
_congress_setup_keystone $CONGRESS_CONF keystone_authtoken
}
@ -231,6 +254,15 @@ function create_congress_accounts {
# init_congress() - Initialize databases, etc.
# Recreates the main Congress database, optionally prepares the PostgreSQL
# database used by the JSON ingester, then runs the Congress db migrations.
function init_congress {
    recreate_database $CONGRESS_DB_NAME utf8

    if [ "$ENABLE_CONGRESS_JSON" == "True" ]; then
        # [[ ]] with a quoted expansion keeps the test well-formed even when
        # DATABASE_TYPE is empty or unset; an unquoted expansion inside [ ]
        # would error out and silently skip the PostgreSQL setup.
        if [[ "${DATABASE_TYPE,,}" != "postgresql" ]]; then
            # setup separate postgres db if main is not already postgres
            install_database_postgresql
            install_database_python_postgresql
            configure_database_postgresql
        fi
        recreate_database_postgresql $CONGRESS_JSON_DB_NAME utf8
    fi

    # Run Congress db migrations
    congress-db-manage --config-file $CONGRESS_CONF upgrade head
}

View File

@ -49,6 +49,14 @@ ENABLE_CONGRESS_Z3=$(trueorfalse False ENABLE_CONGRESS_Z3)
# Flag to indicate that we prefer to use a precompiled release
USE_Z3_RELEASE=${USE_Z3_RELEASE:-None}
# Flag for enabling experimental JSON ingester
# Uses PostgreSQL; a separate PostgreSQL database is set up when DATABASE_TYPE is not postgresql
ENABLE_CONGRESS_JSON=$(trueorfalse False ENABLE_CONGRESS_JSON)
CONGRESS_JSON_DB_NAME=${CONGRESS_JSON_DB_NAME:-congress_json}
CONGRESS_JSON_CONF_DIR=$CONGRESS_CONF_DIR/json_ingesters
CONGRESS_JSON_CONF_REUSABLES_PATH=$CONGRESS_CONF_DIR/config_reusables.yaml
TEMPEST_DIR=$DEST/tempest
TEMPEST_CONFIG_DIR=${TEMPEST_CONFIG_DIR:-$TEMPEST_DIR/etc}
TEMPEST_CONFIG=$TEMPEST_CONFIG_DIR/tempest.conf

View File

@ -0,0 +1,8 @@
keystone_admin_auth_config:
type: keystone
config:
username: admin
auth_url: http://127.0.0.1/identity
project_name: admin
password: password
primary_host: http://127.0.0.1

View File

@ -1,13 +1,9 @@
name: nova
poll_interval: 5
allow_exec_api: true
authentication:
type: keystone
username: admin
auth_url: http://127.0.0.1/identity
project_name: admin
password: password
api_endpoint: http://127.0.0.1/compute/v2.1/
authentication: !ref keystone_admin_auth_config
api_endpoint_host: !ref primary_host
api_endpoint_path: compute/v2.1/
tables:
flavors:
poll: