Merge "JSON ingester API execution feature"

This commit is contained in:
Zuul 2019-03-06 04:05:30 +00:00 committed by Gerrit Code Review
commit 6244272d89
9 changed files with 414 additions and 19 deletions

View File

@ -0,0 +1,170 @@
# Copyright (c) 2019 VMware, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
import json
import eventlet
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_log import log as logging
import psycopg2
from psycopg2 import sql
from congress.datasources import datasource_utils
LOG = logging.getLogger(__name__)
def _get_config():
    """Assemble psycopg2 connection parameters from oslo.config.

    Reads the postgres connection settings from the [json_ingester]
    configuration group and returns them as keyword arguments suitable
    for psycopg2.connect(**params).
    """
    ingester_conf = cfg.CONF.json_ingester
    return {
        'host': ingester_conf.postgres_host,
        'database': ingester_conf.postgres_database,
        'user': ingester_conf.postgres_user,
        'password': ingester_conf.postgres_password,
    }
class ExecApiManager(object):
    """Execute API requests driven by rows of database ``_exec_api`` views.

    Tracks the union of all rows across every ``_exec_api`` view in every
    (non-system) schema. On each evaluation, any row not present in the
    previous snapshot is translated into an HTTP request against the
    configured API endpoint named in the row.
    """

    def __init__(self, configs):
        """Build endpoint and keystone-session maps from datasource configs.

        :param configs: iterable of datasource config dicts; only configs
            with ``'allow_exec_api': True`` are registered for execution.
        """
        super(ExecApiManager, self).__init__()
        self._exec_api_sessions = {}
        self._exec_api_endpoints = {}
        # state tracking the most recent state consisting of the union
        # of all the rows from all the _exec_api tables
        # used to determine which rows are new
        self._last_exec_api_state = set([])
        for config in configs:
            # FIXME(json_ingester): validate config
            if config.get('allow_exec_api', False) is True:
                name = config['name']
                self._exec_api_endpoints[name] = config['api_endpoint']
                self._exec_api_sessions[
                    name] = datasource_utils.get_keystone_session(
                    config['authentication'])

    @lockutils.synchronized('congress_json_ingester_exec_api')
    def evaluate_and_execute_actions(self):
        """Execute API requests for rows new since the previous snapshot.

        Serialized by a lock so concurrent callers (pollers, webhook
        handlers) cannot interleave snapshot reads and issue duplicate
        requests.
        """
        # FIXME(json_ingester): retry
        new_exec_api_state = self._read_all_execute_tables()
        new_exec_api_rows = new_exec_api_state - self._last_exec_api_state
        LOG.debug('New exec_api rows %s', new_exec_api_rows)
        self._execute_exec_api_rows(new_exec_api_rows)
        self._last_exec_api_state = new_exec_api_state

    def _execute_exec_api_rows(self, rows):
        """Spawn one API request (greenthread) per row.

        Rows referencing an endpoint name with no configured endpoint are
        skipped with a warning. When execution is disabled by config, the
        request is only logged ("simulated").
        """
        def exec_api(session, kwargs):
            LOG.info("Making API request %s.", kwargs)
            try:
                session.request(**kwargs)
            except Exception:
                LOG.exception('Exception in making API request %s.', kwargs)

        for row in rows:
            (endpoint, path, method, body, parameters, headers) = row
            if endpoint in self._exec_api_endpoints:
                kwargs = {
                    'endpoint_override': self._exec_api_endpoints[endpoint],
                    'url': path,
                    'method': method.upper(),
                    'connect_retries': 10,
                    'status_code_retries': 10}
                # body/parameters/headers arrive as JSON strings
                # (serialized by _read_all_execute_tables); a JSON null
                # means the component was not supplied.
                body = json.loads(body)
                if body is not None:
                    kwargs['json'] = body
                parameters = json.loads(parameters)
                if parameters is not None:
                    kwargs['params'] = parameters
                headers = json.loads(headers)
                if headers is not None:
                    kwargs['headers'] = headers
                if cfg.CONF.enable_execute_action:
                    eventlet.spawn_n(
                        exec_api, self._exec_api_sessions[endpoint], kwargs)
                else:
                    LOG.info("Simulating API request %s", kwargs)
            else:
                LOG.warning(
                    'No configured API endpoint with name %s. '
                    'Skipping the API request: '
                    '(endpoint, path, method, body, parameters, headers) '
                    '= %s.', endpoint, row)
        eventlet.sleep(0)  # defer to greenthreads running api requests

    @staticmethod
    def _read_all_execute_tables():
        """Return the union of rows from all ``_exec_api`` views as a set.

        Each row is normalized to a tuple of strings (JSON-serialized with
        sorted keys) so snapshots are hashable and can be diffed with set
        operations by the caller.

        :returns: set of (endpoint, path, method, body, parameters,
            headers) string tuples.
        :raises: re-raises any DB connection/read error after logging.
        """
        params = _get_config()

        def json_rows_to_str_rows(json_rows):
            # FIXME(json_ingester): validate; log and drop invalid rows
            return [(endpoint, path, method, json.dumps(body, sort_keys=True),
                     json.dumps(parameters, sort_keys=True),
                     json.dumps(headers, sort_keys=True)) for
                    (endpoint, path, method, body, parameters, headers)
                    in json_rows]

        FIND_ALL_EXEC_VIEWS = """
            SELECT table_schema, table_name FROM information_schema.tables
            WHERE table_schema NOT LIKE 'pg\_%'
            AND table_schema <> 'information_schema'
            AND table_name LIKE '\_exec_api';"""
        READ_EXEC_VIEW = """
            SELECT endpoint, path, method, body, parameters, headers
            FROM {}.{};"""
        conn = None
        try:
            conn = psycopg2.connect(**params)
            # repeatable read to make sure all the _exec_api rows from all
            # schemas are obtained at the same snapshot
            conn.set_session(
                isolation_level=psycopg2.extensions.
                ISOLATION_LEVEL_REPEATABLE_READ,
                readonly=True, autocommit=False)
            cur = conn.cursor()
            # find all _exec_api tables
            cur.execute(sql.SQL(FIND_ALL_EXEC_VIEWS))
            all_exec_api_tables = cur.fetchall()
            # read each _exec_api_table
            all_exec_api_rows = set([])
            for (table_schema, table_name) in all_exec_api_tables:
                # Guard each per-view read with a savepoint: without it,
                # the first ProgrammingError aborts the whole transaction
                # and every subsequent cur.execute() in this loop fails
                # with InFailedSqlTransaction, so one mis-defined view
                # would poison the reads of all remaining views.
                # (SAVEPOINT is permitted in read-only transactions.)
                cur.execute('SAVEPOINT exec_api_view_read')
                try:
                    cur.execute(sql.SQL(READ_EXEC_VIEW).format(
                        sql.Identifier(table_schema),
                        sql.Identifier(table_name)))
                    all_rows = cur.fetchall()
                    all_exec_api_rows.update(
                        json_rows_to_str_rows(all_rows))
                except psycopg2.ProgrammingError:
                    # restore the transaction to a usable state and skip
                    # just this view, as the warning below promises
                    cur.execute('ROLLBACK TO SAVEPOINT exec_api_view_read')
                    LOG.warning('The "%s" table in the "%s" schema does not '
                                'have the right columns for API execution. '
                                'Its content is ignored for the purpose of '
                                'API execution. Please check and correct the '
                                'view definition.',
                                table_name, table_schema)
            conn.commit()
            cur.close()
            return all_exec_api_rows
        except (Exception, psycopg2.Error):
            LOG.exception("Error reading from DB")
            raise
        finally:
            if conn is not None:
                conn.close()

