Trivial: Move platform independent modules to common dir

async_process.py and ovsdb_monitor.py are now platform
independent, for which reason we can move them to
neutron/agent/common.

Note that a few subprojects are using async_process. We'll use
debtcollector so that we don't break those projects, while logging
a deprecation warning.

Change-Id: I6a7418cb8680cd71fe16c5d98b9b09ef2d260d37
This commit is contained in:
Lucian Petrut 2018-07-17 14:10:00 +03:00
parent fee630efaa
commit 89915a752e
17 changed files with 322 additions and 296 deletions

View File

@ -0,0 +1,272 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import signal
import eventlet
import eventlet.event
import eventlet.queue
from neutron_lib.utils import helpers
from oslo_log import log as logging
from neutron._i18n import _
from neutron.agent.common import ip_lib
from neutron.agent.common import utils
from neutron.common import utils as common_utils
LOG = logging.getLogger(__name__)
class AsyncProcessException(Exception):
pass
class AsyncProcess(object):
"""Manages an asynchronous process.
This class spawns a new process via subprocess and uses
greenthreads to read stderr and stdout asynchronously into queues
that can be read via repeatedly calling iter_stdout() and
iter_stderr().
If respawn_interval is non-zero, any error in communicating with
the managed process will result in the process and greenthreads
being cleaned up and the process restarted after the specified
interval.
Example usage:
>>> import time
>>> proc = AsyncProcess(['ping'])
>>> proc.start()
>>> time.sleep(5)
>>> proc.stop()
>>> for line in proc.iter_stdout():
... print(line)
"""
def __init__(self, cmd, run_as_root=False, respawn_interval=None,
namespace=None, log_output=False, die_on_error=False):
"""Constructor.
:param cmd: The list of command arguments to invoke.
:param run_as_root: The process should run with elevated privileges.
:param respawn_interval: Optional, the interval in seconds to wait
to respawn after unexpected process death. Respawn will
only be attempted if a value of 0 or greater is provided.
:param namespace: Optional, start the command in the specified
namespace.
:param log_output: Optional, also log received output.
:param die_on_error: Optional, kills the process on stderr output.
"""
self.cmd_without_namespace = cmd
self._cmd = ip_lib.add_namespace_to_cmd(cmd, namespace)
self.run_as_root = run_as_root
if respawn_interval is not None and respawn_interval < 0:
raise ValueError(_('respawn_interval must be >= 0 if provided.'))
self.respawn_interval = respawn_interval
self._process = None
self._pid = None
self._is_running = False
self._kill_event = None
self._reset_queues()
self._watchers = []
self.log_output = log_output
self.die_on_error = die_on_error
@property
def cmd(self):
return ' '.join(self._cmd)
def _reset_queues(self):
self._stdout_lines = eventlet.queue.LightQueue()
self._stderr_lines = eventlet.queue.LightQueue()
def is_active(self):
# If using sudo rootwrap as a root_helper, we have to wait until sudo
# spawns rootwrap and rootwrap spawns the process. self.pid will make
# sure to get the correct pid.
return utils.pid_invoked_with_cmdline(
self.pid, self.cmd_without_namespace)
def start(self, block=False):
"""Launch a process and monitor it asynchronously.
:param block: Block until the process has started.
:raises utils.WaitTimeout if blocking is True and the process
did not start in time.
"""
LOG.debug('Launching async process [%s].', self.cmd)
if self._is_running:
raise AsyncProcessException(_('Process is already started'))
else:
self._spawn()
if block:
common_utils.wait_until_true(self.is_active)
def stop(self, block=False, kill_signal=None):
"""Halt the process and watcher threads.
:param block: Block until the process has stopped.
:param kill_signal: Number of signal that will be sent to the process
when terminating the process
:raises utils.WaitTimeout if blocking is True and the process
did not stop in time.
"""
kill_signal = kill_signal or getattr(signal, 'SIGKILL', signal.SIGTERM)
if self._is_running:
LOG.debug('Halting async process [%s].', self.cmd)
self._kill(kill_signal)
else:
raise AsyncProcessException(_('Process is not running.'))
if block:
common_utils.wait_until_true(lambda: not self.is_active())
def _spawn(self):
"""Spawn a process and its watchers."""
self._is_running = True
self._pid = None
self._kill_event = eventlet.event.Event()
self._process, cmd = utils.create_process(self._cmd,
run_as_root=self.run_as_root)
self._watchers = []
for reader in (self._read_stdout, self._read_stderr):
# Pass the stop event directly to the greenthread to
# ensure that assignment of a new event to the instance
# attribute does not prevent the greenthread from using
# the original event.
watcher = eventlet.spawn(self._watch_process,
reader,
self._kill_event)
self._watchers.append(watcher)
@property
def pid(self):
if self._process:
if not self._pid:
self._pid = utils.get_root_helper_child_pid(
self._process.pid,
self.cmd_without_namespace,
run_as_root=self.run_as_root)
return self._pid
def _kill(self, kill_signal):
"""Kill the process and the associated watcher greenthreads."""
pid = self.pid
if pid:
self._is_running = False
self._pid = None
self._kill_process(pid, kill_signal)
# Halt the greenthreads if they weren't already.
if self._kill_event:
self._kill_event.send()
self._kill_event = None
def _kill_process(self, pid, kill_signal):
try:
# A process started by a root helper will be running as
# root and need to be killed via the same helper.
utils.kill_process(pid, kill_signal, self.run_as_root)
except Exception:
LOG.exception('An error occurred while killing [%s].',
self.cmd)
return False
if self._process:
self._process.wait()
return True
def _handle_process_error(self):
"""Kill the async process and respawn if necessary."""
stdout = list(self.iter_stdout())
stderr = list(self.iter_stderr())
LOG.debug('Halting async process [%s] in response to an error. stdout:'
' [%s] - stderr: [%s]', self.cmd, stdout, stderr)
self._kill(getattr(signal, 'SIGKILL', signal.SIGTERM))
if self.respawn_interval is not None and self.respawn_interval >= 0:
eventlet.sleep(self.respawn_interval)
LOG.debug('Respawning async process [%s].', self.cmd)
try:
self.start()
except AsyncProcessException:
# Process was already respawned by someone else...
pass
def _watch_process(self, callback, kill_event):
while not kill_event.ready():
try:
output = callback()
if not output and output != "":
break
except Exception:
LOG.exception('An error occurred while communicating '
'with async process [%s].', self.cmd)
break
# Ensure that watching a process with lots of output does
# not block execution of other greenthreads.
eventlet.sleep()
# self._is_running being True indicates that the loop was
# broken out of due to an error in the watched process rather
# than the loop condition being satisfied.
if self._is_running:
self._is_running = False
self._handle_process_error()
def _read(self, stream, queue):
data = stream.readline()
if data:
data = helpers.safe_decode_utf8(data.strip())
queue.put(data)
return data
def _read_stdout(self):
data = self._read(self._process.stdout, self._stdout_lines)
if self.log_output:
LOG.debug('Output received from [%(cmd)s]: %(data)s',
{'cmd': self.cmd,
'data': data})
return data
def _read_stderr(self):
data = self._read(self._process.stderr, self._stderr_lines)
if self.log_output:
LOG.error('Error received from [%(cmd)s]: %(err)s',
{'cmd': self.cmd,
'err': data})
if self.die_on_error:
LOG.error("Process [%(cmd)s] dies due to the error: %(err)s",
{'cmd': self.cmd,
'err': data})
# the callback caller will use None to indicate the need to bail
# out of the thread
return None
return data
def _iter_queue(self, queue, block):
while True:
try:
yield queue.get(block=block)
except eventlet.queue.Empty:
break
def iter_stdout(self, block=False):
return self._iter_queue(self._stdout_lines, block)
def iter_stderr(self, block=False):
return self._iter_queue(self._stderr_lines, block)

