[04/xx] Introduce SFC forwarding app

This patch introduces forwarding app for SFC, see API at [1] for
more information.

[1] https://docs.openstack.org/developer/networking-sfc/api.html

Partially-implements: blueprint service-function-chaining
Change-Id: I87b7337b3a56bd416eaa793d58e47872e18f4fae
This commit is contained in:
Dima Kuznetsov 2017-01-02 13:37:06 +02:00
parent 24dc5e2be7
commit 4647e0b8ab
14 changed files with 2222 additions and 10 deletions

View File

@ -28,6 +28,11 @@ if [ -n "${DEVSTACK_GATE_TEMPEST}" ] && [ ${DEVSTACK_GATE_TEMPEST} -gt 0 ]; then
export DEVSTACK_LOCAL_CONFIG+=$'\n'"NEUTRON_CREATE_INITIAL_NETWORKS=True"
else
export DEVSTACK_LOCAL_CONFIG+=$'\n'"NEUTRON_CREATE_INITIAL_NETWORKS=False"
# Enable SFC only in fullstack job
if [[ "${PROJECTS}" != *"openstack/rally"* ]]; then
export DEVSTACK_LOCAL_CONFIG+=$'\n'"ENABLE_DF_SFC=True"
fi
fi
# Begin list of exclusions.

View File

@ -24,6 +24,7 @@ export DEVSTACK_LOCAL_CONFIG+=$'\n'"ENABLE_PORT_STATUS_NOTIFIER=False"
export DEVSTACK_LOCAL_CONFIG+=$'\n'"ENABLE_ACTIVE_DETECTION=False"
export DEVSTACK_LOCAL_CONFIG+=$'\n'"DF_RUNNING_IN_GATE=True"
export DEVSTACK_LOCAL_CONFIG+=$'\n'"EXTERNAL_HOST_IP=172.24.4.100"
export DEVSTACK_LOCAL_CONFIG+=$'\n'"ENABLE_DF_SFC=True"
if [ -n "${DEVSTACK_GATE_TEMPEST}" ] && [ ${DEVSTACK_GATE_TEMPEST} -gt 0 ]; then
# Only include tempest if this is a tempest job

View File

