Merge "Hourly aggregation account for early arrving metrics"
commit 699103e345
@@ -29,6 +29,7 @@ enable_instance_usage_df_cache = True
instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2
enable_batch_time_filtering = True
data_provider=monasca_transform.processor.pre_hourly_processor:PreHourlyProcessorDataProvider
effective_batch_revision=2

#
# Configurable values for the monasca-transform service

@@ -27,6 +27,7 @@ enable_pre_hourly_processor = True
enable_instance_usage_df_cache = True
instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2
enable_batch_time_filtering = True
effective_batch_revision=2

#
# Configurable values for the monasca-transform service

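Both configuration files gain an effective_batch_revision option (default 2) that tells the pre-hourly processor how many saved offset revisions to reach back when it assembles an hourly batch. The following lines are an editorial sketch, not part of the commit: they show how an option like this is registered and read with oslo.config, reusing the group and option names from the diff; the standalone ConfigOpts setup around them is illustrative only.

from oslo_config import cfg

# Illustrative only: register the same option the diff adds to
# ConfigInitializer, then read it back the way PreHourlyProcessor does.
CONF = cfg.ConfigOpts()
pre_hourly_group = cfg.OptGroup(name='pre_hourly_processor',
                                title='pre_hourly_processor')
CONF.register_group(pre_hourly_group)
CONF.register_opts([cfg.IntOpt('effective_batch_revision', default=2)],
                   group=pre_hourly_group)

CONF([])  # parse an empty command line; defaults apply
# With no override this prints 2, i.e. "start from the penultimate revision".
print(CONF.pre_hourly_processor.effective_batch_revision)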
@@ -143,7 +143,8 @@ class ConfigInitializer(object):
                        'PreHourlyProcessorDataProvider'),
            cfg.BoolOpt('enable_instance_usage_df_cache'),
            cfg.StrOpt('instance_usage_df_cache_storage_level'),
            cfg.BoolOpt('enable_batch_time_filtering')
            cfg.BoolOpt('enable_batch_time_filtering'),
            cfg.IntOpt('effective_batch_revision', default=2)
        ]
        app_group = cfg.OptGroup(name='pre_hourly_processor',
                                 title='pre_hourly_processor')
@@ -79,9 +79,10 @@ class MySQLOffsetSpecs(OffsetSpecs):
            version_spec.revision = revision
            revision = revision + 1

        self.session.query(MySQLOffsetSpec).filter(
            MySQLOffsetSpec.revision > self.MAX_REVISIONS).delete(
            synchronize_session="fetch")
        # delete any revisions in excess of the required number
        self.session.query(MySQLOffsetSpec).filter(
            MySQLOffsetSpec.revision > self.MAX_REVISIONS).delete(
            synchronize_session="fetch")

    def get_kafka_offsets(self, app_name):
        return {'%s_%s_%s' % (
@@ -90,6 +91,13 @@ class MySQLOffsetSpecs(OffsetSpecs):
            MySQLOffsetSpec.app_name == app_name,
            MySQLOffsetSpec.revision == 1).all()}

    def get_kafka_offsets_by_revision(self, app_name, revision):
        return {'%s_%s_%s' % (
            offset.get_app_name(), offset.get_topic(), offset.get_partition()
        ): offset for offset in self.session.query(MySQLOffsetSpec).filter(
            MySQLOffsetSpec.app_name == app_name,
            MySQLOffsetSpec.revision == revision).all()}

    def get_most_recent_batch_time_from_offsets(self, app_name, topic):
        try:
            # get partition 0 as a representative of all others
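The new get_kafka_offsets_by_revision() mirrors get_kafka_offsets(), which always returns revision 1 (the latest), but lets the caller pick any saved revision; results are keyed by app_name_topic_partition. The next few lines are an editorial sketch, not part of the commit, imitating that keying with plain Python so the shape of the returned mapping is easy to see; the SavedOffset tuple and the sample values are assumptions.

from collections import namedtuple

# Hypothetical stand-in for a MySQLOffsetSpec row; only the fields used here.
SavedOffset = namedtuple('SavedOffset',
                         'app_name topic partition from_offset until_offset revision')

saved = [
    # revision 1 is the most recently committed batch, revision 2 the one before it
    SavedOffset('pre_hourly', 'metrics_pre_hourly', 0, 1000, 2000, 1),
    SavedOffset('pre_hourly', 'metrics_pre_hourly', 0, 500, 1000, 2),
]

def offsets_by_revision(specs, app_name, revision):
    """Key each matching spec by app_topic_partition, like MySQLOffsetSpecs."""
    return {'%s_%s_%s' % (s.app_name, s.topic, s.partition): s
            for s in specs
            if s.app_name == app_name and s.revision == revision}

penultimate = offsets_by_revision(saved, 'pre_hourly', 2)
print(penultimate['pre_hourly_metrics_pre_hourly_0'].from_offset)  # 500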
@@ -288,16 +288,123 @@ class PreHourlyProcessor(Processor):
        """
        return simport.load(cfg.CONF.repositories.offsets)()

    @staticmethod
    def get_effective_offset_range_list(offset_range_list):
        """get effective batch offset range.

        The effective batch offset range covers offsets starting from
        the effective batch revision (defined by the
        effective_batch_revision config property). By default this
        method sets the pyspark OffsetRange.fromOffset for each
        partition to a value from a revision older than the latest one
        (the default is latest - 1, i.e. the penultimate revision), so
        that the pre-hourly processor has access to the entire data for
        the hour. This also accounts for and covers any early arriving
        data (data that arrives before the start of the hour).
        """
        offset_specifications = PreHourlyProcessor.get_offset_specs()

        app_name = PreHourlyProcessor.get_app_name()

        topic = PreHourlyProcessor.get_kafka_topic()

        # start offset revision
        effective_batch_revision = cfg.CONF.pre_hourly_processor.\
            effective_batch_revision

        effective_batch_spec = offset_specifications\
            .get_kafka_offsets_by_revision(app_name,
                                           effective_batch_revision)

        # get latest revision, if penultimate is unavailable
        if not effective_batch_spec:
            log.debug("effective batch spec: offsets: revision %s unavailable,"
                      " getting the latest revision instead..." % (
                          effective_batch_revision))
            # not available
            effective_batch_spec = offset_specifications.get_kafka_offsets(
                app_name)

        effective_batch_offsets = PreHourlyProcessor._parse_saved_offsets(
            app_name, topic,
            effective_batch_spec)

        # for debugging
        for effective_key in effective_batch_offsets.keys():
            effective_offset = effective_batch_offsets.get(effective_key,
                                                           None)
            (effect_app_name,
             effect_topic_name,
             effect_partition,
             effect_from_offset,
             effect_until_offset) = effective_offset
            log.debug(
                "effective batch offsets (from db):"
                " OffSetRanges: %s %s %s %s" % (
                    effect_topic_name, effect_partition,
                    effect_from_offset, effect_until_offset))

        # build the effective offset range list
        effective_offset_range_list = []
        for offset_range in offset_range_list:
            part_topic_key = "_".join((offset_range.topic,
                                       str(offset_range.partition)))
            effective_offset = effective_batch_offsets.get(part_topic_key,
                                                           None)
            if effective_offset:
                (effect_app_name,
                 effect_topic_name,
                 effect_partition,
                 effect_from_offset,
                 effect_until_offset) = effective_offset

                log.debug(
                    "Extending effective offset range:"
                    " OffSetRanges: %s %s %s-->%s %s" % (
                        effect_topic_name, effect_partition,
                        offset_range.fromOffset,
                        effect_from_offset,
                        effect_until_offset))

                effective_offset_range_list.append(
                    OffsetRange(offset_range.topic,
                                offset_range.partition,
                                effect_from_offset,
                                offset_range.untilOffset))
            else:
                effective_offset_range_list.append(
                    OffsetRange(offset_range.topic,
                                offset_range.partition,
                                offset_range.fromOffset,
                                offset_range.untilOffset))

        # return effective offset range list
        return effective_offset_range_list

    @staticmethod
    def fetch_pre_hourly_data(spark_context,
                              offset_range_list):
        """get metrics pre hourly data from offset range list."""
        for o in offset_range_list:
            log.debug(
                "fetch_pre_hourly: offset_range_list:"
                " OffSetRanges: %s %s %s %s" % (
                    o.topic, o.partition, o.fromOffset, o.untilOffset))

        effective_offset_list = PreHourlyProcessor.\
            get_effective_offset_range_list(offset_range_list)

        for o in effective_offset_list:
            log.debug(
                "fetch_pre_hourly: effective_offset_range_list:"
                " OffSetRanges: %s %s %s %s" % (
                    o.topic, o.partition, o.fromOffset, o.untilOffset))

        # get kafka stream over the same offsets
        pre_hourly_rdd = KafkaUtils.createRDD(spark_context,
                                              {"metadata.broker.list":
                                               cfg.CONF.messaging.brokers},
                                              offset_range_list)
                                              effective_offset_list)
        return pre_hourly_rdd

    @staticmethod
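Put simply, for every partition the processor keeps the current batch's untilOffset but moves fromOffset back to the saved offsets of an older revision, so the hourly aggregation also sees metrics that arrived before the hour's normal starting offset. The following lines are an editorial sketch, not part of the commit; a namedtuple stands in for pyspark's OffsetRange and the numbers mirror the functional test further down.

from collections import namedtuple

# Stand-in for pyspark.streaming.kafka.OffsetRange, for illustration only.
OffsetRange = namedtuple('OffsetRange', 'topic partition fromOffset untilOffset')

# Current batch as computed from the latest saved offsets and the Kafka head.
current = OffsetRange('metrics_pre_hourly', 0, 2000, 3000)

# Offsets saved for the effective (penultimate) revision of this partition.
effective_from, effective_until = 500, 1000

# Extend the range backwards: keep untilOffset, replace fromOffset.
effective = OffsetRange(current.topic, current.partition,
                        effective_from, current.untilOffset)

print(effective)
# OffsetRange(topic='metrics_pre_hourly', partition=0, fromOffset=500, untilOffset=3000)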
@@ -339,10 +446,17 @@ class PreHourlyProcessor(Processor):
                app_name, topic))

        if most_recent_batch_time:
            # batches can fire after the late metrics slack time, not
            # necessarily at the top of the hour
            most_recent_batch_time_truncated = most_recent_batch_time.replace(
                minute=0, second=0, microsecond=0)
            log.debug("filter out records before : %s" % (
                most_recent_batch_time_truncated.strftime(
                    '%Y-%m-%dT%H:%M:%S')))
            # filter out records before current batch
            instance_usage_df = instance_usage_df.filter(
                instance_usage_df.lastrecord_timestamp_string >=
                most_recent_batch_time)
                most_recent_batch_time_truncated)

        # determine the timestamp of the most recent top-of-the-hour (which
        # is the end of the current batch).
@@ -351,6 +465,9 @@ class PreHourlyProcessor(Processor):
            minute=0, second=0, microsecond=0)

        # filter out records after current batch
        log.debug("filter out records after : %s" % (
            truncated_timestamp_to_current_hour.strftime(
                '%Y-%m-%dT%H:%M:%S')))
        instance_usage_df = instance_usage_df.filter(
            instance_usage_df.firstrecord_timestamp_string <
            truncated_timestamp_to_current_hour)
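Since a batch can fire after the late-metrics slack time rather than exactly at the top of the hour, the most recent batch time is truncated to the hour boundary before being used as the lower bound of the filter, and the most recent top-of-the-hour serves as the upper bound. The following lines are an editorial sketch, not part of the commit, showing only the standard-library truncation (the Spark DataFrame filtering itself is not reproduced).

import datetime

# Suppose the last pre-hourly batch actually fired a few minutes past the hour.
most_recent_batch_time = datetime.datetime(2016, 1, 1, 1, 10, 0)

# Truncate to the top of the hour, as the processor does, so records that
# arrived between 01:00 and 01:10 are not accidentally filtered out.
lower_bound = most_recent_batch_time.replace(minute=0, second=0, microsecond=0)

# Upper bound: the most recent top-of-the-hour at processing time.
now = datetime.datetime(2016, 1, 1, 2, 10, 0)
upper_bound = now.replace(minute=0, second=0, microsecond=0)

print(lower_bound.strftime('%Y-%m-%dT%H:%M:%S'))  # 2016-01-01T01:00:00
print(upper_bound.strftime('%Y-%m-%dT%H:%M:%S'))  # 2016-01-01T02:00:00
# Records kept: lastrecord_timestamp >= lower_bound and
#               firstrecord_timestamp < upper_bound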
@@ -11,6 +11,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import os
import random
import sys
@@ -21,10 +22,15 @@ from collections import defaultdict

import mock
from oslo_config import cfg
from oslo_utils import uuidutils
from pyspark.streaming.kafka import OffsetRange

from monasca_common.kafka_lib.common import OffsetResponse

from monasca_transform.config.config_initializer import ConfigInitializer
from monasca_transform.mysql_offset_specs import MySQLOffsetSpecs
from monasca_transform.processor.pre_hourly_processor import PreHourlyProcessor

from tests.functional.component.insert.dummy_insert import DummyInsert
from tests.functional.json_offset_specs import JSONOffsetSpecs
from tests.functional.messaging.adapter import DummyAdapter
@@ -49,6 +55,174 @@ class TestPreHourlyProcessorAgg(SparkContextTest):
        DummyAdapter.init()
        DummyAdapter.adapter_impl.metric_list = []

        # get mysql offset specs
        self.kafka_offset_specs = MySQLOffsetSpecs()

    def add_offset_for_test(self, my_app, my_topic, my_partition,
                            my_from_offset, my_until_offset,
                            my_batch_time):
        """utility method to populate mysql db with offsets."""
        self.kafka_offset_specs.add(topic=my_topic, partition=my_partition,
                                    app_name=my_app,
                                    from_offset=my_from_offset,
                                    until_offset=my_until_offset,
                                    batch_time_info=my_batch_time)

    @mock.patch('monasca_transform.processor.pre_hourly_processor.'
                'PreHourlyProcessor.get_app_name')
    @mock.patch('monasca_transform.processor.pre_hourly_processor.'
                'PreHourlyProcessor.get_kafka_topic')
    @mock.patch('monasca_transform.processor.pre_hourly_processor.'
                'PreHourlyProcessor._get_offsets_from_kafka')
    def test_get_processing_offset_range_list(self,
                                              kafka_get_offsets,
                                              kafka_topic_name,
                                              app_name):
        # setup
        my_app = uuidutils.generate_uuid()
        my_topic = uuidutils.generate_uuid()

        # mock app_name, topic_name, partition
        app_name.return_value = my_app
        kafka_topic_name.return_value = my_topic
        my_partition = 1

        ret_offset_key = "_".join((my_topic, str(my_partition)))
        kafka_get_offsets.side_effect = [
            # mock latest offsets
            {ret_offset_key: OffsetResponse(topic=my_topic,
                                            partition=my_partition,
                                            error=None,
                                            offsets=[30])},
            # mock earliest offsets
            {ret_offset_key: OffsetResponse(topic=my_topic,
                                            partition=my_partition,
                                            error=None,
                                            offsets=[0])}
        ]

        # add offsets
        my_until_offset = 0
        my_from_offset = 10
        my_batch_time = datetime.datetime.strptime('2016-01-01 00:10:00',
                                                   '%Y-%m-%d %H:%M:%S')
        self.add_offset_for_test(my_app, my_topic,
                                 my_partition, my_until_offset,
                                 my_from_offset, my_batch_time)

        my_until_offset_2 = 10
        my_from_offset_2 = 20
        my_batch_time_2 = datetime.datetime.strptime('2016-01-01 01:10:00',
                                                     '%Y-%m-%d %H:%M:%S')
        self.add_offset_for_test(my_app, my_topic,
                                 my_partition, my_until_offset_2,
                                 my_from_offset_2, my_batch_time_2)

        # get latest offset spec as dict
        current_batch_time = datetime.datetime.strptime('2016-01-01 02:10:00',
                                                        '%Y-%m-%d %H:%M:%S')

        # use mysql offset repositories
        cfg.CONF.set_override(
            'offsets',
            'monasca_transform.mysql_offset_specs:MySQLOffsetSpecs',
            group='repositories')

        # list of pyspark.streaming.kafka.OffsetRange objects
        offset_range_list = PreHourlyProcessor.\
            get_processing_offset_range_list(
                current_batch_time)

        self.assertEqual(my_partition,
                         offset_range_list[0].partition)
        self.assertEqual(my_topic,
                         offset_range_list[0].topic)
        self.assertEqual(20,
                         offset_range_list[0].fromOffset)
        self.assertEqual(30,
                         offset_range_list[0].untilOffset)

    @mock.patch('monasca_transform.processor.pre_hourly_processor.'
                'PreHourlyProcessor.get_app_name')
    @mock.patch('monasca_transform.processor.pre_hourly_processor.'
                'PreHourlyProcessor.get_kafka_topic')
    @mock.patch('monasca_transform.processor.pre_hourly_processor.'
                'PreHourlyProcessor._get_offsets_from_kafka')
    def test_get_effective_offset_range_list(self,
                                             kafka_get_offsets,
                                             kafka_topic_name,
                                             app_name):
        # setup
        my_app = uuidutils.generate_uuid()
        my_topic = uuidutils.generate_uuid()

        # mock app_name, topic_name, partition
        app_name.return_value = my_app
        kafka_topic_name.return_value = my_topic
        my_partition = 1

        ret_offset_key = "_".join((my_topic, str(my_partition)))
        kafka_get_offsets.side_effect = [
            # mock latest offsets in kafka
            {ret_offset_key: OffsetResponse(topic=my_topic,
                                            partition=my_partition,
                                            error=None,
                                            offsets=[3000])},
            # mock earliest offsets in kafka
            {ret_offset_key: OffsetResponse(topic=my_topic,
                                            partition=my_partition,
                                            error=None,
                                            offsets=[0])}
        ]

        # add offsets
        my_until_offset = 500
        my_from_offset = 1000
        my_batch_time = datetime.datetime.strptime('2016-01-01 00:10:00',
                                                   '%Y-%m-%d %H:%M:%S')
        self.add_offset_for_test(my_app, my_topic,
                                 my_partition, my_until_offset,
                                 my_from_offset, my_batch_time)

        my_until_offset_2 = 1000
        my_from_offset_2 = 2000
        my_batch_time_2 = datetime.datetime.strptime('2016-01-01 01:10:00',
                                                     '%Y-%m-%d %H:%M:%S')
        self.add_offset_for_test(my_app, my_topic,
                                 my_partition, my_until_offset_2,
                                 my_from_offset_2, my_batch_time_2)

        # get latest offset spec as dict
        current_batch_time = datetime.datetime.strptime('2016-01-01 02:10:00',
                                                        '%Y-%m-%d %H:%M:%S')

        # use mysql offset repositories
        cfg.CONF.set_override(
            'offsets',
            'monasca_transform.mysql_offset_specs:MySQLOffsetSpecs',
            group='repositories')

        # list of pyspark.streaming.kafka.OffsetRange objects
        offset_range_list = PreHourlyProcessor.\
            get_processing_offset_range_list(
                current_batch_time)

        # the effective batch range list should cover the range starting
        # from the (latest - 1) offset revision up to the latest offset
        offset_range_list = PreHourlyProcessor.get_effective_offset_range_list(
            offset_range_list)

        self.assertEqual(my_partition,
                         offset_range_list[0].partition)
        self.assertEqual(my_topic,
                         offset_range_list[0].topic)
        self.assertEqual(500,
                         offset_range_list[0].fromOffset)
        self.assertEqual(3000,
                         offset_range_list[0].untilOffset)

    @mock.patch('monasca_transform.processor.pre_hourly_processor.KafkaInsert',
                DummyInsert)
    @mock.patch('monasca_transform.processor.pre_hourly_processor.'
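The expected values in test_get_effective_offset_range_list follow from the fixtures: the two saved specs cover offsets 500-1000 and 1000-2000, Kafka's latest offset is mocked to 3000, so the new processing range is 2000-3000, and reaching back to the penultimate (revision 2) spec pulls fromOffset back to 500. The following lines are an editorial sketch, not part of the commit, restating that arithmetic with the test's numbers.

# Numbers taken from test_get_effective_offset_range_list; this helper
# structure is illustrative only, not part of the test suite.
saved_specs = {1: (1000, 2000),   # revision 1: latest saved (from, until)
               2: (500, 1000)}    # revision 2: penultimate saved (from, until)
kafka_latest = 3000

# Processing range for the new batch: from the latest saved until_offset
# up to the newest offset available in Kafka.
processing_from, processing_until = saved_specs[1][1], kafka_latest    # (2000, 3000)

# Effective range: pull fromOffset back to the penultimate revision's from_offset.
effective_from, effective_until = saved_specs[2][0], processing_until  # (500, 3000)

assert (effective_from, effective_until) == (500, 3000)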
@@ -164,3 +164,48 @@ class TestMySQLOffsetSpecs(unittest.TestCase):
                         int(offset_value.get_from_offset()))
        self.assertEqual(used_value.get('app_name'),
                         offset_value.get_app_name())

    def test_get_offset_by_revision(self):
        topic_1 = uuidutils.generate_uuid()
        partition_1 = 0
        until_offset_1 = 10
        from_offset_1 = 0
        app_name_1 = uuidutils.generate_uuid()

        my_batch_time = datetime.datetime.strptime('2016-01-01 00:10:00',
                                                   '%Y-%m-%d %H:%M:%S')

        self.kafka_offset_specs.add(topic=topic_1, partition=partition_1,
                                    app_name=app_name_1,
                                    from_offset=from_offset_1,
                                    until_offset=until_offset_1,
                                    batch_time_info=my_batch_time)

        until_offset_2 = 20
        from_offset_2 = 10
        my_batch_time2 = datetime.datetime.strptime('2016-01-01 01:10:00',
                                                    '%Y-%m-%d %H:%M:%S')

        self.kafka_offset_specs.add(topic=topic_1, partition=partition_1,
                                    app_name=app_name_1,
                                    from_offset=from_offset_2,
                                    until_offset=until_offset_2,
                                    batch_time_info=my_batch_time2)

        # get penultimate revision
        penultimate_revision = 2
        kafka_offset_specs = self.kafka_offset_specs\
            .get_kafka_offsets_by_revision(app_name_1,
                                           penultimate_revision)

        offset_key_1 = "%s_%s_%s" % (app_name_1, topic_1, partition_1)
        offset_value_1 = kafka_offset_specs.get(offset_key_1)

        used_values = {}
        used_values[offset_key_1] = {
            "topic": topic_1, "partition": partition_1, "app_name": app_name_1,
            "from_offset": from_offset_1, "until_offset": until_offset_1
        }

        self.assertions_on_offset(used_value=used_values.get(offset_key_1),
                                  offset_value=offset_value_1)
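The revision numbering exercised here matches the processor's default assumption: revision 1 is the most recently committed offset spec, revision 2 is the one committed before it, and anything beyond MAX_REVISIONS is trimmed. The following lines are an editorial sketch, not part of the commit, modelling that bookkeeping with a plain list; the MAX_REVISIONS value is an assumed example, not taken from the module.

# Each element is (from_offset, until_offset); index 0 is the newest commit.
MAX_REVISIONS = 10  # assumed example value
history = []

def commit_offsets(from_offset, until_offset):
    """Prepend the new spec; its revision becomes 1, older specs shift down."""
    history.insert(0, (from_offset, until_offset))
    del history[MAX_REVISIONS:]  # drop revisions in excess of the limit

def by_revision(revision):
    """revision 1 == latest, revision 2 == penultimate, and so on."""
    return history[revision - 1]

commit_offsets(0, 10)    # first batch
commit_offsets(10, 20)   # second batch
print(by_revision(1))    # (10, 20)  latest
print(by_revision(2))    # (0, 10)   penultimate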
@@ -14,8 +14,16 @@ enable_pre_hourly_processor = False
enable_instance_usage_df_cache = False
instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2
enable_batch_time_filtering = True
effective_batch_revision = 2

[service]
enable_record_store_df_cache = False
record_store_df_cache_storage_level = MEMORY_ONLY_SER_2
enable_debug_log_entries = true

[database]
server_type = mysql:thin
host = localhost
database_name = monasca_transform
username = m-transform
password = password