Merge "Make sure we return unicode strings for process output"

This commit is contained in:
Jenkins 2015-11-24 05:13:18 +00:00 committed by Gerrit Code Review
commit 830c2e30fd
9 changed files with 51 additions and 15 deletions

View File

@ -21,6 +21,7 @@ from oslo_log import log as logging
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils
from neutron.common import utils as common_utils
from neutron.i18n import _LE
@ -223,7 +224,7 @@ class AsyncProcess(object):
def _read(self, stream, queue):
data = stream.readline()
if data:
data = data.strip()
data = common_utils.safe_decode_utf8(data.strip())
queue.put(data)
return data

View File

@ -120,11 +120,8 @@ def execute(cmd, process_input=None, addl_env=None,
_stdout, _stderr = obj.communicate(_process_input)
returncode = obj.returncode
obj.stdin.close()
if six.PY3:
if isinstance(_stdout, bytes):
_stdout = _stdout.decode('utf-8', 'surrogateescape')
if isinstance(_stderr, bytes):
_stderr = _stderr.decode('utf-8', 'surrogateescape')
_stdout = utils.safe_decode_utf8(_stdout)
_stderr = utils.safe_decode_utf8(_stderr)
extra_ok_codes = extra_ok_codes or []
if returncode and returncode not in extra_ok_codes:

View File

@ -57,11 +57,8 @@ def execute(cmd, process_input=None, addl_env=None,
obj, cmd = create_process(cmd, addl_env=addl_env)
_stdout, _stderr = obj.communicate(_process_input)
obj.stdin.close()
if six.PY3:
if isinstance(_stdout, bytes):
_stdout = _stdout.decode('utf-8', 'surrogateescape')
if isinstance(_stderr, bytes):
_stderr = _stderr.decode('utf-8', 'surrogateescape')
_stdout = utils.safe_decode_utf8(_stdout)
_stderr = utils.safe_decode_utf8(_stderr)
m = _("\nCommand: %(cmd)s\nExit code: %(code)s\nStdin: %(stdin)s\n"
"Stdout: %(stdout)s\nStderr: %(stderr)s") % \

View File

@ -524,3 +524,9 @@ def load_class_by_alias_or_classname(namespace, name):
exc_info=True)
raise ImportError(_("Class not found."))
return class_to_load
def safe_decode_utf8(s):
if six.PY3 and isinstance(s, bytes):
return s.decode('utf-8', 'surrogateescape')
return s

View File

@ -16,6 +16,8 @@ import datetime
import os
from oslo_utils import timeutils
import six
import testtools
import neutron
from neutron.common import constants
@ -151,3 +153,11 @@ def register_ovs_agent(host=HOST, agent_type=constants.AGENT_TYPE_OVS,
tunneling_ip, interface_mappings,
l2pop_network_types)
return _register_agent(agent)
def requires_py2(testcase):
return testtools.skipUnless(six.PY2, "requires python 2.x")(testcase)
def requires_py3(testcase):
return testtools.skipUnless(six.PY3, "requires python 3.x")(testcase)

View File

@ -13,6 +13,7 @@
# under the License.
import eventlet
import six
from neutron.agent.linux import async_process
from neutron.agent.linux import utils
@ -24,7 +25,7 @@ class AsyncProcessTestFramework(base.BaseTestCase):
def setUp(self):
super(AsyncProcessTestFramework, self).setUp()
self.test_file_path = self.get_temp_file_path('test_async_process.tmp')
self.data = [str(x) for x in range(4)]
self.data = [six.text_type(x) for x in range(4)]
with open(self.test_file_path, 'w') as f:
f.writelines('%s\n' % item for item in self.data)

View File

@ -22,6 +22,7 @@ import oslo_i18n
from neutron.agent.linux import utils
from neutron.tests import base
from neutron.tests.common import helpers
_marker = object()
@ -147,7 +148,7 @@ class AgentUtilsExecuteTest(base.BaseTestCase):
result = utils.execute(['ls', self.test_file], return_stderr=True)
self.assertEqual((str_data, ''), result)
@testtools.skipUnless(six.PY3, 'This test makes sense only in Python 3')
@helpers.requires_py3
def test_surrogateescape_in_decoding_out_data(self):
bytes_err_data = b'\xed\xa0\xbd'
err_data = bytes_err_data.decode('utf-8', 'surrogateescape')

View File

@ -18,6 +18,7 @@ import re
import eventlet
import mock
import netaddr
import six
import testtools
from neutron.common import constants
@ -26,6 +27,7 @@ from neutron.common import utils
from neutron.plugins.common import constants as p_const
from neutron.plugins.common import utils as plugin_utils
from neutron.tests import base
from neutron.tests.common import helpers
from oslo_log import log as logging
@ -716,3 +718,24 @@ class TestGetRandomString(base.BaseTestCase):
self.assertEqual(length, len(random_string))
regex = re.compile('^[0-9a-fA-F]+$')
self.assertIsNotNone(regex.match(random_string))
class TestSafeDecodeUtf8(base.BaseTestCase):
@helpers.requires_py2
def test_py2_does_nothing(self):
s = 'test-py2'
self.assertIs(s, utils.safe_decode_utf8(s))
@helpers.requires_py3
def test_py3_decoded_valid_bytes(self):
s = bytes('test-py2', 'utf-8')
decoded_str = utils.safe_decode_utf8(s)
self.assertIsInstance(decoded_str, six.text_type)
self.assertEqual(s, decoded_str.encode('utf-8'))
@helpers.requires_py3
def test_py3_decoded_invalid_bytes(self):
s = bytes('test-py2', 'utf_16')
decoded_str = utils.safe_decode_utf8(s)
self.assertIsInstance(decoded_str, six.text_type)

View File

@ -19,7 +19,6 @@ import ssl
import mock
from oslo_config import cfg
import six
import six.moves.urllib.request as urlrequest
import testtools
import webob
@ -28,6 +27,7 @@ import webob.exc
from neutron.common import exceptions as exception
from neutron.db import api
from neutron.tests import base
from neutron.tests.common import helpers
from neutron import wsgi
CONF = cfg.CONF
@ -496,7 +496,7 @@ class JSONDictSerializerTest(base.BaseTestCase):
# The tested behaviour is only meant to be witnessed in Python 2, so it is
# OK to skip this test with Python 3.
@testtools.skipIf(six.PY3, "This test does not make sense in Python 3")
@helpers.requires_py2
def test_json_with_utf8(self):
input_dict = dict(servers=dict(a=(2, '\xe7\xbd\x91\xe7\xbb\x9c')))
expected_json = b'{"servers":{"a":[2,"\\u7f51\\u7edc"]}}'