Merge "Remove VFSLocalFS"

This commit is contained in:
Zuul 2021-03-16 17:33:45 +00:00 committed by Gerrit Code Review
commit f55f5daed8
6 changed files with 0 additions and 439 deletions

View File

@ -129,12 +129,6 @@ def remove_device_maps(device):
return processutils.execute('kpartx', '-d', device)
@nova.privsep.sys_admin_pctxt.entrypoint
def get_filesystem_type(device):
return processutils.execute('blkid', '-o', 'value', '-s', 'TYPE', device,
check_exit_code=[0, 2])
@nova.privsep.sys_admin_pctxt.entrypoint
def e2fsck(image, flags='-fp'):
unprivileged_e2fsck(image, flags=flags)

View File

@ -25,14 +25,6 @@ from nova import exception
import nova.privsep
@nova.privsep.sys_admin_pctxt.entrypoint
def readfile(path):
if not os.path.exists(path):
raise exception.FileNotFound(file_path=path)
with open(path, 'r') as f:
return f.read()
@nova.privsep.sys_admin_pctxt.entrypoint
def writefile(path, mode, content):
if not os.path.exists(os.path.dirname(path)):
@ -41,13 +33,6 @@ def writefile(path, mode, content):
f.write(content)
@nova.privsep.sys_admin_pctxt.entrypoint
def readlink(path):
if not os.path.exists(path):
raise exception.FileNotFound(file_path=path)
return os.readlink(path)
@nova.privsep.sys_admin_pctxt.entrypoint
def chown(
path: str, uid: int = -1, gid: int = -1, recursive: bool = False,
@ -102,13 +87,6 @@ def rmdir(path):
os.rmdir(path)
class path(object):
@staticmethod
@nova.privsep.sys_admin_pctxt.entrypoint
def exists(path):
return os.path.exists(path)
@nova.privsep.sys_admin_pctxt.entrypoint
def last_bytes(path, num):
"""Return num bytes from the end of the file, and remaining byte count.

View File

@ -142,13 +142,6 @@ class PrivsepFilesystemHelpersTestCase(test.NoDBTestCase):
nova.privsep.fs.remove_device_maps('/dev/nosuch')
mock_execute.assert_called_with('kpartx', '-d', '/dev/nosuch')
@mock.patch('oslo_concurrency.processutils.execute')
def test_get_filesystem_type(self, mock_execute):
nova.privsep.fs.get_filesystem_type('/dev/nosuch')
mock_execute.assert_called_with('blkid', '-o', 'value', '-s',
'TYPE', '/dev/nosuch',
check_exit_code=[0, 2])
@mock.patch('oslo_concurrency.processutils.execute')
def test_privileged_e2fsck(self, mock_execute):
nova.privsep.fs.e2fsck('/path/nosuch')

View File

@ -31,19 +31,6 @@ class FileTestCase(test.NoDBTestCase):
super(FileTestCase, self).setUp()
self.useFixture(fixtures.PrivsepFixture())
@mock.patch('os.path.exists', return_value=True)
def test_readfile(self, mock_exists):
mock_open = mock.mock_open(read_data='hello world')
with mock.patch('builtins.open', new=mock_open):
self.assertEqual('hello world',
nova.privsep.path.readfile('/fake/path'))
@mock.patch('os.path.exists', return_value=False)
def test_readfile_file_not_found(self, mock_exists):
self.assertRaises(exception.FileNotFound,
nova.privsep.path.readfile,
'/fake/path')
@mock.patch('os.path.exists', return_value=True)
def test_write(self, mock_exists):
mock_open = mock.mock_open()
@ -62,19 +49,6 @@ class FileTestCase(test.NoDBTestCase):
nova.privsep.path.writefile,
'/fake/path', 'w', 'foo')
@mock.patch('os.path.exists', return_value=True)
@mock.patch('os.readlink')
def test_readlink(self, mock_readlink, mock_exists):
nova.privsep.path.readlink('/fake/path')
mock_exists.assert_called_with('/fake/path')
mock_readlink.assert_called_with('/fake/path')
@mock.patch('os.path.exists', return_value=False)
def test_readlink_file_not_found(self, mock_exists):
self.assertRaises(exception.FileNotFound,
nova.privsep.path.readlink,
'/fake/path')
@mock.patch('os.path.exists', return_value=True)
@mock.patch('os.chown')
def test_chown(self, mock_chown, mock_exists):
@ -162,11 +136,6 @@ class FileTestCase(test.NoDBTestCase):
nova.privsep.path.rmdir,
'/fake/path')
@mock.patch('os.path.exists', return_value=True)
def test_exists(self, mock_exists):
nova.privsep.path.path.exists('/fake/path')
mock_exists.assert_called_with('/fake/path')
class LastBytesTestCase(test.NoDBTestCase):
"""Test the last_bytes() utility method."""

View File

@ -1,226 +0,0 @@
# Copyright (C) 2012 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import grp
import pwd
import tempfile
from collections import namedtuple
import mock
from nova import exception
from nova import test
import nova.utils
from nova.virt.disk.mount import nbd
from nova.virt.disk.vfs import localfs as vfsimpl
from nova.virt.image import model as imgmodel
class VirtDiskVFSLocalFSTestPaths(test.NoDBTestCase):
def setUp(self):
super(VirtDiskVFSLocalFSTestPaths, self).setUp()
self.rawfile = imgmodel.LocalFileImage('/dummy.img',
imgmodel.FORMAT_RAW)
# NOTE(mikal): mocking a decorator is non-trivial, so this is the
# best we can do.
@mock.patch.object(nova.privsep.path, 'readlink')
def test_check_safe_path(self, read_link):
vfs = vfsimpl.VFSLocalFS(self.rawfile)
vfs.imgdir = '/foo'
read_link.return_value = '/foo/etc/something.conf'
ret = vfs._canonical_path('etc/something.conf')
self.assertEqual(ret, '/foo/etc/something.conf')
@mock.patch.object(nova.privsep.path, 'readlink')
def test_check_unsafe_path(self, read_link):
vfs = vfsimpl.VFSLocalFS(self.rawfile)
vfs.imgdir = '/foo'
read_link.return_value = '/etc/something.conf'
self.assertRaises(exception.Invalid,
vfs._canonical_path,
'etc/../../../something.conf')
class VirtDiskVFSLocalFSTest(test.NoDBTestCase):
def setUp(self):
super(VirtDiskVFSLocalFSTest, self).setUp()
self.qcowfile = imgmodel.LocalFileImage('/dummy.qcow2',
imgmodel.FORMAT_QCOW2)
self.rawfile = imgmodel.LocalFileImage('/dummy.img',
imgmodel.FORMAT_RAW)
@mock.patch.object(nova.privsep.path, 'readlink')
@mock.patch.object(nova.privsep.path, 'makedirs')
def test_makepath(self, mkdir, read_link):
vfs = vfsimpl.VFSLocalFS(self.qcowfile)
vfs.imgdir = '/scratch/dir'
vfs.make_path('/some/dir')
read_link.assert_called()
mkdir.assert_called_with(read_link.return_value)
read_link.reset_mock()
mkdir.reset_mock()
vfs.make_path('/other/dir')
read_link.assert_called()
mkdir.assert_called_with(read_link.return_value)
@mock.patch.object(nova.privsep.path, 'readlink')
@mock.patch.object(nova.privsep.path, 'writefile')
def test_append_file(self, write_file, read_link):
vfs = vfsimpl.VFSLocalFS(self.qcowfile)
vfs.imgdir = '/scratch/dir'
vfs.append_file('/some/file', ' Goodbye')
read_link.assert_called()
write_file.assert_called_with(read_link.return_value, 'a', ' Goodbye')
@mock.patch.object(nova.privsep.path, 'readlink')
@mock.patch.object(nova.privsep.path, 'writefile')
def test_replace_file(self, write_file, read_link):
vfs = vfsimpl.VFSLocalFS(self.qcowfile)
vfs.imgdir = '/scratch/dir'
vfs.replace_file('/some/file', 'Goodbye')
read_link.assert_called()
write_file.assert_called_with(read_link.return_value, 'w', 'Goodbye')
@mock.patch.object(nova.privsep.path, 'readlink')
@mock.patch.object(nova.privsep.path, 'readfile')
def test_read_file(self, read_file, read_link):
vfs = vfsimpl.VFSLocalFS(self.qcowfile)
vfs.imgdir = '/scratch/dir'
self.assertEqual(read_file.return_value, vfs.read_file('/some/file'))
read_link.assert_called()
read_file.assert_called()
@mock.patch.object(nova.privsep.path.path, 'exists')
def test_has_file(self, exists):
vfs = vfsimpl.VFSLocalFS(self.qcowfile)
vfs.imgdir = '/scratch/dir'
has = vfs.has_file('/some/file')
self.assertEqual(exists.return_value, has)
@mock.patch.object(nova.privsep.path, 'readlink')
@mock.patch.object(nova.privsep.path, 'chmod')
def test_set_permissions(self, chmod, read_link):
vfs = vfsimpl.VFSLocalFS(self.qcowfile)
vfs.imgdir = '/scratch/dir'
vfs.set_permissions('/some/file', 0o777)
read_link.assert_called()
chmod.assert_called_with(read_link.return_value, 0o777)
@mock.patch.object(nova.privsep.path, 'readlink')
@mock.patch.object(nova.privsep.path, 'chown')
@mock.patch.object(pwd, 'getpwnam')
@mock.patch.object(grp, 'getgrnam')
def test_set_ownership(self, getgrnam, getpwnam, chown, read_link):
vfs = vfsimpl.VFSLocalFS(self.qcowfile)
vfs.imgdir = '/scratch/dir'
fake_passwd = namedtuple('fake_passwd', 'pw_uid')
getpwnam.return_value(fake_passwd(pw_uid=100))
fake_group = namedtuple('fake_group', 'gr_gid')
getgrnam.return_value(fake_group(gr_gid=101))
vfs.set_ownership('/some/file', 'fred', None)
read_link.assert_called()
chown.assert_called_with(read_link.return_value,
uid=getpwnam.return_value.pw_uid)
read_link.reset_mock()
chown.reset_mock()
vfs.set_ownership('/some/file', None, 'users')
read_link.assert_called()
chown.assert_called_with(read_link.return_value,
gid=getgrnam.return_value.gr_gid)
read_link.reset_mock()
chown.reset_mock()
vfs.set_ownership('/some/file', 'joe', 'admins')
read_link.assert_called()
chown.assert_called_with(read_link.return_value,
uid=getpwnam.return_value.pw_uid,
gid=getgrnam.return_value.gr_gid)
@mock.patch('nova.privsep.fs.get_filesystem_type',
return_value=('ext3\n', ''))
def test_get_format_fs(self, mock_type):
vfs = vfsimpl.VFSLocalFS(self.rawfile)
vfs.setup = mock.MagicMock()
vfs.teardown = mock.MagicMock()
def fake_setup():
vfs.mount = mock.MagicMock()
vfs.mount.device = None
vfs.mount.get_dev.side_effect = fake_get_dev
def fake_teardown():
vfs.mount.device = None
def fake_get_dev():
vfs.mount.device = '/dev/xyz'
return True
vfs.setup.side_effect = fake_setup
vfs.teardown.side_effect = fake_teardown
vfs.setup()
self.assertEqual('ext3', vfs.get_image_fs())
vfs.teardown()
vfs.mount.get_dev.assert_called_once_with()
mock_type.assert_called_once_with('/dev/xyz')
@mock.patch.object(tempfile, 'mkdtemp')
@mock.patch.object(nbd, 'NbdMount')
def test_setup_mount(self, NbdMount, mkdtemp):
vfs = vfsimpl.VFSLocalFS(self.qcowfile)
mounter = mock.MagicMock()
mkdtemp.return_value = 'tmp/'
NbdMount.return_value = mounter
vfs.setup()
self.assertTrue(mkdtemp.called)
NbdMount.assert_called_once_with(self.qcowfile, 'tmp/', None)
mounter.do_mount.assert_called_once_with()
@mock.patch.object(tempfile, 'mkdtemp')
@mock.patch.object(nbd, 'NbdMount')
def test_setup_mount_false(self, NbdMount, mkdtemp):
vfs = vfsimpl.VFSLocalFS(self.qcowfile)
mounter = mock.MagicMock()
mkdtemp.return_value = 'tmp/'
NbdMount.return_value = mounter
vfs.setup(mount=False)
self.assertTrue(mkdtemp.called)
NbdMount.assert_called_once_with(self.qcowfile, 'tmp/', None)
self.assertFalse(mounter.do_mount.called)

View File

@ -1,147 +0,0 @@
# Copyright 2012 Red Hat, Inc.
# Copyright 2017 Rackspace Australia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import grp
import os
import pwd
import tempfile
from oslo_log import log as logging
from oslo_utils import excutils
from nova import exception
from nova.i18n import _
import nova.privsep.fs
import nova.privsep.path
from nova.virt.disk.mount import api as mount_api
from nova.virt.disk.vfs import api as vfs
LOG = logging.getLogger(__name__)
class VFSLocalFS(vfs.VFS):
"""os.path.join() with safety check for injected file paths.
Join the supplied path components and make sure that the
resulting path we are injecting into is within the
mounted guest fs. Trying to be clever and specifying a
path with '..' in it will hit this safeguard.
"""
def _canonical_path(self, path):
canonpath = nova.privsep.path.readlink(path)
if not canonpath.startswith(os.path.realpath(self.imgdir) + '/'):
raise exception.Invalid(_('File path %s not valid') % path)
return canonpath
"""
This class implements a VFS module that is mapped to a virtual
root directory present on the host filesystem. This implementation
uses the nova.virt.disk.mount.Mount API to make virtual disk
images visible in the host filesystem. If the disk format is
raw, it will use the loopback mount impl, otherwise it will
use the qemu-nbd impl.
"""
def __init__(self, image, partition=None, imgdir=None):
"""Create a new local VFS instance
:param image: instance of nova.virt.image.model.Image
:param partition: the partition number of access
:param imgdir: the directory to mount the image at
"""
super(VFSLocalFS, self).__init__(image, partition)
self.imgdir = imgdir
self.mount = None
def setup(self, mount=True):
self.imgdir = tempfile.mkdtemp(prefix="openstack-vfs-localfs")
try:
mnt = mount_api.Mount.instance_for_format(self.image,
self.imgdir,
self.partition)
if mount:
if not mnt.do_mount():
raise exception.NovaException(mnt.error)
self.mount = mnt
except Exception as e:
with excutils.save_and_reraise_exception():
LOG.debug("Failed to mount image: %(ex)s", {'ex': e})
self.teardown()
def teardown(self):
try:
if self.mount:
self.mount.do_teardown()
except Exception as e:
LOG.debug("Failed to unmount %(imgdir)s: %(ex)s",
{'imgdir': self.imgdir, 'ex': e})
try:
if self.imgdir:
os.rmdir(self.imgdir)
except Exception as e:
LOG.debug("Failed to remove %(imgdir)s: %(ex)s",
{'imgdir': self.imgdir, 'ex': e})
self.imgdir = None
self.mount = None
def make_path(self, path):
LOG.debug("Make directory path=%s", path)
nova.privsep.path.makedirs(self._canonical_path(path))
def append_file(self, path, content):
LOG.debug("Append file path=%s", path)
return nova.privsep.path.writefile(
self._canonical_path(path), 'a', content)
def replace_file(self, path, content):
LOG.debug("Replace file path=%s", path)
return nova.privsep.path.writefile(
self._canonical_path(path), 'w', content)
def read_file(self, path):
LOG.debug("Read file path=%s", path)
return nova.privsep.path.readfile(self._canonical_path(path))
def has_file(self, path):
# NOTE(mikal): it is deliberate that we don't generate a canonical
# path here, as that tests for existance and would raise an exception.
LOG.debug("Has file path=%s", path)
return nova.privsep.path.path.exists(path)
def set_permissions(self, path, mode):
LOG.debug("Set permissions path=%(path)s mode=%(mode)o",
{'path': path, 'mode': mode})
nova.privsep.path.chmod(self._canonical_path(path), mode)
def set_ownership(self, path, user, group):
LOG.debug("Set permissions path=%(path)s "
"user=%(user)s group=%(group)s",
{'path': path, 'user': user, 'group': group})
canonpath = self._canonical_path(path)
chown_kwargs = {}
if user:
chown_kwargs['uid'] = pwd.getpwnam(user).pw_uid
if group:
chown_kwargs['gid'] = grp.getgrnam(group).gr_gid
nova.privsep.path.chown(canonpath, **chown_kwargs)
def get_image_fs(self):
if self.mount.device or self.mount.get_dev():
out, err = nova.privsep.fs.get_filesystem_type(self.mount.device)
return out.strip()
return ""