Nexenta: Added use of sessions for NexentaEdge drivers

Modify jsonrpc to use sessions for consecutive calls,
registering new session for each.

Change-Id: Ic2aac3b6c85a2738106fef701fed1a1a3cb14528
This commit is contained in:
Aleksey Ruban 2016-08-16 15:43:59 -06:00
parent 345cbf8b00
commit 5197b2c8b2
2 changed files with 89 additions and 114 deletions

View File

@ -13,7 +13,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import base64
import socket
import mock
@ -25,6 +24,7 @@ from cinder import context
from cinder import exception
from cinder import test
from cinder.volume import configuration as conf
from cinder.volume.drivers.nexenta.nexentaedge import jsonrpc
from cinder.volume.drivers.nexenta.nexentaedge import nbd
@ -53,16 +53,6 @@ class RequestParams(object):
return '%s://%s:%s/%s' % (
self.scheme, self.host, self.port, path)
@property
def headers(self):
auth = base64.b64encode(
('%s:%s' % (self.user, self.password)).encode('utf-8'))
headers = {
'Content-Type': 'application/json',
'Authorization': 'Basic %s' % auth
}
return headers
def build_post_args(self, args):
return jsonutils.dumps(args)
@ -91,7 +81,15 @@ class TestNexentaEdgeNBDDriver(test.TestCase):
self.ctx = context.get_admin_context()
self.drv = nbd.NexentaEdgeNBDDriver(configuration=self.cfg)
session = mock.Mock()
session.get = mock.Mock()
session.post = mock.Mock()
session.put = mock.Mock()
session.delete = mock.Mock()
self.drv.do_setup(self.ctx)
self.drv.restapi.session = session
self.mock_api = session
self.request_params = RequestParams(
'http', self.cfg.nexenta_rest_address, self.cfg.nexenta_rest_port,
@ -107,23 +105,21 @@ class TestNexentaEdgeNBDDriver(test.TestCase):
self.assertRaises(
exception.NexentaException, self.drv.check_for_setup_error)
@patch('requests.get')
@patch('os.path.exists')
def test_check_do_setup__empty_response(self, exists, get):
get.return_value = FakeResponse({})
def test_check_do_setup__empty_response(self, exists):
self.mock_api.get.return_value = FakeResponse({})
exists.return_value = True
self.assertRaises(exception.VolumeBackendAPIException,
self.drv.check_for_setup_error)
@patch('requests.get')
@patch('os.path.exists')
def test_check_do_setup(self, exists, get):
get.return_value = FakeResponse({'response': 'OK'})
def test_check_do_setup(self, exists):
self.mock_api.get.return_value = FakeResponse({'response': 'OK'})
exists.return_value = True
self.drv.check_for_setup_error()
get.assert_any_call(
self.mock_api.get.assert_any_call(
self.request_params.url(self.drv.bucket_url + '/objects/'),
headers=self.request_params.headers)
timeout=jsonrpc.TIMEOUT)
def test_local_path__error(self):
self.drv._get_nbd_number = lambda volume_: -1
@ -131,8 +127,7 @@ class TestNexentaEdgeNBDDriver(test.TestCase):
self.assertRaises(exception.VolumeBackendAPIException,
self.drv.local_path, volume)
@patch('requests.get')
def test_local_path(self, get):
def test_local_path(self):
volume = {
'name': 'volume',
'host': 'myhost@backend#pool'
@ -173,11 +168,10 @@ class TestNexentaEdgeNBDDriver(test.TestCase):
else:
raise Exception('Unexpected request')
get.side_effect = my_side_effect
self.mock_api.get.side_effect = my_side_effect
self.drv.local_path(volume)
@patch('requests.get')
def test_local_path__host_not_found(self, get):
def test_local_path__host_not_found(self):
volume = {
'name': 'volume',
'host': 'unknown-host@backend#pool'
@ -218,14 +212,13 @@ class TestNexentaEdgeNBDDriver(test.TestCase):
else:
raise Exception('Unexpected request')
get.side_effect = my_side_effect
self.mock_api.get.side_effect = my_side_effect
self.assertRaises(exception.VolumeBackendAPIException,
self.drv.local_path, volume)
@patch('cinder.utils.execute')
@patch('requests.post')
def test_create_volume(self, post, execute):
post.returning_value = FakeResponse({})
def test_create_volume(self, execute):
self.mock_api.post.returning_value = FakeResponse({})
volume = {
'host': 'host@backend#pool info',
'size': 1,
@ -236,7 +229,7 @@ class TestNexentaEdgeNBDDriver(test.TestCase):
self.drv._get_remote_url = lambda host_: remote_url
self.drv._get_nbd_number = lambda volume_: number
self.drv.create_volume(volume)
post.assert_called_with(
self.mock_api.post.assert_called_with(
self.request_params.url('nbd' + remote_url),
data=self.request_params.build_post_args({
'objectPath': '/'.join((self.cfg.nexenta_lun_container,
@ -244,11 +237,10 @@ class TestNexentaEdgeNBDDriver(test.TestCase):
'volSizeMB': volume['size'] * units.Ki,
'blockSize': self.cfg.nexenta_blocksize,
'chunkSize': self.cfg.nexenta_chunksize}),
headers=self.request_params.headers)
timeout=jsonrpc.TIMEOUT)
@patch('requests.delete')
def test_delete_volume(self, delete):
delete.returning_value = FakeResponse({})
def test_delete_volume(self):
self.mock_api.delete.returning_value = FakeResponse({})
volume = {
'host': 'host@backend#pool info',
'size': 1,
@ -259,17 +251,16 @@ class TestNexentaEdgeNBDDriver(test.TestCase):
self.drv._get_remote_url = lambda host_: remote_url
self.drv._get_nbd_number = lambda volume_: number
self.drv.delete_volume(volume)
delete.assert_called_with(
self.mock_api.delete.assert_called_with(
self.request_params.url('nbd' + remote_url),
data=self.request_params.build_post_args({
'objectPath': '/'.join((self.cfg.nexenta_lun_container,
volume['name'])),
'number': number}),
headers=self.request_params.headers)
timeout=jsonrpc.TIMEOUT)
@patch('requests.delete')
def test_delete_volume__not_found(self, delete):
delete.returning_value = FakeResponse({})
def test_delete_volume__not_found(self):
self.mock_api.delete.returning_value = FakeResponse({})
volume = {
'host': 'host@backend#pool info',
'size': 1,
@ -279,11 +270,10 @@ class TestNexentaEdgeNBDDriver(test.TestCase):
self.drv._get_remote_url = lambda host_: remote_url
self.drv._get_nbd_number = lambda volume_: -1
self.drv.delete_volume(volume)
delete.assert_not_called()
self.mock_api.delete.assert_not_called()
@patch('requests.put')
def test_extend_volume(self, put):
put.returning_value = FakeResponse({})
def test_extend_volume(self):
self.mock_api.put.returning_value = FakeResponse({})
volume = {
'host': 'host@backend#pool info',
'size': 1,
@ -293,49 +283,46 @@ class TestNexentaEdgeNBDDriver(test.TestCase):
remote_url = ''
self.drv._get_remote_url = lambda host_: remote_url
self.drv.extend_volume(volume, new_size)
put.assert_called_with(
self.mock_api.put.assert_called_with(
self.request_params.url('nbd/resize' + remote_url),
data=self.request_params.build_post_args({
'objectPath': '/'.join((self.cfg.nexenta_lun_container,
volume['name'])),
'newSizeMB': new_size * units.Ki}),
headers=self.request_params.headers)
timeout=jsonrpc.TIMEOUT)
@patch('requests.post')
def test_create_snapshot(self, post):
post.returning_value = FakeResponse({})
def test_create_snapshot(self):
self.mock_api.post.returning_value = FakeResponse({})
snapshot = {
'name': 'dsfsdsdgfdf',
'volume_name': 'volume'
}
self.drv.create_snapshot(snapshot)
post.assert_called_with(
self.mock_api.post.assert_called_with(
self.request_params.url('nbd/snapshot'),
data=self.request_params.build_post_args({
'objectPath': '/'.join((self.cfg.nexenta_lun_container,
snapshot['volume_name'])),
'snapName': snapshot['name']}),
headers=self.request_params.headers)
timeout=jsonrpc.TIMEOUT)
@patch('requests.delete')
def test_delete_snapshot(self, delete):
delete.returning_value = FakeResponse({})
def test_delete_snapshot(self):
self.mock_api.delete.returning_value = FakeResponse({})
snapshot = {
'name': 'dsfsdsdgfdf',
'volume_name': 'volume'
}
self.drv.delete_snapshot(snapshot)
delete.assert_called_with(
self.mock_api.delete.assert_called_with(
self.request_params.url('nbd/snapshot'),
data=self.request_params.build_post_args({
'objectPath': '/'.join((self.cfg.nexenta_lun_container,
snapshot['volume_name'])),
'snapName': snapshot['name']}),
headers=self.request_params.headers)
timeout=jsonrpc.TIMEOUT)
@patch('requests.put')
def test_create_volume_from_snapshot(self, put):
put.returning_value = FakeResponse({})
def test_create_volume_from_snapshot(self):
self.mock_api.put.returning_value = FakeResponse({})
snapshot = {
'name': 'dsfsdsdgfdf',
'volume_size': 1,
@ -350,7 +337,7 @@ class TestNexentaEdgeNBDDriver(test.TestCase):
self.drv._get_remote_url = lambda host_: remote_url
self.drv.extend_volume = lambda v, s: None
self.drv.create_volume_from_snapshot(volume, snapshot)
put.assert_called_with(
self.mock_api.put.assert_called_with(
self.request_params.url('nbd/snapshot/clone' + remote_url),
data=self.request_params.build_post_args({
'objectPath': '/'.join((self.cfg.nexenta_lun_container,
@ -359,11 +346,10 @@ class TestNexentaEdgeNBDDriver(test.TestCase):
'clonePath': '/'.join((self.cfg.nexenta_lun_container,
volume['name']))
}),
headers=self.request_params.headers)
timeout=jsonrpc.TIMEOUT)
@patch('requests.post')
def test_create_cloned_volume(self, post):
post.returning_value = FakeResponse({})
def test_create_cloned_volume(self):
self.mock_api.post.returning_value = FakeResponse({})
volume = {
'host': 'host@backend#pool info',
'size': 1,
@ -377,7 +363,7 @@ class TestNexentaEdgeNBDDriver(test.TestCase):
remote_url = ''
self.drv._get_remote_url = lambda host_: remote_url
self.drv.create_cloned_volume(volume, src_vref)
post.assert_called_with(
self.mock_api.post.assert_called_with(
self.request_params.url('nbd' + remote_url),
data=self.request_params.build_post_args({
'objectPath': '/'.join((container, volume['name'])),
@ -385,11 +371,10 @@ class TestNexentaEdgeNBDDriver(test.TestCase):
'blockSize': self.cfg.nexenta_blocksize,
'chunkSize': self.cfg.nexenta_chunksize
}),
headers=self.request_params.headers)
timeout=jsonrpc.TIMEOUT)
@patch('requests.post')
def test_create_cloned_volume_gt_src(self, post):
post.returning_value = FakeResponse({})
def test_create_cloned_volume_gt_src(self):
self.mock_api.post.returning_value = FakeResponse({})
volume = {
'host': 'host@backend#pool info',
'size': 2,
@ -403,7 +388,7 @@ class TestNexentaEdgeNBDDriver(test.TestCase):
remote_url = ''
self.drv._get_remote_url = lambda host_: remote_url
self.drv.create_cloned_volume(volume, src_vref)
post.assert_called_with(
self.mock_api.post.assert_called_with(
self.request_params.url('nbd' + remote_url),
data=self.request_params.build_post_args({
'objectPath': '/'.join((container, volume['name'])),
@ -411,12 +396,11 @@ class TestNexentaEdgeNBDDriver(test.TestCase):
'blockSize': self.cfg.nexenta_blocksize,
'chunkSize': self.cfg.nexenta_chunksize
}),
headers=self.request_params.headers)
timeout=jsonrpc.TIMEOUT)
@patch('requests.get')
def test_get_volume_stats(self, get):
def test_get_volume_stats(self):
self.cfg.volume_backend_name = None
get.return_value = FakeResponse({
self.mock_api.get.return_value = FakeResponse({
'response': {
'stats': {
'summary': {
@ -441,7 +425,9 @@ class TestNexentaEdgeNBDDriver(test.TestCase):
'QoS_support': False,
'volume_backend_name': self.drv.__class__.__name__,
'location_info': location_info,
'restapi_url': self.request_params.url()
'restapi_url': '%s://%s:%s/' % (
'http', self.cfg.nexenta_rest_address,
self.cfg.nexenta_rest_port)
}
self.assertEqual(expected, self.drv.get_volume_stats())
@ -473,8 +459,7 @@ class TestNexentaEdgeNBDDriver(test.TestCase):
upload_volume.assert_called_with(
self.ctx, 'image_service', 'image_meta', 'local_path')
@patch('requests.get')
def test_validate_connector(self, get):
def test_validate_connector(self):
connector = {'host': 'host2'}
r = {
'stats': {
@ -484,13 +469,13 @@ class TestNexentaEdgeNBDDriver(test.TestCase):
}
}
}
get.return_value = FakeResponse({'response': r})
self.mock_api.get.return_value = FakeResponse({'response': r})
self.drv.validate_connector(connector)
get.assert_called_with(self.request_params.url('system/stats'),
headers=self.request_params.headers)
self.mock_api.get.assert_called_with(
self.request_params.url('system/stats'),
timeout=jsonrpc.TIMEOUT)
@patch('requests.get')
def test_validate_connector__host_not_found(self, get):
def test_validate_connector__host_not_found(self):
connector = {'host': 'host3'}
r = {
'stats': {
@ -500,7 +485,7 @@ class TestNexentaEdgeNBDDriver(test.TestCase):
}
}
}
get.return_value = FakeResponse({'response': r})
self.mock_api.get.return_value = FakeResponse({'response': r})
self.assertRaises(exception.VolumeBackendAPIException,
self.drv.validate_connector, connector)

View File

@ -13,10 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import base64
import json
import requests
import socket
from oslo_log import log as logging
@ -25,7 +23,7 @@ from cinder.i18n import _
from cinder.utils import retry
LOG = logging.getLogger(__name__)
socket.setdefaulttimeout(100)
TIMEOUT = 60
class NexentaEdgeJSONProxy(object):
@ -36,7 +34,13 @@ class NexentaEdgeJSONProxy(object):
)
def __init__(self, protocol, host, port, path, user, password, auto=False,
method=None):
method=None, session=None):
if session:
self.session = session
else:
self.session = requests.Session()
self.session.auth = (user, password)
self.session.headers.update({'Content-Type': 'application/json'})
self.protocol = protocol.lower()
self.host = host
self.port = port
@ -48,21 +52,18 @@ class NexentaEdgeJSONProxy(object):
@property
def url(self):
return '%s://%s:%s/%s' % (self.protocol,
self.host, self.port, self.path)
return '%s://%s:%s/%s' % (
self.protocol, self.host, self.port, self.path)
def __getattr__(self, name):
if not self.method:
method = name
else:
raise exception.VolumeDriverException(
_("Wrong resource call syntax"))
return NexentaEdgeJSONProxy(
self.protocol, self.host, self.port, self.path,
self.user, self.password, self.auto, method)
if name in ('get', 'post', 'put', 'delete'):
return NexentaEdgeJSONProxy(
self.protocol, self.host, self.port, self.path, self.user,
self.password, self.auto, name, self.session)
return super(NexentaEdgeJSONProxy, self).__getattr__(name)
def __hash__(self):
return self.url.__hash___()
return self.url.__hash__()
def __repr__(self):
return 'HTTP JSON proxy: %s' % self.url
@ -70,34 +71,23 @@ class NexentaEdgeJSONProxy(object):
@retry(retry_exc_tuple, interval=1, retries=6)
def __call__(self, *args):
self.path = args[0]
kwargs = {'timeout': TIMEOUT}
data = None
if len(args) > 1:
data = json.dumps(args[1])
kwargs['data'] = data
auth = base64.b64encode(
('%s:%s' % (self.user, self.password)).encode('utf-8'))
headers = {
'Content-Type': 'application/json',
'Authorization': 'Basic %s' % auth
}
LOG.debug('Sending JSON data: %s, method: %s, data: %s',
self.url, self.method, data)
LOG.debug('Sending JSON data: %s, method: %s, data: %s', self.url,
self.method, data)
if self.method == 'get':
req = requests.get(self.url, headers=headers)
elif self.method == 'post':
req = requests.post(self.url, data=data, headers=headers)
elif self.method == 'put':
req = requests.put(self.url, data=data, headers=headers)
elif self.method == 'delete':
req = requests.delete(self.url, data=data, headers=headers)
func = getattr(self.session, self.method)
if func:
req = func(self.url, **kwargs)
else:
raise exception.VolumeDriverException(
message=_('Unsupported method: %s') % self.method)
rsp = req.json()
req.close()
LOG.debug('Got response: %s', rsp)
if rsp.get('response') is None: