Add support for lifecycle events in the libvirt driver

This wires up the libvirt compute driver to process async
events from libvirtd and emit lifecycle events. This makes
use of the native libvirt event loop to handle I/O processing
which requires a native thread.

The native thread uses a queue + self-pipe to forward events
onto a green thread. The green thread reads events off the
queue and uses the emit_event method to dispatch them to the
compute manager.

Blueprint: compute-driver-events
Change-Id: Icd2cb7081adde10420ae55beebe60350afe21379
Signed-off-by: Daniel P. Berrange <berrange@redhat.com>
This commit is contained in:
Daniel P. Berrange
2013-02-12 13:32:06 +00:00
parent f9437fdc63
commit 2b1d73b092
2 changed files with 121 additions and 1 deletions

View File

@@ -70,6 +70,17 @@ VIR_DOMAIN_CRASHED = 6
VIR_DOMAIN_XML_SECURE = 1
VIR_DOMAIN_EVENT_ID_LIFECYCLE = 0
VIR_DOMAIN_EVENT_DEFINED = 0
VIR_DOMAIN_EVENT_UNDEFINED = 1
VIR_DOMAIN_EVENT_STARTED = 2
VIR_DOMAIN_EVENT_SUSPENDED = 3
VIR_DOMAIN_EVENT_RESUMED = 4
VIR_DOMAIN_EVENT_STOPPED = 5
VIR_DOMAIN_EVENT_SHUTDOWN = 6
VIR_DOMAIN_EVENT_PMSUSPENDED = 7
VIR_DOMAIN_UNDEFINE_MANAGED_SAVE = 1
VIR_DOMAIN_AFFECT_CURRENT = 0
@@ -506,6 +517,7 @@ class Connection(object):
self._running_vms = {}
self._id_counter = 1 # libvirt reserves 0 for the hypervisor.
self._nwfilters = {}
self._event_callbacks = {}
self.fakeLibVersion = version
self.fakeVersion = version
@@ -517,6 +529,7 @@ class Connection(object):
def _mark_running(self, dom):
self._running_vms[self._id_counter] = dom
self._emit_lifecycle(dom, VIR_DOMAIN_EVENT_STARTED, 0)
self._id_counter += 1
def _mark_not_running(self, dom):
@@ -528,10 +541,13 @@ class Connection(object):
for (k, v) in self._running_vms.iteritems():
if v == dom:
del self._running_vms[k]
self._emit_lifecycle(dom, VIR_DOMAIN_EVENT_STOPPED, 0)
return
def _undefine(self, dom):
del self._vms[dom.name()]
if not dom._transient:
self._emit_lifecycle(dom, VIR_DOMAIN_EVENT_UNDEFINED, 0)
def getInfo(self):
return [node_arch,
@@ -563,14 +579,25 @@ class Connection(object):
'name "%s"' % name,
VIR_ERR_NO_DOMAIN, VIR_FROM_QEMU)
def _emit_lifecycle(self, dom, event, detail):
if VIR_DOMAIN_EVENT_ID_LIFECYCLE not in self._event_callbacks:
return
cbinfo = self._event_callbacks[VIR_DOMAIN_EVENT_ID_LIFECYCLE]
callback = cbinfo[0]
opaque = cbinfo[1]
callback(self, dom, event, detail, opaque)
def defineXML(self, xml):
dom = Domain(connection=self, running=False, transient=False, xml=xml)
self._vms[dom.name()] = dom
self._emit_lifecycle(dom, VIR_DOMAIN_EVENT_DEFINED, 0)
return dom
def createXML(self, xml, flags):
dom = Domain(connection=self, running=True, transient=True, xml=xml)
self._vms[dom.name()] = dom
self._emit_lifecycle(dom, VIR_DOMAIN_EVENT_STARTED, 0)
return dom
def getType(self):
@@ -586,6 +613,9 @@ class Connection(object):
def getHostname(self):
return 'compute1'
def domainEventRegisterAny(self, dom, eventid, callback, opaque):
self._event_callbacks[eventid] = [callback, opaque]
def getCapabilities(self):
return '''<capabilities>
<host>
@@ -875,6 +905,14 @@ def openAuth(uri, auth, flags):
return Connection(uri, readonly=False)
def virEventRunDefaultImpl():
time.sleep(1)
def virEventRegisterDefaultImpl():
pass
virDomain = Domain

View File

@@ -52,6 +52,7 @@ from nova import utils
from nova import version
from nova.virt.disk import api as disk
from nova.virt import driver
from nova.virt import event as virtevent
from nova.virt import fake
from nova.virt import firewall as base_firewall
from nova.virt import images
@@ -99,7 +100,8 @@ class FakeVirDomainSnapshot(object):
class FakeVirtDomain(object):
def __init__(self, fake_xml=None):
def __init__(self, fake_xml=None, uuidstr=None):
self.uuidstr = uuidstr
if fake_xml:
self._fake_dom_xml = fake_xml
else:
@@ -131,6 +133,9 @@ class FakeVirtDomain(object):
def XMLDesc(self, *args):
return self._fake_dom_xml
def UUIDString(self):
return self.uuidstr
class CacheConcurrencyTestCase(test.TestCase):
def setUp(self):
@@ -3340,6 +3345,83 @@ class LibvirtConnTestCase(test.TestCase):
got = conn.get_instance_capabilities()
self.assertEqual(want, got)
def test_event_dispatch(self):
# Validate that the libvirt self-pipe for forwarding
# events between threads is working sanely
conn = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)
got_events = []
def handler(event):
got_events.append(event)
conn.register_event_listener(handler)
conn._init_events_pipe()
event1 = virtevent.LifecycleEvent(
"cef19ce0-0ca2-11df-855d-b19fbce37686",
virtevent.EVENT_LIFECYCLE_STARTED)
event2 = virtevent.LifecycleEvent(
"cef19ce0-0ca2-11df-855d-b19fbce37686",
virtevent.EVENT_LIFECYCLE_PAUSED)
conn._queue_event(event1)
conn._queue_event(event2)
conn._dispatch_events()
want_events = [event1, event2]
self.assertEqual(want_events, got_events)
event3 = virtevent.LifecycleEvent(
"cef19ce0-0ca2-11df-855d-b19fbce37686",
virtevent.EVENT_LIFECYCLE_RESUMED)
event4 = virtevent.LifecycleEvent(
"cef19ce0-0ca2-11df-855d-b19fbce37686",
virtevent.EVENT_LIFECYCLE_STOPPED)
conn._queue_event(event3)
conn._queue_event(event4)
conn._dispatch_events()
want_events = [event1, event2, event3, event4]
self.assertEqual(want_events, got_events)
def test_event_lifecycle(self):
# Validate that libvirt events are correctly translated
# to Nova events
conn = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)
got_events = []
def handler(event):
got_events.append(event)
conn.register_event_listener(handler)
conn._init_events_pipe()
fake_dom_xml = """
<domain type='kvm'>
<uuid>cef19ce0-0ca2-11df-855d-b19fbce37686</uuid>
<devices>
<disk type='file'>
<source file='filename'/>
</disk>
</devices>
</domain>
"""
dom = FakeVirtDomain(fake_dom_xml,
"cef19ce0-0ca2-11df-855d-b19fbce37686")
conn._event_lifecycle_callback(conn._conn,
dom,
libvirt.VIR_DOMAIN_EVENT_STOPPED,
0,
conn)
conn._dispatch_events()
self.assertEqual(len(got_events), 1)
self.assertEqual(type(got_events[0]), virtevent.LifecycleEvent)
self.assertEqual(got_events[0].uuid,
"cef19ce0-0ca2-11df-855d-b19fbce37686")
self.assertEqual(got_events[0].transition,
virtevent.EVENT_LIFECYCLE_STOPPED)
class HostStateTestCase(test.TestCase):