Merge "tests: Split external service fixtures out"

Zuul 2021-05-20 14:06:56 +00:00 committed by Gerrit Code Review
commit f44abb03a0
8 changed files with 1835 additions and 1703 deletions


@@ -13,7 +13,11 @@
from .api_paste import ApiPasteNoProjectId # noqa: F401
from .api_paste import ApiPasteV21Fixture # noqa: F401
from .cast_as_call import CastAsCallFixture # noqa: F401
-from .conf import ConfFixture # noqa: F401
+from .cinder import CinderFixture # noqa: F401
+from .conf import ConfFixture # noqa: F401, F403
+from .cyborg import CyborgFixture # noqa: F401
+from .glance import GlanceFixture # noqa: F401
+from .neutron import NeutronFixture # noqa: F401
from .nova import * # noqa: F401, F403
from .policy import OverridePolicyFixture # noqa: F401
from .policy import PolicyFixture # noqa: F401
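With the re-exports above, existing call sites that use the package-level names keep working, while the new per-service modules can also be imported directly. A minimal sketch of the two equivalent import styles this enables (illustrative, not part of the diff):

# Both names resolve to the same class after this change: via the
# package re-export, or via the new per-service module.
from nova.tests import fixtures
from nova.tests.fixtures import cinder

assert fixtures.CinderFixture is cinder.CinderFixture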

nova/tests/fixtures/cinder.py (new file, 329 lines)

@@ -0,0 +1,329 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Cinder fixture."""
import collections
import fixtures
from oslo_log import log as logging
from oslo_utils import uuidutils
from nova import exception
from nova.tests.fixtures import nova as nova_fixtures
LOG = logging.getLogger(__name__)
class CinderFixture(fixtures.Fixture):
"""A fixture to volume operations with the new Cinder attach/detach API"""
# the default project_id in OSAPIFixtures
tenant_id = nova_fixtures.PROJECT_ID
SWAP_OLD_VOL = 'a07f71dc-8151-4e7d-a0cc-cd24a3f11113'
SWAP_NEW_VOL = '227cc671-f30b-4488-96fd-7d0bf13648d8'
SWAP_ERR_OLD_VOL = '828419fa-3efb-4533-b458-4267ca5fe9b1'
SWAP_ERR_NEW_VOL = '9c6d9c2d-7a8f-4c80-938d-3bf062b8d489'
SWAP_ERR_ATTACH_ID = '4a3cd440-b9c2-11e1-afa6-0800200c9a66'
MULTIATTACH_VOL = '4757d51f-54eb-4442-8684-3399a6431f67'
# This represents a bootable image-backed volume to test
# boot-from-volume scenarios.
IMAGE_BACKED_VOL = '6ca404f3-d844-4169-bb96-bc792f37de98'
# This represents a bootable image-backed volume with required traits
# as part of volume image metadata
IMAGE_WITH_TRAITS_BACKED_VOL = '6194fc02-c60e-4a01-a8e5-600798208b5f'
def __init__(self, test, az='nova'):
"""Initialize this instance of the CinderFixture.
:param test: The TestCase using this fixture.
:param az: The availability zone to return in volume GET responses.
Defaults to "nova" since that is the default we would see
from Cinder's storage_availability_zone config option.
"""
super().__init__()
self.test = test
self.swap_volume_instance_uuid = None
self.swap_volume_instance_error_uuid = None
self.attachment_error_id = None
self.az = az
# A dict, keyed by volume id, to a dict, keyed by attachment id,
# with keys:
# - id: the attachment id
# - instance_uuid: uuid of the instance attached to the volume
# - connector: host connector dict; None if not connected
# Note that a volume can have multiple attachments even without
# multi-attach, as some flows create a blank 'reservation' attachment
# before deleting another attachment. However, a non-multiattach volume
# can have at most one attachment with a host connector at a time.
self.volume_to_attachment = collections.defaultdict(dict)
def setUp(self):
super().setUp()
def fake_get(self_api, context, volume_id, microversion=None):
# Check for the special swap volumes.
attachments = self.volume_to_attachment[volume_id]
if volume_id in (self.SWAP_OLD_VOL, self.SWAP_ERR_OLD_VOL):
volume = {
'status': 'available',
'display_name': 'TEST1',
'attach_status': 'detached',
'id': volume_id,
'multiattach': False,
'size': 1
}
if (
(
self.swap_volume_instance_uuid and
volume_id == self.SWAP_OLD_VOL
) or (
self.swap_volume_instance_error_uuid and
volume_id == self.SWAP_ERR_OLD_VOL
)
):
if volume_id == self.SWAP_OLD_VOL:
instance_uuid = self.swap_volume_instance_uuid
else:
instance_uuid = self.swap_volume_instance_error_uuid
if attachments:
attachment = list(attachments.values())[0]
volume.update({
'status': 'in-use',
'attachments': {
instance_uuid: {
'mountpoint': '/dev/vdb',
'attachment_id': attachment['id']
}
},
'attach_status': 'attached',
})
return volume
# Check to see if the volume is attached.
if attachments:
# The volume is attached.
attachment = list(attachments.values())[0]
volume = {
'status': 'in-use',
'display_name': volume_id,
'attach_status': 'attached',
'id': volume_id,
'multiattach': volume_id == self.MULTIATTACH_VOL,
'size': 1,
'attachments': {
attachment['instance_uuid']: {
'attachment_id': attachment['id'],
'mountpoint': '/dev/vdb'
}
}
}
else:
# This is a test that does not care about the actual details.
volume = {
'status': 'available',
'display_name': 'TEST2',
'attach_status': 'detached',
'id': volume_id,
'multiattach': volume_id == self.MULTIATTACH_VOL,
'size': 1
}
if 'availability_zone' not in volume:
volume['availability_zone'] = self.az
# Check for our special image-backed volume.
if volume_id in (
self.IMAGE_BACKED_VOL, self.IMAGE_WITH_TRAITS_BACKED_VOL,
):
# Make it a bootable volume.
volume['bootable'] = True
if volume_id == self.IMAGE_BACKED_VOL:
# Add the image_id metadata.
volume['volume_image_metadata'] = {
# There would normally be more image metadata in here.
'image_id': '155d900f-4e14-4e4c-a73d-069cbf4541e6'
}
elif volume_id == self.IMAGE_WITH_TRAITS_BACKED_VOL:
# Add the image_id metadata with traits.
volume['volume_image_metadata'] = {
'image_id': '155d900f-4e14-4e4c-a73d-069cbf4541e6',
"trait:HW_CPU_X86_SGX": "required",
}
return volume
def fake_migrate_volume_completion(
_self, context, old_volume_id, new_volume_id, error,
):
return {'save_volume_id': new_volume_id}
def _find_attachment(attachment_id):
"""Find attachment corresponding to ``attachment_id``.
:returns: A tuple of the volume ID, an attachment dict for the
given attachment ID, and a dict (keyed by attachment id) of
attachment dicts for the volume.
"""
for volume_id, attachments in self.volume_to_attachment.items():
for attachment in attachments.values():
if attachment_id == attachment['id']:
return volume_id, attachment, attachments
raise exception.VolumeAttachmentNotFound(
attachment_id=attachment_id)
def fake_attachment_create(
_self, context, volume_id, instance_uuid, connector=None,
mountpoint=None,
):
attachment_id = uuidutils.generate_uuid()
if self.attachment_error_id is not None:
attachment_id = self.attachment_error_id
attachment = {'id': attachment_id, 'connection_info': {'data': {}}}
self.volume_to_attachment[volume_id][attachment_id] = {
'id': attachment_id,
'instance_uuid': instance_uuid,
'connector': connector,
}
LOG.info(
'Created attachment %s for volume %s. Total attachments '
'for volume: %d',
attachment_id, volume_id,
len(self.volume_to_attachment[volume_id]))
return attachment
def fake_attachment_delete(_self, context, attachment_id):
# _find_attachment() returns a (volume_id, attachment, attachments) tuple
volume_id, attachment, attachments = (
_find_attachment(attachment_id))
del attachments[attachment_id]
LOG.info(
'Deleted attachment %s for volume %s. Total attachments '
'for volume: %d',
attachment_id, volume_id, len(attachments))
def fake_attachment_update(
_self, context, attachment_id, connector, mountpoint=None,
):
# Ensure the attachment exists
volume_id, attachment, attachments = _find_attachment(
attachment_id)
# Cinder will only allow one "connected" attachment per
# non-multiattach volume at a time.
if volume_id != self.MULTIATTACH_VOL:
for _attachment in attachments.values():
if _attachment['connector'] is not None:
raise exception.InvalidInput(
'Volume %s is already connected with attachment '
'%s on host %s' % (
volume_id, _attachment['id'],
_attachment['connector'].get('host')))
attachment['connector'] = connector
LOG.info('Updating volume attachment: %s', attachment_id)
attachment_ref = {
'id': attachment_id,
'connection_info': {
'driver_volume_type': 'fake',
'data': {
'foo': 'bar',
'target_lun': '1'
}
}
}
if attachment_id == self.SWAP_ERR_ATTACH_ID:
# This intentionally triggers a TypeError for the
# instance.volume_swap.error versioned notification tests.
attachment_ref = {'connection_info': ()}
return attachment_ref
def fake_attachment_get(_self, context, attachment_id):
# Ensure the attachment exists
_find_attachment(attachment_id)
attachment_ref = {
'driver_volume_type': 'fake_type',
'id': attachment_id,
'connection_info': {
'data': {
'foo': 'bar',
'target_lun': '1',
},
},
}
return attachment_ref
def fake_get_all_volume_types(*args, **kwargs):
return [{
# This is used in the 2.67 API sample test.
'id': '5f9204ec-3e94-4f27-9beb-fe7bb73b6eb9',
'name': 'lvm-1'
}]
def fake_attachment_complete(_self, _context, attachment_id):
# Ensure the attachment exists
_find_attachment(attachment_id)
LOG.info('Completing volume attachment: %s', attachment_id)
self.test.stub_out(
'nova.volume.cinder.API.attachment_create', fake_attachment_create)
self.test.stub_out(
'nova.volume.cinder.API.attachment_delete', fake_attachment_delete)
self.test.stub_out(
'nova.volume.cinder.API.attachment_update', fake_attachment_update)
self.test.stub_out(
'nova.volume.cinder.API.attachment_complete',
fake_attachment_complete)
self.test.stub_out(
'nova.volume.cinder.API.attachment_get', fake_attachment_get)
self.test.stub_out(
'nova.volume.cinder.API.begin_detaching',
lambda *args, **kwargs: None)
self.test.stub_out('nova.volume.cinder.API.get', fake_get)
self.test.stub_out(
'nova.volume.cinder.API.migrate_volume_completion',
fake_migrate_volume_completion)
self.test.stub_out(
'nova.volume.cinder.API.roll_detaching',
lambda *args, **kwargs: None)
self.test.stub_out(
'nova.volume.cinder.is_microversion_supported',
lambda ctxt, microversion: None)
self.test.stub_out(
'nova.volume.cinder.API.check_attached',
lambda *args, **kwargs: None)
self.test.stub_out(
'nova.volume.cinder.API.get_all_volume_types',
fake_get_all_volume_types)
def volume_ids_for_instance(self, instance_uuid):
for volume_id, attachments in self.volume_to_attachment.items():
for attachment in attachments.values():
if attachment['instance_uuid'] == instance_uuid:
# we might have multiple volumes attached to this instance
# so yield rather than return
yield volume_id
break
def attachment_ids_for_instance(self, instance_uuid):
attachment_ids = []
for volume_id, attachments in self.volume_to_attachment.items():
for attachment in attachments.values():
if attachment['instance_uuid'] == instance_uuid:
attachment_ids.append(attachment['id'])
return attachment_ids
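A minimal usage sketch for the fixture above. The test base class and the server-boot helper are hypothetical, not part of this commit; nova's real consumers are its functional tests:

from nova.tests.fixtures import CinderFixture


class BootFromVolumeTest(NovaFunctionalTestBase):  # hypothetical base class
    def setUp(self):
        super().setUp()
        # Stubs out nova.volume.cinder.API for the duration of the test.
        self.cinder = self.useFixture(CinderFixture(self))

    def test_volume_attachment_is_tracked(self):
        # _boot_server_from_volume is a hypothetical helper that boots a
        # server from the fixture's well-known image-backed volume.
        server = self._boot_server_from_volume(CinderFixture.IMAGE_BACKED_VOL)
        # The fixture records attachments, so tests can assert on them.
        self.assertEqual(
            [CinderFixture.IMAGE_BACKED_VOL],
            list(self.cinder.volume_ids_for_instance(server['id'])))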

nova/tests/fixtures/cyborg.py (new file, 184 lines)

@@ -0,0 +1,184 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import copy
import fixtures
def _get_device_profile(dp_name, trait):
dp = [
{
'name': dp_name,
'uuid': 'cbec22f3-ac29-444e-b4bb-98509f32faae',
'groups': [
{
'resources:FPGA': '1',
'trait:' + trait: 'required',
},
],
# Skipping links key in Cyborg API return value
},
]
return dp
def get_arqs(dp_name):
arq = {
'uuid': 'b59d34d3-787b-4fb0-a6b9-019cd81172f8',
'device_profile_name': dp_name,
'device_profile_group_id': 0,
'state': 'Initial',
'device_rp_uuid': None,
'hostname': None,
'instance_uuid': None,
'attach_handle_info': {},
'attach_handle_type': '',
}
bound_arq = copy.deepcopy(arq)
bound_arq.update({
'state': 'Bound',
'attach_handle_type': 'TEST_PCI',
'attach_handle_info': {
'bus': '0c',
'device': '0',
'domain': '0000',
'function': '0'
},
})
return [arq], [bound_arq]
class CyborgFixture(fixtures.Fixture):
"""Fixture that mocks Cyborg APIs used by nova/accelerator/cyborg.py"""
dp_name = 'fakedev-dp'
trait = 'CUSTOM_FAKE_DEVICE'
arq_list, bound_arq_list = get_arqs(dp_name)
# NOTE(Sundar): The bindings passed to the fake_bind_arqs() from the
# conductor are indexed by ARQ UUID and include the host name, device
# RP UUID and instance UUID. (See params to fake_bind_arqs below.)
#
# Later, when the compute manager calls fake_get_arqs_for_instance() with
# the instance UUID, the returned ARQs must contain the host name and
# device RP UUID. But these can vary from test to test.
#
# So, fake_bind_arqs() below takes bindings indexed by ARQ UUID and
# converts them to bindings indexed by instance UUID, which are then
# stored in the dict below. This dict looks like:
# { $instance_uuid: [
# {'hostname': $hostname,
# 'device_rp_uuid': $device_rp_uuid,
# 'arq_uuid': $arq_uuid
# }
# ]
# }
# Since it is indexed by instance UUID, and that is presumably unique
# across concurrently executing tests, this should be safe for
# concurrent access.
bindings_by_instance = {}
def setUp(self):
super().setUp()
self.mock_get_dp = self.useFixture(fixtures.MockPatch(
'nova.accelerator.cyborg._CyborgClient._get_device_profile_list',
return_value=_get_device_profile(self.dp_name, self.trait))).mock
self.mock_create_arqs = self.useFixture(fixtures.MockPatch(
'nova.accelerator.cyborg._CyborgClient._create_arqs',
return_value=self.arq_list)).mock
self.mock_bind_arqs = self.useFixture(fixtures.MockPatch(
'nova.accelerator.cyborg._CyborgClient.bind_arqs',
side_effect=self.fake_bind_arqs)).mock
self.mock_get_arqs = self.useFixture(fixtures.MockPatch(
'nova.accelerator.cyborg._CyborgClient.'
'get_arqs_for_instance',
side_effect=self.fake_get_arqs_for_instance)).mock
self.mock_del_arqs = self.useFixture(fixtures.MockPatch(
'nova.accelerator.cyborg._CyborgClient.'
'delete_arqs_for_instance',
side_effect=self.fake_delete_arqs_for_instance)).mock
@staticmethod
def fake_bind_arqs(bindings):
"""Simulate Cyborg ARQ bindings.
Since Nova calls Cyborg for binding on a per-instance basis, the
instance UUIDs would be the same for all ARQs in a single call.
This function converts bindings indexed by ARQ UUID to bindings
indexed by instance UUID, so that fake_get_arqs_for_instance can
retrieve them later.
:param bindings:
{ "$arq_uuid": {
"hostname": STRING
"device_rp_uuid": UUID
"instance_uuid": UUID
},
...
}
:returns: None
"""
binding_by_instance = collections.defaultdict(list)
for arq_uuid, arq_binding in bindings.items():
# instance_uuid is the same for all ARQs in a single call.
instance_uuid = arq_binding['instance_uuid']
newbinding = {
'hostname': arq_binding['hostname'],
'device_rp_uuid': arq_binding['device_rp_uuid'],
'arq_uuid': arq_uuid,
}
binding_by_instance[instance_uuid].append(newbinding)
CyborgFixture.bindings_by_instance.update(binding_by_instance)
@staticmethod
def fake_get_arqs_for_instance(instance_uuid, only_resolved=False):
"""Get list of bound ARQs for this instance.
This function uses bindings indexed by instance UUID to
populate the bound ARQ templates in CyborgFixture.bound_arq_list.
"""
arq_host_rp_list = CyborgFixture.bindings_by_instance.get(
instance_uuid)
if not arq_host_rp_list:
return []
# The above looks like:
# [{'hostname': $hostname,
# 'device_rp_uuid': $device_rp_uuid,
# 'arq_uuid': $arq_uuid
# }]
bound_arq_list = copy.deepcopy(CyborgFixture.bound_arq_list)
for arq in bound_arq_list:
match = [
(
arq_host_rp['hostname'],
arq_host_rp['device_rp_uuid'],
instance_uuid,
)
for arq_host_rp in arq_host_rp_list
if arq_host_rp['arq_uuid'] == arq['uuid']
]
# Only 1 ARQ UUID would match, so len(match) == 1
arq['hostname'], arq['device_rp_uuid'], arq['instance_uuid'] = (
match[0][0], match[0][1], match[0][2],
)
return bound_arq_list
@staticmethod
def fake_delete_arqs_for_instance(instance_uuid):
CyborgFixture.bindings_by_instance.pop(instance_uuid, None)
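A short sketch of the bind/get round-trip implemented by the static fakes above. The ARQ UUID is the one hard-coded in get_arqs(); the hostname, device RP UUID, and instance UUID values are made up for illustration:

# Bindings are keyed by ARQ UUID on the way in.
bindings = {
    'b59d34d3-787b-4fb0-a6b9-019cd81172f8': {
        'hostname': 'compute1',               # illustrative host name
        'device_rp_uuid': 'dcaf63a7-...',     # hypothetical RP UUID
        'instance_uuid': '3f7e0c22-...',      # hypothetical instance UUID
    },
}
CyborgFixture.fake_bind_arqs(bindings)

# On the way out, ARQs are looked up by instance UUID and carry the
# hostname and device RP UUID recorded at bind time.
arqs = CyborgFixture.fake_get_arqs_for_instance('3f7e0c22-...')
assert arqs[0]['hostname'] == 'compute1'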

nova/tests/fixtures/glance.py (new file, 335 lines)

@@ -0,0 +1,335 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import datetime
import fixtures
from oslo_log import log as logging
from oslo_utils import uuidutils
from nova import exception
from nova.objects import fields as obj_fields
from nova.tests.fixtures import nova as nova_fixtures
LOG = logging.getLogger(__name__)
class GlanceFixture(fixtures.Fixture):
"""A fixture for simulating Glance."""
# NOTE(justinsb): The OpenStack API can't upload an image?
# So, make sure we've got one...
timestamp = datetime.datetime(2011, 1, 1, 1, 2, 3)
image1 = {
'id': '155d900f-4e14-4e4c-a73d-069cbf4541e6',
'name': 'fakeimage123456',
'created_at': timestamp,
'updated_at': timestamp,
'deleted_at': None,
'deleted': False,
'status': 'active',
'is_public': False,
'container_format': 'raw',
'disk_format': 'raw',
'size': '25165824',
'min_ram': 0,
'min_disk': 0,
'protected': False,
'visibility': 'public',
'tags': ['tag1', 'tag2'],
'properties': {
'kernel_id': 'nokernel',
'ramdisk_id': 'nokernel',
'architecture': obj_fields.Architecture.X86_64,
},
}
image2 = {
'id': 'a2459075-d96c-40d5-893e-577ff92e721c',
'name': 'fakeimage123456',
'created_at': timestamp,
'updated_at': timestamp,
'deleted_at': None,
'deleted': False,
'status': 'active',
'is_public': True,
'container_format': 'ami',
'disk_format': 'ami',
'size': '58145823',
'min_ram': 0,
'min_disk': 0,
'protected': False,
'visibility': 'public',
'tags': [],
'properties': {
'kernel_id': 'nokernel',
'ramdisk_id': 'nokernel',
},
}
image3 = {
'id': '76fa36fc-c930-4bf3-8c8a-ea2a2420deb6',
'name': 'fakeimage123456',
'created_at': timestamp,
'updated_at': timestamp,
'deleted_at': None,
'deleted': False,
'status': 'active',
'is_public': True,
'container_format': 'bare',
'disk_format': 'raw',
'size': '83594576',
'min_ram': 0,
'min_disk': 0,
'protected': False,
'visibility': 'public',
'tags': ['tag3', 'tag4'],
'properties': {
'kernel_id': 'nokernel',
'ramdisk_id': 'nokernel',
'architecture': obj_fields.Architecture.X86_64,
},
}
image4 = {
'id': 'cedef40a-ed67-4d10-800e-17455edce175',
'name': 'fakeimage123456',
'created_at': timestamp,
'updated_at': timestamp,
'deleted_at': None,
'deleted': False,
'status': 'active',
'is_public': True,
'container_format': 'ami',
'disk_format': 'ami',
'size': '84035174',
'min_ram': 0,
'min_disk': 0,
'protected': False,
'visibility': 'public',
'tags': [],
'properties': {
'kernel_id': 'nokernel',
'ramdisk_id': 'nokernel',
},
}
image5 = {
'id': 'c905cedb-7281-47e4-8a62-f26bc5fc4c77',
'name': 'fakeimage123456',
'created_at': timestamp,
'updated_at': timestamp,
'deleted_at': None,
'deleted': False,
'status': 'active',
'is_public': True,
'container_format': 'ami',
'disk_format': 'ami',
'size': '26360814',
'min_ram': 0,
'min_disk': 0,
'protected': False,
'visibility': 'public',
'tags': [],
'properties': {
'kernel_id': '155d900f-4e14-4e4c-a73d-069cbf4541e6',
'ramdisk_id': None,
},
}
auto_disk_config_disabled_image = {
'id': 'a440c04b-79fa-479c-bed1-0b816eaec379',
'name': 'fakeimage6',
'created_at': timestamp,
'updated_at': timestamp,
'deleted_at': None,
'deleted': False,
'status': 'active',
'is_public': False,
'container_format': 'ova',
'disk_format': 'vhd',
'size': '49163826',
'min_ram': 0,
'min_disk': 0,
'protected': False,
'visibility': 'public',
'tags': [],
'properties': {
'kernel_id': 'nokernel',
'ramdisk_id': 'nokernel',
'architecture': obj_fields.Architecture.X86_64,
'auto_disk_config': 'False',
},
}
auto_disk_config_enabled_image = {
'id': '70a599e0-31e7-49b7-b260-868f441e862b',
'name': 'fakeimage7',
'created_at': timestamp,
'updated_at': timestamp,
'deleted_at': None,
'deleted': False,
'status': 'active',
'is_public': False,
'container_format': 'ova',
'disk_format': 'vhd',
'size': '74185822',
'min_ram': 0,
'min_disk': 0,
'protected': False,
'visibility': 'public',
'tags': [],
'properties': {
'kernel_id': 'nokernel',
'ramdisk_id': 'nokernel',
'architecture': obj_fields.Architecture.X86_64,
'auto_disk_config': 'True',
},
}
def __init__(self, test):
super().__init__()
self.test = test
self.images = {}
def setUp(self):
super().setUp()
self.test.useFixture(nova_fixtures.ConfPatcher(
group='glance', api_servers=['http://localhost:9292']))
self.test.stub_out(
'nova.image.glance.API.get_remote_image_service',
lambda context, image_href: (self, image_href))
self.test.stub_out(
'nova.image.glance.get_default_image_service',
lambda: self)
self.create(None, self.image1)
self.create(None, self.image2)
self.create(None, self.image3)
self.create(None, self.image4)
self.create(None, self.image5)
self.create(None, self.auto_disk_config_disabled_image)
self.create(None, self.auto_disk_config_enabled_image)
self._imagedata = {}
# TODO(bcwaldon): implement optional kwargs such as limit, sort_dir
def detail(self, context, **kwargs):
"""Return list of detailed image information."""
return copy.deepcopy(list(self.images.values()))
def download(
self, context, image_id, data=None, dst_path=None, trusted_certs=None,
):
self.show(context, image_id)
if data:
data.write(self._imagedata.get(image_id, b''))
elif dst_path:
with open(dst_path, 'wb') as data:
data.write(self._imagedata.get(image_id, b''))
def show(
self, context, image_id, include_locations=False, show_deleted=True,
):
"""Get data about specified image.
Returns a dict containing image data for the given opaque image id.
"""
image = self.images.get(str(image_id))
if image:
return copy.deepcopy(image)
LOG.warning(
'Unable to find image id %s. Have images: %s',
image_id, self.images)
raise exception.ImageNotFound(image_id=image_id)
def create(self, context, metadata, data=None):
"""Store the image data and return the new image id.
:raises: CouldNotUploadImage if the image already exists.
"""
image_id = str(metadata.get('id', uuidutils.generate_uuid()))
metadata['id'] = image_id
if image_id in self.images:
raise exception.CouldNotUploadImage(image_id=image_id)
image_meta = copy.deepcopy(metadata)
# Glance sets the size value when an image is created, so we
# need to do that here to fake things out if it's not provided
# by the caller. This is needed to avoid a KeyError in the
# image-size API.
if 'size' not in image_meta:
image_meta['size'] = None
# Similarly, Glance provides the status on the image once it's created
# and this is checked in the compute API when booting a server from
# this image, so we just fake it out to be 'active' even though this
# is mostly a lie on a newly created image.
if 'status' not in metadata:
image_meta['status'] = 'active'
# The owner of the image is by default the request context project_id.
if context and 'owner' not in image_meta.get('properties', {}):
# Note that normally "owner" is a top-level field in an image
# resource in glance but we have to fake this out for the images
# proxy API by throwing it into the generic "properties" dict.
image_meta.setdefault('properties', {})['owner'] = context.project_id
self.images[image_id] = image_meta
if data:
self._imagedata[image_id] = data.read()
return self.images[image_id]
def update(self, context, image_id, metadata, data=None,
purge_props=False):
"""Replace the contents of the given image with the new data.
:raises: ImageNotFound if the image does not exist.
"""
if not self.images.get(image_id):
raise exception.ImageNotFound(image_id=image_id)
if purge_props:
self.images[image_id] = copy.deepcopy(metadata)
else:
image = self.images[image_id]
try:
image['properties'].update(metadata.pop('properties'))
except KeyError:
pass
image.update(metadata)
return self.images[image_id]
def delete(self, context, image_id):
"""Delete the given image.
:raises: ImageNotFound if the image does not exist.
"""
removed = self.images.pop(image_id, None)
if not removed:
raise exception.ImageNotFound(image_id=image_id)
def get_location(self, context, image_id):
if image_id in self.images:
return 'fake_location'
return None
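A brief sketch of the fixture's in-memory image lifecycle, assuming it has been installed inside a TestCase (the surrounding test class is hypothetical):

# Inside a hypothetical TestCase:
self.glance = self.useFixture(GlanceFixture(self))

# Seven well-known images are pre-created in setUp().
self.assertEqual(7, len(self.glance.detail(None)))

# create() assigns an id and fakes out size/status/owner as needed.
image = self.glance.create(None, {'name': 'my-image'})
self.assertEqual('active', self.glance.show(None, image['id'])['status'])

# delete() removes it; a second delete raises ImageNotFound.
self.glance.delete(None, image['id'])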

nova/tests/fixtures/neutron.py (new file, 978 lines)

@@ -0,0 +1,978 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import copy
import random
import fixtures
from keystoneauth1 import adapter as ksa_adap
import mock
from neutronclient.common import exceptions as neutron_client_exc
import os_resource_classes as orc
from oslo_serialization import jsonutils
from oslo_utils import uuidutils
from nova import exception
from nova.network import constants as neutron_constants
from nova.network import model as network_model
from nova.tests.fixtures import nova as nova_fixtures
from nova.tests.unit import fake_requests
class _FakeNeutronClient:
"""Class representing a Neutron client which wraps a NeutronFixture.
This wrapper class stores an instance of a NeutronFixture and whether the
Neutron client is an admin client.
For supported methods (for example, list_ports), this class calls the
NeutronFixture's method with an additional 'is_admin' keyword argument
indicating whether the client is an admin client; the NeutronFixture
method handles it accordingly.
For all other methods, this wrapper class simply calls through to the
corresponding NeutronFixture class method without any modifications.
"""
def __init__(self, fixture, is_admin):
self.fixture = fixture
self.is_admin = is_admin
def __getattr__(self, name):
return getattr(self.fixture, name)
def list_ports(self, retrieve_all=True, **_params):
return self.fixture.list_ports(
self.is_admin, retrieve_all=retrieve_all, **_params,
)
# TODO(stephenfin): We should split out the stubs of neutronclient from the
# stubs of 'nova.network.neutron' to simplify matters
class NeutronFixture(fixtures.Fixture):
"""A fixture to boot instances with neutron ports"""
# the default project_id in OSAPIFixtures
tenant_id = nova_fixtures.PROJECT_ID
network_1 = {
'id': '3cb9bc59-5699-4588-a4b1-b87f96708bc6',
'name': 'private',
'description': '',
'status': 'ACTIVE',
'subnets': [],
'admin_state_up': True,
'tenant_id': tenant_id,
'project_id': tenant_id,
'shared': False,
'mtu': 1450,
'router:external': False,
'availability_zone_hints': [],
'availability_zones': [
'nova'
],
'port_security_enabled': True,
'ipv4_address_scope': None,
'ipv6_address_scope': None,
'provider:network_type': 'vxlan',
'provider:physical_network': None,
'provider:segmentation_id': 24,
}
security_group = {
'id': 'aec9df91-db1f-4e04-8ac6-e761d8461c53',
'name': 'default',
'description': 'Default security group',
'tenant_id': tenant_id,
'project_id': tenant_id,
'security_group_rules': [], # setup later
}
security_group_rule_ip4_ingress = {
'id': 'e62268aa-1a17-4ff4-ae77-ab348bfe13a7',
'description': None,
'direction': 'ingress',
'ethertype': 'IPv4',
'protocol': None,
'port_range_min': None,
'port_range_max': None,
'remote_group_id': 'aec9df91-db1f-4e04-8ac6-e761d8461c53',
'remote_ip_prefix': None,
'security_group_id': 'aec9df91-db1f-4e04-8ac6-e761d8461c53',
'tenant_id': tenant_id,
'project_id': tenant_id,
}
security_group_rule_ip4_egress = {
'id': 'adf54daf-2ff9-4462-a0b0-f226abd1db28',
'description': None,
'direction': 'egress',
'ethertype': 'IPv4',
'protocol': None,
'port_range_min': None,
'port_range_max': None,
'remote_group_id': None,
'remote_ip_prefix': None,
'security_group_id': 'aec9df91-db1f-4e04-8ac6-e761d8461c53',
'tenant_id': tenant_id,
'project_id': tenant_id,
}
security_group_rule_ip6_ingress = {
'id': 'c4194b5c-3b50-4d35-9247-7850766aee2b',
'description': None,
'direction': 'ingress',
'ethertype': 'IPv6',
'protocol': None,
'port_range_min': None,
'port_range_max': None,
'remote_group_id': 'aec9df91-db1f-4e04-8ac6-e761d8461c53',
'remote_ip_prefix': None,
'security_group_id': 'aec9df91-db1f-4e04-8ac6-e761d8461c53',
'tenant_id': tenant_id,
'project_id': tenant_id,
}
security_group_rule_ip6_egress = {
'id': '16ce6a83-a1db-4d66-a10d-9481d493b072',
'description': None,
'direction': 'egress',
'ethertype': 'IPv6',
'protocol': None,
'port_range_min': None,
'port_range_max': None,
'remote_group_id': None,
'remote_ip_prefix': None,
'security_group_id': 'aec9df91-db1f-4e04-8ac6-e761d8461c53',
'tenant_id': tenant_id,
'project_id': tenant_id,
}
security_group['security_group_rules'] = [
security_group_rule_ip4_ingress['id'],
security_group_rule_ip4_egress['id'],
security_group_rule_ip6_ingress['id'],
security_group_rule_ip6_egress['id'],
]
subnet_1 = {
'id': 'f8a6e8f8-c2ec-497c-9f23-da9616de54ef',
'name': 'private-subnet',
'description': '',
'ip_version': 4,
'ipv6_address_mode': None,
'ipv6_ra_mode': None,
'enable_dhcp': True,
'network_id': network_1['id'],
'tenant_id': tenant_id,
'project_id': tenant_id,
'dns_nameservers': [],
'gateway_ip': '192.168.1.1',
'allocation_pools': [
{
'start': '192.168.1.1',
'end': '192.168.1.254'
}
],
'host_routes': [],
'cidr': '192.168.1.1/24',
}
subnet_ipv6_1 = {
'id': 'f8fa37b7-c10a-44b8-a5fe-d2e65d40b403',
'name': 'ipv6-private-subnet',
'description': '',
'ip_version': 6,
'ipv6_address_mode': 'slaac',
'ipv6_ra_mode': 'slaac',
'enable_dhcp': True,
'network_id': network_1['id'],
'tenant_id': tenant_id,
'project_id': tenant_id,
'dns_nameservers': [],
'gateway_ip': 'fd37:44e8:ad06::1',
'allocation_pools': [
{
'start': 'fd37:44e8:ad06::2',
'end': 'fd37:44e8:ad06:0:ffff:ffff:ffff:ffff'
}
],
'host_routes': [],
'cidr': 'fd37:44e8:ad06::/64',
}
network_1['subnets'] = [subnet_1['id'], subnet_ipv6_1['id']]
port_1 = {
'id': 'ce531f90-199f-48c0-816c-13e38010b442',
'name': '', # yes, this is what the neutron API returns
'description': '',
'network_id': network_1['id'],
'admin_state_up': True,
'status': 'ACTIVE',
'mac_address': 'fa:16:3e:4c:2c:30',
'fixed_ips': [
{
# The IP on this port must be a prefix of the IP on port_2 to
# test listing servers with an ip filter regex.
'ip_address': '192.168.1.3',
'subnet_id': subnet_1['id']
}
],
'tenant_id': tenant_id,
'project_id': tenant_id,
'device_id': '',
'binding:profile': {},
'binding:vif_details': {},
'binding:vif_type': 'ovs',
'binding:vnic_type': 'normal',
'port_security_enabled': True,
'security_groups': [
security_group['id'],
],
}
port_2 = {
'id': '88dae9fa-0dc6-49e3-8c29-3abc41e99ac9',
'name': '',
'description': '',
'network_id': network_1['id'],
'admin_state_up': True,
'status': 'ACTIVE',
'mac_address': '00:0c:29:0d:11:74',
'fixed_ips': [
{
'ip_address': '192.168.1.30',
'subnet_id': subnet_1['id']
}
],
'tenant_id': tenant_id,
'project_id': tenant_id,
'device_id': '',
'binding:profile': {},
'binding:vif_details': {},
'binding:vif_type': 'ovs',
'binding:vnic_type': 'normal',
'port_security_enabled': True,
'security_groups': [
security_group['id'],
],
}
port_with_resource_request = {
'id': '2f2613ce-95a9-490a-b3c4-5f1c28c1f886',
'name': '',
'description': '',
'network_id': network_1['id'],
'admin_state_up': True,
'status': 'ACTIVE',
'mac_address': '52:54:00:1e:59:c3',
'fixed_ips': [
{
'ip_address': '192.168.1.42',
'subnet_id': subnet_1['id']
}
],
'tenant_id': tenant_id,
'project_id': tenant_id,
'device_id': '',
'binding:profile': {},
'binding:vif_details': {},
'binding:vif_type': 'ovs',
'binding:vnic_type': 'normal',
'resource_request': {
"resources": {
orc.NET_BW_IGR_KILOBIT_PER_SEC: 1000,
orc.NET_BW_EGR_KILOBIT_PER_SEC: 1000,
},
"required": ["CUSTOM_PHYSNET2", "CUSTOM_VNIC_TYPE_NORMAL"]
},
'port_security_enabled': True,
'security_groups': [
security_group['id'],
],
}
# Fixtures inheriting from NeutronFixture can override this variable to
# redefine the default port that create_port() duplicates when creating
# a new port
default_port = copy.deepcopy(port_2)
# network_2 does not have security groups enabled - that's okay since most
# of these ports are SR-IOV'y anyway
network_2 = {
'id': '1b70879f-fd00-411e-8ea9-143e7820e61d',
# TODO(stephenfin): A more distinctive name would be useful here due to
# things like https://bugs.launchpad.net/nova/+bug/1708316
'name': 'private',
'description': '',
'status': 'ACTIVE',
'subnets': [],
'admin_state_up': True,
'tenant_id': tenant_id,
'project_id': tenant_id,
'shared': False,
'mtu': 1450,
'router:external': False,
'availability_zone_hints': [],
'availability_zones': [
'nova'
],
'port_security_enabled': False,
'ipv4_address_scope': None,
'ipv6_address_scope': None,
'provider:network_type': 'vlan',
'provider:physical_network': 'physnet2',
'provider:segmentation_id': 24,
}
subnet_2 = {
'id': 'c7ca1baf-f536-4849-89fe-9671318375ff',
'name': '',
'description': '',
'ip_version': 4,
'ipv6_address_mode': None,
'ipv6_ra_mode': None,
'enable_dhcp': True,
'network_id': network_2['id'],
'tenant_id': tenant_id,
'project_id': tenant_id,
'dns_nameservers': [],
'gateway_ip': '192.168.1.1',
'allocation_pools': [
{
'start': '192.168.13.1',
'end': '192.168.1.254'
}
],
'host_routes': [],
'cidr': '192.168.1.1/24',
}
network_2['subnets'] = [subnet_2['id']]
sriov_port = {
'id': '5460ee0c-ffbb-4e45-8d58-37bfceabd084',
'name': '',
'description': '',
'network_id': network_2['id'],
'admin_state_up': True,
'status': 'ACTIVE',
'mac_address': '52:54:00:1e:59:c4',
'fixed_ips': [
{
'ip_address': '192.168.13.2',
'subnet_id': subnet_2['id']
}
],
'tenant_id': tenant_id,
'project_id': tenant_id,
'device_id': '',
'resource_request': {},
'binding:profile': {},
'binding:vif_details': {'vlan': 100},
'binding:vif_type': 'hw_veb',
'binding:vnic_type': 'direct',
'port_security_enabled': False,
}
sriov_port2 = {
'id': '3f675f19-8b2d-479d-9d42-054644a95a04',
'name': '',
'description': '',
'network_id': network_2['id'],
'admin_state_up': True,
'status': 'ACTIVE',
'mac_address': '52:54:00:1e:59:c5',
'fixed_ips': [
{
'ip_address': '192.168.13.2',
'subnet_id': subnet_2['id']
}
],
'tenant_id': tenant_id,
'project_id': tenant_id,
'device_id': '',
'resource_request': {},
'binding:profile': {},
'binding:vnic_type': 'direct',
'binding:vif_type': 'hw_veb',
'binding:vif_details': {'vlan': 100},
'port_security_enabled': False,
}
sriov_pf_port = {
'id': 'ce2a6ff9-573d-493e-9498-8100953e6f00',
'name': '',
'description': '',
'network_id': network_2['id'],
'admin_state_up': True,
'status': 'ACTIVE',
'mac_address': '52:54:00:1e:59:c6',
'fixed_ips': [
{
'ip_address': '192.168.13.2',
'subnet_id': subnet_2['id']
}
],
'tenant_id': tenant_id,
'project_id': tenant_id,
'device_id': '',
'resource_request': {},
'binding:profile': {},
'binding:vnic_type': 'direct-physical',
'binding:vif_type': 'hostdev_physical',
'binding:vif_details': {'vlan': 100},
'port_security_enabled': False,
}
sriov_pf_port2 = {
'id': 'ad2fd6c2-2c55-4c46-abdc-a8ec0d5f6a29',
'name': '',
'description': '',
'network_id': network_2['id'],
'admin_state_up': True,
'status': 'ACTIVE',
'mac_address': '52:54:00:1e:59:c7',
'fixed_ips': [
{
'ip_address': '192.168.13.2',
'subnet_id': subnet_2['id']
}
],
'tenant_id': tenant_id,
'project_id': tenant_id,
'device_id': '',
'resource_request': {},
'binding:profile': {},
'binding:vnic_type': 'direct-physical',
'binding:vif_type': 'hostdev_physical',
'binding:vif_details': {'vlan': 100},
'port_security_enabled': False,
}
macvtap_port = {
'id': '6eada1f1-6311-428c-a7a5-52b35cabc8fd',
'name': '',
'description': '',
'network_id': network_2['id'],
'admin_state_up': True,
'status': 'ACTIVE',
'mac_address': '52:54:00:1e:59:c8',
'fixed_ips': [
{
'ip_address': '192.168.13.4',
'subnet_id': subnet_2['id']
}
],
'tenant_id': tenant_id,
'project_id': tenant_id,
'device_id': '',
'binding:profile': {},
'binding:vnic_type': 'macvtap',
'binding:vif_type': 'hw_veb',
'binding:vif_details': {'vlan': 100},
'port_security_enabled': False,
}
macvtap_port2 = {
'id': 'fc79cc0c-93e9-4613-9f78-34c828d92e9f',
'name': '',
'description': '',
'network_id': network_2['id'],
'admin_state_up': True,
'status': 'ACTIVE',
'mac_address': '52:54:00:1e:59:c9',
'fixed_ips': [
{
'ip_address': '192.168.13.4',
'subnet_id': subnet_2['id']
}
],
'tenant_id': tenant_id,
'project_id': tenant_id,
'device_id': '',
'binding:profile': {},
'binding:vnic_type': 'macvtap',
'binding:vif_type': 'hw_veb',
'binding:vif_details': {'vlan': 100},
'port_security_enabled': False,
}
port_with_sriov_resource_request = {
'id': '7059503b-a648-40fd-a561-5ca769304bee',
'name': '',
'description': '',
'network_id': network_2['id'],
'admin_state_up': True,
'status': 'ACTIVE',
'mac_address': '52:54:00:1e:59:c5',
# Does neutron really add fixed_ips to a direct vnic_type port?
'fixed_ips': [
{
'ip_address': '192.168.13.3',
'subnet_id': subnet_2['id']
}
],
'tenant_id': tenant_id,
'project_id': tenant_id,
'device_id': '',
'resource_request': {
"resources": {
orc.NET_BW_IGR_KILOBIT_PER_SEC: 10000,
orc.NET_BW_EGR_KILOBIT_PER_SEC: 10000},
"required": ["CUSTOM_PHYSNET2", "CUSTOM_VNIC_TYPE_DIRECT"]
},
'binding:profile': {},
'binding:vif_details': {},
'binding:vif_type': 'hw_veb',
'binding:vnic_type': 'direct',
'port_security_enabled': False,
}
port_macvtap_with_resource_request = {
'id': 'cbb9707f-3559-4675-a973-4ea89c747f02',
'name': '',
'description': '',
'network_id': network_2['id'],
'admin_state_up': True,
'status': 'ACTIVE',
'mac_address': '52:54:00:1e:59:c6',
# Does neutron really add fixed_ips to a direct vnic_type port?
'fixed_ips': [
{
'ip_address': '192.168.13.4',
'subnet_id': subnet_2['id']
}
],
'tenant_id': tenant_id,
'project_id': tenant_id,
'device_id': '',
'resource_request': {
"resources": {
orc.NET_BW_IGR_KILOBIT_PER_SEC: 10000,
orc.NET_BW_EGR_KILOBIT_PER_SEC: 10000},
"required": ["CUSTOM_PHYSNET2", "CUSTOM_VNIC_TYPE_MACVTAP"]
},
'binding:profile': {},
'binding:vif_details': {},
'binding:vif_type': 'hw_veb',
'binding:vnic_type': 'macvtap',
'port_security_enabled': False,
}
nw_info = [{
"profile": {},
"ovs_interfaceid": "b71f1699-42be-4515-930a-f3ef01f94aa7",
"preserve_on_delete": False,
"network": {
"bridge": "br-int",
"subnets": [{
"ips": [{
"meta": {},
"version": 4,
"type": "fixed",
"floating_ips": [],
"address": "10.0.0.4"
}],
"version": 4,
"meta": {},
"dns": [],
"routes": [],
"cidr": "10.0.0.0/26",
"gateway": {
"meta": {},
"version": 4,
"type": "gateway",
"address": "10.0.0.1"
}
}],
"meta": {
"injected": False,
"tenant_id": tenant_id,
"mtu": 1500
},
"id": "e1882e38-38c2-4239-ade7-35d644cb963a",
"label": "public"
},
"devname": "tapb71f1699-42",
"vnic_type": "normal",
"qbh_params": None,
"meta": {},
"details": {
"port_filter": True,
"ovs_hybrid_plug": True
},
"address": "fa:16:3e:47:94:4a",
"active": True,
"type": "ovs",
"id": "b71f1699-42be-4515-930a-f3ef01f94aa7",
"qbg_params": None
}]
def __init__(self, test):
super().__init__()
self.test = test
# TODO(stephenfin): This should probably happen in setUp
# The fixture allows port update so we need to deepcopy the class
# variables to avoid test case interference.
self._ports = {
# NOTE(gibi): The port_with_sriov_resource_request cannot be added
# globally in this fixture as it adds a second network, which makes
# auto-allocation-based tests fail due to ambiguous networks.
self.port_1['id']: copy.deepcopy(self.port_1),
self.port_with_resource_request['id']:
copy.deepcopy(self.port_with_resource_request)
}
# Store multiple port bindings per port in a dict keyed by the host.
# At startup we assume that none of the ports are bound.
# {<port_id>: {<hostname>: <binding>}}
self._port_bindings = collections.defaultdict(dict)
# The fixture does not allow network, subnet or security group updates
# so we don't have to deepcopy here
self._networks = {
self.network_1['id']: self.network_1
}
self._subnets = {
self.subnet_1['id']: self.subnet_1,
self.subnet_ipv6_1['id']: self.subnet_ipv6_1,
}
self._security_groups = {
self.security_group['id']: self.security_group,
}
def setUp(self):
super().setUp()
# NOTE(gibi): This is the simplest way to unblock nova during live
# migration. A nicer way would be to actually send network-vif-plugged
# events to the nova-api from NeutronFixture when the port is bound, but
# calling the nova API from this fixture would need major surgery, and
# sending the event right at the binding request would mean the event
# arrives at nova before the compute manager starts waiting for it.
self.test.flags(vif_plugging_timeout=0)
self.test.stub_out(
'nova.network.neutron.API.add_fixed_ip_to_instance',
lambda *args, **kwargs: network_model.NetworkInfo.hydrate(
self.nw_info))
self.test.stub_out(
'nova.network.neutron.API.remove_fixed_ip_from_instance',
lambda *args, **kwargs: network_model.NetworkInfo.hydrate(
self.nw_info))
# Stub out port binding APIs which go through a KSA client Adapter
# rather than python-neutronclient.
self.test.stub_out(
'nova.network.neutron._get_ksa_client',
lambda *args, **kwargs: mock.Mock(
spec=ksa_adap.Adapter))
self.test.stub_out(
'nova.network.neutron.API._create_port_binding',
self.create_port_binding)
self.test.stub_out(
'nova.network.neutron.API._delete_port_binding',
self.delete_port_binding)
self.test.stub_out(
'nova.network.neutron.API._activate_port_binding',
self.activate_port_binding)
self.test.stub_out(
'nova.network.neutron.API._get_port_binding',
self.get_port_binding)
self.test.stub_out(
'nova.network.neutron.get_client', self._get_client)
def _get_client(self, context, admin=False):
# This logic is copied from nova.network.neutron._get_auth_plugin
admin = admin or context.is_admin and not context.auth_token
return _FakeNeutronClient(self, admin)
def create_port_binding(self, context, client, port_id, data):
if port_id not in self._ports:
return fake_requests.FakeResponse(
404, content='Port %s not found' % port_id)
port = self._ports[port_id]
binding = copy.deepcopy(data['binding'])
# NOTE(stephenfin): We don't allow changing of backend
binding['vif_type'] = port['binding:vif_type']
binding['vif_details'] = port['binding:vif_details']
binding['vnic_type'] = port['binding:vnic_type']
# the first binding is active by default
if not self._port_bindings[port_id]:
binding['status'] = 'ACTIVE'
else:
binding['status'] = 'INACTIVE'
self._port_bindings[port_id][binding['host']] = binding
return fake_requests.FakeResponse(
200, content=jsonutils.dumps({'binding': binding}),
)
def _get_failure_response_if_port_or_binding_not_exists(
self, port_id, host,
):
if port_id not in self._ports:
return fake_requests.FakeResponse(
404, content='Port %s not found' % port_id)
if host not in self._port_bindings[port_id]:
return fake_requests.FakeResponse(
404,
content='Binding for host %s for port %s not found'
% (host, port_id))
def delete_port_binding(self, context, client, port_id, host):
failure = self._get_failure_response_if_port_or_binding_not_exists(
port_id, host)
if failure is not None:
return failure
del self._port_bindings[port_id][host]
return fake_requests.FakeResponse(204)
def _activate_port_binding(self, port_id, host):
# Make sure that only one binding is active for a port at a time
for h, binding in self._port_bindings[port_id].items():
if h == host:
# NOTE(gibi): neutron returns 409 if this binding is already
# active but nova does not depend on this behaviour yet.
binding['status'] = 'ACTIVE'
else:
binding['status'] = 'INACTIVE'
def activate_port_binding(self, context, client, port_id, host):
failure = self._get_failure_response_if_port_or_binding_not_exists(
port_id, host)
if failure is not None:
return failure
self._activate_port_binding(port_id, host)
return fake_requests.FakeResponse(200)
def get_port_binding(self, context, client, port_id, host):
failure = self._get_failure_response_if_port_or_binding_not_exists(
port_id, host)
if failure is not None:
return failure
binding = {"binding": self._port_bindings[port_id][host]}
return fake_requests.FakeResponse(
200, content=jsonutils.dumps(binding))
def _list_resource(self, resources, retrieve_all, **_params):
# If 'fields' is passed we need to strip that out since it will mess
# up the filtering as 'fields' is not a filter parameter.
_params.pop('fields', None)
result = []
for resource in resources.values():
for key, val in _params.items():
# params can be strings or lists/tuples and these need to be
# handled differently
if isinstance(val, (list, tuple)):
if not any(resource.get(key) == v for v in val):
break
else:
if resource.get(key) != val:
break
else: # triggers if we didn't hit a break above
result.append(copy.deepcopy(resource))
return result
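# For example (illustrative values): given a resource {'status': 'ACTIVE'},
# passing status='ACTIVE' keeps it, and status=('ACTIVE', 'DOWN') also
# keeps it, since list/tuple values are OR-ed; any non-matching key hits
# 'break', which skips the loop's 'else' clause and drops the resource
# from the result.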
def list_extensions(self, *args, **kwargs):
return {
'extensions': [
{
# Copied from neutron-lib portbindings_extended.py
"updated": "2017-07-17T10:00:00-00:00",
"name": neutron_constants.PORT_BINDING_EXTENDED,
"links": [],
"alias": "binding-extended",
"description": "Expose port bindings of a virtual port to "
"external application"
}
]
}
def _get_active_binding(self, port_id):
for host, binding in self._port_bindings[port_id].items():
if binding['status'] == 'ACTIVE':
return host, copy.deepcopy(binding)
return None, {}
def _merge_in_active_binding(self, port):
"""Update the port dict with the currently active port binding"""
if port['id'] not in self._port_bindings:
return
_, binding = self._get_active_binding(port['id'])
for key, value in binding.items():
# Keys in the binding are like 'vnic_type' but in the port response
# they are like 'binding:vnic_type', except for the host ID, which is
# called 'host' in the binding but 'binding:host_id' in the port
# response.
if key != 'host':
port['binding:' + key] = value
else:
port['binding:host_id'] = binding['host']
def show_port(self, port_id, **_params):
if port_id not in self._ports:
raise exception.PortNotFound(port_id=port_id)
port = copy.deepcopy(self._ports[port_id])
self._merge_in_active_binding(port)
return {'port': port}
def delete_port(self, port_id, **_params):
if port_id in self._ports:
del self._ports[port_id]
# Not all flows use explicit binding creation by calling
# neutronv2.api.API.bind_ports_to_host(). Non-live-migration flows
# simply update the port to bind it, so we need to delete bindings
# conditionally
if port_id in self._port_bindings:
del self._port_bindings[port_id]
def list_ports(self, is_admin, retrieve_all=True, **_params):
ports = self._list_resource(self._ports, retrieve_all, **_params)
for port in ports:
self._merge_in_active_binding(port)
# Neutron returns None instead of the real resource_request if the
# ports are queried by a non-admin, so simulate this behavior here.
if not is_admin and 'resource_request' in port:
port['resource_request'] = None
return {'ports': ports}
def show_network(self, network_id, **_params):
if network_id not in self._networks:
raise neutron_client_exc.NetworkNotFoundClient()
return {'network': copy.deepcopy(self._networks[network_id])}
def list_networks(self, retrieve_all=True, **_params):
return {
'networks': self._list_resource(
self._networks, retrieve_all, **_params,
),
}
def show_subnet(self, subnet_id, **_params):
if subnet_id not in self._subnets:
raise neutron_client_exc.NeutronClientException()
return {'subnet': copy.deepcopy(self._subnets[subnet_id])}
def list_subnets(self, retrieve_all=True, **_params):
# NOTE(gibi): The fixture does not support filtering for subnets
return {'subnets': copy.deepcopy(list(self._subnets.values()))}
def list_floatingips(self, retrieve_all=True, **_params):
return {'floatingips': []}
def list_security_groups(self, retrieve_all=True, **_params):
return {
'security_groups': self._list_resource(
self._security_groups, retrieve_all, **_params,
),
}
def create_port(self, body=None):
body = body or {'port': {}}
# NOTE(gibi): Some of the tests expect that a pre-defined port,
# port_2, is created. So if that port has not been created yet, it is
# the one created here.
new_port = copy.deepcopy(body['port'])
new_port.update(copy.deepcopy(self.default_port))
if self.default_port['id'] in self._ports:
# If the port is already created then create a new port based on
# the request body, the default port as a template, and assign new
# port_id and mac_address for the new port
# we need truly random uuids instead of named sentinels as some
# tests need more than 3 ports
new_port.update({
'id': uuidutils.generate_uuid(),
'mac_address': '00:' + ':'.join(
['%02x' % random.randint(0, 255) for _ in range(5)]),
})
self._ports[new_port['id']] = new_port
# we need to copy again what we return as nova might modify the
# returned port locally and we don't want that to affect the port in
# the self._ports dict.
return {'port': copy.deepcopy(new_port)}
def update_port(self, port_id, body=None):
port = self._ports[port_id]
# We need to deepcopy here as well, since the body can have a nested
# dict which may be modified by the caller after this update_port call
port.update(copy.deepcopy(body['port']))
# update port binding
if (
'binding:host_id' in body['port'] and
body['port']['binding:host_id'] is None
):
# if the host_id is explicitly set to None, delete the binding
host, _ = self._get_active_binding(port_id)
del self._port_bindings[port_id][host]
else:
# else it's an update
if 'binding:host_id' in body['port']:
# if the host ID is present, update that specific binding
host = body['port']['binding:host_id']
else:
# else update the active one
host, _ = self._get_active_binding(port_id)
update = {
'host': host,
'status': 'ACTIVE',
'vif_type': port['binding:vif_type'],
'vnic_type': port['binding:vnic_type'],
}
if body['port'].get('binding:profile'):
update['profile'] = copy.deepcopy(
body['port']['binding:profile'])
if body['port'].get('binding:vif_details'):
update['vif_details'] = copy.deepcopy(
body['port']['binding:vif_details'])
self._port_bindings[port_id][host] = update
# mark any other active bindings as inactive
self._activate_port_binding(port_id, host)
return {'port': copy.deepcopy(port)}
def show_quota(self, project_id):
# unlimited quota
return {'quota': {'port': -1}}
def validate_auto_allocated_topology_requirements(self, project_id):
# from https://github.com/openstack/python-neutronclient/blob/6.14.0/
# neutronclient/v2_0/client.py#L2009-L2011
return self.get_auto_allocated_topology(project_id, fields=['dry-run'])
def get_auto_allocated_topology(self, project_id, **_params):
# from https://github.com/openstack/neutron/blob/14.0.0/
# neutron/services/auto_allocate/db.py#L134-L162
if _params == {'fields': ['dry-run']}:
return {'id': 'dry-run=pass', 'tenant_id': project_id}
return {
'auto_allocated_topology': {
'id': self.network_1['id'],
'tenant_id': project_id,
}
}
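A minimal sketch of driving the fixture from a test, assuming it has been installed with useFixture inside a TestCase (the surrounding test class is hypothetical):

# Inside a hypothetical TestCase:
self.neutron = self.useFixture(NeutronFixture(self))

# port_1 is pre-registered; show_port merges in the active binding, if any.
port = self.neutron.show_port(NeutronFixture.port_1['id'])['port']

# The first create_port() call materialises the default port (port_2);
# subsequent calls mint fresh random ids and MAC addresses.
new_port = self.neutron.create_port({'port': {}})['port']
assert new_port['id'] == NeutronFixture.port_2['id']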

File diff suppressed because it is too large.


@@ -68,6 +68,7 @@ from nova.policies import servers as servers_policy
from nova.scheduler import utils as scheduler_utils
from nova import test
from nova.tests import fixtures
+from nova.tests.fixtures import cyborg as cyborg_fixture
from nova.tests.unit.compute import eventlet_utils
from nova.tests.unit.compute import fake_resource_tracker
from nova.tests.unit import fake_block_device
@@ -3148,7 +3149,7 @@ class ComputeTestCase(BaseTestCase,
def test_reboot_with_accels_ok(self, mock_get_arqs):
dp_name = 'mydp'
extra_specs = {'accel:device_profile': dp_name}
-_, accel_info = fixtures.get_arqs(dp_name)
+_, accel_info = cyborg_fixture.get_arqs(dp_name)
mock_get_arqs.return_value = accel_info
instance_uuid = self._test_reboot_with_accels(


@@ -53,6 +53,7 @@ from nova.scheduler.client import query
from nova.scheduler import utils as scheduler_utils
from nova import test
from nova.tests import fixtures
+from nova.tests.fixtures import cyborg as cyborg_fixture
from nova.tests.unit.api.openstack import fakes
from nova.tests.unit.compute import test_compute
from nova.tests.unit import fake_build_request
@@ -2280,7 +2281,7 @@ class ConductorTaskTestCase(_BaseTaskTestCase, test_compute.BaseTestCase):
dp_name = 'mydp'
instance.flavor.extra_specs = {'accel:device_profile': dp_name}
-in_arq_list, _ = fixtures.get_arqs(dp_name)
+in_arq_list, _ = cyborg_fixture.get_arqs(dp_name)
mock_create.return_value = in_arq_list
self.conductor._create_and_bind_arqs(self.context,