From f5cd6e2deae6209505b1b2d1a55d0f9fe6a444d0 Mon Sep 17 00:00:00 2001
From: Balazs Gibizer
Date: Mon, 25 Jan 2021 17:34:56 +0100
Subject: [PATCH] libvirt: add AsyncDeviceEventsHandler

This class will be used as synchronization point between the device
detach implementation and the generic libvirt event handling in the
libvirt driver. This later makes it possible to wait for libvirt device
removal events during device detach in a thread / greenlet safe way.

The basic usage pattern is the following:

setup:
  handler = AsyncDeviceEventsHandler()

thread1:
  waiter = handler.create_waiter(
      <instance_uuid>, <device_name>, <event_types>)
  # initiate detach in libvirt
  detach()
  # block until the event is received
  event = handler.wait(waiter, timeout=20)

thread2:
  # at some point after detach() in thread1
  handler.notify_waiters(event)

This is part of the longer series trying to transform the existing
device detach handling to use libvirt events.

Change-Id: I7fc1ba2d2cbdb3f46f3649b1279b0c1a40457647
Related-Bug: #1882521
---
 nova/tests/unit/virt/libvirt/test_driver.py | 140 ++++++++++++++++++++
 nova/virt/libvirt/driver.py                 | 138 +++++++++++++++++++
 2 files changed, 278 insertions(+)

diff --git a/nova/tests/unit/virt/libvirt/test_driver.py b/nova/tests/unit/virt/libvirt/test_driver.py
index ec2835cdbbc9..6c22c8735840 100644
--- a/nova/tests/unit/virt/libvirt/test_driver.py
+++ b/nova/tests/unit/virt/libvirt/test_driver.py
@@ -27296,3 +27296,143 @@ class LibvirtDeviceRemoveEventTestCase(test.NoDBTestCase):
         drvr.emit_event(event)
         mock_base_handles.assert_not_called()
         mock_debug.assert_not_called()
