elastic-recheck/elastic_recheck/results.py

245 lines
8.8 KiB
Python

# Copyright Samsung Electronics 2013. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Elastic search wrapper to make handling results easier."""
import calendar
import copy
import datetime
import pprint
import dateutil.parser as dp
import pyelasticsearch
import pytz
# Shared pretty-printer instance; Hit.__repr__ uses it to format the raw hit.
pp = pprint.PrettyPrinter()
class SearchEngine(object):
    """Wrapper for pyelasticsearch so that it returns result sets."""

    def __init__(self, url, indexfmt='logstash-%Y.%m.%d'):
        """Remember the server URL and the index naming scheme.

        `url` is handed to pyelasticsearch.ElasticSearch on every search.

        `indexfmt` is a strftime format used to turn a datetime into an
        index name.
        """
        self._url = url
        self._indexfmt = indexfmt
        # index name -> True, for every index already confirmed to exist.
        self.index_cache = {}

    def _is_valid_index(self, es, index):
        """Return True if `index` exists according to `es.status()`.

        Only positive answers are stored in `self.index_cache`; an index
        that was reported missing is probed again on the next call.
        """
        if index in self.index_cache:
            return self.index_cache[index]
        try:
            es.status(index=index)
        except pyelasticsearch.exceptions.ElasticHttpNotFoundError:
            return False
        self.index_cache[index] = True
        return True

    def _candidate_indexes(self, now, recent=False, days=0):
        """Return the index names a time-limited search should look at.

        `now` is the reference datetime, `recent` adds the index that
        covers one hour before `now`, and `days` adds one index per day
        for day offsets 1 .. days - 1 (today's index is always included).

        The names are returned newest first and without duplicates: the
        "one hour ago" index and the "yesterday" index can be the same
        name, and a coarse `indexfmt` maps several days to one name.
        """
        datefmt = self._indexfmt
        moments = [now]
        if recent:
            moments.append(now - datetime.timedelta(hours=1))
        # NOTE: the upper bound is exclusive, so days=N covers today plus
        # the N - 1 previous days.
        for day in range(1, days):
            moments.append(now - datetime.timedelta(days=day))

        names = []
        seen = set()
        for moment in moments:
            name = moment.strftime(datefmt)
            if name not in seen:
                seen.add(name)
                names.append(name)
        return names

    def search(self, query, size=1000, recent=False, days=0):
        """Search an elasticsearch server.

        `query` parameter is the complicated query structure that
        pyelasticsearch uses. More details in their documentation.

        `size` is the max number of results to return from the search
        engine. We default it to 1000 to ensure we don't lose things.
        For certain classes of queries (like faceted ones), this can actually
        be set very low, as it won't impact the facet counts.

        `recent` search only the most recent index(es), assuming this is
        basically a real time query where you only care about the last hour
        of time. Using recent dramatically reduces the load on the ES
        cluster.

        `days` search only the last number of days.

        The returned result is a ResultSet query.
        """
        es = pyelasticsearch.ElasticSearch(self._url)
        args = {'size': size}
        if recent or days:
            now = datetime.datetime.utcnow()
            # NOTE(review): if none of the candidates exists this passes an
            # empty index list to es.search(); confirm how pyelasticsearch
            # treats that.
            args['index'] = [
                name
                for name in self._candidate_indexes(now, recent, days)
                if self._is_valid_index(es, name)
            ]
        results = es.search(query, **args)
        return ResultSet(results)
class ResultSet(list):
    """An easy iterator object for handling elasticsearch results.

    pyelasticsearch returns very complex result structures, and manipulating
    them directly is both ugly and error prone. The point of this wrapper
    class is to give us a container that makes working with pyes results more
    natural.

    For instance:
    ::

        results = se.search(...)
        for hit in results:
            print(hit.build_status)

    This greatly simplifies code that is interacting with search results, and
    allows us to handle some schema instability with elasticsearch, through
    adapting our __getattr__ methods.

    Design goals for ResultSet are that it is an iterator, and that all the
    data that we want to work with is mapped to a flat attribute namespace
    (pyes goes way overboard with nesting, which is fine in the general
    case, but in the elastic_recheck case is just added complexity).
    """

    def __init__(self, results=None):
        """Wrap the raw `results` mapping; a falsy value means "no results".

        When the mapping has a 'hits' entry, every hit in it is wrapped in
        a Hit and stored as a list element of this object.
        """
        self._results = results or {}
        if 'hits' in self._results:
            self._parse_hits(self._results['hits'])

    def _parse_hits(self, hits):
        """Append a Hit for every entry of `hits['hits']`."""
        # why, oh why elastic search: the hit list sits under a second
        # 'hits' key inside the top level 'hits' entry.
        list.extend(self, [Hit(hit) for hit in hits['hits']])

    def __getattr__(self, attr):
        """Magic __getattr__, flattens the attributes namespace.

        First search to see if a facet attribute exists by this name,
        secondly look at the top level attributes to return.

        Raises AttributeError when neither has the name.
        """
        # Go through __dict__ instead of self._results: copy/pickle create
        # the object without running __init__ and then probe it for
        # attributes, which would otherwise re-enter __getattr__ forever.
        results = self.__dict__.get('_results')
        if results is None:
            raise AttributeError(attr)

        # A missing 'tag' facet must end in AttributeError, not KeyError,
        # so that hasattr() and getattr(..., default) keep working.
        tag_facets = (results.get('facets') or {}).get('tag') or {}
        if attr in tag_facets:
            return tag_facets[attr]
        if attr in results:
            return results[attr]
        raise AttributeError(attr)
