9f2658d8cf
jsonutils have several benefits in comparison to pure json implementation, like enabling C boosted encoders and decoders for Python2.6 by using simplejson when available. Change-Id: I24d0cd442e8d9d89fac50e43fc97f7bb4a293c3d Closes-Bug: 1329496
844 lines
32 KiB
Python
844 lines
32 KiB
Python
# Copyright 2010 United States Government as represented by the
|
|
# Administrator of the National Aeronautics and Space Administration.
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
"""Tests for metadata service."""
|
|
|
|
import base64
|
|
import hashlib
|
|
import hmac
|
|
import re
|
|
|
|
try:
|
|
import cPickle as pickle
|
|
except ImportError:
|
|
import pickle
|
|
|
|
import mock
|
|
from oslo.config import cfg
|
|
import webob
|
|
|
|
from nova.api.metadata import base
|
|
from nova.api.metadata import handler
|
|
from nova.api.metadata import password
|
|
from nova import block_device
|
|
from nova.compute import flavors
|
|
from nova.conductor import api as conductor_api
|
|
from nova import context
|
|
from nova import db
|
|
from nova.db.sqlalchemy import api
|
|
from nova import exception
|
|
from nova.network import api as network_api
|
|
from nova import objects
|
|
from nova.openstack.common import jsonutils
|
|
from nova import test
|
|
from nova.tests import fake_block_device
|
|
from nova.tests import fake_instance
|
|
from nova.tests import fake_network
|
|
from nova.tests.objects import test_instance_info_cache
|
|
from nova.tests.objects import test_security_group
|
|
from nova.virt import netutils
|
|
|
|
# Shared configuration object; tests read options such as dhcp_domain and
# the neutron metadata proxy secret from it.
CONF = cfg.CONF

# Plain-text user data and the base64 form stored on the fake instance.
USER_DATA_STRING = "This is an encoded string"
ENCODE_USER_DATA_STRING = base64.b64encode(USER_DATA_STRING)

# Fake DB-level instance record used as the basis for every test instance
# in this module (see fake_inst_obj).
INSTANCE = fake_instance.fake_db_instance(
    id=1,
    uuid='b65cee2f-8c69-4aeb-be2f-f79742548fc2',
    name='fake',
    project_id='test',
    key_name="mykey",
    key_data="ssh-rsa AAAAB3Nzai....N3NtHw== someuser@somehost",
    host='test',
    launch_index=1,
    instance_type={'name': 'm1.tiny'},
    reservation_id='r-xxxxxxxx',
    user_data=ENCODE_USER_DATA_STRING,
    image_ref=7,
    vcpus=1,
    fixed_ips=[],
    root_device_name='/dev/sda1',
    info_cache=test_instance_info_cache.fake_info_cache,
    hostname='test.novadomain',
    display_name='my_displayname',
    metadata={},
    system_metadata={},
)
|
|
|
|
|
|
def fake_inst_obj(context):
    """Build an objects.Instance from the module-level INSTANCE record.

    :param context: request context passed to Instance._from_db_object
    :returns: whatever Instance._from_db_object returns when asked to load
              'metadata', 'system_metadata' and 'info_cache'
    """
    loaded_attrs = ['metadata', 'system_metadata', 'info_cache']
    blank_instance = objects.Instance()
    return objects.Instance._from_db_object(
        context, blank_instance, INSTANCE, expected_attrs=loaded_attrs)
|
|
|
|
|
|
def get_default_sys_meta():
    """Return the result of saving the default flavor into an empty dict.

    The value is used by the test cases below as an instance's
    system_metadata.
    """
    default_flavor = flavors.get_default_flavor()
    return flavors.save_flavor_info({}, default_flavor)
|
|
|
|
|
|
def return_non_existing_address(*args, **kwarg):
    """Stub that ignores its arguments and always raises NotFound."""
    not_found = exception.NotFound()
    raise not_found
