Merge "Implement volume attachment basic operation"

This commit is contained in:
Jenkins 2016-09-30 02:43:23 +00:00 committed by Gerrit Code Review
commit f5ef262ac9
8 changed files with 551 additions and 34 deletions

View File

@ -303,3 +303,11 @@ class VolumeTypeExists(Duplicate):
class VolumeTypeUpdateFailed(TricircleException):
message = _("Cannot update volume_type %(id)s")
class ServerMappingsNotFound(NotFound):
message = _('Instance %(server_id)s could not be found.')
class VolumeMappingsNotFound(NotFound):
message = _('Volume %(volume_id)s could not be found')

View File

@ -1296,3 +1296,25 @@ def volume_type_project_query(context, session=None, inactive=False,
filters = filters or {}
return model_query(context, models.VolumeTypeProjects, session=session,
read_deleted=read_deleted).filter_by(**filters)
@require_context
def get_server_mappings_by_top_id(context, server_id):
server_mappings = \
get_bottom_mappings_by_top_id(context, server_id, constants.RT_SERVER)
if not server_mappings:
raise exceptions.ServerMappingsNotFound(server_id=server_id)
return server_mappings
@require_context
def get_volume_mappings_by_top_id(context, volume_id):
volume_mappings = \
get_bottom_mappings_by_top_id(context, volume_id, constants.RT_VOLUME)
if not volume_mappings:
raise exceptions.VolumeMappingsNotFound(volume_id=volume_id)
return volume_mappings

View File

@ -33,9 +33,9 @@ from tricircle.nova_apigw.controllers import image
from tricircle.nova_apigw.controllers import network
from tricircle.nova_apigw.controllers import quota_sets
from tricircle.nova_apigw.controllers import server
from tricircle.nova_apigw.controllers import server_ips
from tricircle.nova_apigw.controllers import volume
LOG = logging.getLogger(__name__)
@ -72,7 +72,8 @@ class V21Controller(object):
}
self.server_sub_controller = {
'os-volume_attachments': volume.VolumeController,
'action': action.ActionController
'action': action.ActionController,
'ips': server_ips.ServerIpsController
}
def _get_resource_controller(self, project_id, remainder):

View File

