OpenFlow monitor

This OF monitor will track the changes in the OF list in a OVS bridge.
The events (ADDED, MODIFIED, DELETED) are stored internally in the
instance and can be retrieved using the property "of_events". Once
the stored events are read, the buffer is deleted.

Change-Id: I9fd571881bdd899d0ad0f5105dc48ddd123b86f6
Closes-Bug: #1848500
This commit is contained in:
Rodolfo Alonso Hernandez 2019-10-17 13:09:19 +00:00
parent 14efd10c1f
commit cdadaef53a
2 changed files with 182 additions and 0 deletions

View File

@ -0,0 +1,84 @@
# Copyright 2019 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
import eventlet
from neutron.agent.common import async_process
class OFEvent(object):
def __init__(self, event_type, flow):
self.event_type = event_type
self.flow = flow
class OFMonitor(async_process.AsyncProcess):
"""Wrapper over 'ovs-ofctl monitor'.
This is an interactive OpenFlow monitor. By default, when the object is
instantiated, the monitor process is started. To retrieve the pending
events, the property "of_events" can be retrieved.
NOTE(ralonsoh): 'ovs-ofctl monitor' command is sending existing flows to
stdout pipe (startup first messages) and next incoming messages to stderr
pipe. That's why this class joins both outputs in one single queue
(self._queue).
"""
EVENT_RE = re.compile(
r"event=(?P<action>ADDED|DELETED|MODIFIED) (?P<flow>.*)")
def __init__(self, bridge_name, namespace=None, respawn_interval=None,
start=True):
cmd = ['ovs-ofctl', 'monitor', bridge_name, 'watch:', '--monitor']
super(OFMonitor, self).__init__(cmd, run_as_root=True,
respawn_interval=respawn_interval,
namespace=namespace)
if start:
self.start()
self._queue = eventlet.queue.Queue()
eventlet.spawn(self._read_and_enqueue, self.iter_stdout)
eventlet.spawn(self._read_and_enqueue, self.iter_stderr)
def _read_and_enqueue(self, iter):
for event_line in iter(block=True):
event = self._parse_event_line(event_line)
if event:
self._queue.put(event)
@property
def of_events(self):
events = []
while not self._queue.empty():
events.append(self._queue.get())
return events
def _parse_event_line(self, event_line):
match = self.EVENT_RE.match(event_line)
if match is None:
return
return OFEvent(match.group('action'), match.group('flow'))
def start(self, **kwargs):
if not self._is_running:
super(OFMonitor, self).start(block=True)
def stop(self, **kwargs):
if self._is_running:
super(OFMonitor, self).stop(block=True)

View File

@ -0,0 +1,98 @@
# Copyright 2019 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.agent.linux import of_monitor
from neutron.common import utils
from neutron.tests.common import net_helpers
from neutron.tests.functional import base as functional_base
class OFMonitorTestCase(functional_base.BaseSudoTestCase):
DEFAULT_FLOW = {'table': 0, 'cookie': '0', 'actions': 'NORMAL'}
def setUp(self):
super(OFMonitorTestCase, self).setUp()
self.bridge = self.useFixture(net_helpers.OVSBridgeFixture()).bridge
self.of_monitor = of_monitor.OFMonitor(self.bridge.br_name,
start=False)
self.addCleanup(self.of_monitor.stop)
def _format_flow(self, flow, event_type):
deleted = ''
if event_type == 'DELETED':
deleted = 'reason=delete'
table = 'table=%s' % flow['table']
cookie = flow.get('cookie') or hex(self.bridge._default_cookie)
# NOTE(ralonsoh): remove PY2 "L" suffix in longs
cookie = 'cookie=' + cookie.rstrip('L')
filters = []
if flow.get('in_port'):
filters.append('in_port=%s' % flow.get('in_port'))
if flow.get('dl_vlan'):
filters.append('dl_vlan=%s' % flow.get('dl_vlan'))
if flow.get('dl_src'):
filters.append('dl_src=%s' % flow.get('dl_src'))
filters = ','.join(filters)
actions = ''
if flow.get('actions'):
actions += 'actions=%s' % flow.get('actions')
flow_sections = [section for section
in (deleted, table, cookie, filters, actions)
if section]
return ' '.join(flow_sections)
def _check_flow(self, reference_flow, event_type):
def _read_and_check():
event = self.of_monitor.of_events
if len(event) == 1:
events_container.append(event[0])
return True
return False
events_container = []
try:
utils.wait_until_true(_read_and_check, timeout=5)
except utils.WaitTimeout:
self.fail('Flow "%s" with action %s not found' % (reference_flow,
event_type))
event = events_container.pop()
self.assertEqual(event_type, event.event_type)
self.assertEqual(self._format_flow(reference_flow, event_type),
event.flow)
def test_of_events(self):
self.of_monitor.start()
self._check_flow(self.DEFAULT_FLOW, 'ADDED')
flow = {'table': 10, 'in_port': 20, 'dl_vlan': 30,
'dl_src': '00:00:00:00:00:01', 'actions': 'NORMAL'}
self.bridge.add_flow(**flow)
self._check_flow(flow, 'ADDED')
flow['table'] = 50
self.bridge.add_flow(**flow)
self._check_flow(flow, 'ADDED')
flow['actions'] = 'resubmit:100'
self.bridge.mod_flow(**flow)
self._check_flow(flow, 'MODIFIED')
flow['table'] = 10
flow['actions'] = 'NORMAL'
flow_to_delete = flow.copy()
flow_to_delete.pop('actions')
self.bridge.delete_flows(**flow_to_delete)
self._check_flow(flow, 'DELETED')