 f784d83703
			
		
	
	f784d83703
	
	
	
		
			
			fix N402 (single line docstrings should end in a period) for rest of nova files Change-Id: I57d0d9ab01345dd83e544e476d79d2c2ca68ee51
		
			
				
	
	
		
			131 lines
		
	
	
		
			3.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			131 lines
		
	
	
		
			3.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # vim: tabstop=4 shiftwidth=4 softtabstop=4
 | |
| 
 | |
| # Copyright (c) 2011 OpenStack, LLC.
 | |
| # All Rights Reserved.
 | |
| #
 | |
| #    Licensed under the Apache License, Version 2.0 (the "License"); you may
 | |
| #    not use this file except in compliance with the License. You may obtain
 | |
| #    a copy of the License at
 | |
| #
 | |
| #         http://www.apache.org/licenses/LICENSE-2.0
 | |
| #
 | |
| #    Unless required by applicable law or agreed to in writing, software
 | |
| #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 | |
| #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 | |
| #    License for the specific language governing permissions and limitations
 | |
| #    under the License.
 | |
| 
 | |
| """Policy Engine For Nova."""
 | |
| 
 | |
| import os.path
 | |
| 
 | |
| from nova import exception
 | |
| from nova.openstack.common import cfg
 | |
| from nova.openstack.common import policy
 | |
| from nova import utils
 | |
| 
 | |
| 
 | |
# Options understood by the policy engine; they are registered on the
# global configuration object just below.
policy_opts = [
    cfg.StrOpt(
        'policy_file',
        default='policy.json',
        help=_('JSON file representing policy'),
    ),
    cfg.StrOpt(
        'policy_default_rule',
        default='default',
        help=_('Rule checked when requested rule is not found'),
    ),
]

CONF = cfg.CONF
CONF.register_opts(policy_opts)

# Location of the policy file once init() has resolved it. It is None
# until the first successful init() and again after reset().
_POLICY_PATH = None
# Cache dictionary that init() hands to utils.read_cached_file().
_POLICY_CACHE = {}
 | |
| 
 | |
| 
 | |
def reset():
    """Drop the remembered policy path and cache, then reset the engine.

    Both module-level globals are rebound to fresh values (``None`` and a
    new empty dict) before ``policy.reset()`` is called.
    """
    global _POLICY_PATH, _POLICY_CACHE

    _POLICY_PATH, _POLICY_CACHE = None, {}
    policy.reset()
 | |
| 
 | |
| 
 | |
def init():
    """Resolve the policy file location if needed and load its rules.

    On the first call (or the first call after ``reset()``) the path is
    taken from ``CONF.policy_file``; when that path does not exist on disk
    the result of ``CONF.find_file()`` is used instead.  The resolved path
    is remembered in ``_POLICY_PATH`` for later calls.

    Every call then passes the path, the module cache and ``_set_rules``
    (as ``reload_func``) to ``utils.read_cached_file()``.

    :raises nova.exception.ConfigNotFound: if no policy file path could be
        resolved.
    """
    global _POLICY_PATH

    path_unresolved = not _POLICY_PATH
    if path_unresolved:
        _POLICY_PATH = CONF.policy_file
        exists_as_given = os.path.exists(_POLICY_PATH)
        if not exists_as_given:
            # Not usable as given; ask the config object to locate it.
            _POLICY_PATH = CONF.find_file(_POLICY_PATH)
        if not _POLICY_PATH:
            raise exception.ConfigNotFound(path=CONF.policy_file)

    utils.read_cached_file(
        _POLICY_PATH, _POLICY_CACHE, reload_func=_set_rules)
 | |
| 
 | |
| 
 | |
def _set_rules(data):
    """Load policy rules from *data* and install them in the engine.

    Used by ``init()`` as the ``reload_func`` callback.

    :param data: value handed to ``policy.Rules.load_json()``; presumably
        the JSON text of the policy file (confirm against
        ``utils.read_cached_file``).
    """
    fallback_rule = CONF.policy_default_rule
    loaded_rules = policy.Rules.load_json(data, fallback_rule)
    policy.set_rules(loaded_rules)
 | |
| 
 | |
| 
 | |
def enforce(context, action, target, do_raise=True):
    """Check that *action* is permitted on *target* for this *context*.

       :param context: nova context; its ``to_dict()`` result is used as
           the credentials for the check
       :param action: string naming the action to be checked; by
           convention colon separated, e.g. ``compute:create_instance``,
           ``compute:attach_volume``, ``volume:attach_volume``
       :param target: dictionary describing the object of the action; for
           object creation this should describe the location of the
           object, e.g. ``{'project_id': context.project_id}``
       :param do_raise: if True (the default), PolicyNotAuthorized is
           raised on failure; if False, False is returned instead

       :raises nova.exception.PolicyNotAuthorized: if verification fails
           and do_raise is True.

       :return: a non-False value (not necessarily "True") when
           authorized, and exactly False when not authorized and
           do_raise is False.
    """
    init()

    credentials = context.to_dict()

    # The exception class and the action name are forwarded only when the
    # caller wants a failed check to raise; otherwise nothing extra is sent.
    if do_raise:
        raise_kwargs = {'exc': exception.PolicyNotAuthorized,
                        'action': action}
    else:
        raise_kwargs = {}

    return policy.check(action, target, credentials, **raise_kwargs)
 | |
| 
 | |
| 
 | |
def check_is_admin(roles):
    """Return whether *roles* satisfies the 'context_is_admin' policy rule.

    The rule is checked against an empty target, with credentials that
    contain only the given roles under the ``'roles'`` key.

    :param roles: roles to evaluate; placed in the credentials unchanged
    :return: the result of ``policy.check()`` for ``'context_is_admin'``
    """
    init()

    return policy.check('context_is_admin', {}, {'roles': roles})
 | |
| 
 | |
| 
 | |
@policy.register('is_admin')
class IsAdminCheck(policy.Check):
    """Policy check that compares the credentials' is_admin flag.

    Registered with the policy engine under the ``'is_admin'`` kind.
    """

    def __init__(self, kind, match):
        """Record the expected flag and initialise the base check.

        :param kind: check kind, passed through to the base class
        :param match: text of the rule's match part; it is treated as
            True only when it equals ``'true'`` ignoring case
        """
        wants_admin = match.lower() == 'true'
        self.expected = wants_admin

        # The base class receives the normalised 'True'/'False' string.
        super(IsAdminCheck, self).__init__(kind, str(wants_admin))

    def __call__(self, target, creds):
        """Report whether ``creds['is_admin']`` equals the expected flag.

        *target* is not consulted.  The flag is read by subscripting, so
        credentials lacking an ``'is_admin'`` entry make the lookup fail
        (``KeyError`` for a plain dict).
        """
        actual_flag = creds['is_admin']
        return actual_flag == self.expected
 |