#!/usr/bin/env python
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Persister Module

The Persister reads metrics and alarms from Kafka and then stores them
in InfluxDB.

Start the persister as a stand-alone process by running 'persister.py
--config-file <config file>'.

The persister can also be started through the OpenStack service framework
(see main_service()).

"""

import abc
from datetime import datetime
import json
import os
import sys
import threading

from influxdb import InfluxDBClient
from oslo.config import cfg
import pytz
import six

from monasca_common.kafka.consumer import KafkaConsumer
from openstack.common import log
from openstack.common import service as os_service

import service

LOG = log.getLogger(__name__)

zookeeper_opts = [cfg.StrOpt('uri'),
                  cfg.IntOpt('partition_interval_recheck_seconds')]

zookeeper_group = cfg.OptGroup(name='zookeeper', title='zookeeper')
cfg.CONF.register_group(zookeeper_group)
cfg.CONF.register_opts(zookeeper_opts, zookeeper_group)

kafka_common_opts = [cfg.StrOpt('uri'),
                     cfg.StrOpt('group_id'),
                     cfg.StrOpt('topic'),
                     cfg.StrOpt('consumer_id'),
                     cfg.StrOpt('client_id'),
                     cfg.IntOpt('database_batch_size'),
                     cfg.IntOpt('max_wait_time_seconds'),
                     cfg.IntOpt('fetch_size_bytes'),
                     cfg.IntOpt('buffer_size'),
                     cfg.IntOpt('max_buffer_size'),
                     cfg.StrOpt('zookeeper_path')]

kafka_metrics_opts = kafka_common_opts
kafka_alarm_history_opts = kafka_common_opts

kafka_metrics_group = cfg.OptGroup(name='kafka_metrics', title='kafka_metrics')
kafka_alarm_history_group = cfg.OptGroup(name='kafka_alarm_history',
                                         title='kafka_alarm_history')

cfg.CONF.register_group(kafka_metrics_group)
cfg.CONF.register_group(kafka_alarm_history_group)
cfg.CONF.register_opts(kafka_metrics_opts, kafka_metrics_group)
cfg.CONF.register_opts(kafka_alarm_history_opts, kafka_alarm_history_group)

influxdb_opts = [cfg.StrOpt('database_name'),
                 cfg.StrOpt('ip_address'),
                 cfg.StrOpt('port'),
                 cfg.StrOpt('user'),
                 cfg.StrOpt('password')]

influxdb_group = cfg.OptGroup(name='influxdb', title='influxdb')
cfg.CONF.register_group(influxdb_group)
cfg.CONF.register_opts(influxdb_opts, influxdb_group)
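
# For reference, a minimal sketch of the config file these groups expect.
# Section and option names match the opts registered above; every value
# below is an illustrative placeholder, not a recommended or default
# setting:
#
#     [zookeeper]
#     uri = 127.0.0.1:2181
#     partition_interval_recheck_seconds = 15
#
#     [kafka_metrics]
#     uri = 127.0.0.1:9092
#     group_id = 1_metrics
#     topic = metrics
#     database_batch_size = 1000
#     max_wait_time_seconds = 30
#     zookeeper_path = /persister_partitions/metrics
#
#     [kafka_alarm_history]
#     uri = 127.0.0.1:9092
#     group_id = 1_alarm-state-transitions
#     topic = alarm-state-transitions
#     database_batch_size = 1000
#     max_wait_time_seconds = 30
#     zookeeper_path = /persister_partitions/alarm-state-transitions
#
#     [influxdb]
#     database_name = mon
#     ip_address = 127.0.0.1
#     port = 8086
#     user = mon_persister
#     password = password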

cfg.CONF(sys.argv[1:], project='monasca', prog='persister')
log_levels = cfg.CONF.default_log_levels
cfg.set_defaults(log.log_opts, default_log_levels=log_levels)
log.setup("monasca-persister")


def main():
    """Start persister.

    Start metric persister and alarm persister in separate threads.
    """

    metric_persister = MetricPersister(cfg.CONF.kafka_metrics,
                                       cfg.CONF.influxdb,
                                       cfg.CONF.zookeeper)

    alarm_persister = AlarmPersister(cfg.CONF.kafka_alarm_history,
                                     cfg.CONF.influxdb,
                                     cfg.CONF.zookeeper)

    metric_persister.start()
    alarm_persister.start()

    LOG.info('''

        _____
       /     \ ____   ____ _____    ______ ____ _____
      /  \ /  \ /  _ \ /    \\\__  \  /  ___// ___\\\__  \\
     /    Y    (  <_> )   |  \/ __ \_\___ \\  \___ / __ \\_
     \____|__  /\____/|___|  (____  /____  >\___  >____  /
             \/            \/     \/     \/     \/     \/
     __________                    .__          __
     \______   \ ___________  _____|__| _______/  |_  ___________
      |     ___// __ \_  __ \/  ___/  |/  ___/\   __\/ __ \_  __ \\
      |    |   \  ___/|  | \/\___ \|  |\___ \  |  |  \  ___/|  | \/
      |____|    \___  >__|  /____  >__/____  >  |__|  \___  >__|
                    \/           \/        \/             \/

        ''')

    LOG.info('Monasca Persister has started successfully!')


def shutdown_all_threads_and_die():
    """Shut down all threads and exit the process.

    Hit it with a hammer to kill all threads and die. This may cause
    duplicate messages in the Kafka queue to be reprocessed when the
    persister starts again, which happens if the persister dies just
    after writing metrics and alarms to the DB but before the offsets
    are committed.
    """

    os._exit(1)
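
# A note on delivery semantics (my reading of the code below): the consumer
# commits Kafka offsets only after a successful InfluxDB write (see
# AbstractPersister._flush), so a crash between the write and the commit
# makes Kafka redeliver that batch on restart. The result is at-least-once
# delivery: duplicates are possible, lost points should not be.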


class Persister(os_service.Service):
    """Class used with the OpenStack service framework."""

    def __init__(self, threads=1):
        super(Persister, self).__init__(threads)

    def start(self):
        try:
            main()
        # Deliberately a bare except: anything that escapes main() should
        # kill the whole process rather than leave a half-alive service.
        except:
            LOG.exception('Persister encountered fatal error. '
                          'Shutting down all threads and exiting.')
            shutdown_all_threads_and_die()


@six.add_metaclass(abc.ABCMeta)
class AbstractPersister(threading.Thread):
    def __init__(self, kafka_conf, influxdb_conf, zookeeper_conf):

        super(AbstractPersister, self).__init__()

        self._data_points = []

        self._kafka_topic = kafka_conf.topic

        self._database_batch_size = kafka_conf.database_batch_size

        # The consumer triggers _flush on partition rebalance and on the
        # periodic commit timeout; _flush writes the buffered points and
        # then commits the offsets.
        self._consumer = KafkaConsumer(
            kafka_conf.uri,
            zookeeper_conf.uri,
            kafka_conf.zookeeper_path,
            kafka_conf.group_id,
            kafka_conf.topic,
            repartition_callback=self._flush,
            commit_callback=self._flush,
            commit_timeout=kafka_conf.max_wait_time_seconds)

        self._influxdb_client = InfluxDBClient(influxdb_conf.ip_address,
                                               influxdb_conf.port,
                                               influxdb_conf.user,
                                               influxdb_conf.password,
                                               influxdb_conf.database_name)

    @abc.abstractmethod
    def process_message(self, message):
        """Transform one raw Kafka message into an InfluxDB point dict."""
        pass

    def _flush(self):
        if not self._data_points:
            return

        try:
            # Write with millisecond precision, then commit the Kafka
            # offsets only after the write succeeds.
            self._influxdb_client.write_points(self._data_points, 'ms')

            LOG.info("Processed {} messages from topic '{}'".format(
                len(self._data_points), self._kafka_topic))

            self._data_points = []
            self._consumer.commit()
        except Exception:
            LOG.exception("Error writing to influxdb: {}"
                          .format(self._data_points))
            raise

    def run(self):
        try:
            for raw_message in self._consumer:
                try:
                    message = raw_message[1]
                    data_point = self.process_message(message)
                    self._data_points.append(data_point)
                except Exception:
                    LOG.exception('Error processing message. Message is '
                                  'being dropped. {}'.format(message))

                if len(self._data_points) >= self._database_batch_size:
                    self._flush()
        except:
            LOG.exception(
                'Persister encountered fatal exception processing messages. '
                'Shutting down all threads and exiting')
            shutdown_all_threads_and_die()


class AlarmPersister(AbstractPersister):
    """Class for persisting alarms."""

    def __init__(self, kafka_conf, influxdb_conf, zookeeper_conf):

        super(AlarmPersister, self).__init__(kafka_conf,
                                             influxdb_conf,
                                             zookeeper_conf)

    def process_message(self, message):

        LOG.debug(message.message.value.decode('utf8'))

        decoded = json.loads(message.message.value)
        LOG.debug(json.dumps(decoded, sort_keys=True, indent=4))
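
        # For orientation, the decoded payload is expected to look roughly
        # like the sketch below. The values are illustrative and only the
        # keys read in this method are shown:
        #
        #     {"alarm-transitioned": {
        #         "actionsEnabled": true,
        #         "alarmDescription": "...",
        #         "alarmId": "...",
        #         "alarmDefinitionId": "...",
        #         "metrics": [...],
        #         "alarmName": "...",
        #         "newState": "ALARM",
        #         "oldState": "OK",
        #         "stateChangeReason": "...",
        #         "tenantId": "...",
        #         "timestamp": 1446680400000,
        #         "subAlarms": [...]}}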

        alarm_transitioned = decoded['alarm-transitioned']

        actions_enabled = alarm_transitioned['actionsEnabled']
        LOG.debug('actions enabled: %s', actions_enabled)

        alarm_description = alarm_transitioned['alarmDescription']
        LOG.debug('alarm description: %s', alarm_description)

        alarm_id = alarm_transitioned['alarmId']
        LOG.debug('alarm id: %s', alarm_id)

        alarm_definition_id = alarm_transitioned['alarmDefinitionId']
        LOG.debug('alarm definition id: %s', alarm_definition_id)

        metrics = alarm_transitioned['metrics']
        LOG.debug('metrics: %s', metrics)

        alarm_name = alarm_transitioned['alarmName']
        LOG.debug('alarm name: %s', alarm_name)

        new_state = alarm_transitioned['newState']
        LOG.debug('new state: %s', new_state)

        old_state = alarm_transitioned['oldState']
        LOG.debug('old state: %s', old_state)

        state_change_reason = alarm_transitioned['stateChangeReason']
        LOG.debug('state change reason: %s', state_change_reason)

        tenant_id = alarm_transitioned['tenantId']
        LOG.debug('tenant id: %s', tenant_id)

        time_stamp = alarm_transitioned['timestamp']
        LOG.debug('time stamp: %s', time_stamp)

        sub_alarms = alarm_transitioned['subAlarms']

        if sub_alarms:
            # The sub-alarm JSON arrives with camelCase keys; rewrite them
            # to snake_case before persisting.
            sub_alarms_json = json.dumps(sub_alarms, ensure_ascii=False)

            sub_alarms_json_snake_case = sub_alarms_json.replace(
                '"subAlarmExpression":',
                '"sub_alarm_expression":')

            sub_alarms_json_snake_case = sub_alarms_json_snake_case.replace(
                '"metricDefinition":',
                '"metric_definition":')

            sub_alarms_json_snake_case = sub_alarms_json_snake_case.replace(
                '"subAlarmState":',
                '"sub_alarm_state":')
        else:
            sub_alarms_json_snake_case = "[]"

        # Timestamps arrive as milliseconds since the epoch; convert to
        # seconds for datetime.fromtimestamp().
        ts = time_stamp / 1000.0

        data = {"measurement": 'alarm_state_history',
                "time": datetime.fromtimestamp(ts, tz=pytz.utc).strftime(
                    '%Y-%m-%dT%H:%M:%S.%fZ'),
                "fields": {
                    "tenant_id": tenant_id.encode('utf8'),
                    "alarm_id": alarm_id.encode('utf8'),
                    "metrics": json.dumps(
                        metrics, ensure_ascii=False).encode('utf8'),
                    "new_state": new_state.encode('utf8'),
                    "old_state": old_state.encode('utf8'),
                    "reason": state_change_reason.encode('utf8'),
                    "reason_data": "{}".encode('utf8'),
                    "sub_alarms": sub_alarms_json_snake_case.encode('utf8')
                },
                "tags": {
                    "tenant_id": tenant_id.encode('utf8')
                }}

        LOG.debug(data)

        return data


class MetricPersister(AbstractPersister):
    """Class for persisting metrics."""

    def __init__(self, kafka_conf, influxdb_conf, zookeeper_conf):

        super(MetricPersister, self).__init__(kafka_conf,
                                              influxdb_conf,
                                              zookeeper_conf)

    def process_message(self, message):

        LOG.debug(message.message.value.decode('utf8'))

        decoded = json.loads(message.message.value)
        LOG.debug(json.dumps(decoded, sort_keys=True, indent=4))
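
        # For orientation, a decoded metric envelope is expected to look
        # roughly like this. The values are illustrative and only the keys
        # read below are shown:
        #
        #     {"metric": {"name": "cpu.user_perc",
        #                 "dimensions": {"hostname": "host-01"},
        #                 "timestamp": 1446680400000,
        #                 "value": 42.0,
        #                 "value_meta": {}},
        #      "meta": {"region": "region-a", "tenantId": "..."},
        #      "creation_time": 1446680401}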

        metric = decoded['metric']

        metric_name = metric['name']
        LOG.debug('name: %s', metric_name)

        creation_time = decoded['creation_time']
        LOG.debug('creation time: %s', creation_time)

        region = decoded['meta']['region']
        LOG.debug('region: %s', region)

        tenant_id = decoded['meta']['tenantId']
        LOG.debug('tenant id: %s', tenant_id)

        dimensions = {}
        if 'dimensions' in metric:
            for dimension_name in metric['dimensions']:
                dimensions[dimension_name.encode('utf8')] = (
                    metric['dimensions'][dimension_name].encode('utf8'))
                LOG.debug('dimension: %s : %s', dimension_name,
                          dimensions[dimension_name.encode('utf8')])

        time_stamp = metric['timestamp']
        LOG.debug('timestamp %s', time_stamp)

        value = float(metric['value'])
        LOG.debug('value: %s', value)

        if 'value_meta' in metric and metric['value_meta']:
            value_meta = metric['value_meta']
        else:
            value_meta = {}
        LOG.debug('value_meta: %s', value_meta)

        tags = dimensions
        tags['_tenant_id'] = tenant_id.encode('utf8')
        tags['_region'] = region.encode('utf8')

        # Timestamps arrive as milliseconds since the epoch; convert to
        # seconds for datetime.fromtimestamp().
        ts = time_stamp / 1000.0

        data = {"measurement": metric_name.encode('utf8'),
                "time": datetime.fromtimestamp(ts, tz=pytz.utc).strftime(
                    '%Y-%m-%dT%H:%M:%S.%fZ'),
                "fields": {
                    "value": value,
                    "value_meta": json.dumps(
                        value_meta, ensure_ascii=False).encode('utf8')
                },
                "tags": tags}

        LOG.debug(data)

        return data


def main_service():
    """Method to use with the OpenStack service framework."""

    service.prepare_service()
    launcher = os_service.ServiceLauncher()
    launcher.launch_service(Persister())
    launcher.wait()
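

# Example stand-alone invocation, per the module docstring (the config
# path here is illustrative):
#
#     python persister.py --config-file /etc/monasca/persister.conf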


# Used if run without the OpenStack service framework.
if __name__ == "__main__":
    sys.exit(main())