Files
python-group-based-policy-c…/gbpclient/gbp/v2_0/groupbasedpolicy.py
Sumit Naiksatam df9540094a Friendlier option for providing dict key arguments
Several CLI commands require providing dictionary arguments. These
arguments are currently provided as:

--argument-name "key1=<value1,key2=value2,...>"

Although the value fields are optional, it requires the user to
at least provide input in the form "key=". This is not very intuitive
and often confusing.

In this patch we preseve the current convention of providing the
dictionary arguments, but we enhance the parsing logic to also accept only
keys as arguments. The values are assumed as empty strings in such cases
and the appropriate dictonary is constructed and passed to the backend. So,
the following new syntax is also supported:

--argument-name "key1<,key2,...>"

This is a backward compatible change to the CLI.

Change-Id: I0f4c15b02c151a5f4facb2881fb0c11896a7e39c
Closes-bug: 1504912
2015-10-11 02:08:07 -07:00

1444 lines
51 KiB
Python

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import logging
import string
from neutronclient.i18n import _
from neutronclient.neutron import v2_0 as neutronV20
from oslo.serialization import jsonutils
from gbpclient.common import utils
def _format_network_service_params(net_svc_policy):
try:
return '\n'.join([jsonutils.dumps(param) for param in
net_svc_policy['network_service_params']])
except (TypeError, KeyError):
return ''
def _format_host_routes(subnet):
try:
return '\n'.join([jsonutils.dumps(route) for route in
subnet['host_routes']])
except (TypeError, KeyError):
return ''
class ListPolicyTarget(neutronV20.ListCommand):
"""List policy_targets that belong to a given tenant."""
resource = 'policy_target'
log = logging.getLogger(__name__ + '.ListPolicyTarget')
_formatters = {}
list_columns = ['id', 'name', 'description', 'policy_target_group_id']
pagination_support = True
sorting_support = True
class ShowPolicyTarget(neutronV20.ShowCommand):
"""Show information of a given policy_target."""
resource = 'policy_target'
log = logging.getLogger(__name__ + '.ShowPolicyTarget')
class CreatePolicyTarget(neutronV20.CreateCommand):
"""Create a policy_target for a given tenant."""
resource = 'policy_target'
log = logging.getLogger(__name__ + '.CreatePolicyTarget')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('Description of the Policy Target'))
parser.add_argument(
'--policy-target-group', metavar='PTG',
default='',
help=_('Policy Target Group uuid'))
parser.add_argument(
'--port-id',
default='',
help=_('Neutron Port UUID'))
parser.add_argument(
'name', metavar='NAME',
help=_('Name of policy target to create'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description'])
if parsed_args.policy_target_group:
body[self.resource]['policy_target_group_id'] = \
neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'policy_target_group',
parsed_args.policy_target_group)
if parsed_args.port_id:
body[self.resource]['port_id'] = (
parsed_args.port_id)
return body
class DeletePolicyTarget(neutronV20.DeleteCommand):
"""Delete a given Policy Target."""
resource = 'policy_target'
log = logging.getLogger(__name__ + '.DeletePolicyTarget')
class UpdatePolicyTarget(neutronV20.UpdateCommand):
"""Update Policy Target's information."""
resource = 'policy_target'
log = logging.getLogger(__name__ + '.UpdatePolicyTarget')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('Description of the Policy Target'))
parser.add_argument(
'--policy-target-group', metavar='PTG',
default='',
help=_('Policy Target Group uuid'))
parser.add_argument(
'--name',
help=_('New name of the Policy Target'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description'])
if parsed_args.policy_target_group:
body[self.resource]['policy_target_group_id'] = \
neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'policy_target_group',
parsed_args.policy_target_group)
return body
class ListPolicyTargetGroup(neutronV20.ListCommand):
"""List Policy Target Groups that belong to a given tenant."""
resource = 'policy_target_group'
log = logging.getLogger(__name__ + '.ListPolicyTargetGroup')
list_columns = ['id', 'name', 'description']
pagination_support = True
sorting_support = True
class ShowPolicyTargetGroup(neutronV20.ShowCommand):
"""Show information of a given Policy Target Group."""
resource = 'policy_target_group'
log = logging.getLogger(__name__ + '.ShowPolicyTargetGroup')
class CreatePolicyTargetGroup(neutronV20.CreateCommand):
"""Create a Policy Target Group for a given tenant."""
resource = 'policy_target_group'
log = logging.getLogger(__name__ + '.CreatePolicyTargetGroup')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('Description of the Policy Target Group'))
parser.add_argument(
'name', metavar='NAME',
help=_('Name of Policy Target Group to create'))
parser.add_argument(
'--l2-policy', metavar='L2_POLICY',
default='',
help=_('L2 policy uuid'))
parser.add_argument(
'--provided-policy-rule-sets', type=utils.str2dict,
# default={},
help=_('Dictionary of provided policy rule set uuids'))
parser.add_argument(
'--consumed-policy-rule-sets', type=utils.str2dict,
# default={},
help=_('Dictionary of consumed policy rule set uuids'))
parser.add_argument(
'--network-service-policy', metavar='NETWORK_SERVICE_POLICY',
default='',
help=_('Network service policy uuid'))
parser.add_argument(
'--subnets', type=string.split,
help=_('List of neutron subnet uuids'))
parser.add_argument(
'--shared', type=bool,
help=_('Shared flag'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
if parsed_args.l2_policy:
body[self.resource]['l2_policy_id'] = \
neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'l2_policy',
parsed_args.l2_policy)
if parsed_args.network_service_policy:
body[self.resource]['network_service_policy_id'] = \
neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'network_service_policy',
parsed_args.network_service_policy)
if parsed_args.provided_policy_rule_sets:
for key in parsed_args.provided_policy_rule_sets.keys():
id_key = neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'policy_rule_set',
key)
parsed_args.provided_policy_rule_sets[id_key] = \
parsed_args.provided_policy_rule_sets.pop(key)
if parsed_args.consumed_policy_rule_sets:
for key in parsed_args.consumed_policy_rule_sets.keys():
id_key = neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'policy_rule_set',
key)
parsed_args.consumed_policy_rule_sets[id_key] = \
parsed_args.consumed_policy_rule_sets.pop(key)
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description',
'provided_policy_rule_sets', 'subnets',
'consumed_policy_rule_sets', 'shared'])
return body
class DeletePolicyTargetGroup(neutronV20.DeleteCommand):
"""Delete a given Policy Target Group."""
resource = 'policy_target_group'
log = logging.getLogger(__name__ + '.DeletePolicyTargetGroup')
class UpdatePolicyTargetGroup(neutronV20.UpdateCommand):
"""Update Policy Target Group's information."""
resource = 'policy_target_group'
log = logging.getLogger(__name__ + '.UpdatePolicyTargetGroup')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('Description of the Policy Target Group'))
parser.add_argument(
'--l2-policy', metavar='L2_POLICY',
help=_('L2 policy uuid'))
parser.add_argument(
'--network-service-policy', metavar='NETWORK_SERVICE_POLICY',
help=_('Network Service Policy uuid'))
parser.add_argument(
'--provided-policy-rule-sets', type=utils.str2dict,
help=_('Dictionary of provided policy rule set uuids'))
parser.add_argument(
'--consumed-policy-rule-sets', type=utils.str2dict,
help=_('Dictionary of consumed policy rule set uuids'))
parser.add_argument(
'--subnets', type=string.split,
help=_('List of neutron subnet uuids'))
parser.add_argument(
'--shared', type=bool,
help=_('Shared flag'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
if parsed_args.l2_policy:
body[self.resource]['l2_policy_id'] = \
neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'l2_policy',
parsed_args.l2_policy)
if parsed_args.network_service_policy:
body[self.resource]['network_service_policy_id'] = \
neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'network_service_policy',
parsed_args.network_service_policy)
if parsed_args.provided_policy_rule_sets:
for key in parsed_args.provided_policy_rule_sets.keys():
id_key = neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'policy_rule_set',
key)
parsed_args.provided_policy_rule_sets[id_key] = \
parsed_args.provided_policy_rule_sets.pop(key)
if parsed_args.consumed_policy_rule_sets:
for key in parsed_args.consumed_policy_rule_sets.keys():
id_key = neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'policy_rule_set',
key)
parsed_args.consumed_policy_rule_sets[id_key] = \
parsed_args.consumed_policy_rule_sets.pop(key)
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description',
'provided_policy_rule_sets', 'subnets',
'consumed_policy_rule_sets', 'shared'])
return body
class ListL2Policy(neutronV20.ListCommand):
"""List L2 Policies that belong to a given tenant."""
resource = 'l2_policy'
log = logging.getLogger(__name__ + '.ListL2Policy')
_formatters = {}
list_columns = ['id', 'name', 'description', 'l3_policy_id']
pagination_support = True
sorting_support = True
class ShowL2Policy(neutronV20.ShowCommand):
"""Show information of a given L2 Policy."""
resource = 'l2_policy'
log = logging.getLogger(__name__ + '.ShowL2Policy')
class CreateL2Policy(neutronV20.CreateCommand):
"""Create a L2 Policy for a given tenant."""
resource = 'l2_policy'
log = logging.getLogger(__name__ + '.CreateL2Policy')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('Description of the L2 Policy'))
parser.add_argument(
'--network',
help=_('Neutron network uuid to map the L2 Policy to'))
parser.add_argument(
'--l3-policy',
default='',
help=_('L3 Policy uuid'))
parser.add_argument(
'name', metavar='NAME',
help=_('Name of L2 Policy to create'))
parser.add_argument(
'--shared', type=bool,
help=_('Shared flag'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description', 'shared'])
if parsed_args.l3_policy:
body[self.resource]['l3_policy_id'] = \
neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'l3_policy',
parsed_args.l3_policy)
if parsed_args.network:
body[self.resource]['network_id'] = (
parsed_args.network)
return body
class DeleteL2Policy(neutronV20.DeleteCommand):
"""Delete a given L2 Policy."""
resource = 'l2_policy'
log = logging.getLogger(__name__ + '.DeleteL2Policy')
class UpdateL2Policy(neutronV20.UpdateCommand):
"""Update L2 Policy's information."""
resource = 'l2_policy'
log = logging.getLogger(__name__ + '.UpdateL2Policy')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('Description of the L2 Policy'))
parser.add_argument(
'--l3-policy',
default='',
help=_('L3 Policy uuid'))
parser.add_argument(
'--name', metavar='NAME',
help=_('New name of the L2 Policy'))
parser.add_argument(
'--shared', type=bool,
help=_('Shared flag'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description', 'shared'])
if parsed_args.l3_policy:
body[self.resource]['l3_policy_id'] = \
neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'l3_policy',
parsed_args.l3_policy)
return body
class ListL3Policy(neutronV20.ListCommand):
"""List l3_policies that belong to a given tenant."""
resource = 'l3_policy'
log = logging.getLogger(__name__ + '.ListL3Policy')
_formatters = {}
list_columns = ['id', 'name', 'description', 'ip_pool',
'subnet_prefix_length']
pagination_support = True
sorting_support = True
class ShowL3Policy(neutronV20.ShowCommand):
"""Show information of a given L3 Policy."""
resource = 'l3_policy'
log = logging.getLogger(__name__ + '.ShowL3Policy')
class CreateL3Policy(neutronV20.CreateCommand):
"""Create a L3 Policy for a given tenant."""
resource = 'l3_policy'
log = logging.getLogger(__name__ + '.CreateL3Policy')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('Description of the L3 Policy'))
parser.add_argument(
'--ip-version',
type=int,
# default=4, choices=[4, 6],
help=_('IP version, default is 4'))
parser.add_argument(
'--ip-pool',
help=_('CIDR of IP pool to create, default is 10.0.0.0/8'))
parser.add_argument(
'--subnet-prefix-length',
type=int,
# default=24,
help=_('Subnet prefix length, default is 24'))
parser.add_argument(
'--external-segment',
action='append', dest='external_segments', type=utils.str2dict,
help=_('Use format <ext-segment-id-1>=<ip-addr1:ipaddr2:...>'
'(this option can be repeated)'))
parser.add_argument(
'name', metavar='NAME',
help=_('Name of L3 policy to create'))
parser.add_argument(
'--shared', type=bool,
help=_('Shared flag'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
if parsed_args.external_segments:
external_segments_dict = {}
for external_segment in parsed_args.external_segments:
external_segment_id = neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'external_segment',
external_segment.keys()[0])
ipaddrs = external_segment.itervalues().next().split(':')
external_segments_dict[external_segment_id] = ipaddrs
body[self.resource]['external_segments'] = external_segments_dict
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description',
'ip_version', 'ip_pool',
'subnet_prefix_length', 'shared'])
return body
class DeleteL3Policy(neutronV20.DeleteCommand):
"""Delete a given L3 Policy."""
resource = 'l3_policy'
log = logging.getLogger(__name__ + '.DeleteL3Policy')
class UpdateL3Policy(neutronV20.UpdateCommand):
"""Update L3 Policy's information."""
resource = 'l3_policy'
log = logging.getLogger(__name__ + '.UpdateL3Policy')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('Description of the L3 Policy'))
parser.add_argument(
'--ip-version',
type=int,
help=_('IP version, default is 4'))
parser.add_argument(
'--ip-pool',
help=_('CIDR of IP pool to create, default is 10.0.0.0/8'))
parser.add_argument(
'--subnet-prefix-length',
type=int,
help=_('Subnet prefix length, default is 24'))
parser.add_argument(
'--external-segment',
action='append', dest='external_segments', type=utils.str2dict,
help=_('Use format <ext-segment-id-1>=<ip-addr1:ipaddr2:...>'
'(this option can be repeated)'))
parser.add_argument(
'--name', metavar='NAME',
help=_('New name of the L3 Policy'))
parser.add_argument(
'--shared', type=bool,
help=_('Shared flag'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
if parsed_args.external_segments:
external_segments_dict = {}
for external_segment in parsed_args.external_segments:
if not external_segment:
break
external_segment_id = neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'external_segment',
external_segment.keys()[0])
ipaddrs = external_segment.itervalues().next().split(':')
external_segments_dict[external_segment_id] = ipaddrs
body[self.resource]['external_segments'] = external_segments_dict
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description',
'ip_version', 'ip_pool',
'subnet_prefix_length', 'shared'])
return body
class ListNetworkServicePolicy(neutronV20.ListCommand):
"""List Network Service Policies that belong to a given tenant."""
resource = 'network_service_policy'
log = logging.getLogger(__name__ + '.ListNetworkServicePolicy')
_formatters = {'network_servie_params': _format_network_service_params}
list_columns = ['id', 'name', 'description', 'network_service_params']
pagination_support = True
sorting_support = True
class ShowNetworkServicePolicy(neutronV20.ShowCommand):
"""Show information of a given network_service_policy."""
resource = 'network_service_policy'
log = logging.getLogger(__name__ + '.ShowNetworkServicePolicy')
class CreateNetworkServicePolicy(neutronV20.CreateCommand):
"""Create a Network Service Policy for a given tenant."""
resource = 'network_service_policy'
log = logging.getLogger(__name__ + '.CreateNetworkServicePolicy')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('Description of the network_service_policy'))
parser.add_argument(
'name',
help=_('Name of network_service_policy to create'))
parser.add_argument(
'--network-service-params',
metavar='type=PARAM_TYPE,name=PARAM_NAME,value=PARAM_VALUE',
action='append', dest='network_service_params',
type=utils.str2dict,
help=_('Network service params for this network service policy'
'(This option can be repeated).'))
parser.add_argument(
'--shared', type=bool,
help=_('Shared flag'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description',
'network_service_params', 'shared'])
return body
class DeleteNetworkServicePolicy(neutronV20.DeleteCommand):
"""Delete a given network_service_policy."""
resource = 'network_service_policy'
log = logging.getLogger(__name__ + '.DeleteNetworkServicePolicy')
class UpdateNetworkServicePolicy(neutronV20.UpdateCommand):
"""Update network_service_policy's information."""
resource = 'network_service_policy'
log = logging.getLogger(__name__ + '.UpdateNetworkServicePolicy')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('Description of the network_service_policy'))
parser.add_argument(
'--name',
help=_('New name of the network_service_policy'))
parser.add_argument(
'--network-service-params',
metavar='type=PARAM_TYPE,name=PARAM_NAME,value=PARAM_VALUE',
action='append', dest='network_service_params',
type=utils.str2dict,
help=_('Network service params for this network service policy'
'(This option can be repeated).'))
parser.add_argument(
'--shared', type=bool,
help=_('Shared flag'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description',
'network_service_params', 'shared'])
return body
class ListPolicyClassifier(neutronV20.ListCommand):
"""List classifiers that belong to a given tenant."""
resource = 'policy_classifier'
log = logging.getLogger(__name__ + '.ListPolicyClassifier')
_formatters = {}
list_columns = ['id', 'name', 'protocol', 'port_range', 'direction']
pagination_support = True
sorting_support = True
class ShowPolicyClassifier(neutronV20.ShowCommand):
"""Show information of a given classifier."""
resource = 'policy_classifier'
log = logging.getLogger(__name__ + '.ShowPolicyClassifier')
class CreatePolicyClassifier(neutronV20.CreateCommand):
"""Create a classifier for a given tenant."""
resource = 'policy_classifier'
log = logging.getLogger(__name__ + '.CreatePolicyClassifier')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('Description of the policy classifier'))
parser.add_argument(
'--protocol',
choices=['tcp', 'udp', 'icmp'],
help=_('Protocol'))
parser.add_argument(
'--port-range',
help=_('Port range'))
parser.add_argument(
'--direction',
choices=['in', 'out', 'bi', ''],
help=_('Direction'))
parser.add_argument(
'name', metavar='NAME',
help=_('Name of classifier to create'))
parser.add_argument(
'--shared', type=bool,
help=_('Shared flag'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description',
'protocol', 'port_range',
'direction', 'shared'])
return body
class DeletePolicyClassifier(neutronV20.DeleteCommand):
"""Delete a given classifier."""
resource = 'policy_classifier'
log = logging.getLogger(__name__ + '.DeletePolicyClassifier')
class UpdatePolicyClassifier(neutronV20.UpdateCommand):
"""Update classifier's information."""
resource = 'policy_classifier'
log = logging.getLogger(__name__ + '.UpdatePolicyClassifier')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('Description of the policy classifier'))
parser.add_argument(
'--protocol',
choices=['tcp', 'udp', 'icmp'],
help=_('Protocol'))
parser.add_argument(
'--port-range',
help=_('Port range'))
parser.add_argument(
'--direction',
choices=['in', 'out', 'bi', ''],
help=_('Direction'))
parser.add_argument(
'--name',
help=_('New name of the classifier'))
parser.add_argument(
'--shared', type=bool,
help=_('Shared flag'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description',
'protocol', 'port_range',
'direction', 'shared'])
return body
class ListPolicyAction(neutronV20.ListCommand):
"""List actions that belong to a given tenant."""
resource = 'policy_action'
log = logging.getLogger(__name__ + '.ListPolicyAction')
_formatters = {}
list_columns = ['id', 'name', 'action_type', 'action_value']
pagination_support = True
sorting_support = True
class ShowPolicyAction(neutronV20.ShowCommand):
"""Show information of a given action."""
resource = 'policy_action'
log = logging.getLogger(__name__ + '.ShowPolicyAction')
class CreatePolicyAction(neutronV20.CreateCommand):
"""Create a action for a given tenant."""
resource = 'policy_action'
log = logging.getLogger(__name__ + '.CreatePolicyAction')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('Description of the policy action'))
parser.add_argument(
'--action-type',
help=_('Type of action'))
parser.add_argument(
'--action-value',
help=_('Name/UUID of servicechain spec for redirect action'))
parser.add_argument(
'name', metavar='NAME',
help=_('Name of action to create'))
parser.add_argument(
'--shared', type=bool,
help=_('Shared flag'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
if parsed_args.action_value:
body[self.resource]['action_value'] = (
neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'servicechain_spec',
parsed_args.action_value))
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description',
'action_type', 'shared'])
return body
class DeletePolicyAction(neutronV20.DeleteCommand):
"""Delete a given action."""
resource = 'policy_action'
log = logging.getLogger(__name__ + '.DeletePolicyAction')
class UpdatePolicyAction(neutronV20.UpdateCommand):
"""Update action's information."""
resource = 'policy_action'
log = logging.getLogger(__name__ + '.UpdatePolicyAction')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('Description of the policy action'))
parser.add_argument(
'--action-value',
help=_('Name/UUID of servicechain spec for redirect action'))
parser.add_argument(
'--name',
help=_('New name of the action'))
parser.add_argument(
'--shared', type=bool,
help=_('Shared flag'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
if parsed_args.action_value:
body[self.resource]['action_value'] = (
neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'servicechain_spec',
parsed_args.action_value))
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description',
'shared'])
return body
class ListPolicyRule(neutronV20.ListCommand):
"""List policy_rules that belong to a given tenant."""
resource = 'policy_rule'
log = logging.getLogger(__name__ + '.ListPolicyRule')
_formatters = {}
list_columns = ['id', 'name', 'enabled', 'classifier_id',
'actions']
pagination_support = True
sorting_support = True
class ShowPolicyRule(neutronV20.ShowCommand):
"""Show information of a given policy_rule."""
resource = 'policy_rule'
log = logging.getLogger(__name__ + '.ShowPolicyRule')
class CreatePolicyRule(neutronV20.CreateCommand):
"""Create a policy_rule for a given tenant."""
resource = 'policy_rule'
log = logging.getLogger(__name__ + '.CreatePolicyRule')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('Description of the policy_rule'))
parser.add_argument(
'--enabled', type=bool,
help=_('Enable flag'))
parser.add_argument(
'--classifier',
help=_('uuid of policy classifier'))
parser.add_argument(
'--actions', type=string.split,
help=_('List of policy actions'))
parser.add_argument(
'name', metavar='NAME',
help=_('Name of policy_rule to create'))
parser.add_argument(
'--shared', type=bool,
help=_('Shared flag'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
if parsed_args.actions:
body[self.resource]['policy_actions'] = [
neutronV20.find_resourceid_by_name_or_id(
self.get_client(),
'policy_action',
elem) for elem in parsed_args.actions]
if parsed_args.classifier:
body[self.resource]['policy_classifier_id'] = \
neutronV20.find_resourceid_by_name_or_id(
self.get_client(),
'policy_classifier',
parsed_args.classifier)
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description',
'enabled', 'shared'])
return body
class DeletePolicyRule(neutronV20.DeleteCommand):
"""Delete a given policy_rule."""
resource = 'policy_rule'
log = logging.getLogger(__name__ + '.DeletePolicyRule')
class UpdatePolicyRule(neutronV20.UpdateCommand):
"""Update policy_rule's information."""
resource = 'policy_rule'
log = logging.getLogger(__name__ + '.UpdatePolicyRule')
def add_known_arguments(self, parser):
parser.add_argument(
'--enabled', type=bool,
help=_('Enable flag'))
parser.add_argument(
'--classifier',
help=_('uuid of policy classifier'))
parser.add_argument(
'--actions', type=string.split,
help=_('List of policy actions'))
parser.add_argument(
'--shared', type=bool,
help=_('Shared flag'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
if parsed_args.actions:
body[self.resource]['policy_actions'] = [
neutronV20.find_resourceid_by_name_or_id(
self.get_client(),
'policy_action',
elem) for elem in parsed_args.actions]
if parsed_args.classifier:
body[self.resource]['policy_classifier_id'] = \
neutronV20.find_resourceid_by_name_or_id(
self.get_client(),
'policy_classifier',
parsed_args.classifier)
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'description',
'enabled', 'shared'])
return body
class ListPolicyRuleSet(neutronV20.ListCommand):
"""List policy_rule_sets that belong to a given tenant."""
resource = 'policy_rule_set'
log = logging.getLogger(__name__ + '.ListPolicyRuleSet')
_formatters = {}
list_columns = ['id', 'name', 'ploicy_rules']
pagination_support = True
sorting_support = True
class ShowPolicyRuleSet(neutronV20.ShowCommand):
"""Show information of a given policy_rule_set."""
resource = 'policy_rule_set'
log = logging.getLogger(__name__ + '.ShowPolicyRuleSet')
class CreatePolicyRuleSet(neutronV20.CreateCommand):
"""Create a policy rule set for a given tenant."""
resource = 'policy_rule_set'
log = logging.getLogger(__name__ + '.CreatePolicyRuleSet')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('Description of the policy rule set'))
parser.add_argument(
'--policy-rules', type=string.split,
help=_('List of policy rules'))
parser.add_argument(
'--child-policy-rule-sets', type=string.split,
help=_('List of child policy rule sets'))
parser.add_argument(
'name', metavar='NAME',
help=_('Name of policy rule set to create'))
parser.add_argument(
'--shared', type=bool,
help=_('Shared flag'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
if parsed_args.policy_rules:
body[self.resource]['policy_rules'] = [
neutronV20.find_resourceid_by_name_or_id(
self.get_client(),
'policy_rule',
elem) for elem in parsed_args.policy_rules]
if parsed_args.child_policy_rule_sets:
body[self.resource]['child_policy_rule_sets'] = [
neutronV20.find_resourceid_by_name_or_id(
self.get_client(),
'policy_rule_set',
elem) for elem in parsed_args.child_policy_rule_sets]
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description', 'shared'])
return body
class DeletePolicyRuleSet(neutronV20.DeleteCommand):
"""Delete a given policy rule set."""
resource = 'policy_rule_set'
log = logging.getLogger(__name__ + '.DeletePolicyRuleSet')
class UpdatePolicyRuleSet(neutronV20.UpdateCommand):
"""Update policy rule set's information."""
resource = 'policy_rule_set'
log = logging.getLogger(__name__ + '.UpdatePolicyRuleSet')
def add_known_arguments(self, parser):
parser.add_argument(
'--policy-rules', type=string.split,
help=_('List of policy rules'))
parser.add_argument(
'--child-policy-rule-sets', type=string.split,
help=_('List of child policy rule sets'))
parser.add_argument(
'--shared', type=bool,
help=_('Shared flag'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
if parsed_args.policy_rules:
body[self.resource]['policy_rules'] = [
neutronV20.find_resourceid_by_name_or_id(
self.get_client(),
'policy_rule',
elem) for elem in parsed_args.policy_rules]
parsed_args.policy_rules = body[self.resource]['policy_rules']
if parsed_args.child_policy_rule_sets:
body[self.resource]['child_policy_rule_sets'] = [
neutronV20.find_resourceid_by_name_or_id(
self.get_client(),
'policy_rule_set',
elem) for elem in parsed_args.child_policy_rule_sets]
parsed_args.child_policy_rule_sets = (
parsed_args.child_policy_rule_sets)
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'description', 'policy_rules',
'child_policy_rule_sets', 'shared'])
return body
class ListExternalPolicy(neutronV20.ListCommand):
"""List External Policies that belong to a given tenant."""
resource = 'external_policy'
log = logging.getLogger(__name__ + '.ListExternalPolicy')
list_columns = ['id', 'name', 'description', 'shared']
pagination_support = True
sorting_support = True
class ShowExternalPolicy(neutronV20.ShowCommand):
"""Show information of a given External Policy."""
resource = 'external_policy'
log = logging.getLogger(__name__ + '.ShowExternalPolicy')
class CreateExternalPolicy(neutronV20.CreateCommand):
"""Create a External Policy for a given tenant."""
resource = 'external_policy'
log = logging.getLogger(__name__ + '.CreateExternalPolicy')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('Description of the External Policy'))
parser.add_argument(
'name', metavar='NAME',
help=_('Name of External Policy to create'))
parser.add_argument(
'--external-segments', type=string.split,
help=_('List of External Segment uuids'))
parser.add_argument(
'--provided-policy-rule-sets', type=utils.str2dict,
help=_('Dictionary of provided policy rule set uuids'))
parser.add_argument(
'--consumed-policy-rule-sets', type=utils.str2dict,
help=_('Dictionary of consumed policy rule set uuids'))
parser.add_argument(
'--shared', type=bool,
help=_('Shared flag'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
if parsed_args.provided_policy_rule_sets:
for key in parsed_args.provided_policy_rule_sets.keys():
id_key = neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'policy_rule_set',
key)
parsed_args.provided_policy_rule_sets[id_key] = (
parsed_args.provided_policy_rule_sets.pop(key))
if parsed_args.consumed_policy_rule_sets:
for key in parsed_args.consumed_policy_rule_sets.keys():
id_key = neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'policy_rule_set',
key)
parsed_args.consumed_policy_rule_sets[id_key] = (
parsed_args.consumed_policy_rule_sets.pop(key))
if parsed_args.external_segments:
for external_segment in parsed_args.external_segments:
external_segment_id = neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'external_segment', external_segment)
parsed_args.external_segments.remove(external_segment)
parsed_args.external_segments.append(external_segment_id)
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description',
'provided_policy_rule_sets',
'external_segments',
'consumed_policy_rule_sets', 'shared'])
return body
class DeleteExternalPolicy(neutronV20.DeleteCommand):
"""Delete a given External Policy."""
resource = 'external_policy'
log = logging.getLogger(__name__ + '.DeleteExternalPolicy')
class UpdateExternalPolicy(neutronV20.UpdateCommand):
"""Update External Policy's information."""
resource = 'external_policy'
log = logging.getLogger(__name__ + '.UpdateExternalPolicy')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('Description of the External Policy'))
parser.add_argument(
'--name',
help=_('New name of the External Policy'))
parser.add_argument(
'--external-segments', type=string.split,
help=_('List of External Segment uuids'))
parser.add_argument(
'--provided-policy-rule-sets', type=utils.str2dict,
help=_('Dictionary of provided policy rule set uuids'))
parser.add_argument(
'--consumed-policy-rule-sets', type=utils.str2dict,
help=_('Dictionary of consumed policy rule set uuids'))
parser.add_argument(
'--shared', type=bool,
help=_('Shared flag'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
if parsed_args.provided_policy_rule_sets:
for key in parsed_args.provided_policy_rule_sets.keys():
id_key = neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'policy_rule_set',
key)
parsed_args.provided_policy_rule_sets[id_key] = (
parsed_args.provided_policy_rule_sets.pop(key))
if parsed_args.consumed_policy_rule_sets:
for key in parsed_args.consumed_policy_rule_sets.keys():
id_key = neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'policy_rule_set',
key)
parsed_args.consumed_policy_rule_sets[id_key] = (
parsed_args.consumed_policy_rule_sets.pop(key))
if parsed_args.external_segments:
for external_segment in parsed_args.external_segments:
external_segment_id = neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'external_segment', external_segment)
parsed_args.external_segments.remove(external_segment)
parsed_args.external_segments.append(external_segment_id)
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description',
'provided_policy_rule_sets',
'external_segments',
'consumed_policy_rule_sets', 'shared'])
return body
class ListExternalSegment(neutronV20.ListCommand):
"""List External Segments that belong to a given tenant."""
resource = 'external_segment'
log = logging.getLogger(__name__ + '.ListExternalSegment')
_formatters = {'external_routes': _format_host_routes, }
list_columns = ['id', 'name', 'description', 'cidr',
'external_routes', 'port_address_translation', 'shared']
pagination_support = True
sorting_support = True
class ShowExternalSegment(neutronV20.ShowCommand):
"""Show information of a given External Segment."""
resource = 'external_segment'
log = logging.getLogger(__name__ + '.ShowExternalSegment')
class CreateExternalSegment(neutronV20.CreateCommand):
"""Create a External Segment for a given tenant."""
resource = 'external_segment'
log = logging.getLogger(__name__ + '.CreateExternalSegment')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('Description of the External Segment'))
parser.add_argument(
'name', metavar='NAME',
help=_('Name of External Segment to create'))
parser.add_argument(
'--ip-version',
type=int, choices=[4, 6],
help=_('IP version, default is 4'))
parser.add_argument(
'--cidr',
help=_('CIDR of External Segment, default is 172.16.0.0/12'))
parser.add_argument(
'--external-route', metavar='destination=CIDR,nexthop=IP_ADDR',
action='append', dest='external_routes', type=utils.str2dict,
help=_('External route (This option can be repeated).'))
parser.add_argument(
'--port-address-translation', type=bool,
help=_('Perform port-based address translation, default is False'))
parser.add_argument(
'--shared', type=bool,
help=_('Shared flag'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
if parsed_args.external_routes:
body['external_segment']['external_routes'] = (
parsed_args.external_routes)
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description',
'ip_version', 'cidr',
'external_routes', 'port_address_translation',
'shared'])
return body
class DeleteExternalSegment(neutronV20.DeleteCommand):
"""Delete a given External Segment."""
resource = 'external_segment'
log = logging.getLogger(__name__ + '.DeleteExternalSegment')
class UpdateExternalSegment(neutronV20.UpdateCommand):
"""Update External Segment's information."""
resource = 'external_segment'
log = logging.getLogger(__name__ + '.UpdateExternalSegment')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('Description of the External Segment'))
parser.add_argument(
'--name',
help=_('New name of External Segment'))
parser.add_argument(
'--ip-version',
type=int, choices=[4, 6],
help=_('IP version, default is 4'))
parser.add_argument(
'--cidr',
help=_('CIDR of External Segment, default is 172.16.0.0/12'))
parser.add_argument(
'--external-route', metavar='destination=CIDR,nexthop=IP_ADDR',
action='append', dest='external_routes', type=utils.str2dict,
help=_('External route (This option can be repeated).'))
parser.add_argument(
'--port-address-translation', type=bool,
help=_('Perform port-based address translation, default is False'))
parser.add_argument(
'--shared', type=bool,
help=_('Shared flag'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
if parsed_args.external_routes:
body['external_segment']['external_routes'] = (
parsed_args.external_routes)
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description',
'ip_version', 'cidr',
'external_routes', 'port_address_translation',
'shared'])
return body
class ListNatPool(neutronV20.ListCommand):
"""List NAT Pools that belong to a given tenant."""
resource = 'nat_pool'
log = logging.getLogger(__name__ + '.ListNatPool')
list_columns = ['id', 'name', 'description', 'ip_pool',
'external_segment_id', 'shared']
pagination_support = True
sorting_support = True
class ShowNatPool(neutronV20.ShowCommand):
"""Show information of a given NAT Pool."""
resource = 'nat_pool'
log = logging.getLogger(__name__ + '.ShowNatPool')
class CreateNatPool(neutronV20.CreateCommand):
"""Create a NAT Pool for a given tenant."""
resource = 'nat_pool'
log = logging.getLogger(__name__ + '.CreateNatPool')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('Description of the NAT Pool'))
parser.add_argument(
'name', metavar='NAME',
help=_('Name of NAT Pool to create'))
parser.add_argument(
'--ip-version',
type=int, choices=[4, 6],
help=_('IP version, default is 4'))
parser.add_argument(
'--ip-pool',
help=_('CIDR for NAT Pool'))
parser.add_argument(
'--external-segment',
help=_('External Segment name or UUID'))
parser.add_argument(
'--shared', type=bool,
help=_('Shared flag'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description',
'ip_version', 'ip_pool', 'shared'])
if parsed_args.external_segment:
body[self.resource]['external_segment_id'] = (
neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'external_segment',
parsed_args.external_segment))
return body
class DeleteNatPool(neutronV20.DeleteCommand):
"""Delete a given NAT Pool."""
resource = 'nat_pool'
log = logging.getLogger(__name__ + '.DeleteNatPool')
class UpdateNatPool(neutronV20.UpdateCommand):
"""Update NAT Pool's information."""
resource = 'nat_pool'
log = logging.getLogger(__name__ + '.UpdateNatPool')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('Description of the NAT Pool'))
parser.add_argument(
'--name',
help=_('New name of NAT Pool'))
parser.add_argument(
'--ip-version',
type=int, choices=[4, 6],
help=_('IP version, default is 4'))
parser.add_argument(
'--ip-pool',
help=_('CIDR for NAT Pool'))
parser.add_argument(
'--external-segment',
help=_('External Segment name or UUID'))
parser.add_argument(
'--shared', type=bool,
help=_('Shared flag'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description',
'ip_version', 'ip_pool', 'shared'])
if parsed_args.external_segment:
body[self.resource]['external_segment_id'] = (
neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'external_segment',
parsed_args.external_segment))
return body