Browse Source

Document the threadgroup module

There were no docstrings for this API, which as a bonus is extremely
subtle and poorly-designed. Document it so that users at least have a
fighting chance of using it to do the things they intended.

Change-Id: Id6677a7e5aaf8ec9ddeb597c6d5e8f97806e2248
Zane Bitter 4 months ago
parent
commit
55f897c613
1 changed files with 201 additions and 5 deletions
  1. 201
    5
      oslo_service/threadgroup.py

+ 201
- 5
oslo_service/threadgroup.py View File

@@ -52,32 +52,75 @@ class Thread(object):
52 52
         return self._ident
53 53
 
54 54
     def stop(self):
55
+        """Kill the thread by raising GreenletExit within it."""
55 56
         self.thread.kill()
56 57
 
57 58
     def wait(self):
59
+        """Block until the thread completes and return the result."""
58 60
         return self.thread.wait()
59 61
 
60 62
     def link(self, func, *args, **kwargs):
63
+        """Schedule a function to be run upon completion of the thread."""
61 64
         self.thread.link(func, *args, **kwargs)
62 65
 
63 66
     def cancel(self, *throw_args):
67
+        """Prevent the thread from starting if it has not already done so.
68
+
69
+        :param throw_args: the `exc_info` data to raise from :func:`wait`.
70
+        """
64 71
         self.thread.cancel(*throw_args)
65 72
 
66 73
 
67 74
 class ThreadGroup(object):
68
-    """The point of the ThreadGroup class is to:
75
+    """A group of greenthreads and timers.
76
+
77
+    The point of the ThreadGroup class is to:
69 78
 
70 79
     * keep track of timers and greenthreads (making it easier to stop them
71 80
       when need be).
72 81
     * provide an easy API to add timers.
