[FWaaS_v2]: RPC listener should be served by rpc worker
When neutron-api is served by a web server which cannot have a thread to listen to the RPC [1]. Therefore, neutron-rpc-server will do that job. So RPC listener should be served by RPC worker instead of plugin worker. [1] http://lists.openstack.org/pipermail/openstack-dev/2018-June/131722.html Partially-implements: blueprint run-in-wsgi-server Change-Id: I2b251cc0bf799f43d3c601e4e4f9f103e2670270
This commit is contained in:
parent
d625886898
commit
f6e0a06f69
|
@ -13,6 +13,7 @@
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
from neutron.db import servicetype_db as st_db
|
from neutron.db import servicetype_db as st_db
|
||||||
|
from neutron import service
|
||||||
from neutron.services import provider_configuration as provider_conf
|
from neutron.services import provider_configuration as provider_conf
|
||||||
from neutron.services import service_base
|
from neutron.services import service_base
|
||||||
from neutron_lib.api.definitions import firewall_v2
|
from neutron_lib.api.definitions import firewall_v2
|
||||||
|
@ -30,6 +31,7 @@ from oslo_log import log as logging
|
||||||
from neutron_fwaas.common import exceptions
|
from neutron_fwaas.common import exceptions
|
||||||
from neutron_fwaas.common import fwaas_constants
|
from neutron_fwaas.common import fwaas_constants
|
||||||
from neutron_fwaas.extensions.firewall_v2 import Firewallv2PluginBase
|
from neutron_fwaas.extensions.firewall_v2 import Firewallv2PluginBase
|
||||||
|
from neutron_fwaas.services.firewall.service_drivers import driver_api
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -63,6 +65,14 @@ class FirewallPluginV2(Firewallv2PluginBase):
|
||||||
|
|
||||||
self.driver = drivers[default_provider]
|
self.driver = drivers[default_provider]
|
||||||
|
|
||||||
|
# start rpc listener if driver required
|
||||||
|
if isinstance(self.driver, driver_api.FirewallDriverRPCMixin):
|
||||||
|
rpc_worker = service.RpcWorker([self], worker_process_count=0)
|
||||||
|
self.add_worker(rpc_worker)
|
||||||
|
|
||||||
|
def start_rpc_listeners(self):
|
||||||
|
return self.driver.start_rpc_listener()
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def _core_plugin(self):
|
def _core_plugin(self):
|
||||||
return directory.get_plugin()
|
return directory.get_plugin()
|
||||||
|
|
|
@ -154,7 +154,8 @@ class FirewallAgentApi(object):
|
||||||
firewall_group=firewall_group, host=self.host)
|
firewall_group=firewall_group, host=self.host)
|
||||||
|
|
||||||
|
|
||||||
class FirewallAgentDriver(driver_api.FirewallDriverDB):
|
class FirewallAgentDriver(driver_api.FirewallDriverDB,
|
||||||
|
driver_api.FirewallDriverRPCMixin):
|
||||||
"""Firewall driver to implement agent messages and callback methods
|
"""Firewall driver to implement agent messages and callback methods
|
||||||
|
|
||||||
Implement RPC Firewall v2 API and callback methods for agents based on
|
Implement RPC Firewall v2 API and callback methods for agents based on
|
||||||
|
@ -164,15 +165,13 @@ class FirewallAgentDriver(driver_api.FirewallDriverDB):
|
||||||
def __init__(self, service_plugin):
|
def __init__(self, service_plugin):
|
||||||
super(FirewallAgentDriver, self).__init__(service_plugin)
|
super(FirewallAgentDriver, self).__init__(service_plugin)
|
||||||
self.agent_rpc = FirewallAgentApi(constants.FW_AGENT, cfg.CONF.host)
|
self.agent_rpc = FirewallAgentApi(constants.FW_AGENT, cfg.CONF.host)
|
||||||
self.start_rpc_listeners()
|
|
||||||
|
|
||||||
def start_rpc_listeners(self):
|
def start_rpc_listener(self):
|
||||||
self.endpoints = [FirewallAgentCallbacks(self.firewall_db)]
|
self.endpoints = [FirewallAgentCallbacks(self.firewall_db)]
|
||||||
|
|
||||||
self.rpc_connection = neutron_rpc.Connection()
|
self.rpc_connection = neutron_rpc.Connection()
|
||||||
self.rpc_connection.create_consumer(constants.FIREWALL_PLUGIN,
|
self.rpc_connection.create_consumer(constants.FIREWALL_PLUGIN,
|
||||||
self.endpoints, fanout=False)
|
self.endpoints, fanout=False)
|
||||||
self.rpc_connection.consume_in_threads()
|
return self.rpc_connection.consume_in_threads()
|
||||||
|
|
||||||
def _rpc_update_firewall_group(self, context, fwg_id):
|
def _rpc_update_firewall_group(self, context, fwg_id):
|
||||||
status_update = {"status": nl_constants.PENDING_UPDATE}
|
status_update = {"status": nl_constants.PENDING_UPDATE}
|
||||||
|
|
|
@ -433,3 +433,16 @@ class FirewallDriverDB(FirewallDriverDBMixin):
|
||||||
|
|
||||||
def remove_rule_postcommit(self, context, policy_id, rule_info):
|
def remove_rule_postcommit(self, context, policy_id, rule_info):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@six.add_metaclass(abc.ABCMeta)
|
||||||
|
class FirewallDriverRPCMixin(object):
|
||||||
|
"""FirewallAgent interface for driver with rpc callback listener.
|
||||||
|
|
||||||
|
Each firewall backend driver that needs a rpc callback listener should
|
||||||
|
inherit from this driver.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def start_rpc_listener(self):
|
||||||
|
pass
|
||||||
|
|
|
@ -107,8 +107,8 @@ class TestAgentDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase,
|
||||||
extra_service_plugins=l3_plugin,
|
extra_service_plugins=l3_plugin,
|
||||||
extra_extension_paths=neutron_extensions.__path__)
|
extra_extension_paths=neutron_extensions.__path__)
|
||||||
|
|
||||||
self.callbacks = self.plugin.driver.endpoints[0]
|
|
||||||
self.db = self.plugin.driver.firewall_db
|
self.db = self.plugin.driver.firewall_db
|
||||||
|
self.callbacks = agents.FirewallAgentCallbacks(self.db)
|
||||||
|
|
||||||
router_distributed_opts = [
|
router_distributed_opts = [
|
||||||
cfg.BoolOpt(
|
cfg.BoolOpt(
|
||||||
|
|
Loading…
Reference in New Issue