diff --git a/nova/tests/unit/virt/libvirt/volume/test_mount.py b/nova/tests/unit/virt/libvirt/volume/test_mount.py new file mode 100644 index 000000000000..8dd136c7eaaa --- /dev/null +++ b/nova/tests/unit/virt/libvirt/volume/test_mount.py @@ -0,0 +1,611 @@ +# Copyright 2017 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import os.path +import threading +import time + +import eventlet +import fixtures +import mock + +from oslo_concurrency import processutils + +from nova import exception +from nova import test +from nova.tests import uuidsentinel as uuids +from nova.virt.libvirt import config as libvirt_config +from nova.virt.libvirt import guest as libvirt_guest +from nova.virt.libvirt import host as libvirt_host +from nova.virt.libvirt.volume import mount + + +# We wait on events in a few cases. In normal execution the wait period should +# be in the order of fractions of a millisecond. However, if we've hit a bug we +# might deadlock and never return. To be nice to our test environment, we cut +# this short at MAX_WAIT seconds. This should be large enough that normal +# jitter won't trigger it, but not so long that it's annoying to wait for. +MAX_WAIT = 2 + + +class TestThreadController(object): + """Helper class for executing a test thread incrementally by waiting at + named waitpoints. + + def test(ctl): + things() + ctl.waitpoint('foo') + more_things() + ctl.waitpoint('bar') + final_things() + + ctl = TestThreadController(test) + ctl.runto('foo') + assert(things) + ctl.runto('bar') + assert(more_things) + ctl.finish() + assert(final_things) + + This gets more interesting when the waitpoints are mocked into non-test + code. + """ + + # A map of threads to controllers + all_threads = {} + + def __init__(self, fn): + """Create a TestThreadController. + + :param fn: A test function which takes a TestThreadController as its + only argument + """ + + # All updates to wait_at and waiting are guarded by wait_lock + self.wait_lock = threading.Condition() + # The name of the next wait point + self.wait_at = None + # True when waiting at a waitpoint + self.waiting = False + # Incremented every time we continue from a waitpoint + self.epoch = 1 + # The last epoch we waited at + self.last_epoch = 0 + + self.start_event = eventlet.event.Event() + self.running = False + self.complete = False + + # We must not execute fn() until the thread has been registered in + # all_threads. eventlet doesn't give us an API to do this directly, + # so we defer with an Event + def deferred_start(): + self.start_event.wait() + fn() + + with self.wait_lock: + self.complete = True + self.wait_lock.notify_all() + + self.thread = eventlet.greenthread.spawn(deferred_start) + self.all_threads[self.thread] = self + + @classmethod + def current(cls): + return cls.all_threads.get(eventlet.greenthread.getcurrent()) + + def _ensure_running(self): + if not self.running: + self.running = True + self.start_event.send() + + def waitpoint(self, name): + """Called by the test thread. 
Wait at a waitpoint called name""" + with self.wait_lock: + wait_since = time.time() + while name == self.wait_at: + self.waiting = True + self.wait_lock.notify_all() + self.wait_lock.wait(1) + assert(time.time() - wait_since < MAX_WAIT) + + self.epoch += 1 + self.waiting = False + self.wait_lock.notify_all() + + def runto(self, name): + """Called by the control thread. Cause the test thread to run until + reaching a waitpoint called name. When runto() exits, the test + thread is guaranteed to have reached this waitpoint. + """ + with self.wait_lock: + # Set a new wait point + self.wait_at = name + self.wait_lock.notify_all() + + # We deliberately don't do this first to avoid a race the first + # time we call runto() + self._ensure_running() + + # Wait until the test thread is at the wait point + wait_since = time.time() + while self.epoch == self.last_epoch or not self.waiting: + self.wait_lock.wait(1) + assert(time.time() - wait_since < MAX_WAIT) + + self.last_epoch = self.epoch + + def start(self): + """Called by the control thread. Cause the test thread to start + running, but to not wait for it to complete. + """ + self._ensure_running() + + def finish(self): + """Called by the control thread. Cause the test thread to run to + completion. When finish() exits, the test thread is guaranteed to + have completed. + """ + self._ensure_running() + + wait_since = time.time() + with self.wait_lock: + self.wait_at = None + self.wait_lock.notify_all() + while not self.complete: + self.wait_lock.wait(1) + assert(time.time() - wait_since < MAX_WAIT) + + self.thread.wait() + + +class HostMountStateTestCase(test.NoDBTestCase): + def setUp(self): + super(HostMountStateTestCase, self).setUp() + + self.mounted = set() + + def fake_execute(cmd, *args, **kwargs): + if cmd == 'mount': + path = args[-1] + if path in self.mounted: + raise processutils.ProcessExecutionError('Already mounted') + self.mounted.add(path) + elif cmd == 'umount': + path = args[-1] + if path not in self.mounted: + raise processutils.ProcessExecutionError('Not mounted') + self.mounted.remove(path) + + def fake_ismount(path): + return path in self.mounted + + mock_execute = mock.MagicMock(side_effect=fake_execute) + + self.useFixture(fixtures.MonkeyPatch('nova.utils.execute', + mock_execute)) + self.useFixture(fixtures.MonkeyPatch('os.path.ismount', fake_ismount)) + + def test_init(self): + # Test that we initialise the state of MountManager correctly at + # startup + def fake_disk(disk): + libvirt_disk = libvirt_config.LibvirtConfigGuestDisk() + libvirt_disk.source_type = disk[0] + libvirt_disk.source_path = os.path.join(*disk[1]) + return libvirt_disk + + def mock_guest(uuid, disks): + guest = mock.create_autospec(libvirt_guest.Guest) + guest.uuid = uuid + guest.get_all_disks.return_value = map(fake_disk, disks) + return guest + + local_dir = '/local' + mountpoint_a = '/mnt/a' + mountpoint_b = '/mnt/b' + + self.mounted.add(mountpoint_a) + self.mounted.add(mountpoint_b) + + guests = map(mock_guest, [uuids.instance_a, uuids.instance_b], [ + # Local file root disk and a volume on each of mountpoints a and b + [ + ('file', (local_dir, uuids.instance_a, 'disk')), + ('file', (mountpoint_a, 'vola1')), + ('file', (mountpoint_b, 'volb1')), + ], + + # Local LVM root disk and a volume on each of mountpoints a and b + [ + ('block', ('/dev', 'vg', uuids.instance_b + '_disk')), + ('file', (mountpoint_a, 'vola2')), + ('file', (mountpoint_b, 'volb2')), + ] + ]) + + host = mock.create_autospec(libvirt_host.Host) + host.list_guests.return_value = 
guests + + m = mount._HostMountState(host, 0) + + self.assertEqual([mountpoint_a, mountpoint_b], + sorted(m.mountpoints.keys())) + + self.assertSetEqual(set([('vola1', uuids.instance_a), + ('vola2', uuids.instance_b)]), + m.mountpoints[mountpoint_a].attachments) + self.assertSetEqual(set([('volb1', uuids.instance_a), + ('volb2', uuids.instance_b)]), + m.mountpoints[mountpoint_b].attachments) + + @staticmethod + def _get_clean_hostmountstate(): + # list_guests returns no guests: _HostMountState initial state is + # clean. + host = mock.create_autospec(libvirt_host.Host) + host.list_guests.return_value = [] + return mount._HostMountState(host, 0) + + def _sentinel_mount(self, m, vol, mountpoint=mock.sentinel.mountpoint, + instance=None): + if instance is None: + instance = mock.sentinel.instance + instance.uuid = uuids.instance + + m.mount(mock.sentinel.fstype, mock.sentinel.export, + vol, mountpoint, instance, + [mock.sentinel.option1, mock.sentinel.option2]) + + def _sentinel_umount(self, m, vol, mountpoint=mock.sentinel.mountpoint, + instance=mock.sentinel.instance): + m.umount(vol, mountpoint, instance) + + @staticmethod + def _expected_sentinel_mount_calls(mountpoint=mock.sentinel.mountpoint): + return [mock.call('mkdir', '-p', mountpoint), + mock.call('mount', '-t', mock.sentinel.fstype, + mock.sentinel.option1, mock.sentinel.option2, + mock.sentinel.export, mountpoint, + run_as_root=True)] + + @staticmethod + def _expected_sentinel_umount_calls(mountpoint=mock.sentinel.mountpoint): + return [mock.call('umount', mountpoint, + attempts=3, delay_on_retry=True, + run_as_root=True), + mock.call('rmdir', mountpoint)] + + def test_mount_umount(self): + # Mount 2 different volumes from the same export. Test that we only + # mount and umount once. + m = self._get_clean_hostmountstate() + + # Mount vol_a from export + self._sentinel_mount(m, mock.sentinel.vol_a) + expected_calls = self._expected_sentinel_mount_calls() + mount.utils.execute.assert_has_calls(expected_calls) + + # Mount vol_b from export. We shouldn't have mounted again + self._sentinel_mount(m, mock.sentinel.vol_b) + mount.utils.execute.assert_has_calls(expected_calls) + + # Unmount vol_a. We shouldn't have unmounted + self._sentinel_umount(m, mock.sentinel.vol_a) + mount.utils.execute.assert_has_calls(expected_calls) + + # Unmount vol_b. We should have umounted. + self._sentinel_umount(m, mock.sentinel.vol_b) + expected_calls.extend(self._expected_sentinel_umount_calls()) + mount.utils.execute.assert_has_calls(expected_calls) + + def test_mount_umount_multi_attach(self): + # Mount a volume from a single export for 2 different instances. Test + # that we only mount and umount once. + m = self._get_clean_hostmountstate() + + instance_a = mock.sentinel.instance_a + instance_a.uuid = uuids.instance_a + instance_b = mock.sentinel.instance_b + instance_b.uuid = uuids.instance_b + + # Mount vol_a for instance_a + self._sentinel_mount(m, mock.sentinel.vol_a, instance=instance_a) + expected_calls = self._expected_sentinel_mount_calls() + mount.utils.execute.assert_has_calls(expected_calls) + + # Mount vol_a for instance_b. We shouldn't have mounted again + self._sentinel_mount(m, mock.sentinel.vol_a, instance=instance_b) + mount.utils.execute.assert_has_calls(expected_calls) + + # Unmount vol_a for instance_a. We shouldn't have unmounted + self._sentinel_umount(m, mock.sentinel.vol_a, instance=instance_a) + mount.utils.execute.assert_has_calls(expected_calls) + + # Unmount vol_a for instance_b. We should have umounted. 
+ self._sentinel_umount(m, mock.sentinel.vol_a, instance=instance_b) + expected_calls.extend(self._expected_sentinel_umount_calls()) + mount.utils.execute.assert_has_calls(expected_calls) + + def test_mount_concurrent(self): + # This is 2 tests in 1, because the first test is the precondition + # for the second. + + # The first test is that if 2 threads call mount simultaneously, + # only one of them will call mount + + # The second test is that we correctly handle the case where we + # delete a lock after umount. During the umount of the first test, + # which will delete the lock when it completes, we start 2 more + # threads which both call mount. These threads are holding a lock + # which is about to be deleted. We test that they still don't race, + # and only one of them calls mount. + m = self._get_clean_hostmountstate() + + def mount_a(): + # Mount vol_a from export + self._sentinel_mount(m, mock.sentinel.vol_a) + TestThreadController.current().waitpoint('mounted') + self._sentinel_umount(m, mock.sentinel.vol_a) + + def mount_b(): + # Mount vol_b from export + self._sentinel_mount(m, mock.sentinel.vol_b) + self._sentinel_umount(m, mock.sentinel.vol_b) + + def mount_c(): + self._sentinel_mount(m, mock.sentinel.vol_c) + + def mount_d(): + self._sentinel_mount(m, mock.sentinel.vol_d) + + ctl_a = TestThreadController(mount_a) + ctl_b = TestThreadController(mount_b) + ctl_c = TestThreadController(mount_c) + ctl_d = TestThreadController(mount_d) + + orig_execute = mount.utils.execute.side_effect + + def trap_mount_umount(cmd, *args, **kwargs): + # Conditionally wait at a waitpoint named after the command + # we're executing + ctl = TestThreadController.current() + ctl.waitpoint(cmd) + + orig_execute(cmd, *args, **kwargs) + + mount.utils.execute.side_effect = trap_mount_umount + + expected_calls = [] + + # Run the first thread until it's blocked while calling mount + ctl_a.runto('mount') + expected_calls.extend(self._expected_sentinel_mount_calls()) + + self.assertEqual(expected_calls, mount.utils.execute.call_args_list) + + # Start the second mount, and ensure it's got plenty of opportunity + # to race. + ctl_b.start() + time.sleep(0.01) + + self.assertEqual(expected_calls, mount.utils.execute.call_args_list) + + # Allow ctl_a to complete its mount + ctl_a.runto('mounted') + self.assertEqual(expected_calls, mount.utils.execute.call_args_list) + + # Allow ctl_b to finish. 
We should not have done a umount + ctl_b.finish() + self.assertEqual(expected_calls, mount.utils.execute.call_args_list) + + # Allow ctl_a to start umounting + ctl_a.runto('umount') + + expected_calls.extend(self._expected_sentinel_umount_calls()) + # We haven't executed rmdir yet, beause we've blocked during umount + rmdir = expected_calls.pop() + self.assertEqual(expected_calls, mount.utils.execute.call_args_list) + expected_calls.append(rmdir) + + # While ctl_a is umounting, simultaneously start both ctl_c and + # ctl_d, and ensure they have an opportunity to race + ctl_c.start() + ctl_d.start() + time.sleep(0.01) + + # Allow a, c, and d to complete + for ctl in (ctl_a, ctl_c, ctl_d): + ctl.finish() + + # We should have completed the previous umount, then remounted + # exactly once + expected_calls.extend(self._expected_sentinel_mount_calls()) + self.assertEqual(expected_calls, mount.utils.execute.call_args_list) + + def test_mount_concurrent_no_interfere(self): + # Test that concurrent calls to mount volumes in different exports + # run concurrently + m = self._get_clean_hostmountstate() + + def mount_a(): + # Mount vol on mountpoint a + self._sentinel_mount(m, mock.sentinel.vol, + mock.sentinel.mountpoint_a) + TestThreadController.current().waitpoint('mounted') + self._sentinel_umount(m, mock.sentinel.vol, + mock.sentinel.mountpoint_a) + + def mount_b(): + # Mount vol on mountpoint b + self._sentinel_mount(m, mock.sentinel.vol, + mock.sentinel.mountpoint_b) + self._sentinel_umount(m, mock.sentinel.vol, + mock.sentinel.mountpoint_b) + + ctl_a = TestThreadController(mount_a) + ctl_b = TestThreadController(mount_b) + + expected_calls = [] + + ctl_a.runto('mounted') + expected_calls.extend(self._expected_sentinel_mount_calls( + mock.sentinel.mountpoint_a)) + self.assertEqual(expected_calls, mount.utils.execute.call_args_list) + + ctl_b.finish() + expected_calls.extend(self._expected_sentinel_mount_calls( + mock.sentinel.mountpoint_b)) + expected_calls.extend(self._expected_sentinel_umount_calls( + mock.sentinel.mountpoint_b)) + self.assertEqual(expected_calls, mount.utils.execute.call_args_list) + + ctl_a.finish() + expected_calls.extend(self._expected_sentinel_umount_calls( + mock.sentinel.mountpoint_a)) + self.assertEqual(expected_calls, mount.utils.execute.call_args_list) + + def test_mount_after_failed_umount(self): + # Test that MountManager correctly tracks state when umount fails. + # Test that when umount fails a subsequent mount doesn't try to + # remount it. + + # We've already got a fake execute (see setUp) which is ensuring mount, + # umount, and ismount work as expected. We don't want to mess with + # that, except that we want umount to raise an exception. We store the + # original here so we can call it if we're not unmounting, and so we + # can restore it when we no longer want the exception. + orig_execute = mount.utils.execute.side_effect + + def raise_on_umount(cmd, *args, **kwargs): + if cmd == 'umount': + raise mount.processutils.ProcessExecutionError() + orig_execute(cmd, *args, **kwargs) + + mount.utils.execute.side_effect = raise_on_umount + + expected_calls = [] + + m = self._get_clean_hostmountstate() + + # Mount vol_a + self._sentinel_mount(m, mock.sentinel.vol_a) + expected_calls.extend(self._expected_sentinel_mount_calls()) + self.assertEqual(expected_calls, mount.utils.execute.call_args_list) + + # Umount vol_a. The umount command will fail. 
+ self._sentinel_umount(m, mock.sentinel.vol_a) + expected_calls.extend(self._expected_sentinel_umount_calls()) + + # We should not have called rmdir, because umount failed + expected_calls.pop() + self.assertEqual(expected_calls, mount.utils.execute.call_args_list) + + # Mount vol_a again. We should not have called mount, because umount + # failed. + self._sentinel_mount(m, mock.sentinel.vol_a) + self.assertEqual(expected_calls, mount.utils.execute.call_args_list) + + # Prevent future failure of umount + mount.utils.execute.side_effect = orig_execute + + # Umount vol_a successfully + self._sentinel_umount(m, mock.sentinel.vol_a) + expected_calls.extend(self._expected_sentinel_umount_calls()) + self.assertEqual(expected_calls, mount.utils.execute.call_args_list) + + @mock.patch.object(mount.LOG, 'error') + def test_umount_log_failure(self, mock_LOG_error): + # Test that we log an error when umount fails + orig_execute = mount.utils.execute.side_effect + + def raise_on_umount(cmd, *args, **kwargs): + if cmd == 'umount': + raise mount.processutils.ProcessExecutionError( + None, None, None, 'umount', 'umount: device is busy.') + orig_execute(cmd, *args, **kwargs) + + mount.utils.execute.side_effect = raise_on_umount + + m = self._get_clean_hostmountstate() + + self._sentinel_mount(m, mock.sentinel.vol_a) + self._sentinel_umount(m, mock.sentinel.vol_a) + + mock_LOG_error.assert_called() + + +class MountManagerTestCase(test.NoDBTestCase): + class FakeHostMountState(object): + def __init__(self, host, generation): + self.host = host + self.generation = generation + + ctl = TestThreadController.current() + if ctl is not None: + ctl.waitpoint('init') + + def setUp(self): + super(MountManagerTestCase, self).setUp() + + self.useFixture(fixtures.MonkeyPatch( + 'nova.virt.libvirt.volume.mount._HostMountState', + self.FakeHostMountState)) + + self.m = mount.get_manager() + self.m._reset_state() + + def _get_state(self): + with self.m.get_state() as state: + return state + + def test_host_up_down(self): + self.m.host_up(mock.sentinel.host) + state = self._get_state() + self.assertEqual(state.host, mock.sentinel.host) + self.assertEqual(state.generation, 0) + + self.m.host_down() + self.assertRaises(exception.HypervisorUnavailable, self._get_state) + + def test_host_up_waits_for_completion(self): + self.m.host_up(mock.sentinel.host) + + def txn(): + with self.m.get_state(): + TestThreadController.current().waitpoint('running') + + # Start a thread which blocks holding a state object + ctl = TestThreadController(txn) + ctl.runto('running') + + # Host goes down + self.m.host_down() + + # Call host_up in a separate thread because it will block, and give + # it plenty of time to race + host_up = eventlet.greenthread.spawn(self.m.host_up, + mock.sentinel.host) + time.sleep(0.01) + + # Assert that we haven't instantiated a new state while there's an + # ongoing operation from the previous state + self.assertRaises(exception.HypervisorUnavailable, self._get_state) + + # Allow the previous ongoing operation and host_up to complete + ctl.finish() + host_up.wait() + + # Assert that we've got a new state generation + state = self._get_state() + self.assertEqual(1, state.generation) diff --git a/nova/tests/unit/virt/libvirt/volume/test_nfs.py b/nova/tests/unit/virt/libvirt/volume/test_nfs.py index 8f58cbeec826..a101791aca3a 100644 --- a/nova/tests/unit/virt/libvirt/volume/test_nfs.py +++ b/nova/tests/unit/virt/libvirt/volume/test_nfs.py @@ -12,12 +12,13 @@ import os +import fixtures import mock -from 
oslo_concurrency import processutils from nova.tests.unit.virt.libvirt.volume import test_volume +from nova.tests import uuidsentinel as uuids from nova import utils -from nova.virt.libvirt import utils as libvirt_utils +from nova.virt.libvirt.volume import mount from nova.virt.libvirt.volume import nfs @@ -26,11 +27,38 @@ class LibvirtNFSVolumeDriverTestCase(test_volume.LibvirtVolumeBaseTestCase): def setUp(self): super(LibvirtNFSVolumeDriverTestCase, self).setUp() + + m = mount.get_manager() + m._reset_state() + self.mnt_base = '/mnt' + m.host_up(self.fake_host) self.flags(nfs_mount_point_base=self.mnt_base, group='libvirt') - @mock.patch.object(libvirt_utils, 'is_mounted', return_value=False) - def test_libvirt_nfs_driver(self, mock_is_mounted): + # Caution: this is also faked by the superclass + orig_execute = utils.execute + + mounted = [False] + + def fake_execute(*cmd, **kwargs): + orig_execute(*cmd, **kwargs) + + if cmd[0] == 'mount': + mounted[0] = True + + if cmd[0] == 'umount': + mounted[0] = False + + self.useFixture(fixtures.MonkeyPatch('nova.utils.execute', + fake_execute)) + + # Mock ismount to return the current mount state + # N.B. This is only valid for tests which mount and unmount a single + # directory. + self.useFixture(fixtures.MonkeyPatch('os.path.ismount', + lambda *args, **kwargs: mounted[0])) + + def test_libvirt_nfs_driver(self): libvirt_driver = nfs.LibvirtNFSVolumeDriver(self.fake_host) export_string = '192.168.1.1:/nfs/share1' @@ -39,8 +67,10 @@ class LibvirtNFSVolumeDriverTestCase(test_volume.LibvirtVolumeBaseTestCase): connection_info = {'data': {'export': export_string, 'name': self.name}} + instance = mock.sentinel.instance + instance.uuid = uuids.instance libvirt_driver.connect_volume(connection_info, self.disk_info, - mock.sentinel.instance) + instance) libvirt_driver.disconnect_volume(connection_info, "vde", mock.sentinel.instance) @@ -50,39 +80,9 @@ class LibvirtNFSVolumeDriverTestCase(test_volume.LibvirtVolumeBaseTestCase): expected_commands = [ ('mkdir', '-p', export_mnt_base), ('mount', '-t', 'nfs', export_string, export_mnt_base), - ('umount', export_mnt_base)] + ('umount', export_mnt_base), + ('rmdir', export_mnt_base)] self.assertEqual(expected_commands, self.executes) - self.assertTrue(mock_is_mounted.called) - - @mock.patch.object(nfs.utils, 'execute') - @mock.patch.object(nfs.LOG, 'debug') - @mock.patch.object(nfs.LOG, 'exception') - def test_libvirt_nfs_driver_umount_error(self, mock_LOG_exception, - mock_LOG_debug, mock_utils_exe): - export_string = '192.168.1.1:/nfs/share1' - connection_info = {'data': {'export': export_string, - 'name': self.name}} - libvirt_driver = nfs.LibvirtNFSVolumeDriver(self.fake_host) - mock_utils_exe.side_effect = processutils.ProcessExecutionError( - None, None, None, 'umount', 'umount: device is busy.') - libvirt_driver.disconnect_volume(connection_info, "vde", - mock.sentinel.instance) - self.assertTrue(mock_LOG_debug.called) - mock_utils_exe.side_effect = processutils.ProcessExecutionError( - None, None, None, 'umount', 'umount: target is busy.') - libvirt_driver.disconnect_volume(connection_info, "vde", - mock.sentinel.instance) - self.assertTrue(mock_LOG_debug.called) - mock_utils_exe.side_effect = processutils.ProcessExecutionError( - None, None, None, 'umount', 'umount: not mounted.') - libvirt_driver.disconnect_volume(connection_info, "vde", - mock.sentinel.instance) - self.assertTrue(mock_LOG_debug.called) - mock_utils_exe.side_effect = processutils.ProcessExecutionError( - None, None, None, 'umount', 
'umount: Other error.') - libvirt_driver.disconnect_volume(connection_info, "vde", - mock.sentinel.instance) - self.assertTrue(mock_LOG_exception.called) def test_libvirt_nfs_driver_get_config(self): libvirt_driver = nfs.LibvirtNFSVolumeDriver(self.fake_host) @@ -100,27 +100,7 @@ class LibvirtNFSVolumeDriverTestCase(test_volume.LibvirtVolumeBaseTestCase): self.assertEqual('raw', tree.find('./driver').get('type')) self.assertEqual('native', tree.find('./driver').get('io')) - @mock.patch.object(libvirt_utils, 'is_mounted', return_value=True) - def test_libvirt_nfs_driver_already_mounted(self, mock_is_mounted): - libvirt_driver = nfs.LibvirtNFSVolumeDriver(self.fake_host) - - export_string = '192.168.1.1:/nfs/share1' - export_mnt_base = os.path.join(self.mnt_base, - utils.get_hash_str(export_string)) - - connection_info = {'data': {'export': export_string, - 'name': self.name}} - libvirt_driver.connect_volume(connection_info, self.disk_info, - mock.sentinel.instance) - libvirt_driver.disconnect_volume(connection_info, "vde", - mock.sentinel.instance) - - expected_commands = [ - ('umount', export_mnt_base)] - self.assertEqual(expected_commands, self.executes) - - @mock.patch.object(libvirt_utils, 'is_mounted', return_value=False) - def test_libvirt_nfs_driver_with_opts(self, mock_is_mounted): + def test_libvirt_nfs_driver_with_opts(self): libvirt_driver = nfs.LibvirtNFSVolumeDriver(self.fake_host) export_string = '192.168.1.1:/nfs/share1' options = '-o intr,nfsvers=3' @@ -130,8 +110,10 @@ class LibvirtNFSVolumeDriverTestCase(test_volume.LibvirtVolumeBaseTestCase): connection_info = {'data': {'export': export_string, 'name': self.name, 'options': options}} + instance = mock.sentinel.instance + instance.uuid = uuids.instance libvirt_driver.connect_volume(connection_info, self.disk_info, - mock.sentinel.instance) + instance) libvirt_driver.disconnect_volume(connection_info, "vde", mock.sentinel.instance) @@ -140,6 +122,6 @@ class LibvirtNFSVolumeDriverTestCase(test_volume.LibvirtVolumeBaseTestCase): ('mount', '-t', 'nfs', '-o', 'intr,nfsvers=3', export_string, export_mnt_base), ('umount', export_mnt_base), + ('rmdir', export_mnt_base) ] self.assertEqual(expected_commands, self.executes) - self.assertTrue(mock_is_mounted.called) diff --git a/nova/virt/libvirt/driver.py b/nova/virt/libvirt/driver.py index 6b1a1e5855b7..3cc061d1aae9 100644 --- a/nova/virt/libvirt/driver.py +++ b/nova/virt/libvirt/driver.py @@ -108,6 +108,7 @@ from nova.virt.libvirt.storage import lvm from nova.virt.libvirt.storage import rbd_utils from nova.virt.libvirt import utils as libvirt_utils from nova.virt.libvirt import vif as libvirt_vif +from nova.virt.libvirt.volume import mount from nova.virt.libvirt.volume import remotefs from nova.virt import netutils from nova.volume import cinder @@ -3462,6 +3463,11 @@ class LibvirtDriver(driver.ComputeDriver): 'due to an unexpected exception.'), CONF.host, exc_info=True) + if enabled: + mount.get_manager().host_up(self._host) + else: + mount.get_manager().host_down() + def _get_guest_cpu_model_config(self): mode = CONF.libvirt.cpu_mode model = CONF.libvirt.cpu_model diff --git a/nova/virt/libvirt/volume/fs.py b/nova/virt/libvirt/volume/fs.py index df84f068a16c..b14f2942aeb0 100644 --- a/nova/virt/libvirt/volume/fs.py +++ b/nova/virt/libvirt/volume/fs.py @@ -18,6 +18,7 @@ import os import six from nova import utils +from nova.virt.libvirt.volume import mount from nova.virt.libvirt.volume import volume as libvirt_volume @@ -93,3 +94,40 @@ class LibvirtBaseFileSystemVolumeDriver( """ 
mount_path = self._get_mount_path(connection_info) return os.path.join(mount_path, connection_info['data']['name']) + + +@six.add_metaclass(abc.ABCMeta) +class LibvirtMountedFileSystemVolumeDriver(LibvirtBaseFileSystemVolumeDriver): + # NOTE(mdbooth): Hopefully we'll get to the point where everything which + # previously subclassed LibvirtBaseFileSystemVolumeDriver now subclasses + # LibvirtMountedFileSystemVolumeDriver. If we get there, we should fold + # this class into the base class. + def __init__(self, host, fstype): + super(LibvirtMountedFileSystemVolumeDriver, self).__init__(host) + + self.fstype = fstype + + def connect_volume(self, connection_info, disk_info, instance): + """Connect the volume.""" + export = connection_info['data']['export'] + vol_name = connection_info['data']['name'] + mountpoint = self._get_mount_path(connection_info) + + mount.mount(self.fstype, export, vol_name, mountpoint, instance, + self._mount_options(connection_info)) + + connection_info['data']['device_path'] = \ + self._get_device_path(connection_info) + + def disconnect_volume(self, connection_info, disk_dev, instance): + """Disconnect the volume.""" + vol_name = connection_info['data']['name'] + mountpoint = self._get_mount_path(connection_info) + + mount.umount(vol_name, mountpoint, instance) + + @abc.abstractmethod + def _mount_options(self, connection_info): + """Return a list of additional arguments to pass to the mount command. + """ + pass diff --git a/nova/virt/libvirt/volume/mount.py b/nova/virt/libvirt/volume/mount.py new file mode 100644 index 000000000000..b761a33494ba --- /dev/null +++ b/nova/virt/libvirt/volume/mount.py @@ -0,0 +1,428 @@ +# Copyright 2016,2017 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import collections +import contextlib +import os.path +import threading + +from oslo_concurrency import processutils +from oslo_log import log +import six + +import nova.conf +from nova import exception +from nova.i18n import _LE, _LW +from nova import utils + +CONF = nova.conf.CONF +LOG = log.getLogger(__name__) + + +class _HostMountStateManager(object): + """A global manager of filesystem mounts. + + _HostMountStateManager manages a _HostMountState object for the current + compute node. Primarily it creates one on host_up(), destroys it on + host_down(), and returns it via get_state(). + + _HostMountStateManager manages concurrency itself. Independent callers do + not need to consider interactions between multiple _HostMountStateManager + calls when designing their own locking. + + _HostMountStateManager is a singleton, and must only be accessed via: + + mount.get_manager() + """ + + def __init__(self): + self._reset_state() + + def _reset_state(self): + """Reset state of global _HostMountStateManager. + + Should only be called by __init__ and tests. + """ + + self.state = None + self.use_count = 0 + + # Guards both state and use_count + self.cond = threading.Condition() + + # Incremented each time we initialise a new mount state. Aids + # debugging. 
+ self.generation = 0 + + @contextlib.contextmanager + def get_state(self): + """Return the current mount state. + + _HostMountStateManager will not permit a new state object to be + created while any previous state object is still in use. + + get_state will raise HypervisorUnavailable if the libvirt connection is + currently down. + + :rtype: _HostMountState + """ + + # We hold the instance lock here so that if a _HostMountState is + # currently initialising we'll wait for it to complete rather than + # fail. + with self.cond: + state = self.state + if state is None: + raise exception.HypervisorUnavailable(host=CONF.host) + self.use_count += 1 + + try: + LOG.debug('Got _HostMountState generation %(gen)i', + {'gen': state.generation}) + + yield state + finally: + with self.cond: + self.use_count -= 1 + self.cond.notify_all() + + def host_up(self, host): + """Inialise a new _HostMountState when the libvirt connection comes + up. + + host_up will destroy and re-initialise the current state if one + already exists, but this is considered an error. + + host_up will block before creating a new state until all operations + using a previous state have completed. + + :param host: A connected libvirt Host object + """ + with self.cond: + if self.state is not None: + LOG.warning(_LW("host_up called, but we think host is " + "already up")) + self._host_down() + + # Wait until all operations using a previous state generation are + # complete before initialising a new one. Note that self.state is + # already None, set either by initialisation or by host_down. This + # means the current state will not be returned to any new callers, + # and use_count will eventually reach zero. + # We do this to avoid a race between _HostMountState initialisation + # and an on-going mount/unmount operation + while self.use_count != 0: + self.cond.wait() + + # Another thread might have initialised state while we were + # wait()ing + if self.state is None: + LOG.debug('Initialising _HostMountState generation %(gen)i', + {'gen': self.generation}) + self.state = _HostMountState(host, self.generation) + self.generation += 1 + + def host_down(self): + """Destroy the current _HostMountState when the libvirt connection + goes down. + """ + with self.cond: + if self.state is None: + LOG.warning(_LW("host_down called, but we don't think host " + "is up")) + return + + self._host_down() + + def _host_down(self): + LOG.debug('Destroying MountManager generation %(gen)i', + {'gen': self.state.generation}) + self.state = None + + +class _HostMountState(object): + """A data structure recording all managed mountpoints and the + attachments in use for each one. _HostMountState ensures that the compute + node only attempts to mount a single mountpoint in use by multiple + attachments once, and that it is not unmounted until it is no longer in use + by any attachments. + + Callers should not create a _HostMountState directly, but should obtain + it via: + + with mount.get_manager().get_state() as state: + state.mount(...) + + On creation _HostMountState inspects the compute host directly to discover + all current mountpoints and the attachments on them. After creation it + expects to have exclusive control of these mountpoints until it is + destroyed. + + _HostMountState manages concurrency itself. Independent callers do not need + to consider interactions between multiple _HostMountState calls when + designing their own locking. 
+ """ + + class _MountPoint(object): + """A single mountpoint, and the set of attachments in use on it.""" + def __init__(self): + # A guard for operations on this mountpoint + # N.B. Care is required using this lock, as it will be deleted + # if the containing _MountPoint is deleted. + self.lock = threading.Lock() + + # The set of attachments on this mountpoint. + self.attachments = set() + + def add_attachment(self, vol_name, instance_uuid): + self.attachments.add((vol_name, instance_uuid)) + + def remove_attachment(self, vol_name, instance_uuid): + self.attachments.remove((vol_name, instance_uuid)) + + def in_use(self): + return len(self.attachments) > 0 + + def __init__(self, host, generation): + """Initialise a _HostMountState by inspecting the current compute + host for mountpoints and the attachments in use on them. + + :param host: A connected libvirt Host object + :param generation: An integer indicating the generation of this + _HostMountState object. This is 0 for the first + _HostMountState created, and incremented for each + created subsequently. It is used in log messages to + aid debugging. + """ + self.generation = generation + self.mountpoints = collections.defaultdict(self._MountPoint) + + # Iterate over all guests on the connected libvirt + for guest in host.list_guests(only_running=False): + for disk in guest.get_all_disks(): + + # All remote filesystem volumes are files + if disk.source_type != 'file': + continue + + # NOTE(mdbooth): We're assuming that the mountpoint is our + # immediate parent, which is currently true for all + # volume drivers. We deliberately don't do anything clever + # here, because we don't want to, e.g.: + # * Add mountpoints for non-volume disks + # * Get it wrong when a non-running domain references a + # volume which isn't mounted because the host just rebooted. + # and this is good enough. We could probably do better here + # with more thought. + + mountpoint = os.path.dirname(disk.source_path) + if not os.path.ismount(mountpoint): + continue + + name = os.path.basename(disk.source_path) + mount = self.mountpoints[mountpoint] + mount.add_attachment(name, guest.uuid) + + LOG.debug('Discovered volume %(vol)s in use for existing ' + 'mountpoint %(mountpoint)s', + {'vol': name, 'mountpoint': mountpoint}) + + @contextlib.contextmanager + def _get_locked(self, mountpoint): + """Get a locked mountpoint object + + :param mountpoint: The path of the mountpoint whose object we should + return. + :rtype: _HostMountState._MountPoint + """ + # This dance is because we delete locks. We need to be sure that the + # lock we hold does not belong to an object which has been deleted. + # We do this by checking that mountpoint still refers to this object + # when we hold the lock. This is safe because: + # * we only delete an object from mountpounts whilst holding its lock + # * mountpoints is a defaultdict which will atomically create a new + # object on access + while True: + mount = self.mountpoints[mountpoint] + with mount.lock: + if self.mountpoints[mountpoint] is mount: + yield mount + break + + def mount(self, fstype, export, vol_name, mountpoint, instance, options): + """Ensure a mountpoint is available for an attachment, mounting it + if necessary. + + If this is the first attachment on this mountpoint, we will mount it + with: + + mount -t + + :param fstype: The filesystem type to be passed to mount command. + :param export: The type-specific identifier of the filesystem to be + mounted. e.g. for nfs 'host.example.com:/mountpoint'. 
+ :param vol_name: The name of the volume on the remote filesystem. + :param mountpoint: The directory where the filesystem will be + mounted on the local compute host. + :param instance: The instance the volume will be attached to. + :param options: An arbitrary list of additional arguments to be + passed to the mount command immediate before export + and mountpoint. + """ + + # NOTE(mdbooth): mount() may currently be called multiple times for a + # single attachment. Any operation which calls + # LibvirtDriver._hard_reboot will re-attach volumes which are probably + # already attached, resulting in multiple mount calls. + + LOG.debug('_HostMountState.mount(fstype=%(fstype)s, ' + 'export=%(export)s, vol_name=%(vol_name)s, %(mountpoint)s, ' + 'options=%(options)s) generation %(gen)s', + {'fstype': fstype, 'export': export, 'vol_name': vol_name, + 'mountpoint': mountpoint, 'options': options, + 'gen': self.generation}) + with self._get_locked(mountpoint) as mount: + if not os.path.ismount(mountpoint): + LOG.debug('Mounting %(mountpoint)s generation %(gen)s', + {'mountpoint': mountpoint, 'gen': self.generation}) + + utils.execute('mkdir', '-p', mountpoint) + + mount_cmd = ['mount', '-t', fstype] + if options is not None: + mount_cmd.extend(options) + mount_cmd.extend([export, mountpoint]) + + try: + utils.execute(*mount_cmd, run_as_root=True) + except Exception: + # Check to see if mountpoint is mounted despite the error + # eg it was already mounted + if os.path.ismount(mountpoint): + # We're not going to raise the exception because we're + # in the desired state anyway. However, this is still + # unusual so we'll log it. + LOG.exception(_LE('Error mounting %(fstype)s export ' + '%(export)s on %(mountpoint)s. ' + 'Continuing because mountpount is ' + 'mounted despite this.'), + {'fstype': fstype, 'export': export, + 'mountpoint': mountpoint}) + + else: + # If the mount failed there's no reason for us to keep + # a record of it. It will be created again if the + # caller retries. + + # Delete while holding lock + del self.mountpoints[mountpoint] + + raise + + mount.add_attachment(vol_name, instance.uuid) + + LOG.debug('_HostMountState.mount() for %(mountpoint)s ' + 'generation %(gen)s completed successfully', + {'mountpoint': mountpoint, 'gen': self.generation}) + + def umount(self, vol_name, mountpoint, instance): + """Mark an attachment as no longer in use, and unmount its mountpoint + if necessary. + + :param vol_name: The name of the volume on the remote filesystem. + :param mountpoint: The directory where the filesystem is be + mounted on the local compute host. + :param instance: The instance the volume was attached to. 
+ """ + LOG.debug('_HostMountState.umount(vol_name=%(vol_name)s, ' + 'mountpoint=%(mountpoint)s) generation %(gen)s', + {'vol_name': vol_name, 'mountpoint': mountpoint, + 'gen': self.generation}) + with self._get_locked(mountpoint) as mount: + try: + mount.remove_attachment(vol_name, instance.uuid) + except KeyError: + LOG.warning(_LW("Request to remove attachment " + "(%(vol_name)s, %(instance)s) from " + "%(mountpoint)s, but we don't think it's in " + "use."), + {'vol_name': vol_name, 'instance': instance.uuid, + 'mountpoint': mountpoint}) + + if not mount.in_use(): + mounted = os.path.ismount(mountpoint) + + if mounted: + mounted = self._real_umount(mountpoint) + + # Delete our record entirely if it's unmounted + if not mounted: + del self.mountpoints[mountpoint] + + LOG.debug('_HostMountState.umount() for %(mountpoint)s ' + 'generation %(gen)s completed successfully', + {'mountpoint': mountpoint, 'gen': self.generation}) + + def _real_umount(self, mountpoint): + # Unmount and delete a mountpoint. + # Return mount state after umount (i.e. True means still mounted) + LOG.debug('Unmounting %(mountpoint)s generation %(gen)s', + {'mountpoint': mountpoint, 'gen': self.generation}) + + try: + utils.execute('umount', mountpoint, run_as_root=True, + attempts=3, delay_on_retry=True) + except processutils.ProcessExecutionError as ex: + LOG.error(_LE("Couldn't unmount %(mountpoint)s: %(reason)s"), + {'mountpoint': mountpoint, 'reason': six.text_type(ex)}) + + if not os.path.ismount(mountpoint): + try: + utils.execute('rmdir', mountpoint) + except processutils.ProcessExecutionError as ex: + LOG.error(_LE("Couldn't remove directory %(mountpoint)s: " + "%(reason)s"), + {'mountpoint': mountpoint, + 'reason': six.text_type(ex)}) + return False + + return True + + +__manager__ = _HostMountStateManager() + + +def get_manager(): + """Return the _HostMountStateManager singleton. + + :rtype: _HostMountStateManager + """ + return __manager__ + + +def mount(fstype, export, vol_name, mountpoint, instance, options=None): + """A convenience wrapper around _HostMountState.mount(), called via the + _HostMountStateManager singleton. + """ + with __manager__.get_state() as mount_state: + mount_state.mount(fstype, export, vol_name, mountpoint, instance, + options) + + +def umount(vol_name, mountpoint, instance): + """A convenience wrapper around _HostMountState.umount(), called via the + _HostMountStateManager singleton. + """ + with __manager__.get_state() as mount_state: + mount_state.umount(vol_name, mountpoint, instance) diff --git a/nova/virt/libvirt/volume/nfs.py b/nova/virt/libvirt/volume/nfs.py index 327328f0e665..594950573ffa 100644 --- a/nova/virt/libvirt/volume/nfs.py +++ b/nova/virt/libvirt/volume/nfs.py @@ -11,14 +11,9 @@ # under the License. 
-from oslo_concurrency import processutils from oslo_log import log as logging -import six import nova.conf -from nova.i18n import _LE, _LW -from nova import utils -from nova.virt.libvirt import utils as libvirt_utils from nova.virt.libvirt.volume import fs LOG = logging.getLogger(__name__) @@ -26,9 +21,12 @@ LOG = logging.getLogger(__name__) CONF = nova.conf.CONF -class LibvirtNFSVolumeDriver(fs.LibvirtBaseFileSystemVolumeDriver): +class LibvirtNFSVolumeDriver(fs.LibvirtMountedFileSystemVolumeDriver): """Class implements libvirt part of volume driver for NFS.""" + def __init__(self, connection): + super(LibvirtNFSVolumeDriver, self).__init__(connection, 'nfs') + def _get_mount_point_base(self): return CONF.libvirt.nfs_mount_point_base @@ -43,57 +41,13 @@ class LibvirtNFSVolumeDriver(fs.LibvirtBaseFileSystemVolumeDriver): conf.driver_io = "native" return conf - def connect_volume(self, connection_info, disk_info, instance): - """Connect the volume.""" - self._ensure_mounted(connection_info) - - connection_info['data']['device_path'] = \ - self._get_device_path(connection_info) - - def disconnect_volume(self, connection_info, disk_dev, instance): - """Disconnect the volume.""" - - mount_path = self._get_mount_path(connection_info) - - try: - utils.execute('umount', mount_path, run_as_root=True) - except processutils.ProcessExecutionError as exc: - export = connection_info['data']['export'] - if ('device is busy' in six.text_type(exc) or - 'target is busy' in six.text_type(exc)): - LOG.debug("The NFS share %s is still in use.", export) - elif ('not mounted' in six.text_type(exc)): - LOG.debug("The NFS share %s has already been unmounted.", - export) - else: - LOG.exception(_LE("Couldn't unmount the NFS share %s"), export) - - def _ensure_mounted(self, connection_info): - """@type connection_info: dict - """ - nfs_export = connection_info['data']['export'] - mount_path = self._get_mount_path(connection_info) - if not libvirt_utils.is_mounted(mount_path, nfs_export): - options = connection_info['data'].get('options') - self._mount_nfs(mount_path, nfs_export, options, ensure=True) - return mount_path - - def _mount_nfs(self, mount_path, nfs_share, options=None, ensure=False): - """Mount nfs export to mount path.""" - utils.execute('mkdir', '-p', mount_path) - - # Construct the NFS mount command. - nfs_cmd = ['mount', '-t', 'nfs'] + def _mount_options(self, connection_info): + options = [] if CONF.libvirt.nfs_mount_options is not None: - nfs_cmd.extend(['-o', CONF.libvirt.nfs_mount_options]) - if options: - nfs_cmd.extend(options.split(' ')) - nfs_cmd.extend([nfs_share, mount_path]) + options.extend(['-o', CONF.libvirt.nfs_mount_options]) - try: - utils.execute(*nfs_cmd, run_as_root=True) - except processutils.ProcessExecutionError as exc: - if ensure and 'already mounted' in six.text_type(exc): - LOG.warning(_LW("%s is already mounted"), nfs_share) - else: - raise + conn_options = connection_info['data'].get('options') + if conn_options: + options.extend(conn_options.split(' ')) + + return options
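
Reviewer note: a minimal sketch of how a further remote-filesystem driver could plug into the mount manager introduced by this change. The 'examplefs' fstype, the LibvirtExampleFSVolumeDriver class and the CONF.libvirt.examplefs_mount_point_base option are hypothetical illustrations, not part of this patch; only the two small methods below need to be supplied, since connect_volume() and disconnect_volume() are inherited from LibvirtMountedFileSystemVolumeDriver and delegate to the refcounting _HostMountState via mount.mount() and mount.umount().

import nova.conf
from nova.virt.libvirt.volume import fs

CONF = nova.conf.CONF


class LibvirtExampleFSVolumeDriver(fs.LibvirtMountedFileSystemVolumeDriver):
    """Hypothetical driver for an 'examplefs' remote filesystem."""

    def __init__(self, host):
        super(LibvirtExampleFSVolumeDriver, self).__init__(host, 'examplefs')

    def _get_mount_point_base(self):
        # Hypothetical config option, by analogy with
        # CONF.libvirt.nfs_mount_point_base used by the NFS driver.
        return CONF.libvirt.examplefs_mount_point_base

    def _mount_options(self, connection_info):
        # The returned list is inserted by _HostMountState.mount() between
        # 'mount -t <fstype>' and '<export> <mountpoint>'.
        options = []
        conn_options = connection_info['data'].get('options')
        if conn_options:
            options.extend(conn_options.split(' '))
        return options

This mirrors the converted NFS driver above: per-mountpoint locking, refcounting of attachments and the mount/umount/rmdir bookkeeping all live in _HostMountState rather than in each individual volume driver.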