Fix multi-thread issues

Add exception handling for multi-threads and main thread.  Change
default mode from "parallel" to "sequential".

Change-Id: I91c85f00c4e8e1a90e46cd41e344d5e246ac85e4
This commit is contained in:
Changbin Liu 2013-05-24 16:04:43 -04:00
parent ede087ff4b
commit ed4e34d2f3
1 changed files with 73 additions and 39 deletions

View File

@ -24,7 +24,9 @@ developer/user
"""
import getopt
import functools
import os
import Queue
import sys
import threading
import time
@ -47,7 +49,7 @@ class Orchestrator(object):
num_workers,
chef_repo,
chef_repo_branch,
sequential,
parallel,
user='ubuntu',
image='befe3cc7-c448-43bf-8a24-b05ba2247835',
flavor=4,
@ -64,8 +66,8 @@ class Orchestrator(object):
@param num_workers: how many worker nodes you'd like
@param chef_repo: chef repository location
@param chef_repo_branch: which branch to use in repo
@param sequential: whether run threads in sequential or parallel
(for accelerating)
@param parallel: whether run functions in parallel (via threads, for
accelerating) or sequential
@param user: username (with root permission) for all servers
@param image: default u1204-130529-gvc
@param flavor: default large
@ -92,7 +94,7 @@ class Orchestrator(object):
self.num_workers = num_workers
self.chef_repo = chef_repo
self.chef_repo_branch = chef_repo_branch
self.sequential = sequential
self.parallel = parallel
self.user = user
self.image = image
self.flavor = flavor
@ -293,7 +295,7 @@ class Orchestrator(object):
check-in all VMs into chefserver (knife bootstrap), and set their
environment to be self.prefix
"""
threads = []
funcs = []
ipaddrs = ([self._chefserver_ip, self._gateway_ip,
self._controller_ip] + self._worker_ips)
hostnames = ([self._chefserver_name, self._gateway_name,
@ -302,12 +304,10 @@ class Orchestrator(object):
uri = self.user + '@' + self._chefserver_ip
command = ('/usr/bin/knife bootstrap %s -x %s -N %s -E %s --sudo'
% (ipaddr, self.user, hostname, self.prefix))
thread = threading.Thread(target=cmd.ssh,
args=(uri, command),
kwargs={"screen_output": True,
"agent_forwarding": True})
threads.append(thread)
self._run_threads(threads)
func = functools.partial(cmd.ssh, uri, command, screen_output=True,
agent_forwarding=True)
funcs.append(func)
self._execute_funcs(funcs)
# run an empty list to make sure attributes are properly propagated
self._run_chef_client(ipaddrs)
@ -344,17 +344,15 @@ class Orchestrator(object):
@param hostnames: hostnames of specified servers
@param item: name of the item (e.g., recipe, role, etc)
"""
threads = []
funcs = []
for hostname in hostnames:
uri = self.user + '@' + self._chefserver_ip
command = "/usr/bin/knife node run_list add %s %s" % (
hostname, item)
thread = threading.Thread(target=cmd.ssh,
args=(uri, command),
kwargs={"screen_output": True,
"agent_forwarding": True})
threads.append(thread)
self._run_threads(threads)
func = functools.partial(cmd.ssh, uri, command, screen_output=True,
agent_forwarding=True)
funcs.append(func)
self._execute_funcs(funcs)
def _run_chef_client(self, ipaddrs):
"""
@ -363,32 +361,44 @@ class Orchestrator(object):
@param param: ip addresses of the servers
"""
threads = []
funcs = []
for ipaddr in ipaddrs:
uri = self.user + '@' + ipaddr
command = "sudo chef-client"
thread = threading.Thread(target=cmd.ssh,
args=(uri, command),
kwargs={"screen_output": True,
"agent_forwarding": True})
threads.append(thread)
self._run_threads(threads)
func = functools.partial(cmd.ssh, uri, command, screen_output=True,
agent_forwarding=True)
funcs.append(func)
self._execute_funcs(funcs)
def _run_threads(self, threads):
def _execute_funcs(self, funcs):
"""
run threads, whether in a sequential or parallel way
Execute functions, whether in parallel (via threads) or sequential.
If parallel, exceptions of subthreads will be collected in a queue,
and an exception will raised in main thread later
@param threads: the threads to be run
@param funcs: the functions to be executed
"""
if self.sequential:
for thread in threads:
thread.start()
thread.join()
if not self.parallel:
for func in funcs:
func()
else:
for thread in threads:
exception_queue = Queue.Queue()
threads = []
# create and start all threads
for func in funcs:
thread = FuncThread(func, exception_queue)
threads.append(thread)
thread.start()
# wait for all threads to finish
for thread in threads:
thread.join()
# check whether got exception in threads
got_exception = not exception_queue.empty()
while not exception_queue.empty():
thread_name, func_info, exc = exception_queue.get()
print thread_name, func_info, exc
if got_exception:
raise RuntimeError("One or more subthreads got exception")
def _setup_controller(self):
"""
@ -446,6 +456,30 @@ class Orchestrator(object):
print "Inception cloud '%s' has been cleaned up." % self.prefix
class FuncThread(threading.Thread):
"""
thread of calling a partial function, based on the regular thread by adding
a exception queue
"""
def __init__(self, func, exception_queue):
threading.Thread.__init__(self)
self._func = func
self._exception_queue = exception_queue
def run(self):
"""
Call the function, and put exception in queue if any
"""
try:
self._func()
except Exception:
func_info = (str(self._func.func) + " " + str(self._func.args) +
" " + str(self._func.keywords))
info = (self.name, func_info, traceback.format_exc())
print info
self._exception_queue.put(info)
def main():
"""
program starting point
@ -455,10 +489,10 @@ def main():
cleanup = False
chef_repo = "git://github.com/maoy/inception-chef-repo.git"
chef_repo_branch = "master"
sequential = False
parallel = False
try:
optlist, _ = getopt.getopt(sys.argv[1:], 'p:n:',
["shell", "atomic", "cleanup", "sequential",
["shell", "atomic", "cleanup", "parallel",
"chef-repo=", "chef-repo-branch="])
optdict = dict(optlist)
prefix = optdict['-p']
@ -473,14 +507,14 @@ def main():
chef_repo = optdict["--chef-repo"]
if "--chef-repo-branch" in optdict:
chef_repo_branch = optdict["--chef-repo-branch"]
if "--sequential" in optdict:
sequential = True
if "--parallel" in optdict:
parallel = True
except Exception:
print traceback.format_exc()
usage()
sys.exit(1)
orchestrator = Orchestrator(prefix, num_workers, chef_repo,
chef_repo_branch, sequential)
chef_repo_branch, parallel)
if shell:
# give me a ipython shell
IPython.embed()
@ -494,7 +528,7 @@ def main():
def usage():
print """
python %s -p <prefix> -n <num_workers> [--shell] [--atomic] [--cleanup]
[--sequential] [--chef-repo=git://github.com/maoy/inception-chef-repo.git]
[--parallel] [--chef-repo=git://github.com/maoy/inception-chef-repo.git]
[--chef-repo-branch=master]
Note: make sure OpenStack-related environment variables are defined.