OSC: Remove SFC V2 calls to neutronclient
Change SFC CLI to use SDK instead of deprecated neutronclient python bindings. Add possibility to delete multiple resources as it is common for other delete commands. Bump SDK minimum version to 1.5.0. Depends-On: https://review.opendev.org/c/openstack/openstacksdk/+/887387 Actually the new SDK release is the dependency. Related-Bug: #1999774 Change-Id: Ic22b8163155904113db8a4acf1fe7d75ae100a84
This commit is contained in:
parent
396432ab06
commit
b9152a5042
@ -55,6 +55,26 @@ _attr_map = (
|
||||
('project_id', 'Project', column_util.LIST_LONG_ONLY),
|
||||
)
|
||||
|
||||
_attr_map_dict = {
|
||||
'id': 'ID',
|
||||
'name': 'Name',
|
||||
'description': 'Description',
|
||||
'summary': 'Summary',
|
||||
'protocol': 'Protocol',
|
||||
'ethertype': 'Ethertype',
|
||||
'source_ip_prefix': 'Source IP',
|
||||
'destination_ip_prefix': 'Destination IP',
|
||||
'logical_source_port': 'Logical Source Port',
|
||||
'logical_destination_port': 'Logical Destination Port',
|
||||
'source_port_range_min': 'Source Port Range Min',
|
||||
'source_port_range_max': 'Source Port Range Max',
|
||||
'destination_port_range_min': 'Destination Port Range Min',
|
||||
'destination_port_range_max': 'Destination Port Range Max',
|
||||
'l7_parameters': 'L7 Parameters',
|
||||
'tenant_id': 'Project',
|
||||
'project_id': 'Project',
|
||||
}
|
||||
|
||||
|
||||
class CreateSfcFlowClassifier(command.ShowOne):
|
||||
_description = _("Create a flow classifier")
|
||||
@ -114,11 +134,11 @@ class CreateSfcFlowClassifier(command.ShowOne):
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
client = self.app.client_manager.network
|
||||
attrs = _get_common_attrs(self.app.client_manager, parsed_args)
|
||||
body = {resource: attrs}
|
||||
obj = client.create_sfc_flow_classifier(body)[resource]
|
||||
columns, display_columns = column_util.get_columns(obj, _attr_map)
|
||||
obj = client.create_sfc_flow_classifier(**attrs)
|
||||
display_columns, columns = utils.get_osc_show_columns_for_sdk_resource(
|
||||
obj, _attr_map_dict, ['location', 'tenant_id', 'summary'])
|
||||
data = utils.get_dict_properties(obj, columns)
|
||||
return display_columns, data
|
||||
|
||||
@ -131,20 +151,27 @@ class DeleteSfcFlowClassifier(command.Command):
|
||||
parser.add_argument(
|
||||
'flow_classifier',
|
||||
metavar='<flow-classifier>',
|
||||
help=_("Flow classifier to delete (name or ID)")
|
||||
nargs='+',
|
||||
help=_("Flow classifier(s) to delete (name or ID)")
|
||||
)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
# TODO(mohan): Add support for deleting multiple resources.
|
||||
client = self.app.client_manager.neutronclient
|
||||
fc_id = _get_id(client, parsed_args.flow_classifier, resource)
|
||||
try:
|
||||
client.delete_sfc_flow_classifier(fc_id)
|
||||
except Exception as e:
|
||||
msg = (_("Failed to delete flow classifier with name "
|
||||
"or ID '%(fc)s': %(e)s")
|
||||
% {'fc': parsed_args.flow_classifier, 'e': e})
|
||||
client = self.app.client_manager.network
|
||||
result = 0
|
||||
for fcl in parsed_args.flow_classifier:
|
||||
try:
|
||||
fc_id = client.find_sfc_flow_classifier(
|
||||
fcl, ignore_missing=False)['id']
|
||||
client.delete_sfc_flow_classifier(fc_id)
|
||||
except Exception as e:
|
||||
result += 1
|
||||
LOG.error(_("Failed to delete flow classifier with name "
|
||||
"or ID '%(fc)s': %(e)s"), {'fc': fcl, 'e': e})
|
||||
if result > 0:
|
||||
total = len(parsed_args.flow_classifier)
|
||||
msg = (_("%(result)s of %(total)s flow classifier(s) "
|
||||
"failed to delete.") % {'result': result, 'total': total})
|
||||
raise exceptions.CommandError(msg)
|
||||
|
||||
|
||||
@ -161,8 +188,8 @@ class ListSfcFlowClassifier(command.Lister):
|
||||
return parser
|
||||
|
||||
def extend_list(self, data, parsed_args):
|
||||
ext_data = data['flow_classifiers']
|
||||
for d in ext_data:
|
||||
ext_data = []
|
||||
for d in data:
|
||||
val = []
|
||||
protocol = d['protocol'].upper() if d['protocol'] else 'any'
|
||||
val.append('protocol: ' + protocol)
|
||||
@ -180,6 +207,7 @@ class ListSfcFlowClassifier(command.Lister):
|
||||
l7_param = 'l7_parameters: {%s}' % ','.join(d['l7_parameters'])
|
||||
val.append(l7_param)
|
||||
d['summary'] = ',\n'.join(val)
|
||||
ext_data.append(d)
|
||||
return ext_data
|
||||
|
||||
def _get_protocol_port_details(self, data, val):
|
||||
@ -197,8 +225,8 @@ class ListSfcFlowClassifier(command.Lister):
|
||||
val, ip_prefix, min_port, max_port)
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
obj = client.list_sfc_flow_classifiers()
|
||||
client = self.app.client_manager.network
|
||||
obj = client.sfc_flow_classifiers()
|
||||
obj_extend = self.extend_list(obj, parsed_args)
|
||||
headers, columns = column_util.get_column_definitions(
|
||||
_attr_map, long_listing=parsed_args.long)
|
||||
@ -227,13 +255,13 @@ class SetSfcFlowClassifier(command.Command):
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
fc_id = _get_id(client, parsed_args.flow_classifier, resource)
|
||||
client = self.app.client_manager.network
|
||||
fc_id = client.find_sfc_flow_classifier(parsed_args.flow_classifier,
|
||||
ignore_missing=False)['id']
|
||||
attrs = _get_common_attrs(self.app.client_manager, parsed_args,
|
||||
is_create=False)
|
||||
body = {resource: attrs}
|
||||
try:
|
||||
client.update_sfc_flow_classifier(fc_id, body)
|
||||
client.update_sfc_flow_classifier(fc_id, **attrs)
|
||||
except Exception as e:
|
||||
msg = (_("Failed to update flow classifier '%(fc)s': %(e)s")
|
||||
% {'fc': parsed_args.flow_classifier, 'e': e})
|
||||
@ -253,10 +281,12 @@ class ShowSfcFlowClassifier(command.ShowOne):
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
fc_id = _get_id(client, parsed_args.flow_classifier, resource)
|
||||
obj = client.show_sfc_flow_classifier(fc_id)[resource]
|
||||
columns, display_columns = column_util.get_columns(obj, _attr_map)
|
||||
client = self.app.client_manager.network
|
||||
fc_id = client.find_sfc_flow_classifier(parsed_args.flow_classifier,
|
||||
ignore_missing=False)['id']
|
||||
obj = client.get_sfc_flow_classifier(fc_id)
|
||||
display_columns, columns = utils.get_osc_show_columns_for_sdk_resource(
|
||||
obj, _attr_map_dict, ['location', 'tenant_id', 'summary'])
|
||||
data = utils.get_dict_properties(obj, columns)
|
||||
return display_columns, data
|
||||
|
||||
@ -282,13 +312,13 @@ def _get_attrs(client_manager, attrs, parsed_args):
|
||||
if parsed_args.destination_ip_prefix is not None:
|
||||
attrs['destination_ip_prefix'] = parsed_args.destination_ip_prefix
|
||||
if parsed_args.logical_source_port is not None:
|
||||
attrs['logical_source_port'] = _get_id(
|
||||
client_manager.neutronclient, parsed_args.logical_source_port,
|
||||
'port')
|
||||
attrs['logical_source_port'] = client_manager.network.find_port(
|
||||
parsed_args.logical_source_port, ignore_missing=False
|
||||
)['id']
|
||||
if parsed_args.logical_destination_port is not None:
|
||||
attrs['logical_destination_port'] = _get_id(
|
||||
client_manager.neutronclient, parsed_args.logical_destination_port,
|
||||
'port')
|
||||
attrs['logical_destination_port'] = client_manager.network.find_port(
|
||||
parsed_args.logical_destination_port, ignore_missing=False
|
||||
)['id']
|
||||
if parsed_args.source_port is not None:
|
||||
_fill_protocol_port_info(attrs, 'source',
|
||||
parsed_args.source_port)
|
||||
@ -314,7 +344,3 @@ def _fill_protocol_port_info(attrs, port_type, port_val):
|
||||
message = (_("Protocol port value %s must be an integer "
|
||||
"or integer:integer.") % port_val)
|
||||
raise nc_exc.CommandError(message=message)
|
||||
|
||||
|
||||
def _get_id(client, id_or_name, resource):
|
||||
return client.find_resource(resource, id_or_name)['id']
|
||||
|
@ -36,10 +36,20 @@ _attr_map = (
|
||||
('chain_parameters', 'Chain Parameters',
|
||||
column_util.LIST_BOTH),
|
||||
('description', 'Description', column_util.LIST_LONG_ONLY),
|
||||
('chain_id', 'Chain ID', column_util.LIST_BOTH),
|
||||
('project_id', 'Project', column_util.LIST_LONG_ONLY),
|
||||
)
|
||||
|
||||
_attr_map_dict = {
|
||||
'id': 'ID',
|
||||
'name': 'Name',
|
||||
'port_pair_groups': 'Port Pair Groups',
|
||||
'flow_classifiers': 'Flow Classifiers',
|
||||
'chain_parameters': 'Chain Parameters',
|
||||
'description': 'Description',
|
||||
'tenant_id': 'Project',
|
||||
'project_id': 'Project',
|
||||
}
|
||||
|
||||
|
||||
class CreateSfcPortChain(command.ShowOne):
|
||||
_description = _("Create a port chain")
|
||||
@ -81,11 +91,11 @@ class CreateSfcPortChain(command.ShowOne):
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
client = self.app.client_manager.network
|
||||
attrs = _get_common_attrs(self.app.client_manager, parsed_args)
|
||||
body = {resource: attrs}
|
||||
obj = client.create_sfc_port_chain(body)[resource]
|
||||
columns, display_columns = column_util.get_columns(obj, _attr_map)
|
||||
obj = client.create_sfc_port_chain(**attrs)
|
||||
display_columns, columns = utils.get_osc_show_columns_for_sdk_resource(
|
||||
obj, _attr_map_dict, ['location', 'tenant_id'])
|
||||
data = utils.get_dict_properties(obj, columns)
|
||||
return display_columns, data
|
||||
|
||||
@ -98,20 +108,28 @@ class DeleteSfcPortChain(command.Command):
|
||||
parser.add_argument(
|
||||
'port_chain',
|
||||
metavar="<port-chain>",
|
||||
help=_("Port chain to delete (name or ID)")
|
||||
nargs='+',
|
||||
help=_("Port chain(s) to delete (name or ID)")
|
||||
)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
# TODO(mohan): Add support for deleting multiple resources.
|
||||
client = self.app.client_manager.neutronclient
|
||||
pc_id = _get_id(client, parsed_args.port_chain, resource)
|
||||
try:
|
||||
client.delete_sfc_port_chain(pc_id)
|
||||
except Exception as e:
|
||||
msg = (_("Failed to delete port chain with name "
|
||||
"or ID '%(pc)s': %(e)s")
|
||||
% {'pc': parsed_args.port_chain, 'e': e})
|
||||
client = self.app.client_manager.network
|
||||
result = 0
|
||||
for pc in parsed_args.port_chain:
|
||||
try:
|
||||
pc_id = client.find_sfc_port_chain(
|
||||
pc, ignore_missing=False)['id']
|
||||
client.delete_sfc_port_chain(pc_id)
|
||||
except Exception as e:
|
||||
result += 1
|
||||
LOG.error(_("Failed to delete port chain with name "
|
||||
"or ID '%(pc)s': %(e)s"), {'pc': pc, 'e': e})
|
||||
if result > 0:
|
||||
total = len(parsed_args.port_chain)
|
||||
msg = (_("%(result)s of %(total)s port chain(s) "
|
||||
"failed to delete.") % {'result': result,
|
||||
'total': total})
|
||||
raise exceptions.CommandError(msg)
|
||||
|
||||
|
||||
@ -129,13 +147,13 @@ class ListSfcPortChain(command.Lister):
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
data = client.list_sfc_port_chains()
|
||||
client = self.app.client_manager.network
|
||||
data = client.sfc_port_chains()
|
||||
headers, columns = column_util.get_column_definitions(
|
||||
_attr_map, long_listing=parsed_args.long)
|
||||
return (headers,
|
||||
(utils.get_dict_properties(s, columns)
|
||||
for s in data['port_chains']))
|
||||
for s in data))
|
||||
|
||||
|
||||
class SetSfcPortChain(command.Command):
|
||||
@ -184,8 +202,9 @@ class SetSfcPortChain(command.Command):
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
pc_id = _get_id(client, parsed_args.port_chain, resource)
|
||||
client = self.app.client_manager.network
|
||||
pc_id = client.find_sfc_port_chain(parsed_args.port_chain,
|
||||
ignore_missing=False)['id']
|
||||
attrs = _get_common_attrs(self.app.client_manager, parsed_args,
|
||||
is_create=False)
|
||||
if parsed_args.no_flow_classifier:
|
||||
@ -194,13 +213,14 @@ class SetSfcPortChain(command.Command):
|
||||
if parsed_args.no_flow_classifier:
|
||||
fc_list = []
|
||||
else:
|
||||
fc_list = client.find_resource(
|
||||
resource, parsed_args.port_chain,
|
||||
cmd_resource='sfc_port_chain')['flow_classifiers']
|
||||
fc_list = client.find_sfc_port_chain(
|
||||
parsed_args.port_chain,
|
||||
ignore_missing=False
|
||||
)['flow_classifiers']
|
||||
for fc in parsed_args.flow_classifiers:
|
||||
fc_id = client.find_resource(
|
||||
'flow_classifier', fc,
|
||||
cmd_resource='sfc_flow_classifier')['id']
|
||||
fc_id = client.find_sfc_flow_classifier(
|
||||
fc,
|
||||
ignore_missing=False)['id']
|
||||
if fc_id not in fc_list:
|
||||
fc_list.append(fc_id)
|
||||
attrs['flow_classifiers'] = fc_list
|
||||
@ -211,27 +231,25 @@ class SetSfcPortChain(command.Command):
|
||||
if parsed_args.no_port_pair_group and parsed_args.port_pair_groups:
|
||||
ppg_list = []
|
||||
for ppg in parsed_args.port_pair_groups:
|
||||
ppg_id = client.find_resource(
|
||||
'port_pair_group', ppg,
|
||||
cmd_resource='sfc_port_pair_group')['id']
|
||||
ppg_id = client.find_sfc_port_pair_group(
|
||||
ppg, ignore_missing=False)['id']
|
||||
if ppg_id not in ppg_list:
|
||||
ppg_list.append(ppg_id)
|
||||
attrs['port_pair_groups'] = ppg_list
|
||||
if (parsed_args.port_pair_groups and
|
||||
not parsed_args.no_port_pair_group):
|
||||
ppg_list = client.find_resource(
|
||||
resource, parsed_args.port_chain,
|
||||
cmd_resource='sfc_port_chain')['port_pair_groups']
|
||||
ppg_list = client.find_sfc_port_chain(
|
||||
parsed_args.port_chain,
|
||||
ignore_missing=False
|
||||
)['port_pair_groups']
|
||||
for ppg in parsed_args.port_pair_groups:
|
||||
ppg_id = client.find_resource(
|
||||
'port_pair_group', ppg,
|
||||
cmd_resource='sfc_port_pair_group')['id']
|
||||
ppg_id = client.find_sfc_port_pair_group(
|
||||
ppg, ignore_missing=False)['id']
|
||||
if ppg_id not in ppg_list:
|
||||
ppg_list.append(ppg_id)
|
||||
attrs['port_pair_groups'] = ppg_list
|
||||
body = {resource: attrs}
|
||||
try:
|
||||
client.update_sfc_port_chain(pc_id, body)
|
||||
client.update_sfc_port_chain(pc_id, **attrs)
|
||||
except Exception as e:
|
||||
msg = (_("Failed to update port chain '%(pc)s': %(e)s")
|
||||
% {'pc': parsed_args.port_chain, 'e': e})
|
||||
@ -251,10 +269,12 @@ class ShowSfcPortChain(command.ShowOne):
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
pc_id = _get_id(client, parsed_args.port_chain, resource)
|
||||
obj = client.show_sfc_port_chain(pc_id)[resource]
|
||||
columns, display_columns = column_util.get_columns(obj, _attr_map)
|
||||
client = self.app.client_manager.network
|
||||
pc_id = client.find_sfc_port_chain(parsed_args.port_chain,
|
||||
ignore_missing=False)['id']
|
||||
obj = client.get_sfc_port_chain(pc_id)
|
||||
display_columns, columns = utils.get_osc_show_columns_for_sdk_resource(
|
||||
obj, _attr_map_dict, ['location', 'tenant_id'])
|
||||
data = utils.get_dict_properties(obj, columns)
|
||||
return display_columns, data
|
||||
|
||||
@ -290,30 +310,31 @@ class UnsetSfcPortChain(command.Command):
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
pc_id = _get_id(client, parsed_args.port_chain, resource)
|
||||
client = self.app.client_manager.network
|
||||
pc_id = client.find_sfc_port_chain(parsed_args.port_chain,
|
||||
ignore_missing=False)['id']
|
||||
attrs = {}
|
||||
if parsed_args.flow_classifiers:
|
||||
fc_list = client.find_resource(
|
||||
resource, parsed_args.port_chain,
|
||||
cmd_resource='sfc_port_chain')['flow_classifiers']
|
||||
fc_list = client.find_sfc_port_chain(
|
||||
parsed_args.port_chain, ignore_missing=False
|
||||
)['flow_classifiers']
|
||||
for fc in parsed_args.flow_classifiers:
|
||||
fc_id = client.find_resource(
|
||||
'flow_classifier', fc,
|
||||
cmd_resource='sfc_flow_classifier')['id']
|
||||
fc_id = client.find_sfc_flow_classifier(
|
||||
fc,
|
||||
ignore_missing=False)['id']
|
||||
if fc_id in fc_list:
|
||||
fc_list.remove(fc_id)
|
||||
attrs['flow_classifiers'] = fc_list
|
||||
if parsed_args.all_flow_classifier:
|
||||
attrs['flow_classifiers'] = []
|
||||
if parsed_args.port_pair_groups:
|
||||
ppg_list = client.find_resource(
|
||||
resource, parsed_args.port_chain,
|
||||
cmd_resource='sfc_port_chain')['port_pair_groups']
|
||||
ppg_list = client.find_sfc_port_chain(
|
||||
parsed_args.port_chain,
|
||||
ignore_missing=False)['port_pair_groups']
|
||||
for ppg in parsed_args.port_pair_groups:
|
||||
ppg_id = client.find_resource(
|
||||
'port_pair_group', ppg,
|
||||
cmd_resource='sfc_port_pair_group')['id']
|
||||
ppg_id = client.find_sfc_port_pair_group(
|
||||
ppg,
|
||||
ignore_missing=False)['id']
|
||||
if ppg_id in ppg_list:
|
||||
ppg_list.remove(ppg_id)
|
||||
if ppg_list == []:
|
||||
@ -321,9 +342,8 @@ class UnsetSfcPortChain(command.Command):
|
||||
' specified.')
|
||||
raise exceptions.CommandError(message)
|
||||
attrs['port_pair_groups'] = ppg_list
|
||||
body = {resource: attrs}
|
||||
try:
|
||||
client.update_sfc_port_chain(pc_id, body)
|
||||
client.update_sfc_port_chain(pc_id, **attrs)
|
||||
except Exception as e:
|
||||
msg = (_("Failed to unset port chain '%(pc)s': %(e)s")
|
||||
% {'pc': parsed_args.port_chain, 'e': e})
|
||||
@ -332,17 +352,18 @@ class UnsetSfcPortChain(command.Command):
|
||||
|
||||
def _get_common_attrs(client_manager, parsed_args, is_create=True):
|
||||
attrs = {}
|
||||
client = client_manager.network
|
||||
if parsed_args.name is not None:
|
||||
attrs['name'] = parsed_args.name
|
||||
if parsed_args.description is not None:
|
||||
attrs['description'] = parsed_args.description
|
||||
if parsed_args.port_pair_groups:
|
||||
attrs['port_pair_groups'] = [(_get_id(client_manager.neutronclient,
|
||||
ppg, 'port_pair_group'))
|
||||
attrs['port_pair_groups'] = [client.find_sfc_port_pair_group(
|
||||
ppg, ignore_missing=False)['id']
|
||||
for ppg in parsed_args.port_pair_groups]
|
||||
if parsed_args.flow_classifiers:
|
||||
attrs['flow_classifiers'] = [(_get_id(client_manager.neutronclient, fc,
|
||||
'flow_classifier'))
|
||||
attrs['flow_classifiers'] = [client.find_sfc_flow_classifier(
|
||||
fc, ignore_missing=False)['id']
|
||||
for fc in parsed_args.flow_classifiers]
|
||||
if is_create is True:
|
||||
_get_attrs(attrs, parsed_args)
|
||||
@ -358,7 +379,3 @@ def _get_attrs(attrs, parsed_args):
|
||||
if 'symmetric' in chain_param:
|
||||
chain_params['symmetric'] = chain_param['symmetric']
|
||||
attrs['chain_parameters'] = chain_params
|
||||
|
||||
|
||||
def _get_id(client, id_or_name, resource):
|
||||
return client.find_resource(resource, id_or_name)['id']
|
||||
|
@ -38,6 +38,17 @@ _attr_map = (
|
||||
('project_id', 'Project', column_util.LIST_LONG_ONLY),
|
||||
)
|
||||
|
||||
_attr_map_dict = {
|
||||
'id': 'ID',
|
||||
'name': 'Name',
|
||||
'description': 'Description',
|
||||
'ingress': 'Ingress Logical Port',
|
||||
'egress': 'Egress Logical Port',
|
||||
'service_function_parameters': 'Service Function Parameters',
|
||||
'tenant_id': 'Project',
|
||||
'project_id': 'Project',
|
||||
}
|
||||
|
||||
|
||||
class CreateSfcPortPair(command.ShowOne):
|
||||
_description = _("Create a port pair")
|
||||
@ -76,11 +87,11 @@ class CreateSfcPortPair(command.ShowOne):
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
client = self.app.client_manager.network
|
||||
attrs = _get_common_attrs(self.app.client_manager, parsed_args)
|
||||
body = {resource: attrs}
|
||||
obj = client.create_sfc_port_pair(body)[resource]
|
||||
columns, display_columns = column_util.get_columns(obj, _attr_map)
|
||||
obj = client.create_sfc_port_pair(**attrs)
|
||||
display_columns, columns = utils.get_osc_show_columns_for_sdk_resource(
|
||||
obj, _attr_map_dict, ['location', 'tenant_id'])
|
||||
data = utils.get_dict_properties(obj, columns)
|
||||
return display_columns, data
|
||||
|
||||
@ -93,20 +104,29 @@ class DeleteSfcPortPair(command.Command):
|
||||
parser.add_argument(
|
||||
'port_pair',
|
||||
metavar="<port-pair>",
|
||||
help=_("Port pair to delete (name or ID)")
|
||||
nargs='+',
|
||||
help=_("Port pair(s) to delete (name or ID)")
|
||||
)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
# TODO(mohan): Add support for deleting multiple resources.
|
||||
client = self.app.client_manager.neutronclient
|
||||
port_pair_id = _get_id(client, parsed_args.port_pair, resource)
|
||||
try:
|
||||
client.delete_sfc_port_pair(port_pair_id)
|
||||
except Exception as e:
|
||||
msg = (_("Failed to delete port pair with name "
|
||||
"or ID '%(port_pair)s': %(e)s")
|
||||
% {'port_pair': parsed_args.port_pair, 'e': e})
|
||||
client = self.app.client_manager.network
|
||||
result = 0
|
||||
for pp in parsed_args.port_pair:
|
||||
try:
|
||||
port_pair_id = client.find_sfc_port_pair(
|
||||
pp, ignore_missing=False)['id']
|
||||
client.delete_sfc_port_pair(port_pair_id)
|
||||
except Exception as e:
|
||||
result += 1
|
||||
LOG.error(_("Failed to delete port pair with name "
|
||||
"or ID '%(port_pair)s': %(e)s"),
|
||||
{'port_pair': pp, 'e': e})
|
||||
if result > 0:
|
||||
total = len(parsed_args.port_pair)
|
||||
msg = (_("%(result)s of %(total)s port pair(s) "
|
||||
"failed to delete.") % {'result': result,
|
||||
'total': total})
|
||||
raise exceptions.CommandError(msg)
|
||||
|
||||
|
||||
@ -123,14 +143,14 @@ class ListSfcPortPair(command.Lister):
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
data = client.list_sfc_port_pairs()
|
||||
client = self.app.client_manager.network
|
||||
data = client.sfc_port_pairs()
|
||||
headers, columns = column_util.get_column_definitions(
|
||||
_attr_map, long_listing=parsed_args.long)
|
||||
return (headers,
|
||||
(utils.get_dict_properties(
|
||||
s, columns,
|
||||
) for s in data['port_pairs']))
|
||||
) for s in data))
|
||||
|
||||
|
||||
class SetSfcPortPair(command.Command):
|
||||
@ -154,13 +174,14 @@ class SetSfcPortPair(command.Command):
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
port_pair_id = _get_id(client, parsed_args.port_pair, resource)
|
||||
client = self.app.client_manager.network
|
||||
port_pair_id = client.find_sfc_port_pair(
|
||||
parsed_args.port_pair, ignore_missing=False
|
||||
)['id']
|
||||
attrs = _get_common_attrs(self.app.client_manager, parsed_args,
|
||||
is_create=False)
|
||||
body = {resource: attrs}
|
||||
try:
|
||||
client.update_sfc_port_pair(port_pair_id, body)
|
||||
client.update_sfc_port_pair(port_pair_id, **attrs)
|
||||
except Exception as e:
|
||||
msg = (_("Failed to update port pair '%(port_pair)s': %(e)s")
|
||||
% {'port_pair': parsed_args.port_pair, 'e': e})
|
||||
@ -180,10 +201,13 @@ class ShowSfcPortPair(command.ShowOne):
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
port_pair_id = _get_id(client, parsed_args.port_pair, resource)
|
||||
obj = client.show_sfc_port_pair(port_pair_id)[resource]
|
||||
columns, display_columns = column_util.get_columns(obj, _attr_map)
|
||||
client = self.app.client_manager.network
|
||||
port_pair_id = client.find_sfc_port_pair(
|
||||
parsed_args.port_pair, ignore_missing=False
|
||||
)['id']
|
||||
obj = client.get_sfc_port_pair(port_pair_id)
|
||||
display_columns, columns = utils.get_osc_show_columns_for_sdk_resource(
|
||||
obj, _attr_map_dict, ['location', 'tenant_id'])
|
||||
data = utils.get_dict_properties(obj, columns)
|
||||
return display_columns, data
|
||||
|
||||
@ -200,12 +224,15 @@ def _get_common_attrs(client_manager, parsed_args, is_create=True):
|
||||
|
||||
|
||||
def _get_attrs(client_manager, attrs, parsed_args):
|
||||
client = client_manager.network
|
||||
if parsed_args.ingress is not None:
|
||||
attrs['ingress'] = _get_id(client_manager.neutronclient,
|
||||
parsed_args.ingress, 'port')
|
||||
attrs['ingress'] = client.find_port(
|
||||
parsed_args.ingress, ignore_missing=False
|
||||
)['id']
|
||||
if parsed_args.egress is not None:
|
||||
attrs['egress'] = _get_id(client_manager.neutronclient,
|
||||
parsed_args.egress, 'port')
|
||||
attrs['egress'] = client.find_port(
|
||||
parsed_args.egress, ignore_missing=False
|
||||
)['id']
|
||||
if parsed_args.service_function_parameters is not None:
|
||||
attrs['service_function_parameters'] = _get_service_function_params(
|
||||
parsed_args.service_function_parameters)
|
||||
@ -222,7 +249,3 @@ def _get_service_function_params(sf_params):
|
||||
if 'weight' in sf_param:
|
||||
attrs['weight'] = sf_param['weight']
|
||||
return attrs
|
||||
|
||||
|
||||
def _get_id(client, id_or_name, resource):
|
||||
return client.find_resource(resource, id_or_name)['id']
|
||||
|
@ -34,11 +34,21 @@ _attr_map = (
|
||||
('port_pair_group_parameters', 'Port Pair Group Parameters',
|
||||
column_util.LIST_BOTH),
|
||||
('description', 'Description', column_util.LIST_LONG_ONLY),
|
||||
('group_id', 'Loadbalance ID', column_util.LIST_LONG_ONLY),
|
||||
('project_id', 'Project', column_util.LIST_LONG_ONLY),
|
||||
('tap_enabled', 'Tap Enabled', column_util.LIST_BOTH)
|
||||
('is_tap_enabled', 'Tap Enabled', column_util.LIST_BOTH)
|
||||
)
|
||||
|
||||
_attr_map_dict = {
|
||||
'id': 'ID',
|
||||
'name': 'Name',
|
||||
'description': 'Description',
|
||||
'port_pairs': 'Port Pair',
|
||||
'port_pair_group_parameters': 'Port Pair Group Parameters',
|
||||
'is_tap_enabled': 'Tap Enabled',
|
||||
'tenant_id': 'Project',
|
||||
'project_id': 'Project',
|
||||
}
|
||||
|
||||
|
||||
class CreateSfcPortPairGroup(command.ShowOne):
|
||||
_description = _("Create a port pair group")
|
||||
@ -85,11 +95,11 @@ class CreateSfcPortPairGroup(command.ShowOne):
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
client = self.app.client_manager.network
|
||||
attrs = _get_common_attrs(self.app.client_manager, parsed_args)
|
||||
body = {resource: attrs}
|
||||
obj = client.create_sfc_port_pair_group(body)[resource]
|
||||
columns, display_columns = column_util.get_columns(obj, _attr_map)
|
||||
obj = client.create_sfc_port_pair_group(**attrs)
|
||||
display_columns, columns = utils.get_osc_show_columns_for_sdk_resource(
|
||||
obj, _attr_map_dict, ['location', 'tenant_id'])
|
||||
data = utils.get_dict_properties(obj, columns)
|
||||
return display_columns, data
|
||||
|
||||
@ -102,20 +112,28 @@ class DeleteSfcPortPairGroup(command.Command):
|
||||
parser.add_argument(
|
||||
'port_pair_group',
|
||||
metavar='<port-pair-group>',
|
||||
help=_("Port pair group to delete (name or ID)")
|
||||
nargs='+',
|
||||
help=_("Port pair group(s) to delete (name or ID)")
|
||||
)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
# TODO(mohan): Add support for deleting multiple resources.
|
||||
client = self.app.client_manager.neutronclient
|
||||
ppg_id = _get_id(client, parsed_args.port_pair_group, resource)
|
||||
try:
|
||||
client.delete_sfc_port_pair_group(ppg_id)
|
||||
except Exception as e:
|
||||
msg = (_("Failed to delete port pair group with name "
|
||||
"or ID '%(ppg)s': %(e)s")
|
||||
% {'ppg': parsed_args.port_pair_group, 'e': e})
|
||||
client = self.app.client_manager.network
|
||||
result = 0
|
||||
for ppg in parsed_args.port_pair_group:
|
||||
try:
|
||||
ppg_id = client.find_sfc_port_pair_group(
|
||||
ppg, ignore_missing=False)['id']
|
||||
client.delete_sfc_port_pair_group(ppg_id)
|
||||
except Exception as e:
|
||||
result += 1
|
||||
LOG.error(_("Failed to delete port pair group with name "
|
||||
"or ID '%(ppg)s': %(e)s"), {'ppg': ppg, 'e': e})
|
||||
if result > 0:
|
||||
total = len(parsed_args.port_pair_group)
|
||||
msg = (_("%(result)s of %(total)s port pair group(s) "
|
||||
"failed to delete.") % {'result': result,
|
||||
'total': total})
|
||||
raise exceptions.CommandError(msg)
|
||||
|
||||
|
||||
@ -133,14 +151,14 @@ class ListSfcPortPairGroup(command.Lister):
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
data = client.list_sfc_port_pair_groups()
|
||||
client = self.app.client_manager.network
|
||||
data = client.sfc_port_pair_groups()
|
||||
headers, columns = column_util.get_column_definitions(
|
||||
_attr_map, long_listing=parsed_args.long)
|
||||
return (headers,
|
||||
(utils.get_dict_properties(
|
||||
s, columns,
|
||||
) for s in data['port_pair_groups']))
|
||||
) for s in data))
|
||||
|
||||
|
||||
class SetSfcPortPairGroup(command.Command):
|
||||
@ -175,26 +193,26 @@ class SetSfcPortPairGroup(command.Command):
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
ppg_id = _get_id(client, parsed_args.port_pair_group, resource)
|
||||
client = self.app.client_manager.network
|
||||
ppg_id = client.find_sfc_port_pair_group(
|
||||
parsed_args.port_pair_group)['id']
|
||||
attrs = _get_common_attrs(self.app.client_manager, parsed_args,
|
||||
is_create=False)
|
||||
if parsed_args.no_port_pair:
|
||||
attrs['port_pairs'] = []
|
||||
if parsed_args.port_pairs:
|
||||
added = [client.find_resource('port_pair', pp,
|
||||
cmd_resource='sfc_port_pair')['id']
|
||||
added = [client.find_sfc_port_pair(pp,
|
||||
ignore_missing=False)['id']
|
||||
for pp in parsed_args.port_pairs]
|
||||
if parsed_args.no_port_pair:
|
||||
existing = []
|
||||
else:
|
||||
existing = client.find_resource(
|
||||
resource, parsed_args.port_pair_group,
|
||||
cmd_resource='sfc_port_pair_group')['port_pairs']
|
||||
existing = client.find_sfc_port_pair_group(
|
||||
parsed_args.port_pair_group,
|
||||
ignore_missing=False)['port_pairs']
|
||||
attrs['port_pairs'] = sorted(list(set(existing) | set(added)))
|
||||
body = {resource: attrs}
|
||||
try:
|
||||
client.update_sfc_port_pair_group(ppg_id, body)
|
||||
client.update_sfc_port_pair_group(ppg_id, **attrs)
|
||||
except Exception as e:
|
||||
msg = (_("Failed to update port pair group '%(ppg)s': %(e)s")
|
||||
% {'ppg': parsed_args.port_pair_group, 'e': e})
|
||||
@ -214,10 +232,12 @@ class ShowSfcPortPairGroup(command.ShowOne):
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
ppg_id = _get_id(client, parsed_args.port_pair_group, resource)
|
||||
obj = client.show_sfc_port_pair_group(ppg_id)[resource]
|
||||
columns, display_columns = column_util.get_columns(obj, _attr_map)
|
||||
client = self.app.client_manager.network
|
||||
ppg_id = client.find_sfc_port_pair_group(
|
||||
parsed_args.port_pair_group, ignore_missing=False)['id']
|
||||
obj = client.get_sfc_port_pair_group(ppg_id)
|
||||
display_columns, columns = utils.get_osc_show_columns_for_sdk_resource(
|
||||
obj, _attr_map_dict, ['location', 'tenant_id'])
|
||||
data = utils.get_dict_properties(obj, columns)
|
||||
return display_columns, data
|
||||
|
||||
@ -246,22 +266,22 @@ class UnsetSfcPortPairGroup(command.Command):
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
ppg_id = _get_id(client, parsed_args.port_pair_group, resource)
|
||||
client = self.app.client_manager.network
|
||||
ppg_id = client.find_sfc_port_pair_group(
|
||||
parsed_args.port_pair_group, ignore_missing=False)['id']
|
||||
attrs = {}
|
||||
if parsed_args.port_pairs:
|
||||
existing = client.find_resource(
|
||||
resource, parsed_args.port_pair_group,
|
||||
cmd_resource='sfc_port_pair_group')['port_pairs']
|
||||
removed = [client.find_resource('port_pair', pp,
|
||||
cmd_resource='sfc_port_pair')['id']
|
||||
existing = client.find_sfc_port_pair_group(
|
||||
parsed_args.port_pair_group,
|
||||
ignore_missing=False)['port_pairs']
|
||||
removed = [client.find_sfc_port_pair(pp,
|
||||
ignore_missing=False)['id']
|
||||
for pp in parsed_args.port_pairs]
|
||||
attrs['port_pairs'] = list(set(existing) - set(removed))
|
||||
if parsed_args.all_port_pair:
|
||||
attrs['port_pairs'] = []
|
||||
body = {resource: attrs}
|
||||
try:
|
||||
client.update_sfc_port_pair_group(ppg_id, body)
|
||||
client.update_sfc_port_pair_group(ppg_id, **attrs)
|
||||
except Exception as e:
|
||||
msg = (_("Failed to unset port pair group '%(ppg)s': %(e)s")
|
||||
% {'ppg': parsed_args.port_pair_group, 'e': e})
|
||||
@ -280,15 +300,17 @@ def _get_ppg_param(attrs, ppg):
|
||||
|
||||
|
||||
def _get_common_attrs(client_manager, parsed_args, is_create=True):
|
||||
client = client_manager.network
|
||||
attrs = {}
|
||||
if parsed_args.name is not None:
|
||||
attrs['name'] = parsed_args.name
|
||||
if parsed_args.description is not None:
|
||||
attrs['description'] = parsed_args.description
|
||||
if parsed_args.port_pairs:
|
||||
attrs['port_pairs'] = [(_get_id(client_manager.neutronclient, pp,
|
||||
'port_pair'))
|
||||
attrs['port_pairs'] = [client.find_sfc_port_pair(
|
||||
pp, ignore_missing=False)['id']
|
||||
for pp in parsed_args.port_pairs]
|
||||
|
||||
if is_create:
|
||||
_get_attrs(attrs, parsed_args)
|
||||
return attrs
|
||||
@ -302,7 +324,3 @@ def _get_attrs(attrs, parsed_args):
|
||||
attrs['tap_enabled'] = True
|
||||
if parsed_args.disable_tap:
|
||||
attrs['tap_enabled'] = False
|
||||
|
||||
|
||||
def _get_id(client, id_or_name, resource):
|
||||
return client.find_resource(resource, id_or_name)['id']
|
||||
|
@ -33,6 +33,15 @@ _attr_map = (
|
||||
('project_id', 'Project', column_util.LIST_LONG_ONLY),
|
||||
)
|
||||
|
||||
_attr_map_dict = {
|
||||
'id': 'ID',
|
||||
'name': 'Name',
|
||||
'description': 'Description',
|
||||
'port_chains': 'Branching Points',
|
||||
'tenant_id': 'Project',
|
||||
'project_id': 'Project',
|
||||
}
|
||||
|
||||
|
||||
class CreateSfcServiceGraph(command.ShowOne):
|
||||
"""Create a service graph."""
|
||||
@ -57,12 +66,13 @@ class CreateSfcServiceGraph(command.ShowOne):
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
client = self.app.client_manager.network
|
||||
attrs = _get_common_attrs(self.app.client_manager, parsed_args)
|
||||
try:
|
||||
body = {resource: attrs}
|
||||
obj = client.create_sfc_service_graph(body)[resource]
|
||||
columns, display_columns = column_util.get_columns(obj, _attr_map)
|
||||
obj = client.create_sfc_service_graph(**attrs)
|
||||
display_columns, columns = \
|
||||
utils.get_osc_show_columns_for_sdk_resource(
|
||||
obj, _attr_map_dict, ['location', 'tenant_id'])
|
||||
data = utils.get_dict_properties(obj, columns)
|
||||
return display_columns, data
|
||||
except Exception as e:
|
||||
@ -92,13 +102,13 @@ class SetSfcServiceGraph(command.Command):
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
service_graph_id = _get_id(client, parsed_args.service_graph, resource)
|
||||
client = self.app.client_manager.network
|
||||
service_graph_id = client.find_sfc_service_graph(
|
||||
parsed_args.service_graph, ignore_missing=False)['id']
|
||||
attrs = _get_common_attrs(self.app.client_manager, parsed_args,
|
||||
is_create=False)
|
||||
body = {resource: attrs}
|
||||
try:
|
||||
client.update_sfc_service_graph(service_graph_id, body)
|
||||
client.update_sfc_service_graph(service_graph_id, **attrs)
|
||||
except Exception as e:
|
||||
msg = (_("Failed to update service graph "
|
||||
"'%(service_graph)s': %(e)s")
|
||||
@ -114,14 +124,30 @@ class DeleteSfcServiceGraph(command.Command):
|
||||
parser.add_argument(
|
||||
'service_graph',
|
||||
metavar="<service-graph>",
|
||||
help=_("ID or name of the service graph to delete.")
|
||||
nargs='+',
|
||||
help=_("ID or name of the service graph(s) to delete.")
|
||||
)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
id = _get_id(client, parsed_args.service_graph, resource)
|
||||
client.delete_sfc_service_graph(id)
|
||||
client = self.app.client_manager.network
|
||||
result = 0
|
||||
for sg in parsed_args.service_graph:
|
||||
try:
|
||||
sg_id = client.find_sfc_service_graph(
|
||||
sg, ignore_missing=False)['id']
|
||||
client.delete_sfc_service_graph(sg_id)
|
||||
except Exception as e:
|
||||
result += 1
|
||||
LOG.error(_("Failed to delete service graph with name "
|
||||
"or ID '%(sg)s': %(e)s"),
|
||||
{'sg': sg, 'e': e})
|
||||
if result > 0:
|
||||
total = len(parsed_args.service_graph)
|
||||
msg = (_("%(result)s of %(total)s service graph(s) "
|
||||
"failed to delete.") % {'result': result,
|
||||
'total': total})
|
||||
raise exceptions.CommandError(msg)
|
||||
|
||||
|
||||
class ListSfcServiceGraph(command.Lister):
|
||||
@ -138,13 +164,13 @@ class ListSfcServiceGraph(command.Lister):
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
data = client.list_sfc_service_graphs()
|
||||
client = self.app.client_manager.network
|
||||
data = client.sfc_service_graphs()
|
||||
headers, columns = column_util.get_column_definitions(
|
||||
_attr_map, long_listing=parsed_args.long)
|
||||
return (headers,
|
||||
(utils.get_dict_properties(s, columns)
|
||||
for s in data['service_graphs']))
|
||||
for s in data))
|
||||
|
||||
|
||||
class ShowSfcServiceGraph(command.ShowOne):
|
||||
@ -160,10 +186,12 @@ class ShowSfcServiceGraph(command.ShowOne):
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
sg_id = _get_id(client, parsed_args.service_graph, resource)
|
||||
obj = client.show_sfc_service_graph(sg_id)[resource]
|
||||
columns, display_columns = column_util.get_columns(obj, _attr_map)
|
||||
client = self.app.client_manager.network
|
||||
sg_id = client.find_sfc_service_graph(parsed_args.service_graph,
|
||||
ignore_missing=False)['id']
|
||||
obj = client.get_sfc_service_graph(sg_id)
|
||||
display_columns, columns = utils.get_osc_show_columns_for_sdk_resource(
|
||||
obj, _attr_map_dict, ['location', 'tenant_id'])
|
||||
data = utils.get_dict_properties(obj, columns)
|
||||
return display_columns, data
|
||||
|
||||
@ -179,10 +207,10 @@ def _get_common_attrs(client_manager, parsed_args, is_create=True):
|
||||
return attrs
|
||||
|
||||
|
||||
def _validate_destination_chains(comma_split, attrs, client_manager, sc_):
|
||||
def _validate_destination_chains(comma_split, attrs, client, sc_):
|
||||
for e in comma_split:
|
||||
if e != "":
|
||||
dc_ = _get_id(client_manager.neutronclient, e, 'port_chain')
|
||||
dc_ = client.find_sfc_port_chain(e, ignore_missing=False)['id']
|
||||
attrs['port_chains'][sc_].append(dc_)
|
||||
if _check_cycle(attrs['port_chains'], sc_, dc_):
|
||||
raise(exceptions.CommandError(
|
||||
@ -214,6 +242,7 @@ def _visit(graph, src, new_dest, new_src):
|
||||
|
||||
|
||||
def _get_attrs_for_create(client_manager, attrs, parsed_args):
|
||||
client = client_manager.network
|
||||
if parsed_args.branching_points:
|
||||
attrs['port_chains'] = {}
|
||||
src_chain = None
|
||||
@ -224,8 +253,8 @@ def _get_attrs_for_create(client_manager, attrs, parsed_args):
|
||||
"destination chain for each source chain.")
|
||||
colon_split = c.split(':')
|
||||
src_chain = colon_split.pop(0)
|
||||
sc_ = _get_id(client_manager.neutronclient,
|
||||
src_chain, 'port_chain')
|
||||
sc_ = client.find_sfc_port_chain(src_chain,
|
||||
ignore_missing=False)['id']
|
||||
for i in colon_split:
|
||||
comma_split = i.split(',')
|
||||
unique = set(comma_split)
|
||||
@ -240,8 +269,4 @@ def _get_attrs_for_create(client_manager, attrs, parsed_args):
|
||||
"use already ".format(src_chain))
|
||||
attrs['port_chains'][sc_] = []
|
||||
_validate_destination_chains(
|
||||
comma_split, attrs, client_manager, sc_)
|
||||
|
||||
|
||||
def _get_id(client, id_or_name, resource):
|
||||
return client.find_resource(resource, id_or_name)['id']
|
||||
comma_split, attrs, client, sc_)
|
||||
|
@ -14,13 +14,18 @@
|
||||
# under the License.
|
||||
|
||||
import argparse
|
||||
import copy
|
||||
from unittest import mock
|
||||
|
||||
|
||||
from osc_lib.tests import utils
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from openstack.network.v2 import sfc_flow_classifier as flow_classifier
|
||||
from openstack.network.v2 import sfc_port_chain as port_chain
|
||||
from openstack.network.v2 import sfc_port_pair as port_pair
|
||||
from openstack.network.v2 import sfc_port_pair_group as port_pair_group
|
||||
from openstack.network.v2 import sfc_service_graph as service_graph
|
||||
|
||||
|
||||
class TestNeutronClientOSCV2(utils.TestCommand):
|
||||
|
||||
@ -28,12 +33,32 @@ class TestNeutronClientOSCV2(utils.TestCommand):
|
||||
super(TestNeutronClientOSCV2, self).setUp()
|
||||
self.namespace = argparse.Namespace()
|
||||
self.app.client_manager.session = mock.Mock()
|
||||
self.app.client_manager.neutronclient = mock.Mock()
|
||||
self.neutronclient = self.app.client_manager.neutronclient
|
||||
self.neutronclient.find_resource = mock.Mock(
|
||||
side_effect=lambda resource, name_or_id, project_id=None,
|
||||
cmd_resource=None, parent_id=None, fields=None:
|
||||
{'id': name_or_id})
|
||||
self.app.client_manager.network = mock.Mock()
|
||||
self.network = self.app.client_manager.network
|
||||
self.network.find_sfc_flow_classifier = mock.Mock(
|
||||
side_effect=lambda name_or_id, ignore_missing=False:
|
||||
{'id': name_or_id}
|
||||
)
|
||||
self.network.find_sfc_port_chain = mock.Mock(
|
||||
side_effect=lambda name_or_id, ignore_missing=False:
|
||||
{'id': name_or_id}
|
||||
)
|
||||
self.network.find_sfc_port_pair = mock.Mock(
|
||||
side_effect=lambda name_or_id, ignore_missing=False:
|
||||
{'id': name_or_id}
|
||||
)
|
||||
self.network.find_sfc_port_pair_group = mock.Mock(
|
||||
side_effect=lambda name_or_id, ignore_missing=False:
|
||||
{'id': name_or_id}
|
||||
)
|
||||
self.network.find_sfc_service_graph = mock.Mock(
|
||||
side_effect=lambda name_or_id, ignore_missing=False:
|
||||
{'id': name_or_id}
|
||||
)
|
||||
self.network.find_port = mock.Mock(
|
||||
side_effect=lambda name_or_id, ignore_missing=False:
|
||||
{'id': name_or_id}
|
||||
)
|
||||
|
||||
|
||||
class FakeSfcPortPair(object):
|
||||
@ -58,13 +83,14 @@ class FakeSfcPortPair(object):
|
||||
'id': uuidutils.generate_uuid(),
|
||||
'ingress': uuidutils.generate_uuid(),
|
||||
'name': 'port-pair-name',
|
||||
'service_function_parameters': 'correlation=None,weight=1',
|
||||
'service_function_parameters': [('correlation', None),
|
||||
('weight', 1)],
|
||||
'project_id': uuidutils.generate_uuid(),
|
||||
}
|
||||
|
||||
# Overwrite default attributes.
|
||||
port_pair_attrs.update(attrs)
|
||||
return copy.deepcopy(port_pair_attrs)
|
||||
return port_pair.SfcPortPair(**port_pair_attrs)
|
||||
|
||||
@staticmethod
|
||||
def create_port_pairs(attrs=None, count=1):
|
||||
@ -102,18 +128,16 @@ class FakeSfcPortPairGroup(object):
|
||||
# Set default attributes.
|
||||
port_pair_group_attrs = {
|
||||
'id': uuidutils.generate_uuid(),
|
||||
'group_id': uuidutils.generate_uuid(),
|
||||
'name': 'port-pair-group-name',
|
||||
'description': 'description',
|
||||
'port_pairs': uuidutils.generate_uuid(),
|
||||
'port_pair_group_parameters': '{"lb_fields": []}',
|
||||
'port_pair_group_parameters': {"lb_fields": []},
|
||||
'project_id': uuidutils.generate_uuid(),
|
||||
'tap_enabled': False
|
||||
}
|
||||
|
||||
# port_pair_group_attrs default attributes.
|
||||
port_pair_group_attrs.update(attrs)
|
||||
return copy.deepcopy(port_pair_group_attrs)
|
||||
return port_pair_group.SfcPortPairGroup(**port_pair_group_attrs)
|
||||
|
||||
@staticmethod
|
||||
def create_port_pair_groups(attrs=None, count=1):
|
||||
@ -164,10 +188,10 @@ class FakeSfcFlowClassifier(object):
|
||||
'source_port_range_max': '20',
|
||||
'source_port_range_min': '10',
|
||||
'project_id': uuidutils.generate_uuid(),
|
||||
'l7_parameters': '{}'
|
||||
'l7_parameters': {}
|
||||
}
|
||||
flow_classifier_attrs.update(attrs)
|
||||
return copy.deepcopy(flow_classifier_attrs)
|
||||
return flow_classifier.SfcFlowClassifier(**flow_classifier_attrs)
|
||||
|
||||
@staticmethod
|
||||
def create_flow_classifiers(attrs=None, count=1):
|
||||
@ -205,18 +229,16 @@ class FakeSfcPortChain(object):
|
||||
# Set default attributes.
|
||||
port_chain_attrs = {
|
||||
'id': uuidutils.generate_uuid(),
|
||||
'chain_id': uuidutils.generate_uuid(),
|
||||
'name': 'port-chain-name',
|
||||
'description': 'description',
|
||||
'port_pair_groups': uuidutils.generate_uuid(),
|
||||
'flow_classifiers': uuidutils.generate_uuid(),
|
||||
'chain_parameters': '{"correlation": mpls}',
|
||||
'chain_parameters': {"correlation": "mpls", "symmetric": False},
|
||||
'project_id': uuidutils.generate_uuid(),
|
||||
}
|
||||
|
||||
# port_pair_group_attrs default attributes.
|
||||
port_chain_attrs.update(attrs)
|
||||
return copy.deepcopy(port_chain_attrs)
|
||||
return port_chain.SfcPortChain(**port_chain_attrs)
|
||||
|
||||
@staticmethod
|
||||
def create_port_chains(attrs=None, count=1):
|
||||
@ -260,7 +282,7 @@ class FakeSfcServiceGraph(object):
|
||||
}
|
||||
|
||||
service_graph_attrs.update(attrs)
|
||||
return copy.deepcopy(service_graph_attrs)
|
||||
return service_graph.SfcServiceGraph(**service_graph_attrs)
|
||||
|
||||
@staticmethod
|
||||
def create_sfc_service_graphs(attrs=None, count=1):
|
||||
|
@ -15,15 +15,12 @@
|
||||
|
||||
from unittest import mock
|
||||
|
||||
from osc_lib import exceptions
|
||||
import testtools
|
||||
|
||||
from neutronclient.osc.v2.sfc import sfc_flow_classifier
|
||||
from neutronclient.tests.unit.osc.v2.sfc import fakes
|
||||
|
||||
get_id = 'neutronclient.osc.v2.sfc.sfc_flow_classifier._get_id'
|
||||
|
||||
|
||||
def _get_id(client, id_or_name, resource):
|
||||
return id_or_name
|
||||
|
||||
|
||||
class TestCreateSfcFlowClassifier(fakes.TestNeutronClientOSCV2):
|
||||
|
||||
@ -43,7 +40,8 @@ class TestCreateSfcFlowClassifier(fakes.TestNeutronClientOSCV2):
|
||||
'Protocol',
|
||||
'Source IP',
|
||||
'Source Port Range Max',
|
||||
'Source Port Range Min')
|
||||
'Source Port Range Min',
|
||||
'Summary',)
|
||||
|
||||
def get_data(self):
|
||||
return (
|
||||
@ -66,9 +64,8 @@ class TestCreateSfcFlowClassifier(fakes.TestNeutronClientOSCV2):
|
||||
|
||||
def setUp(self):
|
||||
super(TestCreateSfcFlowClassifier, self).setUp()
|
||||
mock.patch(get_id, new=_get_id).start()
|
||||
self.neutronclient.create_sfc_flow_classifier = mock.Mock(
|
||||
return_value={'flow_classifier': self._fc})
|
||||
self.network.create_sfc_flow_classifier = mock.Mock(
|
||||
return_value=self._fc)
|
||||
self.data = self.get_data()
|
||||
|
||||
# Get the command object to test
|
||||
@ -90,14 +87,13 @@ class TestCreateSfcFlowClassifier(fakes.TestNeutronClientOSCV2):
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
columns, data = (self.cmd.take_action(parsed_args))
|
||||
|
||||
self.neutronclient.create_sfc_flow_classifier.assert_called_once_with({
|
||||
'flow_classifier': {
|
||||
'name': self._fc['name'],
|
||||
'logical_source_port': self._fc['logical_source_port'],
|
||||
'ethertype': self._fc['ethertype']}
|
||||
})
|
||||
self.network.create_sfc_flow_classifier.assert_called_once_with(
|
||||
**{'name': self._fc['name'],
|
||||
'logical_source_port': self._fc['logical_source_port'],
|
||||
'ethertype': self._fc['ethertype']
|
||||
}
|
||||
)
|
||||
self.assertEqual(self.columns, columns)
|
||||
self.assertEqual(self.data, data)
|
||||
|
||||
def test_create_flow_classifier(self):
|
||||
arglist = [
|
||||
@ -129,8 +125,8 @@ class TestCreateSfcFlowClassifier(fakes.TestNeutronClientOSCV2):
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
columns, data = (self.cmd.take_action(parsed_args))
|
||||
self.neutronclient.create_sfc_flow_classifier.assert_called_once_with({
|
||||
'flow_classifier': {
|
||||
self.network.create_sfc_flow_classifier.assert_called_once_with(
|
||||
**{
|
||||
'name': self._fc['name'],
|
||||
'description': self._fc['description'],
|
||||
'ethertype': self._fc['ethertype'],
|
||||
@ -142,9 +138,8 @@ class TestCreateSfcFlowClassifier(fakes.TestNeutronClientOSCV2):
|
||||
self._fc['logical_destination_port'],
|
||||
'l7_parameters': param
|
||||
}
|
||||
})
|
||||
)
|
||||
self.assertEqual(self.columns, columns)
|
||||
self.assertEqual(self.data, data)
|
||||
|
||||
|
||||
class TestDeleteSfcFlowClassifier(fakes.TestNeutronClientOSCV2):
|
||||
@ -154,20 +149,19 @@ class TestDeleteSfcFlowClassifier(fakes.TestNeutronClientOSCV2):
|
||||
|
||||
def setUp(self):
|
||||
super(TestDeleteSfcFlowClassifier, self).setUp()
|
||||
mock.patch(get_id, new=_get_id).start()
|
||||
self.neutronclient.delete_sfc_flow_classifier = mock.Mock(
|
||||
self.network.delete_sfc_flow_classifier = mock.Mock(
|
||||
return_value=None)
|
||||
self.cmd = sfc_flow_classifier.DeleteSfcFlowClassifier(self.app,
|
||||
self.namespace)
|
||||
|
||||
def test_delete_flow_classifier(self):
|
||||
client = self.app.client_manager.neutronclient
|
||||
client = self.app.client_manager.network
|
||||
mock_flow_classifier_delete = client.delete_sfc_flow_classifier
|
||||
arglist = [
|
||||
self._flow_classifier[0]['id'],
|
||||
]
|
||||
verifylist = [
|
||||
('flow_classifier', self._flow_classifier[0]['id']),
|
||||
('flow_classifier', [self._flow_classifier[0]['id']]),
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
@ -175,6 +169,22 @@ class TestDeleteSfcFlowClassifier(fakes.TestNeutronClientOSCV2):
|
||||
self._flow_classifier[0]['id'])
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_delete_multiple_flow_classifiers_with_exception(self):
|
||||
client = self.app.client_manager.network
|
||||
target1 = self._flow_classifier[0]['id']
|
||||
arglist = [target1]
|
||||
verifylist = [('flow_classifier', [target1])]
|
||||
|
||||
client.find_sfc_flow_classifier.side_effect = [
|
||||
target1, exceptions.CommandError
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
msg = "1 of 2 flow classifier(s) failed to delete."
|
||||
with testtools.ExpectedException(exceptions.CommandError) as e:
|
||||
self.cmd.take_action(parsed_args)
|
||||
self.assertEqual(msg, str(e))
|
||||
|
||||
|
||||
class TestSetSfcFlowClassifier(fakes.TestNeutronClientOSCV2):
|
||||
_flow_classifier = fakes.FakeSfcFlowClassifier.create_flow_classifier()
|
||||
@ -183,14 +193,13 @@ class TestSetSfcFlowClassifier(fakes.TestNeutronClientOSCV2):
|
||||
|
||||
def setUp(self):
|
||||
super(TestSetSfcFlowClassifier, self).setUp()
|
||||
mock.patch(get_id, new=_get_id).start()
|
||||
self.neutronclient.update_sfc_flow_classifier = mock.Mock(
|
||||
self.network.update_sfc_flow_classifier = mock.Mock(
|
||||
return_value=None)
|
||||
self.cmd = sfc_flow_classifier.SetSfcFlowClassifier(self.app,
|
||||
self.namespace)
|
||||
|
||||
def test_set_flow_classifier(self):
|
||||
client = self.app.client_manager.neutronclient
|
||||
client = self.app.client_manager.network
|
||||
mock_flow_classifier_update = client.update_sfc_flow_classifier
|
||||
arglist = [
|
||||
self._flow_classifier_name,
|
||||
@ -206,11 +215,11 @@ class TestSetSfcFlowClassifier(fakes.TestNeutronClientOSCV2):
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
|
||||
attrs = {'flow_classifier': {
|
||||
attrs = {
|
||||
'name': 'name_updated',
|
||||
'description': 'desc_updated'}}
|
||||
'description': 'desc_updated'}
|
||||
mock_flow_classifier_update.assert_called_once_with(
|
||||
self._flow_classifier_name, attrs)
|
||||
self._flow_classifier_name, **attrs)
|
||||
self.assertIsNone(result)
|
||||
|
||||
|
||||
@ -234,7 +243,7 @@ class TestShowSfcFlowClassifier(fakes.TestNeutronClientOSCV2):
|
||||
_fc['source_port_range_max'],
|
||||
_fc['source_port_range_min']
|
||||
)
|
||||
_flow_classifier = {'flow_classifier': _fc}
|
||||
_flow_classifier = _fc
|
||||
_flow_classifier_id = _fc['id']
|
||||
columns = ('Description',
|
||||
'Destination IP',
|
||||
@ -250,12 +259,12 @@ class TestShowSfcFlowClassifier(fakes.TestNeutronClientOSCV2):
|
||||
'Protocol',
|
||||
'Source IP',
|
||||
'Source Port Range Max',
|
||||
'Source Port Range Min')
|
||||
'Source Port Range Min',
|
||||
'Summary',)
|
||||
|
||||
def setUp(self):
|
||||
super(TestShowSfcFlowClassifier, self).setUp()
|
||||
mock.patch(get_id, new=_get_id).start()
|
||||
self.neutronclient.show_sfc_flow_classifier = mock.Mock(
|
||||
self.network.get_sfc_flow_classifier = mock.Mock(
|
||||
return_value=self._flow_classifier
|
||||
)
|
||||
# Get the command object to test
|
||||
@ -263,8 +272,8 @@ class TestShowSfcFlowClassifier(fakes.TestNeutronClientOSCV2):
|
||||
self.namespace)
|
||||
|
||||
def test_show_flow_classifier(self):
|
||||
client = self.app.client_manager.neutronclient
|
||||
mock_flow_classifier_show = client.show_sfc_flow_classifier
|
||||
client = self.app.client_manager.network
|
||||
mock_flow_classifier_show = client.get_sfc_flow_classifier
|
||||
arglist = [
|
||||
self._flow_classifier_id,
|
||||
]
|
||||
@ -277,7 +286,6 @@ class TestShowSfcFlowClassifier(fakes.TestNeutronClientOSCV2):
|
||||
mock_flow_classifier_show.assert_called_once_with(
|
||||
self._flow_classifier_id)
|
||||
self.assertEqual(self.columns, columns)
|
||||
self.assertEqual(self.data, data)
|
||||
|
||||
|
||||
class TestListSfcFlowClassifier(fakes.TestNeutronClientOSCV2):
|
||||
@ -324,9 +332,8 @@ class TestListSfcFlowClassifier(fakes.TestNeutronClientOSCV2):
|
||||
|
||||
def setUp(self):
|
||||
super(TestListSfcFlowClassifier, self).setUp()
|
||||
mock.patch(get_id, new=_get_id).start()
|
||||
self.neutronclient.list_sfc_flow_classifiers = mock.Mock(
|
||||
return_value={'flow_classifiers': self._fc}
|
||||
self.network.sfc_flow_classifiers = mock.Mock(
|
||||
return_value=self._fc
|
||||
)
|
||||
# Get the command object to test
|
||||
self.cmd = sfc_flow_classifier.ListSfcFlowClassifier(self.app,
|
||||
@ -337,8 +344,7 @@ class TestListSfcFlowClassifier(fakes.TestNeutronClientOSCV2):
|
||||
verifylist = []
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
columns = self.cmd.take_action(parsed_args)
|
||||
fcs = self.neutronclient \
|
||||
.list_sfc_flow_classifiers()['flow_classifiers']
|
||||
fcs = self.network.sfc_flow_classifiers()
|
||||
fc = fcs[0]
|
||||
data = [
|
||||
fc['id'],
|
||||
@ -355,8 +361,7 @@ class TestListSfcFlowClassifier(fakes.TestNeutronClientOSCV2):
|
||||
def test_list_with_long_option(self):
|
||||
arglist = ['--long']
|
||||
verifylist = [('long', True)]
|
||||
fcs = self.neutronclient \
|
||||
.list_sfc_flow_classifiers()['flow_classifiers']
|
||||
fcs = self.network.sfc_flow_classifiers()
|
||||
fc = fcs[0]
|
||||
data = [
|
||||
fc['id'],
|
||||
|
@ -16,6 +16,7 @@
|
||||
from unittest import mock
|
||||
|
||||
from osc_lib import exceptions
|
||||
import testtools
|
||||
|
||||
from neutronclient.osc.v2.sfc import sfc_port_chain
|
||||
from neutronclient.tests.unit.osc.v2.sfc import fakes
|
||||
@ -29,8 +30,7 @@ class TestCreateSfcPortChain(fakes.TestNeutronClientOSCV2):
|
||||
# The new port_chain created
|
||||
_port_chain = fakes.FakeSfcPortChain.create_port_chain()
|
||||
|
||||
columns = ('Chain ID',
|
||||
'Chain Parameters',
|
||||
columns = ('Chain Parameters',
|
||||
'Description',
|
||||
'Flow Classifiers',
|
||||
'ID',
|
||||
@ -40,7 +40,6 @@ class TestCreateSfcPortChain(fakes.TestNeutronClientOSCV2):
|
||||
|
||||
def get_data(self):
|
||||
return (
|
||||
self._port_chain['chain_id'],
|
||||
self._port_chain['chain_parameters'],
|
||||
self._port_chain['description'],
|
||||
self._port_chain['flow_classifiers'],
|
||||
@ -52,11 +51,8 @@ class TestCreateSfcPortChain(fakes.TestNeutronClientOSCV2):
|
||||
|
||||
def setUp(self):
|
||||
super(TestCreateSfcPortChain, self).setUp()
|
||||
mock.patch(
|
||||
'neutronclient.osc.v2.sfc.sfc_port_chain._get_id',
|
||||
new=_get_id).start()
|
||||
self.neutronclient.create_sfc_port_chain = mock.Mock(
|
||||
return_value={'port_chain': self._port_chain})
|
||||
self.network.create_sfc_port_chain = mock.Mock(
|
||||
return_value=self._port_chain)
|
||||
self.data = self.get_data()
|
||||
|
||||
# Get the command object to test
|
||||
@ -77,11 +73,12 @@ class TestCreateSfcPortChain(fakes.TestNeutronClientOSCV2):
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
columns, data = (self.cmd.take_action(parsed_args))
|
||||
|
||||
self.neutronclient.create_sfc_port_chain.assert_called_once_with({
|
||||
'port_chain': {
|
||||
self.network.create_sfc_port_chain.assert_called_once_with(
|
||||
**{
|
||||
'name': self._port_chain['name'],
|
||||
'port_pair_groups': [self._port_chain['port_pair_groups']]}
|
||||
})
|
||||
'port_pair_groups': [self._port_chain['port_pair_groups']]
|
||||
}
|
||||
)
|
||||
self.assertEqual(self.columns, columns)
|
||||
self.assertEqual(self.data, data)
|
||||
|
||||
@ -107,15 +104,15 @@ class TestCreateSfcPortChain(fakes.TestNeutronClientOSCV2):
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
columns, data = (self.cmd.take_action(parsed_args))
|
||||
|
||||
self.neutronclient.create_sfc_port_chain.assert_called_once_with({
|
||||
'port_chain': {
|
||||
self.network.create_sfc_port_chain.assert_called_once_with(
|
||||
**{
|
||||
'name': self._port_chain['name'],
|
||||
'port_pair_groups': [self._port_chain['port_pair_groups']],
|
||||
'description': self._port_chain['description'],
|
||||
'flow_classifiers': [self._port_chain['flow_classifiers']],
|
||||
'chain_parameters': cp
|
||||
}
|
||||
})
|
||||
}
|
||||
)
|
||||
self.assertEqual(self.columns, columns)
|
||||
self.assertEqual(self.data, data)
|
||||
|
||||
@ -126,20 +123,17 @@ class TestDeleteSfcPortChain(fakes.TestNeutronClientOSCV2):
|
||||
|
||||
def setUp(self):
|
||||
super(TestDeleteSfcPortChain, self).setUp()
|
||||
mock.patch(
|
||||
'neutronclient.osc.v2.sfc.sfc_port_chain._get_id',
|
||||
new=_get_id).start()
|
||||
self.neutronclient.delete_sfc_port_chain = mock.Mock(return_value=None)
|
||||
self.network.delete_sfc_port_chain = mock.Mock(return_value=None)
|
||||
self.cmd = sfc_port_chain.DeleteSfcPortChain(self.app, self.namespace)
|
||||
|
||||
def test_delete_port_chain(self):
|
||||
client = self.app.client_manager.neutronclient
|
||||
client = self.app.client_manager.network
|
||||
mock_port_chain_delete = client.delete_sfc_port_chain
|
||||
arglist = [
|
||||
self._port_chain[0]['id'],
|
||||
]
|
||||
verifylist = [
|
||||
('port_chain', self._port_chain[0]['id']),
|
||||
('port_chain', [self._port_chain[0]['id']]),
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
@ -147,13 +141,29 @@ class TestDeleteSfcPortChain(fakes.TestNeutronClientOSCV2):
|
||||
self._port_chain[0]['id'])
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_delete_multiple_port_chains_with_exception(self):
|
||||
client = self.app.client_manager.network
|
||||
target1 = self._port_chain[0]['id']
|
||||
arglist = [target1]
|
||||
verifylist = [('port_chain', [target1])]
|
||||
|
||||
client.find_sfc_port_chain.side_effect = [
|
||||
target1, exceptions.CommandError
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
msg = "1 of 2 port chain(s) failed to delete."
|
||||
with testtools.ExpectedException(exceptions.CommandError) as e:
|
||||
self.cmd.take_action(parsed_args)
|
||||
self.assertEqual(msg, str(e))
|
||||
|
||||
|
||||
class TestListSfcPortChain(fakes.TestNeutronClientOSCV2):
|
||||
_port_chains = fakes.FakeSfcPortChain.create_port_chains(count=1)
|
||||
columns = ('ID', 'Name', 'Port Pair Groups', 'Flow Classifiers',
|
||||
'Chain Parameters', 'Chain ID')
|
||||
'Chain Parameters')
|
||||
columns_long = ('ID', 'Name', 'Port Pair Groups', 'Flow Classifiers',
|
||||
'Chain Parameters', 'Description', 'Chain ID', 'Project')
|
||||
'Chain Parameters', 'Description', 'Project')
|
||||
_port_chain = _port_chains[0]
|
||||
data = [
|
||||
_port_chain['id'],
|
||||
@ -161,13 +171,11 @@ class TestListSfcPortChain(fakes.TestNeutronClientOSCV2):
|
||||
_port_chain['port_pair_groups'],
|
||||
_port_chain['flow_classifiers'],
|
||||
_port_chain['chain_parameters'],
|
||||
_port_chain['chain_id']
|
||||
]
|
||||
data_long = [
|
||||
_port_chain['id'],
|
||||
_port_chain['name'],
|
||||
_port_chain['project_id'],
|
||||
_port_chain['chain_id'],
|
||||
_port_chain['port_pair_groups'],
|
||||
_port_chain['flow_classifiers'],
|
||||
_port_chain['chain_parameters'],
|
||||
@ -178,11 +186,8 @@ class TestListSfcPortChain(fakes.TestNeutronClientOSCV2):
|
||||
|
||||
def setUp(self):
|
||||
super(TestListSfcPortChain, self).setUp()
|
||||
mock.patch(
|
||||
'neutronclient.osc.v2.sfc.sfc_port_chain._get_id',
|
||||
new=_get_id).start()
|
||||
self.neutronclient.list_sfc_port_chains = mock.Mock(
|
||||
return_value={'port_chains': self._port_chains}
|
||||
self.network.sfc_port_chains = mock.Mock(
|
||||
return_value=self._port_chains
|
||||
)
|
||||
# Get the command object to test
|
||||
self.cmd = sfc_port_chain.ListSfcPortChain(self.app, self.namespace)
|
||||
@ -192,7 +197,7 @@ class TestListSfcPortChain(fakes.TestNeutronClientOSCV2):
|
||||
verifylist = []
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
columns = self.cmd.take_action(parsed_args)[0]
|
||||
pcs = self.neutronclient.list_sfc_port_chains()['port_chains']
|
||||
pcs = self.network.sfc_port_chains()
|
||||
pc = pcs[0]
|
||||
data = [
|
||||
pc['id'],
|
||||
@ -200,7 +205,6 @@ class TestListSfcPortChain(fakes.TestNeutronClientOSCV2):
|
||||
pc['port_pair_groups'],
|
||||
pc['flow_classifiers'],
|
||||
pc['chain_parameters'],
|
||||
pc['chain_id']
|
||||
]
|
||||
self.assertEqual(list(self.columns), columns)
|
||||
self.assertEqual(self.data, data)
|
||||
@ -210,13 +214,12 @@ class TestListSfcPortChain(fakes.TestNeutronClientOSCV2):
|
||||
verifylist = [('long', True)]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
columns = self.cmd.take_action(parsed_args)[0]
|
||||
pcs = self.neutronclient.list_sfc_port_chains()['port_chains']
|
||||
pcs = self.network.sfc_port_chains()
|
||||
pc = pcs[0]
|
||||
data = [
|
||||
pc['id'],
|
||||
pc['name'],
|
||||
pc['project_id'],
|
||||
pc['chain_id'],
|
||||
pc['port_pair_groups'],
|
||||
pc['flow_classifiers'],
|
||||
pc['chain_parameters'],
|
||||
@ -237,14 +240,11 @@ class TestSetSfcPortChain(fakes.TestNeutronClientOSCV2):
|
||||
|
||||
def setUp(self):
|
||||
super(TestSetSfcPortChain, self).setUp()
|
||||
mock.patch(
|
||||
'neutronclient.osc.v2.sfc.sfc_port_chain._get_id',
|
||||
new=_get_id).start()
|
||||
self.mocked = self.neutronclient.update_sfc_port_chain
|
||||
self.mocked = self.network.update_sfc_port_chain
|
||||
self.cmd = sfc_port_chain.SetSfcPortChain(self.app, self.namespace)
|
||||
|
||||
def test_set_port_chain(self):
|
||||
client = self.app.client_manager.neutronclient
|
||||
client = self.app.client_manager.network
|
||||
mock_port_chain_update = client.update_sfc_port_chain
|
||||
arglist = [
|
||||
self._port_chain_name,
|
||||
@ -258,10 +258,9 @@ class TestSetSfcPortChain(fakes.TestNeutronClientOSCV2):
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
attrs = {'port_chain': {'name': 'name_updated',
|
||||
'description': 'desc_updated'}}
|
||||
attrs = {'name': 'name_updated', 'description': 'desc_updated'}
|
||||
mock_port_chain_update.assert_called_once_with(self._port_chain_name,
|
||||
attrs)
|
||||
**attrs)
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_set_flow_classifiers(self):
|
||||
@ -269,23 +268,12 @@ class TestSetSfcPortChain(fakes.TestNeutronClientOSCV2):
|
||||
fc1 = 'flow_classifier1'
|
||||
fc2 = 'flow_classifier2'
|
||||
|
||||
def _mock_flow_classifier(*args, **kwargs):
|
||||
if self.neutronclient.find_resource.call_count == 1:
|
||||
self.neutronclient.find_resource.assert_called_with(
|
||||
self.res, target, cmd_resource='sfc_port_chain')
|
||||
return {'flow_classifiers': [self.pc_fc]}
|
||||
|
||||
if self.neutronclient.find_resource.call_count == 2:
|
||||
self.neutronclient.find_resource.assert_called_with(
|
||||
'flow_classifier', fc1, cmd_resource='sfc_flow_classifier')
|
||||
return {'id': args[1]}
|
||||
|
||||
if self.neutronclient.find_resource.call_count == 3:
|
||||
self.neutronclient.find_resource.assert_called_with(
|
||||
'flow_classifier', fc2, cmd_resource='sfc_flow_classifier')
|
||||
return {'id': args[1]}
|
||||
|
||||
self.neutronclient.find_resource.side_effect = _mock_flow_classifier
|
||||
self.network.find_sfc_port_chain = mock.Mock(
|
||||
side_effect=lambda name_or_id, ignore_missing=False:
|
||||
{'id': name_or_id, 'flow_classifiers': [self.pc_fc]}
|
||||
)
|
||||
self.network.find_sfc_flow_classifier.side_effect = \
|
||||
lambda name_or_id, ignore_missing=False: {'id': name_or_id}
|
||||
arglist = [
|
||||
target,
|
||||
'--flow-classifier', fc1,
|
||||
@ -298,12 +286,11 @@ class TestSetSfcPortChain(fakes.TestNeutronClientOSCV2):
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
expect = {'flow_classifiers': [self.pc_fc, fc1, fc2]}
|
||||
self.mocked.assert_called_once_with(target, {self.res: expect})
|
||||
self.assertEqual(3, self.neutronclient.find_resource.call_count)
|
||||
self.mocked.assert_called_once_with(target, **expect)
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_set_no_flow_classifier(self):
|
||||
client = self.app.client_manager.neutronclient
|
||||
client = self.app.client_manager.network
|
||||
mock_port_chain_update = client.update_sfc_port_chain
|
||||
arglist = [
|
||||
self._port_chain_name,
|
||||
@ -315,9 +302,9 @@ class TestSetSfcPortChain(fakes.TestNeutronClientOSCV2):
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
attrs = {'port_chain': {'flow_classifiers': []}}
|
||||
attrs = {'flow_classifiers': []}
|
||||
mock_port_chain_update.assert_called_once_with(self._port_chain_name,
|
||||
attrs)
|
||||
**attrs)
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_set_port_pair_groups(self):
|
||||
@ -326,25 +313,10 @@ class TestSetSfcPortChain(fakes.TestNeutronClientOSCV2):
|
||||
ppg1 = 'port_pair_group1'
|
||||
ppg2 = 'port_pair_group2'
|
||||
|
||||
def _mock_flow_classifier(*args, **kwargs):
|
||||
if self.neutronclient.find_resource.call_count == 1:
|
||||
self.neutronclient.find_resource.assert_called_with(
|
||||
self.res, target, cmd_resource='sfc_port_chain')
|
||||
return {'port_pair_groups': [self.pc_ppg]}
|
||||
|
||||
if self.neutronclient.find_resource.call_count == 2:
|
||||
self.neutronclient.find_resource.assert_called_with(
|
||||
'port_pair_group', ppg1,
|
||||
cmd_resource='sfc_port_pair_group')
|
||||
return {'id': args[1]}
|
||||
|
||||
if self.neutronclient.find_resource.call_count == 3:
|
||||
self.neutronclient.find_resource.assert_called_with(
|
||||
'port_pair_group', ppg2,
|
||||
cmd_resource='sfc_port_pair_group')
|
||||
return {'id': args[1]}
|
||||
|
||||
self.neutronclient.find_resource.side_effect = _mock_flow_classifier
|
||||
self.network.find_sfc_port_chain = mock.Mock(
|
||||
side_effect=lambda name_or_id, ignore_missing=False:
|
||||
{'id': name_or_id, 'port_pair_groups': [self.pc_ppg]}
|
||||
)
|
||||
arglist = [
|
||||
target,
|
||||
'--port-pair-group', ppg1,
|
||||
@ -357,22 +329,13 @@ class TestSetSfcPortChain(fakes.TestNeutronClientOSCV2):
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
expect = {'port_pair_groups': [existing_ppg, ppg1, ppg2]}
|
||||
self.mocked.assert_called_once_with(target, {self.res: expect})
|
||||
self.assertEqual(3, self.neutronclient.find_resource.call_count)
|
||||
self.mocked.assert_called_once_with(target, **expect)
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_set_no_port_pair_group(self):
|
||||
target = self.resource['id']
|
||||
ppg1 = 'port_pair_group1'
|
||||
|
||||
def _mock_port_pair_group(*args, **kwargs):
|
||||
|
||||
if self.neutronclient.find_resource.call_count == 1:
|
||||
self.neutronclient.find_resource.assert_called_with(
|
||||
'port_pair_group', ppg1,
|
||||
cmd_resource='sfc_port_pair_group')
|
||||
return {'id': args[1]}
|
||||
self.neutronclient.find_resource.side_effect = _mock_port_pair_group
|
||||
arglist = [
|
||||
target,
|
||||
'--no-port-pair-group',
|
||||
@ -386,8 +349,7 @@ class TestSetSfcPortChain(fakes.TestNeutronClientOSCV2):
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
expect = {'port_pair_groups': [ppg1]}
|
||||
self.mocked.assert_called_once_with(target, {self.res: expect})
|
||||
self.assertEqual(1, self.neutronclient.find_resource.call_count)
|
||||
self.mocked.assert_called_once_with(target, **expect)
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_set_only_no_port_pair_group(self):
|
||||
@ -409,7 +371,6 @@ class TestShowSfcPortChain(fakes.TestNeutronClientOSCV2):
|
||||
|
||||
_pc = fakes.FakeSfcPortChain.create_port_chain()
|
||||
data = (
|
||||
_pc['chain_id'],
|
||||
_pc['chain_parameters'],
|
||||
_pc['description'],
|
||||
_pc['flow_classifiers'],
|
||||
@ -418,10 +379,9 @@ class TestShowSfcPortChain(fakes.TestNeutronClientOSCV2):
|
||||
_pc['port_pair_groups'],
|
||||
_pc['project_id']
|
||||
)
|
||||
_port_chain = {'port_chain': _pc}
|
||||
_port_chain = _pc
|
||||
_port_chain_id = _pc['id']
|
||||
columns = ('Chain ID',
|
||||
'Chain Parameters',
|
||||
columns = ('Chain Parameters',
|
||||
'Description',
|
||||
'Flow Classifiers',
|
||||
'ID',
|
||||
@ -431,18 +391,15 @@ class TestShowSfcPortChain(fakes.TestNeutronClientOSCV2):
|
||||
|
||||
def setUp(self):
|
||||
super(TestShowSfcPortChain, self).setUp()
|
||||
mock.patch(
|
||||
'neutronclient.osc.v2.sfc.sfc_port_chain._get_id',
|
||||
new=_get_id).start()
|
||||
self.neutronclient.show_sfc_port_chain = mock.Mock(
|
||||
self.network.get_sfc_port_chain = mock.Mock(
|
||||
return_value=self._port_chain
|
||||
)
|
||||
# Get the command object to test
|
||||
self.cmd = sfc_port_chain.ShowSfcPortChain(self.app, self.namespace)
|
||||
|
||||
def test_show_port_chain(self):
|
||||
client = self.app.client_manager.neutronclient
|
||||
mock_port_chain_show = client.show_sfc_port_chain
|
||||
client = self.app.client_manager.network
|
||||
mock_port_chain_show = client.get_sfc_port_chain
|
||||
arglist = [
|
||||
self._port_chain_id,
|
||||
]
|
||||
@ -468,34 +425,21 @@ class TestUnsetSfcPortChain(fakes.TestNeutronClientOSCV2):
|
||||
|
||||
def setUp(self):
|
||||
super(TestUnsetSfcPortChain, self).setUp()
|
||||
mock.patch(
|
||||
'neutronclient.osc.v2.sfc.sfc_port_chain._get_id',
|
||||
new=_get_id).start()
|
||||
self.neutronclient.update_sfc_port_chain = mock.Mock(
|
||||
self.network.update_sfc_port_chain = mock.Mock(
|
||||
return_value=None)
|
||||
self.mocked = self.neutronclient.update_sfc_port_chain
|
||||
self.mocked = self.network.update_sfc_port_chain
|
||||
self.cmd = sfc_port_chain.UnsetSfcPortChain(self.app, self.namespace)
|
||||
|
||||
def test_unset_port_pair_group(self):
|
||||
target = self.resource['id']
|
||||
ppg1 = 'port_pair_group1'
|
||||
|
||||
def _mock_port_pair_group(*args, **kwargs):
|
||||
if self.neutronclient.find_resource.call_count == 1:
|
||||
self.neutronclient.find_resource.assert_called_with(
|
||||
self.res, target, cmd_resource='sfc_port_chain')
|
||||
return {'port_pair_groups': [self.pc_ppg]}
|
||||
|
||||
if self.neutronclient.find_resource.call_count == 2:
|
||||
self.neutronclient.find_resource.assert_called_with(
|
||||
'port_pair_group', ppg1,
|
||||
cmd_resource='sfc_port_pair_group')
|
||||
return {'id': args[1]}
|
||||
if self.neutronclient.find_resource.call_count == 3:
|
||||
self.neutronclient.find_resource.assert_called_with(
|
||||
self.res, target, cmd_resource='sfc_port_chain')
|
||||
return {'id': args[1]}
|
||||
self.neutronclient.find_resource.side_effect = _mock_port_pair_group
|
||||
self.network.find_sfc_port_chain = mock.Mock(
|
||||
side_effect=lambda name_or_id, ignore_missing=False:
|
||||
{'id': name_or_id, 'port_pair_groups': [self.pc_ppg]}
|
||||
)
|
||||
self.network.find_sfc_port_pair_group.side_effect = \
|
||||
lambda name_or_id, ignore_missing=False: {'id': name_or_id}
|
||||
|
||||
arglist = [
|
||||
target,
|
||||
@ -508,24 +452,18 @@ class TestUnsetSfcPortChain(fakes.TestNeutronClientOSCV2):
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
expect = {'port_pair_groups': [self.pc_ppg]}
|
||||
self.mocked.assert_called_once_with(target, {self.res: expect})
|
||||
self.mocked.assert_called_once_with(target, **expect)
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_unset_flow_classifier(self):
|
||||
target = self.resource['id']
|
||||
fc1 = 'flow_classifier1'
|
||||
|
||||
def _mock_flow_classifier(*args, **kwargs):
|
||||
if self.neutronclient.find_resource.call_count == 1:
|
||||
self.neutronclient.find_resource.assert_called_with(
|
||||
self.res, target, cmd_resource='sfc_port_chain')
|
||||
return {'flow_classifiers': [self.pc_fc]}
|
||||
|
||||
if self.neutronclient.find_resource.call_count == 2:
|
||||
self.neutronclient.find_resource.assert_called_with(
|
||||
'flow_classifier', fc1, cmd_resource='sfc_flow_classifier')
|
||||
return {'id': args[1]}
|
||||
self.neutronclient.find_resource.side_effect = _mock_flow_classifier
|
||||
self.network.find_sfc_port_chain = mock.Mock(
|
||||
side_effect=lambda name_or_id, ignore_missing=False:
|
||||
{'id': name_or_id, 'flow_classifiers': [self.pc_fc]}
|
||||
)
|
||||
self.network.find_sfc_flow_classifier.side_effect = \
|
||||
lambda name_or_id, ignore_missing=False: {'id': name_or_id}
|
||||
|
||||
arglist = [
|
||||
target,
|
||||
@ -538,11 +476,11 @@ class TestUnsetSfcPortChain(fakes.TestNeutronClientOSCV2):
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
expect = {'flow_classifiers': [self.pc_fc]}
|
||||
self.mocked.assert_called_once_with(target, {self.res: expect})
|
||||
self.mocked.assert_called_once_with(target, **expect)
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_unset_all_flow_classifier(self):
|
||||
client = self.app.client_manager.neutronclient
|
||||
client = self.app.client_manager.network
|
||||
target = self.resource['id']
|
||||
mock_port_chain_update = client.update_sfc_port_chain
|
||||
arglist = [
|
||||
@ -557,5 +495,5 @@ class TestUnsetSfcPortChain(fakes.TestNeutronClientOSCV2):
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
expect = {'flow_classifiers': []}
|
||||
mock_port_chain_update.assert_called_once_with(target,
|
||||
{self.res: expect})
|
||||
**expect)
|
||||
self.assertIsNone(result)
|
||||
|
@ -15,14 +15,13 @@
|
||||
|
||||
from unittest import mock
|
||||
|
||||
from osc_lib import exceptions
|
||||
import testtools
|
||||
|
||||
from neutronclient.osc.v2.sfc import sfc_port_pair
|
||||
from neutronclient.tests.unit.osc.v2.sfc import fakes
|
||||
|
||||
|
||||
def _get_id(client, id_or_name, resource):
|
||||
return id_or_name
|
||||
|
||||
|
||||
class TestCreateSfcPortPair(fakes.TestNeutronClientOSCV2):
|
||||
# The new port_pair created
|
||||
_port_pair = fakes.FakeSfcPortPair.create_port_pair()
|
||||
@ -48,10 +47,8 @@ class TestCreateSfcPortPair(fakes.TestNeutronClientOSCV2):
|
||||
|
||||
def setUp(self):
|
||||
super(TestCreateSfcPortPair, self).setUp()
|
||||
mock.patch('neutronclient.osc.v2.sfc.sfc_port_pair._get_id',
|
||||
new=_get_id).start()
|
||||
self.neutronclient.create_sfc_port_pair = mock.Mock(
|
||||
return_value={'port_pair': self._port_pair})
|
||||
self.network.create_sfc_port_pair = mock.Mock(
|
||||
return_value=self._port_pair)
|
||||
self.data = self.get_data()
|
||||
|
||||
# Get the command object to test
|
||||
@ -71,12 +68,11 @@ class TestCreateSfcPortPair(fakes.TestNeutronClientOSCV2):
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
columns, data = (self.cmd.take_action(parsed_args))
|
||||
|
||||
self.neutronclient.create_sfc_port_pair.assert_called_once_with({
|
||||
'port_pair': {'name': self._port_pair['name'],
|
||||
'ingress': self._port_pair['ingress'],
|
||||
'egress': self._port_pair['egress'],
|
||||
}
|
||||
})
|
||||
self.network.create_sfc_port_pair.assert_called_once_with(
|
||||
**{'name': self._port_pair['name'],
|
||||
'ingress': self._port_pair['ingress'],
|
||||
'egress': self._port_pair['egress']}
|
||||
)
|
||||
self.assertEqual(self.columns, columns)
|
||||
self.assertEqual(self.data, data)
|
||||
|
||||
@ -106,16 +102,14 @@ class TestCreateSfcPortPair(fakes.TestNeutronClientOSCV2):
|
||||
correlation_param = None
|
||||
else:
|
||||
correlation_param = correlation
|
||||
self.neutronclient.create_sfc_port_pair.assert_called_once_with({
|
||||
'port_pair': {'name': self._port_pair['name'],
|
||||
'ingress': self._port_pair['ingress'],
|
||||
'egress': self._port_pair['egress'],
|
||||
'description': self._port_pair['description'],
|
||||
'service_function_parameters':
|
||||
{'correlation': correlation_param, 'weight':
|
||||
'1'},
|
||||
}
|
||||
})
|
||||
self.network.create_sfc_port_pair.assert_called_once_with(
|
||||
**{'name': self._port_pair['name'],
|
||||
'ingress': self._port_pair['ingress'],
|
||||
'egress': self._port_pair['egress'],
|
||||
'description': self._port_pair['description'],
|
||||
'service_function_parameters':
|
||||
{'correlation': correlation_param, 'weight': '1'}}
|
||||
)
|
||||
self.assertEqual(self.columns, columns)
|
||||
self.assertEqual(self.data, data)
|
||||
|
||||
@ -132,19 +126,17 @@ class TestDeleteSfcPortPair(fakes.TestNeutronClientOSCV2):
|
||||
|
||||
def setUp(self):
|
||||
super(TestDeleteSfcPortPair, self).setUp()
|
||||
mock.patch('neutronclient.osc.v2.sfc.sfc_port_pair._get_id',
|
||||
new=_get_id).start()
|
||||
self.neutronclient.delete_sfc_port_pair = mock.Mock(return_value=None)
|
||||
self.network.delete_sfc_port_pair = mock.Mock(return_value=None)
|
||||
self.cmd = sfc_port_pair.DeleteSfcPortPair(self.app, self.namespace)
|
||||
|
||||
def test_delete_port_pair(self):
|
||||
client = self.app.client_manager.neutronclient
|
||||
client = self.app.client_manager.network
|
||||
mock_port_pair_delete = client.delete_sfc_port_pair
|
||||
arglist = [
|
||||
self._port_pair[0]['id'],
|
||||
]
|
||||
verifylist = [
|
||||
('port_pair', self._port_pair[0]['id']),
|
||||
('port_pair', [self._port_pair[0]['id']]),
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
@ -152,6 +144,22 @@ class TestDeleteSfcPortPair(fakes.TestNeutronClientOSCV2):
|
||||
self._port_pair[0]['id'])
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_delete_multiple_port_pairs_with_exception(self):
|
||||
client = self.app.client_manager.network
|
||||
target1 = self._port_pair[0]['id']
|
||||
arglist = [target1]
|
||||
verifylist = [('port_pair', [target1])]
|
||||
|
||||
client.find_sfc_port_pair.side_effect = [
|
||||
target1, exceptions.CommandError
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
msg = "1 of 2 port pair(s) failed to delete."
|
||||
with testtools.ExpectedException(exceptions.CommandError) as e:
|
||||
self.cmd.take_action(parsed_args)
|
||||
self.assertEqual(msg, str(e))
|
||||
|
||||
|
||||
class TestListSfcPortPair(fakes.TestNeutronClientOSCV2):
|
||||
_port_pairs = fakes.FakeSfcPortPair.create_port_pairs()
|
||||
@ -179,11 +187,8 @@ class TestListSfcPortPair(fakes.TestNeutronClientOSCV2):
|
||||
|
||||
def setUp(self):
|
||||
super(TestListSfcPortPair, self).setUp()
|
||||
mock.patch('neutronclient.osc.v2.sfc.sfc_port_pair._get_id',
|
||||
new=_get_id).start()
|
||||
self.neutronclient.list_sfc_port_pairs = mock.Mock(
|
||||
return_value={'port_pairs': self._port_pairs}
|
||||
)
|
||||
self.network.sfc_port_pairs = mock.Mock(
|
||||
return_value=self._port_pairs)
|
||||
# Get the command object to test
|
||||
self.cmd = sfc_port_pair.ListSfcPortPair(self.app, self.namespace)
|
||||
|
||||
@ -192,7 +197,7 @@ class TestListSfcPortPair(fakes.TestNeutronClientOSCV2):
|
||||
verifylist = []
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
columns = self.cmd.take_action(parsed_args)[0]
|
||||
port_pairs = self.neutronclient.list_sfc_port_pairs()['port_pairs']
|
||||
port_pairs = self.network.sfc_port_pairs()
|
||||
port_pair = port_pairs[0]
|
||||
data = [
|
||||
port_pair['id'],
|
||||
@ -206,7 +211,7 @@ class TestListSfcPortPair(fakes.TestNeutronClientOSCV2):
|
||||
def test_list_with_long_option(self):
|
||||
arglist = ['--long']
|
||||
verifylist = [('long', True)]
|
||||
port_pairs = self.neutronclient.list_sfc_port_pairs()['port_pairs']
|
||||
port_pairs = self.network.sfc_port_pairs()
|
||||
port_pair = port_pairs[0]
|
||||
data = [
|
||||
port_pair['id'],
|
||||
@ -229,13 +234,11 @@ class TestSetSfcPortPair(fakes.TestNeutronClientOSCV2):
|
||||
|
||||
def setUp(self):
|
||||
super(TestSetSfcPortPair, self).setUp()
|
||||
mock.patch('neutronclient.osc.v2.sfc.sfc_port_pair._get_id',
|
||||
new=_get_id).start()
|
||||
self.neutronclient.update_sfc_port_pair = mock.Mock(return_value=None)
|
||||
self.network.update_sfc_port_pair = mock.Mock(return_value=None)
|
||||
self.cmd = sfc_port_pair.SetSfcPortPair(self.app, self.namespace)
|
||||
|
||||
def test_set_port_pair(self):
|
||||
client = self.app.client_manager.neutronclient
|
||||
client = self.app.client_manager.network
|
||||
mock_port_pair_update = client.update_sfc_port_pair
|
||||
arglist = [
|
||||
self._port_pair_name,
|
||||
@ -249,12 +252,12 @@ class TestSetSfcPortPair(fakes.TestNeutronClientOSCV2):
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
attrs = {'port_pair': {
|
||||
attrs = {
|
||||
'name': 'name_updated',
|
||||
'description': 'desc_updated'}
|
||||
'description': 'desc_updated'
|
||||
}
|
||||
mock_port_pair_update.assert_called_once_with(self._port_pair_name,
|
||||
attrs)
|
||||
**attrs)
|
||||
self.assertIsNone(result)
|
||||
|
||||
|
||||
@ -271,7 +274,7 @@ class TestShowSfcPortPair(fakes.TestNeutronClientOSCV2):
|
||||
_pp['project_id'],
|
||||
_pp['service_function_parameters'],
|
||||
)
|
||||
_port_pair = {'port_pair': _pp}
|
||||
_port_pair = _pp
|
||||
_port_pair_id = _pp['id']
|
||||
columns = (
|
||||
'Description',
|
||||
@ -285,10 +288,8 @@ class TestShowSfcPortPair(fakes.TestNeutronClientOSCV2):
|
||||
|
||||
def setUp(self):
|
||||
super(TestShowSfcPortPair, self).setUp()
|
||||
mock.patch('neutronclient.osc.v2.sfc.sfc_port_pair._get_id',
|
||||
new=_get_id).start()
|
||||
|
||||
self.neutronclient.show_sfc_port_pair = mock.Mock(
|
||||
self.network.get_sfc_port_pair = mock.Mock(
|
||||
return_value=self._port_pair
|
||||
)
|
||||
|
||||
@ -296,8 +297,8 @@ class TestShowSfcPortPair(fakes.TestNeutronClientOSCV2):
|
||||
self.cmd = sfc_port_pair.ShowSfcPortPair(self.app, self.namespace)
|
||||
|
||||
def test_show_port_pair(self):
|
||||
client = self.app.client_manager.neutronclient
|
||||
mock_port_pair_show = client.show_sfc_port_pair
|
||||
client = self.app.client_manager.network
|
||||
mock_port_pair_show = client.get_sfc_port_pair
|
||||
arglist = [
|
||||
self._port_pair_id,
|
||||
]
|
||||
|
@ -15,21 +15,19 @@
|
||||
|
||||
from unittest import mock
|
||||
|
||||
from osc_lib import exceptions
|
||||
import testtools
|
||||
|
||||
from neutronclient.osc.v2.sfc import sfc_port_pair_group
|
||||
from neutronclient.tests.unit.osc.v2.sfc import fakes
|
||||
|
||||
|
||||
def _get_id(client, id_or_name, resource):
|
||||
return id_or_name
|
||||
|
||||
|
||||
class TestCreateSfcPortPairGroup(fakes.TestNeutronClientOSCV2):
|
||||
|
||||
_port_pair_group = fakes.FakeSfcPortPairGroup.create_port_pair_group()
|
||||
|
||||
columns = ('Description',
|
||||
'ID',
|
||||
'Loadbalance ID',
|
||||
'Name',
|
||||
'Port Pair',
|
||||
'Port Pair Group Parameters',
|
||||
@ -40,7 +38,6 @@ class TestCreateSfcPortPairGroup(fakes.TestNeutronClientOSCV2):
|
||||
return (
|
||||
ppg['description'],
|
||||
ppg['id'],
|
||||
ppg['group_id'],
|
||||
ppg['name'],
|
||||
ppg['port_pairs'],
|
||||
ppg['port_pair_group_parameters'],
|
||||
@ -50,12 +47,10 @@ class TestCreateSfcPortPairGroup(fakes.TestNeutronClientOSCV2):
|
||||
|
||||
def setUp(self):
|
||||
super(TestCreateSfcPortPairGroup, self).setUp()
|
||||
mock.patch(
|
||||
'neutronclient.osc.v2.sfc.sfc_port_pair_group._get_id',
|
||||
new=_get_id).start()
|
||||
self.neutronclient.create_sfc_port_pair_group = mock.Mock(
|
||||
return_value={'port_pair_group': self._port_pair_group})
|
||||
self.network.create_sfc_port_pair_group = mock.Mock(
|
||||
return_value=self._port_pair_group)
|
||||
self.data = self.get_data(self._port_pair_group)
|
||||
|
||||
# Get the command object to test
|
||||
self.cmd = sfc_port_pair_group.CreateSfcPortPairGroup(self.app,
|
||||
self.namespace)
|
||||
@ -72,12 +67,9 @@ class TestCreateSfcPortPairGroup(fakes.TestNeutronClientOSCV2):
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
columns, data = (self.cmd.take_action(parsed_args))
|
||||
self.neutronclient.create_sfc_port_pair_group.assert_called_once_with({
|
||||
'port_pair_group': {
|
||||
'name': self._port_pair_group['name'],
|
||||
'port_pairs': [self._port_pair_group['port_pairs']]
|
||||
}
|
||||
})
|
||||
self.network.create_sfc_port_pair_group.assert_called_once_with(
|
||||
**{'name': self._port_pair_group['name'],
|
||||
'port_pairs': [self._port_pair_group['port_pairs']]})
|
||||
self.assertEqual(self.columns, columns)
|
||||
self.assertEqual(self.data, data)
|
||||
|
||||
@ -95,13 +87,11 @@ class TestCreateSfcPortPairGroup(fakes.TestNeutronClientOSCV2):
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
columns, data = (self.cmd.take_action(parsed_args))
|
||||
|
||||
self.neutronclient.create_sfc_port_pair_group.assert_called_once_with({
|
||||
'port_pair_group': {
|
||||
self.network.create_sfc_port_pair_group.assert_called_once_with(
|
||||
**{
|
||||
'name': self._port_pair_group['name'],
|
||||
'port_pairs': [self._port_pair_group['port_pairs']],
|
||||
'description': self._port_pair_group['description'],
|
||||
}
|
||||
})
|
||||
'description': self._port_pair_group['description']})
|
||||
self.assertEqual(self.columns, columns)
|
||||
self.assertEqual(self.data, data)
|
||||
|
||||
@ -127,22 +117,19 @@ class TestCreateSfcPortPairGroup(fakes.TestNeutronClientOSCV2):
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
columns, data = (self.cmd.take_action(parsed_args))
|
||||
|
||||
self.neutronclient.create_sfc_port_pair_group.assert_called_once_with({
|
||||
'port_pair_group': {
|
||||
self.network.create_sfc_port_pair_group.assert_called_once_with(
|
||||
**{
|
||||
'name': self._port_pair_group['name'],
|
||||
'port_pairs': [self._port_pair_group['port_pairs']],
|
||||
'description': self._port_pair_group['description'],
|
||||
'tap_enabled': True
|
||||
}
|
||||
})
|
||||
'tap_enabled': True})
|
||||
self.assertEqual(self.columns, columns)
|
||||
self.assertEqual(expected_data, data)
|
||||
|
||||
def _update_expected_response_data(self, data):
|
||||
# REVISIT(vks1) - This method can be common for other test functions.
|
||||
ppg = fakes.FakeSfcPortPairGroup.create_port_pair_group(data)
|
||||
self.neutronclient.create_sfc_port_pair_group.return_value = {
|
||||
'port_pair_group': ppg}
|
||||
self.network.create_sfc_port_pair_group.return_value = ppg
|
||||
return self.get_data(ppg)
|
||||
|
||||
|
||||
@ -153,22 +140,19 @@ class TestDeleteSfcPortPairGroup(fakes.TestNeutronClientOSCV2):
|
||||
|
||||
def setUp(self):
|
||||
super(TestDeleteSfcPortPairGroup, self).setUp()
|
||||
mock.patch(
|
||||
'neutronclient.osc.v2.sfc.sfc_port_pair_group._get_id',
|
||||
new=_get_id).start()
|
||||
self.neutronclient.delete_sfc_port_pair_group = mock.Mock(
|
||||
self.network.delete_sfc_port_pair_group = mock.Mock(
|
||||
return_value=None)
|
||||
self.cmd = sfc_port_pair_group.DeleteSfcPortPairGroup(self.app,
|
||||
self.namespace)
|
||||
|
||||
def test_delete_port_pair_group(self):
|
||||
client = self.app.client_manager.neutronclient
|
||||
client = self.app.client_manager.network
|
||||
mock_port_pair_group_delete = client.delete_sfc_port_pair_group
|
||||
arglist = [
|
||||
self._port_pair_group[0]['id'],
|
||||
]
|
||||
verifylist = [
|
||||
('port_pair_group', self._port_pair_group[0]['id']),
|
||||
('port_pair_group', [self._port_pair_group[0]['id']]),
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
@ -176,13 +160,29 @@ class TestDeleteSfcPortPairGroup(fakes.TestNeutronClientOSCV2):
|
||||
self._port_pair_group[0]['id'])
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_delete_multiple_port_pair_groups_with_exception(self):
|
||||
client = self.app.client_manager.network
|
||||
target1 = 'target'
|
||||
arglist = [target1]
|
||||
verifylist = [('port_pair_group', [target1])]
|
||||
|
||||
client.find_sfc_port_pair_group.side_effect = [
|
||||
target1, exceptions.CommandError
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
msg = "1 of 2 port pair group(s) failed to delete."
|
||||
with testtools.ExpectedException(exceptions.CommandError) as e:
|
||||
self.cmd.take_action(parsed_args)
|
||||
self.assertEqual(msg, str(e))
|
||||
|
||||
|
||||
class TestListSfcPortPairGroup(fakes.TestNeutronClientOSCV2):
|
||||
_ppgs = fakes.FakeSfcPortPairGroup.create_port_pair_groups(count=1)
|
||||
columns = ('ID', 'Name', 'Port Pair', 'Port Pair Group Parameters',
|
||||
'Tap Enabled')
|
||||
columns_long = ('ID', 'Name', 'Port Pair', 'Port Pair Group Parameters',
|
||||
'Description', 'Loadbalance ID', 'Project', 'Tap Enabled')
|
||||
'Description', 'Project', 'Tap Enabled')
|
||||
_port_pair_group = _ppgs[0]
|
||||
data = [
|
||||
_port_pair_group['id'],
|
||||
@ -204,12 +204,9 @@ class TestListSfcPortPairGroup(fakes.TestNeutronClientOSCV2):
|
||||
|
||||
def setUp(self):
|
||||
super(TestListSfcPortPairGroup, self).setUp()
|
||||
mock.patch(
|
||||
'neutronclient.osc.v2.sfc.sfc_port_pair_group._get_id',
|
||||
new=_get_id).start()
|
||||
|
||||
self.neutronclient.list_sfc_port_pair_groups = mock.Mock(
|
||||
return_value={'port_pair_groups': self._ppgs}
|
||||
self.network.sfc_port_pair_groups = mock.Mock(
|
||||
return_value=self._ppgs
|
||||
)
|
||||
# Get the command object to test
|
||||
self.cmd = sfc_port_pair_group.ListSfcPortPairGroup(self.app,
|
||||
@ -220,8 +217,7 @@ class TestListSfcPortPairGroup(fakes.TestNeutronClientOSCV2):
|
||||
verifylist = []
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
columns = self.cmd.take_action(parsed_args)[0]
|
||||
ppgs = self.neutronclient \
|
||||
.list_sfc_port_pair_groups()['port_pair_groups']
|
||||
ppgs = self.network.sfc_port_pair_groups()
|
||||
ppg = ppgs[0]
|
||||
data = [
|
||||
ppg['id'],
|
||||
@ -236,8 +232,7 @@ class TestListSfcPortPairGroup(fakes.TestNeutronClientOSCV2):
|
||||
def test_list_with_long_option(self):
|
||||
arglist = ['--long']
|
||||
verifylist = [('long', True)]
|
||||
ppgs = self.neutronclient \
|
||||
.list_sfc_port_pair_groups()['port_pair_groups']
|
||||
ppgs = self.network.sfc_port_pair_groups()
|
||||
ppg = ppgs[0]
|
||||
data = [
|
||||
ppg['id'],
|
||||
@ -264,12 +259,9 @@ class TestSetSfcPortPairGroup(fakes.TestNeutronClientOSCV2):
|
||||
def setUp(self):
|
||||
super(TestSetSfcPortPairGroup, self).setUp()
|
||||
|
||||
mock.patch(
|
||||
'neutronclient.osc.v2.sfc.sfc_port_pair_group._get_id',
|
||||
new=_get_id).start()
|
||||
self.neutronclient.update_sfc_port_pair_group = mock.Mock(
|
||||
self.network.update_sfc_port_pair_group = mock.Mock(
|
||||
return_value=None)
|
||||
self.mocked = self.neutronclient.update_sfc_port_pair_group
|
||||
self.mocked = self.network.update_sfc_port_pair_group
|
||||
self.cmd = sfc_port_pair_group.SetSfcPortPairGroup(self.app,
|
||||
self.namespace)
|
||||
|
||||
@ -278,24 +270,14 @@ class TestSetSfcPortPairGroup(fakes.TestNeutronClientOSCV2):
|
||||
port_pair1 = 'additional_port1'
|
||||
port_pair2 = 'additional_port2'
|
||||
|
||||
def _mock_port_pair_group(*args, **kwargs):
|
||||
|
||||
if self.neutronclient.find_resource.call_count == 1:
|
||||
self.neutronclient.find_resource.assert_called_with(
|
||||
'port_pair', port_pair1, cmd_resource='sfc_port_pair')
|
||||
return {'id': args[1]}
|
||||
|
||||
if self.neutronclient.find_resource.call_count == 2:
|
||||
self.neutronclient.find_resource.assert_called_with(
|
||||
'port_pair', port_pair2, cmd_resource='sfc_port_pair')
|
||||
return {'id': args[1]}
|
||||
|
||||
if self.neutronclient.find_resource.call_count == 3:
|
||||
self.neutronclient.find_resource.assert_called_with(
|
||||
self.res, target, cmd_resource='sfc_port_pair_group')
|
||||
return {'port_pairs': [self.ppg_pp]}
|
||||
|
||||
self.neutronclient.find_resource.side_effect = _mock_port_pair_group
|
||||
self.network.find_sfc_port_pair = mock.Mock(
|
||||
side_effect=lambda name_or_id, ignore_missing=False:
|
||||
{'id': name_or_id}
|
||||
)
|
||||
self.network.find_sfc_port_pair_group = mock.Mock(
|
||||
side_effect=lambda name_or_id, ignore_missing=False:
|
||||
{'id': name_or_id, 'port_pairs': self.ppg_pp}
|
||||
)
|
||||
|
||||
arglist = [
|
||||
target,
|
||||
@ -308,13 +290,12 @@ class TestSetSfcPortPairGroup(fakes.TestNeutronClientOSCV2):
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
expect = {'port_pairs': sorted([self.ppg_pp, port_pair1, port_pair2])}
|
||||
self.mocked.assert_called_once_with(target, {self.res: expect})
|
||||
self.assertEqual(3, self.neutronclient.find_resource.call_count)
|
||||
expect = {'port_pairs': sorted([*self.ppg_pp, port_pair1, port_pair2])}
|
||||
self.mocked.assert_called_once_with(target, **expect)
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_set_no_port_pair(self):
|
||||
client = self.app.client_manager.neutronclient
|
||||
client = self.app.client_manager.network
|
||||
mock_port_pair_group_update = client.update_sfc_port_pair_group
|
||||
arglist = [
|
||||
self._port_pair_group_name,
|
||||
@ -331,11 +312,10 @@ class TestSetSfcPortPairGroup(fakes.TestNeutronClientOSCV2):
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
|
||||
attrs = {'port_pair_group': {'name': 'name_updated',
|
||||
'description': 'desc_updated',
|
||||
'port_pairs': []}}
|
||||
attrs = {'name': 'name_updated', 'description': 'desc_updated',
|
||||
'port_pairs': []}
|
||||
mock_port_pair_group_update.assert_called_once_with(
|
||||
self._port_pair_group_name, attrs)
|
||||
self._port_pair_group_name, **attrs)
|
||||
self.assertIsNone(result)
|
||||
|
||||
|
||||
@ -345,18 +325,16 @@ class TestShowSfcPortPairGroup(fakes.TestNeutronClientOSCV2):
|
||||
data = (
|
||||
_ppg['description'],
|
||||
_ppg['id'],
|
||||
_ppg['group_id'],
|
||||
_ppg['name'],
|
||||
_ppg['port_pairs'],
|
||||
_ppg['port_pair_group_parameters'],
|
||||
_ppg['project_id'],
|
||||
_ppg['tap_enabled'])
|
||||
_port_pair_group = {'port_pair_group': _ppg}
|
||||
_port_pair_group = _ppg
|
||||
_port_pair_group_id = _ppg['id']
|
||||
columns = (
|
||||
'Description',
|
||||
'ID',
|
||||
'Loadbalance ID',
|
||||
'Name',
|
||||
'Port Pair',
|
||||
'Port Pair Group Parameters',
|
||||
@ -366,19 +344,16 @@ class TestShowSfcPortPairGroup(fakes.TestNeutronClientOSCV2):
|
||||
|
||||
def setUp(self):
|
||||
super(TestShowSfcPortPairGroup, self).setUp()
|
||||
mock.patch(
|
||||
'neutronclient.osc.v2.sfc.sfc_port_pair_group._get_id',
|
||||
new=_get_id).start()
|
||||
|
||||
self.neutronclient.show_sfc_port_pair_group = mock.Mock(
|
||||
self.network.get_sfc_port_pair_group = mock.Mock(
|
||||
return_value=self._port_pair_group
|
||||
)
|
||||
self.cmd = sfc_port_pair_group.ShowSfcPortPairGroup(self.app,
|
||||
self.namespace)
|
||||
|
||||
def test_show_port_pair_group(self):
|
||||
client = self.app.client_manager.neutronclient
|
||||
mock_port_pair_group_show = client.show_sfc_port_pair_group
|
||||
client = self.app.client_manager.network
|
||||
mock_port_pair_group_show = client.get_sfc_port_pair_group
|
||||
arglist = [
|
||||
self._port_pair_group_id,
|
||||
]
|
||||
@ -404,12 +379,9 @@ class TestUnsetSfcPortPairGroup(fakes.TestNeutronClientOSCV2):
|
||||
|
||||
def setUp(self):
|
||||
super(TestUnsetSfcPortPairGroup, self).setUp()
|
||||
mock.patch(
|
||||
'neutronclient.osc.v2.sfc.sfc_port_pair_group._get_id',
|
||||
new=_get_id).start()
|
||||
self.neutronclient.update_sfc_port_pair_group = mock.Mock(
|
||||
self.network.update_sfc_port_pair_group = mock.Mock(
|
||||
return_value=None)
|
||||
self.mocked = self.neutronclient.update_sfc_port_pair_group
|
||||
self.mocked = self.network.update_sfc_port_pair_group
|
||||
self.cmd = sfc_port_pair_group.UnsetSfcPortPairGroup(
|
||||
self.app, self.namespace)
|
||||
|
||||
@ -418,30 +390,14 @@ class TestUnsetSfcPortPairGroup(fakes.TestNeutronClientOSCV2):
|
||||
port_pair1 = 'additional_port1'
|
||||
port_pair2 = 'additional_port2'
|
||||
|
||||
def _mock_port_pair(*args, **kwargs):
|
||||
|
||||
if self.neutronclient.find_resource.call_count == 1:
|
||||
self.neutronclient.find_resource.assert_called_with(
|
||||
self.res, target, cmd_resource='sfc_port_pair_group')
|
||||
return {'port_pairs': [self.ppg_pp]}
|
||||
|
||||
if self.neutronclient.find_resource.call_count == 2:
|
||||
self.neutronclient.find_resource.assert_called_with(
|
||||
'port_pair', port_pair1, cmd_resource='sfc_port_pair')
|
||||
return {'id': args[1]}
|
||||
|
||||
if self.neutronclient.find_resource.call_count == 3:
|
||||
self.neutronclient.find_resource.assert_called_with(
|
||||
'port_pair', port_pair2, cmd_resource='sfc_port_pair')
|
||||
return {'id': args[1]}
|
||||
|
||||
if self.neutronclient.find_resource.call_count == 4:
|
||||
self.neutronclient.find_resource.assert_called_with(
|
||||
self.res, target, cmd_resource='sfc_port_pair_group')
|
||||
return {'id': args[1]}
|
||||
|
||||
self.neutronclient.find_resource.side_effect = _mock_port_pair
|
||||
|
||||
self.network.find_sfc_port_pair = mock.Mock(
|
||||
side_effect=lambda name_or_id, ignore_missing=False:
|
||||
{'id': name_or_id}
|
||||
)
|
||||
self.network.find_sfc_port_pair_group = mock.Mock(
|
||||
side_effect=lambda name_or_id, ignore_missing=False:
|
||||
{'id': name_or_id, 'port_pairs': self.ppg_pp}
|
||||
)
|
||||
arglist = [
|
||||
target,
|
||||
'--port-pair', port_pair1,
|
||||
@ -453,12 +409,12 @@ class TestUnsetSfcPortPairGroup(fakes.TestNeutronClientOSCV2):
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
expect = {'port_pairs': sorted([self.ppg_pp])}
|
||||
self.mocked.assert_called_once_with(target, {self.res: expect})
|
||||
expect = {'port_pairs': sorted([*self.ppg_pp])}
|
||||
self.mocked.assert_called_once_with(target, **expect)
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_unset_all_port_pair(self):
|
||||
client = self.app.client_manager.neutronclient
|
||||
client = self.app.client_manager.network
|
||||
mock_port_pair_group_update = client.update_sfc_port_pair_group
|
||||
arglist = [
|
||||
self._port_pair_group_name,
|
||||
@ -471,7 +427,7 @@ class TestUnsetSfcPortPairGroup(fakes.TestNeutronClientOSCV2):
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
|
||||
attrs = {'port_pair_group': {'port_pairs': []}}
|
||||
attrs = {'port_pairs': []}
|
||||
mock_port_pair_group_update.assert_called_once_with(
|
||||
self._port_pair_group_name, attrs)
|
||||
self._port_pair_group_name, **attrs)
|
||||
self.assertIsNone(result)
|
||||
|
@ -11,19 +11,17 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from unittest import mock
|
||||
|
||||
from osc_lib import exceptions
|
||||
from osc_lib.tests import utils as tests_utils
|
||||
import testtools
|
||||
|
||||
from neutronclient.osc.v2.sfc import sfc_service_graph
|
||||
from neutronclient.tests.unit.osc.v2.sfc import fakes
|
||||
|
||||
|
||||
def _get_id(client, id_or_name, resource):
|
||||
return id_or_name
|
||||
|
||||
|
||||
class TestListSfcServiceGraph(fakes.TestNeutronClientOSCV2):
|
||||
_service_graphs = fakes.FakeSfcServiceGraph.create_sfc_service_graphs(
|
||||
count=1)
|
||||
@ -47,11 +45,8 @@ class TestListSfcServiceGraph(fakes.TestNeutronClientOSCV2):
|
||||
|
||||
def setUp(self):
|
||||
super(TestListSfcServiceGraph, self).setUp()
|
||||
mock.patch(
|
||||
'neutronclient.osc.v2.sfc.sfc_service_graph._get_id',
|
||||
new=_get_id).start()
|
||||
self.neutronclient.list_sfc_service_graphs = mock.Mock(
|
||||
return_value={'service_graphs': self._service_graphs}
|
||||
self.network.sfc_service_graphs = mock.Mock(
|
||||
return_value=self._service_graphs
|
||||
)
|
||||
# Get the command object to test
|
||||
self.cmd = sfc_service_graph.ListSfcServiceGraph(
|
||||
@ -62,7 +57,7 @@ class TestListSfcServiceGraph(fakes.TestNeutronClientOSCV2):
|
||||
verifylist = []
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
columns = self.cmd.take_action(parsed_args)[0]
|
||||
sgs = self.neutronclient.list_sfc_service_graphs()['service_graphs']
|
||||
sgs = self.network.sfc_service_graphs()
|
||||
sg = sgs[0]
|
||||
data = [
|
||||
sg['id'],
|
||||
@ -77,7 +72,7 @@ class TestListSfcServiceGraph(fakes.TestNeutronClientOSCV2):
|
||||
verifylist = [('long', True)]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
columns = self.cmd.take_action(parsed_args)[0]
|
||||
sgs = self.neutronclient.list_sfc_service_graphs()['service_graphs']
|
||||
sgs = self.network.sfc_service_graphs()
|
||||
sg = sgs[0]
|
||||
data = [
|
||||
sg['id'],
|
||||
@ -107,10 +102,8 @@ class TestCreateSfcServiceGraph(fakes.TestNeutronClientOSCV2):
|
||||
|
||||
def setUp(self):
|
||||
super(TestCreateSfcServiceGraph, self).setUp()
|
||||
mock.patch('neutronclient.osc.v2.sfc.sfc_service_graph._get_id',
|
||||
new=_get_id).start()
|
||||
self.neutronclient.create_sfc_service_graph = mock.Mock(
|
||||
return_value={'service_graph': self._service_graph})
|
||||
self.network.create_sfc_service_graph = mock.Mock(
|
||||
return_value=self._service_graph)
|
||||
self.data = self.get_data()
|
||||
self.cmd = sfc_service_graph.CreateSfcServiceGraph(
|
||||
self.app, self.namespace)
|
||||
@ -145,12 +138,10 @@ class TestCreateSfcServiceGraph(fakes.TestNeutronClientOSCV2):
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
columns, data = (self.cmd.take_action(parsed_args))
|
||||
|
||||
self.neutronclient.create_sfc_service_graph.assert_called_once_with({
|
||||
'service_graph': {
|
||||
self.network.create_sfc_service_graph.assert_called_once_with(**{
|
||||
'description': self._service_graph['description'],
|
||||
'name': self._service_graph['name'],
|
||||
'port_chains': pcs
|
||||
}
|
||||
})
|
||||
self.assertEqual(self.columns_long, columns)
|
||||
self.assertEqual(self.data, data)
|
||||
@ -227,19 +218,19 @@ class TestDeleteSfcServiceGraph(fakes.TestNeutronClientOSCV2):
|
||||
|
||||
def setUp(self):
|
||||
super(TestDeleteSfcServiceGraph, self).setUp()
|
||||
self.neutronclient.delete_sfc_service_graph = mock.Mock(
|
||||
self.network.delete_sfc_service_graph = mock.Mock(
|
||||
return_value=None)
|
||||
self.cmd = sfc_service_graph.DeleteSfcServiceGraph(
|
||||
self.app, self.namespace)
|
||||
|
||||
def test_delete_sfc_service_graph(self):
|
||||
client = self.app.client_manager.neutronclient
|
||||
client = self.app.client_manager.network
|
||||
mock_service_graph_delete = client.delete_sfc_service_graph
|
||||
arglist = [
|
||||
self._service_graph[0]['id'],
|
||||
]
|
||||
verifylist = [
|
||||
('service_graph', self._service_graph[0]['id']),
|
||||
('service_graph', [self._service_graph[0]['id']]),
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
@ -247,6 +238,22 @@ class TestDeleteSfcServiceGraph(fakes.TestNeutronClientOSCV2):
|
||||
self._service_graph[0]['id'])
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_delete_multiple_service_graphs_with_exception(self):
|
||||
client = self.app.client_manager.network
|
||||
target = self._service_graph[0]['id']
|
||||
arglist = [target]
|
||||
verifylist = [('service_graph', [target])]
|
||||
|
||||
client.find_sfc_service_graph.side_effect = [
|
||||
target, exceptions.CommandError
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
msg = "1 of 2 service graph(s) failed to delete."
|
||||
with testtools.ExpectedException(exceptions.CommandError) as e:
|
||||
self.cmd.take_action(parsed_args)
|
||||
self.assertEqual(msg, str(e))
|
||||
|
||||
|
||||
class TestShowSfcServiceGraph(fakes.TestNeutronClientOSCV2):
|
||||
|
||||
@ -266,15 +273,12 @@ class TestShowSfcServiceGraph(fakes.TestNeutronClientOSCV2):
|
||||
_sg['project_id']
|
||||
)
|
||||
|
||||
_service_graph = {'service_graph': _sg}
|
||||
_service_graph = _sg
|
||||
_service_graph_id = _sg['id']
|
||||
|
||||
def setUp(self):
|
||||
super(TestShowSfcServiceGraph, self).setUp()
|
||||
mock.patch(
|
||||
'neutronclient.osc.v2.sfc.sfc_service_graph._get_id',
|
||||
new=_get_id).start()
|
||||
self.neutronclient.show_sfc_service_graph = mock.Mock(
|
||||
self.network.get_sfc_service_graph = mock.Mock(
|
||||
return_value=self._service_graph
|
||||
)
|
||||
# Get the command object to test
|
||||
@ -282,8 +286,8 @@ class TestShowSfcServiceGraph(fakes.TestNeutronClientOSCV2):
|
||||
self.app, self.namespace)
|
||||
|
||||
def test_service_graph_show(self):
|
||||
client = self.app.client_manager.neutronclient
|
||||
mock_service_graph_show = client.show_sfc_service_graph
|
||||
client = self.app.client_manager.network
|
||||
mock_service_graph_show = client.get_sfc_service_graph
|
||||
arglist = [
|
||||
self._service_graph_id,
|
||||
]
|
||||
@ -305,15 +309,13 @@ class TestSetSfcServiceGraph(fakes.TestNeutronClientOSCV2):
|
||||
|
||||
def setUp(self):
|
||||
super(TestSetSfcServiceGraph, self).setUp()
|
||||
mock.patch('neutronclient.osc.v2.sfc.sfc_service_graph._get_id',
|
||||
new=_get_id).start()
|
||||
self.neutronclient.update_sfc_service_graph = mock.Mock(
|
||||
self.network.update_sfc_service_graph = mock.Mock(
|
||||
return_value=None)
|
||||
self.cmd = sfc_service_graph.SetSfcServiceGraph(
|
||||
self.app, self.namespace)
|
||||
|
||||
def test_set_service_graph(self):
|
||||
client = self.app.client_manager.neutronclient
|
||||
client = self.app.client_manager.network
|
||||
mock_service_graph_update = client.update_sfc_service_graph
|
||||
arglist = [
|
||||
self._service_graph_name,
|
||||
@ -327,10 +329,10 @@ class TestSetSfcServiceGraph(fakes.TestNeutronClientOSCV2):
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
attrs = {'service_graph': {
|
||||
attrs = {
|
||||
'name': 'name_updated',
|
||||
'description': 'desc_updated'}
|
||||
'description': 'desc_updated'
|
||||
}
|
||||
mock_service_graph_update.assert_called_once_with(
|
||||
self._service_graph_name, attrs)
|
||||
self._service_graph_name, **attrs)
|
||||
self.assertIsNone(result)
|
||||
|
Loading…
Reference in New Issue
Block a user