rehome rpc and related plumbing
As shown in the history of this patch, along with the work in [1], we've
discussed rehoming neutron.common.rpc into lib a number of times before. One of
the main reasons we've decided not to rehome in the past is that we'd hoped the
neutron.common.rpc.BackingOffClient and related plumbing could be put into
oslo.messaging. However, it's 2 years later and that still hasn't happened
[2][3]. This patch proposes we just move forward with the rehome so that we
can begin to untangle the consumers [4]. There's no reason we can't iterate on
this code in lib; it's no more difficult than in neutron.

This patch includes the rehoming of:
- neutron.common.rpc; the only difference in the lib version is that we
  dynamically add all neutron_lib.exceptions by default (_DFT_EXMODS).
- neutron.common.exceptions, but exceptions are broken out into their
  respective exception modules rather than lumping them all together in a
  single generic module.
- The fake notifier and RPC fixture, without any real changes.
- A new runtime util method to dynamically load all modules for a package.

For a sample neutron consumption patch see [5], which was tested with PS10
herein.

[1] https://review.openstack.org/#/q/project:openstack/neutron-lib+message:rpc
[2] https://review.openstack.org/#/c/407722/
[3] https://bugs.launchpad.net/oslo.messaging/+bug/1667445
[4] http://codesearch.openstack.org/?q=from%20neutron.common%20import%20rpc
[5] https://review.openstack.org/#/c/579989/

Change-Id: I0052ba65973a993e088943056879bc6e982bd0b5
This commit is contained in:
parent
8cbab5756c
commit
a37d43018b
@@ -32,3 +32,4 @@ Neutron Lib Internals
   api_validators
   callbacks
   db_model_query
   rpc_api

doc/source/contributor/rpc_api.rst (new file, 175 lines)
@@ -0,0 +1,175 @@
..
      Licensed under the Apache License, Version 2.0 (the "License"); you may
      not use this file except in compliance with the License. You may obtain
      a copy of the License at

          http://www.apache.org/licenses/LICENSE-2.0

      Unless required by applicable law or agreed to in writing, software
      distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
      WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
      License for the specific language governing permissions and limitations
      under the License.


      Convention for heading levels in Neutron devref:
      =======  Heading 0 (reserved for the title in a document)
      -------  Heading 1
      ~~~~~~~  Heading 2
      +++++++  Heading 3
      '''''''  Heading 4
      (Avoid deeper levels because they do not render well.)

Neutron RPC API Layer
=====================

Neutron uses the oslo.messaging library to provide an internal communication
channel between Neutron services. This communication is typically done via
AMQP, but those details are mostly hidden by the use of oslo.messaging and it
could be some other protocol in the future.

RPC APIs are defined in Neutron in two parts: client side and server side.

Client Side
-----------

Here is an example of an rpc client definition:

::

    import oslo_messaging

    from neutron_lib import rpc as n_rpc


    class ClientAPI(object):
        """Client side RPC interface definition.

        API version history:
            1.0 - Initial version
            1.1 - Added my_remote_method_2
        """

        def __init__(self, topic):
            target = oslo_messaging.Target(topic=topic, version='1.0')
            self.client = n_rpc.get_client(target)

        def my_remote_method(self, context, arg1, arg2):
            cctxt = self.client.prepare()
            return cctxt.call(context, 'my_remote_method',
                              arg1=arg1, arg2=arg2)

        def my_remote_method_2(self, context, arg1):
            cctxt = self.client.prepare(version='1.1')
            return cctxt.call(context, 'my_remote_method_2', arg1=arg1)

This class defines the client side interface for an rpc API. The interface has
2 methods. The first method existed in version 1.0 of the interface. The
second method was added in version 1.1. When the newer method is called, it
specifies that the remote side must implement at least version 1.1 to handle
this request.

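The version-pinning mechanics of ``prepare()``/``call()`` can be sketched standalone, with no oslo.messaging required. ``FakeClient`` and ``FakePreparedContext`` below are hypothetical stand-ins for illustration only:

```python
# Minimal sketch of the client pattern above: prepare() pins a minimum
# interface version, and call() carries that pin alongside the method name.
class FakePreparedContext:
    def __init__(self, version):
        self.version = version

    def call(self, context, method, **kwargs):
        # A real context would dispatch over the transport; here we just
        # echo what would be sent.
        return (self.version, method, kwargs)


class FakeClient:
    def prepare(self, version='1.0'):
        return FakePreparedContext(version)


client = FakeClient()
cctxt = client.prepare(version='1.1')
print(cctxt.call({}, 'my_remote_method_2', arg1='x'))
```

A call made through a context prepared without an explicit version carries the base version '1.0', so any server in the 1.X series can handle it.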
Server Side
-----------

The server side of an rpc interface looks like this:

::

    import oslo_messaging


    class ServerAPI(object):

        target = oslo_messaging.Target(version='1.1')

        def my_remote_method(self, context, arg1, arg2):
            return 'foo'

        def my_remote_method_2(self, context, arg1):
            return 'bar'

This class implements the server side of the interface. The
oslo_messaging.Target() definition says that this class currently implements
version 1.1 of the interface.

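The compatibility rule implied above (a server can handle any request pinned at or below its minor version, within the same major version) can be sketched as a standalone check; ``can_serve`` is a hypothetical helper, not part of the library:

```python
# Sketch of the version compatibility rule: a server advertising target
# version S can serve a request pinned to version R only when both share
# the same major version and S's minor version is >= R's.
def can_serve(server_version, requested_version):
    s_major, s_minor = (int(p) for p in server_version.split('.'))
    r_major, r_minor = (int(p) for p in requested_version.split('.'))
    return s_major == r_major and s_minor >= r_minor


print(can_serve('1.1', '1.0'))  # True: older clients are always handled
print(can_serve('1.1', '1.2'))  # False: server is too old for this call
```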
.. _rpc_versioning:

Versioning
----------

Note that changes to rpc interfaces must always be done in a backwards
compatible way. The server side should always be able to handle older clients
(within the same major version series, such as 1.X).

It is possible to bump the major version number and drop some code only needed
for backwards compatibility. For more information about how to do that, see
https://wiki.openstack.org/wiki/RpcMajorVersionUpdates.

Example Change
~~~~~~~~~~~~~~

As an example minor API change, let's assume we want to add a new parameter to
my_remote_method_2. First, we add the argument on the server side. To be
backwards compatible, the new argument must have a default value set so that
the interface will still work even if the argument is not supplied. Also, the
interface's minor version number must be incremented. So, the new server side
code would look like this:

::

    import oslo_messaging


    class ServerAPI(object):

        target = oslo_messaging.Target(version='1.2')

        def my_remote_method(self, context, arg1, arg2):
            return 'foo'

        def my_remote_method_2(self, context, arg1, arg2=None):
            if not arg2:
                # Deal with the fact that arg2 was not specified if needed.
                pass
            return 'bar'

We can now update the client side to pass the new argument. The client must
also specify that version '1.2' is required for this method call to be
successful. The updated client side would look like this:

::

    import oslo_messaging

    from neutron_lib import rpc as n_rpc


    class ClientAPI(object):
        """Client side RPC interface definition.

        API version history:
            1.0 - Initial version
            1.1 - Added my_remote_method_2
            1.2 - Added arg2 to my_remote_method_2
        """

        def __init__(self, topic):
            target = oslo_messaging.Target(topic=topic, version='1.0')
            self.client = n_rpc.get_client(target)

        def my_remote_method(self, context, arg1, arg2):
            cctxt = self.client.prepare()
            return cctxt.call(context, 'my_remote_method',
                              arg1=arg1, arg2=arg2)

        def my_remote_method_2(self, context, arg1, arg2):
            cctxt = self.client.prepare(version='1.2')
            return cctxt.call(context, 'my_remote_method_2',
                              arg1=arg1, arg2=arg2)

More Info
---------

For more information, see the oslo.messaging documentation:
https://docs.openstack.org/oslo.messaging/latest/.
@@ -557,3 +557,227 @@ class PhysicalNetworkNameError(NeutronException):


class TenantIdProjectIdFilterConflict(BadRequest):
    message = _("Both tenant_id and project_id passed as filters.")


class SubnetPoolNotFound(NotFound):
    message = _("Subnet pool %(subnetpool_id)s could not be found.")


class StateInvalid(BadRequest):
    message = _("Unsupported port state: %(port_state)s.")


class DhcpPortInUse(InUse):
    message = _("Port %(port_id)s is already acquired by another DHCP agent.")


class HostRoutesExhausted(BadRequest):
    # NOTE(xchenum): probably make sense to use quota exceeded exception?
    message = _("Unable to complete operation for %(subnet_id)s. "
                "The number of host routes exceeds the limit %(quota)s.")


class DNSNameServersExhausted(BadRequest):
    # NOTE(xchenum): probably make sense to use quota exceeded exception?
    message = _("Unable to complete operation for %(subnet_id)s. "
                "The number of DNS nameservers exceeds the limit %(quota)s.")


class FlatNetworkInUse(InUse):
    message = _("Unable to create the flat network. "
                "Physical network %(physical_network)s is in use.")


class NoNetworkFoundInMaximumAllowedAttempts(ServiceUnavailable):
    message = _("Unable to create the network. "
                "No available network found in maximum allowed attempts.")


class MalformedRequestBody(BadRequest):
    message = _("Malformed request body: %(reason)s.")


class InvalidAllocationPool(BadRequest):
    message = _("The allocation pool %(pool)s is not valid.")


class UnsupportedPortDeviceOwner(Conflict):
    message = _("Operation %(op)s is not supported for device_owner "
                "%(device_owner)s on port %(port_id)s.")


class OverlappingAllocationPools(Conflict):
    message = _("Found overlapping allocation pools: "
                "%(pool_1)s %(pool_2)s for subnet %(subnet_cidr)s.")


class OutOfBoundsAllocationPool(BadRequest):
    message = _("The allocation pool %(pool)s spans "
                "beyond the subnet cidr %(subnet_cidr)s.")


class BridgeDoesNotExist(NeutronException):
    message = _("Bridge %(bridge)s does not exist.")


class QuotaResourceUnknown(NotFound):
    message = _("Unknown quota resources %(unknown)s.")


class QuotaMissingTenant(BadRequest):
    message = _("Tenant-id was missing from quota request.")


class InvalidQuotaValue(Conflict):
    message = _("Change would make usage less than 0 for the following "
                "resources: %(unders)s.")


class InvalidSharedSetting(Conflict):
    message = _("Unable to reconfigure sharing settings for network "
                "%(network)s. Multiple tenants are using it.")


class ExtensionsNotFound(NotFound):
    message = _("Extensions not found: %(extensions)s.")


class GatewayConflictWithAllocationPools(InUse):
    message = _("Gateway ip %(ip_address)s conflicts with "
                "allocation pool %(pool)s.")


class GatewayIpInUse(InUse):
    message = _("Current gateway ip %(ip_address)s already in use "
                "by port %(port_id)s. Unable to update.")


class NetworkVxlanPortRangeError(NeutronException):
    message = _("Invalid network VXLAN port range: '%(vxlan_range)s'.")


class VxlanNetworkUnsupported(NeutronException):
    message = _("VXLAN network unsupported.")


class DuplicatedExtension(NeutronException):
    message = _("Found duplicate extension: %(alias)s.")


class DriverCallError(MultipleExceptions):
    def __init__(self, exc_list=None):
        super(DriverCallError, self).__init__(exc_list or [])


class DeviceIDNotOwnedByTenant(Conflict):
    message = _("The following device_id %(device_id)s is not owned by your "
                "tenant or matches another tenant's router.")


class InvalidCIDR(BadRequest):
    message = _("Invalid CIDR %(input)s given as IP prefix.")


class FailToDropPrivilegesExit(SystemExit):
    """Exit exception raised when a drop privileges action fails."""
    code = 99


class NetworkIdOrRouterIdRequiredError(NeutronException):
    message = _('Both network_id and router_id are None. '
                'One must be provided.')


class EmptySubnetPoolPrefixList(BadRequest):
    message = _("Empty subnet pool prefix list.")


class PrefixVersionMismatch(BadRequest):
    message = _("Cannot mix IPv4 and IPv6 prefixes in a subnet pool.")


class UnsupportedMinSubnetPoolPrefix(BadRequest):
    message = _("Prefix '%(prefix)s' not supported in IPv%(version)s pool.")


class IllegalSubnetPoolPrefixBounds(BadRequest):
    message = _("Illegal prefix bounds: %(prefix_type)s=%(prefixlen)s, "
                "%(base_prefix_type)s=%(base_prefixlen)s.")


class IllegalSubnetPoolPrefixUpdate(BadRequest):
    message = _("Illegal update to prefixes: %(msg)s.")


class SubnetAllocationError(NeutronException):
    message = _("Failed to allocate subnet: %(reason)s.")


class AddressScopePrefixConflict(Conflict):
    message = _("Failed to associate address scope: subnetpools "
                "within an address scope must have unique prefixes.")


class IllegalSubnetPoolAssociationToAddressScope(BadRequest):
    message = _("Illegal subnetpool association: subnetpool %(subnetpool_id)s "
                "cannot be associated with address scope "
                "%(address_scope_id)s.")


class IllegalSubnetPoolIpVersionAssociationToAddressScope(BadRequest):
    message = _("Illegal subnetpool association: subnetpool %(subnetpool_id)s "
                "cannot associate with address scope %(address_scope_id)s "
                "because subnetpool ip_version is not %(ip_version)s.")


class IllegalSubnetPoolUpdate(BadRequest):
    message = _("Illegal subnetpool update: %(reason)s.")


class MinPrefixSubnetAllocationError(BadRequest):
    message = _("Unable to allocate subnet with prefix length %(prefixlen)s, "
                "minimum allowed prefix is %(min_prefixlen)s.")


class MaxPrefixSubnetAllocationError(BadRequest):
    message = _("Unable to allocate subnet with prefix length %(prefixlen)s, "
                "maximum allowed prefix is %(max_prefixlen)s.")


class SubnetPoolDeleteError(BadRequest):
    message = _("Unable to delete subnet pool: %(reason)s.")


class SubnetPoolQuotaExceeded(OverQuota):
    message = _("Per-tenant subnet pool prefix quota exceeded.")


class NetworkSubnetPoolAffinityError(BadRequest):
    message = _("Subnets hosted on the same network must be allocated from "
                "the same subnet pool.")


class ObjectActionError(NeutronException):
    message = _('Object action %(action)s failed because: %(reason)s.')


class CTZoneExhaustedError(NeutronException):
    message = _("IPtables conntrack zones exhausted, iptables rules cannot "
                "be applied.")


class TenantQuotaNotFound(NotFound):
    message = _("Quota for tenant %(tenant_id)s could not be found.")


class MultipleFilterIDForIPFound(Conflict):
    message = _("Multiple filter IDs for IP %(ip)s found.")


class FilterIDForIPNotFound(NotFound):
    message = _("Filter ID for IP %(ip)s could not be found.")


class FailedToAddQdiscToDevice(NeutronException):
    message = _("Failed to add %(direction)s qdisc "
                "to device %(device)s.")
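All of the exception classes above follow one pattern: a class-level message template filled from constructor keyword arguments. A minimal standalone sketch of that pattern (``FakeNeutronException`` is a hypothetical stand-in for the real ``NeutronException`` base, which also handles i18n via ``_()`` and kwarg validation):

```python
# Sketch of the message-template pattern used by the exception classes:
# each subclass only declares a format string; the base class interpolates
# the constructor kwargs into it.
class FakeNeutronException(Exception):
    message = "An unknown exception occurred."

    def __init__(self, **kwargs):
        super().__init__(self.message % kwargs)


class SubnetPoolNotFound(FakeNeutronException):
    message = "Subnet pool %(subnetpool_id)s could not be found."


print(str(SubnetPoolNotFound(subnetpool_id='pool-1')))
```

This keeps each exception definition to two lines while still producing a specific, user-facing error string.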
@@ -71,3 +71,23 @@ class RouterExternalGatewayInUseByFloatingIp(exceptions.InUse):


class RouterInterfaceAttachmentConflict(exceptions.Conflict):
    message = _("Error %(reason)s while attempting the operation.")


class RouterNotCompatibleWithAgent(exceptions.NeutronException):
    message = _("Router '%(router_id)s' is not compatible with this agent.")


class FloatingIpSetupException(exceptions.NeutronException):
    def __init__(self, message=None):
        self.message = message
        super(FloatingIpSetupException, self).__init__()


class AbortSyncRouters(exceptions.NeutronException):
    message = _("Aborting periodic_sync_routers_task due to an error.")


class IpTablesApplyException(exceptions.NeutronException):
    def __init__(self, message=None):
        self.message = message
        super(IpTablesApplyException, self).__init__()
neutron_lib/exceptions/qos.py (new file, 87 lines)
@@ -0,0 +1,87 @@
# Copyright 2011 VMware, Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from neutron_lib._i18n import _
from neutron_lib import exceptions as e


class QosPolicyNotFound(e.NotFound):
    message = _("QoS policy %(policy_id)s could not be found.")


class QosRuleNotFound(e.NotFound):
    message = _("QoS rule %(rule_id)s for policy %(policy_id)s "
                "could not be found.")


class QoSPolicyDefaultAlreadyExists(e.Conflict):
    message = _("A default QoS policy exists for project %(project_id)s.")


class PortQosBindingNotFound(e.NotFound):
    message = _("QoS binding for port %(port_id)s and policy %(policy_id)s "
                "could not be found.")


class PortQosBindingError(e.NeutronException):
    message = _("QoS binding for port %(port_id)s and policy %(policy_id)s "
                "could not be created: %(db_error)s.")


class NetworkQosBindingNotFound(e.NotFound):
    message = _("QoS binding for network %(net_id)s and policy %(policy_id)s "
                "could not be found.")


class FloatingIPQosBindingNotFound(e.NotFound):
    message = _("QoS binding for floating IP %(fip_id)s and policy "
                "%(policy_id)s could not be found.")


class QosPolicyInUse(e.InUse):
    message = _("QoS Policy %(policy_id)s is used by "
                "%(object_type)s %(object_id)s.")


class FloatingIPQosBindingError(e.NeutronException):
    message = _("QoS binding for floating IP %(fip_id)s and policy "
                "%(policy_id)s could not be created: %(db_error)s.")


class NetworkQosBindingError(e.NeutronException):
    message = _("QoS binding for network %(net_id)s and policy %(policy_id)s "
                "could not be created: %(db_error)s.")


class QosRuleNotSupported(e.Conflict):
    message = _("Rule %(rule_type)s is not supported by port %(port_id)s.")


class QoSRuleParameterConflict(e.Conflict):
    message = _("Unable to add the rule with value %(rule_value)s to the "
                "policy %(policy_id)s as the existing rule of type "
                "%(existing_rule)s restricts the bandwidth to "
                "%(existing_value)s.")


class QoSRulesConflict(e.Conflict):
    message = _("Rule %(new_rule_type)s conflicts with "
                "rule %(rule_id)s which already exists in "
                "QoS Policy %(policy_id)s.")


class PolicyRemoveAuthorizationError(e.NotAuthorized):
    message = _("Failed to remove provided policy %(policy_id)s "
                "because you are not authorized.")
@@ -14,6 +14,8 @@ import copy

import fixtures
import mock
from oslo_config import cfg
from oslo_messaging import conffixture

from neutron_lib.api import attributes
from neutron_lib.api import definitions
@@ -23,6 +25,11 @@ from neutron_lib.db import api as db_api
from neutron_lib.db import model_base
from neutron_lib.db import model_query
from neutron_lib.plugins import directory
from neutron_lib import rpc
from neutron_lib.tests.unit import fake_notifier


CONF = cfg.CONF


class PluginDirectoryFixture(fixtures.Fixture):
@@ -244,3 +251,22 @@ class DBQueryHooksFixture(fixtures.Fixture):

    def _restore(self):
        model_query._model_query_hooks = self._backup


class RPCFixture(fixtures.Fixture):

    def _setUp(self):
        # don't actually start RPC listeners when testing
        mock.patch.object(rpc.Connection, 'consume_in_threads',
                          return_value=[]).start()
        self.useFixture(fixtures.MonkeyPatch(
            'oslo_messaging.Notifier', fake_notifier.FakeNotifier))

        self.messaging_conf = conffixture.ConfFixture(CONF)
        self.messaging_conf.transport_driver = 'fake'
        # NOTE(russellb) We want all calls to return immediately.
        self.messaging_conf.response_timeout = 0
        self.useFixture(self.messaging_conf)

        self.addCleanup(rpc.cleanup)
        rpc.init(CONF)
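The core trick RPCFixture relies on is patching a method with a stub so no real work (here, starting RPC listeners) happens under test. A standalone sketch using only the stdlib (``FakeConnection`` is a hypothetical stand-in for ``rpc.Connection``, and modern ``unittest.mock`` replaces the third-party ``mock`` used above):

```python
# Sketch of the patch.object(..., return_value=[]) technique: the patched
# method returns an empty list instead of running its real body.
from unittest import mock


class FakeConnection:
    def consume_in_threads(self):
        raise RuntimeError('would start real RPC listeners')


patcher = mock.patch.object(FakeConnection, 'consume_in_threads',
                            return_value=[])
patcher.start()
print(FakeConnection().consume_in_threads())  # []
patcher.stop()
```

Wrapping the patch in a fixture, as RPCFixture does, ensures `start()`/`stop()` are paired with test setup and teardown automatically.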
neutron_lib/rpc.py (new file, 357 lines)
@@ -0,0 +1,357 @@
# Copyright (c) 2012 OpenStack Foundation.
# Copyright (c) 2014 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import collections
import random
import time

from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
from oslo_messaging.rpc import dispatcher
from oslo_messaging import serializer as om_serializer
from oslo_service import service
from oslo_utils import excutils
from osprofiler import profiler

from neutron_lib._i18n import _
from neutron_lib import context
from neutron_lib import exceptions
from neutron_lib.utils import runtime


LOG = logging.getLogger(__name__)
TRANSPORT = None
NOTIFICATION_TRANSPORT = None
NOTIFIER = None

_DFT_EXMODS = runtime.list_package_modules(exceptions.__name__)
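A hedged sketch of what a ``list_package_modules`` helper could look like using only the stdlib; the real ``neutron_lib.utils.runtime`` implementation may differ in details (error handling, namespace packages), and the stdlib ``json`` package is used here only as a convenient example target:

```python
# Sketch of walking a package to collect every importable module name,
# the shape of helper that _DFT_EXMODS depends on above.
import importlib
import pkgutil


def list_package_modules(package_name):
    package = importlib.import_module(package_name)
    modules = [package_name]
    for _finder, name, _ispkg in pkgutil.walk_packages(
            package.__path__, package_name + '.'):
        modules.append(name)
    return modules


print(list_package_modules('json'))
```

Feeding the resulting module list to oslo.messaging as `allowed_remote_exmods` lets exceptions defined anywhere in the package be deserialized on the receiving side of an RPC call.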
def init(conf, rpc_ext_mods=None):
    """Initialize the global RPC objects.

    :param conf: The oslo conf to use for initialization.
    :param rpc_ext_mods: Exception modules to expose via RPC.
    :returns: None.
    """
    global TRANSPORT, NOTIFICATION_TRANSPORT, NOTIFIER

    if rpc_ext_mods is None:
        rpc_ext_mods = _DFT_EXMODS
    else:
        rpc_ext_mods = list(set(rpc_ext_mods + _DFT_EXMODS))

    TRANSPORT = oslo_messaging.get_rpc_transport(
        conf, allowed_remote_exmods=rpc_ext_mods)
    NOTIFICATION_TRANSPORT = oslo_messaging.get_notification_transport(
        conf, allowed_remote_exmods=rpc_ext_mods)
    serializer = RequestContextSerializer()
    NOTIFIER = oslo_messaging.Notifier(NOTIFICATION_TRANSPORT,
                                       serializer=serializer)
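The merge behavior in init() (caller-supplied exception modules are unioned with the library defaults, never replacing them) can be shown in isolation; the module names below are placeholders:

```python
# Standalone sketch of init()'s exmod merging: defaults always survive,
# and duplicates are collapsed via the set union.
_DFT_EXMODS = ['neutron_lib.exceptions', 'neutron_lib.exceptions.qos']


def merge_exmods(rpc_ext_mods=None):
    if rpc_ext_mods is None:
        return list(_DFT_EXMODS)
    return sorted(set(rpc_ext_mods) | set(_DFT_EXMODS))


print(merge_exmods(['myplugin.exceptions']))
```

This is why a consumer passing its own exception modules still gets all neutron_lib exceptions deserialized across RPC by default.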
def cleanup():
    """Deactivate and cleanup the global RPC objects.

    :returns: None.
    """
    global TRANSPORT, NOTIFICATION_TRANSPORT, NOTIFIER
    if TRANSPORT is None:
        raise AssertionError(_("'TRANSPORT' must not be None"))
    if NOTIFICATION_TRANSPORT is None:
        raise AssertionError(
            _("'NOTIFICATION_TRANSPORT' must not be None"))
    if NOTIFIER is None:
        raise AssertionError(_("'NOTIFIER' must not be None"))
    TRANSPORT.cleanup()
    NOTIFICATION_TRANSPORT.cleanup()
    _BackingOffContextWrapper.reset_timeouts()
    TRANSPORT = NOTIFICATION_TRANSPORT = NOTIFIER = None
def _get_default_method_timeout():
    return TRANSPORT.conf.rpc_response_timeout


def _get_default_method_timeouts():
    return collections.defaultdict(_get_default_method_timeout)
class _ContextWrapper(object):
    def __init__(self, original_context):
        self._original_context = original_context

    def __getattr__(self, name):
        return getattr(self._original_context, name)

    def cast(self, ctxt, method, **kwargs):
        try:
            self._original_context.cast(ctxt, method, **kwargs)
        except Exception as e:
            # TODO(kevinbenton): make catch specific to missing exchange once
            # bug/1705351 is resolved on the oslo.messaging side; if
            # oslo.messaging auto-creates the exchange, then just remove the
            # code completely
            LOG.debug("Ignored exception during cast: %s", e)
class _BackingOffContextWrapper(_ContextWrapper):
    """Wraps oslo messaging contexts to set the timeout for calls.

    This intercepts RPC calls and sets the timeout value to the globally
    adapting value for each method. An oslo messaging timeout results in
    a doubling of the timeout value for the method on which it timed out.
    There currently is no logic to reduce the timeout since busy Neutron
    servers are more frequently the cause of timeouts rather than lost
    messages.
    """
    _METHOD_TIMEOUTS = _get_default_method_timeouts()
    _max_timeout = None

    @classmethod
    def reset_timeouts(cls):
        # restore the original default timeout factory
        cls._METHOD_TIMEOUTS = _get_default_method_timeouts()
        cls._max_timeout = None

    @classmethod
    def get_max_timeout(cls):
        return cls._max_timeout or _get_default_method_timeout() * 10

    @classmethod
    def set_max_timeout(cls, max_timeout):
        if max_timeout < cls.get_max_timeout():
            cls._METHOD_TIMEOUTS = collections.defaultdict(
                lambda: max_timeout, **{
                    k: min(v, max_timeout)
                    for k, v in cls._METHOD_TIMEOUTS.items()
                })
            cls._max_timeout = max_timeout
    def call(self, ctxt, method, **kwargs):
        # two methods with the same name in different namespaces should
        # be tracked independently
        if self._original_context.target.namespace:
            scoped_method = '%s.%s' % (self._original_context.target.namespace,
                                       method)
        else:
            scoped_method = method
        # set the timeout from the global method timeout tracker for this
        # method
        self._original_context.timeout = self._METHOD_TIMEOUTS[scoped_method]
        try:
            return self._original_context.call(ctxt, method, **kwargs)
        except oslo_messaging.MessagingTimeout:
            with excutils.save_and_reraise_exception():
                wait = random.uniform(
                    0,
                    min(self._METHOD_TIMEOUTS[scoped_method],
                        TRANSPORT.conf.rpc_response_timeout)
                )
                LOG.error("Timeout in RPC method %(method)s. Waiting for "
                          "%(wait)s seconds before next attempt. If the "
                          "server is not down, consider increasing the "
                          "rpc_response_timeout option as Neutron "
                          "server(s) may be overloaded and unable to "
                          "respond quickly enough.",
                          {'wait': int(round(wait)), 'method': scoped_method})
                new_timeout = min(
                    self._original_context.timeout * 2, self.get_max_timeout())
                if new_timeout > self._METHOD_TIMEOUTS[scoped_method]:
                    LOG.warning("Increasing timeout for %(method)s calls "
                                "to %(new)s seconds. Restart the agent to "
                                "restore it to the default value.",
                                {'method': scoped_method, 'new': new_timeout})
                    self._METHOD_TIMEOUTS[scoped_method] = new_timeout
                time.sleep(wait)
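The per-method timeout adaptation in call() reduces to a simple rule: on each MessagingTimeout, double the stored timeout for that method, capped at a maximum (by default 10x the configured response timeout). A standalone sketch with placeholder values:

```python
# Sketch of the backoff rule: double the per-method timeout on each
# timeout, capped at MAX_TIMEOUT; timeouts never shrink.
import collections

DEFAULT_TIMEOUT = 60
MAX_TIMEOUT = DEFAULT_TIMEOUT * 10
timeouts = collections.defaultdict(lambda: DEFAULT_TIMEOUT)


def on_timeout(method):
    new_timeout = min(timeouts[method] * 2, MAX_TIMEOUT)
    if new_timeout > timeouts[method]:
        timeouts[method] = new_timeout
    return timeouts[method]


print(on_timeout('sync_routers'))  # 120
print(on_timeout('sync_routers'))  # 240
```

The random sleep before re-raising in the real code additionally jitters retries so a fleet of agents does not hammer an overloaded server in lockstep.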
class BackingOffClient(oslo_messaging.RPCClient):
    """An oslo messaging RPC Client that implements a timeout backoff.

    This has all of the same interfaces as oslo_messaging.RPCClient but
    if the timeout parameter is not specified, the _BackingOffContextWrapper
    returned will track when call timeout exceptions occur and exponentially
    increase the timeout for the given call method.
    """
    def prepare(self, *args, **kwargs):
        ctx = super(BackingOffClient, self).prepare(*args, **kwargs)
        # don't back off contexts that explicitly set a timeout
        if 'timeout' in kwargs:
            return _ContextWrapper(ctx)
        return _BackingOffContextWrapper(ctx)

    @staticmethod
    def set_max_timeout(max_timeout):
        """Set RPC timeout ceiling for all backing-off RPC clients."""
        _BackingOffContextWrapper.set_max_timeout(max_timeout)
def get_client(target, version_cap=None, serializer=None):
    """Get an RPC client for the given target.

    The init() function must be called prior to calling this.

    :param target: The RPC target for the client.
    :param version_cap: The optional version cap for the RPC client.
    :param serializer: The optional serializer to use for the RPC client.
    :returns: A new RPC client.
    """
    if TRANSPORT is None:
        raise AssertionError(_("'TRANSPORT' must not be None"))
    serializer = RequestContextSerializer(serializer)
    return BackingOffClient(TRANSPORT,
                            target,
                            version_cap=version_cap,
                            serializer=serializer)
def get_server(target, endpoints, serializer=None):
    """Get a new RPC server reference.

    :param target: The target for the new RPC server.
    :param endpoints: The endpoints for the RPC server.
    :param serializer: The optional serializer to use for the RPC server.
    :returns: A new RPC server reference.
    """
    if TRANSPORT is None:
        raise AssertionError(_("'TRANSPORT' must not be None"))
    serializer = RequestContextSerializer(serializer)
    access_policy = dispatcher.DefaultRPCAccessPolicy
    return oslo_messaging.get_rpc_server(TRANSPORT, target, endpoints,
                                         'eventlet', serializer,
                                         access_policy=access_policy)
def get_notifier(service=None, host=None, publisher_id=None):
    """Get a new notifier reference.

    :param service: The optional service for the notifier.
    :param host: The optional host for the notifier. If not given the host
        will be taken from the global CONF.
    :param publisher_id: The optional publisher ID for the notifier.
    :returns: A new RPC notifier reference.
    """
    if NOTIFIER is None:
        raise AssertionError(_("'NOTIFIER' must not be None"))
    if not publisher_id:
        publisher_id = "%s.%s" % (service, host or cfg.CONF.host)
    return NOTIFIER.prepare(publisher_id=publisher_id)
class RequestContextSerializer(om_serializer.Serializer):
|
||||
"""Convert RPC common context into Neutron Context."""
|
||||
def __init__(self, base=None):
|
||||
super(RequestContextSerializer, self).__init__()
|
||||
self._base = base
|
||||
|
||||
def serialize_entity(self, ctxt, entity):
|
||||
if not self._base:
|
||||
return entity
|
||||
return self._base.serialize_entity(ctxt, entity)
|
||||
|
||||
def deserialize_entity(self, ctxt, entity):
|
||||
if not self._base:
|
||||
return entity
|
||||
return self._base.deserialize_entity(ctxt, entity)
|
||||
|
||||
def serialize_context(self, ctxt):
|
||||
_context = ctxt.to_dict()
|
||||
prof = profiler.get()
|
||||
if prof:
|
||||
trace_info = {
|
||||
"hmac_key": prof.hmac_key,
|
||||
"base_id": prof.get_base_id(),
|
||||
"parent_id": prof.get_id()
|
||||
}
|
||||
_context['trace_info'] = trace_info
|
||||
return _context
|
||||
|
||||
def deserialize_context(self, ctxt):
|
||||
rpc_ctxt_dict = ctxt.copy()
|
||||
trace_info = rpc_ctxt_dict.pop("trace_info", None)
|
||||
if trace_info:
|
||||
profiler.init(**trace_info)
|
||||
return context.Context.from_dict(rpc_ctxt_dict)
|
||||
|
||||
|
||||
@profiler.trace_cls("rpc")
|
||||
class Service(service.Service):
|
||||
"""Service object for binaries running on hosts.
|
||||
|
||||
A service enables rpc by listening to queues based on topic and host.
|
||||
"""
|
||||
def __init__(self, host, topic, manager=None, serializer=None):
|
||||
super(Service, self).__init__()
|
||||
self.host = host
|
||||
self.topic = topic
|
||||
self.serializer = serializer
|
||||
if manager is None:
|
||||
self.manager = self
|
||||
else:
|
||||
self.manager = manager
|
||||
|
||||
def start(self):
|
||||
super(Service, self).start()
|
||||
|
||||
self.conn = Connection()
|
||||
LOG.debug("Creating Consumer connection for Service %s",
|
||||
self.topic)
|
||||
|
||||
endpoints = [self.manager]
|
||||
|
||||
self.conn.create_consumer(self.topic, endpoints)
|
||||
|
||||
# Hook to allow the manager to do other initializations after
|
||||
# the rpc connection is created.
|
||||
if callable(getattr(self.manager, 'initialize_service_hook', None)):
|
||||
self.manager.initialize_service_hook(self)
|
||||
|
||||
# Consume from all consumers in threads
|
||||
self.conn.consume_in_threads()
|
||||
|
||||
def stop(self):
|
||||
# Try to shut the connection down, but if we get any sort of
|
||||
# errors, go ahead and ignore them.. as we're shutting down anyway
|
||||
try:
|
||||
self.conn.close()
|
||||
except Exception: # nosec
|
||||
pass
|
||||
super(Service, self).stop()
|
||||
|
||||
|
||||
class Connection(object):
|
||||
"""A utility class that manages a collection of RPC servers."""
|
||||
|
||||
def __init__(self):
|
||||
super(Connection, self).__init__()
|
||||
self.servers = []
|
||||
|
||||
def create_consumer(self, topic, endpoints, fanout=False):
|
||||
target = oslo_messaging.Target(
|
||||
topic=topic, server=cfg.CONF.host, fanout=fanout)
|
||||
server = get_server(target, endpoints)
|
||||
self.servers.append(server)
|
||||
|
||||
def consume_in_threads(self):
|
||||
for server in self.servers:
|
||||
server.start()
|
||||
return self.servers
|
||||
|
||||
def close(self):
|
||||
for server in self.servers:
|
||||
server.stop()
|
||||
for server in self.servers:
|
||||
server.wait()
|
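The piece of this module the commit message centers on is the back-off behavior of ``BackingOffClient``: per-method timeouts that start at the configured default, double after every ``MessagingTimeout``, and cap at a multiple of the default. As a minimal standalone sketch of that idea (class and method names here are illustrative, not the actual ``_BackingOffContextWrapper`` internals):

```python
class BackoffTimeouts(object):
    """Illustrative stand-in for the back-off timeout tracking: each
    method (or namespace.method) key gets its own timeout that starts
    at the configured default, doubles on every timeout, and is capped
    at a multiple of the default."""

    def __init__(self, default_timeout, max_multiplier=10):
        self._default = default_timeout
        self._ceiling = default_timeout * max_multiplier
        self._timeouts = {}  # method key -> current timeout

    def get(self, method):
        # unseen methods start at the configured default timeout
        return self._timeouts.setdefault(method, self._default)

    def on_timeout(self, method):
        # a MessagingTimeout doubles the next attempt, up to the ceiling
        self._timeouts[method] = min(self.get(method) * 2, self._ceiling)
```

The real client also sleeps a random amount (between zero and the default timeout) before retrying to spread out thundering herds; that behavior, the doubling sequence, and the 10x ceiling are what the timeout tests later in this patch exercise.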
52
neutron_lib/tests/unit/fake_notifier.py
Normal file
@ -0,0 +1,52 @@
# Copyright 2014 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import collections
import functools


NOTIFICATIONS = []


def reset():
    del NOTIFICATIONS[:]


FakeMessage = collections.namedtuple('Message',
                                     ['publisher_id', 'priority',
                                      'event_type', 'payload'])


class FakeNotifier(object):

    def __init__(self, transport, publisher_id=None,
                 driver=None, topics=None,
                 serializer=None, retry=None):
        self.transport = transport
        self.publisher_id = publisher_id
        for priority in ('debug', 'info', 'warn', 'error', 'critical'):
            setattr(self, priority,
                    functools.partial(self._notify, priority=priority.upper()))

    def prepare(self, publisher_id=None):
        if publisher_id is None:
            publisher_id = self.publisher_id
        return self.__class__(self.transport, publisher_id)

    def _notify(self, ctxt, event_type, payload, priority):
        msg = dict(publisher_id=self.publisher_id,
                   priority=priority,
                   event_type=event_type,
                   payload=payload)
        NOTIFICATIONS.append(msg)
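The pattern above is a simple test double: each priority method records the notification in a module-level list rather than sending it. To show how a test would use it, here is a standalone replica of the same pattern (a sketch that avoids importing neutron_lib; the notifier and event names are illustrative):

```python
import functools

NOTIFICATIONS = []


class CapturingNotifier(object):
    """Standalone replica of the fake-notifier pattern: each priority
    method (debug/info/warn/...) appends the message to NOTIFICATIONS
    instead of sending it over a transport."""

    def __init__(self, publisher_id=None):
        self.publisher_id = publisher_id
        for priority in ('debug', 'info', 'warn', 'error', 'critical'):
            setattr(self, priority,
                    functools.partial(self._notify, priority=priority.upper()))

    def _notify(self, ctxt, event_type, payload, priority):
        NOTIFICATIONS.append(dict(publisher_id=self.publisher_id,
                                  priority=priority,
                                  event_type=event_type,
                                  payload=payload))


notifier = CapturingNotifier('test.host')
notifier.info({}, 'port.create.end', {'id': 'abc'})
```

A test can then assert on `NOTIFICATIONS` directly and call a `reset()`-style helper between cases, which is exactly how the fixture-based unit tests consume the real `FakeNotifier`.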
493
neutron_lib/tests/unit/test_rpc.py
Normal file
@ -0,0 +1,493 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import mock

from oslo_config import cfg
import oslo_messaging as messaging
from oslo_messaging import conffixture as messaging_conffixture
from oslo_messaging.rpc import dispatcher
import testtools

from neutron_lib import fixture
from neutron_lib import rpc
from neutron_lib.tests import _base as base


CONF = cfg.CONF


class TestRPC(base.BaseTestCase):
    def setUp(self):
        super(TestRPC, self).setUp()
        self.useFixture(fixture.RPCFixture())

    @mock.patch.object(rpc, 'RequestContextSerializer')
    @mock.patch.object(messaging, 'get_rpc_transport')
    @mock.patch.object(messaging, 'get_notification_transport')
    @mock.patch.object(messaging, 'Notifier')
    def test_init(self, mock_not, mock_noti_trans, mock_trans, mock_ser):
        notifier = mock.Mock()
        transport = mock.Mock()
        noti_transport = mock.Mock()
        serializer = mock.Mock()
        conf = mock.Mock()

        mock_trans.return_value = transport
        mock_noti_trans.return_value = noti_transport
        mock_ser.return_value = serializer
        mock_not.return_value = notifier

        rpc.init(conf, rpc_ext_mods=['foo'])

        expected_mods = list(set(['foo'] + rpc._DFT_EXMODS))
        mock_trans.assert_called_once_with(
            conf, allowed_remote_exmods=expected_mods)
        mock_noti_trans.assert_called_once_with(
            conf, allowed_remote_exmods=expected_mods)
        mock_not.assert_called_once_with(noti_transport,
                                         serializer=serializer)
        self.assertIsNotNone(rpc.TRANSPORT)
        self.assertIsNotNone(rpc.NOTIFICATION_TRANSPORT)
        self.assertIsNotNone(rpc.NOTIFIER)

    def test_cleanup_transport_null(self):
        rpc.NOTIFIER = mock.Mock()
        rpc.NOTIFICATION_TRANSPORT = mock.Mock()
        rpc.TRANSPORT = None
        self.assertRaises(AssertionError, rpc.cleanup)
        rpc.TRANSPORT = mock.Mock()

    def test_cleanup_notification_transport_null(self):
        rpc.TRANSPORT = mock.Mock()
        rpc.NOTIFIER = mock.Mock()
        rpc.NOTIFICATION_TRANSPORT = None
        self.assertRaises(AssertionError, rpc.cleanup)
        rpc.NOTIFICATION_TRANSPORT = mock.Mock()

    def test_cleanup_notifier_null(self):
        rpc.TRANSPORT = mock.Mock()
        rpc.NOTIFICATION_TRANSPORT = mock.Mock()
        rpc.NOTIFIER = None
        self.assertRaises(AssertionError, rpc.cleanup)
        rpc.NOTIFIER = mock.Mock()

    def test_cleanup(self):
        rpc.NOTIFIER = mock.Mock()
        rpc.NOTIFICATION_TRANSPORT = mock.Mock()
        rpc.TRANSPORT = mock.Mock()
        trans_cleanup = mock.Mock()
        not_trans_cleanup = mock.Mock()
        rpc.TRANSPORT.cleanup = trans_cleanup
        rpc.NOTIFICATION_TRANSPORT.cleanup = not_trans_cleanup

        rpc.cleanup()

        trans_cleanup.assert_called_once_with()
        not_trans_cleanup.assert_called_once_with()
        self.assertIsNone(rpc.TRANSPORT)
        self.assertIsNone(rpc.NOTIFICATION_TRANSPORT)
        self.assertIsNone(rpc.NOTIFIER)

        rpc.TRANSPORT = mock.Mock()
        rpc.NOTIFIER = mock.Mock()
        rpc.NOTIFICATION_TRANSPORT = mock.Mock()

    @mock.patch.object(rpc, 'RequestContextSerializer')
    @mock.patch.object(rpc, 'BackingOffClient')
    def test_get_client(self, mock_client, mock_ser):
        rpc.TRANSPORT = mock.Mock()
        tgt = mock.Mock()
        ser = mock.Mock()
        mock_client.return_value = 'client'
        mock_ser.return_value = ser

        client = rpc.get_client(tgt, version_cap='1.0', serializer='foo')

        mock_ser.assert_called_once_with('foo')
        mock_client.assert_called_once_with(rpc.TRANSPORT,
                                            tgt, version_cap='1.0',
                                            serializer=ser)
        self.assertEqual('client', client)

    @mock.patch.object(rpc, 'RequestContextSerializer')
    @mock.patch.object(messaging, 'get_rpc_server')
    def test_get_server(self, mock_get, mock_ser):
        rpc.TRANSPORT = mock.Mock()
        ser = mock.Mock()
        tgt = mock.Mock()
        ends = mock.Mock()
        mock_ser.return_value = ser
        mock_get.return_value = 'server'

        server = rpc.get_server(tgt, ends, serializer='foo')

        mock_ser.assert_called_once_with('foo')
        access_policy = dispatcher.DefaultRPCAccessPolicy
        mock_get.assert_called_once_with(rpc.TRANSPORT, tgt, ends,
                                         'eventlet', ser,
                                         access_policy=access_policy)
        self.assertEqual('server', server)

    def test_get_notifier(self):
        rpc.NOTIFIER = mock.Mock()
        mock_prep = mock.Mock()
        mock_prep.return_value = 'notifier'
        rpc.NOTIFIER.prepare = mock_prep

        notifier = rpc.get_notifier('service', publisher_id='foo')

        mock_prep.assert_called_once_with(publisher_id='foo')
        self.assertEqual('notifier', notifier)

    def test_get_notifier_null_publisher(self):
        rpc.NOTIFIER = mock.Mock()
        mock_prep = mock.Mock()
        mock_prep.return_value = 'notifier'
        rpc.NOTIFIER.prepare = mock_prep

        notifier = rpc.get_notifier('service', host='bar')

        mock_prep.assert_called_once_with(publisher_id='service.bar')
        self.assertEqual('notifier', notifier)


class TestRequestContextSerializer(base.BaseTestCase):
    def setUp(self):
        super(TestRequestContextSerializer, self).setUp()
        self.mock_base = mock.Mock()
        self.ser = rpc.RequestContextSerializer(self.mock_base)
        self.ser_null = rpc.RequestContextSerializer(None)

    def test_serialize_entity(self):
        self.mock_base.serialize_entity.return_value = 'foo'

        ser_ent = self.ser.serialize_entity('context', 'entity')

        self.mock_base.serialize_entity.assert_called_once_with('context',
                                                                'entity')
        self.assertEqual('foo', ser_ent)

    def test_deserialize_entity(self):
        self.mock_base.deserialize_entity.return_value = 'foo'

        deser_ent = self.ser.deserialize_entity('context', 'entity')

        self.mock_base.deserialize_entity.assert_called_once_with('context',
                                                                  'entity')
        self.assertEqual('foo', deser_ent)

    def test_deserialize_entity_null_base(self):
        deser_ent = self.ser_null.deserialize_entity('context', 'entity')

        self.assertEqual('entity', deser_ent)

    def test_serialize_context(self):
        context = mock.Mock()

        self.ser.serialize_context(context)

        context.to_dict.assert_called_once_with()

    def test_deserialize_context(self):
        context_dict = {'foo': 'bar',
                        'user_id': 1,
                        'tenant_id': 1,
                        'is_admin': True}

        c = self.ser.deserialize_context(context_dict)

        self.assertEqual(1, c.user_id)
        self.assertEqual(1, c.project_id)

    def test_deserialize_context_no_user_id(self):
        context_dict = {'foo': 'bar',
                        'user': 1,
                        'tenant_id': 1,
                        'is_admin': True}

        c = self.ser.deserialize_context(context_dict)

        self.assertEqual(1, c.user_id)
        self.assertEqual(1, c.project_id)

    def test_deserialize_context_no_tenant_id(self):
        context_dict = {'foo': 'bar',
                        'user_id': 1,
                        'project_id': 1,
                        'is_admin': True}

        c = self.ser.deserialize_context(context_dict)

        self.assertEqual(1, c.user_id)
        self.assertEqual(1, c.project_id)

    def test_deserialize_context_no_ids(self):
        context_dict = {'foo': 'bar', 'is_admin': True}

        c = self.ser.deserialize_context(context_dict)

        self.assertIsNone(c.user_id)
        self.assertIsNone(c.project_id)


class ServiceTestCase(base.BaseTestCase):
    # the class cannot be based on BaseTestCase since it mocks rpc.Connection

    def setUp(self):
        super(ServiceTestCase, self).setUp()
        self.host = 'foo'
        self.topic = 'neutron-agent'

        self.target_mock = mock.patch('oslo_messaging.Target')
        self.target_mock.start()

        self.messaging_conf = messaging_conffixture.ConfFixture(CONF)
        self.messaging_conf.transport_url = 'fake://'
        self.messaging_conf.response_timeout = 0
        self.useFixture(self.messaging_conf)

        self.addCleanup(rpc.cleanup)
        rpc.init(CONF)

    @mock.patch.object(cfg, 'CONF')
    def test_operations(self, mock_conf):
        mock_conf.host = self.host
        with mock.patch('oslo_messaging.get_rpc_server') as get_rpc_server:
            rpc_server = get_rpc_server.return_value

            service = rpc.Service(self.host, self.topic)
            service.start()
            rpc_server.start.assert_called_once_with()

            service.stop()
            rpc_server.stop.assert_called_once_with()
            rpc_server.wait.assert_called_once_with()


class TimeoutTestCase(base.BaseTestCase):
    def setUp(self):
        super(TimeoutTestCase, self).setUp()

        self.messaging_conf = messaging_conffixture.ConfFixture(CONF)
        self.messaging_conf.transport_url = 'fake://'
        self.messaging_conf.response_timeout = 0
        self.useFixture(self.messaging_conf)

        self.addCleanup(rpc.cleanup)
        rpc.init(CONF)
        rpc.TRANSPORT = mock.MagicMock()
        rpc.TRANSPORT._send.side_effect = messaging.MessagingTimeout
        target = messaging.Target(version='1.0', topic='testing')
        self.client = rpc.get_client(target)
        self.call_context = mock.Mock()
        self.sleep = mock.patch('time.sleep').start()
        rpc.TRANSPORT.conf.rpc_response_timeout = 10

    def test_timeout_unaffected_when_explicitly_set(self):
        rpc.TRANSPORT.conf.rpc_response_timeout = 5
        ctx = self.client.prepare(topic='sandwiches', timeout=77)
        with testtools.ExpectedException(messaging.MessagingTimeout):
            ctx.call(self.call_context, 'create_pb_and_j')
        # ensure that the timeout was not increased and the back-off sleep
        # wasn't called
        self.assertEqual(
            5,
            rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['create_pb_and_j'])
        self.assertFalse(self.sleep.called)

    def test_timeout_store_defaults(self):
        # any method should default to the configured timeout
        self.assertEqual(
            rpc.TRANSPORT.conf.rpc_response_timeout,
            rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'])
        self.assertEqual(
            rpc.TRANSPORT.conf.rpc_response_timeout,
            rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_2'])
        # a change to an existing method should not affect new or existing ones
        rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_2'] = 7000
        self.assertEqual(
            rpc.TRANSPORT.conf.rpc_response_timeout,
            rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'])
        self.assertEqual(
            rpc.TRANSPORT.conf.rpc_response_timeout,
            rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_3'])

    def test_method_timeout_sleep(self):
        rpc.TRANSPORT.conf.rpc_response_timeout = 2
        for i in range(100):
            with testtools.ExpectedException(messaging.MessagingTimeout):
                self.client.call(self.call_context, 'method_1')
            # sleep value should always be between 0 and configured timeout
            self.assertGreaterEqual(self.sleep.call_args_list[0][0][0], 0)
            self.assertLessEqual(self.sleep.call_args_list[0][0][0], 2)
            self.sleep.reset_mock()

    def test_method_timeout_increases_on_timeout_exception(self):
        rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 1
        for i in range(5):
            with testtools.ExpectedException(messaging.MessagingTimeout):
                self.client.call(self.call_context, 'method_1')

        # we only care to check the timeouts sent to the transport
        timeouts = [call[1]['timeout']
                    for call in rpc.TRANSPORT._send.call_args_list]
        self.assertEqual([1, 2, 4, 8, 16], timeouts)

    def test_method_timeout_10x_config_ceiling(self):
        rpc.TRANSPORT.conf.rpc_response_timeout = 10
        # 5 doublings should max out at the 10x default ceiling
        for i in range(5):
            with testtools.ExpectedException(messaging.MessagingTimeout):
                self.client.call(self.call_context, 'method_1')
        self.assertEqual(
            10 * rpc.TRANSPORT.conf.rpc_response_timeout,
            rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'])
        with testtools.ExpectedException(messaging.MessagingTimeout):
            self.client.call(self.call_context, 'method_1')
        self.assertEqual(
            10 * rpc.TRANSPORT.conf.rpc_response_timeout,
            rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'])

    def test_timeout_unchanged_on_other_exception(self):
        rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 1
        rpc.TRANSPORT._send.side_effect = ValueError
        with testtools.ExpectedException(ValueError):
            self.client.call(self.call_context, 'method_1')
        rpc.TRANSPORT._send.side_effect = messaging.MessagingTimeout
        with testtools.ExpectedException(messaging.MessagingTimeout):
            self.client.call(self.call_context, 'method_1')
        timeouts = [call[1]['timeout']
                    for call in rpc.TRANSPORT._send.call_args_list]
        self.assertEqual([1, 1], timeouts)

    def test_timeouts_for_methods_tracked_independently(self):
        rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 1
        rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_2'] = 1
        for method in ('method_1', 'method_1', 'method_2',
                       'method_1', 'method_2'):
            with testtools.ExpectedException(messaging.MessagingTimeout):
                self.client.call(self.call_context, method)
        timeouts = [call[1]['timeout']
                    for call in rpc.TRANSPORT._send.call_args_list]
        self.assertEqual([1, 2, 1, 4, 2], timeouts)

    def test_timeouts_for_namespaces_tracked_independently(self):
        rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['ns1.method'] = 1
        rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['ns2.method'] = 1
        for ns in ('ns1', 'ns2'):
            self.client.target.namespace = ns
            for i in range(4):
                with testtools.ExpectedException(messaging.MessagingTimeout):
                    self.client.call(self.call_context, 'method')
        timeouts = [call[1]['timeout']
                    for call in rpc.TRANSPORT._send.call_args_list]
        self.assertEqual([1, 2, 4, 8, 1, 2, 4, 8], timeouts)

    def test_method_timeout_increases_with_prepare(self):
        rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 1
        ctx = self.client.prepare(version='1.4')
        with testtools.ExpectedException(messaging.MessagingTimeout):
            ctx.call(self.call_context, 'method_1')
        with testtools.ExpectedException(messaging.MessagingTimeout):
            ctx.call(self.call_context, 'method_1')

        # we only care to check the timeouts sent to the transport
        timeouts = [call[1]['timeout']
                    for call in rpc.TRANSPORT._send.call_args_list]
        self.assertEqual([1, 2], timeouts)

    def test_set_max_timeout_caps_all_methods(self):
        rpc.TRANSPORT.conf.rpc_response_timeout = 300
        rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 100
        rpc.BackingOffClient.set_max_timeout(50)
        # both explicitly tracked
        self.assertEqual(
            50, rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'])
        # as well as new methods
        self.assertEqual(
            50, rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_2'])

    def test_set_max_timeout_retains_lower_timeouts(self):
        rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 10
        rpc.BackingOffClient.set_max_timeout(50)
        self.assertEqual(
            10, rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'])

    def test_set_max_timeout_overrides_default_timeout(self):
        rpc.TRANSPORT.conf.rpc_response_timeout = 10
        self.assertEqual(
            10 * 10, rpc._BackingOffContextWrapper.get_max_timeout())
        rpc._BackingOffContextWrapper.set_max_timeout(10)
        self.assertEqual(10, rpc._BackingOffContextWrapper.get_max_timeout())


class CastExceptionTestCase(base.BaseTestCase):
    def setUp(self):
        super(CastExceptionTestCase, self).setUp()

        self.messaging_conf = messaging_conffixture.ConfFixture(CONF)
        self.messaging_conf.transport_url = 'fake://'
        self.messaging_conf.response_timeout = 0
        self.useFixture(self.messaging_conf)

        self.addCleanup(rpc.cleanup)
        rpc.init(CONF)
        rpc.TRANSPORT = mock.MagicMock()
        rpc.TRANSPORT._send.side_effect = Exception
        target = messaging.Target(version='1.0', topic='testing')
        self.client = rpc.get_client(target)
        self.cast_context = mock.Mock()

    def test_cast_catches_exception(self):
        self.client.cast(self.cast_context, 'method_1')


class TestConnection(base.BaseTestCase):
    def setUp(self):
        super(TestConnection, self).setUp()
        self.conn = rpc.Connection()

    @mock.patch.object(messaging, 'Target')
    @mock.patch.object(cfg, 'CONF')
    @mock.patch.object(rpc, 'get_server')
    def test_create_consumer(self, mock_get, mock_cfg, mock_tgt):
        mock_cfg.host = 'foo'
        server = mock.Mock()
        target = mock.Mock()
        mock_get.return_value = server
        mock_tgt.return_value = target

        self.conn.create_consumer('topic', 'endpoints', fanout=True)

        mock_tgt.assert_called_once_with(topic='topic', server='foo',
                                         fanout=True)
        mock_get.assert_called_once_with(target, 'endpoints')
        self.assertEqual([server], self.conn.servers)

    def test_consume_in_threads(self):
        self.conn.servers = [mock.Mock(), mock.Mock()]

        servs = self.conn.consume_in_threads()

        for serv in self.conn.servers:
            serv.start.assert_called_once_with()
        self.assertEqual(servs, self.conn.servers)

    def test_close(self):
        self.conn.servers = [mock.Mock(), mock.Mock()]

        self.conn.close()

        for serv in self.conn.servers:
            serv.stop.assert_called_once_with()
            serv.wait.assert_called_once_with()
@ -110,3 +110,11 @@ class TestNamespacedPlugins(base.BaseTestCase):
            plugins.new_plugin_instance('b')
        mock_epa.plugin.assert_called_once_with('c', 'd', karg='kval')
        mock_epb.plugin.assert_called_once_with()


class TestListPackageModules(base.BaseTestCase):

    def test_list_package_modules(self):
        # mainly just to ensure we can import modules for both PY2/PY3
        self.assertTrue(
            len(runtime.list_package_modules('neutron_lib.exceptions')) > 3)
@ -11,6 +11,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import pkgutil
import sys

from oslo_concurrency import lockutils
@ -21,6 +22,7 @@ from stevedore import enabled

from neutron_lib._i18n import _


LOG = logging.getLogger(__name__)
SYNCHRONIZED_PREFIX = 'neutron-'

@ -123,3 +125,20 @@ def load_class_by_alias_or_classname(namespace, name):
                  exc_info=True)
        raise ImportError(_("Class not found."))
    return class_to_load


def list_package_modules(package_name):
    """Get a list of the modules for a given package.

    :param package_name: The package name to get modules for.
    :returns: A list of module objects for the given package name.
    """
    pkg_mod = importutils.import_module(package_name)
    modules = [pkg_mod]

    for mod in pkgutil.walk_packages(pkg_mod.__path__):
        _, mod_name, _ = mod
        fq_name = pkg_mod.__name__ + "." + mod_name
        modules.append(importutils.import_module(fq_name))

    return modules
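The helper above just walks a package's path and imports each submodule by its fully qualified name; this is how the lib builds ``_DFT_EXMODS`` from ``neutron_lib.exceptions``. An equivalent stdlib-only sketch (using ``importlib`` in place of ``oslo_utils.importutils``; the stdlib ``json`` package is used purely for illustration):

```python
import importlib
import pkgutil


def list_package_modules(package_name):
    """Import and return the package module plus all of its submodules,
    mirroring the walk_packages-based helper above."""
    pkg_mod = importlib.import_module(package_name)
    modules = [pkg_mod]
    for _finder, mod_name, _ispkg in pkgutil.walk_packages(pkg_mod.__path__):
        modules.append(importlib.import_module(
            pkg_mod.__name__ + "." + mod_name))
    return modules


# e.g. yields json itself plus json.decoder, json.encoder, json.scanner, ...
mods = list_package_modules('json')
```

Passing ``pkg_mod.__path__`` (rather than a string) to ``walk_packages`` is what scopes the walk to that one package instead of the whole import path.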
15
releasenotes/notes/rehome-common-rpc-5d84a9fe0faa71b7.yaml
Normal file
@ -0,0 +1,15 @@
---
features:
  - The ``neutron.common.rpc`` module is now available as ``neutron_lib.rpc``
    and automatically exposes all exception modules from
    ``neutron_lib.exceptions`` for RPC usage.
  - Exceptions from ``neutron.common.exceptions`` are now available
    in the ``neutron_lib.exceptions`` package, where each exception now
    lives in its respective module (e.g. L3 exceptions are in
    ``neutron_lib.exceptions.l3``, etc.).
  - The ``neutron.tests.fake_notifier`` is now available as
    ``neutron_lib.tests.unit.fake_notifier``.
  - The ``neutron_lib.utils.runtime.list_package_modules`` function is now
    available for listing all modules in a given package.
  - The ``RPCFixture`` is now available in ``neutron_lib.fixtures`` for
    setting up RPC based unit tests.
3
tox.ini
@ -99,8 +99,9 @@ import-order-style = pep8
|
||||
[testenv:bandit]
|
||||
# B104: Possible binding to all interfaces
|
||||
# B311: Standard pseudo-random generators are not suitable for security/cryptographic purpose
|
||||
deps = -r{toxinidir}/test-requirements.txt
|
||||
commands = bandit -r neutron_lib -x tests -n5 -s B104
|
||||
commands = bandit -r neutron_lib -x tests -n5 -s B104,B311
|
||||
|
||||
[hacking]
|
||||
import_exceptions = neutron_lib._i18n
|
||||
|