Merge "Use common rootwrap from oslo-incubator"
This commit is contained in:
		| @@ -16,20 +16,18 @@ | ||||
| #    License for the specific language governing permissions and limitations | ||||
| #    under the License. | ||||
|  | ||||
| """Root wrapper for Nova | ||||
| """Root wrapper for OpenStack services | ||||
|  | ||||
|    Filters which commands nova is allowed to run as another user. | ||||
|    Filters which commands a service is allowed to run as another user. | ||||
|  | ||||
|    To use this, you should set the following in nova.conf: | ||||
|    To use this with nova, you should set the following in nova.conf: | ||||
|    rootwrap_config=/etc/nova/rootwrap.conf | ||||
|  | ||||
|    You also need to let the nova user run nova-rootwrap as root in sudoers: | ||||
|    nova ALL = (root) NOPASSWD: /usr/bin/nova-rootwrap /etc/nova/rootwrap.conf * | ||||
|  | ||||
|    To make allowed commands node-specific, your packaging should only | ||||
|    install {compute,network,volume}.filters respectively on compute, network | ||||
|    and volume nodes (i.e. nova-api nodes should not have any of those files | ||||
|    installed). | ||||
|    Service packaging should deploy .filters files only on nodes where they are | ||||
|    needed, to avoid allowing more than is necessary. | ||||
| """ | ||||
|  | ||||
| import ConfigParser | ||||
| @@ -75,7 +73,7 @@ if __name__ == '__main__': | ||||
|     if os.path.exists(os.path.join(possible_topdir, "nova", "__init__.py")): | ||||
|         sys.path.insert(0, possible_topdir) | ||||
|  | ||||
|     from nova.rootwrap import wrapper | ||||
|     from nova.openstack.common.rootwrap import wrapper | ||||
|  | ||||
|     # Load configuration | ||||
|     try: | ||||
|   | ||||
| @@ -1,16 +0,0 @@ | ||||
| # vim: tabstop=4 shiftwidth=4 softtabstop=4 | ||||
|  | ||||
| # Copyright (c) 2011 OpenStack, LLC. | ||||
| # All Rights Reserved. | ||||
| # | ||||
| #    Licensed under the Apache License, Version 2.0 (the "License"); you may | ||||
| #    not use this file except in compliance with the License. You may obtain | ||||
| #    a copy of the License at | ||||
| # | ||||
| #         http://www.apache.org/licenses/LICENSE-2.0 | ||||
| # | ||||
| #    Unless required by applicable law or agreed to in writing, software | ||||
| #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||||
| #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||||
| #    License for the specific language governing permissions and limitations | ||||
| #    under the License. | ||||
| @@ -1,180 +0,0 @@ | ||||
| # vim: tabstop=4 shiftwidth=4 softtabstop=4 | ||||
|  | ||||
| # Copyright (c) 2011 OpenStack, LLC. | ||||
| # All Rights Reserved. | ||||
| # | ||||
| #    Licensed under the Apache License, Version 2.0 (the "License"); you may | ||||
| #    not use this file except in compliance with the License. You may obtain | ||||
| #    a copy of the License at | ||||
| # | ||||
| #         http://www.apache.org/licenses/LICENSE-2.0 | ||||
| # | ||||
| #    Unless required by applicable law or agreed to in writing, software | ||||
| #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||||
| #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||||
| #    License for the specific language governing permissions and limitations | ||||
| #    under the License. | ||||
|  | ||||
| import os | ||||
| import re | ||||
|  | ||||
|  | ||||
| class CommandFilter(object): | ||||
|     """Command filter only checking that the 1st argument matches exec_path.""" | ||||
|  | ||||
|     def __init__(self, exec_path, run_as, *args): | ||||
|         self.name = '' | ||||
|         self.exec_path = exec_path | ||||
|         self.run_as = run_as | ||||
|         self.args = args | ||||
|         self.real_exec = None | ||||
|  | ||||
|     def get_exec(self, exec_dirs=[]): | ||||
|         """Returns existing executable, or empty string if none found.""" | ||||
|         if self.real_exec is not None: | ||||
|             return self.real_exec | ||||
|         self.real_exec = "" | ||||
|         if self.exec_path.startswith('/'): | ||||
|             if os.access(self.exec_path, os.X_OK): | ||||
|                 self.real_exec = self.exec_path | ||||
|         else: | ||||
|             for binary_path in exec_dirs: | ||||
|                 expanded_path = os.path.join(binary_path, self.exec_path) | ||||
|                 if os.access(expanded_path, os.X_OK): | ||||
|                     self.real_exec = expanded_path | ||||
|                     break | ||||
|         return self.real_exec | ||||
|  | ||||
|     def match(self, userargs): | ||||
|         """Only check that the first argument (command) matches exec_path.""" | ||||
|         if (os.path.basename(self.exec_path) == userargs[0]): | ||||
|             return True | ||||
|         return False | ||||
|  | ||||
|     def get_command(self, userargs, exec_dirs=[]): | ||||
|         """Returns command to execute (with sudo -u if run_as != root).""" | ||||
|         to_exec = self.get_exec(exec_dirs=exec_dirs) or self.exec_path | ||||
|         if (self.run_as != 'root'): | ||||
|             # Used to run commands at lesser privileges | ||||
|             return ['sudo', '-u', self.run_as, to_exec] + userargs[1:] | ||||
|         return [to_exec] + userargs[1:] | ||||
|  | ||||
|     def get_environment(self, userargs): | ||||
|         """Returns specific environment to set, None if none.""" | ||||
|         return None | ||||
|  | ||||
|  | ||||
| class RegExpFilter(CommandFilter): | ||||
|     """Command filter doing regexp matching for every argument.""" | ||||
|  | ||||
|     def match(self, userargs): | ||||
|         # Early skip if command or number of args don't match | ||||
|         if (len(self.args) != len(userargs)): | ||||
|             # DENY: argument numbers don't match | ||||
|             return False | ||||
|         # Compare each arg (anchoring pattern explicitly at end of string) | ||||
|         for (pattern, arg) in zip(self.args, userargs): | ||||
|             try: | ||||
|                 if not re.match(pattern + '$', arg): | ||||
|                     break | ||||
|             except re.error: | ||||
|                 # DENY: Badly-formed filter | ||||
|                 return False | ||||
|         else: | ||||
|             # ALLOW: All arguments matched | ||||
|             return True | ||||
|  | ||||
|         # DENY: Some arguments did not match | ||||
|         return False | ||||
|  | ||||
|  | ||||
| class DnsmasqFilter(CommandFilter): | ||||
|     """Specific filter for the dnsmasq call (which includes env).""" | ||||
|  | ||||
|     CONFIG_FILE_ARG = 'CONFIG_FILE' | ||||
|  | ||||
|     def match(self, userargs): | ||||
|         if (userargs[0] == 'env' and | ||||
|             userargs[1].startswith(self.CONFIG_FILE_ARG) and | ||||
|             userargs[2].startswith('NETWORK_ID=') and | ||||
|             userargs[3] == 'dnsmasq'): | ||||
|             return True | ||||
|         return False | ||||
|  | ||||
|     def get_command(self, userargs, exec_dirs=[]): | ||||
|         to_exec = self.get_exec(exec_dirs=exec_dirs) or self.exec_path | ||||
|         dnsmasq_pos = userargs.index('dnsmasq') | ||||
|         return [to_exec] + userargs[dnsmasq_pos + 1:] | ||||
|  | ||||
|     def get_environment(self, userargs): | ||||
|         env = os.environ.copy() | ||||
|         env[self.CONFIG_FILE_ARG] = userargs[1].split('=')[-1] | ||||
|         env['NETWORK_ID'] = userargs[2].split('=')[-1] | ||||
|         return env | ||||
|  | ||||
|  | ||||
| class DeprecatedDnsmasqFilter(DnsmasqFilter): | ||||
|     """Variant of dnsmasq filter to support old-style FLAGFILE.""" | ||||
|     CONFIG_FILE_ARG = 'FLAGFILE' | ||||
|  | ||||
|  | ||||
| class KillFilter(CommandFilter): | ||||
|     """Specific filter for the kill calls. | ||||
|        1st argument is the user to run /bin/kill under | ||||
|        2nd argument is the location of the affected executable | ||||
|        Subsequent arguments list the accepted signals (if any) | ||||
|  | ||||
|        This filter relies on /proc to accurately determine affected | ||||
|        executable, so it will only work on procfs-capable systems (not OSX). | ||||
|     """ | ||||
|  | ||||
|     def __init__(self, *args): | ||||
|         super(KillFilter, self).__init__("/bin/kill", *args) | ||||
|  | ||||
|     def match(self, userargs): | ||||
|         if userargs[0] != "kill": | ||||
|             return False | ||||
|         args = list(userargs) | ||||
|         if len(args) == 3: | ||||
|             # A specific signal is requested | ||||
|             signal = args.pop(1) | ||||
|             if signal not in self.args[1:]: | ||||
|                 # Requested signal not in accepted list | ||||
|                 return False | ||||
|         else: | ||||
|             if len(args) != 2: | ||||
|                 # Incorrect number of arguments | ||||
|                 return False | ||||
|             if len(self.args) > 1: | ||||
|                 # No signal requested, but filter requires specific signal | ||||
|                 return False | ||||
|         try: | ||||
|             command = os.readlink("/proc/%d/exe" % int(args[1])) | ||||
|             # NOTE(dprince): /proc/PID/exe may have ' (deleted)' on | ||||
|             # the end if an executable is updated or deleted | ||||
|             if command.endswith(" (deleted)"): | ||||
|                 command = command[:command.rindex(" ")] | ||||
|             if command != self.args[0]: | ||||
|                 # Affected executable does not match | ||||
|                 return False | ||||
|         except (ValueError, OSError): | ||||
|             # Incorrect PID | ||||
|             return False | ||||
|         return True | ||||
|  | ||||
|  | ||||
| class ReadFileFilter(CommandFilter): | ||||
|     """Specific filter for the utils.read_file_as_root call.""" | ||||
|  | ||||
|     def __init__(self, file_path, *args): | ||||
|         self.file_path = file_path | ||||
|         super(ReadFileFilter, self).__init__("/bin/cat", "root", *args) | ||||
|  | ||||
|     def match(self, userargs): | ||||
|         if userargs[0] != 'cat': | ||||
|             return False | ||||
|         if userargs[1] != self.file_path: | ||||
|             return False | ||||
|         if len(userargs) != 2: | ||||
|             return False | ||||
|         return True | ||||
| @@ -1,149 +0,0 @@ | ||||
| # vim: tabstop=4 shiftwidth=4 softtabstop=4 | ||||
|  | ||||
| # Copyright (c) 2011 OpenStack, LLC. | ||||
| # All Rights Reserved. | ||||
| # | ||||
| #    Licensed under the Apache License, Version 2.0 (the "License"); you may | ||||
| #    not use this file except in compliance with the License. You may obtain | ||||
| #    a copy of the License at | ||||
| # | ||||
| #         http://www.apache.org/licenses/LICENSE-2.0 | ||||
| # | ||||
| #    Unless required by applicable law or agreed to in writing, software | ||||
| #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||||
| #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||||
| #    License for the specific language governing permissions and limitations | ||||
| #    under the License. | ||||
|  | ||||
|  | ||||
| import ConfigParser | ||||
| import logging | ||||
| import logging.handlers | ||||
| import os | ||||
| import string | ||||
|  | ||||
| from nova.rootwrap import filters | ||||
|  | ||||
|  | ||||
| class NoFilterMatched(Exception): | ||||
|     """This exception is raised when no filter matched.""" | ||||
|     pass | ||||
|  | ||||
|  | ||||
| class FilterMatchNotExecutable(Exception): | ||||
|     """ | ||||
|     This exception is raised when a filter matched but no executable was | ||||
|     found. | ||||
|     """ | ||||
|     def __init__(self, match=None, **kwargs): | ||||
|         self.match = match | ||||
|  | ||||
|  | ||||
| class RootwrapConfig(object): | ||||
|  | ||||
|     def __init__(self, config): | ||||
|         # filters_path | ||||
|         self.filters_path = config.get("DEFAULT", "filters_path").split(",") | ||||
|  | ||||
|         # exec_dirs | ||||
|         if config.has_option("DEFAULT", "exec_dirs"): | ||||
|             self.exec_dirs = config.get("DEFAULT", "exec_dirs").split(",") | ||||
|         else: | ||||
|             # Use system PATH if exec_dirs is not specified | ||||
|             self.exec_dirs = os.environ["PATH"].split(':') | ||||
|  | ||||
|         # syslog_log_facility | ||||
|         if config.has_option("DEFAULT", "syslog_log_facility"): | ||||
|             v = config.get("DEFAULT", "syslog_log_facility") | ||||
|             facility_names = logging.handlers.SysLogHandler.facility_names | ||||
|             self.syslog_log_facility = getattr(logging.handlers.SysLogHandler, | ||||
|                                                v, None) | ||||
|             if self.syslog_log_facility is None and v in facility_names: | ||||
|                 self.syslog_log_facility = facility_names.get(v) | ||||
|             if self.syslog_log_facility is None: | ||||
|                 raise ValueError('Unexpected syslog_log_facility: %s' % v) | ||||
|         else: | ||||
|             default_facility = logging.handlers.SysLogHandler.LOG_SYSLOG | ||||
|             self.syslog_log_facility = default_facility | ||||
|  | ||||
|         # syslog_log_level | ||||
|         if config.has_option("DEFAULT", "syslog_log_level"): | ||||
|             v = config.get("DEFAULT", "syslog_log_level") | ||||
|             self.syslog_log_level = logging.getLevelName(v.upper()) | ||||
|             if (self.syslog_log_level == "Level %s" % v.upper()): | ||||
|                 raise ValueError('Unexepected syslog_log_level: %s' % v) | ||||
|         else: | ||||
|             self.syslog_log_level = logging.ERROR | ||||
|  | ||||
|         # use_syslog | ||||
|         if config.has_option("DEFAULT", "use_syslog"): | ||||
|             self.use_syslog = config.getboolean("DEFAULT", "use_syslog") | ||||
|         else: | ||||
|             self.use_syslog = False | ||||
|  | ||||
|  | ||||
| def setup_syslog(execname, facility, level): | ||||
|     rootwrap_logger = logging.getLogger() | ||||
|     rootwrap_logger.setLevel(level) | ||||
|     handler = logging.handlers.SysLogHandler(address='/dev/log', | ||||
|                                              facility=facility) | ||||
|     handler.setFormatter(logging.Formatter( | ||||
|                          os.path.basename(execname) + ': %(message)s')) | ||||
|     rootwrap_logger.addHandler(handler) | ||||
|  | ||||
|  | ||||
| def build_filter(class_name, *args): | ||||
|     """Returns a filter object of class class_name.""" | ||||
|     if not hasattr(filters, class_name): | ||||
|         logging.warning("Skipping unknown filter class (%s) specified " | ||||
|                         "in filter definitions" % class_name) | ||||
|         return None | ||||
|     filterclass = getattr(filters, class_name) | ||||
|     return filterclass(*args) | ||||
|  | ||||
|  | ||||
| def load_filters(filters_path): | ||||
|     """Load filters from a list of directories.""" | ||||
|     filterlist = [] | ||||
|     for filterdir in filters_path: | ||||
|         if not os.path.isdir(filterdir): | ||||
|             continue | ||||
|         for filterfile in os.listdir(filterdir): | ||||
|             filterconfig = ConfigParser.RawConfigParser() | ||||
|             filterconfig.read(os.path.join(filterdir, filterfile)) | ||||
|             for (name, value) in filterconfig.items("Filters"): | ||||
|                 filterdefinition = [string.strip(s) for s in value.split(',')] | ||||
|                 newfilter = build_filter(*filterdefinition) | ||||
|                 if newfilter is None: | ||||
|                     continue | ||||
|                 newfilter.name = name | ||||
|                 filterlist.append(newfilter) | ||||
|     return filterlist | ||||
|  | ||||
|  | ||||
| def match_filter(filters, userargs, exec_dirs=[]): | ||||
|     """ | ||||
|     Checks user command and arguments through command filters and | ||||
|     returns the first matching filter. | ||||
|     Raises NoFilterMatched if no filter matched. | ||||
|     Raises FilterMatchNotExecutable if no executable was found for the | ||||
|     best filter match. | ||||
|     """ | ||||
|     first_not_executable_filter = None | ||||
|  | ||||
|     for f in filters: | ||||
|         if f.match(userargs): | ||||
|             # Try other filters if executable is absent | ||||
|             if not f.get_exec(exec_dirs=exec_dirs): | ||||
|                 if not first_not_executable_filter: | ||||
|                     first_not_executable_filter = f | ||||
|                 continue | ||||
|             # Otherwise return matching filter for execution | ||||
|             return f | ||||
|  | ||||
|     if first_not_executable_filter: | ||||
|         # A filter matched, but no executable was found for it | ||||
|         raise FilterMatchNotExecutable(match=first_not_executable_filter) | ||||
|  | ||||
|     # No filter matched | ||||
|     raise NoFilterMatched() | ||||
| @@ -1,198 +0,0 @@ | ||||
| # vim: tabstop=4 shiftwidth=4 softtabstop=4 | ||||
|  | ||||
| #    Copyright 2011 OpenStack LLC | ||||
| # | ||||
| #    Licensed under the Apache License, Version 2.0 (the "License"); you may | ||||
| #    not use this file except in compliance with the License. You may obtain | ||||
| #    a copy of the License at | ||||
| # | ||||
| #         http://www.apache.org/licenses/LICENSE-2.0 | ||||
| # | ||||
| #    Unless required by applicable law or agreed to in writing, software | ||||
| #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||||
| #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||||
| #    License for the specific language governing permissions and limitations | ||||
| #    under the License. | ||||
|  | ||||
| import ConfigParser | ||||
| import logging | ||||
| import logging.handlers | ||||
| import os | ||||
| import subprocess | ||||
|  | ||||
| from nova.rootwrap import filters | ||||
| from nova.rootwrap import wrapper | ||||
| from nova import test | ||||
|  | ||||
|  | ||||
| class RootwrapTestCase(test.TestCase): | ||||
|  | ||||
|     def setUp(self): | ||||
|         super(RootwrapTestCase, self).setUp() | ||||
|         self.filters = [ | ||||
|             filters.RegExpFilter("/bin/ls", "root", 'ls', '/[a-z]+'), | ||||
|             filters.CommandFilter("/usr/bin/foo_bar_not_exist", "root"), | ||||
|             filters.RegExpFilter("/bin/cat", "root", 'cat', '/[a-z]+'), | ||||
|             filters.CommandFilter("/nonexistent/cat", "root"), | ||||
|             filters.CommandFilter("/bin/cat", "root")  # Keep this one last | ||||
|             ] | ||||
|  | ||||
|     def test_RegExpFilter_match(self): | ||||
|         usercmd = ["ls", "/root"] | ||||
|         filtermatch = wrapper.match_filter(self.filters, usercmd) | ||||
|         self.assertFalse(filtermatch is None) | ||||
|         self.assertEqual(filtermatch.get_command(usercmd), | ||||
|             ["/bin/ls", "/root"]) | ||||
|  | ||||
|     def test_RegExpFilter_reject(self): | ||||
|         usercmd = ["ls", "root"] | ||||
|         self.assertRaises(wrapper.NoFilterMatched, | ||||
|             wrapper.match_filter, self.filters, usercmd) | ||||
|  | ||||
|     def test_missing_command(self): | ||||
|         valid_but_missing = ["foo_bar_not_exist"] | ||||
|         invalid = ["foo_bar_not_exist_and_not_matched"] | ||||
|         self.assertRaises(wrapper.FilterMatchNotExecutable, | ||||
|             wrapper.match_filter, self.filters, valid_but_missing) | ||||
|         self.assertRaises(wrapper.NoFilterMatched, | ||||
|             wrapper.match_filter, self.filters, invalid) | ||||
|  | ||||
|     def _test_DnsmasqFilter(self, filter_class, config_file_arg): | ||||
|         usercmd = ['env', config_file_arg + '=A', 'NETWORK_ID=foobar', | ||||
|                    'dnsmasq', 'foo'] | ||||
|         f = filter_class("/usr/bin/dnsmasq", "root") | ||||
|         self.assertTrue(f.match(usercmd)) | ||||
|         self.assertEqual(f.get_command(usercmd), ['/usr/bin/dnsmasq', 'foo']) | ||||
|         env = f.get_environment(usercmd) | ||||
|         self.assertEqual(env.get(config_file_arg), 'A') | ||||
|         self.assertEqual(env.get('NETWORK_ID'), 'foobar') | ||||
|  | ||||
|     def test_DnsmasqFilter(self): | ||||
|         self._test_DnsmasqFilter(filters.DnsmasqFilter, 'CONFIG_FILE') | ||||
|  | ||||
|     def test_DeprecatedDnsmasqFilter(self): | ||||
|         self._test_DnsmasqFilter(filters.DeprecatedDnsmasqFilter, 'FLAGFILE') | ||||
|  | ||||
|     def test_KillFilter(self): | ||||
|         if not os.path.exists("/proc/%d" % os.getpid()): | ||||
|             self.skipTest("Test requires /proc filesystem (procfs)") | ||||
|         p = subprocess.Popen(["cat"], stdin=subprocess.PIPE, | ||||
|                              stdout=subprocess.PIPE, | ||||
|                              stderr=subprocess.STDOUT) | ||||
|         try: | ||||
|             f = filters.KillFilter("root", "/bin/cat", "-9", "-HUP") | ||||
|             f2 = filters.KillFilter("root", "/usr/bin/cat", "-9", "-HUP") | ||||
|             usercmd = ['kill', '-ALRM', p.pid] | ||||
|             # Incorrect signal should fail | ||||
|             self.assertFalse(f.match(usercmd) or f2.match(usercmd)) | ||||
|             usercmd = ['kill', p.pid] | ||||
|             # Providing no signal should fail | ||||
|             self.assertFalse(f.match(usercmd) or f2.match(usercmd)) | ||||
|             # Providing matching signal should be allowed | ||||
|             usercmd = ['kill', '-9', p.pid] | ||||
|             self.assertTrue(f.match(usercmd) or f2.match(usercmd)) | ||||
|  | ||||
|             f = filters.KillFilter("root", "/bin/cat") | ||||
|             f2 = filters.KillFilter("root", "/usr/bin/cat") | ||||
|             usercmd = ['kill', os.getpid()] | ||||
|             # Our own PID does not match /bin/sleep, so it should fail | ||||
|             self.assertFalse(f.match(usercmd) or f2.match(usercmd)) | ||||
|             usercmd = ['kill', 999999] | ||||
|             # Nonexistent PID should fail | ||||
|             self.assertFalse(f.match(usercmd) or f2.match(usercmd)) | ||||
|             usercmd = ['kill', p.pid] | ||||
|             # Providing no signal should work | ||||
|             self.assertTrue(f.match(usercmd) or f2.match(usercmd)) | ||||
|         finally: | ||||
|             # Terminate the "cat" process and wait for it to finish | ||||
|             p.terminate() | ||||
|             p.wait() | ||||
|  | ||||
|     def test_KillFilter_no_raise(self): | ||||
|         # Makes sure ValueError from bug 926412 is gone. | ||||
|         f = filters.KillFilter("root", "") | ||||
|         # Providing anything other than kill should be False | ||||
|         usercmd = ['notkill', 999999] | ||||
|         self.assertFalse(f.match(usercmd)) | ||||
|         # Providing something that is not a pid should be False | ||||
|         usercmd = ['kill', 'notapid'] | ||||
|         self.assertFalse(f.match(usercmd)) | ||||
|  | ||||
|     def test_KillFilter_deleted_exe(self): | ||||
|         # Makes sure deleted exe's are killed correctly. | ||||
|         # See bug #967931. | ||||
|         def fake_readlink(blah): | ||||
|             return '/bin/commandddddd (deleted)' | ||||
|  | ||||
|         f = filters.KillFilter("root", "/bin/commandddddd") | ||||
|         usercmd = ['kill', 1234] | ||||
|         # Providing no signal should work | ||||
|         self.stubs.Set(os, 'readlink', fake_readlink) | ||||
|         self.assertTrue(f.match(usercmd)) | ||||
|  | ||||
|     def test_ReadFileFilter(self): | ||||
|         goodfn = '/good/file.name' | ||||
|         f = filters.ReadFileFilter(goodfn) | ||||
|         usercmd = ['cat', '/bad/file'] | ||||
|         self.assertFalse(f.match(['cat', '/bad/file'])) | ||||
|         usercmd = ['cat', goodfn] | ||||
|         self.assertEqual(f.get_command(usercmd), ['/bin/cat', goodfn]) | ||||
|         self.assertTrue(f.match(usercmd)) | ||||
|  | ||||
|     def test_exec_dirs_search(self): | ||||
|         # This test supposes you have /bin/cat or /usr/bin/cat locally | ||||
|         f = filters.CommandFilter("cat", "root") | ||||
|         usercmd = ['cat', '/f'] | ||||
|         self.assertTrue(f.match(usercmd)) | ||||
|         self.assertTrue(f.get_command(usercmd, exec_dirs=['/bin', | ||||
|             '/usr/bin']) in (['/bin/cat', '/f'], ['/usr/bin/cat', '/f'])) | ||||
|  | ||||
|     def test_skips(self): | ||||
|         # Check that all filters are skipped and that the last matches | ||||
|         usercmd = ["cat", "/"] | ||||
|         filtermatch = wrapper.match_filter(self.filters, usercmd) | ||||
|         self.assertTrue(filtermatch is self.filters[-1]) | ||||
|  | ||||
|     def test_RootwrapConfig(self): | ||||
|         raw = ConfigParser.RawConfigParser() | ||||
|  | ||||
|         # Empty config should raise ConfigParser.Error | ||||
|         self.assertRaises(ConfigParser.Error, wrapper.RootwrapConfig, raw) | ||||
|  | ||||
|         # Check default values | ||||
|         raw.set('DEFAULT', 'filters_path', '/a,/b') | ||||
|         config = wrapper.RootwrapConfig(raw) | ||||
|         self.assertEqual(config.filters_path, ['/a', '/b']) | ||||
|         self.assertEqual(config.exec_dirs, os.environ["PATH"].split(':')) | ||||
|         self.assertFalse(config.use_syslog) | ||||
|         self.assertEqual(config.syslog_log_facility, | ||||
|                          logging.handlers.SysLogHandler.LOG_SYSLOG) | ||||
|         self.assertEqual(config.syslog_log_level, logging.ERROR) | ||||
|  | ||||
|         # Check general values | ||||
|         raw.set('DEFAULT', 'exec_dirs', '/a,/x') | ||||
|         config = wrapper.RootwrapConfig(raw) | ||||
|         self.assertEqual(config.exec_dirs, ['/a', '/x']) | ||||
|  | ||||
|         raw.set('DEFAULT', 'use_syslog', 'oui') | ||||
|         self.assertRaises(ValueError, wrapper.RootwrapConfig, raw) | ||||
|         raw.set('DEFAULT', 'use_syslog', 'true') | ||||
|         config = wrapper.RootwrapConfig(raw) | ||||
|         self.assertTrue(config.use_syslog) | ||||
|  | ||||
|         raw.set('DEFAULT', 'syslog_log_facility', 'moo') | ||||
|         self.assertRaises(ValueError, wrapper.RootwrapConfig, raw) | ||||
|         raw.set('DEFAULT', 'syslog_log_facility', 'local0') | ||||
|         config = wrapper.RootwrapConfig(raw) | ||||
|         self.assertEqual(config.syslog_log_facility, | ||||
|                          logging.handlers.SysLogHandler.LOG_LOCAL0) | ||||
|         raw.set('DEFAULT', 'syslog_log_facility', 'LOG_AUTH') | ||||
|         config = wrapper.RootwrapConfig(raw) | ||||
|         self.assertEqual(config.syslog_log_facility, | ||||
|                          logging.handlers.SysLogHandler.LOG_AUTH) | ||||
|  | ||||
|         raw.set('DEFAULT', 'syslog_log_level', 'bar') | ||||
|         self.assertRaises(ValueError, wrapper.RootwrapConfig, raw) | ||||
|         raw.set('DEFAULT', 'syslog_log_level', 'INFO') | ||||
|         config = wrapper.RootwrapConfig(raw) | ||||
|         self.assertEqual(config.syslog_log_level, logging.INFO) | ||||
		Reference in New Issue
	
	Block a user
	 Jenkins
					Jenkins