
Passwords are masked when logging requests, but only under the very specific 'password' key, and they are not masked at all when logging response bodies. This change fixes the response logging to mask passwords, and also makes the request logging more robust: it previously checked only for 'password', whereas the mask_password method handles much more than that. Change-Id: Id8bf583ecdf60eafb50fd69d6a19180ea97bd92c Closes-Bug: #1640269
316 lines
12 KiB
Python
316 lines
12 KiB
Python
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import json
|
|
import logging
|
|
|
|
import fixtures
|
|
from keystoneauth1 import adapter
|
|
from keystoneauth1 import exceptions as keystone_exception
|
|
import mock
|
|
from oslo_serialization import jsonutils
|
|
import six
|
|
|
|
import cinderclient.client
|
|
import cinderclient.v1.client
|
|
import cinderclient.v2.client
|
|
from cinderclient import exceptions
|
|
from cinderclient.tests.unit import utils
|
|
|
|
|
|
class ClientTest(utils.TestCase):
    """Unit tests for the helpers and clients in cinderclient.client."""

    @staticmethod
    def _volume_request_kwargs():
        """Return fresh request kwargs carrying a sample volume body."""
        volume = {
            "status": "creating",
            "imageRef": "username",
            "attach_status": "detached"
        }
        return {"body": {"volume": volume, "authenticated": "True"}}

    @staticmethod
    def _json_response(status_code, payload):
        """Build a TestResponse whose text is the JSON-encoded payload."""
        return utils.TestResponse({
            "status_code": status_code,
            "text": six.b(json.dumps(payload)),
        })

    def test_get_client_class_v1(self):
        """Version '1' maps to cinderclient.v1.client.Client."""
        client_class = cinderclient.client.get_client_class('1')
        self.assertEqual(cinderclient.v1.client.Client, client_class)

    def test_get_client_class_v2(self):
        """Version '2' maps to cinderclient.v2.client.Client."""
        client_class = cinderclient.client.get_client_class('2')
        self.assertEqual(cinderclient.v2.client.Client, client_class)

    def test_get_client_class_unknown(self):
        """Version '0' is rejected with UnsupportedVersion."""
        lookup = cinderclient.client.get_client_class
        self.assertRaises(
            cinderclient.exceptions.UnsupportedVersion, lookup, '0')

    @mock.patch.object(cinderclient.client.HTTPClient, '__init__')
    @mock.patch('cinderclient.client.SessionClient')
    def test_construct_http_client_bypass_url(
            self, session_mock, httpclient_mock):
        """HTTPClient receives bypass_url and SessionClient is not called."""
        # A patched __init__ has to return None, like a real one does.
        httpclient_mock.return_value = None
        target_url = 'http://example.com/'

        cinderclient.client._construct_http_client(bypass_url=target_url)

        self.assertTrue(httpclient_mock.called)
        init_kwargs = httpclient_mock.call_args[1]
        self.assertEqual(target_url, init_kwargs.get('bypass_url'))
        session_mock.assert_not_called()

    def test_log_req(self):
        """The logged request shows the username but not the password."""
        debug_logger = fixtures.FakeLogger(
            format="%(message)s",
            level=logging.DEBUG,
            nuke_handlers=True
        )
        self.logger = self.useFixture(debug_logger)

        request_data = ('{"auth": {"tenantName": "fakeService",'
                        ' "passwordCredentials": {"username": "fakeUser",'
                        ' "password": "fakePassword"}}}')
        kwargs = {'headers': {"X-Foo": "bar"}, 'data': request_data}

        http_client = cinderclient.client.HTTPClient(
            "user", None, None, "http://127.0.0.1:5000")
        http_client.http_log_debug = True
        http_client.http_log_req('PUT', kwargs)

        # Only the second captured line is inspected.
        logged_lines = self.logger.output.split('\n')
        self.assertNotIn("fakePassword", logged_lines[1])
        self.assertIn("fakeUser", logged_lines[1])

    def test_versions(self):
        """v1 and v2 URLs yield '1' and '2'; a v9 URL is unsupported."""
        detect = cinderclient.client.get_volume_api_from_url

        self.assertEqual('1', detect('http://fakeurl/v1/tenants'))
        self.assertEqual('2', detect('http://fakeurl/v2/tenants'))
        self.assertRaises(cinderclient.exceptions.UnsupportedVersion,
                          detect, 'http://fakeurl/v9/tenants')

    @mock.patch('cinderclient.client.SessionClient.get_endpoint')
    def test_get_base_url(self, mock_get_endpoint):
        """_get_base_url drops the version and project from the endpoint."""
        mock_get_endpoint.return_value = (
            'http://192.168.122.104:8776/v3/de50d1f33a38415fadfd3e1dea28f4d3')

        # NOTE(review): the test case itself is passed as the first
        # positional argument (presumably the session) - confirm.
        session_client = cinderclient.client.SessionClient(
            self, api_version='3.0')

        self.assertEqual('http://192.168.122.104:8776/',
                         session_client._get_base_url())

    @mock.patch.object(cinderclient.client, '_log_request_id')
    @mock.patch.object(adapter.Adapter, 'request')
    @mock.patch.object(exceptions, 'from_response')
    def test_sessionclient_request_method(
            self, mock_from_resp, mock_request, mock_log):
        """A 202 response is returned without calling from_response."""
        payload = {
            "text": {
                "volume": {
                    "status": "creating",
                    "id": "431253c0-e203-4da2-88df-60c756942aaf",
                    "size": 1
                }
            },
            "code": 202
        }
        mock_response = self._json_response(202, payload)
        # The patched Adapter.request hands back the 202 response.
        mock_request.return_value = mock_response

        session_client = cinderclient.client.SessionClient(
            session=mock.Mock())
        response, _body = session_client.request(
            mock.sentinel.url, 'POST', **self._volume_request_kwargs())

        self.assertIsNotNone(session_client._logger)
        mock_log.assert_called_once_with(
            session_client._logger, mock_response, mock.ANY)

        # A status code below 400 must not reach exceptions.from_response.
        self.assertEqual(202, response.status_code)
        self.assertFalse(mock_from_resp.called)

    @mock.patch.object(cinderclient.client, '_log_request_id')
    @mock.patch.object(adapter.Adapter, 'request')
    def test_sessionclient_request_method_raises_badrequest(
            self, mock_request, mock_log):
        """A 400 response makes SessionClient.request raise BadRequest."""
        payload = {
            "badRequest": {
                "message": "Invalid image identifier or unable to access "
                           "requested image.",
                "code": 400
            }
        }
        mock_response = self._json_response(400, payload)
        # The patched Adapter.request hands back the 400 response.
        mock_request.return_value = mock_response

        session_client = cinderclient.client.SessionClient(
            session=mock.Mock())

        # The real from_response turns status 400 into BadRequest.
        self.assertRaises(
            exceptions.BadRequest, session_client.request,
            mock.sentinel.url, 'POST', **self._volume_request_kwargs())
        self.assertIsNotNone(session_client._logger)
        mock_log.assert_called_once_with(
            session_client._logger, mock_response, mock.ANY)

    @mock.patch.object(cinderclient.client, '_log_request_id')
    @mock.patch.object(adapter.Adapter, 'request')
    def test_sessionclient_request_method_raises_overlimit(
            self, mock_request, mock_log):
        """A 413 response makes SessionClient.request raise OverLimit."""
        payload = {
            "overLimitFault": {
                "message": "This request was rate-limited.",
                "code": 413
            }
        }
        mock_response = self._json_response(413, payload)
        # The patched Adapter.request hands back the 413 response.
        mock_request.return_value = mock_response

        session_client = cinderclient.client.SessionClient(
            session=mock.Mock())

        self.assertRaises(exceptions.OverLimit, session_client.request,
                          mock.sentinel.url, 'GET')
        self.assertIsNotNone(session_client._logger)
        mock_log.assert_called_once_with(
            session_client._logger, mock_response, mock.ANY)

    @mock.patch.object(exceptions, 'from_response')
    def test_keystone_request_raises_auth_failure_exception(
            self, mock_from_resp):
        """An AuthorizationFailure from the adapter propagates unchanged."""
        kwargs = self._volume_request_kwargs()
        auth_failure = keystone_exception.AuthorizationFailure()

        with mock.patch.object(adapter.Adapter, 'request',
                               side_effect=auth_failure):
            session_client = cinderclient.client.SessionClient(
                session=mock.Mock())
            self.assertRaises(keystone_exception.AuthorizationFailure,
                              session_client.request,
                              mock.sentinel.url, 'POST', **kwargs)

        # The adapter raised before any response existed, so
        # exceptions.from_response must never have been consulted.
        self.assertFalse(mock_from_resp.called)
