diff --git a/anchor/auth/ldap.py b/anchor/auth/ldap.py index 1963292..8d09d6c 100644 --- a/anchor/auth/ldap.py +++ b/anchor/auth/ldap.py @@ -12,6 +12,7 @@ # under the License. from __future__ import absolute_import +import logging import ldap3 from ldap3.utils import dn @@ -20,6 +21,9 @@ from anchor.auth import results from anchor import jsonloader +logger = logging.getLogger(__name__) + + def user_get_groups(attributes): """Retrieve the group membership @@ -27,37 +31,44 @@ def user_get_groups(attributes): :returns: List -- A list of groups that the user is a member of """ groups = attributes.get('memberOf', []) + logger.error("!!! groups: '%s'", groups) group_dns = [dn.parse_dn(g) for g in groups] - return set(x[0][1] for x in group_dns if x[1] == ('OU', 'Groups', ',')) + return [x[0][1] for x in group_dns if x[1] == ('OU', 'Groups', ',')] -def login(user, secret): +def login(ra_name, user, secret): """Attempt to Authenitcate user using LDAP :param user: Username :param secret: Secret/Passphrase :returns: AuthDetails -- Class used for authentication information """ - ldap_port = int(jsonloader.conf.auth['ldap'].get('port', 389)) - use_ssl = jsonloader.conf.auth['ldap'].get('ssl', ldap_port == 636) + conf = jsonloader.authentication_for_registration_authority(ra_name) + ldap_port = int(conf.get('port', 389)) + use_ssl = conf.get('ssl', ldap_port == 636) - lds = ldap3.Server(jsonloader.conf.auth['ldap']['host'], port=ldap_port, + lds = ldap3.Server(conf['host'], port=ldap_port, get_info=ldap3.ALL, use_ssl=use_ssl) try: - ldap_user = "%s@%s" % (user, jsonloader.conf.auth['ldap']['domain']) + ldap_user = "%s@%s" % (user, conf['domain']) ldc = ldap3.Connection(lds, auto_bind=True, client_strategy=ldap3.SYNC, user=ldap_user, password=secret, authentication=ldap3.SIMPLE, check_names=True) filter_str = ('(sAMAccountName=%s)' % ldap3.utils.conv.escape_bytes(user)) - ldc.search(jsonloader.conf.auth['ldap']['base'], filter_str, + ldc.search(conf['base'], filter_str, ldap3.SUBTREE, attributes=['memberOf']) if ldc.result['result'] != 0: return None user_attrs = ldc.response[0]['attributes'] user_groups = user_get_groups(user_attrs) return results.AuthDetails(username=user, groups=user_groups) - except ldap3.LDAPBindError: + except ldap3.LDAPSocketOpenError: + logger.error("cannot connect to LDAP host '%s' (authority '%s')", + conf['host'], ra_name) + return None + except ldap3.LDAPBindError: + logger.info("failed ldap auth for user %s", user) return None diff --git a/tests/auth/test_ldap.py b/tests/auth/test_ldap.py new file mode 100644 index 0000000..8f073d0 --- /dev/null +++ b/tests/auth/test_ldap.py @@ -0,0 +1,117 @@ +# -*- coding:utf-8 -*- +# +# Copyright 2015 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import unittest + +import ldap3 +import mock +from webob import exc as http_status + +from anchor import auth +from anchor.auth import results +from anchor import jsonloader +import tests + + +class AuthLdapTests(tests.DefaultConfigMixin, unittest.TestCase): + + def setUp(self): + super(AuthLdapTests, self).setUp() + self.sample_conf_auth['default_auth'] = { + "backend": "ldap", + "host": "ldap.example.com", + "base": "CN=Users,DC=example,DC=com", + "domain": "example.com", + "port": 636, + "ssl": True + } + + def tearDown(self): + pass + + @mock.patch('ldap3.Connection') + def test_login_good(self, mock_connection): + """Test all static user/pass authentication paths.""" + jsonloader.conf.load_extensions() + config = "anchor.jsonloader.conf._config" + + mock_ldc = mock.Mock() + mock_connection.return_value = mock_ldc + mock_ldc.result = {'result': 0} + mock_ldc.response = [{'attributes': {}}] + + with mock.patch.dict(config, self.sample_conf): + expected = results.AuthDetails(username='user', groups=[]) + self.assertEqual(auth.validate('default_ra', 'user', 'pass'), + expected) + + @mock.patch('ldap3.Connection') + def test_login_good_with_groups(self, mock_connection): + """Test all static user/pass authentication paths.""" + jsonloader.conf.load_extensions() + config = "anchor.jsonloader.conf._config" + + mock_ldc = mock.Mock() + mock_connection.return_value = mock_ldc + mock_ldc.result = {'result': 0} + mock_ldc.response = [{'attributes': {'memberOf': [ + u'CN=some_group,OU=Groups,DC=example,DC=com', + u'CN=other_group,OU=Groups,DC=example,DC=com']}}] + + with mock.patch.dict(config, self.sample_conf): + expected = results.AuthDetails( + username='user', + groups=[u'some_group', u'other_group']) + self.assertEqual(auth.validate('default_ra', 'user', 'pass'), + expected) + + @mock.patch('ldap3.Connection') + def test_login_search_fail(self, mock_connection): + """Test all static user/pass authentication paths.""" + jsonloader.conf.load_extensions() + config = "anchor.jsonloader.conf._config" + + mock_ldc = mock.Mock() + mock_connection.return_value = mock_ldc + mock_ldc.result = {'result': 1} + + with mock.patch.dict(config, self.sample_conf): + with self.assertRaises(http_status.HTTPUnauthorized): + auth.validate('default_ra', 'user', 'pass') + + @mock.patch('ldap3.Connection') + def test_login_bind_fail(self, mock_connection): + """Test all static user/pass authentication paths.""" + jsonloader.conf.load_extensions() + config = "anchor.jsonloader.conf._config" + + mock_connection.side_effect = ldap3.LDAPBindError() + + with mock.patch.dict(config, self.sample_conf): + with self.assertRaises(http_status.HTTPUnauthorized): + auth.validate('default_ra', 'user', 'pass') + + @mock.patch('ldap3.Connection') + def test_login_connection_fail(self, mock_connection): + """Test all static user/pass authentication paths.""" + jsonloader.conf.load_extensions() + config = "anchor.jsonloader.conf._config" + + mock_connection.side_effect = ldap3.LDAPSocketOpenError() + + with mock.patch.dict(config, self.sample_conf): + with self.assertRaises(http_status.HTTPUnauthorized): + auth.validate('default_ra', 'user', 'pass')