Merge "Updates to Openstack CLI to reflect Database."

This commit is contained in:
Zuul 2019-01-30 10:25:27 +00:00 committed by Gerrit Code Review
commit a0446759a8
29 changed files with 1583 additions and 163 deletions

View File

@ -18,3 +18,50 @@
timeout: 7800
required-projects:
- openstack-infra/devstack-gate
- git.openstack.org/openstack/keystone
- git.openstack.org/openstack/neutron
roles:
- zuul: openstack-infra/devstack
vars:
devstack_localrc:
DATABASE_PASSWORD: secretdatabase
RABBIT_PASSWORD: secretrabbit
ADMIN_PASSWORD: secretadmin
SERVICE_PASSWORD: secretservice
NETWORK_GATEWAY: 10.1.0.1
FIXED_RANGE: 10.1.0.0/20
IPV4_ADDRS_SAFE_TO_USE: 10.1.0.0/20
FLOATING_RANGE: 172.24.5.0/24
PUBLIC_NETWORK_GATEWAY: 172.24.5.1
LOGFILE: /opt/stack/logs/devstacklog.txt
LIBVIRT_TYPE: qemu
LIBS_FROM_GIT: python-openstackclient
GLANCE_V1_ENABLED: true
devstack_services:
dstat: true
etcd3: true
mysql: true
peakmem_tracker: true
rabbit: true
tls-proxy: true
key: true
n-api: true
n-api-meta: true
n-cauth: true
n-cond: true
n-cpu: true
n-novnc: true
n-obj: true
n-sch: true
placement-api: true
q-agt: true
q-dhcp: true
q-l3: true
q-meta: true
q-metering: true
q-svc: true
horizon: false
tempest: false
osc_environment:
PYTHONUNBUFFERED: 'true'
OS_CLOUD: devstack-admin

View File

View File

@ -0,0 +1,164 @@
# Copyright (c) 2017 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from osc_lib.command import command
from osc_lib import utils
object_path = "/classification_groups"
resource = 'classification_group'
class CreateClassificationGroup(command.ShowOne):
"""Create a Classification Group."""
def get_parser(self, prog_name):
parser = super(CreateClassificationGroup, self).get_parser(prog_name)
parser.add_argument(
'name', metavar='NAME',
help=('Name of the Classification Group.'))
parser.add_argument(
'--description',
help=('Description for the Classification Group.'))
parser.add_argument(
'--classification', nargs='*',
help=('Classification value.'))
parser.add_argument(
'--classification-group', nargs='*',
help=('ID of the Classification Group.'))
parser.add_argument(
'--operator',
help=('Operation to be performed (AND/OR).'))
parser.add_argument(
'--shared',
help=('Whether the Classification group should be '
'shared with other projects.'))
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
attrs = _get_attrs(self.app.client_manager,
parsed_args, is_create=True)
obj = client.create_ext(object_path, {resource: attrs})
columns = _get_columns(obj[resource])
data = utils.get_dict_properties(obj[resource], columns)
return columns, data
class DeleteClassificationGroup(command.Command):
"""Delete a given Classification Group."""
def get_parser(self, prog_name):
parser = super(DeleteClassificationGroup, self).get_parser(prog_name)
parser.add_argument(
'classification_group',
metavar="CLASSIFICATION_GROUP",
help=('ID of the Classification Group to delete.'))
return parser
def take_action(self, parsed_args):
id = parsed_args.classification_group
client = self.app.client_manager.neutronclient
client.delete_ext(object_path + '/%s', id)
class ListClassificationGroup(command.Lister):
"""List the Classification Groups that belong to a given tenant."""
def take_action(self, parsed_args):
data = self.app.client_manager.neutronclient.list(
collection='classification_groups', path=object_path,
retrieve_all=True)
headers = ('ID', 'Name', 'Description', 'Operator', 'Shared')
columns = ('id', 'name', 'description', 'operator', 'shared')
return (headers, (utils.get_dict_properties(
s[resource], columns) for s in data['classification_groups']))
class ShowClassificationGroup(command.ShowOne):
"""Show information of a given Classification Group."""
def get_parser(self, prog_name):
parser = super(ShowClassificationGroup, self).get_parser(prog_name)
parser.add_argument(
'classification_group',
metavar="CLASSIFICATION_GROUP",
help=('ID of the Classification Group to display.'))
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
cl = client.show_ext(object_path + '/%s',
parsed_args.classification_group)
columns = _get_columns(cl[resource])
data = utils.get_dict_properties(cl[resource], columns)
return columns, data
class UpdateClassificationGroup(command.Command):
def get_parser(self, prog_name):
parser = super(UpdateClassificationGroup, self).get_parser(prog_name)
parser.add_argument(
'--name', default='',
metavar='NAME',
help=('Name of the Classification Group.'))
parser.add_argument(
'--description', default='',
help=('Description for the Classification Group.'))
parser.add_argument(
'classification_group',
metavar="CLASSIFICATION_GROUP",
help=('ID of the Classification Group to update.'))
return parser
def take_action(self, parsed_args):
id = parsed_args.classification_group
client = self.app.client_manager.neutronclient
attrs = _get_attrs(self.app.client_manager,
parsed_args, is_create=False)
cl = client.update_ext(object_path + '/%s', id, {resource: attrs})
columns = _get_columns(cl[resource])
data = utils.get_dict_properties(cl[resource], columns)
return columns, data
def _get_attrs(client_manager, parsed_args, is_create=False):
attrs = {}
if parsed_args.name is not None:
attrs['name'] = str(parsed_args.name)
if parsed_args.description is not None:
attrs['description'] = str(parsed_args.description)
if is_create:
if parsed_args.classification is not None:
attrs['classification'] = parsed_args.classification
if parsed_args.classification_group is not None:
attrs['classification_group'] = parsed_args.classification_group
if parsed_args.operator is not None:
attrs['operator'] = parsed_args.operator
if parsed_args.shared is not None:
attrs['shared'] = parsed_args.shared
return attrs
def _get_columns(resource):
columns = list(resource.keys())
if 'tenant_id' in columns:
columns.remove('tenant_id')
if 'project_id' not in columns:
columns.append('project_id')
return tuple(sorted(columns))

View File

@ -0,0 +1,33 @@
# Copyright (c) 2017 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from osc_lib.command import command
from osc_lib import utils
object_path = "/classification_type"
resource = 'classification_type'
class ListClassificationType(command.Lister):
"""List the Classification Types available."""
def take_action(self, parsed_args):
data = self.app.client_manager.neutronclient.list(
collection='classification_type',
path=object_path, retrieve_all=True)
headers = ('Name', 'Definition')
columns = ('type', 'supported_parameters')
return (headers, (utils.get_dict_properties(
s, columns) for s in data['classification_type']))

View File

@ -0,0 +1,182 @@
# Copyright (c) 2017 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from osc_lib.command import command
from osc_lib import utils
object_path = "/classifications"
resource = 'classification'
class CreateEthernetClassification(command.ShowOne):
"""Create an Ethernet Classification."""
def get_parser(self, prog_name):
parser = super(CreateEthernetClassification,
self).get_parser(prog_name)
parser.add_argument(
'name', metavar='NAME',
help=('Name of the Ethernet Classification.'))
parser.add_argument(
'--description',
help=('Description for the Ethernet Classification.'))
parser.add_argument(
'--negated',
help=('Whether the complement of the Ethernet '
'Classification should be matched.'))
parser.add_argument(
'--shared',
help=('Whether the Ethernet Classification should '
'be shared with other projects.'))
parser.add_argument(
'--src-addr',
help=('Source MAC Address of the Ethernet Classification.'))
parser.add_argument(
'--dst-addr',
help=('Destination MAC Address of the Ethernet '
'Classification.'))
parser.add_argument(
'--ethertype',
help=('Protocol value of the Ethernet Classification.'))
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
attrs = _get_attrs(self.app.client_manager,
parsed_args, is_create=True)
obj = client.create_ext(object_path, {resource: attrs})
columns = _get_columns(obj[resource])
data = utils.get_dict_properties(obj[resource], columns)
return columns, data
class DeleteEthernetClassification(command.Command):
"""Delete a given Ethernet Classification."""
def get_parser(self, prog_name):
parser = super(DeleteEthernetClassification,
self).get_parser(prog_name)
parser.add_argument(
'ethernet_classification',
metavar="ETHERNET_CLASSIFICATION",
help=('ID of the Ethernet Classification to delete.'))
return parser
def take_action(self, parsed_args):
id = parsed_args.ethernet_classification
client = self.app.client_manager.neutronclient
client.delete_ext(object_path + '/%s', id)
class ListEthernetClassification(command.Lister):
"""List the Ethernet Classifications that belong to a given tenant."""
def take_action(self, parsed_args):
data = self.app.client_manager.neutronclient.list(
collection='classifications',
path=object_path, retrieve_all=True, c_type='ethernet')
headers = ('C_Type', 'ID', 'Name', 'Description', 'Negated', 'Shared')
columns = ('c_type', 'id', 'name', 'description', 'negated', 'shared')
return (headers, (utils.get_dict_properties(
s, columns) for s in data['classifications']))
class ShowEthernetClassification(command.ShowOne):
"""Show information of a given Ethernet Classification."""
def get_parser(self, prog_name):
parser = super(ShowEthernetClassification, self).get_parser(prog_name)
parser.add_argument(
'ethernet_classification',
metavar="ETHERNET_CLASSIFICATION",
help=('ID of the Ethernet Classification to display.'))
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
cl = client.show_ext(object_path + '/%s',
parsed_args.ethernet_classification,
c_type='ethernet')
columns = _get_columns(cl[resource])
data = utils.get_dict_properties(cl[resource], columns)
return columns, data
class UpdateEthernetClassification(command.ShowOne):
"""Update name and description of a given Ethernet Classification."""
def get_parser(self, prog_name):
parser = super(UpdateEthernetClassification,
self).get_parser(prog_name)
parser.add_argument(
'--name', default='',
metavar='NAME',
help=('Name of the Ethernet Classification.'))
parser.add_argument(
'--description', default='',
help=('Description for the Ethernet Classification.'))
parser.add_argument(
'ethernet_classification',
metavar="ETHERNET_CLASSIFICATION",
help=('ID of the Ethernet Classification to update.'))
return parser
def take_action(self, parsed_args):
id = parsed_args.ethernet_classification
client = self.app.client_manager.neutronclient
attrs = _get_attrs(self.app.client_manager,
parsed_args, is_create=False)
cl = client.update_ext(object_path + '/%s', id, {resource: attrs})
columns = _get_columns(cl[resource])
data = utils.get_dict_properties(cl[resource], columns)
return columns, data
def _get_attrs(client_manager, parsed_args, is_create=False):
attrs = {}
definition = {}
if parsed_args.name is not None:
attrs['name'] = str(parsed_args.name)
if parsed_args.description is not None:
attrs['description'] = str(parsed_args.description)
if is_create:
attrs['c_type'] = 'ethernet'
if parsed_args.negated is not None:
attrs['negated'] = str(parsed_args.negated)
if parsed_args.shared is not None:
attrs['shared'] = str(parsed_args.shared)
if parsed_args.src_addr is not None:
definition['src_addr'] = parsed_args.src_addr
if parsed_args.dst_addr is not None:
definition['dst_addr'] = parsed_args.dst_addr
if parsed_args.ethertype is not None:
definition['ethertype'] = parsed_args.ethertype
attrs['definition'] = definition
return attrs
def _get_columns(resource):
columns = list(resource.keys())
if 'tenant_id' in columns:
columns.remove('tenant_id')
if 'project_id' not in columns:
columns.append('project_id')
return tuple(sorted(columns))

