shim neutron rpc with neutron-lib

The common rpc and exceptions were rehomed into
neutron-lib with [1]. This patch shims those rehomed
modules in neutron to switch over to neutron-lib's
versions under the covers.

To do so:
- The rpc and common exceptions are changed to
reference their counterpart in neutron-lib effectively
swapping the impl over to neutron-lib.
- The fake_notifier is removed from neutron and lib's
version is used instead.
- The rpc tests are removed; they live in lib now.
- A few unit test related changes are required
including changing mock.patch to mock.patch.object,
changing the mock checks for a few UTs as they don't
quite work the same with the shim in place.
- Using the RPC fixture from neutron-lib rather than
that setup in neutron's base test class.

With this shim in place, consumers are effectively using
neutron-lib's RPC plumbing and thus we can move consumers
over to neutron-lib's version at will. Once all
consumers are moved over we can come back and remove
the RPC logic from neutron and follow-up with a consumption
patch.

NeutronLibImpact

[1] https://review.openstack.org/#/c/319328/

Change-Id: I87685be8764a152ac24366f13e190de9d4f6f8d8
changes/25/586525/5
Boden R 5 years ago
parent e59013b9e8
commit 4bd2f0e8f7

@ -24,6 +24,7 @@ from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources as callback_resources
from neutron_lib import constants
from neutron_lib.plugins import utils
from neutron_lib import rpc as lib_rpc
from oslo_log import log as logging
import oslo_messaging
from oslo_utils import uuidutils
@ -83,7 +84,7 @@ class PluginReportStateAPI(object):
def report_state(self, context, agent_state, use_call=False):
cctxt = self.client.prepare(
timeout=n_rpc.TRANSPORT.conf.rpc_response_timeout)
timeout=lib_rpc.TRANSPORT.conf.rpc_response_timeout)
# add unique identifier to a report
# that can be logged on server side.
# This create visible correspondence between events on

