Adds support for renaming the builtin account administrator
Add new parameter which renames the builtin administrator user instead of creating a new one. The new account name is provided by the metadata service or specified in the conf file using the "username" option. - rename_admin_user: type bool, default value False Change-Id: I5f8c98c4854db21f7bb8eeddb81e9cf98c309a01 Implements: blueprint add-rename-administrator-option Co-Authored-By: Paula Madalina Crismaru <pcrismaru@cloudbasesolutions.com>
This commit is contained in:
parent
278468d2e2
commit
f4fce4f37c
@ -95,6 +95,10 @@ class GlobalOptions(conf_base.Options):
|
||||
'groups', default=['Administrators'],
|
||||
help='List of local groups to which the user specified in '
|
||||
'"username" will be added'),
|
||||
cfg.BoolOpt(
|
||||
'rename_admin_user', default=False,
|
||||
help='Renames the builtin admin user instead of creating a '
|
||||
'new user'),
|
||||
cfg.StrOpt(
|
||||
'heat_config_dir', default='C:\\cfn',
|
||||
help='The directory where the Heat configuration files must '
|
||||
|
@ -137,6 +137,9 @@ class BaseMetadataService(object):
|
||||
in the namedtuple defined above.
|
||||
"""
|
||||
|
||||
def get_admin_username(self):
|
||||
pass
|
||||
|
||||
def get_admin_password(self):
|
||||
pass
|
||||
|
||||
|
@ -54,6 +54,15 @@ class BaseOSUtils(object):
|
||||
def create_user(self, username, password, password_expires=False):
|
||||
raise NotImplementedError()
|
||||
|
||||
def rename_user(self, username, new_username):
|
||||
raise NotImplementedError()
|
||||
|
||||
def enum_users(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
def is_builtin_admin(self, username):
|
||||
raise NotImplementedError()
|
||||
|
||||
def set_user_password(self, username, password, password_expires=False):
|
||||
raise NotImplementedError()
|
||||
|
||||
|
@ -407,6 +407,39 @@ class WindowsUtils(base.BaseOSUtils):
|
||||
raise exception.CloudbaseInitException(
|
||||
"Create user failed: %s" % ex.args[2])
|
||||
|
||||
def rename_user(self, username, new_username):
|
||||
user_info = {
|
||||
"name": new_username,
|
||||
}
|
||||
try:
|
||||
win32net.NetUserSetInfo(None, username, 0, user_info)
|
||||
except win32net.error as ex:
|
||||
if ex.args[0] == self.NERR_UserNotFound:
|
||||
raise exception.ItemNotFoundException(
|
||||
"User not found: %s" % username)
|
||||
else:
|
||||
raise exception.CloudbaseInitException(
|
||||
"Renaming user failed: %s" % ex.args[2])
|
||||
|
||||
def enum_users(self):
|
||||
usernames = []
|
||||
resume_handle = 0
|
||||
while True:
|
||||
try:
|
||||
users_info, total, resume_handle = win32net.NetUserEnum(
|
||||
None, 0, win32netcon.FILTER_NORMAL_ACCOUNT, resume_handle)
|
||||
except win32net.error as ex:
|
||||
raise exception.CloudbaseInitException(
|
||||
"Enumerating users failed: %s" % ex.args[2])
|
||||
|
||||
usernames += [u["name"] for u in users_info]
|
||||
if not resume_handle:
|
||||
return usernames
|
||||
|
||||
def is_builtin_admin(self, username):
|
||||
sid = self.get_user_sid(username)
|
||||
return sid and sid.startswith(u"S-1-5-") and sid.endswith(u"-500")
|
||||
|
||||
def _get_user_info(self, username, level):
|
||||
try:
|
||||
return win32net.NetUserGetInfo(None, username, level)
|
||||
|
@ -54,13 +54,27 @@ class BaseCreateUserPlugin(base.BasePlugin):
|
||||
return osutils.generate_random_password(maximum_length)
|
||||
|
||||
def execute(self, service, shared_data):
|
||||
user_name = CONF.username
|
||||
user_name = service.get_admin_username() or CONF.username
|
||||
shared_data[constants.SHARED_DATA_USERNAME] = user_name
|
||||
|
||||
osutils = osutils_factory.get_os_utils()
|
||||
password = self._get_password(osutils)
|
||||
|
||||
if osutils.user_exists(user_name):
|
||||
if CONF.rename_admin_user:
|
||||
admin_user_name = [u for u in osutils.enum_users()
|
||||
if osutils.is_builtin_admin(u)][0]
|
||||
|
||||
if admin_user_name.lower() != user_name.lower():
|
||||
LOG.info('Renaming builtin admin user "%(admin_user_name)s" '
|
||||
'to %(new_user_name)s and setting password',
|
||||
{'admin_user_name': admin_user_name,
|
||||
'new_user_name': user_name})
|
||||
osutils.rename_user(admin_user_name, user_name)
|
||||
osutils.set_user_password(user_name, password)
|
||||
else:
|
||||
LOG.info('"%s" is already the name of the builtin admin '
|
||||
'user, skipping renaming', user_name)
|
||||
elif osutils.user_exists(user_name):
|
||||
LOG.info('Setting password for existing user "%s"', user_name)
|
||||
osutils.set_user_password(user_name, password)
|
||||
else:
|
||||
|
@ -2361,3 +2361,92 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
|
||||
|
||||
def test_trim_exception(self):
|
||||
self._test_trim(err=True)
|
||||
|
||||
def _test_rename_user(self, exc=None):
|
||||
new_username = "new username"
|
||||
user_info = {
|
||||
"name": new_username,
|
||||
}
|
||||
userset_mock = self._win32net_mock.NetUserSetInfo
|
||||
if exc:
|
||||
userset_mock.side_effect = [exc]
|
||||
error_class = (
|
||||
exception.ItemNotFoundException if
|
||||
exc.args[0] == self._winutils.NERR_UserNotFound else
|
||||
exception.CloudbaseInitException)
|
||||
with self.assertRaises(error_class):
|
||||
self._winutils.rename_user(self._USERNAME, new_username)
|
||||
return
|
||||
self._winutils.rename_user(self._USERNAME, new_username)
|
||||
userset_mock.assert_called_once_with(
|
||||
None, self._USERNAME, 0, user_info)
|
||||
|
||||
def test_rename_user(self):
|
||||
self._test_rename_user()
|
||||
|
||||
def test_rename_user_item_not_found(self):
|
||||
exc = self._win32net_mock.error(self._winutils.NERR_UserNotFound,
|
||||
*([mock.Mock()] * 2))
|
||||
self._test_rename_user(exc=exc)
|
||||
|
||||
def test_rename_user_failed(self):
|
||||
exc = self._win32net_mock.error(*([mock.Mock()] * 3))
|
||||
self._test_rename_user(exc=exc)
|
||||
|
||||
def _test_enum_users(self, resume_handle=False, exc=None):
|
||||
userenum_mock = self._win32net_mock.NetUserEnum
|
||||
|
||||
if exc is not None:
|
||||
userenum_mock.side_effect = [exc]
|
||||
with self.assertRaises(exception.CloudbaseInitException):
|
||||
self._winutils.enum_users()
|
||||
return
|
||||
else:
|
||||
userenum_mock.side_effect = (
|
||||
[([{"name": "fake name"}], mock.sentinel, False)] * 3 +
|
||||
[([{"name": "fake name"}], mock.sentinel, resume_handle)])
|
||||
self._winutils.enum_users()
|
||||
|
||||
result = self._winutils.enum_users()
|
||||
if resume_handle:
|
||||
self.assertEqual(result, ["fake name"] * 3)
|
||||
|
||||
def test_enum_users_exception(self):
|
||||
exc = self._win32net_mock.error(self._win32net_mock.error,
|
||||
*([mock.Mock()] * 2))
|
||||
self._test_enum_users(exc=exc)
|
||||
|
||||
def test_enum_users(self):
|
||||
self._test_enum_users(resume_handle=False)
|
||||
|
||||
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
|
||||
'.get_user_sid')
|
||||
def _test_is_builtin_admin(self, mock_get_user_sid, sid_exists,
|
||||
sid_startswith, sid_endswith):
|
||||
mock_sid = mock.Mock()
|
||||
mock_sid.startswith.return_value = sid_startswith
|
||||
mock_sid.endswith.return_value = sid_endswith
|
||||
if sid_exists:
|
||||
mock_get_user_sid.return_value = mock_sid
|
||||
else:
|
||||
mock_get_user_sid.return_value = False
|
||||
expected_result = sid_exists and sid_startswith and sid_endswith
|
||||
|
||||
result = self._winutils.is_builtin_admin(mock.sentinel)
|
||||
self.assertEqual(result, expected_result)
|
||||
|
||||
def test_is_builtin_admin_no_sid(self):
|
||||
self._test_is_builtin_admin(sid_exists=False,
|
||||
sid_startswith=True, sid_endswith=True)
|
||||
|
||||
def test_is_builtin_admin_sid_no_startswith(self):
|
||||
self._test_is_builtin_admin(sid_exists=True,
|
||||
sid_startswith=False, sid_endswith=True)
|
||||
|
||||
def test_is_builtin_admin_sid_no_endswith(self):
|
||||
self._test_is_builtin_admin(sid_exists=True,
|
||||
sid_startswith=True, sid_endswith=False)
|
||||
|
||||
def test_is_builtin_admin(self):
|
||||
self._test_is_builtin_admin(sid_exists=True,
|
||||
sid_startswith=True, sid_endswith=True)
|
||||
|
@ -58,29 +58,51 @@ class CreateUserPluginTests(unittest.TestCase):
|
||||
@mock.patch.object(CreateUserPlugin, 'post_create_user')
|
||||
def _test_execute(self, mock_post_create_user, mock_create_user,
|
||||
mock_get_password, mock_get_os_utils,
|
||||
user_exists=True,
|
||||
group_adding_works=True):
|
||||
user_exists=False, group_adding_works=True,
|
||||
rename_admin_user=False, rename_admin_taken=False):
|
||||
shared_data = {}
|
||||
mock_osutils = mock.MagicMock()
|
||||
mock_service = mock.MagicMock()
|
||||
mock_service.get_admin_username.return_value = CONF.username
|
||||
mock_get_password.return_value = 'password'
|
||||
mock_get_os_utils.return_value = mock_osutils
|
||||
mock_osutils.user_exists.return_value = user_exists
|
||||
if rename_admin_user:
|
||||
mock_osutils.is_builtin_admin.return_value = True
|
||||
if rename_admin_taken:
|
||||
mock_osutils.enum_users.return_value = [CONF.username]
|
||||
else:
|
||||
mock_osutils.enum_users.return_value = ["fake user name"]
|
||||
if not group_adding_works:
|
||||
mock_osutils.add_user_to_local_group.side_effect = Exception
|
||||
|
||||
with testutils.ConfPatcher('rename_admin_user', rename_admin_user):
|
||||
with testutils.LogSnatcher("cloudbaseinit.plugins.common."
|
||||
"createuser") as snatcher:
|
||||
response = self._create_user.execute(mock_service, shared_data)
|
||||
|
||||
mock_get_os_utils.assert_called_once_with()
|
||||
mock_get_password.assert_called_once_with(mock_osutils)
|
||||
mock_osutils.user_exists.assert_called_once_with(CONF.username)
|
||||
|
||||
if user_exists:
|
||||
mock_osutils.user_exists.assert_called_once_with(CONF.username)
|
||||
mock_osutils.set_user_password.assert_called_once_with(
|
||||
CONF.username, 'password')
|
||||
expected_logging = ["Setting password for existing user \"%s\""
|
||||
% CONF.username]
|
||||
elif rename_admin_user:
|
||||
if rename_admin_taken:
|
||||
expected_logging = [
|
||||
'"%s" is already the name of the builtin admin '
|
||||
'user, skipping renaming' % CONF.username
|
||||
]
|
||||
else:
|
||||
expected_logging = [
|
||||
'Renaming builtin admin user "{admin_user_name}" '
|
||||
'to {new_user_name} and setting password'.format(
|
||||
admin_user_name="fake user name",
|
||||
new_user_name=CONF.username)
|
||||
]
|
||||
else:
|
||||
mock_create_user.assert_called_once_with(
|
||||
CONF.username, 'password',
|
||||
@ -110,3 +132,12 @@ class CreateUserPluginTests(unittest.TestCase):
|
||||
|
||||
def test_execute_add_to_group_fails(self):
|
||||
self._test_execute(group_adding_works=False)
|
||||
|
||||
def test_execute_rename_admin(self):
|
||||
self._test_execute(
|
||||
user_exists=False,
|
||||
rename_admin_user=True)
|
||||
|
||||
def test_execute_rename_admin_taken(self):
|
||||
self._test_execute(rename_admin_user=True,
|
||||
rename_admin_taken=True)
|
||||
|
Loading…
Reference in New Issue
Block a user