View File

@ -0,0 +1,233 @@
# Copyright (c) 2017 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from osc_lib.command import command
from osc_lib import utils
object_path = "/classifications"
resource = 'classification'
class CreateIPV4Classification(command.ShowOne):
"""Create an IPV4 Classification."""
def get_parser(self, prog_name):
parser = super(CreateIPV4Classification, self).get_parser(prog_name)
parser.add_argument(
'name', metavar='NAME',
help=('Name of the IPV4 Classification.'))
parser.add_argument(
'--description',
help=('Description for the IPV4 Classification.'))
parser.add_argument(
'--negated',
help=('Whether the complement of the IPV4 '
'Classification should be matched.'))
parser.add_argument(
'--shared',
help=('Whether the IPV4 Classification should be '
'shared with other projects.'))
parser.add_argument(
'--dscp',
help=('DSCP Classification value. Type of Service.'))
parser.add_argument(
'--dscp-mask',
help=('DSCP Mask value. Type of Service.'))
parser.add_argument(
'--ecn',
help=('Allows notification of network congestion.'))
parser.add_argument(
'--length-min',
help=('Minimum length of the IP Packet, including '
'IP header and IP payload.'))
parser.add_argument(
'--length-max',
help=('Maximum length of the IP Packet, including '
'IP header and IP payload.'))
parser.add_argument(
'--flags',
help=('Whether the packet can be fragmented.'))
parser.add_argument(
'--flags-mask',
help=('Whether the packet can be fragmented.'))
parser.add_argument(
'--ttl-min',
help=('Minimum number of hops which the packet may '
'be routed over.'))
parser.add_argument(
'--ttl-max',
help=('Maximum number of hops which the packet may '
'be routed over.'))
parser.add_argument(
'--protocol',
help=('Type of transport the packet belongs to.'))
parser.add_argument(
'--src-addr',
help=('Source Address of the IPV4 Classification.'))
parser.add_argument(
'--dst-addr',
help=('Destination Address of the IPV4 Classification.'))
parser.add_argument(
'--options',
help=('Options values for the IPV4 Classification.'))
parser.add_argument(
'--options-mask',
help=('Options values for the IPV4 Classification.'))
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
attrs = _get_attrs(self.app.client_manager,
parsed_args, is_create=True)
obj = client.create_ext(object_path, {resource: attrs})
columns = _get_columns(obj[resource])
data = utils.get_dict_properties(obj[resource], columns)
return columns, data
class DeleteIPV4Classification(command.Command):
"""Delete a given IPV4 Classification."""
def get_parser(self, prog_name):
parser = super(DeleteIPV4Classification, self).get_parser(prog_name)
parser.add_argument(
'ipv4_classification',
metavar="IPV4_CLASSIFICATION",
help=('ID of the IPV4 Classification to delete.'))
return parser
def take_action(self, parsed_args):
id = parsed_args.ipv4_classification
client = self.app.client_manager.neutronclient
client.delete_ext(object_path + '/%s', id)
class ListIPV4Classification(command.Lister):
"""List the IPV4 Classification that belong to a given tenant."""
def take_action(self, parsed_args):
data = self.app.client_manager.neutronclient.list(
collection='classifications', path=object_path,
retrieve_all=True, c_type='ipv4')
headers = ('C_Type', 'ID', 'Name', 'Description', 'Negated', 'Shared')
columns = ('c_type', 'id', 'name', 'description', 'negated', 'shared')
return (headers, (utils.get_dict_properties(
s, columns) for s in data['classifications']))
class ShowIPV4Classification(command.ShowOne):
"""Show information of a given IPV4 Classification"""
def get_parser(self, prog_name):
parser = super(ShowIPV4Classification, self).get_parser(prog_name)
parser.add_argument(
'ipv4_classification',
metavar="IPV4_CLASSIFICATION",
help=('ID of the IPV4 Classification to display.'))
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
cl = client.show_ext(object_path + '/%s',
parsed_args.ipv4_classification, c_type='ipv4')
columns = _get_columns(cl[resource])
data = utils.get_dict_properties(cl[resource], columns)
return columns, data
class UpdateIPV4Classification(command.ShowOne):
"""Update name and description of a given IPV4Classification."""
def get_parser(self, prog_name):
parser = super(UpdateIPV4Classification, self).get_parser(prog_name)
parser.add_argument(
'--name', default='',
metavar='NAME',
help=('Name of the IPV4 Classification.'))
parser.add_argument(
'--description', default='',
help=('Description of the IPV4 Classification.'))
parser.add_argument(
'ipv4_classification',
metavar="IPV4_CLASSIFICATION",
help=('ID of the IPV4 Classification to update.'))
return parser
def take_action(self, parsed_args):
id = parsed_args.ipv4_classification
client = self.app.client_manager.neutronclient
attrs = _get_attrs(self.app.client_manager,
parsed_args, is_create=False)
cl = client.update_ext(object_path + '/%s', id, {resource: attrs})
columns = _get_columns(cl[resource])
data = utils.get_dict_properties(cl[resource], columns)
return columns, data
def _get_attrs(client_manager, parsed_args, is_create=False):
attrs = {}
definition = {}
if parsed_args.name is not None:
attrs['name'] = str(parsed_args.name)
if parsed_args.description is not None:
attrs['description'] = str(parsed_args.description)
if is_create:
attrs['c_type'] = 'ipv4'
if parsed_args.negated is not None:
attrs['negated'] = str(parsed_args.negated)
if parsed_args.shared is not None:
attrs['shared'] = str(parsed_args.shared)
if parsed_args.dscp is not None:
definition['dscp'] = parsed_args.dscp
if parsed_args.dscp_mask is not None:
definition['dscp_mask'] = parsed_args.dscp_mask
if parsed_args.ecn is not None:
definition['ecn'] = parsed_args.ecn
if parsed_args.length_min is not None:
definition['length_min'] = parsed_args.length_min
if parsed_args.length_max is not None:
definition['length_max'] = parsed_args.length_max
if parsed_args.ttl_min is not None:
definition['ttl_min'] = parsed_args.ttl_min
if parsed_args.flags is not None:
definition['flags'] = parsed_args.flags
if parsed_args.flags_mask is not None:
definition['flags_mask'] = parsed_args.flags_mask
if parsed_args.protocol is not None:
definition['protocol'] = parsed_args.protocol
if parsed_args.src_addr is not None:
definition['src_addr'] = parsed_args.src_addr
if parsed_args.dst_addr is not None:
definition['dst_addr'] = parsed_args.dst_addr
if parsed_args.options is not None:
definition['options'] = parsed_args.options
if parsed_args.options_mask is not None:
definition['options_mask'] = parsed_args.options_mask
attrs['definition'] = definition
return attrs
def _get_columns(resource):
columns = list(resource.keys())
if 'tenant_id' in columns:
columns.remove('tenant_id')
if 'project_id' not in columns:
columns.append('project_id')
return tuple(sorted(columns))

View File

