Re-order executor/job classes
There should be no code changes in this commit, only a re-ordering of the classes involved to reduce the diff in a subsequent change. Change-Id: I0fc4287b6fadfaea02250bc5c1f57eba7e65f450
This commit is contained in:
parent
ded241e598
commit
107bb255f1
114
tests/base.py
114
tests/base.py
|
@ -1361,6 +1361,63 @@ class FakeBuild(object):
|
|||
return repos
|
||||
|
||||
|
||||
class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):
    """AnsibleJob variant that records build results on its executor server.

    Finished builds are appended to ``executor_server.build_history`` and
    removed from ``executor_server.running_builds`` / ``job_builds``.  When
    ``executor_server._run_ansible`` is false, Ansible itself is not run.
    """

    def doMergeChanges(self, merger, items, repo_state):
        """Merge via the parent class; record MERGER_FAILURE on a falsy result.

        Returns whatever the parent implementation returned.
        """
        # Get a merger in order to update the repos involved in this job.
        commit = super(RecordingAnsibleJob, self).doMergeChanges(
            merger, items, repo_state)
        if not commit:  # merge conflict
            self.recordResult('MERGER_FAILURE')
        return commit

    def recordResult(self, result):
        """Append a BuildHistory entry for this job's build and forget it.

        The build is looked up in ``executor_server.job_builds`` under
        ``self.job.unique``; after recording it is removed from both
        ``running_builds`` and ``job_builds``.
        """
        build = self.executor_server.job_builds[self.job.unique]
        self.executor_server.lock.acquire()
        try:
            self.executor_server.build_history.append(
                BuildHistory(name=build.name, result=result,
                             changes=build.changes,
                             node=build.node, uuid=build.unique,
                             ref=build.parameters['zuul']['ref'],
                             parameters=build.parameters,
                             jobdir=build.jobdir,
                             pipeline=build.parameters['zuul']['pipeline'])
            )
            self.executor_server.running_builds.remove(build)
            del self.executor_server.job_builds[self.job.unique]
        finally:
            # Release even if recording fails, so that other users of the
            # lock are not blocked forever by one bad build.
            self.executor_server.lock.release()

    def runPlaybooks(self, args):
        """Run the playbooks via the parent class and record the result."""
        build = self.executor_server.job_builds[self.job.unique]
        # Expose the job directory on the build before anything runs.
        build.jobdir = self.jobdir

        result = super(RecordingAnsibleJob, self).runPlaybooks(args)
        self.recordResult(result)
        return result

    def runAnsible(self, cmd, timeout, playbook):
        """Run Ansible for real, or substitute the build's own result.

        With ``executor_server._run_ansible`` false, a playbook that has a
        path yields ``build.run()``; one without a path yields
        ``(self.RESULT_NORMAL, 0)``.
        """
        build = self.executor_server.job_builds[self.job.unique]

        if self.executor_server._run_ansible:
            result = super(RecordingAnsibleJob, self).runAnsible(
                cmd, timeout, playbook)
        else:
            if playbook.path:
                result = build.run()
            else:
                result = (self.RESULT_NORMAL, 0)
        return result

    def getHostList(self, args):
        """Return the parent's host list, rewritten to use local connections.

        Every host gets ``ansible_connection='local'`` and an extra
        ``localhost`` entry is appended.
        """
        self.log.debug("hostlist")
        hosts = super(RecordingAnsibleJob, self).getHostList(args)
        for host in hosts:
            host['host_vars']['ansible_connection'] = 'local'

        hosts.append(dict(
            name='localhost',
            host_vars=dict(ansible_connection='local'),
            host_keys=[]))
        return hosts
