Reimplement oozie client as abstract

This change reimplements the oozie client to support running
EDP jobs on top of Kerberized clusters; a minimal sketch of the
resulting client selection is included below.

Additionally, keytabs and principals are set up so that the
clients are able to authenticate. These principals will be
refreshed by additional cron jobs once per hour.

implements bp: initial-kerberos-integration
Change-Id: I55b4267c09c43fdef75a443410f4ae11f56127e1
Vitaly Gridnev 2016-08-17 16:00:51 +03:00
parent 1779654d8d
commit 5120dd7060
19 changed files with 270 additions and 54 deletions
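For orientation, the following is a minimal, self-contained sketch of the client-selection pattern this change introduces: an abstract base client, a plain HTTP implementation used on regular clusters, and a remote CLI-based implementation used on Kerberized clusters (presumably because the REST endpoint of a Kerberized Oozie requires Kerberos authentication that the plain HTTP client does not perform). The class and function names in the sketch are illustrative stand-ins, not the actual sahara API.

# Hedged sketch only: the names below (BaseOozieClientSketch, get_client_for,
# and friends) are illustrative and do not exist in sahara.
import abc

import six


@six.add_metaclass(abc.ABCMeta)
class BaseOozieClientSketch(object):
    @abc.abstractmethod
    def add_job(self, job_config):
        """Submit a workflow definition and return the Oozie job id."""


class HttpOozieClientSketch(BaseOozieClientSketch):
    def add_job(self, job_config):
        # non-Kerberized path: POST the workflow to the Oozie REST API
        return "0000000-rest-oozie-W"


class RemoteOozieClientSketch(BaseOozieClientSketch):
    def add_job(self, job_config):
        # Kerberized path: run the 'oozie' CLI on the Oozie server itself,
        # where the hourly cron job keeps a ticket fresh from the keytab
        return "0000000-cli-oozie-W"


def get_client_for(kerberos_enabled):
    # mirrors the engine behaviour: the remote client is chosen only when
    # Kerberos security is enabled for the cluster
    if kerberos_enabled:
        return RemoteOozieClientSketch()
    return HttpOozieClientSketch()


print(type(get_client_for(True)).__name__)   # RemoteOozieClientSketch
print(type(get_client_for(False)).__name__)  # HttpOozieClientSketch

In the actual change the same split appears as BaseOozieClient, OozieClient and RemoteOozieClient, and the plugin EDP engines override get_client() to return the remote variant when kerberos.is_kerberos_security_enabled(cluster) is true, as the diffs below show.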


@@ -0,0 +1,5 @@
---
features:
- Kerberos support implemented for the Cloudera and Ambari
  plugins. A new oozie client was implemented to support
  authentication for oozie in a Kerberized cluster.


@@ -657,3 +657,21 @@ def add_hadoop_swift_jar(instances):
len(instances))
for inst in instances:
_add_hadoop_swift_jar(inst, new_jar)
def deploy_kerberos_principals(cluster, instances=None):
if not kerberos.is_kerberos_security_enabled(cluster):
return
if instances is None:
instances = plugin_utils.get_instances(cluster)
mapper = {
'hdfs': plugin_utils.instances_with_services(
instances, [p_common.SECONDARY_NAMENODE, p_common.NAMENODE,
p_common.DATANODE, p_common.JOURNAL_NODE]),
'spark': plugin_utils.instances_with_services(
instances, [p_common.SPARK_JOBHISTORYSERVER]),
'oozie': plugin_utils.instances_with_services(
instances, [p_common.OOZIE_SERVER]),
}
kerberos.create_keytabs_for_map(cluster, mapper)


@@ -17,6 +17,7 @@ from sahara import exceptions as exc
from sahara.i18n import _
from sahara.plugins.ambari import common as p_common
from sahara.plugins import exceptions as pex
from sahara.plugins import kerberos
from sahara.plugins import utils as plugin_utils
from sahara.service.edp import hdfs_helper
from sahara.service.edp.oozie import engine as oozie_engine
@@ -45,6 +46,11 @@ class EDPOozieEngine(oozie_engine.OozieJobEngine):
def get_hdfs_user(self):
return "oozie"
def get_client(self):
if kerberos.is_kerberos_security_enabled(self.cluster):
return super(EDPOozieEngine, self).get_remote_client()
return super(EDPOozieEngine, self).get_client()
def create_hdfs_dir(self, remote, dir_name):
hdfs_helper.create_dir_hadoop2(remote, dir_name, self.get_hdfs_user())


@@ -100,6 +100,7 @@ class AmbariPluginProvider(p.ProvisioningPluginBase):
swift_helper.install_ssl_certs(cluster_instances)
deploy.add_hadoop_swift_jar(cluster_instances)
deploy.prepare_hive(cluster)
deploy.deploy_kerberos_principals(cluster)
def _set_cluster_info(self, cluster):
ambari_ip = plugin_utils.get_instance(
@@ -194,6 +195,7 @@ class AmbariPluginProvider(p.ProvisioningPluginBase):
deploy.configure_rack_awareness(cluster, instances)
swift_helper.install_ssl_certs(instances)
deploy.add_hadoop_swift_jar(instances)
deploy.deploy_kerberos_principals(cluster, instances)
def decommission_nodes(self, cluster, instances):
deploy.decommission_hosts(cluster, instances)


@@ -18,14 +18,17 @@ def setup_kerberos_for_cluster(cluster, cloudera_utils):
if kerberos.is_kerberos_security_enabled(cluster):
manager = cloudera_utils.pu.get_manager(cluster)
kerberos.deploy_infrastructure(cluster, manager)
cloudera_utils.full_cluster_stop(cluster)
kerberos.prepare_policy_files(cluster)
cloudera_utils.push_kerberos_configs(cluster)
cloudera_utils.full_cluster_start(cluster)
kerberos.create_keytabs_for_map(
cluster,
{'hdfs': cloudera_utils.pu.get_hdfs_nodes(cluster),
'spark': [cloudera_utils.pu.get_spark_historyserver(cluster)]})
def prepare_scaling_kerberized_cluster(cluster, cloudera_utils):
def prepare_scaling_kerberized_cluster(cluster, cloudera_utils, instances):
if kerberos.is_kerberos_security_enabled(cluster):
server = None
if not kerberos.using_existing_kdc(cluster):
@@ -34,3 +37,6 @@ def prepare_scaling_kerberized_cluster(cluster, cloudera_utils):
kerberos.prepare_policy_files(cluster)
# manager can correctly handle updating configs
cloudera_utils.push_kerberos_configs(cluster)
kerberos.create_keytabs_for_map(
cluster,
{'hdfs': cloudera_utils.pu.get_hdfs_nodes(cluster, instances)})


@@ -16,6 +16,7 @@
from sahara import exceptions as ex
from sahara.i18n import _
from sahara.plugins import exceptions as pl_ex
from sahara.plugins import kerberos
from sahara.plugins import utils as u
from sahara.service.edp import hdfs_helper
from sahara.service.edp.oozie import engine as edp_engine
@@ -29,6 +30,11 @@ class EdpOozieEngine(edp_engine.OozieJobEngine):
# will be defined in derived classes
self.cloudera_utils = None
def get_client(self):
if kerberos.is_kerberos_security_enabled(self.cluster):
return super(EdpOozieEngine, self).get_remote_client()
return super(EdpOozieEngine, self).get_client()
def get_hdfs_user(self):
return 'hdfs'


@@ -116,6 +116,12 @@ class AbstractPluginUtils(object):
def get_datanodes(self, cluster):
return u.get_instances(cluster, 'HDFS_DATANODE')
def get_hdfs_nodes(self, cluster, instances=None):
instances = instances if instances else u.get_instances(cluster)
return u.instances_with_services(
instances, ["HDFS_DATANODE", "HDFS_NAMENODE",
"HDFS_SECONDARYNAMENODE"])
def get_secondarynamenode(self, cluster):
return u.get_instance(cluster, 'HDFS_SECONDARYNAMENODE')


@@ -407,7 +407,11 @@ class ClouderaUtilsV550(cu.ClouderaUtils):
},
'NODEMANAGER': {
'yarn_nodemanager_local_dirs':
get_hadoop_dirs(paths, '/yarn/local')
get_hadoop_dirs(paths, '/yarn/local'),
'container_executor_allowed_system_users':
"nobody,impala,hive,llama,hdfs,yarn,mapred,"
"spark,oozie",
"container_executor_banned_users": "bin"
},
'SERVER': {
'maxSessionTimeout': 60000


@@ -110,7 +110,8 @@ def scale_cluster(cluster, instances):
CU.configure_rack_awareness(cluster)
CU.configure_instances(instances, cluster)
CU.update_configs(instances)
common_deploy.prepare_scaling_kerberized_cluster(cluster, CU)
common_deploy.prepare_scaling_kerberized_cluster(
cluster, CU, instances)
CU.pu.configure_swift(cluster, instances)
_start_roles(cluster, instances)


@@ -406,7 +406,11 @@ class ClouderaUtilsV570(cu.ClouderaUtils):
},
'NODEMANAGER': {
'yarn_nodemanager_local_dirs':
get_hadoop_dirs(paths, '/yarn/local')
get_hadoop_dirs(paths, '/yarn/local'),
'container_executor_allowed_system_users':
"nobody,impala,hive,llama,hdfs,yarn,mapred,"
"spark,oozie",
"container_executor_banned_users": "bin"
},
'SERVER': {
'maxSessionTimeout': 60000


@@ -110,7 +110,8 @@ def scale_cluster(cluster, instances):
CU.configure_rack_awareness(cluster)
CU.configure_instances(instances, cluster)
CU.update_configs(instances)
common_deploy.prepare_scaling_kerberized_cluster(cluster, CU)
common_deploy.prepare_scaling_kerberized_cluster(cluster, CU,
instances)
CU.pu.configure_swift(cluster, instances)
_start_roles(cluster, instances)


@@ -343,3 +343,57 @@ def _prepare_policy_files(instance, remote_url):
r.execute_command(
'sudo cp %s/*.jar %s/lib/security/'
% (POLICY_FILES_DIR, java_home))
def _get_script_for_user_creation(cluster, instance, user):
data = files.get_file_text(
'plugins/resources/create-principal-keytab')
cron_file = files.get_file_text('plugins/resources/cron-file')
cron_script = files.get_file_text('plugins/resources/cron-script')
data = data % {
'user': user, 'admin_principal': get_admin_principal(cluster),
'admin_password': get_server_password(cluster),
'principal': "%s/sahara-%s@%s" % (
user, instance.fqdn(), get_realm_name(cluster)),
'keytab': '%s-sahara-%s.keytab' % (user, instance.fqdn())
}
cron_script_location = '/tmp/sahara-kerberos/%s.sh' % _get_short_uuid()
cron_file = cron_file % {'refresher': cron_script_location, 'user': user}
cron_script = cron_script % {
'principal': "%s/sahara-%s@%s" % (
user, instance.fqdn(), get_realm_name(cluster)),
'keytab': '%s-sahara-%s.keytab' % (user, instance.fqdn()),
'user': user,
}
return data, cron_file, cron_script, cron_script_location
def _create_keytabs_for_user(instance, user):
script, cron, cron_script, cs_location = _get_script_for_user_creation(
instance.cluster, instance, user)
_execute_script(instance, script)
# setting up refresher
with instance.remote() as r:
tmp_location = '/tmp/%s' % _get_short_uuid()
r.write_file_to(tmp_location, cron_script, run_as_root=True)
r.execute_command(
"cat {0} | sudo tee {1} "
"&& rm -rf {0} && sudo chmod +x {1}".format(
tmp_location, cs_location))
r.execute_command(
'echo "%s" | sudo tee /etc/cron.d/%s.cron' % (
cron, _get_short_uuid()))
# executing script
r.execute_command('sudo bash %s' % cs_location)
@cpo.event_wrapper(
True, step=_('Setting up keytabs for users'), param=('cluster', 0))
def create_keytabs_for_map(cluster, mapper):
# cluster parameter is used by event log feature
with context.ThreadGroup() as tg:
for user, instances in mapper.items():
for instance in instances:
tg.spawn(
'create-keytabs', _create_keytabs_for_user,
instance, user)


@@ -0,0 +1,12 @@
#!/bin/bash
mkdir -p /tmp/sahara-kerberos/
kadmin -p %(admin_principal)s <<EOF
%(admin_password)s
addprinc -randkey %(principal)s
xst -k /tmp/sahara-kerberos/%(keytab)s %(principal)s
exit
EOF
sudo chown %(user)s:%(user)s /tmp/sahara-kerberos/%(keytab)s


@@ -0,0 +1,2 @@
# Once per hour refreshes tickets for user %(user)s
0 * * * * root %(refresher)s


@@ -0,0 +1,3 @@
#!/bin/bash
sudo -u %(user)s kinit -p %(principal)s -kt /tmp/sahara-kerberos/%(keytab)s


@@ -47,7 +47,13 @@ class OozieJobEngine(base_engine.JobEngine):
self.cluster = cluster
self.plugin = job_utils.get_plugin(self.cluster)
def _get_client(self):
def get_remote_client(self):
return o.RemoteOozieClient(self.get_oozie_server_uri(self.cluster),
self.get_oozie_server(self.cluster),
self.get_hdfs_user())
def get_client(self):
# by default the engine returns the standard oozie client implementation
return o.OozieClient(self.get_oozie_server_uri(self.cluster),
self.get_oozie_server(self.cluster))
@@ -110,13 +116,13 @@ class OozieJobEngine(base_engine.JobEngine):
def cancel_job(self, job_execution):
if job_execution.engine_job_id is not None:
client = self._get_client()
client = self.get_client()
client.kill_job(job_execution)
return client.get_job_info(job_execution)
def get_job_status(self, job_execution):
if job_execution.engine_job_id is not None:
return self._get_client().get_job_info(job_execution)
return self.get_client().get_job_info(job_execution)
def _prepare_run_job(self, job_execution):
ctx = context.ctx()
@@ -219,7 +225,7 @@ class OozieJobEngine(base_engine.JobEngine):
oozie_params,
use_hbase_lib)
client = self._get_client()
client = self.get_client()
oozie_job_id = client.add_job(x.create_hadoop_xml(job_params),
job_execution)
job_execution = conductor.job_execution_get(ctx, job_execution.id)
@@ -260,7 +266,7 @@ class OozieJobEngine(base_engine.JobEngine):
job_execution.job_configs.job_execution_info, wf_dir,
"scheduled")
client = self._get_client()
client = self.get_client()
oozie_job_id = client.add_job(x.create_hadoop_xml(job_params),
job_execution)
@@ -442,7 +448,7 @@ class OozieJobEngine(base_engine.JobEngine):
def _manage_job(self, job_execution, action):
if job_execution.oozie_job_id is not None:
client = self._get_client()
client = self.get_client()
if action == edp.JOB_ACTION_SUSPEND:
client.suspend_job(job_execution)
return client.get_job_status(job_execution)


@@ -13,76 +13,119 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import re
from oslo_serialization import jsonutils as json
from oslo_utils import uuidutils
import six
from six.moves.urllib import parse as urlparse
import sahara.exceptions as ex
class OozieClient(object):
@six.add_metaclass(abc.ABCMeta)
class BaseOozieClient(object):
def __init__(self, url, oozie_server):
self.job_url = url + "/v2/job/%s"
self.jobs_url = url + "/v2/jobs"
self.oozie_server = oozie_server
self.port = urlparse.urlparse(url).port
@abc.abstractmethod
def add_job(self, job_config, job_execution):
pass
@abc.abstractmethod
def manage_job(self, job_execution, action, job_id=None):
pass
@abc.abstractmethod
def get_job_info(self, job_execution, job_id=None):
pass
def kill_job(self, job_execution):
self.manage_job(job_execution, 'kill')
def run_job(self, job_execution, job_id):
self.manage_job(job_execution, 'start', job_id=job_id)
class OozieClient(BaseOozieClient):
def add_job(self, job_config, job_execution):
return self.post(
job_execution, self.jobs_url, data=job_config, headers={
"Content-Type": "application/xml;charset=UTF-8"})
def manage_job(self, job_execution, action, job_id=None):
job_id = job_id if job_id else job_execution.engine_job_id
url = self.job_url % job_id + "?action=" + action
self.put(job_execution, url)
def get_job_info(self, job_execution, job_id=None):
job_id = job_id if job_id else job_execution.engine_job_id
url = self.job_url % job_id + "?show=info"
return self.get(job_execution, url)
def _get_http_session(self, info=None):
return self.oozie_server.remote().get_http_client(self.port, info=info)
def add_job(self, job_config, job_execution):
def post(self, job_execution, url, data, headers):
session = self._get_http_session(job_execution.extra.get('neutron'))
resp = session.post(self.jobs_url, data=job_config, headers={
"Content-Type": "application/xml;charset=UTF-8"
})
resp = session.post(url, data=data, headers=headers)
_check_status_code(resp, 201)
return get_json(resp)['id']
def run_job(self, job_execution, job_id):
def put(self, job_execution, url):
session = self._get_http_session(job_execution.extra.get('neutron'))
resp = session.put(self.job_url % job_id + "?action=start")
resp = session.put(url)
_check_status_code(resp, 200)
def kill_job(self, job_execution):
def get(self, job_execution, url):
session = self._get_http_session(job_execution.extra.get('neutron'))
resp = session.put(self.job_url % job_execution.engine_job_id +
"?action=kill")
_check_status_code(resp, 200)
def manage_job(self, job_execution, action):
session = self._get_http_session(job_execution.extra.get('neutron'))
resp = session.put(self.job_url % job_execution.oozie_job_id +
"?action=" + action)
_check_status_code(resp, 200)
def get_job_info(self, job_execution, job_id=None):
if job_id is None:
job_id = job_execution.engine_job_id
session = self._get_http_session(job_execution.extra.get('neutron'))
resp = session.get(self.job_url % job_id + "?show=info")
_check_status_code(resp, 200)
return get_json(resp)
def get_job_logs(self, job_execution):
session = self._get_http_session(job_execution.extra.get('neutron'))
resp = session.get(self.job_url % job_execution.engine_job_id +
"?show=log")
_check_status_code(resp, 200)
return resp.text
def get_jobs(self, offset, size, **filter):
url = self.jobs_url + "?offset=%s&len=%s" % (offset, size)
if len(filter) > 0:
f = ";".join([k + "=" + v for k, v in filter.items()])
url += "&filter=" + urlparse.quote(f)
session = self._get_http_session()
resp = session.get(url)
_check_status_code(resp, 200)
return get_json(resp)
class RemoteOozieClient(OozieClient):
def __init__(self, url, oozie_server, hdfs_user):
self.hdfs_user = hdfs_user
self.oozie_url = url.replace(
urlparse.urlparse(url).hostname, oozie_server.fqdn())
super(RemoteOozieClient, self).__init__(url, oozie_server)
def _oozie(self, cmd):
return (
"sudo su - -c 'oozie -Doozie.auth.token.cache=false "
"{cmd} -oozie {oozie}' {user}".format(
cmd=cmd, oozie=self.oozie_url, user=self.hdfs_user))
def add_job(self, job_config, job_execution):
with self.oozie_server.remote() as r:
name = "/tmp/%s.xml" % uuidutils.generate_uuid()[:8]
r.write_file_to(name, job_config)
cmd = self._oozie("job -submit -config %s" % name)
cmd += " | awk '{ print $2 }'"
code, stdout = r.execute_command(cmd)
stdout = stdout.strip()
return stdout
def manage_job(self, job_execution, action, job_id=None):
job_id = job_id if job_id else job_execution.engine_job_id
cmd = self._oozie("job -%s %s" % (action, job_id))
with self.oozie_server.remote() as r:
r.execute_command(cmd)
def get_job_info(self, job_execution, job_id=None):
job_id = job_id if job_id else job_execution.engine_job_id
cmd = self._oozie("job -info %s" % job_id)
cmd += " | grep Status | head -n 1 | awk '{ print $3 }'"
with self.oozie_server.remote() as r:
code, stdout = r.execute_command(cmd)
return {'status': stdout.strip()}
def _check_status_code(resp, expected_code):
if resp.status_code != expected_code:
resp_text = resp.text


@@ -17,6 +17,26 @@ from sahara import context
from sahara.plugins import kerberos as krb
from sahara.tests.unit import base
ADD_PRINCIPAL_SCRIPT = """#!/bin/bash
mkdir -p /tmp/sahara-kerberos/
kadmin -p sahara/admin <<EOF
strong
addprinc -randkey bond/sahara-in.loc@SK
xst -k /tmp/sahara-kerberos/bond-sahara-in.loc.keytab bond/sahara-in.loc@SK
exit
EOF
sudo chown bond:bond /tmp/sahara-kerberos/bond-sahara-in.loc.keytab
"""
CRON_FILE = """# Once per hour refreshes tickets for user bond
0 * * * * root /tmp/sahara-kerberos/e1.sh
"""
CRON_SCRIPT = ('#!/bin/bash\n\n'
'sudo -u bond kinit -p bond/sahara-in.loc@SK -kt '
'/tmp/sahara-kerberos/bond-sahara-in.loc.keytab\n')
class FakeObject(dict):
def __init__(self, dict):
@@ -91,3 +111,20 @@ class TestKerberosBase(base.SaharaTestCase):
get.return_value = 'password'
krb._get_server_installation_script(
cluster, 'server.novalocal', 'centos', '6.7')
@mock.patch('sahara.plugins.kerberos.get_realm_name')
@mock.patch('sahara.plugins.kerberos._get_short_uuid')
@mock.patch('sahara.plugins.kerberos.get_server_password')
def test_create_keytabs_for_user(self, get_password, get_uuid, realm):
get_uuid.return_value = 'e1'
realm.return_value = 'SK'
get_password.return_value = "strong"
cluster = mock.Mock(node_groups=[], cluster_configs={})
instance = mock.Mock()
instance.fqdn = mock.Mock()
instance.fqdn.return_value = "in.loc"
data = krb._get_script_for_user_creation(cluster, instance, 'bond')
self.assertEqual(ADD_PRINCIPAL_SCRIPT, data[0])
self.assertEqual(CRON_FILE, data[1])
self.assertEqual(CRON_SCRIPT, data[2])
self.assertEqual('/tmp/sahara-kerberos/e1.sh', data[3])


@@ -34,7 +34,7 @@ class TestOozieEngine(base.SaharaTestCase):
client_class.add_job = mock.MagicMock(return_value=1)
client_class.get_job_info = mock.MagicMock(
return_value={'status': 'PENDING'})
oje._get_client = mock.MagicMock(return_value=client_class)
oje.get_client = mock.MagicMock(return_value=client_class)
_, job_exec = u.create_job_exec(edp.JOB_TYPE_PIG)
self.assertIsNone(oje.get_job_status(job_exec))
@@ -235,7 +235,7 @@ class TestOozieEngine(base.SaharaTestCase):
client_class.add_job = mock.MagicMock(return_value=1)
client_class.get_job_info = mock.MagicMock(
return_value={'status': 'PENDING'})
oje._get_client = mock.MagicMock(return_value=client_class)
oje.get_client = mock.MagicMock(return_value=client_class)
_, job_exec = u.create_job_exec(edp.JOB_TYPE_PIG)
update.return_value = job_exec