Browse Source

Fix neutron-openvswitch-agent Windows support

Currently, the neutron-openvswitch-agent does not start on Windows
due to Linux specific imports. This patch addresses this issue.

Also, we're wrapping the object returned by subprocess.Popen using
tpool.Proxy in order to prevent IO operations on the stream
handles from blocking other threads. Currently, the ovs db monitor
blocks the whole process.

Closes-Bug: #1775382

Co-Authored-By: Lucian Petrut <lpetrut@cloudbasesolutions.com>
Change-Id: I8bbc9d1f8332e5644a6071f599a7c6a66bef7928
(cherry picked from commit fee630efaa)
changes/99/596999/1
Claudiu Belu 3 years ago
committed by Lucian Petrut
parent
commit
3cc89a9c44
  1. 2
      neutron/agent/common/ip_lib.py
  2. 4
      neutron/agent/common/utils.py
  3. 9
      neutron/agent/linux/async_process.py
  4. 2
      neutron/agent/linux/iptables_manager.py
  5. 16
      neutron/agent/linux/utils.py
  6. 6
      neutron/agent/windows/ip_lib.py
  7. 61
      neutron/agent/windows/utils.py
  8. 6
      neutron/common/exceptions.py
  9. 2
      neutron/tests/unit/agent/linux/test_async_process.py
  10. 4
      neutron/tests/unit/agent/linux/test_iptables_manager.py
  11. 15
      neutron/tests/unit/agent/linux/test_utils.py
  12. 215
      neutron/tests/unit/agent/windows/test_utils.py

2
neutron/agent/common/ip_lib.py

@ -27,3 +27,5 @@ else:
IPWrapper = ip_lib.IPWrapper
IPDevice = ip_lib.IPDevice
add_namespace_to_cmd = ip_lib.add_namespace_to_cmd

4
neutron/agent/common/utils.py

@ -37,7 +37,11 @@ agents_db.register_db_agents_opts()
INTERFACE_NAMESPACE = 'neutron.interface_drivers'
create_process = utils.create_process
kill_process = utils.kill_process
execute = utils.execute
get_root_helper_child_pid = utils.get_root_helper_child_pid
pid_invoked_with_cmdline = utils.pid_invoked_with_cmdline
def load_interface_driver(conf):

9
neutron/agent/linux/async_process.py

@ -21,8 +21,8 @@ from neutron_lib.utils import helpers
from oslo_log import log as logging
from neutron._i18n import _
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils
from neutron.agent.common import ip_lib
from neutron.agent.common import utils
from neutron.common import utils as common_utils
@ -117,7 +117,7 @@ class AsyncProcess(object):
if block:
common_utils.wait_until_true(self.is_active)
def stop(self, block=False, kill_signal=signal.SIGKILL):
def stop(self, block=False, kill_signal=None):
"""Halt the process and watcher threads.
:param block: Block until the process has stopped.
@ -126,6 +126,7 @@ class AsyncProcess(object):
:raises utils.WaitTimeout if blocking is True and the process
did not stop in time.
"""
kill_signal = kill_signal or getattr(signal, 'SIGKILL', signal.SIGTERM)
if self._is_running:
LOG.debug('Halting async process [%s].', self.cmd)
self._kill(kill_signal)
@ -196,7 +197,7 @@ class AsyncProcess(object):
stderr = list(self.iter_stderr())
LOG.debug('Halting async process [%s] in response to an error. stdout:'
' [%s] - stderr: [%s]', self.cmd, stdout, stderr)
self._kill(signal.SIGKILL)
self._kill(getattr(signal, 'SIGKILL', signal.SIGTERM))
if self.respawn_interval is not None and self.respawn_interval >= 0:
eventlet.sleep(self.respawn_interval)
LOG.debug('Respawning async process [%s].', self.cmd)

2
neutron/agent/linux/iptables_manager.py

