Add tests to check attachments are secure

A vulnerability was detected in Cinder that could allow users to access
other people's volumes.

The solution was to limit some of the operations on attached volumes to
only OpenStack services.

This patch adds some negative tests to check that a user cannot directly
call Cinder to detach a volume, force detach it, terminate its
connection, or delete its attachment.

Depends-On: I612905a1bf4a1706cce913c0d8a6df7a240d599a
Related-Bug: #2004555
Change-Id: Ice6532ce7943e9a9363e443515946865541d09c2
This commit is contained in:
Gorka Eguileor 2023-04-20 17:09:43 +02:00 committed by Dan Smith
parent e5da6756b9
commit cbaf22e85d
6 changed files with 217 additions and 4 deletions

View File

@ -0,0 +1,5 @@
---
features:
- |
Add delete_attachment to the v3 AttachmentsClient and terminate_connection
to the v3 VolumesClient.

View File

@ -83,7 +83,7 @@ class VolumesActionsTest(base.BaseVolumeAdminTest):
server_id = self.create_server()['id'] server_id = self.create_server()['id']
volume_id = self.create_volume()['id'] volume_id = self.create_volume()['id']
# Attach volume # Request Cinder to map & export volume (it's not attached to instance)
self.volumes_client.attach_volume( self.volumes_client.attach_volume(
volume_id, volume_id,
instance_uuid=server_id, instance_uuid=server_id,
@ -101,7 +101,9 @@ class VolumesActionsTest(base.BaseVolumeAdminTest):
waiters.wait_for_volume_resource_status(self.volumes_client, waiters.wait_for_volume_resource_status(self.volumes_client,
volume_id, 'error') volume_id, 'error')
# Force detach volume # The force detach volume calls works because the volume is not really
# connected to the instance (it is safe), otherwise it would be
# rejected for security reasons (bug #2004555).
self.admin_volume_client.force_detach_volume( self.admin_volume_client.force_detach_volume(
volume_id, connector=None, volume_id, connector=None,
attachment_id=attachment['attachment_id']) attachment_id=attachment['attachment_id'])

View File

@ -295,6 +295,7 @@ show_volume_summary = {
attach_volume = {'status_code': [202]} attach_volume = {'status_code': [202]}
set_bootable_volume = {'status_code': [200]} set_bootable_volume = {'status_code': [200]}
detach_volume = {'status_code': [202]} detach_volume = {'status_code': [202]}
terminate_connection = {'status_code': [202]}
reserve_volume = {'status_code': [202]} reserve_volume = {'status_code': [202]}
unreserve_volume = {'status_code': [202]} unreserve_volume = {'status_code': [202]}
extend_volume = {'status_code': [202]} extend_volume = {'status_code': [202]}

View File

@ -26,3 +26,10 @@ class AttachmentsClient(base_client.BaseClient):
body = json.loads(body) body = json.loads(body)
self.expected_success(200, resp.status) self.expected_success(200, resp.status)
return rest_client.ResponseBody(resp, body) return rest_client.ResponseBody(resp, body)
def delete_attachment(self, attachment_id):
"""Delete volume attachment."""
url = "attachments/%s" % (attachment_id)
resp, body = self.delete(url)
self.expected_success(200, resp.status)
return rest_client.ResponseBody(resp, body)

View File

@ -205,14 +205,23 @@ class VolumesClient(base_client.BaseClient):
self.validate_response(schema.set_bootable_volume, resp, body) self.validate_response(schema.set_bootable_volume, resp, body)
return rest_client.ResponseBody(resp, body) return rest_client.ResponseBody(resp, body)
def detach_volume(self, volume_id): def detach_volume(self, volume_id, **kwargs):
"""Detaches a volume from an instance.""" """Detaches a volume from an instance."""
post_body = json.dumps({'os-detach': {}}) post_body = json.dumps({'os-detach': kwargs})
url = 'volumes/%s/action' % (volume_id) url = 'volumes/%s/action' % (volume_id)
resp, body = self.post(url, post_body) resp, body = self.post(url, post_body)
self.validate_response(schema.detach_volume, resp, body) self.validate_response(schema.detach_volume, resp, body)
return rest_client.ResponseBody(resp, body) return rest_client.ResponseBody(resp, body)
def terminate_connection(self, volume_id, connector):
"""Detaches a volume from an instance using terminate_connection."""
post_body = json.dumps(
{'os-terminate_connection': {'connector': connector}})
url = 'volumes/%s/action' % (volume_id)
resp, body = self.post(url, post_body)
self.validate_response(schema.terminate_connection, resp, body)
return rest_client.ResponseBody(resp, body)
def reserve_volume(self, volume_id): def reserve_volume(self, volume_id):
"""Reserves a volume.""" """Reserves a volume."""
post_body = json.dumps({'os-reserve': {}}) post_body = json.dumps({'os-reserve': {}})

View File

@ -0,0 +1,189 @@
# Copyright 2023 Red Hat
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from tempest.common import utils
from tempest.common import waiters
from tempest import config
from tempest.lib import decorators
from tempest.lib import exceptions
from tempest.scenario import manager
CONF = config.CONF
class BaseAttachmentTest(manager.ScenarioTest):
@classmethod
def setup_clients(cls):
super().setup_clients()
cls.attachments_client = cls.os_primary.attachments_client_latest
cls.admin_volume_client = cls.os_admin.volumes_client_latest
def _call_with_fake_service_token(self, valid_token,
client, method_name, *args, **kwargs):
"""Call client method with non-service service token
Add a service token header that can be a valid normal user token (which
won't have the service role) or an invalid token altogether.
"""
original_raw_request = client.raw_request
def raw_request(url, method, headers=None, body=None, chunked=False,
log_req_body=None):
token = headers['X-Auth-Token']
if not valid_token:
token = token[:-1] + ('a' if token[-1] != 'a' else 'b')
headers['X-Service-Token'] = token
return original_raw_request(url, method, headers=headers,
body=body, chunked=chunked,
log_req_body=log_req_body)
client_method = getattr(client, method_name)
with mock.patch.object(client, 'raw_request', raw_request):
return client_method(*args, **kwargs)
class TestServerVolumeAttachmentScenario(BaseAttachmentTest):
"""Test server attachment behaviors
This tests that volume attachments to servers may not be removed directly
and are only allowed through the compute service (bug #2004555).
"""
@decorators.attr(type='slow')
@decorators.idempotent_id('be615530-f105-437a-8afe-ce998c9535d9')
@utils.services('compute', 'volume', 'image', 'network')
def test_server_detach_rules(self):
"""Test that various methods of detaching a volume honors the rules"""
server = self.create_server(wait_until='SSHABLE')
servers = self.servers_client.list_servers()['servers']
self.assertIn(server['id'], [x['id'] for x in servers])
volume = self.create_volume()
volume = self.nova_volume_attach(server, volume)
self.addCleanup(self.nova_volume_detach, server, volume)
att_id = volume['attachments'][0]['attachment_id']
# Test user call to detach volume is rejected
self.assertRaises((exceptions.Forbidden, exceptions.Conflict),
self.volumes_client.detach_volume, volume['id'])
# Test user call to terminate connection is rejected
self.assertRaises((exceptions.Forbidden, exceptions.Conflict),
self.volumes_client.terminate_connection,
volume['id'], connector={})
# Test faking of service token on call to detach, force detach,
# terminate_connection
for valid_token in (True, False):
valid_exceptions = [exceptions.Forbidden, exceptions.Conflict]
if not valid_token:
valid_exceptions.append(exceptions.Unauthorized)
self.assertRaises(
tuple(valid_exceptions),
self._call_with_fake_service_token,
valid_token,
self.volumes_client,
'detach_volume',
volume['id'])
self.assertRaises(
tuple(valid_exceptions),
self._call_with_fake_service_token,
valid_token,
self.volumes_client,
'terminate_connection',
volume['id'], connector={})
# Reset volume's status to error
self.admin_volume_client.reset_volume_status(volume['id'],
status='error')
waiters.wait_for_volume_resource_status(self.volumes_client,
volume['id'], 'error')
# For the cleanup, we need to reset the volume status to in-use before
# the other cleanup steps try to detach it.
self.addCleanup(waiters.wait_for_volume_resource_status,
self.volumes_client, volume['id'], 'in-use')
self.addCleanup(self.admin_volume_client.reset_volume_status,
volume['id'], status='in-use')
# Test user call to force detach volume is rejected
self.assertRaises(
(exceptions.Forbidden, exceptions.Conflict),
self.admin_volume_client.force_detach_volume,
volume['id'], connector=None,
attachment_id=att_id)
# Test trying to override detach with force and service token
for valid_token in (True, False):
valid_exceptions = [exceptions.Forbidden, exceptions.Conflict]
if not valid_token:
valid_exceptions.append(exceptions.Unauthorized)
self.assertRaises(
tuple(valid_exceptions),
self._call_with_fake_service_token,
valid_token,
self.admin_volume_client,
'force_detach_volume',
volume['id'], connector=None, attachment_id=att_id)
# Test user call to detach with mismatch is rejected
volume2 = self.create_volume()
volume2 = self.nova_volume_attach(server, volume2)
att_id2 = volume2['attachments'][0]['attachment_id']
self.assertRaises(
(exceptions.Forbidden, exceptions.BadRequest),
self.volumes_client.detach_volume,
volume['id'], attachment_id=att_id2)
class TestServerVolumeAttachScenarioOldVersion(BaseAttachmentTest):
volume_min_microversion = '3.27'
volume_max_microversion = 'latest'
@decorators.attr(type='slow')
@decorators.idempotent_id('6f4d2144-99f4-495c-8b0b-c6a537971418')
@utils.services('compute', 'volume', 'image', 'network')
def test_old_versions_reject(self):
server = self.create_server(wait_until='SSHABLE')
servers = self.servers_client.list_servers()['servers']
self.assertIn(server['id'], [x['id'] for x in servers])
volume = self.create_volume()
volume = self.nova_volume_attach(server, volume)
self.addCleanup(self.nova_volume_detach, server, volume)
att_id = volume['attachments'][0]['attachment_id']
for valid_token in (True, False):
valid_exceptions = [exceptions.Forbidden,
exceptions.Conflict]
if not valid_token:
valid_exceptions.append(exceptions.Unauthorized)
self.assertRaises(
tuple(valid_exceptions),
self._call_with_fake_service_token,
valid_token,
self.attachments_client,
'delete_attachment',
att_id)
self.assertRaises(
(exceptions.Forbidden, exceptions.Conflict),
self.attachments_client.delete_attachment,
att_id)