@ -0,0 +1,217 @@
# Copyright (c) 2017 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from osc_lib.command import command
from osc_lib import utils
object_path = "/classifications"
resource = 'classification'
class CreateIPV6Classification(command.ShowOne):
"""Create an IPV6 Classification."""
def get_parser(self, prog_name):
parser = super(CreateIPV6Classification, self).get_parser(prog_name)
parser.add_argument(
'name', metavar='NAME',
help=('Name of the IPV6 Classification.'))
parser.add_argument(
'--description',
help=('Description for the IPV6 Classification.'))
parser.add_argument(
'--negated',
help=('Whether the complement of the IPV6 '
'Classification should be matched.'))
parser.add_argument(
'--shared',
help=('Whether the IPV6 Classification should be '
'shared with other projects.'))
parser.add_argument(
'--dscp',
help=('DSCP Classification value. Type of Service.'))
parser.add_argument(
'--dscp-mask',
help=('DSCP Mask value. Type of Service.'))
parser.add_argument(
'--ecn',
help=('Allows notification of network congestion.'))
parser.add_argument(
'--length-min',
help=('Minimum length of the Packet, following the IPV6 '
'Header.'))
parser.add_argument(
'--length-max',
help=('Maximum length of the Packet, following the IPV6 '
'Header.'))
parser.add_argument(
'--next-header',
help=('Type of the next header. Transport protocol used by '
'the packet\'s payload.'))
parser.add_argument(
'--hops-min',
help=('Minimum number of hops which the packet may be routed '
'over.'))
parser.add_argument(
'--hops-max',
help=('Maximum number of hops which the packet may be routed '
'over.'))
parser.add_argument(
'--src-addr',
help=('Source Address of the IPV6 Classification.'))
parser.add_argument(
'--dst-addr',
help=('Destination Address of the IPV6 Classification.'))
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
attrs = _get_attrs(self.app.client_manager,
parsed_args, is_create=True)
obj = client.create_ext(object_path, {resource: attrs})
columns = _get_columns(obj[resource])
data = utils.get_dict_properties(obj[resource], columns)
return columns, data
class DeleteIPV6Classification(command.Command):
"""Delete a given IPV6 Classification."""
def get_parser(self, prog_name):
parser = super(DeleteIPV6Classification, self).get_parser(prog_name)
parser.add_argument(
'ipv6_classification',
metavar="IPV6_CLASSIFICATION",
help=('ID of the IPV6 Classification to delete.'))
return parser
def take_action(self, parsed_args):
id = parsed_args.ipv6_classification
client = self.app.client_manager.neutronclient
client.delete_ext(object_path + '/%s', id)
class ListIPV6Classification(command.Lister):
"""List the IPV6 Classification that belong to a given tenant."""
def take_action(self, parsed_args):
data = self.app.client_manager.neutronclient.list(
collection='classifications',
path=object_path, retrieve_all=True, c_type='ipv6')
headers = ('C_Type', 'ID', 'Name', 'Description', 'Negated', 'Shared')
columns = ('c_type', 'id', 'name', 'description', 'negated', 'shared')
return (headers, (utils.get_dict_properties(
s, columns) for s in data['classifications']))
class ShowIPV6Classification(command.ShowOne):
"""Show informcation of a given IPV6 Classification."""
def get_parser(self, prog_name):
parser = super(ShowIPV6Classification, self).get_parser(prog_name)
parser.add_argument(
'ipv6_classification',
metavar="IPV6_CLASSIFICATION",
help=('ID of the IPV6 Classification to display.'))
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
cl = client.show_ext(object_path + '/%s',
parsed_args.ipv6_classification, c_type='ipv6')
columns = _get_columns(cl[resource])
data = utils.get_dict_properties(cl[resource], columns)
return columns, data
class UpdateIPV6Classification(command.ShowOne):
"""Update name and description of a given IPV6 Classification."""
def get_parser(self, prog_name):
parser = super(UpdateIPV6Classification, self).get_parser(prog_name)
parser.add_argument(
'--name', default='',
metavar='NAME',
help=('Name of the IPV6 Classification.'))
parser.add_argument(
'--description', default='',
help=('Description for the IPV6 Classification.'))
parser.add_argument(
'ipv6_classification',
metavar="IPV6_CLASSIFICATION",
help=('ID of the IPV6 Classification to update.'))
return parser
def take_action(self, parsed_args):
id = parsed_args.ipv6_classification
client = self.app.client_manager.neutronclient
attrs = _get_attrs(self.app.client_manager,
parsed_args, is_create=False)
cl = client.update_ext(object_path + '/%s', id, {resource: attrs})
columns = _get_columns(cl[resource])
data = utils.get_dict_properties(cl[resource], columns)
return columns, data
def _get_attrs(client_manager, parsed_args, is_create=False):
attrs = {}
definition = {}
if parsed_args.name is not None:
attrs['name'] = str(parsed_args.name)
if parsed_args.description is not None:
attrs['description'] = str(parsed_args.description)
if is_create:
attrs['c_type'] = 'ipv6'
if parsed_args.negated is not None:
attrs['negated'] = str(parsed_args.negated)
if parsed_args.shared is not None:
attrs['shared'] = str(parsed_args.shared)
if parsed_args.dscp is not None:
definition['dscp'] = parsed_args.dscp
if parsed_args.dscp_mask is not None:
definition['dscp_mask'] = parsed_args.dscp_mask
if parsed_args.ecn is not None:
definition['ecn'] = parsed_args.ecn
if parsed_args.length_min is not None:
definition['length_min'] = parsed_args.length_min
if parsed_args.length_max is not None:
definition['length_max'] = parsed_args.length_max
if parsed_args.next_header is not None:
definition['next_header'] = parsed_args.next_header
if parsed_args.hops_min is not None:
definition['hops_min'] = parsed_args.hops_min
if parsed_args.hops_max is not None:
definition['hops_max'] = parsed_args.hops_max
if parsed_args.src_addr is not None:
definition['src_addr'] = parsed_args.src_addr
if parsed_args.dst_addr is not None:
definition['dst_addr'] = parsed_args.dst_addr
attrs['definition'] = definition
return attrs
def _get_columns(resource):
columns = list(resource.keys())
if 'tenant_id' in columns:
columns.remove('tenant_id')
if 'project_id' not in columns:
columns.append('project_id')
return tuple(sorted(columns))

View File

@ -0,0 +1,203 @@
# Copyright (c) 2017 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from osc_lib.command import command
from osc_lib import utils
object_path = "/classifications"
resource = 'classification'
class CreateTCPClassification(command.ShowOne):
"""Create a TCP Classification."""
def get_parser(self, prog_name):
parser = super(CreateTCPClassification, self).get_parser(prog_name)
parser.add_argument(
'name', metavar='NAME',
help=('Name of the TCP Classification.'))
parser.add_argument(
'--description',
help=('Description for the TCP Classification.'))
parser.add_argument(
'--negated',
help=('Whether the complement of the TCP '
'Classification should be matched.'))
parser.add_argument(
'--shared',
help=('Whether the TCP Classification should be '
'shared with other projects.'))
parser.add_argument(
'--src-port-min',
help=('Source port TCP Classification minimum value.'))
parser.add_argument(
'--src-port-max',
help=('Source port TCP Classification maximum value.'))
parser.add_argument(
'--dst-port-min',
help=('Destination port TCP Classification minimum value.'))
parser.add_argument(
'--dst-port-max',
help=('Destination port TCP Classification maximum value.'))
parser.add_argument(
'--flags',
help=('Control flag value for the TCP Classification.'))
parser.add_argument(
'--flags-mask',
help=('Control flag mask for the TCP Classification.'))
parser.add_argument(
'--window-min',
help=('The minimum size of the receive window. Number of data '
'octets the receiver is willing to accept.'))
parser.add_argument(
'--window-max',
help=('The maximum size of the receive window. Number of data '
'octets the receiver is willing to accept.'))
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
attrs = _get_attrs(self.app.client_manager,
parsed_args, is_create=True)
obj = client.create_ext(object_path, {resource: attrs})
columns = _get_columns(obj[resource])
data = utils.get_dict_properties(obj[resource], columns)
return columns, data
class DeleteTCPClassification(command.Command):
"""Delete a given TCP Classification."""
def get_parser(self, prog_name):
parser = super(DeleteTCPClassification, self).get_parser(prog_name)
parser.add_argument(
'tcp_classification',
metavar="TCP_CLASSIFICATION",
help=('ID of the TCP Classification to delete.'))
return parser
def take_action(self, parsed_args):
id = parsed_args.tcp_classification
client = self.app.client_manager.neutronclient
client.delete_ext(object_path + '/%s', id)
class ListTCPClassification(command.Lister):
"""List the TCP Classification that belong to a given tenant."""
def take_action(self, parsed_args):
data = self.app.client_manager.neutronclient.list(
collection='classifications',
path=object_path, retrieve_all=True, c_type='tcp')
headers = ('C_Type', 'ID', 'Name', 'Description', 'Negated', 'Shared')
columns = ('c_type', 'id', 'name', 'description', 'negated', 'shared')
return (headers, (utils.get_dict_properties(
s, columns) for s in data['classifications']))
class ShowTCPClassification(command.ShowOne):
"""Show information of a given TCP Classification."""
def get_parser(self, prog_name):
parser = super(ShowTCPClassification, self).get_parser(prog_name)
parser.add_argument(
'tcp_classification',
metavar="TCP_CLASSIFICATION",
help=('ID of the TCP Classification to display.'))
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
cl = client.show_ext(object_path + '/%s',
parsed_args.tcp_classification, c_type='tcp')
columns = _get_columns(cl[resource])
data = utils.get_dict_properties(cl[resource], columns)
return columns, data
class UpdateTCPClassification(command.ShowOne):
"""Update name and description of a given TCP Classification."""
def get_parser(self, prog_name):
parser = super(UpdateTCPClassification, self).get_parser(prog_name)
parser.add_argument(
'--name', default='',
metavar='NAME',
help=('Name of the TCP Classification.'))
parser.add_argument(
'--description', default='',
help=('Description of the TCP Classification.'))
parser.add_argument(
'tcp_classification',
metavar="TCP_CLASSIFICATION",
help=('ID of the TCP Classification to update.'))
return parser
def take_action(self, parsed_args):
id = parsed_args.tcp_classification
client = self.app.client_manager.neutronclient
attrs = _get_attrs(self.app.client_manager,
parsed_args, is_create=False)
cl = client.update_ext(object_path + '/%s', id, {resource: attrs})
columns = _get_columns(cl[resource])
data = utils.get_dict_properties(cl[resource], columns)
return columns, data
def _get_attrs(client_manager, parsed_args, is_create=False):
attrs = {}
definition = {}
if parsed_args.name is not None:
attrs['name'] = str(parsed_args.name)
if parsed_args.description is not None:
attrs['description'] = str(parsed_args.description)
if is_create:
attrs['c_type'] = 'tcp'
if parsed_args.negated is not None:
attrs['negated'] = str(parsed_args.negated)
if parsed_args.shared is not None:
attrs['shared'] = str(parsed_args.shared)
if parsed_args.src_port_min is not None:
definition['src_port_min'] = parsed_args.src_port_min
if parsed_args.src_port_max is not None:
definition['src_port_max'] = parsed_args.src_port_max
if parsed_args.dst_port_min is not None:
definition['dst_port_min'] = parsed_args.dst_port_min
if parsed_args.dst_port_max is not None:
definition['dst_port_max'] = parsed_args.dst_port_max
if parsed_args.flags is not None:
definition['flags'] = parsed_args.flags
if parsed_args.flags_mask is not None:
definition['flags_mask'] = parsed_args.flags_mask
if parsed_args.window_min is not None:
definition['window_min'] = parsed_args.window_min
if parsed_args.window_max is not None:
definition['window_max'] = parsed_args.window_max
attrs['definition'] = definition
return attrs
def _get_columns(resource):
columns = list(resource.keys())
if 'tenant_id' in columns:
columns.remove('tenant_id')
if 'project_id' not in columns:
columns.append('project_id')
return tuple(sorted(columns))

