Merge "Replaces httplib with requests lib in Quobyte RPC layer"

This commit is contained in:
Jenkins 2016-08-01 18:08:47 +00:00 committed by Gerrit Code Review
commit 00f8908984
3 changed files with 148 additions and 412 deletions

View File

@ -18,184 +18,88 @@
Control Quobyte over its JSON RPC API.
"""
import base64
import socket
import ssl
import requests
from requests import auth
from requests import codes
from oslo_log import log
from oslo_serialization import jsonutils
from oslo_utils import excutils
import six
from six.moves import http_client
import six.moves.urllib.parse as urlparse
from manila import exception
from manila.i18n import _, _LW
from manila.i18n import _LW
from manila import utils
LOG = log.getLogger(__name__)
ERROR_ENOENT = 2
CONNECTION_RETRIES = 3
class BasicAuthCredentials(object):
def __init__(self, username, password):
self._username = username
self._password = password
@property
def username(self):
return self._username
def get_authorization_header(self):
header = '%s:%s' % (self._username, self._password)
auth = base64.standard_b64encode(six.b(header))
return 'BASIC %s' % auth.decode()
class HTTPSConnectionWithCaVerification(http_client.HTTPConnection):
"""Verify server cert against a given CA certificate."""
default_port = http_client.HTTPS_PORT
def __init__(self, host, port=None, key_file=None, cert_file=None,
ca_file=None,
timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
http_client.HTTPConnection.__init__(self, host, port, timeout=timeout)
self.key_file = key_file
self.cert_file = cert_file
self.ca_file = ca_file
def connect(self):
"""Connect to a host on a given (SSL) port."""
sock = socket.create_connection((self.host, self.port), self.timeout)
if self._tunnel_host:
self.sock = sock
self._tunnel()
self.sock = ssl.wrap_socket(sock, keyfile=self.key_file,
certfile=self.cert_file,
ca_certs=self.ca_file,
cert_reqs=ssl.CERT_REQUIRED)
http_client.__all__.append("HTTPSConnectionWithCaVerification")
class JsonRpc(object):
def __init__(self, url, user_credentials, ca_file=None):
def __init__(self, url, user_credentials, ca_file=None, key_file=None,
cert_file=None):
parsedurl = urlparse.urlparse(url)
self._url = parsedurl.geturl()
self._netloc = parsedurl.netloc
self._ca_file = ca_file
if parsedurl.scheme == 'https':
if self._ca_file:
self._connection = HTTPSConnectionWithCaVerification(
self._netloc,
ca_file=self._ca_file.name)
else:
self._connection = http_client.HTTPSConnection(self._netloc)
self._url_scheme = parsedurl.scheme
if self._url_scheme == 'https':
if not self._ca_file:
self._ca_file = False
LOG.warning(_LW(
"Will not verify the server certificate of the API service"
" because the CA certificate is not available."))
else:
self._connection = http_client.HTTPConnection(self._netloc)
self._id = 0
self._fail_fast = True
self._credentials = BasicAuthCredentials(
self._credentials = auth.HTTPBasicAuth(
user_credentials[0], user_credentials[1])
self._require_cert_verify = self._ca_file is not None
self._disabled_cert_verification = False
self._key_file = key_file
self._cert_file = cert_file
@utils.synchronized('quobyte-request')
def call(self, method_name, user_parameters):
# prepare request
self._id += 1
parameters = {'retry': 'INFINITELY'} # Backend specific setting
if user_parameters:
parameters.update(user_parameters)
call_body = {'jsonrpc': '2.0',
'method': method_name,
'params': parameters,
'id': six.text_type(self._id)}
self.call_counter = 0
self._connection.connect() # prevents http_client timing issue
post_data = {
'jsonrpc': '2.0',
'method': method_name,
'params': parameters,
'id': six.text_type(self._id),
}
LOG.debug("Request payload to be send is: %s",
jsonutils.dumps(post_data))
while self.call_counter < CONNECTION_RETRIES:
self.call_counter += 1
try:
self._id += 1
call_body['id'] = six.text_type(self._id)
LOG.debug("Posting to Quobyte backend: %s",
jsonutils.dumps(call_body))
self._connection.request(
"POST", self._url + '/', jsonutils.dumps(call_body),
dict(Authorization=(self._credentials.
get_authorization_header())))
# send request
if self._url_scheme == 'https':
if self._cert_file:
result = requests.post(url=self._url,
json=post_data,
auth=self._credentials,
verify=self._ca_file,
cert=(self._cert_file, self._key_file))
else:
result = requests.post(url=self._url,
json=post_data,
auth=self._credentials,
verify=self._ca_file)
else:
result = requests.post(url=self._url,
json=post_data,
auth=self._credentials)
response = self._connection.getresponse()
self._throw_on_http_error(response)
result = jsonutils.loads(response.read())
LOG.debug("Retrieved data from Quobyte backend: %s", result)
return self._checked_for_application_error(result)
except ssl.SSLError as e:
# Generic catch because OpenSSL does not return
# meaningful errors.
if (not self._disabled_cert_verification
and not self._require_cert_verify):
LOG.warning(_LW(
"Could not verify server certificate of "
"API service against CA."))
self._connection.close()
# Core HTTPSConnection does no certificate verification.
self._connection = http_client.HTTPSConnection(
self._netloc)
self._disabled_cert_verification = True
else:
raise exception.QBException(_(
"Client SSL subsystem returned error: %s") % e)
except http_client.BadStatusLine as e:
raise exception.QBException(_(
"If SSL is enabled for the API service, the URL must"
" start with 'https://' for the URL. Failed to parse"
" status code from server response. Error was %s")
% e)
except socket.error as se:
error_code = se.errno
error_msg = se.strerror
composite_msg = _("Socket error No. %(code)s (%(msg)s) "
"connecting to API with") % {
'code': (six.text_type(error_code)),
'msg': error_msg}
if self._fail_fast:
raise exception.QBException(composite_msg)
else:
LOG.warning(composite_msg)
except http_client.HTTPException as e:
with excutils.save_and_reraise_exception() as ctxt:
if self._fail_fast:
ctxt.reraise = True
else:
LOG.warning(_LW("Encountered error, retrying: %s"),
six.text_type(e))
ctxt.reraise = False
# eval request response
if result.status_code == codes['OK']:
LOG.debug("Retrieved data from Quobyte backend: %s", result.text)
response = result.json()
return self._checked_for_application_error(response)
raise exception.QBException("Unable to connect to backend after "
"%s retries" %
six.text_type(CONNECTION_RETRIES))
def _throw_on_http_error(self, response):
if response.status == 401:
raise exception.QBException(
_("JSON RPC failed: unauthorized user %(status)s %(reason)s"
" Please check the Quobyte API service log for "
"more details.")
% {'status': six.text_type(response.status),
'reason': response.reason})
elif response.status >= 300:
raise exception.QBException(
_("JSON RPC failed: %(status)s %(reason)s"
" Please check the Quobyte API service log for "
"more details.")
% {'status': six.text_type(response.status),
'reason': response.reason})
# If things did not work out provide error info
LOG.debug("Backend request resulted in error: %s" % result.text)
result.raise_for_status()
def _checked_for_application_error(self, result):
if 'error' in result and result['error']:

View File

@ -78,9 +78,10 @@ class QuobyteShareDriver(driver.ExecuteMixin, driver.ShareDriver,):
1.2 - Adds update_access() implementation and related methods
1.2.1 - Improved capacity calculation
1.2.2 - Minor optimizations
1.2.3 - Updated RPC layer for improved stability
"""
DRIVER_VERSION = '1.2.2'
DRIVER_VERSION = '1.2.3'
def __init__(self, *args, **kwargs):
super(QuobyteShareDriver, self).__init__(False, *args, **kwargs)

