# -*- coding: utf-8 -*- # Copyright (C) 2014 Yahoo! Inc. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import collections import threading from futurist import waiters from oslo_config import cfg from oslo_utils import excutils from oslo_utils import timeutils from oslo_messaging._executors import base _pool_opts = [ cfg.IntOpt('executor_thread_pool_size', default=64, deprecated_name="rpc_thread_pool_size", help='Size of executor thread pool.'), ] class PooledExecutor(base.ExecutorBase): """A message executor which integrates with some executor. This will create a message thread that polls for messages from a dispatching thread and on reception of an incoming message places the message to be processed into a async executor to be executed at a later time. """ # These may be overridden by subclasses (and implemented using whatever # objects make most sense for the provided async execution model). _event_cls = threading.Event _lock_cls = threading.Lock # Pooling and dispatching (executor submission) will happen from a # thread created from this class/function. _thread_cls = threading.Thread # This one **must** be overridden by a subclass. _executor_cls = None # Blocking function that should wait for all provided futures to finish. _wait_for_all = staticmethod(waiters.wait_for_all) def __init__(self, conf, listener, dispatcher): super(PooledExecutor, self).__init__(conf, listener, dispatcher) self.conf.register_opts(_pool_opts) self._poller = None self._executor = None self._tombstone = self._event_cls() self._incomplete = collections.deque() self._mutator = self._lock_cls() def _do_submit(self, callback): def _on_done(fut): with self._mutator: try: self._incomplete.remove(fut) except ValueError: pass callback.done() try: fut = self._executor.submit(callback.run) except RuntimeError: # This is triggered when the executor has been shutdown... # # TODO(harlowja): should we put whatever we pulled off back # since when this is thrown it means the executor has been # shutdown already?? callback.done() return False else: with self._mutator: self._incomplete.append(fut) # Run the other post processing of the callback when done... fut.add_done_callback(_on_done) return True @excutils.forever_retry_uncaught_exceptions def _runner(self): while not self._tombstone.is_set(): incoming = self.listener.poll( timeout=self.dispatcher.batch_timeout, prefetch_size=self.dispatcher.batch_size) if not incoming: continue callback = self.dispatcher(incoming) was_submitted = self._do_submit(callback) if not was_submitted: break def start(self, override_pool_size=None): if self._executor is None: if override_pool_size is not None and int(override_pool_size) < 1: raise ValueError('The thread pool size should be a positive ' 'value.') self._executor = self._executor_cls( override_pool_size if override_pool_size else self.conf.executor_thread_pool_size) self._tombstone.clear() if self._poller is None or not self._poller.is_alive(): self._poller = self._thread_cls(target=self._runner) self._poller.daemon = True self._poller.start() def stop(self): if self._executor is not None: self._executor.shutdown(wait=False) self._tombstone.set() self.listener.stop() def wait(self, timeout=None): with timeutils.StopWatch(duration=timeout) as w: poller = self._poller if poller is not None: self._tombstone.wait(w.leftover(return_none=True)) if not self._tombstone.is_set(): return False poller.join(w.leftover(return_none=True)) if poller.is_alive(): return False self._poller = None executor = self._executor if executor is not None: with self._mutator: incomplete_fs = list(self._incomplete) if incomplete_fs: (done, not_done) = self._wait_for_all( incomplete_fs, timeout=w.leftover(return_none=True)) with self._mutator: for fut in done: try: self._incomplete.remove(fut) except ValueError: pass if not_done: return False self._executor = None return True