View File

@ -19,7 +19,7 @@ from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
from neutron.agent.linux import async_process
from neutron.agent.common import async_process
from neutron.agent.ovsdb import api as ovsdb
from neutron.agent.ovsdb.native import helpers
from neutron.common import utils

View File

@ -12,261 +12,17 @@
# License for the specific language governing permissions and limitations
# under the License.
import signal
from debtcollector import moves
import eventlet
import eventlet.event
import eventlet.queue
from neutron_lib.utils import helpers
from oslo_log import log as logging
from neutron._i18n import _
from neutron.agent.common import ip_lib
from neutron.agent.common import utils
from neutron.common import utils as common_utils
from neutron.agent.common import async_process
LOG = logging.getLogger(__name__)
AsyncProcessException = moves.moved_class(
async_process.AsyncProcessException,
'AsyncProcessException',
__name__)
class AsyncProcessException(Exception):
pass
class AsyncProcess(object):
"""Manages an asynchronous process.
This class spawns a new process via subprocess and uses
greenthreads to read stderr and stdout asynchronously into queues
that can be read via repeatedly calling iter_stdout() and
iter_stderr().
If respawn_interval is non-zero, any error in communicating with
the managed process will result in the process and greenthreads
being cleaned up and the process restarted after the specified
interval.
Example usage:
>>> import time
>>> proc = AsyncProcess(['ping'])
>>> proc.start()
>>> time.sleep(5)
>>> proc.stop()
>>> for line in proc.iter_stdout():
... print(line)
"""
def __init__(self, cmd, run_as_root=False, respawn_interval=None,
namespace=None, log_output=False, die_on_error=False):
"""Constructor.
:param cmd: The list of command arguments to invoke.
:param run_as_root: The process should run with elevated privileges.
:param respawn_interval: Optional, the interval in seconds to wait
to respawn after unexpected process death. Respawn will
only be attempted if a value of 0 or greater is provided.
:param namespace: Optional, start the command in the specified
namespace.
:param log_output: Optional, also log received output.
:param die_on_error: Optional, kills the process on stderr output.
"""
self.cmd_without_namespace = cmd
self._cmd = ip_lib.add_namespace_to_cmd(cmd, namespace)
self.run_as_root = run_as_root
if respawn_interval is not None and respawn_interval < 0:
raise ValueError(_('respawn_interval must be >= 0 if provided.'))
self.respawn_interval = respawn_interval
self._process = None
self._pid = None
self._is_running = False
self._kill_event = None
self._reset_queues()
self._watchers = []
self.log_output = log_output
self.die_on_error = die_on_error
@property
def cmd(self):
return ' '.join(self._cmd)
def _reset_queues(self):
self._stdout_lines = eventlet.queue.LightQueue()
self._stderr_lines = eventlet.queue.LightQueue()
def is_active(self):
# If using sudo rootwrap as a root_helper, we have to wait until sudo
# spawns rootwrap and rootwrap spawns the process. self.pid will make
# sure to get the correct pid.
return utils.pid_invoked_with_cmdline(
self.pid, self.cmd_without_namespace)
def start(self, block=False):
"""Launch a process and monitor it asynchronously.
:param block: Block until the process has started.
:raises utils.WaitTimeout if blocking is True and the process
did not start in time.
"""
LOG.debug('Launching async process [%s].', self.cmd)
if self._is_running:
raise AsyncProcessException(_('Process is already started'))
else:
self._spawn()
if block:
common_utils.wait_until_true(self.is_active)
def stop(self, block=False, kill_signal=None):
"""Halt the process and watcher threads.
:param block: Block until the process has stopped.
:param kill_signal: Number of signal that will be sent to the process
when terminating the process
:raises utils.WaitTimeout if blocking is True and the process
did not stop in time.
"""
kill_signal = kill_signal or getattr(signal, 'SIGKILL', signal.SIGTERM)
if self._is_running:
LOG.debug('Halting async process [%s].', self.cmd)
self._kill(kill_signal)
else:
raise AsyncProcessException(_('Process is not running.'))
if block:
common_utils.wait_until_true(lambda: not self.is_active())
def _spawn(self):
"""Spawn a process and its watchers."""
self._is_running = True
self._pid = None
self._kill_event = eventlet.event.Event()
self._process, cmd = utils.create_process(self._cmd,
run_as_root=self.run_as_root)
self._watchers = []
for reader in (self._read_stdout, self._read_stderr):
# Pass the stop event directly to the greenthread to
# ensure that assignment of a new event to the instance
# attribute does not prevent the greenthread from using
# the original event.
watcher = eventlet.spawn(self._watch_process,
reader,
self._kill_event)
self._watchers.append(watcher)
@property
def pid(self):
if self._process:
if not self._pid:
self._pid = utils.get_root_helper_child_pid(
self._process.pid,
self.cmd_without_namespace,
run_as_root=self.run_as_root)
return self._pid
def _kill(self, kill_signal):
"""Kill the process and the associated watcher greenthreads."""
pid = self.pid
if pid:
self._is_running = False
self._pid = None
self._kill_process(pid, kill_signal)
# Halt the greenthreads if they weren't already.
if self._kill_event:
self._kill_event.send()
self._kill_event = None
def _kill_process(self, pid, kill_signal):
try:
# A process started by a root helper will be running as
# root and need to be killed via the same helper.
utils.kill_process(pid, kill_signal, self.run_as_root)
except Exception:
LOG.exception('An error occurred while killing [%s].',
self.cmd)
return False
if self._process:
self._process.wait()
return True
def _handle_process_error(self):
"""Kill the async process and respawn if necessary."""
stdout = list(self.iter_stdout())
stderr = list(self.iter_stderr())
LOG.debug('Halting async process [%s] in response to an error. stdout:'
' [%s] - stderr: [%s]', self.cmd, stdout, stderr)
self._kill(getattr(signal, 'SIGKILL', signal.SIGTERM))
if self.respawn_interval is not None and self.respawn_interval >= 0:
eventlet.sleep(self.respawn_interval)
LOG.debug('Respawning async process [%s].', self.cmd)
try:
self.start()
except AsyncProcessException:
# Process was already respawned by someone else...
pass
def _watch_process(self, callback, kill_event):
while not kill_event.ready():
try:
output = callback()
if not output and output != "":
break
except Exception:
LOG.exception('An error occurred while communicating '
'with async process [%s].', self.cmd)
break
# Ensure that watching a process with lots of output does
# not block execution of other greenthreads.
eventlet.sleep()
# self._is_running being True indicates that the loop was
# broken out of due to an error in the watched process rather
# than the loop condition being satisfied.
if self._is_running:
self._is_running = False
self._handle_process_error()
def _read(self, stream, queue):
data = stream.readline()
if data:
data = helpers.safe_decode_utf8(data.strip())
queue.put(data)
return data
def _read_stdout(self):
data = self._read(self._process.stdout, self._stdout_lines)
if self.log_output:
LOG.debug('Output received from [%(cmd)s]: %(data)s',
{'cmd': self.cmd,
'data': data})
return data
def _read_stderr(self):
data = self._read(self._process.stderr, self._stderr_lines)
if self.log_output:
LOG.error('Error received from [%(cmd)s]: %(err)s',
{'cmd': self.cmd,
'err': data})
if self.die_on_error:
LOG.error("Process [%(cmd)s] dies due to the error: %(err)s",
{'cmd': self.cmd,
'err': data})
# the callback caller will use None to indicate the need to bail
# out of the thread
return None
return data
def _iter_queue(self, queue, block):
while True:
try:
yield queue.get(block=block)
except eventlet.queue.Empty:
break
def iter_stdout(self, block=False):
return self._iter_queue(self._stdout_lines, block)
def iter_stderr(self, block=False):
return self._iter_queue(self._stderr_lines, block)
AsyncProcess = moves.moved_class(
async_process.AsyncProcess,
'AsyncProcess',
__name__)

