Merge "libvirt: add AsyncDeviceEventsHandler"

This commit is contained in:
Zuul 2021-03-02 13:11:22 +00:00 committed by Gerrit Code Review
commit cefa7a8e14
2 changed files with 278 additions and 0 deletions

View File

@ -27554,3 +27554,143 @@ class LibvirtDeviceRemoveEventTestCase(test.NoDBTestCase):
drvr.emit_event(event)
mock_base_handles.assert_not_called()
mock_debug.assert_not_called()
class AsyncDeviceEventsHandlerTestCase(test.NoDBTestCase):
def setUp(self):
super().setUp()
self.handler = libvirt_driver.AsyncDeviceEventsHandler()
def assert_handler_clean(self):
self.assertEqual(set(), self.handler._waiters)
def _call_parallel_after_a_delay(self, func):
def run():
time.sleep(0.1)
func()
thread = threading.Thread(target=run)
thread.start()
return thread
def test_event_received_after_wait(self):
waiter = self.handler.create_waiter(
uuids.instance, 'virtio-1', {libvirtevent.DeviceRemovedEvent})
sent_event = libvirtevent.DeviceRemovedEvent(
uuids.instance, 'virtio-1')
thread = self._call_parallel_after_a_delay(
lambda: self.handler.notify_waiters(sent_event))
received_event = self.handler.wait(waiter, timeout=0.2)
thread.join()
self.assertEqual(sent_event, received_event)
self.assert_handler_clean()
def test_event_received_before_wait(self):
waiter = self.handler.create_waiter(
uuids.instance, 'virtio-1', {libvirtevent.DeviceRemovedEvent})
sent_event = libvirtevent.DeviceRemovedEvent(
uuids.instance, 'virtio-1')
had_waiter = self.handler.notify_waiters(sent_event)
received_event = self.handler.wait(waiter, timeout=0.1)
self.assertTrue(had_waiter)
self.assertEqual(sent_event, received_event)
self.assert_handler_clean()
def test_event_not_received(self):
waiter = self.handler.create_waiter(
uuids.instance, 'virtio-1', {libvirtevent.DeviceRemovedEvent})
received_event = self.handler.wait(waiter, timeout=0.1)
self.assertIsNone(received_event)
self.assert_handler_clean()
def test_event_received_without_waiter(self):
sent_event = libvirtevent.DeviceRemovedEvent(
uuids.instance, 'virtio-1')
had_waiter = self.handler.notify_waiters(sent_event)
self.assertFalse(had_waiter)
self.assert_handler_clean()
def test_create_remove_waiter_without_event(self):
waiter = self.handler.create_waiter(
uuids.instance, 'virtio-1', {libvirtevent.DeviceRemovedEvent})
self.handler.delete_waiter(waiter)
self.assert_handler_clean()
def test_waiter_cleanup(self):
inst1_dev1_waiter = self.handler.create_waiter(
uuids.instance1, 'virtio-1', {libvirtevent.DeviceRemovedEvent})
inst1_dev2_waiter = self.handler.create_waiter(
uuids.instance1,
'virtio-2',
{libvirtevent.DeviceRemovedEvent,
libvirtevent.DeviceRemovalFailedEvent})
inst2_waiter = self.handler.create_waiter(
uuids.instance2,
'virtio-1',
{libvirtevent.DeviceRemovalFailedEvent})
self.handler.notify_waiters(
libvirtevent.DeviceRemovedEvent(uuids.instance1, 'virtio-2'))
self.handler.notify_waiters(
libvirtevent.DeviceRemovedEvent(uuids.instance2, 'virtio-1'))
self.assertEqual(3, len(self.handler._waiters))
self.handler.delete_waiter(inst2_waiter)
self.assertEqual(2, len(self.handler._waiters))
self.handler.cleanup_waiters(uuids.instance1)
# we expect that the waiters are unblocked by the cleanup
self.assertTrue(inst1_dev1_waiter.threading_event.wait())
self.assertTrue(inst1_dev2_waiter.threading_event.wait())
self.assert_handler_clean()
def test_multiple_clients_for_the_same_event(self):
waiter1 = self.handler.create_waiter(
uuids.instance,
'virtio-1',
{libvirtevent.DeviceRemovedEvent,
libvirtevent.DeviceRemovalFailedEvent}
)
waiter2 = self.handler.create_waiter(
uuids.instance,
'virtio-1',
{libvirtevent.DeviceRemovedEvent}
)
waiter3 = self.handler.create_waiter(
uuids.instance,
'virtio-1',
{libvirtevent.DeviceRemovalFailedEvent}
)
sent_event = libvirtevent.DeviceRemovedEvent(
uuids.instance, 'virtio-1')
had_waiter = self.handler.notify_waiters(sent_event)
received_event1 = self.handler.wait(waiter1, timeout=0.1)
received_event2 = self.handler.wait(waiter2, timeout=0.1)
received_event3 = self.handler.wait(waiter3, timeout=0.1)
self.assertTrue(had_waiter)
self.assertEqual(sent_event, received_event1)
self.assertEqual(sent_event, received_event2)
# the third client timed out
self.assertIsNone(received_event3)
self.assert_handler_clean()

