diff --git a/manila/share/drivers/netapp/dataontap/client/client_cmode_rest.py b/manila/share/drivers/netapp/dataontap/client/client_cmode_rest.py
new file mode 100644
index 0000000000..eb165ed941
--- /dev/null
+++ b/manila/share/drivers/netapp/dataontap/client/client_cmode_rest.py
@@ -0,0 +1,233 @@
+# Copyright (c) 2023 NetApp, Inc. All rights reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from http import client as http_client
+
+from oslo_log import log
+
+from manila import exception
+from manila.i18n import _
+from manila.share.drivers.netapp.dataontap.client import client_base
+from manila.share.drivers.netapp.dataontap.client import client_cmode
+from manila.share.drivers.netapp.dataontap.client import rest_api as netapp_api
+from manila.share.drivers.netapp import utils as na_utils
+from manila import utils
+
+
+LOG = log.getLogger(__name__)
+DEFAULT_MAX_PAGE_LENGTH = 10000
+
+
+class NetAppRestClient(object):
+
+    def __init__(self, **kwargs):
+
+        self.connection = netapp_api.RestNaServer(
+            host=kwargs['hostname'],
+            transport_type=kwargs['transport_type'],
+            ssl_cert_path=kwargs['ssl_cert_path'],
+            port=kwargs['port'],
+            username=kwargs['username'],
+            password=kwargs['password'],
+            trace=kwargs.get('trace', False),
+            api_trace_pattern=kwargs.get('api_trace_pattern',
+                                         na_utils.API_TRACE_PATTERN))
+
+        self.async_rest_timeout = kwargs.get('async_rest_timeout', 60)
+
+        self.vserver = kwargs.get('vserver', None)
+
+        self.connection.set_vserver(self.vserver)
+
+        ontap_version = self.get_ontap_version(cached=False)
+        if ontap_version['version-tuple'] < (9, 11, 1):
+            msg = _('This driver can communicate with ONTAP via REST APIs '
+                    'exclusively only when paired with a NetApp ONTAP storage '
+                    'system running release 9.11.1 or newer. '
+                    'To use ZAPI and supported REST APIs instead, '
+                    'set "netapp_use_legacy_client" to True.')
+            raise exception.NetAppException(msg)
+        self.connection.set_ontap_version(ontap_version)
+
+        # NOTE(nahimsouza): ZAPI Client is needed to implement the fallback
+        # when a REST method is not supported.
+        self.zapi_client = client_cmode.NetAppCmodeClient(**kwargs)
+
+        self._init_features()
+
+    def _init_features(self):
+        """Initialize feature support map."""
+        self.features = client_base.Features()
+
+        # NOTE(felipe_rodrigues): REST client only runs with ONTAP 9.11.1 or
+        # upper, so all features below are supported with this client.
+        self.features.add_feature('SNAPMIRROR_V2', supported=True)
+        self.features.add_feature('SYSTEM_METRICS', supported=True)
+        self.features.add_feature('SYSTEM_CONSTITUENT_METRICS',
+                                  supported=True)
+        self.features.add_feature('BROADCAST_DOMAINS', supported=True)
+        self.features.add_feature('IPSPACES', supported=True)
+        self.features.add_feature('SUBNETS', supported=True)
+        self.features.add_feature('CLUSTER_PEER_POLICY', supported=True)
+        self.features.add_feature('ADVANCED_DISK_PARTITIONING',
+                                  supported=True)
+        self.features.add_feature('KERBEROS_VSERVER', supported=True)
+        self.features.add_feature('FLEXVOL_ENCRYPTION', supported=True)
+        self.features.add_feature('SVM_DR', supported=True)
+        self.features.add_feature('ADAPTIVE_QOS', supported=True)
+        self.features.add_feature('TRANSFER_LIMIT_NFS_CONFIG',
+                                  supported=True)
+        self.features.add_feature('CIFS_DC_ADD_SKIP_CHECK',
+                                  supported=True)
+        self.features.add_feature('LDAP_LDAP_SERVERS',
+                                  supported=True)
+        self.features.add_feature('FLEXGROUP', supported=True)
+        self.features.add_feature('FLEXGROUP_FAN_OUT', supported=True)
+        self.features.add_feature('SVM_MIGRATE', supported=True)
+
+    def __getattr__(self, name):
+        """If method is not implemented for REST, try to call the ZAPI."""
+        LOG.debug("The %s call is not supported for REST, falling back to "
+                  "ZAPI.", name)
+        # Don't use self.zapi_client to avoid reentrant call to __getattr__()
+        zapi_client = object.__getattribute__(self, 'zapi_client')
+        return getattr(zapi_client, name)
+
+    def _wait_job_result(self, job_url):
+
+        interval = 2
+        retries = (self.async_rest_timeout / interval)
+
+        @utils.retry(netapp_api.NaRetryableError, interval=interval,
+                     retries=retries, backoff_rate=1)
+        def _waiter():
+            response = self.send_request(job_url, 'get',
+                                         enable_tunneling=False)
+
+            job_state = response.get('state')
+            if job_state == 'success':
+                return response
+            elif job_state == 'failure':
+                message = response['error']['message']
+                code = response['error']['code']
+                raise netapp_api.NaRetryableError(message=message, code=code)
+
+            msg_args = {'job': job_url, 'state': job_state}
+            LOG.debug("Job %(job)s has not finished: %(state)s", msg_args)
+            raise netapp_api.NaRetryableError(message='Job is running.')
+
+        try:
+            return _waiter()
+        except netapp_api.NaRetryableError:
+            msg = _("Job %s did not reach the expected state. Retries "
+                    "exhausted. Aborting.") % job_url
+            raise na_utils.NetAppDriverException(msg)
+
+    def send_request(self, action_url, method, body=None, query=None,
+                     enable_tunneling=True,
+                     max_page_length=DEFAULT_MAX_PAGE_LENGTH,
+                     wait_on_accepted=True):
+
+        """Sends REST request to ONTAP.
+
+        :param action_url: action URL for the request
+        :param method: HTTP method for the request ('get', 'post', 'put',
+            'delete' or 'patch')
+        :param body: dict of arguments to be passed as request body
+        :param query: dict of arguments to be passed as query string
+        :param enable_tunneling: enable tunneling to the ONTAP host
+        :param max_page_length: size of the page during pagination
+        :param wait_on_accepted: if True, wait until the job finishes when
+            HTTP code 202 (Accepted) is returned
+
+        :returns: parsed REST response
+        """
+
+        response = None
+
+        if method == 'get':
+            response = self.get_records(
+                action_url, query, enable_tunneling, max_page_length)
+        else:
+            code, response = self.connection.invoke_successfully(
+                action_url, method, body=body, query=query,
+                enable_tunneling=enable_tunneling)
+
+            if code == http_client.ACCEPTED and wait_on_accepted:
+                # get job URL and discard '/api'
+                job_url = response['job']['_links']['self']['href'][4:]
+                response = self._wait_job_result(job_url)
+
+        return response
+
+    def get_records(self, action_url, query=None, enable_tunneling=True,
+                    max_page_length=DEFAULT_MAX_PAGE_LENGTH):
+        """Retrieves ONTAP resources using pagination REST request.
+
+        :param action_url: action URL for the request
+        :param query: dict of arguments to be passed as query string
+        :param enable_tunneling: enable tunneling to the ONTAP host
+        :param max_page_length: size of the page during pagination
+
+        :returns: dict containing records and num_records
+        """
+
+        # Initialize query variable if it is None
+        query = query if query else {}
+        query['max_records'] = max_page_length
+
+        _, response = self.connection.invoke_successfully(
+            action_url, 'get', query=query,
+            enable_tunneling=enable_tunneling)
+
+        # NOTE(nahimsouza): if all records are returned in the first call,
+        # 'next_url' will be None.
+        next_url = response.get('_links', {}).get('next', {}).get('href')
+        next_url = next_url[4:] if next_url else None  # discard '/api'
+
+        # Get remaining pages, saving data into first page
+        while next_url:
+            # NOTE(nahimsouza): clean the 'query', because the parameters are
+            # already included in 'next_url'.
+            _, next_response = self.connection.invoke_successfully(
+                next_url, 'get', query=None,
+                enable_tunneling=enable_tunneling)
+
+            response['num_records'] += next_response.get('num_records', 0)
+            response['records'].extend(next_response.get('records'))
+
+            next_url = (
+                next_response.get('_links', {}).get('next', {}).get('href'))
+            next_url = next_url[4:] if next_url else None  # discard '/api'
+
+        return response
+
+    def get_ontap_version(self, cached=True):
+        """Get the current Data ONTAP version."""
+
+        if cached:
+            return self.connection.get_ontap_version()
+
+        query = {
+            'fields': 'version'
+        }
+        response = self.send_request('/cluster/nodes', 'get', query=query)
+        records = response.get('records')[0]
+
+        return {
+            'version': records['version']['full'],
+            'version-tuple': (records['version']['generation'],
+                              records['version']['major'],
+                              records['version']['minor']),
+        }
diff --git a/manila/share/drivers/netapp/dataontap/client/rest_api.py b/manila/share/drivers/netapp/dataontap/client/rest_api.py
new file mode 100644
index 0000000000..6964a57062
--- /dev/null
+++ b/manila/share/drivers/netapp/dataontap/client/rest_api.py
@@ -0,0 +1,274 @@
+# Copyright 2023 NetApp, Inc. All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+"""
+NetApp API for REST Data ONTAP.
+
+Contains classes required to issue REST API calls to Data ONTAP.
+"""
+
+import re
+
+from oslo_log import log
+from oslo_serialization import jsonutils
+import requests
+from requests.adapters import HTTPAdapter
+from requests import auth
+from requests.packages.urllib3.util import retry
+
+from manila.share.drivers.netapp.dataontap.client import api
+from manila.share.drivers.netapp import utils
+
+
+LOG = log.getLogger(__name__)
+
+ESIS_CLONE_NOT_LICENSED = '14956'
+
+
+class NaRetryableError(api.NaApiError):
+    def __str__(self, *args, **kwargs):
+        return 'NetApp API failed. Try again. Reason - %s:%s' % (
+            self.code, self.message)
+
+
+class RestNaServer(object):
+
+    TRANSPORT_TYPE_HTTP = 'http'
+    TRANSPORT_TYPE_HTTPS = 'https'
+    HTTP_PORT = '80'
+    HTTPS_PORT = '443'
+    TUNNELING_HEADER_KEY = "X-Dot-SVM-Name"
+
+    def __init__(self, host, transport_type=TRANSPORT_TYPE_HTTP,
+                 ssl_cert_path=None, username=None, password=None, port=None,
+                 trace=False, api_trace_pattern=utils.API_TRACE_PATTERN):
+        self._host = host
+        self.set_transport_type(transport_type)
+        self.set_port(port=port)
+        self._username = username
+        self._password = password
+        self._trace = trace
+        self._api_trace_pattern = api_trace_pattern
+        self._timeout = None
+
+        if ssl_cert_path is not None:
+            self._ssl_verify = ssl_cert_path
+        else:
+            # Note(felipe_rodrigues): it will verify with the mozila CA roots,
+            # given by certifi package.
+            self._ssl_verify = True
+
+        LOG.debug('Using REST with NetApp controller: %s', self._host)
+
+    def set_transport_type(self, transport_type):
+        """Set the transport type protocol for API.
+
+        Supports http and https transport types.
+        """
+        if transport_type is None or transport_type.lower() not in (
+                RestNaServer.TRANSPORT_TYPE_HTTP,
+                RestNaServer.TRANSPORT_TYPE_HTTPS):
+            raise ValueError('Unsupported transport type')
+        self._protocol = transport_type.lower()
+
+    def get_transport_type(self):
+        """Get the transport type protocol."""
+        return self._protocol
+
+    def set_api_version(self, major, minor):
+        """Set the API version."""
+        try:
+            self._api_major_version = int(major)
+            self._api_minor_version = int(minor)
+            self._api_version = str(major) + "." + str(minor)
+        except ValueError:
+            raise ValueError('Major and minor versions must be integers')
+
+    def get_api_version(self):
+        """Gets the API version tuple."""
+        if hasattr(self, '_api_version'):
+            return (self._api_major_version, self._api_minor_version)
+        return None
+
+    def set_ontap_version(self, ontap_version):
+        """Set the ONTAP version."""
+        self._ontap_version = ontap_version
+
+    def get_ontap_version(self):
+        """Gets the ONTAP version."""
+        if hasattr(self, '_ontap_version'):
+            return self._ontap_version
+        return None
+
+    def set_port(self, port=None):
+        """Set the ONTAP port, if not informed, set with default one."""
+        if port is None and self._protocol == RestNaServer.TRANSPORT_TYPE_HTTP:
+            self._port = RestNaServer.HTTP_PORT
+        elif port is None:
+            self._port = RestNaServer.HTTPS_PORT
+        else:
+            try:
+                int(port)
+            except ValueError:
+                raise ValueError('Port must be integer')
+            self._port = str(port)
+
+    def get_port(self):
+        """Get the server communication port."""
+        return self._port
+
+    def set_timeout(self, seconds):
+        """Sets the timeout in seconds."""
+        try:
+            self._timeout = int(seconds)
+        except ValueError:
+            raise ValueError('timeout in seconds must be integer')
+
+    def get_timeout(self):
+        """Gets the timeout in seconds if set."""
+        return self._timeout
+
+    def set_vserver(self, vserver):
+        """Set the vserver to use if tunneling gets enabled."""
+        self._vserver = vserver
+
+    def get_vserver(self):
+        """Get the vserver to use in tunneling."""
+        return self._vserver
+
+    def __str__(self):
+        """Gets a representation of the client."""
+        return "server: %s" % (self._host)
+
+    def _get_request_method(self, method, session):
+        """Returns the request method to be used in the REST call."""
+
+        request_methods = {
+            'post': session.post,
+            'get': session.get,
+            'put': session.put,
+            'delete': session.delete,
+            'patch': session.patch,
+        }
+        return request_methods[method]
+
+    def _add_query_params_to_url(self, url, query):
+        """Populates the URL with specified filters."""
+        filters = '&'.join([f"{k}={v}" for k, v in query.items()])
+        url += "?" + filters
+        return url
+
+    def _get_base_url(self):
+        """Get the base URL for REST requests."""
+        host = self._host
+        if ':' in host:
+            host = '[%s]' % host
+        return f'{self._protocol}://{host}:{self._port}/api'
+
+    def _build_session(self, headers):
+        """Builds a session in the client."""
+        self._session = requests.Session()
+
+        # NOTE(felipe_rodrigues): request resilient of temporary network
+        # failures (like name resolution failure), retrying until 5 times.
+        max_retries = retry.Retry(total=5, connect=5, read=2, backoff_factor=1)
+        adapter = HTTPAdapter(max_retries=max_retries)
+        self._session.mount('%s://' % self._protocol, adapter)
+
+        self._session.auth = self._create_basic_auth_handler()
+        self._session.verify = self._ssl_verify
+        self._session.headers = headers
+
+    def _build_headers(self, enable_tunneling):
+        """Build and return headers for a REST request."""
+        headers = {
+            "Accept": "application/json",
+            "Content-Type": "application/json"
+        }
+        if enable_tunneling:
+            headers[RestNaServer.TUNNELING_HEADER_KEY] = self.get_vserver()
+
+        return headers
+
+    def _create_basic_auth_handler(self):
+        """Creates and returns a basic HTTP auth handler."""
+        return auth.HTTPBasicAuth(self._username, self._password)
+
+    def send_http_request(self, method, url, body, headers):
+        """Invoke the API on the server."""
+        data = jsonutils.dumps(body) if body else {}
+
+        self._build_session(headers)
+        request_method = self._get_request_method(method, self._session)
+
+        api_name_matches_regex = (re.match(self._api_trace_pattern, url)
+                                  is not None)
+        if self._trace and api_name_matches_regex:
+            svm = headers.get(RestNaServer.TUNNELING_HEADER_KEY)
+            message = ("Request: %(method)s Header=%(header)s %(url)s "
+                       "Body=%(body)s")
+            msg_args = {
+                "method": method.upper(),
+                "url": url,
+                "body": body,
+                "header": ({RestNaServer.TUNNELING_HEADER_KEY: svm}
+                           if svm else {}),
+            }
+            LOG.debug(message, msg_args)
+
+        try:
+            if self._timeout is not None:
+                response = request_method(
+                    url, data=data, timeout=self._timeout)
+            else:
+                response = request_method(url, data=data)
+        except requests.HTTPError as e:
+            raise api.NaApiError(e.errno, e.strerror)
+        except Exception as e:
+            raise api.NaApiError(message=e)
+
+        code = response.status_code
+        res = jsonutils.loads(response.content) if response.content else {}
+
+        if self._trace and api_name_matches_regex:
+            message = "Response: %(code)s Body=%(body)s"
+            msg_args = {
+                "code": code,
+                "body": res
+            }
+            LOG.debug(message, msg_args)
+
+        return code, res
+
+    def invoke_successfully(self, action_url, method, body=None, query=None,
+                            enable_tunneling=False):
+        """Invokes REST API and checks execution status as success."""
+        headers = self._build_headers(enable_tunneling)
+        if query:
+            action_url = self._add_query_params_to_url(action_url, query)
+        url = self._get_base_url() + action_url
+        code, response = self.send_http_request(method, url, body, headers)
+
+        if not response.get('error'):
+            return code, response
+
+        result_error = response.get('error')
+        code = result_error.get('code') or 'ESTATUSFAILED'
+        # TODO(felipe_rodrigues): add the correct code number for REST not
+        #  licensed clone error.
+        if code == ESIS_CLONE_NOT_LICENSED:
+            msg = 'Clone operation failed: FlexClone not licensed.'
+        else:
+            msg = (result_error.get('message')
+                   or 'Execution failed due to unknown reason')
+        raise api.NaApiError(code, msg)
diff --git a/manila/share/drivers/netapp/dataontap/cluster_mode/data_motion.py b/manila/share/drivers/netapp/dataontap/cluster_mode/data_motion.py
index 3fe9f26713..c6fd0f0495 100644
--- a/manila/share/drivers/netapp/dataontap/cluster_mode/data_motion.py
+++ b/manila/share/drivers/netapp/dataontap/cluster_mode/data_motion.py
@@ -30,6 +30,7 @@ from manila.share import configuration
 from manila.share import driver
 from manila.share.drivers.netapp.dataontap.client import api as netapp_api
 from manila.share.drivers.netapp.dataontap.client import client_cmode
+from manila.share.drivers.netapp.dataontap.client import client_cmode_rest
 from manila.share.drivers.netapp import options as na_opts
 from manila.share.drivers.netapp import utils as na_utils
 from manila.share import utils as share_utils
@@ -72,15 +73,26 @@ def get_backend_configuration(backend_name):
 
 def get_client_for_backend(backend_name, vserver_name=None):
     config = get_backend_configuration(backend_name)
-    client = client_cmode.NetAppCmodeClient(
-        transport_type=config.netapp_transport_type,
-        ssl_cert_path=config.netapp_ssl_cert_path,
-        username=config.netapp_login,
-        password=config.netapp_password,
-        hostname=config.netapp_server_hostname,
-        port=config.netapp_server_port,
-        vserver=vserver_name or config.netapp_vserver,
-        trace=na_utils.TRACE_API)
+    if config.netapp_use_legacy_client:
+        client = client_cmode.NetAppCmodeClient(
+            transport_type=config.netapp_transport_type,
+            ssl_cert_path=config.netapp_ssl_cert_path,
+            username=config.netapp_login,
+            password=config.netapp_password,
+            hostname=config.netapp_server_hostname,
+            port=config.netapp_server_port,
+            vserver=vserver_name or config.netapp_vserver,
+            trace=na_utils.TRACE_API)
+    else:
+        client = client_cmode_rest.NetAppRestClient(
+            transport_type=config.netapp_transport_type,
+            ssl_cert_path=config.netapp_ssl_cert_path,
+            username=config.netapp_login,
+            password=config.netapp_password,
+            hostname=config.netapp_server_hostname,
+            port=config.netapp_server_port,
+            vserver=vserver_name or config.netapp_vserver,
+            trace=na_utils.TRACE_API)
 
     return client
 
diff --git a/manila/share/drivers/netapp/dataontap/cluster_mode/lib_base.py b/manila/share/drivers/netapp/dataontap/cluster_mode/lib_base.py
index bb25fb4462..0167fa84c1 100644
--- a/manila/share/drivers/netapp/dataontap/cluster_mode/lib_base.py
+++ b/manila/share/drivers/netapp/dataontap/cluster_mode/lib_base.py
@@ -40,6 +40,7 @@ from manila.i18n import _
 from manila.message import api as message_api
 from manila.share.drivers.netapp.dataontap.client import api as netapp_api
 from manila.share.drivers.netapp.dataontap.client import client_cmode
+from manila.share.drivers.netapp.dataontap.client import client_cmode_rest
 from manila.share.drivers.netapp.dataontap.cluster_mode import data_motion
 from manila.share.drivers.netapp.dataontap.cluster_mode import performance
 from manila.share.drivers.netapp.dataontap.protocols import cifs_cmode
@@ -215,20 +216,32 @@ class NetAppCmodeFileStorageLibrary(object):
     @na_utils.trace
     def _get_api_client(self, vserver=None):
 
-        # Use cached value to prevent calls to system-get-ontapi-version.
+        # Use cached value to prevent redo calls during client initialization.
         client = self._clients.get(vserver)
 
         if not client:
-            client = client_cmode.NetAppCmodeClient(
-                transport_type=self.configuration.netapp_transport_type,
-                ssl_cert_path=self.configuration.netapp_ssl_cert_path,
-                username=self.configuration.netapp_login,
-                password=self.configuration.netapp_password,
-                hostname=self.configuration.netapp_server_hostname,
-                port=self.configuration.netapp_server_port,
-                vserver=vserver,
-                trace=na_utils.TRACE_API,
-                api_trace_pattern=na_utils.API_TRACE_PATTERN)
+            if self.configuration.netapp_use_legacy_client:
+                client = client_cmode.NetAppCmodeClient(
+                    transport_type=self.configuration.netapp_transport_type,
+                    ssl_cert_path=self.configuration.netapp_ssl_cert_path,
+                    username=self.configuration.netapp_login,
+                    password=self.configuration.netapp_password,
+                    hostname=self.configuration.netapp_server_hostname,
+                    port=self.configuration.netapp_server_port,
+                    vserver=vserver,
+                    trace=na_utils.TRACE_API,
+                    api_trace_pattern=na_utils.API_TRACE_PATTERN)
+            else:
+                client = client_cmode_rest.NetAppRestClient(
+                    transport_type=self.configuration.netapp_transport_type,
+                    ssl_cert_path=self.configuration.netapp_ssl_cert_path,
+                    username=self.configuration.netapp_login,
+                    password=self.configuration.netapp_password,
+                    hostname=self.configuration.netapp_server_hostname,
+                    port=self.configuration.netapp_server_port,
+                    vserver=vserver,
+                    trace=na_utils.TRACE_API,
+                    api_trace_pattern=na_utils.API_TRACE_PATTERN)
             self._clients[vserver] = client
 
         return client
diff --git a/manila/share/drivers/netapp/options.py b/manila/share/drivers/netapp/options.py
index 4b5a08443b..f9f1e82ff4 100644
--- a/manila/share/drivers/netapp/options.py
+++ b/manila/share/drivers/netapp/options.py
@@ -37,7 +37,15 @@ netapp_connection_opts = [
     cfg.PortOpt('netapp_server_port',
                 help=('The TCP port to use for communication with the storage '
                       'system or proxy server. If not specified, Data ONTAP '
-                      'drivers will use 80 for HTTP and 443 for HTTPS.')), ]
+                      'drivers will use 80 for HTTP and 443 for HTTPS.')),
+    cfg.BoolOpt('netapp_use_legacy_client',
+                default=True,
+                help=('The ONTAP client used for retrieving and modifying '
+                      'data on the storage. The legacy client relies mostly '
+                      'on ZAPI calls, only using REST calls for SVM migrate '
+                      'feature. If set to False, the new REST client is used, '
+                      'which runs REST calls if supported, otherwise falls '
+                      'back to the equivalent ZAPI call.')), ]
 
 netapp_transport_opts = [
     cfg.StrOpt('netapp_transport_type',
diff --git a/manila/share/drivers/netapp/utils.py b/manila/share/drivers/netapp/utils.py
index 167e739283..d9b7732ffd 100644
--- a/manila/share/drivers/netapp/utils.py
+++ b/manila/share/drivers/netapp/utils.py
@@ -52,6 +52,10 @@ FLEXVOL_STYLE_EXTENDED = 'flexvol'
 FLEXGROUP_DEFAULT_POOL_NAME = 'flexgroup_auto'
 
 
+class NetAppDriverException(exception.ShareBackendException):
+    message = _("NetApp Manila Driver exception.")
+
+
 def validate_driver_instantiation(**kwargs):
     """Checks if a driver is instantiated other than by the unified driver.
 
