NFS based driver for Quobyte file storage system

Based on the nfs-ganesha approach we're adding a Quobyte file storage driver
using the Quobyte FSAL implementation. This is extended by calls to
the QB Backend API for volume creation/deletion while nfs-ganesha is used
to provide NFS access for the nova guests.

blueprint: quobyte-manila-driver

DocImpact: Adds configs for Quobyte Manila driver: quobyte_api_url,
quobyte_api_ca, quobyte_delete_shares, quobyte_api_username,
quobyte_api_password, quobyte_volume_configuration,
quobyte_default_volume_user, quobyte_default_volume_group
(Additional info on these config params can be found at
manila/share/drivers/quobyte/quobyte.py)

Change-Id: I935cb1f2321fe288d997533387aff01938d1f3d0
This commit is contained in:
Silvan Kaiser 2015-03-04 17:35:56 +01:00
parent 4db796339a
commit c434e3e7d3
9 changed files with 991 additions and 0 deletions

View File

@ -545,3 +545,14 @@ class SopAPIError(Invalid):
class HDFSException(ManilaException): class HDFSException(ManilaException):
message = _("HDFS exception occurred!") message = _("HDFS exception occurred!")
class QBException(ManilaException):
message = _("Quobyte exception occurred: %(msg)s")
class QBRpcException(ManilaException):
"""Quobyte backend specific exception."""
message = _("Quobyte JsonRpc call to backend raised "
"an exception: %(result)s, Quobyte error"
" code %(qbcode)s")

View File