View File

@ -45,7 +45,8 @@ def _get_config():
class JsonIngester(datasource_driver.PollingDataSourceDriver):
def __init__(self, name, config):
def __init__(self, name, config, exec_manager):
def validate_config(config):
# FIXME: use json schema to validate config
@ -66,8 +67,14 @@ class JsonIngester(datasource_driver.PollingDataSourceDriver):
# use prefix to avoid service_id clash with regular data sources
super(JsonIngester, self).__init__(
api_base.JSON_DS_SERVICE_PREFIX + name)
self.exec_manager = exec_manager # ref to global mgr for api exec
self.type = 'json_ingester'
self.name = name # set name back to one without prefix for use here
if 'tables' not in config:
# config w/o table used to define exec_api endpoint
# in this case, no need to create datasource service
return
validate_config(config)
self._config = config
self._create_schema_and_tables()
@ -111,9 +118,10 @@ class JsonIngester(datasource_driver.PollingDataSourceDriver):
LOG.trace('publish(self=%s, table=%s, data=%s, use_snapshot=%s',
self, table, data, use_snapshot)
self._update_table(table, new_data=data,
old_data=self.prior_state.get(table, set([])),
use_snapshot=use_snapshot)
return self._update_table(
table, new_data=data,
old_data=self.prior_state.get(table, set([])),
use_snapshot=use_snapshot)
def _create_schema_and_tables(self):
params = _get_config()
@ -159,9 +167,9 @@ class JsonIngester(datasource_driver.PollingDataSourceDriver):
def _update_table(
self, table_name, new_data, old_data, use_snapshot):
# return immediately if no change to update
# return False immediately if no change to update
if new_data == old_data:
return
return False
params = _get_config()
@ -198,10 +206,12 @@ class JsonIngester(datasource_driver.PollingDataSourceDriver):
(d, str(self.key_sets[table_name].add_and_get_key(d))))
conn.commit()
cur.close()
return True # return True indicating change made
except (Exception, psycopg2.Error):
LOG.exception("Error writing to DB")
# makes the next update use snapshot
self._clear_table_state(table_name)
return False # return False indicating no change made (rollback)
finally:
if conn is not None:
conn.close()
@ -259,11 +269,17 @@ class JsonIngester(datasource_driver.PollingDataSourceDriver):
try:
self.update_from_datasource() # sets self.state
# publish those tables with polling update methods
overall_change_made = False
for tablename in self.update_methods:
use_snapshot = tablename not in self.prior_state
# Note(thread-safety): blocking call[
self.publish(tablename, self.state.get(tablename, set([])),
use_snapshot=use_snapshot)
this_table_change_made = self.publish(
tablename, self.state.get(tablename, set([])),
use_snapshot=use_snapshot)
overall_change_made = (overall_change_made
or this_table_change_made)
if overall_change_made:
self.exec_manager.evaluate_and_execute_actions()
except Exception as e:
self.last_error = e
LOG.exception("Datasource driver raised exception")
@ -302,6 +318,7 @@ class JsonIngester(datasource_driver.PollingDataSourceDriver):
'expression "{}" fails to obtain exactly one match on JSON record'
' "{}".')
self._webhook_update_table(table_name, key=json_id, data=json_record)
self.exec_manager.evaluate_and_execute_actions()
def _webhook_update_table(self, table_name, key, data):
key_string = json.dumps(key, sort_keys=True)

