@ -14,6 +14,7 @@
# under the License.
import etcd
from ipaddress import ip_network
from networking_vpp . constants import LEADIN
from networking_vpp import etcdutils
from networking_vpp . extension import VPPAgentExtensionBase
@ -22,6 +23,8 @@ from oslo_serialization import jsonutils
class TaasServiceAgentWatcher ( etcdutils . EtcdChangeWatcher ) :
""" Watch for changes in TaaS flow destinations. """
path = ' taas_service '
def __init__ ( self , host , etcd_client_factory , vppf ) :
@ -39,8 +42,17 @@ class TaasServiceAgentWatcher(etcdutils.EtcdChangeWatcher):
self . _node_key_space )
def added ( self , key , value ) :
""" New TAP service created
The TAP service nominates a port , which should be subverted for
tapping purposes .
"""
tap_service_id = key
# Structure is created in taas_vpp.py in mech driver
data = jsonutils . loads ( value )
# TODO(ijw): this does not respond to changes in the port after this
# moment
port_path = ( LEADIN + ' /nodes/ ' + self . _host + ' /ports/ ' +
str ( data [ ' tap_service ' ] [ ' port_id ' ] ) )
try :
@ -48,8 +60,16 @@ class TaasServiceAgentWatcher(etcdutils.EtcdChangeWatcher):
physnet = port_info [ ' physnet ' ]
network_type = port_info [ ' network_type ' ]
# Need to put the tapped packets in a dedicated VLAN
network_type = ' vlan '
# Tapped packets for this destination will be put on their own
# overlay
# NB we use VXLAN flows if the port was attached to a VXLAN
# network originally. The TaaS service object can't express
# carrying networks properly, and if we're using VXLAN networks
# there may be no L2 transport between hosts.
# For unknown types we fall back to VLAN - for VXLAN we require
# the GPE config.
if network_type != ' vxlan ' :
network_type = ' vlan '
port_path = ( LEADIN + ' /state/ ' + self . _host + ' /ports/ ' +
str ( data [ ' tap_service ' ] [ ' port_id ' ] ) )
@ -57,14 +77,40 @@ class TaasServiceAgentWatcher(etcdutils.EtcdChangeWatcher):
port_sw_if_idx = port_info [ ' iface_idx ' ]
old_bridge_domain_id = port_info [ ' net_data ' ] [ ' bridge_domain_id ' ]
bridge_data = self . vppf . ensure_network_on_host ( physnet ,
network_type ,
data [ ' taas_id ' ] )
if network_type == ' vxlan ' :
# TODO(ijw): magic number - will go away when we
# get BD tags
bd_idx = data [ ' taas_id ' ] + 64000
self . vppf . ensure_bridge_domain_in_vpp ( bd_idx )
bridge_data = {
' physnet ' : physnet ,
' if_physnet ' : - 1 ,
' bridge_domain_id ' : bd_idx ,
' network_type ' : network_type ,
' segmentation_id ' : data [ ' taas_id ' ] ,
}
else :
bridge_data = self . vppf . ensure_network_on_host ( physnet ,
network_type ,
data [ ' taas_id ' ] )
# Since we want all packets regardless of MAC to go to the other
# end, we want the bridge to flood
self . vppf . vpp . bridge_enable_flooding (
bridge_data [ ' bridge_domain_id ' ] )
# Tapped interfaces feed this bridge; this interface will
# receive the results.
# TODO(ijw): this does not prevent the receiving interface
# from transmitting traffic.
self . vppf . vpp . add_to_bridge ( bridge_data [ ' bridge_domain_id ' ] ,
port_sw_if_idx )
# TODO(ijw) we should not be using etcd for state storage -
# we might not get to store this if we're reset
# Instead we should be tagging the interface to mark it
# for finding on resync
props = { " ts " : data ,
" service_bridge " : bridge_data ,
" port " : { " iface_idx " : port_sw_if_idx ,
@ -77,22 +123,38 @@ class TaasServiceAgentWatcher(etcdutils.EtcdChangeWatcher):
pass
def removed ( self , key ) :
# Removing key == desire to unbind
""" A TAP Service has been deleted. """
try :
# rebind iface to appropriate bridge
# TODO(ijw): this should discover info from VPP tags, not use
# etcd as a state store.
taas_path = self . _state_key_space + ' / ' + key
tap_service_info = jsonutils . loads (
self . etcd_client . read ( taas_path ) . value )
physnet = tap_service_info [ ' service_bridge ' ] [ ' physnet ' ]
net_type = tap_service_info [ ' service_bridge ' ] [ ' network_type ' ]
seg_id = tap_service_info [ ' service_bridge ' ] [ ' segmentation_id ' ]
self . vppf . delete_network_on_host ( physnet , net_type , seg_id )
# put back the port to the old bridge
# TODO(ijw): this should just bind the port conventionally,
# using the port info, as this info can get outdated.
self . vppf . vpp . add_to_bridge (
tap_service_info [ ' port ' ] [ ' bridge_domain_id ' ] ,
tap_service_info [ ' port ' ] [ ' iface_idx ' ] )
# In the nominal case, the tap flows are deleted before the tap
# service.
# TODO(jb): The code needs to manage properly the case of deleting
# tap service with remaining tap flows.
# TODO(ijw): delete any mirroring on the host for this
# service.
physnet = tap_service_info [ ' service_bridge ' ] [ ' physnet ' ]
net_type = tap_service_info [ ' service_bridge ' ] [ ' network_type ' ]
seg_id = tap_service_info [ ' service_bridge ' ] [ ' segmentation_id ' ]
if net_type == ' vxlan ' :
bd_idx = tap_service_info [ ' service_bridge ' ] [ ' bridge_domain_id ' ]
self . vppf . vpp . delete_bridge_domain ( bd_idx )
else :
self . vppf . delete_network_on_host ( physnet , net_type , seg_id )
self . etcd_client . delete ( taas_path )
except etcd . EtcdKeyNotFound :
# Gone is fine, if we didn't delete it
@ -101,6 +163,10 @@ class TaasServiceAgentWatcher(etcdutils.EtcdChangeWatcher):
class TaasFlowAgentWatcher ( etcdutils . EtcdChangeWatcher ) :
""" Monitor state changes on flows
Flows are packet sources being mirrored .
"""
path = ' taas_flow '
def __init__ ( self , host , etcd_client_factory , vppf ) :
@ -118,83 +184,198 @@ class TaasFlowAgentWatcher(etcdutils.EtcdChangeWatcher):
self . path ,
self . _node_key_space )
def _find_port_idx ( self , port_id , host ) :
port_path = ( LEADIN + ' /state/ ' + host + ' /ports/ ' +
str ( port_id ) )
try :
# TODO(ijw): shouldn't be reading the port info, or relying on
# etcd's picture of the world. This is in an internal
# datastructure already.
port_info = jsonutils . loads ( self . etcd_client . read ( port_path ) . value )
port_idx = port_info [ ' iface_idx ' ]
except etcd . EtcdKeyNotFound :
port_idx = - 1
return port_idx
def _create_vxlan_tunnel ( self , dst_adr , vni ) :
""" Create a tunnel to a remote destination VTEP. """
dst_adr = self . vppf . _pack_address ( dst_adr )
# TODO(ijw) this reads all VXLAN tunnels from VPP every time
# and as such is not amazingly efficient.
vxtuns = self . vppf . vpp . get_vxlan_tunnels ( )
tidx = vxtuns . get ( ( vni , dst_adr , ) )
if tidx is not None :
return tidx
self . vppf . ensure_gpe_link ( )
src_adr = self . vppf . gpe_underlay_addr
if ip_network ( unicode ( src_adr ) ) . version == 6 :
is_ipv6 = 1
else :
is_ipv6 = 0
idx = self . vppf . vpp . create_vxlan_tunnel (
self . vppf . _pack_address ( src_adr ) ,
self . vppf . _pack_address ( dst_adr ) ,
is_ipv6 ,
vni )
return idx
def _delete_vxlan_tunnel ( self , dst_adr , vni ) :
""" Remove a VXLAN tunnel from VPP. """
self . vppf . ensure_gpe_link ( )
src_adr = self . vppf . gpe_underlay_addr
if ip_network ( unicode ( src_adr ) ) . version == 6 :
is_ipv6 = 1
else :
is_ipv6 = 0
self . vppf . vpp . delete_vxlan_tunnel (
self . vppf . _pack_address ( src_adr ) ,
self . vppf . _pack_address ( dst_adr ) ,
is_ipv6 ,
vni )
def _get_remote_addr ( self , port_mac ) :
self . vppf . load_gpe_mappings ( )
remote_ip = ' '
for mac_vni_tpl in self . vppf . gpe_map [ ' remote_map ' ] . keys ( ) :
mac , vni = mac_vni_tpl
if mac == port_mac :
remote_ip = self . vppf . gpe_map [ ' remote_map ' ] [ ( mac , vni ) ]
return remote_ip
def _get_num_flow ( self , tf_host , taas_id ) :
""" Get the number of open tap flows associated with a given taas_id """
num = 0
watch_space = LEADIN + ' /nodes/ %s /taas_flow ' % ( tf_host )
rv = self . etcd_client . read ( watch_space ,
recursive = True )
for f in rv . children :
if f . value is not None :
data = jsonutils . loads ( f . value )
if data [ ' taas_id ' ] == taas_id :
num = num + 1
return num
def added ( self , key , value ) :
# Create or update == bind
""" New TAP flow created. """
flow_id = key
data = jsonutils . loads ( value )
# data = value
taas_id = data [ ' taas_id ' ]
direction = data [ ' tap_flow ' ] [ ' direction ' ]
port_path = ( LEADIN + ' /nodes/ ' + self . _host + ' /ports/ ' +
str ( data [ ' tap_flow ' ] [ ' source_port ' ] ) )
ts_host = data [ ' ts_host ' ]
tf_host = data [ ' tf_host ' ]
# Check Span direction
DIR_RX = 1
DIR_TX = 2
DIR_TX_RX = 3
if direction == ' IN ' :
direction = DIR_RX
elif direction == ' OUT ' :
direction = DIR_TX
else :
direction = DIR_TX_RX
tap_srv_id = data [ ' tap_flow ' ] [ ' tap_service_id ' ]
ts_path = ( LEADIN + ' /state_taas/ ' + ts_host +
' /taas_service/ ' + tap_srv_id )
try :
port_info = jsonutils . loads ( self . etcd_client . read ( port_path ) . value )
physnet = port_info [ ' physnet ' ]
network_type = port_info [ ' network_type ' ]
# Need to put the tapped packets in a dedicated VLAN
network_type = ' vlan '
port_path = ( LEADIN + ' /state/ ' + self . _host + ' /ports/ ' +
str ( data [ ' tap_flow ' ] [ ' source_port ' ] ) )
port_info = jsonutils . loads ( self . etcd_client . read ( port_path ) . value )
source_port_idx = port_info [ ' iface_idx ' ]
# get/create a numbered bridge domain for the service
service_bridge = self . vppf . ensure_network_on_host (
physnet , network_type , taas_id )
service_bridge_id = service_bridge [ ' bridge_domain_id ' ]
self . vppf . vpp . bridge_enable_flooding ( service_bridge_id )
# Check Span direction
if direction == ' IN ' :
direction = 1
elif direction == ' OUT ' :
direction = 2
else :
direction = 3
# Check if the tap flow is located in the same node
# as the tap service
tap_srv_id = data [ ' tap_flow ' ] [ ' tap_service_id ' ]
port_path = ( LEADIN + ' /state_taas/ ' + self . _host +
' /taas_service/ ' + tap_srv_id )
try :
srv_port_info = jsonutils . loads (
self . etcd_client . read ( port_path ) . value )
srv_port_idx = srv_port_info [ ' port ' ] [ ' iface_idx ' ]
ts_info = jsonutils . loads ( self . etcd_client . read ( ts_path ) . value )
network_type = ts_info [ ' service_bridge ' ] [ ' network_type ' ]
physnet = ts_info [ ' service_bridge ' ] [ ' physnet ' ]
if network_type != ' vxlan ' :
network_type = ' vlan '
# Check if tap flow and tap service are located in the same node
# Local Span
if ts_host == tf_host :
source_port_idx = self . _find_port_idx (
data [ ' tap_flow ' ] [ ' source_port ' ] , tf_host )
srv_port_idx = ts_info [ ' port ' ] [ ' iface_idx ' ]
# Local Span
dst_idx = srv_port_idx
remote_span = False
self . vppf . vpp . enable_port_mirroring ( source_port_idx ,
srv_port_idx ,
direction )
except etcd . EtcdKeyNotFound :
# Check if there is a pending request to create tap service
# in the same node (e.g. out of order etcd messages).
port_path = ( LEADIN + ' /node/ ' + self . _host +
' /taas_service/ ' + tap_srv_id )
try :
self . etcd_client . read ( port_path ) . value
return
except etcd . EtcdKeyNotFound :
# Set the tap_flow state in etcd
data = { " tf " : data ,
" port_idx " : source_port_idx ,
' dst_idx ' : dst_idx ,
' span_mode ' : 0 , # Local
' tfn ' : True
}
# Remote Span via vlan
elif network_type == ' vlan ' :
if self . _host == tf_host :
source_port_idx = self . _find_port_idx (
data [ ' tap_flow ' ] [ ' source_port ' ] , tf_host )
# get/create a numbered bridge domain for the service
service_bridge = self . vppf . ensure_network_on_host (
physnet , network_type , taas_id )
service_bridge_id = service_bridge [ ' bridge_domain_id ' ]
self . vppf . vpp . bridge_enable_flooding ( service_bridge_id )
# Remote Span
srv_uplink_idx = service_bridge [ ' if_uplink_idx ' ]
dst_idx = srv_uplink_idx
remote_span = True
self . vppf . vpp . enable_port_mirroring ( source_port_idx ,
srv_uplink_idx ,
direction )
# Set the tap_flow state in etcd
data = { " tf " : data ,
" service_bridge " : service_bridge ,
" port_idx " : source_port_idx ,
' dst_idx ' : dst_idx ,
' remote_span ' : remote_span
}
# Set the tap_flow state in etcd
data = { " tf " : data ,
" service_bridge " : service_bridge ,
" port_idx " : source_port_idx ,
' dst_idx ' : dst_idx ,
' span_mode ' : 1 , # vlan
' tfn ' : True
}
else :
# Set the tap_flow state in etcd
data = { " tf " : data ,
' span_mode ' : 1 , # vlan
' tfn ' : False
}
# Remote Span via vxlan
else :
if self . _host == tf_host :
dst_adr = self . _get_remote_addr ( data [ ' ts_port_mac ' ] )
else :
dst_adr = self . _get_remote_addr ( data [ ' port_mac ' ] )
source_port_idx = self . _find_port_idx (
data [ ' tap_flow ' ] [ ' source_port ' ] , tf_host )
vni = taas_id
tun_idx = self . _create_vxlan_tunnel ( dst_adr , vni )
# source_port_idx = -1
if self . _host == tf_host :
tfn = True
self . vppf . vpp . cross_connect ( tun_idx , 0 )
self . vppf . vpp . enable_port_mirroring ( source_port_idx ,
tun_idx ,
direction )
else :
tfn = False
bd_idx = taas_id + 64000
self . vppf . vpp . add_to_bridge ( bd_idx ,
tun_idx )
self . vppf . vpp . ifup ( tun_idx )
# Set the tap_flow state in etcd
data = { " tf " : data ,
" dst_idx " : tun_idx ,
" port_idx " : source_port_idx ,
' span_mode ' : 2 , # vxlan
' tfn ' : tfn ,
' dst_adr ' : dst_adr ,
' vni ' : vni
}
self . etcd_client . write ( self . _state_key_space +
' / %s ' % flow_id ,
jsonutils . dumps ( data ) )
@ -209,14 +390,16 @@ class TaasFlowAgentWatcher(etcdutils.EtcdChangeWatcher):
tap_flow_info = jsonutils . loads (
self . etcd_client . read ( taas_path ) . value )
dst_idx = tap_flow_info [ ' dst_idx ' ]
remote_span = tap_flow_info [ ' remote_span ' ]
self . vppf . vpp . disable_port_mirroring ( tap_flow_info [ ' port_idx ' ] ,
dst_idx )
span_mode = tap_flow_info [ ' span_mode ' ]
tfn = tap_flow_info [ ' tfn ' ]
if tfn :
dst_idx = tap_flow_info [ ' dst_idx ' ]
source_port_idx = tap_flow_info [ ' port_idx ' ]
self . vppf . vpp . disable_port_mirroring ( source_port_idx ,
dst_idx )
if remote_span :
if span_mode == 1 and tf n:
service_bridge = tap_flow_info [ ' service_bridge ' ]
# service_bridge_id = service_bridge['bridge_domain_id']
physnet = service_bridge [ ' physnet ' ]
net_type = service_bridge [ ' network_type ' ]
@ -230,6 +413,18 @@ class TaasFlowAgentWatcher(etcdutils.EtcdChangeWatcher):
if cnt == 0 :
self . vppf . delete_network_on_host ( physnet , net_type , seg_id )
elif span_mode == 2 : # vxlan
taas_id = tap_flow_info [ ' tf ' ] [ ' taas_id ' ]
tf_host = tap_flow_info [ ' tf ' ] [ ' tf_host ' ]
tf_nb = self . _get_num_flow ( tf_host , taas_id )
if tf_nb == 0 :
if tfn is False :
self . vppf . vpp . delete_from_bridge (
tap_flow_info [ ' dst_idx ' ] )
vni = tap_flow_info [ ' vni ' ]
dst_adr = tap_flow_info [ ' dst_adr ' ]
self . _delete_vxlan_tunnel ( dst_adr , vni )
self . etcd_client . delete ( self . _state_key_space +
' / %s ' % flow_id )
except etcd . EtcdKeyNotFound :
@ -256,31 +451,44 @@ class TaasPortAgentWatcher(etcdutils.EtcdChangeWatcher):
self . _port_key_space )
def _trigger_tap_service ( self , key , value ) :
""" Manage the creation of the tap services when agent is restarted """
self . _tsw . added ( key , value )
def _trigger_tap_flow ( self , key , value ) :
""" Manage the creation of the tap flows when agent is restarted """
self . _tfw . added ( key , value )
def _check_etcd_taas ( self , port_id ) :
""" Check pending taas creation
Wen the agent is restarted , the requests of port creation and
tap creation will occur in a random order . This function is used
to check if there is a pending tap service or flow creation for the
port that has just been created . It is required to properly manage
the case when the port creation request is received after the tap
creation request .
"""
watch_space = LEADIN + ' /nodes/ %s /taas_service ' % ( self . _host )
rv = self . etcd_client . read ( watch_space ,
recursive = True )
tap_srv_lst = list ( )
for f in rv . children :
data = jsonutils . loads ( f . value )
if data [ ' tap_service ' ] [ ' port_id ' ] == port_id :
tap_srv_lst . append ( f . key )
self . _trigger_tap_service ( f . key , f . value )
if f . value is not None :
data = jsonutils . loads ( f . value )
if data [ ' tap_service ' ] [ ' port_id ' ] == port_id :
tap_srv_lst . append ( f . key )
self . _trigger_tap_service ( f . key , f . value )
watch_space = LEADIN + ' /nodes/ %s /taas_flow ' % ( self . _host )
rv = self . etcd_client . read ( watch_space ,
recursive = True )
for f in rv . children :
data = jsonutils . loads ( f . value )
if data [ ' tap_flow ' ] [ ' source_port ' ] == port_id :
self . _trigger_tap_flow ( f . key , f . value )
elif data [ ' tap_flow ' ] [ ' tap_service_id ' ] in tap_srv_lst :
self . _trigger_tap_flow ( f . key , f . value )
if f . value is not None :
data = jsonutils . loads ( f . value )
if data [ ' tap_flow ' ] [ ' source_port ' ] == port_id :
self . _trigger_tap_flow ( f . key , f . value )
elif data [ ' tap_flow ' ] [ ' tap_service_id ' ] in tap_srv_lst :
self . _trigger_tap_flow ( f . key , f . value )
def added ( self , key , value ) :
self . _check_etcd_taas ( key )