View File

@ -41,6 +41,7 @@ import random
import shutil
import sys
import tempfile
import threading
import time
import typing as ty
import uuid
@ -242,6 +243,143 @@ VGPU_RESOURCE_SEMAPHORE = 'vgpu_resources'
LIBVIRT_PERF_EVENT_PREFIX = 'VIR_PERF_PARAM_'
class AsyncDeviceEventsHandler:
"""A synchornization point between libvirt events an clients waiting for
such events.
It provides an interface for the clients to wait for one or more libvirt
event types. It implements event delivery by expecting the libvirt driver
to forward libvirt specific events to notify_waiters()
It handles multiple clients for the same instance, device and event
type and delivers the event to each clients.
"""
class Waiter:
def __init__(
self,
instance_uuid: str,
device_name: str,
event_types: ty.Set[ty.Type[libvirtevent.DeviceEvent]]
):
self.instance_uuid = instance_uuid
self.device_name = device_name
self.event_types = event_types
self.threading_event = threading.Event()
self.result: ty.Optional[libvirtevent.DeviceEvent] = None
def matches(self, event: libvirtevent.DeviceEvent) -> bool:
"""Returns true if the event is one of the expected event types
for the given instance and device.
"""
return (
self.instance_uuid == event.uuid and
self.device_name == event.dev and
isinstance(event, tuple(self.event_types)))
def __repr__(self) -> str:
return (
"AsyncDeviceEventsHandler.Waiter("
f"instance_uuid={self.instance_uuid}, "
f"device_name={self.device_name}, "
f"event_types={self.event_types})")
def __init__(self):
self._lock = threading.Lock()
# Ongoing device operations in libvirt where we wait for the events
# about success or failure.
self._waiters: ty.Set[AsyncDeviceEventsHandler.Waiter] = set()
def create_waiter(
self,
instance_uuid: str,
device_name: str,
event_types: ty.Set[ty.Type[libvirtevent.DeviceEvent]]
) -> 'AsyncDeviceEventsHandler.Waiter':
"""Returns an opaque token the caller can use in wait() to
wait for the libvirt event
:param instance_uuid: The UUID of the instance.
:param device_name: The device name alias used by libvirt for this
device.
:param event_type: A set of classes derived from DeviceEvent
specifying which event types the caller waits for. Specifying more
than one event type means waiting for either of the events to be
received.
:returns: an opaque token to be used with wait_for_event().
"""
waiter = AsyncDeviceEventsHandler.Waiter(
instance_uuid, device_name, event_types)
with self._lock:
self._waiters.add(waiter)
return waiter
def delete_waiter(self, token: 'AsyncDeviceEventsHandler.Waiter'):
"""Deletes the waiter
:param token: the opaque token returned by create_waiter() to be
deleted
"""
with self._lock:
self._waiters.remove(token)
def wait(
self, token: 'AsyncDeviceEventsHandler.Waiter', timeout: float,
) -> ty.Optional[libvirtevent.DeviceEvent]:
"""Blocks waiting for the libvirt event represented by the opaque token
:param token: A token created by calling create_waiter()
:param timeout: Maximum number of seconds this call blocks waiting for
the event to be received
:returns: The received libvirt event, or None in case of timeout
"""
token.threading_event.wait(timeout)
with self._lock:
self._waiters.remove(token)
return token.result
def notify_waiters(self, event: libvirtevent.DeviceEvent) -> bool:
"""Unblocks the client waiting for this event.
:param event: the libvirt event that is received
:returns: True if there was a client waiting and False otherwise.
"""
dispatched = False
with self._lock:
for waiter in self._waiters:
if waiter.matches(event):
waiter.result = event
waiter.threading_event.set()
dispatched = True
return dispatched
def cleanup_waiters(self, instance_uuid: str) -> None:
"""Deletes all waiters and unblock all clients related to the specific
instance.
param instance_uuid: The instance UUID for which the cleanup is
requested
"""
with self._lock:
instance_waiters = set()
for waiter in self._waiters:
if waiter.instance_uuid == instance_uuid:
# unblock any waiting thread
waiter.threading_event.set()
instance_waiters.add(waiter)
self._waiters -= instance_waiters
if instance_waiters:
LOG.debug(
'Cleaned up device related libvirt event waiters: %s',
instance_waiters)
class LibvirtDriver(driver.ComputeDriver):
def __init__(self, virtapi, read_only=False):
# NOTE(aspiers) Some of these are dynamic, so putting