heat/heat/engine/resources/openstack/sahara/job.py

#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import six

from heat.common import exception
from heat.common.i18n import _
from heat.engine import attributes
from heat.engine import constraints
from heat.engine import properties
from heat.engine import resource
from heat.engine.resources import signal_responder
from heat.engine import support
from heat.engine import translation

# NOTE(tlashchova): copied from sahara/utils/api_validator.py
SAHARA_NAME_REGEX = r"^[a-zA-Z0-9][a-zA-Z0-9\-_\.]*$"


class SaharaJob(signal_responder.SignalResponder, resource.Resource):
"""A resource for creating Sahara Job.
A job specifies the type of the job and lists all of the individual
job binary objects. Can be launched using resource-signal.
"""
    support_status = support.SupportStatus(version='8.0.0')

    PROPERTIES = (
NAME, TYPE, MAINS, LIBS, DESCRIPTION,
DEFAULT_EXECUTION_DATA, IS_PUBLIC, IS_PROTECTED
) = (
'name', 'type', 'mains', 'libs', 'description',
'default_execution_data', 'is_public', 'is_protected'
    )

    _EXECUTION_DATA_KEYS = (
CLUSTER, INPUT, OUTPUT, CONFIGS, PARAMS, ARGS,
IS_PUBLIC, INTERFACE
) = (
'cluster', 'input', 'output', 'configs', 'params', 'args',
'is_public', 'interface'
    )

    ATTRIBUTES = (
EXECUTIONS, DEFAULT_EXECUTION_URL
) = (
'executions', 'default_execution_url'
    )

    properties_schema = {
NAME: properties.Schema(
properties.Schema.STRING,
_("Name of the job."),
constraints=[
constraints.Length(min=1, max=50),
constraints.AllowedPattern(SAHARA_NAME_REGEX),
],
update_allowed=True
),
TYPE: properties.Schema(
properties.Schema.STRING,
_("Type of the job."),
constraints=[
constraints.CustomConstraint('sahara.job_type')
],
required=True
),
MAINS: properties.Schema(
properties.Schema.LIST,
_("IDs or names of job's main job binary. In case of specific "
"Sahara service, this property designed as a list, but accepts "
"only one item."),
schema=properties.Schema(
properties.Schema.STRING,
_("ID of job's main job binary."),
constraints=[constraints.CustomConstraint('sahara.job_binary')]
),
constraints=[constraints.Length(max=1)],
default=[]
),
LIBS: properties.Schema(
properties.Schema.LIST,
_("IDs or names of job's lib job binaries."),
schema=properties.Schema(
properties.Schema.STRING,
constraints=[
constraints.CustomConstraint('sahara.job_binary')
]
),
default=[]
),
DESCRIPTION: properties.Schema(
properties.Schema.STRING,
_('Description of the job.'),
update_allowed=True
),
IS_PUBLIC: properties.Schema(
properties.Schema.BOOLEAN,
            _('If True, the job will be shared across tenants.'),
update_allowed=True,
default=False
),
IS_PROTECTED: properties.Schema(
properties.Schema.BOOLEAN,
            _('If True, the job will be protected from modification and '
              'cannot be deleted until this property is set to False.'),
update_allowed=True,
default=False
),
DEFAULT_EXECUTION_DATA: properties.Schema(
properties.Schema.MAP,
            _('Default execution data to use when the job is run via '
              'a resource signal.'),
schema={
CLUSTER: properties.Schema(
properties.Schema.STRING,
_('ID or name of the cluster to run the job in.'),
constraints=[
constraints.CustomConstraint('sahara.cluster')
],
required=True
),
INPUT: properties.Schema(
properties.Schema.STRING,
_('ID or name of the input data source.'),
constraints=[
constraints.CustomConstraint('sahara.data_source')
]
),
OUTPUT: properties.Schema(
properties.Schema.STRING,
_('ID or name of the output data source.'),
constraints=[
constraints.CustomConstraint('sahara.data_source')
]
),
CONFIGS: properties.Schema(
properties.Schema.MAP,
_('Config parameters to add to the job.'),
default={}
),
PARAMS: properties.Schema(
properties.Schema.MAP,
_('Parameters to add to the job.'),
default={}
),
ARGS: properties.Schema(
properties.Schema.LIST,
_('Arguments to add to the job.'),
schema=properties.Schema(
properties.Schema.STRING,
),
default=[]
),
IS_PUBLIC: properties.Schema(
properties.Schema.BOOLEAN,
                    _('If True, the execution will be shared across '
                      'tenants.'),
default=False
),
INTERFACE: properties.Schema(
properties.Schema.MAP,
_('Interface arguments to add to the job.'),
default={}
)
},
update_allowed=True
)
    }

    attributes_schema = {
DEFAULT_EXECUTION_URL: attributes.Schema(
_("A signed url to create execution specified in "
"default_execution_data property."),
type=attributes.Schema.STRING,
cache_mode=attributes.Schema.CACHE_NONE
),
EXECUTIONS: attributes.Schema(
_("List of the job executions."),
type=attributes.Schema.LIST
)
    }

    default_client_name = 'sahara'
    entity = 'jobs'

    def translation_rules(self, properties):
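        # Resolve job binary, cluster and data source names to IDs up front,
        # so that handle_create() and handle_signal() can pass the property
        # values straight to saharaclient.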
return [
translation.TranslationRule(
properties,
translation.TranslationRule.RESOLVE,
[self.MAINS],
client_plugin=self.client_plugin(),
finder='find_resource_by_name_or_id',
entity='job_binaries'
),
translation.TranslationRule(
properties,
translation.TranslationRule.RESOLVE,
[self.LIBS],
client_plugin=self.client_plugin(),
finder='find_resource_by_name_or_id',
entity='job_binaries'
),
translation.TranslationRule(
properties,
translation.TranslationRule.RESOLVE,
[self.DEFAULT_EXECUTION_DATA, self.CLUSTER],
client_plugin=self.client_plugin(),
finder='find_resource_by_name_or_id',
entity='clusters'
),
translation.TranslationRule(
properties,
translation.TranslationRule.RESOLVE,
[self.DEFAULT_EXECUTION_DATA, self.INPUT],
client_plugin=self.client_plugin(),
finder='find_resource_by_name_or_id',
entity='data_sources'
),
translation.TranslationRule(
properties,
translation.TranslationRule.RESOLVE,
[self.DEFAULT_EXECUTION_DATA, self.OUTPUT],
client_plugin=self.client_plugin(),
finder='find_resource_by_name_or_id',
entity='data_sources'
)
]

    def handle_create(self):
args = {
'name': self.properties[
self.NAME] or self.physical_resource_name(),
'type': self.properties[self.TYPE],
            # NOTE: Sahara accepts only one main binary, but the schema
            # requires it to be passed as a list.
'mains': self.properties[self.MAINS],
'libs': self.properties[self.LIBS],
'description': self.properties[self.DESCRIPTION],
'is_public': self.properties[self.IS_PUBLIC],
'is_protected': self.properties[self.IS_PROTECTED]
}
job = self.client().jobs.create(**args)
self.resource_id_set(job.id)

    def handle_update(self, json_snippet, tmpl_diff, prop_diff):
if self.NAME in prop_diff:
name = prop_diff[self.NAME] or self.physical_resource_name()
prop_diff[self.NAME] = name
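        # default_execution_data is only consumed when the job is signalled,
        # so changes to it do not require a Sahara API call.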
if self.DEFAULT_EXECUTION_DATA in prop_diff:
del prop_diff[self.DEFAULT_EXECUTION_DATA]
if prop_diff:
self.client().jobs.update(self.resource_id, **prop_diff)

    def handle_signal(self, details):
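        # Signal details take precedence over the default_execution_data
        # property; configs, params and args are grouped under a single
        # 'configs' key in the execution request.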
data = details or self.properties.get(self.DEFAULT_EXECUTION_DATA)
execution_args = {
'job_id': self.resource_id,
'cluster_id': data.get(self.CLUSTER),
'input_id': data.get(self.INPUT),
'output_id': data.get(self.OUTPUT),
'is_public': data.get(self.IS_PUBLIC),
'interface': data.get(self.INTERFACE),
'configs': {
'configs': data.get(self.CONFIGS),
'params': data.get(self.PARAMS),
'args': data.get(self.ARGS)
},
'is_protected': False
}
try:
self.client().job_executions.create(**execution_args)
except Exception as ex:
raise exception.ResourceFailure(ex, self)

    def handle_delete(self):
if self.resource_id is None:
return
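        # Remove any executions created from this job before deleting the
        # job itself.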
with self.client_plugin().ignore_not_found:
job_exs = self.client().job_executions.find(id=self.resource_id)
for ex in job_exs:
self.client().job_executions.delete(ex.id)
super(SaharaJob, self).handle_delete()

    def _resolve_attribute(self, name):
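        # default_execution_url is a pre-signed EC2-style URL that never
        # expires, so it can be stored and used later to signal this
        # resource; executions are fetched live from Sahara.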
if name == self.DEFAULT_EXECUTION_URL:
return six.text_type(self._get_ec2_signed_url(never_expire=True))
elif name == self.EXECUTIONS:
try:
job_execs = self.client().job_executions.find(
id=self.resource_id)
except Exception:
return []
return [execution.to_dict() for execution in job_execs]


def resource_mapping():
return {
'OS::Sahara::Job': SaharaJob
}
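

# A hypothetical sketch of triggering an execution from outside the stack,
# assuming a configured python-heatclient client; names and values are
# placeholders:
#
#   heat.resources.signal(stack_id='my_stack', resource_name='job',
#                         data={'cluster': 'cluster-id', 'args': ['arg1']})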