Browse Source

Remove threading before process forking

Forking a process when multiple threads are running is an unsafe
operation and could cause a lot of problems because only current
thread will continue working in child thread. Any locked by other
thread resource will remain locked forever.

We faced with this problem during oslo.messaging development and
added workaround to hide this problem:
https://review.openstack.org/#/c/274255/

I tried to fix this problem in oslo.service:
https://review.openstack.org/#/c/270832/

but oslo folks said that this fix is ugly and it is wrong way to add
workarounds to common libraries because projects use them incorrectly.
I think that is fair.

So this patch fixes incorrect usage of oslo libraries. In this patch
I extended functionality of NeutronWorker and add there
`worker_process_count` parameter which determines how many processes
should be spawned for this worker. If `worker_process_count` = 0 - don't
create process and spawn thread in scope of current process for worker

Then I moved all background tasks to workers and return them by
`get_workers` method. start_plugin_workers collects plugin's workers
using `get_workers` method and starts in ProcessLauncher first workers
with `worker_process_count` > 0 and only after this starts threaded
workers by simple Launcher

Closes-bug: #1569404

Change-Id: I0544f1d47ae53d572adda872847a56fa0b202d2e
changes/42/276842/51
dukhlov 7 years ago committed by Dmitriy Ukhlov
parent
commit
1cafff0871
  1. 50
      neutron/db/agentschedulers_db.py
  2. 7
      neutron/db/l3_agentschedulers_db.py
  3. 13
      neutron/neutron_plugin_base_v2.py
  4. 6
      neutron/plugins/ml2/plugin.py
  5. 12
      neutron/server/rpc_eventlet.py
  6. 33
      neutron/server/wsgi_eventlet.py
  7. 120
      neutron/service.py
  8. 12
      neutron/services/l3_router/l3_router_plugin.py
  9. 5
      neutron/services/metering/metering_plugin.py
  10. 8
      neutron/services/service_base.py
  11. 2
      neutron/tests/base.py
  12. 16
      neutron/tests/functional/test_server.py
  13. 2
      neutron/tests/unit/db/test_agentschedulers_db.py
  14. 54
      neutron/worker.py
  15. 14
      neutron/wsgi.py

50
neutron/db/agentschedulers_db.py

