Fix health_manager to exit without waiting

When trying to exit health_manager the terminal
would hang due to a child process using time.sleep().
Now the process uses futurist.periodics to schedule
when to run which allows it to quickly and gracefully
exit.

Also handles the `failover_amphora` not working out or being
cancelled correctly and logging the statistics of those occurences
instead of incorrectly assuming everything always works out.

Co-Authored-By: Adam Harwell <flux.adam@gmail.com>
Co-Authored-By: Joshua Harlow <jxharlow@godaddy.com>

Change-Id: I870edaab73ab20a9322c8bc1bd2514897417d12a
This commit is contained in:
Jude Cross 2017-04-12 17:21:14 -07:00 committed by Michael Johnson
parent 504cb6c682
commit 7663430f06
5 changed files with 180 additions and 102 deletions

View File

@ -12,9 +12,14 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
# #
import multiprocessing import multiprocessing
import os
import signal
import sys import sys
from futurist.periodics import PeriodicWorker
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
from oslo_reports import guru_meditation_report as gmr from oslo_reports import guru_meditation_report as gmr
@ -30,30 +35,28 @@ CONF = cfg.CONF
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
# Used for while true loops to allow testing def hm_listener(exit_event):
# TODO(johnsom) This will be removed with # TODO(german): stevedore load those drivers
# https://review.openstack.org/#/c/456420/ signal.signal(signal.SIGINT, signal.SIG_IGN)
def true_func():
return True
def hm_listener():
# TODO(german): steved'or load those drivers
udp_getter = heartbeat_udp.UDPStatusGetter( udp_getter = heartbeat_udp.UDPStatusGetter(
update_db.UpdateHealthDb(), update_db.UpdateHealthDb(),
update_db.UpdateStatsDb()) update_db.UpdateStatsDb())
while True: while not exit_event.is_set():
udp_getter.check() udp_getter.check()
def hm_health_check(): def hm_health_check(exit_event):
hm = health_manager.HealthManager() hm = health_manager.HealthManager(exit_event)
while true_func(): health_check = PeriodicWorker([(hm.health_check, None, None)],
try: schedule_strategy='aligned_last_finished')
hm.health_check()
except Exception as e: def hm_exit(*args, **kwargs):
LOG.warning('Health Manager caught the following exception and ' health_check.stop()
'is restarting: {}'.format(e)) hm.executor.shutdown()
signal.signal(signal.SIGINT, hm_exit)
LOG.debug("Pausing before starting health check")
exit_event.wait(CONF.health_manager.heartbeat_timeout)
health_check.start()
def main(): def main():
@ -62,22 +65,33 @@ def main():
gmr.TextGuruMeditation.setup_autorun(version) gmr.TextGuruMeditation.setup_autorun(version)
processes = [] processes = []
exit_event = multiprocessing.Event()
hm_listener_proc = multiprocessing.Process(name='HM_listener', hm_listener_proc = multiprocessing.Process(name='HM_listener',
target=hm_listener) target=hm_listener,
args=(exit_event,))
processes.append(hm_listener_proc) processes.append(hm_listener_proc)
hm_health_check_proc = multiprocessing.Process(name='HM_health_check', hm_health_check_proc = multiprocessing.Process(name='HM_health_check',
target=hm_health_check) target=hm_health_check,
args=(exit_event,))
processes.append(hm_health_check_proc) processes.append(hm_health_check_proc)
LOG.info("Health Manager listener process starts:") LOG.info("Health Manager listener process starts:")
hm_listener_proc.start() hm_listener_proc.start()
LOG.info("Health manager check process starts:") LOG.info("Health manager check process starts:")
hm_health_check_proc.start() hm_health_check_proc.start()
def process_cleanup(*args, **kwargs):
LOG.info("Health Manager exiting due to signal")
exit_event.set()
os.kill(hm_health_check_proc.pid, signal.SIGINT)
hm_health_check_proc.join()
hm_listener_proc.terminate()
signal.signal(signal.SIGTERM, process_cleanup)
try: try:
for process in processes: for process in processes:
process.join() process.join()
except KeyboardInterrupt: except KeyboardInterrupt:
LOG.info("Health Manager existing due to signal") process_cleanup()
hm_listener_proc.terminate()
hm_health_check_proc.terminate()

