Merge "Allowing job binary objects to be updated"
This commit is contained in:
commit
00c4e3eda3
@ -56,6 +56,7 @@
|
||||
"data-processing:job-binaries:get": "",
|
||||
"data-processing:job-binaries:delete": "",
|
||||
"data-processing:job-binaries:get_data": "",
|
||||
"data-processing:job-binaries:modify": "",
|
||||
|
||||
"data-processing:job-binary-internals:get_all": "",
|
||||
"data-processing:job-binary-internals:create": "",
|
||||
|
@ -23,6 +23,7 @@ from sahara.service.validations.edp import data_source_schema as v_d_s_schema
|
||||
from sahara.service.validations.edp import job as v_j
|
||||
from sahara.service.validations.edp import job_binary as v_j_b
|
||||
from sahara.service.validations.edp import job_binary_internal as v_j_b_i
|
||||
from sahara.service.validations.edp import job_binary_schema as v_j_b_schema
|
||||
from sahara.service.validations.edp import job_execution as v_j_e
|
||||
import sahara.utils.api as u
|
||||
|
||||
@ -171,7 +172,7 @@ def job_types_get():
|
||||
|
||||
@rest.post('/job-binaries')
|
||||
@acl.enforce("data-processing:job-binaries:create")
|
||||
@v.validate(v_j_b.JOB_BINARY_SCHEMA, v_j_b.check_job_binary)
|
||||
@v.validate(v_j_b_schema.JOB_BINARY_SCHEMA, v_j_b.check_job_binary)
|
||||
def job_binary_create(data):
|
||||
return u.render(api.create_job_binary(data).to_wrapped_dict())
|
||||
|
||||
@ -208,6 +209,14 @@ def job_binary_data(job_binary_id):
|
||||
return data
|
||||
|
||||
|
||||
@rest.put('/job-binaries/<job_binary_id>')
|
||||
@acl.enforce("data-processing:job-binaries:modify")
|
||||
@v.validate(v_j_b_schema.JOB_BINARY_UPDATE_SCHEMA, v_j_b.check_job_binary)
|
||||
def job_binary_update(job_binary_id, data):
|
||||
return u.render(
|
||||
api.update_job_binary(job_binary_id, data).to_wrapped_dict())
|
||||
|
||||
|
||||
# Job binary internals ops
|
||||
|
||||
@rest.put_file('/job-binary-internals/<name>')
|
||||
|
@ -409,6 +409,11 @@ class LocalApi(object):
|
||||
"""Destroy the JobBinary or raise if it does not exist."""
|
||||
self._manager.job_binary_destroy(context, _get_id(job_binary))
|
||||
|
||||
@r.wrap(r.JobBinary)
|
||||
def job_binary_update(self, context, id, values):
|
||||
"""Update a JobBinary from the values dictionary."""
|
||||
return self._manager.job_binary_update(context, id, values)
|
||||
|
||||
# JobBinaryInternal ops
|
||||
|
||||
@r.wrap(r.JobBinaryInternal)
|
||||
|
@ -437,6 +437,13 @@ class ConductorManager(db_base.Base):
|
||||
"""Destroy the JobBinary or raise if it does not exist."""
|
||||
self.db.job_binary_destroy(context, job_binary)
|
||||
|
||||
def job_binary_update(self, context, id, values):
|
||||
"""Update a JobBinary from the values dictionary."""
|
||||
|
||||
values = copy.deepcopy(values)
|
||||
values['id'] = id
|
||||
return self.db.job_binary_update(context, values)
|
||||
|
||||
# JobBinaryInternal ops
|
||||
|
||||
def job_binary_internal_get_all(self, context, **kwargs):
|
||||
|
@ -422,6 +422,12 @@ def job_binary_destroy(context, job_binary):
|
||||
IMPL.job_binary_destroy(context, job_binary)
|
||||
|
||||
|
||||
@to_dict
|
||||
def job_binary_update(context, values):
|
||||
"""Update the JobBinary with the provided values"""
|
||||
return IMPL.job_binary_update(context, values)
|
||||
|
||||
|
||||
@to_dict
|
||||
def job_binary_internal_get_all(context, **kwargs):
|
||||
"""Get all JobBinaryInternals filtered by **kwargs.
|
||||
|
@ -966,8 +966,47 @@ def job_binary_create(context, values):
|
||||
return job_binary
|
||||
|
||||
|
||||
def _check_job_binary_referenced(ctx, session, job_binary_id):
|
||||
def job_binary_update(context, values):
|
||||
"""Returns a JobBinary updated with the provided values."""
|
||||
jb_id = values["id"]
|
||||
session = get_session()
|
||||
try:
|
||||
with session.begin():
|
||||
jb = _job_binary_get(context, session, jb_id)
|
||||
if not jb:
|
||||
raise ex.NotFoundException(
|
||||
jb_id, _("JobBinary id '%s' not found"))
|
||||
# We do not want to update the url for internal binaries
|
||||
new_url = values.get("url", None)
|
||||
if new_url and "internal-db://" in jb["url"]:
|
||||
if jb["url"] != new_url:
|
||||
raise ex.UpdateFailedException(
|
||||
jb_id,
|
||||
_("The url for JobBinary Id '%s' can not "
|
||||
"be updated because it is an internal-db url."))
|
||||
jobs = job_execution_get_all(context)
|
||||
pending_jobs = [job for job in jobs if
|
||||
job.info["status"] == "PENDING"]
|
||||
if len(pending_jobs) > 0:
|
||||
for job in pending_jobs:
|
||||
if _check_job_binary_referenced(
|
||||
context, session, jb_id, job.job_id):
|
||||
raise ex.UpdateFailedException(
|
||||
jb_id,
|
||||
_("JobBinary Id '%s' is used in a PENDING job "
|
||||
"and can not be updated."))
|
||||
jb.update(values)
|
||||
except db_exc.DBDuplicateEntry as e:
|
||||
raise ex.DBDuplicateEntry(
|
||||
_("Duplicate entry for JobBinary: %s") % e.columns)
|
||||
|
||||
return jb
|
||||
|
||||
|
||||
def _check_job_binary_referenced(ctx, session, job_binary_id, job_id=None):
|
||||
args = {"JobBinary_id": job_binary_id}
|
||||
if job_id:
|
||||
args["Job_id"] = job_id
|
||||
mains = model_query(m.mains_association, ctx, session,
|
||||
project_only=False).filter_by(**args)
|
||||
libs = model_query(m.libs_association, ctx, session,
|
||||
|
@ -213,6 +213,10 @@ def get_job_binary(id):
|
||||
return conductor.job_binary_get(context.ctx(), id)
|
||||
|
||||
|
||||
def update_job_binary(id, values):
|
||||
return conductor.job_binary_update(context.ctx(), id, values)
|
||||
|
||||
|
||||
def delete_job_binary(id):
|
||||
conductor.job_binary_destroy(context.ctx(), id)
|
||||
|
||||
|
@ -21,35 +21,6 @@ from sahara.swift import utils as su
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
JOB_BINARY_SCHEMA = {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"name": {
|
||||
"type": "string",
|
||||
"minLength": 1,
|
||||
"maxLength": 50,
|
||||
"format": "valid_name"
|
||||
},
|
||||
"description": {
|
||||
"type": "string"
|
||||
},
|
||||
"url": {
|
||||
"type": "string",
|
||||
"format": "valid_job_location"
|
||||
},
|
||||
# extra is simple_config for now because we may need not only
|
||||
# user-password it the case of external storage
|
||||
"extra": {
|
||||
"type": "simple_config",
|
||||
}
|
||||
},
|
||||
"additionalProperties": False,
|
||||
"required": [
|
||||
"name",
|
||||
"url"
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
def check_job_binary(data, **kwargs):
|
||||
job_binary_location_type = data["url"]
|
||||
|
48
sahara/service/validations/edp/job_binary_schema.py
Normal file
48
sahara/service/validations/edp/job_binary_schema.py
Normal file
@ -0,0 +1,48 @@
|
||||
# Copyright (c) 2015 Red Hat Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import copy
|
||||
|
||||
JOB_BINARY_SCHEMA = {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"name": {
|
||||
"type": "string",
|
||||
"minLength": 1,
|
||||
"maxLength": 50,
|
||||
"format": "valid_name"
|
||||
},
|
||||
"description": {
|
||||
"type": "string"
|
||||
},
|
||||
"url": {
|
||||
"type": "string",
|
||||
"format": "valid_job_location"
|
||||
},
|
||||
# extra is simple_config for now because we may need not only
|
||||
# user-password it the case of external storage
|
||||
"extra": {
|
||||
"type": "simple_config",
|
||||
}
|
||||
},
|
||||
"additionalProperties": False,
|
||||
"required": [
|
||||
"name",
|
||||
"url"
|
||||
]
|
||||
}
|
||||
|
||||
JOB_BINARY_UPDATE_SCHEMA = copy.copy(JOB_BINARY_SCHEMA)
|
||||
JOB_BINARY_UPDATE_SCHEMA["required"] = []
|
@ -86,6 +86,23 @@ SAMPLE_JOB_BINARY = {
|
||||
"url": "internal-db://test_binary",
|
||||
}
|
||||
|
||||
SAMPLE_JOB_BINARY_UPDATE = {
|
||||
"name": "updatedName",
|
||||
"url": "internal-db://updated-fake-url"
|
||||
}
|
||||
|
||||
SAMPLE_JOB_BINARY_SWIFT = {
|
||||
"tenant_id": "test_tenant",
|
||||
"name": "job_binary_test_swift",
|
||||
"description": "the description",
|
||||
"url": "swift://test_swift_url",
|
||||
}
|
||||
|
||||
SAMPLE_JOB_BINARY_SWIFT_UPDATE = {
|
||||
"name": "SwifterName",
|
||||
"url": "swift://updated-swift"
|
||||
}
|
||||
|
||||
|
||||
class DataSourceTest(test_base.ConductorManagerTestCase):
|
||||
def __init__(self, *args, **kwargs):
|
||||
@ -688,3 +705,44 @@ class JobBinaryTest(test_base.ConductorManagerTestCase):
|
||||
self.assertRaises(sa_exc.InvalidRequestError,
|
||||
self.api.job_binary_get_all,
|
||||
ctx, **{'badfield': 'somevalue'})
|
||||
|
||||
def test_job_binary_update(self):
|
||||
ctx = context.ctx()
|
||||
|
||||
original = self.api.job_binary_create(ctx, SAMPLE_JOB_BINARY_SWIFT)
|
||||
updated = self.api.job_binary_update(
|
||||
ctx, original["id"], SAMPLE_JOB_BINARY_SWIFT_UPDATE)
|
||||
# Make sure that the update did indeed succeed
|
||||
self.assertEqual(
|
||||
SAMPLE_JOB_BINARY_SWIFT_UPDATE["name"], updated["name"])
|
||||
self.assertEqual(SAMPLE_JOB_BINARY_SWIFT_UPDATE["url"], updated["url"])
|
||||
|
||||
# Make sure we do NOT update a binary in use by a PENDING job
|
||||
self._create_job_execution_ref_job_binary(ctx, original["id"])
|
||||
with testtools.ExpectedException(ex.UpdateFailedException):
|
||||
self.api.job_binary_update(
|
||||
ctx, original["id"], SAMPLE_JOB_BINARY_SWIFT_UPDATE)
|
||||
|
||||
original = self.api.job_binary_create(ctx, SAMPLE_JOB_BINARY)
|
||||
# Make sure that internal URL update fails
|
||||
with testtools.ExpectedException(ex.UpdateFailedException):
|
||||
self.api.job_binary_update(
|
||||
ctx, original["id"], SAMPLE_JOB_BINARY_UPDATE)
|
||||
|
||||
def _create_job_execution_ref_job_binary(self, ctx, jb_id):
|
||||
JOB_REF_BINARY = copy.copy(SAMPLE_JOB)
|
||||
JOB_REF_BINARY["mains"] = [jb_id]
|
||||
job = self.api.job_create(ctx, JOB_REF_BINARY)
|
||||
ds_input = self.api.data_source_create(ctx, SAMPLE_DATA_SOURCE)
|
||||
SAMPLE_DATA_OUTPUT = copy.copy(SAMPLE_DATA_SOURCE)
|
||||
SAMPLE_DATA_OUTPUT['name'] = 'output'
|
||||
ds_output = self.api.data_source_create(ctx, SAMPLE_DATA_OUTPUT)
|
||||
SAMPLE_JOB_EXECUTION['job_id'] = job['id']
|
||||
SAMPLE_JOB_EXECUTION['input_id'] = ds_input['id']
|
||||
SAMPLE_JOB_EXECUTION['output_id'] = ds_output['id']
|
||||
|
||||
self.api.job_execution_create(ctx, SAMPLE_JOB_EXECUTION)
|
||||
lst = self.api.job_execution_get_all(ctx)
|
||||
job_ex_id = lst[0]["id"]
|
||||
new_info = {"status": edp.JOB_STATUS_PENDING}
|
||||
self.api.job_execution_update(ctx, job_ex_id, {"info": new_info})
|
||||
|
@ -22,7 +22,7 @@ import testtools
|
||||
|
||||
from sahara.service.validations.edp import data_source_schema
|
||||
from sahara.service.validations.edp import job
|
||||
from sahara.service.validations.edp import job_binary
|
||||
from sahara.service.validations.edp import job_binary_schema
|
||||
from sahara.service.validations.edp import job_execution
|
||||
from sahara.utils import api_validator
|
||||
|
||||
@ -38,7 +38,7 @@ class TestJSONApiExamplesV11(testtools.TestCase):
|
||||
self._test(schema, path, formatter)
|
||||
|
||||
def test_job_binaries(self):
|
||||
schema = job_binary.JOB_BINARY_SCHEMA
|
||||
schema = job_binary_schema.JOB_BINARY_SCHEMA
|
||||
path = self.EXAMPLES_PATH % 'job-binaries'
|
||||
formatter = self._formatter("job_binary_internal_id",
|
||||
"script_binary_internal_id",
|
||||
|
@ -15,6 +15,7 @@
|
||||
|
||||
from sahara.service import api
|
||||
from sahara.service.validations.edp import job_binary as b
|
||||
from sahara.service.validations.edp import job_binary_schema as b_s
|
||||
from sahara.swift import utils as su
|
||||
from sahara.tests.unit.service.validation import utils as u
|
||||
|
||||
@ -23,7 +24,7 @@ class TestJobBinaryValidation(u.ValidationTestCase):
|
||||
def setUp(self):
|
||||
super(TestJobBinaryValidation, self).setUp()
|
||||
self._create_object_fun = b.check_job_binary
|
||||
self.scheme = b.JOB_BINARY_SCHEMA
|
||||
self.scheme = b_s.JOB_BINARY_SCHEMA
|
||||
api.plugin_base.setup_plugins()
|
||||
|
||||
def test_creation(self):
|
||||
|
Loading…
Reference in New Issue
Block a user