Remove old drivers dead code
This patch removes some common helper method/class. Because they are not used anymore by current amqpdriver. Change-Id: I183750e158e05cf1d0b5e37725676d4882e0c043
This commit is contained in:
committed by
Gerrit Code Review
parent
6a2c39738c
commit
fd86e0fd54
@@ -154,21 +154,6 @@ class ConnectionContext(rpc_common.Connection):
|
|||||||
"""Caller is done with this connection."""
|
"""Caller is done with this connection."""
|
||||||
self._done()
|
self._done()
|
||||||
|
|
||||||
def create_consumer(self, topic, proxy, fanout=False):
|
|
||||||
self.connection.create_consumer(topic, proxy, fanout)
|
|
||||||
|
|
||||||
def create_worker(self, topic, proxy, pool_name):
|
|
||||||
self.connection.create_worker(topic, proxy, pool_name)
|
|
||||||
|
|
||||||
def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
|
|
||||||
self.connection.join_consumer_pool(callback,
|
|
||||||
pool_name,
|
|
||||||
topic,
|
|
||||||
exchange_name)
|
|
||||||
|
|
||||||
def consume_in_thread(self):
|
|
||||||
self.connection.consume_in_thread()
|
|
||||||
|
|
||||||
def __getattr__(self, key):
|
def __getattr__(self, key):
|
||||||
"""Proxy all other calls to the Connection instance."""
|
"""Proxy all other calls to the Connection instance."""
|
||||||
if self.connection:
|
if self.connection:
|
||||||
@@ -177,71 +162,6 @@ class ConnectionContext(rpc_common.Connection):
|
|||||||
raise rpc_common.InvalidRPCConnectionReuse()
|
raise rpc_common.InvalidRPCConnectionReuse()
|
||||||
|
|
||||||
|
|
||||||
class ReplyProxy(ConnectionContext):
|
|
||||||
"""Connection class for RPC replies / callbacks."""
|
|
||||||
def __init__(self, conf, connection_pool):
|
|
||||||
self._call_waiters = {}
|
|
||||||
self._num_call_waiters = 0
|
|
||||||
self._num_call_waiters_wrn_threshold = 10
|
|
||||||
self._reply_q = 'reply_' + uuid.uuid4().hex
|
|
||||||
super(ReplyProxy, self).__init__(conf, connection_pool, pooled=False)
|
|
||||||
self.declare_direct_consumer(self._reply_q, self._process_data)
|
|
||||||
self.consume_in_thread()
|
|
||||||
|
|
||||||
def _process_data(self, message_data):
|
|
||||||
msg_id = message_data.pop('_msg_id', None)
|
|
||||||
waiter = self._call_waiters.get(msg_id)
|
|
||||||
if not waiter:
|
|
||||||
LOG.warn(_('No calling threads waiting for msg_id : %(msg_id)s'
|
|
||||||
', message : %(data)s'), {'msg_id': msg_id,
|
|
||||||
'data': message_data})
|
|
||||||
LOG.warn(_('_call_waiters: %s') % str(self._call_waiters))
|
|
||||||
else:
|
|
||||||
waiter.put(message_data)
|
|
||||||
|
|
||||||
def add_call_waiter(self, waiter, msg_id):
|
|
||||||
self._num_call_waiters += 1
|
|
||||||
if self._num_call_waiters > self._num_call_waiters_wrn_threshold:
|
|
||||||
LOG.warn(_('Number of call waiters is greater than warning '
|
|
||||||
'threshold: %d. There could be a MulticallProxyWaiter '
|
|
||||||
'leak.') % self._num_call_waiters_wrn_threshold)
|
|
||||||
self._num_call_waiters_wrn_threshold *= 2
|
|
||||||
self._call_waiters[msg_id] = waiter
|
|
||||||
|
|
||||||
def del_call_waiter(self, msg_id):
|
|
||||||
self._num_call_waiters -= 1
|
|
||||||
del self._call_waiters[msg_id]
|
|
||||||
|
|
||||||
def get_reply_q(self):
|
|
||||||
return self._reply_q
|
|
||||||
|
|
||||||
|
|
||||||
def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None,
|
|
||||||
failure=None, ending=False, log_failure=True):
|
|
||||||
"""Sends a reply or an error on the channel signified by msg_id.
|
|
||||||
|
|
||||||
Failure should be a sys.exc_info() tuple.
|
|
||||||
|
|
||||||
"""
|
|
||||||
with ConnectionContext(conf, connection_pool) as conn:
|
|
||||||
if failure:
|
|
||||||
failure = rpc_common.serialize_remote_exception(failure,
|
|
||||||
log_failure)
|
|
||||||
|
|
||||||
msg = {'result': reply, 'failure': failure}
|
|
||||||
if ending:
|
|
||||||
msg['ending'] = True
|
|
||||||
_add_unique_id(msg)
|
|
||||||
# If a reply_q exists, add the msg_id to the reply and pass the
|
|
||||||
# reply_q to direct_send() to use it as the response queue.
|
|
||||||
# Otherwise use the msg_id for backward compatibility.
|
|
||||||
if reply_q:
|
|
||||||
msg['_msg_id'] = msg_id
|
|
||||||
conn.direct_send(reply_q, rpc_common.serialize_msg(msg))
|
|
||||||
else:
|
|
||||||
conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
|
|
||||||
|
|
||||||
|
|
||||||
class RpcContext(rpc_common.CommonRpcContext):
|
class RpcContext(rpc_common.CommonRpcContext):
|
||||||
"""Context that supports replying to a rpc.call."""
|
"""Context that supports replying to a rpc.call."""
|
||||||
def __init__(self, **kwargs):
|
def __init__(self, **kwargs):
|
||||||
@@ -257,14 +177,6 @@ class RpcContext(rpc_common.CommonRpcContext):
|
|||||||
values['reply_q'] = self.reply_q
|
values['reply_q'] = self.reply_q
|
||||||
return self.__class__(**values)
|
return self.__class__(**values)
|
||||||
|
|
||||||
def reply(self, reply=None, failure=None, ending=False,
|
|
||||||
connection_pool=None, log_failure=True):
|
|
||||||
if self.msg_id:
|
|
||||||
msg_reply(self.conf, self.msg_id, self.reply_q, connection_pool,
|
|
||||||
reply, failure, ending, log_failure)
|
|
||||||
if ending:
|
|
||||||
self.msg_id = None
|
|
||||||
|
|
||||||
|
|
||||||
def unpack_context(conf, msg):
|
def unpack_context(conf, msg):
|
||||||
"""Unpack context from msg."""
|
"""Unpack context from msg."""
|
||||||
|
|||||||
@@ -108,26 +108,6 @@ class RPCException(Exception):
|
|||||||
super(RPCException, self).__init__(message)
|
super(RPCException, self).__init__(message)
|
||||||
|
|
||||||
|
|
||||||
class RemoteError(RPCException):
|
|
||||||
"""Signifies that a remote class has raised an exception.
|
|
||||||
|
|
||||||
Contains a string representation of the type of the original exception,
|
|
||||||
the value of the original exception, and the traceback. These are
|
|
||||||
sent to the parent as a joined string so printing the exception
|
|
||||||
contains all of the relevant info.
|
|
||||||
|
|
||||||
"""
|
|
||||||
msg_fmt = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.")
|
|
||||||
|
|
||||||
def __init__(self, exc_type=None, value=None, traceback=None):
|
|
||||||
self.exc_type = exc_type
|
|
||||||
self.value = value
|
|
||||||
self.traceback = traceback
|
|
||||||
super(RemoteError, self).__init__(exc_type=exc_type,
|
|
||||||
value=value,
|
|
||||||
traceback=traceback)
|
|
||||||
|
|
||||||
|
|
||||||
class Timeout(RPCException):
|
class Timeout(RPCException):
|
||||||
"""Signifies that a timeout has occurred.
|
"""Signifies that a timeout has occurred.
|
||||||
|
|
||||||
@@ -194,83 +174,6 @@ class Connection(object):
|
|||||||
"""
|
"""
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
def create_consumer(self, topic, proxy, fanout=False):
|
|
||||||
"""Create a consumer on this connection.
|
|
||||||
|
|
||||||
A consumer is associated with a message queue on the backend message
|
|
||||||
bus. The consumer will read messages from the queue, unpack them, and
|
|
||||||
dispatch them to the proxy object. The contents of the message pulled
|
|
||||||
off of the queue will determine which method gets called on the proxy
|
|
||||||
object.
|
|
||||||
|
|
||||||
:param topic: This is a name associated with what to consume from.
|
|
||||||
Multiple instances of a service may consume from the same
|
|
||||||
topic. For example, all instances of nova-compute consume
|
|
||||||
from a queue called "compute". In that case, the
|
|
||||||
messages will get distributed amongst the consumers in a
|
|
||||||
round-robin fashion if fanout=False. If fanout=True,
|
|
||||||
every consumer associated with this topic will get a
|
|
||||||
copy of every message.
|
|
||||||
:param proxy: The object that will handle all incoming messages.
|
|
||||||
:param fanout: Whether or not this is a fanout topic. See the
|
|
||||||
documentation for the topic parameter for some
|
|
||||||
additional comments on this.
|
|
||||||
"""
|
|
||||||
raise NotImplementedError()
|
|
||||||
|
|
||||||
def create_worker(self, topic, proxy, pool_name):
|
|
||||||
"""Create a worker on this connection.
|
|
||||||
|
|
||||||
A worker is like a regular consumer of messages directed to a
|
|
||||||
topic, except that it is part of a set of such consumers (the
|
|
||||||
"pool") which may run in parallel. Every pool of workers will
|
|
||||||
receive a given message, but only one worker in the pool will
|
|
||||||
be asked to process it. Load is distributed across the members
|
|
||||||
of the pool in round-robin fashion.
|
|
||||||
|
|
||||||
:param topic: This is a name associated with what to consume from.
|
|
||||||
Multiple instances of a service may consume from the same
|
|
||||||
topic.
|
|
||||||
:param proxy: The object that will handle all incoming messages.
|
|
||||||
:param pool_name: String containing the name of the pool of workers
|
|
||||||
"""
|
|
||||||
raise NotImplementedError()
|
|
||||||
|
|
||||||
def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
|
|
||||||
"""Register as a member of a group of consumers.
|
|
||||||
|
|
||||||
Uses given topic from the specified exchange.
|
|
||||||
Exactly one member of a given pool will receive each message.
|
|
||||||
|
|
||||||
A message will be delivered to multiple pools, if more than
|
|
||||||
one is created.
|
|
||||||
|
|
||||||
:param callback: Callable to be invoked for each message.
|
|
||||||
:type callback: callable accepting one argument
|
|
||||||
:param pool_name: The name of the consumer pool.
|
|
||||||
:type pool_name: str
|
|
||||||
:param topic: The routing topic for desired messages.
|
|
||||||
:type topic: str
|
|
||||||
:param exchange_name: The name of the message exchange where
|
|
||||||
the client should attach. Defaults to
|
|
||||||
the configured exchange.
|
|
||||||
:type exchange_name: str
|
|
||||||
"""
|
|
||||||
raise NotImplementedError()
|
|
||||||
|
|
||||||
def consume_in_thread(self):
|
|
||||||
"""Spawn a thread to handle incoming messages.
|
|
||||||
|
|
||||||
Spawn a thread that will be responsible for handling all incoming
|
|
||||||
messages for consumers that were set up on this connection.
|
|
||||||
|
|
||||||
Message dispatching inside of this is expected to be implemented in a
|
|
||||||
non-blocking manner. An example implementation would be having this
|
|
||||||
thread pull messages in for all of the consumers, but utilize a thread
|
|
||||||
pool for dispatching the messages to the proxy objects.
|
|
||||||
"""
|
|
||||||
raise NotImplementedError()
|
|
||||||
|
|
||||||
|
|
||||||
def _safe_log(log_func, msg, msg_data):
|
def _safe_log(log_func, msg, msg_data):
|
||||||
"""Sanitizes the msg_data field before logging."""
|
"""Sanitizes the msg_data field before logging."""
|
||||||
@@ -396,28 +299,6 @@ class CommonRpcContext(object):
|
|||||||
#local.store.context = self
|
#local.store.context = self
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def elevated(self, read_deleted=None, overwrite=False):
|
|
||||||
"""Return a version of this context with admin flag set."""
|
|
||||||
# TODO(russellb) This method is a bit of a nova-ism. It makes
|
|
||||||
# some assumptions about the data in the request context sent
|
|
||||||
# across rpc, while the rest of this class does not. We could get
|
|
||||||
# rid of this if we changed the nova code that uses this to
|
|
||||||
# convert the RpcContext back to its native RequestContext doing
|
|
||||||
# something like nova.context.RequestContext.from_dict(ctxt.to_dict())
|
|
||||||
|
|
||||||
context = self.deepcopy()
|
|
||||||
context.values['is_admin'] = True
|
|
||||||
|
|
||||||
context.values.setdefault('roles', [])
|
|
||||||
|
|
||||||
if 'admin' not in context.values['roles']:
|
|
||||||
context.values['roles'].append('admin')
|
|
||||||
|
|
||||||
if read_deleted is not None:
|
|
||||||
context.values['read_deleted'] = read_deleted
|
|
||||||
|
|
||||||
return context
|
|
||||||
|
|
||||||
|
|
||||||
class ClientException(Exception):
|
class ClientException(Exception):
|
||||||
"""Encapsulates actual exception expected to be hit by a RPC proxy object.
|
"""Encapsulates actual exception expected to be hit by a RPC proxy object.
|
||||||
@@ -429,32 +310,6 @@ class ClientException(Exception):
|
|||||||
self._exc_info = sys.exc_info()
|
self._exc_info = sys.exc_info()
|
||||||
|
|
||||||
|
|
||||||
def catch_client_exception(exceptions, func, *args, **kwargs):
|
|
||||||
try:
|
|
||||||
return func(*args, **kwargs)
|
|
||||||
except Exception as e:
|
|
||||||
if type(e) in exceptions:
|
|
||||||
raise ClientException()
|
|
||||||
else:
|
|
||||||
raise
|
|
||||||
|
|
||||||
|
|
||||||
def client_exceptions(*exceptions):
|
|
||||||
"""Decorator for manager methods that raise expected exceptions.
|
|
||||||
|
|
||||||
Marking a Manager method with this decorator allows the declaration
|
|
||||||
of expected exceptions that the RPC layer should not consider fatal,
|
|
||||||
and not log as if they were generated in a real error scenario. Note
|
|
||||||
that this will cause listed exceptions to be wrapped in a
|
|
||||||
ClientException, which is used internally by the RPC layer.
|
|
||||||
"""
|
|
||||||
def outer(func):
|
|
||||||
def inner(*args, **kwargs):
|
|
||||||
return catch_client_exception(exceptions, func, *args, **kwargs)
|
|
||||||
return inner
|
|
||||||
return outer
|
|
||||||
|
|
||||||
|
|
||||||
def serialize_msg(raw_msg):
|
def serialize_msg(raw_msg):
|
||||||
# NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
|
# NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
|
||||||
# information about this format.
|
# information about this format.
|
||||||
|
|||||||
@@ -42,7 +42,6 @@ zmq = importutils.try_import('eventlet.green.zmq')
|
|||||||
pformat = pprint.pformat
|
pformat = pprint.pformat
|
||||||
Timeout = eventlet.timeout.Timeout
|
Timeout = eventlet.timeout.Timeout
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
RemoteError = rpc_common.RemoteError
|
|
||||||
RPCException = rpc_common.RPCException
|
RPCException = rpc_common.RPCException
|
||||||
|
|
||||||
# FIXME(markmc): remove this
|
# FIXME(markmc): remove this
|
||||||
|
|||||||
Reference in New Issue
Block a user