@ -0,0 +1,72 @@
# Copyright (c) 2015 Huawei Tech. Co., Ltd.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import pecan
from pecan import expose
from pecan import rest
from oslo_log import log as logging
import tricircle.common.client as t_client
from tricircle.common import constants
import tricircle.common.context as t_context
from tricircle.common.i18n import _
from tricircle.common import utils
import tricircle.db.api as db_api
LOG = logging.getLogger(__name__)
class ServerIpsController(rest.RestController):
def __init__(self, project_id, server_id):
self.project_id = project_id
self.server_id = server_id
self.clients = {constants.TOP: t_client.Client()}
def _get_client(self, pod_name=constants.TOP):
if pod_name not in self.clients:
self.clients[pod_name] = t_client.Client(pod_name)
return self.clients[pod_name]
@expose(generic=True, template='json')
def get_all(self, **kwargs):
context = t_context.extract_context_from_environ()
server_mappings = db_api.get_server_mappings_by_top_id(
context, self.server_id)
if not server_mappings:
return utils.format_nova_error(
404, _('Server %s could not be found') % self.server_id)
try:
server_pod_name = server_mappings[0][0]['pod_name']
api = self._get_client(server_pod_name).get_native_client(
constants.RT_SERVER, context)
resp, body = api.client.get('/servers/%s/ips' % self.server_id)
pecan.response.status = resp.status_code
if not body:
return pecan.response
else:
return body
except Exception as e:
code = 500
message = _('Fail to lists assigned IP addresses'
'%(server_id)s: %(exception)s') % {
'server_id': self.server_id,
'exception': e}
if hasattr(e, 'code'):
code = e.code
LOG.error(message)
return utils.format_nova_error(code, message)

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import pecan
from pecan import expose
from pecan import rest
import re
@ -20,8 +21,8 @@ import re
from oslo_log import log as logging
import tricircle.common.client as t_client
from tricircle.common import constants
import tricircle.common.context as t_context
from tricircle.common import exceptions
from tricircle.common.i18n import _
from tricircle.common.i18n import _LE
from tricircle.common import utils
@ -53,17 +54,23 @@ class VolumeController(rest.RestController):
if 'volumeId' not in body:
return utils.format_nova_error(
400, _('Invalid input for field/attribute volumeAttachment'))
server_mappings = db_api.get_bottom_mappings_by_top_id(
context, self.server_id, constants.RT_SERVER)
if not server_mappings:
return utils.format_nova_error(404, _('Instance %s could not be '
'found.') % self.server_id)
volume_mappings = db_api.get_bottom_mappings_by_top_id(
context, body['volumeId'], constants.RT_VOLUME)
if not volume_mappings:
try:
server_mappings = db_api.get_server_mappings_by_top_id(
context, self.server_id)
volume_mappings = db_api.get_volume_mappings_by_top_id(
context, body['volumeId'])
except exceptions.ServerMappingsNotFound as e:
return utils.format_nova_error(404, e.message)
except exceptions.VolumeMappingsNotFound as e:
return utils.format_nova_error(404, e.message)
except Exception as e:
LOG.exception(_LE('Fail to create volume attachment for given'
'server %(server_id)s:'
'%(exception)s'),
{'server_id': self.server_id,
'exception': e})
return utils.format_nova_error(
404, _('Volume %s could not be found') % body['volumeId'])
500, _('Fail to create volume attachment'))
server_pod_name = server_mappings[0][0]['pod_name']
volume_pod_name = volume_mappings[0][0]['pod_name']
@ -89,8 +96,164 @@ class VolumeController(rest.RestController):
400, _('The supplied device path (%s) is '
'invalid.') % device)
client = self._get_client(server_pod_name)
volume = client.action_server_volumes(
context, 'create_server_volume',
server_mappings[0][1], volume_mappings[0][1], device)
return {'volumeAttachment': volume.to_dict()}
try:
api = self._get_client(server_pod_name).get_native_client(
'server_volume', context)
resp, body = api.client.post(
"/servers/%s/os-volume_attachments" % self.server_id, body=kw)
pecan.response.status = resp.status_code
if not body:
return pecan.response
else:
return body
except Exception as e:
code = 500
message = _('Fail to create volume attachment for given server '
'%(server_id)s: %(exception)s') % {
'server_id': self.server_id,
'exception': e}
if hasattr(e, 'code'):
code = e.code
LOG.error(message)
return utils.format_nova_error(code, message)
@expose(generic=True, template='json')
def get_one(self, _id):
"""Get the volume attachment identified by the attachment ID.
:param _id: the ID of volume attachment
:returns: the volume attachment
"""
context = t_context.extract_context_from_environ()
try:
server_mappings = db_api.get_server_mappings_by_top_id(
context, self.server_id)
except exceptions.ServerMappingsNotFound as e:
return utils.format_nova_error(404, e.message)
except Exception as e:
LOG.exception(_LE('Fail to get volume attachment'
'%(attachment_id)s from server %(server_id)s:'
'%(exception)s'),
{'attachment_id': _id,
'server_id': self.server_id,
'exception': e})
return utils.format_nova_error(
500, _('Fail to get volume attachment'))
server_pod_name = server_mappings[0][0]['pod_name']
try:
api = self._get_client(server_pod_name).get_native_client(
'server_volume', context)
resp, body = api.client.get(
"/servers/%s/os-volume_attachments/%s" %
(self.server_id, _id,))
pecan.response.status = resp.status_code
if not body:
return pecan.response
else:
return body
except Exception as e:
code = 500
message = _('Fail to get volume attachment %(attachment_id)s'
'from server %(server_id)s: %(exception)s') % {
'attachment_id': _id,
'server_id': self.server_id,
'exception': e}
if hasattr(e, 'code'):
code = e.code
LOG.error(message)
return utils.format_nova_error(code, message)
@expose(generic=True, template='json')
def get_all(self):
"""GET a list of all volume attachments for a server.
:returns: a list of volume attachments
"""
context = t_context.extract_context_from_environ()
try:
server_mappings = db_api.get_server_mappings_by_top_id(
context, self.server_id)
except exceptions.ServerMappingsNotFound as e:
return utils.format_nova_error(404, e.message)
except Exception as e:
LOG.exception(_LE('Fail to get volume attachments of server'
'%(server_id)s: %(exception)s'),
{'server_id': self.server_id,
'exception': e})
return utils.format_nova_error(
500, _('Fail to get volume attachments'))
server_pod_name = server_mappings[0][0]['pod_name']
try:
api = self._get_client(server_pod_name).get_native_client(
'server_volume', context)
resp, body = api.client.get(
"/servers/%s/os-volume_attachments" % self.server_id)
pecan.response.status = resp.status_code
if not body:
return pecan.response
else:
return body
except Exception as e:
code = 500
message = _('Fail to get volume attachments of server'
'%(server_id)s: %(exception)s') % {
'server_id': self.server_id,
'exception': e}
if hasattr(e, 'code'):
code = e.code
LOG.error(message)
return utils.format_nova_error(code, message)
@expose(generic=True, template='json')
def delete(self, _id):
"""Detach a volume identified by the attachment ID from the given server ID.
:param _id: the ID of volume attachment
"""
context = t_context.extract_context_from_environ()
try:
server_mappings = db_api.get_server_mappings_by_top_id(
context, self.server_id)
except exceptions.ServerMappingsNotFound as e:
return utils.format_nova_error(404, e.message)
except Exception as e:
LOG.exception(_LE('Fail to delete volume attachment'
'%(attachment_id)s from server %(server_id)s:'
'%(exception)s'),
{'attachment_id': _id,
'server_id': self.server_id,
'exception': e})
return utils.format_nova_error(
500, _('Fail to delete volume attachment'))
server_pod_name = server_mappings[0][0]['pod_name']
try:
api = self._get_client(server_pod_name).get_native_client(
'server_volume', context)
resp, body = api.client.delete(
"/servers/%s/os-volume_attachments/%s" %
(self.server_id, _id,))
pecan.response.status = resp.status_code
if not body:
return pecan.response
else:
return body
except Exception as e:
code = 500
message = _('Fail to delete volume attachments %(attachment_id)s'
'from server %(server_id)s: %(exception)s') % {
'attachment_id': _id,
'server_id': self.server_id,
'exception': e}
if hasattr(e, 'code'):
code = e.code
LOG.error(message)
return utils.format_nova_error(code, message)

