From e9a7a3858a13a312c12a539b9e1665186d2dd1da Mon Sep 17 00:00:00 2001
From: Vitaly Gridnev
Date: Thu, 5 Nov 2015 16:59:11 +0300
Subject: [PATCH] Implement custom check for Kafka Service

This change implements a custom check for the Kafka service:
it creates a test topic and sends several messages to it.

Change-Id: If6013ecc6a173b99ced68722775fbe30702943c5
---
 etc/edp-examples/edp-spark/README.rst              |  50 ++++--
 .../edp-spark/spark-kafka-example.py               |  48 ++++++
 etc/scenario/sahara-ci/ambari-2.3.yaml.mako        |  19 ++-
 sahara/tests/scenario/base.py                      |  17 +-
 .../scenario/custom_checks/check_kafka.py          | 148 ++++++++++++++++++
 sahara/tests/scenario/validation.py                |   8 +
 6 files changed, 273 insertions(+), 17 deletions(-)
 create mode 100644 etc/edp-examples/edp-spark/spark-kafka-example.py
 create mode 100644 sahara/tests/scenario/custom_checks/check_kafka.py

diff --git a/etc/edp-examples/edp-spark/README.rst b/etc/edp-examples/edp-spark/README.rst
index 2666b480..0f222828 100644
--- a/etc/edp-examples/edp-spark/README.rst
+++ b/etc/edp-examples/edp-spark/README.rst
@@ -8,11 +8,11 @@ SparkPi example estimates Pi. It can take a single optional integer argument
 specifying the number of slices (tasks) to use.
 
 Example spark-wordcount Job
-==========================
+===========================
 
-spark-wordcount is a modified version of the WordCount example from Apache Spark.
-It can read input data from hdfs or swift container, then output the number of occurrences
-of each word to standard output or hdfs.
+spark-wordcount is a modified version of the WordCount example from Apache
+Spark. It can read input data from hdfs or swift container, then output the
+number of occurrences of each word to standard output or hdfs.
 
 Launching wordcount job from Sahara UI
 --------------------------------------
@@ -26,9 +26,41 @@ Launching wordcount job from Sahara UI
 
   1. Put path to input file in ``args``
   2. Put path to output file in ``args``
-  3. Fill the ``Main class`` input with the following class: ``sahara.edp.spark.SparkWordCount``
-  4. Put the following values in the job's configs: ``edp.spark.adapt_for_swift`` with value ``True``,
-     ``fs.swift.service.sahara.password`` with password for your username, and
-     ``fs.swift.service.sahara.username`` with your username. These values are required for
-     correct access to your input file, located in Swift.
+  3. Fill the ``Main class`` input with the following class:
+     ``sahara.edp.spark.SparkWordCount``
+  4. Put the following values in the job's configs:
+     ``edp.spark.adapt_for_swift`` with value ``True``,
+     ``fs.swift.service.sahara.password`` with password for your username,
+     and ``fs.swift.service.sahara.username`` with your username. These
+     values are required for correct access to your input file, located in
+     Swift.
   5. Execute the job. You will be able to view your output in hdfs.
+
+Launching spark-kafka-example
+-----------------------------
+
+0. Create a cluster with ``Kafka Broker``, ``ZooKeeper`` and
+   ``Spark History Server``. The Ambari plugin can be used for that purpose.
+   Use your keypair during cluster creation so that you are able to ssh
+   into the instances running these processes. For simplicity, these
+   services should be located on the same node.
+1. SSH to the node with the ``Kafka Broker`` service. Create a sample topic
+   using the following command:
+   ``path/kafka-topics.sh --create --zookeeper localhost:2181 \
+   --replication-factor 1 --partitions 1 --topic test-topic``.
+   Also execute ``path/kafka-console-producer.sh --broker-list \
+   localhost:6667 --topic test-topic`` and then put several messages in
+   the topic. Note that you need to replace ``localhost`` and ``path``
+   with values from your own environment.
+2. Download the Spark Streaming utils to the node with your
+   ``Spark History Server`` from this URL:
+   ``http://central.maven.org/maven2/org/apache/spark/spark-streaming-kafka-assembly_2.10/1.4.1/spark-streaming-kafka-assembly_2.10-1.4.1.jar``.
+   Now you are ready to launch your job from the Sahara UI.
+3. Create a job binary that points to ``spark-kafka-example.py``, and a job
+   that uses this job binary as its main binary.
+4. Execute the job with the job config ``edp.spark.driver.classpath`` set
+   to the path of the utils downloaded during step 2, and with the
+   following arguments: ``localhost:2181`` as the first argument,
+   ``test-topic`` as the second, and ``30`` as the third.
+5. Congratulations, your job was successfully launched!
diff --git a/etc/edp-examples/edp-spark/spark-kafka-example.py b/etc/edp-examples/edp-spark/spark-kafka-example.py
new file mode 100644
index 00000000..5d336604
--- /dev/null
+++ b/etc/edp-examples/edp-spark/spark-kafka-example.py
@@ -0,0 +1,48 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
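+
+# A small Spark Streaming example that consumes messages from a Kafka topic
+# and prints word counts for each one-second batch. The script expects a
+# ZooKeeper connection string and a topic name as arguments, plus an
+# optional timeout in seconds after which the streaming context is stopped.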
+
+from __future__ import print_function
+
+import sys
+
+from pyspark import SparkContext
+from pyspark.streaming import StreamingContext
+from pyspark.streaming.kafka import KafkaUtils
+
+
+def main():
+    if len(sys.argv) not in (3, 4):
+        print("Usage: spark-kafka-example.py <zk_quorum> <topic> [timeout]",
+              file=sys.stderr)
+        exit(-1)
+
+    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
+    ssc = StreamingContext(sc, 1)
+    timeout = None
+    if len(sys.argv) == 4:
+        zk, topic, timeout = sys.argv[1:]
+        timeout = int(timeout)
+    else:
+        zk, topic = sys.argv[1:]
+    kvs = KafkaUtils.createStream(
+        ssc, zk, "spark-streaming-consumer", {topic: 1})
+    lines = kvs.map(lambda x: x[1])
+    counts = (lines.flatMap(lambda line: line.split(" "))
+              .map(lambda word: (word, 1))
+              .reduceByKey(lambda a, b: a + b))
+    counts.pprint()
+    # Stop the streaming context after the optional timeout (in seconds)
+    kwargs = {}
+    if timeout:
+        kwargs['timeout'] = timeout
+    ssc.start()
+    ssc.awaitTermination(**kwargs)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/etc/scenario/sahara-ci/ambari-2.3.yaml.mako b/etc/scenario/sahara-ci/ambari-2.3.yaml.mako
index 13909bbc..c8dfbf0f 100644
--- a/etc/scenario/sahara-ci/ambari-2.3.yaml.mako
+++ b/etc/scenario/sahara-ci/ambari-2.3.yaml.mako
@@ -14,6 +14,7 @@ clusters:
           - SecondaryNameNode
           - YARN Timeline Server
           - ZooKeeper
+          - Kafka Broker
         auto_security_group: true
       - name: master-edp
        flavor: ${ci_flavor_id}
@@ -39,10 +40,26 @@ clusters:
     cluster_configs:
       HDFS:
         dfs.datanode.du.reserved: 0
+    custom_checks:
+      check_kafka:
+        zookeeper_process: ZooKeeper
+        kafka_process: Kafka Broker
+        spark_flow:
+          - type: Spark
+            main_lib:
+              type: database
+              source: etc/edp-examples/edp-spark/spark-kafka-example.py
+            args:
+              - '{zookeeper_list}'
+              - '{topic}'
+              - '{timeout}'
+            timeout: 30
     cluster:
       name: ${cluster_name}
     scenario:
       - run_jobs
+      - kafka
     edp_jobs_flow:
       - java_job
-      - spark_pi
\ No newline at end of file
+      - spark_pi
diff --git a/sahara/tests/scenario/base.py b/sahara/tests/scenario/base.py
index c84fd672..a064c743 100644
--- a/sahara/tests/scenario/base.py
+++ b/sahara/tests/scenario/base.py
@@ -177,6 +177,15 @@ class BaseTestCase(base.BaseTestCase):
             configs['args'] = args
         return configs
 
+    def _prepare_job_running(self, job):
+        input_id, output_id = self._create_datasources(job)
+        main_libs, additional_libs = self._create_job_binaries(job)
+        job_id = self._create_job(job['type'], main_libs, additional_libs)
+        configs = self._parse_job_configs(job)
+        configs = self._put_io_data_to_configs(
+            configs, input_id, output_id)
+        return [job_id, input_id, output_id, configs]
+
     @track_result("Check EDP jobs", False)
     def check_run_jobs(self):
         batching = self.testcase.get('edp_batching',
@@ -186,13 +195,7 @@
         pre_exec = []
         for job in jobs:
-            input_id, output_id = self._create_datasources(job)
-            main_libs, additional_libs = self._create_job_binaries(job)
-            job_id = self._create_job(job['type'], main_libs, additional_libs)
-            configs = self._parse_job_configs(job)
-            configs = self._put_io_data_to_configs(
-                configs, input_id, output_id)
-            pre_exec.append([job_id, input_id, output_id, configs])
+            pre_exec.append(self._prepare_job_running(job))
             batching -= 1
             if not batching:
                 self._job_batching(pre_exec)
diff --git a/sahara/tests/scenario/custom_checks/check_kafka.py b/sahara/tests/scenario/custom_checks/check_kafka.py
new file mode 100644
index 00000000..de88d7c9
--- /dev/null
+++ b/sahara/tests/scenario/custom_checks/check_kafka.py
@@ -0,0 +1,148 @@
+# Copyright (c) 2015 Mirantis Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from sahara.tests.scenario import base as base_scenario
+from sahara.tests.scenario import utils
+
+
+class CustomCheckKafka(object):
+    def __init__(self, base_class):
+        self.base = base_class
+
+    def _run_command_on_node(self, *args, **kwargs):
+        return self.base._run_command_on_node(*args, **kwargs)
+
+    def _get_nodes_with_process(self, *args, **kwargs):
+        return self.base._get_nodes_with_process(*args, **kwargs)
+
+    def fail(self, *args, **kwargs):
+        return self.base.fail(*args, **kwargs)
+
+    def _prepare_job_running(self, *args, **kwargs):
+        return self.base._prepare_job_running(*args, **kwargs)
+
+    def _job_batching(self, *args, **kwargs):
+        return self.base._job_batching(*args, **kwargs)
+
+    @property
+    def _results(self):
+        return self.base._results
+
+    @_results.setter
+    def _results(self, value):
+        self.base._results = value
+
+    @staticmethod
+    def _get_nodes_desc_list(nodes, node_domain, port):
+        data = []
+        for node in nodes:
+            fqdn = "{0}.{1}".format(
+                node["instance_name"], node_domain)
+            data.append("{0}:{1}".format(fqdn, port))
+        return ",".join(data)
+
+    def _get_node_ip(self, process):
+        node = self._get_nodes_with_process(process)[0]
+        return node["management_ip"]
+
+    def _search_file_on_node(self, ip, file):
+        file_path = self._run_command_on_node(
+            ip, 'find / -name "{file}" 2>/dev/null -print | head -n 1'
+            .format(file=file))
+        if not file_path:
+            self.fail("Cannot find file: {file}".format(file=file))
+        return file_path.rstrip()
+
+    def _create_test_topic(self, broker, topic, zookeepers):
+        ip = self._get_node_ip(broker)
+        scr = self._search_file_on_node(ip, "kafka-topics.sh")
+        # TODO(vgridnev): Avoid hardcoded values in future
+        self._run_command_on_node(
+            ip, "{script} --create --zookeeper {zoo} --replication-factor "
+                "1 --partitions 1 --topic {topic}".format(
+                    script=scr, zoo=zookeepers, topic=topic))
+
+    def _send_messages(self, broker, topic, broker_list):
+        ip = self._get_node_ip(broker)
+
+        scr = self._search_file_on_node(ip, "kafka-console-producer.sh")
+        messages = ["<