Added blockstorage v1 volumes_api models and unittests.

* Moved config to common/config.  V1 config inherits from it now.
* Added automarshalling module for volumes_api common model code.
* Response models now deserialize metadata and links correctly.
* Added unittests for response models.
* Added unittests for wait behavior methods

Change-Id: Ic261b8af13cc438d2f4da532d87dd37c420f035a
This commit is contained in:
Jose Idar 2014-01-08 16:27:33 -06:00
parent d8bae9e947
commit 61e69db485
15 changed files with 1306 additions and 180 deletions

View File

@ -0,0 +1,15 @@
"""
Copyright 2013 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

View File

@ -0,0 +1,105 @@
"""
Copyright 2013 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from cloudcafe.common.models.configuration import ConfigSectionInterface
class VolumesAPIConfig(ConfigSectionInterface):
SECTION_NAME = 'volumes_api'
@property
def serialize_format(self):
return self.get("serialize_format", default="json")
@property
def deserialize_format(self):
return self.get("deserialize_format", default="json")
# Volumes behavior config
@property
def default_volume_type(self):
return self.get("default_volume_type")
@property
def max_volume_size(self):
return int(self.get("max_volume_size", default=1024))
@property
def min_volume_size(self):
return int(self.get("min_volume_size", default=1))
@property
def volume_status_poll_frequency(self):
return int(self.get("volume_status_poll_frequency", default=5))
@property
def volume_create_timeout(self):
return int(self.get("volume_create_timeout", default=600))
@property
def volume_delete_min_timeout(self):
return int(self.get("volume_delete_min_timeout", default=0))
@property
def volume_delete_max_timeout(self):
return int(self.get("volume_delete_max_timeout", default=3600))
@property
def volume_delete_wait_per_gigabyte(self):
return int(self.get("volume_delete_wait_per_gigabyte", default=1))
# Snapshot behaviors config
@property
def snapshot_status_poll_frequency(self):
return int(self.get("snapshot_status_poll_frequency", default=10))
@property
def snapshot_create_max_timeout(self):
return int(self.get("snapshot_create_max_timeout", default=36000))
@property
def snapshot_create_min_timeout(self):
"""Absolute lower limit on calculated snapshot create timeouts"""
return int(self.get("snapshot_create_min_timeout", default=0))
@property
def snapshot_create_base_timeout(self):
"""Amount of time added by default to the final calculated snapshot
create timeouts.
"""
return int(self.get("snapshot_create_base_timeout", default=0))
@property
def snapshot_create_wait_per_gigabyte(self):
return int(self.get("snapshot_create_wait_per_gigabyte", default=600))
@property
def snapshot_delete_max_timeout(self):
"""Absolute upper limit on calculated snapshot delete timeouts"""
return int(self.get("snapshot_delete_max_timeout", default=36000))
@property
def snapshot_delete_min_timeout(self):
"""Absolute lower limit on calculated snapshot delete timeouts"""
return int(self.get("snapshot_delete_min_timeout", default=0))
@property
def snapshot_delete_wait_per_gigabyte(self):
"""If set, volume snapshot delete behaviors can estimate the time
it will take a particular volume to delete given it's size
"""
return int(self.get("snapshot_delete_wait_per_gigabyte", default=60))

View File

