Merge "domainEventRegisterAny called too often"
This commit is contained in:
@@ -410,6 +410,9 @@ class LibvirtConnTestCase(test.TestCase):
|
||||
def getLibVersion(self):
|
||||
return (0 * 1000 * 1000) + (9 * 1000) + 11
|
||||
|
||||
def domainEventRegisterAny(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
def registerCloseCallback(self, cb, opaque):
|
||||
pass
|
||||
|
||||
@@ -5562,18 +5565,24 @@ class LibvirtConnTestCase(test.TestCase):
|
||||
def connect_with_block(*a, **k):
|
||||
# enough to allow another connect to run
|
||||
eventlet.sleep(0)
|
||||
self.calls += 1
|
||||
self.connect_calls += 1
|
||||
return self.conn
|
||||
|
||||
self.calls = 0
|
||||
def fake_register(*a, **k):
|
||||
self.register_calls += 1
|
||||
|
||||
self.connect_calls = 0
|
||||
self.register_calls = 0
|
||||
self.stubs.Set(libvirt_driver.LibvirtDriver,
|
||||
'_connect', connect_with_block)
|
||||
driver = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), True)
|
||||
self.stubs.Set(self.conn, 'domainEventRegisterAny', fake_register)
|
||||
|
||||
# call serially
|
||||
get_conn_currency(driver)
|
||||
get_conn_currency(driver)
|
||||
self.assertEqual(self.calls, 1)
|
||||
self.assertEqual(self.connect_calls, 1)
|
||||
self.assertEqual(self.register_calls, 1)
|
||||
|
||||
def test_get_connection_concurrency(self):
|
||||
|
||||
@@ -5583,13 +5592,18 @@ class LibvirtConnTestCase(test.TestCase):
|
||||
def connect_with_block(*a, **k):
|
||||
# enough to allow another connect to run
|
||||
eventlet.sleep(0)
|
||||
self.calls += 1
|
||||
self.connect_calls += 1
|
||||
return self.conn
|
||||
|
||||
self.calls = 0
|
||||
def fake_register(*a, **k):
|
||||
self.register_calls += 1
|
||||
|
||||
self.connect_calls = 0
|
||||
self.register_calls = 0
|
||||
self.stubs.Set(libvirt_driver.LibvirtDriver,
|
||||
'_connect', connect_with_block)
|
||||
driver = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), True)
|
||||
self.stubs.Set(self.conn, 'domainEventRegisterAny', fake_register)
|
||||
|
||||
# call concurrently
|
||||
thr1 = eventlet.spawn(get_conn_currency, driver=driver)
|
||||
@@ -5600,7 +5614,8 @@ class LibvirtConnTestCase(test.TestCase):
|
||||
|
||||
thr1.wait()
|
||||
thr2.wait()
|
||||
self.assertEqual(self.calls, 1)
|
||||
self.assertEqual(self.connect_calls, 1)
|
||||
self.assertEqual(self.register_calls, 1)
|
||||
|
||||
|
||||
class HostStateTestCase(test.TestCase):
|
||||
|
||||
@@ -582,52 +582,56 @@ class LibvirtDriver(driver.ComputeDriver):
|
||||
|
||||
self._init_events()
|
||||
|
||||
def _get_new_connection(self):
|
||||
# call with _wrapped_conn_lock held
|
||||
LOG.debug(_('Connecting to libvirt: %s'), self.uri())
|
||||
wrapped_conn = None
|
||||
try:
|
||||
if not CONF.libvirt_nonblocking:
|
||||
wrapped_conn = self._connect(self.uri(), self.read_only)
|
||||
else:
|
||||
wrapped_conn = tpool.proxy_call(
|
||||
(libvirt.virDomain, libvirt.virConnect),
|
||||
self._connect, self.uri(), self.read_only)
|
||||
finally:
|
||||
# Enabling the compute service, in case it was disabled
|
||||
# since the connection was successful.
|
||||
is_connected = bool(wrapped_conn)
|
||||
self.set_host_enabled(CONF.host, is_connected)
|
||||
|
||||
self._wrapped_conn = wrapped_conn
|
||||
|
||||
try:
|
||||
LOG.debug(_("Registering for lifecycle events %s") % str(self))
|
||||
wrapped_conn.domainEventRegisterAny(
|
||||
None,
|
||||
libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE,
|
||||
self._event_lifecycle_callback,
|
||||
self)
|
||||
except Exception as e:
|
||||
LOG.warn(_("URI %(uri)s does not support events: %(error)s"),
|
||||
{'uri': self.uri(), 'error': e})
|
||||
|
||||
if self._has_min_version(wrapped_conn,
|
||||
MIN_LIBVIRT_CLOSE_CALLBACK_VERSION):
|
||||
try:
|
||||
LOG.debug(_("Registering for connection events: %s") %
|
||||
str(self))
|
||||
wrapped_conn.registerCloseCallback(
|
||||
self._close_callback, None)
|
||||
except libvirt.libvirtError as e:
|
||||
LOG.warn(_("URI %(uri)s does not support connection"
|
||||
" events: %(error)s"),
|
||||
{'uri': self.uri(), 'error': e})
|
||||
|
||||
return wrapped_conn
|
||||
|
||||
def _get_connection(self):
|
||||
# multiple concurrent connections are protected by _wrapped_conn_lock
|
||||
with self._wrapped_conn_lock:
|
||||
wrapped_conn = self._wrapped_conn
|
||||
|
||||
if not wrapped_conn or not self._test_connection(wrapped_conn):
|
||||
LOG.debug(_('Connecting to libvirt: %s'), self.uri())
|
||||
try:
|
||||
if not CONF.libvirt_nonblocking:
|
||||
wrapped_conn = self._connect(self.uri(),
|
||||
self.read_only)
|
||||
else:
|
||||
wrapped_conn = tpool.proxy_call(
|
||||
(libvirt.virDomain, libvirt.virConnect),
|
||||
self._connect, self.uri(), self.read_only)
|
||||
finally:
|
||||
# Enabling the compute service, in case it was disabled
|
||||
# since the connection was successful.
|
||||
is_connected = bool(wrapped_conn)
|
||||
self.set_host_enabled(CONF.host, is_connected)
|
||||
|
||||
self._wrapped_conn = wrapped_conn
|
||||
|
||||
try:
|
||||
LOG.debug(_("Registering for lifecycle events %s") %
|
||||
str(self))
|
||||
wrapped_conn.domainEventRegisterAny(
|
||||
None,
|
||||
libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE,
|
||||
self._event_lifecycle_callback,
|
||||
self)
|
||||
except Exception as e:
|
||||
LOG.warn(_("URI %(uri)s does not support events: %(error)s"),
|
||||
{'uri': self.uri(), 'error': e})
|
||||
|
||||
if self._has_min_version(wrapped_conn,
|
||||
MIN_LIBVIRT_CLOSE_CALLBACK_VERSION):
|
||||
try:
|
||||
LOG.debug(_("Registering for connection events: %s") %
|
||||
str(self))
|
||||
wrapped_conn.registerCloseCallback(
|
||||
self._close_callback, None)
|
||||
except libvirt.libvirtError as e:
|
||||
LOG.warn(_("URI %(uri)s does not support connection"
|
||||
" events: %(error)s"),
|
||||
{'uri': self.uri(), 'error': e})
|
||||
wrapped_conn = self._get_new_connection()
|
||||
|
||||
return wrapped_conn
|
||||
|
||||
|
||||
Reference in New Issue
Block a user