From 1cafff087194711399ad2a85a9f394f7204d7bdd Mon Sep 17 00:00:00 2001 From: dukhlov Date: Fri, 5 Feb 2016 19:25:42 +0200 Subject: [PATCH] Remove threading before process forking Forking a process when multiple threads are running is an unsafe operation and could cause a lot of problems because only current thread will continue working in child thread. Any locked by other thread resource will remain locked forever. We faced with this problem during oslo.messaging development and added workaround to hide this problem: https://review.openstack.org/#/c/274255/ I tried to fix this problem in oslo.service: https://review.openstack.org/#/c/270832/ but oslo folks said that this fix is ugly and it is wrong way to add workarounds to common libraries because projects use them incorrectly. I think that is fair. So this patch fixes incorrect usage of oslo libraries. In this patch I extended functionality of NeutronWorker and add there `worker_process_count` parameter which determines how many processes should be spawned for this worker. If `worker_process_count` = 0 - don't create process and spawn thread in scope of current process for worker Then I moved all background tasks to workers and return them by `get_workers` method. start_plugin_workers collects plugin's workers using `get_workers` method and starts in ProcessLauncher first workers with `worker_process_count` > 0 and only after this starts threaded workers by simple Launcher Closes-bug: #1569404 Change-Id: I0544f1d47ae53d572adda872847a56fa0b202d2e --- neutron/db/agentschedulers_db.py | 50 +++++++- neutron/db/l3_agentschedulers_db.py | 7 + neutron/neutron_plugin_base_v2.py | 13 +- neutron/plugins/ml2/plugin.py | 6 +- neutron/server/rpc_eventlet.py | 12 +- neutron/server/wsgi_eventlet.py | 35 ++--- neutron/service.py | 120 +++++++++++++----- .../services/l3_router/l3_router_plugin.py | 12 +- neutron/services/metering/metering_plugin.py | 5 +- neutron/services/service_base.py | 8 +- neutron/tests/base.py | 2 +- neutron/tests/functional/test_server.py | 16 ++- .../tests/unit/db/test_agentschedulers_db.py | 2 +- neutron/worker.py | 54 +++++++- neutron/wsgi.py | 14 +- 15 files changed, 260 insertions(+), 96 deletions(-) diff --git a/neutron/db/agentschedulers_db.py b/neutron/db/agentschedulers_db.py index 2f90bf9f5fb..e12b1f399d3 100644 --- a/neutron/db/agentschedulers_db.py +++ b/neutron/db/agentschedulers_db.py @@ -35,6 +35,7 @@ from neutron.db.availability_zone import network as network_az from neutron.db import model_base from neutron.extensions import agent as ext_agent from neutron.extensions import dhcpagentscheduler +from neutron import worker as neutron_worker LOG = logging.getLogger(__name__) @@ -82,6 +83,38 @@ class NetworkDhcpAgentBinding(model_base.BASEV2): primary_key=True) +class AgentStatusCheckWorker(neutron_worker.NeutronWorker): + + def __init__(self, check_func, interval, initial_delay): + super(AgentStatusCheckWorker, self).__init__(worker_process_count=0) + + self._check_func = check_func + self._loop = None + self._interval = interval + self._initial_delay = initial_delay + + def start(self): + super(AgentStatusCheckWorker, self).start() + if self._loop is None: + self._loop = loopingcall.FixedIntervalLoopingCall(self._check_func) + self._loop.start(interval=self._interval, + initial_delay=self._initial_delay) + + def wait(self): + if self._loop is not None: + self._loop.wait() + + def stop(self): + if self._loop is not None: + self._loop.stop() + + def reset(self): + if self._loop is not None: + self.stop() + self.wait() + self.start() + + class AgentSchedulerDbMixin(agents_db.AgentDbMixin): """Common class for agent scheduler mixins.""" @@ -121,18 +154,16 @@ class AgentSchedulerDbMixin(agents_db.AgentDbMixin): return result def add_agent_status_check(self, function): - loop = loopingcall.FixedIntervalLoopingCall(function) # TODO(enikanorov): make interval configurable rather than computed interval = max(cfg.CONF.agent_down_time // 2, 1) # add random initial delay to allow agents to check in after the # neutron server first starts. random to offset multiple servers initial_delay = random.randint(interval, interval * 2) - loop.start(interval=interval, initial_delay=initial_delay) - if hasattr(self, 'periodic_agent_loops'): - self.periodic_agent_loops.append(loop) - else: - self.periodic_agent_loops = [loop] + check_worker = AgentStatusCheckWorker(function, interval, + initial_delay) + + self.add_worker(check_worker) def agent_dead_limit_seconds(self): return cfg.CONF.agent_down_time * 2 @@ -167,6 +198,13 @@ class DhcpAgentSchedulerDbMixin(dhcpagentscheduler network_scheduler = None def start_periodic_dhcp_agent_status_check(self): + LOG.warning( + _LW("DEPRECATED method 'start_periodic_dhcp_agent_status_check'. " + "Please use 'add_periodic_dhcp_agent_status_check' instead") + ) + self.add_periodic_dhcp_agent_status_check() + + def add_periodic_dhcp_agent_status_check(self): if not cfg.CONF.allow_automatic_dhcp_failover: LOG.info(_LI("Skipping periodic DHCP agent status check because " "automatic network rescheduling is disabled.")) diff --git a/neutron/db/l3_agentschedulers_db.py b/neutron/db/l3_agentschedulers_db.py index b0516072ee1..e98e3bf294e 100644 --- a/neutron/db/l3_agentschedulers_db.py +++ b/neutron/db/l3_agentschedulers_db.py @@ -83,6 +83,13 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase, router_scheduler = None def start_periodic_l3_agent_status_check(self): + LOG.warning( + _LW("DEPRECATED method 'start_periodic_l3_agent_status_check'. " + "Please use 'add_periodic_l3_agent_status_check' instead") + ) + self.add_periodic_l3_agent_status_check() + + def add_periodic_l3_agent_status_check(self): if not cfg.CONF.allow_automatic_l3agent_failover: LOG.info(_LI("Skipping period L3 agent status check because " "automatic router rescheduling is disabled.")) diff --git a/neutron/neutron_plugin_base_v2.py b/neutron/neutron_plugin_base_v2.py index 12ed5bba1e6..513890f7822 100644 --- a/neutron/neutron_plugin_base_v2.py +++ b/neutron/neutron_plugin_base_v2.py @@ -24,9 +24,11 @@ import abc import six +from neutron import worker as neutron_worker + @six.add_metaclass(abc.ABCMeta) -class NeutronPluginBaseV2(object): +class NeutronPluginBaseV2(neutron_worker.WorkerSupportServiceMixin): @abc.abstractmethod def create_subnet(self, context, subnet): @@ -409,12 +411,3 @@ class NeutronPluginBaseV2(object): """ return (self.__class__.start_rpc_state_reports_listener != NeutronPluginBaseV2.start_rpc_state_reports_listener) - - def get_workers(self): - """Returns a collection NeutronWorker instances - - If a plugin needs to define worker processes outside of API/RPC workers - then it will override this and return a collection of NeutronWorker - instances - """ - return () diff --git a/neutron/plugins/ml2/plugin.py b/neutron/plugins/ml2/plugin.py index c361475c5ab..68861178c77 100644 --- a/neutron/plugins/ml2/plugin.py +++ b/neutron/plugins/ml2/plugin.py @@ -162,6 +162,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, self._setup_dhcp() self._start_rpc_notifiers() self.add_agent_status_check(self.agent_health_check) + self.add_workers(self.mechanism_manager.get_workers()) self._verify_service_plugins_requirements() LOG.info(_LI("Modular L2 Plugin initialization complete")) @@ -182,7 +183,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, self.network_scheduler = importutils.import_object( cfg.CONF.network_scheduler_driver ) - self.start_periodic_dhcp_agent_status_check() + self.add_periodic_dhcp_agent_status_check() def _verify_service_plugins_requirements(self): for service_plugin in cfg.CONF.service_plugins: @@ -1624,9 +1625,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, return port.id return device - def get_workers(self): - return self.mechanism_manager.get_workers() - def filter_hosts_with_network_access( self, context, network_id, candidate_hosts): segments = db.get_network_segments(context.session, network_id) diff --git a/neutron/server/rpc_eventlet.py b/neutron/server/rpc_eventlet.py index 9b218cdaeb5..36118a034f9 100644 --- a/neutron/server/rpc_eventlet.py +++ b/neutron/server/rpc_eventlet.py @@ -18,8 +18,9 @@ # If ../neutron/__init__.py exists, add ../ to Python search path, so that # it will override what happens to be installed in /usr/(local/)lib/python... -import eventlet +from oslo_config import cfg from oslo_log import log +from oslo_service import service as common_service from neutron._i18n import _LI from neutron import service @@ -28,13 +29,14 @@ LOG = log.getLogger(__name__) def eventlet_rpc_server(): - pool = eventlet.GreenPool() LOG.info(_LI("Eventlet based AMQP RPC server starting...")) + rpc_workers_launcher = common_service.ProcessLauncher( + cfg.CONF, wait_interval=1.0 + ) try: - neutron_rpc = service.serve_rpc() + service.start_rpc_workers(rpc_workers_launcher) except NotImplementedError: LOG.info(_LI("RPC was already started in parent process by " "plugin.")) else: - pool.spawn(neutron_rpc.wait) - pool.waitall() + rpc_workers_launcher.wait() diff --git a/neutron/server/wsgi_eventlet.py b/neutron/server/wsgi_eventlet.py index 472ea6bf743..7381da8f834 100644 --- a/neutron/server/wsgi_eventlet.py +++ b/neutron/server/wsgi_eventlet.py @@ -12,7 +12,10 @@ # under the License. import eventlet + +from oslo_config import cfg from oslo_log import log +from oslo_service import service as common_service from neutron._i18n import _LI from neutron import service @@ -26,24 +29,24 @@ def eventlet_wsgi_server(): def start_api_and_rpc_workers(neutron_api): - pool = eventlet.GreenPool() - - api_thread = pool.spawn(neutron_api.wait) - try: - neutron_rpc = service.serve_rpc() + plugin_workers_launcher = common_service.ProcessLauncher( + cfg.CONF, wait_interval=1.0 + ) + service.start_rpc_workers(plugin_workers_launcher) + + pool = eventlet.GreenPool() + api_thread = pool.spawn(neutron_api.wait) + plugin_workers_thread = pool.spawn(plugin_workers_launcher.wait) + + # api and other workers should die together. When one dies, + # kill the other. + api_thread.link(lambda gt: plugin_workers_thread.kill()) + plugin_workers_thread.link(lambda gt: api_thread.kill()) + + pool.waitall() except NotImplementedError: LOG.info(_LI("RPC was already started in parent process by " "plugin.")) - else: - rpc_thread = pool.spawn(neutron_rpc.wait) - plugin_workers = service.start_plugin_workers() - for worker in plugin_workers: - pool.spawn(worker.wait) - - # api and rpc should die together. When one dies, kill the other. - rpc_thread.link(lambda gt: api_thread.kill()) - api_thread.link(lambda gt: rpc_thread.kill()) - - pool.waitall() + neutron_api.wait() diff --git a/neutron/service.py b/neutron/service.py index 6510f579667..7bd74467738 100644 --- a/neutron/service.py +++ b/neutron/service.py @@ -32,7 +32,7 @@ from neutron.common import rpc as n_rpc from neutron import context from neutron.db import api as session from neutron import manager -from neutron import worker +from neutron import worker as neutron_worker from neutron import wsgi @@ -113,25 +113,15 @@ def serve_wsgi(cls): return service -def start_plugin_workers(): - launchers = [] - # NOTE(twilson) get_service_plugins also returns the core plugin - for plugin in manager.NeutronManager.get_unique_service_plugins(): - # TODO(twilson) Instead of defaulting here, come up with a good way to - # share a common get_workers default between NeutronPluginBaseV2 and - # ServicePluginBase - for plugin_worker in getattr(plugin, 'get_workers', tuple)(): - launcher = common_service.ProcessLauncher(cfg.CONF) - launcher.launch_service(plugin_worker) - launchers.append(launcher) - return launchers - - -class RpcWorker(worker.NeutronWorker): +class RpcWorker(neutron_worker.NeutronWorker): """Wraps a worker to be handled by ProcessLauncher""" start_listeners_method = 'start_rpc_listeners' - def __init__(self, plugins): + def __init__(self, plugins, worker_process_count=1): + super(RpcWorker, self).__init__( + worker_process_count=worker_process_count + ) + self._plugins = plugins self._servers = [] @@ -178,7 +168,7 @@ class RpcReportsWorker(RpcWorker): start_listeners_method = 'start_rpc_state_reports_listener' -def serve_rpc(): +def _get_rpc_workers(): plugin = manager.NeutronManager.get_plugin() service_plugins = ( manager.NeutronManager.get_service_plugins().values()) @@ -198,31 +188,101 @@ def serve_rpc(): cfg.CONF.rpc_workers) raise NotImplementedError() + # passing service plugins only, because core plugin is among them + rpc_workers = [RpcWorker(service_plugins, + worker_process_count=cfg.CONF.rpc_workers)] + + if (cfg.CONF.rpc_state_report_workers > 0 and + plugin.rpc_state_report_workers_supported()): + rpc_workers.append( + RpcReportsWorker( + [plugin], + worker_process_count=cfg.CONF.rpc_state_report_workers + ) + ) + return rpc_workers + + +class AllServicesNeutronWorker(neutron_worker.NeutronWorker): + def __init__(self, services, worker_process_count=1): + super(AllServicesNeutronWorker, self).__init__(worker_process_count) + self._services = services + self._launcher = common_service.Launcher(cfg.CONF) + + def start(self): + for srv in self._services: + self._launcher.launch_service(srv) + super(AllServicesNeutronWorker, self).start() + + def stop(self): + self._launcher.stop() + + def wait(self): + self._launcher.wait() + + def reset(self): + self._launcher.restart() + + +def _start_workers(worker_launcher, workers): + if not workers: + return try: - # passing service plugins only, because core plugin is among them - rpc = RpcWorker(service_plugins) # dispose the whole pool before os.fork, otherwise there will # be shared DB connections in child processes which may cause # DB errors. LOG.debug('using launcher for rpc, workers=%s', cfg.CONF.rpc_workers) session.dispose() - launcher = common_service.ProcessLauncher(cfg.CONF, wait_interval=1.0) - launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers) - if (cfg.CONF.rpc_state_report_workers > 0 and - plugin.rpc_state_report_workers_supported()): - rpc_state_rep = RpcReportsWorker([plugin]) - LOG.debug('using launcher for state reports rpc, workers=%s', - cfg.CONF.rpc_state_report_workers) - launcher.launch_service( - rpc_state_rep, workers=cfg.CONF.rpc_state_report_workers) - return launcher + for worker in workers: + worker_launcher.launch_service(worker, worker.worker_process_count) except Exception: with excutils.save_and_reraise_exception(): LOG.exception(_LE('Unrecoverable error: please check log for ' 'details.')) +def start_plugin_workers(worker_launcher): + # NOTE(twilson) get_service_plugins also returns the core plugin + plugins = manager.NeutronManager.get_unique_service_plugins() + + # TODO(twilson) Instead of defaulting here, come up with a good way to + # share a common get_workers default between NeutronPluginBaseV2 and + # ServicePluginBase + plugin_workers = [ + plugin_worker + for plugin in plugins if hasattr(plugin, 'get_workers') + for plugin_worker in plugin.get_workers() + ] + + # we need to fork all necessary processes before spawning extra threads + # to avoid problems with infinite resource locking and other concurrency + # problems + process_plugin_workers = [ + plugin_worker for plugin_worker in plugin_workers + if plugin_worker.worker_process_count > 0 + ] + + thread_plugin_workers = [ + plugin_worker for plugin_worker in plugin_workers + if plugin_worker.worker_process_count < 1 + ] + + # add extra process worker and spawn there all workers with + # worker_process_count == 0 + if thread_plugin_workers: + process_plugin_workers.append( + AllServicesNeutronWorker(thread_plugin_workers) + ) + + _start_workers(worker_launcher, process_plugin_workers) + + +def start_rpc_workers(worker_launcher): + rpc_workers = _get_rpc_workers() + _start_workers(worker_launcher, rpc_workers) + + def _get_api_workers(): workers = cfg.CONF.api_workers if not workers: diff --git a/neutron/services/l3_router/l3_router_plugin.py b/neutron/services/l3_router/l3_router_plugin.py index c10668cbc11..ef89c626612 100644 --- a/neutron/services/l3_router/l3_router_plugin.py +++ b/neutron/services/l3_router/l3_router_plugin.py @@ -32,6 +32,7 @@ from neutron.db import l3_gwmode_db from neutron.db import l3_hamode_db from neutron.plugins.common import constants from neutron.quota import resource_registry +from neutron import service from neutron.services import service_base @@ -62,20 +63,23 @@ class L3RouterPlugin(service_base.ServicePluginBase, def __init__(self): self.router_scheduler = importutils.import_object( cfg.CONF.router_scheduler_driver) - self.start_periodic_l3_agent_status_check() + self.add_periodic_l3_agent_status_check() super(L3RouterPlugin, self).__init__() if 'dvr' in self.supported_extension_aliases: l3_dvrscheduler_db.subscribe() l3_db.subscribe() - self.start_rpc_listeners() + self.agent_notifiers.update( + {n_const.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotifyAPI()}) + + rpc_worker = service.RpcWorker([self], worker_process_count=0) + + self.add_worker(rpc_worker) @log_helpers.log_method_call def start_rpc_listeners(self): # RPC support self.topic = topics.L3PLUGIN self.conn = n_rpc.create_connection() - self.agent_notifiers.update( - {n_const.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotifyAPI()}) self.endpoints = [l3_rpc.L3RpcCallback()] self.conn.create_consumer(self.topic, self.endpoints, fanout=False) diff --git a/neutron/services/metering/metering_plugin.py b/neutron/services/metering/metering_plugin.py index 11a928642a0..f7cb42411e6 100644 --- a/neutron/services/metering/metering_plugin.py +++ b/neutron/services/metering/metering_plugin.py @@ -17,6 +17,7 @@ from neutron.common import rpc as n_rpc from neutron.common import topics from neutron.db.metering import metering_db from neutron.db.metering import metering_rpc +from neutron import service class MeteringPlugin(metering_db.MeteringDbMixin): @@ -28,7 +29,9 @@ class MeteringPlugin(metering_db.MeteringDbMixin): super(MeteringPlugin, self).__init__() self.meter_rpc = metering_rpc_agent_api.MeteringAgentNotifyAPI() - self.start_rpc_listeners() + rpc_worker = service.RpcWorker([self], worker_process_count=0) + + self.add_worker(rpc_worker) def start_rpc_listeners(self): self.endpoints = [metering_rpc.MeteringRpcCallbacks(self)] diff --git a/neutron/services/service_base.py b/neutron/services/service_base.py index 5fa9bcf388d..33da0286d15 100644 --- a/neutron/services/service_base.py +++ b/neutron/services/service_base.py @@ -24,12 +24,14 @@ from neutron._i18n import _, _LE, _LI from neutron.api import extensions from neutron.db import servicetype_db as sdb from neutron.services import provider_configuration as pconf +from neutron import worker as neutron_worker LOG = logging.getLogger(__name__) @six.add_metaclass(abc.ABCMeta) -class ServicePluginBase(extensions.PluginInterface): +class ServicePluginBase(extensions.PluginInterface, + neutron_worker.WorkerSupportServiceMixin): """Define base interface for any Advanced Service plugin.""" supported_extension_aliases = [] @@ -46,10 +48,6 @@ class ServicePluginBase(extensions.PluginInterface): """Return string description of the plugin.""" pass - def get_workers(self): - """Returns a collection of NeutronWorkers""" - return () - def load_drivers(service_type, plugin): """Loads drivers for specific service. diff --git a/neutron/tests/base.py b/neutron/tests/base.py index 6e3d8b3a9bc..6488cabc3d3 100644 --- a/neutron/tests/base.py +++ b/neutron/tests/base.py @@ -414,7 +414,7 @@ class PluginFixture(fixtures.Fixture): self.patched_default_svc_plugins = self.default_svc_plugins_p.start() self.dhcp_periodic_p = mock.patch( 'neutron.db.agentschedulers_db.DhcpAgentSchedulerDbMixin.' - 'start_periodic_dhcp_agent_status_check') + 'add_periodic_dhcp_agent_status_check') self.patched_dhcp_periodic = self.dhcp_periodic_p.start() self.agent_health_check_p = mock.patch( 'neutron.db.agentschedulers_db.DhcpAgentSchedulerDbMixin.' diff --git a/neutron/tests/functional/test_server.py b/neutron/tests/functional/test_server.py index 891bdf3276d..2c7615a5c05 100644 --- a/neutron/tests/functional/test_server.py +++ b/neutron/tests/functional/test_server.py @@ -22,12 +22,13 @@ import traceback import httplib2 import mock from oslo_config import cfg +from oslo_service import service as common_service import psutil from neutron.agent.linux import utils from neutron import service from neutron.tests import base -from neutron import worker +from neutron import worker as neutron_worker from neutron import wsgi @@ -244,8 +245,9 @@ class TestRPCServer(TestNeutronServer): # not interested in state report workers specifically CONF.set_override("rpc_state_report_workers", 0) - launcher = service.serve_rpc() - launcher.wait() + rpc_workers_launcher = common_service.ProcessLauncher(CONF) + service.start_rpc_workers(rpc_workers_launcher) + rpc_workers_launcher.wait() def test_restart_rpc_on_sighup_multiple_workers(self): self._test_restart_service_on_sighup(service=self._serve_rpc, @@ -264,12 +266,12 @@ class TestPluginWorker(TestNeutronServer): def _start_plugin(self, workers=1): with mock.patch('neutron.manager.NeutronManager.get_plugin') as gp: gp.return_value = self.plugin - launchers = service.start_plugin_workers() - for launcher in launchers: - launcher.wait() + plugin_workers_launcher = common_service.ProcessLauncher(CONF) + service.start_plugin_workers(plugin_workers_launcher) + plugin_workers_launcher.wait() def test_start(self): - class FakeWorker(worker.NeutronWorker): + class FakeWorker(neutron_worker.NeutronWorker): def start(self): pass diff --git a/neutron/tests/unit/db/test_agentschedulers_db.py b/neutron/tests/unit/db/test_agentschedulers_db.py index 692e1b96de1..655073846f9 100644 --- a/neutron/tests/unit/db/test_agentschedulers_db.py +++ b/neutron/tests/unit/db/test_agentschedulers_db.py @@ -256,7 +256,7 @@ class OvsAgentSchedulerTestCaseBase(test_l3.L3NatTestCaseMixin, self.patched_l3_notify = self.l3_notify_p.start() self.l3_periodic_p = mock.patch('neutron.db.l3_agentschedulers_db.' 'L3AgentSchedulerDbMixin.' - 'start_periodic_l3_agent_status_check') + 'add_periodic_l3_agent_status_check') self.patched_l3_periodic = self.l3_periodic_p.start() self.dhcp_notify_p = mock.patch( 'neutron.extensions.dhcpagentscheduler.notify') diff --git a/neutron/worker.py b/neutron/worker.py index 80a16533eba..a602d1c01f0 100644 --- a/neutron/worker.py +++ b/neutron/worker.py @@ -17,6 +17,38 @@ from neutron.callbacks import registry from neutron.callbacks import resources +class WorkerSupportServiceMixin(object): + + @property + def _workers(self): + try: + return self.__workers + except AttributeError: + self.__workers = [] + return self.__workers + + def get_workers(self): + """Returns a collection NeutronWorker instances needed by this service + """ + return list(self._workers) + + def add_worker(self, worker): + """Adds NeutronWorker needed for this service + + If a object needs to define workers thread/processes outside of API/RPC + workers then it will call this method to register worker. Should be + called on initialization stage before running services + """ + self._workers.append(worker) + + def add_workers(self, workers): + """Adds NeutronWorker list needed for this service + + The same as add_worker but adds a list of workers + """ + self._workers.extend(workers) + + class NeutronWorker(service.ServiceBase): """Partial implementation of the ServiceBase ABC @@ -36,5 +68,25 @@ class NeutronWorker(service.ServiceBase): super(MyPluginWorker, self).start() do_sync() """ + + # default class value for case when super().__init__ is not called + _worker_process_count = 1 + + def __init__(self, worker_process_count=_worker_process_count): + """ + Initialize worker + + :param worker_process_count: Defines how many processes to spawn for + worker: + 0 - spawn 1 new worker thread, + 1..N - spawn N new worker processes + """ + self._worker_process_count = worker_process_count + + @property + def worker_process_count(self): + return self._worker_process_count + def start(self): - registry.notify(resources.PROCESS, events.AFTER_CREATE, self.start) + if self.worker_process_count > 0: + registry.notify(resources.PROCESS, events.AFTER_CREATE, self.start) diff --git a/neutron/wsgi.py b/neutron/wsgi.py index 684fb69cb87..f49358579c6 100644 --- a/neutron/wsgi.py +++ b/neutron/wsgi.py @@ -44,7 +44,7 @@ from neutron.common import config from neutron.common import exceptions as n_exc from neutron import context from neutron.db import api -from neutron import worker +from neutron import worker as neutron_worker socket_opts = [ cfg.IntOpt('backlog', @@ -74,9 +74,12 @@ def encode_body(body): return encodeutils.to_utf8(body) -class WorkerService(worker.NeutronWorker): +class WorkerService(neutron_worker.NeutronWorker): """Wraps a worker to be handled by ProcessLauncher""" - def __init__(self, service, application, disable_ssl=False): + def __init__(self, service, application, disable_ssl=False, + worker_process_count=0): + super(WorkerService, self).__init__(worker_process_count) + self._service = service self._application = application self._disable_ssl = disable_ssl @@ -188,7 +191,7 @@ class Server(object): self._launch(application, workers) def _launch(self, application, workers=0): - service = WorkerService(self, application, self.disable_ssl) + service = WorkerService(self, application, self.disable_ssl, workers) if workers < 1: # The API service should run in the current process. self._server = service @@ -206,7 +209,8 @@ class Server(object): # wait interval past the default of 0.01s. self._server = common_service.ProcessLauncher(cfg.CONF, wait_interval=1.0) - self._server.launch_service(service, workers=workers) + self._server.launch_service(service, + workers=service.worker_process_count) @property def host(self):