From a4fb58043f6b70be8d86daf6fa1400f1bcfce8fa Mon Sep 17 00:00:00 2001
From: Sergey Reshetnyak
Date: Mon, 13 Jul 2015 15:06:21 +0300
Subject: [PATCH] Add put data in HDFS for EDP testcase

Now you can use HDFS as an input datasource.

Change-Id: I08c179e3ffb60306c90178080b45e6443d03ecb6
---
 etc/scenario/sahara-ci/edp.yaml.mako    |  3 ++-
 sahara/tests/scenario/README.rst        | 16 +++++++-----
 sahara/tests/scenario/base.py           | 34 ++++++++++++++++++++++---
 sahara/tests/scenario/validation.py     |  3 +++
 sahara/tests/scenario_unit/test_base.py | 21 +++++++++++++++
 5 files changed, 66 insertions(+), 11 deletions(-)

diff --git a/etc/scenario/sahara-ci/edp.yaml.mako b/etc/scenario/sahara-ci/edp.yaml.mako
index 8a80c6df..f01cd720 100644
--- a/etc/scenario/sahara-ci/edp.yaml.mako
+++ b/etc/scenario/sahara-ci/edp.yaml.mako
@@ -2,7 +2,8 @@ edp_jobs_flow:
   hadoop_2:
     - type: Pig
       input_datasource:
-        type: swift
+        type: hdfs
+        hdfs_username: hadoop
         source: etc/edp-examples/edp-pig/trim-spaces/data/input
       output_datasource:
         type: hdfs
diff --git a/sahara/tests/scenario/README.rst b/sahara/tests/scenario/README.rst
index cd34fb6f..30f07406 100644
--- a/sahara/tests/scenario/README.rst
+++ b/sahara/tests/scenario/README.rst
@@ -349,13 +349,15 @@ Section "input_datasource"
 Required: type, source
 
 This section is dictionary-type.
 
-+--------+--------+----------+-----------+---------------------------+
-| Fields | Type   | Required | Default   | Value                     |
-+========+========+==========+===========+===========================+
-| type   | string | True     |           | "swift", "hdfs", "maprfs" |
-+--------+--------+----------+-----------+---------------------------+
-| source | string | True     |           | uri of source             |
-+--------+--------+----------+-----------+---------------------------+
++---------------+--------+----------+-----------+---------------------------+
+| Fields        | Type   | Required | Default   | Value                     |
++===============+========+==========+===========+===========================+
+| type          | string | True     |           | "swift", "hdfs", "maprfs" |
++---------------+--------+----------+-----------+---------------------------+
+| hdfs_username | string |          |           | username for hdfs         |
++---------------+--------+----------+-----------+---------------------------+
+| source        | string | True     |           | uri of source             |
++---------------+--------+----------+-----------+---------------------------+
 
 Section "output_datasource"
diff --git a/sahara/tests/scenario/base.py b/sahara/tests/scenario/base.py
index a916dac2..a4893f36 100644
--- a/sahara/tests/scenario/base.py
+++ b/sahara/tests/scenario/base.py
@@ -198,7 +198,10 @@ class BaseTestCase(base.BaseTestCase):
         location = utils.rand_name(ds['destination'])
         if ds['type'] == 'swift':
             url = self._create_swift_data(location)
-        if ds['type'] == 'hdfs' or ds['type'] == 'maprfs':
+        if ds['type'] == 'hdfs':
+            url = self._create_hdfs_data(location, ds.get('hdfs_username',
+                                                          'oozie'))
+        if ds['type'] == 'maprfs':
             url = location
         return self.__create_datasource(
             name=utils.rand_name(name),
@@ -288,6 +291,31 @@ class BaseTestCase(base.BaseTestCase):
 
         return 'swift://%s.sahara/%s' % (container, path)
 
+    def _create_hdfs_data(self, source, hdfs_username):
+
+        def to_hex_present(string):
+            return "".join(map(lambda x: hex(ord(x)).replace("0x", "\\x"),
+                               string))
+
+        if 'user' in source:
+            return source
+        hdfs_dir = utils.rand_name("/user/%s/data" % hdfs_username)
+        inst_ip = self._get_nodes_with_process()[0]["management_ip"]
+        self._run_command_on_node(
+            inst_ip,
+            "sudo su - -c \"hdfs dfs -mkdir -p %(path)s \" %(user)s" % {
+                "path": hdfs_dir, "user": hdfs_username})
+        hdfs_filepath = utils.rand_name(hdfs_dir + "/file")
+        data = open(source).read()
+        self._run_command_on_node(
+            inst_ip,
+            ("echo -e \"%(data)s\" | sudo su - -c \"hdfs dfs"
+             " -put - %(path)s\" %(user)s") % {
+                "data": to_hex_present(data),
+                "path": hdfs_filepath,
+                "user": hdfs_username})
+        return hdfs_filepath
+
     def _create_internal_db_data(self, source):
         data = open(source).read()
         id = self.__create_internal_db_data(utils.rand_name('test'), data)
@@ -534,11 +562,11 @@ class BaseTestCase(base.BaseTestCase):
                                            pkey=self.private_key)
         return ssh_session.exec_command(command)
 
-    def _get_nodes_with_process(self, process):
+    def _get_nodes_with_process(self, process=None):
         nodegroups = self.sahara.get_cluster(self.cluster_id).node_groups
         nodes_with_process = []
         for nodegroup in nodegroups:
-            if process in nodegroup['node_processes']:
+            if not process or process in nodegroup['node_processes']:
                 nodes_with_process.extend(nodegroup['instances'])
         return nodes_with_process
 
diff --git a/sahara/tests/scenario/validation.py b/sahara/tests/scenario/validation.py
index 5841d41f..2e8a7d4f 100644
--- a/sahara/tests/scenario/validation.py
+++ b/sahara/tests/scenario/validation.py
@@ -329,6 +329,9 @@ SCHEMA = {
                 },
                 "source": {
                     "type": "string"
+                },
+                "hdfs_username": {
+                    "type": "string"
                 }
             },
             "required": ["type", "source"],
diff --git a/sahara/tests/scenario_unit/test_base.py b/sahara/tests/scenario_unit/test_base.py
index 663bdb7f..c9d201f2 100644
--- a/sahara/tests/scenario_unit/test_base.py
+++ b/sahara/tests/scenario_unit/test_base.py
@@ -506,3 +506,24 @@ class TestBase(testtools.TestCase):
                 "ephemeral_disk": 1,
                 "swap_disk": 1
             }))
+
+    @mock.patch('sahara.tests.scenario.base.BaseTestCase._run_command_on_node')
+    @mock.patch('keystoneclient.session.Session')
+    def test_create_hdfs_data(self, mock_session, mock_ssh):
+        self.base_scenario._init_clients()
+        output_path = '/user/test/data/output'
+        self.assertEqual(output_path,
+                         self.base_scenario._create_hdfs_data(output_path,
+                                                              None))
+        input_path = 'etc/edp-examples/edp-pig/trim-spaces/data/input'
+        with mock.patch(
+                'sahara.tests.scenario.clients.SaharaClient.get_cluster',
+                return_value=FakeResponse(node_groups=[
+                    {
+                        'instances': [
+                            {
+                                'management_ip': 'test_ip'
+                            }]
+                    }])):
+            self.assertTrue('/user/test/data-' in (
+                self.base_scenario._create_hdfs_data(input_path, 'test')))