oslo.service/oslo_service/tests/test_service.py

716 lines
26 KiB
Python

# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit Tests for service class
"""
from __future__ import print_function
import logging
import multiprocessing
import os
import signal
import socket
import time
import traceback
import eventlet
from eventlet import event
import mock
from oslotest import base as test_base
from oslo_service import service
from oslo_service.tests import base
from oslo_service.tests import eventlet_service
LOG = logging.getLogger(__name__)
class ExtendedService(service.Service):
    """Service subclass exposing one extra method for the tests to call."""

    def test_method(self):
        """Return the fixed marker string ``'service'``."""
        marker = 'service'
        return marker
class ServiceManagerTestCase(test_base.BaseTestCase):
    """Checks that a method added by a Service subclass can be called."""

    def test_override_manager_method(self):
        """Start an ExtendedService and verify its extra method's result."""
        extended = ExtendedService()
        extended.start()
        returned = extended.test_method()
        self.assertEqual('service', returned)
class ServiceWithTimer(service.Service):
    """Service that counts timer callbacks and can announce readiness.

    :param ready_event: optional event object; when truthy it is set every
                        time ``wait()`` is entered.
    """

    def __init__(self, ready_event=None):
        super(ServiceWithTimer, self).__init__()
        self.ready_event = ready_event

    def start(self):
        """Start the base service, zero the counter and add the timer."""
        super(ServiceWithTimer, self).start()
        self.timer_fired = 0
        self.tg.add_timer(1, self.timer_expired)

    def wait(self):
        """Set the ready event (if one was supplied), then wait as usual."""
        ready = self.ready_event
        if ready:
            ready.set()
        super(ServiceWithTimer, self).wait()

    def timer_expired(self):
        """Timer callback: increase ``timer_fired`` by one."""
        self.timer_fired += 1
class ServiceCrashOnStart(ServiceWithTimer):
    """ServiceWithTimer variant whose ``start()`` always fails."""

    def start(self):
        """Run the parent ``start()`` and then raise ValueError."""
        super(ServiceCrashOnStart, self).start()
        raise ValueError()
class ServiceTestBase(base.ServiceBaseTestCase):
    """Fork/reap helpers shared by the launcher and restart tests."""

    def _spawn_service(self,
                       workers=1,
                       service_maker=None,
                       launcher_maker=None):
        """Fork a child process that launches a service.

        :param workers: worker count handed to the launcher; also stored on
                        ``self.workers``.
        :param service_maker: optional callable building the service;
                              ``ServiceWithTimer()`` is used when omitted.
        :param launcher_maker: optional callable building the launcher;
                               ``service.launch`` is used when omitted.
        :returns: the pid of the forked child (in the parent process only;
                  the child never returns from this method).
        """
        self.workers = workers
        child_pid = os.fork()
        if child_pid != 0:
            # Parent process: just report the child's pid.
            return child_pid

        # Everything below runs in the forked child only.
        os.setsid()

        # NOTE(johannes): We can't let the child processes exit back
        # into the unit test framework since then we'll have multiple
        # processes running the same tests (and possibly forking more
        # processes that end up in the same situation). So we need
        # to catch all exceptions and make sure nothing leaks out, in
        # particular SystemExit, which is raised by sys.exit(). We use
        # os._exit() which doesn't have this problem.
        exit_code = 0
        try:
            if service_maker:
                serv = service_maker()
            else:
                serv = ServiceWithTimer()

            if launcher_maker:
                launcher = launcher_maker()
                launcher.launch_service(serv, workers=workers)
            else:
                launcher = service.launch(self.conf, serv, workers=workers)

            exit_code = launcher.wait()
        except SystemExit as exc:
            exit_code = exc.code
        except BaseException:
            # We need to be defensive here too
            try:
                traceback.print_exc()
            except BaseException:
                print("Couldn't print traceback")
            exit_code = 2

        # Really exit
        os._exit(exit_code or 0)

    def _wait(self, cond, timeout):
        """Poll ``cond`` every 0.1s until it is true or ``timeout`` passes.

        Returns nothing either way; callers re-check the condition.
        """
        began = time.time()
        while True:
            if cond():
                return
            if time.time() - began > timeout:
                return
            time.sleep(.1)

    def setUp(self):
        super(ServiceTestBase, self).setUp()
        # No child has been forked yet.
        self.pid = 0
        # NOTE(markmc): ConfigOpts.log_opt_values() uses CONF.config-file
        self.conf(args=[], default_config_files=[])
        self.addCleanup(self.conf.reset)
        self.addCleanup(self._reap_pid)

    def _reap_pid(self):
        """Cleanup hook: terminate and reap the child if one is recorded."""
        if not self.pid:
            return
        # Make sure all processes are stopped
        os.kill(self.pid, signal.SIGTERM)
        # Make sure we reap our test process
        self._reap_test()

    def _reap_test(self):
        """Wait for the child, forget its pid and return its wait status."""
        _, wait_status = os.waitpid(self.pid, 0)
        self.pid = None
        return wait_status
