Add Windows SMB share driver

This patch proposes a new share driver, handling Windows service
instances and exporting SMB shares.

WinRM is used for managing the service instances, allowing
password or certificate based authentication.

A new service instance manager is introduced, handling Windows
instances. One requirement is that Cloudbase-Init is present on the
instance.

If AD credentials are provided via security services, the instance
is joined to an AD domain. This can be leveraged in order to set
user based permissions.

A CI testing this driver will be up by L-3. Meanwhile, a link to
Tempest results can be found on the blueprint page whiteboard.

Change-Id: Ice5ee63ee4dc2cfcb41567214c165c2705421920
Implements: blueprint windows-smb-support
This commit is contained in:
Lucian Petrut 2015-06-15 16:37:32 +03:00
parent 8d9ef0b7c8
commit d9c0bda0b7
11 changed files with 2116 additions and 3 deletions

View File

@ -69,9 +69,12 @@ SSH_PORTS = (
PING_PORTS = ( PING_PORTS = (
("icmp", (-1, -1)), ("icmp", (-1, -1)),
) )
WINRM_PORTS = (
("tcp", (5985, 5986)),
)
SERVICE_INSTANCE_SECGROUP_DATA = ( SERVICE_INSTANCE_SECGROUP_DATA = (
CIFS_PORTS + NFS_PORTS + SSH_PORTS + PING_PORTS) CIFS_PORTS + NFS_PORTS + SSH_PORTS + PING_PORTS + WINRM_PORTS)
ACCESS_LEVEL_RW = 'rw' ACCESS_LEVEL_RW = 'rw'
ACCESS_LEVEL_RO = 'ro' ACCESS_LEVEL_RO = 'ro'

View File

@ -168,7 +168,7 @@ class API(base.Base):
block_device_mapping=None, block_device_mapping=None,
block_device_mapping_v2=None, nics=None, block_device_mapping_v2=None, nics=None,
availability_zone=None, instance_count=1, availability_zone=None, instance_count=1,
admin_pass=None): admin_pass=None, meta=None):
return _untranslate_server_summary_view( return _untranslate_server_summary_view(
novaclient(context).servers.create( novaclient(context).servers.create(
name, image, flavor, userdata=user_data, name, image, flavor, userdata=user_data,
@ -176,7 +176,8 @@ class API(base.Base):
block_device_mapping=block_device_mapping, block_device_mapping=block_device_mapping,
block_device_mapping_v2=block_device_mapping_v2, block_device_mapping_v2=block_device_mapping_v2,
nics=nics, availability_zone=availability_zone, nics=nics, availability_zone=availability_zone,
min_count=instance_count, admin_pass=admin_pass) min_count=instance_count, admin_pass=admin_pass,
meta=meta)
) )
def server_delete(self, context, instance): def server_delete(self, context, instance):

View File

