Merge "Generic webhook processor for JsonIngester"

This commit is contained in:
Zuul 2019-02-28 04:01:19 +00:00 committed by Gerrit Code Review
commit 5ea52088ed
9 changed files with 318 additions and 63 deletions

View File

@ -151,6 +151,13 @@ class APIRouterV1(object):
webhook)
resource_mgr.register_handler(webhook_collection_handler)
# Setup /v1/data-sources/<ds_id>/tables/<table_name>/webhook
json_ingester_webhook_path = \
"%s/tables/(?P<table_name>[^/]+)/webhook" % ds_path
json_ingester_webhook_collection_handler = \
webservice.CollectionHandler(json_ingester_webhook_path, webhook)
resource_mgr.register_handler(json_ingester_webhook_collection_handler)
# Setup /v1/system/datasource-drivers
system = process_dict['api-system']
# NOTE(arosen): start url out with datasource-drivers since we don't

View File

@ -35,9 +35,18 @@ class WebhookModel(base.APIModel):
:param context: Key-values providing frame of reference of request
"""
caller, source_id = api_utils.get_id_from_context(context)
table_name = context.get('table_name')
try:
args = {'payload': item}
# Note(thread-safety): blocking call
self.invoke_rpc(caller, 'process_webhook_notification', args)
if table_name: # json ingester case
args = {'table_name': table_name,
'body': item}
# Note(thread-safety): blocking call
self.invoke_rpc(base.JSON_DS_SERVICE_PREFIX + caller,
'json_ingester_webhook_handler', args)
else:
args = {'payload': item}
# Note(thread-safety): blocking call
self.invoke_rpc(caller, 'process_webhook_notification', args)
except exception.CongressException as e:
raise webservice.DataModelException.create(e)

View File

@ -1,4 +1,4 @@
# Copyright (c) 2019 VMware, Inc. All rights reserved.
# Copyright (c) 2018, 2019 VMware, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -30,6 +30,7 @@ from psycopg2 import sql
from congress.api import base as api_base
from congress.datasources import datasource_driver
from congress.datasources import datasource_utils
from congress.dse2 import data_service
from congress import exception
@ -43,20 +44,46 @@ def _get_config():
'password': cfg.CONF.json_ingester.postgres_password}
class PollingJsonIngester(datasource_driver.PollingDataSourceDriver):
class JsonIngester(datasource_driver.PollingDataSourceDriver):
def __init__(self, name, config):
def validate_config(config):
# FIXME: use json schema to validate config
config_tables = config['tables']
poll_tables = [table for table in config_tables
if 'poll' in config_tables[table]]
if len(poll_tables) > 0:
# FIXME: when polling table exists, require configs:
# api_endpoint, authentication
pass
for table_name in config_tables:
if ('poll' in config_tables[table_name]
and 'webhook' in config_tables[table_name]):
raise exception.BadConfig(
'Table ({}) cannot be configured for '
'both poll and webhook.'.format(table_name))
# use prefix to avoid service_id clash with regular data sources
super(PollingJsonIngester, self).__init__(
super(JsonIngester, self).__init__(
api_base.JSON_DS_SERVICE_PREFIX + name)
self.type = 'json_ingester'
self.name = name # set name back to one without prefix for use here
validate_config(config)
self._config = config
self._create_schema_and_tables()
self.poll_time = self._config.get('poll', 60)
self.poll_time = self._config.get('poll_interval', 60)
self._setup_table_key_sets()
self._api_endpoint = self._config['api_endpoint']
self._api_endpoint = self._config.get('api_endpoint')
self._initialize_session()
self._initialize_update_methods()
self._init_end_start_poll()
if len(self.update_methods) > 0:
self._init_end_start_poll()
else:
self.initialized = True
# For DSE2. Must go after __init__
if hasattr(self, 'add_rpc_endpoint'):
self.add_rpc_endpoint(JsonIngesterEndpoints(self))
def _setup_table_key_sets(self):
# because postgres cannot directly use the jsonb column d as key,
@ -94,7 +121,7 @@ class PollingJsonIngester(datasource_driver.PollingDataSourceDriver):
create_schema_statement = """CREATE SCHEMA IF NOT EXISTS {};"""
create_table_statement = """
CREATE TABLE IF NOT EXISTS {}.{}
(d jsonb, _key bigint, primary key (_key));"""
(d jsonb, _key text, primary key (_key));"""
# Note: because postgres cannot directly use the jsonb column d as key,
# the _key column is added as key in order to support performant
# delete of specific rows in delta update to the db table
@ -113,11 +140,12 @@ class PollingJsonIngester(datasource_driver.PollingDataSourceDriver):
# create table
cur.execute(sql.SQL(create_table_statement).format(
sql.Identifier(self.name), sql.Identifier(table_name)))
# TODO(ekcs): make index creation optional
cur.execute(sql.SQL(create_index_statement).format(
sql.Identifier(self.name), sql.Identifier(table_name)))
conn.commit()
cur.close()
except (Exception, psycopg2.DatabaseError):
except (Exception, psycopg2.Error):
if 'table_name' in locals():
LOG.exception("Error creating table %s in schema %s",
table_name, self.name)
@ -141,7 +169,7 @@ class PollingJsonIngester(datasource_driver.PollingDataSourceDriver):
VALUES(%s, %s);"""
delete_all_statement = """DELETE FROM {}.{};"""
delete_tuple_statement = """
DELETE FROM {}.{} WHERE _key == %s;"""
DELETE FROM {}.{} WHERE _key = %s;"""
conn = None
try:
conn = psycopg2.connect(**params)
@ -159,17 +187,18 @@ class PollingJsonIngester(datasource_driver.PollingDataSourceDriver):
for d in to_delete:
cur.execute(sql.SQL(delete_tuple_statement).format(
sql.Identifier(self.name),
sql.Identifier(table_name),
(self.key_sets[table_name].remove_and_get_key(d),)))
sql.Identifier(table_name)),
(str(self.key_sets[table_name].remove_and_get_key(d)),)
)
# insert new data into table
for d in to_insert:
cur.execute(sql.SQL(insert_statement).format(
sql.Identifier(self.name),
sql.Identifier(table_name)),
(d, self.key_sets[table_name].add_and_get_key(d)))
(d, str(self.key_sets[table_name].add_and_get_key(d))))
conn.commit()
cur.close()
except (Exception, psycopg2.DatabaseError):
except (Exception, psycopg2.Error):
LOG.exception("Error writing to DB")
# makes the next update use snapshot
self._clear_table_state(table_name)
@ -185,31 +214,35 @@ class PollingJsonIngester(datasource_driver.PollingDataSourceDriver):
self.update_methods[table_name] = method
def _initialize_session(self):
self._session = datasource_utils.get_keystone_session(
self._config['authentication'])
if 'authentication' in self._config:
self._session = datasource_utils.get_keystone_session(
self._config['authentication'])
def _initialize_update_methods(self):
for table_name in self._config['tables']:
table_info = self._config['tables'][table_name]
if 'poll' in self._config['tables'][table_name]:
table_info = self._config['tables'][table_name]['poll']
# Note: using default parameters to get early-binding of variables
# in closure
def update_method(table_name=table_name, table_info=table_info):
full_path = self._api_endpoint.rstrip('/') + '/' + table_info[
'api_path'].lstrip('/')
result = self._session.get(full_path).json()
# FIXME: generalize to other verbs?
# Note: using default parameters to get early-binding of
# variables in closure
def update_method(
table_name=table_name, table_info=table_info):
full_path = self._api_endpoint.rstrip(
'/') + '/' + table_info['api_path'].lstrip('/')
result = self._session.get(full_path).json()
# FIXME: generalize to other verbs?
jsonpath_expr = parser.parse(table_info['jsonpath'])
ingest_data = [match.value for match in
jsonpath_expr.find(result)]
self.state[table_name] = set(
[json.dumps(item, sort_keys=True) for item in ingest_data])
jsonpath_expr = parser.parse(table_info['jsonpath'])
ingest_data = [match.value for match in
jsonpath_expr.find(result)]
self.state[table_name] = set(
[json.dumps(item, sort_keys=True)
for item in ingest_data])
self.add_update_method(update_method, table_name)
self.add_update_method(update_method, table_name)
def update_from_datasource(self):
for table in self._config['tables']:
for table in self.update_methods:
LOG.debug('update table %s.' % table)
self.update_methods[table]()
@ -225,10 +258,11 @@ class PollingJsonIngester(datasource_driver.PollingDataSourceDriver):
self.last_error = None # non-None only when last poll errored
try:
self.update_from_datasource() # sets self.state
for tablename in self.state:
# publish those tables with polling update methods
for tablename in self.update_methods:
use_snapshot = tablename not in self.prior_state
# Note(thread-safety): blocking call[
self.publish(tablename, self.state[tablename],
self.publish(tablename, self.state.get(tablename, set([])),
use_snapshot=use_snapshot)
except Exception as e:
self.last_error = e
@ -238,6 +272,73 @@ class PollingJsonIngester(datasource_driver.PollingDataSourceDriver):
self.number_of_updates += 1
LOG.info("%s:: finished polling", self.name)
def json_ingester_webhook_handler(self, table_name, body):
def get_exactly_one_jsonpath_match(
jsonpath, jsondata, custom_error_msg):
jsonpath_expr = parser.parse(jsonpath)
matches = jsonpath_expr.find(jsondata)
if len(matches) != 1:
raise exception.BadRequest(
custom_error_msg.format(jsonpath, jsondata))
return matches[0].value
try:
webhook_config = self._config['tables'][table_name]['webhook']
except KeyError:
raise exception.NotFound(
'In JSON Ingester: "{}", the table "{}" either does not exist '
'or is not configured for webhook.'.format(
self.name, table_name))
json_record = get_exactly_one_jsonpath_match(
webhook_config['record_jsonpath'], body,
'In identifying JSON record from webhook body, the configured '
'jsonpath expression "{}" fails to obtain exactly one match on '
'webhook body "{}".')
json_id = get_exactly_one_jsonpath_match(
webhook_config['id_jsonpath'], json_record,
'In identifying ID from JSON record, the configured jsonpath '
'expression "{}" fails to obtain exactly one match on JSON record'
' "{}".')
self._webhook_update_table(table_name, key=json_id, data=json_record)
def _webhook_update_table(self, table_name, key, data):
key_string = json.dumps(key, sort_keys=True)
PGSQL_MAX_INDEXABLE_SIZE = 2712
if len(key_string) > PGSQL_MAX_INDEXABLE_SIZE:
raise exception.BadRequest(
'The supplied key ({}) exceeds the max indexable size ({}) in '
'PostgreSQL.'.format(key_string, PGSQL_MAX_INDEXABLE_SIZE))
params = _get_config()
insert_statement = """INSERT INTO {}.{}
VALUES(%s, %s);"""
delete_tuple_statement = """
DELETE FROM {}.{} WHERE _key = %s;"""
conn = None
try:
conn = psycopg2.connect(**params)
cur = conn.cursor()
# delete the appropriate row from table
cur.execute(sql.SQL(delete_tuple_statement).format(
sql.Identifier(self.name),
sql.Identifier(table_name)),
(key_string,))
# insert new row into table
cur.execute(sql.SQL(insert_statement).format(
sql.Identifier(self.name),
sql.Identifier(table_name)),
(json.dumps(data), key_string))
conn.commit()
cur.close()
except (Exception, psycopg2.Error):
LOG.exception("Error writing to DB")
finally:
if conn is not None:
conn.close()
def validate_lazy_tables(self):
'''override non-applicable parent method as no-op'''
pass
@ -267,6 +368,16 @@ class PollingJsonIngester(datasource_driver.PollingDataSourceDriver):
'This method should not be called in PollingJsonIngester.')
class JsonIngesterEndpoints(data_service.DataServiceEndPoints):
def __init__(self, service):
super(JsonIngesterEndpoints, self).__init__(service)
# Note (thread-safety): blocking function
def json_ingester_webhook_handler(self, context, table_name, body):
# Note (thread-safety): blocking call
return self.service.json_ingester_webhook_handler(table_name, body)
class KeyMap(object):
'''Map associating a unique integer key with each hashable object'''

