Scenarios: remove trivial wrapper methods

These methods are harmful: they break the code-reading flow, they keep
reinventing the wheel, they give the impression we don't have consistent
names for methods, etc.

"There should be one-- and preferably only one --obvious way to do it."

Change-Id: I758f003c795b656823eb81503e6e052659b257bb
This commit is contained in:
Jordan Pittier 2016-01-26 17:10:55 +01:00
parent c70440dce4
commit bbb1712ed0
7 changed files with 23 additions and 59 deletions

View File

@ -1115,12 +1115,6 @@ class NetworkScenarioTest(ScenarioTest):
return rules
def _ssh_to_server(self, server, private_key):
ssh_login = CONF.validation.image_ssh_user
return self.get_remote_client(server,
username=ssh_login,
private_key=private_key)
def _get_router(self, client=None, tenant_id=None):
"""Retrieve a router for the given tenant id.

View File

@ -43,16 +43,14 @@ class TestAggregatesBasicOps(manager.ScenarioTest):
def _create_aggregate(self, **kwargs):
aggregate = (self.aggregates_client.create_aggregate(**kwargs)
['aggregate'])
self.addCleanup(self._delete_aggregate, aggregate)
self.addCleanup(self.aggregates_client.delete_aggregate,
aggregate['id'])
aggregate_name = kwargs['name']
availability_zone = kwargs['availability_zone']
self.assertEqual(aggregate['name'], aggregate_name)
self.assertEqual(aggregate['availability_zone'], availability_zone)
return aggregate
def _delete_aggregate(self, aggregate):
self.aggregates_client.delete_aggregate(aggregate['id'])
def _get_host_name(self):
hosts = self.hosts_client.list_hosts()['hosts']
self.assertTrue(len(hosts) >= 1)

View File

@ -62,12 +62,6 @@ class TestMinimumBasicScenario(manager.ScenarioTest):
server, custom_matchers.MatchesDictExceptForKeys(
got_server, excluded_keys=excluded_keys))
def cinder_create(self):
return self.create_volume()
def cinder_list(self):
return self.volumes_client.list_volumes()['volumes']
def cinder_show(self, volume):
got_volume = self.volumes_client.show_volume(volume['id'])['volume']
self.assertEqual(volume, got_volume)
@ -115,8 +109,8 @@ class TestMinimumBasicScenario(manager.ScenarioTest):
self.nova_show(server)
volume = self.cinder_create()
volumes = self.cinder_list()
volume = self.create_volume()
volumes = self.volumes_client.list_volumes()['volumes']
self.assertIn(volume['id'], [x['id'] for x in volumes])
self.cinder_show(volume)

View File

@ -240,8 +240,8 @@ class TestNetworkBasicOps(manager.NetworkScenarioTest):
old_floating_ip, server = self.floating_ip_tuple
ip_address = old_floating_ip.floating_ip_address
private_key = self._get_server_key(server)
ssh_client = self.get_remote_client(ip_address,
private_key=private_key)
ssh_client = self.get_remote_client(
ip_address, private_key=private_key)
old_nic_list = self._get_server_nics(ssh_client)
# get a port from a list of one item
port_list = self._list_ports(device_id=server['id'])
@ -336,7 +336,8 @@ class TestNetworkBasicOps(manager.NetworkScenarioTest):
should_connect=True):
ip_address = floating_ip.floating_ip_address
private_key = self._get_server_key(self.floating_ip_tuple.server)
ssh_source = self._ssh_to_server(ip_address, private_key)
ssh_source = self.get_remote_client(
ip_address, private_key=private_key)
for remote_ip in address_list:
if should_connect:
@ -553,7 +554,8 @@ class TestNetworkBasicOps(manager.NetworkScenarioTest):
floating_ip, server = self.floating_ip_tuple
ip_address = floating_ip.floating_ip_address
private_key = self._get_server_key(server)
ssh_client = self._ssh_to_server(ip_address, private_key)
ssh_client = self.get_remote_client(
ip_address, private_key=private_key)
dns_servers = [initial_dns_server]
servers = ssh_client.get_dns_servers()

View File

@ -321,8 +321,8 @@ class TestSecurityGroupsBasicOps(manager.NetworkScenarioTest):
access_point_ssh = \
self.floating_ips[tenant.access_point['id']].floating_ip_address
private_key = tenant.keypair['private_key']
access_point_ssh = self._ssh_to_server(access_point_ssh,
private_key=private_key)
access_point_ssh = self.get_remote_client(
access_point_ssh, private_key=private_key)
return access_point_ssh
def _check_connectivity(self, access_point, ip, should_succeed=True):

View File

@ -69,10 +69,7 @@ class TestServerBasicOps(manager.ScenarioTest):
image=self.image_ref, flavor=self.flavor_ref,
ssh=self.run_ssh, ssh_user=self.ssh_user))
def add_keypair(self):
self.keypair = self.create_keypair()
def verify_ssh(self):
def verify_ssh(self, keypair):
if self.run_ssh:
# Obtain a floating IP
self.fip = self.create_floating_ip(self.instance)['ip']
@ -80,7 +77,7 @@ class TestServerBasicOps(manager.ScenarioTest):
self.ssh_client = self.get_remote_client(
server_or_ip=self.fip,
username=self.image_utils.ssh_user(self.image_ref),
private_key=self.keypair['private_key'])
private_key=keypair['private_key'])
def verify_metadata(self):
if self.run_ssh and CONF.compute_feature_enabled.metadata_service:
@ -123,19 +120,19 @@ class TestServerBasicOps(manager.ScenarioTest):
@test.attr(type='smoke')
@test.services('compute', 'network')
def test_server_basicops(self):
self.add_keypair()
keypair = self.create_keypair()
self.security_group = self._create_security_group()
security_groups = [{'name': self.security_group['name']}]
self.md = {'meta1': 'data1', 'meta2': 'data2', 'metaN': 'dataN'}
self.instance = self.create_server(
image_id=self.image_ref,
flavor=self.flavor_ref,
key_name=self.keypair['name'],
key_name=keypair['name'],
security_groups=security_groups,
config_drive=CONF.compute_feature_enabled.config_drive,
metadata=self.md,
wait_until='ACTIVE')
self.verify_ssh()
self.verify_ssh(keypair)
self.verify_metadata()
self.verify_metadata_on_config_drive()
self.servers_client.delete_server(self.instance['id'])

View File

@ -60,10 +60,6 @@ class TestStampPattern(manager.ScenarioTest):
if not CONF.volume_feature_enabled.snapshot:
raise cls.skipException("Cinder volume snapshots are disabled")
def _wait_for_volume_snapshot_status(self, volume_snapshot, status):
self.snapshots_client.wait_for_snapshot_status(volume_snapshot['id'],
status)
def _create_volume_snapshot(self, volume):
snapshot_name = data_utils.rand_name('scenario-snapshot')
snapshot = self.snapshots_client.create_snapshot(
@ -78,29 +74,12 @@ class TestStampPattern(manager.ScenarioTest):
except lib_exc.NotFound:
pass
self.addCleanup(cleaner)
self._wait_for_volume_status(volume, 'available')
self.volumes_client.wait_for_volume_status(volume['id'], 'available')
self.snapshots_client.wait_for_snapshot_status(snapshot['id'],
'available')
self.assertEqual(snapshot_name, snapshot['display_name'])
return snapshot
def _wait_for_volume_status(self, volume, status):
self.volumes_client.wait_for_volume_status(volume['id'], status)
def _create_volume(self, snapshot_id=None):
return self.create_volume(snapshot_id=snapshot_id)
def _attach_volume(self, server, volume):
attached_volume = self.servers_client.attach_volume(
server['id'], volumeId=volume['id'], device='/dev/%s'
% CONF.compute.volume_device_name)['volumeAttachment']
self.assertEqual(volume['id'], attached_volume['id'])
self._wait_for_volume_status(attached_volume, 'in-use')
def _detach_volume(self, server, volume):
self.servers_client.detach_volume(server['id'], volume['id'])
self._wait_for_volume_status(volume, 'available')
def _wait_for_volume_available_on_the_system(self, server_or_ip,
private_key):
ssh = self.get_remote_client(server_or_ip, private_key=private_key)
@ -126,7 +105,7 @@ class TestStampPattern(manager.ScenarioTest):
security_group = self._create_security_group()
# boot an instance and create a timestamp file in it
volume = self._create_volume()
volume = self.create_volume()
server = self.create_server(
image_id=CONF.compute.image_ref,
key_name=keypair['name'],
@ -136,13 +115,13 @@ class TestStampPattern(manager.ScenarioTest):
# create and add floating IP to server1
ip_for_server = self.get_server_or_ip(server)
self._attach_volume(server, volume)
self.nova_volume_attach(server, volume)
self._wait_for_volume_available_on_the_system(ip_for_server,
keypair['private_key'])
timestamp = self.create_timestamp(ip_for_server,
CONF.compute.volume_device_name,
private_key=keypair['private_key'])
self._detach_volume(server, volume)
self.nova_volume_detach(server, volume)
# snapshot the volume
volume_snapshot = self._create_volume_snapshot(volume)
@ -151,7 +130,7 @@ class TestStampPattern(manager.ScenarioTest):
snapshot_image = self.create_server_snapshot(server=server)
# create second volume from the snapshot(volume2)
volume_from_snapshot = self._create_volume(
volume_from_snapshot = self.create_volume(
snapshot_id=volume_snapshot['id'])
# boot second instance from the snapshot(instance2)
@ -164,7 +143,7 @@ class TestStampPattern(manager.ScenarioTest):
ip_for_snapshot = self.get_server_or_ip(server_from_snapshot)
# attach volume2 to instance2
self._attach_volume(server_from_snapshot, volume_from_snapshot)
self.nova_volume_attach(server_from_snapshot, volume_from_snapshot)
self._wait_for_volume_available_on_the_system(ip_for_snapshot,
keypair['private_key'])