Add delay queue for events
Certain events can come in a series. For instance, if you run a VM reboot, the hypervisor will emit a SHUTTING_DOWN, NOT_ACTIVATED and STARTING state. If these states are just fed to nova raw, due to the async nature it could come in out of order. So the ACTIVE event could come in before the NOT_ACTIVATED (even though that doesn't reflect how it came in to the host). This change follows the libvirt event mechanism and builds a delay queue. For certain powering off events, they will be delayed before they are emitted up to nova. If a corresponding starting event comes in afterwards, it will void out the shutting down event. Change-Id: Iefd33b6bf9e3621a9f716488bf43b02c7d3cb0c4 Closes-Bug: 1617746
This commit is contained in:
parent
c5f5c10283
commit
f98f022a28
|
@ -15,8 +15,7 @@
|
|||
# under the License.
|
||||
#
|
||||
|
||||
from __future__ import absolute_import
|
||||
|
||||
from eventlet import greenthread
|
||||
import logging
|
||||
import mock
|
||||
from nova import test
|
||||
|
@ -78,3 +77,117 @@ class TestPowerVMNovaEventHandler(test.TestCase):
|
|||
|
||||
self.assertTrue(self.mock_driver.emit_event.called)
|
||||
self.assertTrue(self.mock_driver.nvram_mgr.store.called)
|
||||
|
||||
|
||||
class TestPowerVMLifecycleEventHandler(test.TestCase):
|
||||
def setUp(self):
|
||||
super(TestPowerVMLifecycleEventHandler, self).setUp()
|
||||
self.mock_driver = mock.MagicMock()
|
||||
self.handler = event.PowerVMLifecycleEventHandler(self.mock_driver)
|
||||
|
||||
def test_is_delay_event(self):
|
||||
non_delay_evts = [
|
||||
pvm_bp.LPARState.ERROR,
|
||||
pvm_bp.LPARState.OPEN_FIRMWARE,
|
||||
pvm_bp.LPARState.RUNNING,
|
||||
pvm_bp.LPARState.MIGRATING_NOT_ACTIVE,
|
||||
pvm_bp.LPARState.MIGRATING_RUNNING,
|
||||
pvm_bp.LPARState.HARDWARE_DISCOVERY,
|
||||
pvm_bp.LPARState.STARTING,
|
||||
pvm_bp.LPARState.UNKNOWN
|
||||
]
|
||||
|
||||
delay_evts = [
|
||||
pvm_bp.LPARState.NOT_ACTIVATED,
|
||||
pvm_bp.LPARState.SHUTTING_DOWN,
|
||||
pvm_bp.LPARState.SUSPENDING,
|
||||
pvm_bp.LPARState.RESUMING,
|
||||
pvm_bp.LPARState.NOT_AVAILBLE
|
||||
]
|
||||
|
||||
for non_delay_evt in non_delay_evts:
|
||||
self.assertFalse(self.handler._is_delay_event(non_delay_evt),
|
||||
msg=non_delay_evt)
|
||||
|
||||
for delay_evt in delay_evts:
|
||||
self.assertTrue(self.handler._is_delay_event(delay_evt),
|
||||
msg=delay_evt)
|
||||
|
||||
@mock.patch('nova_powervm.virt.powervm.event.'
|
||||
'PowerVMLifecycleEventHandler._register_delayed_event')
|
||||
@mock.patch('nova_powervm.virt.powervm.event.'
|
||||
'PowerVMLifecycleEventHandler._emit_event')
|
||||
def test_process(self, mock_emit, mock_reg_delay_evt):
|
||||
non_delay_evts = [
|
||||
pvm_bp.LPARState.ERROR,
|
||||
pvm_bp.LPARState.OPEN_FIRMWARE
|
||||
]
|
||||
|
||||
delay_evts = [
|
||||
pvm_bp.LPARState.NOT_ACTIVATED,
|
||||
pvm_bp.LPARState.SHUTTING_DOWN,
|
||||
pvm_bp.LPARState.RESUMING,
|
||||
]
|
||||
|
||||
for state in non_delay_evts + delay_evts:
|
||||
self.handler.process(mock.Mock(), state)
|
||||
|
||||
self.assertEqual(mock_emit.call_count, 2)
|
||||
self.assertEqual(mock_reg_delay_evt.call_count, 3)
|
||||
|
||||
@mock.patch('nova_powervm.virt.powervm.event.vm.translate_event')
|
||||
def test_emit_event_immed(self, mock_translate):
|
||||
mock_translate.return_value = 'test'
|
||||
mock_delayed = mock.MagicMock()
|
||||
mock_inst = mock.Mock()
|
||||
mock_inst.uuid = 'inst_uuid'
|
||||
self.handler._delayed_event_threads = {'inst_uuid': mock_delayed}
|
||||
|
||||
self.handler._emit_event(pvm_bp.LPARState.RUNNING, mock_inst, True)
|
||||
|
||||
self.assertEqual({}, self.handler._delayed_event_threads)
|
||||
self.mock_driver.emit_event.assert_called_once()
|
||||
mock_delayed.cancel.assert_called_once()
|
||||
|
||||
@mock.patch('nova_powervm.virt.powervm.event.vm.translate_event')
|
||||
def test_emit_event_delayed(self, mock_translate):
|
||||
mock_translate.return_value = 'test'
|
||||
mock_delayed = mock.MagicMock()
|
||||
mock_inst = mock.Mock()
|
||||
mock_inst.uuid = 'inst_uuid'
|
||||
self.handler._delayed_event_threads = {'inst_uuid': mock_delayed}
|
||||
|
||||
self.handler._emit_event(pvm_bp.LPARState.NOT_ACTIVATED, mock_inst,
|
||||
False)
|
||||
|
||||
self.assertEqual({}, self.handler._delayed_event_threads)
|
||||
self.mock_driver.emit_event.assert_called_once()
|
||||
|
||||
def test_emit_event_delayed_no_queue(self):
|
||||
mock_inst = mock.Mock()
|
||||
mock_inst.uuid = 'inst_uuid'
|
||||
self.handler._delayed_event_threads = {}
|
||||
|
||||
self.handler._emit_event(pvm_bp.LPARState.NOT_ACTIVATED, mock_inst,
|
||||
False)
|
||||
|
||||
self.assertFalse(self.mock_driver.emit_event.called)
|
||||
|
||||
@mock.patch.object(greenthread, 'spawn_after')
|
||||
def test_register_delay_event(self, mock_spawn):
|
||||
mock_old_delayed, mock_new_delayed = mock.Mock(), mock.Mock()
|
||||
mock_spawn.return_value = mock_new_delayed
|
||||
|
||||
mock_inst = mock.Mock()
|
||||
mock_inst.uuid = 'inst_uuid'
|
||||
self.handler._delayed_event_threads = {'inst_uuid': mock_old_delayed}
|
||||
|
||||
self.handler._register_delayed_event(pvm_bp.LPARState.NOT_ACTIVATED,
|
||||
mock_inst)
|
||||
|
||||
mock_old_delayed.cancel.assert_called_once()
|
||||
mock_spawn.assert_called_once_with(
|
||||
15, self.handler._emit_event, pvm_bp.LPARState.NOT_ACTIVATED,
|
||||
mock_inst, False)
|
||||
self.assertEqual({'inst_uuid': mock_new_delayed},
|
||||
self.handler._delayed_event_threads)
|
||||
|
|
|
@ -15,11 +15,14 @@
|
|||
# under the License.
|
||||
|
||||
|
||||
from eventlet import greenthread
|
||||
from nova import context as ctx
|
||||
from nova.virt import event
|
||||
from oslo_concurrency import lockutils
|
||||
from oslo_log import log as logging
|
||||
from pypowervm import adapter as pvm_apt
|
||||
from pypowervm import util as pvm_util
|
||||
from pypowervm.wrappers import base_partition as pvm_bp
|
||||
from pypowervm.wrappers import event as pvm_evt
|
||||
|
||||
from nova_powervm.virt.powervm.i18n import _LI
|
||||
|
@ -28,6 +31,8 @@ from nova_powervm.virt.powervm import vm
|
|||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
_LIFECYCLE_EVT_LOCK = 'pvm_lifecycle_event'
|
||||
|
||||
|
||||
class PowerVMNovaEventHandler(pvm_apt.WrapperEventHandler):
|
||||
"""Used to receive and handle events from PowerVM and convert to Nova."""
|
||||
|
@ -35,6 +40,7 @@ class PowerVMNovaEventHandler(pvm_apt.WrapperEventHandler):
|
|||
|
||||
def __init__(self, driver):
|
||||
self._driver = driver
|
||||
self._lifecycle_handler = PowerVMLifecycleEventHandler(self._driver)
|
||||
|
||||
def _handle_event(self, pvm_event, details, inst=None):
|
||||
"""Handle an individual event.
|
||||
|
@ -85,17 +91,7 @@ class PowerVMNovaEventHandler(pvm_apt.WrapperEventHandler):
|
|||
# Get the current state
|
||||
pvm_state = vm.get_vm_qp(self._driver.adapter, pvm_uuid,
|
||||
'PartitionState')
|
||||
# See if it's really a change of state from what OpenStack knows
|
||||
transition = vm.translate_event(pvm_state, inst.power_state)
|
||||
if transition is not None:
|
||||
LOG.debug('New state for instance: %s', pvm_state,
|
||||
instance=inst)
|
||||
|
||||
# Now create an event and sent it.
|
||||
lce = event.LifecycleEvent(inst.uuid, transition)
|
||||
LOG.info(_LI('Sending life cycle event for instance state '
|
||||
'change to: %s'), pvm_state, instance=inst)
|
||||
self._driver.emit_event(lce)
|
||||
self._lifecycle_handler.process(inst, pvm_state)
|
||||
|
||||
# If the NVRAM has changed for this instance and a store is configured.
|
||||
if 'NVRAM' in details and self._driver.nvram_mgr is not None:
|
||||
|
@ -124,3 +120,89 @@ class PowerVMNovaEventHandler(pvm_apt.WrapperEventHandler):
|
|||
LOG.exception(e)
|
||||
LOG.warning(_LW('Unable to parse event URI: %s from PowerVM.'),
|
||||
pvm_event.data)
|
||||
|
||||
|
||||
class PowerVMLifecycleEventHandler(object):
|
||||
"""Because lifecycle events are weird, we need our own handler.
|
||||
|
||||
Lifecycle events that come back from the hypervisor are very 'surface
|
||||
value'. They tell you that it started, stopped, migrated, etc... However,
|
||||
multiple events may come in quickly that represent a bigger action. For
|
||||
instance a restart will generate a stop and then a start rapidly.
|
||||
|
||||
Nova being asynchronous can flip those events around. Where the start
|
||||
would flow through before the stop. That is bad.
|
||||
|
||||
We need to make sure that these events that can be linked to bigger
|
||||
lifecycle events can be wiped out if the converse action is run against
|
||||
it. Ex. Don't send a stop event up to nova if you received a start event
|
||||
shortly after it.
|
||||
"""
|
||||
|
||||
def __init__(self, driver):
|
||||
self._driver = driver
|
||||
self._delayed_event_threads = {}
|
||||
|
||||
def _is_delay_event(self, pvm_state):
|
||||
return pvm_state in [pvm_bp.LPARState.NOT_ACTIVATED,
|
||||
pvm_bp.LPARState.SHUTTING_DOWN,
|
||||
pvm_bp.LPARState.SUSPENDING,
|
||||
pvm_bp.LPARState.RESUMING,
|
||||
pvm_bp.LPARState.NOT_AVAILBLE]
|
||||
|
||||
@lockutils.synchronized(_LIFECYCLE_EVT_LOCK)
|
||||
def _register_delayed_event(self, pvm_state, inst):
|
||||
# Cancel out the current delay event. Can happen as it goes
|
||||
# from SHUTTING_DOWN to NOT_ACTIVATED, multiple delayed events
|
||||
# can come in at once. Only want the last.
|
||||
if inst.uuid in self._delayed_event_threads:
|
||||
self._delayed_event_threads[inst.uuid].cancel()
|
||||
|
||||
# Spawn in the background
|
||||
elem = greenthread.spawn_after(
|
||||
15, self._emit_event, pvm_state, inst, False)
|
||||
self._delayed_event_threads[inst.uuid] = elem
|
||||
|
||||
@lockutils.synchronized(_LIFECYCLE_EVT_LOCK)
|
||||
def _emit_event(self, pvm_state, inst, is_immed):
|
||||
if is_immed:
|
||||
# Cancel out any delayed events
|
||||
cancel_thread = self._delayed_event_threads.get(inst.uuid)
|
||||
if cancel_thread:
|
||||
cancel_thread.cancel()
|
||||
del self._delayed_event_threads[inst.uuid]
|
||||
else:
|
||||
# Make sure you're still in the thread. If not (thread was started
|
||||
# but the is_immed _emit_event had run the del), then just bail
|
||||
inst_queue = self._delayed_event_threads.get(inst.uuid)
|
||||
if not inst_queue:
|
||||
return
|
||||
|
||||
# See if it's really a change of state from what OpenStack knows
|
||||
transition = vm.translate_event(pvm_state, inst.power_state)
|
||||
if transition is None:
|
||||
return
|
||||
|
||||
# Log as if normal event
|
||||
lce = event.LifecycleEvent(inst.uuid, transition)
|
||||
LOG.info(_LI('Sending life cycle event for instance state '
|
||||
'change to: %s'), pvm_state, instance=inst)
|
||||
self._driver.emit_event(lce)
|
||||
|
||||
if not is_immed:
|
||||
# Delete out the queue
|
||||
del self._delayed_event_threads[inst.uuid]
|
||||
|
||||
def process(self, inst, pvm_state):
|
||||
"""Adds the event to the emit queue.
|
||||
|
||||
:param inst: The nova instance.
|
||||
:param pvm_state: The PowerVM LPAR State.
|
||||
"""
|
||||
LOG.debug('New state for instance: %s', pvm_state,
|
||||
instance=inst)
|
||||
# Now create an event and sent it.
|
||||
if self._is_delay_event(pvm_state):
|
||||
self._register_delayed_event(pvm_state, inst)
|
||||
else:
|
||||
self._emit_event(pvm_state, inst, True)
|
||||
|
|
Loading…
Reference in New Issue