|
|
|
|
|
|
def fake_InstanceMetadata(stubs, inst_data, address=None,
                          sgroups=None, content=None, extra_md=None,
                          vd_driver=None, network_info=None):
    """Create a base.InstanceMetadata with the security group DB call stubbed.

    :param stubs: stub manager used to replace
                  api.security_group_get_by_instance
    :param inst_data: instance handed to base.InstanceMetadata
    :param address: forwarded to base.InstanceMetadata
    :param sgroups: security groups the stubbed DB call returns; when None
                    a single group named 'default' is used
    :param content: forwarded to base.InstanceMetadata ([] when falsy)
    :param extra_md: forwarded to base.InstanceMetadata ({} when falsy)
    :param vd_driver: forwarded to base.InstanceMetadata
    :param network_info: forwarded to base.InstanceMetadata
    :returns: the constructed base.InstanceMetadata
    """
    if not content:
        content = []
    if not extra_md:
        extra_md = {}
    if sgroups is None:
        default_group = dict(test_security_group.fake_secgroup,
                             name='default')
        sgroups = [default_group]

    def sg_get(*args, **kwargs):
        # Whatever is asked for, answer with the configured groups.
        return sgroups

    stubs.Set(api, 'security_group_get_by_instance', sg_get)

    md_kwargs = dict(address=address, content=content, extra_md=extra_md,
                     vd_driver=vd_driver, network_info=network_info)
    return base.InstanceMetadata(inst_data, **md_kwargs)
|
|
|
|
|
|
def fake_request(stubs, mdinst, relpath, address="127.0.0.1",
                 fake_get_metadata=None, headers=None,
                 fake_get_metadata_by_instance_id=None):
    """Send a blank request for relpath through a MetadataRequestHandler.

    :param stubs: stub manager; when falsy the handler's lookup methods are
                  left untouched
    :param mdinst: metadata object returned by the default address lookup
    :param relpath: request path
    :param address: value assigned to the request's remote_addr
    :param fake_get_metadata: replacement for the handler's
                              get_metadata_by_remote_address; defaults to a
                              function that returns mdinst
    :param headers: optional dict merged into the request headers
    :param fake_get_metadata_by_instance_id: optional replacement for the
                              handler's get_metadata_by_instance_id
    :returns: the response produced by the handler
    """

    def _always_mdinst(address):
        return mdinst

    if fake_get_metadata is None:
        lookup_by_address = _always_mdinst
    else:
        lookup_by_address = fake_get_metadata

    md_app = handler.MetadataRequestHandler()

    if stubs:
        stubs.Set(md_app, 'get_metadata_by_remote_address',
                  lookup_by_address)

        if fake_get_metadata_by_instance_id:
            stubs.Set(md_app, 'get_metadata_by_instance_id',
                      fake_get_metadata_by_instance_id)

    req = webob.Request.blank(relpath)
    req.remote_addr = address

    if headers is not None:
        req.headers.update(headers)

    return req.get_response(md_app)
