Merge "Adding data source update validation"
This commit is contained in:
commit
e2a61e9d0b
@ -122,7 +122,8 @@ def data_source_delete(data_source_id):
|
||||
@rest.put('/data-sources/<data_source_id>')
|
||||
@acl.enforce("data-processing:data-sources:modify")
|
||||
@v.check_exists(api.get_data_source, 'data_source_id')
|
||||
@v.validate(v_d_s_schema.DATA_SOURCE_UPDATE_SCHEMA)
|
||||
@v.validate(
|
||||
v_d_s_schema.DATA_SOURCE_UPDATE_SCHEMA, v_d_s.check_data_source_update)
|
||||
def data_source_update(data_source_id, data):
|
||||
return u.to_wrapped_dict(api.data_source_update, data_source_id, data)
|
||||
|
||||
|
@ -771,17 +771,6 @@ def data_source_update(context, values):
|
||||
validate.check_tenant_for_update(context, data_source)
|
||||
validate.check_protected_from_update(data_source, values)
|
||||
|
||||
jobs = job_execution_get_all(context)
|
||||
pending_jobs = [job for job in jobs if
|
||||
job.info["status"] == "PENDING"]
|
||||
|
||||
for job in pending_jobs:
|
||||
if job.data_source_urls:
|
||||
if ds_id in job.data_source_urls:
|
||||
raise ex.UpdateFailedException(
|
||||
_("DataSource is used in a "
|
||||
"PENDING Job and can not be updated."))
|
||||
|
||||
data_source.update(values)
|
||||
except db_exc.DBDuplicateEntry as e:
|
||||
raise ex.DBDuplicateEntry(
|
||||
|
@ -29,7 +29,10 @@ CONF = cfg.CONF
|
||||
|
||||
def check_data_source_create(data, **kwargs):
|
||||
b.check_data_source_unique_name(data['name'])
|
||||
_check_data_source_url(data)
|
||||
|
||||
|
||||
def _check_data_source_url(data):
|
||||
if "swift" == data["type"]:
|
||||
_check_swift_data_source_create(data)
|
||||
|
||||
@ -105,12 +108,22 @@ def _check_manila_data_source_create(data):
|
||||
raise ex.InvalidDataException(_("Manila url path must not be empty"))
|
||||
|
||||
|
||||
def check_data_source_update(data, **kwargs):
|
||||
def check_data_source_update(data, data_source_id):
|
||||
ctx = context.ctx()
|
||||
jobs = c.API.job_execution_get_all(ctx)
|
||||
pending_jobs = [job for job in jobs if job.info.status == "PENDING"]
|
||||
pending_jobs = [job for job in jobs if job.info["status"] == "PENDING"]
|
||||
for job in pending_jobs:
|
||||
if kwargs["data_source_id"] in job.data_source_urls:
|
||||
if data_source_id in job.data_source_urls:
|
||||
raise ex.UpdateFailedException(
|
||||
_("DataSource is used in a "
|
||||
"PENDING Job and can not be updated."))
|
||||
|
||||
if 'name' in data:
|
||||
b.check_data_source_unique_name(data['name'])
|
||||
|
||||
ds = c.API.data_source_get(ctx, data_source_id)
|
||||
check_data = {'type': data.get('type', None) or ds.type,
|
||||
'url': data.get('url', None) or ds.url,
|
||||
'credentials': data.get(
|
||||
'credentials', None) or ds.credentials}
|
||||
_check_data_source_url(check_data)
|
||||
|
@ -279,31 +279,6 @@ class DataSourceTest(test_base.ConductorManagerTestCase):
|
||||
self.assertEqual("updatedName", updated["name"])
|
||||
self.assertEqual("swift://updatedFakeUrl", updated["url"])
|
||||
|
||||
self._create_job_execution_ref_data_source(ctx, updated["id"])
|
||||
update_json = {"name": "FailsToupdatedName",
|
||||
"url": "swift://FailsupdatedFakeUrl"}
|
||||
with testtools.ExpectedException(ex.UpdateFailedException):
|
||||
self.api.data_source_update(ctx, updated["id"], update_json)
|
||||
|
||||
def _create_job_execution_ref_data_source(self, context, ds_id):
|
||||
job = self.api.job_create(context, SAMPLE_JOB)
|
||||
ds_input = self.api.data_source_create(context, SAMPLE_DATA_SOURCE)
|
||||
SAMPLE_DATA_OUTPUT = copy.copy(SAMPLE_DATA_SOURCE)
|
||||
SAMPLE_DATA_OUTPUT['name'] = 'output'
|
||||
|
||||
SAMPLE_JOB_EXECUTION['job_id'] = job['id']
|
||||
SAMPLE_JOB_EXECUTION['input_id'] = ds_input['id']
|
||||
SAMPLE_JOB_EXECUTION['output_id'] = ds_id
|
||||
SAMPLE_JOB_EXECUTION['data_source_urls'] = {ds_id: "fakeurl"}
|
||||
|
||||
self.api.job_execution_create(context, SAMPLE_JOB_EXECUTION)
|
||||
|
||||
lst = self.api.job_execution_get_all(context)
|
||||
job_ex_id = lst[0]['id']
|
||||
|
||||
new_info = {"status": edp.JOB_STATUS_PENDING}
|
||||
self.api.job_execution_update(context, job_ex_id, {'info': new_info})
|
||||
|
||||
def test_ds_update_delete_when_protected(self):
|
||||
ctx = context.ctx()
|
||||
sample = copy.deepcopy(SAMPLE_DATA_SOURCE)
|
||||
|
@ -29,9 +29,9 @@ SAMPLE_SWIFT_URL = "swift://1234/object"
|
||||
SAMPLE_SWIFT_URL_WITH_SUFFIX = "swift://1234%s/object" % su.SWIFT_URL_SUFFIX
|
||||
|
||||
|
||||
class TestDataSourceValidation(u.ValidationTestCase):
|
||||
class TestDataSourceCreateValidation(u.ValidationTestCase):
|
||||
def setUp(self):
|
||||
super(TestDataSourceValidation, self).setUp()
|
||||
super(TestDataSourceCreateValidation, self).setUp()
|
||||
self._create_object_fun = ds.check_data_source_create
|
||||
self.scheme = ds_schema.DATA_SOURCE_SCHEMA
|
||||
api.plugin_base.setup_plugins()
|
||||
@ -340,3 +340,137 @@ class TestDataSourceValidation(u.ValidationTestCase):
|
||||
"description": ("correct url")
|
||||
}
|
||||
self._assert_types(data)
|
||||
|
||||
|
||||
class TestDataSourceUpdateValidation(u.ValidationTestCase):
|
||||
def _update_swift(self):
|
||||
with testtools.ExpectedException(ex.InvalidDataException):
|
||||
ds.check_data_source_update({'url': 'swift://cont/obj'}, 'ds_id')
|
||||
|
||||
with testtools.ExpectedException(ex.InvalidDataException):
|
||||
ds.check_data_source_update({'type': 'swift'}, 'ds_id')
|
||||
|
||||
with testtools.ExpectedException(ex.InvalidCredentials):
|
||||
ds.check_data_source_update(
|
||||
{'type': 'swift', 'url': 'swift://cont/obj'}, 'ds_id')
|
||||
|
||||
ds.check_data_source_update(
|
||||
{'type': 'swift', 'url': 'swift://cont/obj',
|
||||
'credentials': {'user': 'user', 'password': 'pass'}}, 'ds_id')
|
||||
|
||||
def _update_hdfs(self):
|
||||
with testtools.ExpectedException(ex.InvalidDataException):
|
||||
ds.check_data_source_update({'url': 'hdfs://cl/data'}, 'ds_id')
|
||||
|
||||
with testtools.ExpectedException(ex.InvalidDataException):
|
||||
ds.check_data_source_update({'type': 'hdfs'}, 'ds_id')
|
||||
|
||||
ds.check_data_source_update(
|
||||
{'url': 'hdfs://cl/data', 'type': 'hdfs'}, 'ds_id')
|
||||
|
||||
def _update_maprfs(self):
|
||||
with testtools.ExpectedException(ex.InvalidDataException):
|
||||
ds.check_data_source_update({'type': 'maprfs'}, 'ds_id')
|
||||
|
||||
with testtools.ExpectedException(ex.InvalidDataException):
|
||||
ds.check_data_source_update({'url': 'maprfs://cluster'}, 'ds_id')
|
||||
|
||||
ds.check_data_source_update(
|
||||
{'type': 'maprfs', 'url': 'maprfs://cluster'}, 'ds_id')
|
||||
|
||||
def _update_manilla(self):
|
||||
with testtools.ExpectedException(ex.InvalidDataException):
|
||||
ds.check_data_source_update({'type': 'manila'}, 'ds_id')
|
||||
|
||||
with testtools.ExpectedException(ex.InvalidDataException):
|
||||
ds.check_data_source_update(
|
||||
{"url": "manila://%s/foo" % uuid.uuid4()}, 'ds_id')
|
||||
|
||||
ds.check_data_source_update(
|
||||
{'type': 'manila',
|
||||
'url': 'manila://%s/foo' % uuid.uuid4()}, 'ds_id')
|
||||
|
||||
@mock.patch('sahara.conductor.API.job_execution_get_all')
|
||||
def test_update_referenced_data_source(self, je_all):
|
||||
je_all.return_value = [mock.Mock(
|
||||
info={"status": "PENDING"},
|
||||
data_source_urls={"ds_id": "ds_url"})]
|
||||
with testtools.ExpectedException(ex.UpdateFailedException):
|
||||
ds.check_data_source_update({'name': 'ds'}, 'ds_id')
|
||||
|
||||
@mock.patch('sahara.conductor.API.job_execution_get_all')
|
||||
@mock.patch('sahara.conductor.API.data_source_get_all')
|
||||
@mock.patch('sahara.conductor.API.data_source_get')
|
||||
def test_update_data_source_name(self, je_all, ds_all, ds_get):
|
||||
ds1 = mock.Mock()
|
||||
ds1.name = 'ds1'
|
||||
ds_all.return_value = [ds1]
|
||||
ds.check_data_source_update({'name': 'ds'}, 'ds_id')
|
||||
|
||||
ds1.name = 'ds'
|
||||
with testtools.ExpectedException(ex.NameAlreadyExistsException):
|
||||
ds.check_data_source_update({'name': 'ds'}, 'ds_id')
|
||||
|
||||
@mock.patch('sahara.conductor.API.job_execution_get_all')
|
||||
@mock.patch('sahara.conductor.API.data_source_get')
|
||||
def test_update_data_source_swift(self, ds_get, je_all):
|
||||
old_ds = mock.Mock(id='ds_id', type='swift', url='swift://cont/obj',
|
||||
credentials={'user': 'user', 'password': 'pass'})
|
||||
ds_get.return_value = old_ds
|
||||
|
||||
ds.check_data_source_update({'url': 'swift://cont/obj2'}, 'ds_id')
|
||||
|
||||
self._update_hdfs()
|
||||
self._update_maprfs()
|
||||
self._update_manilla()
|
||||
|
||||
with testtools.ExpectedException(ex.InvalidDataException):
|
||||
ds.check_data_source_update({'url': '/tmp/file'}, 'ds_id')
|
||||
|
||||
@mock.patch('sahara.conductor.API.job_execution_get_all')
|
||||
@mock.patch('sahara.conductor.API.data_source_get')
|
||||
def test_update_data_source_hdfs(self, ds_get, je_all):
|
||||
old_ds = mock.Mock(id='ds_id', type='hdfs', url='hdfs://cl/data',
|
||||
credentials={})
|
||||
ds_get.return_value = old_ds
|
||||
|
||||
ds.check_data_source_update({'url': 'hdfs://cl/data1'}, 'ds_id')
|
||||
|
||||
self._update_swift()
|
||||
self._update_maprfs()
|
||||
self._update_manilla()
|
||||
|
||||
ds.check_data_source_update({'url': '/tmp/file'}, 'ds_id')
|
||||
|
||||
@mock.patch('sahara.conductor.API.job_execution_get_all')
|
||||
@mock.patch('sahara.conductor.API.data_source_get')
|
||||
def test_update_data_source_maprfs(self, ds_get, je_all):
|
||||
old_ds = mock.Mock(id='ds_id', type='maprfs', url='maprfs://cluster',
|
||||
credentials={})
|
||||
ds_get.return_value = old_ds
|
||||
|
||||
ds.check_data_source_update({'url': 'maprfs://cluster/data'}, 'ds_id')
|
||||
|
||||
self._update_swift()
|
||||
self._update_hdfs()
|
||||
self._update_manilla()
|
||||
|
||||
ds.check_data_source_update({'url': '/tmp/file'}, 'ds_id')
|
||||
|
||||
@mock.patch('sahara.conductor.API.job_execution_get_all')
|
||||
@mock.patch('sahara.conductor.API.data_source_get')
|
||||
def test_update_data_source_manila(self, ds_get, je_all):
|
||||
old_ds = mock.Mock(id='ds_id', type='manila',
|
||||
url='manila://%s/foo' % uuid.uuid4(),
|
||||
credentials={})
|
||||
ds_get.return_value = old_ds
|
||||
|
||||
ds.check_data_source_update(
|
||||
{'url': 'manila://%s/foo' % uuid.uuid4()}, 'ds_id')
|
||||
|
||||
self._update_swift()
|
||||
self._update_hdfs()
|
||||
self._update_maprfs()
|
||||
|
||||
with testtools.ExpectedException(ex.InvalidDataException):
|
||||
ds.check_data_source_update({'url': '/tmp/file'}, 'ds_id')
|
||||
|
Loading…
Reference in New Issue
Block a user