diff --git a/cloudbaseinit/tests/plugins/windows/test_userdata.py b/cloudbaseinit/tests/plugins/windows/test_userdata.py index 12d48278..f6de2612 100644 --- a/cloudbaseinit/tests/plugins/windows/test_userdata.py +++ b/cloudbaseinit/tests/plugins/windows/test_userdata.py @@ -15,12 +15,11 @@ # under the License. import mock -import tempfile -import uuid import unittest -from cloudbaseinit.metadata.services import base as base_metadata +from cloudbaseinit.metadata.services import base as metadata_services_base from cloudbaseinit.openstack.common import cfg +from cloudbaseinit.plugins import base from cloudbaseinit.plugins.windows import userdata from cloudbaseinit.tests.metadata import fake_json_response @@ -34,230 +33,199 @@ class UserDataPluginTest(unittest.TestCase): self.fake_data = fake_json_response.get_fake_metadata_json( '2013-04-04') - @mock.patch('os.path') - def test_get_plugin_path(self, mock_ospath): - mock_ospath.join.return_value = 'fake path' - response = self._userdata._get_plugin_path() - mock_ospath.join.assert_called_with( - mock_ospath.dirname(mock_ospath.dirname(mock_ospath.realpath())), - "windows/userdata-plugins") - self.assertEqual(response, 'fake path') - @mock.patch('cloudbaseinit.plugins.windows.userdata.UserDataPlugin' - '._process_userdata') - def _test_execute(self, mock_process_userdata, user_data, exception): + '._process_user_data') + def _test_execute(self, mock_process_user_data, ret_val): mock_service = mock.MagicMock() - fake_shared_data = 'fake data' - if exception: - e = base_metadata.NotExistingMetadataException() - mock_service.side_effect = e + mock_service.get_user_data.side_effect = [ret_val] + response = self._userdata.execute(service=mock_service, + shared_data=None) + mock_service.get_user_data.assert_called_once_with('openstack') + if ret_val is metadata_services_base.NotExistingMetadataException: + self.assertEqual(response, (base.PLUGIN_EXECUTION_DONE, False)) + elif ret_val is None: + self.assertEqual(response, (base.PLUGIN_EXECUTION_DONE, False)) else: - mock_service.get_user_data.return_value = user_data - response = self._userdata.execute(mock_service, fake_shared_data) - mock_service.get_user_data.assert_called_with('openstack') - if user_data: - mock_process_userdata.assert_called_with(user_data) - self.assertEqual(response, (1, False)) + mock_process_user_data.assert_called_once_with(ret_val) + self.assertEqual(response, mock_process_user_data()) def test_execute(self): - self._test_execute(user_data=self.fake_data, exception=False) + self._test_execute(ret_val='fake data') - def test_execute_no_data(self): - self._test_execute(user_data=None, exception=False) + def test_execute_NotExistingMetadataException(self): + self._test_execute( + ret_val=metadata_services_base.NotExistingMetadataException) - def test_execute_exception(self): - self._test_execute(user_data=None, exception=True) + def test_execute_not_user_data(self): + self._test_execute(ret_val=None) - @mock.patch('cloudbaseinit.plugins.windows.userdata.handle') + @mock.patch('email.message_from_string') + def test_parse_mime(self, mock_message_from_string): + fake_user_data = 'fake data' + response = self._userdata._parse_mime(user_data=fake_user_data) + mock_message_from_string.assert_called_once_with(fake_user_data) + self.assertEqual(response, mock_message_from_string().walk()) + + @mock.patch('cloudbaseinit.plugins.windows.userdataplugins.factory.' + 'UserDataPluginsFactory.load_plugins') @mock.patch('cloudbaseinit.plugins.windows.userdata.UserDataPlugin' '._parse_mime') @mock.patch('cloudbaseinit.plugins.windows.userdata.UserDataPlugin' '._process_part') - def _test_process_userdata(self, mock_process_part, mock_parse_mime, - mock_handle, user_data): - mock_process_part().__iter__.side_effect = ['fake'] - self._userdata._process_userdata(user_data) - print mock_parse_mime.mock_calls - print mock_process_part.mock_calls - if user_data.startswith('Content-Type: multipart'): - mock_parse_mime.assert_called_once_with(user_data) - self.assertEqual(mock_process_part.call_count, 1) - else: - mock_handle.assert_called_once_with(user_data) - - def test_process_userdata_multipart(self): - user_data = 'Content-Type: multipart fake data' - self._test_process_userdata(user_data=user_data) - - def test_process_userdata(self): - user_data = 'fake data' - self._test_process_userdata(user_data=user_data) - - @mock.patch('cloudbaseinit.plugins.windows.userdata.UserDataPlugin' - '._get_part_handler') - @mock.patch('cloudbaseinit.plugins.windows.userdata.UserDataPlugin' - '._begin_part_process_event') @mock.patch('cloudbaseinit.plugins.windows.userdata.UserDataPlugin' '._end_part_process_event') - def test_process_part(self, mock_end_part_process_event, - mock_begin_part_process_event, - mock_get_part_handler): - mock_part = mock.MagicMock() - mock_part_handler = mock.MagicMock() - mock_get_part_handler.return_value = mock_part_handler - - self._userdata._process_part(mock_part) - - mock_get_part_handler.assert_called_once_with(mock_part) - mock_begin_part_process_event.assert_called_once_with(mock_part) - mock_part.get_content_type.assert_called_once_with() - mock_part.get_filename.assert_called_once_with() - mock_part_handler.process.assert_called_with(mock_part) - mock_end_part_process_event.assert_called_with(mock_part) - @mock.patch('cloudbaseinit.plugins.windows.userdata.UserDataPlugin' - '._get_custom_handler') - def test_begin_part_process_event(self, mock_get_custom_handler): + '._process_non_multi_part') + def _test_process_user_data(self, mock_process_non_multi_part, + mock_end_part_process_event, + mock_process_part, mock_parse_mime, + mock_load_plugins, user_data, reboot): mock_part = mock.MagicMock() - mock_handler = mock.MagicMock() - mock_get_custom_handler.return_value = mock_handler - self._userdata._begin_part_process_event(mock_part) - mock_part.get_filename.assert_called_with() - mock_part.get_payload.assert_called_with() - mock_handler.assert_called_with("", "__begin__", - mock_part.get_filename(), - mock_part.get_payload()) + mock_parse_mime.return_value = [mock_part] + mock_process_part.return_value = (base.PLUGIN_EXECUTION_DONE, reboot) - @mock.patch('cloudbaseinit.plugins.windows.userdata.UserDataPlugin' - '._get_custom_handler') - def test_end_part_process_event(self, mock_get_custom_handler): - mock_part = mock.MagicMock() - mock_handler = mock.MagicMock() - mock_get_custom_handler.return_value = mock_handler - self._userdata._end_part_process_event(mock_part) - mock_part.get_payload.assert_called_with() - mock_handler.assert_called_with("", "__end__", - mock_part.get_filename(), - mock_part.get_payload()) - - def test_get_custom_handler(self): - mock_part = mock.MagicMock() - mock_part.get_content_type.return_value = 0 - self._userdata.plugin_set.has_custom_handlers = True - self._userdata.plugin_set.custom_handlers = [0] - response = self._userdata._get_custom_handler(mock_part) - mock_part.get_content_type.assert_called_with() - self.assertEqual(response, 0) - - def test_get_part_handler(self): - mock_part = mock.MagicMock() - mock_part.get_content_type.return_value = 0 - self._userdata.plugin_set.set = {0: 'fake value'} - response = self._userdata._get_part_handler(mock_part) - mock_part.get_content_type.assert_called_with() - self.assertEqual(response, 'fake value') - - @mock.patch('email.message_from_string') - def test_parse_mime(self, mock_message_from_string): - mock_msg = mock.MagicMock() - mock_message_from_string.return_value = mock_msg - response = self._userdata._parse_mime(self.fake_data) - mock_message_from_string.assert_called_once_with(self.fake_data) - mock_msg.walk.assert_called_once_with() - self.assertEqual(response, mock_msg.walk()) - - @mock.patch('re.search') - @mock.patch('tempfile.gettempdir') - @mock.patch('os.remove') - @mock.patch('os.path.isdir') - @mock.patch('os.path.join') - @mock.patch('os.path.exists') - @mock.patch('os.path.expandvars') - @mock.patch('cloudbaseinit.osutils.factory.OSUtilsFactory.get_os_utils') - def _test_handle(self, mock_get_os_utils, mock_path_expandvars, - mock_path_exists, mock_path_join, mock_path_isdir, - mock_os_remove, mock_gettempdir, mock_re_search, - fake_user_data, directory_exists): - #TODO: recheck, these are old! - mock_osutils = mock.MagicMock() - uuid.uuid4 = mock.MagicMock(return_value='randomID') - mock_path_join.return_value = 'fake_temp\\randomID' - match_instance = mock.MagicMock() - path = 'fake_temp\\randomID' - args = None - mock_get_os_utils.return_value = mock_osutils - - if fake_user_data == '^rem cmd\s': - side_effect = [match_instance] - number_of_calls = 1 - extension = '.cmd' - args = [path+extension] - shell = True - elif fake_user_data == '#!': - side_effect = [None, match_instance] - number_of_calls = 2 - extension = '.sh' - args = ['bash.exe', path+extension] - shell = False - elif fake_user_data == '#ps1\s': - side_effect = [None, None, match_instance] - number_of_calls = 3 - extension = '.ps1' - args = ['powershell.exe', '-ExecutionPolicy', 'RemoteSigned', - '-NonInteractive', path+extension] - shell = False + response = self._userdata._process_user_data(user_data=user_data) + if user_data.startswith('Content-Type: multipart'): + mock_load_plugins.assert_called_once_with() + mock_parse_mime.assert_called_once_with(user_data) + mock_process_part.assert_called_once_with(mock_part, + mock_load_plugins(), {}) + self.assertEqual(response, (base.PLUGIN_EXECUTION_DONE, reboot)) else: - side_effect = [None, None, None, match_instance] - number_of_calls = 4 - extension = '.ps1' - shell = False - if directory_exists: - args = [mock_path_expandvars('%windir%\\sysnative\\' - 'WindowsPowerShell\\v1.0\\' - 'powershell.exe'), - '-ExecutionPolicy', - 'RemoteSigned', '-NonInteractive', path+extension] - mock_path_isdir.return_value = True - else: - mock_path_isdir.return_value = False + mock_process_non_multi_part.assert_called_once_with(user_data) + self.assertEqual(response, mock_process_non_multi_part()) - mock_re_search.side_effect = side_effect + def test_process_user_data_multipart_reboot_true(self): + self._test_process_user_data(user_data='Content-Type: multipart', + reboot=True) - with mock.patch('cloudbaseinit.plugins.windows.userdata.open', - mock.mock_open(), create=True): - response = userdata.handle(fake_user_data) + def test_process_user_data_multipart_reboot_false(self): + self._test_process_user_data(user_data='Content-Type: multipart', + reboot=False) - tempfile.gettempdir.assert_called_once_with() + def test_process_user_data_non_multipart(self): + self._test_process_user_data(user_data='Content-Type: non-multipart', + reboot=False) - mock_path_join.assert_called_once_with(mock_gettempdir(), - str(uuid.uuid4())) - assert mock_re_search.call_count == number_of_calls - if args: - mock_osutils.execute_process.assert_called_with(args, shell) + @mock.patch('cloudbaseinit.plugins.windows.userdata.UserDataPlugin' + '._add_part_handlers') + @mock.patch('cloudbaseinit.plugins.windows.userdata.UserDataPlugin' + '._get_plugin_return_value') + def _test_process_part(self, mock_get_plugin_return_value, + mock_add_part_handlers, + handler_func, user_data_plugin, content_type): + mock_part = mock.MagicMock() + mock_user_data_plugins = mock.MagicMock() + mock_user_handlers = mock.MagicMock() + mock_user_handlers.get.side_effect = [handler_func] + mock_user_data_plugins.get.side_effect = [user_data_plugin] + if content_type: + _content_type = self._userdata._part_handler_content_type + mock_part.get_content_type.return_value = _content_type + else: + _content_type = 'other content type' + mock_part.get_content_type.return_value = _content_type - self.assertEqual(response, (1, False)) + response = self._userdata._process_part( + part=mock_part, user_data_plugins=mock_user_data_plugins, + user_handlers=mock_user_handlers) + mock_part.get_content_type.assert_called_once_with() + mock_user_handlers.get.assert_called_once_with( + _content_type) + if handler_func and handler_func is Exception: + self.assertEqual(mock_part.get_content_type.call_count, 2) + self.assertEqual(mock_part.get_filename.call_count, 2) + elif handler_func: + handler_func.assert_called_once_with(None, _content_type, + mock_part.get_filename(), + mock_part.get_payload()) - def test_handle_batch(self): - fake_user_data = '^rem cmd\s' - self._test_handle(fake_user_data=fake_user_data, - directory_exists=True) + self.assertEqual(mock_part.get_content_type.call_count, 1) + self.assertEqual(mock_part.get_filename.call_count, 2) + else: + mock_user_data_plugins.get.assert_called_once_with(_content_type) + if user_data_plugin and content_type: + user_data_plugin.process.assert_called_with(mock_part) + mock_add_part_handlers.assert_called_with( + mock_user_data_plugins, mock_user_handlers, + user_data_plugin.process()) + elif user_data_plugin and not content_type: + mock_get_plugin_return_value.assert_called_once_with( + user_data_plugin.process()) + self.assertEqual(response, mock_get_plugin_return_value()) - def test_handle_shell(self): - fake_user_data = '^#!' - self._test_handle(fake_user_data=fake_user_data, - directory_exists=True) + def test_process_part(self): + handler_func = mock.MagicMock() + self._test_process_part(handler_func=handler_func, + user_data_plugin=None, content_type=False) - def test_handle_powershell(self): - fake_user_data = '^#ps1\s' - self._test_handle(fake_user_data=fake_user_data, - directory_exists=True) + def test_process_part_no_handler_func(self): + user_data_plugin = mock.MagicMock() + self._test_process_part(handler_func=None, + user_data_plugin=user_data_plugin, + content_type=True) - def test_handle_powershell_sysnative(self): - fake_user_data = '#ps1_sysnative\s' - self._test_handle(fake_user_data=fake_user_data, - directory_exists=True) + def test_process_part_not_content_type(self): + user_data_plugin = mock.MagicMock() + self._test_process_part(handler_func=None, + user_data_plugin=user_data_plugin, + content_type=False) - def test_handle_powershell_sysnative_no_sysnative(self): - fake_user_data = '#ps1_sysnative\s' - self._test_handle(fake_user_data=fake_user_data, - directory_exists=False) + @mock.patch('cloudbaseinit.plugins.windows.userdata.UserDataPlugin' + '._begin_part_process_event') + def _test_add_part_handlers(self, mock_begin_part_process_event, ret_val): + mock_user_data_plugins = mock.MagicMock(spec=dict) + mock_new_user_handlers = mock.MagicMock(spec=dict) + mock_user_handlers = mock.MagicMock(spec=dict) + mock_handler_func = mock.MagicMock() + + mock_new_user_handlers.items.return_value = [('fake content type', + mock_handler_func)] + if ret_val: + mock_user_data_plugins.get.return_value = mock_handler_func + else: + mock_user_data_plugins.get.return_value = None + + self._userdata._add_part_handlers( + user_data_plugins=mock_user_data_plugins, + user_handlers=mock_user_handlers, + new_user_handlers=mock_new_user_handlers) + mock_user_data_plugins.get.assert_called_with('fake content type') + if ret_val is None: + mock_user_handlers.__setitem__.assert_called_once_with( + 'fake content type', mock_handler_func) + mock_begin_part_process_event.assert_called_with(mock_handler_func) + + def test_add_part_handlers(self): + self._test_add_part_handlers(ret_val=None) + + def test_add_part_handlers_skip_part_handler(self): + mock_func = mock.MagicMock() + self._test_add_part_handlers(ret_val=mock_func) + + def test_begin_part_process_event(self): + mock_handler_func = mock.MagicMock() + self._userdata._begin_part_process_event( + handler_func=mock_handler_func) + mock_handler_func.assert_called_once_with(None, "__begin__", None, + None) + + def test_end_part_process_event(self): + mock_handler_func = mock.MagicMock() + self._userdata._end_part_process_event( + handler_func=mock_handler_func) + mock_handler_func.assert_called_once_with(None, "__end__", None, + None) + + @mock.patch('cloudbaseinit.plugins.windows.userdatautils' + '.execute_user_data_script') + @mock.patch('cloudbaseinit.plugins.windows.userdata.UserDataPlugin' + '._get_plugin_return_value') + def test_process_non_multi_part(self, mock_get_plugin_return_value, + mock_execute_user_data_script): + user_data = 'fake' + response = self._userdata._process_non_multi_part(user_data=user_data) + mock_execute_user_data_script.assert_called_once_with(user_data) + mock_get_plugin_return_value.assert_called_once_with( + mock_execute_user_data_script()) + self.assertEqual(response, mock_get_plugin_return_value()) diff --git a/cloudbaseinit/tests/plugins/windows/test_userdata_plugins.py b/cloudbaseinit/tests/plugins/windows/test_userdata_plugins.py deleted file mode 100644 index fe0b9cba..00000000 --- a/cloudbaseinit/tests/plugins/windows/test_userdata_plugins.py +++ /dev/null @@ -1,43 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2013 Mirantis Inc. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import mock -import unittest - -from cloudbaseinit.plugins.windows import userdata_plugins - - -class MultipartUserDataPluginTest(unittest.TestCase): - - def setUp(self): - fake_path = 'fake path' - self._userdata = userdata_plugins.PluginSet(fake_path) - - @mock.patch('glob.glob') - @mock.patch('cloudbaseinit.plugins.windows.userdata_plugins.' - 'load_from_file') - def test_load(self, mock_load_from_file, mock_glob): - fake_files = ['fake_file.py'] - mock_plugin = mock.MagicMock() - mock_glob.return_value = fake_files - mock_load_from_file.return_value = mock_plugin - - self._userdata.load() - mock_glob.assert_called_once_with(self._userdata.path + '/*.py') - mock_load_from_file.assert_called_once_with('fake_file.py', - self._userdata) - self.assertEqual(self._userdata.set[mock_plugin.type], mock_plugin) diff --git a/cloudbaseinit/tests/plugins/windows/test_userdatautils.py b/cloudbaseinit/tests/plugins/windows/test_userdatautils.py new file mode 100644 index 00000000..acf3b8f0 --- /dev/null +++ b/cloudbaseinit/tests/plugins/windows/test_userdatautils.py @@ -0,0 +1,135 @@ +# Copyright 2014 Cloudbase Solutions Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock +import os +import uuid +import unittest + +from cloudbaseinit.openstack.common import cfg +from cloudbaseinit.plugins.windows import userdatautils +from cloudbaseinit.tests.metadata import fake_json_response + +CONF = cfg.CONF + + +class UserDataUtilsTest(unittest.TestCase): + + def setUp(self): + self.fake_data = fake_json_response.get_fake_metadata_json( + '2013-04-04') + + @mock.patch('re.search') + @mock.patch('tempfile.gettempdir') + @mock.patch('os.remove') + @mock.patch('os.path.isdir') + @mock.patch('os.path.exists') + @mock.patch('os.path.expandvars') + @mock.patch('cloudbaseinit.osutils.factory.OSUtilsFactory.get_os_utils') + def _test_execute_user_data_script(self, mock_get_os_utils, + mock_path_expandvars, + mock_path_exists, mock_path_isdir, + mock_os_remove, mock_gettempdir, + mock_re_search, fake_user_data, + directory_exists): + mock_osutils = mock.MagicMock() + mock_gettempdir.return_value = 'fake_temp' + uuid.uuid4 = mock.MagicMock(return_value='randomID') + match_instance = mock.MagicMock() + path = os.path.join('fake_temp', 'randomID') + args = None + mock_get_os_utils.return_value = mock_osutils + if fake_user_data == '^rem cmd\s': + side_effect = [match_instance] + number_of_calls = 1 + extension = '.cmd' + args = [path+extension] + shell = True + elif fake_user_data == '^#!/usr/bin/env\spython\s': + side_effect = [None, match_instance] + number_of_calls = 2 + extension = '.py' + args = ['python.exe', path+extension] + shell = False + elif fake_user_data == '#!': + side_effect = [None, None, match_instance] + number_of_calls = 3 + extension = '.sh' + args = ['bash.exe', path+extension] + shell = False + elif fake_user_data == '#ps1\s': + side_effect = [None, None, None, match_instance] + number_of_calls = 4 + extension = '.ps1' + args = ['powershell.exe', '-ExecutionPolicy', 'RemoteSigned', + '-NonInteractive', path+extension] + shell = False + else: + side_effect = [None, None, None, None, match_instance] + number_of_calls = 5 + extension = '.ps1' + shell = False + if directory_exists: + args = [mock_path_expandvars('%windir%\\sysnative\\' + 'WindowsPowerShell\\v1.0\\' + 'powershell.exe'), + '-ExecutionPolicy', + 'RemoteSigned', '-NonInteractive', path+extension] + mock_path_isdir.return_value = True + else: + mock_path_isdir.return_value = False + + mock_re_search.side_effect = side_effect + + with mock.patch('cloudbaseinit.plugins.windows.userdatautils.open', + mock.mock_open(), create=True): + response = userdatautils.execute_user_data_script(fake_user_data) + mock_gettempdir.assert_called_once_with() + self.assertEqual(mock_re_search.call_count, number_of_calls) + if args: + mock_osutils.execute_process.assert_called_with(args, shell) + if not directory_exists: + self.assertEqual(response, 0) + else: + self.assertEqual(response, None) + + def test_handle_batch(self): + fake_user_data = '^rem cmd\s' + self._test_execute_user_data_script(fake_user_data=fake_user_data, + directory_exists=True) + + def test_handle_python(self): + fake_user_data = '^#!/usr/bin/env\spython\s' + self._test_execute_user_data_script(fake_user_data=fake_user_data, + directory_exists=True) + + def test_handle_shell(self): + fake_user_data = '^#!' + self._test_execute_user_data_script(fake_user_data=fake_user_data, + directory_exists=True) + + def test_handle_powershell(self): + fake_user_data = '^#ps1\s' + self._test_execute_user_data_script(fake_user_data=fake_user_data, + directory_exists=True) + + def test_handle_powershell_sysnative(self): + fake_user_data = '#ps1_sysnative\s' + self._test_execute_user_data_script(fake_user_data=fake_user_data, + directory_exists=True) + + def test_handle_powershell_sysnative_no_sysnative(self): + fake_user_data = '#ps1_sysnative\s' + self._test_execute_user_data_script(fake_user_data=fake_user_data, + directory_exists=False) diff --git a/cloudbaseinit/tests/plugins/windows/userdata_plugins/test_parthandler.py b/cloudbaseinit/tests/plugins/windows/userdata_plugins/test_parthandler.py deleted file mode 100644 index c1b0b3da..00000000 --- a/cloudbaseinit/tests/plugins/windows/userdata_plugins/test_parthandler.py +++ /dev/null @@ -1,87 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2013 Cloudbase Solutions Srl -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import importlib -import mock -import os -import unittest - -from cloudbaseinit.openstack.common import cfg -from cloudbaseinit.plugins.windows import userdata_plugins -#the name of the module includes "-", importlib.import_module is needed: -parthandler = importlib.import_module("cloudbaseinit.plugins.windows" - ".userdata-plugins.parthandler") - -CONF = cfg.CONF - - -class PartHandlerScriptHandlerTests(unittest.TestCase): - - def setUp(self): - parent_set = userdata_plugins.PluginSet('fake_path') - self._parthandler = parthandler.PartHandlerScriptHandler(parent_set) - - @mock.patch('imp.load_source') - @mock.patch('imp.load_compiled') - @mock.patch('cloudbaseinit.plugins.windows.userdata-plugins.parthandler' - '.__import__', create=True) - def _test_load_from_file(self, mock__import__, mock_load_compiled, - mock_load_source, filepath): - mock_module = mock.MagicMock() - mock__import__.return_value = mock_module - mod_name, file_ext = os.path.splitext(os.path.split(filepath)[-1]) - response = parthandler.load_from_file(filepath, 'fake_function') - print response - if file_ext.lower() == '.py': - mock_load_source.assert_called_with('path', filepath) - elif file_ext.lower() == '.pyc': - mock_load_compiled.assert_called_with('path', filepath) - mock__import__.assert_called_once_with('path') - self.assertEqual(response, mock_module.fake_function) - - def test_load_from_file_py(self): - fake_file_path = os.path.join(os.path.join('fake', 'file'), 'path') - self._test_load_from_file(filepath=fake_file_path + '.py') - - def test_load_from_file_pyc(self): - fake_file_path = os.path.join(os.path.join('fake', 'file'), 'path') - self._test_load_from_file(filepath=fake_file_path + '.pyc') - - @mock.patch('cloudbaseinit.plugins.windows.userdata-plugins.parthandler.' - 'load_from_file') - def test_process(self, mock_load_from_file): - mock_part = mock.MagicMock() - mock_part.get_filename.return_value = 'fake_name' - handler_path = self._parthandler.parent_set.path + "/part-handler/" - handler_path += 'fake_name' - expected = [mock.call(), - mock.call(handler_path, "list_types"), - mock.call(handler_path, "handle_part")] - mock_load_from_file().return_value = ['fake part'] - with mock.patch("cloudbaseinit.plugins.windows.userdata-plugins." - "parthandler.open", mock.mock_open(), create=True): - self._parthandler.process(mock_part) - - print mock_load_from_file.mock_calls - print self._parthandler.parent_set.custom_handlers - - mock_part.get_filename.assert_called_once_with() - mock_part.get_payload.assert_called_once_with() - self.assertEqual(mock_load_from_file.call_args_list, expected) - self.assertEqual(self._parthandler.parent_set.has_custom_handlers, - True) - self.assertEqual(self._parthandler.parent_set.custom_handlers, - {'fake part': mock_load_from_file()}) diff --git a/cloudbaseinit/tests/plugins/windows/userdata_plugins/__init__.py b/cloudbaseinit/tests/plugins/windows/userdataplugins/__init__.py similarity index 100% rename from cloudbaseinit/tests/plugins/windows/userdata_plugins/__init__.py rename to cloudbaseinit/tests/plugins/windows/userdataplugins/__init__.py diff --git a/cloudbaseinit/tests/plugins/windows/userdataplugins/test_cloudboothook.py b/cloudbaseinit/tests/plugins/windows/userdataplugins/test_cloudboothook.py new file mode 100644 index 00000000..30d4da1f --- /dev/null +++ b/cloudbaseinit/tests/plugins/windows/userdataplugins/test_cloudboothook.py @@ -0,0 +1,36 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2014 Cloudbase Solutions Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock +import unittest + +from cloudbaseinit.openstack.common import cfg +from cloudbaseinit.plugins.windows.userdataplugins import cloudboothook + +CONF = cfg.CONF + + +class CloudBootHookPluginTests(unittest.TestCase): + + def setUp(self): + self._cloud_hook = cloudboothook.CloudBootHookPlugin() + + @mock.patch('cloudbaseinit.plugins.windows.userdataplugins.base' + '.BaseUserDataPlugin.get_mime_type') + def test_process(self, mock_get_mime_type): + mock_part = mock.MagicMock() + self._cloud_hook.process(mock_part) + mock_get_mime_type.assert_called_once_with() diff --git a/cloudbaseinit/tests/plugins/windows/userdataplugins/test_cloudconfig.py b/cloudbaseinit/tests/plugins/windows/userdataplugins/test_cloudconfig.py new file mode 100644 index 00000000..732ce156 --- /dev/null +++ b/cloudbaseinit/tests/plugins/windows/userdataplugins/test_cloudconfig.py @@ -0,0 +1,36 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2014 Cloudbase Solutions Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock +import unittest + +from cloudbaseinit.openstack.common import cfg +from cloudbaseinit.plugins.windows.userdataplugins import cloudconfig + +CONF = cfg.CONF + + +class CloudConfigPluginTests(unittest.TestCase): + + def setUp(self): + self._cloudconfig = cloudconfig.CloudConfigPlugin() + + @mock.patch('cloudbaseinit.plugins.windows.userdataplugins.base' + '.BaseUserDataPlugin.get_mime_type') + def test_process(self, mock_get_mime_type): + mock_part = mock.MagicMock() + self._cloudconfig.process(mock_part) + mock_get_mime_type.assert_called_once_with() diff --git a/cloudbaseinit/tests/plugins/windows/userdataplugins/test_factory.py b/cloudbaseinit/tests/plugins/windows/userdataplugins/test_factory.py new file mode 100644 index 00000000..7f8431d3 --- /dev/null +++ b/cloudbaseinit/tests/plugins/windows/userdataplugins/test_factory.py @@ -0,0 +1,34 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2014 Cloudbase Solutions Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock +import unittest + +from cloudbaseinit.openstack.common import cfg +from cloudbaseinit.plugins.windows.userdataplugins import factory + +CONF = cfg.CONF + + +class UserDataPluginsFactoryTests(unittest.TestCase): + + def setUp(self): + self._factory = factory.UserDataPluginsFactory() + + @mock.patch('cloudbaseinit.utils.classloader.ClassLoader.load_class') + def test_process(self, mock_load_class): + response = self._factory.load_plugins() + self.assertTrue(response is not None) diff --git a/cloudbaseinit/tests/plugins/windows/userdata_plugins/test_heathandler.py b/cloudbaseinit/tests/plugins/windows/userdataplugins/test_heat.py similarity index 50% rename from cloudbaseinit/tests/plugins/windows/userdata_plugins/test_heathandler.py rename to cloudbaseinit/tests/plugins/windows/userdataplugins/test_heat.py index cba1467f..5c543cd4 100644 --- a/cloudbaseinit/tests/plugins/windows/userdata_plugins/test_heathandler.py +++ b/cloudbaseinit/tests/plugins/windows/userdataplugins/test_heat.py @@ -14,15 +14,11 @@ # License for the specific language governing permissions and limitations # under the License. -import importlib import mock import unittest from cloudbaseinit.openstack.common import cfg -from cloudbaseinit.plugins.windows import userdata_plugins -#the name of the module includes "-", importlib.import_module is needed: -heathandler = importlib.import_module("cloudbaseinit.plugins.windows" - ".userdata-plugins.heathandler") +from cloudbaseinit.plugins.windows.userdataplugins import heat CONF = cfg.CONF @@ -30,13 +26,24 @@ CONF = cfg.CONF class HeatUserDataHandlerTests(unittest.TestCase): def setUp(self): - parent_set = userdata_plugins.PluginSet - self._heathandler = heathandler.HeatUserDataHandler(parent_set) + self._heat = heat.HeatPlugin() - @mock.patch('cloudbaseinit.plugins.windows.userdata.handle') - def test_process(self, mock_handle): + @mock.patch('cloudbaseinit.plugins.windows.userdatautils' + '.execute_user_data_script') + def _test_process(self, mock_execute_user_data_script, filename): mock_part = mock.MagicMock() - mock_part.get_filename.return_value = "cfn-userdata" - self._heathandler.process(mock_part) - mock_part.get_filename.assert_called_once_with() - mock_handle.assert_called_once_with(mock_part.get_payload()) + mock_part.get_filename.return_value = filename + response = self._heat.process(mock_part) + mock_part.get_filename.assert_called_with() + if filename: + mock_execute_user_data_script.assert_called_with( + mock_part.get_payload()) + self.assertEqual(response, mock_execute_user_data_script()) + else: + self.assertTrue(response is None) + + def test_process(self): + self._test_process(filename='cfn-userdata') + + def test_process_content_not_supported(self): + self._test_process(filename=None) diff --git a/cloudbaseinit/tests/plugins/windows/userdataplugins/test_parthandler.py b/cloudbaseinit/tests/plugins/windows/userdataplugins/test_parthandler.py new file mode 100644 index 00000000..9e485b53 --- /dev/null +++ b/cloudbaseinit/tests/plugins/windows/userdataplugins/test_parthandler.py @@ -0,0 +1,52 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 Cloudbase Solutions Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock +import os +import unittest + +from cloudbaseinit.openstack.common import cfg +from cloudbaseinit.plugins.windows.userdataplugins import parthandler + +CONF = cfg.CONF + + +class PartHandlerPluginTests(unittest.TestCase): + + def setUp(self): + self._parthandler = parthandler.PartHandlerPlugin() + + @mock.patch('tempfile.gettempdir') + @mock.patch('cloudbaseinit.utils.classloader.ClassLoader.load_module') + def test_process(self, mock_load_module, mock_gettempdir): + mock_part = mock.MagicMock() + mock_part_handler = mock.MagicMock() + mock_part.get_filename.return_value = 'fake_name' + mock_gettempdir.return_value = 'fake_directory' + mock_load_module.return_value = mock_part_handler + mock_part_handler.list_types.return_value = ['fake part'] + + with mock.patch('cloudbaseinit.plugins.windows.userdataplugins.' + 'parthandler.open', + mock.mock_open(read_data='fake data'), create=True): + response = self._parthandler.process(mock_part) + + mock_part.get_filename.assert_called_once_with() + mock_load_module.assert_called_once_with(os.path.join( + 'fake_directory', 'fake_name')) + mock_part_handler.list_types.assert_called_once_with() + self.assertEqual(response, {'fake part': + mock_part_handler.handle_part}) diff --git a/cloudbaseinit/tests/plugins/windows/userdata_plugins/test_shellscript.py b/cloudbaseinit/tests/plugins/windows/userdataplugins/test_shellscript.py similarity index 79% rename from cloudbaseinit/tests/plugins/windows/userdata_plugins/test_shellscript.py rename to cloudbaseinit/tests/plugins/windows/userdataplugins/test_shellscript.py index bc61693b..a6bb2dd3 100644 --- a/cloudbaseinit/tests/plugins/windows/userdata_plugins/test_shellscript.py +++ b/cloudbaseinit/tests/plugins/windows/userdataplugins/test_shellscript.py @@ -14,25 +14,20 @@ # License for the specific language governing permissions and limitations # under the License. -import importlib import mock import os import unittest from cloudbaseinit.openstack.common import cfg -from cloudbaseinit.plugins.windows import userdata_plugins -#the name of the module includes "-", importlib.import_module is needed: -shellscript = importlib.import_module("cloudbaseinit.plugins.windows" - ".userdata-plugins.shellscript") +from cloudbaseinit.plugins.windows.userdataplugins import shellscript CONF = cfg.CONF -class ShellScriptHandlerTests(unittest.TestCase): +class ShellScriptPluginTests(unittest.TestCase): def setUp(self): - parent_set = userdata_plugins.PluginSet('fake_path') - self._shellscript = shellscript.ShellScriptHandler(parent_set) + self._shellscript = shellscript.ShellScriptPlugin() @mock.patch('cloudbaseinit.osutils.factory.OSUtilsFactory.get_os_utils') @mock.patch('tempfile.gettempdir') @@ -49,7 +44,7 @@ class ShellScriptHandlerTests(unittest.TestCase): if exception: mock_osutils.execute_process.side_effect = [Exception] - with mock.patch("cloudbaseinit.plugins.windows.userdata-plugins." + with mock.patch("cloudbaseinit.plugins.windows.userdataplugins." "shellscript.open", mock.mock_open(), create=True): response = self._shellscript.process(mock_part) @@ -61,21 +56,27 @@ class ShellScriptHandlerTests(unittest.TestCase): elif filename.endswith(".sh"): mock_osutils.execute_process.assert_called_with( ['bash.exe', os.path.join(fake_dir_path, filename)], False) + elif filename.endswith(".py"): + mock_osutils.execute_process.assert_called_with( + ['python.exe', os.path.join(fake_dir_path, filename)], False) elif filename.endswith(".ps1"): mock_osutils.execute_process.assert_called_with( ['powershell.exe', '-ExecutionPolicy', 'RemoteSigned', '-NonInteractive', os.path.join(fake_dir_path, filename)], - False) + False) self.assertFalse(response) def test_process_cmd(self): self._test_process(filename='fake.cmd') def test_process_sh(self): - self._test_process(filename='fake.cmd') + self._test_process(filename='fake.sh') + + def test_process_py(self): + self._test_process(filename='fake.py') def test_process_ps1(self): - self._test_process(filename='fake.cmd') + self._test_process(filename='fake.ps1') def test_process_other(self): self._test_process(filename='fake.other')