104 lines
3.9 KiB
Python
104 lines
3.9 KiB
Python
#
|
|
# Copyright 2022 Red Hat, Inc
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
"""Publish a sample using a TCP mechanism
|
|
"""
|
|
|
|
import socket
|
|
|
|
import msgpack
|
|
from oslo_log import log
|
|
from oslo_utils import netutils
|
|
|
|
import ceilometer
|
|
from ceilometer.i18n import _
|
|
from ceilometer import publisher
|
|
from ceilometer.publisher import utils
|
|
|
|
LOG = log.getLogger(__name__)
|
|
|
|
|
|
class TCPPublisher(publisher.ConfigPublisherBase):
|
|
def __init__(self, conf, parsed_url):
|
|
super(TCPPublisher, self).__init__(conf, parsed_url)
|
|
self.inet_addr = netutils.parse_host_port(
|
|
parsed_url.netloc, default_port=4952)
|
|
self.socket = None
|
|
self.connect_socket()
|
|
|
|
def connect_socket(self):
|
|
try:
|
|
self.socket = socket.create_connection(self.inet_addr)
|
|
return True
|
|
except socket.gaierror:
|
|
LOG.error(_("Unable to resolv the remote %(host)s") %
|
|
{'host': self.inet_addr[0],
|
|
'port': self.inet_addr[1]})
|
|
except TimeoutError:
|
|
LOG.error(_("Unable to connect to the remote endpoint "
|
|
"%(host)s:%(port)d. The connection timed out.") %
|
|
{'host': self.inet_addr[0],
|
|
'port': self.inet_addr[1]})
|
|
except ConnectionRefusedError:
|
|
LOG.error(_("Unable to connect to the remote endpoint "
|
|
"%(host)s:%(port)d. Connection refused.") %
|
|
{'host': self.inet_addr[0],
|
|
'port': self.inet_addr[1]})
|
|
return False
|
|
|
|
def publish_samples(self, samples):
|
|
"""Send a metering message for publishing
|
|
|
|
:param samples: Samples from pipeline after transformation
|
|
"""
|
|
|
|
for sample in samples:
|
|
msg = utils.meter_message_from_counter(
|
|
sample, self.conf.publisher.telemetry_secret, self.conf.host)
|
|
LOG.debug("Publishing sample %(msg)s over TCP to "
|
|
"%(host)s:%(port)d",
|
|
{'msg': msg,
|
|
'host': self.inet_addr[0],
|
|
'port': self.inet_addr[1]})
|
|
encoded_msg = msgpack.dumps(msg, use_bin_type=True)
|
|
msg_len = len(encoded_msg).to_bytes(8, 'little')
|
|
if self.socket:
|
|
try:
|
|
self.socket.send(msg_len + encoded_msg)
|
|
continue
|
|
except OSError:
|
|
LOG.warning(_("Unable to send sample over TCP, trying "
|
|
"to reconnect and resend the message"))
|
|
if self.connect_socket():
|
|
try:
|
|
self.socket.send(msg_len + encoded_msg)
|
|
continue
|
|
except OSError:
|
|
pass
|
|
LOG.error(_("Unable to reconnect and resend sample over TCP"))
|
|
# NOTE (jokke): We do not handle exceptions in the calling code
|
|
# so raising the exception from here needs quite a bit more work.
|
|
# Same time we don't want to spam the retry messages as it's
|
|
# unlikely to change between iterations on this loop. 'break'
|
|
# rather than 'return' even the end result is the same feels
|
|
# more appropriate for now.
|
|
break
|
|
|
|
def publish_events(self, events):
|
|
"""Send an event message for publishing
|
|
|
|
:param events: events from pipeline after transformation
|
|
"""
|
|
raise ceilometer.NotImplementedError
|