View File

@ -16,7 +16,7 @@
from oslo_log import log as logging
from oslo_utils import excutils
from neutron.agent.linux import async_process
from neutron.agent.common import async_process
from neutron.agent.linux import ip_lib
LOG = logging.getLogger(__name__)

View File

@ -18,9 +18,9 @@ import eventlet
from oslo_config import cfg
from oslo_log import log as logging
from neutron.agent.common import async_process
from neutron.agent.common import base_polling
from neutron.agent.linux import async_process
from neutron.agent.linux import ovsdb_monitor
from neutron.agent.common import ovsdb_monitor
from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants
LOG = logging.getLogger(__name__)

View File

@ -44,10 +44,10 @@ from six import moves
from neutron._i18n import _
from neutron.agent.common import ip_lib
from neutron.agent.common import ovs_lib
from neutron.agent.common import ovsdb_monitor
from neutron.agent.common import polling
from neutron.agent.common import utils
from neutron.agent.l2 import l2_agent_extensions_manager as ext_manager
from neutron.agent.linux import ovsdb_monitor
from neutron.agent.linux import xenapi_root_helper
from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as agent_sg_rpc

View File

@ -18,7 +18,7 @@ import signal
from oslo_log import log as logging
from neutron.agent.linux import async_process
from neutron.agent.common import async_process
from neutron.agent.linux import iptables_manager
from neutron.common import utils as common_utils

