From 3a00ef33ba70c827451f3a60de6d69e6cff2fe19 Mon Sep 17 00:00:00 2001 From: Dexter Fryar Date: Sat, 7 Mar 2015 19:52:03 -0600 Subject: [PATCH] Added monitoring of storm/threshold engine via StatsD See https://wiki.openstack.org/wiki/Monasca/Monitoring_Of_Monasca Change-Id: Ie995fa31791e61dc3d480f12c2dc99271c6e3e4a --- thresh/pom.xml | 6 +- .../src/main/config/thresh-sample-config.yml | 8 + .../thresh/ThresholdingConfiguration.java | 5 + .../java/monasca/thresh/TopologyModule.java | 31 +- .../monasca/thresh/utils/StatsdConfig.java | 70 ++++ .../thresh/utils/StatsdMetricConsumer.java | 326 ++++++++++++++++++ 6 files changed, 442 insertions(+), 4 deletions(-) create mode 100644 thresh/src/main/java/monasca/thresh/utils/StatsdConfig.java create mode 100644 thresh/src/main/java/monasca/thresh/utils/StatsdMetricConsumer.java diff --git a/thresh/pom.xml b/thresh/pom.xml index b8d6268..08205ca 100644 --- a/thresh/pom.xml +++ b/thresh/pom.xml @@ -122,7 +122,11 @@ mysql-connector-java 5.1.26 - + + com.timgroup + java-statsd-client + 3.1.0 + org.apache.kafka kafka_2.9.2 diff --git a/thresh/src/main/config/thresh-sample-config.yml b/thresh/src/main/config/thresh-sample-config.yml index f753877..af89a03 100644 --- a/thresh/src/main/config/thresh-sample-config.yml +++ b/thresh/src/main/config/thresh-sample-config.yml @@ -1,6 +1,14 @@ metricSpoutThreads: 2 metricSpoutTasks: 2 +statsdConfig: + host: localhost + port: 8125 + prefix: monasca.storm. + dimensions: !!map + service : monitoring + component : storm + metricSpoutConfig: maxWaitTime: 500 diff --git a/thresh/src/main/java/monasca/thresh/ThresholdingConfiguration.java b/thresh/src/main/java/monasca/thresh/ThresholdingConfiguration.java index 6f23e05..e8b4c60 100644 --- a/thresh/src/main/java/monasca/thresh/ThresholdingConfiguration.java +++ b/thresh/src/main/java/monasca/thresh/ThresholdingConfiguration.java @@ -19,6 +19,7 @@ package monasca.thresh; import monasca.common.configuration.KafkaProducerConfiguration; import monasca.thresh.infrastructure.thresholding.DataSourceFactory; +import monasca.thresh.utils.StatsdConfig; import java.io.Serializable; import java.util.Set; @@ -73,4 +74,8 @@ public class ThresholdingConfiguration implements Serializable { /** Database configuration. */ @Valid @NotNull public DataSourceFactory database = new DataSourceFactory(); + + /** StatsD configuration. */ + @Valid @NotNull public StatsdConfig statsdConfig = new StatsdConfig(); + } diff --git a/thresh/src/main/java/monasca/thresh/TopologyModule.java b/thresh/src/main/java/monasca/thresh/TopologyModule.java index 563a9c2..f7eba1f 100644 --- a/thresh/src/main/java/monasca/thresh/TopologyModule.java +++ b/thresh/src/main/java/monasca/thresh/TopologyModule.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. + * Copyright (c) 2015 Hewlett-Packard Development Company, L.P. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,6 +25,7 @@ import monasca.thresh.infrastructure.thresholding.MetricAggregationBolt; import monasca.thresh.infrastructure.thresholding.MetricFilteringBolt; import monasca.thresh.infrastructure.thresholding.MetricSpout; import monasca.thresh.infrastructure.thresholding.deserializer.EventDeserializer; +import monasca.thresh.utils.StatsdMetricConsumer; import monasca.common.util.Injector; @@ -69,6 +70,30 @@ public class TopologyModule extends AbstractModule { stormConfig = new Config(); stormConfig.setNumWorkers(config.numWorkerProcesses); stormConfig.setNumAckers(config.numAckerThreads); + + /* Configure the StatsdMetricConsumer */ + java.util.Map statsdConfig = new java.util.HashMap<>(); + + /* + * Catch the case where the config file was not updated + * in /etc/monasca/thresh-config.yml + * note that you get default values if these are absent + */ + if (config.statsdConfig.getHost() != null) + statsdConfig.put(StatsdMetricConsumer.STATSD_HOST, + config.statsdConfig.getHost()); + if (config.statsdConfig.getPort() != null) + statsdConfig.put(StatsdMetricConsumer.STATSD_PORT, + config.statsdConfig.getPort()); + if (config.statsdConfig.getPrefix() != null) + statsdConfig.put(StatsdMetricConsumer.STATSD_PREFIX, + config.statsdConfig.getPrefix()); + if (config.statsdConfig.getDimensions() != null) + statsdConfig.put(StatsdMetricConsumer.STATSD_DIMENSIONS, + config.statsdConfig.getDimensions()); + + stormConfig.registerMetricsConsumer(StatsdMetricConsumer.class, + statsdConfig, 2); } return stormConfig; @@ -113,7 +138,7 @@ public class TopologyModule extends AbstractModule { .allGrouping("event-bolt", EventProcessingBolt.ALARM_DEFINITION_EVENT_STREAM_ID) .setNumTasks(config.filteringBoltTasks); - // Filtering /Event -> Alarm Creation + // Filtering /Event -> Alarm Creation builder .setBolt("alarm-creation-bolt", new AlarmCreationBolt(config.database), config.alarmCreationBoltThreads) @@ -125,7 +150,7 @@ public class TopologyModule extends AbstractModule { .allGrouping("event-bolt", EventProcessingBolt.ALARM_DEFINITION_EVENT_STREAM_ID) .setNumTasks(config.alarmCreationBoltTasks); - // Filtering / Event / Alarm Creation -> Aggregation + // Filtering / Event / Alarm Creation -> Aggregation builder .setBolt("aggregation-bolt", new MetricAggregationBolt(config), config.aggregationBoltThreads) diff --git a/thresh/src/main/java/monasca/thresh/utils/StatsdConfig.java b/thresh/src/main/java/monasca/thresh/utils/StatsdConfig.java new file mode 100644 index 0000000..a672daa --- /dev/null +++ b/thresh/src/main/java/monasca/thresh/utils/StatsdConfig.java @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2015 Hewlett-Packard Development Company, L.P. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package monasca.thresh.utils; + +import java.io.Serializable; +import java.util.Map; + +/* + * Intended to deserialize the statsdConfig element in the + * /etc/monasca/thresh-config.yml + */ +public class StatsdConfig implements Serializable { + + private static final long serialVersionUID = 3634080153227179376L; + + private String host; + + private Integer port; + + private String prefix; + + private Map dimensions; + + public Map getDimensions() { + return dimensions; + } + + public void setDimensions(Map dimensions) { + this.dimensions = dimensions; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public Integer getPort() { + return port; + } + + public void setPort(Integer port) { + this.port = port; + } + + public String getPrefix() { + return prefix; + } + + public void setPrefix(String prefix) { + this.prefix = prefix; + } +} diff --git a/thresh/src/main/java/monasca/thresh/utils/StatsdMetricConsumer.java b/thresh/src/main/java/monasca/thresh/utils/StatsdMetricConsumer.java new file mode 100644 index 0000000..392d4cf --- /dev/null +++ b/thresh/src/main/java/monasca/thresh/utils/StatsdMetricConsumer.java @@ -0,0 +1,326 @@ +/* + * Copyright (c) 2015 Hewlett-Packard Development Company, L.P. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package monasca.thresh.utils; + +import java.io.IOException; +import java.io.StringWriter; +import java.nio.charset.Charset; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import monasca.common.streaming.storm.Logging; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import backtype.storm.Config; +import backtype.storm.metric.api.IMetricsConsumer; +import backtype.storm.task.IErrorReporter; +import backtype.storm.task.TopologyContext; + +import com.fasterxml.jackson.core.JsonGenerationException; +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.timgroup.statsd.NonBlockingUdpSender; +import com.timgroup.statsd.StatsDClientErrorHandler; + +public class StatsdMetricConsumer implements IMetricsConsumer { + + public static final String STATSD_HOST = "metrics.statsd.host"; + public static final String STATSD_PORT = "metrics.statsd.port"; + public static final String STATSD_PREFIX = "metrics.statsd.prefix"; + public static final String STATSD_DIMENSIONS = "metrics.statsd.dimensions"; + + String topologyName; + String statsdHost = "localhost"; + int statsdPort = 8125; + String statsdPrefix = "monasca.storm."; + String monascaStatsdDimPrefix = "|#"; + String defaultDimensions = new StringBuilder().append(monascaStatsdDimPrefix) + .append("{\"service\":\"monitoring\",\"component\":\"storm\"}") + .toString(); + String statsdDimensions = defaultDimensions; + + /* + * https://github.com/stackforge/monasca-agent#statsd + * + * Example metric produced from this code from Monasca statsd + * filtering-bolt.sendqueue.read_pos:69 + * |c|#{"hostname":"localhost","service":"monitoring","component":"storm"} + * + * This is the Monasca specific string that adds the dimension element to + * StatsD + * |#{"hostname":"localhost","service":"monitoring","component":"storm"} + * + * To debug this code: + * vi /usr/local/lib/python2.7/dist-packages/monasca_agent/statsd/udp.py + * start():186 log.info('%s' % str(message)) + * sudo service monasca-agent restart + * tail -f /var/log/monasca/agent/statsd.log + * /vagrant/tests/smoke.py + * + * Note: You only know that "|#" is a delimeter by looking at the Monasca + * Python Agent code since the Monasca StatsD server is a derivative of what + * the general purpose StatsD implements and it is executed in the Monasca + * Agent which was forked from DataDog. It extends UDP data by postfixing a + * json struct describing the dimensions. + */ + + transient NonBlockingUdpSender udpclient; + private transient StatsDClientErrorHandler handler; + private transient Logger logger; + + @Override + public void prepare(Map stormConf, Object registrationArgument, + TopologyContext context, IErrorReporter errorReporter) { + logger = LoggerFactory.getLogger(Logging.categoryFor(getClass(), context)); + parseConfig(stormConf); + + if (registrationArgument instanceof Map) { + parseConfig((Map) registrationArgument); + } + + initClient(); + + logger.info( + "statsdPrefix ({}), topologyName ({}), clean(topologyName) ({})", + new Object[] { statsdPrefix, topologyName, clean(topologyName) }); + } + + private void initClient() { + try { + handler = statsdErrorHandler; + udpclient = new NonBlockingUdpSender(statsdHost, statsdPort, + Charset.defaultCharset(), handler); + } + catch (IOException e) { + /* NonBlockingUdpSender only throws an IOException */ + logger.error("{}", e); + } + catch (Exception e) { + /* General purpose exception */ + logger.error("{}", e); + } + } + + StatsDClientErrorHandler statsdErrorHandler = new StatsDClientErrorHandler() { + + @Override + public void handle(Exception e) { + logger.error("Error with StatsD UDP client! {}", e); + } + }; + + @SuppressWarnings("unchecked") + void parseConfig(Map conf) { + if (conf.containsKey(Config.TOPOLOGY_NAME)) { + topologyName = (String) conf.get(Config.TOPOLOGY_NAME); + } + + if (conf.containsKey(STATSD_HOST)) { + statsdHost = (String) conf.get(STATSD_HOST); + } + + if (conf.containsKey(STATSD_PORT)) { + statsdPort = ((Number) conf.get(STATSD_PORT)).intValue(); + } + + if (conf.containsKey(STATSD_PREFIX)) { + statsdPrefix = (String) conf.get(STATSD_PREFIX); + if (!statsdPrefix.endsWith(".")) { + statsdPrefix += "."; + } + } + + if (conf.containsKey(STATSD_DIMENSIONS)) { + statsdDimensions = mapToJsonStr((Map) conf + .get(STATSD_DIMENSIONS)); + if (!isValidJSON(statsdDimensions)) { + logger.error("Ignoring dimensions element invalid JSON ({})", + new Object[] { statsdDimensions }); + // You get default dimensions + statsdDimensions = monascaStatsdDimPrefix + defaultDimensions; + } + else { + statsdDimensions = monascaStatsdDimPrefix + statsdDimensions; + } + } + } + + private String mapToJsonStr(Map inputMap) { + String results = new String(); + ObjectMapper mapper = new ObjectMapper(); + StringWriter sw = new StringWriter(); + + try { + mapper.writeValue(sw, inputMap); + results = sw.toString(); + } + catch (JsonGenerationException e) { + logger.error("{}", e); + } + catch (JsonMappingException e) { + logger.error("{}", e); + } + catch (IOException e) { + logger.error("{}", e); + } + + return results; + } + + private boolean isValidJSON(final String json) { + boolean valid = false; + try { + final JsonParser parser = new ObjectMapper().getFactory().createParser( + json); + while (parser.nextToken() != null) { + } + valid = true; + } + catch (JsonParseException jpe) { + valid = false; + } + catch (IOException ioe) { + valid = false; + } + return valid; + } + + String clean(String s) { + /* storm metrics look pretty bad so cleanup is needed */ + return s.replace('.', '_').replace('/', '_').replace(':', '_') + .replaceAll("__", ""); + } + + @Override + public void handleDataPoints(TaskInfo taskInfo, + Collection dataPoints) { + for (Metric metric : dataPointsToMetrics(taskInfo, dataPoints)) { + report(metric.name, metric.value, metric.dimensions); + } + } + + public static class Metric { + String name; + Double value; + String dimensions; + + public Metric(String name, Double value, String dimensions) { + this.name = name; + this.value = value; + this.dimensions = dimensions; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + Metric other = (Metric) obj; + if (name == null) { + if (other.name != null) + return false; + } + else if (!name.equals(other.name)) + return false; + if (value != other.value) + return false; + if (!dimensions.equals(other.dimensions)) + return false; + return true; + } + + @Override + public String toString() { + return "Metric [name=" + name + ", value=" + value + ", dimensions=" + + dimensions + "]"; + } + } + + private List dataPointsToMetrics(TaskInfo taskInfo, + Collection dataPoints) { + List res = new LinkedList<>(); + + StringBuilder sb = new StringBuilder().append( + clean(taskInfo.srcComponentId)).append("."); + + int hdrLength = sb.length(); + + for (DataPoint p : dataPoints) { + + sb.delete(hdrLength, sb.length()); + sb.append(clean(p.name)); + + logger.debug("Storm StatsD metric p.name ({}) p.value ({})", + new Object[] { p.name, p.value }); + + if (p.value instanceof Number) { + res.add(new Metric(sb.toString(), ((Number) p.value).doubleValue(), + statsdDimensions)); + } + // There is a map of data points and it's not empty + else if (p.value instanceof Map && !(((Map) (p.value)).isEmpty())) { + int hdrAndNameLength = sb.length(); + @SuppressWarnings("rawtypes") + Map map = (Map) p.value; + for (Object subName : map.keySet()) { + Object subValue = map.get(subName); + if (subValue instanceof Number) { + sb.delete(hdrAndNameLength, sb.length()); + sb.append(".").append(clean(subName.toString())); + + res.add(new Metric(sb.toString(), + ((Number) subValue).doubleValue(), statsdDimensions)); + } + } + } + } + return res; + } + + /* + * Since the Java client doesn't support the Monasca metric type we need to + * build it with a raw UDP request + */ + public void report(String s, Double number, String dimensions) { + if (udpclient != null) { + StringBuilder statsdMessage = new StringBuilder().append(statsdPrefix) + .append(s).append(":").append(String.valueOf(number)).append("|c") + .append(statsdDimensions); + logger.debug("reporting: {}={}{}", s, number, dimensions); + udpclient.send(statsdMessage.toString()); + } + else { + /* Try to setup the UDP client since it was null */ + initClient(); + } + } + + @Override + public void cleanup() { + udpclient.stop(); + } +}