Rule parser for property protections

This patch introduces the way protected properties
will be configured and parsed.

Related to bp api-v2-property-protection
docImpact

Change-Id: I3d24cacccf3f51b07a4090b8a5db1f2451090762
This commit is contained in:
iccha.sethi 2013-08-21 14:53:13 -04:00
parent 2687b677bf
commit e8440d1ee8
6 changed files with 320 additions and 0 deletions

View File

@ -107,6 +107,15 @@ workers = 1
# (string value). This setting needs to be the same for both
# glance-scrubber and glance-api.
#lock_path=<None>
#
# Property Protections config file
# This file contains the rules for property protections and the roles
# associated with it.
# If this config value is not specified, by default, property protections
# won't be enforced.
# If a value is specified and the file is not found, then an
# HTTPInternalServerError will be thrown.
#property_protection_file =
# Set a system wide quota for every user. This value is the total number
# of bytes that a user can use across all storage systems. A value of

View File

@ -0,0 +1,25 @@
# property-protections.conf.sample
# Specify regular expression for which properties will be protected in []
# For each section, specify CRUD permissions. You may refer to roles defined
# in policy.json
# The property rules will be applied in the order specified below. Once
# a match is found the remaining property rules will not be traversed through.
# WARNING:
# * If the reg ex specified below does not compile, then
# HTTPInternalServerErrors will be thrown. (Guide for reg ex python compiler used:
# http://docs.python.org/2/library/re.html#regular-expression-syntax)
# * If an operation(create, read, update, delete) is not specified or misspelt
# then that operation for the given regex is disabled for all roles.
# So, remember, with GREAT POWER comes GREAT RESPONSIBILITY!
[^x_.*]
create = admin,member
read = admin,member
update = admin,member
delete = admin,member
[.*]
create = admin
read = admin
update = admin
delete = admin

View File

@ -0,0 +1,101 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ConfigParser
import re
from oslo.config import cfg
import webob.exc
from glance.openstack.common import log as logging
CONFIG = ConfigParser.SafeConfigParser()
LOG = logging.getLogger(__name__)
property_opts = [
cfg.StrOpt('property_protection_file',
default=None,
help=_('The location of the property protection file.')),
]
CONF = cfg.CONF
CONF.register_opts(property_opts)
def is_property_protection_enabled():
return (CONF.property_protection_file is not None)
class PropertyRules(object):
def __init__(self):
self.rules = {}
if is_property_protection_enabled():
self._load_rules()
def _load_rules(self):
try:
conf_file = CONF.find_file(CONF.property_protection_file)
CONFIG.read(conf_file)
except Exception as e:
msg = _("Couldn't find property protection file %s:%s." %
(CONF.property_protection_file, e))
LOG.error(msg)
raise webob.exc.HTTPInternalServerError(explanation=msg)
operations = ['create', 'read', 'update', 'delete']
properties = CONFIG.sections()
for property_exp in properties:
property_dict = {}
compiled_rule = self._compile_rule(property_exp)
for operation in operations:
roles = CONFIG.get(property_exp, operation)
if roles:
roles = [role.strip() for role in roles.split(',')]
property_dict[operation] = roles
else:
property_dict[operation] = []
msg = _(('Property protection on operation %s for rule '
'%s is not found. No role will be allowed to '
'perform this operation.' %
(operation, property_exp)))
LOG.warn(msg)
self.rules[compiled_rule] = property_dict
def _compile_rule(self, rule):
try:
return re.compile(rule)
except Exception as e:
msg = _("Encountered a malfored property protection rule %s:%s."
% (rule, e))
LOG.error(msg)
raise webob.exc.HTTPInternalServerError(explanation=msg)
def check_property_rules(self, property_name, action, roles):
if not self.rules:
return True
if action not in ['create', 'read', 'update', 'delete']:
return False
for rule_exp, rule in self.rules.items():
if rule_exp.search(str(property_name)):
if set(roles).intersection(set(rule.get(action))):
return True
return False

View File

@ -0,0 +1,11 @@
[^x_owner_.*]
create = admin,member
read = admin,member
update = admin,member
delete = admin,member
[.*]
create = admin
read = admin
update = admin
delete = admin

View File