View File

@ -0,0 +1,191 @@
# Copyright (c) 2017 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from osc_lib.command import command
from osc_lib import utils
object_path = "/classifications"
resource = 'classification'
class CreateUDPClassification(command.ShowOne):
"""Create an UDP Classification."""
def get_parser(self, prog_name):
parser = super(CreateUDPClassification, self).get_parser(prog_name)
parser.add_argument(
'name', metavar='NAME',
help=('Name of the UDP Classification.'))
parser.add_argument(
'--description',
help=('Description for the UDP Classification.'))
parser.add_argument(
'--negated',
help=('Whether the complement of the UDP '
'Classification should be matched.'))
parser.add_argument(
'--shared',
help=('Whether the UDP Classification should be '
'shared with other projects.'))
parser.add_argument(
'--src-port-min',
help=('Source port UDP Classification minimum value.'))
parser.add_argument(
'--src-port-max',
help=('Source port UDP Classification maximum value.'))
parser.add_argument(
'--dst-port-min',
help=('Destination port UDP Classification minimum value.'))
parser.add_argument(
'--dst-port-max',
help=('Destination port UDP Classification maximum value.'))
parser.add_argument(
'--length-min',
help=('Minimum length of the UDP header and payload data.'))
parser.add_argument(
'--length-max',
help=('Maximum length of the UDP header and payload data.'))
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
attrs = _get_attrs(self.app.client_manager,
parsed_args, is_create=True)
obj = client.create_ext(object_path, {resource: attrs})
columns = _get_columns(obj[resource])
data = utils.get_dict_properties(obj[resource], columns)
return columns, data
class DeleteUDPClassification(command.Command):
"""Delete a given UDP Classification."""
def get_parser(self, prog_name):
parser = super(DeleteUDPClassification, self).get_parser(prog_name)
parser.add_argument(
'udp_classification',
metavar="UDP_CLASSIFICATION",
help=('ID of the UDP Classification to delete.'))
return parser
def take_action(self, parsed_args):
id = parsed_args.udp_classification
client = self.app.client_manager.neutronclient
client.delete_ext(object_path + '/%s', id)
class ListUDPClassification(command.Lister):
"""List the UDP Classifications that belong to a given tenant."""
def take_action(self, parsed_args):
data = self.app.client_manager.neutronclient.list(
collection='classifications',
path=object_path, retrieve_all=True, c_type='udp')
headers = ('C_Type', 'ID', 'Name', 'Description', 'Negated', 'Shared')
columns = ('c_type', 'id', 'name', 'description', 'negated', 'shared')
return (headers, (utils.get_dict_properties(
s, columns) for s in data['classifications']))
class ShowUDPClassification(command.ShowOne):
"""Show information of a given UDP Classification."""
def get_parser(self, prog_name):
parser = super(ShowUDPClassification, self).get_parser(prog_name)
parser.add_argument(
'udp_classification',
metavar="UDP_CLASSIFICATION",
help=('ID of the UDP Classification to display.'))
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
cl = client.show_ext(object_path + '/%s',
parsed_args.udp_classification, c_type='udp')
columns = _get_columns(cl[resource])
data = utils.get_dict_properties(cl[resource], columns)
return columns, data
class UpdateUDPClassification(command.ShowOne):
"""Update name and description of a given UDP Classification."""
def get_parser(self, prog_name):
parser = super(UpdateUDPClassification, self).get_parser(prog_name)
parser.add_argument(
'--name', default='',
metavar='NAME',
help=('Name of the UDP Classification.'))
parser.add_argument(
'--description', default='',
help=('Description of the UDP Classification.'))
parser.add_argument(
'udp_classification',
metavar="UDP_CLASSIFICATION",
help=('ID of the UDP Classification to update.'))
return parser
def take_action(self, parsed_args):
id = parsed_args.udp_classification
client = self.app.client_manager.neutronclient
attrs = _get_attrs(self.app.client_manager,
parsed_args, is_create=False)
cl = client.update_ext(object_path + '/%s', id, {resource: attrs})
columns = _get_columns(cl[resource])
data = utils.get_dict_properties(cl[resource], columns)
return columns, data
def _get_attrs(client_manager, parsed_args, is_create=False):
attrs = {}
definition = {}
if parsed_args.name is not None:
attrs['name'] = str(parsed_args.name)
if parsed_args.description is not None:
attrs['description'] = str(parsed_args.description)
if is_create:
attrs['c_type'] = 'udp'
if parsed_args.negated is not None:
attrs['negated'] = str(parsed_args.negated)
if parsed_args.shared is not None:
attrs['shared'] = str(parsed_args.shared)
if parsed_args.src_port_min is not None:
definition['src_port_min'] = parsed_args.src_port_min
if parsed_args.src_port_max is not None:
definition['src_port_max'] = parsed_args.src_port_max
if parsed_args.dst_port_min is not None:
definition['dst_port_min'] = parsed_args.dst_port_min
if parsed_args.dst_port_max is not None:
definition['dst_port_max'] = parsed_args.dst_port_max
if parsed_args.length_min is not None:
definition['length_min'] = parsed_args.length_min
if parsed_args.length_max is not None:
definition['length_max'] = parsed_args.length_max
attrs['definition'] = definition
return attrs
def _get_columns(resource):
columns = list(resource.keys())
if 'tenant_id' in columns:
columns.remove('tenant_id')
if 'project_id' in columns:
columns.append('project_id')
return tuple(sorted(columns))

View File