@ -13,343 +13,104 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib import exceptions as e
from neutron_lib import exceptions
from neutron_lib.exceptions import l3
from neutron_lib.exceptions import qos
from neutron._i18n import _
# TODO(boden): remove rpc shims
PortBindingNotFound = e.PortBindingNotFound
class SubnetPoolNotFound(e.NotFound):
message = _("Subnet pool %(subnetpool_id)s could not be found.")
class QosPolicyNotFound(e.NotFound):
message = _("QoS policy %(policy_id)s could not be found.")
class QosRuleNotFound(e.NotFound):
message = _("QoS rule %(rule_id)s for policy %(policy_id)s "
"could not be found.")
class QoSPolicyDefaultAlreadyExists(e.Conflict):
message = _("A default QoS policy exists for project %(project_id)s.")
class PortQosBindingNotFound(e.NotFound):
message = _("QoS binding for port %(port_id)s and policy %(policy_id)s "
"could not be found.")
class PortQosBindingError(e.NeutronException):
message = _("QoS binding for port %(port_id)s and policy %(policy_id)s "
"could not be created: %(db_error)s.")
class NetworkQosBindingNotFound(e.NotFound):
message = _("QoS binding for network %(net_id)s and policy %(policy_id)s "
"could not be found.")
class FloatingIPQosBindingNotFound(e.NotFound):
message = _("QoS binding for floating IP %(fip_id)s and policy "
"%(policy_id)s could not be found.")
class FloatingIPQosBindingError(e.NeutronException):
message = _("QoS binding for floating IP %(fip_id)s and policy "
"%(policy_id)s could not be created: %(db_error)s.")
class NetworkQosBindingError(e.NeutronException):
message = _("QoS binding for network %(net_id)s and policy %(policy_id)s "
"could not be created: %(db_error)s.")
class PolicyRemoveAuthorizationError(e.NotAuthorized):
message = _("Failed to remove provided policy %(policy_id)s "
"because you are not authorized.")
class StateInvalid(e.BadRequest):
message = _("Unsupported port state: %(port_state)s.")
class QosPolicyInUse(e.InUse):
message = _("QoS Policy %(policy_id)s is used by "
"%(object_type)s %(object_id)s.")
class DhcpPortInUse(e.InUse):
message = _("Port %(port_id)s is already acquired by another DHCP agent")
class HostRoutesExhausted(e.BadRequest):
# NOTE(xchenum): probably make sense to use quota exceeded exception?
message = _("Unable to complete operation for %(subnet_id)s. "
"The number of host routes exceeds the limit %(quota)s.")
class DNSNameServersExhausted(e.BadRequest):
# NOTE(xchenum): probably make sense to use quota exceeded exception?
message = _("Unable to complete operation for %(subnet_id)s. "
"The number of DNS nameservers exceeds the limit %(quota)s.")
class FlatNetworkInUse(e.InUse):
message = _("Unable to create the flat network. "
"Physical network %(physical_network)s is in use.")
class NoNetworkFoundInMaximumAllowedAttempts(e.ServiceUnavailable):
message = _("Unable to create the network. "
"No available network found in maximum allowed attempts.")
class MalformedRequestBody(e.BadRequest):
message = _("Malformed request body: %(reason)s.")
class InvalidAllocationPool(e.BadRequest):
message = _("The allocation pool %(pool)s is not valid.")
class QosRuleNotSupported(e.Conflict):
message = _("Rule %(rule_type)s is not supported by port %(port_id)s")
class UnsupportedPortDeviceOwner(e.Conflict):
message = _("Operation %(op)s is not supported for device_owner "
"%(device_owner)s on port %(port_id)s.")
class OverlappingAllocationPools(e.Conflict):
message = _("Found overlapping allocation pools: "
"%(pool_1)s %(pool_2)s for subnet %(subnet_cidr)s.")
class OutOfBoundsAllocationPool(e.BadRequest):
message = _("The allocation pool %(pool)s spans "
"beyond the subnet cidr %(subnet_cidr)s.")
class BridgeDoesNotExist(e.NeutronException):
message = _("Bridge %(bridge)s does not exist.")
class QuotaResourceUnknown(e.NotFound):
message = _("Unknown quota resources %(unknown)s.")
class QuotaMissingTenant(e.BadRequest):
message = _("Tenant-id was missing from quota request.")
class InvalidQuotaValue(e.Conflict):
message = _("Change would make usage less than 0 for the following "
"resources: %(unders)s.")
class InvalidSharedSetting(e.Conflict):
message = _("Unable to reconfigure sharing settings for network "
"%(network)s. Multiple tenants are using it.")
class QoSRuleParameterConflict(e.Conflict):
message = _("Unable to add the rule with value %(rule_value)s to the "
"policy %(policy_id)s as the existing rule of type "
"%(existing_rule)s restricts the bandwidth to "
"%(existing_value)s.")
class QoSRulesConflict(e.Conflict):
message = _("Rule %(new_rule_type)s conflicts with "
"rule %(rule_id)s which already exists in "
"QoS Policy %(policy_id)s.")
class ExtensionsNotFound(e.NotFound):
message = _("Extensions not found: %(extensions)s.")
class GatewayConflictWithAllocationPools(e.InUse):
message = _("Gateway ip %(ip_address)s conflicts with "
"allocation pool %(pool)s.")
class GatewayIpInUse(e.InUse):
message = _("Current gateway ip %(ip_address)s already in use "
"by port %(port_id)s. Unable to update.")
class NetworkVxlanPortRangeError(e.NeutronException):
message = _("Invalid network VXLAN port range: '%(vxlan_range)s'.")
class VxlanNetworkUnsupported(e.NeutronException):
message = _("VXLAN network unsupported.")
class DuplicatedExtension(e.NeutronException):
message = _("Found duplicate extension: %(alias)s.")
class DriverCallError(e.MultipleExceptions):
def __init__(self, exc_list=None):
super(DriverCallError, self).__init__(exc_list or [])
class DeviceIDNotOwnedByTenant(e.Conflict):
message = _("The following device_id %(device_id)s is not owned by your "
"tenant or matches another tenants router.")
class InvalidCIDR(e.BadRequest):
message = _("Invalid CIDR %(input)s given as IP prefix.")
class RouterNotCompatibleWithAgent(e.NeutronException):
message = _("Router '%(router_id)s' is not compatible with this agent.")
class FailToDropPrivilegesExit(SystemExit):
"""Exit exception raised when a drop privileges action fails."""
code = 99
class FloatingIpSetupException(e.NeutronException):
def __init__(self, message=None):
self.message = message
super(FloatingIpSetupException, self).__init__()
class IpTablesApplyException(e.NeutronException):
def __init__(self, message=None):
self.message = message
super(IpTablesApplyException, self).__init__()
class NetworkIdOrRouterIdRequiredError(e.NeutronException):
message = _('Both network_id and router_id are None. '
'One must be provided.')
class AbortSyncRouters(e.NeutronException):
message = _("Aborting periodic_sync_routers_task due to an error.")
class EmptySubnetPoolPrefixList(e.BadRequest):
message = _("Empty subnet pool prefix list.")
class PrefixVersionMismatch(e.BadRequest):
message = _("Cannot mix IPv4 and IPv6 prefixes in a subnet pool.")
class UnsupportedMinSubnetPoolPrefix(e.BadRequest):
message = _("Prefix '%(prefix)s' not supported in IPv%(version)s pool.")
class IllegalSubnetPoolPrefixBounds(e.BadRequest):
message = _("Illegal prefix bounds: %(prefix_type)s=%(prefixlen)s, "
"%(base_prefix_type)s=%(base_prefixlen)s.")
class IllegalSubnetPoolPrefixUpdate(e.BadRequest):
message = _("Illegal update to prefixes: %(msg)s.")
class SubnetAllocationError(e.NeutronException):
message = _("Failed to allocate subnet: %(reason)s.")
class AddressScopePrefixConflict(e.Conflict):
message = _("Failed to associate address scope: subnetpools "
"within an address scope must have unique prefixes.")
class IllegalSubnetPoolAssociationToAddressScope(e.BadRequest):
message = _("Illegal subnetpool association: subnetpool %(subnetpool_id)s "
"cannot be associated with address scope "
"%(address_scope_id)s.")
class IllegalSubnetPoolIpVersionAssociationToAddressScope(e.BadRequest):
message = _("Illegal subnetpool association: subnetpool %(subnetpool_id)s "
"cannot associate with address scope %(address_scope_id)s "
"because subnetpool ip_version is not %(ip_version)s.")
class IllegalSubnetPoolUpdate(e.BadRequest):
message = _("Illegal subnetpool update : %(reason)s.")
class MinPrefixSubnetAllocationError(e.BadRequest):
message = _("Unable to allocate subnet with prefix length %(prefixlen)s, "
"minimum allowed prefix is %(min_prefixlen)s.")
class MaxPrefixSubnetAllocationError(e.BadRequest):
message = _("Unable to allocate subnet with prefix length %(prefixlen)s, "
"maximum allowed prefix is %(max_prefixlen)s.")
class SubnetPoolDeleteError(e.BadRequest):
message = _("Unable to delete subnet pool: %(reason)s.")
class SubnetPoolQuotaExceeded(e.OverQuota):
message = _("Per-tenant subnet pool prefix quota exceeded.")
class NetworkSubnetPoolAffinityError(e.BadRequest):
message = _("Subnets hosted on the same network must be allocated from "
"the same subnet pool.")
class ObjectActionError(e.NeutronException):
message = _('Object action %(action)s failed because: %(reason)s.')
class CTZoneExhaustedError(e.NeutronException):
message = _("IPtables conntrack zones exhausted, iptables rules cannot "
"be applied.")
class TenantQuotaNotFound(e.NotFound):
message = _("Quota for tenant %(tenant_id)s could not be found.")
class TenantIdProjectIdFilterConflict(e.BadRequest):
message = _("Both tenant_id and project_id passed as filters.")
class MultipleFilterIDForIPFound(e.Conflict):
message = _("Multiple filter IDs for IP %(ip)s found.")
class FilterIDForIPNotFound(e.NotFound):
message = _("Filter ID for IP %(ip)s could not be found.")
class FailedToAddQdiscToDevice(e.NeutronException):
message = _("Failed to add %(direction)s qdisc "
"to device %(device)s.")
class PortBindingAlreadyActive(e.Conflict):
# TODO(boden): remove lib shims
SubnetPoolNotFound = exceptions.SubnetPoolNotFound
StateInvalid = exceptions.StateInvalid
DhcpPortInUse = exceptions.DhcpPortInUse
HostRoutesExhausted = exceptions.HostRoutesExhausted
DNSNameServersExhausted = exceptions.DNSNameServersExhausted
FlatNetworkInUse = exceptions.FlatNetworkInUse
NoNetworkFoundInMaximumAllowedAttempts = \
exceptions.NoNetworkFoundInMaximumAllowedAttempts
MalformedRequestBody = exceptions.MalformedRequestBody
InvalidAllocationPool = exceptions.InvalidAllocationPool
UnsupportedPortDeviceOwner = \
exceptions.UnsupportedPortDeviceOwner
OverlappingAllocationPools = exceptions.OverlappingAllocationPools
OutOfBoundsAllocationPool = exceptions.OutOfBoundsAllocationPool
BridgeDoesNotExist = exceptions.BridgeDoesNotExist
QuotaResourceUnknown = exceptions.QuotaResourceUnknown
QuotaMissingTenant = exceptions.QuotaMissingTenant
InvalidQuotaValue = exceptions.InvalidQuotaValue
InvalidSharedSetting = exceptions.InvalidSharedSetting
ExtensionsNotFound = exceptions.ExtensionsNotFound
GatewayConflictWithAllocationPools = \
exceptions.GatewayConflictWithAllocationPools
GatewayIpInUse = exceptions.GatewayIpInUse
NetworkVxlanPortRangeError = exceptions.NetworkVxlanPortRangeError
VxlanNetworkUnsupported = exceptions.VxlanNetworkUnsupported
DuplicatedExtension = exceptions.DuplicatedExtension
DriverCallError = exceptions.DriverCallError
DeviceIDNotOwnedByTenant = exceptions.DeviceIDNotOwnedByTenant
InvalidCIDR = exceptions.InvalidCIDR
FailToDropPrivilegesExit = exceptions.FailToDropPrivilegesExit
NetworkIdOrRouterIdRequiredError = exceptions.NetworkIdOrRouterIdRequiredError
EmptySubnetPoolPrefixList = exceptions.EmptySubnetPoolPrefixList
PrefixVersionMismatch = exceptions.PrefixVersionMismatch
UnsupportedMinSubnetPoolPrefix = exceptions.UnsupportedMinSubnetPoolPrefix
IllegalSubnetPoolPrefixBounds = exceptions.IllegalSubnetPoolPrefixBounds
IllegalSubnetPoolPrefixUpdate = exceptions.IllegalSubnetPoolPrefixUpdate
SubnetAllocationError = exceptions.SubnetAllocationError
AddressScopePrefixConflict = exceptions.AddressScopePrefixConflict
IllegalSubnetPoolAssociationToAddressScope = \
exceptions.IllegalSubnetPoolAssociationToAddressScope
IllegalSubnetPoolIpVersionAssociationToAddressScope = \
exceptions.IllegalSubnetPoolIpVersionAssociationToAddressScope
IllegalSubnetPoolUpdate = exceptions.IllegalSubnetPoolUpdate
MinPrefixSubnetAllocationError = exceptions.MinPrefixSubnetAllocationError
MaxPrefixSubnetAllocationError = exceptions.MaxPrefixSubnetAllocationError
SubnetPoolDeleteError = exceptions.SubnetPoolDeleteError
SubnetPoolQuotaExceeded = exceptions.SubnetPoolQuotaExceeded
NetworkSubnetPoolAffinityError = exceptions.NetworkSubnetPoolAffinityError
ObjectActionError = exceptions.ObjectActionError
CTZoneExhaustedError = exceptions.CTZoneExhaustedError
TenantQuotaNotFound = exceptions.TenantQuotaNotFound
TenantIdProjectIdFilterConflict = exceptions.TenantIdProjectIdFilterConflict
MultipleFilterIDForIPFound = exceptions.MultipleFilterIDForIPFound
FilterIDForIPNotFound = exceptions.FilterIDForIPNotFound
FailedToAddQdiscToDevice = exceptions.FailedToAddQdiscToDevice
PortBindingNotFound = exceptions.PortBindingNotFound
QosPolicyNotFound = qos.QosPolicyNotFound
QosRuleNotFound = qos.QosRuleNotFound
QoSPolicyDefaultAlreadyExists = qos.QoSPolicyDefaultAlreadyExists
PortQosBindingNotFound = qos.PortQosBindingNotFound
PortQosBindingError = qos.PortQosBindingError
NetworkQosBindingNotFound = qos.NetworkQosBindingNotFound
FloatingIPQosBindingNotFound = qos.FloatingIPQosBindingNotFound
FloatingIPQosBindingError = qos.FloatingIPQosBindingError
NetworkQosBindingError = qos.NetworkQosBindingError
PolicyRemoveAuthorizationError = qos.PolicyRemoveAuthorizationError
QosPolicyInUse = qos.QosPolicyInUse
QosRuleNotSupported = qos.QosRuleNotSupported
QoSRuleParameterConflict = qos.QoSRuleParameterConflict
QoSRulesConflict = qos.QoSRulesConflict
RouterNotCompatibleWithAgent = l3.RouterNotCompatibleWithAgent
FloatingIpSetupException = l3.FloatingIpSetupException
IpTablesApplyException = l3.IpTablesApplyException
AbortSyncRouters = l3.AbortSyncRouters
# TODO(boden): rehome these
class PortBindingAlreadyActive(exceptions.Conflict):
message = _("Binding for port %(port_id)s on host %(host)s is already "
"active.")
class PortBindingAlreadyExists(e.Conflict):
class PortBindingAlreadyExists(exceptions.Conflict):
message = _("Binding for port %(port_id)s on host %(host)s already "
"exists.")
class PortBindingError(e.NeutronException):
class PortBindingError(exceptions.NeutronException):
message = _("Binding for port %(port_id)s on host %(host)s could not be "
"created or updated.")

