Add TCP publisher

This commit adds a TCP publisher. The publisher works similarly
to the UDP publisher.

Change-Id: Iac662018039e74ad59ac9c7fa4db994da540ef2f
This commit is contained in:
jwysogla 2022-11-28 13:18:16 -05:00
parent e496b5d212
commit ed55b5f4a4
4 changed files with 321 additions and 1 deletions

View File

@ -0,0 +1,94 @@
#
# Copyright 2022 Red Hat, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Publish a sample using a TCP mechanism
"""
import socket
import msgpack
from oslo_log import log
from oslo_utils import netutils
import ceilometer
from ceilometer.i18n import _
from ceilometer import publisher
from ceilometer.publisher import utils
LOG = log.getLogger(__name__)
class TCPPublisher(publisher.ConfigPublisherBase):
def __init__(self, conf, parsed_url):
super(TCPPublisher, self).__init__(conf, parsed_url)
self.host, self.port = netutils.parse_host_port(
parsed_url.netloc, default_port=4952)
addrinfo = None
try:
addrinfo = socket.getaddrinfo(self.host, None, socket.AF_INET6,
socket.SOCK_STREAM)[0]
except socket.gaierror:
try:
addrinfo = socket.getaddrinfo(self.host, None, socket.AF_INET,
socket.SOCK_STREAM)[0]
except socket.gaierror:
pass
if addrinfo:
self.addr_family = addrinfo[0]
else:
LOG.warning(
"Cannot resolve host %s, creating AF_INET socket...",
self.host)
self.addr_family = socket.AF_INET
self.create_and_connect()
def create_and_connect(self):
self.socket = socket.socket(self.addr_family,
socket.SOCK_STREAM)
self.socket.connect((self.host, self.port))
def publish_samples(self, samples):
"""Send a metering message for publishing
:param samples: Samples from pipeline after transformation
"""
for sample in samples:
msg = utils.meter_message_from_counter(
sample, self.conf.publisher.telemetry_secret, self.conf.host)
host = self.host
port = self.port
LOG.debug("Publishing sample %(msg)s over TCP to "
"%(host)s:%(port)d", {'msg': msg, 'host': host,
'port': port})
encoded_msg = msgpack.dumps(msg, use_bin_type=True)
msg_len = len(encoded_msg).to_bytes(8, 'little')
try:
self.socket.send(msg_len + encoded_msg)
except Exception:
LOG.warning(_("Unable to send sample over TCP,"
"trying to reconnect and resend the message"))
self.create_and_connect()
try:
self.socket.send(msg_len + encoded_msg)
except Exception:
LOG.exception(_("Unable to reconnect and resend"
"sample over TCP"))
def publish_events(self, events):
"""Send an event message for publishing
:param events: events from pipeline after transformation
"""
raise ceilometer.NotImplementedError

View File

@ -114,7 +114,7 @@ def verify_signature(message, secret):
return secretutils.constant_time_compare(new_sig, old_sig)
def meter_message_from_counter(sample, secret):
def meter_message_from_counter(sample, secret, publisher_id=None):
"""Make a metering message ready to be published or stored.
Returns a dictionary containing a metering message
@ -135,6 +135,8 @@ def meter_message_from_counter(sample, secret):
'message_id': sample.id,
'monotonic_time': sample.monotonic_time,
}
if publisher_id is not None:
msg['publisher_id'] = publisher_id
msg['message_signature'] = compute_signature(msg, secret)
return msg

View File