class ServiceLauncherTest(ServiceTestBase):
    """Multi-worker launcher tests driven by real signals.

    Originally from nova/tests/integrated/test_multiprocess_api.py.
    """

    def _spawn(self):
        """Fork a launcher with two workers and return the worker pids."""
        self.pid = self._spawn_service(workers=2)

        # Wait at most 10 seconds to spawn workers
        cond = lambda: self.workers == len(self._get_workers())
        timeout = 10
        self._wait(cond, timeout)

        workers = self._get_workers()
        self.assertEqual(len(workers), self.workers)
        return workers

    def _get_workers(self):
        """Return the pids, as listed by ``ps``, whose parent is self.pid."""
        # Use the pipe as a context manager so it is closed (and the ps
        # process reaped) on every call; this method is polled repeatedly.
        with os.popen('ps ax -o pid,ppid,command') as f:
            # Skip ps header
            f.readline()

            processes = [tuple(int(p) for p in l.strip().split()[:2])
                         for l in f]
        return [p for p, pp in processes if pp == self.pid]

    def test_killed_worker_recover(self):
        """A worker killed with SIGTERM is replaced by a new one."""
        start_workers = self._spawn()

        # kill one worker and check if new worker can come up
        LOG.info('pid of first child is %s', start_workers[0])
        os.kill(start_workers[0], signal.SIGTERM)

        # Wait at most 5 seconds to respawn a worker
        cond = lambda: start_workers != self._get_workers()
        timeout = 5
        self._wait(cond, timeout)

        # Make sure worker pids don't match
        end_workers = self._get_workers()
        LOG.info('workers: %r', end_workers)
        self.assertNotEqual(start_workers, end_workers)

    def _terminate_with_signal(self, sig):
        """Send ``sig`` to the launcher and assert all workers disappear."""
        self._spawn()

        os.kill(self.pid, sig)

        # Wait at most 5 seconds to kill all workers
        cond = lambda: not self._get_workers()
        timeout = 5
        self._wait(cond, timeout)

        workers = self._get_workers()
        LOG.info('workers: %r', workers)
        # The message is shown when the assertion fails, i.e. when worker
        # processes are still around.
        self.assertFalse(workers,
                         'Worker processes still running after signal')

    def test_terminate_sigkill(self):
        """SIGKILL: the launcher's wait status reports death by SIGKILL."""
        self._terminate_with_signal(signal.SIGKILL)
        status = self._reap_test()
        self.assertTrue(os.WIFSIGNALED(status))
        self.assertEqual(signal.SIGKILL, os.WTERMSIG(status))

    def test_terminate_sigterm(self):
        """SIGTERM: the launcher exits normally with status 0."""
        self._terminate_with_signal(signal.SIGTERM)
        status = self._reap_test()
        self.assertTrue(os.WIFEXITED(status))
        self.assertEqual(0, os.WEXITSTATUS(status))

    def test_crashed_service(self):
        """A service raising in start() makes the child exit with 1."""
        service_maker = lambda: ServiceCrashOnStart()
        self.pid = self._spawn_service(service_maker=service_maker)
        status = self._reap_test()
        self.assertTrue(os.WIFEXITED(status))
        self.assertEqual(1, os.WEXITSTATUS(status))

    def test_child_signal_sighup(self):
        """SIGHUP sent to a worker must not change the set of workers."""
        start_workers = self._spawn()

        os.kill(start_workers[0], signal.SIGHUP)
        # Give the worker list up to 5 seconds to change; it is expected
        # NOT to, so this normally runs for the full timeout.
        cond = lambda: start_workers != self._get_workers()
        timeout = 5
        self._wait(cond, timeout)

        # Make sure worker pids match
        end_workers = self._get_workers()
        LOG.info('workers: %r', end_workers)
        self.assertEqual(start_workers, end_workers)

    def test_parent_signal_sighup(self):
        """SIGHUP sent to the launcher replaces every worker."""
        start_workers = self._spawn()

        os.kill(self.pid, signal.SIGHUP)

        def cond():
            workers = self._get_workers()
            return (len(workers) == len(start_workers) and
                    not set(start_workers).intersection(workers))

        # Wait at most 10 seconds for all workers to be replaced
        timeout = 10
        self._wait(cond, timeout)
        self.assertTrue(cond())
