Adds nova-idmapshift cli utility

nova-idmapshift will be used by the libvirt-lxc driver to
correctly shift ownership of instance rootfs for use with
user namespaces.

Partially implements: bp libvirt-lxc-user-namespaces

Change-Id: I9aee622cca4578149201f02d02231a7bd0cbe912
This commit is contained in:
Andrew Melton
2014-08-12 10:41:44 -04:00
parent f74f777658
commit e310985b77
4 changed files with 860 additions and 0 deletions

224
nova/cmd/idmapshift.py Normal file
View File

@@ -0,0 +1,224 @@
# Copyright 2014 Rackspace, Andrew Melton
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
##########
IDMapShift
##########
IDMapShift is a tool that properly sets the ownership of a filesystem for use
with linux user namespaces.
=====
Usage
=====
nova-idmapshift -i -u 0:10000:2000 -g 0:10000:2000 path
This command will idempotently shift `path` to proper ownership using
the provided uid and gid mappings.
=========
Arguments
=========
nova-idmapshift -i -c -d -v
-u [[guest-uid:host-uid:count],...]
-g [[guest-gid:host-gid:count],...]
-n [nobody-id]
path
path: Root path of the filesystem to be shifted
-i, --idempotent: Shift operation will only be performed if filesystem
appears unshifted
-c, --confirm: Will perform check on filesystem
Returns 0 when filesystem appears shifted
Returns 1 when filesystem appears unshifted
-d, --dry-run: Print chown operations, but won't perform them
-v, --verbose: Print chown operations while performing them
-u, --uid: User ID mappings, maximum of 3 ranges
-g, --gid: Group ID mappings, maximum of 3 ranges
-n, --nobody: ID to map all unmapped uid and gids to.
=======
Purpose
=======
When using user namespaces with linux containers, the filesystem of the
container must be owned by the targeted user and group ids being applied
to that container. Otherwise, processes inside the container won't be able
to access the filesystem.
For example, when using the id map string '0:10000:2000', this means that
user ids inside the container between 0 and 1999 will map to user ids on
the host between 10000 and 11999. Root (0) becomes 10000, user 1 becomes
10001, user 50 becomes 10050 and user 1999 becomes 11999. This means that
files that are owned by root need to actually be owned by user 10000, and
files owned by 50 need to be owned by 10050, and so on.
IDMapShift will take the uid and gid strings used for user namespaces and
properly set up the filesystem for use by those users. Uids and gids outside
of provided ranges will be mapped to nobody (max uid/gid) so that they are
inaccessible inside the container.
"""
import argparse
import os
import sys
NOBODY_ID = 65534
def find_target_id(fsid, mappings, nobody, memo):
if fsid not in memo:
for start, target, count in mappings:
if start <= fsid < start + count:
memo[fsid] = (fsid - start) + target
break
else:
memo[fsid] = nobody
return memo[fsid]
def print_chown(path, uid, gid, target_uid, target_gid):
print('%s %s:%s -> %s:%s' % (path, uid, gid, target_uid, target_gid))
def shift_path(path, uid_mappings, gid_mappings, nobody, uid_memo, gid_memo,
dry_run=False, verbose=False):
stat = os.lstat(path)
uid = stat.st_uid
gid = stat.st_gid
target_uid = find_target_id(uid, uid_mappings, nobody, uid_memo)
target_gid = find_target_id(gid, gid_mappings, nobody, gid_memo)
if verbose:
print_chown(path, uid, gid, target_uid, target_gid)
if not dry_run:
os.lchown(path, target_uid, target_gid)
def shift_dir(fsdir, uid_mappings, gid_mappings, nobody,
dry_run=False, verbose=False):
uid_memo = dict()
gid_memo = dict()
def shift_path_short(p):
shift_path(p, uid_mappings, gid_mappings, nobody,
dry_run=dry_run, verbose=verbose,
uid_memo=uid_memo, gid_memo=gid_memo)
shift_path_short(fsdir)
for root, dirs, files in os.walk(fsdir):
for d in dirs:
path = os.path.join(root, d)
shift_path_short(path)
for f in files:
path = os.path.join(root, f)
shift_path_short(path)
def confirm_path(path, uid_ranges, gid_ranges, nobody):
stat = os.lstat(path)
uid = stat.st_uid
gid = stat.st_gid
uid_in_range = True if uid == nobody else False
gid_in_range = True if gid == nobody else False
if not uid_in_range or not gid_in_range:
for (start, end) in uid_ranges:
if start <= uid <= end:
uid_in_range = True
break
for (start, end) in gid_ranges:
if start <= gid <= end:
gid_in_range = True
break
return uid_in_range and gid_in_range
def get_ranges(maps):
return [(target, target + count - 1) for (start, target, count) in maps]
def confirm_dir(fsdir, uid_mappings, gid_mappings, nobody):
uid_ranges = get_ranges(uid_mappings)
gid_ranges = get_ranges(gid_mappings)
if not confirm_path(fsdir, uid_ranges, gid_ranges, nobody):
return False
for root, dirs, files in os.walk(fsdir):
for d in dirs:
path = os.path.join(root, d)
if not confirm_path(path, uid_ranges, gid_ranges, nobody):
return False
for f in files:
path = os.path.join(root, f)
if not confirm_path(path, uid_ranges, gid_ranges, nobody):
return False
return True
def id_map_type(val):
maps = val.split(',')
id_maps = []
for m in maps:
map_vals = m.split(':')
if len(map_vals) != 3:
msg = ('Invalid id map %s, correct syntax is '
'guest-id:host-id:count.')
raise argparse.ArgumentTypeError(msg % val)
try:
vals = [int(i) for i in map_vals]
except ValueError:
msg = 'Invalid id map %s, values must be integers' % val
raise argparse.ArgumentTypeError(msg)
id_maps.append(tuple(vals))
return id_maps
def main():
parser = argparse.ArgumentParser('User Namespace FS Owner Shift')
parser.add_argument('path')
parser.add_argument('-u', '--uid', type=id_map_type, default=[])
parser.add_argument('-g', '--gid', type=id_map_type, default=[])
parser.add_argument('-n', '--nobody', default=NOBODY_ID, type=int)
parser.add_argument('-i', '--idempotent', action='store_true')
parser.add_argument('-c', '--confirm', action='store_true')
parser.add_argument('-d', '--dry-run', action='store_true')
parser.add_argument('-v', '--verbose', action='store_true')
args = parser.parse_args()
if args.idempotent or args.confirm:
if confirm_dir(args.path, args.uid, args.gid, args.nobody):
sys.exit(0)
else:
if args.confirm:
sys.exit(1)
shift_dir(args.path, args.uid, args.gid, args.nobody,
dry_run=args.dry_run, verbose=args.verbose)

View File

View File

@@ -0,0 +1,635 @@
# Copyright 2014 Rackspace, Andrew Melton
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import unittest
import mock
from nova.cmd import idmapshift
def join_side_effect(root, *args):
path = root
if root != '/':
path += '/'
path += '/'.join(args)
return path
class FakeStat(object):
def __init__(self, uid, gid):
self.st_uid = uid
self.st_gid = gid
class BaseTestCase(unittest.TestCase):
def __init__(self, *args, **kwargs):
super(BaseTestCase, self).__init__(*args, **kwargs)
self.uid_maps = [(0, 10000, 10), (10, 20000, 1000)]
self.gid_maps = [(0, 10000, 10), (10, 20000, 1000)]
class FindTargetIDTestCase(BaseTestCase):
def test_find_target_id_range_1_first(self):
actual_target = idmapshift.find_target_id(0, self.uid_maps,
idmapshift.NOBODY_ID, dict())
self.assertEqual(10000, actual_target)
def test_find_target_id_inside_range_1(self):
actual_target = idmapshift.find_target_id(2, self.uid_maps,
idmapshift.NOBODY_ID, dict())
self.assertEqual(10002, actual_target)
def test_find_target_id_range_2_first(self):
actual_target = idmapshift.find_target_id(10, self.uid_maps,
idmapshift.NOBODY_ID, dict())
self.assertEqual(20000, actual_target)
def test_find_target_id_inside_range_2(self):
actual_target = idmapshift.find_target_id(100, self.uid_maps,
idmapshift.NOBODY_ID, dict())
self.assertEqual(20090, actual_target)
def test_find_target_id_outside_range(self):
actual_target = idmapshift.find_target_id(10000, self.uid_maps,
idmapshift.NOBODY_ID, dict())
self.assertEqual(idmapshift.NOBODY_ID, actual_target)
def test_find_target_id_no_mappings(self):
actual_target = idmapshift.find_target_id(0, [],
idmapshift.NOBODY_ID, dict())
self.assertEqual(idmapshift.NOBODY_ID, actual_target)
def test_find_target_id_updates_memo(self):
memo = dict()
idmapshift.find_target_id(0, self.uid_maps, idmapshift.NOBODY_ID, memo)
self.assertTrue(0 in memo)
self.assertEqual(10000, memo[0])
def test_find_target_guest_id_greater_than_count(self):
uid_maps = [(500, 10000, 10)]
# Below range
actual_target = idmapshift.find_target_id(499, uid_maps,
idmapshift.NOBODY_ID, dict())
self.assertEqual(idmapshift.NOBODY_ID, actual_target)
# Match
actual_target = idmapshift.find_target_id(501, uid_maps,
idmapshift.NOBODY_ID, dict())
self.assertEqual(10001, actual_target)
# Beyond range
actual_target = idmapshift.find_target_id(510, uid_maps,
idmapshift.NOBODY_ID, dict())
self.assertEqual(idmapshift.NOBODY_ID, actual_target)
class ShiftPathTestCase(BaseTestCase):
@mock.patch('os.lchown')
@mock.patch('os.lstat')
def test_shift_path(self, mock_lstat, mock_lchown):
mock_lstat.return_value = FakeStat(0, 0)
idmapshift.shift_path('/test/path', self.uid_maps, self.gid_maps,
idmapshift.NOBODY_ID, dict(), dict())
mock_lstat.assert_has_calls([mock.call('/test/path')])
mock_lchown.assert_has_calls([mock.call('/test/path', 10000, 10000)])
@mock.patch('os.lchown')
@mock.patch('os.lstat')
def test_shift_path_dry_run(self, mock_lstat, mock_lchown):
mock_lstat.return_value = FakeStat(0, 0)
idmapshift.shift_path('/test/path', self.uid_maps, self.gid_maps,
idmapshift.NOBODY_ID, dict(), dict(),
dry_run=True)
mock_lstat.assert_has_calls([mock.call('/test/path')])
self.assertEqual(0, len(mock_lchown.mock_calls))
@mock.patch('os.lchown')
@mock.patch('nova.cmd.idmapshift.print_chown')
@mock.patch('os.lstat')
def test_shift_path_verbose(self, mock_lstat, mock_print, mock_lchown):
mock_lstat.return_value = FakeStat(0, 0)
idmapshift.shift_path('/test/path', self.uid_maps, self.gid_maps,
idmapshift.NOBODY_ID, dict(), dict(),
verbose=True)
mock_lstat.assert_has_calls([mock.call('/test/path')])
mock_print_call = mock.call('/test/path', 0, 0, 10000, 10000)
mock_print.assert_has_calls([mock_print_call])
mock_lchown.assert_has_calls([mock.call('/test/path', 10000, 10000)])
class ShiftDirTestCase(BaseTestCase):
@mock.patch('nova.cmd.idmapshift.shift_path')
@mock.patch('os.path.join')
@mock.patch('os.walk')
def test_shift_dir(self, mock_walk, mock_join, mock_shift_path):
mock_walk.return_value = [('/', ['a', 'b'], ['c', 'd'])]
mock_join.side_effect = join_side_effect
idmapshift.shift_dir('/', self.uid_maps, self.gid_maps,
idmapshift.NOBODY_ID)
files = ['a', 'b', 'c', 'd']
mock_walk.assert_has_calls([mock.call('/')])
mock_join_calls = [mock.call('/', x) for x in files]
mock_join.assert_has_calls(mock_join_calls)
args = (self.uid_maps, self.gid_maps, idmapshift.NOBODY_ID)
kwargs = dict(dry_run=False, verbose=False,
uid_memo=dict(), gid_memo=dict())
shift_path_calls = [mock.call('/', *args, **kwargs)]
shift_path_calls += [mock.call('/' + x, *args, **kwargs)
for x in files]
mock_shift_path.assert_has_calls(shift_path_calls)
@mock.patch('nova.cmd.idmapshift.shift_path')
@mock.patch('os.path.join')
@mock.patch('os.walk')
def test_shift_dir_dry_run(self, mock_walk, mock_join, mock_shift_path):
mock_walk.return_value = [('/', ['a', 'b'], ['c', 'd'])]
mock_join.side_effect = join_side_effect
idmapshift.shift_dir('/', self.uid_maps, self.gid_maps,
idmapshift.NOBODY_ID, dry_run=True)
mock_walk.assert_has_calls([mock.call('/')])
files = ['a', 'b', 'c', 'd']
mock_join_calls = [mock.call('/', x) for x in files]
mock_join.assert_has_calls(mock_join_calls)
args = (self.uid_maps, self.gid_maps, idmapshift.NOBODY_ID)
kwargs = dict(dry_run=True, verbose=False,
uid_memo=dict(), gid_memo=dict())
shift_path_calls = [mock.call('/', *args, **kwargs)]
shift_path_calls += [mock.call('/' + x, *args, **kwargs)
for x in files]
mock_shift_path.assert_has_calls(shift_path_calls)
class ConfirmPathTestCase(unittest.TestCase):
@mock.patch('os.lstat')
def test_confirm_path(self, mock_lstat):
uid_ranges = [(1000, 1999)]
gid_ranges = [(300, 399)]
mock_lstat.return_value = FakeStat(1000, 301)
result = idmapshift.confirm_path('/test/path', uid_ranges, gid_ranges,
50000)
mock_lstat.assert_has_calls(mock.call('/test/path'))
self.assertTrue(result)
@mock.patch('os.lstat')
def test_confirm_path_nobody(self, mock_lstat):
uid_ranges = [(1000, 1999)]
gid_ranges = [(300, 399)]
mock_lstat.return_value = FakeStat(50000, 50000)
result = idmapshift.confirm_path('/test/path', uid_ranges, gid_ranges,
50000)
mock_lstat.assert_has_calls(mock.call('/test/path'))
self.assertTrue(result)
@mock.patch('os.lstat')
def test_confirm_path_uid_mismatch(self, mock_lstat):
uid_ranges = [(1000, 1999)]
gid_ranges = [(300, 399)]
mock_lstat.return_value = FakeStat(0, 301)
result = idmapshift.confirm_path('/test/path', uid_ranges, gid_ranges,
50000)
mock_lstat.assert_has_calls(mock.call('/test/path'))
self.assertFalse(result)
@mock.patch('os.lstat')
def test_confirm_path_gid_mismatch(self, mock_lstat):
uid_ranges = [(1000, 1999)]
gid_ranges = [(300, 399)]
mock_lstat.return_value = FakeStat(1000, 0)
result = idmapshift.confirm_path('/test/path', uid_ranges, gid_ranges,
50000)
mock_lstat.assert_has_calls(mock.call('/test/path'))
self.assertFalse(result)
@mock.patch('os.lstat')
def test_confirm_path_uid_nobody(self, mock_lstat):
uid_ranges = [(1000, 1999)]
gid_ranges = [(300, 399)]
mock_lstat.return_value = FakeStat(50000, 301)
result = idmapshift.confirm_path('/test/path', uid_ranges, gid_ranges,
50000)
mock_lstat.assert_has_calls(mock.call('/test/path'))
self.assertTrue(result)
@mock.patch('os.lstat')
def test_confirm_path_gid_nobody(self, mock_lstat):
uid_ranges = [(1000, 1999)]
gid_ranges = [(300, 399)]
mock_lstat.return_value = FakeStat(1000, 50000)
result = idmapshift.confirm_path('/test/path', uid_ranges, gid_ranges,
50000)
mock_lstat.assert_has_calls(mock.call('/test/path'))
self.assertTrue(result)
class ConfirmDirTestCase(BaseTestCase):
def setUp(self):
self.uid_map_ranges = idmapshift.get_ranges(self.uid_maps)
self.gid_map_ranges = idmapshift.get_ranges(self.gid_maps)
@mock.patch('nova.cmd.idmapshift.confirm_path')
@mock.patch('os.path.join')
@mock.patch('os.walk')
def test_confirm_dir(self, mock_walk, mock_join, mock_confirm_path):
mock_walk.return_value = [('/', ['a', 'b'], ['c', 'd'])]
mock_join.side_effect = join_side_effect
mock_confirm_path.return_value = True
idmapshift.confirm_dir('/', self.uid_maps, self.gid_maps,
idmapshift.NOBODY_ID)
files = ['a', 'b', 'c', 'd']
mock_walk.assert_has_calls([mock.call('/')])
mock_join_calls = [mock.call('/', x) for x in files]
mock_join.assert_has_calls(mock_join_calls)
args = (self.uid_map_ranges, self.gid_map_ranges, idmapshift.NOBODY_ID)
confirm_path_calls = [mock.call('/', *args)]
confirm_path_calls += [mock.call('/' + x, *args)
for x in files]
mock_confirm_path.assert_has_calls(confirm_path_calls)
@mock.patch('nova.cmd.idmapshift.confirm_path')
@mock.patch('os.path.join')
@mock.patch('os.walk')
def test_confirm_dir_short_circuit_root(self, mock_walk, mock_join,
mock_confirm_path):
mock_walk.return_value = [('/', ['a', 'b'], ['c', 'd'])]
mock_join.side_effect = join_side_effect
mock_confirm_path.return_value = False
idmapshift.confirm_dir('/', self.uid_maps, self.gid_maps,
idmapshift.NOBODY_ID)
args = (self.uid_map_ranges, self.gid_map_ranges, idmapshift.NOBODY_ID)
confirm_path_calls = [mock.call('/', *args)]
mock_confirm_path.assert_has_calls(confirm_path_calls)
@mock.patch('nova.cmd.idmapshift.confirm_path')
@mock.patch('os.path.join')
@mock.patch('os.walk')
def test_confirm_dir_short_circuit_file(self, mock_walk, mock_join,
mock_confirm_path):
mock_walk.return_value = [('/', ['a', 'b'], ['c', 'd'])]
mock_join.side_effect = join_side_effect
def confirm_path_side_effect(path, *args):
if 'a' in path:
return False
return True
mock_confirm_path.side_effect = confirm_path_side_effect
idmapshift.confirm_dir('/', self.uid_maps, self.gid_maps,
idmapshift.NOBODY_ID)
mock_walk.assert_has_calls([mock.call('/')])
mock_join.assert_has_calls([mock.call('/', 'a')])
args = (self.uid_map_ranges, self.gid_map_ranges, idmapshift.NOBODY_ID)
confirm_path_calls = [mock.call('/', *args),
mock.call('/' + 'a', *args)]
mock_confirm_path.assert_has_calls(confirm_path_calls)
@mock.patch('nova.cmd.idmapshift.confirm_path')
@mock.patch('os.path.join')
@mock.patch('os.walk')
def test_confirm_dir_short_circuit_dir(self, mock_walk, mock_join,
mock_confirm_path):
mock_walk.return_value = [('/', ['a', 'b'], ['c', 'd'])]
mock_join.side_effect = join_side_effect
def confirm_path_side_effect(path, *args):
if 'c' in path:
return False
return True
mock_confirm_path.side_effect = confirm_path_side_effect
idmapshift.confirm_dir('/', self.uid_maps, self.gid_maps,
idmapshift.NOBODY_ID)
files = ['a', 'b', 'c']
mock_walk.assert_has_calls([mock.call('/')])
mock_join_calls = [mock.call('/', x) for x in files]
mock_join.assert_has_calls(mock_join_calls)
args = (self.uid_map_ranges, self.gid_map_ranges, idmapshift.NOBODY_ID)
confirm_path_calls = [mock.call('/', *args)]
confirm_path_calls += [mock.call('/' + x, *args)
for x in files]
mock_confirm_path.assert_has_calls(confirm_path_calls)
class IDMapTypeTestCase(unittest.TestCase):
def test_id_map_type(self):
result = idmapshift.id_map_type("1:1:1,2:2:2")
self.assertEqual([(1, 1, 1), (2, 2, 2)], result)
def test_id_map_type_not_int(self):
self.assertRaises(argparse.ArgumentTypeError, idmapshift.id_map_type,
"a:1:1")
def test_id_map_type_not_proper_format(self):
self.assertRaises(argparse.ArgumentTypeError, idmapshift.id_map_type,
"1:1")
class MainTestCase(BaseTestCase):
@mock.patch('nova.cmd.idmapshift.shift_dir')
@mock.patch('argparse.ArgumentParser')
def test_main(self, mock_parser_class, mock_shift_dir):
mock_parser = mock.MagicMock()
mock_parser.parse_args.return_value = mock_parser
mock_parser.idempotent = False
mock_parser.confirm = False
mock_parser.path = '/test/path'
mock_parser.uid = self.uid_maps
mock_parser.gid = self.gid_maps
mock_parser.nobody = idmapshift.NOBODY_ID
mock_parser.dry_run = False
mock_parser.verbose = False
mock_parser_class.return_value = mock_parser
idmapshift.main()
mock_shift_dir_call = mock.call('/test/path', self.uid_maps,
self.gid_maps, idmapshift.NOBODY_ID,
dry_run=False, verbose=False)
mock_shift_dir.assert_has_calls([mock_shift_dir_call])
@mock.patch('nova.cmd.idmapshift.shift_dir')
@mock.patch('nova.cmd.idmapshift.confirm_dir')
@mock.patch('argparse.ArgumentParser')
def test_main_confirm_dir_idempotent_unshifted(self, mock_parser_class,
mock_confirm_dir,
mock_shift_dir):
mock_parser = mock.MagicMock()
mock_parser.parse_args.return_value = mock_parser
mock_parser.idempotent = True
mock_parser.confirm = False
mock_parser.path = '/test/path'
mock_parser.uid = self.uid_maps
mock_parser.gid = self.gid_maps
mock_parser.nobody = idmapshift.NOBODY_ID
mock_parser.dry_run = False
mock_parser.verbose = False
mock_parser_class.return_value = mock_parser
mock_confirm_dir.return_value = False
idmapshift.main()
mock_confirm_dir_call = mock.call('/test/path', self.uid_maps,
self.gid_maps, idmapshift.NOBODY_ID)
mock_confirm_dir.assert_has_calls([mock_confirm_dir_call])
mock_shift_dir_call = mock.call('/test/path', self.uid_maps,
self.gid_maps, idmapshift.NOBODY_ID,
dry_run=False, verbose=False)
mock_shift_dir.assert_has_calls([mock_shift_dir_call])
@mock.patch('nova.cmd.idmapshift.shift_dir')
@mock.patch('nova.cmd.idmapshift.confirm_dir')
@mock.patch('argparse.ArgumentParser')
def test_main_confirm_dir_idempotent_shifted(self, mock_parser_class,
mock_confirm_dir,
mock_shift_dir):
mock_parser = mock.MagicMock()
mock_parser.parse_args.return_value = mock_parser
mock_parser.idempotent = True
mock_parser.confirm = False
mock_parser.path = '/test/path'
mock_parser.uid = self.uid_maps
mock_parser.gid = self.gid_maps
mock_parser.nobody = idmapshift.NOBODY_ID
mock_parser.dry_run = False
mock_parser.verbose = False
mock_parser_class.return_value = mock_parser
mock_confirm_dir.return_value = True
try:
idmapshift.main()
except SystemExit as sys_exit:
self.assertEqual(sys_exit.code, 0)
mock_confirm_dir_call = mock.call('/test/path', self.uid_maps,
self.gid_maps, idmapshift.NOBODY_ID)
mock_confirm_dir.assert_has_calls([mock_confirm_dir_call])
mock_shift_dir.assert_has_calls([])
@mock.patch('nova.cmd.idmapshift.shift_dir')
@mock.patch('nova.cmd.idmapshift.confirm_dir')
@mock.patch('argparse.ArgumentParser')
def test_main_confirm_dir_confirm_unshifted(self, mock_parser_class,
mock_confirm_dir,
mock_shift_dir):
mock_parser = mock.MagicMock()
mock_parser.parse_args.return_value = mock_parser
mock_parser.idempotent = False
mock_parser.confirm = True
mock_parser.exit_on_fail = True
mock_parser.path = '/test/path'
mock_parser.uid = self.uid_maps
mock_parser.gid = self.gid_maps
mock_parser.nobody = idmapshift.NOBODY_ID
mock_parser.dry_run = False
mock_parser.verbose = False
mock_parser_class.return_value = mock_parser
mock_confirm_dir.return_value = False
try:
idmapshift.main()
except SystemExit as sys_exit:
self.assertEqual(sys_exit.code, 1)
mock_confirm_dir_call = mock.call('/test/path', self.uid_maps,
self.gid_maps, idmapshift.NOBODY_ID)
mock_confirm_dir.assert_has_calls([mock_confirm_dir_call])
mock_shift_dir.assert_has_calls([])
@mock.patch('nova.cmd.idmapshift.shift_dir')
@mock.patch('nova.cmd.idmapshift.confirm_dir')
@mock.patch('argparse.ArgumentParser')
def test_main_confirm_dir_confirm_shifted(self, mock_parser_class,
mock_confirm_dir,
mock_shift_dir):
mock_parser = mock.MagicMock()
mock_parser.parse_args.return_value = mock_parser
mock_parser.idempotent = False
mock_parser.confirm = True
mock_parser.exit_on_fail = True
mock_parser.path = '/test/path'
mock_parser.uid = self.uid_maps
mock_parser.gid = self.gid_maps
mock_parser.nobody = idmapshift.NOBODY_ID
mock_parser.dry_run = False
mock_parser.verbose = False
mock_parser_class.return_value = mock_parser
mock_confirm_dir.return_value = True
try:
idmapshift.main()
except SystemExit as sys_exit:
self.assertEqual(sys_exit.code, 0)
mock_confirm_dir_call = mock.call('/test/path', self.uid_maps,
self.gid_maps, idmapshift.NOBODY_ID)
mock_confirm_dir.assert_has_calls([mock_confirm_dir_call])
mock_shift_dir.assert_has_calls([])
class IntegrationTestCase(BaseTestCase):
@mock.patch('os.lchown')
@mock.patch('os.lstat')
@mock.patch('os.path.join')
@mock.patch('os.walk')
def test_integrated_shift_dir(self, mock_walk, mock_join, mock_lstat,
mock_lchown):
mock_walk.return_value = [('/tmp/test', ['a', 'b', 'c'], ['d']),
('/tmp/test/d', ['1', '2'], [])]
mock_join.side_effect = join_side_effect
def lstat(path):
stats = {
't': FakeStat(0, 0),
'a': FakeStat(0, 0),
'b': FakeStat(0, 2),
'c': FakeStat(30000, 30000),
'd': FakeStat(100, 100),
'1': FakeStat(0, 100),
'2': FakeStat(100, 100),
}
return stats[path[-1]]
mock_lstat.side_effect = lstat
idmapshift.shift_dir('/tmp/test', self.uid_maps, self.gid_maps,
idmapshift.NOBODY_ID, verbose=True)
lchown_calls = [
mock.call('/tmp/test', 10000, 10000),
mock.call('/tmp/test/a', 10000, 10000),
mock.call('/tmp/test/b', 10000, 10002),
mock.call('/tmp/test/c', idmapshift.NOBODY_ID,
idmapshift.NOBODY_ID),
mock.call('/tmp/test/d', 20090, 20090),
mock.call('/tmp/test/d/1', 10000, 20090),
mock.call('/tmp/test/d/2', 20090, 20090),
]
mock_lchown.assert_has_calls(lchown_calls)
@mock.patch('os.lchown')
@mock.patch('os.lstat')
@mock.patch('os.path.join')
@mock.patch('os.walk')
def test_integrated_shift_dir_dry_run(self, mock_walk, mock_join,
mock_lstat, mock_lchown):
mock_walk.return_value = [('/tmp/test', ['a', 'b', 'c'], ['d']),
('/tmp/test/d', ['1', '2'], [])]
mock_join.side_effect = join_side_effect
def lstat(path):
stats = {
't': FakeStat(0, 0),
'a': FakeStat(0, 0),
'b': FakeStat(0, 2),
'c': FakeStat(30000, 30000),
'd': FakeStat(100, 100),
'1': FakeStat(0, 100),
'2': FakeStat(100, 100),
}
return stats[path[-1]]
mock_lstat.side_effect = lstat
idmapshift.shift_dir('/tmp/test', self.uid_maps, self.gid_maps,
idmapshift.NOBODY_ID, dry_run=True, verbose=True)
self.assertEqual(0, len(mock_lchown.mock_calls))
@mock.patch('os.lstat')
@mock.patch('os.path.join')
@mock.patch('os.walk')
def test_integrated_confirm_dir_shifted(self, mock_walk, mock_join,
mock_lstat):
mock_walk.return_value = [('/tmp/test', ['a', 'b', 'c'], ['d']),
('/tmp/test/d', ['1', '2'], [])]
mock_join.side_effect = join_side_effect
def lstat(path):
stats = {
't': FakeStat(10000, 10000),
'a': FakeStat(10000, 10000),
'b': FakeStat(10000, 10002),
'c': FakeStat(idmapshift.NOBODY_ID, idmapshift.NOBODY_ID),
'd': FakeStat(20090, 20090),
'1': FakeStat(10000, 20090),
'2': FakeStat(20090, 20090),
}
return stats[path[-1]]
mock_lstat.side_effect = lstat
result = idmapshift.confirm_dir('/tmp/test', self.uid_maps,
self.gid_maps, idmapshift.NOBODY_ID)
self.assertTrue(result)
@mock.patch('os.lstat')
@mock.patch('os.path.join')
@mock.patch('os.walk')
def test_integrated_confirm_dir_unshifted(self, mock_walk, mock_join,
mock_lstat):
mock_walk.return_value = [('/tmp/test', ['a', 'b', 'c'], ['d']),
('/tmp/test/d', ['1', '2'], [])]
mock_join.side_effect = join_side_effect
def lstat(path):
stats = {
't': FakeStat(0, 0),
'a': FakeStat(0, 0),
'b': FakeStat(0, 2),
'c': FakeStat(30000, 30000),
'd': FakeStat(100, 100),
'1': FakeStat(0, 100),
'2': FakeStat(100, 100),
}
return stats[path[-1]]
mock_lstat.side_effect = lstat
result = idmapshift.confirm_dir('/tmp/test', self.uid_maps,
self.gid_maps, idmapshift.NOBODY_ID)
self.assertFalse(result)

View File

@@ -46,6 +46,7 @@ console_scripts =
nova-console = nova.cmd.console:main
nova-consoleauth = nova.cmd.consoleauth:main
nova-dhcpbridge = nova.cmd.dhcpbridge:main
nova-idmapshift = nova.cmd.idmapshift:main
nova-manage = nova.cmd.manage:main
nova-network = nova.cmd.network:main
nova-novncproxy = nova.cmd.novncproxy:main