|
||||
|
||||
|
||||
class RecordingExecutorServer(zuul.executor.server.ExecutorServer):
|
||||
"""An Ansible executor to be used in tests.
|
||||
|
||||
|
@ -1445,63 +1502,6 @@ class RecordingExecutorServer(zuul.executor.server.ExecutorServer):
|
|||
super(RecordingExecutorServer, self).stop()
|
||||
|
||||
|
||||
class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):
    """AnsibleJob variant that records build results on its executor server.

    Finished builds are appended to ``executor_server.build_history`` and
    removed from ``executor_server.running_builds`` / ``job_builds``.  When
    ``executor_server._run_ansible`` is false, Ansible itself is not run.
    """

    def doMergeChanges(self, merger, items, repo_state):
        """Merge via the parent class; record MERGER_FAILURE on a falsy result.

        Returns whatever the parent implementation returned.
        """
        # Get a merger in order to update the repos involved in this job.
        commit = super(RecordingAnsibleJob, self).doMergeChanges(
            merger, items, repo_state)
        if not commit:  # merge conflict
            self.recordResult('MERGER_FAILURE')
        return commit

    def recordResult(self, result):
        """Append a BuildHistory entry for this job's build and forget it.

        The build is looked up in ``executor_server.job_builds`` under
        ``self.job.unique``; after recording it is removed from both
        ``running_builds`` and ``job_builds``.
        """
        build = self.executor_server.job_builds[self.job.unique]
        self.executor_server.lock.acquire()
        try:
            self.executor_server.build_history.append(
                BuildHistory(name=build.name, result=result,
                             changes=build.changes,
                             node=build.node, uuid=build.unique,
                             ref=build.parameters['zuul']['ref'],
                             parameters=build.parameters,
                             jobdir=build.jobdir,
                             pipeline=build.parameters['zuul']['pipeline'])
            )
            self.executor_server.running_builds.remove(build)
            del self.executor_server.job_builds[self.job.unique]
        finally:
            # Release even if recording fails, so that other users of the
            # lock are not blocked forever by one bad build.
            self.executor_server.lock.release()

    def runPlaybooks(self, args):
        """Run the playbooks via the parent class and record the result."""
        build = self.executor_server.job_builds[self.job.unique]
        # Expose the job directory on the build before anything runs.
        build.jobdir = self.jobdir

        result = super(RecordingAnsibleJob, self).runPlaybooks(args)
        self.recordResult(result)
        return result

    def runAnsible(self, cmd, timeout, playbook):
        """Run Ansible for real, or substitute the build's own result.

        With ``executor_server._run_ansible`` false, a playbook that has a
        path yields ``build.run()``; one without a path yields
        ``(self.RESULT_NORMAL, 0)``.
        """
        build = self.executor_server.job_builds[self.job.unique]

        if self.executor_server._run_ansible:
            result = super(RecordingAnsibleJob, self).runAnsible(
                cmd, timeout, playbook)
        else:
            if playbook.path:
                result = build.run()
            else:
                result = (self.RESULT_NORMAL, 0)
        return result

    def getHostList(self, args):
        """Return the parent's host list, rewritten to use local connections.

        Every host gets ``ansible_connection='local'`` and an extra
        ``localhost`` entry is appended.
        """
        self.log.debug("hostlist")
        hosts = super(RecordingAnsibleJob, self).getHostList(args)
        for host in hosts:
            host['host_vars']['ansible_connection'] = 'local'

        hosts.append(dict(
            name='localhost',
            host_vars=dict(ansible_connection='local'),
            host_keys=[]))
        return hosts
|
||||
|
||||
|
||||
class FakeGearmanServer(gear.Server):
|
||||
"""A Gearman server for use in tests.
|
||||
|
||||
|
|
|
@ -500,412 +500,6 @@ def make_inventory_dict(nodes, groups, all_vars):
|
|||
return inventory
|
||||
|
||||
|
||||
class ExecutorMergeWorker(gear.TextWorker):
    """Gear text worker bound to an executor server, used for merger jobs."""

    def __init__(self, executor_server, *args, **kw):
        """Remember the owning server, then initialise the gear worker."""
        # Stored before the base-class initialiser runs.
        self.zuul_executor_server = executor_server
        super(ExecutorMergeWorker, self).__init__(*args, **kw)

    def handleNoop(self, packet):
        """Handle a NOOP packet once the server's update queue is empty.

        The base-class handler is invoked while holding the server's
        ``merger_lock``.
        """
        owner = self.zuul_executor_server
        # Wait until the update queue is empty before responding
        while owner.update_queue.qsize():
            time.sleep(1)

        with owner.merger_lock:
            super(ExecutorMergeWorker, self).handleNoop(packet)
|
||||
|
||||
|
||||
class ExecutorExecuteWorker(gear.TextWorker):
    """Gear text worker bound to an executor server, used for execute jobs."""

    def __init__(self, executor_server, *args, **kw):
        """Remember the owning server, then initialise the gear worker."""
        # Stored before the base-class initialiser runs.
        self.zuul_executor_server = executor_server
        super(ExecutorExecuteWorker, self).__init__(*args, **kw)

    def handleNoop(self, packet):
        """Handle a NOOP packet after a delay that grows with running jobs.

        Returns the result of the base-class handler.
        """
        # Delay our response to running a new job based on the number
        # of jobs we're currently running, in an attempt to spread
        # load evenly among executors.
        running = len(self.zuul_executor_server.job_workers)
        # Quadratic back-off: the square of the job count, in milliseconds.
        time.sleep((running ** 2) / 1000.0)
        return super(ExecutorExecuteWorker, self).handleNoop(packet)