View File

@ -13,15 +13,14 @@
# License for the specific language governing permissions and limitations
# under the License.
import socket
import ssl
import requests
from requests import auth
from requests import exceptions
import tempfile
import time
import mock
from oslo_serialization import jsonutils
import six
from six.moves import http_client
from manila import exception
from manila.share.drivers.quobyte import jsonrpc
@ -30,78 +29,13 @@ from manila import test
class FakeResponse(object):
def __init__(self, status, body):
self.status = status
self.status_code = status
self.reason = "HTTP reason"
self._body = body
self.body = body
self.text = six.text_type(body)
def read(self):
return self._body
class QuobyteBasicAuthCredentialsTestCase(test.TestCase):
def test_get_authorization_header(self):
creds = jsonrpc.BasicAuthCredentials('fakeuser', 'fakepwd')
self.assertEqual('BASIC ZmFrZXVzZXI6ZmFrZXB3ZA==',
creds.get_authorization_header())
class QuobyteHttpsConnectionWithCaVerificationTestCase(test.TestCase):
@mock.patch.object(socket, "create_connection",
return_value="fake_socket")
@mock.patch.object(ssl, "wrap_socket")
def test_https_with_ca_connect(self, mock_ssl, mock_cc):
key_file = tempfile.TemporaryFile()
cert_file = tempfile.gettempdir()
ca_file = tempfile.gettempdir()
mycon = (jsonrpc.
HTTPSConnectionWithCaVerification(host="localhost",
key_file=key_file,
cert_file=cert_file,
ca_file=ca_file,
port=1234,
timeout=999))
mycon.connect()
mock_cc.assert_called_once_with(("localhost", 1234), 999)
mock_ssl.assert_called_once_with("fake_socket",
keyfile=key_file,
certfile=cert_file,
ca_certs=ca_file,
cert_reqs=mock.ANY)
@mock.patch.object(http_client.HTTPConnection, "_tunnel")
@mock.patch.object(socket, "create_connection",
return_value="fake_socket")
@mock.patch.object(ssl, "wrap_socket")
def test_https_with_ca_connect_tunnel(self,
mock_ssl,
mock_cc,
mock_tunnel):
key_file = tempfile.TemporaryFile()
cert_file = tempfile.gettempdir()
ca_file = tempfile.gettempdir()
mycon = (jsonrpc.
HTTPSConnectionWithCaVerification(host="localhost",
key_file=key_file,
cert_file=cert_file,
ca_file=ca_file,
port=1234,
timeout=999))
mycon._tunnel_host = "fake_tunnel_host"
mycon.connect()
mock_tunnel.assert_called_once_with()
mock_cc.assert_called_once_with(("localhost", 1234), 999)
mock_ssl.assert_called_once_with("fake_socket",
keyfile=key_file,
certfile=cert_file,
ca_certs=ca_file,
cert_reqs=mock.ANY)
def json(self):
return self.body
class QuobyteJsonRpcTestCase(test.TestCase):
@ -110,38 +44,35 @@ class QuobyteJsonRpcTestCase(test.TestCase):
super(QuobyteJsonRpcTestCase, self).setUp()
self.rpc = jsonrpc.JsonRpc(url="http://test",
user_credentials=("me", "team"))
self.mock_object(self.rpc, '_connection')
self.mock_object(time, 'sleep')
def test_request_generation_and_basic_auth(self):
self.mock_object(
self.rpc._connection,
'getresponse',
mock.Mock(return_value=FakeResponse(200, '{"result":"yes"}')))
@mock.patch.object(requests, 'post',
return_value=FakeResponse(200, {"result": "yes"}))
def test_request_generation_and_basic_auth(self, req_get_mock):
self.rpc.call('method', {'param': 'value'})
self.rpc._connection.request.assert_called_once_with(
'POST', 'http://test/',
jsonutils.dumps({'jsonrpc': '2.0',
'method': 'method',
'params': {'retry': 'INFINITELY',
'param': 'value'},
'id': '1'}),
dict(Authorization=jsonrpc.BasicAuthCredentials("me", "team")
.get_authorization_header()))
req_get_mock.assert_called_once_with(
url='http://test',
auth=auth.HTTPBasicAuth("me", "team"),
json=mock.ANY)
@mock.patch.object(jsonrpc.HTTPSConnectionWithCaVerification,
'__init__',
return_value=None)
def test_jsonrpc_init_with_ca(self, mock_init):
def test_jsonrpc_init_with_ca(self):
foofile = tempfile.TemporaryFile()
self.rpc = jsonrpc.JsonRpc("https://foo.bar/",
('fakeuser', 'fakepwd'),
foofile)
fake_url = "https://foo.bar/"
fake_credentials = ('fakeuser', 'fakepwd')
fake_cert_file = tempfile.TemporaryFile()
fake_key_file = tempfile.TemporaryFile()
self.rpc = jsonrpc.JsonRpc(url=fake_url,
user_credentials=fake_credentials,
ca_file=foofile,
key_file=fake_key_file,
cert_file=fake_cert_file)
mock_init.assert_called_once_with("foo.bar",
ca_file=foofile.name)
self.assertEqual("https", self.rpc._url_scheme)
self.assertEqual(fake_url, self.rpc._url)
self.assertEqual(foofile, self.rpc._ca_file)
self.assertEqual(fake_cert_file, self.rpc._cert_file)
self.assertEqual(fake_key_file, self.rpc._key_file)
@mock.patch.object(jsonrpc.LOG, "warning")
def test_jsonrpc_init_without_ca(self, mock_warning):
@ -153,190 +84,90 @@ class QuobyteJsonRpcTestCase(test.TestCase):
"Will not verify the server certificate of the API service"
" because the CA certificate is not available.")
@mock.patch.object(http_client.HTTPConnection,
'__init__',
return_value=None)
def test_jsonrpc_init_no_ssl(self, mock_init):
def test_jsonrpc_init_no_ssl(self):
self.rpc = jsonrpc.JsonRpc("http://foo.bar/",
('fakeuser', 'fakepwd'))
mock_init.assert_called_once_with("foo.bar")
self.assertEqual("http", self.rpc._url_scheme)
def test_successful_call(self):
self.mock_object(
self.rpc._connection, 'getresponse',
mock.Mock(return_value=FakeResponse(
200, '{"result":"Sweet gorilla of Manila"}')))
@mock.patch.object(requests, "post",
return_value=FakeResponse(
200, {"result": "Sweet gorilla of Manila"}))
def test_successful_call(self, mock_req_get):
result = self.rpc.call('method', {'param': 'value'})
mock_req_get.assert_called_once_with(
url=self.rpc._url,
json=mock.ANY, # not checking here as of undefined order in dict
auth=self.rpc._credentials)
self.assertEqual("Sweet gorilla of Manila", result)
@mock.patch.object(requests, "post",
return_value=FakeResponse(
200, {"result": "Sweet gorilla of Manila"}))
def test_https_call_with_cert(self, mock_req_get):
fake_cert_file = tempfile.TemporaryFile()
fake_key_file = tempfile.TemporaryFile()
self.rpc = jsonrpc.JsonRpc(url="https://test",
user_credentials=("me", "team"),
cert_file=fake_cert_file,
key_file=fake_key_file)
result = self.rpc.call('method', {'param': 'value'})
self.rpc._connection.connect.assert_called_once_with()
mock_req_get.assert_called_once_with(
url=self.rpc._url,
json=mock.ANY, # not checking here as of undefined order in dict
auth=self.rpc._credentials,
verify=False,
cert=(fake_cert_file, fake_key_file))
self.assertEqual("Sweet gorilla of Manila", result)
@mock.patch('six.moves.http_client.HTTPSConnection')
def test_jsonrpc_call_ssl_disable(self, mock_connection):
mock_connection.return_value = self.rpc._connection
self.mock_object(
self.rpc._connection,
'request',
mock.Mock(side_effect=ssl.SSLError))
self.mock_object(
self.rpc._connection,
'getresponse',
mock.Mock(return_value=FakeResponse(
403, '{"error":{"code":28,"message":"text"}}')))
self.mock_object(jsonrpc.LOG, 'warning')
@mock.patch.object(requests, "post",
return_value=FakeResponse(
200, {"result": "Sweet gorilla of Manila"}))
def test_https_call_verify(self, mock_req_get):
fake_ca_file = tempfile.TemporaryFile()
self.rpc = jsonrpc.JsonRpc(url="https://test",
user_credentials=("me", "team"),
ca_file=fake_ca_file)
self.assertRaises(exception.QBException,
result = self.rpc.call('method', {'param': 'value'})
mock_req_get.assert_called_once_with(
url=self.rpc._url,
json=mock.ANY, # not checking here as of undefined order in dict
auth=self.rpc._credentials,
verify=fake_ca_file)
self.assertEqual("Sweet gorilla of Manila", result)
@mock.patch.object(requests, "post", side_effect=exceptions.HTTPError)
def test_jsonrpc_call_http_exception(self, req_get_mock):
self.assertRaises(exceptions.HTTPError,
self.rpc.call,
'method', {'param': 'value'})
req_get_mock.assert_called_once_with(
url=self.rpc._url,
json=mock.ANY, # not checking here as of undefined order in dict
auth=self.rpc._credentials)
self.assertTrue(self.rpc._disabled_cert_verification)
jsonrpc.LOG.warning.assert_called_once_with(
"Could not verify server certificate of "
"API service against CA.")
def test_jsonrpc_call_ssl_error(self):
"""This test succeeds if a specific exception is thrown.
Throwing a different exception or none at all
is a failure in this specific test case.
"""
self.mock_object(
self.rpc._connection,
'request',
mock.Mock(side_effect=ssl.SSLError))
self.rpc._disabled_cert_verification = True
try:
self.rpc.call('method', {'param': 'value'})
except exception.QBException as me:
self.rpc._connection.connect.assert_called_once_with()
(self.assertTrue(six.text_type(me).startswith
('Client SSL subsystem returned error:')))
except Exception as e:
self.fail('Unexpected exception thrown: %s' % e)
else:
self.fail('Expected exception not thrown')
def test_jsonrpc_call_bad_status_line(self):
self.mock_object(
self.rpc._connection,
'getresponse',
mock.Mock(side_effect=http_client.BadStatusLine("fake_line")))
self.assertRaises(exception.QBException,
self.rpc.call,
'method', {'param': 'value'})
def test_jsonrpc_call_http_exception(self):
self.mock_object(
self.rpc._connection,
'getresponse',
mock.Mock(side_effect=http_client.HTTPException))
self.mock_object(jsonrpc.LOG, 'warning')
self.assertRaises(http_client.HTTPException,
self.rpc.call,
'method', {'param': 'value'})
self.rpc._connection.connect.assert_called_once_with()
jsonrpc.LOG.warning.assert_has_calls([])
def test_jsonrpc_call_socket_error(self):
self.mock_object(
self.rpc._connection,
'getresponse',
mock.Mock(side_effect=socket.error(23, "Test")))
self.mock_object(jsonrpc.LOG, 'warning')
self.assertRaises(exception.QBException,
self.rpc.call,
'method', {'param': 'value'})
self.rpc._connection.connect.assert_called_once_with()
jsonrpc.LOG.warning.assert_has_calls([])
def test_jsonrpc_call_http_exception_retry(self):
self.mock_object(
self.rpc._connection,
'getresponse',
mock.Mock(side_effect=http_client.HTTPException))
self.mock_object(jsonrpc.LOG, 'warning')
self.rpc._fail_fast = False
self.assertRaises(exception.QBException,
self.rpc.call,
'method', {'param': 'value'})
self.rpc._connection.connect.assert_called_once_with()
jsonrpc.LOG.warning.assert_called_with(
"Encountered error, retrying: %s", "")
def test_jsonrpc_call_no_connect(self):
orig_retries = jsonrpc.CONNECTION_RETRIES
jsonrpc.CONNECTION_RETRIES = 0
try:
self.rpc.call('method', {'param': 'value'})
except exception.QBException as me:
self.rpc._connection.connect.assert_called_once_with()
self.assertEqual("Unable to connect to backend after 0 retries",
six.text_type(me))
else:
self.fail('Expected exception not thrown')
finally:
jsonrpc.CONNECTION_RETRIES = orig_retries
def test_http_error_401(self):
self.mock_object(
self.rpc._connection,
'getresponse',
mock.Mock(return_value=FakeResponse(401, '')))
self.assertRaises(exception.QBException,
self.rpc.call, 'method', {'param': 'value'})
self.rpc._connection.connect.assert_called_once_with()
def test_http_error_other(self):
self.mock_object(
self.rpc._connection,
'getresponse',
mock.Mock(return_value=FakeResponse(300, '')))
self.assertRaises(exception.QBException,
self.rpc.call, 'method', {'param': 'value'})
self.rpc._connection.connect.assert_called_once_with()
self.assertTrue(self.rpc._connection.getresponse.called)
def test_application_error(self):
self.mock_object(
self.rpc._connection,
'getresponse',
mock.Mock(return_value=FakeResponse(
200, '{"error":{"code":28,"message":"text"}}')))
@mock.patch.object(requests, "post",
return_value=FakeResponse(
200,
{"error": {"code": 28, "message": "text"}}))
def test_application_error(self, req_get_mock):
self.assertRaises(exception.QBRpcException,
self.rpc.call, 'method', {'param': 'value'})
self.rpc._connection.connect.assert_called_once_with()
self.assertTrue(self.rpc._connection.getresponse.called)
def test_broken_application_error(self):
self.mock_object(
self.rpc._connection,
'getresponse',
mock.Mock(return_value=FakeResponse(
200, '{"error":{"code":28,"message":"text"}}')))
self.assertRaises(exception.QBRpcException,
self.rpc.call, 'method', {'param': 'value'})
self.rpc._connection.connect.assert_called_once_with()
self.assertTrue(self.rpc._connection.getresponse.called)
req_get_mock.assert_called_once_with(
url=self.rpc._url,
json=mock.ANY, # not checking here as of undefined order in dict
auth=self.rpc._credentials)
def test_checked_for_application_error(self):
resultdict = {"result": "Sweet gorilla of Manila"}
self.assertEqual("Sweet gorilla of Manila",
(self.rpc.
_checked_for_application_error(result=resultdict))
)
(self.rpc._checked_for_application_error(
result=resultdict)))
def test_checked_for_application_error_no_entry(self):
resultdict = {"result": "Sweet gorilla of Manila",