Merge "Refactor TCP provider"

This commit is contained in:
Zuul 2023-09-15 13:58:52 +00:00 committed by Gerrit Code Review
commit cf43abbfbf
2 changed files with 66 additions and 90 deletions

View File

@ -32,35 +32,25 @@ LOG = log.getLogger(__name__)
class TCPPublisher(publisher.ConfigPublisherBase):
def __init__(self, conf, parsed_url):
super(TCPPublisher, self).__init__(conf, parsed_url)
self.host, self.port = netutils.parse_host_port(
self.inet_addr = netutils.parse_host_port(
parsed_url.netloc, default_port=4952)
addrinfo = None
try:
addrinfo = socket.getaddrinfo(self.host, None, socket.AF_INET6,
socket.SOCK_STREAM)[0]
except socket.gaierror:
try:
addrinfo = socket.getaddrinfo(self.host, None, socket.AF_INET,
socket.SOCK_STREAM)[0]
except socket.gaierror:
pass
if addrinfo:
self.addr_family = addrinfo[0]
else:
LOG.warning(
"Cannot resolve host %s, creating AF_INET socket...",
self.host)
self.addr_family = socket.AF_INET
try:
self.create_and_connect()
except Exception:
LOG.error(_("Unable to connect to the "
"remote endpoint"))
self.socket = None
self.connect_socket()
def create_and_connect(self):
self.socket = socket.socket(self.addr_family,
socket.SOCK_STREAM)
self.socket.connect((self.host, self.port))
def connect_socket(self):
try:
self.socket = socket.create_connection(self.inet_addr)
return True
except socket.gaierror:
LOG.error(_("Unable to resolv the remote %(host)s",
{'host': self.inet_addr[0],
'port': self.inet_addr[1]}))
except TimeoutError:
LOG.error(_("Unable to connect to the remote endpoint "
"%(host)s:%(port)d. The connection timed out.",
{'host': self.inet_addr[0],
'port': self.inet_addr[1]}))
return False
def publish_samples(self, samples):
"""Send a metering message for publishing
@ -71,24 +61,34 @@ class TCPPublisher(publisher.ConfigPublisherBase):
for sample in samples:
msg = utils.meter_message_from_counter(
sample, self.conf.publisher.telemetry_secret, self.conf.host)
host = self.host
port = self.port
LOG.debug("Publishing sample %(msg)s over TCP to "
"%(host)s:%(port)d", {'msg': msg, 'host': host,
'port': port})
"%(host)s:%(port)d",
{'msg': msg,
'host': self.inet_addr[0],
'port': self.inet_addr[1]})
encoded_msg = msgpack.dumps(msg, use_bin_type=True)
msg_len = len(encoded_msg).to_bytes(8, 'little')
try:
self.socket.send(msg_len + encoded_msg)
except Exception:
LOG.error(_("Unable to send sample over TCP,"
"trying to reconnect and resend the message"))
self.create_and_connect()
if self.socket:
try:
self.socket.send(msg_len + encoded_msg)
except Exception:
LOG.exception(_("Unable to reconnect and resend"
"sample over TCP"))
continue
except OSError:
LOG.warning(_("Unable to send sample over TCP, trying "
"to reconnect and resend the message"))
if self.connect_socket():
try:
self.socket.send(msg_len + encoded_msg)
continue
except OSError:
pass
LOG.error(_("Unable to reconnect and resend sample over TCP"))
# NOTE (jokke): We do not handle exceptions in the calling code
# so raising the exception from here needs quite a bit more work.
# Same time we don't want to spam the retry messages as it's
# unlikely to change between iterations on this loop. 'break'
# rather than 'return' even the end result is the same feels
# more appropriate for now.
break
def publish_events(self, events):
"""Send an event message for publishing

View File

@ -21,7 +21,7 @@ import msgpack
from oslo_utils import netutils
from oslotest import base
from ceilometer.publisher import tcp
from ceilometer.publisher.tcp import TCPPublisher
from ceilometer.publisher import utils
from ceilometer import sample
from ceilometer import service
@ -96,20 +96,16 @@ class TestTCPPublisher(base.BaseTestCase):
@staticmethod
def _make_fake_socket(published):
def _fake_socket_socket(family, type):
def _fake_socket_create_connection(inet_addr):
def record_data(msg):
msg_length = int.from_bytes(msg[0:8], "little")
published.append(msg[8:msg_length + 8])
def connect(dst):
pass
tcp_socket = mock.Mock()
tcp_socket.send = record_data
tcp_socket.connect = connect
return tcp_socket
return _fake_socket_socket
return _fake_socket_create_connection
def setUp(self):
super(TestTCPPublisher, self).setUp()
@ -118,11 +114,10 @@ class TestTCPPublisher(base.BaseTestCase):
def test_published(self):
self.data_sent = []
with mock.patch('socket.socket',
with mock.patch('ceilometer.publisher.tcp.socket.create_connection',
self._make_fake_socket(self.data_sent)):
publisher = tcp.TCPPublisher(
self.CONF,
netutils.urlsplit('tcp://somehost'))
publisher = TCPPublisher(self.CONF,
netutils.urlsplit('tcp://somehost'))
publisher.publish_samples(self.test_data)
self.assertEqual(5, len(self.data_sent))
@ -145,37 +140,28 @@ class TestTCPPublisher(base.BaseTestCase):
sent_counters.sort(key=sort_func)
self.assertEqual(counters, sent_counters)
@staticmethod
def _make_disconnecting_socket(published, connections):
def _fake_socket_socket(family, type):
def _make_disconnecting_socket(self):
def _fake_socket_create_connection(inet_addr):
def record_data(msg):
if len(published) == len(connections) - 1:
# Raise for every each first send attempt to
# trigger a reconnection attempt and send the data
# correctly after reconnecting
raise IOError
if not self.connections:
self.connections = True
raise OSError
msg_length = int.from_bytes(msg[0:8], "little")
published.append(msg[8:msg_length + 8])
self.data_sent.append(msg[8:msg_length + 8])
def record_connection(dest):
connections.append(dest)
tcp_socket = mock.Mock()
tcp_socket = mock.MagicMock()
tcp_socket.send = record_data
tcp_socket.connect = record_connection
return tcp_socket
return _fake_socket_socket
return _fake_socket_create_connection
def test_reconnect(self):
self.data_sent = []
self.connections = []
with mock.patch('socket.socket',
self._make_disconnecting_socket(self.data_sent,
self.connections)):
publisher = tcp.TCPPublisher(
self.CONF,
netutils.urlsplit('tcp://somehost'))
self.connections = False
with mock.patch('ceilometer.publisher.tcp.socket.create_connection',
self._make_disconnecting_socket()):
publisher = TCPPublisher(self.CONF,
netutils.urlsplit('tcp://somehost'))
publisher.publish_samples(self.test_data)
sent_counters = []
@ -184,11 +170,6 @@ class TestTCPPublisher(base.BaseTestCase):
counter = msgpack.loads(data, raw=False)
sent_counters.append(counter)
for connection in self.connections:
# Check destination
self.assertEqual(('somehost', 4952), connection)
self.assertEqual(len(self.connections) - 1, len(self.data_sent))
# Check that counters are equal
def sort_func(counter):
return counter['counter_name']
@ -202,22 +183,17 @@ class TestTCPPublisher(base.BaseTestCase):
self.assertEqual(counters, sent_counters)
@staticmethod
def _raise_ioerror(*args):
raise IOError
def _make_broken_socket(self, family, type):
def connect(dst):
pass
def _raise_OSError(*args):
raise OSError
def _make_broken_socket(self, inet_addr):
tcp_socket = mock.Mock()
tcp_socket.send = self._raise_ioerror
tcp_socket.connect = connect
tcp_socket.send = self._raise_OSError
return tcp_socket
def test_publish_error(self):
with mock.patch('socket.socket',
with mock.patch('ceilometer.publisher.tcp.socket.create_connection',
self._make_broken_socket):
publisher = tcp.TCPPublisher(
self.CONF,
netutils.urlsplit('tcp://localhost'))
publisher = TCPPublisher(self.CONF,
netutils.urlsplit('tcp://localhost'))
publisher.publish_samples(self.test_data)