|
|
|
|
|
|
class MetadataTestCase(test.TestCase):
    """Exercise base.InstanceMetadata lookups against a fake instance."""

    def setUp(self):
        super(MetadataTestCase, self).setUp()
        # Each test starts from an Instance built out of INSTANCE whose
        # system metadata comes from the default flavor.
        self.context = context.RequestContext('fake', 'fake')
        self.instance = fake_inst_obj(self.context)
        self.instance.system_metadata = get_default_sys_meta()
        self.flags(use_local=True, group='conductor')
        fake_network.stub_out_nw_api_get_instance_nw_info(self.stubs)

    def test_can_pickle_metadata(self):
        # InstanceMetadata has to be picklable, otherwise the memcache
        # backend cannot work correctly.
        inst = self.instance.obj_clone()
        md = fake_InstanceMetadata(self.stubs, inst)
        pickle.dumps(md, protocol=0)

    def test_user_data(self):
        # base64-encoded user data comes back decoded.
        inst = self.instance.obj_clone()
        inst['user_data'] = base64.b64encode("happy")
        md = fake_InstanceMetadata(self.stubs, inst)
        ec2_md = md.get_ec2_metadata(version='2009-04-04')
        self.assertEqual(ec2_md['user-data'], "happy")

    def test_no_user_data(self):
        # Without user data the 'user-data' key is absent altogether.
        inst = self.instance.obj_clone()
        inst.user_data = None
        md = fake_InstanceMetadata(self.stubs, inst)
        sentinel = object()
        ec2_md = md.get_ec2_metadata(version='2009-04-04')
        self.assertEqual(ec2_md.get('user-data', sentinel), sentinel)

    def test_security_groups(self):
        inst = self.instance.obj_clone()
        expected = ['default', 'other']
        sgroups = [dict(test_security_group.fake_secgroup, name=group_name)
                   for group_name in expected]

        md = fake_InstanceMetadata(self.stubs, inst, sgroups=sgroups)
        data = md.get_ec2_metadata(version='2009-04-04')
        self.assertEqual(data['meta-data']['security-groups'], expected)

    def test_local_hostname_fqdn(self):
        inst = self.instance.obj_clone()
        md = fake_InstanceMetadata(self.stubs, inst)
        data = md.get_ec2_metadata(version='2009-04-04')
        fqdn = "%s.%s" % (self.instance['hostname'], CONF.dhcp_domain)
        self.assertEqual(data['meta-data']['local-hostname'], fqdn)

    def test_format_instance_mapping(self):
        # Check _format_instance_mapping for an instance without and with
        # a root device name.
        ctxt = None
        no_root_inst = objects.Instance(
            id=0,
            uuid='e5fe5518-0288-4fa3-b0c4-c79764101b85',
            root_device_name=None,
            default_ephemeral_device=None,
            default_swap_device=None)
        rooted_inst = objects.Instance(
            id=0,
            uuid='b65cee2f-8c69-4aeb-be2f-f79742548fc2',
            root_device_name='/dev/sda1',
            default_ephemeral_device=None,
            default_swap_device=None)

        def fake_bdm_get(ctxt, uuid, use_slave=False):
            # One volume, one swap disk and one plain blank local disk.
            bdm_specs = [
                {'volume_id': 87654321,
                 'snapshot_id': None,
                 'no_device': None,
                 'source_type': 'volume',
                 'destination_type': 'volume',
                 'delete_on_termination': True,
                 'device_name': '/dev/sdh'},
                {'volume_id': None,
                 'snapshot_id': None,
                 'no_device': None,
                 'source_type': 'blank',
                 'destination_type': 'local',
                 'guest_format': 'swap',
                 'delete_on_termination': None,
                 'device_name': '/dev/sdc'},
                {'volume_id': None,
                 'snapshot_id': None,
                 'no_device': None,
                 'source_type': 'blank',
                 'destination_type': 'local',
                 'guest_format': None,
                 'delete_on_termination': None,
                 'device_name': '/dev/sdb'},
            ]
            return [fake_block_device.FakeDbBlockDeviceDict(spec)
                    for spec in bdm_specs]

        self.stubs.Set(db, 'block_device_mapping_get_all_by_instance',
                       fake_bdm_get)

        expected = {
            'ami': 'sda1',
            'root': '/dev/sda1',
            'ephemeral0': '/dev/sdb',
            'swap': '/dev/sdc',
            'ebs0': '/dev/sdh',
        }

        conductor_api.LocalAPI()

        no_root_mapping = base._format_instance_mapping(ctxt, no_root_inst)
        self.assertEqual(no_root_mapping, block_device._DEFAULT_MAPPINGS)
        rooted_mapping = base._format_instance_mapping(ctxt, rooted_inst)
        self.assertEqual(rooted_mapping, expected)

    def test_pubkey(self):
        inst = self.instance.obj_clone()
        md = fake_InstanceMetadata(self.stubs, inst)
        pubkey_ent = md.lookup("/2009-04-04/meta-data/public-keys")

        listing = base.ec2_md_print(pubkey_ent)
        self.assertEqual(listing, "0=%s" % self.instance['key_name'])
        key_text = base.ec2_md_print(pubkey_ent['0']['openssh-key'])
        self.assertEqual(key_text, self.instance['key_data'])

    def test_image_type_ramdisk(self):
        inst = self.instance.obj_clone()
        inst['ramdisk_id'] = 'ari-853667c0'
        md = fake_InstanceMetadata(self.stubs, inst)
        data = md.lookup("/latest/meta-data/ramdisk-id")

        self.assertIsNotNone(data)
        self.assertTrue(re.match('ari-[0-9a-f]{8}', data))

    def test_image_type_kernel(self):
        kernel_path = "/2009-04-04/meta-data/kernel-id"
        inst = self.instance.obj_clone()
        inst['kernel_id'] = 'aki-c2e26ff2'
        md = fake_InstanceMetadata(self.stubs, inst)
        data = md.lookup(kernel_path)

        self.assertTrue(re.match('aki-[0-9a-f]{8}', data))

        # The same entry is reachable below the /ec2 prefix.
        self.assertEqual(
            md.lookup("/ec2/2009-04-04/meta-data/kernel-id"), data)

        # Once the kernel id is cleared the path is no longer valid.
        inst.kernel_id = None
        md = fake_InstanceMetadata(self.stubs, inst)
        self.assertRaises(base.InvalidMetadataPath, md.lookup, kernel_path)

    def test_check_version(self):
        inst = self.instance.obj_clone()
        md = fake_InstanceMetadata(self.stubs, inst)

        accepted = [('1.0', '2009-04-04'),
                    ('2008-09-01', '2009-04-04'),
                    ('2009-04-04', '2009-04-04')]
        rejected = [('2009-04-04', '1.0'),
                    ('2009-04-04', '2008-09-01')]

        for first, second in accepted:
            self.assertTrue(md._check_version(first, second))
        for first, second in rejected:
            self.assertFalse(md._check_version(first, second))

    def test_InstanceMetadata_uses_passed_network_info(self):
        network_info = []

        self.mox.StubOutWithMock(netutils, "get_injected_network_template")
        netutils.get_injected_network_template(network_info).AndReturn(False)
        self.mox.ReplayAll()

        inst = fake_inst_obj(self.context)
        base.InstanceMetadata(inst, network_info=network_info)

    def test_InstanceMetadata_invoke_metadata_for_config_drive(self):
        inst = self.instance.obj_clone()
        inst_md = base.InstanceMetadata(inst)
        for path, _value in inst_md.metadata_for_config_drive():
            self.assertIsNotNone(path)

    def test_InstanceMetadata_queries_network_API_when_needed(self):
        network_info_from_api = []

        self.mox.StubOutWithMock(netutils, "get_injected_network_template")

        recorded = netutils.get_injected_network_template(
            network_info_from_api)
        recorded.AndReturn(False)

        self.mox.ReplayAll()

        base.InstanceMetadata(fake_inst_obj(self.context))

    def test_local_ipv4_from_nw_info(self):
        nw_info = fake_network.fake_get_instance_nw_info(self.stubs,
                                                         num_networks=2)
        expected_local = "192.168.1.100"
        md = fake_InstanceMetadata(self.stubs, self.instance,
                                   network_info=nw_info)
        meta = md.get_ec2_metadata(version='2009-04-04')['meta-data']
        self.assertEqual(meta['local-ipv4'], expected_local)

    def test_local_ipv4_from_address(self):
        # An explicitly supplied address is reported as local-ipv4.
        nw_info = fake_network.fake_get_instance_nw_info(self.stubs,
                                                         num_networks=2)
        expected_local = "fake"
        md = fake_InstanceMetadata(self.stubs, self.instance,
                                   network_info=nw_info, address="fake")
        meta = md.get_ec2_metadata(version='2009-04-04')['meta-data']
        self.assertEqual(meta['local-ipv4'], expected_local)

    def test_local_ipv4_from_nw_none(self):
        # With empty network info local-ipv4 is an empty string.
        md = fake_InstanceMetadata(self.stubs, self.instance,
                                   network_info=[])
        meta = md.get_ec2_metadata(version='2009-04-04')['meta-data']
        self.assertEqual(meta['local-ipv4'], '')
