diff --git a/sahara/api/v11.py b/sahara/api/v11.py index 779f660f..9f1e93fe 100644 --- a/sahara/api/v11.py +++ b/sahara/api/v11.py @@ -122,7 +122,8 @@ def data_source_delete(data_source_id): @rest.put('/data-sources/') @acl.enforce("data-processing:data-sources:modify") @v.check_exists(api.get_data_source, 'data_source_id') -@v.validate(v_d_s_schema.DATA_SOURCE_UPDATE_SCHEMA) +@v.validate( + v_d_s_schema.DATA_SOURCE_UPDATE_SCHEMA, v_d_s.check_data_source_update) def data_source_update(data_source_id, data): return u.to_wrapped_dict(api.data_source_update, data_source_id, data) diff --git a/sahara/db/sqlalchemy/api.py b/sahara/db/sqlalchemy/api.py index 44a0545f..3414f4dc 100644 --- a/sahara/db/sqlalchemy/api.py +++ b/sahara/db/sqlalchemy/api.py @@ -771,17 +771,6 @@ def data_source_update(context, values): validate.check_tenant_for_update(context, data_source) validate.check_protected_from_update(data_source, values) - jobs = job_execution_get_all(context) - pending_jobs = [job for job in jobs if - job.info["status"] == "PENDING"] - - for job in pending_jobs: - if job.data_source_urls: - if ds_id in job.data_source_urls: - raise ex.UpdateFailedException( - _("DataSource is used in a " - "PENDING Job and can not be updated.")) - data_source.update(values) except db_exc.DBDuplicateEntry as e: raise ex.DBDuplicateEntry( diff --git a/sahara/service/validations/edp/data_source.py b/sahara/service/validations/edp/data_source.py index 13eb47b4..acd4dbd5 100644 --- a/sahara/service/validations/edp/data_source.py +++ b/sahara/service/validations/edp/data_source.py @@ -29,7 +29,10 @@ CONF = cfg.CONF def check_data_source_create(data, **kwargs): b.check_data_source_unique_name(data['name']) + _check_data_source_url(data) + +def _check_data_source_url(data): if "swift" == data["type"]: _check_swift_data_source_create(data) @@ -105,12 +108,22 @@ def _check_manila_data_source_create(data): raise ex.InvalidDataException(_("Manila url path must not be empty")) -def check_data_source_update(data, **kwargs): +def check_data_source_update(data, data_source_id): ctx = context.ctx() jobs = c.API.job_execution_get_all(ctx) - pending_jobs = [job for job in jobs if job.info.status == "PENDING"] + pending_jobs = [job for job in jobs if job.info["status"] == "PENDING"] for job in pending_jobs: - if kwargs["data_source_id"] in job.data_source_urls: + if data_source_id in job.data_source_urls: raise ex.UpdateFailedException( _("DataSource is used in a " "PENDING Job and can not be updated.")) + + if 'name' in data: + b.check_data_source_unique_name(data['name']) + + ds = c.API.data_source_get(ctx, data_source_id) + check_data = {'type': data.get('type', None) or ds.type, + 'url': data.get('url', None) or ds.url, + 'credentials': data.get( + 'credentials', None) or ds.credentials} + _check_data_source_url(check_data) diff --git a/sahara/tests/unit/conductor/manager/test_edp.py b/sahara/tests/unit/conductor/manager/test_edp.py index 50301554..c977ab1c 100644 --- a/sahara/tests/unit/conductor/manager/test_edp.py +++ b/sahara/tests/unit/conductor/manager/test_edp.py @@ -279,31 +279,6 @@ class DataSourceTest(test_base.ConductorManagerTestCase): self.assertEqual("updatedName", updated["name"]) self.assertEqual("swift://updatedFakeUrl", updated["url"]) - self._create_job_execution_ref_data_source(ctx, updated["id"]) - update_json = {"name": "FailsToupdatedName", - "url": "swift://FailsupdatedFakeUrl"} - with testtools.ExpectedException(ex.UpdateFailedException): - self.api.data_source_update(ctx, updated["id"], update_json) - - def _create_job_execution_ref_data_source(self, context, ds_id): - job = self.api.job_create(context, SAMPLE_JOB) - ds_input = self.api.data_source_create(context, SAMPLE_DATA_SOURCE) - SAMPLE_DATA_OUTPUT = copy.copy(SAMPLE_DATA_SOURCE) - SAMPLE_DATA_OUTPUT['name'] = 'output' - - SAMPLE_JOB_EXECUTION['job_id'] = job['id'] - SAMPLE_JOB_EXECUTION['input_id'] = ds_input['id'] - SAMPLE_JOB_EXECUTION['output_id'] = ds_id - SAMPLE_JOB_EXECUTION['data_source_urls'] = {ds_id: "fakeurl"} - - self.api.job_execution_create(context, SAMPLE_JOB_EXECUTION) - - lst = self.api.job_execution_get_all(context) - job_ex_id = lst[0]['id'] - - new_info = {"status": edp.JOB_STATUS_PENDING} - self.api.job_execution_update(context, job_ex_id, {'info': new_info}) - def test_ds_update_delete_when_protected(self): ctx = context.ctx() sample = copy.deepcopy(SAMPLE_DATA_SOURCE) diff --git a/sahara/tests/unit/service/validation/edp/test_data_source.py b/sahara/tests/unit/service/validation/edp/test_data_source.py index 82111604..94317577 100644 --- a/sahara/tests/unit/service/validation/edp/test_data_source.py +++ b/sahara/tests/unit/service/validation/edp/test_data_source.py @@ -29,9 +29,9 @@ SAMPLE_SWIFT_URL = "swift://1234/object" SAMPLE_SWIFT_URL_WITH_SUFFIX = "swift://1234%s/object" % su.SWIFT_URL_SUFFIX -class TestDataSourceValidation(u.ValidationTestCase): +class TestDataSourceCreateValidation(u.ValidationTestCase): def setUp(self): - super(TestDataSourceValidation, self).setUp() + super(TestDataSourceCreateValidation, self).setUp() self._create_object_fun = ds.check_data_source_create self.scheme = ds_schema.DATA_SOURCE_SCHEMA api.plugin_base.setup_plugins() @@ -340,3 +340,137 @@ class TestDataSourceValidation(u.ValidationTestCase): "description": ("correct url") } self._assert_types(data) + + +class TestDataSourceUpdateValidation(u.ValidationTestCase): + def _update_swift(self): + with testtools.ExpectedException(ex.InvalidDataException): + ds.check_data_source_update({'url': 'swift://cont/obj'}, 'ds_id') + + with testtools.ExpectedException(ex.InvalidDataException): + ds.check_data_source_update({'type': 'swift'}, 'ds_id') + + with testtools.ExpectedException(ex.InvalidCredentials): + ds.check_data_source_update( + {'type': 'swift', 'url': 'swift://cont/obj'}, 'ds_id') + + ds.check_data_source_update( + {'type': 'swift', 'url': 'swift://cont/obj', + 'credentials': {'user': 'user', 'password': 'pass'}}, 'ds_id') + + def _update_hdfs(self): + with testtools.ExpectedException(ex.InvalidDataException): + ds.check_data_source_update({'url': 'hdfs://cl/data'}, 'ds_id') + + with testtools.ExpectedException(ex.InvalidDataException): + ds.check_data_source_update({'type': 'hdfs'}, 'ds_id') + + ds.check_data_source_update( + {'url': 'hdfs://cl/data', 'type': 'hdfs'}, 'ds_id') + + def _update_maprfs(self): + with testtools.ExpectedException(ex.InvalidDataException): + ds.check_data_source_update({'type': 'maprfs'}, 'ds_id') + + with testtools.ExpectedException(ex.InvalidDataException): + ds.check_data_source_update({'url': 'maprfs://cluster'}, 'ds_id') + + ds.check_data_source_update( + {'type': 'maprfs', 'url': 'maprfs://cluster'}, 'ds_id') + + def _update_manilla(self): + with testtools.ExpectedException(ex.InvalidDataException): + ds.check_data_source_update({'type': 'manila'}, 'ds_id') + + with testtools.ExpectedException(ex.InvalidDataException): + ds.check_data_source_update( + {"url": "manila://%s/foo" % uuid.uuid4()}, 'ds_id') + + ds.check_data_source_update( + {'type': 'manila', + 'url': 'manila://%s/foo' % uuid.uuid4()}, 'ds_id') + + @mock.patch('sahara.conductor.API.job_execution_get_all') + def test_update_referenced_data_source(self, je_all): + je_all.return_value = [mock.Mock( + info={"status": "PENDING"}, + data_source_urls={"ds_id": "ds_url"})] + with testtools.ExpectedException(ex.UpdateFailedException): + ds.check_data_source_update({'name': 'ds'}, 'ds_id') + + @mock.patch('sahara.conductor.API.job_execution_get_all') + @mock.patch('sahara.conductor.API.data_source_get_all') + @mock.patch('sahara.conductor.API.data_source_get') + def test_update_data_source_name(self, je_all, ds_all, ds_get): + ds1 = mock.Mock() + ds1.name = 'ds1' + ds_all.return_value = [ds1] + ds.check_data_source_update({'name': 'ds'}, 'ds_id') + + ds1.name = 'ds' + with testtools.ExpectedException(ex.NameAlreadyExistsException): + ds.check_data_source_update({'name': 'ds'}, 'ds_id') + + @mock.patch('sahara.conductor.API.job_execution_get_all') + @mock.patch('sahara.conductor.API.data_source_get') + def test_update_data_source_swift(self, ds_get, je_all): + old_ds = mock.Mock(id='ds_id', type='swift', url='swift://cont/obj', + credentials={'user': 'user', 'password': 'pass'}) + ds_get.return_value = old_ds + + ds.check_data_source_update({'url': 'swift://cont/obj2'}, 'ds_id') + + self._update_hdfs() + self._update_maprfs() + self._update_manilla() + + with testtools.ExpectedException(ex.InvalidDataException): + ds.check_data_source_update({'url': '/tmp/file'}, 'ds_id') + + @mock.patch('sahara.conductor.API.job_execution_get_all') + @mock.patch('sahara.conductor.API.data_source_get') + def test_update_data_source_hdfs(self, ds_get, je_all): + old_ds = mock.Mock(id='ds_id', type='hdfs', url='hdfs://cl/data', + credentials={}) + ds_get.return_value = old_ds + + ds.check_data_source_update({'url': 'hdfs://cl/data1'}, 'ds_id') + + self._update_swift() + self._update_maprfs() + self._update_manilla() + + ds.check_data_source_update({'url': '/tmp/file'}, 'ds_id') + + @mock.patch('sahara.conductor.API.job_execution_get_all') + @mock.patch('sahara.conductor.API.data_source_get') + def test_update_data_source_maprfs(self, ds_get, je_all): + old_ds = mock.Mock(id='ds_id', type='maprfs', url='maprfs://cluster', + credentials={}) + ds_get.return_value = old_ds + + ds.check_data_source_update({'url': 'maprfs://cluster/data'}, 'ds_id') + + self._update_swift() + self._update_hdfs() + self._update_manilla() + + ds.check_data_source_update({'url': '/tmp/file'}, 'ds_id') + + @mock.patch('sahara.conductor.API.job_execution_get_all') + @mock.patch('sahara.conductor.API.data_source_get') + def test_update_data_source_manila(self, ds_get, je_all): + old_ds = mock.Mock(id='ds_id', type='manila', + url='manila://%s/foo' % uuid.uuid4(), + credentials={}) + ds_get.return_value = old_ds + + ds.check_data_source_update( + {'url': 'manila://%s/foo' % uuid.uuid4()}, 'ds_id') + + self._update_swift() + self._update_hdfs() + self._update_maprfs() + + with testtools.ExpectedException(ex.InvalidDataException): + ds.check_data_source_update({'url': '/tmp/file'}, 'ds_id')