Merge monasca-common code into the monasca-api

This change copies the code from monasca-common used by the 3
monasca APIs into monasca-api for the Merge-APIs target.

After mergin the APIs the duplicated code can be removed from
monasca-common.

Change-Id: I52d36fad846637baf10516f5cbbedc541d4c2064
Story: 2003881
Task: 30427
This commit is contained in:
Martin Chacon Piza 2019-04-09 13:47:25 +02:00
parent fb23e914b1
commit 2485e39b53
23 changed files with 1012 additions and 13 deletions

View File

@ -93,6 +93,7 @@ PyYAML==3.12
reno==2.5.0
requests==2.14.2
requestsexceptions==1.2.0
requests-mock==1.2.0
restructuredtext-lint==1.1.1
rfc3986==0.3.1
six==1.10.0

View File

@ -15,9 +15,8 @@
import falcon
from monasca_common.policy import policy_engine as policy
from monasca_api.api.core import request_context
from monasca_api.common.policy import policy_engine as policy
from monasca_api.common.repositories import constants
from monasca_api import policies
from monasca_api.v2.common import exceptions

View File

@ -13,9 +13,9 @@
# License for the specific language governing permissions and limitations
# under the License.
from monasca_common.policy import policy_engine as policy
from oslo_context import context
from monasca_api.common.policy import policy_engine as policy
from monasca_api import policies
policy.POLICIES = policies

View File

@ -14,7 +14,7 @@
from oslo_utils import timeutils
from monasca_common.rest import utils as rest_utils
from monasca_api.common.rest import utils as rest_utils
def transform(metrics, tenant_id, region):

View File

View File