|
||||
|
||||
|
||||
class ExecutorServer(object):
    """Gearman-driven executor: runs jobs and answers merger requests.

    One gear worker handles ``executor:execute`` and
    ``executor:stop:<hostname>``; a second handles ``merger:merge``,
    ``merger:cat`` and ``merger:refstate``.  Each execute request becomes an
    AnsibleJob stored in ``job_workers`` under the Gearman job's unique id.
    """

    log = logging.getLogger("zuul.ExecutorServer")

    def __init__(self, config, connections={}, jobdir_root=None,
                 keep_jobdir=False, log_streaming_port=DEFAULT_FINGER_PORT):
        """Read configuration and (re)create the on-disk Ansible plugin tree.

        No threads or Gearman workers are created here; that happens in
        start().  Any existing ``<state_dir>/ansible`` directory is deleted
        and rebuilt.
        """
        # NOTE(review): the ``{}`` default is kept for interface
        # compatibility, but it is not usable as-is: ``connections.drivers``
        # is read below, and a plain dict has no such attribute.
        self.config = config
        self.keep_jobdir = keep_jobdir
        self.jobdir_root = jobdir_root
        # TODOv3(mordred): make the executor name more unique --
        # perhaps hostname+pid.
        self.hostname = socket.gethostname()
        self.log_streaming_port = log_streaming_port
        self.merger_lock = threading.Lock()
        self.verbose = False
        # Maps command-socket commands to the methods that implement them.
        self.command_map = dict(
            stop=self.stop,
            pause=self.pause,
            unpause=self.unpause,
            graceful=self.graceful,
            verbose=self.verboseOn,
            unverbose=self.verboseOff,
            keep=self.keep,
            nokeep=self.nokeep,
        )

        self.merge_root = get_default(self.config, 'executor', 'git_dir',
                                      '/var/lib/zuul/executor-git')
        self.default_username = get_default(self.config, 'executor',
                                            'default_username', 'zuul')
        self.disk_limit_per_job = int(get_default(self.config, 'executor',
                                                  'disk_limit_per_job', 250))
        self.merge_email = get_default(self.config, 'merger', 'git_user_email')
        self.merge_name = get_default(self.config, 'merger', 'git_user_name')
        self.merge_speed_limit = get_default(
            config, 'merger', 'git_http_low_speed_limit', '1000')
        self.merge_speed_time = get_default(
            config, 'merger', 'git_http_low_speed_time', '30')
        execution_wrapper_name = get_default(self.config, 'executor',
                                             'execution_wrapper', 'bubblewrap')
        load_multiplier = float(get_default(self.config, 'executor',
                                            'load_multiplier', '2.5'))
        # Load threshold used by manageLoad(); scales with the CPU count.
        self.max_load_avg = multiprocessing.cpu_count() * load_multiplier
        self.accepting_work = False
        self.execution_wrapper = connections.drivers[execution_wrapper_name]

        self.connections = connections
        # This merger and its git repos are used to maintain
        # up-to-date copies of all the repos that are used by jobs, as
        # well as to support the merger:cat function to supply
        # configuration information to Zuul when it starts.
        self.merger = self._getMerger(self.merge_root)
        self.update_queue = DeduplicateQueue()

        state_dir = get_default(self.config, 'executor', 'state_dir',
                                '/var/lib/zuul', expand_user=True)
        path = os.path.join(state_dir, 'executor.socket')
        self.command_socket = commandsocket.CommandSocket(path)
        ansible_dir = os.path.join(state_dir, 'ansible')
        self.ansible_dir = ansible_dir
        if os.path.exists(ansible_dir):
            shutil.rmtree(ansible_dir)

        zuul_dir = os.path.join(ansible_dir, 'zuul')
        plugin_dir = os.path.join(zuul_dir, 'ansible')

        os.makedirs(plugin_dir, mode=0o0755)

        self.library_dir = os.path.join(plugin_dir, 'library')
        self.action_dir = os.path.join(plugin_dir, 'action')
        self.callback_dir = os.path.join(plugin_dir, 'callback')
        self.lookup_dir = os.path.join(plugin_dir, 'lookup')
        self.filter_dir = os.path.join(plugin_dir, 'filter')

        _copy_ansible_files(zuul.ansible, plugin_dir)

        # We're copying zuul.ansible.* into a directory we are going
        # to add to pythonpath, so our plugins can "import
        # zuul.ansible".  But we're not installing all of zuul, so
        # create a __init__.py file for the stub "zuul" module.
        with open(os.path.join(zuul_dir, '__init__.py'), 'w'):
            pass

        self.job_workers = {}
        self.disk_accountant = DiskAccountant(self.jobdir_root,
                                              self.disk_limit_per_job,
                                              self.stopJobDiskFull,
                                              self.merge_root)

    def _getMerger(self, root, logger=None):
        """Build a Merger rooted at *root*.

        ``merge_root`` is passed as the cache root unless *root* is itself
        ``merge_root``, in which case no cache root is given.
        """
        if root != self.merge_root:
            cache_root = self.merge_root
        else:
            cache_root = None
        return zuul.merger.merger.Merger(
            root, self.connections, self.merge_email, self.merge_name,
            self.merge_speed_limit, self.merge_speed_time, cache_root, logger)

    def start(self):
        """Connect both gear workers, register, and start all threads.

        Starts the command, update, merger, executor and governor threads
        (all daemon threads) and the disk accountant.
        """
        self._running = True
        self._command_running = True
        server = self.config.get('gearman', 'server')
        port = get_default(self.config, 'gearman', 'port', 4730)
        ssl_key = get_default(self.config, 'gearman', 'ssl_key')
        ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
        ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
        self.merger_worker = ExecutorMergeWorker(self, 'Zuul Executor Merger')
        self.merger_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
        self.executor_worker = ExecutorExecuteWorker(
            self, 'Zuul Executor Server')
        self.executor_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
        self.log.debug("Waiting for server")
        self.merger_worker.waitForServer()
        self.executor_worker.waitForServer()
        self.log.debug("Registering")
        self.register()

        self.log.debug("Starting command processor")
        self.command_socket.start()
        self.command_thread = threading.Thread(target=self.runCommand)
        self.command_thread.daemon = True
        self.command_thread.start()

        self.log.debug("Starting worker")
        self.update_thread = threading.Thread(target=self._updateLoop)
        self.update_thread.daemon = True
        self.update_thread.start()
        self.merger_thread = threading.Thread(target=self.run_merger)
        self.merger_thread.daemon = True
        self.merger_thread.start()
        self.executor_thread = threading.Thread(target=self.run_executor)
        self.executor_thread.daemon = True
        self.executor_thread.start()
        self.governor_stop_event = threading.Event()
        self.governor_thread = threading.Thread(target=self.run_governor)
        self.governor_thread.daemon = True
        self.governor_thread.start()
        self.disk_accountant.start()

    def register(self):
        """Register every Gearman function this server handles."""
        self.register_work()
        self.executor_worker.registerFunction("executor:stop:%s" %
                                              self.hostname)
        self.merger_worker.registerFunction("merger:merge")
        self.merger_worker.registerFunction("merger:cat")
        self.merger_worker.registerFunction("merger:refstate")

    def register_work(self):
        """Start accepting ``executor:execute`` jobs."""
        self.accepting_work = True
        self.executor_worker.registerFunction("executor:execute")

    def unregister_work(self):
        """Stop accepting ``executor:execute`` jobs."""
        self.accepting_work = False
        self.executor_worker.unregisterFunction("executor:execute")

    def stop(self):
        """Signal all loops to exit, stop job workers, shut down gear workers.

        A failure to stop one job worker is logged and does not prevent the
        remaining workers from being stopped.
        """
        self.log.debug("Stopping")
        self.disk_accountant.stop()
        self.governor_stop_event.set()
        self._running = False
        self._command_running = False
        self.command_socket.stop()
        # A None task wakes the update thread so it can notice _running.
        self.update_queue.put(None)

        # Iterate over a snapshot of the workers rather than the live dict.
        for job_worker in list(self.job_workers.values()):
            try:
                job_worker.stop()
            except Exception:
                self.log.exception("Exception sending stop command "
                                   "to worker:")
        self.merger_worker.shutdown()
        self.executor_worker.shutdown()
        self.log.debug("Stopped")

    def pause(self):
        """Command-socket ``pause`` handler; currently does nothing."""
        # TODOv3: implement
        pass

    def unpause(self):
        """Command-socket ``unpause`` handler; currently does nothing."""
        # TODOv3: implement
        pass

    def graceful(self):
        """Command-socket ``graceful`` handler; currently does nothing."""
        # TODOv3: implement
        pass

    def verboseOn(self):
        """Set the ``verbose`` flag."""
        self.verbose = True

    def verboseOff(self):
        """Clear the ``verbose`` flag."""
        self.verbose = False

    def keep(self):
        """Set the ``keep_jobdir`` flag."""
        self.keep_jobdir = True

    def nokeep(self):
        """Clear the ``keep_jobdir`` flag."""
        self.keep_jobdir = False

    def join(self):
        """Block until the update, merger, executor and governor threads end.

        The command thread is not joined.
        """
        self.update_thread.join()
        self.merger_thread.join()
        self.executor_thread.join()
        self.governor_thread.join()

    def runCommand(self):
        """Command-thread loop: dispatch command-socket input via command_map.

        The ``_stop`` command is ignored here; any exception (including an
        unknown command) is logged and the loop continues.
        """
        while self._command_running:
            try:
                command = self.command_socket.get().decode('utf8')
                if command != '_stop':
                    self.command_map[command]()
            except Exception:
                self.log.exception("Exception while processing command")

    def _updateLoop(self):
        """Update-thread loop: process update tasks until stopped."""
        while self._running:
            try:
                self._innerUpdateLoop()
            except Exception:
                # Catch Exception rather than everything, matching the other
                # loops in this class, so that SystemExit and
                # KeyboardInterrupt are not swallowed and retried.
                self.log.exception("Exception in update thread:")

    def _innerUpdateLoop(self):
        """Take one task from the update queue and update its repository.

        A ``None`` task is the stop signal and is simply dropped.
        """
        # Inside of a loop that keeps the main repositories up to date
        task = self.update_queue.get()
        if task is None:
            # We are asked to stop
            return
        with self.merger_lock:
            self.log.info("Updating repo %s/%s" % (
                task.connection_name, task.project_name))
            self.merger.updateRepo(task.connection_name, task.project_name)
            self.log.debug("Finished updating repo %s/%s" %
                           (task.connection_name, task.project_name))
        task.setComplete()

    def update(self, connection_name, project_name):
        """Queue a repository update and return the queued task.

        The returned task is whatever ``update_queue.put`` hands back, which
        may differ from the task created here.
        """
        # Update a repository in the main merger
        task = UpdateTask(connection_name, project_name)
        task = self.update_queue.put(task)
        return task

    def run_merger(self):
        """Merger-thread loop: fetch and dispatch ``merger:*`` jobs.

        An unrecognised job name is failed; an exception raised while
        handling a job is logged and reported back as a work exception.
        """
        self.log.debug("Starting merger listener")
        while self._running:
            try:
                job = self.merger_worker.getJob()
                try:
                    if job.name == 'merger:cat':
                        self.log.debug("Got cat job: %s" % job.unique)
                        self.cat(job)
                    elif job.name == 'merger:merge':
                        self.log.debug("Got merge job: %s" % job.unique)
                        self.merge(job)
                    elif job.name == 'merger:refstate':
                        self.log.debug("Got refstate job: %s" % job.unique)
                        self.refstate(job)
                    else:
                        self.log.error("Unable to handle job %s" % job.name)
                        job.sendWorkFail()
                except Exception:
                    self.log.exception("Exception while running job")
                    job.sendWorkException(
                        traceback.format_exc().encode('utf8'))
            except gear.InterruptedError:
                pass
            except Exception:
                self.log.exception("Exception while getting job")

    def run_executor(self):
        """Executor-thread loop: fetch and dispatch ``executor:*`` jobs.

        An unrecognised job name is failed; an exception raised while
        handling a job is logged and reported back as a work exception.
        """
        self.log.debug("Starting executor listener")
        while self._running:
            try:
                job = self.executor_worker.getJob()
                try:
                    if job.name == 'executor:execute':
                        self.log.debug("Got execute job: %s" % job.unique)
                        self.executeJob(job)
                    elif job.name.startswith('executor:stop'):
                        self.log.debug("Got stop job: %s" % job.unique)
                        self.stopJob(job)
                    else:
                        self.log.error("Unable to handle job %s" % job.name)
                        job.sendWorkFail()
                except Exception:
                    self.log.exception("Exception while running job")
                    job.sendWorkException(
                        traceback.format_exc().encode('utf8'))
            except gear.InterruptedError:
                pass
            except Exception:
                self.log.exception("Exception while getting job")

    def run_governor(self):
        """Governor-thread loop: call manageLoad() every 30 seconds.

        The loop ends as soon as ``governor_stop_event`` is set.
        """
        while not self.governor_stop_event.wait(30):
            self.manageLoad()

    def executeJob(self, job):
        """Create an AnsibleJob for *job*, track it in job_workers, run it."""
        worker = AnsibleJob(self, job)
        # Register before running so the job can be found by its unique id.
        self.job_workers[job.unique] = worker
        worker.run()

    def manageLoad(self):
        """Apply some heuristics to decide whether or not we should
        be asking for more jobs.

        Compares the one-minute load average with ``max_load_avg`` and
        unregisters or re-registers the execute function accordingly.
        """
        load_avg = os.getloadavg()[0]
        if self.accepting_work:
            # Don't unregister if we don't have any active jobs.
            if load_avg > self.max_load_avg and self.job_workers:
                self.log.info(
                    "Unregistering due to high system load {} > {}".format(
                        load_avg, self.max_load_avg))
                self.unregister_work()
        elif load_avg <= self.max_load_avg:
            self.log.info(
                "Re-registering as load is within limits {} <= {}".format(
                    load_avg, self.max_load_avg))
            self.register_work()

    def finishJob(self, unique):
        """Drop the worker registered under *unique* from job_workers."""
        del self.job_workers[unique]

    def stopJobDiskFull(self, jobdir):
        """Stop the job that owns *jobdir*, giving disk-full as the reason.

        The job's unique id is taken to be the basename of *jobdir*.
        """
        unique = os.path.basename(jobdir)
        self.stopJobByUnique(unique, reason=AnsibleJob.RESULT_DISK_FULL)

    def stopJob(self, job):
        """Handle a stop request; the ``uuid`` argument names the job to stop.

        The Gearman job is always completed, even if stopping fails.
        """
        try:
            args = json.loads(job.arguments)
            self.log.debug("Stop job with arguments: %s" % (args,))
            unique = args['uuid']
            self.stopJobByUnique(unique)
        finally:
            job.sendWorkComplete()

    def stopJobByUnique(self, unique, reason=None):
        """Stop the worker registered under *unique*, if there is one.

        A missing worker is only logged; an exception from the worker's
        ``stop`` is logged and not propagated.
        """
        job_worker = self.job_workers.get(unique)
        if not job_worker:
            self.log.debug("Unable to find worker for job %s" % (unique,))
            return
        try:
            job_worker.stop(reason)
        except Exception:
            self.log.exception("Exception sending stop command "
                               "to worker:")

    def cat(self, job):
        """Handle ``merger:cat``: update the repo, then return file contents.

        Replies with a JSON object holding ``updated`` (always true) and
        ``files``.
        """
        args = json.loads(job.arguments)
        task = self.update(args['connection'], args['project'])
        # Wait for the update thread to finish this repo before reading it.
        task.wait()
        with self.merger_lock:
            files = self.merger.getFiles(args['connection'], args['project'],
                                         args['branch'], args['files'],
                                         args.get('dirs', []))
        result = dict(updated=True,
                      files=files)
        job.sendWorkComplete(json.dumps(result))

    def refstate(self, job):
        """Handle ``merger:refstate``: reply with the repo state of the items.

        Replies with a JSON object holding ``updated`` and ``repo_state``.
        """
        args = json.loads(job.arguments)
        with self.merger_lock:
            success, repo_state = self.merger.getRepoState(args['items'])
        result = dict(updated=success,
                      repo_state=repo_state)
        job.sendWorkComplete(json.dumps(result))

    def merge(self, job):
        """Handle ``merger:merge``: merge the items and reply with the result.

        Replies with a JSON object holding ``merged``, ``commit``, ``files``
        and ``repo_state``; the last three are null when the merge failed.
        """
        args = json.loads(job.arguments)
        with self.merger_lock:
            ret = self.merger.mergeChanges(args['items'], args.get('files'),
                                           args.get('dirs', []),
                                           args.get('repo_state'))
        result = dict(merged=(ret is not None))
        if ret is None:
            result['commit'] = result['files'] = result['repo_state'] = None
        else:
            # The fourth element of the merger's result is not reported.
            (result['commit'], result['files'], result['repo_state'],
             _recent) = ret
        job.sendWorkComplete(json.dumps(result))
