Merge "Do not use the six library in the tests."

This commit is contained in:
Zuul
2020-07-16 21:25:14 +00:00
committed by Gerrit Code Review
9 changed files with 76 additions and 105 deletions

View File

@@ -19,13 +19,13 @@ from unittest import mock
import uuid import uuid
import fixtures import fixtures
import io
from keystoneauth1 import session from keystoneauth1 import session
from keystoneauth1 import token_endpoint from keystoneauth1 import token_endpoint
from oslo_utils import encodeutils from oslo_utils import encodeutils
import requests import requests
from requests_mock.contrib import fixture from requests_mock.contrib import fixture
import six from urllib import parse
from six.moves.urllib import parse
from testscenarios import load_tests_apply_scenarios as load_tests # noqa from testscenarios import load_tests_apply_scenarios as load_tests # noqa
import testtools import testtools
from testtools import matchers from testtools import matchers
@@ -310,14 +310,14 @@ class TestClient(testtools.TestCase):
def test__chunk_body_exact_size_chunk(self): def test__chunk_body_exact_size_chunk(self):
test_client = http._BaseHTTPClient() test_client = http._BaseHTTPClient()
bytestring = b'x' * http.CHUNKSIZE bytestring = b'x' * http.CHUNKSIZE
data = six.BytesIO(bytestring) data = io.BytesIO(bytestring)
chunk = list(test_client._chunk_body(data)) chunk = list(test_client._chunk_body(data))
self.assertEqual(1, len(chunk)) self.assertEqual(1, len(chunk))
self.assertEqual([bytestring], chunk) self.assertEqual([bytestring], chunk)
def test_http_chunked_request(self): def test_http_chunked_request(self):
text = "Ok" text = "Ok"
data = six.StringIO(text) data = io.StringIO(text)
path = '/v1/images/' path = '/v1/images/'
self.mock.post(self.endpoint + path, text=text) self.mock.post(self.endpoint + path, text=text)
@@ -336,13 +336,13 @@ class TestClient(testtools.TestCase):
resp, body = self.client.post(path, headers=headers, data=data) resp, body = self.client.post(path, headers=headers, data=data)
self.assertEqual(text, resp.text) self.assertEqual(text, resp.text)
self.assertIsInstance(self.mock.last_request.body, six.string_types) self.assertIsInstance(self.mock.last_request.body, str)
self.assertEqual(data, json.loads(self.mock.last_request.body)) self.assertEqual(data, json.loads(self.mock.last_request.body))
def test_http_chunked_response(self): def test_http_chunked_response(self):
data = "TEST" data = "TEST"
path = '/v1/images/' path = '/v1/images/'
self.mock.get(self.endpoint + path, body=six.StringIO(data), self.mock.get(self.endpoint + path, body=io.StringIO(data),
headers={"Content-Type": "application/octet-stream"}) headers={"Content-Type": "application/octet-stream"})
resp, body = self.client.get(path) resp, body = self.client.get(path)
@@ -355,7 +355,7 @@ class TestClient(testtools.TestCase):
response = 'Ok' response = 'Ok'
headers = {"Content-Type": "text/plain", headers = {"Content-Type": "text/plain",
"test": "value1\xa5\xa6"} "test": "value1\xa5\xa6"}
fake = utils.FakeResponse(headers, six.StringIO(response)) fake = utils.FakeResponse(headers, io.StringIO(response))
self.client.log_http_response(fake) self.client.log_http_response(fake)
except UnicodeDecodeError as e: except UnicodeDecodeError as e:
self.fail("Unexpected UnicodeDecodeError exception '%s'" % e) self.fail("Unexpected UnicodeDecodeError exception '%s'" % e)
@@ -458,7 +458,7 @@ class TestClient(testtools.TestCase):
logger = self.useFixture(fixtures.FakeLogger(level=logging.DEBUG)) logger = self.useFixture(fixtures.FakeLogger(level=logging.DEBUG))
data = "TEST" data = "TEST"
path = '/v1/images/' path = '/v1/images/'
self.mock.get(self.endpoint + path, body=six.StringIO(data), self.mock.get(self.endpoint + path, body=io.StringIO(data),
headers={"Content-Type": "application/octet-stream", headers={"Content-Type": "application/octet-stream",
'x-openstack-request-id': "1234"}) 'x-openstack-request-id': "1234"})

View File