@ -0,0 +1,15 @@
"""
Copyright 2013 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

View File

@ -0,0 +1,88 @@
import json
from xml.etree import ElementTree
from cafe.engine.models.base import \
AutoMarshallingModel, AutoMarshallingListModel
class _VolumesAPIBaseModel(AutoMarshallingModel):
obj_model_key = None
kwarg_map = {}
@classmethod
def _map_values_to_kwargs(cls, deserialized_obj):
kwargs = {}
for local_kw, deserialized_obj_kw in cls.kwarg_map.iteritems():
kwargs[local_kw] = deserialized_obj.get(deserialized_obj_kw)
return cls(**kwargs)
@classmethod
def _json_to_obj(cls, serialized_str):
json_dict = json.loads(serialized_str)
volume_dict = json_dict.get(cls.obj_model_key)
return cls._json_dict_to_obj(volume_dict)
@classmethod
def _json_dict_to_obj(cls, json_dict):
return cls._map_values_to_kwargs(json_dict)
@classmethod
def _xml_to_obj(cls, serialized_str):
element = ElementTree.fromstring(serialized_str)
return cls._xml_ele_to_obj(element)
@classmethod
def _xml_ele_to_obj(cls, element):
return cls._map_values_to_kwargs(element)
class _VolumesAPIBaseListModel(AutoMarshallingListModel):
list_model_key = None
ObjectModel = None
@classmethod
def _json_to_obj(cls, serialized_str):
json_dict = json.loads(serialized_str)
list_of_dicts = json_dict.get(cls.list_model_key)
return cls._json_dict_to_obj(list_of_dicts)
@classmethod
def _json_dict_to_obj(cls, list_of_dicts):
obj_list = cls()
for obj_dict in list_of_dicts:
obj_list.append(cls.ObjectModel._json_dict_to_obj(obj_dict))
return obj_list
@classmethod
def _xml_to_obj(cls, serialized_str):
list_element = ElementTree.fromstring(serialized_str)
return cls._xml_ele_to_obj(list_element)
@classmethod
def _xml_ele_to_obj(cls, xml_etree_element):
obj_list = cls()
for element in xml_etree_element:
if element.tag.endswith(cls.list_model_key):
for obj_element in element:
obj_list.append(
cls.ObjectModel._xml_ele_to_obj(obj_element))
return obj_list
class _XMLDictionary(_VolumesAPIBaseModel):
dict_model_key = 'metadata'
key_name = 'key'
@classmethod
def _xml_ele_to_obj(
cls, xml_etree_element, dict_model_key=None, key_name=None):
dict_model_key = dict_model_key or cls.dict_model_key
key_name = key_name or cls.key_name
obj_dict = {}
for element in xml_etree_element:
if element.tag.endswith(dict_model_key):
for obj_element in element:
obj_dict[obj_element.get(key_name)] = obj_element.text
return obj_dict

View File

@ -13,7 +13,6 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from time import time, sleep
from cafe.engine.behaviors import BaseBehavior, behavior
@ -53,10 +52,10 @@ class VolumesAPI_Behaviors(BaseBehavior):
def calculate_snapshot_create_timeout(self, volume_size):
timeout = self._calculate_timeout(
size=int(volume_size),
max_timeout=int(self.config.snapshot_create_max_timeout),
min_timeout=int(self.config.snapshot_create_min_timeout),
wait_per_gb=int(self.config.snapshot_create_wait_per_gigabyte))
timeout += int(self.config.snapshot_create_base_timeout)
max_timeout=self.config.snapshot_create_max_timeout,
min_timeout=self.config.snapshot_create_min_timeout,
wait_per_gb=self.config.snapshot_create_wait_per_gigabyte)
timeout += self.config.snapshot_create_base_timeout
return timeout
@behavior(VolumesClient)
@ -141,9 +140,8 @@ class VolumesAPI_Behaviors(BaseBehavior):
Note: Unreliable for transient statuses like 'deleting'.
"""
poll_rate = int(
poll_rate or self.config.volume_status_poll_frequency)
timeout = int(timeout or self.config.volume_create_timeout)
poll_rate = poll_rate or self.config.volume_status_poll_frequency
timeout = timeout or self.config.volume_create_timeout
end_time = time() + int(timeout)
while time() < end_time:
@ -166,14 +164,13 @@ class VolumesAPI_Behaviors(BaseBehavior):
@behavior(VolumesClient)
def wait_for_snapshot_status(
self, snapshot_id, expected_status, timeout=None,
wait_period=None):
poll_rate=None):
""" Waits for a specific status and returns None when that status is
observed.
Note: Unreliable for transient statuses like 'deleting'.
"""
wait_period = int(
wait_period or self.config.snapshot_status_poll_frequency)
poll_rate = poll_rate or self.config.snapshot_status_poll_frequency
end_time = time() + int(timeout)
while time() < end_time:
@ -201,7 +198,7 @@ class VolumesAPI_Behaviors(BaseBehavior):
.format(expected_status))
break
sleep(wait_period)
sleep(poll_rate)
else:
msg = (
@ -317,7 +314,7 @@ class VolumesAPI_Behaviors(BaseBehavior):
@behavior(VolumesClient)
def delete_volume_confirmed(
self, volume_id, size=None, timeout=None, wait_period=None):
self, volume_id, size=None, timeout=None, poll_rate=None):
"""Returns True if volume deleted, False otherwise"""
timeout = self._calculate_timeout(
@ -326,8 +323,7 @@ class VolumesAPI_Behaviors(BaseBehavior):
max_timeout=self.config.volume_delete_max_timeout,
wait_per_gb=self.config.volume_delete_wait_per_gigabyte)
wait_period = float(
wait_period or self.config.volume_status_poll_frequency)
poll_rate = poll_rate or self.config.volume_status_poll_frequency
timeout_msg = (
"delete_volume_confirmed() was unable to confirm the volume"
@ -364,7 +360,7 @@ class VolumesAPI_Behaviors(BaseBehavior):
volume_id, status_resp.status_code))
return False
sleep(wait_period)
sleep(poll_rate)
else:
self._log.error(timeout_msg)
@ -372,7 +368,7 @@ class VolumesAPI_Behaviors(BaseBehavior):
@behavior(VolumesClient)
def delete_snapshot_confirmed(
self, snapshot_id, vol_size=None, timeout=None, wait_period=None):
self, snapshot_id, vol_size=None, timeout=None, poll_rate=None):
"""Returns True if snapshot deleted, False otherwise"""
timeout = self._calculate_timeout(
@ -381,8 +377,7 @@ class VolumesAPI_Behaviors(BaseBehavior):
max_timeout=self.config.snapshot_delete_max_timeout,
wait_per_gb=self.config.snapshot_delete_wait_per_gigabyte)
wait_period = float(
wait_period or self.config.snapshot_status_poll_frequency)
poll_rate = poll_rate or self.config.snapshot_status_poll_frequency
timeout_msg = (
"delete_snapshot_confirmed() was unable to confirm the snapshot "
@ -402,7 +397,7 @@ class VolumesAPI_Behaviors(BaseBehavior):
else:
break
sleep(wait_period)
sleep(poll_rate)
else:
self._log.error(timeout_msg)
return False
@ -422,7 +417,7 @@ class VolumesAPI_Behaviors(BaseBehavior):
snapshot_id, resp.status_code))
return False
sleep(wait_period)
sleep(poll_rate)
else:
self._log.error(timeout_msg)
return False

View File

@ -15,6 +15,7 @@ limitations under the License.
"""
from cafe.engine.clients.rest import AutoMarshallingRestClient
from cloudcafe.blockstorage.volumes_api.v1.models.requests import (
VolumeRequest, VolumeSnapshotRequest)

View File

@ -14,90 +14,9 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
from cloudcafe.common.models.configuration import ConfigSectionInterface
from cloudcafe.blockstorage.volumes_api.common.config import \
VolumesAPIConfig as _VolumesAPIConfig
class VolumesAPIConfig(ConfigSectionInterface):
class VolumesAPIConfig(_VolumesAPIConfig):
SECTION_NAME = 'volumes_api_v1'
@property
def serialize_format(self):
return self.get("serialize_format", default="json")
@property
def deserialize_format(self):
return self.get("deserialize_format", default="json")
#Volumes behavior config
@property
def default_volume_type(self):
return self.get("default_volume_type")
@property
def max_volume_size(self):
return self.get("max_volume_size", default=1024)
@property
def min_volume_size(self):
return self.get("min_volume_size", default=1)
@property
def volume_status_poll_frequency(self):
return self.get("volume_status_poll_frequency", default=5)
@property
def volume_create_timeout(self):
return self.get("volume_create_timeout", default=600)
@property
def volume_delete_min_timeout(self):
return self.get("volume_delete_min_timeout", default=0)
@property
def volume_delete_max_timeout(self):
return self.get("volume_delete_max_timeout", default=3600)
@property
def volume_delete_wait_per_gigabyte(self):
return self.get("volume_delete_wait_per_gigabyte", default=1)
#Snapshot behaviors config
@property
def snapshot_status_poll_frequency(self):
return self.get("snapshot_status_poll_frequency", default=10)
@property
def snapshot_create_max_timeout(self):
return self.get("snapshot_create_max_timeout", default=36000)
@property
def snapshot_create_min_timeout(self):
"""Absolute lower limit on calculated snapshot create timeouts"""
return self.get("snapshot_create_min_timeout", default=0)
@property
def snapshot_create_base_timeout(self):
"""Amount of time added by default to the final calculated snapshot
create timeouts."""
return self.get("snapshot_create_base_timeout", default=0)
@property
def snapshot_create_wait_per_gigabyte(self):
return self.get("snapshot_create_wait_per_gigabyte", default=600)
@property
def snapshot_delete_max_timeout(self):
"""Absolute upper limit on calculated snapshot delete timeouts"""
return self.get("snapshot_delete_max_timeout", default=36000)
@property
def snapshot_delete_min_timeout(self):
"""Absolute lower limit on calculated snapshot delete timeouts
"""
return self.get("snapshot_delete_min_timeout", default=0)
@property
def snapshot_delete_wait_per_gigabyte(self):
"""If set, volume snapshot delete behaviors can estimate the time
it will take a particular volume to delete given it's size"""
return self.get("snapshot_delete_wait_per_gigabyte", default=60)

