Browse Source

Update to switch to confluent-kafka library for driver

This patch:
* Updates server version to 2.0.0 release
* Add service backend for hybrid combinations
* Add dependencies

Change-Id: Ice374dca539b8ed1b1965b75379bad5140121483
Andy Smith 7 months ago
parent
commit
28be3eed69
2 changed files with 77 additions and 20 deletions
  1. 71
    18
      devstack/plugin.sh
  2. 6
    2
      devstack/settings

+ 71
- 18
devstack/plugin.sh View File

@@ -24,10 +24,12 @@
24 24
 # Notification transport wil be kafka://
25 25
 #
26 26
 # Environment Configuration
27
+# RPC_SERVICE - the messaging backend service for RPC
27 28
 # RPC_HOST - the host used to connect to the RPC messaging service.
28 29
 # RPC_PORT - the port used to connect to the RPC messaging service.
29 30
 #     Defaults to 5672.
30 31
 # RPC_{USERNAME,PASSWORD} - for authentication with RPC messaging service.
32
+# NOTIFY_SERVICE - the messaging backend service for Notification
31 33
 # NOTIFY_HOST - the host used to connect to the Notification messaging service.
32 34
 # NOTIFY_PORT - the port used to connect to the Notification messaging service.
33 35
 #     Defaults to 9092.
@@ -41,23 +43,38 @@ set +o xtrace
41 43
 # builds rpc transport url string
42 44
 function _get_rpc_transport_url {
43 45
     local virtual_host=$1
44
-    if [ -z "$RPC_USERNAME" ]; then
45
-        echo "amqp://$RPC_HOST:${RPC_PORT}/$virtual_host"
46
+
47
+    if [ "$RPC_SERVICE" == "rabbit" ]; then
48
+        echo $(_get_rabbit_transport_url $virtual_host)
49
+    elif [ -z "$RPC_USERNAME" ]; then
50
+        echo "$RPC_SERVICE://$RPC_HOST:${RPC_PORT}/$virtual_host"
46 51
     else
47
-        echo "amqp://$RPC_USERNAME:$RPC_PASSWORD@$RPC_HOST:${RPC_PORT}/$virtual_host"
52
+        echo "$RPC_SERVICE://$RPC_USERNAME:$RPC_PASSWORD@$RPC_HOST:${RPC_PORT}/$virtual_host"
48 53
     fi
49 54
 }
50 55
 
51 56
 # builds notify transport url string
52 57
 function _get_notify_transport_url {
53 58
     local virtual_host=$1
59
+
54 60
     if [ -z "$NOTIFY_USERNAME" ]; then
55
-        echo "kafka://$NOTIFY_HOST:${NOTIFY_PORT}/$virtual_host"
61
+        echo "$NOTIFY_SERVICE://$NOTIFY_HOST:${NOTIFY_PORT}/$virtual_host"
56 62
     else
57
-        echo "kafka://$NOTIFY_USERNAME:$NOTIFY_PASSWORD@$NOTIFY_HOST:${NOTIFY_PORT}/$virtual_host"
63
+        echo "$NOTIFY_SERVICE://$NOTIFY_USERNAME:$NOTIFY_PASSWORD@$NOTIFY_HOST:${NOTIFY_PORT}/$virtual_host"
58 64
     fi
59 65
 }
60 66
 
67
+# override the default in devstack
68
+function _rpc_add_vhost {
69
+
70
+    if [ "$RPC_SERVICE" == "rabbit" ]; then
71
+        _rabbit_rpc_backend_add_vhost $@
72
+    fi
73
+
74
+    # no configuration necessary for amqp backend
75
+    return 0
76
+}
77
+
61 78
 # Functions
62 79
 # ------------
63 80
 # _download_kafka() - downloading kafka