82
+
83
+    .. note::
84
+        The API is inconsistent, confusing, and not orthogonal. The same verbs
85
+        often mean different things when applied to timers and threads,
86
+        respectively. Read the documentation carefully.
73 87
     """
88
+
74 89
     def __init__(self, thread_pool_size=10):
90
+        """Create a ThreadGroup with a pool of greenthreads.
91
+
92
+        :param thread_pool_size: the maximum number of threads allowed to run
93
+                                 concurrently.
94
+        """
75 95
         self.pool = greenpool.GreenPool(thread_pool_size)
76 96
         self.threads = []
77 97
         self.timers = []
78 98
 
79 99
     def add_dynamic_timer(self, callback, initial_delay=None,
80 100
                           periodic_interval_max=None, *args, **kwargs):
101
+        """Add a timer that controls its own period dynamically.
102
+
103
+        The period of each iteration of the timer is controlled by the return
104
+        value of the callback function on the previous iteration.
105
+
106
+        .. warning::
107
+            Passing arguments to the callback function is deprecated. Use the
108
+            :func:`add_dynamic_timer_args` method to pass arguments for the
109
+            callback function.
110
+
111
+        :param callback: The callback function to run when the timer is
112
+                         triggered.
113
+        :param initial_delay: The delay in seconds before first triggering the
114
+                              timer. If not set, the timer is liable to be
115
+                              scheduled immediately.
116
+        :param periodic_interval_max: The maximum interval in seconds to allow
117
+                                      the callback function to request. If
118
+                                      provided, this is also used as the
119
+                                      default delay if None is returned by the
120
+                                      callback function.
121
+        :returns: an :class:`oslo_service.loopingcall.DynamicLoopingCall`
122
+                  instance
123
+        """
81 124
         if args or kwargs:
82 125
             warnings.warn("Calling add_dynamic_timer() with arguments to the "
83 126
                           "callback function is deprecated. Use "
@@ -91,6 +134,29 @@ class ThreadGroup(object):
91 134
     def add_dynamic_timer_args(self, callback, args=None, kwargs=None,
92 135
                                initial_delay=None, periodic_interval_max=None,
93 136
                                stop_on_exception=True):
137
+        """Add a timer that controls its own period dynamically.
138
+
139
+        The period of each iteration of the timer is controlled by the return
140
+        value of the callback function on the previous iteration.
141
+
142
+        :param callback: The callback function to run when the timer is
143
+                         triggered.
144
+        :param args: A list of positional args to the callback function.
145
+        :param kwargs: A dict of keyword args to the callback function.
146
+        :param initial_delay: The delay in seconds before first triggering the
147
+                              timer. If not set, the timer is liable to be
148
+                              scheduled immediately.
149
+        :param periodic_interval_max: The maximum interval in seconds to allow
150
+                                      the callback function to request. If
151
+                                      provided, this is also used as the
152
+                                      default delay if None is returned by the
153
+                                      callback function.
154
+        :param stop_on_exception: Pass ``False`` to have the timer continue
155
+                                  running even if the callback function raises
156
+                                  an exception.
157
+        :returns: an :class:`oslo_service.loopingcall.DynamicLoopingCall`
158
+                  instance
159
+        """
94 160
         args = args or []
95 161
         kwargs = kwargs or {}
96 162
         timer = loopingcall.DynamicLoopingCall(callback, *args, **kwargs)
@@ -102,6 +168,23 @@ class ThreadGroup(object):
102 168
 
103 169
     def add_timer(self, interval, callback, initial_delay=None,
104 170
                   *args, **kwargs):
171
+        """Add a timer with a fixed period.
172
+
173
+        .. warning::
174
+            Passing arguments to the callback function is deprecated. Use the
175
+            :func:`add_timer_args` method to pass arguments for the callback
176
+            function.
177
+
178
+        :param interval: The minimum period in seconds between calls to the
179
+                         callback function.
180
+        :param callback: The callback function to run when the timer is
181
+                         triggered.
182
+        :param initial_delay: The delay in seconds before first triggering the
183
+                              timer. If not set, the timer is liable to be
184
+                              scheduled immediately.
185
+        :returns: an :class:`oslo_service.loopingcall.FixedIntervalLoopingCall`
186
+                  instance
187
+        """
105 188
         if args or kwargs:
106 189
             warnings.warn("Calling add_timer() with arguments to the callback "
107 190
                           "function is deprecated. Use add_timer_args() "
@@ -112,6 +195,23 @@ class ThreadGroup(object):
112 195
 
113 196
     def add_timer_args(self, interval, callback, args=None, kwargs=None,
114 197
                        initial_delay=None, stop_on_exception=True):
198
+        """Add a timer with a fixed period.
199
+
200
+        :param interval: The minimum period in seconds between calls to the
201
+                         callback function.
202
+        :param callback: The callback function to run when the timer is
203
+                         triggered.
204
+        :param args: A list of positional args to the callback function.
205
+        :param kwargs: A dict of keyword args to the callback function.
206
+        :param initial_delay: The delay in seconds before first triggering the
207
+                              timer. If not set, the timer is liable to be
208
+                              scheduled immediately.
209
+        :param stop_on_exception: Pass ``False`` to have the timer continue
210
+                                  running even if the callback function raises
211
+                                  an exception.
212
+        :returns: an :class:`oslo_service.loopingcall.FixedIntervalLoopingCall`
213
+                  instance
214
+        """
115 215
         args = args or []
116 216
         kwargs = kwargs or {}
117 217
         pulse = loopingcall.FixedIntervalLoopingCall(callback, *args, **kwargs)
@@ -122,6 +222,17 @@ class ThreadGroup(object):
122 222
         return pulse
123 223
 
124 224
     def add_thread(self, callback, *args, **kwargs):
225
+        """Spawn a new thread.
226
+
227
+        This call will block until capacity is available in the thread pool.
228
+        After that, it returns immediately (i.e. *before* the new thread is
229
+        scheduled).
230
+
231
+        :param callback: the function to run in the new thread.
232
+        :param args: positional arguments to the callback function.
233
+        :param kwargs: keyword arguments to the callback function.
234
+        :returns: a :class:`Thread` object
235
+        """
125 236
         gt = self.pool.spawn(callback, *args, **kwargs)
126 237
         th = Thread(gt, self, link=False)
127 238
         self.threads.append(th)
@@ -129,9 +240,19 @@ class ThreadGroup(object):
129 240
         return th
130 241
 
131 242
     def thread_done(self, thread):
243
+        """Remove a completed thread from the group.
244
+
245
+        This method is automatically called on completion of a thread in the
246
+        group, and should not be called explicitly.
247
+        """
132 248
         self.threads.remove(thread)
133 249
 
134 250
     def timer_done(self, timer):
251
+        """Remove a timer from the group.
252
+
253
+        :param timer: The timer object returned from :func:`add_timer` or its
254
+                      analogues.
255
+        """
135 256
         self.timers.remove(timer)
136 257
 
137 258
     def _perform_action_on_threads(self, action_func, on_error_func):
@@ -156,6 +277,18 @@ class ThreadGroup(object):
156 277
             lambda x: LOG.exception('Error stopping thread.'))
157 278
 
158 279
     def stop_timers(self, wait=False):
280
+        """Stop all timers in the group and remove them from the group.
281
+
282
+        No new invocations of timers will be triggered after they are stopped,
283
+        but calls that are in progress will not be interrupted.
284
+
285
+        To wait for in-progress calls to complete, pass ``wait=True`` - calling
286
+        :func:`wait` will not have the desired effect as the timers will have
287
+        already been removed from the group.
288
+
289
+        :param wait: If true, block until all timers have been stopped before
290
+                     returning.
291
+        """
159 292
         for timer in self.timers:
160 293
             timer.stop()
161 294
         if wait:
@@ -163,11 +296,25 @@ class ThreadGroup(object):
163 296
         self.timers = []
164 297
 
165 298
     def stop(self, graceful=False):
166
-        """stop function has the option of graceful=True/False.
299
+        """Stop all timers and threads in the group.
300
+
301
+        No new invocations of timers will be triggered after they are stopped,
302
+        but calls that are in progress will not be interrupted.
303
+
304
+        If ``graceful`` is false, kill all threads immediately by raising
305
+        GreenletExit. Note that in this case, this method will **not** block
306
+        until all threads and running timer callbacks have actually exited. To
307
+        guarantee that all threads have exited, call :func:`wait`.
167 308
 
168
-        * In case of graceful=True, wait for all threads to be finished.
169
-          Never kill threads.
170
-        * In case of graceful=False, kill threads immediately.
309
+        If ``graceful`` is true, do not kill threads. Block until all threads
310
+        and running timer callbacks have completed. This is equivalent to
311
+        calling :func:`stop_timers` with ``wait=True`` followed by
312
+        :func:`wait`.
313
+
314
+        :param graceful: If true, block until all timers have stopped and all
315
+                         threads completed; never kill threads. Otherwise,
316
+                         kill threads immediately and return immediately even
317
+                         if there are timer callbacks still running.
171 318
         """
