Remove the wrappers and scenario utils of keystone, cinder, glance

We switched these wrappers to servcie model several versions ago, it
is the time to remove them.
Also,fix the unittest issue of test_setup_and_cleanup_existing_image

Change-Id: I0ff4054abcd5f9c3b4584f260ff8767f09636bd3
This commit is contained in:
chenhb 2018-08-02 16:15:46 +08:00
parent 13e3540ea3
commit 15253a86fb
16 changed files with 17 additions and 2978 deletions

View File

@ -19,6 +19,16 @@ Changelog
[unreleased] - 2018-07-20 [unreleased] - 2018-07-20
------------------------- -------------------------
Removed
~~~~~~~
* Remove deprecated wrappers (rally_openstack.wrappers) and
helpers (scenario utils) for keystone, cinder, glance
services. The new service model should be used instead
(see ``rally_openstack.services`` module for more details)
while developing custom plugins. All the inner plugins used
the new code for a long time.
Fixed Fixed
~~~~~ ~~~~~

View File

@ -17,14 +17,9 @@ import random
from rally.common import cfg from rally.common import cfg
from rally.common import logging from rally.common import logging
from rally import exceptions
from rally.task import atomic
from rally.task import utils as bench_utils
from rally_openstack import scenario from rally_openstack import scenario
from rally_openstack.services.storage import block from rally_openstack.services.storage import block
from rally_openstack.wrappers import cinder as cinder_wrapper
from rally_openstack.wrappers import glance as glance_wrapper
CONF = cfg.CONF CONF = cfg.CONF
@ -46,462 +41,3 @@ class CinderBasic(scenario.OpenStackScenario):
def get_random_server(self): def get_random_server(self):
server_id = random.choice(self.context["tenant"]["servers"]) server_id = random.choice(self.context["tenant"]["servers"])
return self.clients("nova").servers.get(server_id) return self.clients("nova").servers.get(server_id)
class CinderScenario(scenario.OpenStackScenario):
"""Base class for Cinder scenarios with basic atomic actions."""
def __init__(self, context=None, admin_clients=None, clients=None):
super(CinderScenario, self).__init__(context, admin_clients, clients)
LOG.warning(
"Class %s is deprecated since Rally 0.10.0 and will be removed "
"soon. Use "
"rally.plugins.openstack.services.storage.block.BlockStorage "
"instead." % self.__class__)
@atomic.action_timer("cinder.list_volumes")
def _list_volumes(self, detailed=True):
"""Returns user volumes list."""
return self.clients("cinder").volumes.list(detailed)
@atomic.action_timer("cinder.get_volume")
def _get_volume(self, volume_id):
"""get volume detailed information.
:param volume_id: id of volume
:returns: class:`Volume`
"""
return self.clients("cinder").volumes.get(volume_id)
@atomic.action_timer("cinder.list_snapshots")
def _list_snapshots(self, detailed=True):
"""Returns user snapshots list."""
return self.clients("cinder").volume_snapshots.list(detailed)
@atomic.action_timer("cinder.list_types")
def _list_types(self, search_opts=None, is_public=None):
"""Lists all volume types.
:param search_opts: Options used when search for volume types
:param is_public: If query public volume type
:returns: A list of volume types
"""
return self.clients("cinder").volume_types.list(search_opts,
is_public)
def _set_metadata(self, volume, sets=10, set_size=3):
"""Set volume metadata.
:param volume: The volume to set metadata on
:param sets: how many operations to perform
:param set_size: number of metadata keys to set in each operation
:returns: A list of keys that were set
"""
key = "cinder.set_%s_metadatas_%s_times" % (set_size, sets)
with atomic.ActionTimer(self, key):
keys = []
for i in range(sets):
metadata = {}
for j in range(set_size):
key = self.generate_random_name()
keys.append(key)
metadata[key] = self.generate_random_name()
self.clients("cinder").volumes.set_metadata(volume, metadata)
return keys
def _delete_metadata(self, volume, keys, deletes=10, delete_size=3):
"""Delete volume metadata keys.
Note that ``len(keys)`` must be greater than or equal to
``deletes * delete_size``.
:param volume: The volume to delete metadata from
:param deletes: how many operations to perform
:param delete_size: number of metadata keys to delete in each operation
:param keys: a list of keys to choose deletion candidates from
"""
if len(keys) < deletes * delete_size:
raise exceptions.InvalidArgumentsException(
"Not enough metadata keys to delete: "
"%(num_keys)s keys, but asked to delete %(num_deletes)s" %
{"num_keys": len(keys),
"num_deletes": deletes * delete_size})
# make a shallow copy of the list of keys so that, when we pop
# from it later, we don't modify the original list.
keys = list(keys)
random.shuffle(keys)
action_name = "cinder.delete_%s_metadatas_%s_times" % (delete_size,
deletes)
with atomic.ActionTimer(self, action_name):
for i in range(deletes):
to_del = keys[i * delete_size:(i + 1) * delete_size]
self.clients("cinder").volumes.delete_metadata(volume, to_del)
@atomic.action_timer("cinder.create_volume")
def _create_volume(self, size, **kwargs):
"""Create one volume.
Returns when the volume is actually created and is in the "Available"
state.
:param size: int be size of volume in GB, or
dictionary, must contain two values:
min - minimum size volumes will be created as;
max - maximum size volumes will be created as.
:param kwargs: Other optional parameters to initialize the volume
:returns: Created volume object
"""
if isinstance(size, dict):
size = random.randint(size["min"], size["max"])
client = cinder_wrapper.wrap(self._clients.cinder, self)
volume = client.create_volume(size, **kwargs)
# NOTE(msdubov): It is reasonable to wait 5 secs before starting to
# check whether the volume is ready => less API calls.
self.sleep_between(CONF.openstack.cinder_volume_create_prepoll_delay)
volume = bench_utils.wait_for_status(
volume,
ready_statuses=["available"],
update_resource=bench_utils.get_from_manager(),
timeout=CONF.openstack.cinder_volume_create_timeout,
check_interval=CONF.openstack.cinder_volume_create_poll_interval
)
return volume
@atomic.action_timer("cinder.update_volume")
def _update_volume(self, volume, **update_volume_args):
"""Update name and description for this volume
This atomic function updates volume information. The volume
display name is always changed, and additional update
arguments may also be specified.
:param volume: volume object
:param update_volume_args: dict, contains values to be updated.
"""
client = cinder_wrapper.wrap(self._clients.cinder, self)
client.update_volume(volume, **update_volume_args)
@atomic.action_timer("cinder.update_readonly_flag")
def _update_readonly_flag(self, volume, read_only):
"""Update the read-only access mode flag of the specified volume.
:param volume: The UUID of the volume to update.
:param read_only: The value to indicate whether to update volume to
read-only access mode.
:returns: A tuple of http Response and body
"""
return self.clients("cinder").volumes.update_readonly_flag(
volume, read_only)
@atomic.action_timer("cinder.delete_volume")
def _delete_volume(self, volume):
"""Delete the given volume.
Returns when the volume is actually deleted.
:param volume: volume object
"""
volume.delete()
bench_utils.wait_for_status(
volume,
ready_statuses=["deleted"],
check_deletion=True,
update_resource=bench_utils.get_from_manager(),
timeout=CONF.openstack.cinder_volume_delete_timeout,
check_interval=CONF.openstack.cinder_volume_delete_poll_interval
)
@atomic.action_timer("cinder.extend_volume")
def _extend_volume(self, volume, new_size):
"""Extend the given volume.
Returns when the volume is actually extended.
:param volume: volume object
:param new_size: new volume size in GB, or
dictionary, must contain two values:
min - minimum size volumes will be created as;
max - maximum size volumes will be created as.
Notice: should be bigger volume size
"""
if isinstance(new_size, dict):
new_size = random.randint(new_size["min"], new_size["max"])
volume.extend(volume, new_size)
volume = bench_utils.wait_for_status(
volume,
ready_statuses=["available"],
update_resource=bench_utils.get_from_manager(),
timeout=CONF.openstack.cinder_volume_create_timeout,
check_interval=CONF.openstack.cinder_volume_create_poll_interval
)
@atomic.action_timer("cinder.upload_volume_to_image")
def _upload_volume_to_image(self, volume, force=False,
container_format="bare", disk_format="raw"):
"""Upload the given volume to image.
Returns created image.
:param volume: volume object
:param force: flag to indicate whether to snapshot a volume even if
it's attached to an instance
:param container_format: container format of image. Acceptable
formats: ami, ari, aki, bare, and ovf
:param disk_format: disk format of image. Acceptable formats:
ami, ari, aki, vhd, vmdk, raw, qcow2, vdi and iso
:returns: Returns created image object
"""
resp, img = volume.upload_to_image(force, self.generate_random_name(),
container_format, disk_format)
# NOTE (e0ne): upload_to_image changes volume status to uploading so
# we need to wait until it will be available.
volume = bench_utils.wait_for_status(
volume,
ready_statuses=["available"],
update_resource=bench_utils.get_from_manager(),
timeout=CONF.openstack.cinder_volume_create_timeout,
check_interval=CONF.openstack.cinder_volume_create_poll_interval
)
image_id = img["os-volume_upload_image"]["image_id"]
image = self.clients("glance").images.get(image_id)
wrapper = glance_wrapper.wrap(self._clients.glance, self)
image = bench_utils.wait_for_status(
image,
ready_statuses=["active"],
update_resource=wrapper.get_image,
timeout=CONF.openstack.glance_image_create_timeout,
check_interval=CONF.openstack.glance_image_create_poll_interval
)
return image
@atomic.action_timer("cinder.create_snapshot")
def _create_snapshot(self, volume_id, force=False, **kwargs):
"""Create one snapshot.
Returns when the snapshot is actually created and is in the "Available"
state.
:param volume_id: volume uuid for creating snapshot
:param force: flag to indicate whether to snapshot a volume even if
it's attached to an instance
:param kwargs: Other optional parameters to initialize the volume
:returns: Created snapshot object
"""
kwargs["force"] = force
client = cinder_wrapper.wrap(self._clients.cinder, self)
snapshot = client.create_snapshot(volume_id, **kwargs)
self.sleep_between(CONF.openstack.cinder_volume_create_prepoll_delay)
snapshot = bench_utils.wait_for_status(
snapshot,
ready_statuses=["available"],
update_resource=bench_utils.get_from_manager(),
timeout=CONF.openstack.cinder_volume_create_timeout,
check_interval=CONF.openstack.cinder_volume_create_poll_interval
)
return snapshot
@atomic.action_timer("cinder.delete_snapshot")
def _delete_snapshot(self, snapshot):
"""Delete the given snapshot.
Returns when the snapshot is actually deleted.
:param snapshot: snapshot object
"""
snapshot.delete()
bench_utils.wait_for_status(
snapshot,
ready_statuses=["deleted"],
check_deletion=True,
update_resource=bench_utils.get_from_manager(),
timeout=CONF.openstack.cinder_volume_delete_timeout,
check_interval=CONF.openstack.cinder_volume_delete_poll_interval
)
@atomic.action_timer("cinder.create_backup")
def _create_backup(self, volume_id, **kwargs):
"""Create a volume backup of the given volume.
:param volume_id: The ID of the volume to backup.
:param kwargs: Other optional parameters
"""
backup = self.clients("cinder").backups.create(volume_id, **kwargs)
return bench_utils.wait_for_status(
backup,
ready_statuses=["available"],
update_resource=bench_utils.get_from_manager(),
timeout=CONF.openstack.cinder_volume_create_timeout,
check_interval=CONF.openstack.cinder_volume_create_poll_interval
)
@atomic.action_timer("cinder.delete_backup")
def _delete_backup(self, backup):
"""Delete the given backup.
Returns when the backup is actually deleted.
:param backup: backup instance
"""
backup.delete()
bench_utils.wait_for_status(
backup,
ready_statuses=["deleted"],
check_deletion=True,
update_resource=bench_utils.get_from_manager(),
timeout=CONF.openstack.cinder_volume_delete_timeout,
check_interval=CONF.openstack.cinder_volume_delete_poll_interval
)
@atomic.action_timer("cinder.restore_backup")
def _restore_backup(self, backup_id, volume_id=None):
"""Restore the given backup.
:param backup_id: The ID of the backup to restore.
:param volume_id: The ID of the volume to restore the backup to.
"""
restore = self.clients("cinder").restores.restore(backup_id, volume_id)
restored_volume = self.clients("cinder").volumes.get(restore.volume_id)
backup_for_restore = self.clients("cinder").backups.get(backup_id)
bench_utils.wait_for_status(
backup_for_restore,
ready_statuses=["available"],
update_resource=bench_utils.get_from_manager(),
timeout=CONF.openstack.cinder_backup_restore_timeout,
check_interval=CONF.openstack.cinder_backup_restore_poll_interval
)
return bench_utils.wait_for_status(
restored_volume,
ready_statuses=["available"],
update_resource=bench_utils.get_from_manager(),
timeout=CONF.openstack.cinder_volume_create_timeout,
check_interval=CONF.openstack.cinder_volume_create_poll_interval
)
@atomic.action_timer("cinder.list_backups")
def _list_backups(self, detailed=True):
"""Return user volume backups list.
:param detailed: True if detailed information about backup
should be listed
"""
return self.clients("cinder").backups.list(detailed)
def get_random_server(self):
server_id = random.choice(self.context["tenant"]["servers"])
return self.clients("nova").servers.get(server_id)
@atomic.action_timer("cinder.list_transfers")
def _list_transfers(self, detailed=True, search_opts=None):
"""Get a list of all volume transfers.
:param detailed: If True, detailed information about transfer
should be listed
:param search_opts: Search options to filter out volume transfers
:returns: list of :class:`VolumeTransfer`
"""
return self.clients("cinder").transfers.list(detailed, search_opts)
@atomic.action_timer("cinder.create_volume_type")
def _create_volume_type(self, **kwargs):
"""create volume type.
:param kwargs: Optional additional arguments for volume type creation
:returns: VolumeType object
"""
kwargs["name"] = self.generate_random_name()
return self.admin_clients("cinder").volume_types.create(**kwargs)
@atomic.action_timer("cinder.delete_volume_type")
def _delete_volume_type(self, volume_type):
"""delete a volume type.
:param volume_type: Name or Id of the volume type
:returns: base on client response return True if the request
has been accepted or not
"""
tuple_res = self.admin_clients("cinder").volume_types.delete(
volume_type)
return (tuple_res[0].status_code == 202)
@atomic.action_timer("cinder.set_volume_type_keys")
def _set_volume_type_keys(self, volume_type, metadata):
"""Set extra specs on a volume type.
:param volume_type: The :class:`VolumeType` to set extra spec on
:param metadata: A dict of key/value pairs to be set
:returns: extra_specs if the request has been accepted
"""
return volume_type.set_keys(metadata)
@atomic.action_timer("cinder.get_volume_type")
def _get_volume_type(self, volume_type):
"""get details of volume_type.
:param volume_type: The ID of the :class:`VolumeType` to get
:rtype: :class:`VolumeType`
"""
return self.admin_clients("cinder").volume_types.get(volume_type)
@atomic.action_timer("cinder.transfer_create")
def _transfer_create(self, volume_id):
"""Create a volume transfer.
:param volume_id: The ID of the volume to transfer
:rtype: VolumeTransfer
"""
name = self.generate_random_name()
return self.clients("cinder").transfers.create(volume_id, name)
@atomic.action_timer("cinder.transfer_accept")
def _transfer_accept(self, transfer_id, auth_key):
"""Accept a volume transfer.
:param transfer_id: The ID of the transfer to accept.
:param auth_key: The auth_key of the transfer.
:rtype: VolumeTransfer
"""
return self.clients("cinder").transfers.accept(transfer_id, auth_key)
@atomic.action_timer("cinder.create_encryption_type")
def _create_encryption_type(self, volume_type, specs):
"""Create encryption type for a volume type. Default: admin only.
:param volume_type: the volume type on which to add an encryption type
:param specs: the encryption type specifications to add
:return: an instance of :class: VolumeEncryptionType
"""
return self.admin_clients("cinder").volume_encryption_types.create(
volume_type, specs)
@atomic.action_timer("cinder.list_encryption_type")
def _list_encryption_type(self, search_opts=None):
"""List all volume encryption types.
:param search_opts: Options used when search for encryption types
:return: a list of :class: VolumeEncryptionType instances
"""
return self.admin_clients("cinder").volume_encryption_types.list(
search_opts)
@atomic.action_timer("cinder.delete_encryption_type")
def _delete_encryption_type(self, volume_type):
"""Delete the encryption type information for the specified volume type.
:param volume_type: the volume type whose encryption type information
must be deleted
"""
resp = self.admin_clients("cinder").volume_encryption_types.delete(
volume_type)
if (resp[0].status_code != 202):
raise exceptions.RallyException("EncryptionType Deletion Failed")