View File

@ -36,7 +36,8 @@ from congress.api import status_model
from congress.api.system import driver_model
from congress.api import table_model
from congress.api import webhook_model
from congress.datasources import json_ingester
from congress.datasources.json_ingester import exec_api
from congress.datasources.json_ingester import json_ingester
from congress.db import datasources as db_datasources
from congress.dse2 import datasource_manager as ds_manager
from congress.dse2 import dse_node
@ -168,14 +169,21 @@ def create_json_ingester_datasources(bus):
ds_configs = utils.YamlConfigs(
cfg.CONF.json_ingester.config_path, 'name')
ds_configs.load_from_files()
exec_manager = exec_api.ExecApiManager(
ds_configs.loaded_structures.values())
datasources = []
for name in ds_configs.loaded_structures:
LOG.debug('creating datasource %s', name)
datasource_config = ds_configs.loaded_structures[name]
try:
service = json_ingester.JsonIngester(name, datasource_config)
bus.register_service(service)
datasources.append(service)
service = json_ingester.JsonIngester(
name, datasource_config, exec_manager)
if service:
# config w/o table used to define exec_api endpoint
# no service created in that case
bus.register_service(service)
datasources.append(service)
except Exception:
LOG.exception(
"Failed to create JsonIngester service {}.".format(name))

View File

@ -0,0 +1,188 @@
# Copyright (c) 2019 VMware, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
import mock
from congress.datasources.json_ingester import exec_api
from congress.tests import base
def mock_spawn_execute(target, *call_args, **call_kwargs):
    """Run *target* immediately and return its result.

    Synchronous stand-in for eventlet.spawn_n in unit tests, so that
    "spawned" API requests execute inline and can be asserted on.
    """
    result = target(*call_args, **call_kwargs)
    return result
