Files
octavia/octavia/tests/functional/api/v2/test_l7rule.py
Gregory Thiemonge 173f024b59 Fix bug when rolling back prov and op status for some API calls
POST l7rule API call and PUT pool API call had a bug that might have
kept a load balancer in provisioning_status when the call was rejected
by the provider.

The code didn't use the right locked DB session to set the statuses and
then to rollback them when an exception was caught.

Story 2010210
Task 45947

Change-Id: I494ea357afc072360765d7a30a9488e08bfda52c
(cherry picked from commit fdcea400c6)
(cherry picked from commit 3e6f180b77)
(cherry picked from commit 81cb9d5efc)
2022-08-23 17:08:21 +02:00

1323 lines
60 KiB
Python

# Copyright 2016 Blue Box, an IBM Company
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from oslo_config import cfg
from oslo_config import fixture as oslo_fixture
from oslo_utils import uuidutils
from octavia.common import constants
import octavia.common.context
from octavia.common import data_models
from octavia.common import exceptions
from octavia.db import repositories
from octavia.tests.functional.api.v2 import base
class TestL7Rule(base.BaseAPITest):
root_tag = 'rule'
root_tag_list = 'rules'
root_tag_links = 'rules_links'
def setUp(self):
super().setUp()
self.lb = self.create_load_balancer(uuidutils.generate_uuid())
self.lb_id = self.lb.get('loadbalancer').get('id')
self.project_id = self.lb.get('loadbalancer').get('project_id')
self.set_lb_status(self.lb_id)
self.listener = self.create_listener(
constants.PROTOCOL_HTTP, 80, lb_id=self.lb_id)
self.listener_id = self.listener.get('listener').get('id')
self.set_lb_status(self.lb_id)
self.l7policy = self.create_l7policy(
self.listener_id, constants.L7POLICY_ACTION_REJECT)
self.l7policy_id = self.l7policy.get('l7policy').get('id')
self.set_lb_status(self.lb_id)
self.l7rules_path = self.L7RULES_PATH.format(
l7policy_id=self.l7policy_id)
self.l7rule_path = self.l7rules_path + '/{l7rule_id}'
self.l7policy_repo = repositories.L7PolicyRepository()
def test_get(self):
l7rule = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api', tags=['test_tag']).get(self.root_tag)
response = self.get(self.l7rule_path.format(
l7rule_id=l7rule.get('id'))).json.get(self.root_tag)
self.assertEqual(l7rule, response)
def test_get_authorized(self):
l7rule = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api').get(self.root_tag)
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
auth_strategy = self.conf.conf.api_settings.get('auth_strategy')
self.conf.config(group='api_settings', auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
self.project_id):
override_credentials = {
'service_user_id': None,
'user_domain_id': None,
'is_admin_project': True,
'service_project_domain_id': None,
'service_project_id': None,
'roles': ['load-balancer_member'],
'user_id': None,
'is_admin': False,
'service_user_domain_id': None,
'project_domain_id': None,
'service_roles': [],
'project_id': self.project_id}
with mock.patch(
"oslo_context.context.RequestContext.to_policy_values",
return_value=override_credentials):
response = self.get(self.l7rule_path.format(
l7rule_id=l7rule.get('id'))).json.get(self.root_tag)
self.conf.config(group='api_settings', auth_strategy=auth_strategy)
self.assertEqual(l7rule, response)
def test_get_not_authorized(self):
l7rule = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api').get(self.root_tag)
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
auth_strategy = self.conf.conf.api_settings.get('auth_strategy')
self.conf.config(group='api_settings', auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
self.project_id):
response = self.get(self.l7rule_path.format(
l7rule_id=l7rule.get('id')), status=403).json
self.conf.config(group='api_settings', auth_strategy=auth_strategy)
self.assertEqual(self.NOT_AUTHORIZED_BODY, response)
def test_get_deleted_gives_404(self):
api_l7rule = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api').get(self.root_tag)
self.set_object_status(self.l7rule_repo, api_l7rule.get('id'),
provisioning_status=constants.DELETED)
self.get(self.l7rule_path.format(l7rule_id=api_l7rule.get('id')),
status=404)
def test_get_bad_parent_policy(self):
bad_path = (self.L7RULES_PATH.format(
lb_id=self.lb_id, listener_id=self.listener_id,
l7policy_id=uuidutils.generate_uuid()) + '/' +
uuidutils.generate_uuid())
self.get(bad_path, status=404)
def test_bad_get(self):
self.get(self.l7rule_path.format(
l7rule_id=uuidutils.generate_uuid()), status=404)
def test_get_all(self):
api_l7r_a = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api', tags=['test_tag1']).get(self.root_tag)
self.set_lb_status(self.lb_id)
api_l7r_b = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_COOKIE,
constants.L7RULE_COMPARE_TYPE_CONTAINS, 'some-value',
key='some-cookie', tags=['test_tag2']).get(self.root_tag)
self.set_lb_status(self.lb_id)
rules = self.get(self.l7rules_path).json.get(self.root_tag_list)
self.assertIsInstance(rules, list)
self.assertEqual(2, len(rules))
rule_id_types = [(r.get('id'), r.get('type'),
r['tags']) for r in rules]
self.assertIn((api_l7r_a.get('id'), api_l7r_a.get('type'),
api_l7r_a['tags']),
rule_id_types)
self.assertIn((api_l7r_b.get('id'), api_l7r_b.get('type'),
api_l7r_b['tags']),
rule_id_types)
def test_get_all_authorized(self):
api_l7r_a = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api').get(self.root_tag)
self.set_lb_status(self.lb_id)
api_l7r_b = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_COOKIE,
constants.L7RULE_COMPARE_TYPE_CONTAINS, 'some-value',
key='some-cookie').get(self.root_tag)
self.set_lb_status(self.lb_id)
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
auth_strategy = self.conf.conf.api_settings.get('auth_strategy')
self.conf.config(group='api_settings', auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
self.project_id):
override_credentials = {
'service_user_id': None,
'user_domain_id': None,
'is_admin_project': True,
'service_project_domain_id': None,
'service_project_id': None,
'roles': ['load-balancer_member'],
'user_id': None,
'is_admin': False,
'service_user_domain_id': None,
'project_domain_id': None,
'service_roles': [],
'project_id': self.project_id}
with mock.patch(
"oslo_context.context.RequestContext.to_policy_values",
return_value=override_credentials):
rules = self.get(
self.l7rules_path).json.get(self.root_tag_list)
self.conf.config(group='api_settings', auth_strategy=auth_strategy)
self.assertIsInstance(rules, list)
self.assertEqual(2, len(rules))
rule_id_types = [(r.get('id'), r.get('type')) for r in rules]
self.assertIn((api_l7r_a.get('id'), api_l7r_a.get('type')),
rule_id_types)
self.assertIn((api_l7r_b.get('id'), api_l7r_b.get('type')),
rule_id_types)
def test_get_all_unscoped_token(self):
self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api').get(self.root_tag)
self.set_lb_status(self.lb_id)
self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_COOKIE,
constants.L7RULE_COMPARE_TYPE_CONTAINS, 'some-value',
key='some-cookie').get(self.root_tag)
self.set_lb_status(self.lb_id)
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
auth_strategy = self.conf.conf.api_settings.get('auth_strategy')
self.conf.config(group='api_settings', auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
None):
override_credentials = {
'service_user_id': None,
'user_domain_id': None,
'is_admin_project': True,
'service_project_domain_id': None,
'service_project_id': None,
'roles': ['load-balancer_member'],
'user_id': None,
'is_admin': False,
'service_user_domain_id': None,
'project_domain_id': None,
'service_roles': [],
'project_id': None}
with mock.patch(
"oslo_context.context.RequestContext.to_policy_values",
return_value=override_credentials):
result = self.get(self.l7rules_path, status=403).json
self.conf.config(group='api_settings', auth_strategy=auth_strategy)
self.assertEqual(self.NOT_AUTHORIZED_BODY, result)
def test_get_all_not_authorized(self):
self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api').get(self.root_tag)
self.set_lb_status(self.lb_id)
self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_COOKIE,
constants.L7RULE_COMPARE_TYPE_CONTAINS, 'some-value',
key='some-cookie').get(self.root_tag)
self.set_lb_status(self.lb_id)
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
auth_strategy = self.conf.conf.api_settings.get('auth_strategy')
self.conf.config(group='api_settings', auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
self.project_id):
rules = self.get(self.l7rules_path, status=403)
self.conf.config(group='api_settings', auth_strategy=auth_strategy)
self.assertEqual(self.NOT_AUTHORIZED_BODY, rules.json)
def test_get_all_sorted(self):
self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api').get(self.root_tag)
self.set_lb_status(self.lb_id)
self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_COOKIE,
constants.L7RULE_COMPARE_TYPE_CONTAINS, 'some-value',
key='some-cookie').get(self.root_tag)
self.set_lb_status(self.lb_id)
self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_HOST_NAME,
constants.L7RULE_COMPARE_TYPE_EQUAL_TO,
'www.example.com').get(self.root_tag)
self.set_lb_status(self.lb_id)
response = self.get(self.l7rules_path,
params={'sort': 'type:desc'})
rules_desc = response.json.get(self.root_tag_list)
response = self.get(self.l7rules_path,
params={'sort': 'type:asc'})
rules_asc = response.json.get(self.root_tag_list)
self.assertEqual(3, len(rules_desc))
self.assertEqual(3, len(rules_asc))
rule_id_types_desc = [(rule.get('id'), rule.get('type'))
for rule in rules_desc]
rule_id_types_asc = [(rule.get('id'), rule.get('type'))
for rule in rules_asc]
self.assertEqual(rule_id_types_asc,
list(reversed(rule_id_types_desc)))
def test_get_all_limited(self):
self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api').get(self.root_tag)
self.set_lb_status(self.lb_id)
self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_COOKIE,
constants.L7RULE_COMPARE_TYPE_CONTAINS, 'some-value',
key='some-cookie').get(self.root_tag)
self.set_lb_status(self.lb_id)
self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_HOST_NAME,
constants.L7RULE_COMPARE_TYPE_EQUAL_TO,
'www.example.com').get(self.root_tag)
self.set_lb_status(self.lb_id)
# First two -- should have 'next' link
first_two = self.get(self.l7rules_path, params={'limit': 2}).json
objs = first_two[self.root_tag_list]
links = first_two[self.root_tag_links]
self.assertEqual(2, len(objs))
self.assertEqual(1, len(links))
self.assertEqual('next', links[0]['rel'])
# Third + off the end -- should have previous link
third = self.get(self.l7rules_path, params={
'limit': 2,
'marker': first_two[self.root_tag_list][1]['id']}).json
objs = third[self.root_tag_list]
links = third[self.root_tag_links]
self.assertEqual(1, len(objs))
self.assertEqual(1, len(links))
self.assertEqual('previous', links[0]['rel'])
# Middle -- should have both links
middle = self.get(self.l7rules_path, params={
'limit': 1,
'marker': first_two[self.root_tag_list][0]['id']}).json
objs = middle[self.root_tag_list]
links = middle[self.root_tag_links]
self.assertEqual(1, len(objs))
self.assertEqual(2, len(links))
self.assertCountEqual(['previous', 'next'],
[link['rel'] for link in links])
def test_get_all_fields_filter(self):
self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api').get(self.root_tag)
self.set_lb_status(self.lb_id)
self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_COOKIE,
constants.L7RULE_COMPARE_TYPE_CONTAINS, 'some-value',
key='some-cookie').get(self.root_tag)
self.set_lb_status(self.lb_id)
self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_HOST_NAME,
constants.L7RULE_COMPARE_TYPE_EQUAL_TO,
'www.example.com').get(self.root_tag)
self.set_lb_status(self.lb_id)
l7rus = self.get(self.l7rules_path, params={
'fields': ['id', 'compare_type']}).json
for l7ru in l7rus['rules']:
self.assertIn(u'id', l7ru)
self.assertIn(u'compare_type', l7ru)
self.assertNotIn(u'project_id', l7ru)
def test_get_one_fields_filter(self):
l7r1 = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api').get(self.root_tag)
self.set_lb_status(self.lb_id)
l7ru = self.get(
self.l7rule_path.format(l7rule_id=l7r1.get('id')),
params={'fields': ['id', 'compare_type']}).json.get(self.root_tag)
self.assertIn(u'id', l7ru)
self.assertIn(u'compare_type', l7ru)
self.assertNotIn(u'project_id', l7ru)
def test_get_all_filter(self):
ru1 = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api').get(self.root_tag)
self.set_lb_status(self.lb_id)
self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_COOKIE,
constants.L7RULE_COMPARE_TYPE_CONTAINS, 'some-value',
key='some-cookie').get(self.root_tag)
self.set_lb_status(self.lb_id)
self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_HOST_NAME,
constants.L7RULE_COMPARE_TYPE_EQUAL_TO,
'www.example.com').get(self.root_tag)
self.set_lb_status(self.lb_id)
l7rus = self.get(self.l7rules_path, params={
'id': ru1['id']}).json
self.assertEqual(1, len(l7rus['rules']))
self.assertEqual(ru1['id'],
l7rus['rules'][0]['id'])
def test_get_all_tags_filter(self):
rule1 = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api', tags=['test_tag1', 'test_tag2']).get(self.root_tag)
self.set_lb_status(self.lb_id)
rule2 = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_COOKIE,
constants.L7RULE_COMPARE_TYPE_CONTAINS, 'some-value',
key='some-cookie',
tags=['test_tag2', 'test_tag3']).get(self.root_tag)
self.set_lb_status(self.lb_id)
rule3 = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_HOST_NAME,
constants.L7RULE_COMPARE_TYPE_EQUAL_TO,
'www.example.com',
tags=['test_tag4', 'test_tag5']).get(self.root_tag)
self.set_lb_status(self.lb_id)
rules = self.get(
self.l7rules_path,
params={'tags': 'test_tag2'}
).json.get(self.root_tag_list)
self.assertIsInstance(rules, list)
self.assertEqual(2, len(rules))
self.assertEqual(
[rule1.get('id'), rule2.get('id')],
[rule.get('id') for rule in rules]
)
rules = self.get(
self.l7rules_path,
params={'tags': ['test_tag2', 'test_tag3']}
).json.get(self.root_tag_list)
self.assertIsInstance(rules, list)
self.assertEqual(1, len(rules))
self.assertEqual(
[rule2.get('id')],
[rule.get('id') for rule in rules]
)
rules = self.get(
self.l7rules_path,
params={'tags-any': 'test_tag2'}
).json.get(self.root_tag_list)
self.assertIsInstance(rules, list)
self.assertEqual(2, len(rules))
self.assertEqual(
[rule1.get('id'), rule2.get('id')],
[rule.get('id') for rule in rules]
)
rules = self.get(
self.l7rules_path,
params={'not-tags': 'test_tag2'}
).json.get(self.root_tag_list)
self.assertIsInstance(rules, list)
self.assertEqual(1, len(rules))
self.assertEqual(
[rule3.get('id')],
[rule.get('id') for rule in rules]
)
rules = self.get(
self.l7rules_path,
params={'not-tags-any': ['test_tag2', 'test_tag4']}
).json.get(self.root_tag_list)
self.assertIsInstance(rules, list)
self.assertEqual(0, len(rules))
rules = self.get(
self.l7rules_path,
params={'tags': 'test_tag2',
'tags-any': ['test_tag1', 'test_tag3']}
).json.get(self.root_tag_list)
self.assertIsInstance(rules, list)
self.assertEqual(2, len(rules))
self.assertEqual(
[rule1.get('id'), rule2.get('id')],
[rule.get('id') for rule in rules]
)
rules = self.get(
self.l7rules_path,
params={'tags': 'test_tag2', 'not-tags': 'test_tag2'}
).json.get(self.root_tag_list)
self.assertIsInstance(rules, list)
self.assertEqual(0, len(rules))
def test_empty_get_all(self):
response = self.get(self.l7rules_path).json.get(self.root_tag_list)
self.assertIsInstance(response, list)
self.assertEqual(0, len(response))
def test_get_all_hides_deleted(self):
api_l7rule = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api').get(self.root_tag)
response = self.get(self.l7rules_path)
objects = response.json.get(self.root_tag_list)
self.assertEqual(len(objects), 1)
self.set_object_status(self.l7rule_repo, api_l7rule.get('id'),
provisioning_status=constants.DELETED)
response = self.get(self.l7rules_path)
objects = response.json.get(self.root_tag_list)
self.assertEqual(len(objects), 0)
def test_create_host_name_rule(self):
api_l7rule = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_HOST_NAME,
constants.L7RULE_COMPARE_TYPE_EQUAL_TO,
'www.example.com').get(self.root_tag)
self.assertEqual(constants.L7RULE_TYPE_HOST_NAME,
api_l7rule.get('type'))
self.assertEqual(constants.L7RULE_COMPARE_TYPE_EQUAL_TO,
api_l7rule.get('compare_type'))
self.assertEqual('www.example.com', api_l7rule.get('value'))
self.assertIsNone(api_l7rule.get('key'))
self.assertFalse(api_l7rule.get('invert'))
self.assert_correct_status(
lb_id=self.lb_id, listener_id=self.listener_id,
l7policy_id=self.l7policy_id, l7rule_id=api_l7rule.get('id'),
lb_prov_status=constants.PENDING_UPDATE,
listener_prov_status=constants.PENDING_UPDATE,
l7policy_prov_status=constants.PENDING_UPDATE,
l7rule_prov_status=constants.PENDING_CREATE,
l7rule_op_status=constants.OFFLINE)
def test_create_rule_authorized(self):
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
auth_strategy = self.conf.conf.api_settings.get('auth_strategy')
self.conf.config(group='api_settings', auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
self.project_id):
override_credentials = {
'service_user_id': None,
'user_domain_id': None,
'is_admin_project': True,
'service_project_domain_id': None,
'service_project_id': None,
'roles': ['load-balancer_member'],
'user_id': None,
'is_admin': False,
'service_user_domain_id': None,
'project_domain_id': None,
'service_roles': [],
'project_id': self.project_id}
with mock.patch(
"oslo_context.context.RequestContext.to_policy_values",
return_value=override_credentials):
api_l7rule = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_HOST_NAME,
constants.L7RULE_COMPARE_TYPE_EQUAL_TO,
'www.example.com').get(self.root_tag)
self.conf.config(group='api_settings', auth_strategy=auth_strategy)
self.assertEqual(constants.L7RULE_TYPE_HOST_NAME,
api_l7rule.get('type'))
self.assertEqual(constants.L7RULE_COMPARE_TYPE_EQUAL_TO,
api_l7rule.get('compare_type'))
self.assertEqual('www.example.com', api_l7rule.get('value'))
self.assertIsNone(api_l7rule.get('key'))
self.assertFalse(api_l7rule.get('invert'))
self.assert_correct_status(
lb_id=self.lb_id, listener_id=self.listener_id,
l7policy_id=self.l7policy_id, l7rule_id=api_l7rule.get('id'),
lb_prov_status=constants.PENDING_UPDATE,
listener_prov_status=constants.PENDING_UPDATE,
l7policy_prov_status=constants.PENDING_UPDATE,
l7rule_prov_status=constants.PENDING_CREATE,
l7rule_op_status=constants.OFFLINE)
def test_create_rule_not_authorized(self):
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
auth_strategy = self.conf.conf.api_settings.get('auth_strategy')
self.conf.config(group='api_settings', auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
self.project_id):
api_l7rule = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_HOST_NAME,
constants.L7RULE_COMPARE_TYPE_EQUAL_TO,
'www.example.com', status=403)
self.conf.config(group='api_settings', auth_strategy=auth_strategy)
self.assertEqual(self.NOT_AUTHORIZED_BODY, api_l7rule)
def test_create_l7policy_in_error(self):
l7policy = self.create_l7policy(
self.listener_id, constants.L7POLICY_ACTION_REJECT)
l7policy_id = l7policy.get('l7policy').get('id')
self.set_lb_status(self.lb_id)
self.set_object_status(self.l7policy_repo, l7policy_id,
provisioning_status=constants.ERROR)
api_l7rule = self.create_l7rule(
l7policy_id, constants.L7RULE_TYPE_HOST_NAME,
constants.L7RULE_COMPARE_TYPE_EQUAL_TO,
'www.example.com', status=409)
ref_msg = ('L7Policy %s is immutable and cannot be updated.' %
l7policy_id)
self.assertEqual(ref_msg, api_l7rule.get('faultstring'))
def test_create_path_rule(self):
api_l7rule = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH, '/api',
invert=True).get(self.root_tag)
self.assertEqual(constants.L7RULE_TYPE_PATH, api_l7rule.get('type'))
self.assertEqual(constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
api_l7rule.get('compare_type'))
self.assertEqual('/api', api_l7rule.get('value'))
self.assertIsNone(api_l7rule.get('key'))
self.assertTrue(api_l7rule.get('invert'))
self.assert_correct_status(
lb_id=self.lb_id, listener_id=self.listener_id,
l7policy_id=self.l7policy_id, l7rule_id=api_l7rule.get('id'),
lb_prov_status=constants.PENDING_UPDATE,
listener_prov_status=constants.PENDING_UPDATE,
l7policy_prov_status=constants.PENDING_UPDATE,
l7rule_prov_status=constants.PENDING_CREATE,
l7rule_op_status=constants.OFFLINE)
def test_create_file_type_rule(self):
api_l7rule = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_FILE_TYPE,
constants.L7RULE_COMPARE_TYPE_REGEX, 'jpg|png').get(self.root_tag)
self.assertEqual(constants.L7RULE_TYPE_FILE_TYPE,
api_l7rule.get('type'))
self.assertEqual(constants.L7RULE_COMPARE_TYPE_REGEX,
api_l7rule.get('compare_type'))
self.assertEqual('jpg|png', api_l7rule.get('value'))
self.assertIsNone(api_l7rule.get('key'))
self.assertFalse(api_l7rule.get('invert'))
self.assert_correct_status(
lb_id=self.lb_id, listener_id=self.listener_id,
l7policy_id=self.l7policy_id, l7rule_id=api_l7rule.get('id'),
lb_prov_status=constants.PENDING_UPDATE,
listener_prov_status=constants.PENDING_UPDATE,
l7policy_prov_status=constants.PENDING_UPDATE,
l7rule_prov_status=constants.PENDING_CREATE,
l7rule_op_status=constants.OFFLINE)
def test_create_header_rule(self):
api_l7rule = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_HEADER,
constants.L7RULE_COMPARE_TYPE_ENDS_WITH, '"some string"',
key='Some-header').get(self.root_tag)
self.assertEqual(constants.L7RULE_TYPE_HEADER, api_l7rule.get('type'))
self.assertEqual(constants.L7RULE_COMPARE_TYPE_ENDS_WITH,
api_l7rule.get('compare_type'))
self.assertEqual('"some string"', api_l7rule.get('value'))
self.assertEqual('Some-header', api_l7rule.get('key'))
self.assertFalse(api_l7rule.get('invert'))
self.assert_correct_status(
lb_id=self.lb_id, listener_id=self.listener_id,
l7policy_id=self.l7policy_id, l7rule_id=api_l7rule.get('id'),
lb_prov_status=constants.PENDING_UPDATE,
listener_prov_status=constants.PENDING_UPDATE,
l7policy_prov_status=constants.PENDING_UPDATE,
l7rule_prov_status=constants.PENDING_CREATE,
l7rule_op_status=constants.OFFLINE)
def test_create_cookie_rule(self):
api_l7rule = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_COOKIE,
constants.L7RULE_COMPARE_TYPE_CONTAINS, 'some-value',
key='some-cookie').get(self.root_tag)
self.assertEqual(constants.L7RULE_TYPE_COOKIE, api_l7rule.get('type'))
self.assertEqual(constants.L7RULE_COMPARE_TYPE_CONTAINS,
api_l7rule.get('compare_type'))
self.assertEqual('some-value', api_l7rule.get('value'))
self.assertEqual('some-cookie', api_l7rule.get('key'))
self.assertFalse(api_l7rule.get('invert'))
self.assert_correct_status(
lb_id=self.lb_id, listener_id=self.listener_id,
l7policy_id=self.l7policy_id, l7rule_id=api_l7rule.get('id'),
lb_prov_status=constants.PENDING_UPDATE,
listener_prov_status=constants.PENDING_UPDATE,
l7policy_prov_status=constants.PENDING_UPDATE,
l7rule_prov_status=constants.PENDING_CREATE,
l7rule_op_status=constants.OFFLINE)
@mock.patch('octavia.common.constants.MAX_L7RULES_PER_L7POLICY', new=2)
def test_create_too_many_rules(self):
for i in range(0, constants.MAX_L7RULES_PER_L7POLICY):
self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api').get(self.root_tag)
self.set_lb_status(self.lb_id)
body = {'type': constants.L7RULE_TYPE_PATH,
'compare_type': constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'value': '/api'}
self.post(self.l7rules_path, self._build_body(body), status=409)
def test_bad_create(self):
l7rule = {'name': 'test1'}
self.post(self.l7rules_path, self._build_body(l7rule), status=400)
def test_bad_create_host_name_rule(self):
l7rule = {'type': constants.L7RULE_TYPE_HOST_NAME,
'compare_type': constants.L7RULE_COMPARE_TYPE_STARTS_WITH}
self.post(self.l7rules_path, self._build_body(l7rule), status=400)
def test_bad_create_path_rule(self):
l7rule = {'type': constants.L7RULE_TYPE_PATH,
'compare_type': constants.L7RULE_COMPARE_TYPE_REGEX,
'value': 'bad string\\'}
self.post(self.l7rules_path, self._build_body(l7rule), status=400)
def test_bad_create_file_type_rule(self):
l7rule = {'type': constants.L7RULE_TYPE_FILE_TYPE,
'compare_type': constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'value': 'png'}
self.post(self.l7rules_path, self._build_body(l7rule), status=400)
def test_bad_create_header_rule(self):
l7rule = {'type': constants.L7RULE_TYPE_HEADER,
'compare_type': constants.L7RULE_COMPARE_TYPE_CONTAINS,
'value': 'some-string'}
self.post(self.l7rules_path, self._build_body(l7rule), status=400)
def test_bad_create_cookie_rule(self):
l7rule = {'type': constants.L7RULE_TYPE_COOKIE,
'compare_type': constants.L7RULE_COMPARE_TYPE_EQUAL_TO,
'key': 'bad cookie name',
'value': 'some-string'}
self.post(self.l7rules_path, self._build_body(l7rule), status=400)
@mock.patch('octavia.api.drivers.utils.call_provider')
def test_create_with_bad_provider(self, mock_provider):
mock_provider.side_effect = exceptions.ProviderDriverError(
prov='bad_driver', user_msg='broken')
l7rule = {'compare_type': 'REGEX',
'invert': False,
'type': 'PATH',
'value': '/images*',
'admin_state_up': True}
response = self.post(self.l7rules_path, self._build_body(l7rule),
status=500)
self.assertIn('Provider \'bad_driver\' reports error: broken',
response.json.get('faultstring'))
def test_create_with_ssl_rule_types(self):
test_mapping = {
constants.L7RULE_TYPE_SSL_CONN_HAS_CERT: {
'value': 'tRuE',
'compare_type': constants.L7RULE_COMPARE_TYPE_EQUAL_TO},
constants.L7RULE_TYPE_SSL_VERIFY_RESULT: {
'value': '0',
'compare_type': constants.L7RULE_COMPARE_TYPE_EQUAL_TO},
constants.L7RULE_TYPE_SSL_DN_FIELD: {
'key': 'st-1', 'value': 'ST-FIELD1-PREFIX',
'compare_type': constants.L7RULE_COMPARE_TYPE_STARTS_WITH}
}
for l7rule_type, test_body in test_mapping.items():
self.set_lb_status(self.lb_id)
test_body.update({'type': l7rule_type})
api_l7rule = self.create_l7rule(
self.l7policy_id, l7rule_type,
test_body['compare_type'], test_body['value'],
key=test_body.get('key')).get(self.root_tag)
self.assertEqual(l7rule_type, api_l7rule.get('type'))
self.assertEqual(test_body['compare_type'],
api_l7rule.get('compare_type'))
self.assertEqual(test_body['value'], api_l7rule.get('value'))
if test_body.get('key'):
self.assertEqual(test_body['key'], api_l7rule.get('key'))
self.assertFalse(api_l7rule.get('invert'))
self.assert_correct_status(
lb_id=self.lb_id, listener_id=self.listener_id,
l7policy_id=self.l7policy_id, l7rule_id=api_l7rule.get('id'),
lb_prov_status=constants.PENDING_UPDATE,
listener_prov_status=constants.PENDING_UPDATE,
l7policy_prov_status=constants.PENDING_UPDATE,
l7rule_prov_status=constants.PENDING_CREATE,
l7rule_op_status=constants.OFFLINE)
def _test_bad_cases_with_ssl_rule_types(self, is_create=True,
rule_id=None):
if is_create:
req_func = self.post
first_req_arg = self.l7rules_path
else:
req_func = self.put
first_req_arg = self.l7rule_path.format(l7rule_id=rule_id)
# test bad cases of L7RULE_TYPE_SSL_CONN_HAS_CERT
l7rule = {'compare_type': constants.L7RULE_COMPARE_TYPE_EQUAL_TO,
'invert': False,
'type': constants.L7RULE_TYPE_SSL_CONN_HAS_CERT,
'value': 'true',
'admin_state_up': True,
'key': 'no-need-key'}
response = req_func(first_req_arg, self._build_body(l7rule),
status=400).json
self.assertIn('L7rule type {0} does not use the "key" field.'.format(
constants.L7RULE_TYPE_SSL_CONN_HAS_CERT),
response.get('faultstring'))
l7rule.pop('key')
l7rule['value'] = 'not-true-string'
response = req_func(first_req_arg, self._build_body(l7rule),
status=400).json
self.assertIn(
'L7rule value {0} is not a boolean True string.'.format(
l7rule['value']), response.get('faultstring'))
l7rule['value'] = 'tRUe'
l7rule['compare_type'] = constants.L7RULE_COMPARE_TYPE_STARTS_WITH
response = req_func(first_req_arg, self._build_body(l7rule),
status=400).json
self.assertIn(
'L7rule type {0} only supports the {1} compare type.'.format(
constants.L7RULE_TYPE_SSL_CONN_HAS_CERT,
constants.L7RULE_COMPARE_TYPE_EQUAL_TO),
response.get('faultstring'))
# test bad cases of L7RULE_TYPE_SSL_VERIFY_RES
l7rule = {'compare_type': constants.L7RULE_COMPARE_TYPE_EQUAL_TO,
'invert': False,
'type': constants.L7RULE_TYPE_SSL_VERIFY_RESULT,
'value': 'true',
'admin_state_up': True,
'key': 'no-need-key'}
response = req_func(first_req_arg, self._build_body(l7rule),
status=400).json
self.assertIn(
'L7rule type {0} does not use the "key" field.'.format(
l7rule['type']), response.get('faultstring'))
l7rule.pop('key')
response = req_func(first_req_arg, self._build_body(l7rule),
status=400).json
self.assertIn(
'L7rule type {0} needs a int value, which is >= 0'.format(
l7rule['type']), response.get('faultstring'))
l7rule['value'] = '0'
l7rule['compare_type'] = constants.L7RULE_COMPARE_TYPE_STARTS_WITH
response = req_func(first_req_arg, self._build_body(l7rule),
status=400).json
self.assertIn(
'L7rule type {0} only supports the {1} compare type.'.format(
l7rule['type'], constants.L7RULE_COMPARE_TYPE_EQUAL_TO),
response.get('faultstring'))
# test bad cases of L7RULE_TYPE_SSL_DN_FIELD
l7rule = {'compare_type': constants.L7RULE_COMPARE_TYPE_REGEX,
'invert': False,
'type': constants.L7RULE_TYPE_SSL_DN_FIELD,
'value': 'bad regex\\',
'admin_state_up': True}
# This case just test that fail to parse the regex from the value
req_func(first_req_arg, self._build_body(l7rule), status=400).json
l7rule['value'] = '^.test*$'
response = req_func(first_req_arg, self._build_body(l7rule),
status=400).json
self.assertIn(
'L7rule type {0} needs to specify a key and a value.'.format(
l7rule['type']), response.get('faultstring'))
l7rule['key'] = 'NOT_SUPPORTED_DN_FIELD'
response = req_func(first_req_arg, self._build_body(l7rule),
status=400).json
self.assertIn('Invalid L7rule distinguished name field.',
response.get('faultstring'))
def test_create_bad_cases_with_ssl_rule_types(self):
self._test_bad_cases_with_ssl_rule_types()
def test_create_over_quota(self):
self.start_quota_mock(data_models.L7Rule)
l7rule = {'compare_type': 'REGEX',
'invert': False,
'type': 'PATH',
'value': '/images*',
'admin_state_up': True}
self.post(self.l7rules_path, self._build_body(l7rule), status=403)
def test_update(self):
api_l7rule = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api', tags=['old_tag']).get(self.root_tag)
self.set_lb_status(self.lb_id)
new_l7rule = {'value': '/images', 'tags': ['new_tag']}
response = self.put(self.l7rule_path.format(
l7rule_id=api_l7rule.get('id')),
self._build_body(new_l7rule)).json.get(self.root_tag)
self.assertEqual('/images', response.get('value'))
self.assertEqual(['new_tag'], response['tags'])
self.assert_correct_status(
lb_id=self.lb_id, listener_id=self.listener_id,
l7policy_id=self.l7policy_id, l7rule_id=api_l7rule.get('id'),
lb_prov_status=constants.PENDING_UPDATE,
listener_prov_status=constants.PENDING_UPDATE,
l7policy_prov_status=constants.PENDING_UPDATE,
l7rule_prov_status=constants.PENDING_UPDATE)
def test_update_authorized(self):
api_l7rule = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api').get(self.root_tag)
self.set_lb_status(self.lb_id)
new_l7rule = {'value': '/images'}
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
auth_strategy = self.conf.conf.api_settings.get('auth_strategy')
self.conf.config(group='api_settings', auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
self.project_id):
override_credentials = {
'service_user_id': None,
'user_domain_id': None,
'is_admin_project': True,
'service_project_domain_id': None,
'service_project_id': None,
'roles': ['load-balancer_member'],
'user_id': None,
'is_admin': False,
'service_user_domain_id': None,
'project_domain_id': None,
'service_roles': [],
'project_id': self.project_id}
with mock.patch(
"oslo_context.context.RequestContext.to_policy_values",
return_value=override_credentials):
response = self.put(self.l7rule_path.format(
l7rule_id=api_l7rule.get('id')),
self._build_body(new_l7rule)).json.get(self.root_tag)
self.conf.config(group='api_settings', auth_strategy=auth_strategy)
self.assertEqual('/images', response.get('value'))
self.assert_correct_status(
lb_id=self.lb_id, listener_id=self.listener_id,
l7policy_id=self.l7policy_id, l7rule_id=api_l7rule.get('id'),
lb_prov_status=constants.PENDING_UPDATE,
listener_prov_status=constants.PENDING_UPDATE,
l7policy_prov_status=constants.PENDING_UPDATE,
l7rule_prov_status=constants.PENDING_UPDATE)
def test_update_not_authorized(self):
api_l7rule = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api').get(self.root_tag)
self.set_lb_status(self.lb_id)
new_l7rule = {'value': '/images'}
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
auth_strategy = self.conf.conf.api_settings.get('auth_strategy')
self.conf.config(group='api_settings', auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
self.project_id):
response = self.put(self.l7rule_path.format(
l7rule_id=api_l7rule.get('id')),
self._build_body(new_l7rule), status=403)
self.conf.config(group='api_settings', auth_strategy=auth_strategy)
self.assertEqual(self.NOT_AUTHORIZED_BODY, response.json)
self.assert_correct_status(
lb_id=self.lb_id, listener_id=self.listener_id,
l7policy_id=self.l7policy_id, l7rule_id=api_l7rule.get('id'),
lb_prov_status=constants.ACTIVE,
listener_prov_status=constants.ACTIVE,
l7policy_prov_status=constants.ACTIVE,
l7rule_prov_status=constants.ACTIVE)
def test_bad_update(self):
l7rule = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api').get(self.root_tag)
new_l7rule = {'type': 'bad type'}
self.put(self.l7rule_path.format(l7rule_id=l7rule.get('id')),
self._build_body(new_l7rule), status=400)
@mock.patch('octavia.api.drivers.utils.call_provider')
def test_update_with_bad_provider(self, mock_provider):
api_l7rule = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api').get(self.root_tag)
self.set_lb_status(self.lb_id)
new_l7rule = {'value': '/images'}
mock_provider.side_effect = exceptions.ProviderDriverError(
prov='bad_driver', user_msg='broken')
response = self.put(
self.l7rule_path.format(l7rule_id=api_l7rule.get('id')),
self._build_body(new_l7rule), status=500)
self.assertIn('Provider \'bad_driver\' reports error: broken',
response.json.get('faultstring'))
def test_update_with_invalid_rule(self):
api_l7rule = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api').get(self.root_tag)
self.set_lb_status(self.lb_id)
new_l7rule = {'compare_type': constants.L7RULE_COMPARE_TYPE_REGEX,
'value': 'bad string\\'}
self.put(self.l7rule_path.format(
l7rule_id=api_l7rule.get('id')), self._build_body(new_l7rule),
status=400)
self.assert_correct_status(
lb_id=self.lb_id, listener_id=self.listener_id,
l7policy_id=self.l7policy_id, l7rule_id=api_l7rule.get('id'),
l7rule_prov_status=constants.ACTIVE)
def test_update_with_ssl_rule_types(self):
test_mapping = {
constants.L7RULE_TYPE_SSL_CONN_HAS_CERT: {
'value': 'tRuE',
'compare_type': constants.L7RULE_COMPARE_TYPE_EQUAL_TO},
constants.L7RULE_TYPE_SSL_VERIFY_RESULT: {
'value': '0',
'compare_type': constants.L7RULE_COMPARE_TYPE_EQUAL_TO},
constants.L7RULE_TYPE_SSL_DN_FIELD: {
'key': 'st-1', 'value': 'ST-FIELD1-PREFIX',
'compare_type': constants.L7RULE_COMPARE_TYPE_STARTS_WITH}
}
for l7rule_type, test_body in test_mapping.items():
self.set_lb_status(self.lb_id)
api_l7rule = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api').get(self.root_tag)
self.set_lb_status(self.lb_id)
test_body.update({'type': l7rule_type})
response = self.put(self.l7rule_path.format(
l7rule_id=api_l7rule.get('id')),
self._build_body(test_body)).json.get(self.root_tag)
self.assertEqual(l7rule_type, response.get('type'))
self.assertEqual(test_body['compare_type'],
response.get('compare_type'))
self.assertEqual(test_body['value'], response.get('value'))
if test_body.get('key'):
self.assertEqual(test_body['key'], response.get('key'))
self.assertFalse(response.get('invert'))
self.assert_correct_status(
lb_id=self.lb_id, listener_id=self.listener_id,
l7policy_id=self.l7policy_id, l7rule_id=response.get('id'),
lb_prov_status=constants.PENDING_UPDATE,
listener_prov_status=constants.PENDING_UPDATE,
l7policy_prov_status=constants.PENDING_UPDATE,
l7rule_prov_status=constants.PENDING_UPDATE)
def test_update_bad_cases_with_ssl_rule_types(self):
api_l7rule = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api').get(self.root_tag)
self._test_bad_cases_with_ssl_rule_types(
is_create=False, rule_id=api_l7rule.get('id'))
def test_update_invert_none(self):
api_l7rule = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api', tags=['old_tag'], invert=True).get(self.root_tag)
self.set_lb_status(self.lb_id)
new_l7rule = {'invert': None}
response = self.put(self.l7rule_path.format(
l7rule_id=api_l7rule.get('id')),
self._build_body(new_l7rule)).json.get(self.root_tag)
self.assertFalse(response.get('invert'))
def test_delete(self):
api_l7rule = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api').get(self.root_tag)
self.set_lb_status(self.lb_id)
# Set status to ACTIVE/ONLINE because set_lb_status did it in the db
api_l7rule['provisioning_status'] = constants.ACTIVE
api_l7rule['operating_status'] = constants.ONLINE
api_l7rule.pop('updated_at')
response = self.get(self.l7rule_path.format(
l7rule_id=api_l7rule.get('id'))).json.get(self.root_tag)
response.pop('updated_at')
self.assertEqual(api_l7rule, response)
self.delete(self.l7rule_path.format(l7rule_id=api_l7rule.get('id')))
self.assert_correct_status(
lb_id=self.lb_id, listener_id=self.listener_id,
l7policy_id=self.l7policy_id, l7rule_id=api_l7rule.get('id'),
lb_prov_status=constants.PENDING_UPDATE,
listener_prov_status=constants.PENDING_UPDATE,
l7policy_prov_status=constants.PENDING_UPDATE,
l7rule_prov_status=constants.PENDING_DELETE)
self.set_lb_status(self.lb_id)
def test_delete_authorized(self):
api_l7rule = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api').get(self.root_tag)
self.set_lb_status(self.lb_id)
# Set status to ACTIVE/ONLINE because set_lb_status did it in the db
api_l7rule['provisioning_status'] = constants.ACTIVE
api_l7rule['operating_status'] = constants.ONLINE
api_l7rule.pop('updated_at')
response = self.get(self.l7rule_path.format(
l7rule_id=api_l7rule.get('id'))).json.get(self.root_tag)
response.pop('updated_at')
self.assertEqual(api_l7rule, response)
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
auth_strategy = self.conf.conf.api_settings.get('auth_strategy')
self.conf.config(group='api_settings', auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
self.project_id):
override_credentials = {
'service_user_id': None,
'user_domain_id': None,
'is_admin_project': True,
'service_project_domain_id': None,
'service_project_id': None,
'roles': ['load-balancer_member'],
'user_id': None,
'is_admin': False,
'service_user_domain_id': None,
'project_domain_id': None,
'service_roles': [],
'project_id': self.project_id}
with mock.patch(
"oslo_context.context.RequestContext.to_policy_values",
return_value=override_credentials):
self.delete(
self.l7rule_path.format(l7rule_id=api_l7rule.get('id')))
self.conf.config(group='api_settings', auth_strategy=auth_strategy)
self.assert_correct_status(
lb_id=self.lb_id, listener_id=self.listener_id,
l7policy_id=self.l7policy_id, l7rule_id=api_l7rule.get('id'),
lb_prov_status=constants.PENDING_UPDATE,
listener_prov_status=constants.PENDING_UPDATE,
l7policy_prov_status=constants.PENDING_UPDATE,
l7rule_prov_status=constants.PENDING_DELETE)
self.set_lb_status(self.lb_id)
def test_delete_not_authorized(self):
api_l7rule = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api').get(self.root_tag)
self.set_lb_status(self.lb_id)
# Set status to ACTIVE/ONLINE because set_lb_status did it in the db
api_l7rule['provisioning_status'] = constants.ACTIVE
api_l7rule['operating_status'] = constants.ONLINE
api_l7rule.pop('updated_at')
response = self.get(self.l7rule_path.format(
l7rule_id=api_l7rule.get('id'))).json.get(self.root_tag)
response.pop('updated_at')
self.assertEqual(api_l7rule, response)
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
auth_strategy = self.conf.conf.api_settings.get('auth_strategy')
self.conf.config(group='api_settings', auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
self.project_id):
self.delete(
self.l7rule_path.format(l7rule_id=api_l7rule.get('id')),
status=403)
self.conf.config(group='api_settings', auth_strategy=auth_strategy)
self.assert_correct_status(
lb_id=self.lb_id, listener_id=self.listener_id,
l7policy_id=self.l7policy_id, l7rule_id=api_l7rule.get('id'),
lb_prov_status=constants.ACTIVE,
listener_prov_status=constants.ACTIVE,
l7policy_prov_status=constants.ACTIVE,
l7rule_prov_status=constants.ACTIVE)
def test_bad_delete(self):
self.delete(self.l7rule_path.format(
l7rule_id=uuidutils.generate_uuid()), status=404)
@mock.patch('octavia.api.drivers.utils.call_provider')
def test_delete_with_bad_provider(self, mock_provider):
api_l7rule = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api').get(self.root_tag)
self.set_lb_status(self.lb_id)
# Set status to ACTIVE/ONLINE because set_lb_status did it in the db
api_l7rule['provisioning_status'] = constants.ACTIVE
api_l7rule['operating_status'] = constants.ONLINE
response = self.get(self.l7rule_path.format(
l7rule_id=api_l7rule.get('id'))).json.get(self.root_tag)
self.assertIsNone(api_l7rule.pop('updated_at'))
self.assertIsNotNone(response.pop('updated_at'))
self.assertEqual(api_l7rule, response)
mock_provider.side_effect = exceptions.ProviderDriverError(
prov='bad_driver', user_msg='broken')
self.delete(self.l7rule_path.format(l7rule_id=api_l7rule.get('id')),
status=500)
def test_create_when_lb_pending_update(self):
self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api').get(self.root_tag)
self.set_lb_status(self.lb_id)
self.put(self.LB_PATH.format(lb_id=self.lb_id),
body={'loadbalancer': {'name': 'test_name_change'}})
new_l7rule = {'type': constants.L7RULE_TYPE_PATH,
'compare_type': constants.L7RULE_COMPARE_TYPE_EQUAL_TO,
'value': '/api'}
self.post(self.l7rules_path, body=self._build_body(new_l7rule),
status=409)
def test_update_when_lb_pending_update(self):
l7rule = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api').get(self.root_tag)
self.set_lb_status(self.lb_id)
self.put(self.LB_PATH.format(lb_id=self.lb_id),
body={'loadbalancer': {'name': 'test_name_change'}})
new_l7rule = {'type': constants.L7RULE_TYPE_HOST_NAME,
'compare_type': constants.L7RULE_COMPARE_TYPE_REGEX,
'value': '.*.example.com'}
self.put(self.l7rule_path.format(l7rule_id=l7rule.get('id')),
body=self._build_body(new_l7rule), status=409)
def test_delete_when_lb_pending_update(self):
l7rule = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api').get(self.root_tag)
self.set_lb_status(self.lb_id)
self.put(self.LB_PATH.format(lb_id=self.lb_id),
body={'loadbalancer': {'name': 'test_name_change'}})
self.delete(self.l7rule_path.format(l7rule_id=l7rule.get('id')),
status=409)
def test_create_when_lb_pending_delete(self):
self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api').get(self.root_tag)
self.set_lb_status(self.lb_id)
self.delete(self.LB_PATH.format(lb_id=self.lb_id),
params={'cascade': "true"})
new_l7rule = {'type': constants.L7RULE_TYPE_HEADER,
'compare_type':
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'value': 'some-string',
'key': 'Some-header'}
self.post(self.l7rules_path, body=self._build_body(new_l7rule),
status=409)
def test_update_when_lb_pending_delete(self):
l7rule = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api').get(self.root_tag)
self.set_lb_status(self.lb_id)
self.delete(self.LB_PATH.format(lb_id=self.lb_id),
params={'cascade': "true"})
new_l7rule = {'type': constants.L7RULE_TYPE_COOKIE,
'compare_type':
constants.L7RULE_COMPARE_TYPE_ENDS_WITH,
'value': 'some-string',
'key': 'some-cookie'}
self.put(self.l7rule_path.format(l7rule_id=l7rule.get('id')),
body=self._build_body(new_l7rule), status=409)
def test_delete_when_lb_pending_delete(self):
l7rule = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api').get(self.root_tag)
self.set_lb_status(self.lb_id)
self.delete(self.LB_PATH.format(lb_id=self.lb_id),
params={'cascade': "true"})
self.delete(self.l7rule_path.format(l7rule_id=l7rule.get('id')),
status=409)
def test_update_already_deleted(self):
l7rule = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api').get(self.root_tag)
# This updates the child objects
self.set_lb_status(self.lb_id, status=constants.DELETED)
new_l7rule = {'type': constants.L7RULE_TYPE_COOKIE,
'compare_type':
constants.L7RULE_COMPARE_TYPE_ENDS_WITH,
'value': 'some-string',
'key': 'some-cookie'}
self.put(self.l7rule_path.format(l7rule_id=l7rule.get('id')),
body=self._build_body(new_l7rule), status=404)
def test_delete_already_deleted(self):
l7rule = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api').get(self.root_tag)
# This updates the child objects
self.set_lb_status(self.lb_id, status=constants.DELETED)
self.delete(self.l7rule_path.format(l7rule_id=l7rule.get('id')),
status=404)
@mock.patch("octavia.api.drivers.noop_driver.driver.NoopManager."
"l7rule_create")
def test_create_with_exception_in_provider_driver(self,
l7rule_create_mock):
l7rule_create_mock.side_effect = Exception("Provider error")
self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api', status=500)
lb = self.get(self.LB_PATH.format(lb_id=self.lb_id)).json.get(
"loadbalancer")
self.assertEqual(lb[constants.PROVISIONING_STATUS],
constants.ACTIVE)