View File

@ -1,80 +0,0 @@
# Copyright 2014: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from rally.common import cfg
from rally.common import logging
from rally.task import atomic
from rally.task import utils
from rally_openstack import scenario
from rally_openstack.wrappers import glance as glance_wrapper
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class GlanceScenario(scenario.OpenStackScenario):
"""Base class for Glance scenarios with basic atomic actions."""
def __init__(self, context=None, admin_clients=None, clients=None):
super(GlanceScenario, self).__init__(context, admin_clients, clients)
LOG.warning(
"Class %s is deprecated since Rally 0.10.0 and will be removed "
"soon. Use "
"rally.plugins.openstack.services.image.image.Image "
"instead." % self.__class__)
@atomic.action_timer("glance.list_images")
def _list_images(self):
"""Returns user images list."""
return list(self.clients("glance").images.list())
@atomic.action_timer("glance.create_image")
def _create_image(self, container_format, image_location, disk_format,
**kwargs):
"""Create a new image.
:param container_format: container format of image. Acceptable
formats: ami, ari, aki, bare, and ovf
:param image_location: image file location
:param disk_format: disk format of image. Acceptable formats:
ami, ari, aki, vhd, vmdk, raw, qcow2, vdi, and iso
:param kwargs: optional parameters to create image
:returns: image object
"""
if not kwargs.get("name"):
kwargs["name"] = self.generate_random_name()
client = glance_wrapper.wrap(self._clients.glance, self)
return client.create_image(container_format, image_location,
disk_format, **kwargs)
@atomic.action_timer("glance.delete_image")
def _delete_image(self, image):
"""Deletes given image.
Returns when the image is actually deleted.
:param image: Image object
"""
self.clients("glance").images.delete(image.id)
wrapper = glance_wrapper.wrap(self._clients.glance, self)
utils.wait_for_status(
image, ["deleted", "pending_delete"],
check_deletion=True,
update_resource=wrapper.get_image,
timeout=CONF.openstack.glance_image_delete_timeout,
check_interval=CONF.openstack.glance_image_delete_poll_interval)

View File

@ -1,300 +0,0 @@
# Copyright 2013: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from rally.common import logging
from rally.task import atomic
from rally_openstack import scenario
from rally_openstack.wrappers import keystone as keystone_wrapper
LOG = logging.getLogger(__name__)
class KeystoneScenario(scenario.OpenStackScenario):
"""Base class for Keystone scenarios with basic atomic actions."""
def __init__(self, context=None, admin_clients=None, clients=None):
super(KeystoneScenario, self).__init__(context, admin_clients, clients)
LOG.warning(
"Class %s is deprecated since Rally 0.8.0 and will be removed "
"soon. Use "
"rally.plugins.openstack.services.identity.identity.Identity "
"instead." % self.__class__)
@atomic.action_timer("keystone.create_user")
def _user_create(self, email=None, **kwargs):
"""Creates keystone user with random name.
:param kwargs: Other optional parameters to create users like
"tenant_id", "enabled".
:returns: keystone user instance
"""
name = self.generate_random_name()
# NOTE(boris-42): password and email parameters are required by
# keystone client v2.0. This should be cleanuped
# when we switch to v3.
password = kwargs.pop("password", str(uuid.uuid4()))
email = email or (name + "@rally.me")
return self.admin_clients("keystone").users.create(
name, password=password, email=email, **kwargs)
@atomic.action_timer("keystone.update_user_enabled")
def _update_user_enabled(self, user, enabled):
"""Enable or disable a user.
:param user: The user to enable or disable
:param enabled: Boolean indicating if the user should be
enabled (True) or disabled (False)
"""
self.admin_clients("keystone").users.update_enabled(user, enabled)
@atomic.action_timer("keystone.validate_token")
def _token_validate(self, token):
"""Validate a token for a user.
:param token: The token to validate
"""
self.admin_clients("keystone").tokens.validate(token)
@atomic.action_timer("keystone.token_authenticate")
def _authenticate_token(self, name, password, tenant_id, tenant):
"""Authenticate user token.
:param name: The user username
:param password: User password for authentication
:param tenant_id: Tenant id for authentication
:param tenant: Tenant on which authentication will take place
"""
return self.admin_clients("keystone").tokens.authenticate(name,
tenant_id,
tenant,
password)
def _resource_delete(self, resource):
""""Delete keystone resource."""
r = "keystone.delete_%s" % resource.__class__.__name__.lower()
with atomic.ActionTimer(self, r):
resource.delete()
@atomic.action_timer("keystone.create_tenant")
def _tenant_create(self, **kwargs):
"""Creates keystone tenant with random name.
:param kwargs: Other optional parameters
:returns: keystone tenant instance
"""
name = self.generate_random_name()
return self.admin_clients("keystone").tenants.create(name, **kwargs)
@atomic.action_timer("keystone.create_service")
def _service_create(self, service_type=None,
description=None):
"""Creates keystone service with random name.
:param service_type: type of the service
:param description: description of the service
:returns: keystone service instance
"""
service_type = service_type or "rally_test_type"
description = description or self.generate_random_name()
return self.admin_clients("keystone").services.create(
self.generate_random_name(),
service_type, description=description)
@atomic.action_timer("keystone.create_users")
def _users_create(self, tenant, users_per_tenant):
"""Adds users to a tenant.
:param tenant: tenant object
:param users_per_tenant: number of users in per tenant
"""
for i in range(users_per_tenant):
name = self.generate_random_name()
password = name
email = name + "@rally.me"
self.admin_clients("keystone").users.create(
name, password=password, email=email, tenant_id=tenant.id)
@atomic.action_timer("keystone.create_role")
def _role_create(self, **kwargs):
"""Creates keystone user role with random name.
:param **kwargs: Optional additional arguments for roles creation
:returns: keystone user role
"""
admin_clients = keystone_wrapper.wrap(self.admin_clients("keystone"))
role = admin_clients.create_role(
self.generate_random_name(), **kwargs)
return role
@atomic.action_timer("keystone.role_delete")
def _role_delete(self, role_id):
"""Creates keystone user role with random name.
:param user_id: id of the role
"""
admin_clients = keystone_wrapper.wrap(self.admin_clients("keystone"))
admin_clients.delete_role(role_id)
@atomic.action_timer("keystone.list_users")
def _list_users(self):
"""List users."""
return self.admin_clients("keystone").users.list()
@atomic.action_timer("keystone.list_tenants")
def _list_tenants(self):
"""List tenants."""
return self.admin_clients("keystone").tenants.list()
@atomic.action_timer("keystone.service_list")
def _list_services(self):
"""List services."""
return self.admin_clients("keystone").services.list()
@atomic.action_timer("keystone.list_roles")
def _list_roles_for_user(self, user, tenant):
"""List user roles.
:param user: user for whom roles will be listed
:param tenant: tenant on which user have roles
"""
return self.admin_clients("keystone").roles.roles_for_user(
user, tenant)
@atomic.action_timer("keystone.add_role")
def _role_add(self, user, role, tenant):
"""Add role to a given user on a tenant.
:param user: user to be assigned the role to
:param role: user role to assign with
:param tenant: tenant on which assignation will take place
"""
self.admin_clients("keystone").roles.add_user_role(user, role, tenant)
@atomic.action_timer("keystone.remove_role")
def _role_remove(self, user, role, tenant):
"""Dissociate user with role.
:param user: user to be stripped with role
:param role: role to be dissociated with user
:param tenant: tenant on which assignation took place
"""
self.admin_clients("keystone").roles.remove_user_role(user,
role, tenant)
@atomic.action_timer("keystone.get_tenant")
def _get_tenant(self, tenant_id):
"""Get given tenant.
:param tenant_id: tenant object
"""
return self.admin_clients("keystone").tenants.get(tenant_id)
@atomic.action_timer("keystone.get_user")
def _get_user(self, user_id):
"""Get given user.
:param user_id: user object
"""
return self.admin_clients("keystone").users.get(user_id)
@atomic.action_timer("keystone.get_role")
def _get_role(self, role_id):
"""Get given user role.
:param role_id: user role object
"""
return self.admin_clients("keystone").roles.get(role_id)
@atomic.action_timer("keystone.get_service")
def _get_service(self, service_id):
"""Get service with given service id.
:param service_id: id for service object
"""
return self.admin_clients("keystone").services.get(service_id)
def _get_service_by_name(self, name):
for i in self._list_services():
if i.name == name:
return i
@atomic.action_timer("keystone.delete_service")
def _delete_service(self, service_id):
"""Delete service.
:param service_id: service to be deleted
"""
self.admin_clients("keystone").services.delete(service_id)
@atomic.action_timer("keystone.update_tenant")
def _update_tenant(self, tenant, description=None):
"""Update tenant name and description.
:param tenant: tenant to be updated
:param description: tenant description to be set
"""
name = self.generate_random_name()
description = description or self.generate_random_name()
self.admin_clients("keystone").tenants.update(tenant.id,
name, description)
@atomic.action_timer("keystone.update_user_password")
def _update_user_password(self, user_id, password):
"""Update user password.
:param user_id: id of the user
:param password: new password
"""
admin_clients = self.admin_clients("keystone")
if admin_clients.version in ["v3"]:
admin_clients.users.update(user_id, password=password)
else:
admin_clients.users.update_password(user_id, password)
@atomic.action_timer("keystone.create_ec2creds")
def _create_ec2credentials(self, user_id, tenant_id):
"""Create ec2credentials.
:param user_id: User ID for which to create credentials
:param tenant_id: Tenant ID for which to create credentials
:returns: Created ec2-credentials object
"""
return self.clients("keystone").ec2.create(user_id, tenant_id)
@atomic.action_timer("keystone.list_ec2creds")
def _list_ec2credentials(self, user_id):
"""List of access/secret pairs for a user_id.
:param user_id: List all ec2-credentials for User ID
:returns: Return ec2-credentials list
"""
return self.clients("keystone").ec2.list(user_id)
@atomic.action_timer("keystone.delete_ec2creds")
def _delete_ec2credential(self, user_id, access):
"""Delete ec2credential.
:param user_id: User ID for which to delete credential
:param access: access key for ec2credential to delete
"""
self.clients("keystone").ec2.delete(user_id, access)

