Port test_metadata to Python 3

* convert_password(): on Python 3, decode password from UTF-8 if it
  is a byte string
* convert_password(): don't modify password in the loop body, use an
  index increased at each iteration. This is a minor cleanup,
  unrelated to Python 3.
* ec2_md_print() now returns byte strings unchanged. On Python 3,
  str(bytes) uses "b'...'" format which is not the expected result
  (or it raises a BytesWarinng exception when using python3 -bb).
* Replace the base64 module with oslo_serialization.base64 to control
  the output type (bytes or text) on Python 2 and Python 3.
* _make_cache_key(): on Python 3, don't encode the host to UTF-8.
  Python 3 requires text.
* MetadataRequestHandler: on Python 3, encode HTTP body to UTF-8 if
  the body type is Unicode.
* test_metadata:

  - encode hmac.new() parameters using encodeutils.to_utf8()
  - use response.text rather than response.body to compare with
    Unicode string
  - use a byte strings for the instance identifier
  - fix some Unicode versus bytes issues

* tests-py3.txt: run test_metadata on Python 3

Co-Authored-By: Davanum Srinivas <davanum@gmail.com>
Co-Authored-By: ChangBo Guo(gcb) <eric.guo@easystack.cn>
Partially-Implements: blueprint goal-python35

Change-Id: Ifb43ce164d9fd80f9f85c95c762d7b010e84dfeb
This commit is contained in:
Victor Stinner 2016-06-22 11:24:27 +02:00 committed by ChangBo Guo(gcb)
parent 00b359ce14
commit 67af1eefc5
7 changed files with 65 additions and 56 deletions

View File

@ -16,11 +16,11 @@
"""Instance Metadata information."""
import base64
import os
import posixpath
from oslo_log import log as logging
from oslo_serialization import base64
from oslo_serialization import jsonutils
from oslo_utils import importutils
from oslo_utils import timeutils
@ -138,7 +138,7 @@ class InstanceMetadata(object):
self.mappings = _format_instance_mapping(ctxt, instance)
if instance.user_data is not None:
self.userdata_raw = base64.b64decode(instance.user_data)
self.userdata_raw = base64.decode_as_bytes(instance.user_data)
else:
self.userdata_raw = None
@ -365,7 +365,7 @@ class InstanceMetadata(object):
metadata['availability_zone'] = self.availability_zone
if self._check_os_version(GRIZZLY, version):
metadata['random_seed'] = base64.b64encode(os.urandom(512))
metadata['random_seed'] = base64.encode_as_text(os.urandom(512))
if self._check_os_version(LIBERTY, version):
metadata['project_id'] = self.instance.project_id
@ -694,6 +694,8 @@ def ec2_md_print(data):
return output[:-1]
elif isinstance(data, list):
return '\n'.join(data)
elif isinstance(data, (bytes, six.text_type)):
return data
else:
return str(data)

View File

