153 lines
4.6 KiB
Python
153 lines
4.6 KiB
Python
#
|
|
# Copyright 2024 cmss, inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import json
|
|
import time
|
|
|
|
from oslo_log import log
|
|
from oslo_utils import timeutils
|
|
|
|
from ceilometer.publisher import http
|
|
from ceilometer import sample as smp
|
|
|
|
LOG = log.getLogger(__name__)
|
|
|
|
|
|
class OpentelemetryHttpPublisher(http.HttpPublisher):
    """Publish metering data to an OpenTelemetry Collector endpoint.

    This publisher subclasses the http publisher and accepts all of its
    options.

    To use this publisher for samples, add the following section to the
    /etc/ceilometer/pipeline.yaml file or simply add it to an existing
    pipeline::

          - name: meter_file
            meters:
                - "*"
            publishers:
                - opentelemetryhttp://opentelemetry-http-ip:4318/v1/metrics

    Every sample is converted to its own metrics document and posted
    individually as JSON.
    """

    # Samples are serialised with json.dumps() before being posted.
    # NOTE(review): presumably the parent HttpPublisher reads HEADERS when
    # building the request -- confirm against ceilometer.publisher.http.
    HEADERS = {'Content-type': 'application/json'}

    @staticmethod
    def get_attribute_model(key, value):
        """Build one key/value attribute entry.

        :param key: Attribute name.
        :param value: Attribute value, stored as-is under ``string_value``
                      (no string conversion is performed here).
        :returns: ``{"key": key, "value": {"string_value": value}}``
        """
        return {
            "key": key,
            "value": {
                "string_value": value
            }
        }

    def get_attributes_model(self, sample):
        """Build the attribute list attached to a sample's data point.

        :param sample: Object exposing ``resource_id``, ``user_id`` and
                       ``project_id``.
        :returns: List of three attribute entries, in the order
                  resource_id, user_id, project_id.
        """
        return [
            self.get_attribute_model("resource_id", sample.resource_id),
            self.get_attribute_model("user_id", sample.user_id),
            self.get_attribute_model("project_id", sample.project_id),
        ]

    @staticmethod
    def get_metrics_model(sample, data_points):
        """Build the metric entry describing a sample.

        :param sample: Object exposing ``name``, ``unit`` and ``type``.
        :param data_points: List of data point dicts for this metric.
        :returns: Dict with ``name`` (dots replaced by underscores),
                  ``description``, ``unit`` and a single type-specific key
                  holding ``{"data_points": data_points}``.
        """
        # Cumulative samples are published under "counter", everything
        # else under "gauge".
        # NOTE(review): "counter" is not one of the OTLP metric data field
        # names (the usual one for cumulative values is "sum") -- confirm
        # the target collector accepts it before relying on this.
        if sample.type == smp.TYPE_CUMULATIVE:
            metric_type = "counter"
        else:
            metric_type = "gauge"

        return {
            "name": sample.name.replace(".", "_"),
            "description": str(sample.name) + " unit:" + sample.unit,
            "unit": sample.unit,
            metric_type: {"data_points": data_points}
        }

    @staticmethod
    def get_data_points_model(timestamp, attributes, volume):
        """Build one data point for a sample measurement.

        :param timestamp: ISO 8601 timestamp string of the measurement.
        :param attributes: Attribute list attached to the data point.
        :param volume: Measured value, emitted as ``as_double``.
        :returns: Dict with ``attributes``, ``start_time_unix_nano``,
                  ``time_unix_nano`` (both in nanoseconds since the Unix
                  epoch), ``as_double`` and ``flags``.
        :raises Exception: Whatever ``timeutils.parse_isotime`` raises for
                           a timestamp it cannot parse.
        """
        moment = timeutils.parse_isotime(timestamp)
        # datetime.timestamp() honours the datetime's own UTC offset, so
        # the result does not depend on the host's local timezone the way
        # time.mktime() on a bare time tuple does.
        # NOTE(review): assumes parse_isotime returns a timezone-aware
        # datetime; a naive one would still be read as local time.
        whole_seconds = int(moment.replace(microsecond=0).timestamp())
        # The *_unix_nano fields carry nanoseconds, not seconds; integer
        # arithmetic keeps the sub-second part exact.
        unix_nano = whole_seconds * 10 ** 9 + moment.microsecond * 1000

        return {
            'attributes': attributes,
            "start_time_unix_nano": unix_nano,
            "time_unix_nano": unix_nano,
            "as_double": volume,
            "flags": 0
        }

    def get_data_model(self, sample, data_points):
        """Wrap a sample's metric into the full request document.

        :param sample: Sample the metric is built from.
        :param data_points: Data points belonging to the sample.
        :returns: Dict with one ``resource_metrics`` entry holding one
                  ``scope_metrics`` entry (scope ``ceilometer``/``v1``)
                  that contains a single metric.
        """
        metrics = [self.get_metrics_model(sample, data_points)]
        return {
            "resource_metrics": [{
                "scope_metrics": [{
                    "scope": {
                        "name": "ceilometer",
                        "version": "v1"
                    },
                    "metrics": metrics
                }]
            }]
        }

    def get_data_points(self, sample):
        """Build the data point list for a sample.

        :param sample: Object exposing ``timestamp``, ``volume`` and the
                       id fields used for the attributes.
        :returns: One-element list with the sample's data point, or an
                  empty list when the data point could not be built.
        """
        # attributes contain basic metadata
        attributes = self.get_attributes_model(sample)
        try:
            return [self.get_data_points_model(
                sample.timestamp, attributes, sample.volume)]
        except Exception as e:
            # Deliberately best-effort: one bad sample is logged and
            # skipped instead of failing the whole publish call.
            LOG.warning("Get data point error, %s", e)
            return []

    def get_opentelemetry_model(self, sample):
        """Convert a sample to its request document.

        :param sample: Sample to convert.
        :returns: The document dict, or ``None`` when no data point could
                  be built for the sample.
        """
        data_points = self.get_data_points(sample)
        if not data_points:
            return None
        return self.get_data_model(sample, data_points)

    def publish_samples(self, samples):
        """Send a metering message for publishing

        Each sample is converted and posted on its own; samples that
        cannot be converted are skipped.

        :param samples: Samples from pipeline after transformation
        """
        if not samples:
            LOG.warning('Data samples is empty!')
            return

        for s in samples:
            data = self.get_opentelemetry_model(s)
            if data:
                # _do_post is not defined in this class; it is expected to
                # come from the parent HttpPublisher.
                self._do_post(json.dumps(data))

    @staticmethod
    def publish_events(events):
        """Events are not supported by this publisher.

        :param events: Ignored.
        :raises NotImplementedError: Always.
        """
        raise NotImplementedError
|