View File

@ -11,10 +11,12 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
#
from concurrent import futures from concurrent import futures
import time import functools
from futurist import periodics
from oslo_config import cfg from oslo_config import cfg
from oslo_db import exception as db_exc from oslo_db import exception as db_exc
from oslo_log import log as logging from oslo_log import log as logging
@ -28,56 +30,88 @@ CONF = cfg.CONF
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
def wait_done_or_dead(futs, dead, check_timeout=1):
while True:
_done, not_done = futures.wait(futs, timeout=check_timeout)
if len(not_done) == 0:
break
if dead.is_set():
for fut in not_done:
# This may not actually be able to cancel, but try to
# if we can.
fut.cancel()
def update_stats_on_done(stats, fut):
# This utilizes the fact that python, non-primitive types are
# passed by reference (not by value)...
stats['failover_attempted'] += 1
try:
fut.result()
except futures.CancelledError:
stats['failover_cancelled'] += 1
except Exception:
stats['failover_failed'] += 1
class HealthManager(object): class HealthManager(object):
def __init__(self): def __init__(self, exit_event):
self.cw = cw.ControllerWorker() self.cw = cw.ControllerWorker()
self.threads = CONF.health_manager.failover_threads self.threads = CONF.health_manager.failover_threads
self.executor = futures.ThreadPoolExecutor(max_workers=self.threads)
self.amp_health_repo = repo.AmphoraHealthRepository()
self.dead = exit_event
@periodics.periodic(CONF.health_manager.health_check_interval,
run_immediately=True)
def health_check(self): def health_check(self):
amp_health_repo = repo.AmphoraHealthRepository() stats = {
'failover_attempted': 0,
'failover_failed': 0,
'failover_cancelled': 0,
}
futs = []
while not self.dead.is_set():
lock_session = db_api.get_session(autocommit=False)
amp = None
try:
amp = self.amp_health_repo.get_stale_amphora(lock_session)
lock_session.commit()
except db_exc.DBDeadlock:
LOG.debug('Database reports deadlock. Skipping.')
lock_session.rollback()
except db_exc.RetryRequest:
LOG.debug('Database is requesting a retry. Skipping.')
lock_session.rollback()
except Exception:
with excutils.save_and_reraise_exception():
lock_session.rollback()
with futures.ThreadPoolExecutor(max_workers=self.threads) as executor: if amp is None:
# Don't start checking immediately, as the health manager may break
# have been down for a while and amphorae not able to check in.
LOG.debug("Pausing before starting health check")
time.sleep(CONF.health_manager.heartbeat_timeout)
while True:
LOG.debug("Starting amphora health check")
failover_count = 0
while True:
lock_session = db_api.get_session(autocommit=False) LOG.info("Stale amphora's id is: %s", amp.amphora_id)
amp = None fut = self.executor.submit(
try: self.cw.failover_amphora, amp.amphora_id)
amp = amp_health_repo.get_stale_amphora(lock_session) fut.add_done_callback(
lock_session.commit() functools.partial(update_stats_on_done, stats)
except db_exc.DBDeadlock: )
LOG.debug('Database reports deadlock. Skipping.') futs.append(fut)
try: if len(futs) == self.threads:
lock_session.rollback() break
except Exception: if futs:
pass LOG.info("Waiting for %s failovers to finish",
except db_exc.RetryRequest: len(futs))
LOG.debug('Database is requesting a retry. Skipping.') wait_done_or_dead(futs, self.dead)
try: if stats['failover_attempted'] > 0:
lock_session.rollback() LOG.info("Attempted %s failovers of amphora",
except Exception: stats['failover_attempted'])
pass LOG.info("Failed at %s failovers of amphora",
except Exception: stats['failover_failed'])
with excutils.save_and_reraise_exception(): LOG.info("Cancelled %s failovers of amphora",
try: stats['failover_cancelled'])
lock_session.rollback() happy_failovers = stats['failover_attempted']
except Exception: happy_failovers -= stats['failover_cancelled']
pass happy_failovers -= stats['failover_failed']
LOG.info("Successfully completed %s failovers of amphora",
if amp is None: happy_failovers)
break
failover_count += 1
LOG.info("Stale amphora's id is: %s",
amp.amphora_id)
executor.submit(self.cw.failover_amphora,
amp.amphora_id)
if failover_count > 0:
LOG.info("Failed over %s amphora",
failover_count)
time.sleep(CONF.health_manager.health_check_interval)