class ServiceRestartTest(ServiceTestBase):
    """Restart and mutate-hook behaviour observed through ready events."""

    def _spawn(self):
        """Fork a ServiceWithTimer child; return the event it sets."""
        ready_event = multiprocessing.Event()

        def make_service():
            return ServiceWithTimer(ready_event=ready_event)

        self.pid = self._spawn_service(service_maker=make_service)
        return ready_event

    def _expect_ready(self, ready_event, failure_message, timeout=5):
        """Wait up to ``timeout`` seconds and assert the event is set."""
        ready_event.wait(timeout)
        self.assertTrue(ready_event.is_set(), failure_message)

    def test_service_restart(self):
        """The service signals readiness again after a SIGHUP."""
        ready = self._spawn()
        self._expect_ready(ready, 'Service never became ready')

        ready.clear()
        os.kill(self.pid, signal.SIGHUP)
        self._expect_ready(ready, 'Service never back after SIGHUP')

    def test_terminate_sigterm(self):
        """SIGTERM makes the ready child exit normally with status 0."""
        ready = self._spawn()
        self._expect_ready(ready, 'Service never became ready')

        os.kill(self.pid, signal.SIGTERM)

        wait_status = self._reap_test()
        self.assertTrue(os.WIFEXITED(wait_status))
        self.assertEqual(0, os.WEXITSTATUS(wait_status))

    def test_mutate_hook_service_launcher(self):
        """Test mutate_config_files is called by ServiceLauncher on SIGHUP.

        Not using _spawn_service because ServiceLauncher doesn't fork and
        it's simplest to stay all in one process.
        """
        hook_called = multiprocessing.Event()

        def _on_mutate(c, f):
            hook_called.set()

        self.conf.register_mutate_hook(_on_mutate)
        launcher = service.launch(
            self.conf, ServiceWithTimer(), restart_method='mutate')

        self.assertFalse(hook_called.is_set(), "Hook was called too early")
        launcher.restart()
        self.assertTrue(hook_called.is_set(), "Hook wasn't called")

    def test_mutate_hook_process_launcher(self):
        """Test mutate_config_files is called by ProcessLauncher on SIGHUP.

        Forks happen in _spawn_service and ProcessLauncher. So we get three
        tiers of processes, the top tier being the test process. self.pid
        refers to the middle tier, which represents our application. Both
        service_maker and launcher_maker execute in the middle tier. The
        bottom tier is the workers.

        The behavior we want is that when the application (middle tier)
        receives a SIGHUP, it catches that, calls mutate_config_files and
        relaunches all the workers. This causes them to inherit the mutated
        config.
        """
        hook_called = multiprocessing.Event()
        ready = multiprocessing.Event()

        def _on_mutate(c, f):
            hook_called.set()

        def service_maker():
            self.conf.register_mutate_hook(_on_mutate)
            return ServiceWithTimer(ready)

        def launcher_maker():
            return service.ProcessLauncher(self.conf,
                                           restart_method='mutate')

        self.pid = self._spawn_service(1, service_maker, launcher_maker)
        self._expect_ready(ready, 'Service never became ready')

        ready.clear()
        self.assertFalse(hook_called.is_set(), "Hook was called too early")
        os.kill(self.pid, signal.SIGHUP)
        self._expect_ready(ready, 'Service never back after SIGHUP')
        self.assertTrue(hook_called.is_set(), "Hook wasn't called")
class _Service(service.Service):
    """Service that reports when it was started and when it was stopped.

    ``init`` is an eventlet event sent from ``start()``; ``cleaned_up``
    turns True once ``stop()`` has been called.
    """

    def __init__(self):
        super(_Service, self).__init__()
        self.cleaned_up = False
        self.init = event.Event()

    def start(self):
        """Signal waiters on ``init`` that the service has started."""
        self.init.send()

    def stop(self):
        """Record the cleanup, then delegate to the base ``stop()``."""
        self.cleaned_up = True
        super(_Service, self).stop()
