Reflecting L7 content rules capability in LBaaS

Adds CLI commands for L7 policies and rules

Change-Id: I3617c7ecd2a3ac0cae555893235e34d6c2135b81
Implements: blueprint lbaas-l7-rules
Co-Authored-By: Evgeny Fedoruk <evgenyf@radware.com>
Co-Authored-By: Stephen Balukoff <stephen@balukoff.com>
This commit is contained in:
Evgeny Fedoruk 2015-08-26 08:35:29 -07:00
parent 82ec2abb22
commit a64aad2c0f
7 changed files with 857 additions and 0 deletions

View File

@ -0,0 +1,155 @@
# Copyright 2016 Radware LTD.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from neutronclient._i18n import _
from neutronclient.common import exceptions
from neutronclient.common import utils
from neutronclient.neutron import v2_0 as neutronV20
def _get_listener_id(client, listener_id_or_name):
return neutronV20.find_resourceid_by_name_or_id(
client, 'listener', listener_id_or_name)
def _get_pool_id(client, pool_id_or_name):
return neutronV20.find_resourceid_by_name_or_id(
client, 'pool', pool_id_or_name, cmd_resource='lbaas_pool')
def _add_common_args(parser, is_create=True):
parser.add_argument(
'--name',
help=_('Name of the policy.'))
parser.add_argument(
'--description',
help=_('Description of the policy.'))
parser.add_argument(
'--action',
required=is_create,
metavar='ACTION',
type=utils.convert_to_uppercase,
choices=['REJECT', 'REDIRECT_TO_POOL', 'REDIRECT_TO_URL'],
help=_('Action type of the policy.'))
parser.add_argument(
'--redirect-pool',
help=_('ID or name of the pool for REDIRECT_TO_POOL action type.'))
parser.add_argument(
'--redirect-url',
help=_('URL for REDIRECT_TO_URL action type. '
'This should be a valid URL string.'))
parser.add_argument(
'--position',
type=int,
help=_('L7 policy position in ordered policies list. '
'This must be an integer starting from 1. '
'Not specifying the position will place the policy '
'at the tail of existing policies list.'))
def _common_args2body(client, parsed_args, is_create=True):
if parsed_args.redirect_url:
if parsed_args.action != 'REDIRECT_TO_URL':
raise exceptions.CommandError(_('Action must be REDIRECT_TO_URL'))
if parsed_args.redirect_pool:
if parsed_args.action != 'REDIRECT_TO_POOL':
raise exceptions.CommandError(_('Action must be REDIRECT_TO_POOL'))
parsed_args.redirect_pool_id = _get_pool_id(
client, parsed_args.redirect_pool)
if (parsed_args.action == 'REDIRECT_TO_URL' and
not parsed_args.redirect_url):
raise exceptions.CommandError(_('Redirect URL must be specified'))
if (parsed_args.action == 'REDIRECT_TO_POOL' and
not parsed_args.redirect_pool):
raise exceptions.CommandError(_('Redirect pool must be specified'))
attributes = ['name', 'description',
'action', 'redirect_pool_id', 'redirect_url',
'position', 'admin_state_up']
if is_create:
parsed_args.listener_id = _get_listener_id(
client, parsed_args.listener)
attributes.extend(['listener_id', 'tenant_id'])
body = {}
neutronV20.update_dict(parsed_args, body, attributes)
return {'l7policy': body}
class ListL7Policy(neutronV20.ListCommand):
"""LBaaS v2 List L7 policies that belong to a given listener."""
resource = 'l7policy'
shadow_resource = 'lbaas_l7policy'
pagination_support = True
sorting_support = True
list_columns = [
'id', 'name', 'action', 'redirect_pool_id', 'redirect_url',
'position', 'admin_state_up', 'status'
]
class ShowL7Policy(neutronV20.ShowCommand):
"""LBaaS v2 Show information of a given L7 policy."""
resource = 'l7policy'
shadow_resource = 'lbaas_l7policy'
class CreateL7Policy(neutronV20.CreateCommand):
"""LBaaS v2 Create L7 policy."""
resource = 'l7policy'
shadow_resource = 'lbaas_l7policy'
def add_known_arguments(self, parser):
_add_common_args(parser)
parser.add_argument(
'--admin-state-down',
dest='admin_state_up',
action='store_false',
help=_('Set admin state up to false.'))
parser.add_argument(
'--listener',
required=True,
metavar='LISTENER',
help=_('ID or name of the listener this policy belongs to.'))
def args2body(self, parsed_args):
return _common_args2body(self.get_client(), parsed_args)
class UpdateL7Policy(neutronV20.UpdateCommand):
"""LBaaS v2 Update a given L7 policy."""
resource = 'l7policy'
shadow_resource = 'lbaas_l7policy'
def add_known_arguments(self, parser):
_add_common_args(parser, is_create=False)
utils.add_boolean_argument(
parser, '--admin-state-up',
help=_('Specify the administrative state of the policy'
' (True meaning "Up").'))
def args2body(self, parsed_args):
return _common_args2body(self.get_client(), parsed_args, False)
class DeleteL7Policy(neutronV20.DeleteCommand):
"""LBaaS v2 Delete a given L7 policy."""
resource = 'l7policy'
shadow_resource = 'lbaas_l7policy'

