# Copyright 2013 Cloudbase Solutions Srl
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import unittest

try:
    import unittest.mock as mock
except ImportError:
    import mock

from cloudbaseinit import conf as cloudbaseinit_conf
from cloudbaseinit import constant
from cloudbaseinit.plugins.common import constants
from cloudbaseinit.plugins.common import setuserpassword
from cloudbaseinit.tests.metadata import fake_json_response
from cloudbaseinit.tests import testutils

CONF = cloudbaseinit_conf.CONF


class SetUserPasswordPluginTests(unittest.TestCase):
    """Unit tests for ``setuserpassword.SetUserPasswordPlugin``.

    Helpers named ``_test_*`` hold the parametrised scenario bodies; the
    public ``test_*`` methods call them with concrete argument sets.
    """

    def setUp(self):
        # A fresh plugin and a fresh copy of the fake metadata for each test.
        self._setpassword_plugin = setuserpassword.SetUserPasswordPlugin()
        self.fake_data = fake_json_response.get_fake_metadata_json(
            '2013-04-04')

    @mock.patch('base64.b64encode')
    @mock.patch('cloudbaseinit.utils.crypt.CryptManager'
                '.load_ssh_rsa_public_key')
    def test_encrypt_password(self, mock_load_ssh_key, mock_b64encode):
        # The loaded key is used as a context manager, hence __enter__().
        mock_rsa = mock.MagicMock()
        fake_ssh_pub_key = 'fake key'
        fake_password = 'fake password'
        mock_load_ssh_key.return_value = mock_rsa
        mock_rsa.__enter__().public_encrypt.return_value = 'public encrypted'
        mock_b64encode.return_value = 'encrypted password'

        response = self._setpassword_plugin._encrypt_password(
            fake_ssh_pub_key, fake_password)

        mock_load_ssh_key.assert_called_with(fake_ssh_pub_key)
        # The password must reach the RSA layer as bytes.
        mock_rsa.__enter__().public_encrypt.assert_called_with(
            b'fake password')
        mock_b64encode.assert_called_with('public encrypted')
        self.assertEqual('encrypted password', response)

    def _test_get_ssh_public_key(self, data_exists):
        """Check which key ``_get_ssh_public_key`` picks from the service.

        :param data_exists: when true the mocked service exposes the public
            keys of the fake metadata and the first one is expected back;
            when false it exposes no keys at all.
        """
        mock_service = mock.MagicMock()
        if data_exists:
            public_keys = self.fake_data['public_keys'].values()
            expected_key = list(public_keys)[0]
        else:
            public_keys = []
            # NOTE(review): assumes the plugin reports "no key" as None,
            # the same value _test_set_metadata_password feeds for the
            # missing-key case -- confirm against the plugin.
            expected_key = None
        mock_service.get_public_keys.return_value = public_keys

        response = self._setpassword_plugin._get_ssh_public_key(mock_service)

        mock_service.get_public_keys.assert_called_with()
        self.assertEqual(expected_key, response)

    def test_get_ssh_plublic_key(self):
        self._test_get_ssh_public_key(data_exists=True)

    def test_get_ssh_plublic_key_no_pub_keys(self):
        self._test_get_ssh_public_key(data_exists=False)

    def _test_get_password(self, inject_password):
        """Check the ``(password, injected)`` pair built by ``_get_password``.

        :param inject_password: value patched into the
            ``inject_user_password`` option for the duration of the call.
        """
        shared_data = {}
        expected_password = 'Passw0rd'
        if not inject_password:
            # The password should be the one created by
            # CreateUser plugin.
            shared_data[constants.SHARED_DATA_PASSWORD] = (
                mock.sentinel.create_user_password)

        mock_service = mock.MagicMock()
        mock_service.get_admin_password.return_value = expected_password

        with testutils.LogSnatcher('cloudbaseinit.plugins.common.'
                                   'setuserpassword') as snatcher:
            with testutils.ConfPatcher('inject_user_password',
                                       inject_password):
                response = self._setpassword_plugin._get_password(
                    mock_service, shared_data)

        expected_logging = [
            'Using admin_pass metadata user password. '
            'Consider changing it as soon as possible'
        ]
        if inject_password:
            self.assertEqual(expected_logging, snatcher.output)
            mock_service.get_admin_password.assert_called_with()
            expected_password = (expected_password, True)
        else:
            # The metadata password must not even be requested.
            self.assertFalse(mock_service.get_admin_password.called)
            expected_password = (mock.sentinel.create_user_password, False)
        self.assertEqual(expected_password, response)

    def test_get_password_inject_true(self):
        self._test_get_password(inject_password=True)

    def test_get_password_inject_false(self):
        self._test_get_password(inject_password=False)

    @mock.patch('cloudbaseinit.plugins.common.setuserpassword.'
                'SetUserPasswordPlugin._get_ssh_public_key')
    @mock.patch('cloudbaseinit.plugins.common.setuserpassword.'
                'SetUserPasswordPlugin._encrypt_password')
    def _test_set_metadata_password(self, mock_encrypt_password,
                                    mock_get_key, ssh_pub_key):
        """Check ``_set_metadata_password`` with and without an SSH key.

        :param ssh_pub_key: value returned by the patched
            ``_get_ssh_public_key``; ``None`` selects the missing-key case.
        """
        fake_passw0rd = 'fake Passw0rd'
        mock_service = mock.MagicMock()
        mock_get_key.return_value = ssh_pub_key
        mock_encrypt_password.return_value = 'encrypted password'
        mock_service.post_password.return_value = 'value'
        mock_service.can_post_password = True
        mock_service.is_password_set = False

        with testutils.LogSnatcher('cloudbaseinit.plugins.common.'
                                   'setuserpassword') as snatcher:
            response = self._setpassword_plugin._set_metadata_password(
                fake_passw0rd, mock_service)

        expected_logging = []
        if ssh_pub_key is None:
            expected_logging = [
                'No SSH public key available for password encryption'
            ]
            self.assertTrue(response)
        else:
            mock_get_key.assert_called_once_with(mock_service)
            mock_encrypt_password.assert_called_once_with(ssh_pub_key,
                                                          fake_passw0rd)
            mock_service.post_password.assert_called_with(
                'encrypted password')
            # The result of post_password is handed back unchanged.
            self.assertEqual('value', response)
        self.assertEqual(expected_logging, snatcher.output)

    def test_set_metadata_password_with_ssh_key(self):
        fake_key = 'fake key'
        self._test_set_metadata_password(ssh_pub_key=fake_key)

    def test_set_metadata_password_no_ssh_key(self):
        self._test_set_metadata_password(ssh_pub_key=None)

    def test_set_metadata_password_already_set(self):
        # A service reporting the password as already set yields a truthy
        # result and a single log line.
        mock_service = mock.MagicMock()
        mock_service.is_password_set = True

        with testutils.LogSnatcher('cloudbaseinit.plugins.common.'
                                   'setuserpassword') as snatcher:
            response = self._setpassword_plugin._set_metadata_password(
                mock.sentinel.fake_password, mock_service)

        self.assertTrue(response)
        expected_logging = ['User\'s password already set in the '
                            'instance metadata and it cannot be '
                            'updated in the instance metadata']
        self.assertEqual(expected_logging, snatcher.output)

    @mock.patch('cloudbaseinit.plugins.common.setuserpassword.'
                'SetUserPasswordPlugin._change_logon_behaviour')
    @mock.patch('cloudbaseinit.plugins.common.setuserpassword.'
                'SetUserPasswordPlugin._get_password')
    def _test_set_password(self, mock_get_password,
                           mock_change_logon_behaviour, password,
                           can_update_password, is_password_changed,
                           max_password_length=20, injected=False):
        """Check the password returned and the log lines of ``_set_password``.

        :param password: password returned by the patched ``_get_password``;
            a falsy value selects the random-password case.
        :param can_update_password: value of ``service.can_update_password``.
        :param is_password_changed: result of
            ``service.is_password_changed()``.
        :param max_password_length: value patched into the
            ``user_password_length`` option during the call.
        :param injected: second element of the ``_get_password`` result.
        """
        expected_password = password
        expected_logging = []
        user = 'fake_user'

        mock_get_password.return_value = (password, injected)

        mock_service = mock.MagicMock()
        mock_osutils = mock.MagicMock()
        if not password:
            # Stand-in for the generated password; only its identity with
            # the mocked generator's return value matters to the assertions.
            expected_password = "*" * CONF.user_password_length
        mock_osutils.generate_random_password.return_value = expected_password
        mock_service.can_update_password = can_update_password
        mock_service.is_password_changed.return_value = is_password_changed

        with testutils.ConfPatcher('user_password_length',
                                   max_password_length):
            with testutils.LogSnatcher('cloudbaseinit.plugins.common.'
                                       'setuserpassword') as snatcher:
                response = self._setpassword_plugin._set_password(
                    mock_service, mock_osutils,
                    user, mock.sentinel.shared_data)

        if can_update_password and not is_password_changed:
            expected_logging.append('Updating password is not required.')
            expected_password = None

        if not password:
            expected_logging.append('Generating a random user password')

        if not can_update_password or is_password_changed:
            mock_get_password.assert_called_once_with(
                mock_service, mock.sentinel.shared_data)

        self.assertEqual(expected_password, response)
        self.assertEqual(expected_logging, snatcher.output)
        if password and can_update_password and is_password_changed:
            mock_change_logon_behaviour.assert_called_once_with(
                user, password_injected=injected)

    def test_set_password(self):
        self._test_set_password(password='Password',
                                can_update_password=False,
                                is_password_changed=False)
        self._test_set_password(password=None,
                                can_update_password=False,
                                is_password_changed=False,
                                max_password_length=25)
        self._test_set_password(password=None,
                                can_update_password=False,
                                is_password_changed=False,
                                max_password_length=10)
        self._test_set_password(password='Password',
                                can_update_password=True,
                                is_password_changed=True)
        self._test_set_password(password='Password',
                                can_update_password=True,
                                is_password_changed=False)

    @mock.patch('cloudbaseinit.plugins.common.setuserpassword.'
                'SetUserPasswordPlugin._set_password')
    @mock.patch('cloudbaseinit.plugins.common.setuserpassword.'
                'SetUserPasswordPlugin._set_metadata_password')
    @mock.patch('cloudbaseinit.osutils.factory.get_os_utils')
    def _test_execute(self, mock_get_os_utils, mock_set_metadata_password,
                      mock_set_password, is_password_set,
                      can_post_password, can_update_password=False):
        """Check the calls, log lines and return value of ``execute``.

        :param is_password_set: value of ``service.is_password_set``.
        :param can_post_password: value of ``service.can_post_password``;
            decides whether ``_set_metadata_password`` is expected to run.
        :param can_update_password: value of
            ``service.can_update_password``; selects the expected status.
        """
        mock_service = mock.MagicMock()
        mock_osutils = mock.MagicMock()
        fake_shared_data = mock.MagicMock()
        fake_shared_data.get.return_value = 'fake username'
        mock_service.is_password_set = is_password_set
        mock_service.can_post_password = can_post_password
        mock_service.can_update_password = can_update_password
        mock_get_os_utils.return_value = mock_osutils
        mock_osutils.user_exists.return_value = True
        mock_set_password.return_value = 'fake password'

        with testutils.LogSnatcher('cloudbaseinit.plugins.common.'
                                   'setuserpassword') as snatcher:
            response = self._setpassword_plugin.execute(mock_service,
                                                        fake_shared_data)

        mock_get_os_utils.assert_called_once_with()
        fake_shared_data.get.assert_called_with(
            constants.SHARED_DATA_USERNAME, CONF.username)
        mock_osutils.user_exists.assert_called_once_with('fake username')
        mock_set_password.assert_called_once_with(mock_service, mock_osutils,
                                                  'fake username',
                                                  fake_shared_data)

        expected_logging = [
            "Password succesfully updated for user fake username",
        ]
        if can_post_password:
            mock_set_metadata_password.assert_called_once_with(
                'fake password', mock_service)
        else:
            expected_logging.append("Cannot set the password in the metadata "
                                    "as it is not supported by this service")
            self.assertFalse(mock_set_metadata_password.called)

        if can_update_password:
            self.assertEqual((2, False), response)
        else:
            self.assertEqual((1, False), response)

        self.assertEqual(expected_logging, snatcher.output)

    def test_execute(self):
        self._test_execute(is_password_set=False, can_post_password=False)
        self._test_execute(is_password_set=True, can_post_password=True)
        self._test_execute(is_password_set=False, can_post_password=True)
        self._test_execute(is_password_set=True, can_post_password=True,
                           can_update_password=True)

    @mock.patch.object(setuserpassword.osutils_factory, 'get_os_utils')
    @testutils.ConfPatcher('first_logon_behaviour', constant.NEVER_CHANGE)
    def test_logon_behaviour_never_change(self, mock_get_os_utils):
        self._setpassword_plugin._change_logon_behaviour(
            mock.sentinel.username)

        # With NEVER_CHANGE the OS utilities must not even be looked up.
        self.assertFalse(mock_get_os_utils.called)

    @testutils.ConfPatcher('first_logon_behaviour', constant.ALWAYS_CHANGE)
    @mock.patch.object(setuserpassword, 'osutils_factory')
    def test_logon_behaviour_always(self, mock_factory):
        self._setpassword_plugin._change_logon_behaviour(
            mock.sentinel.username)

        mock_get_os_utils = mock_factory.get_os_utils
        self.assertTrue(mock_get_os_utils.called)
        osutils = mock_get_os_utils.return_value
        osutils.change_password_next_logon.assert_called_once_with(
            mock.sentinel.username)

    @testutils.ConfPatcher('first_logon_behaviour',
                           constant.CLEAR_TEXT_INJECTED_ONLY)
    @mock.patch.object(setuserpassword, 'osutils_factory')
    def test_change_logon_behaviour_clear_text_password_not_injected(
            self, mock_factory):
        self._setpassword_plugin._change_logon_behaviour(
            mock.sentinel.username,
            password_injected=False)

        mock_get_os_utils = mock_factory.get_os_utils
        self.assertFalse(mock_get_os_utils.called)

    @testutils.ConfPatcher('first_logon_behaviour',
                           constant.CLEAR_TEXT_INJECTED_ONLY)
    @mock.patch.object(setuserpassword, 'osutils_factory')
    def test_logon_behaviour_clear_text_password_injected(
            self, mock_factory):
        self._setpassword_plugin._change_logon_behaviour(
            mock.sentinel.username,
            password_injected=True)

        mock_get_os_utils = mock_factory.get_os_utils
        self.assertTrue(mock_get_os_utils.called)
        osutils = mock_get_os_utils.return_value
        osutils.change_password_next_logon.assert_called_once_with(
            mock.sentinel.username)