Adds multiple RPC worker processes to neutron server
blueprint multiple-rpc-workers Co-Authored-By: Terry Wilson<twilson@redhat.com> Change-Id: I51f2a52add6c11af905e6f1e6e45d31731ebbb5d
This commit is contained in:
parent
78366adca8
commit
d925b8fb9c
@ -257,6 +257,11 @@ notification_driver = neutron.openstack.common.notifier.rpc_notifier
|
||||
# child processes as workers. The parent process manages them.
|
||||
# api_workers = 0
|
||||
|
||||
# Number of separate RPC worker processes to spawn. The default, 0, runs the
|
||||
# worker thread in the current process. Greater than 0 launches that number of
|
||||
# child processes as RPC workers. The parent process manages them.
|
||||
# rpc_workers = 0
|
||||
|
||||
# Sets the value of TCP_KEEPIDLE in seconds to use for each server socket when
|
||||
# starting API server. Not supported on OS X.
|
||||
# tcp_keepidle = 600
|
||||
|
@ -324,3 +324,15 @@ class NeutronPluginBaseV2(object):
|
||||
:param id: UUID representing the port to delete.
|
||||
"""
|
||||
pass
|
||||
|
||||
def start_rpc_listener(self):
|
||||
"""Start the rpc listener.
|
||||
|
||||
Most plugins start an RPC listener implicitly on initialization. In
|
||||
order to support multiple process RPC, the plugin needs to expose
|
||||
control over when this is started.
|
||||
|
||||
.. note:: this method is optional, as it was not part of the originally
|
||||
defined plugin API.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
@ -174,7 +174,7 @@ class ConnectionContext(rpc_common.Connection):
|
||||
ack_on_error)
|
||||
|
||||
def consume_in_thread(self):
|
||||
self.connection.consume_in_thread()
|
||||
return self.connection.consume_in_thread()
|
||||
|
||||
def __getattr__(self, key):
|
||||
"""Proxy all other calls to the Connection instance."""
|
||||
|
@ -122,13 +122,15 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
|
||||
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
|
||||
)
|
||||
|
||||
def start_rpc_listener(self):
|
||||
self.callbacks = rpc.RpcCallbacks(self.notifier, self.type_manager)
|
||||
self.topic = topics.PLUGIN
|
||||
self.conn = c_rpc.create_connection(new=True)
|
||||
self.dispatcher = self.callbacks.create_rpc_dispatcher()
|
||||
self.conn.create_consumer(self.topic, self.dispatcher,
|
||||
fanout=False)
|
||||
self.conn.consume_in_thread()
|
||||
return self.conn.consume_in_thread()
|
||||
|
||||
def _process_provider_segment(self, segment):
|
||||
network_type = self._get_attribute(segment, provider.NETWORK_TYPE)
|
||||
|
@ -27,8 +27,11 @@ from neutron.common import config
|
||||
from neutron import service
|
||||
|
||||
from neutron.openstack.common import gettextutils
|
||||
from neutron.openstack.common import log as logging
|
||||
gettextutils.install('neutron', lazy=True)
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def main():
|
||||
eventlet.monkey_patch()
|
||||
@ -40,8 +43,25 @@ def main():
|
||||
" search paths (~/.neutron/, ~/, /etc/neutron/, /etc/) and"
|
||||
" the '--config-file' option!"))
|
||||
try:
|
||||
neutron_service = service.serve_wsgi(service.NeutronApiService)
|
||||
neutron_service.wait()
|
||||
pool = eventlet.GreenPool()
|
||||
|
||||
neutron_api = service.serve_wsgi(service.NeutronApiService)
|
||||
api_thread = pool.spawn(neutron_api.wait)
|
||||
|
||||
try:
|
||||
neutron_rpc = service.serve_rpc()
|
||||
except NotImplementedError:
|
||||
LOG.info(_("RPC was already started in parent process by plugin."))
|
||||
else:
|
||||
rpc_thread = pool.spawn(neutron_rpc.wait)
|
||||
|
||||
# api and rpc should die together. When one dies, kill the other.
|
||||
rpc_thread.link(lambda gt: api_thread.kill())
|
||||
api_thread.link(lambda gt: rpc_thread.kill())
|
||||
|
||||
pool.waitall()
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
except RuntimeError as e:
|
||||
sys.exit(_("ERROR: %s") % e)
|
||||
|
||||
|
@ -13,6 +13,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import eventlet
|
||||
import inspect
|
||||
import logging as std_logging
|
||||
import os
|
||||
@ -23,11 +24,15 @@ from oslo.config import cfg
|
||||
from neutron.common import config
|
||||
from neutron.common import legacy
|
||||
from neutron import context
|
||||
from neutron import manager
|
||||
from neutron import neutron_plugin_base_v2
|
||||
from neutron.openstack.common.db.sqlalchemy import session
|
||||
from neutron.openstack.common import excutils
|
||||
from neutron.openstack.common import importutils
|
||||
from neutron.openstack.common import log as logging
|
||||
from neutron.openstack.common import loopingcall
|
||||
from neutron.openstack.common.rpc import service
|
||||
from neutron.openstack.common.service import ProcessLauncher
|
||||
from neutron import wsgi
|
||||
|
||||
|
||||
@ -38,6 +43,9 @@ service_opts = [
|
||||
cfg.IntOpt('api_workers',
|
||||
default=0,
|
||||
help=_('Number of separate worker processes for service')),
|
||||
cfg.IntOpt('rpc_workers',
|
||||
default=0,
|
||||
help=_('Number of RPC worker processes for service')),
|
||||
cfg.IntOpt('periodic_fuzzy_delay',
|
||||
default=5,
|
||||
help=_('Range of seconds to randomly delay when starting the '
|
||||
@ -108,6 +116,61 @@ def serve_wsgi(cls):
|
||||
return service
|
||||
|
||||
|
||||
class RpcWorker(object):
|
||||
"""Wraps a worker to be handled by ProcessLauncher"""
|
||||
def __init__(self, plugin):
|
||||
self._plugin = plugin
|
||||
self._server = None
|
||||
|
||||
def start(self):
|
||||
# We may have just forked from parent process. A quick disposal of the
|
||||
# existing sql connections avoids producing errors later when they are
|
||||
# discovered to be broken.
|
||||
session.get_engine(sqlite_fk=True).pool.dispose()
|
||||
self._server = self._plugin.start_rpc_listener()
|
||||
|
||||
def wait(self):
|
||||
if isinstance(self._server, eventlet.greenthread.GreenThread):
|
||||
self._server.wait()
|
||||
|
||||
def stop(self):
|
||||
if isinstance(self._server, eventlet.greenthread.GreenThread):
|
||||
self._server.kill()
|
||||
self._server = None
|
||||
|
||||
|
||||
def serve_rpc():
|
||||
plugin = manager.NeutronManager.get_plugin()
|
||||
|
||||
# If 0 < rpc_workers then start_rpc_listener would be called in a
|
||||
# subprocess and we cannot simply catch the NotImplementedError. It is
|
||||
# simpler to check this up front by testing whether the plugin overrides
|
||||
# start_rpc_listener.
|
||||
base = neutron_plugin_base_v2.NeutronPluginBaseV2
|
||||
if plugin.__class__.start_rpc_listener == base.start_rpc_listener:
|
||||
LOG.debug(_("Active plugin doesn't implement start_rpc_listener"))
|
||||
if 0 < cfg.CONF.rpc_workers:
|
||||
msg = _("'rpc_workers = %d' ignored because start_rpc_listener "
|
||||
"is not implemented.")
|
||||
LOG.error(msg, cfg.CONF.rpc_workers)
|
||||
raise NotImplementedError
|
||||
|
||||
try:
|
||||
rpc = RpcWorker(plugin)
|
||||
|
||||
if cfg.CONF.rpc_workers < 1:
|
||||
rpc.start()
|
||||
return rpc
|
||||
else:
|
||||
launcher = ProcessLauncher(wait_interval=1.0)
|
||||
launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers)
|
||||
return launcher
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.exception(_('Unrecoverable error: please check log '
|
||||
'for details.'))
|
||||
|
||||
|
||||
def _run_wsgi(app_name):
|
||||
app = config.load_paste_app(app_name)
|
||||
if not app:
|
||||
|
@ -40,6 +40,7 @@ class PortBindingTestCase(test_plugin.NeutronDbPluginV2TestCase):
|
||||
super(PortBindingTestCase, self).setUp(PLUGIN_NAME)
|
||||
self.port_create_status = 'DOWN'
|
||||
self.plugin = manager.NeutronManager.get_plugin()
|
||||
self.plugin.start_rpc_listener()
|
||||
|
||||
def _check_response(self, port, vif_type, has_port_filter, bound):
|
||||
self.assertEqual(port[portbindings.VIF_TYPE], vif_type)
|
||||
|
@ -51,6 +51,11 @@ class Ml2SecurityGroupsTestCase(test_sg.SecurityGroupDBTestCase):
|
||||
class TestMl2SecurityGroups(Ml2SecurityGroupsTestCase,
|
||||
test_sg.TestSecurityGroups,
|
||||
test_sg_rpc.SGNotificationTestMixin):
|
||||
def setUp(self):
|
||||
super(TestMl2SecurityGroups, self).setUp()
|
||||
plugin = manager.NeutronManager.get_plugin()
|
||||
plugin.start_rpc_listener()
|
||||
|
||||
def test_security_group_get_port_from_device(self):
|
||||
with self.network() as n:
|
||||
with self.subnet(n):
|
||||
|
Loading…
Reference in New Issue
Block a user