View File

@ -1,95 +0,0 @@
# Copyright 2014: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
from rally.common import logging
from rally import exceptions
import six
LOG = logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class CinderWrapper(object):
def __init__(self, client, owner):
self.owner = owner
self.client = client
@abc.abstractmethod
def create_volume(self, volume):
"""Creates new volume."""
@abc.abstractmethod
def update_volume(self, volume):
"""Updates name and description for this volume."""
@abc.abstractmethod
def create_snapshot(self, volume_id):
"""Creates a volume snapshot."""
class CinderV1Wrapper(CinderWrapper):
def create_volume(self, size, **kwargs):
kwargs["display_name"] = self.owner.generate_random_name()
volume = self.client.volumes.create(size, **kwargs)
return volume
def update_volume(self, volume, **update_args):
update_args["display_name"] = self.owner.generate_random_name()
update_args["display_description"] = (
update_args.get("display_description"))
self.client.volumes.update(volume, **update_args)
def create_snapshot(self, volume_id, **kwargs):
kwargs["display_name"] = self.owner.generate_random_name()
snapshot = self.client.volume_snapshots.create(volume_id, **kwargs)
return snapshot
class CinderV2Wrapper(CinderWrapper):
def create_volume(self, size, **kwargs):
kwargs["name"] = self.owner.generate_random_name()
volume = self.client.volumes.create(size, **kwargs)
return volume
def update_volume(self, volume, **update_args):
update_args["name"] = self.owner.generate_random_name()
update_args["description"] = update_args.get("description")
self.client.volumes.update(volume, **update_args)
def create_snapshot(self, volume_id, **kwargs):
kwargs["name"] = self.owner.generate_random_name()
snapshot = self.client.volume_snapshots.create(volume_id, **kwargs)
return snapshot
def wrap(client, owner):
"""Returns cinderclient wrapper based on cinder client version."""
LOG.warning("Method wrap from %s and whole Cinder wrappers are "
"deprecated since Rally 0.10.0 and will be removed soon. Use "
"rally.plugins.openstack.services.storage.block.BlockStorage "
"instead." % __file__)
version = client.choose_version()
if version == "1":
return CinderV1Wrapper(client(), owner)
elif version == "2":
return CinderV2Wrapper(client(), owner)
else:
msg = "This version of API %s could not be identified." % version
LOG.warning(msg)
raise exceptions.InvalidArgumentsException(msg)

View File

@ -1,211 +0,0 @@
# Copyright 2016: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import os
import time
from glanceclient import exc as glance_exc
import requests
import six
from rally.common import cfg
from rally.common import logging
from rally.common import utils as rutils
from rally import exceptions
from rally.task import utils
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
@six.add_metaclass(abc.ABCMeta)
class GlanceWrapper(object):
def __init__(self, client, owner):
self.owner = owner
self.client = client
def get_image(self, image):
"""Gets image.
This serves to fetch the latest data on the image for the
various wait_for_*() functions.
Must raise rally.exceptions.GetResourceNotFound if the
resource is not found or deleted.
"""
# NOTE(stpierre): This function actually has a single
# implementation that works for both Glance v1 and Glance v2,
# but since we need to use this function in both wrappers, it
# gets implemented here.
try:
return self.client.images.get(image.id)
except glance_exc.HTTPNotFound:
raise exceptions.GetResourceNotFound(resource=image)
@abc.abstractmethod
def create_image(self, container_format, image_location, disk_format):
"""Creates new image.
Accepts all Glance v2 parameters.
"""
@abc.abstractmethod
def set_visibility(self, image, visibility="public"):
"""Set an existing image to public or private."""
@abc.abstractmethod
def list_images(self, **filters):
"""List images.
Accepts all Glance v2 filters.
"""
class GlanceV1Wrapper(GlanceWrapper):
def create_image(self, container_format, image_location,
disk_format, **kwargs):
kw = {
"container_format": container_format,
"disk_format": disk_format,
}
kw.update(kwargs)
if "name" not in kw:
kw["name"] = self.owner.generate_random_name()
if "visibility" in kw:
kw["is_public"] = kw.pop("visibility") == "public"
image_location = os.path.expanduser(image_location)
try:
if os.path.isfile(image_location):
kw["data"] = open(image_location)
else:
kw["copy_from"] = image_location
image = self.client.images.create(**kw)
rutils.interruptable_sleep(CONF.openstack.
glance_image_create_prepoll_delay)
image = utils.wait_for_status(
image, ["active"],
update_resource=self.get_image,
timeout=CONF.openstack.glance_image_create_timeout,
check_interval=CONF.openstack.
glance_image_create_poll_interval)
finally:
if "data" in kw:
kw["data"].close()
return image
def set_visibility(self, image, visibility="public"):
self.client.images.update(image.id, is_public=(visibility == "public"))
def list_images(self, **filters):
kwargs = {"filters": filters}
if "owner" in filters:
# NOTE(stpierre): in glance v1, "owner" is not a filter,
# so we need to handle it separately.
kwargs["owner"] = kwargs["filters"].pop("owner")
visibility = kwargs["filters"].pop("visibility", None)
images = self.client.images.list(**kwargs)
# NOTE(stpierre): Glance v1 isn't smart enough to filter on
# public/private images, so we have to do it manually.
if visibility is not None:
is_public = visibility == "public"
return [i for i in images if i.is_public is is_public]
return images
class GlanceV2Wrapper(GlanceWrapper):
def create_image(self, container_format, image_location,
disk_format, **kwargs):
kw = {
"container_format": container_format,
"disk_format": disk_format,
}
kw.update(kwargs)
if "name" not in kw:
kw["name"] = self.owner.generate_random_name()
if "is_public" in kw:
LOG.warning("is_public is not supported by Glance v2, and is "
"deprecated in Rally v0.8.0")
kw["visibility"] = "public" if kw.pop("is_public") else "private"
image_location = os.path.expanduser(image_location)
image = self.client.images.create(**kw)
rutils.interruptable_sleep(CONF.openstack.
glance_image_create_prepoll_delay)
start = time.time()
image = utils.wait_for_status(
image, ["queued"],
update_resource=self.get_image,
timeout=CONF.openstack.glance_image_create_timeout,
check_interval=CONF.openstack.
glance_image_create_poll_interval)
timeout = time.time() - start
image_data = None
response = None
try:
if os.path.isfile(image_location):
image_data = open(image_location)
else:
response = requests.get(image_location, stream=True)
image_data = response.raw
self.client.images.upload(image.id, image_data)
finally:
if image_data is not None:
image_data.close()
if response is not None:
response.close()
return utils.wait_for_status(
image, ["active"],
update_resource=self.get_image,
timeout=timeout,
check_interval=CONF.openstack.
glance_image_create_poll_interval)
def set_visibility(self, image, visibility="public"):
self.client.images.update(image.id, visibility=visibility)
def list_images(self, **filters):
return self.client.images.list(filters=filters)
def wrap(client, owner):
"""Returns glanceclient wrapper based on glance client version."""
LOG.warning("Method wrap from %s and whole Glance wrappers are "
"deprecated since Rally 0.10.0 and will be removed soon. Use "
"rally.plugins.openstack.services.image.image.Image "
"instead." % __file__)
version = client.choose_version()
if version == "1":
return GlanceV1Wrapper(client(), owner)
elif version == "2":
return GlanceV2Wrapper(client(), owner)
else:
msg = "Version %s of the glance API could not be identified." % version
LOG.warning(msg)
raise exceptions.InvalidArgumentsException(msg)

View File

