Merge "Add a validation resources fixture"
This commit is contained in:
@@ -12,6 +12,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import fixtures
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import excutils
|
||||
|
||||
@@ -340,3 +341,111 @@ def clear_validation_resources(clients, keypair=None, floating_ip=None,
|
||||
has_exception = exc
|
||||
if has_exception:
|
||||
raise has_exception
|
||||
|
||||
|
||||
class ValidationResourcesFixture(fixtures.Fixture):
|
||||
"""Fixture to provision and cleanup validation resources"""
|
||||
|
||||
DICT_KEYS = ['keypair', 'security_group', 'floating_ip']
|
||||
|
||||
def __init__(self, clients, keypair=False, floating_ip=False,
|
||||
security_group=False, security_group_rules=False,
|
||||
ethertype='IPv4', use_neutron=True, floating_network_id=None,
|
||||
floating_network_name=None):
|
||||
"""Create a ValidationResourcesFixture
|
||||
|
||||
Create a ValidationResourcesFixture fixtures, which provisions the
|
||||
resources required to be able to ping / ssh a virtual machine upon
|
||||
setUp and clears them out upon cleanup. Resources are keypair,
|
||||
security group, security group rules and a floating IP - depending
|
||||
on the params.
|
||||
|
||||
The fixture exposes a dictionary that includes provisioned resources.
|
||||
|
||||
:param clients: `tempest.lib.services.clients.ServiceClients` or of a
|
||||
subclass of it. Resources are provisioned using clients from
|
||||
`clients`.
|
||||
:param keypair: Whether to provision a keypair. Defaults to False.
|
||||
:param floating_ip: Whether to provision a floating IP.
|
||||
Defaults to False.
|
||||
:param security_group: Whether to provision a security group.
|
||||
Defaults to False.
|
||||
:param security_group_rules: Whether to provision security group rules.
|
||||
Defaults to False.
|
||||
:param ethertype: 'IPv4' or 'IPv6'. Honoured only if neutron is used.
|
||||
:param use_neutron: When True resources are provisioned via neutron,
|
||||
when False resources are provisioned via nova.
|
||||
:param floating_network_id: The id of the network used to provision a
|
||||
floating IP. Only used if a floating IP is requested in case
|
||||
neutron is used.
|
||||
:param floating_network_name: The name of the floating IP pool used to
|
||||
provision the floating IP. Only used if a floating IP is requested
|
||||
and with nova-net.
|
||||
:returns: A dictionary with the same keys as the input
|
||||
`validation_resources` and the resources for values in the format
|
||||
they are returned by the API.
|
||||
|
||||
Examples::
|
||||
|
||||
from tempest.common import validation_resources as vr
|
||||
from tempest.lib import auth
|
||||
from tempest.lib.services import clients
|
||||
import testtools
|
||||
|
||||
|
||||
class TestWithVR(testtools.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
creds = auth.get_credentials(
|
||||
'http://mycloud/identity/v3',
|
||||
username='me', project_name='me',
|
||||
password='secret', domain_name='Default')
|
||||
|
||||
osclients = clients.ServiceClients(
|
||||
creds, 'http://mycloud/identity/v3')
|
||||
# Request keypair and floating IP
|
||||
resources = dict(keypair=True, security_group=False,
|
||||
security_group_rules=False,
|
||||
floating_ip=True)
|
||||
network_id = '4240E68E-23DA-4C82-AC34-9FEFAA24521C'
|
||||
self.vr = self.useFixture(vr.ValidationResourcesFixture(
|
||||
osclients, use_neutron=True,
|
||||
floating_network_id=network_id,
|
||||
**resources)
|
||||
|
||||
def test_use_ip(self):
|
||||
# The floating IP to be attached to the VM
|
||||
floating_ip = self.vr['floating_ip']['ip']
|
||||
"""
|
||||
self._clients = clients
|
||||
self._keypair = keypair
|
||||
self._floating_ip = floating_ip
|
||||
self._security_group = security_group
|
||||
self._security_group_rules = security_group_rules
|
||||
self._ethertype = ethertype
|
||||
self._use_neutron = use_neutron
|
||||
self._floating_network_id = floating_network_id
|
||||
self._floating_network_name = floating_network_name
|
||||
self._validation_resources = None
|
||||
|
||||
def _setUp(self):
|
||||
self._validation_resources = create_validation_resources(
|
||||
self._clients, keypair=self._keypair,
|
||||
floating_ip=self._floating_ip,
|
||||
security_group=self._security_group,
|
||||
security_group_rules=self._security_group_rules,
|
||||
ethertype=self._ethertype, use_neutron=self._use_neutron,
|
||||
floating_network_id=self._floating_network_id,
|
||||
floating_network_name=self._floating_network_name)
|
||||
# If provisioning raises an exception we won't have anything to
|
||||
# cleanup here, so we don't need a try-finally around provisioning
|
||||
vr = self._validation_resources
|
||||
self.addCleanup(clear_validation_resources, self._clients,
|
||||
keypair=vr['keypair'],
|
||||
floating_ip=vr['floating_ip'],
|
||||
security_group=vr['security_group'],
|
||||
use_neutron=self._use_neutron)
|
||||
|
||||
@property
|
||||
def resources(self):
|
||||
return self._validation_resources
|
||||
|
||||
@@ -0,0 +1,344 @@
|
||||
# Copyright (c) 2017 IBM Corp.
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import fixtures
|
||||
import mock
|
||||
import testtools
|
||||
|
||||
from tempest.common import validation_resources as vr
|
||||
from tempest.lib import exceptions as lib_exc
|
||||
from tempest.lib.services import clients
|
||||
from tempest.tests import base
|
||||
from tempest.tests.lib import fake_credentials
|
||||
from tempest.tests.lib.services import registry_fixture
|
||||
|
||||
FAKE_SECURITY_GROUP = {'security_group': {'id': 'sg_id'}}
|
||||
FAKE_KEYPAIR = {'keypair': {'name': 'keypair_name'}}
|
||||
FAKE_FIP_NOVA_NET = {'floating_ip': {'ip': '1.2.3.4', 'id': '1234'}}
|
||||
FAKE_FIP_NEUTRON = {'floatingip': {'floating_ip_address': '1.2.3.4',
|
||||
'id': '1234'}}
|
||||
|
||||
SERVICES = 'tempest.lib.services'
|
||||
SG_CLIENT = (SERVICES + '.%s.security_groups_client.SecurityGroupsClient.%s')
|
||||
SGR_CLIENT = (SERVICES + '.%s.security_group_rules_client.'
|
||||
'SecurityGroupRulesClient.create_security_group_rule')
|
||||
KP_CLIENT = (SERVICES + '.compute.keypairs_client.KeyPairsClient.%s')
|
||||
FIP_CLIENT = (SERVICES + '.%s.floating_ips_client.FloatingIPsClient.%s')
|
||||
|
||||
|
||||
class TestValidationResources(base.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestValidationResources, self).setUp()
|
||||
self.useFixture(registry_fixture.RegistryFixture())
|
||||
self.mock_sg_compute = self.useFixture(fixtures.MockPatch(
|
||||
SG_CLIENT % ('compute', 'create_security_group'), autospec=True,
|
||||
return_value=FAKE_SECURITY_GROUP))
|
||||
self.mock_sg_network = self.useFixture(fixtures.MockPatch(
|
||||
SG_CLIENT % ('network', 'create_security_group'), autospec=True,
|
||||
return_value=FAKE_SECURITY_GROUP))
|
||||
self.mock_sgr_compute = self.useFixture(fixtures.MockPatch(
|
||||
SGR_CLIENT % 'compute', autospec=True))
|
||||
self.mock_sgr_network = self.useFixture(fixtures.MockPatch(
|
||||
SGR_CLIENT % 'network', autospec=True))
|
||||
self.mock_kp = self.useFixture(fixtures.MockPatch(
|
||||
KP_CLIENT % 'create_keypair', autospec=True,
|
||||
return_value=FAKE_KEYPAIR))
|
||||
self.mock_fip_compute = self.useFixture(fixtures.MockPatch(
|
||||
FIP_CLIENT % ('compute', 'create_floating_ip'), autospec=True,
|
||||
return_value=FAKE_FIP_NOVA_NET))
|
||||
self.mock_fip_network = self.useFixture(fixtures.MockPatch(
|
||||
FIP_CLIENT % ('network', 'create_floatingip'), autospec=True,
|
||||
return_value=FAKE_FIP_NEUTRON))
|
||||
self.os = clients.ServiceClients(
|
||||
fake_credentials.FakeKeystoneV3Credentials(), 'fake_uri')
|
||||
|
||||
def test_create_ssh_security_group_nova_net(self):
|
||||
expected_sg_id = FAKE_SECURITY_GROUP['security_group']['id']
|
||||
sg = vr.create_ssh_security_group(self.os, add_rule=True,
|
||||
use_neutron=False)
|
||||
self.assertEqual(FAKE_SECURITY_GROUP['security_group'], sg)
|
||||
# Neutron clients have not been used
|
||||
self.assertEqual(self.mock_sg_network.mock.call_count, 0)
|
||||
self.assertEqual(self.mock_sgr_network.mock.call_count, 0)
|
||||
# Nova-net clients assertions
|
||||
self.assertGreater(self.mock_sg_compute.mock.call_count, 0)
|
||||
self.assertGreater(self.mock_sgr_compute.mock.call_count, 0)
|
||||
for call in self.mock_sgr_compute.mock.call_args_list[1:]:
|
||||
self.assertIn(expected_sg_id, call[1].values())
|
||||
|
||||
def test_create_ssh_security_group_neutron(self):
|
||||
expected_sg_id = FAKE_SECURITY_GROUP['security_group']['id']
|
||||
expected_ethertype = 'fake_ethertype'
|
||||
sg = vr.create_ssh_security_group(self.os, add_rule=True,
|
||||
use_neutron=True,
|
||||
ethertype=expected_ethertype)
|
||||
self.assertEqual(FAKE_SECURITY_GROUP['security_group'], sg)
|
||||
# Nova-net clients have not been used
|
||||
self.assertEqual(self.mock_sg_compute.mock.call_count, 0)
|
||||
self.assertEqual(self.mock_sgr_compute.mock.call_count, 0)
|
||||
# Nova-net clients assertions
|
||||
self.assertGreater(self.mock_sg_network.mock.call_count, 0)
|
||||
self.assertGreater(self.mock_sgr_network.mock.call_count, 0)
|
||||
# Check SG ID and ethertype are passed down to rules
|
||||
for call in self.mock_sgr_network.mock.call_args_list[1:]:
|
||||
self.assertIn(expected_sg_id, call[1].values())
|
||||
self.assertIn(expected_ethertype, call[1].values())
|
||||
|
||||
def test_create_ssh_security_no_rules(self):
|
||||
sg = vr.create_ssh_security_group(self.os, add_rule=False)
|
||||
self.assertEqual(FAKE_SECURITY_GROUP['security_group'], sg)
|
||||
# SG Rules clients have not been used
|
||||
self.assertEqual(self.mock_sgr_compute.mock.call_count, 0)
|
||||
self.assertEqual(self.mock_sgr_network.mock.call_count, 0)
|
||||
|
||||
@mock.patch.object(vr, 'create_ssh_security_group',
|
||||
return_value=FAKE_SECURITY_GROUP['security_group'])
|
||||
def test_create_validation_resources_nova_net(self, mock_create_sg):
|
||||
expected_floating_network_id = 'my_fni'
|
||||
expected_floating_network_name = 'my_fnn'
|
||||
resources = vr.create_validation_resources(
|
||||
self.os, keypair=True, floating_ip=True, security_group=True,
|
||||
security_group_rules=True, ethertype='IPv6', use_neutron=False,
|
||||
floating_network_id=expected_floating_network_id,
|
||||
floating_network_name=expected_floating_network_name)
|
||||
# Keypair calls
|
||||
self.assertGreater(self.mock_kp.mock.call_count, 0)
|
||||
# Floating IP calls
|
||||
self.assertGreater(self.mock_fip_compute.mock.call_count, 0)
|
||||
for call in self.mock_fip_compute.mock.call_args_list[1:]:
|
||||
self.assertIn(expected_floating_network_name, call[1].values())
|
||||
self.assertNotIn(expected_floating_network_id, call[1].values())
|
||||
self.assertEqual(self.mock_fip_network.mock.call_count, 0)
|
||||
# SG calls
|
||||
mock_create_sg.assert_called_once()
|
||||
# Resources
|
||||
for resource in ['keypair', 'floating_ip', 'security_group']:
|
||||
self.assertIn(resource, resources)
|
||||
self.assertEqual(FAKE_KEYPAIR['keypair'], resources['keypair'])
|
||||
self.assertEqual(FAKE_SECURITY_GROUP['security_group'],
|
||||
resources['security_group'])
|
||||
self.assertEqual(FAKE_FIP_NOVA_NET['floating_ip'],
|
||||
resources['floating_ip'])
|
||||
|
||||
@mock.patch.object(vr, 'create_ssh_security_group',
|
||||
return_value=FAKE_SECURITY_GROUP['security_group'])
|
||||
def test_create_validation_resources_neutron(self, mock_create_sg):
|
||||
expected_floating_network_id = 'my_fni'
|
||||
expected_floating_network_name = 'my_fnn'
|
||||
resources = vr.create_validation_resources(
|
||||
self.os, keypair=True, floating_ip=True, security_group=True,
|
||||
security_group_rules=True, ethertype='IPv6', use_neutron=True,
|
||||
floating_network_id=expected_floating_network_id,
|
||||
floating_network_name=expected_floating_network_name)
|
||||
# Keypair calls
|
||||
self.assertGreater(self.mock_kp.mock.call_count, 0)
|
||||
# Floating IP calls
|
||||
self.assertEqual(self.mock_fip_compute.mock.call_count, 0)
|
||||
self.assertGreater(self.mock_fip_network.mock.call_count, 0)
|
||||
for call in self.mock_fip_compute.mock.call_args_list[1:]:
|
||||
self.assertIn(expected_floating_network_id, call[1].values())
|
||||
self.assertNotIn(expected_floating_network_name, call[1].values())
|
||||
# SG calls
|
||||
mock_create_sg.assert_called_once()
|
||||
# Resources
|
||||
for resource in ['keypair', 'floating_ip', 'security_group']:
|
||||
self.assertIn(resource, resources)
|
||||
self.assertEqual(FAKE_KEYPAIR['keypair'], resources['keypair'])
|
||||
self.assertEqual(FAKE_SECURITY_GROUP['security_group'],
|
||||
resources['security_group'])
|
||||
self.assertIn('ip', resources['floating_ip'])
|
||||
self.assertEqual(resources['floating_ip']['ip'],
|
||||
FAKE_FIP_NEUTRON['floatingip']['floating_ip_address'])
|
||||
self.assertEqual(resources['floating_ip']['id'],
|
||||
FAKE_FIP_NEUTRON['floatingip']['id'])
|
||||
|
||||
|
||||
class TestClearValidationResourcesFixture(base.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestClearValidationResourcesFixture, self).setUp()
|
||||
self.useFixture(registry_fixture.RegistryFixture())
|
||||
self.mock_sg_compute = self.useFixture(fixtures.MockPatch(
|
||||
SG_CLIENT % ('compute', 'delete_security_group'), autospec=True))
|
||||
self.mock_sg_network = self.useFixture(fixtures.MockPatch(
|
||||
SG_CLIENT % ('network', 'delete_security_group'), autospec=True))
|
||||
self.mock_sg_wait_compute = self.useFixture(fixtures.MockPatch(
|
||||
SG_CLIENT % ('compute', 'wait_for_resource_deletion'),
|
||||
autospec=True))
|
||||
self.mock_sg_wait_network = self.useFixture(fixtures.MockPatch(
|
||||
SG_CLIENT % ('network', 'wait_for_resource_deletion'),
|
||||
autospec=True))
|
||||
self.mock_kp = self.useFixture(fixtures.MockPatch(
|
||||
KP_CLIENT % 'delete_keypair', autospec=True))
|
||||
self.mock_fip_compute = self.useFixture(fixtures.MockPatch(
|
||||
FIP_CLIENT % ('compute', 'delete_floating_ip'), autospec=True))
|
||||
self.mock_fip_network = self.useFixture(fixtures.MockPatch(
|
||||
FIP_CLIENT % ('network', 'delete_floatingip'), autospec=True))
|
||||
self.os = clients.ServiceClients(
|
||||
fake_credentials.FakeKeystoneV3Credentials(), 'fake_uri')
|
||||
|
||||
def test_clear_validation_resources_nova_net(self):
|
||||
vr.clear_validation_resources(
|
||||
self.os,
|
||||
floating_ip=FAKE_FIP_NOVA_NET['floating_ip'],
|
||||
security_group=FAKE_SECURITY_GROUP['security_group'],
|
||||
keypair=FAKE_KEYPAIR['keypair'],
|
||||
use_neutron=False)
|
||||
self.assertGreater(self.mock_kp.mock.call_count, 0)
|
||||
for call in self.mock_kp.mock.call_args_list[1:]:
|
||||
self.assertIn(FAKE_KEYPAIR['keypair']['name'], call[1].values())
|
||||
self.assertGreater(self.mock_sg_compute.mock.call_count, 0)
|
||||
for call in self.mock_sg_compute.mock.call_args_list[1:]:
|
||||
self.assertIn(FAKE_SECURITY_GROUP['security_group']['id'],
|
||||
call[1].values())
|
||||
self.assertGreater(self.mock_sg_wait_compute.mock.call_count, 0)
|
||||
for call in self.mock_sg_wait_compute.mock.call_args_list[1:]:
|
||||
self.assertIn(FAKE_SECURITY_GROUP['security_group']['id'],
|
||||
call[1].values())
|
||||
self.assertEqual(self.mock_sg_network.mock.call_count, 0)
|
||||
self.assertEqual(self.mock_sg_wait_network.mock.call_count, 0)
|
||||
self.assertGreater(self.mock_fip_compute.mock.call_count, 0)
|
||||
for call in self.mock_fip_compute.mock.call_args_list[1:]:
|
||||
self.assertIn(FAKE_FIP_NOVA_NET['floating_ip']['id'],
|
||||
call[1].values())
|
||||
self.assertEqual(self.mock_fip_network.mock.call_count, 0)
|
||||
|
||||
def test_clear_validation_resources_neutron(self):
|
||||
vr.clear_validation_resources(
|
||||
self.os,
|
||||
floating_ip=FAKE_FIP_NEUTRON['floatingip'],
|
||||
security_group=FAKE_SECURITY_GROUP['security_group'],
|
||||
keypair=FAKE_KEYPAIR['keypair'],
|
||||
use_neutron=True)
|
||||
self.assertGreater(self.mock_kp.mock.call_count, 0)
|
||||
for call in self.mock_kp.mock.call_args_list[1:]:
|
||||
self.assertIn(FAKE_KEYPAIR['keypair']['name'], call[1].values())
|
||||
self.assertGreater(self.mock_sg_network.mock.call_count, 0)
|
||||
for call in self.mock_sg_network.mock.call_args_list[1:]:
|
||||
self.assertIn(FAKE_SECURITY_GROUP['security_group']['id'],
|
||||
call[1].values())
|
||||
self.assertGreater(self.mock_sg_wait_network.mock.call_count, 0)
|
||||
for call in self.mock_sg_wait_network.mock.call_args_list[1:]:
|
||||
self.assertIn(FAKE_SECURITY_GROUP['security_group']['id'],
|
||||
call[1].values())
|
||||
self.assertEqual(self.mock_sg_compute.mock.call_count, 0)
|
||||
self.assertEqual(self.mock_sg_wait_compute.mock.call_count, 0)
|
||||
self.assertGreater(self.mock_fip_network.mock.call_count, 0)
|
||||
for call in self.mock_fip_network.mock.call_args_list[1:]:
|
||||
self.assertIn(FAKE_FIP_NEUTRON['floatingip']['id'],
|
||||
call[1].values())
|
||||
self.assertEqual(self.mock_fip_compute.mock.call_count, 0)
|
||||
|
||||
def test_clear_validation_resources_exceptions(self):
|
||||
# Test that even with exceptions all cleanups are invoked and that only
|
||||
# the first exception is reported.
|
||||
# NOTE(andreaf) There's not way of knowing which exception is going to
|
||||
# be raised first unless we enforce which resource is cleared first,
|
||||
# which is not really interesting, but also not harmful. keypair first.
|
||||
self.mock_kp.mock.side_effect = Exception('keypair exception')
|
||||
self.mock_sg_network.mock.side_effect = Exception('sg exception')
|
||||
self.mock_fip_network.mock.side_effect = Exception('fip exception')
|
||||
with testtools.ExpectedException(Exception, value_re='keypair'):
|
||||
vr.clear_validation_resources(
|
||||
self.os,
|
||||
floating_ip=FAKE_FIP_NEUTRON['floatingip'],
|
||||
security_group=FAKE_SECURITY_GROUP['security_group'],
|
||||
keypair=FAKE_KEYPAIR['keypair'],
|
||||
use_neutron=True)
|
||||
# Clients calls are still made, but not the wait call
|
||||
self.assertGreater(self.mock_kp.mock.call_count, 0)
|
||||
self.assertGreater(self.mock_sg_network.mock.call_count, 0)
|
||||
self.assertGreater(self.mock_fip_network.mock.call_count, 0)
|
||||
|
||||
def test_clear_validation_resources_wait_not_found_wait(self):
|
||||
# Test that a not found on wait is not an exception
|
||||
self.mock_sg_wait_network.mock.side_effect = lib_exc.NotFound('yay')
|
||||
vr.clear_validation_resources(
|
||||
self.os,
|
||||
floating_ip=FAKE_FIP_NEUTRON['floatingip'],
|
||||
security_group=FAKE_SECURITY_GROUP['security_group'],
|
||||
keypair=FAKE_KEYPAIR['keypair'],
|
||||
use_neutron=True)
|
||||
# Clients calls are still made, but not the wait call
|
||||
self.assertGreater(self.mock_kp.mock.call_count, 0)
|
||||
self.assertGreater(self.mock_sg_network.mock.call_count, 0)
|
||||
self.assertGreater(self.mock_sg_wait_network.mock.call_count, 0)
|
||||
self.assertGreater(self.mock_fip_network.mock.call_count, 0)
|
||||
|
||||
def test_clear_validation_resources_wait_not_found_delete(self):
|
||||
# Test that a not found on delete is not an exception
|
||||
self.mock_kp.mock.side_effect = lib_exc.NotFound('yay')
|
||||
self.mock_sg_network.mock.side_effect = lib_exc.NotFound('yay')
|
||||
self.mock_fip_network.mock.side_effect = lib_exc.NotFound('yay')
|
||||
vr.clear_validation_resources(
|
||||
self.os,
|
||||
floating_ip=FAKE_FIP_NEUTRON['floatingip'],
|
||||
security_group=FAKE_SECURITY_GROUP['security_group'],
|
||||
keypair=FAKE_KEYPAIR['keypair'],
|
||||
use_neutron=True)
|
||||
# Clients calls are still made, but not the wait call
|
||||
self.assertGreater(self.mock_kp.mock.call_count, 0)
|
||||
self.assertGreater(self.mock_sg_network.mock.call_count, 0)
|
||||
self.assertEqual(self.mock_sg_wait_network.mock.call_count, 0)
|
||||
self.assertGreater(self.mock_fip_network.mock.call_count, 0)
|
||||
|
||||
|
||||
class TestValidationResourcesFixture(base.TestCase):
|
||||
|
||||
@mock.patch.object(vr, 'create_validation_resources', autospec=True)
|
||||
def test_use_fixture(self, mock_vr):
|
||||
exp_vr = dict(keypair='keypair',
|
||||
floating_ip='floating_ip',
|
||||
security_group='security_group')
|
||||
mock_vr.return_value = exp_vr
|
||||
exp_clients = 'clients'
|
||||
exp_parameters = dict(keypair=True, floating_ip=True,
|
||||
security_group=True, security_group_rules=True,
|
||||
ethertype='v6', use_neutron=True,
|
||||
floating_network_id='fnid',
|
||||
floating_network_name='fnname')
|
||||
# First mock cleanup
|
||||
self.useFixture(fixtures.MockPatchObject(
|
||||
vr, 'clear_validation_resources', autospec=True))
|
||||
# And then use vr fixture, so when the fixture is cleaned-up, the mock
|
||||
# is still there
|
||||
vr_fixture = self.useFixture(vr.ValidationResourcesFixture(
|
||||
exp_clients, **exp_parameters))
|
||||
# Assert vr have been provisioned
|
||||
mock_vr.assert_called_once_with(exp_clients, **exp_parameters)
|
||||
# Assert vr have been setup in the fixture
|
||||
self.assertEqual(exp_vr, vr_fixture.resources)
|
||||
|
||||
@mock.patch.object(vr, 'clear_validation_resources', autospec=True)
|
||||
@mock.patch.object(vr, 'create_validation_resources', autospec=True)
|
||||
def test_use_fixture_context(self, mock_vr, mock_clear):
|
||||
exp_vr = dict(keypair='keypair',
|
||||
floating_ip='floating_ip',
|
||||
security_group='security_group')
|
||||
mock_vr.return_value = exp_vr
|
||||
exp_clients = 'clients'
|
||||
exp_parameters = dict(keypair=True, floating_ip=True,
|
||||
security_group=True, security_group_rules=True,
|
||||
ethertype='v6', use_neutron=True,
|
||||
floating_network_id='fnid',
|
||||
floating_network_name='fnname')
|
||||
with vr.ValidationResourcesFixture(exp_clients,
|
||||
**exp_parameters) as vr_fixture:
|
||||
# Assert vr have been provisioned
|
||||
mock_vr.assert_called_once_with(exp_clients, **exp_parameters)
|
||||
# Assert vr have been setup in the fixture
|
||||
self.assertEqual(exp_vr, vr_fixture.resources)
|
||||
# After context manager is closed, clear is invoked
|
||||
exp_vr['use_neutron'] = exp_parameters['use_neutron']
|
||||
mock_clear.assert_called_once_with(exp_clients, **exp_vr)
|
||||
@@ -40,6 +40,7 @@ from tempest.tests import base
|
||||
from tempest.tests import fake_config
|
||||
from tempest.tests.lib import fake_http
|
||||
from tempest.tests.lib import fake_identity
|
||||
from tempest.tests.lib.services import registry_fixture
|
||||
|
||||
|
||||
class TestDynamicCredentialProvider(base.TestCase):
|
||||
@@ -62,6 +63,7 @@ class TestDynamicCredentialProvider(base.TestCase):
|
||||
def setUp(self):
|
||||
super(TestDynamicCredentialProvider, self).setUp()
|
||||
self.useFixture(fake_config.ConfigFixture())
|
||||
self.useFixture(registry_fixture.RegistryFixture())
|
||||
self.patchobject(config, 'TempestConfigPrivate',
|
||||
fake_config.FakePrivate)
|
||||
self.patchobject(self.token_client_class, 'raw_request',
|
||||
|
||||
@@ -32,6 +32,7 @@ from tempest.lib import exceptions as lib_exc
|
||||
from tempest.tests import base
|
||||
from tempest.tests import fake_config
|
||||
from tempest.tests.lib import fake_identity
|
||||
from tempest.tests.lib.services import registry_fixture
|
||||
|
||||
|
||||
class TestPreProvisionedCredentials(base.TestCase):
|
||||
@@ -92,9 +93,8 @@ class TestPreProvisionedCredentials(base.TestCase):
|
||||
return_value=self.test_accounts))
|
||||
self.useFixture(fixtures.MockPatch(
|
||||
'os.path.isfile', return_value=True))
|
||||
# NOTE(andreaf) Ensure config is loaded so service clients are
|
||||
# registered in the registry before tests
|
||||
config.service_client_config()
|
||||
# Make sure we leave the registry clean
|
||||
self.useFixture(registry_fixture.RegistryFixture())
|
||||
|
||||
def tearDown(self):
|
||||
super(TestPreProvisionedCredentials, self).tearDown()
|
||||
|
||||
@@ -0,0 +1,65 @@
|
||||
# Copyright 2017 IBM Corp.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import fixtures
|
||||
|
||||
from tempest.lib.services import clients
|
||||
|
||||
|
||||
class RegistryFixture(fixtures.Fixture):
|
||||
"""A fixture to setup a test client registry
|
||||
|
||||
The clients registry is a singleton. In Tempest it's filled with
|
||||
content from configuration. When testing Tempest lib classes without
|
||||
configuration it's handy to have the registry setup to be able to access
|
||||
service client factories.
|
||||
|
||||
This fixture sets up the registry using a fake plugin, which includes all
|
||||
services specified at __init__ time. Any other plugin in the registry
|
||||
is removed at setUp time. The fake plugin is removed from the registry
|
||||
on cleanup.
|
||||
"""
|
||||
|
||||
PLUGIN_NAME = 'fake_plugin_for_test'
|
||||
|
||||
def __init__(self):
|
||||
"""Initialise the registry fixture"""
|
||||
self.services = set(['compute', 'identity.v2', 'identity.v3',
|
||||
'image.v1', 'image.v2', 'network', 'volume.v1',
|
||||
'volume.v2', 'volume.v3'])
|
||||
|
||||
def _setUp(self):
|
||||
# Cleanup the registry
|
||||
registry = clients.ClientsRegistry()
|
||||
registry._service_clients = {}
|
||||
# Prepare the clients for registration
|
||||
all_clients = []
|
||||
service_clients = clients.tempest_modules()
|
||||
for sc in self.services:
|
||||
sc_module = service_clients[sc]
|
||||
sc_unversioned = sc.split('.')[0]
|
||||
sc_name = sc.replace('.', '_')
|
||||
# Pass the bare minimum params to satisfy the clients interface
|
||||
service_client_data = dict(
|
||||
name=sc_name, service_version=sc, service=sc_unversioned,
|
||||
module_path=sc_module.__name__,
|
||||
client_names=sc_module.__all__)
|
||||
all_clients.append(service_client_data)
|
||||
registry.register_service_client(self.PLUGIN_NAME, all_clients)
|
||||
|
||||
def _cleanup():
|
||||
del registry._service_clients[self.PLUGIN_NAME]
|
||||
|
||||
self.addCleanup(_cleanup)
|
||||
@@ -17,9 +17,16 @@ from tempest.lib.services import clients
|
||||
from tempest.test_discover import plugins
|
||||
from tempest.tests import base
|
||||
from tempest.tests import fake_tempest_plugin as fake_plugin
|
||||
from tempest.tests.lib.services import registry_fixture
|
||||
|
||||
|
||||
class TestPluginDiscovery(base.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestPluginDiscovery, self).setUp()
|
||||
# Make sure we leave the registry clean
|
||||
self.useFixture(registry_fixture.RegistryFixture())
|
||||
|
||||
def test_load_tests_with_one_plugin(self):
|
||||
# we can't mock stevedore since it's a singleton and already executed
|
||||
# during test discovery. So basically this test covers the plugin loop
|
||||
|
||||
Reference in New Issue
Block a user