+
+
+class AsyncDeviceEventsHandlerTestCase(test.NoDBTestCase):
+
+    def setUp(self):
+        super().setUp()
+        self.handler = libvirt_driver.AsyncDeviceEventsHandler()
+
+    def assert_handler_clean(self):
+        self.assertEqual(set(), self.handler._waiters)
+
+    def _call_parallel_after_a_delay(self, func):
+        def run():
+            time.sleep(0.1)
+            func()
+
+        thread = threading.Thread(target=run)
+        thread.start()
+        return thread
+
+    def test_event_received_after_wait(self):
+        waiter = self.handler.create_waiter(
+            uuids.instance, 'virtio-1', {libvirtevent.DeviceRemovedEvent})
+
+        sent_event = libvirtevent.DeviceRemovedEvent(
+            uuids.instance, 'virtio-1')
+
+        thread = self._call_parallel_after_a_delay(
+            lambda: self.handler.notify_waiters(sent_event))
+        received_event = self.handler.wait(waiter, timeout=0.2)
+        thread.join()
+
+        self.assertEqual(sent_event, received_event)
+        self.assert_handler_clean()
+
+    def test_event_received_before_wait(self):
+        waiter = self.handler.create_waiter(
+            uuids.instance, 'virtio-1', {libvirtevent.DeviceRemovedEvent})
+        sent_event = libvirtevent.DeviceRemovedEvent(
+            uuids.instance, 'virtio-1')
+
+        had_waiter = self.handler.notify_waiters(sent_event)
+        received_event = self.handler.wait(waiter, timeout=0.1)
+
+        self.assertTrue(had_waiter)
+        self.assertEqual(sent_event, received_event)
+        self.assert_handler_clean()
+
+    def test_event_not_received(self):
+        waiter = self.handler.create_waiter(
+            uuids.instance, 'virtio-1', {libvirtevent.DeviceRemovedEvent})
+
+        received_event = self.handler.wait(waiter, timeout=0.1)
+
+        self.assertIsNone(received_event)
+        self.assert_handler_clean()
+
+    def test_event_received_without_waiter(self):
+        sent_event = libvirtevent.DeviceRemovedEvent(
+            uuids.instance, 'virtio-1')
+
+        had_waiter = self.handler.notify_waiters(sent_event)
+
+        self.assertFalse(had_waiter)
+        self.assert_handler_clean()
+
+    def test_create_remove_waiter_without_event(self):
+        waiter = self.handler.create_waiter(
+            uuids.instance, 'virtio-1', {libvirtevent.DeviceRemovedEvent})
+        self.handler.delete_waiter(waiter)
+
+        self.assert_handler_clean()
+
+    def test_waiter_cleanup(self):
+        inst1_dev1_waiter = self.handler.create_waiter(
+            uuids.instance1, 'virtio-1', {libvirtevent.DeviceRemovedEvent})
+        inst1_dev2_waiter = self.handler.create_waiter(
+            uuids.instance1,
+            'virtio-2',
+            {libvirtevent.DeviceRemovedEvent,
+             libvirtevent.DeviceRemovalFailedEvent})
+
+        inst2_waiter = self.handler.create_waiter(
+            uuids.instance2,
+            'virtio-1',
+            {libvirtevent.DeviceRemovalFailedEvent})
+
+        self.handler.notify_waiters(
+            libvirtevent.DeviceRemovedEvent(uuids.instance1, 'virtio-2'))
+        self.handler.notify_waiters(
+            libvirtevent.DeviceRemovedEvent(uuids.instance2, 'virtio-1'))
+
+        self.assertEqual(3, len(self.handler._waiters))
+
+        self.handler.delete_waiter(inst2_waiter)
+
+        self.assertEqual(2, len(self.handler._waiters))
+
+        self.handler.cleanup_waiters(uuids.instance1)
+
+        # we expect that the waiters are unblocked by the cleanup
+        self.assertTrue(inst1_dev1_waiter.threading_event.is_set())
+        self.assertTrue(inst1_dev2_waiter.threading_event.is_set())
+        self.assert_handler_clean()
+
+    def test_multiple_clients_for_the_same_event(self):
+        waiter1 = self.handler.create_waiter(
+            uuids.instance,
+            'virtio-1',
+            {libvirtevent.DeviceRemovedEvent,
+             libvirtevent.DeviceRemovalFailedEvent}
+        )
+
+        waiter2 = self.handler.create_waiter(
+            uuids.instance,
+            'virtio-1',
+            {libvirtevent.DeviceRemovedEvent}
+        )
+
+        waiter3 = self.handler.create_waiter(
+            uuids.instance,
+            'virtio-1',
+            {libvirtevent.DeviceRemovalFailedEvent}
+        )
+
+        sent_event = libvirtevent.DeviceRemovedEvent(
+            uuids.instance, 'virtio-1')
+
+        had_waiter = self.handler.notify_waiters(sent_event)
+
+        received_event1 = self.handler.wait(waiter1, timeout=0.1)
+        received_event2 = self.handler.wait(waiter2, timeout=0.1)
+        received_event3 = self.handler.wait(waiter3, timeout=0.1)
+
+        self.assertTrue(had_waiter)
+        self.assertEqual(sent_event, received_event1)
+        self.assertEqual(sent_event, received_event2)
+        # the third client timed out
+        self.assertIsNone(received_event3)
+        self.assert_handler_clean()
diff --git a/nova/virt/libvirt/driver.py b/nova/virt/libvirt/driver.py
index 7af56f7a3725..2639e276f199 100644
--- a/nova/virt/libvirt/driver.py
+++ b/nova/virt/libvirt/driver.py
@@ -41,6 +41,7 @@ import random
 import shutil
 import sys
 import tempfile
+import threading
 import time
 import
typing as ty
 import uuid
@@ -242,6 +243,143 @@ VGPU_RESOURCE_SEMAPHORE = 'vgpu_resources'
 LIBVIRT_PERF_EVENT_PREFIX = 'VIR_PERF_PARAM_'
 
 