View File

@ -0,0 +1,148 @@
# Copyright 2016 Radware LTD.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from neutronclient._i18n import _
from neutronclient.common import utils
from neutronclient.neutron import v2_0 as neutronV20
def _get_policy_id(client, policy_id_or_name):
return neutronV20.find_resourceid_by_name_or_id(
client, 'l7policy', policy_id_or_name,
cmd_resource='lbaas_l7policy')
class LbaasL7RuleMixin(object):
def set_extra_attrs(self, parsed_args):
self.parent_id = _get_policy_id(self.get_client(),
parsed_args.l7policy)
def add_known_arguments(self, parser):
parser.add_argument(
'l7policy', metavar='L7POLICY',
help=_('ID or name of L7 policy this rule belongs to.'))
def _add_common_args(parser, is_create=True):
parser.add_argument(
'--type',
required=is_create,
type=utils.convert_to_uppercase,
choices=['HOST_NAME', 'PATH', 'FILE_TYPE', 'HEADER', 'COOKIE'],
help=_('Rule type.'))
parser.add_argument(
'--compare-type',
required=is_create,
type=utils.convert_to_uppercase,
choices=['REGEX', 'STARTS_WITH', 'ENDS_WITH',
'CONTAINS', 'EQUAL_TO'],
help=_('Rule compare type.'))
parser.add_argument(
'--invert-compare',
dest='invert',
action='store_true',
help=_('Invert the compare type.'))
parser.add_argument(
'--key',
help=_('Key to compare.'
' Relevant for HEADER and COOKIE types only.'))
parser.add_argument(
'--value',
required=is_create,
help=_('Value to compare.'))
def _common_args2body(client, parsed_args, is_create=True):
attributes = ['type', 'compare_type',
'invert', 'key', 'value', 'admin_state_up']
if is_create:
attributes.append('tenant_id')
body = {}
neutronV20.update_dict(parsed_args, body, attributes)
return {'rule': body}
class ListL7Rule(LbaasL7RuleMixin, neutronV20.ListCommand):
"""LBaaS v2 List L7 rules that belong to a given L7 policy."""
resource = 'rule'
shadow_resource = 'lbaas_l7rule'
pagination_support = True
sorting_support = True
list_columns = [
'id', 'type', 'compare_type', 'invert', 'key', 'value',
'admin_state_up', 'status'
]
def take_action(self, parsed_args):
self.parent_id = _get_policy_id(self.get_client(),
parsed_args.l7policy)
self.values_specs.append('--l7policy_id=%s' % self.parent_id)
return super(ListL7Rule, self).take_action(parsed_args)
class ShowL7Rule(LbaasL7RuleMixin, neutronV20.ShowCommand):
"""LBaaS v2 Show information of a given rule."""
resource = 'rule'
shadow_resource = 'lbaas_l7rule'
class CreateL7Rule(LbaasL7RuleMixin, neutronV20.CreateCommand):
"""LBaaS v2 Create L7 rule."""
resource = 'rule'
shadow_resource = 'lbaas_l7rule'
def add_known_arguments(self, parser):
super(CreateL7Rule, self).add_known_arguments(parser)
_add_common_args(parser)
parser.add_argument(
'--admin-state-down',
dest='admin_state_up',
action='store_false',
help=_('Set admin state up to false'))
def args2body(self, parsed_args):
return _common_args2body(self.get_client(), parsed_args)
class UpdateL7Rule(LbaasL7RuleMixin, neutronV20.UpdateCommand):
"""LBaaS v2 Update a given L7 rule."""
resource = 'rule'
shadow_resource = 'lbaas_l7rule'
def add_known_arguments(self, parser):
super(UpdateL7Rule, self).add_known_arguments(parser)
_add_common_args(parser, False)
utils.add_boolean_argument(
parser, '--admin-state-up',
help=_('Specify the administrative state of the rule'
' (True meaning "Up").'))
def args2body(self, parsed_args):
return _common_args2body(self.get_client(), parsed_args, False)
class DeleteL7Rule(LbaasL7RuleMixin, neutronV20.DeleteCommand):
"""LBaaS v2 Delete a given L7 rule."""
resource = 'rule'
shadow_resource = 'lbaas_l7rule'