|
||||
|
||||
|
||||
class AnsibleJobLogAdapter(logging.LoggerAdapter):
|
||||
def process(self, msg, kwargs):
|
||||
msg, kwargs = super(AnsibleJobLogAdapter, self).process(msg, kwargs)
|
||||
|
@ -1856,3 +1450,409 @@ class AnsibleJob(object):
|
|||
|
||||
self.emitPlaybookBanner(playbook, 'END', phase, result=result)
|
||||
return result, code
|
||||
|
||||
|
||||
class ExecutorMergeWorker(gear.TextWorker):
    """Gear text worker bound to an executor server, used for merger jobs."""

    def __init__(self, executor_server, *args, **kw):
        """Remember the owning server, then initialise the gear worker."""
        # Stored before the base-class initialiser runs.
        self.zuul_executor_server = executor_server
        super(ExecutorMergeWorker, self).__init__(*args, **kw)

    def handleNoop(self, packet):
        """Handle a NOOP packet once the server's update queue is empty.

        The base-class handler is invoked while holding the server's
        ``merger_lock``.
        """
        owner = self.zuul_executor_server
        # Wait until the update queue is empty before responding
        while owner.update_queue.qsize():
            time.sleep(1)

        with owner.merger_lock:
            super(ExecutorMergeWorker, self).handleNoop(packet)
