Merge "Coho Data: New socket connections per request"
This commit is contained in:
commit
95f3f966ff
|
@ -247,6 +247,7 @@ class CohoDriverTest(test.TestCase):
|
|||
[mock.call(ADDR, self.configuration.coho_rpc_port),
|
||||
mock.call().create_volume_from_snapshot(
|
||||
SNAPSHOT['name'], os.path.join(PATH, VOLUME['name'])),
|
||||
mock.call(ADDR, self.configuration.coho_rpc_port),
|
||||
mock.call().set_qos_policy(os.path.join(PATH, VOLUME['name']),
|
||||
QOS)])
|
||||
|
||||
|
@ -412,7 +413,6 @@ class CohoDriverTest(test.TestCase):
|
|||
mock.call().unpack_uint()])
|
||||
mock_socket.assert_has_calls(
|
||||
[mock.call(socket.AF_INET, socket.SOCK_STREAM),
|
||||
mock.call().bind(('', 0)),
|
||||
mock.call().connect((ADDR, RPC_PORT))])
|
||||
mock_init_call.assert_has_calls(
|
||||
[mock.call(coho.COHO1_CREATE_VOLUME_FROM_SNAPSHOT,
|
||||
|
@ -464,7 +464,6 @@ class CohoDriverTest(test.TestCase):
|
|||
self.assertTrue(mock_recvrecord.called)
|
||||
mock_socket.assert_has_calls(
|
||||
[mock.call(socket.AF_INET, socket.SOCK_STREAM),
|
||||
mock.call().bind(('', 0)),
|
||||
mock.call().connect((ADDR, RPC_PORT))])
|
||||
|
||||
def test_rpc_client_error_in_receive_fragment(self):
|
||||
|
@ -481,7 +480,6 @@ class CohoDriverTest(test.TestCase):
|
|||
self.assertTrue(mock_sendrcd.called)
|
||||
mock_socket.assert_has_calls(
|
||||
[mock.call(socket.AF_INET, socket.SOCK_STREAM),
|
||||
mock.call().bind(('', 0)),
|
||||
mock.call().connect((ADDR, RPC_PORT)),
|
||||
mock.call().recv(4)])
|
||||
|
||||
|
|
|
@ -32,6 +32,8 @@ from cinder.volume.drivers import nfs
|
|||
from cinder.volume import qos_specs
|
||||
from cinder.volume import volume_types
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
#
|
||||
# RPC Definition
|
||||
#
|
||||
|
@ -71,11 +73,6 @@ COHO_NO_QOS = {'maxIOPS': 0, 'maxMBS': 0}
|
|||
#
|
||||
|
||||
|
||||
def make_auth_null():
|
||||
|
||||
return six.b('')
|
||||
|
||||
|
||||
class Client(object):
|
||||
|
||||
def __init__(self, address, prog, vers, port):
|
||||
|
@ -94,7 +91,6 @@ class Client(object):
|
|||
def init_socket(self):
|
||||
try:
|
||||
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.sock.bind(('', 0))
|
||||
self.sock.connect((self.address, self.port))
|
||||
except socket.error:
|
||||
msg = _('Failed to establish connection with Coho cluster')
|
||||
|
@ -108,12 +104,12 @@ class Client(object):
|
|||
|
||||
def make_cred(self):
|
||||
if self.cred is None:
|
||||
self.cred = (AUTH_NULL, make_auth_null())
|
||||
self.cred = (AUTH_NULL, six.b(''))
|
||||
return self.cred
|
||||
|
||||
def make_verf(self):
|
||||
if self.verf is None:
|
||||
self.verf = (AUTH_NULL, make_auth_null())
|
||||
self.verf = (AUTH_NULL, six.b(''))
|
||||
return self.verf
|
||||
|
||||
def pack_auth(self, auth):
|
||||
|
@ -246,6 +242,7 @@ class Client(object):
|
|||
except socket.error as e:
|
||||
if e.errno == errno.EPIPE:
|
||||
# Reopen connection to cluster and retry
|
||||
LOG.debug('Re-establishing socket, retry number %d', retry)
|
||||
self.init_socket()
|
||||
else:
|
||||
msg = (_('Unable to send requests: %s') %
|
||||
|
@ -266,21 +263,28 @@ class CohoRPCClient(Client):
|
|||
Client.__init__(self, address, COHO_PROGRAM, 1, port)
|
||||
|
||||
def create_snapshot(self, src, dst, flags):
|
||||
LOG.debug('COHO1_CREATE_SNAPSHOT src %s to dst %s', src, dst)
|
||||
self._call(COHO1_CREATE_SNAPSHOT,
|
||||
[(six.b(src), self.packer.pack_string),
|
||||
(six.b(dst), self.packer.pack_string),
|
||||
(flags, self.packer.pack_uint)])
|
||||
|
||||
def delete_snapshot(self, name):
|
||||
LOG.debug('COHO1_DELETE_SNAPSHOT name %s', name)
|
||||
self._call(COHO1_DELETE_SNAPSHOT,
|
||||
[(six.b(name), self.packer.pack_string)])
|
||||
|
||||
def create_volume_from_snapshot(self, src, dst):
|
||||
LOG.debug('COHO1_CREATE_VOLUME_FROM_SNAPSHOT src %s to dst %s',
|
||||
src, dst)
|
||||
self._call(COHO1_CREATE_VOLUME_FROM_SNAPSHOT,
|
||||
[(six.b(src), self.packer.pack_string),
|
||||
(six.b(dst), self.packer.pack_string)])
|
||||
|
||||
def set_qos_policy(self, src, qos):
|
||||
LOG.debug('COHO1_SET_QOS_POLICY volume %s, uuid %s, %d:%d',
|
||||
src, qos.get('uuid', ''), qos.get('maxIOPS', 0),
|
||||
qos.get('maxMBS', ''))
|
||||
self._call(COHO1_SET_QOS_POLICY,
|
||||
[(six.b(src), self.packer.pack_string),
|
||||
(six.b(qos.get('uuid', '')), self.packer.pack_string),
|
||||
|
@ -294,9 +298,7 @@ class CohoRPCClient(Client):
|
|||
# Coho Data Volume Driver
|
||||
#
|
||||
|
||||
VERSION = '1.1.0'
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
VERSION = '1.1.1'
|
||||
|
||||
coho_opts = [
|
||||
cfg.IntOpt('coho_rpc_port',
|
||||
|
@ -316,6 +318,7 @@ class CohoDriver(nfs.NfsDriver):
|
|||
Version history:
|
||||
1.0.0 - Initial driver
|
||||
1.1.0 - Added QoS support
|
||||
1.1.1 - Stability fixes in the RPC client
|
||||
"""
|
||||
|
||||
# We have to overload this attribute of RemoteFSDriver because
|
||||
|
@ -333,19 +336,11 @@ class CohoDriver(nfs.NfsDriver):
|
|||
def __init__(self, *args, **kwargs):
|
||||
super(CohoDriver, self).__init__(*args, **kwargs)
|
||||
self.configuration.append_config_values(coho_opts)
|
||||
self._rpcclients = dict()
|
||||
self._backend_name = (self.configuration.volume_backend_name or
|
||||
self.__class__.__name__)
|
||||
|
||||
def _init_rpcclient(self, addr, port):
|
||||
client = CohoRPCClient(addr, port)
|
||||
self._rpcclients[(addr, port)] = client
|
||||
return client
|
||||
|
||||
def _get_rpcclient(self, addr, port):
|
||||
if (addr, port) in self._rpcclients:
|
||||
return self._rpcclients[(addr, port)]
|
||||
return self._init_rpcclient(addr, port)
|
||||
return CohoRPCClient(addr, port)
|
||||
|
||||
def do_setup(self, context):
|
||||
"""Any initialization the volume driver does while starting."""
|
||||
|
|
Loading…
Reference in New Issue