add elasticsearch events db

This patch implements an Elasticsearch driver for events.

Implements: blueprint elasticsearch-driver
Change-Id: Ie579f325685c14aed78d83de7c6d7bff326bc188
gordon chung 2014-11-24 09:02:36 -05:00
parent a3a4e0e283
commit cc7b8b4ecf
7 changed files with 339 additions and 1 deletion
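The driver is registered under the 'es://' URL scheme (see the setup.cfg hunk below). A minimal sketch of obtaining a connection, mirroring the test fixture in this patch; the host and port are hypothetical:

from ceilometer import storage

# Connection.__init__ passes the URL's netloc straight to
# elasticsearch.Elasticsearch()
event_conn = storage.get_connection('es://localhost:9200',
                                    'ceilometer.event.storage')
event_conn.upgrade()  # installs the index template before the first write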

ceilometer/event/storage/impl_elasticsearch.py New file

@@ -0,0 +1,268 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import operator

import elasticsearch as es
from elasticsearch import helpers
from oslo.utils import netutils
from oslo.utils import timeutils
import six

from ceilometer.event.storage import base
from ceilometer.event.storage import models
from ceilometer import utils
AVAILABLE_CAPABILITIES = {
'events': {'query': {'simple': True}},
}
AVAILABLE_STORAGE_CAPABILITIES = {
'storage': {'production_ready': True},
}
class Connection(base.Connection):
"""Put the event data into an ElasticSearch db.
Events in ElasticSearch are indexed by day and stored by event_type.
An example document::
{"_index":"events_2014-10-21",
"_type":"event_type0",
"_id":"dc90e464-65ab-4a5d-bf66-ecb956b5d779",
"_score":1.0,
"_source":{"timestamp": "2014-10-21T20:02:09.274797"
"traits": {"id4_0": "2014-10-21T20:02:09.274797",
"id3_0": 0.7510790937279408,
"id2_0": 5,
"id1_0": "18c97ba1-3b74-441a-b948-a702a30cbce2"}
}
}
"""
CAPABILITIES = utils.update_nested(base.Connection.CAPABILITIES,
AVAILABLE_CAPABILITIES)
STORAGE_CAPABILITIES = utils.update_nested(
base.Connection.STORAGE_CAPABILITIES,
AVAILABLE_STORAGE_CAPABILITIES,
)
index_name = 'events'
    # NOTE(gordc): mainly for testing; data is not searchable immediately
    # after a write, only after a periodic index refresh.
_refresh_on_write = False
def __init__(self, url):
url_split = netutils.urlsplit(url)
self.conn = es.Elasticsearch(url_split.netloc)
def upgrade(self):
iclient = es.client.IndicesClient(self.conn)
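        # template matching every index: store a _timestamp for each
        # document and map 'traits' as a nested object so filters can
        # target individual trait fields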
ts_template = {
'template': '*',
'mappings': {'_default_':
{'_timestamp': {'enabled': True,
'store': True},
'properties': {'traits': {'type': 'nested'}}}}}
iclient.put_template(name='enable_timestamp', body=ts_template)
def record_events(self, events):
def _build_bulk_index(event_list):
for ev in event_list:
traits = {t.name: t.value for t in ev.traits}
yield {'_op_type': 'create',
'_index': '%s_%s' % (self.index_name,
ev.generated.date().isoformat()),
'_type': ev.event_type,
'_id': ev.message_id,
'_source': {'timestamp': ev.generated.isoformat(),
'traits': traits}}
problem_events = []
for ok, result in helpers.streaming_bulk(
self.conn, _build_bulk_index(events)):
if not ok:
__, result = result.popitem()
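                # HTTP 409 (conflict) from the 'create' op means a doc
                # with this message_id already exists: a duplicate event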
if result['status'] == 409:
problem_events.append((models.Event.DUPLICATE,
result['_id']))
else:
problem_events.append((models.Event.UNKNOWN_PROBLEM,
result['_id']))
if self._refresh_on_write:
self.conn.indices.refresh(index='%s_*' % self.index_name)
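            # busy-wait until the cluster reports no pending tasks so the
            # refreshed data is searchable before returning (test-only)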
while self.conn.cluster.pending_tasks(local=True)['tasks']:
pass
return problem_events
def _make_dsl_from_filter(self, indices, ev_filter):
q_args = {}
filters = []
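        # timestamp bounds become range filters; since indices are created
        # per day, daily indices wholly outside the requested window are
        # pruned from the index list ('ge'/'le' are accepted by
        # elasticsearch 1.x as synonyms for 'gte'/'lte')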
if ev_filter.start_timestamp:
filters.append({'range': {'timestamp':
{'ge': ev_filter.start_timestamp.isoformat()}}})
while indices[0] < (
'%s_%s' % (self.index_name,
ev_filter.start_timestamp.date().isoformat())):
del indices[0]
if ev_filter.end_timestamp:
filters.append({'range': {'timestamp':
{'le': ev_filter.end_timestamp.isoformat()}}})
while indices[-1] > (
'%s_%s' % (self.index_name,
ev_filter.end_timestamp.date().isoformat())):
del indices[-1]
q_args['index'] = indices
if ev_filter.event_type:
q_args['doc_type'] = ev_filter.event_type
if ev_filter.message_id:
filters.append({'term': {'_id': ev_filter.message_id}})
if ev_filter.traits_filter:
trait_filters = []
for t_filter in ev_filter.traits_filter:
value = None
for val_type in ['integer', 'string', 'float', 'datetime']:
if t_filter.get(val_type):
value = t_filter.get(val_type)
if isinstance(value, six.string_types):
value = value.lower()
elif isinstance(value, datetime.datetime):
value = value.isoformat()
break
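                # comparison ops translate to range filters; eq/ne are
                # expressed as a query_string match on the trait field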
if t_filter.get('op') in ['gt', 'ge', 'lt', 'le']:
op = (t_filter.get('op').replace('ge', 'gte')
.replace('le', 'lte'))
trait_filters.append(
{'range': {t_filter['key']: {op: value}}})
else:
tf = {"query": {"query_string": {
"query": "%s: \"%s\"" % (t_filter['key'], value)}}}
if t_filter.get('op') == 'ne':
tf = {"not": tf}
trait_filters.append(tf)
filters.append(
{'nested': {'path': 'traits', 'query': {'filtered': {
'filter': {'bool': {'must': trait_filters}}}}}})
q_args['body'] = {'query': {'filtered':
{'filter': {'bool': {'must': filters}}}}}
return q_args
def get_events(self, event_filter):
iclient = es.client.IndicesClient(self.conn)
indices = iclient.get_mapping('%s_*' % self.index_name).keys()
if indices:
filter_args = self._make_dsl_from_filter(indices, event_filter)
results = self.conn.search(fields=['_id', 'timestamp',
'_type', '_source'],
sort='timestamp:asc',
**filter_args)
trait_mappings = {}
for record in results['hits']['hits']:
trait_list = []
                if record['_type'] not in trait_mappings:
trait_mappings[record['_type']] = list(
self.get_trait_types(record['_type']))
                for key, value in record['_source']['traits'].items():
for t_map in trait_mappings[record['_type']]:
if t_map['name'] == key:
dtype = t_map['data_type']
break
trait_list.append(models.Trait(
name=key, dtype=dtype,
value=models.Trait.convert_value(dtype, value)))
gen_ts = timeutils.normalize_time(timeutils.parse_isotime(
record['_source']['timestamp']))
yield models.Event(message_id=record['_id'],
event_type=record['_type'],
generated=gen_ts,
traits=sorted(
trait_list,
key=operator.attrgetter('dtype')))
def get_event_types(self):
iclient = es.client.IndicesClient(self.conn)
es_mappings = iclient.get_mapping('%s_*' % self.index_name)
seen_types = set()
for index in es_mappings.keys():
for ev_type in es_mappings[index]['mappings'].keys():
seen_types.add(ev_type)
# TODO(gordc): tests assume sorted ordering but backends are not
# explicitly ordered.
        # NOTE: _default_ is a type that appears in all mappings but is
        # not a real 'type'.
seen_types.discard('_default_')
return sorted(list(seen_types))
@staticmethod
def _remap_es_types(d_type):
if d_type == 'string':
d_type = 'text'
elif d_type == 'long':
d_type = 'int'
elif d_type == 'double':
d_type = 'float'
        elif d_type in ('date', 'date_time'):
d_type = 'datetime'
return d_type
def get_trait_types(self, event_type):
iclient = es.client.IndicesClient(self.conn)
es_mappings = iclient.get_mapping('%s_*' % self.index_name)
seen_types = []
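        # walk the mapping of every daily index to collect each trait's
        # name and its elasticsearch field type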
for index in es_mappings.keys():
# if event_type exists in index and has traits
if (es_mappings[index]['mappings'].get(event_type) and
es_mappings[index]['mappings'][event_type]['properties']
['traits'].get('properties')):
for t_type in (es_mappings[index]['mappings'][event_type]
['properties']['traits']['properties'].keys()):
d_type = (es_mappings[index]['mappings'][event_type]
['properties']['traits']['properties']
[t_type]['type'])
d_type = models.Trait.get_type_by_name(
self._remap_es_types(d_type))
if (t_type, d_type) not in seen_types:
yield {'name': t_type, 'data_type': d_type}
seen_types.append((t_type, d_type))
def get_traits(self, event_type, trait_type=None):
t_types = dict((res['name'], res['data_type'])
for res in self.get_trait_types(event_type))
        if not t_types or (trait_type and trait_type not in t_types):
return
result = self.conn.search('%s_*' % self.index_name, event_type)
for ev in result['hits']['hits']:
if trait_type and ev['_source']['traits'].get(trait_type):
yield models.Trait(
name=trait_type,
dtype=t_types[trait_type],
value=models.Trait.convert_value(
t_types[trait_type],
ev['_source']['traits'][trait_type]))
else:
                for trait, value in ev['_source']['traits'].items():
                    yield models.Trait(
                        name=trait,
                        dtype=t_types[trait],
                        value=models.Trait.convert_value(
                            t_types[trait], value))
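For reference, a sketch of the search body _make_dsl_from_filter assembles for a hypothetical event filter carrying a start timestamp and one string-trait equality filter (the trait name and values are invented):

query_body = {'query': {'filtered': {'filter': {'bool': {'must': [
    # range filter derived from ev_filter.start_timestamp
    {'range': {'timestamp': {'ge': '2014-10-21T00:00:00'}}},
    # nested filter derived from one traits_filter entry
    {'nested': {'path': 'traits', 'query': {'filtered': {'filter': {
        'bool': {'must': [{'query': {'query_string': {
            'query': 'id1_0: "some-value"'}}}]}}}}}}]}}}}}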


@@ -102,6 +102,24 @@ class MySQLManager(SQLManager):
self._conn.execute('CREATE DATABASE %s;' % self._db_name)
class ElasticSearchManager(fixtures.Fixture):
def __init__(self, url):
self.url = url
def setUp(self):
super(ElasticSearchManager, self).setUp()
self.connection = storage.get_connection(
'sqlite://', 'ceilometer.metering.storage')
self.alarm_connection = storage.get_connection(
'sqlite://', 'ceilometer.alarm.storage')
self.event_connection = storage.get_connection(
self.url, 'ceilometer.event.storage')
# prefix each test with unique index name
self.event_connection.index_name = 'events_%s' % uuid.uuid4().hex
# force index on write so data is queryable right away
self.event_connection._refresh_on_write = True
class HBaseManager(fixtures.Fixture):
def __init__(self, url):
self._url = url
@@ -166,6 +184,7 @@ class TestBase(testscenarios.testcase.WithScenarios, test_base.BaseTestCase):
'db2': MongoDbManager,
'sqlite': SQLiteManager,
'hbase': HBaseManager,
'es': ElasticSearchManager,
}
db_url = 'sqlite://' # NOTE(Alexei_987) Set default db url
@@ -256,7 +275,7 @@ class MixinTestsWithBackendScenarios(object):
('sqlite', {'db_url': 'sqlite://'}),
]
-    for db in ('MONGODB', 'MYSQL', 'PGSQL', 'HBASE', 'DB2'):
+    for db in ('MONGODB', 'MYSQL', 'PGSQL', 'HBASE', 'DB2', 'ES'):
if os.environ.get('CEILOMETER_TEST_%s_URL' % db):
scenarios.append(
(db.lower(), {'db_url': os.environ.get(

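The new scenario only runs when the matching URL is exported before the test run; a minimal sketch of opting in (the value mirrors setup-test-env-elastic.sh below):

import os

# hypothetical manual opt-in; the script below exports this for you
os.environ['CEILOMETER_TEST_ES_URL'] = 'es://localhost:9200'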
setup-test-env-elastic.sh Executable file

@@ -0,0 +1,41 @@
#!/bin/bash
set -e
source functions.sh
if [ "$1" = "--coverage" ]; then
COVERAGE_ARG="$1"
shift
fi
export PATH=$PATH:/usr/share/elasticsearch/bin
check_for_cmd elasticsearch
# check for Java
if [ -x "$JAVA_HOME/bin/java" ]; then
JAVA="$JAVA_HOME/bin/java"
else
JAVA=`which java`
fi
if [ ! -x "$JAVA" ]; then
echo "Could not find any executable java binary. Please install java in your PATH or set JAVA_HOME"
exit 1
fi
# Start ElasticSearch process for tests
ES_DATA=`mktemp -d /tmp/CEILO-ES-XXXXX`
ES_PORT=9200
ES_PID=${ES_DATA}/elasticsearch.pid
elasticsearch -p ${ES_PID} -Des.http.port=${ES_PORT} -Des.path.logs=${ES_DATA}/logs -Des.path.data=${ES_DATA} -Des.path.conf=/etc/elasticsearch &> ${ES_DATA}/out &
# Wait for ElasticSearch to start listening to connections
sleep 3
wait_for_line "started" ${ES_DATA}/out
export CEILOMETER_TEST_ES_URL="es://localhost:${ES_PORT}"
# Yield execution to venv command
"$@"
# Kill ElasticSearch
kill $(cat ${ES_PID})

setup.cfg

@@ -220,6 +220,7 @@ ceilometer.alarm.storage =
db2 = ceilometer.alarm.storage.impl_db2:Connection
ceilometer.event.storage =
es = ceilometer.event.storage.impl_elasticsearch:Connection
log = ceilometer.event.storage.impl_log:Connection
mongodb = ceilometer.event.storage.impl_mongodb:Connection
mysql = ceilometer.event.storage.impl_sqlalchemy:Connection

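At runtime the new es entry point is looked up by name in the ceilometer.event.storage namespace. A hedged sketch using stevedore, the plugin loader OpenStack storage code conventionally uses (the call site itself is not part of this patch):

from stevedore import driver

# resolves to ceilometer.event.storage.impl_elasticsearch:Connection
mgr = driver.DriverManager('ceilometer.event.storage', 'es')
conn_cls = mgr.driver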

@@ -7,6 +7,7 @@ hacking>=0.10.0,<0.11
Babel>=1.3
coverage>=3.6
discover
elasticsearch>=1.3.0
fixtures>=0.3.14
httplib2>=0.7.5
mock>=1.0


@@ -7,6 +7,7 @@ hacking>=0.10.0,<0.11
Babel>=1.3
coverage>=3.6
discover
elasticsearch>=1.3.0
fixtures>=0.3.14
httplib2>=0.7.5
mock>=1.0

tox.ini

@@ -24,6 +24,10 @@ commands =
commands =
bash -x {toxinidir}/setup-test-env-postgresql.sh python setup.py testr --slowest --testr-args="{posargs}"
[testenv:py-elastic]
commands =
bash -x {toxinidir}/setup-test-env-elastic.sh python setup.py testr --slowest --testr-args="{posargs}"
[testenv:py33]
deps = -r{toxinidir}/requirements-py3.txt
-r{toxinidir}/test-requirements-py3.txt
@@ -83,6 +87,9 @@ commands = bash -x {toxinidir}/setup-test-env-mysql.sh oslo_debug_helper {posargs}
[testenv:debug-pgsql]
commands = bash -x {toxinidir}/setup-test-env-postgresql.sh oslo_debug_helper {posargs}
[testenv:debug-elastic]
commands = bash -x {toxinidir}/setup-test-env-elastic.sh oslo_debug_helper {posargs}
[flake8]
ignore =
exclude=.venv,.git,.tox,dist,doc,./ceilometer/openstack/common,*lib/python*,*egg,nova_tests,build,tools/lintstack.head.py