Browse Source

Merge "Remove threading before process forking"

changes/46/303346/40
Jenkins 6 years ago committed by Gerrit Code Review
parent
commit
7f1ae6c1b7
  1. 50
      neutron/db/agentschedulers_db.py
  2. 7
      neutron/db/l3_agentschedulers_db.py
  3. 13
      neutron/neutron_plugin_base_v2.py
  4. 6
      neutron/plugins/ml2/plugin.py
  5. 12
      neutron/server/rpc_eventlet.py
  6. 33
      neutron/server/wsgi_eventlet.py
  7. 120
      neutron/service.py
  8. 12
      neutron/services/l3_router/l3_router_plugin.py
  9. 5
      neutron/services/metering/metering_plugin.py
  10. 8
      neutron/services/service_base.py
  11. 2
      neutron/tests/base.py
  12. 16
      neutron/tests/functional/test_server.py
  13. 2
      neutron/tests/unit/db/test_agentschedulers_db.py
  14. 54
      neutron/worker.py
  15. 14
      neutron/wsgi.py

50
neutron/db/agentschedulers_db.py

@ -35,6 +35,7 @@ from neutron.db.availability_zone import network as network_az
from neutron.db import model_base
from neutron.extensions import agent as ext_agent
from neutron.extensions import dhcpagentscheduler
from neutron import worker as neutron_worker
LOG = logging.getLogger(__name__)
@ -82,6 +83,38 @@ class NetworkDhcpAgentBinding(model_base.BASEV2):
primary_key=True)
class AgentStatusCheckWorker(neutron_worker.NeutronWorker):
def __init__(self, check_func, interval, initial_delay):
super(AgentStatusCheckWorker, self).__init__(worker_process_count=0)
self._check_func = check_func
self._loop = None
self._interval = interval
self._initial_delay = initial_delay
def start(self):
super(AgentStatusCheckWorker, self).start()
if self._loop is None:
self._loop = loopingcall.FixedIntervalLoopingCall(self._check_func)
self._loop.start(interval=self._interval,
initial_delay=self._initial_delay)
def wait(self):
if self._loop is not None:
self._loop.wait()
def stop(self):
if self._loop is not None:
self._loop.stop()
def reset(self):
if self._loop is not None:
self.stop()
self.wait()
self.start()
class AgentSchedulerDbMixin(agents_db.AgentDbMixin):
"""Common class for agent scheduler mixins."""
@ -121,18 +154,16 @@ class AgentSchedulerDbMixin(agents_db.AgentDbMixin):
return result
def add_agent_status_check(self, function):
loop = loopingcall.FixedIntervalLoopingCall(function)
# TODO(enikanorov): make interval configurable rather than computed
interval = max(cfg.CONF.agent_down_time // 2, 1)
# add random initial delay to allow agents to check in after the
# neutron server first starts. random to offset multiple servers
initial_delay = random.randint(interval, interval * 2)
loop.start(interval=interval, initial_delay=initial_delay)
if hasattr(self, 'periodic_agent_loops'):
self.periodic_agent_loops.append(loop)
else:
self.periodic_agent_loops = [loop]
check_worker = AgentStatusCheckWorker(function, interval,
initial_delay)
self.add_worker(check_worker)
def agent_dead_limit_seconds(self):
return cfg.CONF.agent_down_time * 2
@ -167,6 +198,13 @@ class DhcpAgentSchedulerDbMixin(dhcpagentscheduler
network_scheduler = None
def start_periodic_dhcp_agent_status_check(self):
LOG.warning(
_LW("DEPRECATED method 'start_periodic_dhcp_agent_status_check'. "
"Please use 'add_periodic_dhcp_agent_status_check' instead")
)
self.add_periodic_dhcp_agent_status_check()
def add_periodic_dhcp_agent_status_check(self):
if not cfg.CONF.allow_automatic_dhcp_failover:
LOG.info(_LI("Skipping periodic DHCP agent status check because "
"automatic network rescheduling is disabled."))

7
neutron/db/l3_agentschedulers_db.py

@ -83,6 +83,13 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
router_scheduler = None
def start_periodic_l3_agent_status_check(self):
LOG.warning(
_LW("DEPRECATED method 'start_periodic_l3_agent_status_check'. "
"Please use 'add_periodic_l3_agent_status_check' instead")
)
self.add_periodic_l3_agent_status_check()
def add_periodic_l3_agent_status_check(self):
if not cfg.CONF.allow_automatic_l3agent_failover:
LOG.info(_LI("Skipping period L3 agent status check because "
"automatic router rescheduling is disabled."))

13
neutron/neutron_plugin_base_v2.py

@ -24,9 +24,11 @@ import abc
import six
from neutron import worker as neutron_worker
@six.add_metaclass(abc.ABCMeta)
class NeutronPluginBaseV2(object):
class NeutronPluginBaseV2(neutron_worker.WorkerSupportServiceMixin):
@abc.abstractmethod
def create_subnet(self, context, subnet):
@ -409,12 +411,3 @@ class NeutronPluginBaseV2(object):
"""
return (self.__class__.start_rpc_state_reports_listener !=
NeutronPluginBaseV2.start_rpc_state_reports_listener)
def get_workers(self):
"""Returns a collection NeutronWorker instances
If a plugin needs to define worker processes outside of API/RPC workers
then it will override this and return a collection of NeutronWorker
instances
"""
return ()

6
neutron/plugins/ml2/plugin.py

@ -162,6 +162,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
self._setup_dhcp()
self._start_rpc_notifiers()
self.add_agent_status_check(self.agent_health_check)
self.add_workers(self.mechanism_manager.get_workers())
self._verify_service_plugins_requirements()
LOG.info(_LI("Modular L2 Plugin initialization complete"))
@ -182,7 +183,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
self.network_scheduler = importutils.import_object(
cfg.CONF.network_scheduler_driver
)
self.start_periodic_dhcp_agent_status_check()
self.add_periodic_dhcp_agent_status_check()
def _verify_service_plugins_requirements(self):
for service_plugin in cfg.CONF.service_plugins:
@ -1624,9 +1625,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
return port.id
return device
def get_workers(self):
return self.mechanism_manager.get_workers()
def filter_hosts_with_network_access(
self, context, network_id, candidate_hosts):
segments = db.get_network_segments(context.session, network_id)

12
neutron/server/rpc_eventlet.py

@ -18,8 +18,9 @@
# If ../neutron/__init__.py exists, add ../ to Python search path, so that
# it will override what happens to be installed in /usr/(local/)lib/python...
import eventlet
from oslo_config import cfg
from oslo_log import log
from oslo_service import service as common_service
from neutron._i18n import _LI
from neutron import service
@ -28,13 +29,14 @@ LOG = log.getLogger(__name__)
def eventlet_rpc_server():
pool = eventlet.GreenPool()
LOG.info(_LI("Eventlet based AMQP RPC server starting..."))
rpc_workers_launcher = common_service.ProcessLauncher(
cfg.CONF, wait_interval=1.0
)
try:
neutron_rpc = service.serve_rpc()
service.start_rpc_workers(rpc_workers_launcher)
except NotImplementedError:
LOG.info(_LI("RPC was already started in parent process by "
"plugin."))
else:
pool.spawn(neutron_rpc.wait)
pool.waitall()
rpc_workers_launcher.wait()

33
neutron/server/wsgi_eventlet.py

@ -12,7 +12,10 @@
# under the License.
import eventlet
from oslo_config import cfg
from oslo_log import log
from oslo_service import service as common_service
from neutron._i18n import _LI
from neutron import service
@ -26,24 +29,24 @@ def eventlet_wsgi_server():
def start_api_and_rpc_workers(neutron_api):
pool = eventlet.GreenPool()
try:
plugin_workers_launcher = common_service.ProcessLauncher(
cfg.CONF, wait_interval=1.0
)
service.start_rpc_workers(plugin_workers_launcher)
api_thread = pool.spawn(neutron_api.wait)
pool = eventlet.GreenPool()
api_thread = pool.spawn(neutron_api.wait)
plugin_workers_thread = pool.spawn(plugin_workers_launcher.wait)
try:
neutron_rpc = service.serve_rpc()
# api and other workers should die together. When one dies,
# kill the other.
api_thread.link(lambda gt: plugin_workers_thread.kill())
plugin_workers_thread.link(lambda gt: api_thread.kill())
pool.waitall()
except NotImplementedError:
LOG.info(_LI("RPC was already started in parent process by "
"plugin."))
else:
rpc_thread = pool.spawn(neutron_rpc.wait)
plugin_workers = service.start_plugin_workers()
for worker in plugin_workers:
pool.spawn(worker.wait)
# api and rpc should die together. When one dies, kill the other.
rpc_thread.link(lambda gt: api_thread.kill())
api_thread.link(lambda gt: rpc_thread.kill())
pool.waitall()
neutron_api.wait()

120
neutron/service.py

@ -32,7 +32,7 @@ from neutron.common import rpc as n_rpc
from neutron import context
from neutron.db import api as session
from neutron import manager
from neutron import worker
from neutron import worker as neutron_worker
from neutron import wsgi
@ -113,25 +113,15 @@ def serve_wsgi(cls):
return service
def start_plugin_workers():
launchers = []
# NOTE(twilson) get_service_plugins also returns the core plugin
for plugin in manager.NeutronManager.get_unique_service_plugins():
# TODO(twilson) Instead of defaulting here, come up with a good way to
# share a common get_workers default between NeutronPluginBaseV2 and
# ServicePluginBase
for plugin_worker in getattr(plugin, 'get_workers', tuple)():
launcher = common_service.ProcessLauncher(cfg.CONF)
launcher.launch_service(plugin_worker)
launchers.append(launcher)
return launchers
class RpcWorker(worker.NeutronWorker):
class RpcWorker(neutron_worker.NeutronWorker):
"""Wraps a worker to be handled by ProcessLauncher"""
start_listeners_method = 'start_rpc_listeners'
def __init__(self, plugins):
def __init__(self, plugins, worker_process_count=1):
super(RpcWorker, self).__init__(
worker_process_count=worker_process_count
)
self._plugins = plugins
self._servers = []
@ -178,7 +168,7 @@ class RpcReportsWorker(RpcWorker):
start_listeners_method = 'start_rpc_state_reports_listener'
def serve_rpc():
def _get_rpc_workers():
plugin = manager.NeutronManager.get_plugin()
service_plugins = (
manager.NeutronManager.get_service_plugins().values())
@ -198,31 +188,101 @@ def serve_rpc():
cfg.CONF.rpc_workers)
raise NotImplementedError()
# passing service plugins only, because core plugin is among them
rpc_workers = [RpcWorker(service_plugins,
worker_process_count=cfg.CONF.rpc_workers)]
if (cfg.CONF.rpc_state_report_workers > 0 and
plugin.rpc_state_report_workers_supported()):
rpc_workers.append(
RpcReportsWorker(
[plugin],
worker_process_count=cfg.CONF.rpc_state_report_workers
)
)
return rpc_workers
class AllServicesNeutronWorker(neutron_worker.NeutronWorker):
def __init__(self, services, worker_process_count=1):
super(AllServicesNeutronWorker, self).__init__(worker_process_count)
self._services = services
self._launcher = common_service.Launcher(cfg.CONF)
def start(self):
for srv in self._services:
self._launcher.launch_service(srv)
super(AllServicesNeutronWorker, self).start()
def stop(self):
self._launcher.stop()
def wait(self):
self._launcher.wait()
def reset(self):
self._launcher.restart()
def _start_workers(worker_launcher, workers):
if not workers:
return
try:
# passing service plugins only, because core plugin is among them
rpc = RpcWorker(service_plugins)
# dispose the whole pool before os.fork, otherwise there will
# be shared DB connections in child processes which may cause
# DB errors.
LOG.debug('using launcher for rpc, workers=%s', cfg.CONF.rpc_workers)
session.dispose()
launcher = common_service.ProcessLauncher(cfg.CONF, wait_interval=1.0)
launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers)
if (cfg.CONF.rpc_state_report_workers > 0 and
plugin.rpc_state_report_workers_supported()):
rpc_state_rep = RpcReportsWorker([plugin])
LOG.debug('using launcher for state reports rpc, workers=%s',
cfg.CONF.rpc_state_report_workers)
launcher.launch_service(
rpc_state_rep, workers=cfg.CONF.rpc_state_report_workers)
return launcher
for worker in workers:
worker_launcher.launch_service(worker, worker.worker_process_count)
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception(_LE('Unrecoverable error: please check log for '
'details.'))
def start_plugin_workers(worker_launcher):
# NOTE(twilson) get_service_plugins also returns the core plugin
plugins = manager.NeutronManager.get_unique_service_plugins()
# TODO(twilson) Instead of defaulting here, come up with a good way to
# share a common get_workers default between NeutronPluginBaseV2 and
# ServicePluginBase
plugin_workers = [
plugin_worker
for plugin in plugins if hasattr(plugin, 'get_workers')
for plugin_worker in plugin.get_workers()
]
# we need to fork all necessary processes before spawning extra threads
# to avoid problems with infinite resource locking and other concurrency
# problems
process_plugin_workers = [
plugin_worker for plugin_worker in plugin_workers
if plugin_worker.worker_process_count > 0
]
thread_plugin_workers = [
plugin_worker for plugin_worker in plugin_workers
if plugin_worker.worker_process_count < 1
]
# add extra process worker and spawn there all workers with
# worker_process_count == 0
if thread_plugin_workers:
process_plugin_workers.append(
AllServicesNeutronWorker(thread_plugin_workers)
)
_start_workers(worker_launcher, process_plugin_workers)
def start_rpc_workers(worker_launcher):
rpc_workers = _get_rpc_workers()
_start_workers(worker_launcher, rpc_workers)
def _get_api_workers():
workers = cfg.CONF.api_workers
if not workers:

12
neutron/services/l3_router/l3_router_plugin.py

@ -32,6 +32,7 @@ from neutron.db import l3_gwmode_db
from neutron.db import l3_hamode_db
from neutron.plugins.common import constants
from neutron.quota import resource_registry
from neutron import service
from neutron.services import service_base
@ -62,20 +63,23 @@ class L3RouterPlugin(service_base.ServicePluginBase,
def __init__(self):
self.router_scheduler = importutils.import_object(
cfg.CONF.router_scheduler_driver)
self.start_periodic_l3_agent_status_check()
self.add_periodic_l3_agent_status_check()
super(L3RouterPlugin, self).__init__()
if 'dvr' in self.supported_extension_aliases:
l3_dvrscheduler_db.subscribe()
l3_db.subscribe()
self.start_rpc_listeners()
self.agent_notifiers.update(
{n_const.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotifyAPI()})
rpc_worker = service.RpcWorker([self], worker_process_count=0)
self.add_worker(rpc_worker)
@log_helpers.log_method_call
def start_rpc_listeners(self):
# RPC support
self.topic = topics.L3PLUGIN
self.conn = n_rpc.create_connection()
self.agent_notifiers.update(
{n_const.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotifyAPI()})
self.endpoints = [l3_rpc.L3RpcCallback()]
self.conn.create_consumer(self.topic, self.endpoints,
fanout=False)

5
neutron/services/metering/metering_plugin.py

@ -17,6 +17,7 @@ from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.db.metering import metering_db
from neutron.db.metering import metering_rpc
from neutron import service
class MeteringPlugin(metering_db.MeteringDbMixin):
@ -28,7 +29,9 @@ class MeteringPlugin(metering_db.MeteringDbMixin):
super(MeteringPlugin, self).__init__()
self.meter_rpc = metering_rpc_agent_api.MeteringAgentNotifyAPI()
self.start_rpc_listeners()
rpc_worker = service.RpcWorker([self], worker_process_count=0)
self.add_worker(rpc_worker)
def start_rpc_listeners(self):
self.endpoints = [metering_rpc.MeteringRpcCallbacks(self)]

8
neutron/services/service_base.py

@ -24,12 +24,14 @@ from neutron._i18n import _, _LE, _LI
from neutron.api import extensions
from neutron.db import servicetype_db as sdb
from neutron.services import provider_configuration as pconf
from neutron import worker as neutron_worker
LOG = logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class ServicePluginBase(extensions.PluginInterface):
class ServicePluginBase(extensions.PluginInterface,
neutron_worker.WorkerSupportServiceMixin):
"""Define base interface for any Advanced Service plugin."""
supported_extension_aliases = []
@ -46,10 +48,6 @@ class ServicePluginBase(extensions.PluginInterface):
"""Return string description of the plugin."""
pass
def get_workers(self):
"""Returns a collection of NeutronWorkers"""
return ()
def load_drivers(service_type, plugin):
"""Loads drivers for specific service.

2
neutron/tests/base.py

@ -414,7 +414,7 @@ class PluginFixture(fixtures.Fixture):
self.patched_default_svc_plugins = self.default_svc_plugins_p.start()
self.dhcp_periodic_p = mock.patch(
'neutron.db.agentschedulers_db.DhcpAgentSchedulerDbMixin.'
'start_periodic_dhcp_agent_status_check')
'add_periodic_dhcp_agent_status_check')
self.patched_dhcp_periodic = self.dhcp_periodic_p.start()
self.agent_health_check_p = mock.patch(
'neutron.db.agentschedulers_db.DhcpAgentSchedulerDbMixin.'

16
neutron/tests/functional/test_server.py

@ -22,12 +22,13 @@ import traceback
import httplib2
import mock
from oslo_config import cfg
from oslo_service import service as common_service
import psutil
from neutron.agent.linux import utils
from neutron import service
from neutron.tests import base
from neutron import worker
from neutron import worker as neutron_worker
from neutron import wsgi
@ -244,8 +245,9 @@ class TestRPCServer(TestNeutronServer):
# not interested in state report workers specifically
CONF.set_override("rpc_state_report_workers", 0)
launcher = service.serve_rpc()
launcher.wait()
rpc_workers_launcher = common_service.ProcessLauncher(CONF)
service.start_rpc_workers(rpc_workers_launcher)
rpc_workers_launcher.wait()
def test_restart_rpc_on_sighup_multiple_workers(self):
self._test_restart_service_on_sighup(service=self._serve_rpc,
@ -264,12 +266,12 @@ class TestPluginWorker(TestNeutronServer):
def _start_plugin(self, workers=1):
with mock.patch('neutron.manager.NeutronManager.get_plugin') as gp:
gp.return_value = self.plugin
launchers = service.start_plugin_workers()
for launcher in launchers:
launcher.wait()
plugin_workers_launcher = common_service.ProcessLauncher(CONF)
service.start_plugin_workers(plugin_workers_launcher)
plugin_workers_launcher.wait()
def test_start(self):
class FakeWorker(worker.NeutronWorker):
class FakeWorker(neutron_worker.NeutronWorker):
def start(self):
pass

2
neutron/tests/unit/db/test_agentschedulers_db.py

@ -256,7 +256,7 @@ class OvsAgentSchedulerTestCaseBase(test_l3.L3NatTestCaseMixin,
self.patched_l3_notify = self.l3_notify_p.start()
self.l3_periodic_p = mock.patch('neutron.db.l3_agentschedulers_db.'
'L3AgentSchedulerDbMixin.'
'start_periodic_l3_agent_status_check')
'add_periodic_l3_agent_status_check')
self.patched_l3_periodic = self.l3_periodic_p.start()
self.dhcp_notify_p = mock.patch(
'neutron.extensions.dhcpagentscheduler.notify')

54
neutron/worker.py

@ -17,6 +17,38 @@ from neutron.callbacks import registry
from neutron.callbacks import resources
class WorkerSupportServiceMixin(object):
@property
def _workers(self):
try:
return self.__workers
except AttributeError:
self.__workers = []
return self.__workers
def get_workers(self):
"""Returns a collection NeutronWorker instances needed by this service
"""
return list(self._workers)
def add_worker(self, worker):
"""Adds NeutronWorker needed for this service
If a object needs to define workers thread/processes outside of API/RPC
workers then it will call this method to register worker. Should be
called on initialization stage before running services
"""
self._workers.append(worker)
def add_workers(self, workers):
"""Adds NeutronWorker list needed for this service
The same as add_worker but adds a list of workers
"""
self._workers.extend(workers)
class NeutronWorker(service.ServiceBase):
"""Partial implementation of the ServiceBase ABC
@ -36,5 +68,25 @@ class NeutronWorker(service.ServiceBase):
super(MyPluginWorker, self).start()
do_sync()
"""
# default class value for case when super().__init__ is not called
_worker_process_count = 1
def __init__(self, worker_process_count=_worker_process_count):
"""
Initialize worker
:param worker_process_count: Defines how many processes to spawn for
worker:
0 - spawn 1 new worker thread,
1..N - spawn N new worker processes
"""
self._worker_process_count = worker_process_count
@property
def worker_process_count(self):
return self._worker_process_count
def start(self):
registry.notify(resources.PROCESS, events.AFTER_CREATE, self.start)
if self.worker_process_count > 0:
registry.notify(resources.PROCESS, events.AFTER_CREATE, self.start)

14
neutron/wsgi.py

@ -44,7 +44,7 @@ from neutron.common import config
from neutron.common import exceptions as n_exc
from neutron import context
from neutron.db import api
from neutron import worker
from neutron import worker as neutron_worker
socket_opts = [
cfg.IntOpt('backlog',
@ -74,9 +74,12 @@ def encode_body(body):
return encodeutils.to_utf8(body)
class WorkerService(worker.NeutronWorker):
class WorkerService(neutron_worker.NeutronWorker):
"""Wraps a worker to be handled by ProcessLauncher"""
def __init__(self, service, application, disable_ssl=False):
def __init__(self, service, application, disable_ssl=False,
worker_process_count=0):
super(WorkerService, self).__init__(worker_process_count)
self._service = service
self._application = application
self._disable_ssl = disable_ssl
@ -188,7 +191,7 @@ class Server(object):
self._launch(application, workers)
def _launch(self, application, workers=0):
service = WorkerService(self, application, self.disable_ssl)
service = WorkerService(self, application, self.disable_ssl, workers)
if workers < 1:
# The API service should run in the current process.
self._server = service
@ -206,7 +209,8 @@ class Server(object):
# wait interval past the default of 0.01s.
self._server = common_service.ProcessLauncher(cfg.CONF,
wait_interval=1.0)
self._server.launch_service(service, workers=workers)
self._server.launch_service(service,
workers=service.worker_process_count)
@property
def host(self):

Loading…
Cancel
Save