diff --git a/manila/tests/share/drivers/netapp/dataontap/client/fakes.py b/manila/tests/share/drivers/netapp/dataontap/client/fakes.py
index 5684959287..b4bc264ecf 100644
--- a/manila/tests/share/drivers/netapp/dataontap/client/fakes.py
+++ b/manila/tests/share/drivers/netapp/dataontap/client/fakes.py
@@ -30,6 +30,7 @@ CONNECTION_INFO = {
     'api_trace_pattern': '(.*)',
 }
 
+FAKE_UUID = 'b32bab78-82be-11ec-a8a3-0242ac120002'
 CLUSTER_NAME = 'fake_cluster'
 REMOTE_CLUSTER_NAME = 'fake_cluster_2'
 CLUSTER_ADDRESS_1 = 'fake_cluster_address'
@@ -48,6 +49,7 @@ NFS_VERSIONS = ['nfs3', 'nfs4.0']
 ROOT_AGGREGATE_NAMES = ('root_aggr1', 'root_aggr2')
 ROOT_VOLUME_AGGREGATE_NAME = 'fake_root_aggr'
 ROOT_VOLUME_NAME = 'fake_root_volume'
+VOLUME_NAMES = ('volume1', 'volume2')
 SHARE_AGGREGATE_NAME = 'fake_aggr1'
 SHARE_AGGREGATE_NAMES = ('fake_aggr1', 'fake_aggr2')
 SHARE_AGGREGATE_RAID_TYPES = ('raid4', 'raid_dp')