@ -14,316 +14,20 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import random
import time
from neutron_lib import context
from neutron_lib import exceptions as lib_exceptions
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
from oslo_messaging.rpc import dispatcher
from oslo_messaging import serializer as om_serializer
from oslo_service import service
from oslo_utils import excutils
from osprofiler import profiler
from neutron.common import exceptions
LOG = logging.getLogger(__name__)
TRANSPORT = None
NOTIFICATION_TRANSPORT = None
NOTIFIER = None
_DFT_EXMODS = [
exceptions.__name__,
lib_exceptions.__name__,
]
def init(conf, rpc_ext_mods=None):
global TRANSPORT, NOTIFICATION_TRANSPORT, NOTIFIER
if rpc_ext_mods is None:
rpc_ext_mods = _DFT_EXMODS
else:
rpc_ext_mods = list(set(rpc_ext_mods + _DFT_EXMODS))
TRANSPORT = oslo_messaging.get_rpc_transport(
conf, allowed_remote_exmods=rpc_ext_mods)
NOTIFICATION_TRANSPORT = oslo_messaging.get_notification_transport(
conf, allowed_remote_exmods=rpc_ext_mods)
serializer = RequestContextSerializer()
NOTIFIER = oslo_messaging.Notifier(NOTIFICATION_TRANSPORT,
serializer=serializer)
def cleanup():
global TRANSPORT, NOTIFICATION_TRANSPORT, NOTIFIER
if TRANSPORT is None:
raise AssertionError("'TRANSPORT' must not be None")
if NOTIFICATION_TRANSPORT is None:
raise AssertionError("'NOTIFICATION_TRANSPORT' must not be None")
if NOTIFIER is None:
raise AssertionError("'NOTIFIER' must not be None")
TRANSPORT.cleanup()
NOTIFICATION_TRANSPORT.cleanup()
_BackingOffContextWrapper.reset_timeouts()
TRANSPORT = NOTIFICATION_TRANSPORT = NOTIFIER = None
def _get_default_method_timeout():
return TRANSPORT.conf.rpc_response_timeout
def _get_default_method_timeouts():
return collections.defaultdict(_get_default_method_timeout)
class _ContextWrapper(object):
def __init__(self, original_context):
self._original_context = original_context
def __getattr__(self, name):
return getattr(self._original_context, name)
def cast(self, ctxt, method, **kwargs):
try:
self._original_context.cast(ctxt, method, **kwargs)
except Exception as e:
# TODO(kevinbenton): make catch specific to missing exchange once
# bug/1705351 is resolved on the oslo.messaging side; if
# oslo.messaging auto-creates the exchange, then just remove the
# code completely
LOG.debug("Ignored exception during cast: %e", e)
class _BackingOffContextWrapper(_ContextWrapper):
"""Wraps oslo messaging contexts to set the timeout for calls.
This intercepts RPC calls and sets the timeout value to the globally
adapting value for each method. An oslo messaging timeout results in
a doubling of the timeout value for the method on which it timed out.
There currently is no logic to reduce the timeout since busy Neutron
servers are more frequently the cause of timeouts rather than lost
messages.
"""
_METHOD_TIMEOUTS = _get_default_method_timeouts()
_max_timeout = None
@classmethod
def reset_timeouts(cls):
# restore the original default timeout factory
cls._METHOD_TIMEOUTS = _get_default_method_timeouts()
cls._max_timeout = None
@classmethod
def get_max_timeout(cls):
return cls._max_timeout or _get_default_method_timeout() * 10
@classmethod
def set_max_timeout(cls, max_timeout):
if max_timeout < cls.get_max_timeout():
cls._METHOD_TIMEOUTS = collections.defaultdict(
lambda: max_timeout, **{
k: min(v, max_timeout)
for k, v in cls._METHOD_TIMEOUTS.items()
})
cls._max_timeout = max_timeout
def call(self, ctxt, method, **kwargs):
# two methods with the same name in different namespaces should
# be tracked independently
if self._original_context.target.namespace:
scoped_method = '%s.%s' % (self._original_context.target.namespace,
method)
else:
scoped_method = method
# set the timeout from the global method timeout tracker for this
# method
self._original_context.timeout = self._METHOD_TIMEOUTS[scoped_method]
try:
return self._original_context.call(ctxt, method, **kwargs)
except oslo_messaging.MessagingTimeout:
with excutils.save_and_reraise_exception():
wait = random.uniform(
0,
min(self._METHOD_TIMEOUTS[scoped_method],
TRANSPORT.conf.rpc_response_timeout)
)
LOG.error("Timeout in RPC method %(method)s. Waiting for "
"%(wait)s seconds before next attempt. If the "
"server is not down, consider increasing the "
"rpc_response_timeout option as Neutron "
"server(s) may be overloaded and unable to "
"respond quickly enough.",
{'wait': int(round(wait)), 'method': scoped_method})
new_timeout = min(
self._original_context.timeout * 2, self.get_max_timeout())
if new_timeout > self._METHOD_TIMEOUTS[scoped_method]:
LOG.warning("Increasing timeout for %(method)s calls "
"to %(new)s seconds. Restart the agent to "
"restore it to the default value.",
{'method': scoped_method, 'new': new_timeout})
self._METHOD_TIMEOUTS[scoped_method] = new_timeout
time.sleep(wait)
class BackingOffClient(oslo_messaging.RPCClient):
"""An oslo messaging RPC Client that implements a timeout backoff.
This has all of the same interfaces as oslo_messaging.RPCClient but
if the timeout parameter is not specified, the _BackingOffContextWrapper
returned will track when call timeout exceptions occur and exponentially
increase the timeout for the given call method.
"""
def prepare(self, *args, **kwargs):
ctx = super(BackingOffClient, self).prepare(*args, **kwargs)
# don't back off contexts that explicitly set a timeout
if 'timeout' in kwargs:
return _ContextWrapper(ctx)
return _BackingOffContextWrapper(ctx)
@staticmethod
def set_max_timeout(max_timeout):
'''Set RPC timeout ceiling for all backing-off RPC clients.'''
_BackingOffContextWrapper.set_max_timeout(max_timeout)
def get_client(target, version_cap=None, serializer=None):
if TRANSPORT is None:
raise AssertionError("'TRANSPORT' must not be None")
serializer = RequestContextSerializer(serializer)
return BackingOffClient(TRANSPORT,
target,
version_cap=version_cap,
serializer=serializer)
def get_server(target, endpoints, serializer=None):
if TRANSPORT is None:
raise AssertionError("'TRANSPORT' must not be None")
serializer = RequestContextSerializer(serializer)
access_policy = dispatcher.DefaultRPCAccessPolicy
return oslo_messaging.get_rpc_server(TRANSPORT, target, endpoints,
'eventlet', serializer,
access_policy=access_policy)
def get_notifier(service=None, host=None, publisher_id=None):
if NOTIFIER is None:
raise AssertionError("'NOTIFIER' must not be None")
if not publisher_id:
publisher_id = "%s.%s" % (service, host or cfg.CONF.host)
return NOTIFIER.prepare(publisher_id=publisher_id)
class RequestContextSerializer(om_serializer.Serializer):
"""This serializer is used to convert RPC common context into
Neutron Context.
"""
def __init__(self, base=None):
super(RequestContextSerializer, self).__init__()
self._base = base
def serialize_entity(self, ctxt, entity):
if not self._base:
return entity
return self._base.serialize_entity(ctxt, entity)
def deserialize_entity(self, ctxt, entity):
if not self._base:
return entity
return self._base.deserialize_entity(ctxt, entity)
def serialize_context(self, ctxt):
_context = ctxt.to_dict()
prof = profiler.get()
if prof:
trace_info = {
"hmac_key": prof.hmac_key,
"base_id": prof.get_base_id(),
"parent_id": prof.get_id()
}
_context['trace_info'] = trace_info
return _context
def deserialize_context(self, ctxt):
rpc_ctxt_dict = ctxt.copy()
trace_info = rpc_ctxt_dict.pop("trace_info", None)
if trace_info:
profiler.init(**trace_info)
return context.Context.from_dict(rpc_ctxt_dict)
@profiler.trace_cls("rpc")
class Service(service.Service):
"""Service object for binaries running on hosts.
A service enables rpc by listening to queues based on topic and host.
"""
def __init__(self, host, topic, manager=None, serializer=None):
super(Service, self).__init__()
self.host = host
self.topic = topic
self.serializer = serializer
if manager is None:
self.manager = self
else:
self.manager = manager
def start(self):
super(Service, self).start()
self.conn = Connection()
LOG.debug("Creating Consumer connection for Service %s",
self.topic)
endpoints = [self.manager]
self.conn.create_consumer(self.topic, endpoints)
# Hook to allow the manager to do other initializations after
# the rpc connection is created.
if callable(getattr(self.manager, 'initialize_service_hook', None)):
self.manager.initialize_service_hook(self)
# Consume from all consumers in threads
self.conn.consume_in_threads()
def stop(self):
# Try to shut the connection down, but if we get any sort of
# errors, go ahead and ignore them.. as we're shutting down anyway
try:
self.conn.close()
except Exception: # nosec
pass
super(Service, self).stop()
class Connection(object):
def __init__(self):
super(Connection, self).__init__()
self.servers = []
def create_consumer(self, topic, endpoints, fanout=False):
target = oslo_messaging.Target(
topic=topic, server=cfg.CONF.host, fanout=fanout)
server = get_server(target, endpoints)
self.servers.append(server)
def consume_in_threads(self):
for server in self.servers:
server.start()
return self.servers
def close(self):
for server in self.servers:
server.stop()
for server in self.servers:
server.wait()
from neutron_lib import rpc
# TODO(boden): remove lib rpc shims
TRANSPORT = rpc.TRANSPORT
NOTIFICATION_TRANSPORT = rpc.NOTIFICATION_TRANSPORT
NOTIFIER = rpc.NOTIFIER
init = rpc.init
cleanup = rpc.cleanup
BackingOffClient = rpc.BackingOffClient
get_client = rpc.get_client
get_server = rpc.get_server
get_notifier = rpc.get_notifier
RequestContextSerializer = rpc.RequestContextSerializer
Service = rpc.Service
Connection = rpc.Connection