@ -0,0 +1,46 @@
# Copyright 2014 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""oslo.i18n integration module.
See https://docs.openstack.org/oslo.i18n/latest/user/index.html
"""
import oslo_i18n
DOMAIN = 'monasca'
_translators = oslo_i18n.TranslatorFactory(domain=DOMAIN)
# The primary translation function using the well-known name "_"
_ = _translators.primary
# Translators for log levels.
#
# The abbreviated names are meant to reflect the usual use of a short
# name like '_'. The "L" is for "log" and the other letter comes from
# the level.
_LI = _translators.log_info
_LW = _translators.log_warning
_LE = _translators.log_error
_LC = _translators.log_critical
def translate(value, user_locale):
return oslo_i18n.translate(value, user_locale)
def get_available_languages():
return oslo_i18n.get_available_languages(DOMAIN)

View File

@ -0,0 +1,248 @@
# Copyright 2017 OP5 AB
# Copyright 2017 FUJITSU LIMITED
# Copyright (c) 2011 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import re
import sys
import logging
from oslo_config import cfg
from oslo_policy import policy
from monasca_api.common.policy.i18n import _LW
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
POLICIES = None
USER_BASED_RESOURCES = ['os-keypairs']
KEY_EXPR = re.compile(r'%\((\w+)\)s')
_ENFORCER = None
# oslo_policy will read the policy configuration file again when the file
# is changed in runtime so the old policy rules will be saved to
# saved_file_rules and used to compare with new rules to determine
# whether the rules were updated.
saved_file_rules = []
def reset():
"""Reset Enforcer class."""
global _ENFORCER
if _ENFORCER:
_ENFORCER.clear()
_ENFORCER = None
def init(policy_file=None, rules=None, default_rule=None, use_conf=True):
"""Init an Enforcer class.
:param policy_file: Custom policy file to use, if none is specified,
`CONF.policy_file` will be used.
:param rules: Default dictionary / Rules to use. It will be
considered just in the first instantiation.
:param default_rule: Default rule to use, CONF.default_rule will
be used if none is specified.
:param use_conf: Whether to load rules from config file.
"""
global _ENFORCER
global saved_file_rules
if not _ENFORCER:
_ENFORCER = policy.Enforcer(CONF,
policy_file=policy_file,
rules=rules,
default_rule=default_rule,
use_conf=use_conf
)
register_rules(_ENFORCER)
_ENFORCER.load_rules()
# Only the rules which are loaded from file may be changed
current_file_rules = _ENFORCER.file_rules
current_file_rules = _serialize_rules(current_file_rules)
if saved_file_rules != current_file_rules:
_warning_for_deprecated_user_based_rules(current_file_rules)
saved_file_rules = copy.deepcopy(current_file_rules)
def _serialize_rules(rules):
"""Serialize all the Rule object as string.
New string is used to compare the rules list.
"""
result = [(rule_name, str(rule)) for rule_name, rule in rules.items()]
return sorted(result, key=lambda rule: rule[0])
def _warning_for_deprecated_user_based_rules(rules):
"""Warning user based policy enforcement used in the rule but the rule
doesn't support it.
"""
for rule in rules:
# We will skip the warning for the resources which support user based
# policy enforcement.
if [resource for resource in USER_BASED_RESOURCES
if resource in rule[0]]:
continue
if 'user_id' in KEY_EXPR.findall(rule[1]):
LOG.warning(_LW("The user_id attribute isn't supported in the "
"rule '%s'. All the user_id based policy "
"enforcement will be removed in the "
"future."), rule[0])
def register_rules(enforcer):
"""Register default policy rules."""
rules = POLICIES.list_rules()
enforcer.register_defaults(rules)
def authorize(context, action, target, do_raise=True):
"""Verify that the action is valid on the target in this context.
:param context: monasca project context
:param action: String representing the action to be checked. This
should be colon separated for clarity.
:param target: Dictionary representing the object of the action for
object creation. This should be a dictionary representing
the location of the object e.g.
``{'project_id': 'context.project_id'}``
:param do_raise: if True (the default), raises PolicyNotAuthorized,
if False returns False
:type context: object
:type action: str
:type target: dict
:type do_raise: bool
:return: returns a non-False value (not necessarily True) if authorized,
and the False if not authorized and do_raise if False
:raises oslo_policy.policy.PolicyNotAuthorized: if verification fails
"""
init()
credentials = context.to_policy_values()
try:
result = _ENFORCER.authorize(action, target, credentials,
do_raise=do_raise, action=action)
return result
except policy.PolicyNotRegistered:
LOG.exception('Policy not registered')
raise
except Exception:
LOG.debug('Policy check for %(action)s failed with credentials '
'%(credentials)s',
{'action': action, 'credentials': credentials})
raise
def check_is_admin(context):
"""Check if roles contains 'admin' role according to policy settings."""
init()
credentials = context.to_policy_values()
target = credentials
return _ENFORCER.authorize('admin_required', target, credentials)
def set_rules(rules, overwrite=True, use_conf=False): # pragma: no cover
"""Set rules based on the provided dict of rules.
Note:
Used in tests only.
:param rules: New rules to use. It should be an instance of dict
:param overwrite: Whether to overwrite current rules or update them
with the new rules.
:param use_conf: Whether to reload rules from config file.
"""
init(use_conf=False)
_ENFORCER.set_rules(rules, overwrite, use_conf)
def verify_deprecated_policy(old_policy, new_policy, default_rule, context):
"""Check the rule of the deprecated policy action
If the current rule of the deprecated policy action is set to a non-default
value, then a warning message is logged stating that the new policy
action should be used to dictate permissions as the old policy action is
being deprecated.
:param old_policy: policy action that is being deprecated
:param new_policy: policy action that is replacing old_policy
:param default_rule: the old_policy action default rule value
:param context: the monasca context
"""
if _ENFORCER:
current_rule = str(_ENFORCER.rules[old_policy])
else:
current_rule = None
if current_rule != default_rule:
LOG.warning("Start using the new action '{0}'. The existing "
"action '{1}' is being deprecated and will be "
"removed in future release.".format(new_policy,
old_policy))
target = {'project_id': context.project_id,
'user_id': context.user_id}
return authorize(context=context, action=old_policy, target=target)
else:
return False
def get_rules():
if _ENFORCER:
return _ENFORCER.rules
def get_enforcer():
# This method is for use by oslopolicy CLI scripts. Those scripts need the
# 'output-file' and 'namespace' options, but having those in sys.argv means
# loading the project config options will fail as those are not expected to
# be present. So we pass in an arg list with those stripped out.
conf_args = []
# Start at 1 because cfg.CONF expects the equivalent of sys.argv[1:]
i = 1
while i < len(sys.argv):
if sys.argv[i].strip('-') in ['namespace', 'output-file']:
i += 2
continue
conf_args.append(sys.argv[i])
i += 1
cfg.CONF(conf_args, project='monasca')
init()
return _ENFORCER
@policy.register('is_admin')
class IsAdminCheck(policy.Check):
"""An explicit check for is_admin."""
def __init__(self, kind, match):
"""Initialize the check."""
self.expected = (match.lower() == 'true')
super(IsAdminCheck, self).__init__(kind, str(self.expected))
def __call__(self, target, creds, enforcer):
"""Determine whether is_admin matches the requested value."""
return creds['is_admin'] == self.expected

View File

@ -27,7 +27,7 @@ from cassandra.cluster import DCAwareRoundRobinPolicy
from cassandra.cluster import TokenAwarePolicy
from cassandra.query import FETCH_SIZE_UNSET
from cassandra.query import SimpleStatement
from monasca_common.rest import utils as rest_utils
from monasca_api.common.rest import utils as rest_utils
from oslo_config import cfg
from oslo_log import log
from oslo_utils import encodeutils

View File

@ -24,7 +24,7 @@ from oslo_utils import timeutils
import requests
from six import PY3
from monasca_common.rest import utils as rest_utils
from monasca_api.common.rest import utils as rest_utils
from monasca_api.common.repositories import exceptions
from monasca_api.common.repositories import metrics_repository

View File

View File

@ -0,0 +1,39 @@
# Copyright 2015 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class UnsupportedContentTypeException(Exception):
"""Exception thrown if content type is not supported."""
pass
class UnreadableContentError(IOError):
"""Exception thrown if reading data fails
:py:class`.UnreadableContentError` may be thrown
if data was impossible to read from input
"""
pass
class DataConversionException(Exception):
"""Exception thrown if data transformation fails
:py:class`.DataConversionException` may be thrown
if data was impossible to transform into target
representation according to content_type classifier.
"""
pass

View File

@ -0,0 +1,115 @@
# Copyright 2015 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import six
import ujson as json
from monasca_api.common.rest import exceptions
ENCODING = 'utf8'
TEXT_CONTENT_TYPE = 'text/plain'
JSON_CONTENT_TYPE = 'application/json'
def _try_catch(fun):
@six.wraps(fun)
def wrapper(*args, **kwargs):
try:
return fun(*args, **kwargs)
except Exception as ex:
raise exceptions.DataConversionException(str(ex))
return wrapper
@_try_catch
def as_json(data, **kwargs):
"""Writes data as json.
:param dict data: data to convert to json
:param kwargs kwargs: kwargs for json dumps
:return: json string
:rtype: str
"""
if 'sort_keys' not in kwargs:
kwargs['sort_keys'] = False
if 'ensure_ascii' not in kwargs:
kwargs['ensure_ascii'] = False
data = json.dumps(data, **kwargs)
return data
@_try_catch
def from_json(data, **kwargs):
"""Reads data from json str.
:param str data: data to read
:param kwargs kwargs: kwargs for json loads
:return: read data
:rtype: dict
"""
return json.loads(data, **kwargs)
_READABLE_CONTENT_TYPES = {
TEXT_CONTENT_TYPE: lambda content: content,
JSON_CONTENT_TYPE: from_json
}
def read_body(payload, content_type=JSON_CONTENT_TYPE):
"""Reads HTTP payload according to given content_type.
Function is capable of reading from payload stream.
Read data is then processed according to content_type.
Note:
Content-Type is validated. It means that if read_body
body is not capable of reading data in requested type,
it will throw an exception.
If read data was empty method will return false boolean
value to indicate that.
Note:
There is no transformation if content type is equal to
'text/plain'. What has been read is returned.
:param stream payload: payload to read, payload should have read method
:param str content_type: payload content type, default to application/json
:return: read data, returned type depends on content_type or False
if empty
:exception: :py:class:`.UnreadableBody` - in case of any failure when
reading data
"""
if content_type not in _READABLE_CONTENT_TYPES:
msg = ('Cannot read %s, not in %s' %
(content_type, _READABLE_CONTENT_TYPES))
raise exceptions.UnsupportedContentTypeException(msg)
try:
content = payload.read()
if not content:
return None
except Exception as ex:
raise exceptions.UnreadableContentError(str(ex))
return _READABLE_CONTENT_TYPES[content_type](content)

View File

@ -18,7 +18,7 @@ import os
import falcon
from falcon import testing
import fixtures
from monasca_common.policy import policy_engine as policy
from monasca_api.common.policy import policy_engine as policy
from oslo_config import cfg
from oslo_config import fixture as oo_cfg
from oslo_context import fixture as oo_ctx

View File

View File

@ -0,0 +1,102 @@
# Copyright 2017 OP5 AB
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Base classes for policy unit tests."""
import os
import fixtures
from oslo_config import cfg
from oslo_config import fixture as config_fixture
from oslo_policy import opts as policy_opts
from oslo_serialization import jsonutils
from oslotest import base
from monasca_api.common.policy import policy_engine
CONF = cfg.CONF
class FakePolicy(object):
def list_rules(self):
return []
class ConfigFixture(config_fixture.Config):
def setUp(self):
super(ConfigFixture, self).setUp()
CONF(args=[],
prog='api',
project='monasca',
version=0,
description='Testing monasca-api.common')
policy_opts.set_defaults(CONF)
class BaseTestCase(base.BaseTestCase):
def setUp(self):
super(BaseTestCase, self).setUp()
self.useFixture(ConfigFixture(CONF))
self.useFixture(EmptyPolicyFixture())
@staticmethod
def conf_override(**kw):
"""Override flag variables for a test."""
group = kw.pop('group', None)
for k, v in kw.items():
CONF.set_override(k, v, group)
class EmptyPolicyFixture(fixtures.Fixture):
"""Override the policy with an empty policy file.
This overrides the policy with a completely fake and synthetic
policy file.
"""
def setUp(self):
super(EmptyPolicyFixture, self).setUp()
self._prepare_policy()
policy_engine.POLICIES = FakePolicy()
policy_engine.reset()
policy_engine.init()
self.addCleanup(policy_engine.reset)
def _prepare_policy(self):
policy_dir = self.useFixture(fixtures.TempDir())
policy_file = os.path.join(policy_dir.path, 'policy.yaml')
policy_rules = jsonutils.loads('{}')
self.add_missing_default_rules(policy_rules)
with open(policy_file, 'w') as f:
jsonutils.dump(policy_rules, f)
BaseTestCase.conf_override(policy_file=policy_file,
group='oslo_policy')
BaseTestCase.conf_override(policy_dirs=[], group='oslo_policy')
def add_missing_default_rules(self, rules):
policies = FakePolicy()
for rule in policies.list_rules():
if rule.name not in rules:
rules[rule.name] = rule.check_str

