diff --git a/tests/base.py b/tests/base.py
index b029f0bd7e..028a194e88 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -1361,6 +1361,63 @@ class FakeBuild(object):
         return repos
 
 
+class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):
+    def doMergeChanges(self, merger, items, repo_state):
+        # Get a merger in order to update the repos involved in this job.
+        commit = super(RecordingAnsibleJob, self).doMergeChanges(
+            merger, items, repo_state)
+        if not commit:  # merge conflict
+            self.recordResult('MERGER_FAILURE')
+        return commit
+
+    def recordResult(self, result):
+        build = self.executor_server.job_builds[self.job.unique]
+        self.executor_server.lock.acquire()
+        self.executor_server.build_history.append(
+            BuildHistory(name=build.name, result=result, changes=build.changes,
+                         node=build.node, uuid=build.unique,
+                         ref=build.parameters['zuul']['ref'],
+                         parameters=build.parameters, jobdir=build.jobdir,
+                         pipeline=build.parameters['zuul']['pipeline'])
+        )
+        self.executor_server.running_builds.remove(build)
+        del self.executor_server.job_builds[self.job.unique]
+        self.executor_server.lock.release()
+
+    def runPlaybooks(self, args):
+        build = self.executor_server.job_builds[self.job.unique]
+        build.jobdir = self.jobdir
+
+        result = super(RecordingAnsibleJob, self).runPlaybooks(args)
+        self.recordResult(result)
+        return result
+
+    def runAnsible(self, cmd, timeout, playbook):
+        build = self.executor_server.job_builds[self.job.unique]
+
+        if self.executor_server._run_ansible:
+            result = super(RecordingAnsibleJob, self).runAnsible(
+                cmd, timeout, playbook)
+        else:
+            if playbook.path:
+                result = build.run()
+            else:
+                result = (self.RESULT_NORMAL, 0)
+        return result
+
+    def getHostList(self, args):
+        self.log.debug("hostlist")
+        hosts = super(RecordingAnsibleJob, self).getHostList(args)
+        for host in hosts:
+            host['host_vars']['ansible_connection'] = 'local'
+
+        hosts.append(dict(
+            name='localhost',
+            host_vars=dict(ansible_connection='local'),
+            host_keys=[]))
+        return hosts
+
+
 class RecordingExecutorServer(zuul.executor.server.ExecutorServer):
     """An Ansible executor to be used in tests.
 
@@ -1445,63 +1502,6 @@ class RecordingExecutorServer(zuul.executor.server.ExecutorServer):
         super(RecordingExecutorServer, self).stop()
 
 
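The class added above is a classic recording test double: it subclasses the real worker, short-circuits the expensive external step (actually running Ansible), and appends what happened to a shared history under a lock. A minimal standalone sketch of the same pattern, with illustrative names that are not part of Zuul:

import threading

class FakeJob(object):
    """Stands in for AnsibleJob; records results instead of shelling out."""
    def __init__(self, history, lock):
        self.history = history
        self.lock = lock

    def runAnsible(self, cmd, timeout, playbook):
        with self.lock:                    # guard the shared history list
            self.history.append(('RESULT_NORMAL', 0))
        return ('RESULT_NORMAL', 0)        # canned result, no subprocess

history = []
FakeJob(history, threading.Lock()).runAnsible(['ansible-playbook'], 60, None)
assert history == [('RESULT_NORMAL', 0)]
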
-class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):
-    def doMergeChanges(self, merger, items, repo_state):
-        # Get a merger in order to update the repos involved in this job.
-        commit = super(RecordingAnsibleJob, self).doMergeChanges(
-            merger, items, repo_state)
-        if not commit:  # merge conflict
-            self.recordResult('MERGER_FAILURE')
-        return commit
-
-    def recordResult(self, result):
-        build = self.executor_server.job_builds[self.job.unique]
-        self.executor_server.lock.acquire()
-        self.executor_server.build_history.append(
-            BuildHistory(name=build.name, result=result, changes=build.changes,
-                         node=build.node, uuid=build.unique,
-                         ref=build.parameters['zuul']['ref'],
-                         parameters=build.parameters, jobdir=build.jobdir,
-                         pipeline=build.parameters['zuul']['pipeline'])
-        )
-        self.executor_server.running_builds.remove(build)
-        del self.executor_server.job_builds[self.job.unique]
-        self.executor_server.lock.release()
-
-    def runPlaybooks(self, args):
-        build = self.executor_server.job_builds[self.job.unique]
-        build.jobdir = self.jobdir
-
-        result = super(RecordingAnsibleJob, self).runPlaybooks(args)
-        self.recordResult(result)
-        return result
-
-    def runAnsible(self, cmd, timeout, playbook):
-        build = self.executor_server.job_builds[self.job.unique]
-
-        if self.executor_server._run_ansible:
-            result = super(RecordingAnsibleJob, self).runAnsible(
-                cmd, timeout, playbook)
-        else:
-            if playbook.path:
-                result = build.run()
-            else:
-                result = (self.RESULT_NORMAL, 0)
-        return result
-
-    def getHostList(self, args):
-        self.log.debug("hostlist")
-        hosts = super(RecordingAnsibleJob, self).getHostList(args)
-        for host in hosts:
-            host['host_vars']['ansible_connection'] = 'local'
-
-        hosts.append(dict(
-            name='localhost',
-            host_vars=dict(ansible_connection='local'),
-            host_keys=[]))
-        return hosts
-
-
 class FakeGearmanServer(gear.Server):
     """A Gearman server for use in tests.
 
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index d739c18957..670a42021a 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -500,1359 +500,1359 @@ def make_inventory_dict(nodes, groups, all_vars):
     return inventory
 
 
-class ExecutorMergeWorker(gear.TextWorker):
-    def __init__(self, executor_server, *args, **kw):
-        self.zuul_executor_server = executor_server
-        super(ExecutorMergeWorker, self).__init__(*args, **kw)
+class AnsibleJobLogAdapter(logging.LoggerAdapter):
+    def process(self, msg, kwargs):
+        msg, kwargs = super(AnsibleJobLogAdapter, self).process(msg, kwargs)
+        msg = '[build: %s] %s' % (kwargs['extra']['job'], msg)
+        return msg, kwargs
 
-    def handleNoop(self, packet):
-        # Wait until the update queue is empty before responding
-        while self.zuul_executor_server.update_queue.qsize():
-            time.sleep(1)
-        with self.zuul_executor_server.merger_lock:
-            super(ExecutorMergeWorker, self).handleNoop(packet)
 
+class AnsibleJob(object):
+    RESULT_NORMAL = 1
+    RESULT_TIMED_OUT = 2
+    RESULT_UNREACHABLE = 3
+    RESULT_ABORTED = 4
+    RESULT_DISK_FULL = 5
 
+    RESULT_MAP = {
+        RESULT_NORMAL: 'RESULT_NORMAL',
+        RESULT_TIMED_OUT: 'RESULT_TIMED_OUT',
+        RESULT_UNREACHABLE: 'RESULT_UNREACHABLE',
+        RESULT_ABORTED: 'RESULT_ABORTED',
+        RESULT_DISK_FULL: 'RESULT_DISK_FULL',
+    }
 
-class ExecutorExecuteWorker(gear.TextWorker):
-    def __init__(self, executor_server, *args, **kw):
-        self.zuul_executor_server = executor_server
-        super(ExecutorExecuteWorker, self).__init__(*args, **kw)
+    def __init__(self, executor_server, job):
+        logger = logging.getLogger("zuul.AnsibleJob")
+        self.log = AnsibleJobLogAdapter(logger, {'job': job.unique})
+        self.executor_server = executor_server
+        self.job = job
+        self.jobdir = None
+        self.proc = None
+        self.proc_lock = threading.Lock()
+        self.running = False
+        self.aborted = False
+        self.aborted_reason = None
+        self.thread = None
+        self.private_key_file = get_default(self.executor_server.config,
+                                            'executor', 'private_key_file',
+                                            '~/.ssh/id_rsa')
+        self.ssh_agent = SshAgent()
 
-    def handleNoop(self, packet):
-        # Delay our response to running a new job based on the number
-        # of jobs we're currently running, in an attempt to spread
-        # load evenly among executors.
-        workers = len(self.zuul_executor_server.job_workers)
-        delay = (workers ** 2) / 1000.0
-        time.sleep(delay)
-        return super(ExecutorExecuteWorker, self).handleNoop(packet)
+        self.executor_variables_file = None
 
+        if self.executor_server.config.has_option('executor', 'variables'):
+            self.executor_variables_file = self.executor_server.config.get(
+                'executor', 'variables')
 
-class ExecutorServer(object):
-    log = logging.getLogger("zuul.ExecutorServer")
+    def run(self):
+        self.running = True
+        self.thread = threading.Thread(target=self.execute)
+        self.thread.start()
 
-    def __init__(self, config, connections={}, jobdir_root=None,
-                 keep_jobdir=False, log_streaming_port=DEFAULT_FINGER_PORT):
-        self.config = config
-        self.keep_jobdir = keep_jobdir
-        self.jobdir_root = jobdir_root
-        # TODOv3(mordred): make the executor name more unique --
-        # perhaps hostname+pid.
-        self.hostname = socket.gethostname()
-        self.log_streaming_port = log_streaming_port
-        self.merger_lock = threading.Lock()
-        self.verbose = False
-        self.command_map = dict(
-            stop=self.stop,
-            pause=self.pause,
-            unpause=self.unpause,
-            graceful=self.graceful,
-            verbose=self.verboseOn,
-            unverbose=self.verboseOff,
-            keep=self.keep,
-            nokeep=self.nokeep,
-        )
+    def stop(self, reason=None):
+        self.aborted = True
+        self.aborted_reason = reason
+        self.abortRunningProc()
+        if self.thread:
+            self.thread.join()
 
-        self.merge_root = get_default(self.config, 'executor', 'git_dir',
-                                      '/var/lib/zuul/executor-git')
-        self.default_username = get_default(self.config, 'executor',
-                                            'default_username', 'zuul')
-        self.disk_limit_per_job = int(get_default(self.config, 'executor',
-                                                  'disk_limit_per_job', 250))
-        self.merge_email = get_default(self.config, 'merger', 'git_user_email')
-        self.merge_name = get_default(self.config, 'merger', 'git_user_name')
-        self.merge_speed_limit = get_default(
-            config, 'merger', 'git_http_low_speed_limit', '1000')
-        self.merge_speed_time = get_default(
-            config, 'merger', 'git_http_low_speed_time', '30')
-        execution_wrapper_name = get_default(self.config, 'executor',
-                                             'execution_wrapper', 'bubblewrap')
-        load_multiplier = float(get_default(self.config, 'executor',
-                                            'load_multiplier', '2.5'))
-        self.max_load_avg = multiprocessing.cpu_count() * load_multiplier
-        self.accepting_work = False
-        self.execution_wrapper = connections.drivers[execution_wrapper_name]
+    def execute(self):
+        try:
+            self.ssh_agent.start()
+            self.ssh_agent.add(self.private_key_file)
+            self.jobdir = JobDir(self.executor_server.jobdir_root,
+                                 self.executor_server.keep_jobdir,
+                                 str(self.job.unique))
+            self._execute()
+        except ExecutorError as e:
+            result_data = json.dumps(dict(result='ERROR',
+                                          error_detail=e.args[0]))
+            self.log.debug("Sending result: %s" % (result_data,))
+            self.job.sendWorkComplete(result_data)
+        except Exception:
+            self.log.exception("Exception while executing job")
+            self.job.sendWorkException(traceback.format_exc())
+        finally:
+            self.running = False
+            if self.jobdir:
+                try:
+                    self.jobdir.cleanup()
+                except Exception:
+                    self.log.exception("Error cleaning up jobdir:")
+            if self.ssh_agent:
+                try:
+                    self.ssh_agent.stop()
+                except Exception:
+                    self.log.exception("Error stopping SSH agent:")
+            try:
+                self.executor_server.finishJob(self.job.unique)
+            except Exception:
+                self.log.exception("Error finalizing job thread:")
 
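The removed ExecutorExecuteWorker.handleNoop above staggers how quickly an executor answers the Gearman wake-up: the sleep grows quadratically with the number of jobs it is already running, so lightly loaded executors win the race for new work. The curve from the formula in the diff:

# Delay (seconds) before accepting a new job, per the handleNoop heuristic.
for workers in (0, 5, 10, 20, 40):
    print(workers, (workers ** 2) / 1000.0)
# 0 -> 0.0, 5 -> 0.025, 10 -> 0.1, 20 -> 0.4, 40 -> 1.6
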
-        self.connections = connections
-        # This merger and its git repos are used to maintain
-        # up-to-date copies of all the repos that are used by jobs, as
-        # well as to support the merger:cat function to supply
-        # configuration information to Zuul when it starts.
-        self.merger = self._getMerger(self.merge_root)
-        self.update_queue = DeduplicateQueue()
 
+    def _execute(self):
+        args = json.loads(self.job.arguments)
+        self.log.debug("Beginning job %s for ref %s" %
+                       (self.job.name, args['zuul']['ref']))
+        self.log.debug("Job root: %s" % (self.jobdir.root,))
+        tasks = []
+        projects = set()
 
-        state_dir = get_default(self.config, 'executor', 'state_dir',
-                                '/var/lib/zuul', expand_user=True)
-        path = os.path.join(state_dir, 'executor.socket')
-        self.command_socket = commandsocket.CommandSocket(path)
-        ansible_dir = os.path.join(state_dir, 'ansible')
-        self.ansible_dir = ansible_dir
-        if os.path.exists(ansible_dir):
-            shutil.rmtree(ansible_dir)
 
+        # Make sure all projects used by the job are updated...
+        for project in args['projects']:
+            self.log.debug("Updating project %s" % (project,))
+            tasks.append(self.executor_server.update(
+                project['connection'], project['name']))
+            projects.add((project['connection'], project['name']))
 
-        zuul_dir = os.path.join(ansible_dir, 'zuul')
-        plugin_dir = os.path.join(zuul_dir, 'ansible')
 
+        # ...as well as all playbook and role projects.
+        repos = []
+        playbooks = (args['pre_playbooks'] + args['playbooks'] +
+                     args['post_playbooks'])
+        for playbook in playbooks:
+            repos.append(playbook)
+            repos += playbook['roles']
 
-        os.makedirs(plugin_dir, mode=0o0755)
 
+        for repo in repos:
+            self.log.debug("Updating playbook or role %s" % (repo['project'],))
+            key = (repo['connection'], repo['project'])
+            if key not in projects:
+                tasks.append(self.executor_server.update(*key))
+                projects.add(key)
 
-        self.library_dir = os.path.join(plugin_dir, 'library')
-        self.action_dir = os.path.join(plugin_dir, 'action')
-        self.callback_dir = os.path.join(plugin_dir, 'callback')
-        self.lookup_dir = os.path.join(plugin_dir, 'lookup')
-        self.filter_dir = os.path.join(plugin_dir, 'filter')
 
+        for task in tasks:
+            task.wait()
 
-        _copy_ansible_files(zuul.ansible, plugin_dir)
 
+        self.log.debug("Git updates complete")
+        merger = self.executor_server._getMerger(self.jobdir.src_root,
+                                                 self.log)
+        repos = {}
+        for project in args['projects']:
+            self.log.debug("Cloning %s/%s" % (project['connection'],
+                                              project['name'],))
+            repo = merger.getRepo(project['connection'],
+                                  project['name'])
+            repos[project['canonical_name']] = repo
 
-        # We're copying zuul.ansible.* into a directory we are going
-        # to add to pythonpath, so our plugins can "import
-        # zuul.ansible".  But we're not installing all of zuul, so
-        # create a __init__.py file for the stub "zuul" module.
-        with open(os.path.join(zuul_dir, '__init__.py'), 'w'):
-            pass
 
+        merge_items = [i for i in args['items'] if i.get('number')]
+        if merge_items:
+            if not self.doMergeChanges(merger, merge_items,
+                                       args['repo_state']):
+                # There was a merge conflict and we have already sent
+                # a work complete result, don't run any jobs
+                return
 
-        self.job_workers = {}
-        self.disk_accountant = DiskAccountant(self.jobdir_root,
-                                              self.disk_limit_per_job,
-                                              self.stopJobDiskFull,
-                                              self.merge_root)
 
+        state_items = [i for i in args['items'] if not i.get('number')]
+        if state_items:
+            merger.setRepoState(state_items, args['repo_state'])
 
-    def _getMerger(self, root, logger=None):
-        if root != self.merge_root:
-            cache_root = self.merge_root
-        else:
-            cache_root = None
-        return zuul.merger.merger.Merger(
-            root, self.connections, self.merge_email, self.merge_name,
-            self.merge_speed_limit, self.merge_speed_time, cache_root, logger)
 
+        for project in args['projects']:
+            repo = repos[project['canonical_name']]
+            # If this project is the Zuul project and this is a ref
+            # rather than a change, checkout the ref.
+            if (project['canonical_name'] ==
+                    args['zuul']['project']['canonical_name'] and
+                    (not args['zuul'].get('branch')) and
+                    args['zuul'].get('ref')):
+                ref = args['zuul']['ref']
+            else:
+                ref = None
+            self.checkoutBranch(repo,
+                                project['name'],
+                                ref,
+                                args['branch'],
+                                args['override_branch'],
+                                project['override_branch'],
+                                project['default_branch'])
 
-    def start(self):
-        self._running = True
-        self._command_running = True
-        server = self.config.get('gearman', 'server')
-        port = get_default(self.config, 'gearman', 'port', 4730)
-        ssl_key = get_default(self.config, 'gearman', 'ssl_key')
-        ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
-        ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
-        self.merger_worker = ExecutorMergeWorker(self, 'Zuul Executor Merger')
-        self.merger_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
-        self.executor_worker = ExecutorExecuteWorker(
-            self, 'Zuul Executor Server')
-        self.executor_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
-        self.log.debug("Waiting for server")
-        self.merger_worker.waitForServer()
-        self.executor_worker.waitForServer()
-        self.log.debug("Registering")
-        self.register()
-
-        self.log.debug("Starting command processor")
-        self.command_socket.start()
-        self.command_thread = threading.Thread(target=self.runCommand)
-        self.command_thread.daemon = True
-        self.command_thread.start()
-
-        self.log.debug("Starting worker")
-        self.update_thread = threading.Thread(target=self._updateLoop)
-        self.update_thread.daemon = True
-        self.update_thread.start()
-        self.merger_thread = threading.Thread(target=self.run_merger)
-        self.merger_thread.daemon = True
-        self.merger_thread.start()
-        self.executor_thread = threading.Thread(target=self.run_executor)
-        self.executor_thread.daemon = True
-        self.executor_thread.start()
-        self.governor_stop_event = threading.Event()
-        self.governor_thread = threading.Thread(target=self.run_governor)
-        self.governor_thread.daemon = True
-        self.governor_thread.start()
-        self.disk_accountant.start()
-
-    def register(self):
-        self.register_work()
-        self.executor_worker.registerFunction("executor:stop:%s" %
-                                              self.hostname)
-        self.merger_worker.registerFunction("merger:merge")
-        self.merger_worker.registerFunction("merger:cat")
-        self.merger_worker.registerFunction("merger:refstate")
 
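_execute above partitions the job's items by whether they carry a change number: numbered items are speculatively merged, while the rest only pin repo state. A small sketch of that partitioning, with illustrative item dicts:

items = [
    {'project': 'demo', 'number': 12345, 'patchset': 1},   # a change
    {'project': 'demo', 'ref': 'refs/tags/v1'},            # a ref
]
merge_items = [i for i in items if i.get('number')]
state_items = [i for i in items if not i.get('number')]
assert len(merge_items) == 1 and len(state_items) == 1
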
+        # Delete the origin remote from each repo we set up since
+        # it will not be valid within the jobs.
+        for repo in repos.values():
+            repo.deleteRemote('origin')
 
-    def register_work(self):
-        self.accepting_work = True
-        self.executor_worker.registerFunction("executor:execute")
 
+        # This prepares each playbook and the roles needed for each.
 
-    def unregister_work(self):
-        self.accepting_work = False
-        self.executor_worker.unregisterFunction("executor:execute")
 
+        self.preparePlaybooks(args)
 
+        self.prepareAnsibleFiles(args)
+        self.writeLoggingConfig()
 
-    def stop(self):
-        self.log.debug("Stopping")
-        self.disk_accountant.stop()
-        self.governor_stop_event.set()
-        self._running = False
-        self._command_running = False
-        self.command_socket.stop()
-        self.update_queue.put(None)
 
+        data = {
+            # TODO(mordred) worker_name is needed as a unique name for the
+            # client to use for cancelling jobs on an executor. It's defaulting
+            # to the hostname for now, but in the future we should allow
+            # setting a per-executor override so that one can run more than
+            # one executor on a host.
+            'worker_name': self.executor_server.hostname,
+            'worker_hostname': self.executor_server.hostname,
+            'worker_log_port': self.executor_server.log_streaming_port
+        }
+        if self.executor_server.log_streaming_port != DEFAULT_FINGER_PORT:
+            data['url'] = "finger://{hostname}:{port}/{uuid}".format(
+                hostname=data['worker_hostname'],
+                port=data['worker_log_port'],
+                uuid=self.job.unique)
+        else:
+            data['url'] = 'finger://{hostname}/{uuid}'.format(
+                hostname=data['worker_hostname'],
+                uuid=self.job.unique)
 
-        for job_worker in list(self.job_workers.values()):
-            try:
-                job_worker.stop()
-            except Exception:
-                self.log.exception("Exception sending stop command "
-                                   "to worker:")
-        self.merger_worker.shutdown()
-        self.executor_worker.shutdown()
-        self.log.debug("Stopped")
 
+        self.job.sendWorkData(json.dumps(data))
+        self.job.sendWorkStatus(0, 100)
 
-    def pause(self):
-        # TODOv3: implement
-        pass
 
+        result = self.runPlaybooks(args)
+        if self.aborted_reason == self.RESULT_DISK_FULL:
+            result = 'DISK_FULL'
+        data = self.getResultData()
+        result_data = json.dumps(dict(result=result,
+                                      data=data))
+        self.log.debug("Sending result: %s" % (result_data,))
+        self.job.sendWorkComplete(result_data)
 
-    def unpause(self):
-        # TODOv3: implement
-        pass
 
+    def getResultData(self):
+        data = {}
+        try:
+            with open(self.jobdir.result_data_file) as f:
+                file_data = f.read()
+            if file_data:
+                data = json.loads(file_data)
+        except Exception:
+            self.log.exception("Unable to load result data:")
+        return data
 
-    def graceful(self):
-        # TODOv3: implement
-        pass
 
+    def doMergeChanges(self, merger, items, repo_state):
+        ret = merger.mergeChanges(items, repo_state=repo_state)
+        if not ret:  # merge conflict
+            result = dict(result='MERGER_FAILURE')
+            self.job.sendWorkComplete(json.dumps(result))
+            return False
+        recent = ret[3]
+        for key, commit in recent.items():
+            (connection, project, branch) = key
+            repo = merger.getRepo(connection, project)
+            repo.setRef('refs/heads/' + branch, commit)
+        return True
 
-    def verboseOn(self):
-        self.verbose = True
 
+    def checkoutBranch(self, repo, project_name, ref, zuul_branch,
+                       job_branch, project_override_branch,
+                       project_default_branch):
+        branches = repo.getBranches()
+        if project_override_branch in branches:
+            self.log.info("Checking out %s project override branch %s",
+                          project_name, project_override_branch)
+            repo.checkoutLocalBranch(project_override_branch)
+        elif job_branch in branches:
+            self.log.info("Checking out %s job branch %s",
+                          project_name, job_branch)
+            repo.checkoutLocalBranch(job_branch)
+        elif ref and ref.startswith('refs/heads/'):
+            b = ref[len('refs/heads/'):]
+            self.log.info("Checking out %s branch ref %s",
+                          project_name, b)
+            repo.checkoutLocalBranch(b)
+        elif ref and ref.startswith('refs/tags/'):
+            t = ref[len('refs/tags/'):]
+            self.log.info("Checking out %s tag ref %s",
+                          project_name, t)
+            repo.checkout(t)
+        elif zuul_branch and zuul_branch in branches:
+            self.log.info("Checking out %s zuul branch %s",
+                          project_name, zuul_branch)
+            repo.checkoutLocalBranch(zuul_branch)
+        elif project_default_branch in branches:
+            self.log.info("Checking out %s project default branch %s",
+                          project_name, project_default_branch)
+            repo.checkoutLocalBranch(project_default_branch)
+        else:
+            raise ExecutorError("Project %s does not have the "
+                                "default branch %s" %
+                                (project_name, project_default_branch))
 
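checkoutBranch above encodes a strict precedence when deciding what to check out. A condensed sketch of the same decision ladder, simplified to pure data with no git involved:

def pick_checkout(branches, ref, zuul_branch, job_branch,
                  override_branch, default_branch):
    # Mirrors the elif ladder above, highest priority first.
    if override_branch in branches:
        return override_branch
    if job_branch in branches:
        return job_branch
    if ref and ref.startswith('refs/heads/'):
        return ref[len('refs/heads/'):]
    if ref and ref.startswith('refs/tags/'):
        return ref[len('refs/tags/'):]
    if zuul_branch and zuul_branch in branches:
        return zuul_branch
    if default_branch in branches:
        return default_branch
    raise RuntimeError('no default branch %s' % default_branch)

assert pick_checkout(['master'], 'refs/tags/v1', None, None,
                     None, 'master') == 'v1'
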
-    def verboseOff(self):
-        self.verbose = False
 
+    def runPlaybooks(self, args):
+        result = None
 
-    def keep(self):
-        self.keep_jobdir = True
 
+        # Run the Ansible 'setup' module on all hosts in the inventory
+        # at the start of the job with a 60 second timeout.  If we
+        # aren't able to connect to all the hosts and gather facts
+        # within that timeout, there is likely a network problem
+        # between here and the hosts in the inventory; return them and
+        # reschedule the job.
+        setup_status, setup_code = self.runAnsibleSetup(
+            self.jobdir.setup_playbook)
+        if setup_status != self.RESULT_NORMAL or setup_code != 0:
+            return result
 
-    def nokeep(self):
-        self.keep_jobdir = False
 
+        pre_failed = False
+        success = False
+        for index, playbook in enumerate(self.jobdir.pre_playbooks):
+            # TODOv3(pabelanger): Implement pre-run timeout setting.
+            pre_status, pre_code = self.runAnsiblePlaybook(
+                playbook, args['timeout'], phase='pre', index=index)
+            if pre_status != self.RESULT_NORMAL or pre_code != 0:
+                # These should really never fail, so return None and have
+                # zuul try again
+                pre_failed = True
+                break
 
-    def join(self):
-        self.update_thread.join()
-        self.merger_thread.join()
-        self.executor_thread.join()
-        self.governor_thread.join()
 
+        if not pre_failed:
+            job_status, job_code = self.runAnsiblePlaybook(
+                self.jobdir.playbook, args['timeout'], phase='run')
+            if job_status == self.RESULT_ABORTED:
+                return 'ABORTED'
+            elif job_status == self.RESULT_TIMED_OUT:
+                # Set the pre-failure flag so this doesn't get
+                # overridden by a post-failure.
+                pre_failed = True
+                result = 'TIMED_OUT'
+            elif job_status == self.RESULT_NORMAL:
+                success = (job_code == 0)
+                if success:
+                    result = 'SUCCESS'
+                else:
+                    result = 'FAILURE'
+            else:
+                # The result of the job is indeterminate.  Zuul will
+                # run it again.
+                return None
 
-    def runCommand(self):
-        while self._command_running:
-            try:
-                command = self.command_socket.get().decode('utf8')
-                if command != '_stop':
-                    self.command_map[command]()
-            except Exception:
-                self.log.exception("Exception while processing command")
 
+        for index, playbook in enumerate(self.jobdir.post_playbooks):
+            # TODOv3(pabelanger): Implement post-run timeout setting.
+            post_status, post_code = self.runAnsiblePlaybook(
+                playbook, args['timeout'], success, phase='post', index=index)
+            if post_status != self.RESULT_NORMAL or post_code != 0:
+                success = False
+                # If we encountered a pre-failure, that takes
+                # precedence over the post result.
+                if not pre_failed:
+                    result = 'POST_FAILURE'
+                if (index + 1) == len(self.jobdir.post_playbooks):
+                    self._logFinalPlaybookError()
 
-    def _updateLoop(self):
-        while self._running:
-            try:
-                self._innerUpdateLoop()
-            except:
-                self.log.exception("Exception in update thread:")
 
+        return result
 
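runPlaybooks above collapses the pre/run/post phase outcomes into one job result, with earlier failures taking precedence. A compact, simplified restatement of that precedence (the helper name is illustrative, not part of Zuul):

def final_result(pre_failed, run_result, post_failed):
    """Condensed restatement of the precedence in runPlaybooks above."""
    if run_result in ('ABORTED', 'TIMED_OUT'):
        return run_result          # post failures never override these
    if post_failed and not pre_failed:
        return 'POST_FAILURE'
    if pre_failed:
        return None                # indeterminate; Zuul retries the job
    return run_result              # 'SUCCESS' or 'FAILURE'

assert final_result(False, 'SUCCESS', True) == 'POST_FAILURE'
assert final_result(False, 'TIMED_OUT', True) == 'TIMED_OUT'
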
-    def _innerUpdateLoop(self):
-        # Inside of a loop that keeps the main repositories up to date
-        task = self.update_queue.get()
-        if task is None:
-            # We are asked to stop
+    def _logFinalPlaybookError(self):
+        # Failures in the final post playbook can include failures
+        # uploading logs, which makes diagnosing issues difficult.
+        # Grab the output from the last playbook from the json
+        # file and log it.
+        json_output = self.jobdir.job_output_file.replace('txt', 'json')
+        self.log.debug("Final playbook failed")
+        if not os.path.exists(json_output):
+            self.log.debug("JSON logfile {logfile} is missing".format(
+                logfile=json_output))
             return
-        with self.merger_lock:
-            self.log.info("Updating repo %s/%s" % (
-                task.connection_name, task.project_name))
-            self.merger.updateRepo(task.connection_name, task.project_name)
-        self.log.debug("Finished updating repo %s/%s" %
-                       (task.connection_name, task.project_name))
-        task.setComplete()
-
-    def update(self, connection_name, project_name):
-        # Update a repository in the main merger
-        task = UpdateTask(connection_name, project_name)
-        task = self.update_queue.put(task)
-        return task
-
-    def run_merger(self):
-        self.log.debug("Starting merger listener")
-        while self._running:
-            try:
-                job = self.merger_worker.getJob()
-                try:
-                    if job.name == 'merger:cat':
-                        self.log.debug("Got cat job: %s" % job.unique)
-                        self.cat(job)
-                    elif job.name == 'merger:merge':
-                        self.log.debug("Got merge job: %s" % job.unique)
-                        self.merge(job)
-                    elif job.name == 'merger:refstate':
-                        self.log.debug("Got refstate job: %s" % job.unique)
-                        self.refstate(job)
-                    else:
-                        self.log.error("Unable to handle job %s" % job.name)
-                        job.sendWorkFail()
-                except Exception:
-                    self.log.exception("Exception while running job")
-                    job.sendWorkException(
-                        traceback.format_exc().encode('utf8'))
-            except gear.InterruptedError:
-                pass
-            except Exception:
-                self.log.exception("Exception while getting job")
-
-    def run_executor(self):
-        self.log.debug("Starting executor listener")
-        while self._running:
-            try:
-                job = self.executor_worker.getJob()
-                try:
-                    if job.name == 'executor:execute':
-                        self.log.debug("Got execute job: %s" % job.unique)
-                        self.executeJob(job)
-                    elif job.name.startswith('executor:stop'):
-                        self.log.debug("Got stop job: %s" % job.unique)
-                        self.stopJob(job)
-                    else:
-                        self.log.error("Unable to handle job %s" % job.name)
-                        job.sendWorkFail()
-                except Exception:
-                    self.log.exception("Exception while running job")
-                    job.sendWorkException(
-                        traceback.format_exc().encode('utf8'))
-            except gear.InterruptedError:
-                pass
-            except Exception:
-                self.log.exception("Exception while getting job")
-
-    def run_governor(self):
-        while not self.governor_stop_event.wait(30):
-            self.manageLoad()
 
+        try:
+            output = json.load(open(json_output, 'r'))
+            last_playbook = output[-1]
+            # Transform json to yaml - because it's easier to read and given
+            # the size of the data it'll be extra-hard to read this as an
+            # all on one line stringified nested dict.
+            yaml_out = yaml.safe_dump(last_playbook, default_flow_style=False)
+            for line in yaml_out.split('\n'):
+                self.log.debug(line)
+        except Exception:
+            self.log.exception(
+                "Could not decode json from {logfile}".format(
+                    logfile=json_output))
 
-    def executeJob(self, job):
-        self.job_workers[job.unique] = AnsibleJob(self, job)
-        self.job_workers[job.unique].run()
 
+    def getHostList(self, args):
+        hosts = []
+        for node in args['nodes']:
+            # NOTE(mordred): This assumes that the nodepool launcher
+            # and the zuul executor both have similar network
+            # characteristics, as the launcher will do a test for ipv6
+            # viability and if so, and if the node has an ipv6
+            # address, it will be the interface_ip.  force-ipv4 can be
+            # set to True in the clouds.yaml for a cloud if this
+            # results in the wrong thing being in interface_ip
+            # TODO(jeblair): Move this notice to the docs.
+            ip = node.get('interface_ip')
+            port = node.get('ssh_port', 22)
+            host_vars = dict(
+                ansible_host=ip,
+                ansible_user=self.executor_server.default_username,
+                ansible_port=port,
+                nodepool=dict(
+                    label=node.get('label'),
+                    az=node.get('az'),
+                    cloud=node.get('cloud'),
+                    provider=node.get('provider'),
+                    region=node.get('region'),
+                    interface_ip=node.get('interface_ip'),
+                    public_ipv4=node.get('public_ipv4'),
+                    private_ipv4=node.get('private_ipv4'),
+                    public_ipv6=node.get('public_ipv6')))
 
-    def manageLoad(self):
-        ''' Apply some heuristics to decide whether or not we should
-            be asking for more jobs '''
-        load_avg = os.getloadavg()[0]
-        if self.accepting_work:
-            # Don't unregister if we don't have any active jobs.
-            if load_avg > self.max_load_avg and self.job_workers:
-                self.log.info(
-                    "Unregistering due to high system load {} > {}".format(
-                        load_avg, self.max_load_avg))
-                self.unregister_work()
-        elif load_avg <= self.max_load_avg:
-            self.log.info(
-                "Re-registering as load is within limits {} <= {}".format(
-                    load_avg, self.max_load_avg))
-            self.register_work()
 
+            host_keys = []
+            for key in node.get('host_keys'):
+                if port != 22:
+                    host_keys.append("[%s]:%s %s" % (ip, port, key))
+                else:
+                    host_keys.append("%s %s" % (ip, key))
 
-    def finishJob(self, unique):
-        del(self.job_workers[unique])
 
+            hosts.append(dict(
+                name=node['name'],
+                host_vars=host_vars,
+                host_keys=host_keys))
+        return hosts
 
-    def stopJobDiskFull(self, jobdir):
-        unique = os.path.basename(jobdir)
-        self.stopJobByUnique(unique, reason=AnsibleJob.RESULT_DISK_FULL)
 
+    def _blockPluginDirs(self, path):
+        '''Prevent execution of playbooks or roles with plugins
 
-    def stopJob(self, job):
-        try:
-            args = json.loads(job.arguments)
-            self.log.debug("Stop job with arguments: %s" % (args,))
-            unique = args['uuid']
-            self.stopJobByUnique(unique)
-        finally:
-            job.sendWorkComplete()
 
+        Plugins are loaded from roles and also if there is a plugin
+        dir adjacent to the playbook.  Throw an error if the path
+        contains a location that would cause a plugin to get loaded.
 
-    def stopJobByUnique(self, unique, reason=None):
-        job_worker = self.job_workers.get(unique)
-        if not job_worker:
-            self.log.debug("Unable to find worker for job %s" % (unique,))
-            return
-        try:
-            job_worker.stop(reason)
-        except Exception:
-            self.log.exception("Exception sending stop command "
-                               "to worker:")
 
+        '''
+        for entry in os.listdir(path):
+            if os.path.isdir(entry) and entry.endswith('_plugins'):
+                raise ExecutorError(
+                    "Ansible plugin dir %s found adjacent to playbook %s in "
+                    "non-trusted repo." % (entry, path))
 
-    def cat(self, job):
-        args = json.loads(job.arguments)
-        task = self.update(args['connection'], args['project'])
-        task.wait()
-        with self.merger_lock:
-            files = self.merger.getFiles(args['connection'], args['project'],
-                                         args['branch'], args['files'],
-                                         args.get('dirs', []))
-        result = dict(updated=True,
-                      files=files)
-        job.sendWorkComplete(json.dumps(result))
 
+    def findPlaybook(self, path, required=False, trusted=False):
+        for ext in ['.yaml', '.yml']:
+            fn = path + ext
+            if os.path.exists(fn):
+                if not trusted:
+                    playbook_dir = os.path.dirname(os.path.abspath(fn))
+                    self._blockPluginDirs(playbook_dir)
+                return fn
+        if required:
+            raise ExecutorError("Unable to find playbook %s" % path)
+        return None
 
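getHostList above writes host keys in OpenSSH known_hosts syntax, where a host on a non-default SSH port must be bracketed. For example:

def known_hosts_line(ip, port, key):
    # OpenSSH requires the [host]:port form when the port is not 22.
    if port != 22:
        return "[%s]:%s %s" % (ip, port, key)
    return "%s %s" % (ip, key)

print(known_hosts_line('203.0.113.7', 22, 'ssh-ed25519 AAAA...'))
# 203.0.113.7 ssh-ed25519 AAAA...
print(known_hosts_line('203.0.113.7', 2222, 'ssh-ed25519 AAAA...'))
# [203.0.113.7]:2222 ssh-ed25519 AAAA...
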
-    def refstate(self, job):
-        args = json.loads(job.arguments)
-        with self.merger_lock:
-            success, repo_state = self.merger.getRepoState(args['items'])
-        result = dict(updated=success,
-                      repo_state=repo_state)
-        job.sendWorkComplete(json.dumps(result))
 
+    def preparePlaybooks(self, args):
+        self.writeAnsibleConfig(self.jobdir.setup_playbook)
 
-    def merge(self, job):
-        args = json.loads(job.arguments)
-        with self.merger_lock:
-            ret = self.merger.mergeChanges(args['items'], args.get('files'),
-                                           args.get('dirs', []),
-                                           args.get('repo_state'))
-        result = dict(merged=(ret is not None))
-        if ret is None:
-            result['commit'] = result['files'] = result['repo_state'] = None
-        else:
-            (result['commit'], result['files'], result['repo_state'],
-             recent) = ret
-        job.sendWorkComplete(json.dumps(result))
 
+        for playbook in args['pre_playbooks']:
+            jobdir_playbook = self.jobdir.addPrePlaybook()
+            self.preparePlaybook(jobdir_playbook, playbook,
+                                 args, required=True)
 
+        for playbook in args['playbooks']:
+            jobdir_playbook = self.jobdir.addPlaybook()
+            self.preparePlaybook(jobdir_playbook, playbook,
+                                 args, required=False)
+            if jobdir_playbook.path is not None:
+                self.jobdir.playbook = jobdir_playbook
+                break
 
-class AnsibleJobLogAdapter(logging.LoggerAdapter):
-    def process(self, msg, kwargs):
-        msg, kwargs = super(AnsibleJobLogAdapter, self).process(msg, kwargs)
-        msg = '[build: %s] %s' % (kwargs['extra']['job'], msg)
-        return msg, kwargs
 
+        if self.jobdir.playbook is None:
+            raise ExecutorError("No valid playbook found")
 
+        for playbook in args['post_playbooks']:
+            jobdir_playbook = self.jobdir.addPostPlaybook()
+            self.preparePlaybook(jobdir_playbook, playbook,
+                                 args, required=True)
 
-class AnsibleJob(object):
-    RESULT_NORMAL = 1
-    RESULT_TIMED_OUT = 2
-    RESULT_UNREACHABLE = 3
-    RESULT_ABORTED = 4
-    RESULT_DISK_FULL = 5
 
+    def preparePlaybook(self, jobdir_playbook, playbook, args, required):
+        self.log.debug("Prepare playbook repo for %s" %
+                       (playbook['project'],))
+        # Check out the playbook repo if needed and set the path to
+        # the playbook that should be run.
+        source = self.executor_server.connections.getSource(
+            playbook['connection'])
+        project = source.getProject(playbook['project'])
+        jobdir_playbook.trusted = playbook['trusted']
+        jobdir_playbook.branch = playbook['branch']
+        jobdir_playbook.canonical_name_and_path = os.path.join(
+            project.canonical_name, playbook['path'])
+        path = None
+        if not playbook['trusted']:
+            # This is a project repo, so it is safe to use the already
+            # checked out version (from speculative merging) of the
+            # playbook
+            for i in args['items']:
+                if (i['connection'] == playbook['connection'] and
+                        i['project'] == playbook['project']):
+                    # We already have this repo prepared
+                    path = os.path.join(self.jobdir.src_root,
+                                        project.canonical_hostname,
+                                        project.name,
+                                        playbook['path'])
+                    break
+        if not path:
+            # The playbook repo is either a config repo, or it isn't in
+            # the stack of changes we are testing, so check out the branch
+            # tip into a dedicated space.
+            path = self.checkoutTrustedProject(project, playbook['branch'])
+            path = os.path.join(path, playbook['path'])
 
-    RESULT_MAP = {
-        RESULT_NORMAL: 'RESULT_NORMAL',
-        RESULT_TIMED_OUT: 'RESULT_TIMED_OUT',
-        RESULT_UNREACHABLE: 'RESULT_UNREACHABLE',
-        RESULT_ABORTED: 'RESULT_ABORTED',
-        RESULT_DISK_FULL: 'RESULT_DISK_FULL',
-    }
 
+        jobdir_playbook.path = self.findPlaybook(
+            path,
+            required=required,
+            trusted=playbook['trusted'])
 
-    def __init__(self, executor_server, job):
-        logger = logging.getLogger("zuul.AnsibleJob")
-        self.log = AnsibleJobLogAdapter(logger, {'job': job.unique})
-        self.executor_server = executor_server
-        self.job = job
-        self.jobdir = None
-        self.proc = None
-        self.proc_lock = threading.Lock()
-        self.running = False
-        self.aborted = False
-        self.aborted_reason = None
-        self.thread = None
-        self.private_key_file = get_default(self.executor_server.config,
-                                            'executor', 'private_key_file',
-                                            '~/.ssh/id_rsa')
-        self.ssh_agent = SshAgent()
 
+        # If this playbook doesn't exist, don't bother preparing
+        # roles.
+        if not jobdir_playbook.path:
+            return
 
-        self.executor_variables_file = None
 
+        for role in playbook['roles']:
+            self.prepareRole(jobdir_playbook, role, args)
 
-        if self.executor_server.config.has_option('executor', 'variables'):
-            self.executor_variables_file = self.executor_server.config.get(
-                'executor', 'variables')
 
+        secrets = playbook['secrets']
+        if secrets:
+            if 'zuul' in secrets:
+                # We block this in configloader, but block it here too to make
+                # sure that a job doesn't pass secrets named zuul.
+                raise Exception("Defining secrets named 'zuul' is not allowed")
+            jobdir_playbook.secrets_content = yaml.safe_dump(
+                secrets, default_flow_style=False)
 
-    def run(self):
-        self.running = True
-        self.thread = threading.Thread(target=self.execute)
-        self.thread.start()
 
+        self.writeAnsibleConfig(jobdir_playbook)
 
-    def stop(self, reason=None):
-        self.aborted = True
-        self.aborted_reason = reason
-        self.abortRunningProc()
-        if self.thread:
-            self.thread.join()
 
+    def checkoutTrustedProject(self, project, branch):
+        root = self.jobdir.getTrustedProject(project.canonical_name,
+                                             branch)
+        if not root:
+            root = self.jobdir.addTrustedProject(project.canonical_name,
+                                                 branch)
+            merger = self.executor_server._getMerger(root, self.log)
+            merger.checkoutBranch(project.connection_name, project.name,
+                                  branch)
 
-    def execute(self):
-        try:
-            self.ssh_agent.start()
-            self.ssh_agent.add(self.private_key_file)
-            self.jobdir = JobDir(self.executor_server.jobdir_root,
-                                 self.executor_server.keep_jobdir,
-                                 str(self.job.unique))
-            self._execute()
-        except ExecutorError as e:
-            result_data = json.dumps(dict(result='ERROR',
-                                          error_detail=e.args[0]))
-            self.log.debug("Sending result: %s" % (result_data,))
-            self.job.sendWorkComplete(result_data)
-        except Exception:
-            self.log.exception("Exception while executing job")
-            self.job.sendWorkException(traceback.format_exc())
-        finally:
-            self.running = False
-            if self.jobdir:
-                try:
-                    self.jobdir.cleanup()
-                except Exception:
-                    self.log.exception("Error cleaning up jobdir:")
-            if self.ssh_agent:
-                try:
-                    self.ssh_agent.stop()
-                except Exception:
-                    self.log.exception("Error stopping SSH agent:")
-            try:
-                self.executor_server.finishJob(self.job.unique)
-            except Exception:
-                self.log.exception("Error finalizing job thread:")
 
+        path = os.path.join(root,
+                            project.canonical_hostname,
+                            project.name)
+        return path
 
-    def _execute(self):
-        args = json.loads(self.job.arguments)
-        self.log.debug("Beginning job %s for ref %s" %
-                       (self.job.name, args['zuul']['ref']))
-        self.log.debug("Job root: %s" % (self.jobdir.root,))
-        tasks = []
-        projects = set()
 
+    def prepareRole(self, jobdir_playbook, role, args):
+        if role['type'] == 'zuul':
+            root = jobdir_playbook.addRole()
+            self.prepareZuulRole(jobdir_playbook, role, args, root)
 
-        # Make sure all projects used by the job are updated...
-        for project in args['projects']:
-            self.log.debug("Updating project %s" % (project,))
-            tasks.append(self.executor_server.update(
-                project['connection'], project['name']))
-            projects.add((project['connection'], project['name']))
 
+    def findRole(self, path, trusted=False):
+        d = os.path.join(path, 'tasks')
+        if os.path.isdir(d):
+            # This is a bare role
+            if not trusted:
+                self._blockPluginDirs(path)
+            # None signifies that the repo is a bare role
+            return None
+        d = os.path.join(path, 'roles')
+        if os.path.isdir(d):
+            # This repo has a collection of roles
+            if not trusted:
+                self._blockPluginDirs(d)
+                for entry in os.listdir(d):
+                    entry_path = os.path.join(d, entry)
+                    if os.path.isdir(entry_path):
+                        self._blockPluginDirs(entry_path)
+            return d
+        # It is neither a bare role, nor a collection of roles
+        raise RoleNotFoundError("Unable to find role in %s" % (path,))
 
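preparePlaybook above serializes a playbook's secrets to YAML; runAnsiblePlaybook, near the end of this diff, then hands that file to Ansible as an extra-vars file via the -e @file form. A minimal sketch of the mechanism, with made-up secret names:

import tempfile
import yaml

secrets = {'site_logs': {'fqdn': 'logs.example.org', 'ssh_user': 'zuul'}}
with tempfile.NamedTemporaryFile('w', suffix='.yaml', delete=False) as f:
    yaml.safe_dump(secrets, f, default_flow_style=False)
    vars_file = f.name

# The equivalent of what the executor builds for ansible-playbook:
cmd = ['ansible-playbook', 'site.yaml', '-e', '@' + vars_file]
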
-        # ...as well as all playbook and role projects.
-        repos = []
-        playbooks = (args['pre_playbooks'] + args['playbooks'] +
-                     args['post_playbooks'])
-        for playbook in playbooks:
-            repos.append(playbook)
-            repos += playbook['roles']
 
+    def prepareZuulRole(self, jobdir_playbook, role, args, root):
+        self.log.debug("Prepare zuul role for %s" % (role,))
+        # Check out the role repo if needed
+        source = self.executor_server.connections.getSource(
+            role['connection'])
+        project = source.getProject(role['project'])
+        name = role['target_name']
+        path = None
 
-        for repo in repos:
-            self.log.debug("Updating playbook or role %s" % (repo['project'],))
-            key = (repo['connection'], repo['project'])
-            if key not in projects:
-                tasks.append(self.executor_server.update(*key))
-                projects.add(key)
 
+        if not jobdir_playbook.trusted:
+            # This playbook is untrusted.  Use the already checked out
+            # version (from speculative merging) of the role if it
+            # exists.
 
-        for task in tasks:
-            task.wait()
 
+            for i in args['items']:
+                if (i['connection'] == role['connection'] and
+                        i['project'] == role['project']):
+                    # We already have this repo prepared; use it.
+                    path = os.path.join(self.jobdir.src_root,
+                                        project.canonical_hostname,
+                                        project.name)
+                    break
 
-        self.log.debug("Git updates complete")
-        merger = self.executor_server._getMerger(self.jobdir.src_root,
-                                                 self.log)
-        repos = {}
-        for project in args['projects']:
-            self.log.debug("Cloning %s/%s" % (project['connection'],
-                                              project['name'],))
-            repo = merger.getRepo(project['connection'],
-                                  project['name'])
-            repos[project['canonical_name']] = repo
 
+        if not path:
+            # This is a trusted playbook or the role did not appear
+            # in the dependency chain for the change (in which case,
+            # there is no existing untrusted checkout of it).  Check
+            # out the branch tip into a dedicated space.
+            path = self.checkoutTrustedProject(project, 'master')
 
-        merge_items = [i for i in args['items'] if i.get('number')]
-        if merge_items:
-            if not self.doMergeChanges(merger, merge_items,
-                                       args['repo_state']):
-                # There was a merge conflict and we have already sent
-                # a work complete result, don't run any jobs
-                return
 
+        # The name of the symlink is the requested name of the role
+        # (which may be the repo name or may be something else; this
+        # can come into play if this is a bare role).
+        link = os.path.join(root, name)
+        link = os.path.realpath(link)
+        if not link.startswith(os.path.realpath(root)):
+            raise ExecutorError("Invalid role name %s" % name)
+        os.symlink(path, link)
 
-        state_items = [i for i in args['items'] if not i.get('number')]
-        if state_items:
-            merger.setRepoState(state_items, args['repo_state'])
 
+        try:
+            role_path = self.findRole(link, trusted=jobdir_playbook.trusted)
+        except RoleNotFoundError:
+            if role['implicit']:
+                self.log.info("Implicit role not found in %s", link)
+                return
+            raise
+        if role_path is None:
+            # In the case of a bare role, add the containing directory
+            role_path = root
+        self.log.debug("Adding role path %s", role_path)
+        jobdir_playbook.roles_path.append(role_path)
 
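prepareZuulRole above guards against a role name escaping the roles directory: the requested name becomes a symlink, and its realpath() must still resolve inside the root. The same guard in isolation (the helper name is illustrative):

import os

def safe_role_link(root, name):
    link = os.path.join(root, name)
    link = os.path.realpath(link)
    # A name containing '..' or an absolute path would resolve outside root.
    if not link.startswith(os.path.realpath(root)):
        raise ValueError("Invalid role name %s" % name)
    return link

safe_role_link('/tmp/roles', 'myrole')        # fine
# safe_role_link('/tmp/roles', '../../etc')   # raises ValueError
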
-        for project in args['projects']:
-            repo = repos[project['canonical_name']]
-            # If this project is the Zuul project and this is a ref
-            # rather than a change, checkout the ref.
-            if (project['canonical_name'] ==
-                    args['zuul']['project']['canonical_name'] and
-                    (not args['zuul'].get('branch')) and
-                    args['zuul'].get('ref')):
-                ref = args['zuul']['ref']
-            else:
-                ref = None
-            self.checkoutBranch(repo,
-                                project['name'],
-                                ref,
-                                args['branch'],
-                                args['override_branch'],
-                                project['override_branch'],
-                                project['default_branch'])
-
-        # Delete the origin remote from each repo we set up since
-        # it will not be valid within the jobs.
-        for repo in repos.values():
-            repo.deleteRemote('origin')
 
+    def prepareAnsibleFiles(self, args):
+        all_vars = args['vars'].copy()
+        # TODO(mordred) Hack to work around running things with python3
+        all_vars['ansible_python_interpreter'] = '/usr/bin/python2'
+        if 'zuul' in all_vars:
+            # We block this in configloader, but block it here too to make
+            # sure that a job doesn't pass variables named zuul.
+            raise Exception("Defining vars named 'zuul' is not allowed")
+        all_vars['zuul'] = args['zuul'].copy()
+        all_vars['zuul']['executor'] = dict(
+            hostname=self.executor_server.hostname,
+            src_root=self.jobdir.src_root,
+            log_root=self.jobdir.log_root,
+            work_root=self.jobdir.work_root,
+            result_data_file=self.jobdir.result_data_file)
 
-        # This prepares each playbook and the roles needed for each.
-        self.preparePlaybooks(args)
 
+        nodes = self.getHostList(args)
+        inventory = make_inventory_dict(nodes, args['groups'], all_vars)
 
-        self.prepareAnsibleFiles(args)
-        self.writeLoggingConfig()
 
+        with open(self.jobdir.inventory, 'w') as inventory_yaml:
+            inventory_yaml.write(
+                yaml.safe_dump(inventory, default_flow_style=False))
 
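prepareAnsibleFiles above dumps the structure built by make_inventory_dict as YAML. That helper's body is not shown in this hunk, so the exact shape is an assumption, but a YAML-format Ansible inventory generally looks like the following (values illustrative):

import yaml

inventory = {
    'all': {
        'hosts': {
            'node1': {
                'ansible_host': '203.0.113.7',   # interface_ip from nodepool
                'ansible_user': 'zuul',
                'ansible_port': 22,
            },
        },
        'vars': {'zuul': {'ref': 'refs/heads/master'}},
    },
}
print(yaml.safe_dump(inventory, default_flow_style=False))
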
-        data = {
-            # TODO(mordred) worker_name is needed as a unique name for the
-            # client to use for cancelling jobs on an executor. It's defaulting
-            # to the hostname for now, but in the future we should allow
-            # setting a per-executor override so that one can run more than
-            # one executor on a host.
-            'worker_name': self.executor_server.hostname,
-            'worker_hostname': self.executor_server.hostname,
-            'worker_log_port': self.executor_server.log_streaming_port
-        }
-        if self.executor_server.log_streaming_port != DEFAULT_FINGER_PORT:
-            data['url'] = "finger://{hostname}:{port}/{uuid}".format(
-                hostname=data['worker_hostname'],
-                port=data['worker_log_port'],
-                uuid=self.job.unique)
-        else:
-            data['url'] = 'finger://{hostname}/{uuid}'.format(
-                hostname=data['worker_hostname'],
-                uuid=self.job.unique)
 
+        with open(self.jobdir.known_hosts, 'w') as known_hosts:
+            for node in nodes:
+                for key in node['host_keys']:
+                    known_hosts.write('%s\n' % key)
 
-        self.job.sendWorkData(json.dumps(data))
-        self.job.sendWorkStatus(0, 100)
 
+    def writeLoggingConfig(self):
+        self.log.debug("Writing logging config for job %s %s",
+                       self.jobdir.job_output_file,
+                       self.jobdir.logging_json)
+        logging_config = zuul.ansible.logconfig.JobLoggingConfig(
+            job_output_file=self.jobdir.job_output_file)
+        logging_config.writeJson(self.jobdir.logging_json)
 
-        result = self.runPlaybooks(args)
-        if self.aborted_reason == self.RESULT_DISK_FULL:
-            result = 'DISK_FULL'
-        data = self.getResultData()
-        result_data = json.dumps(dict(result=result,
-                                      data=data))
-        self.log.debug("Sending result: %s" % (result_data,))
-        self.job.sendWorkComplete(result_data)
 
+    def writeAnsibleConfig(self, jobdir_playbook):
+        trusted = jobdir_playbook.trusted
 
-    def getResultData(self):
-        data = {}
-        try:
-            with open(self.jobdir.result_data_file) as f:
-                file_data = f.read()
-            if file_data:
-                data = json.loads(file_data)
-        except Exception:
-            self.log.exception("Unable to load result data:")
-        return data
 
+        # TODO(mordred) This should likely be extracted into a more generalized
+        #               mechanism for deployers being able to add callback
+        #               plugins.
+        if ara_callbacks:
+            callback_path = '%s:%s' % (
+                self.executor_server.callback_dir,
+                os.path.dirname(ara_callbacks.__file__))
+        else:
+            callback_path = self.executor_server.callback_dir
+        with open(jobdir_playbook.ansible_config, 'w') as config:
+            config.write('[defaults]\n')
+            config.write('hostfile = %s\n' % self.jobdir.inventory)
+            config.write('local_tmp = %s/local_tmp\n' %
+                         self.jobdir.ansible_cache_root)
+            config.write('retry_files_enabled = False\n')
+            config.write('gathering = smart\n')
+            config.write('fact_caching = jsonfile\n')
+            config.write('fact_caching_connection = %s\n' %
+                         self.jobdir.fact_cache)
+            config.write('library = %s\n'
+                         % self.executor_server.library_dir)
+            config.write('command_warnings = False\n')
+            config.write('callback_plugins = %s\n' % callback_path)
+            config.write('stdout_callback = zuul_stream\n')
+            config.write('filter_plugins = %s\n'
+                         % self.executor_server.filter_dir)
+            # bump the timeout because busy nodes may take more than
+            # 10s to respond
+            config.write('timeout = 30\n')
+            if not trusted:
+                config.write('action_plugins = %s\n'
+                             % self.executor_server.action_dir)
+                config.write('lookup_plugins = %s\n'
+                             % self.executor_server.lookup_dir)
 
-    def doMergeChanges(self, merger, items, repo_state):
-        ret = merger.mergeChanges(items, repo_state=repo_state)
-        if not ret:  # merge conflict
-            result = dict(result='MERGER_FAILURE')
-            self.job.sendWorkComplete(json.dumps(result))
-            return False
-        recent = ret[3]
-        for key, commit in recent.items():
-            (connection, project, branch) = key
-            repo = merger.getRepo(connection, project)
-            repo.setRef('refs/heads/' + branch, commit)
-        return True
 
+            if jobdir_playbook.roles_path:
+                config.write('roles_path = %s\n' % ':'.join(
+                    jobdir_playbook.roles_path))
 
-    def checkoutBranch(self, repo, project_name, ref, zuul_branch,
-                       job_branch, project_override_branch,
-                       project_default_branch):
-        branches = repo.getBranches()
-        if project_override_branch in branches:
-            self.log.info("Checking out %s project override branch %s",
-                          project_name, project_override_branch)
-            repo.checkoutLocalBranch(project_override_branch)
-        elif job_branch in branches:
-            self.log.info("Checking out %s job branch %s",
-                          project_name, job_branch)
-            repo.checkoutLocalBranch(job_branch)
-        elif ref and ref.startswith('refs/heads/'):
-            b = ref[len('refs/heads/'):]
-            self.log.info("Checking out %s branch ref %s",
-                          project_name, b)
-            repo.checkoutLocalBranch(b)
-        elif ref and ref.startswith('refs/tags/'):
-            t = ref[len('refs/tags/'):]
-            self.log.info("Checking out %s tag ref %s",
-                          project_name, t)
-            repo.checkout(t)
-        elif zuul_branch and zuul_branch in branches:
-            self.log.info("Checking out %s zuul branch %s",
-                          project_name, zuul_branch)
-            repo.checkoutLocalBranch(zuul_branch)
-        elif project_default_branch in branches:
-            self.log.info("Checking out %s project default branch %s",
-                          project_name, project_default_branch)
-            repo.checkoutLocalBranch(project_default_branch)
-        else:
-            raise ExecutorError("Project %s does not have the "
-                                "default branch %s" %
-                                (project_name, project_default_branch))
 
+            # On playbooks with secrets we want to prevent the
+            # printing of args since they may be passed to a task or a
+            # role.  Otherwise, printing the args could be useful for
+            # debugging.
+            config.write('display_args_to_stdout = %s\n' %
+                         str(not jobdir_playbook.secrets_content))
 
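Concatenated, the writes above produce a per-playbook ansible.cfg roughly like the following; the paths are illustrative placeholders, with <jobdir> standing in for the build directory:

[defaults]
hostfile = <jobdir>/ansible/inventory.yaml
retry_files_enabled = False
gathering = smart
fact_caching = jsonfile
stdout_callback = zuul_stream
timeout = 30

[ssh_connection]
pipelining = True
ssh_args = -o ControlMaster=auto -o ControlPersist=60s -o UserKnownHostsFile=<jobdir>/ansible/known_hosts
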
-    def runPlaybooks(self, args):
-        result = None
 
+            # Increase the internal poll interval of ansible.
+            # The default interval of 0.001s is optimized for interactive
+            # ui at the expense of CPU load.  As we have a non-interactive
+            # automation use case a longer poll interval is more suitable
+            # and reduces CPU load of the ansible process.
+            config.write('internal_poll_interval = 0.01\n')
 
-        # Run the Ansible 'setup' module on all hosts in the inventory
-        # at the start of the job with a 60 second timeout.  If we
-        # aren't able to connect to all the hosts and gather facts
-        # within that timeout, there is likely a network problem
-        # between here and the hosts in the inventory; return them and
-        # reschedule the job.
-        setup_status, setup_code = self.runAnsibleSetup(
-            self.jobdir.setup_playbook)
-        if setup_status != self.RESULT_NORMAL or setup_code != 0:
-            return result
 
+            config.write('[ssh_connection]\n')
+            # NB: when setting pipelining = True, keep_remote_files
+            # must be False (the default).  Otherwise it apparently
+            # will override the pipelining option and effectively
+            # disable it.  Pipelining has a side effect of running the
+            # command without a tty (ie, without the -tt argument to
+            # ssh).  We require this behavior so that if a job runs a
+            # command which expects interactive input on a tty (such
+            # as sudo) it does not hang.
+            config.write('pipelining = True\n')
+            ssh_args = "-o ControlMaster=auto -o ControlPersist=60s " \
+                       "-o UserKnownHostsFile=%s" % self.jobdir.known_hosts
+            config.write('ssh_args = %s\n' % ssh_args)
 
-        pre_failed = False
-        success = False
-        for index, playbook in enumerate(self.jobdir.pre_playbooks):
-            # TODOv3(pabelanger): Implement pre-run timeout setting.
-            pre_status, pre_code = self.runAnsiblePlaybook(
-                playbook, args['timeout'], phase='pre', index=index)
-            if pre_status != self.RESULT_NORMAL or pre_code != 0:
-                # These should really never fail, so return None and have
-                # zuul try again
-                pre_failed = True
-                break
 
+    def _ansibleTimeout(self, msg):
+        self.log.warning(msg)
+        self.abortRunningProc()
 
-        if not pre_failed:
-            job_status, job_code = self.runAnsiblePlaybook(
-                self.jobdir.playbook, args['timeout'], phase='run')
-            if job_status == self.RESULT_ABORTED:
-                return 'ABORTED'
-            elif job_status == self.RESULT_TIMED_OUT:
-                # Set the pre-failure flag so this doesn't get
-                # overridden by a post-failure.
-                pre_failed = True
-                result = 'TIMED_OUT'
-            elif job_status == self.RESULT_NORMAL:
-                success = (job_code == 0)
-                if success:
-                    result = 'SUCCESS'
-                else:
-                    result = 'FAILURE'
-            else:
-                # The result of the job is indeterminate.  Zuul will
-                # run it again.
-                return None
 
+    def abortRunningProc(self):
+        with self.proc_lock:
+            if not self.proc:
+                self.log.debug("Abort: no process is running")
+                return
+            self.log.debug("Abort: sending kill signal to job "
+                           "process group")
+            try:
+                pgid = os.getpgid(self.proc.pid)
+                os.killpg(pgid, signal.SIGKILL)
+            except Exception:
+                self.log.exception("Exception while killing ansible process:")
 
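abortRunningProc above kills the whole process group, which works because runAnsible (further down in this diff) starts Ansible with preexec_fn=os.setsid, making it the leader of a new session and group. The pattern in isolation:

import os
import signal
import subprocess

# Start the child in its own session so its whole tree can be killed.
proc = subprocess.Popen(['sleep', '600'], preexec_fn=os.setsid)
# ... later, e.g. from a watchdog or stop handler:
os.killpg(os.getpgid(proc.pid), signal.SIGKILL)   # kills children too
proc.wait()
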
-        for index, playbook in enumerate(self.jobdir.post_playbooks):
-            # TODOv3(pabelanger): Implement post-run timeout setting.
-            post_status, post_code = self.runAnsiblePlaybook(
-                playbook, args['timeout'], success, phase='post', index=index)
-            if post_status != self.RESULT_NORMAL or post_code != 0:
-                success = False
-                # If we encountered a pre-failure, that takes
-                # precedence over the post result.
-                if not pre_failed:
-                    result = 'POST_FAILURE'
-                if (index + 1) == len(self.jobdir.post_playbooks):
-                    self._logFinalPlaybookError()
 
+    def runAnsible(self, cmd, timeout, playbook):
+        config_file = playbook.ansible_config
+        env_copy = os.environ.copy()
+        env_copy.update(self.ssh_agent.env)
+        if ara_callbacks:
+            env_copy['ARA_LOG_CONFIG'] = self.jobdir.logging_json
+        env_copy['ZUUL_JOB_LOG_CONFIG'] = self.jobdir.logging_json
+        env_copy['ZUUL_JOBDIR'] = self.jobdir.root
+        pythonpath = env_copy.get('PYTHONPATH')
+        if pythonpath:
+            pythonpath = [pythonpath]
+        else:
+            pythonpath = []
+        pythonpath = [self.executor_server.ansible_dir] + pythonpath
+        env_copy['PYTHONPATH'] = os.path.pathsep.join(pythonpath)
 
-        return result
 
+        if playbook.trusted:
+            opt_prefix = 'trusted'
+        else:
+            opt_prefix = 'untrusted'
+        ro_paths = get_default(self.executor_server.config, 'executor',
+                               '%s_ro_paths' % opt_prefix)
+        rw_paths = get_default(self.executor_server.config, 'executor',
+                               '%s_rw_paths' % opt_prefix)
+        ro_paths = ro_paths.split(":") if ro_paths else []
+        rw_paths = rw_paths.split(":") if rw_paths else []
 
-    def _logFinalPlaybookError(self):
-        # Failures in the final post playbook can include failures
-        # uploading logs, which makes diagnosing issues difficult.
-        # Grab the output from the last playbook from the json
-        # file and log it.
-        json_output = self.jobdir.job_output_file.replace('txt', 'json')
-        self.log.debug("Final playbook failed")
-        if not os.path.exists(json_output):
-            self.log.debug("JSON logfile {logfile} is missing".format(
-                logfile=json_output))
-            return
 
+        ro_paths.append(self.executor_server.ansible_dir)
+        ro_paths.append(self.jobdir.ansible_root)
+        ro_paths.append(self.jobdir.trusted_root)
+        ro_paths.append(playbook.root)
 
-    def getHostList(self, args):
-        hosts = []
-        for node in args['nodes']:
-            # NOTE(mordred): This assumes that the nodepool launcher
-            # and the zuul executor both have similar network
-            # characteristics, as the launcher will do a test for ipv6
-            # viability and if so, and if the node has an ipv6
-            # address, it will be the interface_ip.  force-ipv4 can be
-            # set to True in the clouds.yaml for a cloud if this
-            # results in the wrong thing being in interface_ip
-            # TODO(jeblair): Move this notice to the docs.
-            ip = node.get('interface_ip')
-            port = node.get('ssh_port', 22)
-            host_vars = dict(
-                ansible_host=ip,
-                ansible_user=self.executor_server.default_username,
-                ansible_port=port,
-                nodepool=dict(
-                    label=node.get('label'),
-                    az=node.get('az'),
-                    cloud=node.get('cloud'),
-                    provider=node.get('provider'),
-                    region=node.get('region'),
-                    interface_ip=node.get('interface_ip'),
-                    public_ipv4=node.get('public_ipv4'),
-                    private_ipv4=node.get('private_ipv4'),
-                    public_ipv6=node.get('public_ipv6')))
 
+        rw_paths.append(self.jobdir.ansible_cache_root)
 
-            host_keys = []
-            for key in node.get('host_keys'):
-                if port != 22:
-                    host_keys.append("[%s]:%s %s" % (ip, port, key))
-                else:
-                    host_keys.append("%s %s" % (ip, key))
 
+        if self.executor_variables_file:
+            ro_paths.append(self.executor_variables_file)
 
-            hosts.append(dict(
-                name=node['name'],
-                host_vars=host_vars,
-                host_keys=host_keys))
-        return hosts
 
+        secrets = {}
+        if playbook.secrets_content:
+            secrets[playbook.secrets] = playbook.secrets_content
 
-    def _blockPluginDirs(self, path):
-        '''Prevent execution of playbooks or roles with plugins
 
+        context = self.executor_server.execution_wrapper.getExecutionContext(
+            ro_paths, rw_paths, secrets)
 
-        Plugins are loaded from roles and also if there is a plugin
-        dir adjacent to the playbook.  Throw an error if the path
-        contains a location that would cause a plugin to get loaded.
 
+        popen = context.getPopen(
+            work_dir=self.jobdir.work_root,
+            ssh_auth_sock=env_copy.get('SSH_AUTH_SOCK'))
 
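runAnsible above prepends the executor's Ansible plugin tree to PYTHONPATH in a copied environment, so the child process can "import zuul.ansible" without mutating the executor's own environment. The same idiom, with an illustrative path:

import os

env = os.environ.copy()
parts = [env['PYTHONPATH']] if env.get('PYTHONPATH') else []
parts = ['/var/lib/zuul/ansible'] + parts          # path illustrative
env['PYTHONPATH'] = os.path.pathsep.join(parts)    # portable separator
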
-        '''
-        for entry in os.listdir(path):
-            if os.path.isdir(entry) and entry.endswith('_plugins'):
-                raise ExecutorError(
-                    "Ansible plugin dir %s found adjacent to playbook %s in "
-                    "non-trusted repo." % (entry, path))
 
+        env_copy['ANSIBLE_CONFIG'] = config_file
+        # NOTE(pabelanger): Default HOME variable to jobdir.work_root, as it is
+        # possible we don't bind mount current zuul user home directory.
+        env_copy['HOME'] = self.jobdir.work_root
 
+        with self.proc_lock:
+            if self.aborted:
+                return (self.RESULT_ABORTED, None)
+            self.log.debug("Ansible command: ANSIBLE_CONFIG=%s %s",
+                           config_file, " ".join(shlex.quote(c) for c in cmd))
+            self.proc = popen(
+                cmd,
+                cwd=self.jobdir.work_root,
+                stdout=subprocess.PIPE,
+                stderr=subprocess.STDOUT,
+                preexec_fn=os.setsid,
+                env=env_copy,
+            )
 
-    def findPlaybook(self, path, required=False, trusted=False):
-        for ext in ['.yaml', '.yml']:
-            fn = path + ext
-            if os.path.exists(fn):
-                if not trusted:
-                    playbook_dir = os.path.dirname(os.path.abspath(fn))
-                    self._blockPluginDirs(playbook_dir)
-                return fn
-        if required:
-            raise ExecutorError("Unable to find playbook %s" % path)
-        return None
 
+        syntax_buffer = []
+        ret = None
+        if timeout:
+            watchdog = Watchdog(timeout, self._ansibleTimeout,
+                                ("Ansible timeout exceeded",))
+            watchdog.start()
+        try:
+            # Use manual idx instead of enumerate so that RESULT lines
+            # don't count towards BUFFER_LINES_FOR_SYNTAX
+            idx = 0
+            for line in iter(self.proc.stdout.readline, b''):
+                if line.startswith(b'RESULT'):
+                    # TODO(mordred) Process result commands if sent
+                    continue
+                else:
+                    idx += 1
+                if idx < BUFFER_LINES_FOR_SYNTAX:
+                    syntax_buffer.append(line)
+                line = line[:1024].rstrip()
+                self.log.debug("Ansible output: %s" % (line,))
+            self.log.debug("Ansible output terminated")
+            ret = self.proc.wait()
+            self.log.debug("Ansible exit code: %s" % (ret,))
+        finally:
+            if timeout:
+                watchdog.stop()
+                self.log.debug("Stopped watchdog")
+            self.log.debug("Stopped disk job killer")
 
-    def preparePlaybooks(self, args):
-        self.writeAnsibleConfig(self.jobdir.setup_playbook)
-
-        for playbook in args['pre_playbooks']:
-            jobdir_playbook = self.jobdir.addPrePlaybook()
-            self.preparePlaybook(jobdir_playbook, playbook,
-                                 args, required=True)
 
+        with self.proc_lock:
+            self.proc = None
 
-        for playbook in args['playbooks']:
-            jobdir_playbook = self.jobdir.addPlaybook()
-            self.preparePlaybook(jobdir_playbook, playbook,
-                                 args, required=False)
-            if jobdir_playbook.path is not None:
-                self.jobdir.playbook = jobdir_playbook
-                break
 
+        if timeout and watchdog.timed_out:
+            return (self.RESULT_TIMED_OUT, None)
+        if ret == 3:
+            # AnsibleHostUnreachable: We had a network issue connecting to
+            # our zuul-worker.
+            return (self.RESULT_UNREACHABLE, None)
+        elif ret == -9:
+            # Received abort request.
+            return (self.RESULT_ABORTED, None)
+        elif ret == 1:
+            if syntax_buffer[0].startswith(b'ERROR!'):
+                with open(self.jobdir.job_output_file, 'a') as job_output:
+                    for line in syntax_buffer:
+                        job_output.write("{now} | {line}\n".format(
+                            now=datetime.datetime.now(),
+                            line=line.decode('utf-8').rstrip()))
+        elif ret == 4:
+            # Ansible could not parse the yaml.
+            self.log.debug("Ansible parse error")
+            # TODO(mordred) If/when we rework use of logger in ansible-playbook
+            # we'll want to change how this works to use that as well.  For now,
+            # this is what we need to do.
+            # TODO(mordred) We probably want to put this into the json output
+            # as well.
+        if timeout and watchdog.timed_out:
+            return (self.RESULT_TIMED_OUT, None)
+        if ret == 3:
+            # AnsibleHostUnreachable: We had a network issue connecting to
+            # our zuul-worker.
+            return (self.RESULT_UNREACHABLE, None)
+        elif ret == -9:
+            # Received abort request.
+            return (self.RESULT_ABORTED, None)
+        elif ret == 1:
+            if syntax_buffer[0].startswith(b'ERROR!'):
+                with open(self.jobdir.job_output_file, 'a') as job_output:
+                    for line in syntax_buffer:
+                        job_output.write("{now} | {line}\n".format(
+                            now=datetime.datetime.now(),
+                            line=line.decode('utf-8').rstrip()))
+        elif ret == 4:
+            # Ansible could not parse the yaml.
+            self.log.debug("Ansible parse error")
+            # TODO(mordred) If/when we rework use of logger in ansible-playbook
+            # we'll want to change how this works to use that as well. For now,
+            # this is what we need to do.
+            # TODO(mordred) We probably want to put this into the json output
+            # as well.
+            with open(self.jobdir.job_output_file, 'a') as job_output:
+                job_output.write("{now} | ANSIBLE PARSE ERROR\n".format(
+                    now=datetime.datetime.now()))
+                for line in syntax_buffer:
+                    job_output.write("{now} | {line}\n".format(
+                        now=datetime.datetime.now(),
+                        line=line.decode('utf-8').rstrip()))
+        elif ret == 250:
+            # Unexpected error from ansible
+            with open(self.jobdir.job_output_file, 'a') as job_output:
+                job_output.write("{now} | UNEXPECTED ANSIBLE ERROR\n".format(
+                    now=datetime.datetime.now()))
+                found_marker = False
+                for line in syntax_buffer:
+                    if line.startswith(b'ERROR! Unexpected Exception'):
+                        found_marker = True
+                    if not found_marker:
+                        continue
+                    job_output.write("{now} | {line}\n".format(
+                        now=datetime.datetime.now(),
+                        line=line.decode('utf-8').rstrip()))

-        if self.jobdir.playbook is None:
-            raise ExecutorError("No valid playbook found")
+        return (self.RESULT_NORMAL, ret)

-        for playbook in args['post_playbooks']:
-            jobdir_playbook = self.jobdir.addPostPlaybook()
-            self.preparePlaybook(jobdir_playbook, playbook,
-                                 args, required=True)
+    def runAnsibleSetup(self, playbook):
+        if self.executor_server.verbose:
+            verbose = '-vvv'
+        else:
+            verbose = '-v'

-    def preparePlaybook(self, jobdir_playbook, playbook, args, required):
-        self.log.debug("Prepare playbook repo for %s" %
-                       (playbook['project'],))
-        # Check out the playbook repo if needed and set the path to
-        # the playbook that should be run.
-        source = self.executor_server.connections.getSource(
-            playbook['connection'])
-        project = source.getProject(playbook['project'])
-        jobdir_playbook.trusted = playbook['trusted']
-        jobdir_playbook.branch = playbook['branch']
-        jobdir_playbook.canonical_name_and_path = os.path.join(
-            project.canonical_name, playbook['path'])
-        path = None
-        if not playbook['trusted']:
-            # This is a project repo, so it is safe to use the already
-            # checked out version (from speculative merging) of the
-            # playbook
-            for i in args['items']:
-                if (i['connection'] == playbook['connection'] and
-                    i['project'] == playbook['project']):
-                    # We already have this repo prepared
-                    path = os.path.join(self.jobdir.src_root,
-                                        project.canonical_hostname,
-                                        project.name,
-                                        playbook['path'])
-                    break
-        if not path:
-            # The playbook repo is either a config repo, or it isn't in
-            # the stack of changes we are testing, so check out the branch
-            # tip into a dedicated space.
-            path = self.checkoutTrustedProject(project, playbook['branch'])
-            path = os.path.join(path, playbook['path'])
+        cmd = ['ansible', '*', verbose, '-m', 'setup',
+               '-a', 'gather_subset=!all']

-        jobdir_playbook.path = self.findPlaybook(
-            path,
-            required=required,
-            trusted=playbook['trusted'])
+        result, code = self.runAnsible(
+            cmd=cmd, timeout=60, playbook=playbook)
+        self.log.debug("Ansible complete, result %s code %s" % (
+            self.RESULT_MAP[result], code))
+        return result, code

-        # If this playbook doesn't exist, don't bother preparing
-        # roles.
-        if not jobdir_playbook.path:
-            return
+    def emitPlaybookBanner(self, playbook, step, phase, result=None):
+        # This is used to print a header and a footer, respectively at the
+        # beginning and the end of each playbook execution.
+        # We are doing it from the executor rather than from a callback because
+        # the parameters are not made available to the callback until it's too
+        # late.
+ phase = phase or '' + trusted = playbook.trusted + trusted = 'trusted' if trusted else 'untrusted' + branch = playbook.branch + playbook = playbook.canonical_name_and_path - for role in playbook['roles']: - self.prepareRole(jobdir_playbook, role, args) + if phase and phase != 'run': + phase = '{phase}-run'.format(phase=phase) + phase = phase.upper() - secrets = playbook['secrets'] - if secrets: - if 'zuul' in secrets: - # We block this in configloader, but block it here too to make - # sure that a job doesn't pass secrets named zuul. - raise Exception("Defining secrets named 'zuul' is not allowed") - jobdir_playbook.secrets_content = yaml.safe_dump( - secrets, default_flow_style=False) + if result is not None: + result = self.RESULT_MAP[result] + msg = "{phase} {step} {result}: [{trusted} : {playbook}@{branch}]" + msg = msg.format(phase=phase, step=step, result=result, + trusted=trusted, playbook=playbook, branch=branch) + else: + msg = "{phase} {step}: [{trusted} : {playbook}@{branch}]" + msg = msg.format(phase=phase, step=step, trusted=trusted, + playbook=playbook, branch=branch) - self.writeAnsibleConfig(jobdir_playbook) + with open(self.jobdir.job_output_file, 'a') as job_output: + job_output.write("{now} | {msg}\n".format( + now=datetime.datetime.now(), + msg=msg)) - def checkoutTrustedProject(self, project, branch): - root = self.jobdir.getTrustedProject(project.canonical_name, - branch) - if not root: - root = self.jobdir.addTrustedProject(project.canonical_name, - branch) - merger = self.executor_server._getMerger(root, self.log) - merger.checkoutBranch(project.connection_name, project.name, - branch) + def runAnsiblePlaybook(self, playbook, timeout, success=None, + phase=None, index=None): + if self.executor_server.verbose: + verbose = '-vvv' + else: + verbose = '-v' + + cmd = ['ansible-playbook', verbose, playbook.path] + if playbook.secrets_content: + cmd.extend(['-e', '@' + playbook.secrets]) - path = os.path.join(root, - project.canonical_hostname, - project.name) - return path + if success is not None: + cmd.extend(['-e', 'zuul_success=%s' % str(bool(success))]) - def prepareRole(self, jobdir_playbook, role, args): - if role['type'] == 'zuul': - root = jobdir_playbook.addRole() - self.prepareZuulRole(jobdir_playbook, role, args, root) + if phase: + cmd.extend(['-e', 'zuul_execution_phase=%s' % phase]) - def findRole(self, path, trusted=False): - d = os.path.join(path, 'tasks') - if os.path.isdir(d): - # This is a bare role - if not trusted: - self._blockPluginDirs(path) - # None signifies that the repo is a bare role - return None - d = os.path.join(path, 'roles') - if os.path.isdir(d): - # This repo has a collection of roles - if not trusted: - self._blockPluginDirs(d) - for entry in os.listdir(d): - entry_path = os.path.join(d, entry) - if os.path.isdir(entry_path): - self._blockPluginDirs(entry_path) - return d - # It is neither a bare role, nor a collection of roles - raise RoleNotFoundError("Unable to find role in %s" % (path,)) + if index is not None: + cmd.extend(['-e', 'zuul_execution_phase_index=%s' % index]) - def prepareZuulRole(self, jobdir_playbook, role, args, root): - self.log.debug("Prepare zuul role for %s" % (role,)) - # Check out the role repo if needed - source = self.executor_server.connections.getSource( - role['connection']) - project = source.getProject(role['project']) - name = role['target_name'] - path = None + cmd.extend(['-e', 'zuul_execution_trusted=%s' % str(playbook.trusted)]) + cmd.extend([ + '-e', + 
'zuul_execution_canonical_name_and_path=%s'
+            % playbook.canonical_name_and_path])
+        cmd.extend(['-e', 'zuul_execution_branch=%s' % str(playbook.branch)])

-        if not jobdir_playbook.trusted:
-            # This playbook is untrusted. Use the already checked out
-            # version (from speculative merging) of the role if it
-            # exists.
+        if self.executor_variables_file is not None:
+            cmd.extend(['-e@%s' % self.executor_variables_file])

-            for i in args['items']:
-                if (i['connection'] == role['connection'] and
-                    i['project'] == role['project']):
-                    # We already have this repo prepared; use it.
-                    path = os.path.join(self.jobdir.src_root,
-                                        project.canonical_hostname,
-                                        project.name)
-                    break
+        self.emitPlaybookBanner(playbook, 'START', phase)

-        if not path:
-            # This is a trusted playbook or the role did not appear
-            # in the dependency chain for the change (in which case,
-            # there is no existing untrusted checkout of it). Check
-            # out the branch tip into a dedicated space.
-            path = self.checkoutTrustedProject(project, 'master')
+        result, code = self.runAnsible(
+            cmd=cmd, timeout=timeout, playbook=playbook)
+        self.log.debug("Ansible complete, result %s code %s" % (
+            self.RESULT_MAP[result], code))

-        # The name of the symlink is the requested name of the role
-        # (which may be the repo name or may be something else; this
-        # can come into play if this is a bare role).
-        link = os.path.join(root, name)
-        link = os.path.realpath(link)
-        if not link.startswith(os.path.realpath(root)):
-            raise ExecutorError("Invalid role name %s", name)
-        os.symlink(path, link)
+        self.emitPlaybookBanner(playbook, 'END', phase, result=result)
+        return result, code

-        try:
-            role_path = self.findRole(link, trusted=jobdir_playbook.trusted)
-        except RoleNotFoundError:
-            if role['implicit']:
-                self.log.info("Implicit role not found in %s", link)
-                return
-            raise
-        if role_path is None:
-            # In the case of a bare role, add the containing directory
-            role_path = root
-        self.log.debug("Adding role path %s", role_path)
-        jobdir_playbook.roles_path.append(role_path)
-
-    def prepareAnsibleFiles(self, args):
-        all_vars = args['vars'].copy()
-        # TODO(mordred) Hack to work around running things with python3
-        all_vars['ansible_python_interpreter'] = '/usr/bin/python2'
-        if 'zuul' in all_vars:
-            # We block this in configloader, but block it here too to make
-            # sure that a job doesn't pass variables named zuul.
-            raise Exception("Defining vars named 'zuul' is not allowed")
-        all_vars['zuul'] = args['zuul'].copy()
-        all_vars['zuul']['executor'] = dict(
-            hostname=self.executor_server.hostname,
-            src_root=self.jobdir.src_root,
-            log_root=self.jobdir.log_root,
-            work_root=self.jobdir.work_root,
-            result_data_file=self.jobdir.result_data_file)

+class ExecutorMergeWorker(gear.TextWorker):
+    def __init__(self, executor_server, *args, **kw):
+        self.zuul_executor_server = executor_server
+        super(ExecutorMergeWorker, self).__init__(*args, **kw)

-        nodes = self.getHostList(args)
-        inventory = make_inventory_dict(nodes, args['groups'], all_vars)

+    def handleNoop(self, packet):
+        # Wait until the update queue is empty before responding
+        while self.zuul_executor_server.update_queue.qsize():
+            time.sleep(1)

-        with open(self.jobdir.inventory, 'w') as inventory_yaml:
-            inventory_yaml.write(
-                yaml.safe_dump(inventory, default_flow_style=False))

+        with self.zuul_executor_server.merger_lock:
+            super(ExecutorMergeWorker, self).handleNoop(packet)

-        with open(self.jobdir.known_hosts, 'w') as known_hosts:
-            for node in nodes:
-                for key in node['host_keys']:
-                    known_hosts.write('%s\n' % key)
-
-    def writeLoggingConfig(self):
-        self.log.debug("Writing logging config for job %s %s",
-                       self.jobdir.job_output_file,
-                       self.jobdir.logging_json)
-        logging_config = zuul.ansible.logconfig.JobLoggingConfig(
-            job_output_file=self.jobdir.job_output_file)
-        logging_config.writeJson(self.jobdir.logging_json)

+class ExecutorExecuteWorker(gear.TextWorker):
+    def __init__(self, executor_server, *args, **kw):
+        self.zuul_executor_server = executor_server
+        super(ExecutorExecuteWorker, self).__init__(*args, **kw)

-    def writeAnsibleConfig(self, jobdir_playbook):
-        trusted = jobdir_playbook.trusted

+    def handleNoop(self, packet):
+        # Delay our response to running a new job based on the number
+        # of jobs we're currently running, in an attempt to spread
+        # load evenly among executors.
+        workers = len(self.zuul_executor_server.job_workers)
+        delay = (workers ** 2) / 1000.0
+        time.sleep(delay)
+        return super(ExecutorExecuteWorker, self).handleNoop(packet)
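[editor's note] The quadratic delay in ExecutorExecuteWorker.handleNoop above is easiest to see with a few concrete values; this little loop is only an illustration of the formula, not part of Zuul:

# delay = workers**2 / 1000: a busy executor answers gearman NOOP packets
# later, so idle executors tend to win the race for the next job.
for workers in (0, 1, 10, 30, 50):
    delay = (workers ** 2) / 1000.0
    print('%2d running builds -> %.3fs delay' % (workers, delay))
# 0 -> 0.000s, 1 -> 0.001s, 10 -> 0.100s, 30 -> 0.900s, 50 -> 2.500s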
-        # TODO(mordred) This should likely be extracted into a more generalized
-        # mechanism for deployers being able to add callback
-        # plugins.
-        if ara_callbacks:
-            callback_path = '%s:%s' % (
-                self.executor_server.callback_dir,
-                os.path.dirname(ara_callbacks.__file__))
-        else:
-            callback_path = self.executor_server.callback_dir
-        with open(jobdir_playbook.ansible_config, 'w') as config:
-            config.write('[defaults]\n')
-            config.write('hostfile = %s\n' % self.jobdir.inventory)
-            config.write('local_tmp = %s/local_tmp\n' %
-                         self.jobdir.ansible_cache_root)
-            config.write('retry_files_enabled = False\n')
-            config.write('gathering = smart\n')
-            config.write('fact_caching = jsonfile\n')
-            config.write('fact_caching_connection = %s\n' %
-                         self.jobdir.fact_cache)
-            config.write('library = %s\n'
-                         % self.executor_server.library_dir)
-            config.write('command_warnings = False\n')
-            config.write('callback_plugins = %s\n' % callback_path)
-            config.write('stdout_callback = zuul_stream\n')
-            config.write('filter_plugins = %s\n'
-                         % self.executor_server.filter_dir)
-            # bump the timeout because busy nodes may take more than
-            # 10s to respond
-            config.write('timeout = 30\n')
-            if not trusted:
-                config.write('action_plugins = %s\n'
-                             % self.executor_server.action_dir)
-                config.write('lookup_plugins = %s\n'
-                             % self.executor_server.lookup_dir)
-            if jobdir_playbook.roles_path:
-                config.write('roles_path = %s\n' % ':'.join(
-                    jobdir_playbook.roles_path))

+class ExecutorServer(object):
+    log = logging.getLogger("zuul.ExecutorServer")

-            # On playbooks with secrets we want to prevent the
-            # printing of args since they may be passed to a task or a
-            # role. Otherwise, printing the args could be useful for
-            # debugging.
-            config.write('display_args_to_stdout = %s\n' %
-                         str(not jobdir_playbook.secrets_content))

+    def __init__(self, config, connections={}, jobdir_root=None,
+                 keep_jobdir=False, log_streaming_port=DEFAULT_FINGER_PORT):
+        self.config = config
+        self.keep_jobdir = keep_jobdir
+        self.jobdir_root = jobdir_root
+        # TODOv3(mordred): make the executor name more unique --
+        # perhaps hostname+pid.
+        self.hostname = socket.gethostname()
+        self.log_streaming_port = log_streaming_port
+        self.merger_lock = threading.Lock()
+        self.verbose = False
+        self.command_map = dict(
+            stop=self.stop,
+            pause=self.pause,
+            unpause=self.unpause,
+            graceful=self.graceful,
+            verbose=self.verboseOn,
+            unverbose=self.verboseOff,
+            keep=self.keep,
+            nokeep=self.nokeep,
+        )

-            # Increase the internal poll interval of ansible.
-            # The default interval of 0.001s is optimized for interactive
-            # ui at the expense of CPU load. As we have a non-interactive
-            # automation use case a longer poll interval is more suitable
-            # and reduces CPU load of the ansible process.
-            config.write('internal_poll_interval = 0.01\n')

+        self.merge_root = get_default(self.config, 'executor', 'git_dir',
+                                      '/var/lib/zuul/executor-git')
+        self.default_username = get_default(self.config, 'executor',
+                                            'default_username', 'zuul')
+        self.disk_limit_per_job = int(get_default(self.config, 'executor',
+                                                  'disk_limit_per_job', 250))
+        self.merge_email = get_default(self.config, 'merger', 'git_user_email')
+        self.merge_name = get_default(self.config, 'merger', 'git_user_name')
+        self.merge_speed_limit = get_default(
+            config, 'merger', 'git_http_low_speed_limit', '1000')
+        self.merge_speed_time = get_default(
+            config, 'merger', 'git_http_low_speed_time', '30')
+        execution_wrapper_name = get_default(self.config, 'executor',
+                                             'execution_wrapper', 'bubblewrap')
+        load_multiplier = float(get_default(self.config, 'executor',
+                                            'load_multiplier', '2.5'))
+        self.max_load_avg = multiprocessing.cpu_count() * load_multiplier
+        self.accepting_work = False
+        self.execution_wrapper = connections.drivers[execution_wrapper_name]

-            config.write('[ssh_connection]\n')
-            # NB: when setting pipelining = True, keep_remote_files
-            # must be False (the default). Otherwise it apparently
-            # will override the pipelining option and effectively
-            # disable it. Pipelining has a side effect of running the
-            # command without a tty (ie, without the -tt argument to
-            # ssh). We require this behavior so that if a job runs a
-            # command which expects interactive input on a tty (such
-            # as sudo) it does not hang.
-            config.write('pipelining = True\n')
-            ssh_args = "-o ControlMaster=auto -o ControlPersist=60s " \
-                       "-o UserKnownHostsFile=%s" % self.jobdir.known_hosts
-            config.write('ssh_args = %s\n' % ssh_args)
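[editor's note] For reference, the [ssh_connection] section written by the removed code above comes out roughly as below; the known_hosts path is illustrative, and this sketch simply re-derives the emitted text:

# Sketch of the emitted section for a hypothetical jobdir; pipelining runs
# remote commands without a tty, and ControlPersist reuses SSH connections.
known_hosts = '/var/lib/zuul/builds/UUID/known_hosts'  # illustrative path
ssh_args = ('-o ControlMaster=auto -o ControlPersist=60s '
            '-o UserKnownHostsFile=%s' % known_hosts)
print('[ssh_connection]\npipelining = True\nssh_args = %s' % ssh_args)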
+        self.connections = connections
+        # This merger and its git repos are used to maintain
+        # up-to-date copies of all the repos that are used by jobs, as
+        # well as to support the merger:cat function to supply
+        # configuration information to Zuul when it starts.
+        self.merger = self._getMerger(self.merge_root)
+        self.update_queue = DeduplicateQueue()

-    def _ansibleTimeout(self, msg):
-        self.log.warning(msg)
-        self.abortRunningProc()

+        state_dir = get_default(self.config, 'executor', 'state_dir',
+                                '/var/lib/zuul', expand_user=True)
+        path = os.path.join(state_dir, 'executor.socket')
+        self.command_socket = commandsocket.CommandSocket(path)
+        ansible_dir = os.path.join(state_dir, 'ansible')
+        self.ansible_dir = ansible_dir
+        if os.path.exists(ansible_dir):
+            shutil.rmtree(ansible_dir)

-    def abortRunningProc(self):
-        with self.proc_lock:
-            if not self.proc:
-                self.log.debug("Abort: no process is running")
-                return
-            self.log.debug("Abort: sending kill signal to job "
-                           "process group")
-            try:
-                pgid = os.getpgid(self.proc.pid)
-                os.killpg(pgid, signal.SIGKILL)
-            except Exception:
-                self.log.exception("Exception while killing ansible process:")

+        zuul_dir = os.path.join(ansible_dir, 'zuul')
+        plugin_dir = os.path.join(zuul_dir, 'ansible')

-    def runAnsible(self, cmd, timeout, playbook):
-        config_file = playbook.ansible_config
-        env_copy = os.environ.copy()
-        env_copy.update(self.ssh_agent.env)
-        if ara_callbacks:
-            env_copy['ARA_LOG_CONFIG'] = self.jobdir.logging_json
-        env_copy['ZUUL_JOB_LOG_CONFIG'] = self.jobdir.logging_json
-        env_copy['ZUUL_JOBDIR'] = self.jobdir.root
-        pythonpath = env_copy.get('PYTHONPATH')
-        if pythonpath:
-            pythonpath = [pythonpath]
-        else:
-            pythonpath = []
-        pythonpath = [self.executor_server.ansible_dir] + pythonpath
-        env_copy['PYTHONPATH'] = os.path.pathsep.join(pythonpath)

+        os.makedirs(plugin_dir, mode=0o0755)

-        if playbook.trusted:
-            opt_prefix = 'trusted'
-        else:
-            opt_prefix = 'untrusted'
-        ro_paths = get_default(self.executor_server.config, 'executor',
-                               '%s_ro_paths' % opt_prefix)
-        rw_paths = get_default(self.executor_server.config, 'executor',
-                               '%s_rw_paths' % opt_prefix)
-        ro_paths = ro_paths.split(":") if ro_paths else []
-        rw_paths = rw_paths.split(":") if rw_paths else []

+        self.library_dir = os.path.join(plugin_dir, 'library')
+        self.action_dir = os.path.join(plugin_dir, 'action')
+        self.callback_dir = os.path.join(plugin_dir, 'callback')
+        self.lookup_dir = os.path.join(plugin_dir, 'lookup')
+        self.filter_dir = os.path.join(plugin_dir, 'filter')

-        ro_paths.append(self.executor_server.ansible_dir)
-        ro_paths.append(self.jobdir.ansible_root)
-        ro_paths.append(self.jobdir.trusted_root)
-        ro_paths.append(playbook.root)

+        _copy_ansible_files(zuul.ansible, plugin_dir)

-        rw_paths.append(self.jobdir.ansible_cache_root)

+        # We're copying zuul.ansible.* into a directory we are going
+        # to add to pythonpath, so our plugins can "import
+        # zuul.ansible". But we're not installing all of zuul, so
+        # create a __init__.py file for the stub "zuul" module.
+ with open(os.path.join(zuul_dir, '__init__.py'), 'w'): + pass - if self.executor_variables_file: - ro_paths.append(self.executor_variables_file) + self.job_workers = {} + self.disk_accountant = DiskAccountant(self.jobdir_root, + self.disk_limit_per_job, + self.stopJobDiskFull, + self.merge_root) - secrets = {} - if playbook.secrets_content: - secrets[playbook.secrets] = playbook.secrets_content + def _getMerger(self, root, logger=None): + if root != self.merge_root: + cache_root = self.merge_root + else: + cache_root = None + return zuul.merger.merger.Merger( + root, self.connections, self.merge_email, self.merge_name, + self.merge_speed_limit, self.merge_speed_time, cache_root, logger) - context = self.executor_server.execution_wrapper.getExecutionContext( - ro_paths, rw_paths, secrets) + def start(self): + self._running = True + self._command_running = True + server = self.config.get('gearman', 'server') + port = get_default(self.config, 'gearman', 'port', 4730) + ssl_key = get_default(self.config, 'gearman', 'ssl_key') + ssl_cert = get_default(self.config, 'gearman', 'ssl_cert') + ssl_ca = get_default(self.config, 'gearman', 'ssl_ca') + self.merger_worker = ExecutorMergeWorker(self, 'Zuul Executor Merger') + self.merger_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca) + self.executor_worker = ExecutorExecuteWorker( + self, 'Zuul Executor Server') + self.executor_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca) + self.log.debug("Waiting for server") + self.merger_worker.waitForServer() + self.executor_worker.waitForServer() + self.log.debug("Registering") + self.register() - popen = context.getPopen( - work_dir=self.jobdir.work_root, - ssh_auth_sock=env_copy.get('SSH_AUTH_SOCK')) + self.log.debug("Starting command processor") + self.command_socket.start() + self.command_thread = threading.Thread(target=self.runCommand) + self.command_thread.daemon = True + self.command_thread.start() - env_copy['ANSIBLE_CONFIG'] = config_file - # NOTE(pabelanger): Default HOME variable to jobdir.work_root, as it is - # possible we don't bind mount current zuul user home directory. 
- env_copy['HOME'] = self.jobdir.work_root + self.log.debug("Starting worker") + self.update_thread = threading.Thread(target=self._updateLoop) + self.update_thread.daemon = True + self.update_thread.start() + self.merger_thread = threading.Thread(target=self.run_merger) + self.merger_thread.daemon = True + self.merger_thread.start() + self.executor_thread = threading.Thread(target=self.run_executor) + self.executor_thread.daemon = True + self.executor_thread.start() + self.governor_stop_event = threading.Event() + self.governor_thread = threading.Thread(target=self.run_governor) + self.governor_thread.daemon = True + self.governor_thread.start() + self.disk_accountant.start() - with self.proc_lock: - if self.aborted: - return (self.RESULT_ABORTED, None) - self.log.debug("Ansible command: ANSIBLE_CONFIG=%s %s", - config_file, " ".join(shlex.quote(c) for c in cmd)) - self.proc = popen( - cmd, - cwd=self.jobdir.work_root, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - preexec_fn=os.setsid, - env=env_copy, - ) + def register(self): + self.register_work() + self.executor_worker.registerFunction("executor:stop:%s" % + self.hostname) + self.merger_worker.registerFunction("merger:merge") + self.merger_worker.registerFunction("merger:cat") + self.merger_worker.registerFunction("merger:refstate") - syntax_buffer = [] - ret = None - if timeout: - watchdog = Watchdog(timeout, self._ansibleTimeout, - ("Ansible timeout exceeded",)) - watchdog.start() - try: - # Use manual idx instead of enumerate so that RESULT lines - # don't count towards BUFFER_LINES_FOR_SYNTAX - idx = 0 - for line in iter(self.proc.stdout.readline, b''): - if line.startswith(b'RESULT'): - # TODO(mordred) Process result commands if sent - continue - else: - idx += 1 - if idx < BUFFER_LINES_FOR_SYNTAX: - syntax_buffer.append(line) - line = line[:1024].rstrip() - self.log.debug("Ansible output: %s" % (line,)) - self.log.debug("Ansible output terminated") - ret = self.proc.wait() - self.log.debug("Ansible exit code: %s" % (ret,)) - finally: - if timeout: - watchdog.stop() - self.log.debug("Stopped watchdog") - self.log.debug("Stopped disk job killer") + def register_work(self): + self.accepting_work = True + self.executor_worker.registerFunction("executor:execute") - with self.proc_lock: - self.proc = None + def unregister_work(self): + self.accepting_work = False + self.executor_worker.unregisterFunction("executor:execute") - if timeout and watchdog.timed_out: - return (self.RESULT_TIMED_OUT, None) - if ret == 3: - # AnsibleHostUnreachable: We had a network issue connecting to - # our zuul-worker. - return (self.RESULT_UNREACHABLE, None) - elif ret == -9: - # Received abort request. - return (self.RESULT_ABORTED, None) - elif ret == 1: - if syntax_buffer[0].startswith('ERROR!'): - with open(self.jobdir.job_output_file, 'a') as job_output: - for line in syntax_buffer: - job_output.write("{now} | {line}\n".format( - now=datetime.datetime.now(), - line=line.decode('utf-8').rstrip())) - elif ret == 4: - # Ansible could not parse the yaml. - self.log.debug("Ansible parse error") - # TODO(mordred) If/when we rework use of logger in ansible-playbook - # we'll want to change how this works to use that as well. For now, - # this is what we need to do. - # TODO(mordred) We probably want to put this into the json output - # as well. 
- with open(self.jobdir.job_output_file, 'a') as job_output: - job_output.write("{now} | ANSIBLE PARSE ERROR\n".format( - now=datetime.datetime.now())) - for line in syntax_buffer: - job_output.write("{now} | {line}\n".format( - now=datetime.datetime.now(), - line=line.decode('utf-8').rstrip())) - elif ret == 250: - # Unexpected error from ansible - with open(self.jobdir.job_output_file, 'a') as job_output: - job_output.write("{now} | UNEXPECTED ANSIBLE ERROR\n".format( - now=datetime.datetime.now())) - found_marker = False - for line in syntax_buffer: - if line.startswith('ERROR! Unexpected Exception'): - found_marker = True - if not found_marker: - continue - job_output.write("{now} | {line}\n".format( - now=datetime.datetime.now(), - line=line.decode('utf-8').rstrip())) + def stop(self): + self.log.debug("Stopping") + self.disk_accountant.stop() + self.governor_stop_event.set() + self._running = False + self._command_running = False + self.command_socket.stop() + self.update_queue.put(None) + + for job_worker in list(self.job_workers.values()): + try: + job_worker.stop() + except Exception: + self.log.exception("Exception sending stop command " + "to worker:") + self.merger_worker.shutdown() + self.executor_worker.shutdown() + self.log.debug("Stopped") - return (self.RESULT_NORMAL, ret) + def pause(self): + # TODOv3: implement + pass - def runAnsibleSetup(self, playbook): - if self.executor_server.verbose: - verbose = '-vvv' - else: - verbose = '-v' + def unpause(self): + # TODOv3: implement + pass - cmd = ['ansible', '*', verbose, '-m', 'setup', - '-a', 'gather_subset=!all'] + def graceful(self): + # TODOv3: implement + pass - result, code = self.runAnsible( - cmd=cmd, timeout=60, playbook=playbook) - self.log.debug("Ansible complete, result %s code %s" % ( - self.RESULT_MAP[result], code)) - return result, code + def verboseOn(self): + self.verbose = True - def emitPlaybookBanner(self, playbook, step, phase, result=None): - # This is used to print a header and a footer, respectively at the - # beginning and the end of each playbook execution. - # We are doing it from the executor rather than from a callback because - # the parameters are not made available to the callback until it's too - # late. 
-        phase = phase or ''
-        trusted = playbook.trusted
-        trusted = 'trusted' if trusted else 'untrusted'
-        branch = playbook.branch
-        playbook = playbook.canonical_name_and_path

+    def verboseOff(self):
+        self.verbose = False

-        if phase and phase != 'run':
-            phase = '{phase}-run'.format(phase=phase)
-        phase = phase.upper()

+    def keep(self):
+        self.keep_jobdir = True

-        if result is not None:
-            result = self.RESULT_MAP[result]
-            msg = "{phase} {step} {result}: [{trusted} : {playbook}@{branch}]"
-            msg = msg.format(phase=phase, step=step, result=result,
-                             trusted=trusted, playbook=playbook, branch=branch)
-        else:
-            msg = "{phase} {step}: [{trusted} : {playbook}@{branch}]"
-            msg = msg.format(phase=phase, step=step, trusted=trusted,
-                             playbook=playbook, branch=branch)

+    def nokeep(self):
+        self.keep_jobdir = False

-        with open(self.jobdir.job_output_file, 'a') as job_output:
-            job_output.write("{now} | {msg}\n".format(
-                now=datetime.datetime.now(),
-                msg=msg))

+    def join(self):
+        self.update_thread.join()
+        self.merger_thread.join()
+        self.executor_thread.join()
+        self.governor_thread.join()

-    def runAnsiblePlaybook(self, playbook, timeout, success=None,
-                           phase=None, index=None):
-        if self.executor_server.verbose:
-            verbose = '-vvv'
-        else:
-            verbose = '-v'

+    def runCommand(self):
+        while self._command_running:
+            try:
+                command = self.command_socket.get().decode('utf8')
+                if command != '_stop':
+                    self.command_map[command]()
+            except Exception:
+                self.log.exception("Exception while processing command")

-        cmd = ['ansible-playbook', verbose, playbook.path]
-        if playbook.secrets_content:
-            cmd.extend(['-e', '@' + playbook.secrets])

+    def _updateLoop(self):
+        while self._running:
+            try:
+                self._innerUpdateLoop()
+            except Exception:
+                self.log.exception("Exception in update thread:")

-        if success is not None:
-            cmd.extend(['-e', 'zuul_success=%s' % str(bool(success))])

+    def _innerUpdateLoop(self):
+        # Inside of a loop that keeps the main repositories up to date
+        task = self.update_queue.get()
+        if task is None:
+            # We are asked to stop
+            return
+        with self.merger_lock:
+            self.log.info("Updating repo %s/%s" % (
+                task.connection_name, task.project_name))
+            self.merger.updateRepo(task.connection_name, task.project_name)
+            self.log.debug("Finished updating repo %s/%s" %
+                           (task.connection_name, task.project_name))
+        task.setComplete()

-        if phase:
-            cmd.extend(['-e', 'zuul_execution_phase=%s' % phase])

+    def update(self, connection_name, project_name):
+        # Update a repository in the main merger
+        task = UpdateTask(connection_name, project_name)
+        task = self.update_queue.put(task)
+        return task

-        if index is not None:
-            cmd.extend(['-e', 'zuul_execution_phase_index=%s' % index])

+    def run_merger(self):
+        self.log.debug("Starting merger listener")
+        while self._running:
+            try:
+                job = self.merger_worker.getJob()
+                try:
+                    if job.name == 'merger:cat':
+                        self.log.debug("Got cat job: %s" % job.unique)
+                        self.cat(job)
+                    elif job.name == 'merger:merge':
+                        self.log.debug("Got merge job: %s" % job.unique)
+                        self.merge(job)
+                    elif job.name == 'merger:refstate':
+                        self.log.debug("Got refstate job: %s" % job.unique)
+                        self.refstate(job)
+                    else:
+                        self.log.error("Unable to handle job %s" % job.name)
+                        job.sendWorkFail()
+                except Exception:
+                    self.log.exception("Exception while running job")
+                    job.sendWorkException(
+                        traceback.format_exc().encode('utf8'))
+            except gear.InterruptedError:
+                pass
+            except Exception:
+                self.log.exception("Exception while getting job")
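[editor's note] run_merger above is a plain gearman worker loop, so any gearman client can exercise it. A rough sketch of submitting a merger:cat request; the server address, connection, and project values are illustrative, and the argument keys mirror the cat() handler below:

# Hypothetical client for the merger:cat function handled by this worker.
import json
import time

import gear

client = gear.Client()
client.addServer('127.0.0.1', 4730)  # gearman server; address illustrative
client.waitForServer()
args = dict(connection='gerrit', project='org/project',
            branch='master', files=['.zuul.yaml'], dirs=[])
job = gear.Job('merger:cat', json.dumps(args).encode('utf8'))
client.submitJob(job)
while not job.complete:
    time.sleep(0.1)
print(json.loads(job.data[0]))  # dict with 'updated' and 'files'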
-        cmd.extend(['-e', 'zuul_execution_trusted=%s' % str(playbook.trusted)])
-        cmd.extend([
-            '-e',
-            'zuul_execution_canonical_name_and_path=%s'
-            % playbook.canonical_name_and_path])
-        cmd.extend(['-e', 'zuul_execution_branch=%s' % str(playbook.branch)])

+    def run_executor(self):
+        self.log.debug("Starting executor listener")
+        while self._running:
+            try:
+                job = self.executor_worker.getJob()
+                try:
+                    if job.name == 'executor:execute':
+                        self.log.debug("Got execute job: %s" % job.unique)
+                        self.executeJob(job)
+                    elif job.name.startswith('executor:stop'):
+                        self.log.debug("Got stop job: %s" % job.unique)
+                        self.stopJob(job)
+                    else:
+                        self.log.error("Unable to handle job %s" % job.name)
+                        job.sendWorkFail()
+                except Exception:
+                    self.log.exception("Exception while running job")
+                    job.sendWorkException(
+                        traceback.format_exc().encode('utf8'))
+            except gear.InterruptedError:
+                pass
+            except Exception:
+                self.log.exception("Exception while getting job")

-        if self.executor_variables_file is not None:
-            cmd.extend(['-e@%s' % self.executor_variables_file])

+    def run_governor(self):
+        while not self.governor_stop_event.wait(30):
+            self.manageLoad()

-        self.emitPlaybookBanner(playbook, 'START', phase)

+    def executeJob(self, job):
+        self.job_workers[job.unique] = AnsibleJob(self, job)
+        self.job_workers[job.unique].run()

-        result, code = self.runAnsible(
-            cmd=cmd, timeout=timeout, playbook=playbook)
-        self.log.debug("Ansible complete, result %s code %s" % (
-            self.RESULT_MAP[result], code))

+    def manageLoad(self):
+        ''' Apply some heuristics to decide whether or not we should
+            be asking for more jobs '''
+        load_avg = os.getloadavg()[0]
+        if self.accepting_work:
+            # Don't unregister if we don't have any active jobs.
+            if load_avg > self.max_load_avg and self.job_workers:
+                self.log.info(
+                    "Unregistering due to high system load {} > {}".format(
+                        load_avg, self.max_load_avg))
+                self.unregister_work()
+        elif load_avg <= self.max_load_avg:
+            self.log.info(
+                "Re-registering as load is within limits {} <= {}".format(
+                    load_avg, self.max_load_avg))
+            self.register_work()

-        self.emitPlaybookBanner(playbook, 'END', phase, result=result)
-        return result, code

+    def finishJob(self, unique):
+        del(self.job_workers[unique])
+
+    def stopJobDiskFull(self, jobdir):
+        unique = os.path.basename(jobdir)
+        self.stopJobByUnique(unique, reason=AnsibleJob.RESULT_DISK_FULL)
+
+    def stopJob(self, job):
+        try:
+            args = json.loads(job.arguments)
+            self.log.debug("Stop job with arguments: %s" % (args,))
+            unique = args['uuid']
+            self.stopJobByUnique(unique)
+        finally:
+            job.sendWorkComplete()
+
+    def stopJobByUnique(self, unique, reason=None):
+        job_worker = self.job_workers.get(unique)
+        if not job_worker:
+            self.log.debug("Unable to find worker for job %s" % (unique,))
+            return
+        try:
+            job_worker.stop(reason)
+        except Exception:
+            self.log.exception("Exception sending stop command "
+                               "to worker:")
+
+    def cat(self, job):
+        args = json.loads(job.arguments)
+        task = self.update(args['connection'], args['project'])
+        task.wait()
+        with self.merger_lock:
+            files = self.merger.getFiles(args['connection'], args['project'],
+                                         args['branch'], args['files'],
+                                         args.get('dirs', []))
+        result = dict(updated=True,
+                      files=files)
+        job.sendWorkComplete(json.dumps(result))
+
+    def refstate(self, job):
+        args = json.loads(job.arguments)
+        with self.merger_lock:
+            success, repo_state = self.merger.getRepoState(args['items'])
+        result = dict(updated=success,
+                      repo_state=repo_state)
+        job.sendWorkComplete(json.dumps(result))
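[editor's note] The governor wakes every 30 seconds and compares the one-minute load average against max_load_avg from __init__ above. A quick worked example of the threshold; the CPU count is illustrative:

# With the default load_multiplier of 2.5, an 8-core executor stops
# accepting new work once load passes 20.0 (and it has running builds),
# then re-registers with gearman when load falls back under the cap.
import multiprocessing

load_multiplier = 2.5
max_load_avg = multiprocessing.cpu_count() * load_multiplier
print(max_load_avg)  # 20.0 when cpu_count() == 8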
+    def merge(self, job):
+        args = json.loads(job.arguments)
+        with self.merger_lock:
+            ret = self.merger.mergeChanges(args['items'], args.get('files'),
+                                           args.get('dirs', []),
+                                           args.get('repo_state'))
+        result = dict(merged=(ret is not None))
+        if ret is None:
+            result['commit'] = result['files'] = result['repo_state'] = None
+        else:
+            (result['commit'], result['files'], result['repo_state'],
+             recent) = ret
+        job.sendWorkComplete(json.dumps(result))
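[editor's note] For completeness, a client-side sketch of decoding the merger:merge reply built above; the payload literal is illustrative of the two shapes the handler can produce:

# The reply is JSON: merged=False comes with null fields, merged=True
# carries the merge commit plus the files and repo_state mappings.
import json

payload = b'{"merged": true, "commit": "abc123", "files": {}, "repo_state": {}}'
result = json.loads(payload)
if result['merged']:
    commit = result['commit']          # sha of the speculative merge tip
    repo_state = result['repo_state']  # refs for executors to restore
else:
    commit = repo_state = None         # scheduler reports MERGER_FAILURE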