@ -1,273 +0,0 @@
# Copyright 2014: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import collections
from keystoneclient import exceptions
import six
from rally.common import logging
LOG = logging.getLogger(__name__)
Project = collections.namedtuple("Project", ["id", "name", "domain_id"])
User = collections.namedtuple("User",
["id", "name", "project_id", "domain_id"])
Service = collections.namedtuple("Service", ["id", "name"])
Role = collections.namedtuple("Role", ["id", "name"])
@six.add_metaclass(abc.ABCMeta)
class KeystoneWrapper(object):
def __init__(self, client):
self.client = client
LOG.warning(
"Class %s is deprecated since Rally 0.8.0 and will be removed "
"soon. Use "
"rally.plugins.openstack.services.identity.identity.Identity "
"instead." % self.__class__)
def __getattr__(self, attr_name):
return getattr(self.client, attr_name)
@abc.abstractmethod
def create_project(self, project_name, domain_name="Default"):
"""Creates new project/tenant and return project object.
:param project_name: Name of project to be created.
:param domain_name: Name or id of domain where to create project, for
implementations that don't support domains this
argument must be None or 'Default'.
"""
@abc.abstractmethod
def delete_project(self, project_id):
"""Deletes project."""
@abc.abstractmethod
def create_user(self, username, password, email=None, project_id=None,
domain_name="Default", default_role="member"):
"""Create user.
:param username: name of user
:param password: user password
:param project: user's default project
:param domain_name: Name or id of domain where to create project, for
implementations that don't support domains this
argument must be None or 'Default'.
:param default_role: user's default role
"""
@abc.abstractmethod
def delete_user(self, user_id):
"""Deletes user."""
@abc.abstractmethod
def list_users(self):
"""List all users."""
@abc.abstractmethod
def list_projects(self):
"""List all projects/tenants."""
def delete_service(self, service_id):
"""Deletes service."""
self.client.services.delete(service_id)
def list_services(self):
"""List all services."""
return map(KeystoneWrapper._wrap_service, self.client.services.list())
def create_role(self, name, **kwargs):
"""create a role.
:param name: name of role
:param kwargs: Optional additional arguments for roles creation
"""
def delete_role(self, role_id):
"""Deletes role."""
self.client.roles.delete(role_id)
def list_roles(self):
"""List all roles."""
return map(KeystoneWrapper._wrap_role, self.client.roles.list())
@abc.abstractmethod
def add_role(self, role_id, user_id, project_id):
"""Assign role to user."""
@abc.abstractmethod
def remove_role(self, role_id, user_id, project_id):
"""Remove role from user."""
@staticmethod
def _wrap_service(service):
return Service(id=service.id, name=service.name)
@staticmethod
def _wrap_role(role):
return Role(id=role.id, name=role.name)
class KeystoneV2Wrapper(KeystoneWrapper):
def _check_domain(self, domain_name):
if domain_name.lower() != "default":
raise NotImplementedError("Domain functionality not implemented "
"in Keystone v2")
@staticmethod
def _wrap_v2_tenant(tenant):
return Project(id=tenant.id, name=tenant.name, domain_id="default")
@staticmethod
def _wrap_v2_role(role):
return Role(id=role.id, name=role.name)
@staticmethod
def _wrap_v2_user(user):
return User(id=user.id, name=user.name,
project_id=getattr(user, "tenantId", None),
domain_id="default")
def create_project(self, project_name, domain_name="Default"):
self._check_domain(domain_name)
tenant = self.client.tenants.create(project_name)
return KeystoneV2Wrapper._wrap_v2_tenant(tenant)
def delete_project(self, project_id):
self.client.tenants.delete(project_id)
def create_user(self, username, password, email=None, project_id=None,
domain_name="Default", default_role="member"):
# NOTE(liuyulong): For v2 wrapper the `default_role` here is not used.
self._check_domain(domain_name)
user = self.client.users.create(username, password, email, project_id)
return KeystoneV2Wrapper._wrap_v2_user(user)
def delete_user(self, user_id):
self.client.users.delete(user_id)
def list_users(self):
return map(KeystoneV2Wrapper._wrap_v2_user, self.client.users.list())
def list_projects(self):
return map(KeystoneV2Wrapper._wrap_v2_tenant,
self.client.tenants.list())
def create_role(self, name):
role = self.client.roles.create(name)
return KeystoneV2Wrapper._wrap_v2_role(role)
def add_role(self, role_id, user_id, project_id):
self.client.roles.add_user_role(user_id, role_id, tenant=project_id)
def remove_role(self, role_id, user_id, project_id):
self.client.roles.remove_user_role(user_id, role_id, tenant=project_id)
class KeystoneV3Wrapper(KeystoneWrapper):
def _get_domain_id(self, domain_name_or_id):
try:
# First try to find domain by ID
return self.client.domains.get(domain_name_or_id).id
except exceptions.NotFound:
# Domain not found by ID, try to find it by name
domains = self.client.domains.list(name=domain_name_or_id)
if domains:
return domains[0].id
# Domain not found by name, raise original NotFound exception
raise
@staticmethod
def _wrap_v3_project(project):
return Project(id=project.id, name=project.name,
domain_id=project.domain_id)
@staticmethod
def _wrap_v3_role(role):
return Role(id=role.id, name=role.name)
@staticmethod
def _wrap_v3_user(user):
# When user has default_project_id that is None user.default_project_id
# will raise AttributeError
project_id = getattr(user, "default_project_id", None)
return User(id=user.id, name=user.name, project_id=project_id,
domain_id=user.domain_id)
def create_project(self, project_name, domain_name="Default"):
domain_id = self._get_domain_id(domain_name)
project = self.client.projects.create(
name=project_name, domain=domain_id)
return KeystoneV3Wrapper._wrap_v3_project(project)
def delete_project(self, project_id):
self.client.projects.delete(project_id)
def create_user(self, username, password, email=None, project_id=None,
domain_name="Default", default_role="member"):
domain_id = self._get_domain_id(domain_name)
user = self.client.users.create(name=username, password=password,
default_project=project_id,
email=email, domain=domain_id)
for role in self.client.roles.list():
if default_role in role.name.lower():
self.client.roles.grant(role.id, user=user.id,
project=project_id)
break
else:
LOG.warning(
"Unable to set %s role to created user." % default_role)
return KeystoneV3Wrapper._wrap_v3_user(user)
def delete_user(self, user_id):
self.client.users.delete(user_id)
def list_users(self):
return map(KeystoneV3Wrapper._wrap_v3_user, self.client.users.list())
def list_projects(self):
return map(KeystoneV3Wrapper._wrap_v3_project,
self.client.projects.list())
def create_role(self, name, domain, **kwargs):
role = self.client.roles.create(name, domain=domain, **kwargs)
return KeystoneV3Wrapper._wrap_v3_role(role)
def add_role(self, role_id, user_id, project_id):
self.client.roles.grant(role_id, user=user_id, project=project_id)
def remove_role(self, role_id, user_id, project_id):
self.client.roles.revoke(role_id, user=user_id, project=project_id)
def wrap(client):
"""Returns keystone wrapper based on keystone client version."""
LOG.warning("Method wrap from %s and whole Keystone wrappers are "
"deprecated since Rally 0.8.0 and will be removed soon. Use "
"rally.plugins.openstack.services.identity.identity.Identity "
"instead." % __file__)
if client.version == "v2.0":
return KeystoneV2Wrapper(client)
elif client.version == "v3":
return KeystoneV3Wrapper(client)
else:
raise NotImplementedError(
"Wrapper for version %s is not implemented." % client.version)

View File

@ -18,7 +18,7 @@ import os
import traceback import traceback
import unittest import unittest
import rally as rally_m import rally_openstack
from tests.check_samples import utils from tests.check_samples import utils
@ -32,7 +32,8 @@ class TestPreCreatedTasks(unittest.TestCase):
rally = utils.Rally() rally = utils.Rally()
full_path = os.path.join( full_path = os.path.join(
os.path.dirname(rally_m.__file__), os.pardir, "tasks", "openstack") os.path.dirname(rally_openstack.__file__), os.pardir,
"tasks", "openstack")
task_path = os.path.join(full_path, "task.yaml") task_path = os.path.join(full_path, "task.yaml")
args_path = os.path.join(full_path, "task_arguments.yaml") args_path = os.path.join(full_path, "task_arguments.yaml")

View File

@ -29,7 +29,7 @@ from rally import api
from rally.common import broker from rally.common import broker
from rally.common import yamlutils as yaml from rally.common import yamlutils as yaml
from rally import plugins from rally import plugins
from rally.plugins.openstack.context.keystone import users from rally_openstack.contexts.keystone import users
from tests.check_samples import utils from tests.check_samples import utils

View File

@ -22,7 +22,6 @@ from tests.unit import test
BASE_CTX = "rally.task.context" BASE_CTX = "rally.task.context"
CTX = "rally_openstack.contexts.sahara.sahara_image" CTX = "rally_openstack.contexts.sahara.sahara_image"
BASE_SCN = "rally.task.scenarios" BASE_SCN = "rally.task.scenarios"
SCN = "rally_openstack.scenarios"
class SaharaImageTestCase(test.ScenarioTestCase): class SaharaImageTestCase(test.ScenarioTestCase):
@ -135,19 +134,17 @@ class SaharaImageTestCase(test.ScenarioTestCase):
superclass=sahara_ctx.__class__, superclass=sahara_ctx.__class__,
task_id=ctx["owner_id"]) task_id=ctx["owner_id"])
@mock.patch("%s.glance.utils.GlanceScenario._create_image" % SCN,
return_value=mock.MagicMock(id=42))
@mock.patch("%s.resource_manager.cleanup" % CTX) @mock.patch("%s.resource_manager.cleanup" % CTX)
@mock.patch("%s.osclients.Clients" % CTX) @mock.patch("%s.osclients.Clients" % CTX)
def test_setup_and_cleanup_existing_image( def test_setup_and_cleanup_existing_image(
self, mock_clients, mock_cleanup, self, mock_clients, mock_cleanup):
mock_glance_scenario__create_image):
mock_clients.glance.images.get.return_value = mock.MagicMock( mock_clients.glance.images.get.return_value = mock.MagicMock(
is_public=True) is_public=True)
ctx = self.existing_image_context ctx = self.existing_image_context
sahara_ctx = sahara_image.SaharaImage(ctx) sahara_ctx = sahara_image.SaharaImage(ctx)
sahara_ctx._create_image = mock.Mock()
sahara_ctx.setup() sahara_ctx.setup()
for tenant_id in sahara_ctx.context["tenants"]: for tenant_id in sahara_ctx.context["tenants"]:
@ -155,7 +152,7 @@ class SaharaImageTestCase(test.ScenarioTestCase):
sahara_ctx.context["tenants"][tenant_id]["sahara"]["image"]) sahara_ctx.context["tenants"][tenant_id]["sahara"]["image"])
self.assertEqual("some_id", image_id) self.assertEqual("some_id", image_id)
self.assertFalse(mock_glance_scenario__create_image.called) self.assertFalse(sahara_ctx._create_image.called)
sahara_ctx.cleanup() sahara_ctx.cleanup()
self.assertFalse(mock_cleanup.called) self.assertFalse(mock_cleanup.called)

View File