|
|
|
|
|
|
class ClientTestSensitiveInfo(utils.TestCase):
    """Checks that HTTPClient debug logging keeps secrets out of the log."""

    def _capture_debug_log(self):
        """Install a DEBUG-level FakeLogger and keep it on self.logger."""
        debug_logger = fixtures.FakeLogger(
            format="%(message)s",
            level=logging.DEBUG,
            nuke_handlers=True
        )
        self.logger = self.useFixture(debug_logger)

    @staticmethod
    def _debug_http_client():
        """Return an HTTPClient with http_log_debug switched on."""
        http_client = cinderclient.client.HTTPClient(
            "user", None, None, "http://127.0.0.1:5000")
        http_client.http_log_debug = True
        return http_client

    def test_req_does_not_log_sensitive_info(self):
        """The X-Auth-Token value is absent from the logged request."""
        self._capture_debug_log()

        secret_auth_token = "MY_SECRET_AUTH_TOKEN"
        request_data = ('{"auth": {"tenantName": "fakeService",'
                        ' "passwordCredentials": {"username": "fakeUser",'
                        ' "password": "fakePassword"}}}')
        kwargs = {
            'headers': {"X-Auth-Token": secret_auth_token},
            'data': request_data
        }

        self._debug_http_client().http_log_req('PUT', kwargs)

        # Only the second captured line is inspected.
        logged_lines = self.logger.output.split('\n')
        self.assertNotIn(secret_auth_token, logged_lines[1])

    def test_resp_does_not_log_sensitive_info(self):
        """The logged response has '***' and no auth_password value."""
        self._capture_debug_log()
        http_client = self._debug_http_client()

        auth_password = "kk4qD6CpKFLyz9JD"
        connection_data = {
            "auth_password": auth_password,
            "target_discovered": False,
            "encrypted": False,
            "qos_specs": None,
            "target_iqn": ("iqn.2010-10.org.openstack:volume-"
                           "a2f33dcc-1bb7-45ba-b8fc-5b38179120f8"),
            "target_portal": "10.0.100.186:3260",
            "volume_id": "a2f33dcc-1bb7-45ba-b8fc-5b38179120f8",
            "target_lun": 1,
            "access_mode": "rw",
            "auth_username": "s4BfSfZ67Bo2mnpuFWY8",
            "auth_method": "CHAP"
        }
        body = {
            "connection_info": {
                "driver_volume_type": "iscsi",
                "data": connection_data
            }
        }

        # Stand-in response carrying only the attributes set below.
        resp = mock.Mock()
        resp.status_code = 200
        resp.headers = {
            'x-compute-request-id': 'req-f551871a-4950-4225-9b2c-29a14c8f075e'
        }
        resp.text = jsonutils.dumps(body)

        http_client.http_log_resp(resp)

        # The full captured output is passed as the failure message.
        output = self.logger.output.split('\n')
        self.assertIn('***', output[1], output)
        self.assertNotIn(auth_password, output[1], output)
|