Add support for PluginWorker and Process creation notification
There are several cases where plugin initialization should be handled after neutron-server forks API/RPC workers. For example, starting a client connection to an SDN controller before forking copies the fd of the socket to the child process, but then you have multiple processes trying to read/write the same socket connection. It is also useful for a plugin to be able to do something in only one process, regardless of how many workers are forked. One example would be handling syncing from an external system to the neutron database. This patch does 3 things: 1) Treats rpc_workers=0 as = 1. This simplifies the code for handling notification that forking has completed. In the existing code, calling the notification in the Worker object's start() method would happen twice in the case where both api and rpc workers were 0, despite there being only one process. An earlier patch already changed the default api_workers to be the number of processors. 2) Adds notification of forking via the callbacks mechanism. Plugins can subscribe to resources.PROCESS, event.AFTER_CREATE and do any post-fork initialization that needs to be done for every spawned process. 3) Adds core/service plugin calls to get_workers() which defaults to returning (). Plugins that need additional processes to spawn should just return an iterable of NeutronWorkers that will be spawned in their own process. DocImpact Closes-Bug: #1463129 Change-Id: Ib99954678c2b4f32f486b537979d446aafbea07b
This commit is contained in:
parent
9ed4be7559
commit
9f6bd17703
@ -301,19 +301,13 @@
|
||||
# ========== end of items for VLAN trunking networks ==========
|
||||
|
||||
# =========== WSGI parameters related to the API server ==============
|
||||
# Number of separate worker processes to spawn. A value of 0 runs the
|
||||
# worker thread in the current process. Greater than 0 launches that number of
|
||||
# child processes as workers. The parent process manages them. If not
|
||||
# specified, the default value is equal to the number of CPUs available to
|
||||
# achieve best performance.
|
||||
# Number of separate API worker processes to spawn. If not specified or < 1,
|
||||
# the default value is equal to the number of CPUs available.
|
||||
# api_workers = <number of CPUs>
|
||||
|
||||
# Number of separate RPC worker processes to spawn. The default, 0, runs the
|
||||
# worker thread in the current process. Greater than 0 launches that number of
|
||||
# child processes as RPC workers. The parent process manages them.
|
||||
# This feature is experimental until issues are addressed and testing has been
|
||||
# enabled for various plugins for compatibility.
|
||||
# rpc_workers = 0
|
||||
# Number of separate RPC worker processes to spawn. If not specified or < 1,
|
||||
# a single RPC worker process is spawned by the parent process.
|
||||
# rpc_workers = 1
|
||||
|
||||
# Timeout for client connections socket operations. If an
|
||||
# incoming connection is idle for this number of seconds it
|
||||
|
@ -12,6 +12,7 @@
|
||||
|
||||
# String literals representing core resources.
|
||||
PORT = 'port'
|
||||
PROCESS = 'process'
|
||||
ROUTER = 'router'
|
||||
ROUTER_GATEWAY = 'router_gateway'
|
||||
ROUTER_INTERFACE = 'router_interface'
|
||||
|
@ -246,3 +246,8 @@ class NeutronManager(object):
|
||||
service_plugins = cls.get_instance().service_plugins
|
||||
return dict((x, weakref.proxy(y))
|
||||
for x, y in six.iteritems(service_plugins))
|
||||
|
||||
@classmethod
|
||||
def get_unique_service_plugins(cls):
|
||||
service_plugins = cls.get_instance().service_plugins
|
||||
return tuple(weakref.proxy(x) for x in set(service_plugins.values()))
|
||||
|
@ -389,3 +389,12 @@ class NeutronPluginBaseV2(object):
|
||||
"""
|
||||
return (self.__class__.start_rpc_listeners !=
|
||||
NeutronPluginBaseV2.start_rpc_listeners)
|
||||
|
||||
def get_workers(self):
|
||||
"""Returns a collection NeutronWorker instances
|
||||
|
||||
If a plugin needs to define worker processes outside of API/RPC workers
|
||||
then it will override this and return a collection of NeutronWorker
|
||||
instances
|
||||
"""
|
||||
return ()
|
||||
|
@ -888,6 +888,14 @@ class MechanismDriver(object):
|
||||
"""
|
||||
pass
|
||||
|
||||
def get_workers(self):
|
||||
"""Get any NeutronWorker instances that should have their own process
|
||||
|
||||
Any driver that needs to run processes separate from the API or RPC
|
||||
workers, can return a sequence of NeutronWorker instances.
|
||||
"""
|
||||
return ()
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class ExtensionDriver(object):
|
||||
|
@ -749,6 +749,12 @@ class MechanismManager(stevedore.named.NamedExtensionManager):
|
||||
return False
|
||||
return True
|
||||
|
||||
def get_workers(self):
|
||||
workers = []
|
||||
for driver in self.ordered_mech_drivers:
|
||||
workers += driver.obj.get_workers()
|
||||
return workers
|
||||
|
||||
|
||||
class ExtensionManager(stevedore.named.NamedExtensionManager):
|
||||
"""Manage extension drivers using drivers."""
|
||||
|
@ -1539,3 +1539,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
if port:
|
||||
return port.id
|
||||
return device
|
||||
|
||||
def get_workers(self):
|
||||
return self.mechanism_manager.get_workers()
|
||||
|
@ -50,6 +50,10 @@ def main():
|
||||
else:
|
||||
rpc_thread = pool.spawn(neutron_rpc.wait)
|
||||
|
||||
plugin_workers = service.start_plugin_workers()
|
||||
for worker in plugin_workers:
|
||||
pool.spawn(worker.wait)
|
||||
|
||||
# api and rpc should die together. When one dies, kill the other.
|
||||
rpc_thread.link(lambda gt: api_thread.kill())
|
||||
api_thread.link(lambda gt: rpc_thread.kill())
|
||||
|
@ -32,6 +32,7 @@ from neutron import context
|
||||
from neutron.db import api as session
|
||||
from neutron.i18n import _LE, _LI
|
||||
from neutron import manager
|
||||
from neutron import worker
|
||||
from neutron import wsgi
|
||||
|
||||
|
||||
@ -44,7 +45,7 @@ service_opts = [
|
||||
'If not specified, the default is equal to the number '
|
||||
'of CPUs available for best performance.')),
|
||||
cfg.IntOpt('rpc_workers',
|
||||
default=0,
|
||||
default=1,
|
||||
help=_('Number of RPC worker processes for service')),
|
||||
cfg.IntOpt('periodic_fuzzy_delay',
|
||||
default=5,
|
||||
@ -108,13 +109,28 @@ def serve_wsgi(cls):
|
||||
return service
|
||||
|
||||
|
||||
class RpcWorker(common_service.ServiceBase):
|
||||
def start_plugin_workers():
|
||||
launchers = []
|
||||
# NOTE(twilson) get_service_plugins also returns the core plugin
|
||||
for plugin in manager.NeutronManager.get_unique_service_plugins():
|
||||
# TODO(twilson) Instead of defaulting here, come up with a good way to
|
||||
# share a common get_workers default between NeutronPluginBaseV2 and
|
||||
# ServicePluginBase
|
||||
for plugin_worker in getattr(plugin, 'get_workers', tuple)():
|
||||
launcher = common_service.ProcessLauncher(cfg.CONF)
|
||||
launcher.launch_service(plugin_worker)
|
||||
launchers.append(launcher)
|
||||
return launchers
|
||||
|
||||
|
||||
class RpcWorker(worker.NeutronWorker):
|
||||
"""Wraps a worker to be handled by ProcessLauncher"""
|
||||
def __init__(self, plugin):
|
||||
self._plugin = plugin
|
||||
self._servers = []
|
||||
|
||||
def start(self):
|
||||
super(RpcWorker, self).start()
|
||||
self._servers = self._plugin.start_rpc_listeners()
|
||||
|
||||
def wait(self):
|
||||
@ -149,6 +165,9 @@ class RpcWorker(common_service.ServiceBase):
|
||||
def serve_rpc():
|
||||
plugin = manager.NeutronManager.get_plugin()
|
||||
|
||||
if cfg.CONF.rpc_workers < 1:
|
||||
cfg.CONF.set_override('rpc_workers', 1)
|
||||
|
||||
# If 0 < rpc_workers then start_rpc_listeners would be called in a
|
||||
# subprocess and we cannot simply catch the NotImplementedError. It is
|
||||
# simpler to check this up front by testing whether the plugin supports
|
||||
@ -164,22 +183,14 @@ def serve_rpc():
|
||||
try:
|
||||
rpc = RpcWorker(plugin)
|
||||
|
||||
if cfg.CONF.rpc_workers < 1:
|
||||
LOG.debug('starting rpc directly, workers=%s',
|
||||
cfg.CONF.rpc_workers)
|
||||
rpc.start()
|
||||
return rpc
|
||||
else:
|
||||
# dispose the whole pool before os.fork, otherwise there will
|
||||
# be shared DB connections in child processes which may cause
|
||||
# DB errors.
|
||||
LOG.debug('using launcher for rpc, workers=%s',
|
||||
cfg.CONF.rpc_workers)
|
||||
session.dispose()
|
||||
launcher = common_service.ProcessLauncher(cfg.CONF,
|
||||
wait_interval=1.0)
|
||||
launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers)
|
||||
return launcher
|
||||
# dispose the whole pool before os.fork, otherwise there will
|
||||
# be shared DB connections in child processes which may cause
|
||||
# DB errors.
|
||||
LOG.debug('using launcher for rpc, workers=%s', cfg.CONF.rpc_workers)
|
||||
session.dispose()
|
||||
launcher = common_service.ProcessLauncher(cfg.CONF, wait_interval=1.0)
|
||||
launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers)
|
||||
return launcher
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.exception(_LE('Unrecoverable error: please check log for '
|
||||
@ -188,7 +199,7 @@ def serve_rpc():
|
||||
|
||||
def _get_api_workers():
|
||||
workers = cfg.CONF.api_workers
|
||||
if workers is None:
|
||||
if not workers:
|
||||
workers = processutils.get_worker_count()
|
||||
return workers
|
||||
|
||||
|
@ -31,9 +31,11 @@ from neutron.db import l3_hamode_db
|
||||
from neutron.db import l3_hascheduler_db
|
||||
from neutron.plugins.common import constants
|
||||
from neutron.quota import resource_registry
|
||||
from neutron.services import service_base
|
||||
|
||||
|
||||
class L3RouterPlugin(common_db_mixin.CommonDbMixin,
|
||||
class L3RouterPlugin(service_base.ServicePluginBase,
|
||||
common_db_mixin.CommonDbMixin,
|
||||
extraroute_db.ExtraRoute_db_mixin,
|
||||
l3_hamode_db.L3_HA_NAT_db_mixin,
|
||||
l3_gwmode_db.L3_NAT_db_mixin,
|
||||
|
@ -46,6 +46,10 @@ class ServicePluginBase(extensions.PluginInterface):
|
||||
"""Return string description of the plugin."""
|
||||
pass
|
||||
|
||||
def get_workers(self):
|
||||
"""Returns a collection of NeutronWorkers"""
|
||||
return ()
|
||||
|
||||
|
||||
def load_drivers(service_type, plugin):
|
||||
"""Loads drivers for specific service.
|
||||
|
@ -27,6 +27,7 @@ import psutil
|
||||
from neutron.agent.linux import utils
|
||||
from neutron import service
|
||||
from neutron.tests import base
|
||||
from neutron import worker
|
||||
from neutron import wsgi
|
||||
|
||||
|
||||
@ -245,3 +246,41 @@ class TestRPCServer(TestNeutronServer):
|
||||
def test_restart_rpc_on_sighup_multiple_workers(self):
|
||||
self._test_restart_service_on_sighup(service=self._serve_rpc,
|
||||
workers=2)
|
||||
|
||||
|
||||
class TestPluginWorker(TestNeutronServer):
|
||||
"""Ensure that a plugin returning Workers spawns workers"""
|
||||
|
||||
def setUp(self):
|
||||
super(TestPluginWorker, self).setUp()
|
||||
self.setup_coreplugin(TARGET_PLUGIN)
|
||||
self._plugin_patcher = mock.patch(TARGET_PLUGIN, autospec=True)
|
||||
self.plugin = self._plugin_patcher.start()
|
||||
|
||||
def _start_plugin(self, workers=0):
|
||||
with mock.patch('neutron.manager.NeutronManager.get_plugin') as gp:
|
||||
gp.return_value = self.plugin
|
||||
launchers = service.start_plugin_workers()
|
||||
for launcher in launchers:
|
||||
launcher.wait()
|
||||
|
||||
def test_start(self):
|
||||
class FakeWorker(worker.NeutronWorker):
|
||||
def start(self):
|
||||
pass
|
||||
|
||||
def wait(self):
|
||||
pass
|
||||
|
||||
def stop(self):
|
||||
pass
|
||||
|
||||
def reset(self):
|
||||
pass
|
||||
|
||||
# Make both ABC happy and ensure 'self' is correct
|
||||
FakeWorker.reset = self._fake_reset
|
||||
workers = [FakeWorker()]
|
||||
self.plugin.return_value.get_workers.return_value = workers
|
||||
self._test_restart_service_on_sighup(service=self._start_plugin,
|
||||
workers=len(workers))
|
||||
|
40
neutron/worker.py
Normal file
40
neutron/worker.py
Normal file
@ -0,0 +1,40 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_service import service
|
||||
|
||||
from neutron.callbacks import events
|
||||
from neutron.callbacks import registry
|
||||
from neutron.callbacks import resources
|
||||
|
||||
|
||||
class NeutronWorker(service.ServiceBase):
|
||||
"""Partial implementation of the ServiceBase ABC
|
||||
|
||||
Subclasses will still need to add the other abstractmethods defined in
|
||||
service.ServiceBase. See oslo_service for more details.
|
||||
|
||||
If a plugin needs to handle synchornization with the Neutron database and
|
||||
do this only once instead of in every API worker, for instance, it would
|
||||
define a NeutronWorker class and the plugin would have get_workers return
|
||||
an array of NeutronWorker instnaces. For example:
|
||||
class MyPlugin(...):
|
||||
def get_workers(self):
|
||||
return [MyPluginWorker()]
|
||||
|
||||
class MyPluginWorker(NeutronWorker):
|
||||
def start(self):
|
||||
super(MyPluginWorker, self).start()
|
||||
do_sync()
|
||||
"""
|
||||
def start(self):
|
||||
registry.notify(resources.PROCESS, events.AFTER_CREATE, self.start)
|
@ -44,6 +44,7 @@ from neutron.common import exceptions as exception
|
||||
from neutron import context
|
||||
from neutron.db import api
|
||||
from neutron.i18n import _LE, _LI
|
||||
from neutron import worker
|
||||
|
||||
socket_opts = [
|
||||
cfg.IntOpt('backlog',
|
||||
@ -102,7 +103,7 @@ def encode_body(body):
|
||||
return body
|
||||
|
||||
|
||||
class WorkerService(common_service.ServiceBase):
|
||||
class WorkerService(worker.NeutronWorker):
|
||||
"""Wraps a worker to be handled by ProcessLauncher"""
|
||||
def __init__(self, service, application):
|
||||
self._service = service
|
||||
@ -110,6 +111,7 @@ class WorkerService(common_service.ServiceBase):
|
||||
self._server = None
|
||||
|
||||
def start(self):
|
||||
super(WorkerService, self).start()
|
||||
# When api worker is stopped it kills the eventlet wsgi server which
|
||||
# internally closes the wsgi server socket object. This server socket
|
||||
# object becomes not usable which leads to "Bad file descriptor"
|
||||
|
Loading…
Reference in New Issue
Block a user