|
|
|
|
|
|
class OpenStackMetadataTestCase(test.TestCase):
    """Tests for the /openstack tree served by base.InstanceMetadata."""

    def setUp(self):
        super(OpenStackMetadataTestCase, self).setUp()
        self.context = context.RequestContext('fake', 'fake')
        self.instance = fake_inst_obj(self.context)
        # Attribute assignment, matching the other test cases in this file.
        self.instance.system_metadata = get_default_sys_meta()
        self.flags(use_local=True, group='conductor')
        fake_network.stub_out_nw_api_get_instance_nw_info(self.stubs)

    def test_with_primitive_instance(self):
        # InstanceMetadata also accepts the raw INSTANCE record.
        mdinst = fake_InstanceMetadata(self.stubs, INSTANCE)
        result = mdinst.lookup('/openstack')
        self.assertIn('latest', result)

    def test_top_level_listing(self):
        # request for /openstack should list the available versions
        inst = self.instance.obj_clone()
        mdinst = fake_InstanceMetadata(self.stubs, inst)

        result = mdinst.lookup("/openstack")

        # trailing / should not affect anything
        self.assertEqual(result, mdinst.lookup("/openstack/"))

        # the 'content' should not show up in directory listing
        self.assertNotIn(base.CONTENT_DIR, result)
        self.assertIn('2012-08-10', result)
        self.assertIn('latest', result)

    def test_version_content_listing(self):
        # request for /openstack/<version>/ should show metadata.json
        inst = self.instance.obj_clone()
        mdinst = fake_InstanceMetadata(self.stubs, inst)

        listing = mdinst.lookup("/openstack/2012-08-10")
        self.assertIn("meta_data.json", listing)

    def test_returns_apis_supported_in_havana_version(self):
        mdinst = fake_InstanceMetadata(self.stubs, self.instance)
        havana_supported_apis = mdinst.lookup("/openstack/2013-10-17")

        self.assertEqual([base.MD_JSON_NAME, base.UD_NAME, base.PASS_NAME,
                          base.VD_JSON_NAME], havana_supported_apis)

    def test_returns_apis_supported_in_folsom_version(self):
        mdinst = fake_InstanceMetadata(self.stubs, self.instance)
        folsom_supported_apis = mdinst.lookup("/openstack/2012-08-10")

        self.assertEqual([base.MD_JSON_NAME, base.UD_NAME],
                         folsom_supported_apis)

    def test_returns_apis_supported_in_grizzly_version(self):
        mdinst = fake_InstanceMetadata(self.stubs, self.instance)
        grizzly_supported_apis = mdinst.lookup("/openstack/2013-04-04")

        self.assertEqual([base.MD_JSON_NAME, base.UD_NAME, base.PASS_NAME],
                         grizzly_supported_apis)

    def test_metadata_json(self):
        inst = self.instance.obj_clone()
        content = [
            ('/etc/my.conf', "content of my.conf"),
            ('/root/hello', "content of /root/hello"),
        ]

        mdinst = fake_InstanceMetadata(self.stubs, inst,
                                       content=content)
        # The versioned path only has to be resolvable; its result was
        # never inspected, so it is no longer bound to a name. The
        # assertions below look at the 'latest' document.
        mdinst.lookup("/openstack/2012-08-10/meta_data.json")
        mdjson = mdinst.lookup("/openstack/latest/meta_data.json")

        mddict = jsonutils.loads(mdjson)

        self.assertEqual(mddict['uuid'], self.instance['uuid'])
        self.assertIn('files', mddict)

        self.assertIn('public_keys', mddict)
        self.assertEqual(mddict['public_keys'][self.instance['key_name']],
                         self.instance['key_data'])

        self.assertIn('launch_index', mddict)
        self.assertEqual(mddict['launch_index'], self.instance['launch_index'])

        # verify that each of the things we put in content
        # resulted in an entry in 'files', that their content
        # there is as expected, and that /content lists them.
        # (loop names chosen so the 'content' list is not shadowed)
        for path, file_content in content:
            matches = [f for f in mddict['files'] if f['path'] == path]
            self.assertEqual(1, len(matches))
            fent = matches[0]
            found = mdinst.lookup("/openstack%s" % fent['content_path'])
            self.assertEqual(found, file_content)

    def test_extra_md(self):
        # make sure extra_md makes it through to metadata
        inst = self.instance.obj_clone()
        extra = {'foo': 'bar', 'mylist': [1, 2, 3],
                 'mydict': {"one": 1, "two": 2}}
        mdinst = fake_InstanceMetadata(self.stubs, inst, extra_md=extra)

        mdjson = mdinst.lookup("/openstack/2012-08-10/meta_data.json")
        mddict = jsonutils.loads(mdjson)

        # items() rather than the Python-2-only iteritems(), as in
        # test_vendor_data_response.
        for key, val in extra.items():
            self.assertEqual(mddict[key], val)

    def test_password(self):
        # the password path should resolve to the password handler
        inst = self.instance.obj_clone()
        mdinst = fake_InstanceMetadata(self.stubs, inst)

        result = mdinst.lookup("/openstack/latest/password")
        self.assertEqual(result, password.handle_password)

    def test_userdata(self):
        inst = self.instance.obj_clone()
        mdinst = fake_InstanceMetadata(self.stubs, inst)

        userdata_found = mdinst.lookup("/openstack/2012-08-10/user_data")
        self.assertEqual(USER_DATA_STRING, userdata_found)

        # since we had user-data in this instance, it should be in listing
        self.assertIn('user_data', mdinst.lookup("/openstack/2012-08-10"))

        inst.user_data = None
        mdinst = fake_InstanceMetadata(self.stubs, inst)

        # since this instance had no user-data it should not be there.
        self.assertNotIn('user_data', mdinst.lookup("/openstack/2012-08-10"))

        self.assertRaises(base.InvalidMetadataPath,
                          mdinst.lookup, "/openstack/2012-08-10/user_data")

    def test_random_seed(self):
        inst = self.instance.obj_clone()
        mdinst = fake_InstanceMetadata(self.stubs, inst)

        # verify that 2013-04-04 has the 'random' field
        mdjson = mdinst.lookup("/openstack/2013-04-04/meta_data.json")
        mddict = jsonutils.loads(mdjson)

        self.assertIn("random_seed", mddict)
        self.assertEqual(len(base64.b64decode(mddict["random_seed"])), 512)

        # verify that older version do not have it
        mdjson = mdinst.lookup("/openstack/2012-08-10/meta_data.json")
        self.assertNotIn("random_seed", jsonutils.loads(mdjson))

    def test_no_dashes_in_metadata(self):
        # top level entries in meta_data should not contain '-' in their name
        inst = self.instance.obj_clone()
        mdinst = fake_InstanceMetadata(self.stubs, inst)
        mdjson = jsonutils.loads(
            mdinst.lookup("/openstack/latest/meta_data.json"))

        self.assertEqual([], [k for k in mdjson if "-" in k])

    def test_vendor_data_presence(self):
        inst = self.instance.obj_clone()
        mdinst = fake_InstanceMetadata(self.stubs, inst)

        # verify that 2013-10-17 has the vendor_data.json file
        result = mdinst.lookup("/openstack/2013-10-17")
        self.assertIn('vendor_data.json', result)

        # verify that older version do not have it
        result = mdinst.lookup("/openstack/2013-04-04")
        self.assertNotIn('vendor_data.json', result)

    def test_vendor_data_response(self):
        inst = self.instance.obj_clone()

        mydata = {'mykey1': 'value1', 'mykey2': 'value2'}

        class myVdriver(base.VendorDataDriver):
            def __init__(self, *args, **kwargs):
                super(myVdriver, self).__init__(*args, **kwargs)
                data = mydata.copy()
                uuid = kwargs['instance']['uuid']
                data.update({'inst_uuid': uuid})
                self.data = data

            def get(self):
                return self.data

        mdinst = fake_InstanceMetadata(self.stubs, inst, vd_driver=myVdriver)

        # verify that 2013-10-17 has the vendor_data.json file
        vdpath = "/openstack/2013-10-17/vendor_data.json"
        vd = jsonutils.loads(mdinst.lookup(vdpath))

        # the instance should be passed through, and our class copies the
        # uuid through to 'inst_uuid'.
        self.assertEqual(vd['inst_uuid'], inst['uuid'])

        # check the other expected values
        for k, v in mydata.items():
            self.assertEqual(vd[k], v)