@ -35,6 +35,7 @@ from neutron.db.availability_zone import network as network_az
from neutron.db import model_base
from neutron.extensions import agent as ext_agent
from neutron.extensions import dhcpagentscheduler
from neutron import worker as neutron_worker
LOG = logging.getLogger(__name__)
@ -82,6 +83,38 @@ class NetworkDhcpAgentBinding(model_base.BASEV2):
primary_key=True)
class AgentStatusCheckWorker(neutron_worker.NeutronWorker):
def __init__(self, check_func, interval, initial_delay):
super(AgentStatusCheckWorker, self).__init__(worker_process_count=0)
self._check_func = check_func
self._loop = None
self._interval = interval
self._initial_delay = initial_delay
def start(self):
super(AgentStatusCheckWorker, self).start()
if self._loop is None:
self._loop = loopingcall.FixedIntervalLoopingCall(self._check_func)
self._loop.start(interval=self._interval,
initial_delay=self._initial_delay)
def wait(self):
if self._loop is not None:
self._loop.wait()
def stop(self):
if self._loop is not None:
self._loop.stop()
def reset(self):
if self._loop is not None:
self.stop()
self.wait()
self.start()
class AgentSchedulerDbMixin(agents_db.AgentDbMixin):
"""Common class for agent scheduler mixins."""
@ -121,18 +154,16 @@ class AgentSchedulerDbMixin(agents_db.AgentDbMixin):
return result
def add_agent_status_check(self, function):
loop = loopingcall.FixedIntervalLoopingCall(function)
# TODO(enikanorov): make interval configurable rather than computed
interval = max(cfg.CONF.agent_down_time // 2, 1)
# add random initial delay to allow agents to check in after the
# neutron server first starts. random to offset multiple servers
initial_delay = random.randint(interval, interval * 2)
loop.start(interval=interval, initial_delay=initial_delay)
if hasattr(self, 'periodic_agent_loops'):
self.periodic_agent_loops.append(loop)
else:
self.periodic_agent_loops = [loop]
check_worker = AgentStatusCheckWorker(function, interval,
initial_delay)
self.add_worker(check_worker)
def agent_dead_limit_seconds(self):
return cfg.CONF.agent_down_time * 2
@ -167,6 +198,13 @@ class DhcpAgentSchedulerDbMixin(dhcpagentscheduler
network_scheduler = None
def start_periodic_dhcp_agent_status_check(self):
LOG.warning(
_LW("DEPRECATED method 'start_periodic_dhcp_agent_status_check'. "
"Please use 'add_periodic_dhcp_agent_status_check' instead")
)
self.add_periodic_dhcp_agent_status_check()
def add_periodic_dhcp_agent_status_check(self):
if not cfg.CONF.allow_automatic_dhcp_failover:
LOG.info(_LI("Skipping periodic DHCP agent status check because "
"automatic network rescheduling is disabled."))

7
neutron/db/l3_agentschedulers_db.py

@ -83,6 +83,13 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
router_scheduler = None
def start_periodic_l3_agent_status_check(self):
LOG.warning(
_LW("DEPRECATED method 'start_periodic_l3_agent_status_check'. "
"Please use 'add_periodic_l3_agent_status_check' instead")
)
self.add_periodic_l3_agent_status_check()
def add_periodic_l3_agent_status_check(self):
if not cfg.CONF.allow_automatic_l3agent_failover:
LOG.info(_LI("Skipping period L3 agent status check because "
"automatic router rescheduling is disabled."))

13
neutron/neutron_plugin_base_v2.py

@ -24,9 +24,11 @@ import abc
import six
from neutron import worker as neutron_worker
@six.add_metaclass(abc.ABCMeta)
class NeutronPluginBaseV2(object):
class NeutronPluginBaseV2(neutron_worker.WorkerSupportServiceMixin):
@abc.abstractmethod
def create_subnet(self, context, subnet):
@ -409,12 +411,3 @@ class NeutronPluginBaseV2(object):
"""
return (self.__class__.start_rpc_state_reports_listener !=
NeutronPluginBaseV2.start_rpc_state_reports_listener)
def get_workers(self):
"""Returns a collection NeutronWorker instances
If a plugin needs to define worker processes outside of API/RPC workers
then it will override this and return a collection of NeutronWorker
instances
"""
return ()

6
neutron/plugins/ml2/plugin.py

@ -162,6 +162,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
self._setup_dhcp()
self._start_rpc_notifiers()
self.add_agent_status_check(self.agent_health_check)
self.add_workers(self.mechanism_manager.get_workers())
self._verify_service_plugins_requirements()
LOG.info(_LI("Modular L2 Plugin initialization complete"))
@ -182,7 +183,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
self.network_scheduler = importutils.import_object(
cfg.CONF.network_scheduler_driver
)
self.start_periodic_dhcp_agent_status_check()
self.add_periodic_dhcp_agent_status_check()
def _verify_service_plugins_requirements(self):
for service_plugin in cfg.CONF.service_plugins:
@ -1624,9 +1625,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
return port.id
return device
def get_workers(self):
return self.mechanism_manager.get_workers()
def filter_hosts_with_network_access(
self, context, network_id, candidate_hosts):
segments = db.get_network_segments(context.session, network_id)

12
neutron/server/rpc_eventlet.py

@ -18,8 +18,9 @@
# If ../neutron/__init__.py exists, add ../ to Python search path, so that
# it will override what happens to be installed in /usr/(local/)lib/python...
import eventlet
from oslo_config import cfg
from oslo_log import log
from oslo_service import service as common_service
from neutron._i18n import _LI
from neutron import service
@ -28,13 +29,14 @@ LOG = log.getLogger(__name__)
def eventlet_rpc_server():
pool = eventlet.GreenPool()
LOG.info(_LI("Eventlet based AMQP RPC server starting..."))
rpc_workers_launcher = common_service.ProcessLauncher(
cfg.CONF, wait_interval=1.0
)
try:
neutron_rpc = service.serve_rpc()
service.start_rpc_workers(rpc_workers_launcher)
except NotImplementedError:
LOG.info(_LI("RPC was already started in parent process by "
"plugin."))
else:
pool.spawn(neutron_rpc.wait)
pool.waitall()
rpc_workers_launcher.wait()

33
neutron/server/wsgi_eventlet.py

@ -12,7 +12,10 @@
# under the License.
import eventlet
from oslo_config import cfg
from oslo_log import log
from oslo_service import service as common_service
from neutron._i18n import _LI
from neutron import service
@ -26,24 +29,24 @@ def eventlet_wsgi_server():
def start_api_and_rpc_workers(neutron_api):
pool = eventlet.GreenPool()
try:
plugin_workers_launcher = common_service.ProcessLauncher(
cfg.CONF, wait_interval=1.0
)
service.start_rpc_workers(plugin_workers_launcher)
api_thread = pool.spawn(neutron_api.wait)
pool = eventlet.GreenPool()
api_thread = pool.spawn(neutron_api.wait)
plugin_workers_thread = pool.spawn(plugin_workers_launcher.wait)
try:
neutron_rpc = service.serve_rpc()
# api and other workers should die together. When one dies,
# kill the other.
api_thread.link(lambda gt: plugin_workers_thread.kill())
plugin_workers_thread.link(lambda gt: api_thread.kill())
pool.waitall()
except NotImplementedError:
LOG.info(_LI("RPC was already started in parent process by "
"plugin."))
else:
rpc_thread = pool.spawn(neutron_rpc.wait)
plugin_workers = service.start_plugin_workers()
for worker in plugin_workers:
pool.spawn(worker.wait)
# api and rpc should die together. When one dies, kill the other.
rpc_thread.link(lambda gt: api_thread.kill())
api_thread.link(lambda gt: rpc_thread.kill())
pool.waitall()
neutron_api.wait()

120
neutron/service.py

@ -32,7 +32,7 @@ from neutron.common import rpc as n_rpc
from neutron import context
from neutron.db import api as session
from neutron import manager
from neutron import worker
from neutron import worker as neutron_worker
from neutron import wsgi
@ -113,25 +113,15 @@ def serve_wsgi(cls):
return service
def start_plugin_workers():
launchers = []
# NOTE(twilson) get_service_plugins also returns the core plugin
for plugin in manager.NeutronManager.get_unique_service_plugins():
# TODO(twilson) Instead of defaulting here, come up with a good way to
# share a common get_workers default between NeutronPluginBaseV2 and
# ServicePluginBase
for plugin_worker in getattr(plugin, 'get_workers', tuple)():
launcher = common_service.ProcessLauncher(cfg.CONF)
launcher.launch_service(plugin_worker)
launchers.append(launcher)
return launchers
class RpcWorker(worker.NeutronWorker):
class RpcWorker(neutron_worker.NeutronWorker):
"""Wraps a worker to be handled by ProcessLauncher"""
start_listeners_method = 'start_rpc_listeners'
def __init__(self, plugins):
def __init__(self, plugins, worker_process_count=1):
super(RpcWorker, self).__init__(
worker_process_count=worker_process_count
)
self._plugins = plugins
self._servers = []
@ -178,7 +168,7 @@ class RpcReportsWorker(RpcWorker):
start_listeners_method = 'start_rpc_state_reports_listener'
def serve_rpc():
def _get_rpc_workers():
plugin = manager.NeutronManager.get_plugin()
service_plugins = (
manager.NeutronManager.get_service_plugins().values())
@ -198,31 +188,101 @@ def serve_rpc():
cfg.CONF.rpc_workers)
raise NotImplementedError()
# passing service plugins only, because core plugin is among them
rpc_workers = [RpcWorker(service_plugins,
worker_process_count=cfg.CONF.rpc_workers)]
if (cfg.CONF.rpc_state_report_workers > 0 and
plugin.rpc_state_report_workers_supported()):
rpc_workers.append(
RpcReportsWorker(
[plugin],
worker_process_count=cfg.CONF.rpc_state_report_workers
)
)
return rpc_workers
class AllServicesNeutronWorker(neutron_worker.NeutronWorker):
def __init__(self, services, worker_process_count=1):
super(AllServicesNeutronWorker, self).__init__(worker_process_count)
self._services = services
self._launcher = common_service.Launcher(cfg.CONF)
def start(self):
for srv in self._services:
self._launcher.launch_service(srv)
super(AllServicesNeutronWorker, self).start()
def stop(self):
self._launcher.stop()
def wait(self):
self._launcher.wait()
def reset(self):
self._launcher.restart()
def _start_workers(worker_launcher, workers):
if not workers:
return
try:
# passing service plugins only, because core plugin is among them
rpc = RpcWorker(service_plugins)
# dispose the whole pool before os.fork, otherwise there will
# be shared DB connections in child processes which may cause
# DB errors.
LOG.debug('using launcher for rpc, workers=%s', cfg.CONF.rpc_workers)
session.dispose()
launcher = common_service.ProcessLauncher(cfg.CONF, wait_interval=1.0)
launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers)
if (cfg.CONF.rpc_state_report_workers > 0 and
plugin.rpc_state_report_workers_supported()):
rpc_state_rep = RpcReportsWorker([plugin])
LOG.debug('using launcher for state reports rpc, workers=%s',
cfg.CONF.rpc_state_report_workers)
launcher.launch_service(
rpc_state_rep, workers=cfg.CONF.rpc_state_report_workers)
return launcher
for worker in workers:
worker_launcher.launch_service(worker, worker.worker_process_count)
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception(_LE('Unrecoverable error: please check log for '
'details.'))
def start_plugin_workers(worker_launcher):
# NOTE(twilson) get_service_plugins also returns the core plugin
plugins = manager.NeutronManager.get_unique_service_plugins()
# TODO(twilson) Instead of defaulting here, come up with a good way to
# share a common get_workers default between NeutronPluginBaseV2 and
# ServicePluginBase
plugin_workers = [
plugin_worker
for plugin in plugins if hasattr(plugin, 'get_workers')
for plugin_worker in plugin.get_workers()
]
# we need to fork all necessary processes before spawning extra threads
# to avoid problems with infinite resource locking and other concurrency
# problems
process_plugin_workers = [
plugin_worker for plugin_worker in plugin_workers
if plugin_worker.worker_process_count > 0
]
thread_plugin_workers = [
plugin_worker for plugin_worker in plugin_workers
if plugin_worker.worker_process_count < 1
]
# add extra process worker and spawn there all workers with
# worker_process_count == 0
if thread_plugin_workers:
process_plugin_workers.append(
AllServicesNeutronWorker(thread_plugin_workers)
)
_start_workers(worker_launcher, process_plugin_workers)
def start_rpc_workers(worker_launcher):
rpc_workers = _get_rpc_workers()
_start_workers(worker_launcher, rpc_workers)
def _get_api_workers():
workers = cfg.CONF.api_workers
if not workers:

12
neutron/services/l3_router/l3_router_plugin.py

@ -32,6 +32,7 @@ from neutron.db import l3_gwmode_db
from neutron.db import l3_hamode_db
from neutron.plugins.common import constants
from neutron.quota import resource_registry
from neutron import service
from neutron.services import service_base
@ -62,20 +63,23 @@ class L3RouterPlugin(service_base.ServicePluginBase,
def __init__(self):
self.router_scheduler = importutils.import_object(
cfg.CONF.router_scheduler_driver)
self.start_periodic_l3_agent_status_check()
self.add_periodic_l3_agent_status_check()
super(L3RouterPlugin, self).__init__()
if 'dvr' in self.supported_extension_aliases:
l3_dvrscheduler_db.subscribe()
l3_db.subscribe()
self.start_rpc_listeners()
self.agent_notifiers.update(
{n_const.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotifyAPI()})
rpc_worker = service.RpcWorker([self], worker_process_count=0)
self.add_worker(rpc_worker)
@log_helpers.log_method_call
def start_rpc_listeners(self):
# RPC support
self.topic = topics.L3PLUGIN
self.conn = n_rpc.create_connection()
self.agent_notifiers.update(
{n_const.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotifyAPI()})
self.endpoints = [l3_rpc.L3RpcCallback()]
self.conn.create_consumer(self.topic, self.endpoints,
fanout=False)

5
neutron/services/metering/metering_plugin.py

@ -17,6 +17,7 @@ from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.db.metering import metering_db
from neutron.db.metering import metering_rpc
from neutron import service
class MeteringPlugin(metering_db.MeteringDbMixin):
@ -28,7 +29,9 @@ class MeteringPlugin(metering_db.MeteringDbMixin):
super(MeteringPlugin, self).__init__()
self.meter_rpc = metering_rpc_agent_api.MeteringAgentNotifyAPI()
self.start_rpc_listeners()
rpc_worker = service.RpcWorker([self], worker_process_count=0)
self.add_worker(rpc_worker)
def start_rpc_listeners(self):
self.endpoints = [metering_rpc.MeteringRpcCallbacks(self)]

8
neutron/services/service_base.py

@ -24,12 +24,14 @@ from neutron._i18n import _, _LE, _LI
from neutron.api import extensions
from neutron.db import servicetype_db as sdb
from neutron.services import provider_configuration as pconf
from neutron import worker as neutron_worker
LOG = logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class ServicePluginBase(extensions.PluginInterface):
class ServicePluginBase(extensions.PluginInterface,
neutron_worker.WorkerSupportServiceMixin):
"""Define base interface for any Advanced Service plugin."""
supported_extension_aliases = []
@ -46,10 +48,6 @@ class ServicePluginBase(extensions.PluginInterface):
"""Return string description of the plugin."""
pass
def get_workers(self):
"""Returns a collection of NeutronWorkers"""
return ()
def load_drivers(service_type, plugin):
"""Loads drivers for specific service.

2
neutron/tests/base.py

@ -414,7 +414,7 @@ class PluginFixture(fixtures.Fixture):
self.patched_default_svc_plugins = self.default_svc_plugins_p.start()
self.dhcp_periodic_p = mock.patch(
'neutron.db.agentschedulers_db.DhcpAgentSchedulerDbMixin.'
'start_periodic_dhcp_agent_status_check')
'add_periodic_dhcp_agent_status_check')
self.patched_dhcp_periodic = self.dhcp_periodic_p.start()
self.agent_health_check_p = mock.patch(
'neutron.db.agentschedulers_db.DhcpAgentSchedulerDbMixin.'

16
neutron/tests/functional/test_server.py

@ -22,12 +22,13 @@ import traceback
import httplib2
import mock
from oslo_config import cfg
from oslo_service import service as common_service
import psutil
from neutron.agent.linux import utils
from neutron import service
from neutron.tests import base
from neutron import worker
from neutron import worker as neutron_worker
from neutron import wsgi
@ -244,8 +245,9 @@ class TestRPCServer(TestNeutronServer):
# not interested in state report workers specifically
CONF.set_override("rpc_state_report_workers", 0)
launcher = service.serve_rpc()
launcher.wait()
rpc_workers_launcher = common_service.ProcessLauncher(CONF)
service.start_rpc_workers(rpc_workers_launcher)
rpc_workers_launcher.wait()
def test_restart_rpc_on_sighup_multiple_workers(self):
self._test_restart_service_on_sighup(service=self._serve_rpc,
@ -264,12 +266,12 @@ class TestPluginWorker(TestNeutronServer):
def _start_plugin(self, workers=1):
with mock.patch('neutron.manager.NeutronManager.get_plugin') as gp:
gp.return_value = self.plugin
launchers = service.start_plugin_workers()
for launcher in launchers:
launcher.wait()
plugin_workers_launcher = common_service.ProcessLauncher(CONF)
service.start_plugin_workers(plugin_workers_launcher)
plugin_workers_launcher.wait()
def test_start(self):
class FakeWorker(worker.NeutronWorker):
class FakeWorker(neutron_worker.NeutronWorker):
def start(self):
pass

2
neutron/tests/unit/db/test_agentschedulers_db.py

@ -256,7 +256,7 @@ class OvsAgentSchedulerTestCaseBase(test_l3.L3NatTestCaseMixin,
self.patched_l3_notify = self.l3_notify_p.start()
self.l3_periodic_p = mock.patch('neutron.db.l3_agentschedulers_db.'
'L3AgentSchedulerDbMixin.'
'start_periodic_l3_agent_status_check')
'add_periodic_l3_agent_status_check')
self.patched_l3_periodic = self.l3_periodic_p.start()
self.dhcp_notify_p = mock.patch(
'neutron.extensions.dhcpagentscheduler.notify')

54
neutron/worker.py

@ -17,6 +17,38 @@ from neutron.callbacks import registry
from neutron.callbacks import resources
class WorkerSupportServiceMixin(object):
@property
def _workers(self):
try:
return self.__workers
except AttributeError:
self.__workers = []
return self.__workers
def get_workers(self):
"""Returns a collection NeutronWorker instances needed by this service
"""
return list(self._workers)
def add_worker(self, worker):
"""Adds NeutronWorker needed for this service
If a object needs to define workers thread/processes outside of API/RPC
workers then it will call this method to register worker. Should be
called on initialization stage before running services
"""
self._workers.append(worker)
def add_workers(self, workers):
"""Adds NeutronWorker list needed for this service
The same as add_worker but adds a list of workers
"""
self._workers.extend(workers)
class NeutronWorker(service.ServiceBase):
"""Partial implementation of the ServiceBase ABC
@ -36,5 +68,25 @@ class NeutronWorker(service.ServiceBase):
super(MyPluginWorker, self).start()
do_sync()
"""
# default class value for case when super().__init__ is not called
_worker_process_count = 1
def __init__(self, worker_process_count=_worker_process_count):
"""
Initialize worker
:param worker_process_count: Defines how many processes to spawn for
worker:
0 - spawn 1 new worker thread,
1..N - spawn N new worker processes
"""
self._worker_process_count = worker_process_count
@property
def worker_process_count(self):
return self._worker_process_count
def start(self):
registry.notify(resources.PROCESS, events.AFTER_CREATE, self.start)
if self.worker_process_count > 0:
registry.notify(resources.PROCESS, events.AFTER_CREATE, self.start)

14
neutron/wsgi.py

@ -44,7 +44,7 @@ from neutron.common import config
from neutron.common import exceptions as n_exc
from neutron import context
from neutron.db import api
from neutron import worker
from neutron import worker as neutron_worker
socket_opts = [
cfg.IntOpt('backlog',
@ -74,9 +74,12 @@ def encode_body(body):
return encodeutils.to_utf8(body)
class WorkerService(worker.NeutronWorker):
class WorkerService(neutron_worker.NeutronWorker):
"""Wraps a worker to be handled by ProcessLauncher"""
def __init__(self, service, application, disable_ssl=False):
def __init__(self, service, application, disable_ssl=False,
worker_process_count=0):
super(WorkerService, self).__init__(worker_process_count)
self._service = service
self._application = application
self._disable_ssl = disable_ssl
@ -188,7 +191,7 @@ class Server(object):
self._launch(application, workers)
def _launch(self, application, workers=0):
service = WorkerService(self, application, self.disable_ssl)
service = WorkerService(self, application, self.disable_ssl, workers)
if workers < 1:
# The API service should run in the current process.
self._server = service
@ -206,7 +209,8 @@ class Server(object):
# wait interval past the default of 0.01s.
self._server = common_service.ProcessLauncher(cfg.CONF,
wait_interval=1.0)
self._server.launch_service(service, workers=workers)
self._server.launch_service(service,
workers=service.worker_process_count)
@property
def host(self):

Loading…
Cancel
Save