WARN and use defaults when no policy file is found

Rather than refuse to start, it's nicer to use a sane set of default
policies when no policy file is found.

Fixes bug 1043482

Change-Id: I849737c61c0266952d931395fbc2ad3745c46f6e
This commit is contained in:
Brian Waldon 2012-08-29 14:34:10 -07:00
parent 3be563029a
commit 4f36fba996
4 changed files with 107 additions and 11 deletions

View File

@ -28,7 +28,7 @@ from glance.openstack.common import policy
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
policy_opts = ( policy_opts = (
cfg.StrOpt('policy_file', default=None), cfg.StrOpt('policy_file', default='policy.json'),
cfg.StrOpt('policy_default_rule', default='default'), cfg.StrOpt('policy_default_rule', default='default'),
) )
@ -36,6 +36,12 @@ CONF = cfg.CONF
CONF.register_opts(policy_opts) CONF.register_opts(policy_opts)
DEFAULT_RULES = {
'default': [[]],
'manage_image_cache': [['role:admin']]
}
class Enforcer(object): class Enforcer(object):
"""Responsible for loading and enforcing rules""" """Responsible for loading and enforcing rules"""
@ -52,20 +58,23 @@ class Enforcer(object):
def load_rules(self): def load_rules(self):
"""Set the rules found in the json file on disk""" """Set the rules found in the json file on disk"""
rules = self._read_policy_file() if self.policy_path:
rules = self._read_policy_file()
LOG.debug(_('Loaded policy rules: %s') % rules)
else:
rules = DEFAULT_RULES
LOG.debug(_('Using default policy rules: %s') % rules)
self.set_rules(rules) self.set_rules(rules)
@staticmethod @staticmethod
def _find_policy_file(): def _find_policy_file():
"""Locate the policy json data file""" """Locate the policy json data file"""
if CONF.policy_file: policy_file = CONF.find_file(CONF.policy_file)
return CONF.policy_file if policy_file:
return policy_file
policy_file = CONF.find_file('policy.json') else:
if not policy_file: LOG.warn(_('Unable to find policy file'))
raise cfg.ConfigFilesNotFoundError(('policy.json',)) return None
return policy_file
def _read_policy_file(self): def _read_policy_file(self):
"""Read contents of the policy file """Read contents of the policy file

View File

@ -21,6 +21,7 @@ import tempfile
from glance import client from glance import client
from glance.common import client as base_client from glance.common import client as base_client
from glance.common import config
from glance.common import exception from glance.common import exception
from glance.common import utils from glance.common import utils
from glance import context from glance import context
@ -37,6 +38,9 @@ _gen_uuid = utils.generate_uuid
UUID1 = _gen_uuid() UUID1 = _gen_uuid()
UUID2 = _gen_uuid() UUID2 = _gen_uuid()
#NOTE(bcwaldon): needed to init config_dir cli opt
config.parse_args()
class TestBadClients(test_utils.BaseTestCase): class TestBadClients(test_utils.BaseTestCase):

View File

@ -0,0 +1,79 @@
# Copyright 2012 OpenStack, LLC
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the 'License'); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os.path
import glance.api.policy
from glance.common import exception
import glance.context
from glance.tests import utils as test_utils
from glance.tests.unit import base
class TestPolicyEnforcer(base.IsolatedUnitTest):
def test_policy_file_default_rules_default_location(self):
enforcer = glance.api.policy.Enforcer()
context = glance.context.RequestContext(roles=[])
enforcer.enforce(context, 'get_image', {})
def test_policy_file_custom_rules_default_location(self):
rules = {"get_image": [["false:false"]]}
self.set_policy_rules(rules)
enforcer = glance.api.policy.Enforcer()
context = glance.context.RequestContext(roles=[])
self.assertRaises(exception.Forbidden,
enforcer.enforce, context, 'get_image', {})
def test_policy_file_custom_location(self):
self.config(policy_file=os.path.join(self.test_dir, 'gobble.gobble'))
rules = {"get_image": [["false:false"]]}
self.set_policy_rules(rules)
enforcer = glance.api.policy.Enforcer()
context = glance.context.RequestContext(roles=[])
self.assertRaises(exception.Forbidden,
enforcer.enforce, context, 'get_image', {})
class TestPolicyEnforcerNoFile(test_utils.BaseTestCase):
def test_policy_file_specified_but_not_found(self):
"""Missing defined policy file should result in a default ruleset"""
self.config(policy_file='gobble.gobble')
enforcer = glance.api.policy.Enforcer()
context = glance.context.RequestContext(roles=[])
enforcer.enforce(context, 'get_image', {})
self.assertRaises(exception.Forbidden,
enforcer.enforce, context, 'manage_image_cache', {})
admin_context = glance.context.RequestContext(roles=['admin'])
enforcer.enforce(admin_context, 'manage_image_cache', {})
def test_policy_file_default_not_found(self):
"""Missing default policy file should result in a default ruleset"""
enforcer = glance.api.policy.Enforcer()
context = glance.context.RequestContext(roles=[])
enforcer.enforce(context, 'get_image', {})
self.assertRaises(exception.Forbidden,
enforcer.enforce, context, 'manage_image_cache', {})
admin_context = glance.context.RequestContext(roles=['admin'])
enforcer.enforce(admin_context, 'manage_image_cache', {})

View File

@ -28,7 +28,6 @@ import unittest
import nose.plugins.skip import nose.plugins.skip
# NOTE(ameade): this import is necessary so that common cfg opts are registered
from glance.common import config from glance.common import config
from glance.common import utils from glance.common import utils
from glance.common import wsgi from glance.common import wsgi
@ -55,6 +54,11 @@ class BaseTestCase(unittest.TestCase):
def setUp(self): def setUp(self):
super(BaseTestCase, self).setUp() super(BaseTestCase, self).setUp()
#NOTE(bcwaldon): parse_args has to be called to register certain
# command-line options - specifically we need config_dir for
# the following policy tests
config.parse_args()
def tearDown(self): def tearDown(self):
super(BaseTestCase, self).tearDown() super(BaseTestCase, self).tearDown()
CONF.reset() CONF.reset()