[BUG] Support globally available kafka binaries
Following change adds capabilities of using globally accessible kafka binaries (i.e. /usr/bin or /usr/local/bin) instead of expecting them to be found in /opt/kafka/bin Story: 2000992 Task: 4169 Change-Id: I86cceb36ceb8dab0aae56ce29fef1283383ff49b
This commit is contained in:
parent
0d6b92d7e8
commit
037aaabed6
|
@ -1,8 +1,9 @@
|
|||
# (C) Copyright 2015,2016 Hewlett Packard Enterprise Development LP
|
||||
# Copyright 2016 FUJITSU LIMITED
|
||||
# Copyright 2016-2017 FUJITSU LIMITED
|
||||
|
||||
import logging
|
||||
import os
|
||||
from os import path
|
||||
import subprocess as sp
|
||||
import re
|
||||
|
||||
from subprocess import CalledProcessError
|
||||
|
@ -20,9 +21,13 @@ from monasca_setup.detection.utils import find_addr_listening_on_port_over_tcp
|
|||
log = logging.getLogger(__name__)
|
||||
|
||||
_VIA_KAFKA_TOPIC_INDEX = 1
|
||||
_KAFKA_BIN_DIR = '/opt/kafka/bin'
|
||||
_KAFKA_CONSUMER_GROUP_COMMAND = '%s/kafka-consumer-groups.sh' % _KAFKA_BIN_DIR
|
||||
_KAFKA_ZOOKEEPER_SHELL_COMMAND = '%s/zookeeper-shell.sh' % _KAFKA_BIN_DIR
|
||||
_KAFKA_BIN_LOCATIONS = (
|
||||
'', # refers to kafka binaries being accessible
|
||||
# global (i.e. /usr/(local/)?bin)
|
||||
'/opt/kafka/bin'
|
||||
)
|
||||
_KAFKA_CONSUMER_GROUP_BIN = 'kafka-consumer-groups.sh'
|
||||
_KAFKA_ZOOKEEPER_SHELL_BIN = 'zookeeper-shell.sh'
|
||||
|
||||
_CONSUMER_GROUP_COMMAND_LINE_VALUES_LEN = 7
|
||||
|
||||
|
@ -52,13 +57,11 @@ class Kafka(Plugin):
|
|||
process_exists = find_process_cmdline('kafka') is not None
|
||||
has_dependencies = self.dependencies_installed()
|
||||
|
||||
self._consumer_group_shell_exists = os.path.isfile(
|
||||
_KAFKA_CONSUMER_GROUP_COMMAND)
|
||||
self._zookeeper_shell_exists = os.path.isfile(
|
||||
_KAFKA_ZOOKEEPER_SHELL_COMMAND)
|
||||
(self._kafka_consumer_bin,
|
||||
self._zookeeper_consumer_bin) = self._find_topic_listing_binaries()
|
||||
|
||||
kafka_has_scripts = (self._consumer_group_shell_exists or
|
||||
self._zookeeper_shell_exists)
|
||||
kafka_has_scripts = (self._kafka_consumer_bin or
|
||||
self._zookeeper_consumer_bin)
|
||||
|
||||
self.available = (process_exists and has_dependencies and
|
||||
kafka_has_scripts)
|
||||
|
@ -74,8 +77,8 @@ class Kafka(Plugin):
|
|||
log.error(('Kafka process exists, dependencies are installed '
|
||||
'but neither %s nor %s '
|
||||
'executable was found.'),
|
||||
_KAFKA_CONSUMER_GROUP_COMMAND,
|
||||
_KAFKA_ZOOKEEPER_SHELL_COMMAND)
|
||||
_KAFKA_CONSUMER_GROUP_BIN,
|
||||
_KAFKA_ZOOKEEPER_SHELL_BIN)
|
||||
|
||||
def _detect_consumers(self):
|
||||
"""Using zookeeper and a kafka connection find the consumers and associated topics. """
|
||||
|
@ -86,9 +89,9 @@ class Kafka(Plugin):
|
|||
|
||||
consumers = None
|
||||
|
||||
if self._consumer_group_shell_exists:
|
||||
if self._kafka_consumer_bin:
|
||||
consumers = self._detect_consumer_via_kafka()
|
||||
if not consumers and self._zookeeper_shell_exists:
|
||||
if not consumers and self._zookeeper_consumer_bin:
|
||||
consumers = self._detect_consumer_via_zookeeper()
|
||||
|
||||
instances = {
|
||||
|
@ -109,10 +112,10 @@ class Kafka(Plugin):
|
|||
def _detect_consumer_via_kafka(self):
|
||||
"""Detect consumers groups using kafka-consumer-groups"""
|
||||
log.info("\tDetecting kafka consumers with {:s} command".format(
|
||||
_KAFKA_CONSUMER_GROUP_COMMAND))
|
||||
self._kafka_consumer_bin))
|
||||
try:
|
||||
output = check_output([
|
||||
_KAFKA_CONSUMER_GROUP_COMMAND,
|
||||
self._kafka_consumer_bin,
|
||||
'--zookeeper',
|
||||
self.zk_url,
|
||||
'--list'
|
||||
|
@ -123,7 +126,7 @@ class Kafka(Plugin):
|
|||
|
||||
for consumer_group in consumer_groups:
|
||||
output = check_output([
|
||||
_KAFKA_CONSUMER_GROUP_COMMAND,
|
||||
self._kafka_consumer_bin,
|
||||
'--zookeeper',
|
||||
self.zk_url,
|
||||
'--describe',
|
||||
|
@ -158,7 +161,7 @@ class Kafka(Plugin):
|
|||
:rtype: dict
|
||||
"""
|
||||
log.info("\tDetecting kafka consumers with {:s} command".format(
|
||||
_KAFKA_ZOOKEEPER_SHELL_COMMAND))
|
||||
self._zookeeper_consumer_bin))
|
||||
try:
|
||||
consumers = {}
|
||||
for consumer in self._ls_zookeeper('/consumers'):
|
||||
|
@ -206,11 +209,12 @@ class Kafka(Plugin):
|
|||
I am using the local command line kafka rather than kazoo because it doesn't make sense to
|
||||
have kazoo as a dependency only for detection.
|
||||
"""
|
||||
zk_shell = [_KAFKA_ZOOKEEPER_SHELL_COMMAND, self.zk_url, 'ls', path]
|
||||
zk_shell = [self._zookeeper_consumer_bin, self.zk_url, 'ls', path]
|
||||
try:
|
||||
output = check_output(zk_shell, stderr=STDOUT)
|
||||
except CalledProcessError:
|
||||
log.error('Error running the zookeeper shell to list path %s' % path)
|
||||
log.error('Error running the zookeeper shell to list path %s',
|
||||
path)
|
||||
raise
|
||||
|
||||
# The last line is like '[item1, item2, item3]', '[]' or an error message not starting with [
|
||||
|
@ -220,6 +224,65 @@ class Kafka(Plugin):
|
|||
else:
|
||||
return [entry.strip() for entry in last_line.strip('[]').split(',')]
|
||||
|
||||
def _find_topic_listing_binaries(self):
|
||||
|
||||
kafka_bin = zookeper_bin = None
|
||||
|
||||
for location in _KAFKA_BIN_LOCATIONS:
|
||||
if not kafka_bin:
|
||||
kafka_bin = self._verify_callable_exists(
|
||||
path.join(location, _KAFKA_CONSUMER_GROUP_BIN)
|
||||
)
|
||||
if not zookeper_bin:
|
||||
zookeper_bin = self._verify_callable_exists(
|
||||
path.join(location, _KAFKA_ZOOKEEPER_SHELL_BIN)
|
||||
)
|
||||
|
||||
# traversed all locations, this is what we've got
|
||||
if kafka_bin:
|
||||
log.debug('\tFound %s at %s', _KAFKA_CONSUMER_GROUP_BIN,
|
||||
kafka_bin)
|
||||
if zookeper_bin:
|
||||
log.debug('\tFound %s at %s', _KAFKA_ZOOKEEPER_SHELL_BIN,
|
||||
zookeper_bin)
|
||||
|
||||
return kafka_bin, zookeper_bin
|
||||
|
||||
@staticmethod
|
||||
def _verify_callable_exists(path):
|
||||
"""Verify if callable exists.
|
||||
|
||||
Method tries to execute binary located
|
||||
under path to see if that exists.
|
||||
|
||||
If binary cannot be called, an :py:exc:`OSError` is thrown.
|
||||
Effectively that means that binary is not accessible.
|
||||
|
||||
Note:
|
||||
If binary is not callable, method returns None
|
||||
|
||||
:param path: path to binary
|
||||
:type path: str
|
||||
:return: path or None
|
||||
:rtype: (str, None)
|
||||
|
||||
"""
|
||||
try:
|
||||
sp.check_output(args=[path], stderr=sp.STDOUT)
|
||||
except sp.CalledProcessError:
|
||||
log.debug('\tExecutable %s exists', path)
|
||||
except OSError:
|
||||
log.debug('\tNo executable/file at %s', path)
|
||||
return None
|
||||
except Exception as ex:
|
||||
log.warning('Skipping exception %s', str(ex))
|
||||
# note(trebskit) we are not interested in other problems
|
||||
# from POV of this method only relevant information
|
||||
# is that OSError is not thrown which would be the case
|
||||
# if binary is not accessible
|
||||
pass
|
||||
return path # return path if if it can be launched
|
||||
|
||||
def build_config(self):
|
||||
"""Build the config as a Plugins object and return.
|
||||
Config includes: consumer_groups (include topics) and kafka_connection_str
|
||||
|
|
Loading…
Reference in New Issue