compute/v2 server metadata and server meta resouce

Change-Id: If361abaf76d13da343fd37b32b833eaf32c2ac19
This commit is contained in:
Terry Howe
2014-08-25 15:54:55 -06:00
parent c526a2fad3
commit 6f3a3d64d8
4 changed files with 350 additions and 0 deletions

View File

@@ -0,0 +1,73 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import six
from openstack.compute import compute_service
from openstack import resource
from openstack import utils
class ServerMeta(resource.Resource):
resource_key = 'meta'
id_attribute = 'key'
base_path = '/servers/%(server_id)s/metadata'
service = compute_service.ComputeService()
# capabilities
allow_create = True
allow_retrieve = True
allow_update = True
allow_delete = True
allow_list = True
# Properties
key = resource.prop('key')
server_id = resource.prop('server_id')
value = resource.prop('value')
@classmethod
def create_by_id(cls, session, attrs, resource_id=None, path_args=None):
url = cls.base_path % path_args
url = utils.urljoin(url, resource_id)
body = {cls.resource_key: {attrs['key']: attrs['value']}}
resp = session.put(url, service=cls.service, json=body).body
return {'key': resource_id,
'value': resp[cls.resource_key][resource_id]}
@classmethod
def get_data_by_id(cls, session, resource_id, path_args=None,
include_headers=False):
url = cls.base_path % path_args
url = utils.urljoin(url, resource_id)
resp = session.get(url, service=cls.service).body
return {'key': resource_id,
'value': resp[cls.resource_key][resource_id]}
@classmethod
def update_by_id(cls, session, resource_id, attrs, path_args=None):
return cls.create_by_id(session, attrs, resource_id, path_args)
@classmethod
def delete_by_id(cls, session, resource_id, path_args=None):
url = cls.base_path % path_args
url = utils.urljoin(url, resource_id)
session.delete(url, service=cls.service, accept=None)
@classmethod
def list(cls, session, path_args=None, **params):
url = '/servers/%(server_id)s/metadata' % path_args
resp = session.get(url, service=cls.service, params=params).body
resp = resp['metadata']
return [cls.existing(server_id=path_args['server_id'], key=key,
value=value)
for key, value in six.iteritems(resp)]

View File

@@ -0,0 +1,51 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack.compute import compute_service
from openstack import resource
class ServerMetadata(resource.Resource):
resource_key = 'metadata'
id_attribute = 'server_id'
base_path = '/servers/%(server_id)s/metadata'
service = compute_service.ComputeService()
# capabilities
allow_create = True
allow_retrieve = True
allow_update = True
# Properties
server_id = resource.prop('server_id')
@classmethod
def create_by_id(cls, session, attrs, resource_id=None, path_args=None):
no_id = attrs.copy()
no_id.pop('server_id')
body = {"metadata": no_id}
url = cls.base_path % path_args
resp = session.put(url, service=cls.service, json=body).body
attrs = resp["metadata"].copy()
attrs['server_id'] = resource_id
return attrs
@classmethod
def get_data_by_id(cls, session, resource_id, path_args=None,
include_headers=False):
url = cls.base_path % path_args
resp = session.get(url, service=cls.service).body
return resp[cls.resource_key]
@classmethod
def update_by_id(cls, session, resource_id, attrs, path_args=None):
return cls.create_by_id(session, attrs, resource_id, path_args)

View File

@@ -0,0 +1,131 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import testtools
from openstack.compute.v2 import server_meta
FAKE_KEY = 'cervus'
FAKE_SERVER_ID = 'cervidae'
FAKE_VALUE = 'canadensis'
EXAMPLE = {
'key': FAKE_KEY,
'server_id': FAKE_SERVER_ID,
'value': FAKE_VALUE,
}
FAKE_RESPONSE = {"meta": {FAKE_KEY: FAKE_VALUE}}
FAKE_RESPONSES = {"metadata": {FAKE_KEY: FAKE_VALUE}}
class TestServerMeta(testtools.TestCase):
def test_basic(self):
sot = server_meta.ServerMeta()
self.assertEqual('meta', sot.resource_key)
self.assertEqual(None, sot.resources_key)
self.assertEqual('/servers/%(server_id)s/metadata', sot.base_path)
self.assertEqual('compute', sot.service.service_type)
self.assertTrue(sot.allow_create)
self.assertTrue(sot.allow_retrieve)
self.assertTrue(sot.allow_update)
self.assertTrue(sot.allow_delete)
self.assertTrue(sot.allow_list)
def test_make_it(self):
sot = server_meta.ServerMeta(EXAMPLE)
self.assertEqual(EXAMPLE['key'], sot.key)
self.assertEqual(EXAMPLE['server_id'], sot.server_id)
self.assertEqual(EXAMPLE['value'], sot.value)
def test_create(self):
resp = mock.Mock()
resp.body = FAKE_RESPONSE
sess = mock.Mock()
sess.put = mock.MagicMock()
sess.put.return_value = resp
sot = server_meta.ServerMeta(EXAMPLE)
sot.create(sess)
url = 'servers/' + FAKE_SERVER_ID + '/metadata/' + FAKE_KEY
body = {"meta": {FAKE_KEY: FAKE_VALUE}}
sess.put.assert_called_with(url, service=sot.service, json=body)
self.assertEqual(FAKE_VALUE, sot.value)
self.assertEqual(FAKE_KEY, sot.key)
self.assertEqual(FAKE_SERVER_ID, sot.server_id)
def test_get(self):
resp = mock.Mock()
resp.body = FAKE_RESPONSES
sess = mock.Mock()
sess.get = mock.MagicMock()
sess.get.return_value = resp
sot = server_meta.ServerMeta()
path_args = {'server_id': FAKE_SERVER_ID}
resp = sot.list(sess, path_args=path_args)
url = '/servers/' + FAKE_SERVER_ID + '/metadata'
sess.get.assert_called_with(url, service=sot.service, params={})
self.assertEqual(1, len(resp))
self.assertEqual(FAKE_SERVER_ID, resp[0].server_id)
self.assertEqual(FAKE_KEY, resp[0].key)
self.assertEqual(FAKE_VALUE, resp[0].value)
def test_update(self):
resp = mock.Mock()
resp.body = FAKE_RESPONSE
sess = mock.Mock()
sess.put = mock.MagicMock()
sess.put.return_value = resp
sot = server_meta.ServerMeta(EXAMPLE)
sot.update(sess)
url = 'servers/' + FAKE_SERVER_ID + '/metadata/' + FAKE_KEY
body = {"meta": {FAKE_KEY: FAKE_VALUE}}
sess.put.assert_called_with(url, service=sot.service, json=body)
self.assertEqual(FAKE_VALUE, sot.value)
self.assertEqual(FAKE_KEY, sot.key)
self.assertEqual(FAKE_SERVER_ID, sot.server_id)
def test_delete(self):
resp = mock.Mock()
resp.body = FAKE_RESPONSES
sess = mock.Mock()
sess.delete = mock.MagicMock()
sess.delete.return_value = resp
sot = server_meta.ServerMeta(EXAMPLE)
sot.delete(sess)
url = 'servers/' + FAKE_SERVER_ID + '/metadata/' + FAKE_KEY
sess.delete.assert_called_with(url, service=sot.service, accept=None)
def test_list(self):
resp = mock.Mock()
resp.body = FAKE_RESPONSES
sess = mock.Mock()
sess.get = mock.MagicMock()
sess.get.return_value = resp
sot = server_meta.ServerMeta()
path_args = {'server_id': FAKE_SERVER_ID}
resp = sot.list(sess, path_args=path_args)
url = '/servers/' + FAKE_SERVER_ID + '/metadata'
sess.get.assert_called_with(url, service=sot.service, params={})
self.assertEqual(1, len(resp))
self.assertEqual(FAKE_SERVER_ID, resp[0].server_id)
self.assertEqual(FAKE_KEY, resp[0].key)
self.assertEqual(FAKE_VALUE, resp[0].value)