View File

@ -25,7 +25,7 @@ class VolumeRequest(AutoMarshallingModel):
def __init__(
self, size=None, volume_type=None, display_name=None,
display_description=None, metadata=None, availability_zone=None,
snapshot_id=None, attachments=None):
snapshot_id=None):
self.size = size
self.volume_type = volume_type
@ -33,6 +33,7 @@ class VolumeRequest(AutoMarshallingModel):
self.display_description = display_description
self.metadata = metadata or dict()
self.availability_zone = availability_zone
self.snapshot_id = snapshot_id
def _obj_to_json(self):
return json.dumps(self._obj_to_json_dict())
@ -44,7 +45,8 @@ class VolumeRequest(AutoMarshallingModel):
"display_name": self.display_name,
"display_description": self.display_description,
"metadata": self.metadata,
"availability_zone": self.availability_zone}
"availability_zone": self.availability_zone,
"snapshot_id": self.snapshot_id}
return {'volume': self._remove_empty_values(volume_attrs)}
@ -55,7 +57,8 @@ class VolumeRequest(AutoMarshallingModel):
"volume_type": self.volume_type,
"display_name": self.display_name,
"display_description": self.display_description,
"availability_zone": self.availability_zone}
"availability_zone": self.availability_zone,
"snapshot_id": self.snapshot_id}
element = self._set_xml_etree_element(element, volume_attrs)
if len(self.metadata.keys()) > 0:

View File

@ -14,73 +14,9 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
import json
from xml.etree import ElementTree
from cafe.engine.models.base import \
AutoMarshallingModel, AutoMarshallingListModel
class _VolumesAPIBaseModel(AutoMarshallingModel):
obj_model_key = None
kwarg_map = {}
@classmethod
def _map_values_to_kwargs(cls, deserialized_obj):
kwargs = {}
for local_kw, deserialized_obj_kw in cls.kwarg_map.iteritems():
kwargs[local_kw] = deserialized_obj.get(deserialized_obj_kw)
return cls(**kwargs)
@classmethod
def _json_to_obj(cls, serialized_str):
json_dict = json.loads(serialized_str)
volume_dict = json_dict.get(cls.obj_model_key)
return cls._json_dict_to_obj(volume_dict)
@classmethod
def _json_dict_to_obj(cls, json_dict):
return cls._map_values_to_kwargs(json_dict)
@classmethod
def _xml_to_obj(cls, serialized_str):
element = ElementTree.fromstring(serialized_str)
return cls._xml_ele_to_obj(element)
@classmethod
def _xml_ele_to_obj(cls, element):
return cls._map_values_to_kwargs(element)
class _VolumesAPIBaseListModel(AutoMarshallingListModel):
list_model_key = None
ObjectModel = None
@classmethod
def _json_to_obj(cls, serialized_str):
json_dict = json.loads(serialized_str)
dict_list = json_dict.get(cls.list_model_key)
return cls._json_dict_to_obj(dict_list)
@classmethod
def _json_dict_to_obj(cls, json_dict):
obj_list = cls()
for obj_dict in json_dict:
obj_list.append(cls.ObjectModel._json_dict_to_obj(obj_dict))
return obj_list
@classmethod
def _xml_to_obj(cls, serialized_str):
list_element = ElementTree.fromstring(serialized_str)
return cls._xml_ele_to_obj(list_element)
@classmethod
def _xml_ele_to_obj(cls, xml_etree_element):
obj_list = cls()
for obj_element in xml_etree_element:
obj_list.append(cls.ObjectModel._xml_ele_to_obj(obj_element))
return obj_list
from cloudcafe.blockstorage.volumes_api.common.models.automarshalling import \
_VolumesAPIBaseListModel, _VolumesAPIBaseModel, _XMLDictionary
class VolumeResponse(_VolumesAPIBaseModel):
@ -96,24 +32,48 @@ class VolumeResponse(_VolumesAPIBaseModel):
"snapshot_id": "snapshot_id",
"attachments": "attachments",
"created_at": "created_at",
"links": "links",
"status": "status"}
def __init__(
self, id_=None, display_name=None, size=None, volume_type=None,
display_description=None, metadata=None, availability_zone=None,
snapshot_id=None, attachments=None, created_at=None, status=None):
snapshot_id=None, attachments=None, created_at=None, status=None,
links=None):
self.id_ = id_
self.display_name = display_name
self.display_description = display_description
self.size = size
self.volume_type = volume_type
self.metadata = metadata or {}
self.availability_zone = availability_zone
self.snapshot_id = snapshot_id
self.attachments = attachments
self.created_at = created_at
self.status = status
self.links = links or []
self.attachments = attachments or []
self.metadata = metadata or {}
@classmethod
def _json_to_obj(cls, serialized_str):
volume = super(VolumeResponse, cls)._json_to_obj(serialized_str)
volume.attachments = _VolumeAttachmentsList._json_dict_to_obj(
volume.attachments)
volume.links = _LinksList._json_dict_to_obj(volume.links)
return volume
@classmethod
def _xml_to_obj(cls, serialized_str):
element = ElementTree.fromstring(serialized_str)
kwargs = {}
for local_kw, deserialized_obj_kw in cls.kwarg_map.iteritems():
kwargs[local_kw] = element.get(deserialized_obj_kw)
volume = cls(**kwargs)
volume.metadata = _XMLDictionary._xml_ele_to_obj(element)
volume.attachments = _VolumeAttachmentsList._xml_ele_to_obj(element)
volume.links = _LinksList._xml_ele_to_obj(element)
return volume
class VolumeSnapshotResponse(_VolumesAPIBaseModel):
@ -125,11 +85,13 @@ class VolumeSnapshotResponse(_VolumesAPIBaseModel):
"display_description": "display_description",
"status": "status",
"size": "size",
"created_at": "created_at"}
"created_at": "created_at",
"metadata": "metadata"}
def __init__(
self, id_=None, volume_id=None, display_name=None,
display_description=None, status=None, size=None, created_at=None):
display_description=None, status=None, size=None, created_at=None,
metadata=None):
self.id_ = id_
self.volume_id = volume_id
@ -138,6 +100,18 @@ class VolumeSnapshotResponse(_VolumesAPIBaseModel):
self.status = status
self.size = size
self.created_at = created_at
self.metadata = metadata or {}
@classmethod
def _xml_to_obj(cls, serialized_str):
element = ElementTree.fromstring(serialized_str)
kwargs = {}
for local_kw, deserialized_obj_kw in cls.kwarg_map.iteritems():
kwargs[local_kw] = element.get(deserialized_obj_kw)
snapshot = cls(**kwargs)
snapshot.metadata = _XMLDictionary._xml_ele_to_obj(element)
return snapshot
class VolumeTypeResponse(_VolumesAPIBaseModel):
@ -151,7 +125,18 @@ class VolumeTypeResponse(_VolumesAPIBaseModel):
self.id_ = id_
self.name = name
self.extra_specs = extra_specs
self.extra_specs = extra_specs or {}
@classmethod
def _xml_to_obj(cls, serialized_str):
element = ElementTree.fromstring(serialized_str)
kwargs = {}
for local_kw, deserialized_obj_kw in cls.kwarg_map.iteritems():
kwargs[local_kw] = element.get(deserialized_obj_kw)
volume_type_obj = cls(**kwargs)
volume_type_obj.extra_specs = _XMLDictionary._xml_ele_to_obj(
element, 'extra_specs')
return volume_type_obj
class VolumeListResponse(_VolumesAPIBaseListModel):
@ -167,3 +152,45 @@ class VolumeSnapshotListResponse(_VolumesAPIBaseListModel):
class VolumeTypeListResponse(_VolumesAPIBaseListModel):
list_model_key = 'volume_types'
ObjectModel = VolumeTypeResponse
class _VolumeAttachmentItem(_VolumesAPIBaseModel):
kwarg_map = {
"id_": "id",
"device": "device",
"volume_id": "volume_id",
"server_id": "server_id"}
def __init__(self, id_=None, device=None, server_id=None, volume_id=None):
self.id_ = id_
self.device = device
self.volume_id = volume_id
self.server_id = server_id
class _VolumeAttachmentsList(_VolumesAPIBaseListModel):
list_model_key = 'attachments'
ObjectModel = _VolumeAttachmentItem
@classmethod
def _json_to_obj(cls, serialized_str):
raise NotImplementedError
@classmethod
def _xml_to_obj(cls, serialized_str):
raise NotImplementedError
class _LinksItem(_VolumesAPIBaseModel):
kwarg_map = {
"href": "href",
"rel": "rel"}
def __init__(self, href=None, rel=None):
self.href = href
self.rel = rel
class _LinksList(_VolumesAPIBaseListModel):
list_model_key = 'links'
ObjectModel = _LinksItem

