From b9fbbba8b74228dcf4d4107239f3d8e4ea5c17b7 Mon Sep 17 00:00:00 2001 From: Hiroyuki Eguchi Date: Fri, 20 Jan 2017 13:49:31 +0900 Subject: [PATCH] Model Evaluation In Machine Learning, Prediction model needs to be evaluated accuracy. Add a "meteos model-evaluation" command to allow users to evaluate their models which they created. implements blueprint evaluate-model Change-Id: I35b9322d423a224311541f940ecf985870d7c396 --- meteos/api/v1/model_evaluations.py | 158 ++++++++++++++++++ meteos/api/v1/router.py | 7 + meteos/api/views/model_evaluations.py | 84 ++++++++++ meteos/cluster/binary/meteos-script-1.6.0.py | 92 +++++++++- meteos/db/api.py | 39 +++++ .../alembic/versions/001_meteos_init.py | 34 +++- meteos/db/sqlalchemy/api.py | 109 ++++++++++++ meteos/db/sqlalchemy/models.py | 24 +++ meteos/engine/api.py | 93 +++++++++++ meteos/engine/drivers/generic.py | 42 +++++ meteos/engine/manager.py | 48 ++++++ meteos/engine/rpcapi.py | 11 ++ 12 files changed, 738 insertions(+), 3 deletions(-) create mode 100644 meteos/api/v1/model_evaluations.py create mode 100644 meteos/api/views/model_evaluations.py diff --git a/meteos/api/v1/model_evaluations.py b/meteos/api/v1/model_evaluations.py new file mode 100644 index 0000000..e2144bc --- /dev/null +++ b/meteos/api/v1/model_evaluations.py @@ -0,0 +1,158 @@ +# Copyright 2013 NetApp +# All Rights Reserved. +# Copyright (c) 2016 NEC Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""The model_evaluations api.""" + +import ast +import re +import string + +from oslo_log import log +from oslo_utils import strutils +from oslo_utils import uuidutils +import six +import webob +from webob import exc + +from meteos.api import common +from meteos.api.openstack import wsgi +from meteos.api.views import model_evaluations as model_evaluation_views +from meteos import exception +from meteos.i18n import _, _LI +from meteos import engine + +LOG = log.getLogger(__name__) + + +class ModelEvaluationController(wsgi.Controller, wsgi.AdminActionsMixin): + + """The ModelEvaluations API v1 controller for the OpenStack API.""" + resource_name = 'model_evaluation' + _view_builder_class = model_evaluation_views.ViewBuilder + + def __init__(self): + super(self.__class__, self).__init__() + self.engine_api = engine.API() + + def show(self, req, id): + """Return data about the given model evaluation.""" + context = req.environ['meteos.context'] + + try: + model_evaluation = self.engine_api.get_model_evaluation(context, id) + except exception.NotFound: + raise exc.HTTPNotFound() + + return self._view_builder.detail(req, model_evaluation) + + def delete(self, req, id): + """Delete a model evaluation.""" + context = req.environ['meteos.context'] + + LOG.info(_LI("Delete model evaluation with id: %s"), id, context=context) + + try: + self.engine_api.delete_model_evaluation(context, id) + except exception.NotFound: + raise exc.HTTPNotFound() + except exception.InvalidLearning as e: + raise exc.HTTPForbidden(explanation=six.text_type(e)) + + return webob.Response(status_int=202) + + def index(self, req): + """Returns a summary list of model evaluations.""" + return self._get_model_evaluations(req, is_detail=False) + + def detail(self, req): + """Returns a detailed list of model evaluations.""" + return self._get_model_evaluations(req, is_detail=True) + + def _get_model_evaluations(self, req, is_detail): + """Returns a list of model evaluations, transformed through view builder.""" + context = req.environ['meteos.context'] + + search_opts = {} + search_opts.update(req.GET) + + # Remove keys that are not related to model_evaluation attrs + search_opts.pop('limit', None) + search_opts.pop('offset', None) + sort_key = search_opts.pop('sort_key', 'created_at') + sort_dir = search_opts.pop('sort_dir', 'desc') + + model_evaluations = self.engine_api.get_all_model_evaluations( + context, search_opts=search_opts, sort_key=sort_key, + sort_dir=sort_dir) + + limited_list = common.limited(model_evaluations, req) + + if is_detail: + model_evaluations = self._view_builder.detail_list(req, limited_list) + else: + model_evaluations = self._view_builder.summary_list(req, limited_list) + return model_evaluations + + def create(self, req, body): + """Creates a new model evaluation.""" + context = req.environ['meteos.context'] + + if not self.is_valid_body(body, 'model_evaluation'): + raise exc.HTTPUnprocessableEntity() + + model_evaluation = body['model_evaluation'] + + LOG.debug("Create model evaluation with request: %s", model_evaluation) + + display_name = model_evaluation.get('display_name') + model_id = model_evaluation.get('model_id') + source_dataset_url = model_evaluation.get('source_dataset_url') + + swift_tenant = model_evaluation.get('swift_tenant') + swift_username = model_evaluation.get('swift_username') + swift_password = model_evaluation.get('swift_password') + + try: + model = self.engine_api.get_model(context, model_id) + experiment = self.engine_api.get_experiment( + context, + model.experiment_id) + template = self.engine_api.get_template( + context, + experiment.template_id) + except exception.NotFound: + raise exc.HTTPNotFound() + + new_model_evaluation = self.engine_api.create_model_evaluation( + context, + display_name, + source_dataset_url, + model_id, + model.model_type, + model.dataset_format, + template.id, + template.job_template_id, + model.experiment_id, + experiment.cluster_id, + swift_tenant, + swift_username, + swift_password) + + return self._view_builder.detail(req, new_model_evaluation) + + +def create_resource(): + return wsgi.Resource(ModelEvaluationController()) diff --git a/meteos/api/v1/router.py b/meteos/api/v1/router.py index 1524ebe..331bf5d 100644 --- a/meteos/api/v1/router.py +++ b/meteos/api/v1/router.py @@ -25,6 +25,7 @@ from meteos.api.v1 import experiments from meteos.api.v1 import templates from meteos.api.v1 import datasets from meteos.api.v1 import models +from meteos.api.v1 import model_evaluations from meteos.api import versions @@ -71,3 +72,9 @@ class APIRouter(meteos.api.openstack.APIRouter): controller=self.resources['models'], collection={'detail': 'GET'}, member={'action': 'POST'}) + + self.resources['model_evaluations'] = model_evaluations.create_resource() + mapper.resource("model_evaluations", "model_evaluations", + controller=self.resources['model_evaluations'], + collection={'detail': 'GET'}, + member={'action': 'POST'}) diff --git a/meteos/api/views/model_evaluations.py b/meteos/api/views/model_evaluations.py new file mode 100644 index 0000000..180a200 --- /dev/null +++ b/meteos/api/views/model_evaluations.py @@ -0,0 +1,84 @@ +# Copyright 2013 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from meteos.api import common + + +class ViewBuilder(common.ViewBuilder): + + """Model a server API response as a python dictionary.""" + + _collection_name = 'model_evaluations' + _detail_version_modifiers = [] + + def summary_list(self, request, model_evaluations): + """Show a list of model evaluations without many details.""" + return self._list_view(self.summary, request, model_evaluations) + + def detail_list(self, request, model_evaluations): + """Detailed view of a list of model evaluations.""" + return self._list_view(self.detail, request, model_evaluations) + + def summary(self, request, model_evaluation): + """Generic, non-detailed view of a model evaluation.""" + return { + 'model_evaluation': { + 'id': model_evaluation.get('id'), + 'name': model_evaluation.get('display_name'), + 'status': model_evaluation.get('status'), + 'source_dataset_url': model_evaluation.get('source_dataset_url'), + 'model_id': model_evaluation.get('model_id'), + 'model_type': model_evaluation.get('model_type'), + 'stdout': model_evaluation.get('stdout'), + 'created_at': model_evaluation.get('created_at'), + 'links': self._get_links(request, model_evaluation['id']) + } + } + + def detail(self, request, model_evaluation): + """Detailed view of a single model evaluation.""" + context = request.environ['meteos.context'] + + model_evaluation_dict = { + 'id': model_evaluation.get('id'), + 'name': model_evaluation.get('display_name'), + 'status': model_evaluation.get('status'), + 'source_dataset_url': model_evaluation.get('source_dataset_url'), + 'model_id': model_evaluation.get('model_id'), + 'model_type': model_evaluation.get('model_type'), + 'user_id': model_evaluation.get('user_id'), + 'project_id': model_evaluation.get('project_id'), + 'created_at': model_evaluation.get('created_at'), + 'stdout': model_evaluation.get('stdout'), + 'stderr': model_evaluation.get('stderr'), + } + + self.update_versioned_resource_dict(request, model_evaluation_dict, model_evaluation) + + return {'model_evaluation': model_evaluation_dict} + + def _list_view(self, func, request, model_evaluations): + """Provide a view for a list of model evaluations.""" + model_evaluations_list = [func(request, model_evaluation)['model_evaluation'] + for model_evaluation in model_evaluations] + model_evaluations_links = self._get_collection_links(request, + model_evaluations, + self._collection_name) + model_evaluations_dict = dict(model_evaluations=model_evaluations_list) + + if model_evaluations_links: + model_evaluations_dict['model_evaluations_links'] = model_evaluations_links + + return model_evaluations_dict diff --git a/meteos/cluster/binary/meteos-script-1.6.0.py b/meteos/cluster/binary/meteos-script-1.6.0.py index 1465284..a56124d 100644 --- a/meteos/cluster/binary/meteos-script-1.6.0.py +++ b/meteos/cluster/binary/meteos-script-1.6.0.py @@ -48,6 +48,10 @@ from pyspark.mllib.feature import Word2Vec from pyspark.mllib.feature import Word2VecModel from pyspark.mllib.fpm import FPGrowth from pyspark.mllib.fpm import FPGrowthModel +from pyspark.mllib.evaluation import BinaryClassificationMetrics +from pyspark.mllib.evaluation import RegressionMetrics +from pyspark.mllib.evaluation import MulticlassMetrics +from pyspark.mllib.evaluation import RankingMetrics from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating from pyspark.mllib.regression import LabeledPoint from pyspark.mllib.regression import LinearRegressionWithSGD @@ -55,8 +59,10 @@ from pyspark.mllib.regression import LinearRegressionModel from pyspark.mllib.tree import DecisionTree, DecisionTreeModel from pyspark.mllib.util import MLUtils + EXIT_CODE='80577372-9349-463a-bbc3-1ca54f187cc9' + class ModelController(object): """Class defines interface of Model Controller.""" @@ -72,6 +78,10 @@ class ModelController(object): """Is called to create mode.""" raise NotImplementedError() + def evaluate_model(self, context, model, data): + """Is called to evaluate mode.""" + raise NotImplementedError() + def load_model(self, context, path): """Is called to load mode.""" raise NotImplementedError() @@ -90,6 +100,7 @@ class ModelController(object): values[0] = 0 return LabeledPoint(values[0], values[1:]) + class KMeansModelController(ModelController): def __init__(self): @@ -123,17 +134,42 @@ class RecommendationController(ModelController): def __init__(self): super(RecommendationController, self).__init__() + def _create_ratings(self, data): + + return data.map(lambda l: l.split(','))\ + .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2]))) + def create_model(self, data, params): # Build the recommendation model using Alternating Least Squares rank = params.get('rank', 10) numIterations = params.get('numIterations', 10) - ratings = data.map(lambda l: l.split(','))\ - .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2]))) + ratings = self._create_ratings(data) return ALS.train(ratings, rank, numIterations) + def evaluate_model(self, context, model, data): + + ratings = self._create_ratings(data) + testData = ratings.map(lambda p: (p.user, p.product)) + + predictions = model.predictAll(testData)\ + .map(lambda r: ((r.user, r.product), r.rating)) + + ratingsTuple = ratings.map(lambda r: ((r.user, r.product), r.rating)) + scoreAndLabels = predictions.join(ratingsTuple).map(lambda tup: tup[1]) + + # Instantiate regression metrics to compare predicted and actual ratings + metrics = RegressionMetrics(scoreAndLabels) + + result = "{}: {}".format("MAE", metrics.meanAbsoluteError) + os.linesep\ + + "{}: {}".format("MSE", metrics.meanSquaredError) + os.linesep\ + + "{}: {}".format("RMSE", metrics.rootMeanSquaredError) + os.linesep\ + + "{}: {}".format("R-squared", metrics.r2) + + return result + def load_model(self, context, path): return MatrixFactorizationModel.load(context, path) @@ -167,6 +203,20 @@ class LinearRegressionModelController(ModelController): iterations=iterations, step=step) + def evaluate_model(self, context, model, data): + + points = data.map(self.parsePoint) + scoreAndLabels = points.map(lambda p: (float(model.predict(p.features)), p.label)) + + metrics = RegressionMetrics(scoreAndLabels) + + result = "{}: {}".format("MAE", metrics.meanAbsoluteError) + os.linesep\ + + "{}: {}".format("MSE", metrics.meanSquaredError) + os.linesep\ + + "{}: {}".format("RMSE", metrics.rootMeanSquaredError) + os.linesep\ + + "{}: {}".format("R-squared", metrics.r2) + + return result + def load_model(self, context, path): return LinearRegressionModel.load(context, path) @@ -195,6 +245,18 @@ class LogisticRegressionModelController(ModelController): return LogisticRegressionWithSGD.train(data, numIterations) + def evaluate_model(self, context, model, data): + + predictionAndLabels = data.map(self.parsePoint)\ + .map(lambda lp: (float(model.predict(lp.features)), lp.label)) + + metrics = BinaryClassificationMetrics(predictionAndLabels) + + result = "{}: {}".format("Area under PR", metrics.areaUnderPR) + os.linesep\ + + "{}: {}".format("Area under ROC", metrics.areaUnderROC) + + return result + def load_model(self, context, path): return LogisticRegressionModel.load(context, path) @@ -241,6 +303,18 @@ class DecisionTreeModelController(ModelController): maxDepth=maxDepth, maxBins=maxBins) + def evaluate_model(self, context, model, data): + + predictions = model.predict(data.map(lambda x: x.features)) + predictionAndLabels = data.map(lambda lp: lp.label).zip(predictions) + metrics = MulticlassMetrics(predictionAndLabels) + + result = "{}: {}".format("Precision", metrics.precision()) + os.linesep\ + + "{}: {}".format("Recall", metrics.recall()) + os.linesep\ + + "{}: {}".format("F1 Score", metrics.fMeasure()) + + return result + def load_model(self, context, path): return DecisionTreeModel.load(context, path) @@ -290,6 +364,7 @@ class Word2VecModelController(ModelController): return result + class FPGrowthModelController(ModelController): def __init__(self): @@ -407,6 +482,19 @@ class MeteosSparkController(object): if self.model: self.model.save(self.context, self.modelpath) + def evaluate_model(self): + + self.load_data() + self.model = self.controller.load_model(self.context, + self.modelpath) + + output = self.controller.evaluate_model(self.context, + self.model, + self.data) + + if output is not None: + print(output) + def download_dataset(self): self.load_data() diff --git a/meteos/db/api.py b/meteos/db/api.py index eb7a02d..d071573 100644 --- a/meteos/db/api.py +++ b/meteos/db/api.py @@ -292,6 +292,45 @@ def model_delete(context, model_id): # +def model_evaluation_create(context, model_evaluation_values): + """Create new model_evaluation.""" + return IMPL.model_evaluation_create(context, model_evaluation_values) + + +def model_evaluation_update(context, model_evaluation_id, values): + """Update model_evaluation fields.""" + return IMPL.model_evaluation_update(context, model_evaluation_id, values) + + +def model_evaluation_get(context, model_evaluation_id): + """Get model_evaluation by id.""" + return IMPL.model_evaluation_get(context, model_evaluation_id) + + +def model_evaluation_get_all(context, filters=None, sort_key=None, sort_dir=None): + """Get all model_evaluations.""" + return IMPL.model_evaluation_get_all( + context, filters=filters, sort_key=sort_key, sort_dir=sort_dir, + ) + + +def model_evaluation_get_all_by_project(context, project_id, filters=None, + sort_key=None, sort_dir=None): + """Returns all model_evaluations with given project ID.""" + return IMPL.model_evaluation_get_all_by_project( + context, project_id, filters=filters, + sort_key=sort_key, sort_dir=sort_dir, + ) + + +def model_evaluation_delete(context, model_evaluation_id): + """Delete model_evaluation.""" + return IMPL.model_evaluation_delete(context, model_evaluation_id) + + +# + + def learning_create(context, learning_values): """Create new learning.""" return IMPL.learning_create(context, learning_values) diff --git a/meteos/db/migrations/alembic/versions/001_meteos_init.py b/meteos/db/migrations/alembic/versions/001_meteos_init.py index 303b72c..2c60734 100644 --- a/meteos/db/migrations/alembic/versions/001_meteos_init.py +++ b/meteos/db/migrations/alembic/versions/001_meteos_init.py @@ -168,6 +168,32 @@ def upgrade(): mysql_charset='utf8' ) + model_evaluations = Table( + 'model_evaluations', meta, + Column('created_at', DateTime), + Column('updated_at', DateTime), + Column('deleted_at', DateTime), + Column('deleted', String(length=36), default='False'), + Column('id', String(length=36), primary_key=True, nullable=False), + Column('model_id', String(length=36)), + Column('model_type', String(length=255)), + Column('source_dataset_url', String(length=255)), + Column('dataset_format', String(length=255)), + Column('user_id', String(length=255)), + Column('project_id', String(length=255)), + Column('cluster_id', String(length=36)), + Column('job_id', String(length=36)), + Column('status', String(length=255)), + Column('scheduled_at', DateTime), + Column('launched_at', DateTime), + Column('terminated_at', DateTime), + Column('display_name', String(length=255)), + Column('stdout', Text), + Column('stderr', Text), + mysql_engine='InnoDB', + mysql_charset='utf8' + ) + learnings = Table( 'learnings', meta, Column('created_at', DateTime), @@ -198,7 +224,13 @@ def upgrade(): # create all tables # Take care on create order for those with FK dependencies - tables = [services, templates, learnings, experiments, data_sets, models] + tables = [services, + templates, + learnings, + experiments, + data_sets, + models, + model_evaluations] for table in tables: if not table.exists(): diff --git a/meteos/db/sqlalchemy/api.py b/meteos/db/sqlalchemy/api.py index 3633bde..a648723 100644 --- a/meteos/db/sqlalchemy/api.py +++ b/meteos/db/sqlalchemy/api.py @@ -811,6 +811,115 @@ def model_delete(context, model_id): # +def _model_evaluation_get_query(context, session=None): + if session is None: + session = get_session() + return model_query(context, models.Model_Evaluation, session=session) + + +@require_context +def model_evaluation_get(context, model_evaluation_id, session=None): + result = _model_evaluation_get_query( + context, session).filter_by(id=model_evaluation_id).first() + + if result is None: + raise exception.NotFound() + + return result + + +@require_context +@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True) +def model_evaluation_update(context, model_evaluation_id, update_values): + session = get_session() + values = copy.deepcopy(update_values) + + with session.begin(): + model_evaluation_ref = model_evaluation_get(context, + model_evaluation_id, + session=session) + + model_evaluation_ref.update(values) + model_evaluation_ref.save(session=session) + return model_evaluation_ref + + +@require_context +def model_evaluation_create(context, model_evaluation_values): + values = copy.deepcopy(model_evaluation_values) + values = ensure_dict_has_id(values) + + session = get_session() + model_evaluation_ref = models.Model_Evaluation() + model_evaluation_ref.update(values) + + with session.begin(): + model_evaluation_ref.save(session=session) + + # NOTE(u_glide): Do so to prevent errors with relationships + return model_evaluation_get(context, model_evaluation_ref['id'], session=session) + + +def _model_evaluation_get_all_with_filters(context, project_id=None, filters=None, + sort_key=None, sort_dir=None): + if not sort_key: + sort_key = 'created_at' + if not sort_dir: + sort_dir = 'desc' + query = ( + _model_evaluation_get_query(context).join() + ) + + # Apply filters + if not filters: + filters = {} + + # Apply sorting + if sort_dir.lower() not in ('desc', 'asc'): + msg = _("Wrong sorting data provided: sort key is '%(sort_key)s' " + "and sort direction is '%(sort_dir)s'.") % { + "sort_key": sort_key, "sort_dir": sort_dir} + raise exception.InvalidInput(reason=msg) + + def apply_sorting(model_evaluation, query): + sort_attr = getattr(model_evaluation, sort_key) + sort_method = getattr(sort_attr, sort_dir.lower()) + return query.order_by(sort_method()) + + try: + query = apply_sorting(models.Model_Evaluation, query) + except AttributeError: + msg = _("Wrong sorting key provided - '%s'.") % sort_key + raise exception.InvalidInput(reason=msg) + + # Returns list of model_evaluations that satisfy filters. + query = query.all() + return query + + +@require_context +def model_evaluation_get_all_by_project(context, project_id, filters=None, + sort_key=None, sort_dir=None): + """Returns list of model_evaluations with given project ID.""" + query = _model_evaluation_get_all_with_filters( + context, project_id=project_id, filters=filters, + sort_key=sort_key, sort_dir=sort_dir, + ) + return query + + +@require_context +def model_evaluation_delete(context, model_evaluation_id): + session = get_session() + + with session.begin(): + model_evaluation_ref = model_evaluation_get(context, model_evaluation_id, session) + model_evaluation_ref.soft_delete(session=session) + + +# + + def _learning_get_query(context, session=None): if session is None: session = get_session() diff --git a/meteos/db/sqlalchemy/models.py b/meteos/db/sqlalchemy/models.py index 3ea9682..4a266d9 100644 --- a/meteos/db/sqlalchemy/models.py +++ b/meteos/db/sqlalchemy/models.py @@ -174,6 +174,30 @@ class Model(BASE, MeteosBase): stderr = Column(Text) +class Model_Evaluation(BASE, MeteosBase): + + __tablename__ = 'model_evaluations' + id = Column(String(36), primary_key=True) + model_id = Column(String(36)) + model_type = Column(String(255)) + source_dataset_url = Column(String(255)) + dataset_format = Column(String(255)) + cluster_id = Column(String(36)) + job_id = Column(String(36)) + + deleted = Column(String(36), default='False') + user_id = Column(String(255)) + project_id = Column(String(255)) + + display_name = Column(String(255)) + + status = Column(String(255)) + launched_at = Column(DateTime) + + stdout = Column(Text) + stderr = Column(Text) + + class Learning(BASE, MeteosBase): __tablename__ = 'learnings' diff --git a/meteos/engine/api.py b/meteos/engine/api.py index 1587506..368b886 100644 --- a/meteos/engine/api.py +++ b/meteos/engine/api.py @@ -444,6 +444,99 @@ class API(base.Base): updates = {'status': constants.STATUS_ERROR} self.db.model_update(context, id, updates) + def get_all_model_evaluations(self, context, search_opts=None, + sort_key='created_at', sort_dir='desc'): + policy.check_policy(context, 'model_evaluation', 'get_all') + + if search_opts is None: + search_opts = {} + + LOG.debug("Searching for model evaluations by: %s", six.text_type(search_opts)) + + project_id = context.project_id + + model_evaluations = self.db.model_evaluation_get_all_by_project(context, + project_id, + sort_key=sort_key, + sort_dir=sort_dir) + + if search_opts: + results = [] + for s in model_evaluations: + # values in search_opts can be only strings + if all(s.get(k, None) == v for k, v in search_opts.items()): + results.append(s) + model_evaluations = results + return model_evaluations + + def get_model_evaluation(self, context, model_evaluation_id): + rv = self.db.model_evaluation_get(context, model_evaluation_id) + return rv + + def create_model_evaluation(self, context, name, source_dataset_url, + model_id, model_type, dataset_format, template_id, + job_template_id, experiment_id, cluster_id, + swift_tenant, swift_username, swift_password): + """Create a Model Evaluation""" + policy.check_policy(context, 'model_evaluation', 'create') + + model_evaluation = {'id': None, + 'display_name': name, + 'model_id': model_id, + 'model_type': model_type, + 'source_dataset_url': source_dataset_url, + 'dataset_format': dataset_format, + 'user_id': context.user_id, + 'project_id': context.project_id, + 'cluster_id': cluster_id + } + + try: + result = self.db.model_evaluation_create(context, model_evaluation) + result['template_id'] = template_id + result['job_template_id'] = job_template_id + result['swift_tenant'] = swift_tenant + result['swift_username'] = swift_username + result['swift_password'] = swift_password + + self.engine_rpcapi.create_model_evaluation(context, result) + updates = {'status': constants.STATUS_CREATING} + self.db.model_evaluation_update(context, + result['id'], + updates) + + LOG.info(_LI("Accepted creation of model evaluation %s."), result['id']) + except Exception: + with excutils.save_and_reraise_exception(): + self.db.model_evaluation_delete(context, result['id']) + + # Retrieve the model_evaluation with instance details + model_evaluation = self.db.model_evaluation_get(context, result['id']) + + return model_evaluation + + def delete_model_evaluation(self, context, id, force=False): + """Delete model evaluation.""" + + policy.check_policy(context, 'model_evaluation', 'delete') + + model_evaluation = self.db.model_evaluation_get(context, id) + + statuses = (constants.STATUS_AVAILABLE, constants.STATUS_ERROR, + constants.STATUS_INACTIVE) + if not (force or model_evaluation['status'] in statuses): + msg = _("Model Evaluation status must be one of %(statuses)s") % { + "statuses": statuses} + raise exception.InvalidLearning(reason=msg) + + if model_evaluation.job_id: + self.engine_rpcapi.delete_model_evaluation(context, + model_evaluation['cluster_id'], + model_evaluation['job_id'], + id) + else: + self.db.model_evaluation_delete(context, id) + def get_all_learnings(self, context, search_opts=None, sort_key='created_at', sort_dir='desc'): policy.check_policy(context, 'learning', 'get_all') diff --git a/meteos/engine/drivers/generic.py b/meteos/engine/drivers/generic.py index 7ab5b45..8e5d6b9 100644 --- a/meteos/engine/drivers/generic.py +++ b/meteos/engine/drivers/generic.py @@ -504,6 +504,48 @@ class GenericLearningDriver(driver.LearningDriver): self._wait_for_model_to_load(ip, port, unload=True) + def create_model_evaluation(self, context, request_specs): + """Create Model Evaluation.""" + + job_args = {} + + job_template_id = request_specs['job_template_id'] + cluster_id = request_specs['cluster_id'] + source_dataset_url = request_specs['source_dataset_url'] + + job_args['method'] = 'evaluate_model' + job_args['dataset_format'] = request_specs['dataset_format'] + job_args['source_dataset_url'] = source_dataset_url + + model_args = {'type': request_specs['model_type']} + job_args['model'] = model_args + + swift_args = {} + + if source_dataset_url.count('swift'): + swift_args['tenant'] = request_specs['swift_tenant'] + swift_args['username'] = request_specs['swift_username'] + swift_args['password'] = request_specs['swift_password'] + + job_args['swift'] = swift_args + + LOG.debug("Execute job with args: %s", job_args) + + configs = {'configs': {'edp.java.main_class': 'sahara.dummy', + 'edp.spark.adapt_for_swift': True}, + 'args': [request_specs['model_id'], + base64.b64encode(str(job_args))]} + + result = self.cluster_api.job_create( + context, job_template_id, cluster_id, configs) + + return result + + def delete_model_evaluation(self, context, cluster_id, job_id, id): + """Delete Model Evaluation.""" + + self.cluster_api.job_delete(context, job_id) + def create_learning(self, context, request_specs): """Create Learning.""" diff --git a/meteos/engine/manager.py b/meteos/engine/manager.py index f0ea994..98f648e 100644 --- a/meteos/engine/manager.py +++ b/meteos/engine/manager.py @@ -106,6 +106,10 @@ class LearningManager(manager.Manager): updates['stdout'] = stdout self.db.model_update(context, id, updates) + elif resource_name == 'Model Evaluation': + updates['stdout'] = stdout.rstrip('\n') + self.db.model_evaluation_update(context, id, updates) + elif resource_name == 'Learning': updates['stdout'] = stdout.rstrip('\n') self.db.learning_update(context, id, updates) @@ -347,6 +351,50 @@ class LearningManager(manager.Manager): request_spec['id'], {'status' : constants.STATUS_AVAILABLE}) + def create_model_evaluation(self, context, request_spec=None): + """Create a Model Evaluation.""" + context = context.elevated() + + LOG.debug("Create model evaluation with request: %s", request_spec) + + try: + job_id = self.driver.create_model_evaluation(context, request_spec) + stdout, stderr = self.driver.get_job_result( + context, + job_id, + request_spec['template_id'], + request_spec['cluster_id']) + + except Exception as e: + with excutils.save_and_reraise_exception(): + LOG.error(_LE("Model Evaluation %s failed on creation."), + request_spec['id']) + self.db.model_evaluation_update( + context, request_spec['id'], + {'status': constants.STATUS_ERROR} + ) + + self._update_status(context, 'Model Evaluation', request_spec['id'], + job_id, stdout, stderr) + + def delete_model_evaluation(self, context, cluster_id=None, job_id=None, id=None): + """Deletes a Model Evaluation.""" + context = context.elevated() + + try: + self.driver.delete_model_evaluation(context, cluster_id, job_id, id) + + except Exception as e: + with excutils.save_and_reraise_exception(): + LOG.error(_LE("Model Evaluation %s failed on deletion."), id) + self.db.model_evaluation_update( + context, id, + {'status': constants.STATUS_ERROR_DELETING} + ) + + self.db.model_evaluation_delete(context, id) + LOG.info(_LI("Model Evaluation %s deleted successfully."), id) + def create_learning(self, context, request_spec=None): """Create a Learning.""" context = context.elevated() diff --git a/meteos/engine/rpcapi.py b/meteos/engine/rpcapi.py index e6dacc3..0db860e 100644 --- a/meteos/engine/rpcapi.py +++ b/meteos/engine/rpcapi.py @@ -116,6 +116,17 @@ class LearningAPI(object): return self.cast(context, self.make_msg('unload_model', request_spec=request_spec_p)) + def create_model_evaluation(self, context, request_spec): + request_spec_p = jsonutils.to_primitive(request_spec) + return self.cast(context, self.make_msg('create_model_evaluation', + request_spec=request_spec_p)) + + def delete_model_evaluation(self, context, cluster_id, job_id, id): + return self.call(context, self.make_msg('delete_model_evaluation', + cluster_id=cluster_id, + job_id=job_id, + id=id)) + def create_learning(self, context, request_spec): request_spec_p = jsonutils.to_primitive(request_spec) return self.cast(context, self.make_msg('create_learning',