From e3ef58f86bdf31ab0cecaefe0a6e7c656f7765a6 Mon Sep 17 00:00:00 2001
From: Felipe Rodrigues <felipefuty01@gmail.com>
Date: Fri, 3 Jun 2022 15:39:46 -0300
Subject: [PATCH] NetApp ONTAP: Implemented REST transition Client

As the result of adoption of a REST API by NetApp ONTAP, this patch
is adding the basic structure to migrate from ZAPI Client to REST
incrementally.

To avoid adding bugs to the pre-existent ZAPI client and easily backport
the new client to older releases, the new REST implementation is a
completely new code on the client layer: a new client REST and API REST
classes. The driver layer should be agnostic from this transition, the
interfaces that the driver calls should keep the same as before.

The REST client contains a fallback mechanism, which will call the
ZAPI client when the operation is not implemented yet. This first
patch is only implementing the get ONTAP version operation using
REST, all other operations keep working with ZAPI.

The REST client can only work with ONTAP 9.11.1 and upper.

A new configuration `netapp_use_legacy_client` is added to enable
the operator to switch between new REST client implementation and the
old ZAPI implementation. The default is `True`, given that the REST
client is still experimental.

partially-implements: bp netapp-ontap-rest-api-client
Change-Id: I0bbc7609df66b72e9f8e26f4abb8092ae68cbe63
---
 .../dataontap/client/client_cmode_rest.py     | 233 ++++++++++++
 .../netapp/dataontap/client/rest_api.py       | 274 ++++++++++++++
 .../dataontap/cluster_mode/data_motion.py     |  30 +-
 .../netapp/dataontap/cluster_mode/lib_base.py |  35 +-
 manila/share/drivers/netapp/options.py        |  10 +-
 manila/share/drivers/netapp/utils.py          |   4 +
 .../drivers/netapp/dataontap/client/fakes.py  | 175 +++++++++
 .../client/test_client_cmode_rest.py          | 349 ++++++++++++++++++
 .../netapp/dataontap/client/test_rest_api.py  | 341 +++++++++++++++++
 ...ntap-rest-api-client-4c83c7b931f950cf.yaml |  10 +
 10 files changed, 1440 insertions(+), 21 deletions(-)
 create mode 100644 manila/share/drivers/netapp/dataontap/client/client_cmode_rest.py
 create mode 100644 manila/share/drivers/netapp/dataontap/client/rest_api.py
 create mode 100644 manila/tests/share/drivers/netapp/dataontap/client/test_client_cmode_rest.py
 create mode 100644 manila/tests/share/drivers/netapp/dataontap/client/test_rest_api.py
 create mode 100644 releasenotes/notes/netapp-ontap-rest-api-client-4c83c7b931f950cf.yaml