@ -495,7 +495,7 @@ class IptablesManager(object):
return self._do_run_restore(args, commands, lock=True)
err = self._do_run_restore(args, commands)
if (isinstance(err, linux_utils.ProcessExecutionError) and
if (isinstance(err, n_exc.ProcessExecutionError) and
err.returncode == XTABLES_RESOURCE_PROBLEM_CODE):
# maybe we run on a platform that includes iptables commit
# 999eaa241212d3952ddff39a99d0d55a74e3639e (for example, latest

16
neutron/agent/linux/utils.py

@ -35,6 +35,7 @@ from six.moves import http_client as httplib
from neutron._i18n import _
from neutron.agent.linux import xenapi_root_helper
from neutron.common import exceptions
from neutron.common import utils
from neutron.conf.agent import common as config
from neutron import wsgi
@ -43,12 +44,6 @@ from neutron import wsgi
LOG = logging.getLogger(__name__)
class ProcessExecutionError(RuntimeError):
def __init__(self, message, returncode):
super(ProcessExecutionError, self).__init__(message)
self.returncode = returncode
class RootwrapDaemonHelper(object):
__client = None
__lock = threading.Lock()
@ -148,7 +143,8 @@ def execute(cmd, process_input=None, addl_env=None,
if log_fail_as_error:
LOG.error(msg)
if check_exit_code:
raise ProcessExecutionError(msg, returncode=returncode)
raise exceptions.ProcessExecutionError(msg,
returncode=returncode)
finally:
# NOTE(termie): this appears to be necessary to let the subprocess
@ -167,7 +163,7 @@ def find_child_pids(pid, recursive=False):
try:
raw_pids = execute(['ps', '--ppid', pid, '-o', 'pid='],
log_fail_as_error=False)
except ProcessExecutionError as e:
except exceptions.ProcessExecutionError as e:
# Unexpected errors are the responsibility of the caller
with excutils.save_and_reraise_exception() as ctxt:
# Exception has already been logged by execute
@ -191,7 +187,7 @@ def find_parent_pid(pid):
try:
ppid = execute(['ps', '-o', 'ppid=', pid],
log_fail_as_error=False)
except ProcessExecutionError as e:
except exceptions.ProcessExecutionError as e:
# Unexpected errors are the responsibility of the caller
with excutils.save_and_reraise_exception() as ctxt:
# Exception has already been logged by execute
@ -221,7 +217,7 @@ def kill_process(pid, signal, run_as_root=False):
"""Kill the process with the given pid using the given signal."""
try:
execute(['kill', '-%d' % signal, pid], run_as_root=run_as_root)
except ProcessExecutionError:
except exceptions.ProcessExecutionError:
if process_is_running(pid):
raise

6
neutron/agent/windows/ip_lib.py

@ -79,3 +79,9 @@ class IPLink(object):
return False
return [eth_addr['addr'] for eth_addr in
device_addresses.get(netifaces.AF_LINK, [])]
def add_namespace_to_cmd(cmd, namespace=None):
"""Add an optional namespace to the command."""
return cmd

61
neutron/agent/windows/utils.py

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import io
import os
import eventlet
@ -20,8 +21,13 @@ from eventlet import tpool
from neutron_lib.utils import helpers
from oslo_log import log as logging
from oslo_utils import encodeutils
import six
from neutron._i18n import _
from neutron.common import exceptions
if os.name == 'nt':
import wmi
LOG = logging.getLogger(__name__)
@ -32,7 +38,8 @@ subprocess = eventlet.patcher.original('subprocess')
subprocess.threading = eventlet.patcher.original('threading')
def create_process(cmd, addl_env=None):
def create_process(cmd, run_as_root=False, addl_env=None,
tpool_proxy=True):
cmd = list(map(str, cmd))
LOG.debug("Running command: %s", cmd)
@ -48,10 +55,39 @@ def create_process(cmd, addl_env=None):
env=env,
preexec_fn=None,
close_fds=False)
if tpool_proxy and eventlet.getcurrent().parent:
# If we intend to access the process streams, we need to wrap this
# in a tpool proxy object, avoding blocking other greenthreads.
#
# The 'file' type is not available on Python 3.x.
file_type = getattr(six.moves.builtins, 'file', io.IOBase)
obj = tpool.Proxy(obj, autowrap=(file_type, ))
return obj, cmd
def _get_wmi_process(pid):
if not pid:
return None
conn = wmi.WMI()
processes = conn.Win32_Process(ProcessId=pid)
if processes:
return processes[0]
return None
def kill_process(pid, signal, run_as_root=False):
"""Kill the process with the given pid using the given signal."""
process = _get_wmi_process(pid)
try:
if process:
process.Terminate()
except Exception:
if _get_wmi_process(pid):
raise
def execute(cmd, process_input=None, addl_env=None,
check_exit_code=True, return_stderr=False, log_fail_as_error=True,
extra_ok_codes=None, run_as_root=False, do_decode=True):
@ -60,7 +96,7 @@ def execute(cmd, process_input=None, addl_env=None,
_process_input = encodeutils.to_utf8(process_input)
else:
_process_input = None
obj, cmd = create_process(cmd, addl_env=addl_env)
obj, cmd = create_process(cmd, addl_env=addl_env, tpool_proxy=False)
_stdout, _stderr = avoid_blocking_call(obj.communicate, _process_input)
obj.stdin.close()
_stdout = helpers.safe_decode_utf8(_stdout)
@ -85,7 +121,7 @@ def execute(cmd, process_input=None, addl_env=None,
LOG.debug(log_msg)
if obj.returncode and check_exit_code:
raise RuntimeError(m)
raise exceptions.ProcessExecutionError(m, returncode=obj.returncode)
return (_stdout, _stderr) if return_stderr else _stdout
@ -107,3 +143,22 @@ def avoid_blocking_call(f, *args, **kwargs):
return tpool.execute(f, *args, **kwargs)
else:
return f(*args, **kwargs)
def get_root_helper_child_pid(pid, expected_cmd, run_as_root=False):
# We don't use a root helper on Windows.
return str(pid)
def process_is_running(pid):
"""Find if the given PID is running in the system."""
return _get_wmi_process(pid) is not None
def pid_invoked_with_cmdline(pid, expected_cmd):
process = _get_wmi_process(pid)
if not process:
return False
command = process.CommandLine
return command == " ".join(expected_cmd)

6
neutron/common/exceptions.py

@ -353,3 +353,9 @@ class PortBindingAlreadyExists(e.Conflict):
class PortBindingError(e.NeutronException):
message = _("Binding for port %(port_id)s on host %(host)s could not be "
"created or updated.")
class ProcessExecutionError(RuntimeError):
def __init__(self, message, returncode):
super(ProcessExecutionError, self).__init__(message)
self.returncode = returncode

2
neutron/tests/unit/agent/linux/test_async_process.py

@ -19,8 +19,8 @@ import eventlet.queue
import mock
import testtools
from neutron.agent.common import utils
from neutron.agent.linux import async_process
from neutron.agent.linux import utils
from neutron.tests import base
from neutron.tests.unit.agent.linux import failing_process

4
neutron/tests/unit/agent/linux/test_iptables_manager.py

@ -887,7 +887,7 @@ class IptablesManagerStateFulTestCase(IptablesManagerBaseTestCase):
# pretend line 11 failed
msg = ("Exit code: 1\nStdout: ''\n"
"Stderr: 'iptables-restore: line 11 failed\n'")
raise linux_utils.ProcessExecutionError(
raise n_exc.ProcessExecutionError(
msg, iptables_manager.XTABLES_RESOURCE_PROBLEM_CODE)
return FILTER_DUMP
self.execute.side_effect = iptables_restore_failer
@ -926,7 +926,7 @@ class IptablesManagerStateFulTestCase(IptablesManagerBaseTestCase):
def test_iptables_use_table_lock(self):
# Under normal operation, if we do call iptables-restore with a -w
# and it succeeds, the next call will only use -w.
PE_error = linux_utils.ProcessExecutionError(
PE_error = n_exc.ProcessExecutionError(
"", iptables_manager.XTABLES_RESOURCE_PROBLEM_CODE)
num_calls = 3

15
neutron/tests/unit/agent/linux/test_utils.py

@ -24,6 +24,7 @@ from oslo_config import cfg
import oslo_i18n
from neutron.agent.linux import utils
from neutron.common import exceptions as n_exc
from neutron.tests import base
from neutron.tests.common import helpers
@ -146,7 +147,7 @@ class AgentUtilsExecuteTest(base.BaseTestCase):
self.mock_popen.return_value = ('', '')
self.process.return_value.returncode = 1
with mock.patch.object(utils, 'LOG') as log:
self.assertRaises(utils.ProcessExecutionError, utils.execute,
self.assertRaises(n_exc.ProcessExecutionError, utils.execute,
['ls'], log_fail_as_error=False)
self.assertFalse(log.error.called)
@ -200,7 +201,7 @@ class TestFindParentPid(base.BaseTestCase):
self.m_execute = mock.patch.object(utils, 'execute').start()
def test_returns_none_for_no_valid_pid(self):
self.m_execute.side_effect = utils.ProcessExecutionError('',
self.m_execute.side_effect = n_exc.ProcessExecutionError('',
returncode=1)
self.assertIsNone(utils.find_parent_pid(-1))
@ -209,9 +210,9 @@ class TestFindParentPid(base.BaseTestCase):
self.assertEqual(utils.find_parent_pid(-1), '123')
def test_raises_exception_returncode_0(self):
with testtools.ExpectedException(utils.ProcessExecutionError):
with testtools.ExpectedException(n_exc.ProcessExecutionError):
self.m_execute.side_effect = \
utils.ProcessExecutionError('', returncode=0)
n_exc.ProcessExecutionError('', returncode=0)
utils.find_parent_pid(-1)
def test_raises_unknown_exception(self):
@ -264,7 +265,7 @@ class TestKillProcess(base.BaseTestCase):
def _test_kill_process(self, pid, raise_exception=False,
kill_signal=signal.SIGKILL, pid_killed=True):
if raise_exception:
exc = utils.ProcessExecutionError('', returncode=0)
exc = n_exc.ProcessExecutionError('', returncode=0)
else:
exc = None
with mock.patch.object(utils, 'execute',
@ -283,7 +284,7 @@ class TestKillProcess(base.BaseTestCase):
self._test_kill_process('1', raise_exception=True)
def test_kill_process_raises_exception_for_execute_exception(self):
with testtools.ExpectedException(utils.ProcessExecutionError):
with testtools.ExpectedException(n_exc.ProcessExecutionError):
# Simulate that the process is running after trying to kill due to
# any reason such as, for example, Permission denied
self._test_kill_process('1', raise_exception=True,
@ -297,7 +298,7 @@ class TestFindChildPids(base.BaseTestCase):
def test_returns_empty_list_for_exit_code_1(self):
with mock.patch.object(utils, 'execute',
side_effect=utils.ProcessExecutionError(
side_effect=n_exc.ProcessExecutionError(
'', returncode=1)):
self.assertEqual([], utils.find_child_pids(-1))

215
neutron/tests/unit/agent/windows/test_utils.py

@ -0,0 +1,215 @@
# Copyright 2018 Cloudbase Solutions.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import io
import ddt
import eventlet
from eventlet import tpool
import mock
import six
from neutron.agent.windows import utils
from neutron.common import exceptions
from neutron.tests import base
@ddt.ddt
class WindowsUtilsTestCase(base.BaseTestCase):
@mock.patch('os.environ', {mock.sentinel.key0: mock.sentinel.val0})
@mock.patch.object(utils.subprocess, 'Popen')
@mock.patch.object(tpool, 'Proxy')
@mock.patch.object(eventlet, 'getcurrent')
def test_create_process(self, mock_get_current_gt,
mock_tpool_proxy, mock_popen):
cmd = ['fake_cmd']
popen_obj, ret_cmd = utils.create_process(
cmd,
run_as_root=mock.sentinel.run_as_root,
addl_env={mock.sentinel.key1: mock.sentinel.val1},
tpool_proxy=True)
exp_env = {mock.sentinel.key0: mock.sentinel.val0,
mock.sentinel.key1: mock.sentinel.val1}
mock_popen.assert_called_once_with(
cmd,
shell=False,
stdin=utils.subprocess.PIPE,
stdout=utils.subprocess.PIPE,
stderr=utils.subprocess.PIPE,
env=exp_env,
preexec_fn=None,
close_fds=False)
file_type = getattr(six.moves.builtins, 'file', io.IOBase)
mock_tpool_proxy.assert_called_once_with(
mock_popen.return_value, autowrap=(file_type, ))
self.assertEqual(mock_tpool_proxy.return_value, popen_obj)
self.assertEqual(ret_cmd, cmd)
@ddt.data({},
{'pid': None},
{'process_exists': True})
@ddt.unpack
@mock.patch.object(utils, 'wmi', create=True)
def test_get_wmi_process(self, mock_wmi,
pid=mock.sentinel.pid,
process_exists=False):
mock_conn = mock_wmi.WMI.return_value
if not pid:
exp_process = None
elif process_exists:
exp_process = mock.sentinel.wmi_obj
mock_conn.Win32_Process.return_value = [exp_process]
else:
exp_process = None
mock_conn.Win32_Process.return_value = []
wmi_obj = utils._get_wmi_process(pid)
self.assertEqual(exp_process, wmi_obj)
if pid:
mock_conn.Win32_Process.assert_called_once_with(ProcessId=pid)
@ddt.data(True, False)
@mock.patch.object(utils, '_get_wmi_process')
def test_kill_process(self, process_exists, mock_get_process):
if not process_exists:
mock_get_process.return_value = None
utils.kill_process(mock.sentinel.pid, mock.sentinel.signal,
run_as_root=False)
mock_get_process.assert_called_once_with(mock.sentinel.pid)
if process_exists:
mock_get_process.return_value.Terminate.assert_called_once_with()
@ddt.data(True, False)
@mock.patch.object(utils, '_get_wmi_process')
def test_kill_process_exception(self, process_still_running,
mock_get_process):
mock_process = mock.Mock()
mock_process.Terminate.side_effect = OSError
mock_get_process.side_effect = [
mock_process,
mock_process if process_still_running else None]
if process_still_running:
self.assertRaises(OSError,
utils.kill_process,
mock.sentinel.pid,
mock.sentinel.signal)
else:
utils.kill_process(mock.sentinel.pid,
mock.sentinel.signal)
@ddt.data({'return_stder': True},
{'returncode': 1,
'check_exit_code': False,
'log_fail_as_error': True},
{'returncode': 1,
'log_fail_as_error': True,
'extra_ok_codes': [1]},
{'returncode': 1,
'log_fail_as_error': True,
'exp_fail': True})
@ddt.unpack
@mock.patch.object(utils, 'create_process')
@mock.patch.object(utils, 'avoid_blocking_call')
def test_execute(self, mock_avoid_blocking_call, mock_create_process,
returncode=0, check_exit_code=True, return_stder=True,
log_fail_as_error=True, extra_ok_codes=None,
exp_fail=False):
fake_stdin = 'fake_stdin'
fake_stdout = 'fake_stdout'
fake_stderr = 'fake_stderr'
mock_popen = mock.Mock()
mock_popen.communicate.return_value = fake_stdout, fake_stderr
mock_popen.returncode = returncode
mock_create_process.return_value = mock_popen, mock.sentinel.cmd
mock_avoid_blocking_call.side_effect = (
lambda func, *args, **kwargs: func(*args, **kwargs))
args = (mock.sentinel.cmd, fake_stdin, mock.sentinel.env,
check_exit_code, return_stder, log_fail_as_error,
extra_ok_codes)
if exp_fail:
self.assertRaises(exceptions.ProcessExecutionError,
utils.execute,
*args)
else:
ret_val = utils.execute(*args)
if return_stder:
exp_ret_val = (fake_stdout, fake_stderr)
else:
exp_ret_val = fake_stdout
self.assertEqual(exp_ret_val, ret_val)
mock_create_process.assert_called_once_with(
mock.sentinel.cmd, addl_env=mock.sentinel.env,
tpool_proxy=False)
mock_avoid_blocking_call.assert_called_once_with(
mock_popen.communicate, six.b(fake_stdin))
mock_popen.communicate.assert_called_once_with(six.b(fake_stdin))
mock_popen.stdin.close.assert_called_once_with()
def test_get_root_helper_child_pid(self):
pid = utils.get_root_helper_child_pid(
mock.sentinel.pid,
mock.sentinel.exp_cmd,
run_as_root=False)
self.assertEqual(str(mock.sentinel.pid), pid)
@ddt.data(True, False)
@mock.patch.object(utils, '_get_wmi_process')
def test_process_is_running(self, process_running, mock_get_process):
mock_get_process.return_value = (
mock.sentinel.wmi_obj if process_running else None)
self.assertEqual(process_running,
utils.process_is_running(mock.sentinel.pid))
mock_get_process.assert_called_once_with(mock.sentinel.pid)
@ddt.data({},
{'process_running': False},
{'command_matches': False})
@ddt.unpack
@mock.patch.object(utils, '_get_wmi_process')
def test_pid_invoked_with_cmdline(self, mock_get_process,
process_running=True,
command_matches=False):
exp_cmd = 'exp_cmd'
mock_process = mock.Mock()
mock_get_process.return_value = (
mock_process if process_running else None)
mock_process.CommandLine = (
exp_cmd if command_matches else 'unexpected_cmd')
exp_result = process_running and command_matches
result = utils.pid_invoked_with_cmdline(mock.sentinel.pid,
[exp_cmd])
self.assertEqual(exp_result, result)
mock_get_process.assert_called_once_with(mock.sentinel.pid)
Loading…
Cancel
Save