View File

@ -172,9 +172,13 @@ def create_json_ingester_datasources(bus):
for name in ds_configs.loaded_structures:
LOG.debug('creating datasource %s', name)
datasource_config = ds_configs.loaded_structures[name]
service = json_ingester.PollingJsonIngester(name, datasource_config)
bus.register_service(service)
datasources.append(service)
try:
service = json_ingester.JsonIngester(name, datasource_config)
bus.register_service(service)
datasources.append(service)
except Exception:
LOG.exception(
"Failed to create JsonIngester service {}.".format(name))
return datasources

View File

@ -25,7 +25,7 @@ from congress.tests import helper
def setup_config(with_fake_datasource=True, node_id='testnode',
same_partition_as_node=None, api=True, policy=True,
datasources=True):
datasources=True, with_fake_json_ingester=False):
"""Setup DseNode for testing.
:param: services is an array of DataServices
@ -59,6 +59,11 @@ def setup_config(with_fake_datasource=True, node_id='testnode',
data.type = 'no_sync_datasource_driver'
node.register_service(data)
ingester = None
if with_fake_json_ingester:
ingester = fake_datasource.FakeJsonIngester()
node.register_service(ingester)
engine_service = None
library_service = None
api_service = None
@ -71,4 +76,5 @@ def setup_config(with_fake_datasource=True, node_id='testnode',
ds_manager = services['ds_manager']
return {'node': node, 'engine': engine_service, 'library': library_service,
'data': data, 'api': api_service, 'ds_manager': ds_manager}
'data': data, 'api': api_service, 'ds_manager': ds_manager,
'json_ingester': ingester}

View File

@ -24,13 +24,22 @@ from congress.tests import base
class TestWebhookModel(base.SqlTestCase):
def setUp(self):
super(TestWebhookModel, self).setUp()
services = api_base.setup_config()
services = api_base.setup_config(with_fake_json_ingester=True)
self.webhook_model = services['api']['api-webhook']
self.node = services['node']
self.data = services['data']
self.json_ingester = services['json_ingester']
def test_add_item(self):
context = {'ds_id': self.data.service_id}
payload = {'test_payload': 'test_payload'}
self.webhook_model.add_item(payload, {}, context=context)
self.assertEqual(self.data.webhook_payload, payload)
def test_add_json_ingester(self):
context = {'ds_id': self.json_ingester.name, 'table_name': 'table1'}
payload = {'test_payload': 'test_payload'}
self.webhook_model.add_item(payload, {}, context=context)
self.assertEqual(self.json_ingester.webhook_payload, payload)
self.assertEqual(
self.json_ingester.webhook_table_name, context['table_name'])

View File

@ -1,4 +1,4 @@
# Copyright (c) 2019 VMware Inc All rights reserved.
# Copyright (c) 2019 VMware, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -22,32 +22,43 @@ import uuid
from congress.datasources import datasource_driver
from congress.datasources import json_ingester
from congress import exception
from congress.tests import base
class TestPollingJsonIngester(base.TestCase):
class TestJsonIngester(base.TestCase):
@mock.patch('congress.datasources.datasource_utils.get_keystone_session')
@mock.patch.object(
datasource_driver.PollingDataSourceDriver, '_init_end_start_poll')
@mock.patch.object(
json_ingester.PollingJsonIngester, '_create_schema_and_tables')
json_ingester.JsonIngester, '_create_schema_and_tables')
def setUp(self, _create_schema_and_tables, _init_end_start_poll,
get_keystone_session):
super(TestPollingJsonIngester, self).setUp()
super(TestJsonIngester, self).setUp()
test_config = {
self.test_config = {
"api_endpoint": "http://127.0.0.1/compute/v2.1/",
"tables": {
"flavors": {
"api_verb": "get",
"api_path": "flavors/detail",
"jsonpath": "$.flavors[:]"
"poll": {
"api_method": "get",
"api_path": "flavors/detail",
"jsonpath": "$.flavors[:]"
}
},
"servers": {
"api_verb": "get",
"api_path": "servers/detail",
"jsonpath": "$.servers[:]"
"poll": {
"api_method": "get",
"api_path": "servers/detail",
"jsonpath": "$.servers[:]"
}
},
"alarms": {
"webhook": {
"record_jsonpath": "$.payload",
"id_jsonpath": "$.id"
}
}
},
"authentication": {
@ -57,14 +68,23 @@ class TestPollingJsonIngester(base.TestCase):
"password": "password",
"auth_url": "http://127.0.0.1/identity"
},
"poll": 1,
"poll_interval": 1,
"name": "nova"
}
self.test_driver = json_ingester.PollingJsonIngester(
test_config['name'], test_config)
self.test_driver = json_ingester.JsonIngester(
self.test_config['name'], self.test_config)
@mock.patch.object(json_ingester.PollingJsonIngester, '_update_table')
def test_invalid_config_poll_plus_webhook(self):
invalid_config = self.test_config
invalid_config['tables']['servers']['webhook'] = {
"record_jsonpath": "$.payload",
"id_jsonpath": "$.id"}
self.assertRaises(
exception.BadConfig, json_ingester.JsonIngester,
invalid_config['name'], invalid_config)
@mock.patch.object(json_ingester.JsonIngester, '_update_table')
def test_poll(self, _update_table):
mock_api_result = {
"servers": [
@ -107,6 +127,61 @@ class TestPollingJsonIngester(base.TestCase):
_update_table.assert_any_call(
'flavors', new_data=set([]), old_data=set([]), use_snapshot=False)
@mock.patch.object(json_ingester.JsonIngester, '_webhook_update_table')
def test_json_ingester_webhook_handler(self, _webhook_update_table):
test_body = {"payload": {"id": 42, "other": "stuff"}}
self.test_driver.json_ingester_webhook_handler('alarms', test_body)
_webhook_update_table.assert_called_once_with(
'alarms', key=42, data=test_body['payload'])
@mock.patch.object(json_ingester.JsonIngester, '_webhook_update_table')
def test_json_ingester_webhook_handler_non_primitive_key(
self, _webhook_update_table):
test_key = {1: [2, 3], "2": "4"}
test_body = {"payload": {"id": test_key, "other": "stuff"}}
self.test_driver.json_ingester_webhook_handler('alarms', test_body)
_webhook_update_table.assert_called_once_with(
'alarms', key=test_key, data=test_body['payload'])
@mock.patch.object(json_ingester.JsonIngester, '_webhook_update_table')
def test_json_ingester_webhook_handler_missing_payload(
self, _webhook_update_table):
test_body = {"not_payload": {"id": 42, "other": "stuff"}}
self.assertRaises(
exception.BadRequest,
self.test_driver.json_ingester_webhook_handler,
'alarms', test_body)
@mock.patch.object(json_ingester.JsonIngester, '_webhook_update_table')
def test_json_ingester_webhook_handler_missing_id(
self, _webhook_update_table):
test_body = {"payload": {"not_id": 42, "other": "stuff"}}
self.assertRaises(
exception.BadRequest,
self.test_driver.json_ingester_webhook_handler,
'alarms', test_body)
def test_json_ingester_webhook_key_too_long(self):
test_body = {"payload": {"id": "X"*2713, "other": "stuff"}}
self.assertRaises(
exception.BadRequest,
self.test_driver.json_ingester_webhook_handler,
'alarms', test_body)
def test_json_ingester_webhook_nonexistent_table(self):
test_body = {"payload": {"id": 42, "other": "stuff"}}
self.assertRaises(
exception.NotFound,
self.test_driver.json_ingester_webhook_handler,
'no_such_table', test_body)
def test_json_ingester_webhook_non_webhook_table(self):
test_body = {"payload": {"id": 42, "other": "stuff"}}
self.assertRaises(
exception.NotFound,
self.test_driver.json_ingester_webhook_handler,
'servers', test_body)
class TestKeyMap(base.TestCase):

View File

@ -1,5 +1,5 @@
name: nova
poll: 1
poll_interval: 60
authentication:
type: keystone
username: admin
@ -9,10 +9,16 @@ authentication:
api_endpoint: http://127.0.0.1/compute/v2.1/
tables:
flavors:
api_path: flavors/detail
api_verb: get
jsonpath: $.flavors[:]
poll:
api_path: flavors/detail
api_method: get
jsonpath: $.flavors[:]
servers:
api_path: servers/detail
api_verb: get
jsonpath: $.servers[:]
poll:
api_path: servers/detail
api_method: get
jsonpath: $.servers[:]
alarms:
webhook:
record_jsonpath: $.payload
id_jsonpath: $.id

View File

@ -21,6 +21,7 @@ from oslo_log import log as logging
from congress.datasources import datasource_driver
from congress.datasources import datasource_utils
from congress.datasources import json_ingester
LOG = logging.getLogger(__name__)
@ -74,3 +75,30 @@ class FakeDataSource(datasource_driver.PollingDataSourceDriver,
def _webhook_handler(self, payload):
self.webhook_payload = payload
class FakeJsonIngester(json_ingester.JsonIngester):
def __init__(self, name='fake_json', config=None):
if config is None:
config = {
"tables": {
"alarms": {
"webhook": {
"record_jsonpath": "$.payload",
"id_jsonpath": "$.id"
}
}
},
"name": name
}
super(FakeJsonIngester, self).__init__(name, config)
# override for unit testing
def _create_schema_and_tables(self):
pass
# override for unit testing
def json_ingester_webhook_handler(self, table_name, body):
self.webhook_table_name = table_name
self.webhook_payload = body