diff --git a/neutron/db/agentschedulers_db.py b/neutron/db/agentschedulers_db.py index 2f90bf9f5fb..e12b1f399d3 100644 --- a/neutron/db/agentschedulers_db.py +++ b/neutron/db/agentschedulers_db.py @@ -35,6 +35,7 @@ from neutron.db.availability_zone import network as network_az from neutron.db import model_base from neutron.extensions import agent as ext_agent from neutron.extensions import dhcpagentscheduler +from neutron import worker as neutron_worker LOG = logging.getLogger(__name__) @@ -82,6 +83,38 @@ class NetworkDhcpAgentBinding(model_base.BASEV2): primary_key=True) +class AgentStatusCheckWorker(neutron_worker.NeutronWorker): + + def __init__(self, check_func, interval, initial_delay): + super(AgentStatusCheckWorker, self).__init__(worker_process_count=0) + + self._check_func = check_func + self._loop = None + self._interval = interval + self._initial_delay = initial_delay + + def start(self): + super(AgentStatusCheckWorker, self).start() + if self._loop is None: + self._loop = loopingcall.FixedIntervalLoopingCall(self._check_func) + self._loop.start(interval=self._interval, + initial_delay=self._initial_delay) + + def wait(self): + if self._loop is not None: + self._loop.wait() + + def stop(self): + if self._loop is not None: + self._loop.stop() + + def reset(self): + if self._loop is not None: + self.stop() + self.wait() + self.start() + + class AgentSchedulerDbMixin(agents_db.AgentDbMixin): """Common class for agent scheduler mixins.""" @@ -121,18 +154,16 @@ class AgentSchedulerDbMixin(agents_db.AgentDbMixin): return result def add_agent_status_check(self, function): - loop = loopingcall.FixedIntervalLoopingCall(function) # TODO(enikanorov): make interval configurable rather than computed interval = max(cfg.CONF.agent_down_time // 2, 1) # add random initial delay to allow agents to check in after the # neutron server first starts. random to offset multiple servers initial_delay = random.randint(interval, interval * 2) - loop.start(interval=interval, initial_delay=initial_delay) - if hasattr(self, 'periodic_agent_loops'): - self.periodic_agent_loops.append(loop) - else: - self.periodic_agent_loops = [loop] + check_worker = AgentStatusCheckWorker(function, interval, + initial_delay) + + self.add_worker(check_worker) def agent_dead_limit_seconds(self): return cfg.CONF.agent_down_time * 2 @@ -167,6 +198,13 @@ class DhcpAgentSchedulerDbMixin(dhcpagentscheduler network_scheduler = None def start_periodic_dhcp_agent_status_check(self): + LOG.warning( + _LW("DEPRECATED method 'start_periodic_dhcp_agent_status_check'. " + "Please use 'add_periodic_dhcp_agent_status_check' instead") + ) + self.add_periodic_dhcp_agent_status_check() + + def add_periodic_dhcp_agent_status_check(self): if not cfg.CONF.allow_automatic_dhcp_failover: LOG.info(_LI("Skipping periodic DHCP agent status check because " "automatic network rescheduling is disabled.")) diff --git a/neutron/db/l3_agentschedulers_db.py b/neutron/db/l3_agentschedulers_db.py index b0516072ee1..e98e3bf294e 100644 --- a/neutron/db/l3_agentschedulers_db.py +++ b/neutron/db/l3_agentschedulers_db.py @@ -83,6 +83,13 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase, router_scheduler = None def start_periodic_l3_agent_status_check(self): + LOG.warning( + _LW("DEPRECATED method 'start_periodic_l3_agent_status_check'. " + "Please use 'add_periodic_l3_agent_status_check' instead") + ) + self.add_periodic_l3_agent_status_check() + + def add_periodic_l3_agent_status_check(self): if not cfg.CONF.allow_automatic_l3agent_failover: LOG.info(_LI("Skipping period L3 agent status check because " "automatic router rescheduling is disabled.")) diff --git a/neutron/neutron_plugin_base_v2.py b/neutron/neutron_plugin_base_v2.py index 12ed5bba1e6..513890f7822 100644 --- a/neutron/neutron_plugin_base_v2.py +++ b/neutron/neutron_plugin_base_v2.py @@ -24,9 +24,11 @@ import abc import six +from neutron import worker as neutron_worker + @six.add_metaclass(abc.ABCMeta) -class NeutronPluginBaseV2(object): +class NeutronPluginBaseV2(neutron_worker.WorkerSupportServiceMixin): @abc.abstractmethod def create_subnet(self, context, subnet): @@ -409,12 +411,3 @@ class NeutronPluginBaseV2(object): """ return (self.__class__.start_rpc_state_reports_listener != NeutronPluginBaseV2.start_rpc_state_reports_listener) - - def get_workers(self): - """Returns a collection NeutronWorker instances - - If a plugin needs to define worker processes outside of API/RPC workers - then it will override this and return a collection of NeutronWorker - instances - """ - return () diff --git a/neutron/plugins/ml2/plugin.py b/neutron/plugins/ml2/plugin.py index c361475c5ab..68861178c77 100644 --- a/neutron/plugins/ml2/plugin.py +++ b/neutron/plugins/ml2/plugin.py @@ -162,6 +162,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, self._setup_dhcp() self._start_rpc_notifiers() self.add_agent_status_check(self.agent_health_check) + self.add_workers(self.mechanism_manager.get_workers()) self._verify_service_plugins_requirements() LOG.info(_LI("Modular L2 Plugin initialization complete")) @@ -182,7 +183,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, self.network_scheduler = importutils.import_object( cfg.CONF.network_scheduler_driver ) - self.start_periodic_dhcp_agent_status_check() + self.add_periodic_dhcp_agent_status_check() def _verify_service_plugins_requirements(self): for service_plugin in cfg.CONF.service_plugins: @@ -1624,9 +1625,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, return port.id return device - def get_workers(self): - return self.mechanism_manager.get_workers() - def filter_hosts_with_network_access( self, context, network_id, candidate_hosts): segments = db.get_network_segments(context.session, network_id) diff --git a/neutron/server/rpc_eventlet.py b/neutron/server/rpc_eventlet.py index 9b218cdaeb5..36118a034f9 100644 --- a/neutron/server/rpc_eventlet.py +++ b/neutron/server/rpc_eventlet.py @@ -18,8 +18,9 @@ # If ../neutron/__init__.py exists, add ../ to Python search path, so that # it will override what happens to be installed in /usr/(local/)lib/python... -import eventlet +from oslo_config import cfg from oslo_log import log +from oslo_service import service as common_service from neutron._i18n import _LI from neutron import service @@ -28,13 +29,14 @@ LOG = log.getLogger(__name__) def eventlet_rpc_server(): - pool = eventlet.GreenPool() LOG.info(_LI("Eventlet based AMQP RPC server starting...")) + rpc_workers_launcher = common_service.ProcessLauncher( + cfg.CONF, wait_interval=1.0 + ) try: - neutron_rpc = service.serve_rpc() + service.start_rpc_workers(rpc_workers_launcher) except NotImplementedError: LOG.info(_LI("RPC was already started in parent process by " "plugin.")) else: - pool.spawn(neutron_rpc.wait) - pool.waitall() + rpc_workers_launcher.wait() diff --git a/neutron/server/wsgi_eventlet.py b/neutron/server/wsgi_eventlet.py index 472ea6bf743..7381da8f834 100644 --- a/neutron/server/wsgi_eventlet.py +++ b/neutron/server/wsgi_eventlet.py @@ -12,7 +12,10 @@ # under the License. import eventlet + +from oslo_config import cfg from oslo_log import log +from oslo_service import service as common_service from neutron._i18n import _LI from neutron import service @@ -26,24 +29,24 @@ def eventlet_wsgi_server(): def start_api_and_rpc_workers(neutron_api): - pool = eventlet.GreenPool() - - api_thread = pool.spawn(neutron_api.wait) - try: - neutron_rpc = service.serve_rpc() + plugin_workers_launcher = common_service.ProcessLauncher( + cfg.CONF, wait_interval=1.0 + ) + service.start_rpc_workers(plugin_workers_launcher) + + pool = eventlet.GreenPool() + api_thread = pool.spawn(neutron_api.wait) + plugin_workers_thread = pool.spawn(plugin_workers_launcher.wait) + + # api and other workers should die together. When one dies, + # kill the other. + api_thread.link(lambda gt: plugin_workers_thread.kill()) + plugin_workers_thread.link(lambda gt: api_thread.kill()) + + pool.waitall() except NotImplementedError: LOG.info(_LI("RPC was already started in parent process by " "plugin.")) - else: - rpc_thread = pool.spawn(neutron_rpc.wait) - plugin_workers = service.start_plugin_workers() - for worker in plugin_workers: - pool.spawn(worker.wait) - - # api and rpc should die together. When one dies, kill the other. - rpc_thread.link(lambda gt: api_thread.kill()) - api_thread.link(lambda gt: rpc_thread.kill()) - - pool.waitall() + neutron_api.wait() diff --git a/neutron/service.py b/neutron/service.py index 6510f579667..7bd74467738 100644 --- a/neutron/service.py +++ b/neutron/service.py @@ -32,7 +32,7 @@ from neutron.common import rpc as n_rpc from neutron import context from neutron.db import api as session from neutron import manager -from neutron import worker +from neutron import worker as neutron_worker from neutron import wsgi @@ -113,25 +113,15 @@ def serve_wsgi(cls): return service -def start_plugin_workers(): - launchers = [] - # NOTE(twilson) get_service_plugins also returns the core plugin - for plugin in manager.NeutronManager.get_unique_service_plugins(): - # TODO(twilson) Instead of defaulting here, come up with a good way to - # share a common get_workers default between NeutronPluginBaseV2 and - # ServicePluginBase - for plugin_worker in getattr(plugin, 'get_workers', tuple)(): - launcher = common_service.ProcessLauncher(cfg.CONF) - launcher.launch_service(plugin_worker) - launchers.append(launcher) - return launchers - - -class RpcWorker(worker.NeutronWorker): +class RpcWorker(neutron_worker.NeutronWorker): """Wraps a worker to be handled by ProcessLauncher""" start_listeners_method = 'start_rpc_listeners' - def __init__(self, plugins): + def __init__(self, plugins, worker_process_count=1): + super(RpcWorker, self).__init__( + worker_process_count=worker_process_count + ) + self._plugins = plugins self._servers = [] @@ -178,7 +168,7 @@ class RpcReportsWorker(RpcWorker): start_listeners_method = 'start_rpc_state_reports_listener' -def serve_rpc(): +def _get_rpc_workers(): plugin = manager.NeutronManager.get_plugin() service_plugins = ( manager.NeutronManager.get_service_plugins().values()) @@ -198,31 +188,101 @@ def serve_rpc(): cfg.CONF.rpc_workers) raise NotImplementedError() + # passing service plugins only, because core plugin is among them + rpc_workers = [RpcWorker(service_plugins, + worker_process_count=cfg.CONF.rpc_workers)] + + if (cfg.CONF.rpc_state_report_workers > 0 and + plugin.rpc_state_report_workers_supported()): + rpc_workers.append( + RpcReportsWorker( + [plugin], + worker_process_count=cfg.CONF.rpc_state_report_workers + ) + ) + return rpc_workers + + +class AllServicesNeutronWorker(neutron_worker.NeutronWorker): + def __init__(self, services, worker_process_count=1): + super(AllServicesNeutronWorker, self).__init__(worker_process_count) + self._services = services + self._launcher = common_service.Launcher(cfg.CONF) + + def start(self): + for srv in self._services: + self._launcher.launch_service(srv) + super(AllServicesNeutronWorker, self).start() + + def stop(self): + self._launcher.stop() + + def wait(self): + self._launcher.wait() + + def reset(self): + self._launcher.restart() + + +def _start_workers(worker_launcher, workers): + if not workers: + return try: - # passing service plugins only, because core plugin is among them - rpc = RpcWorker(service_plugins) # dispose the whole pool before os.fork, otherwise there will # be shared DB connections in child processes which may cause # DB errors. LOG.debug('using launcher for rpc, workers=%s', cfg.CONF.rpc_workers) session.dispose() - launcher = common_service.ProcessLauncher(cfg.CONF, wait_interval=1.0) - launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers) - if (cfg.CONF.rpc_state_report_workers > 0 and - plugin.rpc_state_report_workers_supported()): - rpc_state_rep = RpcReportsWorker([plugin]) - LOG.debug('using launcher for state reports rpc, workers=%s', - cfg.CONF.rpc_state_report_workers) - launcher.launch_service( - rpc_state_rep, workers=cfg.CONF.rpc_state_report_workers) - return launcher + for worker in workers: + worker_launcher.launch_service(worker, worker.worker_process_count) except Exception: with excutils.save_and_reraise_exception(): LOG.exception(_LE('Unrecoverable error: please check log for ' 'details.')) +def start_plugin_workers(worker_launcher): + # NOTE(twilson) get_service_plugins also returns the core plugin + plugins = manager.NeutronManager.get_unique_service_plugins() + + # TODO(twilson) Instead of defaulting here, come up with a good way to + # share a common get_workers default between NeutronPluginBaseV2 and + # ServicePluginBase + plugin_workers = [ + plugin_worker + for plugin in plugins if hasattr(plugin, 'get_workers') + for plugin_worker in plugin.get_workers() + ] + + # we need to fork all necessary processes before spawning extra threads + # to avoid problems with infinite resource locking and other concurrency + # problems + process_plugin_workers = [ + plugin_worker for plugin_worker in plugin_workers + if plugin_worker.worker_process_count > 0 + ] + + thread_plugin_workers = [ + plugin_worker for plugin_worker in plugin_workers + if plugin_worker.worker_process_count < 1 + ] + + # add extra process worker and spawn there all workers with + # worker_process_count == 0 + if thread_plugin_workers: + process_plugin_workers.append( + AllServicesNeutronWorker(thread_plugin_workers) + ) + + _start_workers(worker_launcher, process_plugin_workers) + + +def start_rpc_workers(worker_launcher): + rpc_workers = _get_rpc_workers() + _start_workers(worker_launcher, rpc_workers) + + def _get_api_workers(): workers = cfg.CONF.api_workers if not workers: diff --git a/neutron/services/l3_router/l3_router_plugin.py b/neutron/services/l3_router/l3_router_plugin.py index c10668cbc11..ef89c626612 100644 --- a/neutron/services/l3_router/l3_router_plugin.py +++ b/neutron/services/l3_router/l3_router_plugin.py @@ -32,6 +32,7 @@ from neutron.db import l3_gwmode_db from neutron.db import l3_hamode_db from neutron.plugins.common import constants from neutron.quota import resource_registry +from neutron import service from neutron.services import service_base @@ -62,20 +63,23 @@ class L3RouterPlugin(service_base.ServicePluginBase, def __init__(self): self.router_scheduler = importutils.import_object( cfg.CONF.router_scheduler_driver) - self.start_periodic_l3_agent_status_check() + self.add_periodic_l3_agent_status_check() super(L3RouterPlugin, self).__init__() if 'dvr' in self.supported_extension_aliases: l3_dvrscheduler_db.subscribe() l3_db.subscribe() - self.start_rpc_listeners() + self.agent_notifiers.update( + {n_const.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotifyAPI()}) + + rpc_worker = service.RpcWorker([self], worker_process_count=0) + + self.add_worker(rpc_worker) @log_helpers.log_method_call def start_rpc_listeners(self): # RPC support self.topic = topics.L3PLUGIN self.conn = n_rpc.create_connection() - self.agent_notifiers.update( - {n_const.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotifyAPI()}) self.endpoints = [l3_rpc.L3RpcCallback()] self.conn.create_consumer(self.topic, self.endpoints, fanout=False) diff --git a/neutron/services/metering/metering_plugin.py b/neutron/services/metering/metering_plugin.py index 11a928642a0..f7cb42411e6 100644 --- a/neutron/services/metering/metering_plugin.py +++ b/neutron/services/metering/metering_plugin.py @@ -17,6 +17,7 @@ from neutron.common import rpc as n_rpc from neutron.common import topics from neutron.db.metering import metering_db from neutron.db.metering import metering_rpc +from neutron import service class MeteringPlugin(metering_db.MeteringDbMixin): @@ -28,7 +29,9 @@ class MeteringPlugin(metering_db.MeteringDbMixin): super(MeteringPlugin, self).__init__() self.meter_rpc = metering_rpc_agent_api.MeteringAgentNotifyAPI() - self.start_rpc_listeners() + rpc_worker = service.RpcWorker([self], worker_process_count=0) + + self.add_worker(rpc_worker) def start_rpc_listeners(self): self.endpoints = [metering_rpc.MeteringRpcCallbacks(self)] diff --git a/neutron/services/service_base.py b/neutron/services/service_base.py index 5fa9bcf388d..33da0286d15 100644 --- a/neutron/services/service_base.py +++ b/neutron/services/service_base.py @@ -24,12 +24,14 @@ from neutron._i18n import _, _LE, _LI from neutron.api import extensions from neutron.db import servicetype_db as sdb from neutron.services import provider_configuration as pconf +from neutron import worker as neutron_worker LOG = logging.getLogger(__name__) @six.add_metaclass(abc.ABCMeta) -class ServicePluginBase(extensions.PluginInterface): +class ServicePluginBase(extensions.PluginInterface, + neutron_worker.WorkerSupportServiceMixin): """Define base interface for any Advanced Service plugin.""" supported_extension_aliases = [] @@ -46,10 +48,6 @@ class ServicePluginBase(extensions.PluginInterface): """Return string description of the plugin.""" pass - def get_workers(self): - """Returns a collection of NeutronWorkers""" - return () - def load_drivers(service_type, plugin): """Loads drivers for specific service. diff --git a/neutron/tests/base.py b/neutron/tests/base.py index 6e3d8b3a9bc..6488cabc3d3 100644 --- a/neutron/tests/base.py +++ b/neutron/tests/base.py @@ -414,7 +414,7 @@ class PluginFixture(fixtures.Fixture): self.patched_default_svc_plugins = self.default_svc_plugins_p.start() self.dhcp_periodic_p = mock.patch( 'neutron.db.agentschedulers_db.DhcpAgentSchedulerDbMixin.' - 'start_periodic_dhcp_agent_status_check') + 'add_periodic_dhcp_agent_status_check') self.patched_dhcp_periodic = self.dhcp_periodic_p.start() self.agent_health_check_p = mock.patch( 'neutron.db.agentschedulers_db.DhcpAgentSchedulerDbMixin.' diff --git a/neutron/tests/functional/test_server.py b/neutron/tests/functional/test_server.py index 891bdf3276d..2c7615a5c05 100644 --- a/neutron/tests/functional/test_server.py +++ b/neutron/tests/functional/test_server.py @@ -22,12 +22,13 @@ import traceback import httplib2 import mock from oslo_config import cfg +from oslo_service import service as common_service import psutil from neutron.agent.linux import utils from neutron import service from neutron.tests import base -from neutron import worker +from neutron import worker as neutron_worker from neutron import wsgi @@ -244,8 +245,9 @@ class TestRPCServer(TestNeutronServer): # not interested in state report workers specifically CONF.set_override("rpc_state_report_workers", 0) - launcher = service.serve_rpc() - launcher.wait() + rpc_workers_launcher = common_service.ProcessLauncher(CONF) + service.start_rpc_workers(rpc_workers_launcher) + rpc_workers_launcher.wait() def test_restart_rpc_on_sighup_multiple_workers(self): self._test_restart_service_on_sighup(service=self._serve_rpc, @@ -264,12 +266,12 @@ class TestPluginWorker(TestNeutronServer): def _start_plugin(self, workers=1): with mock.patch('neutron.manager.NeutronManager.get_plugin') as gp: gp.return_value = self.plugin - launchers = service.start_plugin_workers() - for launcher in launchers: - launcher.wait() + plugin_workers_launcher = common_service.ProcessLauncher(CONF) + service.start_plugin_workers(plugin_workers_launcher) + plugin_workers_launcher.wait() def test_start(self): - class FakeWorker(worker.NeutronWorker): + class FakeWorker(neutron_worker.NeutronWorker): def start(self): pass diff --git a/neutron/tests/unit/db/test_agentschedulers_db.py b/neutron/tests/unit/db/test_agentschedulers_db.py index 692e1b96de1..655073846f9 100644 --- a/neutron/tests/unit/db/test_agentschedulers_db.py +++ b/neutron/tests/unit/db/test_agentschedulers_db.py @@ -256,7 +256,7 @@ class OvsAgentSchedulerTestCaseBase(test_l3.L3NatTestCaseMixin, self.patched_l3_notify = self.l3_notify_p.start() self.l3_periodic_p = mock.patch('neutron.db.l3_agentschedulers_db.' 'L3AgentSchedulerDbMixin.' - 'start_periodic_l3_agent_status_check') + 'add_periodic_l3_agent_status_check') self.patched_l3_periodic = self.l3_periodic_p.start() self.dhcp_notify_p = mock.patch( 'neutron.extensions.dhcpagentscheduler.notify') diff --git a/neutron/worker.py b/neutron/worker.py index 80a16533eba..a602d1c01f0 100644 --- a/neutron/worker.py +++ b/neutron/worker.py @@ -17,6 +17,38 @@ from neutron.callbacks import registry from neutron.callbacks import resources +class WorkerSupportServiceMixin(object): + + @property + def _workers(self): + try: + return self.__workers + except AttributeError: + self.__workers = [] + return self.__workers + + def get_workers(self): + """Returns a collection NeutronWorker instances needed by this service + """ + return list(self._workers) + + def add_worker(self, worker): + """Adds NeutronWorker needed for this service + + If a object needs to define workers thread/processes outside of API/RPC + workers then it will call this method to register worker. Should be + called on initialization stage before running services + """ + self._workers.append(worker) + + def add_workers(self, workers): + """Adds NeutronWorker list needed for this service + + The same as add_worker but adds a list of workers + """ + self._workers.extend(workers) + + class NeutronWorker(service.ServiceBase): """Partial implementation of the ServiceBase ABC @@ -36,5 +68,25 @@ class NeutronWorker(service.ServiceBase): super(MyPluginWorker, self).start() do_sync() """ + + # default class value for case when super().__init__ is not called + _worker_process_count = 1 + + def __init__(self, worker_process_count=_worker_process_count): + """ + Initialize worker + + :param worker_process_count: Defines how many processes to spawn for + worker: + 0 - spawn 1 new worker thread, + 1..N - spawn N new worker processes + """ + self._worker_process_count = worker_process_count + + @property + def worker_process_count(self): + return self._worker_process_count + def start(self): - registry.notify(resources.PROCESS, events.AFTER_CREATE, self.start) + if self.worker_process_count > 0: + registry.notify(resources.PROCESS, events.AFTER_CREATE, self.start) diff --git a/neutron/wsgi.py b/neutron/wsgi.py index 684fb69cb87..f49358579c6 100644 --- a/neutron/wsgi.py +++ b/neutron/wsgi.py @@ -44,7 +44,7 @@ from neutron.common import config from neutron.common import exceptions as n_exc from neutron import context from neutron.db import api -from neutron import worker +from neutron import worker as neutron_worker socket_opts = [ cfg.IntOpt('backlog', @@ -74,9 +74,12 @@ def encode_body(body): return encodeutils.to_utf8(body) -class WorkerService(worker.NeutronWorker): +class WorkerService(neutron_worker.NeutronWorker): """Wraps a worker to be handled by ProcessLauncher""" - def __init__(self, service, application, disable_ssl=False): + def __init__(self, service, application, disable_ssl=False, + worker_process_count=0): + super(WorkerService, self).__init__(worker_process_count) + self._service = service self._application = application self._disable_ssl = disable_ssl @@ -188,7 +191,7 @@ class Server(object): self._launch(application, workers) def _launch(self, application, workers=0): - service = WorkerService(self, application, self.disable_ssl) + service = WorkerService(self, application, self.disable_ssl, workers) if workers < 1: # The API service should run in the current process. self._server = service @@ -206,7 +209,8 @@ class Server(object): # wait interval past the default of 0.01s. self._server = common_service.ProcessLauncher(cfg.CONF, wait_interval=1.0) - self._server.launch_service(service, workers=workers) + self._server.launch_service(service, + workers=service.worker_process_count) @property def host(self):