class LauncherTest(base.ServiceBaseTestCase):
    """Tests for ``service.launch`` and ``service.Launcher``."""

    def test_graceful_shutdown(self):
        """Stopping the launcher stops the service; stop is repeatable."""
        # Services must get a chance to clean up.
        graceful_svc = _Service()
        launcher = service.launch(self.conf, graceful_svc)

        # Block on 'init' so the service is known to have started.
        graceful_svc.init.wait()

        launcher.stop()
        self.assertTrue(graceful_svc.cleaned_up)

        # A second stop must be harmless (i.e. play nice with unit test
        # fixtures in nova bug #1199315).
        launcher.stop()

    @mock.patch('oslo_service.service.ServiceLauncher.launch_service')
    def _test_launch_single(self, workers, launch_service_mock):
        """Assert ``launch`` forwards ``workers`` to ServiceLauncher."""
        plain_service = service.Service()
        service.launch(self.conf, plain_service, workers=workers)
        launch_service_mock.assert_called_with(plain_service,
                                               workers=workers)

    def test_launch_none(self):
        self._test_launch_single(None)

    def test_launch_one_worker(self):
        self._test_launch_single(1)

    def test_launch_invalid_workers_number(self):
        """Worker counts of 0 and -1 are rejected with ValueError."""
        plain_service = service.Service()
        for bad_count in (0, -1):
            self.assertRaises(ValueError, service.launch, self.conf,
                              plain_service, bad_count)

    @mock.patch('signal.alarm')
    @mock.patch('oslo_service.service.ProcessLauncher.launch_service')
    def test_multiple_worker(self, launch_service_mock, alarm_mock):
        """With three workers, ProcessLauncher.launch_service is used."""
        plain_service = service.Service()
        service.launch(self.conf, plain_service, workers=3)
        launch_service_mock.assert_called_with(plain_service, workers=3)

    def test_launch_wrong_service_base_class(self):
        """Objects not derived from service.ServiceBase can't be launched."""
        not_a_service = mock.Mock()
        self.assertRaises(TypeError, service.launch, self.conf,
                          not_a_service)

    @mock.patch('signal.alarm')
    @mock.patch("oslo_service.service.Services.add")
    @mock.patch("oslo_service.eventlet_backdoor.initialize_if_enabled")
    def test_check_service_base(self, backdoor_init_mock,
                                services_add_mock,
                                alarm_mock):
        """Launcher.launch_service accepts a real Service subclass."""
        backdoor_init_mock.return_value = None
        launcher = service.Launcher(self.conf)
        launcher.launch_service(_Service())

    @mock.patch('signal.alarm')
    @mock.patch("oslo_service.service.Services.add")
    @mock.patch("oslo_service.eventlet_backdoor.initialize_if_enabled")
    def test_check_service_base_fails(self, backdoor_init_mock,
                                      services_add_mock,
                                      alarm_mock):
        """Launcher.launch_service rejects a plain object with TypeError."""
        backdoor_init_mock.return_value = None
        launcher = service.Launcher(self.conf)

        class FooService(object):
            """Plain object unrelated to the service base classes."""

        self.assertRaises(TypeError, launcher.launch_service, FooService())