@ -62,6 +62,7 @@ import manila.share.drivers.ibm.gpfs
import manila.share.drivers.netapp.options import manila.share.drivers.netapp.options
import manila.share.drivers.quobyte.quobyte import manila.share.drivers.quobyte.quobyte
import manila.share.drivers.service_instance import manila.share.drivers.service_instance
import manila.share.drivers.windows.service_instance
import manila.share.drivers.windows.winrm_helper import manila.share.drivers.windows.winrm_helper
import manila.share.drivers.zfssa.zfssashare import manila.share.drivers.zfssa.zfssashare
import manila.share.drivers_private_data import manila.share.drivers_private_data
@ -125,6 +126,7 @@ _global_opt_lists = [
manila.share.drivers.service_instance.common_opts, manila.share.drivers.service_instance.common_opts,
manila.share.drivers.service_instance.no_share_servers_handling_mode_opts, manila.share.drivers.service_instance.no_share_servers_handling_mode_opts,
manila.share.drivers.service_instance.share_servers_handling_mode_opts, manila.share.drivers.service_instance.share_servers_handling_mode_opts,
manila.share.drivers.windows.service_instance.windows_share_server_opts,
manila.share.drivers.windows.winrm_helper.winrm_opts, manila.share.drivers.windows.winrm_helper.winrm_opts,
manila.share.drivers.zfssa.zfssashare.ZFSSA_OPTS, manila.share.drivers.zfssa.zfssashare.ZFSSA_OPTS,
manila.share.manager.share_manager_opts, manila.share.manager.share_manager_opts,

View File

@ -0,0 +1,281 @@
# Copyright (c) 2015 Cloudbase Solutions SRL
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import re
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_log import log
from manila import exception
from manila.i18n import _, _LI, _LW
from manila.share.drivers import service_instance
from manila.share.drivers.windows import windows_utils
from manila.share.drivers.windows import winrm_helper
CONF = cfg.CONF
LOG = log.getLogger(__name__)
windows_share_server_opts = [
cfg.StrOpt(
"winrm_cert_pem_path",
default="~/.ssl/cert.pem",
help="Path to the x509 certificate used for accessing the service"
"instance."),
cfg.StrOpt(
"winrm_cert_key_pem_path",
default="~/.ssl/key.pem",
help="Path to the x509 certificate key."),
cfg.BoolOpt(
"winrm_use_cert_based_auth",
default=False,
help="Use x509 certificates in order to authenticate to the"
"service instance.")
]
CONF = cfg.CONF
CONF.register_opts(windows_share_server_opts)
class WindowsServiceInstanceManager(service_instance.ServiceInstanceManager):
""""Manages Windows Nova instances."""
_INSTANCE_CONNECTION_PROTO = "WinRM"
_CBS_INIT_RUN_PLUGIN_AFTER_REBOOT = 2
_CBS_INIT_WINRM_PLUGIN = "ConfigWinRMListenerPlugin"
_DEFAULT_MINIMUM_PASS_LENGTH = 6
def __init__(self, driver_config=None, remote_execute=None):
super(WindowsServiceInstanceManager, self).__init__(
driver_config=driver_config)
driver_config.append_config_values(windows_share_server_opts)
self._use_cert_auth = self.get_config_option(
"winrm_use_cert_based_auth")
self._cert_pem_path = self.get_config_option(
"winrm_cert_pem_path")
self._cert_key_pem_path = self.get_config_option(
"winrm_cert_key_pem_path")
self._check_auth_mode()
self._remote_execute = (remote_execute or
winrm_helper.WinRMHelper(
configuration=driver_config).execute)
self._windows_utils = windows_utils.WindowsUtils(
remote_execute=self._remote_execute)
def _check_auth_mode(self):
if self._use_cert_auth:
if not (os.path.exists(self._cert_pem_path) and
os.path.exists(self._cert_key_pem_path)):
msg = _("Certificate based authentication was configured "
"but one or more certificates are missing.")
raise exception.ServiceInstanceException(msg)
LOG.debug("Using certificate based authentication for "
"service instances.")
else:
instance_password = self.get_config_option(
"service_instance_password")
if not self._check_password_complexity(instance_password):
msg = _("The configured service instance password does not "
"match the minimum complexity requirements. "
"The password must contain at least %s characters. "
"Also, it must contain at least one digit, "
"one lower case and one upper case character.")
raise exception.ServiceInstanceException(
msg % self._DEFAULT_MINIMUM_PASS_LENGTH)
LOG.debug("Using password based authentication for "
"service instances.")
def _get_auth_info(self):
auth_info = {'use_cert_auth': self._use_cert_auth}
if self._use_cert_auth:
auth_info.update(cert_pem_path=self._cert_pem_path,
cert_key_pem_path=self._cert_key_pem_path)
return auth_info
def get_common_server(self):
data = super(WindowsServiceInstanceManager, self).get_common_server()
data['backend_details'].update(self._get_auth_info())
return data
def _get_new_instance_details(self, server):
instance_details = super(WindowsServiceInstanceManager,
self)._get_new_instance_details(server)
instance_details.update(self._get_auth_info())
return instance_details
def _check_password_complexity(self, password):
# Make sure that the Windows complexity requirements are met:
# http://technet.microsoft.com/en-us/library/cc786468(v=ws.10).aspx
if len(password) < self._DEFAULT_MINIMUM_PASS_LENGTH:
return False
for r in ("[a-z]", "[A-Z]", "[0-9]"):
if not re.search(r, password):
return False
return True
def _test_server_connection(self, server):
try:
self._remote_execute(server, "whoami", retry=False)
LOG.debug("Service VM %s is available via WinRM",
server['ip'])
return True
except Exception as ex:
LOG.debug("Server %(ip)s is not available via WinRM. "
"Exception: %(ex)s ",
dict(ip=server['ip'],
ex=ex))
return False
def _get_service_instance_create_kwargs(self):
create_kwargs = {}
if self._use_cert_auth:
# At the moment, we pass the x509 certificate via user data.
# We'll use keypairs instead as soon as the nova client will
# support x509 certificates.
with open(self._cert_pem_path, 'r') as f:
cert_pem_data = f.read()
create_kwargs['user_data'] = cert_pem_data
else:
# The admin password has to be specified via instance metadata in
# order to be passed to the instance via the metadata service or
# configdrive.
admin_pass = self.get_config_option("service_instance_password")
create_kwargs['meta'] = {'admin_pass': admin_pass}
return create_kwargs
def set_up_service_instance(self, context, network_info):
instance_details = super(WindowsServiceInstanceManager,
self).set_up_service_instance(context,
network_info)
security_services = network_info['security_services']
security_service = self.get_valid_security_service(security_services)
if security_service:
self._setup_security_service(instance_details, security_service)
instance_details['joined_domain'] = bool(security_service)
return instance_details
def _setup_security_service(self, server, security_service):
domain = security_service['domain']
admin_username = security_service['user']
admin_password = security_service['password']
dns_ip = security_service['dns_ip']
self._windows_utils.set_dns_client_search_list(server, [domain])
if_index = self._windows_utils.get_interface_index_by_ip(server,
server['ip'])
self._windows_utils.set_dns_client_server_addresses(server,
if_index,
[dns_ip])
# Joining an AD domain will alter the WinRM Listener configuration.
# Cloudbase-init is required to be running on the Windows service
# instance, so we re-enable the plugin configuring the WinRM listener.
#
# TODO(lpetrut): add a config option so that we may rely on the AD
# group policies taking care of the WinRM configuration.
self._run_cloudbase_init_plugin_after_reboot(
server, plugin_name=self._CBS_INIT_WINRM_PLUGIN)
self._join_domain(server, domain, admin_username, admin_password)
def _join_domain(self, server, domain, admin_username, admin_password):
# As the WinRM configuration may be altered and existing connections
# closed, we may not be able to retrieve the result of this operation.
# Instead, we'll ensure that the instance actually joined the domain
# after the reboot.
try:
self._windows_utils.join_domain(server, domain, admin_username,
admin_password)
except processutils.ProcessExecutionError:
raise
except Exception as exc:
LOG.debug("Unexpected error while attempting to join domain "
"%(domain)s. Verifying the result of the operation "
"after instance reboot. Exception: %(exc)s",
dict(domain=domain, exc=exc))
# We reboot the service instance using the Compute API so that
# we can wait for it to become active.
self.reboot_server(server, soft_reboot=True)
self.wait_for_instance_to_be_active(
server['instance_id'],
timeout=self.max_time_to_build_instance)
if not self._check_server_availability(server):
raise exception.ServiceInstanceException(
_('%(conn_proto)s connection has not been '
'established to %(server)s in %(time)ss. Giving up.') % {
'conn_proto': self._INSTANCE_CONNECTION_PROTO,
'server': server['ip'],
'time': self.max_time_to_build_instance})
current_domain = self._windows_utils.get_current_domain(server)
if current_domain != domain:
err_msg = _("Failed to join domain %(requested_domain)s. "
"Current domain: %(current_domain)s")
raise exception.ServiceInstanceException(
err_msg % dict(requested_domain=domain,
current_domain=current_domain))
def get_valid_security_service(self, security_services):
if not security_services:
LOG.info(_LI("No security services provided."))
elif len(security_services) > 1:
LOG.warn(_LW("Multiple security services provided. Only one "
"security service of type 'active_directory' "
"is supported."))
else:
security_service = security_services[0]
security_service_type = security_service['type']
if security_service_type == 'active_directory':
return security_service
else:
LOG.warn(_LW("Only security services of type "
"'active_directory' are supported. "
"Retrieved security service type: %(sec_type)s"),
{'sec_type': security_service_type})
return None
def _run_cloudbase_init_plugin_after_reboot(self, server, plugin_name):
cbs_init_reg_section = self._get_cbs_init_reg_section(server)
plugin_key_path = "%(cbs_init_section)s\\%(instance_id)s\\Plugins" % {
'cbs_init_section': cbs_init_reg_section,
'instance_id': server['instance_id']
}
self._windows_utils.set_win_reg_value(
server, path=plugin_key_path, key=plugin_name,
value=self._CBS_INIT_RUN_PLUGIN_AFTER_REBOOT)
def _get_cbs_init_reg_section(self, server):
base_path = 'hklm:\\SOFTWARE'
cbs_section = 'Cloudbase Solutions\\Cloudbase-Init'
for upper_section in ('', 'Wow6432Node'):
cbs_init_section = self._windows_utils.normalize_path(
os.path.join(base_path, upper_section, cbs_section))
try:
self._windows_utils.get_win_reg_value(
server, path=cbs_init_section)
return cbs_init_section
except processutils.ProcessExecutionError as ex:
# The exit code will always be '1' in case of errors, so the
# only way to determine the error type is checking stderr.
if 'Cannot find path' in ex.stderr:
continue
else:
raise
raise exception.ServiceInstanceException(
_("Could not retrieve Cloudbase Init registry section"))

View File

@ -0,0 +1,164 @@
# Copyright (c) 2015 Cloudbase Solutions SRL
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from oslo_log import log
from oslo_utils import units
from manila.i18n import _LW
from manila.share import driver as base_driver
from manila.share.drivers import generic
from manila.share.drivers.windows import service_instance
from manila.share.drivers.windows import windows_smb_helper
from manila.share.drivers.windows import windows_utils
from manila.share.drivers.windows import winrm_helper
LOG = log.getLogger(__name__)
class WindowsSMBDriver(generic.GenericShareDriver):
# NOTE(lpetrut): The first partition will be reserved by the OS.
_DEFAULT_SHARE_PARTITION = 2
def __init__(self, *args, **kwargs):
super(WindowsSMBDriver, self).__init__(*args, **kwargs)
self._remote_execute = winrm_helper.WinRMHelper(
configuration=self.configuration).execute
self._windows_utils = windows_utils.WindowsUtils(
remote_execute=self._remote_execute)
self._smb_helper = windows_smb_helper.WindowsSMBHelper(
remote_execute=self._remote_execute,
configuration=self.configuration)
def _update_share_stats(self, data=None):
base_driver.ShareDriver._update_share_stats(
self, data=dict(storage_protocol="CIFS"))
def _setup_service_instance_manager(self):
self.service_instance_manager = (
service_instance.WindowsServiceInstanceManager(
driver_config=self.configuration))
def _setup_helpers(self):
self._helpers = {key: self._smb_helper for key in ("SMB", "CIFS")}
def _teardown_server(self, server_details, security_services=None):
security_service = (
self.service_instance_manager.get_valid_security_service(
security_services))
if server_details.get('joined_domain') and security_service:
try:
self._windows_utils.unjoin_domain(server_details,
security_service['user'],
security_service['password'])
except Exception as exc:
LOG.warn(_LW("Failed to remove service instance "
"%(instance_id)s from domain %(domain)s. "
"Exception: %(exc)s"),
dict(instance_id=server_details['instance_id'],
domain=security_service['domain'],
exc=exc))
super(WindowsSMBDriver, self)._teardown_server(server_details,
security_services)
def _format_device(self, server_details, volume):
disk_number = self._get_disk_number(server_details, volume)
self._windows_utils.initialize_disk(server_details, disk_number)
self._windows_utils.create_partition(server_details, disk_number)
self._windows_utils.format_partition(
server_details, disk_number,
self._DEFAULT_SHARE_PARTITION)
def _mount_device(self, share, server_details, volume):
mount_path = self._get_mount_path(share)
if not self._is_device_mounted(mount_path, server_details, volume):
disk_number = self._get_disk_number(server_details, volume)
self._windows_utils.ensure_directory_exists(server_details,
mount_path)
self._ensure_disk_online_and_writable(server_details, disk_number)
self._windows_utils.add_access_path(server_details,
mount_path,
disk_number,
self._DEFAULT_SHARE_PARTITION)
def _unmount_device(self, share, server_details):
mount_path = self._get_mount_path(share)
disk_number = self._windows_utils.get_disk_number_by_mount_path(
server_details, mount_path)
self._windows_utils.remove(server_details, mount_path,
is_junction=True)
if disk_number:
self._windows_utils.set_disk_online_status(
server_details, disk_number, online=False)
def _resize_filesystem(self, server_details, volume, new_size=None):
disk_number = self._get_disk_number(server_details, volume)
self._ensure_disk_online_and_writable(server_details, disk_number)
if not new_size:
new_size_bytes = self._windows_utils.get_partition_maximum_size(
server_details, disk_number, self._DEFAULT_SHARE_PARTITION)
else:
new_size_bytes = new_size * units.Gi
self._windows_utils.resize_partition(server_details,
new_size_bytes,
disk_number,
self._DEFAULT_SHARE_PARTITION)
def _ensure_disk_online_and_writable(self, server_details, disk_number):
self._windows_utils.update_disk(server_details, disk_number)
self._windows_utils.set_disk_readonly_status(
server_details, disk_number, readonly=False)
self._windows_utils.set_disk_online_status(
server_details, disk_number, online=True)
def _get_mounted_share_size(self, mount_path, server_details):
total_bytes = self._windows_utils.get_disk_space_by_path(
server_details, mount_path)[0]
return float(total_bytes) / units.Gi
def _get_consumed_space(self, mount_path, server_details):
total_bytes, free_bytes = self._windows_utils.get_disk_space_by_path(
server_details, mount_path)
return float(total_bytes - free_bytes) / units.Gi
def _get_mount_path(self, share):
mount_path = os.path.join(self.configuration.share_mount_path,
share['name'])
return self._windows_utils.normalize_path(mount_path)
def _get_disk_number(self, server_details, volume):
disk_number = self._windows_utils.get_disk_number_by_serial_number(
server_details, volume['id'])
if disk_number is None:
LOG.debug("Could not identify the mounted disk by serial number "
"using the volume id %(volume_id)s. Attempting to "
"retrieve it by the volume mount point %(mountpoint)s.",
dict(volume_id=volume['id'],
mountpoint=volume['mountpoint']))
# Assumes the mount_point will be something like /dev/hdX
mount_point = volume['mountpoint']
disk_number = ord(mount_point[-1]) - ord('a')
return disk_number
def _is_device_mounted(self, mount_path, server_details, volume=None):
disk_number = self._windows_utils.get_disk_number_by_mount_path(
server_details, mount_path)
return disk_number is not None

View File

@ -0,0 +1,154 @@
# Copyright (c) 2015 Cloudbase Solutions SRL
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from oslo_log import log
from manila.common import constants
from manila import exception
from manila.i18n import _, _LI
from manila.share.drivers import generic
from manila.share.drivers.windows import windows_utils
LOG = log.getLogger(__name__)
class WindowsSMBHelper(generic.NASHelperBase):
_SHARE_ACCESS_RIGHT_MAP = {
constants.ACCESS_LEVEL_RW: "Change",
constants.ACCESS_LEVEL_RO: "Read"}
_ICACLS_ACCESS_RIGHT_MAP = {
constants.ACCESS_LEVEL_RW: 'M',
constants.ACCESS_LEVEL_RO: 'R'}
def __init__(self, remote_execute, configuration):
self._remote_exec = remote_execute
self.configuration = configuration
self._windows_utils = windows_utils.WindowsUtils(
remote_execute=remote_execute)
def init_helper(self, server):
self._remote_exec(server, "Get-SmbShare")
def create_export(self, server, share_name, recreate=False):
export_location = '\\\\%s\\%s' % (server['public_address'],
share_name)
if not self._share_exists(server, share_name):
share_path = self._windows_utils.normalize_path(
os.path.join(self.configuration.share_mount_path,
share_name))
cmd = ['New-SmbShare', '-Name', share_name, '-Path', share_path]
self._remote_exec(server, cmd)
else:
LOG.info(_LI("Skipping creating export %s as it already exists."),
share_name)
return export_location
def remove_export(self, server, share_name):
if self._share_exists(server, share_name):
cmd = ['Remove-SmbShare', '-Name', share_name, "-Force"]
self._remote_exec(server, cmd)
else:
LOG.debug("Skipping removing export %s as it does not exist.",
share_name)
def _get_volume_path_by_share_name(self, server, share_name):
share_path = self._get_share_path_by_name(server, share_name)
volume_path = self._windows_utils.get_volume_path_by_mount_path(
server, share_path)
return volume_path
def allow_access(self, server, share_name, access_type, access_level,
access_to):
"""Add access for share."""
if access_type != 'user':
reason = _('Only user access type allowed.')
raise exception.InvalidShareAccess(reason=reason)
self._grant_share_access(server, share_name, access_level, access_to)
self._grant_share_path_access(server, share_name,
access_level, access_to)
def _grant_share_access(self, server, share_name, access_level, access_to):
access_right = self._SHARE_ACCESS_RIGHT_MAP[access_level]
cmd = ["Grant-SmbShareAccess", "-Name", share_name,
"-AccessRight", access_right,
"-AccountName", access_to, "-Force"]
self._remote_exec(server, cmd)
self._refresh_acl(server, share_name)
def _grant_share_path_access(self, server, share_name,
access_level, access_to):
# Set NTFS level permissions
access_right = self._ICACLS_ACCESS_RIGHT_MAP[access_level]
ace = '"%(access_to)s:(OI)(CI)%(access_right)s"' % dict(
access_to=access_to, access_right=access_right)
vol_path = self._get_volume_path_by_share_name(server, share_name)
cmd = ["icacls", self._windows_utils.quote_string(vol_path),
"/grant", ace, "/t", "/c"]
self._remote_exec(server, cmd)
def _refresh_acl(self, server, share_name):
cmd = ['Set-SmbPathAcl', '-ShareName', share_name]
self._remote_exec(server, cmd)
def deny_access(self, server, share_name, access, force=False):
access_to = access['access_to']
self._revoke_share_access(server, share_name, access_to)
self._revoke_share_path_access(server, share_name, access_to)
def _revoke_share_access(self, server, share_name, access_to):
cmd = ['Revoke-SmbShareAccess', '-Name', share_name,
'-AccountName', access_to, '-Force']
self._remote_exec(server, cmd)
self._refresh_acl(server, share_name)
def _revoke_share_path_access(self, server, share_name, access_to):
vol_path = self._get_volume_path_by_share_name(server, share_name)
cmd = ["icacls", self._windows_utils.quote_string(vol_path),
"/remove", access_to, "/t", "/c"]
self._remote_exec(server, cmd)
def _get_share_name(self, export_location):
return self._windows_utils.normalize_path(
export_location).split('\\')[-1]
def get_exports_for_share(self, server, old_export_location):
share_name = self._get_share_name(old_export_location)
data = dict(ip=server['public_address'], share_name=share_name)
return ['\\\\%(ip)s\\%(share_name)s' % data]
def _get_share_path_by_name(self, server, share_name,
ignore_missing=False):
cmd = ('Get-SmbShare -Name %s | '
'Select-Object -ExpandProperty Path' % share_name)
check_exit_code = not ignore_missing
(share_path, err) = self._remote_exec(server, cmd,
check_exit_code=check_exit_code)
return share_path.strip() if share_path else None
def get_share_path_by_export_location(self, server, export_location):
share_name = self._get_share_name(export_location)
return self._get_share_path_by_name(server, share_name)
def _share_exists(self, server, share_name):
share_path = self._get_share_path_by_name(server, share_name,
ignore_missing=True)
return bool(share_path)

View File

@ -0,0 +1,231 @@
# Copyright (c) 2015 Cloudbase Solutions SRL
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
from oslo_log import log
from manila.i18n import _LI
LOG = log.getLogger(__name__)
class WindowsUtils(object):
def __init__(self, remote_execute):
self._remote_exec = remote_execute
self._fsutil_total_space_regex = re.compile('of bytes *: ([0-9]*)')
self._fsutil_free_space_regex = re.compile(
'of avail free bytes *: ([0-9]*)')
def initialize_disk(self, server, disk_number):
cmd = ["Initialize-Disk", "-Number", disk_number]
self._remote_exec(server, cmd)
def create_partition(self, server, disk_number):
cmd = ["New-Partition", "-DiskNumber", disk_number, "-UseMaximumSize"]
self._remote_exec(server, cmd)
def format_partition(self, server, disk_number, partition_number):
cmd = ("Get-Partition -DiskNumber %(disk_number)s "
"-PartitionNumber %(partition_number)s | "
"Format-Volume -FileSystem NTFS -Force -Confirm:$false" % {
'disk_number': disk_number,
'partition_number': partition_number,
})
self._remote_exec(server, cmd)
def add_access_path(self, server, mount_path, disk_number,
partition_number):
cmd = ["Add-PartitionAccessPath", "-DiskNumber", disk_number,
"-PartitionNumber", partition_number,
"-AccessPath", self.quote_string(mount_path)]
self._remote_exec(server, cmd)
def resize_partition(self, server, size_bytes, disk_number,
partition_number):
cmd = ['Resize-Partition', '-DiskNumber', disk_number,
'-PartitionNumber', partition_number,
'-Size', size_bytes]
self._remote_exec(server, cmd)
def get_disk_number_by_serial_number(self, server, serial_number):
pattern = "%s*" % serial_number[:15]
cmd = ("Get-Disk | "
"Where-Object {$_.SerialNumber -like '%s'} | "
"Select-Object -ExpandProperty Number" % pattern)
(out, err) = self._remote_exec(server, cmd)
return int(out) if (len(out) > 0) else None
def get_disk_number_by_mount_path(self, server, mount_path):
cmd = ('Get-Partition | '
'Where-Object {$_.AccessPaths -contains "%s"} | '
'Select-Object -ExpandProperty DiskNumber' %
(mount_path + "\\"))
(out, err) = self._remote_exec(server, cmd)
return int(out) if (len(out) > 0) else None
def get_volume_path_by_mount_path(self, server, mount_path):
cmd = ('Get-Partition | '
'Where-Object {$_.AccessPaths -contains "%s"} | '
'Get-Volume | '
'Select-Object -ExpandProperty Path' %
(mount_path + "\\"))
(out, err) = self._remote_exec(server, cmd)
return out.strip()
def get_disk_space_by_path(self, server, mount_path):
cmd = ["fsutil", "volume", "diskfree",
self.quote_string(mount_path)]
(out, err) = self._remote_exec(server, cmd)
total_bytes = int(self._fsutil_total_space_regex.findall(out)[0])
free_bytes = int(self._fsutil_free_space_regex.findall(out)[0])
return total_bytes, free_bytes
def get_partition_maximum_size(self, server, disk_number,
partition_number):
cmd = ('Get-PartitionSupportedSize -DiskNumber %(disk_number)s '
'-PartitionNumber %(partition_number)s | '
'Select-Object -ExpandProperty SizeMax' %
dict(disk_number=disk_number,
partition_number=partition_number))
(out, err) = self._remote_exec(server, cmd)
max_bytes = int(out)
return max_bytes
def set_disk_online_status(self, server, disk_number, online=True):
is_offline = int(not online)
cmd = ["Set-Disk", "-Number", disk_number, "-IsOffline", is_offline]
self._remote_exec(server, cmd)
def set_disk_readonly_status(self, server, disk_number, readonly=False):
cmd = ["Set-Disk", "-Number", disk_number,
"-IsReadOnly", int(readonly)]
self._remote_exec(server, cmd)
def update_disk(self, server, disk_number):
"""Updates cached disk information."""
cmd = ["Update-Disk", disk_number]
self._remote_exec(server, cmd)
def join_domain(self, server, domain, admin_username, admin_password):
# NOTE(lpetrut): An instance reboot is needed but this will be
# performed using Nova so that the instance state can be
# retrieved easier.
LOG.info(_LI("Joining server %(ip)s to Active Directory "
"domain %(domain)s"), dict(ip=server['ip'],
domain=domain))
cmds = [
('$password = "%s" | '
'ConvertTo-SecureString -asPlainText -Force' % admin_password),
('$credential = '
'New-Object System.Management.Automation.PSCredential('
'"%s", $password)' % admin_username),
('Add-Computer -DomainName "%s" -Credential $credential' %
domain)]
cmd = ";".join(cmds)
self._remote_exec(server, cmd)
def unjoin_domain(self, server, admin_username, admin_password,
reboot=False):
cmds = [
('$password = "%s" | '
'ConvertTo-SecureString -asPlainText -Force' % admin_password),
('$credential = '
'New-Object System.Management.Automation.PSCredential('
'"%s", $password)' % admin_username),
('Remove-Computer -UnjoinDomaincredential $credential '
'-Passthru -Verbose -Force')]
cmd = ";".join(cmds)
self._remote_exec(server, cmd)
def get_current_domain(self, server):
cmd = "(Get-WmiObject Win32_ComputerSystem).Domain"
(out, err) = self._remote_exec(server, cmd)
return out.strip()
def ensure_directory_exists(self, server, path):
cmd = ["New-Item", "-ItemType", "Directory",
"-Force", "-Path", self.quote_string(path)]
self._remote_exec(server, cmd)
def remove(self, server, path, force=True, recurse=False,
is_junction=False):
if self.path_exists(server, path):
if is_junction:
cmd = ('[System.IO.Directory]::Delete('
'%(path)s, %(recurse)d)'
% dict(path=self.quote_string(path),
recurse=recurse))
else:
cmd = ["Remove-Item", "-Confirm:$false",
"-Path", self.quote_string(path)]
if force:
cmd += ['-Force']
if recurse:
cmd += ['-Recurse']
self._remote_exec(server, cmd)
else:
LOG.debug("Skipping deleting path %s as it does "
"not exist.", path)
def path_exists(self, server, path):
cmd = ["Test-Path", path]
(out, _) = self._remote_exec(server, cmd)
return out.strip() == "True"
def normalize_path(self, path):
return path.replace('/', '\\')
def get_interface_index_by_ip(self, server, ip):
cmd = ('Get-NetIPAddress | '
'Where-Object {$_.IPAddress -eq "%(ip)s"} | '
'Select-Object -ExpandProperty InterfaceIndex' %
dict(ip=ip))
(out, err) = self._remote_exec(server, cmd)
if_index = int(out)
return if_index
def set_dns_client_search_list(self, server, search_list):
src_list = ",".join(["'%s'" % domain for domain in search_list])
cmd = ["Set-DnsClientGlobalSetting",
"-SuffixSearchList", "@(%s)" % src_list]
self._remote_exec(server, cmd)
def set_dns_client_server_addresses(self, server, if_index, dns_servers):
dns_sv_list = ",".join(["'%s'" % dns_sv for dns_sv in dns_servers])
cmd = ["Set-DnsClientServerAddress",
"-InterfaceIndex", if_index,
"-ServerAddresses", "(%s)" % dns_sv_list]
self._remote_exec(server, cmd)
def set_win_reg_value(self, server, path, key, value):
cmd = ['Set-ItemProperty', '-Path', self.quote_string(path),
'-Name', key, '-Value', value]
self._remote_exec(server, cmd)
def get_win_reg_value(self, server, path, name=None):
cmd = "Get-ItemProperty -Path %s" % self.quote_string(path)
if name:
cmd += " | Select-Object -ExpandProperty %s" % name
return self._remote_exec(server, cmd, retry=False)[0]
def quote_string(self, string):
return '"%s"' % string

View File

@ -0,0 +1,367 @@
# Copyright (c) 2015 Cloudbase Solutions SRL
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import ddt
import mock
from oslo_concurrency import processutils
from oslo_config import cfg
from manila import exception
from manila.share import configuration
from manila.share.drivers import service_instance as generic_service_instance
from manila.share.drivers.windows import service_instance
from manila.share.drivers.windows import windows_utils
from manila import test
CONF = cfg.CONF
CONF.import_opt('driver_handles_share_servers',
'manila.share.driver')
CONF.register_opts(generic_service_instance.common_opts)
serv_mgr_cls = service_instance.WindowsServiceInstanceManager
generic_serv_mgr_cls = generic_service_instance.ServiceInstanceManager
@ddt.ddt
class WindowsServiceInstanceManagerTestCase(test.TestCase):
_FAKE_SERVER = {'ip': mock.sentinel.ip,
'instance_id': mock.sentinel.instance_id}
@mock.patch.object(windows_utils, 'WindowsUtils')
@mock.patch.object(serv_mgr_cls, '_check_auth_mode')
def setUp(self, mock_check_auth, mock_utils_cls):
self.flags(service_instance_user=mock.sentinel.username)
self._remote_execute = mock.Mock()
fake_conf = configuration.Configuration(None)
self._mgr = serv_mgr_cls(remote_execute=self._remote_execute,
driver_config=fake_conf)
self._windows_utils = mock_utils_cls.return_value
super(WindowsServiceInstanceManagerTestCase, self).setUp()
@ddt.data({},
{'use_cert_auth': False},
{'use_cert_auth': False, 'valid_pass_complexity': False},
{'certs_exist': False})
@mock.patch('os.path.exists')
@mock.patch.object(serv_mgr_cls, '_check_password_complexity')
@ddt.unpack
def test_check_auth_mode(self, mock_check_complexity, mock_path_exists,
use_cert_auth=True, certs_exist=True,
valid_pass_complexity=True):
self.flags(service_instance_password=mock.sentinel.password)
self._mgr._cert_pem_path = mock.sentinel.cert_path
self._mgr._cert_key_pem_path = mock.sentinel.key_path
mock_path_exists.return_value = certs_exist
mock_check_complexity.return_value = valid_pass_complexity
self._mgr._use_cert_auth = use_cert_auth
invalid_auth = ((use_cert_auth and not certs_exist)
or not valid_pass_complexity)
if invalid_auth:
self.assertRaises(exception.ServiceInstanceException,
self._mgr._check_auth_mode)
else:
self._mgr._check_auth_mode()
if not use_cert_auth:
mock_check_complexity.assert_called_once_with(
mock.sentinel.password)
@ddt.data(False, True)
def test_get_auth_info(self, use_cert_auth):
self._mgr._use_cert_auth = use_cert_auth
self._mgr._cert_pem_path = mock.sentinel.cert_path
self._mgr._cert_key_pem_path = mock.sentinel.key_path
auth_info = self._mgr._get_auth_info()
expected_auth_info = {'use_cert_auth': use_cert_auth}
if use_cert_auth:
expected_auth_info.update(cert_pem_path=mock.sentinel.cert_path,
cert_key_pem_path=mock.sentinel.key_path)
self.assertEqual(expected_auth_info, auth_info)
@mock.patch.object(serv_mgr_cls, '_get_auth_info')
@mock.patch.object(generic_serv_mgr_cls, 'get_common_server')
def test_common_server(self, mock_generic_get_server, mock_get_auth):
mock_server_details = {'backend_details': {}}
mock_auth_info = {'fake_auth_info': mock.sentinel.auth_info}
mock_generic_get_server.return_value = mock_server_details
mock_get_auth.return_value = mock_auth_info
expected_server_details = dict(backend_details=mock_auth_info)
server_details = self._mgr.get_common_server()
mock_generic_get_server.assert_called_once_with()
self.assertEqual(expected_server_details, server_details)
@mock.patch.object(serv_mgr_cls, '_get_auth_info')
@mock.patch.object(generic_serv_mgr_cls, '_get_new_instance_details')
def test_get_new_instance_details(self, mock_generic_get_details,
mock_get_auth):
mock_server_details = {'fake_server_details':
mock.sentinel.server_details}
mock_generic_get_details.return_value = mock_server_details
mock_auth_info = {'fake_auth_info': mock.sentinel.auth_info}
mock_get_auth.return_value = mock_auth_info
expected_server_details = dict(mock_server_details, **mock_auth_info)
instance_details = self._mgr._get_new_instance_details(
server=mock.sentinel.server)
mock_generic_get_details.assert_called_once_with(mock.sentinel.server)
self.assertEqual(expected_server_details, instance_details)
@ddt.data(('abAB01', True),
('abcdef', False),
('aA0', False))
@ddt.unpack
def test_check_password_complexity(self, password, expected_result):
valid_complexity = self._mgr._check_password_complexity(
password)
self.assertEqual(expected_result, valid_complexity)
@ddt.data(None, Exception)
def test_server_connection(self, side_effect):
self._remote_execute.side_effect = side_effect
expected_result = side_effect is None
is_available = self._mgr._test_server_connection(self._FAKE_SERVER)
self.assertEqual(expected_result, is_available)
self._remote_execute.assert_called_once_with(self._FAKE_SERVER,
"whoami",
retry=False)
@ddt.data(False, True)
def test_get_service_instance_create_kwargs(self, use_cert_auth):
self._mgr._use_cert_auth = use_cert_auth
self.flags(service_instance_password=mock.sentinel.admin_pass)
if use_cert_auth:
mock_cert_data = 'mock_cert_data'
self.mock_object(service_instance, 'open',
mock.mock_open(
read_data=mock_cert_data))
expected_kwargs = dict(user_data=mock_cert_data)
else:
expected_kwargs = dict(
meta=dict(admin_pass=mock.sentinel.admin_pass))
create_kwargs = self._mgr._get_service_instance_create_kwargs()
self.assertEqual(expected_kwargs, create_kwargs)
@mock.patch.object(generic_serv_mgr_cls, 'set_up_service_instance')
@mock.patch.object(serv_mgr_cls, 'get_valid_security_service')
@mock.patch.object(serv_mgr_cls, '_setup_security_service')
def test_set_up_service_instance(self, mock_setup_security_service,
mock_get_valid_security_service,
mock_generic_setup_serv_inst):
mock_service_instance = {'instance_details': None}
mock_network_info = {'security_services':
mock.sentinel.security_services}
mock_generic_setup_serv_inst.return_value = mock_service_instance
mock_get_valid_security_service.return_value = (
mock.sentinel.security_service)
instance_details = self._mgr.set_up_service_instance(
mock.sentinel.context, mock_network_info)
mock_generic_setup_serv_inst.assert_called_once_with(
mock.sentinel.context, mock_network_info)
mock_get_valid_security_service.assert_called_once_with(
mock.sentinel.security_services)
mock_setup_security_service.assert_called_once_with(
mock_service_instance, mock.sentinel.security_service)
expected_instance_details = dict(mock_service_instance,
joined_domain=True)
self.assertEqual(expected_instance_details,
instance_details)
@mock.patch.object(serv_mgr_cls, '_run_cloudbase_init_plugin_after_reboot')
@mock.patch.object(serv_mgr_cls, '_join_domain')
def test_setup_security_service(self, mock_join_domain,
mock_run_cbsinit_plugin):
utils = self._windows_utils
mock_security_service = {'domain': mock.sentinel.domain,
'user': mock.sentinel.admin_username,
'password': mock.sentinel.admin_password,
'dns_ip': mock.sentinel.dns_ip}
utils.get_interface_index_by_ip.return_value = (
mock.sentinel.interface_index)
self._mgr._setup_security_service(self._FAKE_SERVER,
mock_security_service)
utils.set_dns_client_search_list.assert_called_once_with(
self._FAKE_SERVER,
[mock_security_service['domain']])
utils.get_interface_index_by_ip.assert_called_once_with(
self._FAKE_SERVER,
self._FAKE_SERVER['ip'])
utils.set_dns_client_server_addresses.assert_called_once_with(
self._FAKE_SERVER,
mock.sentinel.interface_index,
[mock_security_service['dns_ip']])
mock_run_cbsinit_plugin.assert_called_once_with(
self._FAKE_SERVER,
plugin_name=self._mgr._CBS_INIT_WINRM_PLUGIN)
mock_join_domain.assert_called_once_with(
self._FAKE_SERVER,
mock.sentinel.domain,
mock.sentinel.admin_username,
mock.sentinel.admin_password)
@ddt.data({'join_domain_side_eff': Exception},
{'server_available': False,
'expected_exception': exception.ServiceInstanceException},
{'join_domain_side_eff': processutils.ProcessExecutionError,
'expected_exception': processutils.ProcessExecutionError},
{'domain_mismatch': True,
'expected_exception': exception.ServiceInstanceException})
@mock.patch.object(generic_serv_mgr_cls, 'reboot_server')
@mock.patch.object(generic_serv_mgr_cls, 'wait_for_instance_to_be_active')
@mock.patch.object(generic_serv_mgr_cls, '_check_server_availability')
@ddt.unpack
def test_join_domain(self, mock_check_avail,
mock_wait_instance_active,
mock_reboot_server,
expected_exception=None,
server_available=True,
domain_mismatch=False,
join_domain_side_eff=None):
self._windows_utils.join_domain.side_effect = join_domain_side_eff
mock_check_avail.return_value = server_available
self._windows_utils.get_current_domain.return_value = (
None if domain_mismatch else mock.sentinel.domain)
domain_params = (mock.sentinel.domain,
mock.sentinel.admin_username,
mock.sentinel.admin_password)
if expected_exception:
self.assertRaises(expected_exception,
self._mgr._join_domain,
self._FAKE_SERVER,
*domain_params)
else:
self._mgr._join_domain(self._FAKE_SERVER,
*domain_params)
if join_domain_side_eff != processutils.ProcessExecutionError:
mock_reboot_server.assert_called_once_with(
self._FAKE_SERVER, soft_reboot=True)
mock_wait_instance_active.assert_called_once_with(
self._FAKE_SERVER['instance_id'],
timeout=self._mgr.max_time_to_build_instance)
mock_check_avail.assert_called_once_with(self._FAKE_SERVER)
if server_available:
self._windows_utils.get_current_domain.assert_called_once_with(
self._FAKE_SERVER)
self._windows_utils.join_domain.assert_called_once_with(
self._FAKE_SERVER,
*domain_params)
@ddt.data([],
[{'type': 'active_directory'}],
[{'type': 'active_directory'}] * 2,
[{'type': mock.sentinel.invalid_type}])
def test_get_valid_security_service(self, security_services):
valid_security_service = self._mgr.get_valid_security_service(
security_services)
if (security_services and len(security_services) == 1 and
security_services[0]['type'] == 'active_directory'):
expected_valid_sec_service = security_services[0]
else:
expected_valid_sec_service = None
self.assertEqual(expected_valid_sec_service,
valid_security_service)
@mock.patch.object(serv_mgr_cls, '_get_cbs_init_reg_section')
def test_run_cloudbase_init_plugin_after_reboot(self,
mock_get_cbs_init_reg):
self._FAKE_SERVER = {'instance_id': mock.sentinel.instance_id}
mock_get_cbs_init_reg.return_value = mock.sentinel.cbs_init_reg_sect
expected_plugin_key_path = "%(cbs_init)s\\%(instance_id)s\\Plugins" % {
'cbs_init': mock.sentinel.cbs_init_reg_sect,
'instance_id': self._FAKE_SERVER['instance_id']}
self._mgr._run_cloudbase_init_plugin_after_reboot(
server=self._FAKE_SERVER,
plugin_name=mock.sentinel.plugin_name)
mock_get_cbs_init_reg.assert_called_once_with(self._FAKE_SERVER)
self._windows_utils.set_win_reg_value.assert_called_once_with(
self._FAKE_SERVER,
path=expected_plugin_key_path,
key=mock.sentinel.plugin_name,
value=self._mgr._CBS_INIT_RUN_PLUGIN_AFTER_REBOOT)
@ddt.data(
{},
{'exec_errors': [
processutils.ProcessExecutionError(stderr='Cannot find path'),
processutils.ProcessExecutionError(stderr='Cannot find path')],
'expected_exception': exception.ServiceInstanceException},
{'exec_errors': [processutils.ProcessExecutionError(stderr='')],
'expected_exception': processutils.ProcessExecutionError},
{'exec_errors': [
processutils.ProcessExecutionError(stderr='Cannot find path'),
None]}
)
@ddt.unpack
def test_get_cbs_init_reg_section(self, exec_errors=None,
expected_exception=None):
self._windows_utils.normalize_path.return_value = (
mock.sentinel.normalized_section_path)
self._windows_utils.get_win_reg_value.side_effect = exec_errors
if expected_exception:
self.assertRaises(expected_exception,
self._mgr._get_cbs_init_reg_section,
mock.sentinel.server)
else:
cbs_init_section = self._mgr._get_cbs_init_reg_section(
mock.sentinel.server)
self.assertEqual(mock.sentinel.normalized_section_path,
cbs_init_section)
base_path = 'hklm:\\SOFTWARE'
cbs_section = 'Cloudbase Solutions\\Cloudbase-Init'
tested_upper_sections = ['']
if exec_errors and 'Cannot find path' in exec_errors[0].stderr:
tested_upper_sections.append('Wow6432Node')
tested_sections = [os.path.join(base_path,
upper_section,
cbs_section)
for upper_section in tested_upper_sections]
self._windows_utils.normalize_path.assert_has_calls(
[mock.call(tested_section)
for tested_section in tested_sections])

View File

@ -0,0 +1,275 @@
# Copyright (c) 2015 Cloudbase Solutions SRL
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ddt
import mock
import os
from manila.share import configuration
from manila.share.drivers import generic
from manila.share.drivers.windows import service_instance
from manila.share.drivers.windows import windows_smb_driver as windows_drv
from manila.share.drivers.windows import windows_smb_helper
from manila.share.drivers.windows import windows_utils
from manila.share.drivers.windows import winrm_helper
from manila import test
@ddt.ddt
class WindowsSMBDriverTestCase(test.TestCase):
@mock.patch.object(winrm_helper, 'WinRMHelper')
@mock.patch.object(windows_utils, 'WindowsUtils')
@mock.patch.object(windows_smb_helper, 'WindowsSMBHelper')
@mock.patch.object(service_instance,
'WindowsServiceInstanceManager')
def setUp(self, mock_sv_instance_mgr, mock_smb_helper_cls,
mock_utils_cls, mock_winrm_helper_cls):
self.flags(driver_handles_share_servers=False)
self._fake_conf = configuration.Configuration(None)
self._drv = windows_drv.WindowsSMBDriver(
configuration=self._fake_conf)
self._remote_execute = mock_winrm_helper_cls.return_value
self._windows_utils = mock_utils_cls.return_value
self._smb_helper = mock_smb_helper_cls.return_value
super(WindowsSMBDriverTestCase, self).setUp()
@mock.patch('manila.share.driver.ShareDriver')
def test_update_share_stats(self, mock_base_driver):
self._drv._update_share_stats()
mock_base_driver._update_share_stats.assert_called_once_with(
self._drv,
data=dict(storage_protocol="CIFS"))
@mock.patch.object(service_instance, 'WindowsServiceInstanceManager')
def test_setup_service_instance_manager(self, mock_sv_instance_mgr):
self._drv._setup_service_instance_manager()
mock_sv_instance_mgr.assert_called_once_with(
driver_config=self._fake_conf)
def test_setup_helpers(self):
expected_helpers = {"SMB": self._smb_helper,
"CIFS": self._smb_helper}
self._drv._setup_helpers()
self.assertEqual(expected_helpers, self._drv._helpers)
@mock.patch.object(generic.GenericShareDriver, '_teardown_server')
def test_teardown_server(self, mock_super_teardown):
mock_server = {'joined_domain': True,
'instance_id': mock.sentinel.instance_id}
mock_sec_service = {'user': mock.sentinel.user,
'password': mock.sentinel.password,
'domain': mock.sentinel.domain}
sv_mgr = self._drv.service_instance_manager
sv_mgr.get_valid_security_service.return_value = mock_sec_service
# We ensure that domain unjoin exceptions do not prevent the
# service instance from being teared down.
self._windows_utils.unjoin_domain.side_effect = Exception
self._drv._teardown_server(mock_server,
mock_sec_service)
sv_mgr.get_valid_security_service.assert_called_once_with(
mock_sec_service)
self._windows_utils.unjoin_domain.assert_called_once_with(
mock_server,
mock_sec_service['user'],
mock_sec_service['password'])
mock_super_teardown.assert_called_once_with(mock_server,
mock_sec_service)
@mock.patch.object(windows_drv.WindowsSMBDriver, '_get_disk_number')
def test_format_device(self, mock_get_disk_number):
mock_get_disk_number.return_value = mock.sentinel.disk_number
self._drv._format_device(mock.sentinel.server, mock.sentinel.vol)
self._drv._get_disk_number.assert_called_once_with(
mock.sentinel.server, mock.sentinel.vol)
self._windows_utils.initialize_disk.assert_called_once_with(
mock.sentinel.server, mock.sentinel.disk_number)
self._windows_utils.create_partition.assert_called_once_with(
mock.sentinel.server, mock.sentinel.disk_number)
self._windows_utils.format_partition.assert_called_once_with(
mock.sentinel.server, mock.sentinel.disk_number,
self._drv._DEFAULT_SHARE_PARTITION)
@mock.patch.object(windows_drv.WindowsSMBDriver,
'_ensure_disk_online_and_writable')
@mock.patch.object(windows_drv.WindowsSMBDriver, '_get_disk_number')
@mock.patch.object(windows_drv.WindowsSMBDriver, '_get_mount_path')
@mock.patch.object(windows_drv.WindowsSMBDriver, '_is_device_mounted')
def test_mount_device(self, mock_device_mounted, mock_get_mount_path,
mock_get_disk_number, mock_ensure_disk):
mock_get_mount_path.return_value = mock.sentinel.mount_path
mock_get_disk_number.return_value = mock.sentinel.disk_number
mock_device_mounted.return_value = False
self._drv._mount_device(share=mock.sentinel.share,
server_details=mock.sentinel.server,
volume=mock.sentinel.vol)
mock_device_mounted.assert_called_once_with(
mock.sentinel.mount_path, mock.sentinel.server, mock.sentinel.vol)
mock_get_disk_number.assert_called_once_with(
mock.sentinel.server, mock.sentinel.vol)
self._windows_utils.ensure_directory_exists.assert_called_once_with(
mock.sentinel.server, mock.sentinel.mount_path)
self._windows_utils.add_access_path(
mock.sentinel.server,
mock.sentinel.mount_path,
mock.sentinel.disk_number,
self._drv._DEFAULT_SHARE_PARTITION)
mock_ensure_disk.assert_called_once_with(
mock.sentinel.server, mock.sentinel.disk_number)
@mock.patch.object(windows_drv.WindowsSMBDriver, '_get_mount_path')
def test_unmount_device(self, mock_get_mount_path):
mock_get_mount_path.return_value = mock.sentinel.mount_path
mock_get_disk_number_by_path = (
self._windows_utils.get_disk_number_by_mount_path)
self._drv._unmount_device(mock.sentinel.share,
mock.sentinel.server)
mock_get_mount_path.assert_called_once_with(mock.sentinel.share)
mock_get_disk_number_by_path.assert_called_once_with(
mock.sentinel.server, mock.sentinel.mount_path)
self._windows_utils.set_disk_online_status.assert_called_once_with(
mock.sentinel.server,
mock_get_disk_number_by_path.return_value,
online=False)
@ddt.data(None, 1)
@mock.patch.object(windows_drv.WindowsSMBDriver, '_get_disk_number')
@mock.patch.object(windows_drv.WindowsSMBDriver,
'_ensure_disk_online_and_writable')
def test_resize_filesystem(self, new_size, mock_ensure_disk,
mock_get_disk_number):
mock_get_disk_number.return_value = mock.sentinel.disk_number
mock_get_max_size = self._windows_utils.get_partition_maximum_size
mock_get_max_size.return_value = mock.sentinel.max_size
self._drv._resize_filesystem(mock.sentinel.server,
mock.sentinel.vol,
new_size=new_size)
mock_get_disk_number.assert_called_once_with(mock.sentinel.server,
mock.sentinel.vol)
self._drv._ensure_disk_online_and_writable.assert_called_once_with(
mock.sentinel.server, mock.sentinel.disk_number)
if not new_size:
mock_get_max_size.assert_called_once_with(
mock.sentinel.server,
mock.sentinel.disk_number,
self._drv._DEFAULT_SHARE_PARTITION)
expected_new_size = mock.sentinel.max_size
else:
expected_new_size = new_size << 30
self._windows_utils.resize_partition.assert_called_once_with(
mock.sentinel.server,
expected_new_size,
mock.sentinel.disk_number,
self._drv._DEFAULT_SHARE_PARTITION)
def test_ensure_disk_online_and_writable(self):
self._drv._ensure_disk_online_and_writable(
mock.sentinel.server, mock.sentinel.disk_number)
self._windows_utils.update_disk.assert_called_once_with(
mock.sentinel.server, mock.sentinel.disk_number)
self._windows_utils.set_disk_online_status.assert_called_once_with(
mock.sentinel.server, mock.sentinel.disk_number, online=True)
self._windows_utils.set_disk_readonly_status.assert_called_once_with(
mock.sentinel.server, mock.sentinel.disk_number, readonly=False)
def test_get_mounted_share_size(self):
fake_size_gb = 10
self._windows_utils.get_disk_space_by_path.return_value = (
fake_size_gb << 30, mock.sentinel.free_bytes)
share_size = self._drv._get_mounted_share_size(
mock.sentinel.mount_path,
mock.sentinel.server)
self.assertEqual(share_size, fake_size_gb)
def test_get_consumed_space(self):
fake_size_gb = 2
fake_free_space_gb = 1
self._windows_utils.get_disk_space_by_path.return_value = (
fake_size_gb << 30, fake_free_space_gb << 30)
consumed_space = self._drv._get_consumed_space(
mock.sentinel.mount_path,
mock.sentinel.server)
self.assertEqual(fake_size_gb - fake_free_space_gb, consumed_space)
def test_get_mount_path(self):
fake_mount_path = 'fake_mount_path'
fake_share_name = 'fake_share_name'
mock_share = {'name': fake_share_name}
self.flags(share_mount_path=fake_mount_path)
mount_path = self._drv._get_mount_path(mock_share)
self._windows_utils.normalize_path.assert_called_once_with(
os.path.join(fake_mount_path, fake_share_name))
self.assertEqual(self._windows_utils.normalize_path.return_value,
mount_path)
@ddt.data(None, 2)
def test_get_disk_number(self, disk_number_by_serial=None):
mock_get_disk_number_by_serial = (
self._windows_utils.get_disk_number_by_serial_number)
mock_get_disk_number_by_serial.return_value = disk_number_by_serial
mock_volume = {'id': mock.sentinel.vol_id,
'mountpoint': "/dev/sdb"}
# If the disk number cannot be identified using the disk serial
# number, we expect it to be retrieved based on the volume mountpoint,
# having disk number 1 in this case.
expected_disk_number = (disk_number_by_serial
if disk_number_by_serial else 1)
disk_number = self._drv._get_disk_number(mock.sentinel.server,
mock_volume)
mock_get_disk_number_by_serial.assert_called_once_with(
mock.sentinel.server, mock.sentinel.vol_id)
self.assertEqual(expected_disk_number, disk_number)
@ddt.data(None, 2)
def test_is_device_mounted(self, disk_number_by_path):
mock_get_disk_number_by_path = (
self._windows_utils.get_disk_number_by_mount_path)
mock_get_disk_number_by_path.return_value = disk_number_by_path
expected_result = disk_number_by_path is not None
is_mounted = self._drv._is_device_mounted(
mount_path=mock.sentinel.mount_path,
server_details=mock.sentinel.server)
mock_get_disk_number_by_path.assert_called_once_with(
mock.sentinel.server,
mock.sentinel.mount_path)
self.assertEqual(expected_result, is_mounted)

View File

@ -0,0 +1,268 @@
# Copyright (c) 2015 Cloudbase Solutions SRL
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import ddt
import mock
from manila.common import constants
from manila import exception
from manila.share import configuration
from manila.share.drivers.windows import windows_smb_helper
from manila.share.drivers.windows import windows_utils
from manila import test
from oslo_config import cfg
CONF = cfg.CONF
CONF.import_opt('share_mount_path',
'manila.share.drivers.generic')
@ddt.ddt
class WindowsSMBHelperTestCase(test.TestCase):
_FAKE_SERVER = {'public_address': mock.sentinel.public_address}
_FAKE_SHARE_NAME = "fake_share_name"
_FAKE_SHARE = "\\\\%s\\%s" % (_FAKE_SERVER['public_address'],
_FAKE_SHARE_NAME)
_FAKE_SHARE_LOCATION = os.path.join(
configuration.Configuration(None).share_mount_path,
_FAKE_SHARE_NAME)
def setUp(self):
self._remote_exec = mock.Mock()
fake_conf = configuration.Configuration(None)
self._win_smb_helper = windows_smb_helper.WindowsSMBHelper(
self._remote_exec, fake_conf)
super(WindowsSMBHelperTestCase, self).setUp()
def test_init_helper(self):
self._win_smb_helper.init_helper(mock.sentinel.server)
self._remote_exec.assert_called_once_with(mock.sentinel.server,
"Get-SmbShare")
@ddt.data(True, False)
@mock.patch.object(windows_smb_helper.WindowsSMBHelper, '_share_exists')
def test_create_export(self, share_exists, mock_share_exists):
mock_share_exists.return_value = share_exists
result = self._win_smb_helper.create_export(self._FAKE_SERVER,
self._FAKE_SHARE_NAME)
if not share_exists:
cmd = ['New-SmbShare', '-Name', self._FAKE_SHARE_NAME, '-Path',
self._win_smb_helper._windows_utils.normalize_path(
self._FAKE_SHARE_LOCATION)]
self._remote_exec.assert_called_once_with(self._FAKE_SERVER, cmd)
else:
self.assertFalse(self._remote_exec.called)
self.assertEqual(self._FAKE_SHARE, result)
@mock.patch.object(windows_smb_helper.WindowsSMBHelper, '_share_exists')
def test_remove_export(self, mock_share_exists):
mock_share_exists.return_value = True
self._win_smb_helper.remove_export(mock.sentinel.server,
mock.sentinel.share_name)
cmd = ['Remove-SmbShare', '-Name', mock.sentinel.share_name, "-Force"]
self._remote_exec.assert_called_once_with(mock.sentinel.server, cmd)
@mock.patch.object(windows_utils.WindowsUtils,
'get_volume_path_by_mount_path')
@mock.patch.object(windows_smb_helper.WindowsSMBHelper,
'_get_share_path_by_name')
def test_get_volume_path_by_share_name(self, mock_get_share_path,
mock_get_vol_path):
mock_get_share_path.return_value = self._FAKE_SHARE_LOCATION
volume_path = self._win_smb_helper._get_volume_path_by_share_name(
mock.sentinel.server, self._FAKE_SHARE_NAME)
mock_get_share_path.assert_called_once_with(mock.sentinel.server,
self._FAKE_SHARE_NAME)
mock_get_vol_path.assert_called_once_with(mock.sentinel.server,
self._FAKE_SHARE_LOCATION)
self.assertEqual(mock_get_vol_path.return_value, volume_path)
@ddt.data('ip', 'user')
@mock.patch.object(windows_smb_helper.WindowsSMBHelper,
'_grant_share_access')
@mock.patch.object(windows_smb_helper.WindowsSMBHelper,
'_grant_share_path_access')
def test_allow_access(self, access_type, mock_grant_share_access,
mock_grant_share_path_access):
mock_args = (mock.sentinel.server, mock.sentinel.share_name,
access_type, mock.sentinel.access_level,
mock.sentinel.username)
if access_type != 'user':
self.assertRaises(exception.InvalidShareAccess,
self._win_smb_helper.allow_access,
*mock_args)
else:
self._win_smb_helper.allow_access(*mock_args)
mock_grant_share_access.assert_called_once_with(
mock.sentinel.server,
mock.sentinel.share_name,
mock.sentinel.access_level,
mock.sentinel.username)
mock_grant_share_path_access.assert_called_once_with(
mock.sentinel.server,
mock.sentinel.share_name,
mock.sentinel.access_level,
mock.sentinel.username)
@mock.patch.object(windows_smb_helper.WindowsSMBHelper, '_refresh_acl')
def test_grant_share_access(self, mock_refresh_acl):
self._win_smb_helper._grant_share_access(mock.sentinel.server,
mock.sentinel.share_name,
constants.ACCESS_LEVEL_RW,
mock.sentinel.username)
cmd = ["Grant-SmbShareAccess", "-Name", mock.sentinel.share_name,
"-AccessRight", "Change",
"-AccountName", mock.sentinel.username, "-Force"]
self._remote_exec.assert_called_once_with(mock.sentinel.server, cmd)
mock_refresh_acl.assert_called_once_with(mock.sentinel.server,
mock.sentinel.share_name)
@mock.patch.object(windows_smb_helper.WindowsSMBHelper,
'_get_volume_path_by_share_name')
def test_grant_share_path_access(self, mock_get_vol_path):
fake_vol_path = 'fake_vol_path'
mock_get_vol_path.return_value = fake_vol_path
self._win_smb_helper._grant_share_path_access(
mock.sentinel.server,
mock.sentinel.share_name,
constants.ACCESS_LEVEL_RW,
mock.sentinel.username)
expected_ace = '"%s:(OI)(CI)M"' % mock.sentinel.username
cmd = ["icacls", '"%s"' % fake_vol_path, "/grant",
expected_ace, "/t", "/c"]
self._remote_exec.assert_called_once_with(mock.sentinel.server, cmd)
def test_refresh_acl(self):
self._win_smb_helper._refresh_acl(mock.sentinel.server,
mock.sentinel.share_name)
cmd = ['Set-SmbPathAcl', '-ShareName', mock.sentinel.share_name]
self._remote_exec.assert_called_once_with(mock.sentinel.server, cmd)
@mock.patch.object(windows_smb_helper.WindowsSMBHelper,
'_revoke_share_path_access')
@mock.patch.object(windows_smb_helper.WindowsSMBHelper,
'_revoke_share_access')
def test_deny_access(self, mock_revoke_share_path_access,
mock_revoke_share_access):
mock_access = {'access_to': mock.sentinel.username}
self._win_smb_helper.deny_access(mock.sentinel.server,
mock.sentinel.share_name,
mock_access)
mock_revoke_share_access.assert_called_once_with(
mock.sentinel.server,
mock.sentinel.share_name,
mock.sentinel.username)
mock_revoke_share_path_access.assert_called_once_with(
mock.sentinel.server,
mock.sentinel.share_name,
mock.sentinel.username)
@mock.patch.object(windows_smb_helper.WindowsSMBHelper, '_refresh_acl')
def test_revoke_share_access(self, mock_refresh_acl):
self._win_smb_helper._revoke_share_access(mock.sentinel.server,
mock.sentinel.share_name,
mock.sentinel.username)
cmd = ["Revoke-SmbShareAccess", "-Name", mock.sentinel.share_name,
"-AccountName", mock.sentinel.username, "-Force"]
self._remote_exec.assert_called_once_with(mock.sentinel.server, cmd)
mock_refresh_acl.assert_called_once_with(mock.sentinel.server,
mock.sentinel.share_name)
@mock.patch.object(windows_smb_helper.WindowsSMBHelper,
'_get_volume_path_by_share_name')
def test_revoke_share_path_access(self, mock_get_vol_path):
fake_vol_path = 'fake_vol_path'
mock_get_vol_path.return_value = fake_vol_path
self._win_smb_helper._revoke_share_path_access(
mock.sentinel.server,
mock.sentinel.share_name,
mock.sentinel.username)
cmd = ["icacls", '"%s"' % fake_vol_path,
"/remove", mock.sentinel.username, "/t", "/c"]
self._remote_exec.assert_called_once_with(mock.sentinel.server, cmd)
def test_get_share_name(self):
result = self._win_smb_helper._get_share_name(self._FAKE_SHARE)
self.assertEqual(self._FAKE_SHARE_NAME, result)
def test_exports_for_share(self):
result = self._win_smb_helper.get_exports_for_share(
self._FAKE_SERVER, self._FAKE_SHARE_LOCATION)
self.assertEqual([self._FAKE_SHARE], result)
def test_get_share_path_by_name(self):
self._remote_exec.return_value = (self._FAKE_SHARE_LOCATION,
mock.sentinel.std_err)
result = self._win_smb_helper._get_share_path_by_name(
mock.sentinel.server,
mock.sentinel.share_name)
cmd = ('Get-SmbShare -Name %s | '
'Select-Object -ExpandProperty Path' % mock.sentinel.share_name)
self._remote_exec.assert_called_once_with(mock.sentinel.server,
cmd,
check_exit_code=True)
self.assertEqual(self._FAKE_SHARE_LOCATION, result)
@mock.patch.object(windows_smb_helper.WindowsSMBHelper,
'_get_share_path_by_name')
def test_get_share_path_by_export_location(self,
mock_get_share_path_by_name):
mock_get_share_path_by_name.return_value = mock.sentinel.share_path
result = self._win_smb_helper.get_share_path_by_export_location(
mock.sentinel.server, self._FAKE_SHARE)
mock_get_share_path_by_name.assert_called_once_with(
mock.sentinel.server, self._FAKE_SHARE_NAME)
self.assertEqual(mock.sentinel.share_path, result)
@mock.patch.object(windows_smb_helper.WindowsSMBHelper,
'_get_share_path_by_name')
def test_share_exists(self, mock_get_share_path_by_name):
result = self._win_smb_helper._share_exists(mock.sentinel.server,
mock.sentinel.share_name)
mock_get_share_path_by_name.assert_called_once_with(
mock.sentinel.server,
mock.sentinel.share_name,
ignore_missing=True)
self.assertTrue(result)

View File

@ -0,0 +1,367 @@
# Copyright (c) 2015 Cloudbase Solutions SRL
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ddt
import mock
from manila.share.drivers.windows import windows_utils
from manila import test
@ddt.ddt
class WindowsUtilsTestCase(test.TestCase):
def setUp(self):
self._remote_exec = mock.Mock()
self._windows_utils = windows_utils.WindowsUtils(self._remote_exec)
super(WindowsUtilsTestCase, self).setUp()
def test_initialize_disk(self):
self._windows_utils.initialize_disk(mock.sentinel.server,
mock.sentinel.disk_number)
cmd = ["Initialize-Disk", "-Number", mock.sentinel.disk_number]
self._remote_exec.assert_called_once_with(mock.sentinel.server, cmd)
def test_create_partition(self):
self._windows_utils.create_partition(mock.sentinel.server,
mock.sentinel.disk_number)
cmd = ["New-Partition", "-DiskNumber",
mock.sentinel.disk_number, "-UseMaximumSize"]
self._remote_exec.assert_called_once_with(mock.sentinel.server, cmd)
def test_format_partition(self):
self._windows_utils.format_partition(mock.sentinel.server,
mock.sentinel.disk_number,
mock.sentinel.partition_number)
cmd = ("Get-Partition -DiskNumber %(disk_number)s "
"-PartitionNumber %(partition_number)s | "
"Format-Volume -FileSystem NTFS -Force -Confirm:$false" % {
'disk_number': mock.sentinel.disk_number,
'partition_number': mock.sentinel.partition_number,
})
self._remote_exec.assert_called_once_with(mock.sentinel.server,
cmd)
def test_add_access_path(self):
self._windows_utils.add_access_path(mock.sentinel.server,
mock.sentinel.mount_path,
mock.sentinel.disk_number,
mock.sentinel.partition_number)
cmd = ["Add-PartitionAccessPath", "-DiskNumber",
mock.sentinel.disk_number,
"-PartitionNumber", mock.sentinel.partition_number,
"-AccessPath", self._windows_utils.quote_string(
mock.sentinel.mount_path)
]
self._remote_exec.assert_called_once_with(mock.sentinel.server, cmd)
def test_resize_partition(self):
self._windows_utils.resize_partition(mock.sentinel.server,
mock.sentinel.size_bytes,
mock.sentinel.disk_number,
mock.sentinel.partition_number)
cmd = ['Resize-Partition', '-DiskNumber', mock.sentinel.disk_number,
'-PartitionNumber', mock.sentinel.partition_number,
'-Size', mock.sentinel.size_bytes]
self._remote_exec.assert_called_once_with(mock.sentinel.server, cmd)
@ddt.data("1", "")
def test_get_disk_number_by_serial_number(self, disk_number):
mock_serial_number = "serial_number"
self._remote_exec.return_value = (disk_number, mock.sentinel.std_err)
expected_disk_number = int(disk_number) if disk_number else None
result = self._windows_utils.get_disk_number_by_serial_number(
mock.sentinel.server,
mock_serial_number)
pattern = "%s*" % mock_serial_number
cmd = ("Get-Disk | "
"Where-Object {$_.SerialNumber -like '%s'} | "
"Select-Object -ExpandProperty Number" % pattern)
self._remote_exec.assert_called_once_with(mock.sentinel.server, cmd)
self.assertEqual(expected_disk_number, result)
@ddt.data("1", "")
def test_get_disk_number_by_mount_path(self, disk_number):
fake_mount_path = "fake_mount_path"
self._remote_exec.return_value = (disk_number, mock.sentinel.std_err)
expected_disk_number = int(disk_number) if disk_number else None
result = self._windows_utils.get_disk_number_by_mount_path(
mock.sentinel.server,
fake_mount_path)
cmd = ('Get-Partition | '
'Where-Object {$_.AccessPaths -contains "%s"} | '
'Select-Object -ExpandProperty DiskNumber' %
(fake_mount_path + "\\"))
self._remote_exec.assert_called_once_with(mock.sentinel.server, cmd)
self.assertEqual(expected_disk_number, result)
def test_get_volume_path_by_mount_path(self):
fake_mount_path = "fake_mount_path"
fake_volume_path = "fake_volume_path"
self._remote_exec.return_value = fake_volume_path + '\r\n', None
result = self._windows_utils.get_volume_path_by_mount_path(
mock.sentinel.server,
fake_mount_path)
cmd = ('Get-Partition | '
'Where-Object {$_.AccessPaths -contains "%s"} | '
'Get-Volume | '
'Select-Object -ExpandProperty Path' %
(fake_mount_path + "\\"))
self._remote_exec.assert_called_once_with(mock.sentinel.server, cmd)
self.assertEqual(fake_volume_path, result)
def test_get_disk_space_by_path(self):
fake_disk_size = 1024
fake_free_bytes = 1000
fake_fsutil_output = ("Total # of bytes : %(total_bytes)s"
"Total # of avail free bytes : %(free_bytes)s"
% dict(total_bytes=fake_disk_size,
free_bytes=fake_free_bytes))
self._remote_exec.return_value = fake_fsutil_output, None
result = self._windows_utils.get_disk_space_by_path(
mock.sentinel.server,
mock.sentinel.mount_path)
cmd = ["fsutil", "volume", "diskfree",
self._windows_utils.quote_string(mock.sentinel.mount_path)]
self._remote_exec.assert_called_once_with(mock.sentinel.server, cmd)
self.assertEqual((fake_disk_size, fake_free_bytes), result)
def test_get_partition_maximum_size(self):
fake_max_size = 1024
self._remote_exec.return_value = ("%s" % fake_max_size,
mock.sentinel.std_err)
result = self._windows_utils.get_partition_maximum_size(
mock.sentinel.server,
mock.sentinel.disk_number,
mock.sentinel.partition_number)
cmd = ('Get-PartitionSupportedSize -DiskNumber %(disk_number)s '
'-PartitionNumber %(partition_number)s | '
'Select-Object -ExpandProperty SizeMax' %
dict(disk_number=mock.sentinel.disk_number,
partition_number=mock.sentinel.partition_number))
self._remote_exec.assert_called_once_with(mock.sentinel.server, cmd)
self.assertEqual(fake_max_size, result)
def test_set_disk_online_status(self):
self._windows_utils.set_disk_online_status(mock.sentinel.server,
mock.sentinel.disk_number,
online=True)
cmd = ["Set-Disk", "-Number", mock.sentinel.disk_number,
"-IsOffline", 0]
self._remote_exec.assert_called_once_with(mock.sentinel.server, cmd)
def test_set_disk_readonly_status(self):
self._windows_utils.set_disk_readonly_status(mock.sentinel.server,
mock.sentinel.disk_number,
readonly=False)
cmd = ["Set-Disk", "-Number", mock.sentinel.disk_number,
"-IsReadOnly", 0]
self._remote_exec.assert_called_once_with(mock.sentinel.server, cmd)
def test_update_disk(self):
self._windows_utils.update_disk(mock.sentinel.server,
mock.sentinel.disk_number)
cmd = ["Update-Disk", mock.sentinel.disk_number]
self._remote_exec.assert_called_once_with(mock.sentinel.server, cmd)
def test_join_domain(self):
mock_server = {'ip': mock.sentinel.server_ip}
self._windows_utils.join_domain(mock_server,
mock.sentinel.domain,
mock.sentinel.admin_username,
mock.sentinel.admin_password)
cmds = [
('$password = "%s" | '
'ConvertTo-SecureString -asPlainText -Force' %
mock.sentinel.admin_password),
('$credential = '
'New-Object System.Management.Automation.PSCredential('
'"%s", $password)' % mock.sentinel.admin_username),
('Add-Computer -DomainName "%s" -Credential $credential' %
mock.sentinel.domain)]
cmd = ";".join(cmds)
self._remote_exec.assert_called_once_with(mock_server, cmd)
def test_unjoin_domain(self):
self._windows_utils.unjoin_domain(mock.sentinel.server,
mock.sentinel.admin_username,
mock.sentinel.admin_password)
cmds = [
('$password = "%s" | '
'ConvertTo-SecureString -asPlainText -Force' %
mock.sentinel.admin_password),
('$credential = '
'New-Object System.Management.Automation.PSCredential('
'"%s", $password)' % mock.sentinel.admin_username),
('Remove-Computer -UnjoinDomaincredential $credential '
'-Passthru -Verbose -Force')]
cmd = ";".join(cmds)
self._remote_exec.assert_called_once_with(mock.sentinel.server, cmd)
def test_get_current_domain(self):
fake_domain = " domain"
self._remote_exec.return_value = (fake_domain, mock.sentinel.std_err)
result = self._windows_utils.get_current_domain(mock.sentinel.server)
cmd = "(Get-WmiObject Win32_ComputerSystem).Domain"
self._remote_exec.assert_called_once_with(mock.sentinel.server, cmd)
self.assertEqual(fake_domain.strip(), result)
def test_ensure_directory_exists(self):
self._windows_utils.ensure_directory_exists(mock.sentinel.server,
mock.sentinel.path)
cmd = ["New-Item", "-ItemType", "Directory", "-Force", "-Path",
self._windows_utils.quote_string(mock.sentinel.path)]
self._remote_exec.assert_called_once_with(mock.sentinel.server, cmd)
@ddt.data(False, True)
@mock.patch.object(windows_utils.WindowsUtils, 'path_exists')
def test_remove(self, is_junction, mock_path_exists):
recurse = True
self._windows_utils.remove(mock.sentinel.server,
mock.sentinel.path,
is_junction=is_junction,
recurse=recurse)
if is_junction:
cmd = ('[System.IO.Directory]::Delete('
'%(path)s, %(recurse)d)'
% dict(path=self._windows_utils.quote_string(
mock.sentinel.path),
recurse=recurse))
else:
cmd = ["Remove-Item", "-Confirm:$false", "-Path",
self._windows_utils.quote_string(mock.sentinel.path),
"-Force", '-Recurse']
self._remote_exec.assert_called_once_with(mock.sentinel.server, cmd)
@mock.patch.object(windows_utils.WindowsUtils, 'path_exists')
def test_remove_unexisting_path(self, mock_path_exists):
mock_path_exists.return_value = False
self._windows_utils.remove(mock.sentinel.server,
mock.sentinel.path)
self.assertFalse(self._remote_exec.called)
@ddt.data("True", "False")
def test_path_exists(self, path_exists):
self._remote_exec.return_value = (path_exists,
mock.sentinel.std_err)
result = self._windows_utils.path_exists(mock.sentinel.server,
mock.sentinel.path)
cmd = ["Test-Path", mock.sentinel.path]
self._remote_exec.assert_called_once_with(mock.sentinel.server, cmd)
self.assertEqual(path_exists == "True", result)
def test_normalize_path(self):
fake_path = "C:/"
result = self._windows_utils.normalize_path(fake_path)
self.assertEqual("C:\\", result)
def test_get_interface_index_by_ip(self):
_FAKE_INDEX = "2"
self._remote_exec.return_value = (_FAKE_INDEX, mock.sentinel.std_err)
result = self._windows_utils.get_interface_index_by_ip(
mock.sentinel.server,
mock.sentinel.ip)
cmd = ('Get-NetIPAddress | '
'Where-Object {$_.IPAddress -eq "%(ip)s"} | '
'Select-Object -ExpandProperty InterfaceIndex' %
dict(ip=mock.sentinel.ip))
self._remote_exec.assert_called_once_with(mock.sentinel.server, cmd)
self.assertEqual(int(_FAKE_INDEX), result)
def test_set_dns_client_search_list(self):
mock_search_list = ["A", "B", "C"]
self._windows_utils.set_dns_client_search_list(mock.sentinel.server,
mock_search_list)
cmd = ["Set-DnsClientGlobalSetting",
"-SuffixSearchList", "@('A','B','C')"]
self._remote_exec.assert_called_once_with(mock.sentinel.server, cmd)
def test_set_dns_client_server_addresses(self):
mock_dns_servers = ["A", "B", "C"]
self._windows_utils.set_dns_client_server_addresses(
mock.sentinel.server,
mock.sentinel.if_index,
mock_dns_servers)
cmd = ["Set-DnsClientServerAddress",
"-InterfaceIndex", mock.sentinel.if_index,
"-ServerAddresses", "('A','B','C')"]
self._remote_exec.assert_called_once_with(mock.sentinel.server, cmd)
def test_set_win_reg_value(self):
self._windows_utils.set_win_reg_value(mock.sentinel.server,
mock.sentinel.path,
mock.sentinel.key,
mock.sentinel.value)
cmd = ['Set-ItemProperty', '-Path',
self._windows_utils.quote_string(mock.sentinel.path),
'-Name', mock.sentinel.key, '-Value', mock.sentinel.value]
self._remote_exec.assert_called_once_with(mock.sentinel.server, cmd)
@ddt.data(None, mock.sentinel.key_name)
def test_get_win_reg_value(self, key_name):
self._remote_exec.return_value = (mock.sentinel.value,
mock.sentinel.std_err)
result = self._windows_utils.get_win_reg_value(mock.sentinel.server,
mock.sentinel.path,
name=key_name)
cmd = "Get-ItemProperty -Path %s" % (
self._windows_utils.quote_string(mock.sentinel.path))
if key_name:
cmd += " | Select-Object -ExpandProperty %s" % key_name
self._remote_exec.assert_called_once_with(mock.sentinel.server,
cmd,
retry=False)
self.assertEqual(mock.sentinel.value, result)
def test_quote_string(self):
result = self._windows_utils.quote_string(mock.sentinel.string)
self.assertEqual('"%s"' % mock.sentinel.string, result)