View File

@ -12,6 +12,8 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import signal
import mock import mock
from octavia.cmd import health_manager from octavia.cmd import health_manager
@ -23,40 +25,42 @@ class TestHealthManagerCMD(base.TestCase):
def setUp(self): def setUp(self):
super(TestHealthManagerCMD, self).setUp() super(TestHealthManagerCMD, self).setUp()
@mock.patch('multiprocessing.Event')
@mock.patch('octavia.controller.healthmanager.' @mock.patch('octavia.controller.healthmanager.'
'update_db.UpdateStatsDb') 'update_db.UpdateStatsDb')
@mock.patch('octavia.controller.healthmanager.' @mock.patch('octavia.controller.healthmanager.'
'update_db.UpdateHealthDb') 'update_db.UpdateHealthDb')
@mock.patch('octavia.amphorae.drivers.health.' @mock.patch('octavia.amphorae.drivers.health.'
'heartbeat_udp.UDPStatusGetter') 'heartbeat_udp.UDPStatusGetter')
def test_hm_listener(self, mock_getter, mock_health, mock_stats): def test_hm_listener(self, mock_getter, mock_health, mock_stats,
mock_event):
mock_event.is_set.side_effect = [False, False]
getter_mock = mock.MagicMock() getter_mock = mock.MagicMock()
check_mock = mock.MagicMock() check_mock = mock.MagicMock()
getter_mock.check = check_mock getter_mock.check = check_mock
getter_mock.check.side_effect = [None, Exception('break')] getter_mock.check.side_effect = [None, Exception('break')]
mock_getter.return_value = getter_mock mock_getter.return_value = getter_mock
self.assertRaisesRegex(Exception, 'break', self.assertRaisesRegexp(Exception, 'break',
health_manager.hm_listener) health_manager.hm_listener, mock_event)
mock_getter.assert_called_once_with(mock_health(), mock_stats()) mock_getter.assert_called_once_with(mock_health(), mock_stats())
self.assertEqual(2, getter_mock.check.call_count) self.assertEqual(2, getter_mock.check.call_count)
@mock.patch('octavia.cmd.health_manager.true_func') @mock.patch('multiprocessing.Event')
@mock.patch('futurist.periodics.PeriodicWorker.start')
@mock.patch('futurist.periodics.PeriodicWorker.__init__')
@mock.patch('signal.signal')
@mock.patch('octavia.controller.healthmanager.' @mock.patch('octavia.controller.healthmanager.'
'health_manager.HealthManager') 'health_manager.HealthManager')
def test_hm_health_check(self, mock_health, mock_true_func): def test_hm_health_check(self, mock_health, mock_signal, mock_worker,
mock_start, mock_event):
mock_event.is_set.side_effect = [False, True]
hm_mock = mock.MagicMock() hm_mock = mock.MagicMock()
mock_worker.return_value = None
health_check_mock = mock.MagicMock() health_check_mock = mock.MagicMock()
hm_mock.health_check = health_check_mock hm_mock.health_check = health_check_mock
hm_mock.health_check.side_effect = [None, Exception('break')]
mock_true_func.side_effect = [True, True, Exception('break')]
mock_health.return_value = hm_mock mock_health.return_value = hm_mock
self.assertRaisesRegex(Exception, 'break', health_manager.hm_health_check(mock_event)
health_manager.hm_health_check) mock_health.assert_called_once_with(mock_event)
mock_health.assert_called_once_with()
self.assertEqual(2, hm_mock.health_check.call_count)
def test_hm_true_func(self):
self.assertTrue(health_manager.true_func())
@mock.patch('multiprocessing.Process') @mock.patch('multiprocessing.Process')
@mock.patch('octavia.common.service.prepare_service') @mock.patch('octavia.common.service.prepare_service')
@ -73,13 +77,15 @@ class TestHealthManagerCMD(base.TestCase):
mock_listener_proc.join.assert_called_once_with() mock_listener_proc.join.assert_called_once_with()
mock_health_proc.join.assert_called_once_with() mock_health_proc.join.assert_called_once_with()
@mock.patch('os.kill')
@mock.patch('multiprocessing.Process') @mock.patch('multiprocessing.Process')
@mock.patch('octavia.common.service.prepare_service') @mock.patch('octavia.common.service.prepare_service')
def test_main_keyboard_interrupt(self, mock_service, mock_process): def test_main_keyboard_interrupt(self, mock_service, mock_process,
mock_kill):
mock_listener_proc = mock.MagicMock() mock_listener_proc = mock.MagicMock()
mock_health_proc = mock.MagicMock() mock_health_proc = mock.MagicMock()
mock_join = mock.MagicMock() mock_join = mock.MagicMock()
mock_join.side_effect = KeyboardInterrupt mock_join.side_effect = [KeyboardInterrupt, None]
mock_listener_proc.join = mock_join mock_listener_proc.join = mock_join
mock_process.side_effect = [mock_listener_proc, mock_health_proc] mock_process.side_effect = [mock_listener_proc, mock_health_proc]
@ -89,4 +95,7 @@ class TestHealthManagerCMD(base.TestCase):
mock_listener_proc.start.assert_called_once_with() mock_listener_proc.start.assert_called_once_with()
mock_health_proc.start.assert_called_once_with() mock_health_proc.start.assert_called_once_with()
mock_listener_proc.join.assert_called_once_with() mock_listener_proc.join.assert_called_once_with()
self.assertFalse(mock_health_proc.join.called) mock_health_proc.join.assert_called_once_with()
mock_listener_proc.terminate.assert_called_once_with()
mock_kill.assert_called_once_with(mock_health_proc.pid,
signal.SIGINT)