View File

@ -59,6 +59,8 @@ from neutronclient.neutron.v2_0.lb import healthmonitor as lb_healthmonitor
from neutronclient.neutron.v2_0.lb import member as lb_member
from neutronclient.neutron.v2_0.lb import pool as lb_pool
from neutronclient.neutron.v2_0.lb.v2 import healthmonitor as lbaas_healthmon
from neutronclient.neutron.v2_0.lb.v2 import l7policy as lbaas_l7policy
from neutronclient.neutron.v2_0.lb.v2 import l7rule as lbaas_l7rule
from neutronclient.neutron.v2_0.lb.v2 import listener as lbaas_listener
from neutronclient.neutron.v2_0.lb.v2 import loadbalancer as lbaas_loadbalancer
from neutronclient.neutron.v2_0.lb.v2 import member as lbaas_member
@ -217,6 +219,16 @@ COMMAND_V2 = {
'lbaas-listener-create': lbaas_listener.CreateListener,
'lbaas-listener-update': lbaas_listener.UpdateListener,
'lbaas-listener-delete': lbaas_listener.DeleteListener,
'lbaas-l7policy-list': lbaas_l7policy.ListL7Policy,
'lbaas-l7policy-show': lbaas_l7policy.ShowL7Policy,
'lbaas-l7policy-create': lbaas_l7policy.CreateL7Policy,
'lbaas-l7policy-update': lbaas_l7policy.UpdateL7Policy,
'lbaas-l7policy-delete': lbaas_l7policy.DeleteL7Policy,
'lbaas-l7rule-list': lbaas_l7rule.ListL7Rule,
'lbaas-l7rule-show': lbaas_l7rule.ShowL7Rule,
'lbaas-l7rule-create': lbaas_l7rule.CreateL7Rule,
'lbaas-l7rule-update': lbaas_l7rule.UpdateL7Rule,
'lbaas-l7rule-delete': lbaas_l7rule.DeleteL7Rule,
'lbaas-pool-list': lbaas_pool.ListPool,
'lbaas-pool-show': lbaas_pool.ShowPool,
'lbaas-pool-create': lbaas_pool.CreatePool,

View File

@ -0,0 +1,260 @@
# Copyright 2016 Radware LTD.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import sys
from neutronclient.common import exceptions
from neutronclient.neutron.v2_0.lb.v2 import l7policy
from neutronclient.tests.unit import test_cli20
"""Structure for mapping cli and api arguments
The structure maps cli arguments and a list of its
api argument name, default cli value and default api value.
It helps to make tests more general for different argument types.
"""
args_conf = {
'name': ['name', 'test_policy', 'test_policy'],
'description': ['description', 'test policy', 'test policy'],
'listener': ['listener_id', 'test_listener', 'test_listener'],
'admin-state-up': ['admin_state_up', True, True],
'admin-state-down': ['admin_state_up', None, False],
'action': ['action', 'REJECT', 'REJECT'],
'redirect-url': ['redirect_url', 'http://url', 'http://url'],
'redirect-pool': ['redirect_pool_id', 'test_pool', 'test_pool'],
'position': ['position', '1', 1]}
class CLITestV20LbL7PolicyJSON(test_cli20.CLITestV20Base):
def _get_test_args(self, *args, **kwargs):
"""Function for generically building testing arguments"""
cli_args = []
api_args = {}
for arg in args:
cli_args.append('--' + arg)
if not args_conf[arg][1]:
pass
elif arg in kwargs:
cli_args.append(str(kwargs[arg]))
else:
cli_args.append(args_conf[arg][1])
if arg in kwargs:
api_args[args_conf[arg][0]] = kwargs[arg]
else:
api_args[args_conf[arg][0]] = args_conf[arg][2]
return cli_args, api_args
def _test_create_policy(self, *args, **kwargs):
resource = 'l7policy'
cmd_resource = 'lbaas_l7policy'
cmd = l7policy.CreateL7Policy(test_cli20.MyApp(sys.stdout), None)
cli_args, api_args = self._get_test_args(*args, **kwargs)
position_names = list(api_args.keys())
position_values = list(api_args.values())
self._test_create_resource(resource, cmd, None, 'test_id',
cli_args, position_names, position_values,
cmd_resource=cmd_resource)
def _test_update_policy(self, *args, **kwargs):
resource = 'l7policy'
cmd_resource = 'lbaas_l7policy'
cmd = l7policy.UpdateL7Policy(test_cli20.MyApp(sys.stdout), None)
cli_args, api_args = self._get_test_args(*args, **kwargs)
cli_args.append('test_id')
self._test_update_resource(resource, cmd, 'test_id',
cli_args, api_args,
cmd_resource=cmd_resource)
def test_create_policy_with_mandatory_params(self):
# lbaas-l7policy-create with mandatory params only.
self._test_create_policy('action', 'listener')
def test_create_policy_with_all_params(self):
# lbaas-l7policy-create REJECT policy.
self._test_create_policy('name', 'description',
'action', 'listener',
'position')
def test_create_disabled_policy(self):
# lbaas-l7policy-create disabled REJECT policy.
self._test_create_policy('action', 'listener', 'admin-state-down')
def test_create_url_redirect_policy(self):
# lbaas-l7policy-create REDIRECT_TO_URL policy.
self._test_create_policy('name', 'description',
'action', 'listener',
'redirect-url',
action='REDIRECT_TO_URL')
def test_create_url_redirect_policy_no_url(self):
# lbaas-l7policy-create REDIRECT_TO_URL policy without url argument.
self.assertRaises(exceptions.CommandError,
self._test_create_policy,
'name', 'description',
'action', 'listener',
action='REDIRECT_TO_URL')
def test_create_pool_redirect_policy(self):
# lbaas-l7policy-create REDIRECT_TO_POOL policy.
self._test_create_policy('name', 'description',
'action', 'listener',
'redirect-pool',
action='REDIRECT_TO_POOL')
def test_create_pool_redirect_policy_no_pool(self):
# lbaas-l7policy-create REDIRECT_TO_POOL policy without pool argument.
self.assertRaises(exceptions.CommandError,
self._test_create_policy,
'name', 'description',
'action', 'listener',
action='REDIRECT_TO_POOL')
def test_create_reject_policy_with_url(self):
# lbaas-l7policy-create REJECT policy while specifying url argument.
self.assertRaises(exceptions.CommandError,
self._test_create_policy,
'action', 'listener',
'redirect-url')
def test_create_reject_policy_with_pool(self):
# lbaas-l7policy-create REJECT policy while specifying pool argument.
self.assertRaises(exceptions.CommandError,
self._test_create_policy,
'action', 'listener',
'redirect-pool')
def test_create_pool_redirect_policy_with_url(self):
# lbaas-l7policy-create REDIRECT_TO_POOL policy with url argument.
self.assertRaises(exceptions.CommandError,
self._test_create_policy,
'action', 'listener',
'redirect-pool', 'redirect-url',
action='REDIRECT_TO_POOL')
def test_create_url_redirect_policy_with_pool(self):
# lbaas-l7policy-create REDIRECT_TO_URL policy with pool argument.
self.assertRaises(exceptions.CommandError,
self._test_create_policy,
'action', 'listener',
'redirect-pool', 'redirect-url',
action='REDIRECT_TO_URL')
def test_list_policies(self):
# lbaas-l7policy-list.
resources = 'l7policies'
cmd_resources = 'lbaas_l7policies'
cmd = l7policy.ListL7Policy(test_cli20.MyApp(sys.stdout), None)
self._test_list_resources(resources, cmd, True,
cmd_resources=cmd_resources)
def test_list_policies_pagination(self):
# lbaas-l7policy-list with pagination.
resources = 'l7policies'
cmd_resources = 'lbaas_l7policies'
cmd = l7policy.ListL7Policy(test_cli20.MyApp(sys.stdout), None)
self._test_list_resources_with_pagination(
resources, cmd, cmd_resources=cmd_resources)
def test_list_policies_sort(self):
# lbaas-l7policy-list --sort-key id --sort-key asc.
resources = 'l7policies'
cmd_resources = 'lbaas_l7policies'
cmd = l7policy.ListL7Policy(test_cli20.MyApp(sys.stdout), None)
self._test_list_resources(
resources, cmd, True, cmd_resources=cmd_resources)
def test_list_policies_limit(self):
# lbaas-l7policy-list -P.
resources = 'l7policies'
cmd_resources = 'lbaas_l7policies'
cmd = l7policy.ListL7Policy(test_cli20.MyApp(sys.stdout), None)
self._test_list_resources(
resources, cmd, page_size=1000, cmd_resources=cmd_resources)
def test_show_policy_id(self):
# lbaas-l7policy-show test_id.
resource = 'l7policy'
cmd_resource = 'lbaas_l7policy'
cmd = l7policy.ShowL7Policy(test_cli20.MyApp(sys.stdout), None)
args = ['--fields', 'test_id', self.test_id]
self._test_show_resource(
resource, cmd, self.test_id, args,
['test_id'], cmd_resource=cmd_resource)
def test_show_policy_id_name(self):
# lbaas-l7policy-show.
resource = 'l7policy'
cmd_resource = 'lbaas_l7policy'
cmd = l7policy.ShowL7Policy(test_cli20.MyApp(sys.stdout), None)
args = ['--fields', 'test_id', '--fields', 'name', self.test_id]
self._test_show_resource(
resource, cmd, self.test_id, args,
['test_id', 'name'], cmd_resource=cmd_resource)
def test_disable_policy(self):
# lbaas-l7policy-update test_id --admin-state-up False.
self._test_update_policy('admin-state-up',
**{'admin-state-up': 'False'})
def test_update_policy_name_and_description(self):
# lbaas-l7policy-update test_id --name other --description other_desc.
self._test_update_policy('name', 'description',
name='name',
description='other desc')
def test_update_pool_redirect_policy(self):
# lbaas-l7policy-update test_id --action REDIRECT_TO_POOL
# --redirect-pool id.
self._test_update_policy('action', 'redirect-pool',
**{'action': 'REDIRECT_TO_POOL',
'redirect-pool': 'id'})
def test_update_url_redirect_policy(self):
# lbaas-l7policy-update test_id --action REDIRECT_TO_URL
# --redirect-url http://other_url.
self._test_update_policy('action', 'redirect-url',
**{'action': 'REDIRECT_TO_URL',
'redirect-url': 'http://other_url'})
def test_update_policy_position(self):
# lbaas-l7policy-update test_id --position 2.
self._test_update_policy('position',
position=2)
def test_delete_policy(self):
# lbaas-l7policy-delete test_id.
resource = 'l7policy'
cmd_resource = 'lbaas_l7policy'
cmd = l7policy.DeleteL7Policy(test_cli20.MyApp(sys.stdout), None)
test_id = 'test_id'
args = [test_id]
self._test_delete_resource(resource, cmd, test_id, args,
cmd_resource=cmd_resource)

View File

@ -0,0 +1,210 @@
# Copyright 2016 Radware LTD.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import sys
from neutronclient.neutron.v2_0.lb.v2 import l7rule
from neutronclient.tests.unit import test_cli20
"""Structure for mapping cli and api arguments
The structure maps cli arguments and a list of its
api argument name, default cli value and default api value.
It helps to make tests more general for different argument types.
"""
args_conf = {
'admin-state-up': ['admin_state_up', True, True],
'admin-state-down': ['admin_state_up', None, False],
'type': ['type', 'HOST_NAME', 'HOST_NAME'],
'compare-type': ['compare_type', 'EQUAL_TO', 'EQUAL_TO'],
'invert-compare': ['invert', None, True],
'key': ['key', 'key', 'key'],
'value': ['value', 'value', 'value']}
class CLITestV20LbL7RuleJSON(test_cli20.CLITestV20Base):
def _get_test_args(self, *args, **kwargs):
"""Function for generically building testing arguments"""
cli_args = []
api_args = {}
for arg in args:
cli_args.append('--' + arg)
if not args_conf[arg][1]:
pass
elif arg in kwargs:
cli_args.append(str(kwargs[arg]))
else:
cli_args.append(args_conf[arg][1])
if arg in kwargs:
api_args[args_conf[arg][0]] = kwargs[arg]
else:
api_args[args_conf[arg][0]] = args_conf[arg][2]
if 'invert' not in api_args:
api_args['invert'] = False
return cli_args, api_args
def _test_create_rule(self, *args, **kwargs):
resource = 'rule'
cmd_resource = 'lbaas_l7rule'
cmd = l7rule.CreateL7Rule(test_cli20.MyApp(sys.stdout), None)
cli_args, api_args = self._get_test_args(*args, **kwargs)
position_names = list(api_args.keys())
position_values = list(api_args.values())
cli_args.append('test_policy')
self._test_create_resource(resource, cmd, None, 'test_id',
cli_args, position_names, position_values,
cmd_resource=cmd_resource,
parent_id='test_policy')
def _test_update_rule(self, *args, **kwargs):
resource = 'rule'
cmd_resource = 'lbaas_l7rule'
cmd = l7rule.UpdateL7Rule(test_cli20.MyApp(sys.stdout), None)
cli_args, api_args = self._get_test_args(*args, **kwargs)
cli_args.append('test_id')
cli_args.append('test_policy')
self._test_update_resource(resource, cmd, 'test_id',
cli_args, api_args,
cmd_resource=cmd_resource,
parent_id='test_policy')
def test_create_rule_with_mandatory_params(self):
# lbaas-l7rule-create with mandatory params only.
self._test_create_rule('type', 'compare-type',
'value')
def test_create_disabled_rule(self):
# lbaas-l7rule-create disabled rule.
self._test_create_rule('type', 'compare-type',
'value', 'admin-state-down')
def test_create_rule_with_all_params(self):
# lbaas-l7rule-create with all params set.
self._test_create_rule('type', 'compare-type',
'invert-compare', 'key', 'value',
type='HEADER', compare_type='CONTAINS',
key='other_key', value='other_value')
def test_create_rule_with_inverted_compare(self):
# lbaas-l7rule-create with invertted compare type.
self._test_create_rule('type', 'compare-type',
'invert-compare', 'value')
def test_list_rules(self):
# lbaas-l7rule-list.
resources = 'rules'
cmd_resources = 'lbaas_l7rules'
cmd = l7rule.ListL7Rule(test_cli20.MyApp(sys.stdout), None)
policy_id = 'policy_id'
self._test_list_resources(resources, cmd, True,
base_args=[policy_id],
cmd_resources=cmd_resources,
parent_id=policy_id,
query="l7policy_id=%s" % policy_id)
def test_list_rules_pagination(self):
# lbaas-l7rule-list with pagination.
resources = 'rules'
cmd_resources = 'lbaas_l7rules'
cmd = l7rule.ListL7Rule(test_cli20.MyApp(sys.stdout), None)
policy_id = 'policy_id'
self._test_list_resources_with_pagination(
resources, cmd, base_args=[policy_id],
cmd_resources=cmd_resources, parent_id=policy_id,
query="l7policy_id=%s" % policy_id)
def test_list_rules_sort(self):
# lbaas-l7rule-list --sort-key id --sort-key asc.
resources = 'rules'
cmd_resources = 'lbaas_l7rules'
cmd = l7rule.ListL7Rule(test_cli20.MyApp(sys.stdout), None)
policy_id = 'policy_id'
self._test_list_resources(
resources, cmd, True, base_args=[policy_id],
cmd_resources=cmd_resources, parent_id=policy_id,
query="l7policy_id=%s" % policy_id)
def test_list_rules_limit(self):
# lbaas-l7rule-list -P.
resources = 'rules'
cmd_resources = 'lbaas_l7rules'
cmd = l7rule.ListL7Rule(test_cli20.MyApp(sys.stdout), None)
policy_id = 'policy_id'
self._test_list_resources(resources, cmd, page_size=1000,
base_args=[policy_id],
cmd_resources=cmd_resources,
parent_id=policy_id,
query="l7policy_id=%s" % policy_id)
def test_show_rule_id(self):
# lbaas-l7rule-show test_id.
resource = 'rule'
cmd_resource = 'lbaas_l7rule'
policy_id = 'policy_id'
cmd = l7rule.ShowL7Rule(test_cli20.MyApp(sys.stdout), None)
args = ['--fields', 'id', self.test_id, policy_id]
self._test_show_resource(resource, cmd, self.test_id, args, ['id'],
cmd_resource=cmd_resource,
parent_id=policy_id)
def test_update_rule_type(self):
# lbaas-l7rule-update test_id --type HEADER test_policy
self._test_update_rule('type', type='HEADER')
def test_update_rule_compare_type(self):
# lbaas-l7rule-update test_id --compare-type CONTAINS test_policy.
self._test_update_rule('compare-type',
**{'compare-type': 'CONTAINS'})
def test_update_rule_inverted_compare_type(self):
# lbaas-l7rule-update test_id --invert-compare test_policy.
self._test_update_rule('invert-compare')
def test_update_rule_key_value(self):
# lbaas-l7rule-update test_id --key other --value other test_policy.
self._test_update_rule('key', 'value',
key='other', value='other')
def test_delete_rule(self):
# lbaas-l7rule-delete test_id policy_id.
resource = 'rule'
cmd_resource = 'lbaas_l7rule'
policy_id = 'policy_id'
test_id = 'test_id'
cmd = l7rule.DeleteL7Rule(test_cli20.MyApp(sys.stdout), None)
args = [test_id, policy_id]
self._test_delete_resource(resource, cmd, test_id, args,
cmd_resource=cmd_resource,
parent_id=policy_id)

View File

@ -339,6 +339,10 @@ class Client(ClientBase):
lbaas_loadbalancer_path_status = "/lbaas/loadbalancers/%s/statuses"
lbaas_listeners_path = "/lbaas/listeners"
lbaas_listener_path = "/lbaas/listeners/%s"
lbaas_l7policies_path = "/lbaas/l7policies"
lbaas_l7policy_path = lbaas_l7policies_path + "/%s"
lbaas_l7rules_path = lbaas_l7policy_path + "/rules"
lbaas_l7rule_path = lbaas_l7rules_path + "/%s"
lbaas_pools_path = "/lbaas/pools"
lbaas_pool_path = "/lbaas/pools/%s"
lbaas_healthmonitors_path = "/lbaas/healthmonitors"
@ -438,6 +442,9 @@ class Client(ClientBase):
'metering_label_rules': 'metering_label_rule',
'loadbalancers': 'loadbalancer',
'listeners': 'listener',
'l7rules': 'l7rule',
'l7policies': 'l7policy',
'lbaas_l7policies': 'lbaas_l7policy',
'lbaas_pools': 'lbaas_pool',
'lbaas_healthmonitors': 'lbaas_healthmonitor',
'lbaas_members': 'lbaas_member',
@ -447,6 +454,7 @@ class Client(ClientBase):
'qos_policies': 'qos_policy',
'policies': 'policy',
'bandwidth_limit_rules': 'bandwidth_limit_rule',
'rules': 'rule',
'rule_types': 'rule_type',
'flavors': 'flavor',
'bgp_speakers': 'bgp_speaker',
@ -984,6 +992,62 @@ class Client(ClientBase):
"""Deletes the specified lbaas_listener."""
return self.delete(self.lbaas_listener_path % (lbaas_listener))
@APIParamsCall
def list_lbaas_l7policies(self, retrieve_all=True, **_params):
"""Fetches a list of all L7 policies for a listener."""
return self.list('l7policies', self.lbaas_l7policies_path,
retrieve_all, **_params)
@APIParamsCall
def show_lbaas_l7policy(self, l7policy, **_params):
"""Fetches information of a certain listener's L7 policy."""
return self.get(self.lbaas_l7policy_path % l7policy,
params=_params)
@APIParamsCall
def create_lbaas_l7policy(self, body=None):
"""Creates L7 policy for a certain listener."""
return self.post(self.lbaas_l7policies_path, body=body)
@APIParamsCall
def update_lbaas_l7policy(self, l7policy, body=None):
"""Updates L7 policy."""
return self.put(self.lbaas_l7policy_path % l7policy,
body=body)
@APIParamsCall
def delete_lbaas_l7policy(self, l7policy):
"""Deletes the specified L7 policy."""
return self.delete(self.lbaas_l7policy_path % l7policy)
@APIParamsCall
def list_lbaas_l7rules(self, l7policy, retrieve_all=True, **_params):
"""Fetches a list of all rules for L7 policy."""
return self.list('rules', self.lbaas_l7rules_path % l7policy,
retrieve_all, **_params)
@APIParamsCall
def show_lbaas_l7rule(self, l7rule, l7policy, **_params):
"""Fetches information of a certain L7 policy's rule."""
return self.get(self.lbaas_l7rule_path % (l7policy, l7rule),
params=_params)
@APIParamsCall
def create_lbaas_l7rule(self, l7policy, body=None):
"""Creates rule for a certain L7 policy."""
return self.post(self.lbaas_l7rules_path % l7policy, body=body)
@APIParamsCall
def update_lbaas_l7rule(self, l7rule, l7policy, body=None):
"""Updates L7 rule."""
return self.put(self.lbaas_l7rule_path % (l7policy, l7rule),
body=body)
@APIParamsCall
def delete_lbaas_l7rule(self, l7rule, l7policy):
"""Deletes the specified L7 rule."""
return self.delete(self.lbaas_l7rule_path % (l7policy, l7rule))
@APIParamsCall
def list_lbaas_pools(self, retrieve_all=True, **_params):
"""Fetches a list of all lbaas_pools for a tenant."""

View File

@ -0,0 +1,8 @@
---
features:
- |
CLI support for Layer 7 content policies and rules.
* L7 policies can be defined for listeners along
with the ability to set L7 policy order.
* Multiple rules can be created for an L7 policy.