@ -29,10 +29,10 @@ import mock
from neutron_lib.callbacks import manager as registry_manager
from neutron_lib.db import api as db_api
from neutron_lib import fixture
from neutron_lib.tests.unit import fake_notifier
from oslo_concurrency.fixture import lockutils
from oslo_config import cfg
from oslo_db import options as db_options
from oslo_messaging import conffixture as messaging_conffixture
from oslo_utils import excutils
from oslo_utils import fileutils
from oslo_utils import strutils
@ -45,7 +45,6 @@ from neutron.agent.linux import external_process
from neutron.api.rpc.callbacks.consumer import registry as rpc_consumer_reg
from neutron.api.rpc.callbacks.producer import registry as rpc_producer_reg
from neutron.common import config
from neutron.common import rpc as n_rpc
from neutron.conf.agent import common as agent_config
from neutron.db import _model_query as model_query
from neutron.db import _resource_extend as resource_extend
@ -53,7 +52,6 @@ from neutron.db import agentschedulers_db
from neutron import manager
from neutron import policy
from neutron.quota import resource_registry
from neutron.tests import fake_notifier
from neutron.tests import post_mortem_debug
from neutron.tests import tools
@ -325,7 +323,8 @@ class BaseTestCase(DietTestCase):
'oslo_config.cfg.find_config_files',
lambda project=None, prog=None, extension=None: []))
self.setup_rpc_mocks()
self.useFixture(fixture.RPCFixture())
self.setup_config()
self._callback_manager = registry_manager.CallbacksManager()
@ -377,24 +376,6 @@ class BaseTestCase(DietTestCase):
root = root or self.get_default_temp_dir()
return root.join(filename)
def setup_rpc_mocks(self):
# don't actually start RPC listeners when testing
mock.patch(
'neutron.common.rpc.Connection.consume_in_threads',
return_value=[]).start()
self.useFixture(fixtures.MonkeyPatch(
'oslo_messaging.Notifier', fake_notifier.FakeNotifier))
self.messaging_conf = messaging_conffixture.ConfFixture(CONF)
self.messaging_conf.transport_url = 'fake://'
# NOTE(russellb) We want all calls to return immediately.
self.messaging_conf.response_timeout = 0
self.useFixture(self.messaging_conf)
self.addCleanup(n_rpc.cleanup)
n_rpc.init(CONF)
def setup_config(self, args=None):
"""Tests that need a non-default config can override this method."""
self.config_parse(args=args)