@ -62,6 +62,7 @@ import manila.share.drivers.hp.hp_3par_driver
import manila.share.drivers.huawei.huawei_nas import manila.share.drivers.huawei.huawei_nas
import manila.share.drivers.ibm.gpfs import manila.share.drivers.ibm.gpfs
import manila.share.drivers.netapp.options import manila.share.drivers.netapp.options
import manila.share.drivers.quobyte.quobyte
import manila.share.drivers.service_instance import manila.share.drivers.service_instance
import manila.share.drivers.zfssa.zfssashare import manila.share.drivers.zfssa.zfssashare
import manila.share.manager import manila.share.manager
@ -128,6 +129,7 @@ _global_opt_lists = [
manila.share.drivers.netapp.options.netapp_transport_opts, manila.share.drivers.netapp.options.netapp_transport_opts,
manila.share.drivers.netapp.options.netapp_basicauth_opts, manila.share.drivers.netapp.options.netapp_basicauth_opts,
manila.share.drivers.netapp.options.netapp_provisioning_opts, manila.share.drivers.netapp.options.netapp_provisioning_opts,
manila.share.drivers.quobyte.quobyte.quobyte_manila_share_opts,
manila.share.drivers.service_instance.common_opts, manila.share.drivers.service_instance.common_opts,
manila.share.drivers.service_instance.no_share_servers_handling_mode_opts, manila.share.drivers.service_instance.no_share_servers_handling_mode_opts,
manila.share.drivers.service_instance.share_servers_handling_mode_opts, manila.share.drivers.service_instance.share_servers_handling_mode_opts,

View File

View File

@ -0,0 +1,197 @@
# Copyright (c) 2015 Quobyte Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Quobyte driver helper.
Control Quobyte over its JSON RPC API.
"""
import base64
import httplib
import socket
import ssl
import time
from oslo_log import log
from oslo_serialization import jsonutils
import six
import six.moves.urllib.parse as urlparse
from manila import exception
from manila.i18n import _
from manila.i18n import _LW
LOG = log.getLogger(__name__)
ERROR_ENOENT = 2
CONNECTION_RETRIES = 3
class BasicAuthCredentials(object):
def __init__(self, username, password):
self._username = username
self._password = password
@property
def username(self):
return self._username
def get_authorization_header(self):
auth = base64.standard_b64encode(
'%s:%s' % (self._username, self._password))
return 'BASIC %s' % auth
class HTTPSConnectionWithCaVerification(httplib.HTTPConnection):
"""Verify server cert against a given CA certificate."""
default_port = httplib.HTTPS_PORT
def __init__(self, host, port=None, key_file=None, cert_file=None,
ca_file=None, strict=None,
timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
httplib.HTTPConnection.__init__(self, host, port, strict, timeout)
self.key_file = key_file
self.cert_file = cert_file
self.ca_file = ca_file
def connect(self):
"""Connect to a host on a given (SSL) port."""
sock = socket.create_connection((self.host, self.port), self.timeout)
if self._tunnel_host:
self.sock = sock
self._tunnel()
self.sock = ssl.wrap_socket(sock, keyfile=self.key_file,
certfile=self.cert_file,
ca_certs=self.ca_file,
cert_reqs=ssl.CERT_REQUIRED)
httplib.__all__.append("HTTPSConnectionWithCaVerification")
class JsonRpc(object):
def __init__(self, url, user_credentials, ca_file=None):
parsedurl = urlparse.urlparse(url)
self._url = parsedurl.geturl()
self._netloc = parsedurl.netloc
self._ca_file = ca_file
if parsedurl.scheme == 'https':
if self._ca_file:
self._connection = HTTPSConnectionWithCaVerification(
self._netloc,
ca_file=self._ca_file.name)
else:
self._connection = httplib.HTTPSConnection(self._netloc)
LOG.warning(_LW(
"Will not verify the server certificate of the API service"
" because the CA certificate is not available."))
else:
self._connection = httplib.HTTPConnection(self._netloc)
self._id = 0
self._fail_fast = True
self._credentials = BasicAuthCredentials(
user_credentials[0], user_credentials[1])
self._require_cert_verify = self._ca_file is not None
self._disabled_cert_verification = False
def call(self, method_name, user_parameters):
parameters = {'retry': 'INFINITELY'} # Backend specific setting
if user_parameters:
parameters.update(user_parameters)
call_body = {'jsonrpc': '2.0',
'method': method_name,
'params': parameters,
'id': six.text_type(self._id)}
self.call_counter = 0
while self.call_counter < CONNECTION_RETRIES:
self.call_counter += 1
try:
self._id += 1
call_body['id'] = six.text_type(self._id)
LOG.debug("Posting to Quobyte backend: %s",
jsonutils.dumps(call_body))
self._connection.request(
"POST", self._url + '/', jsonutils.dumps(call_body),
dict(Authorization=(self._credentials.
get_authorization_header())))
response = self._connection.getresponse()
self._throw_on_http_error(response)
result = jsonutils.loads(response.read())
LOG.debug("Retrieved data from Quobyte backend: %s", result)
return self._checked_for_application_error(result)
except ssl.SSLError as e:
# Generic catch because OpenSSL does not return
# meaningful errors.
if (not self._disabled_cert_verification
and not self._require_cert_verify):
LOG.warning(_LW(
"Could not verify server certificate of "
"API service against CA."))
self._connection.close()
# Core HTTPSConnection does no certificate verification.
self._connection = httplib.HTTPSConnection(self._netloc)
self._disabled_cert_verification = True
else:
raise exception.QBException(_(
"Client SSL subsystem returned error: %s") % e)
except httplib.BadStatusLine as e:
raise exception.QBException(_(
"If SSL is enabled for the API service, the URL must"
" start with 'https://' for the URL. Failed to parse"
" status code from server response. Error was %s")
% e)
except (httplib.HTTPException, socket.error) as e:
if self._fail_fast:
raise exception.QBException(msg=six.text_type(e))
else:
LOG.warning(_LW("Encountered error, retrying: %s"),
six.text_type(e))
time.sleep(1)
raise exception.QBException("Unable to connect to backend after "
"%s retries" %
six.text_type(CONNECTION_RETRIES))
def _throw_on_http_error(self, response):
if response.status == 401:
raise exception.QBException(
_("JSON RPC failed: unauthorized user %(status)s %(reason)s"
" Please check the Quobyte API service log for "
"more details.")
% {'status': six.text_type(response.status),
'reason': response.reason})
elif response.status >= 300:
raise exception.QBException(
_("JSON RPC failed: %(status)s %(reason)s"
" Please check the Quobyte API service log for "
"more details.")
% {'status': six.text_type(response.status),
'reason': response.reason})
def _checked_for_application_error(self, result):
if 'error' in result and result['error']:
if 'message' in result['error'] and 'code' in result['error']:
if result["error"]["code"] == ERROR_ENOENT:
return None # No Entry
else:
raise exception.QBRpcException(
result=result["error"]["message"],
qbcode=result["error"]["code"])
else:
raise exception.QBException(six.text_type(result["error"]))
return result["result"]

View File

@ -0,0 +1,217 @@
# Copyright (c) 2015 Quobyte Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Quobyte driver.
Manila shares are directly mapped to Quobyte volumes. The access to the
shares is provided by the Quobyte NFS proxy (a Ganesha NFS server).
"""
from oslo_config import cfg
from oslo_log import log
import manila.common.constants
from manila import exception
from manila.i18n import _
from manila.i18n import _LE
from manila.i18n import _LW
from manila.share import driver
from manila.share.drivers.quobyte import jsonrpc
LOG = log.getLogger(__name__)
quobyte_manila_share_opts = [
cfg.StrOpt('quobyte_api_url',
help='URL of the Quobyte API server (http or https)'),
cfg.StrOpt('quobyte_api_ca',
default=None,
help='The X.509 CA file to verify the server cert.'),
cfg.BoolOpt('quobyte_delete_shares',
default=False,
help='Actually deletes shares (vs. unexport)'),
cfg.StrOpt('quobyte_api_username',
default='admin',
help='Username for Quobyte API server.'),
cfg.StrOpt('quobyte_api_password',
default='quobyte',
secret=True,
help='Password for Quobyte API server'),
cfg.StrOpt('quobyte_volume_configuration',
default='BASE',
help='Name of volume configuration used for new shares.'),
cfg.StrOpt('quobyte_default_volume_user',
default='root',
help='Default owning user for new volumes.'),
cfg.StrOpt('quobyte_default_volume_group',
default='root',
help='Default owning group for new volumes.'),
]
CONF = cfg.CONF
CONF.register_opts(quobyte_manila_share_opts)
class QuobyteShareDriver(driver.ExecuteMixin, driver.ShareDriver,):
"""Map share commands to Quobyte volumes."""
DRIVER_VERSION = '1.0'
def __init__(self, db, *args, **kwargs):
super(QuobyteShareDriver, self).__init__(False, *args, **kwargs)
self.db = db
self.configuration.append_config_values(quobyte_manila_share_opts)
self.backend_name = (self.configuration.safe_get('share_backend_name')
or CONF.share_backend_name or 'Quobyte')
def do_setup(self, context):
"""Prepares the backend."""
self.rpc = jsonrpc.JsonRpc(
url=self.configuration.quobyte_api_url,
ca_file=self.configuration.quobyte_api_ca,
user_credentials=(
self.configuration.quobyte_api_username,
self.configuration.quobyte_api_password))
try:
self.rpc.call('getInformation', {})
except Exception as exc:
LOG.error(_LE("Could not connect to API: %s"), exc)
raise exception.QBException(
_('Could not connect to API: %s') % exc)
def _update_share_stats(self):
data = dict(
storage_protocol='NFS',
vendor_name='Quobyte',
share_backend_name=self.backend_name,
driver_version=self.DRIVER_VERSION)
# TODO(kaisers): Extend by total_capacity and free_capacity
super(QuobyteShareDriver, self)._update_share_stats(data)
def check_for_setup_error(self):
pass
def get_network_allocations_number(self):
return 0
def _get_project_name(self, context, project_id):
"""Retrieve the project name.
TODO (kaisers): retrieve the project name in order
to store and use in the backend for better usability.
"""
return project_id
def _resolve_volume_name(self, volume_name, tenant_domain):
"""Resolve a volume name to the global volume uuid."""
result = self.rpc.call('resolveVolumeName', dict(
volume_name=volume_name,
tenant_domain=tenant_domain))
if result:
return result['volume_uuid']
return None # not found
def create_share(self, context, share, share_server=None):
"""Create or export a volume that is usable as a Manila share."""
if share['share_proto'] != 'NFS':
raise exception.QBException(
_('Quobyte driver only supports NFS shares'))
volume_uuid = self._resolve_volume_name(
share['name'],
self._get_project_name(context, share['project_id']))
if not volume_uuid:
result = self.rpc.call('createVolume', dict(
name=share['name'],
tenant_domain=share['project_id'],
root_user_id=self.configuration.quobyte_default_volume_user,
root_group_id=self.configuration.quobyte_default_volume_group,
configuration_name=(self.configuration.
quobyte_volume_configuration)))
volume_uuid = result['volume_uuid']
result = self.rpc.call('exportVolume', dict(
volume_uuid=volume_uuid,
protocol='NFS'))
return '%(nfs_server_ip)s:%(nfs_export_path)s' % result
def delete_share(self, context, share, share_server=None):
"""Delete the corresponding Quobyte volume."""
volume_uuid = self._resolve_volume_name(
share['name'],
self._get_project_name(context, share['project_id']))
if not volume_uuid:
LOG.warning(_LW("No volume found for "
"share %(project_id)s/%(name)s")
% {"project_id": share['project_id'],
"name": share['name']})
return
if self.configuration.quobyte_delete_shares:
self.rpc.call('deleteVolume', {'volume_uuid': volume_uuid})
self.rpc.call('exportVolume', dict(
volume_uuid=volume_uuid,
remove_export=True))
def create_snapshot(self, context, snapshot, share_server=None):
"""Is called to create snapshot."""
raise NotImplementedError()
def create_share_from_snapshot(self, context, share, snapshot,
share_server=None):
"""Is called to create share from snapshot."""
raise NotImplementedError()
def delete_snapshot(self, context, snapshot, share_server=None):
"""TBD: Is called to remove snapshot."""
raise NotImplementedError()
def ensure_share(self, context, share, share_server=None):
"""Invoked to ensure that share is exported."""
def allow_access(self, context, share, access, share_server=None):
"""Allow access to a share."""
if access['access_type'] != 'ip':
raise exception.InvalidShareAccess(
_('Quobyte driver only supports ip access control'))
volume_uuid = self._resolve_volume_name(
share['name'],
self._get_project_name(context, share['project_id']))
self.rpc.call('exportVolume', dict(
volume_uuid=volume_uuid,
read_only='access_level' == (manila.common.constants.
ACCESS_LEVEL_RO),
add_allow_ip=access['access_to']))
def deny_access(self, context, share, access, share_server=None):
"""Remove white-list ip from a share."""
if access['access_type'] != 'ip':
LOG.debug('Quobyte driver only supports ip access control. '
'Ignoring deny access call for %s , %s',
share['name'],
self._get_project_name(context, share['project_id']))
return
volume_uuid = self._resolve_volume_name(
share['name'],
self._get_project_name(context, share['project_id']))
self.rpc.call('exportVolume', dict(
volume_uuid=volume_uuid,
remove_allow_ip=access['access_to']))

View File

@ -26,6 +26,7 @@ def fake_share(**kwargs):
'share_network_id': 'fake share network id', 'share_network_id': 'fake share network id',
'share_server_id': 'fake share server id', 'share_server_id': 'fake share server id',
'export_location': 'fake_location:/fake_share', 'export_location': 'fake_location:/fake_share',
'project_id': 'fake_project_uuid',
} }
share.update(kwargs) share.update(kwargs)
return db_fakes.FakeModel(share) return db_fakes.FakeModel(share)