class TestExecApiManager(base.TestCase):
    """Unit tests for exec_api.ExecApiManager."""

    @mock.patch('congress.datasources.datasource_utils.get_keystone_session')
    def setUp(self, get_keystone_session):
        """Create an ExecApiManager from three sample configs.

        Keystone session creation is mocked out. Only 'test2' and 'test3'
        set allow_exec_api, so only they should be registered; 'test3'
        also has no 'tables' key (endpoint-only config).
        """
        super(TestExecApiManager, self).setUp()
        get_keystone_session.side_effect = [
            mock.Mock(), mock.Mock(), mock.Mock(), mock.Mock(), mock.Mock()]
        self.test_configs = [
            {
                "api_endpoint": "test1_url",
                "tables": {
                    "flavors": {
                        "poll": {
                            "api_method": "get",
                            "api_path": "flavors/detail",
                            "jsonpath": "$.flavors[:]"
                        }
                    },
                    "servers": {
                        "poll": {
                            "api_method": "get",
                            "api_path": "servers/detail",
                            "jsonpath": "$.servers[:]"
                        }
                    },
                    "alarms": {
                        "webhook": {
                            "record_jsonpath": "$.payload",
                            "id_jsonpath": "$.id"
                        }
                    }
                },
                "authentication": {
                    "username": "admin",
                    "type": "keystone",
                    "project_name": "admin",
                    "password": "password",
                    "auth_url": "http://127.0.0.1/identity"
                },
                "poll_interval": 1,
                "name": "test1"
            },
            {
                "allow_exec_api": True,
                "api_endpoint": "test2_url",
                "tables": {
                    "flavors": {
                        "poll": {
                            "api_method": "get",
                            "api_path": "flavors/detail",
                            "jsonpath": "$.flavors[:]"
                        }
                    },
                    "servers": {
                        "poll": {
                            "api_method": "get",
                            "api_path": "servers/detail",
                            "jsonpath": "$.servers[:]"
                        }
                    },
                    "alarms": {
                        "webhook": {
                            "record_jsonpath": "$.payload",
                            "id_jsonpath": "$.id"
                        }
                    }
                },
                "authentication": {
                    "username": "admin",
                    "type": "keystone",
                    "project_name": "admin",
                    "password": "password",
                    "auth_url": "http://127.0.0.1/identity"
                },
                "poll_interval": 1,
                "name": "test2"
            },
            {
                "allow_exec_api": True,
                "api_endpoint": "test3_url",
                "authentication": {
                    "username": "admin",
                    "type": "keystone",
                    "project_name": "admin",
                    "password": "password",
                    "auth_url": "http://127.0.0.1/identity"
                },
                "name": "test3"
            }
        ]
        self.test_exec_mgr = exec_api.ExecApiManager(self.test_configs)

    def test_init(self):
        """Only configs with allow_exec_api get sessions/endpoints."""
        # 'test1' ignored because no "allow_exec_api": True
        self.assertEqual(set(self.test_exec_mgr._exec_api_sessions.keys()),
                         set(['test2', 'test3']))
        self.assertEqual(set(self.test_exec_mgr._exec_api_endpoints.keys()),
                         set(['test2', 'test3']))

    def test_evaluate_and_execute_actions(self):
        """Only rows new since the previous snapshot are executed."""
        test_rows1 = set([1, 2, 3])
        test_rows2 = set([2, 3, 4])
        self.test_exec_mgr._read_all_execute_tables = mock.Mock(
            spec_set=[], return_value=test_rows1)
        self.test_exec_mgr._execute_exec_api_rows = mock.Mock(spec_set=[])
        self.assertEqual(self.test_exec_mgr._last_exec_api_state, set([]))
        # first evaluation: everything is new
        self.test_exec_mgr.evaluate_and_execute_actions()
        self.assertEqual(self.test_exec_mgr._last_exec_api_state, test_rows1)
        self.test_exec_mgr._read_all_execute_tables.assert_called_once()
        self.test_exec_mgr._execute_exec_api_rows.assert_called_once_with(
            test_rows1)
        # second evaluation: only the set difference is executed
        self.test_exec_mgr._read_all_execute_tables = mock.Mock(
            spec_set=[], return_value=test_rows2)
        self.test_exec_mgr._execute_exec_api_rows = mock.Mock(spec_set=[])
        self.test_exec_mgr.evaluate_and_execute_actions()
        self.assertEqual(self.test_exec_mgr._last_exec_api_state, test_rows2)
        self.test_exec_mgr._read_all_execute_tables.assert_called_once()
        self.test_exec_mgr._execute_exec_api_rows.assert_called_once_with(
            test_rows2 - test_rows1)

    @mock.patch('eventlet.spawn_n', side_effect=mock_spawn_execute)
    def test_execute_exec_api_rows(self, mock_spawn):
        """Rows map to session.request calls; unknown endpoints skipped.

        spawn_n is replaced with a synchronous call so request mocks can
        be asserted immediately. Row 'test1' has no registered endpoint
        and must not produce a request.
        """
        test_row1 = ('test1', 'path1', 'method1', '["body1"]', '["params1"]',
                     '["headers1"]')
        test_row2a = ('test2', 'path2a', 'method2a', '["body2a"]',
                      '["params2a"]', '["headers2a"]')
        test_row2b = ('test2', 'path2b', 'method2b', '["body2b"]',
                      '["params2b"]', '["headers2b"]')
        test_row3 = ('test3', 'path3', 'method3', '["body3"]', '["params3"]',
                     '["headers3"]')
        self.test_exec_mgr._execute_exec_api_rows(
            [test_row1, test_row2a, test_row3, test_row2b])
        self.assertEqual(
            self.test_exec_mgr._exec_api_sessions['test2'].request.call_count,
            2)
        self.test_exec_mgr._exec_api_sessions[
            'test2'].request.assert_any_call(
            endpoint_override='test2_url', url='path2a', method='METHOD2A',
            json=[u'body2a'], params=[u'params2a'], headers=[u'headers2a'],
            connect_retries=10, status_code_retries=10)
        self.test_exec_mgr._exec_api_sessions[
            'test2'].request.assert_any_call(
            endpoint_override='test2_url', url='path2b', method='METHOD2B',
            json=[u'body2b'], params=[u'params2b'], headers=[u'headers2b'],
            connect_retries=10, status_code_retries=10)
        self.test_exec_mgr._exec_api_sessions[
            'test3'].request.assert_called_once_with(
            endpoint_override='test3_url', url='path3', method='METHOD3',
            json=[u'body3'], params=[u'params3'], headers=[u'headers3'],
            connect_retries=10, status_code_retries=10)