172 319
         self.stop_timers(wait=graceful)
173 320
         if graceful:
@@ -195,6 +342,25 @@ class ThreadGroup(object):
195 342
             lambda x: LOG.exception('Error waiting on thread.'))
196 343
 
197 344
     def wait(self):
345
+        """Block until all timers and threads in the group are complete.
346
+
347
+        .. note::
348
+            Before calling this method, any timers should be stopped first by
349
+            calling :func:`stop_timers`, :func:`stop`, or :func:`cancel` with a
350
+            ``timeout`` argument. Otherwise this will block forever.
351
+
352
+        .. note::
353
+            Calling :func:`stop_timers` removes the timers from the group, so a
354
+            subsequent call to this method will not wait for any in-progress
355
+            timer calls to complete.
356
+
357
+        Any exceptions raised by the threads will be logged but suppressed.
358
+
359
+        .. note::
360
+            This call guarantees only that the threads themselves have
361
+            completed, **not** that any cleanup functions added via
362
+            :func:`Thread.link` have completed.
363
+        """
198 364
         self._wait_timers()
199 365
         self._wait_threads()
200 366
 
@@ -209,6 +375,36 @@ class ThreadGroup(object):
209 375
         return False
210 376
 
211 377
     def cancel(self, *throw_args, **kwargs):
378
+        """Cancel unstarted threads in the group, and optionally stop the rest.
379
+
380
+        If called without the ``timeout`` argument, this does **not** stop any
381
+        running threads, but prevents any threads in the group that have not
382
+        yet started from running, then returns immediately. Timers are not
383
+        affected.
384
+
385
+        If the 'timeout' argument is supplied, then it serves as a grace period
386
+        to allow running threads to finish. After the timeout, any threads in
387
+        the group that are still running will be killed by raising GreenletExit
388
+        in them, and all timers will be stopped (so that they are not
389
+        retriggered - timer calls that are in progress will not be
390
+        interrupted). This method will **not** block until all threads have
391
+        actually exited, nor that all in-progress timer calls have completed.
392
+        To guarantee that all threads have exited, call :func:`wait`. If all
393
+        threads complete before the timeout expires, timers will be left
394
+        running; there is no way to then stop those timers, so for consistent
395
+        behaviour :func`stop_timers` should be called before calling this
396
+        method.
397
+
398
+        :param throw_args: the `exc_info` data to raise from
399
+                           :func:`Thread.wait` for any of the unstarted
400
+                           threads. (Though note that :func:`ThreadGroup.wait`
401
+                           suppresses exceptions.)
402
+        :param timeout: time to wait for running threads to complete before
403
+                        calling stop(). If not supplied, threads that are
404
+                        already running continue to completion.
405
+        :param wait_time: length of time in seconds to sleep between checks of
406
+                          whether any threads are still alive. (Default 1s.)
407
+        """
212 408
         self._perform_action_on_threads(
213 409
             lambda x: x.cancel(*throw_args),
214 410
             lambda x: LOG.exception('Error canceling thread.'))

Loading…
Cancel
Save