diff --git a/ceilometer/publisher/tcp.py b/ceilometer/publisher/tcp.py new file mode 100644 index 0000000000..ef5e802d20 --- /dev/null +++ b/ceilometer/publisher/tcp.py @@ -0,0 +1,94 @@ +# +# Copyright 2022 Red Hat, Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Publish a sample using a TCP mechanism +""" + +import socket + +import msgpack +from oslo_log import log +from oslo_utils import netutils + +import ceilometer +from ceilometer.i18n import _ +from ceilometer import publisher +from ceilometer.publisher import utils + +LOG = log.getLogger(__name__) + + +class TCPPublisher(publisher.ConfigPublisherBase): + def __init__(self, conf, parsed_url): + super(TCPPublisher, self).__init__(conf, parsed_url) + self.host, self.port = netutils.parse_host_port( + parsed_url.netloc, default_port=4952) + addrinfo = None + try: + addrinfo = socket.getaddrinfo(self.host, None, socket.AF_INET6, + socket.SOCK_STREAM)[0] + except socket.gaierror: + try: + addrinfo = socket.getaddrinfo(self.host, None, socket.AF_INET, + socket.SOCK_STREAM)[0] + except socket.gaierror: + pass + if addrinfo: + self.addr_family = addrinfo[0] + else: + LOG.warning( + "Cannot resolve host %s, creating AF_INET socket...", + self.host) + self.addr_family = socket.AF_INET + self.create_and_connect() + + def create_and_connect(self): + self.socket = socket.socket(self.addr_family, + socket.SOCK_STREAM) + self.socket.connect((self.host, self.port)) + + def publish_samples(self, samples): + """Send a metering message for publishing + + :param samples: Samples from pipeline after transformation + """ + + for sample in samples: + msg = utils.meter_message_from_counter( + sample, self.conf.publisher.telemetry_secret, self.conf.host) + host = self.host + port = self.port + LOG.debug("Publishing sample %(msg)s over TCP to " + "%(host)s:%(port)d", {'msg': msg, 'host': host, + 'port': port}) + encoded_msg = msgpack.dumps(msg, use_bin_type=True) + msg_len = len(encoded_msg).to_bytes(8, 'little') + try: + self.socket.send(msg_len + encoded_msg) + except Exception: + LOG.warning(_("Unable to send sample over TCP," + "trying to reconnect and resend the message")) + self.create_and_connect() + try: + self.socket.send(msg_len + encoded_msg) + except Exception: + LOG.exception(_("Unable to reconnect and resend" + "sample over TCP")) + + def publish_events(self, events): + """Send an event message for publishing + + :param events: events from pipeline after transformation + """ + raise ceilometer.NotImplementedError diff --git a/ceilometer/publisher/utils.py b/ceilometer/publisher/utils.py index 0d1e7be0fb..58d799a382 100644 --- a/ceilometer/publisher/utils.py +++ b/ceilometer/publisher/utils.py @@ -114,7 +114,7 @@ def verify_signature(message, secret): return secretutils.constant_time_compare(new_sig, old_sig) -def meter_message_from_counter(sample, secret): +def meter_message_from_counter(sample, secret, publisher_id=None): """Make a metering message ready to be published or stored. Returns a dictionary containing a metering message @@ -135,6 +135,8 @@ def meter_message_from_counter(sample, secret): 'message_id': sample.id, 'monotonic_time': sample.monotonic_time, } + if publisher_id is not None: + msg['publisher_id'] = publisher_id msg['message_signature'] = compute_signature(msg, secret) return msg diff --git a/ceilometer/tests/unit/publisher/test_tcp.py b/ceilometer/tests/unit/publisher/test_tcp.py new file mode 100644 index 0000000000..737d6fb3cc --- /dev/null +++ b/ceilometer/tests/unit/publisher/test_tcp.py @@ -0,0 +1,223 @@ +# +# Copyright 2022 Red Hat, Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Tests for ceilometer/publisher/tcp.py""" + +import datetime +from unittest import mock + +import msgpack +from oslo_utils import netutils +from oslotest import base + +from ceilometer.publisher import tcp +from ceilometer.publisher import utils +from ceilometer import sample +from ceilometer import service + + +COUNTER_SOURCE = 'testsource' + + +class TestTCPPublisher(base.BaseTestCase): + test_data = [ + sample.Sample( + name='test', + type=sample.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id='test_run_tasks', + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'name': 'TestPublish'}, + source=COUNTER_SOURCE, + ), + sample.Sample( + name='test', + type=sample.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id='test_run_tasks', + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'name': 'TestPublish'}, + source=COUNTER_SOURCE, + ), + sample.Sample( + name='test2', + type=sample.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id='test_run_tasks', + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'name': 'TestPublish'}, + source=COUNTER_SOURCE, + ), + sample.Sample( + name='test2', + type=sample.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id='test_run_tasks', + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'name': 'TestPublish'}, + source=COUNTER_SOURCE, + ), + sample.Sample( + name='test3', + type=sample.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id='test_run_tasks', + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'name': 'TestPublish'}, + source=COUNTER_SOURCE, + ), + ] + + @staticmethod + def _make_fake_socket(published): + def _fake_socket_socket(family, type): + def record_data(msg): + msg_length = int.from_bytes(msg[0:8], "little") + published.append(msg[8:msg_length + 8]) + + def connect(dst): + pass + + tcp_socket = mock.Mock() + tcp_socket.send = record_data + tcp_socket.connect = connect + return tcp_socket + + return _fake_socket_socket + + def setUp(self): + super(TestTCPPublisher, self).setUp() + self.CONF = service.prepare_service([], []) + self.CONF.publisher.telemetry_secret = 'not-so-secret' + + def test_published(self): + self.data_sent = [] + with mock.patch('socket.socket', + self._make_fake_socket(self.data_sent)): + publisher = tcp.TCPPublisher( + self.CONF, + netutils.urlsplit('tcp://somehost')) + publisher.publish_samples(self.test_data) + + self.assertEqual(5, len(self.data_sent)) + + sent_counters = [] + + for data in self.data_sent: + counter = msgpack.loads(data, raw=False) + sent_counters.append(counter) + + # Check that counters are equal + def sort_func(counter): + return counter['counter_name'] + + counters = [utils.meter_message_from_counter(d, + "not-so-secret", + publisher.conf.host) + for d in self.test_data] + counters.sort(key=sort_func) + sent_counters.sort(key=sort_func) + self.assertEqual(counters, sent_counters) + + @staticmethod + def _make_disconnecting_socket(published, connections): + def _fake_socket_socket(family, type): + def record_data(msg): + if len(published) == len(connections) - 1: + # Raise for every each first send attempt to + # trigger a reconnection attempt and send the data + # correctly after reconnecting + raise IOError + msg_length = int.from_bytes(msg[0:8], "little") + published.append(msg[8:msg_length + 8]) + + def record_connection(dest): + connections.append(dest) + + tcp_socket = mock.Mock() + tcp_socket.send = record_data + tcp_socket.connect = record_connection + return tcp_socket + + return _fake_socket_socket + + def test_reconnect(self): + self.data_sent = [] + self.connections = [] + with mock.patch('socket.socket', + self._make_disconnecting_socket(self.data_sent, + self.connections)): + publisher = tcp.TCPPublisher( + self.CONF, + netutils.urlsplit('tcp://somehost')) + publisher.publish_samples(self.test_data) + + sent_counters = [] + + for data in self.data_sent: + counter = msgpack.loads(data, raw=False) + sent_counters.append(counter) + + for connection in self.connections: + # Check destination + self.assertEqual(('somehost', 4952), connection) + self.assertEqual(len(self.connections) - 1, len(self.data_sent)) + + # Check that counters are equal + def sort_func(counter): + return counter['counter_name'] + + counters = [utils.meter_message_from_counter(d, + "not-so-secret", + publisher.conf.host) + for d in self.test_data] + counters.sort(key=sort_func) + sent_counters.sort(key=sort_func) + self.assertEqual(counters, sent_counters) + + @staticmethod + def _raise_ioerror(*args): + raise IOError + + def _make_broken_socket(self, family, type): + def connect(dst): + pass + + tcp_socket = mock.Mock() + tcp_socket.send = self._raise_ioerror + tcp_socket.connect = connect + return tcp_socket + + def test_publish_error(self): + with mock.patch('socket.socket', + self._make_broken_socket): + publisher = tcp.TCPPublisher( + self.CONF, + netutils.urlsplit('tcp://localhost')) + publisher.publish_samples(self.test_data) diff --git a/setup.cfg b/setup.cfg index 1caccacb6b..3530abd35d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -179,6 +179,7 @@ ceilometer.sample.publisher = test = ceilometer.publisher.test:TestPublisher notifier = ceilometer.publisher.messaging:SampleNotifierPublisher udp = ceilometer.publisher.udp:UDPPublisher + tcp = ceilometer.publisher.tcp:TCPPublisher file = ceilometer.publisher.file:FilePublisher http = ceilometer.publisher.http:HttpPublisher prometheus = ceilometer.publisher.prometheus:PrometheusPublisher