Make metadata a common mixin
Implement metadata as a common mixin. Use it in the compute service for the beginning. Change-Id: Id29da2f7a21ed3bc6a1b86052d2a8b855df8516a
This commit is contained in:
parent
8140989639
commit
c1c8dfb79f
138
openstack/common/metadata.py
Normal file
138
openstack/common/metadata.py
Normal file
@ -0,0 +1,138 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
from openstack import exceptions
|
||||
from openstack import resource
|
||||
from openstack import utils
|
||||
|
||||
|
||||
class MetadataMixin:
|
||||
|
||||
#: *Type: list of tag strings*
|
||||
metadata = resource.Body('metadata', type=dict)
|
||||
|
||||
def fetch_metadata(self, session):
|
||||
"""Lists metadata set on the entity.
|
||||
|
||||
:param session: The session to use for making this request.
|
||||
:return: The dictionary with metadata attached to the entity
|
||||
"""
|
||||
url = utils.urljoin(self.base_path, self.id, 'metadata')
|
||||
response = session.get(url)
|
||||
exceptions.raise_from_response(response)
|
||||
json = response.json()
|
||||
|
||||
if 'metadata' in json:
|
||||
self._body.attributes.update({'metadata': json['metadata']})
|
||||
return self
|
||||
|
||||
def set_metadata(self, session, metadata=None, replace=False):
|
||||
"""Sets/Replaces metadata key value pairs on the resource.
|
||||
|
||||
:param session: The session to use for making this request.
|
||||
:param dict metadata: Dictionary with key-value pairs
|
||||
:param bool replace: Replace all resource metadata with the new object
|
||||
or merge new and existing.
|
||||
"""
|
||||
url = utils.urljoin(self.base_path, self.id, 'metadata')
|
||||
if not metadata:
|
||||
metadata = {}
|
||||
if not replace:
|
||||
response = session.post(url, json={'metadata': metadata})
|
||||
else:
|
||||
response = session.put(url, json={'metadata': metadata})
|
||||
exceptions.raise_from_response(response)
|
||||
self._body.attributes.update({'metadata': metadata})
|
||||
return self
|
||||
|
||||
def replace_metadata(self, session, metadata=None):
|
||||
"""Replaces all metadata key value pairs on the resource.
|
||||
|
||||
:param session: The session to use for making this request.
|
||||
:param dict metadata: Dictionary with key-value pairs
|
||||
:param bool replace: Replace all resource metadata with the new object
|
||||
or merge new and existing.
|
||||
"""
|
||||
return self.set_metadata(session, metadata, replace=True)
|
||||
|
||||
def delete_metadata(self, session):
|
||||
"""Removes all metadata on the entity.
|
||||
|
||||
:param session: The session to use for making this request.
|
||||
"""
|
||||
self.set_metadata(session, None, replace=True)
|
||||
return self
|
||||
|
||||
def get_metadata_item(self, session, key):
|
||||
"""Get the single metadata item on the entity.
|
||||
|
||||
If the metadata key does not exist a 404 will be returned
|
||||
|
||||
:param session: The session to use for making this request.
|
||||
:param str key: The key of a metadata item.
|
||||
"""
|
||||
url = utils.urljoin(self.base_path, self.id, 'metadata', key)
|
||||
response = session.get(url)
|
||||
exceptions.raise_from_response(
|
||||
response, error_message='Metadata item does not exist')
|
||||
meta = response.json().get('meta', {})
|
||||
# Here we need to potentially init metadata
|
||||
metadata = self.metadata or {}
|
||||
metadata[key] = meta.get(key)
|
||||
self._body.attributes.update({
|
||||
'metadata': metadata
|
||||
})
|
||||
|
||||
return self
|
||||
|
||||
def set_metadata_item(self, session, key, value):
|
||||
"""Create or replace single metadata item to the resource.
|
||||
|
||||
:param session: The session to use for making this request.
|
||||
:param str key: The key for the metadata item.
|
||||
:param str value: The value.
|
||||
"""
|
||||
url = utils.urljoin(self.base_path, self.id, 'metadata', key)
|
||||
response = session.put(
|
||||
url,
|
||||
json={'meta': {key: value}}
|
||||
)
|
||||
exceptions.raise_from_response(response)
|
||||
# we do not want to update tags directly
|
||||
metadata = self.metadata
|
||||
metadata[key] = value
|
||||
self._body.attributes.update({
|
||||
'metadata': metadata
|
||||
})
|
||||
return self
|
||||
|
||||
def delete_metadata_item(self, session, key):
|
||||
"""Removes a single metadata item from the specified resource.
|
||||
|
||||
:param session: The session to use for making this request.
|
||||
:param str key: The key as a string.
|
||||
"""
|
||||
url = utils.urljoin(self.base_path, self.id, 'metadata', key)
|
||||
response = session.delete(url)
|
||||
exceptions.raise_from_response(response)
|
||||
# we do not want to update tags directly
|
||||
metadata = self.metadata
|
||||
try:
|
||||
if metadata:
|
||||
metadata.pop(key)
|
||||
else:
|
||||
metadata = {}
|
||||
except ValueError:
|
||||
pass # do nothing!
|
||||
self._body.attributes.update({
|
||||
'metadata': metadata
|
||||
})
|
||||
return self
|
@ -104,7 +104,7 @@ class TagMixin:
|
||||
return self
|
||||
|
||||
def remove_tag(self, session, tag):
|
||||
"""Removes a single tag from the specified server.
|
||||
"""Removes a single tag from the specified resource.
|
||||
|
||||
:param session: The session to use for making this request.
|
||||
:param tag: The tag as a string.
|
||||
|
@ -483,9 +483,7 @@ class Proxy(proxy.Proxy):
|
||||
:rtype: :class:`~openstack.compute.v2.image.Image`
|
||||
"""
|
||||
res = self._get_base_resource(image, _image.Image)
|
||||
metadata = res.get_metadata(self)
|
||||
result = _image.Image.existing(id=res.id, metadata=metadata)
|
||||
return result
|
||||
return res.fetch_metadata(self)
|
||||
|
||||
def set_image_metadata(self, image, **metadata):
|
||||
"""Update metadata for an image
|
||||
@ -502,23 +500,28 @@ class Proxy(proxy.Proxy):
|
||||
:rtype: :class:`~openstack.compute.v2.image.Image`
|
||||
"""
|
||||
res = self._get_base_resource(image, _image.Image)
|
||||
metadata = res.set_metadata(self, **metadata)
|
||||
result = _image.Image.existing(id=res.id, metadata=metadata)
|
||||
return result
|
||||
return res.set_metadata(self, metadata=metadata)
|
||||
|
||||
def delete_image_metadata(self, image, keys):
|
||||
def delete_image_metadata(self, image, keys=None):
|
||||
"""Delete metadata for an image
|
||||
|
||||
Note: This method will do a HTTP DELETE request for every key in keys.
|
||||
|
||||
:param image: Either the ID of an image or a
|
||||
:class:`~openstack.compute.v2.image.Image` instance.
|
||||
:param keys: The keys to delete.
|
||||
:param list keys: The keys to delete. If left empty complete metadata
|
||||
will be removed.
|
||||
|
||||
:rtype: ``None``
|
||||
"""
|
||||
res = self._get_base_resource(image, _image.Image)
|
||||
return res.delete_metadata(self, keys)
|
||||
if keys is not None:
|
||||
# Create a set as a snapshot of keys to avoid "changed during
|
||||
# iteration"
|
||||
for key in set(keys):
|
||||
res.delete_metadata_item(self, key)
|
||||
else:
|
||||
res.delete_metadata(self)
|
||||
|
||||
def create_keypair(self, **attrs):
|
||||
"""Create a new keypair from attributes
|
||||
@ -1256,14 +1259,12 @@ class Proxy(proxy.Proxy):
|
||||
:class:`~openstack.compute.v2.server.ServerDetail`
|
||||
instance.
|
||||
|
||||
:returns: A :class:`~openstack.compute.v2.server.Server` with only the
|
||||
:returns: A :class:`~openstack.compute.v2.server.Server` with the
|
||||
server's metadata. All keys and values are Unicode text.
|
||||
:rtype: :class:`~openstack.compute.v2.server.Server`
|
||||
"""
|
||||
res = self._get_base_resource(server, _server.Server)
|
||||
metadata = res.get_metadata(self)
|
||||
result = _server.Server.existing(id=res.id, metadata=metadata)
|
||||
return result
|
||||
return res.fetch_metadata(self)
|
||||
|
||||
def set_server_metadata(self, server, **metadata):
|
||||
"""Update metadata for a server
|
||||
@ -1280,23 +1281,28 @@ class Proxy(proxy.Proxy):
|
||||
:rtype: :class:`~openstack.compute.v2.server.Server`
|
||||
"""
|
||||
res = self._get_base_resource(server, _server.Server)
|
||||
metadata = res.set_metadata(self, **metadata)
|
||||
result = _server.Server.existing(id=res.id, metadata=metadata)
|
||||
return result
|
||||
return res.set_metadata(self, metadata=metadata)
|
||||
|
||||
def delete_server_metadata(self, server, keys):
|
||||
def delete_server_metadata(self, server, keys=None):
|
||||
"""Delete metadata for a server
|
||||
|
||||
Note: This method will do a HTTP DELETE request for every key in keys.
|
||||
|
||||
:param server: Either the ID of a server or a
|
||||
:class:`~openstack.compute.v2.server.Server` instance.
|
||||
:param keys: The keys to delete
|
||||
:param list keys: The keys to delete. If left empty complete
|
||||
metadata will be removed.
|
||||
|
||||
:rtype: ``None``
|
||||
"""
|
||||
res = self._get_base_resource(server, _server.Server)
|
||||
return res.delete_metadata(self, keys)
|
||||
if keys is not None:
|
||||
# Create a set as a snapshot of keys to avoid "changed during
|
||||
# iteration"
|
||||
for key in set(keys):
|
||||
res.delete_metadata_item(self, key)
|
||||
else:
|
||||
res.delete_metadata(self)
|
||||
|
||||
def create_server_group(self, **attrs):
|
||||
"""Create a new server group from attributes
|
||||
|
@ -9,8 +9,7 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from openstack.compute.v2 import metadata
|
||||
from openstack.common import metadata
|
||||
from openstack import resource
|
||||
|
||||
|
||||
@ -38,8 +37,6 @@ class Image(resource.Resource, metadata.MetadataMixin):
|
||||
name = resource.Body('name')
|
||||
#: Timestamp when the image was created.
|
||||
created_at = resource.Body('created')
|
||||
#: Metadata pertaining to this image. *Type: dict*
|
||||
metadata = resource.Body('metadata', type=dict)
|
||||
#: The mimimum disk size. *Type: int*
|
||||
min_disk = resource.Body('minDisk', type=int)
|
||||
#: The minimum RAM size. *Type: int*
|
||||
|
@ -1,101 +0,0 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
from openstack import exceptions
|
||||
from openstack import utils
|
||||
|
||||
|
||||
class MetadataMixin:
|
||||
|
||||
def _metadata(self, method, key=None, clear=False, delete=False,
|
||||
metadata=None):
|
||||
metadata = metadata or {}
|
||||
for k, v in metadata.items():
|
||||
if not isinstance(v, str):
|
||||
raise ValueError("The value for %s (%s) must be "
|
||||
"a text string" % (k, v))
|
||||
|
||||
# If we're in a ServerDetail, we need to pop the "detail" portion
|
||||
# of the URL off and then everything else will work the same.
|
||||
pos = self.base_path.find("detail")
|
||||
if pos != -1:
|
||||
base = self.base_path[:pos]
|
||||
else:
|
||||
base = self.base_path
|
||||
|
||||
if key is not None:
|
||||
url = utils.urljoin(base, self.id, "metadata", key)
|
||||
else:
|
||||
url = utils.urljoin(base, self.id, "metadata")
|
||||
|
||||
kwargs = {}
|
||||
if metadata or clear:
|
||||
# 'meta' is the key for singular modifications.
|
||||
# 'metadata' is the key for mass modifications.
|
||||
key = "meta" if key is not None else "metadata"
|
||||
kwargs["json"] = {key: metadata}
|
||||
|
||||
headers = {"Accept": ""} if delete else {}
|
||||
|
||||
response = method(url, headers=headers, **kwargs)
|
||||
|
||||
# ensure Nova API has not returned us an error
|
||||
exceptions.raise_from_response(response)
|
||||
# DELETE doesn't return a JSON body while everything else does.
|
||||
return response.json() if not delete else None
|
||||
|
||||
def get_metadata(self, session):
|
||||
"""Retrieve metadata
|
||||
|
||||
:param session: The session to use for this request.
|
||||
|
||||
:returns: A dictionary of the requested metadata. All keys and values
|
||||
are Unicode text.
|
||||
:rtype: dict
|
||||
"""
|
||||
result = self._metadata(session.get)
|
||||
return result["metadata"]
|
||||
|
||||
def set_metadata(self, session, **metadata):
|
||||
"""Update metadata
|
||||
|
||||
This call will replace only the metadata with the same keys
|
||||
given here. Metadata with other keys will not be modified.
|
||||
|
||||
:param session: The session to use for this request.
|
||||
:param kwargs metadata: key/value metadata pairs to be update on
|
||||
this server instance. All keys and values
|
||||
are stored as Unicode.
|
||||
|
||||
:returns: A dictionary of the metadata after being updated.
|
||||
All keys and values are Unicode text.
|
||||
:rtype: dict
|
||||
"""
|
||||
if not metadata:
|
||||
return dict()
|
||||
|
||||
result = self._metadata(session.post, metadata=metadata)
|
||||
return result["metadata"]
|
||||
|
||||
def delete_metadata(self, session, keys):
|
||||
"""Delete metadata
|
||||
|
||||
Note: This method will do a HTTP DELETE request for every key in keys.
|
||||
|
||||
:param session: The session to use for this request.
|
||||
:param list keys: The keys to delete.
|
||||
|
||||
:rtype: ``None``
|
||||
"""
|
||||
for key in keys:
|
||||
self._metadata(session.delete, key=key, delete=True)
|
@ -9,8 +9,8 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
from openstack.common import metadata
|
||||
from openstack.common import tag
|
||||
from openstack.compute.v2 import metadata
|
||||
from openstack import exceptions
|
||||
from openstack.image.v2 import image
|
||||
from openstack import resource
|
||||
@ -143,8 +143,6 @@ class Server(resource.Resource, metadata.MetadataMixin, tag.TagMixin):
|
||||
launched_at = resource.Body('OS-SRV-USG:launched_at')
|
||||
#: The maximum number of servers to create.
|
||||
max_count = resource.Body('max_count')
|
||||
#: Metadata stored for this server. *Type: dict*
|
||||
metadata = resource.Body('metadata', type=dict)
|
||||
#: The minimum number of servers to create.
|
||||
min_count = resource.Body('min_count')
|
||||
#: A networks object. Required parameter when there are multiple
|
||||
|
202
openstack/tests/unit/common/test_metadata.py
Normal file
202
openstack/tests/unit/common/test_metadata.py
Normal file
@ -0,0 +1,202 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
from unittest import mock
|
||||
|
||||
from keystoneauth1 import adapter
|
||||
|
||||
from openstack.common import metadata
|
||||
from openstack import exceptions
|
||||
from openstack import resource
|
||||
from openstack.tests.unit import base
|
||||
from openstack.tests.unit.test_resource import FakeResponse
|
||||
|
||||
IDENTIFIER = 'IDENTIFIER'
|
||||
|
||||
|
||||
class TestMetadata(base.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestMetadata, self).setUp()
|
||||
|
||||
self.service_name = "service"
|
||||
self.base_path = "base_path"
|
||||
|
||||
self.metadata_result = {"metadata": {"go": "cubs", "boo": "sox"}}
|
||||
self.meta_result = {"meta": {"oh": "yeah"}}
|
||||
|
||||
class Test(resource.Resource, metadata.MetadataMixin):
|
||||
service = self.service_name
|
||||
base_path = self.base_path
|
||||
resources_key = 'resources'
|
||||
allow_create = True
|
||||
allow_fetch = True
|
||||
allow_head = True
|
||||
allow_commit = True
|
||||
allow_delete = True
|
||||
allow_list = True
|
||||
|
||||
self.test_class = Test
|
||||
|
||||
self.request = mock.Mock(spec=resource._Request)
|
||||
self.request.url = "uri"
|
||||
self.request.body = "body"
|
||||
self.request.headers = "headers"
|
||||
|
||||
self.response = FakeResponse({})
|
||||
|
||||
self.sot = Test.new(id="id")
|
||||
self.sot._prepare_request = mock.Mock(return_value=self.request)
|
||||
self.sot._translate_response = mock.Mock()
|
||||
|
||||
self.session = mock.Mock(spec=adapter.Adapter)
|
||||
self.session.get = mock.Mock(return_value=self.response)
|
||||
self.session.put = mock.Mock(return_value=self.response)
|
||||
self.session.post = mock.Mock(return_value=self.response)
|
||||
self.session.delete = mock.Mock(return_value=self.response)
|
||||
|
||||
def test_metadata_attribute(self):
|
||||
res = self.sot
|
||||
self.assertTrue(hasattr(res, 'metadata'))
|
||||
|
||||
def test_get_metadata(self):
|
||||
res = self.sot
|
||||
|
||||
mock_response = mock.Mock()
|
||||
mock_response.status_code = 200
|
||||
mock_response.links = {}
|
||||
mock_response.json.return_value = {'metadata': {'foo': 'bar'}}
|
||||
|
||||
self.session.get.side_effect = [mock_response]
|
||||
|
||||
result = res.fetch_metadata(self.session)
|
||||
# Check metadata attribute is updated
|
||||
self.assertDictEqual({'foo': 'bar'}, result.metadata)
|
||||
# Check passed resource is returned
|
||||
self.assertEqual(res, result)
|
||||
url = self.base_path + '/' + res.id + '/metadata'
|
||||
self.session.get.assert_called_once_with(url)
|
||||
|
||||
def test_set_metadata(self):
|
||||
res = self.sot
|
||||
|
||||
result = res.set_metadata(self.session, {'foo': 'bar'})
|
||||
# Check metadata attribute is updated
|
||||
self.assertDictEqual({'foo': 'bar'}, res.metadata)
|
||||
# Check passed resource is returned
|
||||
self.assertEqual(res, result)
|
||||
url = self.base_path + '/' + res.id + '/metadata'
|
||||
self.session.post.assert_called_once_with(
|
||||
url,
|
||||
json={'metadata': {'foo': 'bar'}}
|
||||
)
|
||||
|
||||
def test_replace_metadata(self):
|
||||
res = self.sot
|
||||
|
||||
result = res.replace_metadata(self.session, {'foo': 'bar'})
|
||||
# Check metadata attribute is updated
|
||||
self.assertDictEqual({'foo': 'bar'}, res.metadata)
|
||||
# Check passed resource is returned
|
||||
self.assertEqual(res, result)
|
||||
url = self.base_path + '/' + res.id + '/metadata'
|
||||
self.session.put.assert_called_once_with(
|
||||
url,
|
||||
json={'metadata': {'foo': 'bar'}}
|
||||
)
|
||||
|
||||
def test_delete_all_metadata(self):
|
||||
res = self.sot
|
||||
|
||||
# Set some initial value to check removal
|
||||
res.metadata = {'foo': 'bar'}
|
||||
|
||||
result = res.delete_metadata(self.session)
|
||||
# Check metadata attribute is updated
|
||||
self.assertEqual({}, res.metadata)
|
||||
# Check passed resource is returned
|
||||
self.assertEqual(res, result)
|
||||
url = self.base_path + '/' + res.id + '/metadata'
|
||||
self.session.put.assert_called_once_with(
|
||||
url,
|
||||
json={'metadata': {}})
|
||||
|
||||
def test_get_metadata_item(self):
|
||||
res = self.sot
|
||||
|
||||
mock_response = mock.Mock()
|
||||
mock_response.status_code = 200
|
||||
mock_response.json.return_value = {'meta': {'foo': 'bar'}}
|
||||
self.session.get.side_effect = [mock_response]
|
||||
|
||||
result = res.get_metadata_item(self.session, 'foo')
|
||||
# Check tags attribute is updated
|
||||
self.assertEqual({'foo': 'bar'}, res.metadata)
|
||||
# Check the passed resource is returned
|
||||
self.assertEqual(res, result)
|
||||
url = self.base_path + '/' + res.id + '/metadata/foo'
|
||||
self.session.get.assert_called_once_with(url)
|
||||
|
||||
def test_delete_single_item(self):
|
||||
res = self.sot
|
||||
|
||||
res.metadata = {'foo': 'bar', 'foo2': 'bar2'}
|
||||
|
||||
result = res.delete_metadata_item(self.session, 'foo2')
|
||||
# Check metadata attribute is updated
|
||||
self.assertEqual({'foo': 'bar'}, res.metadata)
|
||||
# Check passed resource is returned
|
||||
self.assertEqual(res, result)
|
||||
url = self.base_path + '/' + res.id + '/metadata/foo2'
|
||||
self.session.delete.assert_called_once_with(url)
|
||||
|
||||
def test_delete_signle_item_empty(self):
|
||||
res = self.sot
|
||||
|
||||
result = res.delete_metadata_item(self.session, 'foo2')
|
||||
# Check metadata attribute is updated
|
||||
self.assertEqual({}, res.metadata)
|
||||
# Check passed resource is returned
|
||||
self.assertEqual(res, result)
|
||||
url = self.base_path + '/' + res.id + '/metadata/foo2'
|
||||
self.session.delete.assert_called_once_with(url)
|
||||
|
||||
def test_get_metadata_item_not_exists(self):
|
||||
res = self.sot
|
||||
|
||||
mock_response = mock.Mock()
|
||||
mock_response.status_code = 404
|
||||
mock_response.content = None
|
||||
self.session.get.side_effect = [mock_response]
|
||||
|
||||
# ensure we get 404
|
||||
self.assertRaises(
|
||||
exceptions.NotFoundException,
|
||||
res.get_metadata_item,
|
||||
self.session,
|
||||
'dummy',
|
||||
)
|
||||
|
||||
def test_set_metadata_item(self):
|
||||
res = self.sot
|
||||
|
||||
# Set some initial value to check add
|
||||
res.metadata = {'foo': 'bar'}
|
||||
|
||||
result = res.set_metadata_item(self.session, 'foo', 'black')
|
||||
# Check metadata attribute is updated
|
||||
self.assertEqual({'foo': 'black'}, res.metadata)
|
||||
# Check passed resource is returned
|
||||
self.assertEqual(res, result)
|
||||
url = self.base_path + '/' + res.id + '/metadata/foo'
|
||||
self.session.put.assert_called_once_with(
|
||||
url,
|
||||
json={'meta': {'foo': 'black'}})
|
@ -1,124 +0,0 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from unittest import mock
|
||||
|
||||
from openstack.compute.v2 import server
|
||||
from openstack import exceptions
|
||||
from openstack.tests.unit import base
|
||||
|
||||
IDENTIFIER = 'IDENTIFIER'
|
||||
|
||||
# NOTE: The implementation for metadata is done via a mixin class that both
|
||||
# the server and image resources inherit from. Currently this test class
|
||||
# uses the Server resource to test it. Ideally it would be parameterized
|
||||
# to run with both Server and Image when the tooling for subtests starts
|
||||
# working.
|
||||
|
||||
|
||||
class TestMetadata(base.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestMetadata, self).setUp()
|
||||
self.metadata_result = {"metadata": {"go": "cubs", "boo": "sox"}}
|
||||
self.meta_result = {"meta": {"oh": "yeah"}}
|
||||
|
||||
def test_get_all_metadata_Server(self):
|
||||
self._test_get_all_metadata(server.Server(id=IDENTIFIER))
|
||||
|
||||
def test_get_all_metadata_ServerDetail(self):
|
||||
# This is tested explicitly so we know ServerDetail items are
|
||||
# properly having /detail stripped out of their base_path.
|
||||
self._test_get_all_metadata(server.ServerDetail(id=IDENTIFIER))
|
||||
|
||||
def _test_get_all_metadata(self, sot):
|
||||
response = mock.Mock()
|
||||
response.status_code = 200
|
||||
response.json.return_value = self.metadata_result
|
||||
sess = mock.Mock()
|
||||
sess.get.return_value = response
|
||||
|
||||
result = sot.get_metadata(sess)
|
||||
|
||||
self.assertEqual(result, self.metadata_result["metadata"])
|
||||
sess.get.assert_called_once_with(
|
||||
"servers/IDENTIFIER/metadata",
|
||||
headers={})
|
||||
|
||||
def test_set_metadata(self):
|
||||
response = mock.Mock()
|
||||
response.status_code = 200
|
||||
response.json.return_value = self.metadata_result
|
||||
sess = mock.Mock()
|
||||
sess.post.return_value = response
|
||||
|
||||
sot = server.Server(id=IDENTIFIER)
|
||||
|
||||
set_meta = {"lol": "rofl"}
|
||||
|
||||
result = sot.set_metadata(sess, **set_meta)
|
||||
|
||||
self.assertEqual(result, self.metadata_result["metadata"])
|
||||
sess.post.assert_called_once_with("servers/IDENTIFIER/metadata",
|
||||
headers={},
|
||||
json={"metadata": set_meta})
|
||||
|
||||
def test_delete_metadata(self):
|
||||
sess = mock.Mock()
|
||||
response = mock.Mock()
|
||||
response.status_code = 200
|
||||
sess.delete.return_value = response
|
||||
|
||||
sot = server.Server(id=IDENTIFIER)
|
||||
|
||||
key = "hey"
|
||||
|
||||
sot.delete_metadata(sess, [key])
|
||||
|
||||
sess.delete.assert_called_once_with(
|
||||
"servers/IDENTIFIER/metadata/" + key,
|
||||
headers={"Accept": ""},
|
||||
)
|
||||
|
||||
def test_delete_metadata_error(self):
|
||||
sess = mock.Mock()
|
||||
response = mock.Mock()
|
||||
response.status_code = 400
|
||||
response.content = None
|
||||
sess.delete.return_value = response
|
||||
|
||||
sot = server.Server(id=IDENTIFIER)
|
||||
|
||||
key = "hey"
|
||||
|
||||
self.assertRaises(
|
||||
exceptions.BadRequestException,
|
||||
sot.delete_metadata,
|
||||
sess,
|
||||
[key])
|
||||
|
||||
def test_set_metadata_error(self):
|
||||
sess = mock.Mock()
|
||||
response = mock.Mock()
|
||||
response.status_code = 400
|
||||
response.content = None
|
||||
sess.post.return_value = response
|
||||
|
||||
sot = server.Server(id=IDENTIFIER)
|
||||
|
||||
set_meta = {"lol": "rofl"}
|
||||
|
||||
self.assertRaises(
|
||||
exceptions.BadRequestException,
|
||||
sot.set_metadata,
|
||||
sess,
|
||||
**set_meta)
|
@ -918,12 +918,11 @@ class TestCompute(TestComputeProxy):
|
||||
|
||||
def test_get_all_server_metadata(self):
|
||||
self._verify(
|
||||
"openstack.compute.v2.server.Server.get_metadata",
|
||||
"openstack.compute.v2.server.Server.fetch_metadata",
|
||||
self.proxy.get_server_metadata,
|
||||
method_args=["value"],
|
||||
method_result=server.Server(id="value", metadata={}),
|
||||
expected_args=[self.proxy],
|
||||
expected_result={})
|
||||
expected_result=server.Server(id="value", metadata={}))
|
||||
|
||||
def test_set_server_metadata(self):
|
||||
kwargs = {"a": "1", "b": "2"}
|
||||
@ -935,15 +934,16 @@ class TestCompute(TestComputeProxy):
|
||||
method_kwargs=kwargs,
|
||||
method_result=server.Server.existing(id=id, metadata=kwargs),
|
||||
expected_args=[self.proxy],
|
||||
expected_kwargs=kwargs,
|
||||
expected_result=kwargs)
|
||||
expected_kwargs={'metadata': kwargs},
|
||||
expected_result=server.Server.existing(id=id, metadata=kwargs)
|
||||
)
|
||||
|
||||
def test_delete_server_metadata(self):
|
||||
self._verify(
|
||||
"openstack.compute.v2.server.Server.delete_metadata",
|
||||
"openstack.compute.v2.server.Server.delete_metadata_item",
|
||||
self.proxy.delete_server_metadata,
|
||||
expected_result=None,
|
||||
method_args=["value", "key"],
|
||||
method_args=["value", ["key"]],
|
||||
expected_args=[self.proxy, "key"])
|
||||
|
||||
def test_create_image(self):
|
||||
|
Loading…
x
Reference in New Issue
Block a user