View File

@ -0,0 +1,312 @@
# Copyright (c) 2015 Quobyte, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import httplib
import socket
import ssl
import tempfile
import mock
from oslo_serialization import jsonutils
import six
from manila import exception
from manila.share.drivers.quobyte import jsonrpc
from manila import test
class FakeResponse(object):
def __init__(self, status, body):
self.status = status
self.reason = "HTTP reason"
self._body = body
def read(self):
return self._body
class QuobyteBasicAuthCredentialsTestCase(test.TestCase):
def test_get_authorization_header(self):
creds = jsonrpc.BasicAuthCredentials('fakeuser', 'fakepwd')
self.assertEqual('BASIC ZmFrZXVzZXI6ZmFrZXB3ZA==',
creds.get_authorization_header())
class QuobyteHttpsConnectionWithCaVerificationTestCase(test.TestCase):
@mock.patch.object(socket, "create_connection",
return_value="fake_socket")
@mock.patch.object(ssl, "wrap_socket")
def test_https_with_ca_connect(self, mock_ssl, mock_cc):
key_file = tempfile.TemporaryFile()
cert_file = tempfile.gettempdir()
ca_file = tempfile.gettempdir()
mycon = (jsonrpc.
HTTPSConnectionWithCaVerification(host="localhost",
key_file=key_file,
cert_file=cert_file,
ca_file=ca_file,
strict="anything",
port=1234,
timeout=999))
mycon.connect()
mock_cc.assert_called_once_with(("localhost", 1234), 999)
mock_ssl.assert_called_once_with("fake_socket",
keyfile=key_file,
certfile=cert_file,
ca_certs=ca_file,
cert_reqs=mock.ANY)
@mock.patch.object(httplib.HTTPConnection, "_tunnel")
@mock.patch.object(socket, "create_connection",
return_value="fake_socket")
@mock.patch.object(ssl, "wrap_socket")
def test_https_with_ca_connect_tunnel(self,
mock_ssl,
mock_cc,
mock_tunnel):
key_file = tempfile.TemporaryFile()
cert_file = tempfile.gettempdir()
ca_file = tempfile.gettempdir()
mycon = (jsonrpc.
HTTPSConnectionWithCaVerification(host="localhost",
key_file=key_file,
cert_file=cert_file,
ca_file=ca_file,
strict="anything",
port=1234,
timeout=999))
mycon._tunnel_host = "fake_tunnel_host"
mycon.connect()
mock_tunnel.assert_called_once_with()
mock_cc.assert_called_once_with(("localhost", 1234), 999)
mock_ssl.assert_called_once_with("fake_socket",
keyfile=key_file,
certfile=cert_file,
ca_certs=ca_file,
cert_reqs=mock.ANY)
class QuobyteJsonRpcTestCase(test.TestCase):
def setUp(self):
super(QuobyteJsonRpcTestCase, self).setUp()
self.rpc = jsonrpc.JsonRpc(url="http://test",
user_credentials=("me", "team"))
self.rpc._connection = mock.Mock()
self.rpc._connection.request = mock.Mock()
def test_request_generation_and_basic_auth(self):
self.rpc._connection.request = mock.Mock()
self.rpc._connection.getresponse = mock.Mock(
return_value=FakeResponse(200, '{"result":"yes"}'))
self.rpc.call('method', {'param': 'value'})
self.rpc._connection.request.assert_called_once_with(
'POST', 'http://test/',
jsonutils.dumps({'jsonrpc': '2.0',
'method': 'method',
'params': {'retry': 'INFINITELY',
'param': 'value'},
'id': '1'}),
dict(Authorization=jsonrpc.BasicAuthCredentials("me", "team")
.get_authorization_header()))
@mock.patch.object(jsonrpc.HTTPSConnectionWithCaVerification,
'__init__',
return_value=None)
def test_jsonrpc_init_with_ca(self, mock_init):
foofile = tempfile.TemporaryFile()
self.rpc = jsonrpc.JsonRpc("https://foo.bar/",
('fakeuser', 'fakepwd'),
foofile)
mock_init.assert_called_once_with("foo.bar",
ca_file=foofile.name)
@mock.patch.object(jsonrpc.LOG, "warning")
def test_jsonrpc_init_without_ca(self, mock_warning):
self.rpc = jsonrpc.JsonRpc("https://foo.bar/",
('fakeuser', 'fakepwd'),
None)
mock_warning.assert_called_once_with(
"Will not verify the server certificate of the API service"
" because the CA certificate is not available.")
@mock.patch.object(httplib.HTTPConnection,
'__init__',
return_value=None)
def test_jsonrpc_init_no_ssl(self, mock_init):
self.rpc = jsonrpc.JsonRpc("http://foo.bar/",
('fakeuser', 'fakepwd'))
mock_init.assert_called_once_with("foo.bar")
def test_successful_call(self):
self.rpc._connection.request = mock.Mock()
self.rpc._connection.getresponse = mock.Mock(
return_value=FakeResponse(
200, '{"result":"Sweet gorilla of Manila"}'))
result = self.rpc.call('method', {'param': 'value'})
self.assertEqual("Sweet gorilla of Manila", result)
def test_jsonrpc_call_ssl_disable(self):
self.rpc._connection.request = mock.Mock(
side_effect=ssl.SSLError)
jsonrpc.LOG.warning = mock.Mock()
self.assertRaises(exception.QBException,
self.rpc.call,
'method', {'param': 'value'})
jsonrpc.LOG.warning.assert_called_once_with(
"Could not verify server certificate of "
"API service against CA.")
def test_jsonrpc_call_ssl_error(self):
"""This test succeeds if a specific exception is thrown.
Throwing a different exception or none at all
is a failure in this specific test case.
"""
self.rpc._connection.request = mock.Mock(
side_effect=ssl.SSLError)
self.rpc._disabled_cert_verification = True
try:
self.rpc.call('method', {'param': 'value'})
except exception.QBException as me:
self.assertEqual("Client SSL subsystem returned error: ",
six.text_type(me))
except Exception as e:
self.fail('Unexpected exception thrown: %s' % e)
else:
self.fail('Expected exception not thrown')
def test_jsonrpc_call_bad_status_line(self):
self.rpc._connection.request = mock.Mock()
self.rpc._connection.getresponse = mock.Mock(
side_effect=httplib.BadStatusLine("fake_line"))
self.assertRaises(exception.QBException,
self.rpc.call,
'method', {'param': 'value'})
def test_jsonrpc_call_http_exception(self):
self.rpc._connection.request = mock.Mock()
self.rpc._connection.getresponse = mock.Mock(
side_effect=httplib.HTTPException)
jsonrpc.LOG.warning = mock.Mock()
self.assertRaises(exception.QBException,
self.rpc.call,
'method', {'param': 'value'})
jsonrpc.LOG.warning.assert_has_calls([])
def test_jsonrpc_call_http_exception_retry(self):
self.rpc._connection.request = mock.Mock()
self.rpc._connection.getresponse = mock.Mock(
side_effect=httplib.HTTPException)
jsonrpc.LOG.warning = mock.Mock()
self.rpc._fail_fast = False
self.assertRaises(exception.QBException,
self.rpc.call,
'method', {'param': 'value'})
jsonrpc.LOG.warning.assert_called_with(
"Encountered error, retrying: %s", "")
def test_jsonrpc_call_no_connect(self):
orig_retries = jsonrpc.CONNECTION_RETRIES
jsonrpc.CONNECTION_RETRIES = 0
try:
self.rpc.call('method', {'param': 'value'})
except exception.QBException as me:
self.assertEqual("Unable to connect to backend after 0 retries",
six.text_type(me))
else:
self.fail('Expected exception not thrown')
finally:
jsonrpc.CONNECTION_RETRIES = orig_retries
def test_http_error_401(self):
self.rpc._connection.request = mock.Mock()
self.rpc._connection.getresponse = mock.Mock(
return_value=FakeResponse(401, ''))
self.assertRaises(exception.QBException,
self.rpc.call, 'method', {'param': 'value'})
def test_http_error_other(self):
self.rpc._connection.request = mock.Mock()
self.rpc._connection.getresponse = mock.Mock(
return_value=FakeResponse(300, ''))
self.assertRaises(exception.QBException,
self.rpc.call, 'method', {'param': 'value'})
def test_application_error(self):
self.rpc._connection.request = mock.Mock()
self.rpc._connection.getresponse = mock.Mock(
return_value=FakeResponse(
200, '{"error":{"code":28,"message":"text"}}'))
self.assertRaises(exception.QBRpcException,
self.rpc.call, 'method', {'param': 'value'})
def test_broken_application_error(self):
self.rpc._connection.request = mock.Mock()
self.rpc._connection.getresponse = mock.Mock(
return_value=FakeResponse(
200, '{"error":{"code":28,"messge":"text"}}'))
self.assertRaises(exception.QBException,
self.rpc.call, 'method', {'param': 'value'})
def test_checked_for_application_error(self):
resultdict = {"result": "Sweet gorilla of Manila"}
self.assertEqual("Sweet gorilla of Manila",
(self.rpc.
_checked_for_application_error(result=resultdict))
)
def test_checked_for_application_error_no_entry(self):
resultdict = {"result": "Sweet gorilla of Manila",
"error": {"message": "No Gorilla",
"code": jsonrpc.ERROR_ENOENT}}
self.assertEqual(None,
self.rpc.
_checked_for_application_error(result=resultdict))
def test_checked_for_application_error_exception(self):
self.assertRaises(exception.QBRpcException,
self.rpc._checked_for_application_error,
{"result": "Sweet gorilla of Manila",
"error": {"message": "No Gorilla",
"code": 666
}
}
)

