Windows: allow multiple c-vol backends per service
This change will allow enabling multiple backends within a single Cinder Volume service on Windows. We cannot leverage the oslo_service process launcher as it has some Windows compatibility issues: - it forks - it uses eventlet GreenPipe (not available on Windows) - child processes receive the service objects that should be launched. Those are not serializable, for which reason we're not able to pass them without fork support. For this reason, we'll use the following alternate approach on Windows: - we're adding another cli argument called 'backend_name', which allows requesting a certain backend. - we're adding a Windows process launcher that will just use subprocess.Popen for each backend, using the above mentioned argument to request a specific backend. Thus, for each backend, we'll invoke a command such as the following: cinder-volume.exe --backend_name=<backend_name> --config-file=<conf> - the process launcher will wait for all the subprocesses to exit. In order to ensure that all subprocesses are killed when the parent dies unexpectedly, we're associating Win32 job objects to the child processes, requesting them to be killed when all the job handles are closed. Those handles are kept by the parent process, being closed when the process dies. Depends-On: #Icc2bbe9191e6db685c0fd294abc1d0eb24bc420c Closes-Bug: #1734334 Change-Id: I80fa111fabd46c7802b89fd6136edd8fa8652d47
This commit is contained in:
parent
5419ca908c
commit
3510f38604
@ -19,9 +19,12 @@
|
||||
|
||||
import logging as python_logging
|
||||
import os
|
||||
import re
|
||||
|
||||
import eventlet
|
||||
import eventlet.tpool
|
||||
|
||||
from cinder import exception
|
||||
from cinder import objects
|
||||
|
||||
if os.name == 'nt':
|
||||
@ -58,15 +61,110 @@ CONF = cfg.CONF
|
||||
host_opt = cfg.StrOpt('backend_host', help='Backend override of host value.')
|
||||
CONF.register_cli_opt(host_opt)
|
||||
|
||||
backend_name_opt = cfg.StrOpt(
|
||||
'backend_name',
|
||||
help='NOTE: For Windows internal use only. The name of the backend to be '
|
||||
'managed by this process. It must be one of the backends defined '
|
||||
'using the "enabled_backends" option. Note that normally, this '
|
||||
'should not be used directly. Cinder uses it internally in order to '
|
||||
'spawn subprocesses on Windows.')
|
||||
CONF.register_cli_opt(backend_name_opt)
|
||||
|
||||
|
||||
# TODO(geguileo): Once we complete the work on A-A update the option's help.
|
||||
cluster_opt = cfg.StrOpt('cluster',
|
||||
default=None,
|
||||
help='Name of this cluster. Used to group volume '
|
||||
help='Name of this cluster. Used to group volume '
|
||||
'hosts that share the same backend '
|
||||
'configurations to work in HA Active-Active '
|
||||
'mode. Active-Active is not yet supported.')
|
||||
CONF.register_opt(cluster_opt)
|
||||
|
||||
LOG = None
|
||||
|
||||
service_started = False
|
||||
|
||||
|
||||
def _launch_service(launcher, backend):
|
||||
CONF.register_opt(host_opt, group=backend)
|
||||
backend_host = getattr(CONF, backend).backend_host
|
||||
host = "%s@%s" % (backend_host or CONF.host, backend)
|
||||
# We also want to set cluster to None on empty strings, and we
|
||||
# ignore leading and trailing spaces.
|
||||
cluster = CONF.cluster and CONF.cluster.strip()
|
||||
cluster = (cluster or None) and '%s@%s' % (cluster, backend)
|
||||
try:
|
||||
server = service.Service.create(host=host,
|
||||
service_name=backend,
|
||||
binary=constants.VOLUME_BINARY,
|
||||
coordination=True,
|
||||
cluster=cluster)
|
||||
except Exception:
|
||||
LOG.exception('Volume service %s failed to start.', host)
|
||||
else:
|
||||
# Dispose of the whole DB connection pool here before
|
||||
# starting another process. Otherwise we run into cases where
|
||||
# child processes share DB connections which results in errors.
|
||||
session.dispose_engine()
|
||||
launcher.launch_service(server)
|
||||
_notify_service_started()
|
||||
|
||||
|
||||
def _ensure_service_started():
|
||||
if not service_started:
|
||||
LOG.error('No volume service(s) started successfully, terminating.')
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def _notify_service_started():
|
||||
global service_started
|
||||
service_started = True
|
||||
|
||||
|
||||
def _launch_services_win32():
|
||||
if CONF.backend_name and CONF.backend_name not in CONF.enabled_backends:
|
||||
msg = _('The explicitly passed backend name "%(backend_name)s" is not '
|
||||
'among the enabled backends: %(enabled_backends)s.')
|
||||
raise exception.InvalidInput(
|
||||
reason=msg % dict(backend_name=CONF.backend_name,
|
||||
enabled_backends=CONF.enabled_backends))
|
||||
|
||||
# We'll avoid spawning a subprocess if a single backend is requested.
|
||||
single_backend_name = (CONF.enabled_backends[0]
|
||||
if len(CONF.enabled_backends) == 1
|
||||
else CONF.backend_name)
|
||||
if single_backend_name:
|
||||
launcher = service.get_launcher()
|
||||
_launch_service(launcher, single_backend_name)
|
||||
elif CONF.enabled_backends:
|
||||
# We're using the 'backend_name' argument, requesting a certain backend
|
||||
# and constructing the service object within the child process.
|
||||
launcher = service.WindowsProcessLauncher()
|
||||
py_script_re = re.compile(r'.*\.py\w?$')
|
||||
for backend in filter(None, CONF.enabled_backends):
|
||||
cmd = sys.argv + ['--backend_name=%s' % backend]
|
||||
# Recent setuptools versions will trim '-script.py' and '.exe'
|
||||
# extensions from sys.argv[0].
|
||||
if py_script_re.match(sys.argv[0]):
|
||||
cmd = [sys.executable] + cmd
|
||||
launcher.add_process(cmd)
|
||||
_notify_service_started()
|
||||
|
||||
_ensure_service_started()
|
||||
|
||||
launcher.wait()
|
||||
|
||||
|
||||
def _launch_services_posix():
|
||||
launcher = service.get_launcher()
|
||||
|
||||
for backend in filter(None, CONF.enabled_backends):
|
||||
_launch_service(launcher, backend)
|
||||
|
||||
_ensure_service_started()
|
||||
|
||||
launcher.wait()
|
||||
|
||||
|
||||
def main():
|
||||
objects.register_all()
|
||||
@ -78,43 +176,20 @@ def main():
|
||||
priv_context.init(root_helper=shlex.split(utils.get_root_helper()))
|
||||
utils.monkey_patch()
|
||||
gmr.TextGuruMeditation.setup_autorun(version, conf=CONF)
|
||||
launcher = service.get_launcher()
|
||||
global LOG
|
||||
LOG = logging.getLogger(__name__)
|
||||
service_started = False
|
||||
|
||||
if CONF.enabled_backends:
|
||||
for backend in filter(None, CONF.enabled_backends):
|
||||
CONF.register_opt(host_opt, group=backend)
|
||||
backend_host = getattr(CONF, backend).backend_host
|
||||
host = "%s@%s" % (backend_host or CONF.host, backend)
|
||||
# We also want to set cluster to None on empty strings, and we
|
||||
# ignore leading and trailing spaces.
|
||||
cluster = CONF.cluster and CONF.cluster.strip()
|
||||
cluster = (cluster or None) and '%s@%s' % (cluster, backend)
|
||||
try:
|
||||
server = service.Service.create(host=host,
|
||||
service_name=backend,
|
||||
binary=constants.VOLUME_BINARY,
|
||||
coordination=True,
|
||||
cluster=cluster)
|
||||
except Exception:
|
||||
msg = _('Volume service %s failed to start.') % host
|
||||
LOG.exception(msg)
|
||||
else:
|
||||
# Dispose of the whole DB connection pool here before
|
||||
# starting another process. Otherwise we run into cases where
|
||||
# child processes share DB connections which results in errors.
|
||||
session.dispose_engine()
|
||||
launcher.launch_service(server)
|
||||
service_started = True
|
||||
else:
|
||||
if not CONF.enabled_backends:
|
||||
LOG.error('Configuration for cinder-volume does not specify '
|
||||
'"enabled_backends". Using DEFAULT section to configure '
|
||||
'drivers is not supported since Ocata.')
|
||||
|
||||
if not service_started:
|
||||
msg = _('No volume service(s) started successfully, terminating.')
|
||||
LOG.error(msg)
|
||||
sys.exit(1)
|
||||
|
||||
launcher.wait()
|
||||
if os.name == 'nt':
|
||||
# We cannot use oslo.service to spawn multiple services on Windows.
|
||||
# It relies on forking, which is not available on Windows.
|
||||
# Furthermore, service objects are unmarshallable objects that are
|
||||
# passed to subprocesses.
|
||||
_launch_services_win32()
|
||||
else:
|
||||
_launch_services_posix()
|
||||
|
@ -21,6 +21,9 @@
|
||||
import inspect
|
||||
import os
|
||||
import random
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
|
||||
from oslo_concurrency import processutils
|
||||
from oslo_config import cfg
|
||||
@ -48,6 +51,11 @@ from cinder import rpc
|
||||
from cinder import version
|
||||
from cinder.volume import utils as vol_utils
|
||||
|
||||
if os.name == 'nt':
|
||||
from os_win import utilsfactory as os_win_utilsfactory
|
||||
else:
|
||||
os_win_utilsfactory = None
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@ -649,3 +657,48 @@ def get_launcher():
|
||||
return Launcher()
|
||||
else:
|
||||
return process_launcher()
|
||||
|
||||
|
||||
class WindowsProcessLauncher(object):
|
||||
def __init__(self):
|
||||
self._processutils = os_win_utilsfactory.get_processutils()
|
||||
|
||||
self._workers = []
|
||||
self._worker_job_handles = []
|
||||
self._signal_handler = service.SignalHandler()
|
||||
self._add_signal_handlers()
|
||||
|
||||
def add_process(self, cmd):
|
||||
LOG.info("Starting subprocess: %s", cmd)
|
||||
|
||||
worker = subprocess.Popen(cmd)
|
||||
try:
|
||||
job_handle = self._processutils.kill_process_on_job_close(
|
||||
worker.pid)
|
||||
except Exception:
|
||||
LOG.exception("Could not associate child process "
|
||||
"with a job, killing it.")
|
||||
worker.kill()
|
||||
raise
|
||||
|
||||
self._worker_job_handles.append(job_handle)
|
||||
self._workers.append(worker)
|
||||
|
||||
def _add_signal_handlers(self):
|
||||
self._signal_handler.add_handler('SIGINT', self._terminate)
|
||||
self._signal_handler.add_handler('SIGTERM', self._terminate)
|
||||
|
||||
def _terminate(self, *args):
|
||||
# We've already assigned win32 job objects to child processes,
|
||||
# requesting them to stop once all the job handles are closed.
|
||||
# When this process dies, so will the child processes.
|
||||
LOG.info("Received request to terminate.")
|
||||
sys.exit(1)
|
||||
|
||||
def wait(self):
|
||||
pids = [worker.pid for worker in self._workers]
|
||||
if pids:
|
||||
self._processutils.wait_for_multiple_processes(pids,
|
||||
wait_all=True)
|
||||
# By sleeping here, we allow signal handlers to be executed.
|
||||
time.sleep(0)
|
||||
|
@ -140,12 +140,14 @@ class TestCinderSchedulerCmd(test.TestCase):
|
||||
service_wait.assert_called_once_with()
|
||||
|
||||
|
||||
class TestCinderVolumeCmd(test.TestCase):
|
||||
class TestCinderVolumeCmdPosix(test.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestCinderVolumeCmd, self).setUp()
|
||||
super(TestCinderVolumeCmdPosix, self).setUp()
|
||||
sys.argv = ['cinder-volume']
|
||||
|
||||
self.patch('os.name', 'posix')
|
||||
|
||||
@mock.patch('cinder.service.get_launcher')
|
||||
@mock.patch('cinder.service.Service.create')
|
||||
@mock.patch('cinder.utils.monkey_patch')
|
||||
@ -185,6 +187,133 @@ class TestCinderVolumeCmd(test.TestCase):
|
||||
launcher.wait.assert_called_once_with()
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
class TestCinderVolumeCmdWin32(test.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestCinderVolumeCmdWin32, self).setUp()
|
||||
sys.argv = ['cinder-volume']
|
||||
|
||||
self._mock_win32_proc_launcher = mock.Mock()
|
||||
|
||||
self.patch('os.name', 'nt')
|
||||
self.patch('cinder.service.WindowsProcessLauncher',
|
||||
lambda *args, **kwargs: self._mock_win32_proc_launcher)
|
||||
|
||||
@mock.patch('cinder.service.get_launcher')
|
||||
@mock.patch('cinder.service.Service.create')
|
||||
@mock.patch('cinder.utils.monkey_patch')
|
||||
@mock.patch('oslo_log.log.setup')
|
||||
def test_main(self, log_setup, monkey_patch, service_create,
|
||||
get_launcher):
|
||||
CONF.set_override('enabled_backends', None)
|
||||
self.assertRaises(SystemExit, cinder_volume.main)
|
||||
self.assertFalse(service_create.called)
|
||||
self.assertFalse(self._mock_win32_proc_launcher.called)
|
||||
|
||||
@mock.patch('cinder.service.get_launcher')
|
||||
@mock.patch('cinder.service.Service.create')
|
||||
@mock.patch('cinder.utils.monkey_patch')
|
||||
@mock.patch('oslo_log.log.setup')
|
||||
def test_main_invalid_backend(self, log_setup, monkey_patch,
|
||||
service_create, get_launcher):
|
||||
CONF.set_override('enabled_backends', 'backend1')
|
||||
CONF.set_override('backend_name', 'backend2')
|
||||
self.assertRaises(exception.InvalidInput, cinder_volume.main)
|
||||
self.assertFalse(service_create.called)
|
||||
self.assertFalse(self._mock_win32_proc_launcher.called)
|
||||
|
||||
@mock.patch('cinder.utils.monkey_patch')
|
||||
@mock.patch('oslo_log.log.setup')
|
||||
@ddt.data({},
|
||||
{'binary_path': 'cinder-volume-script.py',
|
||||
'exp_py_executable': True})
|
||||
@ddt.unpack
|
||||
def test_main_with_multiple_backends(self, log_setup, monkey_patch,
|
||||
binary_path='cinder-volume',
|
||||
exp_py_executable=False):
|
||||
# If multiple backends are used, we expect the Windows process
|
||||
# launcher to be used in order to create the child processes.
|
||||
backends = ['', 'backend1', 'backend2', '']
|
||||
CONF.set_override('enabled_backends', backends)
|
||||
CONF.set_override('host', 'host')
|
||||
launcher = self._mock_win32_proc_launcher
|
||||
|
||||
# Depending on the setuptools version, '-script.py' and '.exe'
|
||||
# binary path extensions may be trimmed. We need to take this
|
||||
# into consideration when building the command that will be
|
||||
# used to spawn child subprocesses.
|
||||
sys.argv = [binary_path]
|
||||
|
||||
cinder_volume.main()
|
||||
|
||||
self.assertEqual('cinder', CONF.project)
|
||||
self.assertEqual(CONF.version, version.version_string())
|
||||
log_setup.assert_called_once_with(CONF, "cinder")
|
||||
monkey_patch.assert_called_once_with()
|
||||
|
||||
exp_cmd_prefix = [sys.executable] if exp_py_executable else []
|
||||
exp_cmds = [
|
||||
exp_cmd_prefix + sys.argv + ['--backend_name=%s' % backend_name]
|
||||
for backend_name in ['backend1', 'backend2']]
|
||||
launcher.add_process.assert_has_calls(
|
||||
[mock.call(exp_cmd) for exp_cmd in exp_cmds])
|
||||
launcher.wait.assert_called_once_with()
|
||||
|
||||
@mock.patch('cinder.service.get_launcher')
|
||||
@mock.patch('cinder.service.Service.create')
|
||||
@mock.patch('cinder.utils.monkey_patch')
|
||||
@mock.patch('oslo_log.log.setup')
|
||||
def test_main_with_multiple_backends_child(
|
||||
self, log_setup, monkey_patch, service_create, get_launcher):
|
||||
# We're testing the code expected to be run within child processes.
|
||||
backends = ['', 'backend1', 'backend2', '']
|
||||
CONF.set_override('enabled_backends', backends)
|
||||
CONF.set_override('host', 'host')
|
||||
launcher = get_launcher.return_value
|
||||
|
||||
sys.argv += ['--backend_name', 'backend2']
|
||||
|
||||
cinder_volume.main()
|
||||
|
||||
self.assertEqual('cinder', CONF.project)
|
||||
self.assertEqual(CONF.version, version.version_string())
|
||||
log_setup.assert_called_once_with(CONF, "cinder")
|
||||
monkey_patch.assert_called_once_with()
|
||||
|
||||
service_create.assert_called_once_with(
|
||||
binary=constants.VOLUME_BINARY, host='host@backend2',
|
||||
service_name='backend2', coordination=True,
|
||||
cluster=None)
|
||||
launcher.launch_service.assert_called_once_with(
|
||||
service_create.return_value)
|
||||
|
||||
@mock.patch('cinder.service.get_launcher')
|
||||
@mock.patch('cinder.service.Service.create')
|
||||
@mock.patch('cinder.utils.monkey_patch')
|
||||
@mock.patch('oslo_log.log.setup')
|
||||
def test_main_with_single_backend(
|
||||
self, log_setup, monkey_patch, service_create, get_launcher):
|
||||
# We're expecting the service to be run within the same process.
|
||||
CONF.set_override('enabled_backends', ['backend2'])
|
||||
CONF.set_override('host', 'host')
|
||||
launcher = get_launcher.return_value
|
||||
|
||||
cinder_volume.main()
|
||||
|
||||
self.assertEqual('cinder', CONF.project)
|
||||
self.assertEqual(CONF.version, version.version_string())
|
||||
log_setup.assert_called_once_with(CONF, "cinder")
|
||||
monkey_patch.assert_called_once_with()
|
||||
|
||||
service_create.assert_called_once_with(
|
||||
binary=constants.VOLUME_BINARY, host='host@backend2',
|
||||
service_name='backend2', coordination=True,
|
||||
cluster=None)
|
||||
launcher.launch_service.assert_called_once_with(
|
||||
service_create.return_value)
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
class TestCinderManageCmd(test.TestCase):
|
||||
|
||||
|
@ -588,3 +588,61 @@ class OSCompatibilityTestCase(test.TestCase):
|
||||
|
||||
def test_process_launcher_on_linux(self):
|
||||
self._test_service_launcher('posix')
|
||||
|
||||
|
||||
class WindowsProcessLauncherTestCase(test.TestCase):
|
||||
@mock.patch.object(service, 'os_win_utilsfactory', create=True)
|
||||
@mock.patch('oslo_service.service.SignalHandler')
|
||||
def setUp(self, mock_signal_handler_cls, mock_utilsfactory):
|
||||
super(WindowsProcessLauncherTestCase, self).setUp()
|
||||
|
||||
self._signal_handler = mock_signal_handler_cls.return_value
|
||||
self._processutils = mock_utilsfactory.get_processutils.return_value
|
||||
|
||||
self._launcher = service.WindowsProcessLauncher()
|
||||
|
||||
def test_setup_signal_handlers(self):
|
||||
exp_signal_map = {'SIGINT': self._launcher._terminate,
|
||||
'SIGTERM': self._launcher._terminate}
|
||||
self._signal_handler.add_handler.assert_has_calls(
|
||||
[mock.call(signal, handler)
|
||||
for signal, handler in exp_signal_map.items()],
|
||||
any_order=True)
|
||||
|
||||
@mock.patch('sys.exit')
|
||||
def test_terminate_handler(self, mock_exit):
|
||||
self._launcher._terminate(mock.sentinel.signum, mock.sentinel.frame)
|
||||
mock_exit.assert_called_once_with(1)
|
||||
|
||||
@mock.patch('subprocess.Popen')
|
||||
def test_launch(self, mock_popen):
|
||||
mock_workers = [mock.Mock(), mock.Mock(), mock.Mock()]
|
||||
|
||||
mock_popen.side_effect = mock_workers
|
||||
self._processutils.kill_process_on_job_close.side_effect = [
|
||||
exception.CinderException, None, None]
|
||||
|
||||
# We expect the first process to be cleaned up after failing
|
||||
# to setup a job object.
|
||||
self.assertRaises(exception.CinderException,
|
||||
self._launcher.add_process,
|
||||
mock.sentinel.cmd1)
|
||||
mock_workers[0].kill.assert_called_once_with()
|
||||
|
||||
self._launcher.add_process(mock.sentinel.cmd2)
|
||||
self._launcher.add_process(mock.sentinel.cmd3)
|
||||
|
||||
mock_popen.assert_has_calls(
|
||||
[mock.call(cmd)
|
||||
for cmd in [mock.sentinel.cmd1,
|
||||
mock.sentinel.cmd2,
|
||||
mock.sentinel.cmd3]])
|
||||
self._processutils.kill_process_on_job_close.assert_has_calls(
|
||||
[mock.call(worker.pid) for worker in mock_workers[1:]])
|
||||
|
||||
self._launcher.wait()
|
||||
|
||||
wait_processes = self._processutils.wait_for_multiple_processes
|
||||
wait_processes.assert_called_once_with(
|
||||
[worker.pid for worker in mock_workers[1:]],
|
||||
wait_all=True)
|
||||
|
@ -0,0 +1,5 @@
|
||||
---
|
||||
fixes:
|
||||
- |
|
||||
Multiple backends may now be enabled within the same Cinder Volume service
|
||||
on Windows by using the ``enabled_backends`` config option.
|
Loading…
Reference in New Issue
Block a user