@@ -13,10 +13,10 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import io
import sys import sys
import requests import requests
import six
import testtools import testtools
from glanceclient.common import progressbar from glanceclient.common import progressbar
@@ -49,7 +49,7 @@ class TestProgressBarWrapper(testtools.TestCase):
def test_iter_file_display_progress_bar(self): def test_iter_file_display_progress_bar(self):
size = 98304 size = 98304
file_obj = six.StringIO('X' * size) file_obj = io.StringIO('X' * size)
saved_stdout = sys.stdout saved_stdout = sys.stdout
try: try:
sys.stdout = output = test_utils.FakeTTYStdout() sys.stdout = output = test_utils.FakeTTYStdout()
@@ -67,7 +67,7 @@ class TestProgressBarWrapper(testtools.TestCase):
def test_iter_file_no_tty(self): def test_iter_file_no_tty(self):
size = 98304 size = 98304
file_obj = six.StringIO('X' * size) file_obj = io.StringIO('X' * size)
saved_stdout = sys.stdout saved_stdout = sys.stdout
try: try:
sys.stdout = output = test_utils.FakeNoTTYStdout() sys.stdout = output = test_utils.FakeNoTTYStdout()

View File

@@ -17,6 +17,7 @@
import argparse import argparse
from collections import OrderedDict from collections import OrderedDict
import hashlib import hashlib
import io
import logging import logging
import os import os
import sys import sys
@@ -28,7 +29,6 @@ import fixtures
from keystoneauth1 import exceptions as ks_exc from keystoneauth1 import exceptions as ks_exc
from keystoneauth1 import fixture as ks_fixture from keystoneauth1 import fixture as ks_fixture
from requests_mock.contrib import fixture as rm_fixture from requests_mock.contrib import fixture as rm_fixture
import six
from glanceclient.common import utils from glanceclient.common import utils
from glanceclient import exc from glanceclient import exc
@@ -144,8 +144,8 @@ class ShellTest(testutils.TestCase):
orig = sys.stdout orig = sys.stdout
orig_stderr = sys.stderr orig_stderr = sys.stderr
try: try:
sys.stdout = six.StringIO() sys.stdout = io.StringIO()
sys.stderr = six.StringIO() sys.stderr = io.StringIO()
_shell = openstack_shell.OpenStackImagesShell() _shell = openstack_shell.OpenStackImagesShell()
_shell.main(argstr.split()) _shell.main(argstr.split())
except SystemExit: except SystemExit:
@@ -162,9 +162,6 @@ class ShellTest(testutils.TestCase):
def test_fixup_subcommand(self): def test_fixup_subcommand(self):
arglist = [u'image-list', u'--help'] arglist = [u'image-list', u'--help']
if six.PY2:
expected_arglist = [b'image-list', u'--help']
elif six.PY3:
expected_arglist = [u'image-list', u'--help'] expected_arglist = [u'image-list', u'--help']
openstack_shell.OpenStackImagesShell._fixup_subcommand( openstack_shell.OpenStackImagesShell._fixup_subcommand(
@@ -175,11 +172,6 @@ class ShellTest(testutils.TestCase):
def test_fixup_subcommand_with_options_preceding(self): def test_fixup_subcommand_with_options_preceding(self):
arglist = [u'--os-auth-token', u'abcdef', u'image-list', u'--help'] arglist = [u'--os-auth-token', u'abcdef', u'image-list', u'--help']
unknown = arglist[2:] unknown = arglist[2:]
if six.PY2:
expected_arglist = [
u'--os-auth-token', u'abcdef', b'image-list', u'--help'
]
elif six.PY3:
expected_arglist = [ expected_arglist = [
u'--os-auth-token', u'abcdef', u'image-list', u'--help' u'--os-auth-token', u'abcdef', u'image-list', u'--help'
] ]
@@ -194,8 +186,8 @@ class ShellTest(testutils.TestCase):
argstr = '--os-image-api-version 2 help foofoo' argstr = '--os-image-api-version 2 help foofoo'
self.assertRaises(exc.CommandError, shell.main, argstr.split()) self.assertRaises(exc.CommandError, shell.main, argstr.split())
@mock.patch('sys.stdout', six.StringIO()) @mock.patch('sys.stdout', io.StringIO())
@mock.patch('sys.stderr', six.StringIO()) @mock.patch('sys.stderr', io.StringIO())
@mock.patch('sys.argv', ['glance', 'help', 'foofoo']) @mock.patch('sys.argv', ['glance', 'help', 'foofoo'])
def test_no_stacktrace_when_debug_disabled(self): def test_no_stacktrace_when_debug_disabled(self):
with mock.patch.object(traceback, 'print_exc') as mock_print_exc: with mock.patch.object(traceback, 'print_exc') as mock_print_exc:
@@ -205,8 +197,8 @@ class ShellTest(testutils.TestCase):
pass pass
self.assertFalse(mock_print_exc.called) self.assertFalse(mock_print_exc.called)
@mock.patch('sys.stdout', six.StringIO()) @mock.patch('sys.stdout', io.StringIO())
@mock.patch('sys.stderr', six.StringIO()) @mock.patch('sys.stderr', io.StringIO())
@mock.patch('sys.argv', ['glance', 'help', 'foofoo']) @mock.patch('sys.argv', ['glance', 'help', 'foofoo'])
def test_stacktrace_when_debug_enabled_by_env(self): def test_stacktrace_when_debug_enabled_by_env(self):
old_environment = os.environ.copy() old_environment = os.environ.copy()
@@ -221,8 +213,8 @@ class ShellTest(testutils.TestCase):
finally: finally:
os.environ = old_environment os.environ = old_environment
@mock.patch('sys.stdout', six.StringIO()) @mock.patch('sys.stdout', io.StringIO())
@mock.patch('sys.stderr', six.StringIO()) @mock.patch('sys.stderr', io.StringIO())
@mock.patch('sys.argv', ['glance', '--debug', 'help', 'foofoo']) @mock.patch('sys.argv', ['glance', '--debug', 'help', 'foofoo'])
def test_stacktrace_when_debug_enabled(self): def test_stacktrace_when_debug_enabled(self):
with mock.patch.object(traceback, 'print_exc') as mock_print_exc: with mock.patch.object(traceback, 'print_exc') as mock_print_exc:
@@ -589,8 +581,8 @@ class ShellTest(testutils.TestCase):
self.assertRaises(exc.CommandError, glance_shell.main, args.split()) self.assertRaises(exc.CommandError, glance_shell.main, args.split())
@mock.patch('sys.argv', ['glance']) @mock.patch('sys.argv', ['glance'])
@mock.patch('sys.stdout', six.StringIO()) @mock.patch('sys.stdout', io.StringIO())
@mock.patch('sys.stderr', six.StringIO()) @mock.patch('sys.stderr', io.StringIO())
def test_main_noargs(self): def test_main_noargs(self):
# Ensure that main works with no command-line arguments # Ensure that main works with no command-line arguments
try: try:
@@ -781,7 +773,7 @@ class ShellCacheSchemaTest(testutils.TestCase):
return Args(args) return Args(args)
@mock.patch('six.moves.builtins.open', new=mock.mock_open(), create=True) @mock.patch('builtins.open', new=mock.mock_open(), create=True)
@mock.patch('os.path.exists', return_value=True) @mock.patch('os.path.exists', return_value=True)
def test_cache_schemas_gets_when_forced(self, exists_mock): def test_cache_schemas_gets_when_forced(self, exists_mock):
options = { options = {
@@ -804,7 +796,7 @@ class ShellCacheSchemaTest(testutils.TestCase):
actual = json.loads(open.mock_calls[6][1][0]) actual = json.loads(open.mock_calls[6][1][0])
self.assertEqual(schema_odict, actual) self.assertEqual(schema_odict, actual)
@mock.patch('six.moves.builtins.open', new=mock.mock_open(), create=True) @mock.patch('builtins.open', new=mock.mock_open(), create=True)
@mock.patch('os.path.exists', side_effect=[True, False, False, False]) @mock.patch('os.path.exists', side_effect=[True, False, False, False])
def test_cache_schemas_gets_when_not_exists(self, exists_mock): def test_cache_schemas_gets_when_not_exists(self, exists_mock):
options = { options = {
@@ -827,7 +819,7 @@ class ShellCacheSchemaTest(testutils.TestCase):
actual = json.loads(open.mock_calls[6][1][0]) actual = json.loads(open.mock_calls[6][1][0])
self.assertEqual(schema_odict, actual) self.assertEqual(schema_odict, actual)
@mock.patch('six.moves.builtins.open', new=mock.mock_open(), create=True) @mock.patch('builtins.open', new=mock.mock_open(), create=True)
@mock.patch('os.path.exists', return_value=True) @mock.patch('os.path.exists', return_value=True)
def test_cache_schemas_leaves_when_present_not_forced(self, exists_mock): def test_cache_schemas_leaves_when_present_not_forced(self, exists_mock):
options = { options = {
@@ -848,7 +840,7 @@ class ShellCacheSchemaTest(testutils.TestCase):
self.assertEqual(4, exists_mock.call_count) self.assertEqual(4, exists_mock.call_count)
self.assertEqual(0, open.mock_calls.__len__()) self.assertEqual(0, open.mock_calls.__len__())
@mock.patch('six.moves.builtins.open', new=mock.mock_open(), create=True) @mock.patch('builtins.open', new=mock.mock_open(), create=True)
@mock.patch('os.path.exists', return_value=True) @mock.patch('os.path.exists', return_value=True)
def test_cache_schemas_leaves_auto_switch(self, exists_mock): def test_cache_schemas_leaves_auto_switch(self, exists_mock):
options = { options = {
@@ -899,7 +891,7 @@ class ShellTestRequests(testutils.TestCase):
headers = {'Content-Length': '4', headers = {'Content-Length': '4',
'Content-type': 'application/octet-stream'} 'Content-type': 'application/octet-stream'}
fake = testutils.FakeResponse(headers, six.StringIO('DATA')) fake = testutils.FakeResponse(headers, io.StringIO('DATA'))
self.requests.get('http://example.com/v1/images/%s' % id, self.requests.get('http://example.com/v1/images/%s' % id,
raw=fake) raw=fake)
@@ -938,7 +930,7 @@ class ShellTestRequests(testutils.TestCase):
headers = {'Content-Length': '4', headers = {'Content-Length': '4',
'Content-type': 'application/octet-stream'} 'Content-type': 'application/octet-stream'}
fake = testutils.FakeResponse(headers, six.StringIO('DATA')) fake = testutils.FakeResponse(headers, io.StringIO('DATA'))
self.requests.get('http://example.com/v1/images/%s' % id, self.requests.get('http://example.com/v1/images/%s' % id,
headers=headers, raw=fake) headers=headers, raw=fake)
@@ -960,7 +952,7 @@ class ShellTestRequests(testutils.TestCase):
id = image_show_fixture['id'] id = image_show_fixture['id']
headers = {'Content-Length': '4', headers = {'Content-Length': '4',
'Content-type': 'application/octet-stream'} 'Content-type': 'application/octet-stream'}
fake = testutils.FakeResponse(headers, six.StringIO('DATA')) fake = testutils.FakeResponse(headers, io.StringIO('DATA'))
self.requests = self.useFixture(rm_fixture.Fixture()) self.requests = self.useFixture(rm_fixture.Fixture())
self.requests.get('http://example.com/v2/images/%s/file' % id, self.requests.get('http://example.com/v2/images/%s/file' % id,

View File

@@ -16,7 +16,6 @@
import os import os
from unittest import mock from unittest import mock
import six
import ssl import ssl
import testtools import testtools
import threading import threading
@@ -26,10 +25,7 @@ from glanceclient import exc
from glanceclient import v1 from glanceclient import v1
from glanceclient import v2 from glanceclient import v2
if six.PY3 is True: import socketserver
import socketserver
else:
import SocketServer as socketserver
TEST_VAR_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), TEST_VAR_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__),
@@ -217,9 +213,7 @@ class TestHTTPSVerifyCert(testtools.TestCase):
# starting from python 2.7.8 the way to handle loading private # starting from python 2.7.8 the way to handle loading private
# keys into the SSL_CTX was changed and error message become # keys into the SSL_CTX was changed and error message become
# similar to the one in 3.X # similar to the one in 3.X
if (six.PY2 and 'PrivateKey' not in e.message and if 'PEM lib' not in e.message:
'PEM lib' not in e.message or
six.PY3 and 'PEM lib' not in e.message):
self.fail('No appropriate failure message is received') self.fail('No appropriate failure message is received')
@mock.patch('sys.stderr') @mock.patch('sys.stderr')

View File

@@ -13,14 +13,13 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import io
import sys import sys
from unittest import mock from unittest import mock
from oslo_utils import encodeutils from oslo_utils import encodeutils
from requests import Response from requests import Response
import six
# NOTE(jokke): simplified transition to py3, behaves like py2 xrange # NOTE(jokke): simplified transition to py3, behaves like py2 xrange
from six.moves import range
import testtools import testtools
from glanceclient.common import utils from glanceclient.common import utils
@@ -47,7 +46,7 @@ class TestUtils(testtools.TestCase):
def test_get_new_file_size(self): def test_get_new_file_size(self):
size = 98304 size = 98304
file_obj = six.StringIO('X' * size) file_obj = io.StringIO('X' * size)
try: try:
self.assertEqual(size, utils.get_file_size(file_obj)) self.assertEqual(size, utils.get_file_size(file_obj))
# Check that get_file_size didn't change original file position. # Check that get_file_size didn't change original file position.
@@ -57,7 +56,7 @@ class TestUtils(testtools.TestCase):
def test_get_consumed_file_size(self): def test_get_consumed_file_size(self):
size, consumed = 98304, 304 size, consumed = 98304, 304
file_obj = six.StringIO('X' * size) file_obj = io.StringIO('X' * size)
file_obj.seek(consumed) file_obj.seek(consumed)
try: try:
self.assertEqual(size, utils.get_file_size(file_obj)) self.assertEqual(size, utils.get_file_size(file_obj))
@@ -79,10 +78,10 @@ class TestUtils(testtools.TestCase):
saved_stdout = sys.stdout saved_stdout = sys.stdout
try: try:
sys.stdout = output_list = six.StringIO() sys.stdout = output_list = io.StringIO()
utils.print_list(images, columns) utils.print_list(images, columns)
sys.stdout = output_dict = six.StringIO() sys.stdout = output_dict = io.StringIO()
utils.print_dict({'K': 'k', 'Key': 'veeeeeeeeeeeeeeeeeeeeeeee' utils.print_dict({'K': 'k', 'Key': 'veeeeeeeeeeeeeeeeeeeeeeee'
'eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee' 'eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee'
'eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee' 'eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee'
@@ -126,7 +125,7 @@ class TestUtils(testtools.TestCase):
'tags': [u'Name1', u'Tag_123', u'veeeery long']})] 'tags': [u'Name1', u'Tag_123', u'veeeery long']})]
saved_stdout = sys.stdout saved_stdout = sys.stdout
try: try:
sys.stdout = output_list = six.StringIO() sys.stdout = output_list = io.StringIO()
utils.print_list(images, columns) utils.print_list(images, columns)
finally: finally:
@@ -145,7 +144,7 @@ class TestUtils(testtools.TestCase):
image = {'id': '42', 'virtual_size': 1337} image = {'id': '42', 'virtual_size': 1337}
saved_stdout = sys.stdout saved_stdout = sys.stdout
try: try:
sys.stdout = output_list = six.StringIO() sys.stdout = output_list = io.StringIO()
utils.print_image(image) utils.print_image(image)
finally: finally:
sys.stdout = saved_stdout sys.stdout = saved_stdout
@@ -164,7 +163,7 @@ class TestUtils(testtools.TestCase):
image = {'id': '42', 'virtual_size': None} image = {'id': '42', 'virtual_size': None}
saved_stdout = sys.stdout saved_stdout = sys.stdout
try: try:
sys.stdout = output_list = six.StringIO() sys.stdout = output_list = io.StringIO()
utils.print_image(image) utils.print_image(image)
finally: finally:
sys.stdout = saved_stdout sys.stdout = saved_stdout
@@ -181,10 +180,6 @@ class TestUtils(testtools.TestCase):
def test_unicode_key_value_to_string(self): def test_unicode_key_value_to_string(self):
src = {u'key': u'\u70fd\u7231\u5a77'} src = {u'key': u'\u70fd\u7231\u5a77'}
expected = {'key': '\xe7\x83\xbd\xe7\x88\xb1\xe5\xa9\xb7'}
if six.PY2:
self.assertEqual(expected, utils.unicode_key_value_to_string(src))
else:
# u'xxxx' in PY3 is str, we will not get extra 'u' from cli # u'xxxx' in PY3 is str, we will not get extra 'u' from cli
# output in PY3 # output in PY3
self.assertEqual(src, utils.unicode_key_value_to_string(src)) self.assertEqual(src, utils.unicode_key_value_to_string(src))
@@ -240,7 +235,7 @@ class TestUtils(testtools.TestCase):
for chunk in i: for chunk in i:
raise(IOError) raise(IOError)
data = six.moves.StringIO('somestring') data = io.StringIO('somestring')
data.close = mock.Mock() data.close = mock.Mock()
i = utils.IterableWithLength(data, 10) i = utils.IterableWithLength(data, 10)
self.assertRaises(IOError, _iterate, i) self.assertRaises(IOError, _iterate, i)

View File

@@ -14,11 +14,10 @@
# under the License. # under the License.
import errno import errno
import io
import json import json
import testtools import testtools
from urllib import parse
import six
from six.moves.urllib import parse
from glanceclient.tests import utils from glanceclient.tests import utils
from glanceclient.v1 import client from glanceclient.v1 import client
@@ -634,7 +633,7 @@ class ImageManagerTest(testtools.TestCase):
self.assertEqual({'a': 'b', 'c': 'd'}, image.properties) self.assertEqual({'a': 'b', 'c': 'd'}, image.properties)
def test_create_with_data(self): def test_create_with_data(self):
image_data = six.StringIO('XXX') image_data = io.StringIO('XXX')
self.mgr.create(data=image_data) self.mgr.create(data=image_data)
expect_headers = {'x-image-meta-size': '3'} expect_headers = {'x-image-meta-size': '3'}
expect = [('POST', '/v1/images', expect_headers, image_data)] expect = [('POST', '/v1/images', expect_headers, image_data)]
@@ -711,7 +710,7 @@ class ImageManagerTest(testtools.TestCase):
self.assertEqual(10, image.min_disk) self.assertEqual(10, image.min_disk)
def test_update_with_data(self): def test_update_with_data(self):
image_data = six.StringIO('XXX') image_data = io.StringIO('XXX')
self.mgr.update('1', data=image_data) self.mgr.update('1', data=image_data)
expect_headers = {'x-image-meta-size': '3', expect_headers = {'x-image-meta-size': '3',
'x-glance-registry-purge-props': 'false'} 'x-glance-registry-purge-props': 'false'}
@@ -744,9 +743,6 @@ class ImageManagerTest(testtools.TestCase):
def test_image_meta_from_headers_encoding(self): def test_image_meta_from_headers_encoding(self):
value = u"ni\xf1o" value = u"ni\xf1o"
if six.PY2:
fields = {"x-image-meta-name": "ni\xc3\xb1o"}
else:
fields = {"x-image-meta-name": value} fields = {"x-image-meta-name": value}
headers = self.mgr._image_meta_from_headers(fields) headers = self.mgr._image_meta_from_headers(fields)
self.assertEqual(value, headers["name"]) self.assertEqual(value, headers["name"])

View File

@@ -15,11 +15,11 @@
# under the License. # under the License.
import argparse import argparse
import io
import json import json
import os import os
from unittest import mock from unittest import mock
import six
import subprocess import subprocess
import tempfile import tempfile
import testtools import testtools
@@ -34,12 +34,6 @@ import glanceclient.v1.shell as v1shell
from glanceclient.tests import utils from glanceclient.tests import utils
if six.PY3:
import io
file_type = io.IOBase
else:
file_type = file
fixtures = { fixtures = {
'/v1/images/96d2c7e1-de4e-4612-8aa2-ba26610c804e': { '/v1/images/96d2c7e1-de4e-4612-8aa2-ba26610c804e': {
'PUT': ( 'PUT': (
@@ -351,7 +345,7 @@ class ShellInvalidEndpointandParameterTest(utils.TestCase):
@mock.patch('sys.stderr') @mock.patch('sys.stderr')
def test_image_create_missing_container_format_stdin_data(self, __): def test_image_create_missing_container_format_stdin_data(self, __):
# Fake that get_data_file method returns data # Fake that get_data_file method returns data
self.mock_get_data_file.return_value = six.StringIO() self.mock_get_data_file.return_value = io.StringIO()
e = self.assertRaises(exc.CommandError, self.run_command, e = self.assertRaises(exc.CommandError, self.run_command,
'--os-image-api-version 1 image-create' '--os-image-api-version 1 image-create'
' --disk-format qcow2') ' --disk-format qcow2')
@@ -361,7 +355,7 @@ class ShellInvalidEndpointandParameterTest(utils.TestCase):
@mock.patch('sys.stderr') @mock.patch('sys.stderr')
def test_image_create_missing_disk_format_stdin_data(self, __): def test_image_create_missing_disk_format_stdin_data(self, __):
# Fake that get_data_file method returns data # Fake that get_data_file method returns data
self.mock_get_data_file.return_value = six.StringIO() self.mock_get_data_file.return_value = io.StringIO()
e = self.assertRaises(exc.CommandError, self.run_command, e = self.assertRaises(exc.CommandError, self.run_command,
'--os-image-api-version 1 image-create' '--os-image-api-version 1 image-create'
' --container-format bare') ' --container-format bare')
@@ -574,7 +568,7 @@ class ShellStdinHandlingTests(testtools.TestCase):
self._do_update('44d2c7e1-de4e-4612-8aa2-ba26610c444f') self._do_update('44d2c7e1-de4e-4612-8aa2-ba26610c444f')
self.assertIn('data', self.collected_args[1]) self.assertIn('data', self.collected_args[1])
self.assertIsInstance(self.collected_args[1]['data'], file_type) self.assertIsInstance(self.collected_args[1]['data'], io.IOBase)
self.assertEqual(b'Some Data', self.assertEqual(b'Some Data',
self.collected_args[1]['data'].read()) self.collected_args[1]['data'].read())
@@ -599,7 +593,7 @@ class ShellStdinHandlingTests(testtools.TestCase):
self._do_update('44d2c7e1-de4e-4612-8aa2-ba26610c444f') self._do_update('44d2c7e1-de4e-4612-8aa2-ba26610c444f')
self.assertIn('data', self.collected_args[1]) self.assertIn('data', self.collected_args[1])
self.assertIsInstance(self.collected_args[1]['data'], file_type) self.assertIsInstance(self.collected_args[1]['data'], io.IOBase)
self.assertEqual(b'Some Data\n', self.assertEqual(b'Some Data\n',
self.collected_args[1]['data'].read()) self.collected_args[1]['data'].read())

View File

@@ -15,11 +15,11 @@
# under the License. # under the License.
import argparse import argparse
from copy import deepcopy from copy import deepcopy
import io
import json import json
import os import os
from unittest import mock from unittest import mock
import six
import sys import sys
import tempfile import tempfile
import testtools import testtools
@@ -196,7 +196,7 @@ class ShellV2Test(testtools.TestCase):
@mock.patch('sys.stderr') @mock.patch('sys.stderr')
def test_image_create_missing_container_format_stdin_data(self, __): def test_image_create_missing_container_format_stdin_data(self, __):
# Fake that get_data_file method returns data # Fake that get_data_file method returns data
self.mock_get_data_file.return_value = six.StringIO() self.mock_get_data_file.return_value = io.StringIO()
e = self.assertRaises(exc.CommandError, self._run_command, e = self.assertRaises(exc.CommandError, self._run_command,
'--os-image-api-version 2 image-create' '--os-image-api-version 2 image-create'
' --disk-format qcow2') ' --disk-format qcow2')
@@ -206,7 +206,7 @@ class ShellV2Test(testtools.TestCase):
@mock.patch('sys.stderr') @mock.patch('sys.stderr')
def test_image_create_missing_disk_format_stdin_data(self, __): def test_image_create_missing_disk_format_stdin_data(self, __):
# Fake that get_data_file method returns data # Fake that get_data_file method returns data
self.mock_get_data_file.return_value = six.StringIO() self.mock_get_data_file.return_value = io.StringIO()
e = self.assertRaises(exc.CommandError, self._run_command, e = self.assertRaises(exc.CommandError, self._run_command,
'--os-image-api-version 2 image-create' '--os-image-api-version 2 image-create'
' --container-format bare') ' --container-format bare')
@@ -618,7 +618,7 @@ class ShellV2Test(testtools.TestCase):
'os_hash_value': None}) 'os_hash_value': None})
def test_do_image_create_with_multihash(self): def test_do_image_create_with_multihash(self):
self.mock_get_data_file.return_value = six.StringIO() self.mock_get_data_file.return_value = io.StringIO()
try: try:
with open(tempfile.mktemp(), 'w+') as f: with open(tempfile.mktemp(), 'w+') as f:
f.write('Some data here') f.write('Some data here')
@@ -694,7 +694,7 @@ class ShellV2Test(testtools.TestCase):
'container_format': 'bare', 'os_hidden': True}) 'container_format': 'bare', 'os_hidden': True})
def test_do_image_create_with_file(self): def test_do_image_create_with_file(self):
self.mock_get_data_file.return_value = six.StringIO() self.mock_get_data_file.return_value = io.StringIO()
try: try:
file_name = None file_name = None
with open(tempfile.mktemp(), 'w+') as f: with open(tempfile.mktemp(), 'w+') as f:
@@ -1412,7 +1412,7 @@ class ShellV2Test(testtools.TestCase):
self, mock_stdin, mock_do_stage, mock_do_import): self, mock_stdin, mock_do_stage, mock_do_import):
"""Backward compat -> handle this like a glance-direct""" """Backward compat -> handle this like a glance-direct"""
mock_stdin.isatty = lambda: False mock_stdin.isatty = lambda: False
self.mock_get_data_file.return_value = six.StringIO() self.mock_get_data_file.return_value = io.StringIO()
args = self._make_args(self.base_args) args = self._make_args(self.base_args)
with mock.patch.object(self.gc.images, 'create') as mocked_create: with mock.patch.object(self.gc.images, 'create') as mocked_create:
with mock.patch.object(self.gc.images, 'get') as mocked_get: with mock.patch.object(self.gc.images, 'get') as mocked_get:
@@ -1447,7 +1447,7 @@ class ShellV2Test(testtools.TestCase):
self, mock_stdin, mock_access, mock_do_stage, mock_do_import): self, mock_stdin, mock_access, mock_do_stage, mock_do_import):
"""Backward compat -> handle this like a glance-direct""" """Backward compat -> handle this like a glance-direct"""
mock_stdin.isatty = lambda: True mock_stdin.isatty = lambda: True
self.mock_get_data_file.return_value = six.StringIO() self.mock_get_data_file.return_value = io.StringIO()
mock_access.return_value = True mock_access.return_value = True
my_args = self.base_args.copy() my_args = self.base_args.copy()
my_args['file'] = 'fake-image-file.browncow' my_args['file'] = 'fake-image-file.browncow'

View File

@@ -14,10 +14,10 @@
# under the License. # under the License.
import copy import copy
import io
import json import json
import six
import six.moves.urllib.parse as urlparse
import testtools import testtools
from urllib import parse
from glanceclient.v2 import schemas from glanceclient.v2 import schemas
@@ -38,11 +38,11 @@ class FakeAPI(object):
fixture = self.fixtures[sort_url_by_query_keys(url)][method] fixture = self.fixtures[sort_url_by_query_keys(url)][method]
data = fixture[1] data = fixture[1]
if isinstance(fixture[1], six.string_types): if isinstance(fixture[1], str):
try: try:
data = json.loads(fixture[1]) data = json.loads(fixture[1])
except ValueError: except ValueError:
data = six.StringIO(fixture[1]) data = io.StringIO(fixture[1])
return FakeResponse(fixture[0], fixture[1]), data return FakeResponse(fixture[0], fixture[1]), data
@@ -141,7 +141,7 @@ class FakeResponse(object):
@property @property
def text(self): def text(self):
if isinstance(self.content, six.binary_type): if isinstance(self.content, bytes):
return self.content.decode('utf-8') return self.content.decode('utf-8')
return self.content return self.content
@@ -166,7 +166,7 @@ class TestCase(testtools.TestCase):
'verify': True} 'verify': True}
class FakeTTYStdout(six.StringIO): class FakeTTYStdout(io.StringIO):
"""A Fake stdout that try to emulate a TTY device as much as possible.""" """A Fake stdout that try to emulate a TTY device as much as possible."""
def isatty(self): def isatty(self):
@@ -177,7 +177,7 @@ class FakeTTYStdout(six.StringIO):
if data.startswith('\r'): if data.startswith('\r'):
self.seek(0) self.seek(0)
data = data[1:] data = data[1:]
return six.StringIO.write(self, data) return io.StringIO.write(self, data)
class FakeNoTTYStdout(FakeTTYStdout): class FakeNoTTYStdout(FakeTTYStdout):
@@ -197,24 +197,24 @@ def sort_url_by_query_keys(url):
:param url: url which will be ordered by query keys :param url: url which will be ordered by query keys
:returns url: url with ordered query keys :returns url: url with ordered query keys
""" """
parsed = urlparse.urlparse(url) parsed = parse.urlparse(url)
queries = urlparse.parse_qsl(parsed.query, True) queries = parse.parse_qsl(parsed.query, True)
sorted_query = sorted(queries, key=lambda x: x[0]) sorted_query = sorted(queries, key=lambda x: x[0])
encoded_sorted_query = urlparse.urlencode(sorted_query, True) encoded_sorted_query = parse.urlencode(sorted_query, True)
url_parts = (parsed.scheme, parsed.netloc, parsed.path, url_parts = (parsed.scheme, parsed.netloc, parsed.path,
parsed.params, encoded_sorted_query, parsed.params, encoded_sorted_query,
parsed.fragment) parsed.fragment)
return urlparse.urlunparse(url_parts) return parse.urlunparse(url_parts)
def build_call_record(method, url, headers, data): def build_call_record(method, url, headers, data):
"""Key the request body be ordered if it's a dict type.""" """Key the request body be ordered if it's a dict type."""
if isinstance(data, dict): if isinstance(data, dict):
data = sorted(data.items()) data = sorted(data.items())
if isinstance(data, six.string_types): if isinstance(data, str):
# NOTE(flwang): For image update, the data will be a 'list' which # NOTE(flwang): For image update, the data will be a 'list' which
# contains operation dict, such as: [{"op": "remove", "path": "/a"}] # contains operation dict, such as: [{"op": "remove", "path": "/a"}]
try: try: