Merge "Fix health_manager to exit without waiting"

Zuul 2017-11-08 17:08:20 +00:00 committed by Gerrit Code Review
commit 0cb21c543c
5 changed files with 180 additions and 102 deletions

View File

@@ -12,9 +12,14 @@
# License for the specific language governing permissions and limitations
# under the License.
#
import multiprocessing
import os
import signal
import sys
from futurist.periodics import PeriodicWorker
from oslo_config import cfg
from oslo_log import log as logging
from oslo_reports import guru_meditation_report as gmr
@@ -30,30 +35,28 @@ CONF = cfg.CONF
LOG = logging.getLogger(__name__)
# Used for while true loops to allow testing
# TODO(johnsom) This will be removed with
# https://review.openstack.org/#/c/456420/
def true_func():
return True
def hm_listener():
# TODO(german): steved'or load those drivers
def hm_listener(exit_event):
# TODO(german): stevedore load those drivers
signal.signal(signal.SIGINT, signal.SIG_IGN)
udp_getter = heartbeat_udp.UDPStatusGetter(
update_db.UpdateHealthDb(),
update_db.UpdateStatsDb())
while True:
while not exit_event.is_set():
udp_getter.check()
def hm_health_check():
hm = health_manager.HealthManager()
while true_func():
try:
hm.health_check()
except Exception as e:
LOG.warning('Health Manager caught the following exception and '
'is restarting: {}'.format(e))
def hm_health_check(exit_event):
hm = health_manager.HealthManager(exit_event)
health_check = PeriodicWorker([(hm.health_check, None, None)],
schedule_strategy='aligned_last_finished')
def hm_exit(*args, **kwargs):
health_check.stop()
hm.executor.shutdown()
signal.signal(signal.SIGINT, hm_exit)
LOG.debug("Pausing before starting health check")
exit_event.wait(CONF.health_manager.heartbeat_timeout)
health_check.start()
def main():
@@ -62,22 +65,33 @@ def main():
gmr.TextGuruMeditation.setup_autorun(version)
processes = []
exit_event = multiprocessing.Event()
hm_listener_proc = multiprocessing.Process(name='HM_listener',
target=hm_listener)
target=hm_listener,
args=(exit_event,))
processes.append(hm_listener_proc)
hm_health_check_proc = multiprocessing.Process(name='HM_health_check',
target=hm_health_check)
target=hm_health_check,
args=(exit_event,))
processes.append(hm_health_check_proc)
LOG.info("Health Manager listener process starts:")
hm_listener_proc.start()
LOG.info("Health manager check process starts:")
hm_health_check_proc.start()
def process_cleanup(*args, **kwargs):
LOG.info("Health Manager exiting due to signal")
exit_event.set()
os.kill(hm_health_check_proc.pid, signal.SIGINT)
hm_health_check_proc.join()
hm_listener_proc.terminate()
signal.signal(signal.SIGTERM, process_cleanup)
try:
for process in processes:
process.join()
except KeyboardInterrupt:
LOG.info("Health Manager existing due to signal")
hm_listener_proc.terminate()
hm_health_check_proc.terminate()
process_cleanup()

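The hunk above is the core of the fix: both child processes receive a shared multiprocessing.Event, and a SIGTERM handler in the parent (process_cleanup) sets that event, sends SIGINT to the health-check process, joins it, and terminates the listener rather than waiting indefinitely. Below is a minimal, self-contained sketch of that shutdown pattern; the names worker_loop and cleanup and the one-second wait are illustrative assumptions, not Octavia code.

import multiprocessing
import os
import signal


def worker_loop(exit_event):
    # Ignore SIGINT in the child so shutdown is driven solely by the shared
    # event; the parent's SIGINT nudge is then harmless to an in-flight loop.
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    while not exit_event.is_set():
        exit_event.wait(1)  # stand-in for one round of periodic work


def main():
    exit_event = multiprocessing.Event()
    worker = multiprocessing.Process(name='worker', target=worker_loop,
                                     args=(exit_event,))
    worker.start()

    def cleanup(*args, **kwargs):
        # Same shape as process_cleanup above: flag the children, then reap.
        exit_event.set()
        os.kill(worker.pid, signal.SIGINT)
        worker.join()

    signal.signal(signal.SIGTERM, cleanup)
    try:
        worker.join()
    except KeyboardInterrupt:
        cleanup()


if __name__ == '__main__':
    main()

The SIGINT nudge matters in the real code because the health-check child installs its own handler (hm_exit above) that stops the PeriodicWorker and shuts down the executor, while the listener child ignores SIGINT and exits on the event alone.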
View File

@@ -11,10 +11,12 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from concurrent import futures
import time
import functools
from futurist import periodics
from oslo_config import cfg
from oslo_db import exception as db_exc
from oslo_log import log as logging
@@ -28,56 +30,88 @@ CONF = cfg.CONF
LOG = logging.getLogger(__name__)
def wait_done_or_dead(futs, dead, check_timeout=1):
while True:
_done, not_done = futures.wait(futs, timeout=check_timeout)
if len(not_done) == 0:
break
if dead.is_set():
for fut in not_done:
# This may not actually be able to cancel, but try to
# if we can.
fut.cancel()
def update_stats_on_done(stats, fut):
# This utilizes the fact that python, non-primitive types are
# passed by reference (not by value)...
stats['failover_attempted'] += 1
try:
fut.result()
except futures.CancelledError:
stats['failover_cancelled'] += 1
except Exception:
stats['failover_failed'] += 1
class HealthManager(object):
def __init__(self):
def __init__(self, exit_event):
self.cw = cw.ControllerWorker()
self.threads = CONF.health_manager.failover_threads
self.executor = futures.ThreadPoolExecutor(max_workers=self.threads)
self.amp_health_repo = repo.AmphoraHealthRepository()
self.dead = exit_event
@periodics.periodic(CONF.health_manager.health_check_interval,
run_immediately=True)
def health_check(self):
amp_health_repo = repo.AmphoraHealthRepository()
stats = {
'failover_attempted': 0,
'failover_failed': 0,
'failover_cancelled': 0,
}
futs = []
while not self.dead.is_set():
lock_session = db_api.get_session(autocommit=False)
amp = None
try:
amp = self.amp_health_repo.get_stale_amphora(lock_session)
lock_session.commit()
except db_exc.DBDeadlock:
LOG.debug('Database reports deadlock. Skipping.')
lock_session.rollback()
except db_exc.RetryRequest:
LOG.debug('Database is requesting a retry. Skipping.')
lock_session.rollback()
except Exception:
with excutils.save_and_reraise_exception():
lock_session.rollback()
with futures.ThreadPoolExecutor(max_workers=self.threads) as executor:
# Don't start checking immediately, as the health manager may
# have been down for a while and amphorae not able to check in.
LOG.debug("Pausing before starting health check")
time.sleep(CONF.health_manager.heartbeat_timeout)
while True:
LOG.debug("Starting amphora health check")
failover_count = 0
while True:
if amp is None:
break
lock_session = db_api.get_session(autocommit=False)
amp = None
try:
amp = amp_health_repo.get_stale_amphora(lock_session)
lock_session.commit()
except db_exc.DBDeadlock:
LOG.debug('Database reports deadlock. Skipping.')
try:
lock_session.rollback()
except Exception:
pass
except db_exc.RetryRequest:
LOG.debug('Database is requesting a retry. Skipping.')
try:
lock_session.rollback()
except Exception:
pass
except Exception:
with excutils.save_and_reraise_exception():
try:
lock_session.rollback()
except Exception:
pass
if amp is None:
break
failover_count += 1
LOG.info("Stale amphora's id is: %s",
amp.amphora_id)
executor.submit(self.cw.failover_amphora,
amp.amphora_id)
if failover_count > 0:
LOG.info("Failed over %s amphora",
failover_count)
time.sleep(CONF.health_manager.health_check_interval)
LOG.info("Stale amphora's id is: %s", amp.amphora_id)
fut = self.executor.submit(
self.cw.failover_amphora, amp.amphora_id)
fut.add_done_callback(
functools.partial(update_stats_on_done, stats)
)
futs.append(fut)
if len(futs) == self.threads:
break
if futs:
LOG.info("Waiting for %s failovers to finish",
len(futs))
wait_done_or_dead(futs, self.dead)
if stats['failover_attempted'] > 0:
LOG.info("Attempted %s failovers of amphora",
stats['failover_attempted'])
LOG.info("Failed at %s failovers of amphora",
stats['failover_failed'])
LOG.info("Cancelled %s failovers of amphora",
stats['failover_cancelled'])
happy_failovers = stats['failover_attempted']
happy_failovers -= stats['failover_cancelled']
happy_failovers -= stats['failover_failed']
LOG.info("Successfully completed %s failovers of amphora",
happy_failovers)

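The controller-side hunk above keeps one long-lived ThreadPoolExecutor on the HealthManager, batches at most failover_threads submissions per cycle, tallies outcomes through a done callback that mutates a shared stats dict in place (dicts are passed by reference), and blocks in wait_done_or_dead so the exit event can cancel anything not yet started. Below is a minimal, self-contained sketch of that submit/callback/wait pattern; do_failover and the sample amphora ids are stand-ins, not the real ControllerWorker call.

from concurrent import futures
import functools
import threading


def update_stats_on_done(stats, fut):
    # The callback runs when the future finishes; the dict is shared by
    # reference, so incrementing it here is visible to the caller.
    stats['attempted'] += 1
    try:
        fut.result()
    except futures.CancelledError:
        stats['cancelled'] += 1
    except Exception:
        stats['failed'] += 1


def wait_done_or_dead(futs, dead, check_timeout=1):
    # Poll until every future finishes; once the shutdown event is set,
    # cancel whatever is still pending (already-running work cannot be
    # cancelled, so the loop keeps waiting for it to finish).
    while True:
        _done, not_done = futures.wait(futs, timeout=check_timeout)
        if not not_done:
            break
        if dead.is_set():
            for fut in not_done:
                fut.cancel()


def do_failover(amphora_id):
    return amphora_id  # stand-in for ControllerWorker.failover_amphora


if __name__ == '__main__':
    dead = threading.Event()
    stats = {'attempted': 0, 'failed': 0, 'cancelled': 0}
    executor = futures.ThreadPoolExecutor(max_workers=4)
    futs = []
    for amp_id in ('amp-1', 'amp-2', 'amp-3'):
        fut = executor.submit(do_failover, amp_id)
        fut.add_done_callback(functools.partial(update_stats_on_done, stats))
        futs.append(fut)
    wait_done_or_dead(futs, dead)
    executor.shutdown()
    print(stats)  # e.g. {'attempted': 3, 'failed': 0, 'cancelled': 0}

Since dead is never set here, all three futures complete normally; setting it mid-run would cancel any still-queued submissions, which is exactly the behaviour the exit_event gives the periodic health check.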
View File

@@ -12,6 +12,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import signal
import mock
from octavia.cmd import health_manager
@@ -23,40 +25,42 @@ class TestHealthManagerCMD(base.TestCase):
def setUp(self):
super(TestHealthManagerCMD, self).setUp()
@mock.patch('multiprocessing.Event')
@mock.patch('octavia.controller.healthmanager.'
'update_db.UpdateStatsDb')
@mock.patch('octavia.controller.healthmanager.'
'update_db.UpdateHealthDb')
@mock.patch('octavia.amphorae.drivers.health.'
'heartbeat_udp.UDPStatusGetter')
def test_hm_listener(self, mock_getter, mock_health, mock_stats):
def test_hm_listener(self, mock_getter, mock_health, mock_stats,
mock_event):
mock_event.is_set.side_effect = [False, False]
getter_mock = mock.MagicMock()
check_mock = mock.MagicMock()
getter_mock.check = check_mock
getter_mock.check.side_effect = [None, Exception('break')]
mock_getter.return_value = getter_mock
self.assertRaisesRegex(Exception, 'break',
health_manager.hm_listener)
self.assertRaisesRegexp(Exception, 'break',
health_manager.hm_listener, mock_event)
mock_getter.assert_called_once_with(mock_health(), mock_stats())
self.assertEqual(2, getter_mock.check.call_count)
@mock.patch('octavia.cmd.health_manager.true_func')
@mock.patch('multiprocessing.Event')
@mock.patch('futurist.periodics.PeriodicWorker.start')
@mock.patch('futurist.periodics.PeriodicWorker.__init__')
@mock.patch('signal.signal')
@mock.patch('octavia.controller.healthmanager.'
'health_manager.HealthManager')
def test_hm_health_check(self, mock_health, mock_true_func):
def test_hm_health_check(self, mock_health, mock_signal, mock_worker,
mock_start, mock_event):
mock_event.is_set.side_effect = [False, True]
hm_mock = mock.MagicMock()
mock_worker.return_value = None
health_check_mock = mock.MagicMock()
hm_mock.health_check = health_check_mock
hm_mock.health_check.side_effect = [None, Exception('break')]
mock_true_func.side_effect = [True, True, Exception('break')]
mock_health.return_value = hm_mock
self.assertRaisesRegex(Exception, 'break',
health_manager.hm_health_check)
mock_health.assert_called_once_with()
self.assertEqual(2, hm_mock.health_check.call_count)
def test_hm_true_func(self):
self.assertTrue(health_manager.true_func())
health_manager.hm_health_check(mock_event)
mock_health.assert_called_once_with(mock_event)
@mock.patch('multiprocessing.Process')
@mock.patch('octavia.common.service.prepare_service')
@@ -73,13 +77,15 @@ class TestHealthManagerCMD(base.TestCase):
mock_listener_proc.join.assert_called_once_with()
mock_health_proc.join.assert_called_once_with()
@mock.patch('os.kill')
@mock.patch('multiprocessing.Process')
@mock.patch('octavia.common.service.prepare_service')
def test_main_keyboard_interrupt(self, mock_service, mock_process):
def test_main_keyboard_interrupt(self, mock_service, mock_process,
mock_kill):
mock_listener_proc = mock.MagicMock()
mock_health_proc = mock.MagicMock()
mock_join = mock.MagicMock()
mock_join.side_effect = KeyboardInterrupt
mock_join.side_effect = [KeyboardInterrupt, None]
mock_listener_proc.join = mock_join
mock_process.side_effect = [mock_listener_proc, mock_health_proc]
@@ -89,4 +95,7 @@ class TestHealthManagerCMD(base.TestCase):
mock_listener_proc.start.assert_called_once_with()
mock_health_proc.start.assert_called_once_with()
mock_listener_proc.join.assert_called_once_with()
self.assertFalse(mock_health_proc.join.called)
mock_health_proc.join.assert_called_once_with()
mock_listener_proc.terminate.assert_called_once_with()
mock_kill.assert_called_once_with(mock_health_proc.pid,
signal.SIGINT)

View File

@@ -12,7 +12,10 @@
# License for the specific language governing permissions and limitations
# under the License.
import threading
import mock
from oslo_config import cfg
from oslo_db import exception as db_exc
from oslo_utils import uuidutils
@@ -44,22 +47,18 @@ class TestHealthManager(base.TestCase):
'ControllerWorker.failover_amphora')
@mock.patch('octavia.db.repositories.AmphoraHealthRepository.'
'get_stale_amphora')
@mock.patch('time.sleep')
@mock.patch('octavia.db.api.get_session')
def test_health_check_stale_amphora(self, session_mock,
sleep_mock, get_stale_amp_mock,
def test_health_check_stale_amphora(self, session_mock, get_stale_amp_mock,
failover_mock):
amphora_health = mock.MagicMock()
amphora_health.amphora_id = AMPHORA_ID
get_stale_amp_mock.side_effect = [amphora_health,
None,
TestException('test')]
get_stale_amp_mock.side_effect = [amphora_health, None]
hm = healthmanager.HealthManager()
self.assertRaises(TestException, hm.health_check)
exit_event = threading.Event()
hm = healthmanager.HealthManager(exit_event)
failover_mock.assert_called_once_with(AMPHORA_ID)
hm.health_check()
# Test DBDeadlock and RetryRequest exceptions
session_mock.reset_mock()
@@ -70,6 +69,11 @@ class TestHealthManager(base.TestCase):
db_exc.DBDeadlock,
db_exc.RetryRequest(Exception('retry_test')),
TestException('test')]
# Test that a DBDeadlock does not raise an exception
self.assertIsNone(hm.health_check())
# Test that a RetryRequest does not raise an exception
self.assertIsNone(hm.health_check())
# Other exceptions should raise
self.assertRaises(TestException, hm.health_check)
self.assertEqual(3, mock_session.rollback.call_count)
@@ -77,14 +81,30 @@
'ControllerWorker.failover_amphora')
@mock.patch('octavia.db.repositories.AmphoraHealthRepository.'
'get_stale_amphora', return_value=None)
@mock.patch('time.sleep')
@mock.patch('octavia.db.api.get_session')
def test_health_check_nonestale_amphora(self, session_mock,
sleep_mock, get_stale_amp_mock,
failover_mock):
def test_health_check_nonstale_amphora(self, session_mock,
get_stale_amp_mock, failover_mock):
get_stale_amp_mock.side_effect = [None, TestException('test')]
hm = healthmanager.HealthManager()
self.assertRaises(TestException, hm.health_check)
exit_event = threading.Event()
hm = healthmanager.HealthManager(exit_event)
hm.health_check()
session_mock.assert_called_once_with(autocommit=False)
self.assertFalse(failover_mock.called)
@mock.patch('octavia.controller.worker.controller_worker.'
'ControllerWorker.failover_amphora')
@mock.patch('octavia.db.repositories.AmphoraHealthRepository.'
'get_stale_amphora', return_value=None)
@mock.patch('octavia.db.api.get_session')
def test_health_check_exit(self, session_mock, get_stale_amp_mock,
failover_mock):
get_stale_amp_mock.return_value = None
exit_event = threading.Event()
hm = healthmanager.HealthManager(exit_event)
hm.health_check()
session_mock.assert_called_once_with(autocommit=False)
self.assertFalse(failover_mock.called)

View File

@@ -7,6 +7,7 @@ pecan!=1.0.2,!=1.0.3,!=1.0.4,!=1.2,>=1.0.0 # BSD
pbr!=2.1.0,>=2.0.0 # Apache-2.0
SQLAlchemy!=1.1.5,!=1.1.6,!=1.1.7,!=1.1.8,>=1.0.10 # MIT
Babel!=2.4.0,>=2.3.4 # BSD
futurist>=1.2.0 # Apache-2.0
requests>=2.14.2 # Apache-2.0
rfc3986>=0.3.1 # Apache-2.0
keystoneauth1>=3.2.0 # Apache-2.0