/* * (C) Copyright 2014,2016 Hewlett Packard Enterprise Development Company LP. * Copyright 2016 FUJITSU LIMITED * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. * See the License for the specific language governing permissions and * limitations under the License. */ package monasca.thresh.infrastructure.thresholding; import monasca.common.model.metric.Metric; import monasca.thresh.ThresholdingConfiguration; import monasca.thresh.domain.model.MetricDefinitionAndTenantId; import monasca.thresh.domain.model.SubAlarm; import monasca.thresh.domain.model.SubAlarmStats; import monasca.thresh.domain.model.SubExpression; import monasca.thresh.domain.model.TenantIdAndMetricName; import monasca.thresh.domain.service.SubAlarmStatsRepository; import monasca.thresh.utils.Logging; import monasca.thresh.utils.Streams; import monasca.thresh.utils.Tuples; import org.apache.storm.Config; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; /** * Aggregates metrics for individual alarms. Receives metric/alarm tuples and tick tuples, and * outputs alarm information whenever an alarm's state changes. Concerned with alarms that relate to * a specific metric. * * The TICK_TUPLE_SECONDS_KEY value should be no greater than the smallest possible window width. * This ensures that the window slides in time with the expected metrics. * * */ public class MetricAggregationBolt extends BaseRichBolt { private static final long serialVersionUID = 5624314196838090726L; public static final String TICK_TUPLE_SECONDS_KEY = "monasca.thresh.aggregation.tick.seconds"; public static final String[] FIELDS = new String[] {"alarmId", "subAlarm"}; public static final String METRIC_AGGREGATION_CONTROL_STREAM = "MetricAggregationControl"; public static final String[] METRIC_AGGREGATION_CONTROL_FIELDS = new String[] {"directive"}; public static final String METRICS_BEHIND = "MetricsBehind"; private final ThresholdingConfiguration config; final Map metricDefToSubAlarmStatsRepos = new HashMap<>(); private final Set subAlarmStatsSet = new HashSet<>(); private final Map subAlarmToSubAlarmStats = new HashMap<>(); private transient Logger logger; /** Namespaces for which metrics are received sporadically */ private Set sporadicMetricNamespaces = Collections.emptySet(); private OutputCollector collector; private boolean upToDate = true; public MetricAggregationBolt(ThresholdingConfiguration config) { this.config = config; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields(FIELDS)); } @Override public void execute(Tuple tuple) { logger.debug("tuple: {}", tuple); try { if (Tuples.isTickTuple(tuple)) { evaluateAlarmsAndSlideWindows(); } else { if (Streams.DEFAULT_STREAM_ID.equals(tuple.getSourceStreamId())) { TenantIdAndMetricName timn = (TenantIdAndMetricName) tuple.getValue(0); Metric metric = (Metric) tuple.getValueByField("metric"); MetricDefinitionAndTenantId metricDefinitionAndTenantId = new MetricDefinitionAndTenantId(metric.definition(), timn.getTenantId()); aggregateValues(metricDefinitionAndTenantId, metric); } else if (METRIC_AGGREGATION_CONTROL_STREAM.equals(tuple.getSourceStreamId())) { processControl(tuple.getString(0)); } else { String eventType = tuple.getString(0); if (EventProcessingBolt.METRIC_ALARM_EVENT_STREAM_ID.equals(tuple.getSourceStreamId())) { final MetricDefinitionAndTenantId metricDefinitionAndTenantId = (MetricDefinitionAndTenantId) tuple.getValue(2); final String subAlarmId = tuple.getString(4); if (EventProcessingBolt.DELETED.equals(eventType)) { handleAlarmDeleted(metricDefinitionAndTenantId, subAlarmId); } else if (EventProcessingBolt.RESEND.equals(eventType)) { handleAlarmResend(metricDefinitionAndTenantId, subAlarmId); } } else if (EventProcessingBolt.METRIC_SUB_ALARM_EVENT_STREAM_ID.equals(tuple .getSourceStreamId())) { if (EventProcessingBolt.UPDATED.equals(eventType)) { handleAlarmSubExpressionUpdated((SubExpression) tuple.getValue(1)); } } else if (AlarmCreationBolt.ALARM_CREATION_STREAM.equals(tuple .getSourceStreamId())) { MetricDefinitionAndTenantId metricDefinitionAndTenantId = (MetricDefinitionAndTenantId) tuple.getValue(2); final SubAlarm subAlarm = (SubAlarm) tuple.getValue(4); if (EventProcessingBolt.CREATED.equals(eventType)) { handleAlarmCreated(metricDefinitionAndTenantId, subAlarm); } } } } } catch (Exception e) { logger.error("Error processing tuple {}", tuple, e); } finally { collector.ack(tuple); } } private void processControl(final String directive) { if (METRICS_BEHIND.equals(directive)) { logger.debug("Received {}", directive); this.upToDate = false; } else { logger.error("Unknown directive '{}'", directive); } } @Override public Map getComponentConfiguration() { Map conf = new HashMap(); conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Integer.valueOf(System.getProperty(TICK_TUPLE_SECONDS_KEY, "60")).intValue()); return conf; } @Override @SuppressWarnings("rawtypes") public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { logger = LoggerFactory.getLogger(Logging.categoryFor(getClass(), context)); logger.info("Preparing"); this.collector = collector; } /** * Aggregates values for the {@code metric} that are within the periods defined for the alarm. */ void aggregateValues(MetricDefinitionAndTenantId metricDefinitionAndTenantId, Metric metric) { SubAlarmStatsRepository subAlarmStatsRepo = getOrCreateSubAlarmStatsRepo(metricDefinitionAndTenantId); if (subAlarmStatsRepo == null || metric == null) { return; } for (SubAlarmStats stats : subAlarmStatsRepo.get()) { long timestamp_secs = metric.timestamp/1000; if (stats.getStats().addValue(metric.value, timestamp_secs)) { logger.trace("Aggregated value {} at {} for {}. Updated {}", metric.value, metric.timestamp, metricDefinitionAndTenantId, stats.getStats()); if (stats.evaluateAndSlideWindow(timestamp_secs, config.alarmDelay)) { sendSubAlarmStateChange(stats); } } else { logger.warn("Metric is too old, age {} seconds: timestamp {} for {}, {}", currentTimeSeconds() - timestamp_secs, timestamp_secs, metricDefinitionAndTenantId, stats.getStats()); } } } /** * Evaluates all SubAlarms for all SubAlarmStatsRepositories using an evaluation time of 1 minute * ago, then sliding the window to the current time. */ void evaluateAlarmsAndSlideWindows() { logger.debug("evaluateAlarmsAndSlideWindows called"); long newWindowTimestamp = currentTimeSeconds(); for (SubAlarmStats subAlarmStats : subAlarmStatsSet) { if (upToDate) { logger.debug("Evaluating {}", subAlarmStats); if (subAlarmStats.evaluateAndSlideWindow(newWindowTimestamp, config.alarmDelay)) { sendSubAlarmStateChange(subAlarmStats); } } else { subAlarmStats.slideWindow(newWindowTimestamp, config.alarmDelay); } } if (!upToDate) { logger.info("Did not evaluate SubAlarms because Metrics are not up to date"); upToDate = true; } } private void sendSubAlarmStateChange(SubAlarmStats subAlarmStats) { logger.debug("Alarm state changed for {}", subAlarmStats); collector.emit(new Values(subAlarmStats.getSubAlarm().getAlarmId(), duplicate(subAlarmStats .getSubAlarm()))); } /** * Create a copy of SubAlarm to prevent ConcurrentModificationExceptions thrown by Storm. * The AlarmSubExpression is not immutable, but since it is only replaced in this Bolt, * it can be reused * @param original SubAlarm to be duplicated * @return copy of original */ public SubAlarm duplicate(final SubAlarm original) { final SubAlarm newSubAlarm = new SubAlarm(original.getId(), original.getAlarmId(), new SubExpression( original.getAlarmSubExpressionId(), original.getExpression()), original.getState()); newSubAlarm.setNoState(original.isNoState()); newSubAlarm.setCurrentValues(original.getCurrentValues()); return newSubAlarm; } /** * Only used for testing. * * @return */ protected long currentTimeSeconds() { return System.currentTimeMillis() / 1000; } /** * Returns an existing or newly created SubAlarmStatsRepository for the * {@code metricDefinitionAndTenantId}. Newly created SubAlarmStatsRepositories are initialized * with stats whose view ends one minute from now. */ SubAlarmStatsRepository getOrCreateSubAlarmStatsRepo( MetricDefinitionAndTenantId metricDefinitionAndTenantId) { SubAlarmStatsRepository subAlarmStatsRepo = metricDefToSubAlarmStatsRepos.get(metricDefinitionAndTenantId); if (subAlarmStatsRepo == null) { // This likely happened because MetricFilteringBolt is sending a MetricDefinitionAndTenantId // before the AlarmCreationBolt has created a sub alarm for it logger.debug("Failed to find sub alarms for {}", metricDefinitionAndTenantId); } return subAlarmStatsRepo; } /** * Adds the {@code subAlarm} subAlarmStatsRepo for the {@code metricDefinitionAndTenantId}. */ void handleAlarmCreated(MetricDefinitionAndTenantId metricDefinitionAndTenantId, SubAlarm subAlarm) { logger.info("Received AlarmCreatedEvent for {}", subAlarm); addSubAlarm(metricDefinitionAndTenantId, subAlarm); } void handleAlarmResend(MetricDefinitionAndTenantId metricDefinitionAndTenantId, final String subAlarmId) { final SubAlarmStats oldSubAlarmStats = findExistingSubAlarmStats(metricDefinitionAndTenantId, subAlarmId); if (oldSubAlarmStats == null) { return; } final SubAlarm oldSubAlarm = oldSubAlarmStats.getSubAlarm(); oldSubAlarm.setNoState(true); // Have it send its state again so the Alarm can be evaluated logger.info("Forcing SubAlarm {} to send state at next evaluation", oldSubAlarm); } private SubAlarmStats findExistingSubAlarmStats( MetricDefinitionAndTenantId metricDefinitionAndTenantId, String subAlarmId) { final SubAlarmStatsRepository oldSubAlarmStatsRepo = metricDefToSubAlarmStatsRepos.get(metricDefinitionAndTenantId); if (oldSubAlarmStatsRepo == null) { logger.error("Did not find SubAlarmStatsRepository for MetricDefinition {}", metricDefinitionAndTenantId); return null; } final SubAlarmStats oldSubAlarmStats = oldSubAlarmStatsRepo.get(subAlarmId); if (oldSubAlarmStats == null) { logger.error("Did not find existing SubAlarm {} in SubAlarmStatsRepository", subAlarmId); return null; } return oldSubAlarmStats; } private void addSubAlarm(MetricDefinitionAndTenantId metricDefinitionAndTenantId, SubAlarm subAlarm) { SubAlarmStats subAlarmStats = subAlarmToSubAlarmStats.get(subAlarm.getId()); if (subAlarmStats == null) { long viewEndTimestamp = currentTimeSeconds() + subAlarm.getExpression().getPeriod(); subAlarmStats = new SubAlarmStats(subAlarm, viewEndTimestamp); subAlarmToSubAlarmStats.put(subAlarm.getId(), subAlarmStats); subAlarmStatsSet.add(subAlarmStats); } SubAlarmStatsRepository subAlarmStatsRepo = metricDefToSubAlarmStatsRepos.get(metricDefinitionAndTenantId); if (subAlarmStatsRepo == null) { subAlarmStatsRepo = new SubAlarmStatsRepository(); metricDefToSubAlarmStatsRepos.put(metricDefinitionAndTenantId, subAlarmStatsRepo); } subAlarmStatsRepo.add(subAlarm.getId(), subAlarmStats); } protected boolean subAlarmRemoved(final String subAlarmId, MetricDefinitionAndTenantId metricDefinitionAndTenantId) { if (subAlarmToSubAlarmStats.containsKey(subAlarmId)) { return false; } SubAlarmStatsRepository subAlarmStatsRepo = metricDefToSubAlarmStatsRepos.get(metricDefinitionAndTenantId); if (subAlarmStatsRepo != null) { if (metricDefToSubAlarmStatsRepos.containsKey(subAlarmId)) { return false; } } for (final SubAlarmStats subAlarmStats : subAlarmStatsSet) { if (subAlarmStats.getSubAlarm().getId().equals(subAlarmId)) { return false; } } return true; } /** * Adds the {@code subAlarm} subAlarmStatsRepo for the {@code metricDefinition}. * * MetricDefinition can't have changed, just how it is evaluated */ void handleAlarmSubExpressionUpdated(SubExpression subExpression) { logger.debug("Processing SubExpression updated for {}", subExpression); int updated = 0; for (final SubAlarmStats subAlarmStats : subAlarmStatsSet) { if (subAlarmStats.getSubAlarm().getAlarmSubExpressionId().equals(subExpression.getId())) { long viewEndTimestamp = currentTimeSeconds() + subExpression.getAlarmSubExpression().getPeriod(); subAlarmStats.updateSubAlarm(subExpression.getAlarmSubExpression(), viewEndTimestamp); updated++; } } logger.debug("Updated {} SubAlarms", updated); } /** * Removes the sub-alarm for the {@code subAlarmId} from the subAlarmStatsRepo for the * {@code metricDefinitionAndTenantId}. */ void handleAlarmDeleted(MetricDefinitionAndTenantId metricDefinitionAndTenantId, String subAlarmId) { logger.debug("Received AlarmDeletedEvent for subAlarm id {}", subAlarmId); SubAlarmStatsRepository subAlarmStatsRepo = metricDefToSubAlarmStatsRepos.get(metricDefinitionAndTenantId); if (subAlarmStatsRepo != null) { subAlarmStatsRepo.remove(subAlarmId); if (subAlarmStatsRepo.isEmpty()) { metricDefToSubAlarmStatsRepos.remove(metricDefinitionAndTenantId); } } final SubAlarmStats subAlarmStats = subAlarmToSubAlarmStats.remove(subAlarmId); if (subAlarmStats != null) { subAlarmStatsSet.remove(subAlarmStats); } } }