Move WaitForPortBindingEvent out from testing code
The event is useful in other projects too. For example networking-ovn can make a use of it [1] [1] https://review.opendev.org/#/c/661628/1/networking_ovn/tests/functional/test_metadata_agent.py@59 Change-Id: I9d3953cb244806ed832665f24133e2d9f0556e16
This commit is contained in:
parent
ad69266e3f
commit
6a246a73a8
|
@ -0,0 +1,22 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from ovsdbapp.backend.ovs_idl import event
|
||||
|
||||
|
||||
class WaitForPortBindingEvent(event.WaitEvent):
|
||||
event_name = 'WaitForPortBindingEvent'
|
||||
|
||||
def __init__(self, port, timeout=5):
|
||||
super(WaitForPortBindingEvent, self).__init__(
|
||||
(self.ROW_CREATE,), 'Port_Binding', (('logical_port', '=', port),),
|
||||
timeout=timeout)
|
|
@ -10,23 +10,17 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from ovsdbapp.backend.ovs_idl import event
|
||||
from ovsdbapp.backend.ovs_idl import idlutils
|
||||
from ovsdbapp import event as ovsdb_event
|
||||
from ovsdbapp.schema.ovn_northbound import impl_idl as nbidl
|
||||
from ovsdbapp.schema.ovn_southbound import impl_idl
|
||||
from ovsdbapp.tests.functional import base
|
||||
from ovsdbapp.tests.functional.schema.ovn_southbound import event
|
||||
from ovsdbapp.tests.functional.schema.ovn_southbound import fixtures
|
||||
from ovsdbapp.tests import utils
|
||||
|
||||
|
||||
class WaitForPortBindingEvent(event.WaitEvent):
|
||||
event_name = 'WaitForPortBindingEvent'
|
||||
|
||||
def __init__(self, port, timeout=5):
|
||||
super(WaitForPortBindingEvent, self).__init__(
|
||||
(self.ROW_CREATE,), 'Port_Binding', (('logical_port', '=', port),),
|
||||
timeout=timeout)
|
||||
# Keep the class here for backward compatiblity
|
||||
WaitForPortBindingEvent = event.WaitForPortBindingEvent
|
||||
|
||||
|
||||
class OvnSouthboundTest(base.FunctionalTestCase):
|
||||
|
@ -93,7 +87,7 @@ class OvnSouthboundTest(base.FunctionalTestCase):
|
|||
cname, sname, pname = (utils.get_rand_device_name(prefix=p)
|
||||
for p in ("chassis", "switch", "port"))
|
||||
chassis = self._chassis_add(['vxlan'], '192.0.2.1', chassis=cname)
|
||||
row_event = WaitForPortBindingEvent(pname)
|
||||
row_event = event.WaitForPortBindingEvent(pname)
|
||||
# We have to wait for ovn-northd to actually create the port binding
|
||||
self.handler.watch_event(row_event)
|
||||
with self.nbapi.transaction(check_error=True) as txn:
|
||||
|
|
Loading…
Reference in New Issue