Fix some typos and adjust capitalization
Change-Id: I61cf108f9746fc44a08d83e11d44ed1007a6a1fa
This commit is contained in:
parent
b29a1462c2
commit
0b078b6062
@ -20,9 +20,9 @@
|
|||||||
"""
|
"""
|
||||||
Shared code between AMQP based openstack.common.rpc implementations.
|
Shared code between AMQP based openstack.common.rpc implementations.
|
||||||
|
|
||||||
The code in this module is shared between the rpc implemenations based on AMQP.
|
The code in this module is shared between the rpc implementations based on
|
||||||
Specifically, this includes impl_kombu and impl_qpid. impl_carrot also uses
|
AMQP. Specifically, this includes impl_kombu and impl_qpid. impl_carrot also
|
||||||
AMQP, but is deprecated and predates this code.
|
uses AMQP, but is deprecated and predates this code.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import collections
|
import collections
|
||||||
@ -190,7 +190,7 @@ class ReplyProxy(ConnectionContext):
|
|||||||
def __init__(self, conf, connection_pool):
|
def __init__(self, conf, connection_pool):
|
||||||
self._call_waiters = {}
|
self._call_waiters = {}
|
||||||
self._num_call_waiters = 0
|
self._num_call_waiters = 0
|
||||||
self._num_call_waiters_wrn_threshhold = 10
|
self._num_call_waiters_wrn_threshold = 10
|
||||||
self._reply_q = 'reply_' + uuid.uuid4().hex
|
self._reply_q = 'reply_' + uuid.uuid4().hex
|
||||||
super(ReplyProxy, self).__init__(conf, connection_pool, pooled=False)
|
super(ReplyProxy, self).__init__(conf, connection_pool, pooled=False)
|
||||||
self.declare_direct_consumer(self._reply_q, self._process_data)
|
self.declare_direct_consumer(self._reply_q, self._process_data)
|
||||||
@ -209,11 +209,11 @@ class ReplyProxy(ConnectionContext):
|
|||||||
|
|
||||||
def add_call_waiter(self, waiter, msg_id):
|
def add_call_waiter(self, waiter, msg_id):
|
||||||
self._num_call_waiters += 1
|
self._num_call_waiters += 1
|
||||||
if self._num_call_waiters > self._num_call_waiters_wrn_threshhold:
|
if self._num_call_waiters > self._num_call_waiters_wrn_threshold:
|
||||||
LOG.warn(_('Number of call waiters is greater than warning '
|
LOG.warn(_('Number of call waiters is greater than warning '
|
||||||
'threshhold: %d. There could be a MulticallProxyWaiter '
|
'threshold: %d. There could be a MulticallProxyWaiter '
|
||||||
'leak.') % self._num_call_waiters_wrn_threshhold)
|
'leak.') % self._num_call_waiters_wrn_threshold)
|
||||||
self._num_call_waiters_wrn_threshhold *= 2
|
self._num_call_waiters_wrn_threshold *= 2
|
||||||
self._call_waiters[msg_id] = waiter
|
self._call_waiters[msg_id] = waiter
|
||||||
|
|
||||||
def del_call_waiter(self, msg_id):
|
def del_call_waiter(self, msg_id):
|
||||||
@ -242,7 +242,7 @@ def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None,
|
|||||||
_add_unique_id(msg)
|
_add_unique_id(msg)
|
||||||
# If a reply_q exists, add the msg_id to the reply and pass the
|
# If a reply_q exists, add the msg_id to the reply and pass the
|
||||||
# reply_q to direct_send() to use it as the response queue.
|
# reply_q to direct_send() to use it as the response queue.
|
||||||
# Otherwise use the msg_id for backward compatibilty.
|
# Otherwise use the msg_id for backward compatibility.
|
||||||
if reply_q:
|
if reply_q:
|
||||||
msg['_msg_id'] = msg_id
|
msg['_msg_id'] = msg_id
|
||||||
conn.direct_send(reply_q, rpc_common.serialize_msg(msg))
|
conn.direct_send(reply_q, rpc_common.serialize_msg(msg))
|
||||||
@ -278,7 +278,7 @@ def unpack_context(conf, msg):
|
|||||||
"""Unpack context from msg."""
|
"""Unpack context from msg."""
|
||||||
context_dict = {}
|
context_dict = {}
|
||||||
for key in list(msg.keys()):
|
for key in list(msg.keys()):
|
||||||
# NOTE(vish): Some versions of python don't like unicode keys
|
# NOTE(vish): Some versions of Python don't like unicode keys
|
||||||
# in kwargs.
|
# in kwargs.
|
||||||
key = str(key)
|
key = str(key)
|
||||||
if key.startswith('_context_'):
|
if key.startswith('_context_'):
|
||||||
|
@ -50,7 +50,7 @@ class AMQPIncomingMessage(base.IncomingMessage):
|
|||||||
|
|
||||||
# If a reply_q exists, add the msg_id to the reply and pass the
|
# If a reply_q exists, add the msg_id to the reply and pass the
|
||||||
# reply_q to direct_send() to use it as the response queue.
|
# reply_q to direct_send() to use it as the response queue.
|
||||||
# Otherwise use the msg_id for backward compatibilty.
|
# Otherwise use the msg_id for backward compatibility.
|
||||||
if self.reply_q:
|
if self.reply_q:
|
||||||
msg['_msg_id'] = self.msg_id
|
msg['_msg_id'] = self.msg_id
|
||||||
conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg))
|
conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg))
|
||||||
@ -97,7 +97,7 @@ class ReplyWaiters(object):
|
|||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self._queues = {}
|
self._queues = {}
|
||||||
self._wrn_threshhold = 10
|
self._wrn_threshold = 10
|
||||||
|
|
||||||
def get(self, msg_id, timeout):
|
def get(self, msg_id, timeout):
|
||||||
try:
|
try:
|
||||||
@ -129,11 +129,11 @@ class ReplyWaiters(object):
|
|||||||
|
|
||||||
def add(self, msg_id, queue):
|
def add(self, msg_id, queue):
|
||||||
self._queues[msg_id] = queue
|
self._queues[msg_id] = queue
|
||||||
if len(self._queues) > self._wrn_threshhold:
|
if len(self._queues) > self._wrn_threshold:
|
||||||
LOG.warn('Number of call queues is greater than warning '
|
LOG.warn('Number of call queues is greater than warning '
|
||||||
'threshhold: %d. There could be a leak.' %
|
'threshold: %d. There could be a leak.' %
|
||||||
self._wrn_threshhold)
|
self._wrn_threshold)
|
||||||
self._wrn_threshhold *= 2
|
self._wrn_threshold *= 2
|
||||||
|
|
||||||
def remove(self, msg_id):
|
def remove(self, msg_id):
|
||||||
del self._queues[msg_id]
|
del self._queues[msg_id]
|
||||||
|
@ -361,7 +361,7 @@ def deserialize_remote_exception(data, allowed_remote_exmods):
|
|||||||
try:
|
try:
|
||||||
# NOTE(ameade): Dynamically create a new exception type and swap it in
|
# NOTE(ameade): Dynamically create a new exception type and swap it in
|
||||||
# as the new type for the exception. This only works on user defined
|
# as the new type for the exception. This only works on user defined
|
||||||
# Exceptions and not core python exceptions. This is important because
|
# Exceptions and not core Python exceptions. This is important because
|
||||||
# we cannot necessarily change an exception message so we must override
|
# we cannot necessarily change an exception message so we must override
|
||||||
# the __str__ method.
|
# the __str__ method.
|
||||||
failure.__class__ = new_ex_type
|
failure.__class__ = new_ex_type
|
||||||
|
@ -53,10 +53,10 @@ qpid_opts = [
|
|||||||
help='Qpid HA cluster host:port pairs'),
|
help='Qpid HA cluster host:port pairs'),
|
||||||
cfg.StrOpt('qpid_username',
|
cfg.StrOpt('qpid_username',
|
||||||
default='',
|
default='',
|
||||||
help='Username for qpid connection'),
|
help='Username for Qpid connection'),
|
||||||
cfg.StrOpt('qpid_password',
|
cfg.StrOpt('qpid_password',
|
||||||
default='',
|
default='',
|
||||||
help='Password for qpid connection',
|
help='Password for Qpid connection',
|
||||||
secret=True),
|
secret=True),
|
||||||
cfg.StrOpt('qpid_sasl_mechanisms',
|
cfg.StrOpt('qpid_sasl_mechanisms',
|
||||||
default='',
|
default='',
|
||||||
@ -123,11 +123,11 @@ class ConsumerBase(object):
|
|||||||
self.connect(session)
|
self.connect(session)
|
||||||
|
|
||||||
def connect(self, session):
|
def connect(self, session):
|
||||||
"""Declare the reciever on connect."""
|
"""Declare the receiver on connect."""
|
||||||
self._declare_receiver(session)
|
self._declare_receiver(session)
|
||||||
|
|
||||||
def reconnect(self, session):
|
def reconnect(self, session):
|
||||||
"""Re-declare the receiver after a qpid reconnect."""
|
"""Re-declare the receiver after a Qpid reconnect."""
|
||||||
self._declare_receiver(session)
|
self._declare_receiver(session)
|
||||||
|
|
||||||
def _declare_receiver(self, session):
|
def _declare_receiver(self, session):
|
||||||
@ -267,7 +267,7 @@ class Publisher(object):
|
|||||||
"type": "topic",
|
"type": "topic",
|
||||||
"x-declare": {
|
"x-declare": {
|
||||||
"durable": False,
|
"durable": False,
|
||||||
# auto-delete isn't implemented for exchanges in qpid,
|
# auto-delete isn't implemented for exchanges in Qpid,
|
||||||
# but put in here anyway
|
# but put in here anyway
|
||||||
"auto-delete": True,
|
"auto-delete": True,
|
||||||
},
|
},
|
||||||
@ -327,7 +327,7 @@ class DirectPublisher(Publisher):
|
|||||||
class TopicPublisher(Publisher):
|
class TopicPublisher(Publisher):
|
||||||
"""Publisher class for 'topic'."""
|
"""Publisher class for 'topic'."""
|
||||||
def __init__(self, conf, session, topic):
|
def __init__(self, conf, session, topic):
|
||||||
"""init a 'topic' publisher.
|
"""Init a 'topic' publisher.
|
||||||
"""
|
"""
|
||||||
exchange_name = rpc_amqp.get_control_exchange(conf)
|
exchange_name = rpc_amqp.get_control_exchange(conf)
|
||||||
super(TopicPublisher, self).__init__(session,
|
super(TopicPublisher, self).__init__(session,
|
||||||
@ -337,7 +337,7 @@ class TopicPublisher(Publisher):
|
|||||||
class FanoutPublisher(Publisher):
|
class FanoutPublisher(Publisher):
|
||||||
"""Publisher class for 'fanout'."""
|
"""Publisher class for 'fanout'."""
|
||||||
def __init__(self, conf, session, topic):
|
def __init__(self, conf, session, topic):
|
||||||
"""init a 'fanout' publisher.
|
"""Init a 'fanout' publisher.
|
||||||
"""
|
"""
|
||||||
super(FanoutPublisher, self).__init__(
|
super(FanoutPublisher, self).__init__(
|
||||||
session,
|
session,
|
||||||
@ -347,7 +347,7 @@ class FanoutPublisher(Publisher):
|
|||||||
class NotifyPublisher(Publisher):
|
class NotifyPublisher(Publisher):
|
||||||
"""Publisher class for notifications."""
|
"""Publisher class for notifications."""
|
||||||
def __init__(self, conf, session, topic):
|
def __init__(self, conf, session, topic):
|
||||||
"""init a 'topic' publisher.
|
"""Init a 'topic' publisher.
|
||||||
"""
|
"""
|
||||||
exchange_name = rpc_amqp.get_control_exchange(conf)
|
exchange_name = rpc_amqp.get_control_exchange(conf)
|
||||||
super(NotifyPublisher, self).__init__(session,
|
super(NotifyPublisher, self).__init__(session,
|
||||||
@ -582,11 +582,11 @@ class Connection(object):
|
|||||||
#
|
#
|
||||||
# We want to create a message with attributes, e.g. a TTL. We
|
# We want to create a message with attributes, e.g. a TTL. We
|
||||||
# don't really need to keep 'msg' in its JSON format any longer
|
# don't really need to keep 'msg' in its JSON format any longer
|
||||||
# so let's create an actual qpid message here and get some
|
# so let's create an actual Qpid message here and get some
|
||||||
# value-add on the go.
|
# value-add on the go.
|
||||||
#
|
#
|
||||||
# WARNING: Request timeout happens to be in the same units as
|
# WARNING: Request timeout happens to be in the same units as
|
||||||
# qpid's TTL (seconds). If this changes in the future, then this
|
# Qpid's TTL (seconds). If this changes in the future, then this
|
||||||
# will need to be altered accordingly.
|
# will need to be altered accordingly.
|
||||||
#
|
#
|
||||||
qpid_message = qpid_messaging.Message(content=msg, ttl=timeout)
|
qpid_message = qpid_messaging.Message(content=msg, ttl=timeout)
|
||||||
|
@ -349,7 +349,7 @@ class Publisher(object):
|
|||||||
class DirectPublisher(Publisher):
|
class DirectPublisher(Publisher):
|
||||||
"""Publisher class for 'direct'."""
|
"""Publisher class for 'direct'."""
|
||||||
def __init__(self, conf, channel, msg_id, **kwargs):
|
def __init__(self, conf, channel, msg_id, **kwargs):
|
||||||
"""init a 'direct' publisher.
|
"""Init a 'direct' publisher.
|
||||||
|
|
||||||
Kombu options may be passed as keyword args to override defaults
|
Kombu options may be passed as keyword args to override defaults
|
||||||
"""
|
"""
|
||||||
@ -365,7 +365,7 @@ class DirectPublisher(Publisher):
|
|||||||
class TopicPublisher(Publisher):
|
class TopicPublisher(Publisher):
|
||||||
"""Publisher class for 'topic'."""
|
"""Publisher class for 'topic'."""
|
||||||
def __init__(self, conf, channel, topic, **kwargs):
|
def __init__(self, conf, channel, topic, **kwargs):
|
||||||
"""init a 'topic' publisher.
|
"""Init a 'topic' publisher.
|
||||||
|
|
||||||
Kombu options may be passed as keyword args to override defaults
|
Kombu options may be passed as keyword args to override defaults
|
||||||
"""
|
"""
|
||||||
@ -384,7 +384,7 @@ class TopicPublisher(Publisher):
|
|||||||
class FanoutPublisher(Publisher):
|
class FanoutPublisher(Publisher):
|
||||||
"""Publisher class for 'fanout'."""
|
"""Publisher class for 'fanout'."""
|
||||||
def __init__(self, conf, channel, topic, **kwargs):
|
def __init__(self, conf, channel, topic, **kwargs):
|
||||||
"""init a 'fanout' publisher.
|
"""Init a 'fanout' publisher.
|
||||||
|
|
||||||
Kombu options may be passed as keyword args to override defaults
|
Kombu options may be passed as keyword args to override defaults
|
||||||
"""
|
"""
|
||||||
|
@ -31,7 +31,7 @@ _eventlet_opts = [
|
|||||||
|
|
||||||
class EventletExecutor(base.ExecutorBase):
|
class EventletExecutor(base.ExecutorBase):
|
||||||
|
|
||||||
"""A message exector which integrates with eventlet.
|
"""A message executor which integrates with eventlet.
|
||||||
|
|
||||||
This is an executor which polls for incoming messages from a greenthread
|
This is an executor which polls for incoming messages from a greenthread
|
||||||
and dispatches each message in its own greenthread.
|
and dispatches each message in its own greenthread.
|
||||||
|
@ -36,7 +36,7 @@ def get_local_context(ctxt):
|
|||||||
request ID, user and tenant in every message logged from a RPC endpoint
|
request ID, user and tenant in every message logged from a RPC endpoint
|
||||||
method.
|
method.
|
||||||
|
|
||||||
:returns: the context for the retuest dispatched in the current thread
|
:returns: the context for the request dispatched in the current thread
|
||||||
"""
|
"""
|
||||||
return getattr(_STORE, _KEY, None)
|
return getattr(_STORE, _KEY, None)
|
||||||
|
|
||||||
|
@ -116,7 +116,7 @@ class MessageHandlingServer(object):
|
|||||||
registering a callback with an event loop. Similarly, the executor may
|
registering a callback with an event loop. Similarly, the executor may
|
||||||
choose to dispatch messages in a new thread, coroutine or simply the
|
choose to dispatch messages in a new thread, coroutine or simply the
|
||||||
current thread. An RPCServer subclass is available for each I/O
|
current thread. An RPCServer subclass is available for each I/O
|
||||||
strategy supported by the library, so choose the subclass appropraite
|
strategy supported by the library, so choose the subclass appropriate
|
||||||
for your program.
|
for your program.
|
||||||
"""
|
"""
|
||||||
if self._executor is not None:
|
if self._executor is not None:
|
||||||
|
@ -132,7 +132,7 @@ def get_transport(conf, url=None, allowed_remote_exmods=[]):
|
|||||||
If a transport URL is supplied as a parameter, any transport configuration
|
If a transport URL is supplied as a parameter, any transport configuration
|
||||||
contained in it takes precedence. If no transport URL is supplied, but
|
contained in it takes precedence. If no transport URL is supplied, but
|
||||||
there is a transport URL supplied in the user's configuration then that
|
there is a transport URL supplied in the user's configuration then that
|
||||||
URL will take the place of the url parameter. In both cases, any
|
URL will take the place of the URL parameter. In both cases, any
|
||||||
configuration not supplied in the transport URL may be taken from
|
configuration not supplied in the transport URL may be taken from
|
||||||
individual configuration parameters in the user's configuration.
|
individual configuration parameters in the user's configuration.
|
||||||
|
|
||||||
@ -324,7 +324,7 @@ class TransportURL(object):
|
|||||||
|
|
||||||
* It is first splitted by ',' in order to support multiple hosts
|
* It is first splitted by ',' in order to support multiple hosts
|
||||||
* The last parsed username and password will be propagated to the rest
|
* The last parsed username and password will be propagated to the rest
|
||||||
of hotsts specified:
|
of hosts specified:
|
||||||
|
|
||||||
user:passwd@host1:port1,host2:port2
|
user:passwd@host1:port1,host2:port2
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user