3542d404c5
This patch includes the following updates. - Remove "findspark" and "libpgm" packages. There are no longer in use. - Rename "sklearn" to "scikit-learn". "global-reqirements" needs "scikit-learn" instead of "sklearn" which is old package name. - Pin scipy version less than 1.2.0. Currently, according to upper-constraints.txt of openstack/requirements, scipy version is '1.2.0'. However, this scipy version seems to be unable to import and use some scikit-learn libraries. - Remove "docopt" and re-wite code wihtout "docopt". "docopt" is not used in OpenStack. - Remove requirements-check job. Currently, "tornado" is a required package, but requirements-check job has a error. However, to remove that error, adding "tornado" package to openstack/requirements was refused. Change-Id: I3bb98ef733ff16558d241968b06c31fa7508d047
137 lines
4.4 KiB
Python
137 lines
4.4 KiB
Python
#!/usr/bin/env python
|
|
|
|
# Copyright (c) 2016 Hewlett Packard Enterprise Development Company, L.P.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
"""Monanas Runner.
|
|
|
|
This script checks for appropriate arguments and starts Monanas to use
|
|
data coming from one or more given sources. The source(s) can be configured
|
|
using the optional argument --sources. However, a default source using random
|
|
data generator is provided in the config folder.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import logging.config as log_conf
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
|
|
import argparse
|
|
import textwrap
|
|
|
|
import setup_property
|
|
|
|
|
|
class RunnerError(Exception):
|
|
def __init__(self, value):
|
|
self._value = value
|
|
|
|
def __str__(self):
|
|
return repr(self._value)
|
|
|
|
|
|
def main(arguments):
|
|
spark_submit = "{0}/bin/spark-submit".format(arguments.spark_path)
|
|
monanas_path = os.environ.get('MONANAS_HOME', "")
|
|
kafka_jar = None
|
|
|
|
try:
|
|
for filename in os.listdir("{0}/external/kafka-assembly/target".
|
|
format(arguments.spark_path)):
|
|
if filename.startswith("spark-streaming-kafka-assembly") and\
|
|
not any(s in filename for s in ["source", "test"]):
|
|
kafka_jar = filename
|
|
break
|
|
|
|
if not kafka_jar:
|
|
raise OSError("Spark's external library required does not exist.")
|
|
except OSError as e:
|
|
raise RunnerError(e.__str__())
|
|
|
|
spark_kafka_jar = "{0}/external/kafka-assembly/target/{1}".\
|
|
format(arguments.spark_path, kafka_jar)
|
|
command = [
|
|
spark_submit, "--master", "local[2]",
|
|
"--jars", spark_kafka_jar, monanas_path + "/monasca_analytics/monanas.py",
|
|
arguments.config, arguments.log_config
|
|
]
|
|
|
|
if arguments.sources is not None:
|
|
command += arguments.sources
|
|
|
|
try:
|
|
logger.info("Executing `{}`...".format(" ".join(command)))
|
|
subprocess.Popen(command).communicate()
|
|
except OSError as e:
|
|
raise RunnerError(e.__str__())
|
|
|
|
|
|
def setup_logging(filename):
|
|
"""Setup logging based on a json string."""
|
|
with open(filename, "rt") as f:
|
|
config = json.load(f)
|
|
|
|
log_conf.dictConfig(config)
|
|
|
|
def setup_parser():
|
|
parser = argparse.ArgumentParser(
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
description=textwrap.dedent(__doc__.strip()),
|
|
add_help=False)
|
|
|
|
parser.add_argument('-c', '--config',
|
|
help='Config file.', required=True)
|
|
# "-d" currently unused
|
|
parser.add_argument('-d', '--debug',
|
|
help='Show debug messages.', action='store_true')
|
|
parser.add_argument('-h', '--help',
|
|
help='Show this screen.', action='help')
|
|
parser.add_argument('-l', '--log_config',
|
|
help='Log config file\'s path.', required=True)
|
|
parser.add_argument('-p', '--spark_path',
|
|
help='Spark\'s path.', required=True)
|
|
parser.add_argument('-s', '--sources',
|
|
help='A list of data sources.', nargs='*')
|
|
parser.add_argument('-v', '--version',
|
|
help='Show version.', action='version',
|
|
version=setup_property.VERSION)
|
|
|
|
return parser
|
|
|
|
if __name__ == "__main__":
|
|
arguments = setup_parser().parse_args()
|
|
|
|
try:
|
|
setup_logging(arguments.log_config)
|
|
except IOError:
|
|
raise RunnerError("File not found: {0}.".
|
|
format(arguments.log_config))
|
|
except ValueError:
|
|
raise RunnerError("{0} is not a valid logging config file.".
|
|
format(arguments.log_config))
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
try:
|
|
main(arguments)
|
|
except KeyboardInterrupt:
|
|
logger.info("Monanas run script stopped.")
|
|
except RunnerError as e:
|
|
logger.error(e.__str__())
|
|
except Exception as e:
|
|
logger.error("Unexpected error: {0}. {1}.".
|
|
format(sys.exc_info()[0], e))
|