@ -15,16 +15,9 @@
import mock import mock
from rally.common import cfg
from rally import exceptions
from rally_openstack import osclients
from rally_openstack.scenarios.cinder import utils from rally_openstack.scenarios.cinder import utils
from tests.unit import fakes
from tests.unit import test from tests.unit import test
CINDER_UTILS = "rally_openstack.scenarios.cinder.utils"
CONF = cfg.CONF
class CinderBasicTestCase(test.ScenarioTestCase): class CinderBasicTestCase(test.ScenarioTestCase):
@ -53,467 +46,3 @@ class CinderBasicTestCase(test.ScenarioTestCase):
basic.clients("nova").servers.get.assert_called_once_with(server_id) basic.clients("nova").servers.get.assert_called_once_with(server_id)
self.assertEqual(basic.clients("nova").servers.get.return_value, self.assertEqual(basic.clients("nova").servers.get.return_value,
return_server) return_server)
class CinderScenarioTestCase(test.ScenarioTestCase):
def setUp(self):
super(CinderScenarioTestCase, self).setUp()
wrap = mock.patch("rally_openstack.wrappers.cinder.wrap")
self.mock_wrap = wrap.start()
self.addCleanup(self.mock_wrap.stop)
self.scenario = utils.CinderScenario(
self.context,
clients=osclients.Clients(
fakes.FakeUserContext.user["credential"]))
def test__list_volumes(self):
return_volumes_list = self.scenario._list_volumes()
self.assertEqual(self.clients("cinder").volumes.list.return_value,
return_volumes_list)
self._test_atomic_action_timer(self.scenario.atomic_actions(),
"cinder.list_volumes")
def test__list_types(self):
return_types_list = self.scenario._list_types()
self.assertEqual(self.clients("cinder").volume_types.list.return_value,
return_types_list)
self._test_atomic_action_timer(self.scenario.atomic_actions(),
"cinder.list_types")
def test__get_volume(self):
volume = fakes.FakeVolume()
self.assertEqual(self.clients("cinder").volumes.get.return_value,
self.scenario._get_volume(volume.id))
self._test_atomic_action_timer(self.scenario.atomic_actions(),
"cinder.get_volume")
def test__list_snapshots(self):
return_snapshots_list = self.scenario._list_snapshots()
self.assertEqual(
self.clients("cinder").volume_snapshots.list.return_value,
return_snapshots_list)
self._test_atomic_action_timer(self.scenario.atomic_actions(),
"cinder.list_snapshots")
def test__list_transfers(self):
return_transfers_list = self.scenario._list_transfers()
self.assertEqual(
self.clients("cinder").transfers.list.return_value,
return_transfers_list)
self._test_atomic_action_timer(self.scenario.atomic_actions(),
"cinder.list_transfers")
def test__set_metadata(self):
volume = fakes.FakeVolume()
self.scenario._set_metadata(volume, sets=2, set_size=4)
calls = self.clients("cinder").volumes.set_metadata.call_args_list
self.assertEqual(2, len(calls))
for call in calls:
call_volume, metadata = call[0]
self.assertEqual(volume, call_volume)
self.assertEqual(4, len(metadata))
self._test_atomic_action_timer(self.scenario.atomic_actions(),
"cinder.set_4_metadatas_2_times")
def test__delete_metadata(self):
volume = fakes.FakeVolume()
keys = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l"]
self.scenario._delete_metadata(volume, keys, deletes=3, delete_size=4)
calls = self.clients("cinder").volumes.delete_metadata.call_args_list
self.assertEqual(3, len(calls))
all_deleted = []
for call in calls:
call_volume, del_keys = call[0]
self.assertEqual(volume, call_volume)
self.assertEqual(4, len(del_keys))
for key in del_keys:
self.assertIn(key, keys)
self.assertNotIn(key, all_deleted)
all_deleted.append(key)
self._test_atomic_action_timer(self.scenario.atomic_actions(),
"cinder.delete_4_metadatas_3_times")
def test__delete_metadata_not_enough_keys(self):
volume = fakes.FakeVolume()
keys = ["a", "b", "c", "d", "e"]
self.assertRaises(exceptions.InvalidArgumentsException,
self.scenario._delete_metadata,
volume, keys, deletes=2, delete_size=3)
def test__create_volume(self):
return_volume = self.scenario._create_volume(1)
self.mock_wait_for_status.mock.assert_called_once_with(
self.mock_wrap.return_value.create_volume.return_value,
ready_statuses=["available"],
update_resource=self.mock_get_from_manager.mock.return_value,
timeout=CONF.openstack.cinder_volume_create_timeout,
check_interval=CONF.openstack.cinder_volume_create_poll_interval
)
self.mock_get_from_manager.mock.assert_called_once_with()
self.assertEqual(self.mock_wait_for_status.mock.return_value,
return_volume)
self._test_atomic_action_timer(self.scenario.atomic_actions(),
"cinder.create_volume")
@mock.patch("rally_openstack.scenarios.cinder.utils.random")
def test__create_volume_with_size_range(self, mock_random):
mock_random.randint.return_value = 3
return_volume = self.scenario._create_volume(
size={"min": 1, "max": 5},
display_name="TestVolume")
self.mock_wrap.return_value.create_volume.assert_called_once_with(
3, display_name="TestVolume")
self.mock_wait_for_status.mock.assert_called_once_with(
self.mock_wrap.return_value.create_volume.return_value,
ready_statuses=["available"],
update_resource=self.mock_get_from_manager.mock.return_value,
timeout=CONF.openstack.cinder_volume_create_timeout,
check_interval=CONF.openstack.cinder_volume_create_poll_interval
)
self.mock_get_from_manager.mock.assert_called_once_with()
self.assertEqual(self.mock_wait_for_status.mock.return_value,
return_volume)
self._test_atomic_action_timer(self.scenario.atomic_actions(),
"cinder.create_volume")
def test__update_volume(self):
fake_volume = mock.MagicMock()
volume_update_args = {"display_name": "_updated",
"display_description": "_updated"}
self.scenario._update_volume(fake_volume, **volume_update_args)
self.mock_wrap.return_value.update_volume.assert_called_once_with(
fake_volume,
display_name="_updated",
display_description="_updated")
self._test_atomic_action_timer(self.scenario.atomic_actions(),
"cinder.update_volume")
def test__update_readonly_flag(self):
fake_volume = mock.MagicMock()
self.scenario._update_readonly_flag(fake_volume, "fake_flag")
self.clients(
"cinder").volumes.update_readonly_flag.assert_called_once_with(
fake_volume, "fake_flag")
self._test_atomic_action_timer(self.scenario.atomic_actions(),
"cinder.update_readonly_flag")
def test__delete_volume(self):
cinder = mock.Mock()
self.scenario._delete_volume(cinder)
cinder.delete.assert_called_once_with()
self.mock_wait_for_status.mock.assert_called_once_with(
cinder,
ready_statuses=["deleted"],
check_deletion=True,
update_resource=self.mock_get_from_manager.mock.return_value,
timeout=cfg.CONF.openstack.cinder_volume_create_timeout,
check_interval=cfg.CONF.openstack
.cinder_volume_create_poll_interval)
self.mock_get_from_manager.mock.assert_called_once_with()
self._test_atomic_action_timer(self.scenario.atomic_actions(),
"cinder.delete_volume")
@mock.patch("rally_openstack.scenarios.cinder.utils.random")
def test__extend_volume_with_size_range(self, mock_random):
volume = mock.Mock()
mock_random.randint.return_value = 3
self.clients("cinder").volumes.extend.return_value = volume
self.scenario._extend_volume(volume, new_size={"min": 1, "max": 5})
volume.extend.assert_called_once_with(volume, 3)
self.mock_wait_for_status.mock.assert_called_once_with(
volume,
ready_statuses=["available"],
update_resource=self.mock_get_from_manager.mock.return_value,
timeout=CONF.openstack.cinder_volume_create_timeout,
check_interval=CONF.openstack.cinder_volume_create_poll_interval
)
self.mock_get_from_manager.mock.assert_called_once_with()
self._test_atomic_action_timer(self.scenario.atomic_actions(),
"cinder.extend_volume")
def test__extend_volume(self):
volume = mock.Mock()
self.clients("cinder").volumes.extend.return_value = volume
self.scenario._extend_volume(volume, 2)
self.mock_wait_for_status.mock.assert_called_once_with(
volume,
ready_statuses=["available"],
update_resource=self.mock_get_from_manager.mock.return_value,
timeout=CONF.openstack.cinder_volume_create_timeout,
check_interval=CONF.openstack.cinder_volume_create_poll_interval
)
self.mock_get_from_manager.mock.assert_called_once_with()
self._test_atomic_action_timer(self.scenario.atomic_actions(),
"cinder.extend_volume")
@mock.patch("rally_openstack.wrappers.glance.wrap")
def test__upload_volume_to_image(self, mock_wrap):
volume = mock.Mock()
image = {"os-volume_upload_image": {"image_id": 1}}
volume.upload_to_image.return_value = (None, image)
self.clients("cinder").images.get.return_value = image
self.scenario.generate_random_name = mock.Mock(
return_value="test_vol")
self.scenario._upload_volume_to_image(volume, False,
"container", "disk")
volume.upload_to_image.assert_called_once_with(False, "test_vol",
"container", "disk")
self.mock_wait_for_status.mock.assert_has_calls([
mock.call(
volume,
ready_statuses=["available"],
update_resource=self.mock_get_from_manager.mock.return_value,
timeout=CONF.openstack.cinder_volume_create_timeout,
check_interval=CONF.openstack.
cinder_volume_create_poll_interval),
mock.call(
self.clients("glance").images.get.return_value,
ready_statuses=["active"],
update_resource=mock_wrap.return_value.get_image,
timeout=CONF.openstack.glance_image_create_timeout,
check_interval=CONF.openstack.
glance_image_create_poll_interval)
])
self.mock_get_from_manager.mock.assert_called_once_with()
self.clients("glance").images.get.assert_called_once_with(1)
def test__create_snapshot(self):
return_snapshot = self.scenario._create_snapshot("uuid", False)
self.mock_wait_for_status.mock.assert_called_once_with(
self.mock_wrap.return_value.create_snapshot.return_value,
ready_statuses=["available"],
update_resource=self.mock_get_from_manager.mock.return_value,
timeout=cfg.CONF.openstack.cinder_volume_create_timeout,
check_interval=cfg.CONF.openstack
.cinder_volume_create_poll_interval)
self.mock_get_from_manager.mock.assert_called_once_with()
self.assertEqual(self.mock_wait_for_status.mock.return_value,
return_snapshot)
self._test_atomic_action_timer(self.scenario.atomic_actions(),
"cinder.create_snapshot")
def test__delete_snapshot(self):
snapshot = mock.Mock()
self.scenario._delete_snapshot(snapshot)
snapshot.delete.assert_called_once_with()
self.mock_wait_for_status.mock.assert_called_once_with(
snapshot,
ready_statuses=["deleted"],
check_deletion=True,
update_resource=self.mock_get_from_manager.mock.return_value,
timeout=cfg.CONF.openstack.cinder_volume_create_timeout,
check_interval=cfg.CONF.openstack
.cinder_volume_create_poll_interval)
self.mock_get_from_manager.mock.assert_called_once_with()
self._test_atomic_action_timer(self.scenario.atomic_actions(),
"cinder.delete_snapshot")
def test__create_backup(self):
return_backup = self.scenario._create_backup("uuid")
self.mock_wait_for_status.mock.assert_called_once_with(
self.clients("cinder").backups.create.return_value,
ready_statuses=["available"],
update_resource=self.mock_get_from_manager.mock.return_value,
timeout=cfg.CONF.openstack.cinder_volume_create_timeout,
check_interval=cfg.CONF.openstack
.cinder_volume_create_poll_interval)
self.mock_get_from_manager.mock.assert_called_once_with()
self.assertEqual(self.mock_wait_for_status.mock.return_value,
return_backup)
self._test_atomic_action_timer(self.scenario.atomic_actions(),
"cinder.create_backup")
def test__delete_backup(self):
backup = mock.Mock()
self.scenario._delete_backup(backup)
backup.delete.assert_called_once_with()
self.mock_wait_for_status.mock.assert_called_once_with(
backup,
ready_statuses=["deleted"],
check_deletion=True,
update_resource=self.mock_get_from_manager.mock.return_value,
timeout=cfg.CONF.openstack.cinder_volume_create_timeout,
check_interval=cfg.CONF.openstack
.cinder_volume_create_poll_interval)
self.mock_get_from_manager.mock.assert_called_once_with()
self._test_atomic_action_timer(self.scenario.atomic_actions(),
"cinder.delete_backup")
def test__restore_backup(self):
# NOTE(mdovgal): added for pep8 visual indent test passing
bench_cfg = cfg.CONF.openstack
backup = mock.Mock()
restore = mock.Mock()
self.clients("cinder").restores.restore.return_value = backup
self.clients("cinder").backups.get.return_value = backup
self.clients("cinder").volumes.get.return_value = restore
return_restore = self.scenario._restore_backup(backup.id, None)
self.mock_wait_for_status.mock.assert_has_calls([
mock.call(
backup,
ready_statuses=["available"],
update_resource=self.mock_get_from_manager.mock.return_value,
timeout=bench_cfg.cinder_backup_restore_timeout,
check_interval=bench_cfg.cinder_backup_restore_poll_interval),
mock.call(
restore,
ready_statuses=["available"],
update_resource=self.mock_get_from_manager.mock.return_value,
timeout=bench_cfg.cinder_volume_create_timeout,
check_interval=bench_cfg.cinder_volume_create_poll_interval)
])
self.mock_get_from_manager.mock.assert_has_calls([mock.call(),
mock.call()])
self.assertEqual(self.mock_wait_for_status.mock.return_value,
return_restore)
self._test_atomic_action_timer(self.scenario.atomic_actions(),
"cinder.restore_backup")
def test__list_backups(self):
return_backups_list = self.scenario._list_backups()
self.assertEqual(
self.clients("cinder").backups.list.return_value,
return_backups_list)
self._test_atomic_action_timer(self.scenario.atomic_actions(),
"cinder.list_backups")
def test__get_random_server(self):
servers = [1, 2, 3]
context = {"user": {"tenant_id": "fake"},
"users": [{"tenant_id": "fake",
"users_per_tenant": 1}],
"tenant": {"id": "fake", "servers": servers}}
self.scenario.context = context
self.scenario.clients = mock.Mock()
self.scenario.clients("nova").servers.get = mock.Mock(
side_effect=lambda arg: arg)
server_id = self.scenario.get_random_server()
self.assertIn(server_id, servers)
def test__create_volume_type(self, **kwargs):
random_name = "random_name"
self.scenario.generate_random_name = mock.Mock(
return_value=random_name)
result = self.scenario._create_volume_type()
self.assertEqual(
self.admin_clients("cinder").volume_types.create.return_value,
result)
admin_clients = self.admin_clients("cinder")
admin_clients.volume_types.create.assert_called_once_with(
name="random_name")
self._test_atomic_action_timer(self.scenario.atomic_actions(),
"cinder.create_volume_type")
def test__delete_encryption_type(self):
volume_type = mock.Mock()
self.assertRaises(exceptions.RallyException,
self.scenario._delete_encryption_type,
volume_type)
def test__create_encryption_type(self):
volume_type = mock.Mock()
specs = {
"provider": "foo_pro",
"cipher": "foo_cip",
"key_size": 512,
"control_location": "foo_con"
}
result = self.scenario._create_encryption_type(volume_type, specs)
self.assertEqual(
self.admin_clients(
"cinder").volume_encryption_types.create.return_value, result)
self.admin_clients(
"cinder").volume_encryption_types.create.assert_called_once_with(
volume_type, specs)
self._test_atomic_action_timer(self.scenario.atomic_actions(),
"cinder.create_encryption_type")
def test__list_encryption_type(self):
return_encryption_types_list = self.scenario._list_encryption_type()
client = self.admin_clients("cinder")
self.assertEqual(client.volume_encryption_types.list.return_value,
return_encryption_types_list)
self._test_atomic_action_timer(self.scenario.atomic_actions(),
"cinder.list_encryption_type")
def test__get_volume_type(self):
volume_type = mock.Mock()
result = self.scenario._get_volume_type(volume_type)
self.assertEqual(
self.admin_clients("cinder").volume_types.get.return_value,
result)
self.admin_clients("cinder").volume_types.get.assert_called_once_with(
volume_type)
self._test_atomic_action_timer(self.scenario.atomic_actions(),
"cinder.get_volume_type")
def test__delete_volume_type(self):
volume_type = mock.Mock()
self.scenario._delete_volume_type(volume_type)
admin_clients = self.admin_clients("cinder")
admin_clients.volume_types.delete.assert_called_once_with(
volume_type)
self._test_atomic_action_timer(self.scenario.atomic_actions(),
"cinder.delete_volume_type")
def test__transfer_create(self):
fake_volume = mock.MagicMock()
random_name = "random_name"
self.scenario.generate_random_name = mock.MagicMock(
return_value=random_name)
result = self.scenario._transfer_create(fake_volume.id)
self.assertEqual(
self.clients("cinder").transfers.create.return_value,
result)
self.clients("cinder").transfers.create.assert_called_once_with(
fake_volume.id, random_name)
self._test_atomic_action_timer(self.scenario.atomic_actions(),
"cinder.transfer_create")
def test__transfer_accept(self):
fake_transfer = mock.MagicMock()
result = self.scenario._transfer_accept(fake_transfer.id, "fake_key")
self.assertEqual(
self.clients("cinder").transfers.accept.return_value,
result)
self.clients("cinder").transfers.accept.assert_called_once_with(
fake_transfer.id, "fake_key")
self._test_atomic_action_timer(self.scenario.atomic_actions(),
"cinder.transfer_accept")
def test__set_volume_type_keys(self):
volume_type = mock.MagicMock()
volume_type.set_keys = mock.MagicMock()
volume_type_key = {"volume_backend_name": "LVM_iSCSI"}
result = self.scenario._set_volume_type_keys(volume_type,
volume_type_key)
self.assertEqual(volume_type.set_keys.return_value, result)
volume_type.set_keys.assert_called_once_with(volume_type_key)
self._test_atomic_action_timer(self.scenario.atomic_actions(),
"cinder.set_volume_type_keys")