View File

@ -20,7 +20,7 @@ import netaddr
from neutron_lib.api.definitions import portbindings as pbs
from neutron_lib import constants
from neutron.agent.linux import async_process
from neutron.agent.common import async_process
from neutron.agent.linux import ip_lib
from neutron.common import utils
from neutron.tests.common import machine_fixtures

View File

@ -24,7 +24,7 @@ from neutronclient.v2_0 import client
from oslo_log import log as logging
from oslo_utils import fileutils
from neutron.agent.linux import async_process
from neutron.agent.common import async_process
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils
from neutron.common import utils as common_utils

View File

@ -16,7 +16,7 @@ import eventlet
import six
from neutron._i18n import _
from neutron.agent.linux import async_process
from neutron.agent.common import async_process
from neutron.agent.linux import utils
from neutron.common import utils as common_utils
from neutron.tests import base

View File

@ -13,7 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron.agent.linux import async_process
from neutron.agent.common import async_process
from neutron.agent.linux import ip_monitor
from neutron.tests.functional.agent.linux import test_ip_lib

View File

@ -25,7 +25,7 @@ Tests in this module will be skipped unless:
from oslo_config import cfg
from neutron.agent.common import ovs_lib
from neutron.agent.linux import ovsdb_monitor
from neutron.agent.common import ovsdb_monitor
from neutron.common import utils
from neutron.tests import base
from neutron.tests.common import net_helpers

