# os-brick/os_brick/tests/caches/test_opencas.py
#
# 91 lines
# 3.4 KiB
# Python

# Copyright (c) 2013 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from os_brick.caches import opencas
from os_brick import exception
from os_brick.tests import base
class OpenCASEngineTestCase(base.TestCase):
    """Unit tests for ``os_brick.caches.opencas.OpenCASEngine``.

    Every call into ``os_brick.privileged.opencas`` is patched out, so the
    tests only check how the engine delegates to those helpers and how it
    validates the ``dev_path`` keyword argument.
    """

    def setUp(self):
        """Build a sample iSCSI-style ``connection_info`` fixture."""
        super().setUp()
        # NOTE(review): none of the tests in this class read this fixture;
        # it is kept so the class' attributes stay unchanged.
        self.connection_info = {
            "data": {
                "device_path": "/dev/disk/by-path/"
                               "ip-192.0.2.0:3260-iscsi-iqn.2010-10.org.openstack"
                               ":volume-fake_uuid-lun-1",
            },
        }

    @mock.patch(
        'os_brick.privileged.opencas.is_engine_ready',
        return_value=True,
    )
    def test_is_engine_ready(self, mock_is_engine_ready):
        """is_engine_ready() returns the privileged helper's result.

        The helper must be called exactly once with the engine's cache id.
        """
        engine = opencas.OpenCASEngine(opencas_cache_id=42)

        self.assertTrue(engine.is_engine_ready())

        mock_is_engine_ready.assert_called_once_with(42)

    @mock.patch('os_brick.privileged.opencas.map_device')
    def test_map_casdisk(self, mock_map_device):
        """_map_casdisk() forwards the cache id and its argument."""
        engine = opencas.OpenCASEngine(root_helper=None, opencas_cache_id=1)

        engine._map_casdisk('1')

        mock_map_device.assert_called_once_with(1, '1')

    @mock.patch('os_brick.caches.opencas.OpenCASEngine._map_casdisk')
    def test_attach_volume(self, mock_map_casdisk):
        """attach_volume() requires ``dev_path`` and then maps the device."""
        engine = opencas.OpenCASEngine(root_helper=None, opencas_cache_id=1)

        # Exception raised if we don't pass a dev_path or the dev_path is None
        self.assertRaises(exception.VolumePathsNotFound, engine.attach_volume)
        self.assertRaises(
            exception.VolumePathsNotFound,
            engine.attach_volume,
            dev_path=None,
        )
        # The rejected calls must not have reached the mapping helper.
        mock_map_casdisk.assert_not_called()

        # No exception if dev_path set correctly
        engine.attach_volume(dev_path='path')

        # The exact argument is not pinned here, only that one call happened.
        mock_map_casdisk.assert_called_once_with(mock.ANY)

    @mock.patch('os_brick.privileged.opencas.unmap_device')
    def test_unmap_casdisk(self, mock_unmap_device):
        """_unmap_casdisk() forwards the cache id and its argument."""
        engine = opencas.OpenCASEngine(root_helper=None, opencas_cache_id=1)

        engine._unmap_casdisk('1')

        mock_unmap_device.assert_called_once_with(1, '1')

    # Decorators apply bottom-up: the lowest patch is the first mock argument.
    @mock.patch('os_brick.privileged.opencas.get_mapping')
    @mock.patch('os_brick.caches.opencas.OpenCASEngine._unmap_casdisk')
    def test_detach_volume(self, mock_unmap_casdisk, mock_get_mapping):
        """detach_volume() requires ``dev_path`` and then unmaps the device.

        ``get_mapping`` is stubbed to return ``('1', '/dev/sdd')``; the test
        expects the first element to be handed to ``_unmap_casdisk``.
        """
        # presumably (core id, core device) -- confirm against get_mapping
        mock_get_mapping.return_value = ('1', '/dev/sdd')
        engine = opencas.OpenCASEngine(root_helper=None, opencas_cache_id=1)

        # Exception raised if we don't pass a dev_path or the dev_path is None
        self.assertRaises(exception.VolumePathsNotFound, engine.detach_volume)
        self.assertRaises(
            exception.VolumePathsNotFound,
            engine.detach_volume,
            dev_path=None,
        )
        # The rejected calls must not have reached the unmapping helper.
        mock_unmap_casdisk.assert_not_called()

        # No exception if dev_path set correctly
        engine.detach_volume(dev_path='/dev/cas1-1')

        mock_unmap_casdisk.assert_called_once_with('1')