Add json_plugin: simple way to post metrics to Monasca
This patch adds a plugin that reads metrics from JSON files. This makes it very easy to integrate with Monasca -- instead of writing a new plugin, simply write your metrics to a file. The advantages of this mechanism are:

- It's simple and easy to understand.
- It is asynchronous with the Monasca Agent. There are two technical advantages to this:
  - Your check does not need to run at the same frequency as the agent.
  - If your check blocks, this does not affect other checks in the agent, because the agent is not blocked.
- You can create the metrics as a side effect of doing other work. For example, a server process can write the JSON file while it is processing requests. With the normal plugin architecture, you need to write code that talks to your server process to access the metrics.
- You can write the check code in any language.

Change-Id: I1e998fa677e16cc04d46edd46d0e6433131825e7
This commit is contained in:
parent
c170af038d
commit
a72f907eef
11
conf.d/json_plugin.yaml.example
Normal file
@@ -0,0 +1,11 @@
init_config: null
instances:
    - built_by: JsonPlugin
      metrics_dir: /var/cache/monasca_json_plugin
      name: /var/cache/monasca_json_plugin
    - built_by: Me
      metrics_file: /var/cache/my_dir/my_metrics.json
      name: Mine1
    - built_by: Me
      metrics_file: /dev/shm/more_metrics.json
      name: Mine2
117
docs/Plugins.md
@@ -139,6 +139,7 @@ The following plugins are delivered via setup as part of the standard plugin checks
| http_metrics | | |
| iis | | Microsoft Internet Information Services |
| jenkins | | |
| json_plugin | | |
| kafka_consumer | | |
| kibana | **kibana_install_dir**/kibana.yml | Integration to Kibana |
| kyototycoon | | |
@@ -1025,6 +1026,122 @@ See [the example configuration](https://github.com/openstack/monasca-agent/blob/
## Jenkins
See [the example configuration](https://github.com/openstack/monasca-agent/blob/master/conf.d/jenkins.yaml.example) for how to configure the Jenkins plugin.

## JsonPlugin
This plugin allows you to report metrics by simply writing the metrics to a file. The plugin reads the file
and sends the metrics to Monasca.

### Simple Reporting

The simplest approach is to create a file in the /var/cache/monasca_json_plugin directory. The file should
contain a list of metrics in JSON format, as shown in the following example. The file must have
a ".json" extension in the name.

Simple Example -- /var/cache/monasca_json_plugin/my-metrics-file.json:
```
[
    {"name": "metric1", "value": 10.1, "timestamp": 1475596165},
    {"name": "metric2", "value": 12.3, "timestamp": 1475596165}
]
```

In the above example, the "name", "value" and "timestamp" of each measurement are reported. The following keys are available:

| Key | Description |
| ----------- | ----------- |
| name | Required. The name of the metric. The key "metric" may be used instead of "name". |
| value | Required. The value of the measurement. This is a floating point number. |
| timestamp | Optional if replace_timestamps is true (see below); otherwise required. The time of the measurement as a UNIX epoch value. Note: this is seconds, not milliseconds, since the epoch. |
| dimensions | Optional. Dimensions of the metric as a set of key/value pairs. |
| value_meta | Optional. Value meta of the metric as a set of key/value pairs. |

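As a quick illustration of these keys, the sketch below builds a measurement list in this format and serializes it with the standard json module. The helper function, metric names and values are invented for illustration; only the key names come from the table above.

```python
import json
import time

# Hypothetical helper: build one measurement using the keys the plugin accepts.
def make_measurement(name, value, dimensions=None, value_meta=None,
                     timestamp=None):
    measurement = {
        'name': name,
        'value': float(value),
        # UNIX epoch seconds (not milliseconds)
        'timestamp': timestamp if timestamp is not None else int(time.time()),
    }
    if dimensions:
        measurement['dimensions'] = dimensions
    if value_meta:
        measurement['value_meta'] = value_meta
    return measurement

metrics = [
    make_measurement('disk.used_pct', 72.5, dimensions={'path': '/var'},
                     value_meta={'msg': 'nearing capacity'},
                     timestamp=1475596165),
]
print(json.dumps(metrics))
```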
### Writing and Locking the Metrics File

You should take an exclusive lock on the file while you write new metrics
(this plugin takes a shared lock while it reads). You must close or flush the file
after writing new data to make sure the data is written to the file.

Example of writing the metrics file:

```
import errno
import fcntl
import json
import time

metric_data = [{"name": "metric1", "value": 10.1, "timestamp": time.time()}]
max_retries = 10
delay = 0.02
attempts = 0
with open('/var/cache/monasca_json_plugin/my-metrics-file.json', 'w') as fd:
    while True:
        attempts += 1
        try:
            fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
            break
        except IOError as err:
            if (err.errno not in [errno.EWOULDBLOCK, errno.EACCES] or
                    attempts > max_retries):
                raise
            time.sleep(delay * attempts)
    fd.write(json.dumps(metric_data))
```

### Additional Directives

You can add additional directives to the JSON file, as shown in the following example:

```
{
    "replace_timestamps": false,
    "stale_age": 300,
    "measurements": [
        {"name": "metric1", "value": 10.1, "timestamp": 1475596165, "dimensions": {"path": "/tmp"}},
        {"name": "metric2", "value": 12.3, "timestamp": 1475596165, "value_meta": {"msg": "hello world"}}
    ]
}
```

The additional directives are described in the following table. All directives are optional.

| Directive | Description |
| --------- | ----------- |
| replace_timestamps | If true, the timestamps in the file are ignored. Instead, the timestamp of each measurement is set to the current time. Default is false. |
| stale_age | The number of seconds after which metrics are considered stale. This stops measurements from a file that is not updating from being reported to Monasca. It defaults to 4 minutes. |

The main purpose of the stale_age directive is to detect when the JSON file stops updating (e.g., due to a bug or system failure). See the description of the monasca.json_plugin.status metric below.

The replace_timestamps directive is intended for cases where the mechanism that writes the JSON file runs infrequently or erratically. Every time the Monasca Agent runs, the metrics
are reported with the current time -- whether or not the file has been updated. In this mode, you do not need to supply a timestamp (in fact, any timestamp you include is ignored), and the
stale_age directive is also ignored.

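For example, a file written by an infrequent or erratic job might enable replace_timestamps and omit the timestamps entirely (the metric names here are illustrative):

```
{
    "replace_timestamps": true,
    "measurements": [
        {"name": "metric1", "value": 10.1},
        {"name": "metric2", "value": 12.3}
    ]
}
```
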
### Custom JSON File Locations

To use the built-in /var/cache/monasca_json_plugin directory, your application must be
able to create and write files in that directory. If this is not possible, you can
write the JSON file(s) to a different file path. An example of this configuration
is in [the example configuration](https://github.com/openstack/monasca-agent/blob/master/conf.d/json_plugin.yaml.example).

The Monasca Agent user must be able to read the files.

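A minimal instance entry for a custom location might look like this (the instance name and file path are illustrative):

```
instances:
    - name: my_app_metrics
      metrics_file: /var/run/my_app/metrics.json
```
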
### The monasca.json_plugin.status Metric

The plugin reports a single metric called "monasca.json_plugin.status". If there are problems,
you can examine its value_meta, which contains a list of problem paths and messages. You can
create an alarm that triggers if there is a problem processing any JSON file.

The monasca.json_plugin.status metric has the following information:

| Field | Description |
| --------- | ----------- |
| name | "monasca.json_plugin.status" -- the name of the metric |
| value | A value of 0.0 is normal -- there are no issues processing the JSON files. A value of 1.0 indicates there is a problem. |
| value_meta | Only present when the value is 1.0. The value_meta contains a "msg" key describing the problem. |

The value_meta/msg reports problems such as:

- Failure to open the JSON file
- Invalid JSON syntax
- Metrics that are older than the stale_age

## Kafka Checks
This section describes the Kafka check that can be performed by the Agent. The Kafka check requires a configuration file called kafka.yaml to be available in the agent conf.d configuration directory.
327
monasca_agent/collector/checks_d/json_plugin.py
Normal file
@@ -0,0 +1,327 @@
# (C) Copyright 2016 Hewlett Packard Enterprise Development LP


from copy import deepcopy
import errno
import fcntl
import json
import os
import time

from monasca_agent.collector import checks


OK = 0
FAIL = 1

# Name used for metrics reported directly by this module
PLUGIN_METRIC_NAME = 'monasca.json_plugin.status'

# Assumes the metrics file is written every 60 seconds
DEFAULT_STALE_AGE = 60 * 4  # Metrics older than this are too old to report

# Valid attributes of a metric
METRIC_KEYS = ['name', 'metric', 'timestamp', 'value', 'dimensions',
               'value_meta']


def _now():
    """Makes unit testing easier"""
    return time.time()

class JsonPlugin(checks.AgentCheck):
    """Read measurements from JSON-formatted files

    This plugin reads measurements from JSON-formatted files.

    The format of the file is shown in the following example:

        {
            "stale_age": 300,
            "replace_timestamps": false,
            "measurements": [
                {
                    "metric": "a_metric",
                    "dimensions": {"dim1": "val1"},
                    "value": 30.0,
                    "timestamp": 1474644040
                },
                {
                    "metric": "second_metric",
                    "dimensions": {"dim2": "val2"},
                    "value": 22.4,
                    "timestamp": 1474644040
                }
            ]
        }

    In effect, the file contains a header and a list of measurements. The
    header has the following fields:

    stale_age:

        A time in seconds. If the timestamp of a measurement is
        older than this, this plugin reports a monasca.json_plugin.status
        metric with a value of 1. The value_meta contains the name of
        the JSON file that is aged.

        This header is optional. It defaults to 4 minutes.

    replace_timestamps:

        A boolean. If set, the next time the plugin is called, it will
        send all the measurements with a timestamp equal to the current
        time (ignoring the timestamp in the measurements list). The
        stale_age value is ignored with this setting.

        This header is optional. It defaults to false.

    measurements:

        This is a list of the measurements, formatted in the same way
        that measurements are presented to the Monasca API. However,
        if replace_timestamps is set, the timestamp key can be omitted
        (since it is set to the current time).

    An alternate format for the file is that the header is omitted, i.e.,
    if the first item in the file is a list, it is assumed this is the
    measurement list and the header values are defaulted.
    """

    def __init__(self, name, init_config, agent_config, instances=None,
                 logger=None):
        super(JsonPlugin, self).__init__(name, init_config, agent_config,
                                         instances)
        self.log = logger or self.log
        self.plugin_failures = {}
        self.now = time.time()
        self.posted_metrics = {}

    def _plugin_failed(self, file_name, msg):
        self.plugin_failures[file_name] = msg
        self.log.warn('%s: %s' % (file_name, msg))

    def _plugin_check_metric(self):
        """Generate metric to report status of the plugin"""
        plugin_metric = dict(metric=PLUGIN_METRIC_NAME,
                             dimensions={},
                             value=OK,
                             timestamp=self.now)
        # If there were any failures, put the path
        # and error message into value_meta.
        errors = []
        for path, err in self.plugin_failures.items():
            if err:
                errors.append('%s: %s' % (path, err))
        msg = ''
        if errors:
            msg = ', '.join(errors)
        if msg:
            if len(msg) > 1024:  # keep well below length limit
                msg = msg[:1021] + '...'
            plugin_metric.update(dict(value_meta=dict(msg=msg),
                                      value=FAIL))
        return plugin_metric

    @staticmethod
    def _take_shared_lock(fd):
        """Take shared lock on a file descriptor

        Assuming the writer of the JSON file also takes a lock, this
        function locks a file descriptor so that we can read the file
        without worrying that the content is changing as we read.

        Raises IOError if the lock cannot be taken after a number of
        attempts.

        :param fd: the file descriptor of the file being read
        """
        max_retries = 5
        delay = 0.02
        attempts = 0
        while True:
            attempts += 1
            try:
                fcntl.flock(fd, fcntl.LOCK_SH | fcntl.LOCK_NB)
                break
            except IOError as err:
                if (err.errno not in [errno.EWOULDBLOCK, errno.EACCES] or
                        attempts > max_retries):
                    raise
                time.sleep(delay * attempts)

    def _load_measurements_from_file(self, file_name):
        handling = {}
        file_data = {'measurements': []}
        try:
            with open(file_name, 'r') as f:
                self._take_shared_lock(f)
                f.seek(0)
                file_data = json.load(f)
        except (ValueError, TypeError) as e:
            self._plugin_failed(file_name,
                                'failed parsing json: %s' % e)
        except Exception as e:  # noqa
            self._plugin_failed(file_name,
                                'loading error: %s' % e)
        try:
            if isinstance(file_data, list):
                metrics = file_data
                handling['stale_age'] = DEFAULT_STALE_AGE
                handling['replace_timestamps'] = False
            else:
                metrics = file_data.get('measurements', [])
                handling['stale_age'] = file_data.get('stale_age',
                                                      DEFAULT_STALE_AGE)
                handling['replace_timestamps'] = file_data.get(
                    'replace_timestamps', False)
        except Exception as e:  # noqa
            self._plugin_failed(file_name,
                                'unable to process file contents: %s' % e)
            metrics = []

        metrics = self._filter_metrics(metrics, file_name)
        return self._remove_duplicate_metrics(handling, metrics, file_name)

    def _filter_metrics(self, metrics, file_name):
        """Remove invalid metrics from the metric list

        Validate and clean up so the metric is suitable for passing to
        AgentCheck.gauge(). The metric might be invalid (e.g., value_meta
        too long), but that's not our concern here.
        """
        invalid_metric = None
        valid_metrics = []
        for metric in metrics:
            if not isinstance(metric, dict):
                invalid_metric = metric  # not a dict
                continue
            if any(key not in METRIC_KEYS for key in metric.keys()):
                invalid_metric = metric  # spurious attribute
                continue
            if 'name' not in metric.keys() and 'metric' not in metric.keys():
                invalid_metric = metric  # missing name
                continue
            if 'value' not in metric.keys():
                invalid_metric = metric  # missing value
                continue

            if 'name' in metric:
                # The API uses 'name'; AgentCheck uses 'metric'
                metric['metric'] = metric.get('name')
                del metric['name']
            if not metric.get('dimensions', None):
                metric['dimensions'] = {}
            valid_metrics.append(metric)

        if invalid_metric:
            # Only report one invalid metric per file
            self._plugin_failed(file_name,
                                'invalid metric found: %s' % invalid_metric)
        return valid_metrics

    def _remove_duplicate_metrics(self, handling, metrics, file_name):
        """Remove metrics if we've already reported them

        We track the metrics we've posted to the Monasca Agent. This allows
        us to discard duplicate metrics. The most common cause of duplicates
        is that the agent runs more often than the update period of the
        JSON file.

        We also discard metrics that seem stale. This can occur when the
        program creating the metrics file has died, so the JSON file
        does not update with new metrics.

        :param handling: options for how measurements are handled
        :param metrics: the metrics we found in the JSON file
        :param file_name: the path of the JSON file
        :returns: a list of metrics that should be posted
        """

        # Set timestamp if asked
        if handling['replace_timestamps']:
            for metric in metrics:
                metric['timestamp'] = self.now
            # Since we've set the timestamp, these are unique (not
            # duplicates), so no further processing is required
            return metrics

        # Remove metrics we've already posted. Also remove stale metrics.
        if file_name not in self.posted_metrics:
            self.posted_metrics[file_name] = []
        stale_metrics = False
        for metric in deepcopy(metrics):
            if ((self.now - metric.get('timestamp', 0)) >
                    handling.get('stale_age')):
                metrics.remove(metric)  # too old
                stale_metrics = True
            elif metric in self.posted_metrics[file_name]:
                metrics.remove(metric)  # already sent to Monasca
            else:
                # New metric; not stale.
                self.posted_metrics[file_name].append(metric)

        # Purge really old metrics from posted
        for metric in list(self.posted_metrics[file_name]):
            if ((self.now - metric.get('timestamp', 0)) >=
                    handling.get('stale_age') * 2):
                self.posted_metrics[file_name].remove(metric)

        if stale_metrics:
            self._plugin_failed(file_name, 'Metrics are older than %s seconds;'
                                ' file not updating?' %
                                handling.get('stale_age'))
        return metrics

    def _get_metrics(self):
        reported = []
        for file_name in self.metrics_files:
            metrics = self._load_measurements_from_file(file_name)
            for metric in metrics:
                reported.append(metric)
        return reported

    def _load_instance_config(self, instance):
        self.metrics_files = []
        self.metrics_dir = instance.get('metrics_dir', '')
        if self.metrics_dir:
            self.plugin_failures[self.metrics_dir] = ''
            try:
                file_names = os.listdir(self.metrics_dir)
                for name in [os.path.join(self.metrics_dir, name) for
                             name in file_names]:
                    # The .json extension protects us from reading .swp
                    # and similar files
                    if os.path.isfile(name) and name.lower().endswith('.json'):
                        self.metrics_files.append(name)
            except OSError as err:
                self._plugin_failed(self.metrics_dir,
                                    'Error processing: %s' % err)
        else:
            metric_file = instance.get('metrics_file', '')
            if metric_file:
                self.metrics_files = [metric_file]
        self.log.debug('Using metrics files %s' % ','.join(self.metrics_files))
        for file_name in self.metrics_files:
            self.plugin_failures[file_name] = ''

    def check(self, instance):
        self._load_instance_config(instance)
        all_metrics = []
        self.now = _now()

        # Load measurements from files
        metrics = self._get_metrics()
        all_metrics.extend(metrics)

        # Add this plugin's status metric
        all_metrics.append(self._plugin_check_metric())

        for metric in all_metrics:
            # Apply any instance dimensions that may be configured,
            # overriding any dimension with the same key that the check
            # has set.
            metric['dimensions'] = self._set_dimensions(metric['dimensions'],
                                                        instance)
            self.log.debug('Posting metric: %s' % metric)
            try:
                self.gauge(**metric)
            except Exception as e:  # noqa
                self.log.exception('Exception while reporting metric: %s' % e)
40
monasca_setup/detection/plugins/json_plugin.py
Normal file
@@ -0,0 +1,40 @@
# (c) Copyright 2016 Hewlett Packard Enterprise Development LP

import os

from monasca_setup import agent_config
import monasca_setup.detection


VAR_CACHE_DIR = '/var/cache/monasca_json_plugin'


class JsonPlugin(monasca_setup.detection.ArgsPlugin):
    """Detect if /var/cache/monasca_json_plugin exists

    This builds a config for the json_plugin. It detects if
    /var/cache/monasca_json_plugin exists and, if so,
    builds a configuration for it.

    Users are free to add their own configs.
    """
    def __init__(self, template_dir, overwrite=True, args=None):
        super(JsonPlugin, self).__init__(
            template_dir, overwrite, args)

    def _detect(self):
        self.available = False
        if os.path.isdir(VAR_CACHE_DIR):
            self.available = True

    def build_config(self):
        """Build the config as a Plugins object and return it."""
        config = agent_config.Plugins()
        config['json_plugin'] = {'init_config': None,
                                 'instances': [{'name': VAR_CACHE_DIR,
                                                'metrics_dir': VAR_CACHE_DIR}]}

        return config

    def dependencies_installed(self):
        """Return True if dependencies are installed."""
        return True
364
tests/checks_d/test_json_plugin.py
Normal file
@@ -0,0 +1,364 @@
# (C) Copyright 2016 Hewlett Packard Enterprise Development LP

import fcntl
import json
import os
from shutil import rmtree
from socket import gethostname
import tempfile
import unittest

from monasca_agent.collector.checks_d import json_plugin
import monasca_agent.common.config


HOSTNAME = gethostname()


def _create_agent_conf():
    # create a temp conf file
    tempdir = tempfile.mkdtemp()
    conf_file = os.path.join(tempdir, 'agent.yaml')
    with open(conf_file, 'w') as fd:
        fd.write(
            """
            Logging:
              collector_log_file: /var/log/monasca/agent/collector.log
              forwarder_log_file: /var/log/monasca/agent/forwarder.log
              log_level: DEBUG
              statsd_log_file: /var/log/monasca/agent/statsd.log
            Main:
              check_freq: 60
              dimensions: {{}}
              hostname: {hostname}
            """.format(hostname=HOSTNAME)
        )

    config = monasca_agent.common.config.Config(conf_file)
    # clean up
    rmtree(tempdir, ignore_errors=True)
    return config

fake_now = 1


def FakeNow():
    global fake_now
    return fake_now


class MockJsonPlugin(json_plugin.JsonPlugin):
    def __init__(self):
        super(MockJsonPlugin, self).__init__(
            name='json_plugin',
            init_config=_create_agent_conf(),
            instances=[],
            agent_config={}
        )
        self._metrics = []

    def check(self, instance):
        self._metrics = []
        return super(MockJsonPlugin, self).check(instance)

    def gauge(self, **kwargs):
        self._metrics.append(kwargs)

def metricsDiffer(expected, actual_orig, ignore_timestamps=True):
    expected = list(expected)
    actual = list(actual_orig)
    if ignore_timestamps:
        for metric in expected:
            metric['timestamp'] = 'ts'
        for metric in actual:
            metric['timestamp'] = 'ts'
    for metric in list(expected):
        if metric not in actual:
            return 'Expected...\n%s\n ...is missing from actual:\n%s' %\
                (metrics_sort(metric), metrics_sort(actual_orig))
        actual.remove(metric)
    if actual:
        return 'Unexpected (i.e., extra) metrics:\n%s' % metrics_sort(actual)
    return ''


def metrics_repr(metric):
    m = ''
    for key in ['timestamp', 'metric', 'value', 'dimensions', 'value_meta']:
        m += '%s ' % metric.get(key, '-')
    return m


def metrics_sort(metrics):
    """Makes it easier to debug failed asserts"""
    if isinstance(metrics, list):
        mlist = []
        for metric in metrics:
            mlist.append(metrics_repr(metric))
        mlist.sort()
    else:
        mlist = [metrics_repr(metrics)]
    return '\n'.join(mlist)

def write_metrics_file(file_name, metrics, replace_timestamps=False,
                       stale_age=None):
    file_data = {'replace_timestamps': replace_timestamps,
                 'measurements': []}
    if stale_age:
        file_data.update({'stale_age': stale_age})
    for metric in metrics:
        file_data['measurements'].append(metric)
    with open(file_name, mode='w') as fd:
        fd.write(json.dumps(file_data))


def make_expected(metrics, file_name, now, ts_override=None):
    expected = []
    for metric in list(metrics):
        if ts_override:
            metric['timestamp'] = ts_override
        metric['dimensions'].update({'hostname': HOSTNAME})
        expected.append(metric)
    json_plugin_status = {'metric': 'monasca.json_plugin.status', 'value': 0,
                          'dimensions': {'hostname': HOSTNAME},
                          'timestamp': now}
    expected.append(json_plugin_status)
    return expected

class JsonPluginCheckTest(unittest.TestCase):
    def setUp(self):
        super(JsonPluginCheckTest, self).setUp()
        self.json_plugin = MockJsonPlugin()

    def test_no_config(self):
        self.json_plugin.check({})

    def test_metric_dir(self):
        tempdir = tempfile.mkdtemp()
        # Empty metrics_dir:
        self.json_plugin.check({'dimensions': {},
                                'metrics_dir': tempdir})
        self.assertEqual([], self.json_plugin.metrics_files)
        expected = [
            {'metric': 'monasca.json_plugin.status', 'value': 0,
             'dimensions': {'hostname': HOSTNAME}}]
        differs = metricsDiffer(expected, self.json_plugin._metrics)
        self.assertEqual('', differs, msg=differs)

        # Create json files:
        file1 = os.path.join(tempdir, 'file1.json')
        file2 = os.path.join(tempdir, 'file2.json')
        for metric_file in [file1, file2]:
            with open(metric_file, mode='w') as fd:
                fd.write('[]')
        self.json_plugin.check({'dimensions': {},
                                'metrics_dir': tempdir})
        self.assertIn(file1, self.json_plugin.metrics_files)
        self.assertIn(file2, self.json_plugin.metrics_files)
        rmtree(tempdir, ignore_errors=True)

        expected = [
            {'metric': 'monasca.json_plugin.status', 'value': 0,
             'dimensions': {'hostname': HOSTNAME}}
        ]
        differs = metricsDiffer(expected, self.json_plugin._metrics)
        self.assertEqual('', differs, msg=differs)

    def test_bad_json_reporting(self):
        global fake_now
        tempdir = tempfile.mkdtemp()
        file1 = os.path.join(tempdir, 'file1.json')
        with open(file1, mode='w') as fd:
            fd.write('{')
        self.json_plugin.check({'dimensions': {},
                                'metrics_file': file1})
        rmtree(tempdir, ignore_errors=True)
        for now in [1000, 2000]:
            fake_now = now
            expected = [{'metric': 'monasca.json_plugin.status', 'value': 1,
                         'dimensions': {'hostname': HOSTNAME},
                         'value_meta': {
                             'msg': '%s: failed parsing json: Expecting'
                                    ' object: line 1'
                                    ' column 1 (char 0)' % file1}}]
            differs = metricsDiffer(expected, self.json_plugin._metrics)
            self.assertEqual('', differs, msg=differs)

    def test_replaced_timestamps(self):
        global fake_now
        json_plugin._now = FakeNow
        tempdir = tempfile.mkdtemp()
        file1 = os.path.join(tempdir, 'file1.json')
        metrics = [
            {'metric': 'name1', 'value': 1,
             'dimensions': {'dim1': 'dim1val'}},
            {'metric': 'name2', 'value': 2,
             'dimensions': {'dim2': 'dim2val'}}
        ]

        write_metrics_file(file1, metrics, replace_timestamps=True)
        for now in [1000, 2000]:
            fake_now = now
            expected = make_expected(metrics, file1, now, ts_override=now)
            self.json_plugin.check({'dimensions': {},
                                    'metrics_file': file1})
            differs = metricsDiffer(expected, self.json_plugin._metrics,
                                    ignore_timestamps=False)
            self.assertEqual('', differs, msg=differs)
        rmtree(tempdir, ignore_errors=True)

    def test_with_timestamps(self):
        global fake_now
        json_plugin._now = FakeNow
        tempdir = tempfile.mkdtemp()
        file1 = os.path.join(tempdir, 'file1.json')
        metrics = [
            {'metric': 'name1', 'value': 1,
             'dimensions': {'dim1': 'dim1val'}},
            {'metric': 'name2', 'value': 2,
             'dimensions': {'dim2': 'dim2val'}}
        ]
        for now in [1000, 2000]:
            fake_now = now
            for metric in metrics:
                metric['timestamp'] = now
            write_metrics_file(file1, metrics, replace_timestamps=False,
                               stale_age=3000)
            expected = make_expected(metrics, file1, now)
            self.json_plugin.check({'dimensions': {},
                                    'metrics_file': file1})
            differs = metricsDiffer(expected, self.json_plugin._metrics,
                                    ignore_timestamps=False)
            self.assertEqual('', differs, msg=differs)
        rmtree(tempdir, ignore_errors=True)

    def test_with_stale_age(self):
        global fake_now
        json_plugin._now = FakeNow
        tempdir = tempfile.mkdtemp()
        file1 = os.path.join(tempdir, 'file1.json')
        metrics = [
            {'metric': 'name1', 'value': 1,
             'dimensions': {'dim1': 'dim1val'}},
            {'metric': 'name2', 'value': 2,
             'dimensions': {'dim2': 'dim2val'}}
        ]
        now = 1000
        fake_now = now
        for metric in metrics:
            metric['timestamp'] = now
        write_metrics_file(file1, metrics, replace_timestamps=False,
                           stale_age=500)
        expected = make_expected(metrics, file1, now, ts_override=now)
        self.json_plugin.check({'dimensions': {},
                                'metrics_file': file1})
        differs = metricsDiffer(expected, self.json_plugin._metrics,
                                ignore_timestamps=False)
        self.assertEqual('', differs, msg=differs)

        # Time moves on, but don't re-write the metrics file
        now = 2000
        fake_now = now
        expected = [{'metric': 'monasca.json_plugin.status', 'value': 1,
                     'dimensions': {'hostname': HOSTNAME},
                     'value_meta': {
                         'msg': '%s: Metrics are older than 500 seconds;'
                                ' file not updating?' % file1}}]
        self.json_plugin.check({'dimensions': {},
                                'metrics_file': file1})
        differs = metricsDiffer(expected, self.json_plugin._metrics,
                                ignore_timestamps=True)
        self.assertEqual('', differs, msg=differs)
        rmtree(tempdir, ignore_errors=True)

    def test_no_duplicates(self):
        global fake_now
        json_plugin._now = FakeNow
        tempdir = tempfile.mkdtemp()
        file1 = os.path.join(tempdir, 'file1.json')
        metrics = [
            {'metric': 'name1', 'value': 1,
             'dimensions': {'dim1': 'dim1val'}},
            {'metric': 'name2', 'value': 2,
             'dimensions': {'dim2': 'dim2val'}}
        ]
        now = 1000
        fake_now = now
        for metric in metrics:
            metric['timestamp'] = now
        write_metrics_file(file1, metrics, replace_timestamps=False,
                           stale_age=5000)
        expected = make_expected(metrics, file1, now, ts_override=now)
        self.json_plugin.check({'dimensions': {},
                                'metrics_file': file1})
        differs = metricsDiffer(expected, self.json_plugin._metrics,
                                ignore_timestamps=False)
        self.assertEqual('', differs, msg=differs)

        # Time moves on, but don't re-write the metrics file
        now = 2000
        fake_now = now
        # We don't get the metrics from the file again -- just the plugin
        # status metric
        expected = [{'metric': 'monasca.json_plugin.status', 'value': 0,
                     'dimensions': {'hostname': HOSTNAME},
                     'timestamp': now}]
        self.json_plugin.check({'dimensions': {},
                                'metrics_file': file1})
        differs = metricsDiffer(expected, self.json_plugin._metrics,
                                ignore_timestamps=False)
        self.assertEqual('', differs, msg=differs)
        rmtree(tempdir, ignore_errors=True)

    def test_validate_metrics(self):
        metrics = [
            {'metric': 'ok1', 'value': 1},
            {'name': 'ok2', 'value': 2},
            {'metric': 'ok3', 'value': 3, 'dimensions': {}, 'value_meta': {},
             'timestamp': 123},
            {'metric': 'bad1'},
            {'metric': 'bad2', 'junk_key': 'extra'},
            {'value': 1, 'value_meta': {'msg': 'no name or metric key'}},
            {'metric': 'ok4', 'value': 1},
        ]
        valid = self.json_plugin._filter_metrics(metrics, 'dummy.json')
        self.assertTrue('dummy.json' in self.json_plugin.plugin_failures)
        self.assertEqual(4, len(valid))

    def test_posted_metrics_are_purged(self):
        global fake_now
        json_plugin._now = FakeNow
        tempdir = tempfile.mkdtemp()
        file1 = os.path.join(tempdir, 'file1.json')
        metrics = [
            {'metric': 'name1', 'value': 1,
             'dimensions': {'dim1': 'dim1val'}},
            {'metric': 'name2', 'value': 2,
             'dimensions': {'dim2': 'dim2val'}}
        ]
        for now in [1000, 2000, 3000, 4000, 5000, 6000]:
            fake_now = now
            for metric in metrics:
                metric['timestamp'] = now
            write_metrics_file(file1, metrics, replace_timestamps=False,
                               stale_age=2000)
            self.json_plugin.check({'dimensions': {},
                                    'metrics_file': file1})
        for metric in self.json_plugin.posted_metrics[file1]:
            self.assertTrue(metric.get('timestamp', 0) >= 2001, 'not purged')
        self.assertTrue(len(self.json_plugin.posted_metrics[file1]) > 0,
                        'posted metrics not being cached')
        rmtree(tempdir, ignore_errors=True)

    def test_take_lock(self):
        tempdir = tempfile.mkdtemp()
        file1 = os.path.join(tempdir, 'file1.json')
        with open(file1, 'w') as fd_writer:
            with open(file1, 'r') as fd_reader:
                fcntl.flock(fd_writer, fcntl.LOCK_EX | fcntl.LOCK_NB)
                with self.assertRaises(IOError):
                    json_plugin.JsonPlugin._take_shared_lock(fd_reader)
36
tests/detection/test_json_plugin.py
Normal file
@@ -0,0 +1,36 @@
# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP

import os
import shutil
import tempfile
import unittest

from monasca_setup.detection.plugins import json_plugin


class TestJsonPlugin(unittest.TestCase):

    def setUp(self):
        unittest.TestCase.setUp(self)
        self.plugin_obj = json_plugin.JsonPlugin('temp_dir')
        self.varcachedir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.varcachedir)

    def test_var_cache_exists(self):
        json_plugin.VAR_CACHE_DIR = self.varcachedir
        self.plugin_obj._detect()
        result = self.plugin_obj.build_config()
        self.assertTrue(self.plugin_obj.available)
        self.assertEqual(result['json_plugin']['instances'],
                         [{'name': self.varcachedir,
                           'metrics_dir': self.varcachedir}])

    def test_var_cache_not_exists(self):
        json_plugin.VAR_CACHE_DIR = os.path.join(self.varcachedir, 'dummy')
        self.plugin_obj._detect()
        self.assertFalse(self.plugin_obj.available)

    def test_dependencies_installed(self):
        self.assertTrue(self.plugin_obj.dependencies_installed())