View File

@ -12,7 +12,10 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import threading
import mock import mock
from oslo_config import cfg from oslo_config import cfg
from oslo_db import exception as db_exc from oslo_db import exception as db_exc
from oslo_utils import uuidutils from oslo_utils import uuidutils
@ -44,22 +47,18 @@ class TestHealthManager(base.TestCase):
'ControllerWorker.failover_amphora') 'ControllerWorker.failover_amphora')
@mock.patch('octavia.db.repositories.AmphoraHealthRepository.' @mock.patch('octavia.db.repositories.AmphoraHealthRepository.'
'get_stale_amphora') 'get_stale_amphora')
@mock.patch('time.sleep')
@mock.patch('octavia.db.api.get_session') @mock.patch('octavia.db.api.get_session')
def test_health_check_stale_amphora(self, session_mock, def test_health_check_stale_amphora(self, session_mock, get_stale_amp_mock,
sleep_mock, get_stale_amp_mock,
failover_mock): failover_mock):
amphora_health = mock.MagicMock() amphora_health = mock.MagicMock()
amphora_health.amphora_id = AMPHORA_ID amphora_health.amphora_id = AMPHORA_ID
get_stale_amp_mock.side_effect = [amphora_health, get_stale_amp_mock.side_effect = [amphora_health, None]
None,
TestException('test')]
hm = healthmanager.HealthManager() exit_event = threading.Event()
self.assertRaises(TestException, hm.health_check) hm = healthmanager.HealthManager(exit_event)
failover_mock.assert_called_once_with(AMPHORA_ID) hm.health_check()
# Test DBDeadlock and RetryRequest exceptions # Test DBDeadlock and RetryRequest exceptions
session_mock.reset_mock() session_mock.reset_mock()
@ -70,6 +69,11 @@ class TestHealthManager(base.TestCase):
db_exc.DBDeadlock, db_exc.DBDeadlock,
db_exc.RetryRequest(Exception('retry_test')), db_exc.RetryRequest(Exception('retry_test')),
TestException('test')] TestException('test')]
# Test that a DBDeadlock does not raise an exception
self.assertIsNone(hm.health_check())
# Test that a RetryRequest does not raise an exception
self.assertIsNone(hm.health_check())
# Other exceptions should raise
self.assertRaises(TestException, hm.health_check) self.assertRaises(TestException, hm.health_check)
self.assertEqual(3, mock_session.rollback.call_count) self.assertEqual(3, mock_session.rollback.call_count)
@ -77,14 +81,30 @@ class TestHealthManager(base.TestCase):
'ControllerWorker.failover_amphora') 'ControllerWorker.failover_amphora')
@mock.patch('octavia.db.repositories.AmphoraHealthRepository.' @mock.patch('octavia.db.repositories.AmphoraHealthRepository.'
'get_stale_amphora', return_value=None) 'get_stale_amphora', return_value=None)
@mock.patch('time.sleep')
@mock.patch('octavia.db.api.get_session') @mock.patch('octavia.db.api.get_session')
def test_health_check_nonestale_amphora(self, session_mock, def test_health_check_nonstale_amphora(self, session_mock,
sleep_mock, get_stale_amp_mock, get_stale_amp_mock, failover_mock):
failover_mock):
get_stale_amp_mock.side_effect = [None, TestException('test')] get_stale_amp_mock.side_effect = [None, TestException('test')]
hm = healthmanager.HealthManager() exit_event = threading.Event()
self.assertRaises(TestException, hm.health_check) hm = healthmanager.HealthManager(exit_event)
hm.health_check()
session_mock.assert_called_once_with(autocommit=False)
self.assertFalse(failover_mock.called)
@mock.patch('octavia.controller.worker.controller_worker.'
'ControllerWorker.failover_amphora')
@mock.patch('octavia.db.repositories.AmphoraHealthRepository.'
'get_stale_amphora', return_value=None)
@mock.patch('octavia.db.api.get_session')
def test_health_check_exit(self, session_mock, get_stale_amp_mock,
failover_mock):
get_stale_amp_mock.return_value = None
exit_event = threading.Event()
hm = healthmanager.HealthManager(exit_event)
hm.health_check()
session_mock.assert_called_once_with(autocommit=False)
self.assertFalse(failover_mock.called) self.assertFalse(failover_mock.called)

View File

@ -7,6 +7,7 @@ pecan!=1.0.2,!=1.0.3,!=1.0.4,!=1.2,>=1.0.0 # BSD
pbr!=2.1.0,>=2.0.0 # Apache-2.0 pbr!=2.1.0,>=2.0.0 # Apache-2.0
SQLAlchemy!=1.1.5,!=1.1.6,!=1.1.7,!=1.1.8,>=1.0.10 # MIT SQLAlchemy!=1.1.5,!=1.1.6,!=1.1.7,!=1.1.8,>=1.0.10 # MIT
Babel!=2.4.0,>=2.3.4 # BSD Babel!=2.4.0,>=2.3.4 # BSD
futurist>=1.2.0 # Apache-2.0
requests>=2.14.2 # Apache-2.0 requests>=2.14.2 # Apache-2.0
rfc3986>=0.3.1 # Apache-2.0 rfc3986>=0.3.1 # Apache-2.0
keystoneauth1>=3.2.0 # Apache-2.0 keystoneauth1>=3.2.0 # Apache-2.0