class ProcessLauncherTest(base.ServiceBaseTestCase):
    """Tests for service.ProcessLauncher and service.SignalHandler.

    NOTE: stacked ``mock.patch`` decorators hand their mocks to the test
    bottom-up, so the decorator closest to the ``def`` supplies the first
    mock argument after ``self``.
    """

    @mock.patch('signal.alarm')
    @mock.patch("signal.signal")
    def test_stop(self, signal_mock, alarm_mock):
        """stop() signals every child, empties children, stops the service."""
        signal_mock.SIGTERM = 15
        launcher = service.ProcessLauncher(self.conf)
        self.assertTrue(launcher.running)

        # Two fake child pids sharing a single wrapper object.
        pid_nums = [22, 222]
        fakeServiceWrapper = service.ServiceWrapper(service.Service(), 1)
        launcher.children = {pid_nums[0]: fakeServiceWrapper,
                             pid_nums[1]: fakeServiceWrapper}
        with mock.patch('oslo_service.service.os.kill') as mock_kill:
            with mock.patch.object(launcher, '_wait_child') as _wait_child:

                # Each call pops one fake pid and removes it from
                # launcher.children, returning the wrapper stored there.
                def fake_wait_child():
                    pid = pid_nums.pop()
                    return launcher.children.pop(pid)

                _wait_child.side_effect = fake_wait_child
                with mock.patch('oslo_service.service.Service.stop') as \
                        mock_service_stop:
                    mock_service_stop.side_effect = lambda: None
                    launcher.stop()

        self.assertFalse(launcher.running)
        self.assertFalse(launcher.children)
        mock_kill.assert_has_calls([mock.call(222, signal_mock.SIGTERM),
                                    mock.call(22, signal_mock.SIGTERM)],
                                   any_order=True)
        self.assertEqual(2, mock_kill.call_count)
        mock_service_stop.assert_called_once_with()

    def test__handle_signal(self):
        """Every handler registered for SIGTERM is called exactly once."""
        signal_handler = service.SignalHandler()
        signal_handler.clear()
        self.assertEqual(0,
                         len(signal_handler._signal_handlers[signal.SIGTERM]))
        call_1, call_2 = mock.Mock(), mock.Mock()
        signal_handler.add_handler('SIGTERM', call_1)
        signal_handler.add_handler('SIGTERM', call_2)
        self.assertEqual(2,
                         len(signal_handler._signal_handlers[signal.SIGTERM]))
        signal_handler._handle_signal(signal.SIGTERM, 'test')
        # execute pending eventlet callbacks
        time.sleep(0)
        for m in signal_handler._signal_handlers[signal.SIGTERM]:
            m.assert_called_once_with(signal.SIGTERM, 'test')
        signal_handler.clear()

    @mock.patch('sys.version_info', (3, 5))
    def test_setup_signal_interruption_no_select_poll(self):
        """With eventlet.patcher.original mocked, the flag ends up False."""
        # NOTE(claudiub): SignalHandler is a singleton, which means that it
        # might already be initialized. We need to clear the cache
        # in order to prevent race conditions between tests.
        service.SignalHandler.__class__._instances.clear()
        # The mocked lookup returns a bare object() in place of the module.
        with mock.patch('eventlet.patcher.original',
                        return_value=object()) as get_original:
            signal_handler = service.SignalHandler()
            get_original.assert_called_with('select')
        self.addCleanup(service.SignalHandler.__class__._instances.clear)
        self.assertFalse(
            signal_handler._SignalHandler__force_interrupt_on_signal)

    @mock.patch('sys.version_info', (3, 5))
    def test_setup_signal_interruption_select_poll(self):
        """Without mocking eventlet.patcher.original, the flag is True."""
        # NOTE(claudiub): SignalHandler is a singleton, which means that it
        # might already be initialized. We need to clear the cache
        # in order to prevent race conditions between tests.
        service.SignalHandler.__class__._instances.clear()
        signal_handler = service.SignalHandler()
        self.addCleanup(service.SignalHandler.__class__._instances.clear)
        self.assertTrue(
            signal_handler._SignalHandler__force_interrupt_on_signal)

    @mock.patch('signal.alarm')
    @mock.patch("os.kill")
    @mock.patch("oslo_service.service.ProcessLauncher.stop")
    @mock.patch("oslo_service.service.ProcessLauncher._respawn_children")
    @mock.patch("oslo_service.service.ProcessLauncher.handle_signal")
    @mock.patch("oslo_config.cfg.CONF.log_opt_values")
    @mock.patch("oslo_service.systemd.notify_once")
    @mock.patch("oslo_config.cfg.CONF.reload_config_files")
    @mock.patch("oslo_service.service._is_sighup_and_daemon")
    def test_parent_process_reload_config(self,
                                          is_sighup_and_daemon_mock,
                                          reload_config_files_mock,
                                          notify_once_mock,
                                          log_opt_values_mock,
                                          handle_signal_mock,
                                          respawn_children_mock,
                                          stop_mock,
                                          kill_mock,
                                          alarm_mock):
        """wait() reloads the config and resets the child's service once."""
        is_sighup_and_daemon_mock.return_value = True
        # First call returns None; the second raises GreenletExit, which
        # is what lets launcher.wait() below return.
        respawn_children_mock.side_effect = [None,
                                             eventlet.greenlet.GreenletExit()]
        launcher = service.ProcessLauncher(self.conf)
        launcher.sigcaught = 1
        launcher.children = {}

        wrap_mock = mock.Mock()
        launcher.children[222] = wrap_mock
        launcher.wait()

        reload_config_files_mock.assert_called_once_with()
        wrap_mock.service.reset.assert_called_once_with()

    @mock.patch("oslo_service.service.ProcessLauncher._start_child")
    @mock.patch("oslo_service.service.ProcessLauncher.handle_signal")
    @mock.patch("eventlet.greenio.GreenPipe")
    @mock.patch("os.pipe")
    def test_check_service_base(self, pipe_mock, green_pipe_mock,
                                handle_signal_mock, start_child_mock):
        """launch_service accepts a Service subclass without raising."""
        pipe_mock.return_value = [None, None]
        launcher = service.ProcessLauncher(self.conf)
        serv = _Service()
        launcher.launch_service(serv, workers=0)

    @mock.patch("oslo_service.service.ProcessLauncher._start_child")
    @mock.patch("oslo_service.service.ProcessLauncher.handle_signal")
    @mock.patch("eventlet.greenio.GreenPipe")
    @mock.patch("os.pipe")
    def test_check_service_base_fails(self, pipe_mock, green_pipe_mock,
                                      handle_signal_mock, start_child_mock):
        """launch_service raises TypeError for a plain object."""
        pipe_mock.return_value = [None, None]
        launcher = service.ProcessLauncher(self.conf)

        class FooService(object):
            def __init__(self):
                pass
        serv = FooService()
        self.assertRaises(TypeError, launcher.launch_service, serv, 0)

    @mock.patch("oslo_service.service.ProcessLauncher._start_child")
    @mock.patch("oslo_service.service.ProcessLauncher.handle_signal")
    @mock.patch("eventlet.greenio.GreenPipe")
    @mock.patch("os.pipe")
    def test_double_sighup(self, pipe_mock, green_pipe_mock,
                           handle_signal_mock, start_child_mock):
        # Test that issuing two SIGHUPs in a row does not exit; then send a
        # TERM that does cause an exit.
        pipe_mock.return_value = [None, None]
        launcher = service.ProcessLauncher(self.conf)
        serv = _Service()
        launcher.launch_service(serv, workers=0)

        # Installed as the side effect of the mocked handle_signal; the
        # current stage is kept as an attribute on the function itself.
        def stager():
            # -1: start state
            # 0: post-init
            # 1: first HUP sent
            # 2: second HUP sent
            # 3: TERM sent
            stager.stage += 1
            if stager.stage < 3:
                launcher._handle_hup(1, mock.sentinel.frame)
            elif stager.stage == 3:
                launcher._handle_term(15, mock.sentinel.frame)
            else:
                self.fail("TERM did not kill launcher")
        stager.stage = -1
        handle_signal_mock.side_effect = stager

        launcher.wait()
        # wait() must have returned right after the TERM stage.
        self.assertEqual(3, stager.stage)
