Adding proxy user creation per job execution

Changes
* adding sahara.utils.proxy module
* adding functions to create and delete proxy users
* adding proxy user creation during job execution
* adding proxy user deletion during job execution status update
* adding sanitization for proxy configs from job execution
* adding unit test for proxy usage detection function

Partial-implements: blueprint edp-swift-trust-authentication
Change-Id: I551a14a3cb5320a27fc6104b35c3b9a08a03abda
This commit is contained in:
Michael McCune 2014-08-20 10:17:27 -04:00
parent 96b45fdfca
commit a539bcfdc9
6 changed files with 189 additions and 2 deletions

View File

@ -227,6 +227,8 @@ class JobExecution(Resource, objects.JobExecution):
configs[swift_helper.HADOOP_SWIFT_PASSWORD] = ""
if 'trusts' in job_configs:
del job_configs['trusts']
if 'proxy_configs' in job_configs:
del job_configs['proxy_configs']
return job_configs
def sanitize_info(self, info):

View File

@ -17,10 +17,13 @@ from oslo.config import cfg
from sahara import conductor as c
from sahara import context
from sahara import exceptions as ex
from sahara.i18n import _LE
from sahara.openstack.common import log as logging
from sahara.service.edp.binary_retrievers import dispatch
from sahara.service.edp import job_manager as manager
from sahara.utils import edp
from sahara.utils import proxy as p
conductor = c.API
@ -42,7 +45,6 @@ def get_job_config_hints(job_type):
def execute_job(job_id, data):
# Elements common to all job types
cluster_id = data['cluster_id']
configs = data.get('job_configs', {})
@ -59,6 +61,16 @@ def execute_job(job_id, data):
'job_configs': configs, 'extra': {}}
job_execution = conductor.job_execution_create(context.ctx(), job_ex_dict)
# check to use proxy user
if p.job_execution_requires_proxy_user(job_execution):
try:
p.create_proxy_user_for_job_execution(job_execution)
except ex.SaharaException as e:
LOG.exception(_LE("Can't run job execution '{0}' "
"(reasons: {1})").format(job_execution.id, e))
conductor.job_execution_destroy(context.ctx(), job_execution)
raise e
OPS.run_edp_job(job_execution.id)
return job_execution

View File

@ -27,6 +27,7 @@ from sahara.service.edp import job_utils
from sahara.service.edp.oozie import engine as oozie_engine
from sahara.service.edp.spark import engine as spark_engine
from sahara.utils import edp
from sahara.utils import proxy as p
LOG = log.getLogger(__name__)
@ -53,6 +54,9 @@ def _write_job_status(job_execution, job_info):
update = {"info": job_info}
if job_info['status'] in edp.JOB_STATUSES_TERMINATED:
update['end_time'] = datetime.datetime.now()
job_configs = p.delete_proxy_user_for_job_execution(job_execution)
if job_configs:
update['job_configs'] = job_configs
return conductor.job_execution_update(context.ctx(),
job_execution,
update)

View File