@ -0,0 +1,151 @@
# Copyright 2013 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import webob.exc
from glance.common import property_utils
from glance.tests import utils
class TestPropertyRules(utils.BaseTestCase):
def setUp(self):
super(TestPropertyRules, self).setUp()
self.set_property_protections()
def tearDown(self):
for section in property_utils.CONFIG.sections():
property_utils.CONFIG.remove_section(section)
super(TestPropertyRules, self).tearDown()
def test_is_property_protections_enabled_true(self):
self.config(property_protection_file="property-protections.conf")
self.assertTrue(property_utils.is_property_protection_enabled())
def test_is_property_protections_enabled_false(self):
self.config(property_protection_file=None)
self.assertFalse(property_utils.is_property_protection_enabled())
def test_property_protection_file_doesnt_exist(self):
self.config(property_protection_file='fake-file.conf')
self.assertRaises(webob.exc.HTTPInternalServerError,
property_utils.PropertyRules)
def test_property_protection_with_malformed_rule(self):
malformed_rules = {'^[0-9)': {'create': ['fake-role'],
'read': ['fake-role'],
'update': ['fake-role'],
'delete': ['fake-role']}}
self.set_property_protection_rules(malformed_rules)
self.assertRaises(webob.exc.HTTPInternalServerError,
property_utils.PropertyRules)
def test_property_protection_with_missing_operation(self):
rules_with_missing_operation = {'^[0-9]': {'create': ['fake-role'],
'update': ['fake-role'],
'delete': ['fake-role']}}
self.set_property_protection_rules(rules_with_missing_operation)
self.assertRaises(webob.exc.HTTPInternalServerError,
property_utils.PropertyRules)
def test_property_protection_with_misspelt_operation(self):
rules_with_misspelt_operation = {'^[0-9]': {'create': ['fake-role'],
'rade': ['fake-role'],
'update': ['fake-role'],
'delete': ['fake-role']}}
self.set_property_protection_rules(rules_with_misspelt_operation)
self.assertRaises(webob.exc.HTTPInternalServerError,
property_utils.PropertyRules)
def test_property_protection_with_whitespace(self):
rules_whitespace = {
'^test_prop.*': {
'create': ['member ,fake-role'],
'read': ['fake-role, member'],
'update': ['fake-role, member'],
'delete': ['fake-role, member']
}
}
self.set_property_protection_rules(rules_whitespace)
self.rules_checker = property_utils.PropertyRules()
self.assertTrue(self.rules_checker.check_property_rules('test_prop_1',
'read', ['member']))
self.assertTrue(self.rules_checker.check_property_rules('test_prop_1',
'read', ['fake-role']))
def test_check_property_rules_invalid_action(self):
self.rules_checker = property_utils.PropertyRules()
self.assertFalse(self.rules_checker.check_property_rules('test_prop',
'hall', ['admin']))
def test_check_property_rules_read_permitted_admin_role(self):
self.rules_checker = property_utils.PropertyRules()
self.assertTrue(self.rules_checker.check_property_rules('test_prop',
'read', ['admin']))
def test_check_property_rules_read_permitted_specific_role(self):
self.rules_checker = property_utils.PropertyRules()
self.assertTrue(self.rules_checker.check_property_rules(
'x_owner_prop', 'read', ['member']))
def test_check_property_rules_read_unpermitted_role(self):
self.rules_checker = property_utils.PropertyRules()
self.assertFalse(self.rules_checker.check_property_rules('test_prop',
'read', ['member']))
def test_check_property_rules_create_permitted_admin_role(self):
self.rules_checker = property_utils.PropertyRules()
self.assertTrue(self.rules_checker.check_property_rules('test_prop',
'create', ['admin']))
def test_check_property_rules_create_permitted_specific_role(self):
self.rules_checker = property_utils.PropertyRules()
self.assertTrue(self.rules_checker.check_property_rules(
'x_owner_prop', 'create', ['member']))
def test_check_property_rules_create_unpermitted_role(self):
self.rules_checker = property_utils.PropertyRules()
self.assertFalse(self.rules_checker.check_property_rules('test_prop',
'create', ['member']))
def test_check_property_rules_update_permitted_admin_role(self):
self.rules_checker = property_utils.PropertyRules()
self.assertTrue(self.rules_checker.check_property_rules('test_prop',
'update', ['admin']))
def test_check_property_rules_update_permitted_specific_role(self):
self.rules_checker = property_utils.PropertyRules()
self.assertTrue(self.rules_checker.check_property_rules(
'x_owner_prop', 'update', ['member']))
def test_check_property_rules_update_unpermitted_role(self):
self.rules_checker = property_utils.PropertyRules()
self.assertFalse(self.rules_checker.check_property_rules('test_prop',
'update', ['member']))
def test_check_property_rules_delete_permitted_admin_role(self):
self.rules_checker = property_utils.PropertyRules()
self.assertTrue(self.rules_checker.check_property_rules('test_prop',
'delete', ['admin']))
def test_check_property_rules_delete_permitted_specific_role(self):
self.rules_checker = property_utils.PropertyRules()
self.assertTrue(self.rules_checker.check_property_rules(
'x_owner_prop', 'delete', ['member']))
def test_check_property_rules_delete_unpermitted_role(self):
self.rules_checker = property_utils.PropertyRules()
self.assertFalse(self.rules_checker.check_property_rules('test_prop',
'delete', ['member']))

View File

@ -21,11 +21,13 @@ import errno
import functools
import os
import shlex
import shutil
import socket
import StringIO
import subprocess
import sys
import fixtures
from oslo.config import cfg
import stubout
import testtools
@ -51,12 +53,33 @@ class BaseTestCase(testtools.TestCase):
self.addCleanup(CONF.reset)
self.stubs = stubout.StubOutForTesting()
self.stubs.Set(exception, '_FATAL_EXCEPTION_FORMAT_ERRORS', True)
self.test_dir = self.useFixture(fixtures.TempDir()).path
def tearDown(self):
self.stubs.UnsetAll()
self.stubs.SmartUnsetAll()
super(BaseTestCase, self).tearDown()
def set_property_protections(self):
self.property_file = self._copy_data_file('property-protections.conf',
self.test_dir)
self.config(property_protection_file=self.property_file)
def _copy_data_file(self, file_name, dst_dir):
src_file_name = os.path.join('glance/tests/etc', file_name)
shutil.copy(src_file_name, dst_dir)
dst_file_name = os.path.join(dst_dir, file_name)
return dst_file_name
def set_property_protection_rules(self, rules):
f = open(self.property_file, 'w')
for rule_key in rules.keys():
f.write('[%s]\n' % rule_key)
for operation in rules[rule_key].keys():
roles_str = ','.join(rules[rule_key][operation])
f.write('%s = %s\n' % (operation, roles_str))
f.close()
def config(self, **kw):
"""
Override some configuration values.