class GracefulShutdownTestService(service.Service):
    """Service whose only task sleeps and then sends ``finished_task``."""

    def __init__(self):
        super(GracefulShutdownTestService, self).__init__()
        self.finished_task = event.Event()

    def start(self, sleep_amount):
        """Add a thread that sleeps ``sleep_amount`` then sends the event."""

        def _nap_then_notify(done_event):
            time.sleep(sleep_amount)
            done_event.send()

        # The event is passed as an argument, exactly as add_thread
        # receives it, rather than looked up later through ``self``.
        self.tg.add_thread(_nap_then_notify, self.finished_task)
def exercise_graceful_test_service(sleep_amount, time_to_wait, graceful):
    """Start and stop a GracefulShutdownTestService, then await its task.

    :param sleep_amount: passed to the service's ``start()``.
    :param time_to_wait: seconds to wait for ``finished_task``.
    :param graceful: passed straight to the service's ``stop()``.
    :returns: the result of waiting on ``finished_task``, or the string
              ``"Timeout!"`` if ``time_to_wait`` elapses first.
    """
    test_service = GracefulShutdownTestService()
    test_service.start(sleep_amount)
    test_service.stop(graceful)

    # Wait directly on the event's bound ``wait`` method.
    return eventlet.timeout.with_timeout(
        time_to_wait,
        test_service.finished_task.wait,
        timeout_value="Timeout!")