View File

@ -0,0 +1,274 @@
# Copyright 2017 OP5 AB
# Copyright 2011 Piston Cloud Computing, Inc.
# All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import requests_mock
from oslo_context import context
from oslo_policy import policy as os_policy
from monasca_api.common.policy import policy_engine
from monasca_api.tests.policy import base
class PolicyFileTestCase(base.BaseTestCase):
def setUp(self):
super(PolicyFileTestCase, self).setUp()
self.context = context.RequestContext(user='fake',
tenant='fake',
is_admin=False)
self.target = {}
def test_modified_policy_reloads(self):
tmp_file = \
self.create_tempfiles(files=[('policies', '{}')], ext='.yaml')[0]
base.BaseTestCase.conf_override(policy_file=tmp_file,
group='oslo_policy')
policy_engine.reset()
policy_engine.init()
action = 'example:test'
rule = os_policy.RuleDefault(action, '')
policy_engine._ENFORCER.register_defaults([rule])
with open(tmp_file, 'w') as policy_file:
policy_file.write('{"example:test": ""}')
policy_engine.authorize(self.context, action, self.target)
with open(tmp_file, 'w') as policy_file:
policy_file.write('{"example:test": "!"}')
policy_engine._ENFORCER.load_rules(True)
self.assertRaises(os_policy.PolicyNotAuthorized,
policy_engine.authorize,
self.context, action, self.target)
class PolicyTestCase(base.BaseTestCase):
def setUp(self):
super(PolicyTestCase, self).setUp()
rules = [
os_policy.RuleDefault("true", "@"),
os_policy.RuleDefault("example:allowed", "@"),
os_policy.RuleDefault("example:denied", "!"),
os_policy.RuleDefault("old_action_not_default", "@"),
os_policy.RuleDefault("new_action", "@"),
os_policy.RuleDefault("old_action_default", "rule:admin_api"),
os_policy.RuleDefault("example:lowercase_admin",
"role:admin or role:sysadmin"),
os_policy.RuleDefault("example:uppercase_admin",
"role:ADMIN or role:sysadmin"),
os_policy.RuleDefault("example:get_http",
"http://www.example.com"),
os_policy.RuleDefault("example:my_file",
"role:compute_admin or "
"project_id:%(project_id)s"),
os_policy.RuleDefault("example:early_and_fail", "! and @"),
os_policy.RuleDefault("example:early_or_success", "@ or !"),
]
policy_engine.reset()
policy_engine.init()
self.context = context.RequestContext(user='fake',
tenant='fake',
is_admin=False)
policy_engine._ENFORCER.register_defaults(rules)
self.target = {}
def test_authorize_nonexistent_action_throws(self):
action = 'example:noexists'
self.assertRaises(os_policy.PolicyNotRegistered, policy_engine.authorize,
self.context, action, self.target)
def test_authorize_bad_action_throws(self):
action = 'example:denied'
self.assertRaises(os_policy.PolicyNotAuthorized, policy_engine.authorize,
self.context, action, self.target)
def test_authorize_bad_action_noraise(self):
action = "example:denied"
result = policy_engine.authorize(self.context, action, self.target, False)
self.assertFalse(result)
def test_authorize_good_action(self):
action = "example:allowed"
result = policy_engine.authorize(self.context, action, self.target)
self.assertTrue(result)
@requests_mock.mock()
def test_authorize_http_true(self, req_mock):
req_mock.post('http://www.example.com/',
text='True')
action = "example:get_http"
target = {}
result = policy_engine.authorize(self.context, action, target)
self.assertTrue(result)
@requests_mock.mock()
def test_authorize_http_false(self, req_mock):
req_mock.post('http://www.example.com/',
text='False')
action = "example:get_http"
target = {}
self.assertRaises(os_policy.PolicyNotAuthorized, policy_engine.authorize,
self.context, action, target)
def test_templatized_authorization(self):
target_mine = {'project_id': 'fake'}
target_not_mine = {'project_id': 'another'}
action = "example:my_file"
policy_engine.authorize(self.context, action, target_mine)
self.assertRaises(os_policy.PolicyNotAuthorized, policy_engine.authorize,
self.context, action, target_not_mine)
def test_early_AND_authorization(self):
action = "example:early_and_fail"
self.assertRaises(os_policy.PolicyNotAuthorized, policy_engine.authorize,
self.context, action, self.target)
def test_early_OR_authorization(self):
action = "example:early_or_success"
policy_engine.authorize(self.context, action, self.target)
def test_ignore_case_role_check(self):
lowercase_action = "example:lowercase_admin"
uppercase_action = "example:uppercase_admin"
# NOTE(dprince) we mix case in the Admin role here to ensure
# case is ignored
admin_context = context.RequestContext('admin',
'fake',
roles=['AdMiN'])
policy_engine.authorize(admin_context, lowercase_action, self.target)
policy_engine.authorize(admin_context, uppercase_action, self.target)
@mock.patch.object(policy_engine.LOG, 'warning')
def test_warning_when_deprecated_user_based_rule_used(self, mock_warning):
policy_engine._warning_for_deprecated_user_based_rules(
[("os_compute_api:servers:index",
"project_id:%(project_id)s or user_id:%(user_id)s")])
mock_warning.assert_called_once_with(
u"The user_id attribute isn't supported in the rule "
"'%s'. All the user_id based policy enforcement will be removed "
"in the future.", "os_compute_api:servers:index")
@mock.patch.object(policy_engine.LOG, 'warning')
def test_no_warning_for_user_based_resource(self, mock_warning):
policy_engine._warning_for_deprecated_user_based_rules(
[("os_compute_api:os-keypairs:index",
"user_id:%(user_id)s")])
mock_warning.assert_not_called()
@mock.patch.object(policy_engine.LOG, 'warning')
def test_no_warning_for_no_user_based_rule(self, mock_warning):
policy_engine._warning_for_deprecated_user_based_rules(
[("os_compute_api:servers:index",
"project_id:%(project_id)s")])
mock_warning.assert_not_called()
@mock.patch.object(policy_engine.LOG, 'warning')
def test_verify_deprecated_policy_using_old_action(self, mock_warning):
policy_engine._ENFORCER.load_rules(True)
old_policy = "old_action_not_default"
new_policy = "new_action"
default_rule = "rule:admin_api"
using_old_action = policy_engine.verify_deprecated_policy(
old_policy, new_policy, default_rule, self.context)
mock_warning.assert_called_once_with(
"Start using the new action '{0}'. The existing action '{1}' is "
"being deprecated and will be removed in "
"future release.".format(new_policy, old_policy))
self.assertTrue(using_old_action)
def test_verify_deprecated_policy_using_new_action(self):
policy_engine._ENFORCER.load_rules(True)
old_policy = "old_action_default"
new_policy = "new_action"
default_rule = "rule:admin_api"
using_old_action = policy_engine.verify_deprecated_policy(
old_policy, new_policy, default_rule, self.context)
self.assertFalse(using_old_action)
class IsAdminCheckTestCase(base.BaseTestCase):
def setUp(self):
super(IsAdminCheckTestCase, self).setUp()
policy_engine.init()
def test_init_true(self):
check = policy_engine.IsAdminCheck('is_admin', 'True')
self.assertEqual(check.kind, 'is_admin')
self.assertEqual(check.match, 'True')
self.assertTrue(check.expected)
def test_init_false(self):
check = policy_engine.IsAdminCheck('is_admin', 'nottrue')
self.assertEqual(check.kind, 'is_admin')
self.assertEqual(check.match, 'False')
self.assertFalse(check.expected)
def test_call_true(self):
check = policy_engine.IsAdminCheck('is_admin', 'True')
self.assertTrue(check('target', dict(is_admin=True),
policy_engine._ENFORCER))
self.assertFalse(check('target', dict(is_admin=False),
policy_engine._ENFORCER))
def test_call_false(self):
check = policy_engine.IsAdminCheck('is_admin', 'False')
self.assertFalse(check('target', dict(is_admin=True),
policy_engine._ENFORCER))
self.assertTrue(check('target', dict(is_admin=False),
policy_engine._ENFORCER))
class AdminRolePolicyTestCase(base.BaseTestCase):
def setUp(self):
super(AdminRolePolicyTestCase, self).setUp()
self.noadmin_context = context.RequestContext('fake', 'fake',
roles=['member'])
self.admin_context = context.RequestContext('fake', 'fake',
roles=['admin'])
admin_rule = [
os_policy.RuleDefault('example.admin', 'role:admin'),
]
policy_engine.reset()
policy_engine.init(policy_file=None)
policy_engine._ENFORCER.register_defaults(admin_rule)
policy_engine._ENFORCER.load_rules(True)
self.target = {}
def test_authorize_admin_actions_with_admin_context(self):
for action in policy_engine.get_rules().keys():
policy_engine.authorize(self.admin_context, action, self.target)
def test_authorize_admin_actions_with_nonadmin_context_throws(self):
"""Check if non-admin context passed to admin actions throws
Policy not authorized exception
"""
for action in policy_engine.get_rules().keys():
self.assertRaises(os_policy.PolicyNotAuthorized,
policy_engine.authorize,
self.noadmin_context, action, self.target)