@ -0,0 +1,223 @@
#
# Copyright 2022 Red Hat, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for ceilometer/publisher/tcp.py"""
import datetime
from unittest import mock
import msgpack
from oslo_utils import netutils
from oslotest import base
from ceilometer.publisher import tcp
from ceilometer.publisher import utils
from ceilometer import sample
from ceilometer import service
COUNTER_SOURCE = 'testsource'
class TestTCPPublisher(base.BaseTestCase):
test_data = [
sample.Sample(
name='test',
type=sample.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='test',
project_id='test',
resource_id='test_run_tasks',
timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'name': 'TestPublish'},
source=COUNTER_SOURCE,
),
sample.Sample(
name='test',
type=sample.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='test',
project_id='test',
resource_id='test_run_tasks',
timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'name': 'TestPublish'},
source=COUNTER_SOURCE,
),
sample.Sample(
name='test2',
type=sample.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='test',
project_id='test',
resource_id='test_run_tasks',
timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'name': 'TestPublish'},
source=COUNTER_SOURCE,
),
sample.Sample(
name='test2',
type=sample.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='test',
project_id='test',
resource_id='test_run_tasks',
timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'name': 'TestPublish'},
source=COUNTER_SOURCE,
),
sample.Sample(
name='test3',
type=sample.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='test',
project_id='test',
resource_id='test_run_tasks',
timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'name': 'TestPublish'},
source=COUNTER_SOURCE,
),
]
@staticmethod
def _make_fake_socket(published):
def _fake_socket_socket(family, type):
def record_data(msg):
msg_length = int.from_bytes(msg[0:8], "little")
published.append(msg[8:msg_length + 8])
def connect(dst):
pass
tcp_socket = mock.Mock()
tcp_socket.send = record_data
tcp_socket.connect = connect
return tcp_socket
return _fake_socket_socket
def setUp(self):
super(TestTCPPublisher, self).setUp()
self.CONF = service.prepare_service([], [])
self.CONF.publisher.telemetry_secret = 'not-so-secret'
def test_published(self):
self.data_sent = []
with mock.patch('socket.socket',
self._make_fake_socket(self.data_sent)):
publisher = tcp.TCPPublisher(
self.CONF,
netutils.urlsplit('tcp://somehost'))
publisher.publish_samples(self.test_data)
self.assertEqual(5, len(self.data_sent))
sent_counters = []
for data in self.data_sent:
counter = msgpack.loads(data, raw=False)
sent_counters.append(counter)
# Check that counters are equal
def sort_func(counter):
return counter['counter_name']
counters = [utils.meter_message_from_counter(d,
"not-so-secret",
publisher.conf.host)
for d in self.test_data]
counters.sort(key=sort_func)
sent_counters.sort(key=sort_func)
self.assertEqual(counters, sent_counters)
@staticmethod
def _make_disconnecting_socket(published, connections):
def _fake_socket_socket(family, type):
def record_data(msg):
if len(published) == len(connections) - 1:
# Raise for every each first send attempt to
# trigger a reconnection attempt and send the data
# correctly after reconnecting
raise IOError
msg_length = int.from_bytes(msg[0:8], "little")
published.append(msg[8:msg_length + 8])
def record_connection(dest):
connections.append(dest)
tcp_socket = mock.Mock()
tcp_socket.send = record_data
tcp_socket.connect = record_connection
return tcp_socket
return _fake_socket_socket
def test_reconnect(self):
self.data_sent = []
self.connections = []
with mock.patch('socket.socket',
self._make_disconnecting_socket(self.data_sent,
self.connections)):
publisher = tcp.TCPPublisher(
self.CONF,
netutils.urlsplit('tcp://somehost'))
publisher.publish_samples(self.test_data)
sent_counters = []
for data in self.data_sent:
counter = msgpack.loads(data, raw=False)
sent_counters.append(counter)
for connection in self.connections:
# Check destination
self.assertEqual(('somehost', 4952), connection)
self.assertEqual(len(self.connections) - 1, len(self.data_sent))
# Check that counters are equal
def sort_func(counter):
return counter['counter_name']
counters = [utils.meter_message_from_counter(d,
"not-so-secret",
publisher.conf.host)
for d in self.test_data]
counters.sort(key=sort_func)
sent_counters.sort(key=sort_func)
self.assertEqual(counters, sent_counters)
@staticmethod
def _raise_ioerror(*args):
raise IOError
def _make_broken_socket(self, family, type):
def connect(dst):
pass
tcp_socket = mock.Mock()
tcp_socket.send = self._raise_ioerror
tcp_socket.connect = connect
return tcp_socket
def test_publish_error(self):
with mock.patch('socket.socket',
self._make_broken_socket):
publisher = tcp.TCPPublisher(
self.CONF,
netutils.urlsplit('tcp://localhost'))
publisher.publish_samples(self.test_data)

View File

@ -180,6 +180,7 @@ ceilometer.sample.publisher =
test = ceilometer.publisher.test:TestPublisher
notifier = ceilometer.publisher.messaging:SampleNotifierPublisher
udp = ceilometer.publisher.udp:UDPPublisher
tcp = ceilometer.publisher.tcp:TCPPublisher
file = ceilometer.publisher.file:FilePublisher
http = ceilometer.publisher.http:HttpPublisher
prometheus = ceilometer.publisher.prometheus:PrometheusPublisher