diff --git a/meteos/cluster/binary/meteos-script-1.6.0.py b/meteos/cluster/binary/meteos-script-1.6.0.py index 30f44c3..9c34f6d 100644 --- a/meteos/cluster/binary/meteos-script-1.6.0.py +++ b/meteos/cluster/binary/meteos-script-1.6.0.py @@ -62,6 +62,8 @@ from pyspark.mllib.recommendation import Rating from pyspark.mllib.regression import LabeledPoint from pyspark.mllib.regression import LinearRegressionModel from pyspark.mllib.regression import LinearRegressionWithSGD +from pyspark.mllib.regression import RidgeRegressionModel +from pyspark.mllib.regression import RidgeRegressionWithSGD from pyspark.mllib.tree import DecisionTree from pyspark.mllib.tree import DecisionTreeModel from pyspark.mllib.util import MLUtils @@ -185,29 +187,40 @@ class RecommendationController(ModelController): return model.predict(parsedData[0], parsedData[1]) -class LinearRegressionModelController(ModelController): +class RegressionModelController(ModelController): - def __init__(self): - super(LinearRegressionModelController, self).__init__() + def __init__(self, train_name, model_name): + super(RegressionModelController, self).__init__() + self.train_class = eval(train_name) + self.model_class = eval(model_name) + self.model_params = {} + + def _parse_model_params(self, params): + + p = {} + p['iterations'] = int(params.get('numIterations', 100)) + p['step'] = float(params.get('step', 0.00000001)) + p['miniBatchFraction'] = float(params.get('miniBatchFraction', 1.0)) + p['convergenceTol'] = float(params.get('convergenceTol', 0.001)) + if self.__class__.__name__ == 'LinearRegressionModelController': + p['regParam'] = float(params.get('regParam', 0.0)) + elif self.__class__.__name__ == 'RidgeRegressionModelController': + p['regParam'] = float(params.get('regParam', 0.01)) + + self.model_params = p def create_model(self, data, params): - iterations = int(params.get('numIterations', 10)) - step = float(params.get('step', 0.00000001)) + self._parse_model_params(params) points = data.map(self.parsePoint) - return LinearRegressionWithSGD.train(points, - iterations=iterations, - step=step) + return getattr(self.train_class, 'train')(points, **self.model_params) def create_model_libsvm(self, data, params): - iterations = int(params.get('numIterations', 10)) - step = float(params.get('step', 0.00000001)) + self._parse_model_params(params) - return LinearRegressionWithSGD.train(data, - iterations=iterations, - step=step) + return getattr(self.train_class, 'train')(data, **self.model_params) def evaluate_model(self, context, model, data): @@ -224,7 +237,7 @@ class LinearRegressionModelController(ModelController): return result def load_model(self, context, path): - return LinearRegressionModel.load(context, path) + return getattr(self.model_class, 'load')(context, path) def predict(self, model, params): return model.predict(params.split(',')) @@ -233,6 +246,24 @@ class LinearRegressionModelController(ModelController): return self.predict(model, params) +class LinearRegressionModelController(RegressionModelController): + + def __init__(self): + train_name = 'LinearRegressionWithSGD' + model_name = 'LinearRegressionModel' + super(LinearRegressionModelController, self).__init__(train_name, + model_name) + + +class RidgeRegressionModelController(RegressionModelController): + + def __init__(self): + train_name = 'RidgeRegressionWithSGD' + model_name = 'RidgeRegressionModel' + super(RidgeRegressionModelController, self).__init__(train_name, + model_name) + + class LogisticRegressionModelController(ModelController): def __init__(self): @@ -456,6 +487,8 @@ class MeteosSparkController(object): self.controller = LogisticRegressionModelController() elif model_type == 'LinearRegression': self.controller = LinearRegressionModelController() + elif model_type == 'RidgeRegression': + self.controller = RidgeRegressionModelController() elif model_type == 'DecisionTreeRegression': self.controller = DecisionTreeModelController() elif model_type == 'Word2Vec':