From d925b8fb9cc312d1d9a19f651f268dc48cb26d26 Mon Sep 17 00:00:00 2001 From: Carl Baldwin Date: Tue, 11 Feb 2014 05:55:29 +0000 Subject: [PATCH] Adds multiple RPC worker processes to neutron server blueprint multiple-rpc-workers Co-Authored-By: Terry Wilson Change-Id: I51f2a52add6c11af905e6f1e6e45d31731ebbb5d --- etc/neutron.conf | 5 ++ neutron/neutron_plugin_base_v2.py | 12 ++++ neutron/openstack/common/rpc/amqp.py | 2 +- neutron/plugins/ml2/plugin.py | 4 +- neutron/server/__init__.py | 24 ++++++- neutron/service.py | 63 +++++++++++++++++++ neutron/tests/unit/ml2/test_port_binding.py | 1 + neutron/tests/unit/ml2/test_security_group.py | 5 ++ 8 files changed, 112 insertions(+), 4 deletions(-) diff --git a/etc/neutron.conf b/etc/neutron.conf index e6f699ae404..8aa5cb2879a 100644 --- a/etc/neutron.conf +++ b/etc/neutron.conf @@ -257,6 +257,11 @@ notification_driver = neutron.openstack.common.notifier.rpc_notifier # child processes as workers. The parent process manages them. # api_workers = 0 +# Number of separate RPC worker processes to spawn. The default, 0, runs the +# worker thread in the current process. Greater than 0 launches that number of +# child processes as RPC workers. The parent process manages them. +# rpc_workers = 0 + # Sets the value of TCP_KEEPIDLE in seconds to use for each server socket when # starting API server. Not supported on OS X. # tcp_keepidle = 600 diff --git a/neutron/neutron_plugin_base_v2.py b/neutron/neutron_plugin_base_v2.py index 603f23e6dfd..046ed1ae846 100644 --- a/neutron/neutron_plugin_base_v2.py +++ b/neutron/neutron_plugin_base_v2.py @@ -324,3 +324,15 @@ class NeutronPluginBaseV2(object): :param id: UUID representing the port to delete. """ pass + + def start_rpc_listener(self): + """Start the rpc listener. + + Most plugins start an RPC listener implicitly on initialization. In + order to support multiple process RPC, the plugin needs to expose + control over when this is started. + + .. note:: this method is optional, as it was not part of the originally + defined plugin API. + """ + raise NotImplementedError diff --git a/neutron/openstack/common/rpc/amqp.py b/neutron/openstack/common/rpc/amqp.py index b74fad05985..a0fa8cef22a 100644 --- a/neutron/openstack/common/rpc/amqp.py +++ b/neutron/openstack/common/rpc/amqp.py @@ -174,7 +174,7 @@ class ConnectionContext(rpc_common.Connection): ack_on_error) def consume_in_thread(self): - self.connection.consume_in_thread() + return self.connection.consume_in_thread() def __getattr__(self, key): """Proxy all other calls to the Connection instance.""" diff --git a/neutron/plugins/ml2/plugin.py b/neutron/plugins/ml2/plugin.py index 0e07cff9a79..77032a6c1af 100644 --- a/neutron/plugins/ml2/plugin.py +++ b/neutron/plugins/ml2/plugin.py @@ -122,13 +122,15 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, self.agent_notifiers[const.AGENT_TYPE_DHCP] = ( dhcp_rpc_agent_api.DhcpAgentNotifyAPI() ) + + def start_rpc_listener(self): self.callbacks = rpc.RpcCallbacks(self.notifier, self.type_manager) self.topic = topics.PLUGIN self.conn = c_rpc.create_connection(new=True) self.dispatcher = self.callbacks.create_rpc_dispatcher() self.conn.create_consumer(self.topic, self.dispatcher, fanout=False) - self.conn.consume_in_thread() + return self.conn.consume_in_thread() def _process_provider_segment(self, segment): network_type = self._get_attribute(segment, provider.NETWORK_TYPE) diff --git a/neutron/server/__init__.py b/neutron/server/__init__.py index 8c3ebc66c60..d5177cb237b 100755 --- a/neutron/server/__init__.py +++ b/neutron/server/__init__.py @@ -27,8 +27,11 @@ from neutron.common import config from neutron import service from neutron.openstack.common import gettextutils +from neutron.openstack.common import log as logging gettextutils.install('neutron', lazy=True) +LOG = logging.getLogger(__name__) + def main(): eventlet.monkey_patch() @@ -40,8 +43,25 @@ def main(): " search paths (~/.neutron/, ~/, /etc/neutron/, /etc/) and" " the '--config-file' option!")) try: - neutron_service = service.serve_wsgi(service.NeutronApiService) - neutron_service.wait() + pool = eventlet.GreenPool() + + neutron_api = service.serve_wsgi(service.NeutronApiService) + api_thread = pool.spawn(neutron_api.wait) + + try: + neutron_rpc = service.serve_rpc() + except NotImplementedError: + LOG.info(_("RPC was already started in parent process by plugin.")) + else: + rpc_thread = pool.spawn(neutron_rpc.wait) + + # api and rpc should die together. When one dies, kill the other. + rpc_thread.link(lambda gt: api_thread.kill()) + api_thread.link(lambda gt: rpc_thread.kill()) + + pool.waitall() + except KeyboardInterrupt: + pass except RuntimeError as e: sys.exit(_("ERROR: %s") % e) diff --git a/neutron/service.py b/neutron/service.py index 2444baca89c..f0351565686 100644 --- a/neutron/service.py +++ b/neutron/service.py @@ -13,6 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. +import eventlet import inspect import logging as std_logging import os @@ -23,11 +24,15 @@ from oslo.config import cfg from neutron.common import config from neutron.common import legacy from neutron import context +from neutron import manager +from neutron import neutron_plugin_base_v2 +from neutron.openstack.common.db.sqlalchemy import session from neutron.openstack.common import excutils from neutron.openstack.common import importutils from neutron.openstack.common import log as logging from neutron.openstack.common import loopingcall from neutron.openstack.common.rpc import service +from neutron.openstack.common.service import ProcessLauncher from neutron import wsgi @@ -38,6 +43,9 @@ service_opts = [ cfg.IntOpt('api_workers', default=0, help=_('Number of separate worker processes for service')), + cfg.IntOpt('rpc_workers', + default=0, + help=_('Number of RPC worker processes for service')), cfg.IntOpt('periodic_fuzzy_delay', default=5, help=_('Range of seconds to randomly delay when starting the ' @@ -108,6 +116,61 @@ def serve_wsgi(cls): return service +class RpcWorker(object): + """Wraps a worker to be handled by ProcessLauncher""" + def __init__(self, plugin): + self._plugin = plugin + self._server = None + + def start(self): + # We may have just forked from parent process. A quick disposal of the + # existing sql connections avoids producing errors later when they are + # discovered to be broken. + session.get_engine(sqlite_fk=True).pool.dispose() + self._server = self._plugin.start_rpc_listener() + + def wait(self): + if isinstance(self._server, eventlet.greenthread.GreenThread): + self._server.wait() + + def stop(self): + if isinstance(self._server, eventlet.greenthread.GreenThread): + self._server.kill() + self._server = None + + +def serve_rpc(): + plugin = manager.NeutronManager.get_plugin() + + # If 0 < rpc_workers then start_rpc_listener would be called in a + # subprocess and we cannot simply catch the NotImplementedError. It is + # simpler to check this up front by testing whether the plugin overrides + # start_rpc_listener. + base = neutron_plugin_base_v2.NeutronPluginBaseV2 + if plugin.__class__.start_rpc_listener == base.start_rpc_listener: + LOG.debug(_("Active plugin doesn't implement start_rpc_listener")) + if 0 < cfg.CONF.rpc_workers: + msg = _("'rpc_workers = %d' ignored because start_rpc_listener " + "is not implemented.") + LOG.error(msg, cfg.CONF.rpc_workers) + raise NotImplementedError + + try: + rpc = RpcWorker(plugin) + + if cfg.CONF.rpc_workers < 1: + rpc.start() + return rpc + else: + launcher = ProcessLauncher(wait_interval=1.0) + launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers) + return launcher + except Exception: + with excutils.save_and_reraise_exception(): + LOG.exception(_('Unrecoverable error: please check log ' + 'for details.')) + + def _run_wsgi(app_name): app = config.load_paste_app(app_name) if not app: diff --git a/neutron/tests/unit/ml2/test_port_binding.py b/neutron/tests/unit/ml2/test_port_binding.py index 86e066892df..d2c0cb1b7ca 100644 --- a/neutron/tests/unit/ml2/test_port_binding.py +++ b/neutron/tests/unit/ml2/test_port_binding.py @@ -40,6 +40,7 @@ class PortBindingTestCase(test_plugin.NeutronDbPluginV2TestCase): super(PortBindingTestCase, self).setUp(PLUGIN_NAME) self.port_create_status = 'DOWN' self.plugin = manager.NeutronManager.get_plugin() + self.plugin.start_rpc_listener() def _check_response(self, port, vif_type, has_port_filter, bound): self.assertEqual(port[portbindings.VIF_TYPE], vif_type) diff --git a/neutron/tests/unit/ml2/test_security_group.py b/neutron/tests/unit/ml2/test_security_group.py index ff0bc39f5e9..34d71a71efe 100644 --- a/neutron/tests/unit/ml2/test_security_group.py +++ b/neutron/tests/unit/ml2/test_security_group.py @@ -51,6 +51,11 @@ class Ml2SecurityGroupsTestCase(test_sg.SecurityGroupDBTestCase): class TestMl2SecurityGroups(Ml2SecurityGroupsTestCase, test_sg.TestSecurityGroups, test_sg_rpc.SGNotificationTestMixin): + def setUp(self): + super(TestMl2SecurityGroups, self).setUp() + plugin = manager.NeutronManager.get_plugin() + plugin.start_rpc_listener() + def test_security_group_get_port_from_device(self): with self.network() as n: with self.subnet(n):