diff --git a/manila/share/drivers/netapp/dataontap/client/client_cmode_rest.py b/manila/share/drivers/netapp/dataontap/client/client_cmode_rest.py
new file mode 100644
index 0000000000..eb165ed941
--- /dev/null
+++ b/manila/share/drivers/netapp/dataontap/client/client_cmode_rest.py
@@ -0,0 +1,233 @@
+# Copyright (c) 2023 NetApp, Inc. All rights reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from http import client as http_client
+
+from oslo_log import log
+
+from manila import exception
+from manila.i18n import _
+from manila.share.drivers.netapp.dataontap.client import client_base
+from manila.share.drivers.netapp.dataontap.client import client_cmode
+from manila.share.drivers.netapp.dataontap.client import rest_api as netapp_api
+from manila.share.drivers.netapp import utils as na_utils
+from manila import utils
+
+
+LOG = log.getLogger(__name__)
+DEFAULT_MAX_PAGE_LENGTH = 10000
+
+
+class NetAppRestClient(object):
+
+    def __init__(self, **kwargs):
+
+        self.connection = netapp_api.RestNaServer(
+            host=kwargs['hostname'],
+            transport_type=kwargs['transport_type'],
+            ssl_cert_path=kwargs['ssl_cert_path'],
+            port=kwargs['port'],
+            username=kwargs['username'],
+            password=kwargs['password'],
+            trace=kwargs.get('trace', False),
+            api_trace_pattern=kwargs.get('api_trace_pattern',
+                                         na_utils.API_TRACE_PATTERN))
+
+        self.async_rest_timeout = kwargs.get('async_rest_timeout', 60)
+
+        self.vserver = kwargs.get('vserver', None)
+
+        self.connection.set_vserver(self.vserver)
+
+        ontap_version = self.get_ontap_version(cached=False)
+        if ontap_version['version-tuple'] < (9, 11, 1):
+            msg = _('This driver can communicate with ONTAP via REST APIs '
+                    'exclusively only when paired with a NetApp ONTAP storage '
+                    'system running release 9.11.1 or newer. '
+                    'To use ZAPI and supported REST APIs instead, '
+                    'set "netapp_use_legacy_client" to True.')
+            raise exception.NetAppException(msg)
+        self.connection.set_ontap_version(ontap_version)
+
+        # NOTE(nahimsouza): ZAPI Client is needed to implement the fallback
+        # when a REST method is not supported.
+        self.zapi_client = client_cmode.NetAppCmodeClient(**kwargs)
+
+        self._init_features()
+
+    def _init_features(self):
+        """Initialize feature support map."""
+        self.features = client_base.Features()
+
+        # NOTE(felipe_rodrigues): REST client only runs with ONTAP 9.11.1 or
+        # upper, so all features below are supported with this client.
+        self.features.add_feature('SNAPMIRROR_V2', supported=True)
+        self.features.add_feature('SYSTEM_METRICS', supported=True)
+        self.features.add_feature('SYSTEM_CONSTITUENT_METRICS',
+                                  supported=True)
+        self.features.add_feature('BROADCAST_DOMAINS', supported=True)
+        self.features.add_feature('IPSPACES', supported=True)
+        self.features.add_feature('SUBNETS', supported=True)
+        self.features.add_feature('CLUSTER_PEER_POLICY', supported=True)
+        self.features.add_feature('ADVANCED_DISK_PARTITIONING',
+                                  supported=True)
+        self.features.add_feature('KERBEROS_VSERVER', supported=True)
+        self.features.add_feature('FLEXVOL_ENCRYPTION', supported=True)
+        self.features.add_feature('SVM_DR', supported=True)
+        self.features.add_feature('ADAPTIVE_QOS', supported=True)
+        self.features.add_feature('TRANSFER_LIMIT_NFS_CONFIG',
+                                  supported=True)
+        self.features.add_feature('CIFS_DC_ADD_SKIP_CHECK',
+                                  supported=True)
+        self.features.add_feature('LDAP_LDAP_SERVERS',
+                                  supported=True)
+        self.features.add_feature('FLEXGROUP', supported=True)
+        self.features.add_feature('FLEXGROUP_FAN_OUT', supported=True)
+        self.features.add_feature('SVM_MIGRATE', supported=True)
+
+    def __getattr__(self, name):
+        """If method is not implemented for REST, try to call the ZAPI."""
+        LOG.debug("The %s call is not supported for REST, falling back to "
+                  "ZAPI.", name)
+        # Don't use self.zapi_client to avoid reentrant call to __getattr__()
+        zapi_client = object.__getattribute__(self, 'zapi_client')
+        return getattr(zapi_client, name)
+
+    def _wait_job_result(self, job_url):
+
+        interval = 2
+        retries = (self.async_rest_timeout / interval)
+
+        @utils.retry(netapp_api.NaRetryableError, interval=interval,
+                     retries=retries, backoff_rate=1)
+        def _waiter():
+            response = self.send_request(job_url, 'get',
+                                         enable_tunneling=False)
+
+            job_state = response.get('state')
+            if job_state == 'success':
+                return response
+            elif job_state == 'failure':
+                message = response['error']['message']
+                code = response['error']['code']
+                raise netapp_api.NaRetryableError(message=message, code=code)
+
+            msg_args = {'job': job_url, 'state': job_state}
+            LOG.debug("Job %(job)s has not finished: %(state)s", msg_args)
+            raise netapp_api.NaRetryableError(message='Job is running.')
+
+        try:
+            return _waiter()
+        except netapp_api.NaRetryableError:
+            msg = _("Job %s did not reach the expected state. Retries "
+                    "exhausted. Aborting.") % job_url
+            raise na_utils.NetAppDriverException(msg)
+
+    def send_request(self, action_url, method, body=None, query=None,
+                     enable_tunneling=True,
+                     max_page_length=DEFAULT_MAX_PAGE_LENGTH,
+                     wait_on_accepted=True):
+
+        """Sends REST request to ONTAP.
+
+        :param action_url: action URL for the request
+        :param method: HTTP method for the request ('get', 'post', 'put',
+            'delete' or 'patch')
+        :param body: dict of arguments to be passed as request body
+        :param query: dict of arguments to be passed as query string
+        :param enable_tunneling: enable tunneling to the ONTAP host
+        :param max_page_length: size of the page during pagination
+        :param wait_on_accepted: if True, wait until the job finishes when
+            HTTP code 202 (Accepted) is returned
+
+        :returns: parsed REST response
+        """
+
+        response = None
+
+        if method == 'get':
+            response = self.get_records(
+                action_url, query, enable_tunneling, max_page_length)
+        else:
+            code, response = self.connection.invoke_successfully(
+                action_url, method, body=body, query=query,
+                enable_tunneling=enable_tunneling)
+
+            if code == http_client.ACCEPTED and wait_on_accepted:
+                # get job URL and discard '/api'
+                job_url = response['job']['_links']['self']['href'][4:]
+                response = self._wait_job_result(job_url)
+
+        return response
+
+    def get_records(self, action_url, query=None, enable_tunneling=True,
+                    max_page_length=DEFAULT_MAX_PAGE_LENGTH):
+        """Retrieves ONTAP resources using pagination REST request.
+
+        :param action_url: action URL for the request
+        :param query: dict of arguments to be passed as query string
+        :param enable_tunneling: enable tunneling to the ONTAP host
+        :param max_page_length: size of the page during pagination
+
+        :returns: dict containing records and num_records
+        """
+
+        # Initialize query variable if it is None
+        query = query if query else {}
+        query['max_records'] = max_page_length
+
+        _, response = self.connection.invoke_successfully(
+            action_url, 'get', query=query,
+            enable_tunneling=enable_tunneling)
+
+        # NOTE(nahimsouza): if all records are returned in the first call,
+        # 'next_url' will be None.
+        next_url = response.get('_links', {}).get('next', {}).get('href')
+        next_url = next_url[4:] if next_url else None  # discard '/api'
+
+        # Get remaining pages, saving data into first page
+        while next_url:
+            # NOTE(nahimsouza): clean the 'query', because the parameters are
+            # already included in 'next_url'.
+            _, next_response = self.connection.invoke_successfully(
+                next_url, 'get', query=None,
+                enable_tunneling=enable_tunneling)
+
+            response['num_records'] += next_response.get('num_records', 0)
+            response['records'].extend(next_response.get('records'))
+
+            next_url = (
+                next_response.get('_links', {}).get('next', {}).get('href'))
+            next_url = next_url[4:] if next_url else None  # discard '/api'
+
+        return response
+
+    def get_ontap_version(self, cached=True):
+        """Get the current Data ONTAP version."""
+
+        if cached:
+            return self.connection.get_ontap_version()
+
+        query = {
+            'fields': 'version'
+        }
+        response = self.send_request('/cluster/nodes', 'get', query=query)
+        records = response.get('records')[0]
+
+        return {
+            'version': records['version']['full'],
+            'version-tuple': (records['version']['generation'],
+                              records['version']['major'],
+                              records['version']['minor']),
+        }
diff --git a/manila/share/drivers/netapp/dataontap/client/rest_api.py b/manila/share/drivers/netapp/dataontap/client/rest_api.py
new file mode 100644
index 0000000000..6964a57062
--- /dev/null
+++ b/manila/share/drivers/netapp/dataontap/client/rest_api.py
@@ -0,0 +1,274 @@
+# Copyright 2023 NetApp, Inc. All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+"""
+NetApp API for REST Data ONTAP.
+
+Contains classes required to issue REST API calls to Data ONTAP.
+"""
+
+import re
+
+from oslo_log import log
+from oslo_serialization import jsonutils
+import requests
+from requests.adapters import HTTPAdapter
+from requests import auth
+from requests.packages.urllib3.util import retry
+
+from manila.share.drivers.netapp.dataontap.client import api
+from manila.share.drivers.netapp import utils
+
+
+LOG = log.getLogger(__name__)
+
+ESIS_CLONE_NOT_LICENSED = '14956'
+
+
+class NaRetryableError(api.NaApiError):
+    def __str__(self, *args, **kwargs):
+        return 'NetApp API failed. Try again. Reason - %s:%s' % (
+            self.code, self.message)
+
+
+class RestNaServer(object):
+
+    TRANSPORT_TYPE_HTTP = 'http'
+    TRANSPORT_TYPE_HTTPS = 'https'
+    HTTP_PORT = '80'
+    HTTPS_PORT = '443'
+    TUNNELING_HEADER_KEY = "X-Dot-SVM-Name"
+
+    def __init__(self, host, transport_type=TRANSPORT_TYPE_HTTP,
+                 ssl_cert_path=None, username=None, password=None, port=None,
+                 trace=False, api_trace_pattern=utils.API_TRACE_PATTERN):
+        self._host = host
+        self.set_transport_type(transport_type)
+        self.set_port(port=port)
+        self._username = username
+        self._password = password
+        self._trace = trace
+        self._api_trace_pattern = api_trace_pattern
+        self._timeout = None
+
+        if ssl_cert_path is not None:
+            self._ssl_verify = ssl_cert_path
+        else:
+            # Note(felipe_rodrigues): it will verify with the mozila CA roots,
+            # given by certifi package.
+            self._ssl_verify = True
+
+        LOG.debug('Using REST with NetApp controller: %s', self._host)
+
+    def set_transport_type(self, transport_type):
+        """Set the transport type protocol for API.
+
+        Supports http and https transport types.
+        """
+        if transport_type is None or transport_type.lower() not in (
+                RestNaServer.TRANSPORT_TYPE_HTTP,
+                RestNaServer.TRANSPORT_TYPE_HTTPS):
+            raise ValueError('Unsupported transport type')
+        self._protocol = transport_type.lower()
+
+    def get_transport_type(self):
+        """Get the transport type protocol."""
+        return self._protocol
+
+    def set_api_version(self, major, minor):
+        """Set the API version."""
+        try:
+            self._api_major_version = int(major)
+            self._api_minor_version = int(minor)
+            self._api_version = str(major) + "." + str(minor)
+        except ValueError:
+            raise ValueError('Major and minor versions must be integers')
+
+    def get_api_version(self):
+        """Gets the API version tuple."""
+        if hasattr(self, '_api_version'):
+            return (self._api_major_version, self._api_minor_version)
+        return None
+
+    def set_ontap_version(self, ontap_version):
+        """Set the ONTAP version."""
+        self._ontap_version = ontap_version
+
+    def get_ontap_version(self):
+        """Gets the ONTAP version."""
+        if hasattr(self, '_ontap_version'):
+            return self._ontap_version
+        return None
+
+    def set_port(self, port=None):
+        """Set the ONTAP port, if not informed, set with default one."""
+        if port is None and self._protocol == RestNaServer.TRANSPORT_TYPE_HTTP:
+            self._port = RestNaServer.HTTP_PORT
+        elif port is None:
+            self._port = RestNaServer.HTTPS_PORT
+        else:
+            try:
+                int(port)
+            except ValueError:
+                raise ValueError('Port must be integer')
+            self._port = str(port)
+
+    def get_port(self):
+        """Get the server communication port."""
+        return self._port
+
+    def set_timeout(self, seconds):
+        """Sets the timeout in seconds."""
+        try:
+            self._timeout = int(seconds)
+        except ValueError:
+            raise ValueError('timeout in seconds must be integer')
+
+    def get_timeout(self):
+        """Gets the timeout in seconds if set."""
+        return self._timeout
+
+    def set_vserver(self, vserver):
+        """Set the vserver to use if tunneling gets enabled."""
+        self._vserver = vserver
+
+    def get_vserver(self):
+        """Get the vserver to use in tunneling."""
+        return self._vserver
+
+    def __str__(self):
+        """Gets a representation of the client."""
+        return "server: %s" % (self._host)
+
+    def _get_request_method(self, method, session):
+        """Returns the request method to be used in the REST call."""
+
+        request_methods = {
+            'post': session.post,
+            'get': session.get,
+            'put': session.put,
+            'delete': session.delete,
+            'patch': session.patch,
+        }
+        return request_methods[method]
+
+    def _add_query_params_to_url(self, url, query):
+        """Populates the URL with specified filters."""
+        filters = '&'.join([f"{k}={v}" for k, v in query.items()])
+        url += "?" + filters
+        return url
+
+    def _get_base_url(self):
+        """Get the base URL for REST requests."""
+        host = self._host
+        if ':' in host:
+            host = '[%s]' % host
+        return f'{self._protocol}://{host}:{self._port}/api'
+
+    def _build_session(self, headers):
+        """Builds a session in the client."""
+        self._session = requests.Session()
+
+        # NOTE(felipe_rodrigues): request resilient of temporary network
+        # failures (like name resolution failure), retrying until 5 times.
+        max_retries = retry.Retry(total=5, connect=5, read=2, backoff_factor=1)
+        adapter = HTTPAdapter(max_retries=max_retries)
+        self._session.mount('%s://' % self._protocol, adapter)
+
+        self._session.auth = self._create_basic_auth_handler()
+        self._session.verify = self._ssl_verify
+        self._session.headers = headers
+
+    def _build_headers(self, enable_tunneling):
+        """Build and return headers for a REST request."""
+        headers = {
+            "Accept": "application/json",
+            "Content-Type": "application/json"
+        }
+        if enable_tunneling:
+            headers[RestNaServer.TUNNELING_HEADER_KEY] = self.get_vserver()
+
+        return headers
+
+    def _create_basic_auth_handler(self):
+        """Creates and returns a basic HTTP auth handler."""
+        return auth.HTTPBasicAuth(self._username, self._password)
+
+    def send_http_request(self, method, url, body, headers):
+        """Invoke the API on the server."""
+        data = jsonutils.dumps(body) if body else {}
+
+        self._build_session(headers)
+        request_method = self._get_request_method(method, self._session)
+
+        api_name_matches_regex = (re.match(self._api_trace_pattern, url)
+                                  is not None)
+        if self._trace and api_name_matches_regex:
+            svm = headers.get(RestNaServer.TUNNELING_HEADER_KEY)
+            message = ("Request: %(method)s Header=%(header)s %(url)s "
+                       "Body=%(body)s")
+            msg_args = {
+                "method": method.upper(),
+                "url": url,
+                "body": body,
+                "header": ({RestNaServer.TUNNELING_HEADER_KEY: svm}
+                           if svm else {}),
+            }
+            LOG.debug(message, msg_args)
+
+        try:
+            if self._timeout is not None:
+                response = request_method(
+                    url, data=data, timeout=self._timeout)
+            else:
+                response = request_method(url, data=data)
+        except requests.HTTPError as e:
+            raise api.NaApiError(e.errno, e.strerror)
+        except Exception as e:
+            raise api.NaApiError(message=e)
+
+        code = response.status_code
+        res = jsonutils.loads(response.content) if response.content else {}
+
+        if self._trace and api_name_matches_regex:
+            message = "Response: %(code)s Body=%(body)s"
+            msg_args = {
+                "code": code,
+                "body": res
+            }
+            LOG.debug(message, msg_args)
+
+        return code, res
+
+    def invoke_successfully(self, action_url, method, body=None, query=None,
+                            enable_tunneling=False):
+        """Invokes REST API and checks execution status as success."""
+        headers = self._build_headers(enable_tunneling)
+        if query:
+            action_url = self._add_query_params_to_url(action_url, query)
+        url = self._get_base_url() + action_url
+        code, response = self.send_http_request(method, url, body, headers)
+
+        if not response.get('error'):
+            return code, response
+
+        result_error = response.get('error')
+        code = result_error.get('code') or 'ESTATUSFAILED'
+        # TODO(felipe_rodrigues): add the correct code number for REST not
+        #  licensed clone error.
+        if code == ESIS_CLONE_NOT_LICENSED:
+            msg = 'Clone operation failed: FlexClone not licensed.'
+        else:
+            msg = (result_error.get('message')
+                   or 'Execution failed due to unknown reason')
+        raise api.NaApiError(code, msg)
diff --git a/manila/share/drivers/netapp/dataontap/cluster_mode/data_motion.py b/manila/share/drivers/netapp/dataontap/cluster_mode/data_motion.py
index 3fe9f26713..c6fd0f0495 100644
--- a/manila/share/drivers/netapp/dataontap/cluster_mode/data_motion.py
+++ b/manila/share/drivers/netapp/dataontap/cluster_mode/data_motion.py
@@ -30,6 +30,7 @@ from manila.share import configuration
 from manila.share import driver
 from manila.share.drivers.netapp.dataontap.client import api as netapp_api
 from manila.share.drivers.netapp.dataontap.client import client_cmode
+from manila.share.drivers.netapp.dataontap.client import client_cmode_rest
 from manila.share.drivers.netapp import options as na_opts
 from manila.share.drivers.netapp import utils as na_utils
 from manila.share import utils as share_utils
@@ -72,15 +73,26 @@ def get_backend_configuration(backend_name):
 
 def get_client_for_backend(backend_name, vserver_name=None):
     config = get_backend_configuration(backend_name)
-    client = client_cmode.NetAppCmodeClient(
-        transport_type=config.netapp_transport_type,
-        ssl_cert_path=config.netapp_ssl_cert_path,
-        username=config.netapp_login,
-        password=config.netapp_password,
-        hostname=config.netapp_server_hostname,
-        port=config.netapp_server_port,
-        vserver=vserver_name or config.netapp_vserver,
-        trace=na_utils.TRACE_API)
+    if config.netapp_use_legacy_client:
+        client = client_cmode.NetAppCmodeClient(
+            transport_type=config.netapp_transport_type,
+            ssl_cert_path=config.netapp_ssl_cert_path,
+            username=config.netapp_login,
+            password=config.netapp_password,
+            hostname=config.netapp_server_hostname,
+            port=config.netapp_server_port,
+            vserver=vserver_name or config.netapp_vserver,
+            trace=na_utils.TRACE_API)
+    else:
+        client = client_cmode_rest.NetAppRestClient(
+            transport_type=config.netapp_transport_type,
+            ssl_cert_path=config.netapp_ssl_cert_path,
+            username=config.netapp_login,
+            password=config.netapp_password,
+            hostname=config.netapp_server_hostname,
+            port=config.netapp_server_port,
+            vserver=vserver_name or config.netapp_vserver,
+            trace=na_utils.TRACE_API)
 
     return client
 
diff --git a/manila/share/drivers/netapp/dataontap/cluster_mode/lib_base.py b/manila/share/drivers/netapp/dataontap/cluster_mode/lib_base.py
index d2b62846a9..087c8bc940 100644
--- a/manila/share/drivers/netapp/dataontap/cluster_mode/lib_base.py
+++ b/manila/share/drivers/netapp/dataontap/cluster_mode/lib_base.py
@@ -40,6 +40,7 @@ from manila.i18n import _
 from manila.message import api as message_api
 from manila.share.drivers.netapp.dataontap.client import api as netapp_api
 from manila.share.drivers.netapp.dataontap.client import client_cmode
+from manila.share.drivers.netapp.dataontap.client import client_cmode_rest
 from manila.share.drivers.netapp.dataontap.cluster_mode import data_motion
 from manila.share.drivers.netapp.dataontap.cluster_mode import performance
 from manila.share.drivers.netapp.dataontap.protocols import cifs_cmode
@@ -215,20 +216,32 @@ class NetAppCmodeFileStorageLibrary(object):
     @na_utils.trace
     def _get_api_client(self, vserver=None):
 
-        # Use cached value to prevent calls to system-get-ontapi-version.
+        # Use cached value to prevent redo calls during client initialization.
         client = self._clients.get(vserver)
 
         if not client:
-            client = client_cmode.NetAppCmodeClient(
-                transport_type=self.configuration.netapp_transport_type,
-                ssl_cert_path=self.configuration.netapp_ssl_cert_path,
-                username=self.configuration.netapp_login,
-                password=self.configuration.netapp_password,
-                hostname=self.configuration.netapp_server_hostname,
-                port=self.configuration.netapp_server_port,
-                vserver=vserver,
-                trace=na_utils.TRACE_API,
-                api_trace_pattern=na_utils.API_TRACE_PATTERN)
+            if self.configuration.netapp_use_legacy_client:
+                client = client_cmode.NetAppCmodeClient(
+                    transport_type=self.configuration.netapp_transport_type,
+                    ssl_cert_path=self.configuration.netapp_ssl_cert_path,
+                    username=self.configuration.netapp_login,
+                    password=self.configuration.netapp_password,
+                    hostname=self.configuration.netapp_server_hostname,
+                    port=self.configuration.netapp_server_port,
+                    vserver=vserver,
+                    trace=na_utils.TRACE_API,
+                    api_trace_pattern=na_utils.API_TRACE_PATTERN)
+            else:
+                client = client_cmode_rest.NetAppRestClient(
+                    transport_type=self.configuration.netapp_transport_type,
+                    ssl_cert_path=self.configuration.netapp_ssl_cert_path,
+                    username=self.configuration.netapp_login,
+                    password=self.configuration.netapp_password,
+                    hostname=self.configuration.netapp_server_hostname,
+                    port=self.configuration.netapp_server_port,
+                    vserver=vserver,
+                    trace=na_utils.TRACE_API,
+                    api_trace_pattern=na_utils.API_TRACE_PATTERN)
             self._clients[vserver] = client
 
         return client
diff --git a/manila/share/drivers/netapp/options.py b/manila/share/drivers/netapp/options.py
index 4b5a08443b..f9f1e82ff4 100644
--- a/manila/share/drivers/netapp/options.py
+++ b/manila/share/drivers/netapp/options.py
@@ -37,7 +37,15 @@ netapp_connection_opts = [
     cfg.PortOpt('netapp_server_port',
                 help=('The TCP port to use for communication with the storage '
                       'system or proxy server. If not specified, Data ONTAP '
-                      'drivers will use 80 for HTTP and 443 for HTTPS.')), ]
+                      'drivers will use 80 for HTTP and 443 for HTTPS.')),
+    cfg.BoolOpt('netapp_use_legacy_client',
+                default=True,
+                help=('The ONTAP client used for retrieving and modifying '
+                      'data on the storage. The legacy client relies mostly '
+                      'on ZAPI calls, only using REST calls for SVM migrate '
+                      'feature. If set to False, the new REST client is used, '
+                      'which runs REST calls if supported, otherwise falls '
+                      'back to the equivalent ZAPI call.')), ]
 
 netapp_transport_opts = [
     cfg.StrOpt('netapp_transport_type',
diff --git a/manila/share/drivers/netapp/utils.py b/manila/share/drivers/netapp/utils.py
index 167e739283..d9b7732ffd 100644
--- a/manila/share/drivers/netapp/utils.py
+++ b/manila/share/drivers/netapp/utils.py
@@ -52,6 +52,10 @@ FLEXVOL_STYLE_EXTENDED = 'flexvol'
 FLEXGROUP_DEFAULT_POOL_NAME = 'flexgroup_auto'
 
 
+class NetAppDriverException(exception.ShareBackendException):
+    message = _("NetApp Manila Driver exception.")
+
+
 def validate_driver_instantiation(**kwargs):
     """Checks if a driver is instantiated other than by the unified driver.
 