View File

@@ -0,0 +1,95 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import testtools
from openstack.compute.v2 import server_metadata
FAKE_SERVER_ID = 'cervidae'
FAKE_KEY = 'cervus'
FAKE_VALUE = 'canadensis'
FAKE_KEY2 = 'odocoileus'
FAKE_VALUE2 = 'hemionus'
EXAMPLE = {
'server_id': FAKE_SERVER_ID,
FAKE_KEY: FAKE_VALUE,
}
FAKE_RESPONSE = {"metadata": {FAKE_KEY: FAKE_VALUE, FAKE_KEY2: FAKE_VALUE2}}
class TestServerMetadata(testtools.TestCase):
def test_basic(self):
sot = server_metadata.ServerMetadata()
self.assertEqual('metadata', sot.resource_key)
self.assertEqual(None, sot.resources_key)
self.assertEqual('/servers/%(server_id)s/metadata', sot.base_path)
self.assertEqual('compute', sot.service.service_type)
self.assertTrue(sot.allow_create)
self.assertTrue(sot.allow_retrieve)
self.assertTrue(sot.allow_update)
self.assertFalse(sot.allow_delete)
self.assertFalse(sot.allow_list)
def test_make_it(self):
sot = server_metadata.ServerMetadata(EXAMPLE)
self.assertEqual(EXAMPLE['server_id'], sot.server_id)
self.assertEqual(FAKE_VALUE, sot[FAKE_KEY])
def test_create(self):
resp = mock.Mock()
resp.body = FAKE_RESPONSE
sess = mock.Mock()
sess.put = mock.MagicMock()
sess.put.return_value = resp
sot = server_metadata.ServerMetadata(EXAMPLE.copy())
sot.create(sess)
url = '/servers/' + FAKE_SERVER_ID + '/metadata'
body = {"metadata": {FAKE_KEY: FAKE_VALUE}}
sess.put.assert_called_with(url, service=sot.service, json=body)
self.assertEqual(FAKE_SERVER_ID, sot.server_id)
self.assertEqual(FAKE_VALUE, sot[FAKE_KEY])
def test_get(self):
resp = mock.Mock()
resp.body = FAKE_RESPONSE
sess = mock.Mock()
sess.get = mock.MagicMock()
sess.get.return_value = resp
sot = server_metadata.ServerMetadata(EXAMPLE.copy())
sot.get(sess)
url = '/servers/' + FAKE_SERVER_ID + '/metadata'
sess.get.assert_called_with(url, service=sot.service)
self.assertEqual(FAKE_SERVER_ID, sot.server_id)
self.assertEqual(FAKE_VALUE, sot[FAKE_KEY])
self.assertEqual(FAKE_VALUE2, sot[FAKE_KEY2])
def test_update(self):
resp = mock.Mock()
resp.body = FAKE_RESPONSE
sess = mock.Mock()
sess.put = mock.MagicMock()
sess.put.return_value = resp
sot = server_metadata.ServerMetadata(EXAMPLE.copy())
sot.update(sess)
url = '/servers/' + FAKE_SERVER_ID + '/metadata'
body = {"metadata": {FAKE_KEY: FAKE_VALUE}}
sess.put.assert_called_with(url, service=sot.service, json=body)
self.assertEqual(FAKE_SERVER_ID, sot.server_id)
self.assertEqual(FAKE_VALUE, sot[FAKE_KEY])