View File

@ -45,6 +45,11 @@ TESTCASES="$TESTCASES|tempest.api.compute.admin.test_servers.ServersAdminTestJSO
TESTCASES="$TESTCASES|tempest.api.compute.admin.test_servers_negative.ServersAdminNegativeTestJSON.test_reset_state_server_invalid_state"
TESTCASES="$TESTCASES|tempest.api.compute.admin.test_servers_negative.ServersAdminNegativeTestJSON.test_reset_state_server_invalid_type"
TESTCASES="$TESTCASES|tempest.api.compute.admin.test_servers_negative.ServersAdminNegativeTestJSON.test_reset_state_server_nonexistent_server"
TESTCASES="$TESTCASES|tempest.api.compute.volumes.test_attach_volume.AttachVolumeShelveTestJSON.test_detach_volume_shelved_or_offload_server"
TESTCASES="$TESTCASES|tempest.api.compute.volumes.test_attach_volume.AttachVolumeTestJSON"
TESTCASES="$TESTCASES|tempest.api.compute.volumes.test_volumes_negative.VolumesNegativeTest.test_get_volume_without_passing_volume_id"
# add new test cases like following line for volume_type test
# TESTCASES="$TESTCASES|tempest.api.volume.admin.test_volumes_type"
TESTCASES="$TESTCASES)"
ostestr --regex $TESTCASES
@ -573,10 +578,10 @@ ostestr --regex $TESTCASES
# **DONE** tempest.api.compute.test_versions.TestVersions.test_list_api_versions[id-6c0a0990-43b6-4529-9b61-5fd8daf7c55c]
# tempest.api.compute.volumes.test_attach_volume.AttachVolumeShelveTestJSON.test_attach_detach_volume[id-52e9045a-e90d-4c0d-9087-79d657faffff]
# tempest.api.compute.volumes.test_attach_volume.AttachVolumeShelveTestJSON.test_attach_volume_shelved_or_offload_server[id-13a940b6-3474-4c3c-b03f-29b89112bfee]
# tempest.api.compute.volumes.test_attach_volume.AttachVolumeShelveTestJSON.test_detach_volume_shelved_or_offload_server[id-b54e86dd-a070-49c4-9c07-59ae6dae15aa]
# **DONE** tempest.api.compute.volumes.test_attach_volume.AttachVolumeShelveTestJSON.test_detach_volume_shelved_or_offload_server[id-b54e86dd-a070-49c4-9c07-59ae6dae15aa]
# tempest.api.compute.volumes.test_attach_volume.AttachVolumeShelveTestJSON.test_list_get_volume_attachments[id-7fa563fe-f0f7-43eb-9e22-a1ece036b513]
# tempest.api.compute.volumes.test_attach_volume.AttachVolumeTestJSON.test_attach_detach_volume[id-52e9045a-e90d-4c0d-9087-79d657faffff]
# tempest.api.compute.volumes.test_attach_volume.AttachVolumeTestJSON.test_list_get_volume_attachments[id-7fa563fe-f0f7-43eb-9e22-a1ece036b513]
# **DONE** tempest.api.compute.volumes.test_attach_volume.AttachVolumeTestJSON.test_attach_detach_volume[id-52e9045a-e90d-4c0d-9087-79d657faffff]
# **DONE** tempest.api.compute.volumes.test_attach_volume.AttachVolumeTestJSON.test_list_get_volume_attachments[id-7fa563fe-f0f7-43eb-9e22-a1ece036b513]
# tempest.api.compute.volumes.test_volume_snapshots.VolumesSnapshotsTestJSON.test_volume_snapshot_create_get_list_delete[id-cd4ec87d-7825-450d-8040-6e2068f2da8f]
# tempest.api.compute.volumes.test_volumes_get.VolumesGetTestJSON.test_volume_create_get_delete[id-f10f25eb-9775-4d9d-9cbe-1cf54dae9d5f]
# tempest.api.compute.volumes.test_volumes_list.VolumesTestJSON.test_volume_list[id-bc2dd1a0-15af-48e5-9990-f2e75a48325d]
@ -591,6 +596,6 @@ ostestr --regex $TESTCASES
# tempest.api.compute.volumes.test_volumes_negative.VolumesNegativeTest.test_delete_invalid_volume_id[id-62972737-124b-4513-b6cf-2f019f178494,negative]
# tempest.api.compute.volumes.test_volumes_negative.VolumesNegativeTest.test_delete_volume_without_passing_volume_id[id-0d1417c5-4ae8-4c2c-adc5-5f0b864253e5,negative]
# tempest.api.compute.volumes.test_volumes_negative.VolumesNegativeTest.test_get_invalid_volume_id[id-f01904f2-e975-4915-98ce-cb5fa27bde4f,negative]
# tempest.api.compute.volumes.test_volumes_negative.VolumesNegativeTest.test_get_volume_without_passing_volume_id[id-62bab09a-4c03-4617-8cca-8572bc94af9b,negative]
# **DONE** tempest.api.compute.volumes.test_volumes_negative.VolumesNegativeTest.test_get_volume_without_passing_volume_id[id-62bab09a-4c03-4617-8cca-8572bc94af9b,negative]
# tempest.api.compute.volumes.test_volumes_negative.VolumesNegativeTest.test_volume_delete_nonexistent_volume_id[id-54a34226-d910-4b00-9ef8-8683e6c55846,negative]
# tempest.api.compute.volumes.test_volumes_negative.VolumesNegativeTest.test_volume_get_nonexistent_volume_id[id-c03ea686-905b-41a2-8748-9635154b7c57,negative]

