diff --git a/congress/datasources/json_ingester/__init__.py b/congress/datasources/json_ingester/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/congress/datasources/json_ingester/exec_api.py b/congress/datasources/json_ingester/exec_api.py new file mode 100644 index 000000000..328be3a29 --- /dev/null +++ b/congress/datasources/json_ingester/exec_api.py @@ -0,0 +1,170 @@ +# Copyright (c) 2019 VMware, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +from __future__ import print_function +from __future__ import division +from __future__ import absolute_import + +import json + +import eventlet +from oslo_concurrency import lockutils +from oslo_config import cfg +from oslo_log import log as logging +import psycopg2 +from psycopg2 import sql + +from congress.datasources import datasource_utils + + +LOG = logging.getLogger(__name__) + + +def _get_config(): + return {'host': cfg.CONF.json_ingester.postgres_host, + 'database': cfg.CONF.json_ingester.postgres_database, + 'user': cfg.CONF.json_ingester.postgres_user, + 'password': cfg.CONF.json_ingester.postgres_password} + + +class ExecApiManager(object): + def __init__(self, configs): + super(ExecApiManager, self).__init__() + self._exec_api_sessions = {} + self._exec_api_endpoints = {} + # state tracking the most recent state consisting of the union + # of all the rows from all the _exec_api tables + # used to determine which rows are new + self._last_exec_api_state = set([]) + + for config in configs: + # FIXME(json_ingester): validate config + if config.get('allow_exec_api', False) is True: + name = config['name'] + self._exec_api_endpoints[name] = config['api_endpoint'] + self._exec_api_sessions[ + name] = datasource_utils.get_keystone_session( + config['authentication']) + + @lockutils.synchronized('congress_json_ingester_exec_api') + def evaluate_and_execute_actions(self): + # FIXME(json_ingester): retry + new_exec_api_state = self._read_all_execute_tables() + + new_exec_api_rows = new_exec_api_state - self._last_exec_api_state + LOG.debug('New exec_api rows %s', new_exec_api_rows) + self._execute_exec_api_rows(new_exec_api_rows) + self._last_exec_api_state = new_exec_api_state + + def _execute_exec_api_rows(self, rows): + def exec_api(session, kwargs): + LOG.info("Making API request %s.", kwargs) + try: + session.request(**kwargs) + except Exception: + LOG.exception('Exception in making API request %s.', kwargs) + + for row in rows: + (endpoint, path, method, body, parameters, headers) = row + if endpoint in self._exec_api_endpoints: + kwargs = { + 'endpoint_override': self._exec_api_endpoints[endpoint], + 'url': path, + 'method': method.upper(), + 'connect_retries': 10, + 'status_code_retries': 10} + body = json.loads(body) + if body is not None: + kwargs['json'] = body + parameters = json.loads(parameters) + if parameters is not None: + kwargs['params'] = parameters + headers = json.loads(headers) + if headers is not None: + kwargs['headers'] = headers + + if cfg.CONF.enable_execute_action: + eventlet.spawn_n( + exec_api, self._exec_api_sessions[endpoint], kwargs) + else: + LOG.info("Simulating API request %s", kwargs) + else: + LOG.warning( + 'No configured API endpoint with name %s. ' + 'Skipping the API request: ' + '(endpoint, path, method, body, parameters, headers) ' + '= %s.', endpoint, row) + eventlet.sleep(0) # defer to greenthreads running api requests + + @staticmethod + def _read_all_execute_tables(): + params = _get_config() + + def json_rows_to_str_rows(json_rows): + # FIXME(json_ingester): validate; log and drop invalid rows + return [(endpoint, path, method, json.dumps(body, sort_keys=True), + json.dumps(parameters, sort_keys=True), + json.dumps(headers, sort_keys=True)) for + (endpoint, path, method, body, parameters, headers) + in json_rows] + + FIND_ALL_EXEC_VIEWS = """ + SELECT table_schema, table_name FROM information_schema.tables + WHERE table_schema NOT LIKE 'pg\_%' + AND table_schema <> 'information_schema' + AND table_name LIKE '\_exec_api';""" + READ_EXEC_VIEW = """ + SELECT endpoint, path, method, body, parameters, headers + FROM {}.{};""" + conn = None + try: + conn = psycopg2.connect(**params) + # repeatable read to make sure all the _exec_api rows from all + # schemas are obtained at the same snapshot + conn.set_session( + isolation_level=psycopg2.extensions. + ISOLATION_LEVEL_REPEATABLE_READ, + readonly=True, autocommit=False) + cur = conn.cursor() + # find all _exec_api tables + cur.execute(sql.SQL(FIND_ALL_EXEC_VIEWS)) + all_exec_api_tables = cur.fetchall() + + # read each _exec_api_table + all_exec_api_rows = set([]) + for (table_schema, table_name) in all_exec_api_tables: + try: + cur.execute(sql.SQL(READ_EXEC_VIEW).format( + sql.Identifier(table_schema), + sql.Identifier(table_name))) + all_rows = cur.fetchall() + all_exec_api_rows.update( + json_rows_to_str_rows(all_rows)) + except psycopg2.ProgrammingError: + LOG.warning('The "%s" table in the "%s" schema does not ' + 'have the right columns for API execution. ' + 'Its content is ignored for the purpose of ' + 'API execution. Please check and correct the ' + 'view definition.', + table_name, table_schema) + conn.commit() + cur.close() + return all_exec_api_rows + except (Exception, psycopg2.Error): + LOG.exception("Error reading from DB") + raise + finally: + if conn is not None: + conn.close() diff --git a/congress/datasources/json_ingester.py b/congress/datasources/json_ingester/json_ingester.py similarity index 93% rename from congress/datasources/json_ingester.py rename to congress/datasources/json_ingester/json_ingester.py index dff0ceb3d..31c5ff95e 100644 --- a/congress/datasources/json_ingester.py +++ b/congress/datasources/json_ingester/json_ingester.py @@ -45,7 +45,8 @@ def _get_config(): class JsonIngester(datasource_driver.PollingDataSourceDriver): - def __init__(self, name, config): + + def __init__(self, name, config, exec_manager): def validate_config(config): # FIXME: use json schema to validate config @@ -66,8 +67,14 @@ class JsonIngester(datasource_driver.PollingDataSourceDriver): # use prefix to avoid service_id clash with regular data sources super(JsonIngester, self).__init__( api_base.JSON_DS_SERVICE_PREFIX + name) + self.exec_manager = exec_manager # ref to global mgr for api exec self.type = 'json_ingester' self.name = name # set name back to one without prefix for use here + if 'tables' not in config: + # config w/o table used to define exec_api endpoint + # in this case, no need to create datasource service + return + validate_config(config) self._config = config self._create_schema_and_tables() @@ -111,9 +118,10 @@ class JsonIngester(datasource_driver.PollingDataSourceDriver): LOG.trace('publish(self=%s, table=%s, data=%s, use_snapshot=%s', self, table, data, use_snapshot) - self._update_table(table, new_data=data, - old_data=self.prior_state.get(table, set([])), - use_snapshot=use_snapshot) + return self._update_table( + table, new_data=data, + old_data=self.prior_state.get(table, set([])), + use_snapshot=use_snapshot) def _create_schema_and_tables(self): params = _get_config() @@ -159,9 +167,9 @@ class JsonIngester(datasource_driver.PollingDataSourceDriver): def _update_table( self, table_name, new_data, old_data, use_snapshot): - # return immediately if no change to update + # return False immediately if no change to update if new_data == old_data: - return + return False params = _get_config() @@ -198,10 +206,12 @@ class JsonIngester(datasource_driver.PollingDataSourceDriver): (d, str(self.key_sets[table_name].add_and_get_key(d)))) conn.commit() cur.close() + return True # return True indicating change made except (Exception, psycopg2.Error): LOG.exception("Error writing to DB") # makes the next update use snapshot self._clear_table_state(table_name) + return False # return False indicating no change made (rollback) finally: if conn is not None: conn.close() @@ -259,11 +269,17 @@ class JsonIngester(datasource_driver.PollingDataSourceDriver): try: self.update_from_datasource() # sets self.state # publish those tables with polling update methods + overall_change_made = False for tablename in self.update_methods: use_snapshot = tablename not in self.prior_state # Note(thread-safety): blocking call[ - self.publish(tablename, self.state.get(tablename, set([])), - use_snapshot=use_snapshot) + this_table_change_made = self.publish( + tablename, self.state.get(tablename, set([])), + use_snapshot=use_snapshot) + overall_change_made = (overall_change_made + or this_table_change_made) + if overall_change_made: + self.exec_manager.evaluate_and_execute_actions() except Exception as e: self.last_error = e LOG.exception("Datasource driver raised exception") @@ -302,6 +318,7 @@ class JsonIngester(datasource_driver.PollingDataSourceDriver): 'expression "{}" fails to obtain exactly one match on JSON record' ' "{}".') self._webhook_update_table(table_name, key=json_id, data=json_record) + self.exec_manager.evaluate_and_execute_actions() def _webhook_update_table(self, table_name, key, data): key_string = json.dumps(key, sort_keys=True) diff --git a/congress/harness.py b/congress/harness.py index 6dce1913a..596d91f0c 100644 --- a/congress/harness.py +++ b/congress/harness.py @@ -36,7 +36,8 @@ from congress.api import status_model from congress.api.system import driver_model from congress.api import table_model from congress.api import webhook_model -from congress.datasources import json_ingester +from congress.datasources.json_ingester import exec_api +from congress.datasources.json_ingester import json_ingester from congress.db import datasources as db_datasources from congress.dse2 import datasource_manager as ds_manager from congress.dse2 import dse_node @@ -168,14 +169,21 @@ def create_json_ingester_datasources(bus): ds_configs = utils.YamlConfigs( cfg.CONF.json_ingester.config_path, 'name') ds_configs.load_from_files() + exec_manager = exec_api.ExecApiManager( + ds_configs.loaded_structures.values()) + datasources = [] for name in ds_configs.loaded_structures: LOG.debug('creating datasource %s', name) datasource_config = ds_configs.loaded_structures[name] try: - service = json_ingester.JsonIngester(name, datasource_config) - bus.register_service(service) - datasources.append(service) + service = json_ingester.JsonIngester( + name, datasource_config, exec_manager) + if service: + # config w/o table used to define exec_api endpoint + # no service created in that case + bus.register_service(service) + datasources.append(service) except Exception: LOG.exception( "Failed to create JsonIngester service {}.".format(name)) diff --git a/congress/tests/datasources/json_ingester/__init__.py b/congress/tests/datasources/json_ingester/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/congress/tests/datasources/json_ingester/test_exec_api.py b/congress/tests/datasources/json_ingester/test_exec_api.py new file mode 100644 index 000000000..319155022 --- /dev/null +++ b/congress/tests/datasources/json_ingester/test_exec_api.py @@ -0,0 +1,188 @@ +# Copyright (c) 2019 VMware, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +from __future__ import print_function +from __future__ import division +from __future__ import absolute_import + +import mock + +from congress.datasources.json_ingester import exec_api +from congress.tests import base + + +def mock_spawn_execute(func, *args, **kwargs): + return func(*args, **kwargs) + + +class TestExecApiManager(base.TestCase): + + @mock.patch('congress.datasources.datasource_utils.get_keystone_session') + def setUp(self, get_keystone_session): + super(TestExecApiManager, self).setUp() + + get_keystone_session.side_effect = [ + mock.Mock(), mock.Mock(), mock.Mock(), mock.Mock(), mock.Mock()] + + self.test_configs = [ + { + "api_endpoint": "test1_url", + "tables": { + "flavors": { + "poll": { + "api_method": "get", + "api_path": "flavors/detail", + "jsonpath": "$.flavors[:]" + } + }, + "servers": { + "poll": { + "api_method": "get", + "api_path": "servers/detail", + "jsonpath": "$.servers[:]" + } + }, + "alarms": { + "webhook": { + "record_jsonpath": "$.payload", + "id_jsonpath": "$.id" + } + } + }, + "authentication": { + "username": "admin", + "type": "keystone", + "project_name": "admin", + "password": "password", + "auth_url": "http://127.0.0.1/identity" + }, + "poll_interval": 1, + "name": "test1" + }, + { + "allow_exec_api": True, + "api_endpoint": "test2_url", + "tables": { + "flavors": { + "poll": { + "api_method": "get", + "api_path": "flavors/detail", + "jsonpath": "$.flavors[:]" + } + }, + "servers": { + "poll": { + "api_method": "get", + "api_path": "servers/detail", + "jsonpath": "$.servers[:]" + } + }, + "alarms": { + "webhook": { + "record_jsonpath": "$.payload", + "id_jsonpath": "$.id" + } + } + }, + "authentication": { + "username": "admin", + "type": "keystone", + "project_name": "admin", + "password": "password", + "auth_url": "http://127.0.0.1/identity" + }, + "poll_interval": 1, + "name": "test2" + }, + { + "allow_exec_api": True, + "api_endpoint": "test3_url", + "authentication": { + "username": "admin", + "type": "keystone", + "project_name": "admin", + "password": "password", + "auth_url": "http://127.0.0.1/identity" + }, + "name": "test3" + } + ] + + self.test_exec_mgr = exec_api.ExecApiManager(self.test_configs) + + def test_init(self): + # 'test1' ignored because no "allow_exec_api": True + self.assertEqual(set(self.test_exec_mgr._exec_api_sessions.keys()), + set(['test2', 'test3'])) + self.assertEqual(set(self.test_exec_mgr._exec_api_endpoints.keys()), + set(['test2', 'test3'])) + + def test_evaluate_and_execute_actions(self): + test_rows1 = set([1, 2, 3]) + test_rows2 = set([2, 3, 4]) + + self.test_exec_mgr._read_all_execute_tables = mock.Mock( + spec_set=[], return_value=test_rows1) + self.test_exec_mgr._execute_exec_api_rows = mock.Mock(spec_set=[]) + + self.assertEqual(self.test_exec_mgr._last_exec_api_state, set([])) + self.test_exec_mgr.evaluate_and_execute_actions() + + self.assertEqual(self.test_exec_mgr._last_exec_api_state, test_rows1) + self.test_exec_mgr._read_all_execute_tables.assert_called_once() + self.test_exec_mgr._execute_exec_api_rows.assert_called_once_with( + test_rows1) + + self.test_exec_mgr._read_all_execute_tables = mock.Mock( + spec_set=[], return_value=test_rows2) + self.test_exec_mgr._execute_exec_api_rows = mock.Mock(spec_set=[]) + + self.test_exec_mgr.evaluate_and_execute_actions() + self.assertEqual(self.test_exec_mgr._last_exec_api_state, test_rows2) + self.test_exec_mgr._read_all_execute_tables.assert_called_once() + self.test_exec_mgr._execute_exec_api_rows.assert_called_once_with( + test_rows2 - test_rows1) + + @mock.patch('eventlet.spawn_n', side_effect=mock_spawn_execute) + def test_execute_exec_api_rows(self, mock_spawn): + test_row1 = ('test1', 'path1', 'method1', '["body1"]', '["params1"]', + '["headers1"]') + test_row2a = ('test2', 'path2a', 'method2a', '["body2a"]', + '["params2a"]', '["headers2a"]') + test_row2b = ('test2', 'path2b', 'method2b', '["body2b"]', + '["params2b"]', '["headers2b"]') + test_row3 = ('test3', 'path3', 'method3', '["body3"]', '["params3"]', + '["headers3"]') + + self.test_exec_mgr._execute_exec_api_rows( + [test_row1, test_row2a, test_row3, test_row2b]) + self.assertEqual( + self.test_exec_mgr._exec_api_sessions['test2'].request.call_count, + 2) + self.test_exec_mgr._exec_api_sessions[ + 'test2'].request.assert_any_call( + endpoint_override='test2_url', url='path2a', method='METHOD2A', + json=[u'body2a'], params=[u'params2a'], headers=[u'headers2a'], + connect_retries=10, status_code_retries=10) + self.test_exec_mgr._exec_api_sessions[ + 'test2'].request.assert_any_call( + endpoint_override='test2_url', url='path2b', method='METHOD2B', + json=[u'body2b'], params=[u'params2b'], headers=[u'headers2b'], + connect_retries=10, status_code_retries=10) + self.test_exec_mgr._exec_api_sessions[ + 'test3'].request.assert_called_once_with( + endpoint_override='test3_url', url='path3', method='METHOD3', + json=[u'body3'], params=[u'params3'], headers=[u'headers3'], + connect_retries=10, status_code_retries=10) diff --git a/congress/tests/datasources/test_json_ingester.py b/congress/tests/datasources/json_ingester/test_json_ingester.py similarity index 93% rename from congress/tests/datasources/test_json_ingester.py rename to congress/tests/datasources/json_ingester/test_json_ingester.py index 5bf1e5248..5c2ac07f0 100644 --- a/congress/tests/datasources/test_json_ingester.py +++ b/congress/tests/datasources/json_ingester/test_json_ingester.py @@ -17,11 +17,12 @@ from __future__ import print_function from __future__ import division from __future__ import absolute_import -import mock import uuid +import mock + from congress.datasources import datasource_driver -from congress.datasources import json_ingester +from congress.datasources.json_ingester import json_ingester from congress import exception from congress.tests import base @@ -72,8 +73,11 @@ class TestJsonIngester(base.TestCase): "name": "nova" } + from congress.datasources.json_ingester import exec_api + # exec_manager = exec_api.ExecApiManager([]) + exec_manager_mock = mock.Mock(spec=exec_api.ExecApiManager) self.test_driver = json_ingester.JsonIngester( - self.test_config['name'], self.test_config) + self.test_config['name'], self.test_config, exec_manager_mock) def test_invalid_config_poll_plus_webhook(self): invalid_config = self.test_config @@ -82,7 +86,7 @@ class TestJsonIngester(base.TestCase): "id_jsonpath": "$.id"} self.assertRaises( exception.BadConfig, json_ingester.JsonIngester, - invalid_config['name'], invalid_config) + invalid_config['name'], invalid_config, None) @mock.patch.object(json_ingester.JsonIngester, '_update_table') def test_poll(self, _update_table): @@ -133,6 +137,8 @@ class TestJsonIngester(base.TestCase): self.test_driver.json_ingester_webhook_handler('alarms', test_body) _webhook_update_table.assert_called_once_with( 'alarms', key=42, data=test_body['payload']) + (self.test_driver.exec_manager. + evaluate_and_execute_actions.assert_called_once()) @mock.patch.object(json_ingester.JsonIngester, '_webhook_update_table') def test_json_ingester_webhook_handler_non_primitive_key( @@ -142,6 +148,8 @@ class TestJsonIngester(base.TestCase): self.test_driver.json_ingester_webhook_handler('alarms', test_body) _webhook_update_table.assert_called_once_with( 'alarms', key=test_key, data=test_body['payload']) + (self.test_driver.exec_manager. + evaluate_and_execute_actions.assert_called_once()) @mock.patch.object(json_ingester.JsonIngester, '_webhook_update_table') def test_json_ingester_webhook_handler_missing_payload( diff --git a/congress/tests/etc/datasources/nova.yaml b/congress/tests/etc/datasources/nova.yaml index ee1aea985..afc9cd244 100644 --- a/congress/tests/etc/datasources/nova.yaml +++ b/congress/tests/etc/datasources/nova.yaml @@ -1,5 +1,6 @@ name: nova -poll_interval: 60 +poll_interval: 5 +allow_exec_api: true authentication: type: keystone username: admin diff --git a/congress/tests/fake_datasource.py b/congress/tests/fake_datasource.py index c8c2344b3..f73754910 100644 --- a/congress/tests/fake_datasource.py +++ b/congress/tests/fake_datasource.py @@ -17,11 +17,13 @@ from __future__ import print_function from __future__ import division from __future__ import absolute_import +import mock from oslo_log import log as logging from congress.datasources import datasource_driver from congress.datasources import datasource_utils -from congress.datasources import json_ingester +from congress.datasources.json_ingester import exec_api +from congress.datasources.json_ingester import json_ingester LOG = logging.getLogger(__name__) @@ -92,7 +94,8 @@ class FakeJsonIngester(json_ingester.JsonIngester): }, "name": name } - super(FakeJsonIngester, self).__init__(name, config) + super(FakeJsonIngester, self).__init__( + name, config, mock.Mock(spec_set=exec_api.ExecApiManager)) # override for unit testing def _create_schema_and_tables(self):