|
|
|
|
|
|
class MetadataHandlerTestCase(test.TestCase):
    """Test that metadata is returning proper values."""

    def setUp(self):
        super(MetadataHandlerTestCase, self).setUp()

        fake_network.stub_out_nw_api_get_instance_nw_info(self.stubs)
        self.context = context.RequestContext('fake', 'fake')
        self.instance = fake_inst_obj(self.context)
        self.instance.system_metadata = get_default_sys_meta()
        self.flags(use_local=True, group='conductor')
        self.mdinst = fake_InstanceMetadata(self.stubs, self.instance,
                                            address=None, sgroups=None)

    def _assert_root_lists_versions(self):
        # Shared by the two root-listing tests: "/" and a path that
        # normalizes to "/" must both list every version plus 'latest'.
        expected = "\n".join(base.VERSIONS) + "\nlatest"
        for relpath in ("/", "/foo/../"):
            response = fake_request(self.stubs, self.mdinst, relpath)
            self.assertEqual(response.body, expected)

    def test_callable(self):
        # When lookup() returns a callable, the handler invokes it with the
        # request and the metadata object and serves its return value.

        def verify(req, meta_data):
            self.assertIsInstance(meta_data, CallableMD)
            return "foo"

        class CallableMD(object):
            def lookup(self, path_info):
                return verify

        response = fake_request(self.stubs, CallableMD(), "/bar")
        self.assertEqual(response.status_int, 200)
        self.assertEqual(response.body, "foo")

    def test_root(self):
        self._assert_root_lists_versions()

    def test_root_metadata_proxy_enabled(self):
        self.flags(service_metadata_proxy=True,
                   group='neutron')

        self._assert_root_lists_versions()

    def test_version_root(self):
        response = fake_request(self.stubs, self.mdinst, "/2009-04-04")
        response_ctype = response.headers['Content-Type']
        self.assertTrue(response_ctype.startswith("text/plain"))
        self.assertEqual(response.body, 'meta-data/\nuser-data')

        response = fake_request(self.stubs, self.mdinst, "/9999-99-99")
        self.assertEqual(response.status_int, 404)

    def test_json_data(self):
        # Both JSON documents are served with a JSON content type.
        for relpath in ("/openstack/latest/meta_data.json",
                        "/openstack/latest/vendor_data.json"):
            response = fake_request(self.stubs, self.mdinst, relpath)
            response_ctype = response.headers['Content-Type']
            self.assertTrue(response_ctype.startswith("application/json"))

    def test_user_data_non_existing_fixed_address(self):
        self.stubs.Set(network_api.API, 'get_fixed_ip_by_address',
                       return_non_existing_address)
        # stubs=None: the handler's own address lookup is exercised.
        response = fake_request(None, self.mdinst, "/2009-04-04/user-data",
                                "127.1.1.1")
        self.assertEqual(response.status_int, 404)

    def test_fixed_address_none(self):
        response = fake_request(None, self.mdinst,
                                relpath="/2009-04-04/user-data", address=None)
        self.assertEqual(response.status_int, 500)

    def test_invalid_path_is_404(self):
        response = fake_request(self.stubs, self.mdinst,
                                relpath="/2009-04-04/user-data-invalid")
        self.assertEqual(response.status_int, 404)

    def test_user_data_with_use_forwarded_header(self):
        expected_addr = "192.192.192.2"

        def fake_get_metadata(address):
            if address == expected_addr:
                return self.mdinst
            else:
                raise Exception("Expected addr of %s, got %s" %
                                (expected_addr, address))

        self.flags(use_forwarded_for=True)
        response = fake_request(self.stubs, self.mdinst,
                                relpath="/2009-04-04/user-data",
                                address="168.168.168.1",
                                fake_get_metadata=fake_get_metadata,
                                headers={'X-Forwarded-For': expected_addr})

        self.assertEqual(response.status_int, 200)
        response_ctype = response.headers['Content-Type']
        self.assertTrue(response_ctype.startswith("text/plain"))
        self.assertEqual(response.body,
                         base64.b64decode(self.instance['user_data']))

        # Without the header the fake sees the remote address and raises.
        response = fake_request(self.stubs, self.mdinst,
                                relpath="/2009-04-04/user-data",
                                address="168.168.168.1",
                                fake_get_metadata=fake_get_metadata,
                                headers=None)
        self.assertEqual(response.status_int, 500)

    @mock.patch('nova.utils.constant_time_compare')
    def test_by_instance_id_uses_constant_time_compare(self, mock_compare):
        # The patched comparison raises, so reaching it is observable.
        mock_compare.side_effect = test.TestingException

        req = webob.Request.blank('/')
        hnd = handler.MetadataRequestHandler()

        req.headers['X-Instance-ID'] = 'fake-inst'
        req.headers['X-Tenant-ID'] = 'fake-proj'

        self.assertRaises(test.TestingException,
                          hnd._handle_instance_id_request, req)

        self.assertEqual(1, mock_compare.call_count)

    def test_user_data_with_neutron_instance_id(self):
        expected_instance_id = 'a-b-c-d'
        proxy_addr = "192.192.192.2"
        user_data_path = "/2009-04-04/user-data"

        def fake_get_metadata(instance_id, remote_address):
            if remote_address is None:
                raise Exception('Expected X-Forwarded-For header')
            if instance_id != expected_instance_id:
                # raise the exception to aid with 500 response code test
                raise Exception("Expected instance_id of %s, got %s" %
                                (expected_instance_id, instance_id))
            return self.mdinst

        def sign(instance_id):
            # HMAC-SHA256 of the instance id keyed with the shared secret.
            return hmac.new(
                CONF.neutron.metadata_proxy_shared_secret,
                instance_id,
                hashlib.sha256).hexdigest()

        def proxied_request(headers):
            # Request with the instance-id lookup replaced by the fake.
            return fake_request(
                self.stubs, self.mdinst,
                relpath=user_data_path,
                address=proxy_addr,
                fake_get_metadata_by_instance_id=fake_get_metadata,
                headers=headers)

        signed = sign(expected_instance_id)

        # try a request with service disabled
        response = fake_request(
            self.stubs, self.mdinst,
            relpath=user_data_path,
            address=proxy_addr,
            headers={'X-Instance-ID': expected_instance_id,
                     'X-Tenant-ID': 'test',
                     'X-Instance-ID-Signature': signed})
        self.assertEqual(response.status_int, 200)

        # now enable the service
        self.flags(service_metadata_proxy=True,
                   group='neutron')
        response = proxied_request(
            {'X-Forwarded-For': proxy_addr,
             'X-Instance-ID': expected_instance_id,
             'X-Tenant-ID': 'test',
             'X-Instance-ID-Signature': signed})

        self.assertEqual(response.status_int, 200)
        response_ctype = response.headers['Content-Type']
        self.assertTrue(response_ctype.startswith("text/plain"))
        self.assertEqual(response.body,
                         base64.b64decode(self.instance['user_data']))

        # mismatched signature
        response = proxied_request(
            {'X-Forwarded-For': proxy_addr,
             'X-Instance-ID': expected_instance_id,
             'X-Tenant-ID': 'test',
             'X-Instance-ID-Signature': ''})

        self.assertEqual(response.status_int, 403)

        # missing X-Tenant-ID from request
        response = proxied_request(
            {'X-Forwarded-For': proxy_addr,
             'X-Instance-ID': expected_instance_id,
             'X-Instance-ID-Signature': signed})

        self.assertEqual(response.status_int, 400)

        # mismatched X-Tenant-ID
        response = proxied_request(
            {'X-Forwarded-For': proxy_addr,
             'X-Instance-ID': expected_instance_id,
             'X-Tenant-ID': 'FAKE',
             'X-Instance-ID-Signature': signed})

        self.assertEqual(response.status_int, 404)

        # without X-Forwarded-For
        response = proxied_request(
            {'X-Instance-ID': expected_instance_id,
             'X-Tenant-ID': 'test',
             'X-Instance-ID-Signature': signed})

        self.assertEqual(response.status_int, 500)

        # unexpected Instance-ID
        signed = sign('z-z-z-z')

        response = proxied_request(
            {'X-Forwarded-For': proxy_addr,
             'X-Instance-ID': 'z-z-z-z',
             'X-Tenant-ID': 'test',
             'X-Instance-ID-Signature': signed})
        self.assertEqual(response.status_int, 500)
