Merge "Add NetApp Active IQ scheduler weigher"

This commit is contained in:
Zuul 2023-09-07 03:37:24 +00:00 committed by Gerrit Code Review
commit f912aafc58
10 changed files with 862 additions and 2 deletions

View File

@ -1177,3 +1177,8 @@ class ShareBackupSizeExceedsAvailableQuota(QuotaError):
message = _("Requested backup exceeds allowed Backup gigabytes "
"quota. Requested %(requested)sG, quota is %(quota)sG and "
"%(consumed)sG has been consumed.")
class NetappActiveIQWeigherRequiredParameter(ManilaException):
message = _("%(config)s configuration of the NetAppActiveIQ weigher "
"must be set.")

View File

@ -0,0 +1,355 @@
# Copyright 2023 NetApp, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
import requests
from requests.adapters import HTTPAdapter
from requests import auth
from urllib3.util import retry
from manila import exception
from manila.scheduler.weighers import base_host
ACTIVE_IQ_WEIGHER_GROUP = 'netapp_active_iq'
active_iq_weight_opts = [
cfg.HostAddressOpt('aiq_hostname',
help='The hostname (or IP address) for the Active IQ.'),
cfg.PortOpt('aiq_port',
help=('The TCP port to use for communication with the Active '
'IQ. If not specified, the weigher driver will use 80 '
'for HTTP and 443 for HTTPS.')),
cfg.StrOpt('aiq_transport_type',
default='https',
choices=['http', 'https'],
help=('The transport protocol used when communicating with '
'the Active IQ. Valid values are '
'http or https.')),
cfg.BoolOpt('aiq_ssl_verify',
default=False,
help='Verifying the SSL certificate. Default is False.'),
cfg.StrOpt('aiq_ssl_cert_path',
help=("The path to a CA_BUNDLE file or directory with "
"certificates of trusted CA. If set to a directory, it "
"must have been processed using the c_rehash utility "
"supplied with OpenSSL. If not informed, it will use the "
"Mozilla's carefully curated collection of Root "
"Certificates for validating the trustworthiness of SSL "
"certificates.")),
cfg.StrOpt('aiq_username',
help=('Administrative user account name used to access the '
'Active IQ.')),
cfg.StrOpt('aiq_password',
help=('Password for the administrative user account '
'specified in the aiq_username option.'),
secret=True),
cfg.IntOpt('aiq_eval_method',
default=0,
help='Integer indicator of which evaluation method, defaults '
'to 0 (0 - by index, 1 - normalized value, 2 - by '
'literal value).'),
cfg.ListOpt('aiq_priority_order',
default=[
'ops',
'latency',
'volume_count',
'size'
],
help='Permutation of the list ["volume_count", "size", '
'"latency", “ops”]. Note that for volume_count and '
'latency, the higher the values, the less optimal the '
'resources. For capacity and ops, the higher the value '
'the more desirable the resources. If metrics are to be '
'considered with equal weights, concatenate the strings, '
'separated by ":".'
'An example is ["volume_count", "size", “latency:ops”] '
'if latency and ops want to have equal but minimum '
'weights, or ["volume_count:size", "latency", “ops”] '
'if volume_count and size have equal maximum weights. '
'If not provided, the default order is '
'["volume_count", "size", "latency", “ops”].'),
]
CONF = cfg.CONF
CONF.register_opts(active_iq_weight_opts, ACTIVE_IQ_WEIGHER_GROUP)
LOG = logging.getLogger(__name__)
class NetAppAIQWeigher(base_host.BaseHostWeigher):
"""AIQ Weigher. Assign weights based on NetApp Active IQ tool."""
def __init__(self, *args, **kwargs):
super(NetAppAIQWeigher, self).__init__(*args, **kwargs)
self.configuration = CONF[ACTIVE_IQ_WEIGHER_GROUP]
self.host = self.configuration.aiq_hostname
if not self.host:
raise exception.NetappActiveIQWeigherRequiredParameter(
config="aiq_hostname")
self.username = self.configuration.aiq_username
if not self.username:
raise exception.NetappActiveIQWeigherRequiredParameter(
config="aiq_username")
self.password = self.configuration.aiq_password
if not self.password:
raise exception.NetappActiveIQWeigherRequiredParameter(
config="aiq_password")
self.protocol = self.configuration.aiq_transport_type
self.port = self.configuration.aiq_port
if not self.port:
self.port = "80" if self.protocol == "http" else "443"
self.ssl_verify = self.configuration.aiq_ssl_verify
if self.ssl_verify and self.configuration.aiq_ssl_cert_path:
self.ssl_verify = self.configuration.aiq_ssl_cert_path
self.eval_method = self.configuration.aiq_eval_method
self.priority_order = self.configuration.aiq_priority_order
def _weigh_object(self, host_state, weight_properties):
"""Weight for a specific object from parent abstract class"""
# NOTE(felipe_rodrigues): this abstract class method is not called for
# the AIQ weigher, since it does not weigh one single object.
raise NotImplementedError()
def _weigh_active_iq(self, netapp_aggregates_location, weight_properties):
"""Determine host's rating based on a Active IQ."""
size = weight_properties.get('size')
share_type = weight_properties.get('share_type', {})
performance_level_name = share_type.get('extra_specs', {}).get(
'netapp:performance_service_level_name')
# retrieves the performance service level key if a PSL name is given.
performance_level_id = None
if performance_level_name:
performance_level_id = self._get_performance_level_id(
performance_level_name)
if not performance_level_id:
return []
# retrieves the equivalent active IQ keys of the pools.
resource_keys = self._get_resource_keys(netapp_aggregates_location)
if len(resource_keys) == 0:
return []
result = self._balance_aggregates(resource_keys, size,
performance_level_id)
return result
def _get_url(self):
"""Get the base URL for REST requests."""
host = self.host
if ':' in host:
host = '[%s]' % host
return f'{self.protocol}://{host}:{self.port}/api/'
def _get_request_method(self, method, session):
"""Returns the request method to be used in the REST call."""
request_methods = {
'post': session.post,
'get': session.get,
'put': session.put,
'delete': session.delete,
'patch': session.patch,
}
return request_methods[method]
def _get_session_method(self, method):
"""Get the REST method from the session."""
# NOTE(felipe_rodrigues): request resilient of temporary network
# failures (like name resolution failure), retrying until 5 times.
_session = requests.Session()
max_retries = retry.Retry(total=5, connect=5, read=2, backoff_factor=1)
adapter = HTTPAdapter(max_retries=max_retries)
_session.mount('%s://' % self.protocol, adapter)
_session.auth = auth.HTTPBasicAuth(self.username, self.password)
_session.verify = self.ssl_verify
_session.headers = {}
return self._get_request_method(method, _session)
def _call_active_iq(self, action_path, method, body=None):
"""Call the Active IQ REST API."""
rest_method = self._get_session_method(method)
url = self._get_url() + action_path
msg_args = {
"method": method.upper(),
"url": url,
"body": body,
}
LOG.debug("REQUEST: %(method)s %(url)s Body=%(body)s", msg_args)
response = rest_method(url, json=body)
code = response.status_code
response_body = response.content
msg_args = {
"code": code,
"body": response_body,
}
LOG.debug("RESPONSE: %(code)s Body=%(body)s", msg_args)
return code, response_body
def _get_performance_level_id(self, performance_level_name):
"""Gets the ID of a performance level name."""
psl_endpoint = (f'storage-provider/performance-service-levels?'
f'name={performance_level_name}')
try:
code, res = self._call_active_iq(psl_endpoint, "get")
except Exception as e:
LOG.error("Could not retrieve the key of the performance service "
"level named as '%(psl)s'. Skipping the weigher. "
"Error: %(error)s",
{'psl': performance_level_name, 'error': e})
LOG.error(e)
return None
if code != 200:
LOG.error("Could not retrieve the key of the performance service "
"level named as '%(psl)s'. Skipping the weigher.",
{'psl': performance_level_name})
return None
res = jsonutils.loads(res) if res else {}
psl_list = res.get('records', [])
if len(psl_list) == 0:
LOG.error("Could not found any performance service level named "
"as '%s'. Skipping the weigher.", performance_level_name)
return None
return psl_list[0].get("key", None)
def _get_aggregate_identifier(self, aggr_name, cluster_name):
"""Returns the string identifier of an aggregate on a cluster."""
return f'{aggr_name}:{cluster_name}'
def _get_resource_keys(self, netapp_aggregates_location):
"""Map the aggregates names to the AIQ resource keys."""
aggregate_endpoint = 'datacenter/storage/aggregates'
try:
code, res = self._call_active_iq(aggregate_endpoint, "get")
except Exception as e:
LOG.error("Could not retrieve the aggregates resource keys. "
"Skipping the weigher. Error: %s", e)
LOG.error(e)
return []
if code != 200:
LOG.error("Could not retrieve the aggregates resource keys. "
"Skipping the weigher.")
return []
res = jsonutils.loads(res) if res else {}
aggr_map = {}
for aggr in res.get('records', []):
identifier = self._get_aggregate_identifier(
aggr["name"], aggr["cluster"]["name"])
aggr_map[identifier] = aggr["key"]
# we must keep the lists with the same order.
resource_keys = []
found_pool_keys = []
for identifier in netapp_aggregates_location:
if identifier in aggr_map:
found_pool_keys.append(identifier)
# If a pool could not be found, it is marked as resource key 0.
resource_keys.append(aggr_map.get(identifier, 0))
LOG.debug("The following pools will be evaluated by Active IQ: %s",
found_pool_keys)
return resource_keys
def _balance_aggregates(self, resource_keys, size, performance_level_uuid):
"""Call AIQ to generate the weights of each aggregate."""
balance_endpoint = 'storage-provider/data-placement/balance'
body = {
"capacity": f'{size}GB',
"eval_method": self.eval_method,
# NOTE(felipe_rodrigues): from Active IQ documentation, the
# opt_method only works as 0.
"opt_method": 0,
"priority_order": self.priority_order,
"separate_flag": False,
# NOTE(felipe_rodrigues): remove the keys marked with 0, since they
# are not found the pool keys.
"resource_keys": [key for key in resource_keys if key != 0],
}
if performance_level_uuid:
body["ssl_key"] = performance_level_uuid
try:
code, res = self._call_active_iq(
balance_endpoint, "post", body=body)
except Exception as e:
LOG.error("Could not balance the aggregates. Skipping the "
"weigher. Error: %s", e)
LOG.error(e)
return []
if code != 200:
LOG.error("Could not balance the aggregates. Skipping the "
"weigher.")
return []
res = jsonutils.loads(res) if res else []
weight_map = {}
for aggr in res:
weight_map[aggr["key"]] = aggr["scores"]["total_weighted_score"]
# it must keep the lists with the same order.
weights = []
for key in resource_keys:
weights.append(weight_map.get(key, 0.0))
return weights
def weigh_objects(self, weighed_obj_list, weight_properties):
"""Weigh multiple objects using Active IQ."""
netapp_aggregates_location = []
for obj in weighed_obj_list:
# if at least one host is not from NetApp, the entire weigher is
# skipped.
if obj.obj.vendor_name != "NetApp":
LOG.debug(
"Skipping Active IQ weigher given that some backends "
"are not from NetApp.")
return []
else:
cluster_name = obj.obj.capabilities.get("netapp_cluster_name")
aggr_name = obj.obj.pool_name
netapp_aggregates_location.append(
self._get_aggregate_identifier(aggr_name, cluster_name))
result = self._weigh_active_iq(
netapp_aggregates_location, weight_properties)
LOG.debug("Active IQ weight result: %s", result)
return result

