From 28be3eed69db8a2ecdd156fb00012e1626829add Mon Sep 17 00:00:00 2001 From: Andy Smith Date: Mon, 17 Sep 2018 09:56:38 -0400 Subject: [PATCH] Update to switch to confluent-kafka library for driver This patch: * Updates server version to 2.0.0 release * Add service backend for hybrid combinations * Add dependencies Change-Id: Ice374dca539b8ed1b1965b75379bad5140121483 --- devstack/plugin.sh | 91 ++++++++++++++++++++++++++++++++++++---------- devstack/settings | 8 +++- 2 files changed, 78 insertions(+), 21 deletions(-) diff --git a/devstack/plugin.sh b/devstack/plugin.sh index 5c4c4f2..da0dab0 100644 --- a/devstack/plugin.sh +++ b/devstack/plugin.sh @@ -24,10 +24,12 @@ # Notification transport wil be kafka:// # # Environment Configuration +# RPC_SERVICE - the messaging backend service for RPC # RPC_HOST - the host used to connect to the RPC messaging service. # RPC_PORT - the port used to connect to the RPC messaging service. # Defaults to 5672. # RPC_{USERNAME,PASSWORD} - for authentication with RPC messaging service. +# NOTIFY_SERVICE - the messaging backend service for Notification # NOTIFY_HOST - the host used to connect to the Notification messaging service. # NOTIFY_PORT - the port used to connect to the Notification messaging service. # Defaults to 9092. @@ -41,23 +43,38 @@ set +o xtrace # builds rpc transport url string function _get_rpc_transport_url { local virtual_host=$1 - if [ -z "$RPC_USERNAME" ]; then - echo "amqp://$RPC_HOST:${RPC_PORT}/$virtual_host" + + if [ "$RPC_SERVICE" == "rabbit" ]; then + echo $(_get_rabbit_transport_url $virtual_host) + elif [ -z "$RPC_USERNAME" ]; then + echo "$RPC_SERVICE://$RPC_HOST:${RPC_PORT}/$virtual_host" else - echo "amqp://$RPC_USERNAME:$RPC_PASSWORD@$RPC_HOST:${RPC_PORT}/$virtual_host" + echo "$RPC_SERVICE://$RPC_USERNAME:$RPC_PASSWORD@$RPC_HOST:${RPC_PORT}/$virtual_host" fi } # builds notify transport url string function _get_notify_transport_url { local virtual_host=$1 + if [ -z "$NOTIFY_USERNAME" ]; then - echo "kafka://$NOTIFY_HOST:${NOTIFY_PORT}/$virtual_host" + echo "$NOTIFY_SERVICE://$NOTIFY_HOST:${NOTIFY_PORT}/$virtual_host" else - echo "kafka://$NOTIFY_USERNAME:$NOTIFY_PASSWORD@$NOTIFY_HOST:${NOTIFY_PORT}/$virtual_host" + echo "$NOTIFY_SERVICE://$NOTIFY_USERNAME:$NOTIFY_PASSWORD@$NOTIFY_HOST:${NOTIFY_PORT}/$virtual_host" fi } +# override the default in devstack +function _rpc_add_vhost { + + if [ "$RPC_SERVICE" == "rabbit" ]; then + _rabbit_rpc_backend_add_vhost $@ + fi + + # no configuration necessary for amqp backend + return 0 +} + # Functions # ------------ # _download_kafka() - downloading kafka @@ -72,7 +89,9 @@ function _download_kafka { # driver function _install_kafka_python { # Install kafka client API + # TODO(ansmith) remove kafka-python library following switch pip_install_gr kafka-python + pip_install_gr confluent-kafka } # _install_kafka_backend() - installing Kafka with Scala and Zookeeper @@ -81,9 +100,11 @@ function _install_kafka_backend { local scala_version=${SCALA_VERSION}.0 if is_ubuntu; then - sudo apt-get install -y scala + sudo apt-get install -y scala librdkafka1 elif is_fedora; then + install_package librdkafka + is_package_installed java-1.8.0-openjdk-headless || install_package java-1.8.0-openjdk-headless if [ ! -f ${FILES}/scala-${scala_version}.tar.gz ]; then @@ -142,6 +163,14 @@ function _cleanup_kafka { echo_summary "Clean up kafka service" rm ${FILES}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz rm -rf ${KAFKA_DEST}/kafka + + if is_ubuntu; then + uninstall_package librdkafka1 + elif is_fedora; then + uninstall_package librdkafka + else + exit_distro_not_supported "kafka installation" + fi } # Set up the various configuration files used by the qpid-dispatch-router (qdr) @@ -361,9 +390,22 @@ function _cleanup_qdr_backend { if is_service_enabled kafka; then - # Note: this is the only tricky part about out of tree - # oslo.messaging plugins, you must overwrite the functions - # so that the correct settings files are made. + # this plugin can be configured to use rabbit for rpc, + # so save a copy of the original + if [ ! $(type -t _get_rabbit_transport_url) ]; then + get_transport_url_definition=$(declare -f get_transport_url) + eval "_get_rabbit_transport_url() ${get_transport_url_definition#*\()}" + export -f _get_rabbit_transport_url + fi + + # we will need original if using rabbit for rpc + if [ ! $(type -t _rabbit_rpc_backend_add_vhost) ]; then + rpc_backend_add_vhost_definition=$(declare -f rpc_backend_add_vhost) + eval "_rabbit_rpc_backend_add_vhost() ${rpc_backend_add_vhost_definition#*\()}" + export -f _rabbit_rpc_backend_add_vhost + fi + + # export the overridden functions function iniset_rpc_backend { local package=$1 local file=$2 @@ -373,21 +415,24 @@ if is_service_enabled kafka; then iniset $file $section transport_url $(_get_rpc_transport_url "$virtual_host") iniset $file oslo_messaging_notifications transport_url $(_get_notify_transport_url "$virtual_host") } + export -f iniset_rpc_backend + function get_transport_url { # TODO (ansmith) introduce separate get_*_transport calls in devstak _get_rpc_transport_url $@ } + export -f get_transport_url + function get_notification_url { _get_notify_transport_url $@ } - function rpc_backend_add_vhost { - return 0 - } - - export -f iniset_rpc_backend - export -f get_transport_url export -f get_notification_url + + function rpc_backend_add_vhost { + _rpc_add_vhost $@ + } export -f rpc_backend_add_vhost + fi # check for kafka service @@ -403,24 +448,32 @@ if is_service_enabled kafka; then elif [[ "$1" == "stack" && "$2" == "install" ]]; then # Install and configure the messaging services _install_kafka_backend - _install_qdr_backend + if [ "$RPC_SERVICE" == "amqp" ]; then + _install_qdr_backend + fi elif [[ "$1" == "stack" && "$2" == "post-config" ]]; then # Start the messaging service processes, this happens before # any services start _start_kafka_backend - _start_qdr_backend + if [ "$RPC_SERVICE" == "amqp" ]; then + _start_qdr_backend + fi elif [[ "$1" == "unstack" ]]; then # Shut down messaging services _stop_kafka - _stop_qdr + if [ "$RPC_SERVICE" == "amqp" ]; then + _stop_qdr + fi elif [[ "$1" == "clean" ]]; then # Remove state and transient data # Remember clean.sh first calls unstack.sh _cleanup_kafka - _cleanup_qdr + if [ "$RPC_SERVICE" == "amqp" ]; then + _cleanup_qdr + fi fi fi diff --git a/devstack/settings b/devstack/settings index 80027bc..a71f446 100644 --- a/devstack/settings +++ b/devstack/settings @@ -6,7 +6,7 @@ enable_service kafka KAFKA_DEST=${KAFKA_DEST:-/opt/stack/devstack-plugin-kafka} # Specify Kafka version -KAFKA_VERSION=${KAFKA_VERSION:-1.1.0} +KAFKA_VERSION=${KAFKA_VERSION:-2.0.0} KAFKA_BASEURL=${KAFKA_BASEURL:-http://www.apache.org/dist/kafka} # Specify Scala version @@ -14,11 +14,15 @@ SCALA_VERSION=${SCALA_VERSION:-2.12} SCALA_BASEURL=${SCALA_BASEURL:-http://www.scala-lang.org/files/archive} # RPC +RPC_SERVICE=${RPC_SERVICE:-rabbit} RPC_HOST=${RPC_HOST:-$SERVICE_HOST} RPC_PORT=${RPC_PORT:-5672} # Notify +NOTIFY_SERVICE=${NOTIFY_SERVICE:-kafka} NOTIFY_HOST=${NOTIFY_HOST:-$SERVICE_HOST} NOTIFY_PORT=${NOTIFY_PORT:-9092} -disable_service rabbit +if [ "$RPC_SERVICE" != "rabbit" ]; then + disable_service rabbit +fi