@ -20,7 +20,7 @@ DEFAULT_TUNNEL_TYPES="vxlan,geneve,gre"
DEFAULT_APPS_LIST="l2,l3_proactive,dhcp,dnat,sg,portsec,portqos,classifier,tunneling,provider"
if [[ $ENABLE_DF_SFC == "True" ]]; then
DEFAULT_APPS_LIST="$DEFAULT_APPS_LIST,fc"
DEFAULT_APPS_LIST="$DEFAULT_APPS_LIST,fc,sfc"
fi
if is_service_enabled df-metadata ; then
@ -193,6 +193,10 @@ function configure_bgp {
iniset $NEUTRON_CONF DEFAULT api_extensions_path "$DEST/neutron-dynamic-routing/neutron_dynamic_routing/extensions"
}
function configure_sfc {
setup_develop $DEST/networking-sfc
}
function init_neutron_sample_config {
# NOTE: We must make sure that neutron config file exists before
# going further with ovs setup
@ -228,6 +232,10 @@ function configure_df_plugin {
configure_trunk
fi
if [[ "$ENABLE_DF_SFC" == "True" ]]; then
configure_sfc
fi
# NOTE(gsagie) needed for tempest
export NETWORK_API_EXTENSIONS=$(python -c \
'from dragonflow.common import extensions ;\

View File

@ -0,0 +1,328 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import itertools
from oslo_log import log
import six
from dragonflow._i18n import _
from dragonflow.controller.apps import sfc_mpls_driver
from dragonflow.controller import df_base_app
from dragonflow.db.models import constants as model_const
from dragonflow.db.models import l2
from dragonflow.db.models import sfc
LOG = log.getLogger(__name__)
class SfcApp(df_base_app.DFlowApp):
def switch_features_handler(self, ev):
self.mpls_driver = sfc_mpls_driver.MplsDriver(self)
def _get_port_chain_driver(self, port_chain):
proto = port_chain.protocol
if proto == sfc.PROTO_MPLS:
return self.mpls_driver
else:
raise RuntimeError(
_('Unsupported portchain proto {0}').format(proto),
)
@df_base_app.register_event(sfc.PortPairGroup, model_const.EVENT_UPDATED)
def _port_pair_group_updated(self, port_pair_group, old_port_pair_group):
'''Handler for port pair group changes.
If port pair group is part of a port-chain, update dispatch flows and
install/uninstal flows for changes port-pairs.
'''
port_chain = self._port_chain_by_port_pair_group(
port_pair_group,
)
if port_chain is None:
return
driver = self._get_port_chain_driver(port_chain)
# FIXME (dimak) maybe use modify
driver.uninstall_port_pair_group_flows(
port_chain,
old_port_pair_group,
)
driver.install_port_pair_group_flows(
port_chain,
port_pair_group,
)
for old_pp, new_pp in six.moves.zip_longest(
port_pair_group.port_pairs,
old_port_pair_group.port_pairs,
):
if (
old_pp is not None and
new_pp is not None and
old_pp.id == new_pp.id
):
continue
if old_pp is not None:
self._uninstall_port_pair_egress(
port_chain,
old_port_pair_group,
old_pp,
)
if new_pp is not None:
self._install_port_pair_egress(
port_chain,
port_pair_group,
new_pp,
)
def _add_port_pair_group(self, port_chain, port_pair_group):
driver = self._get_port_chain_driver(port_chain)
driver.install_port_pair_group_flows(
port_chain,
port_pair_group,
)
for pp in port_pair_group.port_pairs:
self._install_port_pair_egress(
port_chain,
port_pair_group,
pp,
)
@df_base_app.register_event(sfc.PortChain, model_const.EVENT_CREATED)
def _port_chain_created(self, port_chain):
driver = self._get_port_chain_driver(port_chain)
for fc in port_chain.flow_classifiers:
driver.install_flow_classifier_flows(port_chain, fc)
for ppg in port_chain.port_pair_groups:
self._add_port_pair_group(port_chain, ppg)
def _remove_port_pair_group(self, port_chain, port_pair_group):
driver = self._get_port_chain_driver(port_chain)
driver.uninstall_port_pair_group_flows(
port_chain,
port_pair_group,
)
for pp in port_pair_group.port_pairs:
self._uninstall_port_pair_egress(
port_chain,
port_pair_group,
pp,
)
def _install_port_pair_egress(self, port_chain, port_pair_group,
port_pair):
if port_pair.egress_port.is_local:
driver = self._get_port_chain_driver(port_chain)
driver.install_port_pair_egress_flows(
port_chain,
port_pair_group,
port_pair,
)
def _uninstall_port_pair_egress(self, port_chain, port_pair_group,
port_pair):
if port_pair.egress_port.is_local:
driver = self._get_port_chain_driver(port_chain)
driver.uninstall_port_pair_egress_flows(
port_chain,
port_pair_group,
port_pair,
)
@df_base_app.register_event(sfc.PortChain, model_const.EVENT_DELETED)
def _port_chain_deleted(self, port_chain):
driver = self._get_port_chain_driver(port_chain)
for fc in port_chain.flow_classifiers:
driver.uninstall_flow_classifier_flows(port_chain, fc)
for ppg in port_chain.port_pair_groups:
self._remove_port_pair_group(port_chain, ppg)
@df_base_app.register_event(sfc.PortChain, model_const.EVENT_UPDATED)
def _port_chain_updated(self, port_chain, old_port_chain):
'''Handler for port-chain update
* Install/uninstall changed flow classifiers.
* Install/uninstall changed port pair groups.
'''
driver = self._get_port_chain_driver(port_chain)
old_fc_ids = set(fc.id for fc in old_port_chain.flow_classifiers)
new_fc_ids = set(fc.id for fc in port_chain.flow_classifiers)
added_fc_ids = new_fc_ids - old_fc_ids
removed_fc_ids = old_fc_ids - new_fc_ids
removed_fcs = (
fc for fc in old_port_chain.flow_classifiers
if fc.id in removed_fc_ids
)
for fc in removed_fcs:
driver.uninstall_flow_classifier_flows(old_port_chain, fc)
added_fcs = (
fc for fc in port_chain.flow_classifiers if fc.id in added_fc_ids
)
for fc in added_fcs:
driver.install_flow_classifier_flows(port_chain, fc)
# Port pairs groups are more complex since labels depend on index :(
for old_ppg, new_ppg in six.moves.zip_longest(
old_port_chain.port_pair_groups,
port_chain.port_pair_groups,
):
if new_ppg is not None and old_ppg is None:
# New chain is longer
self._add_port_pair_group(port_chain, new_ppg)
elif old_ppg is not None and new_ppg is None:
# New chain is shorter
self._remove_port_pair_group(old_port_chain, old_ppg)
elif new_ppg.id != old_ppg.id:
# At most one is None so here we have both present
self._remove_port_pair_group(old_port_chain, old_ppg)
self._add_port_pair_group(port_chain, new_ppg)
def _flow_classifiers_by_lport(self, lport):
return itertools.chain(
self.db_store.get_all(
sfc.FlowClassifier(source_port=lport),
index=sfc.FlowClassifier.get_index('source_port'),
),
self.db_store.get_all(
sfc.FlowClassifier(dest_port=lport),
index=sfc.FlowClassifier.get_index('dest_port'),
),
)
def _port_pairs_by_lport(self, lport):
# If port pair uses same port for ingress and egress, we will get it
# here twice so need to filter out:
seen = set()
for port_pair in itertools.chain(
self.db_store.get_all(
sfc.PortPair(egress_port=lport),
index=sfc.PortPair.get_index('egress'),
),
self.db_store.get_all(
sfc.PortPair(egress_port=lport),
index=sfc.PortPair.get_index('ingress'),
),
):
if port_pair.id not in seen:
seen.add(port_pair.id)
yield port_pair
def _port_chain_by_flow_classifier(self, flow_classifier):
return self.db_store.get_one(
sfc.PortChain(flow_classifiers=[flow_classifier]),
index=sfc.PortChain.get_index('flow_classifiers'),
)
def _port_chain_by_port_pair_group(self, port_pair_group):
return self.db_store.get_one(
sfc.PortChain(
port_pair_groups=[port_pair_group],
),
index=sfc.PortChain.get_index('port_pair_groups'),
)
def _port_chain_with_port_pair_group_by_port_pair(self, port_pair):
port_pair_group = self.db_store.get_one(
sfc.PortPairGroup(
port_pairs=[port_pair],
),
index=sfc.PortPairGroup.get_index('port_pairs'),
)
if port_pair_group is not None:
return (
self._port_chain_by_port_pair_group(port_pair_group),
port_pair_group,
)
return None, None
@df_base_app.register_event(l2.LogicalPort, l2.EVENT_LOCAL_CREATED)
def _local_lport_created(self, lport):
'''Handler for local port create
* Update all flow classifiers that reference this port
* Update all port-pair-groups that contain this port.
'''
# install new encap/decap flows
for fc in self._flow_classifiers_by_lport(lport):
port_chain = self._port_chain_by_flow_classifier(fc)
if port_chain is not None:
driver = self._get_port_chain_driver(port_chain)
driver.install_flow_classifier_local_port_flows(
port_chain, fc)
for pp in self._port_pairs_by_lport(lport):
port_chain, port_pair_group = \
self._port_chain_with_port_pair_group_by_port_pair(pp)
if port_chain is not None:
driver = self._get_port_chain_driver(port_chain)
if lport.id == pp.egress_port.id:
driver.install_port_pair_egress_flows(
port_chain,
port_pair_group,
pp,
)
if lport.id == pp.ingress_port.id:
# To refresh the dispatch groups
driver.uninstall_port_pair_group_flows(
port_chain,
port_pair_group,
)
driver.install_port_pair_group_flows(
port_chain,
port_pair_group,
)
@df_base_app.register_event(l2.LogicalPort, l2.EVENT_LOCAL_DELETED)
def _local_lport_deleted(self, lport):
'''Handler for local port remove
* Update all flow classifiers that reference this port
* Update all port-pair-groups that contain this port.
'''
for fc in self._flow_classifiers_by_lport(lport):
port_chain = self._port_chain_by_flow_classifier(fc)
if port_chain is not None:
driver = self._get_port_chain_driver(port_chain)
driver.uninstall_flow_classifier_local_port_flows(
port_chain, fc)
for pp in self._port_pairs_by_lport(lport):
port_chain, port_pair_group = \
self._port_chain_with_port_pair_group_by_port_pair(pp)
if port_chain is not None:
driver = self._get_port_chain_driver(port_chain)
if lport.id == pp.egress_port.id:
driver.uninstall_port_pair_egress_flows(
port_chain, port_pair_group, pp)
if lport.id == pp.ingress_port.id:
driver.uninstall_port_pair_group_flows(
port_chain,
port_pair_group,
)
driver.install_port_pair_group_flows(
port_chain,
port_pair_group,
)

View File

@ -0,0 +1,114 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
from oslo_log import log
import six
LOG = log.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class SfcBaseDriver(object):
def __init__(self, app):
'''Initialization code
:param app: Instance of SFC app
'''
pass
@abc.abstractmethod
def install_flow_classifier_flows(self, port_chain, flow_classifier):
'''Install flows that will capture packets classified by Flow
Classifier app. The captured packets arrive at SFC_ENCAP_TABLE with
reg6 set to unique key of the flow classifier. Packets will arive to
the table only if fc.is_classification_local is true.
Additionally, install flows s.t. packets that finish the chain arrive
to SFC_END_OF_CHAIN_TABLE, for dispatch by FC app. Packets arriving
to this table should have unique key of their original flow classifier
in reg6, and should only arrive to this table if fc.is_dispatch_local
is true, otherwise, should be forwarded by the driver to the chassis
of the destination lport.
This is called for all flow classifiers of the port chain
:param port_chain: Relevant port chain
:param flow_classifier: Relevant flow classifier
'''
pass
@abc.abstractmethod
def uninstall_flow_classifier_flows(self, port_chain, flow_classifier):
'''Reverse the installed flows of the above
'''
pass
@abc.abstractmethod
def install_flow_classifier_local_port_flows(self, port_chain,
flow_classifier):
'''Install flows for a new local port referenced by a flow classifier.
This code is called when the port referenced by flow classifier becomes
available locally, i.e., flow classifier was installed but the port
was not local, and it becomes local now.
:param port_chain: Relevant port chain
:param flow_classifier: Relevant flow classifier
'''
pass
@abc.abstractmethod
def uninstall_flow_classifier_local_port_flows(self, port_chain,
flow_classifier):
'''Same as above, but when a local port is removed.
'''
pass
@abc.abstractmethod
def install_port_pair_group_flows(self, port_chain, port_pair_group):
'''Install flows that forward a packet into all the port pairs of the
provided port pair group.
This is called for all port pair groups of the port chain
:param port_chain: Relevant port chain
:param port_pair_group: Relevant port pair group
'''
pass
@abc.abstractmethod
def uninstall_port_pair_group_flows(self, port_chain, port_pair_group):
'''Reverse the installed flows of the above
'''
pass
@abc.abstractmethod
def install_port_pair_egress_flows(self, port_chain, port_pair_group,
port_pair):
'''Install flows that capture the packets coming out of the egress
port of the provided port pair and forward them into flows
that dispatch the next port pair group.
This method is called for all port parts whose egress lport is local.
:param port_chain: Relevant port chain
:param port_pair_group: Relevant port pair group
:param port_pair: Relevant port pair
'''
pass
@abc.abstractmethod
def uninstall_port_pair_egress_flows(self, port_chain, port_pair_groups,
port_pair):
'''Reverse the installed flows of the above
'''
pass

View File

@ -0,0 +1,563 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log
from ryu.lib.packet import ether_types
from dragonflow.controller.apps import sfc_driver_base
from dragonflow.controller.common import constants
from dragonflow.db.models import sfc
LOG = log.getLogger(__name__)
def _get_index_by_id(lst, obj):
return next(i for i, o in enumerate(lst) if o.id == obj.id)
def _create_group_id(label, extra):
# FIXME add global way to share group IDs
return (label << 8) | extra
def _get_dispatch_to_all_group_id(label):
return _create_group_id(label, 1)
def _get_dispatch_locally_group_id(label):
return _create_group_id(label, 2)
class _SimpleMplsLabelAllocator(object):
@classmethod
def _create_label(cls, chain_idx, fc_idx, ppg_idx):
return ppg_idx | (fc_idx << 8) | (chain_idx << 11)
@classmethod
def _get_ingress_label(cls, port_chain, flow_classifier, port_pair_group):
fc_idx = _get_index_by_id(
port_chain.flow_classifiers,
flow_classifier,
)
ppg_idx = _get_index_by_id(
port_chain.port_pair_groups,
port_pair_group,
)
return cls._create_label(port_chain.chain_id, fc_idx, ppg_idx)
@classmethod
def _get_egress_label(cls, port_chain, flow_classifier, port_pair_group):
label = cls._get_ingress_label(
port_chain,
flow_classifier,
port_pair_group,
)
return label + 1
@classmethod
def _get_encap_label(cls, port_chain, flow_classifier):
# Can be done faster but this reads better
return cls._get_ingress_label(
port_chain,
flow_classifier,
port_chain.port_pair_groups[0],
)
@classmethod
def _get_decap_label(cls, port_chain, flow_classifier):
# Can be done faster but this reads better
return cls._get_egress_label(
port_chain,
flow_classifier,
port_chain.port_pair_groups[-1],
)
class MplsDriver(_SimpleMplsLabelAllocator, sfc_driver_base.SfcBaseDriver):
_ETH_TYPE_TO_TC = {
ether_types.ETH_TYPE_IP: 0,
ether_types.ETH_TYPE_IPV6: 1,
}
def __init__(self, app):
self.app = app
def _install_encap_flows(self, port_chain, flow_classifier):
for eth_type in self._ETH_TYPE_TO_TC:
self.app.mod_flow(
table_id=constants.SFC_ENCAP_TABLE,
priority=constants.PRIORITY_HIGH,
match=self.app.parser.OFPMatch(
reg2=flow_classifier.unique_key,
eth_type=eth_type,
),
inst=[
self.app.parser.OFPInstructionActions(
self.app.ofproto.OFPIT_APPLY_ACTIONS,
[
self.app.parser.OFPActionPushMpls(
ether_types.ETH_TYPE_MPLS,
),
self.app.parser.OFPActionSetField(
mpls_label=self._get_encap_label(
port_chain,
flow_classifier,
),
),
self.app.parser.OFPActionSetField(
mpls_tc=self._ETH_TYPE_TO_TC[eth_type],
),
],
),
self.app.parser.OFPInstructionGotoTable(
constants.SFC_MPLS_DISPATCH_TABLE
),
],
)
def _uninstall_encap_flows(self, port_chain, flow_classifier):
for eth_type in self._ETH_TYPE_TO_TC:
self.app.mod_flow(
command=self.app.ofproto.OFPFC_DELETE_STRICT,
table_id=constants.SFC_ENCAP_TABLE,
priority=constants.PRIORITY_HIGH,
match=self.app.parser.OFPMatch(
reg2=flow_classifier.unique_key,
eth_type=eth_type,
),
)
def _install_decap_flows(self, port_chain, flow_classifier):
for eth_type in self._ETH_TYPE_TO_TC:
self.app.mod_flow(
table_id=constants.SFC_MPLS_DISPATCH_TABLE,
priority=constants.PRIORITY_HIGH,
match=self.app.parser.OFPMatch(
eth_type=ether_types.ETH_TYPE_MPLS,
mpls_label=self._get_decap_label(
port_chain,
flow_classifier,
),
mpls_tc=self._ETH_TYPE_TO_TC[eth_type],
),
inst=[
self.app.parser.OFPInstructionActions(
self.app.ofproto.OFPIT_APPLY_ACTIONS,
[
self.app.parser.OFPActionPopMpls(eth_type),
self.app.parser.OFPActionSetField(
reg2=flow_classifier.unique_key,
),
],
),
self.app.parser.OFPInstructionGotoTable(
constants.SFC_END_OF_CHAIN_TABLE,
),
],
)
def _uninstall_decap_flows(self, port_chain, flow_classifier):
for eth_type in self._ETH_TYPE_TO_TC:
self.app.mod_flow(
command=self.app.ofproto.OFPFC_DELETE_STRICT,
table_id=constants.SFC_MPLS_DISPATCH_TABLE,
priority=constants.PRIORITY_HIGH,
match=self.app.parser.OFPMatch(
eth_type=ether_types.ETH_TYPE_MPLS,
mpls_label=self._get_decap_label(
port_chain,
flow_classifier
),
mpls_tc=self._ETH_TYPE_TO_TC[eth_type],
),
)
def _install_forward_to_dest(self, port_chain, flow_classifier):
for eth_type in self._ETH_TYPE_TO_TC:
self.app.mod_flow(
table_id=constants.SFC_MPLS_DISPATCH_TABLE,
priority=constants.PRIORITY_HIGH,
match=self.app.parser.OFPMatch(
eth_type=ether_types.ETH_TYPE_MPLS,
mpls_label=self._get_decap_label(
port_chain,
flow_classifier
),
mpls_tc=self._ETH_TYPE_TO_TC[eth_type],
),
inst=[
self.app.parser.OFPInstructionActions(
self.app.ofproto.OFPIT_APPLY_ACTIONS,
[
self.app.parser.OFPActionSetField(
reg2=flow_classifier.dest_port.unique_key,
),
],
),
self.app.parser.OFPInstructionGotoTable(
constants.EGRESS_TABLE,
),
],
)
def _uninstall_forward_to_dest(self, port_chain, flow_classifier):
for eth_type in self._ETH_TYPE_TO_TC:
self.app.mod_flow(
command=self.app.ofproto.OFPFC_DELETE_STRICT,
priority=constants.PRIORITY_HIGH,
table_id=constants.SFC_MPLS_DISPATCH_TABLE,
match=self.app.parser.OFPMatch(
eth_type=ether_types.ETH_TYPE_MPLS,
mpls_label=self._get_decap_label(
port_chain,
flow_classifier,
),
mpls_tc=self._ETH_TYPE_TO_TC[eth_type],
),
)
def install_flow_classifier_flows(self, port_chain, flow_classifier):
if flow_classifier.is_classification_local:
self._install_encap_flows(port_chain, flow_classifier)
if flow_classifier.is_dispatch_local:
self._install_decap_flows(port_chain, flow_classifier)
else:
self._install_forward_to_dest(port_chain, flow_classifier)
def uninstall_flow_classifier_flows(self, port_chain, flow_classifier):
if flow_classifier.is_classification_local:
self._uninstall_encap_flows(port_chain, flow_classifier)
if flow_classifier.is_dispatch_local:
self._uninstall_decap_flows(port_chain, flow_classifier)
else:
self._uninstall_forward_to_dest(port_chain, flow_classifier)
def install_flow_classifier_local_port_flows(self, port_chain,
flow_classifier):
if flow_classifier.source_port is not None:
self._install_encap_flows(port_chain, flow_classifier)
if flow_classifier.dest_port is not None:
self._uninstall_forward_to_dest(port_chain, flow_classifier)
self._install_decap_flows(port_chain, flow_classifier)
def uninstall_flow_classifier_local_port_flows(self, port_chain,
flow_classifier):
if flow_classifier.source_port is not None:
self._uninstall_encap_flows(port_chain, flow_classifier)
if flow_classifier.dest_port is not None:
self._install_forward_to_dest(port_chain, flow_classifier)
self._uninstall_decap_flows(port_chain, flow_classifier)
def _port_pair_to_bucket(self, port_pair):
if (
port_pair.correlation_mechanism == sfc.CORR_MPLS or
not port_pair.ingress_port.is_local
):
next_table = constants.EGRESS_TABLE
else:
next_table = constants.SFC_MPLS_PP_DECAP_TABLE
actions = [
self.app.parser.OFPActionSetField(
reg7=port_pair.ingress_port.unique_key,
),
self.app.parser.NXActionResubmitTable(table_id=next_table),
]
return self.app.parser.OFPBucket(actions=actions, weight=1)
def _install_port_pair_decap_flows(self, label):
for eth_type in self._ETH_TYPE_TO_TC:
self.app.mod_flow(
table_id=constants.SFC_MPLS_PP_DECAP_TABLE,
priority=constants.PRIORITY_HIGH,
match=self.app.parser.OFPMatch(
eth_type=ether_types.ETH_TYPE_MPLS,
mpls_label=label,
mpls_tc=self._ETH_TYPE_TO_TC[eth_type],
),
actions=[
self.app.parser.OFPActionPopMpls(eth_type),
self.app.parser.NXActionResubmitTable(
table_id=constants.EGRESS_TABLE,
),
],
)
def _uninstall_port_pair_decap_flows(self, label):
self.app.mod_flow(
command=self.app.ofproto.OFPFC_DELETE_STRICT,
table_id=constants.SFC_MPLS_PP_DECAP_TABLE,
priority=constants.PRIORITY_HIGH,
match=self.app.parser.OFPMatch(
eth_type=ether_types.ETH_TYPE_MPLS,
mpls_label=label,
),
)
def _install_dispatch_to_all_port_pairs(self, port_pair_group, label):
all_group_id = _get_dispatch_to_all_group_id(label)
# Add group: pick random SF from all available
self.app.add_group(
group_id=all_group_id,
group_type=self.app.ofproto.OFPGT_SELECT,
buckets=[
self._port_pair_to_bucket(pp)
for pp in port_pair_group.port_pairs
],
replace=True,
)
# Add flow: label => execute above group
self.app.mod_flow(
table_id=constants.SFC_MPLS_DISPATCH_TABLE,
priority=constants.PRIORITY_HIGH,
match=self.app.parser.OFPMatch(
eth_type=ether_types.ETH_TYPE_MPLS,
mpls_label=label,
),
actions=[self.app.parser.OFPActionGroup(group_id=all_group_id)],
)
def _uninstall_dispatch_to_all_port_pairs(self, port_pair_group, label):
all_group_id = _get_dispatch_to_all_group_id(label)
# Remove execute group flow
self.app.mod_flow(
command=self.app.ofproto.OFPFC_DELETE_STRICT,
table_id=constants.SFC_MPLS_DISPATCH_TABLE,
priority=constants.PRIORITY_HIGH,
match=self.app.parser.OFPMatch(
eth_type=ether_types.ETH_TYPE_MPLS,
mpls_label=label,
),
)
# Delete group
self.app.del_group(
group_id=all_group_id,
group_type=self.app.ofproto.OFPGT_SELECT,
)
def _install_dispatch_to_local_port_pairs(self, port_pair_group, label):
local_pps = [
pp for pp in port_pair_group.port_pairs if pp.ingress_port.is_local
]
if not local_pps:
return
local_group_id = _get_dispatch_locally_group_id(label)
# Add group: pick random SF from local only
self.app.add_group(
group_id=local_group_id,
group_type=self.app.ofproto.OFPGT_SELECT,
buckets=[self._port_pair_to_bucket(pp) for pp in local_pps],
replace=True,
)
# Add flow: label => execute above group
self.app.mod_flow(
table_id=constants.INGRESS_DESTINATION_PORT_LOOKUP_TABLE,
priority=constants.PRIORITY_VERY_HIGH,
match=self.app.parser.OFPMatch(
eth_type=ether_types.ETH_TYPE_MPLS,
mpls_label=label,
),
actions=[self.app.parser.OFPActionGroup(group_id=local_group_id)],
)
def _uninstall_dispatch_to_local_port_pairs(self, port_pair_group, label):
local_pps = [
pp for pp in port_pair_group.port_pairs if pp.ingress_port.is_local
]
if not local_pps:
return
self.app.mod_flow(
command=self.app.ofproto.OFPFC_DELETE_STRICT,
table_id=constants.INGRESS_DESTINATION_PORT_LOOKUP_TABLE,
priority=constants.PRIORITY_VERY_HIGH,
match=self.app.parser.OFPMatch(
eth_type=ether_types.ETH_TYPE_MPLS,
mpls_label=label,
),
)
local_group_id = _get_dispatch_locally_group_id(label)
self.app.del_group(
group_id=local_group_id,
group_type=self.app.ofproto.OFPGT_SELECT,
)
def install_port_pair_group_flows(self, port_chain, port_pair_group):
for flow_classifier in port_chain.flow_classifiers:
label = self._get_ingress_label(
port_chain,
flow_classifier,
port_pair_group,
)
# Flows to remove MPLS shim for non MPLS service functions
self._install_port_pair_decap_flows(label)
self._install_dispatch_to_all_port_pairs(port_pair_group, label)
self._install_dispatch_to_local_port_pairs(port_pair_group, label)
def uninstall_port_pair_group_flows(self, port_chain, port_pair_group):
for flow_classifier in port_chain.flow_classifiers:
label = self._get_ingress_label(
port_chain,
flow_classifier,
port_pair_group,
)
self._uninstall_port_pair_decap_flows(label)
self._uninstall_dispatch_to_all_port_pairs(port_pair_group, label)
self._uninstall_dispatch_to_local_port_pairs(
port_pair_group, label)
def install_port_pair_egress_flows(self, port_chain, port_pair_group,
port_pair):
if port_pair.correlation_mechanism == sfc.CORR_MPLS:
self._install_mpls_port_pair_egress_flows(
port_chain,
port_pair_group,
port_pair,
)
elif port_pair.correlation_mechanism == sfc.CORR_NONE:
self._install_none_port_pair_egress_flows(
port_chain,
port_pair_group,
port_pair,
)
else:
LOG.warning('Driver does not support correlation_mechanism %s',
port_pair.correlation_mechanism)
def _install_mpls_port_pair_egress_flows(self, port_chain, port_pair_group,
port_pair):
for flow_classifier in port_chain.flow_classifiers:
self.app.mod_flow(
table_id=constants.EGRESS_PORT_SECURITY_TABLE,
priority=constants.PRIORITY_VERY_HIGH,
match=self.app.parser.OFPMatch(
reg6=port_pair.egress_port.unique_key,
eth_type=ether_types.ETH_TYPE_MPLS,
mpls_label=self._get_ingress_label(
port_chain,
flow_classifier,
port_pair_group,
),
),
inst=[
self.app.parser.OFPInstructionActions(
self.app.ofproto.OFPIT_APPLY_ACTIONS,
[
self.app.parser.OFPActionSetField(
mpls_label=self._get_egress_label(
port_chain,
flow_classifier,
port_pair_group
),
),
],
),
self.app.parser.OFPInstructionGotoTable(
constants.SFC_MPLS_DISPATCH_TABLE,
),
],
)
def _install_none_port_pair_egress_flows(self, port_chain, port_pair_group,
port_pair):
for flow_classifier in port_chain.flow_classifiers:
mpls_label = self._get_egress_label(
port_chain,
flow_classifier,
port_pair_group,
)
for eth_type in self._ETH_TYPE_TO_TC:
self.app.mod_flow(
table_id=constants.EGRESS_PORT_SECURITY_TABLE,
priority=constants.PRIORITY_VERY_HIGH,
match=self.app.parser.OFPMatch(
reg6=port_pair.egress_port.unique_key,
eth_type=eth_type,
),
actions=[
self.app.parser.OFPActionPushMpls(
ether_types.ETH_TYPE_MPLS,
),
self.app.parser.OFPActionSetField(
mpls_label=mpls_label,
),
self.app.parser.OFPActionSetField(
mpls_tc=self._ETH_TYPE_TO_TC[eth_type],
),
self.app.parser.NXActionResubmitTable(
table_id=constants.SFC_MPLS_DISPATCH_TABLE,
),
],
)
def uninstall_port_pair_egress_flows(self, port_chain, port_pair_groups,
port_pair):
if port_pair.correlation_mechanism == sfc.CORR_MPLS:
self._uninstall_mpls_port_pair_egress_flows(
port_chain,
port_pair_groups,
port_pair,
)
elif port_pair.correlation_mechanism == sfc.CORR_NONE:
self._uninstall_none_port_pair_egress_flows(port_pair)
else:
LOG.warning('Driver does not support correlation_mechanism %s',
port_pair.correlation_mechanism)
def _uninstall_mpls_port_pair_egress_flows(self, port_chain,
port_pair_group, port_pair):
for flow_classifier in port_chain.flow_classifiers:
self.app.mod_flow(
command=self.app.ofproto.OFPFC_DELETE_STRICT,
table_id=constants.EGRESS_PORT_SECURITY_TABLE,
priority=constants.PRIORITY_VERY_HIGH,
match=self.app.parser.OFPMatch(
reg6=port_pair.egress_port.unique_key,
eth_type=ether_types.ETH_TYPE_MPLS,
mpls_label=self._get_ingress_label(
port_chain,
flow_classifier,
port_pair_group,
),
),
)
def _uninstall_none_port_pair_egress_flows(self, port_pair):
for eth_type in self._ETH_TYPE_TO_TC:
self.app.mod_flow(
command=self.app.ofproto.OFPFC_DELETE_STRICT,
priority=constants.PRIORITY_VERY_HIGH,
table_id=constants.EGRESS_PORT_SECURITY_TABLE,
match=self.app.parser.OFPMatch(
reg6=port_pair.egress_port.unique_key,
eth_type=eth_type,
),
)

View File

@ -75,6 +75,8 @@ INGRESS_DISPATCH_TABLE = 115
# SFC tables
SFC_ENCAP_TABLE = 120
SFC_MPLS_DISPATCH_TABLE = 121
SFC_MPLS_PP_DECAP_TABLE = 122
SFC_END_OF_CHAIN_TABLE = 125
# Table used by aging app.

View File

@ -166,7 +166,7 @@ class DFlowApp(object):
LOG.debug("Got the following flows: %s", flows)
return flows
def add_group(self, group_id, group_type, buckets):
def add_group(self, group_id, group_type, buckets, replace=False):
"""Add an entry to the groups table:
:param group_id: ID for the new group
@ -174,6 +174,12 @@ class DFlowApp(object):
:param buckets: List of parser.OFPBucket objects that define
group's actions.
"""
if replace:
self.del_group(
group_id=group_id,
group_type=group_type,
)
self._mod_group(
command=self.ofproto.OFPGC_ADD,
group_id=group_id,

View File

@ -26,7 +26,8 @@ PROTO_MPLS = 'mpls'
@mf.register_model
@mf.construct_nb_db_model(
indexes={
'egress': 'egress_port',
'egress': 'egress_port.id',
'ingress': 'ingress_port.id',
},
)
class PortPair(mf.ModelBase,
@ -44,7 +45,7 @@ class PortPair(mf.ModelBase,
@mf.register_model
@mf.construct_nb_db_model(
indexes={
'port_pairs': 'port_pairs',
'port_pairs': 'port_pairs.id',
},
)
class PortPairGroup(mf.ModelBase,
@ -141,7 +142,7 @@ class FlowClassifier(mf.ModelBase,
@mf.construct_nb_db_model(
indexes={
'flow_classifiers': 'flow_classifiers.id',
'port_pair_groups': 'port_pair_groups',
'port_pair_groups': 'port_pair_groups.id',
},
)
class PortChain(mf.ModelBase,

View File

@ -32,7 +32,9 @@ from ryu.lib.packet import icmp
from ryu.lib.packet import icmpv6
from ryu.lib.packet import ipv4
from ryu.lib.packet import ipv6
from ryu.lib.packet import mpls
from ryu.lib.packet import packet
from ryu.lib.packet import udp
from ryu.lib.packet import vlan
import six
@ -157,7 +159,8 @@ class Topology(object):
def get_networks(self):
return self.networks
def create_subnet(self, network=None, cidr=None, enable_dhcp=True):
def create_subnet(self, network=None, cidr=None, enable_dhcp=True,
allocation_pool=()):
"""Create a subnet in this topology, with the given subnet address
range.
:param cidr: The subnet's address range, in form <IP>/<mask len>.
@ -166,11 +169,14 @@ class Topology(object):
:type cidr: String
:param enable_dhcp: Whether to enable dhcp for this subnet.
:type cidr: Boolean
:param allocation_pool: Optional, allocation range for DHCP
:type allocation_pool: Tuple of 2 addresses (start, end)
"""
if not network:
network = self.networks[0]
subnet_id = len(self.subnets)
subnet = Subnet(self, network, subnet_id, cidr, enable_dhcp)
subnet = Subnet(self, network, subnet_id, cidr, enable_dhcp,
allocation_pool)
self.subnets.append(subnet)
return subnet
@ -205,7 +211,8 @@ class Topology(object):
class Subnet(object):
"""Represent a single subnet."""
def __init__(self, topology, network, subnet_id, cidr, enable_dhcp):
def __init__(self, topology, network, subnet_id, cidr, enable_dhcp,
allocation_pool):
"""Create the subnet under the given topology, with the given ID, and
the given address range.
:param topology: The topology to which the subnet belongs
@ -220,6 +227,9 @@ class Subnet(object):
:type cidr: String
:param enable_dhcp: Whether to enable dhcp for this subnet.
:type cidr: Boolean
:param allocation_pool: Allocation range for DHCP
:type allocation_pool: Tuple of (start, end) or empty tuple for
implicit range.
"""
self.topology = topology
self.subnet_id = subnet_id
@ -232,12 +242,21 @@ class Subnet(object):
)
if cidr:
ip_version = self._get_ip_version(cidr)
self.subnet.create(subnet={
subnet = {
'cidr': cidr,
'enable_dhcp': enable_dhcp,
'ip_version': ip_version,
'network_id': self.network.network_id
})
}
if allocation_pool:
start, end = allocation_pool
subnet['allocation_pools'] = [
{
'start': start,
'end': end,
},
]
self.subnet.create(subnet=subnet)
else:
self.subnet.create()
@ -675,6 +694,21 @@ class Filter(object):
raise Exception('Filter not implemented')
class ExactMatchFilter(Filter):
def __init__(self, fixture):
self._fixture = fixture
def __call__(self, buf):
return self._fixture == buf
class RyuIPv4Filter(object):
"""Use ryu to parse the packet and test if it's IPv4."""
def __call__(self, buf):
pkt = packet.Packet(buf)
return (pkt.get_protocol(ipv4.ipv4) is not None)
class RyuIPv6Filter(object):
"""Use ryu to parse the packet and test if it's IPv6."""
def __call__(self, buf):
@ -946,6 +980,40 @@ class AndingFilter(object):
return all(filter_(buf) for filter_ in self.filters)
class RyuMplsFilter(object):
def __init__(self, label=None):
self._label = label
def __call__(self, buf):
pkt = packet.Packet(buf)
pkt_mpls = pkt.get_protocol(mpls.mpls)
if pkt_mpls is None:
return False
if self._label is not None and pkt_mpls.label != self._label:
return False
return True
class RyuUdpFilter(object):
def __init__(self, dst_port=None):
self._dst_port = dst_port
def __call__(self, buf):
pkt = packet.Packet(buf)
pkt_udp = pkt.get_protocol(udp.udp)
if pkt_udp is None:
return False
if self._dst_port is not None and pkt_udp.dst_port != self._dst_port:
return False
return True
class Action(object):
"""Base class of actions to execute. Actions are executed on matched
packets in policy rules (PortPolicyRule).

View File

@ -671,3 +671,107 @@ class ChildPortSegmentationTestObj(object):
'tenant_id': self.trunk['tenant_id']})
if not keep_trunk:
self.neutron.delete_trunk(self.trunk['id'])
class PortPairTestObj(object):
def __init__(self, neutron, nb_api):
self.portpair_id = None
self.neutron = neutron
self.nb_api = nb_api
self.closed = False
def create(self, portpair):
new_portpair = self.neutron.create_port_pair({'port_pair': portpair})
self.portpair_id = new_portpair['port_pair']['id']
return self.portpair_id
def create_from_ports(self, ingress, egress, type_='mpls'):
self.create({
'ingress': ingress.port.port_id,
'egress': egress.port.port_id,
'service_function_parameters': {
'correlation': type_,
},
})
self.ingress = ingress
self.egress = egress
def close(self):
if self.closed or self.portpair_id is None:
return
self.neutron.delete_port_pair(self.portpair_id)
self.portpair_id = None
class PortPairGroupTestObj(object):
def __init__(self, neutron, nb_api):
self.portpairgroup_id = None
self.neutron = neutron
self.nb_api = nb_api
self.closed = False
def create(self, portpairgroup):
new_ppg = self.neutron.create_port_pair_group(
{'port_pair_group': portpairgroup})
self.portpairgroup_id = new_ppg['port_pair_group']['id']
return self.portpairgroup_id
def create_from_portpairs(self, pps):
self.create({
'port_pairs': [pp.portpair_id for pp in pps],
})
self.port_pairs = pps
def close(self):
if self.closed or self.portpairgroup_id is None:
return
self.neutron.delete_port_pair_group(self.portpairgroup_id)
self.portpairgroup_id = None
class PortChainTestObj(object):
def __init__(self, neutron, nb_api):
self.portchain_id = None
self.neutron = neutron
self.nb_api = nb_api
self.closed = False
def create(self, portchain):
new_portchain = self.neutron.create_port_chain(
{'port_chain': portchain})
self.portchain_id = new_portchain['port_chain']['id']
return self.portchain_id
def create_from_fcs_ppgs(self, fcs, ppgs):
self.create({
'flow_classifiers': [fc.flowclassifier_id for fc in fcs],
'port_pair_groups': [ppg.portpairgroup_id for ppg in ppgs],
})
self.flow_classifiers = fcs
self.port_pair_groups = ppgs
def close(self):
if self.closed or self.portchain_id is None:
return
self.neutron.delete_port_chain(self.portchain_id)
self.portchain_id = None
class FlowClassifierTestObj(object):
def __init__(self, neutron, nb_api):
self.flowclassifier_id = None
self.neutron = neutron
self.nb_api = nb_api
self.closed = False
def create(self, flowclassifier):
new_flowclassifier = self.neutron.create_flow_classifier(
{'flow_classifier': flowclassifier})
self.flowclassifier_id = new_flowclassifier['flow_classifier']['id']
return self.flowclassifier_id
def close(self):
if self.closed or self.flowclassifier_id is None:
return
self.neutron.delete_flow_classifier(self.flowclassifier_id)
self.flowclassifier_id = None

View File

@ -0,0 +1,631 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import itertools
import time
import mock
from neutron_lib import constants
from oslo_log import log
from ryu.lib.packet import ether_types
from ryu.lib.packet import ethernet
from ryu.lib.packet import in_proto as inet
from ryu.lib.packet import ipv4
from ryu.lib.packet import ipv6
from ryu.lib.packet import mpls
from ryu.lib.packet import packet
from ryu.lib.packet import tcp
from ryu.lib.packet import udp
import testscenarios
from dragonflow.db.models import l2
from dragonflow.db.models import sfc
from dragonflow.tests.common import app_testing_objects
from dragonflow.tests.fullstack import test_base
from dragonflow.tests.fullstack import test_objects as objects
LOG = log.getLogger(__name__)
IPV4_CIDR = '192.168.19.0/24'
IPV4_ALLOCATION_POOL = ('192.168.19.30', '192.168.19.254')
IPV6_CIDR = '2001:db9::1/64'
IPV6_ALLOCATION_POOL = ('2001:db9::30', '2001:db9::99')
SRC_IPV4_1 = '192.168.19.11'
SRC_IPV4_2 = '192.168.19.12'
DST_IPV4_1 = '192.168.19.21'
DST_IPV4_2 = '192.168.19.22'
SRC_IPV6_1 = '2001:db9::11'
SRC_IPV6_2 = '2001:db9::12'
DST_IPV6_1 = '2001:db9::21'
DST_IPV6_2 = '2001:db9::22'
SRC_PORT = 2222
DST_PORT = 4444
load_tests = testscenarios.load_tests_apply_scenarios
# We could use the default 30sec timeout here but testing showed that 5 seconds
# is was good enough, double that and the 20 secs still save almost 10 minutes
# in total.
_QUICK_RESOURCE_READY_TIMEOUT = 10
class SfcTestsCommonBase(test_base.DFTestBase):
def _create_sf_port(self):
port = self.subnet.create_port([self.security_group.secgroup_id])
port.update({'device_id': 'device1'})
return port
@classmethod
def _parse_sfc_packet(cls, buf):
# NOTE (dimak) Ruy's mpls class always parses its payload as ipv4.
# since we also test with ipv6 packets, we monkeypatch its parse
# method to try ipv6 as next protocol.
orig_parser = mpls.mpls.parser
for cls in (ipv4.ipv4, ipv6.ipv6):
def parser(*args, **kwargs):
res = orig_parser(*args, **kwargs)
return res[0], cls, res[2]
with mock.patch(
'ryu.lib.packet.mpls.mpls.parser', side_effect=parser,
):
pkt = packet.Packet(buf)
if not isinstance(pkt.protocols[-1], mpls.mpls):
return pkt
raise ValueError(buf)
@classmethod
def _sf_callback(cls, buf):
'''This is the code each SF runs, increments all chars in payload.
'''
pkt = cls._parse_sfc_packet(buf)
protocols = pkt.protocols[:-1]
payload = bytearray(pkt.protocols[-1])
new_payload = bytearray(c + 1 for c in payload)
new_pkt = packet.Packet()
for protocol in protocols:
if hasattr(protocol, 'csum'):
protocol.csum = 0
new_pkt.add_protocol(protocol)
new_pkt.add_protocol(new_payload)
new_pkt.serialize()
return new_pkt.data
def _create_pp(self):
ingress = self._create_sf_port()
egress = self._create_sf_port()
pp = self.store(objects.PortPairTestObj(self.neutron, self.nb_api))
pp.create_from_ports(
ingress=ingress,
egress=egress,
type_=self.corr,
)
return pp
def _create_ppg(self, width):
pps = [self._create_pp() for _ in range(width)]
ppg = self.store(
objects.PortPairGroupTestObj(self.neutron, self.nb_api))
ppg.create_from_portpairs(pps)
return ppg
def _create_pc(self, fc, layout):
ppgs = [self._create_ppg(w) for w in layout]
pc = self.store(
objects.PortChainTestObj(self.neutron, self.nb_api))
pc.create_from_fcs_ppgs([fc], ppgs)
return pc
def setUp(self):
super(SfcTestsCommonBase, self).setUp()
self.security_group = self.store(objects.SecGroupTestObj(
self.neutron,
self.nb_api))
security_group_id = self.security_group.create()
self.assertTrue(self.security_group.exists())
for direction, ethertype, protocol in itertools.product(
('ingress', 'egress'),
(constants.IPv4, constants.IPv6),
(constants.PROTO_NAME_TCP, constants.PROTO_NAME_UDP),
):
rule = {
'direction': direction,
'ethertype': ethertype,
'protocol': protocol,
'port_range_min': 1,
'port_range_max': 65535,
}
rule_id = self.security_group.rule_create(secrule=rule)
self.assertTrue(self.security_group.rule_exists(rule_id))
self.topology = self.store(
app_testing_objects.Topology(
self.neutron,
self.nb_api,
),
)
self.subnet = self.topology.create_subnet(
cidr=IPV4_CIDR,
enable_dhcp=True,
allocation_pool=IPV4_ALLOCATION_POOL,
)
self.subnet_ipv6 = self.topology.create_subnet(
cidr=IPV6_CIDR,
enable_dhcp=True,
allocation_pool=IPV6_ALLOCATION_POOL,
)
self.src_port = self.subnet.create_port([security_group_id])
self.dst_port = self.subnet.create_port([security_group_id])
self.src_port.update({
'name': 'src_port',
'admin_state_up': True,
'fixed_ips': [
{'ip_address': SRC_IPV4_1},
{'ip_address': SRC_IPV4_2},
{'ip_address': SRC_IPV6_1},
{'ip_address': SRC_IPV6_2},
],
})
self.dst_port.update({
'name': 'dst_port',
'admin_state_up': True,
'fixed_ips': [
{'ip_address': DST_IPV4_1},
{'ip_address': DST_IPV4_2},
{'ip_address': DST_IPV6_1},
{'ip_address': DST_IPV6_2},
],
})
self.src_lport = self.nb_api.get(
l2.LogicalPort(id=self.src_port.port.port_id),
)
self.dst_lport = self.nb_api.get(
l2.LogicalPort(id=self.dst_port.port.port_id),
)
def _create_port_policies(self, pc):
res = {}
if self.corr == sfc.CORR_MPLS:
sf_filter = app_testing_objects.RyuMplsFilter()
else:
sf_filter = app_testing_objects.RyuUdpFilter(DST_PORT)
for _, ppg in enumerate(pc.port_pair_groups):
for _, pp in enumerate(ppg.port_pairs):
key = (self.subnet.subnet_id, pp.ingress.port_id)
res[key] = app_testing_objects.PortPolicy(
rules=[
app_testing_objects.PortPolicyRule(
sf_filter,
actions=[
app_testing_objects.SendAction(
self.subnet.subnet_id,
pp.egress.port_id,
self._sf_callback,
),
],
),
],
default_action=app_testing_objects.IgnoreAction(),
)
return res
def _gen_ethernet(self, src=None, dst=None, ethertype=None):
return ethernet.ethernet(
src=(src or self.src_lport.mac),
dst=(dst or self.dst_lport.mac),
ethertype=(ethertype or ether_types.ETH_TYPE_IP),
)
def _gen_ipv4(self, proto, src=None, dst=None):
return ipv4.ipv4(
src=(src or SRC_IPV4_1),
dst=(dst or DST_IPV4_1),
proto=proto,
)
def _gen_ipv6(self, nxt, src=None, dst=None):
return ipv6.ipv6(
src=(src or SRC_IPV6_1),
dst=(dst or DST_IPV6_1),
nxt=nxt,
)
@classmethod
def _gen_udp(cls, src_port=SRC_PORT, dst_port=DST_PORT):
return udp.udp(
src_port=src_port,
dst_port=dst_port,
)
@classmethod
def _gen_tcp(cls, src_port=SRC_PORT, dst_port=DST_PORT, bits=tcp.TCP_SYN):
return tcp.tcp(
src_port=src_port,
dst_port=dst_port,
bits=bits,
)
@classmethod
def _get_bytes(cls, pkt):
pkt.serialize()
return pkt.data
def _make_scenario(name, **kwargs):
return (
name,
{
'pkt_ipver': kwargs.get('pkt_ipver', constants.IP_VERSION_4),
'pkt_proto': kwargs.get('pkt_proto', constants.PROTO_NAME_UDP),
'fc_lport_type': kwargs.get('fc_lport_type', 'src'),
'fc_ipver': kwargs.get('fc_ipver'),
'fc_ip_src': kwargs.get('fc_ip_src'),
'fc_ip_dst': kwargs.get('fc_ip_dst'),
'fc_proto': kwargs.get('fc_proto'),
'fc_src_tp_range': kwargs.get('fc_src_tp_range'),
'fc_dst_tp_range': kwargs.get('fc_dst_tp_range'),
'fc_matches': kwargs['fc_matches'],
}
)
class TestFcApp(SfcTestsCommonBase):
corr = 'mpls'
scenarios = [
_make_scenario(
'src_lport',
fc_lport_type='src',
fc_matches=True,
),
_make_scenario(
'dst_lport',
fc_lport_type='dst',
fc_matches=True,
),
_make_scenario(
'ipv4',
pkt_ipver=constants.IP_VERSION_4,
fc_ipver=constants.IP_VERSION_4,
fc_matches=True,
),
_make_scenario(
'ipv4_negative',
pkt_ipver=constants.IP_VERSION_4,
fc_ipver=constants.IP_VERSION_6,
fc_matches=False,
),
_make_scenario(
'ipv6',
pkt_ipver=constants.IP_VERSION_6,
fc_ipver=constants.IP_VERSION_6,
fc_matches=True,
),
_make_scenario(
'ipv6_negative',
pkt_ipver=constants.IP_VERSION_6,
fc_ipver=constants.IP_VERSION_4,
fc_matches=False
),
_make_scenario(
'ipv4_src_cidr',
fc_ipver=constants.IP_VERSION_4,
fc_ip_src=SRC_IPV4_1,
fc_matches=True,
),
_make_scenario(
'ipv4_src_cidr_negative',
fc_ipver=constants.IP_VERSION_4,
fc_ip_src=SRC_IPV4_2,
fc_matches=False,
),
_make_scenario(
'ipv4_dst_cidr',
fc_ipver=constants.IP_VERSION_4,
fc_ip_dst=DST_IPV4_1,
fc_matches=True,
),
_make_scenario(
'ipv4_dst_cidr_negative',
fc_ipver=constants.IP_VERSION_4,
fc_ip_dst=DST_IPV4_2,
fc_matches=False,
),
_make_scenario(
'ipv6_src_cidr',
pkt_ipver=constants.IP_VERSION_6,
fc_ipver=constants.IP_VERSION_6,
fc_ip_src=SRC_IPV6_1,
fc_matches=True,
),
_make_scenario(
'ipv6_src_cidr_negative',
pkt_ipver=constants.IP_VERSION_6,
fc_ipver=constants.IP_VERSION_6,
fc_ip_src=SRC_IPV6_2,
fc_matches=False,
),
_make_scenario(
'ipv6_dst_cidr',
pkt_ipver=constants.IP_VERSION_6,
fc_ipver=constants.IP_VERSION_6,
fc_ip_dst=DST_IPV6_1,
fc_matches=True,
),
_make_scenario(
'ipv6_dst_cidr_negative',
pkt_ipver=constants.IP_VERSION_6,
fc_ipver=constants.IP_VERSION_6,
fc_ip_dst=DST_IPV6_2,
fc_matches=False,
),
_make_scenario(
'proto_tcp',
pkt_proto=constants.PROTO_NAME_TCP,
fc_ipver=constants.IP_VERSION_4,
fc_proto=constants.PROTO_NAME_TCP,
fc_matches=True,
),
_make_scenario(
'proto_udp',
pkt_proto=constants.PROTO_NAME_UDP,
fc_ipver=constants.IP_VERSION_4,
fc_proto=constants.PROTO_NAME_UDP,
fc_matches=True,
),
_make_scenario(
'proto_negative',
pkt_proto=constants.PROTO_NAME_UDP,
fc_ipver=constants.IP_VERSION_4,
fc_proto=constants.PROTO_NAME_TCP,
fc_matches=False,
),
_make_scenario(
'src_ports',
pkt_proto=constants.PROTO_NAME_UDP,
fc_ipver=constants.IP_VERSION_4,
fc_proto=constants.PROTO_NAME_UDP,
fc_src_tp_range=[SRC_PORT - 1, SRC_PORT + 1],
fc_matches=True,
),
_make_scenario(
'src_ports_negative',
pkt_proto=constants.PROTO_NAME_UDP,
fc_ipver=constants.IP_VERSION_4,
fc_proto=constants.PROTO_NAME_UDP,
fc_src_tp_range=[SRC_PORT + 1, SRC_PORT + 2],
fc_matches=False,
),
_make_scenario(
'dst_ports',
pkt_proto=constants.PROTO_NAME_UDP,
fc_ipver=constants.IP_VERSION_4,
fc_proto=constants.PROTO_NAME_UDP,
fc_dst_tp_range=[DST_PORT - 1, DST_PORT + 1],
fc_matches=True,
),
_make_scenario(
'dst_ports_negative',
pkt_proto=constants.PROTO_NAME_UDP,
fc_ipver=constants.IP_VERSION_4,
fc_proto=constants.PROTO_NAME_UDP,
fc_dst_tp_range=[DST_PORT + 1, DST_PORT + 2],
fc_matches=False,
),
]
@property
def _fc_params(self):
IPVER_TO_MASK = {
constants.IP_VERSION_4: constants.IPv4_BITS,
constants.IP_VERSION_6: constants.IPv6_BITS,
}
params = {}
if self.fc_lport_type == 'src':
params['logical_source_port'] = self.src_lport.id
elif self.fc_lport_type == 'dst':
params['logical_destination_port'] = self.dst_lport.id
if self.fc_ipver == constants.IP_VERSION_4:
params['ethertype'] = constants.IPv4
elif self.fc_ipver == constants.IP_VERSION_6:
params['ethertype'] = constants.IPv6
if self.fc_ip_src is not None:
params['source_ip_prefix'] = '{addr}/{mask}'.format(
addr=self.fc_ip_src,
mask=IPVER_TO_MASK[self.fc_ipver],
)
if self.fc_ip_dst is not None:
params['destination_ip_prefix'] = '{addr}/{mask}'.format(
addr=self.fc_ip_dst,
mask=IPVER_TO_MASK[self.fc_ipver],
)
if self.fc_proto is not None:
params['protocol'] = self.fc_proto
if self.fc_src_tp_range is not None:
params['source_port_range_min'] = self.fc_src_tp_range[0]
params['source_port_range_max'] = self.fc_src_tp_range[1]
if self.fc_dst_tp_range is not None:
params['destination_port_range_min'] = self.fc_dst_tp_range[0]
params['destination_port_range_max'] = self.fc_dst_tp_range[1]
return params
@property
def _initial_packet(self):
payload = '0' * 64
if self.pkt_proto == constants.PROTO_NAME_TCP:
tp = self._gen_tcp()
proto = inet.IPPROTO_TCP
elif self.pkt_proto == constants.PROTO_NAME_UDP:
tp = self._gen_udp()
proto = inet.IPPROTO_UDP
if self.pkt_ipver == constants.IP_VERSION_4:
nw = self._gen_ipv4(proto)
ethertype = ether_types.ETH_TYPE_IP
else:
nw = self._gen_ipv6(proto)
ethertype = ether_types.ETH_TYPE_IPV6
return self._get_bytes(
self._gen_ethernet(ethertype=ethertype) / nw / tp / payload
)
@property
def _final_packet(self):
packet = self._initial_packet
if self.fc_matches:
packet = self._sf_callback(packet)
return packet
def test_fc(self):
fc = self.store(
objects.FlowClassifierTestObj(self.neutron, self.nb_api),
)
fc.create(self._fc_params)
pc = self._create_pc(fc, [1])
time.sleep(_QUICK_RESOURCE_READY_TIMEOUT)
dst_key = (self.subnet.subnet_id, self.dst_port.port_id)
port_policies = {
dst_key: app_testing_objects.PortPolicy(
rules=[
app_testing_objects.PortPolicyRule(
app_testing_objects.ExactMatchFilter(
self._final_packet,
),
actions=[app_testing_objects.StopSimulationAction()],
),
],
default_action=app_testing_objects.IgnoreAction(),
),
}
port_policies.update(self._create_port_policies(pc))
policy = self.store(
app_testing_objects.Policy(
initial_actions=[
app_testing_objects.SendAction(
self.subnet.subnet_id,
self.src_port.port_id,
self._initial_packet,
),
],
port_policies=port_policies,
unknown_port_action=app_testing_objects.LogAction()
),
)
policy.start(self.topology)
policy.wait(10)
if policy.exceptions:
raise policy.exceptions[0]
class TestSfcApp(SfcTestsCommonBase):
scenarios = testscenarios.scenarios.multiply_scenarios(
[
('corr-none', {'corr': None}),
('corr-mpls', {'corr': sfc.CORR_MPLS}),
],
[
('single-ppg', {'layout': (1,)}),
('single-wide-ppg', {'layout': (3,)}),
('three-ppgs', {'layout': (1, 1, 1)}),
('mixed-ppgs', {'layout': (2, 1, 3)}),
],
)
def test_sfc(self):
initial_packet = self._get_bytes(
self._gen_ethernet() /
self._gen_ipv4(proto=inet.IPPROTO_UDP) /
self._gen_udp(src_port=SRC_PORT, dst_port=DST_PORT) /
('0' * 64)
)
final_packet = self._get_bytes(
self._gen_ethernet() /
self._gen_ipv4(proto=inet.IPPROTO_UDP) /
self._gen_udp(src_port=SRC_PORT, dst_port=DST_PORT) /
('{len}'.format(len=len(self.layout)) * 64)
)
fc = self.store(
objects.FlowClassifierTestObj(self.neutron, self.nb_api),
)
fc.create(
{
'logical_source_port': self.src_port.port.port_id
},
)
pc = self._create_pc(fc, self.layout)
time.sleep(_QUICK_RESOURCE_READY_TIMEOUT)
dst_key = (self.subnet.subnet_id, self.dst_port.port_id)
port_policies = {
dst_key: app_testing_objects.PortPolicy(
rules=[
app_testing_objects.PortPolicyRule(
app_testing_objects.ExactMatchFilter(final_packet),
actions=[app_testing_objects.StopSimulationAction()],
),
],
default_action=app_testing_objects.IgnoreAction(),
),
}
port_policies.update(self._create_port_policies(pc))
policy = self.store(
app_testing_objects.Policy(
initial_actions=[
app_testing_objects.SendAction(
self.subnet.subnet_id,
self.src_port.port_id,
initial_packet,
),
],
port_policies=port_policies,
unknown_port_action=app_testing_objects.LogAction()
),
)
policy.start(self.topology)
policy.wait(10)
if policy.exceptions:
raise policy.exceptions[0]

View File

@ -0,0 +1,380 @@
# Copyright (c) 2016 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from dragonflow.db.models import l2
from dragonflow.db.models import sfc
from dragonflow.tests.common import utils
from dragonflow.tests.unit import test_app_base
lswitch1 = l2.LogicalSwitch(
id='lswitch1',
topic='topic1',
version=10,
unique_key=22,
)
fc1lport = l2.LogicalPort(
id='lport1',
topic='topic1',
version=10,
unique_key=22,
lswitch='lswitch1',
binding=test_app_base.local_binding,
)
fc2lport = l2.LogicalPort(
id='lport2',
topic='topic1',
version=10,
unique_key=23,
lswitch='lswitch1',
binding=test_app_base.local_binding,
)
fc1 = sfc.FlowClassifier(
id='fc1',
topic='topic1',
unique_key=22,
source_port='lport1',
)
fc2 = sfc.FlowClassifier(
id='fc2',
topic='topic1',
unique_key=23,
dest_port='lport2',
)
pp11ingress = l2.LogicalPort(
id='pp11ingress',
topic='topic1',
version=10,
unique_key=23,
lswitch='lswitch1',
binding=test_app_base.local_binding,
)
pp11egress = l2.LogicalPort(
id='pp11egress',
topic='topic1',
version=10,
unique_key=24,
lswitch='lswitch1',
binding=test_app_base.local_binding,
)
pp12ingress = l2.LogicalPort(
id='pp12ingress',
topic='topic1',
version=10,
unique_key=24,
lswitch='lswitch1',
binding=test_app_base.local_binding,
)
pp12egress = l2.LogicalPort(
id='pp12egress',
topic='topic1',
version=10,
unique_key=24,
lswitch='lswitch1',
binding=test_app_base.local_binding,
)
pp11 = sfc.PortPair(
id='pp11',
topic='topic1',
ingress_port='pp11ingress',
egress_port='pp11egress',
)
pp21 = sfc.PortPair(
id='pp21',
topic='topic1',
ingress_port='pp11ingress',
egress_port='pp11egress',
)
pp12 = sfc.PortPair(
id='pp12',
topic='topic1',
ingress_port='pp12ingress',
egress_port='pp12egress',
)
ppg1 = sfc.PortPairGroup(
id='ppg1',
topic='topic1',
port_pairs=['pp11', 'pp12'],
)
ppg2 = sfc.PortPairGroup(
id='ppg2',
topic='topic1',
port_pairs=['pp21'],
)
pc1 = sfc.PortChain(
id='pc1',
topic='topic1',
flow_classifiers=['fc1'],
port_pair_groups=['ppg1'],
)
pc1_fc_add = sfc.PortChain(
id='pc1',
topic='topic1',
flow_classifiers=['fc1', 'fc2'],
port_pair_groups=['ppg1'],
)
pc1_fc_remove = sfc.PortChain(
id='pc1',
topic='topic1',
flow_classifiers=[],
port_pair_groups=['ppg1'],
)
pc1_fc_change = sfc.PortChain(
id='pc1',
topic='topic1',
flow_classifiers=['fc2'],
port_pair_groups=['ppg1'],
)
pc1_ppg_add = sfc.PortChain(
id='pc1',
topic='topic1',
flow_classifiers=['fc1'],
port_pair_groups=['ppg1', 'ppg2'],
)
pc1_ppg_remove = sfc.PortChain(
id='pc1',
topic='topic1',
flow_classifiers=['fc1'],
port_pair_groups=[],
)
pc1_ppg_change = sfc.PortChain(
id='pc1',
topic='topic1',
flow_classifiers=['fc1'],
port_pair_groups=['ppg2'],
)
l2_objs = (lswitch1, fc1lport, fc2lport, pp11ingress, pp11egress, pp12ingress,
pp12egress)
class TestSfcApp(test_app_base.DFAppTestBase):
apps_list = ['sfc']
def setUp(self):
super(TestSfcApp, self).setUp()
self.app = self.open_flow_app.dispatcher.apps[0]
self.driver = mock.Mock()
def get_driver(pc):
return self.driver
self.app._get_port_chain_driver = get_driver
@utils.with_local_objects(fc1, pp11, pp12, ppg1, pc1, *l2_objs)
def test_port_chain_added(self):
pc1.emit_created()
self.driver.install_flow_classifier_flows.assert_called_once_with(
pc1, pc1.flow_classifiers[0])
self.driver.install_port_pair_group_flows(
pc1, pc1.port_pair_groups[0])
self.driver.install_port_pair_egress_flows.assert_has_calls(
[
mock.call(
pc1,
pc1.port_pair_groups[0],
pc1.port_pair_groups[0].port_pairs[0],
),
mock.call(
pc1,
pc1.port_pair_groups[0],
pc1.port_pair_groups[0].port_pairs[1],
),
]
)
self.assertEqual(
2, self.driver.install_port_pair_group_flows.call_count)
@utils.with_local_objects(fc1, pp11, pp12, ppg1, pc1, *l2_objs)
def test_port_chain_deleted(self):
pc1.emit_deleted()
self.driver.uninstall_flow_classifier_flows.assert_called_once_with(
pc1, pc1.flow_classifiers[0])
self.driver.uninstall_port_pair_group_flows(
pc1, pc1.port_pair_groups[0])
self.driver.uninstall_port_pair_egress_flows.assert_has_calls(
[
mock.call(
pc1,
pc1.port_pair_groups[0],
pc1.port_pair_groups[0].port_pairs[0],
),
mock.call(
pc1,
pc1.port_pair_groups[0],
pc1.port_pair_groups[0].port_pairs[1],
),
]
)
self.assertEqual(
2, self.driver.uninstall_port_pair_group_flows.call_count)
@utils.with_local_objects(fc1, fc2, pp11, pp12, ppg1, pc1, *l2_objs)
def test_port_chain_updated_add_fc(self):
pc1_fc_add.emit_updated(pc1)
self.driver.install_flow_classifier_flows.assert_called_once_with(
pc1_fc_add, pc1_fc_add.flow_classifiers[1])
self.driver.uninstall_flow_classifier_flows.assert_not_called()
@utils.with_local_objects(fc1, fc2, pp11, pp12, ppg1, pc1, *l2_objs)
def test_port_chain_updated_remove_fc(self):
pc1_fc_remove.emit_updated(pc1)
self.driver.install_flow_classifier_flows.assert_not_called()
self.driver.uninstall_flow_classifier_flows.assert_called_once_with(
pc1, pc1.flow_classifiers[0])
@utils.with_local_objects(fc1, fc2, pp11, pp12, ppg1, pc1, *l2_objs)
def test_port_chain_updated_replace_fc(self):
pc1_fc_change.emit_updated(pc1)
self.driver.uninstall_flow_classifier_flows.assert_called_once_with(
pc1, pc1.flow_classifiers[0])
self.driver.install_flow_classifier_flows.assert_called_once_with(
pc1_fc_change, pc1_fc_change.flow_classifiers[0])
@utils.with_local_objects(fc1, pp21, ppg2, pp11, pp12, ppg1, pc1, *l2_objs)
def test_port_chain_updated_add_ppg(self):
pc1_ppg_add.emit_updated(pc1)
self.driver.install_port_pair_group_flows.assert_called_once_with(
pc1_ppg_add, pc1_ppg_add.port_pair_groups[1])
self.driver.uninstall_port_pair_group_flows.assert_not_called()
self.driver.install_port_pair_egress_flows.assert_called_once_with(
pc1_ppg_add,
pc1_ppg_add.port_pair_groups[1],
pc1_ppg_add.port_pair_groups[1].port_pairs[0],
)
self.driver.uninstall_port_pair_egress_flows.assert_not_called()
@utils.with_local_objects(fc1, pp21, ppg2, pp11, pp12, ppg1, pc1, *l2_objs)
def test_port_chain_updated_remove_ppg(self):
pc1_ppg_remove.emit_updated(pc1)
self.driver.install_port_pair_group_flows.assert_not_called()
self.driver.uninstall_port_pair_group_flows.assert_called_once_with(
pc1, pc1.port_pair_groups[0])
self.driver.install_port_pair_egress_flows.assert_not_called()
self.driver.uninstall_port_pair_egress_flows.assert_has_calls(
[
mock.call(
pc1,
pc1.port_pair_groups[0],
pc1.port_pair_groups[0].port_pairs[0],
),
mock.call(
pc1,
pc1.port_pair_groups[0],
pc1.port_pair_groups[0].port_pairs[1],
),
],
)
self.assertEqual(
2, self.driver.uninstall_port_pair_egress_flows.call_count)
@utils.with_local_objects(fc1, pp21, ppg2, pp11, pp12, ppg1, pc1, *l2_objs)
def test_port_chain_updated_replace_ppg(self):
pc1_ppg_change.emit_updated(pc1)
self.driver.install_port_pair_group_flows.assert_called_once_with(
pc1_ppg_change, pc1_ppg_change.port_pair_groups[0])
self.driver.uninstall_port_pair_group_flows.assert_called_once_with(
pc1, pc1.port_pair_groups[0])
self.driver.install_port_pair_egress_flows.assert_called_once_with(
pc1_ppg_change,
pc1_ppg_change.port_pair_groups[0],
pc1_ppg_change.port_pair_groups[0].port_pairs[0],
)
self.driver.uninstall_port_pair_egress_flows.assert_has_calls(
[
mock.call(
pc1,
pc1.port_pair_groups[0],
pc1.port_pair_groups[0].port_pairs[0],
),
mock.call(
pc1,
pc1.port_pair_groups[0],
pc1.port_pair_groups[0].port_pairs[1],
),
],
)
self.assertEqual(
2, self.driver.uninstall_port_pair_egress_flows.call_count)
@utils.with_local_objects(fc1, fc2, pp11, pp12, ppg1, pc1, *l2_objs)
def test_port_pair_ingress_port_added(self):
pp11ingress.emit_local_created()
self.driver.uninstall_port_pair_group_flows.assert_called_once_with(
pc1, ppg1)
self.driver.install_port_pair_group_flows.assert_called_once_with(
pc1, ppg1)
@utils.with_local_objects(fc1, fc2, pp11, pp12, ppg1, pc1, *l2_objs)
def test_port_pair_ingress_port_deleted(self):
pp11ingress.emit_local_deleted()
self.driver.uninstall_port_pair_group_flows.assert_called_once_with(
pc1, ppg1)
self.driver.install_port_pair_group_flows.assert_called_once_with(
pc1, ppg1)
@utils.with_local_objects(fc1, fc2, pp11, pp12, ppg1, pc1, *l2_objs)
def test_port_pair_egress_port_added(self):
pp11egress.emit_local_created()
self.driver.install_port_pair_egress_flows.assert_called_once_with(
pc1, ppg1, pp11)
self.driver.uninstall_port_pair_egress_flows.assert_not_called()
@utils.with_local_objects(fc1, fc2, pp11, pp12, ppg1, pc1, *l2_objs)
def test_port_pair_egress_port_deleted(self):
pp11egress.emit_local_deleted()
self.driver.install_port_pair_egress_flows.assert_not_called()
self.driver.uninstall_port_pair_egress_flows.assert_called_once_with(
pc1, ppg1, pp11)
@utils.with_local_objects(fc1, fc2, pp11, pp12, ppg1, pc1, *l2_objs)
def test_flow_classifier_port_added(self):
fc1lport.emit_local_created()
self.driver.install_flow_classifier_local_port_flows\
.assert_called_once_with(pc1, fc1)
@utils.with_local_objects(fc1, fc2, pp11, pp12, ppg1, pc1, *l2_objs)
def test_flow_classifier_port_deleted(self):
fc1lport.emit_local_deleted()
self.driver.uninstall_flow_classifier_local_port_flows\
.assert_called_once_with(pc1, fc1)

View File

@ -96,6 +96,7 @@ dragonflow.controller.apps =
portqos = dragonflow.controller.apps.portqos:PortQosApp
portsec = dragonflow.controller.apps.portsec:PortSecApp
provider = dragonflow.controller.apps.provider:ProviderApp
sfc = dragonflow.controller.apps.sfc:SfcApp
sg = dragonflow.controller.apps.sg:SGApp
trunk = dragonflow.controller.apps.trunk:TrunkApp
tunneling = dragonflow.controller.apps.tunneling:TunnelingApp