Removed create_rpc_dispatcher methods
Now that we don't have a special dispatcher class and we pass a list of endpoints to corresponding functions instead, those methods are unneeded. blueprint oslo-messaging Change-Id: If2b187fd8e553594212264f34b51b5b99c4630b2
This commit is contained in:
		@@ -108,15 +108,15 @@ class Service(service.Service):
 | 
				
			|||||||
        LOG.debug("Creating Consumer connection for Service %s" %
 | 
					        LOG.debug("Creating Consumer connection for Service %s" %
 | 
				
			||||||
                  self.topic)
 | 
					                  self.topic)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        dispatcher = [self.manager]
 | 
					        endpoints = [self.manager]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # Share this same connection for these Consumers
 | 
					        # Share this same connection for these Consumers
 | 
				
			||||||
        self.conn.create_consumer(self.topic, dispatcher, fanout=False)
 | 
					        self.conn.create_consumer(self.topic, endpoints, fanout=False)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        node_topic = '%s.%s' % (self.topic, self.host)
 | 
					        node_topic = '%s.%s' % (self.topic, self.host)
 | 
				
			||||||
        self.conn.create_consumer(node_topic, dispatcher, fanout=False)
 | 
					        self.conn.create_consumer(node_topic, endpoints, fanout=False)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        self.conn.create_consumer(self.topic, dispatcher, fanout=True)
 | 
					        self.conn.create_consumer(self.topic, endpoints, fanout=True)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # Hook to allow the manager to do other initializations after
 | 
					        # Hook to allow the manager to do other initializations after
 | 
				
			||||||
        # the rpc connection is created.
 | 
					        # the rpc connection is created.
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -30,9 +30,6 @@ class MeteringRpcCallbacks(object):
 | 
				
			|||||||
    def __init__(self, meter_plugin):
 | 
					    def __init__(self, meter_plugin):
 | 
				
			||||||
        self.meter_plugin = meter_plugin
 | 
					        self.meter_plugin = meter_plugin
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def create_rpc_dispatcher(self):
 | 
					 | 
				
			||||||
        return [self]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def get_sync_data_metering(self, context, **kwargs):
 | 
					    def get_sync_data_metering(self, context, **kwargs):
 | 
				
			||||||
        l3_plugin = manager.NeutronManager.get_service_plugins().get(
 | 
					        l3_plugin = manager.NeutronManager.get_service_plugins().get(
 | 
				
			||||||
            service_constants.L3_ROUTER_NAT)
 | 
					            service_constants.L3_ROUTER_NAT)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -105,10 +105,10 @@ class RestProxyAgent(rpc_compat.RpcCallback,
 | 
				
			|||||||
        self.topic = topics.AGENT
 | 
					        self.topic = topics.AGENT
 | 
				
			||||||
        self.plugin_rpc = PluginApi(topics.PLUGIN)
 | 
					        self.plugin_rpc = PluginApi(topics.PLUGIN)
 | 
				
			||||||
        self.context = q_context.get_admin_context_without_session()
 | 
					        self.context = q_context.get_admin_context_without_session()
 | 
				
			||||||
        self.dispatcher = [self]
 | 
					        self.endpoints = [self]
 | 
				
			||||||
        consumers = [[topics.PORT, topics.UPDATE],
 | 
					        consumers = [[topics.PORT, topics.UPDATE],
 | 
				
			||||||
                     [topics.SECURITY_GROUP, topics.UPDATE]]
 | 
					                     [topics.SECURITY_GROUP, topics.UPDATE]]
 | 
				
			||||||
        self.connection = agent_rpc.create_consumers(self.dispatcher,
 | 
					        self.connection = agent_rpc.create_consumers(self.endpoints,
 | 
				
			||||||
                                                     self.topic,
 | 
					                                                     self.topic,
 | 
				
			||||||
                                                     consumers)
 | 
					                                                     consumers)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -119,9 +119,6 @@ class RestProxyCallbacks(rpc_compat.RpcCallback,
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    RPC_API_VERSION = '1.1'
 | 
					    RPC_API_VERSION = '1.1'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def create_rpc_dispatcher(self):
 | 
					 | 
				
			||||||
        return [self, agents_db.AgentExtRpcCallback()]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def get_port_from_device(self, device):
 | 
					    def get_port_from_device(self, device):
 | 
				
			||||||
        port_id = re.sub(r"^tap", "", device)
 | 
					        port_id = re.sub(r"^tap", "", device)
 | 
				
			||||||
        port = self.get_port_and_sgs(port_id)
 | 
					        port = self.get_port_and_sgs(port_id)
 | 
				
			||||||
@@ -505,9 +502,9 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
 | 
				
			|||||||
        self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
 | 
					        self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
 | 
				
			||||||
            self._dhcp_agent_notifier
 | 
					            self._dhcp_agent_notifier
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
        self.callbacks = RestProxyCallbacks()
 | 
					        self.endpoints = [RestProxyCallbacks(),
 | 
				
			||||||
        self.dispatcher = self.callbacks.create_rpc_dispatcher()
 | 
					                          agents_db.AgentExtRpcCallback()]
 | 
				
			||||||
        self.conn.create_consumer(self.topic, self.dispatcher,
 | 
					        self.conn.create_consumer(self.topic, self.endpoints,
 | 
				
			||||||
                                  fanout=False)
 | 
					                                  fanout=False)
 | 
				
			||||||
        # Consume from all consumers in threads
 | 
					        # Consume from all consumers in threads
 | 
				
			||||||
        self.conn.consume_in_threads()
 | 
					        self.conn.consume_in_threads()
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -91,14 +91,6 @@ class BridgeRpcCallbacks(rpc_compat.RpcCallback,
 | 
				
			|||||||
    #   1.1 Support Security Group RPC
 | 
					    #   1.1 Support Security Group RPC
 | 
				
			||||||
    TAP_PREFIX_LEN = 3
 | 
					    TAP_PREFIX_LEN = 3
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def create_rpc_dispatcher(self):
 | 
					 | 
				
			||||||
        """Get the rpc dispatcher for this manager.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        If a manager would like to set an rpc API version, or support more than
 | 
					 | 
				
			||||||
        one class as the target of rpc messages, override this method.
 | 
					 | 
				
			||||||
        """
 | 
					 | 
				
			||||||
        return [self, agents_db.AgentExtRpcCallback()]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    @classmethod
 | 
					    @classmethod
 | 
				
			||||||
    def get_port_from_device(cls, device):
 | 
					    def get_port_from_device(cls, device):
 | 
				
			||||||
        """Get port from the brocade specific db."""
 | 
					        """Get port from the brocade specific db."""
 | 
				
			||||||
@@ -262,10 +254,10 @@ class BrocadePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
 | 
				
			|||||||
        self.rpc_context = context.RequestContext('neutron', 'neutron',
 | 
					        self.rpc_context = context.RequestContext('neutron', 'neutron',
 | 
				
			||||||
                                                  is_admin=False)
 | 
					                                                  is_admin=False)
 | 
				
			||||||
        self.conn = rpc_compat.create_connection(new=True)
 | 
					        self.conn = rpc_compat.create_connection(new=True)
 | 
				
			||||||
        self.callbacks = BridgeRpcCallbacks()
 | 
					        self.endpoints = [BridgeRpcCallbacks(),
 | 
				
			||||||
        self.dispatcher = self.callbacks.create_rpc_dispatcher()
 | 
					                          agents_db.AgentExtRpcCallback()]
 | 
				
			||||||
        for svc_topic in self.service_topics.values():
 | 
					        for svc_topic in self.service_topics.values():
 | 
				
			||||||
            self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
 | 
					            self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
 | 
				
			||||||
        # Consume from all consumers in threads
 | 
					        # Consume from all consumers in threads
 | 
				
			||||||
        self.conn.consume_in_threads()
 | 
					        self.conn.consume_in_threads()
 | 
				
			||||||
        self.notifier = AgentNotifierApi(topics.AGENT)
 | 
					        self.notifier = AgentNotifierApi(topics.AGENT)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -68,14 +68,6 @@ class N1kvRpcCallbacks(rpc_compat.RpcCallback,
 | 
				
			|||||||
    # Set RPC API version to 1.1 by default.
 | 
					    # Set RPC API version to 1.1 by default.
 | 
				
			||||||
    RPC_API_VERSION = '1.1'
 | 
					    RPC_API_VERSION = '1.1'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def create_rpc_dispatcher(self):
 | 
					 | 
				
			||||||
        """Get the rpc dispatcher for this rpc manager.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        If a manager would like to set an rpc API version, or support more than
 | 
					 | 
				
			||||||
        one class as the target of rpc messages, override this method.
 | 
					 | 
				
			||||||
        """
 | 
					 | 
				
			||||||
        return [self, agents_db.AgentExtRpcCallback()]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
class N1kvNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
 | 
					class N1kvNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
 | 
				
			||||||
                          external_net_db.External_net_db_mixin,
 | 
					                          external_net_db.External_net_db_mixin,
 | 
				
			||||||
@@ -135,9 +127,9 @@ class N1kvNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
 | 
				
			|||||||
        self.service_topics = {svc_constants.CORE: topics.PLUGIN,
 | 
					        self.service_topics = {svc_constants.CORE: topics.PLUGIN,
 | 
				
			||||||
                               svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
 | 
					                               svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
 | 
				
			||||||
        self.conn = rpc_compat.create_connection(new=True)
 | 
					        self.conn = rpc_compat.create_connection(new=True)
 | 
				
			||||||
        self.dispatcher = N1kvRpcCallbacks().create_rpc_dispatcher()
 | 
					        self.endpoints = [N1kvRpcCallbacks(), agents_db.AgentExtRpcCallback()]
 | 
				
			||||||
        for svc_topic in self.service_topics.values():
 | 
					        for svc_topic in self.service_topics.values():
 | 
				
			||||||
            self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
 | 
					            self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
 | 
				
			||||||
        self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
 | 
					        self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
 | 
				
			||||||
        self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotifyAPI()
 | 
					        self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotifyAPI()
 | 
				
			||||||
        # Consume from all consumers in threads
 | 
					        # Consume from all consumers in threads
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -97,16 +97,13 @@ class HyperVSecurityAgent(rpc_compat.RpcCallback,
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    def _setup_rpc(self):
 | 
					    def _setup_rpc(self):
 | 
				
			||||||
        self.topic = topics.AGENT
 | 
					        self.topic = topics.AGENT
 | 
				
			||||||
        self.dispatcher = self._create_rpc_dispatcher()
 | 
					        self.endpoints = [HyperVSecurityCallbackMixin(self)]
 | 
				
			||||||
        consumers = [[topics.SECURITY_GROUP, topics.UPDATE]]
 | 
					        consumers = [[topics.SECURITY_GROUP, topics.UPDATE]]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        self.connection = agent_rpc.create_consumers(self.dispatcher,
 | 
					        self.connection = agent_rpc.create_consumers(self.endpoints,
 | 
				
			||||||
                                                     self.topic,
 | 
					                                                     self.topic,
 | 
				
			||||||
                                                     consumers)
 | 
					                                                     consumers)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def _create_rpc_dispatcher(self):
 | 
					 | 
				
			||||||
        return [HyperVSecurityCallbackMixin(self)]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
class HyperVSecurityCallbackMixin(rpc_compat.RpcCallback,
 | 
					class HyperVSecurityCallbackMixin(rpc_compat.RpcCallback,
 | 
				
			||||||
                                  sg_rpc.SecurityGroupAgentRpcCallbackMixin):
 | 
					                                  sg_rpc.SecurityGroupAgentRpcCallbackMixin):
 | 
				
			||||||
@@ -165,13 +162,13 @@ class HyperVNeutronAgent(rpc_compat.RpcCallback):
 | 
				
			|||||||
        # RPC network init
 | 
					        # RPC network init
 | 
				
			||||||
        self.context = context.get_admin_context_without_session()
 | 
					        self.context = context.get_admin_context_without_session()
 | 
				
			||||||
        # Handle updates from service
 | 
					        # Handle updates from service
 | 
				
			||||||
        self.dispatcher = self._create_rpc_dispatcher()
 | 
					        self.endpoints = [self]
 | 
				
			||||||
        # Define the listening consumers for the agent
 | 
					        # Define the listening consumers for the agent
 | 
				
			||||||
        consumers = [[topics.PORT, topics.UPDATE],
 | 
					        consumers = [[topics.PORT, topics.UPDATE],
 | 
				
			||||||
                     [topics.NETWORK, topics.DELETE],
 | 
					                     [topics.NETWORK, topics.DELETE],
 | 
				
			||||||
                     [topics.PORT, topics.DELETE],
 | 
					                     [topics.PORT, topics.DELETE],
 | 
				
			||||||
                     [constants.TUNNEL, topics.UPDATE]]
 | 
					                     [constants.TUNNEL, topics.UPDATE]]
 | 
				
			||||||
        self.connection = agent_rpc.create_consumers(self.dispatcher,
 | 
					        self.connection = agent_rpc.create_consumers(self.endpoints,
 | 
				
			||||||
                                                     self.topic,
 | 
					                                                     self.topic,
 | 
				
			||||||
                                                     consumers)
 | 
					                                                     consumers)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -233,9 +230,6 @@ class HyperVNeutronAgent(rpc_compat.RpcCallback):
 | 
				
			|||||||
            network_type, physical_network,
 | 
					            network_type, physical_network,
 | 
				
			||||||
            segmentation_id, port['admin_state_up'])
 | 
					            segmentation_id, port['admin_state_up'])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def _create_rpc_dispatcher(self):
 | 
					 | 
				
			||||||
        return [self]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def _get_vswitch_name(self, network_type, physical_network):
 | 
					    def _get_vswitch_name(self, network_type, physical_network):
 | 
				
			||||||
        if network_type != p_const.TYPE_LOCAL:
 | 
					        if network_type != p_const.TYPE_LOCAL:
 | 
				
			||||||
            vswitch_name = self._get_vswitch_for_physical_network(
 | 
					            vswitch_name = self._get_vswitch_for_physical_network(
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -190,10 +190,10 @@ class HyperVNeutronPlugin(agents_db.AgentDbMixin,
 | 
				
			|||||||
        self.conn = rpc_compat.create_connection(new=True)
 | 
					        self.conn = rpc_compat.create_connection(new=True)
 | 
				
			||||||
        self.notifier = agent_notifier_api.AgentNotifierApi(
 | 
					        self.notifier = agent_notifier_api.AgentNotifierApi(
 | 
				
			||||||
            topics.AGENT)
 | 
					            topics.AGENT)
 | 
				
			||||||
        self.callbacks = rpc_callbacks.HyperVRpcCallbacks(self.notifier)
 | 
					        self.endpoints = [rpc_callbacks.HyperVRpcCallbacks(self.notifier),
 | 
				
			||||||
        self.dispatcher = self.callbacks.create_rpc_dispatcher()
 | 
					                          agents_db.AgentExtRpcCallback()]
 | 
				
			||||||
        for svc_topic in self.service_topics.values():
 | 
					        for svc_topic in self.service_topics.values():
 | 
				
			||||||
            self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
 | 
					            self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
 | 
				
			||||||
        # Consume from all consumers in threads
 | 
					        # Consume from all consumers in threads
 | 
				
			||||||
        self.conn.consume_in_threads()
 | 
					        self.conn.consume_in_threads()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -18,7 +18,6 @@
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
from neutron.common import constants as q_const
 | 
					from neutron.common import constants as q_const
 | 
				
			||||||
from neutron.common import rpc_compat
 | 
					from neutron.common import rpc_compat
 | 
				
			||||||
from neutron.db import agents_db
 | 
					 | 
				
			||||||
from neutron.db import dhcp_rpc_base
 | 
					from neutron.db import dhcp_rpc_base
 | 
				
			||||||
from neutron.db import l3_rpc_base
 | 
					from neutron.db import l3_rpc_base
 | 
				
			||||||
from neutron.openstack.common import log as logging
 | 
					from neutron.openstack.common import log as logging
 | 
				
			||||||
@@ -41,14 +40,6 @@ class HyperVRpcCallbacks(
 | 
				
			|||||||
        self.notifier = notifier
 | 
					        self.notifier = notifier
 | 
				
			||||||
        self._db = hyperv_db.HyperVPluginDB()
 | 
					        self._db = hyperv_db.HyperVPluginDB()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def create_rpc_dispatcher(self):
 | 
					 | 
				
			||||||
        '''Get the rpc dispatcher for this manager.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        If a manager would like to set an rpc API version, or support more than
 | 
					 | 
				
			||||||
        one class as the target of rpc messages, override this method.
 | 
					 | 
				
			||||||
        '''
 | 
					 | 
				
			||||||
        return [self, agents_db.AgentExtRpcCallback()]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def get_device_details(self, rpc_context, **kwargs):
 | 
					    def get_device_details(self, rpc_context, **kwargs):
 | 
				
			||||||
        """Agent requests device details."""
 | 
					        """Agent requests device details."""
 | 
				
			||||||
        agent_id = kwargs.get('agent_id')
 | 
					        agent_id = kwargs.get('agent_id')
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -123,10 +123,10 @@ class SdnveNeutronAgent(rpc_compat.RpcCallback):
 | 
				
			|||||||
        self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
 | 
					        self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        self.context = context.get_admin_context_without_session()
 | 
					        self.context = context.get_admin_context_without_session()
 | 
				
			||||||
        self.dispatcher = self.create_rpc_dispatcher()
 | 
					        self.endpoints = [self]
 | 
				
			||||||
        consumers = [[constants.INFO, topics.UPDATE]]
 | 
					        consumers = [[constants.INFO, topics.UPDATE]]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        self.connection = agent_rpc.create_consumers(self.dispatcher,
 | 
					        self.connection = agent_rpc.create_consumers(self.endpoints,
 | 
				
			||||||
                                                     self.topic,
 | 
					                                                     self.topic,
 | 
				
			||||||
                                                     consumers)
 | 
					                                                     consumers)
 | 
				
			||||||
        if self.polling_interval:
 | 
					        if self.polling_interval:
 | 
				
			||||||
@@ -154,9 +154,6 @@ class SdnveNeutronAgent(rpc_compat.RpcCallback):
 | 
				
			|||||||
                                             "connection-mode",
 | 
					                                             "connection-mode",
 | 
				
			||||||
                                             "out-of-band")
 | 
					                                             "out-of-band")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def create_rpc_dispatcher(self):
 | 
					 | 
				
			||||||
        return [self]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def setup_integration_br(self, bridge_name, reset_br, out_of_band,
 | 
					    def setup_integration_br(self, bridge_name, reset_br, out_of_band,
 | 
				
			||||||
                             controller_ip=None):
 | 
					                             controller_ip=None):
 | 
				
			||||||
        '''Sets up the integration bridge.
 | 
					        '''Sets up the integration bridge.
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -48,13 +48,6 @@ class SdnveRpcCallbacks():
 | 
				
			|||||||
    def __init__(self, notifier):
 | 
					    def __init__(self, notifier):
 | 
				
			||||||
        self.notifier = notifier  # used to notify the agent
 | 
					        self.notifier = notifier  # used to notify the agent
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def create_rpc_dispatcher(self):
 | 
					 | 
				
			||||||
        '''Get the rpc dispatcher for this manager.
 | 
					 | 
				
			||||||
        If a manager would like to set an rpc API version, or support more than
 | 
					 | 
				
			||||||
        one class as the target of rpc messages, override this method.
 | 
					 | 
				
			||||||
        '''
 | 
					 | 
				
			||||||
        return [self, agents_db.AgentExtRpcCallback()]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def sdnve_info(self, rpc_context, **kwargs):
 | 
					    def sdnve_info(self, rpc_context, **kwargs):
 | 
				
			||||||
        '''Update new information.'''
 | 
					        '''Update new information.'''
 | 
				
			||||||
        info = kwargs.get('info')
 | 
					        info = kwargs.get('info')
 | 
				
			||||||
@@ -140,9 +133,9 @@ class SdnvePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
 | 
				
			|||||||
        self.topic = topics.PLUGIN
 | 
					        self.topic = topics.PLUGIN
 | 
				
			||||||
        self.conn = rpc_compat.create_connection(new=True)
 | 
					        self.conn = rpc_compat.create_connection(new=True)
 | 
				
			||||||
        self.notifier = AgentNotifierApi(topics.AGENT)
 | 
					        self.notifier = AgentNotifierApi(topics.AGENT)
 | 
				
			||||||
        self.callbacks = SdnveRpcCallbacks(self.notifier)
 | 
					        self.endpoints = [SdnveRpcCallbacks(self.notifier),
 | 
				
			||||||
        self.dispatcher = self.callbacks.create_rpc_dispatcher()
 | 
					                          agents_db.AgentExtRpcCallback()]
 | 
				
			||||||
        self.conn.create_consumer(self.topic, self.dispatcher,
 | 
					        self.conn.create_consumer(self.topic, self.endpoints,
 | 
				
			||||||
                                  fanout=False)
 | 
					                                  fanout=False)
 | 
				
			||||||
        # Consume from all consumers in threads
 | 
					        # Consume from all consumers in threads
 | 
				
			||||||
        self.conn.consume_in_threads()
 | 
					        self.conn.consume_in_threads()
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -809,14 +809,6 @@ class LinuxBridgeRpcCallbacks(rpc_compat.RpcCallback,
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
            getattr(self, method)(context, values)
 | 
					            getattr(self, method)(context, values)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def create_rpc_dispatcher(self):
 | 
					 | 
				
			||||||
        '''Get the rpc dispatcher for this manager.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        If a manager would like to set an rpc API version, or support more than
 | 
					 | 
				
			||||||
        one class as the target of rpc messages, override this method.
 | 
					 | 
				
			||||||
        '''
 | 
					 | 
				
			||||||
        return [self]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
class LinuxBridgePluginApi(agent_rpc.PluginApi,
 | 
					class LinuxBridgePluginApi(agent_rpc.PluginApi,
 | 
				
			||||||
                           sg_rpc.SecurityGroupServerRpcApiMixin):
 | 
					                           sg_rpc.SecurityGroupServerRpcApiMixin):
 | 
				
			||||||
@@ -876,9 +868,7 @@ class LinuxBridgeNeutronAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin):
 | 
				
			|||||||
        # RPC network init
 | 
					        # RPC network init
 | 
				
			||||||
        self.context = context.get_admin_context_without_session()
 | 
					        self.context = context.get_admin_context_without_session()
 | 
				
			||||||
        # Handle updates from service
 | 
					        # Handle updates from service
 | 
				
			||||||
        self.callbacks = LinuxBridgeRpcCallbacks(self.context,
 | 
					        self.endpoints = [LinuxBridgeRpcCallbacks(self.context, self)]
 | 
				
			||||||
                                                 self)
 | 
					 | 
				
			||||||
        self.dispatcher = self.callbacks.create_rpc_dispatcher()
 | 
					 | 
				
			||||||
        # Define the listening consumers for the agent
 | 
					        # Define the listening consumers for the agent
 | 
				
			||||||
        consumers = [[topics.PORT, topics.UPDATE],
 | 
					        consumers = [[topics.PORT, topics.UPDATE],
 | 
				
			||||||
                     [topics.NETWORK, topics.DELETE],
 | 
					                     [topics.NETWORK, topics.DELETE],
 | 
				
			||||||
@@ -886,7 +876,7 @@ class LinuxBridgeNeutronAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin):
 | 
				
			|||||||
        if cfg.CONF.VXLAN.l2_population:
 | 
					        if cfg.CONF.VXLAN.l2_population:
 | 
				
			||||||
            consumers.append([topics.L2POPULATION,
 | 
					            consumers.append([topics.L2POPULATION,
 | 
				
			||||||
                              topics.UPDATE, cfg.CONF.host])
 | 
					                              topics.UPDATE, cfg.CONF.host])
 | 
				
			||||||
        self.connection = agent_rpc.create_consumers(self.dispatcher,
 | 
					        self.connection = agent_rpc.create_consumers(self.endpoints,
 | 
				
			||||||
                                                     self.topic,
 | 
					                                                     self.topic,
 | 
				
			||||||
                                                     consumers)
 | 
					                                                     consumers)
 | 
				
			||||||
        report_interval = cfg.CONF.AGENT.report_interval
 | 
					        report_interval = cfg.CONF.AGENT.report_interval
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -65,14 +65,6 @@ class LinuxBridgeRpcCallbacks(rpc_compat.RpcCallback,
 | 
				
			|||||||
    # Device names start with "tap"
 | 
					    # Device names start with "tap"
 | 
				
			||||||
    TAP_PREFIX_LEN = 3
 | 
					    TAP_PREFIX_LEN = 3
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def create_rpc_dispatcher(self):
 | 
					 | 
				
			||||||
        '''Get the rpc dispatcher for this manager.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        If a manager would like to set an rpc API version, or support more than
 | 
					 | 
				
			||||||
        one class as the target of rpc messages, override this method.
 | 
					 | 
				
			||||||
        '''
 | 
					 | 
				
			||||||
        return [self, agents_db.AgentExtRpcCallback()]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    @classmethod
 | 
					    @classmethod
 | 
				
			||||||
    def get_port_from_device(cls, device):
 | 
					    def get_port_from_device(cls, device):
 | 
				
			||||||
        port = db.get_port_from_device(device[cls.TAP_PREFIX_LEN:])
 | 
					        port = db.get_port_from_device(device[cls.TAP_PREFIX_LEN:])
 | 
				
			||||||
@@ -281,10 +273,10 @@ class LinuxBridgePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
 | 
				
			|||||||
        self.service_topics = {svc_constants.CORE: topics.PLUGIN,
 | 
					        self.service_topics = {svc_constants.CORE: topics.PLUGIN,
 | 
				
			||||||
                               svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
 | 
					                               svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
 | 
				
			||||||
        self.conn = rpc_compat.create_connection(new=True)
 | 
					        self.conn = rpc_compat.create_connection(new=True)
 | 
				
			||||||
        self.callbacks = LinuxBridgeRpcCallbacks()
 | 
					        self.endpoints = [LinuxBridgeRpcCallbacks(),
 | 
				
			||||||
        self.dispatcher = self.callbacks.create_rpc_dispatcher()
 | 
					                          agents_db.AgentExtRpcCallback()]
 | 
				
			||||||
        for svc_topic in self.service_topics.values():
 | 
					        for svc_topic in self.service_topics.values():
 | 
				
			||||||
            self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
 | 
					            self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
 | 
				
			||||||
        # Consume from all consumers in threads
 | 
					        # Consume from all consumers in threads
 | 
				
			||||||
        self.conn.consume_in_threads()
 | 
					        self.conn.consume_in_threads()
 | 
				
			||||||
        self.notifier = AgentNotifierApi(topics.AGENT)
 | 
					        self.notifier = AgentNotifierApi(topics.AGENT)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -180,16 +180,6 @@ class MidoRpcCallbacks(rpc_compat.RpcCallback,
 | 
				
			|||||||
                       dhcp_rpc_base.DhcpRpcCallbackMixin):
 | 
					                       dhcp_rpc_base.DhcpRpcCallbackMixin):
 | 
				
			||||||
    RPC_API_VERSION = '1.1'
 | 
					    RPC_API_VERSION = '1.1'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def create_rpc_dispatcher(self):
 | 
					 | 
				
			||||||
        """Get the rpc dispatcher for this manager.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        This a basic implementation that will call the plugin like get_ports
 | 
					 | 
				
			||||||
        and handle basic events
 | 
					 | 
				
			||||||
        If a manager would like to set an rpc API version, or support more than
 | 
					 | 
				
			||||||
        one class as the target of rpc messages, override this method.
 | 
					 | 
				
			||||||
        """
 | 
					 | 
				
			||||||
        return [self, agents_db.AgentExtRpcCallback()]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
class MidonetPluginException(n_exc.NeutronException):
 | 
					class MidonetPluginException(n_exc.NeutronException):
 | 
				
			||||||
    message = _("%(msg)s")
 | 
					    message = _("%(msg)s")
 | 
				
			||||||
@@ -382,9 +372,9 @@ class MidonetPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
 | 
				
			|||||||
        # RPC support
 | 
					        # RPC support
 | 
				
			||||||
        self.topic = topics.PLUGIN
 | 
					        self.topic = topics.PLUGIN
 | 
				
			||||||
        self.conn = rpc_compat.create_connection(new=True)
 | 
					        self.conn = rpc_compat.create_connection(new=True)
 | 
				
			||||||
        self.callbacks = MidoRpcCallbacks()
 | 
					        self.endpoints = [MidoRpcCallbacks(),
 | 
				
			||||||
        self.dispatcher = self.callbacks.create_rpc_dispatcher()
 | 
					                          agents_db.AgentExtRpcCallback()]
 | 
				
			||||||
        self.conn.create_consumer(self.topic, self.dispatcher,
 | 
					        self.conn.create_consumer(self.topic, self.endpoints,
 | 
				
			||||||
                                  fanout=False)
 | 
					                                  fanout=False)
 | 
				
			||||||
        # Consume from all consumers in threads
 | 
					        # Consume from all consumers in threads
 | 
				
			||||||
        self.conn.consume_in_threads()
 | 
					        self.conn.consume_in_threads()
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -25,6 +25,7 @@ from neutron.common import constants as const
 | 
				
			|||||||
from neutron.common import exceptions as exc
 | 
					from neutron.common import exceptions as exc
 | 
				
			||||||
from neutron.common import rpc_compat
 | 
					from neutron.common import rpc_compat
 | 
				
			||||||
from neutron.common import topics
 | 
					from neutron.common import topics
 | 
				
			||||||
 | 
					from neutron.db import agents_db
 | 
				
			||||||
from neutron.db import agentschedulers_db
 | 
					from neutron.db import agentschedulers_db
 | 
				
			||||||
from neutron.db import allowedaddresspairs_db as addr_pair_db
 | 
					from neutron.db import allowedaddresspairs_db as addr_pair_db
 | 
				
			||||||
from neutron.db import db_base_plugin_v2
 | 
					from neutron.db import db_base_plugin_v2
 | 
				
			||||||
@@ -126,11 +127,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
 | 
				
			|||||||
        )
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def start_rpc_listeners(self):
 | 
					    def start_rpc_listeners(self):
 | 
				
			||||||
        self.callbacks = rpc.RpcCallbacks(self.notifier, self.type_manager)
 | 
					        self.endpoints = [rpc.RpcCallbacks(self.notifier, self.type_manager),
 | 
				
			||||||
 | 
					                          agents_db.AgentExtRpcCallback()]
 | 
				
			||||||
        self.topic = topics.PLUGIN
 | 
					        self.topic = topics.PLUGIN
 | 
				
			||||||
        self.conn = rpc_compat.create_connection(new=True)
 | 
					        self.conn = rpc_compat.create_connection(new=True)
 | 
				
			||||||
        self.dispatcher = self.callbacks.create_rpc_dispatcher()
 | 
					        self.conn.create_consumer(self.topic, self.endpoints,
 | 
				
			||||||
        self.conn.create_consumer(self.topic, self.dispatcher,
 | 
					 | 
				
			||||||
                                  fanout=False)
 | 
					                                  fanout=False)
 | 
				
			||||||
        return self.conn.consume_in_threads()
 | 
					        return self.conn.consume_in_threads()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -19,7 +19,6 @@ from neutron.agent import securitygroups_rpc as sg_rpc
 | 
				
			|||||||
from neutron.common import constants as q_const
 | 
					from neutron.common import constants as q_const
 | 
				
			||||||
from neutron.common import rpc_compat
 | 
					from neutron.common import rpc_compat
 | 
				
			||||||
from neutron.common import topics
 | 
					from neutron.common import topics
 | 
				
			||||||
from neutron.db import agents_db
 | 
					 | 
				
			||||||
from neutron.db import api as db_api
 | 
					from neutron.db import api as db_api
 | 
				
			||||||
from neutron.db import dhcp_rpc_base
 | 
					from neutron.db import dhcp_rpc_base
 | 
				
			||||||
from neutron.db import securitygroups_rpc_base as sg_db_rpc
 | 
					from neutron.db import securitygroups_rpc_base as sg_db_rpc
 | 
				
			||||||
@@ -58,14 +57,6 @@ class RpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
 | 
				
			|||||||
        # test in H3.
 | 
					        # test in H3.
 | 
				
			||||||
        super(RpcCallbacks, self).__init__(notifier, type_manager)
 | 
					        super(RpcCallbacks, self).__init__(notifier, type_manager)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def create_rpc_dispatcher(self):
 | 
					 | 
				
			||||||
        '''Get the rpc dispatcher for this manager.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        If a manager would like to set an rpc API version, or support more than
 | 
					 | 
				
			||||||
        one class as the target of rpc messages, override this method.
 | 
					 | 
				
			||||||
        '''
 | 
					 | 
				
			||||||
        return [self, agents_db.AgentExtRpcCallback()]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    @classmethod
 | 
					    @classmethod
 | 
				
			||||||
    def _device_to_port_id(cls, device):
 | 
					    def _device_to_port_id(cls, device):
 | 
				
			||||||
        # REVISIT(rkukura): Consider calling into MechanismDrivers to
 | 
					        # REVISIT(rkukura): Consider calling into MechanismDrivers to
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -210,15 +210,6 @@ class MlnxEswitchRpcCallbacks(rpc_compat.RpcCallback,
 | 
				
			|||||||
        else:
 | 
					        else:
 | 
				
			||||||
            LOG.debug(_("No port %s defined on agent."), port['id'])
 | 
					            LOG.debug(_("No port %s defined on agent."), port['id'])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def create_rpc_dispatcher(self):
 | 
					 | 
				
			||||||
        """Get the rpc dispatcher for this manager.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        If a manager would like to set an rpc API version,
 | 
					 | 
				
			||||||
        or support more than one class as the target of rpc messages,
 | 
					 | 
				
			||||||
        override this method.
 | 
					 | 
				
			||||||
        """
 | 
					 | 
				
			||||||
        return [self]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
class MlnxEswitchPluginApi(agent_rpc.PluginApi,
 | 
					class MlnxEswitchPluginApi(agent_rpc.PluginApi,
 | 
				
			||||||
                           sg_rpc.SecurityGroupServerRpcApiMixin):
 | 
					                           sg_rpc.SecurityGroupServerRpcApiMixin):
 | 
				
			||||||
@@ -268,14 +259,12 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin):
 | 
				
			|||||||
        # RPC network init
 | 
					        # RPC network init
 | 
				
			||||||
        self.context = context.get_admin_context_without_session()
 | 
					        self.context = context.get_admin_context_without_session()
 | 
				
			||||||
        # Handle updates from service
 | 
					        # Handle updates from service
 | 
				
			||||||
        self.callbacks = MlnxEswitchRpcCallbacks(self.context,
 | 
					        self.endpoints = [MlnxEswitchRpcCallbacks(self.context, self)]
 | 
				
			||||||
                                                 self)
 | 
					 | 
				
			||||||
        self.dispatcher = self.callbacks.create_rpc_dispatcher()
 | 
					 | 
				
			||||||
        # Define the listening consumers for the agent
 | 
					        # Define the listening consumers for the agent
 | 
				
			||||||
        consumers = [[topics.PORT, topics.UPDATE],
 | 
					        consumers = [[topics.PORT, topics.UPDATE],
 | 
				
			||||||
                     [topics.NETWORK, topics.DELETE],
 | 
					                     [topics.NETWORK, topics.DELETE],
 | 
				
			||||||
                     [topics.SECURITY_GROUP, topics.UPDATE]]
 | 
					                     [topics.SECURITY_GROUP, topics.UPDATE]]
 | 
				
			||||||
        self.connection = agent_rpc.create_consumers(self.dispatcher,
 | 
					        self.connection = agent_rpc.create_consumers(self.endpoints,
 | 
				
			||||||
                                                     self.topic,
 | 
					                                                     self.topic,
 | 
				
			||||||
                                                     consumers)
 | 
					                                                     consumers)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -28,6 +28,7 @@ from neutron.common import exceptions as n_exc
 | 
				
			|||||||
from neutron.common import rpc_compat
 | 
					from neutron.common import rpc_compat
 | 
				
			||||||
from neutron.common import topics
 | 
					from neutron.common import topics
 | 
				
			||||||
from neutron.common import utils
 | 
					from neutron.common import utils
 | 
				
			||||||
 | 
					from neutron.db import agents_db
 | 
				
			||||||
from neutron.db import agentschedulers_db
 | 
					from neutron.db import agentschedulers_db
 | 
				
			||||||
from neutron.db import db_base_plugin_v2
 | 
					from neutron.db import db_base_plugin_v2
 | 
				
			||||||
from neutron.db import external_net_db
 | 
					from neutron.db import external_net_db
 | 
				
			||||||
@@ -120,10 +121,10 @@ class MellanoxEswitchPlugin(db_base_plugin_v2.NeutronDbPluginV2,
 | 
				
			|||||||
        self.service_topics = {svc_constants.CORE: topics.PLUGIN,
 | 
					        self.service_topics = {svc_constants.CORE: topics.PLUGIN,
 | 
				
			||||||
                               svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
 | 
					                               svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
 | 
				
			||||||
        self.conn = rpc_compat.create_connection(new=True)
 | 
					        self.conn = rpc_compat.create_connection(new=True)
 | 
				
			||||||
        self.callbacks = rpc_callbacks.MlnxRpcCallbacks()
 | 
					        self.endpoints = [rpc_callbacks.MlnxRpcCallbacks(),
 | 
				
			||||||
        self.dispatcher = self.callbacks.create_rpc_dispatcher()
 | 
					                          agents_db.AgentExtRpcCallback()]
 | 
				
			||||||
        for svc_topic in self.service_topics.values():
 | 
					        for svc_topic in self.service_topics.values():
 | 
				
			||||||
            self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
 | 
					            self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
 | 
				
			||||||
        # Consume from all consumers in threads
 | 
					        # Consume from all consumers in threads
 | 
				
			||||||
        self.conn.consume_in_threads()
 | 
					        self.conn.consume_in_threads()
 | 
				
			||||||
        self.notifier = agent_notify_api.AgentNotifierApi(topics.AGENT)
 | 
					        self.notifier = agent_notify_api.AgentNotifierApi(topics.AGENT)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -18,7 +18,6 @@ from oslo.config import cfg
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
from neutron.common import constants as q_const
 | 
					from neutron.common import constants as q_const
 | 
				
			||||||
from neutron.common import rpc_compat
 | 
					from neutron.common import rpc_compat
 | 
				
			||||||
from neutron.db import agents_db
 | 
					 | 
				
			||||||
from neutron.db import api as db_api
 | 
					from neutron.db import api as db_api
 | 
				
			||||||
from neutron.db import dhcp_rpc_base
 | 
					from neutron.db import dhcp_rpc_base
 | 
				
			||||||
from neutron.db import l3_rpc_base
 | 
					from neutron.db import l3_rpc_base
 | 
				
			||||||
@@ -40,15 +39,6 @@ class MlnxRpcCallbacks(rpc_compat.RpcCallback,
 | 
				
			|||||||
    #to be compatible with Linux Bridge Agent on Network Node
 | 
					    #to be compatible with Linux Bridge Agent on Network Node
 | 
				
			||||||
    TAP_PREFIX_LEN = 3
 | 
					    TAP_PREFIX_LEN = 3
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def create_rpc_dispatcher(self):
 | 
					 | 
				
			||||||
        """Get the rpc dispatcher for this manager.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        If a manager would like to set an RPC API version,
 | 
					 | 
				
			||||||
        or support more than one class as the target of RPC messages,
 | 
					 | 
				
			||||||
        override this method.
 | 
					 | 
				
			||||||
        """
 | 
					 | 
				
			||||||
        return [self, agents_db.AgentExtRpcCallback()]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    @classmethod
 | 
					    @classmethod
 | 
				
			||||||
    def get_port_from_device(cls, device):
 | 
					    def get_port_from_device(cls, device):
 | 
				
			||||||
        """Get port according to device.
 | 
					        """Get port according to device.
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -156,11 +156,11 @@ class NECNeutronAgent(object):
 | 
				
			|||||||
                                                self, self.sg_agent)
 | 
					                                                self, self.sg_agent)
 | 
				
			||||||
        self.callback_sg = SecurityGroupAgentRpcCallback(self.context,
 | 
					        self.callback_sg = SecurityGroupAgentRpcCallback(self.context,
 | 
				
			||||||
                                                         self.sg_agent)
 | 
					                                                         self.sg_agent)
 | 
				
			||||||
        self.dispatcher = [self.callback_nec, self.callback_sg]
 | 
					        self.endpoints = [self.callback_nec, self.callback_sg]
 | 
				
			||||||
        # Define the listening consumer for the agent
 | 
					        # Define the listening consumer for the agent
 | 
				
			||||||
        consumers = [[topics.PORT, topics.UPDATE],
 | 
					        consumers = [[topics.PORT, topics.UPDATE],
 | 
				
			||||||
                     [topics.SECURITY_GROUP, topics.UPDATE]]
 | 
					                     [topics.SECURITY_GROUP, topics.UPDATE]]
 | 
				
			||||||
        self.connection = agent_rpc.create_consumers(self.dispatcher,
 | 
					        self.connection = agent_rpc.create_consumers(self.endpoints,
 | 
				
			||||||
                                                     self.topic,
 | 
					                                                     self.topic,
 | 
				
			||||||
                                                     consumers)
 | 
					                                                     consumers)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -146,14 +146,14 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
        # NOTE: callback_sg is referred to from the sg unit test.
 | 
					        # NOTE: callback_sg is referred to from the sg unit test.
 | 
				
			||||||
        self.callback_sg = SecurityGroupServerRpcCallback()
 | 
					        self.callback_sg = SecurityGroupServerRpcCallback()
 | 
				
			||||||
        self.dispatcher = [
 | 
					        self.endpoints = [
 | 
				
			||||||
            NECPluginV2RPCCallbacks(self.safe_reference),
 | 
					            NECPluginV2RPCCallbacks(self.safe_reference),
 | 
				
			||||||
            DhcpRpcCallback(),
 | 
					            DhcpRpcCallback(),
 | 
				
			||||||
            L3RpcCallback(),
 | 
					            L3RpcCallback(),
 | 
				
			||||||
            self.callback_sg,
 | 
					            self.callback_sg,
 | 
				
			||||||
            agents_db.AgentExtRpcCallback()]
 | 
					            agents_db.AgentExtRpcCallback()]
 | 
				
			||||||
        for svc_topic in self.service_topics.values():
 | 
					        for svc_topic in self.service_topics.values():
 | 
				
			||||||
            self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
 | 
					            self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
 | 
				
			||||||
        # Consume from all consumers in threads
 | 
					        # Consume from all consumers in threads
 | 
				
			||||||
        self.conn.consume_in_threads()
 | 
					        self.conn.consume_in_threads()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -715,14 +715,6 @@ class NECPluginV2RPCCallbacks(rpc_compat.RpcCallback):
 | 
				
			|||||||
        super(NECPluginV2RPCCallbacks, self).__init__()
 | 
					        super(NECPluginV2RPCCallbacks, self).__init__()
 | 
				
			||||||
        self.plugin = plugin
 | 
					        self.plugin = plugin
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def create_rpc_dispatcher(self):
 | 
					 | 
				
			||||||
        '''Get the rpc dispatcher for this manager.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        If a manager would like to set an rpc API version, or support more than
 | 
					 | 
				
			||||||
        one class as the target of rpc messages, override this method.
 | 
					 | 
				
			||||||
        '''
 | 
					 | 
				
			||||||
        return [self]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def update_ports(self, rpc_context, **kwargs):
 | 
					    def update_ports(self, rpc_context, **kwargs):
 | 
				
			||||||
        """Update ports' information and activate/deavtivate them.
 | 
					        """Update ports' information and activate/deavtivate them.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -286,13 +286,13 @@ class OFANeutronAgent(rpc_compat.RpcCallback,
 | 
				
			|||||||
        # RPC network init
 | 
					        # RPC network init
 | 
				
			||||||
        self.context = context.get_admin_context_without_session()
 | 
					        self.context = context.get_admin_context_without_session()
 | 
				
			||||||
        # Handle updates from service
 | 
					        # Handle updates from service
 | 
				
			||||||
        self.dispatcher = self.create_rpc_dispatcher()
 | 
					        self.endpoints = [self]
 | 
				
			||||||
        # Define the listening consumers for the agent
 | 
					        # Define the listening consumers for the agent
 | 
				
			||||||
        consumers = [[topics.PORT, topics.UPDATE],
 | 
					        consumers = [[topics.PORT, topics.UPDATE],
 | 
				
			||||||
                     [topics.NETWORK, topics.DELETE],
 | 
					                     [topics.NETWORK, topics.DELETE],
 | 
				
			||||||
                     [constants.TUNNEL, topics.UPDATE],
 | 
					                     [constants.TUNNEL, topics.UPDATE],
 | 
				
			||||||
                     [topics.SECURITY_GROUP, topics.UPDATE]]
 | 
					                     [topics.SECURITY_GROUP, topics.UPDATE]]
 | 
				
			||||||
        self.connection = agent_rpc.create_consumers(self.dispatcher,
 | 
					        self.connection = agent_rpc.create_consumers(self.endpoints,
 | 
				
			||||||
                                                     self.topic,
 | 
					                                                     self.topic,
 | 
				
			||||||
                                                     consumers)
 | 
					                                                     consumers)
 | 
				
			||||||
        report_interval = cfg.CONF.AGENT.report_interval
 | 
					        report_interval = cfg.CONF.AGENT.report_interval
 | 
				
			||||||
@@ -344,14 +344,6 @@ class OFANeutronAgent(rpc_compat.RpcCallback,
 | 
				
			|||||||
            return
 | 
					            return
 | 
				
			||||||
        self.setup_tunnel_port(tun_name, tunnel_ip, tunnel_type)
 | 
					        self.setup_tunnel_port(tun_name, tunnel_ip, tunnel_type)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def create_rpc_dispatcher(self):
 | 
					 | 
				
			||||||
        """Get the rpc dispatcher for this manager.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        If a manager would like to set an rpc API version, or support more than
 | 
					 | 
				
			||||||
        one class as the target of rpc messages, override this method.
 | 
					 | 
				
			||||||
        """
 | 
					 | 
				
			||||||
        return [self]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def _provision_local_vlan_outbound_for_tunnel(self, lvid,
 | 
					    def _provision_local_vlan_outbound_for_tunnel(self, lvid,
 | 
				
			||||||
                                                  segmentation_id, ofports):
 | 
					                                                  segmentation_id, ofports):
 | 
				
			||||||
        br = self.tun_br
 | 
					        br = self.tun_br
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -119,11 +119,11 @@ class NVSDNeutronAgent(rpc_compat.RpcCallback):
 | 
				
			|||||||
                                                self, self.sg_agent)
 | 
					                                                self, self.sg_agent)
 | 
				
			||||||
        self.callback_sg = SecurityGroupAgentRpcCallback(self.context,
 | 
					        self.callback_sg = SecurityGroupAgentRpcCallback(self.context,
 | 
				
			||||||
                                                         self.sg_agent)
 | 
					                                                         self.sg_agent)
 | 
				
			||||||
        self.dispatcher = [self.callback_oc, self.callback_sg]
 | 
					        self.endpoints = [self.callback_oc, self.callback_sg]
 | 
				
			||||||
        # Define the listening consumer for the agent
 | 
					        # Define the listening consumer for the agent
 | 
				
			||||||
        consumers = [[topics.PORT, topics.UPDATE],
 | 
					        consumers = [[topics.PORT, topics.UPDATE],
 | 
				
			||||||
                     [topics.SECURITY_GROUP, topics.UPDATE]]
 | 
					                     [topics.SECURITY_GROUP, topics.UPDATE]]
 | 
				
			||||||
        self.connection = agent_rpc.create_consumers(self.dispatcher,
 | 
					        self.connection = agent_rpc.create_consumers(self.endpoints,
 | 
				
			||||||
                                                     self.topic,
 | 
					                                                     self.topic,
 | 
				
			||||||
                                                     consumers)
 | 
					                                                     consumers)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -58,10 +58,6 @@ class NVSDPluginRpcCallbacks(rpc_compat.RpcCallback,
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    RPC_API_VERSION = '1.1'
 | 
					    RPC_API_VERSION = '1.1'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def create_rpc_dispatcher(self):
 | 
					 | 
				
			||||||
        """Get the rpc dispatcher for this manager."""
 | 
					 | 
				
			||||||
        return [self, agents_db.AgentExtRpcCallback()]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    @staticmethod
 | 
					    @staticmethod
 | 
				
			||||||
    def get_port_from_device(device):
 | 
					    def get_port_from_device(device):
 | 
				
			||||||
        port = nvsd_db.get_port_from_device(device)
 | 
					        port = nvsd_db.get_port_from_device(device)
 | 
				
			||||||
@@ -165,10 +161,10 @@ class OneConvergencePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
 | 
				
			|||||||
        self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
 | 
					        self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
 | 
				
			||||||
            l3_rpc_agent_api.L3AgentNotifyAPI()
 | 
					            l3_rpc_agent_api.L3AgentNotifyAPI()
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
        self.callbacks = NVSDPluginRpcCallbacks()
 | 
					        self.endpoints = [NVSDPluginRpcCallbacks(),
 | 
				
			||||||
        self.dispatcher = self.callbacks.create_rpc_dispatcher()
 | 
					                          agents_db.AgentExtRpcCallback()]
 | 
				
			||||||
        for svc_topic in self.service_topics.values():
 | 
					        for svc_topic in self.service_topics.values():
 | 
				
			||||||
            self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
 | 
					            self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # Consume from all consumers in threads
 | 
					        # Consume from all consumers in threads
 | 
				
			||||||
        self.conn.consume_in_threads()
 | 
					        self.conn.consume_in_threads()
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -249,7 +249,7 @@ class OVSNeutronAgent(rpc_compat.RpcCallback,
 | 
				
			|||||||
        # RPC network init
 | 
					        # RPC network init
 | 
				
			||||||
        self.context = context.get_admin_context_without_session()
 | 
					        self.context = context.get_admin_context_without_session()
 | 
				
			||||||
        # Handle updates from service
 | 
					        # Handle updates from service
 | 
				
			||||||
        self.dispatcher = self.create_rpc_dispatcher()
 | 
					        self.endpoints = [self]
 | 
				
			||||||
        # Define the listening consumers for the agent
 | 
					        # Define the listening consumers for the agent
 | 
				
			||||||
        consumers = [[topics.PORT, topics.UPDATE],
 | 
					        consumers = [[topics.PORT, topics.UPDATE],
 | 
				
			||||||
                     [topics.NETWORK, topics.DELETE],
 | 
					                     [topics.NETWORK, topics.DELETE],
 | 
				
			||||||
@@ -258,7 +258,7 @@ class OVSNeutronAgent(rpc_compat.RpcCallback,
 | 
				
			|||||||
        if self.l2_pop:
 | 
					        if self.l2_pop:
 | 
				
			||||||
            consumers.append([topics.L2POPULATION,
 | 
					            consumers.append([topics.L2POPULATION,
 | 
				
			||||||
                              topics.UPDATE, cfg.CONF.host])
 | 
					                              topics.UPDATE, cfg.CONF.host])
 | 
				
			||||||
        self.connection = agent_rpc.create_consumers(self.dispatcher,
 | 
					        self.connection = agent_rpc.create_consumers(self.endpoints,
 | 
				
			||||||
                                                     self.topic,
 | 
					                                                     self.topic,
 | 
				
			||||||
                                                     consumers)
 | 
					                                                     consumers)
 | 
				
			||||||
        report_interval = cfg.CONF.AGENT.report_interval
 | 
					        report_interval = cfg.CONF.AGENT.report_interval
 | 
				
			||||||
@@ -493,14 +493,6 @@ class OVSNeutronAgent(rpc_compat.RpcCallback,
 | 
				
			|||||||
        else:
 | 
					        else:
 | 
				
			||||||
            LOG.warning(_('Action %s not supported'), action)
 | 
					            LOG.warning(_('Action %s not supported'), action)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def create_rpc_dispatcher(self):
 | 
					 | 
				
			||||||
        '''Get the rpc dispatcher for this manager.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        If a manager would like to set an rpc API version, or support more than
 | 
					 | 
				
			||||||
        one class as the target of rpc messages, override this method.
 | 
					 | 
				
			||||||
        '''
 | 
					 | 
				
			||||||
        return [self]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def provision_local_vlan(self, net_uuid, network_type, physical_network,
 | 
					    def provision_local_vlan(self, net_uuid, network_type, physical_network,
 | 
				
			||||||
                             segmentation_id):
 | 
					                             segmentation_id):
 | 
				
			||||||
        '''Provisions a local VLAN.
 | 
					        '''Provisions a local VLAN.
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -73,14 +73,6 @@ class OVSRpcCallbacks(rpc_compat.RpcCallback,
 | 
				
			|||||||
        self.notifier = notifier
 | 
					        self.notifier = notifier
 | 
				
			||||||
        self.tunnel_type = tunnel_type
 | 
					        self.tunnel_type = tunnel_type
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def create_rpc_dispatcher(self):
 | 
					 | 
				
			||||||
        '''Get the rpc dispatcher for this manager.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        If a manager would like to set an rpc API version, or support more than
 | 
					 | 
				
			||||||
        one class as the target of rpc messages, override this method.
 | 
					 | 
				
			||||||
        '''
 | 
					 | 
				
			||||||
        return [self, agents_db.AgentExtRpcCallback()]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    @classmethod
 | 
					    @classmethod
 | 
				
			||||||
    def get_port_from_device(cls, device):
 | 
					    def get_port_from_device(cls, device):
 | 
				
			||||||
        port = ovs_db_v2.get_port_from_device(device)
 | 
					        port = ovs_db_v2.get_port_from_device(device)
 | 
				
			||||||
@@ -341,10 +333,10 @@ class OVSNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
 | 
				
			|||||||
        self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
 | 
					        self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
 | 
				
			||||||
            l3_rpc_agent_api.L3AgentNotifyAPI()
 | 
					            l3_rpc_agent_api.L3AgentNotifyAPI()
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
        self.callbacks = OVSRpcCallbacks(self.notifier, self.tunnel_type)
 | 
					        self.endpoints = [OVSRpcCallbacks(self.notifier, self.tunnel_type),
 | 
				
			||||||
        self.dispatcher = self.callbacks.create_rpc_dispatcher()
 | 
					                          agents_db.AgentExtRpcCallback()]
 | 
				
			||||||
        for svc_topic in self.service_topics.values():
 | 
					        for svc_topic in self.service_topics.values():
 | 
				
			||||||
            self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
 | 
					            self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
 | 
				
			||||||
        # Consume from all consumers in threads
 | 
					        # Consume from all consumers in threads
 | 
				
			||||||
        self.conn.consume_in_threads()
 | 
					        self.conn.consume_in_threads()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -200,16 +200,13 @@ class OVSNeutronOFPRyuAgent(rpc_compat.RpcCallback,
 | 
				
			|||||||
        self.topic = topics.AGENT
 | 
					        self.topic = topics.AGENT
 | 
				
			||||||
        self.plugin_rpc = RyuPluginApi(topics.PLUGIN)
 | 
					        self.plugin_rpc = RyuPluginApi(topics.PLUGIN)
 | 
				
			||||||
        self.context = q_context.get_admin_context_without_session()
 | 
					        self.context = q_context.get_admin_context_without_session()
 | 
				
			||||||
        self.dispatcher = self._create_rpc_dispatcher()
 | 
					        self.endpoints = [self]
 | 
				
			||||||
        consumers = [[topics.PORT, topics.UPDATE],
 | 
					        consumers = [[topics.PORT, topics.UPDATE],
 | 
				
			||||||
                     [topics.SECURITY_GROUP, topics.UPDATE]]
 | 
					                     [topics.SECURITY_GROUP, topics.UPDATE]]
 | 
				
			||||||
        self.connection = agent_rpc.create_consumers(self.dispatcher,
 | 
					        self.connection = agent_rpc.create_consumers(self.endpoints,
 | 
				
			||||||
                                                     self.topic,
 | 
					                                                     self.topic,
 | 
				
			||||||
                                                     consumers)
 | 
					                                                     consumers)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def _create_rpc_dispatcher(self):
 | 
					 | 
				
			||||||
        return [self]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def _setup_integration_br(self, root_helper, integ_br,
 | 
					    def _setup_integration_br(self, root_helper, integ_br,
 | 
				
			||||||
                              tunnel_ip, ovsdb_port, ovsdb_ip):
 | 
					                              tunnel_ip, ovsdb_port, ovsdb_ip):
 | 
				
			||||||
        self.int_br = OVSBridge(integ_br, root_helper)
 | 
					        self.int_br = OVSBridge(integ_br, root_helper)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -57,9 +57,6 @@ class RyuRpcCallbacks(rpc_compat.RpcCallback,
 | 
				
			|||||||
        super(RyuRpcCallbacks, self).__init__()
 | 
					        super(RyuRpcCallbacks, self).__init__()
 | 
				
			||||||
        self.ofp_rest_api_addr = ofp_rest_api_addr
 | 
					        self.ofp_rest_api_addr = ofp_rest_api_addr
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def create_rpc_dispatcher(self):
 | 
					 | 
				
			||||||
        return [self]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def get_ofp_rest_api(self, context, **kwargs):
 | 
					    def get_ofp_rest_api(self, context, **kwargs):
 | 
				
			||||||
        LOG.debug(_("get_ofp_rest_api: %s"), self.ofp_rest_api_addr)
 | 
					        LOG.debug(_("get_ofp_rest_api: %s"), self.ofp_rest_api_addr)
 | 
				
			||||||
        return self.ofp_rest_api_addr
 | 
					        return self.ofp_rest_api_addr
 | 
				
			||||||
@@ -143,10 +140,9 @@ class RyuNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
 | 
				
			|||||||
                               svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
 | 
					                               svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
 | 
				
			||||||
        self.conn = rpc_compat.create_connection(new=True)
 | 
					        self.conn = rpc_compat.create_connection(new=True)
 | 
				
			||||||
        self.notifier = AgentNotifierApi(topics.AGENT)
 | 
					        self.notifier = AgentNotifierApi(topics.AGENT)
 | 
				
			||||||
        self.callbacks = RyuRpcCallbacks(self.ofp_api_host)
 | 
					        self.endpoints = [RyuRpcCallbacks(self.ofp_api_host)]
 | 
				
			||||||
        self.dispatcher = self.callbacks.create_rpc_dispatcher()
 | 
					 | 
				
			||||||
        for svc_topic in self.service_topics.values():
 | 
					        for svc_topic in self.service_topics.values():
 | 
				
			||||||
            self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
 | 
					            self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
 | 
				
			||||||
        self.conn.consume_in_threads()
 | 
					        self.conn.consume_in_threads()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def _create_all_tenant_network(self):
 | 
					    def _create_all_tenant_network(self):
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -25,7 +25,6 @@ from neutron.api.v2 import attributes
 | 
				
			|||||||
from neutron.common import constants as const
 | 
					from neutron.common import constants as const
 | 
				
			||||||
from neutron.common import exceptions as ntn_exc
 | 
					from neutron.common import exceptions as ntn_exc
 | 
				
			||||||
from neutron.common import rpc_compat
 | 
					from neutron.common import rpc_compat
 | 
				
			||||||
from neutron.db import agents_db
 | 
					 | 
				
			||||||
from neutron.db import db_base_plugin_v2
 | 
					from neutron.db import db_base_plugin_v2
 | 
				
			||||||
from neutron.db import dhcp_rpc_base
 | 
					from neutron.db import dhcp_rpc_base
 | 
				
			||||||
from neutron.db import l3_db
 | 
					from neutron.db import l3_db
 | 
				
			||||||
@@ -48,14 +47,6 @@ class NSXRpcCallbacks(rpc_compat.RpcCallback,
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    RPC_API_VERSION = '1.1'
 | 
					    RPC_API_VERSION = '1.1'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def create_rpc_dispatcher(self):
 | 
					 | 
				
			||||||
        '''Get the rpc dispatcher for this manager.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        If a manager would like to set an rpc API version, or support more than
 | 
					 | 
				
			||||||
        one class as the target of rpc messages, override this method.
 | 
					 | 
				
			||||||
        '''
 | 
					 | 
				
			||||||
        return [self, agents_db.AgentExtRpcCallback()]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
def handle_network_dhcp_access(plugin, context, network, action):
 | 
					def handle_network_dhcp_access(plugin, context, network, action):
 | 
				
			||||||
    pass
 | 
					    pass
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -21,6 +21,7 @@ from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
 | 
				
			|||||||
from neutron.common import constants as const
 | 
					from neutron.common import constants as const
 | 
				
			||||||
from neutron.common import rpc_compat
 | 
					from neutron.common import rpc_compat
 | 
				
			||||||
from neutron.common import topics
 | 
					from neutron.common import topics
 | 
				
			||||||
 | 
					from neutron.db import agents_db
 | 
				
			||||||
from neutron.openstack.common import importutils
 | 
					from neutron.openstack.common import importutils
 | 
				
			||||||
from neutron.openstack.common import log as logging
 | 
					from neutron.openstack.common import log as logging
 | 
				
			||||||
from neutron.plugins.vmware.common import config
 | 
					from neutron.plugins.vmware.common import config
 | 
				
			||||||
@@ -70,8 +71,9 @@ class DhcpMetadataAccess(object):
 | 
				
			|||||||
    def _setup_rpc_dhcp_metadata(self, notifier=None):
 | 
					    def _setup_rpc_dhcp_metadata(self, notifier=None):
 | 
				
			||||||
        self.topic = topics.PLUGIN
 | 
					        self.topic = topics.PLUGIN
 | 
				
			||||||
        self.conn = rpc_compat.create_connection(new=True)
 | 
					        self.conn = rpc_compat.create_connection(new=True)
 | 
				
			||||||
        self.dispatcher = nsx_rpc.NSXRpcCallbacks().create_rpc_dispatcher()
 | 
					        self.endpoints = [nsx_rpc.NSXRpcCallbacks(),
 | 
				
			||||||
        self.conn.create_consumer(self.topic, self.dispatcher, fanout=False)
 | 
					                          agents_db.AgentExtRpcCallback()]
 | 
				
			||||||
 | 
					        self.conn.create_consumer(self.topic, self.endpoints, fanout=False)
 | 
				
			||||||
        self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
 | 
					        self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
 | 
				
			||||||
            notifier or dhcp_rpc_agent_api.DhcpAgentNotifyAPI())
 | 
					            notifier or dhcp_rpc_agent_api.DhcpAgentNotifyAPI())
 | 
				
			||||||
        self.conn.consume_in_threads()
 | 
					        self.conn.consume_in_threads()
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -40,9 +40,6 @@ class FirewallCallbacks(rpc_compat.RpcCallback):
 | 
				
			|||||||
        super(FirewallCallbacks, self).__init__()
 | 
					        super(FirewallCallbacks, self).__init__()
 | 
				
			||||||
        self.plugin = plugin
 | 
					        self.plugin = plugin
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def create_rpc_dispatcher(self):
 | 
					 | 
				
			||||||
        return [self]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def set_firewall_status(self, context, firewall_id, status, **kwargs):
 | 
					    def set_firewall_status(self, context, firewall_id, status, **kwargs):
 | 
				
			||||||
        """Agent uses this to set a firewall's status."""
 | 
					        """Agent uses this to set a firewall's status."""
 | 
				
			||||||
        LOG.debug(_("set_firewall_status() called"))
 | 
					        LOG.debug(_("set_firewall_status() called"))
 | 
				
			||||||
@@ -165,13 +162,11 @@ class FirewallPlugin(firewall_db.Firewall_db_mixin):
 | 
				
			|||||||
        """Do the initialization for the firewall service plugin here."""
 | 
					        """Do the initialization for the firewall service plugin here."""
 | 
				
			||||||
        qdbapi.register_models()
 | 
					        qdbapi.register_models()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        self.callbacks = FirewallCallbacks(self)
 | 
					        self.endpoints = [FirewallCallbacks(self)]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        self.conn = rpc_compat.create_connection(new=True)
 | 
					        self.conn = rpc_compat.create_connection(new=True)
 | 
				
			||||||
        self.conn.create_consumer(
 | 
					        self.conn.create_consumer(
 | 
				
			||||||
            topics.FIREWALL_PLUGIN,
 | 
					            topics.FIREWALL_PLUGIN, self.endpoints, fanout=False)
 | 
				
			||||||
            self.callbacks.create_rpc_dispatcher(),
 | 
					 | 
				
			||||||
            fanout=False)
 | 
					 | 
				
			||||||
        self.conn.consume_in_threads()
 | 
					        self.conn.consume_in_threads()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        self.agent_rpc = FirewallAgentApi(
 | 
					        self.agent_rpc = FirewallAgentApi(
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -39,14 +39,6 @@ class L3RouterPluginRpcCallbacks(rpc_compat.RpcCallback,
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    RPC_API_VERSION = '1.1'
 | 
					    RPC_API_VERSION = '1.1'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def create_rpc_dispatcher(self):
 | 
					 | 
				
			||||||
        """Get the rpc dispatcher for this manager.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        If a manager would like to set an rpc API version, or support more than
 | 
					 | 
				
			||||||
        one class as the target of rpc messages, override this method.
 | 
					 | 
				
			||||||
        """
 | 
					 | 
				
			||||||
        return [self]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
class L3RouterPlugin(db_base_plugin_v2.CommonDbMixin,
 | 
					class L3RouterPlugin(db_base_plugin_v2.CommonDbMixin,
 | 
				
			||||||
                     extraroute_db.ExtraRoute_db_mixin,
 | 
					                     extraroute_db.ExtraRoute_db_mixin,
 | 
				
			||||||
@@ -76,9 +68,8 @@ class L3RouterPlugin(db_base_plugin_v2.CommonDbMixin,
 | 
				
			|||||||
        self.conn = rpc_compat.create_connection(new=True)
 | 
					        self.conn = rpc_compat.create_connection(new=True)
 | 
				
			||||||
        self.agent_notifiers.update(
 | 
					        self.agent_notifiers.update(
 | 
				
			||||||
            {q_const.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotifyAPI()})
 | 
					            {q_const.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotifyAPI()})
 | 
				
			||||||
        self.callbacks = L3RouterPluginRpcCallbacks()
 | 
					        self.endpoints = [L3RouterPluginRpcCallbacks()]
 | 
				
			||||||
        self.dispatcher = self.callbacks.create_rpc_dispatcher()
 | 
					        self.conn.create_consumer(self.topic, self.endpoints,
 | 
				
			||||||
        self.conn.create_consumer(self.topic, self.dispatcher,
 | 
					 | 
				
			||||||
                                  fanout=False)
 | 
					                                  fanout=False)
 | 
				
			||||||
        self.conn.consume_in_threads()
 | 
					        self.conn.consume_in_threads()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -64,9 +64,6 @@ class LoadBalancerCallbacks(rpc_compat.RpcCallback):
 | 
				
			|||||||
        super(LoadBalancerCallbacks, self).__init__()
 | 
					        super(LoadBalancerCallbacks, self).__init__()
 | 
				
			||||||
        self.plugin = plugin
 | 
					        self.plugin = plugin
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def create_rpc_dispatcher(self):
 | 
					 | 
				
			||||||
        return [self, agents_db.AgentExtRpcCallback(self.plugin)]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def get_ready_devices(self, context, host=None):
 | 
					    def get_ready_devices(self, context, host=None):
 | 
				
			||||||
        with context.session.begin(subtransactions=True):
 | 
					        with context.session.begin(subtransactions=True):
 | 
				
			||||||
            agents = self.plugin.get_lbaas_agents(context,
 | 
					            agents = self.plugin.get_lbaas_agents(context,
 | 
				
			||||||
@@ -342,11 +339,14 @@ class AgentDriverBase(abstract_driver.LoadBalancerAbstractDriver):
 | 
				
			|||||||
        if hasattr(self.plugin, 'agent_callbacks'):
 | 
					        if hasattr(self.plugin, 'agent_callbacks'):
 | 
				
			||||||
            return
 | 
					            return
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        self.plugin.agent_callbacks = LoadBalancerCallbacks(self.plugin)
 | 
					        self.plugin.agent_endpoints = [
 | 
				
			||||||
 | 
					            LoadBalancerCallbacks(self.plugin),
 | 
				
			||||||
 | 
					            agents_db.AgentExtRpcCallback(self.plugin)
 | 
				
			||||||
 | 
					        ]
 | 
				
			||||||
        self.plugin.conn = rpc_compat.create_connection(new=True)
 | 
					        self.plugin.conn = rpc_compat.create_connection(new=True)
 | 
				
			||||||
        self.plugin.conn.create_consumer(
 | 
					        self.plugin.conn.create_consumer(
 | 
				
			||||||
            topics.LOADBALANCER_PLUGIN,
 | 
					            topics.LOADBALANCER_PLUGIN,
 | 
				
			||||||
            self.plugin.agent_callbacks.create_rpc_dispatcher(),
 | 
					            self.plugin.agent_endpoints,
 | 
				
			||||||
            fanout=False)
 | 
					            fanout=False)
 | 
				
			||||||
        self.plugin.conn.consume_in_threads()
 | 
					        self.plugin.conn.consume_in_threads()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -28,13 +28,11 @@ class MeteringPlugin(metering_db.MeteringDbMixin):
 | 
				
			|||||||
    def __init__(self):
 | 
					    def __init__(self):
 | 
				
			||||||
        super(MeteringPlugin, self).__init__()
 | 
					        super(MeteringPlugin, self).__init__()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        self.callbacks = metering_rpc.MeteringRpcCallbacks(self)
 | 
					        self.endpoints = [metering_rpc.MeteringRpcCallbacks(self)]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        self.conn = rpc_compat.create_connection(new=True)
 | 
					        self.conn = rpc_compat.create_connection(new=True)
 | 
				
			||||||
        self.conn.create_consumer(
 | 
					        self.conn.create_consumer(
 | 
				
			||||||
            topics.METERING_PLUGIN,
 | 
					            topics.METERING_PLUGIN, self.endpoints, fanout=False)
 | 
				
			||||||
            self.callbacks.create_rpc_dispatcher(),
 | 
					 | 
				
			||||||
            fanout=False)
 | 
					 | 
				
			||||||
        self.conn.consume_in_threads()
 | 
					        self.conn.consume_in_threads()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        self.meter_rpc = metering_rpc_agent_api.MeteringAgentNotifyAPI()
 | 
					        self.meter_rpc = metering_rpc_agent_api.MeteringAgentNotifyAPI()
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -198,10 +198,8 @@ class CiscoCsrIPsecDriver(device_drivers.DeviceDriver):
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
        self.service_state = {}
 | 
					        self.service_state = {}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        self.conn.create_consumer(
 | 
					        self.endpoints = [self]
 | 
				
			||||||
            node_topic,
 | 
					        self.conn.create_consumer(node_topic, self.endpoints, fanout=False)
 | 
				
			||||||
            self.create_rpc_dispatcher(),
 | 
					 | 
				
			||||||
            fanout=False)
 | 
					 | 
				
			||||||
        self.conn.consume_in_threads()
 | 
					        self.conn.consume_in_threads()
 | 
				
			||||||
        self.agent_rpc = (
 | 
					        self.agent_rpc = (
 | 
				
			||||||
            CiscoCsrIPsecVpnDriverApi(topics.CISCO_IPSEC_DRIVER_TOPIC, '1.0'))
 | 
					            CiscoCsrIPsecVpnDriverApi(topics.CISCO_IPSEC_DRIVER_TOPIC, '1.0'))
 | 
				
			||||||
@@ -225,9 +223,6 @@ class CiscoCsrIPsecDriver(device_drivers.DeviceDriver):
 | 
				
			|||||||
                                                       v['timeout']))
 | 
					                                                       v['timeout']))
 | 
				
			||||||
                          for k, v in csrs_found.items()])
 | 
					                          for k, v in csrs_found.items()])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def create_rpc_dispatcher(self):
 | 
					 | 
				
			||||||
        return [self]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def vpnservice_updated(self, context, **kwargs):
 | 
					    def vpnservice_updated(self, context, **kwargs):
 | 
				
			||||||
        """Handle VPNaaS service driver change notifications."""
 | 
					        """Handle VPNaaS service driver change notifications."""
 | 
				
			||||||
        LOG.debug(_("Handling VPN service update notification '%s'"),
 | 
					        LOG.debug(_("Handling VPN service update notification '%s'"),
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -504,10 +504,8 @@ class IPsecDriver(device_drivers.DeviceDriver):
 | 
				
			|||||||
        self.processes = {}
 | 
					        self.processes = {}
 | 
				
			||||||
        self.process_status_cache = {}
 | 
					        self.process_status_cache = {}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        self.conn.create_consumer(
 | 
					        self.endpoints = [self]
 | 
				
			||||||
            node_topic,
 | 
					        self.conn.create_consumer(node_topic, self.endpoints, fanout=False)
 | 
				
			||||||
            self.create_rpc_dispatcher(),
 | 
					 | 
				
			||||||
            fanout=False)
 | 
					 | 
				
			||||||
        self.conn.consume_in_threads()
 | 
					        self.conn.consume_in_threads()
 | 
				
			||||||
        self.agent_rpc = IPsecVpnDriverApi(topics.IPSEC_DRIVER_TOPIC, '1.0')
 | 
					        self.agent_rpc = IPsecVpnDriverApi(topics.IPSEC_DRIVER_TOPIC, '1.0')
 | 
				
			||||||
        self.process_status_cache_check = loopingcall.FixedIntervalLoopingCall(
 | 
					        self.process_status_cache_check = loopingcall.FixedIntervalLoopingCall(
 | 
				
			||||||
@@ -515,9 +513,6 @@ class IPsecDriver(device_drivers.DeviceDriver):
 | 
				
			|||||||
        self.process_status_cache_check.start(
 | 
					        self.process_status_cache_check.start(
 | 
				
			||||||
            interval=self.conf.ipsec.ipsec_status_check_interval)
 | 
					            interval=self.conf.ipsec.ipsec_status_check_interval)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def create_rpc_dispatcher(self):
 | 
					 | 
				
			||||||
        return [self]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def _update_nat(self, vpnservice, func):
 | 
					    def _update_nat(self, vpnservice, func):
 | 
				
			||||||
        """Setting up nat rule in iptables.
 | 
					        """Setting up nat rule in iptables.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -53,9 +53,6 @@ class CiscoCsrIPsecVpnDriverCallBack(rpc_compat.RpcCallback):
 | 
				
			|||||||
        super(CiscoCsrIPsecVpnDriverCallBack, self).__init__()
 | 
					        super(CiscoCsrIPsecVpnDriverCallBack, self).__init__()
 | 
				
			||||||
        self.driver = driver
 | 
					        self.driver = driver
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def create_rpc_dispatcher(self):
 | 
					 | 
				
			||||||
        return [self]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def get_vpn_services_on_host(self, context, host=None):
 | 
					    def get_vpn_services_on_host(self, context, host=None):
 | 
				
			||||||
        """Retuns info on the vpnservices on the host."""
 | 
					        """Retuns info on the vpnservices on the host."""
 | 
				
			||||||
        plugin = self.driver.service_plugin
 | 
					        plugin = self.driver.service_plugin
 | 
				
			||||||
@@ -88,12 +85,10 @@ class CiscoCsrIPsecVPNDriver(service_drivers.VpnDriver):
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    def __init__(self, service_plugin):
 | 
					    def __init__(self, service_plugin):
 | 
				
			||||||
        super(CiscoCsrIPsecVPNDriver, self).__init__(service_plugin)
 | 
					        super(CiscoCsrIPsecVPNDriver, self).__init__(service_plugin)
 | 
				
			||||||
        self.callbacks = CiscoCsrIPsecVpnDriverCallBack(self)
 | 
					        self.endpoints = [CiscoCsrIPsecVpnDriverCallBack(self)]
 | 
				
			||||||
        self.conn = rpc_compat.create_connection(new=True)
 | 
					        self.conn = rpc_compat.create_connection(new=True)
 | 
				
			||||||
        self.conn.create_consumer(
 | 
					        self.conn.create_consumer(
 | 
				
			||||||
            topics.CISCO_IPSEC_DRIVER_TOPIC,
 | 
					            topics.CISCO_IPSEC_DRIVER_TOPIC, self.endpoints, fanout=False)
 | 
				
			||||||
            self.callbacks.create_rpc_dispatcher(),
 | 
					 | 
				
			||||||
            fanout=False)
 | 
					 | 
				
			||||||
        self.conn.consume_in_threads()
 | 
					        self.conn.consume_in_threads()
 | 
				
			||||||
        self.agent_rpc = CiscoCsrIPsecVpnAgentApi(
 | 
					        self.agent_rpc = CiscoCsrIPsecVpnAgentApi(
 | 
				
			||||||
            topics.CISCO_IPSEC_AGENT_TOPIC, BASE_IPSEC_VERSION)
 | 
					            topics.CISCO_IPSEC_AGENT_TOPIC, BASE_IPSEC_VERSION)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -40,9 +40,6 @@ class IPsecVpnDriverCallBack(rpc_compat.RpcCallback):
 | 
				
			|||||||
        super(IPsecVpnDriverCallBack, self).__init__()
 | 
					        super(IPsecVpnDriverCallBack, self).__init__()
 | 
				
			||||||
        self.driver = driver
 | 
					        self.driver = driver
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def create_rpc_dispatcher(self):
 | 
					 | 
				
			||||||
        return [self]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def get_vpn_services_on_host(self, context, host=None):
 | 
					    def get_vpn_services_on_host(self, context, host=None):
 | 
				
			||||||
        """Returns the vpnservices on the host."""
 | 
					        """Returns the vpnservices on the host."""
 | 
				
			||||||
        plugin = self.driver.service_plugin
 | 
					        plugin = self.driver.service_plugin
 | 
				
			||||||
@@ -73,12 +70,10 @@ class IPsecVPNDriver(service_drivers.VpnDriver):
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    def __init__(self, service_plugin):
 | 
					    def __init__(self, service_plugin):
 | 
				
			||||||
        super(IPsecVPNDriver, self).__init__(service_plugin)
 | 
					        super(IPsecVPNDriver, self).__init__(service_plugin)
 | 
				
			||||||
        self.callbacks = IPsecVpnDriverCallBack(self)
 | 
					        self.endpoints = [IPsecVpnDriverCallBack(self)]
 | 
				
			||||||
        self.conn = rpc_compat.create_connection(new=True)
 | 
					        self.conn = rpc_compat.create_connection(new=True)
 | 
				
			||||||
        self.conn.create_consumer(
 | 
					        self.conn.create_consumer(
 | 
				
			||||||
            topics.IPSEC_DRIVER_TOPIC,
 | 
					            topics.IPSEC_DRIVER_TOPIC, self.endpoints, fanout=False)
 | 
				
			||||||
            self.callbacks.create_rpc_dispatcher(),
 | 
					 | 
				
			||||||
            fanout=False)
 | 
					 | 
				
			||||||
        self.conn.consume_in_threads()
 | 
					        self.conn.consume_in_threads()
 | 
				
			||||||
        self.agent_rpc = IPsecVpnAgentApi(
 | 
					        self.agent_rpc = IPsecVpnAgentApi(
 | 
				
			||||||
            topics.IPSEC_AGENT_TOPIC, BASE_IPSEC_VERSION)
 | 
					            topics.IPSEC_AGENT_TOPIC, BASE_IPSEC_VERSION)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -33,8 +33,6 @@ from neutron.db import portbindings_db  # noqa
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
RESTPROXY_PKG_PATH = 'neutron.plugins.bigswitch.plugin'
 | 
					RESTPROXY_PKG_PATH = 'neutron.plugins.bigswitch.plugin'
 | 
				
			||||||
NOTIFIER = 'neutron.plugins.bigswitch.plugin.AgentNotifierApi'
 | 
					NOTIFIER = 'neutron.plugins.bigswitch.plugin.AgentNotifierApi'
 | 
				
			||||||
CALLBACKS = 'neutron.plugins.bigswitch.plugin.RestProxyCallbacks'
 | 
					 | 
				
			||||||
DISPATCHER = CALLBACKS + '.create_rpc_dispatcher'
 | 
					 | 
				
			||||||
CERTFETCH = 'neutron.plugins.bigswitch.servermanager.ServerPool._fetch_cert'
 | 
					CERTFETCH = 'neutron.plugins.bigswitch.servermanager.ServerPool._fetch_cert'
 | 
				
			||||||
SERVER_MANAGER = 'neutron.plugins.bigswitch.servermanager'
 | 
					SERVER_MANAGER = 'neutron.plugins.bigswitch.servermanager'
 | 
				
			||||||
HTTPCON = 'neutron.plugins.bigswitch.servermanager.httplib.HTTPConnection'
 | 
					HTTPCON = 'neutron.plugins.bigswitch.servermanager.httplib.HTTPConnection'
 | 
				
			||||||
@@ -61,15 +59,11 @@ class BigSwitchTestBase(object):
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    def setup_patches(self):
 | 
					    def setup_patches(self):
 | 
				
			||||||
        self.plugin_notifier_p = mock.patch(NOTIFIER)
 | 
					        self.plugin_notifier_p = mock.patch(NOTIFIER)
 | 
				
			||||||
        # prevent rpc callback dispatcher from being created
 | 
					 | 
				
			||||||
        self.callbacks_p = mock.patch(DISPATCHER,
 | 
					 | 
				
			||||||
                                      new=lambda *args, **kwargs: None)
 | 
					 | 
				
			||||||
        # prevent any greenthreads from spawning
 | 
					        # prevent any greenthreads from spawning
 | 
				
			||||||
        self.spawn_p = mock.patch(SPAWN, new=lambda *args, **kwargs: None)
 | 
					        self.spawn_p = mock.patch(SPAWN, new=lambda *args, **kwargs: None)
 | 
				
			||||||
        # prevent the consistency watchdog from starting
 | 
					        # prevent the consistency watchdog from starting
 | 
				
			||||||
        self.watch_p = mock.patch(CWATCH, new=lambda *args, **kwargs: None)
 | 
					        self.watch_p = mock.patch(CWATCH, new=lambda *args, **kwargs: None)
 | 
				
			||||||
        self.addCleanup(db.clear_db)
 | 
					        self.addCleanup(db.clear_db)
 | 
				
			||||||
        self.callbacks_p.start()
 | 
					 | 
				
			||||||
        self.plugin_notifier_p.start()
 | 
					        self.plugin_notifier_p.start()
 | 
				
			||||||
        self.spawn_p.start()
 | 
					        self.spawn_p.start()
 | 
				
			||||||
        self.watch_p.start()
 | 
					        self.watch_p.start()
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -32,7 +32,7 @@ class RestProxySecurityGroupsTestCase(test_sg.SecurityGroupDBTestCase,
 | 
				
			|||||||
        super(RestProxySecurityGroupsTestCase, self).setUp(self.plugin_str)
 | 
					        super(RestProxySecurityGroupsTestCase, self).setUp(self.plugin_str)
 | 
				
			||||||
        plugin = manager.NeutronManager.get_plugin()
 | 
					        plugin = manager.NeutronManager.get_plugin()
 | 
				
			||||||
        self.notifier = plugin.notifier
 | 
					        self.notifier = plugin.notifier
 | 
				
			||||||
        self.rpc = plugin.callbacks
 | 
					        self.rpc = plugin.endpoints[0]
 | 
				
			||||||
        self.startHttpPatch()
 | 
					        self.startHttpPatch()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -62,7 +62,7 @@ class PortBindingTestCase(test_plugin.NeutronDbPluginV2TestCase):
 | 
				
			|||||||
                                 bound, status)
 | 
					                                 bound, status)
 | 
				
			||||||
            port_id = port['port']['id']
 | 
					            port_id = port['port']['id']
 | 
				
			||||||
            neutron_context = context.get_admin_context()
 | 
					            neutron_context = context.get_admin_context()
 | 
				
			||||||
            details = self.plugin.callbacks.get_device_details(
 | 
					            details = self.plugin.endpoints[0].get_device_details(
 | 
				
			||||||
                neutron_context, agent_id="theAgentId", device=port_id)
 | 
					                neutron_context, agent_id="theAgentId", device=port_id)
 | 
				
			||||||
            if bound:
 | 
					            if bound:
 | 
				
			||||||
                self.assertEqual(details['network_type'], 'local')
 | 
					                self.assertEqual(details['network_type'], 'local')
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -74,7 +74,8 @@ class TestMl2SecurityGroups(Ml2SecurityGroupsTestCase,
 | 
				
			|||||||
                                           req.get_response(self.api))
 | 
					                                           req.get_response(self.api))
 | 
				
			||||||
                    port_id = res['port']['id']
 | 
					                    port_id = res['port']['id']
 | 
				
			||||||
                    plugin = manager.NeutronManager.get_plugin()
 | 
					                    plugin = manager.NeutronManager.get_plugin()
 | 
				
			||||||
                    port_dict = plugin.callbacks.get_port_from_device(port_id)
 | 
					                    callbacks = plugin.endpoints[0]
 | 
				
			||||||
 | 
					                    port_dict = callbacks.get_port_from_device(port_id)
 | 
				
			||||||
                    self.assertEqual(port_id, port_dict['id'])
 | 
					                    self.assertEqual(port_id, port_dict['id'])
 | 
				
			||||||
                    self.assertEqual([security_group_id],
 | 
					                    self.assertEqual([security_group_id],
 | 
				
			||||||
                                     port_dict[ext_sg.SECURITYGROUPS])
 | 
					                                     port_dict[ext_sg.SECURITYGROUPS])
 | 
				
			||||||
@@ -85,7 +86,7 @@ class TestMl2SecurityGroups(Ml2SecurityGroupsTestCase,
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    def test_security_group_get_port_from_device_with_no_port(self):
 | 
					    def test_security_group_get_port_from_device_with_no_port(self):
 | 
				
			||||||
        plugin = manager.NeutronManager.get_plugin()
 | 
					        plugin = manager.NeutronManager.get_plugin()
 | 
				
			||||||
        port_dict = plugin.callbacks.get_port_from_device('bad_device_id')
 | 
					        port_dict = plugin.endpoints[0].get_port_from_device('bad_device_id')
 | 
				
			||||||
        self.assertIsNone(port_dict)
 | 
					        self.assertIsNone(port_dict)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -136,7 +136,8 @@ class TestOneConvergenceSecurityGroups(OneConvergenceSecurityGroupsTestCase,
 | 
				
			|||||||
                                           req.get_response(self.api))
 | 
					                                           req.get_response(self.api))
 | 
				
			||||||
                    port_id = res['port']['id']
 | 
					                    port_id = res['port']['id']
 | 
				
			||||||
                    plugin = manager.NeutronManager.get_plugin()
 | 
					                    plugin = manager.NeutronManager.get_plugin()
 | 
				
			||||||
                    port_dict = plugin.callbacks.get_port_from_device(port_id)
 | 
					                    callbacks = plugin.endpoints[0]
 | 
				
			||||||
 | 
					                    port_dict = callbacks.get_port_from_device(port_id)
 | 
				
			||||||
                    self.assertEqual(port_id, port_dict['id'])
 | 
					                    self.assertEqual(port_id, port_dict['id'])
 | 
				
			||||||
                    self.assertEqual([security_group_id],
 | 
					                    self.assertEqual([security_group_id],
 | 
				
			||||||
                                     port_dict[ext_sg.SECURITYGROUPS])
 | 
					                                     port_dict[ext_sg.SECURITYGROUPS])
 | 
				
			||||||
@@ -148,7 +149,7 @@ class TestOneConvergenceSecurityGroups(OneConvergenceSecurityGroupsTestCase,
 | 
				
			|||||||
    def test_security_group_get_port_from_device_with_no_port(self):
 | 
					    def test_security_group_get_port_from_device_with_no_port(self):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        plugin = manager.NeutronManager.get_plugin()
 | 
					        plugin = manager.NeutronManager.get_plugin()
 | 
				
			||||||
        port_dict = plugin.callbacks.get_port_from_device('bad_device_id')
 | 
					        port_dict = plugin.endpoints[0].get_port_from_device('bad_device_id')
 | 
				
			||||||
        self.assertIsNone(port_dict)
 | 
					        self.assertIsNone(port_dict)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -84,7 +84,8 @@ class TestOpenvswitchSecurityGroups(OpenvswitchSecurityGroupsTestCase,
 | 
				
			|||||||
                                           req.get_response(self.api))
 | 
					                                           req.get_response(self.api))
 | 
				
			||||||
                    port_id = res['port']['id']
 | 
					                    port_id = res['port']['id']
 | 
				
			||||||
                    plugin = manager.NeutronManager.get_plugin()
 | 
					                    plugin = manager.NeutronManager.get_plugin()
 | 
				
			||||||
                    port_dict = plugin.callbacks.get_port_from_device(port_id)
 | 
					                    callbacks = plugin.endpoints[0]
 | 
				
			||||||
 | 
					                    port_dict = callbacks.get_port_from_device(port_id)
 | 
				
			||||||
                    self.assertEqual(port_id, port_dict['id'])
 | 
					                    self.assertEqual(port_id, port_dict['id'])
 | 
				
			||||||
                    self.assertEqual([security_group_id],
 | 
					                    self.assertEqual([security_group_id],
 | 
				
			||||||
                                     port_dict[ext_sg.SECURITYGROUPS])
 | 
					                                     port_dict[ext_sg.SECURITYGROUPS])
 | 
				
			||||||
@@ -95,7 +96,7 @@ class TestOpenvswitchSecurityGroups(OpenvswitchSecurityGroupsTestCase,
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    def test_security_group_get_port_from_device_with_no_port(self):
 | 
					    def test_security_group_get_port_from_device_with_no_port(self):
 | 
				
			||||||
        plugin = manager.NeutronManager.get_plugin()
 | 
					        plugin = manager.NeutronManager.get_plugin()
 | 
				
			||||||
        port_dict = plugin.callbacks.get_port_from_device('bad_device_id')
 | 
					        port_dict = plugin.endpoints[0].get_port_from_device('bad_device_id')
 | 
				
			||||||
        self.assertIsNone(port_dict)
 | 
					        self.assertIsNone(port_dict)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -73,7 +73,7 @@ class TestRyuSecurityGroups(RyuSecurityGroupsTestCase,
 | 
				
			|||||||
                                       req.get_response(self.api))
 | 
					                                       req.get_response(self.api))
 | 
				
			||||||
                port_id = res['port']['id']
 | 
					                port_id = res['port']['id']
 | 
				
			||||||
                plugin = manager.NeutronManager.get_plugin()
 | 
					                plugin = manager.NeutronManager.get_plugin()
 | 
				
			||||||
                port_dict = plugin.callbacks.get_port_from_device(port_id)
 | 
					                port_dict = plugin.endpoints[0].get_port_from_device(port_id)
 | 
				
			||||||
                self.assertEqual(port_id, port_dict['id'])
 | 
					                self.assertEqual(port_id, port_dict['id'])
 | 
				
			||||||
                self.assertEqual([security_group_id],
 | 
					                self.assertEqual([security_group_id],
 | 
				
			||||||
                                 port_dict[ext_sg.SECURITYGROUPS])
 | 
					                                 port_dict[ext_sg.SECURITYGROUPS])
 | 
				
			||||||
@@ -84,7 +84,7 @@ class TestRyuSecurityGroups(RyuSecurityGroupsTestCase,
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    def test_security_group_get_port_from_device_with_no_port(self):
 | 
					    def test_security_group_get_port_from_device_with_no_port(self):
 | 
				
			||||||
        plugin = manager.NeutronManager.get_plugin()
 | 
					        plugin = manager.NeutronManager.get_plugin()
 | 
				
			||||||
        port_dict = plugin.callbacks.get_port_from_device('bad_device_id')
 | 
					        port_dict = plugin.endpoints[0].get_port_from_device('bad_device_id')
 | 
				
			||||||
        self.assertIsNone(port_dict)
 | 
					        self.assertIsNone(port_dict)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -41,7 +41,7 @@ class TestFirewallCallbacks(test_db_firewall.FirewallPluginDbTestCase):
 | 
				
			|||||||
    def setUp(self):
 | 
					    def setUp(self):
 | 
				
			||||||
        super(TestFirewallCallbacks,
 | 
					        super(TestFirewallCallbacks,
 | 
				
			||||||
              self).setUp(fw_plugin=FW_PLUGIN_KLASS)
 | 
					              self).setUp(fw_plugin=FW_PLUGIN_KLASS)
 | 
				
			||||||
        self.callbacks = self.plugin.callbacks
 | 
					        self.callbacks = self.plugin.endpoints[0]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def test_set_firewall_status(self):
 | 
					    def test_set_firewall_status(self):
 | 
				
			||||||
        ctx = context.get_admin_context()
 | 
					        ctx = context.get_admin_context()
 | 
				
			||||||
@@ -210,7 +210,7 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin):
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    def setUp(self):
 | 
					    def setUp(self):
 | 
				
			||||||
        super(TestFirewallPluginBase, self).setUp(fw_plugin=FW_PLUGIN_KLASS)
 | 
					        super(TestFirewallPluginBase, self).setUp(fw_plugin=FW_PLUGIN_KLASS)
 | 
				
			||||||
        self.callbacks = self.plugin.callbacks
 | 
					        self.callbacks = self.plugin.endpoints[0]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def test_create_second_firewall_not_permitted(self):
 | 
					    def test_create_second_firewall_not_permitted(self):
 | 
				
			||||||
        with self.firewall():
 | 
					        with self.firewall():
 | 
				
			||||||
@@ -342,7 +342,7 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin):
 | 
				
			|||||||
                for k, v in attrs.iteritems():
 | 
					                for k, v in attrs.iteritems():
 | 
				
			||||||
                    self.assertEqual(fw_db[k], v)
 | 
					                    self.assertEqual(fw_db[k], v)
 | 
				
			||||||
            # cleanup the pending firewall
 | 
					            # cleanup the pending firewall
 | 
				
			||||||
            self.plugin.callbacks.firewall_deleted(ctx, fw_id)
 | 
					            self.plugin.endpoints[0].firewall_deleted(ctx, fw_id)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def test_delete_firewall_after_agent_delete(self):
 | 
					    def test_delete_firewall_after_agent_delete(self):
 | 
				
			||||||
        ctx = context.get_admin_context()
 | 
					        ctx = context.get_admin_context()
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -83,31 +83,31 @@ class AgentPluginReportState(base.BaseTestCase):
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
class AgentRPCMethods(base.BaseTestCase):
 | 
					class AgentRPCMethods(base.BaseTestCase):
 | 
				
			||||||
    def test_create_consumers(self):
 | 
					    def test_create_consumers(self):
 | 
				
			||||||
        dispatcher = mock.Mock()
 | 
					        endpoints = [mock.Mock()]
 | 
				
			||||||
        expected = [
 | 
					        expected = [
 | 
				
			||||||
            mock.call(new=True),
 | 
					            mock.call(new=True),
 | 
				
			||||||
            mock.call().create_consumer('foo-topic-op', dispatcher,
 | 
					            mock.call().create_consumer('foo-topic-op', endpoints,
 | 
				
			||||||
                                        fanout=True),
 | 
					                                        fanout=True),
 | 
				
			||||||
            mock.call().consume_in_threads()
 | 
					            mock.call().consume_in_threads()
 | 
				
			||||||
        ]
 | 
					        ]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        call_to_patch = 'neutron.common.rpc_compat.create_connection'
 | 
					        call_to_patch = 'neutron.common.rpc_compat.create_connection'
 | 
				
			||||||
        with mock.patch(call_to_patch) as create_connection:
 | 
					        with mock.patch(call_to_patch) as create_connection:
 | 
				
			||||||
            rpc.create_consumers(dispatcher, 'foo', [('topic', 'op')])
 | 
					            rpc.create_consumers(endpoints, 'foo', [('topic', 'op')])
 | 
				
			||||||
            create_connection.assert_has_calls(expected)
 | 
					            create_connection.assert_has_calls(expected)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def test_create_consumers_with_node_name(self):
 | 
					    def test_create_consumers_with_node_name(self):
 | 
				
			||||||
        dispatcher = mock.Mock()
 | 
					        endpoints = [mock.Mock()]
 | 
				
			||||||
        expected = [
 | 
					        expected = [
 | 
				
			||||||
            mock.call(new=True),
 | 
					            mock.call(new=True),
 | 
				
			||||||
            mock.call().create_consumer('foo-topic-op', dispatcher,
 | 
					            mock.call().create_consumer('foo-topic-op', endpoints,
 | 
				
			||||||
                                        fanout=True),
 | 
					                                        fanout=True),
 | 
				
			||||||
            mock.call().create_consumer('foo-topic-op.node1', dispatcher,
 | 
					            mock.call().create_consumer('foo-topic-op.node1', endpoints,
 | 
				
			||||||
                                        fanout=False),
 | 
					                                        fanout=False),
 | 
				
			||||||
            mock.call().consume_in_threads()
 | 
					            mock.call().consume_in_threads()
 | 
				
			||||||
        ]
 | 
					        ]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        call_to_patch = 'neutron.common.rpc_compat.create_connection'
 | 
					        call_to_patch = 'neutron.common.rpc_compat.create_connection'
 | 
				
			||||||
        with mock.patch(call_to_patch) as create_connection:
 | 
					        with mock.patch(call_to_patch) as create_connection:
 | 
				
			||||||
            rpc.create_consumers(dispatcher, 'foo', [('topic', 'op', 'node1')])
 | 
					            rpc.create_consumers(endpoints, 'foo', [('topic', 'op', 'node1')])
 | 
				
			||||||
            create_connection.assert_has_calls(expected)
 | 
					            create_connection.assert_has_calls(expected)
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user