@ -117,6 +117,10 @@ SAMPLE_JOB_EXECUTION = {
swift_helper.HADOOP_SWIFT_USERNAME: "admin",
"myfavoriteconfig": 1
},
"proxy_configs": {
"proxy_username": "admin",
"proxy_password": "openstack"
},
"trusts": {
"input_id": "9c528755099149b8b7166f3d0fa3bf10",
"output_id": "3f2bde9d43ec440381dc9f736481e2b0"
@ -219,6 +223,11 @@ class TestResource(testtools.TestCase):
self.assertIn('trusts', job_exec['job_configs'])
self.assertIn('input_id', job_exec['job_configs']['trusts'])
self.assertIn('output_id', job_exec['job_configs']['trusts'])
self.assertIn('proxy_configs', job_exec['job_configs'])
self.assertIn('proxy_username',
job_exec['job_configs']['proxy_configs'])
self.assertIn('proxy_password',
job_exec['job_configs']['proxy_configs'])
wrapped_dict = job_exec.to_wrapped_dict()['job_execution']
self.assertNotIn('extra', wrapped_dict)
@ -233,3 +242,4 @@ class TestResource(testtools.TestCase):
self.assertNotIn('conf', a)
self.assertNotIn('trusts', wrapped_dict['job_configs'])
self.assertNotIn('proxy_configs', wrapped_dict['job_configs'])

View File

@ -0,0 +1,51 @@
# Copyright (c) 2014 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from sahara.tests.unit import base
from sahara.utils import proxy as p
class TestProxyUtils(base.SaharaWithDbTestCase):
def setUp(self):
super(TestProxyUtils, self).setUp()
@mock.patch('sahara.conductor.API.job_get')
@mock.patch('sahara.conductor.API.data_source_get')
def test_job_execution_requires_proxy_user(self, data_source, job):
self.override_config('use_domain_for_proxy_users', True)
job_execution = mock.Mock(input_id=1,
output_id=2,
job_id=3,
job_configs={})
data_source.return_value = mock.Mock(url='swift://container/object')
self.assertTrue(p.job_execution_requires_proxy_user(job_execution))
data_source.return_value = mock.Mock(url='')
job.return_value = mock.Mock(
mains=[mock.Mock(url='swift://container/object')])
self.assertTrue(p.job_execution_requires_proxy_user(job_execution))
job.return_value = mock.Mock(
mains=[],
libs=[mock.Mock(url='swift://container/object')])
self.assertTrue(p.job_execution_requires_proxy_user(job_execution))
job_execution.job_configs['args'] = ['swift://container/object']
job.return_value = mock.Mock(
mains=[],
libs=[])
self.assertTrue(p.job_execution_requires_proxy_user(job_execution))

View File

@ -13,14 +13,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo.config import cfg
import uuid
from oslo.config import cfg
import six
from sahara import conductor as c
from sahara import context
from sahara import exceptions as ex
from sahara.i18n import _
from sahara.openstack.common import log as logging
from sahara.swift import utils as su
from sahara.utils.openstack import keystone as k
PROXY_DOMAIN = None
conductor = c.API
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
opts = [
@ -37,6 +46,40 @@ opts = [
CONF.register_opts(opts)
def create_proxy_user_for_job_execution(job_execution):
'''Creates a proxy user and adds the credentials to the job execution
:param job_execution: The job execution model to update
'''
username = 'job_{0}'.format(job_execution.id)
password = proxy_user_create(username)
update = {'job_configs': job_execution.job_configs.to_dict()}
update['job_configs']['proxy_configs'] = {
'proxy_username': username,
'proxy_password': password
}
conductor.job_execution_update(context.ctx(), job_execution, update)
def delete_proxy_user_for_job_execution(job_execution):
'''Delete a proxy user based on a JobExecution
:param job_execution: The job execution with proxy user information
:returns: An updated job_configs dictionary or None
'''
proxy_configs = job_execution.job_configs.get('proxy_configs')
if proxy_configs is not None:
proxy_username = proxy_configs.get('proxy_username')
if proxy_username is not None:
proxy_user_delete(proxy_username)
update = {'job_configs': job_execution.job_configs.to_dict()}
del update['job_configs']['proxy_configs']
return update
return None
def domain_for_proxy():
'''Return the proxy domain or None
@ -72,3 +115,68 @@ def domain_for_proxy():
'%s'))
PROXY_DOMAIN = domain_list[0]
return PROXY_DOMAIN
def job_execution_requires_proxy_user(job_execution):
'''Returns True if the job execution requires a proxy user.'''
if CONF.use_domain_for_proxy_users is False:
return False
input_ds = conductor.data_source_get(context.ctx(),
job_execution.input_id)
if input_ds and input_ds.url.startswith(su.SWIFT_INTERNAL_PREFIX):
return True
output_ds = conductor.data_source_get(context.ctx(),
job_execution.output_id)
if output_ds and output_ds.url.startswith(su.SWIFT_INTERNAL_PREFIX):
return True
if job_execution.job_configs.get('args'):
for arg in job_execution.job_configs['args']:
if arg.startswith(su.SWIFT_INTERNAL_PREFIX):
return True
job = conductor.job_get(context.ctx(), job_execution.job_id)
for main in job.mains:
if main.url.startswith(su.SWIFT_INTERNAL_PREFIX):
return True
for lib in job.libs:
if lib.url.startswith(su.SWIFT_INTERNAL_PREFIX):
return True
return False
def proxy_user_create(username):
'''Create a new user in the proxy domain
Creates the username specified with a random password.
:param username: The name of the new user.
:returns: The password created for the user.
'''
admin = k.client_for_admin()
domain = domain_for_proxy()
password = six.text_type(uuid.uuid4())
admin.users.create(name=username, password=password, domain=domain.id)
LOG.debug(_('created proxy user {0}').format(username))
return password
def proxy_user_delete(username):
'''Delete the user from the proxy domain.
:param username: The name of the user to delete.
:raises NotFoundException: If there is an error locating the user in the
proxy domain.
'''
admin = k.client_for_admin()
domain = domain_for_proxy()
user_list = admin.users.list(domain=domain.id, name=username)
if len(user_list) == 0:
raise ex.NotFoundException(value=username,
message=_('Failed to find user %s'))
if len(user_list) > 1:
raise ex.NotFoundException(value=username,
message=_('Unexpected results found when '
'searching for user %s'))
admin.users.delete(user_list[0].id)
LOG.debug('deleted proxy user {0}'.format(username))