Merge "Rework agent to better handle multithreaded checks"
commit fa4529cac5
@@ -1,824 +0,0 @@
# {{{ http://code.activestate.com/recipes/576519/ (r9)
# Author: David Decotigny, Oct 1 2008
# (C) Copyright 2015 Hewlett Packard Enterprise Development Company LP
# @brief Pool of threads similar to multiprocessing.Pool
# See http://docs.python.org/dev/library/multiprocessing.html
# Differences: added imap_async and imap_unordered_async, and terminate()
# has to be called explicitly (it's not registered by atexit).
#
# The general idea is that we submit work to a workqueue, either as
# single Jobs (one function to call), or JobSequences (batches of
# Jobs). Each Job is associated with an ApplyResult object which has 2
# states: waiting for the Job to complete, or Ready. Instead of
# waiting for the jobs to finish, we wait for their ApplyResult object
# to become ready: an event mechanism is used for that.
# When we apply a function to several arguments in "parallel", we need
# a way to wait for all/part of the Jobs to be processed: that's what
# "collectors" are for; they group and wait for a set of ApplyResult
# objects. Once a collector is ready to be used, we can use a
# CollectorIterator to iterate over the result values it's collecting.
#
# The methods of a Pool object use all these concepts and expose
# them to their caller in a very simple way.

import Queue
import sys
import threading
import traceback


# Item pushed on the work queue to tell the worker threads to terminate
SENTINEL = "QUIT"


def is_sentinel(obj):
    """Predicate to determine whether an item from the queue is the

    signal to stop
    """
    return isinstance(obj, str) and obj == SENTINEL


class TimeoutError(Exception):

    """Raised when a result is not available within the given timeout.
    """
    pass


class PoolWorker(threading.Thread):

    """Thread that consumes WorkUnits from a queue to process them.
    """

    def __init__(self, workq, *args, **kwds):
        """\param workq: Queue object to consume the work units from.
        """
        threading.Thread.__init__(self, *args, **kwds)
        self._workq = workq
        self.running = False

    def run(self):
        """Process the work unit, or wait for sentinel to exit.
        """
        while True:
            self.running = True
            workunit = self._workq.get()
            if is_sentinel(workunit):
                # Got sentinel
                break

            # Run the job / sequence
            workunit.process()
        self.running = False


class Pool(object):

    """The Pool class represents a pool of worker threads.

    It has methods which allow tasks to be offloaded to the
    worker threads in a few different ways.
    """

    def __init__(self, nworkers, name="Pool"):
        """\param nworkers (integer) number of worker threads to start

        \param name (string) prefix for the worker threads' name
        """
        self._workq = Queue.Queue()
        self._closed = False
        self._workers = []
        for idx in xrange(nworkers):
            thr = PoolWorker(self._workq, name="Worker-%s-%d" % (name, idx))
            try:
                thr.start()
            except Exception:
                # If one thread has a problem, undo everything
                self.terminate()
                raise
            else:
                self._workers.append(thr)

    def get_nworkers(self):
        return len([w for w in self._workers if w.running])

    def apply(self, func, args=(), kwds=None):
        """Equivalent of the apply() builtin function.

        It blocks till the result is ready.
        """
        if not kwds:
            kwds = dict()
        return self.apply_async(func, args, kwds).get()

    def map(self, func, iterable, chunksize=None):
        """A parallel equivalent of the map() builtin function.

        It blocks till the result is ready.

        This method chops the iterable into a number of chunks which
        it submits to the thread pool as separate tasks. The
        (approximate) size of these chunks can be specified by setting
        chunksize to a positive integer.
        """
        return self.map_async(func, iterable, chunksize).get()

    def imap(self, func, iterable, chunksize=1):
        """An equivalent of itertools.imap().

        The chunksize argument is the same as the one used by the
        map() method. For very long iterables, using a large value for
        chunksize can make the job complete much faster than
        using the default value of 1.

        Also, if chunksize is 1 then the next() method of the iterator
        returned by the imap() method has an optional timeout
        parameter: next(timeout) will raise TimeoutError if
        the result cannot be returned within timeout seconds.
        """
        collector = OrderedResultCollector(as_iterator=True)
        self._create_sequences(func, iterable, chunksize, collector)
        return iter(collector)

    def imap_unordered(self, func, iterable, chunksize=1):
        """The same as imap() except that the ordering of the results

        from the returned iterator should be considered arbitrary.
        (Only when there is only one worker thread is the order
        guaranteed to be "correct".)
        """
        collector = UnorderedResultCollector()
        self._create_sequences(func, iterable, chunksize, collector)
        return iter(collector)

    def apply_async(self, func, args=(), kwds=None, callback=None):
        """A variant of the apply() method which returns an ApplyResult object.

        If callback is specified then it should be a callable which
        accepts a single argument. When the result becomes ready,
        callback is applied to it (unless the call failed). callback
        should complete immediately since otherwise the thread which
        handles the results will get blocked.
        """
        if not kwds:
            kwds = dict()
        assert not self._closed  # No lock here. We assume it's atomic...
        apply_result = ApplyResult(callback=callback)
        job = Job(func, args, kwds, apply_result)
        self._workq.put(job)
        return apply_result

    def map_async(self, func, iterable, chunksize=None, callback=None):
        """A variant of the map() method which returns an ApplyResult object.

        If callback is specified then it should be a callable which
        accepts a single argument. When the result becomes ready,
        callback is applied to it (unless the call failed). callback
        should complete immediately since otherwise the thread which
        handles the results will get blocked.
        """
        apply_result = ApplyResult(callback=callback)
        collector = OrderedResultCollector(apply_result, as_iterator=False)
        self._create_sequences(func, iterable, chunksize, collector)
        return apply_result

    def imap_async(self, func, iterable, chunksize=None, callback=None):
        """A variant of the imap() method which returns an ApplyResult

        object that provides an iterator (next(timeout) method
        available).

        If callback is specified then it should be a callable which
        accepts a single argument. When the resulting iterator becomes
        ready, callback is applied to it (unless the call
        failed). callback should complete immediately since otherwise
        the thread which handles the results will get blocked.
        """
        apply_result = ApplyResult(callback=callback)
        collector = OrderedResultCollector(apply_result, as_iterator=True)
        self._create_sequences(func, iterable, chunksize, collector)
        return apply_result

    def imap_unordered_async(self, func, iterable, chunksize=None,
                             callback=None):
        """A variant of the imap_unordered() method which returns an

        ApplyResult object that provides an iterator (next(timeout)
        method available).

        If callback is specified then it should be a callable which
        accepts a single argument. When the resulting iterator becomes
        ready, callback is applied to it (unless the call
        failed). callback should complete immediately since otherwise
        the thread which handles the results will get blocked.
        """
        apply_result = ApplyResult(callback=callback)
        collector = UnorderedResultCollector(apply_result)
        self._create_sequences(func, iterable, chunksize, collector)
        return apply_result

    def close(self):
        """Prevents any more tasks from being submitted to the pool.

        Once all the tasks have been completed the worker
        threads will exit.
        """
        # No lock here. We assume it's sufficiently atomic...
        self._closed = True

    def terminate(self):
        """Stops the worker threads immediately without completing outstanding work.

        When the pool object is garbage collected terminate() will be called immediately.
        """
        self.close()

        # Clearing the job queue
        try:
            while True:
                self._workq.get_nowait()
        except Queue.Empty:
            pass

        # Send one sentinel for each worker thread: each thread will die
        # eventually, leaving the next sentinel for the next thread
        for thr in self._workers:
            self._workq.put(SENTINEL)

    def join(self):
        """Wait for the worker threads to exit.

        One must call close() or terminate() before using join().
        """
        for thr in self._workers:
            thr.join()

    def _create_sequences(self, func, iterable, chunksize, collector=None):
        """Creates the WorkUnit objects to process and pushes them on the work queue.

        Each work unit is meant to process a slice of iterable of size chunksize.
        If collector is specified, then the ApplyResult objects associated with
        the jobs will notify collector when their result becomes ready.

        \return the list of WorkUnit objects (basically: JobSequences)
        pushed onto the work queue
        """
        assert not self._closed  # No lock here. We assume it's atomic...
        sequences = []
        results = []
        it_ = iter(iterable)
        exit_loop = False
        while not exit_loop:
            seq = []
            for i in xrange(chunksize or 1):
                try:
                    arg = it_.next()
                except StopIteration:
                    exit_loop = True
                    break
                apply_result = ApplyResult(collector)
                job = Job(func, (arg,), {}, apply_result)
                seq.append(job)
                results.append(apply_result)
            sequences.append(JobSequence(seq))

        for seq in sequences:
            self._workq.put(seq)

        return sequences


class WorkUnit(object):

    """ABC for a unit of work submitted to the worker threads.

    It's basically just an object equipped with a process() method.
    """

    def process(self):
        """Do the work. Shouldn't raise any exception."""
        raise NotImplementedError("Children must override process()")


class Job(WorkUnit):

    """A work unit that corresponds to the execution of a single function.
    """

    def __init__(self, func, args, kwds, apply_result):
        """\param func/args/kwds used to call the function

        \param apply_result ApplyResult object that holds the result
        of the function call
        """
        WorkUnit.__init__(self)
        self._func = func
        self._args = args
        self._kwds = kwds
        self._result = apply_result

    def process(self):
        """Call the function with the args/kwds and tell the ApplyResult

        that its result is ready. Correctly handles the exceptions
        happening during the execution of the function.
        """
        try:
            result = self._func(*self._args, **self._kwds)
        except Exception:
            self._result._set_exception()
        else:
            self._result._set_value(result)


class JobSequence(WorkUnit):

    """A work unit that corresponds to the processing of a continuous
    sequence of Job objects
    """

    def __init__(self, jobs):
        WorkUnit.__init__(self)
        self._jobs = jobs

    def process(self):
        """Call process() on all the Job objects that have been specified.
        """
        for job in self._jobs:
            job.process()


class ApplyResult(object):

    """An object associated with a Job object that holds its result:

    it's available during the whole life of the Job and afterwards, even
    before the Job has been processed. It's possible to use this object
    to wait for the result/exception of the job to become available.

    The result objects returned by the Pool::*_async() methods are of
    this type
    """

    def __init__(self, collector=None, callback=None):
        """\param collector when not None, the notify_ready() method of

        the collector will be called when the result from the Job is
        ready
        \param callback when not None, function to call when the
        result becomes available (this is the parameter passed to the
        Pool::*_async() methods).
        """
        self._success = False
        self._event = threading.Event()
        self._data = None
        self._collector = None
        self._callback = callback

        if collector is not None:
            collector.register_result(self)
            self._collector = collector

    def get(self, timeout=None):
        """Returns the result when it arrives.

        If timeout is not None and the result does not arrive within timeout
        seconds then TimeoutError is raised. If the remote call raised an
        exception then that exception will be re-raised by get().
        """
        if not self.wait(timeout):
            raise TimeoutError("Result not available within %fs" % timeout)
        if self._success:
            return self._data
        raise self._data[0], self._data[1], self._data[2]

    def wait(self, timeout=None):
        """Waits until the result is available or until timeout seconds pass.
        """
        self._event.wait(timeout)
        return self._event.isSet()

    def ready(self):
        """Returns whether the call has completed.
        """
        return self._event.isSet()

    def successful(self):
        """Returns whether the call completed without raising an exception.

        Will raise AssertionError if the result is not ready.
        """
        assert self.ready()
        return self._success

    def _set_value(self, value):
        """Called by a Job object to tell that the result is ready, and

        provides the value of this result. The object will become
        ready and successful. The collector's notify_ready() method
        will be called, and the callback method too.
        """
        assert not self.ready()
        self._data = value
        self._success = True
        self._event.set()
        if self._collector is not None:
            self._collector.notify_ready(self)
        if self._callback is not None:
            try:
                self._callback(value)
            except Exception:
                traceback.print_exc()

    def _set_exception(self):
        """Called by a Job object to tell that an exception occurred

        during the processing of the function. The object will become
        ready but not successful. The collector's notify_ready()
        method will be called, but NOT the callback method
        """
        assert not self.ready()
        self._data = sys.exc_info()
        self._success = False
        self._event.set()
        if self._collector is not None:
            self._collector.notify_ready(self)


class AbstractResultCollector(object):

    """ABC to define the interface of a ResultCollector object.

    It is basically an object which knows which results it's waiting for,
    and which is able to get notified when they become available. It is
    also able to provide an iterator over the results when they are
    available.
    """

    def __init__(self, to_notify):
        """\param to_notify ApplyResult object to notify when all the

        results we're waiting for become available. Can be None.
        """
        self._to_notify = to_notify

    def register_result(self, apply_result):
        """Used to identify which results we're waiting for.

        Will always be called BEFORE the Jobs get submitted to the work
        queue, and BEFORE the __iter__ and _get_result() methods can
        be called
        \param apply_result ApplyResult object to add in our collection
        """
        raise NotImplementedError("Children classes must implement it")

    def notify_ready(self, apply_result):
        """Called by the ApplyResult object (already registered via

        register_result()) that it is now ready (i.e. the Job's result
        is available or an exception has been raised).
        \param apply_result ApplyResult object telling us that the job
        has been processed
        """
        raise NotImplementedError("Children classes must implement it")

    def _get_result(self, idx, timeout=None):
        """Called by the CollectorIterator object to retrieve the

        results' values one after another (order defined by the
        implementation)
        \param idx The index of the result we want, wrt collector's order
        \param timeout integer telling how long to wait (in seconds)
        for the result at index idx to be available, or None (wait
        forever)
        """
        raise NotImplementedError("Children classes must implement it")

    def __iter__(self):
        """Return a new CollectorIterator object for this collector.
        """
        return CollectorIterator(self)


class CollectorIterator(object):

    """An iterator that allows iterating over the result values

    available in the given collector object. Equipped with an extended
    next() method accepting a timeout argument. Created by the
    AbstractResultCollector::__iter__() method
    """

    def __init__(self, collector):
        """\param collector AbstractResultCollector instance.
        """
        self._collector = collector
        self._idx = 0

    def __iter__(self):
        return self

    def next(self, timeout=None):
        """Return the next result value in the sequence.

        Raise StopIteration at the end. Can raise the exception raised by
        the Job.
        """
        try:
            apply_result = self._collector._get_result(self._idx, timeout)
        except IndexError:
            # Reset for next time
            self._idx = 0
            raise StopIteration
        except Exception:
            self._idx = 0
            raise
        self._idx += 1
        assert apply_result.ready()
        return apply_result.get(0)


class UnorderedResultCollector(AbstractResultCollector):

    """An AbstractResultCollector implementation that collects the

    values of the ApplyResult objects in the order they become ready. The
    CollectorIterator object returned by __iter__() will iterate over
    them in the order they become ready.
    """

    def __init__(self, to_notify=None):
        """\param to_notify ApplyResult object to notify when all the

        results we're waiting for become available. Can be None.
        """
        AbstractResultCollector.__init__(self, to_notify)
        self._cond = threading.Condition()
        self._collection = []
        self._expected = 0

    def register_result(self, apply_result):
        """Used to identify which results we're waiting for.

        Will always be called BEFORE the Jobs get submitted to the work
        queue, and BEFORE the __iter__ and _get_result() methods can
        be called
        \param apply_result ApplyResult object to add in our collection
        """
        self._expected += 1

    def _get_result(self, idx, timeout=None):
        """Called by the CollectorIterator object to retrieve the

        results' values one after another, in the order the results have
        become available.
        \param idx The index of the result we want, wrt collector's order
        \param timeout integer telling how long to wait (in seconds)
        for the result at index idx to be available, or None (wait
        forever)
        """
        self._cond.acquire()
        try:
            if idx >= self._expected:
                raise IndexError
            elif idx < len(self._collection):
                return self._collection[idx]
            elif idx != len(self._collection):
                # Violation of the sequence protocol
                raise IndexError()
            else:
                self._cond.wait(timeout=timeout)
                try:
                    return self._collection[idx]
                except IndexError:
                    # Still not added!
                    raise TimeoutError("Timeout while waiting for results")
        finally:
            self._cond.release()

    def notify_ready(self, apply_result):
        """Called by the ApplyResult object (already registered via

        register_result()) that it is now ready (i.e. the Job's result
        is available or an exception has been raised).
        \param apply_result ApplyResult object telling us that the job
        has been processed
        """
        first_item = False
        self._cond.acquire()
        try:
            self._collection.append(apply_result)
            first_item = (len(self._collection) == 1)

            self._cond.notifyAll()
        finally:
            self._cond.release()

        if first_item and self._to_notify is not None:
            self._to_notify._set_value(iter(self))


class OrderedResultCollector(AbstractResultCollector):

    """An AbstractResultCollector implementation that collects the

    values of the ApplyResult objects in the order they have been
    submitted. The CollectorIterator object returned by __iter__()
    will iterate over them in the order they have been submitted.
    """

    def __init__(self, to_notify=None, as_iterator=True):
        """\param to_notify ApplyResult object to notify when all the

        results we're waiting for become available. Can be None.
        \param as_iterator boolean telling whether the result value
        set on to_notify should be an iterator (available as soon as 1
        result arrived) or a list (available only after the last
        result arrived)
        """
        AbstractResultCollector.__init__(self, to_notify)
        self._results = []
        self._lock = threading.Lock()
        self._remaining = 0
        self._as_iterator = as_iterator

    def register_result(self, apply_result):
        """Used to identify which results we're waiting for.

        Will always be called BEFORE the Jobs get submitted to the work
        queue, and BEFORE the __iter__ and _get_result() methods can
        be called
        \param apply_result ApplyResult object to add in our collection
        """
        self._results.append(apply_result)
        self._remaining += 1

    def _get_result(self, idx, timeout=None):
        """Called by the CollectorIterator object to retrieve the

        results' values one after another (order defined by the
        implementation)
        \param idx The index of the result we want, wrt collector's order
        \param timeout integer telling how long to wait (in seconds)
        for the result at index idx to be available, or None (wait
        forever)
        """
        res = self._results[idx]
        res.wait(timeout)
        return res

    def notify_ready(self, apply_result):
        """Called by the ApplyResult object (already registered via

        register_result()) that it is now ready (i.e. the Job's result
        is available or an exception has been raised).
        \param apply_result ApplyResult object telling us that the job
        has been processed
        """
        got_first = False
        got_last = False
        self._lock.acquire()
        try:
            assert self._remaining > 0
            got_first = (len(self._results) == self._remaining)
            self._remaining -= 1
            got_last = (self._remaining == 0)
        finally:
            self._lock.release()

        if self._to_notify is not None:
            if self._as_iterator and got_first:
                self._to_notify._set_value(iter(self))
            elif not self._as_iterator and got_last:
                try:
                    lst = [r.get(0) for r in self._results]
                except Exception:
                    self._to_notify._set_exception()
                else:
                    self._to_notify._set_value(lst)


def _test():
    """Some tests.
    """
    import time

    def f(x):
        return x * x

    def work(seconds):
        ident = threading.current_thread().ident
        print("[%d] Start to work for %fs..." % (ident, seconds))
        time.sleep(seconds)
        print("[%d] Work done (%fs)." % (ident, seconds))
        return "%d slept %fs" % (ident, seconds)

    # Test copy/pasted from multiprocessing
    pool = Pool(9)  # start 9 worker threads

    result = pool.apply_async(f, (10,))  # evaluate "f(10)" asynchronously
    print(result.get(timeout=1))  # prints "100" unless slow computer

    print(pool.map(f, range(10)))  # prints "[0, 1, 4,..., 81]"

    it = pool.imap(f, range(10))
    print(it.next())  # prints "0"
    print(it.next())  # prints "1"
    print(it.next(timeout=1))  # prints "4" unless slow computer

    # Test apply_async() exceptions
    result = pool.apply_async(time.sleep, (3,))
    try:
        print(result.get(timeout=1))  # raises `TimeoutError`
    except TimeoutError:
        print("Good. Got expected timeout exception.")
    else:
        assert False, "Expected exception !"
    print(result.get())

    def cb(s):
        print("Result ready: %s" % s)

    # Test imap()
    for res in pool.imap(work, xrange(10, 3, -1), chunksize=4):
        print("Item:", res)

    # Test imap_unordered()
    for res in pool.imap_unordered(work, xrange(10, 3, -1)):
        print("Item:", res)

    # Test map_async()
    result = pool.map_async(work, xrange(10), callback=cb)
    try:
        print(result.get(timeout=1))  # raises `TimeoutError`
    except TimeoutError:
        print("Good. Got expected timeout exception.")
    else:
        assert False, "Expected exception !"
    print(result.get())

    # Test imap_async()
    result = pool.imap_async(work, xrange(3, 10), callback=cb)
    try:
        print(result.get(timeout=1))  # raises `TimeoutError`
    except TimeoutError:
        print("Good. Got expected timeout exception.")
    else:
        assert False, "Expected exception !"
    for i in result.get():
        print("Item:", i)
    print("### Loop again:")
    for i in result.get():
        print("Item2:", i)

    # Test imap_unordered_async()
    result = pool.imap_unordered_async(work, xrange(10, 3, -1), callback=cb)
    try:
        print(result.get(timeout=1))  # raises `TimeoutError`
    except TimeoutError:
        print("Good. Got expected timeout exception.")
    else:
        assert False, "Expected exception !"
    for i in result.get():
        print("Item1:", i)
    for i in result.get():
        print("Item2:", i)
    r = result.get()
    for i in r:
        print("Item3:", i)
    for i in r:
        print("Item4:", i)
    for i in r:
        print("Item5:", i)

    #
    # The case for the exceptions
    #

    # Exceptions in imap_unordered_async()
    result = pool.imap_unordered_async(work, xrange(2, -10, -1), callback=cb)
    time.sleep(3)
    try:
        for i in result.get():
            print("Got item:", i)
    except IOError:
        print("Good. Got expected exception:")
        traceback.print_exc()

    # Exceptions in imap_async()
    result = pool.imap_async(work, xrange(2, -10, -1), callback=cb)
    time.sleep(3)
    try:
        for i in result.get():
            print("Got item:", i)
    except IOError:
        print("Good. Got expected exception:")
        traceback.print_exc()

    # Stop the test: need to stop the pool !!!
    pool.terminate()
    print("End of tests")

if __name__ == "__main__":
    _test()
# end of http://code.activestate.com/recipes/576519/ }}}
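For orientation, the slice of this API that the agent actually relied on was small: fire-and-forget apply_async() plus an explicit terminate()/join(). A minimal sketch of that usage follows (slow_check is a made-up stand-in for a plugin's per-instance work, not agent code), which is also why multiprocessing.dummy.Pool can replace the recipe with the same call shape:

    import time

    def slow_check(instance):
        # Hypothetical stand-in for the work a services check does per instance
        time.sleep(1)
        return instance

    pool = Pool(4)                             # four PoolWorker threads
    res = pool.apply_async(slow_check, ({'name': 'web'},))
    print(res.get(timeout=5))                  # blocks; raises TimeoutError if late
    pool.terminate()                           # explicit: not registered by atexit
    pool.join()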
@@ -5,8 +5,11 @@ import Queue
 import threading
 import time
 
+from gevent import monkey
+from gevent import Timeout
+from multiprocessing.dummy import Pool as ThreadPool
+
 import monasca_agent.collector.checks
-import monasca_agent.collector.checks.libs.thread_pool
 
 
 DEFAULT_TIMEOUT = 180
@@ -18,6 +21,7 @@ FAILURE = "FAILURE"
 up_down = collections.namedtuple('up_down', ['UP', 'DOWN'])
 Status = up_down('UP', 'DOWN')
 EventType = up_down("servicecheck.state_change.up", "servicecheck.state_change.down")
+monkey.patch_all()
 
 
 class ServicesCheck(monasca_agent.collector.checks.AgentCheck):
@@ -25,7 +29,7 @@ class ServicesCheck(monasca_agent.collector.checks.AgentCheck):
 
     """Services checks inherits from this class.
 
-    This class should never be directly instanciated.
+    This class should never be directly instantiated.
 
     Work flow:
     The main agent loop will call the check function for each instance for
@@ -47,82 +51,71 @@ class ServicesCheck(monasca_agent.collector.checks.AgentCheck):
         # A dictionary to keep track of service statuses
         self.statuses = {}
         self.notified = {}
+        self.resultsq = Queue.Queue()
         self.nb_failures = 0
-        self.pool_started = False
-
-    def stop(self):
-        self.stop_pool()
-        self.pool_started = False
+        self.pool = None
 
-    def start_pool(self):
         # The pool size should be the minimum between the number of instances
         # and the DEFAULT_SIZE_POOL. It can also be overridden by the 'threads_count'
         # parameter in the init_config of the check
-        self.log.info("Starting Thread Pool")
         default_size = min(self.instance_count(), DEFAULT_SIZE_POOL)
         self.pool_size = int(self.init_config.get('threads_count', default_size))
         self.timeout = int(self.agent_config.get('timeout', DEFAULT_TIMEOUT))
 
-        self.pool = monasca_agent.collector.checks.libs.thread_pool.Pool(self.pool_size)
-        self.resultsq = Queue.Queue()
-        self.jobs_status = {}
-        self.pool_started = True
+    def start_pool(self):
+        self.log.info("Starting Thread Pool")
+        self.pool = ThreadPool(self.pool_size)
+        if threading.activeCount() > MAX_ALLOWED_THREADS:
+            self.log.error("Thread count ({0}) exceeds maximum ({1})".format(threading.activeCount(),
+                                                                             MAX_ALLOWED_THREADS))
+        self.running_jobs = set()
 
     def stop_pool(self):
         self.log.info("Stopping Thread Pool")
-        if self.pool_started:
-            self.pool.terminate()
+        if self.pool:
+            self.pool.close()
             self.pool.join()
-            self.jobs_status.clear()
-            assert self.pool.get_nworkers() == 0
+            self.pool = None
 
     def restart_pool(self):
         self.stop_pool()
         self.start_pool()
 
     def check(self, instance):
-        if not self.pool_started:
+        if not self.pool:
             self.start_pool()
-        if threading.activeCount() > MAX_ALLOWED_THREADS:
-            exception = "Thread number ({0}) exceeds maximum ({1}). Skipping this check.".format(threading.activeCount(),
-                                                                                                 MAX_ALLOWED_THREADS)
-            if self.pool_size >= MAX_ALLOWED_THREADS:
-                exception += " threads_count is set too high in the {0} plugin config.".format(self.name)
-            else:
-                exception += " Another plugin may have threads_count set too high."
-            raise Exception(exception)
         self._process_results()
-        self._clean()
         name = instance.get('name', None)
         if name is None:
             self.log.error('Each service check must have a name')
             return
 
-        if name not in self.jobs_status:
+        if name not in self.running_jobs:
             # A given instance should be processed one at a time
-            self.jobs_status[name] = time.time()
+            self.running_jobs.add(name)
             self.pool.apply_async(self._process, args=(instance,))
         else:
             self.log.info("Instance: %s skipped because it's already running." % name)
 
     def _process(self, instance):
         name = instance.get('name', None)
 
         try:
-            return_value = self._check(instance)
+            with Timeout(self.timeout):
+                return_value = self._check(instance)
             if not return_value:
-                del self.jobs_status[name]
                 return
             status, msg = return_value
             result = (status, msg, name, instance)
             # We put the results in the result queue
             self.resultsq.put(result)
+        except Timeout:
+            self.log.error('ServiceCheck {0} timed out'.format(name))
         except Exception:
             self.log.exception('Failure in ServiceCheck {0}'.format(name))
             result = (FAILURE, FAILURE, FAILURE, FAILURE)
             self.resultsq.put(result)
+        finally:
+            self.running_jobs.remove(name)
 
     def _process_results(self):
         for i in range(MAX_LOOP_ITERATIONS):
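The combination the new code adopts is worth seeing in isolation: monkey.patch_all() turns blocking calls such as time.sleep into cooperative ones, so gevent.Timeout can interrupt a check that runs too long inside a multiprocessing.dummy worker. A standalone sketch (names here are illustrative, not agent code):

    from gevent import monkey
    monkey.patch_all()  # must run before the patched modules are used

    from gevent import Timeout
    from multiprocessing.dummy import Pool as ThreadPool
    import time

    def bounded_check(seconds):
        try:
            with Timeout(2):         # raises Timeout in this worker after 2s
                time.sleep(seconds)  # patched sleep yields, so Timeout can fire
                return "finished in %ss" % seconds
        except Timeout:
            return "timed out after 2s"

    pool = ThreadPool(2)
    print(pool.map(bounded_check, [1, 5]))  # ['finished in 1s', 'timed out after 2s']
    pool.close()
    pool.join()

This is also why the _clean()/restart_pool() watchdog could go: each job now bounds itself, instead of the pool being torn down when a check gets stuck.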
@@ -171,18 +164,8 @@ class ServicesCheck(monasca_agent.collector.checks.AgentCheck):
             if event is not None:
                 self.events.append(event)
 
-            # The job is finished here, this instance can be re processed
-            del self.jobs_status[name]
-
     def _check(self, instance):
         """This function should be implemented by inherited classes.

         """
         raise NotImplementedError
-
-    def _clean(self):
-        now = time.time()
-        for name, start_time in self.jobs_status.items():
-            if now - start_time > self.timeout:
-                self.log.critical("Restarting Pool. One check is stuck.")
-                self.restart_pool()
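For reference, a sketch of what a concrete services check supplies (a hypothetical plugin, not part of this commit): _check() returns a (status, message) tuple, which _process() wraps with the instance name and puts on resultsq for the next collection cycle.

    import socket

    class PortCheck(ServicesCheck):
        """Hypothetical plugin: reports UP if a TCP connect succeeds."""

        def _check(self, instance):
            host = instance.get('host', 'localhost')
            port = int(instance.get('port', 80))
            try:
                sock = socket.create_connection((host, port), timeout=5)
                sock.close()
                return Status.UP, "%s:%d accepted a connection" % (host, port)
            except socket.error:
                return Status.DOWN, "%s:%d refused the connection" % (host, port)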
@@ -5,6 +5,7 @@ oslo.utils
 oslo.vmware
 
 PyYAML
+gevent
 gearman>=2.0.2,<2.1
 httplib2
 netaddr