Merge "Introduced rpc_compat.create_connection()"

This commit is contained in:
Jenkins 2014-06-16 23:11:30 +00:00 committed by Gerrit Code Review
commit d379170109
29 changed files with 40 additions and 54 deletions

View File

@ -21,7 +21,6 @@ from neutron.common import rpc_compat
from neutron.common import topics
from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc
from neutron.openstack.common import timeutils
@ -40,7 +39,7 @@ def create_consumers(dispatcher, prefix, topic_details):
:returns: A common Connection.
"""
connection = rpc.create_connection(new=True)
connection = rpc_compat.create_connection(new=True)
for details in topic_details:
topic, operation, node_name = itertools.islice(
itertools.chain(details, [None]), 3)

View File

@ -60,7 +60,7 @@ class Service(service.Service):
def start(self):
super(Service, self).start()
self.conn = rpc.create_connection(new=True)
self.conn = create_connection(new=True)
LOG.debug("Creating Consumer connection for Service %s" %
self.topic)
@ -93,6 +93,10 @@ class Service(service.Service):
super(Service, self).stop()
# functions
create_connection = rpc.create_connection
# exceptions
RPCException = rpc_common.RPCException
RemoteError = rpc_common.RemoteError

View File

@ -83,7 +83,6 @@ from neutron import manager
from neutron.openstack.common import excutils
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc
from neutron.plugins.bigswitch import config as pl_config
from neutron.plugins.bigswitch.db import porttracker_db
from neutron.plugins.bigswitch import extensions
@ -507,7 +506,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
LOG.debug(_("NeutronRestProxyV2: initialization done"))
def _setup_rpc(self):
self.conn = rpc.create_connection(new=True)
self.conn = rpc_compat.create_connection(new=True)
self.topic = topics.PLUGIN
self.notifier = AgentNotifierApi(topics.AGENT)
# init dhcp agent support

View File

@ -51,7 +51,6 @@ from neutron.extensions import securitygroup as ext_sg
from neutron.openstack.common import context
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc
from neutron.plugins.brocade.db import models as brocade_db
from neutron.plugins.brocade import vlanbm as vbm
from neutron.plugins.common import constants as svc_constants
@ -264,7 +263,7 @@ class BrocadePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
self.rpc_context = context.RequestContext('neutron', 'neutron',
is_admin=False)
self.conn = rpc.create_connection(new=True)
self.conn = rpc_compat.create_connection(new=True)
self.callbacks = BridgeRpcCallbacks()
self.dispatcher = self.callbacks.create_rpc_dispatcher()
for svc_topic in self.service_topics.values():

View File

@ -45,7 +45,6 @@ from neutron.extensions import portbindings
from neutron.extensions import providernet
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc
from neutron.openstack.common import uuidutils as uuidutils
from neutron.plugins.cisco.common import cisco_constants as c_const
from neutron.plugins.cisco.common import cisco_credentials_v2 as c_cred
@ -137,7 +136,7 @@ class N1kvNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
# RPC support
self.service_topics = {svc_constants.CORE: topics.PLUGIN,
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
self.conn = rpc.create_connection(new=True)
self.conn = rpc_compat.create_connection(new=True)
self.dispatcher = N1kvRpcCallbacks().create_rpc_dispatcher()
for svc_topic in self.service_topics.values():
self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)

View File

@ -20,6 +20,7 @@ from oslo.config import cfg
from neutron.api.v2 import attributes
from neutron.common import exceptions as n_exc
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.db import agents_db
from neutron.db import db_base_plugin_v2
@ -30,7 +31,6 @@ from neutron.db import quota_db # noqa
from neutron.extensions import portbindings
from neutron.extensions import providernet as provider
from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc
from neutron.plugins.common import constants as svc_constants
from neutron.plugins.common import utils as plugin_utils
from neutron.plugins.hyperv import agent_notifier_api
@ -187,7 +187,7 @@ class HyperVNeutronPlugin(agents_db.AgentDbMixin,
# RPC support
self.service_topics = {svc_constants.CORE: topics.PLUGIN,
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
self.conn = rpc.create_connection(new=True)
self.conn = rpc_compat.create_connection(new=True)
self.notifier = agent_notifier_api.AgentNotifierApi(
topics.AGENT)
self.callbacks = rpc_callbacks.HyperVRpcCallbacks(self.notifier)

View File

@ -35,7 +35,6 @@ from neutron.db import quota_db # noqa
from neutron.extensions import portbindings
from neutron.openstack.common import excutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc
from neutron.plugins.ibm.common import config # noqa
from neutron.plugins.ibm.common import constants
from neutron.plugins.ibm.common import exceptions as sdnve_exc
@ -141,7 +140,7 @@ class SdnvePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
def setup_rpc(self):
# RPC support
self.topic = topics.PLUGIN
self.conn = rpc.create_connection(new=True)
self.conn = rpc_compat.create_connection(new=True)
self.notifier = AgentNotifierApi(topics.AGENT)
self.callbacks = SdnveRpcCallbacks(self.notifier)
self.dispatcher = self.callbacks.create_rpc_dispatcher()

View File

@ -45,7 +45,6 @@ from neutron.extensions import providernet as provider
from neutron import manager
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc
from neutron.plugins.common import constants as svc_constants
from neutron.plugins.common import utils as plugin_utils
from neutron.plugins.linuxbridge.common import constants
@ -283,7 +282,7 @@ class LinuxBridgePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
# RPC support
self.service_topics = {svc_constants.CORE: topics.PLUGIN,
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
self.conn = rpc.create_connection(new=True)
self.conn = rpc_compat.create_connection(new=True)
self.callbacks = LinuxBridgeRpcCallbacks()
self.dispatcher = self.callbacks.create_rpc_dispatcher()
for svc_topic in self.service_topics.values():

View File

@ -47,7 +47,6 @@ from neutron.extensions import portbindings
from neutron.extensions import securitygroup as ext_sg
from neutron.openstack.common import excutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc
from neutron.plugins.midonet.common import config # noqa
from neutron.plugins.midonet.common import net_util
from neutron.plugins.midonet import midonet_lib
@ -384,7 +383,7 @@ class MidonetPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
def setup_rpc(self):
# RPC support
self.topic = topics.PLUGIN
self.conn = rpc.create_connection(new=True)
self.conn = rpc_compat.create_connection(new=True)
self.callbacks = MidoRpcCallbacks()
self.dispatcher = self.callbacks.create_rpc_dispatcher()
self.conn.create_consumer(self.topic, self.dispatcher,

View File

@ -23,6 +23,7 @@ from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.v2 import attributes
from neutron.common import constants as const
from neutron.common import exceptions as exc
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.db import agentschedulers_db
from neutron.db import allowedaddresspairs_db as addr_pair_db
@ -44,7 +45,6 @@ from neutron.openstack.common import importutils
from neutron.openstack.common import jsonutils
from neutron.openstack.common import lockutils
from neutron.openstack.common import log
from neutron.openstack.common import rpc as c_rpc
from neutron.plugins.common import constants as service_constants
from neutron.plugins.ml2.common import exceptions as ml2_exc
from neutron.plugins.ml2 import config # noqa
@ -128,7 +128,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
def start_rpc_listener(self):
self.callbacks = rpc.RpcCallbacks(self.notifier, self.type_manager)
self.topic = topics.PLUGIN
self.conn = c_rpc.create_connection(new=True)
self.conn = rpc_compat.create_connection(new=True)
self.dispatcher = self.callbacks.create_rpc_dispatcher()
self.conn.create_consumer(self.topic, self.dispatcher,
fanout=False)

View File

@ -25,6 +25,7 @@ from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.v2 import attributes
from neutron.common import constants as q_const
from neutron.common import exceptions as n_exc
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.common import utils
from neutron.db import agentschedulers_db
@ -40,7 +41,6 @@ from neutron.extensions import portbindings
from neutron.extensions import providernet as provider
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc
from neutron.plugins.common import constants as svc_constants
from neutron.plugins.common import utils as plugin_utils
from neutron.plugins.mlnx import agent_notify_api
@ -119,7 +119,7 @@ class MellanoxEswitchPlugin(db_base_plugin_v2.NeutronDbPluginV2,
# RPC support
self.service_topics = {svc_constants.CORE: topics.PLUGIN,
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
self.conn = rpc.create_connection(new=True)
self.conn = rpc_compat.create_connection(new=True)
self.callbacks = rpc_callbacks.MlnxRpcCallbacks()
self.dispatcher = self.callbacks.create_rpc_dispatcher()
for svc_topic in self.service_topics.values():

View File

@ -41,7 +41,6 @@ from neutron.extensions import portbindings
from neutron.openstack.common import excutils
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc
from neutron.openstack.common import uuidutils
from neutron.plugins.common import constants as svc_constants
from neutron.plugins.nec.common import config
@ -137,7 +136,7 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
def setup_rpc(self):
self.service_topics = {svc_constants.CORE: topics.PLUGIN,
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
self.conn = rpc.create_connection(new=True)
self.conn = rpc_compat.create_connection(new=True)
self.notifier = NECPluginV2AgentNotifierApi(topics.AGENT)
self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()

View File

@ -42,7 +42,6 @@ from neutron.extensions import portbindings
from neutron.openstack.common import excutils
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc
from neutron.plugins.common import constants as svc_constants
import neutron.plugins.oneconvergence.lib.config # noqa
import neutron.plugins.oneconvergence.lib.exception as nvsdexception
@ -160,7 +159,7 @@ class OneConvergencePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
# RPC support
self.service_topics = {svc_constants.CORE: topics.PLUGIN,
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
self.conn = rpc.create_connection(new=True)
self.conn = rpc_compat.create_connection(new=True)
self.notifier = NVSDPluginV2AgentNotifierApi(topics.AGENT)
self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = (
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()

View File

@ -48,7 +48,6 @@ from neutron.extensions import providernet as provider
from neutron import manager
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc
from neutron.plugins.common import constants as svc_constants
from neutron.plugins.common import utils as plugin_utils
from neutron.plugins.openvswitch.common import config # noqa
@ -336,7 +335,7 @@ class OVSNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
# RPC support
self.service_topics = {svc_constants.CORE: topics.PLUGIN,
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
self.conn = rpc.create_connection(new=True)
self.conn = rpc_compat.create_connection(new=True)
self.notifier = AgentNotifierApi(topics.AGENT)
self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = (
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()

View File

@ -39,7 +39,6 @@ from neutron.db import securitygroups_rpc_base as sg_db_rpc
from neutron.extensions import portbindings
from neutron.openstack.common import excutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc
from neutron.plugins.common import constants as svc_constants
from neutron.plugins.ryu.common import config # noqa
from neutron.plugins.ryu.db import api_v2 as db_api_v2
@ -143,7 +142,7 @@ class RyuNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
def _setup_rpc(self):
self.service_topics = {svc_constants.CORE: topics.PLUGIN,
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
self.conn = rpc.create_connection(new=True)
self.conn = rpc_compat.create_connection(new=True)
self.notifier = AgentNotifierApi(topics.AGENT)
self.callbacks = RyuRpcCallbacks(self.ofp_api_host)
self.dispatcher = self.callbacks.create_rpc_dispatcher()

View File

@ -19,10 +19,10 @@ from oslo.config import cfg
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.common import constants as const
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc
from neutron.plugins.vmware.common import config
from neutron.plugins.vmware.common import exceptions as nsx_exc
from neutron.plugins.vmware.dhcp_meta import combined
@ -69,7 +69,7 @@ class DhcpMetadataAccess(object):
def _setup_rpc_dhcp_metadata(self, notifier=None):
self.topic = topics.PLUGIN
self.conn = rpc.create_connection(new=True)
self.conn = rpc_compat.create_connection(new=True)
self.dispatcher = nsx_rpc.NSXRpcCallbacks().create_rpc_dispatcher()
self.conn.create_consumer(self.topic, self.dispatcher, fanout=False)
self.agent_notifiers[const.AGENT_TYPE_DHCP] = (

View File

@ -28,7 +28,6 @@ from neutron.db import api as qdbapi
from neutron.db.firewall import firewall_db
from neutron.extensions import firewall as fw_ext
from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc
from neutron.plugins.common import constants as const
@ -169,7 +168,7 @@ class FirewallPlugin(firewall_db.Firewall_db_mixin):
self.callbacks = FirewallCallbacks(self)
self.conn = rpc.create_connection(new=True)
self.conn = rpc_compat.create_connection(new=True)
self.conn.create_consumer(
topics.FIREWALL_PLUGIN,
self.callbacks.create_rpc_dispatcher(),

View File

@ -32,7 +32,6 @@ from neutron.db import l3_gwmode_db
from neutron.db import l3_rpc_base
from neutron.db import model_base
from neutron.openstack.common import importutils
from neutron.openstack.common import rpc
from neutron.plugins.common import constants
@ -75,7 +74,7 @@ class L3RouterPlugin(db_base_plugin_v2.CommonDbMixin,
def setup_rpc(self):
# RPC support
self.topic = topics.L3PLUGIN
self.conn = rpc.create_connection(new=True)
self.conn = rpc_compat.create_connection(new=True)
self.agent_notifiers.update(
{q_const.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotify})
self.callbacks = L3RouterPluginRpcCallbacks()

View File

@ -31,7 +31,6 @@ from neutron.extensions import lbaas_agentscheduler
from neutron.extensions import portbindings
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc
from neutron.plugins.common import constants
from neutron.services.loadbalancer.drivers import abstract_driver
@ -346,7 +345,7 @@ class AgentDriverBase(abstract_driver.LoadBalancerAbstractDriver):
return
self.plugin.agent_callbacks = LoadBalancerCallbacks(self.plugin)
self.plugin.conn = rpc.create_connection(new=True)
self.plugin.conn = rpc_compat.create_connection(new=True)
self.plugin.conn.create_consumer(
topics.LOADBALANCER_PLUGIN,
self.plugin.agent_callbacks.create_rpc_dispatcher(),

View File

@ -15,10 +15,10 @@
# under the License.
from neutron.api.rpc.agentnotifiers import metering_rpc_agent_api
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.db.metering import metering_db
from neutron.db.metering import metering_rpc
from neutron.openstack.common import rpc
class MeteringPlugin(metering_db.MeteringDbMixin):
@ -30,7 +30,7 @@ class MeteringPlugin(metering_db.MeteringDbMixin):
self.callbacks = metering_rpc.MeteringRpcCallbacks(self)
self.conn = rpc.create_connection(new=True)
self.conn = rpc_compat.create_connection(new=True)
self.conn.create_consumer(
topics.METERING_PLUGIN,
self.callbacks.create_rpc_dispatcher(),

View File

@ -29,7 +29,6 @@ from neutron import context as ctx
from neutron.openstack.common import lockutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
from neutron.openstack.common import rpc
from neutron.plugins.common import constants
from neutron.plugins.common import utils as plugin_utils
from neutron.services.vpn.common import topics
@ -192,7 +191,7 @@ class CiscoCsrIPsecDriver(device_drivers.DeviceDriver):
# TODO(ihrachys): we can't use RpcCallback here due to
# inheritance issues
self.host = host
self.conn = rpc.create_connection(new=True)
self.conn = rpc_compat.create_connection(new=True)
context = ctx.get_admin_context_without_session()
node_topic = '%s.%s' % (topics.CISCO_IPSEC_AGENT_TOPIC, self.host)

View File

@ -33,7 +33,6 @@ from neutron import context
from neutron.openstack.common import lockutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
from neutron.openstack.common import rpc
from neutron.plugins.common import constants
from neutron.plugins.common import utils as plugin_utils
from neutron.services.vpn.common import topics
@ -495,7 +494,7 @@ class IPsecDriver(device_drivers.DeviceDriver):
self.conf = self.agent.conf
self.root_helper = self.agent.root_helper
self.host = host
self.conn = rpc.create_connection(new=True)
self.conn = rpc_compat.create_connection(new=True)
self.context = context.get_admin_context_without_session()
self.topic = topics.IPSEC_AGENT_TOPIC
node_topic = '%s.%s' % (self.topic, self.host)

View File

@ -20,7 +20,6 @@ from neutron.common import rpc as n_rpc
from neutron.common import rpc_compat
from neutron.openstack.common import excutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc
from neutron.plugins.common import constants
from neutron.services.vpn.common import topics
from neutron.services.vpn import service_drivers
@ -91,7 +90,7 @@ class CiscoCsrIPsecVPNDriver(service_drivers.VpnDriver):
def __init__(self, service_plugin):
super(CiscoCsrIPsecVPNDriver, self).__init__(service_plugin)
self.callbacks = CiscoCsrIPsecVpnDriverCallBack(self)
self.conn = rpc.create_connection(new=True)
self.conn = rpc_compat.create_connection(new=True)
self.conn.create_consumer(
topics.CISCO_IPSEC_DRIVER_TOPIC,
self.callbacks.create_rpc_dispatcher(),

View File

@ -19,7 +19,6 @@ import netaddr
from neutron.common import rpc as n_rpc
from neutron.common import rpc_compat
from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc
from neutron.services.vpn.common import topics
from neutron.services.vpn import service_drivers
@ -76,7 +75,7 @@ class IPsecVPNDriver(service_drivers.VpnDriver):
def __init__(self, service_plugin):
super(IPsecVPNDriver, self).__init__(service_plugin)
self.callbacks = IPsecVpnDriverCallBack(self)
self.conn = rpc.create_connection(new=True)
self.conn = rpc_compat.create_connection(new=True)
self.conn.create_consumer(
topics.IPSEC_DRIVER_TOPIC,
self.callbacks.create_rpc_dispatcher(),

View File

@ -399,7 +399,7 @@ class TestCiscoCsrIPsecDeviceDriverSyncStatuses(base.BaseTestCase):
def setUp(self):
super(TestCiscoCsrIPsecDeviceDriverSyncStatuses, self).setUp()
for klass in ['neutron.openstack.common.rpc.create_connection',
for klass in ['neutron.common.rpc_compat.create_connection',
'neutron.context.get_admin_context_without_session',
'neutron.openstack.common.'
'loopingcall.FixedIntervalLoopingCall']:

View File

@ -47,7 +47,7 @@ class TestIPsecDeviceDriver(base.BaseTestCase):
'os.makedirs',
'os.path.isdir',
'neutron.agent.linux.utils.replace_file',
'neutron.openstack.common.rpc.create_connection',
'neutron.common.rpc_compat.create_connection',
'neutron.services.vpn.device_drivers.ipsec.'
'OpenSwanProcess._gen_config_content',
'shutil.rmtree',

View File

@ -44,7 +44,7 @@ class TestCiscoIPsecDriverValidation(base.BaseTestCase):
def setUp(self):
super(TestCiscoIPsecDriverValidation, self).setUp()
mock.patch('neutron.openstack.common.rpc.create_connection').start()
mock.patch('neutron.common.rpc_compat.create_connection').start()
self.service_plugin = mock.Mock()
self.driver = ipsec_driver.CiscoCsrIPsecVPNDriver(self.service_plugin)
self.context = n_ctx.Context('some_user', 'some_tenant')
@ -284,7 +284,7 @@ class TestCiscoIPsecDriver(base.BaseTestCase):
super(TestCiscoIPsecDriver, self).setUp()
dbapi.configure_db()
self.addCleanup(dbapi.clear_db)
mock.patch('neutron.openstack.common.rpc.create_connection').start()
mock.patch('neutron.common.rpc_compat.create_connection').start()
l3_agent = mock.Mock()
l3_agent.host = FAKE_HOST

View File

@ -37,7 +37,7 @@ FAKE_HOST = 'fake_host'
class TestIPsecDriver(base.BaseTestCase):
def setUp(self):
super(TestIPsecDriver, self).setUp()
mock.patch('neutron.openstack.common.rpc.create_connection').start()
mock.patch('neutron.common.rpc_compat.create_connection').start()
l3_agent = mock.Mock()
l3_agent.host = FAKE_HOST

View File

@ -91,7 +91,7 @@ class AgentRPCMethods(base.BaseTestCase):
mock.call().consume_in_thread()
]
call_to_patch = 'neutron.openstack.common.rpc.create_connection'
call_to_patch = 'neutron.common.rpc_compat.create_connection'
with mock.patch(call_to_patch) as create_connection:
rpc.create_consumers(dispatcher, 'foo', [('topic', 'op')])
create_connection.assert_has_calls(expected)
@ -107,7 +107,7 @@ class AgentRPCMethods(base.BaseTestCase):
mock.call().consume_in_thread()
]
call_to_patch = 'neutron.openstack.common.rpc.create_connection'
call_to_patch = 'neutron.common.rpc_compat.create_connection'
with mock.patch(call_to_patch) as create_connection:
rpc.create_consumers(dispatcher, 'foo', [('topic', 'op', 'node1')])
create_connection.assert_has_calls(expected)