@ -16,11 +16,17 @@
from neutron_classifier.objects import classifications as cs
FIELDS_IPV4 = cs.IPV4Classification.fields.keys()
FIELDS_IPV6 = cs.IPV6Classification.fields.keys()
FIELDS_TCP = cs.TCPClassification.fields.keys()
FIELDS_UDP = cs.UDPClassification.fields.keys()
FIELDS_ETHERNET = cs.EthernetClassification.fields.keys()
COMMON_FIELDS = cs.ClassificationBase.fields.keys()
FIELDS_IPV4 = list(set(cs.IPV4Classification.fields.keys()) -
set(COMMON_FIELDS))
FIELDS_IPV6 = list(set(cs.IPV6Classification.fields.keys()) -
set(COMMON_FIELDS))
FIELDS_TCP = list(set(cs.TCPClassification.fields.keys()) -
set(COMMON_FIELDS))
FIELDS_UDP = list(set(cs.UDPClassification.fields.keys()) -
set(COMMON_FIELDS))
FIELDS_ETHERNET = list(set(cs.EthernetClassification.fields.keys()) -
set(COMMON_FIELDS))
SUPPORTED_FIELDS = {'ipv4': FIELDS_IPV4,

View File

@ -46,14 +46,14 @@ CLASSIFICATION_GROUP_RESOURCE_MAP = {
'convert_to': converters.convert_to_boolean},
'operator': {
'allow_post': True, 'allow_put': True,
'is_visible': True, 'default': 'and',
'is_visible': True, 'default': 'AND',
'validate': {'type:string': const.NAME_FIELD_SIZE},
'convert_to': validate_string},
'classifications': {
'classification': {
'allow_post': True, 'allow_put': True,
'is_visible': True, 'default': [],
'convert_to': converters.convert_to_list},
'cg_ids': {
'classification_group': {
'allow_post': True, 'allow_put': True,
'is_visible': True, 'default': []},
}

View File

@ -21,7 +21,7 @@ from neutron_classifier.common import udp_validators
from neutron_classifier.db import models
from neutron_classifier.objects import classifications
from neutron.db import api as db_api
from neutron_lib.db import api as db_api
type_validators = {}
type_validators['ethernet'] = eth_validators.validators_dict
@ -57,7 +57,7 @@ def check_can_delete_classification_group(context, cg_id):
"""
cgs = classifications.ClassificationGroup.get_objects(context)
for cg in cgs:
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
cg_obj = classifications.ClassificationGroup.get_object(context,
id=cg.id)
mapped_cgs = classifications._get_mapped_classification_groups(

View File

@ -15,7 +15,8 @@
from oslo_log import log as logging
from oslo_utils import uuidutils
from neutron.db import api as db_api
from neutron_lib.db import api as db_api
from neutron.db import common_db_mixin
from neutron.objects import base as base_obj
@ -33,40 +34,51 @@ class TrafficClassificationGroupPlugin(common_db_mixin.CommonDbMixin):
def create_classification_group(self, context, classification_group):
details = classification_group['classification_group']
c_flag = cg_flag = False
if details['classifications']:
if 'classification' in details:
c_flag = True
validators.check_valid_classifications(context,
details['classifications'])
details['classification'])
if details['classification_groups']:
if 'classification_group' in details:
cg_flag = True
validators.check_valid_classification_groups(
context, details['classification_groups'])
context, details['classification_group'])
details['id'] = uuidutils.generate_uuid()
mappings = {'c_ids': details['classifications'],
'cg_ids': details['classification_groups']}
mappings = {'c_ids': details['classification'] if c_flag else [],
'cg_ids': details['classification_group']
if cg_flag else []}
db_dict = details
if 'tenant_id' in details:
del details['tenant_id']
cg = classifications.ClassificationGroup(context, **details)
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
cg.create()
db_dict['id'] = cg.id
with db_api.context_manager.writer.using(context):
for cl in mappings['c_ids']:
cg_c_mapping = classifications.CGToClassificationMapping(
context,
container_cg_id=cg.id,
stored_classification_id=cl)
cg_c_mapping.create()
for cg_id in mappings['cg_ids']:
cg_cg_mapping = classifications.CGToClassificationGroupMapping(
context,
container_cg_id=cg.id,
stored_cg_id=cg_id
)
cg_cg_mapping.create()
db_dict['classifications'] = details['classifications']
db_dict['classification_group'] = details['classification_groups']
with db_api.CONTEXT_WRITER.using(context):
if c_flag:
for cl in mappings['c_ids']:
cg_c_mapping = classifications.CGToClassificationMapping(
context,
container_cg_id=cg.id,
stored_classification_id=cl)
cg_c_mapping.create()
if cg_flag:
for cg_id in mappings['cg_ids']:
cg_cg_mapping =\
classifications.CGToClassificationGroupMapping(
context,
container_cg_id=cg.id,
stored_cg_id=cg_id
)
cg_cg_mapping.create()
db_dict['classification'] = details['classification']\
if c_flag else []
db_dict['classification_group'] = details['classification_group']\
if cg_flag else []
return db_dict
@ -75,20 +87,22 @@ class TrafficClassificationGroupPlugin(common_db_mixin.CommonDbMixin):
context, classification_group_id):
cg = classifications.ClassificationGroup.get_object(
context, id=classification_group_id)
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
cg.delete()
def update_classification_group(self, context, classification_group_id,
fields_to_update):
fields_to_update = fields_to_update['classification_group']
field_keys = list(fields_to_update.keys())
valid_keys = ['name', 'description']
for key in field_keys:
if key not in valid_keys:
raise exceptions.InvalidUpdateRequest()
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
cg = classifications.ClassificationGroup.update_object(
context, fields_to_update, id=classification_group_id)
return cg
db_dict = self._make_db_dict(cg)
return db_dict
def _make_db_dict(self, obj):
db_dict = {'classification_group': {}}
@ -96,20 +110,55 @@ class TrafficClassificationGroupPlugin(common_db_mixin.CommonDbMixin):
db_dict['classification_group'][key] = obj[key]
return db_dict
def get_classification_group(self, context, classification_group_id):
with db_api.context_manager.writer.using(context):
def _make_db_dicts(self, cgs):
db_dict = []
for cg in cgs:
cg_dict = self._make_db_dict(cg)
db_dict.append(cg_dict)
return db_dict
def _make_c_dict(self, c_obj):
c_dict = {'id': c_obj['id'],
'name': c_obj['name'],
'project_id': c_obj['project_id'],
'description': c_obj['description'],
'c_type': c_obj['c_type'],
'negated': c_obj['negated'],
'shared': c_obj['shared']}
return c_dict
def _make_c_dicts(self, c_objs):
if not c_objs:
return []
ret_list = []
for clas in c_objs:
db_dict = self._make_c_dict(clas)
db_dict['id'] = clas.get('id', None)
ret_list.append(db_dict)
return ret_list
def get_classification_group(self, context, classification_group_id,
fields=None):
with db_api.CONTEXT_WRITER.using(context):
cg = classifications.ClassificationGroup.get_object(
context, id=classification_group_id)
db_dict = self._make_db_dict(cg)
db_dict['classification_group']['classifications'] =\
classifications._get_mapped_classifications(context, cg)
db_dict['classification_group']['classification_groups'] = \
classifications._get_mapped_classification_groups(context, cg)
return db_dict
mapped_cs = classifications._get_mapped_classifications(context,
cg)
mapped_cgs = classifications._get_mapped_classification_groups(
context, cg)
c_dict = self._make_c_dicts(mapped_cs)
cg_dict = self._make_db_dicts(mapped_cgs)
db_dict['classification_group']['classifications'] = c_dict
db_dict['classification_group']['classification_groups'] = cg_dict
return db_dict
def get_classification_groups(self, context, sorts=None, limit=None,
marker=None, page_reverse=False):
marker=None, page_reverse=False,
filters=None, fields=None):
pager = base_obj.Pager(sorts, limit, page_reverse, marker)
cgs = classifications.ClassificationGroup.get_objects(context,
_pager=pager)
return cgs
db_dict = self._make_db_dicts(cgs)
return db_dict

View File

@ -12,8 +12,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron.db import _model_query as mq
from neutron_lib.db import model_base
from neutron_lib.db import model_query as mq
import sqlalchemy as sa
from sqlalchemy import orm

View File

@ -87,7 +87,7 @@ class Classification(api_ext.ExtensionDescriptor):
allow_bulk=True)
for resource in resources:
resource.path_prefix = '/classifications'
resource.path_prefix = 'ccf/classifications'
return resources
@ -102,7 +102,7 @@ class Classification(api_ext.ExtensionDescriptor):
@six.add_metaclass(ABCMeta)
class NeutronClassificationPluginBase(service_base.ServicePluginBase):
path_prefix = '/classifications'
path_prefix = 'ccf/classifications'
def get_plugin_name(self):
return EXT_NAME

View File

@ -14,7 +14,8 @@
from oslo_log import log as logging
from neutron.db import api as db_api
from neutron_lib.db import api as db_api
from neutron.objects import base as base_obj
from neutron_classifier.common import exceptions
from neutron_classifier.common import validators
@ -44,7 +45,7 @@ class ClassificationPlugin(classification.NeutronClassificationPluginBase,
raise exceptions.InvalidClassificationDefintion()
cl = class_group.CLASS_MAP[c_type](context, **details)
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
cl.create()
db_dict = self.merge_header(cl)
db_dict['id'] = cl['id']
@ -59,11 +60,12 @@ class ClassificationPlugin(classification.NeutronClassificationPluginBase,
validators.check_valid_classifications(context,
[classification_id])
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
classification.delete()
def update_classification(self, context, classification_id,
fields_to_update):
fields_to_update = fields_to_update['classification']
field_keys = list(fields_to_update.keys())
valid_keys = ['name', 'description']
for key in field_keys:
@ -72,10 +74,12 @@ class ClassificationPlugin(classification.NeutronClassificationPluginBase,
cl = class_group.ClassificationBase.get_object(context,
id=classification_id)
cl_class = class_group.CLASS_MAP[cl.c_type]
with db_api.context_manager.writer.using(context):
with db_api.CONTEXT_WRITER.using(context):
classification = cl_class.update_object(
context, fields_to_update, id=classification_id)
return classification
db_dict = self.merge_header(classification)
db_dict['id'] = classification['id']
return db_dict
def get_classification(self, context, classification_id, fields=None):
cl = class_group.ClassificationBase.get_object(context,
@ -89,20 +93,12 @@ class ClassificationPlugin(classification.NeutronClassificationPluginBase,
def get_classifications(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
# NOTE(ndahiwad): If the filters are not passed by the end-user
# then will fetch all the classifications. Otherwise, only the
# classification_types that the user wants will be returned.
if not filters['c_type']:
filters['c_type'] = ['tcp', 'udp', 'ipv4', 'ipv6', 'ethernet']
c_dict = {'classifications': []}
for c_type in filters['c_type']:
pager = base_obj.Pager(sorts, limit, page_reverse, marker)
cl = class_group.CLASS_MAP[c_type].get_objects(context,
_pager=pager)
db_dict = self.merge_headers(cl)
c_dict['classifications'].append(db_dict)
return c_dict
c_type = filters['c_type'][0]
pager = base_obj.Pager(sorts, limit, page_reverse, marker)
cl = class_group.CLASS_MAP[c_type].get_objects(context,
_pager=pager)
db_dict = self.merge_headers(cl)
return db_dict
def get_classification_type(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
@ -140,13 +136,14 @@ class ClassificationPlugin(classification.NeutronClassificationPluginBase,
return cl_dict
def merge_headers(self, classifications):
c_type = classifications[0]['c_type']
ret_list = {CLASSIFICATION_MAP[c_type]: []}
if not classifications:
return []
ret_list = []
for clas in classifications:
db_dict = self.merge_header(clas)
db_dict['id'] = clas.get('id', None)
ret_list[CLASSIFICATION_MAP[c_type]].append(db_dict)
ret_list.append(db_dict)
return ret_list
def merge_header(self, classification):

View File

@ -14,10 +14,10 @@
import copy
from neutron.db import _model_query as mq
from neutron.tests.unit import testlib_api
from neutron_classifier.db import models
from neutron_lib import context
from neutron_lib.db import model_query as mq
from oslo_utils import uuidutils

View File

@ -14,7 +14,8 @@
from oslo_utils import uuidutils
from neutron.db import api as db_api
from neutron_lib.db import api as db_api
from neutron.tests.unit import testlib_api
from neutron_classifier.common import exceptions
@ -35,7 +36,7 @@ class ClassificationGroupApiTest(testlib_api.MySQLTestCaseMixin,
self.test_plugin = cg_plugin()
def test_get_classification_group(self):
with db_api.context_manager.writer.using(self.ctx):
with db_api.CONTEXT_WRITER.using(self.ctx):
cg = self._create_test_cg('Test Group 0')
cg_dict = self.test_plugin._make_db_dict(cg)
fetch_cg = self.test_plugin.get_classification_group(self.ctx,
@ -47,15 +48,15 @@ class ClassificationGroupApiTest(testlib_api.MySQLTestCaseMixin,
self.assertEqual(cg_dict, fetch_cg)
def test_get_classification_groups(self):
with db_api.context_manager.writer.using(self.ctx):
with db_api.CONTEXT_WRITER.using(self.ctx):
cg1 = self._create_test_cg('Test Group 1')
cg2 = self._create_test_cg('Test Group 2')
test_cgs = self.test_plugin._make_db_dicts([cg1, cg2])
cgs = self.test_plugin.get_classification_groups(self.ctx)
self.assertIn(cg1, cgs)
self.assertIn(cg2, cgs)
self.assertItemsEqual(test_cgs, cgs)
def test_create_classification_group(self):
with db_api.context_manager.writer.using(self.ctx):
with db_api.CONTEXT_WRITER.using(self.ctx):
tcp_class = classifications.TCPClassification
ipv4_class = classifications.IPV4Classification
cg2 = self._create_test_cg('Test Group 1')
@ -89,22 +90,24 @@ class ClassificationGroupApiTest(testlib_api.MySQLTestCaseMixin,
c, cg_dict['classification_group']['classifications'])
def test_update_classification_group(self):
with db_api.context_manager.writer.using(self.ctx):
with db_api.CONTEXT_WRITER.using(self.ctx):
cg1 = self._create_test_cg('Test Group 0')
cg2 = self._create_test_cg('Test Group 1')
self.test_plugin.update_classification_group(
self.ctx, cg1.id, {'name': 'Test Group updated'})
self.ctx, cg1.id,
{'classification_group': {'name': 'Test Group updated'}})
fetch_cg1 = classifications.ClassificationGroup.get_object(
self.ctx, id=cg1['id'])
self.assertRaises(
exceptions.InvalidUpdateRequest,
self.test_plugin.update_classification_group,
self.ctx, cg2.id, {'name': 'Test Group updated',
'operator': 'OR'})
self.ctx, cg2.id,
{'classification_group': {'name': 'Test Group updated',
'operator': 'OR'}})
self.assertEqual(fetch_cg1.name, 'Test Group updated')
def test_delete_classification_group(self):
with db_api.context_manager.writer.using(self.ctx):
with db_api.CONTEXT_WRITER.using(self.ctx):
cg1 = self._create_test_cg('Test Group 0')
self.test_plugin.delete_classification_group(self.ctx, cg1.id)
fetch_cg1 = classifications.ClassificationGroup.get_object(
@ -127,7 +130,7 @@ class ClassificationApiTest(testlib_api.MySQLTestCaseMixin,
for key in validators.type_validators[c_type].keys():
attrs['definition'][key] = attrs.pop(key, None)
c_attrs = {'classification': attrs}
with db_api.context_manager.writer.using(self.ctx):
with db_api.CONTEXT_WRITER.using(self.ctx):
c1 = self.test_clas_plugin.create_classification(self.ctx,
c_attrs)
fetch_c1 = classifications.EthernetClassification.get_object(
@ -145,7 +148,7 @@ class ClassificationApiTest(testlib_api.MySQLTestCaseMixin,
def test_delete_classification(self):
tcp_class = classifications.TCPClassification
with db_api.context_manager.writer.using(self.ctx):
with db_api.CONTEXT_WRITER.using(self.ctx):
tcp = self._create_test_classification('tcp', tcp_class)
self.test_clas_plugin.delete_classification(self.ctx, tcp.id)
fetch_tcp = classifications.TCPClassification.get_object(
@ -154,34 +157,34 @@ class ClassificationApiTest(testlib_api.MySQLTestCaseMixin,
def test_get_classification(self):
ipv4_class = classifications.IPV4Classification
with db_api.context_manager.writer.using(self.ctx):
with db_api.CONTEXT_WRITER.using(self.ctx):
ipv4 = self._create_test_classification('ipv4', ipv4_class)
fetch_ipv4 = self.test_clas_plugin.get_classification(self.ctx,
ipv4.id)
self.assertEqual(fetch_ipv4, self.test_clas_plugin.merge_header(ipv4))
def test_get_classifications(self):
with db_api.context_manager.writer.using(self.ctx):
with db_api.CONTEXT_WRITER.using(self.ctx):
c1 = self._create_test_classification(
'ipv6', classifications.IPV6Classification)
c2 = self._create_test_classification(
'udp', classifications.UDPClassification)
fetch_cs = self.test_clas_plugin.get_classifications(
self.ctx, filters={'c_type': ['udp', 'ipv6']})
fetch_cs_udp = self.test_clas_plugin.get_classifications(
self.ctx, filters={'c_type': ['udp']})
fetch_cs_ipv6 = self.test_clas_plugin.get_classifications(
self.ctx, filters={'c_type': ['ipv6']})
c1_dict = self.test_clas_plugin.merge_header(c1)
c2_dict = self.test_clas_plugin.merge_header(c2)
self.assertIn({'UDPClassifications': [c2_dict]},
fetch_cs['classifications'])
self.assertIn({'IPV6Classifications': [c1_dict]},
fetch_cs['classifications'])
self.assertIn(c1_dict, fetch_cs_ipv6)
self.assertIn(c2_dict, fetch_cs_udp)
def test_update_classification(self):
c1 = self._create_test_classification(
'ethernet', classifications.EthernetClassification)
updated_name = 'Test Updated Classification'
with db_api.context_manager.writer.using(self.ctx):
self.test_clas_plugin.update_classification(self.ctx, c1.id,
{'name': updated_name})
with db_api.CONTEXT_WRITER.using(self.ctx):
self.test_clas_plugin.update_classification(
self.ctx, c1.id, {'classification': {'name': updated_name}})
fetch_c1 = classifications.EthernetClassification.get_object(
self.ctx, id=c1.id)
self.assertEqual(fetch_c1.name, updated_name)

View File

@ -67,8 +67,8 @@ class TestClassificationGroupPlugin(base.BaseClassificationTestCase):
'project_id': uuidutils.generate_uuid(),
'operator': 'AND',
'shared': False,
'classifications': [self.c_id1, self.c_id2],
'classification_groups': [self.cg_id]}
'classification': [self.c_id1, self.c_id2],
'classification_group': [self.cg_id]}
}
return self.test_cg
@ -96,8 +96,8 @@ class TestClassificationGroupPlugin(base.BaseClassificationTestCase):
self.assertEqual(val, expected_val)
c_len = len(val['classifications'])
cg_len = len(val['classification_groups'])
c_len = len(val['classification'])
cg_len = len(val['classification_group'])
mock_call_len = len(mock_manager.mock_calls)
self.assertEqual(mock_call_len, c_len + cg_len + 1)
@ -134,33 +134,23 @@ class TestClassificationGroupPlugin(base.BaseClassificationTestCase):
mock_manager.mock_calls.index(mock_cg_get_call) <
mock_manager.mock_calls.index(mock_cg_delete_call))
def _mock_mapped_classifications(self):
self.mock_c1 = mock.Mock(id=uuidutils.generate_uuid(),
name='Ethernet', c_type='ethernet',
**self.test_classification_attrs)
self.mock_c2 = mock.Mock(id=uuidutils.generate_uuid(), name='TCP',
c_type='tcp',
**self.test_classification_attrs)
return [self.mock_c1, self.mock_c2]
@mock.patch('neutron_classifier.objects.classifications.'
'_get_mapped_classification_groups')
@mock.patch('neutron_classifier.objects.classifications.'
'_get_mapped_classifications')
@mock.patch.object(classifications.ClassificationGroup, 'get_object')
def test_get_classification_group(self, mock_cg_get,
@mock.patch('neutron_classifier.db.classification.'
'TrafficClassificationGroupPlugin._make_db_dicts')
def test_get_classification_group(self, mock_db_dicts, mock_cg_get,
mock_mapped_classifications,
mock_mapped_cgs):
mock_manager = mock.Mock()
mock_manager.attach_mock(mock_db_dicts, 'make_db_dicts')
mock_manager.attach_mock(mock_cg_get, 'get_cg')
mock_manager.attach_mock(mock_mapped_classifications, 'get_mapped_cs')
mock_manager.attach_mock(mock_mapped_cgs, 'get_mapped_cgs')
mock_manager.reset_mock()
mock_manager.get_mapped_cs.side_effect =\
self._mock_mapped_classifications()
mock_manager.get_mapped_cgs.side_effect = ['cg2']
test_cg = self._generate_test_classification_group('Test Group')
test_cg['classification_group'].pop('classifications', None)
test_cg['classification_group'].pop('classification_groups', None)
@ -169,32 +159,33 @@ class TestClassificationGroupPlugin(base.BaseClassificationTestCase):
with mock.patch('neutron_classifier.db.classification.'
'TrafficClassificationGroupPlugin._make_db_dict',
return_value=test_cg):
val1 = self.cg_plugin.get_classification_group(
self.ctxt, test_cg['classification_group']['id'])
with mock.patch('neutron_classifier.db.classification.'
'TrafficClassificationGroupPlugin._make_c_dicts'):
val1 = self.cg_plugin.get_classification_group(
self.ctxt, test_cg['classification_group']['id'])
self.assertEqual(val1, test_cg)
mock_manager.get_cg.assert_called_with(
self.ctxt, id=test_cg['classification_group']['id']
)
self.assertEqual(val1['classification_group']['classifications'],
self.mock_c1)
val1['classification_group']['classifications'] =\
classifications._get_mapped_classifications(self.ctxt,
test_cg)
self.assertEqual(val1['classification_group']['classifications'],
self.mock_c2)
self.assertEqual(val1['classification_group']
['classification_groups'], 'cg2')
mapped_cs_call_count = mock_manager.get_mapped_cs.call_count
self.assertEqual(2, mapped_cs_call_count)
mock_manager_call_count = len(mock_manager.mock_calls)
self.assertEqual(4, mock_manager_call_count)
mock_db_dicts.assert_called_once()
mock_cg_get.assert_called_once()
mock_mapped_classifications.assert_called_once()
mock_mapped_cgs.assert_called_once()
@mock.patch.object(base_obj, 'Pager')
@mock.patch.object(classifications.ClassificationGroup, 'get_objects')
def test_get_classification_groups(self, mock_cgs_get, mock_pager):
@mock.patch.object(cg_api.TrafficClassificationGroupPlugin,
'_make_db_dicts')
def test_get_classification_groups(self, mock_db_dicts, mock_cgs_get,
mock_pager):
mock_manager = mock.Mock()
mock_manager.attach_mock(mock_cgs_get, 'get_cgs')
mock_manager.attach_mock(mock_pager, 'pager')
mock_manager.attach_mock(mock_db_dicts, 'db_dicts')
mock_manager.reset_mock()
test_cg1 = self._generate_test_classification_group('Test Group1')
@ -204,16 +195,16 @@ class TestClassificationGroupPlugin(base.BaseClassificationTestCase):
cg1 = classifications.ClassificationGroup(self.ctxt, **test_cg1)
cg2 = classifications.ClassificationGroup(self.ctxt, **test_cg2)
cg_list = [cg1, cg2]
cg_list = [self.cg_plugin._make_db_dict(cg) for cg in [cg1, cg2]]
mock_manager.get_cgs.return_value = cg_list
val = self.cg_plugin.get_classification_groups(self.ctxt)
self.cg_plugin.get_classification_groups(self.ctxt)
self.assertEqual(val, cg_list)
mock_manager.get_cgs.assert_called_once()
mock_manager.pager.assert_called_once()
self.assertEqual(len(mock_manager.mock_calls), 2)
mock_manager.db_dicts.assert_called_once()
self.assertEqual(len(mock_manager.mock_calls), 3)
@mock.patch.object(classifications.ClassificationGroup, 'update_object')
def test_update_classification_group(self, mock_cg_update):
@ -226,13 +217,16 @@ class TestClassificationGroupPlugin(base.BaseClassificationTestCase):
cg = classifications.ClassificationGroup(self.ctxt, **test_cg)
updated_fields = {'name': 'Test Group Updated',
'description': 'Updated Description'}
updated_fields = {'classification_group':
{'name': 'Test Group Updated',
'description': 'Updated Description'}}
self.cg_plugin.update_classification_group(self.ctxt, cg.id,
updated_fields)
updated_fields_called = {'name': 'Test Group Updated',
'description': 'Updated Description'}
mock_manager.cg_update.assert_called_once()
mock_manager.cg_update.assert_called_once_with(self.ctxt,
updated_fields,
updated_fields_called,
id=cg.id)

View File

@ -0,0 +1,65 @@
# Can't be run at the moment until migration with openstack-client
# Copyright (c) 2018 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron.tests.unit.extensions import base as test_extensions_base
from neutronclient.v2_0 import client
OPENSTACK_CLI_ID = "/ccf/classifications"
ASSOCS_PATH = "/ccf/classifications"
NET_ASSOC_ID = "uuid_client_foo"
class OpenstackClientTestCase(test_extensions_base.ExtensionTestCase):
def setUp(self):
super(OpenstackClientTestCase, self).setUp()
self.client = client.Client()
self.client.list_ext = mock.Mock()
self.client.create_ext = mock.Mock()
self.client.show_ext = mock.Mock()
self.client.update_ext = mock.Mock()
self.client.delete_ext = mock.Mock()
print("self.client keys: ", dir(self.client))
def test_client_url_list(self):
self.client.ListIPV4Classification(OPENSTACK_CLI_ID)
self.client.list_ext.assert_called_once_with(mock.ANY, ASSOCS_PATH,
mock.ANY)
def test_client_url_create(self):
self.client.CreateIPV4Classification(OPENSTACK_CLI_ID, {})
self.client.create_ext.assert_called_once_with(ASSOCS_PATH, mock.ANY)
def test_client_url_show(self):
self.client.ShowIPV4Classification(NET_ASSOC_ID, OPENSTACK_CLI_ID)
self.client.show_ext.assert_called_once_with(ASSOCS_PATH,
NET_ASSOC_ID)
def test_client_url_update(self):
self.client.UpdateIPV4Classification(NET_ASSOC_ID,
OPENSTACK_CLI_ID, {})
self.client.update_ext.assert_called_once_with(ASSOCS_PATH,
NET_ASSOC_ID,
mock.ANY)
def test_client_url_delete(self):
self.client.DeleteIPV4Classification(NET_ASSOC_ID, OPENSTACK_CLI_ID)
self.client.delete_ext.assert_called_once_with(ASSOCS_PATH,
NET_ASSOC_ID)

View File

@ -20,26 +20,18 @@ class TestClassificationType(base.BaseClassificationTestCase):
def setUp(self):
super(TestClassificationType, self).setUp()
common_fields = ['c_type', 'description', 'negated', 'shared',
'project_id', 'id', 'name']
common_ipv = ['src_addr', 'ecn', 'length_min', 'dscp', 'dscp_mask',
'length_max', 'dst_addr']
common_tcp_udp = ['src_port_min', 'src_port_max', 'dst_port_min',
'dst_port_max']
self.ipv4_fields = common_fields + common_ipv + ['ttl_max', 'flags',
'protocol', 'ttl_min',
'flags_mask']
self.ipv6_fields = common_fields + common_ipv + ['hops_min',
'hops_max',
'next_header']
self.tcp_fields = common_fields + common_tcp_udp + ['window_min',
'flags',
'window_max',
'flags_mask']
self.udp_fields = common_fields + common_tcp_udp + ['length_min',
'length_max']
self.ethernet_fields = common_fields + ['ethertype', 'src_addr',
'dst_addr']
self.ipv4_fields = common_ipv + ['ttl_max', 'flags', 'protocol',
'ttl_min', 'flags_mask']
self.ipv6_fields = common_ipv + ['hops_min', 'hops_max',
'next_header']
self.tcp_fields = common_tcp_udp + ['window_min', 'flags',
'window_max', 'flags_mask']
self.udp_fields = common_tcp_udp + ['length_min', 'length_max']
self.ethernet_fields = ['ethertype', 'src_addr', 'dst_addr']
def test_ipv4_cls_type(self):
ipv4_obj = classification_type.ClassificationType.get_object('ipv4')

View File

@ -19,8 +19,8 @@ from neutron_classifier.tests import objects_base as obj_base
from neutron_classifier.tests import tools
from neutron_lib import context
from neutron_lib.db import api as db_api
from neutron.db import api as db_api
from neutron.tests.unit.objects import test_base
from neutron.tests.unit import testlib_api
@ -149,7 +149,7 @@ class CGToClassificationGroupMappingTest(testlib_api.SqlTestCase,
obj_base._CCFObjectsTestCommon):
def test_get_object(self):
with db_api.context_manager.writer.using(self.ctx):
with db_api.CONTEXT_WRITER.using(self.ctx):
cg1 = self._create_test_cg('Test Group 0')
cg2 = self._create_test_cg('Test Group 1')
cg_m_cg = self._create_test_cg_cg_mapping(cg1.id, cg2.id)
@ -163,7 +163,7 @@ class CGToClassificationGroupMappingTest(testlib_api.SqlTestCase,
self.assertEqual(cg_m_cg, fetch_cg_m_cg)
def test_multiple_cg_mappings(self):
with db_api.context_manager.writer.using(self.ctx):
with db_api.CONTEXT_WRITER.using(self.ctx):
cg1 = self._create_test_cg('Test Group 0')
cg2 = self._create_test_cg('Test Group 1')
cg3 = self._create_test_cg('Test Group 2')
@ -185,7 +185,7 @@ class CGToClassificationMappingTest(testlib_api.SqlTestCase,
ctx = context.get_admin_context()
def test_get_object(self):
with db_api.context_manager.writer.using(self.ctx):
with db_api.CONTEXT_WRITER.using(self.ctx):
cg = self._create_test_cg('Test Group')
cl_ = self._create_test_classification(
'udp', classifications.UDPClassification)
@ -202,7 +202,7 @@ class CGToClassificationMappingTest(testlib_api.SqlTestCase,
self.assertEqual(cg_m_c, fetch_cg_m_c)
def test_multiple_c_mappings(self):
with db_api.context_manager.writer.using(self.ctx):
with db_api.CONTEXT_WRITER.using(self.ctx):
cg = self._create_test_cg('Test Group')
c1 = self._create_test_classification(
'tcp', classifications.TCPClassification)

View File

@ -128,22 +128,27 @@ class TestPlugin(base.BaseClassificationTestCase):
self.assertEqual(expected_val, val)
mock_manager.create.assert_called_once()
@mock.patch.object(plugin.ClassificationPlugin, 'merge_header')
@mock.patch.object(class_group.ClassificationBase, 'get_object')
@mock.patch.object(class_group.EthernetClassification, 'update_object')
def test_update_classification(self, mock_ethernet_update,
mock_class_get):
@mock.patch.object(class_group.EthernetClassification, 'id',
return_value=uuidutils.generate_uuid())
def test_update_classification(self, mock_id, mock_ethernet_update,
mock_class_get, mock_merge):
mock_manager = mock.Mock()
mock_manager.attach_mock(mock_id, 'id')
mock_manager.attach_mock(mock_ethernet_update, 'update')
mock_manager.attach_mock(mock_class_get, 'get_classification')
mock_manager.attach_mock(mock_merge, 'merge_header')
mock_manager.reset_mock()
mock_manager.start()
class_obj = class_group.EthernetClassification(
self.ctxt, **self.test_classification_broken_headers)
ethernet_classification_update = {
ethernet_classification_update = {'classification': {
'name': 'test_ethernet_classification Version 2',
'description': 'Test Ethernet Classification Version 2'}
'description': 'Test Ethernet Classification Version 2'}}
mock_manager.get_classification().c_type = 'ethernet'
self.cl_plugin.update_classification(

View File

@ -1,5 +1,8 @@
pbr>=2.0.0,!=2.1.0 # Apache-2.0
Babel>=2.3.4,!=2.4.0 # BSD
keystoneauth1>=3.6.2 # Apache-2.0
python-neutronclient>=6.7.0 # Apache-2.0
python-openstackclient>=3.16.0 # Apache-2.0
SQLAlchemy>=1.0.10,!=1.1.5,!=1.1.6,!=1.1.7,!=1.1.8 # MIT
neutron-lib>=1.18.0 # Apache-2.0
oslo.utils>=3.33.0 # Apache-2.0

View File

@ -25,7 +25,38 @@ packages =
[entry_points]
neutron.service_plugins =
neutron_classifier = neutron_classifier.services.classification.plugin:ClassificationPlugin
openstack.neutronclient.v2 =
network classification ethernet create = neutron_classifier.cli.openstack_cli.eth_classification:CreateEthernetClassification
network classification ethernet delete = neutron_classifier.cli.openstack_cli.eth_classification:DeleteEthernetClassification
network classification ethernet list = neutron_classifier.cli.openstack_cli.eth_classification:ListEthernetClassification
network classification ethernet show = neutron_classifier.cli.openstack_cli.eth_classification:ShowEthernetClassification
network classification ethernet update = neutron_classifier.cli.openstack_cli.eth_classification:UpdateEthernetClassification
network classification ipv4 create = neutron_classifier.cli.openstack_cli.ipv4_classification:CreateIPV4Classification
network classification ipv4 delete = neutron_classifier.cli.openstack_cli.ipv4_classification:DeleteIPV4Classification
network classification ipv4 list = neutron_classifier.cli.openstack_cli.ipv4_classification:ListIPV4Classification
network classification ipv4 show = neutron_classifier.cli.openstack_cli.ipv4_classification:ShowIPV4Classification
network classification ipv4 update = neutron_classifier.cli.openstack_cli.ipv4_classification:UpdateIPV4Classification
network classification ipv6 create = neutron_classifier.cli.openstack_cli.ipv6_classification:CreateIPV6Classification
network classification ipv6 delete = neutron_classifier.cli.openstack_cli.ipv6_classification:DeleteIPV6Classification
network classification ipv6 list = neutron_classifier.cli.openstack_cli.ipv6_classification:ListIPV6Classification
network classification ipv6 show = neutron_classifier.cli.openstack_cli.ipv6_classification:ShowIPV6Classification
network classification ipv6 update = neutron_classifier.cli.openstack_cli.ipv6_classification:UpdateIPV6Classification
network classification tcp create = neutron_classifier.cli.openstack_cli.tcp_classification:CreateTCPClassification
network classification tcp delete = neutron_classifier.cli.openstack_cli.tcp_classification:DeleteTCPClassification
network classification tcp list = neutron_classifier.cli.openstack_cli.tcp_classification:ListTCPClassification
network classification tcp show = neutron_classifier.cli.openstack_cli.tcp_classification:ShowTCPClassification
network classification tcp update = neutron_classifier.cli.openstack_cli.tcp_classification:UpdateTCPClassification
network classification udp create = neutron_classifier.cli.openstack_cli.udp_classification:CreateUDPClassification
network classification udp delete = neutron_classifier.cli.openstack_cli.udp_classification:DeleteUDPClassification
network classification udp list = neutron_classifier.cli.openstack_cli.udp_classification:ListUDPClassification
network classification udp show = neutron_classifier.cli.openstack_cli.udp_classification:ShowUDPClassification
network classification udp update = neutron_classifier.cli.openstack_cli.udp_classification:UpdateUDPClassification
network classification type list = neutron_classifier.cli.openstack_cli.classification_type:ListClassificationType
network classification group create = neutron_classifier.cli.openstack_cli.classification_group:CreateClassificationGroup
network classification group delete = neutron_classifier.cli.openstack_cli.classification_group:DeleteClassificationGroup
network classification group list = neutron_classifier.cli.openstack_cli.classification_group:ListClassificationGroup
network classification group show = neutron_classifier.cli.openstack_cli.classification_group:ShowClassificationGroup
network classification group update = neutron_classifier.cli.openstack_cli.classification_group:UpdateClassificationGroup
neutron.db.alembic_migrations =
neutron-classifier = neutron_classifier.db.migration:alembic_migrations

View File

@ -11,6 +11,7 @@ openstackdocstheme>=1.18.1 # Apache-2.0
oslosphinx>=4.7.0 # Apache-2.0
WebOb>=1.7.1 # MIT
oslotest>=3.2.0 # Apache-2.0
os-client-config>=1.28.0 # Apache-2.0
stestr>=2.0.0 # Apache-2.0
testresources>=2.0.0 # Apache-2.0/BSD
testscenarios>=0.4 # Apache-2.0/BSD

View File

@ -19,6 +19,7 @@ deps =
whitelist_externals =
sh
find
stestr
commands =
find . -type f -name "*.py[c|o]" -delete
find . -type d -name "__pycache__" -delete
@ -45,7 +46,8 @@ setenv = {[testenv]setenv}
deps =
{[testenv]deps}
-r{toxinidir}/neutron_classifier/tests/functional/requirements.txt
commands = stestr run {posargs}
commands =
stestr run {posargs}
[testenv:functional-py35]
basepython = python3.5
@ -62,10 +64,12 @@ commands = stestr run {posargs}
basepython = python2.7
setenv = {[testenv:functional]setenv}
{[testenv:dsvm]setenv}
OS_TEST_PATH=./neutron_classifier/tests/functional
sitepackages=True
deps =
{[testenv:functional]deps}
commands = stestr run {posargs}
commands =
stestr run {posargs}
[testenv:venv]
basepython = python3