View File

@ -14,7 +14,7 @@
import functools
from neutron.agent.linux import async_process
from neutron.agent.common import async_process
from neutron.agent.linux import utils
from neutron.common import utils as common_utils
from neutron.tests.functional.agent.linux import test_async_process

View File

@ -19,8 +19,8 @@ import eventlet.queue
import mock
import testtools
from neutron.agent.common import async_process
from neutron.agent.common import utils
from neutron.agent.linux import async_process
from neutron.tests import base
from neutron.tests.unit.agent.linux import failing_process

View File

@ -14,8 +14,9 @@
import mock
from neutron.agent.common import async_process
from neutron.agent.common import ovs_lib
from neutron.agent.linux import ovsdb_monitor
from neutron.agent.common import ovsdb_monitor
from neutron.agent.ovsdb.native import helpers
from neutron.tests import base
@ -29,26 +30,25 @@ class TestOvsdbMonitor(base.BaseTestCase):
def test___init__(self):
ovsdb_monitor.OvsdbMonitor('Interface')
def test___init___with_columns(self):
@mock.patch.object(async_process.AsyncProcess, '__init__')
def test___init___with_columns(self, init):
columns = ['col1', 'col2']
with mock.patch(
'neutron.agent.linux.async_process.AsyncProcess.__init__') as init:
ovsdb_monitor.OvsdbMonitor('Interface', columns=columns)
cmd = init.call_args_list[0][0][0]
self.assertEqual('col1,col2', cmd[-1])
def test___init___with_format(self):
with mock.patch(
'neutron.agent.linux.async_process.AsyncProcess.__init__') as init:
@mock.patch.object(async_process.AsyncProcess, '__init__')
def test___init___with_format(self, init):
ovsdb_monitor.OvsdbMonitor('Interface', format='blob')
cmd = init.call_args_list[0][0][0]
self.assertEqual('--format=blob', cmd[-1])
def test__init__with_connection_columns(self):
@mock.patch.object(async_process.AsyncProcess, '__init__')
def test__init__with_connection_columns(self, init):
conn_info = 'tcp:10.10.10.10:6640'
columns = ['col1', 'col2']
with mock.patch(
'neutron.agent.linux.async_process.AsyncProcess.__init__') as init:
ovsdb_monitor.OvsdbMonitor('Interface', columns=columns,
ovsdb_connection=conn_info)
cmd_all = init.call_args_list[0][0][0]
@ -64,9 +64,7 @@ class TestSimpleInterfaceMonitor(base.BaseTestCase):
self.monitor = ovsdb_monitor.SimpleInterfaceMonitor()
def test_has_updates_is_false_if_active_with_no_output(self):
target = ('neutron.agent.linux.ovsdb_monitor.SimpleInterfaceMonitor'
'.is_active')
with mock.patch(target, return_value=True):
with mock.patch.object(self.monitor, 'is_active', return_value=True):
self.assertFalse(self.monitor.has_updates)
def test_has_updates_after_calling_get_events_is_false(self):