View File

@ -0,0 +1,103 @@
# Copyright (c) 2015 Huawei Tech. Co., Ltd.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from mock import patch
import pecan
import unittest
from oslo_utils import uuidutils
from novaclient import client
from tricircle.common import constants
from tricircle.common import context
from tricircle.db import api
from tricircle.db import core
from tricircle.db import models
from tricircle.nova_apigw.controllers import server_ips
class FakeResponse(object):
def __new__(cls, code=500):
cls.status = code
cls.status_code = code
return super(FakeResponse, cls).__new__(cls)
class ServerIpsTest(unittest.TestCase):
def setUp(self):
core.initialize()
core.ModelBase.metadata.create_all(core.get_engine())
self.context = context.Context()
self.project_id = 'test_project'
self.context.tenant = self.project_id
self.context.user = 'test_user'
self.controller = server_ips.ServerIpsController(self.project_id, '')
def _prepare_pod(self, bottom_pod_num=1):
t_pod = {'pod_id': 't_pod_uuid', 'pod_name': 't_region',
'az_name': ''}
api.create_pod(self.context, t_pod)
b_pods = []
if bottom_pod_num == 1:
b_pod = {'pod_id': 'b_pod_uuid', 'pod_name': 'b_region',
'az_name': 'b_az'}
api.create_pod(self.context, b_pod)
b_pods.append(b_pod)
else:
for i in xrange(1, bottom_pod_num + 1):
b_pod = {'pod_id': 'b_pod_%d_uuid' % i,
'pod_name': 'b_region_%d' % i,
'az_name': 'b_az_%d' % i}
api.create_pod(self.context, b_pod)
b_pods.append(b_pod)
return t_pod, b_pods
def _prepare_server(self, pod):
t_server_id = uuidutils.generate_uuid()
b_server_id = t_server_id
with self.context.session.begin():
core.create_resource(
self.context, models.ResourceRouting,
{'top_id': t_server_id, 'bottom_id': b_server_id,
'pod_id': pod['pod_id'], 'project_id': self.project_id,
'resource_type': constants.RT_SERVER})
return t_server_id
def _prepare_pod_service(self, pod_id, service):
config_dict = {'service_id': uuidutils.generate_uuid(),
'pod_id': pod_id,
'service_type': service,
'service_url': 'fake_pod_service'}
api.create_pod_service_configuration(self.context, config_dict)
@patch.object(pecan, 'response', new=FakeResponse)
@patch.object(client.HTTPClient, 'get')
@patch.object(context, 'extract_context_from_environ')
def test_list_ips(self, mock_context, mock_get):
mock_context.return_value = self.context
mock_get.return_value = (FakeResponse(202), None)
t_pod, b_pods = self._prepare_pod()
self._prepare_pod_service(b_pods[0]['pod_id'], constants.ST_NOVA)
t_server_id = self._prepare_server(b_pods[0])
self.controller.server_id = t_server_id
res = self.controller.get_all()
url = '/servers/%s/ips' % t_server_id
mock_get.assert_called_once_with(url)
self.assertEqual(202, res.status)
def tearDown(self):
core.ModelBase.metadata.drop_all(core.get_engine())