+class AsyncDeviceEventsHandler:
+    """A synchronization point between libvirt events and clients waiting for
+    such events.
+
+    It provides an interface for the clients to wait for one or more libvirt
+    event types. It implements event delivery by expecting the libvirt driver
+    to forward libvirt specific events to notify_waiters()
+
+    It handles multiple clients for the same instance, device and event
+    type and delivers the event to each client.
+    """
+
+    class Waiter:
+        def __init__(
+            self,
+            instance_uuid: str,
+            device_name: str,
+            event_types: ty.Set[ty.Type[libvirtevent.DeviceEvent]]
+        ):
+            self.instance_uuid = instance_uuid
+            self.device_name = device_name
+            self.event_types = event_types
+            self.threading_event = threading.Event()
+            self.result: ty.Optional[libvirtevent.DeviceEvent] = None
+
+        def matches(self, event: libvirtevent.DeviceEvent) -> bool:
+            """Returns true if the event is one of the expected event types
+            for the given instance and device.
+            """
+            return (
+                self.instance_uuid == event.uuid and
+                self.device_name == event.dev and
+                isinstance(event, tuple(self.event_types)))
+
+        def __repr__(self) -> str:
+            return (
+                "AsyncDeviceEventsHandler.Waiter("
+                f"instance_uuid={self.instance_uuid}, "
+                f"device_name={self.device_name}, "
+                f"event_types={self.event_types})")
+
+    def __init__(self):
+        self._lock = threading.Lock()
+        # Ongoing device operations in libvirt where we wait for the events
+        # about success or failure.
+        self._waiters: ty.Set[AsyncDeviceEventsHandler.Waiter] = set()
+
+    def create_waiter(
+        self,
+        instance_uuid: str,
+        device_name: str,
+        event_types: ty.Set[ty.Type[libvirtevent.DeviceEvent]]
+    ) -> 'AsyncDeviceEventsHandler.Waiter':
+        """Returns an opaque token the caller can use in wait() to
+        wait for the libvirt event
+
+        :param instance_uuid: The UUID of the instance.
+        :param device_name: The device name alias used by libvirt for this
+            device.
+        :param event_types: A set of classes derived from DeviceEvent
+            specifying which event types the caller waits for. Specifying more
+            than one event type means waiting for either of the events to be
+            received.
+        :returns: an opaque token to be used with wait().
+        """
+        waiter = AsyncDeviceEventsHandler.Waiter(
+            instance_uuid, device_name, event_types)
+        with self._lock:
+            self._waiters.add(waiter)
+
+        return waiter
+
+    def delete_waiter(self, token: 'AsyncDeviceEventsHandler.Waiter'):
+        """Deletes the waiter, if cleanup_waiters() has not removed it already
+
+        :param token: the opaque token returned by create_waiter() to be
+            deleted
+        """
+        with self._lock:
+            self._waiters.discard(token)  # may already be cleaned up
+
+    def wait(
+        self, token: 'AsyncDeviceEventsHandler.Waiter', timeout: float,
+    ) -> ty.Optional[libvirtevent.DeviceEvent]:
+        """Blocks waiting for the libvirt event represented by the opaque token
+
+        :param token: A token created by calling create_waiter()
+        :param timeout: Maximum number of seconds this call blocks waiting for
+            the event to be received
+        :returns: The received libvirt event, or None on timeout or cleanup
+        """
+        token.threading_event.wait(timeout)
+
+        with self._lock:
+            self._waiters.discard(token)  # may already be cleaned up
+
+        return token.result
+
+    def notify_waiters(self, event: libvirtevent.DeviceEvent) -> bool:
+        """Unblocks every client waiting for this event.
+
+        :param event: the libvirt event that is received
+        :returns: True if there was a client waiting and False otherwise.
+        """
+        dispatched = False
+        with self._lock:
+            for waiter in self._waiters:
+                if waiter.matches(event):
+                    waiter.result = event
+                    waiter.threading_event.set()
+                    dispatched = True
+
+        return dispatched
+
+    def cleanup_waiters(self, instance_uuid: str) -> None:
+        """Deletes all waiters and unblock all clients related to the specific
+        instance.
+
+        :param instance_uuid: The instance UUID for which the cleanup is
+            requested
+        """
+        with self._lock:
+            instance_waiters = set()
+            for waiter in self._waiters:
+                if waiter.instance_uuid == instance_uuid:
+                    # unblock any waiting thread
+                    waiter.threading_event.set()
+                    instance_waiters.add(waiter)
+
+            self._waiters -= instance_waiters
+
+        if instance_waiters:
+            LOG.debug(
+                'Cleaned up device related libvirt event waiters: %s',
+                instance_waiters)
+
+
 class LibvirtDriver(driver.ComputeDriver):
     def __init__(self, virtapi, read_only=False):
         # NOTE(aspiers) Some of these are dynamic, so putting