View File

@ -0,0 +1,15 @@
"""
Copyright 2013 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

View File

@ -0,0 +1,15 @@
"""
Copyright 2013 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

View File

@ -0,0 +1,15 @@
"""
Copyright 2013 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

View File

@ -0,0 +1,444 @@
"""
Copyright 2013 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import unittest
from mock import MagicMock, Mock
from requests import Response
from cloudcafe.blockstorage.volumes_api.v1.behaviors import \
VolumesAPI_Behaviors, VolumesAPIBehaviorException
from cloudcafe.blockstorage.volumes_api.v1.models.responses import\
VolumeResponse, VolumeSnapshotResponse
from cloudcafe.blockstorage.volumes_api.v1.client import VolumesClient
from cloudcafe.blockstorage.volumes_api.v1.config import VolumesAPIConfig
class wait_for_snapshot_status(unittest.TestCase):
class defaults:
snapshot_name = 'mock_snapshot'
snapshot_id = '111111'
expected_status = 'available'
timeout = 10
poll_rate = 2
def test_wait_for_snapshot_status_good_response_code(self):
client = Mock(spec=VolumesClient)
config = Mock(spec=VolumesAPIConfig)
config.snapshot_status_poll_frequency = 1
snapshot_model = Mock(spec=VolumeSnapshotResponse)
snapshot_model.status = self.defaults.expected_status
response = Mock(spec=Response)
response.ok = True
response.entity = snapshot_model
client.get_snapshot_info = MagicMock(
return_value=response)
behavior = VolumesAPI_Behaviors(client, config)
resp = behavior.wait_for_snapshot_status(
self.defaults.snapshot_id, self.defaults.expected_status,
self.defaults.timeout, poll_rate=self.defaults.poll_rate)
self.assertIsNone(resp)
def test_wait_for_snapshot_status_good_response_code_config_wait_period(
self):
client = Mock(spec=VolumesClient)
config = Mock(spec=VolumesAPIConfig)
config.snapshot_status_poll_frequency = 1
snapshot_model = Mock(spec=VolumeSnapshotResponse)
snapshot_model.status = self.defaults.expected_status
response = Mock(spec=Response)
response.ok = True
response.entity = snapshot_model
client.get_snapshot_info = MagicMock(
return_value=response)
behavior = VolumesAPI_Behaviors(client, config)
resp = behavior.wait_for_snapshot_status(
self.defaults.snapshot_name, self.defaults.expected_status,
self.defaults.timeout)
self.assertIsNone(resp)
def test_wait_for_snapshot_status_good_response_code_bad_status(self):
client = Mock(spec=VolumesClient)
config = Mock(spec=VolumesAPIConfig)
config.snapshot_status_poll_frequency = 1
snapshot_model = Mock(spec=VolumeSnapshotResponse)
snapshot_model.status = self.defaults.expected_status
response = Mock(spec=Response)
response.ok = True
response.entity = snapshot_model
client.get_snapshot_info = MagicMock(
return_value=response)
behavior = VolumesAPI_Behaviors(client, config)
resp = behavior.wait_for_snapshot_status(
self.defaults.snapshot_name, self.defaults.expected_status,
self.defaults.timeout, poll_rate=self.defaults.poll_rate)
self.assertIsNone(resp)
def test_wait_for_snapshot_status_bad_response_code(self):
client = Mock(spec=VolumesClient)
config = Mock(spec=VolumesAPIConfig)
config.snapshot_status_poll_frequency = 1
snapshot_model = Mock(spec=VolumeSnapshotResponse)
snapshot_model.status = self.defaults.expected_status
response = Mock(spec=Response)
response.ok = False
response.entity = snapshot_model
response.status_code = '401'
client.get_snapshot_info = MagicMock(
return_value=response)
behavior = VolumesAPI_Behaviors(client, config)
with self.assertRaises(VolumesAPIBehaviorException):
behavior.wait_for_snapshot_status(
self.defaults.snapshot_name, self.defaults.expected_status,
self.defaults.timeout, poll_rate=self.defaults.poll_rate)
def test_wait_for_snapshot_status_good_response_code_empty_entity(self):
client = Mock(spec=VolumesClient)
config = Mock(spec=VolumesAPIConfig)
config.snapshot_status_poll_frequency = 1
snapshot_model = Mock(spec=VolumeSnapshotResponse)
snapshot_model.status = self.defaults.expected_status
response = Mock(spec=Response)
response.ok = True
response.entity = None
response.status_code = '200'
client.get_snapshot_info = MagicMock(
return_value=response)
behavior = VolumesAPI_Behaviors(client, config)
with self.assertRaises(VolumesAPIBehaviorException):
behavior.wait_for_snapshot_status(
self.defaults.snapshot_name, self.defaults.expected_status,
self.defaults.timeout, poll_rate=self.defaults.poll_rate)
def test_wait_for_snapshot_status_good_response_and_entity_bad_status(
self):
recieved_status = 'error'
client = Mock(spec=VolumesClient)
config = Mock(spec=VolumesAPIConfig)
config.snapshot_status_poll_frequency = 1
snapshot_model = Mock(spec=VolumeSnapshotResponse)
snapshot_model.status = recieved_status
response = Mock(spec=Response)
response.ok = True
response.entity = snapshot_model
client.get_snapshot_info = MagicMock(
return_value=response)
behavior = VolumesAPI_Behaviors(client, config)
with self.assertRaises(VolumesAPIBehaviorException):
behavior.wait_for_snapshot_status(
self.defaults.snapshot_name, self.defaults.expected_status,
self.defaults.timeout, poll_rate=self.defaults.poll_rate)
class wait_for_volume_status(unittest.TestCase):
class defaults:
volume_id = '111111'
volume_name = 'mock_volume'
expected_status = 'available'
timeout = 10
poll_rate = 2
timeout = 10
def test_wait_for_volume_status_good_response_code(self):
client = Mock(spec=VolumesClient)
config = Mock(spec=VolumesAPIConfig)
config.volume_status_poll_frequency = 1
volume_model = Mock(spec=VolumeResponse)
volume_model.status = self.defaults.expected_status
response = Mock(spec=Response)
response.ok = True
response.entity = volume_model
client.get_volume_info = MagicMock(
return_value=response)
behavior = VolumesAPI_Behaviors(client, config)
resp = behavior.wait_for_volume_status(
self.defaults.volume_id, self.defaults.expected_status,
self.defaults.timeout, poll_rate=self.defaults.poll_rate)
self.assertIsNone(resp)
def test_wait_for_volume_status_good_response_code_configured_poll_rate(
self):
client = Mock(spec=VolumesClient)
config = Mock(spec=VolumesAPIConfig)
config.volume_status_poll_frequency = 1
volume_model = Mock(spec=VolumeResponse)
volume_model.status = self.defaults.expected_status
response = Mock(spec=Response)
response.ok = True
response.entity = volume_model
client.get_volume_info = MagicMock(
return_value=response)
behavior = VolumesAPI_Behaviors(client, config)
resp = behavior.wait_for_volume_status(
self.defaults.volume_name, self.defaults.expected_status,
self.defaults.timeout)
self.assertIsNone(resp)
def test_wait_for_volume_status_good_response_code_bad_status(self):
volume_name = 'mock_volume'
expected_status = 'available'
timeout = 10
poll_rate = 2
client = Mock(spec=VolumesClient)
config = Mock(spec=VolumesAPIConfig)
config.volume_status_poll_frequency = 1
volume_model = Mock(spec=VolumeResponse)
volume_model.status = expected_status
response = Mock(spec=Response)
response.ok = True
response.entity = volume_model
client.get_volume_info = MagicMock(
return_value=response)
behavior = VolumesAPI_Behaviors(client, config)
resp = behavior.wait_for_volume_status(
volume_name, expected_status, timeout, poll_rate=poll_rate)
self.assertIsNone(resp)
def test_wait_for_volume_status_bad_response_code(self):
client = Mock(spec=VolumesClient)
config = Mock(spec=VolumesAPIConfig)
config.volume_status_poll_frequency = 1
volume_model = Mock(spec=VolumeResponse)
volume_model.status = self.defaults.expected_status
response = Mock(spec=Response)
response.ok = False
response.entity = volume_model
response.status_code = '401'
client.get_volume_info = MagicMock(
return_value=response)
behavior = VolumesAPI_Behaviors(client, config)
with self.assertRaises(VolumesAPIBehaviorException):
behavior.wait_for_volume_status(
self.defaults.volume_name, self.defaults.expected_status,
self.defaults.timeout, poll_rate=self.defaults.poll_rate)
def test_wait_for_volume_status_good_response_code_empty_entity(self):
client = Mock(spec=VolumesClient)
config = Mock(spec=VolumesAPIConfig)
config.volume_status_poll_frequency = 1
volume_model = Mock(spec=VolumeResponse)
volume_model.status = self.defaults.expected_status
response = Mock(spec=Response)
response.ok = True
response.entity = None
response.status_code = '200'
client.get_volume_info = MagicMock(
return_value=response)
behavior = VolumesAPI_Behaviors(client, config)
with self.assertRaises(VolumesAPIBehaviorException):
behavior.wait_for_volume_status(
self.defaults.volume_name, self.defaults.expected_status,
self.defaults.timeout, poll_rate=self.defaults.poll_rate)
def test_wait_for_volume_status_good_response_and_entity_bad_status(self):
recieved_status = 'error'
client = Mock(spec=VolumesClient)
config = Mock(spec=VolumesAPIConfig)
config.volume_status_poll_frequency = 1
volume_model = Mock(spec=VolumeResponse)
volume_model.status = recieved_status
response = Mock(spec=Response)
response.ok = True
response.entity = volume_model
client.get_volume_info = MagicMock(
return_value=response)
behavior = VolumesAPI_Behaviors(client, config)
with self.assertRaises(VolumesAPIBehaviorException):
behavior.wait_for_volume_status(
self.defaults.volume_name, self.defaults.expected_status,
self.defaults.timeout, poll_rate=self.defaults.poll_rate)
class create_available_volume(unittest.TestCase):
class defaults:
display_name = "mock_volume"
volume_type = "mock_type"
size = 1
def test_create_availabe_volume_happy_path(self):
volume_model = Mock(spec=VolumeResponse)
volume_model.id_ = "mock"
volume_create_response = Mock(spec=Response)
volume_create_response.entity = volume_model
volume_create_response.ok = True
client = Mock(spec=VolumesClient)
client.create_volume = MagicMock(return_value=volume_create_response)
config = Mock(spec=VolumesAPIConfig)
config.volume_create_timeout = 5
behavior = VolumesAPI_Behaviors(client, config)
behavior.wait_for_volume_status = MagicMock(return_value=None)
volume_entity = behavior.create_available_volume(
self.defaults.display_name, self.defaults.size,
self.defaults.volume_type)
self.assertIsInstance(volume_entity, VolumeResponse)
def test_create_available_volume_failure_response_no_model(self):
volume_model = Mock(spec=VolumeResponse)
volume_model.id_ = "mock"
volume_create_response = Mock(spec=Response)
volume_create_response.entity = None
volume_create_response.ok = False
volume_create_response.status_code = 500
client = Mock(spec=VolumesClient)
client.create_volume = MagicMock(return_value=volume_create_response)
config = Mock(spec=VolumesAPIConfig)
config.volume_create_timeout = 5
behavior = VolumesAPI_Behaviors(client, config)
behavior.wait_for_volume_status = MagicMock(return_value=None)
with self.assertRaises(VolumesAPIBehaviorException):
behavior.create_available_volume(
self.defaults.display_name, self.defaults.size,
self.defaults.volume_type)
def test_create_available_volume_failure_response_with_model(self):
volume_model = Mock(spec=VolumeResponse)
volume_model.id_ = "mock"
volume_create_response = Mock(spec=Response)
volume_create_response.entity = None
volume_create_response.ok = True
volume_create_response.status_code = 200
client = Mock(spec=VolumesClient)
client.create_volume = MagicMock(return_value=volume_create_response)
config = Mock(spec=VolumesAPIConfig)
config.volume_create_timeout = 5
behavior = VolumesAPI_Behaviors(client, config)
behavior.wait_for_volume_status = MagicMock(return_value=None)
with self.assertRaises(VolumesAPIBehaviorException):
behavior.create_available_volume(
self.defaults.display_name, self.defaults.size,
self.defaults.volume_type)
def test_create_available_volume_timeout_failure(self):
volume_model = Mock(spec=VolumeResponse)
volume_model.id_ = "mock"
volume_create_response = Mock(spec=Response)
volume_create_response.entity = None
volume_create_response.ok = True
volume_create_response.status_code = 200
client = Mock(spec=VolumesClient)
client.create_volume = MagicMock(return_value=volume_create_response)
config = Mock(spec=VolumesAPIConfig)
config.volume_create_timeout = 5
behavior = VolumesAPI_Behaviors(client, config)
behavior.wait_for_volume_status = MagicMock(
side_effect=VolumesAPIBehaviorException)
with self.assertRaises(VolumesAPIBehaviorException):
behavior.create_available_volume(
self.defaults.display_name, self.defaults.size,
self.defaults.volume_type)