@ -1,52 +0,0 @@
# Copyright 2014 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import functools
NOTIFICATIONS = []
def reset():
del NOTIFICATIONS[:]
FakeMessage = collections.namedtuple('Message',
['publisher_id', 'priority',
'event_type', 'payload'])
class FakeNotifier(object):
def __init__(self, transport, publisher_id=None,
driver=None, topics=None,
serializer=None, retry=None):
self.transport = transport
self.publisher_id = publisher_id
for priority in ('debug', 'info', 'warn', 'error', 'critical'):
setattr(self, priority,
functools.partial(self._notify, priority=priority.upper()))
def prepare(self, publisher_id=None):
if publisher_id is None:
publisher_id = self.publisher_id
return self.__class__(self.transport, publisher_id)
def _notify(self, ctxt, event_type, payload, priority):
msg = dict(publisher_id=self.publisher_id,
priority=priority,
event_type=event_type,
payload=payload)
NOTIFICATIONS.append(msg)

@ -26,6 +26,7 @@ from neutron.agent.l3 import router_info as l3router
from neutron.api.rpc.callbacks.consumer import registry
from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.handlers import resources_rpc
from neutron.common import rpc as n_rpc
from neutron.objects.qos import policy
from neutron.objects.qos import rule
from neutron.tests import base
@ -136,8 +137,7 @@ class FipQosExtensionInitializeTestCase(QosExtensionBaseTestCase):
@mock.patch.object(registry, 'register')
@mock.patch.object(resources_rpc, 'ResourcesPushRpcCallback')
def test_initialize_subscribed_to_rpc(self, rpc_mock, subscribe_mock):
call_to_patch = 'neutron.common.rpc.Connection'
with mock.patch(call_to_patch,
with mock.patch.object(n_rpc, 'Connection',
return_value=self.connection) as create_connection:
self.fip_qos_ext.initialize(
self.connection, lib_const.L3_AGENT_MODE)

@ -26,6 +26,7 @@ from oslo_utils import uuidutils
from neutron.agent import rpc
from neutron.common import constants as n_const
from neutron.common import rpc as n_rpc
from neutron.objects import network
from neutron.objects import ports
from neutron.tests import base
@ -125,8 +126,7 @@ class AgentRPCMethods(base.BaseTestCase):
def _test_create_consumers(
self, endpoints, method, expected, topics, listen):
call_to_patch = 'neutron.common.rpc.Connection'
with mock.patch(call_to_patch) as create_connection:
with mock.patch.object(n_rpc, 'Connection') as create_connection:
rpc.create_consumers(
endpoints, method, topics, start_listening=listen)
create_connection.assert_has_calls(expected)
@ -167,8 +167,7 @@ class AgentRPCMethods(base.BaseTestCase):
mock.call().consume_in_threads()
]
call_to_patch = 'neutron.common.rpc.Connection'
with mock.patch(call_to_patch) as create_connection:
with mock.patch.object(n_rpc, 'Connection') as create_connection:
rpc.create_consumers(endpoints, 'foo', [('topic', 'op', 'node1')])
create_connection.assert_has_calls(expected)

