Rename demo_cloud to user_cloud

The operator cloud is "operator_cloud" which is clear. Even though the
devstack user is called "demo" - the point of the non-operator cloud is
that it's an end-user connection. Rename demo to user to remove any
confusion/ambiguity. (also, it's the same number of letters so pep8
doesn't get borked)

Change-Id: Icfd2dd1687099798ed2cdf9349cc5b7d27dade58
This commit is contained in:
Monty Taylor 2017-02-11 08:37:24 -06:00
parent d2df08ecd8
commit eee55a8966
No known key found for this signature in database
GPG Key ID: 7BAE94BC7141A594
19 changed files with 284 additions and 284 deletions

View File

@ -27,7 +27,7 @@ class BaseFunctionalTestCase(base.TestCase):
self.config = occ.OpenStackConfig()
demo_config = self.config.get_one_cloud(cloud=demo_name)
self.demo_cloud = shade.OpenStackCloud(
self.user_cloud = shade.OpenStackCloud(
cloud_config=demo_config,
log_inner_exceptions=True)
operator_config = self.config.get_one_cloud(cloud=op_name)

View File

@ -29,7 +29,7 @@ class TestClusterTemplate(base.BaseFunctionalTestCase):
def setUp(self):
super(TestClusterTemplate, self).setUp()
if not self.demo_cloud.has_service('container-infra'):
if not self.user_cloud.has_service('container-infra'):
self.skipTest('Container service not supported by cloud')
self.ct = None
@ -58,10 +58,10 @@ class TestClusterTemplate(base.BaseFunctionalTestCase):
# add keypair to nova
with open('%s/id_rsa_shade.pub' % ssh_directory) as f:
key_content = f.read()
self.demo_cloud.create_keypair('testkey', key_content)
self.user_cloud.create_keypair('testkey', key_content)
# Test we can create a cluster_template and we get it returned
self.ct = self.demo_cloud.create_cluster_template(
self.ct = self.user_cloud.create_cluster_template(
name=name, image_id=image_id,
keypair_id=keypair_id, coe=coe)
self.assertEqual(self.ct['name'], name)
@ -74,40 +74,40 @@ class TestClusterTemplate(base.BaseFunctionalTestCase):
self.assertEqual(self.ct['server_type'], server_type)
# Test that we can list cluster_templates
cluster_templates = self.demo_cloud.list_cluster_templates()
cluster_templates = self.user_cloud.list_cluster_templates()
self.assertIsNotNone(cluster_templates)
# Test we get the same cluster_template with the
# get_cluster_template method
cluster_template_get = self.demo_cloud.get_cluster_template(
cluster_template_get = self.user_cloud.get_cluster_template(
self.ct['uuid'])
self.assertEqual(cluster_template_get['uuid'], self.ct['uuid'])
# Test the get method also works by name
cluster_template_get = self.demo_cloud.get_cluster_template(name)
cluster_template_get = self.user_cloud.get_cluster_template(name)
self.assertEqual(cluster_template_get['name'], self.ct['name'])
# Test we can update a field on the cluster_template and only that
# field is updated
cluster_template_update = self.demo_cloud.update_cluster_template(
cluster_template_update = self.user_cloud.update_cluster_template(
self.ct['uuid'], 'replace', tls_disabled=True)
self.assertEqual(
cluster_template_update['uuid'], self.ct['uuid'])
self.assertEqual(cluster_template_update['tls_disabled'], True)
# Test we can delete and get True returned
cluster_template_delete = self.demo_cloud.delete_cluster_template(
cluster_template_delete = self.user_cloud.delete_cluster_template(
self.ct['uuid'])
self.assertTrue(cluster_template_delete)
def cleanup(self, name):
if self.ct:
try:
self.demo_cloud.delete_cluster_template(self.ct['name'])
self.user_cloud.delete_cluster_template(self.ct['name'])
except:
pass
# delete keypair
self.demo_cloud.delete_keypair('testkey')
self.user_cloud.delete_keypair('testkey')
os.unlink('/tmp/.ssh/id_rsa_shade')
os.unlink('/tmp/.ssh/id_rsa_shade.pub')

View File

@ -28,10 +28,10 @@ from shade import _utils
class TestCompute(base.BaseFunctionalTestCase):
def setUp(self):
super(TestCompute, self).setUp()
self.flavor = pick_flavor(self.demo_cloud.list_flavors())
self.flavor = pick_flavor(self.user_cloud.list_flavors())
if self.flavor is None:
self.assertFalse('no sensible flavor available')
self.image = pick_image(self.demo_cloud.list_images())
self.image = pick_image(self.user_cloud.list_images())
if self.image is None:
self.assertFalse('no sensible image available')
self.server_name = self.getUniqueString()
@ -44,18 +44,18 @@ class TestCompute(base.BaseFunctionalTestCase):
a server can start the process of deleting a volume if it is booted
from that volume. This encapsulates that logic.
"""
server = self.demo_cloud.get_server(server_name)
server = self.user_cloud.get_server(server_name)
if not server:
return
volumes = self.demo_cloud.get_volumes(server)
self.demo_cloud.delete_server(server.name, wait=True)
volumes = self.user_cloud.get_volumes(server)
self.user_cloud.delete_server(server.name, wait=True)
for volume in volumes:
if volume.status != 'deleting':
self.demo_cloud.delete_volume(volume.id, wait=True)
self.user_cloud.delete_volume(volume.id, wait=True)
def test_create_and_delete_server(self):
self.addCleanup(self._cleanup_servers_and_volumes, self.server_name)
server = self.demo_cloud.create_server(
server = self.user_cloud.create_server(
name=self.server_name,
image=self.image,
flavor=self.flavor,
@ -65,12 +65,12 @@ class TestCompute(base.BaseFunctionalTestCase):
self.assertEqual(self.flavor.id, server['flavor']['id'])
self.assertIsNotNone(server['adminPass'])
self.assertTrue(
self.demo_cloud.delete_server(self.server_name, wait=True))
self.assertIsNone(self.demo_cloud.get_server(self.server_name))
self.user_cloud.delete_server(self.server_name, wait=True))
self.assertIsNone(self.user_cloud.get_server(self.server_name))
def test_list_all_servers(self):
self.addCleanup(self._cleanup_servers_and_volumes, self.server_name)
server = self.demo_cloud.create_server(
server = self.user_cloud.create_server(
name=self.server_name,
image=self.image,
flavor=self.flavor,
@ -87,12 +87,12 @@ class TestCompute(base.BaseFunctionalTestCase):
# Normal users are not allowed to pass all_projects=True
self.assertRaises(
exc.OpenStackCloudException,
self.demo_cloud.list_servers,
self.user_cloud.list_servers,
all_projects=True)
def test_create_server_image_flavor_dict(self):
self.addCleanup(self._cleanup_servers_and_volumes, self.server_name)
server = self.demo_cloud.create_server(
server = self.user_cloud.create_server(
name=self.server_name,
image={'id': self.image.id},
flavor={'id': self.flavor.id},
@ -102,33 +102,33 @@ class TestCompute(base.BaseFunctionalTestCase):
self.assertEqual(self.flavor.id, server['flavor']['id'])
self.assertIsNotNone(server['adminPass'])
self.assertTrue(
self.demo_cloud.delete_server(self.server_name, wait=True))
self.assertIsNone(self.demo_cloud.get_server(self.server_name))
self.user_cloud.delete_server(self.server_name, wait=True))
self.assertIsNone(self.user_cloud.get_server(self.server_name))
def test_get_server_console(self):
self.addCleanup(self._cleanup_servers_and_volumes, self.server_name)
server = self.demo_cloud.create_server(
server = self.user_cloud.create_server(
name=self.server_name,
image=self.image,
flavor=self.flavor,
wait=True)
for _ in _utils._iterate_timeout(
5, "Did not get more than 0 lines in the console log"):
log = self.demo_cloud.get_server_console(server=server)
log = self.user_cloud.get_server_console(server=server)
self.assertTrue(isinstance(log, six.string_types))
if len(log) > 0:
break
def test_get_server_console_name_or_id(self):
self.addCleanup(self._cleanup_servers_and_volumes, self.server_name)
self.demo_cloud.create_server(
self.user_cloud.create_server(
name=self.server_name,
image=self.image,
flavor=self.flavor,
wait=True)
for _ in _utils._iterate_timeout(
5, "Did not get more than 0 lines in the console log"):
log = self.demo_cloud.get_server_console(server=self.server_name)
log = self.user_cloud.get_server_console(server=self.server_name)
self.assertTrue(isinstance(log, six.string_types))
if len(log) > 0:
break
@ -136,12 +136,12 @@ class TestCompute(base.BaseFunctionalTestCase):
def test_get_server_console_bad_server(self):
self.assertRaises(
exc.OpenStackCloudException,
self.demo_cloud.get_server_console,
self.user_cloud.get_server_console,
server=self.server_name)
def test_create_and_delete_server_with_admin_pass(self):
self.addCleanup(self._cleanup_servers_and_volumes, self.server_name)
server = self.demo_cloud.create_server(
server = self.user_cloud.create_server(
name=self.server_name,
image=self.image,
flavor=self.flavor,
@ -152,26 +152,26 @@ class TestCompute(base.BaseFunctionalTestCase):
self.assertEqual(self.flavor.id, server['flavor']['id'])
self.assertEqual(server['adminPass'], 'sheiqu9loegahSh')
self.assertTrue(
self.demo_cloud.delete_server(self.server_name, wait=True))
self.assertIsNone(self.demo_cloud.get_server(self.server_name))
self.user_cloud.delete_server(self.server_name, wait=True))
self.assertIsNone(self.user_cloud.get_server(self.server_name))
def test_get_image_id(self):
self.assertEqual(
self.image.id, self.demo_cloud.get_image_id(self.image.id))
self.image.id, self.user_cloud.get_image_id(self.image.id))
self.assertEqual(
self.image.id, self.demo_cloud.get_image_id(self.image.name))
self.image.id, self.user_cloud.get_image_id(self.image.name))
def test_get_image_name(self):
self.assertEqual(
self.image.name, self.demo_cloud.get_image_name(self.image.id))
self.image.name, self.user_cloud.get_image_name(self.image.id))
self.assertEqual(
self.image.name, self.demo_cloud.get_image_name(self.image.name))
self.image.name, self.user_cloud.get_image_name(self.image.name))
def _assert_volume_attach(self, server, volume_id=None, image=''):
self.assertEqual(self.server_name, server['name'])
self.assertEqual(image, server['image'])
self.assertEqual(self.flavor.id, server['flavor']['id'])
volumes = self.demo_cloud.get_volumes(server)
volumes = self.user_cloud.get_volumes(server)
self.assertEqual(1, len(volumes))
volume = volumes[0]
if volume_id:
@ -183,10 +183,10 @@ class TestCompute(base.BaseFunctionalTestCase):
return volume_id
def test_create_boot_from_volume_image(self):
if not self.demo_cloud.has_service('volume'):
if not self.user_cloud.has_service('volume'):
self.skipTest('volume service not supported by cloud')
self.addCleanup(self._cleanup_servers_and_volumes, self.server_name)
server = self.demo_cloud.create_server(
server = self.user_cloud.create_server(
name=self.server_name,
image=self.image,
flavor=self.flavor,
@ -194,21 +194,21 @@ class TestCompute(base.BaseFunctionalTestCase):
volume_size=1,
wait=True)
volume_id = self._assert_volume_attach(server)
volume = self.demo_cloud.get_volume(volume_id)
volume = self.user_cloud.get_volume(volume_id)
self.assertIsNotNone(volume)
self.assertEqual(volume['name'], volume['display_name'])
self.assertEqual(True, volume['bootable'])
self.assertEqual(server['id'], volume['attachments'][0]['server_id'])
self.assertTrue(self.demo_cloud.delete_server(server.id, wait=True))
self.assertTrue(self.demo_cloud.delete_volume(volume.id, wait=True))
self.assertIsNone(self.demo_cloud.get_server(server.id))
self.assertIsNone(self.demo_cloud.get_volume(volume.id))
self.assertTrue(self.user_cloud.delete_server(server.id, wait=True))
self.assertTrue(self.user_cloud.delete_volume(volume.id, wait=True))
self.assertIsNone(self.user_cloud.get_server(server.id))
self.assertIsNone(self.user_cloud.get_volume(volume.id))
def test_create_terminate_volume_image(self):
if not self.demo_cloud.has_service('volume'):
if not self.user_cloud.has_service('volume'):
self.skipTest('volume service not supported by cloud')
self.addCleanup(self._cleanup_servers_and_volumes, self.server_name)
server = self.demo_cloud.create_server(
server = self.user_cloud.create_server(
name=self.server_name,
image=self.image,
flavor=self.flavor,
@ -218,21 +218,21 @@ class TestCompute(base.BaseFunctionalTestCase):
wait=True)
volume_id = self._assert_volume_attach(server)
self.assertTrue(
self.demo_cloud.delete_server(self.server_name, wait=True))
volume = self.demo_cloud.get_volume(volume_id)
self.user_cloud.delete_server(self.server_name, wait=True))
volume = self.user_cloud.get_volume(volume_id)
# We can either get None (if the volume delete was quick), or a volume
# that is in the process of being deleted.
if volume:
self.assertEqual('deleting', volume.status)
self.assertIsNone(self.demo_cloud.get_server(self.server_name))
self.assertIsNone(self.user_cloud.get_server(self.server_name))
def test_create_boot_from_volume_preexisting(self):
if not self.demo_cloud.has_service('volume'):
if not self.user_cloud.has_service('volume'):
self.skipTest('volume service not supported by cloud')
self.addCleanup(self._cleanup_servers_and_volumes, self.server_name)
volume = self.demo_cloud.create_volume(
volume = self.user_cloud.create_volume(
size=1, name=self.server_name, image=self.image, wait=True)
server = self.demo_cloud.create_server(
server = self.user_cloud.create_server(
name=self.server_name,
image=None,
flavor=self.flavor,
@ -241,25 +241,25 @@ class TestCompute(base.BaseFunctionalTestCase):
wait=True)
volume_id = self._assert_volume_attach(server, volume_id=volume['id'])
self.assertTrue(
self.demo_cloud.delete_server(self.server_name, wait=True))
self.addCleanup(self.demo_cloud.delete_volume, volume_id)
volume = self.demo_cloud.get_volume(volume_id)
self.user_cloud.delete_server(self.server_name, wait=True))
self.addCleanup(self.user_cloud.delete_volume, volume_id)
volume = self.user_cloud.get_volume(volume_id)
self.assertIsNotNone(volume)
self.assertEqual(volume['name'], volume['display_name'])
self.assertEqual(True, volume['bootable'])
self.assertEqual([], volume['attachments'])
self.assertTrue(self.demo_cloud.delete_volume(volume_id))
self.assertIsNone(self.demo_cloud.get_server(self.server_name))
self.assertIsNone(self.demo_cloud.get_volume(volume_id))
self.assertTrue(self.user_cloud.delete_volume(volume_id))
self.assertIsNone(self.user_cloud.get_server(self.server_name))
self.assertIsNone(self.user_cloud.get_volume(volume_id))
def test_create_boot_attach_volume(self):
if not self.demo_cloud.has_service('volume'):
if not self.user_cloud.has_service('volume'):
self.skipTest('volume service not supported by cloud')
self.addCleanup(self._cleanup_servers_and_volumes, self.server_name)
volume = self.demo_cloud.create_volume(
volume = self.user_cloud.create_volume(
size=1, name=self.server_name, image=self.image, wait=True)
self.addCleanup(self.demo_cloud.delete_volume, volume['id'])
server = self.demo_cloud.create_server(
self.addCleanup(self.user_cloud.delete_volume, volume['id'])
server = self.user_cloud.create_server(
name=self.server_name,
flavor=self.flavor,
image=self.image,
@ -269,22 +269,22 @@ class TestCompute(base.BaseFunctionalTestCase):
volume_id = self._assert_volume_attach(
server, volume_id=volume['id'], image={'id': self.image['id']})
self.assertTrue(
self.demo_cloud.delete_server(self.server_name, wait=True))
volume = self.demo_cloud.get_volume(volume_id)
self.user_cloud.delete_server(self.server_name, wait=True))
volume = self.user_cloud.get_volume(volume_id)
self.assertIsNotNone(volume)
self.assertEqual(volume['name'], volume['display_name'])
self.assertEqual([], volume['attachments'])
self.assertTrue(self.demo_cloud.delete_volume(volume_id))
self.assertIsNone(self.demo_cloud.get_server(self.server_name))
self.assertIsNone(self.demo_cloud.get_volume(volume_id))
self.assertTrue(self.user_cloud.delete_volume(volume_id))
self.assertIsNone(self.user_cloud.get_server(self.server_name))
self.assertIsNone(self.user_cloud.get_volume(volume_id))
def test_create_boot_from_volume_preexisting_terminate(self):
if not self.demo_cloud.has_service('volume'):
if not self.user_cloud.has_service('volume'):
self.skipTest('volume service not supported by cloud')
self.addCleanup(self._cleanup_servers_and_volumes, self.server_name)
volume = self.demo_cloud.create_volume(
volume = self.user_cloud.create_volume(
size=1, name=self.server_name, image=self.image, wait=True)
server = self.demo_cloud.create_server(
server = self.user_cloud.create_server(
name=self.server_name,
image=None,
flavor=self.flavor,
@ -294,69 +294,69 @@ class TestCompute(base.BaseFunctionalTestCase):
wait=True)
volume_id = self._assert_volume_attach(server, volume_id=volume['id'])
self.assertTrue(
self.demo_cloud.delete_server(self.server_name, wait=True))
volume = self.demo_cloud.get_volume(volume_id)
self.user_cloud.delete_server(self.server_name, wait=True))
volume = self.user_cloud.get_volume(volume_id)
# We can either get None (if the volume delete was quick), or a volume
# that is in the process of being deleted.
if volume:
self.assertEqual('deleting', volume.status)
self.assertIsNone(self.demo_cloud.get_server(self.server_name))
self.assertIsNone(self.user_cloud.get_server(self.server_name))
def test_create_image_snapshot_wait_active(self):
self.addCleanup(self._cleanup_servers_and_volumes, self.server_name)
server = self.demo_cloud.create_server(
server = self.user_cloud.create_server(
name=self.server_name,
image=self.image,
flavor=self.flavor,
admin_pass='sheiqu9loegahSh',
wait=True)
image = self.demo_cloud.create_image_snapshot('test-snapshot', server,
image = self.user_cloud.create_image_snapshot('test-snapshot', server,
wait=True)
self.addCleanup(self.demo_cloud.delete_image, image['id'])
self.addCleanup(self.user_cloud.delete_image, image['id'])
self.assertEqual('active', image['status'])
def test_set_and_delete_metadata(self):
self.addCleanup(self._cleanup_servers_and_volumes, self.server_name)
self.demo_cloud.create_server(
self.user_cloud.create_server(
name=self.server_name,
image=self.image,
flavor=self.flavor,
wait=True)
self.demo_cloud.set_server_metadata(self.server_name,
self.user_cloud.set_server_metadata(self.server_name,
{'key1': 'value1',
'key2': 'value2'})
updated_server = self.demo_cloud.get_server(self.server_name)
updated_server = self.user_cloud.get_server(self.server_name)
self.assertEqual(set(updated_server.metadata.items()),
set({'key1': 'value1', 'key2': 'value2'}.items()))
self.demo_cloud.set_server_metadata(self.server_name,
self.user_cloud.set_server_metadata(self.server_name,
{'key2': 'value3'})
updated_server = self.demo_cloud.get_server(self.server_name)
updated_server = self.user_cloud.get_server(self.server_name)
self.assertEqual(set(updated_server.metadata.items()),
set({'key1': 'value1', 'key2': 'value3'}.items()))
self.demo_cloud.delete_server_metadata(self.server_name, ['key2'])
updated_server = self.demo_cloud.get_server(self.server_name)
self.user_cloud.delete_server_metadata(self.server_name, ['key2'])
updated_server = self.user_cloud.get_server(self.server_name)
self.assertEqual(set(updated_server.metadata.items()),
set({'key1': 'value1'}.items()))
self.demo_cloud.delete_server_metadata(self.server_name, ['key1'])
updated_server = self.demo_cloud.get_server(self.server_name)
self.user_cloud.delete_server_metadata(self.server_name, ['key1'])
updated_server = self.user_cloud.get_server(self.server_name)
self.assertEqual(set(updated_server.metadata.items()), set([]))
self.assertRaises(
exc.OpenStackCloudException,
self.demo_cloud.delete_server_metadata,
self.user_cloud.delete_server_metadata,
self.server_name, ['key1'])
def test_update_server(self):
self.addCleanup(self._cleanup_servers_and_volumes, self.server_name)
self.demo_cloud.create_server(
self.user_cloud.create_server(
name=self.server_name,
image=self.image,
flavor=self.flavor,
wait=True)
server_updated = self.demo_cloud.update_server(
server_updated = self.user_cloud.update_server(
self.server_name,
name='new_name'
)

View File

@ -39,7 +39,7 @@ class TestDevstack(base.BaseFunctionalTestCase):
def test_has_service(self):
if os.environ.get('SHADE_HAS_{env}'.format(env=self.env), '0') == '1':
self.assertTrue(self.demo_cloud.has_service(self.service))
self.assertTrue(self.user_cloud.has_service(self.service))
class TestKeystoneVersion(base.BaseFunctionalTestCase):

View File

@ -114,7 +114,7 @@ class TestFlavor(base.BaseFunctionalTestCase):
new_flavor = self.operator_cloud.create_flavor(**private_kwargs)
# Validate the 'demo' user cannot see the new flavor
flavors = self.demo_cloud.search_flavors(priv_flavor_name)
flavors = self.user_cloud.search_flavors(priv_flavor_name)
self.assertEqual(0, len(flavors))
# We need the tenant ID for the 'demo' user
@ -125,7 +125,7 @@ class TestFlavor(base.BaseFunctionalTestCase):
self.operator_cloud.add_flavor_access(new_flavor['id'], project['id'])
# Now see if the 'demo' user has access to it
flavors = self.demo_cloud.search_flavors(priv_flavor_name)
flavors = self.user_cloud.search_flavors(priv_flavor_name)
self.assertEqual(1, len(flavors))
self.assertEqual(priv_flavor_name, flavors[0]['name'])
@ -138,7 +138,7 @@ class TestFlavor(base.BaseFunctionalTestCase):
# Now revoke the access and make sure we can't find it
self.operator_cloud.remove_flavor_access(new_flavor['id'],
project['id'])
flavors = self.demo_cloud.search_flavors(priv_flavor_name)
flavors = self.user_cloud.search_flavors(priv_flavor_name)
self.assertEqual(0, len(flavors))
def test_set_unset_flavor_specs(self):

View File

@ -36,9 +36,9 @@ class TestFloatingIP(base.BaseFunctionalTestCase):
def setUp(self):
super(TestFloatingIP, self).setUp()
self.nova = self.demo_cloud.nova_client
if self.demo_cloud.has_service('network'):
self.neutron = self.demo_cloud.neutron_client
self.nova = self.user_cloud.nova_client
if self.user_cloud.has_service('network'):
self.neutron = self.user_cloud.neutron_client
self.flavor = pick_flavor(self.nova.flavors.list())
if self.flavor is None:
self.assertFalse('no sensible flavor available')
@ -56,9 +56,9 @@ class TestFloatingIP(base.BaseFunctionalTestCase):
exception_list = list()
# Delete stale networks as well as networks created for this test
if self.demo_cloud.has_service('network'):
if self.user_cloud.has_service('network'):
# Delete routers
for r in self.demo_cloud.list_routers():
for r in self.user_cloud.list_routers():
try:
if r['name'].startswith(self.new_item_name):
# ToDo: update_router currently won't allow removing
@ -69,7 +69,7 @@ class TestFloatingIP(base.BaseFunctionalTestCase):
self.neutron.update_router(
router=r['id'], body={'router': router})
# ToDo: Shade currently doesn't have methods for this
for s in self.demo_cloud.list_subnets():
for s in self.user_cloud.list_subnets():
if s['name'].startswith(self.new_item_name):
try:
self.neutron.remove_interface_router(
@ -77,23 +77,23 @@ class TestFloatingIP(base.BaseFunctionalTestCase):
body={'subnet_id': s['id']})
except Exception:
pass
self.demo_cloud.delete_router(name_or_id=r['id'])
self.user_cloud.delete_router(name_or_id=r['id'])
except Exception as e:
exception_list.append(str(e))
continue
# Delete subnets
for s in self.demo_cloud.list_subnets():
for s in self.user_cloud.list_subnets():
if s['name'].startswith(self.new_item_name):
try:
self.demo_cloud.delete_subnet(name_or_id=s['id'])
self.user_cloud.delete_subnet(name_or_id=s['id'])
except Exception as e:
exception_list.append(str(e))
continue
# Delete networks
for n in self.demo_cloud.list_networks():
for n in self.user_cloud.list_networks():
if n['name'].startswith(self.new_item_name):
try:
self.demo_cloud.delete_network(name_or_id=n['id'])
self.user_cloud.delete_network(name_or_id=n['id'])
except Exception as e:
exception_list.append(str(e))
continue
@ -131,11 +131,11 @@ class TestFloatingIP(base.BaseFunctionalTestCase):
fixed_ip = meta.get_server_private_ip(server)
for ip in self.demo_cloud.list_floating_ips():
for ip in self.user_cloud.list_floating_ips():
if (ip.get('fixed_ip', None) == fixed_ip
or ip.get('fixed_ip_address', None) == fixed_ip):
try:
self.demo_cloud.delete_floating_ip(ip['id'])
self.user_cloud.delete_floating_ip(ip['id'])
except Exception as e:
exception_list.append(str(e))
continue
@ -146,24 +146,24 @@ class TestFloatingIP(base.BaseFunctionalTestCase):
raise OpenStackCloudException('\n'.join(exception_list))
def _setup_networks(self):
if self.demo_cloud.has_service('network'):
if self.user_cloud.has_service('network'):
# Create a network
self.test_net = self.demo_cloud.create_network(
self.test_net = self.user_cloud.create_network(
name=self.new_item_name + '_net')
# Create a subnet on it
self.test_subnet = self.demo_cloud.create_subnet(
self.test_subnet = self.user_cloud.create_subnet(
subnet_name=self.new_item_name + '_subnet',
network_name_or_id=self.test_net['id'],
cidr='10.24.4.0/24',
enable_dhcp=True
)
# Create a router
self.test_router = self.demo_cloud.create_router(
self.test_router = self.user_cloud.create_router(
name=self.new_item_name + '_router')
# Attach the router to an external network
ext_nets = self.demo_cloud.search_networks(
ext_nets = self.user_cloud.search_networks(
filters={'router:external': True})
self.demo_cloud.update_router(
self.user_cloud.update_router(
name_or_id=self.test_router['id'],
ext_gateway_net_id=ext_nets[0]['id'])
# Attach the router to the internal subnet
@ -176,10 +176,10 @@ class TestFloatingIP(base.BaseFunctionalTestCase):
self.addDetail(
'networks-neutron',
content.text_content(pprint.pformat(
self.demo_cloud.list_networks())))
self.user_cloud.list_networks())))
else:
# ToDo: remove once we have list/get methods for nova networks
nets = self.demo_cloud.nova_client.networks.list()
nets = self.user_cloud.nova_client.networks.list()
self.addDetail(
'networks-nova',
content.text_content(pprint.pformat(
@ -189,8 +189,8 @@ class TestFloatingIP(base.BaseFunctionalTestCase):
def test_private_ip(self):
self._setup_networks()
new_server = self.demo_cloud.get_openstack_vars(
self.demo_cloud.create_server(
new_server = self.user_cloud.get_openstack_vars(
self.user_cloud.create_server(
wait=True, name=self.new_item_name + '_server',
image=self.image,
flavor=self.flavor, nics=[self.nic]))
@ -202,7 +202,7 @@ class TestFloatingIP(base.BaseFunctionalTestCase):
def test_add_auto_ip(self):
self._setup_networks()
new_server = self.demo_cloud.create_server(
new_server = self.user_cloud.create_server(
wait=True, name=self.new_item_name + '_server',
image=self.image,
flavor=self.flavor, nics=[self.nic])
@ -212,17 +212,17 @@ class TestFloatingIP(base.BaseFunctionalTestCase):
ip = None
for _ in _utils._iterate_timeout(
self.timeout, "Timeout waiting for IP address to be attached"):
ip = meta.get_server_external_ipv4(self.demo_cloud, new_server)
ip = meta.get_server_external_ipv4(self.user_cloud, new_server)
if ip is not None:
break
new_server = self.demo_cloud.get_server(new_server.id)
new_server = self.user_cloud.get_server(new_server.id)
self.addCleanup(self._cleanup_ips, new_server)
def test_detach_ip_from_server(self):
self._setup_networks()
new_server = self.demo_cloud.create_server(
new_server = self.user_cloud.create_server(
wait=True, name=self.new_item_name + '_server',
image=self.image,
flavor=self.flavor, nics=[self.nic])
@ -232,29 +232,29 @@ class TestFloatingIP(base.BaseFunctionalTestCase):
ip = None
for _ in _utils._iterate_timeout(
self.timeout, "Timeout waiting for IP address to be attached"):
ip = meta.get_server_external_ipv4(self.demo_cloud, new_server)
ip = meta.get_server_external_ipv4(self.user_cloud, new_server)
if ip is not None:
break
new_server = self.demo_cloud.get_server(new_server.id)
new_server = self.user_cloud.get_server(new_server.id)
self.addCleanup(self._cleanup_ips, new_server)
f_ip = self.demo_cloud.get_floating_ip(
f_ip = self.user_cloud.get_floating_ip(
id=None, filters={'floating_ip_address': ip})
self.demo_cloud.detach_ip_from_server(
self.user_cloud.detach_ip_from_server(
server_id=new_server.id, floating_ip_id=f_ip['id'])
def test_list_floating_ips(self):
fip_admin = self.operator_cloud.create_floating_ip()
self.addCleanup(self.operator_cloud.delete_floating_ip, fip_admin.id)
fip_user = self.demo_cloud.create_floating_ip()
self.addCleanup(self.demo_cloud.delete_floating_ip, fip_user.id)
fip_user = self.user_cloud.create_floating_ip()
self.addCleanup(self.user_cloud.delete_floating_ip, fip_user.id)
# Get all the floating ips.
fip_id_list = [
fip.id for fip in self.operator_cloud.list_floating_ips()
]
if self.demo_cloud.has_service('network'):
if self.user_cloud.has_service('network'):
# Neutron returns all FIP for all projects by default
self.assertIn(fip_admin.id, fip_id_list)
self.assertIn(fip_user.id, fip_id_list)
@ -262,7 +262,7 @@ class TestFloatingIP(base.BaseFunctionalTestCase):
# Ask Neutron for only a subset of all the FIPs.
filtered_fip_id_list = [
fip.id for fip in self.operator_cloud.list_floating_ips(
{'tenant_id': self.demo_cloud.current_project_id}
{'tenant_id': self.user_cloud.current_project_id}
)
]
self.assertNotIn(fip_admin.id, filtered_fip_id_list)
@ -279,16 +279,16 @@ class TestFloatingIP(base.BaseFunctionalTestCase):
)
def test_search_floating_ips(self):
fip_user = self.demo_cloud.create_floating_ip()
self.addCleanup(self.demo_cloud.delete_floating_ip, fip_user.id)
fip_user = self.user_cloud.create_floating_ip()
self.addCleanup(self.user_cloud.delete_floating_ip, fip_user.id)
self.assertIn(
fip_user['id'],
[fip.id for fip in self.demo_cloud.search_floating_ips(
[fip.id for fip in self.user_cloud.search_floating_ips(
filters={"attached": False})]
)
self.assertNotIn(
fip_user['id'],
[fip.id for fip in self.demo_cloud.search_floating_ips(
[fip.id for fip in self.user_cloud.search_floating_ips(
filters={"attached": True})]
)

View File

@ -35,14 +35,14 @@ class TestFloatingIPPool(base.BaseFunctionalTestCase):
def setUp(self):
super(TestFloatingIPPool, self).setUp()
if not self.demo_cloud._has_nova_extension('os-floating-ip-pools'):
if not self.user_cloud._has_nova_extension('os-floating-ip-pools'):
# Skipping this test is floating-ip-pool extension is not
# available on the testing cloud
self.skip(
'Floating IP pools extension is not available')
def test_list_floating_ip_pools(self):
pools = self.demo_cloud.list_floating_ip_pools()
pools = self.user_cloud.list_floating_ip_pools()
if not pools:
self.assertFalse('no floating-ip pool available')

View File

@ -28,7 +28,7 @@ from shade.tests.functional.util import pick_image
class TestImage(base.BaseFunctionalTestCase):
def setUp(self):
super(TestImage, self).setUp()
self.image = pick_image(self.demo_cloud.nova_client.images.list())
self.image = pick_image(self.user_cloud.nova_client.images.list())
def test_create_image(self):
test_image = tempfile.NamedTemporaryFile(delete=False)
@ -36,7 +36,7 @@ class TestImage(base.BaseFunctionalTestCase):
test_image.close()
image_name = self.getUniqueString('image')
try:
self.demo_cloud.create_image(
self.user_cloud.create_image(
name=image_name,
filename=test_image.name,
disk_format='raw',
@ -45,7 +45,7 @@ class TestImage(base.BaseFunctionalTestCase):
min_ram=1024,
wait=True)
finally:
self.demo_cloud.delete_image(image_name, wait=True)
self.user_cloud.delete_image(image_name, wait=True)
def test_download_image(self):
test_image = tempfile.NamedTemporaryFile(delete=False)
@ -53,7 +53,7 @@ class TestImage(base.BaseFunctionalTestCase):
test_image.write('\0' * 1024 * 1024)
test_image.close()
image_name = self.getUniqueString('image')
self.demo_cloud.create_image(
self.user_cloud.create_image(
name=image_name,
filename=test_image.name,
disk_format='raw',
@ -61,9 +61,9 @@ class TestImage(base.BaseFunctionalTestCase):
min_disk=10,
min_ram=1024,
wait=True)
self.addCleanup(self.demo_cloud.delete_image, image_name, wait=True)
self.addCleanup(self.user_cloud.delete_image, image_name, wait=True)
output = os.path.join(tempfile.gettempdir(), self.getUniqueString())
self.demo_cloud.download_image(image_name, output)
self.user_cloud.download_image(image_name, output)
self.addCleanup(os.remove, output)
self.assertTrue(filecmp.cmp(test_image.name, output),
"Downloaded contents don't match created image")
@ -74,7 +74,7 @@ class TestImage(base.BaseFunctionalTestCase):
test_image.close()
image_name = self.getUniqueString('image')
try:
first_image = self.demo_cloud.create_image(
first_image = self.user_cloud.create_image(
name=image_name,
filename=test_image.name,
disk_format='raw',
@ -82,7 +82,7 @@ class TestImage(base.BaseFunctionalTestCase):
min_disk=10,
min_ram=1024,
wait=True)
second_image = self.demo_cloud.create_image(
second_image = self.user_cloud.create_image(
name=image_name,
filename=test_image.name,
disk_format='raw',
@ -92,7 +92,7 @@ class TestImage(base.BaseFunctionalTestCase):
wait=True)
self.assertEqual(first_image.id, second_image.id)
finally:
self.demo_cloud.delete_image(image_name, wait=True)
self.user_cloud.delete_image(image_name, wait=True)
def test_create_image_force_duplicate(self):
test_image = tempfile.NamedTemporaryFile(delete=False)
@ -102,7 +102,7 @@ class TestImage(base.BaseFunctionalTestCase):
first_image = None
second_image = None
try:
first_image = self.demo_cloud.create_image(
first_image = self.user_cloud.create_image(
name=image_name,
filename=test_image.name,
disk_format='raw',
@ -110,7 +110,7 @@ class TestImage(base.BaseFunctionalTestCase):
min_disk=10,
min_ram=1024,
wait=True)
second_image = self.demo_cloud.create_image(
second_image = self.user_cloud.create_image(
name=image_name,
filename=test_image.name,
disk_format='raw',
@ -122,9 +122,9 @@ class TestImage(base.BaseFunctionalTestCase):
self.assertNotEqual(first_image.id, second_image.id)
finally:
if first_image:
self.demo_cloud.delete_image(first_image.id, wait=True)
self.user_cloud.delete_image(first_image.id, wait=True)
if second_image:
self.demo_cloud.delete_image(second_image.id, wait=True)
self.user_cloud.delete_image(second_image.id, wait=True)
def test_create_image_update_properties(self):
test_image = tempfile.NamedTemporaryFile(delete=False)
@ -132,7 +132,7 @@ class TestImage(base.BaseFunctionalTestCase):
test_image.close()
image_name = self.getUniqueString('image')
try:
image = self.demo_cloud.create_image(
image = self.user_cloud.create_image(
name=image_name,
filename=test_image.name,
disk_format='raw',
@ -140,12 +140,12 @@ class TestImage(base.BaseFunctionalTestCase):
min_disk=10,
min_ram=1024,
wait=True)
self.demo_cloud.update_image_properties(
self.user_cloud.update_image_properties(
image=image,
name=image_name,
foo='bar')
image = self.demo_cloud.get_image(image_name)
image = self.user_cloud.get_image(image_name)
self.assertIn('foo', image.properties)
self.assertEqual(image.properties['foo'], 'bar')
finally:
self.demo_cloud.delete_image(image_name, wait=True)
self.user_cloud.delete_image(image_name, wait=True)

View File

@ -23,7 +23,7 @@ class TestUsage(base.BaseFunctionalTestCase):
def test_get_our_limits(self):
'''Test quotas functionality'''
limits = self.demo_cloud.get_compute_limits()
limits = self.user_cloud.get_compute_limits()
self.assertIsNotNone(limits)
self.assertTrue(hasattr(limits, 'max_server_meta'))

View File

@ -31,17 +31,17 @@ class TestObject(base.BaseFunctionalTestCase):
def setUp(self):
super(TestObject, self).setUp()
if not self.demo_cloud.has_service('object-store'):
if not self.user_cloud.has_service('object-store'):
self.skipTest('Object service not supported by cloud')
def test_create_object(self):
'''Test uploading small and large files.'''
container_name = self.getUniqueString('container')
self.addDetail('container', content.text_content(container_name))
self.addCleanup(self.demo_cloud.delete_container, container_name)
self.demo_cloud.create_container(container_name)
self.addCleanup(self.user_cloud.delete_container, container_name)
self.user_cloud.create_container(container_name)
self.assertEqual(container_name,
self.demo_cloud.list_containers()[0]['name'])
self.user_cloud.list_containers()[0]['name'])
sizes = (
(64 * 1024, 1), # 64K, one segment
(64 * 1024, 5) # 64MB, 5 segments
@ -57,30 +57,30 @@ class TestObject(base.BaseFunctionalTestCase):
fake_file.flush()
name = 'test-%d' % size
self.addCleanup(
self.demo_cloud.delete_object, container_name, name)
self.demo_cloud.create_object(
self.user_cloud.delete_object, container_name, name)
self.user_cloud.create_object(
container_name, name,
fake_file.name,
segment_size=segment_size,
metadata={'foo': 'bar'})
self.assertFalse(self.demo_cloud.is_object_stale(
self.assertFalse(self.user_cloud.is_object_stale(
container_name, name,
fake_file.name
)
)
self.assertEqual(
'bar', self.demo_cloud.get_object_metadata(
'bar', self.user_cloud.get_object_metadata(
container_name, name)['x-object-meta-foo']
)
self.demo_cloud.update_object(container=container_name, name=name,
self.user_cloud.update_object(container=container_name, name=name,
metadata={'testk': 'testv'})
self.assertEqual(
'testv', self.demo_cloud.get_object_metadata(
'testv', self.user_cloud.get_object_metadata(
container_name, name)['x-object-meta-testk']
)
try:
self.assertIsNotNone(
self.demo_cloud.get_object(container_name, name))
self.user_cloud.get_object(container_name, name))
except exc.OpenStackCloudException as e:
self.addDetail(
'failed_response',
@ -90,22 +90,22 @@ class TestObject(base.BaseFunctionalTestCase):
content.text_content(e.response.text))
self.assertEqual(
name,
self.demo_cloud.list_objects(container_name)[0]['name'])
self.user_cloud.list_objects(container_name)[0]['name'])
self.assertTrue(
self.demo_cloud.delete_object(container_name, name))
self.assertEqual([], self.demo_cloud.list_objects(container_name))
self.user_cloud.delete_object(container_name, name))
self.assertEqual([], self.user_cloud.list_objects(container_name))
self.assertEqual(container_name,
self.demo_cloud.list_containers()[0]['name'])
self.demo_cloud.delete_container(container_name)
self.user_cloud.list_containers()[0]['name'])
self.user_cloud.delete_container(container_name)
def test_download_object_to_file(self):
'''Test uploading small and large files.'''
container_name = self.getUniqueString('container')
self.addDetail('container', content.text_content(container_name))
self.addCleanup(self.demo_cloud.delete_container, container_name)
self.demo_cloud.create_container(container_name)
self.addCleanup(self.user_cloud.delete_container, container_name)
self.user_cloud.create_container(container_name)
self.assertEqual(container_name,
self.demo_cloud.list_containers()[0]['name'])
self.user_cloud.list_containers()[0]['name'])
sizes = (
(64 * 1024, 1), # 64K, one segment
(64 * 1024, 5) # 64MB, 5 segments
@ -122,30 +122,30 @@ class TestObject(base.BaseFunctionalTestCase):
fake_file.flush()
name = 'test-%d' % size
self.addCleanup(
self.demo_cloud.delete_object, container_name, name)
self.demo_cloud.create_object(
self.user_cloud.delete_object, container_name, name)
self.user_cloud.create_object(
container_name, name,
fake_file.name,
segment_size=segment_size,
metadata={'foo': 'bar'})
self.assertFalse(self.demo_cloud.is_object_stale(
self.assertFalse(self.user_cloud.is_object_stale(
container_name, name,
fake_file.name
)
)
self.assertEqual(
'bar', self.demo_cloud.get_object_metadata(
'bar', self.user_cloud.get_object_metadata(
container_name, name)['x-object-meta-foo']
)
self.demo_cloud.update_object(container=container_name, name=name,
self.user_cloud.update_object(container=container_name, name=name,
metadata={'testk': 'testv'})
self.assertEqual(
'testv', self.demo_cloud.get_object_metadata(
'testv', self.user_cloud.get_object_metadata(
container_name, name)['x-object-meta-testk']
)
try:
with tempfile.NamedTemporaryFile() as fake_file:
self.demo_cloud.get_object(
self.user_cloud.get_object(
container_name, name, outfile=fake_file.name)
downloaded_content = open(fake_file.name, 'rb').read()
self.assertEqual(fake_content, downloaded_content)
@ -159,10 +159,10 @@ class TestObject(base.BaseFunctionalTestCase):
raise
self.assertEqual(
name,
self.demo_cloud.list_objects(container_name)[0]['name'])
self.user_cloud.list_objects(container_name)[0]['name'])
self.assertTrue(
self.demo_cloud.delete_object(container_name, name))
self.assertEqual([], self.demo_cloud.list_objects(container_name))
self.user_cloud.delete_object(container_name, name))
self.assertEqual([], self.user_cloud.list_objects(container_name))
self.assertEqual(container_name,
self.demo_cloud.list_containers()[0]['name'])
self.demo_cloud.delete_container(container_name)
self.user_cloud.list_containers()[0]['name'])
self.user_cloud.delete_container(container_name)

View File

@ -30,14 +30,14 @@ class TestRangeSearch(base.BaseFunctionalTestCase):
return new_results
def test_range_search_bad_range(self):
flavors = self.demo_cloud.list_flavors()
flavors = self.user_cloud.list_flavors()
self.assertRaises(
exc.OpenStackCloudException,
self.demo_cloud.range_search, flavors, {"ram": "<1a0"})
self.user_cloud.range_search, flavors, {"ram": "<1a0"})
def test_range_search_exact(self):
flavors = self.demo_cloud.list_flavors()
result = self.demo_cloud.range_search(flavors, {"ram": "4096"})
flavors = self.user_cloud.list_flavors()
result = self.user_cloud.range_search(flavors, {"ram": "4096"})
self.assertIsInstance(result, list)
# should only be 1 m1 flavor with 4096 ram
result = self._filter_m1_flavors(result)
@ -45,23 +45,23 @@ class TestRangeSearch(base.BaseFunctionalTestCase):
self.assertEqual("m1.medium", result[0]['name'])
def test_range_search_min(self):
flavors = self.demo_cloud.list_flavors()
result = self.demo_cloud.range_search(flavors, {"ram": "MIN"})
flavors = self.user_cloud.list_flavors()
result = self.user_cloud.range_search(flavors, {"ram": "MIN"})
self.assertIsInstance(result, list)
self.assertEqual(1, len(result))
# older devstack does not have cirros256
self.assertTrue(result[0]['name'] in ('cirros256', 'm1.tiny'))
def test_range_search_max(self):
flavors = self.demo_cloud.list_flavors()
result = self.demo_cloud.range_search(flavors, {"ram": "MAX"})
flavors = self.user_cloud.list_flavors()
result = self.user_cloud.range_search(flavors, {"ram": "MAX"})
self.assertIsInstance(result, list)
self.assertEqual(1, len(result))
self.assertEqual("m1.xlarge", result[0]['name'])
def test_range_search_lt(self):
flavors = self.demo_cloud.list_flavors()
result = self.demo_cloud.range_search(flavors, {"ram": "<1024"})
flavors = self.user_cloud.list_flavors()
result = self.user_cloud.range_search(flavors, {"ram": "<1024"})
self.assertIsInstance(result, list)
# should only be 1 m1 flavor with <1024 ram
result = self._filter_m1_flavors(result)
@ -69,8 +69,8 @@ class TestRangeSearch(base.BaseFunctionalTestCase):
self.assertEqual("m1.tiny", result[0]['name'])
def test_range_search_gt(self):
flavors = self.demo_cloud.list_flavors()
result = self.demo_cloud.range_search(flavors, {"ram": ">4096"})
flavors = self.user_cloud.list_flavors()
result = self.user_cloud.range_search(flavors, {"ram": ">4096"})
self.assertIsInstance(result, list)
# should only be 2 m1 flavors with >4096 ram
result = self._filter_m1_flavors(result)
@ -80,8 +80,8 @@ class TestRangeSearch(base.BaseFunctionalTestCase):
self.assertIn("m1.xlarge", flavor_names)
def test_range_search_le(self):
flavors = self.demo_cloud.list_flavors()
result = self.demo_cloud.range_search(flavors, {"ram": "<=4096"})
flavors = self.user_cloud.list_flavors()
result = self.user_cloud.range_search(flavors, {"ram": "<=4096"})
self.assertIsInstance(result, list)
# should only be 3 m1 flavors with <=4096 ram
result = self._filter_m1_flavors(result)
@ -92,8 +92,8 @@ class TestRangeSearch(base.BaseFunctionalTestCase):
self.assertIn("m1.medium", flavor_names)
def test_range_search_ge(self):
flavors = self.demo_cloud.list_flavors()
result = self.demo_cloud.range_search(flavors, {"ram": ">=4096"})
flavors = self.user_cloud.list_flavors()
result = self.user_cloud.range_search(flavors, {"ram": ">=4096"})
self.assertIsInstance(result, list)
# should only be 3 m1 flavors with >=4096 ram
result = self._filter_m1_flavors(result)
@ -104,8 +104,8 @@ class TestRangeSearch(base.BaseFunctionalTestCase):
self.assertIn("m1.xlarge", flavor_names)
def test_range_search_multi_1(self):
flavors = self.demo_cloud.list_flavors()
result = self.demo_cloud.range_search(
flavors = self.user_cloud.list_flavors()
result = self.user_cloud.range_search(
flavors, {"ram": "MIN", "vcpus": "MIN"})
self.assertIsInstance(result, list)
self.assertEqual(1, len(result))
@ -113,8 +113,8 @@ class TestRangeSearch(base.BaseFunctionalTestCase):
self.assertTrue(result[0]['name'] in ('cirros256', 'm1.tiny'))
def test_range_search_multi_2(self):
flavors = self.demo_cloud.list_flavors()
result = self.demo_cloud.range_search(
flavors = self.user_cloud.list_flavors()
result = self.user_cloud.range_search(
flavors, {"ram": "<1024", "vcpus": "MIN"})
self.assertIsInstance(result, list)
result = self._filter_m1_flavors(result)
@ -123,8 +123,8 @@ class TestRangeSearch(base.BaseFunctionalTestCase):
self.assertIn("m1.tiny", flavor_names)
def test_range_search_multi_3(self):
flavors = self.demo_cloud.list_flavors()
result = self.demo_cloud.range_search(
flavors = self.user_cloud.list_flavors()
result = self.user_cloud.range_search(
flavors, {"ram": ">=4096", "vcpus": "<6"})
self.assertIsInstance(result, list)
result = self._filter_m1_flavors(result)
@ -134,8 +134,8 @@ class TestRangeSearch(base.BaseFunctionalTestCase):
self.assertIn("m1.large", flavor_names)
def test_range_search_multi_4(self):
flavors = self.demo_cloud.list_flavors()
result = self.demo_cloud.range_search(
flavors = self.user_cloud.list_flavors()
result = self.user_cloud.range_search(
flavors, {"ram": ">=4096", "vcpus": "MAX"})
self.assertIsInstance(result, list)
self.assertEqual(1, len(result))

View File

@ -26,7 +26,7 @@ class TestRecordset(base.BaseFunctionalTestCase):
def setUp(self):
super(TestRecordset, self).setUp()
if not self.demo_cloud.has_service('dns'):
if not self.user_cloud.has_service('dns'):
self.skipTest('dns service not supported by cloud')
def test_recordsets(self):
@ -44,10 +44,10 @@ class TestRecordset(base.BaseFunctionalTestCase):
self.addCleanup(self.cleanup, zone, name)
# Create a zone to hold the tested recordset
zone_obj = self.demo_cloud.create_zone(name=zone, email=email)
zone_obj = self.user_cloud.create_zone(name=zone, email=email)
# Test we can create a recordset and we get it returned
created_recordset = self.demo_cloud.create_recordset(zone, name, type_,
created_recordset = self.user_cloud.create_recordset(zone, name, type_,
records,
description, ttl)
self.assertEqual(created_recordset['zone_id'], zone_obj['id'])
@ -58,21 +58,21 @@ class TestRecordset(base.BaseFunctionalTestCase):
self.assertEqual(created_recordset['ttl'], ttl)
# Test that we can list recordsets
recordsets = self.demo_cloud.list_recordsets(zone)
recordsets = self.user_cloud.list_recordsets(zone)
self.assertIsNotNone(recordsets)
# Test we get the same recordset with the get_recordset method
get_recordset = self.demo_cloud.get_recordset(zone,
get_recordset = self.user_cloud.get_recordset(zone,
created_recordset['id'])
self.assertEqual(get_recordset['id'], created_recordset['id'])
# Test the get method also works by name
get_recordset = self.demo_cloud.get_recordset(zone, name + '.' + zone)
get_recordset = self.user_cloud.get_recordset(zone, name + '.' + zone)
self.assertEqual(get_recordset['id'], created_recordset['id'])
# Test we can update a field on the recordset and only that field
# is updated
updated_recordset = self.demo_cloud.update_recordset(zone_obj['id'],
updated_recordset = self.user_cloud.update_recordset(zone_obj['id'],
name + '.' + zone,
ttl=7200)
self.assertEqual(updated_recordset['id'], created_recordset['id'])
@ -83,11 +83,11 @@ class TestRecordset(base.BaseFunctionalTestCase):
self.assertEqual(updated_recordset['ttl'], 7200)
# Test we can delete and get True returned
deleted_recordset = self.demo_cloud.delete_recordset(
deleted_recordset = self.user_cloud.delete_recordset(
zone, name + '.' + zone)
self.assertTrue(deleted_recordset)
def cleanup(self, zone_name, recordset_name):
self.demo_cloud.delete_recordset(
self.user_cloud.delete_recordset(
zone_name, recordset_name + '.' + zone_name)
self.demo_cloud.delete_zone(zone_name)
self.user_cloud.delete_zone(zone_name)

View File

@ -22,14 +22,14 @@ from shade.tests.functional import base
class TestSecurityGroups(base.BaseFunctionalTestCase):
def test_create_list_security_groups(self):
sg1 = self.demo_cloud.create_security_group(
sg1 = self.user_cloud.create_security_group(
name="sg1", description="sg1")
self.addCleanup(self.demo_cloud.delete_security_group, sg1['id'])
self.addCleanup(self.user_cloud.delete_security_group, sg1['id'])
sg2 = self.operator_cloud.create_security_group(
name="sg2", description="sg2")
self.addCleanup(self.operator_cloud.delete_security_group, sg2['id'])
if self.demo_cloud.has_service('network'):
if self.user_cloud.has_service('network'):
# Neutron defaults to all_tenants=1 when admin
sg_list = self.operator_cloud.list_security_groups()
self.assertIn(sg1['id'], [sg['id'] for sg in sg_list])
@ -37,7 +37,7 @@ class TestSecurityGroups(base.BaseFunctionalTestCase):
# Filter by tenant_id (filtering by project_id won't work with
# Keystone V2)
sg_list = self.operator_cloud.list_security_groups(
filters={'tenant_id': self.demo_cloud.current_project_id})
filters={'tenant_id': self.user_cloud.current_project_id})
self.assertIn(sg1['id'], [sg['id'] for sg in sg_list])
self.assertNotIn(sg2['id'], [sg['id'] for sg in sg_list])

View File

@ -25,16 +25,16 @@ class TestServerGroup(base.BaseFunctionalTestCase):
def test_server_group(self):
server_group_name = self.getUniqueString()
self.addCleanup(self.cleanup, server_group_name)
server_group = self.demo_cloud.create_server_group(
server_group = self.user_cloud.create_server_group(
server_group_name, ['affinity'])
server_group_ids = [v['id']
for v in self.demo_cloud.list_server_groups()]
for v in self.user_cloud.list_server_groups()]
self.assertIn(server_group['id'], server_group_ids)
self.demo_cloud.delete_server_group(server_group_name)
self.user_cloud.delete_server_group(server_group_name)
def cleanup(self, server_group_name):
server_group = self.demo_cloud.get_server_group(server_group_name)
server_group = self.user_cloud.get_server_group(server_group_name)
if server_group:
self.demo_cloud.delete_server_group(server_group['id'])
self.user_cloud.delete_server_group(server_group['id'])

View File

@ -75,12 +75,12 @@ class TestStack(base.BaseFunctionalTestCase):
def setUp(self):
super(TestStack, self).setUp()
if not self.demo_cloud.has_service('orchestration'):
if not self.user_cloud.has_service('orchestration'):
self.skipTest('Orchestration service not supported by cloud')
def _cleanup_stack(self):
self.demo_cloud.delete_stack(self.stack_name, wait=True)
self.assertIsNone(self.demo_cloud.get_stack(self.stack_name))
self.user_cloud.delete_stack(self.stack_name, wait=True)
self.assertIsNone(self.user_cloud.get_stack(self.stack_name))
def test_stack_validation(self):
test_template = tempfile.NamedTemporaryFile(delete=False)
@ -88,7 +88,7 @@ class TestStack(base.BaseFunctionalTestCase):
test_template.close()
stack_name = self.getUniqueString('validate_template')
self.assertRaises(exc.OpenStackCloudException,
self.demo_cloud.create_stack,
self.user_cloud.create_stack,
name=stack_name,
template_file=test_template.name)
@ -98,7 +98,7 @@ class TestStack(base.BaseFunctionalTestCase):
test_template.close()
self.stack_name = self.getUniqueString('simple_stack')
self.addCleanup(self._cleanup_stack)
stack = self.demo_cloud.create_stack(
stack = self.user_cloud.create_stack(
name=self.stack_name,
template_file=test_template.name,
wait=True)
@ -109,17 +109,17 @@ class TestStack(base.BaseFunctionalTestCase):
self.assertEqual(10, len(rand))
# assert get_stack matches returned create_stack
stack = self.demo_cloud.get_stack(self.stack_name)
stack = self.user_cloud.get_stack(self.stack_name)
self.assertEqual('CREATE_COMPLETE', stack['stack_status'])
self.assertEqual(rand, stack['outputs'][0]['output_value'])
# assert stack is in list_stacks
stacks = self.demo_cloud.list_stacks()
stacks = self.user_cloud.list_stacks()
stack_ids = [s['id'] for s in stacks]
self.assertIn(stack['id'], stack_ids)
# update with no changes
stack = self.demo_cloud.update_stack(
stack = self.user_cloud.update_stack(
self.stack_name,
template_file=test_template.name,
wait=True)
@ -130,14 +130,14 @@ class TestStack(base.BaseFunctionalTestCase):
self.assertEqual(rand, stack['outputs'][0]['output_value'])
# update with changes
stack = self.demo_cloud.update_stack(
stack = self.user_cloud.update_stack(
self.stack_name,
template_file=test_template.name,
wait=True,
length=12)
# assert changed output in updated stack
stack = self.demo_cloud.get_stack(self.stack_name)
stack = self.user_cloud.get_stack(self.stack_name)
self.assertEqual('UPDATE_COMPLETE', stack['stack_status'])
new_rand = stack['outputs'][0]['output_value']
self.assertNotEqual(rand, new_rand)
@ -160,7 +160,7 @@ class TestStack(base.BaseFunctionalTestCase):
self.stack_name = self.getUniqueString('nested_stack')
self.addCleanup(self._cleanup_stack)
stack = self.demo_cloud.create_stack(
stack = self.user_cloud.create_stack(
name=self.stack_name,
template_file=test_template.name,
environment_files=[env.name],

View File

@ -26,7 +26,7 @@ class TestVolume(base.BaseFunctionalTestCase):
def setUp(self):
super(TestVolume, self).setUp()
if not self.demo_cloud.has_service('volume'):
if not self.user_cloud.has_service('volume'):
self.skipTest('volume service not supported by cloud')
def test_volumes(self):
@ -35,26 +35,26 @@ class TestVolume(base.BaseFunctionalTestCase):
snapshot_name = self.getUniqueString()
self.addDetail('volume', content.text_content(volume_name))
self.addCleanup(self.cleanup, volume_name, snapshot_name=snapshot_name)
volume = self.demo_cloud.create_volume(
volume = self.user_cloud.create_volume(
display_name=volume_name, size=1)
snapshot = self.demo_cloud.create_volume_snapshot(
snapshot = self.user_cloud.create_volume_snapshot(
volume['id'],
display_name=snapshot_name
)
volume_ids = [v['id'] for v in self.demo_cloud.list_volumes()]
volume_ids = [v['id'] for v in self.user_cloud.list_volumes()]
self.assertIn(volume['id'], volume_ids)
snapshot_list = self.demo_cloud.list_volume_snapshots()
snapshot_list = self.user_cloud.list_volume_snapshots()
snapshot_ids = [s['id'] for s in snapshot_list]
self.assertIn(snapshot['id'], snapshot_ids)
ret_snapshot = self.demo_cloud.get_volume_snapshot_by_id(
ret_snapshot = self.user_cloud.get_volume_snapshot_by_id(
snapshot['id'])
self.assertEqual(snapshot['id'], ret_snapshot['id'])
self.demo_cloud.delete_volume_snapshot(snapshot_name, wait=True)
self.demo_cloud.delete_volume(volume_name, wait=True)
self.user_cloud.delete_volume_snapshot(snapshot_name, wait=True)
self.user_cloud.delete_volume(volume_name, wait=True)
def test_volume_to_image(self):
'''Test volume export to image functionality'''
@ -62,32 +62,32 @@ class TestVolume(base.BaseFunctionalTestCase):
image_name = self.getUniqueString()
self.addDetail('volume', content.text_content(volume_name))
self.addCleanup(self.cleanup, volume_name, image_name=image_name)
volume = self.demo_cloud.create_volume(
volume = self.user_cloud.create_volume(
display_name=volume_name, size=1)
image = self.demo_cloud.create_image(
image = self.user_cloud.create_image(
image_name, volume=volume, wait=True)
volume_ids = [v['id'] for v in self.demo_cloud.list_volumes()]
volume_ids = [v['id'] for v in self.user_cloud.list_volumes()]
self.assertIn(volume['id'], volume_ids)
image_list = self.demo_cloud.list_images()
image_list = self.user_cloud.list_images()
image_ids = [s['id'] for s in image_list]
self.assertIn(image['id'], image_ids)
self.demo_cloud.delete_image(image_name, wait=True)
self.demo_cloud.delete_volume(volume_name, wait=True)
self.user_cloud.delete_image(image_name, wait=True)
self.user_cloud.delete_volume(volume_name, wait=True)
def cleanup(self, volume_name, snapshot_name=None, image_name=None):
# Need to delete snapshots before volumes
if snapshot_name:
snapshot = self.demo_cloud.get_volume_snapshot(snapshot_name)
snapshot = self.user_cloud.get_volume_snapshot(snapshot_name)
if snapshot:
self.demo_cloud.delete_volume_snapshot(
self.user_cloud.delete_volume_snapshot(
snapshot_name, wait=True)
if image_name:
image = self.demo_cloud.get_image(image_name)
image = self.user_cloud.get_image(image_name)
if image:
self.demo_cloud.delete_image(image_name, wait=True)
volume = self.demo_cloud.get_volume(volume_name)
self.user_cloud.delete_image(image_name, wait=True)
volume = self.user_cloud.get_volume(volume_name)
if volume:
self.demo_cloud.delete_volume(volume_name, wait=True)
self.user_cloud.delete_volume(volume_name, wait=True)

View File

@ -18,56 +18,56 @@ class TestVolume(base.BaseFunctionalTestCase):
def setUp(self):
super(TestVolume, self).setUp()
if not self.demo_cloud.has_service('volume'):
if not self.user_cloud.has_service('volume'):
self.skipTest('volume service not supported by cloud')
if not self.demo_cloud.has_service('object-store'):
if not self.user_cloud.has_service('object-store'):
self.skipTest('volume backups require swift')
def test_create_get_delete_volume_backup(self):
volume = self.demo_cloud.create_volume(
volume = self.user_cloud.create_volume(
display_name=self.getUniqueString(), size=1)
self.addCleanup(self.demo_cloud.delete_volume, volume['id'])
self.addCleanup(self.user_cloud.delete_volume, volume['id'])
backup_name_1 = self.getUniqueString()
backup_desc_1 = self.getUniqueString()
backup = self.demo_cloud.create_volume_backup(
backup = self.user_cloud.create_volume_backup(
volume_id=volume['id'], name=backup_name_1,
description=backup_desc_1, wait=True)
self.assertEqual(backup_name_1, backup['name'])
backup = self.demo_cloud.get_volume_backup(backup['id'])
backup = self.user_cloud.get_volume_backup(backup['id'])
self.assertEqual("available", backup['status'])
self.assertEqual(backup_desc_1, backup['description'])
self.demo_cloud.delete_volume_backup(backup['id'], wait=True)
self.assertIsNone(self.demo_cloud.get_volume_backup(backup['id']))
self.user_cloud.delete_volume_backup(backup['id'], wait=True)
self.assertIsNone(self.user_cloud.get_volume_backup(backup['id']))
def test_list_volume_backups(self):
vol1 = self.demo_cloud.create_volume(
vol1 = self.user_cloud.create_volume(
display_name=self.getUniqueString(), size=1)
self.addCleanup(self.demo_cloud.delete_volume, vol1['id'])
self.addCleanup(self.user_cloud.delete_volume, vol1['id'])
# We create 2 volumes to create 2 backups. We could have created 2
# backups from the same volume but taking 2 successive backups seems
# to be race-condition prone. And I didn't want to use an ugly sleep()
# here.
vol2 = self.demo_cloud.create_volume(
vol2 = self.user_cloud.create_volume(
display_name=self.getUniqueString(), size=1)
self.addCleanup(self.demo_cloud.delete_volume, vol2['id'])
self.addCleanup(self.user_cloud.delete_volume, vol2['id'])
backup_name_1 = self.getUniqueString()
backup = self.demo_cloud.create_volume_backup(
backup = self.user_cloud.create_volume_backup(
volume_id=vol1['id'], name=backup_name_1)
self.addCleanup(self.demo_cloud.delete_volume_backup, backup['id'])
self.addCleanup(self.user_cloud.delete_volume_backup, backup['id'])
backup = self.demo_cloud.create_volume_backup(volume_id=vol2['id'])
self.addCleanup(self.demo_cloud.delete_volume_backup, backup['id'])
backup = self.user_cloud.create_volume_backup(volume_id=vol2['id'])
self.addCleanup(self.user_cloud.delete_volume_backup, backup['id'])
backups = self.demo_cloud.list_volume_backups()
backups = self.user_cloud.list_volume_backups()
self.assertEqual(2, len(backups))
backups = self.demo_cloud.list_volume_backups(
backups = self.user_cloud.list_volume_backups(
search_opts={"name": backup_name_1})
self.assertEqual(1, len(backups))
self.assertEqual(backup_name_1, backups[0]['name'])

View File

@ -32,7 +32,7 @@ class TestVolumeType(base.BaseFunctionalTestCase):
def setUp(self):
super(TestVolumeType, self).setUp()
if not self.demo_cloud.has_service('volume'):
if not self.user_cloud.has_service('volume'):
self.skipTest('volume service not supported by cloud')
self.operator_cloud.cinder_client.volume_types.create(
'test-volume-type', is_public=False)

View File

@ -26,7 +26,7 @@ class TestZone(base.BaseFunctionalTestCase):
def setUp(self):
super(TestZone, self).setUp()
if not self.demo_cloud.has_service('dns'):
if not self.user_cloud.has_service('dns'):
self.skipTest('dns service not supported by cloud')
def test_zones(self):
@ -42,7 +42,7 @@ class TestZone(base.BaseFunctionalTestCase):
self.addCleanup(self.cleanup, name)
# Test we can create a zone and we get it returned
zone = self.demo_cloud.create_zone(
zone = self.user_cloud.create_zone(
name=name, zone_type=zone_type, email=email,
description=description, ttl=ttl,
masters=masters)
@ -54,20 +54,20 @@ class TestZone(base.BaseFunctionalTestCase):
self.assertEqual(zone['masters'], [])
# Test that we can list zones
zones = self.demo_cloud.list_zones()
zones = self.user_cloud.list_zones()
self.assertIsNotNone(zones)
# Test we get the same zone with the get_zone method
zone_get = self.demo_cloud.get_zone(zone['id'])
zone_get = self.user_cloud.get_zone(zone['id'])
self.assertEqual(zone_get['id'], zone['id'])
# Test the get method also works by name
zone_get = self.demo_cloud.get_zone(name)
zone_get = self.user_cloud.get_zone(name)
self.assertEqual(zone_get['name'], zone['name'])
# Test we can update a field on the zone and only that field
# is updated
zone_update = self.demo_cloud.update_zone(zone['id'], ttl=7200)
zone_update = self.user_cloud.update_zone(zone['id'], ttl=7200)
self.assertEqual(zone_update['id'], zone['id'])
self.assertEqual(zone_update['name'], zone['name'])
self.assertEqual(zone_update['type'], zone['type'])
@ -77,8 +77,8 @@ class TestZone(base.BaseFunctionalTestCase):
self.assertEqual(zone_update['masters'], zone['masters'])
# Test we can delete and get True returned
zone_delete = self.demo_cloud.delete_zone(zone['id'])
zone_delete = self.user_cloud.delete_zone(zone['id'])
self.assertTrue(zone_delete)
def cleanup(self, name):
self.demo_cloud.delete_zone(name)
self.user_cloud.delete_zone(name)