[HBase] Implement events on HBase

In this change events are written into HBase and read from it.

For this purpose were implemented several methods:
1. record_events --- store events into HBase
2. get_events, get_event_types, get_trait_types and get_traits --- get events
from db.

Add new filters in mock HBase part, that were used in new methods.

Change-Id: I787eeaf998c36a6829f17f0c6ea27b786979edd1
Implements: blueprint hbase-events-feature
This commit is contained in:
Igor Degtiarov 2014-06-10 12:04:00 +03:00
parent c0052cf6f1
commit 08a811a5f0
3 changed files with 274 additions and 15 deletions

View File

@ -23,6 +23,7 @@ import datetime
import hashlib
import itertools
import json
import operator
import os
import re
import six.moves.urllib.parse as urlparse
@ -52,6 +53,7 @@ AVAILABLE_CAPABILITIES = {
'statistics': {'query': {'simple': True,
'metadata': True},
'aggregation': {'standard': True}},
'events': {'query': {'simple': True}},
}
@ -59,6 +61,11 @@ AVAILABLE_STORAGE_CAPABILITIES = {
'storage': {'production_ready': True},
}
DTYPE_NAMES = {'none': 0, 'string': 1, 'integer': 2, 'float': 3,
'datetime': 4}
OP_SIGN = {'eq': '=', 'lt': '<', 'le': '<=', 'ne': '!=', 'gt': '>',
'ge': '>='}
class Connection(base.Connection):
"""Put the data into a HBase database
@ -114,6 +121,16 @@ class Connection(base.Connection):
- Column Families:
f: raw incoming alarm_history data. Timestamp becomes now()
if not determined
- events
- row_key: timestamp of event's generation + uuid of event
in format: "%s+%s" % (ts, Event.message_id)
-Column Families:
f: contains the following qualifiers:
-event_type: description of event's type
-timestamp: time stamp of event generation
-all traits for this event in format
"%s+%s" % (trait_name, trait_type)
"""
CAPABILITIES = utils.update_nested(base.Connection.CAPABILITIES,
@ -128,6 +145,7 @@ class Connection(base.Connection):
METER_TABLE = "meter"
ALARM_TABLE = "alarm"
ALARM_HISTORY_TABLE = "alarm_h"
EVENT_TABLE = "event"
def __init__(self, url):
"""Hbase Connection Initialization."""
@ -155,6 +173,7 @@ class Connection(base.Connection):
conn.create_table(self.METER_TABLE, {'f': dict(max_versions=1)})
conn.create_table(self.ALARM_TABLE, {'f': dict()})
conn.create_table(self.ALARM_HISTORY_TABLE, {'f': dict()})
conn.create_table(self.EVENT_TABLE, {'f': dict(max_versions=1)})
def clear(self):
LOG.debug(_('Dropping HBase schema...'))
@ -162,7 +181,8 @@ class Connection(base.Connection):
for table in [self.RESOURCE_TABLE,
self.METER_TABLE,
self.ALARM_TABLE,
self.ALARM_HISTORY_TABLE]:
self.ALARM_HISTORY_TABLE,
self.EVENT_TABLE]:
try:
conn.disable_table(table)
except Exception:
@ -269,7 +289,7 @@ class Connection(base.Connection):
"""
alarm_change_dict = serialize_entry(alarm_change)
ts = alarm_change.get('timestamp') or datetime.datetime.now()
rts = reverse_timestamp(ts)
rts = timestamp(ts)
with self.conn_pool.connection() as conn:
alarm_history_table = conn.table(self.ALARM_HISTORY_TABLE)
alarm_history_table.put(alarm_change.get('alarm_id') + "_" +
@ -307,7 +327,7 @@ class Connection(base.Connection):
# We use reverse timestamps in rowkeys as they are sorted
# alphabetically.
rts = reverse_timestamp(data['timestamp'])
rts = timestamp(data['timestamp'])
row = "%s_%d_%s" % (data['counter_name'], rts, m.hexdigest())
record = serialize_entry(data, **{'source': data['source'],
'rts': rts,
@ -558,6 +578,139 @@ class Connection(base.Connection):
self._update_meter_stats(results[-1], meter[0])
return results
def record_events(self, event_models):
"""Write the events to Hbase.
:param event_models: a list of models.Event objects.
"""
problem_events = []
with self.conn_pool.connection() as conn:
events_table = conn.table(self.EVENT_TABLE)
for event_model in event_models:
# Row key consists of timestamp and message_id from
# models.Event or purposes of storage event sorted by
# timestamp in the database.
ts = event_model.generated
row = "%d_%s" % (timestamp(ts, reverse=False),
event_model.message_id)
event_type = event_model.event_type
traits = {}
if event_model.traits:
for trait in event_model.traits:
key = "%s+%d" % (trait.name, trait.dtype)
traits[key] = trait.value
record = serialize_entry(traits, event_type=event_type,
timestamp=ts)
try:
events_table.put(row, record)
except Exception as ex:
LOG.debug(_("Failed to record event: %s") % ex)
problem_events.append((models.Event.UNKNOWN_PROBLEM,
event_model))
return problem_events
def get_events(self, event_filter):
"""Return an iterable of models.Event objects.
:param event_filter: storage.EventFilter object, consists of filters
for events that are stored in database.
"""
q, start, stop = make_events_query_from_filter(event_filter)
with self.conn_pool.connection() as conn:
events_table = conn.table(self.EVENT_TABLE)
gen = events_table.scan(filter=q, row_start=start, row_stop=stop)
events = []
for event_id, data in gen:
traits = []
events_dict = deserialize_entry(data)[0]
for key, value in events_dict.items():
if (not key.startswith('event_type')
and not key.startswith('timestamp')):
trait_name, trait_dtype = key.rsplit('+', 1)
traits.append(models.Trait(name=trait_name,
dtype=int(trait_dtype),
value=value))
ts, mess = event_id.split('_', 1)
events.append(models.Event(
message_id=mess,
event_type=events_dict['event_type'],
generated=events_dict['timestamp'],
traits=sorted(traits, key=(lambda item:
getattr(item, 'dtype')))
))
return events
def get_event_types(self):
"""Return all event types as an iterable of strings."""
with self.conn_pool.connection() as conn:
events_table = conn.table(self.EVENT_TABLE)
gen = events_table.scan()
event_types = set()
for event_id, data in gen:
events_dict = deserialize_entry(data)[0]
for key, value in events_dict.items():
if key.startswith('event_type'):
if value not in event_types:
event_types.add(value)
yield value
def get_trait_types(self, event_type):
"""Return a dictionary containing the name and data type of the trait.
Only trait types for the provided event_type are returned.
:param event_type: the type of the Event
"""
q = make_query(event_type=event_type)
trait_types = set()
with self.conn_pool.connection() as conn:
events_table = conn.table(self.EVENT_TABLE)
gen = events_table.scan(filter=q)
for event_id, data in gen:
events_dict = deserialize_entry(data)[0]
for key, value in events_dict.items():
if (not key.startswith('event_type') and
not key.startswith('timestamp')):
name, tt_number = key.rsplit('+', 1)
if name not in trait_types:
# Here we check that our method return only unique
# trait types, for ex. if it is found the same trait
# types in different events with equal event_type,
# method will return only one trait type. It is
# proposed that certain trait name could have only one
# trait type.
trait_types.add(name)
data_type = models.Trait.type_names[int(tt_number)]
yield {'name': name, 'data_type': data_type}
def get_traits(self, event_type, trait_type=None):
"""Return all trait instances associated with an event_type. If
trait_type is specified, only return instances of that trait type.
:param event_type: the type of the Event to filter by
:param trait_type: the name of the Trait to filter by
"""
q = make_query(event_type=event_type, trait_type=trait_type)
traits = []
with self.conn_pool.connection() as conn:
events_table = conn.table(self.EVENT_TABLE)
gen = events_table.scan(filter=q)
for event_id, data in gen:
events_dict = deserialize_entry(data)[0]
for key, value in events_dict.items():
if (not key.startswith('event_type') and
not key.startswith('timestamp')):
name, tt_number = key.rsplit('+', 1)
traits.append(models.Trait(name=name,
dtype=int(tt_number), value=value))
for trait in sorted(traits, key=operator.attrgetter('dtype')):
yield trait
###############
# This is a very crude version of "in-memory HBase", which implements just
@ -656,6 +809,55 @@ class MTable(object):
"yet" % op)
return r
@staticmethod
def ColumnPrefixFilter(args, rows):
"""This is filter for testing "in-memory HBase".
This method is called from scan() when 'ColumnPrefixFilter' is found
in the 'filter' argument
:param args is list of filter arguments, contain prefix of column
:param rows is dict of row prefixes for filtering
"""
value = args[0]
column = 'f:' + value
r = {}
for row, data in rows.items():
column_dict = {}
for key in data:
if key.startswith(column):
column_dict[key] = data[key]
r[row] = column_dict
return r
@staticmethod
def RowFilter(args, rows):
"""This is filter for testing "in-memory HBase".
This method is called from scan() when 'RowFilter'
is found in the 'filter' argument
:param args is list of filter arguments, it contains operator and
sought string
:param rows is dict of rows which are filtered
"""
op = args[0]
value = args[1]
if value.startswith('regexstring:'):
value = value[len('regexstring:'):]
r = {}
for row, data in rows.items():
try:
g = re.search(value, row).group()
if op == '=':
if g == row:
r[row] = data
else:
raise NotImplementedError("In-memory "
"RowFilter doesn't support "
"the %s operation yet" % op)
except AttributeError:
pass
return r
class MConnectionPool(object):
def __init__(self):
@ -697,19 +899,56 @@ class MConnection(object):
#################################################
# Here be various HBase helpers
def reverse_timestamp(dt):
"""Reverse timestamp so that newer timestamps are represented by smaller
numbers than older ones.
def timestamp(dt, reverse=True):
"""Timestamp is count of milliseconds since start of epoch.
Reverse timestamps is a technique used in HBase rowkey design. When period
Timestamps is a technique used in HBase rowkey design. When period
queries are required the HBase rowkeys must include timestamps, but as
rowkeys in HBase are ordered lexicographically, the timestamps must be
reversed.
rowkeys in HBase are ordered lexicographically.
Same for the reversed timestamps, but the order will be opposite.
:param: dt: datetime which is translated to the (reversed or not) timestamp
:param: reverse: is a boolean parameter for reverse or straight count of
timestamp in milliseconds
:return count or reversed count of milliseconds since start of epoch
"""
epoch = datetime.datetime(1970, 1, 1)
td = dt - epoch
ts = td.microseconds + td.seconds * 1000000 + td.days * 86400000000
return 0x7fffffffffffffff - ts
return 0x7fffffffffffffff - ts if reverse else ts
def make_events_query_from_filter(event_filter):
"""Return start and stop row for filtering and a query which based on the
selected parameter.
:param event_filter: storage.EventFilter object.
"""
q = []
res_q = None
start = "%s" % (timestamp(event_filter.start_time, reverse=False)
if event_filter.start_time else "")
stop = "%s" % (timestamp(event_filter.end_time, reverse=False)
if event_filter.end_time else "")
if event_filter.event_type:
q.append("SingleColumnValueFilter ('f', 'event_type', = , "
"'binary:%s')" % dump(event_filter.event_type))
if event_filter.message_id:
q.append("RowFilter ( = , 'regexstring:\d*_%s')" %
event_filter.message_id)
if len(q):
res_q = " AND ".join(q)
if event_filter.traits_filter:
for trait_filter in event_filter.traits_filter:
q_trait = make_query(trait_query=True, **trait_filter)
if q_trait:
if res_q:
res_q += " AND " + q_trait
else:
res_q = q_trait
return res_q, start, stop
def make_timestamp_query(func, start=None, start_op=None, end=None,
@ -748,8 +987,8 @@ def make_timestamp_query(func, start=None, start_op=None, end=None,
def get_start_end_rts(start, start_op, end, end_op):
rts_start = str(reverse_timestamp(start) + 1) if start else ""
rts_end = str(reverse_timestamp(end) + 1) if end else ""
rts_start = str(timestamp(start) + 1) if start else ""
rts_end = str(timestamp(end) + 1) if end else ""
#By default, we are using ge for lower bound and lt for upper bound
if start_op == 'gt':
@ -760,23 +999,42 @@ def get_start_end_rts(start, start_op, end, end_op):
return rts_start, rts_end
def make_query(metaquery=None, **kwargs):
def make_query(metaquery=None, trait_query=None, **kwargs):
"""Return a filter query string based on the selected parameters.
:param metaquery: optional metaquery dict
:param trait_query: optional boolean, for trait_query from kwargs
:param kwargs: key-value pairs to filter on. Key should be a real
column name in db
"""
q = []
res_q = None
# Query for traits if a little differ from others it is constructed with
# SingleColumnValueFilter with the possibility to choose an operator for
# value
if trait_query:
trait_name = kwargs.pop('key')
op = kwargs.pop('op', 'eq')
for k, v in kwargs.items():
if v is not None:
res_q = ("SingleColumnValueFilter "
"('f', '%s+%d', %s, 'binary:%s', true, true)" %
(trait_name, DTYPE_NAMES[k], OP_SIGN[op],
dump(v)))
return res_q
# Note: we use extended constructor for SingleColumnValueFilter here.
# It is explicitly specified that entry should not be returned if CF is not
# found in table.
for key, value in kwargs.items():
for key, value in sorted(kwargs.items()):
if value is not None:
if key == 'source':
q.append("SingleColumnValueFilter "
"('f', 's_%s', =, 'binary:%s', true, true)" %
(value, dump('1')))
elif key == 'trait_type':
q.append("ColumnPrefixFilter('%s')" % value)
else:
q.append("SingleColumnValueFilter "
"('f', '%s', =, 'binary:%s', true, true)" %

View File

@ -93,7 +93,7 @@ class CapabilitiesTest(test_base.BaseTestCase):
'complex': False},
'history': {'query': {'simple': False,
'complex': False}}},
'events': {'query': {'simple': False}}
'events': {'query': {'simple': True}},
}
actual_capabilities = hbase.Connection.get_capabilities()

View File

@ -2718,6 +2718,7 @@ class EventTestBase(tests_db.TestBase,
class EventTest(EventTestBase):
@tests_db.run_with('sqlite', 'mongodb', 'db2')
def test_duplicate_message_id(self):
now = datetime.datetime.utcnow()
m = [models.Event("1", "Foo", now, None),