View File

@ -59,7 +59,7 @@ class TestInterfacePollingMinimizer(base.BaseTestCase):
mock_stop.assert_called_with()
def mock_has_updates(self, return_value):
target = ('neutron.agent.linux.ovsdb_monitor.SimpleInterfaceMonitor'
target = ('neutron.agent.common.ovsdb_monitor.SimpleInterfaceMonitor'
'.has_updates')
return mock.patch(
target,

View File

@ -24,9 +24,9 @@ import oslo_messaging
import testtools
from neutron._i18n import _
from neutron.agent.common import async_process
from neutron.agent.common import ovs_lib
from neutron.agent.common import utils
from neutron.agent.linux import async_process
from neutron.agent.linux import ip_lib
from neutron.common import rpc as n_rpc
from neutron.plugins.ml2.drivers.l2pop import rpc as l2pop_rpc
@ -1681,7 +1681,7 @@ class TestOvsNeutronAgent(object):
'physnet1': ex_br_mocks[1]},
bm_mock = mock.Mock()
with mock.patch(
'neutron.agent.linux.ovsdb_monitor.get_bridges_monitor',
'neutron.agent.common.ovsdb_monitor.get_bridges_monitor',
return_value=bm_mock),\
mock.patch.object(
self.agent,
@ -1712,7 +1712,7 @@ class TestOvsNeutronAgent(object):
with mock.patch(
'neutron.agent.common.polling.get_polling_manager'
) as mock_get_pm, mock.patch(
'neutron.agent.linux.ovsdb_monitor.get_bridges_monitor'
'neutron.agent.common.ovsdb_monitor.get_bridges_monitor'
) as mock_get_bm, mock.patch.object(
self.agent, 'rpc_loop'
) as mock_loop, mock.patch.dict(