diff --git a/manila/tests/share/drivers/netapp/dataontap/client/fakes.py b/manila/tests/share/drivers/netapp/dataontap/client/fakes.py
index 5684959287..b4bc264ecf 100644
--- a/manila/tests/share/drivers/netapp/dataontap/client/fakes.py
+++ b/manila/tests/share/drivers/netapp/dataontap/client/fakes.py
@@ -30,6 +30,7 @@ CONNECTION_INFO = {
     'api_trace_pattern': '(.*)',
 }
 
+FAKE_UUID = 'b32bab78-82be-11ec-a8a3-0242ac120002'
 CLUSTER_NAME = 'fake_cluster'
 REMOTE_CLUSTER_NAME = 'fake_cluster_2'
 CLUSTER_ADDRESS_1 = 'fake_cluster_address'
@@ -48,6 +49,7 @@ NFS_VERSIONS = ['nfs3', 'nfs4.0']
 ROOT_AGGREGATE_NAMES = ('root_aggr1', 'root_aggr2')
 ROOT_VOLUME_AGGREGATE_NAME = 'fake_root_aggr'
 ROOT_VOLUME_NAME = 'fake_root_volume'
+VOLUME_NAMES = ('volume1', 'volume2')
 SHARE_AGGREGATE_NAME = 'fake_aggr1'
 SHARE_AGGREGATE_NAMES = ('fake_aggr1', 'fake_aggr2')
 SHARE_AGGREGATE_RAID_TYPES = ('raid4', 'raid_dp')
