diff --git a/monasca_agent/collector/checks_d/host_alive.py b/monasca_agent/collector/checks_d/host_alive.py
index 551230df..466d528a 100644
--- a/monasca_agent/collector/checks_d/host_alive.py
+++ b/monasca_agent/collector/checks_d/host_alive.py
@@ -63,7 +63,7 @@ class HostAlive(services_checks.ServicesCheck):
             error_message = 'Unable to open socket to host {0}'.format(host)
             self.log.warn(error_message)
             return False, error_message
-        if banner.startswith('SSH'):
+        if banner.startswith(b'SSH'):
             return True, None
         else:
             error_message = 'Unexpected response "{0}" from host {1}'.format(banner, host)
diff --git a/monasca_agent/collector/checks_d/http_check.py b/monasca_agent/collector/checks_d/http_check.py
index 33710381..9d232e33 100644
--- a/monasca_agent/collector/checks_d/http_check.py
+++ b/monasca_agent/collector/checks_d/http_check.py
@@ -23,8 +23,8 @@ import sys
 import time
 
 from httplib2 import Http
-from httplib2 import httplib
 from httplib2 import HttpLib2Error
+from six.moves import http_client
 
 import monasca_agent.collector.checks.services_checks as services_checks
 import monasca_agent.common.config as cfg
@@ -130,7 +130,7 @@ class HTTPCheck(services_checks.ServicesCheck):
                 self.log.warn(warn_string)
                 return False, error_msg
 
-            except httplib.ResponseNotReady as e:
+            except http_client.ResponseNotReady as e:
                 length = int((time.time() - start) * 1000)
                 error_msg = 'error: {0}. Network is not routable after {1} ' \
                             'ms'.format(repr(e), length)
diff --git a/monasca_agent/collector/checks_d/kafka_consumer.py b/monasca_agent/collector/checks_d/kafka_consumer.py
index da9a3f98..48d5482f 100644
--- a/monasca_agent/collector/checks_d/kafka_consumer.py
+++ b/monasca_agent/collector/checks_d/kafka_consumer.py
@@ -69,8 +69,7 @@ class KafkaCheck(checks.AgentCheck):
             assert isinstance(group, six.string_types)
             if isinstance(topics, dict):
                 self.log.info("Found old config format, discarding partition list")
-                topics = topics.keys()
-            assert isinstance(topics, list)
+                topics = list(topics.keys())
             assert isinstance(topics[0], six.string_types)
             consumer_groups[group] = topics
         return consumer_groups
@@ -122,14 +121,12 @@ class KafkaCheck(checks.AgentCheck):
             for p in partitions:
                 try:
                     response = kafka_conn.send_offset_request(
-                        [common.OffsetRequest(topic, p, -1, 1)])
+                        [common.OffsetRequest(topic.encode('utf-8'), p, -1, 1)])
                     offset_responses.append(response[0])
                 except common.KafkaError as e:
                     self.log.error("Error fetching broker offset: {0}".format(e))
-
         for resp in offset_responses:
             broker_offsets[(resp.topic, resp.partition)] = resp.offsets[0]
-
         return consumer_offsets, broker_offsets
 
     def check(self, instance):
@@ -146,13 +143,12 @@ class KafkaCheck(checks.AgentCheck):
         # Connect to Kafka and pull information
         with KafkaConnection(kafka_host_ports) as kafka_conn:
             consumer_offsets, broker_offsets = self._get_kafka_offsets(kafka_conn, consumer_groups)
-
         # Report the broker data if full output
         if full_output:
             broker_dimensions = dimensions.copy()
             for (topic, partition), broker_offset in broker_offsets.items():
                 broker_dimensions.update({'topic': topic, 'partition': str(partition)})
-                broker_offset = broker_offsets.get((topic, partition))
+                broker_offset = broker_offsets.get((topic.encode('utf-8'), partition))
                 self.gauge('kafka.broker_offset', broker_offset,
                            dimensions=self._set_dimensions(broker_dimensions, instance))
 
@@ -162,7 +158,7 @@ class KafkaCheck(checks.AgentCheck):
             if per_partition:
                 for partition, consumer_offset in offsets.items():
                     # Get the broker offset
-                    broker_offset = broker_offsets.get((topic, partition))
+                    broker_offset = broker_offsets.get((topic.encode('utf-8'), partition))
                     # Report the consumer offset and lag
                     consumer_dimensions.update({'topic': topic, 'partition': str(partition),
                                                 'consumer_group': consumer_group})
@@ -175,7 +171,7 @@ class KafkaCheck(checks.AgentCheck):
                 consumer_dimensions.update({'topic': topic, 'consumer_group': consumer_group})
                 total_lag = 0
                 for partition, consumer_offset in offsets.items():
-                    broker_offset = broker_offsets.get((topic, partition))
+                    broker_offset = broker_offsets.get((topic.encode('utf-8'), partition))
                     total_lag += broker_offset - consumer_offset
 
                 self.gauge('kafka.consumer_lag', total_lag,
diff --git a/monasca_agent/collector/checks_d/zk.py b/monasca_agent/collector/checks_d/zk.py
index 105be467..66e2b86c 100644
--- a/monasca_agent/collector/checks_d/zk.py
+++ b/monasca_agent/collector/checks_d/zk.py
@@ -39,6 +39,7 @@ import re
 import socket
 import struct
 
+from oslo_utils import encodeutils
 from six import StringIO
 
 from monasca_agent.collector.checks import AgentCheck
@@ -63,10 +64,10 @@ class Zookeeper(AgentCheck):
         try:
             # Connect to the zk client port and send the stat command
             sock.connect((host, port))
-            sock.sendall('stat')
+            sock.sendall(b'stat')
 
             # Read the response into a StringIO buffer
-            chunk = sock.recv(chunk_size)
+            chunk = encodeutils.safe_decode(sock.recv(chunk_size), 'utf-8')
             buf.write(chunk)
             num_reads = 1
             max_reads = 10000
@@ -76,7 +77,7 @@ class Zookeeper(AgentCheck):
                     raise Exception(
                         "Read %s bytes before exceeding max reads of %s. "
                         % (buf.tell(), max_reads))
-                chunk = sock.recv(chunk_size)
+                chunk = encodeutils.safe_decode(sock.recv(chunk_size), 'utf-8')
                 buf.write(chunk)
                 num_reads += 1
         except socket.timeout:
@@ -115,7 +116,7 @@ class Zookeeper(AgentCheck):
             raise Exception("Could not parse version from stat command output: %s" % start_line)
         else:
             version_tuple = match.groups()
-            has_connections_val = map(int, version_tuple) >= [3, 4, 4]
+            has_connections_val = list(map(int, version_tuple)) >= [3, 4, 4]
 
             # Clients:
             buf.readline()  # skip the Clients: header
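Note: every hunk above is the same class of Python 3 porting fix. Socket reads return bytes rather than str, so prefix checks need bytes literals (host_alive.py) or an up-front decode (zk.py); Kafka topic names must be encoded to bytes before being sent on the wire (kafka_consumer.py); and dict.keys() / map() return views and lazy iterators rather than lists (kafka_consumer.py, zk.py). A minimal standalone sketch of these behaviors, for illustration only and not monasca-agent code:

# Illustrative sketch of the Python 3 behaviors patched above; values are made up.

# 1. sock.recv() returns bytes on Python 3; a str prefix would raise TypeError.
banner = b'SSH-2.0-OpenSSH_8.0'                  # stand-in for sock.recv(1024)
assert banner.startswith(b'SSH')                 # compare bytes to bytes
assert banner.decode('utf-8').startswith('SSH')  # or decode once, as zk.py now does

# 2. dict.keys() is a view on Python 3, not a subscriptable list.
topics = {'metrics': [0, 1]}
assert list(topics.keys())[0] == 'metrics'

# 3. map() is a lazy iterator on Python 3; comparing it to a list raises TypeError,
#    so the version check must materialize it first.
version_tuple = ('3', '4', '5')
assert list(map(int, version_tuple)) >= [3, 4, 4]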