@@ -3031,6 +3033,7 @@ FAKE_ACTION_URL = '/endpoint'
 FAKE_BASE_URL = '10.0.0.3/api'
 FAKE_HTTP_BODY = {'fake_key': 'fake_value'}
 FAKE_HTTP_QUERY = {'type': 'fake_type'}
+FAKE_FORMATTED_HTTP_QUERY = "?type=fake_type"
 FAKE_HTTP_HEADER = {"fake_header_key": "fake_header_value"}
 FAKE_URL_PARAMS = {"fake_url_key": "fake_url_value_to_be_concatenated"}
 
@@ -3237,3 +3240,175 @@ JOB_GET_STATE_NOT_UNIQUE_RESPONSE = etree.XML("""
 """ % {
     'state': JOB_STATE,
 })
+
+NO_RECORDS_RESPONSE_REST = {
+    "records": [],
+    "num_records": 0,
+    "_links": {
+        "self": {
+            "href": "/api/cluster/nodes"
+        }
+    }
+}
+
+ERROR_RESPONSE_REST = {
+    "error": {
+        "code": 1100,
+        "message": "fake error",
+    }
+}
+
+GET_VERSION_RESPONSE_REST = {
+    "records": [
+        {
+            "version": {
+                "generation": "9",
+                "minor": "11",
+                "major": "1",
+                "full": "NetApp Release 9.11.1: Sun Nov 05 18:20:57 UTC 2017"
+            }
+        }
+    ],
+    "_links": {
+        "next": {
+            "href": "/api/resourcelink"
+        },
+        "self": {
+            "href": "/api/resourcelink"
+        }
+    },
+    "num_records": 0
+}
+
+VOLUME_GET_ITER_RESPONSE_LIST_REST = [
+    {
+        "uuid": "2407b637-119c-11ec-a4fb-00a0b89c9a78",
+        "name": VOLUME_NAMES[0],
+        "state": "online",
+        "style": "flexvol",
+        "is_svm_root": False,
+        "type": "rw",
+        "error_state": {
+            "is_inconsistent": False
+        },
+        "_links": {
+            "self": {
+                "href": "/api/storage/volumes/2407b637-119c-11ec-a4fb"
+            }
+        }
+    },
+    {
+        "uuid": "2c190609-d51c-11eb-b83a",
+        "name": VOLUME_NAMES[1],
+        "state": "online",
+        "style": "flexvol",
+        "is_svm_root": False,
+        "type": "rw",
+        "error_state": {
+            "is_inconsistent": False
+        },
+        "_links": {
+            "self": {
+                "href": "/api/storage/volumes/2c190609-d51c-11eb-b83a"
+            }
+        }
+    }
+]
+
+VOLUME_GET_ITER_RESPONSE_REST_PAGE = {
+    "records": [
+        VOLUME_GET_ITER_RESPONSE_LIST_REST[0],
+        VOLUME_GET_ITER_RESPONSE_LIST_REST[0],
+        VOLUME_GET_ITER_RESPONSE_LIST_REST[0],
+        VOLUME_GET_ITER_RESPONSE_LIST_REST[0],
+        VOLUME_GET_ITER_RESPONSE_LIST_REST[0],
+        VOLUME_GET_ITER_RESPONSE_LIST_REST[0],
+        VOLUME_GET_ITER_RESPONSE_LIST_REST[0],
+        VOLUME_GET_ITER_RESPONSE_LIST_REST[0],
+        VOLUME_GET_ITER_RESPONSE_LIST_REST[0],
+        VOLUME_GET_ITER_RESPONSE_LIST_REST[0],
+    ],
+    "num_records": 10,
+    "_links": {
+        "self": {
+            "href": "/api/storage/volumes?fields=name&max_records=2"
+        },
+        "next": {
+            "href": "/api/storage/volumes?"
+            f"start.uuid={VOLUME_GET_ITER_RESPONSE_LIST_REST[0]['uuid']}"
+            "&fields=name&max_records=2"
+        }
+    }
+}
+
+VOLUME_GET_ITER_RESPONSE_REST_LAST_PAGE = {
+    "records": [
+        VOLUME_GET_ITER_RESPONSE_LIST_REST[0],
+        VOLUME_GET_ITER_RESPONSE_LIST_REST[0],
+        VOLUME_GET_ITER_RESPONSE_LIST_REST[0],
+        VOLUME_GET_ITER_RESPONSE_LIST_REST[0],
+        VOLUME_GET_ITER_RESPONSE_LIST_REST[0],
+        VOLUME_GET_ITER_RESPONSE_LIST_REST[0],
+        VOLUME_GET_ITER_RESPONSE_LIST_REST[0],
+        VOLUME_GET_ITER_RESPONSE_LIST_REST[0],
+    ],
+    "num_records": 8,
+}
+
+INVALID_GET_ITER_RESPONSE_NO_RECORDS_REST = {
+    "num_records": 1,
+}
+
+INVALID_GET_ITER_RESPONSE_NO_NUM_RECORDS_REST = {
+    "records": [],
+}
+
+JOB_RESPONSE_REST = {
+    "job": {
+        "uuid": "uuid-12345",
+        "_links": {
+            "self": {
+                "href": "/api/cluster/jobs/uuid-12345"
+            }
+        }
+    }
+}
+
+JOB_SUCCESSFUL_REST = {
+    "uuid": FAKE_UUID,
+    "description": "Fake description",
+    "state": "success",
+    "message": "success",
+    "code": 0,
+    "start_time": "2022-02-18T20:08:03+00:00",
+    "end_time": "2022-02-18T20:08:04+00:00",
+}
+
+JOB_RUNNING_REST = {
+    "uuid": FAKE_UUID,
+    "description": "Fake description",
+    "state": "running",
+    "message": "running",
+    "code": 0,
+}
+
+JOB_ERROR_REST = {
+    "uuid": FAKE_UUID,
+    "description": "Fake description",
+    "state": "failure",
+    "message": "failure",
+    "code": 4,
+    "error": {
+        "target": "uuid",
+        "arguments": [
+            {
+                "message": "string",
+                "code": "string"
+            }
+        ],
+        "message": "entry doesn't exist",
+        "code": "4"
+    },
+    "start_time": "2022-02-18T20:08:03+00:00",
+    "end_time": "2022-02-18T20:08:04+00:00",
+}
diff --git a/manila/tests/share/drivers/netapp/dataontap/client/test_client_cmode_rest.py b/manila/tests/share/drivers/netapp/dataontap/client/test_client_cmode_rest.py
new file mode 100644
index 0000000000..c70266cdbc
--- /dev/null
+++ b/manila/tests/share/drivers/netapp/dataontap/client/test_client_cmode_rest.py
@@ -0,0 +1,349 @@
+# Copyright (c) 2023 NetApp, Inc. All rights reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import copy
+from unittest import mock
+
+import ddt
+from oslo_log import log
+
+from manila.share.drivers.netapp.dataontap.client import client_cmode
+from manila.share.drivers.netapp.dataontap.client import client_cmode_rest
+from manila.share.drivers.netapp import utils as netapp_utils
+from manila import test
+from manila.tests.share.drivers.netapp.dataontap.client import fakes as fake
+
+
+@ddt.ddt
+class NetAppRestCmodeClientTestCase(test.TestCase):
+
+    def setUp(self):
+        super(NetAppRestCmodeClientTestCase, self).setUp()
+
+        # Mock loggers as themselves to allow logger arg validation
+        mock_logger = log.getLogger('mock_logger')
+        self.mock_object(client_cmode_rest.LOG,
+                         'error',
+                         mock.Mock(side_effect=mock_logger.error))
+        self.mock_object(client_cmode_rest.LOG,
+                         'warning',
+                         mock.Mock(side_effect=mock_logger.warning))
+        self.mock_object(client_cmode_rest.LOG,
+                         'debug',
+                         mock.Mock(side_effect=mock_logger.debug))
+        self.mock_object(client_cmode.NetAppCmodeClient,
+                         'get_ontapi_version',
+                         mock.Mock(return_value=(1, 20)))
+        # store the original reference so we can call it later in
+        # test_get_ontap_version
+        self.original_get_ontap_version = (
+            client_cmode_rest.NetAppRestClient.get_ontap_version)
+        self.mock_object(client_cmode_rest.NetAppRestClient,
+                         'get_ontap_version',
+                         mock.Mock(return_value={
+                             'version-tuple': (9, 11, 1),
+                             'version': fake.VERSION,
+                         }))
+        self.mock_object(client_cmode.NetAppCmodeClient,
+                         'get_system_version',
+                         mock.Mock(return_value={
+                             'version-tuple': (9, 10, 1),
+                             'version': fake.VERSION,
+                         }))
+
+        self.client = client_cmode_rest.NetAppRestClient(
+            **fake.CONNECTION_INFO)
+        self.client.connection = mock.MagicMock()
+
+        self.vserver_client = client_cmode.NetAppCmodeClient(
+            **fake.CONNECTION_INFO)
+        self.vserver_client.set_vserver(fake.VSERVER_NAME)
+        self.vserver_client.connection = mock.MagicMock()
+
+    def test_send_request(self):
+        expected = 'fake_response'
+        mock_get_records = self.mock_object(
+            self.client, 'get_records',
+            mock.Mock(return_value=expected))
+
+        res = self.client.send_request(
+            fake.FAKE_ACTION_URL, 'get',
+            body=fake.FAKE_HTTP_BODY,
+            query=fake.FAKE_HTTP_QUERY, enable_tunneling=False)
+
+        self.assertEqual(expected, res)
+        mock_get_records.assert_called_once_with(
+            fake.FAKE_ACTION_URL,
+            fake.FAKE_HTTP_QUERY, False, 10000)
+
+    def test_send_request_post(self):
+        expected = (201, 'fake_response')
+        mock_invoke = self.mock_object(
+            self.client.connection, 'invoke_successfully',
+            mock.Mock(return_value=expected))
+
+        res = self.client.send_request(
+            fake.FAKE_ACTION_URL, 'post',
+            body=fake.FAKE_HTTP_BODY,
+            query=fake.FAKE_HTTP_QUERY, enable_tunneling=False)
+
+        self.assertEqual(expected[1], res)
+        mock_invoke.assert_called_once_with(
+            fake.FAKE_ACTION_URL, 'post',
+            body=fake.FAKE_HTTP_BODY,
+            query=fake.FAKE_HTTP_QUERY, enable_tunneling=False)
+
+    def test_send_request_wait(self):
+        expected = (202, fake.JOB_RESPONSE_REST)
+        mock_invoke = self.mock_object(
+            self.client.connection, 'invoke_successfully',
+            mock.Mock(return_value=expected))
+
+        mock_wait = self.mock_object(
+            self.client, '_wait_job_result',
+            mock.Mock(return_value=expected[1]))
+
+        res = self.client.send_request(
+            fake.FAKE_ACTION_URL, 'post',
+            body=fake.FAKE_HTTP_BODY,
+            query=fake.FAKE_HTTP_QUERY, enable_tunneling=False)
+
+        self.assertEqual(expected[1], res)
+        mock_invoke.assert_called_once_with(
+            fake.FAKE_ACTION_URL, 'post',
+            body=fake.FAKE_HTTP_BODY,
+            query=fake.FAKE_HTTP_QUERY, enable_tunneling=False)
+        mock_wait.assert_called_once_with(
+            expected[1]['job']['_links']['self']['href'][4:])
+
+    @ddt.data(True, False)
+    def test_get_records(self, enable_tunneling):
+        api_responses = [
+            (200, fake.VOLUME_GET_ITER_RESPONSE_REST_PAGE),
+            (200, fake.VOLUME_GET_ITER_RESPONSE_REST_PAGE),
+            (200, fake.VOLUME_GET_ITER_RESPONSE_REST_LAST_PAGE),
+        ]
+
+        mock_invoke = self.mock_object(
+            self.client.connection, 'invoke_successfully',
+            mock.Mock(side_effect=copy.deepcopy(api_responses)))
+
+        query = {
+            'fields': 'name'
+        }
+
+        result = self.client.get_records(
+            '/storage/volumes/', query=query,
+            enable_tunneling=enable_tunneling,
+            max_page_length=10)
+
+        num_records = result['num_records']
+        self.assertEqual(28, num_records)
+        self.assertEqual(28, len(result['records']))
+
+        expected_records = []
+        expected_records.extend(api_responses[0][1]['records'])
+        expected_records.extend(api_responses[1][1]['records'])
+        expected_records.extend(api_responses[2][1]['records'])
+
+        self.assertEqual(expected_records, result['records'])
+
+        next_tag = result.get('next')
+        self.assertIsNone(next_tag)
+
+        expected_query = copy.deepcopy(query)
+        expected_query['max_records'] = 10
+
+        next_url_1 = api_responses[0][1]['_links']['next']['href'][4:]
+        next_url_2 = api_responses[1][1]['_links']['next']['href'][4:]
+
+        mock_invoke.assert_has_calls([
+            mock.call('/storage/volumes/', 'get', query=expected_query,
+                      enable_tunneling=enable_tunneling),
+            mock.call(next_url_1, 'get', query=None,
+                      enable_tunneling=enable_tunneling),
+            mock.call(next_url_2, 'get', query=None,
+                      enable_tunneling=enable_tunneling),
+        ])
+
+    def test_get_records_single_page(self):
+
+        api_response = (
+            200, fake.VOLUME_GET_ITER_RESPONSE_REST_LAST_PAGE)
+        mock_invoke = self.mock_object(self.client.connection,
+                                       'invoke_successfully',
+                                       mock.Mock(return_value=api_response))
+
+        query = {
+            'fields': 'name'
+        }
+
+        result = self.client.get_records(
+            '/storage/volumes/', query=query, max_page_length=10)
+
+        num_records = result['num_records']
+        self.assertEqual(8, num_records)
+        self.assertEqual(8, len(result['records']))
+
+        next_tag = result.get('next')
+        self.assertIsNone(next_tag)
+
+        args = copy.deepcopy(query)
+        args['max_records'] = 10
+
+        mock_invoke.assert_has_calls([
+            mock.call('/storage/volumes/', 'get', query=args,
+                      enable_tunneling=True),
+        ])
+
+    def test_get_records_not_found(self):
+
+        api_response = (200, fake.NO_RECORDS_RESPONSE_REST)
+        mock_invoke = self.mock_object(self.client.connection,
+                                       'invoke_successfully',
+                                       mock.Mock(return_value=api_response))
+
+        result = self.client.get_records('/storage/volumes/')
+
+        num_records = result['num_records']
+        self.assertEqual(0, num_records)
+        self.assertEqual(0, len(result['records']))
+
+        args = {
+            'max_records': client_cmode_rest.DEFAULT_MAX_PAGE_LENGTH
+        }
+
+        mock_invoke.assert_has_calls([
+            mock.call('/storage/volumes/', 'get', query=args,
+                      enable_tunneling=True),
+        ])
+
+    def test_get_records_timeout(self):
+        # To simulate timeout, max_records is 30, but the API returns less
+        # records and fill the 'next url' pointing to the next page.
+        max_records = 30
+        api_responses = [
+            (200, fake.VOLUME_GET_ITER_RESPONSE_REST_PAGE),
+            (200, fake.VOLUME_GET_ITER_RESPONSE_REST_PAGE),
+            (200, fake.VOLUME_GET_ITER_RESPONSE_REST_LAST_PAGE),
+        ]
+
+        mock_invoke = self.mock_object(
+            self.client.connection, 'invoke_successfully',
+            mock.Mock(side_effect=copy.deepcopy(api_responses)))
+
+        query = {
+            'fields': 'name'
+        }
+
+        result = self.client.get_records(
+            '/storage/volumes/', query=query, max_page_length=max_records)
+
+        num_records = result['num_records']
+        self.assertEqual(28, num_records)
+        self.assertEqual(28, len(result['records']))
+
+        expected_records = []
+        expected_records.extend(api_responses[0][1]['records'])
+        expected_records.extend(api_responses[1][1]['records'])
+        expected_records.extend(api_responses[2][1]['records'])
+
+        self.assertEqual(expected_records, result['records'])
+
+        next_tag = result.get('next', None)
+        self.assertIsNone(next_tag)
+
+        args1 = copy.deepcopy(query)
+        args1['max_records'] = max_records
+
+        next_url_1 = api_responses[0][1]['_links']['next']['href'][4:]
+        next_url_2 = api_responses[1][1]['_links']['next']['href'][4:]
+
+        mock_invoke.assert_has_calls([
+            mock.call('/storage/volumes/', 'get', query=args1,
+                      enable_tunneling=True),
+            mock.call(next_url_1, 'get', query=None, enable_tunneling=True),
+            mock.call(next_url_2, 'get', query=None, enable_tunneling=True),
+        ])
+
+    def test__getattr__(self):
+        # NOTE(nahimsouza): get_ontapi_version is implemented only in ZAPI
+        # client, therefore, it will call __getattr__
+        self.client.get_ontapi_version()
+
+    @ddt.data(True, False)
+    def test_get_ontap_version(self, cached):
+        self.client.get_ontap_version = (
+            self.original_get_ontap_version)
+        api_response = {
+            'records': [
+                {
+                    'version': {
+                        'generation': 9,
+                        'major': 11,
+                        'minor': 1,
+                        'full': 'NetApp Release 9.11.1'
+                    }
+                }]
+
+        }
+        return_mock = {
+            'version': 'NetApp Release 9.11.1',
+            'version-tuple': (9, 11, 1)
+        }
+        mock_connect = self.mock_object(self.client.connection,
+                                        'get_ontap_version',
+                                        mock.Mock(return_value=return_mock))
+        mock_send_request = self.mock_object(
+            self.client,
+            'send_request',
+            mock.Mock(return_value=api_response))
+
+        result = self.client.get_ontap_version(self=self.client, cached=cached)
+
+        if cached:
+            mock_connect.assert_called_once()
+        else:
+            mock_send_request.assert_called_once_with(
+                '/cluster/nodes', 'get', query={'fields': 'version'})
+
+        self.assertEqual(return_mock, result)
+
+    def test__wait_job_result(self):
+        response = fake.JOB_SUCCESSFUL_REST
+        self.mock_object(self.client,
+                         'send_request',
+                         mock.Mock(return_value=response))
+        result = self.client._wait_job_result(
+            f'/cluster/jobs/{fake.FAKE_UUID}')
+        self.assertEqual(response, result)
+
+    def test__wait_job_result_failure(self):
+        response = fake.JOB_ERROR_REST
+        self.mock_object(self.client,
+                         'send_request',
+                         mock.Mock(return_value=response))
+        self.assertRaises(netapp_utils.NetAppDriverException,
+                          self.client._wait_job_result,
+                          f'/cluster/jobs/{fake.FAKE_UUID}')
+
+    def test__wait_job_result_timeout(self):
+        response = fake.JOB_RUNNING_REST
+        self.client.async_rest_timeout = 2
+        self.mock_object(self.client,
+                         'send_request',
+                         mock.Mock(return_value=response))
+        self.assertRaises(netapp_utils.NetAppDriverException,
+                          self.client._wait_job_result,
+                          f'/cluster/jobs/{fake.FAKE_UUID}')
diff --git a/manila/tests/share/drivers/netapp/dataontap/client/test_rest_api.py b/manila/tests/share/drivers/netapp/dataontap/client/test_rest_api.py
new file mode 100644
index 0000000000..1d09d5f53d
--- /dev/null
+++ b/manila/tests/share/drivers/netapp/dataontap/client/test_rest_api.py
@@ -0,0 +1,341 @@
+# Copyright 2022 NetApp, Inc.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+"""
+Tests for NetApp REST API layer
+"""
+
+from unittest import mock
+
+import ddt
+from oslo_serialization import jsonutils
+import requests
+from requests import auth
+
+from manila.share.drivers.netapp.dataontap.client import api as legacy_api
+from manila.share.drivers.netapp.dataontap.client import rest_api as netapp_api
+from manila import test
+from manila.tests.share.drivers.netapp.dataontap.client import fakes as fake
+
+
+@ddt.ddt
+class NetAppRestApiServerTests(test.TestCase):
+    """Test case for NetApp REST API server methods."""
+    def setUp(self):
+        self.rest_client = netapp_api.RestNaServer('127.0.0.1')
+        super(NetAppRestApiServerTests, self).setUp()
+
+    @ddt.data(None, 'my_cert')
+    def test__init__ssl_verify(self, ssl_cert_path):
+        client = netapp_api.RestNaServer('127.0.0.1',
+                                         ssl_cert_path=ssl_cert_path)
+
+        if ssl_cert_path:
+            self.assertEqual(ssl_cert_path, client._ssl_verify)
+        else:
+            self.assertTrue(client._ssl_verify)
+
+    @ddt.data(None, 'ftp')
+    def test_set_transport_type_value_error(self, transport_type):
+        self.assertRaises(ValueError, self.rest_client.set_transport_type,
+                          transport_type)
+
+    @ddt.data('!&', '80na', '')
+    def test_set_port__value_error(self, port):
+        self.assertRaises(ValueError, self.rest_client.set_port, port)
+
+    @ddt.data(
+        {'port': None, 'protocol': 'http', 'expected_port': '80'},
+        {'port': None, 'protocol': 'https', 'expected_port': '443'},
+        {'port': '111', 'protocol': None, 'expected_port': '111'}
+    )
+    @ddt.unpack
+    def test_set_port(self, port, protocol, expected_port):
+        self.rest_client._protocol = protocol
+
+        self.rest_client.set_port(port=port)
+
+        self.assertEqual(expected_port, self.rest_client._port)
+
+    @ddt.data('!&', '80na', '')
+    def test_set_timeout_value_error(self, timeout):
+        self.assertRaises(ValueError, self.rest_client.set_timeout, timeout)
+
+    @ddt.data({'params': {'major': 1, 'minor': '20a'}},
+              {'params': {'major': '20a', 'minor': 1}},
+              {'params': {'major': '!*', 'minor': '20a'}})
+    @ddt.unpack
+    def test_set_api_version_value_error(self, params):
+        self.assertRaises(ValueError, self.rest_client.set_api_version,
+                          **params)
+
+    def test_set_api_version_valid(self):
+        args = {'major': '20', 'minor': 1}
+
+        self.rest_client.set_api_version(**args)
+
+        self.assertEqual(self.rest_client._api_major_version, 20)
+        self.assertEqual(self.rest_client._api_minor_version, 1)
+        self.assertEqual(self.rest_client._api_version, "20.1")
+
+    def test_invoke_successfully_naapi_error(self):
+        self.mock_object(self.rest_client, '_build_headers')
+        self.mock_object(self.rest_client, '_get_base_url',
+                         mock.Mock(return_value=''))
+        self.mock_object(
+            self.rest_client, 'send_http_request',
+            mock.Mock(return_value=(10, fake.ERROR_RESPONSE_REST)))
+
+        self.assertRaises(legacy_api.NaApiError,
+                          self.rest_client.invoke_successfully,
+                          fake.FAKE_ACTION_URL, 'get')
+
+    @ddt.data(None, {'fields': 'fake_fields'})
+    def test_invoke_successfully(self, query):
+        mock_build_header = self.mock_object(
+            self.rest_client, '_build_headers',
+            mock.Mock(return_value=fake.FAKE_HTTP_HEADER))
+        mock_base = self.mock_object(
+            self.rest_client, '_get_base_url',
+            mock.Mock(return_value=fake.FAKE_BASE_URL))
+        mock_add_query = self.mock_object(
+            self.rest_client, '_add_query_params_to_url',
+            mock.Mock(return_value=fake.FAKE_ACTION_URL))
+        http_code = 200
+        mock_send_http = self.mock_object(
+            self.rest_client, 'send_http_request',
+            mock.Mock(return_value=(http_code, fake.NO_RECORDS_RESPONSE_REST)))
+
+        code, response = self.rest_client.invoke_successfully(
+            fake.FAKE_ACTION_URL, 'get', body=fake.FAKE_HTTP_BODY, query=query,
+            enable_tunneling=True)
+
+        self.assertEqual(response, fake.NO_RECORDS_RESPONSE_REST)
+        self.assertEqual(code, http_code)
+        mock_build_header.assert_called_once_with(True)
+        mock_base.assert_called_once_with()
+        self.assertEqual(bool(query), mock_add_query.called)
+        mock_send_http.assert_called_once_with(
+            'get',
+            fake.FAKE_BASE_URL + fake.FAKE_ACTION_URL, fake.FAKE_HTTP_BODY,
+            fake.FAKE_HTTP_HEADER)
+
+    @ddt.data(
+        {'error': requests.HTTPError(), 'raised': legacy_api.NaApiError},
+        {'error': Exception, 'raised': legacy_api.NaApiError})
+    @ddt.unpack
+    def test_send_http_request_http_error(self, error, raised):
+        self.mock_object(netapp_api, 'LOG')
+        self.mock_object(self.rest_client, '_build_session')
+        self.rest_client._session = mock.Mock()
+        self.mock_object(
+            self.rest_client, '_get_request_method', mock.Mock(
+                return_value=mock.Mock(side_effect=error)))
+
+        self.assertRaises(raised, self.rest_client.send_http_request,
+                          'get', fake.FAKE_ACTION_URL, fake.FAKE_HTTP_BODY,
+                          fake.FAKE_HTTP_HEADER)
+
+    @ddt.data(
+        {
+            'resp_content': fake.NO_RECORDS_RESPONSE_REST,
+            'body': fake.FAKE_HTTP_BODY,
+            'timeout': 10,
+        },
+        {
+            'resp_content': fake.NO_RECORDS_RESPONSE_REST,
+            'body': fake.FAKE_HTTP_BODY,
+            'timeout': None,
+        },
+        {
+            'resp_content': fake.NO_RECORDS_RESPONSE_REST,
+            'body': None,
+            'timeout': None,
+        },
+        {
+            'resp_content': None,
+            'body': None,
+            'timeout': None,
+        }
+    )
+    @ddt.unpack
+    def test_send_http_request(self, resp_content, body, timeout):
+        if timeout:
+            self.rest_client._timeout = timeout
+        self.mock_object(netapp_api, 'LOG')
+        mock_json_dumps = self.mock_object(
+            jsonutils, 'dumps', mock.Mock(return_value='fake_dump_body'))
+        mock_build_session = self.mock_object(
+            self.rest_client, '_build_session')
+        _mock_session = mock.Mock()
+        self.rest_client._session = _mock_session
+        response = mock.Mock()
+        response.content = resp_content
+        response.status_code = 10
+        mock_post = mock.Mock(return_value=response)
+        mock_get_request_method = self.mock_object(
+            self.rest_client, '_get_request_method', mock.Mock(
+                return_value=mock_post))
+        mock_json_loads = self.mock_object(
+            jsonutils, 'loads',
+            mock.Mock(return_value='fake_loads_response'))
+
+        code, res = self.rest_client.send_http_request(
+            'post', fake.FAKE_ACTION_URL, body, fake.FAKE_HTTP_HEADER)
+
+        expected_res = 'fake_loads_response' if resp_content else {}
+        self.assertEqual(expected_res, res)
+        self.assertEqual(10, code)
+        self.assertEqual(bool(body), mock_json_dumps.called)
+        self.assertEqual(bool(resp_content), mock_json_loads.called)
+        mock_build_session.assert_called_once_with(fake.FAKE_HTTP_HEADER)
+        mock_get_request_method.assert_called_once_with('post', _mock_session)
+        expected_data = 'fake_dump_body' if body else {}
+        if timeout:
+            mock_post.assert_called_once_with(
+                fake.FAKE_ACTION_URL, data=expected_data, timeout=timeout)
+        else:
+            mock_post.assert_called_once_with(fake.FAKE_ACTION_URL,
+                                              data=expected_data)
+
+    @ddt.data(
+        {'host': '192.168.1.0', 'port': '80', 'protocol': 'http'},
+        {'host': '0.0.0.0', 'port': '443', 'protocol': 'https'},
+        {'host': '::ffff:8', 'port': '80', 'protocol': 'http'},
+        {'host': 'fdf8:f53b:82e4::53', 'port': '443', 'protocol': 'https'})
+    @ddt.unpack
+    def test__get_base_url(self, host, port, protocol):
+        client = netapp_api.RestNaServer(host, port=port,
+                                         transport_type=protocol)
+        expected_host = f'[{host}]' if ':' in host else host
+        expected_url = '%s://%s:%s/api' % (protocol, expected_host, port)
+
+        url = client._get_base_url()
+
+        self.assertEqual(expected_url, url)
+
+    def test__add_query_params_to_url(self):
+        formatted_url = self.rest_client._add_query_params_to_url(
+            fake.FAKE_ACTION_URL, fake.FAKE_HTTP_QUERY)
+
+        expected_formatted_url = fake.FAKE_ACTION_URL
+        expected_formatted_url += fake.FAKE_FORMATTED_HTTP_QUERY
+        self.assertEqual(expected_formatted_url, formatted_url)
+
+    @ddt.data('post', 'get', 'put', 'delete', 'patch')
+    def test_get_request_method(self, method):
+        _mock_session = mock.Mock()
+        _mock_session.post = mock.Mock()
+        _mock_session.get = mock.Mock()
+        _mock_session.put = mock.Mock()
+        _mock_session.delete = mock.Mock()
+        _mock_session.patch = mock.Mock()
+
+        res = self.rest_client._get_request_method(method, _mock_session)
+
+        expected_method = getattr(_mock_session, method)
+        self.assertEqual(expected_method, res)
+
+    def test__str__(self):
+        fake_host = 'fake_host'
+        client = netapp_api.RestNaServer(fake_host)
+
+        expected_str = "server: %s" % fake_host
+        self.assertEqual(expected_str, str(client))
+
+    def test_get_transport_type(self):
+        expected_protocol = 'fake_protocol'
+        self.rest_client._protocol = expected_protocol
+
+        res = self.rest_client.get_transport_type()
+
+        self.assertEqual(expected_protocol, res)
+
+    @ddt.data(None, ('1', '0'))
+    def test_get_api_version(self, api_version):
+        if api_version:
+            self.rest_client._api_version = str(api_version)
+            (self.rest_client._api_major_version, _) = api_version
+            (_, self.rest_client._api_minor_version) = api_version
+
+        res = self.rest_client.get_api_version()
+
+        self.assertEqual(api_version, res)
+
+    @ddt.data(None, '9.10')
+    def test_get_ontap_version(self, ontap_version):
+        if ontap_version:
+            self.rest_client._ontap_version = ontap_version
+
+        res = self.rest_client.get_ontap_version()
+
+        self.assertEqual(ontap_version, res)
+
+    def test_set_vserver(self):
+        expected_vserver = 'fake_vserver'
+        self.rest_client.set_vserver(expected_vserver)
+
+        self.assertEqual(expected_vserver, self.rest_client._vserver)
+
+    def test_get_vserver(self):
+        expected_vserver = 'fake_vserver'
+        self.rest_client._vserver = expected_vserver
+
+        res = self.rest_client.get_vserver()
+
+        self.assertEqual(expected_vserver, res)
+
+    def test__build_session(self):
+        fake_session = mock.Mock()
+        mock_requests_session = self.mock_object(
+            requests, 'Session', mock.Mock(return_value=fake_session))
+        mock_auth = self.mock_object(
+            self.rest_client, '_create_basic_auth_handler',
+            mock.Mock(return_value='fake_auth'))
+        self.rest_client._ssl_verify = 'fake_ssl'
+
+        self.rest_client._build_session(fake.FAKE_HTTP_HEADER)
+
+        self.assertEqual(fake_session, self.rest_client._session)
+        self.assertEqual('fake_auth', self.rest_client._session.auth)
+        self.assertEqual('fake_ssl', self.rest_client._session.verify)
+        self.assertEqual(fake.FAKE_HTTP_HEADER,
+                         self.rest_client._session.headers)
+        mock_requests_session.assert_called_once_with()
+        mock_auth.assert_called_once_with()
+
+    @ddt.data(True, False)
+    def test__build_headers(self, enable_tunneling):
+        self.rest_client._vserver = fake.VSERVER_NAME
+
+        res = self.rest_client._build_headers(enable_tunneling)
+
+        expected = {
+            "Accept": "application/json",
+            "Content-Type": "application/json"
+        }
+        if enable_tunneling:
+            expected["X-Dot-SVM-Name"] = fake.VSERVER_NAME
+        self.assertEqual(expected, res)
+
+    def test__create_basic_auth_handler(self):
+        username = 'fake_username'
+        password = 'fake_password'
+        client = netapp_api.RestNaServer('10.1.1.1', username=username,
+                                         password=password)
+
+        res = client._create_basic_auth_handler()
+
+        expected = auth.HTTPBasicAuth(username, password)
+        self.assertEqual(expected.__dict__, res.__dict__)
diff --git a/releasenotes/notes/netapp-ontap-rest-api-client-4c83c7b931f950cf.yaml b/releasenotes/notes/netapp-ontap-rest-api-client-4c83c7b931f950cf.yaml
new file mode 100644
index 0000000000..ee164f6f14
--- /dev/null
+++ b/releasenotes/notes/netapp-ontap-rest-api-client-4c83c7b931f950cf.yaml
@@ -0,0 +1,10 @@
+---
+features:
+  - |
+    NetApp driver: it has now the option to request ONTAP operations through
+    REST API. The new option `netapp_use_legacy_client` allows switching
+    between the old ZAPI client approach and new REST client. It is default
+    to `True`, meaning that the drivers will keep working as before using ZAPI
+    operations. If desired, this option can be set to `False` connecting with
+    new REST client that performs REST API operations if it is available,
+    otherwise falls back to ZAPI.