View File

@ -0,0 +1,251 @@
# Copyright (c) 2015 Quobyte, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_config import cfg
from manila import context
from manila import exception
from manila.share import configuration as config
from manila.share import driver
from manila.share.drivers.quobyte import jsonrpc
from manila.share.drivers.quobyte import quobyte
from manila import test
from manila.tests import fake_share
CONF = cfg.CONF
def fake_rpc_handler(name, *args):
if name == 'resolveVolumeName':
return None
elif name == 'createVolume':
return {'volume_uuid': 'voluuid'}
elif name == 'exportVolume':
return {'nfs_server_ip': '10.10.1.1',
'nfs_export_path': '/voluuid'}
class QuobyteShareDriverTestCase(test.TestCase):
"""Tests QuobyteShareDriver."""
def setUp(self):
super(QuobyteShareDriverTestCase, self).setUp()
self._context = context.get_admin_context()
CONF.set_default('driver_handles_share_servers', False)
self.fake_conf = config.Configuration(None)
self._db = mock.Mock()
self._driver = quobyte.QuobyteShareDriver(self._db,
configuration=self.fake_conf)
self._driver.rpc = mock.Mock()
self.share = fake_share.fake_share(share_proto='NFS')
self.access = fake_share.fake_access()
@mock.patch('manila.share.drivers.quobyte.jsonrpc.JsonRpc', mock.Mock())
def test_do_setup_success(self):
self._driver.rpc.call = mock.Mock(return_value=None)
self._driver.do_setup(self._context)
self._driver.rpc.call.assert_called_with('getInformation', {})
@mock.patch('manila.share.drivers.quobyte.jsonrpc.JsonRpc.__init__',
mock.Mock(return_value=None))
@mock.patch.object(jsonrpc.JsonRpc, 'call',
side_effect=exception.QBRpcException)
def test_do_setup_failure(self, mock_call):
self.assertRaises(exception.QBException,
self._driver.do_setup, self._context)
def test_create_share_new_volume(self):
self._driver.rpc.call = mock.Mock(wraps=fake_rpc_handler)
result = self._driver.create_share(self._context, self.share)
self.assertEqual('10.10.1.1:/voluuid', result)
self._driver.rpc.call.assert_has_calls([
mock.call('createVolume', dict(
name=self.share['name'],
tenant_domain=self.share['project_id'],
root_user_id=self.fake_conf.quobyte_default_volume_user,
root_group_id=self.fake_conf.quobyte_default_volume_group,
configuration_name=self.fake_conf.quobyte_volume_configuration
)),
mock.call('exportVolume',
dict(protocol='NFS', volume_uuid='voluuid'))])
def test_create_share_existing_volume(self):
self._driver.rpc.call = mock.Mock(wraps=fake_rpc_handler)
self._driver.create_share(self._context, self.share)
self._driver.rpc.call.assert_called_with(
'exportVolume', dict(protocol='NFS', volume_uuid='voluuid'))
def test_create_share_wrong_protocol(self):
share = {'share_proto': 'WRONG_PROTOCOL'}
self.assertRaises(exception.QBException,
self._driver.create_share,
context=None,
share=share)
def test_delete_share_existing_volume(self):
def rpc_handler(name, *args):
if name == 'resolveVolumeName':
return {'volume_uuid': 'voluuid'}
elif name == 'exportVolume':
return {}
self._driver.configuration.quobyte_delete_shares = True
self._driver.rpc.call = mock.Mock(wraps=rpc_handler)
self._driver.delete_share(self._context, self.share)
self._driver.rpc.call.assert_has_calls([
mock.call('resolveVolumeName',
{'volume_name': 'fakename',
'tenant_domain': 'fake_project_uuid'}),
mock.call('deleteVolume', {'volume_uuid': 'voluuid'}),
mock.call('exportVolume', {'volume_uuid': 'voluuid',
'remove_export': True})])
def test_delete_share_existing_volume_disabled(self):
def rpc_handler(name, *args):
if name == 'resolveVolumeName':
return {'volume_uuid': 'voluuid'}
elif name == 'exportVolume':
return {}
CONF.set_default('quobyte_delete_shares', False)
self._driver.rpc.call = mock.Mock(wraps=rpc_handler)
self._driver.delete_share(self._context, self.share)
self._driver.rpc.call.assert_called_with(
'exportVolume', {'volume_uuid': 'voluuid',
'remove_export': True})
@mock.patch.object(quobyte.LOG, 'warning')
def test_delete_share_nonexisting_volume(self, mock_warning):
def rpc_handler(name, *args):
if name == 'resolveVolumeName':
return None
self._driver.rpc.call = mock.Mock(wraps=rpc_handler)
self._driver.delete_share(self._context, self.share)
mock_warning.assert_called_with(
'No volume found for share fake_project_uuid/fakename')
def test_allow_access(self):
def rpc_handler(name, *args):
if name == 'resolveVolumeName':
return {'volume_uuid': 'voluuid'}
elif name == 'exportVolume':
return {'nfs_server_ip': '10.10.1.1',
'nfs_export_path': '/voluuid'}
self._driver.rpc.call = mock.Mock(wraps=rpc_handler)
self._driver.allow_access(self._context, self.share, self.access)
self._driver.rpc.call.assert_called_with(
'exportVolume', {'volume_uuid': 'voluuid',
'read_only': False,
'add_allow_ip': '10.0.0.1'})
def test_allow_access_nonip(self):
self._driver.rpc.call = mock.Mock(wraps=fake_rpc_handler)
self.access = fake_share.fake_access(**{"access_type":
"non_existant_access_type"})
self.assertRaises(exception.InvalidShareAccess,
self._driver.allow_access,
self._context, self.share, self.access)
def test_deny_access(self):
def rpc_handler(name, *args):
if name == 'resolveVolumeName':
return {'volume_uuid': 'voluuid'}
elif name == 'exportVolume':
return {'nfs_server_ip': '10.10.1.1',
'nfs_export_path': '/voluuid'}
self._driver.rpc.call = mock.Mock(wraps=rpc_handler)
self._driver.deny_access(self._context, self.share, self.access)
self._driver.rpc.call.assert_called_with(
'exportVolume',
{'volume_uuid': 'voluuid', 'remove_allow_ip': '10.0.0.1'})
@mock.patch.object(quobyte.LOG, 'debug')
def test_deny_access_nonip(self, mock_debug):
self._driver.rpc.call = mock.Mock(wraps=fake_rpc_handler)
self.access = fake_share.fake_access(
access_type="non_existant_access_type")
self._driver.deny_access(self._context, self.share, self.access)
mock_debug.assert_called_with(
'Quobyte driver only supports ip access control. '
'Ignoring deny access call for %s , %s',
'fakename', 'fake_project_uuid')
def test_resolve_volume_name(self):
self._driver.rpc.call = mock.Mock(
return_value={'volume_uuid': 'fake_uuid'})
self._driver._resolve_volume_name('fake_vol_name', 'fake_domain_name')
self._driver.rpc.call.assert_called_with(
'resolveVolumeName',
{'volume_name': 'fake_vol_name',
'tenant_domain': 'fake_domain_name'})
def test_resolve_volume_name_NOENT(self):
self._driver.rpc.call = mock.Mock(
return_value=None)
self.assertIsNone(
self._driver._resolve_volume_name('fake_vol_name',
'fake_domain_name'))
def test_resolve_volume_name_other_error(self):
self._driver.rpc.call = mock.Mock(
side_effect=exception.QBRpcException(
result='fubar',
qbcode=666))
self.assertRaises(exception.QBRpcException,
self._driver._resolve_volume_name,
volume_name='fake_vol_name',
tenant_domain='fake_domain_name')
@mock.patch.object(driver.ShareDriver, '_update_share_stats')
def test_update_share_stats(self, mock_uss):
self._driver._update_share_stats()
mock_uss.assert_called_once_with(
dict(storage_protocol='NFS',
vendor_name='Quobyte',
share_backend_name=self._driver.backend_name,
driver_version=self._driver.DRIVER_VERSION))