Remove redundant expressions

Change-Id: Ia0a4833d4f67028921d5000a6793c213d2161f21
Author: Hiroyuki Eguchi 2017-01-21 12:07:12 +09:00
Parent: b9fbbba8b7
Commit: 10d6362b61
2 changed files with 72 additions and 149 deletions
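This change collapses five near-identical Sahara job-submission bodies in the generic driver into a single `_execute_job` helper, and has the manager reuse a local id variable instead of repeatedly indexing `request_spec`. As a hypothetical caller-side sketch (the `driver` object and the exact `request_specs` keys are assumptions inferred from the fields the driver reads below, not part of this commit):

    # Illustrative only: build a request and let the driver delegate to
    # the new _execute_job helper.
    request_specs = {
        'id': 'model-0001',
        'job_template_id': 'jt-0001',
        'cluster_id': 'cluster-0001',
        'model_type': 'LinearRegression',
        'model_params': '{}',
    }
    driver.create_model(context, request_specs)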

@@ -151,6 +151,51 @@ class GenericLearningDriver(driver.LearningDriver):
         except Exception:
             pass
 
+    def _execute_job(self, context, method, request_specs):
+        """Execute Sahara Job."""
+        job_args = {}
+
+        job_template_id = request_specs.get('job_template_id')
+        cluster_id = request_specs.get('cluster_id')
+
+        job_args['method'] = method
+
+        # Set parameters of DataSet
+        job_args['source_dataset_url'] = request_specs.get('source_dataset_url')
+        job_args['dataset_format'] = request_specs.get('dataset_format')
+        dataset_args = {'params': request_specs.get('params')}
+        job_args['dataset'] = dataset_args
+
+        # Set parameters of Swift
+        swift_args = {}
+        swift_args['tenant'] = request_specs.get('swift_tenant')
+        swift_args['username'] = request_specs.get('swift_username')
+        swift_args['password'] = request_specs.get('swift_password')
+        job_args['swift'] = swift_args
+
+        # Set parameters of Model
+        model_args = {'type': request_specs.get('model_type'),
+                      'port': request_specs.get('port'),
+                      'params': request_specs.get('model_params')}
+        job_args['model'] = model_args
+
+        # Set parameters of Learning
+        learning_args = {'params': request_specs.get('args')}
+        job_args['learning'] = learning_args
+
+        LOG.debug("Execute %s job with args: %s", method, job_args)
+
+        configs = {'configs': {'edp.java.main_class': 'sahara.dummy',
+                               'edp.spark.adapt_for_swift': True},
+                   'args': [request_specs.get('id'),
+                            base64.b64encode(str(job_args))]}
+
+        result = self.cluster_api.job_create(
+            context, job_template_id, cluster_id, configs)
+
+        return result
+
     def create_template(self, context, request_specs):
         """Creates Template."""
@@ -331,41 +376,11 @@ class GenericLearningDriver(driver.LearningDriver):
     def create_dataset(self, context, request_specs):
         """Create Dataset."""
-        job_args = {}
-        job_template_id = request_specs['job_template_id']
-        cluster_id = request_specs['cluster_id']
-        source_dataset_url = request_specs['source_dataset_url']
-        job_args['method'] = request_specs['method'] + '_dataset'
-        job_args['source_dataset_url'] = source_dataset_url
-        dataset_args = {'params': request_specs['params']}
-        job_args['dataset'] = dataset_args
-        swift_args = {}
-        if source_dataset_url.count('swift'):
-            swift_args['tenant'] = request_specs['swift_tenant']
-            swift_args['username'] = request_specs['swift_username']
-            swift_args['password'] = request_specs['swift_password']
-        job_args['swift'] = swift_args
-        model_args = {'type': None}
-        job_args['model'] = model_args
-        LOG.debug("Execute job with args: %s", job_args)
-        configs = {'configs': {'edp.java.main_class': 'sahara.dummy',
-                               'edp.spark.adapt_for_swift': True},
-                   'args': [request_specs['id'],
-                            base64.b64encode(str(job_args))]}
-        result = self.cluster_api.job_create(
-            context, job_template_id, cluster_id, configs)
-        return result
+        method = request_specs['method'] + '_dataset'
+        return self._execute_job(context,
+                                 method,
+                                 request_specs)
 
     def delete_dataset(self, context, cluster_id, job_id, id):
         """Delete Dataset."""
@@ -378,40 +393,9 @@ class GenericLearningDriver(driver.LearningDriver):
     def create_model(self, context, request_specs):
         """Create Model."""
-        job_args = {}
-        job_template_id = request_specs['job_template_id']
-        cluster_id = request_specs['cluster_id']
-        source_dataset_url = request_specs['source_dataset_url']
-        job_args['method'] = 'create_model'
-        job_args['source_dataset_url'] = source_dataset_url
-        job_args['dataset_format'] = request_specs['dataset_format']
-        model_args = {'type': request_specs['model_type'],
-                      'params': request_specs['model_params']}
-        job_args['model'] = model_args
-        swift_args = {}
-        if source_dataset_url.count('swift'):
-            swift_args['tenant'] = request_specs['swift_tenant']
-            swift_args['username'] = request_specs['swift_username']
-            swift_args['password'] = request_specs['swift_password']
-        job_args['swift'] = swift_args
-        LOG.debug("Execute job with args: %s", job_args)
-        configs = {'configs': {'edp.java.main_class': 'sahara.dummy',
-                               'edp.spark.adapt_for_swift': True},
-                   'args': [request_specs['id'],
-                            base64.b64encode(str(job_args))]}
-        result = self.cluster_api.job_create(
-            context, job_template_id, cluster_id, configs)
-        return result
+        return self._execute_job(context,
+                                 'create_model',
+                                 request_specs)
 
     def delete_model(self, context, cluster_id, job_id, id):
         """Delete Model."""
@@ -461,29 +445,13 @@ class GenericLearningDriver(driver.LearningDriver):
     def load_model(self, context, request_specs):
         """Load Model."""
-        job_args = {}
-        job_template_id = request_specs['job_template_id']
-        cluster_id = request_specs['cluster_id']
+        cluster_id = request_specs.get('cluster_id')
         ip = self._get_master_ip(context, cluster_id)
         port = request_specs['port']
-        job_args['method'] = 'online_predict'
-        job_args['dataset_format'] = request_specs['dataset_format']
-        model_args = {'type': request_specs['model_type'],
-                      'port': port}
-        job_args['model'] = model_args
-        LOG.debug("Execute job with args: %s", job_args)
-        configs = {'configs': {'edp.java.main_class': 'sahara.dummy',
-                               'edp.spark.adapt_for_swift': True},
-                   'args': [request_specs['id'],
-                            base64.b64encode(str(job_args))]}
-        result = self.cluster_api.job_create(
-            context, job_template_id, cluster_id, configs)
+        result = self._execute_job(context,
+                                   'online_predict',
+                                   request_specs)
 
         self._wait_for_model_to_load(ip, port)
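`load_model` still resolves the master node address before delegating, then blocks in `_wait_for_model_to_load(ip, port)` until the online-prediction server answers. That helper is outside this diff; a generic sketch of such a poll follows (the function name, timeout, and interval are assumptions, not the driver's actual code):

    import socket
    import time

    def wait_for_port(ip, port, timeout=300, interval=5):
        """Poll until a TCP server accepts connections on (ip, port)."""
        deadline = time.time() + timeout
        while time.time() < deadline:
            try:
                sock = socket.create_connection((ip, port), timeout=interval)
                sock.close()
                return
            except (socket.error, socket.timeout):
                time.sleep(interval)
        raise RuntimeError('server %s:%s did not come up' % (ip, port))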
@@ -507,39 +475,11 @@ class GenericLearningDriver(driver.LearningDriver):
     def create_model_evaluation(self, context, request_specs):
         """Create Model Evaluation."""
-        job_args = {}
-        job_template_id = request_specs['job_template_id']
-        cluster_id = request_specs['cluster_id']
-        source_dataset_url = request_specs['source_dataset_url']
-        job_args['method'] = 'evaluate_model'
-        job_args['dataset_format'] = request_specs['dataset_format']
-        job_args['source_dataset_url'] = source_dataset_url
-        model_args = {'type': request_specs['model_type']}
-        job_args['model'] = model_args
-        swift_args = {}
-        if source_dataset_url.count('swift'):
-            swift_args['tenant'] = request_specs['swift_tenant']
-            swift_args['username'] = request_specs['swift_username']
-            swift_args['password'] = request_specs['swift_password']
-        job_args['swift'] = swift_args
-        LOG.debug("Execute job with args: %s", job_args)
-        configs = {'configs': {'edp.java.main_class': 'sahara.dummy',
-                               'edp.spark.adapt_for_swift': True},
-                   'args': [request_specs['model_id'],
-                            base64.b64encode(str(job_args))]}
-        result = self.cluster_api.job_create(
-            context, job_template_id, cluster_id, configs)
-        return result
+        request_specs['id'] = request_specs['model_id']
+        return self._execute_job(context,
+                                 'evaluate_model',
+                                 request_specs)
 
     def delete_model_evaluation(self, context, cluster_id, job_id, id):
         """Delete Model Evaluation."""
@@ -549,30 +489,11 @@ class GenericLearningDriver(driver.LearningDriver):
     def create_learning(self, context, request_specs):
         """Create Learning."""
-        job_args = {}
-        job_template_id = request_specs['job_template_id']
-        cluster_id = request_specs['cluster_id']
-        job_args['method'] = request_specs['method']
-        job_args['dataset_format'] = request_specs['dataset_format']
-        model_args = {'type': request_specs['model_type']}
-        job_args['model'] = model_args
-        learning_args = {'params': request_specs['args']}
-        job_args['learning'] = learning_args
-        LOG.debug("Execute job with args: %s", job_args)
-        configs = {'configs': {'edp.java.main_class': 'sahara.dummy',
-                               'edp.spark.adapt_for_swift': True},
-                   'args': [request_specs['model_id'],
-                            base64.b64encode(str(job_args))]}
-        result = self.cluster_api.job_create(
-            context, job_template_id, cluster_id, configs)
-        return result
+        request_specs['id'] = request_specs['model_id']
+        return self._execute_job(context,
+                                 request_specs['method'],
+                                 request_specs)
 
     def create_online_learning(self, context, request_specs):
         """Create Learning."""

@@ -355,6 +355,7 @@ class LearningManager(manager.Manager):
         """Create a Model Evaluation."""
         context = context.elevated()
+        model_evaluation_id = request_spec['id']
 
         LOG.debug("Create model evaluation with request: %s", request_spec)
 
         try:
@@ -368,13 +369,13 @@ class LearningManager(manager.Manager):
         except Exception as e:
             with excutils.save_and_reraise_exception():
                 LOG.error(_LE("Model Evaluation %s failed on creation."),
-                          request_spec['id'])
+                          model_evaluation_id)
                 self.db.model_evaluation_update(
-                    context, request_spec['id'],
+                    context, model_evaluation_id,
                     {'status': constants.STATUS_ERROR}
                 )
 
-        self._update_status(context, 'Model Evaluation', request_spec['id'],
+        self._update_status(context, 'Model Evaluation', model_evaluation_id,
                             job_id, stdout, stderr)
 
     def delete_model_evaluation(self, context, cluster_id=None, job_id=None,
                                 id=None):
@@ -399,6 +400,7 @@ class LearningManager(manager.Manager):
         """Create a Learning."""
         context = context.elevated()
+        learning_id = request_spec['id']
 
         LOG.debug("Create learning with request: %s", request_spec)
 
         try:
@@ -412,13 +414,13 @@ class LearningManager(manager.Manager):
         except Exception as e:
             with excutils.save_and_reraise_exception():
                 LOG.error(_LE("Learning %s failed on creation."),
-                          request_spec['id'])
+                          learning_id)
                 self.db.learning_update(
-                    context, request_spec['id'],
+                    context, learning_id,
                     {'status': constants.STATUS_ERROR}
                 )
 
-        self._update_status(context, 'Learning', request_spec['id'],
+        self._update_status(context, 'Learning', learning_id,
                             job_id, stdout, stderr)
 
     def create_online_learning(self, context, request_spec=None):
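Both manager paths above share one error-handling shape: mark the row as failed inside `excutils.save_and_reraise_exception()` so the database reflects the error while the original traceback still propagates. Reduced to a skeleton (the `driver` and `db` parameters here stand in for the manager's attributes; this is a sketch, not the full method):

    from oslo_utils import excutils

    def create_learning(context, request_spec, driver, db):
        learning_id = request_spec['id']
        try:
            return driver.create_learning(context, request_spec)
        except Exception:
            with excutils.save_and_reraise_exception():
                # Record the failure before the exception is re-raised.
                db.learning_update(context, learning_id,
                                   {'status': 'error'})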