View File

@ -1,90 +0,0 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import tempfile
import ddt
import mock
from rally_openstack.scenarios.glance import utils
from tests.unit import test
GLANCE_UTILS = "rally_openstack.scenarios.glance.utils"
@ddt.ddt
class GlanceScenarioTestCase(test.ScenarioTestCase):
def setUp(self):
super(GlanceScenarioTestCase, self).setUp()
self.image = mock.Mock()
self.image1 = mock.Mock()
self.scenario_clients = mock.Mock()
self.scenario_clients.glance.choose_version.return_value = 1
def test_list_images(self):
scenario = utils.GlanceScenario(context=self.context)
return_images_list = scenario._list_images()
self.clients("glance").images.list.assert_called_once_with()
self.assertEqual(list(self.clients("glance").images.list.return_value),
return_images_list)
self._test_atomic_action_timer(scenario.atomic_actions(),
"glance.list_images")
@ddt.data({},
{"name": "foo"},
{"name": None},
{"name": ""},
{"name": "bar", "fakearg": "fakearg"},
{"fakearg": "fakearg"})
@mock.patch("rally_openstack.wrappers.glance.wrap")
def test_create_image(self, create_args, mock_wrap):
image_location = tempfile.NamedTemporaryFile()
mock_wrap.return_value.create_image.return_value = self.image
scenario = utils.GlanceScenario(context=self.context,
clients=self.scenario_clients)
scenario.generate_random_name = mock.Mock()
return_image = scenario._create_image("container_format",
image_location.name,
"disk_format",
**create_args)
expected_args = dict(create_args)
if not expected_args.get("name"):
expected_args["name"] = scenario.generate_random_name.return_value
self.assertEqual(self.image, return_image)
mock_wrap.assert_called_once_with(scenario._clients.glance, scenario)
mock_wrap.return_value.create_image.assert_called_once_with(
"container_format", image_location.name, "disk_format",
**expected_args)
self._test_atomic_action_timer(scenario.atomic_actions(),
"glance.create_image")
@mock.patch("rally_openstack.wrappers.glance.wrap")
def test_delete_image(self, mock_wrap):
deleted_image = mock.Mock(status="DELETED")
wrapper = mock_wrap.return_value
wrapper.get_image.side_effect = [self.image, deleted_image]
scenario = utils.GlanceScenario(context=self.context,
clients=self.scenario_clients)
scenario._delete_image(self.image)
self.clients("glance").images.delete.assert_called_once_with(
self.image.id)
mock_wrap.assert_called_once_with(scenario._clients.glance, scenario)
self._test_atomic_action_timer(scenario.atomic_actions(),
"glance.delete_image")

View File

@ -1,372 +0,0 @@
# Copyright 2013: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ddt
import mock
from rally_openstack.scenarios.keystone import utils
from tests.unit import fakes
from tests.unit import test
UTILS = "rally_openstack.scenarios.keystone.utils."
@ddt.ddt
class KeystoneScenarioTestCase(test.ScenarioTestCase):
@mock.patch("uuid.uuid4", return_value="pwd")
def test_user_create(self, mock_uuid4):
scenario = utils.KeystoneScenario(self.context)
scenario.generate_random_name = mock.Mock(return_value="foobarov")
result = scenario._user_create()
self.assertEqual(
self.admin_clients("keystone").users.create.return_value, result)
self.admin_clients("keystone").users.create.assert_called_once_with(
"foobarov",
password=mock_uuid4.return_value,
email="foobarov@rally.me")
mock_uuid4.assert_called_with()
self._test_atomic_action_timer(scenario.atomic_actions(),
"keystone.create_user")
def test_update_user_enabled(self):
user = mock.Mock()
enabled = mock.Mock()
scenario = utils.KeystoneScenario(self.context)
scenario._update_user_enabled(user, enabled)
self.admin_clients(
"keystone").users.update_enabled.assert_called_once_with(user,
enabled)
self._test_atomic_action_timer(scenario.atomic_actions(),
"keystone.update_user_enabled")
def test_token_validate(self):
token = mock.MagicMock()
scenario = utils.KeystoneScenario(self.context)
scenario._token_validate(token)
self.admin_clients(
"keystone").tokens.validate.assert_called_once_with(token)
self._test_atomic_action_timer(scenario.atomic_actions(),
"keystone.validate_token")
def test_token_authenticate(self):
name = mock.MagicMock()
psswd = "foopsswd"
tenant_id = mock.MagicMock()
tenant_name = mock.MagicMock()
scenario = utils.KeystoneScenario(self.context)
scenario._authenticate_token(name, psswd, tenant_id, tenant_name)
self.admin_clients(
"keystone").tokens.authenticate.assert_called_once_with(
name, tenant_id, tenant_name, "foopsswd")
self._test_atomic_action_timer(scenario.atomic_actions(),
"keystone.token_authenticate")
@mock.patch("rally_openstack.wrappers.keystone.wrap")
def test_role_create(self, mock_wrap, **kwargs):
role = mock.MagicMock()
mock_wrap.return_value.create_role.return_value = role
scenario = utils.KeystoneScenario(self.context)
scenario.generate_random_name = mock.MagicMock()
return_role = scenario._role_create(**kwargs)
self.assertEqual(role, return_role)
mock_wrap.assert_called_once_with(scenario.admin_clients("keystone"))
mock_wrap.return_value.create_role.assert_called_once_with(
scenario.generate_random_name.return_value, **kwargs)
self._test_atomic_action_timer(scenario.atomic_actions(),
"keystone.create_role")
@mock.patch("rally_openstack.wrappers.keystone.wrap")
def test_role_delete(self, mock_wrap):
role = mock.MagicMock()
scenario = utils.KeystoneScenario(self.context)
scenario._role_delete(role.id)
mock_wrap.assert_called_once_with(scenario.admin_clients("keystone"))
mock_wrap.return_value.delete_role.assert_called_once_with(role.id)
self._test_atomic_action_timer(scenario.atomic_actions(),
"keystone.role_delete")
def test_list_roles_for_user(self):
user = mock.MagicMock()
tenant = mock.MagicMock()
scenario = utils.KeystoneScenario(self.context)
scenario._list_roles_for_user(user, tenant)
self.admin_clients(
"keystone").roles.roles_for_user.assert_called_once_with(user,
tenant)
self._test_atomic_action_timer(scenario.atomic_actions(),
"keystone.list_roles")
def test_role_add(self):
user = mock.MagicMock()
role = mock.MagicMock()
tenant = mock.MagicMock()
scenario = utils.KeystoneScenario(self.context)
scenario._role_add(user=user.id, role=role.id, tenant=tenant.id)
self.admin_clients(
"keystone").roles.add_user_role.assert_called_once_with(user.id,
role.id,
tenant.id)
self._test_atomic_action_timer(scenario.atomic_actions(),
"keystone.add_role")
def test_user_delete(self):
resource = fakes.FakeResource()
resource.delete = mock.MagicMock()
scenario = utils.KeystoneScenario(self.context)
scenario._resource_delete(resource)
resource.delete.assert_called_once_with()
r = "keystone.delete_%s" % resource.__class__.__name__.lower()
self._test_atomic_action_timer(scenario.atomic_actions(), r)
def test_role_remove(self):
user = mock.MagicMock()
role = mock.MagicMock()
tenant = mock.MagicMock()
scenario = utils.KeystoneScenario(self.context)
scenario._role_remove(user=user, role=role, tenant=tenant)
self.admin_clients(
"keystone").roles.remove_user_role.assert_called_once_with(user,
role,
tenant)
self._test_atomic_action_timer(scenario.atomic_actions(),
"keystone.remove_role")
def test_tenant_create(self):
scenario = utils.KeystoneScenario(self.context)
scenario.generate_random_name = mock.Mock()
result = scenario._tenant_create()
self.assertEqual(
self.admin_clients("keystone").tenants.create.return_value, result)
self.admin_clients("keystone").tenants.create.assert_called_once_with(
scenario.generate_random_name.return_value)
self._test_atomic_action_timer(scenario.atomic_actions(),
"keystone.create_tenant")
@ddt.data(
{"service_type": "service_type"},
{"service_type": None}
)
def test_service_create(self, service_type):
scenario = utils.KeystoneScenario(self.context)
scenario.generate_random_name = mock.Mock()
result = scenario._service_create(
service_type=service_type, description="description")
self.assertEqual(
self.admin_clients("keystone").services.create.return_value,
result)
if service_type == "service_type":
self.admin_clients(
"keystone").services.create.assert_called_once_with(
scenario.generate_random_name.return_value,
service_type, description="description")
elif service_type is None:
self.admin_clients(
"keystone").services.create.assert_called_once_with(
scenario.generate_random_name.return_value,
"rally_test_type", description="description")
self._test_atomic_action_timer(scenario.atomic_actions(),
"keystone.create_service")
def test_tenant_create_with_users(self):
tenant = mock.MagicMock()
scenario = utils.KeystoneScenario(self.context)
scenario.generate_random_name = mock.Mock(return_value="foobarov")
scenario._users_create(tenant, users_per_tenant=1)
self.admin_clients("keystone").users.create.assert_called_once_with(
"foobarov", password="foobarov", email="foobarov@rally.me",
tenant_id=tenant.id)
self._test_atomic_action_timer(scenario.atomic_actions(),
"keystone.create_users")
def test_list_users(self):
scenario = utils.KeystoneScenario(self.context)
scenario._list_users()
self.admin_clients("keystone").users.list.assert_called_once_with()
self._test_atomic_action_timer(scenario.atomic_actions(),
"keystone.list_users")
def test_list_tenants(self):
scenario = utils.KeystoneScenario(self.context)
scenario._list_tenants()
self.admin_clients("keystone").tenants.list.assert_called_once_with()
self._test_atomic_action_timer(scenario.atomic_actions(),
"keystone.list_tenants")
def test_list_services(self):
scenario = utils.KeystoneScenario(self.context)
scenario._list_services()
self.admin_clients("keystone").services.list.assert_called_once_with()
self._test_atomic_action_timer(scenario.atomic_actions(),
"keystone.service_list")
def test_delete_service(self):
service = mock.MagicMock()
scenario = utils.KeystoneScenario(self.context)
scenario._delete_service(service_id=service.id)
self.admin_clients("keystone").services.delete.assert_called_once_with(
service.id)
self._test_atomic_action_timer(scenario.atomic_actions(),
"keystone.delete_service")
def test_get_tenant(self):
tenant = mock.MagicMock()
scenario = utils.KeystoneScenario(self.context)
scenario._get_tenant(tenant_id=tenant.id)
self.admin_clients("keystone").tenants.get.assert_called_once_with(
tenant.id)
self._test_atomic_action_timer(scenario.atomic_actions(),
"keystone.get_tenant")
def test_get_user(self):
user = mock.MagicMock()
scenario = utils.KeystoneScenario(self.context)
scenario._get_user(user_id=user.id)
self.admin_clients("keystone").users.get.assert_called_once_with(
user.id)
self._test_atomic_action_timer(scenario.atomic_actions(),
"keystone.get_user")
def test_get_role(self):
role = mock.MagicMock()
scenario = utils.KeystoneScenario(self.context)
scenario._get_role(role_id=role.id)
self.admin_clients("keystone").roles.get.assert_called_once_with(
role.id)
self._test_atomic_action_timer(scenario.atomic_actions(),
"keystone.get_role")
def test_get_service(self):
service = mock.MagicMock()
scenario = utils.KeystoneScenario(self.context)
scenario._get_service(service_id=service.id)
self.admin_clients("keystone").services.get.assert_called_once_with(
service.id)
self._test_atomic_action_timer(scenario.atomic_actions(),
"keystone.get_service")
def test_update_tenant(self):
tenant = mock.MagicMock()
description = "new description"
scenario = utils.KeystoneScenario(self.context)
scenario.generate_random_name = mock.Mock()
scenario._update_tenant(tenant=tenant, description=description)
self.admin_clients("keystone").tenants.update.assert_called_once_with(
tenant.id, scenario.generate_random_name.return_value,
description)
self._test_atomic_action_timer(scenario.atomic_actions(),
"keystone.update_tenant")
def test_update_user_password(self):
password = "pswd"
user = mock.MagicMock()
scenario = utils.KeystoneScenario(self.context)
scenario._update_user_password(password=password, user_id=user.id)
self.admin_clients(
"keystone").users.update_password.assert_called_once_with(user.id,
password)
self._test_atomic_action_timer(scenario.atomic_actions(),
"keystone.update_user_password")
@mock.patch("rally_openstack.scenario.OpenStackScenario."
"admin_clients")
def test_update_user_password_v3(self,
mock_open_stack_scenario_admin_clients):
password = "pswd"
user = mock.MagicMock()
scenario = utils.KeystoneScenario()
type(mock_open_stack_scenario_admin_clients.return_value).version = (
mock.PropertyMock(return_value="v3"))
scenario._update_user_password(password=password, user_id=user.id)
mock_open_stack_scenario_admin_clients(
"keystone").users.update.assert_called_once_with(
user.id, password=password)
self._test_atomic_action_timer(scenario.atomic_actions(),
"keystone.update_user_password")
def test_get_service_by_name(self):
scenario = utils.KeystoneScenario(self.context)
svc_foo, svc_bar = mock.Mock(), mock.Mock()
scenario._list_services = mock.Mock(return_value=[svc_foo, svc_bar])
self.assertEqual(scenario._get_service_by_name(svc_bar.name), svc_bar)
self.assertIsNone(scenario._get_service_by_name("spam"))
@mock.patch(UTILS + "KeystoneScenario.clients")
def test_create_ec2credentials(self, mock_clients):
scenario = utils.KeystoneScenario(self.context)
creds = mock.Mock()
mock_clients("keystone").ec2.create.return_value = creds
create_creds = scenario._create_ec2credentials("user_id",
"tenant_id")
self.assertEqual(create_creds, creds)
mock_clients("keystone").ec2.create.assert_called_once_with(
"user_id", "tenant_id")
self._test_atomic_action_timer(scenario.atomic_actions(),
"keystone.create_ec2creds")
@mock.patch(UTILS + "KeystoneScenario.clients")
def test_list_ec2credentials(self, mock_clients):
scenario = utils.KeystoneScenario(self.context)
creds_list = mock.Mock()
mock_clients("keystone").ec2.list.return_value = creds_list
list_creds = scenario._list_ec2credentials("user_id")
self.assertEqual(list_creds, creds_list)
mock_clients("keystone").ec2.list.assert_called_once_with("user_id")
self._test_atomic_action_timer(scenario.atomic_actions(),
"keystone.list_ec2creds")
@mock.patch(UTILS + "KeystoneScenario.clients")
def test_delete_ec2credentials(self, mock_clients):
scenario = utils.KeystoneScenario(self.context)
mock_clients("keystone").ec2.delete = mock.MagicMock()
scenario._delete_ec2credential("user_id", "access")
mock_clients("keystone").ec2.delete.assert_called_once_with("user_id",
"access")
self._test_atomic_action_timer(scenario.atomic_actions(),
"keystone.delete_ec2creds")

