@ -16,6 +16,7 @@ import collections
import datetime
import json
import logging
import multiprocessing
import os
import shutil
import signal
@ -563,6 +564,10 @@ class ExecutorServer(object):
self . merge_name = get_default ( self . config , ' merger ' , ' git_user_name ' )
execution_wrapper_name = get_default ( self . config , ' executor ' ,
' execution_wrapper ' , ' bubblewrap ' )
load_multiplier = float ( get_default ( self . config , ' executor ' ,
' load_multiplier ' , ' 2.5 ' ) )
self . max_load_avg = multiprocessing . cpu_count ( ) * load_multiplier
self . accepting_work = False
self . execution_wrapper = connections . drivers [ execution_wrapper_name ]
self . connections = connections
@ -652,19 +657,32 @@ class ExecutorServer(object):
self . executor_thread = threading . Thread ( target = self . run_executor )
self . executor_thread . daemon = True
self . executor_thread . start ( )
self . governor_stop_event = threading . Event ( )
self . governor_thread = threading . Thread ( target = self . run_governor )
self . governor_thread . daemon = True
self . governor_thread . start ( )
self . disk_accountant . start ( )
def register ( self ) :
self . executor_worker . registerFunction ( " executor:execute " )
self . register_work ( )
self . executor_worker . registerFunction ( " executor:stop: %s " %
self . hostname )
self . merger_worker . registerFunction ( " merger:merge " )
self . merger_worker . registerFunction ( " merger:cat " )
self . merger_worker . registerFunction ( " merger:refstate " )
def register_work ( self ) :
self . accepting_work = True
self . executor_worker . registerFunction ( " executor:execute " )
def unregister_work ( self ) :
self . accepting_work = False
self . executor_worker . unregisterFunction ( " executor:execute " )
def stop ( self ) :
self . log . debug ( " Stopping " )
self . disk_accountant . stop ( )
self . governor_stop_event . set ( )
self . _running = False
self . _command_running = False
self . command_socket . stop ( )
@ -708,6 +726,7 @@ class ExecutorServer(object):
self . update_thread . join ( )
self . merger_thread . join ( )
self . executor_thread . join ( )
self . governor_thread . join ( )
def runCommand ( self ) :
while self . _command_running :
@ -796,10 +815,31 @@ class ExecutorServer(object):
except Exception :
self . log . exception ( " Exception while getting job " )
def run_governor ( self ) :
while not self . governor_stop_event . wait ( 30 ) :
self . manageLoad ( )
def executeJob ( self , job ) :
self . job_workers [ job . unique ] = AnsibleJob ( self , job )
self . job_workers [ job . unique ] . run ( )
def manageLoad ( self ) :
''' Apply some heuristics to decide whether or not we should
be askign for more jobs '''
load_avg = os . getloadavg ( ) [ 0 ]
if self . accepting_work :
# Don't unregister if we don't have any active jobs.
if load_avg > self . max_load_avg and self . job_workers :
self . log . info (
" Unregistering due to high system load {} > {} " . format (
load_avg , self . max_load_avg ) )
self . unregister_work ( )
elif load_avg < = self . max_load_avg :
self . log . info (
" Re-registering as load is within limits {} <= {} " . format (
load_avg , self . max_load_avg ) )
self . register_work ( )
def finishJob ( self , unique ) :
del ( self . job_workers [ unique ] )