diff --git a/cinder/policy.py b/cinder/policy.py index 5e863fd4f6a..f7c106a9e0a 100644 --- a/cinder/policy.py +++ b/cinder/policy.py @@ -16,6 +16,8 @@ """Policy Engine For Cinder""" import sys +import typing +from typing import Optional, Type # noqa: H301 from oslo_config import cfg from oslo_log import log as logging @@ -23,6 +25,8 @@ from oslo_policy import opts as policy_opts from oslo_policy import policy from oslo_utils import excutils + +from cinder import context from cinder import exception from cinder import policies @@ -31,10 +35,10 @@ LOG = logging.getLogger(__name__) DEFAULT_POLICY_FILENAME = 'policy.yaml' policy_opts.set_defaults(cfg.CONF, DEFAULT_POLICY_FILENAME) -_ENFORCER = None +_ENFORCER: Optional[policy.Enforcer] = None -def reset(): +def reset() -> None: global _ENFORCER if _ENFORCER: _ENFORCER.clear() @@ -42,7 +46,7 @@ def reset(): def init(use_conf=True, - suppress_deprecation_warnings=False): + suppress_deprecation_warnings: bool = False) -> None: """Init an Enforcer class. :param use_conf: Whether to load rules from config file. @@ -58,8 +62,10 @@ def init(use_conf=True, register_rules(_ENFORCER) _ENFORCER.load_rules() + _ENFORCER = typing.cast(policy.Enforcer, _ENFORCER) -def enforce(context, action, target): + +def enforce(context, action: str, target: dict): """Verifies that the action is valid on the target in this context. :param context: cinder context @@ -77,6 +83,7 @@ def enforce(context, action, target): :raises PolicyNotAuthorized: if verification fails. """ init() + assert _ENFORCER is not None try: return _ENFORCER.enforce(action, @@ -89,7 +96,9 @@ def enforce(context, action, target): raise exception.PolicyNotAuthorized(action=action) -def set_rules(rules, overwrite=True, use_conf=False): +def set_rules(rules: dict, + overwrite: bool = True, + use_conf: bool = False) -> None: """Set rules based on the provided dict of rules. :param rules: New rules to use. It should be an instance of dict. @@ -99,6 +108,8 @@ def set_rules(rules, overwrite=True, use_conf=False): """ init(use_conf=False) + assert _ENFORCER is not None + _ENFORCER.set_rules(rules, overwrite, use_conf) @@ -111,7 +122,7 @@ def register_rules(enforcer): enforcer.register_defaults(policies.list_rules()) -def get_enforcer(): +def get_enforcer() -> policy.Enforcer: # This method is for use by oslopolicy CLI scripts. Those scripts need the # 'output-file' and 'namespace' options, but having those in sys.argv means # loading the Cinder config options will fail as those are not expected to @@ -128,10 +139,15 @@ def get_enforcer(): cfg.CONF(conf_args, project='cinder') init() + assert _ENFORCER is not None return _ENFORCER -def authorize(context, action, target, do_raise=True, exc=None): +def authorize(context, + action: str, + target: dict, + do_raise: bool = True, + exc: Optional[Type[Exception]] = None): """Verifies that the action is valid on the target in this context. :param context: cinder context @@ -161,6 +177,8 @@ def authorize(context, action, target, do_raise=True, exc=None): do_raise is False. """ init() + assert _ENFORCER is not None + if not exc: exc = exception.PolicyNotAuthorized try: @@ -178,9 +196,11 @@ def authorize(context, action, target, do_raise=True, exc=None): return result -def check_is_admin(context): +def check_is_admin(context: 'context.RequestContext'): """Whether or not user is admin according to policy setting.""" init() + assert _ENFORCER is not None + # the target is user-self credentials = context.to_policy_values() target = credentials diff --git a/mypy-files.txt b/mypy-files.txt index 27587898a7b..dfae267dc9e 100644 --- a/mypy-files.txt +++ b/mypy-files.txt @@ -11,6 +11,7 @@ cinder/image/image_utils.py cinder/exception.py cinder/manager.py cinder/objects/backup.py +cinder/policy.py cinder/scheduler/base_handler.py cinder/scheduler/base_weight.py cinder/scheduler/evaluator/evaluator.py