Implement custom check for Kafka Service

This change implements a custom check for the Kafka service.
The check creates a test topic and sends several messages to it.

Change-Id: If6013ecc6a173b99ced68722775fbe30702943c5
Vitaly Gridnev 2015-11-05 16:59:11 +03:00
parent 600032aca6
commit e9a7a3858a
6 changed files with 273 additions and 17 deletions


@ -8,11 +8,11 @@ SparkPi example estimates Pi. It can take a single optional integer
argument specifying the number of slices (tasks) to use.
Example spark-wordcount Job
==========================
===========================
spark-wordcount is a modified version of the WordCount example from Apache Spark.
It can read input data from hdfs or swift container, then output the number of occurrences
of each word to standard output or hdfs.
spark-wordcount is a modified version of the WordCount example from Apache
Spark. It can read input data from hdfs or swift container, then output the
number of occurrences of each word to standard output or hdfs.
Launching wordcount job from Sahara UI
--------------------------------------
@ -26,9 +26,41 @@ Launching wordcount job from Sahara UI
1. Put path to input file in ``args``
2. Put path to output file in ``args``
3. Fill the ``Main class`` input with the following class: ``sahara.edp.spark.SparkWordCount``
4. Put the following values in the job's configs: ``edp.spark.adapt_for_swift`` with value ``True``,
``fs.swift.service.sahara.password`` with password for your username, and
``fs.swift.service.sahara.username`` with your username. These values are required for
correct access to your input file, located in Swift.
3. Fill the ``Main class`` input with the following class:
   ``sahara.edp.spark.SparkWordCount``
4. Put the following values in the job's configs:
   ``edp.spark.adapt_for_swift`` with value ``True``,
   ``fs.swift.service.sahara.password`` with the password for your username,
   and ``fs.swift.service.sahara.username`` with your username. These
   values are required for correct access to your input file, located in
   Swift (a sketch of the resulting configs follows this list).
5. Execute the job. You will be able to view your output in hdfs.
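For reference, here is a minimal sketch of the configs from step 4,
expressed as the ``job_configs`` dictionary used when launching the job
through sahara's EDP API instead of the UI; the credentials and data
source paths below are placeholders::

    # Placeholder values; substitute your own Swift credentials and paths.
    job_configs = {
        "configs": {
            "edp.spark.adapt_for_swift": True,
            "fs.swift.service.sahara.username": "demo-user",
            "fs.swift.service.sahara.password": "demo-password",
        },
        "args": [
            "swift://demo-container.sahara/input.txt",  # input path (step 1)
            "hdfs://namenode:8020/user/demo/output",    # output path (step 2)
        ],
    }
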
Launching spark-kafka-example
-----------------------------
0. Create a cluster with ``Kafka Broker``, ``ZooKeeper`` and
   ``Spark History Server``. The Ambari plugin can be used for that purpose.
   Please use your keypair during cluster creation so that you can ssh into
   the instances running these processes. For simplicity, these services
   should be located on the same node.
1. Ssh to the node with the ``Kafka Broker`` service. Create a sample topic
   using the following command:
   ``path/kafka-topics.sh --create --zookeeper localhost:2181 \
   --replication-factor 1 --partitions 1 --topic test-topic``.
   Also execute ``path/kafka-console-producer.sh --broker-list \
   localhost:6667 --topic test-topic`` and then put several messages in the
   topic. Please note that you need to replace the values ``localhost``
   and ``path`` with your own values.
2. Download the Spark Streaming utils to the node with your
   ``Spark History Server`` from this URL:
   ``http://central.maven.org/maven2/org/apache/spark/spark-streaming-kafka-assembly_2.10/1.4.1/spark-streaming-kafka-assembly_2.10-1.4.1.jar``.
   Now you are ready to launch your job from the sahara UI.
3. Create a job binary that points to ``spark-kafka-example.py``. Also
   create a job that uses this job binary as its main binary.
4. Execute the job with the following job configs: set
   ``edp.spark.driver.classpath`` to a value that points to the utils
   downloaded during step 2. The job should also be run with the following
   arguments: ``localhost:2181`` as the first argument, ``test-topic`` as
   the second, and ``30`` as the third (a sketch of the resulting configs
   follows this list).
5. Congratulations, your job was successfully launched!
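For reference, a minimal sketch of the job configs from step 4 in the same
``job_configs`` form; the classpath below assumes the assembly jar from
step 2 was saved under ``/tmp/spark-utils`` and is only an illustration::

    # Hypothetical location of the jar downloaded in step 2.
    job_configs = {
        "configs": {
            "edp.spark.driver.classpath":
                "/tmp/spark-utils/"
                "spark-streaming-kafka-assembly_2.10-1.4.1.jar",
        },
        # <zookeeper host:port> <topic> <timeout in seconds>
        "args": ["localhost:2181", "test-topic", "30"],
    }
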


@ -0,0 +1,48 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming import StreamingContext
def main():
    if len(sys.argv) != 4:
        print("Usage: kafka_wordcount.py <zk> <topic> <timeout>",
              file=sys.stderr)
        exit(-1)

    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 1)

    timeout = None
    if len(sys.argv) == 4:
        zk, topic, timeout = sys.argv[1:]
        timeout = int(timeout)
    else:
        zk, topic = sys.argv[1:]

    # Consume the test topic through the ZooKeeper quorum and count words.
    kvs = KafkaUtils.createStream(
        ssc, zk, "spark-streaming-consumer", {topic: 1})
    lines = kvs.map(lambda x: x[1])
    counts = (lines.flatMap(lambda line: line.split(" "))
              .map(lambda word: (word, 1))
              .reduceByKey(lambda a, b: a + b))
    counts.pprint()

    kwargs = {}
    if timeout:
        # Stop the streaming context automatically after `timeout` seconds.
        kwargs['timeout'] = timeout
    ssc.start()
    ssc.awaitTermination(**kwargs)


if __name__ == '__main__':
    main()


@ -14,6 +14,7 @@ clusters:
          - SecondaryNameNode
          - YARN Timeline Server
          - ZooKeeper
          - Kafka Broker
        auto_security_group: true
      - name: master-edp
        flavor: ${ci_flavor_id}
@ -39,10 +40,26 @@ clusters:
      cluster_configs:
        HDFS:
          dfs.datanode.du.reserved: 0
    custom_checks:
      check_kafka:
        zookeeper_process: ZooKeeper
        kafka_process: Kafka Broker
        spark_flow:
          - type: Spark
            main_lib:
              type: database
              source: etc/edp-examples/edp-spark/spark-kafka-example.jar
            args:
              - '{zookeeper_list}'
              - '{topic}'
              - '{timeout}'
            timeout: 30
    cluster:
      name: ${cluster_name}
    scenario:
      - run_jobs
      - kafka
    edp_jobs_flow:
      - java_job
      - spark_pi
      - spark_pi
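As an illustration only (with hypothetical host names), the ``'{...}'``
placeholders in the ``args`` above are expected to be filled in via
``str.format`` by the Kafka check before the job is launched, roughly like
this:

    # Hypothetical values; the real lists are built from the cluster nodes.
    args = ['{zookeeper_list}', '{topic}', '{timeout}']
    filled = [arg.format(zookeeper_list="master-001.novalocal:2181",
                         topic="test-topic-1234",
                         timeout=30)
              for arg in args]
    # filled == ['master-001.novalocal:2181', 'test-topic-1234', '30']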


@ -177,6 +177,15 @@ class BaseTestCase(base.BaseTestCase):
        configs['args'] = args
        return configs

    def _prepare_job_running(self, job):
        input_id, output_id = self._create_datasources(job)
        main_libs, additional_libs = self._create_job_binaries(job)
        job_id = self._create_job(job['type'], main_libs, additional_libs)
        configs = self._parse_job_configs(job)
        configs = self._put_io_data_to_configs(
            configs, input_id, output_id)
        return [job_id, input_id, output_id, configs]

    @track_result("Check EDP jobs", False)
    def check_run_jobs(self):
        batching = self.testcase.get('edp_batching',
@ -186,13 +195,7 @@ class BaseTestCase(base.BaseTestCase):
        pre_exec = []
        for job in jobs:
            input_id, output_id = self._create_datasources(job)
            main_libs, additional_libs = self._create_job_binaries(job)
            job_id = self._create_job(job['type'], main_libs, additional_libs)
            configs = self._parse_job_configs(job)
            configs = self._put_io_data_to_configs(
                configs, input_id, output_id)
            pre_exec.append([job_id, input_id, output_id, configs])
            pre_exec.append(self._prepare_job_running(job))
            batching -= 1
            if not batching:
                self._job_batching(pre_exec)


@ -0,0 +1,148 @@
# Copyright (c) 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from sahara.tests.scenario import base as base_scenario
from sahara.tests.scenario import utils


class CustomCheckKafka(object):
    def __init__(self, base_class):
        self.base = base_class

    def _run_command_on_node(self, *args, **kwargs):
        return self.base._run_command_on_node(*args, **kwargs)

    def _get_nodes_with_process(self, *args, **kwargs):
        return self.base._get_nodes_with_process(*args, **kwargs)

    def fail(self, *args, **kwargs):
        return self.base.fail(*args, **kwargs)

    def _prepare_job_running(self, *args, **kwargs):
        return self.base._prepare_job_running(*args, **kwargs)

    def _job_batching(self, *args, **kwargs):
        return self.base._job_batching(*args, **kwargs)

    @property
    def _results(self):
        return self.base._results

    @_results.setter
    def _results(self, value):
        self.base._results = value

    @staticmethod
    def _get_nodes_desc_list(nodes, node_domain, port):
        data = []
        for node in nodes:
            fqdn = "{0}.{1}".format(
                node["instance_name"], node_domain)
            data.append("{0}:{1}".format(fqdn, port))
        return ",".join(data)

    def _get_node_ip(self, process):
        node = self._get_nodes_with_process(process)[0]
        return node["management_ip"]

    def _search_file_on_node(self, ip, file):
        file_path = self._run_command_on_node(
            ip, 'find / -name "{file}" 2>/dev/null -print | head -n 1'
            .format(file=file))
        if not file_path:
            self.fail("Cannot find file: {file}".format(file=file))
        return file_path.rstrip()

    def _create_test_topic(self, broker, topic, zookeepers):
        ip = self._get_node_ip(broker)
        scr = self._search_file_on_node(ip, "kafka-topics.sh")
        # TODO(vgridnev): Avoid hardcoded values in future
        self._run_command_on_node(
            ip, "{script} --create --zookeeper {zoo} --replication-factor "
                "1 --partitions 1 --topic {topic}".format(
                    script=scr, zoo=zookeepers, topic=topic))

    def _send_messages(self, broker, topic, broker_list):
        ip = self._get_node_ip(broker)
        scr = self._search_file_on_node(ip, "kafka-console-producer.sh")
        messages = ["<<EOF", "banana", "in", "sahara", "sahara", "data",
                    "processing", "service", "stack", "open", "stack", "EOF"]
        cmd = "{script} --broker-list {brokers} --topic {topic} {msg}"
        self._run_command_on_node(
            ip, cmd.format(
                script=scr, topic=topic, brokers=broker_list,
                msg=" ".join(messages)))

    def _prepare_spark_kafka_job_running(self, shs):
        ip = self._get_node_ip(shs)
        utils_url = (
            "http://central.maven.org/maven2/org/apache/spark"
            "/spark-streaming-kafka-assembly_2.10/1.4.1"
            "/spark-streaming-kafka-assembly_2.10-1.4.1.jar")
        # Try to find the spark-kafka assembly utils on the node first;
        # download them only if they are not already there.
        result = self._search_file_on_node(ip, "spark-streaming-kafka")
        if not result:
            self._run_command_on_node(
                ip, "wget -P /tmp/spark-utils {url}".format(
                    url=utils_url))
        return self._search_file_on_node(ip, "spark-streaming-kafka")

    @base_scenario.track_result("Check Kafka", False)
    def check(self):
        # This check verifies that Kafka works correctly.
        # Requirements: a running cluster with at least one ZooKeeper
        # server and one Kafka Broker; Spark can be included as well.
        # Initially designed for the Ambari plugin.
        ckd = self.base.testcase.get(
            'custom_checks', {}).get('check_kafka', {})
        topic = ckd.get('topic', 'test-topic')
        topic = utils.rand_name(topic)
        zk = ckd.get('zookeeper_process', "ZooKeeper")
        kb = ckd.get('kafka_process', "Kafka Broker")
        shs = ckd.get('spark_process', "Spark History Server")
        # Running the Spark job is disabled by default
        spark_flow = ckd.get('spark_flow_test', None)
        kb_port = ckd.get('kafka_port', 6667)
        zk_port = ckd.get('zookeeper_port', 2181)
        node_domain = ckd.get('node_domain', "novalocal")
        broker_list = self._get_nodes_desc_list(
            self._get_nodes_with_process(kb), node_domain, kb_port)
        zookeeper_list = self._get_nodes_desc_list(
            self._get_nodes_with_process(zk), node_domain, zk_port)
        self._create_test_topic(kb, topic, zookeeper_list)
        self._send_messages(kb, topic, broker_list)
        if spark_flow:
            dest = self._prepare_spark_kafka_job_running(shs)
            if 'configs' not in spark_flow:
                spark_flow['configs'] = {}
            # Override the driver classpath with the downloaded utils
            spark_flow['configs']['edp.spark.driver.classpath'] = dest
            timeout = spark_flow.get('timeout', 30)
            if 'args' not in spark_flow:
                spark_flow['args'] = []
            new_args = []
            for arg in spark_flow['args']:
                arg = arg.format(zookeeper_list=zookeeper_list,
                                 timeout=timeout, topic=topic)
                new_args.append(arg)
            spark_flow['args'] = new_args
            to_execute = [self._prepare_job_running(spark_flow)]
            self._job_batching(to_execute)


def check(self):
    CustomCheckKafka(self).check()
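As a usage note, here is a rough sketch (with made-up node data) of what
``_get_nodes_desc_list`` produces for the broker and ZooKeeper lists that
the check builds:

    # Hypothetical nodes as returned by _get_nodes_with_process("Kafka Broker")
    nodes = [{"instance_name": "cluster-master-001"},
             {"instance_name": "cluster-master-002"}]
    broker_list = CustomCheckKafka._get_nodes_desc_list(
        nodes, node_domain="novalocal", port=6667)
    # broker_list ==
    # "cluster-master-001.novalocal:6667,cluster-master-002.novalocal:6667"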


@ -275,6 +275,14 @@ SCHEMA = {
"type": "integer",
"minimum": 1
},
"custom_checks": {
"type": "object",
"properties": {
".*": {
"type": "object",
}
}
},
"scaling": {
"type": "array",
"minItems": 1,