Remove unused eventlet/greenlet from qpid/rabbit
Because driver should rely on executor and not directly on eventlet, delete eventlet/greenlet related code. qpid/rabbit part. Change-Id: I20a0850d54b6c6f81957beabb12f7d67f0c1e741
This commit is contained in:
parent
64f91d30a8
commit
0218991a57
|
@ -18,15 +18,12 @@ import itertools
|
|||
import logging
|
||||
import time
|
||||
|
||||
import eventlet
|
||||
import greenlet
|
||||
from oslo.config import cfg
|
||||
import six
|
||||
|
||||
from oslo.messaging._drivers import amqp as rpc_amqp
|
||||
from oslo.messaging._drivers import amqpdriver
|
||||
from oslo.messaging._drivers import common as rpc_common
|
||||
from oslo.messaging.openstack.common import excutils
|
||||
from oslo.messaging.openstack.common import importutils
|
||||
from oslo.messaging.openstack.common import jsonutils
|
||||
|
||||
|
@ -444,8 +441,6 @@ class Connection(object):
|
|||
self.connection = None
|
||||
self.session = None
|
||||
self.consumers = {}
|
||||
self.consumer_thread = None
|
||||
self.proxy_callbacks = []
|
||||
self.conf = conf
|
||||
|
||||
if server_params and 'hostname' in server_params:
|
||||
|
@ -542,8 +537,6 @@ class Connection(object):
|
|||
|
||||
def close(self):
|
||||
"""Close/release this connection."""
|
||||
self.cancel_consumer_thread()
|
||||
self.wait_on_proxy_callbacks()
|
||||
try:
|
||||
self.connection.close()
|
||||
except Exception:
|
||||
|
@ -555,8 +548,6 @@ class Connection(object):
|
|||
|
||||
def reset(self):
|
||||
"""Reset a connection so it can be used again."""
|
||||
self.cancel_consumer_thread()
|
||||
self.wait_on_proxy_callbacks()
|
||||
self.session.close()
|
||||
self.session = self.connection.session()
|
||||
self.consumers = {}
|
||||
|
@ -601,21 +592,6 @@ class Connection(object):
|
|||
raise StopIteration
|
||||
yield self.ensure(_error_callback, _consume)
|
||||
|
||||
def cancel_consumer_thread(self):
|
||||
"""Cancel a consumer thread."""
|
||||
if self.consumer_thread is not None:
|
||||
self.consumer_thread.kill()
|
||||
try:
|
||||
self.consumer_thread.wait()
|
||||
except greenlet.GreenletExit:
|
||||
pass
|
||||
self.consumer_thread = None
|
||||
|
||||
def wait_on_proxy_callbacks(self):
|
||||
"""Wait for all proxy callback threads to exit."""
|
||||
for proxy_cb in self.proxy_callbacks:
|
||||
proxy_cb.wait()
|
||||
|
||||
def publisher_send(self, cls, topic, msg):
|
||||
"""Send to a publisher based on the publisher class."""
|
||||
|
||||
|
@ -686,18 +662,6 @@ class Connection(object):
|
|||
except StopIteration:
|
||||
return
|
||||
|
||||
def consume_in_thread(self):
|
||||
"""Consumer from all queues/consumers in a greenthread."""
|
||||
@excutils.forever_retry_uncaught_exceptions
|
||||
def _consumer_thread():
|
||||
try:
|
||||
self.consume()
|
||||
except greenlet.GreenletExit:
|
||||
return
|
||||
if self.consumer_thread is None:
|
||||
self.consumer_thread = eventlet.spawn(_consumer_thread)
|
||||
return self.consumer_thread
|
||||
|
||||
|
||||
class QpidDriver(amqpdriver.AMQPDriverBase):
|
||||
|
||||
|
|
|
@ -20,8 +20,6 @@ import ssl
|
|||
import time
|
||||
import uuid
|
||||
|
||||
import eventlet
|
||||
import greenlet
|
||||
import kombu
|
||||
import kombu.connection
|
||||
import kombu.entity
|
||||
|
@ -32,7 +30,6 @@ import six
|
|||
from oslo.messaging._drivers import amqp as rpc_amqp
|
||||
from oslo.messaging._drivers import amqpdriver
|
||||
from oslo.messaging._drivers import common as rpc_common
|
||||
from oslo.messaging.openstack.common import excutils
|
||||
from oslo.messaging.openstack.common import network_utils
|
||||
from oslo.messaging.openstack.common import sslutils
|
||||
|
||||
|
@ -408,8 +405,6 @@ class Connection(object):
|
|||
|
||||
def __init__(self, conf, server_params=None):
|
||||
self.consumers = []
|
||||
self.consumer_thread = None
|
||||
self.proxy_callbacks = []
|
||||
self.conf = conf
|
||||
self.max_retries = self.conf.rabbit_max_retries
|
||||
# Try forever?
|
||||
|
@ -593,15 +588,11 @@ class Connection(object):
|
|||
|
||||
def close(self):
|
||||
"""Close/release this connection."""
|
||||
self.cancel_consumer_thread()
|
||||
self.wait_on_proxy_callbacks()
|
||||
self.connection.release()
|
||||
self.connection = None
|
||||
|
||||
def reset(self):
|
||||
"""Reset a connection so it can be used again."""
|
||||
self.cancel_consumer_thread()
|
||||
self.wait_on_proxy_callbacks()
|
||||
self.channel.close()
|
||||
self.channel = self.connection.channel()
|
||||
# work around 'memory' transport bug in 1.1.3
|
||||
|
@ -655,21 +646,6 @@ class Connection(object):
|
|||
raise StopIteration
|
||||
yield self.ensure(_error_callback, _consume)
|
||||
|
||||
def cancel_consumer_thread(self):
|
||||
"""Cancel a consumer thread."""
|
||||
if self.consumer_thread is not None:
|
||||
self.consumer_thread.kill()
|
||||
try:
|
||||
self.consumer_thread.wait()
|
||||
except greenlet.GreenletExit:
|
||||
pass
|
||||
self.consumer_thread = None
|
||||
|
||||
def wait_on_proxy_callbacks(self):
|
||||
"""Wait for all proxy callback threads to exit."""
|
||||
for proxy_cb in self.proxy_callbacks:
|
||||
proxy_cb.wait()
|
||||
|
||||
def publisher_send(self, cls, topic, msg, timeout=None, **kwargs):
|
||||
"""Send to a publisher based on the publisher class."""
|
||||
|
||||
|
@ -729,18 +705,6 @@ class Connection(object):
|
|||
except StopIteration:
|
||||
return
|
||||
|
||||
def consume_in_thread(self):
|
||||
"""Consumer from all queues/consumers in a greenthread."""
|
||||
@excutils.forever_retry_uncaught_exceptions
|
||||
def _consumer_thread():
|
||||
try:
|
||||
self.consume()
|
||||
except greenlet.GreenletExit:
|
||||
return
|
||||
if self.consumer_thread is None:
|
||||
self.consumer_thread = eventlet.spawn(_consumer_thread)
|
||||
return self.consumer_thread
|
||||
|
||||
|
||||
class RabbitDriver(amqpdriver.AMQPDriverBase):
|
||||
|
||||
|
|
Loading…
Reference in New Issue