View File

@ -1,102 +0,0 @@
# Copyright 2014: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ddt
import mock
from rally import exceptions
from rally_openstack.wrappers import cinder as cinder_wrapper
from tests.unit import test
@ddt.ddt
class CinderWrapperTestCase(test.ScenarioTestCase):
@ddt.data(
{"version": "1", "expected_class": cinder_wrapper.CinderV1Wrapper},
{"version": "2", "expected_class": cinder_wrapper.CinderV2Wrapper}
)
@ddt.unpack
def test_wrap(self, version, expected_class):
client = mock.MagicMock()
client.choose_version.return_value = version
self.assertIsInstance(cinder_wrapper.wrap(client, mock.Mock()),
expected_class)
@mock.patch("rally_openstack.wrappers.cinder.LOG")
def test_wrap_wrong_version(self, mock_log):
client = mock.MagicMock()
client.choose_version.return_value = "dummy"
self.assertRaises(exceptions.InvalidArgumentsException,
cinder_wrapper.wrap, client, mock.Mock())
self.assertTrue(mock_log.warning.mock_called)
class CinderV1WrapperTestCase(test.TestCase):
def setUp(self):
super(CinderV1WrapperTestCase, self).setUp()
self.client = mock.MagicMock()
self.client.choose_version.return_value = "1"
self.owner = mock.Mock()
self.wrapped_client = cinder_wrapper.wrap(self.client, self.owner)
def test_create_volume(self):
self.wrapped_client.create_volume(1, display_name="fake_vol")
self.client.return_value.volumes.create.assert_called_once_with(
1, display_name=self.owner.generate_random_name.return_value)
def test_update_volume(self):
self.wrapped_client.update_volume("fake_id", display_name="fake_vol",
display_description="_updated")
self.client.return_value.volumes.update.assert_called_once_with(
"fake_id",
display_name=self.owner.generate_random_name.return_value,
display_description="_updated")
def test_create_snapshot(self):
self.wrapped_client.create_snapshot("fake_id",
display_name="fake_snap")
(self.client.return_value.volume_snapshots.create.
assert_called_once_with(
"fake_id",
display_name=self.owner.generate_random_name.return_value))
class CinderV2WrapperTestCase(test.TestCase):
def setUp(self):
super(CinderV2WrapperTestCase, self).setUp()
self.client = mock.MagicMock()
self.client.choose_version.return_value = "2"
self.owner = mock.Mock()
self.wrapped_client = cinder_wrapper.wrap(self.client, self.owner)
def test_create_volume(self):
self.wrapped_client.create_volume(1, name="fake_vol")
self.client.return_value.volumes.create.assert_called_once_with(
1, name=self.owner.generate_random_name.return_value)
def test_create_snapshot(self):
self.wrapped_client.create_snapshot("fake_id", name="fake_snap")
(self.client.return_value.volume_snapshots.create.
assert_called_once_with(
"fake_id",
name=self.owner.generate_random_name.return_value))
def test_update_volume(self):
self.wrapped_client.update_volume("fake_id", name="fake_vol",
description="_updated")
self.client.return_value.volumes.update.assert_called_once_with(
"fake_id", name=self.owner.generate_random_name.return_value,
description="_updated")

View File

@ -1,269 +0,0 @@
# Copyright 2014: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import tempfile
import ddt
from glanceclient import exc as glance_exc
import mock
from rally.common import cfg
from rally import exceptions
from rally_openstack.wrappers import glance as glance_wrapper
from tests.unit import test
CONF = cfg.CONF
@ddt.ddt
class GlanceWrapperTestCase(test.ScenarioTestCase):
@ddt.data(
{"version": "1", "expected_class": glance_wrapper.GlanceV1Wrapper},
{"version": "2", "expected_class": glance_wrapper.GlanceV2Wrapper}
)
@ddt.unpack
def test_wrap(self, version, expected_class):
client = mock.MagicMock()
client.choose_version.return_value = version
self.assertIsInstance(glance_wrapper.wrap(client, mock.Mock()),
expected_class)
@mock.patch("rally_openstack.wrappers.glance.LOG")
def test_wrap_wrong_version(self, mock_log):
client = mock.MagicMock()
client.choose_version.return_value = "dummy"
self.assertRaises(exceptions.InvalidArgumentsException,
glance_wrapper.wrap, client, mock.Mock())
self.assertTrue(mock_log.warning.mock_called)
@ddt.ddt
class GlanceV1WrapperTestCase(test.ScenarioTestCase):
_tempfile = tempfile.NamedTemporaryFile()
def setUp(self):
super(GlanceV1WrapperTestCase, self).setUp()
self.client = mock.MagicMock()
self.client.choose_version.return_value = "1"
self.owner = mock.Mock()
self.wrapped_client = glance_wrapper.wrap(self.client, self.owner)
def test_get_image(self):
image = mock.Mock()
return_image = self.wrapped_client.get_image(image)
self.client.return_value.images.get.assert_called_once_with(image.id)
self.assertEqual(return_image,
self.client.return_value.images.get.return_value)
def test_get_image_not_found(self):
image = mock.Mock()
self.client.return_value.images.get.side_effect = (
glance_exc.HTTPNotFound)
self.assertRaises(exceptions.GetResourceNotFound,
self.wrapped_client.get_image, image)
self.client.return_value.images.get.assert_called_once_with(image.id)
@ddt.data(
{"location": "image_location", "visibility": "private"},
{"location": "image_location", "fakearg": "fake"},
{"location": "image_location", "name": "image_name"},
{"location": _tempfile.name, "visibility": "public"})
@ddt.unpack
@mock.patch("six.moves.builtins.open")
def test_create_image(self, mock_open, location, **kwargs):
return_image = self.wrapped_client.create_image("container_format",
location,
"disk_format",
**kwargs)
call_args = kwargs
call_args["container_format"] = "container_format"
call_args["disk_format"] = "disk_format"
if location.startswith("/"):
call_args["data"] = mock_open.return_value
mock_open.assert_called_once_with(location)
mock_open.return_value.close.assert_called_once_with()
else:
call_args["copy_from"] = location
if "name" not in kwargs:
call_args["name"] = self.owner.generate_random_name.return_value
if "visibility" in kwargs:
call_args["is_public"] = call_args.pop("visibility") == "public"
self.client().images.create.assert_called_once_with(**call_args)
self.mock_wait_for_status.mock.assert_called_once_with(
self.client().images.create.return_value, ["active"],
update_resource=self.wrapped_client.get_image,
check_interval=CONF.openstack.glance_image_create_poll_interval,
timeout=CONF.openstack.glance_image_create_timeout)
self.assertEqual(self.mock_wait_for_status.mock.return_value,
return_image)
@ddt.data({"expected": True},
{"visibility": "public", "expected": True},
{"visibility": "private", "expected": False})
@ddt.unpack
def test_set_visibility(self, visibility=None, expected=None):
image = mock.Mock()
if visibility is None:
self.wrapped_client.set_visibility(image)
else:
self.wrapped_client.set_visibility(image, visibility=visibility)
self.client().images.update.assert_called_once_with(
image.id, is_public=expected)
@ddt.data({}, {"fakearg": "fake"})
def test_list_images_basic(self, filters):
self.assertEqual(self.wrapped_client.list_images(**filters),
self.client().images.list.return_value)
self.client().images.list.assert_called_once_with(filters=filters)
def test_list_images_with_owner(self):
self.assertEqual(self.wrapped_client.list_images(fakearg="fake",
owner="fakeowner"),
self.client().images.list.return_value)
self.client().images.list.assert_called_once_with(
owner="fakeowner", filters={"fakearg": "fake"})
def test_list_images_visibility_public(self):
public_images = [mock.Mock(is_public=True), mock.Mock(is_public=True)]
private_images = [mock.Mock(is_public=False),
mock.Mock(is_public=False)]
self.client().images.list.return_value = public_images + private_images
self.assertEqual(self.wrapped_client.list_images(fakearg="fake",
visibility="public"),
public_images)
self.client().images.list.assert_called_once_with(
filters={"fakearg": "fake"})
def test_list_images_visibility_private(self):
public_images = [mock.Mock(is_public=True), mock.Mock(is_public=True)]
private_images = [mock.Mock(is_public=False),
mock.Mock(is_public=False)]
self.client().images.list.return_value = public_images + private_images
self.assertEqual(self.wrapped_client.list_images(fakearg="fake",
visibility="private"),
private_images)
self.client().images.list.assert_called_once_with(
filters={"fakearg": "fake"})
@ddt.ddt
class GlanceV2WrapperTestCase(test.ScenarioTestCase):
_tempfile = tempfile.NamedTemporaryFile()
def setUp(self):
super(GlanceV2WrapperTestCase, self).setUp()
self.client = mock.MagicMock()
self.client.choose_version.return_value = "2"
self.owner = mock.Mock()
self.wrapped_client = glance_wrapper.wrap(self.client, self.owner)
def test_get_image(self):
image = mock.Mock()
return_image = self.wrapped_client.get_image(image)
self.client.return_value.images.get.assert_called_once_with(image.id)
self.assertEqual(return_image,
self.client.return_value.images.get.return_value)
def test_get_image_not_found(self):
image = mock.Mock()
self.client.return_value.images.get.side_effect = (
glance_exc.HTTPNotFound)
self.assertRaises(exceptions.GetResourceNotFound,
self.wrapped_client.get_image, image)
self.client.return_value.images.get.assert_called_once_with(image.id)
@ddt.data(
{"location": "image_location", "visibility": "private"},
{"location": "image_location", "fakearg": "fake"},
{"location": "image_location", "name": "image_name"},
{"location": _tempfile.name, "visibility": "public"},
{"location": "image_location",
"expected_kwargs": {"visibility": "public"}, "is_public": True})
@ddt.unpack
@mock.patch("six.moves.builtins.open")
@mock.patch("requests.get")
def test_create_image(self, mock_requests_get, mock_open, location,
expected_kwargs=None, **kwargs):
self.wrapped_client.get_image = mock.Mock()
created_image = mock.Mock()
uploaded_image = mock.Mock()
self.mock_wait_for_status.mock.side_effect = [created_image,
uploaded_image]
return_image = self.wrapped_client.create_image("container_format",
location,
"disk_format",
**kwargs)
create_args = expected_kwargs or kwargs
create_args["container_format"] = "container_format"
create_args["disk_format"] = "disk_format"
create_args.setdefault("name",
self.owner.generate_random_name.return_value)
self.client().images.create.assert_called_once_with(**create_args)
if location.startswith("/"):
data = mock_open.return_value
mock_open.assert_called_once_with(location)
else:
data = mock_requests_get.return_value.raw
mock_requests_get.assert_called_once_with(location, stream=True)
data.close.assert_called_once_with()
self.client().images.upload.assert_called_once_with(created_image.id,
data)
self.mock_wait_for_status.mock.assert_has_calls([
mock.call(
self.client().images.create.return_value, ["queued"],
update_resource=self.wrapped_client.get_image,
check_interval=CONF.openstack.
glance_image_create_poll_interval,
timeout=CONF.openstack.glance_image_create_timeout),
mock.call(
created_image, ["active"],
update_resource=self.wrapped_client.get_image,
check_interval=CONF.openstack.
glance_image_create_poll_interval,
timeout=mock.ANY)])
self.assertEqual(uploaded_image, return_image)
@ddt.data({},
{"visibility": "public"},
{"visibility": "private"})
@ddt.unpack
def test_set_visibility(self, visibility=None):
image = mock.Mock()
if visibility is None:
self.wrapped_client.set_visibility(image)
visibility = "public"
else:
self.wrapped_client.set_visibility(image, visibility=visibility)
self.client().images.update.assert_called_once_with(
image.id, visibility=visibility)
@ddt.data({}, {"fakearg": "fake"})
def test_list_images(self, filters):
self.assertEqual(self.wrapped_client.list_images(**filters),
self.client().images.list.return_value)
self.client().images.list.assert_called_once_with(filters=filters)

