Browse Source

Taas out of order etcd messages bug

Change-Id: I59dc7f6f2de360a62e367e818bdf3edb4e079cf0
changes/81/613381/1
jb 3 years ago
parent
commit
cd6cd52683
  1. 80
      networking_vpp/agent/taas_vpp_agent.py

80
networking_vpp/agent/taas_vpp_agent.py

@ -171,13 +171,22 @@ class TaasFlowAgentWatcher(etcdutils.EtcdChangeWatcher):
srv_port_idx,
direction)
except etcd.EtcdKeyNotFound:
# Remote Span
srv_uplink_idx = service_bridge['if_uplink_idx']
dst_idx = srv_uplink_idx
remote_span = True
self.vppf.vpp.enable_port_mirroring(source_port_idx,
srv_uplink_idx,
direction)
# Check if there is a pending request to create tap service
# in the same node (e.g. out of order etcd messages).
port_path = (LEADIN + '/node/' + self._host +
'/taas_service/' + tap_srv_id)
try:
self.etcd_client.read(port_path).value
return
except etcd.EtcdKeyNotFound:
# Remote Span
srv_uplink_idx = service_bridge['if_uplink_idx']
dst_idx = srv_uplink_idx
remote_span = True
self.vppf.vpp.enable_port_mirroring(source_port_idx,
srv_uplink_idx,
direction)
# Set the tap_flow state in etcd
data = {"tf": data,
@ -229,6 +238,57 @@ class TaasFlowAgentWatcher(etcdutils.EtcdChangeWatcher):
pass
# This watcher is needed to manage the out of order etcd messages
# (e.g when the agent is restarted)
class TaasPortAgentWatcher(etcdutils.EtcdChangeWatcher):
path = 'taas_port'
def __init__(self, host, etcd_client_factory, tsw, tfw):
self._port_key_space = LEADIN + '/state/%s/ports' % (host)
self.etcd_client = etcd_client_factory.client()
self._host = host
self._tsw = tsw
self._tfw = tfw
etcd_helper = etcdutils.EtcdHelper(self.etcd_client)
etcd_helper.ensure_dir(self._port_key_space)
super(TaasPortAgentWatcher, self).__init__(self.etcd_client,
self.path,
self._port_key_space)
def _trigger_tap_service(self, key, value):
self._tsw.added(key, value)
def _trigger_tap_flow(self, key, value):
self._tfw.added(key, value)
def _check_etcd_taas(self, port_id):
watch_space = LEADIN + '/nodes/%s/taas_service' % (self._host)
rv = self.etcd_client.read(watch_space,
recursive=True)
tap_srv_lst = list()
for f in rv.children:
data = jsonutils.loads(f.value)
if data['tap_service']['port_id'] == port_id:
tap_srv_lst.append(f.key)
self._trigger_tap_service(f.key, f.value)
watch_space = LEADIN + '/nodes/%s/taas_flow' % (self._host)
rv = self.etcd_client.read(watch_space,
recursive=True)
for f in rv.children:
data = jsonutils.loads(f.value)
if data['tap_flow']['source_port'] == port_id:
self._trigger_tap_flow(f.key, f.value)
elif data['tap_flow']['tap_service_id'] in tap_srv_lst:
self._trigger_tap_flow(f.key, f.value)
def added(self, key, value):
self._check_etcd_taas(key)
def removed(self, key):
pass
class TaasVPPAgentExtension(VPPAgentExtensionBase):
def initialize(self, manager):
pass
@ -240,5 +300,11 @@ class TaasVPPAgentExtension(VPPAgentExtensionBase):
self.taas_flow_watcher = TaasFlowAgentWatcher(host,
client_factory,
vpp_forwarder)
self.taas_port_watcher = TaasPortAgentWatcher(
host,
client_factory,
self.taas_service_watcher,
self.taas_flow_watcher)
gthread_pool.spawn(self.taas_service_watcher.watch_forever)
gthread_pool.spawn(self.taas_flow_watcher.watch_forever)
gthread_pool.spawn(self.taas_port_watcher.watch_forever)
Loading…
Cancel
Save