Move the workers running code to its own async.py file

Change-Id: I5b0bdbb688e30a74a6c9230e720099461328960c
This commit is contained in:
Joshua Harlow 2015-07-07 23:33:02 -07:00
parent 5193dc0cf5
commit 49ac3755d9
2 changed files with 113 additions and 35 deletions

97
anvil/async.py Normal file
View File

@ -0,0 +1,97 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
import threading
from concurrent import futures
import six
from six.moves import queue as compat_queue
from six.moves import range as compat_range
from anvil import log as logging
LOG = logging.getLogger(__name__)
_TOMBSTONE = object()
def _chained_worker(ident, shared_death, queue, futs):
running = True
while running:
if shared_death.is_set():
LOG.warn("Worker %s dying unhappily...", ident)
running = False
else:
w = queue.get()
if w is _TOMBSTONE:
queue.put(w)
LOG.info("Worker %s dying happily...", ident)
running = False
else:
func, fut = w
if fut.set_running_or_notify_cancel():
try:
result = func()
except BaseException:
LOG.exception("Worker %s dying unhappily...", ident)
exc_type, exc_val, exc_tb = sys.exc_info()
if six.PY2:
fut.set_exception_info(exc_val, exc_tb)
else:
fut.set_exception(exc_val)
# Stop all other workers from doing any more work...
shared_death.set()
for fut in futs:
fut.cancel()
running = False
else:
fut.set_result(result)
class ChainedWorkerExecutor(object):
def __init__(self, max_workers):
self._workers = []
self._max_workers = int(max_workers)
self._queue = compat_queue.Queue()
self._death = threading.Event()
def run(self, funcs):
if self._workers:
raise RuntimeError("Can not start another `run` with %s"
" existing workers" % (len(self._workers)))
self._queue = compat_queue.Queue()
self._death.clear()
futs = []
for i in compat_range(0, self._max_workers):
w = threading.Thread(target=_chained_worker,
args=(i + 1, self._death,
self._queue, futs))
w.daemon = True
w.start()
self._workers.append(w)
for func in funcs:
fut = futures.Future()
futs.append(fut)
self._queue.put((func, fut))
return futs
def wait(self):
self._queue.put(_TOMBSTONE)
while self._workers:
w = self._workers.pop()
w.join()

View File

@ -21,13 +21,10 @@ import os
import re
import sys
import tarfile
import threading
from concurrent import futures
import six
from six.moves import queue as compat_queue
from anvil import async
from anvil import colorizer
from anvil import env
from anvil import exceptions as excp
@ -199,40 +196,24 @@ class VenvDependencyHandler(base.DependencyHandler):
len(instances), self.jobs)
results = [None] * len(instances)
if self.jobs >= 1:
workers = []
futs = []
queue = compat_queue.Queue()
executor = async.ChainedWorkerExecutor(self.jobs)
retryable_exceptions = [
excp.ProcessExecutionError,
]
try:
shared_death = threading.Event()
for instance in instances:
fut = futures.Future()
func = functools.partial(utils.retry,
self._RETRIES, self._RETRY_DELAY,
self._package_instance,
instance,
retryable_exceptions=retryable_exceptions)
queue.put((func, fut))
futs.append(fut)
for i in range(0, self.jobs):
w = threading.Thread(target=_worker,
args=(i + 1, shared_death,
queue, futs))
w.daemon = True
w.start()
workers.append(w)
finally:
queue.put(TOMBSTONE)
while workers:
w = workers.pop()
w.join()
for fut in futs:
if fut.cancelled():
continue
if fut.done():
fut.result()
run_funcs = []
for instance in instances:
func = functools.partial(utils.retry,
self._RETRIES, self._RETRY_DELAY,
self._package_instance, instance,
retryable_exceptions=retryable_exceptions)
run_funcs.append(func)
futs = executor.run(run_funcs)
executor.wait()
for fut in futs:
if fut.cancelled():
continue
if fut.done():
fut.result()
else:
for instance in instances:
self.package_instance(instance)