View File

@ -17,11 +17,12 @@ from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
import mock
import uuid
import mock
from congress.datasources import datasource_driver
from congress.datasources import json_ingester
from congress.datasources.json_ingester import json_ingester
from congress import exception
from congress.tests import base
@ -72,8 +73,11 @@ class TestJsonIngester(base.TestCase):
"name": "nova"
}
from congress.datasources.json_ingester import exec_api
# exec_manager = exec_api.ExecApiManager([])
exec_manager_mock = mock.Mock(spec=exec_api.ExecApiManager)
self.test_driver = json_ingester.JsonIngester(
self.test_config['name'], self.test_config)
self.test_config['name'], self.test_config, exec_manager_mock)
def test_invalid_config_poll_plus_webhook(self):
invalid_config = self.test_config
@ -82,7 +86,7 @@ class TestJsonIngester(base.TestCase):
"id_jsonpath": "$.id"}
self.assertRaises(
exception.BadConfig, json_ingester.JsonIngester,
invalid_config['name'], invalid_config)
invalid_config['name'], invalid_config, None)
@mock.patch.object(json_ingester.JsonIngester, '_update_table')
def test_poll(self, _update_table):
@ -133,6 +137,8 @@ class TestJsonIngester(base.TestCase):
self.test_driver.json_ingester_webhook_handler('alarms', test_body)
_webhook_update_table.assert_called_once_with(
'alarms', key=42, data=test_body['payload'])
(self.test_driver.exec_manager.
evaluate_and_execute_actions.assert_called_once())
@mock.patch.object(json_ingester.JsonIngester, '_webhook_update_table')
def test_json_ingester_webhook_handler_non_primitive_key(
@ -142,6 +148,8 @@ class TestJsonIngester(base.TestCase):
self.test_driver.json_ingester_webhook_handler('alarms', test_body)
_webhook_update_table.assert_called_once_with(
'alarms', key=test_key, data=test_body['payload'])
(self.test_driver.exec_manager.
evaluate_and_execute_actions.assert_called_once())
@mock.patch.object(json_ingester.JsonIngester, '_webhook_update_table')
def test_json_ingester_webhook_handler_missing_payload(

View File

@ -1,5 +1,6 @@
name: nova
poll_interval: 60
poll_interval: 5
allow_exec_api: true
authentication:
type: keystone
username: admin

View File

@ -17,11 +17,13 @@ from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
import mock
from oslo_log import log as logging
from congress.datasources import datasource_driver
from congress.datasources import datasource_utils
from congress.datasources import json_ingester
from congress.datasources.json_ingester import exec_api
from congress.datasources.json_ingester import json_ingester
LOG = logging.getLogger(__name__)
@ -92,7 +94,8 @@ class FakeJsonIngester(json_ingester.JsonIngester):
},
"name": name
}
super(FakeJsonIngester, self).__init__(name, config)
super(FakeJsonIngester, self).__init__(
name, config, mock.Mock(spec_set=exec_api.ExecApiManager))
# override for unit testing
def _create_schema_and_tables(self):