@ -20,6 +20,7 @@ import hmac
import os
from oslo_log import log as logging
from oslo_utils import encodeutils
from oslo_utils import secretutils as secutils
import six
import webob.dec
@ -93,7 +94,7 @@ class MetadataRequestHandler(wsgi.Application):
def __call__(self, req):
if os.path.normpath(req.path_info) == "/":
resp = base.ec2_md_print(base.VERSIONS + ["latest"])
req.response.body = resp
req.response.body = encodeutils.to_utf8(resp)
req.response.content_type = base.MIME_TYPE_TEXT_PLAIN
return req.response
@ -122,10 +123,7 @@ class MetadataRequestHandler(wsgi.Application):
return data(req, meta_data)
resp = base.ec2_md_print(data)
if isinstance(resp, six.text_type):
req.response.text = resp
else:
req.response.body = resp
req.response.body = encodeutils.to_utf8(resp)
req.response.content_type = meta_data.get_mimetype()
return req.response
@ -264,8 +262,9 @@ class MetadataRequestHandler(wsgi.Application):
def _validate_shared_secret(self, requestor_id, signature,
requestor_address):
expected_signature = hmac.new(
CONF.neutron.metadata_proxy_shared_secret,
requestor_id, hashlib.sha256).hexdigest()
encodeutils.to_utf8(CONF.neutron.metadata_proxy_shared_secret),
encodeutils.to_utf8(requestor_id),
hashlib.sha256).hexdigest()
if not secutils.constant_time_compare(expected_signature, signature):
if requestor_id:

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import six
from six.moves import range
from webob import exc
@ -42,6 +43,9 @@ def convert_password(context, password):
Password is stored with the keys 'password_0' -> 'password_3'.
"""
password = password or ''
if six.PY3 and isinstance(password, bytes):
password = password.decode('utf-8')
meta = {}
for i in range(CHUNKS):
meta['password_%d' % i] = password[:CHUNK_LENGTH]

View File

@ -36,7 +36,7 @@ class JsonFileVendorData(vendordata.VendorDataDriver):
logprefix = "vendordata_jsonfile_path[%s]:" % fpath
if fpath:
try:
with open(fpath, "r") as fp:
with open(fpath, "rb") as fp:
data = jsonutils.load(fp)
except IOError as e:
if e.errno == errno.ENOENT:

View File

@ -16,9 +16,11 @@
"""Availability zone helper functions."""
import collections
import nova.conf
import six
from nova import cache_utils
import nova.conf
from nova import objects
# NOTE(vish): azs don't change that often, so cache them for an hour to
@ -49,7 +51,9 @@ def reset_cache():
def _make_cache_key(host):
return "azcache-%s" % host.encode('utf-8')
if six.PY2:
host = host.encode('utf-8')
return "azcache-%s" % host
def _build_metadata_by_host(aggregates, hosts=None):

View File

@ -16,7 +16,6 @@
"""Tests for metadata service."""
import base64
import copy
import hashlib
import hmac
@ -31,7 +30,9 @@ except ImportError:
import mock
from oslo_config import cfg
from oslo_serialization import base64
from oslo_serialization import jsonutils
from oslo_utils import encodeutils
import six
import webob
@ -61,7 +62,7 @@ from nova.virt import netutils
CONF = cfg.CONF
USER_DATA_STRING = (b"This is an encoded string")
ENCODE_USER_DATA_STRING = base64.b64encode(USER_DATA_STRING)
ENCODE_USER_DATA_STRING = base64.encode_as_text(USER_DATA_STRING)
FAKE_SEED = '7qtD24mpMR2'
@ -263,10 +264,10 @@ class MetadataTestCase(test.TestCase):
def test_user_data(self):
inst = self.instance.obj_clone()
inst['user_data'] = base64.b64encode("happy")
inst['user_data'] = base64.encode_as_text("happy")
md = fake_InstanceMetadata(self, inst)
self.assertEqual(
md.get_ec2_metadata(version='2009-04-04')['user-data'], "happy")
md.get_ec2_metadata(version='2009-04-04')['user-data'], b"happy")
def test_no_user_data(self):
inst = self.instance.obj_clone()
@ -492,11 +493,12 @@ class MetadataTestCase(test.TestCase):
data = md.get_ec2_metadata(version='2009-04-04')
self.assertEqual(data['meta-data']['local-ipv4'], expected_local)
@mock.patch.object(base64, 'b64encode', lambda data: FAKE_SEED)
@mock.patch('oslo_serialization.base64.encode_as_text',
return_value=FAKE_SEED)
@mock.patch('nova.cells.rpcapi.CellsAPI.get_keypair_at_top')
@mock.patch.object(jsonutils, 'dump_as_bytes')
def _test_as_json_with_options(self, mock_json_dump_as_bytes,
mock_cells_keypair,
mock_cells_keypair, mock_base64,
is_cells=False, os_version=base.GRIZZLY):
if is_cells:
self.flags(enable=True, group='cells')
@ -753,7 +755,8 @@ class OpenStackMetadataTestCase(test.TestCase):
mddict = jsonutils.loads(mdjson)
self.assertIn("random_seed", mddict)
self.assertEqual(len(base64.b64decode(mddict["random_seed"])), 512)
self.assertEqual(len(base64.decode_as_bytes(mddict["random_seed"])),
512)
# verify that older version do not have it
mdjson = mdinst.lookup("/openstack/2012-08-10/meta_data.json")
@ -1015,15 +1018,15 @@ class MetadataHandlerTestCase(test.TestCase):
response = fake_request(self, CallableMD(), "/bar")
self.assertEqual(response.status_int, 200)
self.assertEqual(response.body, "foo")
self.assertEqual(response.text, "foo")
def test_root(self):
expected = "\n".join(base.VERSIONS) + "\nlatest"
response = fake_request(self, self.mdinst, "/")
self.assertEqual(response.body, expected)
self.assertEqual(response.text, expected)
response = fake_request(self, self.mdinst, "/foo/../")
self.assertEqual(response.body, expected)
self.assertEqual(response.text, expected)
def test_root_metadata_proxy_enabled(self):
self.flags(service_metadata_proxy=True,
@ -1031,16 +1034,16 @@ class MetadataHandlerTestCase(test.TestCase):
expected = "\n".join(base.VERSIONS) + "\nlatest"
response = fake_request(self, self.mdinst, "/")
self.assertEqual(response.body, expected)
self.assertEqual(response.text, expected)
response = fake_request(self, self.mdinst, "/foo/../")
self.assertEqual(response.body, expected)
self.assertEqual(response.text, expected)
def test_version_root(self):
response = fake_request(self, self.mdinst, "/2009-04-04")
response_ctype = response.headers['Content-Type']
self.assertTrue(response_ctype.startswith("text/plain"))
self.assertEqual(response.body, 'meta-data/\nuser-data')
self.assertEqual(response.text, 'meta-data/\nuser-data')
response = fake_request(self, self.mdinst, "/9999-99-99")
self.assertEqual(response.status_int, 404)
@ -1095,7 +1098,7 @@ class MetadataHandlerTestCase(test.TestCase):
response_ctype = response.headers['Content-Type']
self.assertTrue(response_ctype.startswith("text/plain"))
self.assertEqual(response.body,
base64.b64decode(self.instance['user_data']))
base64.decode_as_bytes(self.instance['user_data']))
response = fake_request(self, self.mdinst,
relpath="/2009-04-04/user-data",
@ -1123,18 +1126,19 @@ class MetadataHandlerTestCase(test.TestCase):
def _fake_x_get_metadata(self, self_app, instance_id, remote_address):
if remote_address is None:
raise Exception('Expected X-Forwared-For header')
elif instance_id == self.expected_instance_id:
if encodeutils.to_utf8(instance_id) == self.expected_instance_id:
return self.mdinst
else:
# raise the exception to aid with 500 response code test
raise Exception("Expected instance_id of %s, got %s" %
(self.expected_instance_id, instance_id))
# raise the exception to aid with 500 response code test
raise Exception("Expected instance_id of %r, got %r" %
(self.expected_instance_id, instance_id))
def test_user_data_with_neutron_instance_id(self):
self.expected_instance_id = 'a-b-c-d'
self.expected_instance_id = b'a-b-c-d'
signed = hmac.new(
CONF.neutron.metadata_proxy_shared_secret,
encodeutils.to_utf8(CONF.neutron.metadata_proxy_shared_secret),
self.expected_instance_id,
hashlib.sha256).hexdigest()
@ -1165,7 +1169,7 @@ class MetadataHandlerTestCase(test.TestCase):
response_ctype = response.headers['Content-Type']
self.assertTrue(response_ctype.startswith("text/plain"))
self.assertEqual(response.body,
base64.b64decode(self.instance['user_data']))
base64.decode_as_bytes(self.instance['user_data']))
# mismatched signature
response = fake_request(
@ -1219,8 +1223,8 @@ class MetadataHandlerTestCase(test.TestCase):
# unexpected Instance-ID
signed = hmac.new(
CONF.neutron.metadata_proxy_shared_secret,
'z-z-z-z',
encodeutils.to_utf8(CONF.neutron.metadata_proxy_shared_secret),
b'z-z-z-z',
hashlib.sha256).hexdigest()
response = fake_request(
@ -1240,7 +1244,7 @@ class MetadataHandlerTestCase(test.TestCase):
# available at relpath.
response = fake_request(self, self.mdinst,
relpath=relpath)
for item in response.body.split('\n'):
for item in response.text.split('\n'):
if 'public-keys' in relpath:
# meta-data/public-keys/0=keyname refers to
# meta-data/public-keys/0
@ -1257,10 +1261,10 @@ class MetadataHandlerTestCase(test.TestCase):
_test_metadata_path('/2009-04-04/meta-data')
def _metadata_handler_with_instance_id(self, hnd):
expected_instance_id = 'a-b-c-d'
expected_instance_id = b'a-b-c-d'
signed = hmac.new(
CONF.neutron.metadata_proxy_shared_secret,
encodeutils.to_utf8(CONF.neutron.metadata_proxy_shared_secret),
expected_instance_id,
hashlib.sha256).hexdigest()
@ -1277,7 +1281,7 @@ class MetadataHandlerTestCase(test.TestCase):
'X-Instance-ID-Signature': signed})
self.assertEqual(200, response.status_int)
self.assertEqual(base64.b64decode(self.instance['user_data']),
self.assertEqual(base64.decode_as_bytes(self.instance['user_data']),
response.body)
@mock.patch.object(base, 'get_metadata_by_instance_id')
@ -1308,7 +1312,7 @@ class MetadataHandlerTestCase(test.TestCase):
relpath="/2009-04-04/user-data",
address="192.192.192.2")
self.assertEqual(200, response.status_int)
self.assertEqual(base64.b64decode(self.instance.user_data),
self.assertEqual(base64.decode_as_bytes(self.instance.user_data),
response.body)
@mock.patch.object(base, 'get_metadata_by_address')
@ -1336,7 +1340,7 @@ class MetadataHandlerTestCase(test.TestCase):
self.flags(service_metadata_proxy=True, group='neutron')
self.expected_instance_id = 'a-b-c-d'
self.expected_instance_id = b'a-b-c-d'
# with X-Metadata-Provider
proxy_lb_id = 'edge-x'
@ -1362,7 +1366,7 @@ class MetadataHandlerTestCase(test.TestCase):
self.flags(service_metadata_proxy=True, group='neutron')
self.expected_instance_id = 'a-b-c-d'
self.expected_instance_id = b'a-b-c-d'
# with X-Metadata-Provider
proxy_lb_id = 'edge-x'
@ -1400,14 +1404,14 @@ class MetadataHandlerTestCase(test.TestCase):
metadata_proxy_shared_secret=shared_secret,
service_metadata_proxy=True, group='neutron')
self.expected_instance_id = 'a-b-c-d'
self.expected_instance_id = b'a-b-c-d'
# with X-Metadata-Provider
proxy_lb_id = 'edge-x'
signature = hmac.new(
shared_secret,
proxy_lb_id,
encodeutils.to_utf8(shared_secret),
encodeutils.to_utf8(proxy_lb_id),
hashlib.sha256).hexdigest()
mock_client = mock_get_client()
@ -1436,14 +1440,14 @@ class MetadataHandlerTestCase(test.TestCase):
metadata_proxy_shared_secret=shared_secret,
service_metadata_proxy=True, group='neutron')
self.expected_instance_id = 'a-b-c-d'
self.expected_instance_id = b'a-b-c-d'
# with X-Metadata-Provider
proxy_lb_id = 'edge-x'
signature = hmac.new(
bad_secret,
proxy_lb_id,
encodeutils.to_utf8(bad_secret),
encodeutils.to_utf8(proxy_lb_id),
hashlib.sha256).hexdigest()
mock_client = mock_get_client()
@ -1537,7 +1541,7 @@ class MetadataPasswordTestCase(test.TestCase):
password.handle_password, request, self.mdinst)
@mock.patch('nova.objects.Instance.get_by_uuid')
def _try_set_password(self, get_by_uuid, val='bar'):
def _try_set_password(self, get_by_uuid, val=b'bar'):
request = webob.Request.blank('')
request.method = 'POST'
request.body = val
@ -1562,4 +1566,4 @@ class MetadataPasswordTestCase(test.TestCase):
self.mdinst.password = ''
self.assertRaises(webob.exc.HTTPBadRequest,
self._try_set_password,
val=('a' * (password.MAX_SIZE + 1)))
val=(b'a' * (password.MAX_SIZE + 1)))

View File

@ -38,10 +38,6 @@ nova.tests.unit.network.test_manager.LdapDNSTestCase
nova.tests.unit.test_bdm.BlockDeviceMappingEc2CloudTestCase
nova.tests.unit.test_configdrive2.ConfigDriveTestCase
nova.tests.unit.test_matchers.TestDictMatches.test__str__
nova.tests.unit.test_metadata.MetadataHandlerTestCase
nova.tests.unit.test_metadata.MetadataPasswordTestCase
nova.tests.unit.test_metadata.MetadataTestCase
nova.tests.unit.test_metadata.OpenStackMetadataTestCase
nova.tests.unit.test_wsgi.TestWSGIServerWithSSL
nova.tests.unit.virt.disk.mount.test_nbd.NbdTestCase
nova.tests.unit.virt.ironic.test_driver.IronicDriverTestCase