View File

@ -176,6 +176,7 @@ class NetAppCmodeFileStorageLibrary(object):
self.message_api = message_api.API()
self._snapmirror_schedule = self._convert_schedule_to_seconds(
schedule=self.configuration.netapp_snapmirror_schedule)
self._cluster_name = self.configuration.netapp_cluster_name
@na_utils.trace
def do_setup(self, context):
@ -429,9 +430,12 @@ class NetAppCmodeFileStorageLibrary(object):
flexgroup_aggr = self._get_flexgroup_aggr_set()
aggr_space = self._get_aggregate_space(aggr_pool.union(flexgroup_aggr))
if self._have_cluster_creds:
cluster_name = self._cluster_name
if self._have_cluster_creds and not cluster_name:
# Get up-to-date node utilization metrics just once.
self._perf_library.update_performance_cache({}, self._ssc_stats)
cluster_name = self._client.get_cluster_name()
self._cluster_name = cluster_name
# Add FlexVol pools.
filter_function = (get_filter_function() if get_filter_function
@ -446,6 +450,7 @@ class NetAppCmodeFileStorageLibrary(object):
pool_with_func = copy.deepcopy(pool)
pool_with_func['filter_function'] = filter_function
pool_with_func['goodness_function'] = goodness_function
pool_with_func['netapp_cluster_name'] = self._cluster_name
pools.append(pool_with_func)
@ -462,6 +467,7 @@ class NetAppCmodeFileStorageLibrary(object):
pool_with_func = copy.deepcopy(pool)
pool_with_func['filter_function'] = filter_function
pool_with_func['goodness_function'] = goodness_function
pool_with_func['netapp_cluster_name'] = self._cluster_name
pools.append(pool_with_func)
@ -497,6 +503,7 @@ class NetAppCmodeFileStorageLibrary(object):
'pool_name': pool_name,
'filter_function': None,
'goodness_function': None,
'netapp_cluster_name': '',
'total_capacity_gb': total_capacity_gb,
'free_capacity_gb': free_capacity_gb,
'allocated_capacity_gb': allocated_capacity_gb,

View File

@ -208,7 +208,12 @@ netapp_cluster_opts = [
'option should only be specified when the option '
'driver_handles_share_servers is set to False (i.e. the '
'driver is managing shares on a single pre-configured '
'Vserver).')), ]
'Vserver).')),
cfg.StrOpt('netapp_cluster_name',
help=('This option specifies the Cluster Name on which '
'provisioning of file storage shares should occur. '
'If not set, the driver will try to discover by '
'API call.')), ]
netapp_support_opts = [
cfg.StrOpt('netapp_trace_flags',

View File

@ -244,6 +244,166 @@ SHARE_SERVICE_STATES_WITH_POOLS = {
thin_provisioning=False)]),
}
FAKE_ACTIVE_IQ_WEIGHER_LIST = [
"fake_aggregate_1:fake_cluster_name1",
"fake_aggregate_2:fake_cluster_name2",
"fake_aggregate_3:fake_cluster_name3"
]
FAKE_ACTIVE_IQ_WEIGHER_AGGREGATES_RESPONSE = {
"records": [
{
"name": "fake_aggregate_1",
"key": "fake_key_1",
"cluster": {
"name": "fake_cluster_name1"
}
},
{
"name": "fake_aggregate_2",
"key": "fake_key_2",
"cluster": {
"name": "fake_cluster_name2"
}
},
{
"name": "fake_aggregate_3",
"key": "fake_key_3",
"cluster": {
"name": "fake_cluster_name3"
}
}
]
}
FAKE_ACTIVE_IQ_WEIGHER_BALANCE_RESPONSE = [
{
"key": "fake_key_1",
"scores": {
"total_weighted_score": 10.0
}
},
{
"key": "fake_key_2",
"scores": {
"total_weighted_score": 20.0
}
}
]
class FakeHostManagerNetAppOnly(host_manager.HostManager):
def __init__(self):
super(FakeHostManagerNetAppOnly, self).__init__()
self.service_states = {
'host1': {
'total_capacity_gb': 1024,
'free_capacity_gb': 1024,
'allocated_capacity_gb': 0,
'thin_provisioning': False,
'reserved_percentage': 10,
'reserved_snapshot_percentage': 5,
'reserved_share_extend_percentage': 15,
'timestamp': None,
'snapshot_support': True,
'create_share_from_snapshot_support': True,
'replication_type': 'writable',
'replication_domain': 'endor',
'storage_protocol': 'NFS_CIFS',
'vendor_name': 'NetApp',
'netapp_cluster_name': 'cluster1',
},
'host2': {
'total_capacity_gb': 2048,
'free_capacity_gb': 300,
'allocated_capacity_gb': 1748,
'provisioned_capacity_gb': 1748,
'max_over_subscription_ratio': 2.0,
'thin_provisioning': True,
'reserved_percentage': 10,
'reserved_snapshot_percentage': 5,
'reserved_share_extend_percentage': 15,
'timestamp': None,
'snapshot_support': True,
'create_share_from_snapshot_support': True,
'replication_type': 'readable',
'replication_domain': 'kashyyyk',
'storage_protocol': 'NFS_CIFS',
'vendor_name': 'NetApp',
'netapp_cluster_name': 'cluster2',
},
'host3': {
'total_capacity_gb': 512,
'free_capacity_gb': 256,
'allocated_capacity_gb': 256,
'provisioned_capacity_gb': 256,
'max_over_subscription_ratio': 2.0,
'thin_provisioning': [False],
'reserved_percentage': 0,
'reserved_snapshot_percentage': 0,
'reserved_share_extend_percentage': 0,
'snapshot_support': True,
'create_share_from_snapshot_support': True,
'timestamp': None,
'storage_protocol': 'NFS_CIFS',
'vendor_name': 'NetApp',
'netapp_cluster_name': 'cluster3',
},
'host4': {
'total_capacity_gb': 2048,
'free_capacity_gb': 200,
'allocated_capacity_gb': 1848,
'provisioned_capacity_gb': 1848,
'max_over_subscription_ratio': 1.0,
'thin_provisioning': [True],
'reserved_percentage': 5,
'reserved_snapshot_percentage': 2,
'reserved_share_extend_percentage': 5,
'timestamp': None,
'snapshot_support': True,
'create_share_from_snapshot_support': True,
'replication_type': 'dr',
'replication_domain': 'naboo',
'storage_protocol': 'NFS_CIFS',
'vendor_name': 'NetApp',
'netapp_cluster_name': 'cluster4',
},
'host5': {
'total_capacity_gb': 2048,
'free_capacity_gb': 500,
'allocated_capacity_gb': 1548,
'provisioned_capacity_gb': 1548,
'max_over_subscription_ratio': 1.5,
'thin_provisioning': [True, False],
'reserved_percentage': 5,
'reserved_snapshot_percentage': 2,
'reserved_share_extend_percentage': 5,
'timestamp': None,
'snapshot_support': True,
'create_share_from_snapshot_support': True,
'replication_type': None,
'storage_protocol': 'NFS_CIFS',
'vendor_name': 'NetApp',
'netapp_cluster_name': 'cluster5',
},
'host6': {
'total_capacity_gb': 'unknown',
'free_capacity_gb': 'unknown',
'allocated_capacity_gb': 1548,
'thin_provisioning': False,
'reserved_percentage': 5,
'reserved_snapshot_percentage': 2,
'reserved_share_extend_percentage': 5,
'snapshot_support': True,
'create_share_from_snapshot_support': True,
'timestamp': None,
'storage_protocol': 'GLUSTERFS',
'vendor_name': 'NetApp',
'netapp_cluster_name': 'cluster6',
},
}
class FakeFilterScheduler(filter.FilterScheduler):
def __init__(self, *args, **kwargs):
@ -269,6 +429,7 @@ class FakeHostManager(host_manager.HostManager):
'replication_type': 'writable',
'replication_domain': 'endor',
'storage_protocol': 'NFS_CIFS',
'vendor_name': 'Dummy',
},
'host2': {'total_capacity_gb': 2048,
'free_capacity_gb': 300,
@ -285,6 +446,7 @@ class FakeHostManager(host_manager.HostManager):
'replication_type': 'readable',
'replication_domain': 'kashyyyk',
'storage_protocol': 'NFS_CIFS',
'vendor_name': 'Dummy',
},
'host3': {'total_capacity_gb': 512,
'free_capacity_gb': 256,
@ -299,6 +461,7 @@ class FakeHostManager(host_manager.HostManager):
'create_share_from_snapshot_support': True,
'timestamp': None,
'storage_protocol': 'NFS_CIFS',
'vendor_name': 'Dummy',
},
'host4': {'total_capacity_gb': 2048,
'free_capacity_gb': 200,
@ -315,6 +478,7 @@ class FakeHostManager(host_manager.HostManager):
'replication_type': 'dr',
'replication_domain': 'naboo',
'storage_protocol': 'NFS_CIFS',
'vendor_name': 'Dummy',
},
'host5': {'total_capacity_gb': 2048,
'free_capacity_gb': 500,
@ -330,6 +494,7 @@ class FakeHostManager(host_manager.HostManager):
'create_share_from_snapshot_support': True,
'replication_type': None,
'storage_protocol': 'NFS_CIFS',
'vendor_name': 'Dummy',
},
'host6': {'total_capacity_gb': 'unknown',
'free_capacity_gb': 'unknown',
@ -342,6 +507,7 @@ class FakeHostManager(host_manager.HostManager):
'create_share_from_snapshot_support': True,
'timestamp': None,
'storage_protocol': 'GLUSTERFS',
'vendor_name': 'Dummy',
},
}

