Merge "Add put data in HDFS for EDP testcase"
This commit is contained in:
commit
f8e6907299
@ -2,7 +2,8 @@ edp_jobs_flow:
|
||||
hadoop_2:
|
||||
- type: Pig
|
||||
input_datasource:
|
||||
type: swift
|
||||
type: hdfs
|
||||
hdfs_username: hadoop
|
||||
source: etc/edp-examples/edp-pig/trim-spaces/data/input
|
||||
output_datasource:
|
||||
type: hdfs
|
||||
|
@ -349,13 +349,15 @@ Section "input_datasource"
|
||||
Required: type, source
|
||||
This section is dictionary-type.
|
||||
|
||||
+---------------+--------+----------+-----------+---------------------------+
| Fields        | Type   | Required | Default   | Value                     |
+===============+========+==========+===========+===========================+
| type          | string | True     |           | "swift", "hdfs", "maprfs" |
+---------------+--------+----------+-----------+---------------------------+
| hdfs_username | string |          |           | username for hdfs         |
+---------------+--------+----------+-----------+---------------------------+
| source        | string | True     |           | uri of source             |
+---------------+--------+----------+-----------+---------------------------+
||||
|
||||
|
||||
Section "output_datasource"
|
||||
|
@ -198,7 +198,10 @@ class BaseTestCase(base.BaseTestCase):
|
||||
location = utils.rand_name(ds['destination'])
|
||||
if ds['type'] == 'swift':
|
||||
url = self._create_swift_data(location)
|
||||
if ds['type'] == 'hdfs' or ds['type'] == 'maprfs':
|
||||
if ds['type'] == 'hdfs':
|
||||
url = self._create_hdfs_data(location, ds.get('hdfs_username',
|
||||
'oozie'))
|
||||
if ds['type'] == 'maprfs':
|
||||
url = location
|
||||
return self.__create_datasource(
|
||||
name=utils.rand_name(name),
|
||||
@ -288,6 +291,31 @@ class BaseTestCase(base.BaseTestCase):
|
||||
|
||||
return 'swift://%s.sahara/%s' % (container, path)
|
||||
|
||||
def _create_hdfs_data(self, source, hdfs_username):
|
||||
|
||||
def to_hex_present(string):
|
||||
return "".join(map(lambda x: hex(ord(x)).replace("0x", "\\x"),
|
||||
string))
|
||||
|
||||
if 'user' in source:
|
||||
return source
|
||||
hdfs_dir = utils.rand_name("/user/%s/data" % hdfs_username)
|
||||
inst_ip = self._get_nodes_with_process()[0]["management_ip"]
|
||||
self._run_command_on_node(
|
||||
inst_ip,
|
||||
"sudo su - -c \"hdfs dfs -mkdir -p %(path)s \" %(user)s" % {
|
||||
"path": hdfs_dir, "user": hdfs_username})
|
||||
hdfs_filepath = utils.rand_name(hdfs_dir + "/file")
|
||||
data = open(source).read()
|
||||
self._run_command_on_node(
|
||||
inst_ip,
|
||||
("echo -e \"%(data)s\" | sudo su - -c \"hdfs dfs"
|
||||
" -put - %(path)s\" %(user)s") % {
|
||||
"data": to_hex_present(data),
|
||||
"path": hdfs_filepath,
|
||||
"user": hdfs_username})
|
||||
return hdfs_filepath
|
||||
|
||||
def _create_internal_db_data(self, source):
|
||||
data = open(source).read()
|
||||
id = self.__create_internal_db_data(utils.rand_name('test'), data)
|
||||
@ -534,11 +562,11 @@ class BaseTestCase(base.BaseTestCase):
|
||||
pkey=self.private_key)
|
||||
return ssh_session.exec_command(command)
|
||||
|
||||
def _get_nodes_with_process(self, process):
|
||||
def _get_nodes_with_process(self, process=None):
|
||||
nodegroups = self.sahara.get_cluster(self.cluster_id).node_groups
|
||||
nodes_with_process = []
|
||||
for nodegroup in nodegroups:
|
||||
if process in nodegroup['node_processes']:
|
||||
if not process or process in nodegroup['node_processes']:
|
||||
nodes_with_process.extend(nodegroup['instances'])
|
||||
return nodes_with_process
|
||||
|
||||
|
@ -328,6 +328,9 @@ SCHEMA = {
|
||||
},
|
||||
"source": {
|
||||
"type": "string"
|
||||
},
|
||||
"hdfs_username": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": ["type", "source"],
|
||||
|
@ -506,3 +506,24 @@ class TestBase(testtools.TestCase):
|
||||
"ephemeral_disk": 1,
|
||||
"swap_disk": 1
|
||||
}))
|
||||
|
||||
@mock.patch('sahara.tests.scenario.base.BaseTestCase._run_command_on_node')
@mock.patch('keystoneclient.session.Session')
def test_create_hdfs_data(self, mock_session, mock_ssh):
    """Exercise _create_hdfs_data for both of its code paths.

    First path: a source that already contains 'user' is returned
    unchanged without touching the cluster.  Second path: a local file
    is uploaded under a randomized /user/<name>/data-… directory, with
    the SSH command execution and the cluster lookup mocked out.
    """
    self.base_scenario._init_clients()
    # Path already in HDFS ('user' in the string) -> returned as-is.
    output_path = '/user/test/data/output'
    self.assertEqual(output_path,
                     self.base_scenario._create_hdfs_data(output_path,
                                                          None))
    # Local file path -> uploaded; mock the cluster so the node lookup
    # yields a single instance with a known management IP.
    input_path = 'etc/edp-examples/edp-pig/trim-spaces/data/input'
    with mock.patch(
            'sahara.tests.scenario.clients.SaharaClient.get_cluster',
            return_value=FakeResponse(node_groups=[
                {
                    'instances': [
                        {
                            'management_ip': 'test_ip'
                        }]
                }])):
        # The returned HDFS path must live under the randomized
        # /user/test/data-… directory created for user 'test'.
        self.assertTrue('/user/test/data-' in (
            self.base_scenario._create_hdfs_data(input_path, 'test')))
|
||||
|
Loading…
Reference in New Issue
Block a user