class FacetSet(dict):
    """A dictionary like collection for creating faceted ResultSets.

    Elastic Search doesn't support nested facets, which are incredibly
    useful for things like faceting by build_status then by build_uuid.

    This is a client side implementation that processes a ResultSet
    with an ordered list of facets, and turns it into a data structure
    which is FacetSet -> FacetSet ... -> ResultSet (arbitrary nesting
    of FacetSets with ResultSets as the leaves).

    Treat this basically like a dictionary (which it inherits from).
    """

    @staticmethod
    def _histogram(data, facet, res=3600):
        """A preprocessor for data should we want to bucket it.

        For the "timestamp" facet, `data` is parsed as a date string,
        floored to a multiple of `res` seconds and returned as integer
        milliseconds since the epoch. Any other facet, and a None
        timestamp, is returned unchanged.
        """
        # NOTE(mriedem): We sometimes hit a case where the @timestamp attribute
        # is too large and ES won't return it. At some point we should probably
        # log a warning/error for these so we can clean them up.
        if facet == "timestamp" and data is not None:
            ts = dp.parse(data)
            tsepoch = int(calendar.timegm(ts.timetuple()))
            # take the floor based on resolution
            ts -= datetime.timedelta(
                seconds=(tsepoch % res),
                microseconds=ts.microsecond)
            # ms since epoch
            epoch = datetime.datetime.fromtimestamp(0, pytz.UTC)
            return int(((ts - epoch).total_seconds()) * 1000)
        return data

    def detect_facets(self, results, facets, res=3600):
        """Group `results` into this FacetSet by the ordered `facets`.

        `results` is an iterable of hits that support `hit[facet]`.

        `facets` is the ordered list of facet names; the first one keys
        this level and the rest are applied recursively to every bucket.
        The list is not modified.

        `res` is the bucket size in seconds used for the "timestamp" facet.
        """
        if not facets:
            return

        # Slice rather than pop() so the caller's list is left untouched.
        facet, remaining = facets[0], facets[1:]
        for hit in results:
            # res has to be forwarded, otherwise the bucket size silently
            # stays at the _histogram default.
            attr = self._histogram(hit[facet], facet, res)
            if attr not in self:
                dict.setdefault(self, attr, ResultSet())
            self[attr].append(hit)

        # if we still have more facets to go, recurse down
        if remaining:
            nested = {}
            for key in self:
                fs = FacetSet()
                fs.detect_facets(self[key], remaining, res=res)
                nested[key] = fs
            self.update(nested)
class Hit(object):
    """One raw search hit with its document fields exposed as attributes."""

    def __init__(self, hit):
        """Keep a reference to the raw `hit` mapping."""
        self._hit = hit

    def index(self):
        """Return the '_index' entry of the raw hit."""
        return self._hit['_index']

    def __getitem__(self, key):
        """Item access is the same flattened lookup as attribute access."""
        return self._lookup(key)

    def __getattr__(self, attr):
        """flatten out our attr space into a few key types

        new style ES has
            _source[attr] for a flat space
        old style ES has
            _source['@attr'] for things like message, @timestamp
        and
            _source['@fields'][attr] for things like build_name, build_status

        also, always collapse down all attributes to singletons, because
        they might be lists if we use multiline processing (which we do
        a lot). In the general case this could be a problem, but the way
        we use logstash, there is only ever one element in these lists.

        A name found in none of those places yields None. Special
        (double underscore) names and '_hit' itself raise AttributeError.
        """
        # copy and pickle probe objects for __deepcopy__, __setstate__ and
        # friends, partly before _hit exists. Those probes must see
        # AttributeError; looking up self._hit here would recurse forever.
        if attr == '_hit' or (attr.startswith('__') and attr.endswith('__')):
            raise AttributeError(attr)
        return self._lookup(attr)

    def _lookup(self, name):
        """Return the collapsed value stored under `name`, or None."""
        def first(item):
            if isinstance(item, list):
                # We've seen cases where the field data, like @timestamp, is
                # too large so we don't get anything back from elastic-search,
                # so skip over those.
                if len(item) > 0:
                    return item[0]
                return None
            return item

        source = self._hit['_source']
        at_name = "@%s" % name
        if name in source:
            return first(source[name])
        if at_name in source:
            return first(source[at_name])
        # New style documents carry no '@fields' at all; treat that the
        # same as "not found" instead of raising KeyError.
        fields = source.get('@fields') or {}
        if name in fields:
            return first(fields[name])
        return None

    def __repr__(self):
        """Pretty-printed form of the raw hit."""
        return pp.pformat(self._hit)