|
||||
|
||||
|
||||
class ExecutorExecuteWorker(gear.TextWorker):
    """Gear text worker bound to an executor server, used for execute jobs."""

    def __init__(self, executor_server, *args, **kw):
        """Remember the owning server, then initialise the gear worker."""
        # Stored before the base-class initialiser runs.
        self.zuul_executor_server = executor_server
        super(ExecutorExecuteWorker, self).__init__(*args, **kw)

    def handleNoop(self, packet):
        """Handle a NOOP packet after a delay that grows with running jobs.

        Returns the result of the base-class handler.
        """
        # Delay our response to running a new job based on the number
        # of jobs we're currently running, in an attempt to spread
        # load evenly among executors.
        running = len(self.zuul_executor_server.job_workers)
        # Quadratic back-off: the square of the job count, in milliseconds.
        time.sleep((running ** 2) / 1000.0)
        return super(ExecutorExecuteWorker, self).handleNoop(packet)
|
||||
|
||||
|
||||
class ExecutorServer(object):
    """Gearman-driven executor: runs jobs and answers merger requests.

    One gear worker handles ``executor:execute`` and
    ``executor:stop:<hostname>``; a second handles ``merger:merge``,
    ``merger:cat`` and ``merger:refstate``.  Each execute request becomes an
    AnsibleJob stored in ``job_workers`` under the Gearman job's unique id.
    """

    log = logging.getLogger("zuul.ExecutorServer")

    def __init__(self, config, connections={}, jobdir_root=None,
                 keep_jobdir=False, log_streaming_port=DEFAULT_FINGER_PORT):
        """Read configuration and (re)create the on-disk Ansible plugin tree.

        No threads or Gearman workers are created here; that happens in
        start().  Any existing ``<state_dir>/ansible`` directory is deleted
        and rebuilt.
        """
        # NOTE(review): the ``{}`` default is kept for interface
        # compatibility, but it is not usable as-is: ``connections.drivers``
        # is read below, and a plain dict has no such attribute.
        self.config = config
        self.keep_jobdir = keep_jobdir
        self.jobdir_root = jobdir_root
        # TODOv3(mordred): make the executor name more unique --
        # perhaps hostname+pid.
        self.hostname = socket.gethostname()
        self.log_streaming_port = log_streaming_port
        self.merger_lock = threading.Lock()
        self.verbose = False
        # Maps command-socket commands to the methods that implement them.
        self.command_map = dict(
            stop=self.stop,
            pause=self.pause,
            unpause=self.unpause,
            graceful=self.graceful,
            verbose=self.verboseOn,
            unverbose=self.verboseOff,
            keep=self.keep,
            nokeep=self.nokeep,
        )

        self.merge_root = get_default(self.config, 'executor', 'git_dir',
                                      '/var/lib/zuul/executor-git')
        self.default_username = get_default(self.config, 'executor',
                                            'default_username', 'zuul')
        self.disk_limit_per_job = int(get_default(self.config, 'executor',
                                                  'disk_limit_per_job', 250))
        self.merge_email = get_default(self.config, 'merger', 'git_user_email')
        self.merge_name = get_default(self.config, 'merger', 'git_user_name')
        self.merge_speed_limit = get_default(
            config, 'merger', 'git_http_low_speed_limit', '1000')
        self.merge_speed_time = get_default(
            config, 'merger', 'git_http_low_speed_time', '30')
        execution_wrapper_name = get_default(self.config, 'executor',
                                             'execution_wrapper', 'bubblewrap')
        load_multiplier = float(get_default(self.config, 'executor',
                                            'load_multiplier', '2.5'))
        # Load threshold used by manageLoad(); scales with the CPU count.
        self.max_load_avg = multiprocessing.cpu_count() * load_multiplier
        self.accepting_work = False
        self.execution_wrapper = connections.drivers[execution_wrapper_name]

        self.connections = connections
        # This merger and its git repos are used to maintain
        # up-to-date copies of all the repos that are used by jobs, as
        # well as to support the merger:cat function to supply
        # configuration information to Zuul when it starts.
        self.merger = self._getMerger(self.merge_root)
        self.update_queue = DeduplicateQueue()

        state_dir = get_default(self.config, 'executor', 'state_dir',
                                '/var/lib/zuul', expand_user=True)
        path = os.path.join(state_dir, 'executor.socket')
        self.command_socket = commandsocket.CommandSocket(path)
        ansible_dir = os.path.join(state_dir, 'ansible')
        self.ansible_dir = ansible_dir
        if os.path.exists(ansible_dir):
            shutil.rmtree(ansible_dir)

        zuul_dir = os.path.join(ansible_dir, 'zuul')
        plugin_dir = os.path.join(zuul_dir, 'ansible')

        os.makedirs(plugin_dir, mode=0o0755)

        self.library_dir = os.path.join(plugin_dir, 'library')
        self.action_dir = os.path.join(plugin_dir, 'action')
        self.callback_dir = os.path.join(plugin_dir, 'callback')
        self.lookup_dir = os.path.join(plugin_dir, 'lookup')
        self.filter_dir = os.path.join(plugin_dir, 'filter')

        _copy_ansible_files(zuul.ansible, plugin_dir)

        # We're copying zuul.ansible.* into a directory we are going
        # to add to pythonpath, so our plugins can "import
        # zuul.ansible".  But we're not installing all of zuul, so
        # create a __init__.py file for the stub "zuul" module.
        with open(os.path.join(zuul_dir, '__init__.py'), 'w'):
            pass

        self.job_workers = {}
        self.disk_accountant = DiskAccountant(self.jobdir_root,
                                              self.disk_limit_per_job,
                                              self.stopJobDiskFull,
                                              self.merge_root)

    def _getMerger(self, root, logger=None):
        """Build a Merger rooted at *root*.

        ``merge_root`` is passed as the cache root unless *root* is itself
        ``merge_root``, in which case no cache root is given.
        """
        if root != self.merge_root:
            cache_root = self.merge_root
        else:
            cache_root = None
        return zuul.merger.merger.Merger(
            root, self.connections, self.merge_email, self.merge_name,
            self.merge_speed_limit, self.merge_speed_time, cache_root, logger)

    def start(self):
        """Connect both gear workers, register, and start all threads.

        Starts the command, update, merger, executor and governor threads
        (all daemon threads) and the disk accountant.
        """
        self._running = True
        self._command_running = True
        server = self.config.get('gearman', 'server')
        port = get_default(self.config, 'gearman', 'port', 4730)
        ssl_key = get_default(self.config, 'gearman', 'ssl_key')
        ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
        ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
        self.merger_worker = ExecutorMergeWorker(self, 'Zuul Executor Merger')
        self.merger_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
        self.executor_worker = ExecutorExecuteWorker(
            self, 'Zuul Executor Server')
        self.executor_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
        self.log.debug("Waiting for server")
        self.merger_worker.waitForServer()
        self.executor_worker.waitForServer()
        self.log.debug("Registering")
        self.register()

        self.log.debug("Starting command processor")
        self.command_socket.start()
        self.command_thread = threading.Thread(target=self.runCommand)
        self.command_thread.daemon = True
        self.command_thread.start()

        self.log.debug("Starting worker")
        self.update_thread = threading.Thread(target=self._updateLoop)
        self.update_thread.daemon = True
        self.update_thread.start()
        self.merger_thread = threading.Thread(target=self.run_merger)
        self.merger_thread.daemon = True
        self.merger_thread.start()
        self.executor_thread = threading.Thread(target=self.run_executor)
        self.executor_thread.daemon = True
        self.executor_thread.start()
        self.governor_stop_event = threading.Event()
        self.governor_thread = threading.Thread(target=self.run_governor)
        self.governor_thread.daemon = True
        self.governor_thread.start()
        self.disk_accountant.start()

    def register(self):
        """Register every Gearman function this server handles."""
        self.register_work()
        self.executor_worker.registerFunction("executor:stop:%s" %
                                              self.hostname)
        self.merger_worker.registerFunction("merger:merge")
        self.merger_worker.registerFunction("merger:cat")
        self.merger_worker.registerFunction("merger:refstate")

    def register_work(self):
        """Start accepting ``executor:execute`` jobs."""
        self.accepting_work = True
        self.executor_worker.registerFunction("executor:execute")

    def unregister_work(self):
        """Stop accepting ``executor:execute`` jobs."""
        self.accepting_work = False
        self.executor_worker.unregisterFunction("executor:execute")

    def stop(self):
        """Signal all loops to exit, stop job workers, shut down gear workers.

        A failure to stop one job worker is logged and does not prevent the
        remaining workers from being stopped.
        """
        self.log.debug("Stopping")
        self.disk_accountant.stop()
        self.governor_stop_event.set()
        self._running = False
        self._command_running = False
        self.command_socket.stop()
        # A None task wakes the update thread so it can notice _running.
        self.update_queue.put(None)

        # Iterate over a snapshot of the workers rather than the live dict.
        for job_worker in list(self.job_workers.values()):
            try:
                job_worker.stop()
            except Exception:
                self.log.exception("Exception sending stop command "
                                   "to worker:")
        self.merger_worker.shutdown()
        self.executor_worker.shutdown()
        self.log.debug("Stopped")

    def pause(self):
        """Command-socket ``pause`` handler; currently does nothing."""
        # TODOv3: implement
        pass

    def unpause(self):
        """Command-socket ``unpause`` handler; currently does nothing."""
        # TODOv3: implement
        pass

    def graceful(self):
        """Command-socket ``graceful`` handler; currently does nothing."""
        # TODOv3: implement
        pass

    def verboseOn(self):
        """Set the ``verbose`` flag."""
        self.verbose = True

    def verboseOff(self):
        """Clear the ``verbose`` flag."""
        self.verbose = False

    def keep(self):
        """Set the ``keep_jobdir`` flag."""
        self.keep_jobdir = True

    def nokeep(self):
        """Clear the ``keep_jobdir`` flag."""
        self.keep_jobdir = False

    def join(self):
        """Block until the update, merger, executor and governor threads end.

        The command thread is not joined.
        """
        self.update_thread.join()
        self.merger_thread.join()
        self.executor_thread.join()
        self.governor_thread.join()

    def runCommand(self):
        """Command-thread loop: dispatch command-socket input via command_map.

        The ``_stop`` command is ignored here; any exception (including an
        unknown command) is logged and the loop continues.
        """
        while self._command_running:
            try:
                command = self.command_socket.get().decode('utf8')
                if command != '_stop':
                    self.command_map[command]()
            except Exception:
                self.log.exception("Exception while processing command")

    def _updateLoop(self):
        """Update-thread loop: process update tasks until stopped."""
        while self._running:
            try:
                self._innerUpdateLoop()
            except Exception:
                # Catch Exception rather than everything, matching the other
                # loops in this class, so that SystemExit and
                # KeyboardInterrupt are not swallowed and retried.
                self.log.exception("Exception in update thread:")

    def _innerUpdateLoop(self):
        """Take one task from the update queue and update its repository.

        A ``None`` task is the stop signal and is simply dropped.
        """
        # Inside of a loop that keeps the main repositories up to date
        task = self.update_queue.get()
        if task is None:
            # We are asked to stop
            return
        with self.merger_lock:
            self.log.info("Updating repo %s/%s" % (
                task.connection_name, task.project_name))
            self.merger.updateRepo(task.connection_name, task.project_name)
            self.log.debug("Finished updating repo %s/%s" %
                           (task.connection_name, task.project_name))
        task.setComplete()

    def update(self, connection_name, project_name):
        """Queue a repository update and return the queued task.

        The returned task is whatever ``update_queue.put`` hands back, which
        may differ from the task created here.
        """
        # Update a repository in the main merger
        task = UpdateTask(connection_name, project_name)
        task = self.update_queue.put(task)
        return task

    def run_merger(self):
        """Merger-thread loop: fetch and dispatch ``merger:*`` jobs.

        An unrecognised job name is failed; an exception raised while
        handling a job is logged and reported back as a work exception.
        """
        self.log.debug("Starting merger listener")
        while self._running:
            try:
                job = self.merger_worker.getJob()
                try:
                    if job.name == 'merger:cat':
                        self.log.debug("Got cat job: %s" % job.unique)
                        self.cat(job)
                    elif job.name == 'merger:merge':
                        self.log.debug("Got merge job: %s" % job.unique)
                        self.merge(job)
                    elif job.name == 'merger:refstate':
                        self.log.debug("Got refstate job: %s" % job.unique)
                        self.refstate(job)
                    else:
                        self.log.error("Unable to handle job %s" % job.name)
                        job.sendWorkFail()
                except Exception:
                    self.log.exception("Exception while running job")
                    job.sendWorkException(
                        traceback.format_exc().encode('utf8'))
            except gear.InterruptedError:
                pass
            except Exception:
                self.log.exception("Exception while getting job")

    def run_executor(self):
        """Executor-thread loop: fetch and dispatch ``executor:*`` jobs.

        An unrecognised job name is failed; an exception raised while
        handling a job is logged and reported back as a work exception.
        """
        self.log.debug("Starting executor listener")
        while self._running:
            try:
                job = self.executor_worker.getJob()
                try:
                    if job.name == 'executor:execute':
                        self.log.debug("Got execute job: %s" % job.unique)
                        self.executeJob(job)
                    elif job.name.startswith('executor:stop'):
                        self.log.debug("Got stop job: %s" % job.unique)
                        self.stopJob(job)
                    else:
                        self.log.error("Unable to handle job %s" % job.name)
                        job.sendWorkFail()
                except Exception:
                    self.log.exception("Exception while running job")
                    job.sendWorkException(
                        traceback.format_exc().encode('utf8'))
            except gear.InterruptedError:
                pass
            except Exception:
                self.log.exception("Exception while getting job")

    def run_governor(self):
        """Governor-thread loop: call manageLoad() every 30 seconds.

        The loop ends as soon as ``governor_stop_event`` is set.
        """
        while not self.governor_stop_event.wait(30):
            self.manageLoad()

    def executeJob(self, job):
        """Create an AnsibleJob for *job*, track it in job_workers, run it."""
        worker = AnsibleJob(self, job)
        # Register before running so the job can be found by its unique id.
        self.job_workers[job.unique] = worker
        worker.run()

    def manageLoad(self):
        """Apply some heuristics to decide whether or not we should
        be asking for more jobs.

        Compares the one-minute load average with ``max_load_avg`` and
        unregisters or re-registers the execute function accordingly.
        """
        load_avg = os.getloadavg()[0]
        if self.accepting_work:
            # Don't unregister if we don't have any active jobs.
            if load_avg > self.max_load_avg and self.job_workers:
                self.log.info(
                    "Unregistering due to high system load {} > {}".format(
                        load_avg, self.max_load_avg))
                self.unregister_work()
        elif load_avg <= self.max_load_avg:
            self.log.info(
                "Re-registering as load is within limits {} <= {}".format(
                    load_avg, self.max_load_avg))
            self.register_work()

    def finishJob(self, unique):
        """Drop the worker registered under *unique* from job_workers."""
        del self.job_workers[unique]

    def stopJobDiskFull(self, jobdir):
        """Stop the job that owns *jobdir*, giving disk-full as the reason.

        The job's unique id is taken to be the basename of *jobdir*.
        """
        unique = os.path.basename(jobdir)
        self.stopJobByUnique(unique, reason=AnsibleJob.RESULT_DISK_FULL)

    def stopJob(self, job):
        """Handle a stop request; the ``uuid`` argument names the job to stop.

        The Gearman job is always completed, even if stopping fails.
        """
        try:
            args = json.loads(job.arguments)
            self.log.debug("Stop job with arguments: %s" % (args,))
            unique = args['uuid']
            self.stopJobByUnique(unique)
        finally:
            job.sendWorkComplete()

    def stopJobByUnique(self, unique, reason=None):
        """Stop the worker registered under *unique*, if there is one.

        A missing worker is only logged; an exception from the worker's
        ``stop`` is logged and not propagated.
        """
        job_worker = self.job_workers.get(unique)
        if not job_worker:
            self.log.debug("Unable to find worker for job %s" % (unique,))
            return
        try:
            job_worker.stop(reason)
        except Exception:
            self.log.exception("Exception sending stop command "
                               "to worker:")

    def cat(self, job):
        """Handle ``merger:cat``: update the repo, then return file contents.

        Replies with a JSON object holding ``updated`` (always true) and
        ``files``.
        """
        args = json.loads(job.arguments)
        task = self.update(args['connection'], args['project'])
        # Wait for the update thread to finish this repo before reading it.
        task.wait()
        with self.merger_lock:
            files = self.merger.getFiles(args['connection'], args['project'],
                                         args['branch'], args['files'],
                                         args.get('dirs', []))
        result = dict(updated=True,
                      files=files)
        job.sendWorkComplete(json.dumps(result))

    def refstate(self, job):
        """Handle ``merger:refstate``: reply with the repo state of the items.

        Replies with a JSON object holding ``updated`` and ``repo_state``.
        """
        args = json.loads(job.arguments)
        with self.merger_lock:
            success, repo_state = self.merger.getRepoState(args['items'])
        result = dict(updated=success,
                      repo_state=repo_state)
        job.sendWorkComplete(json.dumps(result))

    def merge(self, job):
        """Handle ``merger:merge``: merge the items and reply with the result.

        Replies with a JSON object holding ``merged``, ``commit``, ``files``
        and ``repo_state``; the last three are null when the merge failed.
        """
        args = json.loads(job.arguments)
        with self.merger_lock:
            ret = self.merger.mergeChanges(args['items'], args.get('files'),
                                           args.get('dirs', []),
                                           args.get('repo_state'))
        result = dict(merged=(ret is not None))
        if ret is None:
            result['commit'] = result['files'] = result['repo_state'] = None
        else:
            # The fourth element of the merger's result is not reported.
            (result['commit'], result['files'], result['repo_state'],
             _recent) = ret
        job.sendWorkComplete(json.dumps(result))
|
||||
|
|
Loading…
Reference in New Issue