View File

@ -0,0 +1,302 @@
# Copyright 2023 NetApp, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests For NetApp Active IQ Weigher.
"""
from unittest import mock
import ddt
from oslo_config import cfg
from oslo_serialization import jsonutils
import requests
from manila import context
from manila import exception
from manila.scheduler.weighers import base_host
from manila.scheduler.weighers import netapp_aiq
from manila.share import utils
from manila import test
from manila.tests.scheduler import fakes
from manila.tests import utils as test_utils
CONF = cfg.CONF
@ddt.ddt
class NetAppAIQWeigherTestCase(test.TestCase):
def setUp(self):
super(NetAppAIQWeigherTestCase, self).setUp()
self.weight_handler = base_host.HostWeightHandler(
'manila.scheduler.weighers')
netapp_aiq.LOG.debug = mock.Mock()
netapp_aiq.LOG.error = mock.Mock()
self.mock_session = mock.Mock()
self.mock_session.get = mock.Mock()
self.mock_session.post = mock.Mock()
self.mock_session.delete = mock.Mock()
self.mock_session.patch = mock.Mock()
self.mock_session.put = mock.Mock()
data = {
'netapp_active_iq': {
'aiq_hostname': "10.10.10.10",
'aiq_transport_type': 'https',
'aiq_ssl_verify': True,
'aiq_ssl_cert_path': 'fake_cert',
'aiq_username': 'fake_user',
'aiq_password': 'fake_password',
'aiq_eval_method': 1,
'aiq_priority_order': 'ops'
}
}
self.netapp_aiq_weigher = None
with test_utils.create_temp_config_with_opts(data):
self.netapp_aiq_weigher = netapp_aiq.NetAppAIQWeigher()
def test__weigh_object(self):
self.assertRaises(NotImplementedError,
self.netapp_aiq_weigher._weigh_object,
"fake", "fake")
@ddt.data(
{'resource_keys': ["fake_resource_key"], 'performance_level': None},
{'resource_keys': ["fake_resource_key"],
'performance_level': "fake_psl"},
{'resource_keys': [], 'performance_level': 'fake_psl'})
@ddt.unpack
def test__weigh_active_iq(self, resource_keys, performance_level):
weight_properties = {
'size': 1,
'share_type': {
'extra_specs': {
"netapp:performance_service_level_name": "fake_name",
}
}
}
mock_get_psl_id = self.mock_object(
self.netapp_aiq_weigher, '_get_performance_level_id',
mock.Mock(return_value=performance_level))
mock_get_resource_keys = self.mock_object(
self.netapp_aiq_weigher, '_get_resource_keys',
mock.Mock(return_value=resource_keys))
mock_balance_aggregates = self.mock_object(
self.netapp_aiq_weigher, '_balance_aggregates',
mock.Mock(return_value=["1.0", "1.0"]))
res = self.netapp_aiq_weigher._weigh_active_iq(
fakes.FAKE_ACTIVE_IQ_WEIGHER_LIST, weight_properties)
mock_get_psl_id.assert_called_once_with("fake_name")
if not resource_keys or not performance_level:
self.assertEqual([], res)
else:
self.assertEqual(["1.0", "1.0"], res)
if performance_level:
mock_get_resource_keys.assert_called_once_with(
fakes.FAKE_ACTIVE_IQ_WEIGHER_LIST)
else:
mock_get_resource_keys.assert_not_called()
if not resource_keys or not performance_level:
mock_balance_aggregates.assert_not_called()
else:
mock_balance_aggregates.assert_called_once_with(
resource_keys, 1, performance_level)
@ddt.data(True, False)
def test__get_url(self, ipv6):
if ipv6:
self.netapp_aiq_weigher.host = "2001:db8::"
else:
self.netapp_aiq_weigher.host = "1.1.1.1"
self.netapp_aiq_weigher.port = "fake_port"
self.netapp_aiq_weigher.protocol = "fake_protocol"
res = self.netapp_aiq_weigher._get_url()
if ipv6:
self.assertEqual('fake_protocol://[2001:db8::]:fake_port/api/',
res)
else:
self.assertEqual('fake_protocol://1.1.1.1:fake_port/api/',
res)
@ddt.data('get', 'post', 'delete', 'patch', 'put')
def test__get_request_method(self, method):
res = self.netapp_aiq_weigher._get_request_method(
method, self.mock_session)
if method == 'get':
self.assertEqual(self.mock_session.get, res)
elif method == 'post':
self.assertEqual(self.mock_session.post, res)
elif method == 'delete':
self.assertEqual(self.mock_session.delete, res)
elif method == 'put':
self.assertEqual(self.mock_session.put, res)
elif method == 'patch':
self.assertEqual(self.mock_session.patch, res)
def test__get_session_method(self):
mock_session_builder = self.mock_object(
requests, 'Session', mock.Mock(return_value=self.mock_session))
mock__get_request_method = self.mock_object(
self.netapp_aiq_weigher, '_get_request_method',
mock.Mock(return_value=self.mock_session.post))
res = self.netapp_aiq_weigher._get_session_method('post')
self.assertEqual(self.mock_session.post, res)
mock_session_builder.assert_called_once_with()
mock__get_request_method.assert_called_once_with(
'post', self.mock_session)
def test__call_active_iq(self):
response = mock.Mock()
response.content = "fake_response"
response.status_code = "fake_code"
mock_post = mock.Mock(return_value=response)
mock__get_session_method = self.mock_object(
self.netapp_aiq_weigher, '_get_session_method',
mock.Mock(return_value=mock_post))
fake_url = "fake_url"
fake_path = "/fake_path"
mock__get_url = self.mock_object(
self.netapp_aiq_weigher, '_get_url',
mock.Mock(return_value=fake_url))
self.netapp_aiq_weigher._call_active_iq(fake_path, "post",
body="fake_body")
mock_post.assert_called_once_with(fake_url + fake_path,
json="fake_body")
self.assertTrue(netapp_aiq.LOG.debug.called)
mock__get_session_method.assert_called_once_with("post")
mock__get_url.assert_called_once_with()
@ddt.data({}, jsonutils.dumps(
fakes.FAKE_ACTIVE_IQ_WEIGHER_AGGREGATES_RESPONSE))
def test__get_resource_keys(self, api_res):
mock__call_active_iq = self.mock_object(
self.netapp_aiq_weigher, '_call_active_iq',
mock.Mock(return_value=(200, api_res)))
res = self.netapp_aiq_weigher._get_resource_keys(
fakes.FAKE_ACTIVE_IQ_WEIGHER_LIST)
if api_res:
self.assertEqual(['fake_key_1', 'fake_key_2', 'fake_key_3'], res)
else:
self.assertEqual([0, 0, 0], res)
mock__call_active_iq.assert_called_once_with(
'datacenter/storage/aggregates', 'get')
@ddt.data(mock.Mock(side_effect=exception.NotFound),
mock.Mock(return_value=(400, "fake_res")))
def test__get_resource_keys_error(self, mock_cal):
self.mock_object(
self.netapp_aiq_weigher, '_call_active_iq', mock_cal)
res = self.netapp_aiq_weigher._get_resource_keys(
fakes.FAKE_ACTIVE_IQ_WEIGHER_LIST)
self.assertEqual([], res)
self.assertTrue(netapp_aiq.LOG.error.called)
@ddt.data([], jsonutils.dumps(
fakes.FAKE_ACTIVE_IQ_WEIGHER_BALANCE_RESPONSE))
def test__balance_aggregates(self, api_res):
mock__call_active_iq = self.mock_object(
self.netapp_aiq_weigher, '_call_active_iq',
mock.Mock(return_value=(200, api_res)))
res = self.netapp_aiq_weigher._balance_aggregates(
['fake_key_1', 'fake_key_2', 0, 'fake_key_3'], 10, 'fake_uuid')
if not api_res:
self.assertEqual([0.0, 0.0, 0.0, 0.0], res)
else:
self.assertEqual([10.0, 20.0, 0.0, 0.0], res)
fake_body = {
"capacity": '10GB',
"eval_method": 1,
"opt_method": 0,
"priority_order": ['ops'],
"separate_flag": False,
"resource_keys": ['fake_key_1', 'fake_key_2', 'fake_key_3'],
"ssl_key": 'fake_uuid'
}
mock__call_active_iq.assert_called_once_with(
'storage-provider/data-placement/balance', 'post', body=fake_body)
@ddt.data(mock.Mock(side_effect=exception.NotFound),
mock.Mock(return_value=(400, "fake_res")))
def test__balance_aggregates_error(self, mock_cal):
self.mock_object(
self.netapp_aiq_weigher, '_call_active_iq', mock_cal)
res = self.netapp_aiq_weigher._balance_aggregates(
['fake_key_1', 'fake_key_2', 0, 'fake_key_3'], 10, 'fake_uuid')
self.assertEqual([], res)
self.assertTrue(netapp_aiq.LOG.error.called)
@mock.patch('manila.db.api.IMPL.service_get_all_by_topic')
def _get_all_hosts(self, _mock_service_get_all_by_topic, disabled=False):
ctxt = context.get_admin_context()
fakes.mock_host_manager_db_calls(_mock_service_get_all_by_topic,
disabled=disabled)
host_states = self.host_manager.get_all_host_states_share(ctxt)
_mock_service_get_all_by_topic.assert_called_once_with(
ctxt, CONF.share_topic)
return host_states
def test_weigh_objects_netapp_only(self):
self.host_manager = fakes.FakeHostManagerNetAppOnly()
hosts = self._get_all_hosts() # pylint: disable=no-value-for-parameter
weight_properties = "fake_properties"
mock_weigh_active_iq = self.mock_object(
netapp_aiq.NetAppAIQWeigher, '_weigh_active_iq',
# third host wins
mock.Mock(return_value=[0.0, 0.0, 10.0, 0.0, 0.0, 0.0]))
weighed_host = self.weight_handler.get_weighed_objects(
[netapp_aiq.NetAppAIQWeigher],
hosts,
weight_properties)[0]
mock_weigh_active_iq.assert_called()
self.assertEqual(1.0, weighed_host.weight)
self.assertEqual(
'host3', utils.extract_host(weighed_host.obj.host))
def test_weigh_objects_non_netapp_backends(self):
self.host_manager = fakes.FakeHostManager()
hosts = self._get_all_hosts() # pylint: disable=no-value-for-parameter
weight_properties = "fake_properties"
mock_weigh_active_iq = self.mock_object(
netapp_aiq.NetAppAIQWeigher, '_weigh_active_iq')
weighed_host = self.weight_handler.get_weighed_objects(
[netapp_aiq.NetAppAIQWeigher],
hosts,
weight_properties)[0]
mock_weigh_active_iq.assert_not_called()
self.assertEqual(0.0, weighed_host.weight)
self.assertEqual(
'host1', utils.extract_host(weighed_host.obj.host))

View File

@ -502,6 +502,9 @@ class NetAppFileStorageLibraryTestCase(test.TestCase):
mock_get_flexgroup_space = self.mock_object(
self.library, '_get_flexgroup_pool_space',
mock.Mock(return_value=(fake_total, fake_free, fake_used)))
mock_get_cluster_name = self.mock_object(
self.library._client, 'get_cluster_name',
mock.Mock(return_value='fake_cluster_name'))
self.library._cache_pool_status = na_utils.DataCache(60)
self.library._have_cluster_creds = True
@ -520,6 +523,7 @@ class NetAppFileStorageLibraryTestCase(test.TestCase):
mock_get_flexgroup_space.assert_has_calls([
mock.call(fake.AGGREGATE_CAPACITIES,
fake.FLEXGROUP_POOL_OPT[fake.FLEXGROUP_POOL_NAME])])
mock_get_cluster_name.assert_called_once_with()
mock_get_pool.assert_has_calls([
mock.call(fake.AGGREGATES[0], fake_total, fake_free, fake_used),
mock.call(fake.AGGREGATES[1], fake_total, fake_free, fake_used),
@ -547,6 +551,7 @@ class NetAppFileStorageLibraryTestCase(test.TestCase):
fake_pool = copy.deepcopy(fake.POOLS[0])
fake_pool['filter_function'] = None
fake_pool['goodness_function'] = None
fake_pool['netapp_cluster_name'] = ''
self.library._have_cluster_creds = True
self.library._revert_to_snapshot_support = True
self.library._cluster_info = fake.CLUSTER_INFO

View File

@ -958,6 +958,7 @@ FLEXGROUP_POOL = {
'qos': True,
'security_service_update_support': True,
'netapp_flexgroup': True,
'netapp_cluster_name': 'fake_cluster_name',
}
FLEXGROUP_AGGR_SET = set(FLEXGROUP_POOL_OPT[FLEXGROUP_POOL_NAME])
@ -1040,6 +1041,7 @@ POOLS = [
'security_service_update_support': True,
'share_server_multiple_subnet_support': True,
'netapp_flexgroup': False,
'netapp_cluster_name': 'fake_cluster_name',
},
{
'pool_name': AGGREGATES[1],
@ -1068,6 +1070,7 @@ POOLS = [
'security_service_update_support': True,
'share_server_multiple_subnet_support': True,
'netapp_flexgroup': False,
'netapp_cluster_name': 'fake_cluster_name',
},
]
@ -1076,6 +1079,7 @@ POOLS_VSERVER_CREDS = [
'pool_name': AGGREGATES[0],
'filter_function': None,
'goodness_function': None,
'netapp_cluster_name': '',
'netapp_aggregate': AGGREGATES[0],
'total_capacity_gb': 'unknown',
'free_capacity_gb': 1.1,

View File

@ -0,0 +1,10 @@
---
features:
- |
Added the ``NetAppAIQWeigher`` scheduler weigher that relies on the
`NetApp Active IQ <https://www.netapp.com/services/support/active-iq/>`_
to weigh the hosts. It only works with NetApp backends. When other
backends exist, the weigher is skipped. Added a new NetApp specific
pool information called ``netapp_cluster_name`` that contains the name
of the cluster where the pool is located, it can be set by a new
NetApp configuration option.

View File

@ -65,6 +65,7 @@ manila.scheduler.weighers =
GoodnessWeigher = manila.scheduler.weighers.goodness:GoodnessWeigher
PoolWeigher = manila.scheduler.weighers.pool:PoolWeigher
HostAffinityWeigher = manila.scheduler.weighers.host_affinity:HostAffinityWeigher
NetAppAIQWeigher = manila.scheduler.weighers.netapp_aiq:NetAppAIQWeigher
oslo.config.opts =
manila = manila.opts:list_opts