Files
python-group-based-policy-c…/gbpclient/gbp/v2_0/groupbasedpolicy.py
Sumit Naiksatam d90eb26446 Make list and dict input consistent, other fixes
With some recent changes, a dictionary argument could be provided
as:
key1,key2,key3

But list arguments had to be space separated:
"x1 x2 x3"

This patch makes this consistent, to always be comma separated. At the
same time backward compatibility is being preserved.

Some other bugs are also getting fixed on account of this clean up:
Closes-bug: 1505452
Closes-bug: 1452155
Closes-bug: 1482310
Closes-bug: 1505343
Closes-bug: 1501659
Closes-bug: 1452156
Closes-bug: 1486725
Closes-bug: 1403192

(cherry picked from commit 3cbbf82073)
(cherry picked from commit f9cad0befa2f995cbb09c164e796772aee9a1180)

Change-Id: I965f5f0166db118beae1e0ba2428e448a418880f
2015-10-16 03:42:53 -07:00

1479 lines
53 KiB
Python

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import logging
from neutronclient.neutron import v2_0 as neutronV20
from oslo.serialization import jsonutils
from gbpclient.common import utils
try:
from neutronclient.i18n import _
except ImportError:
from neutronclient.openstack.common.gettextutils import _
def _format_network_service_params(net_svc_policy):
try:
return '\n'.join([jsonutils.dumps(param) for param in
net_svc_policy['network_service_params']])
except (TypeError, KeyError):
return ''
def _format_host_routes(subnet):
try:
return '\n'.join([jsonutils.dumps(route) for route in
subnet['host_routes']])
except (TypeError, KeyError):
return ''
class ListPolicyTarget(neutronV20.ListCommand):
"""List policy_targets that belong to a given tenant."""
resource = 'policy_target'
log = logging.getLogger(__name__ + '.ListPolicyTarget')
_formatters = {}
list_columns = ['id', 'name', 'description', 'policy_target_group_id',
'port_id']
pagination_support = True
sorting_support = True
class ShowPolicyTarget(neutronV20.ShowCommand):
"""Show information of a given policy_target."""
resource = 'policy_target'
log = logging.getLogger(__name__ + '.ShowPolicyTarget')
class CreatePolicyTarget(neutronV20.CreateCommand):
"""Create a policy_target for a given tenant."""
resource = 'policy_target'
log = logging.getLogger(__name__ + '.CreatePolicyTarget')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('Description of the Policy Target'))
parser.add_argument(
'--policy-target-group',
help=_('Policy Target Group (required argument)'))
parser.add_argument(
'--port-id', default='',
help=_('Neutron Port UUID'))
parser.add_argument(
'name', metavar='NAME',
help=_('Name of Policy Target to create (required argument)'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description'])
if parsed_args.policy_target_group:
body[self.resource]['policy_target_group_id'] = \
neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'policy_target_group',
parsed_args.policy_target_group)
if parsed_args.port_id:
body[self.resource]['port_id'] = (
parsed_args.port_id)
return body
class DeletePolicyTarget(neutronV20.DeleteCommand):
"""Delete a given Policy Target."""
resource = 'policy_target'
log = logging.getLogger(__name__ + '.DeletePolicyTarget')
class UpdatePolicyTarget(neutronV20.UpdateCommand):
"""Update Policy Target's information."""
resource = 'policy_target'
log = logging.getLogger(__name__ + '.UpdatePolicyTarget')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('New description of the Policy Target'))
parser.add_argument(
'--name',
help=_('New name of the Policy Target'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description'])
return body
class ListPolicyTargetGroup(neutronV20.ListCommand):
"""List Policy Target Groups that belong to a given tenant."""
resource = 'policy_target_group'
log = logging.getLogger(__name__ + '.ListPolicyTargetGroup')
list_columns = ['id', 'name', 'description', 'l2_policy_id', 'subnets']
pagination_support = True
sorting_support = True
class ShowPolicyTargetGroup(neutronV20.ShowCommand):
"""Show information of a given Policy Target Group."""
resource = 'policy_target_group'
log = logging.getLogger(__name__ + '.ShowPolicyTargetGroup')
class CreatePolicyTargetGroup(neutronV20.CreateCommand):
"""Create a Policy Target Group for a given tenant."""
resource = 'policy_target_group'
log = logging.getLogger(__name__ + '.CreatePolicyTargetGroup')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('Description of the Policy Target Group'))
parser.add_argument(
'name', metavar='NAME',
help=_('Name of Policy Target Group to create '
'(required argument)'))
parser.add_argument(
'--l2-policy', metavar='L2_POLICY',
default='',
help=_('L2 Policy UUID (if not specified, default is used)'))
parser.add_argument(
'--provided-policy-rule-sets', type=utils.str2dict,
help=_('Comma separated list of Policy Rule Sets'))
parser.add_argument(
'--consumed-policy-rule-sets', type=utils.str2dict,
help=_('Comma separated list of Policy Rule Sets'))
parser.add_argument(
'--network-service-policy', metavar='NETWORK_SERVICE_POLICY',
default='',
help=_('Network Service Policy'))
parser.add_argument(
'--subnets', type=utils.str2list,
help=_('Comma separated list of Neutron Subnet UUIDs'))
parser.add_argument(
'--shared', type=bool,
help=_('Enable or disable resource sharing, default is False'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
if parsed_args.l2_policy:
body[self.resource]['l2_policy_id'] = \
neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'l2_policy',
parsed_args.l2_policy)
if parsed_args.network_service_policy:
body[self.resource]['network_service_policy_id'] = \
neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'network_service_policy',
parsed_args.network_service_policy)
if parsed_args.provided_policy_rule_sets:
for key in parsed_args.provided_policy_rule_sets.keys():
id_key = neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'policy_rule_set',
key)
parsed_args.provided_policy_rule_sets[id_key] = \
parsed_args.provided_policy_rule_sets.pop(key)
if parsed_args.consumed_policy_rule_sets:
for key in parsed_args.consumed_policy_rule_sets.keys():
id_key = neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'policy_rule_set',
key)
parsed_args.consumed_policy_rule_sets[id_key] = \
parsed_args.consumed_policy_rule_sets.pop(key)
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description',
'provided_policy_rule_sets', 'subnets',
'consumed_policy_rule_sets', 'shared'])
return body
class DeletePolicyTargetGroup(neutronV20.DeleteCommand):
"""Delete a given Policy Target Group."""
resource = 'policy_target_group'
log = logging.getLogger(__name__ + '.DeletePolicyTargetGroup')
class UpdatePolicyTargetGroup(neutronV20.UpdateCommand):
"""Update Policy Target Group's information."""
resource = 'policy_target_group'
log = logging.getLogger(__name__ + '.UpdatePolicyTargetGroup')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('New description of the Policy Target Group'))
parser.add_argument(
'--l2-policy', metavar='L2_POLICY',
help=_('New L2 policy'))
parser.add_argument(
'--network-service-policy', metavar='NETWORK_SERVICE_POLICY',
help=_('New Network Service Policy'))
parser.add_argument(
'--provided-policy-rule-sets', type=utils.str2dict,
help=_('New comma separated list of Policy Rule Sets '
'(to unset use "")'))
parser.add_argument(
'--consumed-policy-rule-sets', type=utils.str2dict,
help=_('New comma separated list of Policy Rule Sets '
'(to unset use "")'))
parser.add_argument(
'--subnets', type=utils.str2list,
help=_('New comma separated list of Neutron Subnet UUIDs '
'(to unset use "")'))
parser.add_argument(
'--shared', type=bool,
help=_('Enable or disable resource sharing, default is False'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
if parsed_args.l2_policy:
body[self.resource]['l2_policy_id'] = \
neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'l2_policy',
parsed_args.l2_policy)
if parsed_args.network_service_policy == '':
body[self.resource]['network_service_policy_id'] = None
elif parsed_args.network_service_policy:
body[self.resource]['network_service_policy_id'] = \
neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'network_service_policy',
parsed_args.network_service_policy)
if parsed_args.provided_policy_rule_sets:
for key in parsed_args.provided_policy_rule_sets.keys():
id_key = neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'policy_rule_set',
key)
parsed_args.provided_policy_rule_sets[id_key] = \
parsed_args.provided_policy_rule_sets.pop(key)
if parsed_args.consumed_policy_rule_sets:
for key in parsed_args.consumed_policy_rule_sets.keys():
id_key = neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'policy_rule_set',
key)
parsed_args.consumed_policy_rule_sets[id_key] = \
parsed_args.consumed_policy_rule_sets.pop(key)
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description',
'provided_policy_rule_sets', 'subnets',
'consumed_policy_rule_sets', 'shared'])
return body
class ListL2Policy(neutronV20.ListCommand):
"""List L2 Policies that belong to a given tenant."""
resource = 'l2_policy'
log = logging.getLogger(__name__ + '.ListL2Policy')
_formatters = {}
list_columns = ['id', 'name', 'description', 'l3_policy_id', 'network_id']
pagination_support = True
sorting_support = True
class ShowL2Policy(neutronV20.ShowCommand):
"""Show information of a given L2 Policy."""
resource = 'l2_policy'
log = logging.getLogger(__name__ + '.ShowL2Policy')
class CreateL2Policy(neutronV20.CreateCommand):
"""Create a L2 Policy for a given tenant."""
resource = 'l2_policy'
log = logging.getLogger(__name__ + '.CreateL2Policy')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('Description of the L2 Policy'))
parser.add_argument(
'--network',
help=_('Neutron Network UUID to map the L2 Policy to '
'(if not specified, new Neutron Network is created '
'implicitly)'))
parser.add_argument(
'--l3-policy',
default='',
help=_('L3 Policy UUID (if not specified default is used)'))
parser.add_argument(
'name', metavar='NAME',
help=_('Name of L2 Policy to create (required argument)'))
parser.add_argument(
'--shared', type=bool,
help=_('Enable or disable resource sharing, default is False'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description', 'shared'])
if parsed_args.l3_policy:
body[self.resource]['l3_policy_id'] = \
neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'l3_policy',
parsed_args.l3_policy)
if parsed_args.network:
body[self.resource]['network_id'] = (
parsed_args.network)
return body
class DeleteL2Policy(neutronV20.DeleteCommand):
"""Delete a given L2 Policy."""
resource = 'l2_policy'
log = logging.getLogger(__name__ + '.DeleteL2Policy')
class UpdateL2Policy(neutronV20.UpdateCommand):
"""Update L2 Policy's information."""
resource = 'l2_policy'
log = logging.getLogger(__name__ + '.UpdateL2Policy')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('New description of the L2 Policy'))
parser.add_argument(
'--l3-policy',
default='',
help=_('New L3 Policy'))
parser.add_argument(
'--name',
help=_('New name of the L2 Policy'))
parser.add_argument(
'--shared', type=bool,
help=_('Enable or disable resource sharing'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description', 'shared'])
if parsed_args.l3_policy:
body[self.resource]['l3_policy_id'] = \
neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'l3_policy',
parsed_args.l3_policy)
return body
class ListL3Policy(neutronV20.ListCommand):
"""List l3_policies that belong to a given tenant."""
resource = 'l3_policy'
log = logging.getLogger(__name__ + '.ListL3Policy')
_formatters = {}
list_columns = ['id', 'name', 'description', 'ip_pool',
'subnet_prefix_length']
pagination_support = True
sorting_support = True
class ShowL3Policy(neutronV20.ShowCommand):
"""Show information of a given L3 Policy."""
resource = 'l3_policy'
log = logging.getLogger(__name__ + '.ShowL3Policy')
class CreateL3Policy(neutronV20.CreateCommand):
"""Create a L3 Policy for a given tenant."""
resource = 'l3_policy'
log = logging.getLogger(__name__ + '.CreateL3Policy')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('Description of the L3 Policy'))
parser.add_argument(
'--ip-version',
type=int,
# default=4, choices=[4, 6],
help=_('IP version, default is 4'))
parser.add_argument(
'--ip-pool',
help=_('CIDR of IP pool to create, default is 10.0.0.0/8'))
parser.add_argument(
'--subnet-prefix-length',
type=int,
help=_('Subnet prefix length, default is 24'))
parser.add_argument(
'--external-segment',
action='append', dest='external_segments', type=utils.str2dict,
# Note: The following format is also supported but we do not
# show it to avoid confusion
# help=_('Use format <ext-segment-id-1>=<ip-addr1:ipaddr2:...>'
# '(this option can be repeated)'))
help=_('Comma separated list of External Segments'
'(this option can be repeated)'))
parser.add_argument(
'--routers', type=utils.str2list,
help=_('Comma separated list of Neutron Router UUIDs'))
parser.add_argument(
'name', metavar='NAME',
help=_('Name of L3 policy to create (required argument)'))
parser.add_argument(
'--shared', type=bool,
help=_('Enable or disable resource sharing, default is False'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
if parsed_args.external_segments:
external_segments_dict = {}
for external_segment in parsed_args.external_segments:
external_segment_id = neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'external_segment',
external_segment.keys()[0])
ipaddrs = external_segment.itervalues().next()
if ipaddrs is "":
ipaddrs = []
else:
ipaddrs = external_segment.itervalues().next().split(':')
external_segments_dict[external_segment_id] = ipaddrs
body[self.resource]['external_segments'] = external_segments_dict
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description',
'ip_version', 'ip_pool', 'routers',
'subnet_prefix_length', 'shared'])
return body
class DeleteL3Policy(neutronV20.DeleteCommand):
"""Delete a given L3 Policy."""
resource = 'l3_policy'
log = logging.getLogger(__name__ + '.DeleteL3Policy')
class UpdateL3Policy(neutronV20.UpdateCommand):
"""Update L3 Policy's information."""
resource = 'l3_policy'
log = logging.getLogger(__name__ + '.UpdateL3Policy')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('New description of the L3 Policy'))
parser.add_argument(
'--subnet-prefix-length',
type=int,
help=_('New subnet prefix length'))
parser.add_argument(
'--external-segment',
action='append', dest='external_segments', type=utils.str2dict,
# Note: The following format is also supported but we do not
# show it to avoid confusion
# help=_('Use format <ext-segment-id-1>=<ip-addr1:ipaddr2:...>'
# '(this option can be repeated)'))
help=_('New comma separated list of External Segments'
'(this option can be repeated)'))
parser.add_argument(
'--routers', type=utils.str2list,
help=_('New comma separated list of Neutron Router UUIDs'))
parser.add_argument(
'--name',
help=_('New name of the L3 Policy'))
parser.add_argument(
'--shared', type=bool,
help=_('Enable or disable resource sharing'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
if parsed_args.external_segments:
external_segments_dict = {}
for external_segment in parsed_args.external_segments:
if not external_segment:
break
external_segment_id = neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'external_segment',
external_segment.keys()[0])
ipaddrs = external_segment.itervalues().next()
if ipaddrs is "":
ipaddrs = []
else:
ipaddrs = external_segment.itervalues().next().split(':')
external_segments_dict[external_segment_id] = ipaddrs
body[self.resource]['external_segments'] = external_segments_dict
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description',
'ip_version', 'ip_pool', 'routers',
'subnet_prefix_length', 'shared'])
return body
class ListNetworkServicePolicy(neutronV20.ListCommand):
"""List Network Service Policies that belong to a given tenant."""
resource = 'network_service_policy'
log = logging.getLogger(__name__ + '.ListNetworkServicePolicy')
_formatters = {'network_servie_params': _format_network_service_params}
list_columns = ['id', 'name', 'description', 'network_service_params',
'policy_target_groups']
pagination_support = True
sorting_support = True
class ShowNetworkServicePolicy(neutronV20.ShowCommand):
"""Show information of a given network_service_policy."""
resource = 'network_service_policy'
log = logging.getLogger(__name__ + '.ShowNetworkServicePolicy')
class CreateNetworkServicePolicy(neutronV20.CreateCommand):
"""Create a Network Service Policy for a given tenant."""
resource = 'network_service_policy'
log = logging.getLogger(__name__ + '.CreateNetworkServicePolicy')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('Description of the network_service_policy'))
parser.add_argument(
'name',
help=_('Name of network_service_policy to create (required '
'argument)'))
parser.add_argument(
'--network-service-params',
metavar='type=PARAM_TYPE,name=PARAM_NAME,value=PARAM_VALUE',
action='append', dest='network_service_params',
type=utils.str2dict,
help=_('Params for this Network Service '
'Policy (this option can be repeated)'))
parser.add_argument(
'--shared', type=bool,
help=_('Enable or disable resource sharing, default is False'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description',
'network_service_params', 'shared'])
return body
class DeleteNetworkServicePolicy(neutronV20.DeleteCommand):
"""Delete a given network_service_policy."""
resource = 'network_service_policy'
log = logging.getLogger(__name__ + '.DeleteNetworkServicePolicy')
class UpdateNetworkServicePolicy(neutronV20.UpdateCommand):
"""Update network_service_policy's information."""
resource = 'network_service_policy'
log = logging.getLogger(__name__ + '.UpdateNetworkServicePolicy')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('New description of the network_service_policy'))
parser.add_argument(
'--name',
help=_('New name of the network_service_policy'))
parser.add_argument(
'--network-service-params',
metavar='type=PARAM_TYPE,name=PARAM_NAME,value=PARAM_VALUE',
action='append', dest='network_service_params',
type=utils.str2dict,
help=_('New params for this Network Service '
'Policy (this option can be repeated)'))
parser.add_argument(
'--shared', type=bool,
help=_('Enable or disable resource sharing'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description',
'network_service_params', 'shared'])
return body
class ListPolicyClassifier(neutronV20.ListCommand):
"""List classifiers that belong to a given tenant."""
resource = 'policy_classifier'
log = logging.getLogger(__name__ + '.ListPolicyClassifier')
_formatters = {}
list_columns = ['id', 'name', 'protocol', 'port_range', 'direction']
pagination_support = True
sorting_support = True
class ShowPolicyClassifier(neutronV20.ShowCommand):
"""Show information of a given classifier."""
resource = 'policy_classifier'
log = logging.getLogger(__name__ + '.ShowPolicyClassifier')
class CreatePolicyClassifier(neutronV20.CreateCommand):
"""Create a classifier for a given tenant."""
resource = 'policy_classifier'
log = logging.getLogger(__name__ + '.CreatePolicyClassifier')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('Description of the policy classifier'))
parser.add_argument(
'--protocol',
choices=['tcp', 'udp', 'icmp'],
help=_('Protocol'))
parser.add_argument(
'--port-range',
help=_('Port range'))
parser.add_argument(
'--direction',
choices=['in', 'out', 'bi', ''],
help=_('Direction'))
parser.add_argument(
'name', metavar='NAME',
help=_('Name of classifier to create (required argument)'))
parser.add_argument(
'--shared', type=bool,
help=_('Enable or disable resource sharing, default is False'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description',
'protocol', 'port_range',
'direction', 'shared'])
return body
class DeletePolicyClassifier(neutronV20.DeleteCommand):
"""Delete a given classifier."""
resource = 'policy_classifier'
log = logging.getLogger(__name__ + '.DeletePolicyClassifier')
class UpdatePolicyClassifier(neutronV20.UpdateCommand):
"""Update classifier's information."""
resource = 'policy_classifier'
log = logging.getLogger(__name__ + '.UpdatePolicyClassifier')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('New description of the policy classifier'))
parser.add_argument(
'--protocol',
choices=['tcp', 'udp', 'icmp'],
help=_('New Protocol'))
parser.add_argument(
'--port-range',
help=_('New Port range'))
parser.add_argument(
'--direction',
choices=['in', 'out', 'bi', ''],
help=_('New Direction'))
parser.add_argument(
'--name',
help=_('New name of the classifier'))
parser.add_argument(
'--shared', type=bool,
help=_('Enable or disable resource sharing'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description',
'protocol', 'port_range',
'direction', 'shared'])
return body
class ListPolicyAction(neutronV20.ListCommand):
"""List actions that belong to a given tenant."""
resource = 'policy_action'
log = logging.getLogger(__name__ + '.ListPolicyAction')
_formatters = {}
list_columns = ['id', 'name', 'action_type', 'action_value']
pagination_support = True
sorting_support = True
class ShowPolicyAction(neutronV20.ShowCommand):
"""Show information of a given action."""
resource = 'policy_action'
log = logging.getLogger(__name__ + '.ShowPolicyAction')
class CreatePolicyAction(neutronV20.CreateCommand):
"""Create a action for a given tenant."""
resource = 'policy_action'
log = logging.getLogger(__name__ + '.CreatePolicyAction')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('Description of the policy action'))
parser.add_argument(
'--action-type',
help=_('Type of action'))
parser.add_argument(
'--action-value',
help=_('Name/UUID of servicechain spec for redirect action'))
parser.add_argument(
'name', metavar='NAME',
help=_('Name of action to create (required argument)'))
parser.add_argument(
'--shared', type=bool,
help=_('Enable or disable resource sharing, default is False'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
if parsed_args.action_value:
body[self.resource]['action_value'] = (
neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'servicechain_spec',
parsed_args.action_value))
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description',
'action_type', 'shared'])
return body
class DeletePolicyAction(neutronV20.DeleteCommand):
"""Delete a given action."""
resource = 'policy_action'
log = logging.getLogger(__name__ + '.DeletePolicyAction')
class UpdatePolicyAction(neutronV20.UpdateCommand):
"""Update action's information."""
resource = 'policy_action'
log = logging.getLogger(__name__ + '.UpdatePolicyAction')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('New description of the policy action'))
parser.add_argument(
'--action-value',
help=_('New name/UUID of servicechain spec for redirect action'))
parser.add_argument(
'--name',
help=_('New name of the action'))
parser.add_argument(
'--shared', type=bool,
help=_('Enable or disable resource sharing'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
if parsed_args.action_value:
body[self.resource]['action_value'] = (
neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'servicechain_spec',
parsed_args.action_value))
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description',
'shared'])
return body
class ListPolicyRule(neutronV20.ListCommand):
"""List policy_rules that belong to a given tenant."""
resource = 'policy_rule'
log = logging.getLogger(__name__ + '.ListPolicyRule')
_formatters = {}
list_columns = ['id', 'name', 'enabled', 'classifier_id',
'actions']
pagination_support = True
sorting_support = True
class ShowPolicyRule(neutronV20.ShowCommand):
"""Show information of a given policy_rule."""
resource = 'policy_rule'
log = logging.getLogger(__name__ + '.ShowPolicyRule')
class CreatePolicyRule(neutronV20.CreateCommand):
"""Create a policy_rule for a given tenant."""
resource = 'policy_rule'
log = logging.getLogger(__name__ + '.CreatePolicyRule')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('Description of the policy_rule'))
parser.add_argument(
'--enabled', type=bool,
help=_('Enable flag, default is True '
'(if False, this Policy Rule is ignored)'))
parser.add_argument(
'--classifier',
help=_('Policy Classifier (required argument)'))
parser.add_argument(
'--actions', type=utils.str2list,
help=_('Comma separated list of Policy Action(s)'))
parser.add_argument(
'name', metavar='NAME',
help=_('Name of Policy Rule to create (required argument)'))
parser.add_argument(
'--shared', type=bool,
help=_('Enable or disable resource sharing, default is False'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
if parsed_args.actions:
body[self.resource]['policy_actions'] = [
neutronV20.find_resourceid_by_name_or_id(
self.get_client(),
'policy_action',
elem) for elem in parsed_args.actions]
if parsed_args.classifier:
body[self.resource]['policy_classifier_id'] = \
neutronV20.find_resourceid_by_name_or_id(
self.get_client(),
'policy_classifier',
parsed_args.classifier)
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description',
'enabled', 'shared'])
return body
class DeletePolicyRule(neutronV20.DeleteCommand):
"""Delete a given policy_rule."""
resource = 'policy_rule'
log = logging.getLogger(__name__ + '.DeletePolicyRule')
class UpdatePolicyRule(neutronV20.UpdateCommand):
"""Update policy_rule's information."""
resource = 'policy_rule'
log = logging.getLogger(__name__ + '.UpdatePolicyRule')
def add_known_arguments(self, parser):
parser.add_argument(
'--name',
help=_('New name of the Policy Rule'))
parser.add_argument(
'--description',
help=_('New description of the Policy Rule'))
parser.add_argument(
'--enabled', type=bool,
help=_('Enable flag (if False, this Policy Rule is ignored)'))
parser.add_argument(
'--classifier',
help=_('New Policy Classifier'))
parser.add_argument(
'--actions', type=utils.str2list,
help=_('New comma separated list of Policy Actions '
'(to unset use "")'))
parser.add_argument(
'--shared', type=bool,
help=_('Enable or disable resource sharing'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
if parsed_args.actions == []:
body[self.resource]['policy_actions'] = []
elif parsed_args.actions:
body[self.resource]['policy_actions'] = [
neutronV20.find_resourceid_by_name_or_id(
self.get_client(),
'policy_action',
elem) for elem in parsed_args.actions]
if parsed_args.classifier:
body[self.resource]['policy_classifier_id'] = \
neutronV20.find_resourceid_by_name_or_id(
self.get_client(),
'policy_classifier',
parsed_args.classifier)
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'description',
'enabled', 'shared'])
return body
class ListPolicyRuleSet(neutronV20.ListCommand):
"""List policy_rule_sets that belong to a given tenant."""
resource = 'policy_rule_set'
log = logging.getLogger(__name__ + '.ListPolicyRuleSet')
_formatters = {}
list_columns = ['id', 'name', 'policy_rules']
pagination_support = True
sorting_support = True
class ShowPolicyRuleSet(neutronV20.ShowCommand):
"""Show information of a given policy_rule_set."""
resource = 'policy_rule_set'
log = logging.getLogger(__name__ + '.ShowPolicyRuleSet')
class CreatePolicyRuleSet(neutronV20.CreateCommand):
"""Create a policy rule set for a given tenant."""
resource = 'policy_rule_set'
log = logging.getLogger(__name__ + '.CreatePolicyRuleSet')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('Description of the policy rule set'))
parser.add_argument(
'--policy-rules', type=utils.str2list,
help=_('Comma separated list of Policy Rules'))
parser.add_argument(
'--child-policy-rule-sets', type=utils.str2list,
help=_('Comma separated list of child Policy Rule Sets'))
parser.add_argument(
'name', metavar='NAME',
help=_('Name of Policy Rule Set to create (required argument)'))
parser.add_argument(
'--shared', type=bool,
help=_('Enable or disable resource sharing, default is False'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
if parsed_args.policy_rules:
body[self.resource]['policy_rules'] = [
neutronV20.find_resourceid_by_name_or_id(
self.get_client(),
'policy_rule',
elem) for elem in parsed_args.policy_rules]
if parsed_args.child_policy_rule_sets:
body[self.resource]['child_policy_rule_sets'] = [
neutronV20.find_resourceid_by_name_or_id(
self.get_client(),
'policy_rule_set',
elem) for elem in parsed_args.child_policy_rule_sets]
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description', 'shared'])
return body
class DeletePolicyRuleSet(neutronV20.DeleteCommand):
"""Delete a given policy rule set."""
resource = 'policy_rule_set'
log = logging.getLogger(__name__ + '.DeletePolicyRuleSet')
class UpdatePolicyRuleSet(neutronV20.UpdateCommand):
"""Update policy rule set's information."""
resource = 'policy_rule_set'
log = logging.getLogger(__name__ + '.UpdatePolicyRuleSet')
def add_known_arguments(self, parser):
parser.add_argument(
'--name',
help=_('New name of the Policy Rule Set'))
parser.add_argument(
'--description',
help=_('New description of the Policy Rule Set'))
parser.add_argument(
'--policy-rules', type=utils.str2list,
help=_('New comma separated list of Policy Rules '
'(to unset use "")'))
parser.add_argument(
'--child-policy-rule-sets', type=utils.str2list,
help=_('New comma separated list of child Policy Rule Sets '
'(to unset use "")'))
parser.add_argument(
'--shared', type=bool,
help=_('Enable or disable resource sharing'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
if parsed_args.policy_rules == []:
body[self.resource]['policy_rules'] = []
elif parsed_args.policy_rules:
body[self.resource]['policy_rules'] = [
neutronV20.find_resourceid_by_name_or_id(
self.get_client(),
'policy_rule',
elem) for elem in parsed_args.policy_rules]
if parsed_args.child_policy_rule_sets == []:
body[self.resource]['child_policy_rule_sets'] = []
elif parsed_args.child_policy_rule_sets:
body[self.resource]['child_policy_rule_sets'] = [
neutronV20.find_resourceid_by_name_or_id(
self.get_client(),
'policy_rule_set',
elem) for elem in parsed_args.child_policy_rule_sets]
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'description', 'shared'])
return body
class ListExternalPolicy(neutronV20.ListCommand):
"""List External Policies that belong to a given tenant."""
resource = 'external_policy'
log = logging.getLogger(__name__ + '.ListExternalPolicy')
list_columns = ['id', 'name', 'description', 'shared', 'external_segments']
pagination_support = True
sorting_support = True
class ShowExternalPolicy(neutronV20.ShowCommand):
"""Show information of a given External Policy."""
resource = 'external_policy'
log = logging.getLogger(__name__ + '.ShowExternalPolicy')
class CreateExternalPolicy(neutronV20.CreateCommand):
"""Create a External Policy for a given tenant."""
resource = 'external_policy'
log = logging.getLogger(__name__ + '.CreateExternalPolicy')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('Description of the External Policy'))
parser.add_argument(
'name', metavar='NAME',
help=_('Name of External Policy to create (required argument)'))
parser.add_argument(
'--external-segments', type=utils.str2list,
help=_('Comma separated list of External Segments'))
parser.add_argument(
'--provided-policy-rule-sets', type=utils.str2dict,
help=_('Comma separated list of Policy Rule Sets'))
parser.add_argument(
'--consumed-policy-rule-sets', type=utils.str2dict,
help=_('Comma separated list of Policy Rule Sets'))
parser.add_argument(
'--shared', type=bool,
help=_('Enable or disable resource sharing, default is False'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
if parsed_args.provided_policy_rule_sets:
for key in parsed_args.provided_policy_rule_sets.keys():
id_key = neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'policy_rule_set',
key)
parsed_args.provided_policy_rule_sets[id_key] = (
parsed_args.provided_policy_rule_sets.pop(key))
if parsed_args.consumed_policy_rule_sets:
for key in parsed_args.consumed_policy_rule_sets.keys():
id_key = neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'policy_rule_set',
key)
parsed_args.consumed_policy_rule_sets[id_key] = (
parsed_args.consumed_policy_rule_sets.pop(key))
if parsed_args.external_segments:
body[self.resource]['external_segments'] = [
neutronV20.find_resourceid_by_name_or_id(
self.get_client(),
'external_segment',
elem) for elem in parsed_args.external_segments]
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description',
'provided_policy_rule_sets',
'consumed_policy_rule_sets', 'shared'])
return body
class DeleteExternalPolicy(neutronV20.DeleteCommand):
"""Delete a given External Policy."""
resource = 'external_policy'
log = logging.getLogger(__name__ + '.DeleteExternalPolicy')
class UpdateExternalPolicy(neutronV20.UpdateCommand):
"""Update External Policy's information."""
resource = 'external_policy'
log = logging.getLogger(__name__ + '.UpdateExternalPolicy')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('New description of the External Policy'))
parser.add_argument(
'--name',
help=_('New name of the External Policy'))
parser.add_argument(
'--external-segments', type=utils.str2list,
help=_('New comma separated list of External Segments '
'(to unset use "")'))
parser.add_argument(
'--provided-policy-rule-sets', type=utils.str2dict,
help=_('New comma separated list of Policy Rule Sets '
'(to unset use "")'))
parser.add_argument(
'--consumed-policy-rule-sets', type=utils.str2dict,
help=_('New comma separated list of Policy Rule Sets '
'(to unset use "")'))
parser.add_argument(
'--shared', type=bool,
help=_('Enable or disable resource sharing'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
if parsed_args.provided_policy_rule_sets:
for key in parsed_args.provided_policy_rule_sets.keys():
id_key = neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'policy_rule_set',
key)
parsed_args.provided_policy_rule_sets[id_key] = (
parsed_args.provided_policy_rule_sets.pop(key))
if parsed_args.consumed_policy_rule_sets:
for key in parsed_args.consumed_policy_rule_sets.keys():
id_key = neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'policy_rule_set',
key)
parsed_args.consumed_policy_rule_sets[id_key] = (
parsed_args.consumed_policy_rule_sets.pop(key))
if parsed_args.external_segments == []:
body[self.resource]['external_segments'] = []
elif parsed_args.external_segments:
body[self.resource]['external_segments'] = [
neutronV20.find_resourceid_by_name_or_id(
self.get_client(),
'external_segment',
elem) for elem in parsed_args.external_segments]
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description',
'provided_policy_rule_sets',
'consumed_policy_rule_sets', 'shared'])
return body
class ListExternalSegment(neutronV20.ListCommand):
"""List External Segments that belong to a given tenant."""
resource = 'external_segment'
log = logging.getLogger(__name__ + '.ListExternalSegment')
_formatters = {'external_routes': _format_host_routes, }
list_columns = ['id', 'name', 'description', 'cidr',
'external_routes', 'port_address_translation', 'shared']
pagination_support = True
sorting_support = True
class ShowExternalSegment(neutronV20.ShowCommand):
"""Show information of a given External Segment."""
resource = 'external_segment'
log = logging.getLogger(__name__ + '.ShowExternalSegment')
class CreateExternalSegment(neutronV20.CreateCommand):
"""Create a External Segment for a given tenant."""
resource = 'external_segment'
log = logging.getLogger(__name__ + '.CreateExternalSegment')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('Description of the External Segment'))
parser.add_argument(
'name',
help=_('Name of External Segment to create (required argument)'))
parser.add_argument(
'--ip-version',
type=int, choices=[4, 6],
help=_('IP version, default is 4'))
parser.add_argument(
'--cidr',
help=_('CIDR of External Segment, default is 172.16.0.0/12'))
parser.add_argument(
'--external-route', metavar='destination=CIDR,nexthop=IP_ADDR',
action='append', dest='external_routes', type=utils.str2dict,
help=_('If no nexthop, use format: destination=CIDR,nexthop '
'(this option can be repeated)'))
parser.add_argument(
'--port-address-translation', type=bool,
help=_('Enable port-based address translation, default is False'))
parser.add_argument(
'--shared', type=bool,
help=_('Enable or disable resource sharing, default is False'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
if parsed_args.external_routes:
eroutes = []
for er in parsed_args.external_routes:
if 'nexthop' in er and er['nexthop'] == '':
er['nexthop'] = None
if er:
eroutes.append(er)
body['external_segment']['external_routes'] = eroutes
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description',
'ip_version', 'cidr',
'port_address_translation', 'shared'])
return body
class DeleteExternalSegment(neutronV20.DeleteCommand):
"""Delete a given External Segment."""
resource = 'external_segment'
log = logging.getLogger(__name__ + '.DeleteExternalSegment')
class UpdateExternalSegment(neutronV20.UpdateCommand):
"""Update External Segment's information."""
resource = 'external_segment'
log = logging.getLogger(__name__ + '.UpdateExternalSegment')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('New description of the External Segment'))
parser.add_argument(
'--name',
help=_('New name of External Segment'))
parser.add_argument(
'--external-route', metavar='destination=CIDR,nexthop=IP_ADDR',
action='append', dest='external_routes', type=utils.str2dict,
help=_('If no nexthop, use format: destination=CIDR,nexthop '
'(this option can be repeated)'))
parser.add_argument(
'--port-address-translation', type=bool,
help=_('Enable or disable port-based address translation'))
parser.add_argument(
'--shared', type=bool,
help=_('Enable or disable resource sharing'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
if parsed_args.external_routes == [{}]:
body[self.resource]['external_routes'] = []
elif parsed_args.external_routes:
eroutes = []
for er in parsed_args.external_routes:
if 'nexthop' in er and er['nexthop'] == '':
er['nexthop'] = None
if er:
eroutes.append(er)
body[self.resource]['external_routes'] = eroutes
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description',
'ip_version', 'cidr',
'port_address_translation', 'shared'])
return body
class ListNatPool(neutronV20.ListCommand):
"""List NAT Pools that belong to a given tenant."""
resource = 'nat_pool'
log = logging.getLogger(__name__ + '.ListNatPool')
list_columns = ['id', 'name', 'description', 'ip_pool',
'external_segment_id', 'shared']
pagination_support = True
sorting_support = True
class ShowNatPool(neutronV20.ShowCommand):
"""Show information of a given NAT Pool."""
resource = 'nat_pool'
log = logging.getLogger(__name__ + '.ShowNatPool')
class CreateNatPool(neutronV20.CreateCommand):
"""Create a NAT Pool for a given tenant."""
resource = 'nat_pool'
log = logging.getLogger(__name__ + '.CreateNatPool')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('Description of the NAT Pool'))
parser.add_argument(
'name', metavar='NAME',
help=_('Name of NAT Pool to create (required argument)'))
parser.add_argument(
'--ip-version',
type=int, choices=[4, 6],
help=_('IP version, default is 4'))
parser.add_argument(
'--ip-pool',
help=_('CIDR for NAT Pool'))
parser.add_argument(
'--external-segment',
help=_('External Segment'))
parser.add_argument(
'--shared', type=bool,
help=_('Enable or disable resource sharing, default is False'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description',
'ip_version', 'ip_pool', 'shared'])
if parsed_args.external_segment:
body[self.resource]['external_segment_id'] = (
neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'external_segment',
parsed_args.external_segment))
return body
class DeleteNatPool(neutronV20.DeleteCommand):
"""Delete a given NAT Pool."""
resource = 'nat_pool'
log = logging.getLogger(__name__ + '.DeleteNatPool')
class UpdateNatPool(neutronV20.UpdateCommand):
"""Update NAT Pool's information."""
resource = 'nat_pool'
log = logging.getLogger(__name__ + '.UpdateNatPool')
def add_known_arguments(self, parser):
parser.add_argument(
'--description',
help=_('New description of the NAT Pool'))
parser.add_argument(
'--name',
help=_('New name of NAT Pool'))
parser.add_argument(
'--external-segment',
help=_('New External Segment'))
parser.add_argument(
'--shared', type=bool,
help=_('Enable or disable resource sharing'))
def args2body(self, parsed_args):
body = {self.resource: {}, }
neutronV20.update_dict(parsed_args, body[self.resource],
['name', 'tenant_id', 'description',
'ip_version', 'ip_pool', 'shared'])
if parsed_args.external_segment:
body[self.resource]['external_segment_id'] = (
neutronV20.find_resourceid_by_name_or_id(
self.get_client(), 'external_segment',
parsed_args.external_segment))
return body