@ -16,6 +16,7 @@
import mock
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.common import rpc
from neutron.tests import base
@ -23,8 +24,8 @@ class TestL3AgentNotifyAPI(base.BaseTestCase):
def setUp(self):
super(TestL3AgentNotifyAPI, self).setUp()
self.rpc_client_mock = mock.patch(
'neutron.common.rpc.get_client').start().return_value
self.rpc_client_mock = mock.patch.object(
rpc, 'get_client').start().return_value
self.l3_notifier = l3_rpc_agent_api.L3AgentNotifyAPI()
def _test_arp_update(self, method):

@ -24,6 +24,7 @@ from neutron_lib import context
from neutron_lib import exceptions as n_exc
from neutron_lib import fixture
from neutron_lib.plugins import directory
from neutron_lib.tests.unit import fake_notifier
from oslo_config import cfg
from oslo_db import exception as db_exc
from oslo_policy import policy as oslo_policy
@ -42,7 +43,6 @@ from neutron import policy
from neutron import quota
from neutron.quota import resource_registry
from neutron.tests import base
from neutron.tests import fake_notifier
from neutron.tests import tools
from neutron.tests.unit import dummy_plugin
from neutron.tests.unit import testlib_api

@ -1,499 +0,0 @@
# Copyright 2015 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import fixtures
import mock
from oslo_config import cfg
import oslo_messaging as messaging
from oslo_messaging import conffixture as messaging_conffixture
from oslo_messaging.rpc import dispatcher
import testtools
from neutron.common import rpc
from neutron.tests import base
CONF = cfg.CONF
CONF.import_opt('state_path', 'neutron.conf.common')
class RPCFixture(fixtures.Fixture):
def _setUp(self):
self.trans = copy.copy(rpc.TRANSPORT)
self.noti_trans = copy.copy(rpc.NOTIFICATION_TRANSPORT)
self.noti = copy.copy(rpc.NOTIFIER)
self.addCleanup(self._reset_everything)
def _reset_everything(self):
rpc.TRANSPORT = self.trans
rpc.NOTIFICATION_TRANSPORT = self.noti_trans
rpc.NOTIFIER = self.noti
class TestRPC(base.DietTestCase):
def setUp(self):
super(TestRPC, self).setUp()
self.useFixture(RPCFixture())
@mock.patch.object(rpc, 'RequestContextSerializer')
@mock.patch.object(messaging, 'get_rpc_transport')
@mock.patch.object(messaging, 'get_notification_transport')
@mock.patch.object(messaging, 'Notifier')
def test_init(self, mock_not, mock_noti_trans, mock_trans, mock_ser):
notifier = mock.Mock()
transport = mock.Mock()
noti_transport = mock.Mock()
serializer = mock.Mock()
conf = mock.Mock()
mock_trans.return_value = transport
mock_noti_trans.return_value = noti_transport
mock_ser.return_value = serializer
mock_not.return_value = notifier
rpc.init(conf, rpc_ext_mods=['foo'])
expected_mods = list(set(['foo'] + rpc._DFT_EXMODS))
mock_trans.assert_called_once_with(
conf, allowed_remote_exmods=expected_mods)
mock_noti_trans.assert_called_once_with(
conf, allowed_remote_exmods=expected_mods)
mock_not.assert_called_once_with(noti_transport,
serializer=serializer)
self.assertIsNotNone(rpc.TRANSPORT)
self.assertIsNotNone(rpc.NOTIFICATION_TRANSPORT)
self.assertIsNotNone(rpc.NOTIFIER)
def test_cleanup_transport_null(self):
rpc.NOTIFIER = mock.Mock()
rpc.NOTIFICATION_TRANSPORT = mock.Mock()
self.assertRaises(AssertionError, rpc.cleanup)
def test_cleanup_notification_transport_null(self):
rpc.TRANSPORT = mock.Mock()
rpc.NOTIFIER = mock.Mock()
self.assertRaises(AssertionError, rpc.cleanup)
def test_cleanup_notifier_null(self):
rpc.TRANSPORT = mock.Mock()
rpc.NOTIFICATION_TRANSPORT = mock.Mock()
self.assertRaises(AssertionError, rpc.cleanup)
def test_cleanup(self):
rpc.NOTIFIER = mock.Mock()
rpc.NOTIFICATION_TRANSPORT = mock.Mock()
rpc.TRANSPORT = mock.Mock()
trans_cleanup = mock.Mock()
not_trans_cleanup = mock.Mock()
rpc.TRANSPORT.cleanup = trans_cleanup
rpc.NOTIFICATION_TRANSPORT.cleanup = not_trans_cleanup
rpc.cleanup()
trans_cleanup.assert_called_once_with()
not_trans_cleanup.assert_called_once_with()
self.assertIsNone(rpc.TRANSPORT)
self.assertIsNone(rpc.NOTIFICATION_TRANSPORT)
self.assertIsNone(rpc.NOTIFIER)
@mock.patch.object(rpc, 'RequestContextSerializer')
@mock.patch.object(rpc, 'BackingOffClient')
def test_get_client(self, mock_client, mock_ser):
rpc.TRANSPORT = mock.Mock()
tgt = mock.Mock()
ser = mock.Mock()
mock_client.return_value = 'client'
mock_ser.return_value = ser
client = rpc.get_client(tgt, version_cap='1.0', serializer='foo')
mock_ser.assert_called_once_with('foo')
mock_client.assert_called_once_with(rpc.TRANSPORT,
tgt, version_cap='1.0',
serializer=ser)
self.assertEqual('client', client)
@mock.patch.object(rpc, 'RequestContextSerializer')
@mock.patch.object(messaging, 'get_rpc_server')
def test_get_server(self, mock_get, mock_ser):
rpc.TRANSPORT = mock.Mock()
ser = mock.Mock()
tgt = mock.Mock()
ends = mock.Mock()
mock_ser.return_value = ser
mock_get.return_value = 'server'
server = rpc.get_server(tgt, ends, serializer='foo')
mock_ser.assert_called_once_with('foo')
access_policy = dispatcher.DefaultRPCAccessPolicy
mock_get.assert_called_once_with(rpc.TRANSPORT, tgt, ends,
'eventlet', ser,
access_policy=access_policy)
self.assertEqual('server', server)
def test_get_notifier(self):
rpc.NOTIFIER = mock.Mock()
mock_prep = mock.Mock()
mock_prep.return_value = 'notifier'
rpc.NOTIFIER.prepare = mock_prep
notifier = rpc.get_notifier('service', publisher_id='foo')
mock_prep.assert_called_once_with(publisher_id='foo')
self.assertEqual('notifier', notifier)
def test_get_notifier_null_publisher(self):
rpc.NOTIFIER = mock.Mock()
mock_prep = mock.Mock()
mock_prep.return_value = 'notifier'
rpc.NOTIFIER.prepare = mock_prep
notifier = rpc.get_notifier('service', host='bar')
mock_prep.assert_called_once_with(publisher_id='service.bar')
self.assertEqual('notifier', notifier)
class TestRequestContextSerializer(base.DietTestCase):
def setUp(self):
super(TestRequestContextSerializer, self).setUp()
self.mock_base = mock.Mock()
self.ser = rpc.RequestContextSerializer(self.mock_base)
self.ser_null = rpc.RequestContextSerializer(None)
def test_serialize_entity(self):
self.mock_base.serialize_entity.return_value = 'foo'
ser_ent = self.ser.serialize_entity('context', 'entity')
self.mock_base.serialize_entity.assert_called_once_with('context',
'entity')
self.assertEqual('foo', ser_ent)
def test_deserialize_entity(self):
self.mock_base.deserialize_entity.return_value = 'foo'
deser_ent = self.ser.deserialize_entity('context', 'entity')
self.mock_base.deserialize_entity.assert_called_once_with('context',
'entity')
self.assertEqual('foo', deser_ent)
def test_deserialize_entity_null_base(self):
deser_ent = self.ser_null.deserialize_entity('context', 'entity')
self.assertEqual('entity', deser_ent)
def test_serialize_context(self):
context = mock.Mock()
self.ser.serialize_context(context)
context.to_dict.assert_called_once_with()
def test_deserialize_context(self):
context_dict = {'foo': 'bar',
'user_id': 1,
'tenant_id': 1,
'is_admin': True}
c = self.ser.deserialize_context(context_dict)
self.assertEqual(1, c.user_id)
self.assertEqual(1, c.project_id)
def test_deserialize_context_no_user_id(self):
context_dict = {'foo': 'bar',
'user': 1,
'tenant_id': 1,
'is_admin': True}
c = self.ser.deserialize_context(context_dict)
self.assertEqual(1, c.user_id)
self.assertEqual(1, c.project_id)
def test_deserialize_context_no_tenant_id(self):
context_dict = {'foo': 'bar',
'user_id': 1,
'project_id': 1,
'is_admin': True}
c = self.ser.deserialize_context(context_dict)
self.assertEqual(1, c.user_id)
self.assertEqual(1, c.project_id)
def test_deserialize_context_no_ids(self):
context_dict = {'foo': 'bar', 'is_admin': True}
c = self.ser.deserialize_context(context_dict)
self.assertIsNone(c.user_id)
self.assertIsNone(c.project_id)
class ServiceTestCase(base.DietTestCase):
# the class cannot be based on BaseTestCase since it mocks rpc.Connection
def setUp(self):
super(ServiceTestCase, self).setUp()
self.host = 'foo'
self.topic = 'neutron-agent'
self.target_mock = mock.patch('oslo_messaging.Target')
self.target_mock.start()
self.messaging_conf = messaging_conffixture.ConfFixture(CONF)
self.messaging_conf.transport_url = 'fake://'
self.messaging_conf.response_timeout = 0
self.useFixture(self.messaging_conf)
self.addCleanup(rpc.cleanup)
rpc.init(CONF)
def test_operations(self):
with mock.patch('oslo_messaging.get_rpc_server') as get_rpc_server:
rpc_server = get_rpc_server.return_value
service = rpc.Service(self.host, self.topic)
service.start()
rpc_server.start.assert_called_once_with()
service.stop()
rpc_server.stop.assert_called_once_with()
rpc_server.wait.assert_called_once_with()
class TimeoutTestCase(base.DietTestCase):
def setUp(self):
super(TimeoutTestCase, self).setUp()
self.messaging_conf = messaging_conffixture.ConfFixture(CONF)
self.messaging_conf.transport_url = 'fake://'
self.messaging_conf.response_timeout = 0
self.useFixture(self.messaging_conf)
self.addCleanup(rpc.cleanup)
rpc.init(CONF)
rpc.TRANSPORT = mock.MagicMock()
rpc.TRANSPORT._send.side_effect = messaging.MessagingTimeout
target = messaging.Target(version='1.0', topic='testing')
self.client = rpc.get_client(target)
self.call_context = mock.Mock()
self.sleep = mock.patch('time.sleep').start()
rpc.TRANSPORT.conf.rpc_response_timeout = 10
def test_timeout_unaffected_when_explicitly_set(self):
rpc.TRANSPORT.conf.rpc_response_timeout = 5
ctx = self.client.prepare(topic='sandwiches', timeout=77)
with testtools.ExpectedException(messaging.MessagingTimeout):
ctx.call(self.call_context, 'create_pb_and_j')
# ensure that the timeout was not increased and the back-off sleep
# wasn't called
self.assertEqual(
5,
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['create_pb_and_j'])
self.assertFalse(self.sleep.called)
def test_timeout_store_defaults(self):
# any method should default to the configured timeout
self.assertEqual(
rpc.TRANSPORT.conf.rpc_response_timeout,
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'])
self.assertEqual(
rpc.TRANSPORT.conf.rpc_response_timeout,
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_2'])
# a change to an existing should not affect new or existing ones
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_2'] = 7000
self.assertEqual(
rpc.TRANSPORT.conf.rpc_response_timeout,
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'])
self.assertEqual(
rpc.TRANSPORT.conf.rpc_response_timeout,
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_3'])
def test_method_timeout_sleep(self):
rpc.TRANSPORT.conf.rpc_response_timeout = 2
for i in range(100):
with testtools.ExpectedException(messaging.MessagingTimeout):
self.client.call(self.call_context, 'method_1')
# sleep value should always be between 0 and configured timeout
self.assertGreaterEqual(self.sleep.call_args_list[0][0][0], 0)
self.assertLessEqual(self.sleep.call_args_list[0][0][0], 2)
self.sleep.reset_mock()
def test_method_timeout_increases_on_timeout_exception(self):
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 1
for i in range(5):
with testtools.ExpectedException(messaging.MessagingTimeout):
self.client.call(self.call_context, 'method_1')
# we only care to check the timeouts sent to the transport
timeouts = [call[1]['timeout']
for call in rpc.TRANSPORT._send.call_args_list]
self.assertEqual([1, 2, 4, 8, 16], timeouts)
def test_method_timeout_10x_config_ceiling(self):
rpc.TRANSPORT.conf.rpc_response_timeout = 10
# 5 doublings should max out at the 10xdefault ceiling
for i in range(5):
with testtools.ExpectedException(messaging.MessagingTimeout):
self.client.call(self.call_context, 'method_1')
self.assertEqual(
10 * rpc.TRANSPORT.conf.rpc_response_timeout,
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'])
with testtools.ExpectedException(messaging.MessagingTimeout):
self.client.call(self.call_context, 'method_1')
self.assertEqual(
10 * rpc.TRANSPORT.conf.rpc_response_timeout,
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'])
def test_timeout_unchanged_on_other_exception(self):
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 1
rpc.TRANSPORT._send.side_effect = ValueError
with testtools.ExpectedException(ValueError):
self.client.call(self.call_context, 'method_1')
rpc.TRANSPORT._send.side_effect = messaging.MessagingTimeout
with testtools.ExpectedException(messaging.MessagingTimeout):
self.client.call(self.call_context, 'method_1')
timeouts = [call[1]['timeout']
for call in rpc.TRANSPORT._send.call_args_list]
self.assertEqual([1, 1], timeouts)
def test_timeouts_for_methods_tracked_independently(self):
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 1
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_2'] = 1
for method in ('method_1', 'method_1', 'method_2',
'method_1', 'method_2'):
with testtools.ExpectedException(messaging.MessagingTimeout):
self.client.call(self.call_context, method)
timeouts = [call[1]['timeout']
for call in rpc.TRANSPORT._send.call_args_list]
self.assertEqual([1, 2, 1, 4, 2], timeouts)
def test_timeouts_for_namespaces_tracked_independently(self):
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['ns1.method'] = 1
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['ns2.method'] = 1
for ns in ('ns1', 'ns2'):
self.client.target.namespace = ns
for i in range(4):
with testtools.ExpectedException(messaging.MessagingTimeout):
self.client.call(self.call_context, 'method')
timeouts = [call[1]['timeout']
for call in rpc.TRANSPORT._send.call_args_list]
self.assertEqual([1, 2, 4, 8, 1, 2, 4, 8], timeouts)
def test_method_timeout_increases_with_prepare(self):
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 1
ctx = self.client.prepare(version='1.4')
with testtools.ExpectedException(messaging.MessagingTimeout):
ctx.call(self.call_context, 'method_1')
with testtools.ExpectedException(messaging.MessagingTimeout):
ctx.call(self.call_context, 'method_1')
# we only care to check the timeouts sent to the transport
timeouts = [call[1]['timeout']
for call in rpc.TRANSPORT._send.call_args_list]