|
|
|
|
|
|
class MetadataPasswordTestCase(test.TestCase):
    """Tests for password.handle_password."""

    def setUp(self):
        super(MetadataPasswordTestCase, self).setUp()
        fake_network.stub_out_nw_api_get_instance_nw_info(self.stubs)
        self.context = context.RequestContext('fake', 'fake')
        self.instance = fake_inst_obj(self.context)
        self.instance.system_metadata = get_default_sys_meta()
        # Set once; the original repeated this identical call after
        # building self.mdinst.
        self.flags(use_local=True, group='conductor')
        self.mdinst = fake_InstanceMetadata(self.stubs, self.instance,
                                            address=None, sgroups=None)

    def test_get_password(self):
        # A default (GET) request returns the stored password.
        request = webob.Request.blank('')
        self.mdinst.password = 'foo'
        result = password.handle_password(request, self.mdinst)
        self.assertEqual(result, 'foo')

    def test_bad_method(self):
        request = webob.Request.blank('')
        request.method = 'PUT'
        self.assertRaises(webob.exc.HTTPBadRequest,
                          password.handle_password, request, self.mdinst)

    @mock.patch('nova.objects.Instance.get_by_uuid')
    def _try_set_password(self, get_by_uuid, val='bar'):
        # POST ``val`` as the new password and check that it was saved.
        # ``get_by_uuid`` is the mock injected by the decorator; it is made
        # to return self.instance so the save() call can be observed.
        request = webob.Request.blank('')
        request.method = 'POST'
        request.body = val
        get_by_uuid.return_value = self.instance

        with mock.patch.object(self.instance, 'save') as save:
            password.handle_password(request, self.mdinst)
            save.assert_called_once_with()

        self.assertIn('password_0', self.instance.system_metadata)

    def test_set_password(self):
        self.mdinst.password = ''
        self._try_set_password()

    def test_conflict(self):
        # Setting a password when one is already present is a conflict.
        self.mdinst.password = 'foo'
        self.assertRaises(webob.exc.HTTPConflict,
                          self._try_set_password)

    def test_too_large(self):
        # One character more than password.MAX_SIZE is rejected.
        self.mdinst.password = ''
        self.assertRaises(webob.exc.HTTPBadRequest,
                          self._try_set_password,
                          val=('a' * (password.MAX_SIZE + 1)))
|