View File

@ -21,12 +21,101 @@ from monasca_api.tests import base
class TestAlarmExpression(base.BaseTestCase):
good_simple_expression = "max(cpu.idle_perc{hostname=fred}, 60) > 10 times 4"
good_simple_expression = "max(cpu.idle_perc{hostname=fred}, 60) <= 3 times 4 OR \
avg(CPU.PERCENT)<5 OR min(cpu.percent, deterministic) gte 3"
def test_good_expression(self):
expression = self.good_simple_expression
sub_exprs = alarm_expr_parser.AlarmExprParser(expression).sub_expr_list
self.assertEqual(1, len(sub_exprs))
self.assertEqual(3, len(sub_exprs))
def test_fmtd_sub_expr(self):
expression = self.good_simple_expression
sub_exprs = alarm_expr_parser.AlarmExprParser(expression).sub_expr_list
self.assertEqual([x.fmtd_sub_expr_str for x in sub_exprs],
['MAX(cpu.idle_perc{hostname=fred}) <= 3.0 times 4',
'AVG(CPU.PERCENT{}) < 5.0', 'MIN(cpu.percent{}) gte 3.0'])
def test_dimensions_str(self):
expression = self.good_simple_expression
sub_exprs = alarm_expr_parser.AlarmExprParser(expression).sub_expr_list
self.assertEqual([x.dimensions_str for x in sub_exprs], ['hostname=fred', '', ''])
def test_function(self):
expression = self.good_simple_expression
sub_exprs = alarm_expr_parser.AlarmExprParser(expression).sub_expr_list
self.assertEqual([x.func for x in sub_exprs], ['max', 'avg', 'min'])
def test_normalized_function(self):
expression = self.good_simple_expression
sub_exprs = alarm_expr_parser.AlarmExprParser(expression).sub_expr_list
self.assertEqual([x.normalized_func for x in sub_exprs], ['MAX', 'AVG', 'MIN'])
def test_metric_name(self):
expression = self.good_simple_expression
sub_exprs = alarm_expr_parser.AlarmExprParser(expression).sub_expr_list
self.assertEqual([x.metric_name for x in sub_exprs],
['cpu.idle_perc', 'CPU.PERCENT', 'cpu.percent'])
def test_normalized_metric_name(self):
expression = self.good_simple_expression
sub_exprs = alarm_expr_parser.AlarmExprParser(expression).sub_expr_list
self.assertEqual([x.normalized_metric_name for x in sub_exprs],
['cpu.idle_perc', 'cpu.percent', 'cpu.percent'])
def test_dimensions(self):
expression = self.good_simple_expression
sub_exprs = alarm_expr_parser.AlarmExprParser(expression).sub_expr_list
self.assertEqual([x.dimensions for x in sub_exprs], ['hostname=fred', '', ''])
def test_dimensions_as_list(self):
expression = self.good_simple_expression
sub_exprs = alarm_expr_parser.AlarmExprParser(expression).sub_expr_list
print([x.dimensions_as_list for x in sub_exprs].__str__())
self.assertEqual([x.dimensions_as_list for x in sub_exprs].__str__(),
"[(['hostname=fred'], {}), [], []]")
def test_operator(self):
expression = self.good_simple_expression
sub_exprs = alarm_expr_parser.AlarmExprParser(expression).sub_expr_list
self.assertEqual([x.operator for x in sub_exprs], ['<=', '<', 'gte'])
def test_threshold(self):
expression = self.good_simple_expression
sub_exprs = alarm_expr_parser.AlarmExprParser(expression).sub_expr_list
self.assertEqual([x.threshold for x in sub_exprs], [3.0, 5.0, 3.0])
def test_period(self):
expression = self.good_simple_expression
sub_exprs = alarm_expr_parser.AlarmExprParser(expression).sub_expr_list
self.assertEqual([x.period for x in sub_exprs], [60, 60, 60])
def test_periods(self):
expression = self.good_simple_expression
sub_exprs = alarm_expr_parser.AlarmExprParser(expression).sub_expr_list
self.assertEqual([x.periods for x in sub_exprs], [4, 1, 1])
def test_deterministic(self):
expression = self.good_simple_expression
sub_exprs = alarm_expr_parser.AlarmExprParser(expression).sub_expr_list
self.assertEqual([x.deterministic for x in sub_exprs], [False, False, True])
def test_normalized_operator(self):
expression = self.good_simple_expression
sub_exprs = alarm_expr_parser.AlarmExprParser(expression).sub_expr_list
self.assertEqual([x.normalized_operator for x in sub_exprs], ['LTE', 'LT', 'GTE'])
def test_id(self):
expression = self.good_simple_expression
sub_exprs = alarm_expr_parser.AlarmExprParser(expression).sub_expr_list
self.assertEqual([x.id for x in sub_exprs], [None, None, None])
def test_set_id(self):
expression = self.good_simple_expression
sub_exprs = alarm_expr_parser.AlarmExprParser(expression).sub_expr_list
for x in sub_exprs:
x.id = 88
self.assertEqual([x.id for x in sub_exprs], [88, 88, 88])
def _ensure_parse_fails(self, expression):
parser = alarm_expr_parser.AlarmExprParser(expression)
@ -35,6 +124,10 @@ class TestAlarmExpression(base.BaseTestCase):
pyparsing.ParseFatalException),
getattr, parser, "sub_expr_list")
def test_incomplete_operator(self):
expression = self.good_simple_expression.replace('<= 3', '')
self._ensure_parse_fails(expression)
def test_no_dimension_name(self):
expression = self.good_simple_expression.replace('hostname', '')
self._ensure_parse_fails(expression)

