125 lines
3.8 KiB
Python
125 lines
3.8 KiB
Python
#!/usr/bin/env python
|
|
|
|
# Copyright (c) 2016 Hewlett Packard Enterprise Development Company, L.P.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
"""Monanas Runner.
|
|
|
|
This script checks for appropriate arguments and starts Monanas to use
|
|
data coming from one or more given sources. The source(s) can be configured
|
|
using the optional argument --sources. However, a default source using random
|
|
data generator is provided in the config folder.
|
|
|
|
Usage:
|
|
run.py -p <spark_path> -c <config> -l <log_config> [-s <sources>...
|
|
[<sources>]] [-dvh]
|
|
run.py -v | --version
|
|
run.py -h | --help
|
|
|
|
Options:
|
|
-c --config Config file.
|
|
-d --debug Show debug messages.
|
|
-h --help Show this screen.
|
|
-l --log_config Log config file's path.
|
|
-p --spark_path Spark's path.
|
|
-s --sources A list of data sources.
|
|
-v --version Show version.
|
|
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import logging.config as log_conf
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
|
|
import docopt
|
|
|
|
import setup_property
|
|
|
|
|
|
class RunnerError(Exception):
|
|
def __init__(self, value):
|
|
self._value = value
|
|
|
|
def __str__(self):
|
|
return repr(self._value)
|
|
|
|
|
|
def main(arguments):
|
|
spark_submit = "{0}/bin/spark-submit".format(arguments["<spark_path>"])
|
|
monanas_path = os.environ.get('MONANAS_HOME', "")
|
|
kafka_jar = None
|
|
|
|
try:
|
|
for filename in os.listdir("{0}/external/kafka-assembly/target".
|
|
format(arguments["<spark_path>"])):
|
|
if filename.startswith("spark-streaming-kafka-assembly") and\
|
|
not any(s in filename for s in ["source", "test"]):
|
|
kafka_jar = filename
|
|
break
|
|
|
|
if not kafka_jar:
|
|
raise OSError("Spark's external library required does not exist.")
|
|
except OSError as e:
|
|
raise RunnerError(e.__str__())
|
|
|
|
spark_kafka_jar = "{0}/external/kafka-assembly/target/{1}".\
|
|
format(arguments["<spark_path>"], kafka_jar)
|
|
command = [
|
|
spark_submit, "--master", "local[2]",
|
|
"--jars", spark_kafka_jar, monanas_path + "/monasca_analytics/monanas.py",
|
|
arguments["<config>"], arguments["<log_config>"]
|
|
]
|
|
command += arguments["<sources>"]
|
|
|
|
try:
|
|
logger.info("Executing `{}`...".format(" ".join(command)))
|
|
subprocess.Popen(command).communicate()
|
|
except OSError as e:
|
|
raise RunnerError(e.__str__())
|
|
|
|
|
|
def setup_logging(filename):
|
|
"""Setup logging based on a json string."""
|
|
with open(filename, "rt") as f:
|
|
config = json.load(f)
|
|
|
|
log_conf.dictConfig(config)
|
|
|
|
if __name__ == "__main__":
|
|
arguments = docopt.docopt(__doc__, version=setup_property.VERSION)
|
|
|
|
try:
|
|
setup_logging(arguments["<log_config>"])
|
|
except IOError:
|
|
raise RunnerError("File not found: {0}.".
|
|
format(arguments["<log_config>"]))
|
|
except ValueError:
|
|
raise RunnerError("{0} is not a valid logging config file.".
|
|
format(arguments["<log_config>"]))
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
try:
|
|
main(arguments)
|
|
except KeyboardInterrupt:
|
|
logger.info("Monanas run script stopped.")
|
|
except RunnerError as e:
|
|
logger.error(e.__str__())
|
|
except Exception as e:
|
|
logger.error("Unexpected error: {0}. {1}.".
|
|
format(sys.exc_info()[0], e))
|