class ServiceTest(test_base.BaseTestCase):
    """Graceful versus ungraceful ``Service.stop`` behaviour."""

    def test_graceful_stop(self):
        """A graceful stop lets the task finish within the wait window."""
        # The 2 second wait exceeds the 1 second sleep of the task.
        outcome = exercise_graceful_test_service(1, 2, True)
        self.assertIsNone(outcome)

    def test_ungraceful_stop(self):
        """An ungraceful stop means the task's event is never sent."""
        outcome = exercise_graceful_test_service(1, 2, False)
        self.assertEqual("Timeout!", outcome)
class EventletServerProcessLauncherTest(base.ServiceBaseTestCase):
    """Signal handling of an eventlet server with a connected client.

    The server is run via ``eventlet_service.run`` in a separate process
    with ``self.workers`` workers (3 here).
    """

    def setUp(self):
        super(EventletServerProcessLauncherTest, self).setUp()
        self.conf(args=[], default_config_files=[])
        self.addCleanup(self.conf.reset)
        self.workers = 3

    def run_server(self):
        """Start the server process and open one active client connection.

        :returns: ``(proc, conn)`` - the ``multiprocessing.Process`` running
                  the server and the connected client socket. The socket is
                  also closed automatically at test cleanup.
        """
        queue = multiprocessing.Queue()
        # NOTE(bnemec): process_time of 5 needs to be longer than the graceful
        # shutdown timeout in the "exceeded" test below, but also needs to be
        # shorter than the timeout in the regular graceful shutdown test.
        proc = multiprocessing.Process(target=eventlet_service.run,
                                       args=(queue,),
                                       kwargs={'workers': self.workers,
                                               'process_time': 5})
        proc.start()

        # The server reports the port it listens on through the queue.
        port = queue.get()
        conn = socket.create_connection(('127.0.0.1', port))
        # Guarantee the client socket is released even when a test fails
        # before reaching its own close(); closing twice is harmless.
        self.addCleanup(conn.close)
        # Send request to make the connection active.
        conn.sendall(b'GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')

        # NOTE(blk-u): The sleep shouldn't be necessary. There must be a bug in
        # the server implementation where it takes some time to set up the
        # server or signal handlers.
        time.sleep(1)

        return (proc, conn)

    def test_shuts_down_on_sigint_when_client_connected(self):
        """SIGINT makes the server exit while a client is still connected."""
        proc, conn = self.run_server()

        # check that server is live
        self.assertTrue(proc.is_alive())

        # send SIGINT to the server and wait for it to exit while client still
        # connected.
        os.kill(proc.pid, signal.SIGINT)
        proc.join()
        conn.close()

    def test_graceful_shuts_down_on_sigterm_when_client_connected(self):
        """After SIGTERM the server stays up while the client is connected."""
        self.config(graceful_shutdown_timeout=7)
        proc, conn = self.run_server()

        # send SIGTERM to the server and wait for it to exit while client still
        # connected.
        os.kill(proc.pid, signal.SIGTERM)

        # With graceful_shutdown_timeout=7 and the request still being
        # processed, the server is expected to still be running one second
        # after SIGTERM.
        # NOTE(bnemec): In newer versions of eventlet that drop idle
        # connections, this needs to be long enough to allow the signal
        # handler to fire but short enough that our request doesn't complete
        # or the connection will be closed and the server will stop.
        time.sleep(1)

        self.assertTrue(proc.is_alive())

        conn.close()
        proc.join()

    def test_graceful_stop_with_exceeded_graceful_shutdown_timeout(self):
        """The server exits once graceful_shutdown_timeout has elapsed."""
        # Server must exit if graceful_shutdown_timeout exceeded
        graceful_shutdown_timeout = 4
        self.config(graceful_shutdown_timeout=graceful_shutdown_timeout)
        proc, conn = self.run_server()

        time_before = time.time()
        os.kill(proc.pid, signal.SIGTERM)
        self.assertTrue(proc.is_alive())
        proc.join()
        self.assertFalse(proc.is_alive())
        time_after = time.time()

        # The client socket is no longer needed once the server is gone.
        conn.close()

        # assertGreater reports both values if the check fails.
        self.assertGreater(time_after - time_before,
                           graceful_shutdown_timeout)
class EventletServerServiceLauncherTest(EventletServerProcessLauncherTest):
    """Re-runs the inherited eventlet server tests with ``workers`` = 1."""

    def setUp(self):
        super(EventletServerServiceLauncherTest, self).setUp()
        # Override the parent's setting of 3 workers.
        # NOTE(review): presumably a single worker exercises the
        # ServiceLauncher path, as the class name suggests - confirm against
        # eventlet_service.run.
        self.workers = 1