Parameter to tune mutual authentication in kerberos

Parameter is optional.

Change-Id: Idfec093d5af677ba4899dc17aafa1ede17f0d4c0
Closes-Bug: #1681448
This commit is contained in:
Jose Castro Leon 2017-04-10 15:52:49 +02:00
parent 81363eca79
commit bb5cb9d418
7 changed files with 158 additions and 10 deletions

View File

@ -31,12 +31,19 @@ from keystoneauth1.identity import v3
from keystoneauth1.identity.v3 import federation
def _requests_auth():
# NOTE(jamielennox): request_kerberos.OPTIONAL allows the plugin to accept
# unencrypted error messages where we can't verify the origin of the error
# because we aren't authenticated.
def _mutual_auth(value):
if value is None:
return requests_kerberos.OPTIONAL
return {
'required': requests_kerberos.REQUIRED,
'optional': requests_kerberos.OPTIONAL,
'disabled': requests_kerberos.DISABLED,
}.get(value.lower(), requests_kerberos.OPTIONAL)
def _requests_auth(mutual_authentication):
return requests_kerberos.HTTPKerberosAuth(
mutual_authentication=requests_kerberos.OPTIONAL)
mutual_authentication=_mutual_auth(mutual_authentication))
def _dependency_check():
@ -51,7 +58,7 @@ packages. These can be installed with::
class KerberosMethod(v3.AuthMethod):
_method_parameters = []
_method_parameters = ['mutual_auth']
def __init__(self, *args, **kwargs):
_dependency_check()
@ -60,7 +67,7 @@ class KerberosMethod(v3.AuthMethod):
def get_auth_data(self, session, auth, headers, request_kwargs, **kwargs):
# NOTE(jamielennox): request_kwargs is passed as a kwarg however it is
# required and always present when called from keystoneclient.
request_kwargs['requests_auth'] = _requests_auth()
request_kwargs['requests_auth'] = _requests_auth(self.mutual_auth)
return 'kerberos', {}
@ -75,15 +82,16 @@ class MappedKerberos(federation.FederationBaseAuth):
use the standard keystone auth process to scope that to any given project.
"""
def __init__(self, auth_url, identity_provider, protocol, **kwargs):
def __init__(self, auth_url, identity_provider, protocol,
mutual_auth=None, **kwargs):
_dependency_check()
self.mutual_auth = mutual_auth
super(MappedKerberos, self).__init__(auth_url, identity_provider,
protocol, **kwargs)
def get_unscoped_auth_ref(self, session, **kwargs):
resp = session.get(self.federated_token_url,
requests_auth=_requests_auth(),
requests_auth=_requests_auth(self.mutual_auth),
authenticated=False)
return access.create(body=resp.json(), resp=resp)

View File

@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from keystoneauth1 import exceptions
from keystoneauth1.extras import kerberos
from keystoneauth1 import loading
@ -24,6 +25,29 @@ class Kerberos(loading.BaseV3Loader):
def available(self):
return kerberos.requests_kerberos is not None
def get_options(self):
options = super(Kerberos, self).get_options()
options.extend([
loading.Opt('mutual-auth',
required=False,
default='optional',
help='Configures Kerberos Mutual Authentication'),
])
return options
def load_from_options(self, **kwargs):
if kwargs.get('mutual_auth'):
value = kwargs.get('mutual_auth')
if not (value.lower() in ['required', 'optional', 'disabled']):
m = ('You need to provide a valid value for kerberos mutual '
'authentication. It can be one of the following: '
'(required, optional, disabled)')
raise exceptions.OptionError(m)
return super(Kerberos, self).load_from_options(**kwargs)
class MappedKerberos(loading.BaseFederationLoader):
@ -34,3 +58,26 @@ class MappedKerberos(loading.BaseFederationLoader):
@property
def available(self):
return kerberos.requests_kerberos is not None
def get_options(self):
options = super(MappedKerberos, self).get_options()
options.extend([
loading.Opt('mutual-auth',
required=False,
default='optional',
help='Configures Kerberos Mutual Authentication'),
])
return options
def load_from_options(self, **kwargs):
if kwargs.get('mutual_auth'):
value = kwargs.get('mutual_auth')
if not (value.lower() in ['required', 'optional', 'disabled']):
m = ('You need to provide a valid value for kerberos mutual '
'authentication. It can be one of the following: '
'(required, optional, disabled)')
raise exceptions.OptionError(m)
return super(MappedKerberos, self).load_from_options(**kwargs)

View File

@ -31,6 +31,7 @@ class FedKerbLoadingTests(test_utils.TestCase):
'protocol',
'trust-id',
'auth-url',
'mutual-auth',
]
self.assertItemsEqual(allowed_opts, opts)

View File

@ -28,6 +28,7 @@ class KerberosLoadingTests(test_utils.TestCase):
'project-domain-name',
'trust-id',
'auth-url',
'mutual-auth',
]
self.assertItemsEqual(allowed_opts, opts)

View File

@ -75,3 +75,59 @@ class TestMappedAuth(base.TestCase):
self.assertEqual(scoped_id, tok)
self.assertEqual(scoped_body.project_id, proj)
def test_authenticate_with_mutual_authentication_required(self):
self.kerberos_mock.mock_auth_success(url=self.token_url,
method='GET')
scoped_id = uuid.uuid4().hex
scoped_body = ks_fixture.V3Token()
scoped_body.set_project_scope()
self.requests_mock.post(
'%s/auth/tokens' % self.TEST_V3_URL,
json=scoped_body,
headers={'X-Subject-Token': scoped_id,
'Content-Type': 'application/json'})
plugin = kerberos.MappedKerberos(
auth_url=self.TEST_V3_URL, protocol=self.protocol,
identity_provider=self.identity_provider,
project_id=scoped_body.project_id,
mutual_auth='required')
sess = session.Session()
tok = plugin.get_token(sess)
proj = plugin.get_project_id(sess)
self.assertEqual(scoped_id, tok)
self.assertEqual(scoped_body.project_id, proj)
self.assertEqual(self.kerberos_mock.called_auth_server, True)
def test_authenticate_with_mutual_authentication_disabled(self):
self.kerberos_mock.mock_auth_success(url=self.token_url,
method='GET')
scoped_id = uuid.uuid4().hex
scoped_body = ks_fixture.V3Token()
scoped_body.set_project_scope()
self.requests_mock.post(
'%s/auth/tokens' % self.TEST_V3_URL,
json=scoped_body,
headers={'X-Subject-Token': scoped_id,
'Content-Type': 'application/json'})
plugin = kerberos.MappedKerberos(
auth_url=self.TEST_V3_URL, protocol=self.protocol,
identity_provider=self.identity_provider,
project_id=scoped_body.project_id,
mutual_auth='disabled')
sess = session.Session()
tok = plugin.get_token(sess)
proj = plugin.get_project_id(sess)
self.assertEqual(scoped_id, tok)
self.assertEqual(scoped_body.project_id, proj)
self.assertEqual(self.kerberos_mock.called_auth_server, False)

View File

@ -36,3 +36,35 @@ class TestKerberosAuth(base.TestCase):
self.requests_mock.last_request.headers['Authorization'])
self.assertEqual(token_id, a.auth_ref.auth_token)
self.assertEqual(token_id, token)
def test_authenticate_with_kerberos_mutual_authentication_required(self):
token_id, token_body = self.kerberos_mock.mock_auth_success()
a = kerberos.Kerberos(self.TEST_ROOT_URL + 'v3',
mutual_auth='required')
s = session.Session(a)
token = a.get_token(s)
self.assertRequestBody()
self.assertEqual(
self.kerberos_mock.challenge_header,
self.requests_mock.last_request.headers['Authorization'])
self.assertEqual(token_id, a.auth_ref.auth_token)
self.assertEqual(token_id, token)
self.assertEqual(self.kerberos_mock.called_auth_server, True)
def test_authenticate_with_kerberos_mutual_authentication_disabled(self):
token_id, token_body = self.kerberos_mock.mock_auth_success()
a = kerberos.Kerberos(self.TEST_ROOT_URL + 'v3',
mutual_auth='disabled')
s = session.Session(a)
token = a.get_token(s)
self.assertRequestBody()
self.assertEqual(
self.kerberos_mock.challenge_header,
self.requests_mock.last_request.headers['Authorization'])
self.assertEqual(token_id, a.auth_ref.auth_token)
self.assertEqual(token_id, token)
self.assertEqual(self.kerberos_mock.called_auth_server, False)

View File

@ -54,6 +54,7 @@ class KerberosMock(fixtures.Fixture):
return self.challenge_header
def _authenticate_server(self, response):
self.called_auth_server = True
return response.headers.get('www-authenticate') == self.pass_header
def mock_auth_success(
@ -67,6 +68,8 @@ class KerberosMock(fixtures.Fixture):
if not token_body:
token_body = ks_fixture.V3Token()
self.called_auth_server = False
response_list = [{'text': 'Fail',
'status_code': 401,
'headers': {'WWW-Authenticate': 'Negotiate'}},