Files
python-glanceclient/glanceclient/tests/unit/test_http.py
Vincent Untz f65ba82268 Ensure that identity token in header is not an unicode string
We need all the headers to be safe strings so they can be joined
together and not become an unicode string in doing so.

This fixes a bug when creating an image with non-ascii characters in the
name.

This is required for python 2.6 compatibility.

Change-Id: I66ebc27edf4ccd8f903399da58705711c372536d
Closes-Bug: 1448080
2015-11-17 18:14:20 +01:00

385 lines
16 KiB
Python

# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
import json
from keystoneclient.auth import token_endpoint
from keystoneclient import session
import mock
import requests
from requests_mock.contrib import fixture
import six
from six.moves.urllib import parse
from testscenarios import load_tests_apply_scenarios as load_tests # noqa
import testtools
from testtools import matchers
import types
import glanceclient
from glanceclient.common import http
from glanceclient.tests import utils
def original_only(f):
@functools.wraps(f)
def wrapper(self, *args, **kwargs):
if not hasattr(self.client, 'log_curl_request'):
self.skipTest('Skip logging tests for session client')
return f(self, *args, **kwargs)
class TestClient(testtools.TestCase):
scenarios = [
('httpclient', {'create_client': '_create_http_client'}),
('session', {'create_client': '_create_session_client'})
]
def _create_http_client(self):
return http.HTTPClient(self.endpoint, token=self.token)
def _create_session_client(self):
auth = token_endpoint.Token(self.endpoint, self.token)
sess = session.Session(auth=auth)
return http.SessionClient(sess)
def setUp(self):
super(TestClient, self).setUp()
self.mock = self.useFixture(fixture.Fixture())
self.endpoint = 'http://example.com:9292'
self.ssl_endpoint = 'https://example.com:9292'
self.token = u'abc123'
self.client = getattr(self, self.create_client)()
def test_identity_headers_and_token(self):
identity_headers = {
'X-Auth-Token': 'auth_token',
'X-User-Id': 'user',
'X-Tenant-Id': 'tenant',
'X-Roles': 'roles',
'X-Identity-Status': 'Confirmed',
'X-Service-Catalog': 'service_catalog',
}
# with token
kwargs = {'token': u'fake-token',
'identity_headers': identity_headers}
http_client_object = http.HTTPClient(self.endpoint, **kwargs)
self.assertEqual('auth_token', http_client_object.auth_token)
self.assertTrue(http_client_object.identity_headers.
get('X-Auth-Token') is None)
def test_identity_headers_and_no_token_in_header(self):
identity_headers = {
'X-User-Id': 'user',
'X-Tenant-Id': 'tenant',
'X-Roles': 'roles',
'X-Identity-Status': 'Confirmed',
'X-Service-Catalog': 'service_catalog',
}
# without X-Auth-Token in identity headers
kwargs = {'token': u'fake-token',
'identity_headers': identity_headers}
http_client_object = http.HTTPClient(self.endpoint, **kwargs)
self.assertEqual(u'fake-token', http_client_object.auth_token)
self.assertTrue(http_client_object.identity_headers.
get('X-Auth-Token') is None)
def test_identity_headers_and_no_token_in_session_header(self):
# Tests that if token or X-Auth-Token are not provided in the kwargs
# when creating the http client, the session headers don't contain
# the X-Auth-Token key.
identity_headers = {
'X-User-Id': 'user',
'X-Tenant-Id': 'tenant',
'X-Roles': 'roles',
'X-Identity-Status': 'Confirmed',
'X-Service-Catalog': 'service_catalog',
}
kwargs = {'identity_headers': identity_headers}
http_client_object = http.HTTPClient(self.endpoint, **kwargs)
self.assertIsNone(http_client_object.auth_token)
self.assertNotIn('X-Auth-Token', http_client_object.session.headers)
def test_identity_headers_are_passed(self):
# Tests that if token or X-Auth-Token are not provided in the kwargs
# when creating the http client, the session headers don't contain
# the X-Auth-Token key.
identity_headers = {
'X-User-Id': b'user',
'X-Tenant-Id': b'tenant',
'X-Roles': b'roles',
'X-Identity-Status': b'Confirmed',
'X-Service-Catalog': b'service_catalog',
}
kwargs = {'identity_headers': identity_headers}
http_client = http.HTTPClient(self.endpoint, **kwargs)
path = '/v1/images/my-image'
self.mock.get(self.endpoint + path)
http_client.get(path)
headers = self.mock.last_request.headers
for k, v in six.iteritems(identity_headers):
self.assertEqual(v, headers[k])
def test_language_header_passed(self):
kwargs = {'language_header': 'nb_NO'}
http_client = http.HTTPClient(self.endpoint, **kwargs)
path = '/v2/images/my-image'
self.mock.get(self.endpoint + path)
http_client.get(path)
headers = self.mock.last_request.headers
self.assertEqual(kwargs['language_header'], headers['Accept-Language'])
def test_language_header_not_passed_no_language(self):
kwargs = {}
http_client = http.HTTPClient(self.endpoint, **kwargs)
path = '/v2/images/my-image'
self.mock.get(self.endpoint + path)
http_client.get(path)
headers = self.mock.last_request.headers
self.assertTrue('Accept-Language' not in headers)
def test_connection_timeout(self):
"""Should receive an InvalidEndpoint if connection timeout."""
def cb(request, context):
raise requests.exceptions.Timeout
path = '/v1/images'
self.mock.get(self.endpoint + path, text=cb)
comm_err = self.assertRaises(glanceclient.exc.InvalidEndpoint,
self.client.get,
'/v1/images')
self.assertIn(self.endpoint, comm_err.message)
def test_connection_refused(self):
"""
Should receive a CommunicationError if connection refused.
And the error should list the host and port that refused the
connection
"""
def cb(request, context):
raise requests.exceptions.ConnectionError()
path = '/v1/images/detail?limit=20'
self.mock.get(self.endpoint + path, text=cb)
comm_err = self.assertRaises(glanceclient.exc.CommunicationError,
self.client.get,
'/v1/images/detail?limit=20')
self.assertIn(self.endpoint, comm_err.message)
def test_http_encoding(self):
path = '/v1/images/detail'
text = 'Ok'
self.mock.get(self.endpoint + path, text=text,
headers={"Content-Type": "text/plain"})
headers = {"test": u'ni\xf1o'}
resp, body = self.client.get(path, headers=headers)
self.assertEqual(text, resp.text)
def test_headers_encoding(self):
if not hasattr(self.client, 'encode_headers'):
self.skipTest('Cannot do header encoding check on SessionClient')
value = u'ni\xf1o'
headers = {"test": value, "none-val": None}
encoded = self.client.encode_headers(headers)
self.assertEqual(b"ni\xc3\xb1o", encoded[b"test"])
self.assertNotIn("none-val", encoded)
def test_auth_token_header_encoding(self):
# Tests that X-Auth-Token header is converted to ascii string, as
# httplib in python 2.6 won't do the conversion
value = u'ni\xf1o'
http_client_object = http.HTTPClient(self.endpoint, token=value)
self.assertEqual(b'ni\xc3\xb1o',
http_client_object.session.headers['X-Auth-Token'])
def test_raw_request(self):
"""Verify the path being used for HTTP requests reflects accurately."""
headers = {"Content-Type": "text/plain"}
text = 'Ok'
path = '/v1/images/detail'
self.mock.get(self.endpoint + path, text=text, headers=headers)
resp, body = self.client.get('/v1/images/detail', headers=headers)
self.assertEqual(headers, resp.headers)
self.assertEqual(text, resp.text)
def test_parse_endpoint(self):
endpoint = 'http://example.com:9292'
test_client = http.HTTPClient(endpoint, token=u'adc123')
actual = test_client.parse_endpoint(endpoint)
expected = parse.SplitResult(scheme='http',
netloc='example.com:9292', path='',
query='', fragment='')
self.assertEqual(expected, actual)
def test_get_connections_kwargs_http(self):
endpoint = 'http://example.com:9292'
test_client = http.HTTPClient(endpoint, token=u'adc123')
self.assertEqual(test_client.timeout, 600.0)
def test_http_chunked_request(self):
text = "Ok"
data = six.StringIO(text)
path = '/v1/images/'
self.mock.post(self.endpoint + path, text=text)
headers = {"test": u'chunked_request'}
resp, body = self.client.post(path, headers=headers, data=data)
self.assertIsInstance(self.mock.last_request.body, types.GeneratorType)
self.assertEqual(text, resp.text)
def test_http_json(self):
data = {"test": "json_request"}
path = '/v1/images'
text = 'OK'
self.mock.post(self.endpoint + path, text=text)
headers = {"test": u'chunked_request'}
resp, body = self.client.post(path, headers=headers, data=data)
self.assertEqual(text, resp.text)
self.assertIsInstance(self.mock.last_request.body, six.string_types)
self.assertEqual(data, json.loads(self.mock.last_request.body))
def test_http_chunked_response(self):
data = "TEST"
path = '/v1/images/'
self.mock.get(self.endpoint + path, body=six.StringIO(data),
headers={"Content-Type": "application/octet-stream"})
resp, body = self.client.get(path)
self.assertTrue(isinstance(body, types.GeneratorType))
self.assertEqual([data], list(body))
@original_only
def test_log_http_response_with_non_ascii_char(self):
try:
response = 'Ok'
headers = {"Content-Type": "text/plain",
"test": "value1\xa5\xa6"}
fake = utils.FakeResponse(headers, six.StringIO(response))
self.client.log_http_response(fake)
except UnicodeDecodeError as e:
self.fail("Unexpected UnicodeDecodeError exception '%s'" % e)
@original_only
def test_log_curl_request_with_non_ascii_char(self):
try:
headers = {'header1': 'value1\xa5\xa6'}
body = 'examplebody\xa5\xa6'
self.client.log_curl_request('GET', '/api/v1/\xa5', headers, body,
None)
except UnicodeDecodeError as e:
self.fail("Unexpected UnicodeDecodeError exception '%s'" % e)
@original_only
@mock.patch('glanceclient.common.http.LOG.debug')
def test_log_curl_request_with_body_and_header(self, mock_log):
hd_name = 'header1'
hd_val = 'value1'
headers = {hd_name: hd_val}
body = 'examplebody'
self.client.log_curl_request('GET', '/api/v1/', headers, body, None)
self.assertTrue(mock_log.called, 'LOG.debug never called')
self.assertTrue(mock_log.call_args[0],
'LOG.debug called with no arguments')
hd_regex = ".*\s-H\s+'\s*%s\s*:\s*%s\s*'.*" % (hd_name, hd_val)
self.assertThat(mock_log.call_args[0][0],
matchers.MatchesRegex(hd_regex),
'header not found in curl command')
body_regex = ".*\s-d\s+'%s'\s.*" % body
self.assertThat(mock_log.call_args[0][0],
matchers.MatchesRegex(body_regex),
'body not found in curl command')
def _test_log_curl_request_with_certs(self, mock_log, key, cert, cacert):
headers = {'header1': 'value1'}
http_client_object = http.HTTPClient(self.ssl_endpoint, key_file=key,
cert_file=cert, cacert=cacert,
token='fake-token')
http_client_object.log_curl_request('GET', '/api/v1/', headers, None,
None)
self.assertTrue(mock_log.called, 'LOG.debug never called')
self.assertTrue(mock_log.call_args[0],
'LOG.debug called with no arguments')
needles = {'key': key, 'cert': cert, 'cacert': cacert}
for option, value in six.iteritems(needles):
if value:
regex = ".*\s--%s\s+('%s'|%s).*" % (option, value, value)
self.assertThat(mock_log.call_args[0][0],
matchers.MatchesRegex(regex),
'no --%s option in curl command' % option)
else:
regex = ".*\s--%s\s+.*" % option
self.assertThat(mock_log.call_args[0][0],
matchers.Not(matchers.MatchesRegex(regex)),
'unexpected --%s option in curl command' %
option)
@mock.patch('glanceclient.common.http.LOG.debug')
def test_log_curl_request_with_all_certs(self, mock_log):
self._test_log_curl_request_with_certs(mock_log, 'key1', 'cert1',
'cacert2')
@mock.patch('glanceclient.common.http.LOG.debug')
def test_log_curl_request_with_some_certs(self, mock_log):
self._test_log_curl_request_with_certs(mock_log, 'key1', 'cert1', None)
@mock.patch('glanceclient.common.http.LOG.debug')
def test_log_curl_request_with_insecure_param(self, mock_log):
headers = {'header1': 'value1'}
http_client_object = http.HTTPClient(self.ssl_endpoint, insecure=True,
token='fake-token')
http_client_object.log_curl_request('GET', '/api/v1/', headers, None,
None)
self.assertTrue(mock_log.called, 'LOG.debug never called')
self.assertTrue(mock_log.call_args[0],
'LOG.debug called with no arguments')
self.assertThat(mock_log.call_args[0][0],
matchers.MatchesRegex('.*\s-k\s.*'),
'no -k option in curl command')
@mock.patch('glanceclient.common.http.LOG.debug')
def test_log_curl_request_with_token_header(self, mock_log):
fake_token = 'fake-token'
headers = {'X-Auth-Token': fake_token}
http_client_object = http.HTTPClient(self.endpoint,
identity_headers=headers)
http_client_object.log_curl_request('GET', '/api/v1/', headers, None,
None)
self.assertTrue(mock_log.called, 'LOG.debug never called')
self.assertTrue(mock_log.call_args[0],
'LOG.debug called with no arguments')
token_regex = '.*%s.*' % fake_token
self.assertThat(mock_log.call_args[0][0],
matchers.Not(matchers.MatchesRegex(token_regex)),
'token found in LOG.debug parameter')