View File

@ -41,6 +41,25 @@ class FakeVolume(object):
pass
class FakeClient(object):
def post(self, url, body):
return FakeResponse(), FakeVolume()
def get(self, url):
return FakeResponse(), FakeVolume()
def put(self, url, body):
return FakeResponse(), FakeVolume()
def delete(self, url):
return FakeResponse(), None
class FakeApi(object):
def __init__(self):
self.client = FakeClient()
class VolumeTest(unittest.TestCase):
def setUp(self):
core.initialize()
@ -70,13 +89,16 @@ class VolumeTest(unittest.TestCase):
def _validate_error_code(self, res, code):
self.assertEqual(code, res[res.keys()[0]]['code'])
@patch.object(pecan, 'response', new=FakeResponse)
@patch.object(client.Client, 'action_resources')
@patch.object(pecan, 'response')
@patch.object(FakeClient, 'post')
@patch.object(client.Client, 'get_native_client')
@patch.object(context, 'extract_context_from_environ')
def test_attach_volume(self, mock_context, mock_action):
def test_attach_volume(self, mock_context, mock_api, mock_post,
mock_response):
mock_context.return_value = self.context
mock_action.return_value = FakeVolume()
mock_api.return_value = FakeApi()
mock_response = FakeResponse()
mock_response.status = 202
t_pod, b_pods = self._prepare_pod(bottom_pod_num=2)
b_pod1 = b_pods[0]
b_pod2 = b_pods[1]
@ -109,16 +131,18 @@ class VolumeTest(unittest.TestCase):
self.controller.server_id = t_server_id
body = {'volumeAttachment': {'volumeId': t_volume1_id}}
self.controller.post(**body)
calls = [mock.call('server_volume', self.context)]
mock_api.assert_has_calls(calls)
url = "/servers/%s/os-volume_attachments" % t_server_id
calls = [mock.call(url, body=body)]
mock_post.assert_has_calls(calls)
body = {'volumeAttachment': {'volumeId': t_volume1_id,
'device': '/dev/vdb'}}
self.controller.post(**body)
calls = [mock.call('server_volume', self.context,
'create_server_volume',
b_server_id, b_volume1_id, None),
mock.call('server_volume', self.context,
'create_server_volume',
b_server_id, b_volume1_id, '/dev/vdb')]
mock_action.assert_has_calls(calls)
calls = [mock.call('server_volume', self.context)]
mock_api.assert_has_calls(calls)
calls = [mock.call(url, body=body)]
mock_post.assert_has_calls(calls)
# failure case, bad request
body = {'volumeAttachment': {'volumeId': t_volume2_id}}
@ -157,5 +181,124 @@ class VolumeTest(unittest.TestCase):
self.controller.server_id = 'fake_server_id'
body = {'volumeAttachment': {'volumeId': t_volume1_id}}
res = self.controller.post(**body)
self._validate_error_code(res, 404)
@patch.object(pecan, 'response')
@patch.object(FakeClient, 'delete')
@patch.object(client.Client, 'get_native_client')
@patch.object(context, 'extract_context_from_environ')
def test_detach_volume(self, mock_context, mock_api, mock_delete,
mock_response):
mock_context.return_value = self.context
mock_api.return_value = FakeApi()
mock_response = FakeResponse()
mock_response.status = 202
t_pod, b_pods = self._prepare_pod(bottom_pod_num=1)
b_pod1 = b_pods
t_server_id = uuidutils.generate_uuid()
b_server_id = t_server_id
with self.context.session.begin():
core.create_resource(
self.context, models.ResourceRouting,
{'top_id': t_server_id, 'bottom_id': b_server_id,
'pod_id': b_pod1['pod_id'], 'project_id': self.project_id,
'resource_type': constants.RT_SERVER})
t_volume1_id = uuidutils.generate_uuid()
b_volume1_id = t_volume1_id
with self.context.session.begin():
core.create_resource(
self.context, models.ResourceRouting,
{'top_id': t_volume1_id, 'bottom_id': b_volume1_id,
'pod_id': b_pod1['pod_id'], 'project_id': self.project_id,
'resource_type': constants.RT_VOLUME})
# success case
self.controller.server_id = t_server_id
body = {'volumeAttachment': {'volumeId': t_volume1_id}}
self.controller.post(**body)
self.controller.delete(t_volume1_id)
calls = [mock.call('server_volume', self.context),
mock.call('server_volume', self.context)]
mock_api.assert_has_calls(calls)
url = "/servers/%s/os-volume_attachments/%s" % (
t_server_id, t_volume1_id)
calls = [mock.call(url)]
mock_delete.assert_has_calls(calls)
# failure case, resource not found
body = {'volumeAttachment': {'volumeId': t_volume1_id}}
self.controller.post(**body)
self.controller.server_id = 'fake_server_id'
res = self.controller.delete(t_volume1_id)
self._validate_error_code(res, 404)
@patch.object(pecan, 'response')
@patch.object(FakeClient, 'get')
@patch.object(client.Client, 'get_native_client')
@patch.object(context, 'extract_context_from_environ')
def test_get_volume_attachments(self, mock_context, mock_api,
mock_get, mock_response):
mock_context.return_value = self.context
mock_api.return_value = FakeApi()
mock_response = FakeResponse()
mock_response.status = 202
t_pod, b_pods = self._prepare_pod(bottom_pod_num=1)
b_pod1 = b_pods
t_server_id = uuidutils.generate_uuid()
b_server_id = t_server_id
with self.context.session.begin():
core.create_resource(
self.context, models.ResourceRouting,
{'top_id': t_server_id, 'bottom_id': b_server_id,
'pod_id': b_pod1['pod_id'], 'project_id': self.project_id,
'resource_type': constants.RT_SERVER})
t_volume1_id = uuidutils.generate_uuid()
b_volume1_id = t_volume1_id
t_volume2_id = uuidutils.generate_uuid()
b_volume2_id = t_volume2_id
with self.context.session.begin():
core.create_resource(
self.context, models.ResourceRouting,
{'top_id': t_volume1_id, 'bottom_id': b_volume1_id,
'pod_id': b_pod1['pod_id'], 'project_id': self.project_id,
'resource_type': constants.RT_VOLUME})
core.create_resource(
self.context, models.ResourceRouting,
{'top_id': t_volume2_id, 'bottom_id': b_volume2_id,
'pod_id': b_pod1['pod_id'], 'project_id': self.project_id,
'resource_type': constants.RT_VOLUME})
# success case
self.controller.server_id = t_server_id
body = {'volumeAttachment': {'volumeId': t_volume1_id}}
self.controller.post(**body)
body = {'volumeAttachment': {'volumeId': t_volume2_id}}
self.controller.post(**body)
self.controller.get_one(t_volume1_id)
url = "/servers/%s/os-volume_attachments/%s" % (
t_server_id, t_volume1_id)
calls = [mock.call(url)]
mock_get.asset_has_calls(calls)
self.controller.get_all()
url = "/servers/%s/os-volume_attachments" % t_server_id
calls = [mock.call(calls)]
mock_get.asset_has_calls(calls)
calls = [mock.call('server_volume', self.context),
mock.call('server_volume', self.context),
mock.call('server_volume', self.context),
mock.call('server_volume', self.context)]
mock_api.assert_has_calls(calls)
# failure case, resource not found
self.controller.server_id = 'fake_server_id'
res = self.controller.get_one(t_volume1_id)
self._validate_error_code(res, 404)
res = self.controller.get_all()
self._validate_error_code(res, 404)
def tearDown(self):
core.ModelBase.metadata.drop_all(core.get_engine())