Browse Source

Always use a poll timeout in the executor

This change allows to stop the eventlet executor without
using eventlet.kill.

And also, the kombu docs actually recommend that drain_events
by default have a 1 timeout.

Co-Authored-by: Joshua Harlow <harlowja@yahoo-inc.com>

Change-Id: I3a3919e38d08267439843a127346300fd2131540
changes/69/137569/2
Mehdi Abaakouk 7 years ago
parent
commit
7bce31a2d1
  1. 4
      oslo/messaging/_executors/base.py
  2. 17
      oslo/messaging/_executors/impl_blocking.py
  3. 14
      oslo/messaging/_executors/impl_eventlet.py

4
oslo/messaging/_executors/base.py

@ -16,6 +16,10 @@ import abc
import six
# NOTE(sileht): value choosen according the best practice from kombu
# http://kombu.readthedocs.org/en/latest/reference/kombu.common.html#kombu.common.eventloop
POLL_TIMEOUT = 1
@six.add_metaclass(abc.ABCMeta)
class ExecutorBase(object):

17
oslo/messaging/_executors/impl_blocking.py

@ -13,7 +13,12 @@
# License for the specific language governing permissions and limitations
# under the License.
import logging
from oslo.messaging._executors import base
from oslo.messaging._i18n import _
LOG = logging.getLogger(__name__)
class BlockingExecutor(base.ExecutorBase):
@ -36,11 +41,13 @@ class BlockingExecutor(base.ExecutorBase):
def start(self):
self._running = True
while self._running:
message = self.listener.poll(timeout=0.01)
if not message:
continue
with self.dispatcher(message) as callback:
callback()
try:
incoming = self.listener.poll(timeout=base.POLL_TIMEOUT)
if incoming is not None:
with self.dispatcher(incoming) as callback:
callback()
except Exception:
LOG.exception(_("Unexpected exception occurred."))
def stop(self):
self._running = False

14
oslo/messaging/_executors/impl_eventlet.py

@ -75,6 +75,7 @@ class EventletExecutor(base.ExecutorBase):
self.conf.register_opts(_eventlet_opts)
self._thread = None
self._greenpool = greenpool.GreenPool(self.conf.rpc_thread_pool_size)
self._running = False
def start(self):
if self._thread is not None:
@ -83,19 +84,22 @@ class EventletExecutor(base.ExecutorBase):
@excutils.forever_retry_uncaught_exceptions
def _executor_thread():
try:
while True:
incoming = self.listener.poll()
spawn_with(ctxt=self.dispatcher(incoming),
pool=self._greenpool)
while self._running:
incoming = self.listener.poll(timeout=base.POLL_TIMEOUT)
if incoming is not None:
spawn_with(ctxt=self.dispatcher(incoming),
pool=self._greenpool)
except greenlet.GreenletExit:
return
self._running = True
self._thread = eventlet.spawn(_executor_thread)
def stop(self):
if self._thread is None:
return
self._thread.kill()
self._running = False
self._thread.cancel()
def wait(self):
if self._thread is None:

Loading…
Cancel
Save