From 7663430f06fefdb749435b2e20df45af7245f75c Mon Sep 17 00:00:00 2001 From: Jude Cross Date: Wed, 12 Apr 2017 17:21:14 -0700 Subject: [PATCH] Fix health_manager to exit without waiting When trying to exit health_manager the terminal would hang due to a child process using time.sleep(). Now the process uses futurist.periodics to schedule when to run which allows it to quickly and gracefully exit. Also handles the `failover_amphora` not working out or being cancelled correctly and logging the statistics of those occurences instead of incorrectly assuming everything always works out. Co-Authored-By: Adam Harwell Co-Authored-By: Joshua Harlow Change-Id: I870edaab73ab20a9322c8bc1bd2514897417d12a --- octavia/cmd/health_manager.py | 60 ++++---- .../healthmanager/health_manager.py | 128 +++++++++++------- octavia/tests/unit/cmd/test_health_manager.py | 43 +++--- .../healthmanager/test_health_manager.py | 50 +++++-- requirements.txt | 1 + 5 files changed, 180 insertions(+), 102 deletions(-) diff --git a/octavia/cmd/health_manager.py b/octavia/cmd/health_manager.py index 19c886a9bc..cd5c4a750e 100644 --- a/octavia/cmd/health_manager.py +++ b/octavia/cmd/health_manager.py @@ -12,9 +12,14 @@ # License for the specific language governing permissions and limitations # under the License. # + import multiprocessing +import os +import signal import sys +from futurist.periodics import PeriodicWorker + from oslo_config import cfg from oslo_log import log as logging from oslo_reports import guru_meditation_report as gmr @@ -30,30 +35,28 @@ CONF = cfg.CONF LOG = logging.getLogger(__name__) -# Used for while true loops to allow testing -# TODO(johnsom) This will be removed with -# https://review.openstack.org/#/c/456420/ -def true_func(): - return True - - -def hm_listener(): - # TODO(german): steved'or load those drivers +def hm_listener(exit_event): + # TODO(german): stevedore load those drivers + signal.signal(signal.SIGINT, signal.SIG_IGN) udp_getter = heartbeat_udp.UDPStatusGetter( update_db.UpdateHealthDb(), update_db.UpdateStatsDb()) - while True: + while not exit_event.is_set(): udp_getter.check() -def hm_health_check(): - hm = health_manager.HealthManager() - while true_func(): - try: - hm.health_check() - except Exception as e: - LOG.warning('Health Manager caught the following exception and ' - 'is restarting: {}'.format(e)) +def hm_health_check(exit_event): + hm = health_manager.HealthManager(exit_event) + health_check = PeriodicWorker([(hm.health_check, None, None)], + schedule_strategy='aligned_last_finished') + + def hm_exit(*args, **kwargs): + health_check.stop() + hm.executor.shutdown() + signal.signal(signal.SIGINT, hm_exit) + LOG.debug("Pausing before starting health check") + exit_event.wait(CONF.health_manager.heartbeat_timeout) + health_check.start() def main(): @@ -62,22 +65,33 @@ def main(): gmr.TextGuruMeditation.setup_autorun(version) processes = [] + exit_event = multiprocessing.Event() hm_listener_proc = multiprocessing.Process(name='HM_listener', - target=hm_listener) + target=hm_listener, + args=(exit_event,)) processes.append(hm_listener_proc) hm_health_check_proc = multiprocessing.Process(name='HM_health_check', - target=hm_health_check) + target=hm_health_check, + args=(exit_event,)) processes.append(hm_health_check_proc) + LOG.info("Health Manager listener process starts:") hm_listener_proc.start() LOG.info("Health manager check process starts:") hm_health_check_proc.start() + def process_cleanup(*args, **kwargs): + LOG.info("Health Manager exiting due to signal") + exit_event.set() + os.kill(hm_health_check_proc.pid, signal.SIGINT) + hm_health_check_proc.join() + hm_listener_proc.terminate() + + signal.signal(signal.SIGTERM, process_cleanup) + try: for process in processes: process.join() except KeyboardInterrupt: - LOG.info("Health Manager existing due to signal") - hm_listener_proc.terminate() - hm_health_check_proc.terminate() + process_cleanup() diff --git a/octavia/controller/healthmanager/health_manager.py b/octavia/controller/healthmanager/health_manager.py index 2776b379a1..639195bded 100644 --- a/octavia/controller/healthmanager/health_manager.py +++ b/octavia/controller/healthmanager/health_manager.py @@ -11,10 +11,12 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. +# from concurrent import futures -import time +import functools +from futurist import periodics from oslo_config import cfg from oslo_db import exception as db_exc from oslo_log import log as logging @@ -28,56 +30,88 @@ CONF = cfg.CONF LOG = logging.getLogger(__name__) +def wait_done_or_dead(futs, dead, check_timeout=1): + while True: + _done, not_done = futures.wait(futs, timeout=check_timeout) + if len(not_done) == 0: + break + if dead.is_set(): + for fut in not_done: + # This may not actually be able to cancel, but try to + # if we can. + fut.cancel() + + +def update_stats_on_done(stats, fut): + # This utilizes the fact that python, non-primitive types are + # passed by reference (not by value)... + stats['failover_attempted'] += 1 + try: + fut.result() + except futures.CancelledError: + stats['failover_cancelled'] += 1 + except Exception: + stats['failover_failed'] += 1 + + class HealthManager(object): - def __init__(self): + def __init__(self, exit_event): self.cw = cw.ControllerWorker() self.threads = CONF.health_manager.failover_threads + self.executor = futures.ThreadPoolExecutor(max_workers=self.threads) + self.amp_health_repo = repo.AmphoraHealthRepository() + self.dead = exit_event + @periodics.periodic(CONF.health_manager.health_check_interval, + run_immediately=True) def health_check(self): - amp_health_repo = repo.AmphoraHealthRepository() + stats = { + 'failover_attempted': 0, + 'failover_failed': 0, + 'failover_cancelled': 0, + } + futs = [] + while not self.dead.is_set(): + lock_session = db_api.get_session(autocommit=False) + amp = None + try: + amp = self.amp_health_repo.get_stale_amphora(lock_session) + lock_session.commit() + except db_exc.DBDeadlock: + LOG.debug('Database reports deadlock. Skipping.') + lock_session.rollback() + except db_exc.RetryRequest: + LOG.debug('Database is requesting a retry. Skipping.') + lock_session.rollback() + except Exception: + with excutils.save_and_reraise_exception(): + lock_session.rollback() - with futures.ThreadPoolExecutor(max_workers=self.threads) as executor: - # Don't start checking immediately, as the health manager may - # have been down for a while and amphorae not able to check in. - LOG.debug("Pausing before starting health check") - time.sleep(CONF.health_manager.heartbeat_timeout) - while True: - LOG.debug("Starting amphora health check") - failover_count = 0 - while True: + if amp is None: + break - lock_session = db_api.get_session(autocommit=False) - amp = None - try: - amp = amp_health_repo.get_stale_amphora(lock_session) - lock_session.commit() - except db_exc.DBDeadlock: - LOG.debug('Database reports deadlock. Skipping.') - try: - lock_session.rollback() - except Exception: - pass - except db_exc.RetryRequest: - LOG.debug('Database is requesting a retry. Skipping.') - try: - lock_session.rollback() - except Exception: - pass - except Exception: - with excutils.save_and_reraise_exception(): - try: - lock_session.rollback() - except Exception: - pass - - if amp is None: - break - failover_count += 1 - LOG.info("Stale amphora's id is: %s", - amp.amphora_id) - executor.submit(self.cw.failover_amphora, - amp.amphora_id) - if failover_count > 0: - LOG.info("Failed over %s amphora", - failover_count) - time.sleep(CONF.health_manager.health_check_interval) + LOG.info("Stale amphora's id is: %s", amp.amphora_id) + fut = self.executor.submit( + self.cw.failover_amphora, amp.amphora_id) + fut.add_done_callback( + functools.partial(update_stats_on_done, stats) + ) + futs.append(fut) + if len(futs) == self.threads: + break + if futs: + LOG.info("Waiting for %s failovers to finish", + len(futs)) + wait_done_or_dead(futs, self.dead) + if stats['failover_attempted'] > 0: + LOG.info("Attempted %s failovers of amphora", + stats['failover_attempted']) + LOG.info("Failed at %s failovers of amphora", + stats['failover_failed']) + LOG.info("Cancelled %s failovers of amphora", + stats['failover_cancelled']) + happy_failovers = stats['failover_attempted'] + happy_failovers -= stats['failover_cancelled'] + happy_failovers -= stats['failover_failed'] + LOG.info("Successfully completed %s failovers of amphora", + happy_failovers) diff --git a/octavia/tests/unit/cmd/test_health_manager.py b/octavia/tests/unit/cmd/test_health_manager.py index 0c7ac5b16c..907e7f0394 100644 --- a/octavia/tests/unit/cmd/test_health_manager.py +++ b/octavia/tests/unit/cmd/test_health_manager.py @@ -12,6 +12,8 @@ # License for the specific language governing permissions and limitations # under the License. +import signal + import mock from octavia.cmd import health_manager @@ -23,40 +25,42 @@ class TestHealthManagerCMD(base.TestCase): def setUp(self): super(TestHealthManagerCMD, self).setUp() + @mock.patch('multiprocessing.Event') @mock.patch('octavia.controller.healthmanager.' 'update_db.UpdateStatsDb') @mock.patch('octavia.controller.healthmanager.' 'update_db.UpdateHealthDb') @mock.patch('octavia.amphorae.drivers.health.' 'heartbeat_udp.UDPStatusGetter') - def test_hm_listener(self, mock_getter, mock_health, mock_stats): + def test_hm_listener(self, mock_getter, mock_health, mock_stats, + mock_event): + mock_event.is_set.side_effect = [False, False] getter_mock = mock.MagicMock() check_mock = mock.MagicMock() getter_mock.check = check_mock getter_mock.check.side_effect = [None, Exception('break')] mock_getter.return_value = getter_mock - self.assertRaisesRegex(Exception, 'break', - health_manager.hm_listener) + self.assertRaisesRegexp(Exception, 'break', + health_manager.hm_listener, mock_event) mock_getter.assert_called_once_with(mock_health(), mock_stats()) self.assertEqual(2, getter_mock.check.call_count) - @mock.patch('octavia.cmd.health_manager.true_func') + @mock.patch('multiprocessing.Event') + @mock.patch('futurist.periodics.PeriodicWorker.start') + @mock.patch('futurist.periodics.PeriodicWorker.__init__') + @mock.patch('signal.signal') @mock.patch('octavia.controller.healthmanager.' 'health_manager.HealthManager') - def test_hm_health_check(self, mock_health, mock_true_func): + def test_hm_health_check(self, mock_health, mock_signal, mock_worker, + mock_start, mock_event): + mock_event.is_set.side_effect = [False, True] hm_mock = mock.MagicMock() + mock_worker.return_value = None health_check_mock = mock.MagicMock() hm_mock.health_check = health_check_mock - hm_mock.health_check.side_effect = [None, Exception('break')] - mock_true_func.side_effect = [True, True, Exception('break')] mock_health.return_value = hm_mock - self.assertRaisesRegex(Exception, 'break', - health_manager.hm_health_check) - mock_health.assert_called_once_with() - self.assertEqual(2, hm_mock.health_check.call_count) - - def test_hm_true_func(self): - self.assertTrue(health_manager.true_func()) + health_manager.hm_health_check(mock_event) + mock_health.assert_called_once_with(mock_event) @mock.patch('multiprocessing.Process') @mock.patch('octavia.common.service.prepare_service') @@ -73,13 +77,15 @@ class TestHealthManagerCMD(base.TestCase): mock_listener_proc.join.assert_called_once_with() mock_health_proc.join.assert_called_once_with() + @mock.patch('os.kill') @mock.patch('multiprocessing.Process') @mock.patch('octavia.common.service.prepare_service') - def test_main_keyboard_interrupt(self, mock_service, mock_process): + def test_main_keyboard_interrupt(self, mock_service, mock_process, + mock_kill): mock_listener_proc = mock.MagicMock() mock_health_proc = mock.MagicMock() mock_join = mock.MagicMock() - mock_join.side_effect = KeyboardInterrupt + mock_join.side_effect = [KeyboardInterrupt, None] mock_listener_proc.join = mock_join mock_process.side_effect = [mock_listener_proc, mock_health_proc] @@ -89,4 +95,7 @@ class TestHealthManagerCMD(base.TestCase): mock_listener_proc.start.assert_called_once_with() mock_health_proc.start.assert_called_once_with() mock_listener_proc.join.assert_called_once_with() - self.assertFalse(mock_health_proc.join.called) + mock_health_proc.join.assert_called_once_with() + mock_listener_proc.terminate.assert_called_once_with() + mock_kill.assert_called_once_with(mock_health_proc.pid, + signal.SIGINT) diff --git a/octavia/tests/unit/controller/healthmanager/test_health_manager.py b/octavia/tests/unit/controller/healthmanager/test_health_manager.py index d909655471..bfd3ef1dbf 100644 --- a/octavia/tests/unit/controller/healthmanager/test_health_manager.py +++ b/octavia/tests/unit/controller/healthmanager/test_health_manager.py @@ -12,7 +12,10 @@ # License for the specific language governing permissions and limitations # under the License. +import threading + import mock + from oslo_config import cfg from oslo_db import exception as db_exc from oslo_utils import uuidutils @@ -44,22 +47,18 @@ class TestHealthManager(base.TestCase): 'ControllerWorker.failover_amphora') @mock.patch('octavia.db.repositories.AmphoraHealthRepository.' 'get_stale_amphora') - @mock.patch('time.sleep') @mock.patch('octavia.db.api.get_session') - def test_health_check_stale_amphora(self, session_mock, - sleep_mock, get_stale_amp_mock, + def test_health_check_stale_amphora(self, session_mock, get_stale_amp_mock, failover_mock): amphora_health = mock.MagicMock() amphora_health.amphora_id = AMPHORA_ID - get_stale_amp_mock.side_effect = [amphora_health, - None, - TestException('test')] + get_stale_amp_mock.side_effect = [amphora_health, None] - hm = healthmanager.HealthManager() - self.assertRaises(TestException, hm.health_check) + exit_event = threading.Event() + hm = healthmanager.HealthManager(exit_event) - failover_mock.assert_called_once_with(AMPHORA_ID) + hm.health_check() # Test DBDeadlock and RetryRequest exceptions session_mock.reset_mock() @@ -70,6 +69,11 @@ class TestHealthManager(base.TestCase): db_exc.DBDeadlock, db_exc.RetryRequest(Exception('retry_test')), TestException('test')] + # Test that a DBDeadlock does not raise an exception + self.assertIsNone(hm.health_check()) + # Test that a RetryRequest does not raise an exception + self.assertIsNone(hm.health_check()) + # Other exceptions should raise self.assertRaises(TestException, hm.health_check) self.assertEqual(3, mock_session.rollback.call_count) @@ -77,14 +81,30 @@ class TestHealthManager(base.TestCase): 'ControllerWorker.failover_amphora') @mock.patch('octavia.db.repositories.AmphoraHealthRepository.' 'get_stale_amphora', return_value=None) - @mock.patch('time.sleep') @mock.patch('octavia.db.api.get_session') - def test_health_check_nonestale_amphora(self, session_mock, - sleep_mock, get_stale_amp_mock, - failover_mock): + def test_health_check_nonstale_amphora(self, session_mock, + get_stale_amp_mock, failover_mock): get_stale_amp_mock.side_effect = [None, TestException('test')] - hm = healthmanager.HealthManager() - self.assertRaises(TestException, hm.health_check) + exit_event = threading.Event() + hm = healthmanager.HealthManager(exit_event) + hm.health_check() + session_mock.assert_called_once_with(autocommit=False) + self.assertFalse(failover_mock.called) + + @mock.patch('octavia.controller.worker.controller_worker.' + 'ControllerWorker.failover_amphora') + @mock.patch('octavia.db.repositories.AmphoraHealthRepository.' + 'get_stale_amphora', return_value=None) + @mock.patch('octavia.db.api.get_session') + def test_health_check_exit(self, session_mock, get_stale_amp_mock, + failover_mock): + get_stale_amp_mock.return_value = None + + exit_event = threading.Event() + hm = healthmanager.HealthManager(exit_event) + hm.health_check() + + session_mock.assert_called_once_with(autocommit=False) self.assertFalse(failover_mock.called) diff --git a/requirements.txt b/requirements.txt index 0a06107860..1483446f84 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,6 +7,7 @@ pecan!=1.0.2,!=1.0.3,!=1.0.4,!=1.2,>=1.0.0 # BSD pbr!=2.1.0,>=2.0.0 # Apache-2.0 SQLAlchemy!=1.1.5,!=1.1.6,!=1.1.7,!=1.1.8,>=1.0.10 # MIT Babel!=2.4.0,>=2.3.4 # BSD +futurist>=1.2.0 # Apache-2.0 requests>=2.14.2 # Apache-2.0 rfc3986>=0.3.1 # Apache-2.0 keystoneauth1>=3.2.0 # Apache-2.0