Merge "Add OS::Sahara::Job resource"

This commit is contained in:
Jenkins 2016-12-28 04:32:00 +00:00 committed by Gerrit Code Review
commit 12091a6c98
3 changed files with 438 additions and 1 deletion


@@ -0,0 +1,259 @@
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import six

from heat.common import exception
from heat.common.i18n import _
from heat.engine import attributes
from heat.engine import constraints
from heat.engine import properties
from heat.engine import resource
from heat.engine.resources import signal_responder
from heat.engine import support

# NOTE(tlashchova): copied from sahara/utils/api_validator.py
SAHARA_NAME_REGEX = r"^[a-zA-Z0-9][a-zA-Z0-9\-_\.]*$"


class SaharaJob(signal_responder.SignalResponder, resource.Resource):
"""A resource for creating Sahara Job.
A job specifies the type of the job and lists all of the individual
job binary objects. Can be launched using resource-signal.
"""

    support_status = support.SupportStatus(version='8.0.0')

    PROPERTIES = (
NAME, TYPE, MAINS, LIBS, DESCRIPTION,
DEFAULT_EXECUTION_DATA, IS_PUBLIC, IS_PROTECTED
) = (
'name', 'type', 'mains', 'libs', 'description',
'default_execution_data', 'is_public', 'is_protected'
)

    _EXECUTION_DATA_KEYS = (
CLUSTER, INPUT, OUTPUT, CONFIGS, PARAMS, ARGS,
IS_PUBLIC, INTERFACE
) = (
'cluster', 'input', 'output', 'configs', 'params', 'args',
'is_public', 'interface'
)

    ATTRIBUTES = (
EXECUTIONS, DEFAULT_EXECUTION_URL
) = (
'executions', 'default_execution_url'
)

    JOB_TYPES = ['Hive', 'Java', 'MapReduce', 'MapReduce.Streaming',
'Pig', 'Shell', 'Spark', 'Storm', 'Storm.Pyleus']

    properties_schema = {
NAME: properties.Schema(
properties.Schema.STRING,
_("Name of the job."),
constraints=[
constraints.Length(min=1, max=50),
constraints.AllowedPattern(SAHARA_NAME_REGEX),
],
update_allowed=True
),
TYPE: properties.Schema(
properties.Schema.STRING,
_("Type of the job."),
constraints=[constraints.AllowedValues(JOB_TYPES)],
required=True
),
MAINS: properties.Schema(
properties.Schema.LIST,
_("IDs of job's main job binary. In case of specific Sahara "
"service, this property designed as a list, but accepts only "
"one item."),
schema=properties.Schema(
properties.Schema.STRING,
_("ID of job's main job binary.")
),
constraints=[
constraints.Length(max=1)
],
default=[]
),
LIBS: properties.Schema(
properties.Schema.LIST,
_("IDs of job's lib job binaries."),
schema=properties.Schema(
properties.Schema.STRING
),
default=[]
),
DESCRIPTION: properties.Schema(
properties.Schema.STRING,
_('Description of the job.'),
update_allowed=True
),
IS_PUBLIC: properties.Schema(
properties.Schema.BOOLEAN,
_('If True, job will be shared across the tenants.'),
update_allowed=True,
default=False
),
IS_PROTECTED: properties.Schema(
properties.Schema.BOOLEAN,
            _('If True, job will be protected from modifications and '
              'cannot be deleted until this property is set to False.'),
update_allowed=True,
default=False
),
DEFAULT_EXECUTION_DATA: properties.Schema(
properties.Schema.MAP,
            _('Default execution data to use when the job is run via '
              'signal.'),
schema={
CLUSTER: properties.Schema(
properties.Schema.STRING,
_('ID of the cluster to run the job in.'),
required=True
),
INPUT: properties.Schema(
properties.Schema.STRING,
_('ID of the input data source.')
),
OUTPUT: properties.Schema(
properties.Schema.STRING,
_('ID of the output data source.')
),
CONFIGS: properties.Schema(
properties.Schema.MAP,
_('Config parameters to add to the job.'),
default={}
),
PARAMS: properties.Schema(
properties.Schema.MAP,
_('Parameters to add to the job.'),
default={}
),
ARGS: properties.Schema(
properties.Schema.LIST,
_('Arguments to add to the job.'),
schema=properties.Schema(
properties.Schema.STRING,
),
default=[]
),
IS_PUBLIC: properties.Schema(
properties.Schema.BOOLEAN,
_('If True, execution will be shared across the tenants.'),
default=False
),
INTERFACE: properties.Schema(
properties.Schema.MAP,
_('Interface arguments to add to the job.'),
default={}
)
},
update_allowed=True
)
}

    attributes_schema = {
DEFAULT_EXECUTION_URL: attributes.Schema(
_("A signed url to create execution specified in "
"default_execution_data property."),
type=attributes.Schema.STRING
),
EXECUTIONS: attributes.Schema(
_("List of the job executions."),
type=attributes.Schema.LIST
)
}

    default_client_name = 'sahara'
entity = 'jobs'

    def handle_create(self):
args = {
'name': self.properties[
self.NAME] or self.physical_resource_name(),
'type': self.properties[self.TYPE],
# Note: sahara accepts only one main binary but schema demands
# that it should be in a list.
'mains': self.properties[self.MAINS],
'libs': self.properties[self.LIBS],
'description': self.properties[self.DESCRIPTION],
'is_public': self.properties[self.IS_PUBLIC],
'is_protected': self.properties[self.IS_PROTECTED]
}
job = self.client().jobs.create(**args)
self.resource_id_set(job.id)

    def handle_update(self, json_snippet, tmpl_diff, prop_diff):
if self.NAME in prop_diff:
name = prop_diff[self.NAME] or self.physical_resource_name()
prop_diff[self.NAME] = name
        if self.DEFAULT_EXECUTION_DATA in prop_diff:
            # default_execution_data is only consumed by handle_signal;
            # it is not a Sahara job attribute, so drop it from the
            # API update.
            del prop_diff[self.DEFAULT_EXECUTION_DATA]
if prop_diff:
self.client().jobs.update(self.resource_id, **prop_diff)

    def handle_signal(self, details):
        # Use the signal details when provided, otherwise fall back to
        # the default_execution_data property.
        data = details or self.properties.get(self.DEFAULT_EXECUTION_DATA)
execution_args = {
'job_id': self.resource_id,
'cluster_id': data.get(self.CLUSTER),
'input_id': data.get(self.INPUT),
'output_id': data.get(self.OUTPUT),
'is_public': data.get(self.IS_PUBLIC),
'interface': data.get(self.INTERFACE),
'configs': {
'configs': data.get(self.CONFIGS),
'params': data.get(self.PARAMS),
'args': data.get(self.ARGS)
},
'is_protected': False
}
try:
self.client().job_executions.create(**execution_args)
except Exception as ex:
raise exception.ResourceFailure(ex, self)

    def handle_delete(self):
if self.resource_id is None:
return
with self.client_plugin().ignore_not_found:
job_exs = self.client().job_executions.find(id=self.resource_id)
for ex in job_exs:
self.client().job_executions.delete(ex.id)
super(SaharaJob, self).handle_delete()

    def _resolve_attribute(self, name):
if name == self.DEFAULT_EXECUTION_URL:
return six.text_type(self._get_ec2_signed_url())
elif name == self.EXECUTIONS:
try:
job_execs = self.client().job_executions.find(
id=self.resource_id)
except Exception:
return []
return [execution.to_dict() for execution in job_execs]


def resource_mapping():
return {
'OS::Sahara::Job': SaharaJob
}
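
As a usage sketch (not part of this change; all IDs are hypothetical placeholders), a template could declare the job and expose the signed signal URL through the default_execution_url attribute:

heat_template_version: newton
resources:
  job:
    type: OS::Sahara::Job
    properties:
      type: Pig
      libs: [ my-lib-binary-id ]
      default_execution_data:
        cluster: my-cluster-id
outputs:
  signal_url:
    value: { get_attr: [job, default_execution_url] }

Signalling the resource (or POSTing to that URL) invokes handle_signal, which creates a job execution from default_execution_data when the signal carries no details of its own.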


@@ -32,7 +32,6 @@ class JobBinary(resource.Resource):
PROPERTIES = (
NAME, URL, DESCRIPTION, CREDENTIALS
) = (
'name', 'url', 'description', 'credentials'
)


@@ -0,0 +1,179 @@
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import mock

from heat.common import template_format
from heat.engine.clients.os import sahara
from heat.engine.resources.openstack.sahara import job
from heat.engine import scheduler
from heat.tests import common
from heat.tests import utils

job_template = """
heat_template_version: newton
resources:
job:
type: OS::Sahara::Job
properties:
name: test_name_job
type: MapReduce
libs: [ fake-lib-id ]
description: test_description
is_public: True
default_execution_data:
cluster: fake-cluster-id
input: fake-input-id
output: fake-output-id
is_public: True
configs:
mapred.map.class: org.apache.oozie.example.SampleMapper
mapred.reduce.class: org.apache.oozie.example.SampleReducer
mapreduce.framework.name: yarn
"""


class SaharaJobTest(common.HeatTestCase):
def setUp(self):
super(SaharaJobTest, self).setUp()
t = template_format.parse(job_template)
self.stack = utils.parse_stack(t)
resource_defns = self.stack.t.resource_definitions(self.stack)
self.rsrc_defn = resource_defns['job']
self.client = mock.Mock()
self.patchobject(job.SaharaJob, 'client', return_value=self.client)
fake_execution = mock.Mock()
fake_execution.job_id = 'fake-resource-id'
fake_execution.id = 'fake-execution-id'
fake_execution.to_dict.return_value = {'job_id': 'fake-resource-id',
'id': 'fake-execution-id'}
self.client.job_executions.find.return_value = [fake_execution]

    def _create_resource(self, name, snippet, stack):
jb = job.SaharaJob(name, snippet, stack)
value = mock.MagicMock(id='fake-resource-id')
self.client.jobs.create.return_value = value
scheduler.TaskRunner(jb.create)()
return jb

    def test_create(self):
jb = self._create_resource('job', self.rsrc_defn, self.stack)
args = self.client.jobs.create.call_args[1]
expected_args = {
'name': 'test_name_job',
'type': 'MapReduce',
'libs': ['fake-lib-id'],
'description': 'test_description',
'is_public': True,
'is_protected': False,
'mains': []
}
self.assertEqual(expected_args, args)
self.assertEqual('fake-resource-id', jb.resource_id)
expected_state = (jb.CREATE, jb.COMPLETE)
self.assertEqual(expected_state, jb.state)

    def test_create_without_name_passed(self):
props = self.stack.t.t['resources']['job']['properties']
del props['name']
self.rsrc_defn = self.rsrc_defn.freeze(properties=props)
jb = job.SaharaJob('job', self.rsrc_defn, self.stack)
value = mock.MagicMock(id='fake-resource-id')
self.client.jobs.create.return_value = value
jb.physical_resource_name = mock.Mock(return_value='fake_phys_name')
scheduler.TaskRunner(jb.create)()
args = self.client.jobs.create.call_args[1]
expected_args = {
'name': 'fake_phys_name',
'type': 'MapReduce',
'libs': ['fake-lib-id'],
'description': 'test_description',
'is_public': True,
'is_protected': False,
'mains': []
}
self.assertEqual(expected_args, args)
self.assertEqual('fake-resource-id', jb.resource_id)
expected_state = (jb.CREATE, jb.COMPLETE)
self.assertEqual(expected_state, jb.state)

    def test_delete(self):
jb = self._create_resource('job', self.rsrc_defn, self.stack)
scheduler.TaskRunner(jb.delete)()
self.assertEqual((jb.DELETE, jb.COMPLETE), jb.state)
self.client.jobs.delete.assert_called_once_with(jb.resource_id)
self.client.job_executions.delete.assert_called_once_with(
'fake-execution-id')

    def test_delete_not_found(self):
jb = self._create_resource('job', self.rsrc_defn, self.stack)
self.client.jobs.delete.side_effect = (
sahara.sahara_base.APIException(error_code=404))
scheduler.TaskRunner(jb.delete)()
self.assertEqual((jb.DELETE, jb.COMPLETE), jb.state)
self.client.jobs.delete.assert_called_once_with(jb.resource_id)
self.client.job_executions.delete.assert_called_once_with(
'fake-execution-id')

    def test_delete_job_executions_raises_error(self):
jb = self._create_resource('job', self.rsrc_defn, self.stack)
        self.client.job_executions.find.side_effect = [
            sahara.sahara_base.APIException(error_code=400)]
self.assertRaises(sahara.sahara_base.APIException, jb.handle_delete)

    def test_update(self):
jb = self._create_resource('job', self.rsrc_defn, self.stack)
props = self.stack.t.t['resources']['job']['properties'].copy()
props['name'] = 'test_name_job_new'
props['description'] = 'test_description_new'
props['is_public'] = False
self.rsrc_defn = self.rsrc_defn.freeze(properties=props)
scheduler.TaskRunner(jb.update, self.rsrc_defn)()
self.client.jobs.update.assert_called_once_with(
'fake-resource-id', name='test_name_job_new',
description='test_description_new', is_public=False)
self.assertEqual((jb.UPDATE, jb.COMPLETE), jb.state)

    def test_handle_signal(self):
jb = self._create_resource('job', self.rsrc_defn, self.stack)
scheduler.TaskRunner(jb.handle_signal, None)()
expected_args = {
'job_id': 'fake-resource-id',
'cluster_id': 'fake-cluster-id',
'input_id': 'fake-input-id',
'output_id': 'fake-output-id',
'is_public': True,
'is_protected': False,
'interface': {},
'configs': {
'configs': {
'mapred.reduce.class':
'org.apache.oozie.example.SampleReducer',
'mapred.map.class':
'org.apache.oozie.example.SampleMapper',
'mapreduce.framework.name': 'yarn'},
'args': [],
'params': {}
}
}
self.client.job_executions.create.assert_called_once_with(
**expected_args)

    def test_attributes(self):
jb = self._create_resource('job', self.rsrc_defn, self.stack)
jb._get_ec2_signed_url = mock.Mock(return_value='fake-url')
self.assertEqual('fake-execution-id',
jb.FnGetAtt('executions')[0]['id'])
self.assertEqual('fake-url', jb.FnGetAtt('default_execution_url'))
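
For completeness, a sketch of a signal body that overrides the defaults (IDs again hypothetical); the keys mirror _EXECUTION_DATA_KEYS, which handle_signal reads from the signal details:

cluster: other-cluster-id
input: other-input-id
output: other-output-id
configs:
  mapreduce.framework.name: yarn
params: {}
args: []
is_public: false
interface: {}

Note that handle_signal always creates the execution with is_protected set to False, regardless of the signal body.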