@@ -3031,6 +3033,7 @@ FAKE_ACTION_URL = '/endpoint'
 FAKE_BASE_URL = '10.0.0.3/api'
 FAKE_HTTP_BODY = {'fake_key': 'fake_value'}
 FAKE_HTTP_QUERY = {'type': 'fake_type'}
+FAKE_FORMATTED_HTTP_QUERY = "?type=fake_type"
 FAKE_HTTP_HEADER = {"fake_header_key": "fake_header_value"}
 FAKE_URL_PARAMS = {"fake_url_key": "fake_url_value_to_be_concatenated"}
 
@@ -3237,3 +3240,175 @@ JOB_GET_STATE_NOT_UNIQUE_RESPONSE = etree.XML("""
 """ % {
     'state': JOB_STATE,
 })
+
+NO_RECORDS_RESPONSE_REST = {
+    "records": [],
+    "num_records": 0,
+    "_links": {
+        "self": {
+            "href": "/api/cluster/nodes"
+        }
+    }
+}
+
+ERROR_RESPONSE_REST = {
+    "error": {
+        "code": 1100,
+        "message": "fake error",
+    }
+}
+
+GET_VERSION_RESPONSE_REST = {
+    "records": [
+        {
+            "version": {
+                "generation": "9",
+                "minor": "11",
+                "major": "1",
+                "full": "NetApp Release 9.11.1: Sun Nov 05 18:20:57 UTC 2017"
+            }
+        }
+    ],
+    "_links": {
+        "next": {
+            "href": "/api/resourcelink"
+        },
+        "self": {
+            "href": "/api/resourcelink"
+        }
+    },
+    "num_records": 0
+}
+
+VOLUME_GET_ITER_RESPONSE_LIST_REST = [
+    {
+        "uuid": "2407b637-119c-11ec-a4fb-00a0b89c9a78",
+        "name": VOLUME_NAMES[0],
+        "state": "online",
+        "style": "flexvol",
+        "is_svm_root": False,
+        "type": "rw",
+        "error_state": {
+            "is_inconsistent": False
+        },
+        "_links": {
+            "self": {
+                "href": "/api/storage/volumes/2407b637-119c-11ec-a4fb"
+            }
+        }
+    },
+    {
+        "uuid": "2c190609-d51c-11eb-b83a",
+        "name": VOLUME_NAMES[1],
+        "state": "online",
+        "style": "flexvol",
+        "is_svm_root": False,
+        "type": "rw",
+        "error_state": {
+            "is_inconsistent": False
+        },
+        "_links": {
+            "self": {
+                "href": "/api/storage/volumes/2c190609-d51c-11eb-b83a"
+            }
+        }
+    }
+]
+
+VOLUME_GET_ITER_RESPONSE_REST_PAGE = {
+    "records": [
+        VOLUME_GET_ITER_RESPONSE_LIST_REST[0],
+        VOLUME_GET_ITER_RESPONSE_LIST_REST[0],
+        VOLUME_GET_ITER_RESPONSE_LIST_REST[0],
+        VOLUME_GET_ITER_RESPONSE_LIST_REST[0],
+        VOLUME_GET_ITER_RESPONSE_LIST_REST[0],
+        VOLUME_GET_ITER_RESPONSE_LIST_REST[0],
+        VOLUME_GET_ITER_RESPONSE_LIST_REST[0],
+        VOLUME_GET_ITER_RESPONSE_LIST_REST[0],
+        VOLUME_GET_ITER_RESPONSE_LIST_REST[0],
+        VOLUME_GET_ITER_RESPONSE_LIST_REST[0],
+    ],
+    "num_records": 10,
+    "_links": {
+        "self": {
+            "href": "/api/storage/volumes?fields=name&max_records=2"
+        },
+        "next": {
+            "href": "/api/storage/volumes?"
+            f"start.uuid={VOLUME_GET_ITER_RESPONSE_LIST_REST[0]['uuid']}"
+            "&fields=name&max_records=2"
+        }
+    }
+}
+
+VOLUME_GET_ITER_RESPONSE_REST_LAST_PAGE = {
+    "records": [
+        VOLUME_GET_ITER_RESPONSE_LIST_REST[0],
+        VOLUME_GET_ITER_RESPONSE_LIST_REST[0],
+        VOLUME_GET_ITER_RESPONSE_LIST_REST[0],
+        VOLUME_GET_ITER_RESPONSE_LIST_REST[0],
+        VOLUME_GET_ITER_RESPONSE_LIST_REST[0],
+        VOLUME_GET_ITER_RESPONSE_LIST_REST[0],
+        VOLUME_GET_ITER_RESPONSE_LIST_REST[0],
+        VOLUME_GET_ITER_RESPONSE_LIST_REST[0],
+    ],
+    "num_records": 8,
+}
+
+INVALID_GET_ITER_RESPONSE_NO_RECORDS_REST = {
+    "num_records": 1,
+}
+
+INVALID_GET_ITER_RESPONSE_NO_NUM_RECORDS_REST = {
+    "records": [],
+}
+
+JOB_RESPONSE_REST = {
+    "job": {
+        "uuid": "uuid-12345",
+        "_links": {
+            "self": {
+                "href": "/api/cluster/jobs/uuid-12345"
+            }
+        }
+    }
+}
+
+JOB_SUCCESSFUL_REST = {
+    "uuid": FAKE_UUID,
+    "description": "Fake description",
+    "state": "success",
+    "message": "success",
+    "code": 0,
+    "start_time": "2022-02-18T20:08:03+00:00",
+    "end_time": "2022-02-18T20:08:04+00:00",
+}
+
+JOB_RUNNING_REST = {
+    "uuid": FAKE_UUID,
+    "description": "Fake description",
+    "state": "running",
+    "message": "running",
+    "code": 0,
+}
+
+JOB_ERROR_REST = {
+    "uuid": FAKE_UUID,
+    "description": "Fake description",
+    "state": "failure",
+    "message": "failure",
+    "code": 4,
+    "error": {
+        "target": "uuid",
+        "arguments": [
+            {
+                "message": "string",
+                "code": "string"
+            }
+        ],
+        "message": "entry doesn't exist",
+        "code": "4"
+    },
+    "start_time": "2022-02-18T20:08:03+00:00",
+    "end_time": "2022-02-18T20:08:04+00:00",
+}
diff --git a/manila/tests/share/drivers/netapp/dataontap/client/test_client_cmode_rest.py b/manila/tests/share/drivers/netapp/dataontap/client/test_client_cmode_rest.py
new file mode 100644
index 0000000000..c70266cdbc
--- /dev/null
+++ b/manila/tests/share/drivers/netapp/dataontap/client/test_client_cmode_rest.py
@@ -0,0 +1,349 @@
+# Copyright (c) 2023 NetApp, Inc. All rights reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import copy
+from unittest import mock
+
+import ddt
+from oslo_log import log
+
+from manila.share.drivers.netapp.dataontap.client import client_cmode
+from manila.share.drivers.netapp.dataontap.client import client_cmode_rest
+from manila.share.drivers.netapp import utils as netapp_utils
+from manila import test
+from manila.tests.share.drivers.netapp.dataontap.client import fakes as fake
+
+
+@ddt.ddt
+class NetAppRestCmodeClientTestCase(test.TestCase):
+
+    def setUp(self):
+        super(NetAppRestCmodeClientTestCase, self).setUp()
+
+        # Mock loggers as themselves to allow logger arg validation
+        mock_logger = log.getLogger('mock_logger')
+        self.mock_object(client_cmode_rest.LOG,
+                         'error',
+                         mock.Mock(side_effect=mock_logger.error))
+        self.mock_object(client_cmode_rest.LOG,
+                         'warning',
+                         mock.Mock(side_effect=mock_logger.warning))
+        self.mock_object(client_cmode_rest.LOG,
+                         'debug',
+                         mock.Mock(side_effect=mock_logger.debug))
+        self.mock_object(client_cmode.NetAppCmodeClient,
+                         'get_ontapi_version',
+                         mock.Mock(return_value=(1, 20)))
+        # store the original reference so we can call it later in
+        # test_get_ontap_version
+        self.original_get_ontap_version = (
+            client_cmode_rest.NetAppRestClient.get_ontap_version)
+        self.mock_object(client_cmode_rest.NetAppRestClient,
+                         'get_ontap_version',
+                         mock.Mock(return_value={
+                             'version-tuple': (9, 11, 1),
+                             'version': fake.VERSION,
+                         }))
+        self.mock_object(client_cmode.NetAppCmodeClient,
+                         'get_system_version',
+                         mock.Mock(return_value={
+                             'version-tuple': (9, 10, 1),
+                             'version': fake.VERSION,
+                         }))
+
+        self.client = client_cmode_rest.NetAppRestClient(
+            **fake.CONNECTION_INFO)
+        self.client.connection = mock.MagicMock()
+
+        self.vserver_client = client_cmode.NetAppCmodeClient(
+            **fake.CONNECTION_INFO)
+        self.vserver_client.set_vserver(fake.VSERVER_NAME)
+        self.vserver_client.connection = mock.MagicMock()
+
+    def test_send_request(self):
+        expected = 'fake_response'
+        mock_get_records = self.mock_object(
+            self.client, 'get_records',
+            mock.Mock(return_value=expected))
+
+        res = self.client.send_request(
+            fake.FAKE_ACTION_URL, 'get',
+            body=fake.FAKE_HTTP_BODY,
+            query=fake.FAKE_HTTP_QUERY, enable_tunneling=False)
+
+        self.assertEqual(expected, res)
+        mock_get_records.assert_called_once_with(
+            fake.FAKE_ACTION_URL,
+            fake.FAKE_HTTP_QUERY, False, 10000)
+
+    def test_send_request_post(self):
+        expected = (201, 'fake_response')
+        mock_invoke = self.mock_object(
+            self.client.connection, 'invoke_successfully',
+            mock.Mock(return_value=expected))
+
+        res = self.client.send_request(
+            fake.FAKE_ACTION_URL, 'post',
+            body=fake.FAKE_HTTP_BODY,
+            query=fake.FAKE_HTTP_QUERY, enable_tunneling=False)
+
+        self.assertEqual(expected[1], res)
+        mock_invoke.assert_called_once_with(
+            fake.FAKE_ACTION_URL, 'post',
+            body=fake.FAKE_HTTP_BODY,
+            query=fake.FAKE_HTTP_QUERY, enable_tunneling=False)
+
+    def test_send_request_wait(self):
+        expected = (202, fake.JOB_RESPONSE_REST)
+        mock_invoke = self.mock_object(
+            self.client.connection, 'invoke_successfully',
+            mock.Mock(return_value=expected))
+
+        mock_wait = self.mock_object(
+            self.client, '_wait_job_result',
+            mock.Mock(return_value=expected[1]))
+
+        res = self.client.send_request(
+            fake.FAKE_ACTION_URL, 'post',
+            body=fake.FAKE_HTTP_BODY,
+            query=fake.FAKE_HTTP_QUERY, enable_tunneling=False)
+
+        self.assertEqual(expected[1], res)
+        mock_invoke.assert_called_once_with(
+            fake.FAKE_ACTION_URL, 'post',
+            body=fake.FAKE_HTTP_BODY,
+            query=fake.FAKE_HTTP_QUERY, enable_tunneling=False)
+        mock_wait.assert_called_once_with(
+            expected[1]['job']['_links']['self']['href'][4:])
+
+    @ddt.data(True, False)
+    def test_get_records(self, enable_tunneling):
+        api_responses = [
+            (200, fake.VOLUME_GET_ITER_RESPONSE_REST_PAGE),
+            (200, fake.VOLUME_GET_ITER_RESPONSE_REST_PAGE),
+            (200, fake.VOLUME_GET_ITER_RESPONSE_REST_LAST_PAGE),
+        ]
+
+        mock_invoke = self.mock_object(
+            self.client.connection, 'invoke_successfully',
+            mock.Mock(side_effect=copy.deepcopy(api_responses)))
+
+        query = {
+            'fields': 'name'
+        }
+
+        result = self.client.get_records(
+            '/storage/volumes/', query=query,
+            enable_tunneling=enable_tunneling,
+            max_page_length=10)
+
+        num_records = result['num_records']
+        self.assertEqual(28, num_records)
+        self.assertEqual(28, len(result['records']))
+
+        expected_records = []
+        expected_records.extend(api_responses[0][1]['records'])
+        expected_records.extend(api_responses[1][1]['records'])
+        expected_records.extend(api_responses[2][1]['records'])
+
+        self.assertEqual(expected_records, result['records'])
+
+        next_tag = result.get('next')
+        self.assertIsNone(next_tag)
+
+        expected_query = copy.deepcopy(query)
+        expected_query['max_records'] = 10
+
+        next_url_1 = api_responses[0][1]['_links']['next']['href'][4:]
+        next_url_2 = api_responses[1][1]['_links']['next']['href'][4:]
+
+        mock_invoke.assert_has_calls([
+            mock.call('/storage/volumes/', 'get', query=expected_query,
+                      enable_tunneling=enable_tunneling),
+            mock.call(next_url_1, 'get', query=None,
+                      enable_tunneling=enable_tunneling),
+            mock.call(next_url_2, 'get', query=None,
+                      enable_tunneling=enable_tunneling),
+        ])
+
+    def test_get_records_single_page(self):
+
+        api_response = (
+            200, fake.VOLUME_GET_ITER_RESPONSE_REST_LAST_PAGE)
+        mock_invoke = self.mock_object(self.client.connection,
+                                       'invoke_successfully',
+                                       mock.Mock(return_value=api_response))
+
+        query = {
+            'fields': 'name'
+        }
+
+        result = self.client.get_records(
+            '/storage/volumes/', query=query, max_page_length=10)
+
+        num_records = result['num_records']
+        self.assertEqual(8, num_records)
+        self.assertEqual(8, len(result['records']))
+
+        next_tag = result.get('next')
+        self.assertIsNone(next_tag)
+
+        args = copy.deepcopy(query)
+        args['max_records'] = 10
+
+        mock_invoke.assert_has_calls([
+            mock.call('/storage/volumes/', 'get', query=args,
+                      enable_tunneling=True),
+        ])
+
+    def test_get_records_not_found(self):
+
+        api_response = (200, fake.NO_RECORDS_RESPONSE_REST)
+        mock_invoke = self.mock_object(self.client.connection,
+                                       'invoke_successfully',
+                                       mock.Mock(return_value=api_response))
+
+        result = self.client.get_records('/storage/volumes/')
+
+        num_records = result['num_records']
+        self.assertEqual(0, num_records)
+        self.assertEqual(0, len(result['records']))
+
+        args = {
+            'max_records': client_cmode_rest.DEFAULT_MAX_PAGE_LENGTH
+        }
+
+        mock_invoke.assert_has_calls([
+            mock.call('/storage/volumes/', 'get', query=args,
+                      enable_tunneling=True),
+        ])
+
+    def test_get_records_timeout(self):
+        # To simulate timeout, max_records is 30, but the API returns less
+        # records and fill the 'next url' pointing to the next page.
+        max_records = 30
+        api_responses = [
+            (200, fake.VOLUME_GET_ITER_RESPONSE_REST_PAGE),
+            (200, fake.VOLUME_GET_ITER_RESPONSE_REST_PAGE),
+            (200, fake.VOLUME_GET_ITER_RESPONSE_REST_LAST_PAGE),
+        ]
+
+        mock_invoke = self.mock_object(
+            self.client.connection, 'invoke_successfully',
+            mock.Mock(side_effect=copy.deepcopy(api_responses)))
+
+        query = {
+            'fields': 'name'
+        }
+
+        result = self.client.get_records(
+            '/storage/volumes/', query=query, max_page_length=max_records)
+
+        num_records = result['num_records']
+        self.assertEqual(28, num_records)
+        self.assertEqual(28, len(result['records']))
+
+        expected_records = []
+        expected_records.extend(api_responses[0][1]['records'])
+        expected_records.extend(api_responses[1][1]['records'])
+        expected_records.extend(api_responses[2][1]['records'])
+
+        self.assertEqual(expected_records, result['records'])
+
+        next_tag = result.get('next', None)
+        self.assertIsNone(next_tag)
+
+        args1 = copy.deepcopy(query)
+        args1['max_records'] = max_records
+
+        next_url_1 = api_responses[0][1]['_links']['next']['href'][4:]
+        next_url_2 = api_responses[1][1]['_links']['next']['href'][4:]
+
+        mock_invoke.assert_has_calls([
+            mock.call('/storage/volumes/', 'get', query=args1,
+                      enable_tunneling=True),
+            mock.call(next_url_1, 'get', query=None, enable_tunneling=True),
+            mock.call(next_url_2, 'get', query=None, enable_tunneling=True),
+        ])
+
+    def test__getattr__(self):
+        # NOTE(nahimsouza): get_ontapi_version is implemented only in ZAPI
+        # client, therefore, it will call __getattr__
+        self.client.get_ontapi_version()
+
+    @ddt.data(True, False)
+    def test_get_ontap_version(self, cached):
+        self.client.get_ontap_version = (
+            self.original_get_ontap_version)
+        api_response = {
+            'records': [
+                {
+                    'version': {
+                        'generation': 9,
+                        'major': 11,
+                        'minor': 1,
+                        'full': 'NetApp Release 9.11.1'
+                    }
+                }]
+
+        }
+        return_mock = {
+            'version': 'NetApp Release 9.11.1',
+            'version-tuple': (9, 11, 1)
+        }
+        mock_connect = self.mock_object(self.client.connection,
+                                        'get_ontap_version',
+                                        mock.Mock(return_value=return_mock))
+        mock_send_request = self.mock_object(
+            self.client,
+            'send_request',
+            mock.Mock(return_value=api_response))
+
+        result = self.client.get_ontap_version(self=self.client, cached=cached)
+
+        if cached:
+            mock_connect.assert_called_once()
+        else:
+            mock_send_request.assert_called_once_with(
+                '/cluster/nodes', 'get', query={'fields': 'version'})
+
+        self.assertEqual(return_mock, result)
+
+    def test__wait_job_result(self):
+        response = fake.JOB_SUCCESSFUL_REST
+        self.mock_object(self.client,
+                         'send_request',
+                         mock.Mock(return_value=response))
+        result = self.client._wait_job_result(
+            f'/cluster/jobs/{fake.FAKE_UUID}')
+        self.assertEqual(response, result)
+
+    def test__wait_job_result_failure(self):
+        response = fake.JOB_ERROR_REST
+        self.mock_object(self.client,
+                         'send_request',
+                         mock.Mock(return_value=response))
+        self.assertRaises(netapp_utils.NetAppDriverException,
+                          self.client._wait_job_result,
+                          f'/cluster/jobs/{fake.FAKE_UUID}')
+
+    def test__wait_job_result_timeout(self):
+        response = fake.JOB_RUNNING_REST
+        self.client.async_rest_timeout = 2
+        self.mock_object(self.client,
+                         'send_request',
+                         mock.Mock(return_value=response))
+        self.assertRaises(netapp_utils.NetAppDriverException,
+                          self.client._wait_job_result,
+                          f'/cluster/jobs/{fake.FAKE_UUID}')
diff --git a/manila/tests/share/drivers/netapp/dataontap/client/test_rest_api.py b/manila/tests/share/drivers/netapp/dataontap/client/test_rest_api.py
new file mode 100644
index 0000000000..1d09d5f53d
--- /dev/null
+++ b/manila/tests/share/drivers/netapp/dataontap/client/test_rest_api.py
@@ -0,0 +1,341 @@
+# Copyright 2022 NetApp, Inc.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+"""
+Tests for NetApp REST API layer
+"""
+
+from unittest import mock
+
+import ddt
+from oslo_serialization import jsonutils
+import requests
+from requests import auth
+
+from manila.share.drivers.netapp.dataontap.client import api as legacy_api
+from manila.share.drivers.netapp.dataontap.client import rest_api as netapp_api
+from manila import test
+from manila.tests.share.drivers.netapp.dataontap.client import fakes as fake
+
+
+@ddt.ddt
+class NetAppRestApiServerTests(test.TestCase):
+    """Test case for NetApp REST API server methods."""
+    def setUp(self):
+        self.rest_client = netapp_api.RestNaServer('127.0.0.1')
+        super(NetAppRestApiServerTests, self).setUp()
+
+    @ddt.data(None, 'my_cert')
+    def test__init__ssl_verify(self, ssl_cert_path):
+        client = netapp_api.RestNaServer('127.0.0.1',
+                                         ssl_cert_path=ssl_cert_path)
+
+        if ssl_cert_path:
+            self.assertEqual(ssl_cert_path, client._ssl_verify)
+        else:
+            self.assertTrue(client._ssl_verify)
+
+    @ddt.data(None, 'ftp')
+    def test_set_transport_type_value_error(self, transport_type):
+        self.assertRaises(ValueError, self.rest_client.set_transport_type,
+                          transport_type)
+
+    @ddt.data('!&', '80na', '')
+    def test_set_port__value_error(self, port):
+        self.assertRaises(ValueError, self.rest_client.set_port, port)
+
+    @ddt.data(
+        {'port': None, 'protocol': 'http', 'expected_port': '80'},
+        {'port': None, 'protocol': 'https', 'expected_port': '443'},
+        {'port': '111', 'protocol': None, 'expected_port': '111'}
+    )
+    @ddt.unpack
+    def test_set_port(self, port, protocol, expected_port):
+        self.rest_client._protocol = protocol
+
+        self.rest_client.set_port(port=port)
+
+        self.assertEqual(expected_port, self.rest_client._port)
+
+    @ddt.data('!&', '80na', '')
+    def test_set_timeout_value_error(self, timeout):
+        self.assertRaises(ValueError, self.rest_client.set_timeout, timeout)
+
+    @ddt.data({'params': {'major': 1, 'minor': '20a'}},
+              {'params': {'major': '20a', 'minor': 1}},
+              {'params': {'major': '!*', 'minor': '20a'}})
+    @ddt.unpack
+    def test_set_api_version_value_error(self, params):
+        self.assertRaises(ValueError, self.rest_client.set_api_version,
+                          **params)
+
+    def test_set_api_version_valid(self):
+        args = {'major': '20', 'minor': 1}
+
+        self.rest_client.set_api_version(**args)
+
+        self.assertEqual(self.rest_client._api_major_version, 20)
+        self.assertEqual(self.rest_client._api_minor_version, 1)
+        self.assertEqual(self.rest_client._api_version, "20.1")
+
+    def test_invoke_successfully_naapi_error(self):
+        self.mock_object(self.rest_client, '_build_headers')
+        self.mock_object(self.rest_client, '_get_base_url',
+                         mock.Mock(return_value=''))
+        self.mock_object(
+            self.rest_client, 'send_http_request',
+            mock.Mock(return_value=(10, fake.ERROR_RESPONSE_REST)))
+
+        self.assertRaises(legacy_api.NaApiError,
+                          self.rest_client.invoke_successfully,
+                          fake.FAKE_ACTION_URL, 'get')
+
+    @ddt.data(None, {'fields': 'fake_fields'})
+    def test_invoke_successfully(self, query):
+        mock_build_header = self.mock_object(
+            self.rest_client, '_build_headers',
+            mock.Mock(return_value=fake.FAKE_HTTP_HEADER))
+        mock_base = self.mock_object(
+            self.rest_client, '_get_base_url',
+            mock.Mock(return_value=fake.FAKE_BASE_URL))
+        mock_add_query = self.mock_object(
+            self.rest_client, '_add_query_params_to_url',
+            mock.Mock(return_value=fake.FAKE_ACTION_URL))
+        http_code = 200
+        mock_send_http = self.mock_object(
+            self.rest_client, 'send_http_request',
+            mock.Mock(return_value=(http_code, fake.NO_RECORDS_RESPONSE_REST)))
+
+        code, response = self.rest_client.invoke_successfully(
+            fake.FAKE_ACTION_URL, 'get', body=fake.FAKE_HTTP_BODY, query=query,
+            enable_tunneling=True)
+
+        self.assertEqual(response, fake.NO_RECORDS_RESPONSE_REST)
+        self.assertEqual(code, http_code)
+        mock_build_header.assert_called_once_with(True)
+        mock_base.assert_called_once_with()
+        self.assertEqual(bool(query), mock_add_query.called)
+        mock_send_http.assert_called_once_with(
+            'get',
+            fake.FAKE_BASE_URL + fake.FAKE_ACTION_URL, fake.FAKE_HTTP_BODY,
+            fake.FAKE_HTTP_HEADER)
+
+    @ddt.data(
+        {'error': requests.HTTPError(), 'raised': legacy_api.NaApiError},
+        {'error': Exception, 'raised': legacy_api.NaApiError})
+    @ddt.unpack
+    def test_send_http_request_http_error(self, error, raised):
+        self.mock_object(netapp_api, 'LOG')
+        self.mock_object(self.rest_client, '_build_session')
+        self.rest_client._session = mock.Mock()
+        self.mock_object(
+            self.rest_client, '_get_request_method', mock.Mock(
+                return_value=mock.Mock(side_effect=error)))
+
+        self.assertRaises(raised, self.rest_client.send_http_request,
+                          'get', fake.FAKE_ACTION_URL, fake.FAKE_HTTP_BODY,
+                          fake.FAKE_HTTP_HEADER)
+
+    @ddt.data(
+        {
+            'resp_content': fake.NO_RECORDS_RESPONSE_REST,
+            'body': fake.FAKE_HTTP_BODY,
+            'timeout': 10,
+        },
+        {
+            'resp_content': fake.NO_RECORDS_RESPONSE_REST,
+            'body': fake.FAKE_HTTP_BODY,
+            'timeout': None,
+        },
+        {
+            'resp_content': fake.NO_RECORDS_RESPONSE_REST,
+            'body': None,
+            'timeout': None,
+        },
+        {
+            'resp_content': None,
+            'body': None,
+            'timeout': None,
+        }
+    )
+    @ddt.unpack
+    def test_send_http_request(self, resp_content, body, timeout):
+        if timeout:
+            self.rest_client._timeout = timeout
+        self.mock_object(netapp_api, 'LOG')
+        mock_json_dumps = self.mock_object(
+            jsonutils, 'dumps', mock.Mock(return_value='fake_dump_body'))
+        mock_build_session = self.mock_object(
+            self.rest_client, '_build_session')
+        _mock_session = mock.Mock()
+        self.rest_client._session = _mock_session
+        response = mock.Mock()
+        response.content = resp_content
+        response.status_code = 10
+        mock_post = mock.Mock(return_value=response)
+        mock_get_request_method = self.mock_object(
+            self.rest_client, '_get_request_method', mock.Mock(
+                return_value=mock_post))
+        mock_json_loads = self.mock_object(
+            jsonutils, 'loads',
+            mock.Mock(return_value='fake_loads_response'))
+
+        code, res = self.rest_client.send_http_request(
+            'post', fake.FAKE_ACTION_URL, body, fake.FAKE_HTTP_HEADER)
+
+        expected_res = 'fake_loads_response' if resp_content else {}
+        self.assertEqual(expected_res, res)
+        self.assertEqual(10, code)
+        self.assertEqual(bool(body), mock_json_dumps.called)
+        self.assertEqual(bool(resp_content), mock_json_loads.called)
+        mock_build_session.assert_called_once_with(fake.FAKE_HTTP_HEADER)
+        mock_get_request_method.assert_called_once_with('post', _mock_session)
+        expected_data = 'fake_dump_body' if body else {}
+        if timeout:
+            mock_post.assert_called_once_with(
+                fake.FAKE_ACTION_URL, data=expected_data, timeout=timeout)
+        else:
+            mock_post.assert_called_once_with(fake.FAKE_ACTION_URL,
+                                              data=expected_data)
+
+    @ddt.data(
+        {'host': '192.168.1.0', 'port': '80', 'protocol': 'http'},
+        {'host': '0.0.0.0', 'port': '443', 'protocol': 'https'},
+        {'host': '::ffff:8', 'port': '80', 'protocol': 'http'},
+        {'host': 'fdf8:f53b:82e4::53', 'port': '443', 'protocol': 'https'})
+    @ddt.unpack
+    def test__get_base_url(self, host, port, protocol):
+        client = netapp_api.RestNaServer(host, port=port,
+                                         transport_type=protocol)
+        expected_host = f'[{host}]' if ':' in host else host
+        expected_url = '%s://%s:%s/api' % (protocol, expected_host, port)
+
+        url = client._get_base_url()
+
+        self.assertEqual(expected_url, url)
+
+    def test__add_query_params_to_url(self):
+        formatted_url = self.rest_client._add_query_params_to_url(
+            fake.FAKE_ACTION_URL, fake.FAKE_HTTP_QUERY)
+
+        expected_formatted_url = fake.FAKE_ACTION_URL
+        expected_formatted_url += fake.FAKE_FORMATTED_HTTP_QUERY
+        self.assertEqual(expected_formatted_url, formatted_url)
+
+    @ddt.data('post', 'get', 'put', 'delete', 'patch')
+    def test_get_request_method(self, method):
+        _mock_session = mock.Mock()
+        _mock_session.post = mock.Mock()
+        _mock_session.get = mock.Mock()
+        _mock_session.put = mock.Mock()
+        _mock_session.delete = mock.Mock()
+        _mock_session.patch = mock.Mock()
+
+        res = self.rest_client._get_request_method(method, _mock_session)
+
+        expected_method = getattr(_mock_session, method)
+        self.assertEqual(expected_method, res)
+
+    def test__str__(self):
+        fake_host = 'fake_host'
+        client = netapp_api.RestNaServer(fake_host)
+
+        expected_str = "server: %s" % fake_host
+        self.assertEqual(expected_str, str(client))
+
+    def test_get_transport_type(self):
+        expected_protocol = 'fake_protocol'
+        self.rest_client._protocol = expected_protocol
+
+        res = self.rest_client.get_transport_type()
+
+        self.assertEqual(expected_protocol, res)
+
+    @ddt.data(None, ('1', '0'))
+    def test_get_api_version(self, api_version):
+        if api_version:
+            self.rest_client._api_version = str(api_version)
+            (self.rest_client._api_major_version, _) = api_version
+            (_, self.rest_client._api_minor_version) = api_version
+
+        res = self.rest_client.get_api_version()
+
+        self.assertEqual(api_version, res)
+
+    @ddt.data(None, '9.10')
+    def test_get_ontap_version(self, ontap_version):
+        if ontap_version:
+            self.rest_client._ontap_version = ontap_version
+
+        res = self.rest_client.get_ontap_version()
+
+        self.assertEqual(ontap_version, res)
+
+    def test_set_vserver(self):
+        expected_vserver = 'fake_vserver'
+        self.rest_client.set_vserver(expected_vserver)
+
+        self.assertEqual(expected_vserver, self.rest_client._vserver)
+
+    def test_get_vserver(self):
+        expected_vserver = 'fake_vserver'
+        self.rest_client._vserver = expected_vserver
+
+        res = self.rest_client.get_vserver()
+
+        self.assertEqual(expected_vserver, res)
+
+    def test__build_session(self):
+        fake_session = mock.Mock()
+        mock_requests_session = self.mock_object(
+            requests, 'Session', mock.Mock(return_value=fake_session))
+        mock_auth = self.mock_object(
+            self.rest_client, '_create_basic_auth_handler',
+            mock.Mock(return_value='fake_auth'))
+        self.rest_client._ssl_verify = 'fake_ssl'
+
+        self.rest_client._build_session(fake.FAKE_HTTP_HEADER)
+
+        self.assertEqual(fake_session, self.rest_client._session)
+        self.assertEqual('fake_auth', self.rest_client._session.auth)
+        self.assertEqual('fake_ssl', self.rest_client._session.verify)
+        self.assertEqual(fake.FAKE_HTTP_HEADER,
+                         self.rest_client._session.headers)
+        mock_requests_session.assert_called_once_with()
+        mock_auth.assert_called_once_with()
+
+    @ddt.data(True, False)
+    def test__build_headers(self, enable_tunneling):
+        self.rest_client._vserver = fake.VSERVER_NAME
+
+        res = self.rest_client._build_headers(enable_tunneling)
+
+        expected = {
+            "Accept": "application/json",
+            "Content-Type": "application/json"
+        }
+        if enable_tunneling:
+            expected["X-Dot-SVM-Name"] = fake.VSERVER_NAME
+        self.assertEqual(expected, res)
+
+    def test__create_basic_auth_handler(self):
+        username = 'fake_username'
+        password = 'fake_password'
+        client = netapp_api.RestNaServer('10.1.1.1', username=username,
+                                         password=password)
+
+        res = client._create_basic_auth_handler()
+
+        expected = auth.HTTPBasicAuth(username, password)
+        self.assertEqual(expected.__dict__, res.__dict__)
diff --git a/releasenotes/notes/netapp-ontap-rest-api-client-4c83c7b931f950cf.yaml b/releasenotes/notes/netapp-ontap-rest-api-client-4c83c7b931f950cf.yaml
new file mode 100644
index 0000000000..ee164f6f14
--- /dev/null
+++ b/releasenotes/notes/netapp-ontap-rest-api-client-4c83c7b931f950cf.yaml
@@ -0,0 +1,10 @@
+---
+features:
+  - |
+    NetApp driver: it has now the option to request ONTAP operations through
+    REST API. The new option `netapp_use_legacy_client` allows switching
+    between the old ZAPI client approach and new REST client. It is default
+    to `True`, meaning that the drivers will keep working as before using ZAPI
+    operations. If desired, this option can be set to `False` connecting with
+    new REST client that performs REST API operations if it is available,
+    otherwise falls back to ZAPI.