@@ -72,7 +89,9 @@ function _download_kafka {
72 89
 # driver
73 90
 function _install_kafka_python {
74 91
     # Install kafka client API
92
+    # TODO(ansmith) remove kafka-python library following switch
75 93
     pip_install_gr kafka-python
94
+    pip_install_gr confluent-kafka
76 95
 }
77 96
 
78 97
 # _install_kafka_backend() - installing Kafka with Scala and Zookeeper
@@ -81,9 +100,11 @@ function _install_kafka_backend {
81 100
     local scala_version=${SCALA_VERSION}.0
82 101
 
83 102
     if is_ubuntu; then
84
-        sudo apt-get install -y scala
103
+        sudo apt-get install -y scala librdkafka1
85 104
     elif is_fedora; then
86 105
 
106
+        install_package librdkafka
107
+
87 108
         is_package_installed java-1.8.0-openjdk-headless || install_package java-1.8.0-openjdk-headless
88 109
 
89 110
         if [ ! -f ${FILES}/scala-${scala_version}.tar.gz ]; then
@@ -142,6 +163,14 @@ function _cleanup_kafka {
142 163
     echo_summary "Clean up kafka service"
143 164
     rm ${FILES}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz
144 165
     rm -rf ${KAFKA_DEST}/kafka
166
+
167
+    if is_ubuntu; then
168
+        uninstall_package librdkafka1
169
+    elif is_fedora; then
170
+        uninstall_package librdkafka
171
+    else
172
+        exit_distro_not_supported "kafka installation"
173
+    fi
145 174
 }
146 175
 
147 176
 # Set up the various configuration files used by the qpid-dispatch-router (qdr)
@@ -361,9 +390,22 @@ function _cleanup_qdr_backend {
361 390
 
362 391
 if is_service_enabled kafka; then
363 392
 
364
-    # Note: this is the only tricky part about out of tree
365
-    # oslo.messaging plugins, you must overwrite the functions
366
-    # so that the correct settings files are made.
393
+    # this plugin can be configured to use rabbit for rpc,
394
+    # so save a copy of the original
395
+    if [ ! $(type -t _get_rabbit_transport_url) ]; then
396
+        get_transport_url_definition=$(declare -f get_transport_url)
397
+        eval "_get_rabbit_transport_url() ${get_transport_url_definition#*\()}"
398
+        export -f _get_rabbit_transport_url
399
+    fi
400
+
401
+    # we will need original if using rabbit for rpc
402
+    if [ ! $(type -t _rabbit_rpc_backend_add_vhost) ]; then
403
+        rpc_backend_add_vhost_definition=$(declare -f rpc_backend_add_vhost)
404
+        eval "_rabbit_rpc_backend_add_vhost() ${rpc_backend_add_vhost_definition#*\()}"
405
+        export -f _rabbit_rpc_backend_add_vhost
406
+    fi
407
+
408
+    # export the overridden functions
367 409
     function iniset_rpc_backend {
368 410
         local package=$1
369 411
         local file=$2
@@ -373,21 +415,24 @@ if is_service_enabled kafka; then
373 415
         iniset $file $section transport_url $(_get_rpc_transport_url "$virtual_host")
374 416
         iniset $file oslo_messaging_notifications transport_url $(_get_notify_transport_url "$virtual_host")
375 417
     }
418
+    export -f iniset_rpc_backend
419
+
376 420
     function get_transport_url {
377 421
         # TODO (ansmith) introduce separate get_*_transport calls in devstak
378 422
         _get_rpc_transport_url $@
379 423
     }
424
+    export -f get_transport_url
425
+
380 426
     function get_notification_url {
381 427
         _get_notify_transport_url $@
382 428
     }
429
+    export -f get_notification_url
430
+
383 431
     function rpc_backend_add_vhost {
384
-        return 0
432
+        _rpc_add_vhost $@
385 433
     }
386
-
387
-    export -f iniset_rpc_backend
388
-    export -f get_transport_url
389
-    export -f get_notification_url
390 434
     export -f rpc_backend_add_vhost
435
+
391 436
 fi
392 437
 
393 438
 # check for kafka service
@@ -403,24 +448,32 @@ if is_service_enabled kafka; then
403 448
     elif [[ "$1" == "stack" && "$2" == "install" ]]; then
404 449
         # Install and configure the messaging services
405 450
         _install_kafka_backend
406
-        _install_qdr_backend
451
+        if [ "$RPC_SERVICE" == "amqp" ]; then
452
+            _install_qdr_backend
453
+        fi
407 454
 
408 455
     elif [[ "$1" == "stack" && "$2" == "post-config" ]]; then
409 456
         # Start the messaging service processes, this happens before
410 457
         # any services start
411 458
         _start_kafka_backend
412
-        _start_qdr_backend
459
+        if [ "$RPC_SERVICE" == "amqp" ]; then
460
+            _start_qdr_backend
461
+        fi
413 462
 
414 463
     elif [[ "$1" == "unstack" ]]; then
415 464
         # Shut down messaging services
416 465
         _stop_kafka
417
-        _stop_qdr
466
+        if [ "$RPC_SERVICE" == "amqp" ]; then
467
+            _stop_qdr
468
+        fi
418 469
 
419 470
     elif [[ "$1" == "clean" ]]; then
420 471
         # Remove state and transient data
421 472
         # Remember clean.sh first calls unstack.sh
422 473
         _cleanup_kafka
423
-        _cleanup_qdr
474
+        if [ "$RPC_SERVICE" == "amqp" ]; then
475
+            _cleanup_qdr
476
+        fi
424 477
     fi
425 478
 fi
426 479
 

+ 6
- 2
devstack/settings View File

@@ -6,7 +6,7 @@ enable_service kafka
6 6
 KAFKA_DEST=${KAFKA_DEST:-/opt/stack/devstack-plugin-kafka}
7 7
 
8 8
 # Specify Kafka version
9
-KAFKA_VERSION=${KAFKA_VERSION:-1.1.0}
9
+KAFKA_VERSION=${KAFKA_VERSION:-2.0.0}
10 10
 KAFKA_BASEURL=${KAFKA_BASEURL:-http://www.apache.org/dist/kafka}
11 11
 
12 12
 # Specify Scala version
@@ -14,11 +14,15 @@ SCALA_VERSION=${SCALA_VERSION:-2.12}
14 14
 SCALA_BASEURL=${SCALA_BASEURL:-http://www.scala-lang.org/files/archive}
15 15
 
16 16
 # RPC
17
+RPC_SERVICE=${RPC_SERVICE:-rabbit}
17 18
 RPC_HOST=${RPC_HOST:-$SERVICE_HOST}
18 19
 RPC_PORT=${RPC_PORT:-5672}
19 20
 
20 21
 # Notify
22
+NOTIFY_SERVICE=${NOTIFY_SERVICE:-kafka}
21 23
 NOTIFY_HOST=${NOTIFY_HOST:-$SERVICE_HOST}
22 24
 NOTIFY_PORT=${NOTIFY_PORT:-9092}
23 25
 
24
-disable_service rabbit
26
+if [ "$RPC_SERVICE" != "rabbit" ]; then
27
+    disable_service rabbit
28
+fi

Loading…
Cancel
Save