Remove VFSLocalFS
The fix Iac8496065c8b6212d7edac320659444ab341b513 removed the last user of VFSLocalFS so this patch remove the class, the related tests and all the privsep functions that become dead code after this cleanup. Change-Id: Ia1eb1d93d1f9699a4027b7a07107109ab9a3a29a
This commit is contained in:
parent
d06a10f096
commit
4b32f9c9e3
|
@ -129,12 +129,6 @@ def remove_device_maps(device):
|
|||
return processutils.execute('kpartx', '-d', device)
|
||||
|
||||
|
||||
@nova.privsep.sys_admin_pctxt.entrypoint
|
||||
def get_filesystem_type(device):
|
||||
return processutils.execute('blkid', '-o', 'value', '-s', 'TYPE', device,
|
||||
check_exit_code=[0, 2])
|
||||
|
||||
|
||||
@nova.privsep.sys_admin_pctxt.entrypoint
|
||||
def e2fsck(image, flags='-fp'):
|
||||
unprivileged_e2fsck(image, flags=flags)
|
||||
|
|
|
@ -25,14 +25,6 @@ from nova import exception
|
|||
import nova.privsep
|
||||
|
||||
|
||||
@nova.privsep.sys_admin_pctxt.entrypoint
|
||||
def readfile(path):
|
||||
if not os.path.exists(path):
|
||||
raise exception.FileNotFound(file_path=path)
|
||||
with open(path, 'r') as f:
|
||||
return f.read()
|
||||
|
||||
|
||||
@nova.privsep.sys_admin_pctxt.entrypoint
|
||||
def writefile(path, mode, content):
|
||||
if not os.path.exists(os.path.dirname(path)):
|
||||
|
@ -41,13 +33,6 @@ def writefile(path, mode, content):
|
|||
f.write(content)
|
||||
|
||||
|
||||
@nova.privsep.sys_admin_pctxt.entrypoint
|
||||
def readlink(path):
|
||||
if not os.path.exists(path):
|
||||
raise exception.FileNotFound(file_path=path)
|
||||
return os.readlink(path)
|
||||
|
||||
|
||||
@nova.privsep.sys_admin_pctxt.entrypoint
|
||||
def chown(
|
||||
path: str, uid: int = -1, gid: int = -1, recursive: bool = False,
|
||||
|
@ -102,13 +87,6 @@ def rmdir(path):
|
|||
os.rmdir(path)
|
||||
|
||||
|
||||
class path(object):
|
||||
@staticmethod
|
||||
@nova.privsep.sys_admin_pctxt.entrypoint
|
||||
def exists(path):
|
||||
return os.path.exists(path)
|
||||
|
||||
|
||||
@nova.privsep.sys_admin_pctxt.entrypoint
|
||||
def last_bytes(path, num):
|
||||
"""Return num bytes from the end of the file, and remaining byte count.
|
||||
|
|
|
@ -142,13 +142,6 @@ class PrivsepFilesystemHelpersTestCase(test.NoDBTestCase):
|
|||
nova.privsep.fs.remove_device_maps('/dev/nosuch')
|
||||
mock_execute.assert_called_with('kpartx', '-d', '/dev/nosuch')
|
||||
|
||||
@mock.patch('oslo_concurrency.processutils.execute')
|
||||
def test_get_filesystem_type(self, mock_execute):
|
||||
nova.privsep.fs.get_filesystem_type('/dev/nosuch')
|
||||
mock_execute.assert_called_with('blkid', '-o', 'value', '-s',
|
||||
'TYPE', '/dev/nosuch',
|
||||
check_exit_code=[0, 2])
|
||||
|
||||
@mock.patch('oslo_concurrency.processutils.execute')
|
||||
def test_privileged_e2fsck(self, mock_execute):
|
||||
nova.privsep.fs.e2fsck('/path/nosuch')
|
||||
|
|
|
@ -31,19 +31,6 @@ class FileTestCase(test.NoDBTestCase):
|
|||
super(FileTestCase, self).setUp()
|
||||
self.useFixture(fixtures.PrivsepFixture())
|
||||
|
||||
@mock.patch('os.path.exists', return_value=True)
|
||||
def test_readfile(self, mock_exists):
|
||||
mock_open = mock.mock_open(read_data='hello world')
|
||||
with mock.patch('builtins.open', new=mock_open):
|
||||
self.assertEqual('hello world',
|
||||
nova.privsep.path.readfile('/fake/path'))
|
||||
|
||||
@mock.patch('os.path.exists', return_value=False)
|
||||
def test_readfile_file_not_found(self, mock_exists):
|
||||
self.assertRaises(exception.FileNotFound,
|
||||
nova.privsep.path.readfile,
|
||||
'/fake/path')
|
||||
|
||||
@mock.patch('os.path.exists', return_value=True)
|
||||
def test_write(self, mock_exists):
|
||||
mock_open = mock.mock_open()
|
||||
|
@ -62,19 +49,6 @@ class FileTestCase(test.NoDBTestCase):
|
|||
nova.privsep.path.writefile,
|
||||
'/fake/path', 'w', 'foo')
|
||||
|
||||
@mock.patch('os.path.exists', return_value=True)
|
||||
@mock.patch('os.readlink')
|
||||
def test_readlink(self, mock_readlink, mock_exists):
|
||||
nova.privsep.path.readlink('/fake/path')
|
||||
mock_exists.assert_called_with('/fake/path')
|
||||
mock_readlink.assert_called_with('/fake/path')
|
||||
|
||||
@mock.patch('os.path.exists', return_value=False)
|
||||
def test_readlink_file_not_found(self, mock_exists):
|
||||
self.assertRaises(exception.FileNotFound,
|
||||
nova.privsep.path.readlink,
|
||||
'/fake/path')
|
||||
|
||||
@mock.patch('os.path.exists', return_value=True)
|
||||
@mock.patch('os.chown')
|
||||
def test_chown(self, mock_chown, mock_exists):
|
||||
|
@ -162,11 +136,6 @@ class FileTestCase(test.NoDBTestCase):
|
|||
nova.privsep.path.rmdir,
|
||||
'/fake/path')
|
||||
|
||||
@mock.patch('os.path.exists', return_value=True)
|
||||
def test_exists(self, mock_exists):
|
||||
nova.privsep.path.path.exists('/fake/path')
|
||||
mock_exists.assert_called_with('/fake/path')
|
||||
|
||||
|
||||
class LastBytesTestCase(test.NoDBTestCase):
|
||||
"""Test the last_bytes() utility method."""
|
||||
|
|
|
@ -1,226 +0,0 @@
|
|||
# Copyright (C) 2012 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import grp
|
||||
import pwd
|
||||
import tempfile
|
||||
|
||||
from collections import namedtuple
|
||||
import mock
|
||||
|
||||
from nova import exception
|
||||
from nova import test
|
||||
import nova.utils
|
||||
from nova.virt.disk.mount import nbd
|
||||
from nova.virt.disk.vfs import localfs as vfsimpl
|
||||
from nova.virt.image import model as imgmodel
|
||||
|
||||
|
||||
class VirtDiskVFSLocalFSTestPaths(test.NoDBTestCase):
|
||||
def setUp(self):
|
||||
super(VirtDiskVFSLocalFSTestPaths, self).setUp()
|
||||
|
||||
self.rawfile = imgmodel.LocalFileImage('/dummy.img',
|
||||
imgmodel.FORMAT_RAW)
|
||||
|
||||
# NOTE(mikal): mocking a decorator is non-trivial, so this is the
|
||||
# best we can do.
|
||||
|
||||
@mock.patch.object(nova.privsep.path, 'readlink')
|
||||
def test_check_safe_path(self, read_link):
|
||||
vfs = vfsimpl.VFSLocalFS(self.rawfile)
|
||||
vfs.imgdir = '/foo'
|
||||
|
||||
read_link.return_value = '/foo/etc/something.conf'
|
||||
|
||||
ret = vfs._canonical_path('etc/something.conf')
|
||||
self.assertEqual(ret, '/foo/etc/something.conf')
|
||||
|
||||
@mock.patch.object(nova.privsep.path, 'readlink')
|
||||
def test_check_unsafe_path(self, read_link):
|
||||
vfs = vfsimpl.VFSLocalFS(self.rawfile)
|
||||
vfs.imgdir = '/foo'
|
||||
|
||||
read_link.return_value = '/etc/something.conf'
|
||||
|
||||
self.assertRaises(exception.Invalid,
|
||||
vfs._canonical_path,
|
||||
'etc/../../../something.conf')
|
||||
|
||||
|
||||
class VirtDiskVFSLocalFSTest(test.NoDBTestCase):
|
||||
def setUp(self):
|
||||
super(VirtDiskVFSLocalFSTest, self).setUp()
|
||||
|
||||
self.qcowfile = imgmodel.LocalFileImage('/dummy.qcow2',
|
||||
imgmodel.FORMAT_QCOW2)
|
||||
self.rawfile = imgmodel.LocalFileImage('/dummy.img',
|
||||
imgmodel.FORMAT_RAW)
|
||||
|
||||
@mock.patch.object(nova.privsep.path, 'readlink')
|
||||
@mock.patch.object(nova.privsep.path, 'makedirs')
|
||||
def test_makepath(self, mkdir, read_link):
|
||||
vfs = vfsimpl.VFSLocalFS(self.qcowfile)
|
||||
vfs.imgdir = '/scratch/dir'
|
||||
|
||||
vfs.make_path('/some/dir')
|
||||
read_link.assert_called()
|
||||
mkdir.assert_called_with(read_link.return_value)
|
||||
|
||||
read_link.reset_mock()
|
||||
mkdir.reset_mock()
|
||||
vfs.make_path('/other/dir')
|
||||
read_link.assert_called()
|
||||
mkdir.assert_called_with(read_link.return_value)
|
||||
|
||||
@mock.patch.object(nova.privsep.path, 'readlink')
|
||||
@mock.patch.object(nova.privsep.path, 'writefile')
|
||||
def test_append_file(self, write_file, read_link):
|
||||
vfs = vfsimpl.VFSLocalFS(self.qcowfile)
|
||||
vfs.imgdir = '/scratch/dir'
|
||||
|
||||
vfs.append_file('/some/file', ' Goodbye')
|
||||
|
||||
read_link.assert_called()
|
||||
write_file.assert_called_with(read_link.return_value, 'a', ' Goodbye')
|
||||
|
||||
@mock.patch.object(nova.privsep.path, 'readlink')
|
||||
@mock.patch.object(nova.privsep.path, 'writefile')
|
||||
def test_replace_file(self, write_file, read_link):
|
||||
vfs = vfsimpl.VFSLocalFS(self.qcowfile)
|
||||
vfs.imgdir = '/scratch/dir'
|
||||
|
||||
vfs.replace_file('/some/file', 'Goodbye')
|
||||
|
||||
read_link.assert_called()
|
||||
write_file.assert_called_with(read_link.return_value, 'w', 'Goodbye')
|
||||
|
||||
@mock.patch.object(nova.privsep.path, 'readlink')
|
||||
@mock.patch.object(nova.privsep.path, 'readfile')
|
||||
def test_read_file(self, read_file, read_link):
|
||||
vfs = vfsimpl.VFSLocalFS(self.qcowfile)
|
||||
vfs.imgdir = '/scratch/dir'
|
||||
|
||||
self.assertEqual(read_file.return_value, vfs.read_file('/some/file'))
|
||||
read_link.assert_called()
|
||||
read_file.assert_called()
|
||||
|
||||
@mock.patch.object(nova.privsep.path.path, 'exists')
|
||||
def test_has_file(self, exists):
|
||||
vfs = vfsimpl.VFSLocalFS(self.qcowfile)
|
||||
vfs.imgdir = '/scratch/dir'
|
||||
has = vfs.has_file('/some/file')
|
||||
self.assertEqual(exists.return_value, has)
|
||||
|
||||
@mock.patch.object(nova.privsep.path, 'readlink')
|
||||
@mock.patch.object(nova.privsep.path, 'chmod')
|
||||
def test_set_permissions(self, chmod, read_link):
|
||||
vfs = vfsimpl.VFSLocalFS(self.qcowfile)
|
||||
vfs.imgdir = '/scratch/dir'
|
||||
|
||||
vfs.set_permissions('/some/file', 0o777)
|
||||
read_link.assert_called()
|
||||
chmod.assert_called_with(read_link.return_value, 0o777)
|
||||
|
||||
@mock.patch.object(nova.privsep.path, 'readlink')
|
||||
@mock.patch.object(nova.privsep.path, 'chown')
|
||||
@mock.patch.object(pwd, 'getpwnam')
|
||||
@mock.patch.object(grp, 'getgrnam')
|
||||
def test_set_ownership(self, getgrnam, getpwnam, chown, read_link):
|
||||
vfs = vfsimpl.VFSLocalFS(self.qcowfile)
|
||||
vfs.imgdir = '/scratch/dir'
|
||||
|
||||
fake_passwd = namedtuple('fake_passwd', 'pw_uid')
|
||||
getpwnam.return_value(fake_passwd(pw_uid=100))
|
||||
|
||||
fake_group = namedtuple('fake_group', 'gr_gid')
|
||||
getgrnam.return_value(fake_group(gr_gid=101))
|
||||
|
||||
vfs.set_ownership('/some/file', 'fred', None)
|
||||
read_link.assert_called()
|
||||
chown.assert_called_with(read_link.return_value,
|
||||
uid=getpwnam.return_value.pw_uid)
|
||||
|
||||
read_link.reset_mock()
|
||||
chown.reset_mock()
|
||||
vfs.set_ownership('/some/file', None, 'users')
|
||||
read_link.assert_called()
|
||||
chown.assert_called_with(read_link.return_value,
|
||||
gid=getgrnam.return_value.gr_gid)
|
||||
|
||||
read_link.reset_mock()
|
||||
chown.reset_mock()
|
||||
vfs.set_ownership('/some/file', 'joe', 'admins')
|
||||
read_link.assert_called()
|
||||
chown.assert_called_with(read_link.return_value,
|
||||
uid=getpwnam.return_value.pw_uid,
|
||||
gid=getgrnam.return_value.gr_gid)
|
||||
|
||||
@mock.patch('nova.privsep.fs.get_filesystem_type',
|
||||
return_value=('ext3\n', ''))
|
||||
def test_get_format_fs(self, mock_type):
|
||||
vfs = vfsimpl.VFSLocalFS(self.rawfile)
|
||||
vfs.setup = mock.MagicMock()
|
||||
vfs.teardown = mock.MagicMock()
|
||||
|
||||
def fake_setup():
|
||||
vfs.mount = mock.MagicMock()
|
||||
vfs.mount.device = None
|
||||
vfs.mount.get_dev.side_effect = fake_get_dev
|
||||
|
||||
def fake_teardown():
|
||||
vfs.mount.device = None
|
||||
|
||||
def fake_get_dev():
|
||||
vfs.mount.device = '/dev/xyz'
|
||||
return True
|
||||
|
||||
vfs.setup.side_effect = fake_setup
|
||||
vfs.teardown.side_effect = fake_teardown
|
||||
|
||||
vfs.setup()
|
||||
self.assertEqual('ext3', vfs.get_image_fs())
|
||||
vfs.teardown()
|
||||
vfs.mount.get_dev.assert_called_once_with()
|
||||
mock_type.assert_called_once_with('/dev/xyz')
|
||||
|
||||
@mock.patch.object(tempfile, 'mkdtemp')
|
||||
@mock.patch.object(nbd, 'NbdMount')
|
||||
def test_setup_mount(self, NbdMount, mkdtemp):
|
||||
vfs = vfsimpl.VFSLocalFS(self.qcowfile)
|
||||
|
||||
mounter = mock.MagicMock()
|
||||
mkdtemp.return_value = 'tmp/'
|
||||
NbdMount.return_value = mounter
|
||||
|
||||
vfs.setup()
|
||||
|
||||
self.assertTrue(mkdtemp.called)
|
||||
NbdMount.assert_called_once_with(self.qcowfile, 'tmp/', None)
|
||||
mounter.do_mount.assert_called_once_with()
|
||||
|
||||
@mock.patch.object(tempfile, 'mkdtemp')
|
||||
@mock.patch.object(nbd, 'NbdMount')
|
||||
def test_setup_mount_false(self, NbdMount, mkdtemp):
|
||||
vfs = vfsimpl.VFSLocalFS(self.qcowfile)
|
||||
|
||||
mounter = mock.MagicMock()
|
||||
mkdtemp.return_value = 'tmp/'
|
||||
NbdMount.return_value = mounter
|
||||
|
||||
vfs.setup(mount=False)
|
||||
|
||||
self.assertTrue(mkdtemp.called)
|
||||
NbdMount.assert_called_once_with(self.qcowfile, 'tmp/', None)
|
||||
self.assertFalse(mounter.do_mount.called)
|
|
@ -1,147 +0,0 @@
|
|||
# Copyright 2012 Red Hat, Inc.
|
||||
# Copyright 2017 Rackspace Australia
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import grp
|
||||
import os
|
||||
import pwd
|
||||
import tempfile
|
||||
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import excutils
|
||||
|
||||
from nova import exception
|
||||
from nova.i18n import _
|
||||
import nova.privsep.fs
|
||||
import nova.privsep.path
|
||||
from nova.virt.disk.mount import api as mount_api
|
||||
from nova.virt.disk.vfs import api as vfs
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class VFSLocalFS(vfs.VFS):
|
||||
|
||||
"""os.path.join() with safety check for injected file paths.
|
||||
|
||||
Join the supplied path components and make sure that the
|
||||
resulting path we are injecting into is within the
|
||||
mounted guest fs. Trying to be clever and specifying a
|
||||
path with '..' in it will hit this safeguard.
|
||||
"""
|
||||
def _canonical_path(self, path):
|
||||
canonpath = nova.privsep.path.readlink(path)
|
||||
if not canonpath.startswith(os.path.realpath(self.imgdir) + '/'):
|
||||
raise exception.Invalid(_('File path %s not valid') % path)
|
||||
return canonpath
|
||||
|
||||
"""
|
||||
This class implements a VFS module that is mapped to a virtual
|
||||
root directory present on the host filesystem. This implementation
|
||||
uses the nova.virt.disk.mount.Mount API to make virtual disk
|
||||
images visible in the host filesystem. If the disk format is
|
||||
raw, it will use the loopback mount impl, otherwise it will
|
||||
use the qemu-nbd impl.
|
||||
"""
|
||||
def __init__(self, image, partition=None, imgdir=None):
|
||||
"""Create a new local VFS instance
|
||||
|
||||
:param image: instance of nova.virt.image.model.Image
|
||||
:param partition: the partition number of access
|
||||
:param imgdir: the directory to mount the image at
|
||||
"""
|
||||
|
||||
super(VFSLocalFS, self).__init__(image, partition)
|
||||
|
||||
self.imgdir = imgdir
|
||||
self.mount = None
|
||||
|
||||
def setup(self, mount=True):
|
||||
self.imgdir = tempfile.mkdtemp(prefix="openstack-vfs-localfs")
|
||||
try:
|
||||
mnt = mount_api.Mount.instance_for_format(self.image,
|
||||
self.imgdir,
|
||||
self.partition)
|
||||
if mount:
|
||||
if not mnt.do_mount():
|
||||
raise exception.NovaException(mnt.error)
|
||||
self.mount = mnt
|
||||
except Exception as e:
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.debug("Failed to mount image: %(ex)s", {'ex': e})
|
||||
self.teardown()
|
||||
|
||||
def teardown(self):
|
||||
try:
|
||||
if self.mount:
|
||||
self.mount.do_teardown()
|
||||
except Exception as e:
|
||||
LOG.debug("Failed to unmount %(imgdir)s: %(ex)s",
|
||||
{'imgdir': self.imgdir, 'ex': e})
|
||||
try:
|
||||
if self.imgdir:
|
||||
os.rmdir(self.imgdir)
|
||||
except Exception as e:
|
||||
LOG.debug("Failed to remove %(imgdir)s: %(ex)s",
|
||||
{'imgdir': self.imgdir, 'ex': e})
|
||||
self.imgdir = None
|
||||
self.mount = None
|
||||
|
||||
def make_path(self, path):
|
||||
LOG.debug("Make directory path=%s", path)
|
||||
nova.privsep.path.makedirs(self._canonical_path(path))
|
||||
|
||||
def append_file(self, path, content):
|
||||
LOG.debug("Append file path=%s", path)
|
||||
return nova.privsep.path.writefile(
|
||||
self._canonical_path(path), 'a', content)
|
||||
|
||||
def replace_file(self, path, content):
|
||||
LOG.debug("Replace file path=%s", path)
|
||||
return nova.privsep.path.writefile(
|
||||
self._canonical_path(path), 'w', content)
|
||||
|
||||
def read_file(self, path):
|
||||
LOG.debug("Read file path=%s", path)
|
||||
return nova.privsep.path.readfile(self._canonical_path(path))
|
||||
|
||||
def has_file(self, path):
|
||||
# NOTE(mikal): it is deliberate that we don't generate a canonical
|
||||
# path here, as that tests for existance and would raise an exception.
|
||||
LOG.debug("Has file path=%s", path)
|
||||
return nova.privsep.path.path.exists(path)
|
||||
|
||||
def set_permissions(self, path, mode):
|
||||
LOG.debug("Set permissions path=%(path)s mode=%(mode)o",
|
||||
{'path': path, 'mode': mode})
|
||||
nova.privsep.path.chmod(self._canonical_path(path), mode)
|
||||
|
||||
def set_ownership(self, path, user, group):
|
||||
LOG.debug("Set permissions path=%(path)s "
|
||||
"user=%(user)s group=%(group)s",
|
||||
{'path': path, 'user': user, 'group': group})
|
||||
canonpath = self._canonical_path(path)
|
||||
|
||||
chown_kwargs = {}
|
||||
if user:
|
||||
chown_kwargs['uid'] = pwd.getpwnam(user).pw_uid
|
||||
if group:
|
||||
chown_kwargs['gid'] = grp.getgrnam(group).gr_gid
|
||||
nova.privsep.path.chown(canonpath, **chown_kwargs)
|
||||
|
||||
def get_image_fs(self):
|
||||
if self.mount.device or self.mount.get_dev():
|
||||
out, err = nova.privsep.fs.get_filesystem_type(self.mount.device)
|
||||
return out.strip()
|
||||
return ""
|
Loading…
Reference in New Issue