View File

@ -15,10 +15,10 @@
from falcon import errors
from falcon import testing
from monasca_common.policy import policy_engine as policy
from oslo_policy import policy as os_policy
from monasca_api.api.core import request
from monasca_api.common.policy import policy_engine as policy
from monasca_api.tests import base
import monasca_api.v2.reference.helpers as helpers

View File

@ -14,11 +14,11 @@
# under the License.
from falcon import testing
from monasca_common.policy import policy_engine as policy
from oslo_context import context
from oslo_policy import policy as os_policy
from monasca_api.api.core import request
from monasca_api.common.policy import policy_engine as policy
from monasca_api.policies import roles_list_to_check_str
from monasca_api.tests import base

View File

@ -12,10 +12,10 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from monasca_common.policy import policy_engine as policy
from oslo_policy import policy as os_policy
from monasca_api.api.core import request
from monasca_api.common.policy import policy_engine as policy
import monasca_api.common.repositories.constants as const
from monasca_api.tests import base
from monasca_api.v2.common import exceptions

View File

@ -0,0 +1,81 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslotest import base
from monasca_api.common.rest import exceptions
from monasca_api.common.rest import utils
class TestRestUtils(base.BaseTestCase):
def setUp(self):
super(TestRestUtils, self).setUp()
self.mock_json_patcher = mock.patch('monasca_api.common.rest.utils.json')
self.mock_json = self.mock_json_patcher.start()
def tearDown(self):
super(TestRestUtils, self).tearDown()
self.mock_json_patcher.stop()
def test_read_body_with_success(self):
self.mock_json.loads.return_value = ""
payload = mock.Mock()
utils.read_body(payload)
self.mock_json.loads.assert_called_once_with(payload.read.return_value)
def test_read_body_empty_content_in_payload(self):
self.mock_json.loads.return_value = ""
payload = mock.Mock()
payload.read.return_value = None
self.assertIsNone(utils.read_body(payload))
def test_read_body_json_loads_exception(self):
self.mock_json.loads.side_effect = Exception
payload = mock.Mock()
self.assertRaises(exceptions.DataConversionException,
utils.read_body, payload)
def test_read_body_unsupported_content_type(self):
unsupported_content_type = mock.Mock()
self.assertRaises(
exceptions.UnsupportedContentTypeException, utils.read_body, None,
unsupported_content_type)
def test_read_body_unreadable_content_error(self):
unreadable_content = mock.Mock()
unreadable_content.read.side_effect = Exception
self.assertRaises(
exceptions.UnreadableContentError,
utils.read_body, unreadable_content)
def test_as_json_success(self):
data = mock.Mock()
dumped_json = utils.as_json(data)
self.assertEqual(dumped_json, self.mock_json.dumps.return_value)
def test_as_json_with_exception(self):
data = mock.Mock()
self.mock_json.dumps.side_effect = Exception
self.assertRaises(exceptions.DataConversionException,
utils.as_json, data)

View File

@ -24,7 +24,7 @@ from oslo_utils import timeutils
import six
import six.moves.urllib.parse as urlparse
from monasca_common.rest import utils as rest_utils
from monasca_api.common.rest import utils as rest_utils
from monasca_common.validation import metrics as metric_validation
from monasca_api.v2.common.exceptions import HTTPUnprocessableEntityError

View File

@ -15,6 +15,7 @@ influxdb>=2.9.2;python_version>='3.0' # MIT
mock>=2.0.0 # BSD
funcsigs>=1.0.0;python_version=='2.7' or python_version=='2.6' # Apache-2.0
oslotest>=3.2.0 # Apache-2.0
requests-mock>=1.2.0 # Apache-2.0
stestr>=1.0.0 # Apache-2.0
python-subunit>=1.0.0 # Apache-2.0/BSD