From d05bf66592cc5f31da2b54720d2c8cd9621d90c0 Mon Sep 17 00:00:00 2001 From: Jeremy Freudberg Date: Thu, 16 Nov 2017 06:18:44 +0000 Subject: [PATCH] S3 job binary and binary retriever * Create common module for managing S3 job binaries * Add new dependency on botocore * Use common S3 library to create S3 job binary type for EDP * Use common S3 library to create S3 job binary retriever * Support storing S3 secret key in Castellan * Document new job binary type (and foreshadow the S3 data source type) * Unit tests for new code Change-Id: I6781203d802305446ba1418ed6999186db4dfe9b Partially-Implements: bp sahara-support-s3 --- doc/source/user/edp.rst | 19 +++- ...upport-s3-job-binary-6d91267ae11d09d3.yaml | 3 + requirements.txt | 1 + sahara/conductor/manager.py | 41 ++++--- sahara/exceptions.py | 10 ++ .../service/edp/binary_retrievers/dispatch.py | 5 + .../edp/binary_retrievers/s3_storage.py | 19 ++++ sahara/service/edp/job_binaries/opts.py | 2 +- .../service/edp/job_binaries/s3/__init__.py | 0 .../edp/job_binaries/s3/implementation.py | 51 +++++++++ sahara/service/edp/s3_common.py | 86 +++++++++++++++ .../edp/binary_retrievers/test_dispatch.py | 8 +- .../service/edp/job_binaries/s3/__init__.py | 0 .../edp/job_binaries/s3/test_s3_type.py | 69 ++++++++++++ .../tests/unit/service/edp/test_s3_common.py | 103 ++++++++++++++++++ setup.cfg | 1 + 16 files changed, 398 insertions(+), 20 deletions(-) create mode 100644 releasenotes/notes/support-s3-job-binary-6d91267ae11d09d3.yaml create mode 100644 sahara/service/edp/binary_retrievers/s3_storage.py create mode 100644 sahara/service/edp/job_binaries/s3/__init__.py create mode 100644 sahara/service/edp/job_binaries/s3/implementation.py create mode 100644 sahara/service/edp/s3_common.py create mode 100644 sahara/tests/unit/service/edp/job_binaries/s3/__init__.py create mode 100644 sahara/tests/unit/service/edp/job_binaries/s3/test_s3_type.py create mode 100644 sahara/tests/unit/service/edp/test_s3_common.py diff --git a/doc/source/user/edp.rst b/doc/source/user/edp.rst index 65a786a302..183da48745 100644 --- a/doc/source/user/edp.rst +++ b/doc/source/user/edp.rst @@ -12,8 +12,8 @@ of jobs on clusters created from sahara. EDP supports: * Spark jobs on Spark standalone clusters, MapR (v5.0.0 - v5.2.0) clusters, Vanilla clusters (v2.7.1) and CDH clusters (v5.3.0 or higher). * storage of job binaries in the OpenStack Object Storage service (swift), - the OpenStack Shared file systems service (manila), or sahara's own - database + the OpenStack Shared file systems service (manila), sahara's own database, + or any S3-like object store * access to input and output data sources in + HDFS for all job types @@ -66,6 +66,11 @@ swift unless swift proxy users are configured as described in :doc:`../admin/advanced-configuration-guide`. The swift service must be running in the same OpenStack installation referenced by sahara. +Sahara requires the following credentials/configs to access files stored in an +S3-like object store: ``accesskey``, ``secretkey``, ``endpoint``. +These credentials are specified through the `extra` in the body of the request +when creating a job binary or data source referencing S3. + To reference a binary file stored in manila, create the job binary with the URL ``manila://{share_id}/{path}``. This assumes that you have already stored that file in the appropriate path on the share. The share will be @@ -581,9 +586,9 @@ estimating Pi. Special Sahara URLs ------------------- -Sahara uses custom URLs to refer to objects stored in swift, in manila, or in -the sahara internal database. These URLs are not meant to be used outside of -sahara. +Sahara uses custom URLs to refer to objects stored in swift, in manila, in the +sahara internal database, or in S3-like storage. These URLs are usually not +meant to be used outside of sahara. Sahara swift URLs passed to running jobs as input or output sources include a ".sahara" suffix on the container, for example: @@ -611,6 +616,10 @@ Manila NFS filesystem reference URLS take the form: This format should be used when referring to a job binary or a data source stored in a manila NFS share. +For job binaries only, S3 urls take the form: + +``s3://bucket/path/to/object`` + EDP Requirements ================ diff --git a/releasenotes/notes/support-s3-job-binary-6d91267ae11d09d3.yaml b/releasenotes/notes/support-s3-job-binary-6d91267ae11d09d3.yaml new file mode 100644 index 0000000000..790c3175f1 --- /dev/null +++ b/releasenotes/notes/support-s3-job-binary-6d91267ae11d09d3.yaml @@ -0,0 +1,3 @@ +--- +features: + - An EDP job binary may reference a file stored in a S3-like object store. diff --git a/requirements.txt b/requirements.txt index 9aaf2daa15..009e73d1e3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,6 +6,7 @@ pbr!=2.1.0,>=2.0.0 # Apache-2.0 alembic>=0.8.10 # MIT Babel!=2.4.0,>=2.3.4 # BSD +botocore>=1.5.1 # Apache-2.0 castellan>=0.16.0 # Apache-2.0 eventlet!=0.18.3,!=0.20.1,<0.21.0,>=0.18.2 # MIT Flask!=0.11,<1.0,>=0.10 # BSD diff --git a/sahara/conductor/manager.py b/sahara/conductor/manager.py index 7ab04d1f2c..5405cb4ee0 100644 --- a/sahara/conductor/manager.py +++ b/sahara/conductor/manager.py @@ -609,6 +609,9 @@ class ConductorManager(db_base.Base): if values.get('extra') and values['extra'].get('password'): values['extra']['password'] = key_manager.store_secret( values['extra']['password'], context) + if values.get('extra') and values['extra'].get('secretkey'): + values['extra']['secretkey'] = key_manager.store_secret( + values['extra']['secretkey'], context) return self.db.job_binary_create(context, values) def job_binary_destroy(self, context, job_binary): @@ -617,11 +620,16 @@ class ConductorManager(db_base.Base): # in cases where the credentials to access the job binary are # stored with the record and the external key manager is being # used, we need to delete the key from the external manager. - if (CONF.use_barbican_key_manager and not - CONF.use_domain_for_proxy_users): + if CONF.use_barbican_key_manager: jb_record = self.job_binary_get(context, job_binary) - if jb_record.get('extra') and jb_record['extra'].get('password'): - key_manager.delete_secret(jb_record['extra']['password'], + if not CONF.use_domain_for_proxy_users: + if (jb_record.get('extra') and + jb_record['extra'].get('password')): + key_manager.delete_secret(jb_record['extra']['password'], + context) + if (jb_record.get('extra') and + jb_record['extra'].get('secretkey')): + key_manager.delete_secret(jb_record['extra']['secretkey'], context) self.db.job_binary_destroy(context, job_binary) @@ -637,18 +645,25 @@ class ConductorManager(db_base.Base): # the previous key and check to see if it has changed, but it # seems less expensive to just delete the old and create a new # one. - if (CONF.use_barbican_key_manager and not - CONF.use_domain_for_proxy_users): + if CONF.use_barbican_key_manager: # first we retrieve the original record to get the old key # uuid, and delete it. - jb_record = self.job_binary_get(context, id) - if jb_record.get('extra') and jb_record['extra'].get('password'): - key_manager.delete_secret(jb_record['extra']['password'], - context) # next we create the new key. - if values.get('extra') and values['extra'].get('password'): - values['extra']['password'] = key_manager.store_secret( - values['extra']['password'], context) + jb_record = self.job_binary_get(context, id) + if not CONF.use_domain_for_proxy_users: + if (jb_record.get('extra') and + jb_record['extra'].get('password')): + key_manager.delete_secret(jb_record['extra']['password'], + context) + if values.get('extra') and values['extra'].get('password'): + values['extra']['password'] = key_manager.store_secret( + values['extra']['password'], context) + if jb_record.get('extra') and jb_record['extra'].get('secretkey'): + key_manager.delete_secret(jb_record['extra']['secretkey'], + context) + if values.get('extra') and values['extra'].get('secretkey'): + values['extra']['secretkey'] = key_manager.store_secret( + values['extra']['secretkey'], context) return self.db.job_binary_update(context, values) # JobBinaryInternal ops diff --git a/sahara/exceptions.py b/sahara/exceptions.py index 0792b95f81..a7506570af 100644 --- a/sahara/exceptions.py +++ b/sahara/exceptions.py @@ -198,6 +198,16 @@ class SwiftClientException(SaharaException): message = _("An error has occurred while performing a request to Swift") +class S3ClientException(SaharaException): + '''General wrapper object for boto exceptions + + Intended to replace any errors raised by the botocore client. + ''' + + code = "S3_CLIENT_EXCEPTION" + message = _("An error has occurred while performing a request to S3") + + class DataTooBigException(SaharaException): code = "DATA_TOO_BIG" message_template = _("Size of data (%(size)s) is greater than maximum " diff --git a/sahara/service/edp/binary_retrievers/dispatch.py b/sahara/service/edp/binary_retrievers/dispatch.py index c5772e19f0..4053482956 100644 --- a/sahara/service/edp/binary_retrievers/dispatch.py +++ b/sahara/service/edp/binary_retrievers/dispatch.py @@ -16,7 +16,9 @@ from sahara import context from sahara.service.edp.binary_retrievers import internal_swift as i_swift from sahara.service.edp.binary_retrievers import manila_share as manila +from sahara.service.edp.binary_retrievers import s3_storage as s3 from sahara.service.edp.binary_retrievers import sahara_db as db +from sahara.service.edp import s3_common from sahara.swift import utils as su from sahara.utils.openstack import manila as m @@ -42,6 +44,9 @@ def get_raw_binary(job_binary, proxy_configs=None, if url.startswith("internal-db://"): res = db.get_raw_data(context.ctx(), job_binary) + if url.startswith(s3_common.S3_JB_PREFIX): + res = s3.get_raw_data(job_binary) + if url.startswith(su.SWIFT_INTERNAL_PREFIX): if with_context: res = i_swift.get_raw_data_with_context(job_binary) diff --git a/sahara/service/edp/binary_retrievers/s3_storage.py b/sahara/service/edp/binary_retrievers/s3_storage.py new file mode 100644 index 0000000000..b4fdfbaa6b --- /dev/null +++ b/sahara/service/edp/binary_retrievers/s3_storage.py @@ -0,0 +1,19 @@ +# Copyright (c) 2017 Massachusetts Open Cloud +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from sahara.service.edp import s3_common + + +def get_raw_data(job_binary): + return s3_common.get_raw_job_binary_data(job_binary) diff --git a/sahara/service/edp/job_binaries/opts.py b/sahara/service/edp/job_binaries/opts.py index 6d07b7c822..f077732df9 100644 --- a/sahara/service/edp/job_binaries/opts.py +++ b/sahara/service/edp/job_binaries/opts.py @@ -19,7 +19,7 @@ from oslo_config import cfg opts = [ cfg.ListOpt('job_binary_types', - default=['swift', 'manila', 'internal-db'], + default=['swift', 'manila', 'internal-db', 's3'], help='List of job binary types to be loaded. Sahara ' 'preserves the order of the list when returning it.'), ] diff --git a/sahara/service/edp/job_binaries/s3/__init__.py b/sahara/service/edp/job_binaries/s3/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sahara/service/edp/job_binaries/s3/implementation.py b/sahara/service/edp/job_binaries/s3/implementation.py new file mode 100644 index 0000000000..56a0704059 --- /dev/null +++ b/sahara/service/edp/job_binaries/s3/implementation.py @@ -0,0 +1,51 @@ +# Copyright (c) 2017 Massachusetts Open Cloud +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import six +import six.moves.urllib.parse as urlparse + +import sahara.exceptions as ex +from sahara.i18n import _ +from sahara.service.edp.job_binaries.base import JobBinaryType +from sahara.service.edp import s3_common + + +class S3Type(JobBinaryType): + def copy_binary_to_cluster(self, job_binary, **kwargs): + r = kwargs.pop('remote') + + dst = self._generate_valid_path(job_binary) + raw = self.get_raw_data(job_binary) + + r.write_file_to(dst, raw) + return dst + + def validate_job_location_format(self, url): + url = urlparse.urlparse(url) + return url.scheme == "s3" and url.hostname + + def validate(self, data, **kwargs): + # We only check on create, not update + if not kwargs.get('job_binary_id', None): + s3_common._validate_job_binary_url(data['url']) + extra = data.get("extra", {}) + if (six.viewkeys(extra) != + {"accesskey", "secretkey", "endpoint"}): + raise ex.InvalidDataException( + _("Configs 'accesskey', 'secretkey', and 'endpoint'" + " must be provided.")) + + def get_raw_data(self, job_binary, **kwargs): + return s3_common.get_raw_job_binary_data(job_binary) diff --git a/sahara/service/edp/s3_common.py b/sahara/service/edp/s3_common.py new file mode 100644 index 0000000000..4b6e532bdf --- /dev/null +++ b/sahara/service/edp/s3_common.py @@ -0,0 +1,86 @@ +# Copyright 2017 Massachusetts Open Cloud +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import botocore.exceptions +import botocore.session +from oslo_config import cfg +import six + +import sahara.exceptions as ex +from sahara.i18n import _ +from sahara.service.castellan import utils as key_manager + +S3_JB_PREFIX = "s3://" +CONF = cfg.CONF + + +def _get_s3_client(extra): + sess = botocore.session.get_session() + secretkey = key_manager.get_secret(extra['secretkey']) + return sess.create_client( + 's3', + # TODO(jfreud): investigate region name support + region_name=None, + # TODO(jfreud): investigate configurable verify + verify=False, + endpoint_url=extra['endpoint'], + aws_access_key_id=extra['accesskey'], + aws_secret_access_key=secretkey + ) + + +def _get_names_from_job_binary_url(url): + parse = six.moves.urllib.parse.urlparse(url) + return (parse.netloc + parse.path).split('/', 1) + + +def _get_raw_job_binary_data(job_binary, conn): + names = _get_names_from_job_binary_url(job_binary.url) + bucket, obj = names + try: + size = conn.head_object(Bucket=bucket, Key=obj)['ContentLength'] + # We have bytes, but want kibibytes: + total_KB = size / 1024.0 + if total_KB > CONF.job_binary_max_KB: + raise ex.DataTooBigException( + round(total_KB, 1), CONF.job_binary_max_KB, + _("Size of S3 object (%(size)sKB) is greater " + "than maximum (%(maximum)sKB)")) + body = conn.get_object(Bucket=bucket, Key=obj)['Body'].read() + except ex.DataTooBigException: + raise + except Exception: + raise ex.S3ClientException("Couldn't get object from s3") + return body + + +def _validate_job_binary_url(job_binary_url): + if not job_binary_url.startswith(S3_JB_PREFIX): + # Sanity check + raise ex.BadJobBinaryException( + _("URL for binary in S3 must start with %s") % S3_JB_PREFIX) + names = _get_names_from_job_binary_url(job_binary_url) + if len(names) == 1: + # we have a bucket instead of an individual object + raise ex.BadJobBinaryException( + _("URL for binary in S3 must specify an object not a bucket")) + + +def get_raw_job_binary_data(job_binary): + _validate_job_binary_url(job_binary.url) + try: + conn = _get_s3_client(job_binary.extra) + except Exception: + raise ex.S3ClientException("Couldn't create boto client") + return _get_raw_job_binary_data(job_binary, conn) diff --git a/sahara/tests/unit/service/edp/binary_retrievers/test_dispatch.py b/sahara/tests/unit/service/edp/binary_retrievers/test_dispatch.py index ba25384a6e..3537276a2c 100644 --- a/sahara/tests/unit/service/edp/binary_retrievers/test_dispatch.py +++ b/sahara/tests/unit/service/edp/binary_retrievers/test_dispatch.py @@ -23,6 +23,7 @@ class TestDispatch(base.SaharaTestCase): def setUp(self): super(TestDispatch, self).setUp() + @mock.patch('sahara.service.edp.s3_common.get_raw_job_binary_data') @mock.patch('sahara.service.edp.binary_retrievers.' 'manila_share.get_file_info') @mock.patch( @@ -33,7 +34,8 @@ class TestDispatch(base.SaharaTestCase): @mock.patch('sahara.service.edp.binary_retrievers.sahara_db.get_raw_data') @mock.patch('sahara.context.ctx') def test_get_raw_binary(self, ctx, db_get_raw_data, i_s_get_raw_data, - i_s_get_raw_data_with_context, m_s_get_file_info): + i_s_get_raw_data_with_context, m_s_get_file_info, + s3_get_raw_jb_data): ctx.return_value = mock.Mock() job_binary = mock.Mock() @@ -58,3 +60,7 @@ class TestDispatch(base.SaharaTestCase): remote.instance.node_group.shares = [] dispatch.get_raw_binary(job_binary, remote=remote) self.assertEqual(1, m_s_get_file_info.call_count) + + job_binary.url = 's3://bucket/object.jar' + dispatch.get_raw_binary(job_binary) + self.assertEqual(1, s3_get_raw_jb_data.call_count) diff --git a/sahara/tests/unit/service/edp/job_binaries/s3/__init__.py b/sahara/tests/unit/service/edp/job_binaries/s3/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sahara/tests/unit/service/edp/job_binaries/s3/test_s3_type.py b/sahara/tests/unit/service/edp/job_binaries/s3/test_s3_type.py new file mode 100644 index 0000000000..07cc4d530e --- /dev/null +++ b/sahara/tests/unit/service/edp/job_binaries/s3/test_s3_type.py @@ -0,0 +1,69 @@ +# Copyright (c) 2017 Massachusetts Open Cloud +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock +import testtools + +from sahara import exceptions as ex +from sahara.service.edp.job_binaries.s3.implementation import S3Type +from sahara.tests.unit import base + + +class TestS3Type(base.SaharaTestCase): + + def setUp(self): + super(TestS3Type, self).setUp() + self.i_s = S3Type() + + @mock.patch('sahara.service.edp.job_binaries.s3.implementation.S3Type.' + 'get_raw_data') + def test_copy_binary_to_cluster(self, get_raw_data): + remote = mock.Mock() + job_binary = mock.Mock() + job_binary.name = 'test' + job_binary.url = 's3://somebinary' + get_raw_data.return_value = 'test' + + res = self.i_s.copy_binary_to_cluster(job_binary, + remote=remote) + + self.assertEqual('/tmp/test', res) + remote.write_file_to.assert_called_with( + '/tmp/test', + 'test') + + def test_validate_job_location_format(self): + self.assertTrue( + self.i_s.validate_job_location_format("s3://temp/temp")) + self.assertFalse( + self.i_s.validate_job_location_format("s4://temp/temp")) + self.assertFalse(self.i_s.validate_job_location_format("s3:///")) + + def test_validate(self): + data = {"extra": {}, "url": "s3://temp/temp"} + with testtools.ExpectedException(ex.InvalidDataException): + self.i_s.validate(data) + data["extra"] = {"accesskey": "a", + "secretkey": "s", + "endpoint": "e"} + self.i_s.validate(data) + data["extra"].pop("accesskey") + with testtools.ExpectedException(ex.InvalidDataException): + self.i_s.validate(data) + + @mock.patch('sahara.service.edp.s3_common.get_raw_job_binary_data') + def test_get_raw_data(self, s3_get_raw_jbd): + self.i_s.get_raw_data('a job binary') + self.assertEqual(1, s3_get_raw_jbd.call_count) diff --git a/sahara/tests/unit/service/edp/test_s3_common.py b/sahara/tests/unit/service/edp/test_s3_common.py new file mode 100644 index 0000000000..002941f3ea --- /dev/null +++ b/sahara/tests/unit/service/edp/test_s3_common.py @@ -0,0 +1,103 @@ +# Copyright (c) 2017 Massachusetts Open Cloud +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import mock +import testtools + +from sahara import exceptions as ex +from sahara.service.edp import s3_common +from sahara.tests.unit import base + + +class FakeJB(object): + extra = {"accesskey": "access", + "secretkey": "my-secret", + "endpoint": "pointy-end"} + url = "s3://temp/temp" + + +class S3CommonTestCase(base.SaharaTestCase): + + @mock.patch("botocore.session.Session.create_client") + @mock.patch("sahara.service.castellan.utils.get_secret") + def test__get_s3_client(self, cast, boto): + cast.return_value = "the-actual-password" + je = FakeJB().extra + s3_common._get_s3_client(je) + args = ('s3', None, False, je['endpoint'], je['accesskey'], + 'the-actual-password') + boto.called_once_with(*args) + + def test__get_names_from_job_binary_url(self): + self.assertEqual( + s3_common._get_names_from_job_binary_url("s3://buck"), ["buck"]) + self.assertEqual( + s3_common._get_names_from_job_binary_url("s3://buck/obj"), + ["buck", "obj"]) + self.assertEqual( + s3_common._get_names_from_job_binary_url("s3://buck/dir/obj"), + ["buck", "dir/obj"]) + + def test__get_raw_job_binary_data(self): + jb = mock.Mock() + jb.url = "s3://bucket/object" + boto_conn = mock.Mock() + boto_conn.head_object = mock.Mock() + boto_conn.get_object = mock.Mock() + self.override_config('job_binary_max_KB', 1) + + boto_conn.head_object.return_value = {"ContentLength": 1025} + self.assertRaises(ex.DataTooBigException, + s3_common._get_raw_job_binary_data, + jb, boto_conn) + + reader = mock.Mock() + reader.read = lambda: "the binary" + boto_conn.get_object.return_value = {"Body": reader} + + boto_conn.head_object.return_value = {"ContentLength": 1024} + s3_common._get_raw_job_binary_data(jb, boto_conn) + + self.assertEqual(s3_common._get_raw_job_binary_data(jb, boto_conn), + "the binary") + + def _raiser(): + raise ValueError + reader.read = _raiser + self.assertRaises(ex.S3ClientException, + s3_common._get_raw_job_binary_data, + jb, boto_conn) + + def test__validate_job_binary_url(self): + jb_url = "s3://bucket/object" + s3_common._validate_job_binary_url(jb_url) + jb_url = "s4://bucket/object" + with testtools.ExpectedException(ex.BadJobBinaryException): + s3_common._validate_job_binary_url(jb_url) + jb_url = "s3://bucket" + with testtools.ExpectedException(ex.BadJobBinaryException): + s3_common._validate_job_binary_url(jb_url) + + @mock.patch("sahara.service.edp.s3_common._get_raw_job_binary_data") + @mock.patch("sahara.service.edp.s3_common._get_s3_client") + @mock.patch("sahara.service.edp.s3_common._validate_job_binary_url") + def test_get_raw_job_binary_data(self, validate_jbu, get_s3cl, get_rjbd): + get_s3cl.return_value = "this would have been boto" + jb = FakeJB() + s3_common.get_raw_job_binary_data(jb) + validate_jbu.assert_called_once_with(jb.url) + get_s3cl.assert_called_once_with(jb.extra) + get_rjbd.assert_called_once_with(jb, "this would have been boto") diff --git a/setup.cfg b/setup.cfg index b215df47ce..6d31ac5ea8 100644 --- a/setup.cfg +++ b/setup.cfg @@ -64,6 +64,7 @@ sahara.job_binary.types = internal-db = sahara.service.edp.job_binaries.internal_db.implementation:InternalDBType manila = sahara.service.edp.job_binaries.manila.implementation:ManilaType swift = sahara.service.edp.job_binaries.swift.implementation:SwiftType + s3 = sahara.service.edp.job_binaries.s3.implementation:S3Type sahara.infrastructure.engine = heat = sahara.service.heat.heat_engine:HeatEngine