View File

@ -1,242 +0,0 @@
# Copyright 2014: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from keystoneclient import exceptions
import mock
from rally_openstack.wrappers import keystone
from tests.unit import test
class KeystoneWrapperTestBase(object):
def test_list_services(self):
service = mock.MagicMock()
service.id = "fake_id"
service.name = "Foobar"
service.extra_field = "extra_field"
self.client.services.list.return_value = [service]
result = list(self.wrapped_client.list_services())
self.assertEqual([("fake_id", "Foobar")], result)
self.assertEqual("fake_id", result[0].id)
self.assertEqual("Foobar", result[0].name)
self.assertFalse(hasattr(result[0], "extra_field"))
def test_wrap(self):
client = mock.MagicMock()
client.version = "dummy"
self.assertRaises(NotImplementedError, keystone.wrap, client)
def test_delete_service(self):
self.wrapped_client.delete_service("fake_id")
self.client.services.delete.assert_called_once_with("fake_id")
def test_list_roles(self):
role = mock.MagicMock()
role.id = "fake_id"
role.name = "Foobar"
role.extra_field = "extra_field"
self.client.roles.list.return_value = [role]
result = list(self.wrapped_client.list_roles())
self.assertEqual([("fake_id", "Foobar")], result)
self.assertEqual("fake_id", result[0].id)
self.assertEqual("Foobar", result[0].name)
self.assertFalse(hasattr(result[0], "extra_field"))
def test_delete_role(self):
self.wrapped_client.delete_role("fake_id")
self.client.roles.delete.assert_called_once_with("fake_id")
class KeystoneV2WrapperTestCase(test.TestCase, KeystoneWrapperTestBase):
def setUp(self):
super(KeystoneV2WrapperTestCase, self).setUp()
self.client = mock.MagicMock()
self.client.version = "v2.0"
self.wrapped_client = keystone.wrap(self.client)
def test_create_project(self):
self.wrapped_client.create_project("Foobar")
self.client.tenants.create.assert_called_once_with("Foobar")
def test_create_project_in_non_default_domain_fail(self):
self.assertRaises(
NotImplementedError, self.wrapped_client.create_project,
"Foobar", "non-default-domain")
def test_delete_project(self):
self.wrapped_client.delete_project("fake_id")
self.client.tenants.delete.assert_called_once_with("fake_id")
def test_list_projects(self):
tenant = mock.MagicMock()
tenant.id = "fake_id"
tenant.name = "Foobar"
tenant.extra_field = "extra_field"
self.client.tenants.list.return_value = [tenant]
result = list(self.wrapped_client.list_projects())
self.assertEqual([("fake_id", "Foobar", "default")], result)
self.assertEqual("fake_id", result[0].id)
self.assertEqual("Foobar", result[0].name)
self.assertEqual("default", result[0].domain_id)
self.assertFalse(hasattr(result[0], "extra_field"))
def test_create_user(self):
self.wrapped_client.create_user("foo", "bar", email="foo@bar.com",
project_id="tenant_id",
domain_name="default")
self.client.users.create.assert_called_once_with(
"foo", "bar", "foo@bar.com", "tenant_id")
def test_create_user_in_non_default_domain_fail(self):
self.assertRaises(
NotImplementedError, self.wrapped_client.create_user,
"foo", "bar", email="foo@bar.com", project_id="tenant_id",
domain_name="non-default-domain")
def test_delete_user(self):
self.wrapped_client.delete_user("fake_id")
self.client.users.delete.assert_called_once_with("fake_id")
def test_list_users(self):
user = mock.MagicMock()
user.id = "fake_id"
user.name = "foo"
user.tenantId = "tenant_id"
user.extra_field = "extra_field"
self.client.users.list.return_value = [user]
result = list(self.wrapped_client.list_users())
self.assertEqual([("fake_id", "foo", "tenant_id", "default")], result)
self.assertEqual("fake_id", result[0].id)
self.assertEqual("foo", result[0].name)
self.assertEqual("tenant_id", result[0].project_id)
self.assertEqual("default", result[0].domain_id)
self.assertFalse(hasattr(result[0], "extra_field"))
def test_create_role(self):
self.wrapped_client.create_role("foo_name")
self.client.roles.create.assert_called_once_with("foo_name")
def test_add_role(self):
self.wrapped_client.add_role("fake_role_id", "fake_user_id",
"fake_project_id")
self.client.roles.add_user_role.assert_called_once_with(
"fake_user_id", "fake_role_id", tenant="fake_project_id")
def test_remove_role(self):
self.wrapped_client.remove_role("fake_role_id", "fake_user_id",
"fake_project_id")
self.client.roles.remove_user_role.assert_called_once_with(
"fake_user_id", "fake_role_id", tenant="fake_project_id")
class KeystoneV3WrapperTestCase(test.TestCase, KeystoneWrapperTestBase):
def setUp(self):
super(KeystoneV3WrapperTestCase, self).setUp()
self.client = mock.MagicMock()
self.client.version = "v3"
self.wrapped_client = keystone.wrap(self.client)
self.client.domains.get.side_effect = exceptions.NotFound
self.client.domains.list.return_value = [
mock.MagicMock(id="domain_id")]
def test_create_project(self):
self.wrapped_client.create_project("Foobar", "domain")
self.client.projects.create.assert_called_once_with(
name="Foobar", domain="domain_id")
def test_create_project_with_non_existing_domain_fail(self):
self.client.domains.list.return_value = []
self.assertRaises(exceptions.NotFound,
self.wrapped_client.create_project,
"Foobar", "non-existing-domain")
def test_delete_project(self):
self.wrapped_client.delete_project("fake_id")
self.client.projects.delete.assert_called_once_with("fake_id")
def test_list_projects(self):
project = mock.MagicMock()
project.id = "fake_id"
project.name = "Foobar"
project.domain_id = "domain_id"
project.extra_field = "extra_field"
self.client.projects.list.return_value = [project]
result = list(self.wrapped_client.list_projects())
self.assertEqual([("fake_id", "Foobar", "domain_id")], result)
self.assertEqual("fake_id", result[0].id)
self.assertEqual("Foobar", result[0].name)
self.assertEqual("domain_id", result[0].domain_id)
self.assertFalse(hasattr(result[0], "extra_field"))
def test_create_user(self):
fake_role = mock.MagicMock(id="fake_role_id")
fake_role.name = "__member__"
self.client.roles.list.return_value = [fake_role]
self.client.users.create.return_value = mock.MagicMock(
id="fake_user_id")
self.wrapped_client.create_user(
"foo", "bar", email="foo@bar.com",
project_id="project_id", domain_name="domain")
self.client.users.create.assert_called_once_with(
name="foo", password="bar",
email="foo@bar.com", default_project="project_id",
domain="domain_id")
def test_create_user_with_non_existing_domain_fail(self):
self.client.domains.list.return_value = []
self.assertRaises(exceptions.NotFound,
self.wrapped_client.create_user, "foo", "bar",
email="foo@bar.com", project_id="project_id",
domain_name="non-existing-domain")
def test_delete_user(self):
self.wrapped_client.delete_user("fake_id")
self.client.users.delete.assert_called_once_with("fake_id")
def test_list_users(self):
user = mock.MagicMock()
user.id = "fake_id"
user.name = "foo"
user.default_project_id = "project_id"
user.domain_id = "domain_id"
user.extra_field = "extra_field"
self.client.users.list.return_value = [user]
result = list(self.wrapped_client.list_users())
self.assertEqual([("fake_id", "foo", "project_id", "domain_id")],
result)
self.assertEqual("fake_id", result[0].id)
self.assertEqual("foo", result[0].name)
self.assertEqual("project_id", result[0].project_id)
self.assertEqual("domain_id", result[0].domain_id)
self.assertFalse(hasattr(result[0], "extra_field"))
def test_create_role(self, **kwargs):
self.wrapped_client.create_role("foo_name", domain="domain",
**kwargs)
self.client.roles.create.assert_called_once_with(
"foo_name", domain="domain", **kwargs)
def test_add_role(self):
self.wrapped_client.add_role("fake_role_id", "fake_user_id",
"fake_project_id")
self.client.roles.grant.assert_called_once_with(
"fake_role_id", user="fake_user_id", project="fake_project_id")
def test_remove_role(self):
self.wrapped_client.remove_role("fake_role_id", "fake_user_id",
"fake_project_id")
self.client.roles.revoke.assert_called_once_with(
"fake_role_id", user="fake_user_id", project="fake_project_id")