View File

@ -0,0 +1,15 @@
"""
Copyright 2013 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

View File

@ -0,0 +1,454 @@
"""
Copyright 2013 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import unittest
import json
from cloudcafe.blockstorage.volumes_api.v1.models import responses
class BaseTest(object):
@classmethod
def setUpClass(cls):
cls.model = cls.model_type.deserialize(
cls.serialized_input, cls.deserialize_type)
class VolumeTypeResponseModelBaseTest(BaseTest):
model_type = responses.VolumeTypeResponse
class defaults:
id_ = "32948732984798324"
name = "fake name"
extra_specs_key = "ExtraSpecsKey"
extra_specs_value = "ExtraSpecsValue"
def test_id(self):
self.assertEqual(self.model.id_, self.defaults.id_)
def test_name(self):
self.assertEqual(self.model.name, self.defaults.name)
def test_extra_specs(self):
self.assertEqual(
self.model.extra_specs,
{self.defaults.extra_specs_key: self.defaults.extra_specs_value})
class VolumeTypeResponseModelTest_JSON(
VolumeTypeResponseModelBaseTest, unittest.TestCase):
defaults = VolumeTypeResponseModelBaseTest.defaults
deserialize_type = 'json'
data = {
"volume_type": {
"id": defaults.id_,
"name": defaults.name,
"extra_specs": {
defaults.extra_specs_key: defaults.extra_specs_value}}}
serialized_input = json.dumps(data)
class VolumeTypeResponseModelTest_XML(
VolumeTypeResponseModelBaseTest, unittest.TestCase):
defaults = VolumeTypeResponseModelBaseTest.defaults
xml_header = """<?xml version="1.0" encoding="UTF-8"?>"""
deserialize_type = 'xml'
input_template = \
"""{xml_header}<volume_type
xmlns="http://docs.openstack.org/fake"
id="{id_}" name="{name}">
<extra_specs>
<extra_spec key="{spec_key}">{spec_value}</extra_spec>
</extra_specs>
</volume_type>"""
serialized_input = input_template.format(
xml_header=xml_header, id_=defaults.id_, name=defaults.name,
spec_key=defaults.extra_specs_key,
spec_value=defaults.extra_specs_value)
class VolumeSnapshotResponseModelBaseTest(BaseTest):
model_type = responses.VolumeSnapshotResponse
class defaults:
status = 'creating'
display_description = 'fake snapshot description'
created_at = "2013-02-25T03:56:53.081642"
metadata_key = "MetaKey"
metadata_value = "MetaValue"
volume_id = "3242343242342342f32f324f3f"
size = 1
id_ = "2305iu32f9j3298f4jh32498fj"
display_name = "FakeSnapshotName"
def test_id(self):
self.assertEqual(self.model.id_, self.defaults.id_)
def test_volume_id(self):
self.assertEqual(self.model.volume_id, self.defaults.volume_id)
def test_display_name(self):
self.assertEqual(self.model.display_name, self.defaults.display_name)
def test_display_description(self):
self.assertEqual(
self.model.display_description, self.defaults.display_description)
def test_status(self):
self.assertEqual(self.model.status, self.defaults.status)
def test_size_int_equivalent(self):
self.assertEqual(int(self.model.size), self.defaults.size)
def test_size_str_compare(self):
self.assertEqual(str(self.model.size), str(self.defaults.size))
def test_size_int_compare(self):
self.assertEqual(int(self.model.size), int(self.defaults.size))
def test_created_at(self):
self.assertEqual(self.model.created_at, self.defaults.created_at)
def test_metadata(self):
self.assertEqual(
self.model.metadata,
{self.defaults.metadata_key: self.defaults.metadata_value})
class VolumeSnapshotResponseModelTests_JSON(
VolumeSnapshotResponseModelBaseTest, unittest.TestCase):
defaults = VolumeSnapshotResponseModelBaseTest.defaults
deserialize_type = 'json'
data = {
"snapshot": {
"status": defaults.status,
"display_description": defaults.display_description,
"created_at": defaults.created_at,
"metadata": {defaults.metadata_key: defaults.metadata_value},
"volume_id": defaults.volume_id,
"size": defaults.size,
"id": defaults.id_,
"display_name": defaults.display_name}}
serialized_input = json.dumps(data)
class VolumeSnapshotResponseModelTests_XML(
VolumeSnapshotResponseModelBaseTest, unittest.TestCase):
defaults = VolumeSnapshotResponseModelBaseTest.defaults
deserialize_type = 'xml'
xml_header = """<?xml version="1.0" encoding="UTF-8"?>"""
input_template = \
"""{xml_header}<snapshot
status="{status}"
display_description="{display_description}"
created_at="{created_at}"
volume_id="{volume_id}"
size="{size}"
id="{id_}"
display_name="{display_name}">
<metadata>
<meta key="{metadata_key}">{metadata_value}</meta>
</metadata>
</snapshot>"""
serialized_input = input_template.format(
xml_header=xml_header, status=defaults.status,
display_description=defaults.display_description,
created_at=defaults.created_at, volume_id=defaults.volume_id,
size=defaults.size, id_=defaults.id_,
display_name=defaults.display_name, metadata_key=defaults.metadata_key,
metadata_value=defaults.metadata_value)
class VolumeResponseModelBaseTests(BaseTest):
model_type = responses.VolumeResponse
class defaults:
status = 'available'
attachment_device = '/dev/xvdg'
attachment_server_id = u'e335bfc4-5ba0-49b0-9f2b-7bc202583047'
attachment_id = u'7d4fdc77-8db7-4ba8-9786-c2a265c8b157'
attachment_volume_id = u'7d4fdc77-8db7-4ba8-9786-c2a265c8b157'
link_href = "http://localhost:8776/v2/0c2ebfde/volumes/5aa119a8-d35"
link_rel = 'self'
availability_zone = "nova"
source_volid = "1234234234324234"
snapshot_id = "34545645645646456"
id_ = "5aa119a8-d25b-45a7-8d1b-88e127885635"
display_description = "Super volume."
bootable = "true"
display_name = "vol-002"
created_at = "2013-02-25T02:40:21.000000"
volume_type = "None"
os_vol_tenant_attr_tenant_id = "0c2eba2c5af04d3f9e9d0d410b371fde"
os_vol_host_attr_host = "ip-10-168-107-25"
size = 1
metadata_key = "MetaKey"
metadata_value = "MetaValue"
def test_id(self):
self.assertEqual(self.model.id_, self.defaults.id_)
def test_size_int_equivalent(self):
self.assertEqual(int(self.model.size), self.defaults.size)
def test_size_str_compare(self):
self.assertEqual(str(self.model.size), str(self.defaults.size))
def test_size_int_compare(self):
self.assertEqual(int(self.model.size), int(self.defaults.size))
def test_display_name(self):
self.assertEqual(self.model.display_name, self.defaults.display_name)
def test_volume_type(self):
self.assertEqual(
self.model.volume_type, self.defaults.volume_type)
def test_display_description(self):
self.assertEqual(
self.model.display_description, self.defaults.display_description)
def test_availability_zone(self):
self.assertEqual(
self.model.availability_zone,
self.defaults.availability_zone)
def test_metadata(self):
self.assertEqual(
self.model.metadata,
{self.defaults.metadata_key: self.defaults.metadata_value})
def test_snapshot_id(self):
self.assertEqual(
self.model.snapshot_id, self.defaults.snapshot_id)
def test_attachments_device(self):
self.assertEqual(
self.model.attachments[0].device,
self.defaults.attachment_device)
def test_attachments_server_id(self):
self.assertEqual(
self.model.attachments[0].server_id,
self.defaults.attachment_server_id)
def test_attachments_id(self):
self.assertEqual(
self.model.attachments[0].id_,
self.defaults.attachment_id)
def test_attachments_volume_id(self):
self.assertEqual(
self.model.attachments[0].volume_id,
self.defaults.attachment_volume_id)
def test_created_at(self):
self.assertEqual(
self.model.created_at,
self.defaults.created_at)
def test_status(self):
self.assertEqual(
self.model.status,
self.defaults.status)
def test_links_href(self):
self.assertEqual(
self.model.links[0].href,
self.defaults.link_href)
def test_links_rel(self):
self.assertEqual(
self.model.links[0].rel,
self.defaults.link_rel)
class VolumeResponseModelTests_JSON(
VolumeResponseModelBaseTests, unittest.TestCase):
defaults = VolumeResponseModelBaseTests.defaults
deserialize_type = 'json'
data = {
"volume": {
"status": defaults.status,
"attachments": [
{u'device': defaults.attachment_device,
u'server_id': defaults.attachment_server_id,
u'id': defaults.attachment_id,
u'volume_id': defaults.attachment_volume_id}],
"links": [{
"href": defaults.link_href,
"rel": defaults.link_rel}],
"availability_zone": defaults.availability_zone,
"snapshot_id": defaults.snapshot_id,
"id": defaults.id_,
"display_description": defaults.display_description,
"display_name": defaults.display_name,
"created_at": defaults.created_at,
"volume_type": defaults.volume_type,
"size": defaults.size,
"metadata": {defaults.metadata_key: defaults.metadata_value}}}
serialized_input = json.dumps(data)
def get_modified_volume_model(self, sub_attr_name, new_object):
modified_data = dict()
modified_data['volume'] = self.data['volume']
modified_data['volume'][sub_attr_name] = new_object
serialized_input = json.dumps(modified_data)
model = self.model_type.deserialize(
serialized_input, self.deserialize_type)
return model
def test_empty_attachments_response(self):
model = self.get_modified_volume_model('attachments', list())
self.assertEqual(model.attachments, list())
self.assertIsInstance(
model.attachments, responses._VolumeAttachmentsList)
def test_empty_links_response(self):
model = self.get_modified_volume_model('links', list())
self.assertEqual(model.links, list())
self.assertIsInstance(
model.attachments, responses._VolumeAttachmentsList)
def test_empty_metadata_response(self):
model = self.get_modified_volume_model('metadata', dict())
self.assertEqual(model.metadata, dict())
class VolumeDetailResponseModelTests_XML(
VolumeResponseModelBaseTests, unittest.TestCase):
defaults = VolumeResponseModelBaseTests.defaults
deserialize_type = 'xml'
xml_header = """<?xml version="1.0" encoding="UTF-8"?>"""
input_template = \
"""{xml_header}<volume
xmlns:atom="http://www.w3.org/2005/Atom"
xmlns="http://docs.openstack.org/volume/api/v1"
status="{status}"
display_name="{display_name}"
availability_zone="{availability_zone}"
created_at="{created_at}"
display_description="{display_description}"
volume_type="{volume_type}"
snapshot_id="{snapshot_id}"
id="{id_}"
size="{size}">
<attachments>
<attachment device="{attachment_device}"
server_id="{attachment_server_id}"
id="{attachment_id}"
volume_id="{attachment_volume_id}"
/>
</attachments>
<metadata>
<meta key="{metadata_key}">{metadata_value}</meta>
</metadata>
</volume>"""
serialized_input = input_template.format(
xml_header=xml_header, status=defaults.status,
display_name=defaults.display_name,
availability_zone=defaults.availability_zone,
created_at=defaults.created_at,
display_description=defaults.display_description,
volume_type=defaults.volume_type, snapshot_id=defaults.snapshot_id,
id_=defaults.id_, size=defaults.size,
attachment_device=defaults.attachment_device,
attachment_server_id=defaults.attachment_server_id,
attachment_id=defaults.attachment_id,
attachment_volume_id=defaults.attachment_volume_id,
metadata_key=defaults.metadata_key,
metadata_value=defaults.metadata_value)
@unittest.skip("There are no XML examples of links in XML responses")
def test_links_href(self):
super(VolumeDetailResponseModelTests_XML, self).test_links_href()
@unittest.skip("There are no XML examples of links in XML responses")
def test_links_rel(self):
super(VolumeDetailResponseModelTests_XML, self).test_links_href()
@unittest.skip("There are no XML examples of links in XML responses")
def test_empty_links_response(self):
pass
def test_empty_attachments_response(self):
serialized_input = """{xml_header}<volume
xmlns:atom="http://www.w3.org/2005/Atom"
xmlns="http://docs.openstack.org/volume/api/v1"
status="{status}"
display_name="{display_name}"
availability_zone="{availability_zone}"
created_at="{created_at}"
display_description="{display_description}"
volume_type="{volume_type}"
snapshot_id="{snapshot_id}"
id="{id_}"
size="{size}">
<metadata>
<meta key="{metadata_key}">{metadata_value}</meta>
</metadata>
</volume>""".format(
xml_header=self.xml_header,
status=self.defaults.status,
display_name=self.defaults.display_name,
availability_zone=self.defaults.availability_zone,
created_at=self.defaults.created_at,
display_description=self.defaults.display_description,
volume_type=self.defaults.volume_type,
snapshot_id=self.defaults.snapshot_id,
id_=self.defaults.id_,
size=self.defaults.size,
metadata_key=self.defaults.metadata_key,
metadata_value=self.defaults.metadata_value)
self.model = self.model_type.deserialize(
serialized_input, self.deserialize_type)
self.assertEqual(self.model.attachments, [])
self.assertIsInstance(
self.model.attachments, responses._VolumeAttachmentsList)
def test_empty_metadata_response(self):
serialized_input = """{xml_header}<volume
xmlns:atom="http://www.w3.org/2005/Atom"
xmlns="http://docs.openstack.org/volume/api/v1"
status="{status}"
display_name="{display_name}"
availability_zone="{availability_zone}"
created_at="{created_at}"
display_description="{display_description}"
volume_type="{volume_type}"
snapshot_id="{snapshot_id}"
id="{id_}"
size="{size}">
</volume>""".format(
xml_header=self.xml_header,
status=self.defaults.status,
display_name=self.defaults.display_name,
availability_zone=self.defaults.availability_zone,
created_at=self.defaults.created_at,
display_description=self.defaults.display_description,
volume_type=self.defaults.volume_type,
snapshot_id=self.defaults.snapshot_id,
id_=self.defaults.id_,
size=self.defaults.size)
self.model = self.model_type.deserialize(
serialized_input, self.deserialize_type)
self.assertEqual(self.model.metadata, {})